Work on snapshots background update

Eric Eastwood 2024-09-06 02:34:07 -05:00
parent ce7e1b4e67
commit 27caa47c50
2 changed files with 101 additions and 54 deletions


@@ -137,7 +137,7 @@ class SlidingSyncStateInsertValues(TypedDict, total=False):
     """
     room_type: Optional[str]
-    is_encrypted: Optional[bool]
+    is_encrypted: bool
     room_name: Optional[str]
     tombstone_successor_room_id: Optional[str]
@@ -150,7 +150,7 @@ class SlidingSyncMembershipSnapshotSharedInsertValues(
     multiple memberships
     """
-    has_known_state: Optional[bool]
+    has_known_state: bool
 @attr.s(slots=True, auto_attribs=True)
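Both type changes tighten the same subtlety: in a `TypedDict` declared with `total=False`, every key is already allowed to be absent, so `Optional[...]` was conflating "key missing" with "explicitly set to None". A minimal illustrative sketch (the class name here is hypothetical, not from the commit):

    from typing import Optional, TypedDict

    class ExampleValues(TypedDict, total=False):
        # The key may be omitted entirely, but when present it must be a real bool
        is_encrypted: bool
        # The key may be omitted OR present-but-None -- a strictly looser contract
        room_name: Optional[str]

    ok_empty: ExampleValues = {}                      # allowed by total=False
    ok_value: ExampleValues = {"is_encrypted": True}  # concrete bool required
    # rejected by a type checker: {"is_encrypted": None}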


@@ -1881,10 +1881,9 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseStore):
                 keyvalues={"room_id": room_id},
                 values={},
                 insertion_values={
-                    # The reason we're only *inserting* (not *updating*) is because if
-                    # they are present, that means they are already up-to-date.
                     **update_map,
+                    # The reason we're only *inserting* (not *updating*) `event_stream_ordering`
+                    # and `bump_stamp` is because if they are present, that means they are already
+                    # up-to-date.
                     "event_stream_ordering": event_stream_ordering,
                     "bump_stamp": bump_stamp,
                 },
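The insert-only behaviour leaned on here comes from `simple_upsert_txn` being called with empty `values`: an existing row is left untouched, and `insertion_values` only apply when the row is missing. Roughly, as a plain-Python sketch (not the real implementation):

    # Models the table as a dict keyed by room_id, purely for illustration.
    def insert_only_upsert(table: dict, room_id: str, insertion_values: dict) -> None:
        # If the row already exists it is already up-to-date, so do nothing;
        # otherwise insert it with e.g. event_stream_ordering and bump_stamp.
        table.setdefault(room_id, dict(insertion_values))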
@@ -2122,6 +2121,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseStore):
         last_to_insert_membership_infos_by_room_id: Dict[
             str, SlidingSyncMembershipInfoWithEventPos
         ] = {}
+        # TODO: `concurrently_execute` based on buckets of room_ids
         for (
             room_id,
             room_id_from_rooms_table,
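For the TODO above, Synapse already provides `concurrently_execute` in `synapse.util.async_helpers`, which runs an async function over an iterable with bounded concurrency. One possible shape, as a sketch only (`_handle_room_bucket` is a hypothetical helper, and the bucket and concurrency sizes are arbitrary):

    from typing import List

    from synapse.util.async_helpers import concurrently_execute

    async def _snapshot_all_rooms(self, room_ids: List[str]) -> None:
        # Split the rooms into buckets and process up to 10 buckets at a time,
        # instead of iterating over every room strictly one at a time.
        buckets = [room_ids[i : i + 100] for i in range(0, len(room_ids), 100)]
        await concurrently_execute(self._handle_room_bucket, buckets, 10)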
@@ -2226,6 +2226,9 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseStore):
                 # we only want some non-membership state
                 await_full_state=False,
             )
+            # TODO: Read the state from the `room_stats_state` table if we can
             # We're iterating over rooms that we are joined to so they should
             # have `current_state_events` and we should have some current state
             # for each room
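The `room_stats_state` TODO points at the existing per-room stats table, which already denormalizes columns such as `room_type`, `name`, and `encryption`. A hedged sketch of that shortcut; whether the stats row is guaranteed fresh enough for this background update is exactly what the TODO leaves open:

    # Sketch: try the denormalized stats row before falling back to resolving
    # the room's current state as above. Column names follow the existing
    # room_stats_state schema.
    stats_row = await self.db_pool.simple_select_one(
        table="room_stats_state",
        keyvalues={"room_id": room_id},
        retcols=("room_type", "name", "encryption"),
        allow_none=True,
    )
    if stats_row is None:
        ...  # no stats yet for this room; use the state-resolution path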
@@ -2404,9 +2407,36 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseStore):
                 )
             )

-        def _fill_table_txn(txn: LoggingTransaction) -> None:
-            # Handle updating the `sliding_sync_membership_snapshots` table
-            #
+        # Assemble data so it's ready for the batch queries in the transaction
+        key_names = ("room_id", "user_id")
+        key_values: List[Tuple[str, str]] = []
+        insertion_value_names = (
+            "has_known_state",
+            "room_type",
+            "is_encrypted",
+            "room_name",
+            "tombstone_successor_room_id",
+            "sender",
+            "membership_event_id",
+            "membership",
+            "event_stream_ordering",
+            "event_instance_name",
+        )
+        insertion_value_values: List[
+            Tuple[
+                bool,
+                Optional[str],
+                bool,
+                Optional[str],
+                Optional[str],
+                str,
+                str,
+                str,
+                int,
+                str,
+            ]
+        ] = []
+        forgotten_update_query_args: List[Tuple[str, str, str]] = []
+
         for key, insert_map in to_insert_membership_snapshots.items():
             room_id, user_id = key
             membership_info = to_insert_membership_infos[(room_id, user_id)]
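Because `simple_upsert_many_txn` pairs column names and row values purely by position, each assembled tuple has to line up with `insertion_value_names` exactly, in both length and order. An illustrative sanity check (not part of the commit):

    # Every row tuple must supply one entry per column name, in the same
    # order as insertion_value_names above.
    for row in insertion_value_values:
        assert len(row) == len(insertion_value_names)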
@@ -2420,42 +2450,59 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseStore):
                 membership_info.membership_event_instance_name
             )

-            # We don't need to upsert the state because we never partially
-            # insert/update the snapshots and anything already there is up-to-date
-            # EXCEPT for the `forgotten` field since that is updated out-of-band
-            # from the membership changes.
-            # Even though we're only doing insertions, we're using
-            # `simple_upsert_txn()` here to avoid unique violation errors that would
-            # happen from `simple_insert_txn()`
-            self.db_pool.simple_upsert_txn(
+            key_values.append((room_id, user_id))
+            insertion_value_values.append(
+                (
+                    # `has_known_state` should be set to *some* True/False value
+                    insert_map["has_known_state"],
+                    insert_map.get("room_type"),
+                    insert_map.get("is_encrypted", False),
+                    insert_map.get("room_name"),
+                    insert_map.get("tombstone_successor_room_id"),
+                    sender,
+                    membership_event_id,
+                    membership,
+                    membership_event_stream_ordering,
+                    membership_event_instance_name,
+                )
+            )
+            forgotten_update_query_args.append(
+                (
+                    membership_event_id,
+                    room_id,
+                    user_id,
+                )
+            )
+
+        def _fill_table_txn(txn: LoggingTransaction) -> None:
+            # Handle updating the `sliding_sync_membership_snapshots` table
+            #
+            # We don't need to update in the upsert the state because we never partially
+            # insert/update the snapshots and anything already there is up-to-date
+            # EXCEPT for the `forgotten` field since that is updated out-of-band from
+            # the membership changes. We're using an upsert to avoid unique
+            # violation errors that would happen from directly inserting.
+            self.db_pool.simple_upsert_many_txn(
                 txn,
                 table="sliding_sync_membership_snapshots",
-                keyvalues={"room_id": room_id, "user_id": user_id},
-                values={},
-                insertion_values={
-                    **insert_map,
-                    "sender": sender,
-                    "membership_event_id": membership_event_id,
-                    "membership": membership,
-                    "event_stream_ordering": membership_event_stream_ordering,
-                    "event_instance_name": membership_event_instance_name,
-                },
+                key_names=key_names,
+                key_values=key_values,
+                # TODO: Implement these
+                insertion_value_names=insertion_value_names,
+                insertion_value_values=insertion_value_values,
             )
             # We need to find the `forgotten` value during the transaction because
             # we can't risk inserting stale data.
-            txn.execute(
+            txn.execute_batch(
                 """
                 UPDATE sliding_sync_membership_snapshots
                 SET
                     forgotten = (SELECT forgotten FROM room_memberships WHERE event_id = ?)
                 WHERE room_id = ? and user_id = ?
                 """,
-                (
-                    membership_event_id,
-                    room_id,
-                    user_id,
-                ),
+                forgotten_update_query_args,
             )

         await self.db_pool.runInteraction(
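The switch from a per-row `txn.execute` to `txn.execute_batch` keeps the semantics of running the same UPDATE once per parameter tuple while batching the round trips (psycopg2's `execute_batch` on PostgreSQL; effectively a loop on SQLite). Behaviourally it is equivalent to this sketch:

    # Equivalent, slower formulation of the batched forgotten-flag update:
    for membership_event_id, room_id, user_id in forgotten_update_query_args:
        txn.execute(
            """
            UPDATE sliding_sync_membership_snapshots
            SET
                forgotten = (SELECT forgotten FROM room_memberships WHERE event_id = ?)
            WHERE room_id = ? and user_id = ?
            """,
            (membership_event_id, room_id, user_id),
        )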