Sliding Sync: Add Account Data extension (MSC3959) (#17477)
Some checks are pending
Build docker images / build (push) Waiting to run
Deploy the documentation / Calculate variables for GitHub Pages deployment (push) Waiting to run
Deploy the documentation / GitHub Pages (push) Blocked by required conditions
Build release artifacts / Calculate list of debian distros (push) Waiting to run
Build release artifacts / Build .deb packages (push) Blocked by required conditions
Build release artifacts / Build wheels on ${{ matrix.os }} for ${{ matrix.arch }} (x86_64, ${{ startsWith(github.ref, 'refs/pull/') }}, ubuntu-20.04) (push) Waiting to run
Build release artifacts / Build sdist (push) Waiting to run
Build release artifacts / Attach assets to release (push) Blocked by required conditions
Tests / lint-newsfile (push) Waiting to run
Tests / lint-pydantic (push) Blocked by required conditions
Tests / lint-clippy (push) Blocked by required conditions
Tests / lint-clippy-nightly (push) Blocked by required conditions
Tests / lint-rustfmt (push) Blocked by required conditions
Tests / lint-readme (push) Blocked by required conditions
Tests / linting-done (push) Blocked by required conditions
Tests / calculate-test-jobs (push) Blocked by required conditions
Tests / changes (push) Waiting to run
Tests / check-sampleconfig (push) Blocked by required conditions
Tests / check-schema-delta (push) Blocked by required conditions
Tests / check-lockfile (push) Waiting to run
Tests / lint (push) Blocked by required conditions
Tests / Typechecking (push) Blocked by required conditions
Tests / lint-crlf (push) Waiting to run
Tests / trial (push) Blocked by required conditions
Tests / trial-olddeps (push) Blocked by required conditions
Tests / trial-pypy (all, pypy-3.8) (push) Blocked by required conditions
Tests / sytest (push) Blocked by required conditions
Tests / export-data (push) Blocked by required conditions
Tests / portdb (11, 3.8) (push) Blocked by required conditions
Tests / portdb (15, 3.11) (push) Blocked by required conditions
Tests / complement (monolith, Postgres) (push) Blocked by required conditions
Tests / complement (monolith, SQLite) (push) Blocked by required conditions
Tests / complement (workers, Postgres) (push) Blocked by required conditions
Build release artifacts / Build wheels on ${{ matrix.os }} for ${{ matrix.arch }} (aarch64, ${{ startsWith(github.ref, 'refs/pull/') }}, ubuntu-20.04) (push) Waiting to run
Build release artifacts / Build wheels on ${{ matrix.os }} for ${{ matrix.arch }} (x86_64, ${{ startsWith(github.ref, 'refs/pull/') }}, macos-12) (push) Waiting to run
Tests / cargo-test (push) Blocked by required conditions
Tests / cargo-bench (push) Blocked by required conditions
Tests / tests-done (push) Blocked by required conditions

Extensions based on
[MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575):
Sliding Sync
This commit is contained in:
Eric Eastwood 2024-07-24 17:10:38 -05:00 committed by GitHub
parent bdf37ad4c4
commit 729026e604
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 982 additions and 2 deletions

View file

@ -0,0 +1 @@
Add Account Data extension support to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.

View file

@ -46,6 +46,7 @@ from synapse.storage.roommember import MemberSummary
from synapse.types import (
DeviceListUpdates,
JsonDict,
JsonMapping,
PersistedEventPosition,
Requester,
RoomStreamToken,
@ -357,6 +358,7 @@ class SlidingSyncHandler:
self.event_sources = hs.get_event_sources()
self.relations_handler = hs.get_relations_handler()
self.device_handler = hs.get_device_handler()
self.push_rules_handler = hs.get_push_rules_handler()
self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
async def wait_for_sync_for_user(
@ -628,6 +630,7 @@ class SlidingSyncHandler:
extensions = await self.get_extensions_response(
sync_config=sync_config,
lists=lists,
from_token=from_token,
to_token=to_token,
)
@ -1797,6 +1800,7 @@ class SlidingSyncHandler:
async def get_extensions_response(
self,
sync_config: SlidingSyncConfig,
lists: Dict[str, SlidingSyncResult.SlidingWindowList],
to_token: StreamToken,
from_token: Optional[SlidingSyncStreamToken],
) -> SlidingSyncResult.Extensions:
@ -1804,6 +1808,7 @@ class SlidingSyncHandler:
Args:
sync_config: Sync configuration
lists: Sliding window API. A map of list key to list results.
to_token: The point in the stream to sync up to.
from_token: The point in the stream to sync from.
"""
@ -1828,9 +1833,20 @@ class SlidingSyncHandler:
from_token=from_token,
)
account_data_response = None
if sync_config.extensions.account_data is not None:
account_data_response = await self.get_account_data_extension_response(
sync_config=sync_config,
lists=lists,
account_data_request=sync_config.extensions.account_data,
to_token=to_token,
from_token=from_token,
)
return SlidingSyncResult.Extensions(
to_device=to_device_response,
e2ee=e2ee_response,
account_data=account_data_response,
)
async def get_to_device_extension_response(
@ -1956,3 +1972,125 @@ class SlidingSyncHandler:
device_one_time_keys_count=device_one_time_keys_count,
device_unused_fallback_key_types=device_unused_fallback_key_types,
)
async def get_account_data_extension_response(
self,
sync_config: SlidingSyncConfig,
lists: Dict[str, SlidingSyncResult.SlidingWindowList],
account_data_request: SlidingSyncConfig.Extensions.AccountDataExtension,
to_token: StreamToken,
from_token: Optional[SlidingSyncStreamToken],
) -> Optional[SlidingSyncResult.Extensions.AccountDataExtension]:
"""Handle Account Data extension (MSC3959)
Args:
sync_config: Sync configuration
lists: Sliding window API. A map of list key to list results.
account_data_request: The account_data extension from the request
to_token: The point in the stream to sync up to.
from_token: The point in the stream to sync from.
"""
user_id = sync_config.user.to_string()
# Skip if the extension is not enabled
if not account_data_request.enabled:
return None
global_account_data_map: Mapping[str, JsonMapping] = {}
if from_token is not None:
global_account_data_map = (
await self.store.get_updated_global_account_data_for_user(
user_id, from_token.stream_token.account_data_key
)
)
have_push_rules_changed = await self.store.have_push_rules_changed_for_user(
user_id, from_token.stream_token.push_rules_key
)
if have_push_rules_changed:
global_account_data_map = dict(global_account_data_map)
global_account_data_map[AccountDataTypes.PUSH_RULES] = (
await self.push_rules_handler.push_rules_for_user(sync_config.user)
)
else:
all_global_account_data = await self.store.get_global_account_data_for_user(
user_id
)
global_account_data_map = dict(all_global_account_data)
global_account_data_map[AccountDataTypes.PUSH_RULES] = (
await self.push_rules_handler.push_rules_for_user(sync_config.user)
)
# We only want to include account data for rooms that are already in the sliding
# sync response AND that were requested in the account data request.
relevant_room_ids: Set[str] = set()
# See what rooms from the room subscriptions we should get account data for
if (
account_data_request.rooms is not None
and sync_config.room_subscriptions is not None
):
actual_room_ids = sync_config.room_subscriptions.keys()
for room_id in account_data_request.rooms:
# A wildcard means we process all rooms from the room subscriptions
if room_id == "*":
relevant_room_ids.update(sync_config.room_subscriptions.keys())
break
if room_id in actual_room_ids:
relevant_room_ids.add(room_id)
# See what rooms from the sliding window lists we should get account data for
if account_data_request.lists is not None:
for list_key in account_data_request.lists:
# Just some typing because we share the variable name in multiple places
actual_list: Optional[SlidingSyncResult.SlidingWindowList] = None
# A wildcard means we process rooms from all lists
if list_key == "*":
for actual_list in lists.values():
# We only expect a single SYNC operation for any list
assert len(actual_list.ops) == 1
sync_op = actual_list.ops[0]
assert sync_op.op == OperationType.SYNC
relevant_room_ids.update(sync_op.room_ids)
break
actual_list = lists.get(list_key)
if actual_list is not None:
# We only expect a single SYNC operation for any list
assert len(actual_list.ops) == 1
sync_op = actual_list.ops[0]
assert sync_op.op == OperationType.SYNC
relevant_room_ids.update(sync_op.room_ids)
# Fetch room account data
account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]] = {}
if len(relevant_room_ids) > 0:
if from_token is not None:
account_data_by_room_map = (
await self.store.get_updated_room_account_data_for_user(
user_id, from_token.stream_token.account_data_key
)
)
else:
account_data_by_room_map = (
await self.store.get_room_account_data_for_user(user_id)
)
# Filter down to the relevant rooms
account_data_by_room_map = {
room_id: account_data_map
for room_id, account_data_map in account_data_by_room_map.items()
if room_id in relevant_room_ids
}
return SlidingSyncResult.Extensions.AccountDataExtension(
global_account_data_map=global_account_data_map,
account_data_by_room_map=account_data_by_room_map,
)

View file

@ -929,7 +929,6 @@ class SlidingSyncRestServlet(RestServlet):
return 200, response_content
# TODO: Is there a better way to encode things?
async def encode_response(
self,
requester: Requester,
@ -1117,6 +1116,24 @@ class SlidingSyncRestServlet(RestServlet):
extensions.e2ee.device_list_updates.left
)
if extensions.account_data is not None:
serialized_extensions["account_data"] = {
# Same as the the top-level `account_data.events` field in Sync v2.
"global": [
{"type": account_data_type, "content": content}
for account_data_type, content in extensions.account_data.global_account_data_map.items()
],
# Same as the joined room's account_data field in Sync v2, e.g the path
# `rooms.join["!foo:bar"].account_data.events`.
"rooms": {
room_id: [
{"type": account_data_type, "content": content}
for account_data_type, content in event_map.items()
]
for room_id, event_map in extensions.account_data.account_data_by_room_map.items()
},
}
return serialized_extensions

View file

@ -330,11 +330,31 @@ class SlidingSyncResult:
or self.device_unused_fallback_key_types
)
@attr.s(slots=True, frozen=True, auto_attribs=True)
class AccountDataExtension:
"""The Account Data extension (MSC3959)
Attributes:
global_account_data_map: Mapping from `type` to `content` of global account
data events.
account_data_by_room_map: Mapping from room_id to mapping of `type` to
`content` of room account data events.
"""
global_account_data_map: Mapping[str, JsonMapping]
account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]]
def __bool__(self) -> bool:
return bool(
self.global_account_data_map or self.account_data_by_room_map
)
to_device: Optional[ToDeviceExtension] = None
e2ee: Optional[E2eeExtension] = None
account_data: Optional[AccountDataExtension] = None
def __bool__(self) -> bool:
return bool(self.to_device or self.e2ee)
return bool(self.to_device or self.e2ee or self.account_data)
next_pos: SlidingSyncStreamToken
lists: Dict[str, SlidingWindowList]

View file

@ -322,8 +322,26 @@ class SlidingSyncBody(RequestBodyModel):
enabled: Optional[StrictBool] = False
class AccountDataExtension(RequestBodyModel):
"""The Account Data extension (MSC3959)
Attributes:
enabled
lists: List of list keys (from the Sliding Window API) to apply this
extension to.
rooms: List of room IDs (from the Room Subscription API) to apply this
extension to.
"""
enabled: Optional[StrictBool] = False
# Process all lists defined in the Sliding Window API. (This is the default.)
lists: Optional[List[StrictStr]] = ["*"]
# Process all room subscriptions defined in the Room Subscription API. (This is the default.)
rooms: Optional[List[StrictStr]] = ["*"]
to_device: Optional[ToDeviceExtension] = None
e2ee: Optional[E2eeExtension] = None
account_data: Optional[AccountDataExtension] = None
# mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884
if TYPE_CHECKING:

View file

@ -5458,3 +5458,789 @@ class SlidingSyncE2eeExtensionTestCase(SlidingSyncBase):
),
["alg1"],
)
class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
"""Tests for the account_data sliding sync extension"""
servlets = [
synapse.rest.admin.register_servlets,
login.register_servlets,
room.register_servlets,
sync.register_servlets,
sendtodevice.register_servlets,
]
def default_config(self) -> JsonDict:
config = super().default_config()
# Enable sliding sync
config["experimental_features"] = {"msc3575_enabled": True}
return config
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
self.event_sources = hs.get_event_sources()
self.e2e_keys_handler = hs.get_e2e_keys_handler()
self.account_data_handler = hs.get_account_data_handler()
self.notifier = hs.get_notifier()
self.sync_endpoint = (
"/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
)
def _bump_notifier_wait_for_events(self, user_id: str) -> None:
"""
Wake-up a `notifier.wait_for_events(user_id)` call without affecting the Sliding
Sync results.
"""
# We're expecting some new activity from this point onwards
from_token = self.event_sources.get_current_token()
triggered_notifier_wait_for_events = False
async def _on_new_acivity(
before_token: StreamToken, after_token: StreamToken
) -> bool:
nonlocal triggered_notifier_wait_for_events
triggered_notifier_wait_for_events = True
return True
# Listen for some new activity for the user. We're just trying to confirm that
# our bump below actually does what we think it does (triggers new activity for
# the user).
result_awaitable = self.notifier.wait_for_events(
user_id,
1000,
_on_new_acivity,
from_token=from_token,
)
# Send a new To-Device message so that `notifier.wait_for_events(...)` wakes up.
# We're bumping to-device because it won't show up in the Sliding Sync response
# for this extension so it won't affect whether we have results.
sending_user_id = self.register_user(
"user_bump_notifier_wait_for_events", "pass"
)
sending_user_tok = self.login(sending_user_id, "pass")
test_msg = {"foo": "bar"}
chan = self.make_request(
"PUT",
"/_matrix/client/r0/sendToDevice/m.test/1234",
content={"messages": {user_id: {"d1": test_msg}}},
access_token=sending_user_tok,
)
self.assertEqual(chan.code, 200, chan.result)
# Wait for our notifier result
self.get_success(result_awaitable)
if not triggered_notifier_wait_for_events:
raise AssertionError(
"Expected `notifier.wait_for_events(...)` to be triggered"
)
def test_no_data_initial_sync(self) -> None:
"""
Test that enabling the account_data extension works during an intitial sync,
even if there is no-data.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
# Make an initial Sliding Sync request with the account_data extension enabled
sync_body = {
"lists": {},
"extensions": {
"account_data": {
"enabled": True,
}
},
}
response_body, _ = self.do_sync(sync_body, tok=user1_tok)
self.assertIncludes(
{
global_event["type"]
for global_event in response_body["extensions"]["account_data"].get(
"global"
)
},
# Even though we don't have any global account data set, Synapse saves some
# default push rules for us.
{AccountDataTypes.PUSH_RULES},
exact=True,
)
self.assertIncludes(
response_body["extensions"]["account_data"].get("rooms").keys(),
set(),
exact=True,
)
def test_no_data_incremental_sync(self) -> None:
"""
Test that enabling account_data extension works during an incremental sync, even
if there is no-data.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
sync_body = {
"lists": {},
"extensions": {
"account_data": {
"enabled": True,
}
},
}
_, from_token = self.do_sync(sync_body, tok=user1_tok)
# Make an incremental Sliding Sync request with the account_data extension enabled
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
# There has been no account data changes since the `from_token` so we shouldn't
# see any account data here.
self.assertIncludes(
{
global_event["type"]
for global_event in response_body["extensions"]["account_data"].get(
"global"
)
},
set(),
exact=True,
)
self.assertIncludes(
response_body["extensions"]["account_data"].get("rooms").keys(),
set(),
exact=True,
)
def test_global_account_data_initial_sync(self) -> None:
"""
On initial sync, we should return all global account data on initial sync.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
# Update the global account data
self.get_success(
self.account_data_handler.add_account_data_for_user(
user_id=user1_id,
account_data_type="org.matrix.foobarbaz",
content={"foo": "bar"},
)
)
# Make an initial Sliding Sync request with the account_data extension enabled
sync_body = {
"lists": {},
"extensions": {
"account_data": {
"enabled": True,
}
},
}
response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# It should show us all of the global account data
self.assertIncludes(
{
global_event["type"]
for global_event in response_body["extensions"]["account_data"].get(
"global"
)
},
{AccountDataTypes.PUSH_RULES, "org.matrix.foobarbaz"},
exact=True,
)
self.assertIncludes(
response_body["extensions"]["account_data"].get("rooms").keys(),
set(),
exact=True,
)
def test_global_account_data_incremental_sync(self) -> None:
"""
On incremental sync, we should only account data that has changed since the
`from_token`.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
# Add some global account data
self.get_success(
self.account_data_handler.add_account_data_for_user(
user_id=user1_id,
account_data_type="org.matrix.foobarbaz",
content={"foo": "bar"},
)
)
sync_body = {
"lists": {},
"extensions": {
"account_data": {
"enabled": True,
}
},
}
_, from_token = self.do_sync(sync_body, tok=user1_tok)
# Add some other global account data
self.get_success(
self.account_data_handler.add_account_data_for_user(
user_id=user1_id,
account_data_type="org.matrix.doodardaz",
content={"doo": "dar"},
)
)
# Make an incremental Sliding Sync request with the account_data extension enabled
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
self.assertIncludes(
{
global_event["type"]
for global_event in response_body["extensions"]["account_data"].get(
"global"
)
},
# We should only see the new global account data that happened after the `from_token`
{"org.matrix.doodardaz"},
exact=True,
)
self.assertIncludes(
response_body["extensions"]["account_data"].get("rooms").keys(),
set(),
exact=True,
)
def test_room_account_data_initial_sync(self) -> None:
"""
On initial sync, we return all account data for a given room but only for
rooms that we request and are being returned in the Sliding Sync response.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
# Create a room and add some room account data
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
self.get_success(
self.account_data_handler.add_account_data_to_room(
user_id=user1_id,
room_id=room_id1,
account_data_type="org.matrix.roorarraz",
content={"roo": "rar"},
)
)
# Create another room with some room account data
room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
self.get_success(
self.account_data_handler.add_account_data_to_room(
user_id=user1_id,
room_id=room_id2,
account_data_type="org.matrix.roorarraz",
content={"roo": "rar"},
)
)
# Make an initial Sliding Sync request with the account_data extension enabled
sync_body = {
"lists": {},
"room_subscriptions": {
room_id1: {
"required_state": [],
"timeline_limit": 0,
}
},
"extensions": {
"account_data": {
"enabled": True,
"rooms": [room_id1, room_id2],
}
},
}
response_body, _ = self.do_sync(sync_body, tok=user1_tok)
self.assertIsNotNone(response_body["extensions"]["account_data"].get("global"))
# Even though we requested room2, we only expect room1 to show up because that's
# the only room in the Sliding Sync response (room2 is not one of our room
# subscriptions or in a sliding window list).
self.assertIncludes(
response_body["extensions"]["account_data"].get("rooms").keys(),
{room_id1},
exact=True,
)
self.assertIncludes(
{
event["type"]
for event in response_body["extensions"]["account_data"]
.get("rooms")
.get(room_id1)
},
{"org.matrix.roorarraz"},
exact=True,
)
def test_room_account_data_incremental_sync(self) -> None:
"""
On incremental sync, we return all account data for a given room but only for
rooms that we request and are being returned in the Sliding Sync response.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
# Create a room and add some room account data
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
self.get_success(
self.account_data_handler.add_account_data_to_room(
user_id=user1_id,
room_id=room_id1,
account_data_type="org.matrix.roorarraz",
content={"roo": "rar"},
)
)
# Create another room with some room account data
room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
self.get_success(
self.account_data_handler.add_account_data_to_room(
user_id=user1_id,
room_id=room_id2,
account_data_type="org.matrix.roorarraz",
content={"roo": "rar"},
)
)
sync_body = {
"lists": {},
"room_subscriptions": {
room_id1: {
"required_state": [],
"timeline_limit": 0,
}
},
"extensions": {
"account_data": {
"enabled": True,
"rooms": [room_id1, room_id2],
}
},
}
_, from_token = self.do_sync(sync_body, tok=user1_tok)
# Add some other room account data
self.get_success(
self.account_data_handler.add_account_data_to_room(
user_id=user1_id,
room_id=room_id1,
account_data_type="org.matrix.roorarraz2",
content={"roo": "rar"},
)
)
self.get_success(
self.account_data_handler.add_account_data_to_room(
user_id=user1_id,
room_id=room_id2,
account_data_type="org.matrix.roorarraz2",
content={"roo": "rar"},
)
)
# Make an incremental Sliding Sync request with the account_data extension enabled
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
self.assertIsNotNone(response_body["extensions"]["account_data"].get("global"))
# Even though we requested room2, we only expect room1 to show up because that's
# the only room in the Sliding Sync response (room2 is not one of our room
# subscriptions or in a sliding window list).
self.assertIncludes(
response_body["extensions"]["account_data"].get("rooms").keys(),
{room_id1},
exact=True,
)
# We should only see the new room account data that happened after the `from_token`
self.assertIncludes(
{
event["type"]
for event in response_body["extensions"]["account_data"]
.get("rooms")
.get(room_id1)
},
{"org.matrix.roorarraz2"},
exact=True,
)
def test_room_account_data_relevant_rooms(self) -> None:
"""
Test out different variations of `lists`/`rooms` we are requesting account data for.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
# Create a room and add some room account data
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
self.get_success(
self.account_data_handler.add_account_data_to_room(
user_id=user1_id,
room_id=room_id1,
account_data_type="org.matrix.roorarraz",
content={"roo": "rar"},
)
)
# Create another room with some room account data
room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
self.get_success(
self.account_data_handler.add_account_data_to_room(
user_id=user1_id,
room_id=room_id2,
account_data_type="org.matrix.roorarraz",
content={"roo": "rar"},
)
)
# Create another room with some room account data
room_id3 = self.helper.create_room_as(user1_id, tok=user1_tok)
self.get_success(
self.account_data_handler.add_account_data_to_room(
user_id=user1_id,
room_id=room_id3,
account_data_type="org.matrix.roorarraz",
content={"roo": "rar"},
)
)
# Create another room with some room account data
room_id4 = self.helper.create_room_as(user1_id, tok=user1_tok)
self.get_success(
self.account_data_handler.add_account_data_to_room(
user_id=user1_id,
room_id=room_id4,
account_data_type="org.matrix.roorarraz",
content={"roo": "rar"},
)
)
# Create another room with some room account data
room_id5 = self.helper.create_room_as(user1_id, tok=user1_tok)
self.get_success(
self.account_data_handler.add_account_data_to_room(
user_id=user1_id,
room_id=room_id5,
account_data_type="org.matrix.roorarraz",
content={"roo": "rar"},
)
)
room_id_to_human_name_map = {
room_id1: "room1",
room_id2: "room2",
room_id3: "room3",
room_id4: "room4",
room_id5: "room5",
}
# Mix lists and rooms
sync_body = {
"lists": {
# We expect this list range to include room5 and room4
"foo-list": {
"ranges": [[0, 1]],
"required_state": [],
"timeline_limit": 0,
},
# We expect this list range to include room5, room4, room3
"bar-list": {
"ranges": [[0, 2]],
"required_state": [],
"timeline_limit": 0,
},
},
"room_subscriptions": {
room_id1: {
"required_state": [],
"timeline_limit": 0,
}
},
"extensions": {
"account_data": {
"enabled": True,
"lists": ["foo-list", "non-existent-list"],
"rooms": [room_id1, room_id2, "!non-existent-room"],
}
},
}
response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# room1: ✅ Requested via `rooms` and a room subscription exists
# room2: ❌ Requested via `rooms` but not in the response (from lists or room subscriptions)
# room3: ❌ Not requested
# room4: ✅ Shows up because requested via `lists` and list exists in the response
# room5: ✅ Shows up because requested via `lists` and list exists in the response
self.assertIncludes(
{
room_id_to_human_name_map[room_id]
for room_id in response_body["extensions"]["account_data"]
.get("rooms")
.keys()
},
{"room1", "room4", "room5"},
exact=True,
)
# Try wildcards (this is the default)
sync_body = {
"lists": {
# We expect this list range to include room5 and room4
"foo-list": {
"ranges": [[0, 1]],
"required_state": [],
"timeline_limit": 0,
},
# We expect this list range to include room5, room4, room3
"bar-list": {
"ranges": [[0, 2]],
"required_state": [],
"timeline_limit": 0,
},
},
"room_subscriptions": {
room_id1: {
"required_state": [],
"timeline_limit": 0,
}
},
"extensions": {
"account_data": {
"enabled": True,
# "lists": ["*"],
# "rooms": ["*"],
}
},
}
response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# room1: ✅ Shows up because of default `rooms` wildcard and is in one of the room subscriptions
# room2: ❌ Not requested
# room3: ✅ Shows up because of default `lists` wildcard and is in a list
# room4: ✅ Shows up because of default `lists` wildcard and is in a list
# room5: ✅ Shows up because of default `lists` wildcard and is in a list
self.assertIncludes(
{
room_id_to_human_name_map[room_id]
for room_id in response_body["extensions"]["account_data"]
.get("rooms")
.keys()
},
{"room1", "room3", "room4", "room5"},
exact=True,
)
# Empty list will return nothing
sync_body = {
"lists": {
# We expect this list range to include room5 and room4
"foo-list": {
"ranges": [[0, 1]],
"required_state": [],
"timeline_limit": 0,
},
# We expect this list range to include room5, room4, room3
"bar-list": {
"ranges": [[0, 2]],
"required_state": [],
"timeline_limit": 0,
},
},
"room_subscriptions": {
room_id1: {
"required_state": [],
"timeline_limit": 0,
}
},
"extensions": {
"account_data": {
"enabled": True,
"lists": [],
"rooms": [],
}
},
}
response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# room1: ❌ Not requested
# room2: ❌ Not requested
# room3: ❌ Not requested
# room4: ❌ Not requested
# room5: ❌ Not requested
self.assertIncludes(
{
room_id_to_human_name_map[room_id]
for room_id in response_body["extensions"]["account_data"]
.get("rooms")
.keys()
},
set(),
exact=True,
)
# Try wildcard and none
sync_body = {
"lists": {
# We expect this list range to include room5 and room4
"foo-list": {
"ranges": [[0, 1]],
"required_state": [],
"timeline_limit": 0,
},
# We expect this list range to include room5, room4, room3
"bar-list": {
"ranges": [[0, 2]],
"required_state": [],
"timeline_limit": 0,
},
},
"room_subscriptions": {
room_id1: {
"required_state": [],
"timeline_limit": 0,
}
},
"extensions": {
"account_data": {
"enabled": True,
"lists": ["*"],
"rooms": [],
}
},
}
response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# room1: ❌ Not requested
# room2: ❌ Not requested
# room3: ✅ Shows up because of default `lists` wildcard and is in a list
# room4: ✅ Shows up because of default `lists` wildcard and is in a list
# room5: ✅ Shows up because of default `lists` wildcard and is in a list
self.assertIncludes(
{
room_id_to_human_name_map[room_id]
for room_id in response_body["extensions"]["account_data"]
.get("rooms")
.keys()
},
{"room3", "room4", "room5"},
exact=True,
)
def test_wait_for_new_data(self) -> None:
"""
Test to make sure that the Sliding Sync request waits for new data to arrive.
(Only applies to incremental syncs with a `timeout` specified)
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id, user1_id, tok=user1_tok)
sync_body = {
"lists": {},
"extensions": {
"account_data": {
"enabled": True,
}
},
}
_, from_token = self.do_sync(sync_body, tok=user1_tok)
# Make an incremental Sliding Sync request with the account_data extension enabled
channel = self.make_request(
"POST",
self.sync_endpoint + f"?timeout=10000&pos={from_token}",
content=sync_body,
access_token=user1_tok,
await_result=False,
)
# Block for 5 seconds to make sure we are `notifier.wait_for_events(...)`
with self.assertRaises(TimedOutException):
channel.await_result(timeout_ms=5000)
# Bump the global account data to trigger new results
self.get_success(
self.account_data_handler.add_account_data_for_user(
user1_id,
"org.matrix.foobarbaz",
{"foo": "bar"},
)
)
# Should respond before the 10 second timeout
channel.await_result(timeout_ms=3000)
self.assertEqual(channel.code, 200, channel.json_body)
# We should see the global account data update
self.assertIncludes(
{
global_event["type"]
for global_event in channel.json_body["extensions"]["account_data"].get(
"global"
)
},
{"org.matrix.foobarbaz"},
exact=True,
)
self.assertIncludes(
channel.json_body["extensions"]["account_data"].get("rooms").keys(),
set(),
exact=True,
)
def test_wait_for_new_data_timeout(self) -> None:
"""
Test to make sure that the Sliding Sync request waits for new data to arrive but
no data ever arrives so we timeout. We're also making sure that the default data
from the account_data extension doesn't trigger a false-positive for new data.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
sync_body = {
"lists": {},
"extensions": {
"account_data": {
"enabled": True,
}
},
}
_, from_token = self.do_sync(sync_body, tok=user1_tok)
# Make the Sliding Sync request
channel = self.make_request(
"POST",
self.sync_endpoint + f"?timeout=10000&pos={from_token}",
content=sync_body,
access_token=user1_tok,
await_result=False,
)
# Block for 5 seconds to make sure we are `notifier.wait_for_events(...)`
with self.assertRaises(TimedOutException):
channel.await_result(timeout_ms=5000)
# Wake-up `notifier.wait_for_events(...)` that will cause us test
# `SlidingSyncResult.__bool__` for new results.
self._bump_notifier_wait_for_events(user1_id)
# Block for a little bit more to ensure we don't see any new results.
with self.assertRaises(TimedOutException):
channel.await_result(timeout_ms=4000)
# Wait for the sync to complete (wait for the rest of the 10 second timeout,
# 5000 + 4000 + 1200 > 10000)
channel.await_result(timeout_ms=1200)
self.assertEqual(channel.code, 200, channel.json_body)
self.assertIsNotNone(
channel.json_body["extensions"]["account_data"].get("global")
)
self.assertIsNotNone(
channel.json_body["extensions"]["account_data"].get("rooms")
)