From 169c9f85a8036c14d3dce61fa227c0de210f8e5e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Sat, 18 May 2024 12:22:09 +0100 Subject: [PATCH] In sync wait for worker to catch up since token Otherwise things will get confused. --- synapse/handlers/sync.py | 35 +++++++++++++++++++++++++++++++++++ synapse/notifier.py | 20 ++++++++++++++++++-- 2 files changed, 53 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index d3d40e8682..37d5890c65 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -279,6 +279,23 @@ class SyncResult: or self.device_lists ) + @staticmethod + def empty(next_batch: StreamToken) -> "SyncResult": + "Return a new empty result" + return SyncResult( + next_batch=next_batch, + presence=[], + account_data=[], + joined=[], + invited=[], + knocked=[], + archived=[], + to_device=[], + device_lists=DeviceListUpdates(), + device_one_time_keys_count={}, + device_unused_fallback_key_types=[], + ) + class SyncHandler: def __init__(self, hs: "HomeServer"): @@ -401,6 +418,24 @@ class SyncHandler: if context: context.tag = sync_label + if since_token is not None: + # We need to make sure this worker has caught up with the token. If + # this returns false it means we timed out waiting, and we should + # just return an empty response. + start = self.clock.time_msec() + if not await self.notifier.wait_for_stream_token(since_token): + logger.warning( + "Timed out waiting for worker to catch up. Returning empty response" + ) + return SyncResult.empty(since_token) + + # If we've spent significant time waiting to catch up, take it off + # the timeout. + now = self.clock.time_msec() + if now - start > 1_000: + timeout -= now - start + timeout = max(timeout, 0) + # if we have a since token, delete any to-device messages before that token # (since we now know that the device has received them) if since_token is not None: diff --git a/synapse/notifier.py b/synapse/notifier.py index 06ce04c800..459954caeb 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -763,8 +763,24 @@ class Notifier: return result - async def wait_for_stream_position(self, stream_token: StreamToken) -> None: - pass + async def wait_for_stream_token(self, stream_token: StreamToken) -> bool: + """Wait for this worker to catch up with the given stream token.""" + + start = self.clock.time_msec() + while True: + current_token = self.event_sources.get_current_token() + if stream_token.is_before_or_eq(current_token): + return True + + now = self.clock.time_msec() + + if now - start > 10_000: + return False + + logger.info("Waiting for current token to reach %s", stream_token) + + # TODO: be better + await self.clock.sleep(0.5) async def _get_room_ids( self, user: UserID, explicit_room_id: Optional[str]