Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes

This commit is contained in:
Erik Johnston 2024-09-10 10:32:01 +01:00
commit 0c512abce1
187 changed files with 2042 additions and 1069 deletions

View file

@ -29,10 +29,14 @@ jobs:
with:
install-project: "false"
- name: Run ruff
- name: Run ruff check
continue-on-error: true
run: poetry run ruff check --fix .
- name: Run ruff format
continue-on-error: true
run: poetry run ruff format --quiet .
- run: cargo clippy --all-features --fix -- -D warnings
continue-on-error: true

View file

@ -131,9 +131,12 @@ jobs:
with:
install-project: "false"
- name: Check style
- name: Run ruff check
run: poetry run ruff check --output-format=github .
- name: Run ruff format
run: poetry run ruff format --check .
lint-mypy:
runs-on: ubuntu-latest
name: Typechecking

View file

@ -1,3 +1,69 @@
# Synapse 1.114.0 (2024-09-02)
This release enables support for
[MSC4186](https://github.com/matrix-org/matrix-spec-proposals/pull/4186) —
Simplified Sliding Sync. This allows using the upcoming releases of the Element
X mobile apps without having to run a Sliding Sync Proxy.
### Features
- Enable native sliding sync support ([MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) and [MSC4186](https://github.com/matrix-org/matrix-spec-proposals/pull/4186)) by default. ([\#17648](https://github.com/element-hq/synapse/issues/17648))
# Synapse 1.114.0rc3 (2024-08-30)
### Bugfixes
- Fix regression in v1.114.0rc2 that caused workers to fail to start. ([\#17626](https://github.com/element-hq/synapse/issues/17626))
# Synapse 1.114.0rc2 (2024-08-30)
### Features
- Improve cross-signing upload when using [MSC3861](https://github.com/matrix-org/matrix-spec-proposals/pull/3861) to use a custom UIA flow stage, with web fallback support. ([\#17509](https://github.com/element-hq/synapse/issues/17509))
- Make `hash_password` script accept password input from stdin. ([\#17608](https://github.com/element-hq/synapse/issues/17608))
### Bugfixes
- Fix hierarchy returning 403 when room is accessible through federation. Contributed by Krishan (@kfiven). ([\#17194](https://github.com/element-hq/synapse/issues/17194))
- Fix content-length on federation `/thumbnail` responses. ([\#17532](https://github.com/element-hq/synapse/issues/17532))
- Fix authenticated media responses using a wrong limit when following redirects over federation. ([\#17543](https://github.com/element-hq/synapse/issues/17543))
### Internal Changes
- MSC3861: load the issuer and account management URLs from OIDC discovery. ([\#17407](https://github.com/element-hq/synapse/issues/17407))
- Refactor sliding sync class into multiple files. ([\#17595](https://github.com/element-hq/synapse/issues/17595))
- Store sliding sync per-connection state in the database. ([\#17599](https://github.com/element-hq/synapse/issues/17599))
- Make the sliding sync `PerConnectionState` class immutable. ([\#17600](https://github.com/element-hq/synapse/issues/17600))
- Add support to `@tag_args` for standalone functions. ([\#17604](https://github.com/element-hq/synapse/issues/17604))
- Speed up incremental syncs in sliding sync by adding some more caching. ([\#17606](https://github.com/element-hq/synapse/issues/17606))
- Always return the user's own read receipts in sliding sync. ([\#17617](https://github.com/element-hq/synapse/issues/17617))
- Replace `isort` and `black` with `ruff`. ([\#17620](https://github.com/element-hq/synapse/issues/17620))
- Refactor sliding sync code to move room list logic out into a separate class. ([\#17622](https://github.com/element-hq/synapse/issues/17622))
### Updates to locked dependencies
* Bump attrs from 23.2.0 to 24.2.0. ([\#17609](https://github.com/element-hq/synapse/issues/17609))
* Bump cryptography from 42.0.8 to 43.0.0. ([\#17584](https://github.com/element-hq/synapse/issues/17584))
* Bump phonenumbers from 8.13.43 to 8.13.44. ([\#17610](https://github.com/element-hq/synapse/issues/17610))
* Bump pygithub from 2.3.0 to 2.4.0. ([\#17612](https://github.com/element-hq/synapse/issues/17612))
* Bump pyyaml from 6.0.1 to 6.0.2. ([\#17611](https://github.com/element-hq/synapse/issues/17611))
* Bump sentry-sdk from 2.12.0 to 2.13.0. ([\#17585](https://github.com/element-hq/synapse/issues/17585))
* Bump serde from 1.0.206 to 1.0.208. ([\#17581](https://github.com/element-hq/synapse/issues/17581))
* Bump serde from 1.0.208 to 1.0.209. ([\#17613](https://github.com/element-hq/synapse/issues/17613))
* Bump serde_json from 1.0.124 to 1.0.125. ([\#17582](https://github.com/element-hq/synapse/issues/17582))
* Bump serde_json from 1.0.125 to 1.0.127. ([\#17614](https://github.com/element-hq/synapse/issues/17614))
* Bump types-jsonschema from 4.23.0.20240712 to 4.23.0.20240813. ([\#17583](https://github.com/element-hq/synapse/issues/17583))
* Bump types-setuptools from 71.1.0.20240726 to 71.1.0.20240818. ([\#17586](https://github.com/element-hq/synapse/issues/17586))
# Synapse 1.114.0rc1 (2024-08-20)
### Features

View file

@ -1 +0,0 @@
Fix hierarchy returning 403 when room is accessible through federation. Contributed by Krishan (@kfiven).

View file

@ -1 +0,0 @@
Fix content-length on federation /thumbnail responses.

View file

@ -1 +0,0 @@
Fix authenticated media responses using a wrong limit when following redirects over federation.

View file

@ -1 +0,0 @@
Refactor sliding sync class into multiple files.

View file

@ -1 +0,0 @@
Add support to `@tag_args` for standalone functions.

View file

@ -1 +0,0 @@
Speed up incremental syncs in sliding sync by adding some more caching.

View file

@ -1 +0,0 @@
Make `hash_password` accept password input from stdin.

View file

@ -1 +0,0 @@
Always return the user's own read receipts in sliding sync.

View file

@ -1 +0,0 @@
Refactor sliding sync code to move room list logic out into a separate class.

1
changelog.d/17641.misc Normal file
View file

@ -0,0 +1 @@
Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting.

1
changelog.d/17643.misc Normal file
View file

@ -0,0 +1 @@
Replace `isort` and `black with `ruff`.

1
changelog.d/17649.misc Normal file
View file

@ -0,0 +1 @@
Use new database tables for sliding sync.

View file

@ -0,0 +1 @@
Stabilise [MSC4156](https://github.com/matrix-org/matrix-spec-proposals/pull/4156) by removing the `msc4156_enabled` config setting and defaulting it to `true`.

1
changelog.d/17654.misc Normal file
View file

@ -0,0 +1 @@
Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting.

1
changelog.d/17655.misc Normal file
View file

@ -0,0 +1 @@
Prevent duplicate tags being added to Sliding Sync traces.

1
changelog.d/17658.misc Normal file
View file

@ -0,0 +1 @@
Get `bump_stamp` from [new sliding sync tables](https://github.com/element-hq/synapse/pull/17512) which should be faster.

1
changelog.d/17665.misc Normal file
View file

@ -0,0 +1 @@
Speed up incremental Sliding Sync requests by avoiding extra work.

1
changelog.d/17666.misc Normal file
View file

@ -0,0 +1 @@
Small performance improvement in speeding up sliding sync.

1
changelog.d/17670.misc Normal file
View file

@ -0,0 +1 @@
Small performance improvement in speeding up sliding sync.

1
changelog.d/17672.misc Normal file
View file

@ -0,0 +1 @@
Small performance improvement in speeding up sliding sync.

1
changelog.d/17673.misc Normal file
View file

@ -0,0 +1 @@
Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting.

1
changelog.d/17674.bugfix Normal file
View file

@ -0,0 +1 @@
Fix bug where we returned the wrong `bump_stamp` for invites in sliding sync response, causing incorrect ordering of invites in the room list.

1
changelog.d/17684.misc Normal file
View file

@ -0,0 +1 @@
Speed up sliding sync by reducing number of database calls.

1
changelog.d/17688.misc Normal file
View file

@ -0,0 +1 @@
Speed up sync by pulling out fewer events from the database.

View file

@ -21,7 +21,8 @@
#
#
""" Starts a synapse client console. """
"""Starts a synapse client console."""
import argparse
import binascii
import cmd

18
debian/changelog vendored
View file

@ -1,3 +1,21 @@
matrix-synapse-py3 (1.114.0) stable; urgency=medium
* New Synapse release 1.114.0.
-- Synapse Packaging team <packages@matrix.org> Mon, 02 Sep 2024 15:14:53 +0100
matrix-synapse-py3 (1.114.0~rc3) stable; urgency=medium
* New Synapse release 1.114.0rc3.
-- Synapse Packaging team <packages@matrix.org> Fri, 30 Aug 2024 16:38:05 +0100
matrix-synapse-py3 (1.114.0~rc2) stable; urgency=medium
* New Synapse release 1.114.0rc2.
-- Synapse Packaging team <packages@matrix.org> Fri, 30 Aug 2024 15:35:13 +0100
matrix-synapse-py3 (1.114.0~rc1) stable; urgency=medium
* New synapse release 1.114.0rc1.

24
poetry.lock generated
View file

@ -2557,13 +2557,13 @@ files = [
[[package]]
name = "towncrier"
version = "24.7.1"
version = "24.8.0"
description = "Building newsfiles for your project."
optional = false
python-versions = ">=3.8"
files = [
{file = "towncrier-24.7.1-py3-none-any.whl", hash = "sha256:685e2a94335b5dc47537b4d3b449a25b18571ea85b07dcf6e8df31ba40f692dd"},
{file = "towncrier-24.7.1.tar.gz", hash = "sha256:57a057faedabcadf1a62f6f9bad726ae566c1f31a411338ddb8316993f583b3d"},
{file = "towncrier-24.8.0-py3-none-any.whl", hash = "sha256:9343209592b839209cdf28c339ba45792fbfe9775b5f9c177462fd693e127d8d"},
{file = "towncrier-24.8.0.tar.gz", hash = "sha256:013423ee7eed102b2f393c287d22d95f66f1a3ea10a4baa82d298001a7f18af3"},
]
[package.dependencies]
@ -2622,13 +2622,13 @@ urllib3 = ">=1.26.0"
[[package]]
name = "twisted"
version = "24.7.0rc1"
version = "24.7.0"
description = "An asynchronous networking framework written in Python"
optional = false
python-versions = ">=3.8.0"
files = [
{file = "twisted-24.7.0rc1-py3-none-any.whl", hash = "sha256:f37d6656fe4e2871fab29d8952ae90bd6ca8b48a9e4dfa1b348f4cd62e6ba0bb"},
{file = "twisted-24.7.0rc1.tar.gz", hash = "sha256:bbc4a2193ca34cfa32f626300746698a6d70fcd77d9c0b79a664c347e39634fc"},
{file = "twisted-24.7.0-py3-none-any.whl", hash = "sha256:734832ef98108136e222b5230075b1079dad8a3fc5637319615619a7725b0c81"},
{file = "twisted-24.7.0.tar.gz", hash = "sha256:5a60147f044187a127ec7da96d170d49bcce50c6fd36f594e60f4587eff4d394"},
]
[package.dependencies]
@ -2761,24 +2761,24 @@ files = [
[[package]]
name = "types-pillow"
version = "10.2.0.20240520"
version = "10.2.0.20240822"
description = "Typing stubs for Pillow"
optional = false
python-versions = ">=3.8"
files = [
{file = "types-Pillow-10.2.0.20240520.tar.gz", hash = "sha256:130b979195465fa1e1676d8e81c9c7c30319e8e95b12fae945e8f0d525213107"},
{file = "types_Pillow-10.2.0.20240520-py3-none-any.whl", hash = "sha256:33c36494b380e2a269bb742181bea5d9b00820367822dbd3760f07210a1da23d"},
{file = "types-Pillow-10.2.0.20240822.tar.gz", hash = "sha256:559fb52a2ef991c326e4a0d20accb3bb63a7ba8d40eb493e0ecb0310ba52f0d3"},
{file = "types_Pillow-10.2.0.20240822-py3-none-any.whl", hash = "sha256:d9dab025aba07aeb12fd50a6799d4eac52a9603488eca09d7662543983f16c5d"},
]
[[package]]
name = "types-psycopg2"
version = "2.9.21.20240417"
version = "2.9.21.20240819"
description = "Typing stubs for psycopg2"
optional = false
python-versions = ">=3.8"
files = [
{file = "types-psycopg2-2.9.21.20240417.tar.gz", hash = "sha256:05db256f4a459fb21a426b8e7fca0656c3539105ff0208eaf6bdaf406a387087"},
{file = "types_psycopg2-2.9.21.20240417-py3-none-any.whl", hash = "sha256:644d6644d64ebbe37203229b00771012fb3b3bddd507a129a2e136485990e4f8"},
{file = "types-psycopg2-2.9.21.20240819.tar.gz", hash = "sha256:4ed6b47464d6374fa64e5e3b234cea0f710e72123a4596d67ab50b7415a84666"},
{file = "types_psycopg2-2.9.21.20240819-py3-none-any.whl", hash = "sha256:c9192311c27d7ad561eef705f1b2df1074f2cdcf445a98a6a2fcaaaad43278cf"},
]
[[package]]

View file

@ -97,7 +97,7 @@ module-name = "synapse.synapse_rust"
[tool.poetry]
name = "matrix-synapse"
version = "1.114.0rc1"
version = "1.114.0"
description = "Homeserver for the Matrix decentralised comms protocol"
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
license = "AGPL-3.0-or-later"

View file

@ -31,6 +31,7 @@ Pydantic does not yet offer a strict mode, but it is planned for pydantic v2. Se
until then, this script is a best effort to stop us from introducing type coersion bugs
(like the infamous stringy power levels fixed in room version 10).
"""
import argparse
import contextlib
import functools

View file

@ -109,6 +109,9 @@ set -x
# --quiet suppresses the update check.
ruff check --quiet --fix "${files[@]}"
# Reformat Python code.
ruff format --quiet "${files[@]}"
# Catch any common programming mistakes in Rust code.
#
# --bins, --examples, --lib, --tests combined explicitly disable checking

View file

@ -20,8 +20,7 @@
#
#
"""An interactive script for doing a release. See `cli()` below.
"""
"""An interactive script for doing a release. See `cli()` below."""
import glob
import json

View file

@ -13,8 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Contains *incomplete* type hints for txredisapi.
"""
"""Contains *incomplete* type hints for txredisapi."""
from typing import Any, List, Optional, Type, Union
from twisted.internet import protocol

View file

@ -20,8 +20,7 @@
#
#
""" This is an implementation of a Matrix homeserver.
"""
"""This is an implementation of a Matrix homeserver."""
import os
import sys

View file

@ -171,7 +171,7 @@ def elide_http_methods_if_unconflicting(
"""
def paths_to_methods_dict(
methods_and_paths: Iterable[Tuple[str, str]]
methods_and_paths: Iterable[Tuple[str, str]],
) -> Dict[str, Set[str]]:
"""
Given (method, path) pairs, produces a dict from path to set of methods
@ -201,7 +201,7 @@ def elide_http_methods_if_unconflicting(
def simplify_path_regexes(
registrations: Dict[Tuple[str, str], EndpointDescription]
registrations: Dict[Tuple[str, str], EndpointDescription],
) -> Dict[Tuple[str, str], EndpointDescription]:
"""
Simplify all the path regexes for the dict of endpoint descriptions,

View file

@ -40,6 +40,7 @@ from synapse.storage.engines import create_engine
class ReviewConfig(RootConfig):
"A config class that just pulls out the database config"
config_classes = [DatabaseConfig]
@ -160,7 +161,11 @@ def main() -> None:
with make_conn(database_config, engine, "review_recent_signups") as db_conn:
# This generates a type of Cursor, not LoggingTransaction.
user_infos = get_recent_users(db_conn.cursor(), since_ms, exclude_users_with_appservice) # type: ignore[arg-type]
user_infos = get_recent_users(
db_conn.cursor(),
since_ms, # type: ignore[arg-type]
exclude_users_with_appservice,
)
for user_info in user_infos:
if exclude_users_with_email and user_info.emails:

View file

@ -717,9 +717,7 @@ class Porter:
return
# Check if all background updates are done, abort if not.
updates_complete = (
await self.sqlite_store.db_pool.updates.has_completed_background_updates()
)
updates_complete = await self.sqlite_store.db_pool.updates.has_completed_background_updates()
if not updates_complete:
end_error = (
"Pending background updates exist in the SQLite3 database."
@ -1095,10 +1093,10 @@ class Porter:
return done, remaining + done
async def _setup_state_group_id_seq(self) -> None:
curr_id: Optional[int] = (
await self.sqlite_store.db_pool.simple_select_one_onecol(
table="state_groups", keyvalues={}, retcol="MAX(id)", allow_none=True
)
curr_id: Optional[
int
] = await self.sqlite_store.db_pool.simple_select_one_onecol(
table="state_groups", keyvalues={}, retcol="MAX(id)", allow_none=True
)
if not curr_id:
@ -1186,13 +1184,13 @@ class Porter:
)
async def _setup_auth_chain_sequence(self) -> None:
curr_chain_id: Optional[int] = (
await self.sqlite_store.db_pool.simple_select_one_onecol(
table="event_auth_chains",
keyvalues={},
retcol="MAX(chain_id)",
allow_none=True,
)
curr_chain_id: Optional[
int
] = await self.sqlite_store.db_pool.simple_select_one_onecol(
table="event_auth_chains",
keyvalues={},
retcol="MAX(chain_id)",
allow_none=True,
)
def r(txn: LoggingTransaction) -> None:

View file

@ -19,7 +19,8 @@
#
#
"""Contains the URL paths to prefix various aspects of the server with. """
"""Contains the URL paths to prefix various aspects of the server with."""
import hmac
from hashlib import sha256
from urllib.parse import urlencode

View file

@ -54,6 +54,7 @@ UP & quit +---------- YES SUCCESS
This is all tied together by the AppServiceScheduler which DIs the required
components.
"""
import logging
from typing import (
TYPE_CHECKING,

View file

@ -338,8 +338,10 @@ class ExperimentalConfig(Config):
# MSC3391: Removing account data.
self.msc3391_enabled = experimental.get("msc3391_enabled", False)
# MSC3575 (Sliding Sync API endpoints)
self.msc3575_enabled: bool = experimental.get("msc3575_enabled", False)
# MSC3575 (Sliding Sync) alternate endpoints, c.f. MSC4186.
#
# This is enabled by default as a replacement for the sliding sync proxy.
self.msc3575_enabled: bool = experimental.get("msc3575_enabled", True)
# MSC3773: Thread notifications
self.msc3773_enabled: bool = experimental.get("msc3773_enabled", False)
@ -445,6 +447,3 @@ class ExperimentalConfig(Config):
# MSC4151: Report room API (Client-Server API)
self.msc4151_enabled: bool = experimental.get("msc4151_enabled", False)
# MSC4156: Migrate server_name to via
self.msc4156_enabled: bool = experimental.get("msc4156_enabled", False)

View file

@ -200,16 +200,13 @@ class KeyConfig(Config):
)
form_secret = 'form_secret: "%s"' % random_string_with_symbols(50)
return (
"""\
return """\
%(macaroon_secret_key)s
%(form_secret)s
signing_key_path: "%(base_key_name)s.signing.key"
trusted_key_servers:
- server_name: "matrix.org"
"""
% locals()
)
""" % locals()
def read_signing_keys(self, signing_key_path: str, name: str) -> List[SigningKey]:
"""Read the signing keys in the given path.
@ -249,7 +246,9 @@ class KeyConfig(Config):
if is_signing_algorithm_supported(key_id):
key_base64 = key_data["key"]
key_bytes = decode_base64(key_base64)
verify_key: "VerifyKeyWithExpiry" = decode_verify_key_bytes(key_id, key_bytes) # type: ignore[assignment]
verify_key: "VerifyKeyWithExpiry" = decode_verify_key_bytes(
key_id, key_bytes
) # type: ignore[assignment]
verify_key.expired = key_data["expired_ts"]
keys[key_id] = verify_key
else:

View file

@ -157,12 +157,9 @@ class LoggingConfig(Config):
self, config_dir_path: str, server_name: str, **kwargs: Any
) -> str:
log_config = os.path.join(config_dir_path, server_name + ".log.config")
return (
"""\
return """\
log_config: "%(log_config)s"
"""
% locals()
)
""" % locals()
def read_arguments(self, args: argparse.Namespace) -> None:
if args.no_redirect_stdio is not None:

View file

@ -828,13 +828,10 @@ class ServerConfig(Config):
).lstrip()
if not unsecure_listeners:
unsecure_http_bindings = (
"""- port: %(unsecure_port)s
unsecure_http_bindings = """- port: %(unsecure_port)s
tls: false
type: http
x_forwarded: true"""
% locals()
)
x_forwarded: true""" % locals()
if not open_private_ports:
unsecure_http_bindings += (
@ -853,16 +850,13 @@ class ServerConfig(Config):
if not secure_listeners:
secure_http_bindings = ""
return (
"""\
return """\
server_name: "%(server_name)s"
pid_file: %(pid_file)s
listeners:
%(secure_http_bindings)s
%(unsecure_http_bindings)s
"""
% locals()
)
""" % locals()
def read_arguments(self, args: argparse.Namespace) -> None:
if args.manhole is not None:

View file

@ -328,10 +328,11 @@ class WorkerConfig(Config):
)
# type-ignore: the expression `Union[A, B]` is not a Type[Union[A, B]] currently
self.instance_map: Dict[
str, InstanceLocationConfig
] = parse_and_validate_mapping(
instance_map, InstanceLocationConfig # type: ignore[arg-type]
self.instance_map: Dict[str, InstanceLocationConfig] = (
parse_and_validate_mapping(
instance_map,
InstanceLocationConfig, # type: ignore[arg-type]
)
)
# Map from type of streams to source, c.f. WriterLocations.

View file

@ -887,7 +887,8 @@ def _check_power_levels(
raise SynapseError(400, f"{v!r} must be an integer.")
if k in {"events", "notifications", "users"}:
if not isinstance(v, collections.abc.Mapping) or not all(
type(v) is int for v in v.values() # noqa: E721
type(v) is int
for v in v.values() # noqa: E721
):
raise SynapseError(
400,

View file

@ -80,7 +80,7 @@ def load_legacy_presence_router(hs: "HomeServer") -> None:
# All methods that the module provides should be async, but this wasn't enforced
# in the old module system, so we wrap them if needed
def async_wrapper(
f: Optional[Callable[P, R]]
f: Optional[Callable[P, R]],
) -> Optional[Callable[P, Awaitable[R]]]:
# f might be None if the callback isn't implemented by the module. In this
# case we don't want to register a callback at all so we return None.

View file

@ -504,7 +504,7 @@ class UnpersistedEventContext(UnpersistedEventContextBase):
def _encode_state_group_delta(
state_group_delta: Dict[Tuple[int, int], StateMap[str]]
state_group_delta: Dict[Tuple[int, int], StateMap[str]],
) -> List[Tuple[int, int, Optional[List[Tuple[str, str, str]]]]]:
if not state_group_delta:
return []
@ -517,7 +517,7 @@ def _encode_state_group_delta(
def _decode_state_group_delta(
input: List[Tuple[int, int, List[Tuple[str, str, str]]]]
input: List[Tuple[int, int, List[Tuple[str, str, str]]]],
) -> Dict[Tuple[int, int], StateMap[str]]:
if not input:
return {}
@ -544,7 +544,7 @@ def _encode_state_dict(
def _decode_state_dict(
input: Optional[List[Tuple[str, str, str]]]
input: Optional[List[Tuple[str, str, str]]],
) -> Optional[StateMap[str]]:
"""Decodes a state dict encoded using `_encode_state_dict` above"""
if input is None:

View file

@ -19,5 +19,4 @@
#
#
""" This package includes all the federation specific logic.
"""
"""This package includes all the federation specific logic."""

View file

@ -20,7 +20,7 @@
#
#
""" This module contains all the persistence actions done by the federation
"""This module contains all the persistence actions done by the federation
package.
These actions are mostly only used by the :py:mod:`.replication` module.

View file

@ -859,7 +859,6 @@ class FederationMediaThumbnailServlet(BaseFederationServerServlet):
request: SynapseRequest,
media_id: str,
) -> None:
width = parse_integer(request, "width", required=True)
height = parse_integer(request, "height", required=True)
method = parse_string(request, "method", "scale")

View file

@ -19,7 +19,7 @@
#
#
""" Defines the JSON structure of the protocol units used by the server to
"""Defines the JSON structure of the protocol units used by the server to
server protocol.
"""

View file

@ -118,10 +118,10 @@ class AccountHandler:
}
if self._use_account_validity_in_account_status:
status["org.matrix.expired"] = (
await self._account_validity_handler.is_user_expired(
user_id.to_string()
)
status[
"org.matrix.expired"
] = await self._account_validity_handler.is_user_expired(
user_id.to_string()
)
return status

View file

@ -197,14 +197,16 @@ class AdminHandler:
# events that we have and then filtering, this isn't the most
# efficient method perhaps but it does guarantee we get everything.
while True:
events, _ = (
await self._store.paginate_room_events_by_topological_ordering(
room_id=room_id,
from_key=from_key,
to_key=to_key,
limit=100,
direction=Direction.FORWARDS,
)
(
events,
_,
_,
) = await self._store.paginate_room_events_by_topological_ordering(
room_id=room_id,
from_key=from_key,
to_key=to_key,
limit=100,
direction=Direction.FORWARDS,
)
if not events:
break

View file

@ -166,8 +166,7 @@ def login_id_phone_to_thirdparty(identifier: JsonDict) -> Dict[str, str]:
if "country" not in identifier or (
# The specification requires a "phone" field, while Synapse used to require a "number"
# field. Accept both for backwards compatibility.
"phone" not in identifier
and "number" not in identifier
"phone" not in identifier and "number" not in identifier
):
raise SynapseError(
400, "Invalid phone-type identifier", errcode=Codes.INVALID_PARAM

View file

@ -265,9 +265,9 @@ class DirectoryHandler:
async def get_association(self, room_alias: RoomAlias) -> JsonDict:
room_id = None
if self.hs.is_mine(room_alias):
result: Optional[RoomAliasMapping] = (
await self.get_association_from_room_alias(room_alias)
)
result: Optional[
RoomAliasMapping
] = await self.get_association_from_room_alias(room_alias)
if result:
room_id = result.room_id
@ -512,11 +512,9 @@ class DirectoryHandler:
raise SynapseError(403, "Not allowed to publish room")
# Check if publishing is blocked by a third party module
allowed_by_third_party_rules = (
await (
self._third_party_event_rules.check_visibility_can_be_modified(
room_id, visibility
)
allowed_by_third_party_rules = await (
self._third_party_event_rules.check_visibility_can_be_modified(
room_id, visibility
)
)
if not allowed_by_third_party_rules:

View file

@ -1001,11 +1001,11 @@ class FederationHandler:
)
if include_auth_user_id:
event_content[EventContentFields.AUTHORISING_USER] = (
await self._event_auth_handler.get_user_which_could_invite(
room_id,
state_ids,
)
event_content[
EventContentFields.AUTHORISING_USER
] = await self._event_auth_handler.get_user_which_could_invite(
room_id,
state_ids,
)
builder = self.event_builder_factory.for_room_version(

View file

@ -21,6 +21,7 @@
#
"""Utilities for interacting with Identity Servers"""
import logging
import urllib.parse
from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Tuple

View file

@ -1225,10 +1225,9 @@ class EventCreationHandler:
)
if prev_event_ids is not None:
assert (
len(prev_event_ids) <= 10
), "Attempting to create an event with %i prev_events" % (
len(prev_event_ids),
assert len(prev_event_ids) <= 10, (
"Attempting to create an event with %i prev_events"
% (len(prev_event_ids),)
)
else:
prev_event_ids = await self.store.get_prev_events_for_room(builder.room_id)

View file

@ -507,15 +507,17 @@ class PaginationHandler:
# Initially fetch the events from the database. With any luck, we can return
# these without blocking on backfill (handled below).
events, next_key = (
await self.store.paginate_room_events_by_topological_ordering(
room_id=room_id,
from_key=from_token.room_key,
to_key=to_room_key,
direction=pagin_config.direction,
limit=pagin_config.limit,
event_filter=event_filter,
)
(
events,
next_key,
_,
) = await self.store.paginate_room_events_by_topological_ordering(
room_id=room_id,
from_key=from_token.room_key,
to_key=to_room_key,
direction=pagin_config.direction,
limit=pagin_config.limit,
event_filter=event_filter,
)
if pagin_config.direction == Direction.BACKWARDS:
@ -584,15 +586,17 @@ class PaginationHandler:
# If we did backfill something, refetch the events from the database to
# catch anything new that might have been added since we last fetched.
if did_backfill:
events, next_key = (
await self.store.paginate_room_events_by_topological_ordering(
room_id=room_id,
from_key=from_token.room_key,
to_key=to_room_key,
direction=pagin_config.direction,
limit=pagin_config.limit,
event_filter=event_filter,
)
(
events,
next_key,
_,
) = await self.store.paginate_room_events_by_topological_ordering(
room_id=room_id,
from_key=from_token.room_key,
to_key=to_room_key,
direction=pagin_config.direction,
limit=pagin_config.limit,
event_filter=event_filter,
)
else:
# Otherwise, we can backfill in the background for eventual

View file

@ -71,6 +71,7 @@ user state; this device follows the normal timeout logic (see above) and will
automatically be replaced with any information from currently available devices.
"""
import abc
import contextlib
import itertools
@ -493,9 +494,9 @@ class WorkerPresenceHandler(BasePresenceHandler):
# The number of ongoing syncs on this process, by (user ID, device ID).
# Empty if _presence_enabled is false.
self._user_device_to_num_current_syncs: Dict[Tuple[str, Optional[str]], int] = (
{}
)
self._user_device_to_num_current_syncs: Dict[
Tuple[str, Optional[str]], int
] = {}
self.notifier = hs.get_notifier()
self.instance_id = hs.get_instance_id()
@ -818,9 +819,9 @@ class PresenceHandler(BasePresenceHandler):
# Keeps track of the number of *ongoing* syncs on this process. While
# this is non zero a user will never go offline.
self._user_device_to_num_current_syncs: Dict[Tuple[str, Optional[str]], int] = (
{}
)
self._user_device_to_num_current_syncs: Dict[
Tuple[str, Optional[str]], int
] = {}
# Keeps track of the number of *ongoing* syncs on other processes.
#

View file

@ -351,9 +351,9 @@ class ProfileHandler:
server_name = host
if self._is_mine_server_name(server_name):
media_info: Optional[Union[LocalMedia, RemoteMedia]] = (
await self.store.get_local_media(media_id)
)
media_info: Optional[
Union[LocalMedia, RemoteMedia]
] = await self.store.get_local_media(media_id)
else:
media_info = await self.store.get_cached_remote_media(server_name, media_id)

View file

@ -188,13 +188,13 @@ class RelationsHandler:
if include_original_event:
# Do not bundle aggregations when retrieving the original event because
# we want the content before relations are applied to it.
return_value["original_event"] = (
await self._event_serializer.serialize_event(
event,
now,
bundle_aggregations=None,
config=serialize_options,
)
return_value[
"original_event"
] = await self._event_serializer.serialize_event(
event,
now,
bundle_aggregations=None,
config=serialize_options,
)
if next_token:

View file

@ -20,6 +20,7 @@
#
"""Contains functions for performing actions on rooms."""
import itertools
import logging
import math
@ -900,11 +901,9 @@ class RoomCreationHandler:
)
# Check whether this visibility value is blocked by a third party module
allowed_by_third_party_rules = (
await (
self._third_party_event_rules.check_visibility_can_be_modified(
room_id, visibility
)
allowed_by_third_party_rules = await (
self._third_party_event_rules.check_visibility_can_be_modified(
room_id, visibility
)
)
if not allowed_by_third_party_rules:
@ -1754,7 +1753,7 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
)
events = list(room_events)
events.extend(e for evs, _ in room_to_events.values() for e in evs)
events.extend(e for evs, _, _ in room_to_events.values() for e in evs)
# We know stream_ordering must be not None here, as its been
# persisted, but mypy doesn't know that

View file

@ -1316,11 +1316,11 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
# If this is going to be a local join, additional information must
# be included in the event content in order to efficiently validate
# the event.
content[EventContentFields.AUTHORISING_USER] = (
await self.event_auth_handler.get_user_which_could_invite(
room_id,
state_before_join,
)
content[
EventContentFields.AUTHORISING_USER
] = await self.event_auth_handler.get_user_which_could_invite(
room_id,
state_before_join,
)
return False, []
@ -1429,9 +1429,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
if requester is not None:
sender = UserID.from_string(event.sender)
assert (
sender == requester.user
), "Sender (%s) must be same as requester (%s)" % (sender, requester.user)
assert sender == requester.user, (
"Sender (%s) must be same as requester (%s)" % (sender, requester.user)
)
assert self.hs.is_mine(sender), "Sender must be our own: %s" % (sender,)
else:
requester = types.create_requester(target_user)

View file

@ -423,9 +423,9 @@ class SearchHandler:
}
if search_result.room_groups and "room_id" in group_keys:
rooms_cat_res.setdefault("groups", {})[
"room_id"
] = search_result.room_groups
rooms_cat_res.setdefault("groups", {})["room_id"] = (
search_result.room_groups
)
if sender_group and "sender" in group_keys:
rooms_cat_res.setdefault("groups", {})["sender"] = sender_group

View file

@ -25,8 +25,8 @@ from synapse.events.utils import strip_event
from synapse.handlers.relations import BundledAggregations
from synapse.handlers.sliding_sync.extensions import SlidingSyncExtensionHandler
from synapse.handlers.sliding_sync.room_lists import (
RoomsForUserType,
SlidingSyncRoomLists,
_RoomMembershipForUser,
)
from synapse.handlers.sliding_sync.store import SlidingSyncConnectionStore
from synapse.logging.opentracing import (
@ -39,12 +39,14 @@ from synapse.logging.opentracing import (
)
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
from synapse.storage.databases.main.stream import PaginateFunction
from synapse.storage.roommember import MemberSummary
from synapse.storage.roommember import (
MemberSummary,
)
from synapse.types import (
JsonDict,
MutableStateMap,
PersistedEventPosition,
Requester,
RoomStreamToken,
SlidingSyncStreamToken,
StateMap,
StreamKeyType,
@ -255,6 +257,8 @@ class SlidingSyncHandler:
],
from_token=from_token,
to_token=to_token,
newly_joined=room_id in interested_rooms.newly_joined_rooms,
is_dm=room_id in interested_rooms.dm_room_ids,
)
# Filter out empty room results during incremental sync
@ -352,7 +356,7 @@ class SlidingSyncHandler:
async def get_current_state_ids_at(
self,
room_id: str,
room_membership_for_user_at_to_token: _RoomMembershipForUser,
room_membership_for_user_at_to_token: RoomsForUserType,
state_filter: StateFilter,
to_token: StreamToken,
) -> StateMap[str]:
@ -417,7 +421,7 @@ class SlidingSyncHandler:
async def get_current_state_at(
self,
room_id: str,
room_membership_for_user_at_to_token: _RoomMembershipForUser,
room_membership_for_user_at_to_token: RoomsForUserType,
state_filter: StateFilter,
to_token: StreamToken,
) -> StateMap[EventBase]:
@ -449,6 +453,7 @@ class SlidingSyncHandler:
return state_map
@trace
async def get_room_sync_data(
self,
sync_config: SlidingSyncConfig,
@ -456,9 +461,11 @@ class SlidingSyncHandler:
new_connection_state: "MutablePerConnectionState",
room_id: str,
room_sync_config: RoomSyncConfig,
room_membership_for_user_at_to_token: _RoomMembershipForUser,
room_membership_for_user_at_to_token: RoomsForUserType,
from_token: Optional[SlidingSyncStreamToken],
to_token: StreamToken,
newly_joined: bool,
is_dm: bool,
) -> SlidingSyncResult.RoomResult:
"""
Fetch room data for the sync response.
@ -474,6 +481,8 @@ class SlidingSyncHandler:
in the room at the time of `to_token`.
from_token: The point in the stream to sync from.
to_token: The point in the stream to sync up to.
newly_joined: If the user has newly joined the room
is_dm: Whether the room is a DM room
"""
user = sync_config.user
@ -518,7 +527,7 @@ class SlidingSyncHandler:
from_bound = None
initial = True
ignore_timeline_bound = False
if from_token and not room_membership_for_user_at_to_token.newly_joined:
if from_token and not newly_joined:
room_status = previous_connection_state.rooms.have_sent_room(room_id)
if room_status.status == HaveSentRoomFlag.LIVE:
from_bound = from_token.stream_token.room_key
@ -587,9 +596,7 @@ class SlidingSyncHandler:
Membership.LEAVE,
Membership.BAN,
):
to_bound = (
room_membership_for_user_at_to_token.event_pos.to_room_stream_token()
)
to_bound = room_membership_for_user_at_to_token.event_pos.to_room_stream_token()
timeline_from_bound = from_bound
if ignore_timeline_bound:
@ -624,7 +631,7 @@ class SlidingSyncHandler:
# Use `stream_ordering` for updates
else paginate_room_events_by_stream_ordering
)
timeline_events, new_room_key = await pagination_method(
timeline_events, new_room_key, limited = await pagination_method(
room_id=room_id,
# The bounds are reversed so we can paginate backwards
# (from newer to older events) starting at to_bound.
@ -632,28 +639,13 @@ class SlidingSyncHandler:
from_key=to_bound,
to_key=timeline_from_bound,
direction=Direction.BACKWARDS,
# We add one so we can determine if there are enough events to saturate
# the limit or not (see `limited`)
limit=room_sync_config.timeline_limit + 1,
limit=room_sync_config.timeline_limit,
)
# We want to return the events in ascending order (the last event is the
# most recent).
timeline_events.reverse()
# Determine our `limited` status based on the timeline. We do this before
# filtering the events so we can accurately determine if there is more to
# paginate even if we filter out some/all events.
if len(timeline_events) > room_sync_config.timeline_limit:
limited = True
# Get rid of that extra "+ 1" event because we only used it to determine
# if we hit the limit or not
timeline_events = timeline_events[-room_sync_config.timeline_limit :]
assert timeline_events[0].internal_metadata.stream_ordering
new_room_key = RoomStreamToken(
stream=timeline_events[0].internal_metadata.stream_ordering - 1
)
# Make sure we don't expose any events that the client shouldn't see
timeline_events = await filter_events_for_client(
self.storage_controllers,
@ -746,26 +738,78 @@ class SlidingSyncHandler:
# indicate to the client that a state reset happened. Perhaps we should indicate
# this by setting `initial: True` and empty `required_state`.
# Check whether the room has a name set
name_state_ids = await self.get_current_state_ids_at(
room_id=room_id,
room_membership_for_user_at_to_token=room_membership_for_user_at_to_token,
state_filter=StateFilter.from_types([(EventTypes.Name, "")]),
to_token=to_token,
)
name_event_id = name_state_ids.get((EventTypes.Name, ""))
room_membership_summary: Mapping[str, MemberSummary]
empty_membership_summary = MemberSummary([], 0)
if room_membership_for_user_at_to_token.membership in (
Membership.LEAVE,
Membership.BAN,
):
# TODO: Figure out how to get the membership summary for left/banned rooms
room_membership_summary = {}
# Get the changes to current state in the token range from the
# `current_state_delta_stream` table.
#
# For incremental syncs, we can do this first to determine if something relevant
# has changed and strategically avoid fetching other costly things.
room_state_delta_id_map: MutableStateMap[str] = {}
name_event_id: Optional[str] = None
membership_changed = False
name_changed = False
avatar_changed = False
if initial:
# Check whether the room has a name set
name_state_ids = await self.get_current_state_ids_at(
room_id=room_id,
room_membership_for_user_at_to_token=room_membership_for_user_at_to_token,
state_filter=StateFilter.from_types([(EventTypes.Name, "")]),
to_token=to_token,
)
name_event_id = name_state_ids.get((EventTypes.Name, ""))
else:
room_membership_summary = await self.store.get_room_summary(room_id)
# TODO: Reverse/rewind back to the `to_token`
assert from_bound is not None
# TODO: Limit the number of state events we're about to send down
# the room, if its too many we should change this to an
# `initial=True`?
deltas = await self.store.get_current_state_deltas_for_room(
room_id=room_id,
from_token=from_bound,
to_token=to_token.room_key,
)
for delta in deltas:
# TODO: Handle state resets where event_id is None
if delta.event_id is not None:
room_state_delta_id_map[(delta.event_type, delta.state_key)] = (
delta.event_id
)
if delta.event_type == EventTypes.Member:
membership_changed = True
elif delta.event_type == EventTypes.Name and delta.state_key == "":
name_changed = True
elif (
delta.event_type == EventTypes.RoomAvatar and delta.state_key == ""
):
avatar_changed = True
room_membership_summary: Optional[Mapping[str, MemberSummary]] = None
empty_membership_summary = MemberSummary([], 0)
# We need the room summary for:
# - Always for initial syncs (or the first time we send down the room)
# - When the room has no name, we need `heroes`
# - When the membership has changed so we need to give updated `heroes` and
# `joined_count`/`invited_count`.
#
# Ideally, instead of just looking at `name_changed`, we'd check if the room
# name is not set but this is a good enough approximation that saves us from
# having to pull out the full event. This just means, we're generating the
# summary whenever the room name changes instead of only when it changes to
# `None`.
if initial or name_changed or membership_changed:
# We can't trace the function directly because it's cached and the `@cached`
# decorator doesn't mix with `@trace` yet.
with start_active_span("get_room_summary"):
if room_membership_for_user_at_to_token.membership in (
Membership.LEAVE,
Membership.BAN,
):
# TODO: Figure out how to get the membership summary for left/banned rooms
room_membership_summary = {}
else:
room_membership_summary = await self.store.get_room_summary(room_id)
# TODO: Reverse/rewind back to the `to_token`
# `heroes` are required if the room name is not set.
#
@ -779,7 +823,12 @@ class SlidingSyncHandler:
# TODO: Should we also check for `EventTypes.CanonicalAlias`
# (`m.room.canonical_alias`) as a fallback for the room name? see
# https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1671260153
if name_event_id is None:
#
# We need to fetch the `heroes` if the room name is not set. But we only need to
# get them on initial syncs (or the first time we send down the room) or if the
# membership has changed which may change the heroes.
if name_event_id is None and (initial or (not initial and membership_changed)):
assert room_membership_summary is not None
hero_user_ids = extract_heroes_from_room_summary(
room_membership_summary, me=user.to_string()
)
@ -841,13 +890,13 @@ class SlidingSyncHandler:
required_state_filter = StateFilter.all()
else:
required_state_types: List[Tuple[str, Optional[str]]] = []
num_wild_state_keys = 0
lazy_load_room_members = False
num_others = 0
for (
state_type,
state_key_set,
) in room_sync_config.required_state_map.items():
num_wild_state_keys = 0
lazy_load_room_members = False
num_others = 0
for state_key in state_key_set:
if state_key == StateValues.WILDCARD:
num_wild_state_keys += 1
@ -879,27 +928,33 @@ class SlidingSyncHandler:
num_others += 1
required_state_types.append((state_type, state_key))
set_tag(
SynapseTags.FUNC_ARG_PREFIX
+ "required_state_wildcard_state_key_count",
num_wild_state_keys,
)
set_tag(
SynapseTags.FUNC_ARG_PREFIX + "required_state_lazy",
lazy_load_room_members,
)
set_tag(
SynapseTags.FUNC_ARG_PREFIX + "required_state_other_count",
num_others,
)
set_tag(
SynapseTags.FUNC_ARG_PREFIX
+ "required_state_wildcard_state_key_count",
num_wild_state_keys,
)
set_tag(
SynapseTags.FUNC_ARG_PREFIX + "required_state_lazy",
lazy_load_room_members,
)
set_tag(
SynapseTags.FUNC_ARG_PREFIX + "required_state_other_count",
num_others,
)
required_state_filter = StateFilter.from_types(required_state_types)
# We need this base set of info for the response so let's just fetch it along
# with the `required_state` for the room
meta_room_state = [(EventTypes.Name, ""), (EventTypes.RoomAvatar, "")] + [
hero_room_state = [
(EventTypes.Member, hero_user_id) for hero_user_id in hero_user_ids
]
meta_room_state = list(hero_room_state)
if initial or name_changed:
meta_room_state.append((EventTypes.Name, ""))
if initial or avatar_changed:
meta_room_state.append((EventTypes.RoomAvatar, ""))
state_filter = StateFilter.all()
if required_state_filter != StateFilter.all():
state_filter = StateFilter(
@ -922,21 +977,22 @@ class SlidingSyncHandler:
else:
assert from_bound is not None
# TODO: Limit the number of state events we're about to send down
# the room, if its too many we should change this to an
# `initial=True`?
deltas = await self.store.get_current_state_deltas_for_room(
room_id=room_id,
from_token=from_bound,
to_token=to_token.room_key,
)
# TODO: Filter room state before fetching events
# TODO: Handle state resets where event_id is None
events = await self.store.get_events(
[d.event_id for d in deltas if d.event_id]
state_filter.filter_state(room_state_delta_id_map).values()
)
room_state = {(s.type, s.state_key): s for s in events.values()}
# If the membership changed and we have to get heroes, get the remaining
# heroes from the state
if hero_user_ids:
hero_membership_state = await self.get_current_state_at(
room_id=room_id,
room_membership_for_user_at_to_token=room_membership_for_user_at_to_token,
state_filter=StateFilter.from_types(hero_room_state),
to_token=to_token,
)
room_state.update(hero_membership_state)
required_room_state: StateMap[EventBase] = {}
if required_state_filter != StateFilter.none():
required_room_state = required_state_filter.filter_state(room_state)
@ -969,26 +1025,19 @@ class SlidingSyncHandler:
)
# Figure out the last bump event in the room
last_bump_event_result = (
await self.store.get_last_event_pos_in_room_before_stream_ordering(
room_id,
to_token.room_key,
event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES,
)
)
# By default, just choose the membership event position
#
# By default, just choose the membership event position for any non-join membership
bump_stamp = room_membership_for_user_at_to_token.event_pos.stream
# But if we found a bump event, use that instead
if last_bump_event_result is not None:
_, new_bump_event_pos = last_bump_event_result
# If we've just joined a remote room, then the last bump event may
# have been backfilled (and so have a negative stream ordering).
# These negative stream orderings can't sensibly be compared, so
# instead we use the membership event position.
if new_bump_event_pos.stream > 0:
bump_stamp = new_bump_event_pos.stream
# If we're joined to the room, we need to find the last bump event before the
# `to_token`
if room_membership_for_user_at_to_token.membership == Membership.JOIN:
# Try and get a bump stamp, if not we just fall back to the
# membership token.
new_bump_stamp = await self._get_bump_stamp(
room_id, to_token, timeline_events
)
if new_bump_stamp is not None:
bump_stamp = new_bump_stamp
unstable_expanded_timeline = False
prev_room_sync_config = previous_connection_state.room_configs.get(room_id)
@ -1041,11 +1090,25 @@ class SlidingSyncHandler:
set_tag(SynapseTags.RESULT_PREFIX + "initial", initial)
joined_count: Optional[int] = None
if initial or membership_changed:
assert room_membership_summary is not None
joined_count = room_membership_summary.get(
Membership.JOIN, empty_membership_summary
).count
invited_count: Optional[int] = None
if initial or membership_changed:
assert room_membership_summary is not None
invited_count = room_membership_summary.get(
Membership.INVITE, empty_membership_summary
).count
return SlidingSyncResult.RoomResult(
name=room_name,
avatar=room_avatar,
heroes=heroes,
is_dm=room_membership_for_user_at_to_token.is_dm,
is_dm=is_dm,
initial=initial,
required_state=list(required_room_state.values()),
timeline_events=timeline_events,
@ -1056,15 +1119,100 @@ class SlidingSyncHandler:
unstable_expanded_timeline=unstable_expanded_timeline,
num_live=num_live,
bump_stamp=bump_stamp,
joined_count=room_membership_summary.get(
Membership.JOIN, empty_membership_summary
).count,
invited_count=room_membership_summary.get(
Membership.INVITE, empty_membership_summary
).count,
joined_count=joined_count,
invited_count=invited_count,
# TODO: These are just dummy values. We could potentially just remove these
# since notifications can only really be done correctly on the client anyway
# (encrypted rooms).
notification_count=0,
highlight_count=0,
)
@trace
async def _get_bump_stamp(
self, room_id: str, to_token: StreamToken, timeline: List[EventBase]
) -> Optional[int]:
"""Get a bump stamp for the room, if we have a bump event
Args:
room_id
to_token: The upper bound of token to return
timeline: The list of events we have fetched.
"""
# First check the timeline events we're returning to see if one of
# those matches. We iterate backwards and take the stream ordering
# of the first event that matches the bump event types.
for timeline_event in reversed(timeline):
if timeline_event.type in SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES:
new_bump_stamp = timeline_event.internal_metadata.stream_ordering
# All persisted events have a stream ordering
assert new_bump_stamp is not None
# If we've just joined a remote room, then the last bump event may
# have been backfilled (and so have a negative stream ordering).
# These negative stream orderings can't sensibly be compared, so
# instead we use the membership event position.
if new_bump_stamp > 0:
return new_bump_stamp
# We can quickly query for the latest bump event in the room using the
# sliding sync tables.
latest_room_bump_stamp = await self.store.get_latest_bump_stamp_for_room(
room_id
)
min_to_token_position = to_token.room_key.stream
# If we can rely on the new sliding sync tables and the `bump_stamp` is
# `None`, just fallback to the membership event position. This can happen
# when we've just joined a remote room and all the events are backfilled.
if (
# FIXME: The background job check can be removed once we bump
# `SCHEMA_COMPAT_VERSION` and run the foreground update for
# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots`
# (tracked by https://github.com/element-hq/synapse/issues/17623)
await self.store.have_finished_sliding_sync_background_jobs()
and latest_room_bump_stamp is None
):
return None
# The `bump_stamp` stored in the database might be ahead of our token. Since
# `bump_stamp` is only a `stream_ordering` position, we can't be 100% sure
# that's before the `to_token` in all scenarios. The only scenario we can be
# sure of is if the `bump_stamp` is totally before the minimum position from
# the token.
#
# We don't need to check if the background update has finished, as if the
# returned bump stamp is not None then it must be up to date.
elif (
latest_room_bump_stamp is not None
and latest_room_bump_stamp < min_to_token_position
):
if latest_room_bump_stamp > 0:
return latest_room_bump_stamp
else:
return None
# Otherwise, if it's within or after the `to_token`, we need to find the
# last bump event before the `to_token`.
else:
last_bump_event_result = (
await self.store.get_last_event_pos_in_room_before_stream_ordering(
room_id,
to_token.room_key,
event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES,
)
)
if last_bump_event_result is not None:
_, new_bump_event_pos = last_bump_event_result
# If we've just joined a remote room, then the last bump event may
# have been backfilled (and so have a negative stream ordering).
# These negative stream orderings can't sensibly be compared, so
# instead we use the membership event position.
if new_bump_event_pos.stream > 0:
return new_bump_event_pos.stream
return None

View file

@ -386,9 +386,9 @@ class SlidingSyncExtensionHandler:
if have_push_rules_changed:
global_account_data_map = dict(global_account_data_map)
# TODO: This should take into account the `from_token` and `to_token`
global_account_data_map[AccountDataTypes.PUSH_RULES] = (
await self.push_rules_handler.push_rules_for_user(sync_config.user)
)
global_account_data_map[
AccountDataTypes.PUSH_RULES
] = await self.push_rules_handler.push_rules_for_user(sync_config.user)
else:
# TODO: This should take into account the `to_token`
all_global_account_data = await self.store.get_global_account_data_for_user(
@ -397,9 +397,9 @@ class SlidingSyncExtensionHandler:
global_account_data_map = dict(all_global_account_data)
# TODO: This should take into account the `to_token`
global_account_data_map[AccountDataTypes.PUSH_RULES] = (
await self.push_rules_handler.push_rules_for_user(sync_config.user)
)
global_account_data_map[
AccountDataTypes.PUSH_RULES
] = await self.push_rules_handler.push_rules_for_user(sync_config.user)
# Fetch room account data
account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]] = {}

View file

@ -19,7 +19,6 @@ from itertools import chain
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Dict,
List,
Literal,
@ -48,7 +47,11 @@ from synapse.storage.databases.main.state import (
Sentinel as StateSentinel,
)
from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
from synapse.storage.roommember import RoomsForUser, RoomsForUserSlidingSync
from synapse.storage.roommember import (
RoomsForUser,
RoomsForUserSlidingSync,
RoomsForUserStateReset,
)
from synapse.types import (
MutableStateMap,
PersistedEventPosition,
@ -76,6 +79,11 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
# Helper definition for the types that we might return. We do this to avoid
# copying data between types (which can be expensive for many rooms).
RoomsForUserType = Union[RoomsForUserStateReset, RoomsForUser, RoomsForUserSlidingSync]
@attr.s(auto_attribs=True, slots=True, frozen=True)
class SlidingSyncInterestedRooms:
"""The set of rooms and metadata a client is interested in based on their
@ -91,13 +99,22 @@ class SlidingSyncInterestedRooms:
includes the rooms that *may* have relevant updates. Rooms not
in this map will definitely not have room updates (though
extensions may have updates in these rooms).
newly_joined_rooms: The set of rooms that were joined in the token range
and the user is still joined to at the end of this range.
newly_left_rooms: The set of rooms that we left in the token range
and are still "leave" at the end of this range.
dm_room_ids: The set of rooms the user consider as direct-message (DM) rooms
"""
lists: Mapping[str, SlidingSyncResult.SlidingWindowList]
relevant_room_map: Mapping[str, RoomSyncConfig]
relevant_rooms_to_send_map: Mapping[str, RoomSyncConfig]
all_rooms: Set[str]
room_membership_for_user_map: Mapping[str, "_RoomMembershipForUser"]
room_membership_for_user_map: Mapping[str, RoomsForUserType]
newly_joined_rooms: AbstractSet[str]
newly_left_rooms: AbstractSet[str]
dm_room_ids: AbstractSet[str]
class Sentinel(enum.Enum):
@ -106,47 +123,10 @@ class Sentinel(enum.Enum):
UNSET_SENTINEL = object()
@attr.s(slots=True, frozen=True, auto_attribs=True)
class _RoomMembershipForUser:
"""
Attributes:
room_id: The room ID of the membership event
event_id: The event ID of the membership event
event_pos: The stream position of the membership event
membership: The membership state of the user in the room
sender: The person who sent the membership event
newly_joined: Whether the user newly joined the room during the given token
range and is still joined to the room at the end of this range.
newly_left: Whether the user newly left (or kicked) the room during the given
token range and is still "leave" at the end of this range.
is_dm: Whether this user considers this room as a direct-message (DM) room
"""
room_id: str
# Optional because state resets can affect room membership without a corresponding event.
event_id: Optional[str]
# Even during a state reset which removes the user from the room, we expect this to
# be set because `current_state_delta_stream` will note the position that the reset
# happened.
event_pos: PersistedEventPosition
# Even during a state reset which removes the user from the room, we expect this to
# be set to `LEAVE` because we can make that assumption based on the situaton (see
# `get_current_state_delta_membership_changes_for_user(...)`)
membership: str
# Optional because state resets can affect room membership without a corresponding event.
sender: Optional[str]
newly_joined: bool
newly_left: bool
is_dm: bool
def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser":
return attr.evolve(self, **kwds)
def filter_membership_for_sync(
*,
user_id: str,
room_membership_for_user: Union[_RoomMembershipForUser, RoomsForUserSlidingSync],
room_membership_for_user: RoomsForUserType,
newly_left: bool,
) -> bool:
"""
@ -293,10 +273,11 @@ class SlidingSyncRoomLists:
is_encrypted=is_encrypted,
)
newly_joined_room_ids, newly_left_room_map = (
await self._get_newly_joined_and_left_rooms(
user_id, from_token=from_token, to_token=to_token
)
(
newly_joined_room_ids,
newly_left_room_map,
) = await self._get_newly_joined_and_left_rooms(
user_id, from_token=from_token, to_token=to_token
)
dm_room_ids = await self._get_dm_rooms_for_user(user_id)
@ -352,11 +333,7 @@ class SlidingSyncRoomLists:
# Find which rooms are partially stated and may need to be filtered out
# depending on the `required_state` requested (see below).
partial_state_room_map = (
await self.store.is_partial_state_room_batched(
filtered_sync_room_map.keys()
)
)
partial_state_rooms = await self.store.get_partial_rooms()
# Since creating the `RoomSyncConfig` takes some work, let's just do it
# once.
@ -368,18 +345,23 @@ class SlidingSyncRoomLists:
filtered_sync_room_map = {
room_id: room
for room_id, room in filtered_sync_room_map.items()
if not partial_state_room_map.get(room_id)
if room_id not in partial_state_rooms
}
all_rooms.update(filtered_sync_room_map)
# Sort the list
sorted_room_info = await self.sort_rooms_using_tables(
filtered_sync_room_map, to_token
)
ops: List[SlidingSyncResult.SlidingWindowList.Operation] = []
if list_config.ranges:
if list_config.ranges == [(0, len(filtered_sync_room_map) - 1)]:
# If we are asking for the full range, we don't need to sort the list.
sorted_room_info = list(filtered_sync_room_map.values())
else:
# Sort the list
sorted_room_info = await self.sort_rooms_using_tables(
filtered_sync_room_map, to_token
)
for range in list_config.ranges:
room_ids_in_list: List[str] = []
@ -428,9 +410,7 @@ class SlidingSyncRoomLists:
with start_active_span("assemble_room_subscriptions"):
# Find which rooms are partially stated and may need to be filtered out
# depending on the `required_state` requested (see below).
partial_state_room_map = await self.store.is_partial_state_room_batched(
sync_config.room_subscriptions.keys()
)
partial_state_rooms = await self.store.get_partial_rooms()
for (
room_id,
@ -450,7 +430,7 @@ class SlidingSyncRoomLists:
# Exclude partially-stated rooms if we must wait for the room to be
# fully-stated
if room_sync_config.must_await_full_state(self.is_mine_id):
if partial_state_room_map.get(room_id):
if room_id in partial_state_rooms:
continue
all_rooms.add(room_id)
@ -478,22 +458,10 @@ class SlidingSyncRoomLists:
relevant_room_map=relevant_room_map,
relevant_rooms_to_send_map=relevant_rooms_to_send_map,
all_rooms=all_rooms,
room_membership_for_user_map={
# FIXME: Ideally we wouldn't have to create a new
# `_RoomMembershipForUser` here and instead just return
# `newly_joined_room_ids` directly, to save CPU time.
room_id: _RoomMembershipForUser(
room_id=room_id,
event_id=membership_info.event_id,
event_pos=membership_info.event_pos,
sender=membership_info.sender,
membership=membership_info.membership,
newly_joined=room_id in newly_joined_room_ids,
newly_left=room_id in newly_left_room_map,
is_dm=room_id in dm_room_ids,
)
for room_id, membership_info in room_membership_for_user_map.items()
},
room_membership_for_user_map=room_membership_for_user_map,
newly_joined_rooms=newly_joined_room_ids,
newly_left_rooms=set(newly_left_room_map),
dm_room_ids=dm_room_ids,
)
async def _compute_interested_rooms_fallback(
@ -505,12 +473,16 @@ class SlidingSyncRoomLists:
) -> SlidingSyncInterestedRooms:
"""Fallback code when the database background updates haven't completed yet."""
room_membership_for_user_map = (
await self.get_room_membership_for_user_at_to_token(
sync_config.user, to_token, from_token
)
(
room_membership_for_user_map,
newly_joined_room_ids,
newly_left_room_ids,
) = await self.get_room_membership_for_user_at_to_token(
sync_config.user, to_token, from_token
)
dm_room_ids = await self._get_dm_rooms_for_user(sync_config.user.to_string())
# Assemble sliding window lists
lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
# Keep track of the rooms that we can display and need to fetch more info about
@ -524,6 +496,7 @@ class SlidingSyncRoomLists:
sync_room_map = await self.filter_rooms_relevant_for_sync(
user=sync_config.user,
room_membership_for_user_map=room_membership_for_user_map,
newly_left_room_ids=newly_left_room_ids,
)
for list_key, list_config in sync_config.lists.items():
@ -535,15 +508,12 @@ class SlidingSyncRoomLists:
sync_room_map,
list_config.filters,
to_token,
dm_room_ids,
)
# Find which rooms are partially stated and may need to be filtered out
# depending on the `required_state` requested (see below).
partial_state_room_map = (
await self.store.is_partial_state_room_batched(
filtered_sync_room_map.keys()
)
)
partial_state_rooms = await self.store.get_partial_rooms()
# Since creating the `RoomSyncConfig` takes some work, let's just do it
# once.
@ -555,7 +525,7 @@ class SlidingSyncRoomLists:
filtered_sync_room_map = {
room_id: room
for room_id, room in filtered_sync_room_map.items()
if not partial_state_room_map.get(room_id)
if room_id not in partial_state_rooms
}
all_rooms.update(filtered_sync_room_map)
@ -615,9 +585,7 @@ class SlidingSyncRoomLists:
with start_active_span("assemble_room_subscriptions"):
# Find which rooms are partially stated and may need to be filtered out
# depending on the `required_state` requested (see below).
partial_state_room_map = await self.store.is_partial_state_room_batched(
sync_config.room_subscriptions.keys()
)
partial_state_rooms = await self.store.get_partial_rooms()
for (
room_id,
@ -649,7 +617,7 @@ class SlidingSyncRoomLists:
# Exclude partially-stated rooms if we must wait for the room to be
# fully-stated
if room_sync_config.must_await_full_state(self.is_mine_id):
if partial_state_room_map.get(room_id):
if room_id in partial_state_rooms:
continue
all_rooms.add(room_id)
@ -678,6 +646,9 @@ class SlidingSyncRoomLists:
relevant_rooms_to_send_map=relevant_rooms_to_send_map,
all_rooms=all_rooms,
room_membership_for_user_map=room_membership_for_user_map,
newly_joined_rooms=newly_joined_room_ids,
newly_left_rooms=newly_left_room_ids,
dm_room_ids=dm_room_ids,
)
async def _filter_relevant_room_to_send(
@ -754,7 +725,7 @@ class SlidingSyncRoomLists:
async def _get_rewind_changes_to_current_membership_to_token(
self,
user: UserID,
rooms_for_user: Mapping[str, Union[RoomsForUser, RoomsForUserSlidingSync]],
rooms_for_user: Mapping[str, RoomsForUserType],
to_token: StreamToken,
) -> Mapping[str, Optional[RoomsForUser]]:
"""
@ -906,7 +877,7 @@ class SlidingSyncRoomLists:
user: UserID,
to_token: StreamToken,
from_token: Optional[StreamToken],
) -> Dict[str, _RoomMembershipForUser]:
) -> Tuple[Dict[str, RoomsForUserType], AbstractSet[str], AbstractSet[str]]:
"""
Fetch room IDs that the user has had membership in (the full room list including
long-lost left rooms that will be filtered, sorted, and sliced).
@ -925,8 +896,11 @@ class SlidingSyncRoomLists:
from_token: The point in the stream to sync from.
Returns:
A dictionary of room IDs that the user has had membership in along with
membership information in that room at the time of `to_token`.
A 3-tuple of:
- A dictionary of room IDs that the user has had membership in along with
membership information in that room at the time of `to_token`.
- Set of newly joined rooms
- Set of newly left rooms
"""
user_id = user.to_string()
@ -943,12 +917,14 @@ class SlidingSyncRoomLists:
# If the user has never joined any rooms before, we can just return an empty list
if not room_for_user_list:
return {}
return {}, set(), set()
# Since we fetched the users room list at some point in time after the
# tokens, we need to revert/rewind some membership changes to match the point in
# time of the `to_token`.
rooms_for_user = {room.room_id: room for room in room_for_user_list}
rooms_for_user: Dict[str, RoomsForUserType] = {
room.room_id: room for room in room_for_user_list
}
changes = await self._get_rewind_changes_to_current_membership_to_token(
user, rooms_for_user, to_token
)
@ -958,48 +934,30 @@ class SlidingSyncRoomLists:
else:
rooms_for_user[room_id] = change_room_for_user
newly_joined_room_ids, newly_left_room_ids = (
await self._get_newly_joined_and_left_rooms(
user_id, to_token=to_token, from_token=from_token
)
(
newly_joined_room_ids,
newly_left_room_ids,
) = await self._get_newly_joined_and_left_rooms(
user_id, to_token=to_token, from_token=from_token
)
dm_room_ids = await self._get_dm_rooms_for_user(user_id)
# Our working list of rooms that can show up in the sync response
sync_room_id_set = {
room_for_user.room_id: _RoomMembershipForUser(
room_id=room_for_user.room_id,
event_id=room_for_user.event_id,
event_pos=room_for_user.event_pos,
membership=room_for_user.membership,
sender=room_for_user.sender,
newly_joined=room_id in newly_joined_room_ids,
newly_left=room_id in newly_left_room_ids,
is_dm=room_id in dm_room_ids,
)
for room_id, room_for_user in rooms_for_user.items()
}
# Ensure we have entries for rooms that the user has been "state reset"
# out of. These are rooms appear in the `newly_left_rooms` map but
# aren't in the `rooms_for_user` map.
for room_id, left_event_pos in newly_left_room_ids.items():
if room_id in sync_room_id_set:
if room_id in rooms_for_user:
continue
sync_room_id_set[room_id] = _RoomMembershipForUser(
rooms_for_user[room_id] = RoomsForUserStateReset(
room_id=room_id,
event_id=None,
event_pos=left_event_pos,
membership=Membership.LEAVE,
sender=None,
newly_joined=False,
newly_left=True,
is_dm=room_id in dm_room_ids,
room_version_id=await self.store.get_room_version_id(room_id),
)
return sync_room_id_set
return rooms_for_user, newly_joined_room_ids, set(newly_left_room_ids)
@trace
async def _get_newly_joined_and_left_rooms(
@ -1007,7 +965,7 @@ class SlidingSyncRoomLists:
user_id: str,
to_token: StreamToken,
from_token: Optional[StreamToken],
) -> Tuple[StrCollection, Mapping[str, PersistedEventPosition]]:
) -> Tuple[AbstractSet[str], Mapping[str, PersistedEventPosition]]:
"""Fetch the sets of rooms that the user newly joined or left in the
given token range.
@ -1160,8 +1118,9 @@ class SlidingSyncRoomLists:
async def filter_rooms_relevant_for_sync(
self,
user: UserID,
room_membership_for_user_map: Dict[str, _RoomMembershipForUser],
) -> Dict[str, _RoomMembershipForUser]:
room_membership_for_user_map: Dict[str, RoomsForUserType],
newly_left_room_ids: AbstractSet[str],
) -> Dict[str, RoomsForUserType]:
"""
Filter room IDs that should/can be listed for this user in the sync response (the
full room list that will be further filtered, sorted, and sliced).
@ -1182,6 +1141,7 @@ class SlidingSyncRoomLists:
Args:
user: User that is syncing
room_membership_for_user_map: Room membership for the user
newly_left_room_ids: The set of room IDs we have newly left
Returns:
A dictionary of room IDs that should be listed in the sync response along
@ -1196,7 +1156,7 @@ class SlidingSyncRoomLists:
if filter_membership_for_sync(
user_id=user_id,
room_membership_for_user=room_membership_for_user,
newly_left=room_membership_for_user.newly_left,
newly_left=room_id in newly_left_room_ids,
)
}
@ -1205,9 +1165,9 @@ class SlidingSyncRoomLists:
async def check_room_subscription_allowed_for_user(
self,
room_id: str,
room_membership_for_user_map: Dict[str, _RoomMembershipForUser],
room_membership_for_user_map: Dict[str, RoomsForUserType],
to_token: StreamToken,
) -> Optional[_RoomMembershipForUser]:
) -> Optional[RoomsForUserType]:
"""
Check whether the user is allowed to see the room based on whether they have
ever had membership in the room or if the room is `world_readable`.
@ -1272,7 +1232,7 @@ class SlidingSyncRoomLists:
async def _bulk_get_stripped_state_for_rooms_from_sync_room_map(
self,
room_ids: StrCollection,
sync_room_map: Dict[str, _RoomMembershipForUser],
sync_room_map: Dict[str, RoomsForUserType],
) -> Dict[str, Optional[StateMap[StrippedStateEvent]]]:
"""
Fetch stripped state for a list of room IDs. Stripped state is only
@ -1369,7 +1329,7 @@ class SlidingSyncRoomLists:
"room_encryption",
],
room_ids: Set[str],
sync_room_map: Dict[str, _RoomMembershipForUser],
sync_room_map: Dict[str, RoomsForUserType],
to_token: StreamToken,
room_id_to_stripped_state_map: Dict[
str, Optional[StateMap[StrippedStateEvent]]
@ -1533,10 +1493,11 @@ class SlidingSyncRoomLists:
async def filter_rooms(
self,
user: UserID,
sync_room_map: Dict[str, _RoomMembershipForUser],
sync_room_map: Dict[str, RoomsForUserType],
filters: SlidingSyncConfig.SlidingSyncList.Filters,
to_token: StreamToken,
) -> Dict[str, _RoomMembershipForUser]:
dm_room_ids: AbstractSet[str],
) -> Dict[str, RoomsForUserType]:
"""
Filter rooms based on the sync request.
@ -1546,6 +1507,7 @@ class SlidingSyncRoomLists:
information in the room at the time of `to_token`.
filters: Filters to apply
to_token: We filter based on the state of the room at this token
dm_room_ids: Set of room IDs that are DMs for the user
Returns:
A filtered dictionary of room IDs along with membership information in the
@ -1565,14 +1527,14 @@ class SlidingSyncRoomLists:
filtered_room_id_set = {
room_id
for room_id in filtered_room_id_set
if sync_room_map[room_id].is_dm
if room_id in dm_room_ids
}
else:
# Only non-DM rooms please
filtered_room_id_set = {
room_id
for room_id in filtered_room_id_set
if not sync_room_map[room_id].is_dm
if room_id not in dm_room_ids
}
if filters.spaces is not None:
@ -1860,9 +1822,9 @@ class SlidingSyncRoomLists:
@trace
async def sort_rooms(
self,
sync_room_map: Dict[str, _RoomMembershipForUser],
sync_room_map: Dict[str, RoomsForUserType],
to_token: StreamToken,
) -> List[_RoomMembershipForUser]:
) -> List[RoomsForUserType]:
"""
Sort by `stream_ordering` of the last event that the user should see in the
room. `stream_ordering` is unique so we get a stable sort.

View file

@ -183,10 +183,7 @@ class JoinedSyncResult:
to tell if room needs to be part of the sync result.
"""
return bool(
self.timeline
or self.state
or self.ephemeral
or self.account_data
self.timeline or self.state or self.ephemeral or self.account_data
# nb the notification count does not, er, count: if there's nothing
# else in the result, we don't need to send it.
)
@ -575,10 +572,10 @@ class SyncHandler:
if timeout == 0 or since_token is None or full_state:
# we are going to return immediately, so don't bother calling
# notifier.wait_for_events.
result: Union[SyncResult, E2eeSyncResult] = (
await self.current_sync_for_user(
sync_config, sync_version, since_token, full_state=full_state
)
result: Union[
SyncResult, E2eeSyncResult
] = await self.current_sync_for_user(
sync_config, sync_version, since_token, full_state=full_state
)
else:
# Otherwise, we wait for something to happen and report it to the user.
@ -673,10 +670,10 @@ class SyncHandler:
# Go through the `/sync` v2 path
if sync_version == SyncVersion.SYNC_V2:
sync_result: Union[SyncResult, E2eeSyncResult] = (
await self.generate_sync_result(
sync_config, since_token, full_state
)
sync_result: Union[
SyncResult, E2eeSyncResult
] = await self.generate_sync_result(
sync_config, since_token, full_state
)
# Go through the MSC3575 Sliding Sync `/sync/e2ee` path
elif sync_version == SyncVersion.E2EE_SYNC:
@ -909,7 +906,7 @@ class SyncHandler:
# Use `stream_ordering` for updates
else paginate_room_events_by_stream_ordering
)
events, end_key = await pagination_method(
events, end_key, limited = await pagination_method(
room_id=room_id,
# The bounds are reversed so we can paginate backwards
# (from newer to older events) starting at to_bound.
@ -917,9 +914,7 @@ class SyncHandler:
from_key=end_key,
to_key=since_key,
direction=Direction.BACKWARDS,
# We add one so we can determine if there are enough events to saturate
# the limit or not (see `limited`)
limit=load_limit + 1,
limit=load_limit,
)
# We want to return the events in ascending order (the last event is the
# most recent).
@ -974,9 +969,6 @@ class SyncHandler:
loaded_recents.extend(recents)
recents = loaded_recents
if len(events) <= load_limit:
limited = False
break
max_repeat -= 1
if len(recents) > timeline_limit:
@ -1488,13 +1480,16 @@ class SyncHandler:
# timeline here. The caller will then dedupe any redundant
# ones.
state_ids = await self._state_storage_controller.get_state_ids_for_event(
batch.events[0].event_id,
# we only want members!
state_filter=StateFilter.from_types(
(EventTypes.Member, member) for member in members_to_fetch
),
await_full_state=False,
state_ids = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[0].event_id,
# we only want members!
state_filter=StateFilter.from_types(
(EventTypes.Member, member)
for member in members_to_fetch
),
await_full_state=False,
)
)
return state_ids
@ -2166,18 +2161,18 @@ class SyncHandler:
if push_rules_changed:
global_account_data = dict(global_account_data)
global_account_data[AccountDataTypes.PUSH_RULES] = (
await self._push_rules_handler.push_rules_for_user(sync_config.user)
)
global_account_data[
AccountDataTypes.PUSH_RULES
] = await self._push_rules_handler.push_rules_for_user(sync_config.user)
else:
all_global_account_data = await self.store.get_global_account_data_for_user(
user_id
)
global_account_data = dict(all_global_account_data)
global_account_data[AccountDataTypes.PUSH_RULES] = (
await self._push_rules_handler.push_rules_for_user(sync_config.user)
)
global_account_data[
AccountDataTypes.PUSH_RULES
] = await self._push_rules_handler.push_rules_for_user(sync_config.user)
account_data_for_user = (
await sync_config.filter_collection.filter_global_account_data(
@ -2608,7 +2603,7 @@ class SyncHandler:
newly_joined = room_id in newly_joined_rooms
if room_entry:
events, start_key = room_entry
events, start_key, _ = room_entry
# We want to return the events in ascending order (the last event is the
# most recent).
events.reverse()

View file

@ -183,7 +183,7 @@ class WorkerLocksHandler:
return
def _wake_all_locks(
locks: Collection[Union[WaitingLock, WaitingMultiLock]]
locks: Collection[Union[WaitingLock, WaitingMultiLock]],
) -> None:
for lock in locks:
deferred = lock.deferred

View file

@ -1313,6 +1313,5 @@ def is_unknown_endpoint(
)
) or (
# Older Synapses returned a 400 error.
e.code == 400
and synapse_error.errcode == Codes.UNRECOGNIZED
e.code == 400 and synapse_error.errcode == Codes.UNRECOGNIZED
)

View file

@ -233,7 +233,7 @@ def return_html_error(
def wrap_async_request_handler(
h: Callable[["_AsyncResource", "SynapseRequest"], Awaitable[None]]
h: Callable[["_AsyncResource", "SynapseRequest"], Awaitable[None]],
) -> Callable[["_AsyncResource", "SynapseRequest"], "defer.Deferred[None]"]:
"""Wraps an async request handler so that it calls request.processing.

View file

@ -22,6 +22,7 @@
"""
Log formatters that output terse JSON.
"""
import json
import logging

View file

@ -20,7 +20,7 @@
#
#
""" Thread-local-alike tracking of log contexts within synapse
"""Thread-local-alike tracking of log contexts within synapse
This module provides objects and utilities for tracking contexts through
synapse code, so that log lines can include a request identifier, and so that
@ -29,6 +29,7 @@ them.
See doc/log_contexts.rst for details on how this works.
"""
import logging
import threading
import typing
@ -751,7 +752,7 @@ def preserve_fn(
f: Union[
Callable[P, R],
Callable[P, Awaitable[R]],
]
],
) -> Callable[P, "defer.Deferred[R]"]:
"""Function decorator which wraps the function with run_in_background"""

View file

@ -169,6 +169,7 @@ Gotchas
than one caller? Will all of those calling functions have be in a context
with an active span?
"""
import contextlib
import enum
import inspect
@ -414,7 +415,7 @@ def ensure_active_span(
"""
def ensure_active_span_inner_1(
func: Callable[P, R]
func: Callable[P, R],
) -> Callable[P, Union[Optional[T], R]]:
@wraps(func)
def ensure_active_span_inner_2(
@ -700,7 +701,7 @@ def set_operation_name(operation_name: str) -> None:
@only_if_tracing
def force_tracing(
span: Union["opentracing.Span", _Sentinel] = _Sentinel.sentinel
span: Union["opentracing.Span", _Sentinel] = _Sentinel.sentinel,
) -> None:
"""Force sampling for the active/given span and its children.
@ -1093,9 +1094,10 @@ def trace_servlet(
# Mypy seems to think that start_context.tag below can be Optional[str], but
# that doesn't appear to be correct and works in practice.
request_tags[
SynapseTags.REQUEST_TAG
] = request.request_metrics.start_context.tag # type: ignore[assignment]
request_tags[SynapseTags.REQUEST_TAG] = (
request.request_metrics.start_context.tag # type: ignore[assignment]
)
# set the tags *after* the servlet completes, in case it decided to
# prioritise the span (tags will get dropped on unprioritised spans)

View file

@ -293,7 +293,7 @@ def wrap_as_background_process(
"""
def wrap_as_background_process_inner(
func: Callable[P, Awaitable[Optional[R]]]
func: Callable[P, Awaitable[Optional[R]]],
) -> Callable[P, "defer.Deferred[Optional[R]]"]:
@wraps(func)
def wrap_as_background_process_inner_2(

View file

@ -304,9 +304,9 @@ class BulkPushRuleEvaluator:
if relation_type == "m.thread" and event.content.get(
"m.relates_to", {}
).get("is_falling_back", False):
related_events["m.in_reply_to"][
"im.vector.is_falling_back"
] = ""
related_events["m.in_reply_to"]["im.vector.is_falling_back"] = (
""
)
return related_events
@ -372,7 +372,8 @@ class BulkPushRuleEvaluator:
gather_results(
(
run_in_background( # type: ignore[call-arg]
self.store.get_number_joined_users_in_room, event.room_id # type: ignore[arg-type]
self.store.get_number_joined_users_in_room,
event.room_id, # type: ignore[arg-type]
),
run_in_background(
self._get_power_levels_and_sender_level,

View file

@ -119,7 +119,9 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
return payload
async def _handle_request(self, request: Request, content: JsonDict) -> Tuple[int, JsonDict]: # type: ignore[override]
async def _handle_request( # type: ignore[override]
self, request: Request, content: JsonDict
) -> Tuple[int, JsonDict]:
with Measure(self.clock, "repl_fed_send_events_parse"):
room_id = content["room_id"]
backfilled = content["backfilled"]

View file

@ -98,7 +98,9 @@ class ReplicationCopyPusherRestServlet(ReplicationEndpoint):
self._store = hs.get_datastores().main
@staticmethod
async def _serialize_payload(user_id: str, old_room_id: str, new_room_id: str) -> JsonDict: # type: ignore[override]
async def _serialize_payload( # type: ignore[override]
user_id: str, old_room_id: str, new_room_id: str
) -> JsonDict:
return {}
async def _handle_request( # type: ignore[override]
@ -109,7 +111,6 @@ class ReplicationCopyPusherRestServlet(ReplicationEndpoint):
old_room_id: str,
new_room_id: str,
) -> Tuple[int, JsonDict]:
await self._store.copy_push_rules_from_room_to_room_for_user(
old_room_id, new_room_id, user_id
)

View file

@ -18,8 +18,8 @@
# [This file includes modifications made by New Vector Limited]
#
#
"""A replication client for use by synapse workers.
"""
"""A replication client for use by synapse workers."""
import logging
from typing import TYPE_CHECKING, Dict, Iterable, Optional, Set, Tuple

View file

@ -23,6 +23,7 @@
The VALID_SERVER_COMMANDS and VALID_CLIENT_COMMANDS define which commands are
allowed to be sent by which side.
"""
import abc
import logging
from typing import List, Optional, Tuple, Type, TypeVar

View file

@ -857,7 +857,7 @@ UpdateRow = TypeVar("UpdateRow")
def _batch_updates(
updates: Iterable[Tuple[UpdateToken, UpdateRow]]
updates: Iterable[Tuple[UpdateToken, UpdateRow]],
) -> Iterator[Tuple[UpdateToken, List[UpdateRow]]]:
"""Collect stream updates with the same token together

View file

@ -23,6 +23,7 @@ protocols.
An explanation of this protocol is available in docs/tcp_replication.md
"""
import fcntl
import logging
import struct

View file

@ -18,8 +18,7 @@
# [This file includes modifications made by New Vector Limited]
#
#
"""The server side of the replication stream.
"""
"""The server side of the replication stream."""
import logging
import random
@ -307,7 +306,7 @@ class ReplicationStreamer:
def _batch_updates(
updates: List[Tuple[Token, StreamRow]]
updates: List[Tuple[Token, StreamRow]],
) -> List[Tuple[Optional[Token], StreamRow]]:
"""Takes a list of updates of form [(token, row)] and sets the token to
None for all rows where the next row has the same token. This is used to

View file

@ -247,7 +247,7 @@ class _StreamFromIdGen(Stream):
def current_token_without_instance(
current_token: Callable[[], int]
current_token: Callable[[], int],
) -> Callable[[str], int]:
"""Takes a current token callback function for a single writer stream
that doesn't take an instance name parameter and wraps it in a function that

View file

@ -181,8 +181,7 @@ class NewRegistrationTokenRestServlet(RestServlet):
uses_allowed = body.get("uses_allowed", None)
if not (
uses_allowed is None
or (type(uses_allowed) is int and uses_allowed >= 0) # noqa: E721
uses_allowed is None or (type(uses_allowed) is int and uses_allowed >= 0) # noqa: E721
):
raise SynapseError(
HTTPStatus.BAD_REQUEST,

View file

@ -19,8 +19,8 @@
#
#
"""This module contains base REST classes for constructing client v1 servlets.
"""
"""This module contains base REST classes for constructing client v1 servlets."""
import logging
import re
from typing import Any, Awaitable, Callable, Iterable, Pattern, Tuple, TypeVar, cast

View file

@ -108,9 +108,9 @@ class AccountDataServlet(RestServlet):
# Push rules are stored in a separate table and must be queried separately.
if account_data_type == AccountDataTypes.PUSH_RULES:
account_data: Optional[JsonMapping] = (
await self._push_rules_handler.push_rules_for_user(requester.user)
)
account_data: Optional[
JsonMapping
] = await self._push_rules_handler.push_rules_for_user(requester.user)
else:
account_data = await self.store.get_global_account_data_by_type_for_user(
user_id, account_data_type

View file

@ -48,9 +48,7 @@ class AccountValidityRenewServlet(RestServlet):
self.account_renewed_template = (
hs.config.account_validity.account_validity_account_renewed_template
)
self.account_previously_renewed_template = (
hs.config.account_validity.account_validity_account_previously_renewed_template
)
self.account_previously_renewed_template = hs.config.account_validity.account_validity_account_previously_renewed_template
self.invalid_token_template = (
hs.config.account_validity.account_validity_invalid_token_template
)

View file

@ -20,6 +20,7 @@
#
"""This module contains REST servlets to do with event streaming, /events."""
import logging
from typing import TYPE_CHECKING, Dict, List, Tuple, Union

View file

@ -53,7 +53,6 @@ class KnockRoomAliasServlet(RestServlet):
super().__init__()
self.room_member_handler = hs.get_room_member_handler()
self.auth = hs.get_auth()
self._support_via = hs.config.experimental.msc4156_enabled
async def on_POST(
self,
@ -72,15 +71,11 @@ class KnockRoomAliasServlet(RestServlet):
# twisted.web.server.Request.args is incorrectly defined as Optional[Any]
args: Dict[bytes, List[bytes]] = request.args # type: ignore
remote_room_hosts = parse_strings_from_args(
args, "server_name", required=False
)
if self._support_via:
# Prefer via over server_name (deprecated with MSC4156)
remote_room_hosts = parse_strings_from_args(args, "via", required=False)
if remote_room_hosts is None:
remote_room_hosts = parse_strings_from_args(
args,
"org.matrix.msc4156.via",
default=remote_room_hosts,
required=False,
args, "server_name", required=False
)
elif RoomAlias.is_valid(room_identifier):
handler = self.room_member_handler

View file

@ -19,8 +19,8 @@
#
#
""" This module contains REST servlets to do with presence: /presence/<paths>
"""
"""This module contains REST servlets to do with presence: /presence/<paths>"""
import logging
from typing import TYPE_CHECKING, Tuple

View file

@ -19,7 +19,7 @@
#
#
""" This module contains REST servlets to do with profile: /profile/<paths> """
"""This module contains REST servlets to do with profile: /profile/<paths>"""
from http import HTTPStatus
from typing import TYPE_CHECKING, Tuple

View file

@ -640,12 +640,10 @@ class RegisterRestServlet(RestServlet):
if not password_hash:
raise SynapseError(400, "Missing params: password", Codes.MISSING_PARAM)
desired_username = (
await (
self.password_auth_provider.get_username_for_registration(
auth_result,
params,
)
desired_username = await (
self.password_auth_provider.get_username_for_registration(
auth_result,
params,
)
)
@ -696,11 +694,9 @@ class RegisterRestServlet(RestServlet):
session_id
)
display_name = (
await (
self.password_auth_provider.get_displayname_for_registration(
auth_result, params
)
display_name = await (
self.password_auth_provider.get_displayname_for_registration(
auth_result, params
)
)

View file

@ -19,7 +19,8 @@
#
#
""" This module contains REST servlets to do with rooms: /rooms/<paths> """
"""This module contains REST servlets to do with rooms: /rooms/<paths>"""
import logging
import re
from enum import Enum
@ -418,7 +419,6 @@ class JoinRoomAliasServlet(ResolveRoomIdMixin, TransactionRestServlet):
super().__init__(hs)
super(ResolveRoomIdMixin, self).__init__(hs) # ensure the Mixin is set up
self.auth = hs.get_auth()
self._support_via = hs.config.experimental.msc4156_enabled
def register(self, http_server: HttpServer) -> None:
# /join/$room_identifier[/$txn_id]
@ -436,13 +436,11 @@ class JoinRoomAliasServlet(ResolveRoomIdMixin, TransactionRestServlet):
# twisted.web.server.Request.args is incorrectly defined as Optional[Any]
args: Dict[bytes, List[bytes]] = request.args # type: ignore
remote_room_hosts = parse_strings_from_args(args, "server_name", required=False)
if self._support_via:
# Prefer via over server_name (deprecated with MSC4156)
remote_room_hosts = parse_strings_from_args(args, "via", required=False)
if remote_room_hosts is None:
remote_room_hosts = parse_strings_from_args(
args,
"org.matrix.msc4156.via",
default=remote_room_hosts,
required=False,
args, "server_name", required=False
)
room_id, remote_room_hosts = await self.resolve_room_id(
room_identifier,

View file

@ -1011,12 +1011,16 @@ class SlidingSyncRestServlet(RestServlet):
for room_id, room_result in rooms.items():
serialized_rooms[room_id] = {
"bump_stamp": room_result.bump_stamp,
"joined_count": room_result.joined_count,
"invited_count": room_result.invited_count,
"notification_count": room_result.notification_count,
"highlight_count": room_result.highlight_count,
}
if room_result.joined_count is not None:
serialized_rooms[room_id]["joined_count"] = room_result.joined_count
if room_result.invited_count is not None:
serialized_rooms[room_id]["invited_count"] = room_result.invited_count
if room_result.name:
serialized_rooms[room_id]["name"] = room_result.name
@ -1045,9 +1049,9 @@ class SlidingSyncRestServlet(RestServlet):
serialized_rooms[room_id]["initial"] = room_result.initial
if room_result.unstable_expanded_timeline:
serialized_rooms[room_id][
"unstable_expanded_timeline"
] = room_result.unstable_expanded_timeline
serialized_rooms[room_id]["unstable_expanded_timeline"] = (
room_result.unstable_expanded_timeline
)
# This will be omitted for invite/knock rooms with `stripped_state`
if (
@ -1082,9 +1086,9 @@ class SlidingSyncRestServlet(RestServlet):
# This will be omitted for invite/knock rooms with `stripped_state`
if room_result.prev_batch is not None:
serialized_rooms[room_id]["prev_batch"] = (
await room_result.prev_batch.to_string(self.store)
)
serialized_rooms[room_id][
"prev_batch"
] = await room_result.prev_batch.to_string(self.store)
# This will be omitted for invite/knock rooms with `stripped_state`
if room_result.num_live is not None:

View file

@ -21,6 +21,7 @@
"""This module contains logic for storing HTTP PUT transactions. This is used
to ensure idempotency when performing PUTs using the REST API."""
import logging
from typing import TYPE_CHECKING, Awaitable, Callable, Dict, Hashable, Tuple

View file

@ -191,10 +191,10 @@ class RemoteKey(RestServlet):
server_keys: Dict[Tuple[str, str], Optional[FetchKeyResultForRemote]] = {}
for server_name, key_ids in query.items():
if key_ids:
results: Mapping[str, Optional[FetchKeyResultForRemote]] = (
await self.store.get_server_keys_json_for_remote(
server_name, key_ids
)
results: Mapping[
str, Optional[FetchKeyResultForRemote]
] = await self.store.get_server_keys_json_for_remote(
server_name, key_ids
)
else:
results = await self.store.get_all_server_keys_json_for_remote(

Some files were not shown because too many files have changed in this diff Show more