diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index d423d80efa..8eff4d1791 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -137,7 +137,7 @@ class SlidingSyncStateInsertValues(TypedDict, total=False): """ room_type: Optional[str] - is_encrypted: Optional[bool] + is_encrypted: bool room_name: Optional[str] tombstone_successor_room_id: Optional[str] @@ -150,7 +150,7 @@ class SlidingSyncMembershipSnapshotSharedInsertValues( multiple memberships """ - has_known_state: Optional[bool] + has_known_state: bool @attr.s(slots=True, auto_attribs=True) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index f6c03a0650..ff4f064f16 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -1881,10 +1881,9 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS keyvalues={"room_id": room_id}, values={}, insertion_values={ + # The reason we're only *inserting* (not *updating*) is because if + # they are present, that means they are already up-to-date. **update_map, - # The reason we're only *inserting* (not *updating*) `event_stream_ordering` - # and `bump_stamp` is because if they are present, that means they are already - # up-to-date. 
"event_stream_ordering": event_stream_ordering, "bump_stamp": bump_stamp, }, @@ -2122,6 +2121,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS last_to_insert_membership_infos_by_room_id: Dict[ str, SlidingSyncMembershipInfoWithEventPos ] = {} + # TODO: `concurrently_execute` based on buckets of room_ids for ( room_id, room_id_from_rooms_table, @@ -2226,6 +2226,9 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS # we only want some non-membership state await_full_state=False, ) + + # TODO: Read the state from the `room_stats_state` table if we can + # We're iterating over rooms that we are joined to so they should # have `current_state_events` and we should have some current state # for each room @@ -2404,59 +2407,103 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS ) ) + # Assemble data so it's ready for the batch queries in the transaction + key_names = ("room_id", "user_id") + key_values: List[Tuple[str, str]] = [] + insertion_value_names = ( + "has_known_state", + "room_type", + "is_encrypted", + "room_name", + "tombstone_successor_room_id", + "sender", + "membership_event_id", + "membership", + "event_stream_ordering", + "event_instance_name", + ) + insertion_value_values: List[ + Tuple[ + bool, + Optional[str], + bool, + Optional[str], + Optional[str], + str, + str, + str, + int, + str, + ] + ] = [] + forgotten_update_query_args: List[Tuple[str, str, str]] = [] + for key, insert_map in to_insert_membership_snapshots.items(): + room_id, user_id = key + membership_info = to_insert_membership_infos[(room_id, user_id)] + sender = membership_info.sender + membership_event_id = membership_info.membership_event_id + membership = membership_info.membership + membership_event_stream_ordering = ( + membership_info.membership_event_stream_ordering + ) + membership_event_instance_name = ( + membership_info.membership_event_instance_name + ) + + 
key_values.append((room_id, user_id)) + insertion_value_values.append( + ( + # `has_known_state` should be set to *some* True/False value + insert_map["has_known_state"], + insert_map.get("room_type"), + insert_map.get("is_encrypted", False), + insert_map.get("room_name"), + insert_map.get("tombstone_successor_room_id"), + sender, + membership_event_id, + membership, + membership_event_stream_ordering, + membership_event_instance_name, + ) + ) + + forgotten_update_query_args.append( + ( + membership_event_id, + room_id, + user_id, + ) + ) + def _fill_table_txn(txn: LoggingTransaction) -> None: # Handle updating the `sliding_sync_membership_snapshots` table # - for key, insert_map in to_insert_membership_snapshots.items(): - room_id, user_id = key - membership_info = to_insert_membership_infos[(room_id, user_id)] - sender = membership_info.sender - membership_event_id = membership_info.membership_event_id - membership = membership_info.membership - membership_event_stream_ordering = ( - membership_info.membership_event_stream_ordering - ) - membership_event_instance_name = ( - membership_info.membership_event_instance_name - ) + # We don't need to update the state in the upsert because we never partially + # insert/update the snapshots and anything already there is up-to-date + # EXCEPT for the `forgotten` field since that is updated out-of-band from + # the membership changes. We're using an upsert to avoid unique + # violation errors that would happen from directly inserting. 
+ self.db_pool.simple_upsert_many_txn( + txn, + table="sliding_sync_membership_snapshots", + key_names=key_names, + key_values=key_values, + # TODO: Implement these + insertion_value_names=insertion_value_names, + insertion_value_values=insertion_value_values, + ) - # We don't need to upsert the state because we never partially - # insert/update the snapshots and anything already there is up-to-date - # EXCEPT for the `forgotten` field since that is updated out-of-band - # from the membership changes. - # - # Even though we're only doing insertions, we're using - # `simple_upsert_txn()` here to avoid unique violation errors that would - # happen from `simple_insert_txn()` - self.db_pool.simple_upsert_txn( - txn, - table="sliding_sync_membership_snapshots", - keyvalues={"room_id": room_id, "user_id": user_id}, - values={}, - insertion_values={ - **insert_map, - "sender": sender, - "membership_event_id": membership_event_id, - "membership": membership, - "event_stream_ordering": membership_event_stream_ordering, - "event_instance_name": membership_event_instance_name, - }, - ) - # We need to find the `forgotten` value during the transaction because - # we can't risk inserting stale data. - txn.execute( - """ - UPDATE sliding_sync_membership_snapshots - SET - forgotten = (SELECT forgotten FROM room_memberships WHERE event_id = ?) - WHERE room_id = ? and user_id = ? - """, - ( - membership_event_id, - room_id, - user_id, - ), - ) + # We need to find the `forgotten` value during the transaction because + # we can't risk inserting stale data. + txn.execute_batch( + """ + UPDATE sliding_sync_membership_snapshots + SET + forgotten = (SELECT forgotten FROM room_memberships WHERE event_id = ?) + WHERE room_id = ? and user_id = ? + """, + forgotten_update_query_args, + ) await self.db_pool.runInteraction( "sliding_sync_membership_snapshots_bg_update", _fill_table_txn