From 8ee2e114dd0228d62fb48758cf8dbfa02f5aae4e Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 19 Aug 2024 23:22:24 -0500 Subject: [PATCH] Add test to handle state reset in the meta data --- synapse/storage/databases/main/events.py | 84 +++++++---- tests/storage/test_events.py | 183 ++++++++++++++++++++++- 2 files changed, 234 insertions(+), 33 deletions(-) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index fa63ca7cf8..10865b1274 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1445,41 +1445,61 @@ class PersistEventsStore: insert_values = sliding_sync_joined_rooms_insert_map.values() # We only need to update when one of the relevant state values has changed if insert_keys: - args: List[Any] = [ - room_id, - # Even though `Mapping`/`Dict` have no guaranteed order, some - # implementations may preserve insertion order so we're just going to - # choose the best possible answer by using the "first" event ID which we - # will assume will have the greatest `stream_ordering`. We really just - # need *some* answer in case we are the first ones inserting into the - # table and in reality, - # `_update_sliding_sync_tables_with_new_persisted_events_txn()` is run - # after this function to update it to the correct latest value. This is - # just to account for things changing in the future. - next(iter(to_insert.values())), - ] + # If we have some `to_insert` values, we can use the standard upsert + # pattern because we have access to an `event_id` to use for the + # `event_stream_ordering` which has a `NON NULL` constraint. + if to_insert: + args: List[Any] = [ + room_id, + # Even though `Mapping`/`Dict` have no guaranteed order, some + # implementations may preserve insertion order so we're just + # going to choose the best possible answer by using the "first" + # event ID which we will assume will have the greatest + # `stream_ordering`. We really just need *some* answer in case + # we are the first ones inserting into the table because of the + # `NON NULL` constraint on `event_stream_ordering`. In reality, + # `_update_sliding_sync_tables_with_new_persisted_events_txn()` + # is run after this function to update it to the correct latest + # value. + next(iter(to_insert.values())), + ] - args.extend(iter(insert_values)) + args.extend(iter(insert_values)) - # We don't update `event_stream_ordering` `ON CONFLICT` because it's - # simpler and we can just rely on - # `_update_sliding_sync_tables_with_new_persisted_events_txn()` to do - # the right thing (same for `bump_stamp`). - txn.execute( - f""" - INSERT INTO sliding_sync_joined_rooms - (room_id, event_stream_ordering, {", ".join(insert_keys)}) - VALUES ( - ?, - (SELECT stream_ordering FROM events WHERE event_id = ?), - {", ".join("?" for _ in insert_values)} + # We don't update `event_stream_ordering` `ON CONFLICT` because it's + # simpler and we can just rely on + # `_update_sliding_sync_tables_with_new_persisted_events_txn()` to do + # the right thing (same for `bump_stamp`). + txn.execute( + f""" + INSERT INTO sliding_sync_joined_rooms + (room_id, event_stream_ordering, {", ".join(insert_keys)}) + VALUES ( + ?, + (SELECT stream_ordering FROM events WHERE event_id = ?), + {", ".join("?" for _ in insert_values)} + ) + ON CONFLICT (room_id) + DO UPDATE SET + {", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)} + """, + args, + ) + + # If there are only values `to_delete`, we have to use an `UPDATE` + # instead because there is no `event_id` to use for the `NON NULL` + # constraint on `event_stream_ordering`. + elif to_delete: + args = list(insert_values) + [room_id] + txn.execute( + f""" + UPDATE sliding_sync_joined_rooms + SET + {", ".join(f"{key} = ?" for key in insert_keys)} + WHERE room_id = ? + """, + args, ) - ON CONFLICT (room_id) - DO UPDATE SET - {", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)} - """, - args, - ) # We now update `local_current_membership`. We do this regardless # of whether we're still in the room or not to handle the case where diff --git a/tests/storage/test_events.py b/tests/storage/test_events.py index 83149a0921..bb59603d33 100644 --- a/tests/storage/test_events.py +++ b/tests/storage/test_events.py @@ -35,10 +35,12 @@ from synapse.federation.federation_base import event_from_pdu_json from synapse.rest import admin from synapse.rest.client import login, room from synapse.server import HomeServer +from synapse.storage.databases.main.events import DeltaState from synapse.storage.databases.state.bg_updates import _BackgroundUpdates from synapse.types import StateMap from synapse.util import Clock +from tests.test_utils.event_injection import create_event from tests.unittest import HomeserverTestCase logger = logging.getLogger(__name__) @@ -540,6 +542,9 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = hs.get_datastores().main self.storage_controllers = hs.get_storage_controllers() + persist_events_store = self.hs.get_datastores().persist_events + assert persist_events_store is not None + self.persist_events_store = persist_events_store def _get_sliding_sync_joined_rooms(self) -> Dict[str, _SlidingSyncJoinedRoomResult]: """ @@ -1313,7 +1318,183 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): user2_snapshot, ) - # TODO: test_joined_room_state_reset + def test_joined_room_meta_state_reset(self) -> None: + """ + Test that a state reset on the room name is reflected in the + `sliding_sync_joined_rooms` table. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id = self.helper.create_room_as(user2_id, tok=user2_tok) + # Add a room name + self.helper.send_state( + room_id, + EventTypes.Name, + {"name": "my super duper room"}, + tok=user2_tok, + ) + + # User1 joins the room + self.helper.join(room_id, user1_id, tok=user1_tok) + + # Make sure we see the new room name + sliding_sync_joined_rooms_results = self._get_sliding_sync_joined_rooms() + self.assertIncludes( + set(sliding_sync_joined_rooms_results.keys()), + {room_id}, + exact=True, + ) + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id) + ) + self.assertEqual( + sliding_sync_joined_rooms_results[room_id], + _SlidingSyncJoinedRoomResult( + room_id=room_id, + # This should be whatever is the last event in the room + event_stream_ordering=state_map[ + (EventTypes.Member, user1_id) + ].internal_metadata.stream_ordering, + bump_stamp=state_map[ + (EventTypes.Create, "") + ].internal_metadata.stream_ordering, + room_type=None, + room_name="my super duper room", + is_encrypted=False, + ), + ) + + sliding_sync_membership_snapshots_results = ( + self._get_sliding_sync_membership_snapshots() + ) + self.assertIncludes( + set(sliding_sync_membership_snapshots_results.keys()), + { + (room_id, user1_id), + (room_id, user2_id), + }, + exact=True, + ) + user1_snapshot = _SlidingSyncMembershipSnapshotResult( + room_id=room_id, + user_id=user1_id, + membership_event_id=state_map[(EventTypes.Member, user1_id)].event_id, + membership=Membership.JOIN, + event_stream_ordering=state_map[ + (EventTypes.Member, user1_id) + ].internal_metadata.stream_ordering, + has_known_state=True, + room_type=None, + room_name="my super duper room", + is_encrypted=False, + ) + self.assertEqual( + sliding_sync_membership_snapshots_results.get((room_id, user1_id)), + user1_snapshot, + ) + # Holds the info according to the current state when the user joined (no room + # name when the room creator joined) + user2_snapshot = _SlidingSyncMembershipSnapshotResult( + room_id=room_id, + user_id=user2_id, + membership_event_id=state_map[(EventTypes.Member, user2_id)].event_id, + membership=Membership.JOIN, + event_stream_ordering=state_map[ + (EventTypes.Member, user2_id) + ].internal_metadata.stream_ordering, + has_known_state=True, + room_type=None, + room_name=None, + is_encrypted=False, + ) + self.assertEqual( + sliding_sync_membership_snapshots_results.get((room_id, user2_id)), + user2_snapshot, + ) + + # Mock a state reset removing the room name state from the current state + message_tuple = self.get_success( + create_event( + self.hs, + prev_event_ids=[state_map[(EventTypes.Name, "")].event_id], + auth_event_ids=[ + state_map[(EventTypes.Create, "")].event_id, + state_map[(EventTypes.Member, user1_id)].event_id, + ], + type=EventTypes.Message, + content={"body": "foo", "msgtype": "m.text"}, + sender=user1_id, + room_id=room_id, + room_version=RoomVersions.V10.identifier, + ) + ) + event_chunk = [message_tuple] + self.get_success( + self.persist_events_store._persist_events_and_state_updates( + room_id, + event_chunk, + state_delta_for_room=DeltaState( + # This is the state reset part. We're removing the room name state. + to_delete=[(EventTypes.Name, "")], + to_insert={}, + ), + new_forward_extremities={message_tuple[0].event_id}, + use_negative_stream_ordering=False, + inhibit_local_membership_updates=False, + new_event_links={}, + ) + ) + + # Make sure the state reset is reflected in the `sliding_sync_joined_rooms` table + sliding_sync_joined_rooms_results = self._get_sliding_sync_joined_rooms() + self.assertIncludes( + set(sliding_sync_joined_rooms_results.keys()), + {room_id}, + exact=True, + ) + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id) + ) + self.assertEqual( + sliding_sync_joined_rooms_results[room_id], + _SlidingSyncJoinedRoomResult( + room_id=room_id, + # This should be whatever is the last event in the room + event_stream_ordering=message_tuple[ + 0 + ].internal_metadata.stream_ordering, + bump_stamp=message_tuple[0].internal_metadata.stream_ordering, + room_type=None, + # This was state reset back to None + room_name=None, + is_encrypted=False, + ), + ) + + # State reset shouldn't be reflected in the `sliding_sync_membership_snapshots` + sliding_sync_membership_snapshots_results = ( + self._get_sliding_sync_membership_snapshots() + ) + self.assertIncludes( + set(sliding_sync_membership_snapshots_results.keys()), + { + (room_id, user1_id), + (room_id, user2_id), + }, + exact=True, + ) + # Snapshots haven't changed + self.assertEqual( + sliding_sync_membership_snapshots_results.get((room_id, user1_id)), + user1_snapshot, + ) + self.assertEqual( + sliding_sync_membership_snapshots_results.get((room_id, user2_id)), + user2_snapshot, + ) def test_non_join_space_room_with_info(self) -> None: """