WIP don't pull out left rooms

This commit is contained in:
Erik Johnston 2024-09-10 15:38:44 +01:00
parent d99dd4c1e4
commit 4cf2966797
6 changed files with 196 additions and 78 deletions

View file

@ -54,7 +54,6 @@ from synapse.storage.roommember import (
)
from synapse.types import (
MutableStateMap,
PersistedEventPosition,
RoomStreamToken,
StateMap,
StrCollection,
@ -219,6 +218,8 @@ class SlidingSyncRoomLists:
# include rooms that are outside the list ranges.
all_rooms: Set[str] = set()
# Note: this won't include rooms the user have left themselves. We add
# back in rooms that the user left since `from_token` below.
room_membership_for_user_map = await self.store.get_sliding_sync_rooms_for_user(
user_id
)
@ -234,44 +235,26 @@ class SlidingSyncRoomLists:
room_membership_for_user_map.pop(room_id)
continue
existing_room = room_membership_for_user_map.get(room_id)
if existing_room is not None:
# Update room membership events to the point in time of the `to_token`
room_membership_for_user_map[room_id] = RoomsForUserSlidingSync(
room_id=room_id,
sender=change.sender,
membership=change.membership,
event_id=change.event_id,
event_pos=change.event_pos,
room_version_id=change.room_version_id,
# We keep the current state of the room though
room_type=existing_room.room_type,
is_encrypted=existing_room.is_encrypted,
)
else:
# This can happen if we get "state reset" out of the room
# after the `to_token`. In other words, there is no membership
# for the room after the `to_token` but we see membership in
# the token range.
current_room_entry = await self.store.get_sliding_sync_room_for_user(
user_id, room_id
)
if current_room_entry is None:
# We should always have an entry, even if we get state reset
# out of the room.
logger.error("Can't find room for user: %s / %s", user_id, room_id)
continue
# Get the state at the time. Note that room type never changes,
# so we can just get current room type
room_type = await self.store.get_room_type(room_id)
is_encrypted = await self.get_is_encrypted_for_room_at_token(
room_id, to_token.room_key
)
# Add back rooms that the user was state-reset out of after `to_token`
room_membership_for_user_map[room_id] = RoomsForUserSlidingSync(
room_id=room_id,
sender=change.sender,
membership=change.membership,
event_id=change.event_id,
event_pos=change.event_pos,
room_version_id=change.room_version_id,
room_type=room_type,
is_encrypted=is_encrypted,
)
room_membership_for_user_map[room_id] = RoomsForUserSlidingSync(
room_id=room_id,
sender=change.sender,
membership=change.membership,
event_id=change.event_id,
event_pos=change.event_pos,
room_version_id=change.room_version_id,
# We keep the current state of the room though
room_type=current_room_entry.room_type,
is_encrypted=current_room_entry.is_encrypted,
)
(
newly_joined_room_ids,
@ -281,31 +264,33 @@ class SlidingSyncRoomLists:
)
dm_room_ids = await self._get_dm_rooms_for_user(user_id)
# Handle state resets in the from -> to token range.
state_reset_rooms = (
newly_left_room_map.keys() - room_membership_for_user_map.keys()
)
if state_reset_rooms:
# Handle leaves and state resets in the from -> to token range.
left_rooms = newly_left_room_map.keys() - room_membership_for_user_map.keys()
if left_rooms:
room_membership_for_user_map = dict(room_membership_for_user_map)
for room_id in (
newly_left_room_map.keys() - room_membership_for_user_map.keys()
):
# Get the state at the time. Note that room type never changes,
# so we can just get current room type
room_type = await self.store.get_room_type(room_id)
is_encrypted = await self.get_is_encrypted_for_room_at_token(
room_id, newly_left_room_map[room_id].to_room_stream_token()
for room_id in left_rooms:
room_for_user = newly_left_room_map[room_id]
# We fetch the current room entry for the user, as that's the
# easiest way of getting the room type etc.
current_room_entry = await self.store.get_sliding_sync_room_for_user(
user_id, room_id
)
if current_room_entry is None:
# We should always have an entry, even if we get state reset
# out of the room.
logger.error("Can't find room for user: %s / %s", user_id, room_id)
continue
room_membership_for_user_map[room_id] = RoomsForUserSlidingSync(
room_id=room_id,
sender=None,
membership=Membership.LEAVE,
event_id=None,
event_pos=newly_left_room_map[room_id],
room_version_id=await self.store.get_room_version_id(room_id),
room_type=room_type,
is_encrypted=is_encrypted,
sender=room_for_user.sender,
membership=room_for_user.membership,
event_id=room_for_user.event_id,
event_pos=room_for_user.event_pos,
room_version_id=room_for_user.room_version_id,
room_type=current_room_entry.room_type,
is_encrypted=current_room_entry.is_encrypted,
)
if sync_config.lists:
@ -417,8 +402,19 @@ class SlidingSyncRoomLists:
room_subscription,
) in sync_config.room_subscriptions.items():
if room_id not in room_membership_for_user_map:
# TODO: Handle rooms the user isn't in.
continue
# Check if we do have an entry for the room, but didn't
# pull it out above. This could be e.g. a leave that we
# don't pull out by default.
current_room_entry = (
await self.store.get_sliding_sync_room_for_user(
user_id, room_id
)
)
if not current_room_entry:
# TODO: Handle rooms the user isn't in.
continue
room_membership_for_user_map[room_id] = current_room_entry
all_rooms.add(room_id)
@ -944,18 +940,11 @@ class SlidingSyncRoomLists:
# Ensure we have entries for rooms that the user has been "state reset"
# out of. These are rooms appear in the `newly_left_rooms` map but
# aren't in the `rooms_for_user` map.
for room_id, left_event_pos in newly_left_room_ids.items():
for room_id, room_membership in newly_left_room_ids.items():
if room_id in rooms_for_user:
continue
rooms_for_user[room_id] = RoomsForUserStateReset(
room_id=room_id,
event_id=None,
event_pos=left_event_pos,
membership=Membership.LEAVE,
sender=None,
room_version_id=await self.store.get_room_version_id(room_id),
)
rooms_for_user[room_id] = room_membership
return rooms_for_user, newly_joined_room_ids, set(newly_left_room_ids)
@ -965,7 +954,7 @@ class SlidingSyncRoomLists:
user_id: str,
to_token: StreamToken,
from_token: Optional[StreamToken],
) -> Tuple[AbstractSet[str], Mapping[str, PersistedEventPosition]]:
) -> Tuple[AbstractSet[str], Mapping[str, RoomsForUserStateReset]]:
"""Fetch the sets of rooms that the user newly joined or left in the
given token range.
@ -975,10 +964,10 @@ class SlidingSyncRoomLists:
Returns:
A 2-tuple of newly joined room IDs and a map of newly left room
IDs to the event position the leave happened at.
IDs to the `RoomsForUserStateReset` entry.
"""
newly_joined_room_ids: Set[str] = set()
newly_left_room_map: Dict[str, PersistedEventPosition] = {}
newly_left_room_map: Dict[str, RoomsForUserStateReset] = {}
# We need to figure out the
#
@ -1045,12 +1034,21 @@ class SlidingSyncRoomLists:
# 2)
if last_membership_change_in_from_to_range.membership == Membership.JOIN:
possibly_newly_joined_room_ids.add(room_id)
break
# 1) Figure out newly_left rooms (> `from_token` and <= `to_token`).
if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
if (
last_membership_change_in_from_to_range.prev_membership
== Membership.JOIN
):
# 1) Mark this room as `newly_left`
newly_left_room_map[room_id] = (
last_membership_change_in_from_to_range.event_pos
newly_left_room_map[room_id] = RoomsForUserStateReset(
room_id=room_id,
sender=last_membership_change_in_from_to_range.sender,
membership=Membership.LEAVE,
event_id=last_membership_change_in_from_to_range.event_id,
event_pos=last_membership_change_in_from_to_range.event_pos,
room_version_id=await self.store.get_room_version_id(room_id),
)
# 2) Figure out `newly_joined`

View file

@ -413,6 +413,8 @@ class PersistEventsStore:
to_insert = delta_state.to_insert
to_delete = delta_state.to_delete
# TODO: Add entry to membership snapshots on state resets.
# If no state is changing, we don't need to do anything. This can happen when a
# partial-stated room is re-syncing the current state.
if not to_insert and not to_delete:

View file

@ -1384,7 +1384,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
) -> Mapping[str, RoomsForUserSlidingSync]:
"""Get all the rooms for a user to handle a sliding sync request.
Ignores forgotten rooms and rooms that the user has been kicked from.
Ignores forgotten rooms and rooms that the user has left themselves.
Returns:
Map from room ID to membership info
@ -1404,6 +1404,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
LEFT JOIN sliding_sync_joined_rooms AS j ON (j.room_id = m.room_id AND m.membership = 'join')
WHERE user_id = ?
AND m.forgotten = 0
AND (m.membership != 'leave' OR m.user_id != m.sender)
"""
txn.execute(sql, (user_id,))
return {
@ -1425,6 +1426,47 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
get_sliding_sync_rooms_for_user_txn,
)
async def get_sliding_sync_room_for_user(
self, user_id: str, room_id: str
) -> Optional[RoomsForUserSlidingSync]:
"""Get the sliding sync room entry for the given user and room."""
def get_sliding_sync_room_for_user_txn(
txn: LoggingTransaction,
) -> Optional[RoomsForUserSlidingSync]:
sql = """
SELECT m.room_id, m.sender, m.membership, m.membership_event_id,
r.room_version,
m.event_instance_name, m.event_stream_ordering,
COALESCE(j.room_type, m.room_type),
COALESCE(j.is_encrypted, m.is_encrypted)
FROM sliding_sync_membership_snapshots AS m
INNER JOIN rooms AS r USING (room_id)
LEFT JOIN sliding_sync_joined_rooms AS j ON (j.room_id = m.room_id AND m.membership = 'join')
WHERE user_id = ?
AND m.forgotten = 0
AND m.room_id = ?
"""
txn.execute(sql, (user_id, room_id))
row = txn.fetchone()
if not row:
return None
return RoomsForUserSlidingSync(
room_id=row[0],
sender=row[1],
membership=row[2],
event_id=row[3],
room_version_id=row[4],
event_pos=PersistedEventPosition(row[5], row[6]),
room_type=row[7],
is_encrypted=row[8],
)
return await self.db_pool.runInteraction(
"get_sliding_sync_room_for_user", get_sliding_sync_room_for_user_txn
)
async def have_finished_sliding_sync_background_jobs(self) -> bool:
"""Return if it's safe to use the sliding sync membership tables."""

