diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index b4b9a05ca6..1095483a66 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -53,6 +53,8 @@ from synapse.util import glob_to_regex from synapse.util.async_helpers import Linearizer, concurrently_execute from synapse.util.caches.response_cache import ResponseCache +import synapse.logging.opentracing as opentracing + # when processing incoming transactions, we try to handle multiple rooms in # parallel, up to this limit. TRANSACTION_CONCURRENCY_LIMIT = 10 @@ -808,12 +810,13 @@ class FederationHandlerRegistry(object): if not handler: logger.warn("No handler registered for EDU type %s", edu_type) - try: - yield handler(origin, content) - except SynapseError as e: - logger.info("Failed to handle edu %r: %r", edu_type, e) - except Exception: - logger.exception("Failed to handle edu %r", edu_type) + with opentracing.start_active_span_from_edu(content, "handle_edu"): + try: + yield handler(origin, content) + except SynapseError as e: + logger.info("Failed to handle edu %r: %r", edu_type, e) + except Exception: + logger.exception("Failed to handle edu %r", edu_type) def on_query(self, query_type, args): handler = self.query_handlers.get(query_type) diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index fad980b893..734a908ca2 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -16,10 +16,12 @@ import datetime import logging +from canonicaljson import json from prometheus_client import Counter from twisted.internet import defer +import synapse.logging.opentracing as opentracing from synapse.api.errors import ( FederationDeniedError, HttpResponseException, @@ -204,97 +206,142 @@ class PerDestinationQueue(object): pending_edus = device_update_edus + to_device_edus - # BEGIN CRITICAL SECTION - # 
- # In order to avoid a race condition, we need to make sure that - # the following code (from popping the queues up to the point - # where we decide if we actually have any pending messages) is - # atomic - otherwise new PDUs or EDUs might arrive in the - # meantime, but not get sent because we hold the - # transmission_loop_running flag. + # Make a transaction sending span, this span follows on from all the + # edus in that transaction. This needs to be done because if the edus + # are never received on the remote the span effectively has no causality. - pending_pdus = self._pending_pdus - - # We can only include at most 50 PDUs per transactions - pending_pdus, self._pending_pdus = pending_pdus[:50], pending_pdus[50:] - - pending_edus.extend(self._get_rr_edus(force_flush=False)) - pending_presence = self._pending_presence - self._pending_presence = {} - if pending_presence: - pending_edus.append( - Edu( - origin=self._server_name, - destination=self._destination, - edu_type="m.presence", - content={ - "push": [ - format_user_presence_state( - presence, self._clock.time_msec() - ) - for presence in pending_presence.values() - ] - }, - ) + span_contexts = [ + opentracing.extract_text_map( + json.loads( + edu.get_dict().get("content", {}).get("context", "{}") + ).get("opentracing", {}) ) + for edu in pending_edus + ] - pending_edus.extend( - self._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus)) - ) - while ( - len(pending_edus) < MAX_EDUS_PER_TRANSACTION - and self._pending_edus_keyed + with opentracing.start_active_span_follows_from( + "send_transaction", span_contexts ): - _, val = self._pending_edus_keyed.popitem() - pending_edus.append(val) + # Link each sent edu to this transaction's span + _pending_edus = [] + for edu in pending_edus: + edu_dict = edu.get_dict() + span_context = json.loads( + edu_dict.get("content", {}).get("context", "{}") + ).get("opentracing", {}) + # If there is no span context then we are either blacklisting + # this 
destination or we are not tracing + if not span_context == {}: + if not "references" in span_context: + span_context["references"] = [ + opentracing.active_span_context_as_string() + ] + else: + span_context["references"].append( + opentracing.active_span_context_as_string() + ) + edu_dict["content"]["context"] = json.dumps( + {"opentracing": span_context} + ) + _pending_edus.append(Edu(**edu_dict)) + pending_edus = _pending_edus - if pending_pdus: - logger.debug( - "TX [%s] len(pending_pdus_by_dest[dest]) = %d", - self._destination, - len(pending_pdus), + # BEGIN CRITICAL SECTION + # + # In order to avoid a race condition, we need to make sure that + # the following code (from popping the queues up to the point + # where we decide if we actually have any pending messages) is + # atomic - otherwise new PDUs or EDUs might arrive in the + # meantime, but not get sent because we hold the + # transmission_loop_running flag. + + pending_pdus = self._pending_pdus + + # We can only include at most 50 PDUs per transactions + pending_pdus, self._pending_pdus = ( + pending_pdus[:50], + pending_pdus[50:], ) - if not pending_pdus and not pending_edus: - logger.debug("TX [%s] Nothing to send", self._destination) - self._last_device_stream_id = device_stream_id - return - - # if we've decided to send a transaction anyway, and we have room, we - # may as well send any pending RRs - if len(pending_edus) < MAX_EDUS_PER_TRANSACTION: - pending_edus.extend(self._get_rr_edus(force_flush=True)) - - # END CRITICAL SECTION - - success = yield self._transaction_manager.send_new_transaction( - self._destination, pending_pdus, pending_edus - ) - if success: - sent_transactions_counter.inc() - sent_edus_counter.inc(len(pending_edus)) - for edu in pending_edus: - sent_edus_by_type.labels(edu.edu_type).inc() - # Remove the acknowledged device messages from the database - # Only bother if we actually sent some device messages - if to_device_edus: - yield 
self._store.delete_device_msgs_for_remote( - self._destination, device_stream_id + pending_edus.extend(self._get_rr_edus(force_flush=False)) + pending_presence = self._pending_presence + self._pending_presence = {} + if pending_presence: + pending_edus.append( + Edu( + origin=self._server_name, + destination=self._destination, + edu_type="m.presence", + content={ + "push": [ + format_user_presence_state( + presence, self._clock.time_msec() + ) + for presence in pending_presence.values() + ] + }, + ) ) - # also mark the device updates as sent - if device_update_edus: - logger.info( - "Marking as sent %r %r", self._destination, dev_list_id + pending_edus.extend( + self._pop_pending_edus( + MAX_EDUS_PER_TRANSACTION - len(pending_edus) ) - yield self._store.mark_as_sent_devices_by_remote( - self._destination, dev_list_id + ) + while ( + len(pending_edus) < MAX_EDUS_PER_TRANSACTION + and self._pending_edus_keyed + ): + _, val = self._pending_edus_keyed.popitem() + pending_edus.append(val) + + if pending_pdus: + logger.debug( + "TX [%s] len(pending_pdus_by_dest[dest]) = %d", + self._destination, + len(pending_pdus), ) - self._last_device_stream_id = device_stream_id - self._last_device_list_stream_id = dev_list_id - else: - break + if not pending_pdus and not pending_edus: + logger.debug("TX [%s] Nothing to send", self._destination) + self._last_device_stream_id = device_stream_id + return + + # if we've decided to send a transaction anyway, and we have room, we + # may as well send any pending RRs + if len(pending_edus) < MAX_EDUS_PER_TRANSACTION: + pending_edus.extend(self._get_rr_edus(force_flush=True)) + + # END CRITICAL SECTION + + success = yield self._transaction_manager.send_new_transaction( + self._destination, pending_pdus, pending_edus + ) + if success: + sent_transactions_counter.inc() + sent_edus_counter.inc(len(pending_edus)) + for edu in pending_edus: + sent_edus_by_type.labels(edu.edu_type).inc() + # Remove the acknowledged device messages from the 
database + # Only bother if we actually sent some device messages + if to_device_edus: + yield self._store.delete_device_msgs_for_remote( + self._destination, device_stream_id + ) + + # also mark the device updates as sent + if device_update_edus: + logger.info( + "Marking as sent %r %r", self._destination, dev_list_id + ) + yield self._store.mark_as_sent_devices_by_remote( + self._destination, dev_list_id + ) + + self._last_device_stream_id = device_stream_id + self._last_device_list_stream_id = dev_list_id + else: + break except NotRetryingDestination as e: logger.debug( "TX [%s] not ready for retry yet (next retry at %s) - " diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index 6463d900cd..bed564e87b 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -15,12 +15,14 @@ import logging +from canonicaljson import json + from twisted.internet import defer +import synapse.logging.opentracing as opentracing from synapse.api.errors import SynapseError from synapse.types import UserID, get_domain_from_id from synapse.util.stringutils import random_string -import synapse.logging.opentracing as opentracing logger = logging.getLogger(__name__) @@ -102,14 +104,22 @@ class DeviceMessageHandler(object): message_id = random_string(16) + context = {"opentracing": {}} + opentracing.inject_active_span_text_map(context["opentracing"]) + remote_edu_contents = {} for destination, messages in remote_messages.items(): - remote_edu_contents[destination] = { - "messages": messages, - "sender": sender_user_id, - "type": message_type, - "message_id": message_id, - } + with opentracing.start_active_span("to_device_for_user"): + opentracing.set_tag("destination", destination) + remote_edu_contents[destination] = { + "messages": messages, + "sender": sender_user_id, + "type": message_type, + "message_id": message_id, + "context": json.dumps(context) + if opentracing.whitelisted_homeserver(destination) + else "", + } 
opentracing.log_kv(local_messages) stream_id = yield self.store.add_messages_to_device_inbox( diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index e88e3f69ba..f89cbad2ef 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -74,6 +74,7 @@ class DeviceWorkerStore(SQLBaseStore): return {d["device_id"]: d for d in devices} + @opentracing.trace_defered_function @defer.inlineCallbacks def get_devices_by_remote(self, destination, from_stream_id, limit): """Get stream of updates to send to remote servers @@ -128,8 +129,10 @@ class DeviceWorkerStore(SQLBaseStore): # (user_id, device_id) entries into a map, with the value being # the max stream_id across each set of duplicate entries # - # maps (user_id, device_id) -> stream_id + # maps (user_id, device_id) -> (stream_id, context) # as long as their stream_id does not match that of the last row + # where context is any metadata about the message's context such as + # opentracing data query_map = {} for update in updates: if stream_id_cutoff is not None and update[2] >= stream_id_cutoff: @@ -137,7 +140,7 @@ class DeviceWorkerStore(SQLBaseStore): break key = (update[0], update[1]) - query_map[key] = max(query_map.get(key, 0), update[2]) + query_map[key] = (max(query_map.get(key, 0), update[2]), update[3]) # If we didn't find any updates with a stream_id lower than the cutoff, it # means that there are more than limit updates all of which have the same @@ -172,7 +175,7 @@ class DeviceWorkerStore(SQLBaseStore): List: List of device updates """ sql = """ - SELECT user_id, device_id, stream_id FROM device_lists_outbound_pokes + SELECT user_id, device_id, stream_id, context FROM device_lists_outbound_pokes WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ? ORDER BY stream_id LIMIT ? 
@@ -211,12 +214,15 @@ class DeviceWorkerStore(SQLBaseStore): destination, user_id, from_stream_id ) for device_id, device in iteritems(user_devices): - stream_id = query_map[(user_id, device_id)] + stream_id = query_map[(user_id, device_id)][0] result = { "user_id": user_id, "device_id": device_id, "prev_id": [prev_id] if prev_id else [], "stream_id": stream_id, + "context": query_map[(user_id, device_id)][1] + if opentracing.whitelisted_homeserver(destination) + else "", } prev_id = stream_id @@ -819,6 +825,9 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore): ], ) + context = {"opentracing": {}} + opentracing.inject_active_span_text_map(context["opentracing"]) + self._simple_insert_many_txn( txn, table="device_lists_outbound_pokes", @@ -830,6 +839,9 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore): "device_id": device_id, "sent": False, "ts": now, + "context": json.dumps(context) + if opentracing.whitelisted_homeserver(destination) + else "", } for destination in hosts for device_id in device_ids diff --git a/synapse/storage/schema/delta/55/add_spans_to_device_lists.sql b/synapse/storage/schema/delta/55/add_spans_to_device_lists.sql new file mode 100644 index 0000000000..a4c6b917f7 --- /dev/null +++ b/synapse/storage/schema/delta/55/add_spans_to_device_lists.sql @@ -0,0 +1,16 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +ALTER TABLE device_lists_outbound_pokes ADD context TEXT; \ No newline at end of file