Update event push action and receipt tables to support threads. (#13753)

Adds a `thread_id` column to the `event_push_actions`, `event_push_actions_staging`,
and `event_push_summary` tables. This will notifications to be segmented by the thread
in a future pull request. The `thread_id` column stores the root event ID or the special
value `"main"`.

The `thread_id` column for `event_push_actions` and `event_push_summary` is
backfilled with `"main"` for all existing rows. New entries into `event_push_actions`
and `event_push_actions_staging` will get the proper thread ID.

`receipts_linearized` and `receipts_graph` also gain a `thread_id` column, which is similar,
except `NULL` is a special value meaning the receipt is "unthreaded".

See MSC3771 and MSC3773 for where this data will be useful.
This commit is contained in:
Patrick Cloke 2022-09-14 13:11:16 -04:00 committed by GitHub
parent f2d12ccabe
commit 666ae87729
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 312 additions and 20 deletions

1
changelog.d/13753.misc Normal file
View file

@ -0,0 +1 @@
Prepatory work for storing thread IDs for notifications and receipts.

View file

@ -198,7 +198,7 @@ class BulkPushRuleEvaluator:
return pl_event.content if pl_event else {}, sender_level
async def _get_mutual_relations(
self, event: EventBase, rules: Iterable[Tuple[PushRule, bool]]
self, parent_id: str, rules: Iterable[Tuple[PushRule, bool]]
) -> Dict[str, Set[Tuple[str, str]]]:
"""
Fetch event metadata for events which related to the same event as the given event.
@ -206,7 +206,7 @@ class BulkPushRuleEvaluator:
If the given event has no relation information, returns an empty dictionary.
Args:
event_id: The event ID which is targeted by relations.
parent_id: The event ID which is targeted by relations.
rules: The push rules which will be processed for this event.
Returns:
@ -220,12 +220,6 @@ class BulkPushRuleEvaluator:
if not self._relations_match_enabled:
return {}
# If the event does not have a relation, then cannot have any mutual
# relations.
relation = relation_from_event(event)
if not relation:
return {}
# Pre-filter to figure out which relation types are interesting.
rel_types = set()
for rule, enabled in rules:
@ -246,9 +240,7 @@ class BulkPushRuleEvaluator:
return {}
# If any valid rules were found, fetch the mutual relations.
return await self.store.get_mutual_event_relations(
relation.parent_id, rel_types
)
return await self.store.get_mutual_event_relations(parent_id, rel_types)
@measure_func("action_for_event_by_user")
async def action_for_event_by_user(
@ -281,9 +273,17 @@ class BulkPushRuleEvaluator:
sender_power_level,
) = await self._get_power_levels_and_sender_level(event, context)
relation = relation_from_event(event)
# If the event does not have a relation, then cannot have any mutual
# relations or thread ID.
relations = {}
thread_id = "main"
if relation:
relations = await self._get_mutual_relations(
event, itertools.chain(*rules_by_user.values())
relation.parent_id, itertools.chain(*rules_by_user.values())
)
if relation.rel_type == RelationTypes.THREAD:
thread_id = relation.parent_id
evaluator = PushRuleEvaluatorForEvent(
event,
@ -352,6 +352,7 @@ class BulkPushRuleEvaluator:
event.event_id,
actions_by_user,
count_as_unread,
thread_id,
)

View file

@ -98,6 +98,7 @@ from synapse.storage.database import (
)
from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
@ -232,6 +233,104 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
replaces_index="event_push_summary_user_rm",
)
self.db_pool.updates.register_background_index_update(
"event_push_summary_unique_index2",
index_name="event_push_summary_unique_index2",
table="event_push_summary",
columns=["user_id", "room_id", "thread_id"],
unique=True,
)
self.db_pool.updates.register_background_update_handler(
"event_push_backfill_thread_id",
self._background_backfill_thread_id,
)
async def _background_backfill_thread_id(
self, progress: JsonDict, batch_size: int
) -> int:
"""
Fill in the thread_id field for event_push_actions and event_push_summary.
This is preparatory so that it can be made non-nullable in the future.
Because all current (null) data is done in an unthreaded manner this
simply assumes it is on the "main" timeline. Since event_push_actions
are periodically cleared it is not possible to correctly re-calculate
the thread_id.
"""
event_push_actions_done = progress.get("event_push_actions_done", False)
def add_thread_id_txn(
txn: LoggingTransaction, table_name: str, start_stream_ordering: int
) -> int:
sql = f"""
SELECT stream_ordering
FROM {table_name}
WHERE
thread_id IS NULL
AND stream_ordering > ?
ORDER BY stream_ordering
LIMIT ?
"""
txn.execute(sql, (start_stream_ordering, batch_size))
# No more rows to process.
rows = txn.fetchall()
if not rows:
progress[f"{table_name}_done"] = True
self.db_pool.updates._background_update_progress_txn(
txn, "event_push_backfill_thread_id", progress
)
return 0
# Update the thread ID for any of those rows.
max_stream_ordering = rows[-1][0]
sql = f"""
UPDATE {table_name}
SET thread_id = 'main'
WHERE stream_ordering <= ? AND thread_id IS NULL
"""
txn.execute(sql, (max_stream_ordering,))
# Update progress.
processed_rows = txn.rowcount
progress[f"max_{table_name}_stream_ordering"] = max_stream_ordering
self.db_pool.updates._background_update_progress_txn(
txn, "event_push_backfill_thread_id", progress
)
return processed_rows
# First update the event_push_actions table, then the event_push_summary table.
#
# Note that the event_push_actions_staging table is ignored since it is
# assumed that items in that table will only exist for a short period of
# time.
if not event_push_actions_done:
result = await self.db_pool.runInteraction(
"event_push_backfill_thread_id",
add_thread_id_txn,
"event_push_actions",
progress.get("max_event_push_actions_stream_ordering", 0),
)
else:
result = await self.db_pool.runInteraction(
"event_push_backfill_thread_id",
add_thread_id_txn,
"event_push_summary",
progress.get("max_event_push_summary_stream_ordering", 0),
)
# Only done after the event_push_summary table is done.
if not result:
await self.db_pool.updates._end_background_update(
"event_push_backfill_thread_id"
)
return result
@cached(tree=True, max_entries=5000)
async def get_unread_event_push_actions_by_room_for_user(
self,
@ -670,6 +769,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
event_id: str,
user_id_actions: Dict[str, Collection[Union[Mapping, str]]],
count_as_unread: bool,
thread_id: str,
) -> None:
"""Add the push actions for the event to the push action staging area.
@ -678,6 +778,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
user_id_actions: A mapping of user_id to list of push actions, where
an action can either be a string or dict.
count_as_unread: Whether this event should increment unread counts.
thread_id: The thread this event is parent of, if applicable.
"""
if not user_id_actions:
return
@ -686,7 +787,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
# can be used to insert into the `event_push_actions_staging` table.
def _gen_entry(
user_id: str, actions: Collection[Union[Mapping, str]]
) -> Tuple[str, str, str, int, int, int]:
) -> Tuple[str, str, str, int, int, int, str]:
is_highlight = 1 if _action_has_highlight(actions) else 0
notif = 1 if "notify" in actions else 0
return (
@ -696,11 +797,20 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
notif, # notif column
is_highlight, # highlight column
int(count_as_unread), # unread column
thread_id, # thread_id column
)
await self.db_pool.simple_insert_many(
"event_push_actions_staging",
keys=("event_id", "user_id", "actions", "notif", "highlight", "unread"),
keys=(
"event_id",
"user_id",
"actions",
"notif",
"highlight",
"unread",
"thread_id",
),
values=[
_gen_entry(user_id, actions)
for user_id, actions in user_id_actions.items()
@ -981,6 +1091,8 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
)
# Replace the previous summary with the new counts.
#
# TODO(threads): Upsert per-thread instead of setting them all to main.
self.db_pool.simple_upsert_txn(
txn,
table="event_push_summary",
@ -990,6 +1102,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
"unread_count": unread_count,
"stream_ordering": old_rotate_stream_ordering,
"last_receipt_stream_ordering": stream_ordering,
"thread_id": "main",
},
)
@ -1138,17 +1251,19 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
logger.info("Rotating notifications, handling %d rows", len(summaries))
# TODO(threads): Update on a per-thread basis.
self.db_pool.simple_upsert_many_txn(
txn,
table="event_push_summary",
key_names=("user_id", "room_id"),
key_values=[(user_id, room_id) for user_id, room_id in summaries],
value_names=("notif_count", "unread_count", "stream_ordering"),
value_names=("notif_count", "unread_count", "stream_ordering", "thread_id"),
value_values=[
(
summary.notif_count,
summary.unread_count,
summary.stream_ordering,
"main",
)
for summary in summaries.values()
],

View file

@ -2192,9 +2192,9 @@ class PersistEventsStore:
sql = """
INSERT INTO event_push_actions (
room_id, event_id, user_id, actions, stream_ordering,
topological_ordering, notif, highlight, unread
topological_ordering, notif, highlight, unread, thread_id
)
SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight, unread
SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight, unread, thread_id
FROM event_push_actions_staging
WHERE event_id = ?
"""

View file

@ -113,6 +113,24 @@ class ReceiptsWorkerStore(SQLBaseStore):
prefilled_cache=receipts_stream_prefill,
)
self.db_pool.updates.register_background_index_update(
"receipts_linearized_unique_index",
index_name="receipts_linearized_unique_index",
table="receipts_linearized",
columns=["room_id", "receipt_type", "user_id"],
where_clause="thread_id IS NULL",
unique=True,
)
self.db_pool.updates.register_background_index_update(
"receipts_graph_unique_index",
index_name="receipts_graph_unique_index",
table="receipts_graph",
columns=["room_id", "receipt_type", "user_id"],
where_clause="thread_id IS NULL",
unique=True,
)
def get_max_receipt_stream_id(self) -> int:
"""Get the current max stream ID for receipts stream"""
return self._receipts_id_gen.get_current_token()
@ -677,6 +695,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
"event_id": event_id,
"event_stream_ordering": stream_ordering,
"data": json_encoder.encode(data),
"thread_id": None,
},
# receipts_linearized has a unique constraint on
# (user_id, room_id, receipt_type), so no need to lock
@ -824,6 +843,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
values={
"event_ids": json_encoder.encode(event_ids),
"data": json_encoder.encode(data),
"thread_id": None,
},
# receipts_graph has a unique constraint on
# (user_id, room_id, receipt_type), so no need to lock

View file

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
SCHEMA_VERSION = 72 # remember to update the list below when updating
SCHEMA_VERSION = 73 # remember to update the list below when updating
"""Represents the expectations made by the codebase about the database schema
This should be incremented whenever the codebase changes its requirements on the
@ -77,6 +77,10 @@ Changes in SCHEMA_VERSION = 72:
- Tables related to groups are dropped.
- Unused column application_services_state.last_txn is dropped
- Cache invalidation stream id sequence now begins at 2 to match code expectation.
Changes in SCHEMA_VERSION = 73;
- thread_id column is added to event_push_actions, event_push_actions_staging
event_push_summary, receipts_linearized, and receipts_graph.
"""

View file

@ -0,0 +1,30 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Add a nullable column for thread ID to the event push actions tables; this
-- will be filled in with a default value for any previously existing rows.
--
-- After migration this can be made non-nullable.
ALTER TABLE event_push_actions_staging ADD COLUMN thread_id TEXT;
ALTER TABLE event_push_actions ADD COLUMN thread_id TEXT;
ALTER TABLE event_push_summary ADD COLUMN thread_id TEXT;
-- Update the unique index for `event_push_summary`.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(7006, 'event_push_summary_unique_index2', '{}');
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
(7006, 'event_push_backfill_thread_id', '{}', 'event_push_summary_unique_index2');

View file

@ -0,0 +1,30 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Add a nullable column for thread ID to the receipts table; this allows a
-- receipt per user, per room, as well as an unthreaded receipt (corresponding
-- to a null thread ID).
ALTER TABLE receipts_linearized ADD COLUMN thread_id TEXT;
ALTER TABLE receipts_graph ADD COLUMN thread_id TEXT;
-- Rebuild the unique constraint with the thread_id.
ALTER TABLE receipts_linearized
ADD CONSTRAINT receipts_linearized_uniqueness_thread
UNIQUE (room_id, receipt_type, user_id, thread_id);
ALTER TABLE receipts_graph
ADD CONSTRAINT receipts_graph_uniqueness_thread
UNIQUE (room_id, receipt_type, user_id, thread_id);

View file

@ -0,0 +1,70 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Allow multiple receipts per user per room via a nullable thread_id column.
--
-- SQLite doesn't support modifying constraints to an existing table, so it must
-- be recreated.
-- Create the new tables.
CREATE TABLE receipts_linearized_new (
stream_id BIGINT NOT NULL,
room_id TEXT NOT NULL,
receipt_type TEXT NOT NULL,
user_id TEXT NOT NULL,
event_id TEXT NOT NULL,
thread_id TEXT,
event_stream_ordering BIGINT,
data TEXT NOT NULL,
CONSTRAINT receipts_linearized_uniqueness UNIQUE (room_id, receipt_type, user_id),
CONSTRAINT receipts_linearized_uniqueness_thread UNIQUE (room_id, receipt_type, user_id, thread_id)
);
CREATE TABLE receipts_graph_new (
room_id TEXT NOT NULL,
receipt_type TEXT NOT NULL,
user_id TEXT NOT NULL,
event_ids TEXT NOT NULL,
thread_id TEXT,
data TEXT NOT NULL,
CONSTRAINT receipts_graph_uniqueness UNIQUE (room_id, receipt_type, user_id),
CONSTRAINT receipts_graph_uniqueness_thread UNIQUE (room_id, receipt_type, user_id, thread_id)
);
-- Drop the old indexes.
DROP INDEX IF EXISTS receipts_linearized_id;
DROP INDEX IF EXISTS receipts_linearized_room_stream;
DROP INDEX IF EXISTS receipts_linearized_user;
-- Copy the data.
INSERT INTO receipts_linearized_new (stream_id, room_id, receipt_type, user_id, event_id, event_stream_ordering, data)
SELECT stream_id, room_id, receipt_type, user_id, event_id, event_stream_ordering, data
FROM receipts_linearized;
INSERT INTO receipts_graph_new (room_id, receipt_type, user_id, event_ids, data)
SELECT room_id, receipt_type, user_id, event_ids, data
FROM receipts_graph;
-- Drop the old tables.
DROP TABLE receipts_linearized;
DROP TABLE receipts_graph;
-- Rename the tables.
ALTER TABLE receipts_linearized_new RENAME TO receipts_linearized;
ALTER TABLE receipts_graph_new RENAME TO receipts_graph;
-- Create the indices.
CREATE INDEX receipts_linearized_id ON receipts_linearized( stream_id );
CREATE INDEX receipts_linearized_room_stream ON receipts_linearized( room_id, stream_id );
CREATE INDEX receipts_linearized_user ON receipts_linearized( user_id );

View file

@ -0,0 +1,20 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(7007, 'receipts_linearized_unique_index', '{}');
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(7007, 'receipts_graph_unique_index', '{}');

View file

@ -404,6 +404,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
event.event_id,
{user_id: actions for user_id, actions in push_actions},
False,
"main",
)
)
return event, context