From 491eaf0808fb4d3bb6d9b5f5f26c77e82e9333ec Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 14:19:38 +0100 Subject: [PATCH 1/7] Remove obsolete `OldCollectionRequired` as old collection is obsolete. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 35fca1dc7b..824e57bad7 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -40,12 +40,6 @@ PER_SLICE_FIELDS = {"room": (), "user": ()} TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")} -class OldCollectionRequired(Exception): - """ Signal that we need to collect old stats rows and retry. """ - - pass - - class StatsStore(StateDeltasStore): def __init__(self, db_conn, hs): super(StatsStore, self).__init__(db_conn, hs) From 11c4e506bd6254fcb9551729e70b7ade80b127e4 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 14:24:25 +0100 Subject: [PATCH 2/7] Rename `room_state` table to `room_stats_state` Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/events.py | 2 +- synapse/storage/schema/delta/56/stats_separated1.sql | 3 +++ synapse/storage/stats.py | 2 +- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 5a95c36a8b..28fdba5fb2 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2270,7 +2270,7 @@ class EventsStore( "room_aliases", "room_depth", "room_memberships", - "room_state", + "room_stats_state", "room_stats", "room_stats_earliest_token", "rooms", diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql index 045b5ca013..52fb09c0e6 100644 --- a/synapse/storage/schema/delta/56/stats_separated1.sql +++ b/synapse/storage/schema/delta/56/stats_separated1.sql @@ -137,3 +137,6 @@ CREATE INDEX IF NOT EXISTS user_stats_historical_end_ts ON user_stats_historical -- We don't need an index on (user_id, end_ts) because PRIMARY KEY sorts that -- out for us. (We would want it to review stats for a particular user.) + +-- Also rename room_state to room_stats_state to make its ownership clear. +ALTER TABLE room_state RENAME TO room_stats_state; diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 824e57bad7..fce0fb5a56 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -92,7 +92,7 @@ class StatsStore(StateDeltasStore): fields[col] = None return self._simple_upsert( - table="room_state", + table="room_stats_state", keyvalues={"room_id": room_id}, values=fields, desc="update_room_state", From 62b1250629f0aef9ef74021c86ae53a2d27aad09 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 14:24:56 +0100 Subject: [PATCH 3/7] Update `_purge_room_txn` to take account of separated stats tables Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/events.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 28fdba5fb2..5527dd208e 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2271,7 +2271,8 @@ class EventsStore( "room_depth", "room_memberships", "room_stats_state", - "room_stats", + "room_stats_current", + "room_stats_historical", "room_stats_earliest_token", "rooms", "stream_ordering_to_exterm", From 324f21b216e84e8b2b9287bda4f2ad5565ac60c6 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 14:53:45 +0100 Subject: [PATCH 4/7] Fix logic error. `absolute_fields` being None shouldn't preclude completion of a current stats row. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index fce0fb5a56..35d8bdb7b7 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -315,7 +315,8 @@ class StatsStore(StateDeltasStore): if absolute_fields is None: absolute_fields = {} - elif complete_with_stream_id is not None: + + if complete_with_stream_id is not None: absolute_fields = absolute_fields.copy() absolute_fields["completed_delta_stream_id"] = complete_with_stream_id From 1af7866562c3a09815378635b1d305b5824bc7c0 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 16:16:53 +0100 Subject: [PATCH 5/7] Clean up code with improved naming and hoist around functions. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 170 +++++++++++++++++++++------------------ 1 file changed, 91 insertions(+), 79 deletions(-) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 35d8bdb7b7..4b2364746c 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -22,6 +22,9 @@ from synapse.storage.state_deltas import StateDeltasStore logger = logging.getLogger(__name__) # these fields track absolutes (e.g. total number of rooms on the server) +# You can think of these as Prometheus Gauges. +# You can draw these stats on a line graph. +# Example: number of users in a room ABSOLUTE_STATS_FIELDS = { "room": ( "current_state_events", @@ -35,6 +38,8 @@ ABSOLUTE_STATS_FIELDS = { } # these fields are per-timeslice and so should be reset to 0 upon a new slice +# You can draw these stats on a histogram. +# Example: number of events sent locally during a time slice PER_SLICE_FIELDS = {"room": (), "user": ()} TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")} @@ -126,6 +131,92 @@ class StatsStore(StateDeltasStore): complete_with_stream_id=complete_with_stream_id, ) + def _update_stats_delta_txn( + self, + txn, + ts, + stats_type, + stats_id, + fields, + complete_with_stream_id=None, + absolute_field_overrides=None, + ): + """ + See L{update_stats_delta} + Additional Args: + absolute_fields (dict[str, int]): Current stats values + (i.e. not deltas) of absolute fields. + Does not work with per-slice fields. + """ + table, id_col = TYPE_TO_TABLE[stats_type] + + quantised_ts = self.quantise_stats_time(int(ts)) + end_ts = quantised_ts + self.stats_bucket_size + + abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type] + slice_field_names = PER_SLICE_FIELDS[stats_type] + for field in chain(fields.keys(), absolute_field_overrides.keys()): + if field not in abs_field_names and field not in slice_field_names: + # guard against potential SQL injection dodginess + raise ValueError( + "%s is not a recognised field" + " for stats type %s" % (field, stats_type) + ) + + # only absolute stats fields are tracked in the `_current` stats tables, + # so those are the only ones that we process deltas for when + # we upsert against the `_current` table. + + # This calculates the deltas (`field = field + ?` values) + # for absolute fields, + # * defaulting to 0 if not specified + # (required for the INSERT part of upserting to work) + # * omitting overrides specified in `absolute_field_overrides` + deltas_of_absolute_fields = { + key: fields.get(key, 0) + for key in abs_field_names + if key not in absolute_field_overrides + } + + if absolute_field_overrides is None: + absolute_field_overrides = {} + + if complete_with_stream_id is not None: + absolute_field_overrides = absolute_field_overrides.copy() + absolute_field_overrides["completed_delta_stream_id"] = complete_with_stream_id + + # first upsert the `_current` table + self._upsert_with_additive_relatives_txn( + txn=txn, + table=table + "_current", + keyvalues={id_col: stats_id}, + absolutes=absolute_field_overrides, + additive_relatives=deltas_of_absolute_fields, + ) + + if self.has_completed_background_updates(): + # TODO want to check specifically for stats regenerator, not all + # background updates… + # then upsert the `_historical` table. + # we don't support absolute_fields for per-slice fields as it makes + # no sense. + per_slice_additive_relatives = { + key: fields.get(key, 0) for key in slice_field_names + } + self._upsert_copy_from_table_with_additive_relatives_txn( + txn=txn, + into_table=table + "_historical", + keyvalues={id_col: stats_id}, + extra_dst_keyvalues={ + "end_ts": end_ts, + "bucket_size": self.stats_bucket_size, + }, + additive_relatives=per_slice_additive_relatives, + src_table=table + "_current", + copy_columns=abs_field_names, + additional_where=" AND completed_delta_stream_id IS NOT NULL", + ) + def _upsert_with_additive_relatives_txn( self, txn, table, keyvalues, absolutes, additive_relatives ): @@ -272,82 +363,3 @@ class StatsStore(StateDeltasStore): for (key, val) in additive_relatives.items(): src_row[key] = dest_current_row[key] + val self._simple_update_txn(txn, into_table, keyvalues, src_row) - - def _update_stats_delta_txn( - self, - txn, - ts, - stats_type, - stats_id, - fields, - complete_with_stream_id=None, - absolute_fields=None, - ): - """ - See L{update_stats_delta} - Additional Args: - absolute_fields (dict[str, int]): Absolute current stats values - (i.e. not deltas). Does not work with per-slice fields. - """ - table, id_col = TYPE_TO_TABLE[stats_type] - - quantised_ts = self.quantise_stats_time(int(ts)) - end_ts = quantised_ts + self.stats_bucket_size - - abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type] - slice_field_names = PER_SLICE_FIELDS[stats_type] - for field in chain(fields.keys(), absolute_fields.keys()): - if field not in abs_field_names and field not in slice_field_names: - # guard against potential SQL injection dodginess - raise ValueError( - "%s is not a recognised field" - " for stats type %s" % (field, stats_type) - ) - - # only absolute stats fields are tracked in the `_current` stats tables, - # so those are the only ones that we process deltas for when - # we upsert against the `_current` table. - additive_relatives = { - key: fields.get(key, 0) - for key in abs_field_names - if key not in absolute_fields - } - - if absolute_fields is None: - absolute_fields = {} - - if complete_with_stream_id is not None: - absolute_fields = absolute_fields.copy() - absolute_fields["completed_delta_stream_id"] = complete_with_stream_id - - # first upsert the `_current` table - self._upsert_with_additive_relatives_txn( - txn=txn, - table=table + "_current", - keyvalues={id_col: stats_id}, - absolutes=absolute_fields, - additive_relatives=additive_relatives, - ) - - if self.has_completed_background_updates(): - # TODO want to check specifically for stats regenerator, not all - # background updates… - # then upsert the `_historical` table. - # we don't support absolute_fields for per-slice fields as it makes - # no sense. - per_slice_additive_relatives = { - key: fields.get(key, 0) for key in slice_field_names - } - self._upsert_copy_from_table_with_additive_relatives_txn( - txn=txn, - into_table=table + "_historical", - keyvalues={id_col: stats_id}, - extra_dst_keyvalues={ - "end_ts": end_ts, - "bucket_size": self.stats_bucket_size, - }, - additive_relatives=per_slice_additive_relatives, - src_table=table + "_current", - copy_columns=abs_field_names, - additional_where=" AND completed_delta_stream_id IS NOT NULL", - ) From b9f1adc370fa06e5803907ee563fd743821e271d Mon Sep 17 00:00:00 2001 From: reivilibre <38398653+reivilibre@users.noreply.github.com> Date: Wed, 28 Aug 2019 09:01:25 +0100 Subject: [PATCH 6/7] Update synapse/storage/stats.py Co-Authored-By: Erik Johnston --- synapse/storage/stats.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 4b2364746c..a4493556f3 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -144,7 +144,7 @@ class StatsStore(StateDeltasStore): """ See L{update_stats_delta} Additional Args: - absolute_fields (dict[str, int]): Current stats values + absolute_field_overrides (dict[str, int]): Current stats values (i.e. not deltas) of absolute fields. Does not work with per-slice fields. """ From a344ad3d3fdb6575338a056e4f36afae1ee4f9a0 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 28 Aug 2019 09:33:03 +0100 Subject: [PATCH 7/7] Code formatting (Black) Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index a4493556f3..4106f161f0 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -183,7 +183,9 @@ class StatsStore(StateDeltasStore): if complete_with_stream_id is not None: absolute_field_overrides = absolute_field_overrides.copy() - absolute_field_overrides["completed_delta_stream_id"] = complete_with_stream_id + absolute_field_overrides[ + "completed_delta_stream_id" + ] = complete_with_stream_id # first upsert the `_current` table self._upsert_with_additive_relatives_txn(