From 224df403ea1af10fa984b511d10917b999783085 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 27 Nov 2018 19:10:18 +0000
Subject: [PATCH] Fixup opentracing error logging

Pass the request span from the federation /send/ servlet down through
on_incoming_transaction, _handle_received_pdu and on_receive_pdu to the
federation sender's received_new_event, wrapping each received PDU in a
'handle_pdu' child span that is tagged and logged on failure.

Allocate the transaction ID in the transaction loop instead of in
_send_new_transaction so that it can be tagged on the per-PDU spans, and
tag those spans with the error (and finish them) when sending raises.
---
 synapse/federation/federation_server.py | 54 ++++++++++++-------
 synapse/federation/transaction_queue.py | 72 ++++++++++++++++---------
 synapse/federation/transport/server.py  | 14 ++++-
 synapse/handlers/federation.py          |  4 +-
 4 files changed, 95 insertions(+), 49 deletions(-)

diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index d3aee9e953..1d25b7c479 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -84,6 +84,8 @@ class FederationServer(FederationBase):
         # come in waves.
         self._state_resp_cache = ResponseCache(hs, "state_resp", timeout_ms=30000)
 
+        self.tracer = hs.get_tracer()
+
     @defer.inlineCallbacks
     @log_function
     def on_backfill_request(self, origin, room_id, versions, limit):
@@ -101,7 +103,7 @@ class FederationServer(FederationBase):
 
     @defer.inlineCallbacks
     @log_function
-    def on_incoming_transaction(self, origin, transaction_data):
+    def on_incoming_transaction(self, origin, transaction_data, span):
         # keep this as early as possible to make the calculated origin ts as
         # accurate as possible.
         request_time = self._clock.time_msec()
@@ -119,13 +121,13 @@ class FederationServer(FederationBase):
                 (origin, transaction.transaction_id),
         )):
             result = yield self._handle_incoming_transaction(
-                origin, transaction, request_time,
+                origin, transaction, request_time, span,
             )
 
         defer.returnValue(result)
 
     @defer.inlineCallbacks
-    def _handle_incoming_transaction(self, origin, transaction, request_time):
+    def _handle_incoming_transaction(self, origin, transaction, request_time, span):
         """ Process an incoming transaction and return the HTTP response
 
         Args:
@@ -224,22 +226,33 @@ class FederationServer(FederationBase):
                 with nested_logging_context(event_id):
                     thread_id, new_thread = pdu_to_thread[pdu.event_id]
                     logger.info("Assigning thread %d to %s", thread_id, pdu.event_id)
-                    try:
-                        ret = yield self._handle_received_pdu(
-                            origin, pdu, thread_id=thread_id,
-                            new_thread=new_thread
-                        )
-                        pdu_results[event_id] = ret
-                    except FederationError as e:
-                        logger.warn("Error handling PDU %s: %s", event_id, e)
-                        pdu_results[event_id] = {"error": str(e)}
-                    except Exception as e:
-                        f = failure.Failure()
-                        pdu_results[event_id] = {"error": str(e)}
-                        logger.error(
-                            "Failed to handle PDU %s: %s",
-                            event_id, f.getTraceback().rstrip(),
-                        )
+                    child_span = self.tracer.start_span('handle_pdu', child_of=span)
+                    with child_span:
+                        child_span.set_tag("event_id", event_id)
+                        try:
+                            ret = yield self._handle_received_pdu(
+                                origin, pdu, thread_id=thread_id,
+                                new_thread=new_thread,
+                                span=child_span,
+                            )
+                            if ret:
+                                pdu_results[event_id] = ret
+                        except FederationError as e:
+                            logger.warn("Error handling PDU %s: %s", event_id, e)
+                            pdu_results[event_id] = {"error": str(e)}
+                            child_span.set_tag("error", True)
+                            child_span.log_kv({"error": e})
+                        except Exception as e:
+                            f = failure.Failure()
+                            pdu_results[event_id] = {"error": str(e)}
+                            logger.error(
+                                "Failed to handle PDU %s: %s",
+                                event_id, f.getTraceback().rstrip(),
+                            )
+                            child_span.set_tag("error", True)
+                            child_span.log_kv({"error": e})
+
+                        child_span.log_kv({"pdu_result": pdu_results.get(event_id)})
 
         yield concurrently_execute(
             process_pdus_for_room, pdus_by_room.keys(),
@@ -594,7 +607,7 @@ class FederationServer(FederationBase):
         )
 
     @defer.inlineCallbacks
-    def _handle_received_pdu(self, origin, pdu, thread_id, new_thread):
+    def _handle_received_pdu(self, origin, pdu, thread_id, new_thread, span):
         """ Process a PDU received in a federation /send/ transaction.
 
         If the event is invalid, then this method throws a FederationError.
