diff --git a/changelog.d/17787.misc b/changelog.d/17787.misc new file mode 100644 index 0000000000..41ac59b348 --- /dev/null +++ b/changelog.d/17787.misc @@ -0,0 +1 @@ +Sliding sync minor performance speed up using new table. diff --git a/synapse/handlers/sliding_sync/__init__.py b/synapse/handlers/sliding_sync/__init__.py index 9fcc68ff25..cb6a0b9f35 100644 --- a/synapse/handlers/sliding_sync/__init__.py +++ b/synapse/handlers/sliding_sync/__init__.py @@ -49,6 +49,7 @@ from synapse.types import ( Requester, SlidingSyncStreamToken, StateMap, + StrCollection, StreamKeyType, StreamToken, ) @@ -293,7 +294,6 @@ class SlidingSyncHandler: # to record rooms as having updates even if there might not actually # be anything new for the user (e.g. due to event filters, events # having happened after the user left, etc). - unsent_room_ids = [] if from_token: # The set of rooms that the client (may) care about, but aren't # in any list range (or subscribed to). @@ -305,15 +305,24 @@ class SlidingSyncHandler: # TODO: Replace this with something faster. When we land the # sliding sync tables that record the most recent event # positions we can use that. - missing_event_map_by_room = ( - await self.store.get_room_events_stream_for_rooms( - room_ids=missing_rooms, - from_key=to_token.room_key, - to_key=from_token.stream_token.room_key, - limit=1, + unsent_room_ids: StrCollection + if await self.store.have_finished_sliding_sync_background_jobs(): + unsent_room_ids = await ( + self.store.get_rooms_that_have_updates_since_sliding_sync_table( + room_ids=missing_rooms, + from_key=from_token.stream_token.room_key, + ) ) - ) - unsent_room_ids = list(missing_event_map_by_room) + else: + missing_event_map_by_room = ( + await self.store.get_room_events_stream_for_rooms( + room_ids=missing_rooms, + from_key=to_token.room_key, + to_key=from_token.stream_token.room_key, + limit=1, + ) + ) + unsent_room_ids = list(missing_event_map_by_room) new_connection_state.rooms.record_unsent_rooms( unsent_room_ids, from_token.stream_token.room_key diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 964f41ca57..b4258a4436 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -751,6 +751,48 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): if self._events_stream_cache.has_entity_changed(room_id, from_id) } + async def get_rooms_that_have_updates_since_sliding_sync_table( + self, + room_ids: StrCollection, + from_key: RoomStreamToken, + ) -> StrCollection: + """Return the rooms that probably have had updates since the given + token (changes that are > `from_key`).""" + # If the stream change cache is valid for the stream token, we can just + # use the result of that. + if from_key.stream >= self._events_stream_cache.get_earliest_known_position(): + return self._events_stream_cache.get_entities_changed( + room_ids, from_key.stream + ) + + def get_rooms_that_have_updates_since_sliding_sync_table_txn( + txn: LoggingTransaction, + ) -> StrCollection: + sql = """ + SELECT room_id + FROM sliding_sync_joined_rooms + WHERE {clause} + AND event_stream_ordering > ? + """ + + results: Set[str] = set() + for batch in batch_iter(room_ids, 1000): + clause, args = make_in_list_sql_clause( + self.database_engine, "room_id", batch + ) + + args.append(from_key.stream) + txn.execute(sql.format(clause=clause), args) + + results.update(row[0] for row in txn) + + return results + + return await self.db_pool.runInteraction( + "get_rooms_that_have_updates_since_sliding_sync_table", + get_rooms_that_have_updates_since_sliding_sync_table_txn, + ) + async def paginate_room_events_by_stream_ordering( self, *,