Sliding Sync: Speed up incremental sync by avoiding extra work (#17665)

Speed up incremental sync by avoiding extra work. We first look at the
state delta changes and only fetch and calculate further derived things
if they have changed.
This commit is contained in:
Eric Eastwood 2024-09-09 04:36:22 -05:00 committed by GitHub
parent e5d07bb083
commit 5389374ef8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 472 additions and 47 deletions

1
changelog.d/17665.misc Normal file
View file

@ -0,0 +1 @@
Speed up incremental Sliding Sync requests by avoiding extra work.

View file

@ -44,6 +44,7 @@ from synapse.storage.roommember import (
)
from synapse.types import (
JsonDict,
MutableStateMap,
PersistedEventPosition,
Requester,
RoomStreamToken,
@ -753,26 +754,78 @@ class SlidingSyncHandler:
# indicate to the client that a state reset happened. Perhaps we should indicate
# this by setting `initial: True` and empty `required_state`.
# Check whether the room has a name set
name_state_ids = await self.get_current_state_ids_at(
room_id=room_id,
room_membership_for_user_at_to_token=room_membership_for_user_at_to_token,
state_filter=StateFilter.from_types([(EventTypes.Name, "")]),
to_token=to_token,
)
name_event_id = name_state_ids.get((EventTypes.Name, ""))
room_membership_summary: Mapping[str, MemberSummary]
empty_membership_summary = MemberSummary([], 0)
if room_membership_for_user_at_to_token.membership in (
Membership.LEAVE,
Membership.BAN,
):
# TODO: Figure out how to get the membership summary for left/banned rooms
room_membership_summary = {}
# Get the changes to current state in the token range from the
# `current_state_delta_stream` table.
#
# For incremental syncs, we can do this first to determine if something relevant
# has changed and strategically avoid fetching other costly things.
room_state_delta_id_map: MutableStateMap[str] = {}
name_event_id: Optional[str] = None
membership_changed = False
name_changed = False
avatar_changed = False
if initial:
# Check whether the room has a name set
name_state_ids = await self.get_current_state_ids_at(
room_id=room_id,
room_membership_for_user_at_to_token=room_membership_for_user_at_to_token,
state_filter=StateFilter.from_types([(EventTypes.Name, "")]),
to_token=to_token,
)
name_event_id = name_state_ids.get((EventTypes.Name, ""))
else:
room_membership_summary = await self.store.get_room_summary(room_id)
# TODO: Reverse/rewind back to the `to_token`
assert from_bound is not None
# TODO: Limit the number of state events we're about to send down
# the room, if its too many we should change this to an
# `initial=True`?
deltas = await self.store.get_current_state_deltas_for_room(
room_id=room_id,
from_token=from_bound,
to_token=to_token.room_key,
)
for delta in deltas:
# TODO: Handle state resets where event_id is None
if delta.event_id is not None:
room_state_delta_id_map[(delta.event_type, delta.state_key)] = (
delta.event_id
)
if delta.event_type == EventTypes.Member:
membership_changed = True
elif delta.event_type == EventTypes.Name and delta.state_key == "":
name_changed = True
elif (
delta.event_type == EventTypes.RoomAvatar and delta.state_key == ""
):
avatar_changed = True
room_membership_summary: Optional[Mapping[str, MemberSummary]] = None
empty_membership_summary = MemberSummary([], 0)
# We need the room summary for:
# - Always for initial syncs (or the first time we send down the room)
# - When the room has no name, we need `heroes`
# - When the membership has changed so we need to give updated `heroes` and
# `joined_count`/`invited_count`.
#
# Ideally, instead of just looking at `name_changed`, we'd check if the room
# name is not set but this is a good enough approximation that saves us from
# having to pull out the full event. This just means, we're generating the
# summary whenever the room name changes instead of only when it changes to
# `None`.
if initial or name_changed or membership_changed:
# We can't trace the function directly because it's cached and the `@cached`
# decorator doesn't mix with `@trace` yet.
with start_active_span("get_room_summary"):
if room_membership_for_user_at_to_token.membership in (
Membership.LEAVE,
Membership.BAN,
):
# TODO: Figure out how to get the membership summary for left/banned rooms
room_membership_summary = {}
else:
room_membership_summary = await self.store.get_room_summary(room_id)
# TODO: Reverse/rewind back to the `to_token`
# `heroes` are required if the room name is not set.
#
@ -786,7 +839,12 @@ class SlidingSyncHandler:
# TODO: Should we also check for `EventTypes.CanonicalAlias`
# (`m.room.canonical_alias`) as a fallback for the room name? see
# https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1671260153
if name_event_id is None:
#
# We need to fetch the `heroes` if the room name is not set. But we only need to
# get them on initial syncs (or the first time we send down the room) or if the
# membership has changed which may change the heroes.
if name_event_id is None and (initial or (not initial and membership_changed)):
assert room_membership_summary is not None
hero_user_ids = extract_heroes_from_room_summary(
room_membership_summary, me=user.to_string()
)
@ -904,9 +962,15 @@ class SlidingSyncHandler:
# We need this base set of info for the response so let's just fetch it along
# with the `required_state` for the room
meta_room_state = [(EventTypes.Name, ""), (EventTypes.RoomAvatar, "")] + [
hero_room_state = [
(EventTypes.Member, hero_user_id) for hero_user_id in hero_user_ids
]
meta_room_state = list(hero_room_state)
if initial or name_changed:
meta_room_state.append((EventTypes.Name, ""))
if initial or avatar_changed:
meta_room_state.append((EventTypes.RoomAvatar, ""))
state_filter = StateFilter.all()
if required_state_filter != StateFilter.all():
state_filter = StateFilter(
@ -929,21 +993,22 @@ class SlidingSyncHandler:
else:
assert from_bound is not None
# TODO: Limit the number of state events we're about to send down
# the room, if its too many we should change this to an
# `initial=True`?
deltas = await self.store.get_current_state_deltas_for_room(
room_id=room_id,
from_token=from_bound,
to_token=to_token.room_key,
)
# TODO: Filter room state before fetching events
# TODO: Handle state resets where event_id is None
events = await self.store.get_events(
[d.event_id for d in deltas if d.event_id]
state_filter.filter_state(room_state_delta_id_map).values()
)
room_state = {(s.type, s.state_key): s for s in events.values()}
# If the membership changed and we have to get heroes, get the remaining
# heroes from the state
if hero_user_ids:
hero_membership_state = await self.get_current_state_at(
room_id=room_id,
room_membership_for_user_at_to_token=room_membership_for_user_at_to_token,
state_filter=StateFilter.from_types(hero_room_state),
to_token=to_token,
)
room_state.update(hero_membership_state)
required_room_state: StateMap[EventBase] = {}
if required_state_filter != StateFilter.none():
required_room_state = required_state_filter.filter_state(room_state)
@ -1050,6 +1115,20 @@ class SlidingSyncHandler:
set_tag(SynapseTags.RESULT_PREFIX + "initial", initial)
joined_count: Optional[int] = None
if initial or membership_changed:
assert room_membership_summary is not None
joined_count = room_membership_summary.get(
Membership.JOIN, empty_membership_summary
).count
invited_count: Optional[int] = None
if initial or membership_changed:
assert room_membership_summary is not None
invited_count = room_membership_summary.get(
Membership.INVITE, empty_membership_summary
).count
return SlidingSyncResult.RoomResult(
name=room_name,
avatar=room_avatar,
@ -1065,12 +1144,8 @@ class SlidingSyncHandler:
unstable_expanded_timeline=unstable_expanded_timeline,
num_live=num_live,
bump_stamp=bump_stamp,
joined_count=room_membership_summary.get(
Membership.JOIN, empty_membership_summary
).count,
invited_count=room_membership_summary.get(
Membership.INVITE, empty_membership_summary
).count,
joined_count=joined_count,
invited_count=invited_count,
# TODO: These are just dummy values. We could potentially just remove these
# since notifications can only really be done correctly on the client anyway
# (encrypted rooms).

View file

@ -1011,12 +1011,16 @@ class SlidingSyncRestServlet(RestServlet):
for room_id, room_result in rooms.items():
serialized_rooms[room_id] = {
"bump_stamp": room_result.bump_stamp,
"joined_count": room_result.joined_count,
"invited_count": room_result.invited_count,
"notification_count": room_result.notification_count,
"highlight_count": room_result.highlight_count,
}
if room_result.joined_count is not None:
serialized_rooms[room_id]["joined_count"] = room_result.joined_count
if room_result.invited_count is not None:
serialized_rooms[room_id]["invited_count"] = room_result.invited_count
if room_result.name:
serialized_rooms[room_id]["name"] = room_result.name

View file

@ -197,8 +197,8 @@ class SlidingSyncResult:
# Only optional because it won't be included for invite/knock rooms with `stripped_state`
num_live: Optional[int]
bump_stamp: int
joined_count: int
invited_count: int
joined_count: Optional[int]
invited_count: Optional[int]
notification_count: int
highlight_count: int
@ -207,6 +207,12 @@ class SlidingSyncResult:
# If this is the first time the client is seeing the room, we should not filter it out
# under any circumstance.
self.initial
# We need to let the client know if any of the info has changed
or self.name is not None
or self.avatar is not None
or bool(self.heroes)
or self.joined_count is not None
or self.invited_count is not None
# We need to let the client know if there are any new events
or bool(self.required_state)
or bool(self.timeline_events)

View file

@ -13,7 +13,7 @@
#
import logging
from parameterized import parameterized_class
from parameterized import parameterized, parameterized_class
from twisted.test.proto_helpers import MemoryReactor
@ -67,10 +67,11 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
super().prepare(reactor, clock, hs)
def test_rooms_meta_when_joined(self) -> None:
def test_rooms_meta_when_joined_initial(self) -> None:
"""
Test that the `rooms` `name` and `avatar` are included in the response and
reflect the current state of the room when the user is joined to the room.
Test that the `rooms` `name` and `avatar` are included in the initial sync
response and reflect the current state of the room when the user is joined to
the room.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
@ -107,6 +108,7 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# Reflect the current state of the room
self.assertEqual(response_body["rooms"][room_id1]["initial"], True)
self.assertEqual(
response_body["rooms"][room_id1]["name"],
"my super room",
@ -129,6 +131,178 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
response_body["rooms"][room_id1].get("is_dm"),
)
def test_rooms_meta_when_joined_incremental_no_change(self) -> None:
"""
Test that the `rooms` `name` and `avatar` aren't included in an incremental sync
response if they haven't changed.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id1 = self.helper.create_room_as(
user2_id,
tok=user2_tok,
extra_content={
"name": "my super room",
},
)
# Set the room avatar URL
self.helper.send_state(
room_id1,
EventTypes.RoomAvatar,
{"url": "mxc://DUMMY_MEDIA_ID"},
tok=user2_tok,
)
self.helper.join(room_id1, user1_id, tok=user1_tok)
# Make the Sliding Sync request
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [],
# This needs to be set to one so the `RoomResult` isn't empty and
# the room comes down incremental sync when we send a new message.
"timeline_limit": 1,
}
}
}
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
# Send a message to make the room come down sync
self.helper.send(room_id1, "message in room1", tok=user2_tok)
# Incremental sync
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
# We should only see changed meta info (nothing changed so we shouldn't see any
# of these fields)
self.assertNotIn(
"initial",
response_body["rooms"][room_id1],
)
self.assertNotIn(
"name",
response_body["rooms"][room_id1],
)
self.assertNotIn(
"avatar",
response_body["rooms"][room_id1],
)
self.assertNotIn(
"joined_count",
response_body["rooms"][room_id1],
)
self.assertNotIn(
"invited_count",
response_body["rooms"][room_id1],
)
self.assertIsNone(
response_body["rooms"][room_id1].get("is_dm"),
)
@parameterized.expand(
[
("in_required_state", True),
("not_in_required_state", False),
]
)
def test_rooms_meta_when_joined_incremental_with_state_change(
self, test_description: str, include_changed_state_in_required_state: bool
) -> None:
"""
Test that the `rooms` `name` and `avatar` are included in an incremental sync
response if they changed.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id1 = self.helper.create_room_as(
user2_id,
tok=user2_tok,
extra_content={
"name": "my super room",
},
)
# Set the room avatar URL
self.helper.send_state(
room_id1,
EventTypes.RoomAvatar,
{"url": "mxc://DUMMY_MEDIA_ID"},
tok=user2_tok,
)
self.helper.join(room_id1, user1_id, tok=user1_tok)
# Make the Sliding Sync request
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": (
[[EventTypes.Name, ""], [EventTypes.RoomAvatar, ""]]
# Conditionally include the changed state in the
# `required_state` to make sure whether we request it or not,
# the new room name still flows down to the client.
if include_changed_state_in_required_state
else []
),
"timeline_limit": 0,
}
}
}
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
# Update the room name
self.helper.send_state(
room_id1,
EventTypes.Name,
{EventContentFields.ROOM_NAME: "my super duper room"},
tok=user2_tok,
)
# Update the room avatar URL
self.helper.send_state(
room_id1,
EventTypes.RoomAvatar,
{"url": "mxc://DUMMY_MEDIA_ID_UPDATED"},
tok=user2_tok,
)
# Incremental sync
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
# We should only see changed meta info (the room name and avatar)
self.assertNotIn(
"initial",
response_body["rooms"][room_id1],
)
self.assertEqual(
response_body["rooms"][room_id1]["name"],
"my super duper room",
response_body["rooms"][room_id1],
)
self.assertEqual(
response_body["rooms"][room_id1]["avatar"],
"mxc://DUMMY_MEDIA_ID_UPDATED",
response_body["rooms"][room_id1],
)
self.assertNotIn(
"joined_count",
response_body["rooms"][room_id1],
)
self.assertNotIn(
"invited_count",
response_body["rooms"][room_id1],
)
self.assertIsNone(
response_body["rooms"][room_id1].get("is_dm"),
)
def test_rooms_meta_when_invited(self) -> None:
"""
Test that the `rooms` `name` and `avatar` are included in the response and
@ -186,6 +360,7 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
# This should still reflect the current state of the room even when the user is
# invited.
self.assertEqual(response_body["rooms"][room_id1]["initial"], True)
self.assertEqual(
response_body["rooms"][room_id1]["name"],
"my super duper room",
@ -264,6 +439,7 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# Reflect the state of the room at the time of leaving
self.assertEqual(response_body["rooms"][room_id1]["initial"], True)
self.assertEqual(
response_body["rooms"][room_id1]["name"],
"my super room",
@ -338,6 +514,7 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
# Room1 has a name so we shouldn't see any `heroes` which the client would use
# the calculate the room name themselves.
self.assertEqual(response_body["rooms"][room_id1]["initial"], True)
self.assertEqual(
response_body["rooms"][room_id1]["name"],
"my super room",
@ -354,6 +531,7 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
)
# Room2 doesn't have a name so we should see `heroes` populated
self.assertEqual(response_body["rooms"][room_id2]["initial"], True)
self.assertIsNone(response_body["rooms"][room_id2].get("name"))
self.assertCountEqual(
[
@ -425,6 +603,7 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# Room2 doesn't have a name so we should see `heroes` populated
self.assertEqual(response_body["rooms"][room_id1]["initial"], True)
self.assertIsNone(response_body["rooms"][room_id1].get("name"))
self.assertCountEqual(
[
@ -497,7 +676,8 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
}
response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# Room2 doesn't have a name so we should see `heroes` populated
# Room doesn't have a name so we should see `heroes` populated
self.assertEqual(response_body["rooms"][room_id1]["initial"], True)
self.assertIsNone(response_body["rooms"][room_id1].get("name"))
self.assertCountEqual(
[
@ -527,6 +707,165 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
0,
)
def test_rooms_meta_heroes_incremental_sync_no_change(self) -> None:
"""
Test that the `rooms` `heroes` aren't included in an incremental sync
response if they haven't changed.
(when the room doesn't have a room name set)
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
user3_id = self.register_user("user3", "pass")
_user3_tok = self.login(user3_id, "pass")
room_id = self.helper.create_room_as(
user2_id,
tok=user2_tok,
extra_content={
# No room name set so that `heroes` is populated
#
# "name": "my super room2",
},
)
self.helper.join(room_id, user1_id, tok=user1_tok)
# User3 is invited
self.helper.invite(room_id, src=user2_id, targ=user3_id, tok=user2_tok)
# Make the Sliding Sync request
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [],
# This needs to be set to one so the `RoomResult` isn't empty and
# the room comes down incremental sync when we send a new message.
"timeline_limit": 1,
}
}
}
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
# Send a message to make the room come down sync
self.helper.send(room_id, "message in room", tok=user2_tok)
# Incremental sync
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
# This is an incremental sync and the second time we have seen this room so it
# isn't `initial`
self.assertNotIn(
"initial",
response_body["rooms"][room_id],
)
# Room shouldn't have a room name because we're testing the `heroes` field which
# will only has a chance to appear if the room doesn't have a name.
self.assertNotIn(
"name",
response_body["rooms"][room_id],
)
# No change to heroes
self.assertNotIn(
"heroes",
response_body["rooms"][room_id],
)
# No change to member counts
self.assertNotIn(
"joined_count",
response_body["rooms"][room_id],
)
self.assertNotIn(
"invited_count",
response_body["rooms"][room_id],
)
# We didn't request any state so we shouldn't see any `required_state`
self.assertNotIn(
"required_state",
response_body["rooms"][room_id],
)
def test_rooms_meta_heroes_incremental_sync_with_membership_change(self) -> None:
"""
Test that the `rooms` `heroes` are included in an incremental sync response if
the membership has changed.
(when the room doesn't have a room name set)
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
user3_id = self.register_user("user3", "pass")
user3_tok = self.login(user3_id, "pass")
room_id = self.helper.create_room_as(
user2_id,
tok=user2_tok,
extra_content={
# No room name set so that `heroes` is populated
#
# "name": "my super room2",
},
)
self.helper.join(room_id, user1_id, tok=user1_tok)
# User3 is invited
self.helper.invite(room_id, src=user2_id, targ=user3_id, tok=user2_tok)
# Make the Sliding Sync request
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [],
"timeline_limit": 0,
}
}
}
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
# User3 joins (membership change)
self.helper.join(room_id, user3_id, tok=user3_tok)
# Incremental sync
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
# This is an incremental sync and the second time we have seen this room so it
# isn't `initial`
self.assertNotIn(
"initial",
response_body["rooms"][room_id],
)
# Room shouldn't have a room name because we're testing the `heroes` field which
# will only has a chance to appear if the room doesn't have a name.
self.assertNotIn(
"name",
response_body["rooms"][room_id],
)
# Membership change so we should see heroes and membership counts
self.assertCountEqual(
[
hero["user_id"]
for hero in response_body["rooms"][room_id].get("heroes", [])
],
# Heroes shouldn't include the user themselves (we shouldn't see user1)
[user2_id, user3_id],
)
self.assertEqual(
response_body["rooms"][room_id]["joined_count"],
3,
)
self.assertEqual(
response_body["rooms"][room_id]["invited_count"],
0,
)
# We didn't request any state so we shouldn't see any `required_state`
self.assertNotIn(
"required_state",
response_body["rooms"][room_id],
)
def test_rooms_bump_stamp(self) -> None:
"""
Test that `bump_stamp` is present and pointing to relevant events.