From a507f152c907681431c96e8eb4ff8cce177177e1 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 27 Aug 2024 19:45:50 -0500 Subject: [PATCH] Use `stream_id` of some point before we fetch the current state This is simpler, and some rooms are so old that they don't have any `current_state_delta_stream` rows yet. It's easier to just get the overall max `stream_id` for the whole table than the max `stream_id` for the specific room anyway. Thanks @erikjohnston --- synapse/storage/databases/main/events.py | 21 ++------------ .../databases/main/events_bg_updates.py | 29 ++++++++----------- 2 files changed, 14 insertions(+), 36 deletions(-) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index fa41d33920..f8d176d133 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1849,7 +1849,7 @@ class PersistEventsStore: @classmethod def _get_relevant_sliding_sync_current_state_event_ids_txn( cls, txn: LoggingTransaction, room_id: str - ) -> Tuple[MutableStateMap[str], int]: + ) -> MutableStateMap[str]: """ Fetch the current state event IDs for the relevant (to the `sliding_sync_joined_rooms` table) state types for the given room. @@ -1888,24 +1888,7 @@ class PersistEventsStore: (event_type, state_key): event_id for event_id, event_type, state_key in txn } - txn.execute( - """ - SELECT stream_id - FROM current_state_delta_stream - WHERE - room_id = ? - ORDER BY stream_id DESC - LIMIT 1 - """, - (room_id,), - ) - row = txn.fetchone() - # If we're able to fetch the `current_state_events` above, we should have rows - # in `current_state_delta_stream` as well. 
- assert row, "Failed to fetch the `last_current_state_delta_stream_id`" - last_current_state_delta_stream_id = row[0] - - return current_state_map, last_current_state_delta_stream_id + return current_state_map @classmethod def _get_sliding_sync_insert_values_from_state_map( diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 52b4450bbc..38a786c001 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -123,10 +123,6 @@ class _JoinedRoomStreamOrderingUpdate: most_recent_event_stream_ordering: int # The most recent event `bump_stamp` for the room most_recent_bump_stamp: Optional[int] - # The `stream_ordering` in the `current_state_delta_stream` that we got the state - # values from. We can use this to check if the current state has been updated since - # we last checked. - last_current_state_delta_stream_id: int class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseStore): @@ -1622,7 +1618,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS # Map from room_id to insert/update state values in the `sliding_sync_joined_rooms` table. joined_room_updates: Dict[str, SlidingSyncStateInsertValues] = {} - # Map from room_id to stream_ordering/bump_stamp/last_current_state_delta_stream_id values + # Map from room_id to stream_ordering/bump_stamp, etc values joined_room_stream_ordering_updates: Dict[ str, _JoinedRoomStreamOrderingUpdate ] = {} @@ -1632,15 +1628,18 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS # `event_stream_ordering` order *ascending* to save our progress position # correctly if we need to exit early. room_id_to_progress_marker_map: OrderedDict[str, int] = OrderedDict() + # As long as we get this value before we fetch the current state, we can use it + # to check if something has changed since that point. 
+ most_recent_current_state_delta_stream_id = ( + await self.get_max_stream_id_in_current_state_deltas() + ) for room_id, progress_event_stream_ordering in rooms_to_update: room_id_to_progress_marker_map[room_id] = progress_event_stream_ordering - current_state_ids_map, last_current_state_delta_stream_id = ( - await self.db_pool.runInteraction( - "_sliding_sync_joined_rooms_bg_update._get_relevant_sliding_sync_current_state_event_ids_txn", - PersistEventsStore._get_relevant_sliding_sync_current_state_event_ids_txn, - room_id, - ) + current_state_ids_map = await self.db_pool.runInteraction( + "_sliding_sync_joined_rooms_bg_update._get_relevant_sliding_sync_current_state_event_ids_txn", + PersistEventsStore._get_relevant_sliding_sync_current_state_event_ids_txn, + room_id, ) # We're iterating over rooms pulled from the current_state_events table # so we should have some current state for each room @@ -1694,7 +1693,6 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS _JoinedRoomStreamOrderingUpdate( most_recent_event_stream_ordering=most_recent_event_stream_ordering, most_recent_bump_stamp=most_recent_bump_stamp, - last_current_state_delta_stream_id=last_current_state_delta_stream_id, ) ) @@ -1718,9 +1716,6 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS joined_room_update.most_recent_event_stream_ordering ) bump_stamp = joined_room_update.most_recent_bump_stamp - last_current_state_delta_stream_id = ( - joined_room_update.last_current_state_delta_stream_id - ) # Check if the current state has been updated since we gathered it state_deltas_since_we_gathered_current_state = ( @@ -1728,7 +1723,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS txn, room_id, from_token=RoomStreamToken( - stream=last_current_state_delta_stream_id + stream=most_recent_current_state_delta_stream_id ), to_token=None, ) @@ -1763,7 +1758,7 @@ class 
EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS # Since we partially update the `sliding_sync_joined_rooms` as new state # is sent, we need to update the state fields `ON CONFLICT`. We just # have to be careful we're not overwriting it with stale data (see - # `last_current_state_delta_stream_id` check above). + # `most_recent_current_state_delta_stream_id` check above). # self.db_pool.simple_upsert_txn( txn,