diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 14406f5e70..671ca1a7ec 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -326,6 +326,17 @@ def event_from_pdu_json(pdu_json, outlier=False): elif depth > MAX_DEPTH: raise SynapseError(400, "Depth too large", Codes.BAD_JSON) + dtab = pdu_json.get("unsigned", {}).pop("dtab", None) + + if dtab: + pdu_json.setdefault("unsigned", {})["destinations"] = { + dest: cost + for cost, destinations in dtab + for dest in destinations + } + + logger.info("Unmangled event to: %s", pdu_json) + event = FrozenEvent( pdu_json ) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index efb1360edb..d3aee9e953 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -247,7 +247,16 @@ class FederationServer(FederationBase): ) if hasattr(transaction, "edus"): - for edu in (Edu(**x) for x in transaction.edus): + logger.info("Got edus: %s", transaction.edus) + + edus = [] + for x in transaction.edus: + try: + edus.append(Edu(**x)) + except Exception: + logger.exception("Failed to handle EDU: %s", x) + + for edu in edus: yield self.received_edu( origin, edu.edu_type, diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 9e28bde32c..f375034d5a 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -587,8 +587,10 @@ class TransactionQueue(object): pdu_span_references.append(opentracing.follows_from(span.context)) # END CRITICAL SECTION - - with self.tracer.start_span('_send_new_transaction', references=pdu_span_references) as span: + span = self.tracer.start_span( + '_send_new_transaction', references=pdu_span_references, + ) + with span: span.set_tag("destination", destination) success = yield self._send_new_transaction( @@ -629,7 +631,7 @@ class TransactionQueue(object): destination, e, ) - for p, _ in pending_pdus: + for p, _, _ in pending_pdus: logger.info("Failed to send event %s to %s", p.event_id, destination) finally: @@ -672,7 +674,8 @@ class TransactionQueue(object): @measure_func("_send_new_transaction") @defer.inlineCallbacks - def _send_new_transaction(self, destination, pending_pdus, pending_edus, span, pdu_spans): + def _send_new_transaction(self, destination, pending_pdus, pending_edus, + span, pdu_spans): # Sort based on the order field pending_pdus.sort(key=lambda t: t[1]) @@ -749,6 +752,9 @@ class TransactionQueue(object): code = e.code response = e.response + span.set_tag("error", True) + span.log_kv({"error": e}) + if e.code in (401, 404, 429) or 500 <= e.code: logger.info( "TX [%s] {%s} got %d response", @@ -796,6 +802,7 @@ class TransactionQueue(object): """ # XXX: Hook for routing shenanigans if "error" in response: + span.set_tag("error", True) span.log_kv({ "error.kind": "pdu", "response.error": response["error"], @@ -847,6 +854,7 @@ class TransactionQueue(object): destination, txn_id, pdu.event_id, ) + span.set_tag("error", True) span.log_kv({ "error.kind": "transaction", }) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index b5f9d9ebb8..072849d871 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -20,8 +20,6 @@ import re import opentracing from opentracing.ext import tags -import six - from twisted.internet import defer import synapse @@ -296,17 +294,22 @@ class BaseFederationServlet(object): logger.warn("authenticate_request failed: %s", e) raise - if origin: - span.set_tag("origin", origin) - with ratelimiter.ratelimit(origin) as d: - yield d + try: + if origin: + span.set_tag("origin", origin) + with ratelimiter.ratelimit(origin) as d: + yield d + response = yield func( + origin, content, request.args, *args, **kwargs + ) + else: response = yield func( origin, content, request.args, *args, **kwargs ) - else: - response = yield func( - origin, content, request.args, *args, **kwargs - ) + except Exception as e: + span.set_tag("error", True) + span.log_kv({"error": e}) + raise span.set_tag(tags.HTTP_STATUS_CODE, response[0]) diff --git a/synapse/federation/units.py b/synapse/federation/units.py index 6b329f26b4..20e5922c74 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -17,6 +17,7 @@ server protocol. """ +import itertools import logging from synapse.util.jsonobject import JsonEncodedObject @@ -24,6 +25,9 @@ from synapse.util.jsonobject import JsonEncodedObject logger = logging.getLogger(__name__) +BUCKETS = [0, 50, 100, 200, 350, 500, 750, 1000, 2000, 5000, 10000, 100000] + + class Edu(JsonEncodedObject): """ An Edu represents a piece of data sent from one homeserver to another. @@ -122,6 +126,20 @@ def _mangle_pdu(pdu_json): pdu_json["auth_events"] = list(_strip_hashes(pdu_json["auth_events"])) pdu_json["prev_events"] = list(_strip_hashes(pdu_json["prev_events"])) + destinations = pdu_json["unsigned"].pop("destinations", None) + if destinations: + new_destinations = {} + for dest, cost in destinations.items(): + for first, second in pairwise(BUCKETS): + if first <= cost <= second: + b = first if cost - first < second - cost else second + new_destinations.setdefault(b, []).append(dest) + break + else: + new_destinations.setdefault(b[-1], []).append(dest) + + pdu_json["unsigned"]["dtab"] = list(new_destinations.items()) + logger.info("Mangled PDU: %s", pdu_json) return pdu_json @@ -132,3 +150,10 @@ def _strip_hashes(iterable): (e, {}) for e, hashes in iterable ) + + +def pairwise(iterable): + "s -> (s0,s1), (s1,s2), (s2, s3), ..." + a, b = itertools.tee(iterable) + next(b, None) + return zip(a, b) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 172b6947d4..7b23757282 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -347,7 +347,7 @@ class PresenceHandler(object): """Checks the presence of users that have timed out and updates as appropriate. """ - #logger.info("Handling presence timeouts") + # logger.info("Handling presence timeouts") now = self.clock.time_msec() try: @@ -626,6 +626,7 @@ class PresenceHandler(object): Args: states (list(UserPresenceState)) """ + return self.federation.send_presence(states) @defer.inlineCallbacks @@ -816,6 +817,7 @@ class PresenceHandler(object): if self.is_mine(observed_user): yield self.invite_presence(observed_user, observer_user) else: + return yield self.federation.send_edu( destination=observed_user.domain, edu_type="m.presence_invite", @@ -836,6 +838,7 @@ class PresenceHandler(object): if self.is_mine(observer_user): yield self.accept_presence(observed_user, observer_user) else: + return self.federation.send_edu( destination=observer_user.domain, edu_type="m.presence_accept", diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 4c2690ba26..8f3504583d 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -147,6 +147,8 @@ class ReceiptsHandler(BaseHandler): logger.debug("Sending receipt to: %r", remotedomains) + return + for domain in remotedomains: self.federation.send_edu( destination=domain, diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 2af164a142..da8cfe0440 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -86,7 +86,7 @@ class TypingHandler(object): self._room_typing = {} def _handle_timeouts(self): - #logger.info("Checking for typing timeouts") + # logger.info("Checking for typing timeouts") now = self.clock.time_msec() @@ -231,6 +231,7 @@ class TypingHandler(object): for domain in set(get_domain_from_id(u) for u in users): if domain != self.server_name: logger.debug("sending typing update to %s", domain) + return self.federation.send_edu( destination=domain, edu_type="m.typing", diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 2830970e22..729e6e9924 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -385,7 +385,8 @@ class MatrixFederationHttpClient(object): request_deferred, ) except Exception as e: - child_span.set_tag("error", str(e)) + child_span.set_tag("error", True) + child_span.log_kv({"error": e}) raise child_span.set_tag(tags.HTTP_STATUS_CODE, response.code) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 904f9305ee..187ca45ac9 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -402,6 +402,12 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore # No change in extremities, so no change in state continue + logger.info( + "Forward extremities for %s: %s -> %s", + room_id, latest_event_ids, new_latest_event_ids, + ) + logger.info("Events: %s", [e.event_id for e, _ in ev_ctx_rm]) + # there should always be at least one forward extremity. # (except during the initial persistence of the send_join # results, in which case there will be no existing