The great logging/ migration

Jorik Schellekens 2019-07-04 17:11:46 +01:00
parent 7ae7e796ff
commit bfc50050fd
11 changed files with 122 additions and 122 deletions
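
Every touched module follows the same pattern: the old import of synapse.util.tracerutils (aliased as tracerutils) becomes an import of synapse.logging.opentracing (aliased as opentracing), and the tracerutils. prefix on the tracing helpers (trace_defered_function, trace_function, the *_using_operation_name variants, set_tag, log_kv) becomes opentracing. As far as this diff shows, only the module path moves; the helper names and call signatures are unchanged. A condensed before/after sketch of one call site, abridged from the first file below (not runnable on its own, since it omits the surrounding handler class):

    # before
    import synapse.util.tracerutils as tracerutils

        @tracerutils.trace_defered_function
        @defer.inlineCallbacks
        def get_devices_by_user(self, user_id):
            tracerutils.set_tag("user_id", user_id)
            ...

    # after
    import synapse.logging.opentracing as opentracing

        @opentracing.trace_defered_function
        @defer.inlineCallbacks
        def get_devices_by_user(self, user_id):
            opentracing.set_tag("user_id", user_id)
            ...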

View file

@@ -31,7 +31,7 @@ from synapse.util.async_helpers import Linearizer
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.metrics import measure_func
 from synapse.util.retryutils import NotRetryingDestination
-import synapse.util.tracerutils as tracerutils
+import synapse.logging.opentracing as opentracing

 from ._base import BaseHandler
@@ -46,7 +46,7 @@ class DeviceWorkerHandler(BaseHandler):
         self.state = hs.get_state_handler()
         self._auth_handler = hs.get_auth_handler()

-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def get_devices_by_user(self, user_id):
         """
@@ -58,7 +58,7 @@ class DeviceWorkerHandler(BaseHandler):
             defer.Deferred: list[dict[str, X]]: info on each device
         """
-        tracerutils.set_tag("user_id", user_id)
+        opentracing.set_tag("user_id", user_id)
         device_map = yield self.store.get_devices_by_user(user_id)
         ips = yield self.store.get_last_client_ip_by_device(user_id, device_id=None)
@@ -67,10 +67,10 @@ class DeviceWorkerHandler(BaseHandler):
         for device in devices:
             _update_device_from_client_ips(device, ips)
-        tracerutils.log_kv(device_map)
+        opentracing.log_kv(device_map)
         return devices

-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def get_device(self, user_id, device_id):
         """ Retrieve the given device
@@ -91,13 +91,13 @@ class DeviceWorkerHandler(BaseHandler):
         ips = yield self.store.get_last_client_ip_by_device(user_id, device_id)
         _update_device_from_client_ips(device, ips)
-        tracerutils.set_tag("device", device)
-        tracerutils.set_tag("ips", ips)
+        opentracing.set_tag("device", device)
+        opentracing.set_tag("ips", ips)
         return device

     @measure_func("device.get_user_ids_changed")
-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def get_user_ids_changed(self, user_id, from_token):
         """Get list of users that have had the devices updated, or have newly
@@ -108,8 +108,8 @@ class DeviceWorkerHandler(BaseHandler):
             from_token (StreamToken)
         """
-        tracerutils.set_tag("user_id", user_id)
-        tracerutils.set_tag("from_token", from_token)
+        opentracing.set_tag("user_id", user_id)
+        opentracing.set_tag("from_token", from_token)
         now_room_key = yield self.store.get_room_events_max_id()
         room_ids = yield self.store.get_rooms_for_user(user_id)
@@ -161,7 +161,7 @@ class DeviceWorkerHandler(BaseHandler):
             # special-case for an empty prev state: include all members
             # in the changed list
             if not event_ids:
-                tracerutils.log_kv(
+                opentracing.log_kv(
                     {"event": "encountered empty previous state", "room_id": room_id}
                 )
                 for key, event_id in iteritems(current_state_ids):
@@ -216,7 +216,7 @@ class DeviceWorkerHandler(BaseHandler):
             possibly_joined = []
             possibly_left = []

-        tracerutils.log_kv(
+        opentracing.log_kv(
             {"changed": list(possibly_joined), "left": list(possibly_left)}
         )
@@ -287,7 +287,7 @@ class DeviceHandler(DeviceWorkerHandler):
             raise errors.StoreError(500, "Couldn't generate a device ID.")

-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def delete_device(self, user_id, device_id):
         """ Delete the given device
@@ -305,8 +305,8 @@ class DeviceHandler(DeviceWorkerHandler):
         except errors.StoreError as e:
             if e.code == 404:
                 # no match
-                tracerutils.set_tag("error", True)
-                tracerutils.set_tag("reason", "User doesn't have that device id.")
+                opentracing.set_tag("error", True)
+                opentracing.set_tag("reason", "User doesn't have that device id.")
                 pass
             else:
                 raise
@@ -319,7 +319,7 @@ class DeviceHandler(DeviceWorkerHandler):
         yield self.notify_device_update(user_id, [device_id])

-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def delete_all_devices_for_user(self, user_id, except_device_id=None):
         """Delete all of the user's devices
@@ -355,8 +355,8 @@ class DeviceHandler(DeviceWorkerHandler):
         except errors.StoreError as e:
             if e.code == 404:
                 # no match
-                tracerutils.set_tag("error", True)
-                tracerutils.set_tag("reason", "User doesn't have that device id.")
+                opentracing.set_tag("error", True)
+                opentracing.set_tag("reason", "User doesn't have that device id.")
                 pass
             else:
                 raise
@@ -477,15 +477,15 @@ class DeviceListEduUpdater(object):
             iterable=True,
         )

-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def incoming_device_list_update(self, origin, edu_content):
         """Called on incoming device list update from federation. Responsible
         for parsing the EDU and adding to pending updates list.
         """
-        tracerutils.set_tag("origin", origin)
-        tracerutils.set_tag("edu_content", edu_content)
+        opentracing.set_tag("origin", origin)
+        opentracing.set_tag("edu_content", edu_content)
         user_id = edu_content.pop("user_id")
         device_id = edu_content.pop("device_id")
         stream_id = str(edu_content.pop("stream_id"))  # They may come as ints
@@ -506,8 +506,8 @@ class DeviceListEduUpdater(object):
         if not room_ids:
             # We don't share any rooms with this user. Ignore update, as we
             # probably won't get any further updates.
-            tracerutils.set_tag("error", True)
-            tracerutils.log_kv(
+            opentracing.set_tag("error", True)
+            opentracing.log_kv(
                 {
                     "message": "Got an update from a user which "
                     + "doesn't share a room with the current user."

View file

@@ -20,7 +20,7 @@ from twisted.internet import defer
 from synapse.api.errors import SynapseError
 from synapse.types import UserID, get_domain_from_id
 from synapse.util.stringutils import random_string
-import synapse.util.tracerutils as tracerutils
+import synapse.logging.opentracing as opentracing

 logger = logging.getLogger(__name__)
@@ -77,7 +77,7 @@ class DeviceMessageHandler(object):
             "to_device_key", stream_id, users=local_messages.keys()
         )

-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def send_device_message(self, sender_user_id, message_type, messages):
@@ -111,7 +111,7 @@ class DeviceMessageHandler(object):
                 "message_id": message_id,
             }

-        tracerutils.log_kv(local_messages)
+        opentracing.log_kv(local_messages)
         stream_id = yield self.store.add_messages_to_device_inbox(
             local_messages, remote_edu_contents
         )
@@ -120,7 +120,7 @@ class DeviceMessageHandler(object):
             "to_device_key", stream_id, users=local_messages.keys()
         )

-        tracerutils.log_kv(remote_messages)
+        opentracing.log_kv(remote_messages)
         for destination in remote_messages.keys():
             # Enqueue a new federation transaction to send the new
             # device messages to each remote destination.

View file

@@ -15,7 +15,7 @@
 # limitations under the License.
 import logging
-import synapse.util.tracerutils as tracerutils
+import synapse.logging.opentracing as opentracing

 from six import iteritems
@@ -46,7 +46,7 @@ class E2eKeysHandler(object):
             "client_keys", self.on_federation_query_client_keys
         )

-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def query_devices(self, query_body, timeout):
         """ Handle a device key query from a client
@@ -82,8 +82,8 @@ class E2eKeysHandler(object):
             else:
                 remote_queries[user_id] = device_ids

-        tracerutils.set_tag("local_key_query", local_query)
-        tracerutils.set_tag("remote_key_query", remote_queries)
+        opentracing.set_tag("local_key_query", local_query)
+        opentracing.set_tag("remote_key_query", remote_queries)

         # First get local devices.
         failures = {}
@@ -125,12 +125,12 @@ class E2eKeysHandler(object):
                 r[user_id] = remote_queries[user_id]

         # Now fetch any devices that we don't have in our cache
-        @tracerutils.trace_defered_function
+        @opentracing.trace_defered_function
         @defer.inlineCallbacks
         def do_remote_query(destination):
             destination_query = remote_queries_not_in_cache[destination]
-            tracerutils.set_tag("key_query", destination_query)
+            opentracing.set_tag("key_query", destination_query)
             try:
                 remote_result = yield self.federation.query_client_keys(
                     destination, {"device_keys": destination_query}, timeout=timeout
@@ -143,8 +143,8 @@ class E2eKeysHandler(object):
             except Exception as e:
                 failure = _exception_to_failure(e)
                 failures[destination] = failure
-                tracerutils.set_tag("error", True)
-                tracerutils.set_tag("reason", failure)
+                opentracing.set_tag("error", True)
+                opentracing.set_tag("reason", failure)

         yield make_deferred_yieldable(
             defer.gatherResults(
@@ -158,7 +158,7 @@ class E2eKeysHandler(object):
         return {"device_keys": results, "failures": failures}

-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def query_local_devices(self, query):
         """Get E2E device keys for local users
@@ -171,7 +171,7 @@ class E2eKeysHandler(object):
             defer.Deferred: (resolves to dict[string, dict[string, dict]]):
                 map from user_id -> device_id -> device details
         """
-        tracerutils.set_tag("local_query", query)
+        opentracing.set_tag("local_query", query)
         local_query = []
         result_dict = {}
@@ -179,14 +179,14 @@ class E2eKeysHandler(object):
             # we use UserID.from_string to catch invalid user ids
             if not self.is_mine(UserID.from_string(user_id)):
                 logger.warning("Request for keys for non-local user %s", user_id)
-                tracerutils.log_kv(
+                opentracing.log_kv(
                     {
                         "message": "Requested a local key for a user which"
                         + " was not local to the homeserver",
                         "user_id": user_id,
                     }
                 )
-                tracerutils.set_tag("error", True)
+                opentracing.set_tag("error", True)
                 raise SynapseError(400, "Not a user here")

             if not device_ids:
@@ -211,7 +211,7 @@ class E2eKeysHandler(object):
                     r["unsigned"]["device_display_name"] = display_name
                 result_dict[user_id][device_id] = r

-        tracerutils.log_kv(results)
+        opentracing.log_kv(results)
         return result_dict

     @defer.inlineCallbacks
@@ -222,7 +222,7 @@ class E2eKeysHandler(object):
         res = yield self.query_local_devices(device_keys_query)
         return {"device_keys": res}

-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def claim_one_time_keys(self, query, timeout):
         local_query = []
@@ -237,8 +237,8 @@ class E2eKeysHandler(object):
                 domain = get_domain_from_id(user_id)
                 remote_queries.setdefault(domain, {})[user_id] = device_keys

-        tracerutils.set_tag("local_key_query", local_query)
-        tracerutils.set_tag("remote_key_query", remote_queries)
+        opentracing.set_tag("local_key_query", local_query)
+        opentracing.set_tag("remote_key_query", remote_queries)

         results = yield self.store.claim_e2e_one_time_keys(local_query)
@@ -251,10 +251,10 @@ class E2eKeysHandler(object):
                     key_id: json.loads(json_bytes)
                 }

-        @tracerutils.trace_defered_function
+        @opentracing.trace_defered_function
         @defer.inlineCallbacks
         def claim_client_keys(destination):
-            tracerutils.set_tag("destination", destination)
+            opentracing.set_tag("destination", destination)
             device_keys = remote_queries[destination]
             try:
                 remote_result = yield self.federation.claim_client_keys(
@@ -267,8 +267,8 @@ class E2eKeysHandler(object):
             except Exception as e:
                 failure = _exception_to_failure(e)
                 failures[destination] = failure
-                tracerutils.set_tag("error", True)
-                tracerutils.set_tag("reason", failure)
+                opentracing.set_tag("error", True)
+                opentracing.set_tag("reason", failure)

         yield make_deferred_yieldable(
             defer.gatherResults(
@@ -292,21 +292,21 @@ class E2eKeysHandler(object):
             ),
         )

-        tracerutils.log_kv({"one_time_keys": json_result, "failures": failures})
+        opentracing.log_kv({"one_time_keys": json_result, "failures": failures})
         return {"one_time_keys": json_result, "failures": failures}

-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def upload_keys_for_user(self, user_id, device_id, keys):
-        tracerutils.set_tag("user_id", user_id)
-        tracerutils.set_tag("device_id", device_id)
-        tracerutils.set_tag("keys", keys)
+        opentracing.set_tag("user_id", user_id)
+        opentracing.set_tag("device_id", device_id)
+        opentracing.set_tag("keys", keys)
         time_now = self.clock.time_msec()

         # TODO: Validate the JSON to make sure it has the right keys.
         device_keys = keys.get("device_keys", None)
-        tracerutils.set_tag("device_keys", device_keys)
+        opentracing.set_tag("device_keys", device_keys)
         if device_keys:
             logger.info(
                 "Updating device_keys for device %r for user %s at %d",
@@ -328,7 +328,7 @@ class E2eKeysHandler(object):
                 user_id, device_id, time_now, one_time_keys
             )
         else:
-            tracerutils.log_kv(
+            opentracing.log_kv(
                 {"event": "did not upload one_time_keys", "reason": "no keys given"}
             )
@@ -341,10 +341,10 @@ class E2eKeysHandler(object):
         result = yield self.store.count_e2e_one_time_keys(user_id, device_id)

-        tracerutils.set_tag("one_time_key_counts", result)
+        opentracing.set_tag("one_time_key_counts", result)
         return {"one_time_key_counts": result}

-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def _upload_one_time_keys_for_user(
         self, user_id, device_id, time_now, one_time_keys

View file

@@ -27,7 +27,7 @@ from synapse.api.errors import (
     SynapseError,
 )
 from synapse.util.async_helpers import Linearizer
-import synapse.util.tracerutils as tracerutils
+import synapse.logging.opentracing as opentracing

 logger = logging.getLogger(__name__)
@@ -50,7 +50,7 @@ class E2eRoomKeysHandler(object):
         # changed.
         self._upload_linearizer = Linearizer("upload_room_keys_lock")

-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def get_room_keys(self, user_id, version, room_id=None, session_id=None):
         """Bulk get the E2E room keys for a given backup, optionally filtered to a given
@@ -86,10 +86,10 @@ class E2eRoomKeysHandler(object):
                 user_id, version, room_id, session_id
             )

-            tracerutils.log_kv(results)
+            opentracing.log_kv(results)
             return results

-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
         """Bulk delete the E2E room keys for a given backup, optionally filtered to a given
@@ -111,7 +111,7 @@ class E2eRoomKeysHandler(object):
         with (yield self._upload_linearizer.queue(user_id)):
             yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)

-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def upload_room_keys(self, user_id, version, room_keys):
         """Bulk upload a list of room keys into a given backup version, asserting
@@ -179,7 +179,7 @@ class E2eRoomKeysHandler(object):
                         user_id, version, room_id, session_id, session
                     )

-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def _upload_room_key(self, user_id, version, room_id, session_id, room_key):
         """Upload a given room_key for a given room and session into a given
@@ -242,7 +242,7 @@ class E2eRoomKeysHandler(object):
                 return False
         return True

-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def create_version(self, user_id, version_info):
         """Create a new backup version. This automatically becomes the new
@@ -301,7 +301,7 @@ class E2eRoomKeysHandler(object):
                     raise
             return res

-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def delete_version(self, user_id, version=None):
         """Deletes a given version of the user's e2e_room_keys backup
@@ -322,7 +322,7 @@ class E2eRoomKeysHandler(object):
                 else:
                     raise

-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def update_version(self, user_id, version, version_info):
         """Update the info about a given version of the user's backup

View file

@@ -25,7 +25,7 @@ from synapse.http.servlet import (
     parse_string,
 )
 from synapse.types import StreamToken
-import synapse.util.tracerutils as tracerutils
+import synapse.logging.opentracing as opentracing

 from ._base import client_patterns

 logger = logging.getLogger(__name__)
@@ -68,7 +68,7 @@ class KeyUploadServlet(RestServlet):
         self.auth = hs.get_auth()
         self.e2e_keys_handler = hs.get_e2e_keys_handler()

-    @tracerutils.trace_defered_function_using_operation_name("upload_keys")
+    @opentracing.trace_defered_function_using_operation_name("upload_keys")
     @defer.inlineCallbacks
     def on_POST(self, request, device_id):
         requester = yield self.auth.get_user_by_req(request, allow_guest=True)
@@ -79,8 +79,8 @@ class KeyUploadServlet(RestServlet):
             # passing the device_id here is deprecated; however, we allow it
             # for now for compatibility with older clients.
             if requester.device_id is not None and device_id != requester.device_id:
-                tracerutils.set_tag("error", True)
-                tracerutils.log_kv(
+                opentracing.set_tag("error", True)
+                opentracing.log_kv(
                     {
                         "message": "Client uploading keys for a different device",
                         "logged_in_id": requester.device_id,

View file

@@ -23,7 +23,7 @@ from synapse.http.servlet import (
     parse_json_object_from_request,
     parse_string,
 )
-import synapse.util.tracerutils as tracerutils
+import synapse.logging.opentracing as opentracing

 from ._base import client_patterns
@@ -312,7 +312,7 @@ class RoomKeysVersionServlet(RestServlet):
         self.auth = hs.get_auth()
         self.e2e_room_keys_handler = hs.get_e2e_room_keys_handler()

-    @tracerutils.trace_defered_function_using_operation_name("get_room_keys_version")
+    @opentracing.trace_defered_function_using_operation_name("get_room_keys_version")
     @defer.inlineCallbacks
     def on_GET(self, request, version):
         """

View file

@@ -20,7 +20,7 @@ from twisted.internet import defer
 from synapse.http import servlet
 from synapse.http.servlet import parse_json_object_from_request
 from synapse.rest.client.transactions import HttpTransactionCache
-import synapse.util.tracerutils as tracerutils
+import synapse.logging.opentracing as opentracing

 from ._base import client_patterns
@@ -43,10 +43,10 @@ class SendToDeviceRestServlet(servlet.RestServlet):
         self.txns = HttpTransactionCache(hs)
         self.device_message_handler = hs.get_device_message_handler()

-    @tracerutils.trace_function_using_operation_name("sendToDevice")
+    @opentracing.trace_function_using_operation_name("sendToDevice")
     def on_PUT(self, request, message_type, txn_id):
-        tracerutils.set_tag("message_type", message_type)
-        tracerutils.set_tag("txn_id", txn_id)
+        opentracing.set_tag("message_type", message_type)
+        opentracing.set_tag("txn_id", txn_id)
         return self.txns.fetch_or_execute_request(
             request, self._put, request, message_type, txn_id
         )

View file

@@ -22,7 +22,7 @@ from twisted.internet import defer
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.background_updates import BackgroundUpdateStore
 from synapse.util.caches.expiringcache import ExpiringCache
-import synapse.util.tracerutils as tracerutils
+import synapse.logging.opentracing as opentracing

 logger = logging.getLogger(__name__)
@@ -73,7 +73,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             "get_new_messages_for_device", get_new_messages_for_device_txn
         )

-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def delete_messages_for_device(self, user_id, device_id, up_to_stream_id):
         """
@@ -90,14 +90,14 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             (user_id, device_id), None
         )

-        tracerutils.set_tag("last_deleted_stream_id", last_deleted_stream_id)
+        opentracing.set_tag("last_deleted_stream_id", last_deleted_stream_id)

         if last_deleted_stream_id:
             has_changed = self._device_inbox_stream_cache.has_entity_changed(
                 user_id, last_deleted_stream_id
             )
             if not has_changed:
-                tracerutils.log_kv({"message": "No changes in cache since last check"})
+                opentracing.log_kv({"message": "No changes in cache since last check"})
                 return 0

         def delete_messages_for_device_txn(txn):
@@ -113,7 +113,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             "delete_messages_for_device", delete_messages_for_device_txn
         )

-        tracerutils.log_kv(
+        opentracing.log_kv(
             {"message": "deleted {} messages for device".format(count), "count": count}
         )
@@ -127,7 +127,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         return count

-    @tracerutils.trace_function
+    @opentracing.trace_function
     def get_new_device_msgs_for_remote(
         self, destination, last_stream_id, current_stream_id, limit
     ):
@@ -143,23 +143,23 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             in the stream the messages got to.
         """
-        tracerutils.set_tag("destination", destination)
-        tracerutils.set_tag("last_stream_id", last_stream_id)
-        tracerutils.set_tag("current_stream_id", current_stream_id)
-        tracerutils.set_tag("limit", limit)
+        opentracing.set_tag("destination", destination)
+        opentracing.set_tag("last_stream_id", last_stream_id)
+        opentracing.set_tag("current_stream_id", current_stream_id)
+        opentracing.set_tag("limit", limit)

         has_changed = self._device_federation_outbox_stream_cache.has_entity_changed(
             destination, last_stream_id
         )
         if not has_changed or last_stream_id == current_stream_id:
-            tracerutils.log_kv({"message": "No new messages in stream"})
+            opentracing.log_kv({"message": "No new messages in stream"})
             return defer.succeed(([], current_stream_id))

         if limit <= 0:
             # This can happen if we run out of room for EDUs in the transaction.
             return defer.succeed(([], last_stream_id))

-        @tracerutils.trace_function
+        @opentracing.trace_function
         def get_new_messages_for_remote_destination_txn(txn):
             sql = (
                 "SELECT stream_id, messages_json FROM device_federation_outbox"
@@ -174,7 +174,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                 stream_pos = row[0]
                 messages.append(json.loads(row[1]))
             if len(messages) < limit:
-                tracerutils.log_kv(
+                opentracing.log_kv(
                     {"message": "Set stream position to current position"}
                 )
                 stream_pos = current_stream_id
@@ -185,7 +185,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             get_new_messages_for_remote_destination_txn,
         )

-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     def delete_device_msgs_for_remote(self, destination, up_to_stream_id):
         """Used to delete messages when the remote destination acknowledges
         their receipt.
@@ -236,7 +236,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
             expiry_ms=30 * 60 * 1000,
         )

-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def add_messages_to_device_inbox(
         self, local_messages_by_user_then_device, remote_messages_by_destination

View file

@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-import synapse.util.tracerutils as tracerutils
+import synapse.logging.opentracing as opentracing

 from six import iteritems
@@ -300,7 +300,7 @@ class DeviceWorkerStore(SQLBaseStore):
     def get_device_stream_token(self):
         return self._device_list_id_gen.get_current_token()

-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def get_user_devices_from_cache(self, query_list):
         """Get the devices (and keys if any) for remote users from the cache.
@@ -332,8 +332,8 @@ class DeviceWorkerStore(SQLBaseStore):
             else:
                 results[user_id] = yield self._get_cached_devices_for_user(user_id)

-        tracerutils.set_tag("in_cache", results)
-        tracerutils.set_tag("not_in_cache", user_ids_not_in_cache)
+        opentracing.set_tag("in_cache", results)
+        opentracing.set_tag("not_in_cache", user_ids_not_in_cache)

         return (user_ids_not_in_cache, results)

View file

@@ -18,7 +18,7 @@ import json
 from twisted.internet import defer

 from synapse.api.errors import StoreError
-from synapse.util.tracerutils import trace_defered_function
+from synapse.logging.opentracing import trace_defered_function

 from ._base import SQLBaseStore

View file

@@ -22,11 +22,11 @@ from synapse.util.caches.descriptors import cached
 from ._base import SQLBaseStore, db_to_json

-import synapse.util.tracerutils as tracerutils
+import synapse.logging.opentracing as opentracing


 class EndToEndKeyWorkerStore(SQLBaseStore):
-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def get_e2e_device_keys(
         self, query_list, include_all_devices=False, include_deleted_devices=False
@@ -43,7 +43,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
             Dict mapping from user-id to dict mapping from device_id to
             dict containing "key_json", "device_display_name".
         """
-        tracerutils.set_tag("query_list", query_list)
+        opentracing.set_tag("query_list", query_list)
         if not query_list:
             return {}
@@ -61,12 +61,12 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
         return results

-    @tracerutils.trace_function
+    @opentracing.trace_function
     def _get_e2e_device_keys_txn(
         self, txn, query_list, include_all_devices=False, include_deleted_devices=False
     ):
-        tracerutils.set_tag("include_all_devices", include_all_devices)
-        tracerutils.set_tag("include_deleted_devices", include_deleted_devices)
+        opentracing.set_tag("include_all_devices", include_all_devices)
+        opentracing.set_tag("include_deleted_devices", include_deleted_devices)

         query_clauses = []
         query_params = []
@@ -112,10 +112,10 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
             for user_id, device_id in deleted_devices:
                 result.setdefault(user_id, {})[device_id] = None

-        tracerutils.log_kv(result)
+        opentracing.log_kv(result)
         return result

-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def get_e2e_one_time_keys(self, user_id, device_id, key_ids):
         """Retrieve a number of one-time keys for a user
@@ -131,9 +131,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
                 key_id) to json string for key
         """
-        tracerutils.set_tag("user_id", user_id)
-        tracerutils.set_tag("device_id", device_id)
-        tracerutils.set_tag("key_ids", key_ids)
+        opentracing.set_tag("user_id", user_id)
+        opentracing.set_tag("device_id", device_id)
+        opentracing.set_tag("key_ids", key_ids)

         rows = yield self._simple_select_many_batch(
             table="e2e_one_time_keys_json",
@@ -159,11 +159,11 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
             (algorithm, key_id, key json)
         """

-        @tracerutils.trace_function
+        @opentracing.trace_function
         def _add_e2e_one_time_keys(txn):
-            tracerutils.set_tag("user_id", user_id)
-            tracerutils.set_tag("device_id", device_id)
-            tracerutils.set_tag("new_keys", new_keys)
+            opentracing.set_tag("user_id", user_id)
+            opentracing.set_tag("device_id", device_id)
+            opentracing.set_tag("new_keys", new_keys)
             # We are protected from race between lookup and insertion due to
             # a unique constraint. If there is a race of two calls to
             # `add_e2e_one_time_keys` then they'll conflict and we will only
@@ -219,12 +219,12 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
             or the keys were already in the database.
         """

-        @tracerutils.trace_function
+        @opentracing.trace_function
         def _set_e2e_device_keys_txn(txn):
-            tracerutils.set_tag("user_id", user_id)
-            tracerutils.set_tag("device_id", device_id)
-            tracerutils.set_tag("time_now", time_now)
-            tracerutils.set_tag("device_keys", device_keys)
+            opentracing.set_tag("user_id", user_id)
+            opentracing.set_tag("device_id", device_id)
+            opentracing.set_tag("time_now", time_now)
+            opentracing.set_tag("device_keys", device_keys)

             old_key_json = self._simple_select_one_onecol_txn(
                 txn,
@@ -239,7 +239,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
             new_key_json = encode_canonical_json(device_keys).decode("utf-8")

             if old_key_json == new_key_json:
-                tracerutils.log_kv({"event", "key already stored"})
+                opentracing.log_kv({"event", "key already stored"})
                 return False

             self._simple_upsert_txn(
@@ -256,7 +256,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
     def claim_e2e_one_time_keys(self, query_list):
         """Take a list of one time keys out of the database"""

-        @tracerutils.trace_function
+        @opentracing.trace_function
         def _claim_e2e_one_time_keys(txn):
             sql = (
                 "SELECT key_id, key_json FROM e2e_one_time_keys_json"
@@ -278,11 +278,11 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
                 " AND key_id = ?"
             )
             for user_id, device_id, algorithm, key_id in delete:
-                tracerutils.log_kv(
+                opentracing.log_kv(
                     {"message": "executing claim transaction on database"}
                 )
                 txn.execute(sql, (user_id, device_id, algorithm, key_id))
-                tracerutils.log_kv(
+                opentracing.log_kv(
                     {"message": "finished executing and invalidating cache"}
                 )
                 self._invalidate_cache_and_stream(
@@ -293,10 +293,10 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
         return self.runInteraction("claim_e2e_one_time_keys", _claim_e2e_one_time_keys)

     def delete_e2e_keys_by_device(self, user_id, device_id):
-        @tracerutils.trace_function
+        @opentracing.trace_function
         def delete_e2e_keys_by_device_txn(txn):
-            tracerutils.set_tag("user_id", user_id)
-            tracerutils.set_tag("device_id", device_id)
+            opentracing.set_tag("user_id", user_id)
+            opentracing.set_tag("device_id", device_id)
             self._simple_delete_txn(
                 txn,
                 table="e2e_device_keys_json",
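
For readers unfamiliar with these helpers: trace_defered_function is a decorator that opens an OpenTracing span around a Twisted method and keeps it open until the returned Deferred fires. The following is a rough, hypothetical sketch of that idea only, not the implementation in synapse.logging.opentracing; it assumes the generic opentracing-python package and its global tracer:

    import opentracing  # the opentracing-python package, assumed installed
    from twisted.internet import defer


    def trace_defered_function(func):
        # Hypothetical illustration: start a span named after the wrapped
        # function and finish it when the Deferred resolves or fails.
        def _wrapped(*args, **kwargs):
            scope = opentracing.tracer.start_active_span(
                func.__name__, finish_on_close=True
            )

            def _finish(result):
                scope.close()  # closing the scope finishes the span
                return result

            d = defer.maybeDeferred(func, *args, **kwargs)
            d.addBoth(_finish)
            return d

        return _wrapped

In Synapse itself the scope handling also has to cooperate with Twisted's log contexts, which is part of why these helpers live in a dedicated module rather than looking like the plain decorator above.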