Merge branch 'madlittlemods/sliding-sync-pre-populate-room-meta-data' into erikj/ss_hacks

This commit is contained in:
Erik Johnston 2024-08-28 14:58:31 +01:00
commit a217155570
7 changed files with 1325 additions and 214 deletions

View file

@@ -1368,7 +1368,7 @@ class DatabasePool:
         if lock:
             # We need to lock the table :(
-            self.engine.lock_table(txn, table)
+            txn.database_engine.lock_table(txn, table)
 
         def _getwhere(key: str) -> str:
             # If the value we're passing in is None (aka NULL), we need to use
@@ -1422,8 +1422,8 @@ class DatabasePool:
         # successfully inserted
         return True
 
+    @staticmethod
     def simple_upsert_txn_native_upsert(
-        self,
         txn: LoggingTransaction,
         table: str,
         keyvalues: Mapping[str, Any],
@@ -1581,8 +1581,8 @@ class DatabasePool:
             self.simple_upsert_txn_emulated(txn, table, _keys, _vals, lock=False)
 
+    @staticmethod
     def simple_upsert_many_txn_native_upsert(
-        self,
        txn: LoggingTransaction,
        table: str,
        key_names: Collection[str],
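
Making these native-upsert helpers static means they can be called by module-level code that only holds a `LoggingTransaction` and no `DatabasePool` instance, which is exactly what the new `_resolve_stale_data_in_sliding_sync_tables()` machinery later in this commit relies on. A minimal sketch of the calling pattern (`_requeue_room` is a hypothetical name, not part of the commit):

from synapse.storage.database import DatabasePool, LoggingTransaction

def _requeue_room(txn: LoggingTransaction, room_id: str) -> None:
    # No `self` needed any more: the static helper takes the transaction
    # (which carries the database engine) as its first argument.
    DatabasePool.simple_upsert_txn_native_upsert(
        txn,
        table="sliding_sync_joined_rooms_to_recalculate",
        keyvalues={"room_id": room_id},
        values={},
        insertion_values={},
    )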

View file

@@ -1849,7 +1849,7 @@ class PersistEventsStore:
     @classmethod
     def _get_relevant_sliding_sync_current_state_event_ids_txn(
         cls, txn: LoggingTransaction, room_id: str
-    ) -> Tuple[MutableStateMap[str], int]:
+    ) -> MutableStateMap[str]:
         """
         Fetch the current state event IDs for the relevant (to the
         `sliding_sync_joined_rooms` table) state types for the given room.

View file

@@ -47,6 +47,8 @@ from synapse.storage.types import Cursor
 from synapse.types import JsonDict, RoomStreamToken, StateMap, StrCollection
 from synapse.types.handlers.sliding_sync import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES
 from synapse.types.state import StateFilter
+from synapse.util import json_encoder
+from synapse.util.iterutils import batch_iter
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -88,6 +90,9 @@ class _BackgroundUpdates:
     EVENTS_JUMP_TO_DATE_INDEX = "events_jump_to_date_index"
 
+    SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE = (
+        "sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update"
+    )
     SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE = "sliding_sync_joined_rooms_bg_update"
     SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE = (
         "sliding_sync_membership_snapshots_bg_update"
@@ -112,6 +117,18 @@ class _CalculateChainCover:
     finished_room_map: Dict[str, Tuple[int, int]]
 
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class _JoinedRoomStreamOrderingUpdate:
+    """
+    Intermediate container class used in `SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE`
+    """
+
+    # The most recent event stream_ordering for the room
+    most_recent_event_stream_ordering: int
+    # The most recent event `bump_stamp` for the room
+    most_recent_bump_stamp: Optional[int]
+
+
 class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseStore):
     def __init__(
         self,
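
The attrs container above replaces the bare `Tuple[int, Optional[int], int]` that was previously threaded through this background update, so call sites read by field name instead of tuple position. Roughly, assuming the class as defined in the hunk:

update = _JoinedRoomStreamOrderingUpdate(
    most_recent_event_stream_ordering=42,
    # `None` is legitimate here, e.g. a freshly joined federated room where
    # all events are still outliers/backfill.
    most_recent_bump_stamp=None,
)
# Field access is self-documenting compared to positional unpacking:
stream_ordering = update.most_recent_event_stream_ordering
# `frozen=True` makes instances immutable; `slots=True` avoids a per-instance dict.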
@@ -294,6 +311,12 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
             where_clause="NOT outlier",
         )
 
+        # Handle background updates for Sliding Sync tables
+        #
+        self.db_pool.updates.register_background_update_handler(
+            _BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE,
+            self._sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update,
+        )
         # Add some background updates to populate the sliding sync tables
         self.db_pool.updates.register_background_update_handler(
             _BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE,
@@ -304,6 +327,15 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
             self._sliding_sync_membership_snapshots_bg_update,
         )
 
+        # FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the
+        # foreground update for
+        # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by
+        # https://github.com/element-hq/synapse/issues/TODO)
+        with db_conn.cursor(txn_name="resolve_sliding_sync") as txn:
+            _resolve_stale_data_in_sliding_sync_tables(
+                txn=txn,
+            )
+
     async def _background_reindex_fields_sender(
         self, progress: JsonDict, batch_size: int
     ) -> int:
@@ -1542,29 +1574,77 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
         return batch_size
 
+    async def _sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update(
+        self, progress: JsonDict, _batch_size: int
+    ) -> int:
+        """
+        Prefill `sliding_sync_joined_rooms_to_recalculate` table with all rooms we know about already.
+        """
+
+        def _txn(txn: LoggingTransaction) -> None:
+            # We do this as one big bulk insert. This has been tested on a bigger
+            # homeserver with ~10M rooms and took 11s. There is potential for this to
+            # starve disk usage while this goes on.
+            #
+            # We upsert in case we have to run this multiple times.
+            #
+            # The `WHERE TRUE` clause is to avoid "Parsing Ambiguity"
+            txn.execute(
+                """
+                INSERT INTO sliding_sync_joined_rooms_to_recalculate
+                    (room_id)
+                SELECT room_id FROM rooms WHERE ?
+                ON CONFLICT (room_id)
+                DO NOTHING;
+                """,
+                (True,),
+            )
+
+        await self.db_pool.runInteraction(
+            "_sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update",
+            _txn,
+        )
+
+        # Background update is done.
+        await self.db_pool.updates._end_background_update(
+            _BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE
+        )
+        return 0
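
The `WHERE ?` bound to `True` deserves a note: SQLite documents that an `INSERT ... SELECT` followed by `ON CONFLICT` needs a `WHERE` clause on the `SELECT` to avoid a parsing ambiguity (the upsert clause could otherwise be read as part of a join), and binding the constant as a parameter keeps the statement identical on Postgres. A standalone sketch of the same pattern:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE rooms (room_id TEXT PRIMARY KEY)")
conn.execute("CREATE TABLE queue (room_id TEXT PRIMARY KEY)")
conn.executemany("INSERT INTO rooms VALUES (?)", [("!a:x",), ("!b:x",)])

# The tautological WHERE disambiguates ON CONFLICT after INSERT ... SELECT;
# running it twice is safe because conflicting rows are ignored.
for _ in range(2):
    conn.execute(
        """
        INSERT INTO queue (room_id)
        SELECT room_id FROM rooms WHERE ?
        ON CONFLICT (room_id) DO NOTHING
        """,
        (True,),
    )
print(conn.execute("SELECT COUNT(*) FROM queue").fetchone())  # (2,)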
     async def _sliding_sync_joined_rooms_bg_update(
         self, progress: JsonDict, batch_size: int
     ) -> int:
         """
         Background update to populate the `sliding_sync_joined_rooms` table.
         """
-        last_room_id = progress.get("last_room_id", "")
+        # We don't need to fetch any progress state because we just grab the next N
+        # events in `sliding_sync_joined_rooms_to_recalculate`
 
-        def _get_rooms_to_update_txn(txn: LoggingTransaction) -> List[str]:
+        def _get_rooms_to_update_txn(txn: LoggingTransaction) -> List[Tuple[str]]:
+            """
+            Returns:
+                A list of room ID's to update along with the progress value
+                (event_stream_ordering) indicating the continuation point in the
+                `current_state_events` table for the next batch.
+            """
             # Fetch the set of room IDs that we want to update
+            #
+            # We use the `current_state_events` table as the barometer for whether the
+            # server is still participating in the room because if we're
+            # `no_longer_in_room`, this table would be cleared out for the given
+            # `room_id`.
             txn.execute(
                 """
-                SELECT DISTINCT room_id FROM current_state_events
-                WHERE room_id > ?
-                ORDER BY room_id ASC
+                SELECT room_id
+                FROM sliding_sync_joined_rooms_to_recalculate
                 LIMIT ?
                 """,
-                (last_room_id, batch_size),
+                (batch_size,),
             )
             rooms_to_update_rows = cast(List[Tuple[str]], txn.fetchall())
-            return [row[0] for row in rooms_to_update_rows]
+            return rooms_to_update_rows
 
         rooms_to_update = await self.db_pool.runInteraction(
             "_sliding_sync_joined_rooms_bg_update._get_rooms_to_update_txn",
@@ -1577,32 +1657,42 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
             )
             return 0
 
-        # Map from room_id to insert/update state values in the `sliding_sync_joined_rooms` table
+        # Map from room_id to insert/update state values in the `sliding_sync_joined_rooms` table.
         joined_room_updates: Dict[str, SlidingSyncStateInsertValues] = {}
-        # Map from room_id to stream_ordering/bump_stamp/last_current_state_delta_stream_id values
+        # Map from room_id to stream_ordering/bump_stamp, etc values
         joined_room_stream_ordering_updates: Dict[
-            str, Tuple[int, Optional[int], int]
+            str, _JoinedRoomStreamOrderingUpdate
         ] = {}
 
-        current_stream_id = await self.get_max_stream_id_in_current_state_deltas()
-        for room_id in rooms_to_update:
+        # As long as we get this value before we fetch the current state, we can use it
+        # to check if something has changed since that point.
+        most_recent_current_state_delta_stream_id = (
+            await self.get_max_stream_id_in_current_state_deltas()
+        )
+        for (room_id,) in rooms_to_update:
             current_state_ids_map = await self.db_pool.runInteraction(
                 "_sliding_sync_joined_rooms_bg_update._get_relevant_sliding_sync_current_state_event_ids_txn",
                 PersistEventsStore._get_relevant_sliding_sync_current_state_event_ids_txn,
                 room_id,
             )
-            # We're iterating over rooms pulled from the current_state_events table
-            # so we should have some current state for each room
-            assert current_state_ids_map
+
+            # If we're not joined to the room a) it doesn't belong in the
+            # `sliding_sync_joined_rooms` table so we should skip and b) we won't have
+            # any `current_state_events` for the room.
+            if not current_state_ids_map:
+                continue
 
             fetched_events = await self.get_events(current_state_ids_map.values())
 
             current_state_map: StateMap[EventBase] = {
                 state_key: fetched_events[event_id]
                 for state_key, event_id in current_state_ids_map.items()
+                # `get_events(...)` will filter out events for unknown room versions
                 if event_id in fetched_events
             }
 
-            # Can happen for old room versions.
+            # Even if we are joined to the room, this can happen for unknown room
+            # versions (old room versions that aren't known anymore) since
+            # `get_events(...)` will filter out events for unknown room versions
             if not current_state_map:
                 continue
@ -1628,10 +1718,11 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
"We should have at-least one event in the room (our own join membership event for example) " "We should have at-least one event in the room (our own join membership event for example) "
+ "that isn't backfilled (negative `stream_ordering`) if we are joined to the room." + "that isn't backfilled (negative `stream_ordering`) if we are joined to the room."
) )
# Figure out the latest bump_stamp in the room. This could be `None` for a # Figure out the latest `bump_stamp` in the room. This could be `None` for a
# federated room you just joined where all of events are still `outliers` or # federated room you just joined where all of events are still `outliers` or
# backfilled history. In the Sliding Sync API, we default to the user's # backfilled history. In the Sliding Sync API, we default to the user's
# membership event `stream_ordering` if we don't have a `bump_stamp`. # membership event `stream_ordering` if we don't have a `bump_stamp` so
# having it as `None` in this table is fine.
bump_stamp_event_pos_results = await self.get_last_event_pos_in_room( bump_stamp_event_pos_results = await self.get_last_event_pos_in_room(
room_id, event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES room_id, event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES
) )
@@ -1643,21 +1734,26 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
                 most_recent_bump_stamp = bump_stamp_event_pos_results[1].stream
 
             joined_room_stream_ordering_updates[room_id] = (
-                most_recent_event_stream_ordering,
-                most_recent_bump_stamp,
-                current_stream_id,
+                _JoinedRoomStreamOrderingUpdate(
+                    most_recent_event_stream_ordering=most_recent_event_stream_ordering,
+                    most_recent_bump_stamp=most_recent_bump_stamp,
+                )
             )
 
         def _fill_table_txn(txn: LoggingTransaction) -> None:
             # Handle updating the `sliding_sync_joined_rooms` table
             #
-            last_successful_room_id: Optional[str] = None
-            for room_id, insert_map in joined_room_updates.items():
-                (
-                    event_stream_ordering,
-                    bump_stamp,
-                    last_current_state_delta_stream_id,
-                ) = joined_room_stream_ordering_updates[room_id]
+            for (
+                room_id,
+                update_map,
+            ) in joined_room_updates.items():
+                joined_room_stream_ordering_update = (
+                    joined_room_stream_ordering_updates[room_id]
+                )
+                event_stream_ordering = (
+                    joined_room_stream_ordering_update.most_recent_event_stream_ordering
+                )
+                bump_stamp = joined_room_stream_ordering_update.most_recent_bump_stamp
 
                 # Check if the current state has been updated since we gathered it
                 state_deltas_since_we_gathered_current_state = (
@@ -1665,25 +1761,18 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
                         txn,
                         room_id,
                         from_token=RoomStreamToken(
-                            stream=last_current_state_delta_stream_id
+                            stream=most_recent_current_state_delta_stream_id
                         ),
                         to_token=None,
                     )
                 )
                 for state_delta in state_deltas_since_we_gathered_current_state:
                     # We only need to check if the state is relevant to the
                     # `sliding_sync_joined_rooms` table.
                     if (
                         state_delta.event_type,
                         state_delta.state_key,
                     ) in SLIDING_SYNC_RELEVANT_STATE_SET:
-                        # Save our progress before we exit early
-                        if last_successful_room_id is not None:
-                            self.db_pool.updates._background_update_progress_txn(
-                                txn,
-                                _BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE,
-                                {"last_room_id": room_id},
-                            )
-
                         # Raising exception so we can just exit and try again. It would
                         # be hard to resolve this within the transaction because we need
                         # to get full events out that take redactions into account. We
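
The shape of this check is a small optimistic-concurrency pattern: snapshot the maximum current-state-delta stream position before reading state, do the slow reads outside any lock, then abort the write transaction if a relevant delta landed after the snapshot. A condensed sketch (table and set names as used in the diff; `StateChangedError` and the helper are hypothetical):

class StateChangedError(Exception):
    """Relevant state changed while we were computing; retry the batch."""

def upsert_if_unchanged(txn, room_id: str, snapshot_stream_id: int) -> None:
    # Re-check inside the write transaction: did any state delta relevant to
    # `sliding_sync_joined_rooms` land after our pre-read snapshot?
    txn.execute(
        """
        SELECT type, state_key FROM current_state_delta_stream
        WHERE room_id = ? AND stream_id > ?
        """,
        (room_id, snapshot_stream_id),
    )
    for event_type, state_key in txn.fetchall():
        if (event_type, state_key) in SLIDING_SYNC_RELEVANT_STATE_SET:
            # Aborting rolls back the whole transaction; the caller retries.
            raise StateChangedError()
    ...  # safe to upsert the freshly computed row here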
@@ -1698,13 +1787,13 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
                 # Since we partially update the `sliding_sync_joined_rooms` as new state
                 # is sent, we need to update the state fields `ON CONFLICT`. We just
                 # have to be careful we're not overwriting it with stale data (see
-                # `last_current_state_delta_stream_id` check above).
+                # `most_recent_current_state_delta_stream_id` check above).
                 #
                 self.db_pool.simple_upsert_txn(
                     txn,
                     table="sliding_sync_joined_rooms",
                     keyvalues={"room_id": room_id},
-                    values=insert_map,
+                    values=update_map,
                     insertion_values={
                         # The reason we're only *inserting* (not *updating*) `event_stream_ordering`
                         # and `bump_stamp` is because if they are present, that means they are already
@@ -1714,19 +1803,23 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
                     },
                 )
 
-                # Keep track of the last successful room_id
-                last_successful_room_id = room_id
+            # Now that we've processed all the rooms, we can remove them from the
+            # queue.
+            #
+            # Note: we need to remove all the rooms from the queue we pulled out
+            # from the DB, not just the ones we've processed above. Otherwise
+            # we'll simply keep pulling out the same rooms over and over again.
+            self.db_pool.simple_delete_many_batch_txn(
+                txn,
+                table="sliding_sync_joined_rooms_to_recalculate",
+                keys=("room_id",),
+                values=rooms_to_update,
+            )
 
         await self.db_pool.runInteraction(
             "sliding_sync_joined_rooms_bg_update", _fill_table_txn
         )
 
-        # Update the progress
-        await self.db_pool.updates._background_update_progress(
-            _BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE,
-            {"last_room_id": rooms_to_update[-1]},
-        )
-
         return len(rooms_to_update)
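
Stepping back, this background update is now queue-driven rather than cursor-driven: each pass pops up to `batch_size` rooms, recalculates them, and deletes everything it popped, including rooms it skipped, so the queue always shrinks. A simplified sketch of that control flow (not the actual helper; `txn` is a DB-API cursor-like object):

def process_one_batch(txn, batch_size: int) -> int:
    txn.execute(
        "SELECT room_id FROM sliding_sync_joined_rooms_to_recalculate LIMIT ?",
        (batch_size,),
    )
    rows = txn.fetchall()
    if not rows:
        return 0  # queue drained: the background update can complete

    for (room_id,) in rows:
        ...  # recalculate and upsert the `sliding_sync_joined_rooms` row

    # Delete every fetched row, even ones skipped above (e.g. rooms we are no
    # longer joined to); otherwise the same rows would be fetched forever.
    txn.executemany(
        "DELETE FROM sliding_sync_joined_rooms_to_recalculate WHERE room_id = ?",
        rows,
    )
    return len(rows)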
@@ -1735,32 +1828,84 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
     async def _sliding_sync_membership_snapshots_bg_update(
         self, progress: JsonDict, batch_size: int
     ) -> int:
         """
         Background update to populate the `sliding_sync_membership_snapshots` table.
         """
-        last_event_stream_ordering = progress.get(
-            "last_event_stream_ordering", -(1 << 31)
-        )
+        # We do this in two phases: a) the initial phase where we go through all
+        # room memberships, and then b) a second phase where we look at new
+        # memberships (this is to handle the case where we downgrade and then
+        # upgrade again).
+        #
+        # We have to do this as two phases (rather than just the second phase
+        # where we iterate on `event_stream_ordering`), as the
+        # `event_stream_ordering` column may have null values for old rows.
+        # Therefore we first do the set of historic rooms and *then* look at any
+        # new rows (which will have a non-null `event_stream_ordering`).
+        initial_phase = progress.get("initial_phase")
+        if initial_phase is None:
+            # If this is the first run, store the current max stream position.
+            # We know we will go through all memberships less than the current
+            # max in the initial phase.
+            progress = {
+                "initial_phase": True,
+                "last_event_stream_ordering": self.get_room_max_stream_ordering(),
+            }
+            await self.db_pool.updates._background_update_progress(
+                _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE,
+                progress,
+            )
+            initial_phase = True
+
+        last_room_id = progress.get("last_room_id", "")
+        last_event_stream_ordering = progress["last_event_stream_ordering"]
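
The phase bookkeeping is easiest to see in isolation. Distilled into a standalone transition function (hypothetical, but following the progress-dict keys and the 7-tuple row shape used here):

from typing import Any, Dict, List, Optional, Tuple

def next_progress(
    progress: Dict[str, Any],
    batch: List[Tuple[Any, ...]],
    current_max_stream: int,
) -> Optional[Dict[str, Any]]:
    """Return the next progress dict, or None once both phases are done."""
    if progress.get("initial_phase") is None:
        # First run: pin the current max stream position as the phase-1
        # horizon; phase 2 only needs to replay rows above it.
        return {
            "initial_phase": True,
            "last_event_stream_ordering": current_max_stream,
        }
    if not batch:
        if progress["initial_phase"]:
            # Phase 1 (walk all rooms by room_id) is drained; switch to the
            # stream-ordered catch-up phase.
            return {
                "initial_phase": False,
                "last_event_stream_ordering": progress["last_event_stream_ordering"],
            }
        return None  # phase 2 drained too: end the background update
    # Otherwise record how far this batch got (the real update also tracks
    # `last_room_id` during the initial phase) and go around again.
    return {
        **progress,
        "last_event_stream_ordering": batch[-1][-2],  # stream-ordering column
    }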
         def _find_memberships_to_update_txn(
             txn: LoggingTransaction,
         ) -> List[Tuple[str, str, str, str, str, int, bool]]:
             # Fetch the set of event IDs that we want to update
-            txn.execute(
-                """
-                SELECT
-                    c.room_id,
-                    c.user_id,
-                    e.sender,
-                    c.event_id,
-                    c.membership,
-                    c.event_stream_ordering,
-                    e.outlier
-                FROM local_current_membership as c
-                INNER JOIN events AS e USING (event_id)
-                WHERE event_stream_ordering > ?
-                ORDER BY event_stream_ordering ASC
-                LIMIT ?
-                """,
-                (last_event_stream_ordering, batch_size),
-            )
+
+            if initial_phase:
+                txn.execute(
+                    """
+                    SELECT
+                        c.room_id,
+                        c.user_id,
+                        e.sender,
+                        c.event_id,
+                        c.membership,
+                        e.stream_ordering,
+                        e.outlier
+                    FROM local_current_membership as c
+                    INNER JOIN events AS e USING (event_id)
+                    WHERE c.room_id > ?
+                    ORDER BY c.room_id ASC
+                    LIMIT ?
+                    """,
+                    (last_room_id, batch_size),
+                )
+            elif last_event_stream_ordering is not None:
+                # It's important to sort by `event_stream_ordering` *ascending* (oldest to
+                # newest) so that if we see that this background update is in progress and
+                # want to start the catch-up process, we can safely assume that it will
+                # eventually get to the rooms we want to catch-up on anyway (see
+                # `_resolve_stale_data_in_sliding_sync_tables()`).
+                txn.execute(
+                    """
+                    SELECT
+                        c.room_id,
+                        c.user_id,
+                        e.sender,
+                        c.event_id,
+                        c.membership,
+                        c.event_stream_ordering,
+                        e.outlier
+                    FROM local_current_membership as c
+                    INNER JOIN events AS e USING (event_id)
+                    WHERE event_stream_ordering > ?
+                    ORDER BY event_stream_ordering ASC
+                    LIMIT ?
+                    """,
+                    (last_event_stream_ordering, batch_size),
+                )
+            else:
+                raise Exception("last_event_stream_ordering should not be None")
 
             memberships_to_update_rows = cast(
                 List[Tuple[str, str, str, str, str, int, bool]], txn.fetchall()
@@ -1774,39 +1919,45 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
         )
 
         if not memberships_to_update_rows:
-            await self.db_pool.updates._end_background_update(
-                _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE
-            )
-            return 0
+            if initial_phase:
+                # Move onto the next phase.
+                await self.db_pool.updates._background_update_progress(
+                    _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE,
+                    {
+                        "initial_phase": False,
+                        "last_event_stream_ordering": last_event_stream_ordering,
+                    },
+                )
+                return 0
+            else:
+                # We've finished both phases, we're done.
+                await self.db_pool.updates._end_background_update(
+                    _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE
+                )
+                return 0
         def _find_previous_membership_txn(
-            txn: LoggingTransaction, room_id: str, user_id: str, stream_ordering: int
+            txn: LoggingTransaction, event_id: str, user_id: str
         ) -> Tuple[str, str]:
-            # Find the previous invite/knock event before the leave event
+            # Find the previous invite/knock event before the leave event. This
+            # is done by looking at the auth events of the invite/knock and
+            # finding the corresponding membership event.
             txn.execute(
                 """
-                SELECT event_id, membership
-                FROM room_memberships
-                WHERE
-                    room_id = ?
-                    AND user_id = ?
-                    AND event_stream_ordering < ?
-                ORDER BY event_stream_ordering DESC
-                LIMIT 1
+                SELECT m.event_id, m.membership
+                FROM event_auth AS a
+                INNER JOIN room_memberships AS m ON (a.auth_id = m.event_id)
+                WHERE a.event_id = ? AND m.user_id = ?
                 """,
-                (
-                    room_id,
-                    user_id,
-                    stream_ordering,
-                ),
+                (event_id, user_id),
            )
            row = txn.fetchone()
 
            # We should see a corresponding previous invite/knock event
            assert row is not None
-            event_id, membership = row
+            previous_event_id, membership = row
 
-            return event_id, membership
+            return previous_event_id, membership
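
The rewritten lookup no longer scans `room_memberships` backwards by stream ordering: a leave/kick event must cite the membership it supersedes (the invite/knock) among its auth events, so joining through `event_auth` lands on it directly. A self-contained toy illustration of that join (simplified schema, illustrative IDs):

import sqlite3

db = sqlite3.connect(":memory:")
db.executescript(
    """
    CREATE TABLE room_memberships (event_id TEXT, user_id TEXT, membership TEXT);
    CREATE TABLE event_auth (event_id TEXT, auth_id TEXT);
    INSERT INTO room_memberships VALUES ('$invite', '@alice:x', 'invite');
    INSERT INTO room_memberships VALUES ('$leave', '@alice:x', 'leave');
    -- The leave event cites the invite in its auth chain:
    INSERT INTO event_auth VALUES ('$leave', '$invite');
    """
)
row = db.execute(
    """
    SELECT m.event_id, m.membership
    FROM event_auth AS a
    INNER JOIN room_memberships AS m ON (a.auth_id = m.event_id)
    WHERE a.event_id = ? AND m.user_id = ?
    """,
    ("$leave", "@alice:x"),
).fetchone()
print(row)  # ('$invite', 'invite')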
         # Map from (room_id, user_id) to ...
         to_insert_membership_snapshots: Dict[
@@ -1862,8 +2013,16 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
             current_state_map: StateMap[EventBase] = {
                 state_key: fetched_events[event_id]
                 for state_key, event_id in current_state_ids_map.items()
+                # `get_events(...)` will filter out events for unknown room versions
+                if event_id in fetched_events
             }
+
+            # Can happen for unknown room versions (old room versions that aren't known
+            # anymore) since `get_events(...)` will filter out events for unknown room
+            # versions
+            if not current_state_map:
+                continue
+
             state_insert_values = (
                 PersistEventsStore._get_sliding_sync_insert_values_from_state_map(
                     current_state_map
@@ -1892,9 +2051,8 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
                     await self.db_pool.runInteraction(
                         "sliding_sync_membership_snapshots_bg_update._find_previous_membership",
                         _find_previous_membership_txn,
-                        room_id,
+                        membership_event_id,
                         user_id,
-                        membership_event_stream_ordering,
                     )
                 )
@@ -1939,8 +2097,16 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
             state_map: StateMap[EventBase] = {
                 state_key: fetched_events[event_id]
                 for state_key, event_id in state_ids_map.items()
+                # `get_events(...)` will filter out events for unknown room versions
+                if event_id in fetched_events
             }
+
+            # Can happen for unknown room versions (old room versions that aren't known
+            # anymore) since `get_events(...)` will filter out events for unknown room
+            # versions
+            if not state_map:
+                continue
+
             state_insert_values = (
                 PersistEventsStore._get_sliding_sync_insert_values_from_state_map(
                     state_map
@@ -2029,7 +2195,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
         # Update the progress
         (
-            _room_id,
+            room_id,
             _user_id,
             _sender,
             _membership_event_id,
@@ -2037,9 +2203,271 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
             membership_event_stream_ordering,
             _is_outlier,
         ) = memberships_to_update_rows[-1]
+        progress = {
+            "initial_phase": initial_phase,
+            "last_room_id": room_id,
+            "last_event_stream_ordering": membership_event_stream_ordering,
+        }
         await self.db_pool.updates._background_update_progress(
             _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE,
-            {"last_event_stream_ordering": membership_event_stream_ordering},
+            progress,
         )
 
         return len(memberships_to_update_rows)
def _resolve_stale_data_in_sliding_sync_tables(
txn: LoggingTransaction,
) -> None:
"""
Clears stale/out-of-date entries from the
`sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables.
This accounts for when someone downgrades their Synapse version and then upgrades it
again. This will ensure that we don't have any stale/out-of-date data in the
`sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables since any new
events sent in rooms would have also needed to be written to the sliding sync
    tables. For example, a new event needs to bump `event_stream_ordering` in the
    `sliding_sync_joined_rooms` table, as does some state in the room changing (like
    the room name); similarly, someone's membership changing in a room needs to be
    reflected in `sliding_sync_membership_snapshots`.
This way, if a row exists in the sliding sync tables, we are able to rely on it
(accurate data). And if a row doesn't exist, we use a fallback to get the same info
until the background updates fill in the rows or a new event comes in triggering it
to be fully inserted.
FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the
foreground update for
`sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by
https://github.com/element-hq/synapse/issues/TODO)
"""
_resolve_stale_data_in_sliding_sync_joined_rooms_table(txn)
_resolve_stale_data_in_sliding_sync_membership_snapshots_table(txn)
def _resolve_stale_data_in_sliding_sync_joined_rooms_table(
txn: LoggingTransaction,
) -> None:
"""
Clears stale/out-of-date entries from the `sliding_sync_joined_rooms` table and
kicks-off the background update to catch-up with what we missed while Synapse was
downgraded.
See `_resolve_stale_data_in_sliding_sync_tables()` description above for more
context.
"""
# Find the point when we stopped writing to the `sliding_sync_joined_rooms` table
txn.execute(
"""
SELECT event_stream_ordering
FROM sliding_sync_joined_rooms
ORDER BY event_stream_ordering DESC
LIMIT 1
""",
)
# If we have nothing written to the `sliding_sync_joined_rooms` table, there is
# nothing to clean up
row = cast(Optional[Tuple[int]], txn.fetchone())
max_stream_ordering_sliding_sync_joined_rooms_table = None
depends_on = None
if row is not None:
(max_stream_ordering_sliding_sync_joined_rooms_table,) = row
txn.execute(
"""
SELECT room_id
FROM events
WHERE stream_ordering > ?
GROUP BY room_id
ORDER BY MAX(stream_ordering) ASC
""",
(max_stream_ordering_sliding_sync_joined_rooms_table,),
)
room_rows = txn.fetchall()
# No new events have been written to the `events` table since the last time we wrote
# to the `sliding_sync_joined_rooms` table so there is nothing to clean up. This is
# the expected normal scenario for people who have not downgraded their Synapse
# version.
if not room_rows:
return
# 1000 is an arbitrary batch size with no testing
for chunk in batch_iter(room_rows, 1000):
# Handle updating the `sliding_sync_joined_rooms` table
#
# Clear out the stale data
DatabasePool.simple_delete_many_batch_txn(
txn,
table="sliding_sync_joined_rooms",
keys=("room_id",),
values=chunk,
)
# Update the `sliding_sync_joined_rooms_to_recalculate` table with the rooms
# that went stale and now need to be recalculated.
DatabasePool.simple_upsert_many_txn_native_upsert(
txn,
table="sliding_sync_joined_rooms_to_recalculate",
key_names=("room_id",),
key_values=chunk,
value_names=(),
# No value columns, therefore make a blank list so that the following
# zip() works correctly.
value_values=[() for x in range(len(chunk))],
)
else:
txn.execute("SELECT 1 FROM local_current_membership LIMIT 1")
row = txn.fetchone()
if row is None:
# There are no rooms, so don't schedule the bg update.
return
# Re-run the `sliding_sync_joined_rooms_to_recalculate` prefill if there is
# nothing in the `sliding_sync_joined_rooms` table
DatabasePool.simple_upsert_txn_native_upsert(
txn,
table="background_updates",
keyvalues={
"update_name": _BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE
},
values={},
# Only insert the row if it doesn't already exist. If it already exists,
# we're already working on it
insertion_values={
"progress_json": "{}",
},
)
depends_on = (
_BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE
)
# Now kick-off the background update to catch-up with what we missed while Synapse
# was downgraded.
#
# We may need to catch-up on everything if we have nothing written to the
    # `sliding_sync_joined_rooms` table yet. This could happen if someone had zero rooms
    # on their server (so the normal background update completed), downgraded Synapse,
    # joined and created some new rooms, and upgraded again.
DatabasePool.simple_upsert_txn_native_upsert(
txn,
table="background_updates",
keyvalues={
"update_name": _BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE
},
values={},
# Only insert the row if it doesn't already exist. If it already exists, we will
# eventually fill in the rows we're trying to populate.
insertion_values={
# Empty progress is expected since it's not used for this background update.
"progress_json": "{}",
# Wait for the prefill to finish
"depends_on": depends_on,
},
)
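
Both resolution paths schedule follow-up work by writing rows directly into the `background_updates` table (a job normally done by schema deltas), using the insert-only form of the upsert so an already-scheduled update keeps its progress. Condensed into a hypothetical helper:

from typing import Optional

from synapse.storage.database import DatabasePool, LoggingTransaction

def schedule_background_update(
    txn: LoggingTransaction, update_name: str, depends_on: Optional[str] = None
) -> None:
    DatabasePool.simple_upsert_txn_native_upsert(
        txn,
        table="background_updates",
        keyvalues={"update_name": update_name},
        # Nothing to overwrite on conflict: an existing row means the update
        # is already scheduled (or running) and must keep its progress.
        values={},
        insertion_values={
            "progress_json": "{}",
            # The runner holds this update until `depends_on` has completed,
            # e.g. waiting for the queue prefill before recalculating rooms.
            "depends_on": depends_on,
        },
    )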
def _resolve_stale_data_in_sliding_sync_membership_snapshots_table(
txn: LoggingTransaction,
) -> None:
"""
Clears stale/out-of-date entries from the `sliding_sync_membership_snapshots` table
and kicks-off the background update to catch-up with what we missed while Synapse
was downgraded.
See `_resolve_stale_data_in_sliding_sync_tables()` description above for more
context.
"""
# Find the point when we stopped writing to the `sliding_sync_membership_snapshots` table
txn.execute(
"""
SELECT event_stream_ordering
FROM sliding_sync_membership_snapshots
ORDER BY event_stream_ordering DESC
LIMIT 1
""",
)
# If we have nothing written to the `sliding_sync_membership_snapshots` table,
# there is nothing to clean up
row = cast(Optional[Tuple[int]], txn.fetchone())
max_stream_ordering_sliding_sync_membership_snapshots_table = None
if row is not None:
(max_stream_ordering_sliding_sync_membership_snapshots_table,) = row
# XXX: Since `forgotten` is simply a flag on the `room_memberships` table that is
# set out-of-band, there is no way to tell whether it was set while Synapse was
# downgraded. The only thing the user can do is `/forget` again if they run into
# this.
#
# This only picks up changes to memberships.
txn.execute(
"""
SELECT user_id, room_id
FROM local_current_membership
WHERE event_stream_ordering > ?
ORDER BY event_stream_ordering ASC
""",
(max_stream_ordering_sliding_sync_membership_snapshots_table,),
)
membership_rows = txn.fetchall()
# No new events have been written to the `events` table since the last time we wrote
# to the `sliding_sync_membership_snapshots` table so there is nothing to clean up.
# This is the expected normal scenario for people who have not downgraded their
# Synapse version.
if not membership_rows:
return
# 1000 is an arbitrary batch size with no testing
for chunk in batch_iter(membership_rows, 1000):
# Handle updating the `sliding_sync_membership_snapshots` table
#
DatabasePool.simple_delete_many_batch_txn(
txn,
table="sliding_sync_membership_snapshots",
keys=("user_id", "room_id"),
values=chunk,
)
else:
txn.execute("SELECT 1 FROM local_current_membership LIMIT 1")
row = txn.fetchone()
if row is None:
# There are no rooms, so don't schedule the bg update.
return
# Now kick-off the background update to catch-up with what we missed while Synapse
# was downgraded.
#
# We may need to catch-up on everything if we have nothing written to the
    # `sliding_sync_membership_snapshots` table yet. This could happen if someone had
    # zero rooms on their server (so the normal background update completed), downgraded
    # Synapse, joined and created some new rooms, and upgraded again.
#
progress_json: JsonDict = {}
if max_stream_ordering_sliding_sync_membership_snapshots_table is not None:
progress_json["initial_phase"] = False
progress_json["last_event_stream_ordering"] = (
max_stream_ordering_sliding_sync_membership_snapshots_table
)
DatabasePool.simple_upsert_txn_native_upsert(
txn,
table="background_updates",
keyvalues={
"update_name": _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE
},
values={},
# Only insert the row if it doesn't already exist. If it already exists, we will
# eventually fill in the rows we're trying to populate.
insertion_values={
"progress_json": json_encoder.encode(progress_json),
},
)
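
Both cleanup paths chunk their deletes with `batch_iter` so no single statement grows unboundedly large. Its behaviour is easy to pin down with a stand-in (this mirrors the real `synapse.util.iterutils.batch_iter`):

from itertools import islice
from typing import Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")

def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
    # Yield successive tuples of at most `size` items until exhausted.
    sourceiter = iter(iterable)
    return iter(lambda: tuple(islice(sourceiter, size)), ())

rows = [("@alice:x", "!r1:x"), ("@bob:x", "!r2:x"), ("@carol:x", "!r3:x")]
print(list(batch_iter(rows, 2)))
# [(('@alice:x', '!r1:x'), ('@bob:x', '!r2:x')), (('@carol:x', '!r3:x'),)]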

View file

@@ -457,6 +457,8 @@ class EventsWorkerStore(SQLBaseStore):
     ) -> Optional[EventBase]:
         """Get an event from the database by event_id.
 
+        Events for unknown room versions will also be filtered out.
+
         Args:
             event_id: The event_id of the event to fetch
@@ -513,6 +515,8 @@ class EventsWorkerStore(SQLBaseStore):
         Unknown events will be omitted from the response.
 
+        Events for unknown room versions will also be filtered out.
+
         Args:
             event_ids: The event_ids of the events to fetch
@@ -555,6 +559,8 @@ class EventsWorkerStore(SQLBaseStore):
         Unknown events will be omitted from the response.
 
+        Events for unknown room versions will also be filtered out.
+
         Args:
             event_ids: The event_ids of the events to fetch

View file

@@ -11,6 +11,16 @@
 -- See the GNU Affero General Public License for more details:
 -- <https://www.gnu.org/licenses/agpl-3.0.html>.
 
+-- This table is a list/queue used to keep track of which rooms need to be inserted into
+-- `sliding_sync_joined_rooms`. We do this to avoid reading from `current_state_events`
+-- during the background update to populate `sliding_sync_joined_rooms` which works but
+-- it takes a lot of work for the database to grab `DISTINCT` room_ids given how many
+-- state events there are for each room.
+CREATE TABLE IF NOT EXISTS sliding_sync_joined_rooms_to_recalculate(
+    room_id TEXT NOT NULL REFERENCES rooms(room_id),
+    PRIMARY KEY (room_id)
+);
+
 -- A table for storing room meta data (current state relevant to sliding sync) that the
 -- local server is still participating in (someone local is joined to the room).
 --
@@ -127,8 +137,17 @@ CREATE INDEX IF NOT EXISTS sliding_sync_membership_snapshots_user_id ON sliding_
 CREATE UNIQUE INDEX IF NOT EXISTS sliding_sync_membership_snapshots_event_stream_ordering ON sliding_sync_membership_snapshots(event_stream_ordering);
 
--- Add some background updates to populate the new tables
+-- Add a series of background updates to populate the new `sliding_sync_joined_rooms` table:
+--
+-- 1. Add a background update to prefill `sliding_sync_joined_rooms_to_recalculate`.
+--    We do a one-shot bulk insert from the `rooms` table to prefill.
+-- 2. Add a background update to populate the new `sliding_sync_joined_rooms` table
+--
 INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
-    (8701, 'sliding_sync_joined_rooms_bg_update', '{}');
+    (8701, 'sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update', '{}');
+INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
+    (8701, 'sliding_sync_joined_rooms_bg_update', '{}', 'sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update');
 
+-- Add a background update to populate the new `sliding_sync_membership_snapshots` table
 INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
     (8701, 'sliding_sync_membership_snapshots_bg_update', '{}');

View file

@@ -112,6 +112,24 @@ class UpdateUpsertManyTests(unittest.HomeserverTestCase):
             {(1, "user1", "hello"), (2, "user2", "bleb")},
         )
 
+        self.get_success(
+            self.storage.db_pool.runInteraction(
+                "test",
+                self.storage.db_pool.simple_upsert_many_txn,
+                self.table_name,
+                key_names=key_names,
+                key_values=[[2, "user2"]],
+                value_names=[],
+                value_values=[],
+            )
+        )
+
+        # Check results are what we expect
+        self.assertEqual(
+            set(self._dump_table_to_tuple()),
+            {(1, "user1", "hello"), (2, "user2", "bleb")},
+        )
+
     def test_simple_update_many(self) -> None:
         """
         simple_update_many performs many updates at once.

File diff suppressed because it is too large.