Pass throught SlidingSyncStreamToken

This commit is contained in:
Erik Johnston 2024-07-16 12:13:40 +01:00
parent 30263b43c2
commit 1ad1cce3f2
3 changed files with 28 additions and 14 deletions

View file

@ -36,6 +36,7 @@ from synapse.types import (
PersistedEventPosition, PersistedEventPosition,
Requester, Requester,
RoomStreamToken, RoomStreamToken,
SlidingSyncStreamToken,
StateMap, StateMap,
StreamKeyType, StreamKeyType,
StreamToken, StreamToken,
@ -343,7 +344,7 @@ class SlidingSyncHandler:
self, self,
requester: Requester, requester: Requester,
sync_config: SlidingSyncConfig, sync_config: SlidingSyncConfig,
from_token: Optional[StreamToken] = None, from_token: Optional[SlidingSyncStreamToken] = None,
timeout_ms: int = 0, timeout_ms: int = 0,
) -> SlidingSyncResult: ) -> SlidingSyncResult:
""" """
@ -378,7 +379,7 @@ class SlidingSyncHandler:
# this returns false, it means we timed out waiting, and we should # this returns false, it means we timed out waiting, and we should
# just return an empty response. # just return an empty response.
before_wait_ts = self.clock.time_msec() before_wait_ts = self.clock.time_msec()
if not await self.notifier.wait_for_stream_token(from_token): if not await self.notifier.wait_for_stream_token(from_token.stream_token):
logger.warning( logger.warning(
"Timed out waiting for worker to catch up. Returning empty response" "Timed out waiting for worker to catch up. Returning empty response"
) )
@ -416,7 +417,7 @@ class SlidingSyncHandler:
sync_config.user.to_string(), sync_config.user.to_string(),
timeout_ms, timeout_ms,
current_sync_callback, current_sync_callback,
from_token=from_token, from_token=from_token.stream_token,
) )
return result return result
@ -425,7 +426,7 @@ class SlidingSyncHandler:
self, self,
sync_config: SlidingSyncConfig, sync_config: SlidingSyncConfig,
to_token: StreamToken, to_token: StreamToken,
from_token: Optional[StreamToken] = None, from_token: Optional[SlidingSyncStreamToken] = None,
) -> SlidingSyncResult: ) -> SlidingSyncResult:
""" """
Generates the response body of a Sliding Sync result, represented as a Generates the response body of a Sliding Sync result, represented as a
@ -458,7 +459,7 @@ class SlidingSyncHandler:
await self.get_room_membership_for_user_at_to_token( await self.get_room_membership_for_user_at_to_token(
user=sync_config.user, user=sync_config.user,
to_token=to_token, to_token=to_token,
from_token=from_token, from_token=from_token.stream_token if from_token else None,
) )
) )
@ -609,8 +610,11 @@ class SlidingSyncHandler:
sync_config=sync_config, to_token=to_token sync_config=sync_config, to_token=to_token
) )
# TODO: Update this when we implement per-connection state
connection_token = 0
return SlidingSyncResult( return SlidingSyncResult(
next_pos=to_token, next_pos=SlidingSyncStreamToken(to_token, connection_token),
lists=lists, lists=lists,
rooms=rooms, rooms=rooms,
extensions=extensions, extensions=extensions,
@ -1346,7 +1350,7 @@ class SlidingSyncHandler:
room_id: str, room_id: str,
room_sync_config: RoomSyncConfig, room_sync_config: RoomSyncConfig,
room_membership_for_user_at_to_token: _RoomMembershipForUser, room_membership_for_user_at_to_token: _RoomMembershipForUser,
from_token: Optional[StreamToken], from_token: Optional[SlidingSyncStreamToken],
to_token: StreamToken, to_token: StreamToken,
) -> SlidingSyncResult.RoomResult: ) -> SlidingSyncResult.RoomResult:
""" """
@ -1410,7 +1414,7 @@ class SlidingSyncHandler:
# - TODO: For an incremental sync where we haven't sent it down this # - TODO: For an incremental sync where we haven't sent it down this
# connection before # connection before
to_bound = ( to_bound = (
from_token.room_key from_token.stream_token.room_key
if from_token is not None if from_token is not None
and not room_membership_for_user_at_to_token.newly_joined and not room_membership_for_user_at_to_token.newly_joined
else None else None
@ -1477,7 +1481,9 @@ class SlidingSyncHandler:
instance_name=timeline_event.internal_metadata.instance_name, instance_name=timeline_event.internal_metadata.instance_name,
stream=timeline_event.internal_metadata.stream_ordering, stream=timeline_event.internal_metadata.stream_ordering,
) )
if persisted_position.persisted_after(from_token.room_key): if persisted_position.persisted_after(
from_token.stream_token.room_key
):
num_live += 1 num_live += 1
else: else:
# Since we're iterating over the timeline events in # Since we're iterating over the timeline events in

