diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 2e4ee723d3..a775f70c4e 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -328,51 +328,8 @@ jobs: - arrangement: monolith database: Postgres - steps: - # The path is set via a file given by $GITHUB_PATH. We need both Go 1.17 and GOPATH on the path to run Complement. - # See https://docs.github.com/en/actions/using-workflows/workflow-commands-for-github-actions#adding-a-system-path - - name: "Set Go Version" - run: | - # Add Go 1.17 to the PATH: see https://github.com/actions/virtual-environments/blob/main/images/linux/Ubuntu2004-Readme.md#environment-variables-2 - echo "$GOROOT_1_17_X64/bin" >> $GITHUB_PATH - # Add the Go path to the PATH: We need this so we can call gotestfmt - echo "~/go/bin" >> $GITHUB_PATH - - - name: "Install Complement Dependencies" - run: | - sudo apt-get update && sudo apt-get install -y libolm3 libolm-dev - go get -v github.com/haveyoudebuggedit/gotestfmt/v2/cmd/gotestfmt@latest - - - name: Run actions/checkout@v2 for synapse - uses: actions/checkout@v2 - with: - path: synapse - - - name: "Install custom gotestfmt template" - run: | - mkdir .gotestfmt/github -p - cp synapse/.ci/complement_package.gotpl .gotestfmt/github/package.gotpl - - # Attempt to check out the same branch of Complement as the PR. If it - # doesn't exist, fallback to HEAD. - - name: Checkout complement - run: synapse/.ci/scripts/checkout_complement.sh - - - run: | - set -o pipefail - POSTGRES=${{ (matrix.database == 'Postgres') && 1 || '' }} COMPLEMENT_DIR=`pwd`/complement synapse/scripts-dev/complement.sh -json 2>&1 | gotestfmt - shell: bash - name: Run Complement Tests - - # We only run the workers tests on `develop` for now, because they're too slow to wait for on PRs. - # Sadly, you can't have an `if` condition on the value of a matrix, so this is a temporary, separate job for now. - # GitHub Actions doesn't support YAML anchors, so it's full-on duplication for now. - complement-developonly: - if: "${{ !failure() && !cancelled() && (github.ref == 'refs/heads/develop') }}" - needs: linting-done - runs-on: ubuntu-latest - - name: "Complement Workers (develop only)" + - arrangement: workers + database: Postgres steps: # The path is set via a file given by $GITHUB_PATH. We need both Go 1.17 and GOPATH on the path to run Complement. @@ -406,7 +363,7 @@ jobs: - run: | set -o pipefail - WORKERS=1 COMPLEMENT_DIR=`pwd`/complement synapse/scripts-dev/complement.sh -json 2>&1 | gotestfmt + POSTGRES=${{ (matrix.database == 'Postgres') && 1 || '' }} WORKERS=${{ (matrix.arrangement == 'workers') && 1 || '' }} COMPLEMENT_DIR=`pwd`/complement synapse/scripts-dev/complement.sh -json 2>&1 | gotestfmt shell: bash name: Run Complement Tests diff --git a/changelog.d/13127.misc b/changelog.d/13127.misc new file mode 100644 index 0000000000..1414811e0a --- /dev/null +++ b/changelog.d/13127.misc @@ -0,0 +1 @@ +Improve startup times in Complement test runs against workers, particularly in CPU-constrained environments. \ No newline at end of file diff --git a/docker/complement/conf/start_for_complement.sh b/docker/complement/conf/start_for_complement.sh index 773c7db22f..cc6482f763 100755 --- a/docker/complement/conf/start_for_complement.sh +++ b/docker/complement/conf/start_for_complement.sh @@ -59,6 +59,9 @@ if [[ -n "$SYNAPSE_COMPLEMENT_USE_WORKERS" ]]; then synchrotron, \ appservice, \ pusher" + + # Improve startup times by using a launcher based on fork() + export SYNAPSE_USE_EXPERIMENTAL_FORKING_LAUNCHER=1 else # Empty string here means 'main process only' export SYNAPSE_WORKER_TYPES="" diff --git a/docker/conf-workers/synapse.supervisord.conf.j2 b/docker/conf-workers/synapse.supervisord.conf.j2 index 6443450491..481eb4fc92 100644 --- a/docker/conf-workers/synapse.supervisord.conf.j2 +++ b/docker/conf-workers/synapse.supervisord.conf.j2 @@ -1,3 +1,24 @@ +{% if use_forking_launcher %} +[program:synapse_fork] +command=/usr/local/bin/python -m synapse.app.complement_fork_starter + {{ main_config_path }} + synapse.app.homeserver + --config-path="{{ main_config_path }}" + --config-path=/conf/workers/shared.yaml + {%- for worker in workers %} + -- {{ worker.app }} + --config-path="{{ main_config_path }}" + --config-path=/conf/workers/shared.yaml + --config-path=/conf/workers/{{ worker.name }}.yaml + {%- endfor %} +stdout_logfile=/dev/stdout +stdout_logfile_maxbytes=0 +stderr_logfile=/dev/stderr +stderr_logfile_maxbytes=0 +autorestart=unexpected +exitcodes=0 + +{% else %} [program:synapse_main] command=/usr/local/bin/prefix-log /usr/local/bin/python -m synapse.app.homeserver --config-path="{{ main_config_path }}" @@ -13,7 +34,7 @@ autorestart=unexpected exitcodes=0 -{% for worker in workers %} + {% for worker in workers %} [program:synapse_{{ worker.name }}] command=/usr/local/bin/prefix-log /usr/local/bin/python -m {{ worker.app }} --config-path="{{ main_config_path }}" @@ -27,4 +48,5 @@ stdout_logfile_maxbytes=0 stderr_logfile=/dev/stderr stderr_logfile_maxbytes=0 -{% endfor %} + {% endfor %} +{% endif %} diff --git a/docker/conf/log.config b/docker/conf/log.config index dc8c70befd..d9e85aa533 100644 --- a/docker/conf/log.config +++ b/docker/conf/log.config @@ -2,7 +2,11 @@ version: 1 formatters: precise: + {% if include_worker_name_in_log_line %} + format: '{{ worker_name }} | %(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s - %(message)s' + {% else %} format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s - %(message)s' + {% endif %} handlers: {% if LOG_FILE_PATH %} diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index 2134b648d5..4521f99eb4 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -26,6 +26,9 @@ # * SYNAPSE_TLS_CERT: Path to a TLS certificate in PEM format. # * SYNAPSE_TLS_KEY: Path to a TLS key. If this and SYNAPSE_TLS_CERT are specified, # Nginx will be configured to serve TLS on port 8448. +# * SYNAPSE_USE_EXPERIMENTAL_FORKING_LAUNCHER: Whether to use the forking launcher, +# only intended for usage in Complement at the moment. +# No stability guarantees are provided. # # NOTE: According to Complement's ENTRYPOINT expectations for a homeserver image (as defined # in the project's README), this script may be run multiple times, and functionality should @@ -525,6 +528,7 @@ def generate_worker_files( "/etc/supervisor/conf.d/synapse.conf", workers=worker_descriptors, main_config_path=config_path, + use_forking_launcher=environ.get("SYNAPSE_USE_EXPERIMENTAL_FORKING_LAUNCHER"), ) # healthcheck config @@ -560,6 +564,9 @@ def generate_worker_log_config( log_config_filepath, worker_name=worker_name, **extra_log_template_args, + include_worker_name_in_log_line=environ.get( + "SYNAPSE_USE_EXPERIMENTAL_FORKING_LAUNCHER" + ), ) return log_config_filepath diff --git a/docker/start.py b/docker/start.py index 4ac8f03477..5a98dce551 100755 --- a/docker/start.py +++ b/docker/start.py @@ -110,7 +110,11 @@ def generate_config_from_template( log_config_file = environ["SYNAPSE_LOG_CONFIG"] log("Generating log config file " + log_config_file) - convert("/conf/log.config", log_config_file, environ) + convert( + "/conf/log.config", + log_config_file, + {**environ, "include_worker_name_in_log_line": False}, + ) # Hopefully we already have a signing key, but generate one if not. args = [ diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 363ac98ea9..923891ae0d 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -106,7 +106,9 @@ def register_sighup(func: Callable[P, None], *args: P.args, **kwargs: P.kwargs) def start_worker_reactor( appname: str, config: HomeServerConfig, - run_command: Callable[[], None] = reactor.run, + # Use a lambda to avoid binding to a given reactor at import time. + # (needed when synapse.app.complement_fork_starter is being used) + run_command: Callable[[], None] = lambda: reactor.run(), ) -> None: """Run the reactor in the main process @@ -141,7 +143,9 @@ def start_reactor( daemonize: bool, print_pidfile: bool, logger: logging.Logger, - run_command: Callable[[], None] = reactor.run, + # Use a lambda to avoid binding to a given reactor at import time. + # (needed when synapse.app.complement_fork_starter is being used) + run_command: Callable[[], None] = lambda: reactor.run(), ) -> None: """Run the reactor in the main process diff --git a/synapse/app/complement_fork_starter.py b/synapse/app/complement_fork_starter.py new file mode 100644 index 0000000000..89eb07df27 --- /dev/null +++ b/synapse/app/complement_fork_starter.py @@ -0,0 +1,190 @@ +# Copyright 2022 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ## What this script does +# +# This script spawns multiple workers, whilst only going through the code loading +# process once. The net effect is that start-up time for a swarm of workers is +# reduced, particularly in CPU-constrained environments. +# +# Before the workers are spawned, the database is prepared in order to avoid the +# workers racing. +# +# ## Stability +# +# This script is only intended for use within the Synapse images for the +# Complement test suite. +# There are currently no stability guarantees whatsoever; especially not about: +# - whether it will continue to exist in future versions; +# - the format of its command-line arguments; or +# - any details about its behaviour or principles of operation. +# +# ## Usage +# +# The first argument should be the path to the database configuration, used to +# set up the database. The rest of the arguments are used as follows: +# Each worker is specified as an argument group (each argument group is +# separated by '--'). +# The first argument in each argument group is the Python module name of the application +# to start. Further arguments are then passed to that module as-is. +# +# ## Example +# +# python -m synapse.app.complement_fork_starter path_to_db_config.yaml \ +# synapse.app.homeserver [args..] -- \ +# synapse.app.generic_worker [args..] -- \ +# ... +# synapse.app.generic_worker [args..] +# +import argparse +import importlib +import itertools +import multiprocessing +import sys +from typing import Any, Callable, List + +from twisted.internet.main import installReactor + + +class ProxiedReactor: + """ + Twisted tracks the 'installed' reactor as a global variable. + (Actually, it does some module trickery, but the effect is similar.) + + The default EpollReactor is buggy if it's created before a process is + forked, then used in the child. + See https://twistedmatrix.com/trac/ticket/4759#comment:17. + + However, importing certain Twisted modules will automatically create and + install a reactor if one hasn't already been installed. + It's not normally possible to re-install a reactor. + + Given the goal of launching workers with fork() to only import the code once, + this presents a conflict. + Our work around is to 'install' this ProxiedReactor which prevents Twisted + from creating and installing one, but which lets us replace the actual reactor + in use later on. + """ + + def __init__(self) -> None: + self.___reactor_target: Any = None + + def _install_real_reactor(self, new_reactor: Any) -> None: + """ + Install a real reactor for this ProxiedReactor to forward lookups onto. + + This method is specific to our ProxiedReactor and should not clash with + any names used on an actual Twisted reactor. + """ + self.___reactor_target = new_reactor + + def __getattr__(self, attr_name: str) -> Any: + return getattr(self.___reactor_target, attr_name) + + +def _worker_entrypoint( + func: Callable[[], None], proxy_reactor: ProxiedReactor, args: List[str] +) -> None: + """ + Entrypoint for a forked worker process. + + We just need to set up the command-line arguments, create our real reactor + and then kick off the worker's main() function. + """ + + sys.argv = args + + from twisted.internet.epollreactor import EPollReactor + + proxy_reactor._install_real_reactor(EPollReactor()) + func() + + +def main() -> None: + """ + Entrypoint for the forking launcher. + """ + parser = argparse.ArgumentParser() + parser.add_argument("db_config", help="Path to database config file") + parser.add_argument( + "args", + nargs="...", + help="Argument groups separated by `--`. " + "The first argument of each group is a Synapse app name. " + "Subsequent arguments are passed through.", + ) + ns = parser.parse_args() + + # Split up the subsequent arguments into each workers' arguments; + # `--` is our delimiter of choice. + args_by_worker: List[List[str]] = [ + list(args) + for cond, args in itertools.groupby(ns.args, lambda ele: ele != "--") + if cond and args + ] + + # Prevent Twisted from installing a shared reactor that all the workers will + # inherit when we fork(), by installing our own beforehand. + proxy_reactor = ProxiedReactor() + installReactor(proxy_reactor) + + # Import the entrypoints for all the workers. + worker_functions = [] + for worker_args in args_by_worker: + worker_module = importlib.import_module(worker_args[0]) + worker_functions.append(worker_module.main) + + # We need to prepare the database first as otherwise all the workers will + # try to create a schema version table and some will crash out. + from synapse._scripts import update_synapse_database + + update_proc = multiprocessing.Process( + target=_worker_entrypoint, + args=( + update_synapse_database.main, + proxy_reactor, + [ + "update_synapse_database", + "--database-config", + ns.db_config, + "--run-background-updates", + ], + ), + ) + print("===== PREPARING DATABASE =====", file=sys.stderr) + update_proc.start() + update_proc.join() + print("===== PREPARED DATABASE =====", file=sys.stderr) + + # At this point, we've imported all the main entrypoints for all the workers. + # Now we basically just fork() out to create the workers we need. + # Because we're using fork(), all the workers get a clone of this launcher's + # memory space and don't need to repeat the work of loading the code! + # Instead of using fork() directly, we use the multiprocessing library, + # which uses fork() on Unix platforms. + processes = [] + for (func, worker_args) in zip(worker_functions, args_by_worker): + process = multiprocessing.Process( + target=_worker_entrypoint, args=(func, proxy_reactor, worker_args) + ) + process.start() + processes.append(process) + + # Be a good parent and wait for our children to die before exiting. + for process in processes: + process.join() + + +if __name__ == "__main__": + main()