From 74bec29c1d16646f78f3e4218eeeccc7fb2d58ad Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 29 Aug 2024 13:41:57 +0100 Subject: [PATCH 01/17] Split out _rewind_current_membership_to_token function --- synapse/handlers/sliding_sync/room_lists.py | 299 ++++++++++++-------- 1 file changed, 177 insertions(+), 122 deletions(-) diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py index 0e6cb28524..3bfb6c9323 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py @@ -46,6 +46,7 @@ from synapse.storage.databases.main.state import ( Sentinel as StateSentinel, ) from synapse.storage.databases.main.stream import CurrentStateDeltaMembership +from synapse.storage.roommember import RoomsForUser from synapse.types import ( MutableStateMap, PersistedEventPosition, @@ -424,6 +425,157 @@ class SlidingSyncRoomLists: room_membership_for_user_map=room_membership_for_user_map, ) + @trace + async def _rewind_current_membership_to_token( + self, + user: UserID, + rooms_for_user: Mapping[str, RoomsForUser], + to_token: StreamToken, + ) -> Mapping[str, RoomsForUser]: + """ + Takes the current set of rooms for a user (retrieved after the given + token), and "rewinds" it to match the set of memberships *at that + token*. + + Args: + user: User to fetch rooms for + rooms_for_user: The set of rooms for the user after the `to_token`. + to_token: The token to rewind + + Returns: + The set of memberships for the user at the given token + """ + # If the user has never joined any rooms before, we can just return an empty list + if not rooms_for_user: + return {} + + user_id = user.to_string() + + # Get the `RoomStreamToken` that represents the spot we queried up to when we got + # our membership snapshot from `get_rooms_for_local_user_where_membership_is()`. + # + # First, we need to get the max stream_ordering of each event persister instance + # that we queried events from. + instance_to_max_stream_ordering_map: Dict[str, int] = {} + for room_for_user in rooms_for_user.values(): + instance_name = room_for_user.event_pos.instance_name + stream_ordering = room_for_user.event_pos.stream + + current_instance_max_stream_ordering = ( + instance_to_max_stream_ordering_map.get(instance_name) + ) + if ( + current_instance_max_stream_ordering is None + or stream_ordering > current_instance_max_stream_ordering + ): + instance_to_max_stream_ordering_map[instance_name] = stream_ordering + + # Then assemble the `RoomStreamToken` + min_stream_pos = min(instance_to_max_stream_ordering_map.values()) + membership_snapshot_token = RoomStreamToken( + # Minimum position in the `instance_map` + stream=min_stream_pos, + instance_map=immutabledict( + { + instance_name: stream_pos + for instance_name, stream_pos in instance_to_max_stream_ordering_map.items() + if stream_pos > min_stream_pos + } + ), + ) + + # Since we fetched the users room list at some point in time after the + # tokens, we need to revert/rewind some membership changes to match the point in + # time of the `to_token`. In particular, we need to make these fixups: + # + # - a) Remove rooms that the user joined after the `to_token` + # - b) Add back rooms that the user left after the `to_token` + # - c) Update room membership events to the point in time of the `to_token` + + # Fetch membership changes that fall in the range from `to_token` up to + # `membership_snapshot_token` + # + # If our `to_token` is already the same or ahead of the latest room membership + # for the user, we don't need to do any "2)" fix-ups and can just straight-up + # use the room list from the snapshot as a base (nothing has changed) + current_state_delta_membership_changes_after_to_token = [] + if not membership_snapshot_token.is_before_or_eq(to_token.room_key): + current_state_delta_membership_changes_after_to_token = ( + await self.store.get_current_state_delta_membership_changes_for_user( + user_id, + from_key=to_token.room_key, + to_key=membership_snapshot_token, + excluded_room_ids=self.rooms_to_exclude_globally, + ) + ) + + if not current_state_delta_membership_changes_after_to_token: + # There have been no membership changes, so we can early return. + return rooms_for_user + + # Otherwise we're about to make changes to `rooms_for_user`, so we turn + # it into a mutable dict. + rooms_for_user = dict(rooms_for_user) + + # Assemble a list of the first membership event after the `to_token` so we can + # step backward to the previous membership that would apply to the from/to + # range. + first_membership_change_by_room_id_after_to_token: Dict[ + str, CurrentStateDeltaMembership + ] = {} + for membership_change in current_state_delta_membership_changes_after_to_token: + # Only set if we haven't already set it + first_membership_change_by_room_id_after_to_token.setdefault( + membership_change.room_id, membership_change + ) + + # Since we fetched a snapshot of the users room list at some point in time after + # the from/to tokens, we need to revert/rewind some membership changes to match + # the point in time of the `to_token`. + for ( + room_id, + first_membership_change_after_to_token, + ) in first_membership_change_by_room_id_after_to_token.items(): + # 1a) Remove rooms that the user joined after the `to_token` + if first_membership_change_after_to_token.prev_event_id is None: + rooms_for_user.pop(room_id, None) + # 1b) 1c) From the first membership event after the `to_token`, step backward to the + # previous membership that would apply to the from/to range. + else: + # We don't expect these fields to be `None` if we have a `prev_event_id` + # but we're being defensive since it's possible that the prev event was + # culled from the database. + if ( + first_membership_change_after_to_token.prev_event_pos is not None + and first_membership_change_after_to_token.prev_membership + is not None + and first_membership_change_after_to_token.prev_sender is not None + ): + # We need to know the room version ID, which we normally we + # can get from the current membership, but if we don't have + # that then we need to query the DB. + current_membership = rooms_for_user.get(room_id) + if current_membership is not None: + room_version_id = current_membership.room_version_id + else: + room_version_id = await self.store.get_room_version_id(room_id) + + rooms_for_user[room_id] = RoomsForUser( + room_id=room_id, + event_id=first_membership_change_after_to_token.prev_event_id, + event_pos=first_membership_change_after_to_token.prev_event_pos, + membership=first_membership_change_after_to_token.prev_membership, + sender=first_membership_change_after_to_token.prev_sender, + room_version_id=room_version_id, + ) + else: + # If we can't find the previous membership event, we shouldn't + # include the room in the sync response since we can't determine the + # exact membership state and shouldn't rely on the current snapshot. + rooms_for_user.pop(room_id, None) + + return rooms_for_user + @trace async def get_room_membership_for_user_at_to_token( self, @@ -469,6 +621,16 @@ class SlidingSyncRoomLists: if not room_for_user_list: return {} + # Since we fetched the users room list at some point in time after the + # tokens, we need to revert/rewind some membership changes to match the point in + # time of the `to_token`. + rooms_for_user: Mapping[str, RoomsForUser] = { + room.room_id: room for room in room_for_user_list + } + rooms_for_user = await self._rewind_current_membership_to_token( + user, rooms_for_user, to_token + ) + # Our working list of rooms that can show up in the sync response sync_room_id_set = { # Note: The `room_for_user` we're assigning here will need to be fixed up @@ -485,123 +647,16 @@ class SlidingSyncRoomLists: newly_left=False, is_dm=False, ) - for room_for_user in room_for_user_list + for room_for_user in rooms_for_user.values() } - # Get the `RoomStreamToken` that represents the spot we queried up to when we got - # our membership snapshot from `get_rooms_for_local_user_where_membership_is()`. + # We now need to figure out the # - # First, we need to get the max stream_ordering of each event persister instance - # that we queried events from. - instance_to_max_stream_ordering_map: Dict[str, int] = {} - for room_for_user in room_for_user_list: - instance_name = room_for_user.event_pos.instance_name - stream_ordering = room_for_user.event_pos.stream + # - 1) Figure out which rooms are `newly_left` rooms (> `from_token` and <= `to_token`) + # - 2) Figure out which rooms are `newly_joined` (> `from_token` and <= `to_token`) + # - 3) Figure out which rooms are DM's - current_instance_max_stream_ordering = ( - instance_to_max_stream_ordering_map.get(instance_name) - ) - if ( - current_instance_max_stream_ordering is None - or stream_ordering > current_instance_max_stream_ordering - ): - instance_to_max_stream_ordering_map[instance_name] = stream_ordering - - # Then assemble the `RoomStreamToken` - min_stream_pos = min(instance_to_max_stream_ordering_map.values()) - membership_snapshot_token = RoomStreamToken( - # Minimum position in the `instance_map` - stream=min_stream_pos, - instance_map=immutabledict( - { - instance_name: stream_pos - for instance_name, stream_pos in instance_to_max_stream_ordering_map.items() - if stream_pos > min_stream_pos - } - ), - ) - - # Since we fetched the users room list at some point in time after the from/to - # tokens, we need to revert/rewind some membership changes to match the point in - # time of the `to_token`. In particular, we need to make these fixups: - # - # - 1a) Remove rooms that the user joined after the `to_token` - # - 1b) Add back rooms that the user left after the `to_token` - # - 1c) Update room membership events to the point in time of the `to_token` - # - 2) Figure out which rooms are `newly_left` rooms (> `from_token` and <= `to_token`) - # - 3) Figure out which rooms are `newly_joined` (> `from_token` and <= `to_token`) - # - 4) Figure out which rooms are DM's - - # 1) Fetch membership changes that fall in the range from `to_token` up to - # `membership_snapshot_token` - # - # If our `to_token` is already the same or ahead of the latest room membership - # for the user, we don't need to do any "2)" fix-ups and can just straight-up - # use the room list from the snapshot as a base (nothing has changed) - current_state_delta_membership_changes_after_to_token = [] - if not membership_snapshot_token.is_before_or_eq(to_token.room_key): - current_state_delta_membership_changes_after_to_token = ( - await self.store.get_current_state_delta_membership_changes_for_user( - user_id, - from_key=to_token.room_key, - to_key=membership_snapshot_token, - excluded_room_ids=self.rooms_to_exclude_globally, - ) - ) - - # 1) Assemble a list of the first membership event after the `to_token` so we can - # step backward to the previous membership that would apply to the from/to - # range. - first_membership_change_by_room_id_after_to_token: Dict[ - str, CurrentStateDeltaMembership - ] = {} - for membership_change in current_state_delta_membership_changes_after_to_token: - # Only set if we haven't already set it - first_membership_change_by_room_id_after_to_token.setdefault( - membership_change.room_id, membership_change - ) - - # 1) Fixup - # - # Since we fetched a snapshot of the users room list at some point in time after - # the from/to tokens, we need to revert/rewind some membership changes to match - # the point in time of the `to_token`. - for ( - room_id, - first_membership_change_after_to_token, - ) in first_membership_change_by_room_id_after_to_token.items(): - # 1a) Remove rooms that the user joined after the `to_token` - if first_membership_change_after_to_token.prev_event_id is None: - sync_room_id_set.pop(room_id, None) - # 1b) 1c) From the first membership event after the `to_token`, step backward to the - # previous membership that would apply to the from/to range. - else: - # We don't expect these fields to be `None` if we have a `prev_event_id` - # but we're being defensive since it's possible that the prev event was - # culled from the database. - if ( - first_membership_change_after_to_token.prev_event_pos is not None - and first_membership_change_after_to_token.prev_membership - is not None - ): - sync_room_id_set[room_id] = _RoomMembershipForUser( - room_id=room_id, - event_id=first_membership_change_after_to_token.prev_event_id, - event_pos=first_membership_change_after_to_token.prev_event_pos, - membership=first_membership_change_after_to_token.prev_membership, - sender=first_membership_change_after_to_token.prev_sender, - # We will update these fields below to be accurate - newly_joined=False, - newly_left=False, - is_dm=False, - ) - else: - # If we can't find the previous membership event, we shouldn't - # include the room in the sync response since we can't determine the - # exact membership state and shouldn't rely on the current snapshot. - sync_room_id_set.pop(room_id, None) - - # 2) Fetch membership changes that fall in the range from `from_token` up to `to_token` + # 1) Fetch membership changes that fall in the range from `from_token` up to `to_token` current_state_delta_membership_changes_in_from_to_range = [] if from_token: current_state_delta_membership_changes_in_from_to_range = ( @@ -613,7 +668,7 @@ class SlidingSyncRoomLists: ) ) - # 2) Assemble a list of the last membership events in some given ranges. Someone + # 1) Assemble a list of the last membership events in some given ranges. Someone # could have left and joined multiple times during the given range but we only # care about end-result so we grab the last one. last_membership_change_by_room_id_in_from_to_range: Dict[ @@ -646,9 +701,9 @@ class SlidingSyncRoomLists: if membership_change.membership != Membership.JOIN: has_non_join_event_by_room_id_in_from_to_range[room_id] = True - # 2) Fixup + # 1) Fixup # - # 3) We also want to assemble a list of possibly newly joined rooms. Someone + # 2) We also want to assemble a list of possibly newly joined rooms. Someone # could have left and joined multiple times during the given range but we only # care about whether they are joined at the end of the token range so we are # working with the last membership even in the token range. @@ -658,13 +713,13 @@ class SlidingSyncRoomLists: ) in last_membership_change_by_room_id_in_from_to_range.values(): room_id = last_membership_change_in_from_to_range.room_id - # 3) + # 2) if last_membership_change_in_from_to_range.membership == Membership.JOIN: possibly_newly_joined_room_ids.add(room_id) - # 2) Figure out newly_left rooms (> `from_token` and <= `to_token`). + # 1) Figure out newly_left rooms (> `from_token` and <= `to_token`). if last_membership_change_in_from_to_range.membership == Membership.LEAVE: - # 2) Mark this room as `newly_left` + # 1) Mark this room as `newly_left` # If we're seeing a membership change here, we should expect to already # have it in our snapshot but if a state reset happens, it wouldn't have @@ -697,7 +752,7 @@ class SlidingSyncRoomLists: is_dm=False, ) - # 3) Figure out `newly_joined` + # 2) Figure out `newly_joined` for room_id in possibly_newly_joined_room_ids: has_non_join_in_from_to_range = ( has_non_join_event_by_room_id_in_from_to_range.get(room_id, False) @@ -732,7 +787,7 @@ class SlidingSyncRoomLists: room_id ].copy_and_replace(newly_joined=True) - # 4) Figure out which rooms the user considers to be direct-message (DM) rooms + # 3) Figure out which rooms the user considers to be direct-message (DM) rooms # # We're using global account data (`m.direct`) instead of checking for # `is_direct` on membership events because that property only appears for @@ -755,7 +810,7 @@ class SlidingSyncRoomLists: if isinstance(room_id, str): dm_room_id_set.add(room_id) - # 4) Fixup + # 3) Fixup for room_id in sync_room_id_set: sync_room_id_set[room_id] = sync_room_id_set[room_id].copy_and_replace( is_dm=room_id in dm_room_id_set From 58071bc9e59351fd1617bc8cef8246f17c18c4d1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 29 Aug 2024 14:05:23 +0100 Subject: [PATCH 02/17] Split out fetching of newly joined/left rooms --- synapse/handlers/sliding_sync/room_lists.py | 160 +++++++++++--------- 1 file changed, 90 insertions(+), 70 deletions(-) diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py index 3bfb6c9323..aad68ce5a9 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py @@ -25,6 +25,7 @@ from typing import ( Mapping, Optional, Set, + Tuple, Union, ) @@ -426,16 +427,16 @@ class SlidingSyncRoomLists: ) @trace - async def _rewind_current_membership_to_token( + async def _get_rewind_changes_to_current_membership_to_token( self, user: UserID, rooms_for_user: Mapping[str, RoomsForUser], to_token: StreamToken, - ) -> Mapping[str, RoomsForUser]: + ) -> Mapping[str, Optional[RoomsForUser]]: """ Takes the current set of rooms for a user (retrieved after the given - token), and "rewinds" it to match the set of memberships *at that - token*. + token), and returns the changes need to "rewind" it to match the set of + memberships *at that token*. Args: user: User to fetch rooms for @@ -443,7 +444,7 @@ class SlidingSyncRoomLists: to_token: The token to rewind Returns: - The set of memberships for the user at the given token + The changes to apply to rewind the the current memberships. """ # If the user has never joined any rooms before, we can just return an empty list if not rooms_for_user: @@ -511,11 +512,11 @@ class SlidingSyncRoomLists: if not current_state_delta_membership_changes_after_to_token: # There have been no membership changes, so we can early return. - return rooms_for_user + return {} # Otherwise we're about to make changes to `rooms_for_user`, so we turn # it into a mutable dict. - rooms_for_user = dict(rooms_for_user) + changes: Dict[str, Optional[RoomsForUser]] = {} # Assemble a list of the first membership event after the `to_token` so we can # step backward to the previous membership that would apply to the from/to @@ -538,7 +539,7 @@ class SlidingSyncRoomLists: ) in first_membership_change_by_room_id_after_to_token.items(): # 1a) Remove rooms that the user joined after the `to_token` if first_membership_change_after_to_token.prev_event_id is None: - rooms_for_user.pop(room_id, None) + changes[room_id] = None # 1b) 1c) From the first membership event after the `to_token`, step backward to the # previous membership that would apply to the from/to range. else: @@ -560,7 +561,7 @@ class SlidingSyncRoomLists: else: room_version_id = await self.store.get_room_version_id(room_id) - rooms_for_user[room_id] = RoomsForUser( + changes[room_id] = RoomsForUser( room_id=room_id, event_id=first_membership_change_after_to_token.prev_event_id, event_pos=first_membership_change_after_to_token.prev_event_pos, @@ -572,9 +573,9 @@ class SlidingSyncRoomLists: # If we can't find the previous membership event, we shouldn't # include the room in the sync response since we can't determine the # exact membership state and shouldn't rely on the current snapshot. - rooms_for_user.pop(room_id, None) + changes[room_id] = None - return rooms_for_user + return changes @trace async def get_room_membership_for_user_at_to_token( @@ -624,12 +625,23 @@ class SlidingSyncRoomLists: # Since we fetched the users room list at some point in time after the # tokens, we need to revert/rewind some membership changes to match the point in # time of the `to_token`. - rooms_for_user: Mapping[str, RoomsForUser] = { - room.room_id: room for room in room_for_user_list - } - rooms_for_user = await self._rewind_current_membership_to_token( + rooms_for_user = {room.room_id: room for room in room_for_user_list} + changes = await self._get_rewind_changes_to_current_membership_to_token( user, rooms_for_user, to_token ) + for room_id, change_room_for_user in changes.items(): + if change_room_for_user is None: + rooms_for_user.pop(room_id, None) + else: + rooms_for_user[room_id] = change_room_for_user + + newly_joined_room_ids, newly_left_room_ids = ( + await self._get_newly_joined_and_left_rooms( + user_id, to_token=to_token, from_token=from_token + ) + ) + + dm_room_ids = await self._get_dm_rooms_for_user(user_id) # Our working list of rooms that can show up in the sync response sync_room_id_set = { @@ -643,18 +655,59 @@ class SlidingSyncRoomLists: membership=room_for_user.membership, sender=room_for_user.sender, # We will update these fields below to be accurate - newly_joined=False, - newly_left=False, - is_dm=False, + newly_joined=room_id in newly_joined_room_ids, + newly_left=room_id in newly_left_room_ids, + is_dm=room_id in dm_room_ids, ) - for room_for_user in rooms_for_user.values() + for room_id, room_for_user in rooms_for_user.items() } - # We now need to figure out the + # Ensure we have entries for rooms that the user has been "state reset" + # out of. These are rooms appear in the `newly_left_rooms` map but + # aren't in the `rooms_for_user` map. + for room_id, left_event_pos in newly_left_room_ids.items(): + if room_id in sync_room_id_set: + continue + + sync_room_id_set[room_id] = _RoomMembershipForUser( + room_id=room_id, + event_id=None, + event_pos=left_event_pos, + membership=Membership.LEAVE, + sender=None, + # We will update these fields below to be accurate + newly_joined=False, + newly_left=True, + is_dm=room_id in dm_room_ids, + ) + + return sync_room_id_set + + @trace + async def _get_newly_joined_and_left_rooms( + self, + user_id: str, + to_token: StreamToken, + from_token: Optional[StreamToken], + ) -> Tuple[StrCollection, Mapping[str, PersistedEventPosition]]: + """Fetch the sets of rooms that the user newly joined or left in the + given token range. + + Note: there may be rooms in the newly left rooms where the user was + "state reset" out of the room, and so that room would not be part of the + "current memberships" of the user. + + Returns: + A 2-tuple of newly joined room IDs and a map of newly left room + IDs to the event position the leave happened at. + """ + newly_joined_room_ids: Set[str] = set() + newly_left_room_map: Dict[str, PersistedEventPosition] = {} + + # We need to figure out the # # - 1) Figure out which rooms are `newly_left` rooms (> `from_token` and <= `to_token`) # - 2) Figure out which rooms are `newly_joined` (> `from_token` and <= `to_token`) - # - 3) Figure out which rooms are DM's # 1) Fetch membership changes that fall in the range from `from_token` up to `to_token` current_state_delta_membership_changes_in_from_to_range = [] @@ -720,37 +773,9 @@ class SlidingSyncRoomLists: # 1) Figure out newly_left rooms (> `from_token` and <= `to_token`). if last_membership_change_in_from_to_range.membership == Membership.LEAVE: # 1) Mark this room as `newly_left` - - # If we're seeing a membership change here, we should expect to already - # have it in our snapshot but if a state reset happens, it wouldn't have - # shown up in our snapshot but appear as a change here. - existing_sync_entry = sync_room_id_set.get(room_id) - if existing_sync_entry is not None: - # Normal expected case - sync_room_id_set[room_id] = existing_sync_entry.copy_and_replace( - newly_left=True - ) - else: - # State reset! - logger.warn( - "State reset detected for room_id %s with %s who is no longer in the room", - room_id, - user_id, - ) - # Even though a state reset happened which removed the person from - # the room, we still add it the list so the user knows they left the - # room. Downstream code can check for a state reset by looking for - # `event_id=None and membership is not None`. - sync_room_id_set[room_id] = _RoomMembershipForUser( - room_id=room_id, - event_id=last_membership_change_in_from_to_range.event_id, - event_pos=last_membership_change_in_from_to_range.event_pos, - membership=last_membership_change_in_from_to_range.membership, - sender=last_membership_change_in_from_to_range.sender, - newly_joined=False, - newly_left=True, - is_dm=False, - ) + newly_left_room_map[room_id] = ( + last_membership_change_in_from_to_range.event_pos + ) # 2) Figure out `newly_joined` for room_id in possibly_newly_joined_room_ids: @@ -761,9 +786,7 @@ class SlidingSyncRoomLists: # also some non-join in the range, we know they `newly_joined`. if has_non_join_in_from_to_range: # We found a `newly_joined` room (we left and joined within the token range) - sync_room_id_set[room_id] = sync_room_id_set[room_id].copy_and_replace( - newly_joined=True - ) + newly_joined_room_ids.add(room_id) else: prev_event_id = first_membership_change_by_room_id_in_from_to_range[ room_id @@ -775,20 +798,23 @@ class SlidingSyncRoomLists: if prev_event_id is None: # We found a `newly_joined` room (we are joining the room for the # first time within the token range) - sync_room_id_set[room_id] = sync_room_id_set[ - room_id - ].copy_and_replace(newly_joined=True) + newly_joined_room_ids.add(room_id) # Last resort, we need to step back to the previous membership event # just before the token range to see if we're joined then or not. elif prev_membership != Membership.JOIN: # We found a `newly_joined` room (we left before the token range # and joined within the token range) - sync_room_id_set[room_id] = sync_room_id_set[ - room_id - ].copy_and_replace(newly_joined=True) + newly_joined_room_ids.add(room_id) + + return newly_joined_room_ids, newly_left_room_map + + @trace + async def _get_dm_rooms_for_user( + self, + user_id: str, + ) -> StrCollection: + """Get the set of DM rooms for the user.""" - # 3) Figure out which rooms the user considers to be direct-message (DM) rooms - # # We're using global account data (`m.direct`) instead of checking for # `is_direct` on membership events because that property only appears for # the invitee membership event (doesn't show up for the inviter). @@ -810,13 +836,7 @@ class SlidingSyncRoomLists: if isinstance(room_id, str): dm_room_id_set.add(room_id) - # 3) Fixup - for room_id in sync_room_id_set: - sync_room_id_set[room_id] = sync_room_id_set[room_id].copy_and_replace( - is_dm=room_id in dm_room_id_set - ) - - return sync_room_id_set + return dm_room_id_set @trace async def filter_rooms_relevant_for_sync( From c9a915648faa47bbc73d34f1e5a671d5424e7021 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 29 Aug 2024 14:59:42 +0100 Subject: [PATCH 03/17] Add DB functions --- synapse/handlers/sliding_sync/room_lists.py | 454 +++++++++++++++++- .../databases/main/events_bg_updates.py | 16 + synapse/storage/databases/main/roommember.py | 55 ++- synapse/storage/roommember.py | 12 + 4 files changed, 535 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py index aad68ce5a9..526b7d96f6 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py @@ -47,7 +47,7 @@ from synapse.storage.databases.main.state import ( Sentinel as StateSentinel, ) from synapse.storage.databases.main.stream import CurrentStateDeltaMembership -from synapse.storage.roommember import RoomsForUser +from synapse.storage.roommember import RoomsForUser, RoomsForUserSlidingSync from synapse.types import ( MutableStateMap, PersistedEventPosition, @@ -198,6 +198,275 @@ class SlidingSyncRoomLists: ) -> SlidingSyncInterestedRooms: """Fetch the set of rooms that match the request""" + if await self.store.have_finished_sliding_sync_background_jobs(): + return await self._compute_interested_rooms_new_tables( + sync_config=sync_config, + previous_connection_state=previous_connection_state, + to_token=to_token, + from_token=from_token, + ) + else: + return await self._compute_interested_rooms_fallback( + sync_config=sync_config, + previous_connection_state=previous_connection_state, + to_token=to_token, + from_token=from_token, + ) + + async def _compute_interested_rooms_new_tables( + self, + sync_config: SlidingSyncConfig, + previous_connection_state: "PerConnectionState", + to_token: StreamToken, + from_token: Optional[StreamToken], + ) -> SlidingSyncInterestedRooms: + """Implementation of `compute_interested_rooms` using new sliding sync db tables.""" + user_id = sync_config.user.to_string() + + # Assemble sliding window lists + lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {} + # Keep track of the rooms that we can display and need to fetch more info about + relevant_room_map: Dict[str, RoomSyncConfig] = {} + # The set of room IDs of all rooms that could appear in any list. These + # include rooms that are outside the list ranges. + all_rooms: Set[str] = set() + + room_membership_for_user_map = await self.store.get_sliding_sync_rooms_for_user( + user_id + ) + # TODO: Roll back memberships that are after the `to_token`. + + # TODO: Fill these out by getting membership changes in rooms. + newly_joined_room_ids: Set[str] = set() + newly_left_room_ids: Set[str] = set() + dm_room_ids: Set[str] = set() + + if sync_config.lists: + sync_room_map = room_membership_for_user_map + with start_active_span("assemble_sliding_window_lists"): + for list_key, list_config in sync_config.lists.items(): + # Apply filters + filtered_sync_room_map = sync_room_map + if list_config.filters is not None: + filtered_sync_room_map = await self.filter_rooms_using_tables( + user_id, + sync_room_map, + list_config.filters, + to_token, + ) + + # Find which rooms are partially stated and may need to be filtered out + # depending on the `required_state` requested (see below). + partial_state_room_map = ( + await self.store.is_partial_state_room_batched( + filtered_sync_room_map.keys() + ) + ) + + # Since creating the `RoomSyncConfig` takes some work, let's just do it + # once and make a copy whenever we need it. + room_sync_config = RoomSyncConfig.from_room_config(list_config) + + # Exclude partially-stated rooms if we must wait for the room to be + # fully-stated + if room_sync_config.must_await_full_state(self.is_mine_id): + filtered_sync_room_map = { + room_id: room + for room_id, room in filtered_sync_room_map.items() + if not partial_state_room_map.get(room_id) + } + + all_rooms.update(filtered_sync_room_map) + + # Sort the list + sorted_room_info = await self.sort_rooms_using_tables( + filtered_sync_room_map, to_token + ) + + ops: List[SlidingSyncResult.SlidingWindowList.Operation] = [] + if list_config.ranges: + for range in list_config.ranges: + room_ids_in_list: List[str] = [] + + # We're going to loop through the sorted list of rooms starting + # at the range start index and keep adding rooms until we fill + # up the range or run out of rooms. + # + # Both sides of range are inclusive so we `+ 1` + max_num_rooms = range[1] - range[0] + 1 + for room_membership in sorted_room_info[range[0] :]: + room_id = room_membership.room_id + + if len(room_ids_in_list) >= max_num_rooms: + break + + # Take the superset of the `RoomSyncConfig` for each room. + # + # Update our `relevant_room_map` with the room we're going + # to display and need to fetch more info about. + existing_room_sync_config = relevant_room_map.get( + room_id + ) + if existing_room_sync_config is not None: + existing_room_sync_config.combine_room_sync_config( + room_sync_config + ) + else: + # Make a copy so if we modify it later, it doesn't + # affect all references. + relevant_room_map[room_id] = ( + room_sync_config.deep_copy() + ) + + room_ids_in_list.append(room_id) + + ops.append( + SlidingSyncResult.SlidingWindowList.Operation( + op=OperationType.SYNC, + range=range, + room_ids=room_ids_in_list, + ) + ) + + lists[list_key] = SlidingSyncResult.SlidingWindowList( + count=len(sorted_room_info), + ops=ops, + ) + + if sync_config.room_subscriptions: + with start_active_span("assemble_room_subscriptions"): + # Find which rooms are partially stated and may need to be filtered out + # depending on the `required_state` requested (see below). + partial_state_room_map = await self.store.is_partial_state_room_batched( + sync_config.room_subscriptions.keys() + ) + + for ( + room_id, + room_subscription, + ) in sync_config.room_subscriptions.items(): + if room_id not in room_membership_for_user_map: + # TODO: Handle rooms the user isn't in. + continue + + all_rooms.add(room_id) + + # Take the superset of the `RoomSyncConfig` for each room. + room_sync_config = RoomSyncConfig.from_room_config( + room_subscription + ) + + # Exclude partially-stated rooms if we must wait for the room to be + # fully-stated + if room_sync_config.must_await_full_state(self.is_mine_id): + if partial_state_room_map.get(room_id): + continue + + all_rooms.add(room_id) + + # Update our `relevant_room_map` with the room we're going to display + # and need to fetch more info about. + existing_room_sync_config = relevant_room_map.get(room_id) + if existing_room_sync_config is not None: + existing_room_sync_config.combine_room_sync_config( + room_sync_config + ) + else: + relevant_room_map[room_id] = room_sync_config + + # Filtered subset of `relevant_room_map` for rooms that may have updates + # (in the event stream) + relevant_rooms_to_send_map: Dict[str, RoomSyncConfig] = relevant_room_map + if relevant_room_map: + with start_active_span("filter_relevant_rooms_to_send"): + if from_token: + rooms_should_send = set() + + # First we check if there are rooms that match a list/room + # subscription and have updates we need to send (i.e. either because + # we haven't sent the room down, or we have but there are missing + # updates). + for room_id, room_config in relevant_room_map.items(): + prev_room_sync_config = ( + previous_connection_state.room_configs.get(room_id) + ) + if prev_room_sync_config is not None: + # Always include rooms whose timeline limit has increased. + # (see the "XXX: Odd behavior" described below) + if ( + prev_room_sync_config.timeline_limit + < room_config.timeline_limit + ): + rooms_should_send.add(room_id) + continue + + status = previous_connection_state.rooms.have_sent_room(room_id) + if ( + # The room was never sent down before so the client needs to know + # about it regardless of any updates. + status.status == HaveSentRoomFlag.NEVER + # `PREVIOUSLY` literally means the "room was sent down before *AND* + # there are updates we haven't sent down" so we already know this + # room has updates. + or status.status == HaveSentRoomFlag.PREVIOUSLY + ): + rooms_should_send.add(room_id) + elif status.status == HaveSentRoomFlag.LIVE: + # We know that we've sent all updates up until `from_token`, + # so we just need to check if there have been updates since + # then. + pass + else: + assert_never(status.status) + + # We only need to check for new events since any state changes + # will also come down as new events. + rooms_that_have_updates = ( + self.store.get_rooms_that_might_have_updates( + relevant_room_map.keys(), from_token.room_key + ) + ) + rooms_should_send.update(rooms_that_have_updates) + relevant_rooms_to_send_map = { + room_id: room_sync_config + for room_id, room_sync_config in relevant_room_map.items() + if room_id in rooms_should_send + } + + return SlidingSyncInterestedRooms( + lists=lists, + relevant_room_map=relevant_room_map, + relevant_rooms_to_send_map=relevant_rooms_to_send_map, + all_rooms=all_rooms, + room_membership_for_user_map={ + # FIXME: Ideally we wouldn't have to do these copies and instead + # just return `newly_joined_room_ids` directly. + room_id: _RoomMembershipForUser( + room_id=room_id, + event_id=membership_info.event_id, + event_pos=PersistedEventPosition( + "master", # FIXME + membership_info.membership_event_stream_ordering, + ), + sender=membership_info.sender, + membership=membership_info.membership, + newly_joined=room_id in newly_joined_room_ids, + newly_left=room_id in newly_left_room_ids, + is_dm=room_id in dm_room_ids, + ) + for room_id, membership_info in room_membership_for_user_map.items() + }, + ) + + async def _compute_interested_rooms_fallback( + self, + sync_config: SlidingSyncConfig, + previous_connection_state: "PerConnectionState", + to_token: StreamToken, + from_token: Optional[StreamToken], + ) -> SlidingSyncInterestedRooms: + """Fallback code when the database background updates haven't completed yet.""" + room_membership_for_user_map = ( await self.get_room_membership_for_user_at_to_token( sync_config.user, to_token, from_token @@ -1370,6 +1639,189 @@ class SlidingSyncRoomLists: # Assemble a new sync room map but only with the `filtered_room_id_set` return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set} + @trace + async def filter_rooms_using_tables( + self, + user_id: str, + sync_room_map: Mapping[str, RoomsForUserSlidingSync], + filters: SlidingSyncConfig.SlidingSyncList.Filters, + to_token: StreamToken, + ) -> Dict[str, RoomsForUserSlidingSync]: + """ + Filter rooms based on the sync request. + + Args: + user: User to filter rooms for + sync_room_map: Dictionary of room IDs to sort along with membership + information in the room at the time of `to_token`. + filters: Filters to apply + to_token: We filter based on the state of the room at this token + + Returns: + A filtered dictionary of room IDs along with membership information in the + room at the time of `to_token`. + """ + + filtered_room_id_set = set(sync_room_map.keys()) + + # Filter for Direct-Message (DM) rooms + if filters.is_dm is not None: + with start_active_span("filters.is_dm"): + dm_map = await self.store.get_global_account_data_by_type_for_user( + user_id, AccountDataTypes.DIRECT + ) + + # Flatten out the map. Account data is set by the client so it needs to be + # scrutinized. + dm_room_id_set = set() + if isinstance(dm_map, dict): + for room_ids in dm_map.values(): + # Account data should be a list of room IDs. Ignore anything else + if isinstance(room_ids, list): + for room_id in room_ids: + if isinstance(room_id, str): + dm_room_id_set.add(room_id) + + if filters.is_dm: + # Intersect with the DM room set + filtered_room_id_set &= dm_room_id_set + else: + # Remove DMs + filtered_room_id_set -= dm_room_id_set + + if filters.spaces is not None: + with start_active_span("filters.spaces"): + raise NotImplementedError() + + # Filter for encrypted rooms + if filters.is_encrypted is not None: + filtered_room_id_set = { + room_id + for room_id in filtered_room_id_set + if sync_room_map[room_id].is_encrypted == filters.is_encrypted + } + + # Filter for rooms that the user has been invited to + if filters.is_invite is not None: + with start_active_span("filters.is_invite"): + # Make a copy so we don't run into an error: `Set changed size during + # iteration`, when we filter out and remove items + for room_id in filtered_room_id_set.copy(): + room_for_user = sync_room_map[room_id] + # If we're looking for invite rooms, filter out rooms that the user is + # not invited to and vice versa + if ( + filters.is_invite + and room_for_user.membership != Membership.INVITE + ) or ( + not filters.is_invite + and room_for_user.membership == Membership.INVITE + ): + filtered_room_id_set.remove(room_id) + + # Filter by room type (space vs room, etc). A room must match one of the types + # provided in the list. `None` is a valid type for rooms which do not have a + # room type. + if filters.room_types is not None or filters.not_room_types is not None: + with start_active_span("filters.room_types"): + # Make a copy so we don't run into an error: `Set changed size during + # iteration`, when we filter out and remove items + for room_id in filtered_room_id_set.copy(): + room_type = sync_room_map[room_id].room_type + + if ( + filters.room_types is not None + and room_type not in filters.room_types + ): + filtered_room_id_set.remove(room_id) + + if ( + filters.not_room_types is not None + and room_type in filters.not_room_types + ): + filtered_room_id_set.remove(room_id) + + if filters.room_name_like is not None: + with start_active_span("filters.room_name_like"): + # TODO: The room name is a bit more sensitive to leak than the + # create/encryption event. Maybe we should consider a better way to fetch + # historical state before implementing this. + # + # room_id_to_create_content = await self._bulk_get_partial_current_state_content_for_rooms( + # content_type="room_name", + # room_ids=filtered_room_id_set, + # to_token=to_token, + # sync_room_map=sync_room_map, + # room_id_to_stripped_state_map=room_id_to_stripped_state_map, + # ) + raise NotImplementedError() + + if filters.tags is not None or filters.not_tags is not None: + with start_active_span("filters.tags"): + raise NotImplementedError() + + # Assemble a new sync room map but only with the `filtered_room_id_set` + return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set} + + @trace + async def sort_rooms_using_tables( + self, + sync_room_map: Mapping[str, RoomsForUserSlidingSync], + to_token: StreamToken, + ) -> List[RoomsForUserSlidingSync]: + """ + Sort by `stream_ordering` of the last event that the user should see in the + room. `stream_ordering` is unique so we get a stable sort. + + Args: + sync_room_map: Dictionary of room IDs to sort along with membership + information in the room at the time of `to_token`. + to_token: We sort based on the events in the room at this token (<= `to_token`) + + Returns: + A sorted list of room IDs by `stream_ordering` along with membership information. + """ + + # Assemble a map of room ID to the `stream_ordering` of the last activity that the + # user should see in the room (<= `to_token`) + last_activity_in_room_map: Dict[str, int] = {} + + for room_id, room_for_user in sync_room_map.items(): + if room_for_user.membership != Membership.JOIN: + # If the user has left/been invited/knocked/been banned from a + # room, they shouldn't see anything past that point. + # + # FIXME: It's possible that people should see beyond this point + # in invited/knocked cases if for example the room has + # `invite`/`world_readable` history visibility, see + # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932 + last_activity_in_room_map[room_id] = ( + room_for_user.membership_event_stream_ordering + ) + + # For fully-joined rooms, we find the latest activity at/before the + # `to_token`. + joined_room_positions = ( + await self.store.bulk_get_last_event_pos_in_room_before_stream_ordering( + [ + room_id + for room_id, room_for_user in sync_room_map.items() + if room_for_user.membership == Membership.JOIN + ], + to_token.room_key, + ) + ) + + last_activity_in_room_map.update(joined_room_positions) + + return sorted( + sync_room_map.values(), + # Sort by the last activity (stream_ordering) in the room + key=lambda room_info: last_activity_in_room_map[room_info.room_id], + # We want descending order + reverse=True, + ) + @trace async def sort_rooms( self, diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 88ff5aa2df..e97b177dd6 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -2272,6 +2272,22 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS return len(memberships_to_update_rows) + async def have_finished_sliding_sync_background_jobs(self) -> bool: + """Return if its safe to use the sliding sync membership tables.""" + + # TODO: Find a more efficient way of checking this. A cache? + return ( + await self.db_pool.updates.has_completed_background_update( + _BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE + ) + or await self.db_pool.updates.has_completed_background_update( + _BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE + ) + or await self.db_pool.updates.has_completed_background_update( + _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE + ) + ) + def _resolve_stale_data_in_sliding_sync_tables( txn: LoggingTransaction, diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 722686d4b8..91aca58271 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -53,7 +53,12 @@ from synapse.storage.database import ( from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.engines import Sqlite3Engine -from synapse.storage.roommember import MemberSummary, ProfileInfo, RoomsForUser +from synapse.storage.roommember import ( + MemberSummary, + ProfileInfo, + RoomsForUser, + RoomsForUserSlidingSync, +) from synapse.types import ( JsonDict, PersistedEventPosition, @@ -1377,6 +1382,54 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): desc="room_forgetter_stream_pos", ) + # @cached(iterable=True, max_entries=10000) + async def get_sliding_sync_rooms_for_user( + self, + user_id: str, + ) -> Mapping[str, RoomsForUserSlidingSync]: + """Get all the rooms for a user to handle a sliding sync request. + + Ignores forgotten rooms and rooms that the user has been kicked from. + + Returns: + Map from room ID to membership info + """ + + def get_sliding_sync_rooms_for_user_txn( + txn: LoggingTransaction, + ) -> Dict[str, RoomsForUserSlidingSync]: + sql = """ + SELECT m.room_id, m.sender, m.membership, m.membership_event_id, + m.event_stream_ordering, + COALESCE(j.room_type, m.room_type), + COALESCE(j.is_encrypted, m.is_encrypted), + COALESCE(j.tombstone_successor_room_id, m.tombstone_successor_room_id) + FROM sliding_sync_membership_snapshots AS m + LEFT JOIN sliding_sync_joined_rooms AS j USING (room_id) + WHERE user_id = ? + AND m.forgotten = 0 + AND (membership != 'leave' OR m.sender != m.user_id) + """ + txn.execute(sql, (user_id,)) + return { + row[0]: RoomsForUserSlidingSync( + room_id=row[0], + sender=row[1], + membership=row[2], + event_id=row[3], + membership_event_stream_ordering=row[4], + room_type=row[5], + is_encrypted=row[6], + tombstone_successor_room_id=row[7], + ) + for row in txn + } + + return await self.db_pool.runInteraction( + "get_sliding_sync_rooms_for_user", + get_sliding_sync_rooms_for_user_txn, + ) + class RoomMemberBackgroundUpdateStore(SQLBaseStore): def __init__( diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 80c9630867..083ac54116 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -39,6 +39,18 @@ class RoomsForUser: room_version_id: str +@attr.s(slots=True, frozen=True, weakref_slot=False, auto_attribs=True) +class RoomsForUserSlidingSync: + room_id: str + sender: str + membership: str + event_id: str + membership_event_stream_ordering: int + room_type: Optional[str] + is_encrypted: bool + tombstone_successor_room_id: Optional[str] + + @attr.s(slots=True, frozen=True, weakref_slot=False, auto_attribs=True) class GetRoomsForUserWithStreamOrdering: room_id: str From e2c0a4b205cc59d4c92d18f51dc3f35830425000 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 29 Aug 2024 15:44:31 +0100 Subject: [PATCH 04/17] Use new tables --- synapse/handlers/sliding_sync/room_lists.py | 69 ++++++++++++++++---- synapse/storage/databases/main/roommember.py | 14 ++-- synapse/storage/roommember.py | 7 +- 3 files changed, 64 insertions(+), 26 deletions(-) diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py index 526b7d96f6..f54c4d05f9 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py @@ -234,15 +234,56 @@ class SlidingSyncRoomLists: room_membership_for_user_map = await self.store.get_sliding_sync_rooms_for_user( user_id ) - # TODO: Roll back memberships that are after the `to_token`. - # TODO: Fill these out by getting membership changes in rooms. - newly_joined_room_ids: Set[str] = set() - newly_left_room_ids: Set[str] = set() - dm_room_ids: Set[str] = set() + changes = await self._get_rewind_changes_to_current_membership_to_token( + sync_config.user, room_membership_for_user_map, to_token=to_token + ) + if changes: + room_membership_for_user_map = dict(room_membership_for_user_map) + for room_id, change in changes.items(): + if change is None: + room_membership_for_user_map.pop(room_id) + continue + + existing_room = room_membership_for_user_map.get(room_id) + if existing_room is not None: + room_membership_for_user_map[room_id] = RoomsForUserSlidingSync( + room_id=room_id, + sender=change.sender, + membership=change.membership, + event_id=change.event_id, + event_pos=change.event_pos, + room_version_id=change.room_version_id, + # We keep the current state of the room though + room_type=existing_room.room_type, + is_encrypted=existing_room.is_encrypted, + tombstone_successor_room_id=existing_room.tombstone_successor_room_id, + ) + else: + # This is the state reset case. + raise NotImplementedError() + + newly_joined_room_ids, newly_left_room_map = ( + await self._get_newly_joined_and_left_rooms( + user_id, from_token=from_token, to_token=to_token + ) + ) + + dm_room_ids = await self._get_dm_rooms_for_user(user_id) if sync_config.lists: - sync_room_map = room_membership_for_user_map + sync_room_map = { + room_id: room_membership_for_user + for room_id, room_membership_for_user in room_membership_for_user_map.items() + if room_membership_for_user.membership != Membership.LEAVE + # Unless... + or room_id in newly_left_room_map + # Allow kicks + or ( + room_membership_for_user.membership == Membership.LEAVE + and room_membership_for_user.sender not in (user_id, None) + ) + } with start_active_span("assemble_sliding_window_lists"): for list_key, list_config in sync_config.lists.items(): # Apply filters @@ -433,6 +474,11 @@ class SlidingSyncRoomLists: if room_id in rooms_should_send } + # TODO: Handle state reset rooms with newly_left_room_map + + if newly_left_room_map.keys() - room_membership_for_user_map.keys(): + raise NotImplementedError() + return SlidingSyncInterestedRooms( lists=lists, relevant_room_map=relevant_room_map, @@ -444,14 +490,11 @@ class SlidingSyncRoomLists: room_id: _RoomMembershipForUser( room_id=room_id, event_id=membership_info.event_id, - event_pos=PersistedEventPosition( - "master", # FIXME - membership_info.membership_event_stream_ordering, - ), + event_pos=membership_info.event_pos, sender=membership_info.sender, membership=membership_info.membership, newly_joined=room_id in newly_joined_room_ids, - newly_left=room_id in newly_left_room_ids, + newly_left=room_id in newly_left_room_map, is_dm=room_id in dm_room_ids, ) for room_id, membership_info in room_membership_for_user_map.items() @@ -1795,9 +1838,7 @@ class SlidingSyncRoomLists: # in invited/knocked cases if for example the room has # `invite`/`world_readable` history visibility, see # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932 - last_activity_in_room_map[room_id] = ( - room_for_user.membership_event_stream_ordering - ) + last_activity_in_room_map[room_id] = room_for_user.event_pos.stream # For fully-joined rooms, we find the latest activity at/before the # `to_token`. diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 91aca58271..2c027b1aa6 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -1400,15 +1400,16 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): ) -> Dict[str, RoomsForUserSlidingSync]: sql = """ SELECT m.room_id, m.sender, m.membership, m.membership_event_id, - m.event_stream_ordering, + r.room_version, + m.event_instance_name, m.event_stream_ordering, COALESCE(j.room_type, m.room_type), COALESCE(j.is_encrypted, m.is_encrypted), COALESCE(j.tombstone_successor_room_id, m.tombstone_successor_room_id) FROM sliding_sync_membership_snapshots AS m + INNER JOIN rooms AS r USING (room_id) LEFT JOIN sliding_sync_joined_rooms AS j USING (room_id) WHERE user_id = ? AND m.forgotten = 0 - AND (membership != 'leave' OR m.sender != m.user_id) """ txn.execute(sql, (user_id,)) return { @@ -1417,10 +1418,11 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): sender=row[1], membership=row[2], event_id=row[3], - membership_event_stream_ordering=row[4], - room_type=row[5], - is_encrypted=row[6], - tombstone_successor_room_id=row[7], + room_version_id=row[4], + event_pos=PersistedEventPosition(row[5], row[6]), + room_type=row[7], + is_encrypted=row[8], + tombstone_successor_room_id=row[9], ) for row in txn } diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 083ac54116..af61d11f1d 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -40,12 +40,7 @@ class RoomsForUser: @attr.s(slots=True, frozen=True, weakref_slot=False, auto_attribs=True) -class RoomsForUserSlidingSync: - room_id: str - sender: str - membership: str - event_id: str - membership_event_stream_ordering: int +class RoomsForUserSlidingSync(RoomsForUser): room_type: Optional[str] is_encrypted: bool tombstone_successor_room_id: Optional[str] From 86a0730f73d472830695db1545bcb61b27686592 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 29 Aug 2024 15:47:43 +0100 Subject: [PATCH 05/17] Add trace --- synapse/handlers/sliding_sync/room_lists.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py index f54c4d05f9..35c02b7958 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py @@ -213,6 +213,7 @@ class SlidingSyncRoomLists: from_token=from_token, ) + @trace async def _compute_interested_rooms_new_tables( self, sync_config: SlidingSyncConfig, From c038ff9e24b5724b0aff16ac22f188be4b879fab Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 29 Aug 2024 16:14:50 +0100 Subject: [PATCH 06/17] Proper join --- synapse/storage/databases/main/roommember.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 2c027b1aa6..a124c3d60f 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -1407,7 +1407,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): COALESCE(j.tombstone_successor_room_id, m.tombstone_successor_room_id) FROM sliding_sync_membership_snapshots AS m INNER JOIN rooms AS r USING (room_id) - LEFT JOIN sliding_sync_joined_rooms AS j USING (room_id) + LEFT JOIN sliding_sync_joined_rooms AS j ON (j.room_id = m.room_id AND m.membership = 'join') WHERE user_id = ? AND m.forgotten = 0 """ From a02739766e8a94fa8c71aea34e8d8d5a1fa4e9b7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 29 Aug 2024 17:23:36 +0100 Subject: [PATCH 07/17] Newsfile --- changelog.d/17630.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/17630.misc diff --git a/changelog.d/17630.misc b/changelog.d/17630.misc new file mode 100644 index 0000000000..ed1bf6bd55 --- /dev/null +++ b/changelog.d/17630.misc @@ -0,0 +1 @@ +Use new database tables for sliding sync. From 676754d7a7668f7eac421d25f70d3e8abdbea52d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 29 Aug 2024 18:23:15 +0100 Subject: [PATCH 08/17] WIP --- synapse/handlers/sliding_sync/room_lists.py | 36 +++++++++++++-------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py index 35c02b7958..213ee29011 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py @@ -261,8 +261,20 @@ class SlidingSyncRoomLists: tombstone_successor_room_id=existing_room.tombstone_successor_room_id, ) else: - # This is the state reset case. - raise NotImplementedError() + # This can happen if we get "state reset" out of the room + # after the `to_token`. + room_membership_for_user_map[room_id] = RoomsForUserSlidingSync( + room_id=room_id, + sender=change.sender, + membership=change.membership, + event_id=change.event_id, + event_pos=change.event_pos, + room_version_id=change.room_version_id, + # We keep the current state of the room though + room_type=existing_room.room_type, + is_encrypted=existing_room.is_encrypted, + tombstone_successor_room_id=existing_room.tombstone_successor_room_id, + ) newly_joined_room_ids, newly_left_room_map = ( await self._get_newly_joined_and_left_rooms( @@ -350,15 +362,11 @@ class SlidingSyncRoomLists: room_id ) if existing_room_sync_config is not None: - existing_room_sync_config.combine_room_sync_config( + room_sync_config = existing_room_sync_config.combine_room_sync_config( room_sync_config ) - else: - # Make a copy so if we modify it later, it doesn't - # affect all references. - relevant_room_map[room_id] = ( - room_sync_config.deep_copy() - ) + + relevant_room_map[room_id] = room_sync_config room_ids_in_list.append(room_id) @@ -410,11 +418,13 @@ class SlidingSyncRoomLists: # and need to fetch more info about. existing_room_sync_config = relevant_room_map.get(room_id) if existing_room_sync_config is not None: - existing_room_sync_config.combine_room_sync_config( - room_sync_config + room_sync_config = ( + existing_room_sync_config.combine_room_sync_config( + room_sync_config + ) ) - else: - relevant_room_map[room_id] = room_sync_config + + relevant_room_map[room_id] = room_sync_config # Filtered subset of `relevant_room_map` for rooms that may have updates # (in the event stream) From bc4cb1fc41f844e87389b6ebbe8497bbf974bf2d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 29 Aug 2024 19:13:16 +0100 Subject: [PATCH 09/17] Handle state resets in rooms --- synapse/handlers/sliding_sync/room_lists.py | 41 ++++++++++++++------ synapse/storage/databases/main/roommember.py | 4 +- synapse/storage/roommember.py | 10 ++++- 3 files changed, 38 insertions(+), 17 deletions(-) diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py index 213ee29011..c5d86c1d76 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py @@ -258,11 +258,13 @@ class SlidingSyncRoomLists: # We keep the current state of the room though room_type=existing_room.room_type, is_encrypted=existing_room.is_encrypted, - tombstone_successor_room_id=existing_room.tombstone_successor_room_id, ) else: # This can happen if we get "state reset" out of the room # after the `to_token`. + room_type = await self.store.get_room_type(room_id) + encryption = await self.store.get_room_encryption(room_id) + room_membership_for_user_map[room_id] = RoomsForUserSlidingSync( room_id=room_id, sender=change.sender, @@ -270,10 +272,8 @@ class SlidingSyncRoomLists: event_id=change.event_id, event_pos=change.event_pos, room_version_id=change.room_version_id, - # We keep the current state of the room though - room_type=existing_room.room_type, - is_encrypted=existing_room.is_encrypted, - tombstone_successor_room_id=existing_room.tombstone_successor_room_id, + room_type=room_type, + is_encrypted=encryption is not None, ) newly_joined_room_ids, newly_left_room_map = ( @@ -281,9 +281,31 @@ class SlidingSyncRoomLists: user_id, from_token=from_token, to_token=to_token ) ) - dm_room_ids = await self._get_dm_rooms_for_user(user_id) + # Handle state resets in the from -> to token range. + state_reset_rooms = ( + newly_left_room_map.keys() - room_membership_for_user_map.keys() + ) + if state_reset_rooms: + room_membership_for_user_map = dict(room_membership_for_user_map) + for room_id in ( + newly_left_room_map.keys() - room_membership_for_user_map.keys() + ): + room_type = await self.store.get_room_type(room_id) + encryption = await self.store.get_room_encryption(room_id) + + room_membership_for_user_map[room_id] = RoomsForUserSlidingSync( + room_id=room_id, + sender=None, + membership=Membership.LEAVE, + event_id=None, + event_pos=newly_left_room_map[room_id], + room_version_id=await self.store.get_room_version_id(room_id), + room_type=room_type, + is_encrypted=encryption is not None, + ) + if sync_config.lists: sync_room_map = { room_id: room_membership_for_user @@ -485,11 +507,6 @@ class SlidingSyncRoomLists: if room_id in rooms_should_send } - # TODO: Handle state reset rooms with newly_left_room_map - - if newly_left_room_map.keys() - room_membership_for_user_map.keys(): - raise NotImplementedError() - return SlidingSyncInterestedRooms( lists=lists, relevant_room_map=relevant_room_map, @@ -753,7 +770,7 @@ class SlidingSyncRoomLists: async def _get_rewind_changes_to_current_membership_to_token( self, user: UserID, - rooms_for_user: Mapping[str, RoomsForUser], + rooms_for_user: Mapping[str, Union[RoomsForUser, RoomsForUserSlidingSync]], to_token: StreamToken, ) -> Mapping[str, Optional[RoomsForUser]]: """ diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index a124c3d60f..eb6d3ca617 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -1403,8 +1403,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): r.room_version, m.event_instance_name, m.event_stream_ordering, COALESCE(j.room_type, m.room_type), - COALESCE(j.is_encrypted, m.is_encrypted), - COALESCE(j.tombstone_successor_room_id, m.tombstone_successor_room_id) + COALESCE(j.is_encrypted, m.is_encrypted) FROM sliding_sync_membership_snapshots AS m INNER JOIN rooms AS r USING (room_id) LEFT JOIN sliding_sync_joined_rooms AS j ON (j.room_id = m.room_id AND m.membership = 'join') @@ -1422,7 +1421,6 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): event_pos=PersistedEventPosition(row[5], row[6]), room_type=row[7], is_encrypted=row[8], - tombstone_successor_room_id=row[9], ) for row in txn } diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index af61d11f1d..09213627ec 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -40,10 +40,16 @@ class RoomsForUser: @attr.s(slots=True, frozen=True, weakref_slot=False, auto_attribs=True) -class RoomsForUserSlidingSync(RoomsForUser): +class RoomsForUserSlidingSync: + room_id: str + sender: Optional[str] + membership: str + event_id: Optional[str] + event_pos: PersistedEventPosition + room_version_id: str + room_type: Optional[str] is_encrypted: bool - tombstone_successor_room_id: Optional[str] @attr.s(slots=True, frozen=True, weakref_slot=False, auto_attribs=True) From 2980422e9b2c1f59ec2cd0bb58f7ddd1e4dc8a76 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 30 Aug 2024 13:14:54 +0100 Subject: [PATCH 10/17] Apply suggestions from code review Co-authored-by: Eric Eastwood --- synapse/handlers/sliding_sync/room_lists.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py index c5d86c1d76..c6435542b5 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py @@ -206,6 +206,10 @@ class SlidingSyncRoomLists: from_token=from_token, ) else: + # FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the + # foreground update for + # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by + # https://github.com/element-hq/synapse/issues/17623) return await self._compute_interested_rooms_fallback( sync_config=sync_config, previous_connection_state=previous_connection_state, @@ -243,11 +247,13 @@ class SlidingSyncRoomLists: room_membership_for_user_map = dict(room_membership_for_user_map) for room_id, change in changes.items(): if change is None: + # Remove rooms that the user joined after the `to_token` room_membership_for_user_map.pop(room_id) continue existing_room = room_membership_for_user_map.get(room_id) if existing_room is not None: + # Update room membership events to the point in time of the `to_token` room_membership_for_user_map[room_id] = RoomsForUserSlidingSync( room_id=room_id, sender=change.sender, @@ -261,10 +267,13 @@ class SlidingSyncRoomLists: ) else: # This can happen if we get "state reset" out of the room - # after the `to_token`. + # after the `to_token`. In other words, there is no membership + # for the room after the `to_token` but we see membership in + # the token range. room_type = await self.store.get_room_type(room_id) encryption = await self.store.get_room_encryption(room_id) + # Add back rooms that the user was state-reset out of after `to_token` room_membership_for_user_map[room_id] = RoomsForUserSlidingSync( room_id=room_id, sender=change.sender, From 6c4ad323a9d213acecbd5c558e154da2d3a827fb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 30 Aug 2024 13:31:06 +0100 Subject: [PATCH 11/17] Faster have_finished_sliding_sync_background_jobs --- synapse/storage/background_updates.py | 21 ++++++++++++++++++- .../databases/main/events_bg_updates.py | 15 +++++-------- 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index f473294070..efe4238036 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -44,7 +44,7 @@ from synapse._pydantic_compat import HAS_PYDANTIC_V2 from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.engines import PostgresEngine from synapse.storage.types import Connection, Cursor -from synapse.types import JsonDict +from synapse.types import JsonDict, StrCollection from synapse.util import Clock, json_encoder from . import engines @@ -487,6 +487,25 @@ class BackgroundUpdater: return not update_exists + async def have_completed_background_updates( + self, update_names: StrCollection + ) -> bool: + """Return the name of background updates that have not yet been + completed""" + if self._all_done: + return True + + rows = await self.db_pool.simple_select_many_batch( + table="background_updates", + column="update_name", + iterable=update_names, + retcols=("update_name",), + desc="get_uncompleted_background_updates", + ) + + # If we find any rows then we've not completed the update. + return not bool(rows) + async def do_next_background_update(self, sleep: bool = True) -> bool: """Does some amount of work on the next queued background update diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index e97b177dd6..c6c8eeaa02 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -2275,16 +2275,11 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS async def have_finished_sliding_sync_background_jobs(self) -> bool: """Return if its safe to use the sliding sync membership tables.""" - # TODO: Find a more efficient way of checking this. A cache? - return ( - await self.db_pool.updates.has_completed_background_update( - _BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE - ) - or await self.db_pool.updates.has_completed_background_update( - _BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE - ) - or await self.db_pool.updates.has_completed_background_update( - _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE + return await self.db_pool.updates.have_completed_background_updates( + ( + _BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE, + _BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE, + _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE, ) ) From 5d6386a3c9b74a03dc3b59f65a92c830ebc34d77 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 30 Aug 2024 13:36:20 +0100 Subject: [PATCH 12/17] Use dm_room_ids --- synapse/handlers/sliding_sync/room_lists.py | 25 ++++++--------------- 1 file changed, 7 insertions(+), 18 deletions(-) diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py index c6435542b5..3bdd98fb13 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py @@ -18,6 +18,7 @@ import logging from itertools import chain from typing import ( TYPE_CHECKING, + AbstractSet, Any, Dict, List, @@ -338,6 +339,7 @@ class SlidingSyncRoomLists: sync_room_map, list_config.filters, to_token, + dm_room_ids, ) # Find which rooms are partially stated and may need to be filtered out @@ -1161,7 +1163,7 @@ class SlidingSyncRoomLists: async def _get_dm_rooms_for_user( self, user_id: str, - ) -> StrCollection: + ) -> AbstractSet[str]: """Get the set of DM rooms for the user.""" # We're using global account data (`m.direct`) instead of checking for @@ -1726,6 +1728,7 @@ class SlidingSyncRoomLists: sync_room_map: Mapping[str, RoomsForUserSlidingSync], filters: SlidingSyncConfig.SlidingSyncList.Filters, to_token: StreamToken, + dm_room_ids: AbstractSet[str], ) -> Dict[str, RoomsForUserSlidingSync]: """ Filter rooms based on the sync request. @@ -1736,6 +1739,7 @@ class SlidingSyncRoomLists: information in the room at the time of `to_token`. filters: Filters to apply to_token: We filter based on the state of the room at this token + dm_room_ids: Set of room IDs which are DMs Returns: A filtered dictionary of room IDs along with membership information in the @@ -1747,27 +1751,12 @@ class SlidingSyncRoomLists: # Filter for Direct-Message (DM) rooms if filters.is_dm is not None: with start_active_span("filters.is_dm"): - dm_map = await self.store.get_global_account_data_by_type_for_user( - user_id, AccountDataTypes.DIRECT - ) - - # Flatten out the map. Account data is set by the client so it needs to be - # scrutinized. - dm_room_id_set = set() - if isinstance(dm_map, dict): - for room_ids in dm_map.values(): - # Account data should be a list of room IDs. Ignore anything else - if isinstance(room_ids, list): - for room_id in room_ids: - if isinstance(room_id, str): - dm_room_id_set.add(room_id) - if filters.is_dm: # Intersect with the DM room set - filtered_room_id_set &= dm_room_id_set + filtered_room_id_set &= dm_room_ids else: # Remove DMs - filtered_room_id_set -= dm_room_id_set + filtered_room_id_set -= dm_room_ids if filters.spaces is not None: with start_active_span("filters.spaces"): From acb57ee42e43842f41f41c6689cc5a692da13bfe Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 30 Aug 2024 13:44:52 +0100 Subject: [PATCH 13/17] Use filter_membership_for_sync --- synapse/handlers/sliding_sync/room_lists.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py index 3bdd98fb13..66bf6231e6 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py @@ -144,7 +144,10 @@ class _RoomMembershipForUser: def filter_membership_for_sync( - *, user_id: str, room_membership_for_user: _RoomMembershipForUser + *, + user_id: str, + room_membership_for_user: Union[_RoomMembershipForUser, RoomsForUserSlidingSync], + newly_left: bool, ) -> bool: """ Returns True if the membership event should be included in the sync response, @@ -157,7 +160,6 @@ def filter_membership_for_sync( membership = room_membership_for_user.membership sender = room_membership_for_user.sender - newly_left = room_membership_for_user.newly_left # We want to allow everything except rooms the user has left unless `newly_left` # because we want everything that's *still* relevant to the user. We include @@ -320,13 +322,10 @@ class SlidingSyncRoomLists: sync_room_map = { room_id: room_membership_for_user for room_id, room_membership_for_user in room_membership_for_user_map.items() - if room_membership_for_user.membership != Membership.LEAVE - # Unless... - or room_id in newly_left_room_map - # Allow kicks - or ( - room_membership_for_user.membership == Membership.LEAVE - and room_membership_for_user.sender not in (user_id, None) + if filter_membership_for_sync( + user_id=user_id, + room_membership_for_user=room_membership_for_user, + newly_left=room_id in newly_left_room_map, ) } with start_active_span("assemble_sliding_window_lists"): @@ -1229,6 +1228,7 @@ class SlidingSyncRoomLists: if filter_membership_for_sync( user_id=user_id, room_membership_for_user=room_membership_for_user, + newly_left=room_membership_for_user.newly_left, ) } From 82f58bf7b709e6ba42c19f8bb0039fadde2ca8ad Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 30 Aug 2024 13:58:36 +0100 Subject: [PATCH 14/17] Factor out _filter_relevant_room_to_send --- synapse/handlers/sliding_sync/room_lists.py | 90 +++++++-------------- 1 file changed, 27 insertions(+), 63 deletions(-) diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py index 66bf6231e6..8d11970fd8 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py @@ -460,62 +460,9 @@ class SlidingSyncRoomLists: # Filtered subset of `relevant_room_map` for rooms that may have updates # (in the event stream) - relevant_rooms_to_send_map: Dict[str, RoomSyncConfig] = relevant_room_map - if relevant_room_map: - with start_active_span("filter_relevant_rooms_to_send"): - if from_token: - rooms_should_send = set() - - # First we check if there are rooms that match a list/room - # subscription and have updates we need to send (i.e. either because - # we haven't sent the room down, or we have but there are missing - # updates). - for room_id, room_config in relevant_room_map.items(): - prev_room_sync_config = ( - previous_connection_state.room_configs.get(room_id) - ) - if prev_room_sync_config is not None: - # Always include rooms whose timeline limit has increased. - # (see the "XXX: Odd behavior" described below) - if ( - prev_room_sync_config.timeline_limit - < room_config.timeline_limit - ): - rooms_should_send.add(room_id) - continue - - status = previous_connection_state.rooms.have_sent_room(room_id) - if ( - # The room was never sent down before so the client needs to know - # about it regardless of any updates. - status.status == HaveSentRoomFlag.NEVER - # `PREVIOUSLY` literally means the "room was sent down before *AND* - # there are updates we haven't sent down" so we already know this - # room has updates. - or status.status == HaveSentRoomFlag.PREVIOUSLY - ): - rooms_should_send.add(room_id) - elif status.status == HaveSentRoomFlag.LIVE: - # We know that we've sent all updates up until `from_token`, - # so we just need to check if there have been updates since - # then. - pass - else: - assert_never(status.status) - - # We only need to check for new events since any state changes - # will also come down as new events. - rooms_that_have_updates = ( - self.store.get_rooms_that_might_have_updates( - relevant_room_map.keys(), from_token.room_key - ) - ) - rooms_should_send.update(rooms_that_have_updates) - relevant_rooms_to_send_map = { - room_id: room_sync_config - for room_id, room_sync_config in relevant_room_map.items() - if room_id in rooms_should_send - } + relevant_rooms_to_send_map = await self._filter_relevant_room_to_send( + previous_connection_state, from_token, relevant_room_map + ) return SlidingSyncInterestedRooms( lists=lists, @@ -709,6 +656,29 @@ class SlidingSyncRoomLists: relevant_room_map[room_id] = room_sync_config + # Filtered subset of `relevant_room_map` for rooms that may have updates + # (in the event stream) + relevant_rooms_to_send_map = await self._filter_relevant_room_to_send( + previous_connection_state, from_token, relevant_room_map + ) + + return SlidingSyncInterestedRooms( + lists=lists, + relevant_room_map=relevant_room_map, + relevant_rooms_to_send_map=relevant_rooms_to_send_map, + all_rooms=all_rooms, + room_membership_for_user_map=room_membership_for_user_map, + ) + + async def _filter_relevant_room_to_send( + self, + previous_connection_state: PerConnectionState, + from_token: Optional[StreamToken], + relevant_room_map: Dict[str, RoomSyncConfig], + ) -> Dict[str, RoomSyncConfig]: + """Filters the `relevant_room_map` down to those rooms that may have + updates we need to fetch and return.""" + # Filtered subset of `relevant_room_map` for rooms that may have updates # (in the event stream) relevant_rooms_to_send_map: Dict[str, RoomSyncConfig] = relevant_room_map @@ -768,13 +738,7 @@ class SlidingSyncRoomLists: if room_id in rooms_should_send } - return SlidingSyncInterestedRooms( - lists=lists, - relevant_room_map=relevant_room_map, - relevant_rooms_to_send_map=relevant_rooms_to_send_map, - all_rooms=all_rooms, - room_membership_for_user_map=room_membership_for_user_map, - ) + return relevant_rooms_to_send_map @trace async def _get_rewind_changes_to_current_membership_to_token( From e76954b9cece47024e61c8376ce183670846b460 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 30 Aug 2024 14:46:29 +0100 Subject: [PATCH 15/17] Parameterize tests --- .../sliding_sync/test_connection_tracking.py | 16 ++++++++++++- .../test_extension_account_data.py | 16 +++++++++++++ .../sliding_sync/test_extension_e2ee.py | 16 +++++++++++++ .../sliding_sync/test_extension_receipts.py | 16 +++++++++++++ .../sliding_sync/test_extension_to_device.py | 15 ++++++++++++ .../sliding_sync/test_extension_typing.py | 16 +++++++++++++ .../client/sliding_sync/test_extensions.py | 16 ++++++++++++- .../sliding_sync/test_room_subscriptions.py | 16 +++++++++++++ .../client/sliding_sync/test_rooms_invites.py | 16 +++++++++++++ .../client/sliding_sync/test_rooms_meta.py | 16 +++++++++++++ .../sliding_sync/test_rooms_required_state.py | 16 ++++++++++++- .../sliding_sync/test_rooms_timeline.py | 16 +++++++++++++ .../client/sliding_sync/test_sliding_sync.py | 24 +++++++++++++++++++ 13 files changed, 212 insertions(+), 3 deletions(-) diff --git a/tests/rest/client/sliding_sync/test_connection_tracking.py b/tests/rest/client/sliding_sync/test_connection_tracking.py index 6863c32f7c..436bd4466c 100644 --- a/tests/rest/client/sliding_sync/test_connection_tracking.py +++ b/tests/rest/client/sliding_sync/test_connection_tracking.py @@ -13,7 +13,7 @@ # import logging -from parameterized import parameterized +from parameterized import parameterized, parameterized_class from twisted.test.proto_helpers import MemoryReactor @@ -28,6 +28,18 @@ from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase logger = logging.getLogger(__name__) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncConnectionTrackingTestCase(SlidingSyncBase): """ Test connection tracking in the Sliding Sync API. @@ -44,6 +56,8 @@ class SlidingSyncConnectionTrackingTestCase(SlidingSyncBase): self.store = hs.get_datastores().main self.storage_controllers = hs.get_storage_controllers() + super().prepare(reactor, clock, hs) + def test_rooms_required_state_incremental_sync_LIVE(self) -> None: """Test that we only get state updates in incremental sync for rooms we've already seen (LIVE). diff --git a/tests/rest/client/sliding_sync/test_extension_account_data.py b/tests/rest/client/sliding_sync/test_extension_account_data.py index 3482a5f887..65a6adf4af 100644 --- a/tests/rest/client/sliding_sync/test_extension_account_data.py +++ b/tests/rest/client/sliding_sync/test_extension_account_data.py @@ -13,6 +13,8 @@ # import logging +from parameterized import parameterized_class + from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin @@ -28,6 +30,18 @@ from tests.server import TimedOutException logger = logging.getLogger(__name__) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase): """Tests for the account_data sliding sync extension""" @@ -43,6 +57,8 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase): self.store = hs.get_datastores().main self.account_data_handler = hs.get_account_data_handler() + super().prepare(reactor, clock, hs) + def test_no_data_initial_sync(self) -> None: """ Test that enabling the account_data extension works during an intitial sync, diff --git a/tests/rest/client/sliding_sync/test_extension_e2ee.py b/tests/rest/client/sliding_sync/test_extension_e2ee.py index 320f8c788f..2ff6687796 100644 --- a/tests/rest/client/sliding_sync/test_extension_e2ee.py +++ b/tests/rest/client/sliding_sync/test_extension_e2ee.py @@ -13,6 +13,8 @@ # import logging +from parameterized import parameterized_class + from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin @@ -27,6 +29,18 @@ from tests.server import TimedOutException logger = logging.getLogger(__name__) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncE2eeExtensionTestCase(SlidingSyncBase): """Tests for the e2ee sliding sync extension""" @@ -42,6 +56,8 @@ class SlidingSyncE2eeExtensionTestCase(SlidingSyncBase): self.store = hs.get_datastores().main self.e2e_keys_handler = hs.get_e2e_keys_handler() + super().prepare(reactor, clock, hs) + def test_no_data_initial_sync(self) -> None: """ Test that enabling e2ee extension works during an intitial sync, even if there diff --git a/tests/rest/client/sliding_sync/test_extension_receipts.py b/tests/rest/client/sliding_sync/test_extension_receipts.py index e842349ed2..90b035dd75 100644 --- a/tests/rest/client/sliding_sync/test_extension_receipts.py +++ b/tests/rest/client/sliding_sync/test_extension_receipts.py @@ -13,6 +13,8 @@ # import logging +from parameterized import parameterized_class + from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin @@ -28,6 +30,18 @@ from tests.server import TimedOutException logger = logging.getLogger(__name__) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase): """Tests for the receipts sliding sync extension""" @@ -42,6 +56,8 @@ class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase): def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = hs.get_datastores().main + super().prepare(reactor, clock, hs) + def test_no_data_initial_sync(self) -> None: """ Test that enabling the receipts extension works during an intitial sync, diff --git a/tests/rest/client/sliding_sync/test_extension_to_device.py b/tests/rest/client/sliding_sync/test_extension_to_device.py index f8500812ea..5ba2443089 100644 --- a/tests/rest/client/sliding_sync/test_extension_to_device.py +++ b/tests/rest/client/sliding_sync/test_extension_to_device.py @@ -14,6 +14,8 @@ import logging from typing import List +from parameterized import parameterized_class + from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin @@ -28,6 +30,18 @@ from tests.server import TimedOutException logger = logging.getLogger(__name__) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase): """Tests for the to-device sliding sync extension""" @@ -40,6 +54,7 @@ class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase): def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = hs.get_datastores().main + super().prepare(reactor, clock, hs) def _assert_to_device_response( self, response_body: JsonDict, expected_messages: List[JsonDict] diff --git a/tests/rest/client/sliding_sync/test_extension_typing.py b/tests/rest/client/sliding_sync/test_extension_typing.py index 7f523e0f10..0a0f5aff1a 100644 --- a/tests/rest/client/sliding_sync/test_extension_typing.py +++ b/tests/rest/client/sliding_sync/test_extension_typing.py @@ -13,6 +13,8 @@ # import logging +from parameterized import parameterized_class + from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin @@ -28,6 +30,18 @@ from tests.server import TimedOutException logger = logging.getLogger(__name__) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncTypingExtensionTestCase(SlidingSyncBase): """Tests for the typing notification sliding sync extension""" @@ -41,6 +55,8 @@ class SlidingSyncTypingExtensionTestCase(SlidingSyncBase): def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = hs.get_datastores().main + super().prepare(reactor, clock, hs) + def test_no_data_initial_sync(self) -> None: """ Test that enabling the typing extension works during an intitial sync, diff --git a/tests/rest/client/sliding_sync/test_extensions.py b/tests/rest/client/sliding_sync/test_extensions.py index ae823d5415..32478467aa 100644 --- a/tests/rest/client/sliding_sync/test_extensions.py +++ b/tests/rest/client/sliding_sync/test_extensions.py @@ -14,7 +14,7 @@ import logging from typing import Literal -from parameterized import parameterized +from parameterized import parameterized, parameterized_class from typing_extensions import assert_never from twisted.test.proto_helpers import MemoryReactor @@ -30,6 +30,18 @@ from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase logger = logging.getLogger(__name__) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncExtensionsTestCase(SlidingSyncBase): """ Test general extensions behavior in the Sliding Sync API. Each extension has their @@ -49,6 +61,8 @@ class SlidingSyncExtensionsTestCase(SlidingSyncBase): self.storage_controllers = hs.get_storage_controllers() self.account_data_handler = hs.get_account_data_handler() + super().prepare(reactor, clock, hs) + # Any extensions that use `lists`/`rooms` should be tested here @parameterized.expand([("account_data",), ("receipts",), ("typing",)]) def test_extensions_lists_rooms_relevant_rooms( diff --git a/tests/rest/client/sliding_sync/test_room_subscriptions.py b/tests/rest/client/sliding_sync/test_room_subscriptions.py index cc17b0b354..e81d251839 100644 --- a/tests/rest/client/sliding_sync/test_room_subscriptions.py +++ b/tests/rest/client/sliding_sync/test_room_subscriptions.py @@ -14,6 +14,8 @@ import logging from http import HTTPStatus +from parameterized import parameterized_class + from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin @@ -27,6 +29,18 @@ from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase logger = logging.getLogger(__name__) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncRoomSubscriptionsTestCase(SlidingSyncBase): """ Test `room_subscriptions` in the Sliding Sync API. @@ -43,6 +57,8 @@ class SlidingSyncRoomSubscriptionsTestCase(SlidingSyncBase): self.store = hs.get_datastores().main self.storage_controllers = hs.get_storage_controllers() + super().prepare(reactor, clock, hs) + def test_room_subscriptions_with_join_membership(self) -> None: """ Test `room_subscriptions` with a joined room should give us timeline and current diff --git a/tests/rest/client/sliding_sync/test_rooms_invites.py b/tests/rest/client/sliding_sync/test_rooms_invites.py index f08ffaf674..f6f45c2500 100644 --- a/tests/rest/client/sliding_sync/test_rooms_invites.py +++ b/tests/rest/client/sliding_sync/test_rooms_invites.py @@ -13,6 +13,8 @@ # import logging +from parameterized import parameterized_class + from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin @@ -27,6 +29,18 @@ from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase logger = logging.getLogger(__name__) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncRoomsInvitesTestCase(SlidingSyncBase): """ Test to make sure the `rooms` response looks good for invites in the Sliding Sync API. @@ -49,6 +63,8 @@ class SlidingSyncRoomsInvitesTestCase(SlidingSyncBase): self.store = hs.get_datastores().main self.storage_controllers = hs.get_storage_controllers() + super().prepare(reactor, clock, hs) + def test_rooms_invite_shared_history_initial_sync(self) -> None: """ Test that `rooms` we are invited to have some stripped `invite_state` during an diff --git a/tests/rest/client/sliding_sync/test_rooms_meta.py b/tests/rest/client/sliding_sync/test_rooms_meta.py index 690912133a..71542923da 100644 --- a/tests/rest/client/sliding_sync/test_rooms_meta.py +++ b/tests/rest/client/sliding_sync/test_rooms_meta.py @@ -13,6 +13,8 @@ # import logging +from parameterized import parameterized_class + from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin @@ -28,6 +30,18 @@ from tests.test_utils.event_injection import create_event logger = logging.getLogger(__name__) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncRoomsMetaTestCase(SlidingSyncBase): """ Test rooms meta info like name, avatar, joined_count, invited_count, is_dm, @@ -49,6 +63,8 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase): assert persistence is not None self.persistence = persistence + super().prepare(reactor, clock, hs) + def test_rooms_meta_when_joined(self) -> None: """ Test that the `rooms` `name` and `avatar` are included in the response and diff --git a/tests/rest/client/sliding_sync/test_rooms_required_state.py b/tests/rest/client/sliding_sync/test_rooms_required_state.py index 823e7db569..5dab744461 100644 --- a/tests/rest/client/sliding_sync/test_rooms_required_state.py +++ b/tests/rest/client/sliding_sync/test_rooms_required_state.py @@ -13,7 +13,7 @@ # import logging -from parameterized import parameterized +from parameterized import parameterized, parameterized_class from twisted.test.proto_helpers import MemoryReactor @@ -30,6 +30,18 @@ from tests.test_utils.event_injection import mark_event_as_partial_state logger = logging.getLogger(__name__) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase): """ Test `rooms.required_state` in the Sliding Sync API. @@ -46,6 +58,8 @@ class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase): self.store = hs.get_datastores().main self.storage_controllers = hs.get_storage_controllers() + super().prepare(reactor, clock, hs) + def test_rooms_no_required_state(self) -> None: """ Empty `rooms.required_state` should not return any state events in the room diff --git a/tests/rest/client/sliding_sync/test_rooms_timeline.py b/tests/rest/client/sliding_sync/test_rooms_timeline.py index eeac0d6aa9..e56fb58012 100644 --- a/tests/rest/client/sliding_sync/test_rooms_timeline.py +++ b/tests/rest/client/sliding_sync/test_rooms_timeline.py @@ -14,6 +14,8 @@ import logging from typing import List, Optional +from parameterized import parameterized_class + from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin @@ -28,6 +30,18 @@ from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase logger = logging.getLogger(__name__) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncRoomsTimelineTestCase(SlidingSyncBase): """ Test `rooms.timeline` in the Sliding Sync API. @@ -44,6 +58,8 @@ class SlidingSyncRoomsTimelineTestCase(SlidingSyncBase): self.store = hs.get_datastores().main self.storage_controllers = hs.get_storage_controllers() + super().prepare(reactor, clock, hs) + def _assertListEqual( self, actual_items: StrSequence, diff --git a/tests/rest/client/sliding_sync/test_sliding_sync.py b/tests/rest/client/sliding_sync/test_sliding_sync.py index cb7638c5ba..2c58b38f9f 100644 --- a/tests/rest/client/sliding_sync/test_sliding_sync.py +++ b/tests/rest/client/sliding_sync/test_sliding_sync.py @@ -13,7 +13,9 @@ # import logging from typing import Any, Dict, Iterable, List, Literal, Optional, Tuple +from unittest.mock import AsyncMock +from parameterized import parameterized_class from typing_extensions import assert_never from twisted.test.proto_helpers import MemoryReactor @@ -47,8 +49,16 @@ logger = logging.getLogger(__name__) class SlidingSyncBase(unittest.HomeserverTestCase): """Base class for sliding sync test cases""" + # Flag as to whether to use the new sliding sync tables or not + use_new_tables: bool = True + sync_endpoint = "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync" + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + hs.get_datastores().main.have_finished_sliding_sync_background_jobs = AsyncMock( # type: ignore[method-assign] + return_value=self.use_new_tables + ) + def default_config(self) -> JsonDict: config = super().default_config() # Enable sliding sync @@ -203,6 +213,18 @@ class SlidingSyncBase(unittest.HomeserverTestCase): ) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncTestCase(SlidingSyncBase): """ Tests regarding MSC3575 Sliding Sync `/sync` endpoint. @@ -226,6 +248,8 @@ class SlidingSyncTestCase(SlidingSyncBase): self.storage_controllers = hs.get_storage_controllers() self.account_data_handler = hs.get_account_data_handler() + super().prepare(reactor, clock, hs) + def _add_new_dm_to_global_account_data( self, source_user_id: str, target_user_id: str, target_room_id: str ) -> None: From f78ab68fa23dc37dc2dad803efbe453151515e14 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 30 Aug 2024 14:53:08 +0100 Subject: [PATCH 16/17] Add cache --- synapse/storage/_base.py | 4 ++++ synapse/storage/databases/main/roommember.py | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index e12ab94576..ccbc218af3 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -123,6 +123,9 @@ class SQLBaseStore(metaclass=ABCMeta): self._attempt_to_invalidate_cache( "_get_rooms_for_local_user_where_membership_is_inner", (user_id,) ) + self._attempt_to_invalidate_cache( + "get_sliding_sync_rooms_for_user", (user_id,) + ) # Purge other caches based on room state. self._attempt_to_invalidate_cache("get_room_summary", (room_id,)) @@ -157,6 +160,7 @@ class SQLBaseStore(metaclass=ABCMeta): self._attempt_to_invalidate_cache("get_room_summary", (room_id,)) self._attempt_to_invalidate_cache("get_room_type", (room_id,)) self._attempt_to_invalidate_cache("get_room_encryption", (room_id,)) + self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None) def _attempt_to_invalidate_cache( self, cache_name: str, key: Optional[Collection[Any]] diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index eb6d3ca617..57b9b95c28 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -1382,7 +1382,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): desc="room_forgetter_stream_pos", ) - # @cached(iterable=True, max_entries=10000) + @cached(iterable=True, max_entries=10000) async def get_sliding_sync_rooms_for_user( self, user_id: str, From e923a8db814b5cc84b4d18df072e3bae10a49df3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 30 Aug 2024 15:23:30 +0100 Subject: [PATCH 17/17] Get encryption state at the time --- synapse/handlers/sliding_sync/room_lists.py | 60 +++++++++++++++++++-- 1 file changed, 56 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py index 8d11970fd8..f9b324e276 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py @@ -273,8 +273,13 @@ class SlidingSyncRoomLists: # after the `to_token`. In other words, there is no membership # for the room after the `to_token` but we see membership in # the token range. + + # Get the state at the time. Note that room type never changes, + # so we can just get current room type room_type = await self.store.get_room_type(room_id) - encryption = await self.store.get_room_encryption(room_id) + is_encrypted = await self.get_is_encrypted_for_room_at_token( + room_id, to_token.room_key + ) # Add back rooms that the user was state-reset out of after `to_token` room_membership_for_user_map[room_id] = RoomsForUserSlidingSync( @@ -285,7 +290,7 @@ class SlidingSyncRoomLists: event_pos=change.event_pos, room_version_id=change.room_version_id, room_type=room_type, - is_encrypted=encryption is not None, + is_encrypted=is_encrypted, ) newly_joined_room_ids, newly_left_room_map = ( @@ -304,8 +309,12 @@ class SlidingSyncRoomLists: for room_id in ( newly_left_room_map.keys() - room_membership_for_user_map.keys() ): + # Get the state at the time. Note that room type never changes, + # so we can just get current room type room_type = await self.store.get_room_type(room_id) - encryption = await self.store.get_room_encryption(room_id) + is_encrypted = await self.get_is_encrypted_for_room_at_token( + room_id, newly_left_room_map[room_id].to_room_stream_token() + ) room_membership_for_user_map[room_id] = RoomsForUserSlidingSync( room_id=room_id, @@ -315,7 +324,7 @@ class SlidingSyncRoomLists: event_pos=newly_left_room_map[room_id], room_version_id=await self.store.get_room_version_id(room_id), room_type=room_type, - is_encrypted=encryption is not None, + is_encrypted=is_encrypted, ) if sync_config.lists: @@ -1909,3 +1918,46 @@ class SlidingSyncRoomLists: # We want descending order reverse=True, ) + + async def get_is_encrypted_for_room_at_token( + self, room_id: str, to_token: RoomStreamToken + ) -> bool: + """Get if the room is encrypted at the time.""" + + # Fetch the current encryption state + state_ids = await self.store.get_partial_filtered_current_state_ids( + room_id, StateFilter.from_types([(EventTypes.RoomEncryption, "")]) + ) + encryption_event_id = state_ids.get((EventTypes.RoomEncryption, "")) + + # Now roll back the state by looking at the state deltas between + # to_token and now. + deltas = await self.store.get_current_state_deltas_for_room( + room_id, + from_token=to_token, + to_token=self.store.get_room_max_token(), + ) + + for delta in deltas: + if delta.event_type != EventTypes.RoomEncryption: + continue + + # Found the first change, we look at the previous event ID to get + # the state at the to token. + + if delta.prev_event_id is None: + # There is no prev event, so no encryption state event, so room is not encrypted + return False + + encryption_event_id = delta.prev_event_id + break + + # We didn't find an encryption state, room isn't encrypted + if encryption_event_id is None: + return False + + # We found encryption state, check if content has a non-null algorithm + encrypted_event = await self.store.get_event(encryption_event_id) + algorithm = encrypted_event.content.get(EventContentFields.ENCRYPTION_ALGORITHM) + + return algorithm is not None