From 8cdd2d214e9bbf8995ff4920140804ebcea6497e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 8 Jul 2024 20:30:23 +0100 Subject: [PATCH] Fix bug in sliding sync when using old DB. (#17398) We don't necessarily have `instance_name` for old events (before we support multiple event persisters). We treat those as if the `instance_name` was "master". --------- Co-authored-by: Eric Eastwood --- changelog.d/17398.bugfix | 1 + synapse/storage/_base.py | 6 - synapse/storage/databases/main/cache.py | 10 -- .../storage/databases/main/events_worker.py | 3 +- synapse/storage/databases/main/roommember.py | 67 +--------- synapse/storage/databases/main/stream.py | 33 +++-- tests/handlers/test_sync.py | 1 - tests/replication/storage/test_events.py | 124 +----------------- 8 files changed, 33 insertions(+), 212 deletions(-) create mode 100644 changelog.d/17398.bugfix diff --git a/changelog.d/17398.bugfix b/changelog.d/17398.bugfix new file mode 100644 index 0000000000..7931c431ef --- /dev/null +++ b/changelog.d/17398.bugfix @@ -0,0 +1 @@ +Fix bug in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint when using an old database. diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index b127289d8d..881888fa93 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -119,9 +119,6 @@ class SQLBaseStore(metaclass=ABCMeta): self._attempt_to_invalidate_cache( "get_user_in_room_with_profile", (room_id, user_id) ) - self._attempt_to_invalidate_cache( - "get_rooms_for_user_with_stream_ordering", (user_id,) - ) self._attempt_to_invalidate_cache("get_rooms_for_user", (user_id,)) # Purge other caches based on room state. @@ -148,9 +145,6 @@ class SQLBaseStore(metaclass=ABCMeta): self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,)) self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None) self._attempt_to_invalidate_cache("get_user_in_room_with_profile", None) - self._attempt_to_invalidate_cache( - "get_rooms_for_user_with_stream_ordering", None - ) self._attempt_to_invalidate_cache("get_rooms_for_user", None) self._attempt_to_invalidate_cache("get_room_summary", (room_id,)) diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index c6787faea0..2d6b75e47e 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -268,16 +268,12 @@ class CacheInvalidationWorkerStore(SQLBaseStore): self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined] if data.type == EventTypes.Member: - self.get_rooms_for_user_with_stream_ordering.invalidate( # type: ignore[attr-defined] - (data.state_key,) - ) self.get_rooms_for_user.invalidate((data.state_key,)) # type: ignore[attr-defined] elif row.type == EventsStreamAllStateRow.TypeId: assert isinstance(data, EventsStreamAllStateRow) # Similar to the above, but the entire caches are invalidated. This is # unfortunate for the membership caches, but should recover quickly. self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined] - self.get_rooms_for_user_with_stream_ordering.invalidate_all() # type: ignore[attr-defined] self.get_rooms_for_user.invalidate_all() # type: ignore[attr-defined] else: raise Exception("Unknown events stream row type %s" % (row.type,)) @@ -334,9 +330,6 @@ class CacheInvalidationWorkerStore(SQLBaseStore): self._attempt_to_invalidate_cache( "get_invited_rooms_for_local_user", (state_key,) ) - self._attempt_to_invalidate_cache( - "get_rooms_for_user_with_stream_ordering", (state_key,) - ) self._attempt_to_invalidate_cache("get_rooms_for_user", (state_key,)) self._attempt_to_invalidate_cache( @@ -399,9 +392,6 @@ class CacheInvalidationWorkerStore(SQLBaseStore): self._attempt_to_invalidate_cache("get_thread_id", None) self._attempt_to_invalidate_cache("get_thread_id_for_receipts", None) self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None) - self._attempt_to_invalidate_cache( - "get_rooms_for_user_with_stream_ordering", None - ) self._attempt_to_invalidate_cache("get_rooms_for_user", None) self._attempt_to_invalidate_cache("did_forget", None) self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index a5acea8c3b..4d4877c4c3 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1457,7 +1457,8 @@ class EventsWorkerStore(SQLBaseStore): event_dict[event_id] = _EventRow( event_id=event_id, stream_ordering=row[1], - instance_name=row[2], + # If instance_name is null we default to "master" + instance_name=row[2] or "master", internal_metadata=row[3], json=row[4], format_version=row[5], diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index d8b54dc4e3..5d2fd08495 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -50,12 +50,7 @@ from synapse.storage.database import ( from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.engines import Sqlite3Engine -from synapse.storage.roommember import ( - GetRoomsForUserWithStreamOrdering, - MemberSummary, - ProfileInfo, - RoomsForUser, -) +from synapse.storage.roommember import MemberSummary, ProfileInfo, RoomsForUser from synapse.types import ( JsonDict, PersistedEventPosition, @@ -494,7 +489,11 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): sender=sender, membership=membership, event_id=event_id, - event_pos=PersistedEventPosition(instance_name, stream_ordering), + event_pos=PersistedEventPosition( + # If instance_name is null we default to "master" + instance_name or "master", + stream_ordering, + ), room_version_id=room_version, ) for room_id, sender, membership, event_id, instance_name, stream_ordering, room_version in txn @@ -606,53 +605,6 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): return results - @cached(max_entries=500000, iterable=True) - async def get_rooms_for_user_with_stream_ordering( - self, user_id: str - ) -> FrozenSet[GetRoomsForUserWithStreamOrdering]: - """Returns a set of room_ids the user is currently joined to. - - If a remote user only returns rooms this server is currently - participating in. - - Args: - user_id - - Returns: - Returns the rooms the user is in currently, along with the stream - ordering of the most recent join for that user and room, along with - the room version of the room. - """ - return await self.db_pool.runInteraction( - "get_rooms_for_user_with_stream_ordering", - self._get_rooms_for_user_with_stream_ordering_txn, - user_id, - ) - - def _get_rooms_for_user_with_stream_ordering_txn( - self, txn: LoggingTransaction, user_id: str - ) -> FrozenSet[GetRoomsForUserWithStreamOrdering]: - # We use `current_state_events` here and not `local_current_membership` - # as a) this gets called with remote users and b) this only gets called - # for rooms the server is participating in. - sql = """ - SELECT room_id, e.instance_name, e.stream_ordering - FROM current_state_events AS c - INNER JOIN events AS e USING (room_id, event_id) - WHERE - c.type = 'm.room.member' - AND c.state_key = ? - AND c.membership = ? - """ - - txn.execute(sql, (user_id, Membership.JOIN)) - return frozenset( - GetRoomsForUserWithStreamOrdering( - room_id, PersistedEventPosition(instance, stream_id) - ) - for room_id, instance, stream_id in txn - ) - async def get_users_server_still_shares_room_with( self, user_ids: Collection[str] ) -> Set[str]: @@ -701,13 +653,6 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): If a remote user only returns rooms this server is currently participating in. """ - rooms = self.get_rooms_for_user_with_stream_ordering.cache.get_immediate( - (user_id,), - None, - update_metrics=False, - ) - if rooms: - return frozenset(r.room_id for r in rooms) room_ids = await self.db_pool.simple_select_onecol( table="current_state_events", diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index be81025355..e74e0d2e91 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -371,7 +371,7 @@ def _make_generic_sql_bound( def _filter_results( lower_token: Optional[RoomStreamToken], upper_token: Optional[RoomStreamToken], - instance_name: str, + instance_name: Optional[str], topological_ordering: int, stream_ordering: int, ) -> bool: @@ -384,8 +384,14 @@ def _filter_results( position maps, which we handle by fetching more than necessary from the DB and then filtering (rather than attempting to construct a complicated SQL query). + + The `instance_name` arg is optional to handle historic rows, and is + interpreted as if it was "master". """ + if instance_name is None: + instance_name = "master" + event_historical_tuple = ( topological_ordering, stream_ordering, @@ -420,7 +426,7 @@ def _filter_results( def _filter_results_by_stream( lower_token: Optional[RoomStreamToken], upper_token: Optional[RoomStreamToken], - instance_name: str, + instance_name: Optional[str], stream_ordering: int, ) -> bool: """ @@ -436,7 +442,14 @@ def _filter_results_by_stream( position maps, which we handle by fetching more than necessary from the DB and then filtering (rather than attempting to construct a complicated SQL query). + + The `instance_name` arg is optional to handle historic rows, and is + interpreted as if it was "master". """ + + if instance_name is None: + instance_name = "master" + if lower_token: assert lower_token.topological is None @@ -912,7 +925,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): prev_sender, ) in txn: assert room_id is not None - assert instance_name is not None assert stream_ordering is not None if _filter_results_by_stream( @@ -936,7 +948,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): # Event event_id=event_id, event_pos=PersistedEventPosition( - instance_name=instance_name, + # If instance_name is null we default to "master" + instance_name=instance_name or "master", stream=stream_ordering, ), # When `s.event_id = null`, we won't be able to get respective @@ -952,13 +965,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): prev_event_id=prev_event_id, prev_event_pos=( PersistedEventPosition( - instance_name=prev_instance_name, + # If instance_name is null we default to "master" + instance_name=prev_instance_name or "master", stream=prev_stream_ordering, ) - if ( - prev_instance_name is not None - and prev_stream_ordering is not None - ) + if (prev_stream_ordering is not None) else None ), prev_membership=prev_membership, @@ -1270,7 +1281,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): stream_ordering=stream_ordering, ): return event_id, PersistedEventPosition( - instance_name, stream_ordering + # If instance_name is null we default to "master" + instance_name or "master", + stream_ordering, ) return None diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py index 674dd4fb54..77aafa492e 100644 --- a/tests/handlers/test_sync.py +++ b/tests/handlers/test_sync.py @@ -210,7 +210,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): ) # Blow away caches (supported room versions can only change due to a restart). - self.store.get_rooms_for_user_with_stream_ordering.invalidate_all() self.store.get_rooms_for_user.invalidate_all() self.store._get_event_cache.clear() self.store._event_ref.clear() diff --git a/tests/replication/storage/test_events.py b/tests/replication/storage/test_events.py index a56f1e2d5d..1afe523d02 100644 --- a/tests/replication/storage/test_events.py +++ b/tests/replication/storage/test_events.py @@ -30,19 +30,16 @@ from synapse.api.constants import ReceiptTypes from synapse.api.room_versions import RoomVersions from synapse.events import EventBase, make_event_from_dict from synapse.events.snapshot import EventContext -from synapse.handlers.room import RoomEventSource from synapse.server import HomeServer from synapse.storage.databases.main.event_push_actions import ( NotifCounts, RoomNotifCounts, ) from synapse.storage.databases.main.events_worker import EventsWorkerStore -from synapse.storage.roommember import GetRoomsForUserWithStreamOrdering, RoomsForUser +from synapse.storage.roommember import RoomsForUser from synapse.types import PersistedEventPosition from synapse.util import Clock -from tests.server import FakeTransport - from ._base import BaseWorkerStoreTestCase USER_ID = "@feeling:test" @@ -221,125 +218,6 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase): ), ) - def test_get_rooms_for_user_with_stream_ordering(self) -> None: - """Check that the cache on get_rooms_for_user_with_stream_ordering is invalidated - by rows in the events stream - """ - self.persist(type="m.room.create", key="", creator=USER_ID) - self.persist(type="m.room.member", key=USER_ID, membership="join") - self.replicate() - self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set()) - - j2 = self.persist( - type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join" - ) - assert j2.internal_metadata.instance_name is not None - assert j2.internal_metadata.stream_ordering is not None - self.replicate() - - expected_pos = PersistedEventPosition( - j2.internal_metadata.instance_name, j2.internal_metadata.stream_ordering - ) - self.check( - "get_rooms_for_user_with_stream_ordering", - (USER_ID_2,), - {GetRoomsForUserWithStreamOrdering(ROOM_ID, expected_pos)}, - ) - - def test_get_rooms_for_user_with_stream_ordering_with_multi_event_persist( - self, - ) -> None: - """Check that current_state invalidation happens correctly with multiple events - in the persistence batch. - - This test attempts to reproduce a race condition between the event persistence - loop and a worker-based Sync handler. - - The problem occurred when the master persisted several events in one batch. It - only updates the current_state at the end of each batch, so the obvious thing - to do is then to issue a current_state_delta stream update corresponding to the - last stream_id in the batch. - - However, that raises the possibility that a worker will see the replication - notification for a join event before the current_state caches are invalidated. - - The test involves: - * creating a join and a message event for a user, and persisting them in the - same batch - - * controlling the replication stream so that updates are sent gradually - - * between each bunch of replication updates, check that we see a consistent - snapshot of the state. - """ - self.persist(type="m.room.create", key="", creator=USER_ID) - self.persist(type="m.room.member", key=USER_ID, membership="join") - self.replicate() - self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set()) - - # limit the replication rate - repl_transport = self._server_transport - assert isinstance(repl_transport, FakeTransport) - repl_transport.autoflush = False - - # build the join and message events and persist them in the same batch. - logger.info("----- build test events ------") - j2, j2ctx = self.build_event( - type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join" - ) - msg, msgctx = self.build_event() - self.get_success(self.persistance.persist_events([(j2, j2ctx), (msg, msgctx)])) - self.replicate() - assert j2.internal_metadata.instance_name is not None - assert j2.internal_metadata.stream_ordering is not None - - event_source = RoomEventSource(self.hs) - event_source.store = self.worker_store - current_token = event_source.get_current_key() - - # gradually stream out the replication - while repl_transport.buffer: - logger.info("------ flush ------") - repl_transport.flush(30) - self.pump(0) - - prev_token = current_token - current_token = event_source.get_current_key() - - # attempt to replicate the behaviour of the sync handler. - # - # First, we get a list of the rooms we are joined to - joined_rooms = self.get_success( - self.worker_store.get_rooms_for_user_with_stream_ordering(USER_ID_2) - ) - - # Then, we get a list of the events since the last sync - membership_changes = self.get_success( - self.worker_store.get_membership_changes_for_user( - USER_ID_2, prev_token, current_token - ) - ) - - logger.info( - "%s->%s: joined_rooms=%r membership_changes=%r", - prev_token, - current_token, - joined_rooms, - membership_changes, - ) - - # the membership change is only any use to us if the room is in the - # joined_rooms list. - if membership_changes: - expected_pos = PersistedEventPosition( - j2.internal_metadata.instance_name, - j2.internal_metadata.stream_ordering, - ) - self.assertEqual( - joined_rooms, - {GetRoomsForUserWithStreamOrdering(ROOM_ID, expected_pos)}, - ) - event_id = 0 def persist(self, backfill: bool = False, **kwargs: Any) -> EventBase: