diff --git a/.ci/scripts/calculate_jobs.py b/.ci/scripts/calculate_jobs.py index 15f2d94a81..ea278173db 100755 --- a/.ci/scripts/calculate_jobs.py +++ b/.ci/scripts/calculate_jobs.py @@ -36,11 +36,11 @@ IS_PR = os.environ["GITHUB_REF"].startswith("refs/pull/") # First calculate the various trial jobs. # # For PRs, we only run each type of test with the oldest Python version supported (which -# is Python 3.8 right now) +# is Python 3.9 right now) trial_sqlite_tests = [ { - "python-version": "3.8", + "python-version": "3.9", "database": "sqlite", "extras": "all", } @@ -53,12 +53,12 @@ if not IS_PR: "database": "sqlite", "extras": "all", } - for version in ("3.9", "3.10", "3.11", "3.12", "3.13") + for version in ("3.10", "3.11", "3.12", "3.13") ) trial_postgres_tests = [ { - "python-version": "3.8", + "python-version": "3.9", "database": "postgres", "postgres-version": "11", "extras": "all", @@ -77,7 +77,7 @@ if not IS_PR: trial_no_extra_tests = [ { - "python-version": "3.8", + "python-version": "3.9", "database": "sqlite", "extras": "", } @@ -99,24 +99,24 @@ set_output("trial_test_matrix", test_matrix) # First calculate the various sytest jobs. # -# For each type of test we only run on focal on PRs +# For each type of test we only run on bullseye on PRs sytest_tests = [ { - "sytest-tag": "focal", + "sytest-tag": "bullseye", }, { - "sytest-tag": "focal", + "sytest-tag": "bullseye", "postgres": "postgres", }, { - "sytest-tag": "focal", + "sytest-tag": "bullseye", "postgres": "multi-postgres", "workers": "workers", }, { - "sytest-tag": "focal", + "sytest-tag": "bullseye", "postgres": "multi-postgres", "workers": "workers", "reactor": "asyncio", @@ -127,11 +127,11 @@ if not IS_PR: sytest_tests.extend( [ { - "sytest-tag": "focal", + "sytest-tag": "bullseye", "reactor": "asyncio", }, { - "sytest-tag": "focal", + "sytest-tag": "bullseye", "postgres": "postgres", "reactor": "asyncio", }, diff --git a/.ci/scripts/prepare_old_deps.sh b/.ci/scripts/prepare_old_deps.sh index 580f87bbdf..3589be26f8 100755 --- a/.ci/scripts/prepare_old_deps.sh +++ b/.ci/scripts/prepare_old_deps.sh @@ -1,5 +1,5 @@ #!/usr/bin/env bash -# this script is run by GitHub Actions in a plain `focal` container; it +# this script is run by GitHub Actions in a plain `jammy` container; it # - installs the minimal system requirements, and poetry; # - patches the project definition file to refer to old versions only; # - creates a venv with these old versions using poetry; and finally diff --git a/.github/workflows/latest_deps.yml b/.github/workflows/latest_deps.yml index b9e9a401b9..3884b6d402 100644 --- a/.github/workflows/latest_deps.yml +++ b/.github/workflows/latest_deps.yml @@ -132,9 +132,9 @@ jobs: fail-fast: false matrix: include: - - sytest-tag: focal + - sytest-tag: bullseye - - sytest-tag: focal + - sytest-tag: bullseye postgres: postgres workers: workers redis: redis diff --git a/.github/workflows/release-artifacts.yml b/.github/workflows/release-artifacts.yml index 9f0feffd94..d77d7792f0 100644 --- a/.github/workflows/release-artifacts.yml +++ b/.github/workflows/release-artifacts.yml @@ -92,7 +92,7 @@ jobs: mv /tmp/.buildx-cache-new /tmp/.buildx-cache - name: Upload debs as artifacts - uses: actions/upload-artifact@v3 # Don't upgrade to v4; broken: https://github.com/actions/upload-artifact#breaking-changes + uses: actions/upload-artifact@v4 with: name: debs path: debs/* @@ -102,7 +102,7 @@ jobs: runs-on: ${{ matrix.os }} strategy: matrix: - os: [ubuntu-20.04, macos-12] + os: [ubuntu-22.04, macos-12] arch: [x86_64, 
aarch64] # is_pr is a flag used to exclude certain jobs from the matrix on PRs. # It is not read by the rest of the workflow. @@ -144,7 +144,7 @@ jobs: - name: Only build a single wheel on PR if: startsWith(github.ref, 'refs/pull/') - run: echo "CIBW_BUILD="cp38-manylinux_${{ matrix.arch }}"" >> $GITHUB_ENV + run: echo "CIBW_BUILD="cp39-manylinux_${{ matrix.arch }}"" >> $GITHUB_ENV - name: Build wheels run: python -m cibuildwheel --output-dir wheelhouse @@ -156,9 +156,9 @@ jobs: CARGO_NET_GIT_FETCH_WITH_CLI: true CIBW_ENVIRONMENT_PASS_LINUX: CARGO_NET_GIT_FETCH_WITH_CLI - - uses: actions/upload-artifact@v3 # Don't upgrade to v4; broken: https://github.com/actions/upload-artifact#breaking-changes + - uses: actions/upload-artifact@v4 with: - name: Wheel + name: Wheel-${{ matrix.os }}-${{ matrix.arch }} path: ./wheelhouse/*.whl build-sdist: @@ -177,7 +177,7 @@ jobs: - name: Build sdist run: python -m build --sdist - - uses: actions/upload-artifact@v3 # Don't upgrade to v4; broken: https://github.com/actions/upload-artifact#breaking-changes + - uses: actions/upload-artifact@v4 with: name: Sdist path: dist/*.tar.gz @@ -194,7 +194,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Download all workflow run artifacts - uses: actions/download-artifact@v3 # Don't upgrade to v4, it should match upload-artifact + uses: actions/download-artifact@v4 - name: Build a tarball for the debs run: tar -cvJf debs.tar.xz debs - name: Attach to release diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 5586bd6d94..27dac89220 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -397,7 +397,7 @@ jobs: needs: - linting-done - changes - runs-on: ubuntu-20.04 + runs-on: ubuntu-22.04 steps: - uses: actions/checkout@v4 @@ -409,12 +409,12 @@ jobs: # their build dependencies - run: | sudo apt-get -qq update - sudo apt-get -qq install build-essential libffi-dev python-dev \ + sudo apt-get -qq install build-essential libffi-dev python3-dev \ libxml2-dev libxslt-dev xmlsec1 zlib1g-dev libjpeg-dev libwebp-dev - uses: actions/setup-python@v5 with: - python-version: '3.8' + python-version: '3.9' - name: Prepare old deps if: steps.cache-poetry-old-deps.outputs.cache-hit != 'true' @@ -458,7 +458,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: ["pypy-3.8"] + python-version: ["pypy-3.9"] extras: ["all"] steps: @@ -580,7 +580,7 @@ jobs: strategy: matrix: include: - - python-version: "3.8" + - python-version: "3.9" postgres-version: "11" - python-version: "3.11" diff --git a/.github/workflows/twisted_trunk.yml b/.github/workflows/twisted_trunk.yml index 76609c2118..cdaa00ef90 100644 --- a/.github/workflows/twisted_trunk.yml +++ b/.github/workflows/twisted_trunk.yml @@ -99,11 +99,11 @@ jobs: if: needs.check_repo.outputs.should_run_workflow == 'true' runs-on: ubuntu-latest container: - # We're using ubuntu:focal because it uses Python 3.8 which is our minimum supported Python version. + # We're using debian:bullseye because it uses Python 3.9 which is our minimum supported Python version. # This job is a canary to warn us about unreleased twisted changes that would cause problems for us if # they were to be released immediately. For simplicity's sake (and to save CI runners) we use the oldest # version, assuming that any incompatibilities on newer versions would also be present on the oldest. 
- image: matrixdotorg/sytest-synapse:focal + image: matrixdotorg/sytest-synapse:bullseye volumes: - ${{ github.workspace }}:/src diff --git a/Cargo.lock b/Cargo.lock index e2f22ba812..2e23b95f85 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13,9 +13,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.91" +version = "1.0.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c042108f3ed77fd83760a5fd79b53be043192bb3b9dba91d8c574c0ada7850c8" +checksum = "74f37166d7d48a0284b99dd824694c26119c700b53bf0d1540cdb147dbdaaf13" [[package]] name = "arc-swap" @@ -485,18 +485,18 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "serde" -version = "1.0.213" +version = "1.0.214" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ea7893ff5e2466df8d720bb615088341b295f849602c6956047f8f80f0e9bc1" +checksum = "f55c3193aca71c12ad7890f1785d2b73e1b9f63a0bbc353c08ef26fe03fc56b5" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.213" +version = "1.0.214" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e85ad2009c50b58e87caa8cd6dac16bdf511bbfb7af6c33df902396aa480fa5" +checksum = "de523f781f095e28fa605cdce0f8307e451cc0fd14e2eb4cd2e98a355b147766" dependencies = [ "proc-macro2", "quote", diff --git a/changelog.d/17657.misc b/changelog.d/17657.misc new file mode 100644 index 0000000000..aff558adf7 --- /dev/null +++ b/changelog.d/17657.misc @@ -0,0 +1 @@ +Bump actions/download-artifact and actions/upload-artifact from v3 -> v4. diff --git a/changelog.d/17809.bugfix b/changelog.d/17809.bugfix new file mode 100644 index 0000000000..e244a36bd3 --- /dev/null +++ b/changelog.d/17809.bugfix @@ -0,0 +1 @@ +Fix bug with sliding sync where `$LAZY`-loading room members would not return `required_state` membership in incremental syncs. diff --git a/changelog.d/17813.bugfix b/changelog.d/17813.bugfix deleted file mode 100644 index 5dd276709b..0000000000 --- a/changelog.d/17813.bugfix +++ /dev/null @@ -1 +0,0 @@ -Avoid lost data on some database query retries. diff --git a/changelog.d/17813.misc b/changelog.d/17813.misc new file mode 100644 index 0000000000..f8676aee59 --- /dev/null +++ b/changelog.d/17813.misc @@ -0,0 +1 @@ +Refactor database calls to remove `Generator` usage. diff --git a/changelog.d/17814.bugfix b/changelog.d/17814.bugfix deleted file mode 100644 index 5dd276709b..0000000000 --- a/changelog.d/17814.bugfix +++ /dev/null @@ -1 +0,0 @@ -Avoid lost data on some database query retries. diff --git a/changelog.d/17814.misc b/changelog.d/17814.misc new file mode 100644 index 0000000000..f8676aee59 --- /dev/null +++ b/changelog.d/17814.misc @@ -0,0 +1 @@ +Refactor database calls to remove `Generator` usage. diff --git a/changelog.d/17815.bugfix b/changelog.d/17815.bugfix deleted file mode 100644 index 5dd276709b..0000000000 --- a/changelog.d/17815.bugfix +++ /dev/null @@ -1 +0,0 @@ -Avoid lost data on some database query retries. diff --git a/changelog.d/17815.misc b/changelog.d/17815.misc new file mode 100644 index 0000000000..f8676aee59 --- /dev/null +++ b/changelog.d/17815.misc @@ -0,0 +1 @@ +Refactor database calls to remove `Generator` usage. diff --git a/changelog.d/17816.bugfix b/changelog.d/17816.bugfix deleted file mode 100644 index 5dd276709b..0000000000 --- a/changelog.d/17816.bugfix +++ /dev/null @@ -1 +0,0 @@ -Avoid lost data on some database query retries. 
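The run of changelog swaps above (17813-17816, continuing below) reclassifies "Avoid lost data on some database query retries" as a `Generator`-removal refactor. The hazard both wordings describe is easy to state in isolation: a generator is single-use, so a retried transaction that was handed one re-runs against an exhausted iterator. A minimal standalone sketch of the failure mode and the materialise-first fix (illustrative only; `insert_rows_with_retry` and `TransientDBError` are invented names, not Synapse APIs):

from typing import Callable, Iterable, List, Tuple


class TransientDBError(Exception):
    """Stands in for a retryable database failure."""


def insert_rows_with_retry(
    txn_fn: Callable[[List[Tuple[str, str]]], None],
    rows: Iterable[Tuple[str, str]],
    attempts: int = 3,
) -> None:
    # Materialise the iterable once so every retry sees the same rows. If a
    # generator were passed straight through to txn_fn, a failed first
    # attempt would consume it and the retry would silently write nothing.
    rows_list = list(rows)
    for attempt in range(attempts):
        try:
            txn_fn(rows_list)
            return
        except TransientDBError:
            if attempt == attempts - 1:
                raise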
diff --git a/changelog.d/17816.misc b/changelog.d/17816.misc
new file mode 100644
index 0000000000..f8676aee59
--- /dev/null
+++ b/changelog.d/17816.misc
@@ -0,0 +1 @@
+Refactor database calls to remove `Generator` usage.
diff --git a/changelog.d/17817.bugfix b/changelog.d/17817.bugfix
deleted file mode 100644
index 5dd276709b..0000000000
--- a/changelog.d/17817.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Avoid lost data on some database query retries.
diff --git a/changelog.d/17817.misc b/changelog.d/17817.misc
new file mode 100644
index 0000000000..f8676aee59
--- /dev/null
+++ b/changelog.d/17817.misc
@@ -0,0 +1 @@
+Refactor database calls to remove `Generator` usage.
diff --git a/changelog.d/17818.bugfix b/changelog.d/17818.bugfix
deleted file mode 100644
index 5dd276709b..0000000000
--- a/changelog.d/17818.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Avoid lost data on some database query retries.
diff --git a/changelog.d/17818.misc b/changelog.d/17818.misc
new file mode 100644
index 0000000000..f8676aee59
--- /dev/null
+++ b/changelog.d/17818.misc
@@ -0,0 +1 @@
+Refactor database calls to remove `Generator` usage.
diff --git a/changelog.d/17850.bugfix b/changelog.d/17850.bugfix
new file mode 100644
index 0000000000..8ea99c4ef9
--- /dev/null
+++ b/changelog.d/17850.bugfix
@@ -0,0 +1 @@
+Fix a bug where some presence and typing timeouts could expire early.
\ No newline at end of file
diff --git a/changelog.d/17852.misc b/changelog.d/17852.misc
new file mode 100644
index 0000000000..b1b7ac9734
--- /dev/null
+++ b/changelog.d/17852.misc
@@ -0,0 +1 @@
+The nix flake inside the repository no longer tracks nixpkgs/master to not catch the latest bugs from a PR merged 5 minutes ago.
diff --git a/changelog.d/17887.misc b/changelog.d/17887.misc
new file mode 100644
index 0000000000..6be32caee6
--- /dev/null
+++ b/changelog.d/17887.misc
@@ -0,0 +1 @@
+Bump the default Python version in the Synapse Dockerfile from 3.11 -> 3.12.
\ No newline at end of file
diff --git a/changelog.d/17888.feature b/changelog.d/17888.feature
new file mode 100644
index 0000000000..3ede8886ab
--- /dev/null
+++ b/changelog.d/17888.feature
@@ -0,0 +1 @@
+Add experimental support for [MSC4222](https://github.com/matrix-org/matrix-spec-proposals/pull/4222).
diff --git a/changelog.d/17890.misc b/changelog.d/17890.misc
new file mode 100644
index 0000000000..f8676aee59
--- /dev/null
+++ b/changelog.d/17890.misc
@@ -0,0 +1 @@
+Refactor database calls to remove `Generator` usage.
diff --git a/changelog.d/17894.misc b/changelog.d/17894.misc
new file mode 100644
index 0000000000..dc1a7577ab
--- /dev/null
+++ b/changelog.d/17894.misc
@@ -0,0 +1 @@
+Remove usage of internal header encoding API.
diff --git a/changelog.d/17902.misc b/changelog.d/17902.misc
new file mode 100644
index 0000000000..f094f57c2f
--- /dev/null
+++ b/changelog.d/17902.misc
@@ -0,0 +1 @@
+Update version constraint to allow the latest poetry-core 1.9.1.
diff --git a/changelog.d/17903.bugfix b/changelog.d/17903.bugfix
new file mode 100644
index 0000000000..a4d02fc983
--- /dev/null
+++ b/changelog.d/17903.bugfix
@@ -0,0 +1 @@
+Fix a long-standing bug in Synapse which could cause one-time keys to be issued in the incorrect order, causing message decryption failures.
diff --git a/changelog.d/17905.misc b/changelog.d/17905.misc
new file mode 100644
index 0000000000..32ef50dbac
--- /dev/null
+++ b/changelog.d/17905.misc
@@ -0,0 +1 @@
+Use a unique name for each os.arch variant when uploading Wheel artifacts.
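For changelog.d/17903.bugfix above, the patch itself is not part of this diff, but the invariant it restores can be sketched: one-time keys must be handed out in upload order so both sides of an encrypted session agree on which key was consumed. A hypothetical illustration only (not Synapse's actual storage code; all names here are invented):

from typing import Dict, Optional, Tuple


def claim_one_time_key(
    uploaded_keys: Dict[str, Tuple[int, str]],  # key_id -> (upload counter, key)
) -> Optional[Tuple[str, str]]:
    # Pick the key with the smallest upload counter rather than relying on
    # dict iteration order; issuing keys out of order is the kind of bug
    # that leads to the decryption failures the changelog describes.
    if not uploaded_keys:
        return None
    key_id = min(uploaded_keys, key=lambda k: uploaded_keys[k][0])
    _counter, key = uploaded_keys.pop(key_id)
    return key_id, key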
diff --git a/changelog.d/17907.bugfix b/changelog.d/17907.bugfix new file mode 100644 index 0000000000..f38ce6a590 --- /dev/null +++ b/changelog.d/17907.bugfix @@ -0,0 +1 @@ +Fix tests to run with latest Twisted. diff --git a/changelog.d/17908.misc b/changelog.d/17908.misc new file mode 100644 index 0000000000..8f17729148 --- /dev/null +++ b/changelog.d/17908.misc @@ -0,0 +1 @@ +Remove support for python 3.8. diff --git a/docker/Dockerfile b/docker/Dockerfile index 1da196b12e..a4931011a7 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -20,7 +20,7 @@ # `poetry export | pip install -r /dev/stdin`, but beware: we have experienced bugs in # in `poetry export` in the past. -ARG PYTHON_VERSION=3.11 +ARG PYTHON_VERSION=3.12 ### ### Stage 0: generate requirements.txt diff --git a/docs/admin_api/experimental_features.md b/docs/admin_api/experimental_features.md index ef1b58c9ba..e32728e56d 100644 --- a/docs/admin_api/experimental_features.md +++ b/docs/admin_api/experimental_features.md @@ -5,6 +5,7 @@ basis. The currently supported features are: - [MSC3881](https://github.com/matrix-org/matrix-spec-proposals/pull/3881): enable remotely toggling push notifications for another client - [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575): enable experimental sliding sync support +- [MSC4222](https://github.com/matrix-org/matrix-spec-proposals/pull/4222): adding `state_after` to sync v2 To use it, you will need to authenticate by providing an `access_token` for a server admin: see [Admin API](../usage/administration/admin_api/). diff --git a/docs/development/contributing_guide.md b/docs/development/contributing_guide.md index f079f61b48..d6efab96cf 100644 --- a/docs/development/contributing_guide.md +++ b/docs/development/contributing_guide.md @@ -322,7 +322,7 @@ The following command will let you run the integration test with the most common configuration: ```sh -$ docker run --rm -it -v /path/where/you/have/cloned/the/repository\:/src:ro -v /path/to/where/you/want/logs\:/logs matrixdotorg/sytest-synapse:focal +$ docker run --rm -it -v /path/where/you/have/cloned/the/repository\:/src:ro -v /path/to/where/you/want/logs\:/logs matrixdotorg/sytest-synapse:bullseye ``` (Note that the paths must be full paths! You could also write `$(realpath relative/path)` if needed.) diff --git a/docs/setup/installation.md b/docs/setup/installation.md index 9cebb89b4d..d717880aa5 100644 --- a/docs/setup/installation.md +++ b/docs/setup/installation.md @@ -208,7 +208,7 @@ When following this route please make sure that the [Platform-specific prerequis System requirements: - POSIX-compliant system (tested on Linux & OS X) -- Python 3.8 or later, up to Python 3.11. +- Python 3.9 or later, up to Python 3.13. - At least 1GB of free RAM if you want to join large public rooms like #matrix:matrix.org If building on an uncommon architecture for which pre-built wheels are diff --git a/docs/upgrade.md b/docs/upgrade.md index 52b1adbe90..ea9824a5ee 100644 --- a/docs/upgrade.md +++ b/docs/upgrade.md @@ -117,6 +117,17 @@ each upgrade are complete before moving on to the next upgrade, to avoid stacking them up. You can monitor the currently running background updates with [the Admin API](usage/administration/admin_api/background_updates.html#status). +# Upgrading to v1.119.0 + +## Minimum supported Python version + +The minimum supported Python version has been increased from v3.8 to v3.9. +You will need Python 3.9+ to run Synapse v1.119.0 (due out Nov 7th, 2024). 
+
+If you use current versions of the Matrix.org-distributed Docker images, no action is required.
+Please note that support for Ubuntu `focal` was dropped as well since it uses Python 3.8.
+
+
 # Upgrading to v1.111.0
 
 ## New worker endpoints for authenticated client and federation media
diff --git a/flake.lock b/flake.lock
index 9b360fa33e..6b25cef3fc 100644
--- a/flake.lock
+++ b/flake.lock
@@ -186,16 +186,16 @@
     },
     "nixpkgs_2": {
       "locked": {
-        "lastModified": 1690535733,
-        "narHash": "sha256-WgjUPscQOw3cB8yySDGlyzo6cZNihnRzUwE9kadv/5I=",
+        "lastModified": 1729265718,
+        "narHash": "sha256-4HQI+6LsO3kpWTYuVGIzhJs1cetFcwT7quWCk/6rqeo=",
         "owner": "NixOS",
         "repo": "nixpkgs",
-        "rev": "8cacc05fbfffeaab910e8c2c9e2a7c6b32ce881a",
+        "rev": "ccc0c2126893dd20963580b6478d1a10a4512185",
         "type": "github"
       },
       "original": {
         "owner": "NixOS",
-        "ref": "master",
+        "ref": "nixpkgs-unstable",
         "repo": "nixpkgs",
         "type": "github"
       }
diff --git a/flake.nix b/flake.nix
index 31f2832939..bc360ae44a 100644
--- a/flake.nix
+++ b/flake.nix
@@ -3,13 +3,13 @@
 # (https://github.com/matrix-org/complement) Matrix homeserver test suites are also
 # installed automatically.
 #
-# You must have already installed Nix (https://nixos.org) on your system to use this.
-# Nix can be installed on Linux or MacOS; NixOS is not required. Windows is not
-# directly supported, but Nix can be installed inside of WSL2 or even Docker
+# You must have already installed Nix (https://nixos.org/download/) on your system to use this.
+# Nix can be installed on any Linux distribution or MacOS; NixOS is not required.
+# Windows is not directly supported, but Nix can be installed inside of WSL2 or even Docker
 # containers. Please refer to https://nixos.org/download for details.
 #
 # You must also enable support for flakes in Nix. See the following for how to
-# do so permanently: https://nixos.wiki/wiki/Flakes#Enable_flakes
+# do so permanently: https://wiki.nixos.org/wiki/Flakes#Other_Distros,_without_Home-Manager
 #
 # Be warned: you'll need over 3.75 GB of free space to download all the dependencies.
 #
@@ -20,7 +20,7 @@
 # locally from "services", such as PostgreSQL and Redis.
 #
 # You should now be dropped into a new shell with all programs and dependencies
-# availabile to you!
+# available to you!
 #
 # You can start up pre-configured local Synapse, PostgreSQL and Redis instances by
 # running: `devenv up`. To stop them, use Ctrl-C.
@@ -39,9 +39,9 @@
 {
   inputs = {
-    # Use the master/unstable branch of nixpkgs. Used to fetch the latest
+    # Use the rolling/unstable branch of nixpkgs. Used to fetch the latest
     # available versions of packages.
-    nixpkgs.url = "github:NixOS/nixpkgs/master";
+    nixpkgs.url = "github:NixOS/nixpkgs/nixpkgs-unstable";
     # Output a development shell for x86_64/aarch64 Linux/Darwin (MacOS).
     systems.url = "github:nix-systems/default";
     # A development environment manager built on Nix. See https://devenv.sh.
@@ -50,7 +50,7 @@
     rust-overlay.url = "github:oxalica/rust-overlay";
   };
 
-  outputs = { self, nixpkgs, devenv, systems, rust-overlay, ... } @ inputs:
+  outputs = { nixpkgs, devenv, systems, rust-overlay, ... } @ inputs:
     let
       forEachSystem = nixpkgs.lib.genAttrs (import systems);
     in {
@@ -126,7 +126,7 @@
         # Automatically activate the poetry virtualenv upon entering the shell.
         languages.python.poetry.activate.enable = true;
         # Install all extra Python dependencies; this is needed to run the unit
-        # tests and utilitise all Synapse features.
+        # tests and utilise all Synapse features.
languages.python.poetry.install.arguments = ["--extras all"]; # Install the 'matrix-synapse' package from the local checkout. languages.python.poetry.install.installRootPackage = true; @@ -163,8 +163,8 @@ # Create a postgres user called 'synapse_user' which has ownership # over the 'synapse' database. services.postgres.initialScript = '' - CREATE USER synapse_user; - ALTER DATABASE synapse OWNER TO synapse_user; + CREATE USER synapse_user; + ALTER DATABASE synapse OWNER TO synapse_user; ''; # Redis is needed in order to run Synapse in worker mode. diff --git a/mypy.ini b/mypy.ini index 3fca15c01b..cf64248cc5 100644 --- a/mypy.ini +++ b/mypy.ini @@ -26,7 +26,7 @@ strict_equality = True # Run mypy type checking with the minimum supported Python version to catch new usage # that isn't backwards-compatible (types, overloads, etc). -python_version = 3.8 +python_version = 3.9 files = docker/, diff --git a/poetry.lock b/poetry.lock index f311e787f9..16b7dc504e 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.4 and should not be changed by hand. [[package]] name = "annotated-types" @@ -11,9 +11,6 @@ files = [ {file = "annotated_types-0.7.0.tar.gz", hash = "sha256:aff07c09a53a08bc8cfccb9c85b05f1aa9a2a6f23728d790723543408344ce89"}, ] -[package.dependencies] -typing-extensions = {version = ">=4.0.0", markers = "python_version < \"3.9\""} - [[package]] name = "attrs" version = "24.2.0" @@ -874,9 +871,7 @@ files = [ [package.dependencies] attrs = ">=22.2.0" -importlib-resources = {version = ">=1.4.0", markers = "python_version < \"3.9\""} jsonschema-specifications = ">=2023.03.6" -pkgutil-resolve-name = {version = ">=1.3.10", markers = "python_version < \"3.9\""} referencing = ">=0.28.4" rpds-py = ">=0.7.1" @@ -896,7 +891,6 @@ files = [ ] [package.dependencies] -importlib-resources = {version = ">=1.4.0", markers = "python_version < \"3.9\""} referencing = ">=0.28.0" [[package]] @@ -912,7 +906,6 @@ files = [ [package.dependencies] importlib-metadata = {version = ">=4.11.4", markers = "python_version < \"3.12\""} -importlib-resources = {version = "*", markers = "python_version < \"3.9\""} "jaraco.classes" = "*" jeepney = {version = ">=0.4.2", markers = "sys_platform == \"linux\""} pywin32-ctypes = {version = ">=0.2.0", markers = "sys_platform == \"win32\""} @@ -1380,17 +1373,17 @@ files = [ [[package]] name = "mypy-zope" -version = "1.0.7" +version = "1.0.8" description = "Plugin for mypy to support zope interfaces" optional = false python-versions = "*" files = [ - {file = "mypy_zope-1.0.7-py3-none-any.whl", hash = "sha256:f19de249574319d81083b15f8a022c6b15583582f23340a860922141f1b651ca"}, - {file = "mypy_zope-1.0.7.tar.gz", hash = "sha256:32a79ce78647c0bea61e7e0c0eb1233fcb97bb94e8950cca73f17d3419c602f7"}, + {file = "mypy_zope-1.0.8-py3-none-any.whl", hash = "sha256:8794a77dae0c7e2f28b8ac48569091310b3ee45bb9d6cd4797dcb837c40f9976"}, + {file = "mypy_zope-1.0.8.tar.gz", hash = "sha256:854303a95aefc4289e8a0796808e002c2c7ecde0a10a8f7b8f48092f94ef9b9f"}, ] [package.dependencies] -mypy = ">=1.0.0,<1.12.0" +mypy = ">=1.0.0,<1.13.0" "zope.interface" = "*" "zope.schema" = "*" @@ -1451,13 +1444,13 @@ dev = ["jinja2"] [[package]] name = "phonenumbers" -version = "8.13.48" +version = "8.13.49" description = "Python version of Google's common library for parsing, formatting, storing and validating international phone numbers." 
optional = false python-versions = "*" files = [ - {file = "phonenumbers-8.13.48-py2.py3-none-any.whl", hash = "sha256:5c51939acefa390eb74119750afb10a85d3c628dc83fd62c52d6f532fcf5d205"}, - {file = "phonenumbers-8.13.48.tar.gz", hash = "sha256:62d8df9b0f3c3c41571c6b396f044ddd999d61631534001b8be7fdf7ba1b18f3"}, + {file = "phonenumbers-8.13.49-py2.py3-none-any.whl", hash = "sha256:e17140955ab3d8f9580727372ea64c5ada5327932d6021ef6fd203c3db8c8139"}, + {file = "phonenumbers-8.13.49.tar.gz", hash = "sha256:e608ccb61f0bd42e6db1d2c421f7c22186b88f494870bf40aa31d1a2718ab0ae"}, ] [[package]] @@ -1571,17 +1564,6 @@ files = [ [package.extras] testing = ["pytest", "pytest-cov"] -[[package]] -name = "pkgutil-resolve-name" -version = "1.3.10" -description = "Resolve a name to an object." -optional = false -python-versions = ">=3.6" -files = [ - {file = "pkgutil_resolve_name-1.3.10-py3-none-any.whl", hash = "sha256:ca27cc078d25c5ad71a9de0a7a330146c4e014c2462d9af19c6b828280649c5e"}, - {file = "pkgutil_resolve_name-1.3.10.tar.gz", hash = "sha256:357d6c9e6a755653cfd78893817c0853af365dd51ec97f3d358a819373bbd174"}, -] - [[package]] name = "prometheus-client" version = "0.21.0" @@ -1948,7 +1930,6 @@ files = [ [package.dependencies] cryptography = ">=3.1" defusedxml = "*" -importlib-resources = {version = "*", markers = "python_version < \"3.9\""} pyopenssl = "*" python-dateutil = "*" pytz = "*" @@ -2164,7 +2145,6 @@ files = [ [package.dependencies] markdown-it-py = ">=2.2.0,<3.0.0" pygments = ">=2.13.0,<3.0.0" -typing-extensions = {version = ">=4.0.0,<5.0", markers = "python_version < \"3.9\""} [package.extras] jupyter = ["ipywidgets (>=7.5.1,<9)"] @@ -2277,29 +2257,29 @@ files = [ [[package]] name = "ruff" -version = "0.7.1" +version = "0.7.2" description = "An extremely fast Python linter and code formatter, written in Rust." 
optional = false python-versions = ">=3.7" files = [ - {file = "ruff-0.7.1-py3-none-linux_armv6l.whl", hash = "sha256:cb1bc5ed9403daa7da05475d615739cc0212e861b7306f314379d958592aaa89"}, - {file = "ruff-0.7.1-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:27c1c52a8d199a257ff1e5582d078eab7145129aa02721815ca8fa4f9612dc35"}, - {file = "ruff-0.7.1-py3-none-macosx_11_0_arm64.whl", hash = "sha256:588a34e1ef2ea55b4ddfec26bbe76bc866e92523d8c6cdec5e8aceefeff02d99"}, - {file = "ruff-0.7.1-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:94fc32f9cdf72dc75c451e5f072758b118ab8100727168a3df58502b43a599ca"}, - {file = "ruff-0.7.1-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:985818742b833bffa543a84d1cc11b5e6871de1b4e0ac3060a59a2bae3969250"}, - {file = "ruff-0.7.1-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:32f1e8a192e261366c702c5fb2ece9f68d26625f198a25c408861c16dc2dea9c"}, - {file = "ruff-0.7.1-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:699085bf05819588551b11751eff33e9ca58b1b86a6843e1b082a7de40da1565"}, - {file = "ruff-0.7.1-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:344cc2b0814047dc8c3a8ff2cd1f3d808bb23c6658db830d25147339d9bf9ea7"}, - {file = "ruff-0.7.1-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:4316bbf69d5a859cc937890c7ac7a6551252b6a01b1d2c97e8fc96e45a7c8b4a"}, - {file = "ruff-0.7.1-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:79d3af9dca4c56043e738a4d6dd1e9444b6d6c10598ac52d146e331eb155a8ad"}, - {file = "ruff-0.7.1-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:c5c121b46abde94a505175524e51891f829414e093cd8326d6e741ecfc0a9112"}, - {file = "ruff-0.7.1-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:8422104078324ea250886954e48f1373a8fe7de59283d747c3a7eca050b4e378"}, - {file = "ruff-0.7.1-py3-none-musllinux_1_2_i686.whl", hash = "sha256:56aad830af8a9db644e80098fe4984a948e2b6fc2e73891538f43bbe478461b8"}, - {file = "ruff-0.7.1-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:658304f02f68d3a83c998ad8bf91f9b4f53e93e5412b8f2388359d55869727fd"}, - {file = "ruff-0.7.1-py3-none-win32.whl", hash = "sha256:b517a2011333eb7ce2d402652ecaa0ac1a30c114fbbd55c6b8ee466a7f600ee9"}, - {file = "ruff-0.7.1-py3-none-win_amd64.whl", hash = "sha256:f38c41fcde1728736b4eb2b18850f6d1e3eedd9678c914dede554a70d5241307"}, - {file = "ruff-0.7.1-py3-none-win_arm64.whl", hash = "sha256:19aa200ec824c0f36d0c9114c8ec0087082021732979a359d6f3c390a6ff2a37"}, - {file = "ruff-0.7.1.tar.gz", hash = "sha256:9d8a41d4aa2dad1575adb98a82870cf5db5f76b2938cf2206c22c940034a36f4"}, + {file = "ruff-0.7.2-py3-none-linux_armv6l.whl", hash = "sha256:b73f873b5f52092e63ed540adefc3c36f1f803790ecf2590e1df8bf0a9f72cb8"}, + {file = "ruff-0.7.2-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:5b813ef26db1015953daf476202585512afd6a6862a02cde63f3bafb53d0b2d4"}, + {file = "ruff-0.7.2-py3-none-macosx_11_0_arm64.whl", hash = "sha256:853277dbd9675810c6826dad7a428d52a11760744508340e66bf46f8be9701d9"}, + {file = "ruff-0.7.2-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:21aae53ab1490a52bf4e3bf520c10ce120987b047c494cacf4edad0ba0888da2"}, + {file = "ruff-0.7.2-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:ccc7e0fc6e0cb3168443eeadb6445285abaae75142ee22b2b72c27d790ab60ba"}, + {file = "ruff-0.7.2-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = 
"sha256:fd77877a4e43b3a98e5ef4715ba3862105e299af0c48942cc6d51ba3d97dc859"}, + {file = "ruff-0.7.2-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:e00163fb897d35523c70d71a46fbaa43bf7bf9af0f4534c53ea5b96b2e03397b"}, + {file = "ruff-0.7.2-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:f3c54b538633482dc342e9b634d91168fe8cc56b30a4b4f99287f4e339103e88"}, + {file = "ruff-0.7.2-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:7b792468e9804a204be221b14257566669d1db5c00d6bb335996e5cd7004ba80"}, + {file = "ruff-0.7.2-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:dba53ed84ac19ae4bfb4ea4bf0172550a2285fa27fbb13e3746f04c80f7fa088"}, + {file = "ruff-0.7.2-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:b19fafe261bf741bca2764c14cbb4ee1819b67adb63ebc2db6401dcd652e3748"}, + {file = "ruff-0.7.2-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:28bd8220f4d8f79d590db9e2f6a0674f75ddbc3847277dd44ac1f8d30684b828"}, + {file = "ruff-0.7.2-py3-none-musllinux_1_2_i686.whl", hash = "sha256:9fd67094e77efbea932e62b5d2483006154794040abb3a5072e659096415ae1e"}, + {file = "ruff-0.7.2-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:576305393998b7bd6c46018f8104ea3a9cb3fa7908c21d8580e3274a3b04b691"}, + {file = "ruff-0.7.2-py3-none-win32.whl", hash = "sha256:fa993cfc9f0ff11187e82de874dfc3611df80852540331bc85c75809c93253a8"}, + {file = "ruff-0.7.2-py3-none-win_amd64.whl", hash = "sha256:dd8800cbe0254e06b8fec585e97554047fb82c894973f7ff18558eee33d1cb88"}, + {file = "ruff-0.7.2-py3-none-win_arm64.whl", hash = "sha256:bb8368cd45bba3f57bb29cbb8d64b4a33f8415d0149d2655c5c8539452ce7760"}, + {file = "ruff-0.7.2.tar.gz", hash = "sha256:2b14e77293380e475b4e3a7a368e14549288ed2931fce259a6f99978669e844f"}, ] [[package]] @@ -3121,5 +3101,5 @@ user-search = ["pyicu"] [metadata] lock-version = "2.0" -python-versions = "^3.8.0" -content-hash = "aa1f6d97809596c23a6d160c0c5804971dad0ba49e34b137bbfb79df038fe6f0" +python-versions = "^3.9.0" +content-hash = "0cd942a5193d01cbcef135a0bebd3fa0f12f7dbc63899d6f1c301e0649e9d902" diff --git a/pyproject.toml b/pyproject.toml index 93e9f38ca9..13de146b4e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,7 +36,7 @@ [tool.ruff] line-length = 88 -target-version = "py38" +target-version = "py39" [tool.ruff.lint] # See https://beta.ruff.rs/docs/rules/#error-e @@ -155,7 +155,7 @@ synapse_review_recent_signups = "synapse._scripts.review_recent_signups:main" update_synapse_database = "synapse._scripts.update_synapse_database:main" [tool.poetry.dependencies] -python = "^3.8.0" +python = "^3.9.0" # Mandatory Dependencies # ---------------------- @@ -178,7 +178,7 @@ Twisted = {extras = ["tls"], version = ">=18.9.0"} treq = ">=15.1" # Twisted has required pyopenssl 16.0 since about Twisted 16.6. pyOpenSSL = ">=16.0.0" -PyYAML = ">=3.13" +PyYAML = ">=5.3" pyasn1 = ">=0.1.9" pyasn1-modules = ">=0.0.7" bcrypt = ">=3.1.7" @@ -241,7 +241,7 @@ authlib = { version = ">=0.15.1", optional = true } # `contrib/systemd/log_config.yaml`. # Note: systemd-python 231 appears to have been yanked from pypi systemd-python = { version = ">=231", optional = true } -lxml = { version = ">=4.2.0", optional = true } +lxml = { version = ">=4.5.2", optional = true } sentry-sdk = { version = ">=0.7.2", optional = true } opentracing = { version = ">=2.2.0", optional = true } jaeger-client = { version = ">=4.0.0", optional = true } @@ -320,7 +320,7 @@ all = [ # failing on new releases. 
Keeping lower bounds loose here means that dependabot # can bump versions without having to update the content-hash in the lockfile. # This helps prevents merge conflicts when running a batch of dependabot updates. -ruff = "0.7.1" +ruff = "0.7.2" # Type checking only works with the pydantic.v1 compat module from pydantic v2 pydantic = "^2" @@ -370,7 +370,7 @@ tomli = ">=1.2.3" # runtime errors caused by build system changes. # We are happy to raise these upper bounds upon request, # provided we check that it's safe to do so (i.e. that CI passes). -requires = ["poetry-core>=1.1.0,<=1.9.0", "setuptools_rust>=1.3,<=1.8.1"] +requires = ["poetry-core>=1.1.0,<=1.9.1", "setuptools_rust>=1.3,<=1.8.1"] build-backend = "poetry.core.masonry.api" @@ -378,13 +378,13 @@ build-backend = "poetry.core.masonry.api" # Skip unsupported platforms (by us or by Rust). # See https://cibuildwheel.readthedocs.io/en/stable/options/#build-skip for the list of build targets. # We skip: -# - CPython 3.6 and 3.7: EOLed -# - PyPy 3.7: we only support Python 3.8+ +# - CPython 3.6, 3.7 and 3.8: EOLed +# - PyPy 3.7 and 3.8: we only support Python 3.9+ # - musllinux i686: excluded to reduce number of wheels we build. # c.f. https://github.com/matrix-org/synapse/pull/12595#discussion_r963107677 # - PyPy on Aarch64 and musllinux on aarch64: too slow to build. # c.f. https://github.com/matrix-org/synapse/pull/14259 -skip = "cp36* cp37* pp37* *-musllinux_i686 pp*aarch64 *-musllinux_aarch64" +skip = "cp36* cp37* cp38* pp37* pp38* *-musllinux_i686 pp*aarch64 *-musllinux_aarch64" # We need a rust compiler before-all = "curl https://sh.rustup.rs -sSf | sh -s -- --default-toolchain stable -y --profile minimal" diff --git a/scripts-dev/build_debian_packages.py b/scripts-dev/build_debian_packages.py index 88c8419400..6ee695b2ba 100755 --- a/scripts-dev/build_debian_packages.py +++ b/scripts-dev/build_debian_packages.py @@ -28,9 +28,8 @@ from typing import Collection, Optional, Sequence, Set # example) DISTS = ( "debian:bullseye", # (EOL ~2024-07) (our EOL forced by Python 3.9 is 2025-10-05) - "debian:bookworm", # (EOL not specified yet) (our EOL forced by Python 3.11 is 2027-10-24) - "debian:sid", # (EOL not specified yet) (our EOL forced by Python 3.11 is 2027-10-24) - "ubuntu:focal", # 20.04 LTS (EOL 2025-04) (our EOL forced by Python 3.8 is 2024-10-14) + "debian:bookworm", # (EOL 2026-06) (our EOL forced by Python 3.11 is 2027-10-24) + "debian:sid", # (rolling distro, no EOL) "ubuntu:jammy", # 22.04 LTS (EOL 2027-04) (our EOL forced by Python 3.10 is 2026-10-04) "ubuntu:noble", # 24.04 LTS (EOL 2029-06) "ubuntu:oracular", # 24.10 (EOL 2025-07) diff --git a/synapse/__init__.py b/synapse/__init__.py index 73b92f12be..e7784ac5d7 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -39,8 +39,8 @@ ImageFile.LOAD_TRUNCATED_IMAGES = True # Note that we use an (unneeded) variable here so that pyupgrade doesn't nuke the # if-statement completely. py_version = sys.version_info -if py_version < (3, 8): - print("Synapse requires Python 3.8 or above.") +if py_version < (3, 9): + print("Synapse requires Python 3.9 or above.") sys.exit(1) # Allow using the asyncio reactor via env var. 
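The guard at the end of synapse/__init__.py above works because `sys.version_info` compares element-wise against a plain tuple. A self-contained illustration of the same pattern (mirroring, not copying, the hunk above):

import sys


def require_python(minimum=(3, 9)) -> None:
    # Element-wise tuple comparison: (3, 8, 20) < (3, 9) is True, while
    # (3, 9, 0) < (3, 9) is False, so patch releases never trip the check.
    if sys.version_info < minimum:
        print("Synapse requires Python %d.%d or above." % minimum[:2])
        sys.exit(1)


if __name__ == "__main__":
    require_python()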
diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index c98b8f4864..d453f28133 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -453,3 +453,6 @@ class ExperimentalConfig(Config): # MSC4210: Remove legacy mentions self.msc4210_enabled: bool = experimental.get("msc4210_enabled", False) + + # MSC4222: Adding `state_after` to sync v2 + self.msc4222_enabled: bool = experimental.get("msc4222_enabled", False) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index f78e66ad0a..315461fefb 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -615,7 +615,7 @@ class E2eKeysHandler: 3. Attempt to fetch fallback keys from the database. Args: - local_query: An iterable of tuples of (user ID, device ID, algorithm). + local_query: An iterable of tuples of (user ID, device ID, algorithm, number of keys). always_include_fallback_keys: True to always include fallback keys. Returns: diff --git a/synapse/handlers/sliding_sync/__init__.py b/synapse/handlers/sliding_sync/__init__.py index a1a6728fb9..85cfbc6dbf 100644 --- a/synapse/handlers/sliding_sync/__init__.py +++ b/synapse/handlers/sliding_sync/__init__.py @@ -12,6 +12,7 @@ # . # +import itertools import logging from itertools import chain from typing import TYPE_CHECKING, AbstractSet, Dict, List, Mapping, Optional, Set, Tuple @@ -79,6 +80,15 @@ sync_processing_time = Histogram( ["initial"], ) +# Limit the number of state_keys we should remember sending down the connection for each +# (room_id, user_id). We don't want to store and pull out too much data in the database. +# +# 100 is an arbitrary but small-ish number. The idea is that we probably won't send down +# too many redundant member state events (that the client already knows about) for a +# given ongoing conversation if we keep 100 around. Most rooms don't have 100 members +# anyway and it takes a while to cycle through 100 members. +MAX_NUMBER_PREVIOUS_STATE_KEYS_TO_REMEMBER = 100 + class SlidingSyncHandler: def __init__(self, hs: "HomeServer"): @@ -873,6 +883,14 @@ class SlidingSyncHandler: # # Calculate the `StateFilter` based on the `required_state` for the room required_state_filter = StateFilter.none() + # The requested `required_state_map` with the lazy membership expanded and + # `$ME` replaced with the user's ID. This allows us to see what membership we've + # sent down to the client in the next request. + # + # Make a copy so we can modify it. Still need to be careful to make a copy of + # the state key sets if we want to add/remove from them. We could make a deep + # copy but this saves us some work. + expanded_required_state_map = dict(room_sync_config.required_state_map) if room_membership_for_user_at_to_token.membership not in ( Membership.INVITE, Membership.KNOCK, @@ -938,21 +956,48 @@ class SlidingSyncHandler: ): lazy_load_room_members = True # Everyone in the timeline is relevant + # + # FIXME: We probably also care about invite, ban, kick, targets, etc + # but the spec only mentions "senders". timeline_membership: Set[str] = set() if timeline_events is not None: for timeline_event in timeline_events: timeline_membership.add(timeline_event.sender) + # Update the required state filter so we pick up the new + # membership for user_id in timeline_membership: required_state_types.append( (EventTypes.Member, user_id) ) - # FIXME: We probably also care about invite, ban, kick, targets, etc - # but the spec only mentions "senders". 
+ # Add an explicit entry for each user in the timeline + # + # Make a new set or copy of the state key set so we can + # modify it without affecting the original + # `required_state_map` + expanded_required_state_map[EventTypes.Member] = ( + expanded_required_state_map.get( + EventTypes.Member, set() + ) + | timeline_membership + ) elif state_key == StateValues.ME: num_others += 1 required_state_types.append((state_type, user.to_string())) + # Replace `$ME` with the user's ID so we can deduplicate + # when someone requests the same state with `$ME` or with + # their user ID. + # + # Make a new set or copy of the state key set so we can + # modify it without affecting the original + # `required_state_map` + expanded_required_state_map[EventTypes.Member] = ( + expanded_required_state_map.get( + EventTypes.Member, set() + ) + | {user.to_string()} + ) else: num_others += 1 required_state_types.append((state_type, state_key)) @@ -1016,8 +1061,8 @@ class SlidingSyncHandler: changed_required_state_map, added_state_filter = ( _required_state_changes( user.to_string(), - previous_room_config=prev_room_sync_config, - room_sync_config=room_sync_config, + prev_required_state_map=prev_room_sync_config.required_state_map, + request_required_state_map=expanded_required_state_map, state_deltas=room_state_delta_id_map, ) ) @@ -1131,7 +1176,9 @@ class SlidingSyncHandler: # sensible order again. bump_stamp = 0 - room_sync_required_state_map_to_persist = room_sync_config.required_state_map + room_sync_required_state_map_to_persist: Mapping[str, AbstractSet[str]] = ( + expanded_required_state_map + ) if changed_required_state_map: room_sync_required_state_map_to_persist = changed_required_state_map @@ -1185,7 +1232,10 @@ class SlidingSyncHandler: ) else: - new_connection_state.room_configs[room_id] = room_sync_config + new_connection_state.room_configs[room_id] = RoomSyncConfig( + timeline_limit=room_sync_config.timeline_limit, + required_state_map=room_sync_required_state_map_to_persist, + ) set_tag(SynapseTags.RESULT_PREFIX + "initial", initial) @@ -1320,8 +1370,8 @@ class SlidingSyncHandler: def _required_state_changes( user_id: str, *, - previous_room_config: "RoomSyncConfig", - room_sync_config: RoomSyncConfig, + prev_required_state_map: Mapping[str, AbstractSet[str]], + request_required_state_map: Mapping[str, AbstractSet[str]], state_deltas: StateMap[str], ) -> Tuple[Optional[Mapping[str, AbstractSet[str]]], StateFilter]: """Calculates the changes between the required state room config from the @@ -1342,10 +1392,6 @@ def _required_state_changes( and the state filter to use to fetch extra current state that we need to return. """ - - prev_required_state_map = previous_room_config.required_state_map - request_required_state_map = room_sync_config.required_state_map - if prev_required_state_map == request_required_state_map: # There has been no change. Return immediately. return None, StateFilter.none() @@ -1378,12 +1424,19 @@ def _required_state_changes( # client. Passed to `StateFilter.from_types(...)` added: List[Tuple[str, Optional[str]]] = [] + # Convert the list of state deltas to map from type to state_keys that have + # changed. + changed_types_to_state_keys: Dict[str, Set[str]] = {} + for event_type, state_key in state_deltas: + changed_types_to_state_keys.setdefault(event_type, set()).add(state_key) + # First we calculate what, if anything, has been *added*. 
for event_type in ( prev_required_state_map.keys() | request_required_state_map.keys() ): old_state_keys = prev_required_state_map.get(event_type, set()) request_state_keys = request_required_state_map.get(event_type, set()) + changed_state_keys = changed_types_to_state_keys.get(event_type, set()) if old_state_keys == request_state_keys: # No change to this type @@ -1393,8 +1446,55 @@ def _required_state_changes( # Nothing *added*, so we skip. Removals happen below. continue - # Always update changes to include the newly added keys - changes[event_type] = request_state_keys + # We only remove state keys from the effective state if they've been + # removed from the request *and* the state has changed. This ensures + # that if a client removes and then re-adds a state key, we only send + # down the associated current state event if its changed (rather than + # sending down the same event twice). + invalidated_state_keys = ( + old_state_keys - request_state_keys + ) & changed_state_keys + + # Figure out which state keys we should remember sending down the connection + inheritable_previous_state_keys = ( + # Retain the previous state_keys that we've sent down before. + # Wildcard and lazy state keys are not sticky from previous requests. + (old_state_keys - {StateValues.WILDCARD, StateValues.LAZY}) + - invalidated_state_keys + ) + + # Always update changes to include the newly added keys (we've expanded the set + # of state keys), use the new requested set with whatever hasn't been + # invalidated from the previous set. + changes[event_type] = request_state_keys | inheritable_previous_state_keys + # Limit the number of state_keys we should remember sending down the connection + # for each (room_id, user_id). We don't want to store and pull out too much data + # in the database. This is a happy-medium between remembering nothing and + # everything. We can avoid sending redundant state down the connection most of + # the time given that most rooms don't have 100 members anyway and it takes a + # while to cycle through 100 members. + # + # Only remember up to (MAX_NUMBER_PREVIOUS_STATE_KEYS_TO_REMEMBER) + if len(changes[event_type]) > MAX_NUMBER_PREVIOUS_STATE_KEYS_TO_REMEMBER: + # Reset back to only the requested state keys + changes[event_type] = request_state_keys + + # Skip if there isn't any room to fill in the rest with previous state keys + if len(request_state_keys) < MAX_NUMBER_PREVIOUS_STATE_KEYS_TO_REMEMBER: + # Fill the rest with previous state_keys. Ideally, we could sort + # these by recency but it's just a set so just pick an arbitrary + # subset (good enough). + changes[event_type] = changes[event_type] | set( + itertools.islice( + inheritable_previous_state_keys, + # Just taking the difference isn't perfect as there could be + # overlap in the keys between the requested and previous but we + # will decide to just take the easy route for now and avoid + # additional set operations to figure it out. + MAX_NUMBER_PREVIOUS_STATE_KEYS_TO_REMEMBER + - len(request_state_keys), + ) + ) if StateValues.WILDCARD in old_state_keys: # We were previously fetching everything for this type, so we don't need to @@ -1421,12 +1521,6 @@ def _required_state_changes( added_state_filter = StateFilter.from_types(added) - # Convert the list of state deltas to map from type to state_keys that have - # changed. 
- changed_types_to_state_keys: Dict[str, Set[str]] = {} - for event_type, state_key in state_deltas: - changed_types_to_state_keys.setdefault(event_type, set()).add(state_key) - # Figure out what changes we need to apply to the effective required state # config. for event_type, changed_state_keys in changed_types_to_state_keys.items(): @@ -1437,15 +1531,23 @@ def _required_state_changes( # No change. continue + # If we see the `user_id` as a state_key, also add "$ME" to the list of state + # that has changed to account for people requesting `required_state` with `$ME` + # or their user ID. + if user_id in changed_state_keys: + changed_state_keys.add(StateValues.ME) + + # We only remove state keys from the effective state if they've been + # removed from the request *and* the state has changed. This ensures + # that if a client removes and then re-adds a state key, we only send + # down the associated current state event if its changed (rather than + # sending down the same event twice). + invalidated_state_keys = ( + old_state_keys - request_state_keys + ) & changed_state_keys + + # We've expanded the set of state keys, ... (already handled above) if request_state_keys - old_state_keys: - # We've expanded the set of state keys, so we just clobber the - # current set with the new set. - # - # We could also ensure that we keep entries where the state hasn't - # changed, but are no longer in the requested required state, but - # that's a sufficient edge case that we can ignore (as its only a - # performance optimization). - changes[event_type] = request_state_keys continue old_state_key_wildcard = StateValues.WILDCARD in old_state_keys @@ -1467,11 +1569,6 @@ def _required_state_changes( changes[event_type] = request_state_keys continue - # Handle "$ME" values by adding "$ME" if the state key matches the user - # ID. - if user_id in changed_state_keys: - changed_state_keys.add(StateValues.ME) - # At this point there are no wildcards and no additions to the set of # state keys requested, only deletions. # @@ -1480,9 +1577,8 @@ def _required_state_changes( # that if a client removes and then re-adds a state key, we only send # down the associated current state event if its changed (rather than # sending down the same event twice). - invalidated = (old_state_keys - request_state_keys) & changed_state_keys - if invalidated: - changes[event_type] = old_state_keys - invalidated + if invalidated_state_keys: + changes[event_type] = old_state_keys - invalidated_state_keys if changes: # Update the required state config based on the changes. diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index f4ea90fbd7..df9a088063 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -143,6 +143,7 @@ class SyncConfig: filter_collection: FilterCollection is_guest: bool device_id: Optional[str] + use_state_after: bool @attr.s(slots=True, frozen=True, auto_attribs=True) @@ -1141,6 +1142,7 @@ class SyncHandler: since_token: Optional[StreamToken], end_token: StreamToken, full_state: bool, + joined: bool, ) -> MutableStateMap[EventBase]: """Works out the difference in state between the end of the previous sync and the start of the timeline. @@ -1155,6 +1157,7 @@ class SyncHandler: the point just after their leave event. full_state: Whether to force returning the full state. `lazy_load_members` still applies when `full_state` is `True`. + joined: whether the user is currently joined to the room Returns: The state to return in the sync response for the room. 
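The hunks that follow make `_compute_state_delta_for_full_sync` derive the MSC4222 state from the *current* room state by undoing the deltas between `end_token` and "now". The rollback trick is worth seeing in isolation: iterating the deltas newest-first means that when one (type, state_key) changed several times, the value that survives is the earliest delta's `prev_event_id`, i.e. the state as of `end_token`. A standalone sketch (names are illustrative, not Synapse's):

from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

StateKey = Tuple[str, str]  # (event_type, state_key)


@dataclass
class StateDelta:
    key: StateKey
    event_id: Optional[str]  # None: this delta removed the entry
    prev_event_id: Optional[str]  # None: this delta created the entry


def rollback_state(
    current_state: Dict[StateKey, str],
    deltas_in_stream_order: List[StateDelta],
) -> Dict[StateKey, str]:
    state = dict(current_state)
    # Newest first: a later overwrite is undone before an earlier one, so
    # the final value kept is the earliest delta's prev_event_id.
    for delta in reversed(deltas_in_stream_order):
        if delta.prev_event_id is not None:
            state[delta.key] = delta.prev_event_id
        else:
            # The entry did not exist before this delta.
            state.pop(delta.key, None)
    return state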
@@ -1230,11 +1233,12 @@
         if full_state:
             state_ids = await self._compute_state_delta_for_full_sync(
                 room_id,
-                sync_config.user,
+                sync_config,
                 batch,
                 end_token,
                 members_to_fetch,
                 timeline_state,
+                joined,
             )
         else:
             # If this is an initial sync then full_state should be set, and
@@ -1244,6 +1248,7 @@
             state_ids = await self._compute_state_delta_for_incremental_sync(
                 room_id,
+                sync_config,
                 batch,
                 since_token,
                 end_token,
@@ -1316,20 +1321,24 @@
     async def _compute_state_delta_for_full_sync(
         self,
         room_id: str,
-        syncing_user: UserID,
+        sync_config: SyncConfig,
         batch: TimelineBatch,
         end_token: StreamToken,
         members_to_fetch: Optional[Set[str]],
         timeline_state: StateMap[str],
+        joined: bool,
     ) -> StateMap[str]:
         """Calculate the state events to be included in a full sync response.
 
         As with `_compute_state_delta_for_incremental_sync`, the result will include
         the membership events for the senders of each event in `members_to_fetch`.
 
+        Note that whether this returns the state at the start or the end of the
+        batch depends on `sync_config.use_state_after` (c.f. MSC4222).
+
         Args:
             room_id: The room we are calculating for.
-            syncing_user: The user that is calling `/sync`.
+            sync_config: The sync configuration of the user calling `/sync`.
             batch: The timeline batch for the room that will be sent to the user.
             end_token: Token of the end of the current batch. Normally this will be
                 the same as the global "now_token", but if the user has left the room,
@@ -1338,10 +1347,11 @@
                 events in the timeline.
             timeline_state: The contribution to the room state from state events in
                 `batch`. Only contains the last event for any given state key.
+            joined: whether the user is currently joined to the room
 
         Returns:
             A map from (type, state_key) to event_id, for each event that we believe
-            should be included in the `state` part of the sync response.
+            should be included in the `state` or `state_after` part of the sync response.
         """
         if members_to_fetch is not None:
             # Lazy-loading of membership events is enabled.
             # is no guarantee that our membership will be in the auth events of
             # timeline events when the room is partial stated.
             state_filter = StateFilter.from_lazy_load_member_list(
-                members_to_fetch.union((syncing_user.to_string(),))
+                members_to_fetch.union((sync_config.user.to_string(),))
             )
 
             # We are happy to use partial state to compute the `/sync` response.
@@ -1373,6 +1383,61 @@
             await_full_state = True
             lazy_load_members = False
 
+        # Check if we are wanting to return the state at the start or end of the
+        # timeline. If at the end we can just use the current state.
+        if sync_config.use_state_after:
+            # If we're getting the state at the end of the timeline, we can just
+            # use the current state of the room (and roll back any changes
+            # between when we fetched the current state and `end_token`).
+            #
+            # For rooms we're not joined to, there might be a very large number
+            # of deltas between `end_token` and "now", and so instead we fetch
+            # the state at the end of the timeline.
+            if joined:
+                state_ids = await self._state_storage_controller.get_current_state_ids(
+                    room_id,
+                    state_filter=state_filter,
+                    await_full_state=await_full_state,
+                )
+
+                # Now roll back the state by looking at the state deltas between
+                # end_token and now.
+ deltas = await self.store.get_current_state_deltas_for_room( + room_id, + from_token=end_token.room_key, + to_token=self.store.get_room_max_token(), + ) + if deltas: + mutable_state_ids = dict(state_ids) + + # We iterate over the deltas backwards so that if there are + # multiple changes of the same type/state_key we'll + # correctly pick the earliest delta. + for delta in reversed(deltas): + if delta.prev_event_id: + mutable_state_ids[(delta.event_type, delta.state_key)] = ( + delta.prev_event_id + ) + elif (delta.event_type, delta.state_key) in mutable_state_ids: + mutable_state_ids.pop((delta.event_type, delta.state_key)) + + state_ids = mutable_state_ids + + return state_ids + + else: + # Just use state groups to get the state at the end of the + # timeline, i.e. the state at the leave/etc event. + state_at_timeline_end = ( + await self._state_storage_controller.get_state_ids_at( + room_id, + stream_position=end_token, + state_filter=state_filter, + await_full_state=await_full_state, + ) + ) + return state_at_timeline_end + state_at_timeline_end = await self._state_storage_controller.get_state_ids_at( room_id, stream_position=end_token, @@ -1405,6 +1470,7 @@ class SyncHandler: async def _compute_state_delta_for_incremental_sync( self, room_id: str, + sync_config: SyncConfig, batch: TimelineBatch, since_token: StreamToken, end_token: StreamToken, @@ -1419,8 +1485,12 @@ class SyncHandler: (`compute_state_delta`) is responsible for keeping track of which membership events we have already sent to the client, and hence ripping them out. + Note that whether this returns the state at the start or the end of the + batch depends on `sync_config.use_state_after` (c.f. MSC4222). + Args: room_id: The room we are calculating for. + sync_config batch: The timeline batch for the room that will be sent to the user. since_token: Token of the end of the previous batch. end_token: Token of the end of the current batch. Normally this will be @@ -1433,7 +1503,7 @@ class SyncHandler: Returns: A map from (type, state_key) to event_id, for each event that we believe - should be included in the `state` part of the sync response. + should be included in the `state` or `state_after` part of the sync response. """ if members_to_fetch is not None: # Lazy-loading is enabled. Only return the state that is needed. @@ -1445,6 +1515,51 @@ class SyncHandler: await_full_state = True lazy_load_members = False + # Check if we are wanting to return the state at the start or end of the + # timeline. If at the end we can just use the current state delta stream. + if sync_config.use_state_after: + delta_state_ids: MutableStateMap[str] = {} + + if members_to_fetch is not None: + # We're lazy-loading, so the client might need some more member + # events to understand the events in this timeline. So we always + # fish out all the member events corresponding to the timeline + # here. The caller will then dedupe any redundant ones. + member_ids = await self._state_storage_controller.get_current_state_ids( + room_id=room_id, + state_filter=StateFilter.from_types( + (EventTypes.Member, member) for member in members_to_fetch + ), + await_full_state=await_full_state, + ) + delta_state_ids.update(member_ids) + + # We don't do LL filtering for incremental syncs - see + # https://github.com/vector-im/riot-web/issues/7211#issuecomment-419976346 + # N.B. this slows down incr syncs as we are now processing way more + # state in the server than if we were LLing. + # + # i.e. 
we return all state deltas, including membership changes that + # we'd normally exclude due to LL. + deltas = await self.store.get_current_state_deltas_for_room( + room_id=room_id, + from_token=since_token.room_key, + to_token=end_token.room_key, + ) + for delta in deltas: + if delta.event_id is None: + # There was a state reset and this state entry is no longer + # present, but we have no way of informing the client about + # this, so we just skip it for now. + continue + + # Note that deltas are in stream ordering, so if there are + # multiple deltas for a given type/state_key we'll always pick + # the latest one. + delta_state_ids[(delta.event_type, delta.state_key)] = delta.event_id + + return delta_state_ids + # For a non-gappy sync if the events in the timeline are simply a linear # chain (i.e. no merging/branching of the graph), then we know the state # delta between the end of the previous sync and start of the new one is @@ -2867,6 +2982,7 @@ class SyncHandler: since_token, room_builder.end_token, full_state=full_state, + joined=room_builder.rtype == "joined", ) else: # An out of band room won't have any state changes. diff --git a/synapse/http/proxy.py b/synapse/http/proxy.py index 97aa429e7d..5cd990b0d0 100644 --- a/synapse/http/proxy.py +++ b/synapse/http/proxy.py @@ -51,25 +51,17 @@ logger = logging.getLogger(__name__) # "Hop-by-hop" headers (as opposed to "end-to-end" headers) as defined by RFC2616 # section 13.5.1 and referenced in RFC9110 section 7.6.1. These are meant to only be # consumed by the immediate recipient and not be forwarded on. -HOP_BY_HOP_HEADERS = { - "Connection", - "Keep-Alive", - "Proxy-Authenticate", - "Proxy-Authorization", - "TE", - "Trailers", - "Transfer-Encoding", - "Upgrade", +HOP_BY_HOP_HEADERS_LOWERCASE = { + "connection", + "keep-alive", + "proxy-authenticate", + "proxy-authorization", + "te", + "trailers", + "transfer-encoding", + "upgrade", } - -if hasattr(Headers, "_canonicalNameCaps"): - # Twisted < 24.7.0rc1 - _canonicalHeaderName = Headers()._canonicalNameCaps # type: ignore[attr-defined] -else: - # Twisted >= 24.7.0rc1 - # But note that `_encodeName` still exists on prior versions, - # it just encodes differently - _canonicalHeaderName = Headers()._encodeName +assert all(header.lower() == header for header in HOP_BY_HOP_HEADERS_LOWERCASE) def parse_connection_header_value( @@ -92,12 +84,12 @@ def parse_connection_header_value( Returns: The set of header names that should not be copied over from the remote response. - The keys are capitalized in canonical capitalization. + The keys are lowercased. """ extra_headers_to_remove: Set[str] = set() if connection_header_value: extra_headers_to_remove = { - _canonicalHeaderName(connection_option.strip()).decode("ascii") + connection_option.decode("ascii").strip().lower() for connection_option in connection_header_value.split(b",") } @@ -194,7 +186,7 @@ class ProxyResource(_AsyncResource): # The `Connection` header also defines which headers should not be copied over. connection_header = response_headers.getRawHeaders(b"connection") - extra_headers_to_remove = parse_connection_header_value( + extra_headers_to_remove_lowercase = parse_connection_header_value( connection_header[0] if connection_header else None ) @@ -202,10 +194,10 @@ class ProxyResource(_AsyncResource): for k, v in response_headers.getAllRawHeaders(): # Do not copy over any hop-by-hop headers. These are meant to only be # consumed by the immediate recipient and not be forwarded on. 
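+            # HTTP header names are case-insensitive (RFC 9110), so we compare
+            # lowercased names against the lowercased removal sets built above.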
- header_key = k.decode("ascii") + header_key_lowercase = k.decode("ascii").lower() if ( - header_key in HOP_BY_HOP_HEADERS - or header_key in extra_headers_to_remove + header_key_lowercase in HOP_BY_HOP_HEADERS_LOWERCASE + or header_key_lowercase in extra_headers_to_remove_lowercase ): continue diff --git a/synapse/logging/_remote.py b/synapse/logging/_remote.py index f047edee8e..ac34fa6525 100644 --- a/synapse/logging/_remote.py +++ b/synapse/logging/_remote.py @@ -39,7 +39,7 @@ from twisted.internet.endpoints import ( ) from twisted.internet.interfaces import ( IPushProducer, - IReactorTCP, + IReactorTime, IStreamClientEndpoint, ) from twisted.internet.protocol import Factory, Protocol @@ -113,7 +113,7 @@ class RemoteHandler(logging.Handler): port: int, maximum_buffer: int = 1000, level: int = logging.NOTSET, - _reactor: Optional[IReactorTCP] = None, + _reactor: Optional[IReactorTime] = None, ): super().__init__(level=level) self.host = host diff --git a/synapse/rest/admin/experimental_features.py b/synapse/rest/admin/experimental_features.py index d7913896d9..afb71f4a0f 100644 --- a/synapse/rest/admin/experimental_features.py +++ b/synapse/rest/admin/experimental_features.py @@ -43,12 +43,15 @@ class ExperimentalFeature(str, Enum): MSC3881 = "msc3881" MSC3575 = "msc3575" + MSC4222 = "msc4222" def is_globally_enabled(self, config: "HomeServerConfig") -> bool: if self is ExperimentalFeature.MSC3881: return config.experimental.msc3881_enabled if self is ExperimentalFeature.MSC3575: return config.experimental.msc3575_enabled + if self is ExperimentalFeature.MSC4222: + return config.experimental.msc4222_enabled assert_never(self) diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index 122708e933..5c62a74f41 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -152,6 +152,14 @@ class SyncRestServlet(RestServlet): filter_id = parse_string(request, "filter") full_state = parse_boolean(request, "full_state", default=False) + use_state_after = False + if await self.store.is_feature_enabled( + user.to_string(), ExperimentalFeature.MSC4222 + ): + use_state_after = parse_boolean( + request, "org.matrix.msc4222.use_state_after", default=False + ) + logger.debug( "/sync: user=%r, timeout=%r, since=%r, " "set_presence=%r, filter_id=%r, device_id=%r", @@ -184,6 +192,7 @@ class SyncRestServlet(RestServlet): full_state, device_id, last_ignore_accdata_streampos, + use_state_after, ) if filter_id is None: @@ -220,6 +229,7 @@ class SyncRestServlet(RestServlet): filter_collection=filter_collection, is_guest=requester.is_guest, device_id=device_id, + use_state_after=use_state_after, ) since_token = None @@ -258,7 +268,7 @@ class SyncRestServlet(RestServlet): # We know that the the requester has an access token since appservices # cannot use sync. 
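+        # `sync_config` is threaded through to the encoders below so that
+        # `encode_room` knows whether to emit the `state` field or the MSC4222
+        # `state_after` field.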
response_content = await self.encode_response( - time_now, sync_result, requester, filter_collection + time_now, sync_config, sync_result, requester, filter_collection ) logger.debug("Event formatting complete") @@ -268,6 +278,7 @@ class SyncRestServlet(RestServlet): async def encode_response( self, time_now: int, + sync_config: SyncConfig, sync_result: SyncResult, requester: Requester, filter: FilterCollection, @@ -292,7 +303,7 @@ class SyncRestServlet(RestServlet): ) joined = await self.encode_joined( - sync_result.joined, time_now, serialize_options + sync_config, sync_result.joined, time_now, serialize_options ) invited = await self.encode_invited( @@ -304,7 +315,7 @@ class SyncRestServlet(RestServlet): ) archived = await self.encode_archived( - sync_result.archived, time_now, serialize_options + sync_config, sync_result.archived, time_now, serialize_options ) logger.debug("building sync response dict") @@ -372,6 +383,7 @@ class SyncRestServlet(RestServlet): @trace_with_opname("sync.encode_joined") async def encode_joined( self, + sync_config: SyncConfig, rooms: List[JoinedSyncResult], time_now: int, serialize_options: SerializeEventConfig, @@ -380,6 +392,7 @@ class SyncRestServlet(RestServlet): Encode the joined rooms in a sync result Args: + sync_config rooms: list of sync results for rooms this user is joined to time_now: current time - used as a baseline for age calculations serialize_options: Event serializer options @@ -389,7 +402,11 @@ class SyncRestServlet(RestServlet): joined = {} for room in rooms: joined[room.room_id] = await self.encode_room( - room, time_now, joined=True, serialize_options=serialize_options + sync_config, + room, + time_now, + joined=True, + serialize_options=serialize_options, ) return joined @@ -477,6 +494,7 @@ class SyncRestServlet(RestServlet): @trace_with_opname("sync.encode_archived") async def encode_archived( self, + sync_config: SyncConfig, rooms: List[ArchivedSyncResult], time_now: int, serialize_options: SerializeEventConfig, @@ -485,6 +503,7 @@ class SyncRestServlet(RestServlet): Encode the archived rooms in a sync result Args: + sync_config rooms: list of sync results for rooms this user is joined to time_now: current time - used as a baseline for age calculations serialize_options: Event serializer options @@ -494,13 +513,18 @@ class SyncRestServlet(RestServlet): joined = {} for room in rooms: joined[room.room_id] = await self.encode_room( - room, time_now, joined=False, serialize_options=serialize_options + sync_config, + room, + time_now, + joined=False, + serialize_options=serialize_options, ) return joined async def encode_room( self, + sync_config: SyncConfig, room: Union[JoinedSyncResult, ArchivedSyncResult], time_now: int, joined: bool, @@ -508,6 +532,7 @@ class SyncRestServlet(RestServlet): ) -> JsonDict: """ Args: + sync_config room: sync result for a single room time_now: current time - used as a baseline for age calculations token_id: ID of the user's auth token - used for namespacing @@ -548,13 +573,20 @@ class SyncRestServlet(RestServlet): account_data = room.account_data + # We either include a `state` or `state_after` field depending on + # whether the client has opted in to the newer `state_after` behavior. 
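+        # For example, an opted-in client sees a room entry shaped roughly as
+        #     {"timeline": {...}, "org.matrix.msc4222.state_after": {"events": [...]}}
+        # instead of one carrying the usual "state" key.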
+ if sync_config.use_state_after: + state_key_name = "org.matrix.msc4222.state_after" + else: + state_key_name = "state" + result: JsonDict = { "timeline": { "events": serialized_timeline, "prev_batch": await room.timeline.prev_batch.to_string(self.store), "limited": room.timeline.limited, }, - "state": {"events": serialized_state}, + state_key_name: {"events": serialized_state}, "account_data": {"events": account_data}, } @@ -688,6 +720,7 @@ class SlidingSyncE2eeRestServlet(RestServlet): filter_collection=self.only_member_events_filter_collection, is_guest=requester.is_guest, device_id=device_id, + use_state_after=False, # We don't return any rooms so this flag is a no-op ) since_token = None diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py index 575aaf498b..1fbc49e7c5 100644 --- a/synapse/storage/databases/main/end_to_end_keys.py +++ b/synapse/storage/databases/main/end_to_end_keys.py @@ -99,6 +99,13 @@ class EndToEndKeyBackgroundStore(SQLBaseStore): unique=True, ) + self.db_pool.updates.register_background_index_update( + update_name="add_otk_ts_added_index", + index_name="e2e_one_time_keys_json_user_id_device_id_algorithm_ts_added_idx", + table="e2e_one_time_keys_json", + columns=("user_id", "device_id", "algorithm", "ts_added_ms"), + ) + class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorkerStore): def __init__( @@ -1122,7 +1129,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker """Take a list of one time keys out of the database. Args: - query_list: An iterable of tuples of (user ID, device ID, algorithm). + query_list: An iterable of tuples of (user ID, device ID, algorithm, number of keys). Returns: A tuple (results, missing) of: @@ -1310,9 +1317,14 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker OTK was found. """ + # Return the oldest keys from this device (based on `ts_added_ms`). + # Doing so means that keys are issued in the same order they were uploaded, + # which reduces the chances of a client expiring its copy of a (private) + # key while the public key is still on the server, waiting to be issued. sql = """ SELECT key_id, key_json FROM e2e_one_time_keys_json WHERE user_id = ? AND device_id = ? AND algorithm = ? + ORDER BY ts_added_ms LIMIT ? """ @@ -1354,13 +1366,22 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker A list of tuples (user_id, device_id, algorithm, key_id, key_json) for each OTK claimed. """ + # Find, delete, and return the oldest keys from each device (based on + # `ts_added_ms`). + # + # Doing so means that keys are issued in the same order they were uploaded, + # which reduces the chances of a client expiring its copy of a (private) + # key while the public key is still on the server, waiting to be issued. sql = """ WITH claims(user_id, device_id, algorithm, claim_count) AS ( VALUES ? 
), ranked_keys AS ( SELECT user_id, device_id, algorithm, key_id, claim_count, - ROW_NUMBER() OVER (PARTITION BY (user_id, device_id, algorithm)) AS r + ROW_NUMBER() OVER ( + PARTITION BY (user_id, device_id, algorithm) + ORDER BY ts_added_ms + ) AS r FROM e2e_one_time_keys_json JOIN claims USING (user_id, device_id, algorithm) ) diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 33569a4391..cc3ce0951e 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -2550,7 +2550,9 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore): still contains events with partial state. """ try: - async with self._un_partial_stated_rooms_stream_id_gen.get_next() as un_partial_state_room_stream_id: + async with ( + self._un_partial_stated_rooms_stream_id_gen.get_next() as un_partial_state_room_stream_id + ): await self.db_pool.runInteraction( "clear_partial_state_room", self._clear_partial_state_room_txn, diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index 60312d770d..42b3638e1c 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -681,7 +681,9 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): context: EventContext, ) -> None: """Update the state group for a partial state event""" - async with self._un_partial_stated_events_stream_id_gen.get_next() as un_partial_state_event_stream_id: + async with ( + self._un_partial_stated_events_stream_id_gen.get_next() as un_partial_state_event_stream_id + ): await self.db_pool.runInteraction( "update_state_for_partial_state_event", self._update_state_for_partial_state_event_txn, diff --git a/synapse/storage/schema/main/delta/88/03_add_otk_ts_added_index.sql b/synapse/storage/schema/main/delta/88/03_add_otk_ts_added_index.sql new file mode 100644 index 0000000000..7712ea68ad --- /dev/null +++ b/synapse/storage/schema/main/delta/88/03_add_otk_ts_added_index.sql @@ -0,0 +1,18 @@ +-- +-- This file is licensed under the Affero General Public License (AGPL) version 3. +-- +-- Copyright (C) 2024 New Vector, Ltd +-- +-- This program is free software: you can redistribute it and/or modify +-- it under the terms of the GNU Affero General Public License as +-- published by the Free Software Foundation, either version 3 of the +-- License, or (at your option) any later version. +-- +-- See the GNU Affero General Public License for more details: +-- . + + +-- Add an index on (user_id, device_id, algorithm, ts_added_ms) on e2e_one_time_keys_json, so that OTKs can +-- efficiently be issued in the same order they were uploaded. +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (8803, 'add_otk_ts_added_index', '{}'); diff --git a/synapse/util/wheel_timer.py b/synapse/util/wheel_timer.py index 44b109bdfd..95eb1d7185 100644 --- a/synapse/util/wheel_timer.py +++ b/synapse/util/wheel_timer.py @@ -47,7 +47,6 @@ class WheelTimer(Generic[T]): """ self.bucket_size: int = bucket_size self.entries: List[_Entry[T]] = [] - self.current_tick: int = 0 def insert(self, now: int, obj: T, then: int) -> None: """Inserts object into timer. 
@@ -78,11 +77,10 @@ class WheelTimer(Generic[T]): self.entries[max(min_key, then_key) - min_key].elements.add(obj) return - next_key = now_key + 1 if self.entries: - last_key = self.entries[-1].end_key + last_key = self.entries[-1].end_key + 1 else: - last_key = next_key + last_key = now_key + 1 # Handle the case when `then` is in the past and `entries` is empty. then_key = max(last_key, then_key) diff --git a/tests/handlers/test_e2e_keys.py b/tests/handlers/test_e2e_keys.py index 8a3dfdcf75..bca314db83 100644 --- a/tests/handlers/test_e2e_keys.py +++ b/tests/handlers/test_e2e_keys.py @@ -151,18 +151,30 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase): def test_claim_one_time_key(self) -> None: local_user = "@boris:" + self.hs.hostname device_id = "xyz" - keys = {"alg1:k1": "key1"} - res = self.get_success( self.handler.upload_keys_for_user( - local_user, device_id, {"one_time_keys": keys} + local_user, device_id, {"one_time_keys": {"alg1:k1": "key1"}} ) ) self.assertDictEqual( res, {"one_time_key_counts": {"alg1": 1, "signed_curve25519": 0}} ) - res2 = self.get_success( + # Keys should be returned in the order they were uploaded. To test, advance time + # a little, then upload a second key with an earlier key ID; it should get + # returned second. + self.reactor.advance(1) + res = self.get_success( + self.handler.upload_keys_for_user( + local_user, device_id, {"one_time_keys": {"alg1:k0": "key0"}} + ) + ) + self.assertDictEqual( + res, {"one_time_key_counts": {"alg1": 2, "signed_curve25519": 0}} + ) + + # now claim both keys back. They should be in the same order + res = self.get_success( self.handler.claim_one_time_keys( {local_user: {device_id: {"alg1": 1}}}, self.requester, @@ -171,12 +183,27 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase): ) ) self.assertEqual( - res2, + res, { "failures": {}, "one_time_keys": {local_user: {device_id: {"alg1:k1": "key1"}}}, }, ) + res = self.get_success( + self.handler.claim_one_time_keys( + {local_user: {device_id: {"alg1": 1}}}, + self.requester, + timeout=None, + always_include_fallback_keys=False, + ) + ) + self.assertEqual( + res, + { + "failures": {}, + "one_time_keys": {local_user: {device_id: {"alg1:k0": "key0"}}}, + }, + ) def test_claim_one_time_key_bulk(self) -> None: """Like test_claim_one_time_key but claims multiple keys in one handler call.""" @@ -336,6 +363,47 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase): counts_by_alg, expected_counts_by_alg, f"{user_id}:{device_id}" ) + def test_claim_one_time_key_bulk_ordering(self) -> None: + """Keys returned by the bulk claim call should be returned in the correct order""" + + # Alice has lots of keys, uploaded in a specific order + alice = f"@alice:{self.hs.hostname}" + alice_dev = "alice_dev_1" + + self.get_success( + self.handler.upload_keys_for_user( + alice, + alice_dev, + {"one_time_keys": {"alg1:k20": 20, "alg1:k21": 21, "alg1:k22": 22}}, + ) + ) + # Advance time by 1s, to ensure that there is a difference in upload time. + self.reactor.advance(1) + self.get_success( + self.handler.upload_keys_for_user( + alice, + alice_dev, + {"one_time_keys": {"alg1:k10": 10, "alg1:k11": 11, "alg1:k12": 12}}, + ) + ) + + # Now claim some, and check we get the right ones. + claim_res = self.get_success( + self.handler.claim_one_time_keys( + {alice: {alice_dev: {"alg1": 2}}}, + self.requester, + timeout=None, + always_include_fallback_keys=False, + ) + ) + # We should get the first-uploaded keys, even though they have later key ids. 
+ # We should get a random set of two of k20, k21, k22. + self.assertEqual(claim_res["failures"], {}) + claimed_keys = claim_res["one_time_keys"]["@alice:test"]["alice_dev_1"] + self.assertEqual(len(claimed_keys), 2) + for key_id in claimed_keys.keys(): + self.assertIn(key_id, ["alg1:k20", "alg1:k21", "alg1:k22"]) + def test_fallback_key(self) -> None: local_user = "@boris:" + self.hs.hostname device_id = "xyz" diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index 9847893fce..b64a8a86a2 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -661,9 +661,12 @@ class PartialJoinTestCase(unittest.FederatingHomeserverTestCase): ) ) - with patch.object( - fed_client, "make_membership_event", mock_make_membership_event - ), patch.object(fed_client, "send_join", mock_send_join): + with ( + patch.object( + fed_client, "make_membership_event", mock_make_membership_event + ), + patch.object(fed_client, "send_join", mock_send_join), + ): # Join and check that our join event is rejected # (The join event is rejected because it doesn't have any signatures) join_exc = self.get_failure( @@ -708,9 +711,12 @@ class PartialJoinTestCase(unittest.FederatingHomeserverTestCase): fed_handler = self.hs.get_federation_handler() store = self.hs.get_datastores().main - with patch.object( - fed_handler, "_sync_partial_state_room", mock_sync_partial_state_room - ), patch.object(store, "is_partial_state_room", mock_is_partial_state_room): + with ( + patch.object( + fed_handler, "_sync_partial_state_room", mock_sync_partial_state_room + ), + patch.object(store, "is_partial_state_room", mock_is_partial_state_room), + ): # Start the partial state sync. fed_handler._start_partial_state_room_sync("hs1", {"hs2"}, "room_id") self.assertEqual(mock_sync_partial_state_room.call_count, 1) @@ -760,9 +766,12 @@ class PartialJoinTestCase(unittest.FederatingHomeserverTestCase): fed_handler = self.hs.get_federation_handler() store = self.hs.get_datastores().main - with patch.object( - fed_handler, "_sync_partial_state_room", mock_sync_partial_state_room - ), patch.object(store, "is_partial_state_room", mock_is_partial_state_room): + with ( + patch.object( + fed_handler, "_sync_partial_state_room", mock_sync_partial_state_room + ), + patch.object(store, "is_partial_state_room", mock_is_partial_state_room), + ): # Start the partial state sync. 
fed_handler._start_partial_state_room_sync("hs1", {"hs2"}, "room_id") self.assertEqual(mock_sync_partial_state_room.call_count, 1) diff --git a/tests/handlers/test_room_member.py b/tests/handlers/test_room_member.py index ad77356ede..f43ce66483 100644 --- a/tests/handlers/test_room_member.py +++ b/tests/handlers/test_room_member.py @@ -172,20 +172,25 @@ class TestJoinsLimitedByPerRoomRateLimiter(FederatingHomeserverTestCase): ) ) - with patch.object( - self.handler.federation_handler.federation_client, - "make_membership_event", - mock_make_membership_event, - ), patch.object( - self.handler.federation_handler.federation_client, - "send_join", - mock_send_join, - ), patch( - "synapse.event_auth._is_membership_change_allowed", - return_value=None, - ), patch( - "synapse.handlers.federation_event.check_state_dependent_auth_rules", - return_value=None, + with ( + patch.object( + self.handler.federation_handler.federation_client, + "make_membership_event", + mock_make_membership_event, + ), + patch.object( + self.handler.federation_handler.federation_client, + "send_join", + mock_send_join, + ), + patch( + "synapse.event_auth._is_membership_change_allowed", + return_value=None, + ), + patch( + "synapse.handlers.federation_event.check_state_dependent_auth_rules", + return_value=None, + ), ): self.get_success( self.handler.update_membership( diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py index 9a68d1dd95..5b7e2937f8 100644 --- a/tests/handlers/test_sliding_sync.py +++ b/tests/handlers/test_sliding_sync.py @@ -33,6 +33,7 @@ from synapse.api.constants import ( ) from synapse.api.room_versions import RoomVersions from synapse.handlers.sliding_sync import ( + MAX_NUMBER_PREVIOUS_STATE_KEYS_TO_REMEMBER, RoomsForUserType, RoomSyncConfig, StateValues, @@ -3319,6 +3320,32 @@ class RequiredStateChangesTestCase(unittest.TestCase): ), ), ), + ( + "simple_retain_previous_state_keys", + """Test adding a state key to the config and retaining a previously sent state_key""", + RequiredStateChangesTestParameters( + previous_required_state_map={"type": {"state_key1"}}, + request_required_state_map={"type": {"state_key2", "state_key3"}}, + state_deltas={("type", "state_key2"): "$event_id"}, + expected_with_state_deltas=( + # We've added a key so we should persist the changed required state + # config. + # + # Retain `state_key1` from the `previous_required_state_map` + {"type": {"state_key1", "state_key2", "state_key3"}}, + # We should see the new state_keys added + StateFilter.from_types( + [("type", "state_key2"), ("type", "state_key3")] + ), + ), + expected_without_state_deltas=( + {"type": {"state_key1", "state_key2", "state_key3"}}, + StateFilter.from_types( + [("type", "state_key2"), ("type", "state_key3")] + ), + ), + ), + ), ( "simple_remove_type", """ @@ -3724,6 +3751,249 @@ class RequiredStateChangesTestCase(unittest.TestCase): ), ), ), + ( + "state_key_lazy_keep_previous_memberships_and_no_new_memberships", + """ + This test mimics a request with lazy-loading room members enabled where + we have previously sent down user2 and user3's membership events and now + we're sending down another response without any timeline events. 
+ """, + RequiredStateChangesTestParameters( + previous_required_state_map={ + EventTypes.Member: { + StateValues.LAZY, + "@user2:test", + "@user3:test", + } + }, + request_required_state_map={EventTypes.Member: {StateValues.LAZY}}, + state_deltas={(EventTypes.Member, "@user2:test"): "$event_id"}, + expected_with_state_deltas=( + # Remove "@user2:test" since that state has changed and is no + # longer being requested anymore. Since something was removed, + # we should persist the changed to required state. That way next + # time, they request "@user2:test", we see that we haven't sent + # it before and send the new state. (we should still keep track + # that we've sent specific `EventTypes.Member` before) + { + EventTypes.Member: { + StateValues.LAZY, + "@user3:test", + } + }, + # We don't need to request anything more if they are requesting + # less state now + StateFilter.none(), + ), + expected_without_state_deltas=( + # We're not requesting any specific `EventTypes.Member` now but + # since that state hasn't changed, nothing should change (we + # should still keep track that we've sent specific + # `EventTypes.Member` before). + None, + # We don't need to request anything more if they are requesting + # less state now + StateFilter.none(), + ), + ), + ), + ( + "state_key_lazy_keep_previous_memberships_with_new_memberships", + """ + This test mimics a request with lazy-loading room members enabled where + we have previously sent down user2 and user3's membership events and now + we're sending down another response with a new event from user4. + """, + RequiredStateChangesTestParameters( + previous_required_state_map={ + EventTypes.Member: { + StateValues.LAZY, + "@user2:test", + "@user3:test", + } + }, + request_required_state_map={ + EventTypes.Member: {StateValues.LAZY, "@user4:test"} + }, + state_deltas={(EventTypes.Member, "@user2:test"): "$event_id"}, + expected_with_state_deltas=( + # Since "@user4:test" was added, we should persist the changed + # required state config. + # + # Also remove "@user2:test" since that state has changed and is no + # longer being requested anymore. Since something was removed, + # we also should persist the changed to required state. That way next + # time, they request "@user2:test", we see that we haven't sent + # it before and send the new state. (we should still keep track + # that we've sent specific `EventTypes.Member` before) + { + EventTypes.Member: { + StateValues.LAZY, + "@user3:test", + "@user4:test", + } + }, + # We should see the new state_keys added + StateFilter.from_types([(EventTypes.Member, "@user4:test")]), + ), + expected_without_state_deltas=( + # Since "@user4:test" was added, we should persist the changed + # required state config. + { + EventTypes.Member: { + StateValues.LAZY, + "@user2:test", + "@user3:test", + "@user4:test", + } + }, + # We should see the new state_keys added + StateFilter.from_types([(EventTypes.Member, "@user4:test")]), + ), + ), + ), + ( + "state_key_expand_lazy_keep_previous_memberships", + """ + Test expanding the `required_state` to lazy-loading room members. + """, + RequiredStateChangesTestParameters( + previous_required_state_map={ + EventTypes.Member: {"@user2:test", "@user3:test"} + }, + request_required_state_map={EventTypes.Member: {StateValues.LAZY}}, + state_deltas={(EventTypes.Member, "@user2:test"): "$event_id"}, + expected_with_state_deltas=( + # Since `StateValues.LAZY` was added, we should persist the + # changed required state config. 
+ # + # Also remove "@user2:test" since that state has changed and is no + # longer being requested anymore. Since something was removed, + # we also should persist the changed to required state. That way next + # time, they request "@user2:test", we see that we haven't sent + # it before and send the new state. (we should still keep track + # that we've sent specific `EventTypes.Member` before) + { + EventTypes.Member: { + StateValues.LAZY, + "@user3:test", + } + }, + # We don't need to request anything more if they are requesting + # less state now + StateFilter.none(), + ), + expected_without_state_deltas=( + # Since `StateValues.LAZY` was added, we should persist the + # changed required state config. + { + EventTypes.Member: { + StateValues.LAZY, + "@user2:test", + "@user3:test", + } + }, + # We don't need to request anything more if they are requesting + # less state now + StateFilter.none(), + ), + ), + ), + ( + "state_key_retract_lazy_keep_previous_memberships_no_new_memberships", + """ + Test retracting the `required_state` to no longer lazy-loading room members. + """, + RequiredStateChangesTestParameters( + previous_required_state_map={ + EventTypes.Member: { + StateValues.LAZY, + "@user2:test", + "@user3:test", + } + }, + request_required_state_map={}, + state_deltas={(EventTypes.Member, "@user2:test"): "$event_id"}, + expected_with_state_deltas=( + # Remove `EventTypes.Member` since there's been a change to that + # state, (persist the change to required state). That way next + # time, they request `EventTypes.Member`, we see that we haven't + # sent it before and send the new state. (if we were tracking + # that we sent any other state, we should still keep track + # that). + # + # This acts the same as the `simple_remove_type` test. It's + # possible that we could remember the specific `state_keys` that + # we have sent down before but this currently just acts the same + # as if a whole `type` was removed. Perhaps it's good that we + # "garbage collect" and forget what we've sent before for a + # given `type` when the client stops caring about a certain + # `type`. + {}, + # We don't need to request anything more if they are requesting + # less state now + StateFilter.none(), + ), + expected_without_state_deltas=( + # `EventTypes.Member` is no longer requested but since that + # state hasn't changed, nothing should change (we should still + # keep track that we've sent `EventTypes.Member` before). + None, + # We don't need to request anything more if they are requesting + # less state now + StateFilter.none(), + ), + ), + ), + ( + "state_key_retract_lazy_keep_previous_memberships_with_new_memberships", + """ + Test retracting the `required_state` to no longer lazy-loading room members. + """, + RequiredStateChangesTestParameters( + previous_required_state_map={ + EventTypes.Member: { + StateValues.LAZY, + "@user2:test", + "@user3:test", + } + }, + request_required_state_map={EventTypes.Member: {"@user4:test"}}, + state_deltas={(EventTypes.Member, "@user2:test"): "$event_id"}, + expected_with_state_deltas=( + # Since "@user4:test" was added, we should persist the changed + # required state config. + # + # Also remove "@user2:test" since that state has changed and is no + # longer being requested anymore. Since something was removed, + # we also should persist the changed to required state. That way next + # time, they request "@user2:test", we see that we haven't sent + # it before and send the new state. 
(we should still keep track + # that we've sent specific `EventTypes.Member` before) + { + EventTypes.Member: { + "@user3:test", + "@user4:test", + } + }, + # We should see the new state_keys added + StateFilter.from_types([(EventTypes.Member, "@user4:test")]), + ), + expected_without_state_deltas=( + # Since "@user4:test" was added, we should persist the changed + # required state config. + { + EventTypes.Member: { + "@user2:test", + "@user3:test", + "@user4:test", + } + }, + # We should see the new state_keys added + StateFilter.from_types([(EventTypes.Member, "@user4:test")]), + ), + ), + ), ( "type_wildcard_with_state_key_wildcard_to_explicit_state_keys", """ @@ -3824,7 +4094,7 @@ class RequiredStateChangesTestCase(unittest.TestCase): ), ), ( - "state_key_wildcard_to_explicit_state_keys", + "explicit_state_keys_to_wildcard_state_key", """Test switching from a wildcard to explicit state keys with a concrete type""", RequiredStateChangesTestParameters( previous_required_state_map={ @@ -3837,11 +4107,18 @@ class RequiredStateChangesTestCase(unittest.TestCase): # request. And we need to request all of the state for that type # because we previously, only sent down a few keys. expected_with_state_deltas=( - {"type1": {StateValues.WILDCARD}}, + {"type1": {StateValues.WILDCARD, "state_key2", "state_key3"}}, StateFilter.from_types([("type1", None)]), ), expected_without_state_deltas=( - {"type1": {StateValues.WILDCARD}}, + { + "type1": { + StateValues.WILDCARD, + "state_key1", + "state_key2", + "state_key3", + } + }, StateFilter.from_types([("type1", None)]), ), ), @@ -3857,14 +4134,8 @@ class RequiredStateChangesTestCase(unittest.TestCase): # Without `state_deltas` changed_required_state_map, added_state_filter = _required_state_changes( user_id="@user:test", - previous_room_config=RoomSyncConfig( - timeline_limit=0, - required_state_map=test_parameters.previous_required_state_map, - ), - room_sync_config=RoomSyncConfig( - timeline_limit=0, - required_state_map=test_parameters.request_required_state_map, - ), + prev_required_state_map=test_parameters.previous_required_state_map, + request_required_state_map=test_parameters.request_required_state_map, state_deltas={}, ) @@ -3882,14 +4153,8 @@ class RequiredStateChangesTestCase(unittest.TestCase): # With `state_deltas` changed_required_state_map, added_state_filter = _required_state_changes( user_id="@user:test", - previous_room_config=RoomSyncConfig( - timeline_limit=0, - required_state_map=test_parameters.previous_required_state_map, - ), - room_sync_config=RoomSyncConfig( - timeline_limit=0, - required_state_map=test_parameters.request_required_state_map, - ), + prev_required_state_map=test_parameters.previous_required_state_map, + request_required_state_map=test_parameters.request_required_state_map, state_deltas=test_parameters.state_deltas, ) @@ -3903,3 +4168,121 @@ class RequiredStateChangesTestCase(unittest.TestCase): test_parameters.expected_with_state_deltas[1], "added_state_filter does not match (with state_deltas)", ) + + @parameterized.expand( + [ + # Test with a normal arbitrary type (no special meaning) + ("arbitrary_type", "type", set()), + # Test with membership + ("membership", EventTypes.Member, set()), + # Test with lazy-loading room members + ("lazy_loading_membership", EventTypes.Member, {StateValues.LAZY}), + ] + ) + def test_limit_retained_previous_state_keys( + self, + _test_label: str, + event_type: str, + extra_state_keys: Set[str], + ) -> None: + """ + Test that we limit the number of state_keys that we remember but 
always include
+        the state_keys that we've just requested.
+        """
+        previous_required_state_map = {
+            event_type: {
+                # Prefix the state_keys we've "prev_"iously sent so they are easier to
+                # identify in our assertions.
+                f"prev_state_key{i}"
+                for i in range(MAX_NUMBER_PREVIOUS_STATE_KEYS_TO_REMEMBER - 30)
+            }
+            | extra_state_keys
+        }
+        request_required_state_map = {
+            event_type: {f"state_key{i}" for i in range(50)} | extra_state_keys
+        }
+
+        # (function under test)
+        changed_required_state_map, added_state_filter = _required_state_changes(
+            user_id="@user:test",
+            prev_required_state_map=previous_required_state_map,
+            request_required_state_map=request_required_state_map,
+            state_deltas={},
+        )
+        assert changed_required_state_map is not None
+
+        # We should only remember up to the maximum number of state keys
+        self.assertGreaterEqual(
+            len(changed_required_state_map[event_type]),
+            # Most of the time this will be `MAX_NUMBER_PREVIOUS_STATE_KEYS_TO_REMEMBER` but
+            # because we are just naively selecting enough previous state_keys to fill
+            # the limit, there might be some overlap in what's added back which means we
+            # might have slightly less than the limit.
+            #
+            # `extra_state_keys` overlaps in the previous and requested
+            # `required_state_map` so we might see this scenario.
+            MAX_NUMBER_PREVIOUS_STATE_KEYS_TO_REMEMBER - len(extra_state_keys),
+        )
+
+        # Should include all of the requested state
+        self.assertIncludes(
+            changed_required_state_map[event_type],
+            request_required_state_map[event_type],
+        )
+        # And the rest is filled with the previous state keys
+        #
+        # We can't assert the exact state_keys since we don't know the order so we just
+        # check that they all start with "prev_" and that we have the correct amount.
+        remaining_state_keys = (
+            changed_required_state_map[event_type]
+            - request_required_state_map[event_type]
+        )
+        self.assertGreater(
+            len(remaining_state_keys),
+            0,
+        )
+        assert all(
+            state_key.startswith("prev_") for state_key in remaining_state_keys
+        ), "Remaining state_keys should be the previous state_keys"
+
+    def test_request_more_state_keys_than_remember_limit(self) -> None:
+        """
+        Test requesting more state_keys than fit in our limit to remember from previous
+        requests.
+        """
+        previous_required_state_map = {
+            "type": {
+                # Prefix the state_keys we've "prev_"iously sent so they are easier to
+                # identify in our assertions.
+ f"prev_state_key{i}" + for i in range(MAX_NUMBER_PREVIOUS_STATE_KEYS_TO_REMEMBER - 30) + } + } + request_required_state_map = { + "type": { + f"state_key{i}" + # Requesting more than the MAX_NUMBER_PREVIOUS_STATE_KEYS_TO_REMEMBER + for i in range(MAX_NUMBER_PREVIOUS_STATE_KEYS_TO_REMEMBER + 20) + } + } + # Ensure that we are requesting more than the limit + self.assertGreater( + len(request_required_state_map["type"]), + MAX_NUMBER_PREVIOUS_STATE_KEYS_TO_REMEMBER, + ) + + # (function under test) + changed_required_state_map, added_state_filter = _required_state_changes( + user_id="@user:test", + prev_required_state_map=previous_required_state_map, + request_required_state_map=request_required_state_map, + state_deltas={}, + ) + assert changed_required_state_map is not None + + # Should include all of the requested state + self.assertIncludes( + changed_required_state_map["type"], + request_required_state_map["type"], + exact=True, + ) diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py index d7bbc68037..1960d2f0e1 100644 --- a/tests/handlers/test_sync.py +++ b/tests/handlers/test_sync.py @@ -20,7 +20,7 @@ from typing import Collection, ContextManager, List, Optional from unittest.mock import AsyncMock, Mock, patch -from parameterized import parameterized +from parameterized import parameterized, parameterized_class from twisted.internet import defer from twisted.test.proto_helpers import MemoryReactor @@ -32,7 +32,13 @@ from synapse.api.room_versions import RoomVersion, RoomVersions from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.federation.federation_base import event_from_pdu_json -from synapse.handlers.sync import SyncConfig, SyncRequestKey, SyncResult, SyncVersion +from synapse.handlers.sync import ( + SyncConfig, + SyncRequestKey, + SyncResult, + SyncVersion, + TimelineBatch, +) from synapse.rest import admin from synapse.rest.client import knock, login, room from synapse.server import HomeServer @@ -58,9 +64,21 @@ def generate_request_key() -> SyncRequestKey: return ("request_key", _request_key) +@parameterized_class( + ("use_state_after",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, + num, + params_dict: f"{cls.__name__}_{'state_after' if params_dict['use_state_after'] else 'state'}", +) class SyncTestCase(tests.unittest.HomeserverTestCase): """Tests Sync Handler.""" + use_state_after: bool + servlets = [ admin.register_servlets, knock.register_servlets, @@ -79,7 +97,9 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): def test_wait_for_sync_for_user_auth_blocking(self) -> None: user_id1 = "@user1:test" user_id2 = "@user2:test" - sync_config = generate_sync_config(user_id1) + sync_config = generate_sync_config( + user_id1, use_state_after=self.use_state_after + ) requester = create_requester(user_id1) self.reactor.advance(100) # So we get not 0 time @@ -112,7 +132,9 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): self.auth_blocking._hs_disabled = False - sync_config = generate_sync_config(user_id2) + sync_config = generate_sync_config( + user_id2, use_state_after=self.use_state_after + ) requester = create_requester(user_id2) e = self.get_failure( @@ -141,7 +163,9 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): initial_result = self.get_success( self.sync_handler.wait_for_sync_for_user( requester, - sync_config=generate_sync_config(user, device_id="dev"), + sync_config=generate_sync_config( + user, device_id="dev", use_state_after=self.use_state_after + ), 
sync_version=SyncVersion.SYNC_V2, request_key=generate_request_key(), ) @@ -175,7 +199,9 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): result = self.get_success( self.sync_handler.wait_for_sync_for_user( requester, - sync_config=generate_sync_config(user), + sync_config=generate_sync_config( + user, use_state_after=self.use_state_after + ), sync_version=SyncVersion.SYNC_V2, request_key=generate_request_key(), ) @@ -188,7 +214,9 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): result = self.get_success( self.sync_handler.wait_for_sync_for_user( requester, - sync_config=generate_sync_config(user, device_id="dev"), + sync_config=generate_sync_config( + user, device_id="dev", use_state_after=self.use_state_after + ), sync_version=SyncVersion.SYNC_V2, request_key=generate_request_key(), since_token=initial_result.next_batch, @@ -220,7 +248,9 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): result = self.get_success( self.sync_handler.wait_for_sync_for_user( requester, - sync_config=generate_sync_config(user), + sync_config=generate_sync_config( + user, use_state_after=self.use_state_after + ), sync_version=SyncVersion.SYNC_V2, request_key=generate_request_key(), ) @@ -233,7 +263,9 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): result = self.get_success( self.sync_handler.wait_for_sync_for_user( requester, - sync_config=generate_sync_config(user, device_id="dev"), + sync_config=generate_sync_config( + user, device_id="dev", use_state_after=self.use_state_after + ), sync_version=SyncVersion.SYNC_V2, request_key=generate_request_key(), since_token=initial_result.next_batch, @@ -276,7 +308,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): alice_sync_result: SyncResult = self.get_success( self.sync_handler.wait_for_sync_for_user( create_requester(owner), - generate_sync_config(owner), + generate_sync_config(owner, use_state_after=self.use_state_after), sync_version=SyncVersion.SYNC_V2, request_key=generate_request_key(), ) @@ -296,7 +328,9 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): # Eve syncs. 
eve_requester = create_requester(eve) - eve_sync_config = generate_sync_config(eve) + eve_sync_config = generate_sync_config( + eve, use_state_after=self.use_state_after + ) eve_sync_after_ban: SyncResult = self.get_success( self.sync_handler.wait_for_sync_for_user( eve_requester, @@ -367,7 +401,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): initial_sync_result = self.get_success( self.sync_handler.wait_for_sync_for_user( alice_requester, - generate_sync_config(alice), + generate_sync_config(alice, use_state_after=self.use_state_after), sync_version=SyncVersion.SYNC_V2, request_key=generate_request_key(), ) @@ -396,6 +430,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): filter_collection=FilterCollection( self.hs, {"room": {"timeline": {"limit": 2}}} ), + use_state_after=self.use_state_after, ), sync_version=SyncVersion.SYNC_V2, request_key=generate_request_key(), @@ -442,7 +477,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): initial_sync_result = self.get_success( self.sync_handler.wait_for_sync_for_user( alice_requester, - generate_sync_config(alice), + generate_sync_config(alice, use_state_after=self.use_state_after), sync_version=SyncVersion.SYNC_V2, request_key=generate_request_key(), ) @@ -481,6 +516,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): } }, ), + use_state_after=self.use_state_after, ), sync_version=SyncVersion.SYNC_V2, request_key=generate_request_key(), @@ -518,6 +554,8 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): ... and a filter that means we only return 1 event, represented by the dashed horizontal lines: `S2` must be included in the `state` section on the second sync. + + When `use_state_after` is enabled, then we expect to see `s2` in the first sync. """ alice = self.register_user("alice", "password") alice_tok = self.login(alice, "password") @@ -528,7 +566,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): initial_sync_result = self.get_success( self.sync_handler.wait_for_sync_for_user( alice_requester, - generate_sync_config(alice), + generate_sync_config(alice, use_state_after=self.use_state_after), sync_version=SyncVersion.SYNC_V2, request_key=generate_request_key(), ) @@ -554,6 +592,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): filter_collection=FilterCollection( self.hs, {"room": {"timeline": {"limit": 1}}} ), + use_state_after=self.use_state_after, ), sync_version=SyncVersion.SYNC_V2, request_key=generate_request_key(), @@ -567,10 +606,18 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): [e.event_id for e in room_sync.timeline.events], [e3_event], ) - self.assertEqual( - [e.event_id for e in room_sync.state.values()], - [], - ) + + if self.use_state_after: + # When using `state_after` we get told about s2 immediately + self.assertEqual( + [e.event_id for e in room_sync.state.values()], + [s2_event], + ) + else: + self.assertEqual( + [e.event_id for e in room_sync.state.values()], + [], + ) # Now send another event that points to S2, but not E3. 
with self._patch_get_latest_events([s2_event]): @@ -585,6 +632,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): filter_collection=FilterCollection( self.hs, {"room": {"timeline": {"limit": 1}}} ), + use_state_after=self.use_state_after, ), sync_version=SyncVersion.SYNC_V2, request_key=generate_request_key(), @@ -598,10 +646,19 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): [e.event_id for e in room_sync.timeline.events], [e4_event], ) - self.assertEqual( - [e.event_id for e in room_sync.state.values()], - [s2_event], - ) + + if self.use_state_after: + # When using `state_after` we got told about s2 previously, so we + # don't again. + self.assertEqual( + [e.event_id for e in room_sync.state.values()], + [], + ) + else: + self.assertEqual( + [e.event_id for e in room_sync.state.values()], + [s2_event], + ) def test_state_includes_changes_on_ungappy_syncs(self) -> None: """Test `state` where the sync is not gappy. @@ -638,6 +695,8 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): This is the last chance for us to tell the client about S2, so it *must* be included in the response. + + When `use_state_after` is enabled, then we expect to see `s2` in the first sync. """ alice = self.register_user("alice", "password") alice_tok = self.login(alice, "password") @@ -648,7 +707,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): initial_sync_result = self.get_success( self.sync_handler.wait_for_sync_for_user( alice_requester, - generate_sync_config(alice), + generate_sync_config(alice, use_state_after=self.use_state_after), sync_version=SyncVersion.SYNC_V2, request_key=generate_request_key(), ) @@ -673,6 +732,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): filter_collection=FilterCollection( self.hs, {"room": {"timeline": {"limit": 1}}} ), + use_state_after=self.use_state_after, ), sync_version=SyncVersion.SYNC_V2, request_key=generate_request_key(), @@ -684,7 +744,11 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): [e.event_id for e in room_sync.timeline.events], [e3_event], ) - self.assertNotIn(s2_event, [e.event_id for e in room_sync.state.values()]) + if self.use_state_after: + # When using `state_after` we get told about s2 immediately + self.assertIn(s2_event, [e.event_id for e in room_sync.state.values()]) + else: + self.assertNotIn(s2_event, [e.event_id for e in room_sync.state.values()]) # More events, E4 and E5 with self._patch_get_latest_events([e3_event]): @@ -695,7 +759,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): incremental_sync = self.get_success( self.sync_handler.wait_for_sync_for_user( alice_requester, - generate_sync_config(alice), + generate_sync_config(alice, use_state_after=self.use_state_after), sync_version=SyncVersion.SYNC_V2, request_key=generate_request_key(), since_token=initial_sync_result.next_batch, @@ -710,10 +774,19 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): [e.event_id for e in room_sync.timeline.events], [e4_event, e5_event], ) - self.assertEqual( - [e.event_id for e in room_sync.state.values()], - [s2_event], - ) + + if self.use_state_after: + # When using `state_after` we got told about s2 previously, so we + # don't again. 
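+            # (With MSC4222, the previous response's `state_after` already
+            # reflected s2, so it would be redundant to send it again.)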
+ self.assertEqual( + [e.event_id for e in room_sync.state.values()], + [], + ) + else: + self.assertEqual( + [e.event_id for e in room_sync.state.values()], + [s2_event], + ) @parameterized.expand( [ @@ -721,7 +794,8 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): (True, False), (False, True), (True, True), - ] + ], + name_func=lambda func, num, p: f"{func.__name__}_{p.args[0]}_{p.args[1]}", ) def test_archived_rooms_do_not_include_state_after_leave( self, initial_sync: bool, empty_timeline: bool @@ -749,7 +823,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): initial_sync_result = self.get_success( self.sync_handler.wait_for_sync_for_user( bob_requester, - generate_sync_config(bob), + generate_sync_config(bob, use_state_after=self.use_state_after), sync_version=SyncVersion.SYNC_V2, request_key=generate_request_key(), ) @@ -780,7 +854,9 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): self.sync_handler.wait_for_sync_for_user( bob_requester, generate_sync_config( - bob, filter_collection=FilterCollection(self.hs, filter_dict) + bob, + filter_collection=FilterCollection(self.hs, filter_dict), + use_state_after=self.use_state_after, ), sync_version=SyncVersion.SYNC_V2, request_key=generate_request_key(), @@ -791,7 +867,15 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): if empty_timeline: # The timeline should be empty self.assertEqual(sync_room_result.timeline.events, []) + else: + # The last three events in the timeline should be those leading up to the + # leave + self.assertEqual( + [e.event_id for e in sync_room_result.timeline.events[-3:]], + [before_message_event, before_state_event, leave_event], + ) + if empty_timeline or self.use_state_after: # And the state should include the leave event... self.assertEqual( sync_room_result.state[("m.room.member", bob)].event_id, leave_event @@ -801,12 +885,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): sync_room_result.state[("test_state", "")].event_id, before_state_event ) else: - # The last three events in the timeline should be those leading up to the - # leave - self.assertEqual( - [e.event_id for e in sync_room_result.timeline.events[-3:]], - [before_message_event, before_state_event, leave_event], - ) # ... 
And the state should be empty self.assertEqual(sync_room_result.state, {}) @@ -879,7 +957,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): sync_result: SyncResult = self.get_success( self.sync_handler.wait_for_sync_for_user( create_requester(user), - generate_sync_config(user), + generate_sync_config(user, use_state_after=self.use_state_after), sync_version=SyncVersion.SYNC_V2, request_key=generate_request_key(), ) @@ -928,7 +1006,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): private_sync_result: SyncResult = self.get_success( self.sync_handler.wait_for_sync_for_user( create_requester(user2), - generate_sync_config(user2), + generate_sync_config(user2, use_state_after=self.use_state_after), sync_version=SyncVersion.SYNC_V2, request_key=generate_request_key(), ) @@ -954,7 +1032,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): sync_result: SyncResult = self.get_success( self.sync_handler.wait_for_sync_for_user( create_requester(user), - generate_sync_config(user), + generate_sync_config(user, use_state_after=self.use_state_after), sync_version=SyncVersion.SYNC_V2, request_key=generate_request_key(), ) @@ -991,7 +1069,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): sync_d = defer.ensureDeferred( self.sync_handler.wait_for_sync_for_user( create_requester(user), - generate_sync_config(user), + generate_sync_config(user, use_state_after=self.use_state_after), sync_version=SyncVersion.SYNC_V2, request_key=generate_request_key(), since_token=since_token, @@ -1046,7 +1124,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): sync_d = defer.ensureDeferred( self.sync_handler.wait_for_sync_for_user( create_requester(user), - generate_sync_config(user), + generate_sync_config(user, use_state_after=self.use_state_after), sync_version=SyncVersion.SYNC_V2, request_key=generate_request_key(), since_token=since_token, @@ -1062,6 +1140,7 @@ def generate_sync_config( user_id: str, device_id: Optional[str] = "device_id", filter_collection: Optional[FilterCollection] = None, + use_state_after: bool = False, ) -> SyncConfig: """Generate a sync config (with a unique request key). @@ -1069,7 +1148,8 @@ def generate_sync_config( user_id: user who is syncing. device_id: device that is syncing. Defaults to "device_id". filter_collection: filter to apply. Defaults to the default filter (ie, - return everything, with a default limit) + return everything, with a default limit) + use_state_after: whether the `use_state_after` flag was set. """ if filter_collection is None: filter_collection = Filtering(Mock()).DEFAULT_FILTER_COLLECTION @@ -1079,4 +1159,106 @@ def generate_sync_config( filter_collection=filter_collection, is_guest=False, device_id=device_id, + use_state_after=use_state_after, ) + + +class SyncStateAfterTestCase(tests.unittest.HomeserverTestCase): + """Tests Sync Handler state behavior when using `use_state_after.""" + + servlets = [ + admin.register_servlets, + knock.register_servlets, + login.register_servlets, + room.register_servlets, + ] + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.sync_handler = self.hs.get_sync_handler() + self.store = self.hs.get_datastores().main + + # AuthBlocking reads from the hs' config on initialization. 
We need to
+        # modify its config instead of the hs'
+        self.auth_blocking = self.hs.get_auth_blocking()
+
+    def test_initial_sync_multiple_deltas(self) -> None:
+        """Test that if multiple state deltas have happened during processing of
+        a full state sync we return the correct state"""
+
+        user = self.register_user("user", "password")
+        tok = self.login("user", "password")
+
+        # Create a room as the user and set some custom state.
+        joined_room = self.helper.create_room_as(user, tok=tok)
+
+        first_state = self.helper.send_state(
+            joined_room, event_type="m.test_event", body={"num": 1}, tok=tok
+        )
+
+        # Take a snapshot of the stream token, to simulate doing an initial sync
+        # at this point.
+        end_stream_token = self.hs.get_event_sources().get_current_token()
+
+        # Send some state *after* the stream token
+        self.helper.send_state(
+            joined_room, event_type="m.test_event", body={"num": 2}, tok=tok
+        )
+
+        # Calculating the full state will return the first state, and not the
+        # second.
+        state = self.get_success(
+            self.sync_handler._compute_state_delta_for_full_sync(
+                room_id=joined_room,
+                sync_config=generate_sync_config(user, use_state_after=True),
+                batch=TimelineBatch(
+                    prev_batch=end_stream_token, events=[], limited=True
+                ),
+                end_token=end_stream_token,
+                members_to_fetch=None,
+                timeline_state={},
+                joined=True,
+            )
+        )
+        self.assertEqual(state[("m.test_event", "")], first_state["event_id"])
+
+    def test_incremental_sync_multiple_deltas(self) -> None:
+        """Test that if multiple state deltas have happened since an incremental
+        state sync we return the correct state"""
+
+        user = self.register_user("user", "password")
+        tok = self.login("user", "password")
+
+        # Create a room as the user and set some custom state.
+        joined_room = self.helper.create_room_as(user, tok=tok)
+
+        # Take a snapshot of the stream token, to simulate doing an incremental sync
+        # from this point.
+        since_token = self.hs.get_event_sources().get_current_token()
+
+        self.helper.send_state(
+            joined_room, event_type="m.test_event", body={"num": 1}, tok=tok
+        )
+
+        # Send some state *after* the stream token
+        second_state = self.helper.send_state(
+            joined_room, event_type="m.test_event", body={"num": 2}, tok=tok
+        )
+
+        end_stream_token = self.hs.get_event_sources().get_current_token()
+
+        # Calculating the incremental state will return the second state, and not the
+        # first.
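+        # (Deltas are applied in stream order, so the most recent delta for a
+        # given (type, state_key) wins.)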
+        state = self.get_success(
+            self.sync_handler._compute_state_delta_for_incremental_sync(
+                room_id=joined_room,
+                sync_config=generate_sync_config(user, use_state_after=True),
+                batch=TimelineBatch(
+                    prev_batch=end_stream_token, events=[], limited=True
+                ),
+                since_token=since_token,
+                end_token=end_stream_token,
+                members_to_fetch=None,
+                timeline_state={},
+            )
+        )
+        self.assertEqual(state[("m.test_event", "")], second_state["event_id"])
diff --git a/tests/http/test_matrixfederationclient.py b/tests/http/test_matrixfederationclient.py
index 6588695e37..e34df54e13 100644
--- a/tests/http/test_matrixfederationclient.py
+++ b/tests/http/test_matrixfederationclient.py
@@ -903,12 +903,19 @@ class FederationClientProxyTests(BaseMultiWorkerStreamTestCase):
                headers=Headers(
                    {
                        "Content-Type": ["application/json"],
-                        "Connection": ["close, X-Foo, X-Bar"],
+                        "X-Test": ["test"],
+                        # Define some hop-by-hop headers (try with varying casing to
+                        # make sure we still match up the headers)
+                        "Connection": ["close, X-fOo, X-Bar, X-baz"],
                        # Should be removed because it's defined in the `Connection` header
                        "X-Foo": ["foo"],
                        "X-Bar": ["bar"],
+                        # (not in canonical case)
+                        "x-baZ": ["baz"],
                        # Should be removed because it's a hop-by-hop header
                        "Proxy-Authorization": "abcdef",
+                        # Should be removed because it's a hop-by-hop header (not in canonical case)
+                        "transfer-EnCoDiNg": "abcdef",
                    }
                ),
            )
@@ -938,9 +945,17 @@ class FederationClientProxyTests(BaseMultiWorkerStreamTestCase):
        header_names = set(headers.keys())

        # Make sure the response does not include the hop-by-hop headers
-        self.assertNotIn(b"X-Foo", header_names)
-        self.assertNotIn(b"X-Bar", header_names)
-        self.assertNotIn(b"Proxy-Authorization", header_names)
+        self.assertIncludes(
+            header_names,
+            {
+                b"Content-Type",
+                b"X-Test",
+                # Default headers from Twisted
+                b"Date",
+                b"Server",
+            },
+            exact=True,
+        )

        # Make sure the response is as expected back on the main worker
        self.assertEqual(res, {"foo": "bar"})
diff --git a/tests/http/test_proxy.py b/tests/http/test_proxy.py
index 5895270494..7110dcf9f9 100644
--- a/tests/http/test_proxy.py
+++ b/tests/http/test_proxy.py
@@ -22,27 +22,42 @@ from typing import Set

 from parameterized import parameterized

-from synapse.http.proxy import parse_connection_header_value
+from synapse.http.proxy import (
+    HOP_BY_HOP_HEADERS_LOWERCASE,
+    parse_connection_header_value,
+)

 from tests.unittest import TestCase


+def mix_case(s: str) -> str:
+    """
+    Mix up the case of each character in the string (upper or lower case)
+    """
+    return "".join(c.upper() if i % 2 == 0 else c.lower() for i, c in enumerate(s))
+
+
 class ProxyTests(TestCase):
    @parameterized.expand(
        [
-            [b"close, X-Foo, X-Bar", {"Close", "X-Foo", "X-Bar"}],
+            [b"close, X-Foo, X-Bar", {"close", "x-foo", "x-bar"}],
            # No whitespace
-            [b"close,X-Foo,X-Bar", {"Close", "X-Foo", "X-Bar"}],
+            [b"close,X-Foo,X-Bar", {"close", "x-foo", "x-bar"}],
            # More whitespace
-            [b"close,    X-Foo,      X-Bar", {"Close", "X-Foo", "X-Bar"}],
+            [b"close,    X-Foo,      X-Bar", {"close", "x-foo", "x-bar"}],
            # "close" directive not in the first position
-            [b"X-Foo, X-Bar, close", {"X-Foo", "X-Bar", "Close"}],
+            [b"X-Foo, X-Bar, close", {"x-foo", "x-bar", "close"}],
            # Normalizes header capitalization
-            [b"keep-alive, x-fOo, x-bAr", {"Keep-Alive", "X-Foo", "X-Bar"}],
+            [b"keep-alive, x-fOo, x-bAr", {"keep-alive", "x-foo", "x-bar"}],
            # Handles header names with whitespace
            [
                b"keep-alive, x foo, x bar",
-                {"Keep-Alive", "X foo", "X bar"},
+                {"keep-alive", "x foo", "x bar"},
+            ],
+            # Make sure we handle
all of the hop-by-hop headers + [ + mix_case(", ".join(HOP_BY_HOP_HEADERS_LOWERCASE)).encode("ascii"), + HOP_BY_HOP_HEADERS_LOWERCASE, ], ] ) @@ -54,7 +69,8 @@ class ProxyTests(TestCase): """ Tests that the connection header value is parsed correctly """ - self.assertEqual( + self.assertIncludes( expected_extra_headers_to_remove, parse_connection_header_value(connection_header_value), + exact=True, ) diff --git a/tests/push/test_bulk_push_rule_evaluator.py b/tests/push/test_bulk_push_rule_evaluator.py index fc73f3dc2a..16c1292812 100644 --- a/tests/push/test_bulk_push_rule_evaluator.py +++ b/tests/push/test_bulk_push_rule_evaluator.py @@ -120,9 +120,11 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase): # # We have seen stringy and null values for "room" in the wild, so presumably # some of this validation was missing in the past. - with patch("synapse.events.validator.validate_canonicaljson"), patch( - "synapse.events.validator.jsonschema.validate" - ), patch("synapse.handlers.event_auth.check_state_dependent_auth_rules"): + with ( + patch("synapse.events.validator.validate_canonicaljson"), + patch("synapse.events.validator.jsonschema.validate"), + patch("synapse.handlers.event_auth.check_state_dependent_auth_rules"), + ): pl_event_id = self.helper.send_state( self.room_id, "m.room.power_levels", diff --git a/tests/rest/client/sliding_sync/test_rooms_required_state.py b/tests/rest/client/sliding_sync/test_rooms_required_state.py index 7da51d4954..ecea5f2d5b 100644 --- a/tests/rest/client/sliding_sync/test_rooms_required_state.py +++ b/tests/rest/client/sliding_sync/test_rooms_required_state.py @@ -381,10 +381,10 @@ class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase): ) self.assertIsNone(response_body["rooms"][room_id1].get("invite_state")) - def test_rooms_required_state_lazy_loading_room_members(self) -> None: + def test_rooms_required_state_lazy_loading_room_members_initial_sync(self) -> None: """ - Test `rooms.required_state` returns people relevant to the timeline when - lazy-loading room members, `["m.room.member","$LAZY"]`. + On initial sync, test `rooms.required_state` returns people relevant to the + timeline when lazy-loading room members, `["m.room.member","$LAZY"]`. """ user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") @@ -432,6 +432,255 @@ class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase): ) self.assertIsNone(response_body["rooms"][room_id1].get("invite_state")) + def test_rooms_required_state_lazy_loading_room_members_incremental_sync( + self, + ) -> None: + """ + On incremental sync, test `rooms.required_state` returns people relevant to the + timeline when lazy-loading room members, `["m.room.member","$LAZY"]`. 
+ """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + user3_id = self.register_user("user3", "pass") + user3_tok = self.login(user3_id, "pass") + user4_id = self.register_user("user4", "pass") + user4_tok = self.login(user4_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + self.helper.join(room_id1, user3_id, tok=user3_tok) + self.helper.join(room_id1, user4_id, tok=user4_tok) + + self.helper.send(room_id1, "1", tok=user2_tok) + self.helper.send(room_id1, "2", tok=user2_tok) + self.helper.send(room_id1, "3", tok=user2_tok) + + # Make the Sliding Sync request with lazy loading for the room members + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Create, ""], + [EventTypes.Member, StateValues.LAZY], + ], + "timeline_limit": 3, + } + } + } + response_body, from_token = self.do_sync(sync_body, tok=user1_tok) + + # Send more timeline events into the room + self.helper.send(room_id1, "4", tok=user2_tok) + self.helper.send(room_id1, "5", tok=user4_tok) + self.helper.send(room_id1, "6", tok=user4_tok) + + # Make an incremental Sliding Sync request + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) + + # Only user2 and user4 sent events in the last 3 events we see in the `timeline` + # but since we've seen user2 in the last sync (and their membership hasn't + # changed), we should only see user4 here. + self._assertRequiredStateIncludes( + response_body["rooms"][room_id1]["required_state"], + { + state_map[(EventTypes.Member, user4_id)], + }, + exact=True, + ) + self.assertIsNone(response_body["rooms"][room_id1].get("invite_state")) + + def test_rooms_required_state_expand_lazy_loading_room_members_incremental_sync( + self, + ) -> None: + """ + Test that when we expand the `required_state` to include lazy-loading room + members, it returns people relevant to the timeline. 
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+        user3_id = self.register_user("user3", "pass")
+        user3_tok = self.login(user3_id, "pass")
+        user4_id = self.register_user("user4", "pass")
+        user4_tok = self.login(user4_id, "pass")
+
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+        self.helper.join(room_id1, user3_id, tok=user3_tok)
+        self.helper.join(room_id1, user4_id, tok=user4_tok)
+
+        self.helper.send(room_id1, "1", tok=user2_tok)
+        self.helper.send(room_id1, "2", tok=user2_tok)
+        self.helper.send(room_id1, "3", tok=user2_tok)
+
+        # Make the Sliding Sync request *without* lazy loading for the room members
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [
+                        [EventTypes.Create, ""],
+                    ],
+                    "timeline_limit": 3,
+                }
+            }
+        }
+        response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+        # Send more timeline events into the room
+        self.helper.send(room_id1, "4", tok=user2_tok)
+        self.helper.send(room_id1, "5", tok=user4_tok)
+        self.helper.send(room_id1, "6", tok=user4_tok)
+
+        # Expand `required_state` and make an incremental Sliding Sync request *with*
+        # lazy-loading room members
+        sync_body["lists"]["foo-list"]["required_state"] = [
+            [EventTypes.Create, ""],
+            [EventTypes.Member, StateValues.LAZY],
+        ]
+        response_body, from_token = self.do_sync(
+            sync_body, since=from_token, tok=user1_tok
+        )
+
+        state_map = self.get_success(
+            self.storage_controllers.state.get_current_state(room_id1)
+        )
+
+        # Only user2 and user4 sent events in the last 3 events we see in the `timeline`
+        # and we haven't seen any membership before this sync so we should see both
+        # users.
+        self._assertRequiredStateIncludes(
+            response_body["rooms"][room_id1]["required_state"],
+            {
+                state_map[(EventTypes.Member, user2_id)],
+                state_map[(EventTypes.Member, user4_id)],
+            },
+            exact=True,
+        )
+        self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
+
+        # Send some messages so the room comes down sync.
+        self.helper.send(room_id1, "7", tok=user2_tok)
+        self.helper.send(room_id1, "8", tok=user4_tok)
+        self.helper.send(room_id1, "9", tok=user4_tok)
+
+        # Make another incremental Sliding Sync request
+        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
+
+        # Only user2 and user4 sent events in the last 3 events we see in the `timeline`
+        # but since we've seen both memberships in the last sync, they shouldn't appear
+        # again.
+        self._assertRequiredStateIncludes(
+            response_body["rooms"][room_id1].get("required_state", []),
+            set(),
+            exact=True,
+        )
+        self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
+
+    def test_rooms_required_state_expand_retract_expand_lazy_loading_room_members_incremental_sync(
+        self,
+    ) -> None:
+        """
+        Test that when we expand the `required_state` to include lazy-loading room
+        members and then retract it, previously-sent memberships are not resent.
+ """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + user3_id = self.register_user("user3", "pass") + user3_tok = self.login(user3_id, "pass") + user4_id = self.register_user("user4", "pass") + user4_tok = self.login(user4_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + self.helper.join(room_id1, user3_id, tok=user3_tok) + self.helper.join(room_id1, user4_id, tok=user4_tok) + + self.helper.send(room_id1, "1", tok=user2_tok) + self.helper.send(room_id1, "2", tok=user2_tok) + self.helper.send(room_id1, "3", tok=user2_tok) + + # Make the Sliding Sync request *without* lazy loading for the room members + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Create, ""], + ], + "timeline_limit": 3, + } + } + } + response_body, from_token = self.do_sync(sync_body, tok=user1_tok) + + # Send more timeline events into the room + self.helper.send(room_id1, "4", tok=user2_tok) + self.helper.send(room_id1, "5", tok=user4_tok) + self.helper.send(room_id1, "6", tok=user4_tok) + + # Expand `required_state` and make an incremental Sliding Sync request *with* + # lazy-loading room members + sync_body["lists"]["foo-list"]["required_state"] = [ + [EventTypes.Create, ""], + [EventTypes.Member, StateValues.LAZY], + ] + response_body, from_token = self.do_sync( + sync_body, since=from_token, tok=user1_tok + ) + + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) + + # Only user2 and user4 sent events in the last 3 events we see in the `timeline` + # and we haven't seen any membership before this sync so we should see both + # users because we're lazy-loading the room members. + self._assertRequiredStateIncludes( + response_body["rooms"][room_id1]["required_state"], + { + state_map[(EventTypes.Member, user2_id)], + state_map[(EventTypes.Member, user4_id)], + }, + exact=True, + ) + + # Send a message so the room comes down sync. + self.helper.send(room_id1, "msg", tok=user4_tok) + + # Retract `required_state` and make an incremental Sliding Sync request + # requesting a few memberships + sync_body["lists"]["foo-list"]["required_state"] = [ + [EventTypes.Create, ""], + [EventTypes.Member, StateValues.ME], + [EventTypes.Member, user2_id], + ] + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) + + # We've seen user2's membership in the last sync so we shouldn't see it here + # even though it's requested. We should only see user1's membership. + self._assertRequiredStateIncludes( + response_body["rooms"][room_id1]["required_state"], + { + state_map[(EventTypes.Member, user1_id)], + }, + exact=True, + ) + def test_rooms_required_state_me(self) -> None: """ Test `rooms.required_state` correctly handles $ME. 
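
The incremental lazy-loading assertions above all rely on the same bookkeeping: the server tracks, per sliding sync connection, which membership events it has already sent down, and only sends memberships for timeline senders the connection has not yet seen. The following is a minimal sketch of that dedup logic; the class and method names are hypothetical, chosen for illustration, and do not reflect Synapse's actual internals:

from typing import Dict, Set


class LazyMemberTracker:
    """Hypothetical per-connection bookkeeping. Illustrates the behaviour the
    lazy-loading tests above assert, not Synapse's real implementation."""

    def __init__(self) -> None:
        # room_id -> user IDs whose membership has already gone down this
        # sliding sync connection.
        self._sent: Dict[str, Set[str]] = {}

    def members_to_send(self, room_id: str, timeline_senders: Set[str]) -> Set[str]:
        # Send only the senders this connection has not seen yet, then
        # remember them so later incremental syncs skip them.
        already_sent = self._sent.setdefault(room_id, set())
        fresh = timeline_senders - already_sent
        already_sent |= fresh
        return fresh

Under this model, the first incremental sync after expanding to `$LAZY` returns memberships for {user2, user4}; a later sync whose timeline has the same senders returns the empty set, matching the `exact=True` empty-set assertion above.
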
@@ -561,7 +810,7 @@ class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase): ) self.helper.leave(room_id1, user3_id, tok=user3_tok) - # Make the Sliding Sync request with lazy loading for the room members + # Make an incremental Sliding Sync request response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) # Only user2 and user3 sent events in the 3 events we see in the `timeline` diff --git a/tests/server.py b/tests/server.py index 95aff6f66c..23c81203a5 100644 --- a/tests/server.py +++ b/tests/server.py @@ -58,6 +58,7 @@ import twisted from twisted.enterprise import adbapi from twisted.internet import address, tcp, threads, udp from twisted.internet._resolver import SimpleResolverComplexifier +from twisted.internet.address import IPv4Address, IPv6Address from twisted.internet.defer import Deferred, fail, maybeDeferred, succeed from twisted.internet.error import DNSLookupError from twisted.internet.interfaces import ( @@ -73,6 +74,7 @@ from twisted.internet.interfaces import ( IReactorPluggableNameResolver, IReactorTime, IResolverSimple, + ITCPTransport, ITransport, ) from twisted.internet.protocol import ClientFactory, DatagramProtocol, Factory @@ -780,7 +782,7 @@ def get_clock() -> Tuple[ThreadedMemoryReactorClock, Clock]: return clock, hs_clock -@implementer(ITransport) +@implementer(ITCPTransport) @attr.s(cmp=False, auto_attribs=True) class FakeTransport: """ @@ -809,12 +811,12 @@ class FakeTransport: will get called back for connectionLost() notifications etc. """ - _peer_address: IAddress = attr.Factory( + _peer_address: Union[IPv4Address, IPv6Address] = attr.Factory( lambda: address.IPv4Address("TCP", "127.0.0.1", 5678) ) """The value to be returned by getPeer""" - _host_address: IAddress = attr.Factory( + _host_address: Union[IPv4Address, IPv6Address] = attr.Factory( lambda: address.IPv4Address("TCP", "127.0.0.1", 1234) ) """The value to be returned by getHost""" @@ -826,10 +828,10 @@ class FakeTransport: producer: Optional[IPushProducer] = None autoflush: bool = True - def getPeer(self) -> IAddress: + def getPeer(self) -> Union[IPv4Address, IPv6Address]: return self._peer_address - def getHost(self) -> IAddress: + def getHost(self) -> Union[IPv4Address, IPv6Address]: return self._host_address def loseConnection(self) -> None: @@ -939,6 +941,51 @@ class FakeTransport: logger.info("FakeTransport: Buffer now empty, completing disconnect") self.disconnected = True + ## ITCPTransport methods. ## + + def loseWriteConnection(self) -> None: + """ + Half-close the write side of a TCP connection. + + If the protocol instance this is attached to provides + IHalfCloseableProtocol, it will get notified when the operation is + done. When closing write connection, as with loseConnection this will + only happen when buffer has emptied and there is no registered + producer. + """ + raise NotImplementedError() + + def getTcpNoDelay(self) -> bool: + """ + Return if C{TCP_NODELAY} is enabled. + """ + return False + + def setTcpNoDelay(self, enabled: bool) -> None: + """ + Enable/disable C{TCP_NODELAY}. + + Enabling C{TCP_NODELAY} turns off Nagle's algorithm. Small packets are + sent sooner, possibly at the expense of overall throughput. + """ + # Ignore setting this. + + def getTcpKeepAlive(self) -> bool: + """ + Return if C{SO_KEEPALIVE} is enabled. + """ + return False + + def setTcpKeepAlive(self, enabled: bool) -> None: + """ + Enable/disable C{SO_KEEPALIVE}. 
+ + Enabling C{SO_KEEPALIVE} sends packets periodically when the connection + is otherwise idle, usually once every two hours. They are intended + to allow detection of lost peers in a non-infinite amount of time. + """ + # Ignore setting this. + def connect_client( reactor: ThreadedMemoryReactorClock, client_id: int diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py index ed5f286243..38a56419f3 100644 --- a/tests/storage/test_stream.py +++ b/tests/storage/test_stream.py @@ -1465,20 +1465,25 @@ class GetCurrentStateDeltaMembershipChangesForUserFederationTestCase( ) ) - with patch.object( - self.room_member_handler.federation_handler.federation_client, - "make_membership_event", - mock_make_membership_event, - ), patch.object( - self.room_member_handler.federation_handler.federation_client, - "send_join", - mock_send_join, - ), patch( - "synapse.event_auth._is_membership_change_allowed", - return_value=None, - ), patch( - "synapse.handlers.federation_event.check_state_dependent_auth_rules", - return_value=None, + with ( + patch.object( + self.room_member_handler.federation_handler.federation_client, + "make_membership_event", + mock_make_membership_event, + ), + patch.object( + self.room_member_handler.federation_handler.federation_client, + "send_join", + mock_send_join, + ), + patch( + "synapse.event_auth._is_membership_change_allowed", + return_value=None, + ), + patch( + "synapse.handlers.federation_event.check_state_dependent_auth_rules", + return_value=None, + ), ): self.get_success( self.room_member_handler.update_membership( diff --git a/tests/util/test_check_dependencies.py b/tests/util/test_check_dependencies.py index 13a4e6ddaa..c052ba2b75 100644 --- a/tests/util/test_check_dependencies.py +++ b/tests/util/test_check_dependencies.py @@ -109,10 +109,13 @@ class TestDependencyChecker(TestCase): def test_checks_ignore_dev_dependencies(self) -> None: """Both generic and per-extra checks should ignore dev dependencies.""" - with patch( - "synapse.util.check_dependencies.metadata.requires", - return_value=["dummypkg >= 1; extra == 'mypy'"], - ), patch("synapse.util.check_dependencies.RUNTIME_EXTRAS", {"cool-extra"}): + with ( + patch( + "synapse.util.check_dependencies.metadata.requires", + return_value=["dummypkg >= 1; extra == 'mypy'"], + ), + patch("synapse.util.check_dependencies.RUNTIME_EXTRAS", {"cool-extra"}), + ): # We're testing that none of these calls raise. 
with self.mock_installed_package(None): check_requirements() @@ -141,10 +144,13 @@ class TestDependencyChecker(TestCase): def test_check_for_extra_dependencies(self) -> None: """Complain if a package required for an extra is missing or old.""" - with patch( - "synapse.util.check_dependencies.metadata.requires", - return_value=["dummypkg >= 1; extra == 'cool-extra'"], - ), patch("synapse.util.check_dependencies.RUNTIME_EXTRAS", {"cool-extra"}): + with ( + patch( + "synapse.util.check_dependencies.metadata.requires", + return_value=["dummypkg >= 1; extra == 'cool-extra'"], + ), + patch("synapse.util.check_dependencies.RUNTIME_EXTRAS", {"cool-extra"}), + ): with self.mock_installed_package(None): self.assertRaises(DependencyException, check_requirements, "cool-extra") with self.mock_installed_package(old): diff --git a/tests/util/test_wheel_timer.py b/tests/util/test_wheel_timer.py index 173a7cfaec..6fa575a18e 100644 --- a/tests/util/test_wheel_timer.py +++ b/tests/util/test_wheel_timer.py @@ -28,53 +28,55 @@ class WheelTimerTestCase(unittest.TestCase): def test_single_insert_fetch(self) -> None: wheel: WheelTimer[object] = WheelTimer(bucket_size=5) - obj = object() - wheel.insert(100, obj, 150) + wheel.insert(100, "1", 150) self.assertListEqual(wheel.fetch(101), []) self.assertListEqual(wheel.fetch(110), []) self.assertListEqual(wheel.fetch(120), []) self.assertListEqual(wheel.fetch(130), []) self.assertListEqual(wheel.fetch(149), []) - self.assertListEqual(wheel.fetch(156), [obj]) + self.assertListEqual(wheel.fetch(156), ["1"]) self.assertListEqual(wheel.fetch(170), []) def test_multi_insert(self) -> None: wheel: WheelTimer[object] = WheelTimer(bucket_size=5) - obj1 = object() - obj2 = object() - obj3 = object() - wheel.insert(100, obj1, 150) - wheel.insert(105, obj2, 130) - wheel.insert(106, obj3, 160) + wheel.insert(100, "1", 150) + wheel.insert(105, "2", 130) + wheel.insert(106, "3", 160) self.assertListEqual(wheel.fetch(110), []) - self.assertListEqual(wheel.fetch(135), [obj2]) + self.assertListEqual(wheel.fetch(135), ["2"]) self.assertListEqual(wheel.fetch(149), []) - self.assertListEqual(wheel.fetch(158), [obj1]) + self.assertListEqual(wheel.fetch(158), ["1"]) self.assertListEqual(wheel.fetch(160), []) - self.assertListEqual(wheel.fetch(200), [obj3]) + self.assertListEqual(wheel.fetch(200), ["3"]) self.assertListEqual(wheel.fetch(210), []) def test_insert_past(self) -> None: wheel: WheelTimer[object] = WheelTimer(bucket_size=5) - obj = object() - wheel.insert(100, obj, 50) - self.assertListEqual(wheel.fetch(120), [obj]) + wheel.insert(100, "1", 50) + self.assertListEqual(wheel.fetch(120), ["1"]) def test_insert_past_multi(self) -> None: wheel: WheelTimer[object] = WheelTimer(bucket_size=5) - obj1 = object() - obj2 = object() - obj3 = object() - wheel.insert(100, obj1, 150) - wheel.insert(100, obj2, 140) - wheel.insert(100, obj3, 50) - self.assertListEqual(wheel.fetch(110), [obj3]) + wheel.insert(100, "1", 150) + wheel.insert(100, "2", 140) + wheel.insert(100, "3", 50) + self.assertListEqual(wheel.fetch(110), ["3"]) self.assertListEqual(wheel.fetch(120), []) - self.assertListEqual(wheel.fetch(147), [obj2]) - self.assertListEqual(wheel.fetch(200), [obj1]) + self.assertListEqual(wheel.fetch(147), ["2"]) + self.assertListEqual(wheel.fetch(200), ["1"]) self.assertListEqual(wheel.fetch(240), []) + + def test_multi_insert_then_past(self) -> None: + wheel: WheelTimer[object] = WheelTimer(bucket_size=5) + + wheel.insert(100, "1", 150) + wheel.insert(100, "2", 160) + wheel.insert(100, "3", 155) 
+ + self.assertListEqual(wheel.fetch(110), []) + self.assertListEqual(wheel.fetch(158), ["1"]) diff --git a/tox.ini b/tox.ini index 4cd9dfb966..a506b5034d 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = py37, py38, py39, py310 +envlist = py39, py310, py311, py312, py313 # we require tox>=2.3.2 for the fix to https://github.com/tox-dev/tox/issues/208 minversion = 2.3.2
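
The wheel-timer expectations above follow from how insertion buckets expiries: a deadline is rounded up to the next bucket boundary, so with bucket_size=5 an entry due at t=150 only fires once fetch() is called with now >= 155, and an entry due at t=155 rounds into the t=160 bucket (which is why fetch(158) in test_multi_insert_then_past returns only "1"). Below is a self-contained sketch of those assumed semantics; it reproduces the arithmetic the tests assert but is not the actual WheelTimer implementation:

from typing import Dict, Generic, List, TypeVar

T = TypeVar("T")


class SketchWheelTimer(Generic[T]):
    """Toy bucketed timer matching the insert/fetch arithmetic the tests
    above exercise; Synapse's real WheelTimer is structured differently."""

    def __init__(self, bucket_size: int = 5) -> None:
        self.bucket_size = bucket_size
        self.buckets: Dict[int, List[T]] = {}

    def insert(self, now: int, obj: T, then: int) -> None:
        # Round the deadline *up* to the next bucket, but never schedule it
        # before the current bucket (entries in the past fire on the next fetch).
        key = max(then // self.bucket_size + 1, now // self.bucket_size)
        self.buckets.setdefault(key, []).append(obj)

    def fetch(self, now: int) -> List[T]:
        # Pop and return every bucket whose boundary has been reached.
        now_key = now // self.bucket_size
        due = [k for k in sorted(self.buckets) if k <= now_key]
        return [obj for k in due for obj in self.buckets.pop(k)]

For example, insert(100, "1", 150) lands in bucket 31, so fetch(149) (bucket 29) returns nothing while fetch(156) (bucket 31) returns ["1"], and insert(100, "3", 50) clamps to the current bucket and is returned by the first subsequent fetch, mirroring test_insert_past.
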