From 375b0a8a119bb925ca280f050a25a931662fcbb5 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 16 May 2023 15:56:38 -0400 Subject: [PATCH] Update code to refer to "workers". (#15606) A bunch of comments and variables are out of date and use obsolete terms. --- changelog.d/15606.misc | 1 + docs/replication.md | 6 ------ synapse/app/admin_cmd.py | 4 ++-- synapse/app/generic_worker.py | 4 ++-- synapse/module_api/__init__.py | 6 ++---- synapse/replication/tcp/client.py | 4 ++-- synapse/storage/databases/main/account_data.py | 7 ++----- synapse/storage/databases/main/cache.py | 14 +++++++------- synapse/storage/databases/main/devices.py | 2 -- .../storage/databases/main/events_worker.py | 7 ++----- synapse/storage/databases/main/receipts.py | 7 ++----- .../schema/main/delta/34/cache_stream.py | 2 +- tests/app/test_openid_listener.py | 2 +- tests/replication/slave/storage/__init__.py | 13 ------------- .../replication/{slave => storage}/__init__.py | 0 tests/replication/{slave => }/storage/_base.py | 18 +++++++++--------- .../{slave => }/storage/test_events.py | 10 +++++----- 17 files changed, 38 insertions(+), 69 deletions(-) create mode 100644 changelog.d/15606.misc delete mode 100644 tests/replication/slave/storage/__init__.py rename tests/replication/{slave => storage}/__init__.py (100%) rename tests/replication/{slave => }/storage/_base.py (82%) rename tests/replication/{slave => }/storage/test_events.py (98%) diff --git a/changelog.d/15606.misc b/changelog.d/15606.misc new file mode 100644 index 0000000000..44265fbf02 --- /dev/null +++ b/changelog.d/15606.misc @@ -0,0 +1 @@ +Update internal terminology for workers. diff --git a/docs/replication.md b/docs/replication.md index 108da9a065..25145daaf5 100644 --- a/docs/replication.md +++ b/docs/replication.md @@ -30,12 +30,6 @@ minimal. See [the TCP replication documentation](tcp_replication.md). -### The Slaved DataStore - -There are read-only version of the synapse storage layer in -`synapse/replication/slave/storage` that use the response of the -replication API to invalidate their caches. - ### The TCP Replication Module Information about how the tcp replication module is structured, including how the classes interact, can be found in diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py index b05fe2c589..f9aada269a 100644 --- a/synapse/app/admin_cmd.py +++ b/synapse/app/admin_cmd.py @@ -64,7 +64,7 @@ from synapse.util.logcontext import LoggingContext logger = logging.getLogger("synapse.app.admin_cmd") -class AdminCmdSlavedStore( +class AdminCmdStore( FilteringWorkerStore, ClientIpWorkerStore, DeviceWorkerStore, @@ -103,7 +103,7 @@ class AdminCmdSlavedStore( class AdminCmdServer(HomeServer): - DATASTORE_CLASS = AdminCmdSlavedStore # type: ignore + DATASTORE_CLASS = AdminCmdStore # type: ignore async def export_data_command(hs: HomeServer, args: argparse.Namespace) -> None: diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index e17ce35b8e..909ebccf78 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -102,7 +102,7 @@ from synapse.util.httpresourcetree import create_resource_tree logger = logging.getLogger("synapse.app.generic_worker") -class GenericWorkerSlavedStore( +class GenericWorkerStore( # FIXME(#3714): We need to add UserDirectoryStore as we write directly # rather than going via the correct worker. UserDirectoryStore, @@ -154,7 +154,7 @@ class GenericWorkerSlavedStore( class GenericWorkerServer(HomeServer): - DATASTORE_CLASS = GenericWorkerSlavedStore # type: ignore + DATASTORE_CLASS = GenericWorkerStore # type: ignore def _listen_http(self, listener_config: ListenerConfig) -> None: assert listener_config.http_options is not None diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 2c9d181acf..0e9f366cba 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -134,7 +134,7 @@ from synapse.util.caches.descriptors import CachedFunction, cached as _cached from synapse.util.frozenutils import freeze if TYPE_CHECKING: - from synapse.app.generic_worker import GenericWorkerSlavedStore + from synapse.app.generic_worker import GenericWorkerStore from synapse.server import HomeServer @@ -237,9 +237,7 @@ class ModuleApi: # TODO: Fix this type hint once the types for the data stores have been ironed # out. - self._store: Union[ - DataStore, "GenericWorkerSlavedStore" - ] = hs.get_datastores().main + self._store: Union[DataStore, "GenericWorkerStore"] = hs.get_datastores().main self._storage_controllers = hs.get_storage_controllers() self._auth = hs.get_auth() self._auth_handler = auth_handler diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 200f667fdf..139f57cf86 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -60,7 +60,7 @@ _WAIT_FOR_REPLICATION_TIMEOUT_SECONDS = 5 class ReplicationDataHandler: """Handles incoming stream updates from replication. - This instance notifies the slave data store about updates. Can be subclassed + This instance notifies the data store about updates. Can be subclassed to handle updates in additional ways. """ @@ -91,7 +91,7 @@ class ReplicationDataHandler: ) -> None: """Called to handle a batch of replication data with a given stream token. - By default this just pokes the slave store. Can be overridden in subclasses to + By default, this just pokes the data store. Can be overridden in subclasses to handle more. Args: diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py index a9843f6e17..8f7bdbc61a 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py @@ -85,13 +85,10 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) writers=hs.config.worker.writers.account_data, ) else: + # Multiple writers are not supported for SQLite. + # # We shouldn't be running in worker mode with SQLite, but its useful # to support it for unit tests. - # - # If this process is the writer than we need to use - # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets - # updated over replication. (Multiple writers are not supported for - # SQLite). self._account_data_id_gen = StreamIdGenerator( db_conn, hs.get_replication_notifier(), diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index bd07d20171..46fa0a73f9 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -274,11 +274,11 @@ class CacheInvalidationWorkerStore(SQLBaseStore): async def invalidate_cache_and_stream( self, cache_name: str, keys: Tuple[Any, ...] ) -> None: - """Invalidates the cache and adds it to the cache stream so slaves + """Invalidates the cache and adds it to the cache stream so other workers will know to invalidate their caches. - This should only be used to invalidate caches where slaves won't - otherwise know from other replication streams that the cache should + This should only be used to invalidate caches where other workers won't + otherwise have known from other replication streams that the cache should be invalidated. """ cache_func = getattr(self, cache_name, None) @@ -297,11 +297,11 @@ class CacheInvalidationWorkerStore(SQLBaseStore): cache_func: CachedFunction, keys: Tuple[Any, ...], ) -> None: - """Invalidates the cache and adds it to the cache stream so slaves + """Invalidates the cache and adds it to the cache stream so other workers will know to invalidate their caches. - This should only be used to invalidate caches where slaves won't - otherwise know from other replication streams that the cache should + This should only be used to invalidate caches where other workers won't + otherwise have known from other replication streams that the cache should be invalidated. """ txn.call_after(cache_func.invalidate, keys) @@ -310,7 +310,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore): def _invalidate_all_cache_and_stream( self, txn: LoggingTransaction, cache_func: CachedFunction ) -> None: - """Invalidates the entire cache and adds it to the cache stream so slaves + """Invalidates the entire cache and adds it to the cache stream so other workers will know to invalidate their caches. """ diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 5503621ad6..a67fdb3c22 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -105,8 +105,6 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): is_writer=hs.config.worker.worker_app is None, ) - # Type-ignore: _device_list_id_gen is mixed in from either DataStore (as a - # StreamIdGenerator) or SlavedDataStore (as a SlavedIdTracker). device_list_max = self._device_list_id_gen.get_current_token() device_list_prefill, min_device_list_id = self.db_pool.get_cache_dict( db_conn, diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 0ff3fc7369..53aa5933d5 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -213,13 +213,10 @@ class EventsWorkerStore(SQLBaseStore): writers=hs.config.worker.writers.events, ) else: + # Multiple writers are not supported for SQLite. + # # We shouldn't be running in worker mode with SQLite, but its useful # to support it for unit tests. - # - # If this process is the writer than we need to use - # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets - # updated over replication. (Multiple writers are not supported for - # SQLite). self._stream_id_gen = StreamIdGenerator( db_conn, hs.get_replication_notifier(), diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 074942b167..5ee5c7ad9f 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -85,13 +85,10 @@ class ReceiptsWorkerStore(SQLBaseStore): else: self._can_write_to_receipts = True + # Multiple writers are not supported for SQLite. + # # We shouldn't be running in worker mode with SQLite, but its useful # to support it for unit tests. - # - # If this process is the writer than we need to use - # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets - # updated over replication. (Multiple writers are not supported for - # SQLite). self._receipts_id_gen = StreamIdGenerator( db_conn, hs.get_replication_notifier(), diff --git a/synapse/storage/schema/main/delta/34/cache_stream.py b/synapse/storage/schema/main/delta/34/cache_stream.py index 682c86da1a..882f9b893b 100644 --- a/synapse/storage/schema/main/delta/34/cache_stream.py +++ b/synapse/storage/schema/main/delta/34/cache_stream.py @@ -21,7 +21,7 @@ from synapse.storage.prepare_database import get_statements logger = logging.getLogger(__name__) -# This stream is used to notify replication slaves that some caches have +# This stream is used to notify workers over replication that some caches have # been invalidated that they cannot infer from the other streams. CREATE_TABLE = """ CREATE TABLE cache_invalidation_stream ( diff --git a/tests/app/test_openid_listener.py b/tests/app/test_openid_listener.py index 2ee343d8a4..6e0413400e 100644 --- a/tests/app/test_openid_listener.py +++ b/tests/app/test_openid_listener.py @@ -38,7 +38,7 @@ class FederationReaderOpenIDListenerTests(HomeserverTestCase): def default_config(self) -> JsonDict: conf = super().default_config() - # we're using FederationReaderServer, which uses a SlavedStore, so we + # we're using GenericWorkerServer, which uses a GenericWorkerStore, so we # have to tell the FederationHandler not to try to access stuff that is only # in the primary store. conf["worker_app"] = "yes" diff --git a/tests/replication/slave/storage/__init__.py b/tests/replication/slave/storage/__init__.py deleted file mode 100644 index f43a360a80..0000000000 --- a/tests/replication/slave/storage/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright 2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/tests/replication/slave/__init__.py b/tests/replication/storage/__init__.py similarity index 100% rename from tests/replication/slave/__init__.py rename to tests/replication/storage/__init__.py diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/storage/_base.py similarity index 82% rename from tests/replication/slave/storage/_base.py rename to tests/replication/storage/_base.py index 4c9b494344..de26a62ae1 100644 --- a/tests/replication/slave/storage/_base.py +++ b/tests/replication/storage/_base.py @@ -24,7 +24,7 @@ from synapse.util import Clock from tests.replication._base import BaseStreamTestCase -class BaseSlavedStoreTestCase(BaseStreamTestCase): +class BaseWorkerStoreTestCase(BaseStreamTestCase): def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: return self.setup_test_homeserver(federation_client=Mock()) @@ -34,7 +34,7 @@ class BaseSlavedStoreTestCase(BaseStreamTestCase): self.reconnect() self.master_store = hs.get_datastores().main - self.slaved_store = self.worker_hs.get_datastores().main + self.worker_store = self.worker_hs.get_datastores().main persistence = hs.get_storage_controllers().persistence assert persistence is not None self.persistance = persistence @@ -50,7 +50,7 @@ class BaseSlavedStoreTestCase(BaseStreamTestCase): self, method: str, args: Iterable[Any], expected_result: Optional[Any] = None ) -> None: master_result = self.get_success(getattr(self.master_store, method)(*args)) - slaved_result = self.get_success(getattr(self.slaved_store, method)(*args)) + worker_result = self.get_success(getattr(self.worker_store, method)(*args)) if expected_result is not None: self.assertEqual( master_result, @@ -59,14 +59,14 @@ class BaseSlavedStoreTestCase(BaseStreamTestCase): % (expected_result, master_result), ) self.assertEqual( - slaved_result, + worker_result, expected_result, - "Expected slave result to be %r but was %r" - % (expected_result, slaved_result), + "Expected worker result to be %r but was %r" + % (expected_result, worker_result), ) self.assertEqual( master_result, - slaved_result, - "Slave result %r does not match master result %r" - % (slaved_result, master_result), + worker_result, + "Worker result %r does not match master result %r" + % (worker_result, master_result), ) diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/storage/test_events.py similarity index 98% rename from tests/replication/slave/storage/test_events.py rename to tests/replication/storage/test_events.py index b2125b1fea..f7c6417a09 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/storage/test_events.py @@ -36,7 +36,7 @@ from synapse.util import Clock from tests.server import FakeTransport -from ._base import BaseSlavedStoreTestCase +from ._base import BaseWorkerStoreTestCase USER_ID = "@feeling:test" USER_ID_2 = "@bright:test" @@ -63,7 +63,7 @@ def patch__eq__(cls: object) -> Callable[[], None]: return unpatch -class EventsWorkerStoreTestCase(BaseSlavedStoreTestCase): +class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase): STORE_TYPE = EventsWorkerStore def setUp(self) -> None: @@ -294,7 +294,7 @@ class EventsWorkerStoreTestCase(BaseSlavedStoreTestCase): assert j2.internal_metadata.stream_ordering is not None event_source = RoomEventSource(self.hs) - event_source.store = self.slaved_store + event_source.store = self.worker_store current_token = event_source.get_current_key() # gradually stream out the replication @@ -310,12 +310,12 @@ class EventsWorkerStoreTestCase(BaseSlavedStoreTestCase): # # First, we get a list of the rooms we are joined to joined_rooms = self.get_success( - self.slaved_store.get_rooms_for_user_with_stream_ordering(USER_ID_2) + self.worker_store.get_rooms_for_user_with_stream_ordering(USER_ID_2) ) # Then, we get a list of the events since the last sync membership_changes = self.get_success( - self.slaved_store.get_membership_changes_for_user( + self.worker_store.get_membership_changes_for_user( USER_ID_2, prev_token, current_token ) )