mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-24 18:45:52 +03:00
Allow use of the /filter
Client-Server APIs on workers. (#15134)
This commit is contained in:
parent
c369d82df0
commit
682d31c702
6 changed files with 27 additions and 8 deletions
1
changelog.d/15134.feature
Normal file
1
changelog.d/15134.feature
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Allow use of the `/filter` Client-Server APIs on workers.
|
|
@ -142,6 +142,7 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
|
||||||
"^/_matrix/client/(api/v1|r0|v3|unstable/.*)/rooms/.*/aliases",
|
"^/_matrix/client/(api/v1|r0|v3|unstable/.*)/rooms/.*/aliases",
|
||||||
"^/_matrix/client/v1/rooms/.*/timestamp_to_event$",
|
"^/_matrix/client/v1/rooms/.*/timestamp_to_event$",
|
||||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/search",
|
"^/_matrix/client/(api/v1|r0|v3|unstable)/search",
|
||||||
|
"^/_matrix/client/(r0|v3|unstable)/user/.*/filter(/|$)",
|
||||||
],
|
],
|
||||||
"shared_extra_conf": {},
|
"shared_extra_conf": {},
|
||||||
"worker_extra_conf": "",
|
"worker_extra_conf": "",
|
||||||
|
|
|
@ -232,6 +232,7 @@ information.
|
||||||
^/_matrix/client/(api/v1|r0|v3|unstable)/joined_rooms$
|
^/_matrix/client/(api/v1|r0|v3|unstable)/joined_rooms$
|
||||||
^/_matrix/client/v1/rooms/.*/timestamp_to_event$
|
^/_matrix/client/v1/rooms/.*/timestamp_to_event$
|
||||||
^/_matrix/client/(api/v1|r0|v3|unstable)/search$
|
^/_matrix/client/(api/v1|r0|v3|unstable)/search$
|
||||||
|
^/_matrix/client/(r0|v3|unstable)/user/.*/filter(/|$)
|
||||||
|
|
||||||
# Encryption requests
|
# Encryption requests
|
||||||
^/_matrix/client/(r0|v3|unstable)/keys/query$
|
^/_matrix/client/(r0|v3|unstable)/keys/query$
|
||||||
|
|
|
@ -108,8 +108,7 @@ class ClientRestResource(JsonResource):
|
||||||
if is_main_process:
|
if is_main_process:
|
||||||
logout.register_servlets(hs, client_resource)
|
logout.register_servlets(hs, client_resource)
|
||||||
sync.register_servlets(hs, client_resource)
|
sync.register_servlets(hs, client_resource)
|
||||||
if is_main_process:
|
filter.register_servlets(hs, client_resource)
|
||||||
filter.register_servlets(hs, client_resource)
|
|
||||||
account.register_servlets(hs, client_resource)
|
account.register_servlets(hs, client_resource)
|
||||||
register.register_servlets(hs, client_resource)
|
register.register_servlets(hs, client_resource)
|
||||||
if is_main_process:
|
if is_main_process:
|
||||||
|
|
|
@ -43,7 +43,7 @@ from .event_federation import EventFederationStore
|
||||||
from .event_push_actions import EventPushActionsStore
|
from .event_push_actions import EventPushActionsStore
|
||||||
from .events_bg_updates import EventsBackgroundUpdatesStore
|
from .events_bg_updates import EventsBackgroundUpdatesStore
|
||||||
from .events_forward_extremities import EventForwardExtremitiesStore
|
from .events_forward_extremities import EventForwardExtremitiesStore
|
||||||
from .filtering import FilteringStore
|
from .filtering import FilteringWorkerStore
|
||||||
from .keys import KeyStore
|
from .keys import KeyStore
|
||||||
from .lock import LockStore
|
from .lock import LockStore
|
||||||
from .media_repository import MediaRepositoryStore
|
from .media_repository import MediaRepositoryStore
|
||||||
|
@ -99,7 +99,7 @@ class DataStore(
|
||||||
EventFederationStore,
|
EventFederationStore,
|
||||||
MediaRepositoryStore,
|
MediaRepositoryStore,
|
||||||
RejectionsStore,
|
RejectionsStore,
|
||||||
FilteringStore,
|
FilteringWorkerStore,
|
||||||
PusherStore,
|
PusherStore,
|
||||||
PushRuleStore,
|
PushRuleStore,
|
||||||
ApplicationServiceTransactionStore,
|
ApplicationServiceTransactionStore,
|
||||||
|
|
|
@ -17,7 +17,7 @@ from typing import Optional, Tuple, Union, cast
|
||||||
|
|
||||||
from canonicaljson import encode_canonical_json
|
from canonicaljson import encode_canonical_json
|
||||||
|
|
||||||
from synapse.api.errors import Codes, SynapseError
|
from synapse.api.errors import Codes, StoreError, SynapseError
|
||||||
from synapse.storage._base import SQLBaseStore, db_to_json
|
from synapse.storage._base import SQLBaseStore, db_to_json
|
||||||
from synapse.storage.database import LoggingTransaction
|
from synapse.storage.database import LoggingTransaction
|
||||||
from synapse.types import JsonDict
|
from synapse.types import JsonDict
|
||||||
|
@ -46,8 +46,6 @@ class FilteringWorkerStore(SQLBaseStore):
|
||||||
|
|
||||||
return db_to_json(def_json)
|
return db_to_json(def_json)
|
||||||
|
|
||||||
|
|
||||||
class FilteringStore(FilteringWorkerStore):
|
|
||||||
async def add_user_filter(self, user_localpart: str, user_filter: JsonDict) -> int:
|
async def add_user_filter(self, user_localpart: str, user_filter: JsonDict) -> int:
|
||||||
def_json = encode_canonical_json(user_filter)
|
def_json = encode_canonical_json(user_filter)
|
||||||
|
|
||||||
|
@ -79,4 +77,23 @@ class FilteringStore(FilteringWorkerStore):
|
||||||
|
|
||||||
return filter_id
|
return filter_id
|
||||||
|
|
||||||
return await self.db_pool.runInteraction("add_user_filter", _do_txn)
|
attempts = 0
|
||||||
|
while True:
|
||||||
|
# Try a few times.
|
||||||
|
# This is technically needed if a user tries to create two filters at once,
|
||||||
|
# leading to two concurrent transactions.
|
||||||
|
# The failure case would be:
|
||||||
|
# - SELECT filter_id ... filter_json = ? → both transactions return no rows
|
||||||
|
# - SELECT MAX(filter_id) ... → both transactions return e.g. 5
|
||||||
|
# - INSERT INTO ... → both transactions insert filter_id = 6
|
||||||
|
# One of the transactions will commit. The other will get a unique key
|
||||||
|
# constraint violation error (IntegrityError). This is not the same as a
|
||||||
|
# serialisability violation, which would be automatically retried by
|
||||||
|
# `runInteraction`.
|
||||||
|
try:
|
||||||
|
return await self.db_pool.runInteraction("add_user_filter", _do_txn)
|
||||||
|
except self.db_pool.engine.module.IntegrityError:
|
||||||
|
attempts += 1
|
||||||
|
|
||||||
|
if attempts >= 5:
|
||||||
|
raise StoreError(500, "Couldn't generate a filter ID.")
|
||||||
|
|
Loading…
Reference in a new issue