Mirror of https://github.com/element-hq/synapse.git, synced 2024-11-29 07:28:55 +03:00
Change device list replication to match new semantics.
Instead of sending down batches of user ID/host tuples, send down a row per entity (user ID or host).
parent f5caa1864e
commit 9ce4e344a8
4 changed files with 32 additions and 23 deletions
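The change in a nutshell: a replication row used to carry a user ID/destination pair, while the new rows carry a single entity that is either a user ID or a remote host name. A minimal sketch of how a consumer can tell the two apart (the entity values below are invented for illustration, not data from the commit):

# Sketch only: invented entity values.
rows = ["@alice:example.com", "@bob:example.com", "remote.example"]

# Matrix user IDs always start with "@"; anything else is a host to poke.
user_ids = [entity for entity in rows if entity.startswith("@")]
hosts = {entity for entity in rows if not entity.startswith("@")}

print(user_ids)  # ['@alice:example.com', '@bob:example.com']
print(hosts)     # {'remote.example'}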
@@ -774,7 +774,7 @@ class FederationSenderHandler(object):
         # ... as well as device updates and messages
         elif stream_name == DeviceListsStream.NAME:
-            hosts = {row.destination for row in rows}
+            hosts = {row.entity for row in rows if not row.entity.startswith("@")}
             for host in hosts:
                 self.federation_sender.send_device_messages(host)

@@ -61,23 +61,24 @@ class SlavedDeviceStore(EndToEndKeyWorkerStore, DeviceWorkerStore, BaseSlavedSto
     def process_replication_rows(self, stream_name, token, rows):
         if stream_name == DeviceListsStream.NAME:
             self._device_list_id_gen.advance(token)
-            for row in rows:
-                self._invalidate_caches_for_devices(token, row.user_id, row.destination)
+            self._invalidate_caches_for_devices(token, rows)
         elif stream_name == UserSignatureStream.NAME:
+            self._device_list_id_gen.advance(token)
             for row in rows:
                 self._user_signature_stream_cache.entity_has_changed(row.user_id, token)
         return super(SlavedDeviceStore, self).process_replication_rows(
             stream_name, token, rows
         )

-    def _invalidate_caches_for_devices(self, token, user_id, destination):
-        self._device_list_stream_cache.entity_has_changed(user_id, token)
-
-        if destination:
-            self._device_list_federation_stream_cache.entity_has_changed(
-                destination, token
-            )
-
-        self.get_cached_devices_for_user.invalidate((user_id,))
-        self._get_cached_user_device.invalidate_many((user_id,))
-        self.get_device_list_last_stream_id_for_remote.invalidate((user_id,))
+    def _invalidate_caches_for_devices(self, token, rows):
+        for row in rows:
+            if row.entity.startswith("@"):
+                self._device_list_stream_cache.entity_has_changed(row.entity, token)
+                self.get_cached_devices_for_user.invalidate((row.entity,))
+                self._get_cached_user_device.invalidate_many((row.entity,))
+                self.get_device_list_last_stream_id_for_remote.invalidate((row.entity,))
+
+            else:
+                self._device_list_federation_stream_cache.entity_has_changed(
+                    row.entity, token
+                )

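As a standalone illustration of the pattern the reworked `_invalidate_caches_for_devices` follows (walk the batch once, branch on the entity prefix), here is a sketch with stand-in types and callbacks rather than the real Synapse caches:

from dataclasses import dataclass

@dataclass
class Row:
    entity: str  # either a user ID ("@user:server") or a remote server name


def invalidate_for_devices(token, rows, on_user_changed, on_host_to_poke):
    # One pass over the batch: user rows invalidate local device caches,
    # host rows mark a destination as needing a federation poke.
    for row in rows:
        if row.entity.startswith("@"):
            on_user_changed(row.entity, token)
        else:
            on_host_to_poke(row.entity, token)


invalidate_for_devices(
    token=7,  # invented stream token
    rows=[Row("@alice:example.com"), Row("remote.example")],
    on_user_changed=lambda entity, token: print("user changed:", entity, token),
    on_host_to_poke=lambda entity, token: print("poke host:", entity, token),
)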
@@ -94,9 +94,13 @@ PublicRoomsStreamRow = namedtuple(
         "network_id",  # str, optional
     ),
 )
-DeviceListsStreamRow = namedtuple(
-    "DeviceListsStreamRow", ("user_id", "destination")  # str  # str
-)
+
+
+@attr.s
+class DeviceListsStreamRow:
+    entity = attr.ib(type=str)
+
+
 ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",))  # str
 TagAccountDataStreamRow = namedtuple(
     "TagAccountDataStreamRow", ("user_id", "room_id", "data")  # str  # str  # dict

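The row type switches from a namedtuple to an attrs class. A quick, self-contained sketch of what the attrs definition gives you (a generated `__init__`, `__repr__` and `__eq__`); the entity value is invented:

import attr


@attr.s
class DeviceListsStreamRow:
    entity = attr.ib(type=str)


row = DeviceListsStreamRow(entity="@alice:example.com")  # invented value
print(row)  # DeviceListsStreamRow(entity='@alice:example.com')
print(row == DeviceListsStreamRow(entity="@alice:example.com"))  # True
print(row.entity.startswith("@"))  # True: a user row, not a host row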
@@ -363,7 +367,8 @@ class PublicRoomsStream(Stream):


 class DeviceListsStream(Stream):
-    """Someone added/changed/removed a device
+    """Either a user has updated their devices or a remote server needs to be
+    told about a device update.
     """

     NAME = "device_lists"

@@ -612,15 +612,18 @@ class DeviceWorkerStore(SQLBaseStore):
             combined list of changes to devices, and which destinations need to be
             poked. `destination` may be None if no destinations need to be poked.
         """
-        # We do a group by here as there can be a large number of duplicate
-        # entries, since we throw away device IDs.
+        # This query Does The Right Thing where it'll correctly apply the
+        # bounds to the inner queries.
         sql = """
-            SELECT MAX(stream_id) AS stream_id, user_id, destination
-            FROM device_lists_stream
-            LEFT JOIN device_lists_outbound_pokes USING (stream_id, user_id, device_id)
+            SELECT stream_id, entity FROM (
+                SELECT stream_id, user_id AS entity FROM device_lists_stream
+                UNION ALL
+                SELECT stream_id, destination AS entity FROM device_lists_outbound_pokes
+            ) AS e
             WHERE ? < stream_id AND stream_id <= ?
-            GROUP BY user_id, destination
         """

         return self.db.execute(
             "get_all_device_list_changes_for_remotes", None, sql, from_key, to_key
         )

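The rewritten query folds both tables into a single entity column with UNION ALL and applies the stream_id bounds to the combined result. A self-contained sketch of the same shape against throwaway SQLite tables (the simplified schemas and values are invented; the real tables have more columns):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE device_lists_stream (stream_id INTEGER, user_id TEXT);
    CREATE TABLE device_lists_outbound_pokes (stream_id INTEGER, destination TEXT);
    INSERT INTO device_lists_stream VALUES (1, '@alice:example.com'), (3, '@bob:example.com');
    INSERT INTO device_lists_outbound_pokes VALUES (2, 'remote.example'), (5, 'other.example');
    """
)

sql = """
    SELECT stream_id, entity FROM (
        SELECT stream_id, user_id AS entity FROM device_lists_stream
        UNION ALL
        SELECT stream_id, destination AS entity FROM device_lists_outbound_pokes
    ) AS e
    WHERE ? < stream_id AND stream_id <= ?
"""

from_key, to_key = 1, 3
print(conn.execute(sql, (from_key, to_key)).fetchall())
# e.g. [(3, '@bob:example.com'), (2, 'remote.example')] -- row order is not guaranteed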