diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index bc2cb6b430..940d5570ad 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -1827,88 +1827,90 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS ) ) - await concurrently_execute( - handle_room, [row[0] for row in rooms_to_update_rows], 10 - ) + rooms_to_update = [row[0] for row in rooms_to_update_rows] + await concurrently_execute(handle_room, rooms_to_update, 10) + + def _fill_table_for_room_txn(txn: LoggingTransaction, room_id: str) -> None: + update_map = joined_room_updates[room_id] + joined_room_stream_ordering_update = joined_room_stream_ordering_updates[ + room_id + ] + event_stream_ordering = ( + joined_room_stream_ordering_update.most_recent_event_stream_ordering + ) + bump_stamp = joined_room_stream_ordering_update.most_recent_bump_stamp + + # Check if the current state has been updated since we gathered it. + # We're being careful not to insert/overwrite with stale data. + state_deltas_since_we_gathered_current_state = ( + self.get_current_state_deltas_for_room_txn( + txn, + room_id, + from_token=RoomStreamToken( + stream=most_recent_current_state_delta_stream_id + ), + to_token=None, + ) + ) + for state_delta in state_deltas_since_we_gathered_current_state: + # We only need to check for the state is relevant to the + # `sliding_sync_joined_rooms` table. + if ( + state_delta.event_type, + state_delta.state_key, + ) in SLIDING_SYNC_RELEVANT_STATE_SET: + # Raising exception so we can just exit and try again. It would + # be hard to resolve this within the transaction because we need + # to get full events out that take redactions into account. We + # could add some retry logic here, but it's easier to just let + # the background update try again. + raise Exception( + "Current state was updated after we gathered it to update " + + "`sliding_sync_joined_rooms` in the background update. " + + "Raising exception so we can just try again." + ) - def _fill_table_txn(txn: LoggingTransaction) -> None: # Handle updating the `sliding_sync_joined_rooms` table # - for ( - room_id, - update_map, - ) in joined_room_updates.items(): - joined_room_stream_ordering_update = ( - joined_room_stream_ordering_updates[room_id] - ) - event_stream_ordering = ( - joined_room_stream_ordering_update.most_recent_event_stream_ordering - ) - bump_stamp = joined_room_stream_ordering_update.most_recent_bump_stamp - - # Check if the current state has been updated since we gathered it. - # We're being careful not to insert/overwrite with stale data. - state_deltas_since_we_gathered_current_state = ( - self.get_current_state_deltas_for_room_txn( - txn, - room_id, - from_token=RoomStreamToken( - stream=most_recent_current_state_delta_stream_id - ), - to_token=None, - ) - ) - for state_delta in state_deltas_since_we_gathered_current_state: - # We only need to check for the state is relevant to the - # `sliding_sync_joined_rooms` table. - if ( - state_delta.event_type, - state_delta.state_key, - ) in SLIDING_SYNC_RELEVANT_STATE_SET: - # Raising exception so we can just exit and try again. It would - # be hard to resolve this within the transaction because we need - # to get full events out that take redactions into account. We - # could add some retry logic here, but it's easier to just let - # the background update try again. - raise Exception( - "Current state was updated after we gathered it to update " - + "`sliding_sync_joined_rooms` in the background update. " - + "Raising exception so we can just try again." - ) - - # Since we fully insert rows into `sliding_sync_joined_rooms`, we can - # just do everything on insert and `ON CONFLICT DO NOTHING`. - # - self.db_pool.simple_upsert_txn( - txn, - table="sliding_sync_joined_rooms", - keyvalues={"room_id": room_id}, - values={}, - insertion_values={ - **update_map, - # The reason we're only *inserting* (not *updating*) `event_stream_ordering` - # and `bump_stamp` is because if they are present, that means they are already - # up-to-date. - "event_stream_ordering": event_stream_ordering, - "bump_stamp": bump_stamp, - }, - ) - - # Now that we've processed all the room, we can remove them from the - # queue. + # Since we fully insert rows into `sliding_sync_joined_rooms`, we can + # just do everything on insert and `ON CONFLICT DO NOTHING`. # - # Note: we need to remove all the rooms from the queue we pulled out - # from the DB, not just the ones we've processed above. Otherwise - # we'll simply keep pulling out the same rooms over and over again. - self.db_pool.simple_delete_many_batch_txn( + self.db_pool.simple_upsert_txn( txn, - table="sliding_sync_joined_rooms_to_recalculate", - keys=("room_id",), - values=rooms_to_update_rows, + table="sliding_sync_joined_rooms", + keyvalues={"room_id": room_id}, + values={}, + insertion_values={ + **update_map, + # The reason we're only *inserting* (not *updating*) `event_stream_ordering` + # and `bump_stamp` is because if they are present, that means they are already + # up-to-date. + "event_stream_ordering": event_stream_ordering, + "bump_stamp": bump_stamp, + }, ) - await self.db_pool.runInteraction( - "sliding_sync_joined_rooms_bg_update", _fill_table_txn + async def fill_table_for_room(room_id: str) -> None: + await self.db_pool.runInteraction( + "sliding_sync_joined_rooms_bg_update", _fill_table_for_room_txn, room_id + ) + + # Since most of the time spent here is probably just latency to/from the + # database, let's just run this concurrently. + await concurrently_execute(fill_table_for_room, joined_room_updates.keys(), 10) + + # Now that we've processed all of the rooms, we can remove them from the + # queue. + # + # Note: we need to remove all the rooms from the queue we pulled out + # from the DB, not just the ones we've processed above. Otherwise + # we'll simply keep pulling out the same rooms over and over again. + await self.db_pool.simple_delete_many( + table="sliding_sync_joined_rooms_to_recalculate", + column="room_id", + iterable=rooms_to_update, + keyvalues={}, + desc="_sliding_sync_joined_rooms_bg_update: removing rooms that we processed", ) return len(rooms_to_update_rows)