From d0d3c63705d72c051cb3e03ec7f717d5a6051179 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 21 Nov 2018 10:45:35 +0000 Subject: [PATCH] Fix threading when pulling in via get_missing_events --- synapse/federation/federation_server.py | 3 ++- synapse/handlers/federation.py | 18 +++++++++++++++++- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 9e064b2e57..605597acab 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -210,7 +210,7 @@ class FederationServer(FederationBase): pdu_results[event_id] = e.error_dict() return - thread_id = random.randint(0, 999999999) + thread_id = random.randint(1, 999999999) pdu_to_thread = {} first_in_thread = True for pdu in reversed(pdus_by_room[room_id]): @@ -225,6 +225,7 @@ class FederationServer(FederationBase): event_id = pdu.event_id with nested_logging_context(event_id): thread_id, new_thread = pdu_to_thread[pdu.event_id] + logger.info("Assigning thread %d to %s", thread_id, pdu.event_id) try: yield self._handle_received_pdu( origin, pdu, thread_id=thread_id, diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 68e13673fd..9e470f8614 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -551,6 +551,21 @@ class FederationHandler(BaseHandler): # tell clients about them in order. missing_events.sort(key=lambda x: x.depth) + pdu_to_thread = {} + if not thread_id: + thread_id = random.randint(1, 999999999) + first_in_thread = True + for pdu in reversed(missing_events): + now = self.clock.time_msec() + if now - pdu.origin_server_ts > 1 * 60 * 1000: + pdu_to_thread[pdu.event_id] = (thread_id, first_in_thread) + first_in_thread = False + else: + pdu_to_thread[pdu.event_id] = (0, False) + else: + for pdu in reversed(missing_events): + pdu_to_thread[pdu.event_id] = (thread_id, False) + for ev in missing_events: logger.info( "[%s %s] Handling received prev_event %s", @@ -562,7 +577,8 @@ class FederationHandler(BaseHandler): origin, ev, sent_to_us_directly=False, - thread_id=thread_id, + thread_id=pdu_to_thread[ev.event_id][0], + new_thread=pdu_to_thread[ev.event_id][1], ) except FederationError as e: if e.code == 403: