From d7675e79e13c1fd44dd97033d1e69f57811d75d1 Mon Sep 17 00:00:00 2001
From: "Olivier Wilkinson (reivilibre)"
Date: Tue, 20 Aug 2019 13:45:05 +0100
Subject: [PATCH 01/29] Add schema for Separated Statistics

Signed-off-by: Olivier Wilkinson (reivilibre)
---
 .../schema/delta/56/stats_separated1.sql | 115 ++++++++++++++++++
 .../schema/delta/56/stats_separated2.py  |  87 +++++++++++++
 2 files changed, 202 insertions(+)
 create mode 100644 synapse/storage/schema/delta/56/stats_separated2.py

diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql
index 5b125d17b0..8d8e8fb97c 100644
--- a/synapse/storage/schema/delta/56/stats_separated1.sql
+++ b/synapse/storage/schema/delta/56/stats_separated1.sql
@@ -31,3 +31,118 @@ DELETE FROM background_updates WHERE update_name IN (
     'populate_stats_process_rooms',
     'populate_stats_cleanup'
 );
+
+----- Create tables for our version of room stats.
+
+-- single-row table to track position of incremental updates
+CREATE TABLE IF NOT EXISTS stats_incremental_position (
+    -- the stream_id of the last-processed state delta
+    state_delta_stream_id BIGINT,
+
+    -- the stream_ordering of the last-processed backfilled event
+    -- (this is negative)
+    total_events_min_stream_ordering BIGINT,
+
+    -- the stream_ordering of the last-processed normally-created event
+    -- (this is positive)
+    total_events_max_stream_ordering BIGINT,
+
+    -- If true, this represents the contract agreed upon by the background
+    -- population processor.
+    -- If false, this is suitable for use by the delta/incremental processor.
+    is_background_contract BOOLEAN NOT NULL PRIMARY KEY
+);
+
+-- insert a null row and make sure it is the only one.
+DELETE FROM stats_incremental_position;
+INSERT INTO stats_incremental_position (
+    state_delta_stream_id,
+    total_events_min_stream_ordering,
+    total_events_max_stream_ordering,
+    is_background_contract
+) VALUES (NULL, NULL, NULL, (0 = 1)), (NULL, NULL, NULL, (1 = 1));
+
+-- represents PRESENT room statistics for a room
+CREATE TABLE IF NOT EXISTS room_stats_current (
+    room_id TEXT NOT NULL PRIMARY KEY,
+
+    -- These stats cover the time from start_ts...end_ts (in seconds).
+    -- Note that end_ts is quantised, and start_ts usually so.
+    start_ts BIGINT,
+    end_ts BIGINT,
+
+    current_state_events INT NOT NULL DEFAULT 0,
+    total_events INT NOT NULL DEFAULT 0,
+    joined_members INT NOT NULL DEFAULT 0,
+    invited_members INT NOT NULL DEFAULT 0,
+    left_members INT NOT NULL DEFAULT 0,
+    banned_members INT NOT NULL DEFAULT 0,
+
+    -- If initial background count is still to be performed: NULL
+    -- If initial background count has been performed: the maximum delta stream
+    -- position that this row takes into account.
+    completed_delta_stream_id BIGINT,
+
+    CONSTRAINT timestamp_nullity_equality CHECK ((start_ts IS NULL) = (end_ts IS NULL))
+);
+
+
+-- represents HISTORICAL room statistics for a room
+CREATE TABLE IF NOT EXISTS room_stats_historical (
+    room_id TEXT NOT NULL,
+    -- These stats cover the time from (end_ts - bucket_size)...end_ts (in seconds).
+    -- Note that end_ts is quantised, and start_ts usually so.
+    end_ts BIGINT NOT NULL,
+    bucket_size INT NOT NULL,
+
+    current_state_events INT NOT NULL,
+    total_events INT NOT NULL,
+    joined_members INT NOT NULL,
+    invited_members INT NOT NULL,
+    left_members INT NOT NULL,
+    banned_members INT NOT NULL,
+
+    PRIMARY KEY (room_id, end_ts)
+);
+
+-- We use this index to speed up deletion of ancient room stats.
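+-- (Purging by a cut-off timestamp then becomes a range scan on end_ts
+-- rather than a scan of the whole table.)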
+CREATE INDEX IF NOT EXISTS room_stats_historical_end_ts ON room_stats_historical (end_ts); + +-- We don't need an index on (room_id, end_ts) because PRIMARY KEY sorts that +-- out for us. (We would want it to review stats for a particular room.) + + +-- represents PRESENT statistics for a user +CREATE TABLE IF NOT EXISTS user_stats_current ( + user_id TEXT NOT NULL PRIMARY KEY, + + -- The timestamp that represents the start of the + start_ts BIGINT, + end_ts BIGINT, + + public_rooms INT DEFAULT 0 NOT NULL, + private_rooms INT DEFAULT 0 NOT NULL, + + -- If initial background count is still to be performed: NULL + -- If initial background count has been performed: the maximum delta stream + -- position that this row takes into account. + completed_delta_stream_id BIGINT +); + +-- represents HISTORICAL statistics for a user +CREATE TABLE IF NOT EXISTS user_stats_historical ( + user_id TEXT NOT NULL, + end_ts BIGINT NOT NULL, + bucket_size INT NOT NULL, + + public_rooms INT NOT NULL, + private_rooms INT NOT NULL, + + PRIMARY KEY (user_id, end_ts) +); + +-- We use this index to speed up deletion of ancient user stats. +CREATE INDEX IF NOT EXISTS user_stats_historical_end_ts ON user_stats_historical (end_ts); + +-- We don't need an index on (user_id, end_ts) because PRIMARY KEY sorts that +-- out for us. (We would want it to review stats for a particular user.) diff --git a/synapse/storage/schema/delta/56/stats_separated2.py b/synapse/storage/schema/delta/56/stats_separated2.py new file mode 100644 index 0000000000..049867fa3e --- /dev/null +++ b/synapse/storage/schema/delta/56/stats_separated2.py @@ -0,0 +1,87 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This schema delta will be run after 'stats_separated1.sql' due to lexicographic +# ordering. Note that it MUST be so. +from synapse.storage.engines import PostgresEngine, Sqlite3Engine + + +def _run_create_generic(stats_type, cursor, database_engine): + """ + Creates the pertinent (partial, if supported) indices for one kind of stats. + Args: + stats_type: "room" or "user" - the type of stats + cursor: Database Cursor + database_engine: Database Engine + """ + if isinstance(database_engine, Sqlite3Engine): + # even though SQLite >= 3.8 can support partial indices, we won't enable + # them, in case the SQLite database may be later used on another system. + # It's also the case that SQLite is only likely to be used in small + # deployments or testing, where the optimisations gained by use of a + # partial index are not a big concern. 
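+        # The index on (end_ts) below serves scans for dirty rows; the one on
+        # (completed_delta_stream_id, <id>) serves look-ups of rows whose
+        # initial background count is still outstanding.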
+ cursor.execute( + """ + CREATE INDEX IF NOT EXISTS %s_stats_current_dirty + ON %s_stats_current (end_ts); + """ + % (stats_type, stats_type) + ) + cursor.execute( + """ + CREATE INDEX IF NOT EXISTS %s_stats_not_complete + ON %s_stats_current (completed_delta_stream_id, %s_id); + """ + % (stats_type, stats_type, stats_type) + ) + elif isinstance(database_engine, PostgresEngine): + # This partial index helps us with finding dirty stats rows + cursor.execute( + """ + CREATE INDEX IF NOT EXISTS %s_stats_current_dirty + ON %s_stats_current (end_ts) + WHERE end_ts IS NOT NULL; + """ + % (stats_type, stats_type) + ) + # This partial index helps us with old collection + cursor.execute( + """ + CREATE INDEX IF NOT EXISTS %s_stats_not_complete + ON %s_stats_current (%s_id) + WHERE completed_delta_stream_id IS NULL; + """ + % (stats_type, stats_type, stats_type) + ) + else: + raise NotImplementedError("Unknown database engine.") + + +def run_create(cursor, database_engine): + """ + This function is called as part of the schema delta. + It will create indices - partial, if supported - for the new 'separated' + room & user statistics. + """ + _run_create_generic("room", cursor, database_engine) + _run_create_generic("user", cursor, database_engine) + + +def run_upgrade(cur, database_engine, config): + """ + This function is run on a database upgrade (of a non-empty database). + We have no need to do anything specific here. + """ + pass From 80a1c6e9e5e4fbaa58355559e42a9a1bbc91c81f Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 20 Aug 2019 14:08:02 +0100 Subject: [PATCH 02/29] Add storage function for storing stats deltas Old collection is not included in this commit Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 171 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 169 insertions(+), 2 deletions(-) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 123c1ae220..e8b1ce240b 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2018, 2019 New Vector Ltd +# Copyright 2019 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,6 +17,7 @@ import logging from synapse.storage.state_deltas import StateDeltasStore +from twisted.internet import defer logger = logging.getLogger(__name__) @@ -27,12 +29,21 @@ ABSOLUTE_STATS_FIELDS = { "invited_members", "left_members", "banned_members", - "state_events", + "total_events", ), "user": ("public_rooms", "private_rooms"), } -TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")} +# these fields are per-timeslice and so should be reset to 0 upon a new slice +PER_SLICE_FIELDS = {"room": (), "user": ()} + +TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")} + + +class OldCollectionRequired(Exception): + """ Signal that we need to collect old stats rows and retry. """ + + pass class StatsStore(StateDeltasStore): @@ -48,6 +59,21 @@ class StatsStore(StateDeltasStore): self.register_noop_background_update("populate_stats_process_rooms") self.register_noop_background_update("populate_stats_cleanup") + def quantise_stats_time(self, ts): + """ + Quantises a timestamp to be a multiple of the bucket size. 
+ + Args: + ts: the timestamp to quantise, in seconds since the Unix Epoch + + Returns: + a timestamp which + - is divisible by the bucket size; + - is no later than `ts`; and + - is the largest such timestamp. + """ + return (ts // self.stats_bucket_size) * self.stats_bucket_size + def update_room_state(self, room_id, fields): """ Args: @@ -76,3 +102,144 @@ class StatsStore(StateDeltasStore): values=fields, desc="update_room_state", ) + + @defer.inlineCallbacks + def update_stats_delta( + self, ts, stats_type, stats_id, fields, complete_with_stream_id=None + ): + """ + Updates the statistics for a subject, with a delta (difference/relative + change). + + Args: + ts (int): timestamp of the change + stats_type (str): "room" or "user" – the kind of subject + stats_id (str): the subject's ID (room ID or user ID) + fields (dict[str, int]): Deltas of stats values. + complete_with_stream_id (int, optional): + If supplied, converts an incomplete row into a complete row, + with the supplied stream_id marked as the stream_id where the + row was completed. + """ + + while True: + try: + res = yield self.runInteraction( + "update_stats_delta", + self._update_stats_delta_txn, + ts, + stats_type, + stats_id, + fields, + complete_with_stream_id=complete_with_stream_id, + ) + return res + except OldCollectionRequired: + # retry after collecting old rows + # TODO (implement later) + raise NotImplementedError("old collection not in this PR") + + def _update_stats_delta_txn( + self, + txn, + ts, + stats_type, + stats_id, + fields, + complete_with_stream_id=None, + absolute_fields=None, + ): + """ + See L{update_stats_delta} + Additional Args: + absolute_fields (dict[str, int]): Absolute stats values (i.e. not deltas). + """ + table, id_col = TYPE_TO_TABLE[stats_type] + + quantised_ts = self.quantise_stats_time(int(ts)) + end_ts = quantised_ts + self.stats_bucket_size + + field_sqls = ["%s = %s + ?" % (field, field) for field in fields.keys()] + field_values = list(fields.values()) + + if absolute_fields is not None: + field_sqls += ["%s = ?" % (field,) for field in absolute_fields.keys()] + field_values += list(absolute_fields.values()) + + if complete_with_stream_id is not None: + field_sqls.append("completed_delta_stream_id = ?") + field_values.append(complete_with_stream_id) + + sql = ( + "UPDATE %s_current SET end_ts = ?, %s" + " WHERE (end_ts IS NOT NULL AND (end_ts >= ? OR completed_delta_stream_id IS NULL))" + " AND %s = ?" + ) % (table, ", ".join(field_sqls), id_col) + + qargs = [end_ts] + list(field_values) + [end_ts, stats_id] + + txn.execute(sql, qargs) + + if txn.rowcount > 0: + # success. + return + + # if we're here, it's because we didn't succeed in updating a stats + # row. Why? Let's find out… + + current_row = self._simple_select_one_txn( + txn, + table + "_current", + {id_col: stats_id}, + ("end_ts", "completed_delta_stream_id"), + allow_none=True, + ) + + if current_row is None: + # we need to insert a row! (insert a dirty, incomplete row) + insertee = { + id_col: stats_id, + "end_ts": end_ts, + "start_ts": ts, + "completed_delta_stream_id": complete_with_stream_id, + } + + # we assume that, by default, blank fields should be zero. 
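+            # (any deltas in `fields`, and any values in `absolute_fields`,
+            # then overwrite these zero defaults below.)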
+ for field_name in ABSOLUTE_STATS_FIELDS[stats_type]: + insertee[field_name] = 0 + + for field_name in PER_SLICE_FIELDS[stats_type]: + insertee[field_name] = 0 + + for (field, value) in fields.items(): + insertee[field] = value + + if absolute_fields is not None: + for (field, value) in absolute_fields.items(): + insertee[field] = value + + self._simple_insert_txn(txn, table + "_current", insertee) + + elif current_row["end_ts"] is None: + # update the row, including start_ts + sql = ( + "UPDATE %s_current SET start_ts = ?, end_ts = ?, %s" + " WHERE end_ts IS NULL AND %s = ?" + ) % (table, ", ".join(field_sqls), id_col) + + qargs = ( + [end_ts - self.stats_bucket_size, end_ts] + + list(field_values) + + [stats_id] + ) + + txn.execute(sql, qargs) + if txn.rowcount == 0: + raise RuntimeError( + "Should be impossible: No rows updated" + " but all conditions are known to be met." + ) + + elif current_row["end_ts"] < end_ts: + # we need to perform old collection first + raise OldCollectionRequired() From 1819563640e1be839c348e18afc1b59c7b3b8c9c Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 20 Aug 2019 14:27:47 +0100 Subject: [PATCH 03/29] Ack, isort! Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index e8b1ce240b..4e0a3d4f6e 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -16,9 +16,10 @@ import logging -from synapse.storage.state_deltas import StateDeltasStore from twisted.internet import defer +from synapse.storage.state_deltas import StateDeltasStore + logger = logging.getLogger(__name__) # these fields track absolutes (e.g. total number of rooms on the server) From b5573c0ffb97059f672d465be7dd38c94854411d Mon Sep 17 00:00:00 2001 From: reivilibre <38398653+reivilibre@users.noreply.github.com> Date: Tue, 20 Aug 2019 15:02:49 +0100 Subject: [PATCH 04/29] Update synapse/storage/stats.py Co-Authored-By: Erik Johnston --- synapse/storage/stats.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 4e0a3d4f6e..095924cae6 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -68,7 +68,7 @@ class StatsStore(StateDeltasStore): ts: the timestamp to quantise, in seconds since the Unix Epoch Returns: - a timestamp which + int: a timestamp which - is divisible by the bucket size; - is no later than `ts`; and - is the largest such timestamp. From 4a97eef0dc49ee8f3c446221f0fcbb0e65ece113 Mon Sep 17 00:00:00 2001 From: reivilibre <38398653+reivilibre@users.noreply.github.com> Date: Tue, 20 Aug 2019 15:12:21 +0100 Subject: [PATCH 05/29] Update synapse/storage/stats.py Co-Authored-By: Erik Johnston --- synapse/storage/stats.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 095924cae6..0445b97b4a 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -65,7 +65,7 @@ class StatsStore(StateDeltasStore): Quantises a timestamp to be a multiple of the bucket size. 
 
         Args:
-            ts: the timestamp to quantise, in seconds since the Unix Epoch
+            ts (int): the timestamp to quantise, in seconds since the Unix Epoch
 
         Returns:
             int: a timestamp which

From 6a19f7e1010e409752507fd29b23af5b795a0440 Mon Sep 17 00:00:00 2001
From: "Olivier Wilkinson (reivilibre)"
Date: Tue, 20 Aug 2019 15:35:07 +0100
Subject: [PATCH 06/29] Add room and user statistics documentation.

Signed-off-by: Olivier Wilkinson (reivilibre)
---
 docs/room_and_user_statistics.md | 146 +++++++++++++++++++++++++++++++
 1 file changed, 146 insertions(+)
 create mode 100644 docs/room_and_user_statistics.md

diff --git a/docs/room_and_user_statistics.md b/docs/room_and_user_statistics.md
new file mode 100644
index 0000000000..eb003f771a
--- /dev/null
+++ b/docs/room_and_user_statistics.md
@@ -0,0 +1,146 @@
+Room and User Statistics
+========================
+
+Synapse maintains room and user statistics (as well as a cache of room state)
+in various tables.
+
+These can be used for administrative purposes but are also used when generating
+the public room directory. If these tables get stale or out of sync (possibly
+after database corruption), you may wish to regenerate them.
+
+
+# Synapse Administrator Documentation
+
+## Various SQL scripts that you may find useful
+
+### Delete stats, including historical stats
+
+```sql
+DELETE FROM room_stats_current;
+DELETE FROM room_stats_historical;
+DELETE FROM user_stats_current;
+DELETE FROM user_stats_historical;
+```
+
+### Regenerate stats (all subjects)
+
+```sql
+BEGIN;
+    DELETE FROM stats_incremental_position;
+    INSERT INTO stats_incremental_position (
+        state_delta_stream_id,
+        total_events_min_stream_ordering,
+        total_events_max_stream_ordering,
+        is_background_contract
+    ) VALUES (NULL, NULL, NULL, FALSE), (NULL, NULL, NULL, TRUE);
+COMMIT;
+
+DELETE FROM room_stats_current;
+DELETE FROM user_stats_current;
+```
+
+then follow the steps below for **'Regenerate stats (missing subjects only)'**
+
+### Regenerate stats (missing subjects only)
+
+```sql
+-- Set up staging tables
+-- we depend on current_state_events_membership because this is used
+-- in our counting.
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+    ('populate_stats_prepare', '{}', 'current_state_events_membership');
+
+-- Run through each room and update stats
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+    ('populate_stats_process_rooms', '{}', 'populate_stats_prepare');
+
+-- Run through each user and update stats.
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+    ('populate_stats_process_users', '{}', 'populate_stats_process_rooms');
+
+-- Clean up staging tables
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+    ('populate_stats_cleanup', '{}', 'populate_stats_process_users');
+```
+
+then **restart Synapse**.
+
+
+# Synapse Developer Documentation
+
+## High-Level Concepts
+
+### Definitions
+
+* **subject**: Something we are tracking stats about – currently a room or user.
+* **current row**: An entry for a subject in the appropriate current statistics
+  table. Each subject can have only one.
+* **historical row**: An entry for a subject in the appropriate historical
+  statistics table. Each subject can have any number of these.
+
+### Overview
+
+Stats are maintained as time series. There are two kinds of column:
+
+* absolute columns – where the value is correct for the time given by `end_ts`
+  in the stats row.
(Imagine a line graph for these values) +* per-slice columns – where the value corresponds to how many of the occurrences + occurred within the time slice given by `(end_ts − bucket_size)…end_ts` + or `start_ts…end_ts`. (Imagine a histogram for these values) + +Currently, only absolute columns are in use. + +Stats are maintained in two tables (for each type): current and historical. + +Current stats correspond to the present values. Each subject can only have one +entry. + +Historical stats correspond to values in the past. Subjects may have multiple +entries. + +## Concepts around the management of stats + +### current rows + +#### dirty current rows + +Current rows can be **dirty**, which means that they have changed since the +latest historical row for the same subject. +**Dirty** current rows possess an end timestamp, `end_ts`. + +#### old current rows and old collection + +When a (necessarily dirty) current row has an `end_ts` in the past, it is said +to be **old**. +Old current rows must be copied into a historical row, and cleared of their dirty +status, before further statistics can be tracked for that subject. +The process which does this is referred to as **old collection**. + +#### incomplete current rows + +There are also **incomplete** current rows, which are current rows that do not +contain a full count yet – this is because they are waiting for the stats +regenerator to give them an initial count. Incomplete current rows DO NOT contain +correct and up-to-date values. As such, *incomplete rows are not old-collected*. +Instead, old incomplete rows will be extended so they are no longer old. + +### historical rows + +Historical rows can always be considered to be valid for the time slice and +end time specified. (This, of course, assumes a lack of defects in the code +to track the statistics, and assumes integrity of the database). + +Even still, there are two considerations that we may need to bear in mind: + +* historical rows will not exist for every time slice – they will be omitted + if there were no changes. In this case, the following assumptions can be + made to interpolate/recreate missing rows: + - absolute fields have the same values as in the preceding row + - per-slice fields are zero (`0`) +* historical rows will not be retained forever – rows older than a configurable + time will be purged. + +#### purge + +The purging of historical rows is not yet implemented. + From 981c6cf5442bfb16c177f995deedeb3ec44bf5fb Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 20 Aug 2019 15:41:10 +0100 Subject: [PATCH 07/29] Sanitise accepted fields in `_update_stats_delta_txn` Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 0445b97b4a..a372f35eae 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -15,6 +15,7 @@ # limitations under the License. 
import logging +from itertools import chain from twisted.internet import defer @@ -160,6 +161,17 @@ class StatsStore(StateDeltasStore): quantised_ts = self.quantise_stats_time(int(ts)) end_ts = quantised_ts + self.stats_bucket_size + for field in chain(fields.keys(), absolute_fields.keys()): + if ( + field not in ABSOLUTE_STATS_FIELDS[stats_type] + and field not in PER_SLICE_FIELDS[stats_type] + ): + # guard against potential SQL injection dodginess + raise ValueError( + "%s is not a recognised field" + " for stats type %s" % (field, stats_type) + ) + field_sqls = ["%s = %s + ?" % (field, field) for field in fields.keys()] field_values = list(fields.values()) From 977310ee2767e4edaa20e4a2216be359a7eb8002 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 20 Aug 2019 15:49:00 +0100 Subject: [PATCH 08/29] Clarify `_update_stats_delta_txn` Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index a372f35eae..cebe4cbc57 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -183,6 +183,14 @@ class StatsStore(StateDeltasStore): field_sqls.append("completed_delta_stream_id = ?") field_values.append(complete_with_stream_id) + # update current row, but only if it is either: + # - dirty and complete but not old + # If it is old, we should old-collect it first and retry. + # - dirty and incomplete + # Incomplete rows can't be old-collected (as this would commit + # false statistics into the _historical table). + # Instead, their `end_ts` is extended, whilst we wait for them to + # become complete at the hand of the stats regenerator. sql = ( "UPDATE %s_current SET end_ts = ?, %s" " WHERE (end_ts IS NOT NULL AND (end_ts >= ? OR completed_delta_stream_id IS NULL))" @@ -209,6 +217,8 @@ class StatsStore(StateDeltasStore): ) if current_row is None: + # Failure reason: There is no row. + # Solution: # we need to insert a row! (insert a dirty, incomplete row) insertee = { id_col: stats_id, @@ -234,7 +244,9 @@ class StatsStore(StateDeltasStore): self._simple_insert_txn(txn, table + "_current", insertee) elif current_row["end_ts"] is None: - # update the row, including start_ts + # Failure reason: The row is not dirty. + # Solution: + # update the row, including `start_ts`, to make it dirty. sql = ( "UPDATE %s_current SET start_ts = ?, end_ts = ?, %s" " WHERE end_ts IS NULL AND %s = ?" @@ -254,5 +266,6 @@ class StatsStore(StateDeltasStore): ) elif current_row["end_ts"] < end_ts: - # we need to perform old collection first + # Failure reason: The row is complete and old. + # Solution: We need to perform old collection first raise OldCollectionRequired() From eafa8d3c54e03ca2c7929125a5ce5ea015491bf5 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 20 Aug 2019 16:03:37 +0100 Subject: [PATCH 09/29] Unify name of 'stats regenerator' in schema comments. 
Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/schema/delta/56/stats_separated1.sql | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql index 8d8e8fb97c..19a91416c2 100644 --- a/synapse/storage/schema/delta/56/stats_separated1.sql +++ b/synapse/storage/schema/delta/56/stats_separated1.sql @@ -47,8 +47,8 @@ CREATE TABLE IF NOT EXISTS stats_incremental_position ( -- (this is positive) total_events_max_stream_ordering BIGINT, - -- If true, this represents the contract agreed upon by the background - -- population processor. + -- If true, this represents the contract agreed upon by the stats + -- regenerator. -- If false, this is suitable for use by the delta/incremental processor. is_background_contract BOOLEAN NOT NULL PRIMARY KEY ); @@ -78,8 +78,8 @@ CREATE TABLE IF NOT EXISTS room_stats_current ( left_members INT NOT NULL DEFAULT 0, banned_members INT NOT NULL DEFAULT 0, - -- If initial background count is still to be performed: NULL - -- If initial background count has been performed: the maximum delta stream + -- If initial stats regen is still to be performed: NULL + -- If initial stats regen has been performed: the maximum delta stream -- position that this row takes into account. completed_delta_stream_id BIGINT, @@ -123,8 +123,8 @@ CREATE TABLE IF NOT EXISTS user_stats_current ( public_rooms INT DEFAULT 0 NOT NULL, private_rooms INT DEFAULT 0 NOT NULL, - -- If initial background count is still to be performed: NULL - -- If initial background count has been performed: the maximum delta stream + -- If initial stats regen is still to be performed: NULL + -- If initial stats regen has been performed: the maximum delta stream -- position that this row takes into account. completed_delta_stream_id BIGINT ); From 18a4c03c50236a2da6b5aa5321ca084f18dbc36d Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 20 Aug 2019 16:04:04 +0100 Subject: [PATCH 10/29] Remove needless defaults. 
Signed-off-by: Olivier Wilkinson (reivilibre) --- .../storage/schema/delta/56/stats_separated1.sql | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql index 19a91416c2..1e17eae226 100644 --- a/synapse/storage/schema/delta/56/stats_separated1.sql +++ b/synapse/storage/schema/delta/56/stats_separated1.sql @@ -71,12 +71,12 @@ CREATE TABLE IF NOT EXISTS room_stats_current ( start_ts BIGINT, end_ts BIGINT, - current_state_events INT NOT NULL DEFAULT 0, - total_events INT NOT NULL DEFAULT 0, - joined_members INT NOT NULL DEFAULT 0, - invited_members INT NOT NULL DEFAULT 0, - left_members INT NOT NULL DEFAULT 0, - banned_members INT NOT NULL DEFAULT 0, + current_state_events INT NOT NULL, + total_events INT NOT NULL, + joined_members INT NOT NULL, + invited_members INT NOT NULL, + left_members INT NOT NULL, + banned_members INT NOT NULL, -- If initial stats regen is still to be performed: NULL -- If initial stats regen has been performed: the maximum delta stream @@ -120,8 +120,8 @@ CREATE TABLE IF NOT EXISTS user_stats_current ( start_ts BIGINT, end_ts BIGINT, - public_rooms INT DEFAULT 0 NOT NULL, - private_rooms INT DEFAULT 0 NOT NULL, + public_rooms INT NOT NULL, + private_rooms INT NOT NULL, -- If initial stats regen is still to be performed: NULL -- If initial stats regen has been performed: the maximum delta stream From 7b657f1148fa10234d52d333ff176969f296aa0f Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 22 Aug 2019 15:40:58 +0100 Subject: [PATCH 11/29] Simplify table structure This obviates the need for old collection, but comes at the minor cost of not being able to track historical stats or per-slice fields until after the statistics regenerator is finished. Signed-off-by: Olivier Wilkinson (reivilibre) --- .../schema/delta/56/stats_separated1.sql | 15 +- .../schema/delta/56/stats_separated2.py | 18 +- synapse/storage/stats.py | 280 +++++++++++------- 3 files changed, 175 insertions(+), 138 deletions(-) diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql index 1e17eae226..d7418fdf1e 100644 --- a/synapse/storage/schema/delta/56/stats_separated1.sql +++ b/synapse/storage/schema/delta/56/stats_separated1.sql @@ -63,14 +63,10 @@ INSERT INTO stats_incremental_position ( ) VALUES (NULL, NULL, NULL, (0 = 1)), (NULL, NULL, NULL, (1 = 1)); -- represents PRESENT room statistics for a room +-- only holds absolute fields CREATE TABLE IF NOT EXISTS room_stats_current ( room_id TEXT NOT NULL PRIMARY KEY, - -- These starts cover the time from start_ts...end_ts (in seconds). - -- Note that end_ts is quantised, and start_ts usually so. - start_ts BIGINT, - end_ts BIGINT, - current_state_events INT NOT NULL, total_events INT NOT NULL, joined_members INT NOT NULL, @@ -82,8 +78,6 @@ CREATE TABLE IF NOT EXISTS room_stats_current ( -- If initial stats regen has been performed: the maximum delta stream -- position that this row takes into account. completed_delta_stream_id BIGINT, - - CONSTRAINT timestamp_nullity_equality CHECK ((start_ts IS NULL) = (end_ts IS NULL)) ); @@ -91,7 +85,7 @@ CREATE TABLE IF NOT EXISTS room_stats_current ( CREATE TABLE IF NOT EXISTS room_stats_historical ( room_id TEXT NOT NULL, -- These stats cover the time from (end_ts - bucket_size)...end_ts (in seconds). - -- Note that end_ts is quantised, and start_ts usually so. 
+ -- Note that end_ts is quantised. end_ts BIGINT NOT NULL, bucket_size INT NOT NULL, @@ -113,13 +107,10 @@ CREATE INDEX IF NOT EXISTS room_stats_historical_end_ts ON room_stats_historical -- represents PRESENT statistics for a user +-- only holds absolute fields CREATE TABLE IF NOT EXISTS user_stats_current ( user_id TEXT NOT NULL PRIMARY KEY, - -- The timestamp that represents the start of the - start_ts BIGINT, - end_ts BIGINT, - public_rooms INT NOT NULL, private_rooms INT NOT NULL, diff --git a/synapse/storage/schema/delta/56/stats_separated2.py b/synapse/storage/schema/delta/56/stats_separated2.py index 049867fa3e..942d240010 100644 --- a/synapse/storage/schema/delta/56/stats_separated2.py +++ b/synapse/storage/schema/delta/56/stats_separated2.py @@ -32,13 +32,6 @@ def _run_create_generic(stats_type, cursor, database_engine): # It's also the case that SQLite is only likely to be used in small # deployments or testing, where the optimisations gained by use of a # partial index are not a big concern. - cursor.execute( - """ - CREATE INDEX IF NOT EXISTS %s_stats_current_dirty - ON %s_stats_current (end_ts); - """ - % (stats_type, stats_type) - ) cursor.execute( """ CREATE INDEX IF NOT EXISTS %s_stats_not_complete @@ -47,16 +40,7 @@ def _run_create_generic(stats_type, cursor, database_engine): % (stats_type, stats_type, stats_type) ) elif isinstance(database_engine, PostgresEngine): - # This partial index helps us with finding dirty stats rows - cursor.execute( - """ - CREATE INDEX IF NOT EXISTS %s_stats_current_dirty - ON %s_stats_current (end_ts) - WHERE end_ts IS NOT NULL; - """ - % (stats_type, stats_type) - ) - # This partial index helps us with old collection + # This partial index helps us with finding incomplete stats rows cursor.execute( """ CREATE INDEX IF NOT EXISTS %s_stats_not_complete diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index cebe4cbc57..4cb10dc9fb 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -105,7 +105,6 @@ class StatsStore(StateDeltasStore): desc="update_room_state", ) - @defer.inlineCallbacks def update_stats_delta( self, ts, stats_type, stats_id, fields, complete_with_stream_id=None ): @@ -124,22 +123,142 @@ class StatsStore(StateDeltasStore): row was completed. """ - while True: - try: - res = yield self.runInteraction( - "update_stats_delta", - self._update_stats_delta_txn, - ts, - stats_type, - stats_id, - fields, - complete_with_stream_id=complete_with_stream_id, - ) - return res - except OldCollectionRequired: - # retry after collecting old rows - # TODO (implement later) - raise NotImplementedError("old collection not in this PR") + return self.runInteraction( + "update_stats_delta", + self._update_stats_delta_txn, + ts, + stats_type, + stats_id, + fields, + complete_with_stream_id=complete_with_stream_id, + ) + + def _upsert_with_additive_relatives_txn( + self, txn, table, keyvalues, absolutes, additive_relatives + ): + """ + + Args: + txn: Transaction + table (str): Table name + keyvalues (dict[str, any]): Row-identifying key values + absolutes (dict[str, any]): Absolute (set) fields + additive_relatives (dict[str, int]): Fields that will be added onto + if existing row present. 
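+
+        For example (illustrative values): with keyvalues={"room_id": "!a:b"},
+        absolutes={} and additive_relatives={"total_events": 1}, an existing
+        row has its total_events increased by 1; otherwise a new row is
+        inserted with total_events = 1.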
+ """ + if self.database_engine.can_native_upsert: + absolute_updates = [ + "%(field)s = EXCLUDED.%(field)s" % {"field": field} + for field in absolutes.keys() + ] + + relative_updates = [ + "%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s" + % {"table": table, "field": field} + for field in additive_relatives.keys() + ] + + insert_cols = [] + qargs = [table] + + for (key, val) in chain( + keyvalues.items(), absolutes.items(), additive_relatives.items() + ): + insert_cols.append(key) + qargs.append(val) + + sql = """ + INSERT INTO %(table)s (%(insert_cols_cs)s) + VALUES (%(insert_vals_qs)s) + ON CONFLICT DO UPDATE SET %(updates)s + """ % { + "table": table, + "insert_cols_cs": ", ".join(insert_cols), + "insert_vals_qs": ", ".join( + ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives)) + ), + "updates": ", ".join(chain(absolute_updates, relative_updates)), + } + + txn.execute(sql, qargs) + else: + retcols = chain(absolutes.keys(), additive_relatives.keys()) + current_row = self._simple_select_one_txn( + txn, table, keyvalues, retcols, allow_none=True + ) + if current_row is None: + merged_dict = {**keyvalues, **absolutes, **additive_relatives} + self._simple_insert_txn(txn, table, merged_dict) + else: + for (key, val) in additive_relatives.items(): + current_row[key] += val + for (key, val) in absolutes.items(): + current_row[key] = val + self._simple_update_one_txn(txn, table, keyvalues, current_row) + + def _upsert_copy_from_table_with_additive_relatives_txn( + self, txn, into_table, keyvalues, additive_relatives, src_table, copy_columns + ): + """ + Args: + txn: Transaction + into_table (str): The destination table to UPSERT the row into + keyvalues (dict[str, any]): Row-identifying key values + additive_relatives (dict[str, any]): Fields that will be added onto + if existing row present. (Must be disjoint from copy_columns.) + src_table (str): The source table to copy from + copy_columns (iterable[str]): The list of columns to copy + """ + if self.database_engine.can_native_upsert: + ins_columns = chain(keyvalues, copy_columns, additive_relatives.keys()) + sel_exprs = chain( + keyvalues, copy_columns, ("?" for _ in additive_relatives) + ) + keyvalues_where = ("%s = ?" 
% f for f in keyvalues) + + sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns) + sets_ar = ( + "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f) for f in copy_columns + ) + + sql = """ + INSERT INTO %(into_table)s (%(ins_columns)s) + SELECT %(sel_exprs)s + FROM %(src_table)s + WHERE %(keyvalues_where)s + ON CONFLICT (%(keyvalues)s) + DO UPDATE SET %(sets)s + """ % { + "into_table": into_table, + "ins_columns": ", ".join(ins_columns), + "sel_exprs": ", ".join(sel_exprs), + "keyvalues_where": ", ".join(keyvalues_where), + "src_table": src_table, + "keyvalues": ", ".join(keyvalues.keys()), + "sets": ", ".join(chain(sets_cc, sets_ar)), + } + + qargs = chain(additive_relatives.values(), keyvalues.values()) + txn.execute(sql, qargs) + else: + src_row = self._simple_select_one_txn( + txn, src_table, keyvalues, copy_columns + ) + dest_current_row = self._simple_select_one_txn( + txn, + into_table, + keyvalues, + chain(additive_relatives.keys(), copy_columns), + allow_none=True, + ) + + if dest_current_row is None: + merged_dict = {**keyvalues, **src_row, **additive_relatives} + self._simple_insert_txn(txn, into_table, merged_dict) + else: + for (key, val) in additive_relatives.items(): + src_row[key] = dest_current_row[key] + val + self._simple_update_txn(txn, into_table, keyvalues, src_row) def _update_stats_delta_txn( self, @@ -154,118 +273,61 @@ class StatsStore(StateDeltasStore): """ See L{update_stats_delta} Additional Args: - absolute_fields (dict[str, int]): Absolute stats values (i.e. not deltas). + absolute_fields (dict[str, int]): Absolute current stats values + (i.e. not deltas). Does not work with per-slice fields. """ table, id_col = TYPE_TO_TABLE[stats_type] quantised_ts = self.quantise_stats_time(int(ts)) end_ts = quantised_ts + self.stats_bucket_size + abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type] + slice_field_names = PER_SLICE_FIELDS[stats_type] for field in chain(fields.keys(), absolute_fields.keys()): - if ( - field not in ABSOLUTE_STATS_FIELDS[stats_type] - and field not in PER_SLICE_FIELDS[stats_type] - ): + if field not in abs_field_names and field not in slice_field_names: # guard against potential SQL injection dodginess raise ValueError( "%s is not a recognised field" " for stats type %s" % (field, stats_type) ) - field_sqls = ["%s = %s + ?" % (field, field) for field in fields.keys()] - field_values = list(fields.values()) + additive_relatives = { + key: fields.get(key, 0) + for key in abs_field_names + if key not in absolute_fields + } - if absolute_fields is not None: - field_sqls += ["%s = ?" % (field,) for field in absolute_fields.keys()] - field_values += list(absolute_fields.values()) + if absolute_fields is None: + absolute_fields = {} + elif complete_with_stream_id is not None: + absolute_fields = absolute_fields.copy() - if complete_with_stream_id is not None: - field_sqls.append("completed_delta_stream_id = ?") - field_values.append(complete_with_stream_id) + absolute_fields["completed_delta_stream_id"] = complete_with_stream_id - # update current row, but only if it is either: - # - dirty and complete but not old - # If it is old, we should old-collect it first and retry. - # - dirty and incomplete - # Incomplete rows can't be old-collected (as this would commit - # false statistics into the _historical table). - # Instead, their `end_ts` is extended, whilst we wait for them to - # become complete at the hand of the stats regenerator. - sql = ( - "UPDATE %s_current SET end_ts = ?, %s" - " WHERE (end_ts IS NOT NULL AND (end_ts >= ? 
OR completed_delta_stream_id IS NULL))" - " AND %s = ?" - ) % (table, ", ".join(field_sqls), id_col) - - qargs = [end_ts] + list(field_values) + [end_ts, stats_id] - - txn.execute(sql, qargs) - - if txn.rowcount > 0: - # success. - return - - # if we're here, it's because we didn't succeed in updating a stats - # row. Why? Let's find out… - - current_row = self._simple_select_one_txn( + # first upsert the current table + self._upsert_with_additive_relatives_txn( txn, table + "_current", {id_col: stats_id}, - ("end_ts", "completed_delta_stream_id"), - allow_none=True, + absolute_fields, + additive_relatives, ) - if current_row is None: - # Failure reason: There is no row. - # Solution: - # we need to insert a row! (insert a dirty, incomplete row) - insertee = { - id_col: stats_id, - "end_ts": end_ts, - "start_ts": ts, - "completed_delta_stream_id": complete_with_stream_id, + if self.has_completed_background_updates(): + # TODO want to check specifically for stats regenerator, not all + # background updates… + # then upsert the historical table. + # we don't support absolute_fields for slice_field_names as it makes + # no sense. + per_slice_additive_relatives = { + key: fields.get(key, 0) + for key in slice_field_names } - - # we assume that, by default, blank fields should be zero. - for field_name in ABSOLUTE_STATS_FIELDS[stats_type]: - insertee[field_name] = 0 - - for field_name in PER_SLICE_FIELDS[stats_type]: - insertee[field_name] = 0 - - for (field, value) in fields.items(): - insertee[field] = value - - if absolute_fields is not None: - for (field, value) in absolute_fields.items(): - insertee[field] = value - - self._simple_insert_txn(txn, table + "_current", insertee) - - elif current_row["end_ts"] is None: - # Failure reason: The row is not dirty. - # Solution: - # update the row, including `start_ts`, to make it dirty. - sql = ( - "UPDATE %s_current SET start_ts = ?, end_ts = ?, %s" - " WHERE end_ts IS NULL AND %s = ?" - ) % (table, ", ".join(field_sqls), id_col) - - qargs = ( - [end_ts - self.stats_bucket_size, end_ts] - + list(field_values) - + [stats_id] + self._upsert_copy_from_table_with_additive_relatives_txn( + txn, + table + "_historical", + {id_col: stats_id}, + per_slice_additive_relatives, + table + "_current", + abs_field_names ) - - txn.execute(sql, qargs) - if txn.rowcount == 0: - raise RuntimeError( - "Should be impossible: No rows updated" - " but all conditions are known to be met." - ) - - elif current_row["end_ts"] < end_ts: - # Failure reason: The row is complete and old. - # Solution: We need to perform old collection first - raise OldCollectionRequired() From e8fc180d4dbcf8237769397652356ffa23a2e952 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 22 Aug 2019 16:10:05 +0100 Subject: [PATCH 12/29] Fix up SQL schema delta Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/schema/delta/56/stats_separated1.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql index d7418fdf1e..95daf8f53b 100644 --- a/synapse/storage/schema/delta/56/stats_separated1.sql +++ b/synapse/storage/schema/delta/56/stats_separated1.sql @@ -77,7 +77,7 @@ CREATE TABLE IF NOT EXISTS room_stats_current ( -- If initial stats regen is still to be performed: NULL -- If initial stats regen has been performed: the maximum delta stream -- position that this row takes into account. 
- completed_delta_stream_id BIGINT, + completed_delta_stream_id BIGINT ); From 79252d1c83f9d230f0e2320cc0a40493e22ad653 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 22 Aug 2019 16:10:32 +0100 Subject: [PATCH 13/29] Fix up historical stats support. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 43 ++++++++++++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 11 deletions(-) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 4cb10dc9fb..5a7dfde926 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -17,8 +17,6 @@ import logging from itertools import chain -from twisted.internet import defer - from synapse.storage.state_deltas import StateDeltasStore logger = logging.getLogger(__name__) @@ -197,22 +195,41 @@ class StatsStore(StateDeltasStore): self._simple_update_one_txn(txn, table, keyvalues, current_row) def _upsert_copy_from_table_with_additive_relatives_txn( - self, txn, into_table, keyvalues, additive_relatives, src_table, copy_columns + self, + txn, + into_table, + keyvalues, + extra_dst_keyvalues, + additive_relatives, + src_table, + copy_columns, + additional_where="", ): """ Args: txn: Transaction into_table (str): The destination table to UPSERT the row into keyvalues (dict[str, any]): Row-identifying key values + extra_dst_keyvalues (dict[str, any]): Additional keyvalues + for `into_table`. additive_relatives (dict[str, any]): Fields that will be added onto if existing row present. (Must be disjoint from copy_columns.) src_table (str): The source table to copy from copy_columns (iterable[str]): The list of columns to copy + additional_where (str): Additional SQL for where (prefix with AND + if using). """ if self.database_engine.can_native_upsert: - ins_columns = chain(keyvalues, copy_columns, additive_relatives.keys()) + ins_columns = chain( + keyvalues, + copy_columns, + additive_relatives.keys(), + extra_dst_keyvalues.keys(), + ) sel_exprs = chain( - keyvalues, copy_columns, ("?" for _ in additive_relatives) + keyvalues, + copy_columns, + ("?" for _ in chain(additive_relatives, extra_dst_keyvalues)), ) keyvalues_where = ("%s = ?" % f for f in keyvalues) @@ -225,17 +242,20 @@ class StatsStore(StateDeltasStore): INSERT INTO %(into_table)s (%(ins_columns)s) SELECT %(sel_exprs)s FROM %(src_table)s - WHERE %(keyvalues_where)s + WHERE %(keyvalues_where)s %(additional_where)s ON CONFLICT (%(keyvalues)s) DO UPDATE SET %(sets)s """ % { "into_table": into_table, "ins_columns": ", ".join(ins_columns), "sel_exprs": ", ".join(sel_exprs), - "keyvalues_where": ", ".join(keyvalues_where), + "keyvalues_where": " AND ".join(keyvalues_where), "src_table": src_table, - "keyvalues": ", ".join(keyvalues.keys()), + "keyvalues": ", ".join( + chain(keyvalues.keys(), extra_dst_keyvalues.keys()) + ), "sets": ", ".join(chain(sets_cc, sets_ar)), + "additional_where": additional_where, } qargs = chain(additive_relatives.values(), keyvalues.values()) @@ -320,14 +340,15 @@ class StatsStore(StateDeltasStore): # we don't support absolute_fields for slice_field_names as it makes # no sense. 
per_slice_additive_relatives = { - key: fields.get(key, 0) - for key in slice_field_names + key: fields.get(key, 0) for key in slice_field_names } self._upsert_copy_from_table_with_additive_relatives_txn( txn, table + "_historical", {id_col: stats_id}, + {"end_ts": end_ts, "bucket_size": self.stats_bucket_size}, per_slice_additive_relatives, table + "_current", - abs_field_names + abs_field_names, + additional_where=" AND completed_delta_stream_id IS NOT NULL", ) From c3d2bf280790986017c43e3dde17c04a6797f557 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 08:52:20 +0100 Subject: [PATCH 14/29] Allow schema deltas to be engine-specific Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/prepare_database.py | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index d20eacda59..0270cd6f6c 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -238,6 +238,15 @@ def _upgrade_existing_database( logger.debug("applied_delta_files: %s", applied_delta_files) + if isinstance(database_engine, PostgresEngine): + specific_engine_extension = ".postgres" + else: + specific_engine_extension = ".sqlite" + + specific_engine_extensions = ( + ".sqlite", ".postgres" + ) + for v in range(start_ver, SCHEMA_VERSION + 1): logger.info("Upgrading schema to v%d", v) @@ -274,15 +283,22 @@ def _upgrade_existing_database( # Sometimes .pyc files turn up anyway even though we've # disabled their generation; e.g. from distribution package # installers. Silently skip it - pass + continue elif ext == ".sql": # A plain old .sql file, just read and execute it logger.info("Applying schema %s", relative_path) executescript(cur, absolute_path) + elif ext == specific_engine_extension and root_name.endswith(".sql"): + # A .sql file specific to our engine; just read and execute it + logger.info("Applying engine-specific schema %s", relative_path) + executescript(cur, absolute_path) + elif ext in specific_engine_extensions and root_name.endswith(".sql"): + # A .sql file for a different engine; skip it. + continue else: # Not a valid delta file. - logger.warn( - "Found directory entry that did not end in .py or" " .sql: %s", + logger.warning( + "Found directory entry that did not end in .py or .sql: %s", relative_path, ) continue @@ -290,7 +306,7 @@ def _upgrade_existing_database( # Mark as done. cur.execute( database_engine.convert_param_style( - "INSERT INTO applied_schema_deltas (version, file)" " VALUES (?,?)" + "INSERT INTO applied_schema_deltas (version, file) VALUES (?,?)" ), (v, relative_path), ) @@ -298,7 +314,7 @@ def _upgrade_existing_database( cur.execute("DELETE FROM schema_version") cur.execute( database_engine.convert_param_style( - "INSERT INTO schema_version (version, upgraded)" " VALUES (?,?)" + "INSERT INTO schema_version (version, upgraded) VALUES (?,?)" ), (v, True), ) From 1ecd1a6a5fe95df7a726362c143320fab09373c2 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 09:46:13 +0100 Subject: [PATCH 15/29] Use engine-specific delta SQL files rather than delta written in Python. 
Signed-off-by: Olivier Wilkinson (reivilibre) --- .../schema/delta/56/stats_separated2.py | 71 ------------------- .../delta/56/stats_separated2.sql.postgres | 24 +++++++ .../delta/56/stats_separated2.sql.sqlite | 27 +++++++ 3 files changed, 51 insertions(+), 71 deletions(-) delete mode 100644 synapse/storage/schema/delta/56/stats_separated2.py create mode 100644 synapse/storage/schema/delta/56/stats_separated2.sql.postgres create mode 100644 synapse/storage/schema/delta/56/stats_separated2.sql.sqlite diff --git a/synapse/storage/schema/delta/56/stats_separated2.py b/synapse/storage/schema/delta/56/stats_separated2.py deleted file mode 100644 index 942d240010..0000000000 --- a/synapse/storage/schema/delta/56/stats_separated2.py +++ /dev/null @@ -1,71 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2019 The Matrix.org Foundation C.I.C. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# This schema delta will be run after 'stats_separated1.sql' due to lexicographic -# ordering. Note that it MUST be so. -from synapse.storage.engines import PostgresEngine, Sqlite3Engine - - -def _run_create_generic(stats_type, cursor, database_engine): - """ - Creates the pertinent (partial, if supported) indices for one kind of stats. - Args: - stats_type: "room" or "user" - the type of stats - cursor: Database Cursor - database_engine: Database Engine - """ - if isinstance(database_engine, Sqlite3Engine): - # even though SQLite >= 3.8 can support partial indices, we won't enable - # them, in case the SQLite database may be later used on another system. - # It's also the case that SQLite is only likely to be used in small - # deployments or testing, where the optimisations gained by use of a - # partial index are not a big concern. - cursor.execute( - """ - CREATE INDEX IF NOT EXISTS %s_stats_not_complete - ON %s_stats_current (completed_delta_stream_id, %s_id); - """ - % (stats_type, stats_type, stats_type) - ) - elif isinstance(database_engine, PostgresEngine): - # This partial index helps us with finding incomplete stats rows - cursor.execute( - """ - CREATE INDEX IF NOT EXISTS %s_stats_not_complete - ON %s_stats_current (%s_id) - WHERE completed_delta_stream_id IS NULL; - """ - % (stats_type, stats_type, stats_type) - ) - else: - raise NotImplementedError("Unknown database engine.") - - -def run_create(cursor, database_engine): - """ - This function is called as part of the schema delta. - It will create indices - partial, if supported - for the new 'separated' - room & user statistics. - """ - _run_create_generic("room", cursor, database_engine) - _run_create_generic("user", cursor, database_engine) - - -def run_upgrade(cur, database_engine, config): - """ - This function is run on a database upgrade (of a non-empty database). - We have no need to do anything specific here. 
- """ - pass diff --git a/synapse/storage/schema/delta/56/stats_separated2.sql.postgres b/synapse/storage/schema/delta/56/stats_separated2.sql.postgres new file mode 100644 index 0000000000..0519fcff79 --- /dev/null +++ b/synapse/storage/schema/delta/56/stats_separated2.sql.postgres @@ -0,0 +1,24 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- These partial indices helps us with finding incomplete stats row +CREATE INDEX IF NOT EXISTS room_stats_not_complete + ON room_stats_current (room_id) + WHERE completed_delta_stream_id IS NULL; + +CREATE INDEX IF NOT EXISTS user_stats_not_complete + ON user_stats_current (user_id) + WHERE completed_delta_stream_id IS NULL; + diff --git a/synapse/storage/schema/delta/56/stats_separated2.sql.sqlite b/synapse/storage/schema/delta/56/stats_separated2.sql.sqlite new file mode 100644 index 0000000000..181f4ec5b9 --- /dev/null +++ b/synapse/storage/schema/delta/56/stats_separated2.sql.sqlite @@ -0,0 +1,27 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- even though SQLite >= 3.8 can support partial indices, we won't enable +-- them, in case the SQLite database may be later used on another system. +-- It's also the case that SQLite is only likely to be used in small +-- deployments or testing, where the optimisations gained by use of a +-- partial index are not a big concern. + +CREATE INDEX IF NOT EXISTS room_stats_not_complete + ON room_stats_current (completed_delta_stream_id, room_id); + +CREATE INDEX IF NOT EXISTS user_stats_not_complete + ON user_stats_current (completed_delta_stream_id, user_id); + From 4b7bf2e413a6012c87e3d12f7bf4183b9638836b Mon Sep 17 00:00:00 2001 From: reivilibre <38398653+reivilibre@users.noreply.github.com> Date: Tue, 27 Aug 2019 13:26:08 +0100 Subject: [PATCH 16/29] Apply suggestions from code review Co-Authored-By: Erik Johnston --- synapse/storage/stats.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 5a7dfde926..62047839cc 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -134,7 +134,7 @@ class StatsStore(StateDeltasStore): def _upsert_with_additive_relatives_txn( self, txn, table, keyvalues, absolutes, additive_relatives ): - """ + """Used to update values in the stats tables. 
Args: txn: Transaction @@ -322,7 +322,7 @@ class StatsStore(StateDeltasStore): elif complete_with_stream_id is not None: absolute_fields = absolute_fields.copy() - absolute_fields["completed_delta_stream_id"] = complete_with_stream_id + absolute_fields["completed_delta_stream_id"] = complete_with_stream_id # first upsert the current table self._upsert_with_additive_relatives_txn( From 81c5289c839a6d6888cd849996572aa5c9e19fbd Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 13:35:37 +0100 Subject: [PATCH 17/29] Clarify `_update_stats_delta_txn` by adding code comments and kwargs. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 34 ++++++++++++++++++---------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 62047839cc..7959f5785b 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -311,6 +311,9 @@ class StatsStore(StateDeltasStore): " for stats type %s" % (field, stats_type) ) + # only absolute stats fields are tracked in the `_current` stats tables, + # so those are the only ones that we process deltas for when + # we upsert against the `_current` table. additive_relatives = { key: fields.get(key, 0) for key in abs_field_names @@ -321,34 +324,33 @@ class StatsStore(StateDeltasStore): absolute_fields = {} elif complete_with_stream_id is not None: absolute_fields = absolute_fields.copy() - absolute_fields["completed_delta_stream_id"] = complete_with_stream_id - # first upsert the current table + # first upsert the `_current` table self._upsert_with_additive_relatives_txn( - txn, - table + "_current", - {id_col: stats_id}, - absolute_fields, - additive_relatives, + txn=txn, + table=table + "_current", + keyvalues={id_col: stats_id}, + absolutes=absolute_fields, + additive_relatives=additive_relatives, ) if self.has_completed_background_updates(): # TODO want to check specifically for stats regenerator, not all # background updates… - # then upsert the historical table. - # we don't support absolute_fields for slice_field_names as it makes + # then upsert the `_historical` table. + # we don't support absolute_fields for per-slice fields as it makes # no sense. 
per_slice_additive_relatives = { key: fields.get(key, 0) for key in slice_field_names } self._upsert_copy_from_table_with_additive_relatives_txn( - txn, - table + "_historical", - {id_col: stats_id}, - {"end_ts": end_ts, "bucket_size": self.stats_bucket_size}, - per_slice_additive_relatives, - table + "_current", - abs_field_names, + txn=txn, + into_table=table + "_historical", + keyvalues={id_col: stats_id}, + extra_dst_keyvalues={"end_ts": end_ts, "bucket_size": self.stats_bucket_size}, + additive_relatives=per_slice_additive_relatives, + src_table=table + "_current", + copy_columns=abs_field_names, additional_where=" AND completed_delta_stream_id IS NOT NULL", ) From 544ba2c2e9ad8d3d9aa9041e3f724e5c96a15390 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 13:40:00 +0100 Subject: [PATCH 18/29] Apply minor suggestions from review Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 7959f5785b..c950ab9953 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -190,8 +190,7 @@ class StatsStore(StateDeltasStore): else: for (key, val) in additive_relatives.items(): current_row[key] += val - for (key, val) in absolutes.items(): - current_row[key] = val + current_row.update(absolutes) self._simple_update_one_txn(txn, table, keyvalues, current_row) def _upsert_copy_from_table_with_additive_relatives_txn( @@ -223,8 +222,8 @@ class StatsStore(StateDeltasStore): ins_columns = chain( keyvalues, copy_columns, - additive_relatives.keys(), - extra_dst_keyvalues.keys(), + additive_relatives, + extra_dst_keyvalues, ) sel_exprs = chain( keyvalues, From a6c102009e219d93b512f682e5f799037536e3ee Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 13:41:48 +0100 Subject: [PATCH 19/29] Lock tables in upsert fall-backs. Should not be too much of a performance concern as this code won't be hit on Postgres, which large deployments should be using. 
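To spell out the race that the lock prevents: the emulated (non-native) upsert is a read-then-write sequence, so without a table lock two concurrent transactions can both observe that no row exists and both INSERT, or interleave their SELECT and UPDATE steps so that one transaction's delta is silently lost. A minimal sketch of the guarded pattern, with simplified helper names and "?" placeholders (illustrative only, not the exact Synapse helper signatures):

    def emulated_absolute_upsert(txn, database_engine, table, keyvalues, absolutes):
        # Serialise the read-modify-write cycle below; without the lock,
        # two transactions may both observe "no row" and both INSERT.
        database_engine.lock_table(txn, table)

        where = " AND ".join("%s = ?" % col for col in keyvalues)
        txn.execute(
            "SELECT COUNT(*) FROM %s WHERE %s" % (table, where),
            list(keyvalues.values()),
        )
        (count,) = txn.fetchone()

        if count == 0:
            # No existing row: insert one with the key and absolute values.
            cols = list(keyvalues) + list(absolutes)
            txn.execute(
                "INSERT INTO %s (%s) VALUES (%s)"
                % (table, ", ".join(cols), ", ".join("?" * len(cols))),
                list(keyvalues.values()) + list(absolutes.values()),
            )
        else:
            # Row exists: overwrite the absolute columns in place.
            sets = ", ".join("%s = ?" % col for col in absolutes)
            txn.execute(
                "UPDATE %s SET %s WHERE %s" % (table, sets, where),
                list(absolutes.values()) + list(keyvalues.values()),
            )

On Postgres the native `INSERT ... ON CONFLICT` path makes this whole sequence a single atomic statement, which is why the lock is only needed in the fallback.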
Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index c950ab9953..0f3aa6a801 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -180,6 +180,7 @@ class StatsStore(StateDeltasStore): txn.execute(sql, qargs) else: + self.database_engine.lock_table(txn, table) retcols = chain(absolutes.keys(), additive_relatives.keys()) current_row = self._simple_select_one_txn( txn, table, keyvalues, retcols, allow_none=True @@ -260,6 +261,7 @@ class StatsStore(StateDeltasStore): qargs = chain(additive_relatives.values(), keyvalues.values()) txn.execute(sql, qargs) else: + self.database_engine.lock_table(txn, into_table) src_row = self._simple_select_one_txn( txn, src_table, keyvalues, copy_columns ) From 736ac58e1191fc28abaef5c2cab86901b85ce192 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 13:42:33 +0100 Subject: [PATCH 20/29] Code formatting (Black) Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 0f3aa6a801..650c0050cb 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -221,10 +221,7 @@ class StatsStore(StateDeltasStore): """ if self.database_engine.can_native_upsert: ins_columns = chain( - keyvalues, - copy_columns, - additive_relatives, - extra_dst_keyvalues, + keyvalues, copy_columns, additive_relatives, extra_dst_keyvalues ) sel_exprs = chain( keyvalues, @@ -349,7 +346,10 @@ class StatsStore(StateDeltasStore): txn=txn, into_table=table + "_historical", keyvalues={id_col: stats_id}, - extra_dst_keyvalues={"end_ts": end_ts, "bucket_size": self.stats_bucket_size}, + extra_dst_keyvalues={ + "end_ts": end_ts, + "bucket_size": self.stats_bucket_size, + }, additive_relatives=per_slice_additive_relatives, src_table=table + "_current", copy_columns=abs_field_names, From 09cbc3a8e9c0f494fb272cb3761024a851b3e3f8 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 13:50:58 +0100 Subject: [PATCH 21/29] Switch to milliseconds in room/user stats for consistency. 
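With this change every duration and timestamp handled by the stats store is in milliseconds since the Unix epoch; `parse_duration` already returns milliseconds, which is why the divide-by-1000 steps in the diff below disappear. Quantisation then reduces a millisecond timestamp to a multiple of the bucket size; a standalone sketch of the arithmetic (the real logic lives in `StatsStore.quantise_stats_time`; flooring via integer division is an assumption here, not taken from this patch):

    def quantise_stats_time(ts, bucket_size=86400 * 1000):
        # Floor a millisecond timestamp to a multiple of the bucket size,
        # so every update within the same one-day bucket shares one end_ts.
        return (ts // bucket_size) * bucket_size

    # e.g. quantise_stats_time(1566912000123) == 1566864000000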
Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/config/stats.py | 13 +++++-------- .../storage/schema/delta/56/stats_separated1.sql | 2 +- synapse/storage/stats.py | 3 ++- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/synapse/config/stats.py b/synapse/config/stats.py index b518a3ed9c..b18ddbd1fa 100644 --- a/synapse/config/stats.py +++ b/synapse/config/stats.py @@ -27,19 +27,16 @@ class StatsConfig(Config): def read_config(self, config, **kwargs): self.stats_enabled = True - self.stats_bucket_size = 86400 + self.stats_bucket_size = 86400 * 1000 self.stats_retention = sys.maxsize stats_config = config.get("stats", None) if stats_config: self.stats_enabled = stats_config.get("enabled", self.stats_enabled) - self.stats_bucket_size = ( - self.parse_duration(stats_config.get("bucket_size", "1d")) / 1000 + self.stats_bucket_size = self.parse_duration( + stats_config.get("bucket_size", "1d") ) - self.stats_retention = ( - self.parse_duration( - stats_config.get("retention", "%ds" % (sys.maxsize,)) - ) - / 1000 + self.stats_retention = self.parse_duration( + stats_config.get("retention", "%ds" % (sys.maxsize,)) ) def generate_config_section(self, config_dir_path, server_name, **kwargs): diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql index 95daf8f53b..045b5ca013 100644 --- a/synapse/storage/schema/delta/56/stats_separated1.sql +++ b/synapse/storage/schema/delta/56/stats_separated1.sql @@ -84,7 +84,7 @@ CREATE TABLE IF NOT EXISTS room_stats_current ( -- represents HISTORICAL room statistics for a room CREATE TABLE IF NOT EXISTS room_stats_historical ( room_id TEXT NOT NULL, - -- These stats cover the time from (end_ts - bucket_size)...end_ts (in seconds). + -- These stats cover the time from (end_ts - bucket_size)...end_ts (in ms). -- Note that end_ts is quantised. end_ts BIGINT NOT NULL, bucket_size INT NOT NULL, diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 650c0050cb..35fca1dc7b 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -64,7 +64,8 @@ class StatsStore(StateDeltasStore): Quantises a timestamp to be a multiple of the bucket size. Args: - ts (int): the timestamp to quantise, in seconds since the Unix Epoch + ts (int): the timestamp to quantise, in milliseconds since the Unix + Epoch Returns: int: a timestamp which From c775f310e949e6ab72d1074310eb2bd4d024172c Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 13:51:25 +0100 Subject: [PATCH 22/29] Don't include the room & user stats docs in this PR. Signed-off-by: Olivier Wilkinson (reivilibre) --- docs/room_and_user_statistics.md | 146 ------------------------------- 1 file changed, 146 deletions(-) delete mode 100644 docs/room_and_user_statistics.md diff --git a/docs/room_and_user_statistics.md b/docs/room_and_user_statistics.md deleted file mode 100644 index eb003f771a..0000000000 --- a/docs/room_and_user_statistics.md +++ /dev/null @@ -1,146 +0,0 @@ -Room and User Statistics -======================== - -Synapse maintains room and user statistics (as well as a cache of room state), -in various tables. - -These can be used for administrative purposes but are also used when generating -the public room directory. If these tables get stale or out of sync (possibly -after database corruption), you may wish to regenerate them. 
- - -# Synapse Administrator Documentation - -## Various SQL scripts that you may find useful - -### Delete stats, including historical stats - -```sql -DELETE FROM room_stats_current; -DELETE FROM room_stats_historical; -DELETE FROM user_stats_current; -DELETE FROM user_stats_historical; -``` - -### Regenerate stats (all subjects) - -```sql -BEGIN; - DELETE FROM stats_incremental_position; - INSERT INTO stats_incremental_position ( - state_delta_stream_id, - total_events_min_stream_ordering, - total_events_max_stream_ordering, - is_background_contract - ) VALUES (NULL, NULL, NULL, FALSE), (NULL, NULL, NULL, TRUE); -COMMIT; - -DELETE FROM room_stats_current; -DELETE FROM user_stats_current; -``` - -then follow the steps below for **'Regenerate stats (missing subjects only)'** - -### Regenerate stats (missing subjects only) - -```sql --- Set up staging tables --- we depend on current_state_events_membership because this is used --- in our counting. -INSERT INTO background_updates (update_name, progress_json) VALUES - ('populate_stats_prepare', '{}', 'current_state_events_membership'); - --- Run through each room and update stats -INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES - ('populate_stats_process_rooms', '{}', 'populate_stats_prepare'); - --- Run through each user and update stats. -INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES - ('populate_stats_process_users', '{}', 'populate_stats_process_rooms'); - --- Clean up staging tables -INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES - ('populate_stats_cleanup', '{}', 'populate_stats_process_users'); -``` - -then **restart Synapse**. - - -# Synapse Developer Documentation - -## High-Level Concepts - -### Definitions - -* **subject**: Something we are tracking stats about – currently a room or user. -* **current row**: An entry for a subject in the appropriate current statistics - table. Each subject can have only one. -* **historical row**: An entry for a subject in the appropriate historical - statistics table. Each subject can have any number of these. - -### Overview - -Stats are maintained as time series. There are two kinds of column: - -* absolute columns – where the value is correct for the time given by `end_ts` - in the stats row. (Imagine a line graph for these values) -* per-slice columns – where the value corresponds to how many of the occurrences - occurred within the time slice given by `(end_ts − bucket_size)…end_ts` - or `start_ts…end_ts`. (Imagine a histogram for these values) - -Currently, only absolute columns are in use. - -Stats are maintained in two tables (for each type): current and historical. - -Current stats correspond to the present values. Each subject can only have one -entry. - -Historical stats correspond to values in the past. Subjects may have multiple -entries. - -## Concepts around the management of stats - -### current rows - -#### dirty current rows - -Current rows can be **dirty**, which means that they have changed since the -latest historical row for the same subject. -**Dirty** current rows possess an end timestamp, `end_ts`. - -#### old current rows and old collection - -When a (necessarily dirty) current row has an `end_ts` in the past, it is said -to be **old**. -Old current rows must be copied into a historical row, and cleared of their dirty -status, before further statistics can be tracked for that subject. -The process which does this is referred to as **old collection**. 
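To make old collection concrete, it amounts to copying the aged current row into the historical table and then advancing the current row's bucket so it is no longer old. A rough sketch of that flow (illustrative only: the table and column names follow the schema earlier in this series, these exact statements appear in no patch here, and old collection is itself removed as obsolete a few patches later):

    def old_collect_room(txn, room_id, new_end_ts, bucket_size):
        # Copy the aged ("old") current row into the historical table...
        txn.execute(
            """
            INSERT INTO room_stats_historical
                (room_id, end_ts, bucket_size, current_state_events,
                 total_events, joined_members, invited_members,
                 left_members, banned_members)
            SELECT room_id, end_ts, ?, current_state_events, total_events,
                   joined_members, invited_members, left_members,
                   banned_members
            FROM room_stats_current
            WHERE room_id = ?
            """,
            (bucket_size, room_id),
        )
        # ...then advance the current row's bucket (clearing its "old"
        # status) so stats tracking can continue for this room.
        txn.execute(
            "UPDATE room_stats_current SET start_ts = end_ts, end_ts = ?"
            " WHERE room_id = ?",
            (new_end_ts, room_id),
        )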
- -#### incomplete current rows - -There are also **incomplete** current rows, which are current rows that do not -contain a full count yet – this is because they are waiting for the stats -regenerator to give them an initial count. Incomplete current rows DO NOT contain -correct and up-to-date values. As such, *incomplete rows are not old-collected*. -Instead, old incomplete rows will be extended so they are no longer old. - -### historical rows - -Historical rows can always be considered to be valid for the time slice and -end time specified. (This, of course, assumes a lack of defects in the code -to track the statistics, and assumes integrity of the database). - -Even still, there are two considerations that we may need to bear in mind: - -* historical rows will not exist for every time slice – they will be omitted - if there were no changes. In this case, the following assumptions can be - made to interpolate/recreate missing rows: - - absolute fields have the same values as in the preceding row - - per-slice fields are zero (`0`) -* historical rows will not be retained forever – rows older than a configurable - time will be purged. - -#### purge - -The purging of historical rows is not yet implemented. - From 491eaf0808fb4d3bb6d9b5f5f26c77e82e9333ec Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 14:19:38 +0100 Subject: [PATCH 23/29] Remove obsolete `OldCollectionRequired` as old collection is obsolete. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 35fca1dc7b..824e57bad7 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -40,12 +40,6 @@ PER_SLICE_FIELDS = {"room": (), "user": ()} TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")} -class OldCollectionRequired(Exception): - """ Signal that we need to collect old stats rows and retry. """ - - pass - - class StatsStore(StateDeltasStore): def __init__(self, db_conn, hs): super(StatsStore, self).__init__(db_conn, hs) From 11c4e506bd6254fcb9551729e70b7ade80b127e4 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 14:24:25 +0100 Subject: [PATCH 24/29] Rename `room_state` table to `room_stats_state` Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/events.py | 2 +- synapse/storage/schema/delta/56/stats_separated1.sql | 3 +++ synapse/storage/stats.py | 2 +- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 5a95c36a8b..28fdba5fb2 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2270,7 +2270,7 @@ class EventsStore( "room_aliases", "room_depth", "room_memberships", - "room_state", + "room_stats_state", "room_stats", "room_stats_earliest_token", "rooms", diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql index 045b5ca013..52fb09c0e6 100644 --- a/synapse/storage/schema/delta/56/stats_separated1.sql +++ b/synapse/storage/schema/delta/56/stats_separated1.sql @@ -137,3 +137,6 @@ CREATE INDEX IF NOT EXISTS user_stats_historical_end_ts ON user_stats_historical -- We don't need an index on (user_id, end_ts) because PRIMARY KEY sorts that -- out for us. (We would want it to review stats for a particular user.) + +-- Also rename room_state to room_stats_state to make its ownership clear. 
+ALTER TABLE room_state RENAME TO room_stats_state; diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 824e57bad7..fce0fb5a56 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -92,7 +92,7 @@ class StatsStore(StateDeltasStore): fields[col] = None return self._simple_upsert( - table="room_state", + table="room_stats_state", keyvalues={"room_id": room_id}, values=fields, desc="update_room_state", From 62b1250629f0aef9ef74021c86ae53a2d27aad09 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 14:24:56 +0100 Subject: [PATCH 25/29] Update `_purge_room_txn` to take account of separated stats tables Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/events.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 28fdba5fb2..5527dd208e 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2271,7 +2271,8 @@ class EventsStore( "room_depth", "room_memberships", "room_stats_state", - "room_stats", + "room_stats_current", + "room_stats_historical", "room_stats_earliest_token", "rooms", "stream_ordering_to_exterm", From 324f21b216e84e8b2b9287bda4f2ad5565ac60c6 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 14:53:45 +0100 Subject: [PATCH 26/29] Fix logic error. `absolute_fields` being None shouldn't preclude completion of a current stats row. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index fce0fb5a56..35d8bdb7b7 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -315,7 +315,8 @@ class StatsStore(StateDeltasStore): if absolute_fields is None: absolute_fields = {} - elif complete_with_stream_id is not None: + + if complete_with_stream_id is not None: absolute_fields = absolute_fields.copy() absolute_fields["completed_delta_stream_id"] = complete_with_stream_id From 1af7866562c3a09815378635b1d305b5824bc7c0 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 16:16:53 +0100 Subject: [PATCH 27/29] Clean up code with improved naming and hoist around functions. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 170 +++++++++++++++++++++------------------ 1 file changed, 91 insertions(+), 79 deletions(-) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 35d8bdb7b7..4b2364746c 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -22,6 +22,9 @@ from synapse.storage.state_deltas import StateDeltasStore logger = logging.getLogger(__name__) # these fields track absolutes (e.g. total number of rooms on the server) +# You can think of these as Prometheus Gauges. +# You can draw these stats on a line graph. +# Example: number of users in a room ABSOLUTE_STATS_FIELDS = { "room": ( "current_state_events", @@ -35,6 +38,8 @@ ABSOLUTE_STATS_FIELDS = { } # these fields are per-timeslice and so should be reset to 0 upon a new slice +# You can draw these stats on a histogram. 
+# Example: number of events sent locally during a time slice PER_SLICE_FIELDS = {"room": (), "user": ()} TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")} @@ -126,6 +131,92 @@ class StatsStore(StateDeltasStore): complete_with_stream_id=complete_with_stream_id, ) + def _update_stats_delta_txn( + self, + txn, + ts, + stats_type, + stats_id, + fields, + complete_with_stream_id=None, + absolute_field_overrides=None, + ): + """ + See L{update_stats_delta} + Additional Args: + absolute_fields (dict[str, int]): Current stats values + (i.e. not deltas) of absolute fields. + Does not work with per-slice fields. + """ + table, id_col = TYPE_TO_TABLE[stats_type] + + quantised_ts = self.quantise_stats_time(int(ts)) + end_ts = quantised_ts + self.stats_bucket_size + + abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type] + slice_field_names = PER_SLICE_FIELDS[stats_type] + for field in chain(fields.keys(), absolute_field_overrides.keys()): + if field not in abs_field_names and field not in slice_field_names: + # guard against potential SQL injection dodginess + raise ValueError( + "%s is not a recognised field" + " for stats type %s" % (field, stats_type) + ) + + # only absolute stats fields are tracked in the `_current` stats tables, + # so those are the only ones that we process deltas for when + # we upsert against the `_current` table. + + # This calculates the deltas (`field = field + ?` values) + # for absolute fields, + # * defaulting to 0 if not specified + # (required for the INSERT part of upserting to work) + # * omitting overrides specified in `absolute_field_overrides` + deltas_of_absolute_fields = { + key: fields.get(key, 0) + for key in abs_field_names + if key not in absolute_field_overrides + } + + if absolute_field_overrides is None: + absolute_field_overrides = {} + + if complete_with_stream_id is not None: + absolute_field_overrides = absolute_field_overrides.copy() + absolute_field_overrides["completed_delta_stream_id"] = complete_with_stream_id + + # first upsert the `_current` table + self._upsert_with_additive_relatives_txn( + txn=txn, + table=table + "_current", + keyvalues={id_col: stats_id}, + absolutes=absolute_field_overrides, + additive_relatives=deltas_of_absolute_fields, + ) + + if self.has_completed_background_updates(): + # TODO want to check specifically for stats regenerator, not all + # background updates… + # then upsert the `_historical` table. + # we don't support absolute_fields for per-slice fields as it makes + # no sense. 
+ per_slice_additive_relatives = { + key: fields.get(key, 0) for key in slice_field_names + } + self._upsert_copy_from_table_with_additive_relatives_txn( + txn=txn, + into_table=table + "_historical", + keyvalues={id_col: stats_id}, + extra_dst_keyvalues={ + "end_ts": end_ts, + "bucket_size": self.stats_bucket_size, + }, + additive_relatives=per_slice_additive_relatives, + src_table=table + "_current", + copy_columns=abs_field_names, + additional_where=" AND completed_delta_stream_id IS NOT NULL", + ) + def _upsert_with_additive_relatives_txn( self, txn, table, keyvalues, absolutes, additive_relatives ): @@ -272,82 +363,3 @@ class StatsStore(StateDeltasStore): for (key, val) in additive_relatives.items(): src_row[key] = dest_current_row[key] + val self._simple_update_txn(txn, into_table, keyvalues, src_row) - - def _update_stats_delta_txn( - self, - txn, - ts, - stats_type, - stats_id, - fields, - complete_with_stream_id=None, - absolute_fields=None, - ): - """ - See L{update_stats_delta} - Additional Args: - absolute_fields (dict[str, int]): Absolute current stats values - (i.e. not deltas). Does not work with per-slice fields. - """ - table, id_col = TYPE_TO_TABLE[stats_type] - - quantised_ts = self.quantise_stats_time(int(ts)) - end_ts = quantised_ts + self.stats_bucket_size - - abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type] - slice_field_names = PER_SLICE_FIELDS[stats_type] - for field in chain(fields.keys(), absolute_fields.keys()): - if field not in abs_field_names and field not in slice_field_names: - # guard against potential SQL injection dodginess - raise ValueError( - "%s is not a recognised field" - " for stats type %s" % (field, stats_type) - ) - - # only absolute stats fields are tracked in the `_current` stats tables, - # so those are the only ones that we process deltas for when - # we upsert against the `_current` table. - additive_relatives = { - key: fields.get(key, 0) - for key in abs_field_names - if key not in absolute_fields - } - - if absolute_fields is None: - absolute_fields = {} - - if complete_with_stream_id is not None: - absolute_fields = absolute_fields.copy() - absolute_fields["completed_delta_stream_id"] = complete_with_stream_id - - # first upsert the `_current` table - self._upsert_with_additive_relatives_txn( - txn=txn, - table=table + "_current", - keyvalues={id_col: stats_id}, - absolutes=absolute_fields, - additive_relatives=additive_relatives, - ) - - if self.has_completed_background_updates(): - # TODO want to check specifically for stats regenerator, not all - # background updates… - # then upsert the `_historical` table. - # we don't support absolute_fields for per-slice fields as it makes - # no sense. 
- per_slice_additive_relatives = { - key: fields.get(key, 0) for key in slice_field_names - } - self._upsert_copy_from_table_with_additive_relatives_txn( - txn=txn, - into_table=table + "_historical", - keyvalues={id_col: stats_id}, - extra_dst_keyvalues={ - "end_ts": end_ts, - "bucket_size": self.stats_bucket_size, - }, - additive_relatives=per_slice_additive_relatives, - src_table=table + "_current", - copy_columns=abs_field_names, - additional_where=" AND completed_delta_stream_id IS NOT NULL", - ) From b9f1adc370fa06e5803907ee563fd743821e271d Mon Sep 17 00:00:00 2001 From: reivilibre <38398653+reivilibre@users.noreply.github.com> Date: Wed, 28 Aug 2019 09:01:25 +0100 Subject: [PATCH 28/29] Update synapse/storage/stats.py Co-Authored-By: Erik Johnston --- synapse/storage/stats.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 4b2364746c..a4493556f3 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -144,7 +144,7 @@ class StatsStore(StateDeltasStore): """ See L{update_stats_delta} Additional Args: - absolute_fields (dict[str, int]): Current stats values + absolute_field_overrides (dict[str, int]): Current stats values (i.e. not deltas) of absolute fields. Does not work with per-slice fields. """ From a344ad3d3fdb6575338a056e4f36afae1ee4f9a0 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 28 Aug 2019 09:33:03 +0100 Subject: [PATCH 29/29] Code formatting (Black) Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index a4493556f3..4106f161f0 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -183,7 +183,9 @@ class StatsStore(StateDeltasStore): if complete_with_stream_id is not None: absolute_field_overrides = absolute_field_overrides.copy() - absolute_field_overrides["completed_delta_stream_id"] = complete_with_stream_id + absolute_field_overrides[ + "completed_delta_stream_id" + ] = complete_with_stream_id # first upsert the `_current` table self._upsert_with_additive_relatives_txn(
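A closing note on the control flow that the last few patches converge on: after the patch 26 fix, completion is applied whether or not absolute values were supplied, and patches 28 and 29 only rename and reformat that code. A condensed sketch of the corrected merging step (simplified from `_update_stats_delta_txn`):

    def merge_completion(absolute_field_overrides, complete_with_stream_id):
        # The point of the earlier `elif` -> `if` fix: a None overrides
        # dict used to skip completion entirely; now the two conditions
        # are handled independently.
        if absolute_field_overrides is None:
            absolute_field_overrides = {}
        if complete_with_stream_id is not None:
            # Copy before mutating, as the real code does with .copy().
            absolute_field_overrides = dict(
                absolute_field_overrides,
                completed_delta_stream_id=complete_with_stream_id,
            )
        return absolute_field_overrides

The merged dict is then handed to `_upsert_with_additive_relatives_txn` as its `absolutes` argument, exactly as in the upsert call shown above.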