Start of moving logic outside of the transaction (pre-process)

This commit is contained in:
Eric Eastwood 2024-08-20 11:10:34 -05:00
parent 574a04a40f
commit 6cc6bdbedf
3 changed files with 250 additions and 104 deletions

View file

@ -49,7 +49,7 @@ from prometheus_client import Counter, Histogram
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.api.constants import EventTypes, Membership, EventContentFields
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
@ -64,8 +64,16 @@ from synapse.logging.opentracing import (
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.controllers.state import StateStorageController
from synapse.storage.databases import Databases
from synapse.storage.databases.main.events import DeltaState
from synapse.storage.databases.main.events import (
DeltaState,
SlidingSyncTableChanges,
SlidingSyncStateInsertValues,
SlidingSyncSnapshotInsertValues,
)
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.databases.main.events import (
SLIDING_SYNC_RELEVANT_STATE_SET,
)
from synapse.types import (
PersistedEventPosition,
RoomStreamToken,
@ -604,6 +612,7 @@ class EventsPersistenceStorageController:
new_forward_extremities = None
state_delta_for_room = None
sliding_sync_table_changes = None
if not backfilled:
with Measure(self._clock, "_calculate_state_and_extrem"):
@ -617,6 +626,13 @@ class EventsPersistenceStorageController:
room_id, chunk
)
with Measure(self._clock, "_calculate_sliding_sync_table_changes"):
sliding_sync_table_changes = (
await self._calculate_sliding_sync_table_changes(
room_id, chunk, state_delta_for_room
)
)
with Measure(self._clock, "calculate_chain_cover_index_for_events"):
# We now calculate chain ID/sequence numbers for any state events we're
# persisting. We ignore out of band memberships as we're not in the room
@ -636,6 +652,7 @@ class EventsPersistenceStorageController:
use_negative_stream_ordering=backfilled,
inhibit_local_membership_updates=backfilled,
new_event_links=new_event_links,
sliding_sync_table_changes=sliding_sync_table_changes,
)
return replaced_events
@ -751,6 +768,148 @@ class EventsPersistenceStorageController:
return (new_forward_extremities, delta)
async def _calculate_sliding_sync_table_changes(
self,
room_id: str,
events_and_contexts: List[Tuple[EventBase, EventContext]],
delta_state: Optional[DeltaState],
) -> Optional[SlidingSyncTableChanges]:
"""
TODO
"""
to_insert = delta_state.to_insert
to_delete = delta_state.to_delete
# This would only happen if someone was state reset out of the room
to_delete_membership_snapshots = {
(room_id, state_key)
for event_type, state_key in to_delete
if event_type == EventTypes.Member and self.is_mine_id(state_key)
}
membership_snapshot_updates = {}
if to_insert:
membership_event_id_to_user_id_map: Dict[str, str] = {}
for state_key, event_id in to_insert.items():
if state_key[0] == EventTypes.Member and self.is_mine_id(state_key[1]):
membership_event_id_to_user_id_map[event_id] = state_key[1]
if len(membership_event_id_to_user_id_map) > 0:
current_state_ids_map = (
await self.main_store.get_partial_filtered_current_state_ids(
room_id,
state_filter=StateFilter.from_types(
SLIDING_SYNC_RELEVANT_STATE_SET
),
)
)
# Since we fetched the current state before we took `to_insert`/`to_delete`
# into account, we need to do a couple fixups.
#
# Update the current_state_map with what we have `to_delete`
for state_key in to_delete:
current_state_ids_map.pop(state_key, None)
# Update the current_state_map with what we have `to_insert`
for state_key, event_id in to_insert.items():
if state_key in SLIDING_SYNC_RELEVANT_STATE_SET:
current_state_ids_map[state_key] = event_id
event_map = await self.main_store.get_events(
current_state_ids_map.values()
)
current_state_map = {}
for key, event_id in current_state_ids_map.items():
event = event_map.get(event_id)
if event:
current_state_map[key] = event
# Map of values to insert/update in the `sliding_sync_membership_snapshots` table
sliding_sync_insert_values = None
has_known_state = False
if current_state_map:
sliding_sync_insert_values = (
self._get_sliding_sync_insert_values_from_state_map(
current_state_map
)
)
# We have current state to work from
has_known_state = True
else:
# We don't have any `current_state_events` anymore (previously
# cleared out because of `no_longer_in_room`). This can happen if
# one user is joined and another is invited (some non-join
# membership). If the joined user leaves, we are `no_longer_in_room`
# and `current_state_events` is cleared out. When the invited user
# rejects the invite (leaves the room), we will end up here.
#
# In these cases, we should inherit the meta data from the previous
# snapshot (handled by the default `ON CONFLICT ... DO UPDATE SET`).
# When using sliding sync filters, this will prevent the room from
# disappearing/appearing just because you left the room.
#
# Ideally, we could additionally assert that we're only here for
# valid non-join membership transitions.
assert delta_state.no_longer_in_room
membership_snapshot_updates = {
(room_id, user_id): SlidingSyncSnapshotInsertValues(
membership_event_id=membership_event_id,
has_known_state=has_known_state,
)
for membership_event_id, user_id in membership_event_id_to_user_id_map.items()
}
return SlidingSyncTableChanges(
joined_room_updates=TODO,
to_delete_joined_rooms=TODO,
membership_snapshot_updates=membership_snapshot_updates,
to_delete_membership_snapshots=to_delete_membership_snapshots,
)
@classmethod
def _get_sliding_sync_insert_values_from_state_map(
cls, state_map: StateMap[EventBase]
) -> SlidingSyncStateInsertValues:
"""
Extract the relevant state values from the `state_map` needed to insert into the
`sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables.
Returns:
Map from column names (`room_type`, `is_encrypted`, `room_name`) to relevant
state values needed to insert into
the `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables.
"""
# Map of values to insert/update in the `sliding_sync_membership_snapshots` table
sliding_sync_insert_map: Dict[str, Optional[Union[str, bool]]] = {}
# Parse the raw event JSON
for state_key, event in state_map.items():
if state_key == (EventTypes.Create, ""):
room_type = event.content.get(EventContentFields.ROOM_TYPE)
sliding_sync_insert_map["room_type"] = room_type
elif state_key == (EventTypes.RoomEncryption, ""):
encryption_algorithm = event.content.get(
EventContentFields.ENCRYPTION_ALGORITHM
)
is_encrypted = encryption_algorithm is not None
sliding_sync_insert_map["is_encrypted"] = is_encrypted
elif state_key == (EventTypes.Name, ""):
room_name = event.content.get(EventContentFields.ROOM_NAME)
sliding_sync_insert_map["room_name"] = room_name
else:
# We only expect to see events according to the
# `SLIDING_SYNC_RELEVANT_STATE_SET`.
raise AssertionError(
f"Unexpected event (we should not be fetching extra events): {state_key} {event.event_id}"
)
return SlidingSyncStateInsertValues(
room_type=room_type,
is_encrypted=encryption_algorithm,
room_name=room_name,
)
async def _calculate_new_extremities(
self,
room_id: str,

View file

@ -125,6 +125,33 @@ class DeltaState:
return not self.to_delete and not self.to_insert and not self.no_longer_in_room
@attr.s(slots=True, auto_attribs=True)
class SlidingSyncStateInsertValues:
room_type: Optional[str]
is_encrypted: Optional[bool]
room_name: Optional[str]
@attr.s(slots=True, auto_attribs=True)
class SlidingSyncSnapshotInsertValues(SlidingSyncStateInsertValues):
membership_event_id: str
has_known_state: Optional[bool]
# TODO: `sender`
@attr.s(slots=True, auto_attribs=True)
class SlidingSyncTableChanges:
# room_id -> dict to upsert into `sliding_sync_joined_rooms`
joined_room_updates: Dict[str, SlidingSyncStateInsertValues]
# room_ids to delete from `sliding_sync_joined_rooms`
to_delete_joined_rooms: StrCollection
# (room_id, user_id) -> dict to upsert into sliding_sync_membership_snapshots
membership_snapshot_updates: Dict[Tuple[str, str], SlidingSyncSnapshotInsertValues]
# List of (room_id, user_id) to delete from `sliding_sync_membership_snapshots`
to_delete_membership_snapshots: Set[Tuple[str, str]]
@attr.s(slots=True, auto_attribs=True)
class NewEventChainLinks:
"""Information about new auth chain links that need to be added to the DB.
@ -193,6 +220,7 @@ class PersistEventsStore:
new_event_links: Dict[str, NewEventChainLinks],
use_negative_stream_ordering: bool = False,
inhibit_local_membership_updates: bool = False,
sliding_sync_table_changes: Optional[SlidingSyncTableChanges],
) -> None:
"""Persist a set of events alongside updates to the current state and
forward extremities tables.
@ -213,6 +241,7 @@ class PersistEventsStore:
from being updated by these events. This should be set to True
for backfilled events because backfilled events in the past do
not affect the current local state.
sliding_sync_table_changes: TODO
Returns:
Resolves when the events have been persisted
@ -261,6 +290,7 @@ class PersistEventsStore:
state_delta_for_room=state_delta_for_room,
new_forward_extremities=new_forward_extremities,
new_event_links=new_event_links,
sliding_sync_table_changes=sliding_sync_table_changes,
)
persist_event_counter.inc(len(events_and_contexts))
@ -484,6 +514,7 @@ class PersistEventsStore:
state_delta_for_room: Optional[DeltaState],
new_forward_extremities: Optional[Set[str]],
new_event_links: Dict[str, NewEventChainLinks],
sliding_sync_table_changes: Optional[SlidingSyncTableChanges],
) -> None:
"""Insert some number of room events into the necessary database tables.
@ -507,6 +538,7 @@ class PersistEventsStore:
state_delta_for_room: The current-state delta for the room.
new_forward_extremities: The new forward extremities for the room:
a set of the event ids which are the forward extremities.
sliding_sync_table_changes: TODO
Raises:
PartialStateConflictError: if attempting to persist a partial state event in
@ -617,7 +649,11 @@ class PersistEventsStore:
# NB: This function invalidates all state related caches
if state_delta_for_room:
self._update_current_state_txn(
txn, room_id, state_delta_for_room, min_stream_order
txn,
room_id,
state_delta_for_room,
min_stream_order,
sliding_sync_table_changes,
)
self._update_sliding_sync_tables_with_new_persisted_events_txn(
@ -1179,6 +1215,7 @@ class PersistEventsStore:
room_id: str,
delta_state: DeltaState,
stream_id: int,
sliding_sync_table_changes: Optional[SlidingSyncTableChanges],
) -> None:
to_delete = delta_state.to_delete
to_insert = delta_state.to_insert
@ -1197,15 +1234,11 @@ class PersistEventsStore:
# Handle updating the `sliding_sync_membership_snapshots` table
#
# This would only happen if someone was state reset out of the room
if to_delete:
if sliding_sync_table_changes.to_delete_membership_snapshots:
txn.execute_batch(
"DELETE FROM sliding_sync_membership_snapshots"
" WHERE room_id = ? AND user_id = ?",
(
(room_id, state_key)
for event_type, state_key in to_delete
if event_type == EventTypes.Member and self.is_mine_id(state_key)
),
sliding_sync_table_changes.to_delete_membership_snapshots,
)
# We handle `sliding_sync_membership_snapshots` before `current_state_events` so
@ -1215,59 +1248,13 @@ class PersistEventsStore:
# We do this regardless of whether the server is `no_longer_in_room` or not
# because we still want a row if a local user was just left/kicked or got banned
# from the room.
if to_insert:
membership_event_id_to_user_id_map: Dict[str, str] = {}
for state_key, event_id in to_insert.items():
if state_key[0] == EventTypes.Member and self.is_mine_id(state_key[1]):
membership_event_id_to_user_id_map[event_id] = state_key[1]
if sliding_sync_table_changes.membership_snapshot_updates:
if len(membership_event_id_to_user_id_map) > 0:
current_state_map = (
self._get_relevant_sliding_sync_current_state_event_ids_txn(
txn, room_id
)
)
# Since we fetched the current state before we took `to_insert`/`to_delete`
# into account, we need to do a couple fixups.
#
# Update the current_state_map with what we have `to_delete`
for state_key in to_delete:
current_state_map.pop(state_key, None)
# Update the current_state_map with what we have `to_insert`
for state_key, event_id in to_insert.items():
if state_key in SLIDING_SYNC_RELEVANT_STATE_SET:
current_state_map[state_key] = event_id
# Map of values to insert/update in the `sliding_sync_membership_snapshots` table
sliding_sync_membership_snapshots_insert_map: Dict[
str, Optional[Union[str, bool]]
] = {}
if current_state_map:
sliding_sync_membership_snapshots_insert_map = (
self._get_sliding_sync_insert_values_from_state_map_txn(
txn, current_state_map
)
)
# We have current state to work from
sliding_sync_membership_snapshots_insert_map["has_known_state"] = (
True
)
else:
# We don't have any `current_state_events` anymore (previously
# cleared out because of `no_longer_in_room`). This can happen if
# one user is joined and another is invited (some non-join
# membership). If the joined user leaves, we are `no_longer_in_room`
# and `current_state_events` is cleared out. When the invited user
# rejects the invite (leaves the room), we will end up here.
#
# In these cases, we should inherit the meta data from the previous
# snapshot (handled by the default `ON CONFLICT ... DO UPDATE SET`).
# When using sliding sync filters, this will prevent the room from
# disappearing/appearing just because you left the room.
#
# Ideally, we could additionally assert that we're only here for
# valid non-join membership transitions.
assert delta_state.no_longer_in_room
# TODO
[
getattr(x, attr_name)
for attr_name in ["room_type", "is_encrypted", "room_name"]
]
# Update the `sliding_sync_membership_snapshots` table
#
@ -1424,7 +1411,7 @@ class PersistEventsStore:
# Map of values to insert/update in the `sliding_sync_joined_rooms` table
sliding_sync_joined_rooms_insert_map = (
self._get_sliding_sync_insert_values_from_state_map_txn(
self._get_sliding_sync_insert_values_from_state_ids_map_txn(
txn, current_state_map
)
)
@ -1593,7 +1580,7 @@ class PersistEventsStore:
return current_state_map
@classmethod
def _get_sliding_sync_insert_values_from_state_map_txn(
def _get_sliding_sync_insert_values_from_state_ids_map_txn(
cls, txn: LoggingTransaction, state_map: StateMap[str]
) -> Dict[str, Optional[Union[str, bool]]]:
"""

View file

@ -621,7 +621,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
# so we should have some current state for each room
assert current_state_map
sliding_sync_joined_rooms_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_state_map_txn(
sliding_sync_joined_rooms_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_state_ids_map_txn(
txn, current_state_map
)
# We should have some insert values for each room, even if they are `None`
@ -754,7 +754,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
# for each room
assert current_state_map
sliding_sync_membership_snapshots_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_state_map_txn(
sliding_sync_membership_snapshots_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_state_ids_map_txn(
txn, current_state_map
)
# We should have some insert values for each room, even if they are `None`
@ -854,7 +854,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
)
state_map = state_by_group[state_group]
sliding_sync_membership_snapshots_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_state_map_txn(
sliding_sync_membership_snapshots_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_state_ids_map_txn(
txn, state_map
)
# We should have some insert values for each room, even if they are `None`