mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-24 10:35:46 +03:00
Limit amount of replication we send (#17358)
Fixes up #17333, where we failed to actually send less data (the `DISTINCT` didn't work due to `stream_id` being different). We fix this by making it so that every device list outbound poke for a given user ID has the same stream ID. We can't change the query to only return e.g. max stream ID as the receivers look up the destinations to send to by doing `SELECT WHERE stream_id = ?`
This commit is contained in:
parent
554a92601a
commit
c89fea3fd1
2 changed files with 8 additions and 8 deletions
1
changelog.d/17358.misc
Normal file
1
changelog.d/17358.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Handle device lists notifications for large accounts more efficiently in worker mode.
|
|
@ -2131,7 +2131,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
|
||||||
user_id: str,
|
user_id: str,
|
||||||
device_id: str,
|
device_id: str,
|
||||||
hosts: Collection[str],
|
hosts: Collection[str],
|
||||||
stream_ids: List[int],
|
stream_id: int,
|
||||||
context: Optional[Dict[str, str]],
|
context: Optional[Dict[str, str]],
|
||||||
) -> None:
|
) -> None:
|
||||||
if self._device_list_federation_stream_cache:
|
if self._device_list_federation_stream_cache:
|
||||||
|
@ -2139,11 +2139,10 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
|
||||||
txn.call_after(
|
txn.call_after(
|
||||||
self._device_list_federation_stream_cache.entity_has_changed,
|
self._device_list_federation_stream_cache.entity_has_changed,
|
||||||
host,
|
host,
|
||||||
stream_ids[-1],
|
stream_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
now = self._clock.time_msec()
|
now = self._clock.time_msec()
|
||||||
stream_id_iterator = iter(stream_ids)
|
|
||||||
|
|
||||||
encoded_context = json_encoder.encode(context)
|
encoded_context = json_encoder.encode(context)
|
||||||
mark_sent = not self.hs.is_mine_id(user_id)
|
mark_sent = not self.hs.is_mine_id(user_id)
|
||||||
|
@ -2152,7 +2151,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
|
||||||
(
|
(
|
||||||
destination,
|
destination,
|
||||||
self._instance_name,
|
self._instance_name,
|
||||||
next(stream_id_iterator),
|
stream_id,
|
||||||
user_id,
|
user_id,
|
||||||
device_id,
|
device_id,
|
||||||
mark_sent,
|
mark_sent,
|
||||||
|
@ -2337,22 +2336,22 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
|
||||||
return
|
return
|
||||||
|
|
||||||
def add_device_list_outbound_pokes_txn(
|
def add_device_list_outbound_pokes_txn(
|
||||||
txn: LoggingTransaction, stream_ids: List[int]
|
txn: LoggingTransaction, stream_id: int
|
||||||
) -> None:
|
) -> None:
|
||||||
self._add_device_outbound_poke_to_stream_txn(
|
self._add_device_outbound_poke_to_stream_txn(
|
||||||
txn,
|
txn,
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
device_id=device_id,
|
device_id=device_id,
|
||||||
hosts=hosts,
|
hosts=hosts,
|
||||||
stream_ids=stream_ids,
|
stream_id=stream_id,
|
||||||
context=context,
|
context=context,
|
||||||
)
|
)
|
||||||
|
|
||||||
async with self._device_list_id_gen.get_next_mult(len(hosts)) as stream_ids:
|
async with self._device_list_id_gen.get_next() as stream_id:
|
||||||
return await self.db_pool.runInteraction(
|
return await self.db_pool.runInteraction(
|
||||||
"add_device_list_outbound_pokes",
|
"add_device_list_outbound_pokes",
|
||||||
add_device_list_outbound_pokes_txn,
|
add_device_list_outbound_pokes_txn,
|
||||||
stream_ids,
|
stream_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def add_remote_device_list_to_pending(
|
async def add_remote_device_list_to_pending(
|
||||||
|
|
Loading…
Reference in a new issue