diff --git a/changelog.d/12319.bugfix b/changelog.d/12319.bugfix new file mode 100644 index 0000000000..a50191feaa --- /dev/null +++ b/changelog.d/12319.bugfix @@ -0,0 +1 @@ +Fix bug with incremental sync missing events when rejoining/backfilling. Contributed by Nick @ Beeper. diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 47a63005a9..1b092e900e 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -175,17 +175,13 @@ class MessageHandler: state_filter = state_filter or StateFilter.all() if at_token: - # FIXME this claims to get the state at a stream position, but - # get_recent_events_for_room operates by topo ordering. This therefore - # does not reliably give you the state at the given stream position. - # (https://github.com/matrix-org/synapse/issues/3305) - last_events, _ = await self.store.get_recent_events_for_room( - room_id, end_token=at_token.room_key, limit=1 + last_event = await self.store.get_last_event_in_room_before_stream_ordering( + room_id, + end_token=at_token.room_key, ) - if not last_events: + if not last_event: raise NotFoundError("Can't find event for token %s" % (at_token,)) - last_event = last_events[0] # check whether the user is in the room at that time to determine # whether they should be treated as peeking. @@ -204,7 +200,7 @@ class MessageHandler: visible_events = await filter_events_for_client( self.storage, user_id, - last_events, + [last_event], filter_send_to_client=False, is_peeking=is_peeking, ) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 6c8b17c420..5125126a80 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -661,16 +661,15 @@ class SyncHandler: stream_position: point at which to get state state_filter: The state filter used to fetch state from the database. """ - # FIXME this claims to get the state at a stream position, but - # get_recent_events_for_room operates by topo ordering. This therefore - # does not reliably give you the state at the given stream position. - # (https://github.com/matrix-org/synapse/issues/3305) - last_events, _ = await self.store.get_recent_events_for_room( - room_id, end_token=stream_position.room_key, limit=1 + # FIXME: This gets the state at the latest event before the stream ordering, + # which might not be the same as the "current state" of the room at the time + # of the stream token if there were multiple forward extremities at the time. + last_event = await self.store.get_last_event_in_room_before_stream_ordering( + room_id, + end_token=stream_position.room_key, ) - if last_events: - last_event = last_events[-1] + if last_event: state = await self.get_state_after_event( last_event, state_filter=state_filter or StateFilter.all() ) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 6d45a8a9f6..793e906630 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -758,6 +758,32 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): "get_room_event_before_stream_ordering", _f ) + async def get_last_event_in_room_before_stream_ordering( + self, + room_id: str, + end_token: RoomStreamToken, + ) -> Optional[EventBase]: + """Returns the last event in a room at or before a stream ordering + + Args: + room_id + end_token: The token used to stream from + + Returns: + The most recent event. + """ + + last_row = await self.get_room_event_before_stream_ordering( + room_id=room_id, + stream_ordering=end_token.stream, + ) + if last_row: + _, _, event_id = last_row + event = await self.get_event(event_id, get_prev_content=True) + return event + + return None + async def get_current_room_stream_token_for_room_id( self, room_id: Optional[str] = None ) -> RoomStreamToken: diff --git a/tests/rest/client/test_room_batch.py b/tests/rest/client/test_room_batch.py index 44f333a0ee..41a1bf6d89 100644 --- a/tests/rest/client/test_room_batch.py +++ b/tests/rest/client/test_room_batch.py @@ -7,9 +7,9 @@ from twisted.test.proto_helpers import MemoryReactor from synapse.api.constants import EventContentFields, EventTypes from synapse.appservice import ApplicationService from synapse.rest import admin -from synapse.rest.client import login, register, room, room_batch +from synapse.rest.client import login, register, room, room_batch, sync from synapse.server import HomeServer -from synapse.types import JsonDict +from synapse.types import JsonDict, RoomStreamToken from synapse.util import Clock from tests import unittest @@ -63,6 +63,7 @@ class RoomBatchTestCase(unittest.HomeserverTestCase): room.register_servlets, register.register_servlets, login.register_servlets, + sync.register_servlets, ] def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: @@ -178,3 +179,123 @@ class RoomBatchTestCase(unittest.HomeserverTestCase): "Expected a single state_group to be returned by saw state_groups=%s" % (state_group_map.keys(),), ) + + @unittest.override_config({"experimental_features": {"msc2716_enabled": True}}) + def test_sync_while_batch_importing(self) -> None: + """ + Make sure that /sync correctly returns full room state when a user joins + during ongoing batch backfilling. + See: https://github.com/matrix-org/synapse/issues/12281 + """ + # Create user who will be invited & join room + user_id = self.register_user("beep", "test") + user_tok = self.login("beep", "test") + + time_before_room = int(self.clock.time_msec()) + + # Create a room with some events + room_id, _, _, _ = self._create_test_room() + # Invite the user + self.helper.invite( + room_id, src=self.appservice.sender, tok=self.appservice.token, targ=user_id + ) + + # Create another room, send a bunch of events to advance the stream token + other_room_id = self.helper.create_room_as( + self.appservice.sender, tok=self.appservice.token + ) + for _ in range(5): + self.helper.send_event( + room_id=other_room_id, + type=EventTypes.Message, + content={"msgtype": "m.text", "body": "C"}, + tok=self.appservice.token, + ) + + # Join the room as the normal user + self.helper.join(room_id, user_id, tok=user_tok) + + # Create an event to hang the historical batch from - In order to see + # the failure case originally reported in #12281, the historical batch + # must be hung from the most recent event in the room so the base + # insertion event ends up with the highest `topogological_ordering` + # (`depth`) in the room but will have a negative `stream_ordering` + # because it's a `historical` event. Previously, when assembling the + # `state` for the `/sync` response, the bugged logic would sort by + # `topological_ordering` descending and pick up the base insertion + # event because it has a negative `stream_ordering` below the given + # pagination token. Now we properly sort by `stream_ordering` + # descending which puts `historical` events with a negative + # `stream_ordering` way at the bottom and aren't selected as expected. + response = self.helper.send_event( + room_id=room_id, + type=EventTypes.Message, + content={ + "msgtype": "m.text", + "body": "C", + }, + tok=self.appservice.token, + ) + event_to_hang_id = response["event_id"] + + channel = self.make_request( + "POST", + "/_matrix/client/unstable/org.matrix.msc2716/rooms/%s/batch_send?prev_event_id=%s" + % (room_id, event_to_hang_id), + content={ + "events": _create_message_events_for_batch_send_request( + self.virtual_user_id, time_before_room, 3 + ), + "state_events_at_start": _create_join_state_events_for_batch_send_request( + [self.virtual_user_id], time_before_room + ), + }, + access_token=self.appservice.token, + ) + self.assertEqual(channel.code, 200, channel.result) + + # Now we need to find the invite + join events stream tokens so we can sync between + main_store = self.hs.get_datastores().main + events, next_key = self.get_success( + main_store.get_recent_events_for_room( + room_id, + 50, + end_token=main_store.get_room_max_token(), + ), + ) + invite_event_position = None + for event in events: + if ( + event.type == "m.room.member" + and event.content["membership"] == "invite" + ): + invite_event_position = self.get_success( + main_store.get_topological_token_for_event(event.event_id) + ) + break + + assert invite_event_position is not None, "No invite event found" + + # Remove the topological order from the token by re-creating w/stream only + invite_event_position = RoomStreamToken(None, invite_event_position.stream) + + # Sync everything after this token + since_token = self.get_success(invite_event_position.to_string(main_store)) + sync_response = self.make_request( + "GET", + f"/sync?since={since_token}", + access_token=user_tok, + ) + + # Assert that, for this room, the user was considered to have joined and thus + # receives the full state history + state_event_types = [ + event["type"] + for event in sync_response.json_body["rooms"]["join"][room_id]["state"][ + "events" + ] + ] + + assert ( + "m.room.create" in state_event_types + ), "Missing room full state in sync response"