Move _calculate_sliding_sync_table_changes(...) after we assign stream_ordering to events

See https://github.com/element-hq/synapse/pull/17512#discussion_r1725728637
This commit is contained in:
Eric Eastwood 2024-08-21 16:10:14 -05:00
parent 772c501bb6
commit f6d7ffd9c5
3 changed files with 237 additions and 262 deletions

View file

@ -38,7 +38,6 @@ from typing import (
Iterable,
List,
Optional,
Sequence,
Set,
Tuple,
TypeVar,
@ -65,18 +64,9 @@ from synapse.logging.opentracing import (
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.controllers.state import StateStorageController
from synapse.storage.databases import Databases
from synapse.storage.databases.main.events import (
SLIDING_SYNC_RELEVANT_STATE_SET,
DeltaState,
PersistEventsStore,
SlidingSyncMembershipInfo,
SlidingSyncMembershipSnapshotSharedInsertValues,
SlidingSyncStateInsertValues,
SlidingSyncTableChanges,
)
from synapse.storage.databases.main.events import DeltaState
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.types import (
MutableStateMap,
PersistedEventPosition,
RoomStreamToken,
StateMap,
@ -512,8 +502,10 @@ class EventsPersistenceStorageController:
"""
state = await self._calculate_current_state(room_id)
delta = await self._calculate_state_delta(room_id, state)
sliding_sync_table_changes = await self._calculate_sliding_sync_table_changes(
room_id, [], delta
sliding_sync_table_changes = (
await self.persist_events_store._calculate_sliding_sync_table_changes(
room_id, [], delta
)
)
await self.persist_events_store.update_current_state(
@ -619,7 +611,6 @@ class EventsPersistenceStorageController:
new_forward_extremities = None
state_delta_for_room = None
sliding_sync_table_changes = None
if not backfilled:
with Measure(self._clock, "_calculate_state_and_extrem"):
@ -633,14 +624,6 @@ class EventsPersistenceStorageController:
room_id, chunk
)
if state_delta_for_room is not None:
with Measure(self._clock, "_calculate_sliding_sync_table_changes"):
sliding_sync_table_changes = (
await self._calculate_sliding_sync_table_changes(
room_id, chunk, state_delta_for_room
)
)
with Measure(self._clock, "calculate_chain_cover_index_for_events"):
# We now calculate chain ID/sequence numbers for any state events we're
# persisting. We ignore out of band memberships as we're not in the room
@ -660,7 +643,6 @@ class EventsPersistenceStorageController:
use_negative_stream_ordering=backfilled,
inhibit_local_membership_updates=backfilled,
new_event_links=new_event_links,
sliding_sync_table_changes=sliding_sync_table_changes,
)
return replaced_events
@ -776,218 +758,6 @@ class EventsPersistenceStorageController:
return (new_forward_extremities, delta)
async def _calculate_sliding_sync_table_changes(
self,
room_id: str,
events_and_contexts: Sequence[Tuple[EventBase, EventContext]],
delta_state: DeltaState,
) -> SlidingSyncTableChanges:
"""
Calculate the changes to the `sliding_sync_membership_snapshots` and
`sliding_sync_joined_rooms` tables given the deltas that are going to be used to
update the `current_state_events` table.
Just a bunch of pre-processing so we so we don't need to spend time in the
transaction itself gathering all of this info. It's also easier to deal with
redactions outside of a transaction.
Args:
room_id: The room ID currently being processed.
events_and_contexts: List of tuples of (event, context) being persisted.
This is completely optional (you can pass an empty list) and will just
save us from fetching the events from the database if we already have
them.
delta_state: Deltas that are going to be used to update the
`current_state_events` table.
"""
to_insert = delta_state.to_insert
to_delete = delta_state.to_delete
event_map = {event.event_id: event for event, _ in events_and_contexts}
# Handle gathering info for the `sliding_sync_membership_snapshots` table
#
# This would only happen if someone was state reset out of the room
user_ids_to_delete_membership_snapshots = [
state_key
for event_type, state_key in to_delete
if event_type == EventTypes.Member and self.is_mine_id(state_key)
]
membership_snapshot_shared_insert_values: (
SlidingSyncMembershipSnapshotSharedInsertValues
) = {}
membership_infos_to_insert_membership_snapshots: List[
SlidingSyncMembershipInfo
] = []
if to_insert:
membership_event_id_to_user_id_map: Dict[str, str] = {}
for state_key, event_id in to_insert.items():
if state_key[0] == EventTypes.Member and self.is_mine_id(state_key[1]):
membership_event_id_to_user_id_map[event_id] = state_key[1]
event_id_to_sender_map: Dict[str, str] = {}
# In normal event persist scenarios, we should be able to find the
# membership events in the `events_and_contexts` given to us but it's
# possible a state reset happened which added us to the room without a
# corresponding new membership event (reset back to a previous membership).
missing_membership_event_ids: Set[str] = set()
for membership_event_id in membership_event_id_to_user_id_map.keys():
membership_event = event_map.get(membership_event_id)
if membership_event:
event_id_to_sender_map[membership_event_id] = (
membership_event.sender
)
else:
missing_membership_event_ids.add(membership_event_id)
# Otherwise, we need to find a couple events that we were reset to.
if missing_membership_event_ids:
remaining_event_id_to_sender_map = (
await self.main_store.get_sender_for_event_ids(
missing_membership_event_ids
)
)
# There shouldn't be any missing events
assert (
remaining_event_id_to_sender_map.keys()
== missing_membership_event_ids
), missing_membership_event_ids.difference(
remaining_event_id_to_sender_map.keys()
)
event_id_to_sender_map.update(remaining_event_id_to_sender_map)
membership_infos_to_insert_membership_snapshots = [
SlidingSyncMembershipInfo(
user_id=user_id,
sender=event_id_to_sender_map[membership_event_id],
membership_event_id=membership_event_id,
)
for membership_event_id, user_id in membership_event_id_to_user_id_map.items()
]
if membership_infos_to_insert_membership_snapshots:
current_state_ids_map: MutableStateMap[str] = dict(
await self.main_store.get_partial_filtered_current_state_ids(
room_id,
state_filter=StateFilter.from_types(
SLIDING_SYNC_RELEVANT_STATE_SET
),
)
)
# Since we fetched the current state before we took `to_insert`/`to_delete`
# into account, we need to do a couple fixups.
#
# Update the current_state_map with what we have `to_delete`
for state_key in to_delete:
current_state_ids_map.pop(state_key, None)
# Update the current_state_map with what we have `to_insert`
for state_key, event_id in to_insert.items():
if state_key in SLIDING_SYNC_RELEVANT_STATE_SET:
current_state_ids_map[state_key] = event_id
fetched_events = await self.main_store.get_events(
current_state_ids_map.values()
)
current_state_map: StateMap[EventBase] = {
state_key: fetched_events[event_id]
for state_key, event_id in current_state_ids_map.items()
}
if current_state_map:
state_insert_values = PersistEventsStore._get_sliding_sync_insert_values_from_state_map(
current_state_map
)
membership_snapshot_shared_insert_values.update(state_insert_values)
# We have current state to work from
membership_snapshot_shared_insert_values["has_known_state"] = True
else:
# We don't have any `current_state_events` anymore (previously
# cleared out because of `no_longer_in_room`). This can happen if
# one user is joined and another is invited (some non-join
# membership). If the joined user leaves, we are `no_longer_in_room`
# and `current_state_events` is cleared out. When the invited user
# rejects the invite (leaves the room), we will end up here.
#
# In these cases, we should inherit the meta data from the previous
# snapshot so we shouldn't update any of the state values. When
# using sliding sync filters, this will prevent the room from
# disappearing/appearing just because you left the room.
#
# Ideally, we could additionally assert that we're only here for
# valid non-join membership transitions.
assert delta_state.no_longer_in_room
# Handle gathering info for the `sliding_sync_joined_rooms` table
#
# We only deal with
# updating the state related columns. The
# `event_stream_ordering`/`bump_stamp` are updated elsewhere in the event
# persisting stack (see
# `_update_sliding_sync_tables_with_new_persisted_events_txn()`)
#
joined_room_updates: SlidingSyncStateInsertValues = {}
if not delta_state.no_longer_in_room:
# Look through the items we're going to insert into the current state to see
# if there is anything that we care about and should also update in the
# `sliding_sync_joined_rooms` table.
current_state_ids_map = {}
for state_key, event_id in to_insert.items():
if state_key in SLIDING_SYNC_RELEVANT_STATE_SET:
current_state_ids_map[state_key] = event_id
# Get the full event objects for the current state events
#
# In normal event persist scenarios, we should be able to find the state
# events in the `events_and_contexts` given to us but it's possible a state
# reset happened which that reset back to a previous state.
current_state_map = {}
missing_event_ids: Set[str] = set()
for state_key, event_id in current_state_ids_map.items():
event = event_map.get(event_id)
if event:
current_state_map[state_key] = event
else:
missing_event_ids.add(event_id)
# Otherwise, we need to find a couple events that we were reset to.
if missing_event_ids:
remaining_events = await self.main_store.get_events(
current_state_ids_map.values()
)
# There shouldn't be any missing events
assert (
remaining_events.keys() == missing_event_ids
), missing_event_ids.difference(remaining_events.keys())
for event in remaining_events.values():
current_state_map[(event.type, event.state_key)] = event
joined_room_updates = (
PersistEventsStore._get_sliding_sync_insert_values_from_state_map(
current_state_map
)
)
# If something is being deleted from the state, we need to clear it out
for state_key in to_delete:
if state_key == (EventTypes.Create, ""):
joined_room_updates["room_type"] = None
elif state_key == (EventTypes.RoomEncryption, ""):
joined_room_updates["is_encrypted"] = False
elif state_key == (EventTypes.Name, ""):
joined_room_updates["room_name"] = None
return SlidingSyncTableChanges(
room_id=room_id,
# For `sliding_sync_joined_rooms`
joined_room_updates=joined_room_updates,
# For `sliding_sync_membership_snapshots`
membership_snapshot_shared_insert_values=membership_snapshot_shared_insert_values,
to_insert_membership_snapshots=membership_infos_to_insert_membership_snapshots,
to_delete_membership_snapshots=user_ids_to_delete_membership_snapshots,
)
async def _calculate_new_extremities(
self,
room_id: str,

View file

@ -32,6 +32,7 @@ from typing import (
Iterable,
List,
Optional,
Sequence,
Set,
Tuple,
cast,
@ -75,6 +76,7 @@ from synapse.types import (
get_domain_from_id,
)
from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES
from synapse.types.state import StateFilter
from synapse.util import json_encoder
from synapse.util.iterutils import batch_iter, sorted_topologically
from synapse.util.stringutils import non_null_str_or_none
@ -245,7 +247,6 @@ class PersistEventsStore:
new_event_links: Dict[str, NewEventChainLinks],
use_negative_stream_ordering: bool = False,
inhibit_local_membership_updates: bool = False,
sliding_sync_table_changes: Optional[SlidingSyncTableChanges],
) -> None:
"""Persist a set of events alongside updates to the current state and
forward extremities tables.
@ -306,6 +307,14 @@ class PersistEventsStore:
event.internal_metadata.stream_ordering = stream
event.internal_metadata.instance_name = self._instance_name
sliding_sync_table_changes = None
if state_delta_for_room is not None:
sliding_sync_table_changes = (
await self._calculate_sliding_sync_table_changes(
room_id, events_and_contexts, state_delta_for_room
)
)
await self.db_pool.runInteraction(
"persist_events",
self._persist_events_txn,
@ -342,6 +351,218 @@ class PersistEventsStore:
(room_id,), frozenset(new_forward_extremities)
)
async def _calculate_sliding_sync_table_changes(
self,
room_id: str,
events_and_contexts: Sequence[Tuple[EventBase, EventContext]],
delta_state: DeltaState,
) -> SlidingSyncTableChanges:
"""
Calculate the changes to the `sliding_sync_membership_snapshots` and
`sliding_sync_joined_rooms` tables given the deltas that are going to be used to
update the `current_state_events` table.
Just a bunch of pre-processing so we so we don't need to spend time in the
transaction itself gathering all of this info. It's also easier to deal with
redactions outside of a transaction.
Args:
room_id: The room ID currently being processed.
events_and_contexts: List of tuples of (event, context) being persisted.
This is completely optional (you can pass an empty list) and will just
save us from fetching the events from the database if we already have
them.
delta_state: Deltas that are going to be used to update the
`current_state_events` table.
"""
to_insert = delta_state.to_insert
to_delete = delta_state.to_delete
event_map = {event.event_id: event for event, _ in events_and_contexts}
# Handle gathering info for the `sliding_sync_membership_snapshots` table
#
# This would only happen if someone was state reset out of the room
user_ids_to_delete_membership_snapshots = [
state_key
for event_type, state_key in to_delete
if event_type == EventTypes.Member and self.is_mine_id(state_key)
]
membership_snapshot_shared_insert_values: (
SlidingSyncMembershipSnapshotSharedInsertValues
) = {}
membership_infos_to_insert_membership_snapshots: List[
SlidingSyncMembershipInfo
] = []
if to_insert:
membership_event_id_to_user_id_map: Dict[str, str] = {}
for state_key, event_id in to_insert.items():
if state_key[0] == EventTypes.Member and self.is_mine_id(state_key[1]):
membership_event_id_to_user_id_map[event_id] = state_key[1]
event_id_to_sender_map: Dict[str, str] = {}
# In normal event persist scenarios, we should be able to find the
# membership events in the `events_and_contexts` given to us but it's
# possible a state reset happened which added us to the room without a
# corresponding new membership event (reset back to a previous membership).
missing_membership_event_ids: Set[str] = set()
for membership_event_id in membership_event_id_to_user_id_map.keys():
membership_event = event_map.get(membership_event_id)
if membership_event:
event_id_to_sender_map[membership_event_id] = (
membership_event.sender
)
else:
missing_membership_event_ids.add(membership_event_id)
# Otherwise, we need to find a couple events that we were reset to.
if missing_membership_event_ids:
remaining_event_id_to_sender_map = (
await self.store.get_sender_for_event_ids(
missing_membership_event_ids
)
)
# There shouldn't be any missing events
assert (
remaining_event_id_to_sender_map.keys()
== missing_membership_event_ids
), missing_membership_event_ids.difference(
remaining_event_id_to_sender_map.keys()
)
event_id_to_sender_map.update(remaining_event_id_to_sender_map)
membership_infos_to_insert_membership_snapshots = [
SlidingSyncMembershipInfo(
user_id=user_id,
sender=event_id_to_sender_map[membership_event_id],
membership_event_id=membership_event_id,
)
for membership_event_id, user_id in membership_event_id_to_user_id_map.items()
]
if membership_infos_to_insert_membership_snapshots:
current_state_ids_map: MutableStateMap[str] = dict(
await self.store.get_partial_filtered_current_state_ids(
room_id,
state_filter=StateFilter.from_types(
SLIDING_SYNC_RELEVANT_STATE_SET
),
)
)
# Since we fetched the current state before we took `to_insert`/`to_delete`
# into account, we need to do a couple fixups.
#
# Update the current_state_map with what we have `to_delete`
for state_key in to_delete:
current_state_ids_map.pop(state_key, None)
# Update the current_state_map with what we have `to_insert`
for state_key, event_id in to_insert.items():
if state_key in SLIDING_SYNC_RELEVANT_STATE_SET:
current_state_ids_map[state_key] = event_id
fetched_events = await self.store.get_events(
current_state_ids_map.values()
)
current_state_map: StateMap[EventBase] = {
state_key: fetched_events[event_id]
for state_key, event_id in current_state_ids_map.items()
}
if current_state_map:
state_insert_values = PersistEventsStore._get_sliding_sync_insert_values_from_state_map(
current_state_map
)
membership_snapshot_shared_insert_values.update(state_insert_values)
# We have current state to work from
membership_snapshot_shared_insert_values["has_known_state"] = True
else:
# We don't have any `current_state_events` anymore (previously
# cleared out because of `no_longer_in_room`). This can happen if
# one user is joined and another is invited (some non-join
# membership). If the joined user leaves, we are `no_longer_in_room`
# and `current_state_events` is cleared out. When the invited user
# rejects the invite (leaves the room), we will end up here.
#
# In these cases, we should inherit the meta data from the previous
# snapshot so we shouldn't update any of the state values. When
# using sliding sync filters, this will prevent the room from
# disappearing/appearing just because you left the room.
#
# Ideally, we could additionally assert that we're only here for
# valid non-join membership transitions.
assert delta_state.no_longer_in_room
# Handle gathering info for the `sliding_sync_joined_rooms` table
#
# We only deal with
# updating the state related columns. The
# `event_stream_ordering`/`bump_stamp` are updated elsewhere in the event
# persisting stack (see
# `_update_sliding_sync_tables_with_new_persisted_events_txn()`)
#
joined_room_updates: SlidingSyncStateInsertValues = {}
if not delta_state.no_longer_in_room:
# Look through the items we're going to insert into the current state to see
# if there is anything that we care about and should also update in the
# `sliding_sync_joined_rooms` table.
current_state_ids_map = {}
for state_key, event_id in to_insert.items():
if state_key in SLIDING_SYNC_RELEVANT_STATE_SET:
current_state_ids_map[state_key] = event_id
# Get the full event objects for the current state events
#
# In normal event persist scenarios, we should be able to find the state
# events in the `events_and_contexts` given to us but it's possible a state
# reset happened which that reset back to a previous state.
current_state_map = {}
missing_event_ids: Set[str] = set()
for state_key, event_id in current_state_ids_map.items():
event = event_map.get(event_id)
if event:
current_state_map[state_key] = event
else:
missing_event_ids.add(event_id)
# Otherwise, we need to find a couple events that we were reset to.
if missing_event_ids:
remaining_events = await self.store.get_events(
current_state_ids_map.values()
)
# There shouldn't be any missing events
assert (
remaining_events.keys() == missing_event_ids
), missing_event_ids.difference(remaining_events.keys())
for event in remaining_events.values():
current_state_map[(event.type, event.state_key)] = event
joined_room_updates = (
PersistEventsStore._get_sliding_sync_insert_values_from_state_map(
current_state_map
)
)
# If something is being deleted from the state, we need to clear it out
for state_key in to_delete:
if state_key == (EventTypes.Create, ""):
joined_room_updates["room_type"] = None
elif state_key == (EventTypes.RoomEncryption, ""):
joined_room_updates["is_encrypted"] = False
elif state_key == (EventTypes.Name, ""):
joined_room_updates["room_name"] = None
return SlidingSyncTableChanges(
room_id=room_id,
# For `sliding_sync_joined_rooms`
joined_room_updates=joined_room_updates,
# For `sliding_sync_membership_snapshots`
membership_snapshot_shared_insert_values=membership_snapshot_shared_insert_values,
to_insert_membership_snapshots=membership_infos_to_insert_membership_snapshots,
to_delete_membership_snapshots=user_ids_to_delete_membership_snapshots,
)
async def calculate_chain_cover_index_for_events(
self, room_id: str, events: Collection[EventBase]
) -> Dict[str, NewEventChainLinks]:

View file

@ -1432,27 +1432,19 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
)
)
event_chunk = [message_tuple]
delta_state = DeltaState(
# This is the state reset part. We're removing the room name state.
to_delete=[(EventTypes.Name, "")],
to_insert={},
)
assert self.storage_controllers.persistence is not None
sliding_sync_table_changes = self.get_success(
self.storage_controllers.persistence._calculate_sliding_sync_table_changes(
room_id, event_chunk, delta_state
)
)
self.get_success(
self.persist_events_store._persist_events_and_state_updates(
room_id,
event_chunk,
state_delta_for_room=delta_state,
state_delta_for_room=DeltaState(
# This is the state reset part. We're removing the room name state.
to_delete=[(EventTypes.Name, "")],
to_insert={},
),
new_forward_extremities={message_tuple[0].event_id},
use_negative_stream_ordering=False,
inhibit_local_membership_updates=False,
new_event_links={},
sliding_sync_table_changes=sliding_sync_table_changes,
)
)
@ -2680,27 +2672,19 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
)
)
event_chunk = [message_tuple]
delta_state = DeltaState(
# This is the state reset part. We're removing the room name state.
to_delete=[(EventTypes.Member, user1_id)],
to_insert={},
)
assert self.storage_controllers.persistence is not None
sliding_sync_table_changes = self.get_success(
self.storage_controllers.persistence._calculate_sliding_sync_table_changes(
room_id, event_chunk, delta_state
)
)
self.get_success(
self.persist_events_store._persist_events_and_state_updates(
room_id,
event_chunk,
state_delta_for_room=delta_state,
state_delta_for_room=DeltaState(
# This is the state reset part. We're removing the room name state.
to_delete=[(EventTypes.Member, user1_id)],
to_insert={},
),
new_forward_extremities={message_tuple[0].event_id},
use_negative_stream_ordering=False,
inhibit_local_membership_updates=False,
new_event_links={},
sliding_sync_table_changes=sliding_sync_table_changes,
)
)