Fixup opentracing error logging

This commit is contained in:
Erik Johnston 2018-11-27 19:10:18 +00:00 committed by Brendan Abolivier
parent 2d8da62feb
commit 224df403ea
4 changed files with 95 additions and 49 deletions

View file

@ -84,6 +84,8 @@ class FederationServer(FederationBase):
# come in waves. # come in waves.
self._state_resp_cache = ResponseCache(hs, "state_resp", timeout_ms=30000) self._state_resp_cache = ResponseCache(hs, "state_resp", timeout_ms=30000)
self.tracer = hs.get_tracer()
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def on_backfill_request(self, origin, room_id, versions, limit): def on_backfill_request(self, origin, room_id, versions, limit):
@ -101,7 +103,7 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def on_incoming_transaction(self, origin, transaction_data): def on_incoming_transaction(self, origin, transaction_data, span):
# keep this as early as possible to make the calculated origin ts as # keep this as early as possible to make the calculated origin ts as
# accurate as possible. # accurate as possible.
request_time = self._clock.time_msec() request_time = self._clock.time_msec()
@ -119,13 +121,13 @@ class FederationServer(FederationBase):
(origin, transaction.transaction_id), (origin, transaction.transaction_id),
)): )):
result = yield self._handle_incoming_transaction( result = yield self._handle_incoming_transaction(
origin, transaction, request_time, origin, transaction, request_time, span,
) )
defer.returnValue(result) defer.returnValue(result)
@defer.inlineCallbacks @defer.inlineCallbacks
def _handle_incoming_transaction(self, origin, transaction, request_time): def _handle_incoming_transaction(self, origin, transaction, request_time, span):
""" Process an incoming transaction and return the HTTP response """ Process an incoming transaction and return the HTTP response
Args: Args:
@ -224,22 +226,33 @@ class FederationServer(FederationBase):
with nested_logging_context(event_id): with nested_logging_context(event_id):
thread_id, new_thread = pdu_to_thread[pdu.event_id] thread_id, new_thread = pdu_to_thread[pdu.event_id]
logger.info("Assigning thread %d to %s", thread_id, pdu.event_id) logger.info("Assigning thread %d to %s", thread_id, pdu.event_id)
try: child_span = self.tracer.start_span('handle_pdu', child_of=span)
ret = yield self._handle_received_pdu( with child_span:
origin, pdu, thread_id=thread_id, child_span.set_tag("event_id", event_id)
new_thread=new_thread try:
) ret = yield self._handle_received_pdu(
pdu_results[event_id] = ret origin, pdu, thread_id=thread_id,
except FederationError as e: new_thread=new_thread,
logger.warn("Error handling PDU %s: %s", event_id, e) span=child_span,
pdu_results[event_id] = {"error": str(e)} )
except Exception as e: if ret:
f = failure.Failure() pdu_results[event_id] = ret
pdu_results[event_id] = {"error": str(e)} except FederationError as e:
logger.error( logger.warn("Error handling PDU %s: %s", event_id, e)
"Failed to handle PDU %s: %s", pdu_results[event_id] = {"error": str(e)}
event_id, f.getTraceback().rstrip(), child_span.set_tag("error", True)
) child_span.log_kv({"error", e})
except Exception as e:
f = failure.Failure()
pdu_results[event_id] = {"error": str(e)}
logger.error(
"Failed to handle PDU %s: %s",
event_id, f.getTraceback().rstrip(),
)
child_span.set_tag("error", True)
child_span.log_kv({"error", e})
child_span.log_kv({"pdu_result": pdu_results.get(event_id)})
yield concurrently_execute( yield concurrently_execute(
process_pdus_for_room, pdus_by_room.keys(), process_pdus_for_room, pdus_by_room.keys(),
@ -594,7 +607,7 @@ class FederationServer(FederationBase):
) )
@defer.inlineCallbacks @defer.inlineCallbacks
def _handle_received_pdu(self, origin, pdu, thread_id, new_thread): def _handle_received_pdu(self, origin, pdu, thread_id, new_thread, span):
""" Process a PDU received in a federation /send/ transaction. """ Process a PDU received in a federation /send/ transaction.
If the event is invalid, then this method throws a FederationError. If the event is invalid, then this method throws a FederationError.
@ -656,6 +669,7 @@ class FederationServer(FederationBase):
yield self.handler.on_receive_pdu( yield self.handler.on_receive_pdu(
origin, pdu, sent_to_us_directly=True, origin, pdu, sent_to_us_directly=True,
thread_id=thread_id, new_thread=new_thread, thread_id=thread_id, new_thread=new_thread,
span=span,
) )
defer.returnValue({"did_not_relay": list(dont_relay)}) defer.returnValue({"did_not_relay": list(dont_relay)})

