Look for previous snapshot

This commit is contained in:
Eric Eastwood 2024-09-06 01:48:39 -05:00
parent b4fcbc9686
commit ce7e1b4e67

View file

@ -2051,7 +2051,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
) )
return 0 return 0
def _find_previous_membership_txn( def _find_previous_invite_knock_membership_txn(
txn: LoggingTransaction, room_id: str, user_id: str, event_id: str txn: LoggingTransaction, room_id: str, user_id: str, event_id: str
) -> Tuple[str, str]: ) -> Tuple[str, str]:
# Find the previous invite/knock event before the leave event # Find the previous invite/knock event before the leave event
@ -2109,9 +2109,19 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
to_insert_membership_snapshots: Dict[ to_insert_membership_snapshots: Dict[
Tuple[str, str], SlidingSyncMembershipSnapshotSharedInsertValues Tuple[str, str], SlidingSyncMembershipSnapshotSharedInsertValues
] = {} ] = {}
# Map from (room_id, user_id) to ...
to_insert_membership_infos: Dict[ to_insert_membership_infos: Dict[
Tuple[str, str], SlidingSyncMembershipInfoWithEventPos Tuple[str, str], SlidingSyncMembershipInfoWithEventPos
] = {} ] = {}
# Map from room_id to ...
# Just some convenience maps for easier lookup by `room_id`.
last_to_insert_membership_snapshots_by_room_id: Dict[
str, SlidingSyncMembershipSnapshotSharedInsertValues
] = {}
last_to_insert_membership_infos_by_room_id: Dict[
str, SlidingSyncMembershipInfoWithEventPos
] = {}
for ( for (
room_id, room_id,
room_id_from_rooms_table, room_id_from_rooms_table,
@ -2153,9 +2163,56 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
}, },
) )
# Check if the current state has been updated since the last snapshot we inserted.
can_use_last_snapshot = False
# TODO: We could also look in the database to see if a snapshot is available
last_to_insert_membership_snapshot = (
last_to_insert_membership_snapshots_by_room_id.get(room_id)
)
last_to_insert_membership_info = (
last_to_insert_membership_infos_by_room_id.get(room_id)
)
if last_to_insert_membership_snapshot is not None:
can_use_last_snapshot = True
# These should go hand-in-hand
assert last_to_insert_membership_info is not None
state_deltas_since_last_snapshot = await self.get_current_state_deltas_for_room(
room_id,
# From the last snapshot we inserted
from_token=RoomStreamToken(
stream=last_to_insert_membership_info.membership_event_stream_ordering,
),
# To our current membership position
to_token=RoomStreamToken(
stream=membership_event_stream_ordering,
),
)
for state_delta in state_deltas_since_last_snapshot:
# We only need to check for the state is relevant to the
# `sliding_sync_joined_rooms` table.
if (
state_delta.event_type,
state_delta.state_key,
) in SLIDING_SYNC_RELEVANT_STATE_SET:
can_use_last_snapshot = False
# Once we find one relevant state event that changed, no need to
# look any further
break
# Map of values to insert/update in the `sliding_sync_membership_snapshots` table # Map of values to insert/update in the `sliding_sync_membership_snapshots` table
sliding_sync_membership_snapshots_insert_map: SlidingSyncMembershipSnapshotSharedInsertValues = {} sliding_sync_membership_snapshots_insert_map: SlidingSyncMembershipSnapshotSharedInsertValues = {}
if membership == Membership.JOIN: # If none of the relevant state has changed since the last snapshot, we can
# just re-use the last snapshot we inserted.
if can_use_last_snapshot:
# Based on the logic above, we should have a last snapshot to work from
assert last_to_insert_membership_snapshot is not None
sliding_sync_membership_snapshots_insert_map = (
last_to_insert_membership_snapshot
)
elif membership == Membership.JOIN:
# If we're still joined, we can pull from current state. # If we're still joined, we can pull from current state.
current_state_ids_map: StateMap[ current_state_ids_map: StateMap[
str str
@ -2240,8 +2297,8 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
invite_or_knock_event_id, invite_or_knock_event_id,
invite_or_knock_membership, invite_or_knock_membership,
) = await self.db_pool.runInteraction( ) = await self.db_pool.runInteraction(
"sliding_sync_membership_snapshots_bg_update._find_previous_membership", "sliding_sync_membership_snapshots_bg_update._find_previous_invite_knock_membership_txn",
_find_previous_membership_txn, _find_previous_invite_knock_membership_txn,
room_id, room_id,
user_id, user_id,
membership_event_id, membership_event_id,
@ -2327,10 +2384,14 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
f"Unexpected membership {membership} ({membership_event_id}) that we don't know how to handle yet" f"Unexpected membership {membership} ({membership_event_id}) that we don't know how to handle yet"
) )
to_insert_membership_snapshots[(room_id, user_id)] = ( to_insert_membership_snapshots[
(room_id, user_id)
] = last_to_insert_membership_snapshots_by_room_id[room_id] = (
sliding_sync_membership_snapshots_insert_map sliding_sync_membership_snapshots_insert_map
) )
to_insert_membership_infos[(room_id, user_id)] = ( to_insert_membership_infos[
(room_id, user_id)
] = last_to_insert_membership_infos_by_room_id[room_id] = (
SlidingSyncMembershipInfoWithEventPos( SlidingSyncMembershipInfoWithEventPos(
user_id=user_id, user_id=user_id,
sender=sender, sender=sender,
@ -2348,7 +2409,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
# #
for key, insert_map in to_insert_membership_snapshots.items(): for key, insert_map in to_insert_membership_snapshots.items():
room_id, user_id = key room_id, user_id = key
membership_info = to_insert_membership_infos[key] membership_info = to_insert_membership_infos[(room_id, user_id)]
sender = membership_info.sender sender = membership_info.sender
membership_event_id = membership_info.membership_event_id membership_event_id = membership_info.membership_event_id
membership = membership_info.membership membership = membership_info.membership