Use new room store to track if we've sent a room down

This commit is contained in:
Erik Johnston 2024-07-16 12:32:24 +01:00
parent 53273db3e8
commit e2a88e44ef

View file

@ -24,6 +24,7 @@ from typing import TYPE_CHECKING, Any, Dict, Final, List, Mapping, Optional, Set
import attr
from immutabledict import immutabledict
from typing_extensions import assert_never
from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membership
from synapse.events import EventBase
@ -342,6 +343,8 @@ class SlidingSyncHandler:
self.relations_handler = hs.get_relations_handler()
self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
self.connection_store = SlidingSyncConnectionStore()
async def wait_for_sync_for_user(
self,
requester: Requester,
@ -449,6 +452,12 @@ class SlidingSyncHandler:
# See https://github.com/matrix-org/matrix-doc/issues/1144
raise NotImplementedError()
await self.connection_store.mark_token_seen(
user_id,
conn_id=sync_config.connection_id(),
from_token=from_token,
)
# Get all of the room IDs that the user should be able to see in the sync
# response
has_lists = sync_config.lists is not None and len(sync_config.lists) > 0
@ -596,7 +605,7 @@ class SlidingSyncHandler:
rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
for room_id, room_sync_config in relevant_room_map.items():
room_sync_result = await self.get_room_sync_data(
user=sync_config.user,
sync_config=sync_config,
room_id=room_id,
room_sync_config=room_sync_config,
room_membership_for_user_at_to_token=room_membership_for_user_map[
@ -612,8 +621,18 @@ class SlidingSyncHandler:
sync_config=sync_config, to_token=to_token
)
# TODO: Update this when we implement per-connection state
connection_token = 0
if has_lists or has_room_subscriptions:
connection_token = await self.connection_store.record_rooms(
user_id,
conn_id=sync_config.connection_id(),
from_token=from_token,
sent_room_ids=relevant_room_map.keys(),
unsent_room_ids=[], # TODO: We currently ssume that we have sent down all updates.
)
elif from_token:
connection_token = from_token.connection_token
else:
connection_token = 0
return SlidingSyncResult(
next_pos=SlidingSyncStreamToken(to_token, connection_token),
@ -1348,7 +1367,7 @@ class SlidingSyncHandler:
async def get_room_sync_data(
self,
user: UserID,
sync_config: SlidingSyncConfig,
room_id: str,
room_sync_config: RoomSyncConfig,
room_membership_for_user_at_to_token: _RoomMembershipForUser,
@ -1370,6 +1389,7 @@ class SlidingSyncHandler:
from_token: The point in the stream to sync from.
to_token: The point in the stream to sync up to.
"""
user = sync_config.user
# Assemble the list of timeline events
#
@ -1413,14 +1433,27 @@ class SlidingSyncHandler:
# screen of information:
# - Initial sync (because no `from_token` to limit us anyway)
# - When users `newly_joined`
# - TODO: For an incremental sync where we haven't sent it down this
# - For an incremental sync where we haven't sent it down this
# connection before
to_bound = (
from_token.stream_token.room_key
if from_token is not None
and not room_membership_for_user_at_to_token.newly_joined
else None
)
if from_token and not room_membership_for_user_at_to_token.newly_joined:
room_status = await self.connection_store.have_sent_room(
user_id=user.to_string(),
conn_id=sync_config.connection_id(),
connection_token=from_token.connection_token,
room_id=room_id,
)
if room_status.status == HaveSentRoomFlag.LIVE:
to_bound = from_token.stream_token.room_key
elif room_status.status == HaveSentRoomFlag.PREVIOUSLY:
assert room_status.last_token is not None
to_bound = room_status.last_token
elif room_status.status == HaveSentRoomFlag.NEVER:
to_bound = None
else:
assert_never(room_status.status)
else:
to_bound = None
timeline_events, new_room_key = await self.store.paginate_room_events(
room_id=room_id,