No need to update event_stream_ordering/bump_stamp ON CONFLICT

This commit is contained in:
Eric Eastwood 2024-08-21 14:46:50 -05:00
parent d3f90e4bd8
commit cda92af4a6

View file

@ -1664,7 +1664,11 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
_BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BACKFILL, _BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BACKFILL,
{"last_room_id": room_id}, {"last_room_id": room_id},
) )
# Raising exception so we can just exit and try again # Raising exception so we can just exit and try again. It would
# be hard to resolve this within the transaction because we need
# to get full events out that take redactions into account. We
# could add some retry logic here, but it's easier to just let
# the background update try again.
raise Exception( raise Exception(
"Current state was updated after we gathered it to update " "Current state was updated after we gathered it to update "
+ "`sliding_sync_joined_rooms` in the background update. " + "`sliding_sync_joined_rooms` in the background update. "
@ -1676,8 +1680,13 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
insert_keys = insert_map.keys() insert_keys = insert_map.keys()
insert_values = insert_map.values() insert_values = insert_map.values()
# Since we partially update the `sliding_sync_joined_rooms` as new state # Since we partially update the `sliding_sync_joined_rooms` as new state
# is sent, we need to update the fields `ON CONFLICT`. We just have to be careful # is sent, we need to update the state fields `ON CONFLICT`. We just
# we're not overwriting it with stale data. # have to be careful we're not overwriting it with stale data (see
# `last_current_state_delta_stream_id` check above).
#
# We don't need to update `event_stream_ordering` and `bump_stamp` `ON
# CONFLICT` because if they are present, that means they are already
# up-to-date.
sql = f""" sql = f"""
INSERT INTO sliding_sync_joined_rooms INSERT INTO sliding_sync_joined_rooms
(room_id, event_stream_ordering, bump_stamp, {", ".join(insert_keys)}) (room_id, event_stream_ordering, bump_stamp, {", ".join(insert_keys)})
@ -1687,16 +1696,6 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
) )
ON CONFLICT (room_id) ON CONFLICT (room_id)
DO UPDATE SET DO UPDATE SET
event_stream_ordering = CASE
WHEN event_stream_ordering IS NULL OR event_stream_ordering < EXCLUDED.event_stream_ordering
THEN EXCLUDED.event_stream_ordering
ELSE event_stream_ordering
END,
bump_stamp = CASE
WHEN bump_stamp IS NULL OR bump_stamp < EXCLUDED.bump_stamp
THEN EXCLUDED.bump_stamp
ELSE bump_stamp
END,
{", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)} {", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)}
""" """
args = [room_id, event_stream_ordering, bump_stamp] + list( args = [room_id, event_stream_ordering, bump_stamp] + list(