mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-21 17:15:38 +03:00
Create one-off scheduled task to delete old OTKs (#17934)
To work around the fact that, pre-https://github.com/element-hq/synapse/pull/17903, our database may have old one-time-keys that the clients have long thrown away the private keys for, we want to delete OTKs that look like they came from libolm. To spread the load a bit, without holding up other background database updates, we use a scheduled task to do the work.
This commit is contained in:
parent
e918f683d4
commit
c5e89f5fae
6 changed files with 203 additions and 0 deletions
1
changelog.d/17934.feature
Normal file
1
changelog.d/17934.feature
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Add a one-off task to delete old one-time-keys, to guard against us having old OTKs in the database that the client has long forgotten about.
|
|
@ -39,6 +39,8 @@ from synapse.replication.http.devices import ReplicationUploadKeysForUserRestSer
|
||||||
from synapse.types import (
|
from synapse.types import (
|
||||||
JsonDict,
|
JsonDict,
|
||||||
JsonMapping,
|
JsonMapping,
|
||||||
|
ScheduledTask,
|
||||||
|
TaskStatus,
|
||||||
UserID,
|
UserID,
|
||||||
get_domain_from_id,
|
get_domain_from_id,
|
||||||
get_verify_key_from_cross_signing_key,
|
get_verify_key_from_cross_signing_key,
|
||||||
|
@ -70,6 +72,7 @@ class E2eKeysHandler:
|
||||||
self.is_mine = hs.is_mine
|
self.is_mine = hs.is_mine
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
self._worker_lock_handler = hs.get_worker_locks_handler()
|
self._worker_lock_handler = hs.get_worker_locks_handler()
|
||||||
|
self._task_scheduler = hs.get_task_scheduler()
|
||||||
|
|
||||||
federation_registry = hs.get_federation_registry()
|
federation_registry = hs.get_federation_registry()
|
||||||
|
|
||||||
|
@ -116,6 +119,10 @@ class E2eKeysHandler:
|
||||||
hs.config.experimental.msc3984_appservice_key_query
|
hs.config.experimental.msc3984_appservice_key_query
|
||||||
)
|
)
|
||||||
|
|
||||||
|
self._task_scheduler.register_action(
|
||||||
|
self._delete_old_one_time_keys_task, "delete_old_otks"
|
||||||
|
)
|
||||||
|
|
||||||
@trace
|
@trace
|
||||||
@cancellable
|
@cancellable
|
||||||
async def query_devices(
|
async def query_devices(
|
||||||
|
@ -1574,6 +1581,45 @@ class E2eKeysHandler:
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
async def _delete_old_one_time_keys_task(
|
||||||
|
self, task: ScheduledTask
|
||||||
|
) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
|
||||||
|
"""Scheduler task to delete old one time keys.
|
||||||
|
|
||||||
|
Until Synapse 1.119, Synapse used to issue one-time-keys in a random order, leading to the possibility
|
||||||
|
that it could still have old OTKs that the client has dropped. This task is scheduled exactly once
|
||||||
|
by a database schema delta file, and it clears out old one-time-keys that look like they came from libolm.
|
||||||
|
"""
|
||||||
|
last_user = task.result.get("from_user", "") if task.result else ""
|
||||||
|
while True:
|
||||||
|
# We process users in batches of 100
|
||||||
|
users, rowcount = await self.store.delete_old_otks_for_next_user_batch(
|
||||||
|
last_user, 100
|
||||||
|
)
|
||||||
|
if len(users) == 0:
|
||||||
|
# We're done!
|
||||||
|
return TaskStatus.COMPLETE, None, None
|
||||||
|
|
||||||
|
logger.debug(
|
||||||
|
"Deleted %i old one-time-keys for users '%s'..'%s'",
|
||||||
|
rowcount,
|
||||||
|
users[0],
|
||||||
|
users[-1],
|
||||||
|
)
|
||||||
|
last_user = users[-1]
|
||||||
|
|
||||||
|
# Store our progress
|
||||||
|
await self._task_scheduler.update_task(
|
||||||
|
task.id, result={"from_user": last_user}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Sleep a little before doing the next user.
|
||||||
|
#
|
||||||
|
# matrix.org has about 15M users in the e2e_one_time_keys_json table
|
||||||
|
# (comprising 20M devices). We want this to take about a week, so we need
|
||||||
|
# to do about one batch of 100 users every 4 seconds.
|
||||||
|
await self.clock.sleep(4)
|
||||||
|
|
||||||
|
|
||||||
def _check_cross_signing_key(
|
def _check_cross_signing_key(
|
||||||
key: JsonDict, user_id: str, key_type: str, signing_key: Optional[VerifyKey] = None
|
key: JsonDict, user_id: str, key_type: str, signing_key: Optional[VerifyKey] = None
|
||||||
|
|
|
@ -1453,6 +1453,54 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
|
||||||
impl,
|
impl,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
async def delete_old_otks_for_next_user_batch(
|
||||||
|
self, after_user_id: str, number_of_users: int
|
||||||
|
) -> Tuple[List[str], int]:
|
||||||
|
"""Deletes old OTKs belonging to the next batch of users
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
`(users, rows)`, where:
|
||||||
|
* `users` is the user IDs of the updated users. An empty list if we are done.
|
||||||
|
* `rows` is the number of deleted rows
|
||||||
|
"""
|
||||||
|
|
||||||
|
def impl(txn: LoggingTransaction) -> Tuple[List[str], int]:
|
||||||
|
# Find a batch of users
|
||||||
|
txn.execute(
|
||||||
|
"""
|
||||||
|
SELECT DISTINCT(user_id) FROM e2e_one_time_keys_json
|
||||||
|
WHERE user_id > ?
|
||||||
|
ORDER BY user_id
|
||||||
|
LIMIT ?
|
||||||
|
""",
|
||||||
|
(after_user_id, number_of_users),
|
||||||
|
)
|
||||||
|
users = [row[0] for row in txn.fetchall()]
|
||||||
|
if len(users) == 0:
|
||||||
|
return users, 0
|
||||||
|
|
||||||
|
# Delete any old OTKs belonging to those users.
|
||||||
|
#
|
||||||
|
# We only actually consider OTKs whose key ID is 6 characters long. These
|
||||||
|
# keys were likely made by libolm rather than Vodozemac; libolm only kept
|
||||||
|
# 100 private OTKs, so was far more vulnerable than Vodozemac to throwing
|
||||||
|
# away keys prematurely.
|
||||||
|
clause, args = make_in_list_sql_clause(
|
||||||
|
txn.database_engine, "user_id", users
|
||||||
|
)
|
||||||
|
sql = f"""
|
||||||
|
DELETE FROM e2e_one_time_keys_json
|
||||||
|
WHERE {clause} AND ts_added_ms < ? AND length(key_id) = 6
|
||||||
|
"""
|
||||||
|
args.append(self._clock.time_msec() - (7 * 24 * 3600 * 1000))
|
||||||
|
txn.execute(sql, args)
|
||||||
|
|
||||||
|
return users, txn.rowcount
|
||||||
|
|
||||||
|
return await self.db_pool.runInteraction(
|
||||||
|
"delete_old_otks_for_next_user_batch", impl
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
|
class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
|
||||||
def __init__(
|
def __init__(
|
||||||
|
|
|
@ -0,0 +1,19 @@
|
||||||
|
--
|
||||||
|
-- This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||||
|
--
|
||||||
|
-- Copyright (C) 2024 New Vector, Ltd
|
||||||
|
--
|
||||||
|
-- This program is free software: you can redistribute it and/or modify
|
||||||
|
-- it under the terms of the GNU Affero General Public License as
|
||||||
|
-- published by the Free Software Foundation, either version 3 of the
|
||||||
|
-- License, or (at your option) any later version.
|
||||||
|
--
|
||||||
|
-- See the GNU Affero General Public License for more details:
|
||||||
|
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||||
|
|
||||||
|
-- Until Synapse 1.119, Synapse used to issue one-time-keys in a random order, leading to the possibility
|
||||||
|
-- that it could still have old OTKs that the client has dropped.
|
||||||
|
--
|
||||||
|
-- We create a scheduled task which will drop old OTKs, to flush them out.
|
||||||
|
INSERT INTO scheduled_tasks(id, action, status, timestamp)
|
||||||
|
VALUES ('delete_old_otks_task', 'delete_old_otks', 'scheduled', extract(epoch from current_timestamp) * 1000);
|
|
@ -0,0 +1,19 @@
|
||||||
|
--
|
||||||
|
-- This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||||
|
--
|
||||||
|
-- Copyright (C) 2024 New Vector, Ltd
|
||||||
|
--
|
||||||
|
-- This program is free software: you can redistribute it and/or modify
|
||||||
|
-- it under the terms of the GNU Affero General Public License as
|
||||||
|
-- published by the Free Software Foundation, either version 3 of the
|
||||||
|
-- License, or (at your option) any later version.
|
||||||
|
--
|
||||||
|
-- See the GNU Affero General Public License for more details:
|
||||||
|
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||||
|
|
||||||
|
-- Until Synapse 1.119, Synapse used to issue one-time-keys in a random order, leading to the possibility
|
||||||
|
-- that it could still have old OTKs that the client has dropped.
|
||||||
|
--
|
||||||
|
-- We create a scheduled task which will drop old OTKs, to flush them out.
|
||||||
|
INSERT INTO scheduled_tasks(id, action, status, timestamp)
|
||||||
|
VALUES ('delete_old_otks_task', 'delete_old_otks', 'scheduled', strftime('%s', 'now') * 1000);
|
|
@ -19,6 +19,7 @@
|
||||||
# [This file includes modifications made by New Vector Limited]
|
# [This file includes modifications made by New Vector Limited]
|
||||||
#
|
#
|
||||||
#
|
#
|
||||||
|
import time
|
||||||
from typing import Dict, Iterable
|
from typing import Dict, Iterable
|
||||||
from unittest import mock
|
from unittest import mock
|
||||||
|
|
||||||
|
@ -1826,3 +1827,72 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
|
||||||
)
|
)
|
||||||
self.assertIs(exists, True)
|
self.assertIs(exists, True)
|
||||||
self.assertIs(replaceable_without_uia, False)
|
self.assertIs(replaceable_without_uia, False)
|
||||||
|
|
||||||
|
def test_delete_old_one_time_keys(self) -> None:
|
||||||
|
"""Test the db migration that clears out old OTKs"""
|
||||||
|
|
||||||
|
# We upload two sets of keys, one just over a week ago, and one just less than
|
||||||
|
# a week ago. Each batch contains some keys that match the deletion pattern
|
||||||
|
# (key IDs of 6 chars), and some that do not.
|
||||||
|
#
|
||||||
|
# Finally, set the scheduled task going, and check what gets deleted.
|
||||||
|
|
||||||
|
user_id = "@user000:" + self.hs.hostname
|
||||||
|
device_id = "xyz"
|
||||||
|
|
||||||
|
# The scheduled task should be for "now" in real, wallclock time, so
|
||||||
|
# set the test reactor to just over a week ago.
|
||||||
|
self.reactor.advance(time.time() - 7.5 * 24 * 3600)
|
||||||
|
|
||||||
|
# Upload some keys
|
||||||
|
self.get_success(
|
||||||
|
self.handler.upload_keys_for_user(
|
||||||
|
user_id,
|
||||||
|
device_id,
|
||||||
|
{
|
||||||
|
"one_time_keys": {
|
||||||
|
# some keys to delete
|
||||||
|
"alg1:AAAAAA": "key1",
|
||||||
|
"alg2:AAAAAB": {"key": "key2", "signatures": {"k1": "sig1"}},
|
||||||
|
# A key to *not* delete
|
||||||
|
"alg2:AAAAAAAAAA": {"key": "key3"},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# A day passes
|
||||||
|
self.reactor.advance(24 * 3600)
|
||||||
|
|
||||||
|
# Upload some more keys
|
||||||
|
self.get_success(
|
||||||
|
self.handler.upload_keys_for_user(
|
||||||
|
user_id,
|
||||||
|
device_id,
|
||||||
|
{
|
||||||
|
"one_time_keys": {
|
||||||
|
# some keys which match the pattern
|
||||||
|
"alg1:BAAAAA": "key1",
|
||||||
|
"alg2:BAAAAB": {"key": "key2", "signatures": {"k1": "sig1"}},
|
||||||
|
# A key to *not* delete
|
||||||
|
"alg2:BAAAAAAAAA": {"key": "key3"},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# The rest of the week passes, which should set the scheduled task going.
|
||||||
|
self.reactor.advance(6.5 * 24 * 3600)
|
||||||
|
|
||||||
|
# Check what we're left with in the database
|
||||||
|
remaining_key_ids = {
|
||||||
|
row[0]
|
||||||
|
for row in self.get_success(
|
||||||
|
self.handler.store.db_pool.simple_select_list(
|
||||||
|
"e2e_one_time_keys_json", None, ["key_id"]
|
||||||
|
)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
self.assertEqual(
|
||||||
|
remaining_key_ids, {"AAAAAAAAAA", "BAAAAA", "BAAAAB", "BAAAAAAAAA"}
|
||||||
|
)
|
||||||
|
|
Loading…
Reference in a new issue