Use simple_upsert_txn for sliding_sync_joined_rooms in background update

This commit is contained in:
Eric Eastwood 2024-08-22 23:30:00 -05:00
parent f8926d07df
commit b6a7d2bf6c
2 changed files with 17 additions and 27 deletions

View file

@ -1730,12 +1730,11 @@ class PersistEventsStore:
keyvalues={"room_id": room_id},
values=sliding_sync_table_changes.joined_room_updates,
insertion_values={
# The reason we're only *inserting* `event_stream_ordering` here
# is because the column has a `NON NULL` constraint and we need
# *some* answer. If the row already exists, we are trying to
# avoid doing an `UPDATE` and accidentally overwriting the value
# with some stale data since this is just a "best effort" value.
# It's better to just rely on
# The reason we're only *inserting* (not *updating*)
# `event_stream_ordering` here is because the column has a `NON
# NULL` constraint and we need *some* answer. And if the row
# already exists, it already has the correct value and it's
# better to just rely on
# `_update_sliding_sync_tables_with_new_persisted_events_txn()`
# to do the right thing (same for `bump_stamp`).
"event_stream_ordering": sliding_sync_table_changes.joined_room_best_effort_most_recent_stream_ordering

View file

@ -1691,33 +1691,24 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
+ "Raising exception so we can just try again."
)
# Pulling keys/values separately is safe and will produce congruent
# lists
insert_keys = insert_map.keys()
insert_values = insert_map.values()
# Since we partially update the `sliding_sync_joined_rooms` as new state
# is sent, we need to update the state fields `ON CONFLICT`. We just
# have to be careful we're not overwriting it with stale data (see
# `last_current_state_delta_stream_id` check above).
#
# We don't need to update `event_stream_ordering` and `bump_stamp` `ON
# CONFLICT` because if they are present, that means they are already
# up-to-date.
sql = f"""
INSERT INTO sliding_sync_joined_rooms
(room_id, event_stream_ordering, bump_stamp, {", ".join(insert_keys)})
VALUES (
?, ?, ?,
{", ".join("?" for _ in insert_values)}
)
ON CONFLICT (room_id)
DO UPDATE SET
{", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)}
"""
args = [room_id, event_stream_ordering, bump_stamp] + list(
insert_values
self.db_pool.simple_upsert_txn(
txn,
table="sliding_sync_joined_rooms",
keyvalues={"room_id": room_id},
values=insert_map,
insertion_values={
# The reason we're only *inserting* (not *updating*) `event_stream_ordering`
# and `bump_stamp` is because if they are present, that means they are already
# up-to-date.
"event_stream_ordering": event_stream_ordering,
"bump_stamp": bump_stamp,
},
)
txn.execute(sql, args)
# Keep track of the last successful room_id
last_successful_room_id = room_id