WIP: Remember memberships already sent

This commit is contained in:
Eric Eastwood 2024-10-08 21:36:17 -05:00
parent 5066cbc6f6
commit c5494f9836
3 changed files with 354 additions and 96 deletions

View file

@ -869,6 +869,8 @@ class SlidingSyncHandler:
#
# Calculate the `StateFilter` based on the `required_state` for the room
required_state_filter = StateFilter.none()
# Extra membership that we need pull out of the current state because of
# lazy-loading room members.
added_membership_state_filter = StateFilter.none()
# The requested `required_state_map` with the any lazy membership expanded and
# `$ME` replaced with the user's ID. This allows us to see what membership we've
@ -1180,6 +1182,9 @@ class SlidingSyncHandler:
# sensible order again.
bump_stamp = 0
logger.info("asdf expanded_required_state_map %s", expanded_required_state_map)
logger.info("asdf changed_required_state_map %s", changed_required_state_map)
room_sync_required_state_map_to_persist = expanded_required_state_map
if changed_required_state_map:
room_sync_required_state_map_to_persist = changed_required_state_map
@ -1379,10 +1384,36 @@ def _required_state_changes(
A 2-tuple of updated required state config and the state filter to use
to fetch extra current state that we need to return.
"""
if prev_required_state_map == request_required_state_map:
# There has been no change. Return immediately.
return None, StateFilter.none()
# Convert the list of state_deltas to map from type to state_keys that have
# changed.
changed_types_to_state_keys: Dict[str, Set[str]] = {}
for event_type, state_key in state_deltas:
changed_types_to_state_keys.setdefault(event_type, set()).add(state_key)
new_required_state_map: Dict[str, Set[str]] = {}
for event_type in (
prev_required_state_map.keys() | request_required_state_map.keys()
):
old_state_keys = prev_required_state_map.get(event_type, set())
changed_state_keys = changed_types_to_state_keys.get(event_type, set())
# We keep entries from the previous required state where the state hasn't
# changed. This way we can still keep track that we've already sent down
# the state to the client.
new_required_state_map[event_type] = request_required_state_map.get(
event_type, set()
) | {
old_state_key
for old_state_key in old_state_keys
if old_state_key not in changed_state_keys
and old_state_key not in {StateValues.WILDCARD, StateValues.LAZY}
}
prev_wildcard = prev_required_state_map.get(StateValues.WILDCARD, set())
request_wildcard = request_required_state_map.get(StateValues.WILDCARD, set())
@ -1403,10 +1434,6 @@ def _required_state_changes(
# Keys were only removed, so we don't have to fetch everything.
return request_required_state_map, StateFilter.none()
# Contains updates to the required state map compared with the previous room
# config. This has the same format as `RoomSyncConfig.required_state`
changes: Dict[str, AbstractSet[str]] = {}
# The set of types/state keys that we need to fetch and return to the
# client. Passed to `StateFilter.from_types(...)`
added: List[Tuple[str, Optional[str]]] = []
@ -1426,9 +1453,6 @@ def _required_state_changes(
# Nothing *added*, so we skip. Removals happen below.
continue
# Always update changes to include the newly added keys
changes[event_type] = request_state_keys
if StateValues.WILDCARD in old_state_keys:
# We were previously fetching everything for this type, so we don't need to
# fetch anything new.
@ -1451,78 +1475,17 @@ def _required_state_changes(
added_state_filter = StateFilter.from_types(added)
# Convert the list of state deltas to map from type to state_keys that have
# changed.
changed_types_to_state_keys: Dict[str, Set[str]] = {}
for event_type, state_key in state_deltas:
changed_types_to_state_keys.setdefault(event_type, set()).add(state_key)
# Figure out what changes we need to apply to the effective required state
# config.
for event_type, changed_state_keys in changed_types_to_state_keys.items():
old_state_keys = prev_required_state_map.get(event_type, set())
request_state_keys = request_required_state_map.get(event_type, set())
if old_state_keys == request_state_keys:
# No change.
continue
if request_state_keys - old_state_keys:
# We've expanded the set of state keys, so we just clobber the
# current set with the new set.
#
# We could also ensure that we keep entries where the state hasn't
# changed, but are no longer in the requested required state, but
# that's a sufficient edge case that we can ignore (as its only a
# performance optimization).
changes[event_type] = request_state_keys
continue
old_state_key_wildcard = StateValues.WILDCARD in old_state_keys
request_state_key_wildcard = StateValues.WILDCARD in request_state_keys
if old_state_key_wildcard != request_state_key_wildcard:
# If a state_key wildcard has been added or removed, we always update the
# effective room required state config to match the request.
changes[event_type] = request_state_keys
continue
old_state_key_lazy = StateValues.LAZY in old_state_keys
request_state_key_lazy = StateValues.LAZY in request_state_keys
if old_state_key_lazy != request_state_key_lazy:
# If a "$LAZY" has been added or removed we always update the effective room
# required state config to match the request.
changes[event_type] = request_state_keys
continue
# Handle "$ME" values by replacing state keys that match the user ID.
if user_id in changed_state_keys:
changed_state_keys.discard(user_id)
changed_state_keys.add(StateValues.ME)
# At this point there are no wildcards and no additions to the set of
# state keys requested, only deletions.
#
if event_type in new_required_state_map:
# We only remove state keys from the effective state if they've been
# removed from the request *and* the state has changed. This ensures
# that if a client removes and then readds a state key, we only send
# down the associated current state event if its changed (rather than
# sending down the same event twice).
invalidated = (old_state_keys - request_state_keys) & changed_state_keys
if invalidated:
changes[event_type] = old_state_keys - invalidated
if changes:
# Update the required state config based on the changes.
new_required_state_map = dict(prev_required_state_map)
for event_type, state_keys in changes.items():
if state_keys:
new_required_state_map[event_type] = state_keys
else:
# Remove entries with empty state keys.
new_required_state_map.pop(event_type, None)
new_required_state_map[event_type] = (
new_required_state_map[event_type] - changed_state_keys
)
return new_required_state_map, added_state_filter
else:
return None, added_state_filter

View file

@ -3671,6 +3671,127 @@ class RequiredStateChangesTestCase(unittest.TestCase):
),
),
),
(
"state_key_lazy_keep_previous_memberships_and_no_new_memberships",
"""
This test mimics a request with lazy-loading room members enabled where
we have previously sent down user2 and user3's membership events and now
we're sending down another response without any timeline events.
""",
RequiredStateChangesTestParameters(
previous_required_state_map={
EventTypes.Member: {
StateValues.LAZY,
"@user2:test",
"@user3:test",
}
},
request_required_state_map={EventTypes.Member: {StateValues.LAZY}},
state_deltas={(EventTypes.Member, "@user:test"): "$event_id"},
expected_with_state_deltas=(
# If a "$LAZY" has been added or removed we always update the
# required state to what was requested for simplicity.
{
EventTypes.Member: {
StateValues.LAZY,
"@user2:test",
"@user3:test",
}
},
StateFilter.none(),
),
expected_without_state_deltas=(
{
EventTypes.Member: {
StateValues.LAZY,
"@user2:test",
"@user3:test",
}
},
StateFilter.none(),
),
),
),
(
"state_key_lazy_keep_previous_memberships_with_new_memberships",
"""
This test mimics a request with lazy-loading room members enabled where
we have previously sent down user2 and user3's membership events and now
we're sending down another response with a new event from user4.
""",
RequiredStateChangesTestParameters(
previous_required_state_map={
EventTypes.Member: {
StateValues.LAZY,
"@user2:test",
"@user3:test",
}
},
request_required_state_map={
EventTypes.Member: {StateValues.LAZY, "@user4:test"}
},
state_deltas={(EventTypes.Member, "@user:test"): "$event_id"},
expected_with_state_deltas=(
# If a "$LAZY" has been added or removed we always update the
# required state to what was requested for simplicity.
{
EventTypes.Member: {
StateValues.LAZY,
"@user2:test",
"@user3:test",
"@user4:test",
}
},
StateFilter.none(),
),
expected_without_state_deltas=(
{
EventTypes.Member: {
StateValues.LAZY,
"@user2:test",
"@user3:test",
"@user4:test",
}
},
StateFilter.none(),
),
),
),
(
"state_key_expand_lazy_keep_previous_memberships",
"""
Test TODO
""",
RequiredStateChangesTestParameters(
previous_required_state_map={
EventTypes.Member: {"@user2:test", "@user3:test"}
},
request_required_state_map={EventTypes.Member: {StateValues.LAZY}},
state_deltas={(EventTypes.Member, "@user:test"): "$event_id"},
expected_with_state_deltas=(
# If a "$LAZY" has been added or removed we always update the
# required state to what was requested for simplicity.
{
EventTypes.Member: {
StateValues.LAZY,
"@user2:test",
"@user3:test",
}
},
StateFilter.none(),
),
expected_without_state_deltas=(
{
EventTypes.Member: {
StateValues.LAZY,
"@user2:test",
"@user3:test",
}
},
StateFilter.none(),
),
),
),
(
"type_wildcard_with_state_key_wildcard_to_explicit_state_keys",
"""
@ -3784,11 +3905,18 @@ class RequiredStateChangesTestCase(unittest.TestCase):
# request. And we need to request all of the state for that type
# because we previously, only sent down a few keys.
expected_with_state_deltas=(
{"type1": {StateValues.WILDCARD}},
{"type1": {StateValues.WILDCARD, "state_key2", "state_key3"}},
StateFilter.from_types([("type1", None)]),
),
expected_without_state_deltas=(
{"type1": {StateValues.WILDCARD}},
{
"type1": {
StateValues.WILDCARD,
"state_key1",
"state_key2",
"state_key3",
}
},
StateFilter.from_types([("type1", None)]),
),
),
@ -3804,14 +3932,8 @@ class RequiredStateChangesTestCase(unittest.TestCase):
# Without `state_deltas`
changed_required_state_map, added_state_filter = _required_state_changes(
user_id="@user:test",
previous_room_config=RoomSyncConfig(
timeline_limit=0,
required_state_map=test_parameters.previous_required_state_map,
),
room_sync_config=RoomSyncConfig(
timeline_limit=0,
required_state_map=test_parameters.request_required_state_map,
),
prev_required_state_map=test_parameters.previous_required_state_map,
request_required_state_map=test_parameters.request_required_state_map,
state_deltas={},
)
@ -3829,14 +3951,8 @@ class RequiredStateChangesTestCase(unittest.TestCase):
# With `state_deltas`
changed_required_state_map, added_state_filter = _required_state_changes(
user_id="@user:test",
previous_room_config=RoomSyncConfig(
timeline_limit=0,
required_state_map=test_parameters.previous_required_state_map,
),
room_sync_config=RoomSyncConfig(
timeline_limit=0,
required_state_map=test_parameters.request_required_state_map,
),
prev_required_state_map=test_parameters.previous_required_state_map,
request_required_state_map=test_parameters.request_required_state_map,
state_deltas=test_parameters.state_deltas,
)

