Add support for putting fed user query API on workers (#6873)

This commit is contained in:
Erik Johnston 2020-02-07 15:45:39 +00:00 committed by GitHub
parent e1d858984d
commit 21db35f77e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 25 additions and 21 deletions

1
changelog.d/6873.feature Normal file
View file

@ -0,0 +1 @@
Add ability to route federation user device queries to workers.

View file

@ -176,6 +176,7 @@ endpoints matching the following regular expressions:
^/_matrix/federation/v1/query_auth/ ^/_matrix/federation/v1/query_auth/
^/_matrix/federation/v1/event_auth/ ^/_matrix/federation/v1/event_auth/
^/_matrix/federation/v1/exchange_third_party_invite/ ^/_matrix/federation/v1/exchange_third_party_invite/
^/_matrix/federation/v1/user/devices/
^/_matrix/federation/v1/send/ ^/_matrix/federation/v1/send/
^/_matrix/federation/v1/get_groups_publicised$ ^/_matrix/federation/v1/get_groups_publicised$
^/_matrix/key/v2/query ^/_matrix/key/v2/query

View file

@ -33,6 +33,7 @@ from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.groups import SlavedGroupServerStore from synapse.replication.slave.storage.groups import SlavedGroupServerStore
@ -68,6 +69,7 @@ class FederationReaderSlavedStore(
SlavedKeyStore, SlavedKeyStore,
SlavedRegistrationStore, SlavedRegistrationStore,
SlavedGroupServerStore, SlavedGroupServerStore,
SlavedDeviceStore,
RoomStore, RoomStore,
DirectoryStore, DirectoryStore,
SlavedTransactionStore, SlavedTransactionStore,

View file

@ -81,6 +81,8 @@ class FederationServer(FederationBase):
self.handler = hs.get_handlers().federation_handler self.handler = hs.get_handlers().federation_handler
self.state = hs.get_state_handler() self.state = hs.get_state_handler()
self.device_handler = hs.get_device_handler()
self._server_linearizer = Linearizer("fed_server") self._server_linearizer = Linearizer("fed_server")
self._transaction_linearizer = Linearizer("fed_txn_handler") self._transaction_linearizer = Linearizer("fed_txn_handler")
@ -523,8 +525,9 @@ class FederationServer(FederationBase):
def on_query_client_keys(self, origin, content): def on_query_client_keys(self, origin, content):
return self.on_query_request("client_keys", content) return self.on_query_request("client_keys", content)
def on_query_user_devices(self, origin, user_id): async def on_query_user_devices(self, origin: str, user_id: str):
return self.on_query_request("user_devices", user_id) keys = await self.device_handler.on_federation_query_user_devices(user_id)
return 200, keys
@trace @trace
async def on_claim_client_keys(self, origin, content): async def on_claim_client_keys(self, origin, content):

View file

@ -225,6 +225,22 @@ class DeviceWorkerHandler(BaseHandler):
return result return result
@defer.inlineCallbacks
def on_federation_query_user_devices(self, user_id):
stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id)
master_key = yield self.store.get_e2e_cross_signing_key(user_id, "master")
self_signing_key = yield self.store.get_e2e_cross_signing_key(
user_id, "self_signing"
)
return {
"user_id": user_id,
"stream_id": stream_id,
"devices": devices,
"master_key": master_key,
"self_signing_key": self_signing_key,
}
class DeviceHandler(DeviceWorkerHandler): class DeviceHandler(DeviceWorkerHandler):
def __init__(self, hs): def __init__(self, hs):
@ -239,9 +255,6 @@ class DeviceHandler(DeviceWorkerHandler):
federation_registry.register_edu_handler( federation_registry.register_edu_handler(
"m.device_list_update", self.device_list_updater.incoming_device_list_update "m.device_list_update", self.device_list_updater.incoming_device_list_update
) )
federation_registry.register_query_handler(
"user_devices", self.on_federation_query_user_devices
)
hs.get_distributor().observe("user_left_room", self.user_left_room) hs.get_distributor().observe("user_left_room", self.user_left_room)
@ -456,22 +469,6 @@ class DeviceHandler(DeviceWorkerHandler):
self.notifier.on_new_event("device_list_key", position, users=[from_user_id]) self.notifier.on_new_event("device_list_key", position, users=[from_user_id])
@defer.inlineCallbacks
def on_federation_query_user_devices(self, user_id):
stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id)
master_key = yield self.store.get_e2e_cross_signing_key(user_id, "master")
self_signing_key = yield self.store.get_e2e_cross_signing_key(
user_id, "self_signing"
)
return {
"user_id": user_id,
"stream_id": stream_id,
"devices": devices,
"master_key": master_key,
"self_signing_key": self_signing_key,
}
@defer.inlineCallbacks @defer.inlineCallbacks
def user_left_room(self, user, room_id): def user_left_room(self, user, room_id):
user_id = user.to_string() user_id = user.to_string()