From 5066cbc6f6b385a985bdd55223dd089ad28b821a Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 8 Oct 2024 01:52:26 -0500 Subject: [PATCH] Get lazy loaded members on incremental sync --- synapse/handlers/sliding_sync/__init__.py | 105 ++++++++++++++++++---- 1 file changed, 89 insertions(+), 16 deletions(-) diff --git a/synapse/handlers/sliding_sync/__init__.py b/synapse/handlers/sliding_sync/__init__.py index 09e8b6719b..6c39dfd523 100644 --- a/synapse/handlers/sliding_sync/__init__.py +++ b/synapse/handlers/sliding_sync/__init__.py @@ -869,6 +869,15 @@ class SlidingSyncHandler: # # Calculate the `StateFilter` based on the `required_state` for the room required_state_filter = StateFilter.none() + added_membership_state_filter = StateFilter.none() + # The requested `required_state_map` with the any lazy membership expanded and + # `$ME` replaced with the user's ID. This allows us to see what membership we've + # sent down to the client in the next request. + # + # Make a copy so we can modify it. Still need to be careful to make a copy of + # the state key sets if we want to add/remove from them. We could make a deep + # copy but this saves us some work. + expanded_required_state_map = dict(room_sync_config.required_state_map) if room_membership_for_user_at_to_token.membership not in ( Membership.INVITE, Membership.KNOCK, @@ -933,22 +942,74 @@ class SlidingSyncHandler: and state_key == StateValues.LAZY ): lazy_load_room_members = True + # Everyone in the timeline is relevant + # + # FIXME: We probably also care about invite, ban, kick, targets, etc + # but the spec only mentions "senders". timeline_membership: Set[str] = set() if timeline_events is not None: for timeline_event in timeline_events: timeline_membership.add(timeline_event.sender) + # Add an explicit entry for each user in the timeline + expanded_required_state_map[EventTypes.Member] = ( + # Make a copy of the state key set so we can modify it + # without affecting the original `required_state_map` + set( + expanded_required_state_map.get( + EventTypes.Member, set() + ) + ).union(timeline_membership) + ) + + # TODO for user_id in timeline_membership: required_state_types.append( (EventTypes.Member, user_id) ) - # FIXME: We probably also care about invite, ban, kick, targets, etc - # but the spec only mentions "senders". + # TODO + if prev_room_sync_config is not None: + previous_memberships_given_to_client = ( + prev_room_sync_config.required_state_map.get( + EventTypes.Member, set() + ) + ) + + # Find what new memberships we need to send down + added_membership_user_ids: List[str] = [] + for user_id in ( + timeline_membership + - previous_memberships_given_to_client + ): + added_membership_user_ids.append(user_id) + + if added_membership_user_ids: + added_membership_state_filter = ( + StateFilter.from_types( + [ + (EventTypes.Member, user_id) + for user_id in added_membership_user_ids + ] + ) + ) elif state_key == StateValues.ME: num_others += 1 required_state_types.append((state_type, user.to_string())) + # Replace `$ME` with the user's ID so we can deduplicate + # when someone requests the same state with `$ME` or with + # their user ID. + # without affecting the original `required_state_map` + expanded_required_state_map[EventTypes.Member] = ( + # Make a copy of the state key set so we can modify it + # without affecting the original `required_state_map` + set( + expanded_required_state_map.get( + EventTypes.Member, set() + ) + ).union({user.to_string()}) + ) else: num_others += 1 required_state_types.append((state_type, state_key)) @@ -1006,15 +1067,27 @@ class SlidingSyncHandler: else: assert from_bound is not None + # If we're lazy-loading membership, we need to fetch the current state for + # the new members we haven't seen before in the timeline. If we don't do + # this we'd only send down membership when it changes. + if added_membership_state_filter != StateFilter.none(): + state_ids = await self.get_current_state_ids_at( + room_id=room_id, + room_membership_for_user_at_to_token=room_membership_for_user_at_to_token, + state_filter=added_membership_state_filter, + to_token=to_token, + ) + room_state_delta_id_map.update(state_ids) + if prev_room_sync_config is not None: # Check if there are any changes to the required state config # that we need to handle. changed_required_state_map, added_state_filter = ( _required_state_changes( user.to_string(), - prev_room_sync_config, - room_sync_config, - room_state_delta_id_map, + prev_required_state_map=prev_room_sync_config.required_state_map, + request_required_state_map=expanded_required_state_map, + state_deltas=room_state_delta_id_map, ) ) @@ -1107,13 +1180,13 @@ class SlidingSyncHandler: # sensible order again. bump_stamp = 0 - unstable_expanded_timeline = False - # Record the `room_sync_config` if we're `ignore_timeline_bound` (which means - # that the `timeline_limit` has increased) - room_sync_required_state_map_to_persist = room_sync_config.required_state_map + room_sync_required_state_map_to_persist = expanded_required_state_map if changed_required_state_map: room_sync_required_state_map_to_persist = changed_required_state_map + # Record the `room_sync_config` if we're `ignore_timeline_bound` (which means + # that the `timeline_limit` has increased) + unstable_expanded_timeline = False if ignore_timeline_bound: # FIXME: We signal the fact that we're sending down more events to # the client by setting `unstable_expanded_timeline` to true (see @@ -1161,7 +1234,10 @@ class SlidingSyncHandler: ) else: - new_connection_state.room_configs[room_id] = room_sync_config + new_connection_state.room_configs[room_id] = RoomSyncConfig( + timeline_limit=room_sync_config.timeline_limit, + required_state_map=room_sync_required_state_map_to_persist, + ) set_tag(SynapseTags.RESULT_PREFIX + "initial", initial) @@ -1281,8 +1357,9 @@ class SlidingSyncHandler: def _required_state_changes( user_id: str, - previous_room_config: "RoomSyncConfig", - room_sync_config: RoomSyncConfig, + *, + prev_required_state_map: Mapping[str, AbstractSet[str]], + request_required_state_map: Mapping[str, AbstractSet[str]], state_deltas: StateMap[str], ) -> Tuple[Optional[Mapping[str, AbstractSet[str]]], StateFilter]: """Calculates the changes between the required state room config from the @@ -1302,10 +1379,6 @@ def _required_state_changes( A 2-tuple of updated required state config and the state filter to use to fetch extra current state that we need to return. """ - - prev_required_state_map = previous_room_config.required_state_map - request_required_state_map = room_sync_config.required_state_map - if prev_required_state_map == request_required_state_map: # There has been no change. Return immediately. return None, StateFilter.none()