Opentracing across streams

Jorik Schellekens 2019-07-17 13:53:22 +01:00
parent bfc50050fd
commit 957cd77e95
5 changed files with 184 additions and 96 deletions
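The change propagates OpenTracing span contexts across federation streams: the sender serialises the active span's context into a JSON "context" field on EDUs and device-list pokes, and the receiver extracts it to continue the trace. Below is a minimal sketch of that round trip using the plain opentracing-python API rather than the synapse.logging.opentracing helpers used in the diff; the function names and payload shape here are illustrative only.

    import json

    import opentracing
    from opentracing.propagation import Format

    def attach_context(span, payload):
        # Serialise the given span's context into the payload so the
        # receiving side can pick the trace up again.
        carrier = {}
        opentracing.tracer.inject(span.context, Format.TEXT_MAP, carrier)
        payload["context"] = json.dumps({"opentracing": carrier})

    def context_from_payload(payload):
        # Recover the sender's span context; an empty carrier simply yields
        # a fresh context under the default no-op tracer.
        carrier = json.loads(payload.get("context", "{}")).get("opentracing", {})
        return opentracing.tracer.extract(Format.TEXT_MAP, carrier)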

View file

@@ -53,6 +53,8 @@ from synapse.util import glob_to_regex
 from synapse.util.async_helpers import Linearizer, concurrently_execute
 from synapse.util.caches.response_cache import ResponseCache

+import synapse.logging.opentracing as opentracing
+
 # when processing incoming transactions, we try to handle multiple rooms in
 # parallel, up to this limit.
 TRANSACTION_CONCURRENCY_LIMIT = 10
@@ -808,12 +810,13 @@ class FederationHandlerRegistry(object):
         if not handler:
             logger.warn("No handler registered for EDU type %s", edu_type)

-        try:
-            yield handler(origin, content)
-        except SynapseError as e:
-            logger.info("Failed to handle edu %r: %r", edu_type, e)
-        except Exception:
-            logger.exception("Failed to handle edu %r", edu_type)
+        with opentracing.start_active_span_from_edu(content, "handle_edu"):
+            try:
+                yield handler(origin, content)
+            except SynapseError as e:
+                logger.info("Failed to handle edu %r: %r", edu_type, e)
+            except Exception:
+                logger.exception("Failed to handle edu %r", edu_type)

     def on_query(self, query_type, args):
         handler = self.query_handlers.get(query_type)
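start_active_span_from_edu is the receiving half of the propagation: it presumably pulls the serialised span context out of the EDU's content and starts a span that continues the sender's trace. A hypothetical sketch of such a helper (not Synapse's actual implementation) against the plain opentracing API:

    import json

    import opentracing
    from opentracing.propagation import Format

    def start_active_span_from_edu(edu_content, operation_name):
        # The "context" field holds the JSON blob written by the sending
        # side; fall back to an empty carrier when it is absent.
        carrier = json.loads(edu_content.get("context", "{}")).get("opentracing", {})
        span_context = opentracing.tracer.extract(Format.TEXT_MAP, carrier)
        # child_of=None simply starts a fresh root span.
        return opentracing.tracer.start_active_span(operation_name, child_of=span_context)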

View file

@@ -16,10 +16,12 @@
 import datetime
 import logging

+from canonicaljson import json
 from prometheus_client import Counter

 from twisted.internet import defer

+import synapse.logging.opentracing as opentracing
 from synapse.api.errors import (
     FederationDeniedError,
     HttpResponseException,
@@ -204,97 +206,142 @@ class PerDestinationQueue(object):
                 pending_edus = device_update_edus + to_device_edus

-                # BEGIN CRITICAL SECTION
-                #
-                # In order to avoid a race condition, we need to make sure that
-                # the following code (from popping the queues up to the point
-                # where we decide if we actually have any pending messages) is
-                # atomic - otherwise new PDUs or EDUs might arrive in the
-                # meantime, but not get sent because we hold the
-                # transmission_loop_running flag.
-
-                pending_pdus = self._pending_pdus
-
-                # We can only include at most 50 PDUs per transactions
-                pending_pdus, self._pending_pdus = pending_pdus[:50], pending_pdus[50:]
-
-                pending_edus.extend(self._get_rr_edus(force_flush=False))
-                pending_presence = self._pending_presence
-                self._pending_presence = {}
-                if pending_presence:
-                    pending_edus.append(
-                        Edu(
-                            origin=self._server_name,
-                            destination=self._destination,
-                            edu_type="m.presence",
-                            content={
-                                "push": [
-                                    format_user_presence_state(
-                                        presence, self._clock.time_msec()
-                                    )
-                                    for presence in pending_presence.values()
-                                ]
-                            },
-                        )
-                    )
-
-                pending_edus.extend(
-                    self._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus))
-                )
-                while (
-                    len(pending_edus) < MAX_EDUS_PER_TRANSACTION
-                    and self._pending_edus_keyed
-                ):
-                    _, val = self._pending_edus_keyed.popitem()
-                    pending_edus.append(val)
-
-                if pending_pdus:
-                    logger.debug(
-                        "TX [%s] len(pending_pdus_by_dest[dest]) = %d",
-                        self._destination,
-                        len(pending_pdus),
-                    )
-
-                if not pending_pdus and not pending_edus:
-                    logger.debug("TX [%s] Nothing to send", self._destination)
-                    self._last_device_stream_id = device_stream_id
-                    return
-
-                # if we've decided to send a transaction anyway, and we have room, we
-                # may as well send any pending RRs
-                if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
-                    pending_edus.extend(self._get_rr_edus(force_flush=True))
-
-                # END CRITICAL SECTION
-
-                success = yield self._transaction_manager.send_new_transaction(
-                    self._destination, pending_pdus, pending_edus
-                )
-                if success:
-                    sent_transactions_counter.inc()
-                    sent_edus_counter.inc(len(pending_edus))
-                    for edu in pending_edus:
-                        sent_edus_by_type.labels(edu.edu_type).inc()
-                    # Remove the acknowledged device messages from the database
-                    # Only bother if we actually sent some device messages
-                    if to_device_edus:
-                        yield self._store.delete_device_msgs_for_remote(
-                            self._destination, device_stream_id
-                        )
-
-                    # also mark the device updates as sent
-                    if device_update_edus:
-                        logger.info(
-                            "Marking as sent %r %r", self._destination, dev_list_id
-                        )
-                        yield self._store.mark_as_sent_devices_by_remote(
-                            self._destination, dev_list_id
-                        )
-
-                    self._last_device_stream_id = device_stream_id
-                    self._last_device_list_stream_id = dev_list_id
-                else:
-                    break
+                # Make a transaction sending span, this span follows on from all the
+                # edus in that transaction. This needs to be done because if the edus
+                # are never received on the remote the span effectively has no causality.
+
+                span_contexts = [
+                    opentracing.extract_text_map(
+                        json.loads(
+                            edu.get_dict().get("content", {}).get("context", "{}")
+                        ).get("opentracing", {})
+                    )
+                    for edu in pending_edus
+                ]
+
+                with opentracing.start_active_span_follows_from(
+                    "send_transaction", span_contexts
+                ):
+                    # Link each sent edu to this transaction's span
+                    _pending_edus = []
+                    for edu in pending_edus:
+                        edu_dict = edu.get_dict()
+                        span_context = json.loads(
+                            edu_dict.get("content", {}).get("context", "{}")
+                        ).get("opentracing", {})
+                        # If there is no span context then we are either blacklisting
+                        # this destination or we are not tracing
+                        if not span_context == {}:
+                            if not "references" in span_context:
+                                span_context["references"] = [
+                                    opentracing.active_span_context_as_string()
+                                ]
+                            else:
+                                span_context["references"].append(
+                                    opentracing.active_span_context_as_string()
+                                )
+                            edu_dict["content"]["context"] = json.dumps(
+                                {"opentracing": span_context}
+                            )
+                        _pending_edus.append(Edu(**edu_dict))
+                    pending_edus = _pending_edus
+
+                    # BEGIN CRITICAL SECTION
+                    #
+                    # In order to avoid a race condition, we need to make sure that
+                    # the following code (from popping the queues up to the point
+                    # where we decide if we actually have any pending messages) is
+                    # atomic - otherwise new PDUs or EDUs might arrive in the
+                    # meantime, but not get sent because we hold the
+                    # transmission_loop_running flag.
+
+                    pending_pdus = self._pending_pdus
+
+                    # We can only include at most 50 PDUs per transactions
+                    pending_pdus, self._pending_pdus = (
+                        pending_pdus[:50],
+                        pending_pdus[50:],
+                    )
+
+                    pending_edus.extend(self._get_rr_edus(force_flush=False))
+                    pending_presence = self._pending_presence
+                    self._pending_presence = {}
+                    if pending_presence:
+                        pending_edus.append(
+                            Edu(
+                                origin=self._server_name,
+                                destination=self._destination,
+                                edu_type="m.presence",
+                                content={
+                                    "push": [
+                                        format_user_presence_state(
+                                            presence, self._clock.time_msec()
+                                        )
+                                        for presence in pending_presence.values()
+                                    ]
+                                },
+                            )
+                        )
+
+                    pending_edus.extend(
+                        self._pop_pending_edus(
+                            MAX_EDUS_PER_TRANSACTION - len(pending_edus)
+                        )
+                    )
+                    while (
+                        len(pending_edus) < MAX_EDUS_PER_TRANSACTION
+                        and self._pending_edus_keyed
+                    ):
+                        _, val = self._pending_edus_keyed.popitem()
+                        pending_edus.append(val)
+
+                    if pending_pdus:
+                        logger.debug(
+                            "TX [%s] len(pending_pdus_by_dest[dest]) = %d",
+                            self._destination,
+                            len(pending_pdus),
+                        )
+
+                    if not pending_pdus and not pending_edus:
+                        logger.debug("TX [%s] Nothing to send", self._destination)
+                        self._last_device_stream_id = device_stream_id
+                        return
+
+                    # if we've decided to send a transaction anyway, and we have room, we
+                    # may as well send any pending RRs
+                    if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
+                        pending_edus.extend(self._get_rr_edus(force_flush=True))
+
+                    # END CRITICAL SECTION
+
+                    success = yield self._transaction_manager.send_new_transaction(
+                        self._destination, pending_pdus, pending_edus
+                    )
+                    if success:
+                        sent_transactions_counter.inc()
+                        sent_edus_counter.inc(len(pending_edus))
+                        for edu in pending_edus:
+                            sent_edus_by_type.labels(edu.edu_type).inc()
+                        # Remove the acknowledged device messages from the database
+                        # Only bother if we actually sent some device messages
+                        if to_device_edus:
+                            yield self._store.delete_device_msgs_for_remote(
+                                self._destination, device_stream_id
+                            )

+                        # also mark the device updates as sent
+                        if device_update_edus:
+                            logger.info(
+                                "Marking as sent %r %r", self._destination, dev_list_id
+                            )
+                            yield self._store.mark_as_sent_devices_by_remote(
+                                self._destination, dev_list_id
+                            )
+
+                        self._last_device_stream_id = device_stream_id
+                        self._last_device_list_stream_id = dev_list_id
+                    else:
+                        break

         except NotRetryingDestination as e:
             logger.debug(
                 "TX [%s] not ready for retry yet (next retry at %s) - "

View file

@@ -15,12 +15,14 @@
 import logging

+from canonicaljson import json
 from twisted.internet import defer

+import synapse.logging.opentracing as opentracing
 from synapse.api.errors import SynapseError
 from synapse.types import UserID, get_domain_from_id
 from synapse.util.stringutils import random_string

 logger = logging.getLogger(__name__)
@@ -102,14 +104,22 @@ class DeviceMessageHandler(object):
         message_id = random_string(16)

+        context = {"opentracing": {}}
+        opentracing.inject_active_span_text_map(context["opentracing"])
+
         remote_edu_contents = {}
         for destination, messages in remote_messages.items():
-            remote_edu_contents[destination] = {
-                "messages": messages,
-                "sender": sender_user_id,
-                "type": message_type,
-                "message_id": message_id,
-            }
+            with opentracing.start_active_span("to_device_for_user"):
+                opentracing.set_tag("destination", destination)
+                remote_edu_contents[destination] = {
+                    "messages": messages,
+                    "sender": sender_user_id,
+                    "type": message_type,
+                    "message_id": message_id,
+                    "context": json.dumps(context)
+                    if opentracing.whitelisted_homeserver(destination)
+                    else "",
+                }

+        opentracing.log_kv(local_messages)
         stream_id = yield self.store.add_messages_to_device_inbox(
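Note that the context blob is only attached when opentracing.whitelisted_homeserver(destination) passes, since a span context reveals that, and when, internal activity occurred. A plausible sketch of such a gate, assuming a configured list of destination globs (the real option name and matching rules may differ):

    import fnmatch

    # Illustrative config: only ship tracing contexts to trusted servers.
    HOMESERVER_WHITELIST = ["*.example.com", "example.com"]

    def whitelisted_homeserver(destination):
        return any(
            fnmatch.fnmatch(destination, pattern)
            for pattern in HOMESERVER_WHITELIST
        )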

View file

@@ -74,6 +74,7 @@ class DeviceWorkerStore(SQLBaseStore):
         return {d["device_id"]: d for d in devices}

+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def get_devices_by_remote(self, destination, from_stream_id, limit):
         """Get stream of updates to send to remote servers
@@ -128,8 +129,10 @@ class DeviceWorkerStore(SQLBaseStore):
         # (user_id, device_id) entries into a map, with the value being
         # the max stream_id across each set of duplicate entries
         #
-        # maps (user_id, device_id) -> stream_id
+        # maps (user_id, device_id) -> (stream_id, context)
         # as long as their stream_id does not match that of the last row
+        # where context is any metadata about the message's context such as
+        # opentracing data
         query_map = {}
         for update in updates:
             if stream_id_cutoff is not None and update[2] >= stream_id_cutoff:
@@ -137,7 +140,7 @@ class DeviceWorkerStore(SQLBaseStore):
                 break

             key = (update[0], update[1])
-            query_map[key] = max(query_map.get(key, 0), update[2])
+            query_map[key] = (max(query_map.get(key, 0), update[2]), update[3])

         # If we didn't find any updates with a stream_id lower than the cutoff, it
         # means that there are more than limit updates all of which have the same
@@ -172,7 +175,7 @@ class DeviceWorkerStore(SQLBaseStore):
             List: List of device updates
         """
         sql = """
-            SELECT user_id, device_id, stream_id FROM device_lists_outbound_pokes
+            SELECT user_id, device_id, stream_id, context FROM device_lists_outbound_pokes
             WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ?
             ORDER BY stream_id
             LIMIT ?
@@ -211,12 +214,15 @@ class DeviceWorkerStore(SQLBaseStore):
                 destination, user_id, from_stream_id
             )
             for device_id, device in iteritems(user_devices):
-                stream_id = query_map[(user_id, device_id)]
+                stream_id = query_map[(user_id, device_id)][0]
                 result = {
                     "user_id": user_id,
                     "device_id": device_id,
                     "prev_id": [prev_id] if prev_id else [],
                     "stream_id": stream_id,
+                    "context": query_map[(user_id, device_id)][1]
+                    if opentracing.whitelisted_homeserver(destination)
+                    else "",
                 }

                 prev_id = stream_id
@@ -819,6 +825,9 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
             ],
         )

+        context = {"opentracing": {}}
+        opentracing.inject_active_span_text_map(context["opentracing"])
+
         self._simple_insert_many_txn(
             txn,
             table="device_lists_outbound_pokes",
@@ -830,6 +839,9 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
                     "device_id": device_id,
                     "sent": False,
                     "ts": now,
+                    "context": json.dumps(context)
+                    if opentracing.whitelisted_homeserver(destination)
+                    else "",
                 }
                 for destination in hosts
                 for device_id in device_ids

View file

@@ -0,0 +1,16 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE device_lists_outbound_pokes ADD context TEXT;
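Taken together, the storage changes write an OpenTracing carrier into the new column when a device-list poke is queued and read it back when building outgoing updates. A self-contained sketch of that round trip against sqlite3; the table and column names follow the migration above, and the carrier value is merely an example of what a Jaeger text-map carrier can look like:

    import json
    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute(
        """CREATE TABLE device_lists_outbound_pokes (
            destination TEXT, user_id TEXT, device_id TEXT,
            stream_id BIGINT, sent BOOLEAN, ts BIGINT, context TEXT
        )"""
    )

    # Queue a poke with the serialised span context attached.
    carrier = {"opentracing": {"uber-trace-id": "abc123:def456:0:1"}}  # example value
    conn.execute(
        "INSERT INTO device_lists_outbound_pokes VALUES (?, ?, ?, ?, ?, ?, ?)",
        ("remote.example.com", "@alice:hs", "DEVICE1", 5, False, 0, json.dumps(carrier)),
    )

    # Later, read it back alongside the stream_id, as the updated SELECT does.
    for user_id, device_id, stream_id, context in conn.execute(
        "SELECT user_id, device_id, stream_id, context FROM device_lists_outbound_pokes"
    ):
        print(user_id, device_id, stream_id, json.loads(context)["opentracing"])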