From 58071bc9e59351fd1617bc8cef8246f17c18c4d1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 29 Aug 2024 14:05:23 +0100 Subject: [PATCH] Split out fetching of newly joined/left rooms --- synapse/handlers/sliding_sync/room_lists.py | 160 +++++++++++--------- 1 file changed, 90 insertions(+), 70 deletions(-) diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py index 3bfb6c9323..aad68ce5a9 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py @@ -25,6 +25,7 @@ from typing import ( Mapping, Optional, Set, + Tuple, Union, ) @@ -426,16 +427,16 @@ class SlidingSyncRoomLists: ) @trace - async def _rewind_current_membership_to_token( + async def _get_rewind_changes_to_current_membership_to_token( self, user: UserID, rooms_for_user: Mapping[str, RoomsForUser], to_token: StreamToken, - ) -> Mapping[str, RoomsForUser]: + ) -> Mapping[str, Optional[RoomsForUser]]: """ Takes the current set of rooms for a user (retrieved after the given - token), and "rewinds" it to match the set of memberships *at that - token*. + token), and returns the changes need to "rewind" it to match the set of + memberships *at that token*. Args: user: User to fetch rooms for @@ -443,7 +444,7 @@ class SlidingSyncRoomLists: to_token: The token to rewind Returns: - The set of memberships for the user at the given token + The changes to apply to rewind the the current memberships. """ # If the user has never joined any rooms before, we can just return an empty list if not rooms_for_user: @@ -511,11 +512,11 @@ class SlidingSyncRoomLists: if not current_state_delta_membership_changes_after_to_token: # There have been no membership changes, so we can early return. - return rooms_for_user + return {} # Otherwise we're about to make changes to `rooms_for_user`, so we turn # it into a mutable dict. - rooms_for_user = dict(rooms_for_user) + changes: Dict[str, Optional[RoomsForUser]] = {} # Assemble a list of the first membership event after the `to_token` so we can # step backward to the previous membership that would apply to the from/to @@ -538,7 +539,7 @@ class SlidingSyncRoomLists: ) in first_membership_change_by_room_id_after_to_token.items(): # 1a) Remove rooms that the user joined after the `to_token` if first_membership_change_after_to_token.prev_event_id is None: - rooms_for_user.pop(room_id, None) + changes[room_id] = None # 1b) 1c) From the first membership event after the `to_token`, step backward to the # previous membership that would apply to the from/to range. else: @@ -560,7 +561,7 @@ class SlidingSyncRoomLists: else: room_version_id = await self.store.get_room_version_id(room_id) - rooms_for_user[room_id] = RoomsForUser( + changes[room_id] = RoomsForUser( room_id=room_id, event_id=first_membership_change_after_to_token.prev_event_id, event_pos=first_membership_change_after_to_token.prev_event_pos, @@ -572,9 +573,9 @@ class SlidingSyncRoomLists: # If we can't find the previous membership event, we shouldn't # include the room in the sync response since we can't determine the # exact membership state and shouldn't rely on the current snapshot. - rooms_for_user.pop(room_id, None) + changes[room_id] = None - return rooms_for_user + return changes @trace async def get_room_membership_for_user_at_to_token( @@ -624,12 +625,23 @@ class SlidingSyncRoomLists: # Since we fetched the users room list at some point in time after the # tokens, we need to revert/rewind some membership changes to match the point in # time of the `to_token`. - rooms_for_user: Mapping[str, RoomsForUser] = { - room.room_id: room for room in room_for_user_list - } - rooms_for_user = await self._rewind_current_membership_to_token( + rooms_for_user = {room.room_id: room for room in room_for_user_list} + changes = await self._get_rewind_changes_to_current_membership_to_token( user, rooms_for_user, to_token ) + for room_id, change_room_for_user in changes.items(): + if change_room_for_user is None: + rooms_for_user.pop(room_id, None) + else: + rooms_for_user[room_id] = change_room_for_user + + newly_joined_room_ids, newly_left_room_ids = ( + await self._get_newly_joined_and_left_rooms( + user_id, to_token=to_token, from_token=from_token + ) + ) + + dm_room_ids = await self._get_dm_rooms_for_user(user_id) # Our working list of rooms that can show up in the sync response sync_room_id_set = { @@ -643,18 +655,59 @@ class SlidingSyncRoomLists: membership=room_for_user.membership, sender=room_for_user.sender, # We will update these fields below to be accurate - newly_joined=False, - newly_left=False, - is_dm=False, + newly_joined=room_id in newly_joined_room_ids, + newly_left=room_id in newly_left_room_ids, + is_dm=room_id in dm_room_ids, ) - for room_for_user in rooms_for_user.values() + for room_id, room_for_user in rooms_for_user.items() } - # We now need to figure out the + # Ensure we have entries for rooms that the user has been "state reset" + # out of. These are rooms appear in the `newly_left_rooms` map but + # aren't in the `rooms_for_user` map. + for room_id, left_event_pos in newly_left_room_ids.items(): + if room_id in sync_room_id_set: + continue + + sync_room_id_set[room_id] = _RoomMembershipForUser( + room_id=room_id, + event_id=None, + event_pos=left_event_pos, + membership=Membership.LEAVE, + sender=None, + # We will update these fields below to be accurate + newly_joined=False, + newly_left=True, + is_dm=room_id in dm_room_ids, + ) + + return sync_room_id_set + + @trace + async def _get_newly_joined_and_left_rooms( + self, + user_id: str, + to_token: StreamToken, + from_token: Optional[StreamToken], + ) -> Tuple[StrCollection, Mapping[str, PersistedEventPosition]]: + """Fetch the sets of rooms that the user newly joined or left in the + given token range. + + Note: there may be rooms in the newly left rooms where the user was + "state reset" out of the room, and so that room would not be part of the + "current memberships" of the user. + + Returns: + A 2-tuple of newly joined room IDs and a map of newly left room + IDs to the event position the leave happened at. + """ + newly_joined_room_ids: Set[str] = set() + newly_left_room_map: Dict[str, PersistedEventPosition] = {} + + # We need to figure out the # # - 1) Figure out which rooms are `newly_left` rooms (> `from_token` and <= `to_token`) # - 2) Figure out which rooms are `newly_joined` (> `from_token` and <= `to_token`) - # - 3) Figure out which rooms are DM's # 1) Fetch membership changes that fall in the range from `from_token` up to `to_token` current_state_delta_membership_changes_in_from_to_range = [] @@ -720,37 +773,9 @@ class SlidingSyncRoomLists: # 1) Figure out newly_left rooms (> `from_token` and <= `to_token`). if last_membership_change_in_from_to_range.membership == Membership.LEAVE: # 1) Mark this room as `newly_left` - - # If we're seeing a membership change here, we should expect to already - # have it in our snapshot but if a state reset happens, it wouldn't have - # shown up in our snapshot but appear as a change here. - existing_sync_entry = sync_room_id_set.get(room_id) - if existing_sync_entry is not None: - # Normal expected case - sync_room_id_set[room_id] = existing_sync_entry.copy_and_replace( - newly_left=True - ) - else: - # State reset! - logger.warn( - "State reset detected for room_id %s with %s who is no longer in the room", - room_id, - user_id, - ) - # Even though a state reset happened which removed the person from - # the room, we still add it the list so the user knows they left the - # room. Downstream code can check for a state reset by looking for - # `event_id=None and membership is not None`. - sync_room_id_set[room_id] = _RoomMembershipForUser( - room_id=room_id, - event_id=last_membership_change_in_from_to_range.event_id, - event_pos=last_membership_change_in_from_to_range.event_pos, - membership=last_membership_change_in_from_to_range.membership, - sender=last_membership_change_in_from_to_range.sender, - newly_joined=False, - newly_left=True, - is_dm=False, - ) + newly_left_room_map[room_id] = ( + last_membership_change_in_from_to_range.event_pos + ) # 2) Figure out `newly_joined` for room_id in possibly_newly_joined_room_ids: @@ -761,9 +786,7 @@ class SlidingSyncRoomLists: # also some non-join in the range, we know they `newly_joined`. if has_non_join_in_from_to_range: # We found a `newly_joined` room (we left and joined within the token range) - sync_room_id_set[room_id] = sync_room_id_set[room_id].copy_and_replace( - newly_joined=True - ) + newly_joined_room_ids.add(room_id) else: prev_event_id = first_membership_change_by_room_id_in_from_to_range[ room_id @@ -775,20 +798,23 @@ class SlidingSyncRoomLists: if prev_event_id is None: # We found a `newly_joined` room (we are joining the room for the # first time within the token range) - sync_room_id_set[room_id] = sync_room_id_set[ - room_id - ].copy_and_replace(newly_joined=True) + newly_joined_room_ids.add(room_id) # Last resort, we need to step back to the previous membership event # just before the token range to see if we're joined then or not. elif prev_membership != Membership.JOIN: # We found a `newly_joined` room (we left before the token range # and joined within the token range) - sync_room_id_set[room_id] = sync_room_id_set[ - room_id - ].copy_and_replace(newly_joined=True) + newly_joined_room_ids.add(room_id) + + return newly_joined_room_ids, newly_left_room_map + + @trace + async def _get_dm_rooms_for_user( + self, + user_id: str, + ) -> StrCollection: + """Get the set of DM rooms for the user.""" - # 3) Figure out which rooms the user considers to be direct-message (DM) rooms - # # We're using global account data (`m.direct`) instead of checking for # `is_direct` on membership events because that property only appears for # the invitee membership event (doesn't show up for the inviter). @@ -810,13 +836,7 @@ class SlidingSyncRoomLists: if isinstance(room_id, str): dm_room_id_set.add(room_id) - # 3) Fixup - for room_id in sync_room_id_set: - sync_room_id_set[room_id] = sync_room_id_set[room_id].copy_and_replace( - is_dm=room_id in dm_room_id_set - ) - - return sync_room_id_set + return dm_room_id_set @trace async def filter_rooms_relevant_for_sync(