diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 940d5570ad..f6c03a0650 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -2051,7 +2051,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS ) return 0 - def _find_previous_membership_txn( + def _find_previous_invite_knock_membership_txn( txn: LoggingTransaction, room_id: str, user_id: str, event_id: str ) -> Tuple[str, str]: # Find the previous invite/knock event before the leave event @@ -2109,9 +2109,19 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS to_insert_membership_snapshots: Dict[ Tuple[str, str], SlidingSyncMembershipSnapshotSharedInsertValues ] = {} + # Map from (room_id, user_id) to ... to_insert_membership_infos: Dict[ Tuple[str, str], SlidingSyncMembershipInfoWithEventPos ] = {} + + # Map from room_id to ... + # Just some convenience maps for easier lookup by `room_id`. + last_to_insert_membership_snapshots_by_room_id: Dict[ + str, SlidingSyncMembershipSnapshotSharedInsertValues + ] = {} + last_to_insert_membership_infos_by_room_id: Dict[ + str, SlidingSyncMembershipInfoWithEventPos + ] = {} for ( room_id, room_id_from_rooms_table, @@ -2153,9 +2163,56 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS }, ) + # Check if the current state has been updated since the last snapshot we inserted. + can_use_last_snapshot = False + # TODO: We could also look in the database to see if a snapshot is available + last_to_insert_membership_snapshot = ( + last_to_insert_membership_snapshots_by_room_id.get(room_id) + ) + last_to_insert_membership_info = ( + last_to_insert_membership_infos_by_room_id.get(room_id) + ) + if last_to_insert_membership_snapshot is not None: + can_use_last_snapshot = True + + # These should go hand-in-hand + assert last_to_insert_membership_info is not None + + state_deltas_since_last_snapshot = await self.get_current_state_deltas_for_room( + room_id, + # From the last snapshot we inserted + from_token=RoomStreamToken( + stream=last_to_insert_membership_info.membership_event_stream_ordering, + ), + # To our current membership position + to_token=RoomStreamToken( + stream=membership_event_stream_ordering, + ), + ) + for state_delta in state_deltas_since_last_snapshot: + # We only need to check for the state is relevant to the + # `sliding_sync_joined_rooms` table. + if ( + state_delta.event_type, + state_delta.state_key, + ) in SLIDING_SYNC_RELEVANT_STATE_SET: + can_use_last_snapshot = False + # Once we find one relevant state event that changed, no need to + # look any further + break + # Map of values to insert/update in the `sliding_sync_membership_snapshots` table sliding_sync_membership_snapshots_insert_map: SlidingSyncMembershipSnapshotSharedInsertValues = {} - if membership == Membership.JOIN: + # If none of the relevant state has changed since the last snapshot, we can + # just re-use the last snapshot we inserted. + if can_use_last_snapshot: + # Based on the logic above, we should have a last snapshot to work from + assert last_to_insert_membership_snapshot is not None + + sliding_sync_membership_snapshots_insert_map = ( + last_to_insert_membership_snapshot + ) + elif membership == Membership.JOIN: # If we're still joined, we can pull from current state. current_state_ids_map: StateMap[ str @@ -2240,8 +2297,8 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS invite_or_knock_event_id, invite_or_knock_membership, ) = await self.db_pool.runInteraction( - "sliding_sync_membership_snapshots_bg_update._find_previous_membership", - _find_previous_membership_txn, + "sliding_sync_membership_snapshots_bg_update._find_previous_invite_knock_membership_txn", + _find_previous_invite_knock_membership_txn, room_id, user_id, membership_event_id, @@ -2327,10 +2384,14 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS f"Unexpected membership {membership} ({membership_event_id}) that we don't know how to handle yet" ) - to_insert_membership_snapshots[(room_id, user_id)] = ( + to_insert_membership_snapshots[ + (room_id, user_id) + ] = last_to_insert_membership_snapshots_by_room_id[room_id] = ( sliding_sync_membership_snapshots_insert_map ) - to_insert_membership_infos[(room_id, user_id)] = ( + to_insert_membership_infos[ + (room_id, user_id) + ] = last_to_insert_membership_infos_by_room_id[room_id] = ( SlidingSyncMembershipInfoWithEventPos( user_id=user_id, sender=sender, @@ -2348,7 +2409,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS # for key, insert_map in to_insert_membership_snapshots.items(): room_id, user_id = key - membership_info = to_insert_membership_infos[key] + membership_info = to_insert_membership_infos[(room_id, user_id)] sender = membership_info.sender membership_event_id = membership_info.membership_event_id membership = membership_info.membership