From 70d166e7b5a7b10875d5edefd64392bcb2d9ac7f Mon Sep 17 00:00:00 2001 From: Hugh Nimmo-Smith Date: Fri, 25 Oct 2024 17:31:06 +0100 Subject: [PATCH] inductive sync state WIP --- synapse/handlers/sync.py | 15 ++++++++-- synapse/streams/events.py | 3 +- synapse/types/__init__.py | 55 ++++++++++++++++++++++++------------- tests/handlers/test_sync.py | 9 ++---- 4 files changed, 53 insertions(+), 29 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index f4ea90fbd7..d83b58adb9 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1397,6 +1397,7 @@ class SyncHandler: timeline_contains=timeline_state, timeline_start=state_at_timeline_start, timeline_end=state_at_timeline_end, + previous_timeline_start={}, previous_timeline_end={}, lazy_load_members=lazy_load_members, ) @@ -1535,6 +1536,13 @@ class SyncHandler: await_full_state=await_full_state, ) + state_at_previous_sync_start = {} if since_token.prev_batch is None else await self._state_storage_controller.get_state_ids_at( + room_id, + stream_position=since_token.prev_batch, + state_filter=state_filter, + await_full_state=await_full_state, + ) + state_at_timeline_end = await self._state_storage_controller.get_state_ids_at( room_id, stream_position=end_token, @@ -1546,6 +1554,7 @@ class SyncHandler: timeline_contains=timeline_state, timeline_start=state_at_timeline_start, timeline_end=state_at_timeline_end, + previous_timeline_start=state_at_previous_sync_start, previous_timeline_end=state_at_previous_sync, lazy_load_members=lazy_load_members, ) @@ -1965,7 +1974,7 @@ class SyncHandler: # this is due to some of the underlying streams not supporting the ability # to query up to a given point. 
# Always use the `now_token` in `SyncResultBuilder` - now_token = self.event_sources.get_current_token() + now_token = self.event_sources.get_current_token(prev_batch=since_token) log_kv({"now_token": now_token}) # Since we fetched the users room list before calculating the `now_token` (see @@ -2980,6 +2989,7 @@ def _calculate_state( timeline_contains: StateMap[str], timeline_start: StateMap[str], timeline_end: StateMap[str], + previous_timeline_start: StateMap[str], previous_timeline_end: StateMap[str], lazy_load_members: bool, ) -> StateMap[str]: @@ -3007,6 +3017,7 @@ def _calculate_state( timeline_end_ids = set(timeline_end.values()) timeline_start_ids = set(timeline_start.values()) + previous_timeline_start_ids = set(previous_timeline_start.values()) previous_timeline_end_ids = set(previous_timeline_end.values()) timeline_contains_ids = set(timeline_contains.values()) @@ -3082,7 +3093,7 @@ def _calculate_state( state_ids = ( (timeline_end_ids | timeline_start_ids) - - previous_timeline_end_ids + - (previous_timeline_end_ids | previous_timeline_start_ids) - timeline_contains_ids ) diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 856f646795..77ba09cdf0 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -77,7 +77,7 @@ class EventSources: self.store = hs.get_datastores().main self._instance_name = hs.get_instance_name() - def get_current_token(self) -> StreamToken: + def get_current_token(self, prev_batch: StreamToken = None) -> StreamToken: push_rules_key = self.store.get_max_push_rules_stream_id() to_device_key = self.store.get_to_device_stream_token() device_list_key = self.store.get_device_stream_token() @@ -97,6 +97,7 @@ class EventSources: # Groups key is unused. 
groups_key=0, un_partial_stated_rooms_key=un_partial_stated_rooms_key, + prev_batch=prev_batch, ) return token diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index 26783c5622..454da2b815 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -980,16 +980,26 @@ class StreamToken: groups_key: int un_partial_stated_rooms_key: int + prev_batch: Optional["StreamToken"] = None + _BATCH_SEPARATOR = "~" + _SEPARATOR = "_" START: ClassVar["StreamToken"] @classmethod @cancellable - async def from_string(cls, store: "DataStore", string: str) -> "StreamToken": + async def from_string(cls, store: "DataStore", string: str, prev_batch: Optional["StreamToken"] = None) -> "StreamToken": """ Creates a RoomStreamToken from its textual representation. """ try: + if string.count(cls._BATCH_SEPARATOR) == 1: + # We have a prev_token + batches = string.split(cls._BATCH_SEPARATOR) + prev_batch = await StreamToken.from_string(store, batches[1]) + batch = await StreamToken.from_string(store, batches[0], prev_batch=prev_batch) + return batch + keys = string.split(cls._SEPARATOR) while len(keys) < len(attr.fields(cls)): # i.e. old token from before receipt_key @@ -1006,6 +1016,7 @@ class StreamToken: device_list_key, groups_key, un_partial_stated_rooms_key, + prev_batch, ) = keys return cls( @@ -1025,24 +1036,30 @@ class StreamToken: except Exception: raise SynapseError(400, "Invalid stream token") - async def to_string(self, store: "DataStore") -> str: - return self._SEPARATOR.join( - [ - await self.room_key.to_string(store), - str(self.presence_key), - str(self.typing_key), - await self.receipt_key.to_string(store), - str(self.account_data_key), - str(self.push_rules_key), - str(self.to_device_key), - str(self.device_list_key), - # Note that the groups key is no longer used, but it is still - # serialized so that there will not be confusion in the future - # if additional tokens are added. 
- str(self.groups_key), - str(self.un_partial_stated_rooms_key), - ] - ) + async def to_string(self, store: "DataStore", include_prev_batch: bool = True) -> str: + if include_prev_batch and self.prev_batch: + return self._BATCH_SEPARATOR.join([ + await self.to_string(store, include_prev_batch=False), + await self.prev_batch.to_string(store, include_prev_batch=False), + ]) + else: + return self._SEPARATOR.join( + [ + await self.room_key.to_string(store), + str(self.presence_key), + str(self.typing_key), + await self.receipt_key.to_string(store), + str(self.account_data_key), + str(self.push_rules_key), + str(self.to_device_key), + str(self.device_list_key), + # Note that the groups key is no longer used, but it is still + # serialized so that there will not be confusion in the future + # if additional tokens are added. + str(self.groups_key), + str(self.un_partial_stated_rooms_key), + ] + ) @property def room_stream_id(self) -> int: diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py index a9c4fca11b..c7adca2083 100644 --- a/tests/handlers/test_sync.py +++ b/tests/handlers/test_sync.py @@ -822,12 +822,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): room_sync = incremental_sync.joined[0] self.assertEqual(room_sync.room_id, room_id) - self.assertEqual( - [e.event_id for e in room_sync.state.values()], - [ - s1_event - ], # S1 is repeated because it is the state at the start of the timeline (before E3) - ) + self.assertEqual(room_sync.state, {}) self.assertEqual( [e.event_id for e in room_sync.timeline.events], [ @@ -857,7 +852,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): [e.event_id for e in room_sync.timeline.events], [e5_event], ) - # Problem: S2 is the winning state event but the last state event the client saw was S1. + # FIXED: S2 is the winning state event and the last that the client saw!
def test_state_after_on_branches_winner_at_start_of_timeline(self) -> None: r"""Test `state` and `state_after` where not all information is in `state` + `timeline`.