mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-28 07:00:51 +03:00
Split out fetching of newly joined/left rooms
This commit is contained in:
parent
74bec29c1d
commit
58071bc9e5
1 changed files with 90 additions and 70 deletions
|
@ -25,6 +25,7 @@ from typing import (
|
||||||
Mapping,
|
Mapping,
|
||||||
Optional,
|
Optional,
|
||||||
Set,
|
Set,
|
||||||
|
Tuple,
|
||||||
Union,
|
Union,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -426,16 +427,16 @@ class SlidingSyncRoomLists:
|
||||||
)
|
)
|
||||||
|
|
||||||
@trace
|
@trace
|
||||||
async def _rewind_current_membership_to_token(
|
async def _get_rewind_changes_to_current_membership_to_token(
|
||||||
self,
|
self,
|
||||||
user: UserID,
|
user: UserID,
|
||||||
rooms_for_user: Mapping[str, RoomsForUser],
|
rooms_for_user: Mapping[str, RoomsForUser],
|
||||||
to_token: StreamToken,
|
to_token: StreamToken,
|
||||||
) -> Mapping[str, RoomsForUser]:
|
) -> Mapping[str, Optional[RoomsForUser]]:
|
||||||
"""
|
"""
|
||||||
Takes the current set of rooms for a user (retrieved after the given
|
Takes the current set of rooms for a user (retrieved after the given
|
||||||
token), and "rewinds" it to match the set of memberships *at that
|
token), and returns the changes need to "rewind" it to match the set of
|
||||||
token*.
|
memberships *at that token*.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
user: User to fetch rooms for
|
user: User to fetch rooms for
|
||||||
|
@ -443,7 +444,7 @@ class SlidingSyncRoomLists:
|
||||||
to_token: The token to rewind
|
to_token: The token to rewind
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
The set of memberships for the user at the given token
|
The changes to apply to rewind the the current memberships.
|
||||||
"""
|
"""
|
||||||
# If the user has never joined any rooms before, we can just return an empty list
|
# If the user has never joined any rooms before, we can just return an empty list
|
||||||
if not rooms_for_user:
|
if not rooms_for_user:
|
||||||
|
@ -511,11 +512,11 @@ class SlidingSyncRoomLists:
|
||||||
|
|
||||||
if not current_state_delta_membership_changes_after_to_token:
|
if not current_state_delta_membership_changes_after_to_token:
|
||||||
# There have been no membership changes, so we can early return.
|
# There have been no membership changes, so we can early return.
|
||||||
return rooms_for_user
|
return {}
|
||||||
|
|
||||||
# Otherwise we're about to make changes to `rooms_for_user`, so we turn
|
# Otherwise we're about to make changes to `rooms_for_user`, so we turn
|
||||||
# it into a mutable dict.
|
# it into a mutable dict.
|
||||||
rooms_for_user = dict(rooms_for_user)
|
changes: Dict[str, Optional[RoomsForUser]] = {}
|
||||||
|
|
||||||
# Assemble a list of the first membership event after the `to_token` so we can
|
# Assemble a list of the first membership event after the `to_token` so we can
|
||||||
# step backward to the previous membership that would apply to the from/to
|
# step backward to the previous membership that would apply to the from/to
|
||||||
|
@ -538,7 +539,7 @@ class SlidingSyncRoomLists:
|
||||||
) in first_membership_change_by_room_id_after_to_token.items():
|
) in first_membership_change_by_room_id_after_to_token.items():
|
||||||
# 1a) Remove rooms that the user joined after the `to_token`
|
# 1a) Remove rooms that the user joined after the `to_token`
|
||||||
if first_membership_change_after_to_token.prev_event_id is None:
|
if first_membership_change_after_to_token.prev_event_id is None:
|
||||||
rooms_for_user.pop(room_id, None)
|
changes[room_id] = None
|
||||||
# 1b) 1c) From the first membership event after the `to_token`, step backward to the
|
# 1b) 1c) From the first membership event after the `to_token`, step backward to the
|
||||||
# previous membership that would apply to the from/to range.
|
# previous membership that would apply to the from/to range.
|
||||||
else:
|
else:
|
||||||
|
@ -560,7 +561,7 @@ class SlidingSyncRoomLists:
|
||||||
else:
|
else:
|
||||||
room_version_id = await self.store.get_room_version_id(room_id)
|
room_version_id = await self.store.get_room_version_id(room_id)
|
||||||
|
|
||||||
rooms_for_user[room_id] = RoomsForUser(
|
changes[room_id] = RoomsForUser(
|
||||||
room_id=room_id,
|
room_id=room_id,
|
||||||
event_id=first_membership_change_after_to_token.prev_event_id,
|
event_id=first_membership_change_after_to_token.prev_event_id,
|
||||||
event_pos=first_membership_change_after_to_token.prev_event_pos,
|
event_pos=first_membership_change_after_to_token.prev_event_pos,
|
||||||
|
@ -572,9 +573,9 @@ class SlidingSyncRoomLists:
|
||||||
# If we can't find the previous membership event, we shouldn't
|
# If we can't find the previous membership event, we shouldn't
|
||||||
# include the room in the sync response since we can't determine the
|
# include the room in the sync response since we can't determine the
|
||||||
# exact membership state and shouldn't rely on the current snapshot.
|
# exact membership state and shouldn't rely on the current snapshot.
|
||||||
rooms_for_user.pop(room_id, None)
|
changes[room_id] = None
|
||||||
|
|
||||||
return rooms_for_user
|
return changes
|
||||||
|
|
||||||
@trace
|
@trace
|
||||||
async def get_room_membership_for_user_at_to_token(
|
async def get_room_membership_for_user_at_to_token(
|
||||||
|
@ -624,12 +625,23 @@ class SlidingSyncRoomLists:
|
||||||
# Since we fetched the users room list at some point in time after the
|
# Since we fetched the users room list at some point in time after the
|
||||||
# tokens, we need to revert/rewind some membership changes to match the point in
|
# tokens, we need to revert/rewind some membership changes to match the point in
|
||||||
# time of the `to_token`.
|
# time of the `to_token`.
|
||||||
rooms_for_user: Mapping[str, RoomsForUser] = {
|
rooms_for_user = {room.room_id: room for room in room_for_user_list}
|
||||||
room.room_id: room for room in room_for_user_list
|
changes = await self._get_rewind_changes_to_current_membership_to_token(
|
||||||
}
|
|
||||||
rooms_for_user = await self._rewind_current_membership_to_token(
|
|
||||||
user, rooms_for_user, to_token
|
user, rooms_for_user, to_token
|
||||||
)
|
)
|
||||||
|
for room_id, change_room_for_user in changes.items():
|
||||||
|
if change_room_for_user is None:
|
||||||
|
rooms_for_user.pop(room_id, None)
|
||||||
|
else:
|
||||||
|
rooms_for_user[room_id] = change_room_for_user
|
||||||
|
|
||||||
|
newly_joined_room_ids, newly_left_room_ids = (
|
||||||
|
await self._get_newly_joined_and_left_rooms(
|
||||||
|
user_id, to_token=to_token, from_token=from_token
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
dm_room_ids = await self._get_dm_rooms_for_user(user_id)
|
||||||
|
|
||||||
# Our working list of rooms that can show up in the sync response
|
# Our working list of rooms that can show up in the sync response
|
||||||
sync_room_id_set = {
|
sync_room_id_set = {
|
||||||
|
@ -643,18 +655,59 @@ class SlidingSyncRoomLists:
|
||||||
membership=room_for_user.membership,
|
membership=room_for_user.membership,
|
||||||
sender=room_for_user.sender,
|
sender=room_for_user.sender,
|
||||||
# We will update these fields below to be accurate
|
# We will update these fields below to be accurate
|
||||||
newly_joined=False,
|
newly_joined=room_id in newly_joined_room_ids,
|
||||||
newly_left=False,
|
newly_left=room_id in newly_left_room_ids,
|
||||||
is_dm=False,
|
is_dm=room_id in dm_room_ids,
|
||||||
)
|
)
|
||||||
for room_for_user in rooms_for_user.values()
|
for room_id, room_for_user in rooms_for_user.items()
|
||||||
}
|
}
|
||||||
|
|
||||||
# We now need to figure out the
|
# Ensure we have entries for rooms that the user has been "state reset"
|
||||||
|
# out of. These are rooms appear in the `newly_left_rooms` map but
|
||||||
|
# aren't in the `rooms_for_user` map.
|
||||||
|
for room_id, left_event_pos in newly_left_room_ids.items():
|
||||||
|
if room_id in sync_room_id_set:
|
||||||
|
continue
|
||||||
|
|
||||||
|
sync_room_id_set[room_id] = _RoomMembershipForUser(
|
||||||
|
room_id=room_id,
|
||||||
|
event_id=None,
|
||||||
|
event_pos=left_event_pos,
|
||||||
|
membership=Membership.LEAVE,
|
||||||
|
sender=None,
|
||||||
|
# We will update these fields below to be accurate
|
||||||
|
newly_joined=False,
|
||||||
|
newly_left=True,
|
||||||
|
is_dm=room_id in dm_room_ids,
|
||||||
|
)
|
||||||
|
|
||||||
|
return sync_room_id_set
|
||||||
|
|
||||||
|
@trace
|
||||||
|
async def _get_newly_joined_and_left_rooms(
|
||||||
|
self,
|
||||||
|
user_id: str,
|
||||||
|
to_token: StreamToken,
|
||||||
|
from_token: Optional[StreamToken],
|
||||||
|
) -> Tuple[StrCollection, Mapping[str, PersistedEventPosition]]:
|
||||||
|
"""Fetch the sets of rooms that the user newly joined or left in the
|
||||||
|
given token range.
|
||||||
|
|
||||||
|
Note: there may be rooms in the newly left rooms where the user was
|
||||||
|
"state reset" out of the room, and so that room would not be part of the
|
||||||
|
"current memberships" of the user.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A 2-tuple of newly joined room IDs and a map of newly left room
|
||||||
|
IDs to the event position the leave happened at.
|
||||||
|
"""
|
||||||
|
newly_joined_room_ids: Set[str] = set()
|
||||||
|
newly_left_room_map: Dict[str, PersistedEventPosition] = {}
|
||||||
|
|
||||||
|
# We need to figure out the
|
||||||
#
|
#
|
||||||
# - 1) Figure out which rooms are `newly_left` rooms (> `from_token` and <= `to_token`)
|
# - 1) Figure out which rooms are `newly_left` rooms (> `from_token` and <= `to_token`)
|
||||||
# - 2) Figure out which rooms are `newly_joined` (> `from_token` and <= `to_token`)
|
# - 2) Figure out which rooms are `newly_joined` (> `from_token` and <= `to_token`)
|
||||||
# - 3) Figure out which rooms are DM's
|
|
||||||
|
|
||||||
# 1) Fetch membership changes that fall in the range from `from_token` up to `to_token`
|
# 1) Fetch membership changes that fall in the range from `from_token` up to `to_token`
|
||||||
current_state_delta_membership_changes_in_from_to_range = []
|
current_state_delta_membership_changes_in_from_to_range = []
|
||||||
|
@ -720,36 +773,8 @@ class SlidingSyncRoomLists:
|
||||||
# 1) Figure out newly_left rooms (> `from_token` and <= `to_token`).
|
# 1) Figure out newly_left rooms (> `from_token` and <= `to_token`).
|
||||||
if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
|
if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
|
||||||
# 1) Mark this room as `newly_left`
|
# 1) Mark this room as `newly_left`
|
||||||
|
newly_left_room_map[room_id] = (
|
||||||
# If we're seeing a membership change here, we should expect to already
|
last_membership_change_in_from_to_range.event_pos
|
||||||
# have it in our snapshot but if a state reset happens, it wouldn't have
|
|
||||||
# shown up in our snapshot but appear as a change here.
|
|
||||||
existing_sync_entry = sync_room_id_set.get(room_id)
|
|
||||||
if existing_sync_entry is not None:
|
|
||||||
# Normal expected case
|
|
||||||
sync_room_id_set[room_id] = existing_sync_entry.copy_and_replace(
|
|
||||||
newly_left=True
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
# State reset!
|
|
||||||
logger.warn(
|
|
||||||
"State reset detected for room_id %s with %s who is no longer in the room",
|
|
||||||
room_id,
|
|
||||||
user_id,
|
|
||||||
)
|
|
||||||
# Even though a state reset happened which removed the person from
|
|
||||||
# the room, we still add it the list so the user knows they left the
|
|
||||||
# room. Downstream code can check for a state reset by looking for
|
|
||||||
# `event_id=None and membership is not None`.
|
|
||||||
sync_room_id_set[room_id] = _RoomMembershipForUser(
|
|
||||||
room_id=room_id,
|
|
||||||
event_id=last_membership_change_in_from_to_range.event_id,
|
|
||||||
event_pos=last_membership_change_in_from_to_range.event_pos,
|
|
||||||
membership=last_membership_change_in_from_to_range.membership,
|
|
||||||
sender=last_membership_change_in_from_to_range.sender,
|
|
||||||
newly_joined=False,
|
|
||||||
newly_left=True,
|
|
||||||
is_dm=False,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# 2) Figure out `newly_joined`
|
# 2) Figure out `newly_joined`
|
||||||
|
@ -761,9 +786,7 @@ class SlidingSyncRoomLists:
|
||||||
# also some non-join in the range, we know they `newly_joined`.
|
# also some non-join in the range, we know they `newly_joined`.
|
||||||
if has_non_join_in_from_to_range:
|
if has_non_join_in_from_to_range:
|
||||||
# We found a `newly_joined` room (we left and joined within the token range)
|
# We found a `newly_joined` room (we left and joined within the token range)
|
||||||
sync_room_id_set[room_id] = sync_room_id_set[room_id].copy_and_replace(
|
newly_joined_room_ids.add(room_id)
|
||||||
newly_joined=True
|
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
prev_event_id = first_membership_change_by_room_id_in_from_to_range[
|
prev_event_id = first_membership_change_by_room_id_in_from_to_range[
|
||||||
room_id
|
room_id
|
||||||
|
@ -775,20 +798,23 @@ class SlidingSyncRoomLists:
|
||||||
if prev_event_id is None:
|
if prev_event_id is None:
|
||||||
# We found a `newly_joined` room (we are joining the room for the
|
# We found a `newly_joined` room (we are joining the room for the
|
||||||
# first time within the token range)
|
# first time within the token range)
|
||||||
sync_room_id_set[room_id] = sync_room_id_set[
|
newly_joined_room_ids.add(room_id)
|
||||||
room_id
|
|
||||||
].copy_and_replace(newly_joined=True)
|
|
||||||
# Last resort, we need to step back to the previous membership event
|
# Last resort, we need to step back to the previous membership event
|
||||||
# just before the token range to see if we're joined then or not.
|
# just before the token range to see if we're joined then or not.
|
||||||
elif prev_membership != Membership.JOIN:
|
elif prev_membership != Membership.JOIN:
|
||||||
# We found a `newly_joined` room (we left before the token range
|
# We found a `newly_joined` room (we left before the token range
|
||||||
# and joined within the token range)
|
# and joined within the token range)
|
||||||
sync_room_id_set[room_id] = sync_room_id_set[
|
newly_joined_room_ids.add(room_id)
|
||||||
room_id
|
|
||||||
].copy_and_replace(newly_joined=True)
|
return newly_joined_room_ids, newly_left_room_map
|
||||||
|
|
||||||
|
@trace
|
||||||
|
async def _get_dm_rooms_for_user(
|
||||||
|
self,
|
||||||
|
user_id: str,
|
||||||
|
) -> StrCollection:
|
||||||
|
"""Get the set of DM rooms for the user."""
|
||||||
|
|
||||||
# 3) Figure out which rooms the user considers to be direct-message (DM) rooms
|
|
||||||
#
|
|
||||||
# We're using global account data (`m.direct`) instead of checking for
|
# We're using global account data (`m.direct`) instead of checking for
|
||||||
# `is_direct` on membership events because that property only appears for
|
# `is_direct` on membership events because that property only appears for
|
||||||
# the invitee membership event (doesn't show up for the inviter).
|
# the invitee membership event (doesn't show up for the inviter).
|
||||||
|
@ -810,13 +836,7 @@ class SlidingSyncRoomLists:
|
||||||
if isinstance(room_id, str):
|
if isinstance(room_id, str):
|
||||||
dm_room_id_set.add(room_id)
|
dm_room_id_set.add(room_id)
|
||||||
|
|
||||||
# 3) Fixup
|
return dm_room_id_set
|
||||||
for room_id in sync_room_id_set:
|
|
||||||
sync_room_id_set[room_id] = sync_room_id_set[room_id].copy_and_replace(
|
|
||||||
is_dm=room_id in dm_room_id_set
|
|
||||||
)
|
|
||||||
|
|
||||||
return sync_room_id_set
|
|
||||||
|
|
||||||
@trace
|
@trace
|
||||||
async def filter_rooms_relevant_for_sync(
|
async def filter_rooms_relevant_for_sync(
|
||||||
|
|
Loading…
Reference in a new issue