mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-27 03:58:06 +03:00
Merge pull request #5559 from matrix-org/erikj/refactor_changed_devices
Refactor devices changed query to pull less from DB
This commit is contained in:
commit
e79ec03165
4 changed files with 104 additions and 43 deletions
1
changelog.d/5559.feature
Normal file
1
changelog.d/5559.feature
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Optimise devices changed query to not pull unnecessary rows from the database, reducing database load.
|
|
@ -101,9 +101,13 @@ class DeviceWorkerHandler(BaseHandler):
|
||||||
|
|
||||||
room_ids = yield self.store.get_rooms_for_user(user_id)
|
room_ids = yield self.store.get_rooms_for_user(user_id)
|
||||||
|
|
||||||
# First we check if any devices have changed
|
# First we check if any devices have changed for users that we share
|
||||||
changed = yield self.store.get_user_whose_devices_changed(
|
# rooms with.
|
||||||
from_token.device_list_key
|
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
|
||||||
|
user_id
|
||||||
|
)
|
||||||
|
changed = yield self.store.get_users_whose_devices_changed(
|
||||||
|
from_token.device_list_key, users_who_share_room
|
||||||
)
|
)
|
||||||
|
|
||||||
# Then work out if any users have since joined
|
# Then work out if any users have since joined
|
||||||
|
@ -188,10 +192,6 @@ class DeviceWorkerHandler(BaseHandler):
|
||||||
break
|
break
|
||||||
|
|
||||||
if possibly_changed or possibly_left:
|
if possibly_changed or possibly_left:
|
||||||
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
|
|
||||||
user_id
|
|
||||||
)
|
|
||||||
|
|
||||||
# Take the intersection of the users whose devices may have changed
|
# Take the intersection of the users whose devices may have changed
|
||||||
# and those that actually still share a room with the user
|
# and those that actually still share a room with the user
|
||||||
possibly_joined = possibly_changed & users_who_share_room
|
possibly_joined = possibly_changed & users_who_share_room
|
||||||
|
|
|
@ -1058,40 +1058,74 @@ class SyncHandler(object):
|
||||||
newly_left_rooms,
|
newly_left_rooms,
|
||||||
newly_left_users,
|
newly_left_users,
|
||||||
):
|
):
|
||||||
|
"""Generate the DeviceLists section of sync
|
||||||
|
|
||||||
|
Args:
|
||||||
|
sync_result_builder (SyncResultBuilder)
|
||||||
|
newly_joined_rooms (set[str]): Set of rooms user has joined since
|
||||||
|
previous sync
|
||||||
|
newly_joined_or_invited_users (set[str]): Set of users that have
|
||||||
|
joined or been invited to a room since previous sync.
|
||||||
|
newly_left_rooms (set[str]): Set of rooms user has left since
|
||||||
|
previous sync
|
||||||
|
newly_left_users (set[str]): Set of users that have left a room
|
||||||
|
we're in since previous sync
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred[DeviceLists]
|
||||||
|
"""
|
||||||
|
|
||||||
user_id = sync_result_builder.sync_config.user.to_string()
|
user_id = sync_result_builder.sync_config.user.to_string()
|
||||||
since_token = sync_result_builder.since_token
|
since_token = sync_result_builder.since_token
|
||||||
|
|
||||||
|
# We're going to mutate these fields, so lets copy them rather than
|
||||||
|
# assume they won't get used later.
|
||||||
|
newly_joined_or_invited_users = set(newly_joined_or_invited_users)
|
||||||
|
newly_left_users = set(newly_left_users)
|
||||||
|
|
||||||
if since_token and since_token.device_list_key:
|
if since_token and since_token.device_list_key:
|
||||||
changed = yield self.store.get_user_whose_devices_changed(
|
# We want to figure out what user IDs the client should refetch
|
||||||
since_token.device_list_key
|
# device keys for, and which users we aren't going to track changes
|
||||||
)
|
# for anymore.
|
||||||
|
#
|
||||||
# TODO: Be more clever than this, i.e. remove users who we already
|
# For the first step we check:
|
||||||
# share a room with?
|
# a. if any users we share a room with have updated their devices,
|
||||||
for room_id in newly_joined_rooms:
|
# and
|
||||||
joined_users = yield self.state.get_current_users_in_room(room_id)
|
# b. we also check if we've joined any new rooms, or if a user has
|
||||||
newly_joined_or_invited_users.update(joined_users)
|
# joined a room we're in.
|
||||||
|
#
|
||||||
for room_id in newly_left_rooms:
|
# For the second step we just find any users we no longer share a
|
||||||
left_users = yield self.state.get_current_users_in_room(room_id)
|
# room with by looking at all users that have left a room plus users
|
||||||
newly_left_users.update(left_users)
|
# that were in a room we've left.
|
||||||
|
|
||||||
# TODO: Check that these users are actually new, i.e. either they
|
|
||||||
# weren't in the previous sync *or* they left and rejoined.
|
|
||||||
changed.update(newly_joined_or_invited_users)
|
|
||||||
|
|
||||||
if not changed and not newly_left_users:
|
|
||||||
defer.returnValue(DeviceLists(changed=[], left=newly_left_users))
|
|
||||||
|
|
||||||
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
|
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
|
||||||
user_id
|
user_id
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Step 1a, check for changes in devices of users we share a room with
|
||||||
|
users_that_have_changed = yield self.store.get_users_whose_devices_changed(
|
||||||
|
since_token.device_list_key, users_who_share_room
|
||||||
|
)
|
||||||
|
|
||||||
|
# Step 1b, check for newly joined rooms
|
||||||
|
for room_id in newly_joined_rooms:
|
||||||
|
joined_users = yield self.state.get_current_users_in_room(room_id)
|
||||||
|
newly_joined_or_invited_users.update(joined_users)
|
||||||
|
|
||||||
|
# TODO: Check that these users are actually new, i.e. either they
|
||||||
|
# weren't in the previous sync *or* they left and rejoined.
|
||||||
|
users_that_have_changed.update(newly_joined_or_invited_users)
|
||||||
|
|
||||||
|
# Now find users that we no longer track
|
||||||
|
for room_id in newly_left_rooms:
|
||||||
|
left_users = yield self.state.get_current_users_in_room(room_id)
|
||||||
|
newly_left_users.update(left_users)
|
||||||
|
|
||||||
|
# Remove any users that we still share a room with.
|
||||||
|
newly_left_users -= users_who_share_room
|
||||||
|
|
||||||
defer.returnValue(
|
defer.returnValue(
|
||||||
DeviceLists(
|
DeviceLists(changed=users_that_have_changed, left=newly_left_users)
|
||||||
changed=users_who_share_room & changed,
|
|
||||||
left=set(newly_left_users) - users_who_share_room,
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
defer.returnValue(DeviceLists(changed=[], left=[]))
|
defer.returnValue(DeviceLists(changed=[], left=[]))
|
||||||
|
|
|
@ -24,6 +24,7 @@ from synapse.api.errors import StoreError
|
||||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
from synapse.storage._base import Cache, SQLBaseStore, db_to_json
|
from synapse.storage._base import Cache, SQLBaseStore, db_to_json
|
||||||
from synapse.storage.background_updates import BackgroundUpdateStore
|
from synapse.storage.background_updates import BackgroundUpdateStore
|
||||||
|
from synapse.util import batch_iter
|
||||||
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
|
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
@ -391,22 +392,47 @@ class DeviceWorkerStore(SQLBaseStore):
|
||||||
|
|
||||||
return now_stream_id, []
|
return now_stream_id, []
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
def get_users_whose_devices_changed(self, from_key, user_ids):
|
||||||
def get_user_whose_devices_changed(self, from_key):
|
"""Get set of users whose devices have changed since `from_key` that
|
||||||
"""Get set of users whose devices have changed since `from_key`.
|
are in the given list of user_ids.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
from_key (str): The device lists stream token
|
||||||
|
user_ids (Iterable[str])
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred[set[str]]: The set of user_ids whose devices have changed
|
||||||
|
since `from_key`
|
||||||
"""
|
"""
|
||||||
from_key = int(from_key)
|
from_key = int(from_key)
|
||||||
changed = self._device_list_stream_cache.get_all_entities_changed(from_key)
|
|
||||||
if changed is not None:
|
|
||||||
defer.returnValue(set(changed))
|
|
||||||
|
|
||||||
sql = """
|
# Get set of users who *may* have changed. Users not in the returned
|
||||||
SELECT DISTINCT user_id FROM device_lists_stream WHERE stream_id > ?
|
# list have definitely not changed.
|
||||||
"""
|
to_check = list(
|
||||||
rows = yield self._execute(
|
self._device_list_stream_cache.get_entities_changed(user_ids, from_key)
|
||||||
"get_user_whose_devices_changed", None, sql, from_key
|
)
|
||||||
|
|
||||||
|
if not to_check:
|
||||||
|
return defer.succeed(set())
|
||||||
|
|
||||||
|
def _get_users_whose_devices_changed_txn(txn):
|
||||||
|
changes = set()
|
||||||
|
|
||||||
|
sql = """
|
||||||
|
SELECT DISTINCT user_id FROM device_lists_stream
|
||||||
|
WHERE stream_id > ?
|
||||||
|
AND user_id IN (%s)
|
||||||
|
"""
|
||||||
|
|
||||||
|
for chunk in batch_iter(to_check, 100):
|
||||||
|
txn.execute(sql % (",".join("?" for _ in chunk),), (from_key,) + chunk)
|
||||||
|
changes.update(user_id for user_id, in txn)
|
||||||
|
|
||||||
|
return changes
|
||||||
|
|
||||||
|
return self.runInteraction(
|
||||||
|
"get_users_whose_devices_changed", _get_users_whose_devices_changed_txn
|
||||||
)
|
)
|
||||||
defer.returnValue(set(row[0] for row in rows))
|
|
||||||
|
|
||||||
def get_all_device_list_changes_for_remotes(self, from_key, to_key):
|
def get_all_device_list_changes_for_remotes(self, from_key, to_key):
|
||||||
"""Return a list of `(stream_id, user_id, destination)` which is the
|
"""Return a list of `(stream_id, user_id, destination)` which is the
|
||||||
|
|
Loading…
Reference in a new issue