From 712caeba60a04c2a392ed1cd9111c7867b800238 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 14 Nov 2018 16:12:33 +0000 Subject: [PATCH] Add hooks in federation for funky event routing --- synapse/federation/federation_server.py | 25 ------ synapse/federation/transaction_queue.py | 100 ++++++++++++++++++------ synapse/handlers/federation.py | 11 +++ 3 files changed, 87 insertions(+), 49 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 98722ae543..93c38845f6 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -42,7 +42,6 @@ from synapse.replication.http.federation import ( ReplicationFederationSendEduRestServlet, ReplicationGetQueryRestServlet, ) -from synapse.types import get_domain_from_id from synapse.util import glob_to_regex from synapse.util.async_helpers import Linearizer, concurrently_execute from synapse.util.caches.response_cache import ResponseCache @@ -601,30 +600,6 @@ class FederationServer(FederationBase): if the event was unacceptable for any other reason (eg, too large, too many prev_events, couldn't find the prev_events) """ - # check that it's actually being sent from a valid destination to - # workaround bug #1753 in 0.18.5 and 0.18.6 - if origin != get_domain_from_id(pdu.event_id): - # We continue to accept join events from any server; this is - # necessary for the federation join dance to work correctly. - # (When we join over federation, the "helper" server is - # responsible for sending out the join event, rather than the - # origin. See bug #1893). - if not ( - pdu.type == 'm.room.member' and - pdu.content and - pdu.content.get("membership", None) == 'join' - ): - logger.info( - "Discarding PDU %s from invalid origin %s", - pdu.event_id, origin - ) - return - else: - logger.info( - "Accepting join PDU %s from %s", - pdu.event_id, origin - ) - # Check signature. 
try: pdu = yield self._check_sigs_and_hash(pdu) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index dd85c0f4e8..52c483139c 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -42,6 +42,8 @@ from .units import Edu, Transaction logger = logging.getLogger(__name__) +pdu_logger = logging.getLogger("synapse.federation.pdu_destination_logger") + sent_pdus_destination_dist_count = Counter( "synapse_federation_client_sent_pdu_destinations:count", "" ) @@ -169,13 +171,9 @@ class TransactionQueue(object): @defer.inlineCallbacks def handle_event(event): - # Only send events for this server. - send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of() - is_mine = self.is_mine_id(event.event_id) - if not is_mine and send_on_behalf_of is None: - return - - if event.internal_metadata.is_internal_event(): + should_relay = yield self._should_relay(event) + logger.info("Should relay event %s: %s", event.event_id, should_relay) + if not should_relay: return try: @@ -197,15 +195,9 @@ class TransactionQueue(object): destinations = set(destinations) - if send_on_behalf_of is not None: - # If we are sending the event on behalf of another server - # then it already has the event and there is no reason to - # send the event to it. - destinations.discard(send_on_behalf_of) - logger.debug("Sending %s to %r", event, destinations) - self._send_pdu(event, destinations) + yield self._send_pdu(event, destinations) @defer.inlineCallbacks def handle_room_events(events): @@ -251,6 +243,7 @@ class TransactionQueue(object): finally: self._is_processing = False + @defer.inlineCallbacks def _send_pdu(self, pdu, destinations): # We loop through all destinations to see whether we already have # a transaction in progress. 
If we do, stick it in the pending_pdus @@ -261,14 +254,26 @@ class TransactionQueue(object): destinations = set(destinations) destinations.discard(self.server_name) + + destinations = yield self._compute_relay_destinations( + pdu, joined_hosts=destinations, + ) + logger.debug("Sending to: %s", str(destinations)) + pdu_logger.info( + "Relaying PDU %s in %s to %s", + pdu.event_id, pdu.room_id, destinations, + ) + if not destinations: return sent_pdus_destination_dist_total.inc(len(destinations)) sent_pdus_destination_dist_count.inc() + # XXX: Should we decide where to route here? + for destination in destinations: self.pending_pdus_by_dest.setdefault(destination, []).append( (pdu, order) @@ -276,6 +281,36 @@ class TransactionQueue(object): self._attempt_new_transaction(destination) + def _compute_relay_destinations(self, pdu, joined_hosts): + """Compute where we should send an event. Returning an empty set stops + the PDU from being sent anywhere. + """ + # XXX: Hook for routing shenanigans + send_on_behalf_of = pdu.internal_metadata.get_send_on_behalf_of() + if send_on_behalf_of is not None: + # If we are sending the event on behalf of another server + # then it already has the event and there is no reason to + # send the event to it. + joined_hosts.discard(send_on_behalf_of) + + return joined_hosts + + def _should_relay(self, event): + """Whether we should consider relaying this event. 
+ """ + + # XXX: Hook for routing shenanigans + + send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of() + is_mine = self.is_mine_id(event.event_id) + if not is_mine and send_on_behalf_of is None: + return False + + if event.internal_metadata.is_internal_event(): + return False + + return True + @logcontext.preserve_fn # the caller should not yield on this @defer.inlineCallbacks def send_presence(self, states): @@ -657,18 +692,35 @@ class TransactionQueue(object): logger.debug("TX [%s] {%s} Marked as delivered", destination, txn_id) if code == 200: - for e_id, r in response.get("pdus", {}).items(): - if "error" in r: - logger.warn( - "TX [%s] {%s} Remote returned error for %s: %s", - destination, txn_id, e_id, r, - ) + pdu_results = response.get("pdus", {}) + for p in pdus: + yield self._pdu_send_result( + destination, txn_id, p, + response=pdu_results.get(p.event_id, {}) + ) else: for p in pdus: - logger.warn( - "TX [%s] {%s} Failed to send event %s", - destination, txn_id, p.event_id, - ) + yield self._pdu_send_txn_failed(destination, txn_id, p) success = False defer.returnValue(success) + + def _pdu_send_result(self, destination, txn_id, pdu, response): + """Gets called after sending the event in a transaction, with the + result for the event from the remote server. 
+ """ + # XXX: Hook for routing shenanigans + if "error" in response: + logger.warn( + "TX [%s] {%s} Remote returned error for %s: %s", + destination, txn_id, pdu.event_id, response, + ) + + def _pdu_send_txn_failed(self, destination, txn_id, pdu): + """Gets called when sending a transaction failed (after retries) + """ + # XXX: Hook for routing shenanigans + logger.warn( + "TX [%s] {%s} Failed to send event %s", + destination, txn_id, pdu.event_id, + ) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index c37a3b8dca..c0bea7a5ed 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -69,6 +69,8 @@ from ._base import BaseHandler logger = logging.getLogger(__name__) +pdu_logger = logging.getLogger("synapse.federation.pdu_destination_logger") + def shortstr(iterable, maxitems=5): """If iterable has maxitems or fewer, return the stringification of a list @@ -178,8 +180,17 @@ class FederationHandler(BaseHandler): ) if already_seen: logger.debug("[%s %s]: Already seen pdu", room_id, event_id) + pdu_logger.info( + "Received already seen event %s in room %s from %s", + pdu.event_id, pdu.room_id, origin, + ) return + pdu_logger.info( + "Received unseen event %s in room %s from %s", + pdu.event_id, pdu.room_id, origin, + ) + # do some initial sanity-checking of the event. In particular, make # sure it doesn't have hundreds of prev_events or auth_events, which # could cause a huge state resolution or cascade of event fetches.