diff --git a/changelog.d/17636.misc b/changelog.d/17636.misc new file mode 100644 index 0000000000..756918e2b2 --- /dev/null +++ b/changelog.d/17636.misc @@ -0,0 +1 @@ +Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting. diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index f3dbe5bba7..e44b8d8e54 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1861,7 +1861,7 @@ class PersistEventsStore: VALUES ( ?, ?, ?, ?, ?, (SELECT stream_ordering FROM events WHERE event_id = ?), - (SELECT instance_name FROM events WHERE event_id = ?) + (SELECT COALESCE(instance_name, 'master') FROM events WHERE event_id = ?) {("," + ", ".join("?" for _ in sliding_sync_snapshot_values)) if sliding_sync_snapshot_values else ""} ) ON CONFLICT (room_id, user_id) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index cb23f433bc..e819364a16 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -41,6 +41,7 @@ from synapse.storage.databases.main.events import ( SlidingSyncMembershipSnapshotSharedInsertValues, SlidingSyncStateInsertValues, ) +from synapse.storage.databases.main.events_worker import DatabaseCorruptionError from synapse.storage.databases.main.state_deltas import StateDeltasStore from synapse.storage.databases.main.stream import StreamWorkerStore from synapse.storage.types import Cursor @@ -1857,6 +1858,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS initial_phase = True last_room_id = progress.get("last_room_id", "") + last_user_id = progress.get("last_user_id", "") last_event_stream_ordering = progress["last_event_stream_ordering"] def _find_memberships_to_update_txn( @@ -1887,11 +1889,11 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS FROM local_current_membership AS c INNER JOIN events AS e USING (event_id) LEFT JOIN rooms AS r ON (c.room_id = r.room_id) - WHERE c.room_id > ? - ORDER BY c.room_id ASC + WHERE (c.room_id, c.user_id) > (?, ?) + ORDER BY c.room_id ASC, c.user_id ASC LIMIT ? """, - (last_room_id, batch_size), + (last_room_id, last_user_id, batch_size), ) elif last_event_stream_ordering is not None: # It's important to sort by `event_stream_ordering` *ascending* (oldest to @@ -1993,6 +1995,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS WHERE room_id = ? AND m.user_id = ? + AND (m.membership = ? OR m.membership = ?) AND e.event_id != ? ORDER BY e.topological_ordering DESC LIMIT 1 @@ -2000,6 +2003,8 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS ( room_id, user_id, + Membership.INVITE, + Membership.KNOCK, event_id, ), ) @@ -2081,9 +2086,17 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS # have `current_state_events` and we should have some current state # for each room if current_state_ids_map: - fetched_events = await self.get_events( - current_state_ids_map.values() - ) + try: + fetched_events = await self.get_events( + current_state_ids_map.values() + ) + except DatabaseCorruptionError as e: + logger.warning( + "Failed to fetch state for room '%s' due to corrupted events. Ignoring. Error: %s", + room_id, + e, + ) + continue current_state_map: StateMap[EventBase] = { state_key: fetched_events[event_id] @@ -2124,7 +2137,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS False ) elif membership in (Membership.INVITE, Membership.KNOCK) or ( - membership == Membership.LEAVE and is_outlier + membership in (Membership.LEAVE, Membership.BAN) and is_outlier ): invite_or_knock_event_id = membership_event_id invite_or_knock_membership = membership @@ -2135,7 +2148,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS # us a consistent view of the room state regardless of your # membership (i.e. the room shouldn't disappear if your using the # `is_encrypted` filter and you leave). - if membership == Membership.LEAVE and is_outlier: + if membership in (Membership.LEAVE, Membership.BAN) and is_outlier: invite_or_knock_event_id, invite_or_knock_membership = ( await self.db_pool.runInteraction( "sliding_sync_membership_snapshots_bg_update._find_previous_membership", @@ -2182,7 +2195,15 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS await_full_state=False, ) - fetched_events = await self.get_events(state_ids_map.values()) + try: + fetched_events = await self.get_events(state_ids_map.values()) + except DatabaseCorruptionError as e: + logger.warning( + "Failed to fetch state for room '%s' due to corrupted events. Ignoring. Error: %s", + room_id, + e, + ) + continue state_map: StateMap[EventBase] = { state_key: fetched_events[event_id] @@ -2296,7 +2317,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS ( room_id, _room_id_from_rooms_table, - _user_id, + user_id, _sender, _membership_event_id, _membership, @@ -2308,8 +2329,11 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS progress = { "initial_phase": initial_phase, "last_room_id": room_id, - "last_event_stream_ordering": membership_event_stream_ordering, + "last_user_id": user_id, + "last_event_stream_ordering": last_event_stream_ordering, } + if not initial_phase: + progress["last_event_stream_ordering"] = membership_event_stream_ordering await self.db_pool.updates._background_update_progress( _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE, diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 6079cc4a52..1d83390827 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -98,6 +98,26 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +class DatabaseCorruptionError(RuntimeError): + """We found an event in the DB that has a persisted event ID that doesn't + match its computed event ID.""" + + def __init__( + self, room_id: str, persisted_event_id: str, computed_event_id: str + ) -> None: + self.room_id = room_id + self.persisted_event_id = persisted_event_id + self.computed_event_id = computed_event_id + + message = ( + f"Database corruption: Event {persisted_event_id} in room {room_id} " + f"from the database appears to have been modified (calculated " + f"event id {computed_event_id})" + ) + + super().__init__(message) + + # These values are used in the `enqueue_event` and `_fetch_loop` methods to # control how we batch/bulk fetch events from the database. # The values are plucked out of thing air to make initial sync run faster @@ -1364,10 +1384,8 @@ class EventsWorkerStore(SQLBaseStore): if original_ev.event_id != event_id: # it's difficult to see what to do here. Pretty much all bets are off # if Synapse cannot rely on the consistency of its database. - raise RuntimeError( - f"Database corruption: Event {event_id} in room {d['room_id']} " - f"from the database appears to have been modified (calculated " - f"event id {original_ev.event_id})" + raise DatabaseCorruptionError( + d["room_id"], event_id, original_ev.event_id ) event_map[event_id] = original_ev