Merge remote-tracking branch 'origin/develop' into erikj/ss_remove_list_ops

Erik Johnston 2024-09-10 10:25:05 +01:00
commit 02bc71a771
34 changed files with 1378 additions and 517 deletions

1
changelog.d/17641.misc Normal file
View file

@ -0,0 +1 @@
Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting.

View file

@ -0,0 +1 @@
Stabilise [MSC4156](https://github.com/matrix-org/matrix-spec-proposals/pull/4156) by removing the `msc4156_enabled` config setting and defaulting it to `true`.

1
changelog.d/17654.misc Normal file
View file

@ -0,0 +1 @@
Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting.

1
changelog.d/17658.misc Normal file
View file

@ -0,0 +1 @@
Get `bump_stamp` from [new sliding sync tables](https://github.com/element-hq/synapse/pull/17512) which should be faster.

1
changelog.d/17665.misc Normal file
View file

@ -0,0 +1 @@
Speed up incremental Sliding Sync requests by avoiding extra work.

1
changelog.d/17666.misc Normal file
View file

@ -0,0 +1 @@
Small performance improvement to speed up sliding sync.

1
changelog.d/17670.misc Normal file
View file

@ -0,0 +1 @@
Small performance improvement to speed up sliding sync.

1
changelog.d/17672.misc Normal file
View file

@ -0,0 +1 @@
Small performance improvement to speed up sliding sync.

1
changelog.d/17673.misc Normal file
View file

@ -0,0 +1 @@
Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting.

1
changelog.d/17674.bugfix Normal file
View file

@ -0,0 +1 @@
Fix a bug where we returned the wrong `bump_stamp` for invites in the sliding sync response, causing incorrect ordering of invites in the room list.

1
changelog.d/17684.misc Normal file
View file

@ -0,0 +1 @@
Speed up sliding sync by reducing the number of database calls.

1
changelog.d/17688.misc Normal file
View file

@ -0,0 +1 @@
Speed up sync by pulling out fewer events from the database.

View file

@ -447,6 +447,3 @@ class ExperimentalConfig(Config):
# MSC4151: Report room API (Client-Server API)
self.msc4151_enabled: bool = experimental.get("msc4151_enabled", False)
# MSC4156: Migrate server_name to via
self.msc4156_enabled: bool = experimental.get("msc4156_enabled", False)

View file

@ -200,6 +200,7 @@ class AdminHandler:
(
events,
_,
_,
) = await self._store.paginate_room_events_by_topological_ordering(
room_id=room_id,
from_key=from_key,

View file

@ -510,6 +510,7 @@ class PaginationHandler:
(
events,
next_key,
_,
) = await self.store.paginate_room_events_by_topological_ordering(
room_id=room_id,
from_key=from_token.room_key,
@ -588,6 +589,7 @@ class PaginationHandler:
(
events,
next_key,
_,
) = await self.store.paginate_room_events_by_topological_ordering(
room_id=room_id,
from_key=from_token.room_key,

View file

@ -1753,7 +1753,7 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
)
events = list(room_events)
events.extend(e for evs, _ in room_to_events.values() for e in evs)
events.extend(e for evs, _, _ in room_to_events.values() for e in evs)
# We know stream_ordering must not be None here, as it's been
# persisted, but mypy doesn't know that

View file

@ -25,8 +25,8 @@ from synapse.events.utils import strip_event
from synapse.handlers.relations import BundledAggregations
from synapse.handlers.sliding_sync.extensions import SlidingSyncExtensionHandler
from synapse.handlers.sliding_sync.room_lists import (
RoomsForUserType,
SlidingSyncRoomLists,
_RoomMembershipForUser,
)
from synapse.handlers.sliding_sync.store import SlidingSyncConnectionStore
from synapse.logging.opentracing import (
@ -39,12 +39,14 @@ from synapse.logging.opentracing import (
)
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
from synapse.storage.databases.main.stream import PaginateFunction
from synapse.storage.roommember import MemberSummary
from synapse.storage.roommember import (
MemberSummary,
)
from synapse.types import (
JsonDict,
MutableStateMap,
PersistedEventPosition,
Requester,
RoomStreamToken,
SlidingSyncStreamToken,
StateMap,
StreamKeyType,
@ -255,6 +257,8 @@ class SlidingSyncHandler:
],
from_token=from_token,
to_token=to_token,
newly_joined=room_id in interested_rooms.newly_joined_rooms,
is_dm=room_id in interested_rooms.dm_room_ids,
)
# Filter out empty room results during incremental sync
@ -352,7 +356,7 @@ class SlidingSyncHandler:
async def get_current_state_ids_at(
self,
room_id: str,
room_membership_for_user_at_to_token: _RoomMembershipForUser,
room_membership_for_user_at_to_token: RoomsForUserType,
state_filter: StateFilter,
to_token: StreamToken,
) -> StateMap[str]:
@ -417,7 +421,7 @@ class SlidingSyncHandler:
async def get_current_state_at(
self,
room_id: str,
room_membership_for_user_at_to_token: _RoomMembershipForUser,
room_membership_for_user_at_to_token: RoomsForUserType,
state_filter: StateFilter,
to_token: StreamToken,
) -> StateMap[EventBase]:
@ -457,9 +461,11 @@ class SlidingSyncHandler:
new_connection_state: "MutablePerConnectionState",
room_id: str,
room_sync_config: RoomSyncConfig,
room_membership_for_user_at_to_token: _RoomMembershipForUser,
room_membership_for_user_at_to_token: RoomsForUserType,
from_token: Optional[SlidingSyncStreamToken],
to_token: StreamToken,
newly_joined: bool,
is_dm: bool,
) -> SlidingSyncResult.RoomResult:
"""
Fetch room data for the sync response.
@ -475,6 +481,8 @@ class SlidingSyncHandler:
in the room at the time of `to_token`.
from_token: The point in the stream to sync from.
to_token: The point in the stream to sync up to.
newly_joined: Whether the user has newly joined the room.
is_dm: Whether the room is a DM room.
"""
user = sync_config.user
@ -519,7 +527,7 @@ class SlidingSyncHandler:
from_bound = None
initial = True
ignore_timeline_bound = False
if from_token and not room_membership_for_user_at_to_token.newly_joined:
if from_token and not newly_joined:
room_status = previous_connection_state.rooms.have_sent_room(room_id)
if room_status.status == HaveSentRoomFlag.LIVE:
from_bound = from_token.stream_token.room_key
@ -623,7 +631,7 @@ class SlidingSyncHandler:
# Use `stream_ordering` for updates
else paginate_room_events_by_stream_ordering
)
timeline_events, new_room_key = await pagination_method(
timeline_events, new_room_key, limited = await pagination_method(
room_id=room_id,
# The bounds are reversed so we can paginate backwards
# (from newer to older events) starting at to_bound.
@ -631,28 +639,13 @@ class SlidingSyncHandler:
from_key=to_bound,
to_key=timeline_from_bound,
direction=Direction.BACKWARDS,
# We add one so we can determine if there are enough events to saturate
# the limit or not (see `limited`)
limit=room_sync_config.timeline_limit + 1,
limit=room_sync_config.timeline_limit,
)
# We want to return the events in ascending order (the last event is the
# most recent).
timeline_events.reverse()
# Determine our `limited` status based on the timeline. We do this before
# filtering the events so we can accurately determine if there is more to
# paginate even if we filter out some/all events.
if len(timeline_events) > room_sync_config.timeline_limit:
limited = True
# Get rid of that extra "+ 1" event because we only used it to determine
# if we hit the limit or not
timeline_events = timeline_events[-room_sync_config.timeline_limit :]
assert timeline_events[0].internal_metadata.stream_ordering
new_room_key = RoomStreamToken(
stream=timeline_events[0].internal_metadata.stream_ordering - 1
)
# Make sure we don't expose any events that the client shouldn't see
timeline_events = await filter_events_for_client(
self.storage_controllers,
@ -745,6 +738,17 @@ class SlidingSyncHandler:
# indicate to the client that a state reset happened. Perhaps we should indicate
# this by setting `initial: True` and empty `required_state`.
# Get the changes to current state in the token range from the
# `current_state_delta_stream` table.
#
# For incremental syncs, we can do this first to determine if something relevant
# has changed and strategically avoid fetching other costly things.
room_state_delta_id_map: MutableStateMap[str] = {}
name_event_id: Optional[str] = None
membership_changed = False
name_changed = False
avatar_changed = False
if initial:
# Check whether the room has a name set
name_state_ids = await self.get_current_state_ids_at(
room_id=room_id,
@ -753,9 +757,50 @@ class SlidingSyncHandler:
to_token=to_token,
)
name_event_id = name_state_ids.get((EventTypes.Name, ""))
else:
assert from_bound is not None
room_membership_summary: Mapping[str, MemberSummary]
# TODO: Limit the number of state events we're about to send down
# the room; if it's too many we should change this to an
# `initial=True`?
deltas = await self.store.get_current_state_deltas_for_room(
room_id=room_id,
from_token=from_bound,
to_token=to_token.room_key,
)
for delta in deltas:
# TODO: Handle state resets where event_id is None
if delta.event_id is not None:
room_state_delta_id_map[(delta.event_type, delta.state_key)] = (
delta.event_id
)
if delta.event_type == EventTypes.Member:
membership_changed = True
elif delta.event_type == EventTypes.Name and delta.state_key == "":
name_changed = True
elif (
delta.event_type == EventTypes.RoomAvatar and delta.state_key == ""
):
avatar_changed = True
room_membership_summary: Optional[Mapping[str, MemberSummary]] = None
empty_membership_summary = MemberSummary([], 0)
# We need the room summary for:
# - Always for initial syncs (or the first time we send down the room)
# - When the room has no name, we need `heroes`
# - When the membership has changed so we need to give updated `heroes` and
# `joined_count`/`invited_count`.
#
# Ideally, instead of just looking at `name_changed`, we'd check if the room
# name is not set, but this is a good enough approximation that saves us from
# having to pull out the full event. This just means we're generating the
# summary whenever the room name changes instead of only when it changes to
# `None`.
if initial or name_changed or membership_changed:
# We can't trace the function directly because it's cached and the `@cached`
# decorator doesn't mix with `@trace` yet.
with start_active_span("get_room_summary"):
if room_membership_for_user_at_to_token.membership in (
Membership.LEAVE,
Membership.BAN,
@ -778,7 +823,12 @@ class SlidingSyncHandler:
# TODO: Should we also check for `EventTypes.CanonicalAlias`
# (`m.room.canonical_alias`) as a fallback for the room name? see
# https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1671260153
if name_event_id is None:
#
# We need to fetch the `heroes` if the room name is not set. But we only need to
# get them on initial syncs (or the first time we send down the room) or if the
# membership has changed which may change the heroes.
if name_event_id is None and (initial or (not initial and membership_changed)):
assert room_membership_summary is not None
hero_user_ids = extract_heroes_from_room_summary(
room_membership_summary, me=user.to_string()
)
@ -896,9 +946,15 @@ class SlidingSyncHandler:
# We need this base set of info for the response so let's just fetch it along
# with the `required_state` for the room
meta_room_state = [(EventTypes.Name, ""), (EventTypes.RoomAvatar, "")] + [
hero_room_state = [
(EventTypes.Member, hero_user_id) for hero_user_id in hero_user_ids
]
meta_room_state = list(hero_room_state)
if initial or name_changed:
meta_room_state.append((EventTypes.Name, ""))
if initial or avatar_changed:
meta_room_state.append((EventTypes.RoomAvatar, ""))
state_filter = StateFilter.all()
if required_state_filter != StateFilter.all():
state_filter = StateFilter(
@ -921,21 +977,22 @@ class SlidingSyncHandler:
else:
assert from_bound is not None
# TODO: Limit the number of state events we're about to send down
# the room, if its too many we should change this to an
# `initial=True`?
deltas = await self.store.get_current_state_deltas_for_room(
room_id=room_id,
from_token=from_bound,
to_token=to_token.room_key,
)
# TODO: Filter room state before fetching events
# TODO: Handle state resets where event_id is None
events = await self.store.get_events(
[d.event_id for d in deltas if d.event_id]
state_filter.filter_state(room_state_delta_id_map).values()
)
room_state = {(s.type, s.state_key): s for s in events.values()}
# If the membership changed and we have to get heroes, get the remaining
# heroes from the state
if hero_user_ids:
hero_membership_state = await self.get_current_state_at(
room_id=room_id,
room_membership_for_user_at_to_token=room_membership_for_user_at_to_token,
state_filter=StateFilter.from_types(hero_room_state),
to_token=to_token,
)
room_state.update(hero_membership_state)
required_room_state: StateMap[EventBase] = {}
if required_state_filter != StateFilter.none():
required_room_state = required_state_filter.filter_state(room_state)
@ -968,26 +1025,19 @@ class SlidingSyncHandler:
)
# Figure out the last bump event in the room
last_bump_event_result = (
await self.store.get_last_event_pos_in_room_before_stream_ordering(
room_id,
to_token.room_key,
event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES,
)
)
# By default, just choose the membership event position
#
# By default, just choose the membership event position for any non-join membership
bump_stamp = room_membership_for_user_at_to_token.event_pos.stream
# But if we found a bump event, use that instead
if last_bump_event_result is not None:
_, new_bump_event_pos = last_bump_event_result
# If we've just joined a remote room, then the last bump event may
# have been backfilled (and so have a negative stream ordering).
# These negative stream orderings can't sensibly be compared, so
# instead we use the membership event position.
if new_bump_event_pos.stream > 0:
bump_stamp = new_bump_event_pos.stream
# If we're joined to the room, we need to find the last bump event before the
# `to_token`
if room_membership_for_user_at_to_token.membership == Membership.JOIN:
# Try and get a bump stamp, if not we just fall back to the
# membership token.
new_bump_stamp = await self._get_bump_stamp(
room_id, to_token, timeline_events
)
if new_bump_stamp is not None:
bump_stamp = new_bump_stamp
unstable_expanded_timeline = False
prev_room_sync_config = previous_connection_state.room_configs.get(room_id)
@ -1040,11 +1090,25 @@ class SlidingSyncHandler:
set_tag(SynapseTags.RESULT_PREFIX + "initial", initial)
joined_count: Optional[int] = None
if initial or membership_changed:
assert room_membership_summary is not None
joined_count = room_membership_summary.get(
Membership.JOIN, empty_membership_summary
).count
invited_count: Optional[int] = None
if initial or membership_changed:
assert room_membership_summary is not None
invited_count = room_membership_summary.get(
Membership.INVITE, empty_membership_summary
).count
return SlidingSyncResult.RoomResult(
name=room_name,
avatar=room_avatar,
heroes=heroes,
is_dm=room_membership_for_user_at_to_token.is_dm,
is_dm=is_dm,
initial=initial,
required_state=list(required_room_state.values()),
timeline_events=timeline_events,
@ -1055,15 +1119,100 @@ class SlidingSyncHandler:
unstable_expanded_timeline=unstable_expanded_timeline,
num_live=num_live,
bump_stamp=bump_stamp,
joined_count=room_membership_summary.get(
Membership.JOIN, empty_membership_summary
).count,
invited_count=room_membership_summary.get(
Membership.INVITE, empty_membership_summary
).count,
joined_count=joined_count,
invited_count=invited_count,
# TODO: These are just dummy values. We could potentially just remove these
# since notifications can only really be done correctly on the client anyway
# (encrypted rooms).
notification_count=0,
highlight_count=0,
)
@trace
async def _get_bump_stamp(
self, room_id: str, to_token: StreamToken, timeline: List[EventBase]
) -> Optional[int]:
"""Get a bump stamp for the room, if we have a bump event
Args:
room_id
to_token: The upper bound of token to return
timeline: The list of events we have fetched.
"""
# First check the timeline events we're returning to see if one of
# those matches. We iterate backwards and take the stream ordering
# of the first event that matches the bump event types.
for timeline_event in reversed(timeline):
if timeline_event.type in SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES:
new_bump_stamp = timeline_event.internal_metadata.stream_ordering
# All persisted events have a stream ordering
assert new_bump_stamp is not None
# If we've just joined a remote room, then the last bump event may
# have been backfilled (and so have a negative stream ordering).
# These negative stream orderings can't sensibly be compared, so
# instead we use the membership event position.
if new_bump_stamp > 0:
return new_bump_stamp
# We can quickly query for the latest bump event in the room using the
# sliding sync tables.
latest_room_bump_stamp = await self.store.get_latest_bump_stamp_for_room(
room_id
)
min_to_token_position = to_token.room_key.stream
# If we can rely on the new sliding sync tables and the `bump_stamp` is
# `None`, just fall back to the membership event position. This can happen
# when we've just joined a remote room and all the events are backfilled.
if (
# FIXME: The background job check can be removed once we bump
# `SCHEMA_COMPAT_VERSION` and run the foreground update for
# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots`
# (tracked by https://github.com/element-hq/synapse/issues/17623)
await self.store.have_finished_sliding_sync_background_jobs()
and latest_room_bump_stamp is None
):
return None
# The `bump_stamp` stored in the database might be ahead of our token. Since
# `bump_stamp` is only a `stream_ordering` position, we can't be 100% sure
# that it's before the `to_token` in all scenarios. The only scenario we can be
# sure of is if the `bump_stamp` is totally before the minimum position from
# the token.
#
# We don't need to check if the background update has finished, as if the
# returned bump stamp is not None then it must be up to date.
elif (
latest_room_bump_stamp is not None
and latest_room_bump_stamp < min_to_token_position
):
if latest_room_bump_stamp > 0:
return latest_room_bump_stamp
else:
return None
# Otherwise, if it's within or after the `to_token`, we need to find the
# last bump event before the `to_token`.
else:
last_bump_event_result = (
await self.store.get_last_event_pos_in_room_before_stream_ordering(
room_id,
to_token.room_key,
event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES,
)
)
if last_bump_event_result is not None:
_, new_bump_event_pos = last_bump_event_result
# If we've just joined a remote room, then the last bump event may
# have been backfilled (and so have a negative stream ordering).
# These negative stream orderings can't sensibly be compared, so
# instead we use the membership event position.
if new_bump_event_pos.stream > 0:
return new_bump_event_pos.stream
return None
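An illustrative, standalone sketch of the fallback order used above, with made-up stream positions (the helper name is hypothetical): backfilled events carry negative stream orderings, which can't sensibly be compared with live positions, so we fall back to the membership event position.

def choose_bump_stamp(candidate_bump_pos, membership_pos):
    # Backfilled events have negative stream orderings which can't be
    # compared against live positions, so ignore them.
    if candidate_bump_pos is not None and candidate_bump_pos > 0:
        return candidate_bump_pos
    return membership_pos

assert choose_bump_stamp(480, 10) == 480   # normal case: use the bump event position
assert choose_bump_stamp(-42, 10) == 10    # backfilled bump event: use the membership position
assert choose_bump_stamp(None, 10) == 10   # no bump event at all: use the membership position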

View file

@ -19,7 +19,6 @@ from itertools import chain
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Dict,
List,
Literal,
@ -48,7 +47,11 @@ from synapse.storage.databases.main.state import (
Sentinel as StateSentinel,
)
from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
from synapse.storage.roommember import RoomsForUser, RoomsForUserSlidingSync
from synapse.storage.roommember import (
RoomsForUser,
RoomsForUserSlidingSync,
RoomsForUserStateReset,
)
from synapse.types import (
MutableStateMap,
PersistedEventPosition,
@ -75,6 +78,11 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
# Helper definition for the types that we might return. We do this to avoid
# copying data between types (which can be expensive for many rooms).
RoomsForUserType = Union[RoomsForUserStateReset, RoomsForUser, RoomsForUserSlidingSync]
@attr.s(auto_attribs=True, slots=True, frozen=True)
class SlidingSyncInterestedRooms:
"""The set of rooms and metadata a client is interested in based on their
@ -90,13 +98,22 @@ class SlidingSyncInterestedRooms:
includes the rooms that *may* have relevant updates. Rooms not
in this map will definitely not have room updates (though
extensions may have updates in these rooms).
newly_joined_rooms: The set of rooms that were joined in the token range
and the user is still joined to at the end of this range.
newly_left_rooms: The set of rooms that we left in the token range
and are still "leave" at the end of this range.
dm_room_ids: The set of rooms the user considers as direct-message (DM) rooms
"""
lists: Mapping[str, "SlidingSyncListResult"]
relevant_room_map: Mapping[str, RoomSyncConfig]
relevant_rooms_to_send_map: Mapping[str, RoomSyncConfig]
all_rooms: Set[str]
room_membership_for_user_map: Mapping[str, "_RoomMembershipForUser"]
room_membership_for_user_map: Mapping[str, RoomsForUserType]
newly_joined_rooms: AbstractSet[str]
newly_left_rooms: AbstractSet[str]
dm_room_ids: AbstractSet[str]
@attr.s(auto_attribs=True, slots=True, frozen=True)
@ -116,47 +133,10 @@ class Sentinel(enum.Enum):
UNSET_SENTINEL = object()
@attr.s(slots=True, frozen=True, auto_attribs=True)
class _RoomMembershipForUser:
"""
Attributes:
room_id: The room ID of the membership event
event_id: The event ID of the membership event
event_pos: The stream position of the membership event
membership: The membership state of the user in the room
sender: The person who sent the membership event
newly_joined: Whether the user newly joined the room during the given token
range and is still joined to the room at the end of this range.
newly_left: Whether the user newly left (or kicked) the room during the given
token range and is still "leave" at the end of this range.
is_dm: Whether this user considers this room as a direct-message (DM) room
"""
room_id: str
# Optional because state resets can affect room membership without a corresponding event.
event_id: Optional[str]
# Even during a state reset which removes the user from the room, we expect this to
# be set because `current_state_delta_stream` will note the position that the reset
# happened.
event_pos: PersistedEventPosition
# Even during a state reset which removes the user from the room, we expect this to
# be set to `LEAVE` because we can make that assumption based on the situaton (see
# `get_current_state_delta_membership_changes_for_user(...)`)
membership: str
# Optional because state resets can affect room membership without a corresponding event.
sender: Optional[str]
newly_joined: bool
newly_left: bool
is_dm: bool
def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser":
return attr.evolve(self, **kwds)
def filter_membership_for_sync(
*,
user_id: str,
room_membership_for_user: Union[_RoomMembershipForUser, RoomsForUserSlidingSync],
room_membership_for_user: RoomsForUserType,
newly_left: bool,
) -> bool:
"""
@ -363,11 +343,7 @@ class SlidingSyncRoomLists:
# Find which rooms are partially stated and may need to be filtered out
# depending on the `required_state` requested (see below).
partial_state_room_map = (
await self.store.is_partial_state_room_batched(
filtered_sync_room_map.keys()
)
)
partial_state_rooms = await self.store.get_partial_rooms()
# Since creating the `RoomSyncConfig` takes some work, let's just do it
# once.
@ -379,18 +355,23 @@ class SlidingSyncRoomLists:
filtered_sync_room_map = {
room_id: room
for room_id, room in filtered_sync_room_map.items()
if not partial_state_room_map.get(room_id)
if room_id not in partial_state_rooms
}
all_rooms.update(filtered_sync_room_map)
room_ids_in_list: List[str] = []
if list_config.ranges:
if list_config.ranges == [(0, len(filtered_sync_room_map) - 1)]:
# If we are asking for the full range, we don't need to sort the list.
sorted_room_info = list(filtered_sync_room_map.values())
else:
# Sort the list
sorted_room_info = await self.sort_rooms_using_tables(
filtered_sync_room_map, to_token
)
room_ids_in_list: List[str] = []
if list_config.ranges:
for range in list_config.ranges:
# We're going to loop through the sorted list of rooms starting
# at the range start index and keep adding rooms until we fill
@ -428,9 +409,7 @@ class SlidingSyncRoomLists:
with start_active_span("assemble_room_subscriptions"):
# Find which rooms are partially stated and may need to be filtered out
# depending on the `required_state` requested (see below).
partial_state_room_map = await self.store.is_partial_state_room_batched(
sync_config.room_subscriptions.keys()
)
partial_state_rooms = await self.store.get_partial_rooms()
for (
room_id,
@ -450,7 +429,7 @@ class SlidingSyncRoomLists:
# Exclude partially-stated rooms if we must wait for the room to be
# fully-stated
if room_sync_config.must_await_full_state(self.is_mine_id):
if partial_state_room_map.get(room_id):
if room_id in partial_state_rooms:
continue
all_rooms.add(room_id)
@ -478,22 +457,10 @@ class SlidingSyncRoomLists:
relevant_room_map=relevant_room_map,
relevant_rooms_to_send_map=relevant_rooms_to_send_map,
all_rooms=all_rooms,
room_membership_for_user_map={
# FIXME: Ideally we wouldn't have to create a new
# `_RoomMembershipForUser` here and instead just return
# `newly_joined_room_ids` directly, to save CPU time.
room_id: _RoomMembershipForUser(
room_id=room_id,
event_id=membership_info.event_id,
event_pos=membership_info.event_pos,
sender=membership_info.sender,
membership=membership_info.membership,
newly_joined=room_id in newly_joined_room_ids,
newly_left=room_id in newly_left_room_map,
is_dm=room_id in dm_room_ids,
)
for room_id, membership_info in room_membership_for_user_map.items()
},
room_membership_for_user_map=room_membership_for_user_map,
newly_joined_rooms=newly_joined_room_ids,
newly_left_rooms=set(newly_left_room_map),
dm_room_ids=dm_room_ids,
)
async def _compute_interested_rooms_fallback(
@ -505,11 +472,15 @@ class SlidingSyncRoomLists:
) -> SlidingSyncInterestedRooms:
"""Fallback code when the database background updates haven't completed yet."""
room_membership_for_user_map = (
await self.get_room_membership_for_user_at_to_token(
(
room_membership_for_user_map,
newly_joined_room_ids,
newly_left_room_ids,
) = await self.get_room_membership_for_user_at_to_token(
sync_config.user, to_token, from_token
)
)
dm_room_ids = await self._get_dm_rooms_for_user(sync_config.user.to_string())
# Assemble sliding window lists
lists: Dict[str, SlidingSyncListResult] = {}
@ -524,6 +495,7 @@ class SlidingSyncRoomLists:
sync_room_map = await self.filter_rooms_relevant_for_sync(
user=sync_config.user,
room_membership_for_user_map=room_membership_for_user_map,
newly_left_room_ids=newly_left_room_ids,
)
for list_key, list_config in sync_config.lists.items():
@ -535,15 +507,12 @@ class SlidingSyncRoomLists:
sync_room_map,
list_config.filters,
to_token,
dm_room_ids,
)
# Find which rooms are partially stated and may need to be filtered out
# depending on the `required_state` requested (see below).
partial_state_room_map = (
await self.store.is_partial_state_room_batched(
filtered_sync_room_map.keys()
)
)
partial_state_rooms = await self.store.get_partial_rooms()
# Since creating the `RoomSyncConfig` takes some work, let's just do it
# once.
@ -555,7 +524,7 @@ class SlidingSyncRoomLists:
filtered_sync_room_map = {
room_id: room
for room_id, room in filtered_sync_room_map.items()
if not partial_state_room_map.get(room_id)
if room_id not in partial_state_rooms
}
all_rooms.update(filtered_sync_room_map)
@ -604,9 +573,7 @@ class SlidingSyncRoomLists:
with start_active_span("assemble_room_subscriptions"):
# Find which rooms are partially stated and may need to be filtered out
# depending on the `required_state` requested (see below).
partial_state_room_map = await self.store.is_partial_state_room_batched(
sync_config.room_subscriptions.keys()
)
partial_state_rooms = await self.store.get_partial_rooms()
for (
room_id,
@ -638,7 +605,7 @@ class SlidingSyncRoomLists:
# Exclude partially-stated rooms if we must wait for the room to be
# fully-stated
if room_sync_config.must_await_full_state(self.is_mine_id):
if partial_state_room_map.get(room_id):
if room_id in partial_state_rooms:
continue
all_rooms.add(room_id)
@ -667,6 +634,9 @@ class SlidingSyncRoomLists:
relevant_rooms_to_send_map=relevant_rooms_to_send_map,
all_rooms=all_rooms,
room_membership_for_user_map=room_membership_for_user_map,
newly_joined_rooms=newly_joined_room_ids,
newly_left_rooms=newly_left_room_ids,
dm_room_ids=dm_room_ids,
)
async def _filter_relevant_room_to_send(
@ -743,7 +713,7 @@ class SlidingSyncRoomLists:
async def _get_rewind_changes_to_current_membership_to_token(
self,
user: UserID,
rooms_for_user: Mapping[str, Union[RoomsForUser, RoomsForUserSlidingSync]],
rooms_for_user: Mapping[str, RoomsForUserType],
to_token: StreamToken,
) -> Mapping[str, Optional[RoomsForUser]]:
"""
@ -895,7 +865,7 @@ class SlidingSyncRoomLists:
user: UserID,
to_token: StreamToken,
from_token: Optional[StreamToken],
) -> Dict[str, _RoomMembershipForUser]:
) -> Tuple[Dict[str, RoomsForUserType], AbstractSet[str], AbstractSet[str]]:
"""
Fetch room IDs that the user has had membership in (the full room list including
long-lost left rooms that will be filtered, sorted, and sliced).
@ -914,8 +884,11 @@ class SlidingSyncRoomLists:
from_token: The point in the stream to sync from.
Returns:
A dictionary of room IDs that the user has had membership in along with
A 3-tuple of:
- A dictionary of room IDs that the user has had membership in along with
membership information in that room at the time of `to_token`.
- Set of newly joined rooms
- Set of newly left rooms
"""
user_id = user.to_string()
@ -932,12 +905,14 @@ class SlidingSyncRoomLists:
# If the user has never joined any rooms before, we can just return an empty list
if not room_for_user_list:
return {}
return {}, set(), set()
# Since we fetched the user's room list at some point in time after the
# tokens, we need to revert/rewind some membership changes to match the point in
# time of the `to_token`.
rooms_for_user = {room.room_id: room for room in room_for_user_list}
rooms_for_user: Dict[str, RoomsForUserType] = {
room.room_id: room for room in room_for_user_list
}
changes = await self._get_rewind_changes_to_current_membership_to_token(
user, rooms_for_user, to_token
)
@ -954,42 +929,23 @@ class SlidingSyncRoomLists:
user_id, to_token=to_token, from_token=from_token
)
dm_room_ids = await self._get_dm_rooms_for_user(user_id)
# Our working list of rooms that can show up in the sync response
sync_room_id_set = {
room_for_user.room_id: _RoomMembershipForUser(
room_id=room_for_user.room_id,
event_id=room_for_user.event_id,
event_pos=room_for_user.event_pos,
membership=room_for_user.membership,
sender=room_for_user.sender,
newly_joined=room_id in newly_joined_room_ids,
newly_left=room_id in newly_left_room_ids,
is_dm=room_id in dm_room_ids,
)
for room_id, room_for_user in rooms_for_user.items()
}
# Ensure we have entries for rooms that the user has been "state reset"
# out of. These are rooms that appear in the `newly_left_rooms` map but
# aren't in the `rooms_for_user` map.
for room_id, left_event_pos in newly_left_room_ids.items():
if room_id in sync_room_id_set:
if room_id in rooms_for_user:
continue
sync_room_id_set[room_id] = _RoomMembershipForUser(
rooms_for_user[room_id] = RoomsForUserStateReset(
room_id=room_id,
event_id=None,
event_pos=left_event_pos,
membership=Membership.LEAVE,
sender=None,
newly_joined=False,
newly_left=True,
is_dm=room_id in dm_room_ids,
room_version_id=await self.store.get_room_version_id(room_id),
)
return sync_room_id_set
return rooms_for_user, newly_joined_room_ids, set(newly_left_room_ids)
@trace
async def _get_newly_joined_and_left_rooms(
@ -997,7 +953,7 @@ class SlidingSyncRoomLists:
user_id: str,
to_token: StreamToken,
from_token: Optional[StreamToken],
) -> Tuple[StrCollection, Mapping[str, PersistedEventPosition]]:
) -> Tuple[AbstractSet[str], Mapping[str, PersistedEventPosition]]:
"""Fetch the sets of rooms that the user newly joined or left in the
given token range.
@ -1150,8 +1106,9 @@ class SlidingSyncRoomLists:
async def filter_rooms_relevant_for_sync(
self,
user: UserID,
room_membership_for_user_map: Dict[str, _RoomMembershipForUser],
) -> Dict[str, _RoomMembershipForUser]:
room_membership_for_user_map: Dict[str, RoomsForUserType],
newly_left_room_ids: AbstractSet[str],
) -> Dict[str, RoomsForUserType]:
"""
Filter room IDs that should/can be listed for this user in the sync response (the
full room list that will be further filtered, sorted, and sliced).
@ -1172,6 +1129,7 @@ class SlidingSyncRoomLists:
Args:
user: User that is syncing
room_membership_for_user_map: Room membership for the user
newly_left_room_ids: The set of room IDs we have newly left
Returns:
A dictionary of room IDs that should be listed in the sync response along
@ -1186,7 +1144,7 @@ class SlidingSyncRoomLists:
if filter_membership_for_sync(
user_id=user_id,
room_membership_for_user=room_membership_for_user,
newly_left=room_membership_for_user.newly_left,
newly_left=room_id in newly_left_room_ids,
)
}
@ -1195,9 +1153,9 @@ class SlidingSyncRoomLists:
async def check_room_subscription_allowed_for_user(
self,
room_id: str,
room_membership_for_user_map: Dict[str, _RoomMembershipForUser],
room_membership_for_user_map: Dict[str, RoomsForUserType],
to_token: StreamToken,
) -> Optional[_RoomMembershipForUser]:
) -> Optional[RoomsForUserType]:
"""
Check whether the user is allowed to see the room based on whether they have
ever had membership in the room or if the room is `world_readable`.
@ -1262,7 +1220,7 @@ class SlidingSyncRoomLists:
async def _bulk_get_stripped_state_for_rooms_from_sync_room_map(
self,
room_ids: StrCollection,
sync_room_map: Dict[str, _RoomMembershipForUser],
sync_room_map: Dict[str, RoomsForUserType],
) -> Dict[str, Optional[StateMap[StrippedStateEvent]]]:
"""
Fetch stripped state for a list of room IDs. Stripped state is only
@ -1359,7 +1317,7 @@ class SlidingSyncRoomLists:
"room_encryption",
],
room_ids: Set[str],
sync_room_map: Dict[str, _RoomMembershipForUser],
sync_room_map: Dict[str, RoomsForUserType],
to_token: StreamToken,
room_id_to_stripped_state_map: Dict[
str, Optional[StateMap[StrippedStateEvent]]
@ -1523,10 +1481,11 @@ class SlidingSyncRoomLists:
async def filter_rooms(
self,
user: UserID,
sync_room_map: Dict[str, _RoomMembershipForUser],
sync_room_map: Dict[str, RoomsForUserType],
filters: SlidingSyncConfig.SlidingSyncList.Filters,
to_token: StreamToken,
) -> Dict[str, _RoomMembershipForUser]:
dm_room_ids: AbstractSet[str],
) -> Dict[str, RoomsForUserType]:
"""
Filter rooms based on the sync request.
@ -1536,6 +1495,7 @@ class SlidingSyncRoomLists:
information in the room at the time of `to_token`.
filters: Filters to apply
to_token: We filter based on the state of the room at this token
dm_room_ids: Set of room IDs that are DMs for the user
Returns:
A filtered dictionary of room IDs along with membership information in the
@ -1555,14 +1515,14 @@ class SlidingSyncRoomLists:
filtered_room_id_set = {
room_id
for room_id in filtered_room_id_set
if sync_room_map[room_id].is_dm
if room_id in dm_room_ids
}
else:
# Only non-DM rooms please
filtered_room_id_set = {
room_id
for room_id in filtered_room_id_set
if not sync_room_map[room_id].is_dm
if room_id not in dm_room_ids
}
if filters.spaces is not None:
@ -1850,9 +1810,9 @@ class SlidingSyncRoomLists:
@trace
async def sort_rooms(
self,
sync_room_map: Dict[str, _RoomMembershipForUser],
sync_room_map: Dict[str, RoomsForUserType],
to_token: StreamToken,
) -> List[_RoomMembershipForUser]:
) -> List[RoomsForUserType]:
"""
Sort by `stream_ordering` of the last event that the user should see in the
room. `stream_ordering` is unique so we get a stable sort.
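A rough sketch of the shape of this refactor, using hypothetical data: the per-room `is_dm`/`newly_joined` flags on `_RoomMembershipForUser` are replaced by set-membership checks against `dm_room_ids`/`newly_joined_rooms`, which avoids copying every membership entry into a new wrapper object.

# Hypothetical data to illustrate the set-based checks.
dm_room_ids = {"!dm:example.org"}
newly_joined_rooms = {"!new:example.org"}
sync_room_map = {"!dm:example.org": "join", "!new:example.org": "join", "!old:example.org": "join"}

# Previously: sync_room_map[room_id].is_dm / .newly_joined
# Now: plain set membership, no per-room wrapper objects.
dms = {room_id for room_id in sync_room_map if room_id in dm_room_ids}
newly_joined = {room_id for room_id in sync_room_map if room_id in newly_joined_rooms}
print(dms, newly_joined)  # {'!dm:example.org'} {'!new:example.org'}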

View file

@ -906,7 +906,7 @@ class SyncHandler:
# Use `stream_ordering` for updates
else paginate_room_events_by_stream_ordering
)
events, end_key = await pagination_method(
events, end_key, limited = await pagination_method(
room_id=room_id,
# The bounds are reversed so we can paginate backwards
# (from newer to older events) starting at to_bound.
@ -914,9 +914,7 @@ class SyncHandler:
from_key=end_key,
to_key=since_key,
direction=Direction.BACKWARDS,
# We add one so we can determine if there are enough events to saturate
# the limit or not (see `limited`)
limit=load_limit + 1,
limit=load_limit,
)
# We want to return the events in ascending order (the last event is the
# most recent).
@ -971,9 +969,6 @@ class SyncHandler:
loaded_recents.extend(recents)
recents = loaded_recents
if len(events) <= load_limit:
limited = False
break
max_repeat -= 1
if len(recents) > timeline_limit:
@ -2608,7 +2603,7 @@ class SyncHandler:
newly_joined = room_id in newly_joined_rooms
if room_entry:
events, start_key = room_entry
events, start_key, _ = room_entry
# We want to return the events in ascending order (the last event is the
# most recent).
events.reverse()

View file

@ -53,7 +53,6 @@ class KnockRoomAliasServlet(RestServlet):
super().__init__()
self.room_member_handler = hs.get_room_member_handler()
self.auth = hs.get_auth()
self._support_via = hs.config.experimental.msc4156_enabled
async def on_POST(
self,
@ -72,16 +71,12 @@ class KnockRoomAliasServlet(RestServlet):
# twisted.web.server.Request.args is incorrectly defined as Optional[Any]
args: Dict[bytes, List[bytes]] = request.args # type: ignore
# Prefer via over server_name (deprecated with MSC4156)
remote_room_hosts = parse_strings_from_args(args, "via", required=False)
if remote_room_hosts is None:
remote_room_hosts = parse_strings_from_args(
args, "server_name", required=False
)
if self._support_via:
remote_room_hosts = parse_strings_from_args(
args,
"org.matrix.msc4156.via",
default=remote_room_hosts,
required=False,
)
elif RoomAlias.is_valid(room_identifier):
handler = self.room_member_handler
room_alias = RoomAlias.from_string(room_identifier)

View file

@ -419,7 +419,6 @@ class JoinRoomAliasServlet(ResolveRoomIdMixin, TransactionRestServlet):
super().__init__(hs)
super(ResolveRoomIdMixin, self).__init__(hs) # ensure the Mixin is set up
self.auth = hs.get_auth()
self._support_via = hs.config.experimental.msc4156_enabled
def register(self, http_server: HttpServer) -> None:
# /join/$room_identifier[/$txn_id]
@ -437,13 +436,11 @@ class JoinRoomAliasServlet(ResolveRoomIdMixin, TransactionRestServlet):
# twisted.web.server.Request.args is incorrectly defined as Optional[Any]
args: Dict[bytes, List[bytes]] = request.args # type: ignore
remote_room_hosts = parse_strings_from_args(args, "server_name", required=False)
if self._support_via:
# Prefer via over server_name (deprecated with MSC4156)
remote_room_hosts = parse_strings_from_args(args, "via", required=False)
if remote_room_hosts is None:
remote_room_hosts = parse_strings_from_args(
args,
"org.matrix.msc4156.via",
default=remote_room_hosts,
required=False,
args, "server_name", required=False
)
room_id, remote_room_hosts = await self.resolve_room_id(
room_identifier,

View file

@ -993,12 +993,16 @@ class SlidingSyncRestServlet(RestServlet):
for room_id, room_result in rooms.items():
serialized_rooms[room_id] = {
"bump_stamp": room_result.bump_stamp,
"joined_count": room_result.joined_count,
"invited_count": room_result.invited_count,
"notification_count": room_result.notification_count,
"highlight_count": room_result.highlight_count,
}
if room_result.joined_count is not None:
serialized_rooms[room_id]["joined_count"] = room_result.joined_count
if room_result.invited_count is not None:
serialized_rooms[room_id]["invited_count"] = room_result.invited_count
if room_result.name:
serialized_rooms[room_id]["name"] = room_result.name

View file

@ -327,6 +327,13 @@ class PersistEventsStore:
async with stream_ordering_manager as stream_orderings:
for (event, _), stream in zip(events_and_contexts, stream_orderings):
# XXX: We can't rely on `stream_ordering`/`instance_name` being correct
# at this point. We could be working with events that were previously
# persisted as an `outlier` with one `stream_ordering` but are now being
# persisted again and de-outliered and are being assigned a different
# `stream_ordering` here that won't end up being used.
# `_update_outliers_txn()` will fix this discrepancy (always use the
# `stream_ordering` from the first time it was persisted).
event.internal_metadata.stream_ordering = stream
event.internal_metadata.instance_name = self._instance_name
@ -470,11 +477,11 @@ class PersistEventsStore:
membership_infos_to_insert_membership_snapshots.append(
# XXX: We don't use `SlidingSyncMembershipInfoWithEventPos` here
# because we're sourcing the event from `events_and_contexts`, we
# can't rely on `stream_ordering`/`instance_name` being correct. We
# could be working with events that were previously persisted as an
# `outlier` with one `stream_ordering` but are now being persisted
# again and de-outliered and assigned a different `stream_ordering`
# that won't end up being used. Since we call
# can't rely on `stream_ordering`/`instance_name` being correct at
# this point. We could be working with events that were previously
# persisted as an `outlier` with one `stream_ordering` but are now
# being persisted again and de-outliered and assigned a different
# `stream_ordering` that won't end up being used. Since we call
# `_calculate_sliding_sync_table_changes()` before
# `_update_outliers_txn()` which fixes this discrepancy (always use
# the `stream_ordering` from the first time it was persisted), we're
@ -591,11 +598,17 @@ class PersistEventsStore:
event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES,
)
)
bump_stamp_to_fully_insert = (
most_recent_bump_event_pos_results[1].stream
if most_recent_bump_event_pos_results is not None
else None
)
if most_recent_bump_event_pos_results is not None:
_, new_bump_event_pos = most_recent_bump_event_pos_results
# If we've just joined a remote room, then the last bump event may
# have been backfilled (and so have a negative stream ordering).
# These negative stream orderings can't sensibly be compared, so
# instead just leave it as `None` in the table and we will use their
# membership event position as the bump event position in the
# Sliding Sync API.
if new_bump_event_pos.stream > 0:
bump_stamp_to_fully_insert = new_bump_event_pos.stream
current_state_ids_map = dict(
await self.store.get_partial_filtered_current_state_ids(
@ -2123,31 +2136,26 @@ class PersistEventsStore:
if len(events_and_contexts) == 0:
return
# We only update the sliding sync tables for non-backfilled events.
#
# Check if the first event is a backfilled event (with a negative
# `stream_ordering`). If one event is backfilled, we assume this whole batch was
# backfilled.
first_event_stream_ordering = events_and_contexts[0][
0
].internal_metadata.stream_ordering
# This should exist for persisted events
assert first_event_stream_ordering is not None
if first_event_stream_ordering < 0:
return
# Since the list is sorted ascending by `stream_ordering`, the last event should
# have the highest `stream_ordering`.
max_stream_ordering = events_and_contexts[-1][
0
].internal_metadata.stream_ordering
# `stream_ordering` should be assigned for persisted events
assert max_stream_ordering is not None
# Check if the event is a backfilled event (with a negative `stream_ordering`).
# If one event is backfilled, we assume this whole batch was backfilled.
if max_stream_ordering < 0:
# We only update the sliding sync tables for non-backfilled events.
return
max_bump_stamp = None
for event, _ in reversed(events_and_contexts):
# Sanity check that all events belong to the same room
assert event.room_id == room_id
if event.type in SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES:
# This should exist for persisted events
# `stream_ordering` should be assigned for persisted events
assert event.internal_metadata.stream_ordering is not None
max_bump_stamp = event.internal_metadata.stream_ordering
@ -2156,11 +2164,6 @@ class PersistEventsStore:
# matching bump event which should have the highest `stream_ordering`.
break
# We should have exited earlier if there were no events
assert (
max_stream_ordering is not None
), "Expected to have a stream_ordering if we have events"
# Handle updating the `sliding_sync_joined_rooms` table.
#
txn.execute(

View file

@ -41,7 +41,10 @@ from synapse.storage.databases.main.events import (
SlidingSyncMembershipSnapshotSharedInsertValues,
SlidingSyncStateInsertValues,
)
from synapse.storage.databases.main.events_worker import DatabaseCorruptionError
from synapse.storage.databases.main.events_worker import (
DatabaseCorruptionError,
InvalidEventError,
)
from synapse.storage.databases.main.state_deltas import StateDeltasStore
from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.storage.types import Cursor
@ -1592,17 +1595,15 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
# starve disk usage while this goes on.
#
# We upsert in case we have to run this multiple times.
#
# The `WHERE TRUE` clause is to avoid "Parsing Ambiguity"
txn.execute(
"""
INSERT INTO sliding_sync_joined_rooms_to_recalculate
(room_id)
SELECT room_id FROM rooms WHERE ?
SELECT DISTINCT room_id FROM local_current_membership
WHERE membership = 'join'
ON CONFLICT (room_id)
DO NOTHING;
""",
(True,),
)
await self.db_pool.runInteraction(
@ -1686,7 +1687,15 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
if not current_state_ids_map:
continue
try:
fetched_events = await self.get_events(current_state_ids_map.values())
except (DatabaseCorruptionError, InvalidEventError) as e:
logger.warning(
"Failed to fetch state for room '%s' due to corrupted events. Ignoring. Error: %s",
room_id,
e,
)
continue
current_state_map: StateMap[EventBase] = {
state_key: fetched_events[event_id]
@ -1719,10 +1728,13 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
+ "given we pulled the room out of `current_state_events`"
)
most_recent_event_stream_ordering = most_recent_event_pos_results[1].stream
assert most_recent_event_stream_ordering > 0, (
"We should have at-least one event in the room (our own join membership event for example) "
+ "that isn't backfilled (negative `stream_ordering`) if we are joined to the room."
)
# The `most_recent_event_stream_ordering` should be positive; however,
# there are (very rare) rooms where that is not the case in the
# matrix.org database. It's not clear how they got into that state,
# but it does mean that we cannot assert that the stream ordering is
# indeed positive.
# Figure out the latest `bump_stamp` in the room. This could be `None` for a
# federated room you just joined where all of events are still `outliers` or
# backfilled history. In the Sliding Sync API, we default to the user's
@ -1963,7 +1975,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
)
return 0
def _find_previous_membership_txn(
def _find_previous_invite_or_knock_membership_txn(
txn: LoggingTransaction, room_id: str, user_id: str, event_id: str
) -> Tuple[str, str]:
# Find the previous invite/knock event before the leave event
@ -2004,6 +2016,10 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
(
room_id,
user_id,
# We look explicitly for `invite` and `knock` events rather than just
# the immediately previous membership, as someone could have been `invite`
# -> `ban` -> unbanned (`leave`) and we want to find the `invite`
# event, which is where the stripped state is.
Membership.INVITE,
Membership.KNOCK,
event_id,
@ -2089,7 +2105,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
fetched_events = await self.get_events(
current_state_ids_map.values()
)
except DatabaseCorruptionError as e:
except (DatabaseCorruptionError, InvalidEventError) as e:
logger.warning(
"Failed to fetch state for room '%s' due to corrupted events. Ignoring. Error: %s",
room_id,
@ -2152,8 +2168,8 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
invite_or_knock_event_id,
invite_or_knock_membership,
) = await self.db_pool.runInteraction(
"sliding_sync_membership_snapshots_bg_update._find_previous_membership",
_find_previous_membership_txn,
"sliding_sync_membership_snapshots_bg_update._find_previous_invite_or_knock_membership_txn",
_find_previous_invite_or_knock_membership_txn,
room_id,
user_id,
membership_event_id,
@ -2197,7 +2213,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
try:
fetched_events = await self.get_events(state_ids_map.values())
except DatabaseCorruptionError as e:
except (DatabaseCorruptionError, InvalidEventError) as e:
logger.warning(
"Failed to fetch state for room '%s' due to corrupted events. Ignoring. Error: %s",
room_id,

View file

@ -1382,6 +1382,30 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
partial_state_rooms = {row[0] for row in rows}
return {room_id: room_id in partial_state_rooms for room_id in room_ids}
@cached(max_entries=10000, iterable=True)
async def get_partial_rooms(self) -> AbstractSet[str]:
"""Get any "partial-state" rooms which the user is in.
This is fast because the set of partially stated rooms at any point across
the whole server is small, so the query is cheap. It is also faster than
looking up whether a set of room IDs is partially stated via
`is_partial_state_room_batched(...)` because of the sheer amount of CPU
time spent looking all the rooms up in the cache.
"""
def _get_partial_rooms_for_user_txn(
txn: LoggingTransaction,
) -> AbstractSet[str]:
sql = """
SELECT room_id FROM partial_state_rooms
"""
txn.execute(sql)
return {room_id for (room_id,) in txn}
return await self.db_pool.runInteraction(
"get_partial_rooms_for_user", _get_partial_rooms_for_user_txn
)
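A minimal sketch of the intended call pattern (the caller below is hypothetical, mirroring the filtering done in the sliding sync room lists): fetch the single cached set once and do cheap `in` checks, instead of a batched per-room lookup.

# Hypothetical caller of the cached set.
async def drop_partial_state_rooms(store, sync_room_map):
    partial_state_rooms = await store.get_partial_rooms()
    return {
        room_id: room
        for room_id, room in sync_room_map.items()
        if room_id not in partial_state_rooms  # cheap set membership test
    }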
async def get_join_event_id_and_device_lists_stream_id_for_partial_state(
self, room_id: str
) -> Tuple[str, int]:
@ -2341,6 +2365,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
self._invalidate_cache_and_stream(
txn, self._get_partial_state_servers_at_join, (room_id,)
)
self._invalidate_all_cache_and_stream(txn, self.get_partial_rooms)
async def write_partial_state_rooms_join_event_id(
self,
@ -2562,6 +2587,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
self._invalidate_cache_and_stream(
txn, self._get_partial_state_servers_at_join, (room_id,)
)
self._invalidate_all_cache_and_stream(txn, self.get_partial_rooms)
DatabasePool.simple_insert_txn(
txn,

View file

@ -41,6 +41,46 @@ logger = logging.getLogger(__name__)
class SlidingSyncStore(SQLBaseStore):
async def get_latest_bump_stamp_for_room(
self,
room_id: str,
) -> Optional[int]:
"""
Get the `bump_stamp` for the room.
The `bump_stamp` is the `stream_ordering` of the last event according to the
`bump_event_types`. This helps clients sort more readily without needing to
pull in a bunch of the timeline to determine the last activity.
`bump_event_types` exists because, for example, we don't want display name
changes to mark the room as unread and bump it to the top. For encrypted rooms,
we just have to consider any activity as a bump because we can't see the content
and the client has to figure it out for itself.
This should only be called where the server is participating
in the room (someone local is joined).
Returns:
The `bump_stamp` for the room (which can be `None`).
"""
return cast(
Optional[int],
await self.db_pool.simple_select_one_onecol(
table="sliding_sync_joined_rooms",
keyvalues={"room_id": room_id},
retcol="bump_stamp",
# FIXME: This should be `False` once we bump `SCHEMA_COMPAT_VERSION` and run the
# foreground update for
# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked
# by https://github.com/element-hq/synapse/issues/17623)
#
# This should be `allow_none=False` in the future because even though
# `bump_stamp` itself can be `None`, we should have a row in the
# `sliding_sync_joined_rooms` table for any joined room.
allow_none=True,
),
)
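A rough illustration of what `bump_stamp` buys clients, with made-up values: rooms can be ordered by the stream ordering of their last bump event, falling back to the membership event position where no bump stamp is available.

# Made-up values to illustrate sorting by bump_stamp.
bump_stamps = {"!a:example.org": 120, "!b:example.org": 480, "!c:example.org": None}
membership_positions = {"!a:example.org": 10, "!b:example.org": 11, "!c:example.org": 500}

def sort_key(room_id):
    bump = bump_stamps[room_id]
    # Fall back to the membership event position when there is no bump stamp.
    return bump if bump is not None else membership_positions[room_id]

ordered = sorted(bump_stamps, key=sort_key, reverse=True)
print(ordered)  # ['!c:example.org', '!b:example.org', '!a:example.org']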
async def persist_per_connection_state(
self,
user_id: str,

View file

@ -108,7 +108,7 @@ class PaginateFunction(Protocol):
to_key: Optional[RoomStreamToken] = None,
direction: Direction = Direction.BACKWARDS,
limit: int = 0,
) -> Tuple[List[EventBase], RoomStreamToken]: ...
) -> Tuple[List[EventBase], RoomStreamToken, bool]: ...
# Used as return values for pagination APIs
@ -679,7 +679,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
to_key: Optional[RoomStreamToken] = None,
direction: Direction = Direction.BACKWARDS,
limit: int = 0,
) -> Dict[str, Tuple[List[EventBase], RoomStreamToken]]:
) -> Dict[str, Tuple[List[EventBase], RoomStreamToken, bool]]:
"""Get new room events in stream ordering since `from_key`.
Args:
@ -695,6 +695,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
A map from room id to a tuple containing:
- list of recent events in the room
- stream ordering key for the start of the chunk of events returned.
- a boolean to indicate if there were more events but we hit the limit
When Direction.FORWARDS: from_key < x <= to_key, (ascending order)
When Direction.BACKWARDS: from_key >= x > to_key, (descending order)
@ -758,7 +759,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
to_key: Optional[RoomStreamToken] = None,
direction: Direction = Direction.BACKWARDS,
limit: int = 0,
) -> Tuple[List[EventBase], RoomStreamToken]:
) -> Tuple[List[EventBase], RoomStreamToken, bool]:
"""
Paginate events by `stream_ordering` in the room from the `from_key` in the
given `direction` to the `to_key` or `limit`.
@ -773,8 +774,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
limit: Maximum number of events to return
Returns:
The results as a list of events and a token that points to the end
of the result set. If no events are returned then the end of the
The results as a list of events, a token that points to the end of
the result set, and a boolean to indicate if there were more events
but we hit the limit. If no events are returned then the end of the
stream has been reached (i.e. there are no events between `from_key`
and `to_key`).
@ -798,7 +800,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
and to_key.is_before_or_eq(from_key)
):
# Token selection matches what we do below if there are no rows
return [], to_key if to_key else from_key
return [], to_key if to_key else from_key, False
# Or vice-versa, if we're looking backwards and our `from_key` is already before
# our `to_key`.
elif (
@ -807,7 +809,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
and from_key.is_before_or_eq(to_key)
):
# Token selection matches what we do below if there are no rows
return [], to_key if to_key else from_key
return [], to_key if to_key else from_key, False
# We can do a quick sanity check to see if any events have been sent in the room
# since the earlier token.
@ -826,7 +828,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
if not has_changed:
# Token selection matches what we do below if there are no rows
return [], to_key if to_key else from_key
return [], to_key if to_key else from_key, False
order, from_bound, to_bound = generate_pagination_bounds(
direction, from_key, to_key
@ -842,7 +844,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
engine=self.database_engine,
)
def f(txn: LoggingTransaction) -> List[_EventDictReturn]:
def f(txn: LoggingTransaction) -> Tuple[List[_EventDictReturn], bool]:
sql = f"""
SELECT event_id, instance_name, stream_ordering
FROM events
@ -854,9 +856,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
"""
txn.execute(sql, (room_id, 2 * limit))
# Get all the rows and check if we hit the limit.
fetched_rows = txn.fetchall()
limited = len(fetched_rows) >= 2 * limit
rows = [
_EventDictReturn(event_id, None, stream_ordering)
for event_id, instance_name, stream_ordering in txn
for event_id, instance_name, stream_ordering in fetched_rows
if _filter_results_by_stream(
lower_token=(
to_key if direction == Direction.BACKWARDS else from_key
@ -867,10 +873,17 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
instance_name=instance_name,
stream_ordering=stream_ordering,
)
][:limit]
return rows
]
rows = await self.db_pool.runInteraction("get_room_events_stream_for_room", f)
if len(rows) > limit:
limited = True
rows = rows[:limit]
return rows, limited
rows, limited = await self.db_pool.runInteraction(
"get_room_events_stream_for_room", f
)
ret = await self.get_events_as_list(
[r.event_id for r in rows], get_prev_content=True
@ -887,7 +900,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
# `_paginate_room_events_by_topological_ordering_txn(...)`)
next_key = to_key if to_key else from_key
return ret, next_key
return ret, next_key, limited
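A small standalone sketch, with toy numbers, of how the new `limited` flag is derived: the query over-fetches `2 * limit` rows so that rows discarded by the token-range filter don't hide the fact that more events exist, and the flag is set either when that over-fetch ceiling is hit or when more than `limit` rows survive filtering.

# Toy numbers only; stands in for the SQL query plus the token-range filter.
limit = 3
fetched_rows = list(range(6))              # pretend the query returned 2 * limit rows
limited = len(fetched_rows) >= 2 * limit   # hit the over-fetch ceiling
rows = [r for r in fetched_rows if r % 2 == 0]  # stand-in for the token-range filter
if len(rows) > limit:
    limited = True
    rows = rows[:limit]
print(rows, limited)  # [0, 2, 4] True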
@trace
async def get_current_state_delta_membership_changes_for_user(
@ -1191,7 +1204,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
if limit == 0:
return [], end_token
rows, token = await self.db_pool.runInteraction(
rows, token, _ = await self.db_pool.runInteraction(
"get_recent_event_ids_for_room",
self._paginate_room_events_by_topological_ordering_txn,
room_id,
@ -1765,7 +1778,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
topological=topological_ordering, stream=stream_ordering
)
rows, start_token = self._paginate_room_events_by_topological_ordering_txn(
rows, start_token, _ = self._paginate_room_events_by_topological_ordering_txn(
txn,
room_id,
before_token,
@ -1775,7 +1788,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
)
events_before = [r.event_id for r in rows]
rows, end_token = self._paginate_room_events_by_topological_ordering_txn(
rows, end_token, _ = self._paginate_room_events_by_topological_ordering_txn(
txn,
room_id,
after_token,
@ -1947,7 +1960,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
direction: Direction = Direction.BACKWARDS,
limit: int = 0,
event_filter: Optional[Filter] = None,
) -> Tuple[List[_EventDictReturn], RoomStreamToken]:
) -> Tuple[List[_EventDictReturn], RoomStreamToken, bool]:
"""Returns list of events before or after a given token.
Args:
@ -1962,10 +1975,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
those that match the filter.
Returns:
A list of _EventDictReturn and a token that points to the end of the
result set. If no events are returned then the end of the stream has
been reached (i.e. there are no events between `from_token` and
`to_token`), or `limit` is zero.
A list of _EventDictReturn, a token that points to the end of the
result set, and a boolean to indicate if there were more events but
we hit the limit. If no events are returned then the end of the
stream has been reached (i.e. there are no events between
`from_token` and `to_token`), or `limit` is zero.
"""
# We can bail early if we're looking forwards, and our `to_key` is already
# before our `from_token`.
@ -1975,7 +1989,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
and to_token.is_before_or_eq(from_token)
):
# Token selection matches what we do below if there are no rows
return [], to_token if to_token else from_token
return [], to_token if to_token else from_token, False
# Or vice-versa, if we're looking backwards and our `from_token` is already before
# our `to_token`.
elif (
@ -1984,7 +1998,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
and from_token.is_before_or_eq(to_token)
):
# Token selection matches what we do below if there are no rows
return [], to_token if to_token else from_token
return [], to_token if to_token else from_token, False
args: List[Any] = [room_id]
@ -2007,6 +2021,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
args.extend(filter_args)
# We fetch more events as we'll filter the result set
requested_limit = int(limit) * 2
args.append(int(limit) * 2)
select_keywords = "SELECT"
@ -2071,10 +2086,14 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
}
txn.execute(sql, args)
# Get all the rows and check if we hit the limit.
fetched_rows = txn.fetchall()
limited = len(fetched_rows) >= requested_limit
# Filter the result set.
rows = [
_EventDictReturn(event_id, topological_ordering, stream_ordering)
for event_id, instance_name, topological_ordering, stream_ordering in txn
for event_id, instance_name, topological_ordering, stream_ordering in fetched_rows
if _filter_results(
lower_token=(
to_token if direction == Direction.BACKWARDS else from_token
@ -2086,7 +2105,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
topological_ordering=topological_ordering,
stream_ordering=stream_ordering,
)
][:limit]
]
if len(rows) > limit:
limited = True
rows = rows[:limit]
if rows:
assert rows[-1].topological_ordering is not None
@ -2097,7 +2121,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
# TODO (erikj): We should work out what to do here instead.
next_token = to_token if to_token else from_token
return rows, next_token
return rows, next_token, limited
@trace
@tag_args
@ -2110,7 +2134,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
direction: Direction = Direction.BACKWARDS,
limit: int = 0,
event_filter: Optional[Filter] = None,
) -> Tuple[List[EventBase], RoomStreamToken]:
) -> Tuple[List[EventBase], RoomStreamToken, bool]:
"""
Paginate events by `topological_ordering` (tie-break with `stream_ordering`) in
the room from the `from_key` in the given `direction` to the `to_key` or
@ -2127,8 +2151,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
event_filter: If provided filters the events to those that match the filter.
Returns:
The results as a list of events and a token that points to the end
of the result set. If no events are returned then the end of the
The results as a list of events, a token that points to the end of
the result set, and a boolean to indicate if there were more events
but we hit the limit. If no events are returned then the end of the
stream has been reached (i.e. there are no events between `from_key`
and `to_key`).
@ -2152,7 +2177,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
):
# Token selection matches what we do in `_paginate_room_events_txn` if there
# are no rows
return [], to_key if to_key else from_key
return [], to_key if to_key else from_key, False
# Or vice-versa, if we're looking backwards and our `from_key` is already before
# our `to_key`.
elif (
@ -2162,9 +2187,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
):
# Token selection matches what we do in `_paginate_room_events_txn` if there
# are no rows
return [], to_key if to_key else from_key
return [], to_key if to_key else from_key, False
rows, token = await self.db_pool.runInteraction(
rows, token, limited = await self.db_pool.runInteraction(
"paginate_room_events_by_topological_ordering",
self._paginate_room_events_by_topological_ordering_txn,
room_id,
@ -2179,7 +2204,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
[r.event_id for r in rows], get_prev_content=True
)
return events, token
return events, token, limited
@cached()
async def get_id_for_instance(self, instance_name: str) -> int:

View file

@ -52,6 +52,20 @@ class RoomsForUserSlidingSync:
is_encrypted: bool
@attr.s(slots=True, frozen=True, weakref_slot=False, auto_attribs=True)
class RoomsForUserStateReset:
"""A version of `RoomsForUser` that supports optional sender and event ID
fields, to handle state resets. State resets can affect room membership
without a corresponding event, so that information isn't always available."""
room_id: str
sender: Optional[str]
membership: str
event_id: Optional[str]
event_pos: PersistedEventPosition
room_version_id: str
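A hedged construction sketch for the class above (the values, and the assumption that `RoomsForUserStateReset` is importable from `synapse.storage.roommember` alongside `RoomsForUser`, are illustrative): after a state reset there may be no membership event at all, which is why `sender` and `event_id` are optional.

from synapse.api.constants import Membership
from synapse.storage.roommember import RoomsForUserStateReset
from synapse.types import PersistedEventPosition

state_reset_entry = RoomsForUserStateReset(
    room_id="!room:example.org",
    sender=None,       # no event caused this membership change
    membership=Membership.LEAVE,
    event_id=None,     # likewise, there is no membership event to point at
    event_pos=PersistedEventPosition(instance_name="master", stream=42),
    room_version_id="10",
)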
@attr.s(slots=True, frozen=True, weakref_slot=False, auto_attribs=True)
class GetRoomsForUserWithStreamOrdering:
room_id: str

View file

@ -19,6 +19,7 @@ from enum import Enum
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Callable,
Dict,
Final,
@ -195,8 +196,8 @@ class SlidingSyncResult:
# Only optional because it won't be included for invite/knock rooms with `stripped_state`
num_live: Optional[int]
bump_stamp: int
joined_count: int
invited_count: int
joined_count: Optional[int]
invited_count: Optional[int]
notification_count: int
highlight_count: int
@ -205,6 +206,12 @@ class SlidingSyncResult:
# If this is the first time the client is seeing the room, we should not filter it out
# under any circumstance.
self.initial
# We need to let the client know if any of the info has changed
or self.name is not None
or self.avatar is not None
or bool(self.heroes)
or self.joined_count is not None
or self.invited_count is not None
# We need to let the client know if there are any new events
or bool(self.required_state)
or bool(self.timeline_events)
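A simplified standalone sketch of the check being extended here (a plain function rather than the real `RoomResult` method, and covering only the fields visible in this hunk): on an incremental sync the meta fields are only populated when they have changed, so a non-`None` name, avatar, or count, or non-empty heroes, means the room still needs to be sent to the client.

from typing import List, Optional

def has_anything_to_send(
    initial: bool,
    name: Optional[str],
    avatar: Optional[str],
    heroes: Optional[List[str]],
    joined_count: Optional[int],
    invited_count: Optional[int],
    required_state: List[object],
    timeline_events: List[object],
) -> bool:
    return (
        # First time the client is seeing the room: always send it.
        initial
        # Any changed meta info needs to reach the client.
        or name is not None
        or avatar is not None
        or bool(heroes)
        or joined_count is not None
        or invited_count is not None
        # Any requested state or new timeline events also need to go down.
        or bool(required_state)
        or bool(timeline_events)
    )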
@ -684,7 +691,12 @@ class HaveSentRoom(Generic[T]):
@staticmethod
def never() -> "HaveSentRoom[T]":
return HaveSentRoom(HaveSentRoomFlag.NEVER, None)
# We use a singleton to avoid repeatedly instantiating new `never`
# values.
return _HAVE_SENT_ROOM_NEVER
_HAVE_SENT_ROOM_NEVER: HaveSentRoom[Any] = HaveSentRoom(HaveSentRoomFlag.NEVER, None)
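A minimal standalone sketch of the same idiom (simplified names, not Synapse's class): because the attrs class is frozen and therefore immutable, every caller can safely share a single pre-built instance instead of allocating a new one per call.

from typing import Any, Generic, Optional, TypeVar

import attr

T = TypeVar("T")

@attr.s(auto_attribs=True, slots=True, frozen=True)
class Sentinel(Generic[T]):
    flag: str
    value: Optional[T]

    @staticmethod
    def never() -> "Sentinel[Any]":
        # Immutable, so handing out a shared instance is safe.
        return _SENTINEL_NEVER

_SENTINEL_NEVER: Sentinel[Any] = Sentinel("NEVER", None)

# Both calls return the exact same object; nothing is allocated per call.
assert Sentinel.never() is Sentinel.never()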
@attr.s(auto_attribs=True, slots=True, frozen=True)

File diff suppressed because it is too large

View file

@ -13,7 +13,7 @@
#
import logging
from parameterized import parameterized_class
from parameterized import parameterized, parameterized_class
from twisted.test.proto_helpers import MemoryReactor
@ -67,10 +67,11 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
super().prepare(reactor, clock, hs)
def test_rooms_meta_when_joined(self) -> None:
def test_rooms_meta_when_joined_initial(self) -> None:
"""
Test that the `rooms` `name` and `avatar` are included in the response and
reflect the current state of the room when the user is joined to the room.
Test that the `rooms` `name` and `avatar` are included in the initial sync
response and reflect the current state of the room when the user is joined to
the room.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
@ -107,6 +108,7 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# Reflect the current state of the room
self.assertEqual(response_body["rooms"][room_id1]["initial"], True)
self.assertEqual(
response_body["rooms"][room_id1]["name"],
"my super room",
@ -129,6 +131,178 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
response_body["rooms"][room_id1].get("is_dm"),
)
def test_rooms_meta_when_joined_incremental_no_change(self) -> None:
"""
Test that the `rooms` `name` and `avatar` aren't included in an incremental sync
response if they haven't changed.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id1 = self.helper.create_room_as(
user2_id,
tok=user2_tok,
extra_content={
"name": "my super room",
},
)
# Set the room avatar URL
self.helper.send_state(
room_id1,
EventTypes.RoomAvatar,
{"url": "mxc://DUMMY_MEDIA_ID"},
tok=user2_tok,
)
self.helper.join(room_id1, user1_id, tok=user1_tok)
# Make the Sliding Sync request
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [],
# This needs to be set to one so the `RoomResult` isn't empty and
# the room comes down incremental sync when we send a new message.
"timeline_limit": 1,
}
}
}
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
# Send a message to make the room come down sync
self.helper.send(room_id1, "message in room1", tok=user2_tok)
# Incremental sync
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
# We should only see changed meta info (nothing changed so we shouldn't see any
# of these fields)
self.assertNotIn(
"initial",
response_body["rooms"][room_id1],
)
self.assertNotIn(
"name",
response_body["rooms"][room_id1],
)
self.assertNotIn(
"avatar",
response_body["rooms"][room_id1],
)
self.assertNotIn(
"joined_count",
response_body["rooms"][room_id1],
)
self.assertNotIn(
"invited_count",
response_body["rooms"][room_id1],
)
self.assertIsNone(
response_body["rooms"][room_id1].get("is_dm"),
)
@parameterized.expand(
[
("in_required_state", True),
("not_in_required_state", False),
]
)
def test_rooms_meta_when_joined_incremental_with_state_change(
self, test_description: str, include_changed_state_in_required_state: bool
) -> None:
"""
Test that the `rooms` `name` and `avatar` are included in an incremental sync
response if they changed.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id1 = self.helper.create_room_as(
user2_id,
tok=user2_tok,
extra_content={
"name": "my super room",
},
)
# Set the room avatar URL
self.helper.send_state(
room_id1,
EventTypes.RoomAvatar,
{"url": "mxc://DUMMY_MEDIA_ID"},
tok=user2_tok,
)
self.helper.join(room_id1, user1_id, tok=user1_tok)
# Make the Sliding Sync request
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": (
[[EventTypes.Name, ""], [EventTypes.RoomAvatar, ""]]
# Conditionally include the changed state in the
# `required_state` to make sure that, whether we request it or not,
# the new room name still flows down to the client.
if include_changed_state_in_required_state
else []
),
"timeline_limit": 0,
}
}
}
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
# Update the room name
self.helper.send_state(
room_id1,
EventTypes.Name,
{EventContentFields.ROOM_NAME: "my super duper room"},
tok=user2_tok,
)
# Update the room avatar URL
self.helper.send_state(
room_id1,
EventTypes.RoomAvatar,
{"url": "mxc://DUMMY_MEDIA_ID_UPDATED"},
tok=user2_tok,
)
# Incremental sync
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
# We should only see changed meta info (the room name and avatar)
self.assertNotIn(
"initial",
response_body["rooms"][room_id1],
)
self.assertEqual(
response_body["rooms"][room_id1]["name"],
"my super duper room",
response_body["rooms"][room_id1],
)
self.assertEqual(
response_body["rooms"][room_id1]["avatar"],
"mxc://DUMMY_MEDIA_ID_UPDATED",
response_body["rooms"][room_id1],
)
self.assertNotIn(
"joined_count",
response_body["rooms"][room_id1],
)
self.assertNotIn(
"invited_count",
response_body["rooms"][room_id1],
)
self.assertIsNone(
response_body["rooms"][room_id1].get("is_dm"),
)
def test_rooms_meta_when_invited(self) -> None:
"""
Test that the `rooms` `name` and `avatar` are included in the response and
@ -186,6 +360,7 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
# This should still reflect the current state of the room even when the user is
# invited.
self.assertEqual(response_body["rooms"][room_id1]["initial"], True)
self.assertEqual(
response_body["rooms"][room_id1]["name"],
"my super duper room",
@ -264,6 +439,7 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# Reflect the state of the room at the time of leaving
self.assertEqual(response_body["rooms"][room_id1]["initial"], True)
self.assertEqual(
response_body["rooms"][room_id1]["name"],
"my super room",
@ -338,6 +514,7 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
# Room1 has a name so we shouldn't see any `heroes` which the client would use
# to calculate the room name themselves.
self.assertEqual(response_body["rooms"][room_id1]["initial"], True)
self.assertEqual(
response_body["rooms"][room_id1]["name"],
"my super room",
@ -354,6 +531,7 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
)
# Room2 doesn't have a name so we should see `heroes` populated
self.assertEqual(response_body["rooms"][room_id2]["initial"], True)
self.assertIsNone(response_body["rooms"][room_id2].get("name"))
self.assertCountEqual(
[
@ -425,6 +603,7 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# Room doesn't have a name so we should see `heroes` populated
self.assertEqual(response_body["rooms"][room_id1]["initial"], True)
self.assertIsNone(response_body["rooms"][room_id1].get("name"))
self.assertCountEqual(
[
@ -497,7 +676,8 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
}
response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# Room2 doesn't have a name so we should see `heroes` populated
# Room doesn't have a name so we should see `heroes` populated
self.assertEqual(response_body["rooms"][room_id1]["initial"], True)
self.assertIsNone(response_body["rooms"][room_id1].get("name"))
self.assertCountEqual(
[
@ -527,6 +707,165 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
0,
)
def test_rooms_meta_heroes_incremental_sync_no_change(self) -> None:
"""
Test that the `rooms` `heroes` aren't included in an incremental sync
response if they haven't changed.
(when the room doesn't have a room name set)
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
user3_id = self.register_user("user3", "pass")
_user3_tok = self.login(user3_id, "pass")
room_id = self.helper.create_room_as(
user2_id,
tok=user2_tok,
extra_content={
# No room name set so that `heroes` is populated
#
# "name": "my super room2",
},
)
self.helper.join(room_id, user1_id, tok=user1_tok)
# User3 is invited
self.helper.invite(room_id, src=user2_id, targ=user3_id, tok=user2_tok)
# Make the Sliding Sync request
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [],
# This needs to be set to one so the `RoomResult` isn't empty and
# the room comes down incremental sync when we send a new message.
"timeline_limit": 1,
}
}
}
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
# Send a message to make the room come down sync
self.helper.send(room_id, "message in room", tok=user2_tok)
# Incremental sync
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
# This is an incremental sync and the second time we have seen this room so it
# isn't `initial`
self.assertNotIn(
"initial",
response_body["rooms"][room_id],
)
# Room shouldn't have a room name because we're testing the `heroes` field which
# only has a chance to appear if the room doesn't have a name.
self.assertNotIn(
"name",
response_body["rooms"][room_id],
)
# No change to heroes
self.assertNotIn(
"heroes",
response_body["rooms"][room_id],
)
# No change to member counts
self.assertNotIn(
"joined_count",
response_body["rooms"][room_id],
)
self.assertNotIn(
"invited_count",
response_body["rooms"][room_id],
)
# We didn't request any state so we shouldn't see any `required_state`
self.assertNotIn(
"required_state",
response_body["rooms"][room_id],
)
def test_rooms_meta_heroes_incremental_sync_with_membership_change(self) -> None:
"""
Test that the `rooms` `heroes` are included in an incremental sync response if
the membership has changed.
(when the room doesn't have a room name set)
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
user3_id = self.register_user("user3", "pass")
user3_tok = self.login(user3_id, "pass")
room_id = self.helper.create_room_as(
user2_id,
tok=user2_tok,
extra_content={
# No room name set so that `heroes` is populated
#
# "name": "my super room2",
},
)
self.helper.join(room_id, user1_id, tok=user1_tok)
# User3 is invited
self.helper.invite(room_id, src=user2_id, targ=user3_id, tok=user2_tok)
# Make the Sliding Sync request
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [],
"timeline_limit": 0,
}
}
}
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
# User3 joins (membership change)
self.helper.join(room_id, user3_id, tok=user3_tok)
# Incremental sync
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
# This is an incremental sync and the second time we have seen this room so it
# isn't `initial`
self.assertNotIn(
"initial",
response_body["rooms"][room_id],
)
# Room shouldn't have a room name because we're testing the `heroes` field which
# only has a chance to appear if the room doesn't have a name.
self.assertNotIn(
"name",
response_body["rooms"][room_id],
)
# Membership change so we should see heroes and membership counts
self.assertCountEqual(
[
hero["user_id"]
for hero in response_body["rooms"][room_id].get("heroes", [])
],
# Heroes shouldn't include the user themselves (we shouldn't see user1)
[user2_id, user3_id],
)
self.assertEqual(
response_body["rooms"][room_id]["joined_count"],
3,
)
self.assertEqual(
response_body["rooms"][room_id]["invited_count"],
0,
)
# We didn't request any state so we shouldn't see any `required_state`
self.assertNotIn(
"required_state",
response_body["rooms"][room_id],
)
def test_rooms_bump_stamp(self) -> None:
"""
Test that `bump_stamp` is present and pointing to relevant events.
@ -743,3 +1082,48 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
response_body, _ = self.do_sync(sync_body, tok=user1_tok)
self.assertGreater(response_body["rooms"][room_id]["bump_stamp"], 0)
def test_rooms_bump_stamp_invites(self) -> None:
"""
Test that `bump_stamp` is present and points to the membership event,
and not later events, for non-joined rooms
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id = self.helper.create_room_as(
user2_id,
tok=user2_tok,
)
# Invite user1 to the room
invite_response = self.helper.invite(room_id, user2_id, user1_id, tok=user2_tok)
# More messages happen after the invite
self.helper.send(room_id, "message in room1", tok=user2_tok)
# We expect the bump_stamp to match the invite.
invite_pos = self.get_success(
self.store.get_position_for_event(invite_response["event_id"])
)
# Doing an SS request should return a `bump_stamp` of the invite event,
# rather than the message that was sent after.
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [],
"timeline_limit": 5,
}
}
}
response_body, _ = self.do_sync(sync_body, tok=user1_tok)
self.assertEqual(
response_body["rooms"][room_id]["bump_stamp"], invite_pos.stream
)
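A hedged sketch of the rule this test pins down (a standalone helper, not the handler's actual code): for rooms the user has not joined, such as invites, the `bump_stamp` should come from the user's own membership event position rather than from later activity in the room.

def bump_stamp_for_room(
    membership: str,
    membership_event_stream: int,
    latest_bump_event_stream: int,
) -> int:
    # Non-joined rooms (e.g. invites) are ordered by the user's own membership
    # event, not by later events they may not even be able to see.
    if membership != "join":
        return membership_event_stream
    return latest_bump_event_stream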

View file

@ -22,7 +22,7 @@ import synapse.rest.admin
from synapse.api.constants import EventTypes
from synapse.rest.client import login, room, sync
from synapse.server import HomeServer
from synapse.types import StreamToken, StrSequence
from synapse.types import StrSequence
from synapse.util import Clock
from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase
@ -149,16 +149,10 @@ class SlidingSyncRoomsTimelineTestCase(SlidingSyncBase):
user2_tok = self.login(user2_id, "pass")
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.send(room_id1, "activity1", tok=user2_tok)
self.helper.send(room_id1, "activity2", tok=user2_tok)
event_response1 = self.helper.send(room_id1, "activity1", tok=user2_tok)
event_response2 = self.helper.send(room_id1, "activity2", tok=user2_tok)
event_response3 = self.helper.send(room_id1, "activity3", tok=user2_tok)
event_pos3 = self.get_success(
self.store.get_position_for_event(event_response3["event_id"])
)
event_response4 = self.helper.send(room_id1, "activity4", tok=user2_tok)
event_pos4 = self.get_success(
self.store.get_position_for_event(event_response4["event_id"])
)
event_response5 = self.helper.send(room_id1, "activity5", tok=user2_tok)
user1_join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
@ -196,27 +190,23 @@ class SlidingSyncRoomsTimelineTestCase(SlidingSyncBase):
)
# Check to make sure the `prev_batch` points at the right place
prev_batch_token = self.get_success(
StreamToken.from_string(
self.store, response_body["rooms"][room_id1]["prev_batch"]
prev_batch_token = response_body["rooms"][room_id1]["prev_batch"]
# If we use the `prev_batch` token to look backwards we should see
# `event3` and older next.
channel = self.make_request(
"GET",
f"/rooms/{room_id1}/messages?from={prev_batch_token}&dir=b&limit=3",
access_token=user1_tok,
)
)
prev_batch_room_stream_token_serialized = self.get_success(
prev_batch_token.room_key.to_string(self.store)
)
# If we use the `prev_batch` token to look backwards, we should see `event3`
# next so make sure the token encompasses it
self.assertEqual(
event_pos3.persisted_after(prev_batch_token.room_key),
False,
f"`prev_batch` token {prev_batch_room_stream_token_serialized} should be >= event_pos3={self.get_success(event_pos3.to_room_stream_token().to_string(self.store))}",
)
# If we use the `prev_batch` token to look backwards, we shouldn't see `event4`
# anymore since it was just returned in this response.
self.assertEqual(
event_pos4.persisted_after(prev_batch_token.room_key),
True,
f"`prev_batch` token {prev_batch_room_stream_token_serialized} should be < event_pos4={self.get_success(event_pos4.to_room_stream_token().to_string(self.store))}",
self.assertEqual(channel.code, 200, channel.json_body)
self.assertListEqual(
[
event_response3["event_id"],
event_response2["event_id"],
event_response1["event_id"],
],
[ev["event_id"] for ev in channel.json_body["chunk"]],
)
# With no `from_token` (initial sync), it's all historical since there is no

View file

@ -106,6 +106,12 @@ class SlidingSyncTablesTestCaseBase(HomeserverTestCase):
assert persist_events_store is not None
self.persist_events_store = persist_events_store
persist_controller = self.hs.get_storage_controllers().persistence
assert persist_controller is not None
self.persist_controller = persist_controller
self.state_handler = self.hs.get_state_handler()
def _get_sliding_sync_joined_rooms(self) -> Dict[str, _SlidingSyncJoinedRoomResult]:
"""
Return the rows from the `sliding_sync_joined_rooms` table.
@ -260,10 +266,8 @@ class SlidingSyncTablesTestCaseBase(HomeserverTestCase):
)
)
context = EventContext.for_outlier(self.hs.get_storage_controllers())
persist_controller = self.hs.get_storage_controllers().persistence
assert persist_controller is not None
persisted_event, _, _ = self.get_success(
persist_controller.persist_event(invite_event, context)
self.persist_controller.persist_event(invite_event, context)
)
self._remote_invite_count += 1
@ -316,10 +320,8 @@ class SlidingSyncTablesTestCaseBase(HomeserverTestCase):
)
)
context = EventContext.for_outlier(self.hs.get_storage_controllers())
persist_controller = self.hs.get_storage_controllers().persistence
assert persist_controller is not None
persisted_event, _, _ = self.get_success(
persist_controller.persist_event(kick_event, context)
self.persist_controller.persist_event(kick_event, context)
)
return persisted_event
@ -926,6 +928,201 @@ class SlidingSyncTablesTestCase(SlidingSyncTablesTestCaseBase):
user2_snapshot,
)
def test_joined_room_bump_stamp_backfill(self) -> None:
"""
Test that `bump_stamp` ignores backfilled events, i.e. events with a
negative stream ordering.
"""
user1_id = self.register_user("user1", "pass")
_user1_tok = self.login(user1_id, "pass")
# Create a remote room
creator = "@user:other"
room_id = "!foo:other"
room_version = RoomVersions.V10
shared_kwargs = {
"room_id": room_id,
"room_version": room_version.identifier,
}
create_tuple = self.get_success(
create_event(
self.hs,
prev_event_ids=[],
type=EventTypes.Create,
state_key="",
content={
# The `ROOM_CREATOR` field could be removed if we used a room
# version > 10 (in favor of relying on `sender`)
EventContentFields.ROOM_CREATOR: creator,
EventContentFields.ROOM_VERSION: room_version.identifier,
},
sender=creator,
**shared_kwargs,
)
)
creator_tuple = self.get_success(
create_event(
self.hs,
prev_event_ids=[create_tuple[0].event_id],
auth_event_ids=[create_tuple[0].event_id],
type=EventTypes.Member,
state_key=creator,
content={"membership": Membership.JOIN},
sender=creator,
**shared_kwargs,
)
)
room_name_tuple = self.get_success(
create_event(
self.hs,
prev_event_ids=[creator_tuple[0].event_id],
auth_event_ids=[create_tuple[0].event_id, creator_tuple[0].event_id],
type=EventTypes.Name,
state_key="",
content={
EventContentFields.ROOM_NAME: "my super duper room",
},
sender=creator,
**shared_kwargs,
)
)
# We add a message event as a valid "bump type"
msg_tuple = self.get_success(
create_event(
self.hs,
prev_event_ids=[room_name_tuple[0].event_id],
auth_event_ids=[create_tuple[0].event_id, creator_tuple[0].event_id],
type=EventTypes.Message,
content={"body": "foo", "msgtype": "m.text"},
sender=creator,
**shared_kwargs,
)
)
invite_tuple = self.get_success(
create_event(
self.hs,
prev_event_ids=[msg_tuple[0].event_id],
auth_event_ids=[create_tuple[0].event_id, creator_tuple[0].event_id],
type=EventTypes.Member,
state_key=user1_id,
content={"membership": Membership.INVITE},
sender=creator,
**shared_kwargs,
)
)
remote_events_and_contexts = [
create_tuple,
creator_tuple,
room_name_tuple,
msg_tuple,
invite_tuple,
]
# Ensure the local HS knows the room version
self.get_success(self.store.store_room(room_id, creator, False, room_version))
# Persist these events as backfilled events.
for event, context in remote_events_and_contexts:
self.get_success(
self.persist_controller.persist_event(event, context, backfilled=True)
)
# Now we join the local user to the room. We want to make this feel as close to
# the real `process_remote_join()` as possible but we'd like to avoid some of
# the auth checks that would be done in the real code.
#
# FIXME: The test was originally written using this less-real
# `persist_event(...)` shortcut but it would be nice to use the real remote join
# process in a `FederatingHomeserverTestCase`.
flawed_join_tuple = self.get_success(
create_event(
self.hs,
prev_event_ids=[invite_tuple[0].event_id],
# This doesn't correctly create an `EventContext` that includes
# both of these state events. I assume it's because we're working on our
# local homeserver which has the remote state set as `outlier`. We have
# to create our own EventContext below to get this right.
auth_event_ids=[create_tuple[0].event_id, invite_tuple[0].event_id],
type=EventTypes.Member,
state_key=user1_id,
content={"membership": Membership.JOIN},
sender=user1_id,
**shared_kwargs,
)
)
# We have to create our own context to get the state set correctly. If we use
# the `EventContext` from the `flawed_join_tuple`, the `current_state_events`
# table will only have the join event in it which should never happen in our
# real server.
join_event = flawed_join_tuple[0]
join_context = self.get_success(
self.state_handler.compute_event_context(
join_event,
state_ids_before_event={
(e.type, e.state_key): e.event_id
for e in [create_tuple[0], invite_tuple[0], room_name_tuple[0]]
},
partial_state=False,
)
)
join_event, _join_event_pos, _room_token = self.get_success(
self.persist_controller.persist_event(join_event, join_context)
)
# Make sure the tables are populated correctly
sliding_sync_joined_rooms_results = self._get_sliding_sync_joined_rooms()
self.assertIncludes(
set(sliding_sync_joined_rooms_results.keys()),
{room_id},
exact=True,
)
self.assertEqual(
sliding_sync_joined_rooms_results[room_id],
_SlidingSyncJoinedRoomResult(
room_id=room_id,
# This should be the last event in the room (the join membership)
event_stream_ordering=join_event.internal_metadata.stream_ordering,
# Since all of the bump events are backfilled, the `bump_stamp` should
# still be `None` (and we will fall back to the user's membership event
# position in the Sliding Sync API).
bump_stamp=None,
room_type=None,
# We still pick up state of the room even if it's backfilled
room_name="my super duper room",
is_encrypted=False,
tombstone_successor_room_id=None,
),
)
sliding_sync_membership_snapshots_results = (
self._get_sliding_sync_membership_snapshots()
)
self.assertIncludes(
set(sliding_sync_membership_snapshots_results.keys()),
{
(room_id, user1_id),
},
exact=True,
)
self.assertEqual(
sliding_sync_membership_snapshots_results.get((room_id, user1_id)),
_SlidingSyncMembershipSnapshotResult(
room_id=room_id,
user_id=user1_id,
sender=user1_id,
membership_event_id=join_event.event_id,
membership=Membership.JOIN,
event_stream_ordering=join_event.internal_metadata.stream_ordering,
has_known_state=True,
room_type=None,
room_name="my super duper room",
is_encrypted=False,
tombstone_successor_room_id=None,
),
)
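A hedged sketch of the rule exercised above (a standalone helper, not the table-population code): backfilled events are persisted with negative stream orderings, so they are skipped when computing `bump_stamp`, leaving it `None` when every candidate event was backfilled.

from typing import Iterable, Optional

def compute_bump_stamp(candidate_stream_orderings: Iterable[int]) -> Optional[int]:
    # Backfilled events get negative stream orderings; ignore them here.
    non_backfilled = [so for so in candidate_stream_orderings if so > 0]
    return max(non_backfilled) if non_backfilled else None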
@parameterized.expand(
# Test both an insert and an upsert into the
# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` to exercise
@ -1036,11 +1233,9 @@ class SlidingSyncTablesTestCase(SlidingSyncTablesTestCaseBase):
context = self.get_success(unpersisted_context.persist(event))
events_to_persist.append((event, context))
persist_controller = self.hs.get_storage_controllers().persistence
assert persist_controller is not None
for event, context in events_to_persist:
self.get_success(
persist_controller.persist_event(
self.persist_controller.persist_event(
event,
context,
)

View file

@ -147,7 +147,7 @@ class PaginationTestCase(HomeserverTestCase):
def _filter_messages(self, filter: JsonDict) -> List[str]:
"""Make a request to /messages with a filter, returns the chunk of events."""
events, next_key = self.get_success(
events, next_key, _ = self.get_success(
self.hs.get_datastores().main.paginate_room_events_by_topological_ordering(
room_id=self.room_id,
from_key=self.from_token.room_key,