From 565544b6039c619cc4e9af8ad7f04793c4525b70 Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Fri, 28 Jun 2019 10:34:53 +0100 Subject: [PATCH] Trace e2e --- synapse/handlers/e2e_keys.py | 50 ++++++++++++++++++++++++++-- synapse/handlers/e2e_room_keys.py | 10 ++++++ synapse/rest/client/v2_alpha/keys.py | 13 ++++++++ synapse/storage/e2e_room_keys.py | 9 +++++ synapse/storage/end_to_end_keys.py | 35 +++++++++++++++++++ 5 files changed, 115 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 1300b540e3..8b2249fc5b 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -15,6 +15,7 @@ # limitations under the License. import logging +from synapse.util.tracerutils import TracerUtil, trace_defered_function from six import iteritems @@ -45,6 +46,7 @@ class E2eKeysHandler(object): "client_keys", self.on_federation_query_client_keys ) + @trace_defered_function @defer.inlineCallbacks def query_devices(self, query_body, timeout): """ Handle a device key query from a client @@ -65,6 +67,7 @@ class E2eKeysHandler(object): } } """ + device_keys_query = query_body.get("device_keys", {}) # separate users by domain. @@ -79,6 +82,9 @@ class E2eKeysHandler(object): else: remote_queries[user_id] = device_ids + TracerUtil.set_tag("local_key_query", local_query) + TracerUtil.set_tag("remote_key_query", remote_queries) + # First get local devices. failures = {} results = {} @@ -119,9 +125,12 @@ class E2eKeysHandler(object): r[user_id] = remote_queries[user_id] # Now fetch any devices that we don't have in our cache + @trace_defered_function @defer.inlineCallbacks def do_remote_query(destination): destination_query = remote_queries_not_in_cache[destination] + + TracerUtil.set_tag("key_query", destination_query) try: remote_result = yield self.federation.query_client_keys( destination, {"device_keys": destination_query}, timeout=timeout @@ -132,7 +141,10 @@ class E2eKeysHandler(object): results[user_id] = keys except Exception as e: - failures[destination] = _exception_to_failure(e) + failure = _exception_to_failure(e) + failures[destination] = failure + TracerUtil.set_tag("error", True) + TracerUtil.set_tag("reason", failure) yield make_deferred_yieldable( defer.gatherResults( @@ -146,6 +158,7 @@ class E2eKeysHandler(object): return {"device_keys": results, "failures": failures} + @trace_defered_function @defer.inlineCallbacks def query_local_devices(self, query): """Get E2E device keys for local users @@ -158,6 +171,7 @@ class E2eKeysHandler(object): defer.Deferred: (resolves to dict[string, dict[string, dict]]): map from user_id -> device_id -> device details """ + TracerUtil.set_tag("local_query", query) local_query = [] result_dict = {} @@ -165,6 +179,14 @@ class E2eKeysHandler(object): # we use UserID.from_string to catch invalid user ids if not self.is_mine(UserID.from_string(user_id)): logger.warning("Request for keys for non-local user %s", user_id) + TracerUtil.log_kv( + { + "message": "Requested a local key for a user which" + + " was not local to the homeserver", + "user_id": user_id, + } + ) + TracerUtil.set_tag("error", True) raise SynapseError(400, "Not a user here") if not device_ids: @@ -189,6 +211,7 @@ class E2eKeysHandler(object): r["unsigned"]["device_display_name"] = display_name result_dict[user_id][device_id] = r + TracerUtil.log_kv(results) return result_dict @defer.inlineCallbacks @@ -199,6 +222,7 @@ class E2eKeysHandler(object): res = yield self.query_local_devices(device_keys_query) return {"device_keys": res} + @trace_defered_function @defer.inlineCallbacks def claim_one_time_keys(self, query, timeout): local_query = [] @@ -213,6 +237,9 @@ class E2eKeysHandler(object): domain = get_domain_from_id(user_id) remote_queries.setdefault(domain, {})[user_id] = device_keys + TracerUtil.set_tag("local_key_query", local_query) + TracerUtil.set_tag("remote_key_query", remote_queries) + results = yield self.store.claim_e2e_one_time_keys(local_query) json_result = {} @@ -224,8 +251,10 @@ class E2eKeysHandler(object): key_id: json.loads(json_bytes) } + @trace_defered_function @defer.inlineCallbacks def claim_client_keys(destination): + TracerUtil.set_tag("destination", destination) device_keys = remote_queries[destination] try: remote_result = yield self.federation.claim_client_keys( @@ -234,8 +263,12 @@ class E2eKeysHandler(object): for user_id, keys in remote_result["one_time_keys"].items(): if user_id in device_keys: json_result[user_id] = keys + except Exception as e: - failures[destination] = _exception_to_failure(e) + failure = _exception_to_failure(e) + failures[destination] = failure + TracerUtil.set_tag("error", True) + TracerUtil.set_tag("reason", failure) yield make_deferred_yieldable( defer.gatherResults( @@ -259,14 +292,21 @@ class E2eKeysHandler(object): ), ) + TracerUtil.log_kv({"one_time_keys": json_result, "failures": failures}) return {"one_time_keys": json_result, "failures": failures} + @trace_defered_function @defer.inlineCallbacks def upload_keys_for_user(self, user_id, device_id, keys): + TracerUtil.set_tag("user_id", user_id) + TracerUtil.set_tag("device_id", device_id) + TracerUtil.set_tag("keys", keys) + time_now = self.clock.time_msec() # TODO: Validate the JSON to make sure it has the right keys. device_keys = keys.get("device_keys", None) + TracerUtil.set_tag("device_keys", device_keys) if device_keys: logger.info( "Updating device_keys for device %r for user %s at %d", @@ -287,6 +327,10 @@ class E2eKeysHandler(object): yield self._upload_one_time_keys_for_user( user_id, device_id, time_now, one_time_keys ) + else: + TracerUtil.log_kv( + {"event": "did not upload one_time_keys", "reason": "no keys given"} + ) # the device should have been registered already, but it may have been # deleted due to a race with a DELETE request. Or we may be using an @@ -297,8 +341,10 @@ class E2eKeysHandler(object): result = yield self.store.count_e2e_one_time_keys(user_id, device_id) + TracerUtil.set_tag("one_time_key_counts", result) return {"one_time_key_counts": result} + @trace_defered_function @defer.inlineCallbacks def _upload_one_time_keys_for_user( self, user_id, device_id, time_now, one_time_keys diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py index 41b871fc59..06837bb126 100644 --- a/synapse/handlers/e2e_room_keys.py +++ b/synapse/handlers/e2e_room_keys.py @@ -27,6 +27,7 @@ from synapse.api.errors import ( SynapseError, ) from synapse.util.async_helpers import Linearizer +from synapse.util.tracerutils import TracerUtil, trace_defered_function logger = logging.getLogger(__name__) @@ -49,6 +50,7 @@ class E2eRoomKeysHandler(object): # changed. self._upload_linearizer = Linearizer("upload_room_keys_lock") + @trace_defered_function @defer.inlineCallbacks def get_room_keys(self, user_id, version, room_id=None, session_id=None): """Bulk get the E2E room keys for a given backup, optionally filtered to a given @@ -84,8 +86,10 @@ class E2eRoomKeysHandler(object): user_id, version, room_id, session_id ) + TracerUtil.log_kv(results) return results + @trace_defered_function @defer.inlineCallbacks def delete_room_keys(self, user_id, version, room_id=None, session_id=None): """Bulk delete the E2E room keys for a given backup, optionally filtered to a given @@ -107,6 +111,7 @@ class E2eRoomKeysHandler(object): with (yield self._upload_linearizer.queue(user_id)): yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id) + @trace_defered_function @defer.inlineCallbacks def upload_room_keys(self, user_id, version, room_keys): """Bulk upload a list of room keys into a given backup version, asserting @@ -174,6 +179,7 @@ class E2eRoomKeysHandler(object): user_id, version, room_id, session_id, session ) + @trace_defered_function @defer.inlineCallbacks def _upload_room_key(self, user_id, version, room_id, session_id, room_key): """Upload a given room_key for a given room and session into a given @@ -236,6 +242,7 @@ class E2eRoomKeysHandler(object): return False return True + @trace_defered_function @defer.inlineCallbacks def create_version(self, user_id, version_info): """Create a new backup version. This automatically becomes the new @@ -264,6 +271,7 @@ class E2eRoomKeysHandler(object): ) return new_version + @trace_defered_function @defer.inlineCallbacks def get_version_info(self, user_id, version=None): """Get the info about a given version of the user's backup @@ -294,6 +302,7 @@ class E2eRoomKeysHandler(object): raise return res + @trace_defered_function @defer.inlineCallbacks def delete_version(self, user_id, version=None): """Deletes a given version of the user's e2e_room_keys backup @@ -314,6 +323,7 @@ class E2eRoomKeysHandler(object): else: raise + @trace_defered_function @defer.inlineCallbacks def update_version(self, user_id, version, version_info): """Update the info about a given version of the user's backup diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 6008adec7c..8de9e12df4 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -25,6 +25,10 @@ from synapse.http.servlet import ( parse_string, ) from synapse.types import StreamToken +from synapse.util.tracerutils import ( + TracerUtil, + trace_defered_function_using_operation_name, +) from ._base import client_patterns @@ -68,6 +72,7 @@ class KeyUploadServlet(RestServlet): self.auth = hs.get_auth() self.e2e_keys_handler = hs.get_e2e_keys_handler() + @trace_defered_function_using_operation_name("upload_keys") @defer.inlineCallbacks def on_POST(self, request, device_id): requester = yield self.auth.get_user_by_req(request, allow_guest=True) @@ -78,6 +83,14 @@ class KeyUploadServlet(RestServlet): # passing the device_id here is deprecated; however, we allow it # for now for compatibility with older clients. if requester.device_id is not None and device_id != requester.device_id: + TracerUtil.set_tag("error", True) + TracerUtil.log_kv( + { + "message": "Client uploading keys for a different device", + "logged_in_id": requester.device_id, + "key_being_uploaded": device_id, + } + ) logger.warning( "Client uploading keys for a different device " "(logged in as %s, uploading for %s)", diff --git a/synapse/storage/e2e_room_keys.py b/synapse/storage/e2e_room_keys.py index 99128f2df7..b0e426d30e 100644 --- a/synapse/storage/e2e_room_keys.py +++ b/synapse/storage/e2e_room_keys.py @@ -18,11 +18,13 @@ import json from twisted.internet import defer from synapse.api.errors import StoreError +from synapse.util.tracerutils import trace_defered_function, trace_function from ._base import SQLBaseStore class EndToEndRoomKeyStore(SQLBaseStore): + @trace_defered_function @defer.inlineCallbacks def get_e2e_room_key(self, user_id, version, room_id, session_id): """Get the encrypted E2E room key for a given session from a given @@ -63,6 +65,7 @@ class EndToEndRoomKeyStore(SQLBaseStore): return row + @trace_defered_function @defer.inlineCallbacks def set_e2e_room_key(self, user_id, version, room_id, session_id, room_key): """Replaces or inserts the encrypted E2E room key for a given session in @@ -95,6 +98,7 @@ class EndToEndRoomKeyStore(SQLBaseStore): lock=False, ) + @trace_defered_function @defer.inlineCallbacks def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None): """Bulk get the E2E room keys for a given backup, optionally filtered to a given @@ -153,6 +157,7 @@ class EndToEndRoomKeyStore(SQLBaseStore): return sessions + @trace_defered_function @defer.inlineCallbacks def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None): """Bulk delete the E2E room keys for a given backup, optionally filtered to a given @@ -194,6 +199,7 @@ class EndToEndRoomKeyStore(SQLBaseStore): raise StoreError(404, "No current backup version") return row[0] + @trace_function def get_e2e_room_keys_version_info(self, user_id, version=None): """Get info metadata about a version of our room_keys backup. @@ -236,6 +242,7 @@ class EndToEndRoomKeyStore(SQLBaseStore): "get_e2e_room_keys_version_info", _get_e2e_room_keys_version_info_txn ) + @trace_function def create_e2e_room_keys_version(self, user_id, info): """Atomically creates a new version of this user's e2e_room_keys store with the given version info. @@ -276,6 +283,7 @@ class EndToEndRoomKeyStore(SQLBaseStore): "create_e2e_room_keys_version_txn", _create_e2e_room_keys_version_txn ) + @trace_function def update_e2e_room_keys_version(self, user_id, version, info): """Update a given backup version @@ -292,6 +300,7 @@ class EndToEndRoomKeyStore(SQLBaseStore): desc="update_e2e_room_keys_version", ) + @trace_function def delete_e2e_room_keys_version(self, user_id, version=None): """Delete a given backup version of the user's room keys. Doesn't delete their actual key data. diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index 1e07474e70..9779cb70c9 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -22,8 +22,11 @@ from synapse.util.caches.descriptors import cached from ._base import SQLBaseStore, db_to_json +from synapse.util.tracerutils import TracerUtil, trace_defered_function, trace_function + class EndToEndKeyWorkerStore(SQLBaseStore): + @trace_defered_function @defer.inlineCallbacks def get_e2e_device_keys( self, query_list, include_all_devices=False, include_deleted_devices=False @@ -40,6 +43,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore): Dict mapping from user-id to dict mapping from device_id to dict containing "key_json", "device_display_name". """ + TracerUtil.set_tag("query_list", query_list) if not query_list: return {} @@ -57,9 +61,13 @@ class EndToEndKeyWorkerStore(SQLBaseStore): return results + @trace_function def _get_e2e_device_keys_txn( self, txn, query_list, include_all_devices=False, include_deleted_devices=False ): + TracerUtil.set_tag("include_all_devices", include_all_devices) + TracerUtil.set_tag("include_deleted_devices", include_deleted_devices) + query_clauses = [] query_params = [] @@ -104,8 +112,10 @@ class EndToEndKeyWorkerStore(SQLBaseStore): for user_id, device_id in deleted_devices: result.setdefault(user_id, {})[device_id] = None + TracerUtil.log_kv(result) return result + @trace_defered_function @defer.inlineCallbacks def get_e2e_one_time_keys(self, user_id, device_id, key_ids): """Retrieve a number of one-time keys for a user @@ -121,6 +131,10 @@ class EndToEndKeyWorkerStore(SQLBaseStore): key_id) to json string for key """ + TracerUtil.set_tag("user_id", user_id) + TracerUtil.set_tag("device_id", device_id) + TracerUtil.set_tag("key_ids", key_ids) + rows = yield self._simple_select_many_batch( table="e2e_one_time_keys_json", column="key_id", @@ -145,7 +159,11 @@ class EndToEndKeyWorkerStore(SQLBaseStore): (algorithm, key_id, key json) """ + @trace_function def _add_e2e_one_time_keys(txn): + TracerUtil.set_tag("user_id", user_id) + TracerUtil.set_tag("device_id", device_id) + TracerUtil.set_tag("new_keys", new_keys) # We are protected from race between lookup and insertion due to # a unique constraint. If there is a race of two calls to # `add_e2e_one_time_keys` then they'll conflict and we will only @@ -201,7 +219,13 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): or the keys were already in the database. """ + @trace_function def _set_e2e_device_keys_txn(txn): + TracerUtil.set_tag("user_id", user_id) + TracerUtil.set_tag("device_id", device_id) + TracerUtil.set_tag("time_now", time_now) + TracerUtil.set_tag("device_keys", device_keys) + old_key_json = self._simple_select_one_onecol_txn( txn, table="e2e_device_keys_json", @@ -215,6 +239,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): new_key_json = encode_canonical_json(device_keys).decode("utf-8") if old_key_json == new_key_json: + TracerUtil.set_tag("error", True) return False self._simple_upsert_txn( @@ -231,6 +256,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): def claim_e2e_one_time_keys(self, query_list): """Take a list of one time keys out of the database""" + @trace_function def _claim_e2e_one_time_keys(txn): sql = ( "SELECT key_id, key_json FROM e2e_one_time_keys_json" @@ -252,7 +278,13 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): " AND key_id = ?" ) for user_id, device_id, algorithm, key_id in delete: + TracerUtil.log_kv( + {"message": "executing claim transaction on database"} + ) txn.execute(sql, (user_id, device_id, algorithm, key_id)) + TracerUtil.log_kv( + {"message": "finished executing and invalidating cache"} + ) self._invalidate_cache_and_stream( txn, self.count_e2e_one_time_keys, (user_id, device_id) ) @@ -261,7 +293,10 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): return self.runInteraction("claim_e2e_one_time_keys", _claim_e2e_one_time_keys) def delete_e2e_keys_by_device(self, user_id, device_id): + @trace_function def delete_e2e_keys_by_device_txn(txn): + TracerUtil.set_tag("user_id", user_id) + TracerUtil.set_tag("device_id", device_id) self._simple_delete_txn( txn, table="e2e_device_keys_json",