From 2964c567d3cd53a34219aec5e59691ab7e965f8d Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 20 Aug 2024 13:21:19 -0500 Subject: [PATCH] Use dicts --- synapse/storage/controllers/persist_events.py | 134 +++++++----- synapse/storage/databases/main/events.py | 204 ++++++++++-------- 2 files changed, 192 insertions(+), 146 deletions(-) diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index 0bc60077fb..68f3bb0b94 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -30,6 +30,7 @@ from typing import ( Awaitable, Callable, ClassVar, + Sequence, Collection, Deque, Dict, @@ -49,7 +50,7 @@ from prometheus_client import Counter, Histogram from twisted.internet import defer -from synapse.api.constants import EventTypes, Membership, EventContentFields +from synapse.api.constants import EventContentFields, EventTypes, Membership from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME @@ -65,20 +66,19 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.controllers.state import StateStorageController from synapse.storage.databases import Databases from synapse.storage.databases.main.events import ( + SLIDING_SYNC_RELEVANT_STATE_SET, DeltaState, - SlidingSyncTableChanges, - SlidingSyncStateInsertValues, - SlidingSyncMembershipSnapshotSharedInsertValues, SlidingSyncMembershipInfo, + SlidingSyncMembershipSnapshotSharedInsertValues, + SlidingSyncStateInsertValues, + SlidingSyncTableChanges, ) from synapse.storage.databases.main.events_worker import EventRedactBehaviour -from synapse.storage.databases.main.events import ( - SLIDING_SYNC_RELEVANT_STATE_SET, -) from synapse.types import ( PersistedEventPosition, RoomStreamToken, StateMap, + MutableStateMap, get_domain_from_id, ) from synapse.types.state import StateFilter @@ -511,8 +511,13 @@ class EventsPersistenceStorageController: """ state = await self._calculate_current_state(room_id) delta = await self._calculate_state_delta(room_id, state) + sliding_sync_table_changes = await self._calculate_sliding_sync_table_changes( + room_id, [], delta + ) - await self.persist_events_store.update_current_state(room_id, delta) + await self.persist_events_store.update_current_state( + room_id, delta, sliding_sync_table_changes + ) async def _calculate_current_state(self, room_id: str) -> StateMap[str]: """Calculate the current state of a room, based on the forward extremities @@ -627,12 +632,13 @@ class EventsPersistenceStorageController: room_id, chunk ) - with Measure(self._clock, "_calculate_sliding_sync_table_changes"): - sliding_sync_table_changes = ( - await self._calculate_sliding_sync_table_changes( - room_id, chunk, state_delta_for_room + if state_delta_for_room is not None: + with Measure(self._clock, "_calculate_sliding_sync_table_changes"): + sliding_sync_table_changes = ( + await self._calculate_sliding_sync_table_changes( + room_id, chunk, state_delta_for_room + ) ) - ) with Measure(self._clock, "calculate_chain_cover_index_for_events"): # We now calculate chain ID/sequence numbers for any state events we're @@ -772,15 +778,26 @@ class EventsPersistenceStorageController: async def _calculate_sliding_sync_table_changes( self, room_id: str, - events_and_contexts: List[Tuple[EventBase, EventContext]], - delta_state: Optional[DeltaState], - ) -> Optional[SlidingSyncTableChanges]: + events_and_contexts: Sequence[Tuple[EventBase, EventContext]], + delta_state: DeltaState, + ) -> SlidingSyncTableChanges: """ TODO + + Args: + room_id: The room ID currently being processed. + events_and_contexts: List of tuples of (event, context) being persisted. + This is completely optional (you can pass an empty list) and will just + save us from fetching the events from the database if we already have + them. + delta_state: Deltas that are going to be used to update the + `current_state_events` table. """ to_insert = delta_state.to_insert to_delete = delta_state.to_delete + event_map = {event.event_id: event for event, _ in events_and_contexts} + # This would only happen if someone was state reset out of the room to_delete_membership_snapshots = [ state_key @@ -788,7 +805,9 @@ class EventsPersistenceStorageController: if event_type == EventTypes.Member and self.is_mine_id(state_key) ] - membership_snapshot_updates = {} + membership_snapshot_shared_insert_values: ( + SlidingSyncMembershipSnapshotSharedInsertValues + ) = {} membership_infos: List[SlidingSyncMembershipInfo] = [] if to_insert: membership_event_id_to_user_id_map: Dict[str, str] = {} @@ -796,18 +815,39 @@ class EventsPersistenceStorageController: if state_key[0] == EventTypes.Member and self.is_mine_id(state_key[1]): membership_event_id_to_user_id_map[event_id] = state_key[1] - event_id_to_sender_map = await _get_sender_for_event_ids(membership_event_id_to_user_id_map.keys()) - membership_infos = [ - SlidingSyncMembershipInfo( - user_id=user_id, - sender=event_id_to_sender_map[event_id], - membership_event_id=membership_event_id + event_id_to_sender_map: Dict[str, str] = {} + # In normal event persist scenarios, we should be able to find the + # membership events in the `events_and_contexts` given to us but it's + # possible a state reset happened which added us to the room without a + # corresponding new membership event (reset back to a previous membership). + missing_membership_event_ids: Set[str] = set() + for membership_event_id in membership_event_id_to_user_id_map.keys(): + membership_event = event_map.get(membership_event_id) + if membership_event: + event_id_to_sender_map[membership_event_id] = ( + membership_event.sender + ) + else: + missing_membership_event_ids.add(membership_event_id) + + # Otherwise, we need to find a couple previous events that we were reset to. + if missing_membership_event_ids: + remaining_event_id_to_sender_map = await _get_sender_for_event_ids( + missing_membership_event_ids ) + event_id_to_sender_map.update(remaining_event_id_to_sender_map) + + membership_infos = [ + { + "user_id": user_id, + "sender": event_id_to_sender_map[event_id], + "membership_event_id": membership_event_id, + } for membership_event_id, user_id in membership_event_id_to_user_id_map.items() ] if membership_infos: - current_state_ids_map = ( + current_state_ids_map: MutableStateMap = dict( await self.main_store.get_partial_filtered_current_state_ids( room_id, state_filter=StateFilter.from_types( @@ -826,27 +866,25 @@ class EventsPersistenceStorageController: if state_key in SLIDING_SYNC_RELEVANT_STATE_SET: current_state_ids_map[state_key] = event_id - event_map = await self.main_store.get_events( + fetched_events = await self.main_store.get_events( current_state_ids_map.values() ) - current_state_map = {} - for key, event_id in current_state_ids_map.items(): - event = event_map.get(event_id) - if event: - current_state_map[key] = event + current_state_map: StateMap[EventBase] = { + key: fetched_events[event_id] + for key, event_id in current_state_ids_map.items() + } # Map of values to insert/update in the `sliding_sync_membership_snapshots` table - membership_snapshot_shared_insert_values = SlidingSyncMembershipSnapshotSharedInsertValues() - has_known_state = False if current_state_map: state_insert_values = ( self._get_sliding_sync_insert_values_from_state_map( current_state_map ) ) + membership_snapshot_shared_insert_values.update(state_insert_values) # We have current state to work from - membership_snapshot_shared_insert_values.has_known_state = True + membership_snapshot_shared_insert_values["has_known_state"] = True else: # We don't have any `current_state_events` anymore (previously # cleared out because of `no_longer_in_room`). This can happen if @@ -856,29 +894,21 @@ class EventsPersistenceStorageController: # rejects the invite (leaves the room), we will end up here. # # In these cases, we should inherit the meta data from the previous - # snapshot (handled by the default `ON CONFLICT ... DO UPDATE SET`). - # When using sliding sync filters, this will prevent the room from + # snapshot so we shouldn't update any of the state values. When + # using sliding sync filters, this will prevent the room from # disappearing/appearing just because you left the room. # # Ideally, we could additionally assert that we're only here for # valid non-join membership transitions. assert delta_state.no_longer_in_room - membership_snapshot_updates = { - (room_id, user_id): SlidingSyncSnapshotInsertValues( - membership_event_id=membership_event_id, - has_known_state=has_known_state, - ) - for membership_event_id, user_id in membership_event_id_to_user_id_map.items() - } - return SlidingSyncTableChanges( room_id=room_id, # For `sliding_sync_joined_rooms` joined_room_updates=TODO, to_delete_joined_rooms=TODO, # For `sliding_sync_membership_snapshots` - membership_snapshot_shared_insert_values=TODO, + membership_snapshot_shared_insert_values=membership_snapshot_shared_insert_values, to_insert_membership_snapshots=membership_infos, to_delete_membership_snapshots=to_delete_membership_snapshots, ) @@ -897,13 +927,15 @@ class EventsPersistenceStorageController: the `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables. """ # Map of values to insert/update in the `sliding_sync_membership_snapshots` table - sliding_sync_insert_map: Dict[str, Optional[Union[str, bool]]] = {} + sliding_sync_insert_map: SlidingSyncStateInsertValues = {} # Parse the raw event JSON for state_key, event in state_map.items(): if state_key == (EventTypes.Create, ""): room_type = event.content.get(EventContentFields.ROOM_TYPE) - sliding_sync_insert_map["room_type"] = room_type + # Scrutinize JSON values + if room_type is None or isinstance(room_type, str): + sliding_sync_insert_map["room_type"] = room_type elif state_key == (EventTypes.RoomEncryption, ""): encryption_algorithm = event.content.get( EventContentFields.ENCRYPTION_ALGORITHM @@ -912,7 +944,9 @@ class EventsPersistenceStorageController: sliding_sync_insert_map["is_encrypted"] = is_encrypted elif state_key == (EventTypes.Name, ""): room_name = event.content.get(EventContentFields.ROOM_NAME) - sliding_sync_insert_map["room_name"] = room_name + # Scrutinize JSON values + if room_name is None or isinstance(room_name, str): + sliding_sync_insert_map["room_name"] = room_name else: # We only expect to see events according to the # `SLIDING_SYNC_RELEVANT_STATE_SET`. @@ -920,11 +954,7 @@ class EventsPersistenceStorageController: f"Unexpected event (we should not be fetching extra events): {state_key} {event.event_id}" ) - return SlidingSyncStateInsertValues( - room_type=room_type, - is_encrypted=encryption_algorithm, - room_name=room_name, - ) + return sliding_sync_insert_map async def _calculate_new_extremities( self, diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 1c6eecff94..277df93459 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -31,6 +31,7 @@ from typing import ( Generator, Iterable, List, + Literal, Optional, Set, Tuple, @@ -125,34 +126,51 @@ class DeltaState: return not self.to_delete and not self.to_insert and not self.no_longer_in_room -@attr.s(slots=True, auto_attribs=True) -class SlidingSyncStateInsertValues: - """ - Insert values relevant for the `sliding_sync_joined_rooms` and - `sliding_sync_membership_snapshots` database tables. - """ - room_type: Optional[str] - is_encrypted: Optional[bool] - room_name: Optional[str] +# @attr.s(slots=True, auto_attribs=True) +# class SlidingSyncStateInsertValues: +# """ +# Insert values relevant for the `sliding_sync_joined_rooms` and +# `sliding_sync_membership_snapshots` database tables. +# """ +# room_type: Optional[str] +# is_encrypted: Optional[bool] +# room_name: Optional[str] + +SlidingSyncStateInsertKeys = Literal["room_type", "is_encrypted", "room_name"] +SlidingSyncStateInsertValues = Dict[ + SlidingSyncStateInsertKeys, Optional[Union[str, bool]] +] -@attr.s(slots=True, auto_attribs=True) -class SlidingSyncMembershipSnapshotSharedInsertValues(SlidingSyncStateInsertValues): - """ - Insert values for `sliding_sync_membership_snapshots` that we can share across - multiple memberships - """ - has_known_state: bool - # TODO: tombstone_successor_room_id: Optional[str] +# @attr.s(slots=True, auto_attribs=True) +# class SlidingSyncMembershipSnapshotSharedInsertValues(SlidingSyncStateInsertValues): +# """ +# Insert values for `sliding_sync_membership_snapshots` that we can share across +# multiple memberships +# """ +# has_known_state: bool +# # TODO: tombstone_successor_room_id: Optional[str] + +SlidingSyncMembershipSnapshotSharedInsertValues = Dict[ + # Instead of using a Union, we use a Literal to be compatible with mypy + # Literal[SlidingSyncStateInsertKeys, "has_known_state"], + Union[SlidingSyncStateInsertKeys, Literal["has_known_state"]], + Optional[Union[str, bool]], +] + +# @attr.s(slots=True, auto_attribs=True) +# class SlidingSyncMembershipInfo(SlidingSyncStateInsertValues): +# """ +# Values unique to each membership +# """ +# user_id: str +# sender: str +# membership_event_id: str + +SlidingSyncMembershipInfo = Dict[ + Literal["user_id", "sender", "membership_event_id"], Optional[Union[str, bool]] +] -@attr.s(slots=True, auto_attribs=True) -class SlidingSyncMembershipInfo(SlidingSyncStateInsertValues): - """ - Values unique to each membership - """ - user_id: str - sender: str - membership_event_id: str @attr.s(slots=True, auto_attribs=True) class SlidingSyncTableChanges: @@ -164,7 +182,9 @@ class SlidingSyncTableChanges: # Shared values to upsert into `sliding_sync_membership_snapshots` for each # `to_insert_membership_snapshots` - membership_snapshot_shared_insert_values: SlidingSyncMembershipSnapshotSharedInsertValues + membership_snapshot_shared_insert_values: ( + SlidingSyncMembershipSnapshotSharedInsertValues + ) # List of membership to insert into `sliding_sync_membership_snapshots` to_insert_membership_snapshots: List[SlidingSyncMembershipInfo] # List of user_id to delete from `sliding_sync_membership_snapshots` @@ -667,6 +687,9 @@ class PersistEventsStore: # room_memberships, where applicable. # NB: This function invalidates all state related caches if state_delta_for_room: + # If the state delta exists, the sliding sync table changes should also exist + assert sliding_sync_table_changes is not None + self._update_current_state_txn( txn, room_id, @@ -1213,6 +1236,7 @@ class PersistEventsStore: self, room_id: str, state_delta: DeltaState, + sliding_sync_table_changes: SlidingSyncTableChanges, ) -> None: """Update the current state stored in the datatabase for the given room""" @@ -1226,6 +1250,7 @@ class PersistEventsStore: room_id, delta_state=state_delta, stream_id=stream_ordering, + sliding_sync_table_changes=sliding_sync_table_changes, ) def _update_current_state_txn( @@ -1234,7 +1259,7 @@ class PersistEventsStore: room_id: str, delta_state: DeltaState, stream_id: int, - sliding_sync_table_changes: Optional[SlidingSyncTableChanges], + sliding_sync_table_changes: SlidingSyncTableChanges, ) -> None: to_delete = delta_state.to_delete to_insert = delta_state.to_insert @@ -1250,73 +1275,6 @@ class PersistEventsStore: if ev_type == EventTypes.Member } - # Handle updating the `sliding_sync_membership_snapshots` table - # - # This would only happen if someone was state reset out of the room - if sliding_sync_table_changes.to_delete_membership_snapshots: - txn.execute_batch( - "DELETE FROM sliding_sync_membership_snapshots" - " WHERE room_id = ? AND user_id = ?", - sliding_sync_table_changes.to_delete_membership_snapshots, - ) - - # We handle `sliding_sync_membership_snapshots` before `current_state_events` so - # we can gather the current state before it might be deleted if we are - # last ones in the room and now we are `no_longer_in_room`. - # - # We do this regardless of whether the server is `no_longer_in_room` or not - # because we still want a row if a local user was just left/kicked or got banned - # from the room. - if sliding_sync_table_changes.membership_snapshot_updates: - - # TODO - for asdf in sliding_sync_table_changes.membership_snapshot_updates: - for attr_name in ["room_type", "is_encrypted", "room_name"] - [ - getattr(x, attr_name) - - ] - - # Update the `sliding_sync_membership_snapshots` table - # - # Pulling keys/values separately is safe and will produce congruent - # lists - insert_keys = sliding_sync_membership_snapshots_insert_map.keys() - insert_values = sliding_sync_membership_snapshots_insert_map.values() - # We need to insert/update regardless of whether we have `insert_keys` - # because there are other fields in the `ON CONFLICT` upsert to run (see - # inherit case above for more context when this happens). - txn.execute_batch( - f""" - INSERT INTO sliding_sync_membership_snapshots - (room_id, user_id, membership_event_id, membership, event_stream_ordering - {("," + ", ".join(insert_keys)) if insert_keys else ""}) - VALUES ( - ?, ?, ?, - (SELECT membership FROM room_memberships WHERE event_id = ?), - (SELECT stream_ordering FROM events WHERE event_id = ?) - {("," + ", ".join("?" for _ in insert_values)) if insert_values else ""} - ) - ON CONFLICT (room_id, user_id) - DO UPDATE SET - membership_event_id = EXCLUDED.membership_event_id, - membership = EXCLUDED.membership, - event_stream_ordering = EXCLUDED.event_stream_ordering - {("," + ", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)) if insert_keys else ""} - """, - [ - [ - room_id, - user_id, - membership_event_id, - membership_event_id, - membership_event_id, - ] - + list(insert_values) - for membership_event_id, user_id in membership_event_id_to_user_id_map.items() - ], - ) - if delta_state.no_longer_in_room: # Server is no longer in the room so we delete the room from # current_state_events, being careful we've already updated the @@ -1545,6 +1503,64 @@ class PersistEventsStore: ], ) + # Handle updating the `sliding_sync_membership_snapshots` table + # + # This would only happen if someone was state reset out of the room + if sliding_sync_table_changes.to_delete_membership_snapshots: + txn.execute_batch( + "DELETE FROM sliding_sync_membership_snapshots" + " WHERE room_id = ? AND user_id = ?", + sliding_sync_table_changes.to_delete_membership_snapshots, + ) + + # We do this regardless of whether the server is `no_longer_in_room` or not + # because we still want a row if a local user was just left/kicked or got banned + # from the room. + if sliding_sync_table_changes.to_insert_membership_snapshots: + # Update the `sliding_sync_membership_snapshots` table + # + # Pulling keys/values separately is safe and will produce congruent + # lists + insert_keys = ( + sliding_sync_table_changes.membership_snapshot_shared_insert_values.keys() + ) + insert_values = ( + sliding_sync_table_changes.membership_snapshot_shared_insert_values.values() + ) + # We need to insert/update regardless of whether we have `insert_keys` + # because there are other fields in the `ON CONFLICT` upsert to run (see + # inherit case above for more context when this happens). + txn.execute_batch( + f""" + INSERT INTO sliding_sync_membership_snapshots + (room_id, user_id, membership_event_id, membership, event_stream_ordering + {("," + ", ".join(insert_keys)) if insert_keys else ""}) + VALUES ( + ?, ?, ?, + (SELECT membership FROM room_memberships WHERE event_id = ?), + (SELECT stream_ordering FROM events WHERE event_id = ?) + {("," + ", ".join("?" for _ in insert_values)) if insert_values else ""} + ) + ON CONFLICT (room_id, user_id) + DO UPDATE SET + membership_event_id = EXCLUDED.membership_event_id, + membership = EXCLUDED.membership, + event_stream_ordering = EXCLUDED.event_stream_ordering + {("," + ", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)) if insert_keys else ""} + """, + [ + [ + room_id, + membership_info["user_id"], + membership_info["membership_event_id"], + membership_info["membership_event_id"], + membership_info["membership_event_id"], + ] + + list(insert_values) + for membership_info in sliding_sync_table_changes.to_insert_membership_snapshots + ], + ) + txn.call_after( self.store._curr_state_delta_stream_cache.entity_has_changed, room_id,