diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index e62bdf5bbe..b878023058 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -226,11 +226,11 @@ class FederationServer(FederationBase): thread_id, new_thread = pdu_to_thread[pdu.event_id] logger.info("Assigning thread %d to %s", thread_id, pdu.event_id) try: - yield self._handle_received_pdu( + ret = yield self._handle_received_pdu( origin, pdu, thread_id=thread_id, new_thread=new_thread ) - pdu_results[event_id] = {} + pdu_results[event_id] = ret except FederationError as e: logger.warn("Error handling PDU %s: %s", event_id, e) pdu_results[event_id] = {"error": str(e)} @@ -259,7 +259,7 @@ class FederationServer(FederationBase): "pdus": pdu_results, } - logger.debug("Returning: %s", str(response)) + logger.info("Returning: %s", str(response)) yield self.transaction_actions.set_response( origin, @@ -627,11 +627,29 @@ class FederationServer(FederationBase): affected=pdu.event_id, ) + destinations = pdu.unsigned.get("destinations", {}) + + costs = yield self.store.get_destination_healths(list(destinations)) + + logger.info("Destinations: %s", destinations) + logger.info("Costs: %s", costs) + + dont_relay = set() + for dest, their_cost in destinations.items(): + our_cost = costs.get(dest) + if our_cost and their_cost and their_cost < our_cost: + dont_relay.add(dest) + + if destinations: + pdu.unsigned["destinations"] = {d: c for d, c in destinations.items() if d not in dont_relay} + yield self.handler.on_receive_pdu( origin, pdu, sent_to_us_directly=True, thread_id=thread_id, new_thread=new_thread, ) + defer.returnValue({"did_not_relay": list(dont_relay)}) + def __str__(self): return "" % self.server_name diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 52c483139c..b65254cfac 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -23,6 +23,7 @@ from twisted.internet import defer import synapse.metrics from synapse.api.errors import FederationDeniedError, HttpResponseException +from synapse.events import FrozenEvent from synapse.handlers.presence import format_user_presence_state, get_interested_remotes from synapse.metrics import ( LaterGauge, @@ -692,6 +693,10 @@ class TransactionQueue(object): logger.debug("TX [%s] {%s} Marked as delivered", destination, txn_id) if code == 200: + logger.info( + "TX [%s] {%s} got response json %s", + destination, txn_id, response + ) pdu_results = response.get("pdus", {}) for p in pdus: yield self._pdu_send_result( @@ -705,6 +710,7 @@ class TransactionQueue(object): defer.returnValue(success) + @defer.inlineCallbacks def _pdu_send_result(self, destination, txn_id, pdu, response): """Gets called after sending the event in a transaction, with the result for the event from the remote server. @@ -715,7 +721,35 @@ class TransactionQueue(object): "TX [%s] {%s} Remote returned error for %s: %s", destination, txn_id, pdu.event_id, response, ) + pdu_logger.info( + "SendErrorPDU", + extra={ + "event_id": pdu.event_id, "room_id": pdu.room_id, + "destination": destination, + "server": self.server_name, + }, + ) + new_destinations = set(pdu.unsigned.get("destinations", [])) + new_destinations.discard(destination) + yield self._send_pdu(pdu, list(new_destinations)) + elif "did_not_relay" in response and response["did_not_relay"]: + new_destinations = set(response["did_not_relay"]) + new_destinations.discard(destination) + + pdu_logger.info( + "DidNotRelayPDU", + extra={ + "event_id": pdu.event_id, "room_id": pdu.room_id, + "destination": destination, + "new_destinations": json.dumps(list(new_destinations)), + "server": self.server_name, + }, + ) + + yield self._send_pdu(pdu, list(new_destinations)) + + @defer.inlineCallbacks def _pdu_send_txn_failed(self, destination, txn_id, pdu): """Gets called when sending a transaction failed (after retries) """ @@ -724,3 +758,16 @@ class TransactionQueue(object): "TX [%s] {%s} Failed to send event %s", destination, txn_id, pdu.event_id, ) + + pdu_logger.info( + "SendFailPDU", + extra={ + "event_id": pdu.event_id, "room_id": pdu.room_id, + "destination": destination, + "server": self.server_name, + }, + ) + + new_destinations = set(pdu.unsigned.get("destinations", [])) + new_destinations.discard(destination) + yield self._send_pdu(pdu, list(new_destinations)) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index e18a567d01..0e7690e285 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -120,10 +120,10 @@ class Authenticator(object): ): raise FederationDeniedError(origin) - if not json_request["signatures"]: - raise NoAuthenticationError( - 401, "Missing Authorization headers", Codes.UNAUTHORIZED, - ) + # if not json_request["signatures"]: + # raise NoAuthenticationError( + # 401, "Missing Authorization headers", Codes.UNAUTHORIZED, + # ) # yield self.keyring.verify_json_for_server(origin, json_request)