mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-22 01:25:44 +03:00
Merge branch 'develop' into madlittlemods/trace-get_sliding_sync_rooms_for_user
This commit is contained in:
commit
c00ae2d6cd
37 changed files with 696 additions and 578 deletions
1
changelog.d/17652.misc
Normal file
1
changelog.d/17652.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting.
|
5
changelog.d/17667.misc
Normal file
5
changelog.d/17667.misc
Normal file
|
@ -0,0 +1,5 @@
|
|||
Import pydantic objects from the `_pydantic_compat` module.
|
||||
|
||||
This allows `check_pydantic_models.py` to mock those pydantic objects
|
||||
only in the synapse module, and not interfere with pydantic objects in
|
||||
external dependencies.
|
1
changelog.d/17683.misc
Normal file
1
changelog.d/17683.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Speed up sliding sync by reducing amount of data pulled out of the database for large rooms.
|
1
changelog.d/17692.bugfix
Normal file
1
changelog.d/17692.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Make sure we get up-to-date state information when using the new Sliding Sync tables to derive room membership.
|
1
changelog.d/17693.misc
Normal file
1
changelog.d/17693.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Use Sliding Sync tables as a bulk shortcut for getting the max `event_stream_ordering` of rooms.
|
|
@ -45,7 +45,6 @@ import traceback
|
|||
import unittest.mock
|
||||
from contextlib import contextmanager
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
Callable,
|
||||
Dict,
|
||||
|
@ -57,30 +56,17 @@ from typing import (
|
|||
)
|
||||
|
||||
from parameterized import parameterized
|
||||
|
||||
from synapse._pydantic_compat import HAS_PYDANTIC_V2
|
||||
|
||||
if TYPE_CHECKING or HAS_PYDANTIC_V2:
|
||||
from pydantic.v1 import (
|
||||
BaseModel as PydanticBaseModel,
|
||||
conbytes,
|
||||
confloat,
|
||||
conint,
|
||||
constr,
|
||||
)
|
||||
from pydantic.v1.typing import get_args
|
||||
else:
|
||||
from pydantic import (
|
||||
BaseModel as PydanticBaseModel,
|
||||
conbytes,
|
||||
confloat,
|
||||
conint,
|
||||
constr,
|
||||
)
|
||||
from pydantic.typing import get_args
|
||||
|
||||
from typing_extensions import ParamSpec
|
||||
|
||||
from synapse._pydantic_compat import (
|
||||
BaseModel as PydanticBaseModel,
|
||||
conbytes,
|
||||
confloat,
|
||||
conint,
|
||||
constr,
|
||||
get_args,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
CONSTRAINED_TYPE_FACTORIES_WITH_STRICT_FLAG: List[Callable] = [
|
||||
|
@ -183,22 +169,16 @@ def monkeypatch_pydantic() -> Generator[None, None, None]:
|
|||
# Most Synapse code ought to import the patched objects directly from
|
||||
# `pydantic`. But we also patch their containing modules `pydantic.main` and
|
||||
# `pydantic.types` for completeness.
|
||||
patch_basemodel1 = unittest.mock.patch(
|
||||
"pydantic.BaseModel", new=PatchedBaseModel
|
||||
patch_basemodel = unittest.mock.patch(
|
||||
"synapse._pydantic_compat.BaseModel", new=PatchedBaseModel
|
||||
)
|
||||
patch_basemodel2 = unittest.mock.patch(
|
||||
"pydantic.main.BaseModel", new=PatchedBaseModel
|
||||
)
|
||||
patches.enter_context(patch_basemodel1)
|
||||
patches.enter_context(patch_basemodel2)
|
||||
patches.enter_context(patch_basemodel)
|
||||
for factory in CONSTRAINED_TYPE_FACTORIES_WITH_STRICT_FLAG:
|
||||
wrapper: Callable = make_wrapper(factory)
|
||||
patch1 = unittest.mock.patch(f"pydantic.{factory.__name__}", new=wrapper)
|
||||
patch2 = unittest.mock.patch(
|
||||
f"pydantic.types.{factory.__name__}", new=wrapper
|
||||
patch = unittest.mock.patch(
|
||||
f"synapse._pydantic_compat.{factory.__name__}", new=wrapper
|
||||
)
|
||||
patches.enter_context(patch1)
|
||||
patches.enter_context(patch2)
|
||||
patches.enter_context(patch)
|
||||
yield
|
||||
|
||||
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
#
|
||||
#
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from packaging.version import Version
|
||||
|
||||
try:
|
||||
|
@ -30,4 +32,64 @@ except ImportError:
|
|||
|
||||
HAS_PYDANTIC_V2: bool = Version(pydantic_version).major == 2
|
||||
|
||||
__all__ = ("HAS_PYDANTIC_V2",)
|
||||
if TYPE_CHECKING or HAS_PYDANTIC_V2:
|
||||
from pydantic.v1 import (
|
||||
BaseModel,
|
||||
Extra,
|
||||
Field,
|
||||
MissingError,
|
||||
PydanticValueError,
|
||||
StrictBool,
|
||||
StrictInt,
|
||||
StrictStr,
|
||||
ValidationError,
|
||||
conbytes,
|
||||
confloat,
|
||||
conint,
|
||||
constr,
|
||||
parse_obj_as,
|
||||
validator,
|
||||
)
|
||||
from pydantic.v1.error_wrappers import ErrorWrapper
|
||||
from pydantic.v1.typing import get_args
|
||||
else:
|
||||
from pydantic import (
|
||||
BaseModel,
|
||||
Extra,
|
||||
Field,
|
||||
MissingError,
|
||||
PydanticValueError,
|
||||
StrictBool,
|
||||
StrictInt,
|
||||
StrictStr,
|
||||
ValidationError,
|
||||
conbytes,
|
||||
confloat,
|
||||
conint,
|
||||
constr,
|
||||
parse_obj_as,
|
||||
validator,
|
||||
)
|
||||
from pydantic.error_wrappers import ErrorWrapper
|
||||
from pydantic.typing import get_args
|
||||
|
||||
__all__ = (
|
||||
"HAS_PYDANTIC_V2",
|
||||
"BaseModel",
|
||||
"constr",
|
||||
"conbytes",
|
||||
"conint",
|
||||
"confloat",
|
||||
"ErrorWrapper",
|
||||
"Extra",
|
||||
"Field",
|
||||
"get_args",
|
||||
"MissingError",
|
||||
"parse_obj_as",
|
||||
"PydanticValueError",
|
||||
"StrictBool",
|
||||
"StrictInt",
|
||||
"StrictStr",
|
||||
"ValidationError",
|
||||
"validator",
|
||||
)
|
||||
|
|
|
@ -18,17 +18,11 @@
|
|||
# [This file includes modifications made by New Vector Limited]
|
||||
#
|
||||
#
|
||||
from typing import TYPE_CHECKING, Any, Dict, Type, TypeVar
|
||||
from typing import Any, Dict, Type, TypeVar
|
||||
|
||||
import jsonschema
|
||||
|
||||
from synapse._pydantic_compat import HAS_PYDANTIC_V2
|
||||
|
||||
if TYPE_CHECKING or HAS_PYDANTIC_V2:
|
||||
from pydantic.v1 import BaseModel, ValidationError, parse_obj_as
|
||||
else:
|
||||
from pydantic import BaseModel, ValidationError, parse_obj_as
|
||||
|
||||
from synapse._pydantic_compat import BaseModel, ValidationError, parse_obj_as
|
||||
from synapse.config._base import ConfigError
|
||||
from synapse.types import JsonDict, StrSequence
|
||||
|
||||
|
|
|
@ -22,17 +22,17 @@
|
|||
|
||||
import argparse
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
|
||||
import attr
|
||||
|
||||
from synapse._pydantic_compat import HAS_PYDANTIC_V2
|
||||
|
||||
if TYPE_CHECKING or HAS_PYDANTIC_V2:
|
||||
from pydantic.v1 import BaseModel, Extra, StrictBool, StrictInt, StrictStr
|
||||
else:
|
||||
from pydantic import BaseModel, Extra, StrictBool, StrictInt, StrictStr
|
||||
|
||||
from synapse._pydantic_compat import (
|
||||
BaseModel,
|
||||
Extra,
|
||||
StrictBool,
|
||||
StrictInt,
|
||||
StrictStr,
|
||||
)
|
||||
from synapse.config._base import (
|
||||
Config,
|
||||
ConfigError,
|
||||
|
|
|
@ -19,17 +19,11 @@
|
|||
#
|
||||
#
|
||||
import collections.abc
|
||||
from typing import TYPE_CHECKING, List, Type, Union, cast
|
||||
from typing import List, Type, Union, cast
|
||||
|
||||
import jsonschema
|
||||
|
||||
from synapse._pydantic_compat import HAS_PYDANTIC_V2
|
||||
|
||||
if TYPE_CHECKING or HAS_PYDANTIC_V2:
|
||||
from pydantic.v1 import Field, StrictBool, StrictStr
|
||||
else:
|
||||
from pydantic import Field, StrictBool, StrictStr
|
||||
|
||||
from synapse._pydantic_compat import Field, StrictBool, StrictStr
|
||||
from synapse.api.constants import (
|
||||
MAX_ALIAS_LENGTH,
|
||||
EventContentFields,
|
||||
|
|
|
@ -784,32 +784,10 @@ class SlidingSyncHandler:
|
|||
):
|
||||
avatar_changed = True
|
||||
|
||||
# We only need the room summary for calculating heroes, however if we do
|
||||
# fetch it then we can use it to calculate `joined_count` and
|
||||
# `invited_count`.
|
||||
room_membership_summary: Optional[Mapping[str, MemberSummary]] = None
|
||||
empty_membership_summary = MemberSummary([], 0)
|
||||
# We need the room summary for:
|
||||
# - Always for initial syncs (or the first time we send down the room)
|
||||
# - When the room has no name, we need `heroes`
|
||||
# - When the membership has changed so we need to give updated `heroes` and
|
||||
# `joined_count`/`invited_count`.
|
||||
#
|
||||
# Ideally, instead of just looking at `name_changed`, we'd check if the room
|
||||
# name is not set but this is a good enough approximation that saves us from
|
||||
# having to pull out the full event. This just means, we're generating the
|
||||
# summary whenever the room name changes instead of only when it changes to
|
||||
# `None`.
|
||||
if initial or name_changed or membership_changed:
|
||||
# We can't trace the function directly because it's cached and the `@cached`
|
||||
# decorator doesn't mix with `@trace` yet.
|
||||
with start_active_span("get_room_summary"):
|
||||
if room_membership_for_user_at_to_token.membership in (
|
||||
Membership.LEAVE,
|
||||
Membership.BAN,
|
||||
):
|
||||
# TODO: Figure out how to get the membership summary for left/banned rooms
|
||||
room_membership_summary = {}
|
||||
else:
|
||||
room_membership_summary = await self.store.get_room_summary(room_id)
|
||||
# TODO: Reverse/rewind back to the `to_token`
|
||||
|
||||
# `heroes` are required if the room name is not set.
|
||||
#
|
||||
|
@ -828,11 +806,45 @@ class SlidingSyncHandler:
|
|||
# get them on initial syncs (or the first time we send down the room) or if the
|
||||
# membership has changed which may change the heroes.
|
||||
if name_event_id is None and (initial or (not initial and membership_changed)):
|
||||
assert room_membership_summary is not None
|
||||
# We need the room summary to extract the heroes from
|
||||
if room_membership_for_user_at_to_token.membership != Membership.JOIN:
|
||||
# TODO: Figure out how to get the membership summary for left/banned rooms
|
||||
# For invite/knock rooms we don't include the information.
|
||||
room_membership_summary = {}
|
||||
else:
|
||||
room_membership_summary = await self.store.get_room_summary(room_id)
|
||||
# TODO: Reverse/rewind back to the `to_token`
|
||||
|
||||
hero_user_ids = extract_heroes_from_room_summary(
|
||||
room_membership_summary, me=user.to_string()
|
||||
)
|
||||
|
||||
# Fetch the membership counts for rooms we're joined to.
|
||||
#
|
||||
# Similarly to other metadata, we only need to calculate the member
|
||||
# counts if this is an initial sync or the memberships have changed.
|
||||
joined_count: Optional[int] = None
|
||||
invited_count: Optional[int] = None
|
||||
if (
|
||||
initial or membership_changed
|
||||
) and room_membership_for_user_at_to_token.membership == Membership.JOIN:
|
||||
# If we have the room summary (because we calculated heroes above)
|
||||
# then we can simply pull the counts from there.
|
||||
if room_membership_summary is not None:
|
||||
empty_membership_summary = MemberSummary([], 0)
|
||||
|
||||
joined_count = room_membership_summary.get(
|
||||
Membership.JOIN, empty_membership_summary
|
||||
).count
|
||||
|
||||
invited_count = room_membership_summary.get(
|
||||
Membership.INVITE, empty_membership_summary
|
||||
).count
|
||||
else:
|
||||
member_counts = await self.store.get_member_counts(room_id)
|
||||
joined_count = member_counts.get(Membership.JOIN, 0)
|
||||
invited_count = member_counts.get(Membership.INVITE, 0)
|
||||
|
||||
# Fetch the `required_state` for the room
|
||||
#
|
||||
# No `required_state` for invite/knock rooms (just `stripped_state`)
|
||||
|
@ -1090,20 +1102,6 @@ class SlidingSyncHandler:
|
|||
|
||||
set_tag(SynapseTags.RESULT_PREFIX + "initial", initial)
|
||||
|
||||
joined_count: Optional[int] = None
|
||||
if initial or membership_changed:
|
||||
assert room_membership_summary is not None
|
||||
joined_count = room_membership_summary.get(
|
||||
Membership.JOIN, empty_membership_summary
|
||||
).count
|
||||
|
||||
invited_count: Optional[int] = None
|
||||
if initial or membership_changed:
|
||||
assert room_membership_summary is not None
|
||||
invited_count = room_membership_summary.get(
|
||||
Membership.INVITE, empty_membership_summary
|
||||
).count
|
||||
|
||||
return SlidingSyncResult.RoomResult(
|
||||
name=room_name,
|
||||
avatar=room_avatar,
|
||||
|
|
|
@ -27,6 +27,7 @@ from typing import (
|
|||
Set,
|
||||
Tuple,
|
||||
Union,
|
||||
cast,
|
||||
)
|
||||
|
||||
import attr
|
||||
|
@ -358,11 +359,18 @@ class SlidingSyncRoomLists:
|
|||
if list_config.ranges:
|
||||
if list_config.ranges == [(0, len(filtered_sync_room_map) - 1)]:
|
||||
# If we are asking for the full range, we don't need to sort the list.
|
||||
sorted_room_info = list(filtered_sync_room_map.values())
|
||||
sorted_room_info: List[RoomsForUserType] = list(
|
||||
filtered_sync_room_map.values()
|
||||
)
|
||||
else:
|
||||
# Sort the list
|
||||
sorted_room_info = await self.sort_rooms_using_tables(
|
||||
filtered_sync_room_map, to_token
|
||||
sorted_room_info = await self.sort_rooms(
|
||||
# Cast is safe because RoomsForUserSlidingSync is part
|
||||
# of the `RoomsForUserType` union. Why can't it detect this?
|
||||
cast(
|
||||
Dict[str, RoomsForUserType], filtered_sync_room_map
|
||||
),
|
||||
to_token,
|
||||
)
|
||||
|
||||
for range in list_config.ranges:
|
||||
|
@ -1765,63 +1773,6 @@ class SlidingSyncRoomLists:
|
|||
# Assemble a new sync room map but only with the `filtered_room_id_set`
|
||||
return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set}
|
||||
|
||||
@trace
|
||||
async def sort_rooms_using_tables(
|
||||
self,
|
||||
sync_room_map: Mapping[str, RoomsForUserSlidingSync],
|
||||
to_token: StreamToken,
|
||||
) -> List[RoomsForUserSlidingSync]:
|
||||
"""
|
||||
Sort by `stream_ordering` of the last event that the user should see in the
|
||||
room. `stream_ordering` is unique so we get a stable sort.
|
||||
|
||||
Args:
|
||||
sync_room_map: Dictionary of room IDs to sort along with membership
|
||||
information in the room at the time of `to_token`.
|
||||
to_token: We sort based on the events in the room at this token (<= `to_token`)
|
||||
|
||||
Returns:
|
||||
A sorted list of room IDs by `stream_ordering` along with membership information.
|
||||
"""
|
||||
|
||||
# Assemble a map of room ID to the `stream_ordering` of the last activity that the
|
||||
# user should see in the room (<= `to_token`)
|
||||
last_activity_in_room_map: Dict[str, int] = {}
|
||||
|
||||
for room_id, room_for_user in sync_room_map.items():
|
||||
if room_for_user.membership != Membership.JOIN:
|
||||
# If the user has left/been invited/knocked/been banned from a
|
||||
# room, they shouldn't see anything past that point.
|
||||
#
|
||||
# FIXME: It's possible that people should see beyond this point
|
||||
# in invited/knocked cases if for example the room has
|
||||
# `invite`/`world_readable` history visibility, see
|
||||
# https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
|
||||
last_activity_in_room_map[room_id] = room_for_user.event_pos.stream
|
||||
|
||||
# For fully-joined rooms, we find the latest activity at/before the
|
||||
# `to_token`.
|
||||
joined_room_positions = (
|
||||
await self.store.bulk_get_last_event_pos_in_room_before_stream_ordering(
|
||||
[
|
||||
room_id
|
||||
for room_id, room_for_user in sync_room_map.items()
|
||||
if room_for_user.membership == Membership.JOIN
|
||||
],
|
||||
to_token.room_key,
|
||||
)
|
||||
)
|
||||
|
||||
last_activity_in_room_map.update(joined_room_positions)
|
||||
|
||||
return sorted(
|
||||
sync_room_map.values(),
|
||||
# Sort by the last activity (stream_ordering) in the room
|
||||
key=lambda room_info: last_activity_in_room_map[room_info.room_id],
|
||||
# We want descending order
|
||||
reverse=True,
|
||||
)
|
||||
|
||||
@trace
|
||||
async def sort_rooms(
|
||||
self,
|
||||
|
|
|
@ -37,19 +37,17 @@ from typing import (
|
|||
overload,
|
||||
)
|
||||
|
||||
from synapse._pydantic_compat import HAS_PYDANTIC_V2
|
||||
|
||||
if TYPE_CHECKING or HAS_PYDANTIC_V2:
|
||||
from pydantic.v1 import BaseModel, MissingError, PydanticValueError, ValidationError
|
||||
from pydantic.v1.error_wrappers import ErrorWrapper
|
||||
else:
|
||||
from pydantic import BaseModel, MissingError, PydanticValueError, ValidationError
|
||||
from pydantic.error_wrappers import ErrorWrapper
|
||||
|
||||
from typing_extensions import Literal
|
||||
|
||||
from twisted.web.server import Request
|
||||
|
||||
from synapse._pydantic_compat import (
|
||||
BaseModel,
|
||||
ErrorWrapper,
|
||||
MissingError,
|
||||
PydanticValueError,
|
||||
ValidationError,
|
||||
)
|
||||
from synapse.api.errors import Codes, SynapseError
|
||||
from synapse.http import redact_uri
|
||||
from synapse.http.server import HttpServer
|
||||
|
|
|
@ -27,7 +27,7 @@ from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union
|
|||
|
||||
import attr
|
||||
|
||||
from synapse._pydantic_compat import HAS_PYDANTIC_V2
|
||||
from synapse._pydantic_compat import StrictBool
|
||||
from synapse.api.constants import Direction, UserTypes
|
||||
from synapse.api.errors import Codes, NotFoundError, SynapseError
|
||||
from synapse.http.servlet import (
|
||||
|
@ -56,11 +56,6 @@ from synapse.types.rest import RequestBodyModel
|
|||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
if TYPE_CHECKING or HAS_PYDANTIC_V2:
|
||||
from pydantic.v1 import StrictBool
|
||||
else:
|
||||
from pydantic import StrictBool
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
|
|
@ -24,18 +24,12 @@ import random
|
|||
from typing import TYPE_CHECKING, List, Optional, Tuple
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from synapse._pydantic_compat import HAS_PYDANTIC_V2
|
||||
|
||||
if TYPE_CHECKING or HAS_PYDANTIC_V2:
|
||||
from pydantic.v1 import StrictBool, StrictStr, constr
|
||||
else:
|
||||
from pydantic import StrictBool, StrictStr, constr
|
||||
|
||||
import attr
|
||||
from typing_extensions import Literal
|
||||
|
||||
from twisted.web.server import Request
|
||||
|
||||
from synapse._pydantic_compat import StrictBool, StrictStr, constr
|
||||
from synapse.api.constants import LoginType
|
||||
from synapse.api.errors import (
|
||||
Codes,
|
||||
|
|
|
@ -24,13 +24,7 @@ import logging
|
|||
from http import HTTPStatus
|
||||
from typing import TYPE_CHECKING, List, Optional, Tuple
|
||||
|
||||
from synapse._pydantic_compat import HAS_PYDANTIC_V2
|
||||
|
||||
if TYPE_CHECKING or HAS_PYDANTIC_V2:
|
||||
from pydantic.v1 import Extra, StrictStr
|
||||
else:
|
||||
from pydantic import Extra, StrictStr
|
||||
|
||||
from synapse._pydantic_compat import Extra, StrictStr
|
||||
from synapse.api import errors
|
||||
from synapse.api.errors import NotFoundError, SynapseError, UnrecognizedRequestError
|
||||
from synapse.handlers.device import DeviceHandler
|
||||
|
|
|
@ -22,17 +22,11 @@
|
|||
import logging
|
||||
from typing import TYPE_CHECKING, List, Optional, Tuple
|
||||
|
||||
from synapse._pydantic_compat import HAS_PYDANTIC_V2
|
||||
|
||||
if TYPE_CHECKING or HAS_PYDANTIC_V2:
|
||||
from pydantic.v1 import StrictStr
|
||||
else:
|
||||
from pydantic import StrictStr
|
||||
|
||||
from typing_extensions import Literal
|
||||
|
||||
from twisted.web.server import Request
|
||||
|
||||
from synapse._pydantic_compat import StrictStr
|
||||
from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError
|
||||
from synapse.http.server import HttpServer
|
||||
from synapse.http.servlet import (
|
||||
|
|
|
@ -23,7 +23,7 @@ import logging
|
|||
from http import HTTPStatus
|
||||
from typing import TYPE_CHECKING, Tuple
|
||||
|
||||
from synapse._pydantic_compat import HAS_PYDANTIC_V2
|
||||
from synapse._pydantic_compat import StrictStr
|
||||
from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError
|
||||
from synapse.http.server import HttpServer
|
||||
from synapse.http.servlet import (
|
||||
|
@ -40,10 +40,6 @@ from ._base import client_patterns
|
|||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
if TYPE_CHECKING or HAS_PYDANTIC_V2:
|
||||
from pydantic.v1 import StrictStr
|
||||
else:
|
||||
from pydantic import StrictStr
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
|
|
@ -23,17 +23,11 @@ import logging
|
|||
import re
|
||||
from typing import TYPE_CHECKING, Dict, Mapping, Optional, Set, Tuple
|
||||
|
||||
from synapse._pydantic_compat import HAS_PYDANTIC_V2
|
||||
|
||||
if TYPE_CHECKING or HAS_PYDANTIC_V2:
|
||||
from pydantic.v1 import Extra, StrictInt, StrictStr
|
||||
else:
|
||||
from pydantic import Extra, StrictInt, StrictStr
|
||||
|
||||
from signedjson.sign import sign_json
|
||||
|
||||
from twisted.web.server import Request
|
||||
|
||||
from synapse._pydantic_compat import Extra, StrictInt, StrictStr
|
||||
from synapse.crypto.keyring import ServerKeyFetcher
|
||||
from synapse.http.server import HttpServer
|
||||
from synapse.http.servlet import (
|
||||
|
|
|
@ -112,6 +112,7 @@ class SQLBaseStore(metaclass=ABCMeta):
|
|||
self._attempt_to_invalidate_cache(
|
||||
"get_number_joined_users_in_room", (room_id,)
|
||||
)
|
||||
self._attempt_to_invalidate_cache("get_member_counts", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,))
|
||||
|
||||
# There's no easy way of invalidating this cache for just the users
|
||||
|
@ -135,6 +136,7 @@ class SQLBaseStore(metaclass=ABCMeta):
|
|||
self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_room_type", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_room_encryption", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None)
|
||||
|
||||
def _invalidate_state_caches_all(self, room_id: str) -> None:
|
||||
"""Invalidates caches that are based on the current state, but does
|
||||
|
@ -153,6 +155,7 @@ class SQLBaseStore(metaclass=ABCMeta):
|
|||
self._attempt_to_invalidate_cache("get_current_hosts_in_room", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_users_in_room_with_profiles", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_number_joined_users_in_room", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_member_counts", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,))
|
||||
self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None)
|
||||
self._attempt_to_invalidate_cache("get_user_in_room_with_profile", None)
|
||||
|
|
|
@ -40,7 +40,7 @@ from typing import (
|
|||
|
||||
import attr
|
||||
|
||||
from synapse._pydantic_compat import HAS_PYDANTIC_V2
|
||||
from synapse._pydantic_compat import BaseModel
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.storage.types import Connection, Cursor
|
||||
|
@ -49,11 +49,6 @@ from synapse.util import Clock, json_encoder
|
|||
|
||||
from . import engines
|
||||
|
||||
if TYPE_CHECKING or HAS_PYDANTIC_V2:
|
||||
from pydantic.v1 import BaseModel
|
||||
else:
|
||||
from pydantic import BaseModel
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.database import (
|
||||
|
|
|
@ -41,6 +41,7 @@ from synapse.storage.database import (
|
|||
LoggingDatabaseConnection,
|
||||
LoggingTransaction,
|
||||
)
|
||||
from synapse.storage.databases.main.events import SLIDING_SYNC_RELEVANT_STATE_SET
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.storage.util.id_generators import MultiWriterIdGenerator
|
||||
from synapse.util.caches.descriptors import CachedFunction
|
||||
|
@ -271,12 +272,20 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
|||
self._attempt_to_invalidate_cache(
|
||||
"get_rooms_for_user", (data.state_key,)
|
||||
)
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_sliding_sync_rooms_for_user", None
|
||||
)
|
||||
elif data.type == EventTypes.RoomEncryption:
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_room_encryption", (data.room_id,)
|
||||
)
|
||||
elif data.type == EventTypes.Create:
|
||||
self._attempt_to_invalidate_cache("get_room_type", (data.room_id,))
|
||||
|
||||
if (data.type, data.state_key) in SLIDING_SYNC_RELEVANT_STATE_SET:
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_sliding_sync_rooms_for_user", None
|
||||
)
|
||||
elif row.type == EventsStreamAllStateRow.TypeId:
|
||||
assert isinstance(data, EventsStreamAllStateRow)
|
||||
# Similar to the above, but the entire caches are invalidated. This is
|
||||
|
@ -285,6 +294,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
|||
self._attempt_to_invalidate_cache("get_rooms_for_user", None)
|
||||
self._attempt_to_invalidate_cache("get_room_type", (data.room_id,))
|
||||
self._attempt_to_invalidate_cache("get_room_encryption", (data.room_id,))
|
||||
self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None)
|
||||
else:
|
||||
raise Exception("Unknown events stream row type %s" % (row.type,))
|
||||
|
||||
|
@ -365,6 +375,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
|||
elif etype == EventTypes.RoomEncryption:
|
||||
self._attempt_to_invalidate_cache("get_room_encryption", (room_id,))
|
||||
|
||||
if (etype, state_key) in SLIDING_SYNC_RELEVANT_STATE_SET:
|
||||
self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None)
|
||||
|
||||
if relates_to:
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_relations_for_event",
|
||||
|
@ -477,6 +490,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
|||
self._attempt_to_invalidate_cache(
|
||||
"get_current_hosts_in_room_ordered", (room_id,)
|
||||
)
|
||||
self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None)
|
||||
self._attempt_to_invalidate_cache("did_forget", None)
|
||||
self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
|
||||
self._attempt_to_invalidate_cache("_get_membership_from_event_id", None)
|
||||
|
|
|
@ -1980,7 +1980,12 @@ class PersistEventsStore:
|
|||
if state_key == (EventTypes.Create, ""):
|
||||
room_type = event.content.get(EventContentFields.ROOM_TYPE)
|
||||
# Scrutinize JSON values
|
||||
if room_type is None or isinstance(room_type, str):
|
||||
if room_type is None or (
|
||||
isinstance(room_type, str)
|
||||
# We ignore values with null bytes as Postgres doesn't allow them in
|
||||
# text columns.
|
||||
and "\0" not in room_type
|
||||
):
|
||||
sliding_sync_insert_map["room_type"] = room_type
|
||||
elif state_key == (EventTypes.RoomEncryption, ""):
|
||||
encryption_algorithm = event.content.get(
|
||||
|
@ -1990,15 +1995,26 @@ class PersistEventsStore:
|
|||
sliding_sync_insert_map["is_encrypted"] = is_encrypted
|
||||
elif state_key == (EventTypes.Name, ""):
|
||||
room_name = event.content.get(EventContentFields.ROOM_NAME)
|
||||
# Scrutinize JSON values
|
||||
if room_name is None or isinstance(room_name, str):
|
||||
# Scrutinize JSON values. We ignore values with nulls as
|
||||
# postgres doesn't allow null bytes in text columns.
|
||||
if room_name is None or (
|
||||
isinstance(room_name, str)
|
||||
# We ignore values with null bytes as Postgres doesn't allow them in
|
||||
# text columns.
|
||||
and "\0" not in room_name
|
||||
):
|
||||
sliding_sync_insert_map["room_name"] = room_name
|
||||
elif state_key == (EventTypes.Tombstone, ""):
|
||||
successor_room_id = event.content.get(
|
||||
EventContentFields.TOMBSTONE_SUCCESSOR_ROOM
|
||||
)
|
||||
# Scrutinize JSON values
|
||||
if successor_room_id is None or isinstance(successor_room_id, str):
|
||||
if successor_room_id is None or (
|
||||
isinstance(successor_room_id, str)
|
||||
# We ignore values with null bytes as Postgres doesn't allow them in
|
||||
# text columns.
|
||||
and "\0" not in successor_room_id
|
||||
):
|
||||
sliding_sync_insert_map["tombstone_successor_room_id"] = (
|
||||
successor_room_id
|
||||
)
|
||||
|
@ -2081,6 +2097,21 @@ class PersistEventsStore:
|
|||
else None
|
||||
)
|
||||
|
||||
# Check for null bytes in the room name and type. We have to
|
||||
# ignore values with null bytes as Postgres doesn't allow them
|
||||
# in text columns.
|
||||
if (
|
||||
sliding_sync_insert_map["room_name"] is not None
|
||||
and "\0" in sliding_sync_insert_map["room_name"]
|
||||
):
|
||||
sliding_sync_insert_map.pop("room_name")
|
||||
|
||||
if (
|
||||
sliding_sync_insert_map["room_type"] is not None
|
||||
and "\0" in sliding_sync_insert_map["room_type"]
|
||||
):
|
||||
sliding_sync_insert_map.pop("room_type")
|
||||
|
||||
# Find the tombstone_successor_room_id
|
||||
# Note: This isn't one of the stripped state events according to the spec
|
||||
# but seems like there is no reason not to support this kind of thing.
|
||||
|
@ -2095,6 +2126,12 @@ class PersistEventsStore:
|
|||
else None
|
||||
)
|
||||
|
||||
if (
|
||||
sliding_sync_insert_map["tombstone_successor_room_id"] is not None
|
||||
and "\0" in sliding_sync_insert_map["tombstone_successor_room_id"]
|
||||
):
|
||||
sliding_sync_insert_map.pop("tombstone_successor_room_id")
|
||||
|
||||
else:
|
||||
# No stripped state provided
|
||||
sliding_sync_insert_map["has_known_state"] = False
|
||||
|
|
|
@ -47,10 +47,12 @@ from synapse.storage.databases.main.events_worker import (
|
|||
)
|
||||
from synapse.storage.databases.main.state_deltas import StateDeltasStore
|
||||
from synapse.storage.databases.main.stream import StreamWorkerStore
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.storage.types import Cursor
|
||||
from synapse.types import JsonDict, RoomStreamToken, StateMap, StrCollection
|
||||
from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.types.storage import _BackgroundUpdates
|
||||
from synapse.util import json_encoder
|
||||
from synapse.util.iterutils import batch_iter
|
||||
|
||||
|
@ -75,34 +77,6 @@ _REPLACE_STREAM_ORDERING_SQL_COMMANDS = (
|
|||
)
|
||||
|
||||
|
||||
class _BackgroundUpdates:
|
||||
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
|
||||
EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
|
||||
DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities"
|
||||
POPULATE_STREAM_ORDERING2 = "populate_stream_ordering2"
|
||||
INDEX_STREAM_ORDERING2 = "index_stream_ordering2"
|
||||
INDEX_STREAM_ORDERING2_CONTAINS_URL = "index_stream_ordering2_contains_url"
|
||||
INDEX_STREAM_ORDERING2_ROOM_ORDER = "index_stream_ordering2_room_order"
|
||||
INDEX_STREAM_ORDERING2_ROOM_STREAM = "index_stream_ordering2_room_stream"
|
||||
INDEX_STREAM_ORDERING2_TS = "index_stream_ordering2_ts"
|
||||
REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column"
|
||||
|
||||
EVENT_EDGES_DROP_INVALID_ROWS = "event_edges_drop_invalid_rows"
|
||||
EVENT_EDGES_REPLACE_INDEX = "event_edges_replace_index"
|
||||
|
||||
EVENTS_POPULATE_STATE_KEY_REJECTIONS = "events_populate_state_key_rejections"
|
||||
|
||||
EVENTS_JUMP_TO_DATE_INDEX = "events_jump_to_date_index"
|
||||
|
||||
SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE = (
|
||||
"sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update"
|
||||
)
|
||||
SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE = "sliding_sync_joined_rooms_bg_update"
|
||||
SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE = (
|
||||
"sliding_sync_membership_snapshots_bg_update"
|
||||
)
|
||||
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class _CalculateChainCover:
|
||||
"""Return value for _calculate_chain_cover_txn."""
|
||||
|
@ -1877,9 +1851,29 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
|||
def _find_memberships_to_update_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> List[
|
||||
Tuple[str, Optional[str], str, str, str, str, int, Optional[str], bool]
|
||||
Tuple[
|
||||
str,
|
||||
Optional[str],
|
||||
Optional[str],
|
||||
str,
|
||||
str,
|
||||
str,
|
||||
str,
|
||||
int,
|
||||
Optional[str],
|
||||
bool,
|
||||
]
|
||||
]:
|
||||
# Fetch the set of event IDs that we want to update
|
||||
#
|
||||
# We skip over rows which we've already handled, i.e. have a
|
||||
# matching row in `sliding_sync_membership_snapshots` with the same
|
||||
# room, user and event ID.
|
||||
#
|
||||
# We also ignore rooms that the user has left themselves (i.e. not
|
||||
# kicked). This is to avoid having to port lots of old rooms that we
|
||||
# will never send down sliding sync (as we exclude such rooms from
|
||||
# initial syncs).
|
||||
|
||||
if initial_phase:
|
||||
# There are some old out-of-band memberships (before
|
||||
|
@ -1892,6 +1886,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
|||
SELECT
|
||||
c.room_id,
|
||||
r.room_id,
|
||||
r.room_version,
|
||||
c.user_id,
|
||||
e.sender,
|
||||
c.event_id,
|
||||
|
@ -1900,9 +1895,11 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
|||
e.instance_name,
|
||||
e.outlier
|
||||
FROM local_current_membership AS c
|
||||
LEFT JOIN sliding_sync_membership_snapshots AS m USING (room_id, user_id)
|
||||
INNER JOIN events AS e USING (event_id)
|
||||
LEFT JOIN rooms AS r ON (c.room_id = r.room_id)
|
||||
WHERE (c.room_id, c.user_id) > (?, ?)
|
||||
AND (m.user_id IS NULL OR c.event_id != m.membership_event_id)
|
||||
ORDER BY c.room_id ASC, c.user_id ASC
|
||||
LIMIT ?
|
||||
""",
|
||||
|
@ -1922,7 +1919,8 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
|||
"""
|
||||
SELECT
|
||||
c.room_id,
|
||||
c.room_id,
|
||||
r.room_id,
|
||||
r.room_version,
|
||||
c.user_id,
|
||||
e.sender,
|
||||
c.event_id,
|
||||
|
@ -1931,9 +1929,12 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
|||
e.instance_name,
|
||||
e.outlier
|
||||
FROM local_current_membership AS c
|
||||
LEFT JOIN sliding_sync_membership_snapshots AS m USING (room_id, user_id)
|
||||
INNER JOIN events AS e USING (event_id)
|
||||
WHERE event_stream_ordering > ?
|
||||
ORDER BY event_stream_ordering ASC
|
||||
LEFT JOIN rooms AS r ON (c.room_id = r.room_id)
|
||||
WHERE c.event_stream_ordering > ?
|
||||
AND (m.user_id IS NULL OR c.event_id != m.membership_event_id)
|
||||
ORDER BY c.event_stream_ordering ASC
|
||||
LIMIT ?
|
||||
""",
|
||||
(last_event_stream_ordering, batch_size),
|
||||
|
@ -1944,7 +1945,16 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
|||
memberships_to_update_rows = cast(
|
||||
List[
|
||||
Tuple[
|
||||
str, Optional[str], str, str, str, str, int, Optional[str], bool
|
||||
str,
|
||||
Optional[str],
|
||||
Optional[str],
|
||||
str,
|
||||
str,
|
||||
str,
|
||||
str,
|
||||
int,
|
||||
Optional[str],
|
||||
bool,
|
||||
]
|
||||
],
|
||||
txn.fetchall(),
|
||||
|
@ -1977,7 +1987,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
|||
|
||||
def _find_previous_invite_or_knock_membership_txn(
|
||||
txn: LoggingTransaction, room_id: str, user_id: str, event_id: str
|
||||
) -> Tuple[str, str]:
|
||||
) -> Optional[Tuple[str, str]]:
|
||||
# Find the previous invite/knock event before the leave event
|
||||
#
|
||||
# Here are some notes on how we landed on this query:
|
||||
|
@ -2027,8 +2037,13 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
|||
)
|
||||
row = txn.fetchone()
|
||||
|
||||
# We should see a corresponding previous invite/knock event
|
||||
assert row is not None
|
||||
if row is None:
|
||||
# Generally we should have an invite or knock event for leaves
|
||||
# that are outliers, however this may not always be the case
|
||||
# (e.g. a local user got kicked but the kick event got pulled in
|
||||
# as an outlier).
|
||||
return None
|
||||
|
||||
event_id, membership = row
|
||||
|
||||
return event_id, membership
|
||||
|
@ -2043,6 +2058,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
|||
for (
|
||||
room_id,
|
||||
room_id_from_rooms_table,
|
||||
room_version_id,
|
||||
user_id,
|
||||
sender,
|
||||
membership_event_id,
|
||||
|
@ -2061,6 +2077,14 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
|||
Membership.BAN,
|
||||
)
|
||||
|
||||
if (
|
||||
room_version_id is not None
|
||||
and room_version_id not in KNOWN_ROOM_VERSIONS
|
||||
):
|
||||
# Ignore rooms with unknown room versions (these were
|
||||
# experimental rooms, that we no longer support).
|
||||
continue
|
||||
|
||||
# There are some old out-of-band memberships (before
|
||||
# https://github.com/matrix-org/synapse/issues/6983) where we don't have the
|
||||
# corresponding room stored in the `rooms` table`. We have a `FOREIGN KEY`
|
||||
|
@ -2148,14 +2172,17 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
|||
# in the events table though. We'll just say that we don't
|
||||
# know the state for these rooms and continue on with our
|
||||
# day.
|
||||
sliding_sync_membership_snapshots_insert_map["has_known_state"] = (
|
||||
False
|
||||
)
|
||||
sliding_sync_membership_snapshots_insert_map = {
|
||||
"has_known_state": False,
|
||||
"room_type": None,
|
||||
"room_name": None,
|
||||
"is_encrypted": False,
|
||||
}
|
||||
elif membership in (Membership.INVITE, Membership.KNOCK) or (
|
||||
membership in (Membership.LEAVE, Membership.BAN) and is_outlier
|
||||
):
|
||||
invite_or_knock_event_id = membership_event_id
|
||||
invite_or_knock_membership = membership
|
||||
invite_or_knock_event_id = None
|
||||
invite_or_knock_membership = None
|
||||
|
||||
# If the event is an `out_of_band_membership` (special case of
|
||||
# `outlier`), we never had historical state so we have to pull from
|
||||
|
@ -2164,35 +2191,55 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
|||
# membership (i.e. the room shouldn't disappear if your using the
|
||||
# `is_encrypted` filter and you leave).
|
||||
if membership in (Membership.LEAVE, Membership.BAN) and is_outlier:
|
||||
(
|
||||
invite_or_knock_event_id,
|
||||
invite_or_knock_membership,
|
||||
) = await self.db_pool.runInteraction(
|
||||
previous_membership = await self.db_pool.runInteraction(
|
||||
"sliding_sync_membership_snapshots_bg_update._find_previous_invite_or_knock_membership_txn",
|
||||
_find_previous_invite_or_knock_membership_txn,
|
||||
room_id,
|
||||
user_id,
|
||||
membership_event_id,
|
||||
)
|
||||
if previous_membership is not None:
|
||||
(
|
||||
invite_or_knock_event_id,
|
||||
invite_or_knock_membership,
|
||||
) = previous_membership
|
||||
else:
|
||||
invite_or_knock_event_id = membership_event_id
|
||||
invite_or_knock_membership = membership
|
||||
|
||||
# Pull from the stripped state on the invite/knock event
|
||||
invite_or_knock_event = await self.get_event(invite_or_knock_event_id)
|
||||
|
||||
raw_stripped_state_events = None
|
||||
if invite_or_knock_membership == Membership.INVITE:
|
||||
invite_room_state = invite_or_knock_event.unsigned.get(
|
||||
"invite_room_state"
|
||||
if (
|
||||
invite_or_knock_event_id is not None
|
||||
and invite_or_knock_membership is not None
|
||||
):
|
||||
# Pull from the stripped state on the invite/knock event
|
||||
invite_or_knock_event = await self.get_event(
|
||||
invite_or_knock_event_id
|
||||
)
|
||||
raw_stripped_state_events = invite_room_state
|
||||
elif invite_or_knock_membership == Membership.KNOCK:
|
||||
knock_room_state = invite_or_knock_event.unsigned.get(
|
||||
"knock_room_state"
|
||||
)
|
||||
raw_stripped_state_events = knock_room_state
|
||||
|
||||
sliding_sync_membership_snapshots_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_stripped_state(
|
||||
raw_stripped_state_events
|
||||
)
|
||||
raw_stripped_state_events = None
|
||||
if invite_or_knock_membership == Membership.INVITE:
|
||||
invite_room_state = invite_or_knock_event.unsigned.get(
|
||||
"invite_room_state"
|
||||
)
|
||||
raw_stripped_state_events = invite_room_state
|
||||
elif invite_or_knock_membership == Membership.KNOCK:
|
||||
knock_room_state = invite_or_knock_event.unsigned.get(
|
||||
"knock_room_state"
|
||||
)
|
||||
raw_stripped_state_events = knock_room_state
|
||||
|
||||
sliding_sync_membership_snapshots_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_stripped_state(
|
||||
raw_stripped_state_events
|
||||
)
|
||||
else:
|
||||
# We couldn't find any state for the membership, so we just have to
|
||||
# leave it as empty.
|
||||
sliding_sync_membership_snapshots_insert_map = {
|
||||
"has_known_state": False,
|
||||
"room_type": None,
|
||||
"room_name": None,
|
||||
"is_encrypted": False,
|
||||
}
|
||||
|
||||
# We should have some insert values for each room, even if no
|
||||
# stripped state is on the event because we still want to record
|
||||
|
@ -2311,19 +2358,42 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
|||
)
|
||||
# We need to find the `forgotten` value during the transaction because
|
||||
# we can't risk inserting stale data.
|
||||
txn.execute(
|
||||
"""
|
||||
UPDATE sliding_sync_membership_snapshots
|
||||
SET
|
||||
forgotten = (SELECT forgotten FROM room_memberships WHERE event_id = ?)
|
||||
WHERE room_id = ? and user_id = ?
|
||||
""",
|
||||
(
|
||||
membership_event_id,
|
||||
room_id,
|
||||
user_id,
|
||||
),
|
||||
)
|
||||
if isinstance(txn.database_engine, PostgresEngine):
|
||||
txn.execute(
|
||||
"""
|
||||
UPDATE sliding_sync_membership_snapshots
|
||||
SET
|
||||
forgotten = m.forgotten
|
||||
FROM room_memberships AS m
|
||||
WHERE sliding_sync_membership_snapshots.room_id = ?
|
||||
AND sliding_sync_membership_snapshots.user_id = ?
|
||||
AND membership_event_id = ?
|
||||
AND membership_event_id = m.event_id
|
||||
AND m.event_id IS NOT NULL
|
||||
""",
|
||||
(
|
||||
room_id,
|
||||
user_id,
|
||||
membership_event_id,
|
||||
),
|
||||
)
|
||||
else:
|
||||
# SQLite doesn't support UPDATE FROM before 3.33.0, so we do
|
||||
# this via sub-selects.
|
||||
txn.execute(
|
||||
"""
|
||||
UPDATE sliding_sync_membership_snapshots
|
||||
SET
|
||||
forgotten = (SELECT forgotten FROM room_memberships WHERE event_id = ?)
|
||||
WHERE room_id = ? and user_id = ? AND membership_event_id = ?
|
||||
""",
|
||||
(
|
||||
membership_event_id,
|
||||
room_id,
|
||||
user_id,
|
||||
membership_event_id,
|
||||
),
|
||||
)
|
||||
|
||||
await self.db_pool.runInteraction(
|
||||
"sliding_sync_membership_snapshots_bg_update", _fill_table_txn
|
||||
|
@ -2333,6 +2403,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
|||
(
|
||||
room_id,
|
||||
_room_id_from_rooms_table,
|
||||
_room_version_id,
|
||||
user_id,
|
||||
_sender,
|
||||
_membership_event_id,
|
||||
|
|
|
@ -83,6 +83,7 @@ from synapse.storage.util.id_generators import (
|
|||
from synapse.storage.util.sequence import build_sequence_generator
|
||||
from synapse.types import JsonDict, get_domain_from_id
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.types.storage import _BackgroundUpdates
|
||||
from synapse.util import unwrapFirstError
|
||||
from synapse.util.async_helpers import ObservableDeferred, delay_cancellation
|
||||
from synapse.util.caches.descriptors import cached, cachedList
|
||||
|
@ -2465,3 +2466,14 @@ class EventsWorkerStore(SQLBaseStore):
|
|||
)
|
||||
|
||||
self.invalidate_get_event_cache_after_txn(txn, event_id)
|
||||
|
||||
async def have_finished_sliding_sync_background_jobs(self) -> bool:
|
||||
"""Return if it's safe to use the sliding sync membership tables."""
|
||||
|
||||
return await self.db_pool.updates.have_completed_background_updates(
|
||||
(
|
||||
_BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE,
|
||||
_BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE,
|
||||
_BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE,
|
||||
)
|
||||
)
|
||||
|
|
|
@ -51,7 +51,6 @@ from synapse.storage.database import (
|
|||
LoggingTransaction,
|
||||
)
|
||||
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
|
||||
from synapse.storage.databases.main.events_bg_updates import _BackgroundUpdates
|
||||
from synapse.storage.databases.main.events_worker import EventsWorkerStore
|
||||
from synapse.storage.engines import Sqlite3Engine
|
||||
from synapse.storage.roommember import (
|
||||
|
@ -312,18 +311,10 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
|||
# We do this all in one transaction to keep the cache small.
|
||||
# FIXME: get rid of this when we have room_stats
|
||||
|
||||
# Note, rejected events will have a null membership field, so
|
||||
# we we manually filter them out.
|
||||
sql = """
|
||||
SELECT count(*), membership FROM current_state_events
|
||||
WHERE type = 'm.room.member' AND room_id = ?
|
||||
AND membership IS NOT NULL
|
||||
GROUP BY membership
|
||||
"""
|
||||
counts = self._get_member_counts_txn(txn, room_id)
|
||||
|
||||
txn.execute(sql, (room_id,))
|
||||
res: Dict[str, MemberSummary] = {}
|
||||
for count, membership in txn:
|
||||
for membership, count in counts.items():
|
||||
res.setdefault(membership, MemberSummary([], count))
|
||||
|
||||
# Order by membership (joins -> invites -> leave (former insiders) ->
|
||||
|
@ -369,6 +360,31 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
|||
"get_room_summary", _get_room_summary_txn
|
||||
)
|
||||
|
||||
@cached()
|
||||
async def get_member_counts(self, room_id: str) -> Mapping[str, int]:
|
||||
"""Get a mapping of number of users by membership"""
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_member_counts", self._get_member_counts_txn, room_id
|
||||
)
|
||||
|
||||
def _get_member_counts_txn(
|
||||
self, txn: LoggingTransaction, room_id: str
|
||||
) -> Dict[str, int]:
|
||||
"""Get a mapping of number of users by membership"""
|
||||
|
||||
# Note, rejected events will have a null membership field, so
|
||||
# we we manually filter them out.
|
||||
sql = """
|
||||
SELECT count(*), membership FROM current_state_events
|
||||
WHERE type = 'm.room.member' AND room_id = ?
|
||||
AND membership IS NOT NULL
|
||||
GROUP BY membership
|
||||
"""
|
||||
|
||||
txn.execute(sql, (room_id,))
|
||||
return {membership: count for count, membership in txn}
|
||||
|
||||
@cached()
|
||||
async def get_number_joined_users_in_room(self, room_id: str) -> int:
|
||||
return await self.db_pool.simple_select_one_onecol(
|
||||
|
@ -1348,6 +1364,9 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
|||
self._invalidate_cache_and_stream(
|
||||
txn, self.get_forgotten_rooms_for_user, (user_id,)
|
||||
)
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self.get_sliding_sync_rooms_for_user, (user_id,)
|
||||
)
|
||||
|
||||
await self.db_pool.runInteraction("forget_membership", f)
|
||||
|
||||
|
@ -1393,6 +1412,10 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
|||
def get_sliding_sync_rooms_for_user_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> Dict[str, RoomsForUserSlidingSync]:
|
||||
# XXX: If you use any new columns that can change (like from
|
||||
# `sliding_sync_joined_rooms` or `forgotten`), make sure to bust the
|
||||
# `get_sliding_sync_rooms_for_user` cache in the appropriate places (and add
|
||||
# tests).
|
||||
sql = """
|
||||
SELECT m.room_id, m.sender, m.membership, m.membership_event_id,
|
||||
r.room_version,
|
||||
|
@ -1415,7 +1438,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
|||
room_version_id=row[4],
|
||||
event_pos=PersistedEventPosition(row[5], row[6]),
|
||||
room_type=row[7],
|
||||
is_encrypted=row[8],
|
||||
is_encrypted=bool(row[8]),
|
||||
)
|
||||
for row in txn
|
||||
}
|
||||
|
@ -1425,17 +1448,6 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
|||
get_sliding_sync_rooms_for_user_txn,
|
||||
)
|
||||
|
||||
async def have_finished_sliding_sync_background_jobs(self) -> bool:
|
||||
"""Return if it's safe to use the sliding sync membership tables."""
|
||||
|
||||
return await self.db_pool.updates.have_completed_background_updates(
|
||||
(
|
||||
_BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE,
|
||||
_BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE,
|
||||
_BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
class RoomMemberBackgroundUpdateStore(SQLBaseStore):
|
||||
def __init__(
|
||||
|
|
|
@ -736,6 +736,7 @@ class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):
|
|||
CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx"
|
||||
EVENT_STATE_GROUP_INDEX_UPDATE_NAME = "event_to_state_groups_sg_index"
|
||||
DELETE_CURRENT_STATE_UPDATE_NAME = "delete_old_current_state_events"
|
||||
MEMBERS_CURRENT_STATE_UPDATE_NAME = "current_state_events_members_room_index"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
|
@ -764,6 +765,13 @@ class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):
|
|||
self.DELETE_CURRENT_STATE_UPDATE_NAME,
|
||||
self._background_remove_left_rooms,
|
||||
)
|
||||
self.db_pool.updates.register_background_index_update(
|
||||
self.MEMBERS_CURRENT_STATE_UPDATE_NAME,
|
||||
index_name="current_state_events_members_room_index",
|
||||
table="current_state_events",
|
||||
columns=["room_id", "membership"],
|
||||
where_clause="type='m.room.member'",
|
||||
)
|
||||
|
||||
async def _background_remove_left_rooms(
|
||||
self, progress: JsonDict, batch_size: int
|
||||
|
|
|
@ -1524,7 +1524,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
# majority of rooms will have a latest token from before the min stream
|
||||
# pos.
|
||||
|
||||
def bulk_get_max_event_pos_txn(
|
||||
def bulk_get_max_event_pos_fallback_txn(
|
||||
txn: LoggingTransaction, batched_room_ids: StrCollection
|
||||
) -> Dict[str, int]:
|
||||
clause, args = make_in_list_sql_clause(
|
||||
|
@ -1547,11 +1547,37 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
txn.execute(sql, [max_pos] + args)
|
||||
return {row[0]: row[1] for row in txn}
|
||||
|
||||
# It's easier to look at the `sliding_sync_joined_rooms` table and avoid all of
|
||||
# the joins and sub-queries.
|
||||
def bulk_get_max_event_pos_from_sliding_sync_tables_txn(
|
||||
txn: LoggingTransaction, batched_room_ids: StrCollection
|
||||
) -> Dict[str, int]:
|
||||
clause, args = make_in_list_sql_clause(
|
||||
self.database_engine, "room_id", batched_room_ids
|
||||
)
|
||||
sql = f"""
|
||||
SELECT room_id, event_stream_ordering
|
||||
FROM sliding_sync_joined_rooms
|
||||
WHERE {clause}
|
||||
ORDER BY event_stream_ordering DESC
|
||||
"""
|
||||
txn.execute(sql, args)
|
||||
return {row[0]: row[1] for row in txn}
|
||||
|
||||
recheck_rooms: Set[str] = set()
|
||||
for batched in batch_iter(room_ids, 1000):
|
||||
batch_results = await self.db_pool.runInteraction(
|
||||
"_bulk_get_max_event_pos", bulk_get_max_event_pos_txn, batched
|
||||
)
|
||||
if await self.have_finished_sliding_sync_background_jobs():
|
||||
batch_results = await self.db_pool.runInteraction(
|
||||
"bulk_get_max_event_pos_from_sliding_sync_tables_txn",
|
||||
bulk_get_max_event_pos_from_sliding_sync_tables_txn,
|
||||
batched,
|
||||
)
|
||||
else:
|
||||
batch_results = await self.db_pool.runInteraction(
|
||||
"bulk_get_max_event_pos_fallback_txn",
|
||||
bulk_get_max_event_pos_fallback_txn,
|
||||
batched,
|
||||
)
|
||||
for room_id, stream_ordering in batch_results.items():
|
||||
if stream_ordering <= now_token.stream:
|
||||
results.update(batch_results)
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
--
|
||||
-- This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
--
|
||||
-- Copyright (C) 2024 New Vector, Ltd
|
||||
--
|
||||
-- This program is free software: you can redistribute it and/or modify
|
||||
-- it under the terms of the GNU Affero General Public License as
|
||||
-- published by the Free Software Foundation, either version 3 of the
|
||||
-- License, or (at your option) any later version.
|
||||
--
|
||||
-- See the GNU Affero General Public License for more details:
|
||||
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
|
||||
|
||||
-- Add a background updates to add a new index:
|
||||
-- `current_state_events(room_id, membership) WHERE type = 'm.room.member'
|
||||
-- This makes counting membership in rooms (for syncs) much faster
|
||||
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
|
||||
(8701, 'current_state_events_members_room_index', '{}');
|
|
@ -37,23 +37,20 @@ from typing import (
|
|||
|
||||
import attr
|
||||
|
||||
from synapse._pydantic_compat import HAS_PYDANTIC_V2
|
||||
from synapse._pydantic_compat import Extra
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.types import MultiWriterStreamToken, RoomStreamToken, StrCollection, UserID
|
||||
|
||||
if TYPE_CHECKING or HAS_PYDANTIC_V2:
|
||||
from pydantic.v1 import Extra
|
||||
else:
|
||||
from pydantic import Extra
|
||||
|
||||
from synapse.events import EventBase
|
||||
from synapse.types import (
|
||||
DeviceListUpdates,
|
||||
JsonDict,
|
||||
JsonMapping,
|
||||
MultiWriterStreamToken,
|
||||
Requester,
|
||||
RoomStreamToken,
|
||||
SlidingSyncStreamToken,
|
||||
StrCollection,
|
||||
StreamToken,
|
||||
UserID,
|
||||
)
|
||||
from synapse.types.rest.client import SlidingSyncBody
|
||||
|
||||
|
|
|
@ -18,14 +18,7 @@
|
|||
# [This file includes modifications made by New Vector Limited]
|
||||
#
|
||||
#
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from synapse._pydantic_compat import HAS_PYDANTIC_V2
|
||||
|
||||
if TYPE_CHECKING or HAS_PYDANTIC_V2:
|
||||
from pydantic.v1 import BaseModel, Extra
|
||||
else:
|
||||
from pydantic import BaseModel, Extra
|
||||
from synapse._pydantic_compat import BaseModel, Extra
|
||||
|
||||
|
||||
class RequestBodyModel(BaseModel):
|
||||
|
|
|
@ -20,29 +20,15 @@
|
|||
#
|
||||
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union
|
||||
|
||||
from synapse._pydantic_compat import HAS_PYDANTIC_V2
|
||||
|
||||
if TYPE_CHECKING or HAS_PYDANTIC_V2:
|
||||
from pydantic.v1 import (
|
||||
Extra,
|
||||
StrictBool,
|
||||
StrictInt,
|
||||
StrictStr,
|
||||
conint,
|
||||
constr,
|
||||
validator,
|
||||
)
|
||||
else:
|
||||
from pydantic import (
|
||||
Extra,
|
||||
StrictBool,
|
||||
StrictInt,
|
||||
StrictStr,
|
||||
conint,
|
||||
constr,
|
||||
validator,
|
||||
)
|
||||
|
||||
from synapse._pydantic_compat import (
|
||||
Extra,
|
||||
StrictBool,
|
||||
StrictInt,
|
||||
StrictStr,
|
||||
conint,
|
||||
constr,
|
||||
validator,
|
||||
)
|
||||
from synapse.types.rest import RequestBodyModel
|
||||
from synapse.util.threepids import validate_email
|
||||
|
||||
|
@ -384,7 +370,7 @@ class SlidingSyncBody(RequestBodyModel):
|
|||
receipts: Optional[ReceiptsExtension] = None
|
||||
typing: Optional[TypingExtension] = None
|
||||
|
||||
conn_id: Optional[str]
|
||||
conn_id: Optional[StrictStr]
|
||||
|
||||
# mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884
|
||||
if TYPE_CHECKING:
|
||||
|
|
47
synapse/types/storage/__init__.py
Normal file
47
synapse/types/storage/__init__.py
Normal file
|
@ -0,0 +1,47 @@
|
|||
#
|
||||
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
#
|
||||
# Copyright (C) 2024 New Vector, Ltd
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# See the GNU Affero General Public License for more details:
|
||||
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
#
|
||||
# Originally licensed under the Apache License, Version 2.0:
|
||||
# <http://www.apache.org/licenses/LICENSE-2.0>.
|
||||
#
|
||||
# [This file includes modifications made by New Vector Limited]
|
||||
#
|
||||
#
|
||||
|
||||
|
||||
class _BackgroundUpdates:
|
||||
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
|
||||
EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
|
||||
DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities"
|
||||
POPULATE_STREAM_ORDERING2 = "populate_stream_ordering2"
|
||||
INDEX_STREAM_ORDERING2 = "index_stream_ordering2"
|
||||
INDEX_STREAM_ORDERING2_CONTAINS_URL = "index_stream_ordering2_contains_url"
|
||||
INDEX_STREAM_ORDERING2_ROOM_ORDER = "index_stream_ordering2_room_order"
|
||||
INDEX_STREAM_ORDERING2_ROOM_STREAM = "index_stream_ordering2_room_stream"
|
||||
INDEX_STREAM_ORDERING2_TS = "index_stream_ordering2_ts"
|
||||
REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column"
|
||||
|
||||
EVENT_EDGES_DROP_INVALID_ROWS = "event_edges_drop_invalid_rows"
|
||||
EVENT_EDGES_REPLACE_INDEX = "event_edges_replace_index"
|
||||
|
||||
EVENTS_POPULATE_STATE_KEY_REJECTIONS = "events_populate_state_key_rejections"
|
||||
|
||||
EVENTS_JUMP_TO_DATE_INDEX = "events_jump_to_date_index"
|
||||
|
||||
SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE = (
|
||||
"sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update"
|
||||
)
|
||||
SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE = "sliding_sync_joined_rooms_bg_update"
|
||||
SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE = (
|
||||
"sliding_sync_membership_snapshots_bg_update"
|
||||
)
|
|
@ -371,14 +371,17 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
|
|||
"mxc://UPDATED_DUMMY_MEDIA_ID",
|
||||
response_body["rooms"][room_id1],
|
||||
)
|
||||
self.assertEqual(
|
||||
response_body["rooms"][room_id1]["joined_count"],
|
||||
1,
|
||||
|
||||
# We don't give extra room information to invitees
|
||||
self.assertNotIn(
|
||||
"joined_count",
|
||||
response_body["rooms"][room_id1],
|
||||
)
|
||||
self.assertEqual(
|
||||
response_body["rooms"][room_id1]["invited_count"],
|
||||
1,
|
||||
self.assertNotIn(
|
||||
"invited_count",
|
||||
response_body["rooms"][room_id1],
|
||||
)
|
||||
|
||||
self.assertIsNone(
|
||||
response_body["rooms"][room_id1].get("is_dm"),
|
||||
)
|
||||
|
@ -450,15 +453,16 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
|
|||
"mxc://DUMMY_MEDIA_ID",
|
||||
response_body["rooms"][room_id1],
|
||||
)
|
||||
self.assertEqual(
|
||||
response_body["rooms"][room_id1]["joined_count"],
|
||||
# FIXME: The actual number should be "1" (user2) but we currently don't
|
||||
# support this for rooms where the user has left/been banned.
|
||||
0,
|
||||
|
||||
# FIXME: We possibly want to return joined and invited counts for rooms
|
||||
# you're banned form
|
||||
self.assertNotIn(
|
||||
"joined_count",
|
||||
response_body["rooms"][room_id1],
|
||||
)
|
||||
self.assertEqual(
|
||||
response_body["rooms"][room_id1]["invited_count"],
|
||||
0,
|
||||
self.assertNotIn(
|
||||
"invited_count",
|
||||
response_body["rooms"][room_id1],
|
||||
)
|
||||
self.assertIsNone(
|
||||
response_body["rooms"][room_id1].get("is_dm"),
|
||||
|
@ -692,19 +696,15 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
|
|||
[],
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
response_body["rooms"][room_id1]["joined_count"],
|
||||
# FIXME: The actual number should be "1" (user2) but we currently don't
|
||||
# support this for rooms where the user has left/been banned.
|
||||
0,
|
||||
# FIXME: We possibly want to return joined and invited counts for rooms
|
||||
# you're banned form
|
||||
self.assertNotIn(
|
||||
"joined_count",
|
||||
response_body["rooms"][room_id1],
|
||||
)
|
||||
self.assertEqual(
|
||||
response_body["rooms"][room_id1]["invited_count"],
|
||||
# We shouldn't see user5 since they were invited after user1 was banned.
|
||||
#
|
||||
# FIXME: The actual number should be "1" (user3) but we currently don't
|
||||
# support this for rooms where the user has left/been banned.
|
||||
0,
|
||||
self.assertNotIn(
|
||||
"invited_count",
|
||||
response_body["rooms"][room_id1],
|
||||
)
|
||||
|
||||
def test_rooms_meta_heroes_incremental_sync_no_change(self) -> None:
|
||||
|
|
|
@ -722,43 +722,37 @@ class SlidingSyncTestCase(SlidingSyncBase):
|
|||
self.helper.join(space_room_id, user1_id, tok=user1_tok)
|
||||
|
||||
# Make an initial Sliding Sync request
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.sync_endpoint,
|
||||
{
|
||||
"lists": {
|
||||
"all-list": {
|
||||
"ranges": [[0, 99]],
|
||||
"required_state": [],
|
||||
"timeline_limit": 0,
|
||||
"filters": {},
|
||||
sync_body = {
|
||||
"lists": {
|
||||
"all-list": {
|
||||
"ranges": [[0, 99]],
|
||||
"required_state": [],
|
||||
"timeline_limit": 0,
|
||||
"filters": {},
|
||||
},
|
||||
"foo-list": {
|
||||
"ranges": [[0, 99]],
|
||||
"required_state": [],
|
||||
"timeline_limit": 1,
|
||||
"filters": {
|
||||
"is_encrypted": True,
|
||||
"room_types": [RoomTypes.SPACE],
|
||||
},
|
||||
"foo-list": {
|
||||
"ranges": [[0, 99]],
|
||||
"required_state": [],
|
||||
"timeline_limit": 1,
|
||||
"filters": {
|
||||
"is_encrypted": True,
|
||||
"room_types": [RoomTypes.SPACE],
|
||||
},
|
||||
},
|
||||
}
|
||||
},
|
||||
access_token=user1_tok,
|
||||
)
|
||||
self.assertEqual(channel.code, 200, channel.json_body)
|
||||
from_token = channel.json_body["pos"]
|
||||
},
|
||||
}
|
||||
}
|
||||
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
|
||||
|
||||
# Make sure the response has the lists we requested
|
||||
self.assertListEqual(
|
||||
list(channel.json_body["lists"].keys()),
|
||||
list(response_body["lists"].keys()),
|
||||
["all-list", "foo-list"],
|
||||
channel.json_body["lists"].keys(),
|
||||
response_body["lists"].keys(),
|
||||
)
|
||||
|
||||
# Make sure the lists have the correct rooms
|
||||
self.assertListEqual(
|
||||
list(channel.json_body["lists"]["all-list"]["ops"]),
|
||||
list(response_body["lists"]["all-list"]["ops"]),
|
||||
[
|
||||
{
|
||||
"op": "SYNC",
|
||||
|
@ -768,7 +762,7 @@ class SlidingSyncTestCase(SlidingSyncBase):
|
|||
],
|
||||
)
|
||||
self.assertListEqual(
|
||||
list(channel.json_body["lists"]["foo-list"]["ops"]),
|
||||
list(response_body["lists"]["foo-list"]["ops"]),
|
||||
[
|
||||
{
|
||||
"op": "SYNC",
|
||||
|
@ -783,35 +777,30 @@ class SlidingSyncTestCase(SlidingSyncBase):
|
|||
self.helper.leave(space_room_id, user2_id, tok=user2_tok)
|
||||
|
||||
# Make an incremental Sliding Sync request
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.sync_endpoint + f"?pos={from_token}",
|
||||
{
|
||||
"lists": {
|
||||
"all-list": {
|
||||
"ranges": [[0, 99]],
|
||||
"required_state": [],
|
||||
"timeline_limit": 0,
|
||||
"filters": {},
|
||||
sync_body = {
|
||||
"lists": {
|
||||
"all-list": {
|
||||
"ranges": [[0, 99]],
|
||||
"required_state": [],
|
||||
"timeline_limit": 0,
|
||||
"filters": {},
|
||||
},
|
||||
"foo-list": {
|
||||
"ranges": [[0, 99]],
|
||||
"required_state": [],
|
||||
"timeline_limit": 1,
|
||||
"filters": {
|
||||
"is_encrypted": True,
|
||||
"room_types": [RoomTypes.SPACE],
|
||||
},
|
||||
"foo-list": {
|
||||
"ranges": [[0, 99]],
|
||||
"required_state": [],
|
||||
"timeline_limit": 1,
|
||||
"filters": {
|
||||
"is_encrypted": True,
|
||||
"room_types": [RoomTypes.SPACE],
|
||||
},
|
||||
},
|
||||
}
|
||||
},
|
||||
access_token=user1_tok,
|
||||
)
|
||||
self.assertEqual(channel.code, 200, channel.json_body)
|
||||
},
|
||||
}
|
||||
}
|
||||
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
|
||||
|
||||
# Make sure the lists have the correct rooms even though we `newly_left`
|
||||
self.assertListEqual(
|
||||
list(channel.json_body["lists"]["all-list"]["ops"]),
|
||||
list(response_body["lists"]["all-list"]["ops"]),
|
||||
[
|
||||
{
|
||||
"op": "SYNC",
|
||||
|
@ -821,7 +810,7 @@ class SlidingSyncTestCase(SlidingSyncBase):
|
|||
],
|
||||
)
|
||||
self.assertListEqual(
|
||||
list(channel.json_body["lists"]["foo-list"]["ops"]),
|
||||
list(response_body["lists"]["foo-list"]["ops"]),
|
||||
[
|
||||
{
|
||||
"op": "SYNC",
|
||||
|
@ -831,6 +820,98 @@ class SlidingSyncTestCase(SlidingSyncBase):
|
|||
],
|
||||
)
|
||||
|
||||
def test_filter_is_encrypted_up_to_date(self) -> None:
|
||||
"""
|
||||
Make sure we get up-to-date `is_encrypted` status for a joined room
|
||||
"""
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
|
||||
room_id = self.helper.create_room_as(user1_id, tok=user1_tok)
|
||||
|
||||
sync_body = {
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 99]],
|
||||
"required_state": [],
|
||||
"timeline_limit": 0,
|
||||
"filters": {
|
||||
"is_encrypted": True,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
|
||||
self.assertIncludes(
|
||||
set(response_body["lists"]["foo-list"]["ops"][0]["room_ids"]),
|
||||
set(),
|
||||
exact=True,
|
||||
)
|
||||
|
||||
# Update the encryption status
|
||||
self.helper.send_state(
|
||||
room_id,
|
||||
EventTypes.RoomEncryption,
|
||||
{EventContentFields.ENCRYPTION_ALGORITHM: "m.megolm.v1.aes-sha2"},
|
||||
tok=user1_tok,
|
||||
)
|
||||
|
||||
# We should see the room now because it's encrypted
|
||||
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
|
||||
self.assertIncludes(
|
||||
set(response_body["lists"]["foo-list"]["ops"][0]["room_ids"]),
|
||||
{room_id},
|
||||
exact=True,
|
||||
)
|
||||
|
||||
def test_forgotten_up_to_date(self) -> None:
|
||||
"""
|
||||
Make sure we get up-to-date `forgotten` status for rooms
|
||||
"""
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
user2_id = self.register_user("user2", "pass")
|
||||
user2_tok = self.login(user2_id, "pass")
|
||||
|
||||
room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
|
||||
|
||||
# User1 is banned from the room (was never in the room)
|
||||
self.helper.ban(room_id, src=user2_id, targ=user1_id, tok=user2_tok)
|
||||
|
||||
sync_body = {
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 99]],
|
||||
"required_state": [],
|
||||
"timeline_limit": 0,
|
||||
"filters": {},
|
||||
},
|
||||
}
|
||||
}
|
||||
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
|
||||
self.assertIncludes(
|
||||
set(response_body["lists"]["foo-list"]["ops"][0]["room_ids"]),
|
||||
{room_id},
|
||||
exact=True,
|
||||
)
|
||||
|
||||
# User1 forgets the room
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
f"/_matrix/client/r0/rooms/{room_id}/forget",
|
||||
content={},
|
||||
access_token=user1_tok,
|
||||
)
|
||||
self.assertEqual(channel.code, 200, channel.result)
|
||||
|
||||
# We should no longer see the forgotten room
|
||||
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
|
||||
self.assertIncludes(
|
||||
set(response_body["lists"]["foo-list"]["ops"][0]["room_ids"]),
|
||||
set(),
|
||||
exact=True,
|
||||
)
|
||||
|
||||
def test_sort_list(self) -> None:
|
||||
"""
|
||||
Test that the `lists` are sorted by `stream_ordering`
|
||||
|
|
|
@ -19,18 +19,12 @@
|
|||
#
|
||||
#
|
||||
import unittest as stdlib_unittest
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from typing_extensions import Literal
|
||||
|
||||
from synapse._pydantic_compat import HAS_PYDANTIC_V2
|
||||
from synapse._pydantic_compat import BaseModel, ValidationError
|
||||
from synapse.types.rest.client import EmailRequestTokenBody
|
||||
|
||||
if TYPE_CHECKING or HAS_PYDANTIC_V2:
|
||||
from pydantic.v1 import BaseModel, ValidationError
|
||||
else:
|
||||
from pydantic import BaseModel, ValidationError
|
||||
|
||||
|
||||
class ThreepidMediumEnumTestCase(stdlib_unittest.TestCase):
|
||||
class Model(BaseModel):
|
||||
|
|
|
@ -34,11 +34,11 @@ from synapse.rest.client import login, room
|
|||
from synapse.server import HomeServer
|
||||
from synapse.storage.databases.main.events import DeltaState
|
||||
from synapse.storage.databases.main.events_bg_updates import (
|
||||
_BackgroundUpdates,
|
||||
_resolve_stale_data_in_sliding_sync_joined_rooms_table,
|
||||
_resolve_stale_data_in_sliding_sync_membership_snapshots_table,
|
||||
)
|
||||
from synapse.types import create_requester
|
||||
from synapse.types.storage import _BackgroundUpdates
|
||||
from synapse.util import Clock
|
||||
|
||||
from tests.test_utils.event_injection import create_event
|
||||
|
@ -4416,136 +4416,6 @@ class SlidingSyncTablesBackgroundUpdatesTestCase(SlidingSyncTablesTestCaseBase):
|
|||
),
|
||||
)
|
||||
|
||||
def test_membership_snapshots_background_update_forgotten_partial(self) -> None:
|
||||
"""
|
||||
Test an existing `sliding_sync_membership_snapshots` row is updated with the
|
||||
latest `forgotten` status after the background update passes over it.
|
||||
"""
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
user2_id = self.register_user("user2", "pass")
|
||||
user2_tok = self.login(user2_id, "pass")
|
||||
|
||||
room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
|
||||
|
||||
# User1 joins the room
|
||||
self.helper.join(room_id, user1_id, tok=user1_tok)
|
||||
# User1 leaves the room (we have to leave in order to forget the room)
|
||||
self.helper.leave(room_id, user1_id, tok=user1_tok)
|
||||
|
||||
state_map = self.get_success(
|
||||
self.storage_controllers.state.get_current_state(room_id)
|
||||
)
|
||||
|
||||
# Forget the room
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
f"/_matrix/client/r0/rooms/{room_id}/forget",
|
||||
content={},
|
||||
access_token=user1_tok,
|
||||
)
|
||||
self.assertEqual(channel.code, 200, channel.result)
|
||||
|
||||
# Clean-up the `sliding_sync_joined_rooms` table as if the forgotten status
|
||||
# never made it into the table.
|
||||
self.get_success(
|
||||
self.store.db_pool.simple_update(
|
||||
table="sliding_sync_membership_snapshots",
|
||||
keyvalues={"room_id": room_id},
|
||||
updatevalues={"forgotten": 0},
|
||||
desc="sliding_sync_membership_snapshots.test_membership_snapshots_background_update_forgotten_partial",
|
||||
)
|
||||
)
|
||||
|
||||
# We should see the partial row that we made in preparation for the test.
|
||||
sliding_sync_membership_snapshots_results = (
|
||||
self._get_sliding_sync_membership_snapshots()
|
||||
)
|
||||
self.assertIncludes(
|
||||
set(sliding_sync_membership_snapshots_results.keys()),
|
||||
{
|
||||
(room_id, user1_id),
|
||||
(room_id, user2_id),
|
||||
},
|
||||
exact=True,
|
||||
)
|
||||
user1_snapshot = _SlidingSyncMembershipSnapshotResult(
|
||||
room_id=room_id,
|
||||
user_id=user1_id,
|
||||
sender=user1_id,
|
||||
membership_event_id=state_map[(EventTypes.Member, user1_id)].event_id,
|
||||
membership=Membership.LEAVE,
|
||||
event_stream_ordering=state_map[
|
||||
(EventTypes.Member, user1_id)
|
||||
].internal_metadata.stream_ordering,
|
||||
has_known_state=True,
|
||||
room_type=None,
|
||||
room_name=None,
|
||||
is_encrypted=False,
|
||||
tombstone_successor_room_id=None,
|
||||
# Room is *not* forgotten because of our test preparation
|
||||
forgotten=False,
|
||||
)
|
||||
self.assertEqual(
|
||||
sliding_sync_membership_snapshots_results.get((room_id, user1_id)),
|
||||
user1_snapshot,
|
||||
)
|
||||
user2_snapshot = _SlidingSyncMembershipSnapshotResult(
|
||||
room_id=room_id,
|
||||
user_id=user2_id,
|
||||
sender=user2_id,
|
||||
membership_event_id=state_map[(EventTypes.Member, user2_id)].event_id,
|
||||
membership=Membership.JOIN,
|
||||
event_stream_ordering=state_map[
|
||||
(EventTypes.Member, user2_id)
|
||||
].internal_metadata.stream_ordering,
|
||||
has_known_state=True,
|
||||
room_type=None,
|
||||
room_name=None,
|
||||
is_encrypted=False,
|
||||
tombstone_successor_room_id=None,
|
||||
)
|
||||
self.assertEqual(
|
||||
sliding_sync_membership_snapshots_results.get((room_id, user2_id)),
|
||||
user2_snapshot,
|
||||
)
|
||||
|
||||
# Insert and run the background update.
|
||||
self.get_success(
|
||||
self.store.db_pool.simple_insert(
|
||||
"background_updates",
|
||||
{
|
||||
"update_name": _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE,
|
||||
"progress_json": "{}",
|
||||
},
|
||||
)
|
||||
)
|
||||
self.store.db_pool.updates._all_done = False
|
||||
self.wait_for_background_updates()
|
||||
|
||||
# Make sure the table is populated
|
||||
sliding_sync_membership_snapshots_results = (
|
||||
self._get_sliding_sync_membership_snapshots()
|
||||
)
|
||||
self.assertIncludes(
|
||||
set(sliding_sync_membership_snapshots_results.keys()),
|
||||
{
|
||||
(room_id, user1_id),
|
||||
(room_id, user2_id),
|
||||
},
|
||||
exact=True,
|
||||
)
|
||||
# Forgotten status is now updated
|
||||
self.assertEqual(
|
||||
sliding_sync_membership_snapshots_results.get((room_id, user1_id)),
|
||||
attr.evolve(user1_snapshot, forgotten=True),
|
||||
)
|
||||
# Holds the info according to the current state when the user joined
|
||||
self.assertEqual(
|
||||
sliding_sync_membership_snapshots_results.get((room_id, user2_id)),
|
||||
user2_snapshot,
|
||||
)
|
||||
|
||||
|
||||
class SlidingSyncTablesCatchUpBackgroundUpdatesTestCase(SlidingSyncTablesTestCaseBase):
|
||||
"""
|
||||
|
|
Loading…
Reference in a new issue