mirror of
https://github.com/element-hq/synapse.git
synced 2024-12-20 10:55:09 +03:00
In sync wait for worker to catch up since token
Otherwise things will get confused.
This commit is contained in:
parent
1b7fa7b04a
commit
169c9f85a8
2 changed files with 53 additions and 2 deletions
|
@ -279,6 +279,23 @@ class SyncResult:
|
||||||
or self.device_lists
|
or self.device_lists
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def empty(next_batch: StreamToken) -> "SyncResult":
|
||||||
|
"Return a new empty result"
|
||||||
|
return SyncResult(
|
||||||
|
next_batch=next_batch,
|
||||||
|
presence=[],
|
||||||
|
account_data=[],
|
||||||
|
joined=[],
|
||||||
|
invited=[],
|
||||||
|
knocked=[],
|
||||||
|
archived=[],
|
||||||
|
to_device=[],
|
||||||
|
device_lists=DeviceListUpdates(),
|
||||||
|
device_one_time_keys_count={},
|
||||||
|
device_unused_fallback_key_types=[],
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class SyncHandler:
|
class SyncHandler:
|
||||||
def __init__(self, hs: "HomeServer"):
|
def __init__(self, hs: "HomeServer"):
|
||||||
|
@ -401,6 +418,24 @@ class SyncHandler:
|
||||||
if context:
|
if context:
|
||||||
context.tag = sync_label
|
context.tag = sync_label
|
||||||
|
|
||||||
|
if since_token is not None:
|
||||||
|
# We need to make sure this worker has caught up with the token. If
|
||||||
|
# this returns false it means we timed out waiting, and we should
|
||||||
|
# just return an empty response.
|
||||||
|
start = self.clock.time_msec()
|
||||||
|
if not await self.notifier.wait_for_stream_token(since_token):
|
||||||
|
logger.warning(
|
||||||
|
"Timed out waiting for worker to catch up. Returning empty response"
|
||||||
|
)
|
||||||
|
return SyncResult.empty(since_token)
|
||||||
|
|
||||||
|
# If we've spent significant time waiting to catch up, take it off
|
||||||
|
# the timeout.
|
||||||
|
now = self.clock.time_msec()
|
||||||
|
if now - start > 1_000:
|
||||||
|
timeout -= now - start
|
||||||
|
timeout = max(timeout, 0)
|
||||||
|
|
||||||
# if we have a since token, delete any to-device messages before that token
|
# if we have a since token, delete any to-device messages before that token
|
||||||
# (since we now know that the device has received them)
|
# (since we now know that the device has received them)
|
||||||
if since_token is not None:
|
if since_token is not None:
|
||||||
|
|
|
@ -763,8 +763,24 @@ class Notifier:
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
async def wait_for_stream_position(self, stream_token: StreamToken) -> None:
|
async def wait_for_stream_token(self, stream_token: StreamToken) -> bool:
|
||||||
pass
|
"""Wait for this worker to catch up with the given stream token."""
|
||||||
|
|
||||||
|
start = self.clock.time_msec()
|
||||||
|
while True:
|
||||||
|
current_token = self.event_sources.get_current_token()
|
||||||
|
if stream_token.is_before_or_eq(current_token):
|
||||||
|
return True
|
||||||
|
|
||||||
|
now = self.clock.time_msec()
|
||||||
|
|
||||||
|
if now - start > 10_000:
|
||||||
|
return False
|
||||||
|
|
||||||
|
logger.info("Waiting for current token to reach %s", stream_token)
|
||||||
|
|
||||||
|
# TODO: be better
|
||||||
|
await self.clock.sleep(0.5)
|
||||||
|
|
||||||
async def _get_room_ids(
|
async def _get_room_ids(
|
||||||
self, user: UserID, explicit_room_id: Optional[str]
|
self, user: UserID, explicit_room_id: Optional[str]
|
||||||
|
|
Loading…
Reference in a new issue