diff --git a/changelog.d/6873.feature b/changelog.d/6873.feature new file mode 100644 index 0000000000..bbedf8f7f0 --- /dev/null +++ b/changelog.d/6873.feature @@ -0,0 +1 @@ +Add ability to route federation user device queries to workers. diff --git a/docs/workers.md b/docs/workers.md index 82442d6a0a..6f7ec58780 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -176,6 +176,7 @@ endpoints matching the following regular expressions: ^/_matrix/federation/v1/query_auth/ ^/_matrix/federation/v1/event_auth/ ^/_matrix/federation/v1/exchange_third_party_invite/ + ^/_matrix/federation/v1/user/devices/ ^/_matrix/federation/v1/send/ ^/_matrix/federation/v1/get_groups_publicised$ ^/_matrix/key/v2/query diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 5e17ef1396..d055d11b23 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -33,6 +33,7 @@ from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore +from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.groups import SlavedGroupServerStore @@ -68,6 +69,7 @@ class FederationReaderSlavedStore( SlavedKeyStore, SlavedRegistrationStore, SlavedGroupServerStore, + SlavedDeviceStore, RoomStore, DirectoryStore, SlavedTransactionStore, diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index a6c966a393..7f9da49326 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -81,6 +81,8 @@ class FederationServer(FederationBase): self.handler = hs.get_handlers().federation_handler self.state = hs.get_state_handler() + self.device_handler = hs.get_device_handler() + self._server_linearizer = Linearizer("fed_server") self._transaction_linearizer = Linearizer("fed_txn_handler") @@ -523,8 +525,9 @@ class FederationServer(FederationBase): def on_query_client_keys(self, origin, content): return self.on_query_request("client_keys", content) - def on_query_user_devices(self, origin, user_id): - return self.on_query_request("user_devices", user_id) + async def on_query_user_devices(self, origin: str, user_id: str): + keys = await self.device_handler.on_federation_query_user_devices(user_id) + return 200, keys @trace async def on_claim_client_keys(self, origin, content): diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index a9bd431486..6d8e48ed39 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -225,6 +225,22 @@ class DeviceWorkerHandler(BaseHandler): return result + @defer.inlineCallbacks + def on_federation_query_user_devices(self, user_id): + stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id) + master_key = yield self.store.get_e2e_cross_signing_key(user_id, "master") + self_signing_key = yield self.store.get_e2e_cross_signing_key( + user_id, "self_signing" + ) + + return { + "user_id": user_id, + "stream_id": stream_id, + "devices": devices, + "master_key": master_key, + "self_signing_key": self_signing_key, + } + class DeviceHandler(DeviceWorkerHandler): def __init__(self, hs): @@ -239,9 +255,6 @@ class DeviceHandler(DeviceWorkerHandler): federation_registry.register_edu_handler( "m.device_list_update", self.device_list_updater.incoming_device_list_update ) - federation_registry.register_query_handler( - "user_devices", self.on_federation_query_user_devices - ) hs.get_distributor().observe("user_left_room", self.user_left_room) @@ -456,22 +469,6 @@ class DeviceHandler(DeviceWorkerHandler): self.notifier.on_new_event("device_list_key", position, users=[from_user_id]) - @defer.inlineCallbacks - def on_federation_query_user_devices(self, user_id): - stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id) - master_key = yield self.store.get_e2e_cross_signing_key(user_id, "master") - self_signing_key = yield self.store.get_e2e_cross_signing_key( - user_id, "self_signing" - ) - - return { - "user_id": user_id, - "stream_id": stream_id, - "devices": devices, - "master_key": master_key, - "self_signing_key": self_signing_key, - } - @defer.inlineCallbacks def user_left_room(self, user, room_id): user_id = user.to_string()