Add test to handle state reset in the meta data

This commit is contained in:
Eric Eastwood 2024-08-19 23:22:24 -05:00
parent 98fb56e5fe
commit 8ee2e114dd
2 changed files with 234 additions and 33 deletions

View file

@ -1445,41 +1445,61 @@ class PersistEventsStore:
insert_values = sliding_sync_joined_rooms_insert_map.values()
# We only need to update when one of the relevant state values has changed
if insert_keys:
args: List[Any] = [
room_id,
# Even though `Mapping`/`Dict` have no guaranteed order, some
# implementations may preserve insertion order so we're just going to
# choose the best possible answer by using the "first" event ID which we
# will assume will have the greatest `stream_ordering`. We really just
# need *some* answer in case we are the first ones inserting into the
# table and in reality,
# `_update_sliding_sync_tables_with_new_persisted_events_txn()` is run
# after this function to update it to the correct latest value. This is
# just to account for things changing in the future.
next(iter(to_insert.values())),
]
# If we have some `to_insert` values, we can use the standard upsert
# pattern because we have access to an `event_id` to use for the
# `event_stream_ordering` which has a `NON NULL` constraint.
if to_insert:
args: List[Any] = [
room_id,
# Even though `Mapping`/`Dict` have no guaranteed order, some
# implementations may preserve insertion order so we're just
# going to choose the best possible answer by using the "first"
# event ID which we will assume will have the greatest
# `stream_ordering`. We really just need *some* answer in case
# we are the first ones inserting into the table because of the
# `NON NULL` constraint on `event_stream_ordering`. In reality,
# `_update_sliding_sync_tables_with_new_persisted_events_txn()`
# is run after this function to update it to the correct latest
# value.
next(iter(to_insert.values())),
]
args.extend(iter(insert_values))
args.extend(iter(insert_values))
# We don't update `event_stream_ordering` `ON CONFLICT` because it's
# simpler and we can just rely on
# `_update_sliding_sync_tables_with_new_persisted_events_txn()` to do
# the right thing (same for `bump_stamp`).
txn.execute(
f"""
INSERT INTO sliding_sync_joined_rooms
(room_id, event_stream_ordering, {", ".join(insert_keys)})
VALUES (
?,
(SELECT stream_ordering FROM events WHERE event_id = ?),
{", ".join("?" for _ in insert_values)}
# We don't update `event_stream_ordering` `ON CONFLICT` because it's
# simpler and we can just rely on
# `_update_sliding_sync_tables_with_new_persisted_events_txn()` to do
# the right thing (same for `bump_stamp`).
txn.execute(
f"""
INSERT INTO sliding_sync_joined_rooms
(room_id, event_stream_ordering, {", ".join(insert_keys)})
VALUES (
?,
(SELECT stream_ordering FROM events WHERE event_id = ?),
{", ".join("?" for _ in insert_values)}
)
ON CONFLICT (room_id)
DO UPDATE SET
{", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)}
""",
args,
)
# If there are only values `to_delete`, we have to use an `UPDATE`
# instead because there is no `event_id` to use for the `NON NULL`
# constraint on `event_stream_ordering`.
elif to_delete:
args = list(insert_values) + [room_id]
txn.execute(
f"""
UPDATE sliding_sync_joined_rooms
SET
{", ".join(f"{key} = ?" for key in insert_keys)}
WHERE room_id = ?
""",
args,
)
ON CONFLICT (room_id)
DO UPDATE SET
{", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)}
""",
args,
)
# We now update `local_current_membership`. We do this regardless
# of whether we're still in the room or not to handle the case where

View file

@ -35,10 +35,12 @@ from synapse.federation.federation_base import event_from_pdu_json
from synapse.rest import admin
from synapse.rest.client import login, room
from synapse.server import HomeServer
from synapse.storage.databases.main.events import DeltaState
from synapse.storage.databases.state.bg_updates import _BackgroundUpdates
from synapse.types import StateMap
from synapse.util import Clock
from tests.test_utils.event_injection import create_event
from tests.unittest import HomeserverTestCase
logger = logging.getLogger(__name__)
@ -540,6 +542,9 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
self.storage_controllers = hs.get_storage_controllers()
persist_events_store = self.hs.get_datastores().persist_events
assert persist_events_store is not None
self.persist_events_store = persist_events_store
def _get_sliding_sync_joined_rooms(self) -> Dict[str, _SlidingSyncJoinedRoomResult]:
"""
@ -1313,7 +1318,183 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
user2_snapshot,
)
# TODO: test_joined_room_state_reset
def test_joined_room_meta_state_reset(self) -> None:
"""
Test that a state reset on the room name is reflected in the
`sliding_sync_joined_rooms` table.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
# Add a room name
self.helper.send_state(
room_id,
EventTypes.Name,
{"name": "my super duper room"},
tok=user2_tok,
)
# User1 joins the room
self.helper.join(room_id, user1_id, tok=user1_tok)
# Make sure we see the new room name
sliding_sync_joined_rooms_results = self._get_sliding_sync_joined_rooms()
self.assertIncludes(
set(sliding_sync_joined_rooms_results.keys()),
{room_id},
exact=True,
)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id)
)
self.assertEqual(
sliding_sync_joined_rooms_results[room_id],
_SlidingSyncJoinedRoomResult(
room_id=room_id,
# This should be whatever is the last event in the room
event_stream_ordering=state_map[
(EventTypes.Member, user1_id)
].internal_metadata.stream_ordering,
bump_stamp=state_map[
(EventTypes.Create, "")
].internal_metadata.stream_ordering,
room_type=None,
room_name="my super duper room",
is_encrypted=False,
),
)
sliding_sync_membership_snapshots_results = (
self._get_sliding_sync_membership_snapshots()
)
self.assertIncludes(
set(sliding_sync_membership_snapshots_results.keys()),
{
(room_id, user1_id),
(room_id, user2_id),
},
exact=True,
)
user1_snapshot = _SlidingSyncMembershipSnapshotResult(
room_id=room_id,
user_id=user1_id,
membership_event_id=state_map[(EventTypes.Member, user1_id)].event_id,
membership=Membership.JOIN,
event_stream_ordering=state_map[
(EventTypes.Member, user1_id)
].internal_metadata.stream_ordering,
has_known_state=True,
room_type=None,
room_name="my super duper room",
is_encrypted=False,
)
self.assertEqual(
sliding_sync_membership_snapshots_results.get((room_id, user1_id)),
user1_snapshot,
)
# Holds the info according to the current state when the user joined (no room
# name when the room creator joined)
user2_snapshot = _SlidingSyncMembershipSnapshotResult(
room_id=room_id,
user_id=user2_id,
membership_event_id=state_map[(EventTypes.Member, user2_id)].event_id,
membership=Membership.JOIN,
event_stream_ordering=state_map[
(EventTypes.Member, user2_id)
].internal_metadata.stream_ordering,
has_known_state=True,
room_type=None,
room_name=None,
is_encrypted=False,
)
self.assertEqual(
sliding_sync_membership_snapshots_results.get((room_id, user2_id)),
user2_snapshot,
)
# Mock a state reset removing the room name state from the current state
message_tuple = self.get_success(
create_event(
self.hs,
prev_event_ids=[state_map[(EventTypes.Name, "")].event_id],
auth_event_ids=[
state_map[(EventTypes.Create, "")].event_id,
state_map[(EventTypes.Member, user1_id)].event_id,
],
type=EventTypes.Message,
content={"body": "foo", "msgtype": "m.text"},
sender=user1_id,
room_id=room_id,
room_version=RoomVersions.V10.identifier,
)
)
event_chunk = [message_tuple]
self.get_success(
self.persist_events_store._persist_events_and_state_updates(
room_id,
event_chunk,
state_delta_for_room=DeltaState(
# This is the state reset part. We're removing the room name state.
to_delete=[(EventTypes.Name, "")],
to_insert={},
),
new_forward_extremities={message_tuple[0].event_id},
use_negative_stream_ordering=False,
inhibit_local_membership_updates=False,
new_event_links={},
)
)
# Make sure the state reset is reflected in the `sliding_sync_joined_rooms` table
sliding_sync_joined_rooms_results = self._get_sliding_sync_joined_rooms()
self.assertIncludes(
set(sliding_sync_joined_rooms_results.keys()),
{room_id},
exact=True,
)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id)
)
self.assertEqual(
sliding_sync_joined_rooms_results[room_id],
_SlidingSyncJoinedRoomResult(
room_id=room_id,
# This should be whatever is the last event in the room
event_stream_ordering=message_tuple[
0
].internal_metadata.stream_ordering,
bump_stamp=message_tuple[0].internal_metadata.stream_ordering,
room_type=None,
# This was state reset back to None
room_name=None,
is_encrypted=False,
),
)
# State reset shouldn't be reflected in the `sliding_sync_membership_snapshots`
sliding_sync_membership_snapshots_results = (
self._get_sliding_sync_membership_snapshots()
)
self.assertIncludes(
set(sliding_sync_membership_snapshots_results.keys()),
{
(room_id, user1_id),
(room_id, user2_id),
},
exact=True,
)
# Snapshots haven't changed
self.assertEqual(
sliding_sync_membership_snapshots_results.get((room_id, user1_id)),
user1_snapshot,
)
self.assertEqual(
sliding_sync_membership_snapshots_results.get((room_id, user2_id)),
user2_snapshot,
)
def test_non_join_space_room_with_info(self) -> None:
"""