View file

@ -308,8 +308,24 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
return create_event
@cached(max_entries=10000)
async def get_room_type(self, room_id: str) -> Optional[str]:
raise NotImplementedError()
async def get_room_type(self, room_id: str) -> Union[Optional[str], Sentinel]:
"""Fetch room type for given room.
Since this function is cached, any missing values would be cached as
`None`. In order to distinguish between an unencrypted room that has
`None` encryption and a room that is unknown to the server where we
might want to omit the value (which would make it cached as `None`),
instead we use the sentinel value `ROOM_UNKNOWN_SENTINEL`.
"""
try:
create_event = await self.get_create_event_for_room(room_id)
return create_event.content.get(EventContentFields.ROOM_TYPE)
except NotFoundError:
# We use the sentinel value to distinguish between `None` which is a
# valid room type and a room that is unknown to the server so the value
# is just unset.
return ROOM_UNKNOWN_SENTINEL
@cachedList(cached_method_name="get_room_type", list_name="room_ids")
async def bulk_get_room_type(

View file

@ -947,6 +947,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
return []
if from_key:
# TODO: We also need to invalidate this on current state change
has_changed = self._membership_stream_cache.has_entity_changed(
user_id, int(from_key.stream)
)

View file

@ -25,6 +25,7 @@ from synapse.api.constants import (
AccountDataTypes,
EventContentFields,
EventTypes,
JoinRules,
RoomTypes,
)
from synapse.events import EventBase
@ -42,6 +43,7 @@ from synapse.util.stringutils import random_string
from tests import unittest
from tests.server import TimedOutException
from tests.test_utils.event_injection import create_event
logger = logging.getLogger(__name__)
@ -1007,3 +1009,60 @@ class SlidingSyncTestCase(SlidingSyncBase):
# Make the Sliding Sync request
response_body, _ = self.do_sync(sync_body, tok=user1_tok)
self.assertEqual(response_body["rooms"][room_id1]["initial"], True)
def test_state_reset_room_comes_down_incremental_sync(self) -> None:
"""Test that a room that we were state reset out of comes down
incremental sync"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id1 = self.helper.create_room_as(user2_id, is_public=True, tok=user2_tok)
event_response = self.helper.send(room_id1, "test", tok=user2_tok)
event_id = event_response["event_id"]
self.helper.join(room_id1, user1_id, tok=user1_tok)
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [],
"timeline_limit": 1,
}
}
}
# Make the Sliding Sync request
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
self.assertEqual(response_body["rooms"][room_id1]["initial"], True)
# Trigger a state reset
join_rule_event, join_rule_context = self.get_success(
create_event(
self.hs,
prev_event_ids=[event_id],
type=EventTypes.JoinRules,
state_key="",
content={"join_rule": JoinRules.INVITE},
sender=user2_id,
room_id=room_id1,
room_version=self.get_success(self.store.get_room_version_id(room_id1)),
)
)
self.get_success(
self.hs.get_storage_controllers().persistence.persist_event(
join_rule_event, join_rule_context
)
)
users_in_room = self.get_success(self.store.get_users_in_room(room_id1))
self.assertIncludes(set(users_in_room), {user2_id}, exact=True)
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
# TODO: What should we expect here? Probably at least *something*?
print(response_body["rooms"][room_id1])