Combine AbstractStreamIdTracker and AbstractStreamIdGenerator. (#15192)

AbstractStreamIdTracker (now) has only a single sub-class: AbstractStreamIdGenerator,
combine them to simplify some code and remove any direct references to
AbstractStreamIdTracker.
This commit is contained in:
Patrick Cloke 2023-03-03 08:13:37 -05:00 committed by GitHub
parent 848f7e3d5f
commit 02f74f3a99
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 15 additions and 27 deletions

1
changelog.d/15192.misc Normal file
View file

@ -0,0 +1 @@
Combine `AbstractStreamIdTracker` and `AbstractStreamIdGenerator`.

View file

@ -52,7 +52,6 @@ from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
from synapse.storage.types import Cursor from synapse.storage.types import Cursor
from synapse.storage.util.id_generators import ( from synapse.storage.util.id_generators import (
AbstractStreamIdGenerator, AbstractStreamIdGenerator,
AbstractStreamIdTracker,
StreamIdGenerator, StreamIdGenerator,
) )
from synapse.types import JsonDict, StrCollection, get_verify_key_from_cross_signing_key from synapse.types import JsonDict, StrCollection, get_verify_key_from_cross_signing_key
@ -91,7 +90,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
# In the worker store this is an ID tracker which we overwrite in the non-worker # In the worker store this is an ID tracker which we overwrite in the non-worker
# class below that is used on the main process. # class below that is used on the main process.
self._device_list_id_gen: AbstractStreamIdTracker = StreamIdGenerator( self._device_list_id_gen = StreamIdGenerator(
db_conn, db_conn,
hs.get_replication_notifier(), hs.get_replication_notifier(),
"device_lists_stream", "device_lists_stream",
@ -712,9 +711,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
The new stream ID. The new stream ID.
""" """
# TODO: this looks like it's _writing_. Should this be on DeviceStore rather async with self._device_list_id_gen.get_next() as stream_id:
# than DeviceWorkerStore?
async with self._device_list_id_gen.get_next() as stream_id: # type: ignore[attr-defined]
await self.db_pool.runInteraction( await self.db_pool.runInteraction(
"add_user_sig_change_to_streams", "add_user_sig_change_to_streams",
self._add_user_signature_change_txn, self._add_user_signature_change_txn,

View file

@ -72,7 +72,6 @@ from synapse.storage.engines import PostgresEngine
from synapse.storage.types import Cursor from synapse.storage.types import Cursor
from synapse.storage.util.id_generators import ( from synapse.storage.util.id_generators import (
AbstractStreamIdGenerator, AbstractStreamIdGenerator,
AbstractStreamIdTracker,
MultiWriterIdGenerator, MultiWriterIdGenerator,
StreamIdGenerator, StreamIdGenerator,
) )
@ -187,8 +186,8 @@ class EventsWorkerStore(SQLBaseStore):
): ):
super().__init__(database, db_conn, hs) super().__init__(database, db_conn, hs)
self._stream_id_gen: AbstractStreamIdTracker self._stream_id_gen: AbstractStreamIdGenerator
self._backfill_id_gen: AbstractStreamIdTracker self._backfill_id_gen: AbstractStreamIdGenerator
if isinstance(database.engine, PostgresEngine): if isinstance(database.engine, PostgresEngine):
# If we're using Postgres than we can use `MultiWriterIdGenerator` # If we're using Postgres than we can use `MultiWriterIdGenerator`
# regardless of whether this process writes to the streams or not. # regardless of whether this process writes to the streams or not.

View file

@ -46,7 +46,6 @@ from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.storage.push_rule import InconsistentRuleException, RuleNotFoundException from synapse.storage.push_rule import InconsistentRuleException, RuleNotFoundException
from synapse.storage.util.id_generators import ( from synapse.storage.util.id_generators import (
AbstractStreamIdGenerator, AbstractStreamIdGenerator,
AbstractStreamIdTracker,
IdGenerator, IdGenerator,
StreamIdGenerator, StreamIdGenerator,
) )
@ -118,7 +117,7 @@ class PushRulesWorkerStore(
# In the worker store this is an ID tracker which we overwrite in the non-worker # In the worker store this is an ID tracker which we overwrite in the non-worker
# class below that is used on the main process. # class below that is used on the main process.
self._push_rules_stream_id_gen: AbstractStreamIdTracker = StreamIdGenerator( self._push_rules_stream_id_gen = StreamIdGenerator(
db_conn, db_conn,
hs.get_replication_notifier(), hs.get_replication_notifier(),
"push_rules_stream", "push_rules_stream",

View file

@ -36,7 +36,6 @@ from synapse.storage.database import (
) )
from synapse.storage.util.id_generators import ( from synapse.storage.util.id_generators import (
AbstractStreamIdGenerator, AbstractStreamIdGenerator,
AbstractStreamIdTracker,
StreamIdGenerator, StreamIdGenerator,
) )
from synapse.types import JsonDict from synapse.types import JsonDict
@ -60,7 +59,7 @@ class PusherWorkerStore(SQLBaseStore):
# In the worker store this is an ID tracker which we overwrite in the non-worker # In the worker store this is an ID tracker which we overwrite in the non-worker
# class below that is used on the main process. # class below that is used on the main process.
self._pushers_id_gen: AbstractStreamIdTracker = StreamIdGenerator( self._pushers_id_gen = StreamIdGenerator(
db_conn, db_conn,
hs.get_replication_notifier(), hs.get_replication_notifier(),
"pushers", "pushers",

View file

@ -39,7 +39,7 @@ from synapse.storage.database import (
from synapse.storage.engines import PostgresEngine from synapse.storage.engines import PostgresEngine
from synapse.storage.engines._base import IsolationLevel from synapse.storage.engines._base import IsolationLevel
from synapse.storage.util.id_generators import ( from synapse.storage.util.id_generators import (
AbstractStreamIdTracker, AbstractStreamIdGenerator,
MultiWriterIdGenerator, MultiWriterIdGenerator,
StreamIdGenerator, StreamIdGenerator,
) )
@ -65,7 +65,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
# In the worker store this is an ID tracker which we overwrite in the non-worker # In the worker store this is an ID tracker which we overwrite in the non-worker
# class below that is used on the main process. # class below that is used on the main process.
self._receipts_id_gen: AbstractStreamIdTracker self._receipts_id_gen: AbstractStreamIdGenerator
if isinstance(database.engine, PostgresEngine): if isinstance(database.engine, PostgresEngine):
self._can_write_to_receipts = ( self._can_write_to_receipts = (
@ -768,7 +768,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
"insert_receipt_conv", self._graph_to_linear, room_id, event_ids "insert_receipt_conv", self._graph_to_linear, room_id, event_ids
) )
async with self._receipts_id_gen.get_next() as stream_id: # type: ignore[attr-defined] async with self._receipts_id_gen.get_next() as stream_id:
event_ts = await self.db_pool.runInteraction( event_ts = await self.db_pool.runInteraction(
"insert_linearized_receipt", "insert_linearized_receipt",
self._insert_linearized_receipt_txn, self._insert_linearized_receipt_txn,

View file

@ -93,8 +93,11 @@ def _load_current_id(
return res return res
class AbstractStreamIdTracker(metaclass=abc.ABCMeta): class AbstractStreamIdGenerator(metaclass=abc.ABCMeta):
"""Tracks the "current" stream ID of a stream that may have multiple writers. """Generates or tracks stream IDs for a stream that may have multiple writers.
Each stream ID represents a write transaction, whose completion is tracked
so that the "current" stream ID of the stream can be determined.
Stream IDs are monotonically increasing or decreasing integers representing write Stream IDs are monotonically increasing or decreasing integers representing write
transactions. The "current" stream ID is the stream ID such that all transactions transactions. The "current" stream ID is the stream ID such that all transactions
@ -130,16 +133,6 @@ class AbstractStreamIdTracker(metaclass=abc.ABCMeta):
""" """
raise NotImplementedError() raise NotImplementedError()
class AbstractStreamIdGenerator(AbstractStreamIdTracker):
"""Generates stream IDs for a stream that may have multiple writers.
Each stream ID represents a write transaction, whose completion is tracked
so that the "current" stream ID of the stream can be determined.
See `AbstractStreamIdTracker` for more details.
"""
@abc.abstractmethod @abc.abstractmethod
def get_next(self) -> AsyncContextManager[int]: def get_next(self) -> AsyncContextManager[int]:
""" """