mirror of
https://github.com/element-hq/synapse.git
synced 2024-12-18 17:10:43 +03:00
Get lazy loaded members on incremental sync
This commit is contained in:
parent
3b234b53d5
commit
5066cbc6f6
1 changed files with 89 additions and 16 deletions
|
@ -869,6 +869,15 @@ class SlidingSyncHandler:
|
||||||
#
|
#
|
||||||
# Calculate the `StateFilter` based on the `required_state` for the room
|
# Calculate the `StateFilter` based on the `required_state` for the room
|
||||||
required_state_filter = StateFilter.none()
|
required_state_filter = StateFilter.none()
|
||||||
|
added_membership_state_filter = StateFilter.none()
|
||||||
|
# The requested `required_state_map` with the any lazy membership expanded and
|
||||||
|
# `$ME` replaced with the user's ID. This allows us to see what membership we've
|
||||||
|
# sent down to the client in the next request.
|
||||||
|
#
|
||||||
|
# Make a copy so we can modify it. Still need to be careful to make a copy of
|
||||||
|
# the state key sets if we want to add/remove from them. We could make a deep
|
||||||
|
# copy but this saves us some work.
|
||||||
|
expanded_required_state_map = dict(room_sync_config.required_state_map)
|
||||||
if room_membership_for_user_at_to_token.membership not in (
|
if room_membership_for_user_at_to_token.membership not in (
|
||||||
Membership.INVITE,
|
Membership.INVITE,
|
||||||
Membership.KNOCK,
|
Membership.KNOCK,
|
||||||
|
@ -933,22 +942,74 @@ class SlidingSyncHandler:
|
||||||
and state_key == StateValues.LAZY
|
and state_key == StateValues.LAZY
|
||||||
):
|
):
|
||||||
lazy_load_room_members = True
|
lazy_load_room_members = True
|
||||||
|
|
||||||
# Everyone in the timeline is relevant
|
# Everyone in the timeline is relevant
|
||||||
|
#
|
||||||
|
# FIXME: We probably also care about invite, ban, kick, targets, etc
|
||||||
|
# but the spec only mentions "senders".
|
||||||
timeline_membership: Set[str] = set()
|
timeline_membership: Set[str] = set()
|
||||||
if timeline_events is not None:
|
if timeline_events is not None:
|
||||||
for timeline_event in timeline_events:
|
for timeline_event in timeline_events:
|
||||||
timeline_membership.add(timeline_event.sender)
|
timeline_membership.add(timeline_event.sender)
|
||||||
|
|
||||||
|
# Add an explicit entry for each user in the timeline
|
||||||
|
expanded_required_state_map[EventTypes.Member] = (
|
||||||
|
# Make a copy of the state key set so we can modify it
|
||||||
|
# without affecting the original `required_state_map`
|
||||||
|
set(
|
||||||
|
expanded_required_state_map.get(
|
||||||
|
EventTypes.Member, set()
|
||||||
|
)
|
||||||
|
).union(timeline_membership)
|
||||||
|
)
|
||||||
|
|
||||||
|
# TODO
|
||||||
for user_id in timeline_membership:
|
for user_id in timeline_membership:
|
||||||
required_state_types.append(
|
required_state_types.append(
|
||||||
(EventTypes.Member, user_id)
|
(EventTypes.Member, user_id)
|
||||||
)
|
)
|
||||||
|
|
||||||
# FIXME: We probably also care about invite, ban, kick, targets, etc
|
# TODO
|
||||||
# but the spec only mentions "senders".
|
if prev_room_sync_config is not None:
|
||||||
|
previous_memberships_given_to_client = (
|
||||||
|
prev_room_sync_config.required_state_map.get(
|
||||||
|
EventTypes.Member, set()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Find what new memberships we need to send down
|
||||||
|
added_membership_user_ids: List[str] = []
|
||||||
|
for user_id in (
|
||||||
|
timeline_membership
|
||||||
|
- previous_memberships_given_to_client
|
||||||
|
):
|
||||||
|
added_membership_user_ids.append(user_id)
|
||||||
|
|
||||||
|
if added_membership_user_ids:
|
||||||
|
added_membership_state_filter = (
|
||||||
|
StateFilter.from_types(
|
||||||
|
[
|
||||||
|
(EventTypes.Member, user_id)
|
||||||
|
for user_id in added_membership_user_ids
|
||||||
|
]
|
||||||
|
)
|
||||||
|
)
|
||||||
elif state_key == StateValues.ME:
|
elif state_key == StateValues.ME:
|
||||||
num_others += 1
|
num_others += 1
|
||||||
required_state_types.append((state_type, user.to_string()))
|
required_state_types.append((state_type, user.to_string()))
|
||||||
|
# Replace `$ME` with the user's ID so we can deduplicate
|
||||||
|
# when someone requests the same state with `$ME` or with
|
||||||
|
# their user ID.
|
||||||
|
# without affecting the original `required_state_map`
|
||||||
|
expanded_required_state_map[EventTypes.Member] = (
|
||||||
|
# Make a copy of the state key set so we can modify it
|
||||||
|
# without affecting the original `required_state_map`
|
||||||
|
set(
|
||||||
|
expanded_required_state_map.get(
|
||||||
|
EventTypes.Member, set()
|
||||||
|
)
|
||||||
|
).union({user.to_string()})
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
num_others += 1
|
num_others += 1
|
||||||
required_state_types.append((state_type, state_key))
|
required_state_types.append((state_type, state_key))
|
||||||
|
@ -1006,15 +1067,27 @@ class SlidingSyncHandler:
|
||||||
else:
|
else:
|
||||||
assert from_bound is not None
|
assert from_bound is not None
|
||||||
|
|
||||||
|
# If we're lazy-loading membership, we need to fetch the current state for
|
||||||
|
# the new members we haven't seen before in the timeline. If we don't do
|
||||||
|
# this we'd only send down membership when it changes.
|
||||||
|
if added_membership_state_filter != StateFilter.none():
|
||||||
|
state_ids = await self.get_current_state_ids_at(
|
||||||
|
room_id=room_id,
|
||||||
|
room_membership_for_user_at_to_token=room_membership_for_user_at_to_token,
|
||||||
|
state_filter=added_membership_state_filter,
|
||||||
|
to_token=to_token,
|
||||||
|
)
|
||||||
|
room_state_delta_id_map.update(state_ids)
|
||||||
|
|
||||||
if prev_room_sync_config is not None:
|
if prev_room_sync_config is not None:
|
||||||
# Check if there are any changes to the required state config
|
# Check if there are any changes to the required state config
|
||||||
# that we need to handle.
|
# that we need to handle.
|
||||||
changed_required_state_map, added_state_filter = (
|
changed_required_state_map, added_state_filter = (
|
||||||
_required_state_changes(
|
_required_state_changes(
|
||||||
user.to_string(),
|
user.to_string(),
|
||||||
prev_room_sync_config,
|
prev_required_state_map=prev_room_sync_config.required_state_map,
|
||||||
room_sync_config,
|
request_required_state_map=expanded_required_state_map,
|
||||||
room_state_delta_id_map,
|
state_deltas=room_state_delta_id_map,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -1107,13 +1180,13 @@ class SlidingSyncHandler:
|
||||||
# sensible order again.
|
# sensible order again.
|
||||||
bump_stamp = 0
|
bump_stamp = 0
|
||||||
|
|
||||||
unstable_expanded_timeline = False
|
room_sync_required_state_map_to_persist = expanded_required_state_map
|
||||||
# Record the `room_sync_config` if we're `ignore_timeline_bound` (which means
|
|
||||||
# that the `timeline_limit` has increased)
|
|
||||||
room_sync_required_state_map_to_persist = room_sync_config.required_state_map
|
|
||||||
if changed_required_state_map:
|
if changed_required_state_map:
|
||||||
room_sync_required_state_map_to_persist = changed_required_state_map
|
room_sync_required_state_map_to_persist = changed_required_state_map
|
||||||
|
|
||||||
|
# Record the `room_sync_config` if we're `ignore_timeline_bound` (which means
|
||||||
|
# that the `timeline_limit` has increased)
|
||||||
|
unstable_expanded_timeline = False
|
||||||
if ignore_timeline_bound:
|
if ignore_timeline_bound:
|
||||||
# FIXME: We signal the fact that we're sending down more events to
|
# FIXME: We signal the fact that we're sending down more events to
|
||||||
# the client by setting `unstable_expanded_timeline` to true (see
|
# the client by setting `unstable_expanded_timeline` to true (see
|
||||||
|
@ -1161,7 +1234,10 @@ class SlidingSyncHandler:
|
||||||
)
|
)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
new_connection_state.room_configs[room_id] = room_sync_config
|
new_connection_state.room_configs[room_id] = RoomSyncConfig(
|
||||||
|
timeline_limit=room_sync_config.timeline_limit,
|
||||||
|
required_state_map=room_sync_required_state_map_to_persist,
|
||||||
|
)
|
||||||
|
|
||||||
set_tag(SynapseTags.RESULT_PREFIX + "initial", initial)
|
set_tag(SynapseTags.RESULT_PREFIX + "initial", initial)
|
||||||
|
|
||||||
|
@ -1281,8 +1357,9 @@ class SlidingSyncHandler:
|
||||||
|
|
||||||
def _required_state_changes(
|
def _required_state_changes(
|
||||||
user_id: str,
|
user_id: str,
|
||||||
previous_room_config: "RoomSyncConfig",
|
*,
|
||||||
room_sync_config: RoomSyncConfig,
|
prev_required_state_map: Mapping[str, AbstractSet[str]],
|
||||||
|
request_required_state_map: Mapping[str, AbstractSet[str]],
|
||||||
state_deltas: StateMap[str],
|
state_deltas: StateMap[str],
|
||||||
) -> Tuple[Optional[Mapping[str, AbstractSet[str]]], StateFilter]:
|
) -> Tuple[Optional[Mapping[str, AbstractSet[str]]], StateFilter]:
|
||||||
"""Calculates the changes between the required state room config from the
|
"""Calculates the changes between the required state room config from the
|
||||||
|
@ -1302,10 +1379,6 @@ def _required_state_changes(
|
||||||
A 2-tuple of updated required state config and the state filter to use
|
A 2-tuple of updated required state config and the state filter to use
|
||||||
to fetch extra current state that we need to return.
|
to fetch extra current state that we need to return.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
prev_required_state_map = previous_room_config.required_state_map
|
|
||||||
request_required_state_map = room_sync_config.required_state_map
|
|
||||||
|
|
||||||
if prev_required_state_map == request_required_state_map:
|
if prev_required_state_map == request_required_state_map:
|
||||||
# There has been no change. Return immediately.
|
# There has been no change. Return immediately.
|
||||||
return None, StateFilter.none()
|
return None, StateFilter.none()
|
||||||
|
|
Loading…
Reference in a new issue