Add history visibility index table

This commit is contained in:
Erik Johnston 2024-09-15 10:13:00 +01:00
parent a5e16a4ab5
commit 35d797a9c4
4 changed files with 130 additions and 1 deletions

View file

@ -49,7 +49,7 @@ from prometheus_client import Counter, Histogram
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.api.constants import EventTypes, HistoryVisibility, Membership
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
@ -635,6 +635,44 @@ class EventsPersistenceStorageController:
room_id, [e for e, _ in chunk]
)
visibilities: Dict[str, str] = {}
with Measure(self._clock, "calculate_history_vis"):
# TODO: We only need to do this on changes, rather than looking
# up the state for every event
for event, context in events_and_contexts:
if (
backfilled
or event.internal_metadata.is_outlier()
or context.rejected
):
continue
state = await context.get_current_state_ids(
StateFilter.from_types([(EventTypes.RoomHistoryVisibility, "")])
)
# We're not an outlier
assert state is not None
history_visibility = HistoryVisibility.SHARED
history_visibility_event_id = state.get(
(EventTypes.RoomHistoryVisibility, "")
)
if history_visibility_event_id:
for event, _ in events_and_contexts:
if event.event_id == history_visibility_event_id:
history_visibility_event = event
break
else:
history_visibility_event = await self.main_store.get_event(
history_visibility_event_id,
get_prev_content=False,
)
history_visibility = history_visibility_event.content.get(
"history_visibility", HistoryVisibility.SHARED
)
visibilities[event.event_id] = history_visibility
await self.persist_events_store._persist_events_and_state_updates(
room_id,
chunk,
@ -643,6 +681,7 @@ class EventsPersistenceStorageController:
use_negative_stream_ordering=backfilled,
inhibit_local_membership_updates=backfilled,
new_event_links=new_event_links,
visibilities=visibilities,
)
return replaced_events

View file

@ -31,6 +31,7 @@ from typing import (
Generator,
Iterable,
List,
Mapping,
Optional,
Sequence,
Set,
@ -271,6 +272,7 @@ class PersistEventsStore:
new_event_links: Dict[str, NewEventChainLinks],
use_negative_stream_ordering: bool = False,
inhibit_local_membership_updates: bool = False,
visibilities: Mapping[str, str] = {},
) -> None:
"""Persist a set of events alongside updates to the current state and
forward extremities tables.
@ -355,6 +357,7 @@ class PersistEventsStore:
new_forward_extremities=new_forward_extremities,
new_event_links=new_event_links,
sliding_sync_table_changes=sliding_sync_table_changes,
visibilities=visibilities,
)
persist_event_counter.inc(len(events_and_contexts))
@ -874,6 +877,7 @@ class PersistEventsStore:
new_forward_extremities: Optional[Set[str]],
new_event_links: Dict[str, NewEventChainLinks],
sliding_sync_table_changes: Optional[SlidingSyncTableChanges],
visibilities: Mapping[str, str] = {},
) -> None:
"""Insert some number of room events into the necessary database tables.
@ -1027,6 +1031,52 @@ class PersistEventsStore:
txn, room_id, events_and_contexts
)
changes = [
(visibilities[event.event_id], event.internal_metadata.stream_ordering)
for event, context in events_and_contexts
if event.event_id in visibilities
]
if changes:
sql = """
SELECT visibility, start_range FROM history_visibility_ranges
WHERE room_id = ?
ORDER BY start_range DESC
LIMIT 1
"""
txn.execute(sql, (room_id,))
row = txn.fetchone()
prev_visibility = None
start_range = None
if row:
(
prev_visibility,
start_range,
) = row
for new_visibility, stream_ordering in changes:
assert stream_ordering is not None
if new_visibility != prev_visibility:
if start_range is not None:
self.db_pool.simple_update_one_txn(
txn,
table="history_visibility_ranges",
keyvalues={"room_id": room_id, "start_range": start_range},
updatevalues={"end_range": stream_ordering},
)
self.db_pool.simple_insert_txn(
txn,
table="history_visibility_ranges",
values={
"room_id": room_id,
"visibility": new_visibility,
"start_range": stream_ordering,
"end_range": None,
},
)
prev_visibility = new_visibility
start_range = stream_ordering
def _persist_event_auth_chain_txn(
self,
txn: LoggingTransaction,

View file

@ -0,0 +1,25 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2024 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
CREATE TABLE IF NOT EXISTS history_visibility_ranges (
room_id TEXT NOT NULL,
visibility TEXT NOT NULL,
start_range BIGINT NOT NULL,
end_range BIGINT
);
CREATE INDEX history_visibility_ranges_idx ON history_visibility_ranges(room_id, start_range, end_range DESC);
CREATE UNIQUE INDEX history_visibility_ranges_uniq_idx ON history_visibility_ranges(room_id, start_range);
-- CREATE EXTENSION IF NOT EXISTS btree_gist;
-- CREATE INDEX history_visibility_ranges_idx_gist ON history_visibility_ranges USING gist(room_id, int8range(start_range, end_range, '[)]'));

View file

@ -0,0 +1,15 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2024 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
CREATE EXTENSION IF NOT EXISTS btree_gist;
CREATE INDEX history_visibility_ranges_idx_gist ON history_visibility_ranges USING gist(room_id, int8range(start_range, end_range, '[)'));