Mirror of https://github.com/element-hq/synapse.git
inductive sync state WIP

commit 70d166e7b5, parent 96425d4071
4 changed files with 53 additions and 29 deletions
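A note on intent, inferred from the hunks below since the WIP commit carries no description: the change makes an incremental sync remember the previous request's `since_token` by threading it through `EventSources.get_current_token` into a new `StreamToken.prev_batch` field, serialized after a "~" separator, so that `_calculate_state` can also subtract the state that was already sent at the previous sync.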
@@ -1397,6 +1397,7 @@ class SyncHandler:
             timeline_contains=timeline_state,
             timeline_start=state_at_timeline_start,
             timeline_end=state_at_timeline_end,
+            previous_timeline_start={},
             previous_timeline_end={},
             lazy_load_members=lazy_load_members,
         )
@@ -1535,6 +1536,13 @@ class SyncHandler:
             await_full_state=await_full_state,
         )
 
+        state_at_previous_sync_start = {} if since_token.prev_batch is None else await self._state_storage_controller.get_state_ids_at(
+            room_id,
+            stream_position=since_token.prev_batch,
+            state_filter=state_filter,
+            await_full_state=await_full_state,
+        )
+
         state_at_timeline_end = await self._state_storage_controller.get_state_ids_at(
             room_id,
             stream_position=end_token,
@@ -1546,6 +1554,7 @@ class SyncHandler:
             timeline_contains=timeline_state,
             timeline_start=state_at_timeline_start,
             timeline_end=state_at_timeline_end,
+            previous_timeline_start=state_at_previous_sync_start,
             previous_timeline_end=state_at_previous_sync,
             lazy_load_members=lazy_load_members,
         )
@@ -1965,7 +1974,7 @@ class SyncHandler:
         # this is due to some of the underlying streams not supporting the ability
         # to query up to a given point.
         # Always use the `now_token` in `SyncResultBuilder`
-        now_token = self.event_sources.get_current_token()
+        now_token = self.event_sources.get_current_token(prev_batch=since_token)
         log_kv({"now_token": now_token})
 
         # Since we fetched the users room list before calculating the `now_token` (see
@@ -2980,6 +2989,7 @@ def _calculate_state(
     timeline_contains: StateMap[str],
     timeline_start: StateMap[str],
     timeline_end: StateMap[str],
+    previous_timeline_start: StateMap[str],
     previous_timeline_end: StateMap[str],
     lazy_load_members: bool,
 ) -> StateMap[str]:
@@ -3007,6 +3017,7 @@ def _calculate_state(
 
     timeline_end_ids = set(timeline_end.values())
     timeline_start_ids = set(timeline_start.values())
+    previous_timeline_start_ids = set(previous_timeline_start.values())
    previous_timeline_end_ids = set(previous_timeline_end.values())
     timeline_contains_ids = set(timeline_contains.values())
 
@@ -3082,7 +3093,7 @@ def _calculate_state(
 
     state_ids = (
         (timeline_end_ids | timeline_start_ids)
-        - previous_timeline_end_ids
+        - (previous_timeline_end_ids | previous_timeline_start_ids)
        - timeline_contains_ids
     )
 
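To make the new subtraction concrete, here is a toy, self-contained Python illustration of the adjusted `state_ids` computation. The event IDs are invented for the example; the real code operates over `StateMap` values rather than bare sets.

timeline_start_ids = {"$s0", "$s1", "$s2"}       # state at the start of the timeline
timeline_end_ids = {"$s1", "$s2", "$s4"}         # state at the end of the timeline
previous_timeline_start_ids = {"$s0"}            # already sent at the previous sync
previous_timeline_end_ids = {"$s1"}              # already sent at the previous sync
timeline_contains_ids = {"$s4"}                  # delivered inside the timeline itself

state_ids = (
    (timeline_end_ids | timeline_start_ids)
    - (previous_timeline_end_ids | previous_timeline_start_ids)
    - timeline_contains_ids
)
assert state_ids == {"$s2"}  # only $s2 still has to go out in the `state` section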
@@ -77,7 +77,7 @@ class EventSources:
         self.store = hs.get_datastores().main
         self._instance_name = hs.get_instance_name()
 
-    def get_current_token(self) -> StreamToken:
+    def get_current_token(self, prev_batch: StreamToken = None) -> StreamToken:
         push_rules_key = self.store.get_max_push_rules_stream_id()
         to_device_key = self.store.get_to_device_stream_token()
         device_list_key = self.store.get_device_stream_token()
@@ -97,6 +97,7 @@ class EventSources:
             # Groups key is unused.
             groups_key=0,
             un_partial_stated_rooms_key=un_partial_stated_rooms_key,
+            prev_batch=prev_batch,
         )
         return token
 
@@ -980,16 +980,26 @@ class StreamToken:
     groups_key: int
     un_partial_stated_rooms_key: int
 
+    prev_batch: Optional["StreamToken"] = None
+    _BATCH_SEPARATOR = "~"
+
     _SEPARATOR = "_"
     START: ClassVar["StreamToken"]
 
     @classmethod
     @cancellable
-    async def from_string(cls, store: "DataStore", string: str) -> "StreamToken":
+    async def from_string(cls, store: "DataStore", string: str, prev_batch: Optional["StreamToken"] = None) -> "StreamToken":
         """
         Creates a RoomStreamToken from its textual representation.
         """
         try:
+            if string.count(cls._BATCH_SEPARATOR) == 1:
+                # We have a prev_token
+                batches = string.split(cls._BATCH_SEPARATOR)
+                prev_batch = await StreamToken.from_string(store, batches[1])
+                batch = await StreamToken.from_string(store, batches[0], prev_batch=prev_batch)
+                return batch
+
             keys = string.split(cls._SEPARATOR)
             while len(keys) < len(attr.fields(cls)):
                 # i.e. old token from before receipt_key
@@ -1006,6 +1016,7 @@ class StreamToken:
                 device_list_key,
                 groups_key,
                 un_partial_stated_rooms_key,
+                prev_batch,
             ) = keys
 
             return cls(
@@ -1025,24 +1036,30 @@ class StreamToken:
         except Exception:
             raise SynapseError(400, "Invalid stream token")
 
-    async def to_string(self, store: "DataStore") -> str:
-        return self._SEPARATOR.join(
-            [
-                await self.room_key.to_string(store),
-                str(self.presence_key),
-                str(self.typing_key),
-                await self.receipt_key.to_string(store),
-                str(self.account_data_key),
-                str(self.push_rules_key),
-                str(self.to_device_key),
-                str(self.device_list_key),
-                # Note that the groups key is no longer used, but it is still
-                # serialized so that there will not be confusion in the future
-                # if additional tokens are added.
-                str(self.groups_key),
-                str(self.un_partial_stated_rooms_key),
-            ]
-        )
+    async def to_string(self, store: "DataStore", include_prev_batch: bool = True) -> str:
+        if include_prev_batch and self.prev_batch:
+            return self._BATCH_SEPARATOR.join([
+                await self.to_string(store, include_prev_batch=False),
+                await self.prev_batch.to_string(store, include_prev_batch=False),
+            ])
+        else:
+            return self._SEPARATOR.join(
+                [
+                    await self.room_key.to_string(store),
+                    str(self.presence_key),
+                    str(self.typing_key),
+                    await self.receipt_key.to_string(store),
+                    str(self.account_data_key),
+                    str(self.push_rules_key),
+                    str(self.to_device_key),
+                    str(self.device_list_key),
+                    # Note that the groups key is no longer used, but it is still
+                    # serialized so that there will not be confusion in the future
+                    # if additional tokens are added.
+                    str(self.groups_key),
+                    str(self.un_partial_stated_rooms_key),
+                ]
+            )
 
     @property
     def room_stream_id(self) -> int:
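As a sanity check on the new serialization, here is a minimal, self-contained Python sketch of the round trip implied by `_BATCH_SEPARATOR`. Plain strings stand in for serialized tokens; the token strings are invented, and the real `to_string`/`from_string` methods are async and need a `DataStore`.

BATCH_SEPARATOR = "~"

def serialize(current: str, prev: str | None) -> str:
    # Mirrors the new to_string(): append the previous sync's token after "~".
    return current if prev is None else f"{current}{BATCH_SEPARATOR}{prev}"

def parse(token: str) -> tuple[str, str | None]:
    # Mirrors the new from_string(): exactly one "~" means a prev_batch is embedded.
    if token.count(BATCH_SEPARATOR) == 1:
        current, prev = token.split(BATCH_SEPARATOR)
        return current, prev
    return token, None

assert parse(serialize("s72_1_2", "s60_1_2")) == ("s72_1_2", "s60_1_2")
assert parse(serialize("s72_1_2", None)) == ("s72_1_2", None)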
@@ -822,12 +822,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
         room_sync = incremental_sync.joined[0]
 
         self.assertEqual(room_sync.room_id, room_id)
-        self.assertEqual(
-            [e.event_id for e in room_sync.state.values()],
-            [
-                s1_event
-            ], # S1 is repeated because it is the state at the start of the timeline (before E3)
-        )
+        self.assertEqual(room_sync.state, {})
         self.assertEqual(
             [e.event_id for e in room_sync.timeline.events],
             [
@@ -857,7 +852,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
             [e.event_id for e in room_sync.timeline.events],
             [e5_event],
         )
-        # Problem: S2 is the winning state event but the last state event the client saw was S1.
+        # FIXED: S2 is the winning state event but and the last that the client saw!
 
     def test_state_after_on_branches_winner_at_start_of_timeline(self) -> None:
         r"""Test `state` and `state_after` where not all information is in `state` + `timeline`.