@@ -656,6 +669,7 @@ class FederationServer(FederationBase):
         yield self.handler.on_receive_pdu(
             origin, pdu, sent_to_us_directly=True,
             thread_id=thread_id, new_thread=new_thread,
+            span=span,
         )
 
         defer.returnValue({"did_not_relay": list(dont_relay)})
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 3854602bfd..68d69f5193 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -250,7 +250,7 @@ class TransactionQueue(object):
             self._is_processing = False
 
     @defer.inlineCallbacks
-    def received_new_event(self, origin, event):
+    def received_new_event(self, origin, event, span):
         should_relay = yield self._should_relay(event, True)
         logger.info("Should relay event %s: %s", event.event_id, should_relay)
         if not should_relay:
@@ -261,7 +261,7 @@ class TransactionQueue(object):
 
         logger.debug("Sending %s to %r", event, destinations)
 
-        yield self._send_pdu(event, destinations)
+        yield self._send_pdu(event, destinations, span)
 
     @defer.inlineCallbacks
     def _send_pdu(self, pdu, destinations, span=None):
@@ -539,6 +539,12 @@ class TransactionQueue(object):
 
             pending_pdus = []
             while True:
+                txn_id = str(self._next_txn_id)
+                self._next_txn_id += 1
+
+                for s in pdu_spans.values():
+                    s.set_tag("txn-id", txn_id)
+
                 device_message_edus, device_stream_id, dev_list_id = (
                     yield self._get_new_device_messages(destination)
                 )
@@ -602,9 +608,10 @@ class TransactionQueue(object):
                     return
 
                 pdu_span_references = []
-                for pdu, _, span in pending_pdus:
-                    pdu_spans[pdu.event_id] = span
-                    pdu_span_references.append(opentracing.follows_from(span.context))
+                for pdu, _, p_span in pending_pdus:
+                    pdu_spans[pdu.event_id] = p_span
+                    p_span.set_tag("txn-id", txn_id)
+                    pdu_span_references.append(opentracing.follows_from(p_span.context))
                 # END CRITICAL SECTION
 
                 span = self.tracer.start_span(
@@ -612,11 +619,39 @@ class TransactionQueue(object):
                 )
                 with span:
                     span.set_tag("destination", destination)
+                    span.set_tag("txn-id", txn_id)
+
+                    try:
+                        success = yield self._send_new_transaction(
+                            destination, pending_pdus, pending_edus, span,
+                            pdu_spans, txn_id,
+                        )
+                    except Exception as e:
+                        success = False
+                        span.set_tag("error", True)
+                        span.log_kv({"error": e})
+
+                        for s in pdu_spans.values():
+                            s.set_tag("error", True)
+                            s.log_kv({"transaction_error": e})
+
+                        raise
+                    finally:
+                        if not success:
+                            for p, _, _ in pending_pdus:
+                                yield self._pdu_send_txn_failed(
+                                    destination, txn_id, p,
+                                    span=pdu_spans[p.event_id],
+                                )
+
+                        # We want to be *very* sure we delete this after we stop
+                        # processing
+                        self.pending_transactions.pop(destination, None)
+                        for s in pdu_spans.values():
+                            s.finish()
 
-                    success = yield self._send_new_transaction(
-                        destination, pending_pdus, pending_edus, span, pdu_spans,
-                    )
                     span.set_tag("success", success)
+
                     if success:
                         sent_transactions_counter.inc()
                         # Remove the acknowledged device messages from the database
@@ -625,7 +660,9 @@ class TransactionQueue(object):
                             yield self.store.delete_device_msgs_for_remote(
                                 destination, device_stream_id
                             )
-                            logger.info("Marking as sent %r %r", destination, dev_list_id)
+                            logger.info(
+                                "Marking as sent %r %r", destination, dev_list_id,
+                            )
                             yield self.store.mark_as_sent_devices_by_remote(
                                 destination, dev_list_id
                             )