View file

@ -54,7 +54,7 @@ from synapse.http.servlet import (
from synapse.http.site import SynapseRequest from synapse.http.site import SynapseRequest
from synapse.logging.opentracing import trace_with_opname from synapse.logging.opentracing import trace_with_opname
from synapse.rest.admin.experimental_features import ExperimentalFeature from synapse.rest.admin.experimental_features import ExperimentalFeature
from synapse.types import JsonDict, Requester, StreamToken from synapse.types import JsonDict, Requester, SlidingSyncStreamToken, StreamToken
from synapse.types.rest.client import SlidingSyncBody from synapse.types.rest.client import SlidingSyncBody
from synapse.util import json_decoder from synapse.util import json_decoder
from synapse.util.caches.lrucache import LruCache from synapse.util.caches.lrucache import LruCache
@ -889,7 +889,9 @@ class SlidingSyncRestServlet(RestServlet):
from_token = None from_token = None
if from_token_string is not None: if from_token_string is not None:
from_token = await StreamToken.from_string(self.store, from_token_string) from_token = await SlidingSyncStreamToken.from_string(
self.store, from_token_string
)
# TODO: We currently don't know whether we're going to use sticky params or # TODO: We currently don't know whether we're going to use sticky params or
# maybe some filters like sync v2 where they are built up once and referenced # maybe some filters like sync v2 where they are built up once and referenced

View file

@ -31,7 +31,13 @@ else:
from pydantic import Extra from pydantic import Extra
from synapse.events import EventBase from synapse.events import EventBase
from synapse.types import JsonDict, JsonMapping, StreamToken, UserID from synapse.types import (
JsonDict,
JsonMapping,
SlidingSyncStreamToken,
StreamToken,
UserID,
)
from synapse.types.rest.client import SlidingSyncBody from synapse.types.rest.client import SlidingSyncBody
if TYPE_CHECKING: if TYPE_CHECKING:
@ -287,7 +293,7 @@ class SlidingSyncResult:
def __bool__(self) -> bool: def __bool__(self) -> bool:
return bool(self.to_device) return bool(self.to_device)
next_pos: StreamToken next_pos: SlidingSyncStreamToken
lists: Dict[str, SlidingWindowList] lists: Dict[str, SlidingWindowList]
rooms: Dict[str, RoomResult] rooms: Dict[str, RoomResult]
extensions: Extensions extensions: Extensions
@ -300,7 +306,7 @@ class SlidingSyncResult:
return bool(self.lists or self.rooms or self.extensions) return bool(self.lists or self.rooms or self.extensions)
@staticmethod @staticmethod
def empty(next_pos: StreamToken) -> "SlidingSyncResult": def empty(next_pos: SlidingSyncStreamToken) -> "SlidingSyncResult":
"Return a new empty result" "Return a new empty result"
return SlidingSyncResult( return SlidingSyncResult(
next_pos=next_pos, next_pos=next_pos,