Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes

Patrick Cloke 2023-11-03 07:45:38 -04:00
commit 671266b5a9
73 changed files with 757 additions and 525 deletions

@@ -1,3 +1,78 @@
# Synapse 1.96.0rc1 (2023-10-31)
### Features
- Add experimental support to allow multiple workers to write to the receipts stream. ([\#16432](https://github.com/matrix-org/synapse/issues/16432))
- Add a new module API for controlling presence. ([\#16544](https://github.com/matrix-org/synapse/issues/16544))
- Add a new module API callback that allows adding extra fields to events' unsigned section when sent down to clients. ([\#16549](https://github.com/matrix-org/synapse/issues/16549))
- Improve the performance of claiming encryption keys. ([\#16565](https://github.com/matrix-org/synapse/issues/16565), [\#16570](https://github.com/matrix-org/synapse/issues/16570))
### Bugfixes
- Fixed a bug in the example Grafana dashboard that prevented it from finding the correct datasource. Contributed by @MichaelSasser. ([\#16471](https://github.com/matrix-org/synapse/issues/16471))
- Fix a long-standing, exceedingly rare edge case where the first event persisted by a new event persister worker might not be sent down `/sync`. ([\#16473](https://github.com/matrix-org/synapse/issues/16473), [\#16557](https://github.com/matrix-org/synapse/issues/16557), [\#16561](https://github.com/matrix-org/synapse/issues/16561), [\#16578](https://github.com/matrix-org/synapse/issues/16578), [\#16580](https://github.com/matrix-org/synapse/issues/16580))
- Fix long-standing bug where `/sync` incorrectly did not mark a room as `limited` in a sync response when there were missing remote events. ([\#16485](https://github.com/matrix-org/synapse/issues/16485))
- Fix a bug introduced in Synapse 1.41 where HTTP(S) forward proxy authorization would fail when using basic HTTP authentication with a long `username:password` string. ([\#16504](https://github.com/matrix-org/synapse/issues/16504))
- Force TLS certificate verification in user registration script. ([\#16530](https://github.com/matrix-org/synapse/issues/16530))
- Fix long-standing bug where `/sync` could tightloop after restart when using SQLite. ([\#16540](https://github.com/matrix-org/synapse/issues/16540))
- Fix ratelimiting of message sending when using workers, where the ratelimit would only be applied after most of the work has been done. ([\#16558](https://github.com/matrix-org/synapse/issues/16558))
- Fix a long-standing bug where invited/knocking users would not leave during a room purge. ([\#16559](https://github.com/matrix-org/synapse/issues/16559))
### Improved Documentation
- Improve documentation of presence router. ([\#16529](https://github.com/matrix-org/synapse/issues/16529))
- Add a sentence to the [opentracing docs](https://matrix-org.github.io/synapse/latest/opentracing.html) on how to run Jaeger on a different host than Synapse. ([\#16531](https://github.com/matrix-org/synapse/issues/16531))
- Correctly describe the meaning of unspecified rule lists in the [`alias_creation_rules`](https://matrix-org.github.io/synapse/latest/usage/configuration/config_documentation.html#alias_creation_rules) and [`room_list_publication_rules`](https://matrix-org.github.io/synapse/latest/usage/configuration/config_documentation.html#room_list_publication_rules) config options and improve their descriptions more generally. ([\#16541](https://github.com/matrix-org/synapse/issues/16541))
- Pin the recommended poetry version in [contributors' guide](https://matrix-org.github.io/synapse/latest/development/contributing_guide.html). ([\#16550](https://github.com/matrix-org/synapse/issues/16550))
- Fix a broken link to the [client breakdown](https://matrix.org/ecosystem/clients/) in the README. ([\#16569](https://github.com/matrix-org/synapse/issues/16569))
### Internal Changes
- Improve performance of the delete device messages query; see issue [16479](https://github.com/matrix-org/synapse/issues/16479). ([\#16492](https://github.com/matrix-org/synapse/issues/16492))
- Reduce memory allocations. ([\#16505](https://github.com/matrix-org/synapse/issues/16505))
- Improve replication performance when purging rooms. ([\#16510](https://github.com/matrix-org/synapse/issues/16510))
- Run tests against Python 3.12. ([\#16511](https://github.com/matrix-org/synapse/issues/16511))
- Run trial & integration tests in continuous integration when `.ci` directory is modified. ([\#16512](https://github.com/matrix-org/synapse/issues/16512))
- Remove duplicate call to mark remote server 'awake' when using a federation sending worker. ([\#16515](https://github.com/matrix-org/synapse/issues/16515))
- Enable dirty runs on Complement CI, which is significantly faster. ([\#16520](https://github.com/matrix-org/synapse/issues/16520))
- Stop deleting from an unused table. ([\#16521](https://github.com/matrix-org/synapse/issues/16521))
- Improve type hints. ([\#16526](https://github.com/matrix-org/synapse/issues/16526), [\#16551](https://github.com/matrix-org/synapse/issues/16551))
- Fix running unit tests on Twisted trunk. ([\#16528](https://github.com/matrix-org/synapse/issues/16528))
- Reduce some spurious logging in worker mode. ([\#16555](https://github.com/matrix-org/synapse/issues/16555))
- Stop porting a table in the database port script that we're going to nuke and rebuild anyway. ([\#16563](https://github.com/matrix-org/synapse/issues/16563))
- Deal with warnings from running Complement in CI. ([\#16567](https://github.com/matrix-org/synapse/issues/16567))
- Allow building with `setuptools_rust` 1.8.0. ([\#16574](https://github.com/matrix-org/synapse/issues/16574))
### Updates to locked dependencies
* Bump black from 23.10.0 to 23.10.1. ([\#16575](https://github.com/matrix-org/synapse/issues/16575))
* Bump black from 23.9.1 to 23.10.0. ([\#16538](https://github.com/matrix-org/synapse/issues/16538))
* Bump cryptography from 41.0.4 to 41.0.5. ([\#16572](https://github.com/matrix-org/synapse/issues/16572))
* Bump gitpython from 3.1.37 to 3.1.40. ([\#16534](https://github.com/matrix-org/synapse/issues/16534))
* Bump phonenumbers from 8.13.22 to 8.13.23. ([\#16576](https://github.com/matrix-org/synapse/issues/16576))
* Bump pygithub from 1.59.1 to 2.1.1. ([\#16535](https://github.com/matrix-org/synapse/issues/16535))
* Bump matrix-synapse-ldap3 from 0.2.2 to 0.3.0. ([\#16539](https://github.com/matrix-org/synapse/issues/16539))
* Bump serde from 1.0.189 to 1.0.190. ([\#16577](https://github.com/matrix-org/synapse/issues/16577))
* Bump setuptools-rust from 1.7.0 to 1.8.0. ([\#16574](https://github.com/matrix-org/synapse/issues/16574))
* Bump types-pillow from 10.0.0.3 to 10.1.0.0. ([\#16536](https://github.com/matrix-org/synapse/issues/16536))
* Bump types-psycopg2 from 2.9.21.14 to 2.9.21.15. ([\#16573](https://github.com/matrix-org/synapse/issues/16573))
* Bump types-requests from 2.31.0.2 to 2.31.0.10. ([\#16537](https://github.com/matrix-org/synapse/issues/16537))
* Bump urllib3 from 1.26.17 to 1.26.18. ([\#16516](https://github.com/matrix-org/synapse/issues/16516))
# Synapse 1.95.1 (2023-10-31)
## Security advisory
The following issue is fixed in 1.95.1.
- [GHSA-mp92-3jfm-3575](https://github.com/matrix-org/synapse/security/advisories/GHSA-mp92-3jfm-3575) / [CVE-2023-43796](https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-43796) — Moderate Severity

  Cached device information of remote users can be queried from Synapse. This can be used to enumerate the remote users known to a homeserver.
See the advisory for more details. If you have any questions, email security@matrix.org.
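The mitigation, visible in the handler and federation hunks later in this diff, is to reject federation device/key queries that name users the local server is not authoritative for. A minimal, self-contained sketch of that guard pattern (the `assert_users_are_local` helper and its naive domain parsing are illustrative, not Synapse's actual `is_mine` check):

```python
from typing import Iterable


def assert_users_are_local(server_name: str, user_ids: Iterable[str]) -> None:
    """Raise if any user ID does not belong to this homeserver.

    Matrix user IDs look like "@localpart:domain"; a server should only
    answer device queries for its own domain.
    """
    for user_id in user_ids:
        domain = user_id.rpartition(":")[2]
        if domain != server_name:
            raise ValueError(f"{user_id} is not hosted on this homeserver")


assert_users_are_local("example.org", ["@alice:example.org"])  # passes
# assert_users_are_local("example.org", ["@bob:other.net"])    # would raise
```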
# Synapse 1.95.0 (2023-10-24)
### Internal Changes

@@ -1 +0,0 @@
Allow multiple workers to write to the receipts stream.

@@ -1 +0,0 @@
Fixed a bug that prevented Grafana from finding the correct datasource. Contributed by @MichaelSasser.

@@ -1 +0,0 @@
Fix a long-standing, exceedingly rare edge case where the first event persisted by a new event persister worker might not be sent down `/sync`.

@@ -1 +0,0 @@
Fix long-standing bug where `/sync` incorrectly did not mark a room as `limited` in a sync response when there were missing remote events.

@@ -1 +0,0 @@
Improve performance of the delete device messages query; see issue [16479](https://github.com/matrix-org/synapse/issues/16479).

@@ -1 +0,0 @@
Fix a bug introduced in Synapse 1.41 where HTTP(S) forward proxy authorization would fail when using basic HTTP authentication with a long `username:password` string.

@@ -1 +0,0 @@
Reduce memory allocations.

@@ -1 +0,0 @@
Improve replication performance when purging rooms.

@@ -1 +0,0 @@
Run tests against Python 3.12.

@@ -1 +0,0 @@
Run trial & integration tests in continuous integration when `.ci` directory is modified.

@@ -1 +0,0 @@
Remove duplicate call to mark remote server 'awake' when using a federation sending worker.

@@ -1 +0,0 @@
Enable dirty runs on Complement CI, which is significantly faster.

@@ -1 +0,0 @@
Stop deleting from an unused table.

@@ -1 +0,0 @@
Fix running unit tests on Twisted trunk.

@@ -1 +0,0 @@
Improve documentation of presence router.

@@ -1 +0,0 @@
Force TLS certificate verification in user registration script.

@@ -1 +0,0 @@
Add a sentence to the opentracing docs on how to run Jaeger on a different host than Synapse.

@@ -1 +0,0 @@
Bump matrix-synapse-ldap3 from 0.2.2 to 0.3.0.

@@ -1 +0,0 @@
Fix long-standing bug where `/sync` could tightloop after restart when using SQLite.

@@ -1 +0,0 @@
Correctly describe the meaning of unspecified rule lists in the [`alias_creation_rules`](https://matrix-org.github.io/synapse/latest/usage/configuration/config_documentation.html#alias_creation_rules) and [`room_list_publication_rules`](https://matrix-org.github.io/synapse/latest/usage/configuration/config_documentation.html#room_list_publication_rules) config options and improve their descriptions more generally.

@@ -1 +0,0 @@
Add a new module API for controlling presence.

@@ -1 +0,0 @@
Add a new module API callback that allows adding extra fields to events' unsigned section when sent down to clients.

@@ -1 +0,0 @@
Pin the recommended poetry version in contributors' guide.

@@ -1 +0,0 @@
Improve type hints.

@@ -1 +0,0 @@
Reduce some spurious logging in worker mode.

@@ -1 +0,0 @@
Fix a long-standing, exceedingly rare edge case where the first event persisted by a new event persister worker might not be sent down `/sync`.

@@ -1 +0,0 @@
Fix ratelimiting of message sending when using workers, where the ratelimit would only be applied after most of the work has been done.

@@ -1 +0,0 @@
Fix a long-standing bug where invited/knocking users would not leave during a room purge.

@@ -1 +0,0 @@
Fix a long-standing, exceedingly rare edge case where the first event persisted by a new event persister worker might not be sent down `/sync`.

@@ -1 +0,0 @@
Stop porting a table in the database port script that we're going to nuke and rebuild anyway.

@@ -1 +0,0 @@
Improve the performance of claiming encryption keys.

@@ -1 +0,0 @@
Deal with warnings from running Complement in CI.

@@ -1 +0,0 @@
Fix a broken link to the [client breakdown](https://matrix.org/ecosystem/clients/) in the README.

@@ -1 +0,0 @@
Improve the performance of claiming encryption keys.

@@ -1 +0,0 @@
Allow building with `setuptools_rust` 1.8.0.

@@ -1 +0,0 @@
Fix a long-standing, exceedingly rare edge case where the first event persisted by a new event persister worker might not be sent down `/sync`.

@@ -1 +0,0 @@
Fix a long-standing, exceedingly rare edge case where the first event persisted by a new event persister worker might not be sent down `/sync`.

changelog.d/16584.misc
@@ -0,0 +1 @@
Simplify persistence code to be per-room.

changelog.d/16585.misc
@@ -0,0 +1 @@
Use standard SQL helpers in persistence code.

changelog.d/16586.misc
@@ -0,0 +1 @@
Avoid updating the stream cache unnecessarily.

changelog.d/16588.misc
@@ -0,0 +1 @@
Bump twisted from 23.8.0 to 23.10.0.

changelog.d/16589.misc
@@ -0,0 +1 @@
Improve performance when using opentracing.

debian/changelog
@@ -1,3 +1,15 @@
matrix-synapse-py3 (1.96.0~rc1) stable; urgency=medium

  * New Synapse release 1.96.0rc1.

 -- Synapse Packaging team <packages@matrix.org>  Tue, 31 Oct 2023 14:09:09 +0000

matrix-synapse-py3 (1.95.1) stable; urgency=medium

  * New Synapse release 1.95.1.

 -- Synapse Packaging team <packages@matrix.org>  Tue, 31 Oct 2023 14:00:00 +0000

matrix-synapse-py3 (1.95.0) stable; urgency=medium

  * New Synapse release 1.95.0.

poetry.lock

@@ -2972,13 +2972,13 @@ urllib3 = ">=1.26.0"
 [[package]]
 name = "twisted"
-version = "23.8.0"
+version = "23.10.0"
 description = "An asynchronous networking framework written in Python"
 optional = false
-python-versions = ">=3.7.1"
+python-versions = ">=3.8.0"
 files = [
-    {file = "twisted-23.8.0-py3-none-any.whl", hash = "sha256:b8bdba145de120ffb36c20e6e071cce984e89fba798611ed0704216fb7f884cd"},
-    {file = "twisted-23.8.0.tar.gz", hash = "sha256:3c73360add17336a622c0d811c2a2ce29866b6e59b1125fd6509b17252098a24"},
+    {file = "twisted-23.10.0-py3-none-any.whl", hash = "sha256:4ae8bce12999a35f7fe6443e7f1893e6fe09588c8d2bed9c35cdce8ff2d5b444"},
+    {file = "twisted-23.10.0.tar.gz", hash = "sha256:987847a0790a2c597197613686e2784fd54167df3a55d0fb17c8412305d76ce5"},
 ]
 
 [package.dependencies]

@@ -2991,19 +2991,18 @@ incremental = ">=22.10.0"
 pyopenssl = {version = ">=21.0.0", optional = true, markers = "extra == \"tls\""}
 service-identity = {version = ">=18.1.0", optional = true, markers = "extra == \"tls\""}
 twisted-iocpsupport = {version = ">=1.0.2,<2", markers = "platform_system == \"Windows\""}
-typing-extensions = ">=3.10.0"
+typing-extensions = ">=4.2.0"
 zope-interface = ">=5"
 
 [package.extras]
-all-non-platform = ["twisted[conch,contextvars,http2,serial,test,tls]", "twisted[conch,contextvars,http2,serial,test,tls]"]
+all-non-platform = ["twisted[conch,http2,serial,test,tls]", "twisted[conch,http2,serial,test,tls]"]
 conch = ["appdirs (>=1.4.0)", "bcrypt (>=3.1.3)", "cryptography (>=3.3)"]
-contextvars = ["contextvars (>=2.4,<3)"]
 dev = ["coverage (>=6b1,<7)", "pyflakes (>=2.2,<3.0)", "python-subunit (>=1.4,<2.0)", "twisted[dev-release]", "twistedchecker (>=0.7,<1.0)"]
-dev-release = ["pydoctor (>=23.4.0,<23.5.0)", "pydoctor (>=23.4.0,<23.5.0)", "readthedocs-sphinx-ext (>=2.2,<3.0)", "readthedocs-sphinx-ext (>=2.2,<3.0)", "sphinx (>=5,<7)", "sphinx (>=5,<7)", "sphinx-rtd-theme (>=1.2,<2.0)", "sphinx-rtd-theme (>=1.2,<2.0)", "towncrier (>=22.12,<23.0)", "towncrier (>=22.12,<23.0)", "urllib3 (<2)", "urllib3 (<2)"]
+dev-release = ["pydoctor (>=23.9.0,<23.10.0)", "pydoctor (>=23.9.0,<23.10.0)", "sphinx (>=6,<7)", "sphinx (>=6,<7)", "sphinx-rtd-theme (>=1.3,<2.0)", "sphinx-rtd-theme (>=1.3,<2.0)", "towncrier (>=23.6,<24.0)", "towncrier (>=23.6,<24.0)"]
 gtk-platform = ["pygobject", "pygobject", "twisted[all-non-platform]", "twisted[all-non-platform]"]
 http2 = ["h2 (>=3.0,<5.0)", "priority (>=1.1.0,<2.0)"]
 macos-platform = ["pyobjc-core", "pyobjc-core", "pyobjc-framework-cfnetwork", "pyobjc-framework-cfnetwork", "pyobjc-framework-cocoa", "pyobjc-framework-cocoa", "twisted[all-non-platform]", "twisted[all-non-platform]"]
-mypy = ["mypy (==0.981)", "mypy-extensions (==0.4.3)", "mypy-zope (==0.3.11)", "twisted[all-non-platform,dev]", "types-pyopenssl", "types-setuptools"]
+mypy = ["mypy (>=1.5.1,<1.6.0)", "mypy-zope (>=1.0.1,<1.1.0)", "twisted[all-non-platform,dev]", "types-pyopenssl", "types-setuptools"]
 osx-platform = ["twisted[macos-platform]", "twisted[macos-platform]"]
 serial = ["pyserial (>=3.0)", "pywin32 (!=226)"]
 test = ["cython-test-exception-raiser (>=1.0.2,<2)", "hypothesis (>=6.56)", "pyhamcrest (>=2)"]

@@ -96,7 +96,7 @@ module-name = "synapse.synapse_rust"
 
 [tool.poetry]
 name = "matrix-synapse"
-version = "1.95.0"
+version = "1.96.0rc1"
 description = "Homeserver for the Matrix decentralised comms protocol"
 authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
 license = "Apache-2.0"

@@ -84,7 +84,7 @@ from synapse.replication.http.federation import (
 from synapse.storage.databases.main.lock import Lock
 from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
 from synapse.storage.roommember import MemberSummary
-from synapse.types import JsonDict, StateMap, get_domain_from_id
+from synapse.types import JsonDict, StateMap, UserID, get_domain_from_id
 from synapse.util import unwrapFirstError
 from synapse.util.async_helpers import Linearizer, concurrently_execute, gather_results
 from synapse.util.caches.response_cache import ResponseCache

@@ -999,6 +999,12 @@ class FederationServer(FederationBase):
     async def on_claim_client_keys(
         self, query: List[Tuple[str, str, str, int]], always_include_fallback_keys: bool
     ) -> Dict[str, Any]:
+        if any(
+            not self.hs.is_mine(UserID.from_string(user_id))
+            for user_id, _, _, _ in query
+        ):
+            raise SynapseError(400, "User is not hosted on this homeserver")
+
         log_kv({"message": "Claiming one time keys.", "user, device pairs": query})
         results = await self._e2e_keys_handler.claim_local_one_time_keys(
             query, always_include_fallback_keys=always_include_fallback_keys

@@ -283,7 +283,7 @@ class AdminHandler:
             start, limit, user_id
         )
         for media in media_ids:
-            writer.write_media_id(media["media_id"], media)
+            writer.write_media_id(media.media_id, attr.asdict(media))
 
         logger.info(
             "[%s] Written %d media_ids of %s",

@@ -328,6 +328,9 @@ class DeviceWorkerHandler:
         return result
 
     async def on_federation_query_user_devices(self, user_id: str) -> JsonDict:
+        if not self.hs.is_mine(UserID.from_string(user_id)):
+            raise SynapseError(400, "User is not hosted on this homeserver")
+
         stream_id, devices = await self.store.get_e2e_device_keys_for_federation_query(
             user_id
         )

@@ -542,6 +542,12 @@ class E2eKeysHandler:
         device_keys_query: Dict[str, Optional[List[str]]] = query_body.get(
             "device_keys", {}
         )
+        if any(
+            not self.is_mine(UserID.from_string(user_id))
+            for user_id in device_keys_query
+        ):
+            raise SynapseError(400, "User is not hosted on this homeserver")
+
         res = await self.query_local_devices(
             device_keys_query,
             include_displaynames=(

@@ -33,6 +33,7 @@ from synapse.api.errors import (
     RequestSendFailed,
     SynapseError,
 )
+from synapse.storage.databases.main.room import LargestRoomStats
 from synapse.types import JsonDict, JsonMapping, ThirdPartyInstanceID
 from synapse.util.caches.descriptors import _CacheContext, cached
 from synapse.util.caches.response_cache import ResponseCache

@@ -170,26 +171,24 @@ class RoomListHandler:
             ignore_non_federatable=from_federation,
         )
 
-        def build_room_entry(room: JsonDict) -> JsonDict:
+        def build_room_entry(room: LargestRoomStats) -> JsonDict:
             entry = {
-                "room_id": room["room_id"],
-                "name": room["name"],
-                "topic": room["topic"],
-                "canonical_alias": room["canonical_alias"],
-                "num_joined_members": room["joined_members"],
-                "avatar_url": room["avatar"],
-                "world_readable": room["history_visibility"]
+                "room_id": room.room_id,
+                "name": room.name,
+                "topic": room.topic,
+                "canonical_alias": room.canonical_alias,
+                "num_joined_members": room.joined_members,
+                "avatar_url": room.avatar,
+                "world_readable": room.history_visibility
                 == HistoryVisibility.WORLD_READABLE,
-                "guest_can_join": room["guest_access"] == "can_join",
-                "join_rule": room["join_rules"],
-                "room_type": room["room_type"],
+                "guest_can_join": room.guest_access == "can_join",
+                "join_rule": room.join_rules,
+                "room_type": room.room_type,
             }
 
             # Filter out Nones rather omit the field altogether
             return {k: v for k, v in entry.items() if v is not None}
 
-        results = [build_room_entry(r) for r in results]
-
         response: JsonDict = {}
         num_results = len(results)
         if limit is not None:

@@ -212,33 +211,33 @@
                 # If there was a token given then we assume that there
                 # must be previous results.
                 response["prev_batch"] = RoomListNextBatch(
-                    last_joined_members=initial_entry["num_joined_members"],
-                    last_room_id=initial_entry["room_id"],
+                    last_joined_members=initial_entry.joined_members,
+                    last_room_id=initial_entry.room_id,
                     direction_is_forward=False,
                 ).to_token()
 
             if more_to_come:
                 response["next_batch"] = RoomListNextBatch(
-                    last_joined_members=final_entry["num_joined_members"],
-                    last_room_id=final_entry["room_id"],
+                    last_joined_members=final_entry.joined_members,
+                    last_room_id=final_entry.room_id,
                     direction_is_forward=True,
                 ).to_token()
         else:
             if has_batch_token:
                 response["next_batch"] = RoomListNextBatch(
-                    last_joined_members=final_entry["num_joined_members"],
-                    last_room_id=final_entry["room_id"],
+                    last_joined_members=final_entry.joined_members,
+                    last_room_id=final_entry.room_id,
                     direction_is_forward=True,
                 ).to_token()
 
             if more_to_come:
                 response["prev_batch"] = RoomListNextBatch(
-                    last_joined_members=initial_entry["num_joined_members"],
-                    last_room_id=initial_entry["room_id"],
+                    last_joined_members=initial_entry.joined_members,
+                    last_room_id=initial_entry.room_id,
                     direction_is_forward=False,
                 ).to_token()
 
-        response["chunk"] = results
+        response["chunk"] = [build_room_entry(r) for r in results]
 
         response["total_room_count_estimate"] = await self.store.count_public_rooms(
             network_tuple,

@@ -703,24 +703,24 @@ class RoomSummaryHandler:
         # there should always be an entry
         assert stats is not None, "unable to retrieve stats for %s" % (room_id,)
 
-        entry = {
-            "room_id": stats["room_id"],
-            "name": stats["name"],
-            "topic": stats["topic"],
-            "canonical_alias": stats["canonical_alias"],
-            "num_joined_members": stats["joined_members"],
-            "avatar_url": stats["avatar"],
-            "join_rule": stats["join_rules"],
+        entry: JsonDict = {
+            "room_id": stats.room_id,
+            "name": stats.name,
+            "topic": stats.topic,
+            "canonical_alias": stats.canonical_alias,
+            "num_joined_members": stats.joined_members,
+            "avatar_url": stats.avatar,
+            "join_rule": stats.join_rules,
             "world_readable": (
-                stats["history_visibility"] == HistoryVisibility.WORLD_READABLE
+                stats.history_visibility == HistoryVisibility.WORLD_READABLE
             ),
-            "guest_can_join": stats["guest_access"] == "can_join",
-            "room_type": stats["room_type"],
+            "guest_can_join": stats.guest_access == "can_join",
+            "room_type": stats.room_type,
         }
 
         if self._msc3266_enabled:
-            entry["im.nheko.summary.version"] = stats["version"]
-            entry["im.nheko.summary.encryption"] = stats["encryption"]
+            entry["im.nheko.summary.version"] = stats.version
+            entry["im.nheko.summary.encryption"] = stats.encryption
 
         # Federation requests need to provide additional information so the
         # requested server is able to filter the response appropriately.

@@ -1019,11 +1019,14 @@ def tag_args(func: Callable[P, R]) -> Callable[P, R]:
     if not opentracing:
         return func
 
+    # getfullargspec is somewhat expensive, so ensure it is only called a single
+    # time (the function signature shouldn't change anyway).
+    argspec = inspect.getfullargspec(func)
+
     @contextlib.contextmanager
     def _wrapping_logic(
-        func: Callable[P, R], *args: P.args, **kwargs: P.kwargs
+        _func: Callable[P, R], *args: P.args, **kwargs: P.kwargs
     ) -> Generator[None, None, None]:
-        argspec = inspect.getfullargspec(func)
         # We use `[1:]` to skip the `self` object reference and `start=1` to
         # make the index line up with `argspec.args`.
         #
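The hunk above hoists the `inspect.getfullargspec` call out of the per-call wrapper so the comparatively expensive introspection runs once at decoration time. A standalone sketch of the same hoisting pattern, using a hypothetical `traced` decorator rather than Synapse's `tag_args`:

```python
import functools
import inspect
from typing import Any, Callable


def traced(func: Callable[..., Any]) -> Callable[..., Any]:
    # Hoisted: run getfullargspec once here, not on every invocation.
    argspec = inspect.getfullargspec(func)

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        # Pair positional arguments with their names via the precomputed spec.
        for name, value in zip(argspec.args, args):
            print(f"{name}={value!r}")
        return func(*args, **kwargs)

    return wrapper


@traced
def add(x: int, y: int) -> int:
    return x + y


add(1, 2)  # prints x=1 and y=2, returns 3
```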

@@ -17,6 +17,8 @@ import logging
 from http import HTTPStatus
 from typing import TYPE_CHECKING, Optional, Tuple
 
+import attr
+
 from synapse.api.constants import Direction
 from synapse.api.errors import Codes, NotFoundError, SynapseError
 from synapse.http.server import HttpServer

@@ -418,7 +420,7 @@ class UserMediaRestServlet(RestServlet):
             start, limit, user_id, order_by, direction
         )
 
-        ret = {"media": media, "total": total}
+        ret = {"media": [attr.asdict(m) for m in media], "total": total}
         if (start + limit) < total:
             ret["next_token"] = start + len(media)

@@ -477,7 +479,7 @@
         )
 
         deleted_media, total = await self.media_repository.delete_local_media_ids(
-            [row["media_id"] for row in media]
+            [m.media_id for m in media]
         )
 
         return HTTPStatus.OK, {"deleted_media": deleted_media, "total": total}

@@ -77,7 +77,18 @@ class ListRegistrationTokensRestServlet(RestServlet):
         await assert_requester_is_admin(self.auth, request)
         valid = parse_boolean(request, "valid")
         token_list = await self.store.get_registration_tokens(valid)
-        return HTTPStatus.OK, {"registration_tokens": token_list}
+        return HTTPStatus.OK, {
+            "registration_tokens": [
+                {
+                    "token": t[0],
+                    "uses_allowed": t[1],
+                    "pending": t[2],
+                    "completed": t[3],
+                    "expiry_time": t[4],
+                }
+                for t in token_list
+            ]
+        }
 
 
 class NewRegistrationTokenRestServlet(RestServlet):

@@ -16,6 +16,8 @@ from http import HTTPStatus
 from typing import TYPE_CHECKING, List, Optional, Tuple, cast
 from urllib import parse as urlparse
 
+import attr
+
 from synapse.api.constants import Direction, EventTypes, JoinRules, Membership
 from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError
 from synapse.api.filtering import Filter

@@ -306,10 +308,13 @@ class RoomRestServlet(RestServlet):
             raise NotFoundError("Room not found")
 
         members = await self.store.get_users_in_room(room_id)
-        ret["joined_local_devices"] = await self.store.count_devices_by_users(members)
-        ret["forgotten"] = await self.store.is_locally_forgotten_room(room_id)
+        result = attr.asdict(ret)
+        result["joined_local_devices"] = await self.store.count_devices_by_users(
+            members
+        )
+        result["forgotten"] = await self.store.is_locally_forgotten_room(room_id)
 
-        return HTTPStatus.OK, ret
+        return HTTPStatus.OK, result
 
     async def on_DELETE(
         self, request: SynapseRequest, room_id: str

@@ -18,6 +18,8 @@ import secrets
 from http import HTTPStatus
 from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
 
+import attr
+
 from synapse.api.constants import Direction, UserTypes
 from synapse.api.errors import Codes, NotFoundError, SynapseError
 from synapse.http.servlet import (

@@ -161,11 +163,13 @@ class UsersRestServletV2(RestServlet):
         )
 
         # If support for MSC3866 is not enabled, don't show the approval flag.
+        filter = None
         if not self._msc3866_enabled:
-            for user in users:
-                del user["approved"]
 
-        ret = {"users": users, "total": total}
+            def _filter(a: attr.Attribute) -> bool:
+                return a.name != "approved"
+
+        ret = {"users": [attr.asdict(u, filter=filter) for u in users], "total": total}
         if (start + limit) < total:
             ret["next_token"] = str(start + len(users))
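The servlet above now serialises `attrs` instances with `attr.asdict`, using a field filter to drop `approved` when MSC3866 is disabled. A self-contained sketch of that filtering mechanism (the two-field `User` class is illustrative, not the real `UserPaginateResponse`; note that `attr.asdict` passes both the attribute and its value to the filter):

```python
import attr


@attr.s(slots=True, frozen=True, auto_attribs=True)
class User:
    name: str
    approved: bool


def _drop_approved(attribute: attr.Attribute, value: object) -> bool:
    # Returning False excludes the field from the resulting dict.
    return attribute.name != "approved"


user = User(name="@alice:example.org", approved=True)
print(attr.asdict(user))                         # includes "approved"
print(attr.asdict(user, filter=_drop_approved))  # {"name": "@alice:example.org"}
```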

@@ -28,6 +28,7 @@ from typing import (
     Sequence,
     Tuple,
     Type,
+    cast,
 )
 
 import attr

@@ -488,14 +489,14 @@ class BackgroundUpdater:
             True if we have finished running all the background updates, otherwise False
         """
 
-        def get_background_updates_txn(txn: Cursor) -> List[Dict[str, Any]]:
+        def get_background_updates_txn(txn: Cursor) -> List[Tuple[str, Optional[str]]]:
             txn.execute(
                 """
                 SELECT update_name, depends_on FROM background_updates
                 ORDER BY ordering, update_name
                 """
             )
-            return self.db_pool.cursor_to_dict(txn)
+            return cast(List[Tuple[str, Optional[str]]], txn.fetchall())
 
         if not self._current_background_update:
             all_pending_updates = await self.db_pool.runInteraction(

@@ -507,14 +508,13 @@
                 return True
 
             # find the first update which isn't dependent on another one in the queue.
-            pending = {update["update_name"] for update in all_pending_updates}
-            for upd in all_pending_updates:
-                depends_on = upd["depends_on"]
+            pending = {update_name for update_name, depends_on in all_pending_updates}
+            for update_name, depends_on in all_pending_updates:
                 if not depends_on or depends_on not in pending:
                     break
                 logger.info(
                     "Not starting on bg update %s until %s is done",
-                    upd["update_name"],
+                    update_name,
                     depends_on,
                 )
             else:

@@ -524,7 +524,7 @@
                     "another: dependency cycle?"
                 )
 
-            self._current_background_update = upd["update_name"]
+            self._current_background_update = update_name
 
         # We have a background update to run, otherwise we would have returned
         # early.
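The selection loop above leans on Python's `for ... else`: `break` fires on the first update whose dependency is already satisfied, and the `else` branch runs only when the loop exhausts without a `break`, which here signals a dependency cycle. A standalone sketch of that pattern over the same `(update_name, depends_on)` tuples:

```python
from typing import List, Optional, Tuple


def pick_next_update(pending_updates: List[Tuple[str, Optional[str]]]) -> str:
    # Names still queued; depending on one of these means "not ready yet".
    pending = {name for name, _ in pending_updates}
    for name, depends_on in pending_updates:
        if not depends_on or depends_on not in pending:
            break  # found an update whose dependency (if any) is done
    else:
        # No break: every pending update waits on another pending update.
        raise RuntimeError("pending updates all depend on one another: cycle?")
    return name


print(pick_next_update([("b", "a"), ("a", None)]))  # -> "a"
```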

@@ -542,13 +542,15 @@ class EventsPersistenceStorageController:
         return await res.get_state(self._state_controller, StateFilter.all())
 
     async def _persist_event_batch(
-        self, _room_id: str, task: _PersistEventsTask
+        self, room_id: str, task: _PersistEventsTask
     ) -> Dict[str, str]:
         """Callback for the _event_persist_queue
 
         Calculates the change to current state and forward extremities, and
         persists the given events and with those updates.
 
+        Assumes that we are only persisting events for one room at a time.
+
         Returns:
             A dictionary of event ID to event ID we didn't persist as we already
             had another event persisted with the same TXN ID.

@@ -594,41 +596,55 @@
         # We can't easily parallelize these since different chunks
         # might contain the same event. :(
 
-        # NB: Assumes that we are only persisting events for one room
-        # at a time.
-
-        # map room_id->set[event_ids] giving the new forward
-        # extremities in each room
-        new_forward_extremities: Dict[str, Set[str]] = {}
-
-        # map room_id->(to_delete, to_insert) where to_delete is a list
-        # of type/state keys to remove from current state, and to_insert
-        # is a map (type,key)->event_id giving the state delta in each
-        # room
-        state_delta_for_room: Dict[str, DeltaState] = {}
+        new_forward_extremities = None
+        state_delta_for_room = None
 
         if not backfilled:
             with Measure(self._clock, "_calculate_state_and_extrem"):
-                # Work out the new "current state" for each room.
+                # Work out the new "current state" for the room.
                 # We do this by working out what the new extremities are and then
                 # calculating the state from that.
-                events_by_room: Dict[str, List[Tuple[EventBase, EventContext]]] = {}
-                for event, context in chunk:
-                    events_by_room.setdefault(event.room_id, []).append(
-                        (event, context)
-                    )
-
-                for room_id, ev_ctx_rm in events_by_room.items():
-                    latest_event_ids = (
-                        await self.main_store.get_latest_event_ids_in_room(room_id)
-                    )
+                (
+                    new_forward_extremities,
+                    state_delta_for_room,
+                ) = await self._calculate_new_forward_extremities_and_state_delta(
+                    room_id, chunk
+                )
+
+        await self.persist_events_store._persist_events_and_state_updates(
+            room_id,
+            chunk,
+            state_delta_for_room=state_delta_for_room,
+            new_forward_extremities=new_forward_extremities,
+            use_negative_stream_ordering=backfilled,
+            inhibit_local_membership_updates=backfilled,
+        )
+
+        return replaced_events
+
+    async def _calculate_new_forward_extremities_and_state_delta(
+        self, room_id: str, ev_ctx_rm: List[Tuple[EventBase, EventContext]]
+    ) -> Tuple[Optional[Set[str]], Optional[DeltaState]]:
+        """Calculates the new forward extremities and state delta for a room
+        given events to persist.
+
+        Assumes that we are only persisting events for one room at a time.
+
+        Returns:
+            A tuple of:
+                A set of str giving the new forward extremities in the room
+                The state delta for the room.
+        """
+        latest_event_ids = await self.main_store.get_latest_event_ids_in_room(room_id)
         new_latest_event_ids = await self._calculate_new_extremities(
             room_id, ev_ctx_rm, latest_event_ids
         )
 
         if new_latest_event_ids == latest_event_ids:
             # No change in extremities, so no change in state
-            continue
+            return (None, None)
 
         # there should always be at least one forward extremity.
         # (except during the initial persistence of the send_join

@@ -636,22 +652,18 @@
             # extremities, so we'll `continue` above and skip this bit.)
             assert new_latest_event_ids, "No forward extremities left!"
 
-            new_forward_extremities[room_id] = new_latest_event_ids
+        new_forward_extremities = new_latest_event_ids
 
-            len_1 = (
-                len(latest_event_ids) == 1
-                and len(new_latest_event_ids) == 1
-            )
+        len_1 = len(latest_event_ids) == 1 and len(new_latest_event_ids) == 1
         if len_1:
             all_single_prev_not_state = all(
-                len(event.prev_event_ids()) == 1
-                and not event.is_state()
+                len(event.prev_event_ids()) == 1 and not event.is_state()
                 for event, ctx in ev_ctx_rm
             )
             # Don't bother calculating state if they're just
             # a long chain of single ancestor non-state events.
             if all_single_prev_not_state:
-                continue
+                return (new_forward_extremities, None)

@@ -674,9 +686,7 @@
             break
 
         logger.debug("Calculating state delta for room %s", room_id)
-            with Measure(
-                self._clock, "persist_events.get_new_state_after_events"
-            ):
+        with Measure(self._clock, "persist_events.get_new_state_after_events"):
             res = await self._get_new_state_after_events(
                 room_id,
                 ev_ctx_rm,

@@ -691,7 +701,7 @@
             # extremities, so we'll `continue` above and skip this bit.)
             assert new_latest_event_ids, "No forward extremities left!"
 
-            new_forward_extremities[room_id] = new_latest_event_ids
+        new_forward_extremities = new_latest_event_ids
 
         # If either are not None then there has been a change,
         # and we need to work out the delta (or use that

@@ -703,12 +713,8 @@
             # removed keys entirely.
             delta = DeltaState([], delta_ids)
         elif current_state is not None:
-            with Measure(
-                self._clock, "persist_events.calculate_state_delta"
-            ):
-                delta = await self._calculate_state_delta(
-                    room_id, current_state
-                )
+            with Measure(self._clock, "persist_events.calculate_state_delta"):
+                delta = await self._calculate_state_delta(room_id, current_state)
 
         if delta:
             # If we have a change of state then lets check

@@ -725,17 +731,7 @@
             logger.info("Server no longer in room %s", room_id)
             delta.no_longer_in_room = True
 
-            state_delta_for_room[room_id] = delta
-
-        await self.persist_events_store._persist_events_and_state_updates(
-            chunk,
-            state_delta_for_room=state_delta_for_room,
-            new_forward_extremities=new_forward_extremities,
-            use_negative_stream_ordering=backfilled,
-            inhibit_local_membership_updates=backfilled,
-        )
-
-        return replaced_events
+        return (new_forward_extremities, delta)
 
     async def _calculate_new_extremities(
         self,

@@ -18,7 +18,6 @@ import logging
 import time
 import types
 from collections import defaultdict
-from sys import intern
 from time import monotonic as monotonic_time
 from typing import (
     TYPE_CHECKING,

@@ -1042,20 +1041,6 @@ class DatabasePool:
             self._db_pool.runWithConnection(inner_func, *args, **kwargs)
         )
 
-    @staticmethod
-    def cursor_to_dict(cursor: Cursor) -> List[Dict[str, Any]]:
-        """Converts a SQL cursor into an list of dicts.
-
-        Args:
-            cursor: The DBAPI cursor which has executed a query.
-        Returns:
-            A list of dicts where the key is the column header.
-        """
-        assert cursor.description is not None, "cursor.description was None"
-        col_headers = [intern(str(column[0])) for column in cursor.description]
-        results = [dict(zip(col_headers, row)) for row in cursor]
-        return results
-
     async def execute(self, desc: str, query: str, *args: Any) -> List[Tuple[Any, ...]]:
         """Runs a single query for a result set.

@@ -17,6 +17,8 @@
 import logging
 from typing import TYPE_CHECKING, List, Optional, Tuple, Union, cast
 
+import attr
+
 from synapse.api.constants import Direction
 from synapse.config.homeserver import HomeServerConfig
 from synapse.storage._base import make_in_list_sql_clause

@@ -28,7 +30,7 @@ from synapse.storage.database import (
 from synapse.storage.databases.main.stats import UserSortOrder
 from synapse.storage.engines import BaseDatabaseEngine
 from synapse.storage.types import Cursor
-from synapse.types import JsonDict, get_domain_from_id
+from synapse.types import get_domain_from_id
 
 from .account_data import AccountDataStore
 from .appservice import ApplicationServiceStore, ApplicationServiceTransactionStore

@@ -82,6 +84,25 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class UserPaginateResponse:
+    """This is very similar to UserInfo, but not quite the same."""
+
+    name: str
+    user_type: Optional[str]
+    is_guest: bool
+    admin: bool
+    deactivated: bool
+    shadow_banned: bool
+    displayname: Optional[str]
+    avatar_url: Optional[str]
+    creation_ts: Optional[int]
+    approved: bool
+    erased: bool
+    last_seen_ts: int
+    locked: bool
+
+
 class DataStore(
     EventsBackgroundUpdatesStore,
     ExperimentalFeaturesStore,

@@ -156,7 +177,7 @@ class DataStore(
         approved: bool = True,
         not_user_types: Optional[List[str]] = None,
         locked: bool = False,
-    ) -> Tuple[List[JsonDict], int]:
+    ) -> Tuple[List[UserPaginateResponse], int]:
         """Function to retrieve a paginated list of users from
         users list. This will return a json list of users and the
         total number of users matching the filter criteria.

@@ -182,7 +203,7 @@
         def get_users_paginate_txn(
             txn: LoggingTransaction,
-        ) -> Tuple[List[JsonDict], int]:
+        ) -> Tuple[List[UserPaginateResponse], int]:
             filters = []
             args: list = []

@@ -282,13 +303,24 @@
             """
             args += [limit, start]
             txn.execute(sql, args)
-            users = self.db_pool.cursor_to_dict(txn)
-
-            # some of those boolean values are returned as integers when we're on SQLite
-            columns_to_boolify = ["erased"]
-            for user in users:
-                for column in columns_to_boolify:
-                    user[column] = bool(user[column])
+            users = [
+                UserPaginateResponse(
+                    name=row[0],
+                    user_type=row[1],
+                    is_guest=bool(row[2]),
+                    admin=bool(row[3]),
+                    deactivated=bool(row[4]),
+                    shadow_banned=bool(row[5]),
+                    displayname=row[6],
+                    avatar_url=row[7],
+                    creation_ts=row[8],
+                    approved=bool(row[9]),
+                    erased=bool(row[10]),
+                    last_seen_ts=row[11],
+                    locked=bool(row[12]),
+                )
+                for row in txn
+            ]
 
             return users, count

@@ -1622,7 +1622,6 @@ class DeviceBackgroundUpdateStore(SQLBaseStore):
         #
         # For each duplicate, we delete all the existing rows and put one back.
 
-        KEY_COLS = ["stream_id", "destination", "user_id", "device_id"]
         last_row = progress.get(
             "last_row",
             {"stream_id": 0, "destination": "", "user_id": "", "device_id": ""},

@@ -1630,44 +1629,62 @@
         def _txn(txn: LoggingTransaction) -> int:
             clause, args = make_tuple_comparison_clause(
-                [(x, last_row[x]) for x in KEY_COLS]
+                [
+                    ("stream_id", last_row["stream_id"]),
+                    ("destination", last_row["destination"]),
+                    ("user_id", last_row["user_id"]),
+                    ("device_id", last_row["device_id"]),
+                ]
             )
-            sql = """
+            sql = f"""
                 SELECT stream_id, destination, user_id, device_id, MAX(ts) AS ts
                 FROM device_lists_outbound_pokes
-                WHERE %s
-                GROUP BY %s
+                WHERE {clause}
+                GROUP BY stream_id, destination, user_id, device_id
                 HAVING count(*) > 1
-                ORDER BY %s
+                ORDER BY stream_id, destination, user_id, device_id
                 LIMIT ?
-                """ % (
-                clause,  # WHERE
-                ",".join(KEY_COLS),  # GROUP BY
-                ",".join(KEY_COLS),  # ORDER BY
-            )
+                """
             txn.execute(sql, args + [batch_size])
-            rows = self.db_pool.cursor_to_dict(txn)
+            rows = txn.fetchall()
 
-            row = None
-            for row in rows:
+            stream_id, destination, user_id, device_id = None, None, None, None
+            for stream_id, destination, user_id, device_id, _ in rows:
                 self.db_pool.simple_delete_txn(
                     txn,
                     "device_lists_outbound_pokes",
-                    {x: row[x] for x in KEY_COLS},
+                    {
+                        "stream_id": stream_id,
+                        "destination": destination,
+                        "user_id": user_id,
+                        "device_id": device_id,
+                    },
                 )
 
-                row["sent"] = False
                 self.db_pool.simple_insert_txn(
                     txn,
                     "device_lists_outbound_pokes",
-                    row,
+                    {
+                        "stream_id": stream_id,
+                        "destination": destination,
+                        "user_id": user_id,
+                        "device_id": device_id,
+                        "sent": False,
+                    },
                 )
 
-            if row:
+            if rows:
                 self.db_pool.updates._background_update_progress_txn(
                     txn,
                     BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES,
-                    {"last_row": row},
+                    {
+                        "last_row": {
+                            "stream_id": stream_id,
+                            "destination": destination,
+                            "user_id": user_id,
+                            "device_id": device_id,
+                        }
+                    },
                 )
 
             return len(rows)
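A toy reproduction of the dedupe strategy in the background update above, against a generic `sqlite3` table rather than Synapse's `device_lists_outbound_pokes`: group on the key columns, and for each key that occurs more than once delete every row and reinsert a single replacement marked unsent.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE pokes (stream_id INT, destination TEXT, sent BOOL)")
conn.executemany(
    "INSERT INTO pokes VALUES (?, ?, ?)",
    [(1, "a.example", 1), (1, "a.example", 0), (2, "b.example", 1)],
)

# Find keys that occur more than once.
duplicates = conn.execute(
    """
    SELECT stream_id, destination FROM pokes
    GROUP BY stream_id, destination
    HAVING count(*) > 1
    """
).fetchall()

for stream_id, destination in duplicates:
    # Delete all the duplicate rows, then put exactly one back, unsent.
    conn.execute(
        "DELETE FROM pokes WHERE stream_id = ? AND destination = ?",
        (stream_id, destination),
    )
    conn.execute("INSERT INTO pokes VALUES (?, ?, 0)", (stream_id, destination))

print(conn.execute("SELECT * FROM pokes ORDER BY stream_id").fetchall())
# [(1, 'a.example', 0), (2, 'b.example', 1)]
```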

@@ -79,7 +79,7 @@ class DeltaState:
     Attributes:
         to_delete: List of type/state_keys to delete from current state
         to_insert: Map of state to upsert into current state
-        no_longer_in_room: The server is not longer in the room, so the room
+        no_longer_in_room: The server is no longer in the room, so the room
             should e.g. be removed from `current_state_events` table.
     """

@@ -131,22 +131,25 @@ class PersistEventsStore:
     @trace
     async def _persist_events_and_state_updates(
         self,
+        room_id: str,
         events_and_contexts: List[Tuple[EventBase, EventContext]],
         *,
-        state_delta_for_room: Dict[str, DeltaState],
-        new_forward_extremities: Dict[str, Set[str]],
+        state_delta_for_room: Optional[DeltaState],
+        new_forward_extremities: Optional[Set[str]],
         use_negative_stream_ordering: bool = False,
         inhibit_local_membership_updates: bool = False,
     ) -> None:
         """Persist a set of events alongside updates to the current state and
         forward extremities tables.
 
+        Assumes that we are only persisting events for one room at a time.
+
         Args:
+            room_id:
             events_and_contexts:
-            state_delta_for_room: Map from room_id to the delta to apply to
-                room state
-            new_forward_extremities: Map from room_id to set of event IDs
-                that are the new forward extremities of the room.
+            state_delta_for_room: The delta to apply to the room state
+            new_forward_extremities: A set of event IDs that are the new forward
+                extremities of the room.
             use_negative_stream_ordering: Whether to start stream_ordering on
                 the negative side and decrement. This should be set as True
                 for backfilled events because backfilled events get a negative

@@ -196,6 +199,7 @@
         await self.db_pool.runInteraction(
             "persist_events",
             self._persist_events_txn,
+            room_id=room_id,
             events_and_contexts=events_and_contexts,
             inhibit_local_membership_updates=inhibit_local_membership_updates,
             state_delta_for_room=state_delta_for_room,

@@ -221,9 +225,9 @@
             event_counter.labels(event.type, origin_type, origin_entity).inc()
 
-        for room_id, latest_event_ids in new_forward_extremities.items():
+        if new_forward_extremities:
             self.store.get_latest_event_ids_in_room.prefill(
-                (room_id,), frozenset(latest_event_ids)
+                (room_id,), frozenset(new_forward_extremities)
             )
 
     async def _get_events_which_are_prevs(self, event_ids: Iterable[str]) -> List[str]:

@@ -336,10 +340,11 @@
         self,
         txn: LoggingTransaction,
         *,
+        room_id: str,
         events_and_contexts: List[Tuple[EventBase, EventContext]],
         inhibit_local_membership_updates: bool,
-        state_delta_for_room: Dict[str, DeltaState],
-        new_forward_extremities: Dict[str, Set[str]],
+        state_delta_for_room: Optional[DeltaState],
+        new_forward_extremities: Optional[Set[str]],
     ) -> None:
         """Insert some number of room events into the necessary database tables.

@@ -347,8 +352,11 @@
         and the rejections table. Things reading from those table will need to check
         whether the event was rejected.
 
+        Assumes that we are only persisting events for one room at a time.
+
         Args:
             txn
+            room_id: The room the events are from
             events_and_contexts: events to persist
             inhibit_local_membership_updates: Stop the local_current_membership
                 from being updated by these events. This should be set to True

@@ -357,10 +365,9 @@
             delete_existing True to purge existing table rows for the events
                 from the database. This is useful when retrying due to
                 IntegrityError.
-            state_delta_for_room: The current-state delta for each room.
-            new_forward_extremities: The new forward extremities for each room.
-                For each room, a list of the event ids which are the forward
-                extremities.
+            state_delta_for_room: The current-state delta for the room.
+            new_forward_extremities: The new forward extremities for the room:
+                a set of the event ids which are the forward extremities.
 
         Raises:
             PartialStateConflictError: if attempting to persist a partial state event in

@@ -376,7 +383,6 @@
         #
         # Annoyingly SQLite doesn't support row level locking.
         if isinstance(self.database_engine, PostgresEngine):
-            for room_id in {e.room_id for e, _ in events_and_contexts}:
             txn.execute(
                 "SELECT room_version FROM rooms WHERE room_id = ? FOR SHARE",
                 (room_id,),
             )

@@ -419,7 +425,9 @@
             events_and_contexts
         )
 
-        self._update_room_depths_txn(txn, events_and_contexts=events_and_contexts)
+        self._update_room_depths_txn(
+            txn, room_id, events_and_contexts=events_and_contexts
+        )
 
         # _update_outliers_txn filters out any events which have already been
         # persisted, and returns the filtered list.

@@ -432,8 +440,10 @@
         self._store_event_txn(txn, events_and_contexts=events_and_contexts)
 
+        if new_forward_extremities:
             self._update_forward_extremities_txn(
                 txn,
+                room_id,
                 new_forward_extremities=new_forward_extremities,
                 max_stream_order=max_stream_order,
             )

@@ -464,7 +474,10 @@
         # We call this last as it assumes we've inserted the events into
         # room_memberships, where applicable.
         # NB: This function invalidates all state related caches
-        self._update_current_state_txn(txn, state_delta_for_room, min_stream_order)
+        if state_delta_for_room:
+            self._update_current_state_txn(
+                txn, room_id, state_delta_for_room, min_stream_order
+            )
 
     def _persist_event_auth_chain_txn(
         self,

@@ -1026,17 +1039,18 @@
         await self.db_pool.runInteraction(
             "update_current_state",
             self._update_current_state_txn,
-            state_delta_by_room={room_id: state_delta},
+            room_id,
+            delta_state=state_delta,
             stream_id=stream_ordering,
         )
 
     def _update_current_state_txn(
         self,
         txn: LoggingTransaction,
-        state_delta_by_room: Dict[str, DeltaState],
+        room_id: str,
+        delta_state: DeltaState,
         stream_id: int,
     ) -> None:
-        for room_id, delta_state in state_delta_by_room.items():
         to_delete = delta_state.to_delete
         to_insert = delta_state.to_insert

@@ -1190,9 +1204,7 @@
             )
 
             # Invalidate the various caches
-            self.store._invalidate_state_caches_and_stream(
-                txn, room_id, members_changed
-            )
+        self.store._invalidate_state_caches_and_stream(txn, room_id, members_changed)
 
         # Check if any of the remote membership changes requires us to
         # unsubscribe from their device lists.

@@ -1232,10 +1244,10 @@
     def _update_forward_extremities_txn(
         self,
         txn: LoggingTransaction,
-        new_forward_extremities: Dict[str, Set[str]],
+        room_id: str,
+        new_forward_extremities: Set[str],
         max_stream_order: int,
     ) -> None:
-        for room_id in new_forward_extremities.keys():
         self.db_pool.simple_delete_txn(
             txn, table="event_forward_extremities", keyvalues={"room_id": room_id}
         )

@@ -1244,11 +1256,7 @@
             txn,
             table="event_forward_extremities",
             keys=("event_id", "room_id"),
-            values=[
-                (ev_id, room_id)
-                for room_id, new_extrem in new_forward_extremities.items()
-                for ev_id in new_extrem
-            ],
+            values=[(ev_id, room_id) for ev_id in new_forward_extremities],
         )
 
         # We now insert into stream_ordering_to_exterm a mapping from room_id,
         # new stream_ordering to new forward extremeties in the room.

@@ -1260,8 +1268,7 @@
             keys=("room_id", "event_id", "stream_ordering"),
values=[ values=[
(room_id, event_id, max_stream_order) (room_id, event_id, max_stream_order)
for room_id, new_extrem in new_forward_extremities.items() for event_id in new_forward_extremities
for event_id in new_extrem
], ],
) )
@ -1298,36 +1305,45 @@ class PersistEventsStore:
def _update_room_depths_txn( def _update_room_depths_txn(
self, self,
txn: LoggingTransaction, txn: LoggingTransaction,
room_id: str,
events_and_contexts: List[Tuple[EventBase, EventContext]], events_and_contexts: List[Tuple[EventBase, EventContext]],
) -> None: ) -> None:
"""Update min_depth for each room """Update min_depth for each room
Args: Args:
txn: db connection txn: db connection
room_id: The room ID
events_and_contexts: events we are persisting events_and_contexts: events we are persisting
""" """
depth_updates: Dict[str, int] = {} stream_ordering: Optional[int] = None
depth_update = 0
for event, context in events_and_contexts: for event, context in events_and_contexts:
# Then update the `stream_ordering` position to mark the latest # Don't update the stream ordering for backfilled events because
# event as the front of the room. This should not be done for # backfilled events have negative stream_ordering and happened in the
# backfilled events because backfilled events have negative # past, so we know that we don't need to update the stream_ordering
# stream_ordering and happened in the past so we know that we don't # tip/front for the room.
# need to update the stream_ordering tip/front for the room.
assert event.internal_metadata.stream_ordering is not None assert event.internal_metadata.stream_ordering is not None
if event.internal_metadata.stream_ordering >= 0: if event.internal_metadata.stream_ordering >= 0:
txn.call_after( if stream_ordering is None:
self.store._events_stream_cache.entity_has_changed, stream_ordering = event.internal_metadata.stream_ordering
event.room_id, else:
event.internal_metadata.stream_ordering, stream_ordering = max(
stream_ordering, event.internal_metadata.stream_ordering
) )
if not event.internal_metadata.is_outlier() and not context.rejected: if not event.internal_metadata.is_outlier() and not context.rejected:
depth_updates[event.room_id] = max( depth_update = max(event.depth, depth_update)
event.depth, depth_updates.get(event.room_id, event.depth)
# Then update the `stream_ordering` position to mark the latest event as
# the front of the room.
if stream_ordering is not None:
txn.call_after(
self.store._events_stream_cache.entity_has_changed,
room_id,
stream_ordering,
) )
for room_id, depth in depth_updates.items(): self._update_min_depth_for_room_txn(txn, room_id, depth_update)
self._update_min_depth_for_room_txn(txn, room_id, depth)
def _update_outliers_txn( def _update_outliers_txn(
self, self,
@ -1350,13 +1366,19 @@ class PersistEventsStore:
PartialStateConflictError: if attempting to persist a partial state event in PartialStateConflictError: if attempting to persist a partial state event in
a room that has been un-partial stated. a room that has been un-partial stated.
""" """
txn.execute( rows = cast(
"SELECT event_id, outlier FROM events WHERE event_id in (%s)" List[Tuple[str, bool]],
% (",".join(["?"] * len(events_and_contexts)),), self.db_pool.simple_select_many_txn(
txn,
"events",
"event_id",
[event.event_id for event, _ in events_and_contexts], [event.event_id for event, _ in events_and_contexts],
keyvalues={},
retcols=("event_id", "outlier"),
),
) )
have_persisted = dict(cast(Iterable[Tuple[str, bool]], txn)) have_persisted = dict(rows)
logger.debug( logger.debug(
"_update_outliers_txn: events=%s have_persisted=%s", "_update_outliers_txn: events=%s have_persisted=%s",
View file
@@ -26,6 +26,8 @@ from typing import (
     cast,
 )

+import attr
+
 from synapse.api.constants import Direction
 from synapse.logging.opentracing import trace
 from synapse.media._base import ThumbnailInfo
@@ -45,6 +47,18 @@ BG_UPDATE_REMOVE_MEDIA_REPO_INDEX_WITHOUT_METHOD_2 = (
 )

+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class LocalMedia:
+    media_id: str
+    media_type: str
+    media_length: int
+    upload_name: str
+    created_ts: int
+    last_access_ts: int
+    quarantined_by: Optional[str]
+    safe_from_quarantine: bool
+
+
 class MediaSortOrder(Enum):
     """
     Enum to define the sorting method used when returning media with
@@ -180,7 +194,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
         user_id: str,
         order_by: str = MediaSortOrder.CREATED_TS.value,
         direction: Direction = Direction.FORWARDS,
-    ) -> Tuple[List[Dict[str, Any]], int]:
+    ) -> Tuple[List[LocalMedia], int]:
         """Get a paginated list of metadata for a local piece of media
         which an user_id has uploaded
@@ -197,7 +211,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
         def get_local_media_by_user_paginate_txn(
             txn: LoggingTransaction,
-        ) -> Tuple[List[Dict[str, Any]], int]:
+        ) -> Tuple[List[LocalMedia], int]:
             # Set ordering
             order_by_column = MediaSortOrder(order_by).value
@@ -217,14 +231,14 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
             sql = """
                 SELECT
-                    "media_id",
-                    "media_type",
-                    "media_length",
-                    "upload_name",
-                    "created_ts",
-                    "last_access_ts",
-                    "quarantined_by",
-                    "safe_from_quarantine"
+                    media_id,
+                    media_type,
+                    media_length,
+                    upload_name,
+                    created_ts,
+                    last_access_ts,
+                    quarantined_by,
+                    safe_from_quarantine
                 FROM local_media_repository
                 WHERE user_id = ?
                 ORDER BY {order_by_column} {order}, media_id ASC
@@ -236,7 +250,19 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
             args += [limit, start]
             txn.execute(sql, args)

-            media = self.db_pool.cursor_to_dict(txn)
+            media = [
+                LocalMedia(
+                    media_id=row[0],
+                    media_type=row[1],
+                    media_length=row[2],
+                    upload_name=row[3],
+                    created_ts=row[4],
+                    last_access_ts=row[5],
+                    quarantined_by=row[6],
+                    safe_from_quarantine=bool(row[7]),
+                )
+                for row in txn
+            ]
             return media, count

         return await self.db_pool.runInteraction(
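The pattern replacing `cursor_to_dict` here recurs through the rest of this commit: rows come off the cursor as plain tuples in SELECT column order and are bound once into a frozen attrs class. A toy sketch of that conversion; `MediaRow` and the sample rows below are hypothetical, trimmed to two fields:

```python
import attr

@attr.s(slots=True, frozen=True, auto_attribs=True)
class MediaRow:  # hypothetical, trimmed stand-in for the new LocalMedia class
    media_id: str
    media_length: int

# Rows arrive as tuples in SELECT column order; each field is bound by
# position once, rather than looked up by dict key at every call site.
rows = [("abc123", 512), ("def456", 2048)]
media = [MediaRow(media_id=r[0], media_length=r[1]) for r in rows]
assert media[0].media_id == "abc123"
assert media[1].media_length == 2048
```

Attribute access also lets mypy catch a misspelled field at check time, where a `row["media_id"]` lookup would only fail at runtime.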
View file
@@ -1517,7 +1517,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):

     async def get_registration_tokens(
         self, valid: Optional[bool] = None
-    ) -> List[Dict[str, Any]]:
+    ) -> List[Tuple[str, Optional[int], int, int, Optional[int]]]:
         """List all registration tokens. Used by the admin API.

         Args:
@@ -1526,34 +1526,48 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
                 Default is None: return all tokens regardless of validity.

         Returns:
-            A list of dicts, each containing details of a token.
+            A list of tuples containing:
+                * The token
+                * The number of users allowed (or None)
+                * Whether it is pending
+                * Whether it has been completed
+                * An expiry time (or None if no expiry)
         """

         def select_registration_tokens_txn(
             txn: LoggingTransaction, now: int, valid: Optional[bool]
-        ) -> List[Dict[str, Any]]:
+        ) -> List[Tuple[str, Optional[int], int, int, Optional[int]]]:
             if valid is None:
                 # Return all tokens regardless of validity
-                txn.execute("SELECT * FROM registration_tokens")
+                txn.execute(
+                    """
+                    SELECT token, uses_allowed, pending, completed, expiry_time
+                    FROM registration_tokens
+                    """
+                )
             elif valid:
                 # Select valid tokens only
-                sql = (
-                    "SELECT * FROM registration_tokens WHERE "
-                    "(uses_allowed > pending + completed OR uses_allowed IS NULL) "
-                    "AND (expiry_time > ? OR expiry_time IS NULL)"
-                )
+                sql = """
+                    SELECT token, uses_allowed, pending, completed, expiry_time
+                    FROM registration_tokens
+                    WHERE (uses_allowed > pending + completed OR uses_allowed IS NULL)
+                    AND (expiry_time > ? OR expiry_time IS NULL)
+                """
                 txn.execute(sql, [now])
             else:
                 # Select invalid tokens only
-                sql = (
-                    "SELECT * FROM registration_tokens WHERE "
-                    "uses_allowed <= pending + completed OR expiry_time <= ?"
-                )
+                sql = """
+                    SELECT token, uses_allowed, pending, completed, expiry_time
+                    FROM registration_tokens
+                    WHERE uses_allowed <= pending + completed OR expiry_time <= ?
+                """
                 txn.execute(sql, [now])

-            return self.db_pool.cursor_to_dict(txn)
+            return cast(
+                List[Tuple[str, Optional[int], int, int, Optional[int]]], txn.fetchall()
+            )

         return await self.db_pool.runInteraction(
             "select_registration_tokens",
View file
@@ -78,6 +78,31 @@ class RatelimitOverride:
     burst_count: int

+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class LargestRoomStats:
+    room_id: str
+    name: Optional[str]
+    canonical_alias: Optional[str]
+    joined_members: int
+    join_rules: Optional[str]
+    guest_access: Optional[str]
+    history_visibility: Optional[str]
+    state_events: int
+    avatar: Optional[str]
+    topic: Optional[str]
+    room_type: Optional[str]
+
+
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class RoomStats(LargestRoomStats):
+    joined_local_members: int
+    version: Optional[str]
+    creator: Optional[str]
+    encryption: Optional[str]
+    federatable: bool
+    public: bool
+
+
 class RoomSortOrder(Enum):
     """
     Enum to define the sorting method used when returning rooms with get_rooms_paginate
@@ -204,7 +229,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
             allow_none=True,
         )

-    async def get_room_with_stats(self, room_id: str) -> Optional[Dict[str, Any]]:
+    async def get_room_with_stats(self, room_id: str) -> Optional[RoomStats]:
         """Retrieve room with statistics.

         Args:
@@ -215,7 +240,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):

         def get_room_with_stats_txn(
             txn: LoggingTransaction, room_id: str
-        ) -> Optional[Dict[str, Any]]:
+        ) -> Optional[RoomStats]:
             sql = """
                 SELECT room_id, state.name, state.canonical_alias, curr.joined_members,
                   curr.local_users_in_room AS joined_local_members, rooms.room_version AS version,
@@ -229,15 +254,28 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
                 WHERE room_id = ?
                 """
             txn.execute(sql, [room_id])
-            # Catch error if sql returns empty result to return "None" instead of an error
-            try:
-                res = self.db_pool.cursor_to_dict(txn)[0]
-            except IndexError:
+            row = txn.fetchone()
+            if not row:
                 return None
-
-            res["federatable"] = bool(res["federatable"])
-            res["public"] = bool(res["public"])
-            return res
+            return RoomStats(
+                room_id=row[0],
+                name=row[1],
+                canonical_alias=row[2],
+                joined_members=row[3],
+                joined_local_members=row[4],
+                version=row[5],
+                creator=row[6],
+                encryption=row[7],
+                federatable=bool(row[8]),
+                public=bool(row[9]),
+                join_rules=row[10],
+                guest_access=row[11],
+                history_visibility=row[12],
+                state_events=row[13],
+                avatar=row[14],
+                topic=row[15],
+                room_type=row[16],
+            )

         return await self.db_pool.runInteraction(
             "get_room_with_stats", get_room_with_stats_txn, room_id
@@ -368,7 +406,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
         bounds: Optional[Tuple[int, str]],
         forwards: bool,
         ignore_non_federatable: bool = False,
-    ) -> List[Dict[str, Any]]:
+    ) -> List[LargestRoomStats]:
         """Gets the largest public rooms (where largest is in terms of joined
         members, as tracked in the statistics table).
@@ -505,20 +543,34 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):

         def _get_largest_public_rooms_txn(
             txn: LoggingTransaction,
-        ) -> List[Dict[str, Any]]:
+        ) -> List[LargestRoomStats]:
             txn.execute(sql, query_args)

-            results = self.db_pool.cursor_to_dict(txn)
+            results = [
+                LargestRoomStats(
+                    room_id=r[0],
+                    name=r[1],
+                    canonical_alias=r[3],
+                    joined_members=r[4],
+                    join_rules=r[8],
+                    guest_access=r[7],
+                    history_visibility=r[6],
+                    state_events=0,
+                    avatar=r[5],
+                    topic=r[2],
+                    room_type=r[9],
+                )
+                for r in txn
+            ]

             if not forwards:
                 results.reverse()

             return results

-        ret_val = await self.db_pool.runInteraction(
+        return await self.db_pool.runInteraction(
             "get_largest_public_rooms", _get_largest_public_rooms_txn
         )
-        return ret_val

     @cached(max_entries=10000)
     async def is_room_blocked(self, room_id: str) -> Optional[bool]:
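Callers of `get_room_with_stats` now get back `Optional[RoomStats]` rather than an untyped dict. A hedged usage sketch, assuming a `RoomWorkerStore` instance named `store` is in scope; `describe_room` itself is hypothetical:

```python
from typing import Optional

async def describe_room(store, room_id: str) -> Optional[str]:
    # room is Optional[RoomStats]; attribute access replaces the old
    # room["join_rules"]-style lookups and is checkable by mypy.
    room = await store.get_room_with_stats(room_id)
    if room is None:
        return None
    return f"{room.name or room_id}: {room.joined_members} joined ({room.join_rules})"
```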
View file
@@ -93,7 +93,7 @@ class Clock:

     _reactor: IReactorTime = attr.ib()

-    @defer.inlineCallbacks  # type: ignore[arg-type]  # Issue in Twisted's type annotations
+    @defer.inlineCallbacks
     def sleep(self, seconds: float) -> "Generator[Deferred[float], Any, Any]":
         d: defer.Deferred[float] = defer.Deferred()
         with context.PreserveLoggingContext():
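The dropped suppression suggests Twisted's type annotations now accept this decoration as-is; the behaviour of `sleep` is unchanged. A minimal usage sketch, assuming `hs` is a `HomeServer`:

```python
async def wait_then_retry(hs) -> None:
    # Clock.sleep returns a Deferred (via @defer.inlineCallbacks) that the
    # reactor resolves after the given number of seconds, so it awaits cleanly.
    await hs.get_clock().sleep(0.5)
```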
View file
@@ -342,10 +342,10 @@ class RegistrationTestCase(unittest.HomeserverTestCase):
         # Ensure the room is properly not federated.
         room = self.get_success(self.store.get_room_with_stats(room_id["room_id"]))
         assert room is not None
-        self.assertFalse(room["federatable"])
-        self.assertFalse(room["public"])
-        self.assertEqual(room["join_rules"], "public")
-        self.assertIsNone(room["guest_access"])
+        self.assertFalse(room.federatable)
+        self.assertFalse(room.public)
+        self.assertEqual(room.join_rules, "public")
+        self.assertIsNone(room.guest_access)

         # The user should be in the room.
         rooms = self.get_success(self.store.get_rooms_for_user(user_id))
@@ -372,7 +372,7 @@ class RegistrationTestCase(unittest.HomeserverTestCase):
         # Ensure the room is properly a public room.
         room = self.get_success(self.store.get_room_with_stats(room_id["room_id"]))
         assert room is not None
-        self.assertEqual(room["join_rules"], "public")
+        self.assertEqual(room.join_rules, "public")

         # Both users should be in the room.
         rooms = self.get_success(self.store.get_rooms_for_user(inviter))
@@ -411,9 +411,9 @@ class RegistrationTestCase(unittest.HomeserverTestCase):
         # Ensure the room is properly a private room.
         room = self.get_success(self.store.get_room_with_stats(room_id["room_id"]))
         assert room is not None
-        self.assertFalse(room["public"])
-        self.assertEqual(room["join_rules"], "invite")
-        self.assertEqual(room["guest_access"], "can_join")
+        self.assertFalse(room.public)
+        self.assertEqual(room.join_rules, "invite")
+        self.assertEqual(room.guest_access, "can_join")

         # Both users should be in the room.
         rooms = self.get_success(self.store.get_rooms_for_user(inviter))
@@ -455,9 +455,9 @@ class RegistrationTestCase(unittest.HomeserverTestCase):
         # Ensure the room is properly a private room.
         room = self.get_success(self.store.get_room_with_stats(room_id["room_id"]))
         assert room is not None
-        self.assertFalse(room["public"])
-        self.assertEqual(room["join_rules"], "invite")
-        self.assertEqual(room["guest_access"], "can_join")
+        self.assertFalse(room.public)
+        self.assertEqual(room.join_rules, "invite")
+        self.assertEqual(room.guest_access, "can_join")

         # Both users should be in the room.
         rooms = self.get_success(self.store.get_rooms_for_user(inviter))
View file
@@ -182,7 +182,7 @@ def wrap_server_factory_for_tls(
         )
     else:
         return TLSMemoryBIOFactory(
-            connection_creator, isClient=False, wrappedFactory=factory, clock=clock  # type: ignore[call-arg]
+            connection_creator, isClient=False, wrappedFactory=factory, clock=clock
         )
View file
@@ -484,7 +484,7 @@ class ThreadedMemoryReactorClock(MemoryReactorClock):
         if twisted.version > Version("Twisted", 23, 8, 0):
             from twisted.protocols import tls

-            tls._get_default_clock = lambda: self  # type: ignore[attr-defined]
+            tls._get_default_clock = lambda: self

         self.nameResolver = SimpleResolverComplexifier(FakeResolver())
         super().__init__()
View file
@@ -39,11 +39,11 @@ class DataStoreTestCase(unittest.HomeserverTestCase):
         )

         self.assertEqual(1, total)
-        self.assertEqual(self.displayname, users.pop()["displayname"])
+        self.assertEqual(self.displayname, users.pop().displayname)

         users, total = self.get_success(
             self.store.get_users_paginate(0, 10, name="BC", guests=False)
         )

         self.assertEqual(1, total)
-        self.assertEqual(self.displayname, users.pop()["displayname"])
+        self.assertEqual(self.displayname, users.pop().displayname)
View file
@@ -59,14 +59,9 @@ class RoomStoreTestCase(HomeserverTestCase):
     def test_get_room_with_stats(self) -> None:
         res = self.get_success(self.store.get_room_with_stats(self.room.to_string()))
         assert res is not None
-        self.assertLessEqual(
-            {
-                "room_id": self.room.to_string(),
-                "creator": self.u_creator.to_string(),
-                "public": True,
-            }.items(),
-            res.items(),
-        )
+        self.assertEqual(res.room_id, self.room.to_string())
+        self.assertEqual(res.creator, self.u_creator.to_string())
+        self.assertTrue(res.public)

     def test_get_room_with_stats_unknown_room(self) -> None:
         self.assertIsNone(