concurrently_execute inserting

Eric Eastwood 2024-09-06 00:48:48 -05:00
parent 5894fc200c
commit b4fcbc9686


@@ -1827,88 +1827,90 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseStore):
                 )
             )
 
-        await concurrently_execute(
-            handle_room, [row[0] for row in rooms_to_update_rows], 10
-        )
+        rooms_to_update = [row[0] for row in rooms_to_update_rows]
+        await concurrently_execute(handle_room, rooms_to_update, 10)
 
-        def _fill_table_txn(txn: LoggingTransaction) -> None:
-            # Handle updating the `sliding_sync_joined_rooms` table
-            #
-            for (
-                room_id,
-                update_map,
-            ) in joined_room_updates.items():
-                joined_room_stream_ordering_update = (
-                    joined_room_stream_ordering_updates[room_id]
-                )
-                event_stream_ordering = (
-                    joined_room_stream_ordering_update.most_recent_event_stream_ordering
-                )
-                bump_stamp = joined_room_stream_ordering_update.most_recent_bump_stamp
-
-                # Check if the current state has been updated since we gathered it.
-                # We're being careful not to insert/overwrite with stale data.
-                state_deltas_since_we_gathered_current_state = (
-                    self.get_current_state_deltas_for_room_txn(
-                        txn,
-                        room_id,
-                        from_token=RoomStreamToken(
-                            stream=most_recent_current_state_delta_stream_id
-                        ),
-                        to_token=None,
-                    )
-                )
-                for state_delta in state_deltas_since_we_gathered_current_state:
-                    # We only need to check for the state is relevant to the
-                    # `sliding_sync_joined_rooms` table.
-                    if (
-                        state_delta.event_type,
-                        state_delta.state_key,
-                    ) in SLIDING_SYNC_RELEVANT_STATE_SET:
-                        # Raising exception so we can just exit and try again. It would
-                        # be hard to resolve this within the transaction because we need
-                        # to get full events out that take redactions into account. We
-                        # could add some retry logic here, but it's easier to just let
-                        # the background update try again.
-                        raise Exception(
-                            "Current state was updated after we gathered it to update "
-                            + "`sliding_sync_joined_rooms` in the background update. "
-                            + "Raising exception so we can just try again."
-                        )
-
-                # Since we fully insert rows into `sliding_sync_joined_rooms`, we can
-                # just do everything on insert and `ON CONFLICT DO NOTHING`.
-                #
-                self.db_pool.simple_upsert_txn(
-                    txn,
-                    table="sliding_sync_joined_rooms",
-                    keyvalues={"room_id": room_id},
-                    values={},
-                    insertion_values={
-                        **update_map,
-                        # The reason we're only *inserting* (not *updating*) `event_stream_ordering`
-                        # and `bump_stamp` is because if they are present, that means they are already
-                        # up-to-date.
-                        "event_stream_ordering": event_stream_ordering,
-                        "bump_stamp": bump_stamp,
-                    },
-                )
-
-            # Now that we've processed all the room, we can remove them from the
-            # queue.
-            #
-            # Note: we need to remove all the rooms from the queue we pulled out
-            # from the DB, not just the ones we've processed above. Otherwise
-            # we'll simply keep pulling out the same rooms over and over again.
-            self.db_pool.simple_delete_many_batch_txn(
-                txn,
-                table="sliding_sync_joined_rooms_to_recalculate",
-                keys=("room_id",),
-                values=rooms_to_update_rows,
-            )
-
-        await self.db_pool.runInteraction(
-            "sliding_sync_joined_rooms_bg_update", _fill_table_txn
-        )
+        def _fill_table_for_room_txn(txn: LoggingTransaction, room_id: str) -> None:
+            update_map = joined_room_updates[room_id]
+            joined_room_stream_ordering_update = joined_room_stream_ordering_updates[
+                room_id
+            ]
+            event_stream_ordering = (
+                joined_room_stream_ordering_update.most_recent_event_stream_ordering
+            )
+            bump_stamp = joined_room_stream_ordering_update.most_recent_bump_stamp
+
+            # Check if the current state has been updated since we gathered it.
+            # We're being careful not to insert/overwrite with stale data.
+            state_deltas_since_we_gathered_current_state = (
+                self.get_current_state_deltas_for_room_txn(
+                    txn,
+                    room_id,
+                    from_token=RoomStreamToken(
+                        stream=most_recent_current_state_delta_stream_id
+                    ),
+                    to_token=None,
+                )
+            )
+            for state_delta in state_deltas_since_we_gathered_current_state:
+                # We only need to check for the state is relevant to the
+                # `sliding_sync_joined_rooms` table.
+                if (
+                    state_delta.event_type,
+                    state_delta.state_key,
+                ) in SLIDING_SYNC_RELEVANT_STATE_SET:
+                    # Raising exception so we can just exit and try again. It would
+                    # be hard to resolve this within the transaction because we need
+                    # to get full events out that take redactions into account. We
+                    # could add some retry logic here, but it's easier to just let
+                    # the background update try again.
+                    raise Exception(
+                        "Current state was updated after we gathered it to update "
+                        + "`sliding_sync_joined_rooms` in the background update. "
+                        + "Raising exception so we can just try again."
+                    )
+
+            # Handle updating the `sliding_sync_joined_rooms` table
+            #
+            # Since we fully insert rows into `sliding_sync_joined_rooms`, we can
+            # just do everything on insert and `ON CONFLICT DO NOTHING`.
+            #
+            self.db_pool.simple_upsert_txn(
+                txn,
+                table="sliding_sync_joined_rooms",
+                keyvalues={"room_id": room_id},
+                values={},
+                insertion_values={
+                    **update_map,
+                    # The reason we're only *inserting* (not *updating*) `event_stream_ordering`
+                    # and `bump_stamp` is because if they are present, that means they are already
+                    # up-to-date.
+                    "event_stream_ordering": event_stream_ordering,
+                    "bump_stamp": bump_stamp,
+                },
+            )
+
+        async def fill_table_for_room(room_id: str) -> None:
+            await self.db_pool.runInteraction(
+                "sliding_sync_joined_rooms_bg_update", _fill_table_for_room_txn, room_id
+            )
+
+        # Since most of the time spent here is probably just latency to/from the
+        # database, let's just run this concurrently.
+        await concurrently_execute(fill_table_for_room, joined_room_updates.keys(), 10)
+
+        # Now that we've processed all of the rooms, we can remove them from the
+        # queue.
+        #
+        # Note: we need to remove all the rooms from the queue we pulled out
+        # from the DB, not just the ones we've processed above. Otherwise
+        # we'll simply keep pulling out the same rooms over and over again.
+        await self.db_pool.simple_delete_many(
+            table="sliding_sync_joined_rooms_to_recalculate",
+            column="room_id",
+            iterable=rooms_to_update,
+            keyvalues={},
+            desc="_sliding_sync_joined_rooms_bg_update: removing rooms that we processed",
+        )
 
         return len(rooms_to_update_rows)
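The heart of the change is `concurrently_execute` from `synapse.util.async_helpers`, which runs an async function over a collection of arguments while capping how many calls are in flight at once; the commit splits the single `_fill_table_txn` transaction into per-room `_fill_table_for_room_txn` transactions so that up to ten of them can run concurrently. Below is a minimal sketch of that bounded-concurrency pattern, written against plain asyncio rather than Synapse's Twisted-based helper; `concurrently_execute_sketch` and `handle` are illustrative names, not Synapse code.

import asyncio
from typing import Awaitable, Callable, Iterable, TypeVar

T = TypeVar("T")


async def concurrently_execute_sketch(
    func: Callable[[T], Awaitable[None]],
    args: Iterable[T],
    limit: int,
) -> None:
    # Sketch only: cap the number of in-flight `func` calls at `limit`.
    semaphore = asyncio.Semaphore(limit)

    async def run_one(arg: T) -> None:
        async with semaphore:
            await func(arg)

    # Schedule every item up front; the semaphore does the throttling.
    await asyncio.gather(*(run_one(arg) for arg in args))


async def main() -> None:
    async def handle(item: int) -> None:
        # Stand-in for a per-room database round trip.
        await asyncio.sleep(0.01)
        print("processed", item)

    # At most 10 `handle` calls run at once, mirroring
    # `concurrently_execute(fill_table_for_room, joined_room_updates.keys(), 10)`.
    await concurrently_execute_sketch(handle, range(25), 10)


if __name__ == "__main__":
    asyncio.run(main())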
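A note on the insert-only upsert: calling `simple_upsert_txn` with `values={}` and everything in `insertion_values` writes the full row only when no `sliding_sync_joined_rooms` row exists for that `room_id`, and leaves an existing row untouched, which per the diff's own comment amounts to doing everything on insert with `ON CONFLICT DO NOTHING`. That matches the in-code rationale: if `event_stream_ordering` and `bump_stamp` are already present, they are already up-to-date, so the background update must not overwrite them with potentially stale values.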