Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes

This commit is contained in:
Erik Johnston 2024-07-30 09:31:42 +01:00
commit 9cdfb4e08d
16 changed files with 2701 additions and 1256 deletions

View file

@ -0,0 +1 @@
Track which rooms have been sent to clients in the experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.

1
changelog.d/17476.doc Normal file
View file

@ -0,0 +1 @@
Update the [`allowed_local_3pids`](https://element-hq.github.io/synapse/v1.112/usage/configuration/config_documentation.html#allowed_local_3pids) config option's msisdn address to a working example.

View file

@ -0,0 +1 @@
Add Account Data extension support to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.

1
changelog.d/17479.misc Normal file
View file

@ -0,0 +1 @@
Do not send down empty room entries down experimental sliding sync endpoint.

1
changelog.d/17481.misc Normal file
View file

@ -0,0 +1 @@
Refactor Sliding Sync tests to better utilize the `SlidingSyncBase`.

1
changelog.d/17482.misc Normal file
View file

@ -0,0 +1 @@
Refactor Sliding Sync tests to better utilize the `SlidingSyncBase`.

View file

@ -2386,7 +2386,7 @@ enable_registration_without_verification: true
--- ---
### `registrations_require_3pid` ### `registrations_require_3pid`
If this is set, users must provide all of the specified types of 3PID when registering an account. If this is set, users must provide all of the specified types of [3PID](https://spec.matrix.org/latest/appendices/#3pid-types) when registering an account.
Note that [`enable_registration`](#enable_registration) must also be set to allow account registration. Note that [`enable_registration`](#enable_registration) must also be set to allow account registration.
@ -2411,6 +2411,9 @@ disable_msisdn_registration: true
Mandate that users are only allowed to associate certain formats of Mandate that users are only allowed to associate certain formats of
3PIDs with accounts on this server, as specified by the `medium` and `pattern` sub-options. 3PIDs with accounts on this server, as specified by the `medium` and `pattern` sub-options.
`pattern` is a [Perl-like regular expression](https://docs.python.org/3/library/re.html#module-re).
More information about 3PIDs, allowed `medium` types and their `address` syntax can be found [in the Matrix spec](https://spec.matrix.org/latest/appendices/#3pid-types).
Example configuration: Example configuration:
```yaml ```yaml
@ -2420,7 +2423,7 @@ allowed_local_3pids:
- medium: email - medium: email
pattern: '^[^@]+@vector\.im$' pattern: '^[^@]+@vector\.im$'
- medium: msisdn - medium: msisdn
pattern: '\+44' pattern: '^44\d{10}$'
``` ```
--- ---
### `enable_3pid_lookup` ### `enable_3pid_lookup`

View file

@ -201,8 +201,8 @@ netaddr = ">=0.7.18"
# add a lower bound to the Jinja2 dependency. # add a lower bound to the Jinja2 dependency.
Jinja2 = ">=3.0" Jinja2 = ">=3.0"
bleach = ">=1.4.3" bleach = ">=1.4.3"
# We use `Self`, which were added in `typing-extensions` 4.0. # We use `assert_never`, which were added in `typing-extensions` 4.1.
typing-extensions = ">=4.0" typing-extensions = ">=4.1"
# We enforce that we have a `cryptography` version that bundles an `openssl` # We enforce that we have a `cryptography` version that bundles an `openssl`
# with the latest security patches. # with the latest security patches.
cryptography = ">=3.4.7" cryptography = ">=3.4.7"

View file

@ -18,6 +18,7 @@
# #
# #
import logging import logging
from enum import Enum
from itertools import chain from itertools import chain
from typing import ( from typing import (
TYPE_CHECKING, TYPE_CHECKING,
@ -34,6 +35,7 @@ from typing import (
import attr import attr
from immutabledict import immutabledict from immutabledict import immutabledict
from typing_extensions import assert_never
from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membership from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membership
from synapse.events import EventBase from synapse.events import EventBase
@ -46,11 +48,13 @@ from synapse.storage.roommember import MemberSummary
from synapse.types import ( from synapse.types import (
DeviceListUpdates, DeviceListUpdates,
JsonDict, JsonDict,
JsonMapping,
PersistedEventPosition, PersistedEventPosition,
Requester, Requester,
RoomStreamToken, RoomStreamToken,
SlidingSyncStreamToken, SlidingSyncStreamToken,
StateMap, StateMap,
StrCollection,
StreamKeyType, StreamKeyType,
StreamToken, StreamToken,
UserID, UserID,
@ -357,8 +361,11 @@ class SlidingSyncHandler:
self.event_sources = hs.get_event_sources() self.event_sources = hs.get_event_sources()
self.relations_handler = hs.get_relations_handler() self.relations_handler = hs.get_relations_handler()
self.device_handler = hs.get_device_handler() self.device_handler = hs.get_device_handler()
self.push_rules_handler = hs.get_push_rules_handler()
self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
self.connection_store = SlidingSyncConnectionStore()
async def wait_for_sync_for_user( async def wait_for_sync_for_user(
self, self,
requester: Requester, requester: Requester,
@ -462,6 +469,11 @@ class SlidingSyncHandler:
# See https://github.com/matrix-org/matrix-doc/issues/1144 # See https://github.com/matrix-org/matrix-doc/issues/1144
raise NotImplementedError() raise NotImplementedError()
await self.connection_store.mark_token_seen(
sync_config=sync_config,
from_token=from_token,
)
# Get all of the room IDs that the user should be able to see in the sync # Get all of the room IDs that the user should be able to see in the sync
# response # response
has_lists = sync_config.lists is not None and len(sync_config.lists) > 0 has_lists = sync_config.lists is not None and len(sync_config.lists) > 0
@ -607,11 +619,56 @@ class SlidingSyncHandler:
# Fetch room data # Fetch room data
rooms: Dict[str, SlidingSyncResult.RoomResult] = {} rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
# Filter out rooms that haven't received updates and we've sent down
# previously.
if from_token:
rooms_should_send = set()
# First we check if there are rooms that match a list/room
# subscription and have updates we need to send (i.e. either because
# we haven't sent the room down, or we have but there are missing
# updates).
for room_id in relevant_room_map:
status = await self.connection_store.have_sent_room(
sync_config,
from_token.connection_position,
room_id,
)
if (
# The room was never sent down before so the client needs to know
# about it regardless of any updates.
status.status == HaveSentRoomFlag.NEVER
# `PREVIOUSLY` literally means the "room was sent down before *AND*
# there are updates we haven't sent down" so we already know this
# room has updates.
or status.status == HaveSentRoomFlag.PREVIOUSLY
):
rooms_should_send.add(room_id)
elif status.status == HaveSentRoomFlag.LIVE:
# We know that we've sent all updates up until `from_token`,
# so we just need to check if there have been updates since
# then.
pass
else:
assert_never(status.status)
# We only need to check for new events since any state changes
# will also come down as new events.
rooms_that_have_updates = self.store.get_rooms_that_might_have_updates(
relevant_room_map.keys(), from_token.stream_token.room_key
)
rooms_should_send.update(rooms_that_have_updates)
relevant_room_map = {
room_id: room_sync_config
for room_id, room_sync_config in relevant_room_map.items()
if room_id in rooms_should_send
}
@trace @trace
@tag_args @tag_args
async def handle_room(room_id: str) -> None: async def handle_room(room_id: str) -> None:
room_sync_result = await self.get_room_sync_data( room_sync_result = await self.get_room_sync_data(
user=sync_config.user, sync_config=sync_config,
room_id=room_id, room_id=room_id,
room_sync_config=relevant_room_map[room_id], room_sync_config=relevant_room_map[room_id],
room_membership_for_user_at_to_token=room_membership_for_user_map[ room_membership_for_user_at_to_token=room_membership_for_user_map[
@ -621,22 +678,36 @@ class SlidingSyncHandler:
to_token=to_token, to_token=to_token,
) )
rooms[room_id] = room_sync_result # Filter out empty room results during incremental sync
if room_sync_result or not from_token:
rooms[room_id] = room_sync_result
with start_active_span("sliding_sync.generate_room_entries"): with start_active_span("sliding_sync.generate_room_entries"):
await concurrently_execute(handle_room, relevant_room_map, 10) await concurrently_execute(handle_room, relevant_room_map, 10)
extensions = await self.get_extensions_response( extensions = await self.get_extensions_response(
sync_config=sync_config, sync_config=sync_config,
lists=lists,
from_token=from_token, from_token=from_token,
to_token=to_token, to_token=to_token,
) )
# TODO: Update this when we implement per-connection state if has_lists or has_room_subscriptions:
connection_token = 0 connection_position = await self.connection_store.record_rooms(
sync_config=sync_config,
from_token=from_token,
sent_room_ids=relevant_room_map.keys(),
# TODO: We need to calculate which rooms have had updates since the `from_token` but were not included in the `sent_room_ids`
unsent_room_ids=[],
)
elif from_token:
connection_position = from_token.connection_position
else:
# Initial sync without a `from_token` starts at `0`
connection_position = 0
return SlidingSyncResult( return SlidingSyncResult(
next_pos=SlidingSyncStreamToken(to_token, connection_token), next_pos=SlidingSyncStreamToken(to_token, connection_position),
lists=lists, lists=lists,
rooms=rooms, rooms=rooms,
extensions=extensions, extensions=extensions,
@ -1367,7 +1438,7 @@ class SlidingSyncHandler:
async def get_room_sync_data( async def get_room_sync_data(
self, self,
user: UserID, sync_config: SlidingSyncConfig,
room_id: str, room_id: str,
room_sync_config: RoomSyncConfig, room_sync_config: RoomSyncConfig,
room_membership_for_user_at_to_token: _RoomMembershipForUser, room_membership_for_user_at_to_token: _RoomMembershipForUser,
@ -1389,6 +1460,37 @@ class SlidingSyncHandler:
from_token: The point in the stream to sync from. from_token: The point in the stream to sync from.
to_token: The point in the stream to sync up to. to_token: The point in the stream to sync up to.
""" """
user = sync_config.user
# Determine whether we should limit the timeline to the token range.
#
# We should return historical messages (before token range) in the
# following cases because we want clients to be able to show a basic
# screen of information:
# - Initial sync (because no `from_token` to limit us anyway)
# - When users `newly_joined`
# - For an incremental sync where we haven't sent it down this
# connection before
from_bound = None
initial = True
if from_token and not room_membership_for_user_at_to_token.newly_joined:
room_status = await self.connection_store.have_sent_room(
sync_config=sync_config,
connection_token=from_token.connection_position,
room_id=room_id,
)
if room_status.status == HaveSentRoomFlag.LIVE:
from_bound = from_token.stream_token.room_key
initial = False
elif room_status.status == HaveSentRoomFlag.PREVIOUSLY:
assert room_status.last_token is not None
from_bound = room_status.last_token
initial = False
elif room_status.status == HaveSentRoomFlag.NEVER:
from_bound = None
initial = True
else:
assert_never(room_status.status)
# Assemble the list of timeline events # Assemble the list of timeline events
# #
@ -1415,36 +1517,23 @@ class SlidingSyncHandler:
prev_batch_token = to_token prev_batch_token = to_token
# We're going to paginate backwards from the `to_token` # We're going to paginate backwards from the `to_token`
from_bound = to_token.room_key to_bound = to_token.room_key
# People shouldn't see past their leave/ban event # People shouldn't see past their leave/ban event
if room_membership_for_user_at_to_token.membership in ( if room_membership_for_user_at_to_token.membership in (
Membership.LEAVE, Membership.LEAVE,
Membership.BAN, Membership.BAN,
): ):
from_bound = ( to_bound = (
room_membership_for_user_at_to_token.event_pos.to_room_stream_token() room_membership_for_user_at_to_token.event_pos.to_room_stream_token()
) )
# Determine whether we should limit the timeline to the token range.
#
# We should return historical messages (before token range) in the
# following cases because we want clients to be able to show a basic
# screen of information:
# - Initial sync (because no `from_token` to limit us anyway)
# - When users `newly_joined`
# - TODO: For an incremental sync where we haven't sent it down this
# connection before
to_bound = (
from_token.stream_token.room_key
if from_token is not None
and not room_membership_for_user_at_to_token.newly_joined
else None
)
timeline_events, new_room_key = await self.store.paginate_room_events( timeline_events, new_room_key = await self.store.paginate_room_events(
room_id=room_id, room_id=room_id,
from_key=from_bound, # The bounds are reversed so we can paginate backwards
to_key=to_bound, # (from newer to older events) starting at to_bound.
# This ensures we fill the `limit` with the newest events first,
from_key=to_bound,
to_key=from_bound,
direction=Direction.BACKWARDS, direction=Direction.BACKWARDS,
# We add one so we can determine if there are enough events to saturate # We add one so we can determine if there are enough events to saturate
# the limit or not (see `limited`) # the limit or not (see `limited`)
@ -1561,12 +1650,6 @@ class SlidingSyncHandler:
# indicate to the client that a state reset happened. Perhaps we should indicate # indicate to the client that a state reset happened. Perhaps we should indicate
# this by setting `initial: True` and empty `required_state`. # this by setting `initial: True` and empty `required_state`.
# TODO: Since we can't determine whether we've already sent a room down this
# Sliding Sync connection before (we plan to add this optimization in the
# future), we're always returning the requested room state instead of
# updates.
initial = True
# Check whether the room has a name set # Check whether the room has a name set
name_state_ids = await self.get_current_state_ids_at( name_state_ids = await self.get_current_state_ids_at(
room_id=room_id, room_id=room_id,
@ -1712,9 +1795,22 @@ class SlidingSyncHandler:
to_token=to_token, to_token=to_token,
) )
else: else:
# TODO: Once we can figure out if we've sent a room down this connection before, assert from_bound is not None
# we can return updates instead of the full required state.
raise NotImplementedError() # TODO: Limit the number of state events we're about to send down
# the room, if its too many we should change this to an
# `initial=True`?
deltas = await self.store.get_current_state_deltas_for_room(
room_id=room_id,
from_token=from_bound,
to_token=to_token.room_key,
)
# TODO: Filter room state before fetching events
# TODO: Handle state resets where event_id is None
events = await self.store.get_events(
[d.event_id for d in deltas if d.event_id]
)
room_state = {(s.type, s.state_key): s for s in events.values()}
required_room_state: StateMap[EventBase] = {} required_room_state: StateMap[EventBase] = {}
if required_state_filter != StateFilter.none(): if required_state_filter != StateFilter.none():
@ -1797,6 +1893,7 @@ class SlidingSyncHandler:
async def get_extensions_response( async def get_extensions_response(
self, self,
sync_config: SlidingSyncConfig, sync_config: SlidingSyncConfig,
lists: Dict[str, SlidingSyncResult.SlidingWindowList],
to_token: StreamToken, to_token: StreamToken,
from_token: Optional[SlidingSyncStreamToken], from_token: Optional[SlidingSyncStreamToken],
) -> SlidingSyncResult.Extensions: ) -> SlidingSyncResult.Extensions:
@ -1804,6 +1901,7 @@ class SlidingSyncHandler:
Args: Args:
sync_config: Sync configuration sync_config: Sync configuration
lists: Sliding window API. A map of list key to list results.
to_token: The point in the stream to sync up to. to_token: The point in the stream to sync up to.
from_token: The point in the stream to sync from. from_token: The point in the stream to sync from.
""" """
@ -1828,9 +1926,20 @@ class SlidingSyncHandler:
from_token=from_token, from_token=from_token,
) )
account_data_response = None
if sync_config.extensions.account_data is not None:
account_data_response = await self.get_account_data_extension_response(
sync_config=sync_config,
lists=lists,
account_data_request=sync_config.extensions.account_data,
to_token=to_token,
from_token=from_token,
)
return SlidingSyncResult.Extensions( return SlidingSyncResult.Extensions(
to_device=to_device_response, to_device=to_device_response,
e2ee=e2ee_response, e2ee=e2ee_response,
account_data=account_data_response,
) )
async def get_to_device_extension_response( async def get_to_device_extension_response(
@ -1847,7 +1956,7 @@ class SlidingSyncHandler:
to_token: The point in the stream to sync up to. to_token: The point in the stream to sync up to.
""" """
user_id = sync_config.user.to_string() user_id = sync_config.user.to_string()
device_id = sync_config.device_id device_id = sync_config.requester.device_id
# Skip if the extension is not enabled # Skip if the extension is not enabled
if not to_device_request.enabled: if not to_device_request.enabled:
@ -1923,7 +2032,7 @@ class SlidingSyncHandler:
from_token: The point in the stream to sync from. from_token: The point in the stream to sync from.
""" """
user_id = sync_config.user.to_string() user_id = sync_config.user.to_string()
device_id = sync_config.device_id device_id = sync_config.requester.device_id
# Skip if the extension is not enabled # Skip if the extension is not enabled
if not e2ee_request.enabled: if not e2ee_request.enabled:
@ -1956,3 +2065,357 @@ class SlidingSyncHandler:
device_one_time_keys_count=device_one_time_keys_count, device_one_time_keys_count=device_one_time_keys_count,
device_unused_fallback_key_types=device_unused_fallback_key_types, device_unused_fallback_key_types=device_unused_fallback_key_types,
) )
async def get_account_data_extension_response(
self,
sync_config: SlidingSyncConfig,
lists: Dict[str, SlidingSyncResult.SlidingWindowList],
account_data_request: SlidingSyncConfig.Extensions.AccountDataExtension,
to_token: StreamToken,
from_token: Optional[SlidingSyncStreamToken],
) -> Optional[SlidingSyncResult.Extensions.AccountDataExtension]:
"""Handle Account Data extension (MSC3959)
Args:
sync_config: Sync configuration
lists: Sliding window API. A map of list key to list results.
account_data_request: The account_data extension from the request
to_token: The point in the stream to sync up to.
from_token: The point in the stream to sync from.
"""
user_id = sync_config.user.to_string()
# Skip if the extension is not enabled
if not account_data_request.enabled:
return None
global_account_data_map: Mapping[str, JsonMapping] = {}
if from_token is not None:
global_account_data_map = (
await self.store.get_updated_global_account_data_for_user(
user_id, from_token.stream_token.account_data_key
)
)
have_push_rules_changed = await self.store.have_push_rules_changed_for_user(
user_id, from_token.stream_token.push_rules_key
)
if have_push_rules_changed:
global_account_data_map = dict(global_account_data_map)
global_account_data_map[AccountDataTypes.PUSH_RULES] = (
await self.push_rules_handler.push_rules_for_user(sync_config.user)
)
else:
all_global_account_data = await self.store.get_global_account_data_for_user(
user_id
)
global_account_data_map = dict(all_global_account_data)
global_account_data_map[AccountDataTypes.PUSH_RULES] = (
await self.push_rules_handler.push_rules_for_user(sync_config.user)
)
# We only want to include account data for rooms that are already in the sliding
# sync response AND that were requested in the account data request.
relevant_room_ids: Set[str] = set()
# See what rooms from the room subscriptions we should get account data for
if (
account_data_request.rooms is not None
and sync_config.room_subscriptions is not None
):
actual_room_ids = sync_config.room_subscriptions.keys()
for room_id in account_data_request.rooms:
# A wildcard means we process all rooms from the room subscriptions
if room_id == "*":
relevant_room_ids.update(sync_config.room_subscriptions.keys())
break
if room_id in actual_room_ids:
relevant_room_ids.add(room_id)
# See what rooms from the sliding window lists we should get account data for
if account_data_request.lists is not None:
for list_key in account_data_request.lists:
# Just some typing because we share the variable name in multiple places
actual_list: Optional[SlidingSyncResult.SlidingWindowList] = None
# A wildcard means we process rooms from all lists
if list_key == "*":
for actual_list in lists.values():
# We only expect a single SYNC operation for any list
assert len(actual_list.ops) == 1
sync_op = actual_list.ops[0]
assert sync_op.op == OperationType.SYNC
relevant_room_ids.update(sync_op.room_ids)
break
actual_list = lists.get(list_key)
if actual_list is not None:
# We only expect a single SYNC operation for any list
assert len(actual_list.ops) == 1
sync_op = actual_list.ops[0]
assert sync_op.op == OperationType.SYNC
relevant_room_ids.update(sync_op.room_ids)
# Fetch room account data
account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]] = {}
if len(relevant_room_ids) > 0:
if from_token is not None:
account_data_by_room_map = (
await self.store.get_updated_room_account_data_for_user(
user_id, from_token.stream_token.account_data_key
)
)
else:
account_data_by_room_map = (
await self.store.get_room_account_data_for_user(user_id)
)
# Filter down to the relevant rooms
account_data_by_room_map = {
room_id: account_data_map
for room_id, account_data_map in account_data_by_room_map.items()
if room_id in relevant_room_ids
}
return SlidingSyncResult.Extensions.AccountDataExtension(
global_account_data_map=global_account_data_map,
account_data_by_room_map=account_data_by_room_map,
)
class HaveSentRoomFlag(Enum):
"""Flag for whether we have sent the room down a sliding sync connection.
The valid state changes here are:
NEVER -> LIVE
LIVE -> PREVIOUSLY
PREVIOUSLY -> LIVE
"""
# The room has never been sent down (or we have forgotten we have sent it
# down).
NEVER = 1
# We have previously sent the room down, but there are updates that we
# haven't sent down.
PREVIOUSLY = 2
# We have sent the room down and the client has received all updates.
LIVE = 3
@attr.s(auto_attribs=True, slots=True, frozen=True)
class HaveSentRoom:
"""Whether we have sent the room down a sliding sync connection.
Attributes:
status: Flag of if we have or haven't sent down the room
last_token: If the flag is `PREVIOUSLY` then this is non-null and
contains the last stream token of the last updates we sent down
the room, i.e. we still need to send everything since then to the
client.
"""
status: HaveSentRoomFlag
last_token: Optional[RoomStreamToken]
@staticmethod
def previously(last_token: RoomStreamToken) -> "HaveSentRoom":
"""Constructor for `PREVIOUSLY` flag."""
return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token)
HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None)
HAVE_SENT_ROOM_LIVE = HaveSentRoom(HaveSentRoomFlag.LIVE, None)
@attr.s(auto_attribs=True)
class SlidingSyncConnectionStore:
"""In-memory store of per-connection state, including what rooms we have
previously sent down a sliding sync connection.
Note: This is NOT safe to run in a worker setup because connection positions will
point to different sets of rooms on different workers. e.g. for the same connection,
a connection position of 5 might have totally different states on worker A and
worker B.
One complication that we need to deal with here is needing to handle requests being
resent, i.e. if we sent down a room in a response that the client received, we must
consider the room *not* sent when we get the request again.
This is handled by using an integer "token", which is returned to the client
as part of the sync token. For each connection we store a mapping from
tokens to the room states, and create a new entry when we send down new
rooms.
Note that for any given sliding sync connection we will only store a maximum
of two different tokens: the previous token from the request and a new token
sent in the response. When we receive a request with a given token, we then
clear out all other entries with a different token.
Attributes:
_connections: Mapping from `(user_id, conn_id)` to mapping of `token`
to mapping of room ID to `HaveSentRoom`.
"""
# `(user_id, conn_id)` -> `token` -> `room_id` -> `HaveSentRoom`
_connections: Dict[Tuple[str, str], Dict[int, Dict[str, HaveSentRoom]]] = (
attr.Factory(dict)
)
async def have_sent_room(
self, sync_config: SlidingSyncConfig, connection_token: int, room_id: str
) -> HaveSentRoom:
"""For the given user_id/conn_id/token, return whether we have
previously sent the room down
"""
conn_key = self._get_connection_key(sync_config)
sync_statuses = self._connections.setdefault(conn_key, {})
room_status = sync_statuses.get(connection_token, {}).get(
room_id, HAVE_SENT_ROOM_NEVER
)
return room_status
async def record_rooms(
self,
sync_config: SlidingSyncConfig,
from_token: Optional[SlidingSyncStreamToken],
*,
sent_room_ids: StrCollection,
unsent_room_ids: StrCollection,
) -> int:
"""Record which rooms we have/haven't sent down in a new response
Attributes:
sync_config
from_token: The since token from the request, if any
sent_room_ids: The set of room IDs that we have sent down as
part of this request (only needs to be ones we didn't
previously sent down).
unsent_room_ids: The set of room IDs that have had updates
since the `from_token`, but which were not included in
this request
"""
prev_connection_token = 0
if from_token is not None:
prev_connection_token = from_token.connection_position
# If there are no changes then this is a noop.
if not sent_room_ids and not unsent_room_ids:
return prev_connection_token
conn_key = self._get_connection_key(sync_config)
sync_statuses = self._connections.setdefault(conn_key, {})
# Generate a new token, removing any existing entries in that token
# (which can happen if requests get resent).
new_store_token = prev_connection_token + 1
sync_statuses.pop(new_store_token, None)
# Copy over and update the room mappings.
new_room_statuses = dict(sync_statuses.get(prev_connection_token, {}))
# Whether we have updated the `new_room_statuses`, if we don't by the
# end we can treat this as a noop.
have_updated = False
for room_id in sent_room_ids:
new_room_statuses[room_id] = HAVE_SENT_ROOM_LIVE
have_updated = True
# Whether we add/update the entries for unsent rooms depends on the
# existing entry:
# - LIVE: We have previously sent down everything up to
# `last_room_token, so we update the entry to be `PREVIOUSLY` with
# `last_room_token`.
# - PREVIOUSLY: We have previously sent down everything up to *a*
# given token, so we don't need to update the entry.
# - NEVER: We have never previously sent down the room, and we haven't
# sent anything down this time either so we leave it as NEVER.
# Work out the new state for unsent rooms that were `LIVE`.
if from_token:
new_unsent_state = HaveSentRoom.previously(from_token.stream_token.room_key)
else:
new_unsent_state = HAVE_SENT_ROOM_NEVER
for room_id in unsent_room_ids:
prev_state = new_room_statuses.get(room_id)
if prev_state is not None and prev_state.status == HaveSentRoomFlag.LIVE:
new_room_statuses[room_id] = new_unsent_state
have_updated = True
if not have_updated:
return prev_connection_token
sync_statuses[new_store_token] = new_room_statuses
return new_store_token
async def mark_token_seen(
self,
sync_config: SlidingSyncConfig,
from_token: Optional[SlidingSyncStreamToken],
) -> None:
"""We have received a request with the given token, so we can clear out
any other tokens associated with the connection.
If there is no from token then we have started afresh, and so we delete
all tokens associated with the device.
"""
# Clear out any tokens for the connection that doesn't match the one
# from the request.
conn_key = self._get_connection_key(sync_config)
sync_statuses = self._connections.pop(conn_key, {})
if from_token is None:
return
sync_statuses = {
connection_token: room_statuses
for connection_token, room_statuses in sync_statuses.items()
if connection_token == from_token.connection_position
}
if sync_statuses:
self._connections[conn_key] = sync_statuses
@staticmethod
def _get_connection_key(sync_config: SlidingSyncConfig) -> Tuple[str, str]:
"""Return a unique identifier for this connection.
The first part is simply the user ID.
The second part is generally a combination of device ID and conn_id.
However, both these two are optional (e.g. puppet access tokens don't
have device IDs), so this handles those edge cases.
We use this over the raw `conn_id` to avoid clashes between different
clients that use the same `conn_id`. Imagine a user uses a web client
that uses `conn_id: main_sync_loop` and an Android client that also has
a `conn_id: main_sync_loop`.
"""
user_id = sync_config.user.to_string()
# Only one sliding sync connection is allowed per given conn_id (empty
# or not).
conn_id = sync_config.conn_id or ""
if sync_config.requester.device_id:
return (user_id, f"D/{sync_config.requester.device_id}/{conn_id}")
if sync_config.requester.access_token_id:
# If we don't have a device, then the access token ID should be a
# stable ID.
return (user_id, f"A/{sync_config.requester.access_token_id}/{conn_id}")
# If we have neither then its likely an AS or some weird token. Either
# way we can just fail here.
raise Exception("Cannot use sliding sync with access token type")

View file

@ -881,7 +881,6 @@ class SlidingSyncRestServlet(RestServlet):
) )
user = requester.user user = requester.user
device_id = requester.device_id
timeout = parse_integer(request, "timeout", default=0) timeout = parse_integer(request, "timeout", default=0)
# Position in the stream # Position in the stream
@ -902,11 +901,12 @@ class SlidingSyncRestServlet(RestServlet):
sync_config = SlidingSyncConfig( sync_config = SlidingSyncConfig(
user=user, user=user,
device_id=device_id, requester=requester,
# FIXME: Currently, we're just manually copying the fields from the # FIXME: Currently, we're just manually copying the fields from the
# `SlidingSyncBody` into the config. How can we gurantee into the future # `SlidingSyncBody` into the config. How can we guarantee into the future
# that we don't forget any? I would like something more structured like # that we don't forget any? I would like something more structured like
# `copy_attributes(from=body, to=config)` # `copy_attributes(from=body, to=config)`
conn_id=body.conn_id,
lists=body.lists, lists=body.lists,
room_subscriptions=body.room_subscriptions, room_subscriptions=body.room_subscriptions,
extensions=body.extensions, extensions=body.extensions,
@ -929,7 +929,6 @@ class SlidingSyncRestServlet(RestServlet):
return 200, response_content return 200, response_content
# TODO: Is there a better way to encode things?
async def encode_response( async def encode_response(
self, self,
requester: Requester, requester: Requester,
@ -1117,6 +1116,24 @@ class SlidingSyncRestServlet(RestServlet):
extensions.e2ee.device_list_updates.left extensions.e2ee.device_list_updates.left
) )
if extensions.account_data is not None:
serialized_extensions["account_data"] = {
# Same as the the top-level `account_data.events` field in Sync v2.
"global": [
{"type": account_data_type, "content": content}
for account_data_type, content in extensions.account_data.global_account_data_map.items()
],
# Same as the joined room's account_data field in Sync v2, e.g the path
# `rooms.join["!foo:bar"].account_data.events`.
"rooms": {
room_id: [
{"type": account_data_type, "content": content}
for account_data_type, content in event_map.items()
]
for room_id, event_map in extensions.account_data.account_data_by_room_map.items()
},
}
return serialized_extensions return serialized_extensions

View file

@ -559,6 +559,7 @@ class HomeServer(metaclass=abc.ABCMeta):
def get_sync_handler(self) -> SyncHandler: def get_sync_handler(self) -> SyncHandler:
return SyncHandler(self) return SyncHandler(self)
@cache_in_self
def get_sliding_sync_handler(self) -> SlidingSyncHandler: def get_sliding_sync_handler(self) -> SlidingSyncHandler:
return SlidingSyncHandler(self) return SlidingSyncHandler(self)

View file

@ -26,6 +26,8 @@ import attr
from synapse.storage._base import SQLBaseStore from synapse.storage._base import SQLBaseStore
from synapse.storage.database import LoggingTransaction from synapse.storage.database import LoggingTransaction
from synapse.storage.databases.main.stream import _filter_results_by_stream
from synapse.types import RoomStreamToken
from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.caches.stream_change_cache import StreamChangeCache
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -156,3 +158,38 @@ class StateDeltasStore(SQLBaseStore):
"get_max_stream_id_in_current_state_deltas", "get_max_stream_id_in_current_state_deltas",
self._get_max_stream_id_in_current_state_deltas_txn, self._get_max_stream_id_in_current_state_deltas_txn,
) )
async def get_current_state_deltas_for_room(
self, room_id: str, from_token: RoomStreamToken, to_token: RoomStreamToken
) -> List[StateDelta]:
"""Get the state deltas between two tokens."""
def get_current_state_deltas_for_room_txn(
txn: LoggingTransaction,
) -> List[StateDelta]:
sql = """
SELECT instance_name, stream_id, type, state_key, event_id, prev_event_id
FROM current_state_delta_stream
WHERE room_id = ? AND ? < stream_id AND stream_id <= ?
ORDER BY stream_id ASC
"""
txn.execute(
sql, (room_id, from_token.stream, to_token.get_max_stream_pos())
)
return [
StateDelta(
stream_id=row[1],
room_id=room_id,
event_type=row[2],
state_key=row[3],
event_id=row[4],
prev_event_id=row[5],
)
for row in txn
if _filter_results_by_stream(from_token, to_token, row[0], row[1])
]
return await self.db_pool.runInteraction(
"get_current_state_deltas_for_room", get_current_state_deltas_for_room_txn
)

View file

@ -2104,3 +2104,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
return RoomStreamToken(stream=last_position.stream - 1) return RoomStreamToken(stream=last_position.stream - 1)
return None return None
def get_rooms_that_might_have_updates(
self, room_ids: StrCollection, from_token: RoomStreamToken
) -> StrCollection:
"""Filters given room IDs down to those that might have updates, i.e.
removes rooms that definitely do not have updates.
"""
return self._events_stream_cache.get_entities_changed(
room_ids, from_token.stream
)

View file

@ -35,6 +35,7 @@ from synapse.types import (
DeviceListUpdates, DeviceListUpdates,
JsonDict, JsonDict,
JsonMapping, JsonMapping,
Requester,
SlidingSyncStreamToken, SlidingSyncStreamToken,
StreamToken, StreamToken,
UserID, UserID,
@ -109,7 +110,7 @@ class SlidingSyncConfig(SlidingSyncBody):
""" """
user: UserID user: UserID
device_id: Optional[str] requester: Requester
# Pydantic config # Pydantic config
class Config: class Config:
@ -237,6 +238,17 @@ class SlidingSyncResult:
notification_count: int notification_count: int
highlight_count: int highlight_count: int
def __bool__(self) -> bool:
return (
# If this is the first time the client is seeing the room, we should not filter it out
# under any circumstance.
self.initial
# We need to let the client know if there are any new events
or bool(self.required_state)
or bool(self.timeline_events)
or bool(self.stripped_state)
)
@attr.s(slots=True, frozen=True, auto_attribs=True) @attr.s(slots=True, frozen=True, auto_attribs=True)
class SlidingWindowList: class SlidingWindowList:
""" """
@ -330,11 +342,31 @@ class SlidingSyncResult:
or self.device_unused_fallback_key_types or self.device_unused_fallback_key_types
) )
@attr.s(slots=True, frozen=True, auto_attribs=True)
class AccountDataExtension:
"""The Account Data extension (MSC3959)
Attributes:
global_account_data_map: Mapping from `type` to `content` of global account
data events.
account_data_by_room_map: Mapping from room_id to mapping of `type` to
`content` of room account data events.
"""
global_account_data_map: Mapping[str, JsonMapping]
account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]]
def __bool__(self) -> bool:
return bool(
self.global_account_data_map or self.account_data_by_room_map
)
to_device: Optional[ToDeviceExtension] = None to_device: Optional[ToDeviceExtension] = None
e2ee: Optional[E2eeExtension] = None e2ee: Optional[E2eeExtension] = None
account_data: Optional[AccountDataExtension] = None
def __bool__(self) -> bool: def __bool__(self) -> bool:
return bool(self.to_device or self.e2ee) return bool(self.to_device or self.e2ee or self.account_data)
next_pos: SlidingSyncStreamToken next_pos: SlidingSyncStreamToken
lists: Dict[str, SlidingWindowList] lists: Dict[str, SlidingWindowList]
@ -346,7 +378,11 @@ class SlidingSyncResult:
to tell if the notifier needs to wait for more events when polling for to tell if the notifier needs to wait for more events when polling for
events. events.
""" """
return bool(self.lists or self.rooms or self.extensions) # We don't include `self.lists` here, as a) `lists` is always non-empty even if
# there are no changes, and b) since we're sorting rooms by `stream_ordering` of
# the latest activity, anything that would cause the order to change would end
# up in `self.rooms` and cause us to send down the change.
return bool(self.rooms or self.extensions)
@staticmethod @staticmethod
def empty(next_pos: SlidingSyncStreamToken) -> "SlidingSyncResult": def empty(next_pos: SlidingSyncStreamToken) -> "SlidingSyncResult":

View file

@ -120,6 +120,9 @@ class SlidingSyncBody(RequestBodyModel):
Sliding Sync API request body. Sliding Sync API request body.
Attributes: Attributes:
conn_id: An optional string to identify this connection to the server.
Only one sliding sync connection is allowed per given conn_id (empty
or not).
lists: Sliding window API. A map of list key to list information lists: Sliding window API. A map of list key to list information
(:class:`SlidingSyncList`). Max lists: 100. The list keys should be (:class:`SlidingSyncList`). Max lists: 100. The list keys should be
arbitrary strings which the client is using to refer to the list. Keep this arbitrary strings which the client is using to refer to the list. Keep this
@ -322,8 +325,28 @@ class SlidingSyncBody(RequestBodyModel):
enabled: Optional[StrictBool] = False enabled: Optional[StrictBool] = False
class AccountDataExtension(RequestBodyModel):
"""The Account Data extension (MSC3959)
Attributes:
enabled
lists: List of list keys (from the Sliding Window API) to apply this
extension to.
rooms: List of room IDs (from the Room Subscription API) to apply this
extension to.
"""
enabled: Optional[StrictBool] = False
# Process all lists defined in the Sliding Window API. (This is the default.)
lists: Optional[List[StrictStr]] = ["*"]
# Process all room subscriptions defined in the Room Subscription API. (This is the default.)
rooms: Optional[List[StrictStr]] = ["*"]
to_device: Optional[ToDeviceExtension] = None to_device: Optional[ToDeviceExtension] = None
e2ee: Optional[E2eeExtension] = None e2ee: Optional[E2eeExtension] = None
account_data: Optional[AccountDataExtension] = None
conn_id: Optional[str]
# mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884 # mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884
if TYPE_CHECKING: if TYPE_CHECKING:

File diff suppressed because it is too large Load diff