From 2d8da62febe67a037af2bbe99fe6a6f9426475f4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 27 Nov 2018 17:30:11 +0000 Subject: [PATCH] Only relay 'live' events --- synapse/federation/federation_base.py | 20 ++++++++++---------- synapse/federation/transaction_queue.py | 24 ++++++++++++++++++++++-- synapse/handlers/federation.py | 9 +++++++++ synapse/storage/events.py | 2 ++ 4 files changed, 43 insertions(+), 12 deletions(-) diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 671ca1a7ec..ca3c283f45 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -79,16 +79,16 @@ class FederationBase(object): allow_none=True, ) - if not res and pdu.origin != origin: - try: - res = yield self.get_pdu( - destinations=[pdu.origin], - event_id=pdu.event_id, - outlier=outlier, - timeout=10000, - ) - except SynapseError: - pass + # if not res and pdu.origin != origin: + # try: + # res = yield self.get_pdu( + # destinations=[pdu.origin], + # event_id=pdu.event_id, + # outlier=outlier, + # timeout=10000, + # ) + # except SynapseError: + # pass if not res: logger.warn( diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index f375034d5a..3854602bfd 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -177,7 +177,7 @@ class TransactionQueue(object): @defer.inlineCallbacks def handle_event(event): - should_relay = yield self._should_relay(event) + should_relay = yield self._should_relay(event, False) logger.info("Should relay event %s: %s", event.event_id, should_relay) if not should_relay: return @@ -249,6 +249,20 @@ class TransactionQueue(object): finally: self._is_processing = False + @defer.inlineCallbacks + def received_new_event(self, origin, event): + should_relay = yield self._should_relay(event, True) + logger.info("Should relay event %s: %s", event.event_id, should_relay) + if not should_relay: + return + + destinations = event.unsigned.get("destinations") + destinations = set(destinations) + + logger.debug("Sending %s to %r", event, destinations) + + yield self._send_pdu(event, destinations) + @defer.inlineCallbacks def _send_pdu(self, pdu, destinations, span=None): # We loop through all destinations to see whether we already have @@ -352,12 +366,18 @@ class TransactionQueue(object): return joined_hosts - def _should_relay(self, event): + def _should_relay(self, event, from_federation): """Whether we should consider relaying this event. """ # XXX: Hook for routing shenanigans +<<<<<<< HEAD +======= + if from_federation and event.unsigned.get("destinations"): + return True + +>>>>>>> efdec3252... Only relay 'live' events send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of() is_mine = self.is_mine_id(event.event_id) if not is_mine and send_on_behalf_of is None: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 5053038d02..9350ff989d 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -48,6 +48,7 @@ from synapse.crypto.event_signing import ( add_hashes_and_signatures, compute_event_signature, ) +from synapse.events import FrozenEvent from synapse.events.validator import EventValidator from synapse.replication.http.federation import ( ReplicationCleanRoomRestServlet, @@ -111,6 +112,7 @@ class FederationHandler(BaseHandler): self.store = hs.get_datastore() # type: synapse.storage.DataStore self.federation_client = hs.get_federation_client() + self.federation_sender = hs.get_federation_sender() self.state_handler = hs.get_state_handler() self.server_name = hs.hostname self.keyring = hs.get_keyring() @@ -435,6 +437,10 @@ class FederationHandler(BaseHandler): logger.info("Thread ID %r", thread_id) + # Remove destinations field before persisting + event_copy = FrozenEvent.from_event(pdu) + pdu.unsigned.pop("destinations", None) + yield self._process_received_pdu( origin, pdu, @@ -443,6 +449,9 @@ class FederationHandler(BaseHandler): thread_id=thread_id, ) + if sent_to_us_directly: + yield self.federation_sender.received_new_event(origin, event_copy) + if new_thread: builder = self.event_builder_factory.new({ "type": "org.matrix.new_thread", diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 187ca45ac9..118e33140e 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -590,6 +590,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore prev_event_id IN (%s) AND NOT events.outlier AND rejections.event_id IS NULL + AND NOT events.internal_event """ % ( ",".join("?" for _ in batch), ) @@ -1290,6 +1291,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore and isinstance(event.content["url"], text_type) ), "thread_id": ctx.thread_id, + "internal_event": event.internal_metadata.is_internal_event(), } for event, ctx in events_and_contexts ],