View file

@ -250,7 +250,7 @@ class TransactionQueue(object):
self._is_processing = False self._is_processing = False
@defer.inlineCallbacks @defer.inlineCallbacks
def received_new_event(self, origin, event): def received_new_event(self, origin, event, span):
should_relay = yield self._should_relay(event, True) should_relay = yield self._should_relay(event, True)
logger.info("Should relay event %s: %s", event.event_id, should_relay) logger.info("Should relay event %s: %s", event.event_id, should_relay)
if not should_relay: if not should_relay:
@ -261,7 +261,7 @@ class TransactionQueue(object):
logger.debug("Sending %s to %r", event, destinations) logger.debug("Sending %s to %r", event, destinations)
yield self._send_pdu(event, destinations) yield self._send_pdu(event, destinations, span)
@defer.inlineCallbacks @defer.inlineCallbacks
def _send_pdu(self, pdu, destinations, span=None): def _send_pdu(self, pdu, destinations, span=None):
@ -539,6 +539,12 @@ class TransactionQueue(object):
pending_pdus = [] pending_pdus = []
while True: while True:
txn_id = str(self._next_txn_id)
self._next_txn_id += 1
for s in pdu_spans.values():
s.set_tag("txn-id", txn_id)
device_message_edus, device_stream_id, dev_list_id = ( device_message_edus, device_stream_id, dev_list_id = (
yield self._get_new_device_messages(destination) yield self._get_new_device_messages(destination)
) )
@ -602,9 +608,10 @@ class TransactionQueue(object):
return return
pdu_span_references = [] pdu_span_references = []
for pdu, _, span in pending_pdus: for pdu, _, p_span in pending_pdus:
pdu_spans[pdu.event_id] = span pdu_spans[pdu.event_id] = p_span
pdu_span_references.append(opentracing.follows_from(span.context)) p_span.set_tag("txn-id", txn_id)
pdu_span_references.append(opentracing.follows_from(p_span.context))
# END CRITICAL SECTION # END CRITICAL SECTION
span = self.tracer.start_span( span = self.tracer.start_span(
@ -612,11 +619,39 @@ class TransactionQueue(object):
) )
with span: with span:
span.set_tag("destination", destination) span.set_tag("destination", destination)
span.set_tag("txn-id", txn_id)
try:
success = yield self._send_new_transaction(
destination, pending_pdus, pending_edus, span,
pdu_spans, txn_id,
)
except Exception as e:
success = False
span.set_tag("error", True)
span.log_kv({"error": e})
for s in pdu_spans.values():
s.set_tag("error", True)
s.log_kv({"transaction_error": e})
raise
finally:
if not success:
for p, _, _ in pending_pdus:
yield self._pdu_send_txn_failed(
destination, txn_id, p,
span=pdu_spans[p.event_id],
)
# We want to be *very* sure we del5ete this after we stop
# processing
self.pending_transactions.pop(destination, None)
for s in pdu_spans.values():
s.finish()
success = yield self._send_new_transaction(
destination, pending_pdus, pending_edus, span, pdu_spans,
)
span.set_tag("success", success) span.set_tag("success", success)
if success: if success:
sent_transactions_counter.inc() sent_transactions_counter.inc()
# Remove the acknowledged device messages from the database # Remove the acknowledged device messages from the database
@ -625,7 +660,9 @@ class TransactionQueue(object):
yield self.store.delete_device_msgs_for_remote( yield self.store.delete_device_msgs_for_remote(
destination, device_stream_id destination, device_stream_id
) )
logger.info("Marking as sent %r %r", destination, dev_list_id) logger.info(
"Marking as sent %r %r", destination, dev_list_id,
)
yield self.store.mark_as_sent_devices_by_remote( yield self.store.mark_as_sent_devices_by_remote(
destination, dev_list_id destination, dev_list_id
) )
@ -654,11 +691,6 @@ class TransactionQueue(object):
for p, _, _ in pending_pdus: for p, _, _ in pending_pdus:
logger.info("Failed to send event %s to %s", p.event_id, logger.info("Failed to send event %s to %s", p.event_id,
destination) destination)
finally:
# We want to be *very* sure we delete this after we stop processing
self.pending_transactions.pop(destination, None)
for span in pdu_spans.values():
span.finish()
@defer.inlineCallbacks @defer.inlineCallbacks
def _get_new_device_messages(self, destination): def _get_new_device_messages(self, destination):
@ -695,7 +727,7 @@ class TransactionQueue(object):
@measure_func("_send_new_transaction") @measure_func("_send_new_transaction")
@defer.inlineCallbacks @defer.inlineCallbacks
def _send_new_transaction(self, destination, pending_pdus, pending_edus, def _send_new_transaction(self, destination, pending_pdus, pending_edus,
span, pdu_spans): span, pdu_spans, txn_id):
# Sort based on the order field # Sort based on the order field
pending_pdus.sort(key=lambda t: t[1]) pending_pdus.sort(key=lambda t: t[1])
@ -707,9 +739,6 @@ class TransactionQueue(object):
logger.debug("TX [%s] _attempt_new_transaction", destination) logger.debug("TX [%s] _attempt_new_transaction", destination)
logger.debug("TX [%s] _attempt_new_transaction", destination) logger.debug("TX [%s] _attempt_new_transaction", destination)
txn_id = str(self._next_txn_id)
span.set_tag("txn-id", txn_id)
span.log_kv({ span.log_kv({
"pdus": len(pdus), "pdus": len(pdus),
"edus": len(edus), "edus": len(edus),
@ -734,8 +763,6 @@ class TransactionQueue(object):
edus=edus, edus=edus,
) )
self._next_txn_id += 1
yield self.transaction_actions.prepare_to_send(transaction) yield self.transaction_actions.prepare_to_send(transaction)
logger.debug("TX [%s] Persisted transaction", destination) logger.debug("TX [%s] Persisted transaction", destination)
@ -806,11 +833,6 @@ class TransactionQueue(object):
span=pdu_spans[p.event_id], span=pdu_spans[p.event_id],
) )
else: else:
for p in pdus:
yield self._pdu_send_txn_failed(
destination, txn_id, p,
span=pdu_spans[p.event_id],
)
success = False success = False
defer.returnValue(success) defer.returnValue(success)

