diff --git a/changelog.d/16312.misc b/changelog.d/16312.misc new file mode 100644 index 0000000000..4f266c1fb0 --- /dev/null +++ b/changelog.d/16312.misc @@ -0,0 +1 @@ +Delete device messages asynchronously and in staged batches using the task scheduler. diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 0ccd7d250c..f1f19666d7 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -362,21 +362,36 @@ class SyncHandler: # (since we now know that the device has received them) if since_token is not None: since_stream_id = since_token.to_device_key - # Delete device messages asynchronously and in batches using the task scheduler - await self._task_scheduler.schedule_task( - DELETE_DEVICE_MSGS_TASK_NAME, - resource_id=sync_config.device_id, - params={ - "user_id": sync_config.user.to_string(), - "device_id": sync_config.device_id, - "up_to_stream_id": since_stream_id, - }, + # Fast path: delete a limited number of to-device messages up front. + # We do this to avoid the overhead of scheduling a task for every + # sync. + device_deletion_limit = 100 + deleted = await self.store.delete_messages_for_device( + sync_config.user.to_string(), + sync_config.device_id, + since_stream_id, + limit=device_deletion_limit, ) logger.debug( - "Deletion of to-device messages up to %d scheduled", - since_stream_id, + "Deleted %d to-device messages up to %d", deleted, since_stream_id ) + # If we hit the limit, schedule a background task to delete the rest. + if deleted >= device_deletion_limit: + await self._task_scheduler.schedule_task( + DELETE_DEVICE_MSGS_TASK_NAME, + resource_id=sync_config.device_id, + params={ + "user_id": sync_config.user.to_string(), + "device_id": sync_config.device_id, + "up_to_stream_id": since_stream_id, + }, + ) + logger.debug( + "Deletion of to-device messages up to %d scheduled", + since_stream_id, + ) + if timeout == 0 or since_token is None or full_state: # we are going to return immediately, so don't bother calling # notifier.wait_for_events.