From c486ec8bc24460d86e23e174b13839875c382ed4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 11 Nov 2024 10:45:46 +0000 Subject: [PATCH] Add index to `current_state_delta_stream` (#17912) As we're now using it in the sync APIs to get state changes within a room --- changelog.d/17912.misc | 1 + .../storage/databases/main/state_deltas.py | 27 +++++++++++++++++-- .../delta/88/04_current_state_delta_index.sql | 18 +++++++++++++ 3 files changed, 44 insertions(+), 2 deletions(-) create mode 100644 changelog.d/17912.misc create mode 100644 synapse/storage/schema/main/delta/88/04_current_state_delta_index.sql diff --git a/changelog.d/17912.misc b/changelog.d/17912.misc new file mode 100644 index 0000000000..f6f661476a --- /dev/null +++ b/changelog.d/17912.misc @@ -0,0 +1 @@ +Add an index to `current_state_delta_stream` table. diff --git a/synapse/storage/databases/main/state_deltas.py b/synapse/storage/databases/main/state_deltas.py index ba52fff652..117ee89d0a 100644 --- a/synapse/storage/databases/main/state_deltas.py +++ b/synapse/storage/databases/main/state_deltas.py @@ -20,18 +20,26 @@ # import logging -from typing import List, Optional, Tuple +from typing import TYPE_CHECKING, List, Optional, Tuple import attr from synapse.logging.opentracing import trace from synapse.storage._base import SQLBaseStore -from synapse.storage.database import LoggingTransaction, make_in_list_sql_clause +from synapse.storage.database import ( + DatabasePool, + LoggingDatabaseConnection, + LoggingTransaction, + make_in_list_sql_clause, +) from synapse.storage.databases.main.stream import _filter_results_by_stream from synapse.types import RoomStreamToken, StrCollection from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.iterutils import batch_iter +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) @@ -54,6 +62,21 @@ class StateDeltasStore(SQLBaseStore): # attribute. TODO: can we get static analysis to enforce this? _curr_state_delta_stream_cache: StreamChangeCache + def __init__( + self, + database: DatabasePool, + db_conn: LoggingDatabaseConnection, + hs: "HomeServer", + ): + super().__init__(database, db_conn, hs) + + self.db_pool.updates.register_background_index_update( + update_name="current_state_delta_stream_room_index", + index_name="current_state_delta_stream_room_idx", + table="current_state_delta_stream", + columns=("room_id", "stream_id"), + ) + async def get_partial_current_state_deltas( self, prev_stream_id: int, max_stream_id: int ) -> Tuple[int, List[StateDelta]]: diff --git a/synapse/storage/schema/main/delta/88/04_current_state_delta_index.sql b/synapse/storage/schema/main/delta/88/04_current_state_delta_index.sql new file mode 100644 index 0000000000..ad54302a8f --- /dev/null +++ b/synapse/storage/schema/main/delta/88/04_current_state_delta_index.sql @@ -0,0 +1,18 @@ +-- +-- This file is licensed under the Affero General Public License (AGPL) version 3. +-- +-- Copyright (C) 2024 New Vector, Ltd +-- +-- This program is free software: you can redistribute it and/or modify +-- it under the terms of the GNU Affero General Public License as +-- published by the Free Software Foundation, either version 3 of the +-- License, or (at your option) any later version. +-- +-- See the GNU Affero General Public License for more details: +-- . + + +-- Add an index on (user_id, device_id, algorithm, ts_added_ms) on e2e_one_time_keys_json, so that OTKs can +-- efficiently be issued in the same order they were uploaded. +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (8804, 'current_state_delta_stream_room_index', '{}');