diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 0d32a3a498..0682308791 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -730,6 +730,30 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): # ordered by stream_ordering column. If topological, then we need to # fetch events from one chunk at a time until we hit the limit. + # For backwards compatibility we need to check if the token has a + # topological part but no chunk part. If that's the case we can use the + # stream part to generate an appropriate topological token. + if from_token.chunk is None and from_token.topological is not None: + res = self._simple_select_one_txn( + txn, + table="events", + keyvalues={ + "stream_ordering": from_token.stream, + }, + retcols=( + "chunk_id", + "topological_ordering", + "stream_ordering", + ), + allow_none=True, + ) + if res and res["chunk_id"] is not None: + from_token = RoomStreamToken( + res["chunk_id"], + res["topological_ordering"], + res["stream_ordering"], + ) + # Tokens really represent positions between elements, but we use # the convention of pointing to the event before the gap. Hence # we have a bit of asymmetry when it comes to equalities. @@ -783,13 +807,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): iterated_chunks = [] chunk_id = None - if from_token.chunk: # FIXME: may be topological but no chunk. - if rows: - chunk_id = rows[-1].chunk_id - iterated_chunks = [r.chunk_id for r in rows] - else: - chunk_id = from_token.chunk - iterated_chunks = [chunk_id] + if rows: + chunk_id = rows[-1].chunk_id + iterated_chunks = [r.chunk_id for r in rows] + elif from_token.chunk: + chunk_id = from_token.chunk + iterated_chunks = [chunk_id] table = ChunkDBOrderedListStore( txn, room_id, self.clock,