mirror of
https://github.com/element-hq/synapse.git
synced 2024-12-22 12:44:30 +03:00
Trace devices
This commit is contained in:
parent
d876cda725
commit
fd669e5e44
2 changed files with 42 additions and 1 deletions
|
@ -31,6 +31,7 @@ from synapse.util.async_helpers import Linearizer
|
||||||
from synapse.util.caches.expiringcache import ExpiringCache
|
from synapse.util.caches.expiringcache import ExpiringCache
|
||||||
from synapse.util.metrics import measure_func
|
from synapse.util.metrics import measure_func
|
||||||
from synapse.util.retryutils import NotRetryingDestination
|
from synapse.util.retryutils import NotRetryingDestination
|
||||||
|
from synapse.util.tracerutils import TracerUtil, trace_defered_function
|
||||||
|
|
||||||
from ._base import BaseHandler
|
from ._base import BaseHandler
|
||||||
|
|
||||||
|
@ -45,6 +46,7 @@ class DeviceWorkerHandler(BaseHandler):
|
||||||
self.state = hs.get_state_handler()
|
self.state = hs.get_state_handler()
|
||||||
self._auth_handler = hs.get_auth_handler()
|
self._auth_handler = hs.get_auth_handler()
|
||||||
|
|
||||||
|
@trace_defered_function
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_devices_by_user(self, user_id):
|
def get_devices_by_user(self, user_id):
|
||||||
"""
|
"""
|
||||||
|
@ -56,6 +58,7 @@ class DeviceWorkerHandler(BaseHandler):
|
||||||
defer.Deferred: list[dict[str, X]]: info on each device
|
defer.Deferred: list[dict[str, X]]: info on each device
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
TracerUtil.set_tag("user_id", user_id)
|
||||||
device_map = yield self.store.get_devices_by_user(user_id)
|
device_map = yield self.store.get_devices_by_user(user_id)
|
||||||
|
|
||||||
ips = yield self.store.get_last_client_ip_by_device(user_id, device_id=None)
|
ips = yield self.store.get_last_client_ip_by_device(user_id, device_id=None)
|
||||||
|
@ -64,8 +67,10 @@ class DeviceWorkerHandler(BaseHandler):
|
||||||
for device in devices:
|
for device in devices:
|
||||||
_update_device_from_client_ips(device, ips)
|
_update_device_from_client_ips(device, ips)
|
||||||
|
|
||||||
|
TracerUtil.log_kv(device_map)
|
||||||
return devices
|
return devices
|
||||||
|
|
||||||
|
@trace_defered_function
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_device(self, user_id, device_id):
|
def get_device(self, user_id, device_id):
|
||||||
""" Retrieve the given device
|
""" Retrieve the given device
|
||||||
|
@ -85,9 +90,14 @@ class DeviceWorkerHandler(BaseHandler):
|
||||||
raise errors.NotFoundError
|
raise errors.NotFoundError
|
||||||
ips = yield self.store.get_last_client_ip_by_device(user_id, device_id)
|
ips = yield self.store.get_last_client_ip_by_device(user_id, device_id)
|
||||||
_update_device_from_client_ips(device, ips)
|
_update_device_from_client_ips(device, ips)
|
||||||
|
|
||||||
|
TracerUtil.set_tag("device", device)
|
||||||
|
TracerUtil.set_tag("ips", ips)
|
||||||
|
|
||||||
return device
|
return device
|
||||||
|
|
||||||
@measure_func("device.get_user_ids_changed")
|
@measure_func("device.get_user_ids_changed")
|
||||||
|
@trace_defered_function
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_user_ids_changed(self, user_id, from_token):
|
def get_user_ids_changed(self, user_id, from_token):
|
||||||
"""Get list of users that have had the devices updated, or have newly
|
"""Get list of users that have had the devices updated, or have newly
|
||||||
|
@ -97,6 +107,9 @@ class DeviceWorkerHandler(BaseHandler):
|
||||||
user_id (str)
|
user_id (str)
|
||||||
from_token (StreamToken)
|
from_token (StreamToken)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
TracerUtil("user_id", user_id)
|
||||||
|
TracerUtil.set_tag("from_token", from_token)
|
||||||
now_room_key = yield self.store.get_room_events_max_id()
|
now_room_key = yield self.store.get_room_events_max_id()
|
||||||
|
|
||||||
room_ids = yield self.store.get_rooms_for_user(user_id)
|
room_ids = yield self.store.get_rooms_for_user(user_id)
|
||||||
|
@ -133,7 +146,7 @@ class DeviceWorkerHandler(BaseHandler):
|
||||||
if etype != EventTypes.Member:
|
if etype != EventTypes.Member:
|
||||||
continue
|
continue
|
||||||
possibly_left.add(state_key)
|
possibly_left.add(state_key)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Fetch the current state at the time.
|
# Fetch the current state at the time.
|
||||||
try:
|
try:
|
||||||
|
@ -148,6 +161,9 @@ class DeviceWorkerHandler(BaseHandler):
|
||||||
# special-case for an empty prev state: include all members
|
# special-case for an empty prev state: include all members
|
||||||
# in the changed list
|
# in the changed list
|
||||||
if not event_ids:
|
if not event_ids:
|
||||||
|
TracerUtil.log_kv(
|
||||||
|
{"event": "encountered empty previous state", "room_id": room_id}
|
||||||
|
)
|
||||||
for key, event_id in iteritems(current_state_ids):
|
for key, event_id in iteritems(current_state_ids):
|
||||||
etype, state_key = key
|
etype, state_key = key
|
||||||
if etype != EventTypes.Member:
|
if etype != EventTypes.Member:
|
||||||
|
@ -200,6 +216,10 @@ class DeviceWorkerHandler(BaseHandler):
|
||||||
possibly_joined = []
|
possibly_joined = []
|
||||||
possibly_left = []
|
possibly_left = []
|
||||||
|
|
||||||
|
TracerUtil.log_kv(
|
||||||
|
{"changed": list(possibly_joined), "left": list(possibly_left)}
|
||||||
|
)
|
||||||
|
|
||||||
return {"changed": list(possibly_joined), "left": list(possibly_left)}
|
return {"changed": list(possibly_joined), "left": list(possibly_left)}
|
||||||
|
|
||||||
|
|
||||||
|
@ -267,6 +287,7 @@ class DeviceHandler(DeviceWorkerHandler):
|
||||||
|
|
||||||
raise errors.StoreError(500, "Couldn't generate a device ID.")
|
raise errors.StoreError(500, "Couldn't generate a device ID.")
|
||||||
|
|
||||||
|
@trace_defered_function
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def delete_device(self, user_id, device_id):
|
def delete_device(self, user_id, device_id):
|
||||||
""" Delete the given device
|
""" Delete the given device
|
||||||
|
@ -284,6 +305,8 @@ class DeviceHandler(DeviceWorkerHandler):
|
||||||
except errors.StoreError as e:
|
except errors.StoreError as e:
|
||||||
if e.code == 404:
|
if e.code == 404:
|
||||||
# no match
|
# no match
|
||||||
|
TracerUtil.set_tag("error", True)
|
||||||
|
TracerUtil.set_tag("reason", "User doesn't have that device id.")
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
|
@ -296,6 +319,7 @@ class DeviceHandler(DeviceWorkerHandler):
|
||||||
|
|
||||||
yield self.notify_device_update(user_id, [device_id])
|
yield self.notify_device_update(user_id, [device_id])
|
||||||
|
|
||||||
|
@trace_defered_function
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def delete_all_devices_for_user(self, user_id, except_device_id=None):
|
def delete_all_devices_for_user(self, user_id, except_device_id=None):
|
||||||
"""Delete all of the user's devices
|
"""Delete all of the user's devices
|
||||||
|
@ -331,6 +355,8 @@ class DeviceHandler(DeviceWorkerHandler):
|
||||||
except errors.StoreError as e:
|
except errors.StoreError as e:
|
||||||
if e.code == 404:
|
if e.code == 404:
|
||||||
# no match
|
# no match
|
||||||
|
TracerUtil.set_tag("error", True)
|
||||||
|
TracerUtil.set_tag("reason", "User doesn't have that device id.")
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
|
@ -451,12 +477,15 @@ class DeviceListEduUpdater(object):
|
||||||
iterable=True,
|
iterable=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@trace_defered_function
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def incoming_device_list_update(self, origin, edu_content):
|
def incoming_device_list_update(self, origin, edu_content):
|
||||||
"""Called on incoming device list update from federation. Responsible
|
"""Called on incoming device list update from federation. Responsible
|
||||||
for parsing the EDU and adding to pending updates list.
|
for parsing the EDU and adding to pending updates list.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
TracerUtil.set_tag("origin", origin)
|
||||||
|
TracerUtil.set_tag("edu_content", edu_content)
|
||||||
user_id = edu_content.pop("user_id")
|
user_id = edu_content.pop("user_id")
|
||||||
device_id = edu_content.pop("device_id")
|
device_id = edu_content.pop("device_id")
|
||||||
stream_id = str(edu_content.pop("stream_id")) # They may come as ints
|
stream_id = str(edu_content.pop("stream_id")) # They may come as ints
|
||||||
|
@ -477,6 +506,13 @@ class DeviceListEduUpdater(object):
|
||||||
if not room_ids:
|
if not room_ids:
|
||||||
# We don't share any rooms with this user. Ignore update, as we
|
# We don't share any rooms with this user. Ignore update, as we
|
||||||
# probably won't get any further updates.
|
# probably won't get any further updates.
|
||||||
|
TracerUtil.set_tag("error", True)
|
||||||
|
TracerUtil.log_kv(
|
||||||
|
{
|
||||||
|
"message": "Got an update from a user which "
|
||||||
|
+ "doesn't share a room with the current user."
|
||||||
|
}
|
||||||
|
)
|
||||||
logger.warning(
|
logger.warning(
|
||||||
"Got device list update edu for %r/%r, but don't share a room",
|
"Got device list update edu for %r/%r, but don't share a room",
|
||||||
user_id,
|
user_id,
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
import logging
|
import logging
|
||||||
|
from synapse.util.tracerutils import TracerUtil, trace_defered_function
|
||||||
|
|
||||||
from six import iteritems
|
from six import iteritems
|
||||||
|
|
||||||
|
@ -299,6 +300,7 @@ class DeviceWorkerStore(SQLBaseStore):
|
||||||
def get_device_stream_token(self):
|
def get_device_stream_token(self):
|
||||||
return self._device_list_id_gen.get_current_token()
|
return self._device_list_id_gen.get_current_token()
|
||||||
|
|
||||||
|
@trace_defered_function
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_user_devices_from_cache(self, query_list):
|
def get_user_devices_from_cache(self, query_list):
|
||||||
"""Get the devices (and keys if any) for remote users from the cache.
|
"""Get the devices (and keys if any) for remote users from the cache.
|
||||||
|
@ -330,6 +332,9 @@ class DeviceWorkerStore(SQLBaseStore):
|
||||||
else:
|
else:
|
||||||
results[user_id] = yield self._get_cached_devices_for_user(user_id)
|
results[user_id] = yield self._get_cached_devices_for_user(user_id)
|
||||||
|
|
||||||
|
TracerUtil.set_tag("in_cache", results)
|
||||||
|
TracerUtil.set_tag("not_in_cache", user_ids_not_in_cache)
|
||||||
|
|
||||||
return (user_ids_not_in_cache, results)
|
return (user_ids_not_in_cache, results)
|
||||||
|
|
||||||
@cachedInlineCallbacks(num_args=2, tree=True)
|
@cachedInlineCallbacks(num_args=2, tree=True)
|
||||||
|
|
Loading…
Reference in a new issue