mirror of
https://github.com/element-hq/synapse.git
synced 2024-12-20 10:55:09 +03:00
Cap the top stream ID when fetching changed devices
This commit is contained in:
parent
bec0313e1b
commit
5b2b3120c2
3 changed files with 22 additions and 6 deletions
|
@ -159,20 +159,32 @@ class DeviceWorkerHandler:
|
|||
|
||||
@cancellable
|
||||
async def get_device_changes_in_shared_rooms(
|
||||
self, user_id: str, room_ids: StrCollection, from_token: StreamToken
|
||||
self,
|
||||
user_id: str,
|
||||
room_ids: StrCollection,
|
||||
from_token: StreamToken,
|
||||
now_token: Optional[StreamToken] = None,
|
||||
) -> Set[str]:
|
||||
"""Get the set of users whose devices have changed who share a room with
|
||||
the given user.
|
||||
"""
|
||||
now_device_lists_key = self.store.get_device_stream_token()
|
||||
if now_token:
|
||||
now_device_lists_key = now_token.device_list_key
|
||||
|
||||
changed_users = await self.store.get_device_list_changes_in_rooms(
|
||||
room_ids, from_token.device_list_key
|
||||
room_ids,
|
||||
from_token.device_list_key,
|
||||
now_device_lists_key,
|
||||
)
|
||||
|
||||
if changed_users is not None:
|
||||
# We also check if the given user has changed their device. If
|
||||
# they're in no rooms then the above query won't include them.
|
||||
changed = await self.store.get_users_whose_devices_changed(
|
||||
from_token.device_list_key, [user_id]
|
||||
from_token.device_list_key,
|
||||
[user_id],
|
||||
to_key=now_device_lists_key,
|
||||
)
|
||||
changed_users.update(changed)
|
||||
return changed_users
|
||||
|
@ -190,7 +202,9 @@ class DeviceWorkerHandler:
|
|||
tracked_users.add(user_id)
|
||||
|
||||
changed = await self.store.get_users_whose_devices_changed(
|
||||
from_token.device_list_key, tracked_users
|
||||
from_token.device_list_key,
|
||||
tracked_users,
|
||||
to_key=now_device_lists_key,
|
||||
)
|
||||
|
||||
return changed
|
||||
|
|
|
@ -1891,6 +1891,7 @@ class SyncHandler:
|
|||
user_id,
|
||||
sync_result_builder.joined_room_ids,
|
||||
from_token=since_token,
|
||||
now_token=sync_result_builder.now_token,
|
||||
)
|
||||
)
|
||||
|
||||
|
|
|
@ -1444,7 +1444,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
|
|||
|
||||
@cancellable
|
||||
async def get_device_list_changes_in_rooms(
|
||||
self, room_ids: Collection[str], from_id: int
|
||||
self, room_ids: Collection[str], from_id: int, to_id: int
|
||||
) -> Optional[Set[str]]:
|
||||
"""Return the set of users whose devices have changed in the given rooms
|
||||
since the given stream ID.
|
||||
|
@ -1462,7 +1462,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
|
|||
|
||||
sql = """
|
||||
SELECT DISTINCT user_id FROM device_lists_changes_in_room
|
||||
WHERE {clause} AND stream_id > ?
|
||||
WHERE {clause} AND stream_id > ? AND stream_id <= ?
|
||||
"""
|
||||
|
||||
def _get_device_list_changes_in_rooms_txn(
|
||||
|
@ -1479,6 +1479,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
|
|||
self.database_engine, "room_id", chunk
|
||||
)
|
||||
args.append(from_id)
|
||||
args.append(to_id)
|
||||
|
||||
changes |= await self.db_pool.runInteraction(
|
||||
"get_device_list_changes_in_rooms",
|
||||
|
|
Loading…
Reference in a new issue