Use dicts

This commit is contained in:
Eric Eastwood 2024-08-20 13:21:19 -05:00
parent 95d39db772
commit 2964c567d3
2 changed files with 192 additions and 146 deletions

View file

@ -30,6 +30,7 @@ from typing import (
Awaitable,
Callable,
ClassVar,
Sequence,
Collection,
Deque,
Dict,
@ -49,7 +50,7 @@ from prometheus_client import Counter, Histogram
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership, EventContentFields
from synapse.api.constants import EventContentFields, EventTypes, Membership
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
@ -65,20 +66,19 @@ from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.controllers.state import StateStorageController
from synapse.storage.databases import Databases
from synapse.storage.databases.main.events import (
SLIDING_SYNC_RELEVANT_STATE_SET,
DeltaState,
SlidingSyncTableChanges,
SlidingSyncStateInsertValues,
SlidingSyncMembershipSnapshotSharedInsertValues,
SlidingSyncMembershipInfo,
SlidingSyncMembershipSnapshotSharedInsertValues,
SlidingSyncStateInsertValues,
SlidingSyncTableChanges,
)
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.databases.main.events import (
SLIDING_SYNC_RELEVANT_STATE_SET,
)
from synapse.types import (
PersistedEventPosition,
RoomStreamToken,
StateMap,
MutableStateMap,
get_domain_from_id,
)
from synapse.types.state import StateFilter
@ -511,8 +511,13 @@ class EventsPersistenceStorageController:
"""
state = await self._calculate_current_state(room_id)
delta = await self._calculate_state_delta(room_id, state)
sliding_sync_table_changes = await self._calculate_sliding_sync_table_changes(
room_id, [], delta
)
await self.persist_events_store.update_current_state(room_id, delta)
await self.persist_events_store.update_current_state(
room_id, delta, sliding_sync_table_changes
)
async def _calculate_current_state(self, room_id: str) -> StateMap[str]:
"""Calculate the current state of a room, based on the forward extremities
@ -627,6 +632,7 @@ class EventsPersistenceStorageController:
room_id, chunk
)
if state_delta_for_room is not None:
with Measure(self._clock, "_calculate_sliding_sync_table_changes"):
sliding_sync_table_changes = (
await self._calculate_sliding_sync_table_changes(
@ -772,15 +778,26 @@ class EventsPersistenceStorageController:
async def _calculate_sliding_sync_table_changes(
self,
room_id: str,
events_and_contexts: List[Tuple[EventBase, EventContext]],
delta_state: Optional[DeltaState],
) -> Optional[SlidingSyncTableChanges]:
events_and_contexts: Sequence[Tuple[EventBase, EventContext]],
delta_state: DeltaState,
) -> SlidingSyncTableChanges:
"""
TODO
Args:
room_id: The room ID currently being processed.
events_and_contexts: List of tuples of (event, context) being persisted.
This is completely optional (you can pass an empty list) and will just
save us from fetching the events from the database if we already have
them.
delta_state: Deltas that are going to be used to update the
`current_state_events` table.
"""
to_insert = delta_state.to_insert
to_delete = delta_state.to_delete
event_map = {event.event_id: event for event, _ in events_and_contexts}
# This would only happen if someone was state reset out of the room
to_delete_membership_snapshots = [
state_key
@ -788,7 +805,9 @@ class EventsPersistenceStorageController:
if event_type == EventTypes.Member and self.is_mine_id(state_key)
]
membership_snapshot_updates = {}
membership_snapshot_shared_insert_values: (
SlidingSyncMembershipSnapshotSharedInsertValues
) = {}
membership_infos: List[SlidingSyncMembershipInfo] = []
if to_insert:
membership_event_id_to_user_id_map: Dict[str, str] = {}
@ -796,18 +815,39 @@ class EventsPersistenceStorageController:
if state_key[0] == EventTypes.Member and self.is_mine_id(state_key[1]):
membership_event_id_to_user_id_map[event_id] = state_key[1]
event_id_to_sender_map = await _get_sender_for_event_ids(membership_event_id_to_user_id_map.keys())
membership_infos = [
SlidingSyncMembershipInfo(
user_id=user_id,
sender=event_id_to_sender_map[event_id],
membership_event_id=membership_event_id
event_id_to_sender_map: Dict[str, str] = {}
# In normal event persist scenarios, we should be able to find the
# membership events in the `events_and_contexts` given to us but it's
# possible a state reset happened which added us to the room without a
# corresponding new membership event (reset back to a previous membership).
missing_membership_event_ids: Set[str] = set()
for membership_event_id in membership_event_id_to_user_id_map.keys():
membership_event = event_map.get(membership_event_id)
if membership_event:
event_id_to_sender_map[membership_event_id] = (
membership_event.sender
)
else:
missing_membership_event_ids.add(membership_event_id)
# Otherwise, we need to find a couple previous events that we were reset to.
if missing_membership_event_ids:
remaining_event_id_to_sender_map = await _get_sender_for_event_ids(
missing_membership_event_ids
)
event_id_to_sender_map.update(remaining_event_id_to_sender_map)
membership_infos = [
{
"user_id": user_id,
"sender": event_id_to_sender_map[event_id],
"membership_event_id": membership_event_id,
}
for membership_event_id, user_id in membership_event_id_to_user_id_map.items()
]
if membership_infos:
current_state_ids_map = (
current_state_ids_map: MutableStateMap = dict(
await self.main_store.get_partial_filtered_current_state_ids(
room_id,
state_filter=StateFilter.from_types(
@ -826,27 +866,25 @@ class EventsPersistenceStorageController:
if state_key in SLIDING_SYNC_RELEVANT_STATE_SET:
current_state_ids_map[state_key] = event_id
event_map = await self.main_store.get_events(
fetched_events = await self.main_store.get_events(
current_state_ids_map.values()
)
current_state_map = {}
for key, event_id in current_state_ids_map.items():
event = event_map.get(event_id)
if event:
current_state_map[key] = event
current_state_map: StateMap[EventBase] = {
key: fetched_events[event_id]
for key, event_id in current_state_ids_map.items()
}
# Map of values to insert/update in the `sliding_sync_membership_snapshots` table
membership_snapshot_shared_insert_values = SlidingSyncMembershipSnapshotSharedInsertValues()
has_known_state = False
if current_state_map:
state_insert_values = (
self._get_sliding_sync_insert_values_from_state_map(
current_state_map
)
)
membership_snapshot_shared_insert_values.update(state_insert_values)
# We have current state to work from
membership_snapshot_shared_insert_values.has_known_state = True
membership_snapshot_shared_insert_values["has_known_state"] = True
else:
# We don't have any `current_state_events` anymore (previously
# cleared out because of `no_longer_in_room`). This can happen if
@ -856,29 +894,21 @@ class EventsPersistenceStorageController:
# rejects the invite (leaves the room), we will end up here.
#
# In these cases, we should inherit the meta data from the previous
# snapshot (handled by the default `ON CONFLICT ... DO UPDATE SET`).
# When using sliding sync filters, this will prevent the room from
# snapshot so we shouldn't update any of the state values. When
# using sliding sync filters, this will prevent the room from
# disappearing/appearing just because you left the room.
#
# Ideally, we could additionally assert that we're only here for
# valid non-join membership transitions.
assert delta_state.no_longer_in_room
membership_snapshot_updates = {
(room_id, user_id): SlidingSyncSnapshotInsertValues(
membership_event_id=membership_event_id,
has_known_state=has_known_state,
)
for membership_event_id, user_id in membership_event_id_to_user_id_map.items()
}
return SlidingSyncTableChanges(
room_id=room_id,
# For `sliding_sync_joined_rooms`
joined_room_updates=TODO,
to_delete_joined_rooms=TODO,
# For `sliding_sync_membership_snapshots`
membership_snapshot_shared_insert_values=TODO,
membership_snapshot_shared_insert_values=membership_snapshot_shared_insert_values,
to_insert_membership_snapshots=membership_infos,
to_delete_membership_snapshots=to_delete_membership_snapshots,
)
@ -897,12 +927,14 @@ class EventsPersistenceStorageController:
the `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables.
"""
# Map of values to insert/update in the `sliding_sync_membership_snapshots` table
sliding_sync_insert_map: Dict[str, Optional[Union[str, bool]]] = {}
sliding_sync_insert_map: SlidingSyncStateInsertValues = {}
# Parse the raw event JSON
for state_key, event in state_map.items():
if state_key == (EventTypes.Create, ""):
room_type = event.content.get(EventContentFields.ROOM_TYPE)
# Scrutinize JSON values
if room_type is None or isinstance(room_type, str):
sliding_sync_insert_map["room_type"] = room_type
elif state_key == (EventTypes.RoomEncryption, ""):
encryption_algorithm = event.content.get(
@ -912,6 +944,8 @@ class EventsPersistenceStorageController:
sliding_sync_insert_map["is_encrypted"] = is_encrypted
elif state_key == (EventTypes.Name, ""):
room_name = event.content.get(EventContentFields.ROOM_NAME)
# Scrutinize JSON values
if room_name is None or isinstance(room_name, str):
sliding_sync_insert_map["room_name"] = room_name
else:
# We only expect to see events according to the
@ -920,11 +954,7 @@ class EventsPersistenceStorageController:
f"Unexpected event (we should not be fetching extra events): {state_key} {event.event_id}"
)
return SlidingSyncStateInsertValues(
room_type=room_type,
is_encrypted=encryption_algorithm,
room_name=room_name,
)
return sliding_sync_insert_map
async def _calculate_new_extremities(
self,

View file

@ -31,6 +31,7 @@ from typing import (
Generator,
Iterable,
List,
Literal,
Optional,
Set,
Tuple,
@ -125,34 +126,51 @@ class DeltaState:
return not self.to_delete and not self.to_insert and not self.no_longer_in_room
@attr.s(slots=True, auto_attribs=True)
class SlidingSyncStateInsertValues:
"""
Insert values relevant for the `sliding_sync_joined_rooms` and
`sliding_sync_membership_snapshots` database tables.
"""
room_type: Optional[str]
is_encrypted: Optional[bool]
room_name: Optional[str]
# @attr.s(slots=True, auto_attribs=True)
# class SlidingSyncStateInsertValues:
# """
# Insert values relevant for the `sliding_sync_joined_rooms` and
# `sliding_sync_membership_snapshots` database tables.
# """
# room_type: Optional[str]
# is_encrypted: Optional[bool]
# room_name: Optional[str]
SlidingSyncStateInsertKeys = Literal["room_type", "is_encrypted", "room_name"]
SlidingSyncStateInsertValues = Dict[
SlidingSyncStateInsertKeys, Optional[Union[str, bool]]
]
@attr.s(slots=True, auto_attribs=True)
class SlidingSyncMembershipSnapshotSharedInsertValues(SlidingSyncStateInsertValues):
"""
Insert values for `sliding_sync_membership_snapshots` that we can share across
multiple memberships
"""
has_known_state: bool
# TODO: tombstone_successor_room_id: Optional[str]
# @attr.s(slots=True, auto_attribs=True)
# class SlidingSyncMembershipSnapshotSharedInsertValues(SlidingSyncStateInsertValues):
# """
# Insert values for `sliding_sync_membership_snapshots` that we can share across
# multiple memberships
# """
# has_known_state: bool
# # TODO: tombstone_successor_room_id: Optional[str]
SlidingSyncMembershipSnapshotSharedInsertValues = Dict[
# Instead of using a Union, we use a Literal to be compatible with mypy
# Literal[SlidingSyncStateInsertKeys, "has_known_state"],
Union[SlidingSyncStateInsertKeys, Literal["has_known_state"]],
Optional[Union[str, bool]],
]
# @attr.s(slots=True, auto_attribs=True)
# class SlidingSyncMembershipInfo(SlidingSyncStateInsertValues):
# """
# Values unique to each membership
# """
# user_id: str
# sender: str
# membership_event_id: str
SlidingSyncMembershipInfo = Dict[
Literal["user_id", "sender", "membership_event_id"], Optional[Union[str, bool]]
]
@attr.s(slots=True, auto_attribs=True)
class SlidingSyncMembershipInfo(SlidingSyncStateInsertValues):
"""
Values unique to each membership
"""
user_id: str
sender: str
membership_event_id: str
@attr.s(slots=True, auto_attribs=True)
class SlidingSyncTableChanges:
@ -164,7 +182,9 @@ class SlidingSyncTableChanges:
# Shared values to upsert into `sliding_sync_membership_snapshots` for each
# `to_insert_membership_snapshots`
membership_snapshot_shared_insert_values: SlidingSyncMembershipSnapshotSharedInsertValues
membership_snapshot_shared_insert_values: (
SlidingSyncMembershipSnapshotSharedInsertValues
)
# List of membership to insert into `sliding_sync_membership_snapshots`
to_insert_membership_snapshots: List[SlidingSyncMembershipInfo]
# List of user_id to delete from `sliding_sync_membership_snapshots`
@ -667,6 +687,9 @@ class PersistEventsStore:
# room_memberships, where applicable.
# NB: This function invalidates all state related caches
if state_delta_for_room:
# If the state delta exists, the sliding sync table changes should also exist
assert sliding_sync_table_changes is not None
self._update_current_state_txn(
txn,
room_id,
@ -1213,6 +1236,7 @@ class PersistEventsStore:
self,
room_id: str,
state_delta: DeltaState,
sliding_sync_table_changes: SlidingSyncTableChanges,
) -> None:
"""Update the current state stored in the datatabase for the given room"""
@ -1226,6 +1250,7 @@ class PersistEventsStore:
room_id,
delta_state=state_delta,
stream_id=stream_ordering,
sliding_sync_table_changes=sliding_sync_table_changes,
)
def _update_current_state_txn(
@ -1234,7 +1259,7 @@ class PersistEventsStore:
room_id: str,
delta_state: DeltaState,
stream_id: int,
sliding_sync_table_changes: Optional[SlidingSyncTableChanges],
sliding_sync_table_changes: SlidingSyncTableChanges,
) -> None:
to_delete = delta_state.to_delete
to_insert = delta_state.to_insert
@ -1250,73 +1275,6 @@ class PersistEventsStore:
if ev_type == EventTypes.Member
}
# Handle updating the `sliding_sync_membership_snapshots` table
#
# This would only happen if someone was state reset out of the room
if sliding_sync_table_changes.to_delete_membership_snapshots:
txn.execute_batch(
"DELETE FROM sliding_sync_membership_snapshots"
" WHERE room_id = ? AND user_id = ?",
sliding_sync_table_changes.to_delete_membership_snapshots,
)
# We handle `sliding_sync_membership_snapshots` before `current_state_events` so
# we can gather the current state before it might be deleted if we are
# last ones in the room and now we are `no_longer_in_room`.
#
# We do this regardless of whether the server is `no_longer_in_room` or not
# because we still want a row if a local user was just left/kicked or got banned
# from the room.
if sliding_sync_table_changes.membership_snapshot_updates:
# TODO
for asdf in sliding_sync_table_changes.membership_snapshot_updates:
for attr_name in ["room_type", "is_encrypted", "room_name"]
[
getattr(x, attr_name)
]
# Update the `sliding_sync_membership_snapshots` table
#
# Pulling keys/values separately is safe and will produce congruent
# lists
insert_keys = sliding_sync_membership_snapshots_insert_map.keys()
insert_values = sliding_sync_membership_snapshots_insert_map.values()
# We need to insert/update regardless of whether we have `insert_keys`
# because there are other fields in the `ON CONFLICT` upsert to run (see
# inherit case above for more context when this happens).
txn.execute_batch(
f"""
INSERT INTO sliding_sync_membership_snapshots
(room_id, user_id, membership_event_id, membership, event_stream_ordering
{("," + ", ".join(insert_keys)) if insert_keys else ""})
VALUES (
?, ?, ?,
(SELECT membership FROM room_memberships WHERE event_id = ?),
(SELECT stream_ordering FROM events WHERE event_id = ?)
{("," + ", ".join("?" for _ in insert_values)) if insert_values else ""}
)
ON CONFLICT (room_id, user_id)
DO UPDATE SET
membership_event_id = EXCLUDED.membership_event_id,
membership = EXCLUDED.membership,
event_stream_ordering = EXCLUDED.event_stream_ordering
{("," + ", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)) if insert_keys else ""}
""",
[
[
room_id,
user_id,
membership_event_id,
membership_event_id,
membership_event_id,
]
+ list(insert_values)
for membership_event_id, user_id in membership_event_id_to_user_id_map.items()
],
)
if delta_state.no_longer_in_room:
# Server is no longer in the room so we delete the room from
# current_state_events, being careful we've already updated the
@ -1545,6 +1503,64 @@ class PersistEventsStore:
],
)
# Handle updating the `sliding_sync_membership_snapshots` table
#
# This would only happen if someone was state reset out of the room
if sliding_sync_table_changes.to_delete_membership_snapshots:
txn.execute_batch(
"DELETE FROM sliding_sync_membership_snapshots"
" WHERE room_id = ? AND user_id = ?",
sliding_sync_table_changes.to_delete_membership_snapshots,
)
# We do this regardless of whether the server is `no_longer_in_room` or not
# because we still want a row if a local user was just left/kicked or got banned
# from the room.
if sliding_sync_table_changes.to_insert_membership_snapshots:
# Update the `sliding_sync_membership_snapshots` table
#
# Pulling keys/values separately is safe and will produce congruent
# lists
insert_keys = (
sliding_sync_table_changes.membership_snapshot_shared_insert_values.keys()
)
insert_values = (
sliding_sync_table_changes.membership_snapshot_shared_insert_values.values()
)
# We need to insert/update regardless of whether we have `insert_keys`
# because there are other fields in the `ON CONFLICT` upsert to run (see
# inherit case above for more context when this happens).
txn.execute_batch(
f"""
INSERT INTO sliding_sync_membership_snapshots
(room_id, user_id, membership_event_id, membership, event_stream_ordering
{("," + ", ".join(insert_keys)) if insert_keys else ""})
VALUES (
?, ?, ?,
(SELECT membership FROM room_memberships WHERE event_id = ?),
(SELECT stream_ordering FROM events WHERE event_id = ?)
{("," + ", ".join("?" for _ in insert_values)) if insert_values else ""}
)
ON CONFLICT (room_id, user_id)
DO UPDATE SET
membership_event_id = EXCLUDED.membership_event_id,
membership = EXCLUDED.membership,
event_stream_ordering = EXCLUDED.event_stream_ordering
{("," + ", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)) if insert_keys else ""}
""",
[
[
room_id,
membership_info["user_id"],
membership_info["membership_event_id"],
membership_info["membership_event_id"],
membership_info["membership_event_id"],
]
+ list(insert_values)
for membership_info in sliding_sync_table_changes.to_insert_membership_snapshots
],
)
txn.call_after(
self.store._curr_state_delta_stream_cache.entity_has_changed,
room_id,