mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-22 01:25:44 +03:00
Sliding sync minor performance speed up using new table (#17787)
Use the new tables to work out which rooms have changed. --------- Co-authored-by: Eric Eastwood <eric.eastwood@beta.gouv.fr>
This commit is contained in:
parent
e2610de208
commit
4e90221d87
3 changed files with 61 additions and 9 deletions
1
changelog.d/17787.misc
Normal file
1
changelog.d/17787.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Sliding sync minor performance speed up using new table.
|
|
@ -49,6 +49,7 @@ from synapse.types import (
|
||||||
Requester,
|
Requester,
|
||||||
SlidingSyncStreamToken,
|
SlidingSyncStreamToken,
|
||||||
StateMap,
|
StateMap,
|
||||||
|
StrCollection,
|
||||||
StreamKeyType,
|
StreamKeyType,
|
||||||
StreamToken,
|
StreamToken,
|
||||||
)
|
)
|
||||||
|
@ -293,7 +294,6 @@ class SlidingSyncHandler:
|
||||||
# to record rooms as having updates even if there might not actually
|
# to record rooms as having updates even if there might not actually
|
||||||
# be anything new for the user (e.g. due to event filters, events
|
# be anything new for the user (e.g. due to event filters, events
|
||||||
# having happened after the user left, etc).
|
# having happened after the user left, etc).
|
||||||
unsent_room_ids = []
|
|
||||||
if from_token:
|
if from_token:
|
||||||
# The set of rooms that the client (may) care about, but aren't
|
# The set of rooms that the client (may) care about, but aren't
|
||||||
# in any list range (or subscribed to).
|
# in any list range (or subscribed to).
|
||||||
|
@ -305,15 +305,24 @@ class SlidingSyncHandler:
|
||||||
# TODO: Replace this with something faster. When we land the
|
# TODO: Replace this with something faster. When we land the
|
||||||
# sliding sync tables that record the most recent event
|
# sliding sync tables that record the most recent event
|
||||||
# positions we can use that.
|
# positions we can use that.
|
||||||
missing_event_map_by_room = (
|
unsent_room_ids: StrCollection
|
||||||
await self.store.get_room_events_stream_for_rooms(
|
if await self.store.have_finished_sliding_sync_background_jobs():
|
||||||
room_ids=missing_rooms,
|
unsent_room_ids = await (
|
||||||
from_key=to_token.room_key,
|
self.store.get_rooms_that_have_updates_since_sliding_sync_table(
|
||||||
to_key=from_token.stream_token.room_key,
|
room_ids=missing_rooms,
|
||||||
limit=1,
|
from_key=from_token.stream_token.room_key,
|
||||||
|
)
|
||||||
)
|
)
|
||||||
)
|
else:
|
||||||
unsent_room_ids = list(missing_event_map_by_room)
|
missing_event_map_by_room = (
|
||||||
|
await self.store.get_room_events_stream_for_rooms(
|
||||||
|
room_ids=missing_rooms,
|
||||||
|
from_key=to_token.room_key,
|
||||||
|
to_key=from_token.stream_token.room_key,
|
||||||
|
limit=1,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
unsent_room_ids = list(missing_event_map_by_room)
|
||||||
|
|
||||||
new_connection_state.rooms.record_unsent_rooms(
|
new_connection_state.rooms.record_unsent_rooms(
|
||||||
unsent_room_ids, from_token.stream_token.room_key
|
unsent_room_ids, from_token.stream_token.room_key
|
||||||
|
|
|
@ -751,6 +751,48 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
if self._events_stream_cache.has_entity_changed(room_id, from_id)
|
if self._events_stream_cache.has_entity_changed(room_id, from_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async def get_rooms_that_have_updates_since_sliding_sync_table(
|
||||||
|
self,
|
||||||
|
room_ids: StrCollection,
|
||||||
|
from_key: RoomStreamToken,
|
||||||
|
) -> StrCollection:
|
||||||
|
"""Return the rooms that probably have had updates since the given
|
||||||
|
token (changes that are > `from_key`)."""
|
||||||
|
# If the stream change cache is valid for the stream token, we can just
|
||||||
|
# use the result of that.
|
||||||
|
if from_key.stream >= self._events_stream_cache.get_earliest_known_position():
|
||||||
|
return self._events_stream_cache.get_entities_changed(
|
||||||
|
room_ids, from_key.stream
|
||||||
|
)
|
||||||
|
|
||||||
|
def get_rooms_that_have_updates_since_sliding_sync_table_txn(
|
||||||
|
txn: LoggingTransaction,
|
||||||
|
) -> StrCollection:
|
||||||
|
sql = """
|
||||||
|
SELECT room_id
|
||||||
|
FROM sliding_sync_joined_rooms
|
||||||
|
WHERE {clause}
|
||||||
|
AND event_stream_ordering > ?
|
||||||
|
"""
|
||||||
|
|
||||||
|
results: Set[str] = set()
|
||||||
|
for batch in batch_iter(room_ids, 1000):
|
||||||
|
clause, args = make_in_list_sql_clause(
|
||||||
|
self.database_engine, "room_id", batch
|
||||||
|
)
|
||||||
|
|
||||||
|
args.append(from_key.stream)
|
||||||
|
txn.execute(sql.format(clause=clause), args)
|
||||||
|
|
||||||
|
results.update(row[0] for row in txn)
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
return await self.db_pool.runInteraction(
|
||||||
|
"get_rooms_that_have_updates_since_sliding_sync_table",
|
||||||
|
get_rooms_that_have_updates_since_sliding_sync_table_txn,
|
||||||
|
)
|
||||||
|
|
||||||
async def paginate_room_events_by_stream_ordering(
|
async def paginate_room_events_by_stream_ordering(
|
||||||
self,
|
self,
|
||||||
*,
|
*,
|
||||||
|
|
Loading…
Reference in a new issue