Use the `stream_id` from some point before we fetch the current state

This is simpler, and some rooms are so old that they don't have any
`current_state_delta_stream` rows yet. It's easier to just get the
general max `stream_id` of the whole table rather than the max
`stream_id` for the specific room anyway.

Thanks @erikjohnston
This commit is contained in:
Eric Eastwood 2024-08-27 19:45:50 -05:00
parent 9d08bc2157
commit a507f152c9
2 changed files with 14 additions and 36 deletions

View file

@ -1849,7 +1849,7 @@ class PersistEventsStore:
@classmethod @classmethod
def _get_relevant_sliding_sync_current_state_event_ids_txn( def _get_relevant_sliding_sync_current_state_event_ids_txn(
cls, txn: LoggingTransaction, room_id: str cls, txn: LoggingTransaction, room_id: str
) -> Tuple[MutableStateMap[str], int]: ) -> MutableStateMap[str]:
""" """
Fetch the current state event IDs for the relevant (to the Fetch the current state event IDs for the relevant (to the
`sliding_sync_joined_rooms` table) state types for the given room. `sliding_sync_joined_rooms` table) state types for the given room.
@ -1888,24 +1888,7 @@ class PersistEventsStore:
(event_type, state_key): event_id for event_id, event_type, state_key in txn (event_type, state_key): event_id for event_id, event_type, state_key in txn
} }
txn.execute( return current_state_map
"""
SELECT stream_id
FROM current_state_delta_stream
WHERE
room_id = ?
ORDER BY stream_id DESC
LIMIT 1
""",
(room_id,),
)
row = txn.fetchone()
# If we're able to fetch the `current_state_events` above, we should have rows
# in `current_state_delta_stream` as well.
assert row, "Failed to fetch the `last_current_state_delta_stream_id`"
last_current_state_delta_stream_id = row[0]
return current_state_map, last_current_state_delta_stream_id
@classmethod @classmethod
def _get_sliding_sync_insert_values_from_state_map( def _get_sliding_sync_insert_values_from_state_map(

View file

@ -123,10 +123,6 @@ class _JoinedRoomStreamOrderingUpdate:
most_recent_event_stream_ordering: int most_recent_event_stream_ordering: int
# The most recent event `bump_stamp` for the room # The most recent event `bump_stamp` for the room
most_recent_bump_stamp: Optional[int] most_recent_bump_stamp: Optional[int]
# The `stream_ordering` in the `current_state_delta_stream` that we got the state
# values from. We can use this to check if the current state has been updated since
# we last checked.
last_current_state_delta_stream_id: int
class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseStore): class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseStore):
@ -1622,7 +1618,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
# Map from room_id to insert/update state values in the `sliding_sync_joined_rooms` table. # Map from room_id to insert/update state values in the `sliding_sync_joined_rooms` table.
joined_room_updates: Dict[str, SlidingSyncStateInsertValues] = {} joined_room_updates: Dict[str, SlidingSyncStateInsertValues] = {}
# Map from room_id to stream_ordering/bump_stamp/last_current_state_delta_stream_id values # Map from room_id to stream_ordering/bump_stamp, etc values
joined_room_stream_ordering_updates: Dict[ joined_room_stream_ordering_updates: Dict[
str, _JoinedRoomStreamOrderingUpdate str, _JoinedRoomStreamOrderingUpdate
] = {} ] = {}
@ -1632,15 +1628,18 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
# `event_stream_ordering` order *ascending* to save our progress position # `event_stream_ordering` order *ascending* to save our progress position
# correctly if we need to exit early. # correctly if we need to exit early.
room_id_to_progress_marker_map: OrderedDict[str, int] = OrderedDict() room_id_to_progress_marker_map: OrderedDict[str, int] = OrderedDict()
# As long as we get this value before we fetch the current state, we can use it
# to check if something has changed since that point.
most_recent_current_state_delta_stream_id = (
await self.get_max_stream_id_in_current_state_deltas()
)
for room_id, progress_event_stream_ordering in rooms_to_update: for room_id, progress_event_stream_ordering in rooms_to_update:
room_id_to_progress_marker_map[room_id] = progress_event_stream_ordering room_id_to_progress_marker_map[room_id] = progress_event_stream_ordering
current_state_ids_map, last_current_state_delta_stream_id = ( current_state_ids_map = await self.db_pool.runInteraction(
await self.db_pool.runInteraction( "_sliding_sync_joined_rooms_bg_update._get_relevant_sliding_sync_current_state_event_ids_txn",
"_sliding_sync_joined_rooms_bg_update._get_relevant_sliding_sync_current_state_event_ids_txn", PersistEventsStore._get_relevant_sliding_sync_current_state_event_ids_txn,
PersistEventsStore._get_relevant_sliding_sync_current_state_event_ids_txn, room_id,
room_id,
)
) )
# We're iterating over rooms pulled from the current_state_events table # We're iterating over rooms pulled from the current_state_events table
# so we should have some current state for each room # so we should have some current state for each room
@ -1694,7 +1693,6 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
_JoinedRoomStreamOrderingUpdate( _JoinedRoomStreamOrderingUpdate(
most_recent_event_stream_ordering=most_recent_event_stream_ordering, most_recent_event_stream_ordering=most_recent_event_stream_ordering,
most_recent_bump_stamp=most_recent_bump_stamp, most_recent_bump_stamp=most_recent_bump_stamp,
last_current_state_delta_stream_id=last_current_state_delta_stream_id,
) )
) )
@ -1718,9 +1716,6 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
joined_room_update.most_recent_event_stream_ordering joined_room_update.most_recent_event_stream_ordering
) )
bump_stamp = joined_room_update.most_recent_bump_stamp bump_stamp = joined_room_update.most_recent_bump_stamp
last_current_state_delta_stream_id = (
joined_room_update.last_current_state_delta_stream_id
)
# Check if the current state has been updated since we gathered it # Check if the current state has been updated since we gathered it
state_deltas_since_we_gathered_current_state = ( state_deltas_since_we_gathered_current_state = (
@ -1728,7 +1723,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
txn, txn,
room_id, room_id,
from_token=RoomStreamToken( from_token=RoomStreamToken(
stream=last_current_state_delta_stream_id stream=most_recent_current_state_delta_stream_id
), ),
to_token=None, to_token=None,
) )
@ -1763,7 +1758,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
# Since we partially update the `sliding_sync_joined_rooms` as new state # Since we partially update the `sliding_sync_joined_rooms` as new state
# is sent, we need to update the state fields `ON CONFLICT`. We just # is sent, we need to update the state fields `ON CONFLICT`. We just
# have to be careful we're not overwriting it with stale data (see # have to be careful we're not overwriting it with stale data (see
# `last_current_state_delta_stream_id` check above). # `most_recent_current_state_delta_stream_id` check above).
# #
self.db_pool.simple_upsert_txn( self.db_pool.simple_upsert_txn(
txn, txn,