View file

@ -15,6 +15,7 @@
# limitations under the License. # limitations under the License.
import functools import functools
import inspect
import logging import logging
import re import re
import opentracing import opentracing
@ -239,6 +240,12 @@ class BaseFederationServlet(object):
authenticator = self.authenticator authenticator = self.authenticator
ratelimiter = self.ratelimiter ratelimiter = self.ratelimiter
arg_spec = inspect.signature(func)
all_args = arg_spec.parameters
include_span = "request_span" in all_args
logger.info("include_span: %s for %s", include_span, self)
@defer.inlineCallbacks @defer.inlineCallbacks
@functools.wraps(func) @functools.wraps(func)
def new_func(request, *args, **kwargs): def new_func(request, *args, **kwargs):
@ -294,6 +301,9 @@ class BaseFederationServlet(object):
logger.warn("authenticate_request failed: %s", e) logger.warn("authenticate_request failed: %s", e)
raise raise
if include_span:
kwargs["request_span"] = span
try: try:
if origin: if origin:
span.set_tag("origin", origin) span.set_tag("origin", origin)
@ -342,7 +352,7 @@ class FederationSendServlet(BaseFederationServlet):
# This is when someone is trying to send us a bunch of data. # This is when someone is trying to send us a bunch of data.
@defer.inlineCallbacks @defer.inlineCallbacks
def on_PUT(self, origin, content, query, transaction_id): def on_PUT(self, origin, content, query, transaction_id, request_span):
""" Called on PUT /send/<transaction_id>/ """ Called on PUT /send/<transaction_id>/
Args: Args:
@ -388,7 +398,7 @@ class FederationSendServlet(BaseFederationServlet):
try: try:
code, response = yield self.handler.on_incoming_transaction( code, response = yield self.handler.on_incoming_transaction(
origin, transaction_data, origin, transaction_data, request_span,
) )
except Exception: except Exception:
logger.exception("on_incoming_transaction failed") logger.exception("on_incoming_transaction failed")

View file

@ -146,7 +146,7 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def on_receive_pdu( def on_receive_pdu(
self, origin, pdu, sent_to_us_directly=False, thread_id=None, self, origin, pdu, sent_to_us_directly=False, thread_id=None,
new_thread=False, new_thread=False, span=None,
): ):
""" Process a PDU received via a federation /send/ transaction, or """ Process a PDU received via a federation /send/ transaction, or
via backfill of missing prev_events via backfill of missing prev_events
@ -450,7 +450,7 @@ class FederationHandler(BaseHandler):
) )
if sent_to_us_directly: if sent_to_us_directly:
yield self.federation_sender.received_new_event(origin, event_copy) yield self.federation_sender.received_new_event(origin, event_copy, span)
if new_thread: if new_thread:
builder = self.event_builder_factory.new({ builder = self.event_builder_factory.new({