mirror of
https://github.com/element-hq/synapse.git
synced 2024-12-23 05:00:24 +03:00
c2e5e9e67c
Performance optimization: We can avoid fetching rooms that the user has left themselves (which could be a significant amount), then only add back rooms that the user has `newly_left` (left in the token range of an incremental sync). It's a lot faster to fetch fewer rooms than to fetch them all and throw them away in most cases. Since the user only leaves a room (or is state reset out) once in a blue moon, we can avoid a lot of work. Based on @erikjohnston's branch, erikj/ss_perf --------- Co-authored-by: Erik Johnston <erik@matrix.org>
1467 lines
54 KiB
Python
1467 lines
54 KiB
Python
#
|
|
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
|
#
|
|
# Copyright (C) 2024 New Vector, Ltd
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as
|
|
# published by the Free Software Foundation, either version 3 of the
|
|
# License, or (at your option) any later version.
|
|
#
|
|
# See the GNU Affero General Public License for more details:
|
|
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
|
#
|
|
import logging
|
|
from typing import Any, Dict, Iterable, List, Literal, Optional, Tuple
|
|
from unittest.mock import AsyncMock
|
|
|
|
from parameterized import parameterized, parameterized_class
|
|
from typing_extensions import assert_never
|
|
|
|
from twisted.test.proto_helpers import MemoryReactor
|
|
|
|
import synapse.rest.admin
|
|
from synapse.api.constants import (
|
|
AccountDataTypes,
|
|
EventContentFields,
|
|
EventTypes,
|
|
JoinRules,
|
|
Membership,
|
|
RoomTypes,
|
|
)
|
|
from synapse.api.room_versions import RoomVersions
|
|
from synapse.events import EventBase, StrippedStateEvent, make_event_from_dict
|
|
from synapse.events.snapshot import EventContext
|
|
from synapse.handlers.sliding_sync import StateValues
|
|
from synapse.rest.client import account_data, devices, login, receipts, room, sync
|
|
from synapse.server import HomeServer
|
|
from synapse.types import (
|
|
JsonDict,
|
|
RoomStreamToken,
|
|
SlidingSyncStreamToken,
|
|
StreamKeyType,
|
|
StreamToken,
|
|
)
|
|
from synapse.util import Clock
|
|
from synapse.util.stringutils import random_string
|
|
|
|
from tests import unittest
|
|
from tests.server import TimedOutException
|
|
from tests.test_utils.event_injection import create_event
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class SlidingSyncBase(unittest.HomeserverTestCase):
    """Base class for sliding sync test cases.

    Turns on the `msc3575_enabled` experimental feature in the homeserver config
    and provides helpers for making Sliding Sync requests, setting up DM rooms
    and remote invites, and waking up the notifier.
    """

    # Flag as to whether to use the new sliding sync tables or not
    #
    # FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the
    # foreground update for
    # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by
    # https://github.com/element-hq/synapse/issues/17623)
    use_new_tables: bool = True

    sync_endpoint = "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        """
        Stub out `have_finished_sliding_sync_background_jobs` on the main datastore
        so that it always reports the value of `use_new_tables`.
        """
        # FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the
        # foreground update for
        # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by
        # https://github.com/element-hq/synapse/issues/17623)
        hs.get_datastores().main.have_finished_sliding_sync_background_jobs = AsyncMock(  # type: ignore[method-assign]
            return_value=self.use_new_tables
        )

    def default_config(self) -> JsonDict:
        """Return the default test config with Sliding Sync (MSC3575) enabled."""
        config = super().default_config()
        # Enable sliding sync
        config["experimental_features"] = {"msc3575_enabled": True}
        return config

    def do_sync(
        self, sync_body: JsonDict, *, since: Optional[str] = None, tok: str
    ) -> Tuple[JsonDict, str]:
        """Do a sliding sync request with given body.

        Asserts the request was successful.

        Args:
            sync_body: The full request body to use
            since: Optional since token (sent as the `pos` query parameter)
            tok: Access token to use

        Returns:
            A tuple of the response body and the `pos` field.
        """

        sync_path = self.sync_endpoint
        if since:
            sync_path += f"?pos={since}"

        channel = self.make_request(
            method="POST",
            path=sync_path,
            content=sync_body,
            access_token=tok,
        )
        self.assertEqual(channel.code, 200, channel.json_body)

        return channel.json_body, channel.json_body["pos"]

    def _assertRequiredStateIncludes(
        self,
        actual_required_state: Any,
        expected_state_events: Iterable[EventBase],
        exact: bool = False,
    ) -> None:
        """
        Wrapper around `assertIncludes` to give slightly better looking diff error
        messages that include some context "$event_id (type, state_key)".

        Args:
            actual_required_state: The "required_state" of a room from a Sliding Sync
                request response.
            expected_state_events: The expected state events to be included in the
                `actual_required_state`.
            exact: Whether the actual state should be exactly equal to the expected
                state (no extras).
        """

        # Sanity check the shape of the response before we start indexing into it
        assert isinstance(actual_required_state, list)
        for event in actual_required_state:
            assert isinstance(event, dict)

        self.assertIncludes(
            {
                f'{event["event_id"]} ("{event["type"]}", "{event["state_key"]}")'
                for event in actual_required_state
            },
            {
                f'{event.event_id} ("{event.type}", "{event.state_key}")'
                for event in expected_state_events
            },
            exact=exact,
            # Message to help understand the diff in context
            message=str(actual_required_state),
        )

    def _add_new_dm_to_global_account_data(
        self, source_user_id: str, target_user_id: str, target_room_id: str
    ) -> None:
        """
        Helper to handle inserting a new DM for the source user into global account data
        (handles all of the list merging).

        Args:
            source_user_id: The user ID of the DM mapping we're going to update
            target_user_id: User ID of the person the DM is with
            target_room_id: Room ID of the DM
        """
        store = self.hs.get_datastores().main

        # Get the current DM map
        existing_dm_map = self.get_success(
            store.get_global_account_data_by_type_for_user(
                source_user_id, AccountDataTypes.DIRECT
            )
        )
        # Scrutinize the account data since it has no concrete type. We're just copying
        # everything into a known type. It should be a mapping from user ID to a list of
        # room IDs. Ignore anything else.
        new_dm_map: Dict[str, List[str]] = {}
        if isinstance(existing_dm_map, dict):
            for user_id, room_ids in existing_dm_map.items():
                if not (isinstance(user_id, str) and isinstance(room_ids, list)):
                    continue
                for room_id in room_ids:
                    if isinstance(room_id, str):
                        # Append in place (into our own fresh list) rather than
                        # rebuilding the list for every room ID.
                        new_dm_map.setdefault(user_id, []).append(room_id)

        # Add the new DM to the map
        new_dm_map.setdefault(target_user_id, []).append(target_room_id)

        # Save the DM map to global account data
        self.get_success(
            store.add_account_data_for_user(
                source_user_id,
                AccountDataTypes.DIRECT,
                new_dm_map,
            )
        )

    def _create_dm_room(
        self,
        inviter_user_id: str,
        inviter_tok: str,
        invitee_user_id: str,
        invitee_tok: str,
        should_join_room: bool = True,
    ) -> str:
        """
        Helper to create a DM room as the "inviter" and invite the "invitee" user to the
        room. The "invitee" user also joins the room unless `should_join_room` is
        `False`. The `m.direct` account data will be set for both users either way.

        Args:
            inviter_user_id: User ID of the person creating the room and inviting
            inviter_tok: Access token of the inviter
            invitee_user_id: User ID of the person being invited
            invitee_tok: Access token of the invitee (used to join the room)
            should_join_room: Whether the invitee should accept the invite and join

        Returns:
            The room ID of the new DM room
        """
        # Create a room and send an invite the other user
        room_id = self.helper.create_room_as(
            inviter_user_id,
            is_public=False,
            tok=inviter_tok,
        )
        self.helper.invite(
            room_id,
            src=inviter_user_id,
            targ=invitee_user_id,
            tok=inviter_tok,
            extra_data={"is_direct": True},
        )
        if should_join_room:
            # Person that was invited joins the room
            self.helper.join(room_id, invitee_user_id, tok=invitee_tok)

        # Mimic the client setting the room as a direct message in the global account
        # data for both users.
        self._add_new_dm_to_global_account_data(
            invitee_user_id, inviter_user_id, room_id
        )
        self._add_new_dm_to_global_account_data(
            inviter_user_id, invitee_user_id, room_id
        )

        return room_id

    # Counter used to give every fake remote invite room a distinct room ID
    _remote_invite_count: int = 0

    def _create_remote_invite_room_for_user(
        self,
        invitee_user_id: str,
        unsigned_invite_room_state: Optional[List[StrippedStateEvent]],
    ) -> str:
        """
        Create a fake invite for a remote room and persist it.

        We don't have any state for these kind of rooms and can only rely on the
        stripped state included in the unsigned portion of the invite event to identify
        the room.

        Args:
            invitee_user_id: The person being invited
            unsigned_invite_room_state: List of stripped state events to assist the
                receiver in identifying the room. When `None`, no `unsigned` section
                is added to the invite event at all.

        Returns:
            The room ID of the remote invite room
        """
        store = self.hs.get_datastores().main

        invite_room_id = f"!test_room{self._remote_invite_count}:remote_server"

        invite_event_dict: JsonDict = {
            "room_id": invite_room_id,
            "sender": "@inviter:remote_server",
            "state_key": invitee_user_id,
            "depth": 1,
            "origin_server_ts": 1,
            "type": EventTypes.Member,
            "content": {"membership": Membership.INVITE},
            "auth_events": [],
            "prev_events": [],
        }
        if unsigned_invite_room_state is not None:
            invite_event_dict["unsigned"] = {
                "invite_room_state": [
                    {
                        "type": stripped_event.type,
                        "state_key": stripped_event.state_key,
                        "sender": stripped_event.sender,
                        "content": stripped_event.content,
                    }
                    for stripped_event in unsigned_invite_room_state
                ]
            }

        invite_event = make_event_from_dict(
            invite_event_dict,
            room_version=RoomVersions.V10,
        )
        # Flag the event as an outlier, out-of-band membership before persisting it
        invite_event.internal_metadata.outlier = True
        invite_event.internal_metadata.out_of_band_membership = True

        self.get_success(
            store.maybe_store_room_on_outlier_membership(
                room_id=invite_room_id, room_version=invite_event.room_version
            )
        )
        context = EventContext.for_outlier(self.hs.get_storage_controllers())
        persist_controller = self.hs.get_storage_controllers().persistence
        assert persist_controller is not None
        self.get_success(persist_controller.persist_event(invite_event, context))

        # Make sure the next fake remote room gets a different room ID
        self._remote_invite_count += 1

        return invite_room_id

    def _bump_notifier_wait_for_events(
        self,
        user_id: str,
        wake_stream_key: Literal[
            StreamKeyType.ACCOUNT_DATA,
            StreamKeyType.PRESENCE,
        ],
    ) -> None:
        """
        Wake-up a `notifier.wait_for_events(user_id)` call without affecting the Sliding
        Sync results.

        Args:
            user_id: The user ID to wake up the notifier for
            wake_stream_key: The stream key to wake up. This will create an actual new
                entity in that stream so it's best to choose one that won't affect the
                Sliding Sync results you're testing for. In other words, if you're
                testing account data, choose `StreamKeyType.PRESENCE` instead. We
                support two possible stream keys because you're probably testing one
                or the other so one is always a "safe" option.

                Note that the `StreamKeyType.PRESENCE` option is implemented below by
                having a freshly registered user send a to-device message to
                `user_id`, not by changing anyone's presence.

        Raises:
            AssertionError: If the bump did not trigger the `wait_for_events`
                callback.
        """
        # We're expecting some new activity from this point onwards
        from_token = self.hs.get_event_sources().get_current_token()

        triggered_notifier_wait_for_events = False

        async def _on_new_activity(
            before_token: StreamToken, after_token: StreamToken
        ) -> bool:
            # Record that the notifier called us back
            nonlocal triggered_notifier_wait_for_events
            triggered_notifier_wait_for_events = True
            return True

        notifier = self.hs.get_notifier()

        # Listen for some new activity for the user. We're just trying to confirm that
        # our bump below actually does what we think it does (triggers new activity for
        # the user).
        result_awaitable = notifier.wait_for_events(
            user_id,
            1000,
            _on_new_activity,
            from_token=from_token,
        )

        # Update the account data or send a to-device message so that
        # `notifier.wait_for_events(...)` wakes up. We chose these two options because
        # they're least likely to show up in the Sliding Sync response so it won't
        # affect whether we have results.
        if wake_stream_key == StreamKeyType.ACCOUNT_DATA:
            self.get_success(
                self.hs.get_account_data_handler().add_account_data_for_user(
                    user_id,
                    "org.matrix.foobarbaz",
                    {"foo": "bar"},
                )
            )
        elif wake_stream_key == StreamKeyType.PRESENCE:
            # Random suffix so repeated bumps never collide on the username
            sending_user_id = self.register_user(
                "user_bump_notifier_wait_for_events_" + random_string(10), "pass"
            )
            sending_user_tok = self.login(sending_user_id, "pass")
            test_msg = {"foo": "bar"}
            chan = self.make_request(
                "PUT",
                "/_matrix/client/r0/sendToDevice/m.test/1234",
                content={"messages": {user_id: {"d1": test_msg}}},
                access_token=sending_user_tok,
            )
            self.assertEqual(chan.code, 200, chan.result)
        else:
            assert_never(wake_stream_key)

        # Wait for our notifier result
        self.get_success(result_awaitable)

        if not triggered_notifier_wait_for_events:
            raise AssertionError(
                "Expected `notifier.wait_for_events(...)` to be triggered"
            )
|
|
|
|
|
|
# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the
|
|
# foreground update for
|
|
# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by
|
|
# https://github.com/element-hq/synapse/issues/17623)
|
|
@parameterized_class(
|
|
("use_new_tables",),
|
|
[
|
|
(True,),
|
|
(False,),
|
|
],
|
|
class_name_func=lambda cls,
|
|
num,
|
|
params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}",
|
|
)
|
|
class SlidingSyncTestCase(SlidingSyncBase):
|
|
"""
|
|
Tests regarding MSC3575 Sliding Sync `/sync` endpoint.
|
|
|
|
Please put tests in more specific test files if applicable. This test class is meant
|
|
for generic behavior of the endpoint.
|
|
"""
|
|
|
|
servlets = [
|
|
synapse.rest.admin.register_servlets,
|
|
login.register_servlets,
|
|
room.register_servlets,
|
|
sync.register_servlets,
|
|
devices.register_servlets,
|
|
receipts.register_servlets,
|
|
account_data.register_servlets,
|
|
]
|
|
|
|
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
    """
    Stash the homeserver components the tests use on `self`, then defer to the
    parent class' `prepare`.
    """
    # Handles pulled straight off the homeserver handed to us
    self.store = hs.get_datastores().main
    self.event_sources = hs.get_event_sources()
    self.storage_controllers = hs.get_storage_controllers()
    self.account_data_handler = hs.get_account_data_handler()

    # The persistence controller is optional on the storage controllers, so narrow
    # the type before storing it.
    persist_controller = self.hs.get_storage_controllers().persistence
    assert persist_controller is not None
    self.persistence = persist_controller

    super().prepare(reactor, clock, hs)
|
|
|
|
def test_sync_list(self) -> None:
    """
    A room the user created should be listed in the requested Sliding Sync `lists`
    entry.
    """
    user1_id = self.register_user("user1", "pass")
    user1_tok = self.login(user1_id, "pass")

    room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)

    # Ask for a single list with a window wide enough to cover every room
    foo_list_request = {
        "ranges": [[0, 99]],
        "required_state": [],
        "timeline_limit": 1,
    }
    response_body, _ = self.do_sync(
        {"lists": {"foo-list": foo_list_request}}, tok=user1_tok
    )

    # Only the list we asked for should come back
    returned_lists = response_body["lists"]
    self.assertListEqual(
        list(returned_lists.keys()),
        ["foo-list"],
        returned_lists.keys(),
    )

    # ...and it should contain exactly one SYNC op with our room in it
    foo_list = returned_lists["foo-list"]
    expected_ops = [
        {
            "op": "SYNC",
            "range": [0, 99],
            "room_ids": [room_id],
        }
    ]
    self.assertListEqual(list(foo_list["ops"]), expected_ops, foo_list)
|
|
|
|
def test_wait_for_sync_token(self) -> None:
    """
    A request made with a `pos` token that is ahead of the current stream position
    should wait for the worker to catch up before responding.
    """
    user1_id = self.register_user("user1", "pass")
    user1_tok = self.login(user1_id, "pass")

    # Build a token that points into the future. We enter the `get_next()` context
    # manager to obtain a stream ID but never send an event that reaches that
    # stream_ordering, so the request below has to wait out the full timeout.
    events_id_gen = self.store.get_events_stream_id_generator()
    future_stream_id = self.get_success(events_id_gen.get_next().__aenter__())
    now_token = self.event_sources.get_current_token()
    future_token = now_token.copy_and_replace(
        StreamKeyType.ROOM,
        RoomStreamToken(stream=future_stream_id),
    )
    future_pos = self.get_success(
        SlidingSyncStreamToken(future_token, 0).to_string(self.store)
    )

    # Kick off the Sliding Sync request without waiting for it to finish
    foo_list_request = {
        "ranges": [[0, 99]],
        "required_state": [],
        "timeline_limit": 1,
    }
    request_path = self.sync_endpoint + f"?pos={future_pos}"
    channel = self.make_request(
        "POST",
        request_path,
        content={"lists": {"foo-list": foo_list_request}},
        access_token=user1_tok,
        await_result=False,
    )

    # No response should arrive during the first 9.9 seconds...
    with self.assertRaises(TimedOutException):
        channel.await_result(timeout_ms=9900)
    # ...but it should arrive shortly after that, once waiting for the stream
    # token (`notifier.wait_for_stream_token(from_token)`) has timed out.
    channel.await_result(timeout_ms=200)
    self.assertEqual(channel.code, 200, channel.json_body)

    # Nothing new was found, so the `pos` handed back is the one we sent in.
    self.assertEqual(channel.json_body["pos"], future_pos)
|
|
|
|
def test_wait_for_new_data(self) -> None:
    """
    An incremental Sliding Sync request should block until new data shows up and
    then return it.

    (Only applies to incremental syncs with a `timeout` specified)
    """
    user1_id = self.register_user("user1", "pass")
    user1_tok = self.login(user1_id, "pass")
    user2_id = self.register_user("user2", "pass")
    user2_tok = self.login(user2_id, "pass")

    # user2 owns the room, user1 (the one syncing) joins it
    room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
    self.helper.join(room_id, user1_id, tok=user1_tok)

    foo_list_request = {
        "ranges": [[0, 0]],
        "required_state": [],
        "timeline_limit": 1,
    }
    sync_body = {"lists": {"foo-list": foo_list_request}}
    _, from_token = self.do_sync(sync_body, tok=user1_tok)

    # Start the incremental request with a 10 second timeout, without waiting on it
    request_path = self.sync_endpoint + f"?timeout=10000&pos={from_token}"
    channel = self.make_request(
        "POST",
        request_path,
        content=sync_body,
        access_token=user1_tok,
        await_result=False,
    )
    # Nothing should come back for 5 seconds while we sit in
    # `notifier.wait_for_events(...)`
    with self.assertRaises(TimedOutException):
        channel.await_result(timeout_ms=5000)

    # New activity in the room should wake the request up...
    sent_event = self.helper.send(room_id, "new activity in room", tok=user1_tok)
    # ...so that it responds well before the 10 second timeout
    channel.await_result(timeout_ms=3000)
    self.assertEqual(channel.code, 200, channel.json_body)

    # The timeline should hold the event we just sent and nothing else
    timeline = channel.json_body["rooms"][room_id]["timeline"]
    self.assertEqual(
        [event["event_id"] for event in timeline],
        [sent_event["event_id"]],
        timeline,
    )
|
|
|
|
def test_wait_for_new_data_timeout(self) -> None:
    """
    An incremental Sliding Sync request should wait for new data and, when none
    ever arrives, time out. Also checks that waking the notifier with unrelated
    data is not mistaken for new results.
    """
    user1_id = self.register_user("user1", "pass")
    user1_tok = self.login(user1_id, "pass")
    user2_id = self.register_user("user2", "pass")
    user2_tok = self.login(user2_id, "pass")

    # user2 owns the room, user1 (the one syncing) joins it
    room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
    self.helper.join(room_id, user1_id, tok=user1_tok)

    foo_list_request = {
        "ranges": [[0, 0]],
        "required_state": [],
        "timeline_limit": 1,
    }
    sync_body = {"lists": {"foo-list": foo_list_request}}
    _, from_token = self.do_sync(sync_body, tok=user1_tok)

    # Start the incremental request with a 10 second timeout, without waiting on it
    request_path = self.sync_endpoint + f"?timeout=10000&pos={from_token}"
    channel = self.make_request(
        "POST",
        request_path,
        content=sync_body,
        access_token=user1_tok,
        await_result=False,
    )
    # Nothing should come back for 5 seconds while we sit in
    # `notifier.wait_for_events(...)`
    with self.assertRaises(TimedOutException):
        channel.await_result(timeout_ms=5000)

    # Wake up `notifier.wait_for_events(...)`, which causes us to test
    # `SlidingSyncResult.__bool__` for new results.
    self._bump_notifier_wait_for_events(
        user1_id, wake_stream_key=StreamKeyType.ACCOUNT_DATA
    )
    # The bump alone must not produce a response.
    with self.assertRaises(TimedOutException):
        channel.await_result(timeout_ms=4000)

    # Ride out the remainder of the 10 second timeout
    # (5000 + 4000 + 1200 > 10000)
    channel.await_result(timeout_ms=1200)
    self.assertEqual(channel.code, 200, channel.json_body)

    # There should be no room sent down.
    self.assertFalse(channel.json_body["rooms"])
|
|
|
|
def test_forgotten_up_to_date(self) -> None:
    """
    The `forgotten` status of a room should be reflected straight away: once the
    user forgets a room it drops out of the list.
    """
    user1_id = self.register_user("user1", "pass")
    user1_tok = self.login(user1_id, "pass")
    user2_id = self.register_user("user2", "pass")
    user2_tok = self.login(user2_id, "pass")

    room_id = self.helper.create_room_as(user2_id, tok=user2_tok)

    # user2 bans user1, who was never a member of the room
    self.helper.ban(room_id, src=user2_id, targ=user1_id, tok=user2_tok)

    foo_list_request = {
        "ranges": [[0, 99]],
        "required_state": [],
        "timeline_limit": 0,
        "filters": {},
    }
    sync_body = {"lists": {"foo-list": foo_list_request}}

    # Before forgetting, the room we're banned from is listed
    response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
    listed_room_ids = set(response_body["lists"]["foo-list"]["ops"][0]["room_ids"])
    self.assertIncludes(listed_room_ids, {room_id}, exact=True)

    # User1 forgets the room
    forget_path = f"/_matrix/client/r0/rooms/{room_id}/forget"
    channel = self.make_request(
        "POST",
        forget_path,
        content={},
        access_token=user1_tok,
    )
    self.assertEqual(channel.code, 200, channel.result)

    # After forgetting, the incremental sync lists no rooms at all
    response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
    listed_room_ids = set(response_body["lists"]["foo-list"]["ops"][0]["room_ids"])
    self.assertIncludes(listed_room_ids, set(), exact=True)
|
|
|
|
def test_ignored_user_invites_initial_sync(self) -> None:
    """
    On an initial sync, invites sent by someone on the user's
    `m.ignored_user_list` should be left out.
    """
    user1_id = self.register_user("user1", "pass")
    user1_tok = self.login(user1_id, "pass")
    user2_id = self.register_user("user2", "pass")
    user2_tok = self.login(user2_id, "pass")

    # One room that user1 created, and one that user2 created
    room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
    room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok)

    # user2 invites user1 to their room
    self.helper.invite(room_id2, src=user2_id, targ=user1_id, tok=user2_tok)

    foo_list_request = {
        "ranges": [[0, 99]],
        "required_state": [],
        "timeline_limit": 0,
    }
    sync_body = {"lists": {"foo-list": foo_list_request}}

    # Baseline: before anyone is ignored, the invite room is listed too
    response_body, _ = self.do_sync(sync_body, tok=user1_tok)
    listed_room_ids = set(response_body["lists"]["foo-list"]["ops"][0]["room_ids"])
    self.assertIncludes(listed_room_ids, {room_id1, room_id2}, exact=True)

    # User1 ignores user2
    ignore_path = f"/_matrix/client/v3/user/{user1_id}/account_data/{AccountDataTypes.IGNORED_USER_LIST}"
    channel = self.make_request(
        "PUT",
        ignore_path,
        content={"ignored_users": {user2_id: {}}},
        access_token=user1_tok,
    )
    self.assertEqual(channel.code, 200, channel.result)

    # A fresh initial sync no longer lists the invite from the ignored user
    response_body, _ = self.do_sync(sync_body, tok=user1_tok)
    listed_room_ids = set(response_body["lists"]["foo-list"]["ops"][0]["room_ids"])
    self.assertIncludes(listed_room_ids, {room_id1}, exact=True)
|
|
|
|
def test_ignored_user_invites_incremental_sync(self) -> None:
    """
    On an incremental sync, invites sent by someone on the user's
    `m.ignored_user_list` should be left out.
    """
    user1_id = self.register_user("user1", "pass")
    user1_tok = self.login(user1_id, "pass")
    user2_id = self.register_user("user2", "pass")
    user2_tok = self.login(user2_id, "pass")

    # One room that user1 created, and one that user2 created
    room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
    room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok)

    # User1 ignores user2 up front
    ignore_path = f"/_matrix/client/v3/user/{user1_id}/account_data/{AccountDataTypes.IGNORED_USER_LIST}"
    channel = self.make_request(
        "PUT",
        ignore_path,
        content={"ignored_users": {user2_id: {}}},
        access_token=user1_tok,
    )
    self.assertEqual(channel.code, 200, channel.result)

    foo_list_request = {
        "ranges": [[0, 99]],
        "required_state": [],
        "timeline_limit": 0,
    }
    sync_body = {"lists": {"foo-list": foo_list_request}}

    # Initial sync: user1 only has a membership in their own room so far
    response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
    listed_room_ids = set(response_body["lists"]["foo-list"]["ops"][0]["room_ids"])
    self.assertIncludes(listed_room_ids, {room_id1}, exact=True)

    # The ignored user sends an invite after the initial sync
    self.helper.invite(room_id2, src=user2_id, targ=user1_id, tok=user2_tok)

    # Incremental sync: the invite stays hidden because user2 is ignored
    response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
    listed_room_ids = set(response_body["lists"]["foo-list"]["ops"][0]["room_ids"])
    self.assertIncludes(listed_room_ids, {room_id1}, exact=True)
|
|
|
|
def test_sort_list(self) -> None:
    """
    Rooms in `lists` should be ordered by `stream_ordering` (most recent activity
    first).
    """
    user1_id = self.register_user("user1", "pass")
    user1_tok = self.login(user1_id, "pass")

    room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
    room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
    room_id3 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)

    # Send one message per room; the order of these sends decides the sort order
    self.helper.send(room_id3, "activity in room3", tok=user1_tok)
    self.helper.send(room_id1, "activity in room1", tok=user1_tok)
    self.helper.send(room_id2, "activity in room2", tok=user1_tok)

    def _sync_foo_list(window: List[int]) -> JsonDict:
        # Initial sync asking for `foo-list` over a single range
        body = {
            "lists": {
                "foo-list": {
                    "ranges": [window],
                    "required_state": [],
                    "timeline_limit": 1,
                }
            }
        }
        response, _ = self.do_sync(body, tok=user1_tok)
        return response

    # First, a range that only covers *some* of the rooms
    response_body = _sync_foo_list([0, 1])

    # The list we asked for must be there
    self.assertIncludes(
        response_body["lists"].keys(),
        {"foo-list"},
    )
    # The two most recently active rooms come back, newest first (we only sort
    # when the range doesn't include all of the rooms)
    foo_list = response_body["lists"]["foo-list"]
    expected_ops = [
        {
            "op": "SYNC",
            "range": [0, 1],
            "room_ids": [room_id2, room_id1],
        }
    ]
    self.assertListEqual(list(foo_list["ops"]), expected_ops, foo_list)

    # Next, a range that covers *all* of the rooms
    response_body = _sync_foo_list([0, 99])

    # The list we asked for must be there
    self.assertIncludes(
        response_body["lists"].keys(),
        {"foo-list"},
    )
    # Since the range includes all of the rooms, we don't sort the list
    foo_list = response_body["lists"]["foo-list"]
    self.assertEqual(len(foo_list["ops"]), 1, foo_list)
    sync_op = foo_list["ops"][0]
    self.assertEqual(sync_op["op"], "SYNC")
    self.assertEqual(sync_op["range"], [0, 99])
    # With no sorting applied we can only check membership, not order
    self.assertIncludes(
        set(sync_op["room_ids"]), {room_id1, room_id2, room_id3}, exact=True
    )
|
|
|
|
def test_sliced_windows(self) -> None:
    """
    The `ranges` of a list should be sliced correctly, with both ends of each
    range treated as inclusive.
    """
    user1_id = self.register_user("user1", "pass")
    user1_tok = self.login(user1_id, "pass")

    # The first room is only there so the windows below are smaller than the list
    _room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
    room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
    room_id3 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)

    def _sync_foo_list(window: List[int]) -> JsonDict:
        # Initial sync asking for `foo-list` over a single range
        body = {
            "lists": {
                "foo-list": {
                    "ranges": [window],
                    "required_state": [],
                    "timeline_limit": 1,
                }
            }
        }
        response, _ = self.do_sync(body, tok=user1_tok)
        return response

    # A window of exactly one room
    response_body = _sync_foo_list([0, 0])

    # Only the list we asked for should come back
    returned_lists = response_body["lists"]
    self.assertListEqual(
        list(returned_lists.keys()),
        ["foo-list"],
        returned_lists.keys(),
    )
    # The single slot holds the most recently created room
    foo_list = returned_lists["foo-list"]
    expected_ops = [
        {
            "op": "SYNC",
            "range": [0, 0],
            "room_ids": [room_id3],
        }
    ]
    self.assertListEqual(list(foo_list["ops"]), expected_ops, foo_list)

    # A window covering the first two rooms
    response_body = _sync_foo_list([0, 1])

    # Only the list we asked for should come back
    returned_lists = response_body["lists"]
    self.assertListEqual(
        list(returned_lists.keys()),
        ["foo-list"],
        returned_lists.keys(),
    )
    # Both ends of the range are inclusive, so two rooms come back, newest first
    foo_list = returned_lists["foo-list"]
    expected_ops = [
        {
            "op": "SYNC",
            "range": [0, 1],
            "room_ids": [room_id3, room_id2],
        }
    ]
    self.assertListEqual(list(foo_list["ops"]), expected_ops, foo_list)
|
|
|
|
def test_rooms_with_no_updates_do_not_come_down_incremental_sync(self) -> None:
    """
    Test that rooms with no updates are *not* returned in subsequent incremental
    syncs.
    """

    user1_id = self.register_user("user1", "pass")
    user1_tok = self.login(user1_id, "pass")

    room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)

    sync_body = {
        "lists": {
            "foo-list": {
                "ranges": [[0, 1]],
                "required_state": [],
                "timeline_limit": 0,
            }
        }
    }

    # Initial sync, only needed to get a token to sync from
    _, from_token = self.do_sync(sync_body, tok=user1_tok)

    # Make the incremental Sliding Sync request
    response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)

    # Nothing has happened in the room, so the room should not come down
    # /sync.
    self.assertIsNone(response_body["rooms"].get(room_id1))
|
|
|
|
def test_empty_initial_room_comes_down_sync(self) -> None:
    """
    Test that a room still comes down an initial /sync when the request asks for
    an empty required state and a timeline limit of zero.
    """

    user1_id = self.register_user("user1", "pass")
    user1_tok = self.login(user1_id, "pass")
    room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)

    # Ask for nothing about the room: no state and no timeline events.
    list_config = {
        "ranges": [[0, 1]],
        "required_state": [],
        "timeline_limit": 0,
    }
    sync_body = {"lists": {"foo-list": list_config}}

    # Make the Sliding Sync request
    response_body, _ = self.do_sync(sync_body, tok=user1_tok)

    # The room must be present in the response and flagged as `initial`.
    room_response = response_body["rooms"][room_id1]
    self.assertEqual(room_response["initial"], True)
|
|
|
|
def test_state_reset_room_comes_down_incremental_sync(self) -> None:
    """
    Test that a room that we were state reset out of comes down an incremental
    sync.
    """

    user1_id = self.register_user("user1", "pass")
    user1_tok = self.login(user1_id, "pass")
    user2_id = self.register_user("user2", "pass")
    user2_tok = self.login(user2_id, "pass")

    room_id1 = self.helper.create_room_as(
        user2_id,
        is_public=True,
        tok=user2_tok,
        extra_content={"name": "my super room"},
    )

    # Send an event *before* user1 joins; the state reset below points back to it
    event_id = self.helper.send(room_id1, "test", tok=user2_tok)["event_id"]

    self.helper.join(room_id1, user1_id, tok=user1_tok)

    list_config = {
        "ranges": [[0, 1]],
        # Request all state just to see what we get back when we are state
        # reset out of the room
        "required_state": [[StateValues.WILDCARD, StateValues.WILDCARD]],
        "timeline_limit": 1,
    }
    sync_body = {"lists": {"foo-list": list_config}}

    # Make the initial Sliding Sync request
    response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
    # Make sure we see room1
    self.assertIncludes(set(response_body["rooms"]), {room_id1}, exact=True)
    self.assertEqual(response_body["rooms"][room_id1]["initial"], True)

    # Trigger a state reset: persist a join rules event whose only prev event is
    # the message sent before user1 joined.
    room_version = self.get_success(self.store.get_room_version_id(room_id1))
    join_rule_event, join_rule_context = self.get_success(
        create_event(
            self.hs,
            prev_event_ids=[event_id],
            type=EventTypes.JoinRules,
            state_key="",
            content={"join_rule": JoinRules.INVITE},
            sender=user2_id,
            room_id=room_id1,
            room_version=room_version,
        )
    )
    _, join_rule_event_pos, _ = self.get_success(
        self.persistence.persist_event(join_rule_event, join_rule_context)
    )
    reset_stream_pos = join_rule_event_pos.stream

    # FIXME: We're manually busting the cache since
    # https://github.com/element-hq/synapse/issues/17368 is not solved yet
    self.store._membership_stream_cache.entity_has_changed(
        user1_id, reset_stream_pos
    )

    # Ensure that the state reset worked and only user2 is in the room now
    users_in_room = self.get_success(self.store.get_users_in_room(room_id1))
    self.assertIncludes(set(users_in_room), {user2_id}, exact=True)

    # Snapshot the current state now, before it is changed again below
    state_map_at_reset = self.get_success(
        self.storage_controllers.state.get_current_state(room_id1)
    )

    # Update the state after user1 was state reset out of the room
    self.helper.send_state(
        room_id1,
        EventTypes.Name,
        {EventContentFields.ROOM_NAME: "my super duper room"},
        tok=user2_tok,
    )

    # Make another Sliding Sync request (incremental)
    response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)

    # Expect to see room1 because it is `newly_left` thanks to being state reset
    # out of it since the last time we synced. We need to let the client know
    # that something happened and that they are no longer in the room.
    self.assertIncludes(set(response_body["rooms"]), {room_id1}, exact=True)
    room_response = response_body["rooms"][room_id1]

    # We set `initial=True` to indicate that the client should reset the state
    # they have about the room
    self.assertEqual(room_response["initial"], True)
    # They shouldn't see anything past the state reset: the required state must
    # be exactly the state snapshot taken before the later name change.
    self._assertRequiredStateIncludes(
        room_response["required_state"],
        state_map_at_reset.values(),
        exact=True,
    )
    # The position where the state reset happened
    self.assertEqual(room_response["bump_stamp"], reset_stream_pos, room_response)

    # Other non-important things. We just want to check what these are so we
    # know what happens in a state reset scenario.
    #
    # Room name was set at the time of the state reset so we should still be
    # able to see it.
    self.assertEqual(room_response["name"], "my super room")
    for absent_field in (
        # Could be set but there is no avatar for this room
        "avatar",
        # Could be set but this room isn't marked as a DM
        "is_dm",
        # Empty timeline because we are not in the room at all (they are all
        # being filtered out)
        "timeline",
    ):
        self.assertIsNone(room_response.get(absent_field), room_response)
    # `limited` since we're not providing any timeline events but there are some
    # in the room.
    self.assertEqual(room_response["limited"], True)
    # User is no longer in the room so they can't see this info
    for count_field in ("joined_count", "invited_count"):
        self.assertIsNone(room_response.get(count_field), room_response)
|
|
|
|
def test_state_reset_previously_room_comes_down_incremental_sync_with_filters(
    self,
) -> None:
    """
    Test that a room that we were state reset out of is always sent down,
    regardless of the filters, if it has been sent down the connection before.
    """
    user1_id = self.register_user("user1", "pass")
    user1_tok = self.login(user1_id, "pass")
    user2_id = self.register_user("user2", "pass")
    user2_tok = self.login(user2_id, "pass")

    # Create a space room
    space_room_id = self.helper.create_room_as(
        user2_id,
        tok=user2_tok,
        extra_content={
            "creation_content": {EventContentFields.ROOM_TYPE: RoomTypes.SPACE},
            "name": "my super space",
        },
    )

    # Send an event *before* user1 joins; the state reset below points back to it
    event_id = self.helper.send(space_room_id, "test", tok=user2_tok)["event_id"]

    self.helper.join(space_room_id, user1_id, tok=user1_tok)

    list_config = {
        "ranges": [[0, 1]],
        # Request all state just to see what we get back when we are state
        # reset out of the room
        "required_state": [[StateValues.WILDCARD, StateValues.WILDCARD]],
        "timeline_limit": 1,
        "filters": {"room_types": [RoomTypes.SPACE]},
    }
    sync_body = {"lists": {"foo-list": list_config}}

    # Make the initial Sliding Sync request
    response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
    # Make sure we see the space room
    self.assertIncludes(set(response_body["rooms"]), {space_room_id}, exact=True)
    self.assertEqual(response_body["rooms"][space_room_id]["initial"], True)

    # Trigger a state reset: persist a join rules event whose only prev event is
    # the message sent before user1 joined.
    room_version = self.get_success(self.store.get_room_version_id(space_room_id))
    join_rule_event, join_rule_context = self.get_success(
        create_event(
            self.hs,
            prev_event_ids=[event_id],
            type=EventTypes.JoinRules,
            state_key="",
            content={"join_rule": JoinRules.INVITE},
            sender=user2_id,
            room_id=space_room_id,
            room_version=room_version,
        )
    )
    _, join_rule_event_pos, _ = self.get_success(
        self.persistence.persist_event(join_rule_event, join_rule_context)
    )
    reset_stream_pos = join_rule_event_pos.stream

    # FIXME: We're manually busting the cache since
    # https://github.com/element-hq/synapse/issues/17368 is not solved yet
    self.store._membership_stream_cache.entity_has_changed(
        user1_id, reset_stream_pos
    )

    # Ensure that the state reset worked and only user2 is in the room now
    users_in_room = self.get_success(self.store.get_users_in_room(space_room_id))
    self.assertIncludes(set(users_in_room), {user2_id}, exact=True)

    # Snapshot the current state now, before it is changed again below
    state_map_at_reset = self.get_success(
        self.storage_controllers.state.get_current_state(space_room_id)
    )

    # Update the state after user1 was state reset out of the room
    self.helper.send_state(
        space_room_id,
        EventTypes.Name,
        {EventContentFields.ROOM_NAME: "my super duper space"},
        tok=user2_tok,
    )

    # User2 also leaves the room so the server is no longer participating in the
    # room and we don't have access to current state
    self.helper.leave(space_room_id, user2_id, tok=user2_tok)

    # Make another Sliding Sync request (incremental)
    response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)

    # Expect to see the space room because it is `newly_left` thanks to being
    # state reset out of it since the last time we synced. We need to let the
    # client know that something happened and that they are no longer in the
    # room.
    self.assertIncludes(set(response_body["rooms"]), {space_room_id}, exact=True)
    room_response = response_body["rooms"][space_room_id]

    # We set `initial=True` to indicate that the client should reset the state
    # they have about the room
    self.assertEqual(room_response["initial"], True)
    # They shouldn't see anything past the state reset: the required state must
    # be exactly the state snapshot taken before the later name change.
    self._assertRequiredStateIncludes(
        room_response["required_state"],
        state_map_at_reset.values(),
        exact=True,
    )
    # The position where the state reset happened
    self.assertEqual(room_response["bump_stamp"], reset_stream_pos, room_response)

    # Other non-important things. We just want to check what these are so we
    # know what happens in a state reset scenario.
    #
    # Room name was set at the time of the state reset so we should still be
    # able to see it.
    self.assertEqual(room_response["name"], "my super space")
    for absent_field in (
        # Could be set but there is no avatar for this room
        "avatar",
        # Could be set but this room isn't marked as a DM
        "is_dm",
        # Empty timeline because we are not in the room at all (they are all
        # being filtered out)
        "timeline",
    ):
        self.assertIsNone(room_response.get(absent_field), room_response)
    # `limited` since we're not providing any timeline events but there are some
    # in the room.
    self.assertEqual(room_response["limited"], True)
    # User is no longer in the room so they can't see this info
    for count_field in ("joined_count", "invited_count"):
        self.assertIsNone(room_response.get(count_field), room_response)
|
|
|
|
@parameterized.expand(
    [
        ("server_leaves_room", True),
        ("server_participating_in_room", False),
    ]
)
def test_state_reset_never_room_incremental_sync_with_filters(
    self, test_description: str, server_leaves_room: bool
) -> None:
    """
    Test that a room that we were state reset out of should be sent down if we
    can figure out the state or if it was sent down the connection before.
    """

    def build_sync_body(ranges: List[List[int]]) -> Dict[str, Any]:
        # Both requests in this test differ only in the list range, so build a
        # fresh request body for the given `ranges` each time.
        return {
            "lists": {
                "foo-list": {
                    "ranges": ranges,
                    "required_state": [
                        # Request all state just to see what we get back when
                        # we are state reset out of the room
                        [StateValues.WILDCARD, StateValues.WILDCARD]
                    ],
                    "timeline_limit": 1,
                    "filters": {
                        "room_types": [RoomTypes.SPACE],
                    },
                }
            }
        }

    user1_id = self.register_user("user1", "pass")
    user1_tok = self.login(user1_id, "pass")
    user2_id = self.register_user("user2", "pass")
    user2_tok = self.login(user2_id, "pass")

    # Create a space room
    space_room_id = self.helper.create_room_as(
        user2_id,
        tok=user2_tok,
        extra_content={
            "creation_content": {EventContentFields.ROOM_TYPE: RoomTypes.SPACE},
            "name": "my super space",
        },
    )

    # Create another space room
    space_room_id2 = self.helper.create_room_as(
        user2_id,
        tok=user2_tok,
        extra_content={
            "creation_content": {EventContentFields.ROOM_TYPE: RoomTypes.SPACE},
        },
    )

    # Send an event *before* user1 joins; the state reset below points back to it
    event_id = self.helper.send(space_room_id, "test", tok=user2_tok)["event_id"]

    # User1 joins the rooms
    self.helper.join(space_room_id, user1_id, tok=user1_tok)
    # Join space_room_id2 so that it is at the top of the list
    self.helper.join(space_room_id2, user1_id, tok=user1_tok)

    # Make a Sliding Sync request for only the top room.
    response_body, from_token = self.do_sync(
        build_sync_body([[0, 0]]), tok=user1_tok
    )
    # Make sure we only see space_room_id2
    self.assertIncludes(set(response_body["rooms"]), {space_room_id2}, exact=True)
    self.assertEqual(response_body["rooms"][space_room_id2]["initial"], True)

    # Just create some activity in space_room_id2 so it appears when we
    # incremental sync again
    self.helper.send(space_room_id2, "test", tok=user2_tok)

    # Trigger a state reset: persist a join rules event whose only prev event is
    # the message sent before user1 joined.
    room_version = self.get_success(self.store.get_room_version_id(space_room_id))
    join_rule_event, join_rule_context = self.get_success(
        create_event(
            self.hs,
            prev_event_ids=[event_id],
            type=EventTypes.JoinRules,
            state_key="",
            content={"join_rule": JoinRules.INVITE},
            sender=user2_id,
            room_id=space_room_id,
            room_version=room_version,
        )
    )
    _, join_rule_event_pos, _ = self.get_success(
        self.persistence.persist_event(join_rule_event, join_rule_context)
    )

    # FIXME: We're manually busting the cache since
    # https://github.com/element-hq/synapse/issues/17368 is not solved yet
    self.store._membership_stream_cache.entity_has_changed(
        user1_id, join_rule_event_pos.stream
    )

    # Ensure that the state reset worked and only user2 is in the room now
    users_in_room = self.get_success(self.store.get_users_in_room(space_room_id))
    self.assertIncludes(set(users_in_room), {user2_id}, exact=True)

    # Update the state after user1 was state reset out of the room.
    # This will also bump it to the top of the list.
    self.helper.send_state(
        space_room_id,
        EventTypes.Name,
        {EventContentFields.ROOM_NAME: "my super duper space"},
        tok=user2_tok,
    )

    if server_leaves_room:
        # User2 also leaves the room so the server is no longer participating in
        # the room and we don't have access to current state
        self.helper.leave(space_room_id, user2_id, tok=user2_tok)

    # Make another Sliding Sync request (incremental), this time with the range
    # expanded to include all rooms
    response_body, _ = self.do_sync(
        build_sync_body([[0, 1]]), since=from_token, tok=user1_tok
    )

    if self.use_new_tables and server_leaves_room:
        # We still only expect to see space_room_id2 because even though we were
        # state reset out of space_room_id, it was never sent down the
        # connection before so we don't need to bother the client with it.
        expected_room_ids = {space_room_id2}
    else:
        # Both rooms show up in every other case:
        #  - new tables with the server still participating: we can figure out
        #    the state for the `filters.room_types` if someone is still in the
        #    room (we look at the current state because `room_type` never
        #    changes).
        #  - fallback path: we can actually take the time to figure out the
        #    state for the `filters.room_types` (we look at historical state
        #    for `LEAVE` membership).
        expected_room_ids = {space_room_id, space_room_id2}

    self.assertIncludes(
        set(response_body["rooms"]),
        expected_room_ids,
        exact=True,
    )
|