Add index to current_state_delta_stream (#17912)

As we're now using it in the sync APIs to get state changes within a
room
This commit is contained in:
Erik Johnston 2024-11-11 10:45:46 +00:00 committed by GitHub
parent 2f41f6d947
commit c486ec8bc2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 44 additions and 2 deletions

1
changelog.d/17912.misc Normal file
View file

@ -0,0 +1 @@
Add an index to `current_state_delta_stream` table.

View file

@ -20,18 +20,26 @@
# #
import logging import logging
from typing import List, Optional, Tuple from typing import TYPE_CHECKING, List, Optional, Tuple
import attr import attr
from synapse.logging.opentracing import trace from synapse.logging.opentracing import trace
from synapse.storage._base import SQLBaseStore from synapse.storage._base import SQLBaseStore
from synapse.storage.database import LoggingTransaction, make_in_list_sql_clause from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
LoggingTransaction,
make_in_list_sql_clause,
)
from synapse.storage.databases.main.stream import _filter_results_by_stream from synapse.storage.databases.main.stream import _filter_results_by_stream
from synapse.types import RoomStreamToken, StrCollection from synapse.types import RoomStreamToken, StrCollection
from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.iterutils import batch_iter from synapse.util.iterutils import batch_iter
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -54,6 +62,21 @@ class StateDeltasStore(SQLBaseStore):
# attribute. TODO: can we get static analysis to enforce this? # attribute. TODO: can we get static analysis to enforce this?
_curr_state_delta_stream_cache: StreamChangeCache _curr_state_delta_stream_cache: StreamChangeCache
def __init__(
self,
database: DatabasePool,
db_conn: LoggingDatabaseConnection,
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)
self.db_pool.updates.register_background_index_update(
update_name="current_state_delta_stream_room_index",
index_name="current_state_delta_stream_room_idx",
table="current_state_delta_stream",
columns=("room_id", "stream_id"),
)
async def get_partial_current_state_deltas( async def get_partial_current_state_deltas(
self, prev_stream_id: int, max_stream_id: int self, prev_stream_id: int, max_stream_id: int
) -> Tuple[int, List[StateDelta]]: ) -> Tuple[int, List[StateDelta]]:

View file

@ -0,0 +1,18 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2024 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
-- Add an index on (user_id, device_id, algorithm, ts_added_ms) on e2e_one_time_keys_json, so that OTKs can
-- efficiently be issued in the same order they were uploaded.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(8804, 'current_state_delta_stream_room_index', '{}');