diff --git a/changelog.d/17630.misc b/changelog.d/17630.misc new file mode 100644 index 0000000000..ed1bf6bd55 --- /dev/null +++ b/changelog.d/17630.misc @@ -0,0 +1 @@ +Use new database tables for sliding sync. diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py index 0e6cb28524..f9b324e276 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py @@ -18,6 +18,7 @@ import logging from itertools import chain from typing import ( TYPE_CHECKING, + AbstractSet, Any, Dict, List, @@ -25,6 +26,7 @@ from typing import ( Mapping, Optional, Set, + Tuple, Union, ) @@ -46,6 +48,7 @@ from synapse.storage.databases.main.state import ( Sentinel as StateSentinel, ) from synapse.storage.databases.main.stream import CurrentStateDeltaMembership +from synapse.storage.roommember import RoomsForUser, RoomsForUserSlidingSync from synapse.types import ( MutableStateMap, PersistedEventPosition, @@ -141,7 +144,10 @@ class _RoomMembershipForUser: def filter_membership_for_sync( - *, user_id: str, room_membership_for_user: _RoomMembershipForUser + *, + user_id: str, + room_membership_for_user: Union[_RoomMembershipForUser, RoomsForUserSlidingSync], + newly_left: bool, ) -> bool: """ Returns True if the membership event should be included in the sync response, @@ -154,7 +160,6 @@ def filter_membership_for_sync( membership = room_membership_for_user.membership sender = room_membership_for_user.sender - newly_left = room_membership_for_user.newly_left # We want to allow everything except rooms the user has left unless `newly_left` # because we want everything that's *still* relevant to the user. 
We include @@ -196,6 +201,309 @@ class SlidingSyncRoomLists: ) -> SlidingSyncInterestedRooms: """Fetch the set of rooms that match the request""" + if await self.store.have_finished_sliding_sync_background_jobs(): + return await self._compute_interested_rooms_new_tables( + sync_config=sync_config, + previous_connection_state=previous_connection_state, + to_token=to_token, + from_token=from_token, + ) + else: + # FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the + # foreground update for + # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by + # https://github.com/element-hq/synapse/issues/17623) + return await self._compute_interested_rooms_fallback( + sync_config=sync_config, + previous_connection_state=previous_connection_state, + to_token=to_token, + from_token=from_token, + ) + + @trace + async def _compute_interested_rooms_new_tables( + self, + sync_config: SlidingSyncConfig, + previous_connection_state: "PerConnectionState", + to_token: StreamToken, + from_token: Optional[StreamToken], + ) -> SlidingSyncInterestedRooms: + """Implementation of `compute_interested_rooms` using new sliding sync db tables.""" + user_id = sync_config.user.to_string() + + # Assemble sliding window lists + lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {} + # Keep track of the rooms that we can display and need to fetch more info about + relevant_room_map: Dict[str, RoomSyncConfig] = {} + # The set of room IDs of all rooms that could appear in any list. These + # include rooms that are outside the list ranges. 
+ all_rooms: Set[str] = set() + + room_membership_for_user_map = await self.store.get_sliding_sync_rooms_for_user( + user_id + ) + + changes = await self._get_rewind_changes_to_current_membership_to_token( + sync_config.user, room_membership_for_user_map, to_token=to_token + ) + if changes: + room_membership_for_user_map = dict(room_membership_for_user_map) + for room_id, change in changes.items(): + if change is None: + # Remove rooms that the user joined after the `to_token` + room_membership_for_user_map.pop(room_id, None) + continue + + existing_room = room_membership_for_user_map.get(room_id) + if existing_room is not None: + # Update room membership events to the point in time of the `to_token` + room_membership_for_user_map[room_id] = RoomsForUserSlidingSync( + room_id=room_id, + sender=change.sender, + membership=change.membership, + event_id=change.event_id, + event_pos=change.event_pos, + room_version_id=change.room_version_id, + # We keep the current state of the room though + room_type=existing_room.room_type, + is_encrypted=existing_room.is_encrypted, + ) + else: + # This can happen if we get "state reset" out of the room + # after the `to_token`. In other words, there is no membership + # for the room after the `to_token` but we see membership in + # the token range. + + # Get the state at the time. 
Note that room type never changes, + # so we can just get current room type + room_type = await self.store.get_room_type(room_id) + is_encrypted = await self.get_is_encrypted_for_room_at_token( + room_id, to_token.room_key + ) + + # Add back rooms that the user was state-reset out of after `to_token` + room_membership_for_user_map[room_id] = RoomsForUserSlidingSync( + room_id=room_id, + sender=change.sender, + membership=change.membership, + event_id=change.event_id, + event_pos=change.event_pos, + room_version_id=change.room_version_id, + room_type=room_type, + is_encrypted=is_encrypted, + ) + + newly_joined_room_ids, newly_left_room_map = ( + await self._get_newly_joined_and_left_rooms( + user_id, from_token=from_token, to_token=to_token + ) + ) + dm_room_ids = await self._get_dm_rooms_for_user(user_id) + + # Handle state resets in the from -> to token range. + state_reset_rooms = ( + newly_left_room_map.keys() - room_membership_for_user_map.keys() + ) + if state_reset_rooms: + room_membership_for_user_map = dict(room_membership_for_user_map) + for room_id in ( + newly_left_room_map.keys() - room_membership_for_user_map.keys() + ): + # Get the state at the time. 
Note that room type never changes, + # so we can just get current room type + room_type = await self.store.get_room_type(room_id) + is_encrypted = await self.get_is_encrypted_for_room_at_token( + room_id, newly_left_room_map[room_id].to_room_stream_token() + ) + + room_membership_for_user_map[room_id] = RoomsForUserSlidingSync( + room_id=room_id, + sender=None, + membership=Membership.LEAVE, + event_id=None, + event_pos=newly_left_room_map[room_id], + room_version_id=await self.store.get_room_version_id(room_id), + room_type=room_type, + is_encrypted=is_encrypted, + ) + + if sync_config.lists: + sync_room_map = { + room_id: room_membership_for_user + for room_id, room_membership_for_user in room_membership_for_user_map.items() + if filter_membership_for_sync( + user_id=user_id, + room_membership_for_user=room_membership_for_user, + newly_left=room_id in newly_left_room_map, + ) + } + with start_active_span("assemble_sliding_window_lists"): + for list_key, list_config in sync_config.lists.items(): + # Apply filters + filtered_sync_room_map = sync_room_map + if list_config.filters is not None: + filtered_sync_room_map = await self.filter_rooms_using_tables( + user_id, + sync_room_map, + list_config.filters, + to_token, + dm_room_ids, + ) + + # Find which rooms are partially stated and may need to be filtered out + # depending on the `required_state` requested (see below). + partial_state_room_map = ( + await self.store.is_partial_state_room_batched( + filtered_sync_room_map.keys() + ) + ) + + # Since creating the `RoomSyncConfig` takes some work, let's just do it + # once and make a copy whenever we need it. 
+ room_sync_config = RoomSyncConfig.from_room_config(list_config) + + # Exclude partially-stated rooms if we must wait for the room to be + # fully-stated + if room_sync_config.must_await_full_state(self.is_mine_id): + filtered_sync_room_map = { + room_id: room + for room_id, room in filtered_sync_room_map.items() + if not partial_state_room_map.get(room_id) + } + + all_rooms.update(filtered_sync_room_map) + + # Sort the list + sorted_room_info = await self.sort_rooms_using_tables( + filtered_sync_room_map, to_token + ) + + ops: List[SlidingSyncResult.SlidingWindowList.Operation] = [] + if list_config.ranges: + for range in list_config.ranges: + room_ids_in_list: List[str] = [] + + # We're going to loop through the sorted list of rooms starting + # at the range start index and keep adding rooms until we fill + # up the range or run out of rooms. + # + # Both sides of range are inclusive so we `+ 1` + max_num_rooms = range[1] - range[0] + 1 + for room_membership in sorted_room_info[range[0] :]: + room_id = room_membership.room_id + + if len(room_ids_in_list) >= max_num_rooms: + break + + # Take the superset of the `RoomSyncConfig` for each room. + # + # Update our `relevant_room_map` with the room we're going + # to display and need to fetch more info about. 
+ existing_room_sync_config = relevant_room_map.get( + room_id + ) + if existing_room_sync_config is not None: + room_sync_config = existing_room_sync_config.combine_room_sync_config( + room_sync_config + ) + + relevant_room_map[room_id] = room_sync_config + + room_ids_in_list.append(room_id) + + ops.append( + SlidingSyncResult.SlidingWindowList.Operation( + op=OperationType.SYNC, + range=range, + room_ids=room_ids_in_list, + ) + ) + + lists[list_key] = SlidingSyncResult.SlidingWindowList( + count=len(sorted_room_info), + ops=ops, + ) + + if sync_config.room_subscriptions: + with start_active_span("assemble_room_subscriptions"): + # Find which rooms are partially stated and may need to be filtered out + # depending on the `required_state` requested (see below). + partial_state_room_map = await self.store.is_partial_state_room_batched( + sync_config.room_subscriptions.keys() + ) + + for ( + room_id, + room_subscription, + ) in sync_config.room_subscriptions.items(): + if room_id not in room_membership_for_user_map: + # TODO: Handle rooms the user isn't in. + continue + + all_rooms.add(room_id) + + # Take the superset of the `RoomSyncConfig` for each room. + room_sync_config = RoomSyncConfig.from_room_config( + room_subscription + ) + + # Exclude partially-stated rooms if we must wait for the room to be + # fully-stated + if room_sync_config.must_await_full_state(self.is_mine_id): + if partial_state_room_map.get(room_id): + continue + + all_rooms.add(room_id) + + # Update our `relevant_room_map` with the room we're going to display + # and need to fetch more info about. 
+ existing_room_sync_config = relevant_room_map.get(room_id) + if existing_room_sync_config is not None: + room_sync_config = ( + existing_room_sync_config.combine_room_sync_config( + room_sync_config + ) + ) + + relevant_room_map[room_id] = room_sync_config + + # Filtered subset of `relevant_room_map` for rooms that may have updates + # (in the event stream) + relevant_rooms_to_send_map = await self._filter_relevant_room_to_send( + previous_connection_state, from_token, relevant_room_map + ) + + return SlidingSyncInterestedRooms( + lists=lists, + relevant_room_map=relevant_room_map, + relevant_rooms_to_send_map=relevant_rooms_to_send_map, + all_rooms=all_rooms, + room_membership_for_user_map={ + # FIXME: Ideally we wouldn't have to do these copies and instead + # just return `newly_joined_room_ids` directly. + room_id: _RoomMembershipForUser( + room_id=room_id, + event_id=membership_info.event_id, + event_pos=membership_info.event_pos, + sender=membership_info.sender, + membership=membership_info.membership, + newly_joined=room_id in newly_joined_room_ids, + newly_left=room_id in newly_left_room_map, + is_dm=room_id in dm_room_ids, + ) + for room_id, membership_info in room_membership_for_user_map.items() + }, + ) + + async def _compute_interested_rooms_fallback( + self, + sync_config: SlidingSyncConfig, + previous_connection_state: "PerConnectionState", + to_token: StreamToken, + from_token: Optional[StreamToken], + ) -> SlidingSyncInterestedRooms: + """Fallback code when the database background updates haven't completed yet.""" + room_membership_for_user_map = ( await self.get_room_membership_for_user_at_to_token( sync_config.user, to_token, from_token @@ -357,6 +665,29 @@ class SlidingSyncRoomLists: relevant_room_map[room_id] = room_sync_config + # Filtered subset of `relevant_room_map` for rooms that may have updates + # (in the event stream) + relevant_rooms_to_send_map = await self._filter_relevant_room_to_send( + previous_connection_state, from_token, 
relevant_room_map + ) + + return SlidingSyncInterestedRooms( + lists=lists, + relevant_room_map=relevant_room_map, + relevant_rooms_to_send_map=relevant_rooms_to_send_map, + all_rooms=all_rooms, + room_membership_for_user_map=room_membership_for_user_map, + ) + + async def _filter_relevant_room_to_send( + self, + previous_connection_state: PerConnectionState, + from_token: Optional[StreamToken], + relevant_room_map: Dict[str, RoomSyncConfig], + ) -> Dict[str, RoomSyncConfig]: + """Filters the `relevant_room_map` down to those rooms that may have + updates we need to fetch and return.""" + # Filtered subset of `relevant_room_map` for rooms that may have updates # (in the event stream) relevant_rooms_to_send_map: Dict[str, RoomSyncConfig] = relevant_room_map @@ -416,14 +747,159 @@ class SlidingSyncRoomLists: if room_id in rooms_should_send } - return SlidingSyncInterestedRooms( - lists=lists, - relevant_room_map=relevant_room_map, - relevant_rooms_to_send_map=relevant_rooms_to_send_map, - all_rooms=all_rooms, - room_membership_for_user_map=room_membership_for_user_map, + return relevant_rooms_to_send_map + + @trace + async def _get_rewind_changes_to_current_membership_to_token( + self, + user: UserID, + rooms_for_user: Mapping[str, Union[RoomsForUser, RoomsForUserSlidingSync]], + to_token: StreamToken, + ) -> Mapping[str, Optional[RoomsForUser]]: + """ + Takes the current set of rooms for a user (retrieved after the given + token), and returns the changes needed to "rewind" it to match the set of + memberships *at that token*. + + Args: + user: User to fetch rooms for + rooms_for_user: The set of rooms for the user after the `to_token`. + to_token: The token to rewind to + + Returns: + The changes to apply to rewind the current memberships. 
+ """ + # If the user has never joined any rooms before, we can just return an empty list + if not rooms_for_user: + return {} + + user_id = user.to_string() + + # Get the `RoomStreamToken` that represents the spot we queried up to when we got + # our membership snapshot from `get_rooms_for_local_user_where_membership_is()`. + # + # First, we need to get the max stream_ordering of each event persister instance + # that we queried events from. + instance_to_max_stream_ordering_map: Dict[str, int] = {} + for room_for_user in rooms_for_user.values(): + instance_name = room_for_user.event_pos.instance_name + stream_ordering = room_for_user.event_pos.stream + + current_instance_max_stream_ordering = ( + instance_to_max_stream_ordering_map.get(instance_name) + ) + if ( + current_instance_max_stream_ordering is None + or stream_ordering > current_instance_max_stream_ordering + ): + instance_to_max_stream_ordering_map[instance_name] = stream_ordering + + # Then assemble the `RoomStreamToken` + min_stream_pos = min(instance_to_max_stream_ordering_map.values()) + membership_snapshot_token = RoomStreamToken( + # Minimum position in the `instance_map` + stream=min_stream_pos, + instance_map=immutabledict( + { + instance_name: stream_pos + for instance_name, stream_pos in instance_to_max_stream_ordering_map.items() + if stream_pos > min_stream_pos + } + ), ) + # Since we fetched the users room list at some point in time after the + # tokens, we need to revert/rewind some membership changes to match the point in + # time of the `to_token`. 
In particular, we need to make these fixups: + # + # - a) Remove rooms that the user joined after the `to_token` + # - b) Add back rooms that the user left after the `to_token` + # - c) Update room membership events to the point in time of the `to_token` + + # Fetch membership changes that fall in the range from `to_token` up to + # `membership_snapshot_token` + # + # If our `to_token` is already the same or ahead of the latest room membership + # for the user, we don't need to do any "2)" fix-ups and can just straight-up + # use the room list from the snapshot as a base (nothing has changed) + current_state_delta_membership_changes_after_to_token = [] + if not membership_snapshot_token.is_before_or_eq(to_token.room_key): + current_state_delta_membership_changes_after_to_token = ( + await self.store.get_current_state_delta_membership_changes_for_user( + user_id, + from_key=to_token.room_key, + to_key=membership_snapshot_token, + excluded_room_ids=self.rooms_to_exclude_globally, + ) + ) + + if not current_state_delta_membership_changes_after_to_token: + # There have been no membership changes, so we can early return. + return {} + + # Otherwise we're about to make changes to `rooms_for_user`, so we turn + # it into a mutable dict. + changes: Dict[str, Optional[RoomsForUser]] = {} + + # Assemble a list of the first membership event after the `to_token` so we can + # step backward to the previous membership that would apply to the from/to + # range. 
+ first_membership_change_by_room_id_after_to_token: Dict[ + str, CurrentStateDeltaMembership + ] = {} + for membership_change in current_state_delta_membership_changes_after_to_token: + # Only set if we haven't already set it + first_membership_change_by_room_id_after_to_token.setdefault( + membership_change.room_id, membership_change + ) + + # Since we fetched a snapshot of the users room list at some point in time after + # the from/to tokens, we need to revert/rewind some membership changes to match + # the point in time of the `to_token`. + for ( + room_id, + first_membership_change_after_to_token, + ) in first_membership_change_by_room_id_after_to_token.items(): + # a) Remove rooms that the user joined after the `to_token` + if first_membership_change_after_to_token.prev_event_id is None: + changes[room_id] = None + # b) c) From the first membership event after the `to_token`, step backward to the + # previous membership that would apply to the from/to range. + else: + # We don't expect these fields to be `None` if we have a `prev_event_id` + # but we're being defensive since it's possible that the prev event was + # culled from the database. + if ( + first_membership_change_after_to_token.prev_event_pos is not None + and first_membership_change_after_to_token.prev_membership + is not None + and first_membership_change_after_to_token.prev_sender is not None + ): + # We need to know the room version ID, which we normally + # can get from the current membership, but if we don't have + # that then we need to query the DB. 
+ current_membership = rooms_for_user.get(room_id) + if current_membership is not None: + room_version_id = current_membership.room_version_id + else: + room_version_id = await self.store.get_room_version_id(room_id) + + changes[room_id] = RoomsForUser( + room_id=room_id, + event_id=first_membership_change_after_to_token.prev_event_id, + event_pos=first_membership_change_after_to_token.prev_event_pos, + membership=first_membership_change_after_to_token.prev_membership, + sender=first_membership_change_after_to_token.prev_sender, + room_version_id=room_version_id, + ) + else: + # If we can't find the previous membership event, we shouldn't + # include the room in the sync response since we can't determine the + # exact membership state and shouldn't rely on the current snapshot. + changes[room_id] = None + + return changes + @trace async def get_room_membership_for_user_at_to_token( self, @@ -469,6 +945,27 @@ class SlidingSyncRoomLists: if not room_for_user_list: return {} + # Since we fetched the users room list at some point in time after the + # tokens, we need to revert/rewind some membership changes to match the point in + # time of the `to_token`. 
+ rooms_for_user = {room.room_id: room for room in room_for_user_list} + changes = await self._get_rewind_changes_to_current_membership_to_token( + user, rooms_for_user, to_token + ) + for room_id, change_room_for_user in changes.items(): + if change_room_for_user is None: + rooms_for_user.pop(room_id, None) + else: + rooms_for_user[room_id] = change_room_for_user + + newly_joined_room_ids, newly_left_room_ids = ( + await self._get_newly_joined_and_left_rooms( + user_id, to_token=to_token, from_token=from_token + ) + ) + + dm_room_ids = await self._get_dm_rooms_for_user(user_id) + # Our working list of rooms that can show up in the sync response sync_room_id_set = { # Note: The `room_for_user` we're assigning here will need to be fixed up @@ -481,127 +978,61 @@ class SlidingSyncRoomLists: membership=room_for_user.membership, sender=room_for_user.sender, # We will update these fields below to be accurate - newly_joined=False, - newly_left=False, - is_dm=False, + newly_joined=room_id in newly_joined_room_ids, + newly_left=room_id in newly_left_room_ids, + is_dm=room_id in dm_room_ids, ) - for room_for_user in room_for_user_list + for room_id, room_for_user in rooms_for_user.items() } - # Get the `RoomStreamToken` that represents the spot we queried up to when we got - # our membership snapshot from `get_rooms_for_local_user_where_membership_is()`. - # - # First, we need to get the max stream_ordering of each event persister instance - # that we queried events from. - instance_to_max_stream_ordering_map: Dict[str, int] = {} - for room_for_user in room_for_user_list: - instance_name = room_for_user.event_pos.instance_name - stream_ordering = room_for_user.event_pos.stream + # Ensure we have entries for rooms that the user has been "state reset" + # out of. These are rooms that appear in the `newly_left_room_ids` map but + # aren't in the `rooms_for_user` map. 
+ for room_id, left_event_pos in newly_left_room_ids.items(): + if room_id in sync_room_id_set: + continue - current_instance_max_stream_ordering = ( - instance_to_max_stream_ordering_map.get(instance_name) - ) - if ( - current_instance_max_stream_ordering is None - or stream_ordering > current_instance_max_stream_ordering - ): - instance_to_max_stream_ordering_map[instance_name] = stream_ordering - - # Then assemble the `RoomStreamToken` - min_stream_pos = min(instance_to_max_stream_ordering_map.values()) - membership_snapshot_token = RoomStreamToken( - # Minimum position in the `instance_map` - stream=min_stream_pos, - instance_map=immutabledict( - { - instance_name: stream_pos - for instance_name, stream_pos in instance_to_max_stream_ordering_map.items() - if stream_pos > min_stream_pos - } - ), - ) - - # Since we fetched the users room list at some point in time after the from/to - # tokens, we need to revert/rewind some membership changes to match the point in - # time of the `to_token`. 
In particular, we need to make these fixups: - # - # - 1a) Remove rooms that the user joined after the `to_token` - # - 1b) Add back rooms that the user left after the `to_token` - # - 1c) Update room membership events to the point in time of the `to_token` - # - 2) Figure out which rooms are `newly_left` rooms (> `from_token` and <= `to_token`) - # - 3) Figure out which rooms are `newly_joined` (> `from_token` and <= `to_token`) - # - 4) Figure out which rooms are DM's - - # 1) Fetch membership changes that fall in the range from `to_token` up to - # `membership_snapshot_token` - # - # If our `to_token` is already the same or ahead of the latest room membership - # for the user, we don't need to do any "2)" fix-ups and can just straight-up - # use the room list from the snapshot as a base (nothing has changed) - current_state_delta_membership_changes_after_to_token = [] - if not membership_snapshot_token.is_before_or_eq(to_token.room_key): - current_state_delta_membership_changes_after_to_token = ( - await self.store.get_current_state_delta_membership_changes_for_user( - user_id, - from_key=to_token.room_key, - to_key=membership_snapshot_token, - excluded_room_ids=self.rooms_to_exclude_globally, - ) + sync_room_id_set[room_id] = _RoomMembershipForUser( + room_id=room_id, + event_id=None, + event_pos=left_event_pos, + membership=Membership.LEAVE, + sender=None, + # We will update these fields below to be accurate + newly_joined=False, + newly_left=True, + is_dm=room_id in dm_room_ids, ) - # 1) Assemble a list of the first membership event after the `to_token` so we can - # step backward to the previous membership that would apply to the from/to - # range. 
- first_membership_change_by_room_id_after_to_token: Dict[ - str, CurrentStateDeltaMembership - ] = {} - for membership_change in current_state_delta_membership_changes_after_to_token: - # Only set if we haven't already set it - first_membership_change_by_room_id_after_to_token.setdefault( - membership_change.room_id, membership_change - ) + return sync_room_id_set - # 1) Fixup + @trace + async def _get_newly_joined_and_left_rooms( + self, + user_id: str, + to_token: StreamToken, + from_token: Optional[StreamToken], + ) -> Tuple[StrCollection, Mapping[str, PersistedEventPosition]]: + """Fetch the sets of rooms that the user newly joined or left in the + given token range. + + Note: there may be rooms in the newly left rooms where the user was + "state reset" out of the room, and so that room would not be part of the + "current memberships" of the user. + + Returns: + A 2-tuple of newly joined room IDs and a map of newly left room + IDs to the event position the leave happened at. + """ + newly_joined_room_ids: Set[str] = set() + newly_left_room_map: Dict[str, PersistedEventPosition] = {} + + # We need to figure out the # - # Since we fetched a snapshot of the users room list at some point in time after - # the from/to tokens, we need to revert/rewind some membership changes to match - # the point in time of the `to_token`. - for ( - room_id, - first_membership_change_after_to_token, - ) in first_membership_change_by_room_id_after_to_token.items(): - # 1a) Remove rooms that the user joined after the `to_token` - if first_membership_change_after_to_token.prev_event_id is None: - sync_room_id_set.pop(room_id, None) - # 1b) 1c) From the first membership event after the `to_token`, step backward to the - # previous membership that would apply to the from/to range. - else: - # We don't expect these fields to be `None` if we have a `prev_event_id` - # but we're being defensive since it's possible that the prev event was - # culled from the database. 
- if ( - first_membership_change_after_to_token.prev_event_pos is not None - and first_membership_change_after_to_token.prev_membership - is not None - ): - sync_room_id_set[room_id] = _RoomMembershipForUser( - room_id=room_id, - event_id=first_membership_change_after_to_token.prev_event_id, - event_pos=first_membership_change_after_to_token.prev_event_pos, - membership=first_membership_change_after_to_token.prev_membership, - sender=first_membership_change_after_to_token.prev_sender, - # We will update these fields below to be accurate - newly_joined=False, - newly_left=False, - is_dm=False, - ) - else: - # If we can't find the previous membership event, we shouldn't - # include the room in the sync response since we can't determine the - # exact membership state and shouldn't rely on the current snapshot. - sync_room_id_set.pop(room_id, None) + # - 1) Figure out which rooms are `newly_left` rooms (> `from_token` and <= `to_token`) + # - 2) Figure out which rooms are `newly_joined` (> `from_token` and <= `to_token`) - # 2) Fetch membership changes that fall in the range from `from_token` up to `to_token` + # 1) Fetch membership changes that fall in the range from `from_token` up to `to_token` current_state_delta_membership_changes_in_from_to_range = [] if from_token: current_state_delta_membership_changes_in_from_to_range = ( @@ -613,7 +1044,7 @@ class SlidingSyncRoomLists: ) ) - # 2) Assemble a list of the last membership events in some given ranges. Someone + # 1) Assemble a list of the last membership events in some given ranges. Someone # could have left and joined multiple times during the given range but we only # care about end-result so we grab the last one. 
last_membership_change_by_room_id_in_from_to_range: Dict[ @@ -646,9 +1077,9 @@ class SlidingSyncRoomLists: if membership_change.membership != Membership.JOIN: has_non_join_event_by_room_id_in_from_to_range[room_id] = True - # 2) Fixup + # 1) Fixup # - # 3) We also want to assemble a list of possibly newly joined rooms. Someone + # 2) We also want to assemble a list of possibly newly joined rooms. Someone # could have left and joined multiple times during the given range but we only # care about whether they are joined at the end of the token range so we are # working with the last membership even in the token range. @@ -658,46 +1089,18 @@ class SlidingSyncRoomLists: ) in last_membership_change_by_room_id_in_from_to_range.values(): room_id = last_membership_change_in_from_to_range.room_id - # 3) + # 2) if last_membership_change_in_from_to_range.membership == Membership.JOIN: possibly_newly_joined_room_ids.add(room_id) - # 2) Figure out newly_left rooms (> `from_token` and <= `to_token`). + # 1) Figure out newly_left rooms (> `from_token` and <= `to_token`). if last_membership_change_in_from_to_range.membership == Membership.LEAVE: - # 2) Mark this room as `newly_left` + # 1) Mark this room as `newly_left` + newly_left_room_map[room_id] = ( + last_membership_change_in_from_to_range.event_pos + ) - # If we're seeing a membership change here, we should expect to already - # have it in our snapshot but if a state reset happens, it wouldn't have - # shown up in our snapshot but appear as a change here. - existing_sync_entry = sync_room_id_set.get(room_id) - if existing_sync_entry is not None: - # Normal expected case - sync_room_id_set[room_id] = existing_sync_entry.copy_and_replace( - newly_left=True - ) - else: - # State reset! 
- logger.warn( - "State reset detected for room_id %s with %s who is no longer in the room", - room_id, - user_id, - ) - # Even though a state reset happened which removed the person from - # the room, we still add it the list so the user knows they left the - # room. Downstream code can check for a state reset by looking for - # `event_id=None and membership is not None`. - sync_room_id_set[room_id] = _RoomMembershipForUser( - room_id=room_id, - event_id=last_membership_change_in_from_to_range.event_id, - event_pos=last_membership_change_in_from_to_range.event_pos, - membership=last_membership_change_in_from_to_range.membership, - sender=last_membership_change_in_from_to_range.sender, - newly_joined=False, - newly_left=True, - is_dm=False, - ) - - # 3) Figure out `newly_joined` + # 2) Figure out `newly_joined` for room_id in possibly_newly_joined_room_ids: has_non_join_in_from_to_range = ( has_non_join_event_by_room_id_in_from_to_range.get(room_id, False) @@ -706,9 +1109,7 @@ class SlidingSyncRoomLists: # also some non-join in the range, we know they `newly_joined`. if has_non_join_in_from_to_range: # We found a `newly_joined` room (we left and joined within the token range) - sync_room_id_set[room_id] = sync_room_id_set[room_id].copy_and_replace( - newly_joined=True - ) + newly_joined_room_ids.add(room_id) else: prev_event_id = first_membership_change_by_room_id_in_from_to_range[ room_id @@ -720,20 +1121,23 @@ class SlidingSyncRoomLists: if prev_event_id is None: # We found a `newly_joined` room (we are joining the room for the # first time within the token range) - sync_room_id_set[room_id] = sync_room_id_set[ - room_id - ].copy_and_replace(newly_joined=True) + newly_joined_room_ids.add(room_id) # Last resort, we need to step back to the previous membership event # just before the token range to see if we're joined then or not. 
elif prev_membership != Membership.JOIN: # We found a `newly_joined` room (we left before the token range # and joined within the token range) - sync_room_id_set[room_id] = sync_room_id_set[ - room_id - ].copy_and_replace(newly_joined=True) + newly_joined_room_ids.add(room_id) + + return newly_joined_room_ids, newly_left_room_map + + @trace + async def _get_dm_rooms_for_user( + self, + user_id: str, + ) -> AbstractSet[str]: + """Get the set of DM rooms for the user.""" - # 4) Figure out which rooms the user considers to be direct-message (DM) rooms - # # We're using global account data (`m.direct`) instead of checking for # `is_direct` on membership events because that property only appears for # the invitee membership event (doesn't show up for the inviter). @@ -755,13 +1159,7 @@ class SlidingSyncRoomLists: if isinstance(room_id, str): dm_room_id_set.add(room_id) - # 4) Fixup - for room_id in sync_room_id_set: - sync_room_id_set[room_id] = sync_room_id_set[room_id].copy_and_replace( - is_dm=room_id in dm_room_id_set - ) - - return sync_room_id_set + return dm_room_id_set @trace async def filter_rooms_relevant_for_sync( @@ -803,6 +1201,7 @@ class SlidingSyncRoomLists: if filter_membership_for_sync( user_id=user_id, room_membership_for_user=room_membership_for_user, + newly_left=room_membership_for_user.newly_left, ) } @@ -1295,6 +1694,174 @@ class SlidingSyncRoomLists: # Assemble a new sync room map but only with the `filtered_room_id_set` return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set} + @trace + async def filter_rooms_using_tables( + self, + user_id: str, + sync_room_map: Mapping[str, RoomsForUserSlidingSync], + filters: SlidingSyncConfig.SlidingSyncList.Filters, + to_token: StreamToken, + dm_room_ids: AbstractSet[str], + ) -> Dict[str, RoomsForUserSlidingSync]: + """ + Filter rooms based on the sync request. 
+ + Args: + user_id: User to filter rooms for + sync_room_map: Dictionary of room IDs to filter along with membership + information in the room at the time of `to_token`. + filters: Filters to apply + to_token: We filter based on the state of the room at this token + dm_room_ids: Set of room IDs which are DMs + + Returns: + A filtered dictionary of room IDs along with membership information in the + room at the time of `to_token`. + """ + + filtered_room_id_set = set(sync_room_map.keys()) + + # Filter for Direct-Message (DM) rooms + if filters.is_dm is not None: + with start_active_span("filters.is_dm"): + if filters.is_dm: + # Intersect with the DM room set + filtered_room_id_set &= dm_room_ids + else: + # Remove DMs + filtered_room_id_set -= dm_room_ids + + if filters.spaces is not None: + with start_active_span("filters.spaces"): + raise NotImplementedError() + + # Filter for encrypted rooms + if filters.is_encrypted is not None: + filtered_room_id_set = { + room_id + for room_id in filtered_room_id_set + if sync_room_map[room_id].is_encrypted == filters.is_encrypted + } + + # Filter for rooms that the user has been invited to + if filters.is_invite is not None: + with start_active_span("filters.is_invite"): + # Make a copy so we don't run into an error: `Set changed size during + # iteration`, when we filter out and remove items + for room_id in filtered_room_id_set.copy(): + room_for_user = sync_room_map[room_id] + # If we're looking for invite rooms, filter out rooms that the user is + # not invited to and vice versa + if ( + filters.is_invite + and room_for_user.membership != Membership.INVITE + ) or ( + not filters.is_invite + and room_for_user.membership == Membership.INVITE + ): + filtered_room_id_set.remove(room_id) + + # Filter by room type (space vs room, etc). A room must match one of the types + # provided in the list. `None` is a valid type for rooms which do not have a + # room type.
+ if filters.room_types is not None or filters.not_room_types is not None: + with start_active_span("filters.room_types"): + # Make a copy so we don't run into an error: `Set changed size during + # iteration`, when we filter out and remove items + for room_id in filtered_room_id_set.copy(): + room_type = sync_room_map[room_id].room_type + + if ( + filters.room_types is not None + and room_type not in filters.room_types + ): + filtered_room_id_set.remove(room_id) + + if ( + filters.not_room_types is not None + and room_type in filters.not_room_types + ): + filtered_room_id_set.discard(room_id) # no-op if the `room_types` check above already removed it + + if filters.room_name_like is not None: + with start_active_span("filters.room_name_like"): + # TODO: The room name is a bit more sensitive to leak than the + # create/encryption event. Maybe we should consider a better way to fetch + # historical state before implementing this. + # + # room_id_to_create_content = await self._bulk_get_partial_current_state_content_for_rooms( + # content_type="room_name", + # room_ids=filtered_room_id_set, + # to_token=to_token, + # sync_room_map=sync_room_map, + # room_id_to_stripped_state_map=room_id_to_stripped_state_map, + # ) + raise NotImplementedError() + + if filters.tags is not None or filters.not_tags is not None: + with start_active_span("filters.tags"): + raise NotImplementedError() + + # Assemble a new sync room map but only with the `filtered_room_id_set` + return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set} + + @trace + async def sort_rooms_using_tables( + self, + sync_room_map: Mapping[str, RoomsForUserSlidingSync], + to_token: StreamToken, + ) -> List[RoomsForUserSlidingSync]: + """ + Sort by `stream_ordering` of the last event that the user should see in the + room. `stream_ordering` is unique so we get a stable sort. + + Args: + sync_room_map: Dictionary of room IDs to sort along with membership + information in the room at the time of `to_token`.
+ to_token: We sort based on the events in the room at this token (<= `to_token`) + + Returns: + A sorted list of room IDs by `stream_ordering` along with membership information. + """ + + # Assemble a map of room ID to the `stream_ordering` of the last activity that the + # user should see in the room (<= `to_token`) + last_activity_in_room_map: Dict[str, int] = {} + + for room_id, room_for_user in sync_room_map.items(): + if room_for_user.membership != Membership.JOIN: + # If the user has left/been invited/knocked/been banned from a + # room, they shouldn't see anything past that point. + # + # FIXME: It's possible that people should see beyond this point + # in invited/knocked cases if for example the room has + # `invite`/`world_readable` history visibility, see + # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932 + last_activity_in_room_map[room_id] = room_for_user.event_pos.stream + + # For fully-joined rooms, we find the latest activity at/before the + # `to_token`. 
+ joined_room_positions = ( + await self.store.bulk_get_last_event_pos_in_room_before_stream_ordering( + [ + room_id + for room_id, room_for_user in sync_room_map.items() + if room_for_user.membership == Membership.JOIN + ], + to_token.room_key, + ) + ) + + last_activity_in_room_map.update(joined_room_positions) + + return sorted( + sync_room_map.values(), + # Sort by the last activity (stream_ordering) in the room + key=lambda room_info: last_activity_in_room_map[room_info.room_id], + # We want descending order + reverse=True, + ) + @trace async def sort_rooms( self, @@ -1351,3 +1918,46 @@ class SlidingSyncRoomLists: # We want descending order reverse=True, ) + + async def get_is_encrypted_for_room_at_token( + self, room_id: str, to_token: RoomStreamToken + ) -> bool: + """Get if the room is encrypted at the time.""" + + # Fetch the current encryption state + state_ids = await self.store.get_partial_filtered_current_state_ids( + room_id, StateFilter.from_types([(EventTypes.RoomEncryption, "")]) + ) + encryption_event_id = state_ids.get((EventTypes.RoomEncryption, "")) + + # Now roll back the state by looking at the state deltas between + # to_token and now. + deltas = await self.store.get_current_state_deltas_for_room( + room_id, + from_token=to_token, + to_token=self.store.get_room_max_token(), + ) + + for delta in deltas: + if delta.event_type != EventTypes.RoomEncryption: + continue + + # Found the first change, we look at the previous event ID to get + # the state at the to token. 
+ + if delta.prev_event_id is None: + # There is no prev event, so no encryption state event, so room is not encrypted + return False + + encryption_event_id = delta.prev_event_id + break + + # We didn't find an encryption state, room isn't encrypted + if encryption_event_id is None: + return False + + # We found encryption state, check if content has a non-null algorithm + encrypted_event = await self.store.get_event(encryption_event_id) + algorithm = encrypted_event.content.get(EventContentFields.ENCRYPTION_ALGORITHM) + + return algorithm is not None diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 1ac85ad66d..d22160b85c 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -126,6 +126,9 @@ class SQLBaseStore(metaclass=ABCMeta): self._attempt_to_invalidate_cache( "_get_rooms_for_local_user_where_membership_is_inner", (user_id,) ) + self._attempt_to_invalidate_cache( + "get_sliding_sync_rooms_for_user", (user_id,) + ) # Purge other caches based on room state. 
self._attempt_to_invalidate_cache("get_room_summary", (room_id,)) @@ -160,6 +163,7 @@ class SQLBaseStore(metaclass=ABCMeta): self._attempt_to_invalidate_cache("get_room_summary", (room_id,)) self._attempt_to_invalidate_cache("get_room_type", (room_id,)) self._attempt_to_invalidate_cache("get_room_encryption", (room_id,)) + self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None) def _attempt_to_invalidate_cache( self, cache_name: str, key: Optional[Collection[Any]] diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index f473294070..efe4238036 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -44,7 +44,7 @@ from synapse._pydantic_compat import HAS_PYDANTIC_V2 from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.engines import PostgresEngine from synapse.storage.types import Connection, Cursor -from synapse.types import JsonDict +from synapse.types import JsonDict, StrCollection from synapse.util import Clock, json_encoder from . import engines @@ -487,6 +487,25 @@ class BackgroundUpdater: return not update_exists + async def have_completed_background_updates( + self, update_names: StrCollection + ) -> bool: + """Return whether all of the given background updates have + completed""" + if self._all_done: + return True + + rows = await self.db_pool.simple_select_many_batch( + table="background_updates", + column="update_name", + iterable=update_names, + retcols=("update_name",), + desc="get_uncompleted_background_updates", + ) + + # If we find any rows then we've not completed the update.
+ return not bool(rows) + async def do_next_background_update(self, sleep: bool = True) -> bool: """Does some amount of work on the next queued background update diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index cb23f433bc..62315c2d96 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -2318,6 +2318,17 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS return len(memberships_to_update_rows) + async def have_finished_sliding_sync_background_jobs(self) -> bool: + """Return whether it's safe to use the sliding sync membership tables.""" + + return await self.db_pool.updates.have_completed_background_updates( + ( + _BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE, + _BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE, + _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE, + ) + ) + def _resolve_stale_data_in_sliding_sync_tables( txn: LoggingTransaction, diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 722686d4b8..57b9b95c28 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -53,7 +53,12 @@ from synapse.storage.database import ( from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.engines import Sqlite3Engine -from synapse.storage.roommember import MemberSummary, ProfileInfo, RoomsForUser +from synapse.storage.roommember import ( + MemberSummary, + ProfileInfo, + RoomsForUser, + RoomsForUserSlidingSync, +) from synapse.types import ( JsonDict, PersistedEventPosition, @@ -1377,6 +1382,54 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): desc="room_forgetter_stream_pos", ) +
@cached(iterable=True, max_entries=10000) + async def get_sliding_sync_rooms_for_user( + self, + user_id: str, + ) -> Mapping[str, RoomsForUserSlidingSync]: + """Get all the rooms for a user to handle a sliding sync request. + + Ignores forgotten rooms; the query applies no other membership filter. + + Returns: + Map from room ID to membership info + """ + + def get_sliding_sync_rooms_for_user_txn( + txn: LoggingTransaction, + ) -> Dict[str, RoomsForUserSlidingSync]: + sql = """ + SELECT m.room_id, m.sender, m.membership, m.membership_event_id, + r.room_version, + m.event_instance_name, m.event_stream_ordering, + COALESCE(j.room_type, m.room_type), + COALESCE(j.is_encrypted, m.is_encrypted) + FROM sliding_sync_membership_snapshots AS m + INNER JOIN rooms AS r USING (room_id) + LEFT JOIN sliding_sync_joined_rooms AS j ON (j.room_id = m.room_id AND m.membership = 'join') + WHERE user_id = ? + AND m.forgotten = 0 + """ + txn.execute(sql, (user_id,)) + return { + row[0]: RoomsForUserSlidingSync( + room_id=row[0], + sender=row[1], + membership=row[2], + event_id=row[3], + room_version_id=row[4], + event_pos=PersistedEventPosition(row[5], row[6]), + room_type=row[7], + is_encrypted=bool(row[8]), + ) + for row in txn + } + + return await self.db_pool.runInteraction( + "get_sliding_sync_rooms_for_user", + get_sliding_sync_rooms_for_user_txn, + ) + class RoomMemberBackgroundUpdateStore(SQLBaseStore): def __init__( diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 80c9630867..09213627ec 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -39,6 +39,19 @@ class RoomsForUser: room_version_id: str +@attr.s(slots=True, frozen=True, weakref_slot=False, auto_attribs=True) +class RoomsForUserSlidingSync: + room_id: str + sender: Optional[str] + membership: str + event_id: Optional[str] + event_pos: PersistedEventPosition + room_version_id: str + + room_type: Optional[str] + is_encrypted: bool + + @attr.s(slots=True, frozen=True,
weakref_slot=False, auto_attribs=True) class GetRoomsForUserWithStreamOrdering: room_id: str diff --git a/tests/rest/client/sliding_sync/test_connection_tracking.py b/tests/rest/client/sliding_sync/test_connection_tracking.py index 6863c32f7c..436bd4466c 100644 --- a/tests/rest/client/sliding_sync/test_connection_tracking.py +++ b/tests/rest/client/sliding_sync/test_connection_tracking.py @@ -13,7 +13,7 @@ # import logging -from parameterized import parameterized +from parameterized import parameterized, parameterized_class from twisted.test.proto_helpers import MemoryReactor @@ -28,6 +28,18 @@ from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase logger = logging.getLogger(__name__) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncConnectionTrackingTestCase(SlidingSyncBase): """ Test connection tracking in the Sliding Sync API. @@ -44,6 +56,8 @@ class SlidingSyncConnectionTrackingTestCase(SlidingSyncBase): self.store = hs.get_datastores().main self.storage_controllers = hs.get_storage_controllers() + super().prepare(reactor, clock, hs) + def test_rooms_required_state_incremental_sync_LIVE(self) -> None: """Test that we only get state updates in incremental sync for rooms we've already seen (LIVE). 
diff --git a/tests/rest/client/sliding_sync/test_extension_account_data.py b/tests/rest/client/sliding_sync/test_extension_account_data.py index 3482a5f887..65a6adf4af 100644 --- a/tests/rest/client/sliding_sync/test_extension_account_data.py +++ b/tests/rest/client/sliding_sync/test_extension_account_data.py @@ -13,6 +13,8 @@ # import logging +from parameterized import parameterized_class + from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin @@ -28,6 +30,18 @@ from tests.server import TimedOutException logger = logging.getLogger(__name__) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase): """Tests for the account_data sliding sync extension""" @@ -43,6 +57,8 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase): self.store = hs.get_datastores().main self.account_data_handler = hs.get_account_data_handler() + super().prepare(reactor, clock, hs) + def test_no_data_initial_sync(self) -> None: """ Test that enabling the account_data extension works during an intitial sync, diff --git a/tests/rest/client/sliding_sync/test_extension_e2ee.py b/tests/rest/client/sliding_sync/test_extension_e2ee.py index 320f8c788f..2ff6687796 100644 --- a/tests/rest/client/sliding_sync/test_extension_e2ee.py +++ b/tests/rest/client/sliding_sync/test_extension_e2ee.py @@ -13,6 +13,8 @@ # import logging +from parameterized import parameterized_class + from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin @@ -27,6 +29,18 @@ from tests.server import TimedOutException logger = 
logging.getLogger(__name__) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncE2eeExtensionTestCase(SlidingSyncBase): """Tests for the e2ee sliding sync extension""" @@ -42,6 +56,8 @@ class SlidingSyncE2eeExtensionTestCase(SlidingSyncBase): self.store = hs.get_datastores().main self.e2e_keys_handler = hs.get_e2e_keys_handler() + super().prepare(reactor, clock, hs) + def test_no_data_initial_sync(self) -> None: """ Test that enabling e2ee extension works during an intitial sync, even if there diff --git a/tests/rest/client/sliding_sync/test_extension_receipts.py b/tests/rest/client/sliding_sync/test_extension_receipts.py index e842349ed2..90b035dd75 100644 --- a/tests/rest/client/sliding_sync/test_extension_receipts.py +++ b/tests/rest/client/sliding_sync/test_extension_receipts.py @@ -13,6 +13,8 @@ # import logging +from parameterized import parameterized_class + from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin @@ -28,6 +30,18 @@ from tests.server import TimedOutException logger = logging.getLogger(__name__) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase): """Tests for the receipts 
sliding sync extension""" @@ -42,6 +56,8 @@ class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase): def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = hs.get_datastores().main + super().prepare(reactor, clock, hs) + def test_no_data_initial_sync(self) -> None: """ Test that enabling the receipts extension works during an intitial sync, diff --git a/tests/rest/client/sliding_sync/test_extension_to_device.py b/tests/rest/client/sliding_sync/test_extension_to_device.py index f8500812ea..5ba2443089 100644 --- a/tests/rest/client/sliding_sync/test_extension_to_device.py +++ b/tests/rest/client/sliding_sync/test_extension_to_device.py @@ -14,6 +14,8 @@ import logging from typing import List +from parameterized import parameterized_class + from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin @@ -28,6 +30,18 @@ from tests.server import TimedOutException logger = logging.getLogger(__name__) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase): """Tests for the to-device sliding sync extension""" @@ -40,6 +54,7 @@ class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase): def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = hs.get_datastores().main + super().prepare(reactor, clock, hs) def _assert_to_device_response( self, response_body: JsonDict, expected_messages: List[JsonDict] diff --git a/tests/rest/client/sliding_sync/test_extension_typing.py b/tests/rest/client/sliding_sync/test_extension_typing.py index 
7f523e0f10..0a0f5aff1a 100644 --- a/tests/rest/client/sliding_sync/test_extension_typing.py +++ b/tests/rest/client/sliding_sync/test_extension_typing.py @@ -13,6 +13,8 @@ # import logging +from parameterized import parameterized_class + from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin @@ -28,6 +30,18 @@ from tests.server import TimedOutException logger = logging.getLogger(__name__) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncTypingExtensionTestCase(SlidingSyncBase): """Tests for the typing notification sliding sync extension""" @@ -41,6 +55,8 @@ class SlidingSyncTypingExtensionTestCase(SlidingSyncBase): def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = hs.get_datastores().main + super().prepare(reactor, clock, hs) + def test_no_data_initial_sync(self) -> None: """ Test that enabling the typing extension works during an intitial sync, diff --git a/tests/rest/client/sliding_sync/test_extensions.py b/tests/rest/client/sliding_sync/test_extensions.py index ae823d5415..32478467aa 100644 --- a/tests/rest/client/sliding_sync/test_extensions.py +++ b/tests/rest/client/sliding_sync/test_extensions.py @@ -14,7 +14,7 @@ import logging from typing import Literal -from parameterized import parameterized +from parameterized import parameterized, parameterized_class from typing_extensions import assert_never from twisted.test.proto_helpers import MemoryReactor @@ -30,6 +30,18 @@ from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase logger = logging.getLogger(__name__) +# 
FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncExtensionsTestCase(SlidingSyncBase): """ Test general extensions behavior in the Sliding Sync API. Each extension has their @@ -49,6 +61,8 @@ class SlidingSyncExtensionsTestCase(SlidingSyncBase): self.storage_controllers = hs.get_storage_controllers() self.account_data_handler = hs.get_account_data_handler() + super().prepare(reactor, clock, hs) + # Any extensions that use `lists`/`rooms` should be tested here @parameterized.expand([("account_data",), ("receipts",), ("typing",)]) def test_extensions_lists_rooms_relevant_rooms( diff --git a/tests/rest/client/sliding_sync/test_room_subscriptions.py b/tests/rest/client/sliding_sync/test_room_subscriptions.py index cc17b0b354..e81d251839 100644 --- a/tests/rest/client/sliding_sync/test_room_subscriptions.py +++ b/tests/rest/client/sliding_sync/test_room_subscriptions.py @@ -14,6 +14,8 @@ import logging from http import HTTPStatus +from parameterized import parameterized_class + from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin @@ -27,6 +29,18 @@ from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase logger = logging.getLogger(__name__) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if 
params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncRoomSubscriptionsTestCase(SlidingSyncBase): """ Test `room_subscriptions` in the Sliding Sync API. @@ -43,6 +57,8 @@ class SlidingSyncRoomSubscriptionsTestCase(SlidingSyncBase): self.store = hs.get_datastores().main self.storage_controllers = hs.get_storage_controllers() + super().prepare(reactor, clock, hs) + def test_room_subscriptions_with_join_membership(self) -> None: """ Test `room_subscriptions` with a joined room should give us timeline and current diff --git a/tests/rest/client/sliding_sync/test_rooms_invites.py b/tests/rest/client/sliding_sync/test_rooms_invites.py index f08ffaf674..f6f45c2500 100644 --- a/tests/rest/client/sliding_sync/test_rooms_invites.py +++ b/tests/rest/client/sliding_sync/test_rooms_invites.py @@ -13,6 +13,8 @@ # import logging +from parameterized import parameterized_class + from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin @@ -27,6 +29,18 @@ from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase logger = logging.getLogger(__name__) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncRoomsInvitesTestCase(SlidingSyncBase): """ Test to make sure the `rooms` response looks good for invites in the Sliding Sync API. 
@@ -49,6 +63,8 @@ class SlidingSyncRoomsInvitesTestCase(SlidingSyncBase): self.store = hs.get_datastores().main self.storage_controllers = hs.get_storage_controllers() + super().prepare(reactor, clock, hs) + def test_rooms_invite_shared_history_initial_sync(self) -> None: """ Test that `rooms` we are invited to have some stripped `invite_state` during an diff --git a/tests/rest/client/sliding_sync/test_rooms_meta.py b/tests/rest/client/sliding_sync/test_rooms_meta.py index 690912133a..71542923da 100644 --- a/tests/rest/client/sliding_sync/test_rooms_meta.py +++ b/tests/rest/client/sliding_sync/test_rooms_meta.py @@ -13,6 +13,8 @@ # import logging +from parameterized import parameterized_class + from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin @@ -28,6 +30,18 @@ from tests.test_utils.event_injection import create_event logger = logging.getLogger(__name__) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncRoomsMetaTestCase(SlidingSyncBase): """ Test rooms meta info like name, avatar, joined_count, invited_count, is_dm, @@ -49,6 +63,8 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase): assert persistence is not None self.persistence = persistence + super().prepare(reactor, clock, hs) + def test_rooms_meta_when_joined(self) -> None: """ Test that the `rooms` `name` and `avatar` are included in the response and diff --git a/tests/rest/client/sliding_sync/test_rooms_required_state.py b/tests/rest/client/sliding_sync/test_rooms_required_state.py index 498c921cbd..436ae684da 100644 --- 
a/tests/rest/client/sliding_sync/test_rooms_required_state.py +++ b/tests/rest/client/sliding_sync/test_rooms_required_state.py @@ -13,7 +13,7 @@ # import logging -from parameterized import parameterized +from parameterized import parameterized, parameterized_class from twisted.test.proto_helpers import MemoryReactor @@ -30,6 +30,18 @@ from tests.test_utils.event_injection import mark_event_as_partial_state logger = logging.getLogger(__name__) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase): """ Test `rooms.required_state` in the Sliding Sync API. 
@@ -46,6 +58,8 @@ class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase): self.store = hs.get_datastores().main self.storage_controllers = hs.get_storage_controllers() + super().prepare(reactor, clock, hs) + def test_rooms_no_required_state(self) -> None: """ Empty `rooms.required_state` should not return any state events in the room diff --git a/tests/rest/client/sliding_sync/test_rooms_timeline.py b/tests/rest/client/sliding_sync/test_rooms_timeline.py index eeac0d6aa9..e56fb58012 100644 --- a/tests/rest/client/sliding_sync/test_rooms_timeline.py +++ b/tests/rest/client/sliding_sync/test_rooms_timeline.py @@ -14,6 +14,8 @@ import logging from typing import List, Optional +from parameterized import parameterized_class + from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin @@ -28,6 +30,18 @@ from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase logger = logging.getLogger(__name__) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncRoomsTimelineTestCase(SlidingSyncBase): """ Test `rooms.timeline` in the Sliding Sync API. 
@@ -44,6 +58,8 @@ class SlidingSyncRoomsTimelineTestCase(SlidingSyncBase): self.store = hs.get_datastores().main self.storage_controllers = hs.get_storage_controllers() + super().prepare(reactor, clock, hs) + def _assertListEqual( self, actual_items: StrSequence, diff --git a/tests/rest/client/sliding_sync/test_sliding_sync.py b/tests/rest/client/sliding_sync/test_sliding_sync.py index cb7638c5ba..2c58b38f9f 100644 --- a/tests/rest/client/sliding_sync/test_sliding_sync.py +++ b/tests/rest/client/sliding_sync/test_sliding_sync.py @@ -13,7 +13,9 @@ # import logging from typing import Any, Dict, Iterable, List, Literal, Optional, Tuple +from unittest.mock import AsyncMock +from parameterized import parameterized_class from typing_extensions import assert_never from twisted.test.proto_helpers import MemoryReactor @@ -47,8 +49,16 @@ logger = logging.getLogger(__name__) class SlidingSyncBase(unittest.HomeserverTestCase): """Base class for sliding sync test cases""" + # Flag as to whether to use the new sliding sync tables or not + use_new_tables: bool = True + sync_endpoint = "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync" + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + hs.get_datastores().main.have_finished_sliding_sync_background_jobs = AsyncMock( # type: ignore[method-assign] + return_value=self.use_new_tables + ) + def default_config(self) -> JsonDict: config = super().default_config() # Enable sliding sync @@ -203,6 +213,18 @@ class SlidingSyncBase(unittest.HomeserverTestCase): ) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) 
class SlidingSyncTestCase(SlidingSyncBase): """ Tests regarding MSC3575 Sliding Sync `/sync` endpoint. @@ -226,6 +248,8 @@ class SlidingSyncTestCase(SlidingSyncBase): self.storage_controllers = hs.get_storage_controllers() self.account_data_handler = hs.get_account_data_handler() + super().prepare(reactor, clock, hs) + def _add_new_dm_to_global_account_data( self, source_user_id: str, target_user_id: str, target_room_id: str ) -> None: