diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index f6fec0afdd..24189d85c9 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -691,7 +691,12 @@ class FederationHandler(BaseHandler): # Don't bother processing events we already have. seen_events = yield self.store.have_events_in_timeline( - set(e.event_id for e in events) + set( + itertools.chain.from_iterable( + itertools.chain([e.event_id], e.prev_event_ids(),) + for e in events + ) + ) ) events = [e for e in events if e.event_id not in seen_events] @@ -706,7 +711,7 @@ class FederationHandler(BaseHandler): edges = [ ev.event_id for ev in events - if set(ev.prev_event_ids()) - event_ids + if set(ev.prev_event_ids()) - event_ids - seen_events ] logger.info( @@ -740,18 +745,26 @@ class FederationHandler(BaseHandler): }) missing_auth = required_auth - set(auth_events) failed_to_fetch = set() + not_in_db = set() # Try and fetch any missing auth events from both DB and remote servers. # We repeatedly do this until we stop finding new auth events. while missing_auth - failed_to_fetch: logger.info("Missing auth for backfill: %r", missing_auth) - ret_events = yield self.store.get_events(missing_auth - failed_to_fetch) - auth_events.update(ret_events) - required_auth.update( - a_id for event in ret_events.values() for a_id in event.auth_event_ids() - ) - missing_auth = required_auth - set(auth_events) + to_fetch_from_db = missing_auth - failed_to_fetch + while to_fetch_from_db - not_in_db: + ret_events = yield self.store.get_events(missing_auth - failed_to_fetch) + auth_events.update(ret_events) + + required_auth.update( + a_id + for event in ret_events.values() + for a_id in event.auth_event_ids() + ) + missing_auth = required_auth - set(auth_events) + + to_fetch_from_db = required_auth - set(auth_events) - not_in_db if missing_auth - failed_to_fetch: logger.info( @@ -820,6 +833,25 @@ class FederationHandler(BaseHandler): events.sort(key=lambda e: e.depth) + event_id_to_thread = {} + event_to_parents = {} + for event in reversed(events): + threads = yield self.store.get_threads_for_backfill_event(event.event_id) + parents = event_to_parents.get(event.event_id, []) + for p in parents: + t = event_id_to_thread.get(p) + if t is not None: + threads.append(t) + + if threads: + thread_id = min(threads) + else: + thread_id = 0 + + event_id_to_thread[event.event_id] = thread_id + for c in event.prev_event_ids(): + event_to_parents.setdefault(c, set()).add(event.event_id) + for event in events: if event in events_to_state: continue @@ -829,6 +861,7 @@ class FederationHandler(BaseHandler): # TODO: We can probably do something more clever here. yield self._handle_new_event( dest, event, backfilled=True, + thread_id=event_id_to_thread[event.event_id], ) defer.returnValue(events) @@ -838,12 +871,13 @@ class FederationHandler(BaseHandler): """Checks the database to see if we should backfill before paginating, and if so do. """ + logger.info("Backfilling") extremities = yield self.store.get_oldest_events_with_depth_in_room( room_id ) if not extremities: - logger.debug("Not backfilling as no extremeties found.") + logger.info("Not backfilling as no extremeties found.") return # Check if we reached a point where we should start backfilling. @@ -858,7 +892,7 @@ class FederationHandler(BaseHandler): extremities = dict(sorted_extremeties_tuple[:5]) if current_depth > max_depth: - logger.debug( + logger.info( "Not backfilling as we don't need to. %d < %d", max_depth, current_depth, ) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index a6b1121e8c..20a031fcec 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -28,7 +28,6 @@ from synapse.events.utils import ( format_event_raw, serialize_event, ) -from synapse.events import FrozenEvent from synapse.handlers.presence import format_user_presence_state from synapse.handlers.sync import SyncConfig from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string @@ -246,7 +245,8 @@ class SyncRestServlet(RestServlet): } @staticmethod - def encode_joined(rooms, time_now, token_id, event_fields, event_formatter, exclude_threaded): + def encode_joined(rooms, time_now, token_id, event_fields, event_formatter, + exclude_threaded): """ Encode the joined rooms in a sync result diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index d3b9dea1d6..a5f72aaa13 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -34,6 +34,24 @@ logger = logging.getLogger(__name__) class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBaseStore): + + def get_threads_for_backfill_event(self, event_id): + def _get_thread_for_backfill_event_txn(txn): + sql = """ + SELECT thread_id + FROM event_edges + INNER JOIN events USING (event_id) + WHERE prev_event_id = ? + """ + + txn.execute(sql, (event_id,)) + return [thread_id for thread_id, in txn] + + return self.runInteraction( + "get_thread_for_backfill_event", + _get_thread_for_backfill_event_txn, + ) + def get_auth_chain(self, event_ids, include_given=False): """Get auth events for given event_ids. The events *must* be state events.