@@ -654,11 +691,6 @@ class TransactionQueue(object):
             for p, _, _ in pending_pdus:
                 logger.info("Failed to send event %s to %s", p.event_id,
                             destination)
-        finally:
-            # We want to be *very* sure we delete this after we stop processing
-            self.pending_transactions.pop(destination, None)
-            for span in pdu_spans.values():
-                span.finish()
 
     @defer.inlineCallbacks
     def _get_new_device_messages(self, destination):
@@ -695,7 +727,7 @@ class TransactionQueue(object):
     @measure_func("_send_new_transaction")
     @defer.inlineCallbacks
     def _send_new_transaction(self, destination, pending_pdus, pending_edus,
-                              span, pdu_spans):
+                              span, pdu_spans, txn_id):
 
         # Sort based on the order field
         pending_pdus.sort(key=lambda t: t[1])
@@ -707,9 +739,6 @@ class TransactionQueue(object):
         logger.debug("TX [%s] _attempt_new_transaction", destination)
         logger.debug("TX [%s] _attempt_new_transaction", destination)
 
-        txn_id = str(self._next_txn_id)
-
-        span.set_tag("txn-id", txn_id)
         span.log_kv({
             "pdus": len(pdus),
             "edus": len(edus),
@@ -734,8 +763,6 @@ class TransactionQueue(object):
             edus=edus,
         )
 
-        self._next_txn_id += 1
-
         yield self.transaction_actions.prepare_to_send(transaction)
 
         logger.debug("TX [%s] Persisted transaction", destination)
@@ -806,11 +833,6 @@ class TransactionQueue(object):
                     span=pdu_spans[p.event_id],
                 )
         else:
-            for p in pdus:
-                yield self._pdu_send_txn_failed(
-                    destination, txn_id, p,
-                    span=pdu_spans[p.event_id],
-                )
             success = False
 
         defer.returnValue(success)
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 072849d871..f503ccd6df 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 
 import functools
+import inspect
 import logging
 import re
 import opentracing
@@ -239,6 +240,12 @@ class BaseFederationServlet(object):
         authenticator = self.authenticator
         ratelimiter = self.ratelimiter
 
+        arg_spec = inspect.signature(func)
+        all_args = arg_spec.parameters
+
+        include_span = "request_span" in all_args
+        logger.info("include_span: %s for %s", include_span, self)
+
         @defer.inlineCallbacks
         @functools.wraps(func)
         def new_func(request, *args, **kwargs):
@@ -294,6 +301,9 @@ class BaseFederationServlet(object):
                 logger.warn("authenticate_request failed: %s", e)
                 raise
 
+            if include_span:
+                kwargs["request_span"] = span
+
             try:
                 if origin:
                     span.set_tag("origin", origin)
@@ -342,7 +352,7 @@ class FederationSendServlet(BaseFederationServlet):
 
     # This is when someone is trying to send us a bunch of data.
     @defer.inlineCallbacks
-    def on_PUT(self, origin, content, query, transaction_id):
+    def on_PUT(self, origin, content, query, transaction_id, request_span):
         """ Called on PUT /send//
 
         Args:
@@ -388,7 +398,7 @@ class FederationSendServlet(BaseFederationServlet):
 
         try:
             code, response = yield self.handler.on_incoming_transaction(
-                origin, transaction_data,
+                origin, transaction_data, request_span,
             )
         except Exception:
             logger.exception("on_incoming_transaction failed")
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 9350ff989d..7c094d2700 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -146,7 +146,7 @@ class FederationHandler(BaseHandler):
     @defer.inlineCallbacks
     def on_receive_pdu(
         self, origin, pdu, sent_to_us_directly=False, thread_id=None,
-        new_thread=False,
+        new_thread=False, span=None,
     ):
         """ Process a PDU received via a federation /send/ transaction, or
         via backfill of missing prev_events
@@ -450,7 +450,7 @@ class FederationHandler(BaseHandler):
         )
 
         if sent_to_us_directly:
-            yield self.federation_sender.received_new_event(origin, event_copy)
+            yield self.federation_sender.received_new_event(origin, event_copy, span)
 
         if new_thread:
             builder = self.event_builder_factory.new({