From fd48fc45853eb193a22a08d874eb473e668e2d6a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Mar 2024 16:29:23 +0000 Subject: [PATCH] Fixups to new push stream (#17038) Follow on from #17037 --- changelog.d/17037.feature | 2 +- changelog.d/17038.feature | 1 + docker/configure_workers_and_start.py | 8 ++++++++ docs/workers.md | 4 ++-- synapse/config/workers.py | 8 ++++---- synapse/handlers/room_member.py | 6 ++++-- synapse/replication/tcp/handler.py | 2 +- synapse/rest/client/push_rule.py | 4 +++- synapse/storage/databases/main/push_rule.py | 4 +++- 9 files changed, 27 insertions(+), 12 deletions(-) create mode 100644 changelog.d/17038.feature diff --git a/changelog.d/17037.feature b/changelog.d/17037.feature index bd419c817d..498221e19e 100644 --- a/changelog.d/17037.feature +++ b/changelog.d/17037.feature @@ -1 +1 @@ -Add support for moving `/push_rules` off of main process. +Add support for moving `/pushrules` off of main process. diff --git a/changelog.d/17038.feature b/changelog.d/17038.feature new file mode 100644 index 0000000000..498221e19e --- /dev/null +++ b/changelog.d/17038.feature @@ -0,0 +1 @@ +Add support for moving `/pushrules` off of main process. diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index 3917d9ae7e..77534a4f4f 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -310,6 +310,13 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = { "shared_extra_conf": {}, "worker_extra_conf": "", }, + "push_rules": { + "app": "synapse.app.generic_worker", + "listener_resources": ["client", "replication"], + "endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/pushrules/"], + "shared_extra_conf": {}, + "worker_extra_conf": "", + }, } # Templates for sections that may be inserted multiple times in config files @@ -401,6 +408,7 @@ def add_worker_roles_to_shared_config( "receipts", "to_device", "typing", + "push_rules", ] # Worker-type specific sharding config. Now a single worker can fulfill multiple diff --git a/docs/workers.md b/docs/workers.md index 5ea8ad59bd..ab9c1db86b 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -532,12 +532,12 @@ the stream writer for the `presence` stream: ^/_matrix/client/(api/v1|r0|v3|unstable)/presence/ -##### The `push` stream +##### The `push_rules` stream The following endpoints should be routed directly to the worker configured as the stream writer for the `push` stream: - ^/_matrix/client/(api/v1|r0|v3|unstable)/push_rules/ + ^/_matrix/client/(api/v1|r0|v3|unstable)/pushrules/ #### Restrict outbound federation traffic to a specific set of workers diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 9f81a73d6f..7ecf349e4a 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -156,7 +156,7 @@ class WriterLocations: can only be a single instance. presence: The instances that write to the presence stream. Currently can only be a single instance. - push: The instances that write to the push stream. Currently + push_rules: The instances that write to the push stream. Currently can only be a single instance. """ @@ -184,7 +184,7 @@ class WriterLocations: default=["master"], converter=_instance_to_list_converter, ) - push: List[str] = attr.ib( + push_rules: List[str] = attr.ib( default=["master"], converter=_instance_to_list_converter, ) @@ -347,7 +347,7 @@ class WorkerConfig(Config): "account_data", "receipts", "presence", - "push", + "push_rules", ): instances = _instance_to_list_converter(getattr(self.writers, stream)) for instance in instances: @@ -385,7 +385,7 @@ class WorkerConfig(Config): "Must only specify one instance to handle `presence` messages." ) - if len(self.writers.push) != 1: + if len(self.writers.push_rules) != 1: raise ConfigError( "Must only specify one instance to handle `push` messages." ) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index ee2e807afc..601d37341b 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -182,8 +182,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): hs.config.server.forgotten_room_retention_period ) - self._is_push_writer = hs.get_instance_name() in hs.config.worker.writers.push - self._push_writer = hs.config.worker.writers.push[0] + self._is_push_writer = ( + hs.get_instance_name() in hs.config.worker.writers.push_rules + ) + self._push_writer = hs.config.worker.writers.push_rules[0] self._copy_push_client = ReplicationCopyPusherRestServlet.make_client(hs) def _on_user_joined_room(self, event_id: str, room_id: str) -> None: diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 4342d6ce70..72a42cb6cc 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -180,7 +180,7 @@ class ReplicationCommandHandler: continue if isinstance(stream, PushRulesStream): - if hs.get_instance_name() in hs.config.worker.writers.push: + if hs.get_instance_name() in hs.config.worker.writers.push_rules: self._streams_to_replicate.append(stream) continue diff --git a/synapse/rest/client/push_rule.py b/synapse/rest/client/push_rule.py index 9c9bfc9794..af042504c9 100644 --- a/synapse/rest/client/push_rule.py +++ b/synapse/rest/client/push_rule.py @@ -59,7 +59,9 @@ class PushRuleRestServlet(RestServlet): self.auth = hs.get_auth() self.store = hs.get_datastores().main self.notifier = hs.get_notifier() - self._is_push_worker = hs.get_instance_name() in hs.config.worker.writers.push + self._is_push_worker = ( + hs.get_instance_name() in hs.config.worker.writers.push_rules + ) self._push_rules_handler = hs.get_push_rules_handler() self._push_rule_linearizer = Linearizer(name="push_rules") diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py index ed734f03ac..660c834518 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py @@ -136,7 +136,9 @@ class PushRulesWorkerStore( ): super().__init__(database, db_conn, hs) - self._is_push_writer = hs.get_instance_name() in hs.config.worker.writers.push + self._is_push_writer = ( + hs.get_instance_name() in hs.config.worker.writers.push_rules + ) # In the worker store this is an ID tracker which we overwrite in the non-worker # class below that is used on the main process.