From f6c2b0ec2ea6c63f569bade481fe2a4f3c1080ae Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Sep 2024 13:16:57 +0100 Subject: [PATCH 1/7] Sliding sync: don't fetch room summary for named rooms. (#17683) For rooms with a name we can skip fetching a full room summary, as we don't need to calculate heroes, and instead just fetch the room counts directly. This also changes things to not return counts and heroes for non-joined rooms. For left/banned rooms we were returning zero values anyway, and for invite/knock rooms we don't really want to leak such information (even if some of is included in the stripped state). --- changelog.d/17683.misc | 1 + synapse/handlers/sliding_sync/__init__.py | 78 +++++++++---------- synapse/storage/_base.py | 2 + synapse/storage/databases/main/roommember.py | 37 ++++++--- synapse/storage/databases/main/state.py | 8 ++ .../main/delta/87/03_current_state_index.sql | 19 +++++ .../client/sliding_sync/test_rooms_meta.py | 52 ++++++------- 7 files changed, 121 insertions(+), 76 deletions(-) create mode 100644 changelog.d/17683.misc create mode 100644 synapse/storage/schema/main/delta/87/03_current_state_index.sql diff --git a/changelog.d/17683.misc b/changelog.d/17683.misc new file mode 100644 index 0000000000..11a10ff854 --- /dev/null +++ b/changelog.d/17683.misc @@ -0,0 +1 @@ +Speed up sliding sync by reducing amount of data pulled out of the database for large rooms. diff --git a/synapse/handlers/sliding_sync/__init__.py b/synapse/handlers/sliding_sync/__init__.py index b097ac57a2..04493494a6 100644 --- a/synapse/handlers/sliding_sync/__init__.py +++ b/synapse/handlers/sliding_sync/__init__.py @@ -784,32 +784,10 @@ class SlidingSyncHandler: ): avatar_changed = True + # We only need the room summary for calculating heroes, however if we do + # fetch it then we can use it to calculate `joined_count` and + # `invited_count`. room_membership_summary: Optional[Mapping[str, MemberSummary]] = None - empty_membership_summary = MemberSummary([], 0) - # We need the room summary for: - # - Always for initial syncs (or the first time we send down the room) - # - When the room has no name, we need `heroes` - # - When the membership has changed so we need to give updated `heroes` and - # `joined_count`/`invited_count`. - # - # Ideally, instead of just looking at `name_changed`, we'd check if the room - # name is not set but this is a good enough approximation that saves us from - # having to pull out the full event. This just means, we're generating the - # summary whenever the room name changes instead of only when it changes to - # `None`. - if initial or name_changed or membership_changed: - # We can't trace the function directly because it's cached and the `@cached` - # decorator doesn't mix with `@trace` yet. - with start_active_span("get_room_summary"): - if room_membership_for_user_at_to_token.membership in ( - Membership.LEAVE, - Membership.BAN, - ): - # TODO: Figure out how to get the membership summary for left/banned rooms - room_membership_summary = {} - else: - room_membership_summary = await self.store.get_room_summary(room_id) - # TODO: Reverse/rewind back to the `to_token` # `heroes` are required if the room name is not set. # @@ -828,11 +806,45 @@ class SlidingSyncHandler: # get them on initial syncs (or the first time we send down the room) or if the # membership has changed which may change the heroes. if name_event_id is None and (initial or (not initial and membership_changed)): - assert room_membership_summary is not None + # We need the room summary to extract the heroes from + if room_membership_for_user_at_to_token.membership != Membership.JOIN: + # TODO: Figure out how to get the membership summary for left/banned rooms + # For invite/knock rooms we don't include the information. + room_membership_summary = {} + else: + room_membership_summary = await self.store.get_room_summary(room_id) + # TODO: Reverse/rewind back to the `to_token` + hero_user_ids = extract_heroes_from_room_summary( room_membership_summary, me=user.to_string() ) + # Fetch the membership counts for rooms we're joined to. + # + # Similarly to other metadata, we only need to calculate the member + # counts if this is an initial sync or the memberships have changed. + joined_count: Optional[int] = None + invited_count: Optional[int] = None + if ( + initial or membership_changed + ) and room_membership_for_user_at_to_token.membership == Membership.JOIN: + # If we have the room summary (because we calculated heroes above) + # then we can simply pull the counts from there. + if room_membership_summary is not None: + empty_membership_summary = MemberSummary([], 0) + + joined_count = room_membership_summary.get( + Membership.JOIN, empty_membership_summary + ).count + + invited_count = room_membership_summary.get( + Membership.INVITE, empty_membership_summary + ).count + else: + member_counts = await self.store.get_member_counts(room_id) + joined_count = member_counts.get(Membership.JOIN, 0) + invited_count = member_counts.get(Membership.INVITE, 0) + # Fetch the `required_state` for the room # # No `required_state` for invite/knock rooms (just `stripped_state`) @@ -1090,20 +1102,6 @@ class SlidingSyncHandler: set_tag(SynapseTags.RESULT_PREFIX + "initial", initial) - joined_count: Optional[int] = None - if initial or membership_changed: - assert room_membership_summary is not None - joined_count = room_membership_summary.get( - Membership.JOIN, empty_membership_summary - ).count - - invited_count: Optional[int] = None - if initial or membership_changed: - assert room_membership_summary is not None - invited_count = room_membership_summary.get( - Membership.INVITE, empty_membership_summary - ).count - return SlidingSyncResult.RoomResult( name=room_name, avatar=room_avatar, diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index d22160b85c..d6deb077c8 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -112,6 +112,7 @@ class SQLBaseStore(metaclass=ABCMeta): self._attempt_to_invalidate_cache( "get_number_joined_users_in_room", (room_id,) ) + self._attempt_to_invalidate_cache("get_member_counts", (room_id,)) self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,)) # There's no easy way of invalidating this cache for just the users @@ -153,6 +154,7 @@ class SQLBaseStore(metaclass=ABCMeta): self._attempt_to_invalidate_cache("get_current_hosts_in_room", (room_id,)) self._attempt_to_invalidate_cache("get_users_in_room_with_profiles", (room_id,)) self._attempt_to_invalidate_cache("get_number_joined_users_in_room", (room_id,)) + self._attempt_to_invalidate_cache("get_member_counts", (room_id,)) self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,)) self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None) self._attempt_to_invalidate_cache("get_user_in_room_with_profile", None) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 8df760e8a6..db03729cfe 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -312,18 +312,10 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): # We do this all in one transaction to keep the cache small. # FIXME: get rid of this when we have room_stats - # Note, rejected events will have a null membership field, so - # we we manually filter them out. - sql = """ - SELECT count(*), membership FROM current_state_events - WHERE type = 'm.room.member' AND room_id = ? - AND membership IS NOT NULL - GROUP BY membership - """ + counts = self._get_member_counts_txn(txn, room_id) - txn.execute(sql, (room_id,)) res: Dict[str, MemberSummary] = {} - for count, membership in txn: + for membership, count in counts.items(): res.setdefault(membership, MemberSummary([], count)) # Order by membership (joins -> invites -> leave (former insiders) -> @@ -369,6 +361,31 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): "get_room_summary", _get_room_summary_txn ) + @cached() + async def get_member_counts(self, room_id: str) -> Mapping[str, int]: + """Get a mapping of number of users by membership""" + + return await self.db_pool.runInteraction( + "get_member_counts", self._get_member_counts_txn, room_id + ) + + def _get_member_counts_txn( + self, txn: LoggingTransaction, room_id: str + ) -> Dict[str, int]: + """Get a mapping of number of users by membership""" + + # Note, rejected events will have a null membership field, so + # we we manually filter them out. + sql = """ + SELECT count(*), membership FROM current_state_events + WHERE type = 'm.room.member' AND room_id = ? + AND membership IS NOT NULL + GROUP BY membership + """ + + txn.execute(sql, (room_id,)) + return {membership: count for count, membership in txn} + @cached() async def get_number_joined_users_in_room(self, room_id: str) -> int: return await self.db_pool.simple_select_one_onecol( diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index c5caaf56b0..ca31122ad3 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -736,6 +736,7 @@ class MainStateBackgroundUpdateStore(RoomMemberWorkerStore): CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx" EVENT_STATE_GROUP_INDEX_UPDATE_NAME = "event_to_state_groups_sg_index" DELETE_CURRENT_STATE_UPDATE_NAME = "delete_old_current_state_events" + MEMBERS_CURRENT_STATE_UPDATE_NAME = "current_state_events_members_room_index" def __init__( self, @@ -764,6 +765,13 @@ class MainStateBackgroundUpdateStore(RoomMemberWorkerStore): self.DELETE_CURRENT_STATE_UPDATE_NAME, self._background_remove_left_rooms, ) + self.db_pool.updates.register_background_index_update( + self.MEMBERS_CURRENT_STATE_UPDATE_NAME, + index_name="current_state_events_members_room_index", + table="current_state_events", + columns=["room_id", "membership"], + where_clause="type='m.room.member'", + ) async def _background_remove_left_rooms( self, progress: JsonDict, batch_size: int diff --git a/synapse/storage/schema/main/delta/87/03_current_state_index.sql b/synapse/storage/schema/main/delta/87/03_current_state_index.sql new file mode 100644 index 0000000000..76b974271c --- /dev/null +++ b/synapse/storage/schema/main/delta/87/03_current_state_index.sql @@ -0,0 +1,19 @@ +-- +-- This file is licensed under the Affero General Public License (AGPL) version 3. +-- +-- Copyright (C) 2024 New Vector, Ltd +-- +-- This program is free software: you can redistribute it and/or modify +-- it under the terms of the GNU Affero General Public License as +-- published by the Free Software Foundation, either version 3 of the +-- License, or (at your option) any later version. +-- +-- See the GNU Affero General Public License for more details: +-- . + + +-- Add a background updates to add a new index: +-- `current_state_events(room_id, membership) WHERE type = 'm.room.member' +-- This makes counting membership in rooms (for syncs) much faster +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (8701, 'current_state_events_members_room_index', '{}'); diff --git a/tests/rest/client/sliding_sync/test_rooms_meta.py b/tests/rest/client/sliding_sync/test_rooms_meta.py index 6d2742e25f..6dbce7126f 100644 --- a/tests/rest/client/sliding_sync/test_rooms_meta.py +++ b/tests/rest/client/sliding_sync/test_rooms_meta.py @@ -371,14 +371,17 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase): "mxc://UPDATED_DUMMY_MEDIA_ID", response_body["rooms"][room_id1], ) - self.assertEqual( - response_body["rooms"][room_id1]["joined_count"], - 1, + + # We don't give extra room information to invitees + self.assertNotIn( + "joined_count", + response_body["rooms"][room_id1], ) - self.assertEqual( - response_body["rooms"][room_id1]["invited_count"], - 1, + self.assertNotIn( + "invited_count", + response_body["rooms"][room_id1], ) + self.assertIsNone( response_body["rooms"][room_id1].get("is_dm"), ) @@ -450,15 +453,16 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase): "mxc://DUMMY_MEDIA_ID", response_body["rooms"][room_id1], ) - self.assertEqual( - response_body["rooms"][room_id1]["joined_count"], - # FIXME: The actual number should be "1" (user2) but we currently don't - # support this for rooms where the user has left/been banned. - 0, + + # FIXME: We possibly want to return joined and invited counts for rooms + # you're banned form + self.assertNotIn( + "joined_count", + response_body["rooms"][room_id1], ) - self.assertEqual( - response_body["rooms"][room_id1]["invited_count"], - 0, + self.assertNotIn( + "invited_count", + response_body["rooms"][room_id1], ) self.assertIsNone( response_body["rooms"][room_id1].get("is_dm"), @@ -692,19 +696,15 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase): [], ) - self.assertEqual( - response_body["rooms"][room_id1]["joined_count"], - # FIXME: The actual number should be "1" (user2) but we currently don't - # support this for rooms where the user has left/been banned. - 0, + # FIXME: We possibly want to return joined and invited counts for rooms + # you're banned form + self.assertNotIn( + "joined_count", + response_body["rooms"][room_id1], ) - self.assertEqual( - response_body["rooms"][room_id1]["invited_count"], - # We shouldn't see user5 since they were invited after user1 was banned. - # - # FIXME: The actual number should be "1" (user3) but we currently don't - # support this for rooms where the user has left/been banned. - 0, + self.assertNotIn( + "invited_count", + response_body["rooms"][room_id1], ) def test_rooms_meta_heroes_incremental_sync_no_change(self) -> None: From 596b96411bfd852e757a8673ecac0dbc31f77735 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Sep 2024 15:38:46 +0100 Subject: [PATCH 2/7] Sliding sync: various fixups to the background update (#17652) --- changelog.d/17652.misc | 1 + synapse/storage/databases/main/events.py | 45 ++++- .../databases/main/events_bg_updates.py | 190 +++++++++++++----- tests/storage/test_sliding_sync_tables.py | 130 ------------ 4 files changed, 186 insertions(+), 180 deletions(-) create mode 100644 changelog.d/17652.misc diff --git a/changelog.d/17652.misc b/changelog.d/17652.misc new file mode 100644 index 0000000000..756918e2b2 --- /dev/null +++ b/changelog.d/17652.misc @@ -0,0 +1 @@ +Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting. diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index e5f63019fd..c0b7d8107d 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1980,7 +1980,12 @@ class PersistEventsStore: if state_key == (EventTypes.Create, ""): room_type = event.content.get(EventContentFields.ROOM_TYPE) # Scrutinize JSON values - if room_type is None or isinstance(room_type, str): + if room_type is None or ( + isinstance(room_type, str) + # We ignore values with null bytes as Postgres doesn't allow them in + # text columns. + and "\0" not in room_type + ): sliding_sync_insert_map["room_type"] = room_type elif state_key == (EventTypes.RoomEncryption, ""): encryption_algorithm = event.content.get( @@ -1990,15 +1995,26 @@ class PersistEventsStore: sliding_sync_insert_map["is_encrypted"] = is_encrypted elif state_key == (EventTypes.Name, ""): room_name = event.content.get(EventContentFields.ROOM_NAME) - # Scrutinize JSON values - if room_name is None or isinstance(room_name, str): + # Scrutinize JSON values. We ignore values with nulls as + # postgres doesn't allow null bytes in text columns. + if room_name is None or ( + isinstance(room_name, str) + # We ignore values with null bytes as Postgres doesn't allow them in + # text columns. + and "\0" not in room_name + ): sliding_sync_insert_map["room_name"] = room_name elif state_key == (EventTypes.Tombstone, ""): successor_room_id = event.content.get( EventContentFields.TOMBSTONE_SUCCESSOR_ROOM ) # Scrutinize JSON values - if successor_room_id is None or isinstance(successor_room_id, str): + if successor_room_id is None or ( + isinstance(successor_room_id, str) + # We ignore values with null bytes as Postgres doesn't allow them in + # text columns. + and "\0" not in successor_room_id + ): sliding_sync_insert_map["tombstone_successor_room_id"] = ( successor_room_id ) @@ -2081,6 +2097,21 @@ class PersistEventsStore: else None ) + # Check for null bytes in the room name and type. We have to + # ignore values with null bytes as Postgres doesn't allow them + # in text columns. + if ( + sliding_sync_insert_map["room_name"] is not None + and "\0" in sliding_sync_insert_map["room_name"] + ): + sliding_sync_insert_map.pop("room_name") + + if ( + sliding_sync_insert_map["room_type"] is not None + and "\0" in sliding_sync_insert_map["room_type"] + ): + sliding_sync_insert_map.pop("room_type") + # Find the tombstone_successor_room_id # Note: This isn't one of the stripped state events according to the spec # but seems like there is no reason not to support this kind of thing. @@ -2095,6 +2126,12 @@ class PersistEventsStore: else None ) + if ( + sliding_sync_insert_map["tombstone_successor_room_id"] is not None + and "\0" in sliding_sync_insert_map["tombstone_successor_room_id"] + ): + sliding_sync_insert_map.pop("tombstone_successor_room_id") + else: # No stripped state provided sliding_sync_insert_map["has_known_state"] = False diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index b3244f7457..743200471b 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -47,6 +47,7 @@ from synapse.storage.databases.main.events_worker import ( ) from synapse.storage.databases.main.state_deltas import StateDeltasStore from synapse.storage.databases.main.stream import StreamWorkerStore +from synapse.storage.engines import PostgresEngine from synapse.storage.types import Cursor from synapse.types import JsonDict, RoomStreamToken, StateMap, StrCollection from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES @@ -1877,9 +1878,29 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS def _find_memberships_to_update_txn( txn: LoggingTransaction, ) -> List[ - Tuple[str, Optional[str], str, str, str, str, int, Optional[str], bool] + Tuple[ + str, + Optional[str], + Optional[str], + str, + str, + str, + str, + int, + Optional[str], + bool, + ] ]: # Fetch the set of event IDs that we want to update + # + # We skip over rows which we've already handled, i.e. have a + # matching row in `sliding_sync_membership_snapshots` with the same + # room, user and event ID. + # + # We also ignore rooms that the user has left themselves (i.e. not + # kicked). This is to avoid having to port lots of old rooms that we + # will never send down sliding sync (as we exclude such rooms from + # initial syncs). if initial_phase: # There are some old out-of-band memberships (before @@ -1892,6 +1913,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS SELECT c.room_id, r.room_id, + r.room_version, c.user_id, e.sender, c.event_id, @@ -1900,9 +1922,11 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS e.instance_name, e.outlier FROM local_current_membership AS c + LEFT JOIN sliding_sync_membership_snapshots AS m USING (room_id, user_id) INNER JOIN events AS e USING (event_id) LEFT JOIN rooms AS r ON (c.room_id = r.room_id) WHERE (c.room_id, c.user_id) > (?, ?) + AND (m.user_id IS NULL OR c.event_id != m.membership_event_id) ORDER BY c.room_id ASC, c.user_id ASC LIMIT ? """, @@ -1922,7 +1946,8 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS """ SELECT c.room_id, - c.room_id, + r.room_id, + r.room_version, c.user_id, e.sender, c.event_id, @@ -1931,9 +1956,12 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS e.instance_name, e.outlier FROM local_current_membership AS c + LEFT JOIN sliding_sync_membership_snapshots AS m USING (room_id, user_id) INNER JOIN events AS e USING (event_id) - WHERE event_stream_ordering > ? - ORDER BY event_stream_ordering ASC + LEFT JOIN rooms AS r ON (c.room_id = r.room_id) + WHERE c.event_stream_ordering > ? + AND (m.user_id IS NULL OR c.event_id != m.membership_event_id) + ORDER BY c.event_stream_ordering ASC LIMIT ? """, (last_event_stream_ordering, batch_size), @@ -1944,7 +1972,16 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS memberships_to_update_rows = cast( List[ Tuple[ - str, Optional[str], str, str, str, str, int, Optional[str], bool + str, + Optional[str], + Optional[str], + str, + str, + str, + str, + int, + Optional[str], + bool, ] ], txn.fetchall(), @@ -1977,7 +2014,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS def _find_previous_invite_or_knock_membership_txn( txn: LoggingTransaction, room_id: str, user_id: str, event_id: str - ) -> Tuple[str, str]: + ) -> Optional[Tuple[str, str]]: # Find the previous invite/knock event before the leave event # # Here are some notes on how we landed on this query: @@ -2027,8 +2064,13 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS ) row = txn.fetchone() - # We should see a corresponding previous invite/knock event - assert row is not None + if row is None: + # Generally we should have an invite or knock event for leaves + # that are outliers, however this may not always be the case + # (e.g. a local user got kicked but the kick event got pulled in + # as an outlier). + return None + event_id, membership = row return event_id, membership @@ -2043,6 +2085,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS for ( room_id, room_id_from_rooms_table, + room_version_id, user_id, sender, membership_event_id, @@ -2061,6 +2104,14 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS Membership.BAN, ) + if ( + room_version_id is not None + and room_version_id not in KNOWN_ROOM_VERSIONS + ): + # Ignore rooms with unknown room versions (these were + # experimental rooms, that we no longer support). + continue + # There are some old out-of-band memberships (before # https://github.com/matrix-org/synapse/issues/6983) where we don't have the # corresponding room stored in the `rooms` table`. We have a `FOREIGN KEY` @@ -2148,14 +2199,17 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS # in the events table though. We'll just say that we don't # know the state for these rooms and continue on with our # day. - sliding_sync_membership_snapshots_insert_map["has_known_state"] = ( - False - ) + sliding_sync_membership_snapshots_insert_map = { + "has_known_state": False, + "room_type": None, + "room_name": None, + "is_encrypted": False, + } elif membership in (Membership.INVITE, Membership.KNOCK) or ( membership in (Membership.LEAVE, Membership.BAN) and is_outlier ): - invite_or_knock_event_id = membership_event_id - invite_or_knock_membership = membership + invite_or_knock_event_id = None + invite_or_knock_membership = None # If the event is an `out_of_band_membership` (special case of # `outlier`), we never had historical state so we have to pull from @@ -2164,35 +2218,55 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS # membership (i.e. the room shouldn't disappear if your using the # `is_encrypted` filter and you leave). if membership in (Membership.LEAVE, Membership.BAN) and is_outlier: - ( - invite_or_knock_event_id, - invite_or_knock_membership, - ) = await self.db_pool.runInteraction( + previous_membership = await self.db_pool.runInteraction( "sliding_sync_membership_snapshots_bg_update._find_previous_invite_or_knock_membership_txn", _find_previous_invite_or_knock_membership_txn, room_id, user_id, membership_event_id, ) + if previous_membership is not None: + ( + invite_or_knock_event_id, + invite_or_knock_membership, + ) = previous_membership + else: + invite_or_knock_event_id = membership_event_id + invite_or_knock_membership = membership - # Pull from the stripped state on the invite/knock event - invite_or_knock_event = await self.get_event(invite_or_knock_event_id) - - raw_stripped_state_events = None - if invite_or_knock_membership == Membership.INVITE: - invite_room_state = invite_or_knock_event.unsigned.get( - "invite_room_state" + if ( + invite_or_knock_event_id is not None + and invite_or_knock_membership is not None + ): + # Pull from the stripped state on the invite/knock event + invite_or_knock_event = await self.get_event( + invite_or_knock_event_id ) - raw_stripped_state_events = invite_room_state - elif invite_or_knock_membership == Membership.KNOCK: - knock_room_state = invite_or_knock_event.unsigned.get( - "knock_room_state" - ) - raw_stripped_state_events = knock_room_state - sliding_sync_membership_snapshots_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_stripped_state( - raw_stripped_state_events - ) + raw_stripped_state_events = None + if invite_or_knock_membership == Membership.INVITE: + invite_room_state = invite_or_knock_event.unsigned.get( + "invite_room_state" + ) + raw_stripped_state_events = invite_room_state + elif invite_or_knock_membership == Membership.KNOCK: + knock_room_state = invite_or_knock_event.unsigned.get( + "knock_room_state" + ) + raw_stripped_state_events = knock_room_state + + sliding_sync_membership_snapshots_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_stripped_state( + raw_stripped_state_events + ) + else: + # We couldn't find any state for the membership, so we just have to + # leave it as empty. + sliding_sync_membership_snapshots_insert_map = { + "has_known_state": False, + "room_type": None, + "room_name": None, + "is_encrypted": False, + } # We should have some insert values for each room, even if no # stripped state is on the event because we still want to record @@ -2311,19 +2385,42 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS ) # We need to find the `forgotten` value during the transaction because # we can't risk inserting stale data. - txn.execute( - """ - UPDATE sliding_sync_membership_snapshots - SET - forgotten = (SELECT forgotten FROM room_memberships WHERE event_id = ?) - WHERE room_id = ? and user_id = ? - """, - ( - membership_event_id, - room_id, - user_id, - ), - ) + if isinstance(txn.database_engine, PostgresEngine): + txn.execute( + """ + UPDATE sliding_sync_membership_snapshots + SET + forgotten = m.forgotten + FROM room_memberships AS m + WHERE sliding_sync_membership_snapshots.room_id = ? + AND sliding_sync_membership_snapshots.user_id = ? + AND membership_event_id = ? + AND membership_event_id = m.event_id + AND m.event_id IS NOT NULL + """, + ( + room_id, + user_id, + membership_event_id, + ), + ) + else: + # SQLite doesn't support UPDATE FROM before 3.33.0, so we do + # this via sub-selects. + txn.execute( + """ + UPDATE sliding_sync_membership_snapshots + SET + forgotten = (SELECT forgotten FROM room_memberships WHERE event_id = ?) + WHERE room_id = ? and user_id = ? AND membership_event_id = ? + """, + ( + membership_event_id, + room_id, + user_id, + membership_event_id, + ), + ) await self.db_pool.runInteraction( "sliding_sync_membership_snapshots_bg_update", _fill_table_txn @@ -2333,6 +2430,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS ( room_id, _room_id_from_rooms_table, + _room_version_id, user_id, _sender, _membership_event_id, diff --git a/tests/storage/test_sliding_sync_tables.py b/tests/storage/test_sliding_sync_tables.py index de80ad53cd..61dccc8077 100644 --- a/tests/storage/test_sliding_sync_tables.py +++ b/tests/storage/test_sliding_sync_tables.py @@ -4416,136 +4416,6 @@ class SlidingSyncTablesBackgroundUpdatesTestCase(SlidingSyncTablesTestCaseBase): ), ) - def test_membership_snapshots_background_update_forgotten_partial(self) -> None: - """ - Test an existing `sliding_sync_membership_snapshots` row is updated with the - latest `forgotten` status after the background update passes over it. - """ - user1_id = self.register_user("user1", "pass") - user1_tok = self.login(user1_id, "pass") - user2_id = self.register_user("user2", "pass") - user2_tok = self.login(user2_id, "pass") - - room_id = self.helper.create_room_as(user2_id, tok=user2_tok) - - # User1 joins the room - self.helper.join(room_id, user1_id, tok=user1_tok) - # User1 leaves the room (we have to leave in order to forget the room) - self.helper.leave(room_id, user1_id, tok=user1_tok) - - state_map = self.get_success( - self.storage_controllers.state.get_current_state(room_id) - ) - - # Forget the room - channel = self.make_request( - "POST", - f"/_matrix/client/r0/rooms/{room_id}/forget", - content={}, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.result) - - # Clean-up the `sliding_sync_joined_rooms` table as if the forgotten status - # never made it into the table. - self.get_success( - self.store.db_pool.simple_update( - table="sliding_sync_membership_snapshots", - keyvalues={"room_id": room_id}, - updatevalues={"forgotten": 0}, - desc="sliding_sync_membership_snapshots.test_membership_snapshots_background_update_forgotten_partial", - ) - ) - - # We should see the partial row that we made in preparation for the test. - sliding_sync_membership_snapshots_results = ( - self._get_sliding_sync_membership_snapshots() - ) - self.assertIncludes( - set(sliding_sync_membership_snapshots_results.keys()), - { - (room_id, user1_id), - (room_id, user2_id), - }, - exact=True, - ) - user1_snapshot = _SlidingSyncMembershipSnapshotResult( - room_id=room_id, - user_id=user1_id, - sender=user1_id, - membership_event_id=state_map[(EventTypes.Member, user1_id)].event_id, - membership=Membership.LEAVE, - event_stream_ordering=state_map[ - (EventTypes.Member, user1_id) - ].internal_metadata.stream_ordering, - has_known_state=True, - room_type=None, - room_name=None, - is_encrypted=False, - tombstone_successor_room_id=None, - # Room is *not* forgotten because of our test preparation - forgotten=False, - ) - self.assertEqual( - sliding_sync_membership_snapshots_results.get((room_id, user1_id)), - user1_snapshot, - ) - user2_snapshot = _SlidingSyncMembershipSnapshotResult( - room_id=room_id, - user_id=user2_id, - sender=user2_id, - membership_event_id=state_map[(EventTypes.Member, user2_id)].event_id, - membership=Membership.JOIN, - event_stream_ordering=state_map[ - (EventTypes.Member, user2_id) - ].internal_metadata.stream_ordering, - has_known_state=True, - room_type=None, - room_name=None, - is_encrypted=False, - tombstone_successor_room_id=None, - ) - self.assertEqual( - sliding_sync_membership_snapshots_results.get((room_id, user2_id)), - user2_snapshot, - ) - - # Insert and run the background update. - self.get_success( - self.store.db_pool.simple_insert( - "background_updates", - { - "update_name": _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE, - "progress_json": "{}", - }, - ) - ) - self.store.db_pool.updates._all_done = False - self.wait_for_background_updates() - - # Make sure the table is populated - sliding_sync_membership_snapshots_results = ( - self._get_sliding_sync_membership_snapshots() - ) - self.assertIncludes( - set(sliding_sync_membership_snapshots_results.keys()), - { - (room_id, user1_id), - (room_id, user2_id), - }, - exact=True, - ) - # Forgotten status is now updated - self.assertEqual( - sliding_sync_membership_snapshots_results.get((room_id, user1_id)), - attr.evolve(user1_snapshot, forgotten=True), - ) - # Holds the info according to the current state when the user joined - self.assertEqual( - sliding_sync_membership_snapshots_results.get((room_id, user2_id)), - user2_snapshot, - ) - class SlidingSyncTablesCatchUpBackgroundUpdatesTestCase(SlidingSyncTablesTestCaseBase): """ From b732d13d4cf6e6b474980b16b4920eebebe4b197 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Sep 2024 15:38:46 +0100 Subject: [PATCH 3/7] Sliding sync: various fixups to the background update (#17652) --- changelog.d/17652.misc | 1 + synapse/storage/databases/main/events.py | 45 ++++- .../databases/main/events_bg_updates.py | 190 +++++++++++++----- tests/storage/test_sliding_sync_tables.py | 130 ------------ 4 files changed, 186 insertions(+), 180 deletions(-) create mode 100644 changelog.d/17652.misc diff --git a/changelog.d/17652.misc b/changelog.d/17652.misc new file mode 100644 index 0000000000..756918e2b2 --- /dev/null +++ b/changelog.d/17652.misc @@ -0,0 +1 @@ +Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting. diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index e5f63019fd..c0b7d8107d 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1980,7 +1980,12 @@ class PersistEventsStore: if state_key == (EventTypes.Create, ""): room_type = event.content.get(EventContentFields.ROOM_TYPE) # Scrutinize JSON values - if room_type is None or isinstance(room_type, str): + if room_type is None or ( + isinstance(room_type, str) + # We ignore values with null bytes as Postgres doesn't allow them in + # text columns. + and "\0" not in room_type + ): sliding_sync_insert_map["room_type"] = room_type elif state_key == (EventTypes.RoomEncryption, ""): encryption_algorithm = event.content.get( @@ -1990,15 +1995,26 @@ class PersistEventsStore: sliding_sync_insert_map["is_encrypted"] = is_encrypted elif state_key == (EventTypes.Name, ""): room_name = event.content.get(EventContentFields.ROOM_NAME) - # Scrutinize JSON values - if room_name is None or isinstance(room_name, str): + # Scrutinize JSON values. We ignore values with nulls as + # postgres doesn't allow null bytes in text columns. + if room_name is None or ( + isinstance(room_name, str) + # We ignore values with null bytes as Postgres doesn't allow them in + # text columns. + and "\0" not in room_name + ): sliding_sync_insert_map["room_name"] = room_name elif state_key == (EventTypes.Tombstone, ""): successor_room_id = event.content.get( EventContentFields.TOMBSTONE_SUCCESSOR_ROOM ) # Scrutinize JSON values - if successor_room_id is None or isinstance(successor_room_id, str): + if successor_room_id is None or ( + isinstance(successor_room_id, str) + # We ignore values with null bytes as Postgres doesn't allow them in + # text columns. + and "\0" not in successor_room_id + ): sliding_sync_insert_map["tombstone_successor_room_id"] = ( successor_room_id ) @@ -2081,6 +2097,21 @@ class PersistEventsStore: else None ) + # Check for null bytes in the room name and type. We have to + # ignore values with null bytes as Postgres doesn't allow them + # in text columns. + if ( + sliding_sync_insert_map["room_name"] is not None + and "\0" in sliding_sync_insert_map["room_name"] + ): + sliding_sync_insert_map.pop("room_name") + + if ( + sliding_sync_insert_map["room_type"] is not None + and "\0" in sliding_sync_insert_map["room_type"] + ): + sliding_sync_insert_map.pop("room_type") + # Find the tombstone_successor_room_id # Note: This isn't one of the stripped state events according to the spec # but seems like there is no reason not to support this kind of thing. @@ -2095,6 +2126,12 @@ class PersistEventsStore: else None ) + if ( + sliding_sync_insert_map["tombstone_successor_room_id"] is not None + and "\0" in sliding_sync_insert_map["tombstone_successor_room_id"] + ): + sliding_sync_insert_map.pop("tombstone_successor_room_id") + else: # No stripped state provided sliding_sync_insert_map["has_known_state"] = False diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index b3244f7457..743200471b 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -47,6 +47,7 @@ from synapse.storage.databases.main.events_worker import ( ) from synapse.storage.databases.main.state_deltas import StateDeltasStore from synapse.storage.databases.main.stream import StreamWorkerStore +from synapse.storage.engines import PostgresEngine from synapse.storage.types import Cursor from synapse.types import JsonDict, RoomStreamToken, StateMap, StrCollection from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES @@ -1877,9 +1878,29 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS def _find_memberships_to_update_txn( txn: LoggingTransaction, ) -> List[ - Tuple[str, Optional[str], str, str, str, str, int, Optional[str], bool] + Tuple[ + str, + Optional[str], + Optional[str], + str, + str, + str, + str, + int, + Optional[str], + bool, + ] ]: # Fetch the set of event IDs that we want to update + # + # We skip over rows which we've already handled, i.e. have a + # matching row in `sliding_sync_membership_snapshots` with the same + # room, user and event ID. + # + # We also ignore rooms that the user has left themselves (i.e. not + # kicked). This is to avoid having to port lots of old rooms that we + # will never send down sliding sync (as we exclude such rooms from + # initial syncs). if initial_phase: # There are some old out-of-band memberships (before @@ -1892,6 +1913,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS SELECT c.room_id, r.room_id, + r.room_version, c.user_id, e.sender, c.event_id, @@ -1900,9 +1922,11 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS e.instance_name, e.outlier FROM local_current_membership AS c + LEFT JOIN sliding_sync_membership_snapshots AS m USING (room_id, user_id) INNER JOIN events AS e USING (event_id) LEFT JOIN rooms AS r ON (c.room_id = r.room_id) WHERE (c.room_id, c.user_id) > (?, ?) + AND (m.user_id IS NULL OR c.event_id != m.membership_event_id) ORDER BY c.room_id ASC, c.user_id ASC LIMIT ? """, @@ -1922,7 +1946,8 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS """ SELECT c.room_id, - c.room_id, + r.room_id, + r.room_version, c.user_id, e.sender, c.event_id, @@ -1931,9 +1956,12 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS e.instance_name, e.outlier FROM local_current_membership AS c + LEFT JOIN sliding_sync_membership_snapshots AS m USING (room_id, user_id) INNER JOIN events AS e USING (event_id) - WHERE event_stream_ordering > ? - ORDER BY event_stream_ordering ASC + LEFT JOIN rooms AS r ON (c.room_id = r.room_id) + WHERE c.event_stream_ordering > ? + AND (m.user_id IS NULL OR c.event_id != m.membership_event_id) + ORDER BY c.event_stream_ordering ASC LIMIT ? """, (last_event_stream_ordering, batch_size), @@ -1944,7 +1972,16 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS memberships_to_update_rows = cast( List[ Tuple[ - str, Optional[str], str, str, str, str, int, Optional[str], bool + str, + Optional[str], + Optional[str], + str, + str, + str, + str, + int, + Optional[str], + bool, ] ], txn.fetchall(), @@ -1977,7 +2014,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS def _find_previous_invite_or_knock_membership_txn( txn: LoggingTransaction, room_id: str, user_id: str, event_id: str - ) -> Tuple[str, str]: + ) -> Optional[Tuple[str, str]]: # Find the previous invite/knock event before the leave event # # Here are some notes on how we landed on this query: @@ -2027,8 +2064,13 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS ) row = txn.fetchone() - # We should see a corresponding previous invite/knock event - assert row is not None + if row is None: + # Generally we should have an invite or knock event for leaves + # that are outliers, however this may not always be the case + # (e.g. a local user got kicked but the kick event got pulled in + # as an outlier). + return None + event_id, membership = row return event_id, membership @@ -2043,6 +2085,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS for ( room_id, room_id_from_rooms_table, + room_version_id, user_id, sender, membership_event_id, @@ -2061,6 +2104,14 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS Membership.BAN, ) + if ( + room_version_id is not None + and room_version_id not in KNOWN_ROOM_VERSIONS + ): + # Ignore rooms with unknown room versions (these were + # experimental rooms, that we no longer support). + continue + # There are some old out-of-band memberships (before # https://github.com/matrix-org/synapse/issues/6983) where we don't have the # corresponding room stored in the `rooms` table`. We have a `FOREIGN KEY` @@ -2148,14 +2199,17 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS # in the events table though. We'll just say that we don't # know the state for these rooms and continue on with our # day. - sliding_sync_membership_snapshots_insert_map["has_known_state"] = ( - False - ) + sliding_sync_membership_snapshots_insert_map = { + "has_known_state": False, + "room_type": None, + "room_name": None, + "is_encrypted": False, + } elif membership in (Membership.INVITE, Membership.KNOCK) or ( membership in (Membership.LEAVE, Membership.BAN) and is_outlier ): - invite_or_knock_event_id = membership_event_id - invite_or_knock_membership = membership + invite_or_knock_event_id = None + invite_or_knock_membership = None # If the event is an `out_of_band_membership` (special case of # `outlier`), we never had historical state so we have to pull from @@ -2164,35 +2218,55 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS # membership (i.e. the room shouldn't disappear if your using the # `is_encrypted` filter and you leave). if membership in (Membership.LEAVE, Membership.BAN) and is_outlier: - ( - invite_or_knock_event_id, - invite_or_knock_membership, - ) = await self.db_pool.runInteraction( + previous_membership = await self.db_pool.runInteraction( "sliding_sync_membership_snapshots_bg_update._find_previous_invite_or_knock_membership_txn", _find_previous_invite_or_knock_membership_txn, room_id, user_id, membership_event_id, ) + if previous_membership is not None: + ( + invite_or_knock_event_id, + invite_or_knock_membership, + ) = previous_membership + else: + invite_or_knock_event_id = membership_event_id + invite_or_knock_membership = membership - # Pull from the stripped state on the invite/knock event - invite_or_knock_event = await self.get_event(invite_or_knock_event_id) - - raw_stripped_state_events = None - if invite_or_knock_membership == Membership.INVITE: - invite_room_state = invite_or_knock_event.unsigned.get( - "invite_room_state" + if ( + invite_or_knock_event_id is not None + and invite_or_knock_membership is not None + ): + # Pull from the stripped state on the invite/knock event + invite_or_knock_event = await self.get_event( + invite_or_knock_event_id ) - raw_stripped_state_events = invite_room_state - elif invite_or_knock_membership == Membership.KNOCK: - knock_room_state = invite_or_knock_event.unsigned.get( - "knock_room_state" - ) - raw_stripped_state_events = knock_room_state - sliding_sync_membership_snapshots_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_stripped_state( - raw_stripped_state_events - ) + raw_stripped_state_events = None + if invite_or_knock_membership == Membership.INVITE: + invite_room_state = invite_or_knock_event.unsigned.get( + "invite_room_state" + ) + raw_stripped_state_events = invite_room_state + elif invite_or_knock_membership == Membership.KNOCK: + knock_room_state = invite_or_knock_event.unsigned.get( + "knock_room_state" + ) + raw_stripped_state_events = knock_room_state + + sliding_sync_membership_snapshots_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_stripped_state( + raw_stripped_state_events + ) + else: + # We couldn't find any state for the membership, so we just have to + # leave it as empty. + sliding_sync_membership_snapshots_insert_map = { + "has_known_state": False, + "room_type": None, + "room_name": None, + "is_encrypted": False, + } # We should have some insert values for each room, even if no # stripped state is on the event because we still want to record @@ -2311,19 +2385,42 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS ) # We need to find the `forgotten` value during the transaction because # we can't risk inserting stale data. - txn.execute( - """ - UPDATE sliding_sync_membership_snapshots - SET - forgotten = (SELECT forgotten FROM room_memberships WHERE event_id = ?) - WHERE room_id = ? and user_id = ? - """, - ( - membership_event_id, - room_id, - user_id, - ), - ) + if isinstance(txn.database_engine, PostgresEngine): + txn.execute( + """ + UPDATE sliding_sync_membership_snapshots + SET + forgotten = m.forgotten + FROM room_memberships AS m + WHERE sliding_sync_membership_snapshots.room_id = ? + AND sliding_sync_membership_snapshots.user_id = ? + AND membership_event_id = ? + AND membership_event_id = m.event_id + AND m.event_id IS NOT NULL + """, + ( + room_id, + user_id, + membership_event_id, + ), + ) + else: + # SQLite doesn't support UPDATE FROM before 3.33.0, so we do + # this via sub-selects. + txn.execute( + """ + UPDATE sliding_sync_membership_snapshots + SET + forgotten = (SELECT forgotten FROM room_memberships WHERE event_id = ?) + WHERE room_id = ? and user_id = ? AND membership_event_id = ? + """, + ( + membership_event_id, + room_id, + user_id, + membership_event_id, + ), + ) await self.db_pool.runInteraction( "sliding_sync_membership_snapshots_bg_update", _fill_table_txn @@ -2333,6 +2430,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS ( room_id, _room_id_from_rooms_table, + _room_version_id, user_id, _sender, _membership_event_id, diff --git a/tests/storage/test_sliding_sync_tables.py b/tests/storage/test_sliding_sync_tables.py index de80ad53cd..61dccc8077 100644 --- a/tests/storage/test_sliding_sync_tables.py +++ b/tests/storage/test_sliding_sync_tables.py @@ -4416,136 +4416,6 @@ class SlidingSyncTablesBackgroundUpdatesTestCase(SlidingSyncTablesTestCaseBase): ), ) - def test_membership_snapshots_background_update_forgotten_partial(self) -> None: - """ - Test an existing `sliding_sync_membership_snapshots` row is updated with the - latest `forgotten` status after the background update passes over it. - """ - user1_id = self.register_user("user1", "pass") - user1_tok = self.login(user1_id, "pass") - user2_id = self.register_user("user2", "pass") - user2_tok = self.login(user2_id, "pass") - - room_id = self.helper.create_room_as(user2_id, tok=user2_tok) - - # User1 joins the room - self.helper.join(room_id, user1_id, tok=user1_tok) - # User1 leaves the room (we have to leave in order to forget the room) - self.helper.leave(room_id, user1_id, tok=user1_tok) - - state_map = self.get_success( - self.storage_controllers.state.get_current_state(room_id) - ) - - # Forget the room - channel = self.make_request( - "POST", - f"/_matrix/client/r0/rooms/{room_id}/forget", - content={}, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.result) - - # Clean-up the `sliding_sync_joined_rooms` table as if the forgotten status - # never made it into the table. - self.get_success( - self.store.db_pool.simple_update( - table="sliding_sync_membership_snapshots", - keyvalues={"room_id": room_id}, - updatevalues={"forgotten": 0}, - desc="sliding_sync_membership_snapshots.test_membership_snapshots_background_update_forgotten_partial", - ) - ) - - # We should see the partial row that we made in preparation for the test. - sliding_sync_membership_snapshots_results = ( - self._get_sliding_sync_membership_snapshots() - ) - self.assertIncludes( - set(sliding_sync_membership_snapshots_results.keys()), - { - (room_id, user1_id), - (room_id, user2_id), - }, - exact=True, - ) - user1_snapshot = _SlidingSyncMembershipSnapshotResult( - room_id=room_id, - user_id=user1_id, - sender=user1_id, - membership_event_id=state_map[(EventTypes.Member, user1_id)].event_id, - membership=Membership.LEAVE, - event_stream_ordering=state_map[ - (EventTypes.Member, user1_id) - ].internal_metadata.stream_ordering, - has_known_state=True, - room_type=None, - room_name=None, - is_encrypted=False, - tombstone_successor_room_id=None, - # Room is *not* forgotten because of our test preparation - forgotten=False, - ) - self.assertEqual( - sliding_sync_membership_snapshots_results.get((room_id, user1_id)), - user1_snapshot, - ) - user2_snapshot = _SlidingSyncMembershipSnapshotResult( - room_id=room_id, - user_id=user2_id, - sender=user2_id, - membership_event_id=state_map[(EventTypes.Member, user2_id)].event_id, - membership=Membership.JOIN, - event_stream_ordering=state_map[ - (EventTypes.Member, user2_id) - ].internal_metadata.stream_ordering, - has_known_state=True, - room_type=None, - room_name=None, - is_encrypted=False, - tombstone_successor_room_id=None, - ) - self.assertEqual( - sliding_sync_membership_snapshots_results.get((room_id, user2_id)), - user2_snapshot, - ) - - # Insert and run the background update. - self.get_success( - self.store.db_pool.simple_insert( - "background_updates", - { - "update_name": _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE, - "progress_json": "{}", - }, - ) - ) - self.store.db_pool.updates._all_done = False - self.wait_for_background_updates() - - # Make sure the table is populated - sliding_sync_membership_snapshots_results = ( - self._get_sliding_sync_membership_snapshots() - ) - self.assertIncludes( - set(sliding_sync_membership_snapshots_results.keys()), - { - (room_id, user1_id), - (room_id, user2_id), - }, - exact=True, - ) - # Forgotten status is now updated - self.assertEqual( - sliding_sync_membership_snapshots_results.get((room_id, user1_id)), - attr.evolve(user1_snapshot, forgotten=True), - ) - # Holds the info according to the current state when the user joined - self.assertEqual( - sliding_sync_membership_snapshots_results.get((room_id, user2_id)), - user2_snapshot, - ) - class SlidingSyncTablesCatchUpBackgroundUpdatesTestCase(SlidingSyncTablesTestCaseBase): """ From 76f7c91e441cd21f518b2fa79903cb4d03caaae3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Sep 2024 13:16:57 +0100 Subject: [PATCH 4/7] Sliding sync: don't fetch room summary for named rooms. (#17683) For rooms with a name we can skip fetching a full room summary, as we don't need to calculate heroes, and instead just fetch the room counts directly. This also changes things to not return counts and heroes for non-joined rooms. For left/banned rooms we were returning zero values anyway, and for invite/knock rooms we don't really want to leak such information (even if some of is included in the stripped state). --- changelog.d/17683.misc | 1 + synapse/handlers/sliding_sync/__init__.py | 78 +++++++++---------- synapse/storage/_base.py | 2 + synapse/storage/databases/main/roommember.py | 37 ++++++--- synapse/storage/databases/main/state.py | 8 ++ .../main/delta/87/03_current_state_index.sql | 19 +++++ .../client/sliding_sync/test_rooms_meta.py | 52 ++++++------- 7 files changed, 121 insertions(+), 76 deletions(-) create mode 100644 changelog.d/17683.misc create mode 100644 synapse/storage/schema/main/delta/87/03_current_state_index.sql diff --git a/changelog.d/17683.misc b/changelog.d/17683.misc new file mode 100644 index 0000000000..11a10ff854 --- /dev/null +++ b/changelog.d/17683.misc @@ -0,0 +1 @@ +Speed up sliding sync by reducing amount of data pulled out of the database for large rooms. diff --git a/synapse/handlers/sliding_sync/__init__.py b/synapse/handlers/sliding_sync/__init__.py index b097ac57a2..04493494a6 100644 --- a/synapse/handlers/sliding_sync/__init__.py +++ b/synapse/handlers/sliding_sync/__init__.py @@ -784,32 +784,10 @@ class SlidingSyncHandler: ): avatar_changed = True + # We only need the room summary for calculating heroes, however if we do + # fetch it then we can use it to calculate `joined_count` and + # `invited_count`. room_membership_summary: Optional[Mapping[str, MemberSummary]] = None - empty_membership_summary = MemberSummary([], 0) - # We need the room summary for: - # - Always for initial syncs (or the first time we send down the room) - # - When the room has no name, we need `heroes` - # - When the membership has changed so we need to give updated `heroes` and - # `joined_count`/`invited_count`. - # - # Ideally, instead of just looking at `name_changed`, we'd check if the room - # name is not set but this is a good enough approximation that saves us from - # having to pull out the full event. This just means, we're generating the - # summary whenever the room name changes instead of only when it changes to - # `None`. - if initial or name_changed or membership_changed: - # We can't trace the function directly because it's cached and the `@cached` - # decorator doesn't mix with `@trace` yet. - with start_active_span("get_room_summary"): - if room_membership_for_user_at_to_token.membership in ( - Membership.LEAVE, - Membership.BAN, - ): - # TODO: Figure out how to get the membership summary for left/banned rooms - room_membership_summary = {} - else: - room_membership_summary = await self.store.get_room_summary(room_id) - # TODO: Reverse/rewind back to the `to_token` # `heroes` are required if the room name is not set. # @@ -828,11 +806,45 @@ class SlidingSyncHandler: # get them on initial syncs (or the first time we send down the room) or if the # membership has changed which may change the heroes. if name_event_id is None and (initial or (not initial and membership_changed)): - assert room_membership_summary is not None + # We need the room summary to extract the heroes from + if room_membership_for_user_at_to_token.membership != Membership.JOIN: + # TODO: Figure out how to get the membership summary for left/banned rooms + # For invite/knock rooms we don't include the information. + room_membership_summary = {} + else: + room_membership_summary = await self.store.get_room_summary(room_id) + # TODO: Reverse/rewind back to the `to_token` + hero_user_ids = extract_heroes_from_room_summary( room_membership_summary, me=user.to_string() ) + # Fetch the membership counts for rooms we're joined to. + # + # Similarly to other metadata, we only need to calculate the member + # counts if this is an initial sync or the memberships have changed. + joined_count: Optional[int] = None + invited_count: Optional[int] = None + if ( + initial or membership_changed + ) and room_membership_for_user_at_to_token.membership == Membership.JOIN: + # If we have the room summary (because we calculated heroes above) + # then we can simply pull the counts from there. + if room_membership_summary is not None: + empty_membership_summary = MemberSummary([], 0) + + joined_count = room_membership_summary.get( + Membership.JOIN, empty_membership_summary + ).count + + invited_count = room_membership_summary.get( + Membership.INVITE, empty_membership_summary + ).count + else: + member_counts = await self.store.get_member_counts(room_id) + joined_count = member_counts.get(Membership.JOIN, 0) + invited_count = member_counts.get(Membership.INVITE, 0) + # Fetch the `required_state` for the room # # No `required_state` for invite/knock rooms (just `stripped_state`) @@ -1090,20 +1102,6 @@ class SlidingSyncHandler: set_tag(SynapseTags.RESULT_PREFIX + "initial", initial) - joined_count: Optional[int] = None - if initial or membership_changed: - assert room_membership_summary is not None - joined_count = room_membership_summary.get( - Membership.JOIN, empty_membership_summary - ).count - - invited_count: Optional[int] = None - if initial or membership_changed: - assert room_membership_summary is not None - invited_count = room_membership_summary.get( - Membership.INVITE, empty_membership_summary - ).count - return SlidingSyncResult.RoomResult( name=room_name, avatar=room_avatar, diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index d22160b85c..d6deb077c8 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -112,6 +112,7 @@ class SQLBaseStore(metaclass=ABCMeta): self._attempt_to_invalidate_cache( "get_number_joined_users_in_room", (room_id,) ) + self._attempt_to_invalidate_cache("get_member_counts", (room_id,)) self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,)) # There's no easy way of invalidating this cache for just the users @@ -153,6 +154,7 @@ class SQLBaseStore(metaclass=ABCMeta): self._attempt_to_invalidate_cache("get_current_hosts_in_room", (room_id,)) self._attempt_to_invalidate_cache("get_users_in_room_with_profiles", (room_id,)) self._attempt_to_invalidate_cache("get_number_joined_users_in_room", (room_id,)) + self._attempt_to_invalidate_cache("get_member_counts", (room_id,)) self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,)) self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None) self._attempt_to_invalidate_cache("get_user_in_room_with_profile", None) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 8df760e8a6..db03729cfe 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -312,18 +312,10 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): # We do this all in one transaction to keep the cache small. # FIXME: get rid of this when we have room_stats - # Note, rejected events will have a null membership field, so - # we we manually filter them out. - sql = """ - SELECT count(*), membership FROM current_state_events - WHERE type = 'm.room.member' AND room_id = ? - AND membership IS NOT NULL - GROUP BY membership - """ + counts = self._get_member_counts_txn(txn, room_id) - txn.execute(sql, (room_id,)) res: Dict[str, MemberSummary] = {} - for count, membership in txn: + for membership, count in counts.items(): res.setdefault(membership, MemberSummary([], count)) # Order by membership (joins -> invites -> leave (former insiders) -> @@ -369,6 +361,31 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): "get_room_summary", _get_room_summary_txn ) + @cached() + async def get_member_counts(self, room_id: str) -> Mapping[str, int]: + """Get a mapping of number of users by membership""" + + return await self.db_pool.runInteraction( + "get_member_counts", self._get_member_counts_txn, room_id + ) + + def _get_member_counts_txn( + self, txn: LoggingTransaction, room_id: str + ) -> Dict[str, int]: + """Get a mapping of number of users by membership""" + + # Note, rejected events will have a null membership field, so + # we we manually filter them out. + sql = """ + SELECT count(*), membership FROM current_state_events + WHERE type = 'm.room.member' AND room_id = ? + AND membership IS NOT NULL + GROUP BY membership + """ + + txn.execute(sql, (room_id,)) + return {membership: count for count, membership in txn} + @cached() async def get_number_joined_users_in_room(self, room_id: str) -> int: return await self.db_pool.simple_select_one_onecol( diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index c5caaf56b0..ca31122ad3 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -736,6 +736,7 @@ class MainStateBackgroundUpdateStore(RoomMemberWorkerStore): CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx" EVENT_STATE_GROUP_INDEX_UPDATE_NAME = "event_to_state_groups_sg_index" DELETE_CURRENT_STATE_UPDATE_NAME = "delete_old_current_state_events" + MEMBERS_CURRENT_STATE_UPDATE_NAME = "current_state_events_members_room_index" def __init__( self, @@ -764,6 +765,13 @@ class MainStateBackgroundUpdateStore(RoomMemberWorkerStore): self.DELETE_CURRENT_STATE_UPDATE_NAME, self._background_remove_left_rooms, ) + self.db_pool.updates.register_background_index_update( + self.MEMBERS_CURRENT_STATE_UPDATE_NAME, + index_name="current_state_events_members_room_index", + table="current_state_events", + columns=["room_id", "membership"], + where_clause="type='m.room.member'", + ) async def _background_remove_left_rooms( self, progress: JsonDict, batch_size: int diff --git a/synapse/storage/schema/main/delta/87/03_current_state_index.sql b/synapse/storage/schema/main/delta/87/03_current_state_index.sql new file mode 100644 index 0000000000..76b974271c --- /dev/null +++ b/synapse/storage/schema/main/delta/87/03_current_state_index.sql @@ -0,0 +1,19 @@ +-- +-- This file is licensed under the Affero General Public License (AGPL) version 3. +-- +-- Copyright (C) 2024 New Vector, Ltd +-- +-- This program is free software: you can redistribute it and/or modify +-- it under the terms of the GNU Affero General Public License as +-- published by the Free Software Foundation, either version 3 of the +-- License, or (at your option) any later version. +-- +-- See the GNU Affero General Public License for more details: +-- . + + +-- Add a background updates to add a new index: +-- `current_state_events(room_id, membership) WHERE type = 'm.room.member' +-- This makes counting membership in rooms (for syncs) much faster +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (8701, 'current_state_events_members_room_index', '{}'); diff --git a/tests/rest/client/sliding_sync/test_rooms_meta.py b/tests/rest/client/sliding_sync/test_rooms_meta.py index 6d2742e25f..6dbce7126f 100644 --- a/tests/rest/client/sliding_sync/test_rooms_meta.py +++ b/tests/rest/client/sliding_sync/test_rooms_meta.py @@ -371,14 +371,17 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase): "mxc://UPDATED_DUMMY_MEDIA_ID", response_body["rooms"][room_id1], ) - self.assertEqual( - response_body["rooms"][room_id1]["joined_count"], - 1, + + # We don't give extra room information to invitees + self.assertNotIn( + "joined_count", + response_body["rooms"][room_id1], ) - self.assertEqual( - response_body["rooms"][room_id1]["invited_count"], - 1, + self.assertNotIn( + "invited_count", + response_body["rooms"][room_id1], ) + self.assertIsNone( response_body["rooms"][room_id1].get("is_dm"), ) @@ -450,15 +453,16 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase): "mxc://DUMMY_MEDIA_ID", response_body["rooms"][room_id1], ) - self.assertEqual( - response_body["rooms"][room_id1]["joined_count"], - # FIXME: The actual number should be "1" (user2) but we currently don't - # support this for rooms where the user has left/been banned. - 0, + + # FIXME: We possibly want to return joined and invited counts for rooms + # you're banned form + self.assertNotIn( + "joined_count", + response_body["rooms"][room_id1], ) - self.assertEqual( - response_body["rooms"][room_id1]["invited_count"], - 0, + self.assertNotIn( + "invited_count", + response_body["rooms"][room_id1], ) self.assertIsNone( response_body["rooms"][room_id1].get("is_dm"), @@ -692,19 +696,15 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase): [], ) - self.assertEqual( - response_body["rooms"][room_id1]["joined_count"], - # FIXME: The actual number should be "1" (user2) but we currently don't - # support this for rooms where the user has left/been banned. - 0, + # FIXME: We possibly want to return joined and invited counts for rooms + # you're banned form + self.assertNotIn( + "joined_count", + response_body["rooms"][room_id1], ) - self.assertEqual( - response_body["rooms"][room_id1]["invited_count"], - # We shouldn't see user5 since they were invited after user1 was banned. - # - # FIXME: The actual number should be "1" (user3) but we currently don't - # support this for rooms where the user has left/been banned. - 0, + self.assertNotIn( + "invited_count", + response_body["rooms"][room_id1], ) def test_rooms_meta_heroes_incremental_sync_no_change(self) -> None: From e4a1f271b9365e2dddbb9205b10ebe36eed12e62 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 11 Sep 2024 12:13:54 -0500 Subject: [PATCH 5/7] Sliding Sync: Make sure we get up-to-date information from `get_sliding_sync_rooms_for_user(...)` (#17692) We need to bust the `get_sliding_sync_rooms_for_user` cache when the room encryption is updated and any other field that is used in the query. Follow-up to https://github.com/element-hq/synapse/pull/17630 - Bust cache for membership change (cross-reference `get_rooms_for_user`) - Bust cache for room `encryption` (cross-reference `get_room_encryption`) - Bust cache for `forgotten` (cross-reference `did_forget`/`get_forgotten_rooms_for_user`) --- changelog.d/17692.bugfix | 1 + synapse/storage/_base.py | 1 + synapse/storage/databases/main/cache.py | 14 ++ synapse/storage/databases/main/roommember.py | 9 +- .../client/sliding_sync/test_sliding_sync.py | 191 +++++++++++++----- 5 files changed, 160 insertions(+), 56 deletions(-) create mode 100644 changelog.d/17692.bugfix diff --git a/changelog.d/17692.bugfix b/changelog.d/17692.bugfix new file mode 100644 index 0000000000..84e0754a99 --- /dev/null +++ b/changelog.d/17692.bugfix @@ -0,0 +1 @@ +Make sure we get up-to-date state information when using the new Sliding Sync tables to derive room membership. diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index d6deb077c8..e14d711c76 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -136,6 +136,7 @@ class SQLBaseStore(metaclass=ABCMeta): self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,)) self._attempt_to_invalidate_cache("get_room_type", (room_id,)) self._attempt_to_invalidate_cache("get_room_encryption", (room_id,)) + self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None) def _invalidate_state_caches_all(self, room_id: str) -> None: """Invalidates caches that are based on the current state, but does diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index b0e30daee5..37c865a8e7 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -41,6 +41,7 @@ from synapse.storage.database import ( LoggingDatabaseConnection, LoggingTransaction, ) +from synapse.storage.databases.main.events import SLIDING_SYNC_RELEVANT_STATE_SET from synapse.storage.engines import PostgresEngine from synapse.storage.util.id_generators import MultiWriterIdGenerator from synapse.util.caches.descriptors import CachedFunction @@ -271,12 +272,20 @@ class CacheInvalidationWorkerStore(SQLBaseStore): self._attempt_to_invalidate_cache( "get_rooms_for_user", (data.state_key,) ) + self._attempt_to_invalidate_cache( + "get_sliding_sync_rooms_for_user", None + ) elif data.type == EventTypes.RoomEncryption: self._attempt_to_invalidate_cache( "get_room_encryption", (data.room_id,) ) elif data.type == EventTypes.Create: self._attempt_to_invalidate_cache("get_room_type", (data.room_id,)) + + if (data.type, data.state_key) in SLIDING_SYNC_RELEVANT_STATE_SET: + self._attempt_to_invalidate_cache( + "get_sliding_sync_rooms_for_user", None + ) elif row.type == EventsStreamAllStateRow.TypeId: assert isinstance(data, EventsStreamAllStateRow) # Similar to the above, but the entire caches are invalidated. This is @@ -285,6 +294,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore): self._attempt_to_invalidate_cache("get_rooms_for_user", None) self._attempt_to_invalidate_cache("get_room_type", (data.room_id,)) self._attempt_to_invalidate_cache("get_room_encryption", (data.room_id,)) + self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None) else: raise Exception("Unknown events stream row type %s" % (row.type,)) @@ -365,6 +375,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore): elif etype == EventTypes.RoomEncryption: self._attempt_to_invalidate_cache("get_room_encryption", (room_id,)) + if (etype, state_key) in SLIDING_SYNC_RELEVANT_STATE_SET: + self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None) + if relates_to: self._attempt_to_invalidate_cache( "get_relations_for_event", @@ -477,6 +490,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore): self._attempt_to_invalidate_cache( "get_current_hosts_in_room_ordered", (room_id,) ) + self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None) self._attempt_to_invalidate_cache("did_forget", None) self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None) self._attempt_to_invalidate_cache("_get_membership_from_event_id", None) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index db03729cfe..1fc2d7ba1e 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -1365,6 +1365,9 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): self._invalidate_cache_and_stream( txn, self.get_forgotten_rooms_for_user, (user_id,) ) + self._invalidate_cache_and_stream( + txn, self.get_sliding_sync_rooms_for_user, (user_id,) + ) await self.db_pool.runInteraction("forget_membership", f) @@ -1410,6 +1413,10 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): def get_sliding_sync_rooms_for_user_txn( txn: LoggingTransaction, ) -> Dict[str, RoomsForUserSlidingSync]: + # XXX: If you use any new columns that can change (like from + # `sliding_sync_joined_rooms` or `forgotten`), make sure to bust the + # `get_sliding_sync_rooms_for_user` cache in the appropriate places (and add + # tests). sql = """ SELECT m.room_id, m.sender, m.membership, m.membership_event_id, r.room_version, @@ -1432,7 +1439,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): room_version_id=row[4], event_pos=PersistedEventPosition(row[5], row[6]), room_type=row[7], - is_encrypted=row[8], + is_encrypted=bool(row[8]), ) for row in txn } diff --git a/tests/rest/client/sliding_sync/test_sliding_sync.py b/tests/rest/client/sliding_sync/test_sliding_sync.py index 930cb5ef45..9e23dbe522 100644 --- a/tests/rest/client/sliding_sync/test_sliding_sync.py +++ b/tests/rest/client/sliding_sync/test_sliding_sync.py @@ -722,43 +722,37 @@ class SlidingSyncTestCase(SlidingSyncBase): self.helper.join(space_room_id, user1_id, tok=user1_tok) # Make an initial Sliding Sync request - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "lists": { - "all-list": { - "ranges": [[0, 99]], - "required_state": [], - "timeline_limit": 0, - "filters": {}, + sync_body = { + "lists": { + "all-list": { + "ranges": [[0, 99]], + "required_state": [], + "timeline_limit": 0, + "filters": {}, + }, + "foo-list": { + "ranges": [[0, 99]], + "required_state": [], + "timeline_limit": 1, + "filters": { + "is_encrypted": True, + "room_types": [RoomTypes.SPACE], }, - "foo-list": { - "ranges": [[0, 99]], - "required_state": [], - "timeline_limit": 1, - "filters": { - "is_encrypted": True, - "room_types": [RoomTypes.SPACE], - }, - }, - } - }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) - from_token = channel.json_body["pos"] + }, + } + } + response_body, from_token = self.do_sync(sync_body, tok=user1_tok) # Make sure the response has the lists we requested self.assertListEqual( - list(channel.json_body["lists"].keys()), + list(response_body["lists"].keys()), ["all-list", "foo-list"], - channel.json_body["lists"].keys(), + response_body["lists"].keys(), ) # Make sure the lists have the correct rooms self.assertListEqual( - list(channel.json_body["lists"]["all-list"]["ops"]), + list(response_body["lists"]["all-list"]["ops"]), [ { "op": "SYNC", @@ -768,7 +762,7 @@ class SlidingSyncTestCase(SlidingSyncBase): ], ) self.assertListEqual( - list(channel.json_body["lists"]["foo-list"]["ops"]), + list(response_body["lists"]["foo-list"]["ops"]), [ { "op": "SYNC", @@ -783,35 +777,30 @@ class SlidingSyncTestCase(SlidingSyncBase): self.helper.leave(space_room_id, user2_id, tok=user2_tok) # Make an incremental Sliding Sync request - channel = self.make_request( - "POST", - self.sync_endpoint + f"?pos={from_token}", - { - "lists": { - "all-list": { - "ranges": [[0, 99]], - "required_state": [], - "timeline_limit": 0, - "filters": {}, + sync_body = { + "lists": { + "all-list": { + "ranges": [[0, 99]], + "required_state": [], + "timeline_limit": 0, + "filters": {}, + }, + "foo-list": { + "ranges": [[0, 99]], + "required_state": [], + "timeline_limit": 1, + "filters": { + "is_encrypted": True, + "room_types": [RoomTypes.SPACE], }, - "foo-list": { - "ranges": [[0, 99]], - "required_state": [], - "timeline_limit": 1, - "filters": { - "is_encrypted": True, - "room_types": [RoomTypes.SPACE], - }, - }, - } - }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + }, + } + } + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) # Make sure the lists have the correct rooms even though we `newly_left` self.assertListEqual( - list(channel.json_body["lists"]["all-list"]["ops"]), + list(response_body["lists"]["all-list"]["ops"]), [ { "op": "SYNC", @@ -821,7 +810,7 @@ class SlidingSyncTestCase(SlidingSyncBase): ], ) self.assertListEqual( - list(channel.json_body["lists"]["foo-list"]["ops"]), + list(response_body["lists"]["foo-list"]["ops"]), [ { "op": "SYNC", @@ -831,6 +820,98 @@ class SlidingSyncTestCase(SlidingSyncBase): ], ) + def test_filter_is_encrypted_up_to_date(self) -> None: + """ + Make sure we get up-to-date `is_encrypted` status for a joined room + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id = self.helper.create_room_as(user1_id, tok=user1_tok) + + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 99]], + "required_state": [], + "timeline_limit": 0, + "filters": { + "is_encrypted": True, + }, + }, + } + } + response_body, from_token = self.do_sync(sync_body, tok=user1_tok) + self.assertIncludes( + set(response_body["lists"]["foo-list"]["ops"][0]["room_ids"]), + set(), + exact=True, + ) + + # Update the encryption status + self.helper.send_state( + room_id, + EventTypes.RoomEncryption, + {EventContentFields.ENCRYPTION_ALGORITHM: "m.megolm.v1.aes-sha2"}, + tok=user1_tok, + ) + + # We should see the room now because it's encrypted + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + self.assertIncludes( + set(response_body["lists"]["foo-list"]["ops"][0]["room_ids"]), + {room_id}, + exact=True, + ) + + def test_forgotten_up_to_date(self) -> None: + """ + Make sure we get up-to-date `forgotten` status for rooms + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id = self.helper.create_room_as(user2_id, tok=user2_tok) + + # User1 is banned from the room (was never in the room) + self.helper.ban(room_id, src=user2_id, targ=user1_id, tok=user2_tok) + + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 99]], + "required_state": [], + "timeline_limit": 0, + "filters": {}, + }, + } + } + response_body, from_token = self.do_sync(sync_body, tok=user1_tok) + self.assertIncludes( + set(response_body["lists"]["foo-list"]["ops"][0]["room_ids"]), + {room_id}, + exact=True, + ) + + # User1 forgets the room + channel = self.make_request( + "POST", + f"/_matrix/client/r0/rooms/{room_id}/forget", + content={}, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.result) + + # We should no longer see the forgotten room + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + self.assertIncludes( + set(response_body["lists"]["foo-list"]["ops"][0]["room_ids"]), + set(), + exact=True, + ) + def test_sort_list(self) -> None: """ Test that the `lists` are sorted by `stream_ordering` From 16af80b8fbdeabcdea222b86c6f6b9da2f794565 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 11 Sep 2024 12:16:24 -0500 Subject: [PATCH 6/7] Sliding Sync: Use Sliding Sync tables for sorting (#17693) Use Sliding Sync tables for sorting (`bulk_get_last_event_pos_in_room_before_stream_ordering(...)` -> `_bulk_get_max_event_pos(...)`) --- changelog.d/17693.misc | 1 + synapse/handlers/sliding_sync/room_lists.py | 71 +++---------------- .../databases/main/events_bg_updates.py | 29 +------- .../storage/databases/main/events_worker.py | 12 ++++ synapse/storage/databases/main/roommember.py | 12 ---- synapse/storage/databases/main/stream.py | 34 +++++++-- synapse/types/storage/__init__.py | 47 ++++++++++++ tests/storage/test_sliding_sync_tables.py | 2 +- 8 files changed, 103 insertions(+), 105 deletions(-) create mode 100644 changelog.d/17693.misc create mode 100644 synapse/types/storage/__init__.py diff --git a/changelog.d/17693.misc b/changelog.d/17693.misc new file mode 100644 index 0000000000..0d20c80916 --- /dev/null +++ b/changelog.d/17693.misc @@ -0,0 +1 @@ +Use Sliding Sync tables as a bulk shortcut for getting the max `event_stream_ordering` of rooms. diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py index 165b15c60f..652d05dbe9 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py @@ -27,6 +27,7 @@ from typing import ( Set, Tuple, Union, + cast, ) import attr @@ -355,11 +356,18 @@ class SlidingSyncRoomLists: if list_config.ranges: if list_config.ranges == [(0, len(filtered_sync_room_map) - 1)]: # If we are asking for the full range, we don't need to sort the list. - sorted_room_info = list(filtered_sync_room_map.values()) + sorted_room_info: List[RoomsForUserType] = list( + filtered_sync_room_map.values() + ) else: # Sort the list - sorted_room_info = await self.sort_rooms_using_tables( - filtered_sync_room_map, to_token + sorted_room_info = await self.sort_rooms( + # Cast is safe because RoomsForUserSlidingSync is part + # of the `RoomsForUserType` union. Why can't it detect this? + cast( + Dict[str, RoomsForUserType], filtered_sync_room_map + ), + to_token, ) for range in list_config.ranges: @@ -1762,63 +1770,6 @@ class SlidingSyncRoomLists: # Assemble a new sync room map but only with the `filtered_room_id_set` return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set} - @trace - async def sort_rooms_using_tables( - self, - sync_room_map: Mapping[str, RoomsForUserSlidingSync], - to_token: StreamToken, - ) -> List[RoomsForUserSlidingSync]: - """ - Sort by `stream_ordering` of the last event that the user should see in the - room. `stream_ordering` is unique so we get a stable sort. - - Args: - sync_room_map: Dictionary of room IDs to sort along with membership - information in the room at the time of `to_token`. - to_token: We sort based on the events in the room at this token (<= `to_token`) - - Returns: - A sorted list of room IDs by `stream_ordering` along with membership information. - """ - - # Assemble a map of room ID to the `stream_ordering` of the last activity that the - # user should see in the room (<= `to_token`) - last_activity_in_room_map: Dict[str, int] = {} - - for room_id, room_for_user in sync_room_map.items(): - if room_for_user.membership != Membership.JOIN: - # If the user has left/been invited/knocked/been banned from a - # room, they shouldn't see anything past that point. - # - # FIXME: It's possible that people should see beyond this point - # in invited/knocked cases if for example the room has - # `invite`/`world_readable` history visibility, see - # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932 - last_activity_in_room_map[room_id] = room_for_user.event_pos.stream - - # For fully-joined rooms, we find the latest activity at/before the - # `to_token`. - joined_room_positions = ( - await self.store.bulk_get_last_event_pos_in_room_before_stream_ordering( - [ - room_id - for room_id, room_for_user in sync_room_map.items() - if room_for_user.membership == Membership.JOIN - ], - to_token.room_key, - ) - ) - - last_activity_in_room_map.update(joined_room_positions) - - return sorted( - sync_room_map.values(), - # Sort by the last activity (stream_ordering) in the room - key=lambda room_info: last_activity_in_room_map[room_info.room_id], - # We want descending order - reverse=True, - ) - @trace async def sort_rooms( self, diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 743200471b..a8723f94bc 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -52,6 +52,7 @@ from synapse.storage.types import Cursor from synapse.types import JsonDict, RoomStreamToken, StateMap, StrCollection from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES from synapse.types.state import StateFilter +from synapse.types.storage import _BackgroundUpdates from synapse.util import json_encoder from synapse.util.iterutils import batch_iter @@ -76,34 +77,6 @@ _REPLACE_STREAM_ORDERING_SQL_COMMANDS = ( ) -class _BackgroundUpdates: - EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts" - EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url" - DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities" - POPULATE_STREAM_ORDERING2 = "populate_stream_ordering2" - INDEX_STREAM_ORDERING2 = "index_stream_ordering2" - INDEX_STREAM_ORDERING2_CONTAINS_URL = "index_stream_ordering2_contains_url" - INDEX_STREAM_ORDERING2_ROOM_ORDER = "index_stream_ordering2_room_order" - INDEX_STREAM_ORDERING2_ROOM_STREAM = "index_stream_ordering2_room_stream" - INDEX_STREAM_ORDERING2_TS = "index_stream_ordering2_ts" - REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column" - - EVENT_EDGES_DROP_INVALID_ROWS = "event_edges_drop_invalid_rows" - EVENT_EDGES_REPLACE_INDEX = "event_edges_replace_index" - - EVENTS_POPULATE_STATE_KEY_REJECTIONS = "events_populate_state_key_rejections" - - EVENTS_JUMP_TO_DATE_INDEX = "events_jump_to_date_index" - - SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE = ( - "sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update" - ) - SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE = "sliding_sync_joined_rooms_bg_update" - SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE = ( - "sliding_sync_membership_snapshots_bg_update" - ) - - @attr.s(slots=True, frozen=True, auto_attribs=True) class _CalculateChainCover: """Return value for _calculate_chain_cover_txn.""" diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index b188f32927..029f4bd87d 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -83,6 +83,7 @@ from synapse.storage.util.id_generators import ( from synapse.storage.util.sequence import build_sequence_generator from synapse.types import JsonDict, get_domain_from_id from synapse.types.state import StateFilter +from synapse.types.storage import _BackgroundUpdates from synapse.util import unwrapFirstError from synapse.util.async_helpers import ObservableDeferred, delay_cancellation from synapse.util.caches.descriptors import cached, cachedList @@ -2465,3 +2466,14 @@ class EventsWorkerStore(SQLBaseStore): ) self.invalidate_get_event_cache_after_txn(txn, event_id) + + async def have_finished_sliding_sync_background_jobs(self) -> bool: + """Return if it's safe to use the sliding sync membership tables.""" + + return await self.db_pool.updates.have_completed_background_updates( + ( + _BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE, + _BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE, + _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE, + ) + ) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 1fc2d7ba1e..8bfa6254f3 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -51,7 +51,6 @@ from synapse.storage.database import ( LoggingTransaction, ) from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore -from synapse.storage.databases.main.events_bg_updates import _BackgroundUpdates from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.engines import Sqlite3Engine from synapse.storage.roommember import ( @@ -1449,17 +1448,6 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): get_sliding_sync_rooms_for_user_txn, ) - async def have_finished_sliding_sync_background_jobs(self) -> bool: - """Return if it's safe to use the sliding sync membership tables.""" - - return await self.db_pool.updates.have_completed_background_updates( - ( - _BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE, - _BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE, - _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE, - ) - ) - class RoomMemberBackgroundUpdateStore(SQLBaseStore): def __init__( diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 459436e304..94a7efee73 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -1524,7 +1524,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): # majority of rooms will have a latest token from before the min stream # pos. - def bulk_get_max_event_pos_txn( + def bulk_get_max_event_pos_fallback_txn( txn: LoggingTransaction, batched_room_ids: StrCollection ) -> Dict[str, int]: clause, args = make_in_list_sql_clause( @@ -1547,11 +1547,37 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): txn.execute(sql, [max_pos] + args) return {row[0]: row[1] for row in txn} + # It's easier to look at the `sliding_sync_joined_rooms` table and avoid all of + # the joins and sub-queries. + def bulk_get_max_event_pos_from_sliding_sync_tables_txn( + txn: LoggingTransaction, batched_room_ids: StrCollection + ) -> Dict[str, int]: + clause, args = make_in_list_sql_clause( + self.database_engine, "room_id", batched_room_ids + ) + sql = f""" + SELECT room_id, event_stream_ordering + FROM sliding_sync_joined_rooms + WHERE {clause} + ORDER BY event_stream_ordering DESC + """ + txn.execute(sql, args) + return {row[0]: row[1] for row in txn} + recheck_rooms: Set[str] = set() for batched in batch_iter(room_ids, 1000): - batch_results = await self.db_pool.runInteraction( - "_bulk_get_max_event_pos", bulk_get_max_event_pos_txn, batched - ) + if await self.have_finished_sliding_sync_background_jobs(): + batch_results = await self.db_pool.runInteraction( + "bulk_get_max_event_pos_from_sliding_sync_tables_txn", + bulk_get_max_event_pos_from_sliding_sync_tables_txn, + batched, + ) + else: + batch_results = await self.db_pool.runInteraction( + "bulk_get_max_event_pos_fallback_txn", + bulk_get_max_event_pos_fallback_txn, + batched, + ) for room_id, stream_ordering in batch_results.items(): if stream_ordering <= now_token.stream: results.update(batch_results) diff --git a/synapse/types/storage/__init__.py b/synapse/types/storage/__init__.py new file mode 100644 index 0000000000..fae5449bcc --- /dev/null +++ b/synapse/types/storage/__init__.py @@ -0,0 +1,47 @@ +# +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright (C) 2024 New Vector, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# . +# +# Originally licensed under the Apache License, Version 2.0: +# . +# +# [This file includes modifications made by New Vector Limited] +# +# + + +class _BackgroundUpdates: + EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts" + EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url" + DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities" + POPULATE_STREAM_ORDERING2 = "populate_stream_ordering2" + INDEX_STREAM_ORDERING2 = "index_stream_ordering2" + INDEX_STREAM_ORDERING2_CONTAINS_URL = "index_stream_ordering2_contains_url" + INDEX_STREAM_ORDERING2_ROOM_ORDER = "index_stream_ordering2_room_order" + INDEX_STREAM_ORDERING2_ROOM_STREAM = "index_stream_ordering2_room_stream" + INDEX_STREAM_ORDERING2_TS = "index_stream_ordering2_ts" + REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column" + + EVENT_EDGES_DROP_INVALID_ROWS = "event_edges_drop_invalid_rows" + EVENT_EDGES_REPLACE_INDEX = "event_edges_replace_index" + + EVENTS_POPULATE_STATE_KEY_REJECTIONS = "events_populate_state_key_rejections" + + EVENTS_JUMP_TO_DATE_INDEX = "events_jump_to_date_index" + + SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE = ( + "sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update" + ) + SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE = "sliding_sync_joined_rooms_bg_update" + SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE = ( + "sliding_sync_membership_snapshots_bg_update" + ) diff --git a/tests/storage/test_sliding_sync_tables.py b/tests/storage/test_sliding_sync_tables.py index 61dccc8077..35917505a4 100644 --- a/tests/storage/test_sliding_sync_tables.py +++ b/tests/storage/test_sliding_sync_tables.py @@ -34,11 +34,11 @@ from synapse.rest.client import login, room from synapse.server import HomeServer from synapse.storage.databases.main.events import DeltaState from synapse.storage.databases.main.events_bg_updates import ( - _BackgroundUpdates, _resolve_stale_data_in_sliding_sync_joined_rooms_table, _resolve_stale_data_in_sliding_sync_membership_snapshots_table, ) from synapse.types import create_requester +from synapse.types.storage import _BackgroundUpdates from synapse.util import Clock from tests.test_utils.event_injection import create_event From ebad618bf0f4a7cd8adb5c65d6025d320387b492 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=89loi=20Rivard?= Date: Wed, 11 Sep 2024 23:01:43 +0200 Subject: [PATCH 7/7] import pydantic objects from the `_pydantic_compat` module (#17667) This PR changes `from pydantic import BaseModel` to `from synapse._pydantic_compat import BaseModel` (as well as `constr`, `conbytes`, `conint`, `confloat`). It allows `check_pydantic_models.py` to mock those pydantic objects only in the synapse module, and not interfere with pydantic objects in external dependencies. This should solve the CI problems for #17144, which breaks because `check_pydantic_models.py` patches pydantic models from [scim2-models](https://scim2-models.readthedocs.io/). /cc @DMRobertson @gotmax23 fixes #17659 ### Pull Request Checklist * [x] Pull request is based on the develop branch * [x] Pull request includes a [changelog file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog). The entry should: - Be a short description of your change which makes sense to users. "Fixed a bug that prevented receiving messages from other servers." instead of "Moved X method from `EventStore` to `EventWorkerStore`.". - Use markdown where necessary, mostly for `code blocks`. - End with either a period (.) or an exclamation mark (!). - Start with a capital letter. - Feel free to credit yourself, by adding a sentence "Contributed by @github_username." or "Contributed by [Your Name]." to the end of the entry. * [x] [Code style](https://element-hq.github.io/synapse/latest/code_style.html) is correct (run the [linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters)) --- changelog.d/17667.misc | 5 ++ scripts-dev/check_pydantic_models.py | 50 +++++------------ synapse/_pydantic_compat.py | 64 +++++++++++++++++++++- synapse/config/_util.py | 10 +--- synapse/config/workers.py | 16 +++--- synapse/events/validator.py | 10 +--- synapse/http/servlet.py | 16 +++--- synapse/rest/admin/users.py | 7 +-- synapse/rest/client/account.py | 8 +-- synapse/rest/client/devices.py | 8 +-- synapse/rest/client/directory.py | 8 +-- synapse/rest/client/reporting.py | 6 +- synapse/rest/key/v2/remote_key_resource.py | 8 +-- synapse/storage/background_updates.py | 7 +-- synapse/types/handlers/sliding_sync.py | 13 ++--- synapse/types/rest/__init__.py | 9 +-- synapse/types/rest/client/__init__.py | 34 ++++-------- tests/rest/client/test_models.py | 8 +-- 18 files changed, 126 insertions(+), 161 deletions(-) create mode 100644 changelog.d/17667.misc diff --git a/changelog.d/17667.misc b/changelog.d/17667.misc new file mode 100644 index 0000000000..6526f283bc --- /dev/null +++ b/changelog.d/17667.misc @@ -0,0 +1,5 @@ +Import pydantic objects from the `_pydantic_compat` module. + +This allows `check_pydantic_models.py` to mock those pydantic objects +only in the synapse module, and not interfere with pydantic objects in +external dependencies. diff --git a/scripts-dev/check_pydantic_models.py b/scripts-dev/check_pydantic_models.py index 26d667aba0..5eb1f0a9df 100755 --- a/scripts-dev/check_pydantic_models.py +++ b/scripts-dev/check_pydantic_models.py @@ -45,7 +45,6 @@ import traceback import unittest.mock from contextlib import contextmanager from typing import ( - TYPE_CHECKING, Any, Callable, Dict, @@ -57,30 +56,17 @@ from typing import ( ) from parameterized import parameterized - -from synapse._pydantic_compat import HAS_PYDANTIC_V2 - -if TYPE_CHECKING or HAS_PYDANTIC_V2: - from pydantic.v1 import ( - BaseModel as PydanticBaseModel, - conbytes, - confloat, - conint, - constr, - ) - from pydantic.v1.typing import get_args -else: - from pydantic import ( - BaseModel as PydanticBaseModel, - conbytes, - confloat, - conint, - constr, - ) - from pydantic.typing import get_args - from typing_extensions import ParamSpec +from synapse._pydantic_compat import ( + BaseModel as PydanticBaseModel, + conbytes, + confloat, + conint, + constr, + get_args, +) + logger = logging.getLogger(__name__) CONSTRAINED_TYPE_FACTORIES_WITH_STRICT_FLAG: List[Callable] = [ @@ -183,22 +169,16 @@ def monkeypatch_pydantic() -> Generator[None, None, None]: # Most Synapse code ought to import the patched objects directly from # `pydantic`. But we also patch their containing modules `pydantic.main` and # `pydantic.types` for completeness. - patch_basemodel1 = unittest.mock.patch( - "pydantic.BaseModel", new=PatchedBaseModel + patch_basemodel = unittest.mock.patch( + "synapse._pydantic_compat.BaseModel", new=PatchedBaseModel ) - patch_basemodel2 = unittest.mock.patch( - "pydantic.main.BaseModel", new=PatchedBaseModel - ) - patches.enter_context(patch_basemodel1) - patches.enter_context(patch_basemodel2) + patches.enter_context(patch_basemodel) for factory in CONSTRAINED_TYPE_FACTORIES_WITH_STRICT_FLAG: wrapper: Callable = make_wrapper(factory) - patch1 = unittest.mock.patch(f"pydantic.{factory.__name__}", new=wrapper) - patch2 = unittest.mock.patch( - f"pydantic.types.{factory.__name__}", new=wrapper + patch = unittest.mock.patch( + f"synapse._pydantic_compat.{factory.__name__}", new=wrapper ) - patches.enter_context(patch1) - patches.enter_context(patch2) + patches.enter_context(patch) yield diff --git a/synapse/_pydantic_compat.py b/synapse/_pydantic_compat.py index a6ceeb04d2..f0eedf5c6d 100644 --- a/synapse/_pydantic_compat.py +++ b/synapse/_pydantic_compat.py @@ -19,6 +19,8 @@ # # +from typing import TYPE_CHECKING + from packaging.version import Version try: @@ -30,4 +32,64 @@ except ImportError: HAS_PYDANTIC_V2: bool = Version(pydantic_version).major == 2 -__all__ = ("HAS_PYDANTIC_V2",) +if TYPE_CHECKING or HAS_PYDANTIC_V2: + from pydantic.v1 import ( + BaseModel, + Extra, + Field, + MissingError, + PydanticValueError, + StrictBool, + StrictInt, + StrictStr, + ValidationError, + conbytes, + confloat, + conint, + constr, + parse_obj_as, + validator, + ) + from pydantic.v1.error_wrappers import ErrorWrapper + from pydantic.v1.typing import get_args +else: + from pydantic import ( + BaseModel, + Extra, + Field, + MissingError, + PydanticValueError, + StrictBool, + StrictInt, + StrictStr, + ValidationError, + conbytes, + confloat, + conint, + constr, + parse_obj_as, + validator, + ) + from pydantic.error_wrappers import ErrorWrapper + from pydantic.typing import get_args + +__all__ = ( + "HAS_PYDANTIC_V2", + "BaseModel", + "constr", + "conbytes", + "conint", + "confloat", + "ErrorWrapper", + "Extra", + "Field", + "get_args", + "MissingError", + "parse_obj_as", + "PydanticValueError", + "StrictBool", + "StrictInt", + "StrictStr", + "ValidationError", + "validator", +) diff --git a/synapse/config/_util.py b/synapse/config/_util.py index 32b906a1ec..731b60a840 100644 --- a/synapse/config/_util.py +++ b/synapse/config/_util.py @@ -18,17 +18,11 @@ # [This file includes modifications made by New Vector Limited] # # -from typing import TYPE_CHECKING, Any, Dict, Type, TypeVar +from typing import Any, Dict, Type, TypeVar import jsonschema -from synapse._pydantic_compat import HAS_PYDANTIC_V2 - -if TYPE_CHECKING or HAS_PYDANTIC_V2: - from pydantic.v1 import BaseModel, ValidationError, parse_obj_as -else: - from pydantic import BaseModel, ValidationError, parse_obj_as - +from synapse._pydantic_compat import BaseModel, ValidationError, parse_obj_as from synapse.config._base import ConfigError from synapse.types import JsonDict, StrSequence diff --git a/synapse/config/workers.py b/synapse/config/workers.py index b013ffa354..ab896be307 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -22,17 +22,17 @@ import argparse import logging -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union +from typing import Any, Dict, List, Optional, Union import attr -from synapse._pydantic_compat import HAS_PYDANTIC_V2 - -if TYPE_CHECKING or HAS_PYDANTIC_V2: - from pydantic.v1 import BaseModel, Extra, StrictBool, StrictInt, StrictStr -else: - from pydantic import BaseModel, Extra, StrictBool, StrictInt, StrictStr - +from synapse._pydantic_compat import ( + BaseModel, + Extra, + StrictBool, + StrictInt, + StrictStr, +) from synapse.config._base import ( Config, ConfigError, diff --git a/synapse/events/validator.py b/synapse/events/validator.py index 73b63b77f2..8aa8d7e017 100644 --- a/synapse/events/validator.py +++ b/synapse/events/validator.py @@ -19,17 +19,11 @@ # # import collections.abc -from typing import TYPE_CHECKING, List, Type, Union, cast +from typing import List, Type, Union, cast import jsonschema -from synapse._pydantic_compat import HAS_PYDANTIC_V2 - -if TYPE_CHECKING or HAS_PYDANTIC_V2: - from pydantic.v1 import Field, StrictBool, StrictStr -else: - from pydantic import Field, StrictBool, StrictStr - +from synapse._pydantic_compat import Field, StrictBool, StrictStr from synapse.api.constants import ( MAX_ALIAS_LENGTH, EventContentFields, diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py index 08b8ff7afd..0330f1c878 100644 --- a/synapse/http/servlet.py +++ b/synapse/http/servlet.py @@ -37,19 +37,17 @@ from typing import ( overload, ) -from synapse._pydantic_compat import HAS_PYDANTIC_V2 - -if TYPE_CHECKING or HAS_PYDANTIC_V2: - from pydantic.v1 import BaseModel, MissingError, PydanticValueError, ValidationError - from pydantic.v1.error_wrappers import ErrorWrapper -else: - from pydantic import BaseModel, MissingError, PydanticValueError, ValidationError - from pydantic.error_wrappers import ErrorWrapper - from typing_extensions import Literal from twisted.web.server import Request +from synapse._pydantic_compat import ( + BaseModel, + ErrorWrapper, + MissingError, + PydanticValueError, + ValidationError, +) from synapse.api.errors import Codes, SynapseError from synapse.http import redact_uri from synapse.http.server import HttpServer diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py index ad515bd5a3..076994b87e 100644 --- a/synapse/rest/admin/users.py +++ b/synapse/rest/admin/users.py @@ -27,7 +27,7 @@ from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union import attr -from synapse._pydantic_compat import HAS_PYDANTIC_V2 +from synapse._pydantic_compat import StrictBool from synapse.api.constants import Direction, UserTypes from synapse.api.errors import Codes, NotFoundError, SynapseError from synapse.http.servlet import ( @@ -56,11 +56,6 @@ from synapse.types.rest import RequestBodyModel if TYPE_CHECKING: from synapse.server import HomeServer -if TYPE_CHECKING or HAS_PYDANTIC_V2: - from pydantic.v1 import StrictBool -else: - from pydantic import StrictBool - logger = logging.getLogger(__name__) diff --git a/synapse/rest/client/account.py b/synapse/rest/client/account.py index 8daa449f9e..32fa7b4ec4 100644 --- a/synapse/rest/client/account.py +++ b/synapse/rest/client/account.py @@ -24,18 +24,12 @@ import random from typing import TYPE_CHECKING, List, Optional, Tuple from urllib.parse import urlparse -from synapse._pydantic_compat import HAS_PYDANTIC_V2 - -if TYPE_CHECKING or HAS_PYDANTIC_V2: - from pydantic.v1 import StrictBool, StrictStr, constr -else: - from pydantic import StrictBool, StrictStr, constr - import attr from typing_extensions import Literal from twisted.web.server import Request +from synapse._pydantic_compat import StrictBool, StrictStr, constr from synapse.api.constants import LoginType from synapse.api.errors import ( Codes, diff --git a/synapse/rest/client/devices.py b/synapse/rest/client/devices.py index 8313d687b7..6a45a5d130 100644 --- a/synapse/rest/client/devices.py +++ b/synapse/rest/client/devices.py @@ -24,13 +24,7 @@ import logging from http import HTTPStatus from typing import TYPE_CHECKING, List, Optional, Tuple -from synapse._pydantic_compat import HAS_PYDANTIC_V2 - -if TYPE_CHECKING or HAS_PYDANTIC_V2: - from pydantic.v1 import Extra, StrictStr -else: - from pydantic import Extra, StrictStr - +from synapse._pydantic_compat import Extra, StrictStr from synapse.api import errors from synapse.api.errors import NotFoundError, SynapseError, UnrecognizedRequestError from synapse.handlers.device import DeviceHandler diff --git a/synapse/rest/client/directory.py b/synapse/rest/client/directory.py index 11fdd0f7c6..98ba5c4c2a 100644 --- a/synapse/rest/client/directory.py +++ b/synapse/rest/client/directory.py @@ -22,17 +22,11 @@ import logging from typing import TYPE_CHECKING, List, Optional, Tuple -from synapse._pydantic_compat import HAS_PYDANTIC_V2 - -if TYPE_CHECKING or HAS_PYDANTIC_V2: - from pydantic.v1 import StrictStr -else: - from pydantic import StrictStr - from typing_extensions import Literal from twisted.web.server import Request +from synapse._pydantic_compat import StrictStr from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError from synapse.http.server import HttpServer from synapse.http.servlet import ( diff --git a/synapse/rest/client/reporting.py b/synapse/rest/client/reporting.py index 4eee53e5a8..97bd5d8c02 100644 --- a/synapse/rest/client/reporting.py +++ b/synapse/rest/client/reporting.py @@ -23,7 +23,7 @@ import logging from http import HTTPStatus from typing import TYPE_CHECKING, Tuple -from synapse._pydantic_compat import HAS_PYDANTIC_V2 +from synapse._pydantic_compat import StrictStr from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError from synapse.http.server import HttpServer from synapse.http.servlet import ( @@ -40,10 +40,6 @@ from ._base import client_patterns if TYPE_CHECKING: from synapse.server import HomeServer -if TYPE_CHECKING or HAS_PYDANTIC_V2: - from pydantic.v1 import StrictStr -else: - from pydantic import StrictStr logger = logging.getLogger(__name__) diff --git a/synapse/rest/key/v2/remote_key_resource.py b/synapse/rest/key/v2/remote_key_resource.py index 3c2028a2ad..fea0b9706d 100644 --- a/synapse/rest/key/v2/remote_key_resource.py +++ b/synapse/rest/key/v2/remote_key_resource.py @@ -23,17 +23,11 @@ import logging import re from typing import TYPE_CHECKING, Dict, Mapping, Optional, Set, Tuple -from synapse._pydantic_compat import HAS_PYDANTIC_V2 - -if TYPE_CHECKING or HAS_PYDANTIC_V2: - from pydantic.v1 import Extra, StrictInt, StrictStr -else: - from pydantic import Extra, StrictInt, StrictStr - from signedjson.sign import sign_json from twisted.web.server import Request +from synapse._pydantic_compat import Extra, StrictInt, StrictStr from synapse.crypto.keyring import ServerKeyFetcher from synapse.http.server import HttpServer from synapse.http.servlet import ( diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index efe4238036..1194b58ffb 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -40,7 +40,7 @@ from typing import ( import attr -from synapse._pydantic_compat import HAS_PYDANTIC_V2 +from synapse._pydantic_compat import BaseModel from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.engines import PostgresEngine from synapse.storage.types import Connection, Cursor @@ -49,11 +49,6 @@ from synapse.util import Clock, json_encoder from . import engines -if TYPE_CHECKING or HAS_PYDANTIC_V2: - from pydantic.v1 import BaseModel -else: - from pydantic import BaseModel - if TYPE_CHECKING: from synapse.server import HomeServer from synapse.storage.database import ( diff --git a/synapse/types/handlers/sliding_sync.py b/synapse/types/handlers/sliding_sync.py index e1b2af7a03..48badeacda 100644 --- a/synapse/types/handlers/sliding_sync.py +++ b/synapse/types/handlers/sliding_sync.py @@ -37,23 +37,20 @@ from typing import ( import attr -from synapse._pydantic_compat import HAS_PYDANTIC_V2 +from synapse._pydantic_compat import Extra from synapse.api.constants import EventTypes -from synapse.types import MultiWriterStreamToken, RoomStreamToken, StrCollection, UserID - -if TYPE_CHECKING or HAS_PYDANTIC_V2: - from pydantic.v1 import Extra -else: - from pydantic import Extra - from synapse.events import EventBase from synapse.types import ( DeviceListUpdates, JsonDict, JsonMapping, + MultiWriterStreamToken, Requester, + RoomStreamToken, SlidingSyncStreamToken, + StrCollection, StreamToken, + UserID, ) from synapse.types.rest.client import SlidingSyncBody diff --git a/synapse/types/rest/__init__.py b/synapse/types/rest/__init__.py index 2b6f5ed35a..183831e79a 100644 --- a/synapse/types/rest/__init__.py +++ b/synapse/types/rest/__init__.py @@ -18,14 +18,7 @@ # [This file includes modifications made by New Vector Limited] # # -from typing import TYPE_CHECKING - -from synapse._pydantic_compat import HAS_PYDANTIC_V2 - -if TYPE_CHECKING or HAS_PYDANTIC_V2: - from pydantic.v1 import BaseModel, Extra -else: - from pydantic import BaseModel, Extra +from synapse._pydantic_compat import BaseModel, Extra class RequestBodyModel(BaseModel): diff --git a/synapse/types/rest/client/__init__.py b/synapse/types/rest/client/__init__.py index 9f6fb087c1..c739bd16b0 100644 --- a/synapse/types/rest/client/__init__.py +++ b/synapse/types/rest/client/__init__.py @@ -20,29 +20,15 @@ # from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union -from synapse._pydantic_compat import HAS_PYDANTIC_V2 - -if TYPE_CHECKING or HAS_PYDANTIC_V2: - from pydantic.v1 import ( - Extra, - StrictBool, - StrictInt, - StrictStr, - conint, - constr, - validator, - ) -else: - from pydantic import ( - Extra, - StrictBool, - StrictInt, - StrictStr, - conint, - constr, - validator, - ) - +from synapse._pydantic_compat import ( + Extra, + StrictBool, + StrictInt, + StrictStr, + conint, + constr, + validator, +) from synapse.types.rest import RequestBodyModel from synapse.util.threepids import validate_email @@ -384,7 +370,7 @@ class SlidingSyncBody(RequestBodyModel): receipts: Optional[ReceiptsExtension] = None typing: Optional[TypingExtension] = None - conn_id: Optional[str] + conn_id: Optional[StrictStr] # mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884 if TYPE_CHECKING: diff --git a/tests/rest/client/test_models.py b/tests/rest/client/test_models.py index f8a56c80ca..f14585ccac 100644 --- a/tests/rest/client/test_models.py +++ b/tests/rest/client/test_models.py @@ -19,18 +19,12 @@ # # import unittest as stdlib_unittest -from typing import TYPE_CHECKING from typing_extensions import Literal -from synapse._pydantic_compat import HAS_PYDANTIC_V2 +from synapse._pydantic_compat import BaseModel, ValidationError from synapse.types.rest.client import EmailRequestTokenBody -if TYPE_CHECKING or HAS_PYDANTIC_V2: - from pydantic.v1 import BaseModel, ValidationError -else: - from pydantic import BaseModel, ValidationError - class ThreepidMediumEnumTestCase(stdlib_unittest.TestCase): class Model(BaseModel):