View file

@ -496,6 +496,185 @@ class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase):
)
self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
def test_rooms_required_state_expand_lazy_loading_room_members_incremental_sync(
self,
) -> None:
"""
Test that when we expand the `required_state` to include lazy-loading room
members, it returns people relevant to the timeline.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
user3_id = self.register_user("user3", "pass")
user3_tok = self.login(user3_id, "pass")
user4_id = self.register_user("user4", "pass")
user4_tok = self.login(user4_id, "pass")
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id1, user1_id, tok=user1_tok)
self.helper.join(room_id1, user3_id, tok=user3_tok)
self.helper.join(room_id1, user4_id, tok=user4_tok)
self.helper.send(room_id1, "1", tok=user2_tok)
self.helper.send(room_id1, "2", tok=user2_tok)
self.helper.send(room_id1, "3", tok=user2_tok)
# Make the Sliding Sync request *without* lazy loading for the room members
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [
[EventTypes.Create, ""],
],
"timeline_limit": 3,
}
}
}
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
# Send more timeline events into the room
self.helper.send(room_id1, "4", tok=user2_tok)
self.helper.send(room_id1, "5", tok=user4_tok)
self.helper.send(room_id1, "6", tok=user4_tok)
# Expand `required_state` and make an incremental Sliding Sync request *with*
# lazy-loading room members
sync_body["lists"]["foo-list"]["required_state"] = [
[EventTypes.Create, ""],
[EventTypes.Member, StateValues.LAZY],
]
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
# Only user2 and user4 sent events in the last 3 events we see in the `timeline`
# and we haven't seen any membership before this sync so we should see both
# users.
self._assertRequiredStateIncludes(
response_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Member, user2_id)],
state_map[(EventTypes.Member, user4_id)],
},
exact=True,
)
self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
# Send a message so the room comes down sync.
self.helper.send(room_id1, "4", tok=user2_tok)
# Make another incremental Sliding Sync request
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
# Only user2 and user4 sent events in the last 3 events we see in the `timeline`
# but since we've seen both memberships in the last sync, they shouldn't appear
# again.
self._assertRequiredStateIncludes(
response_body["rooms"][room_id1]["required_state"],
set(),
exact=True,
)
self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
def test_rooms_required_state_expand_retract_expand_lazy_loading_room_members_incremental_sync(
self,
) -> None:
"""
Test that when we expand the `required_state` to include lazy-loading room
members, it returns people relevant to the timeline.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
user3_id = self.register_user("user3", "pass")
user3_tok = self.login(user3_id, "pass")
user4_id = self.register_user("user4", "pass")
user4_tok = self.login(user4_id, "pass")
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id1, user1_id, tok=user1_tok)
self.helper.join(room_id1, user3_id, tok=user3_tok)
self.helper.join(room_id1, user4_id, tok=user4_tok)
self.helper.send(room_id1, "1", tok=user2_tok)
self.helper.send(room_id1, "2", tok=user2_tok)
self.helper.send(room_id1, "3", tok=user2_tok)
# Make the Sliding Sync request *without* lazy loading for the room members
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [
[EventTypes.Create, ""],
],
"timeline_limit": 3,
}
}
}
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
# Send more timeline events into the room
self.helper.send(room_id1, "4", tok=user2_tok)
self.helper.send(room_id1, "5", tok=user4_tok)
self.helper.send(room_id1, "6", tok=user4_tok)
# Expand `required_state` and make an incremental Sliding Sync request *with*
# lazy-loading room members
sync_body["lists"]["foo-list"]["required_state"] = [
[EventTypes.Create, ""],
[EventTypes.Member, StateValues.LAZY],
]
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
# Only user2 and user4 sent events in the last 3 events we see in the `timeline`
# and we haven't seen any membership before this sync so we should see both
# users because we're lazy-loading the room members.
self._assertRequiredStateIncludes(
response_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Member, user2_id)],
state_map[(EventTypes.Member, user4_id)],
},
exact=True,
)
# Send a message so the room comes down sync.
self.helper.send(room_id1, "msg", tok=user4_tok)
# Retract `required_state` and make an incremental Sliding Sync request
# requesting a few memberships
sync_body["lists"]["foo-list"]["required_state"] = [
[EventTypes.Create, ""],
[EventTypes.Member, StateValues.ME],
[EventTypes.Member, user2_id],
]
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
# We've seen user2's membership in the last sync so we shouldn't see it here
# even though it's requested. We should only see user1's membership.
self._assertRequiredStateIncludes(
response_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Member, user1_id)],
},
exact=True,
)
def test_rooms_required_state_me(self) -> None:
"""
Test `rooms.required_state` correctly handles $ME.