mirror of
https://github.com/element-hq/synapse.git
synced 2024-12-19 09:31:35 +03:00
Merge pull request #5889 from matrix-org/rei/rss_inc2
Separated Statistics [2/7ish]
This commit is contained in:
commit
cc66cf1238
6 changed files with 460 additions and 13 deletions
|
@ -27,19 +27,16 @@ class StatsConfig(Config):
|
|||
|
||||
def read_config(self, config, **kwargs):
|
||||
self.stats_enabled = True
|
||||
self.stats_bucket_size = 86400
|
||||
self.stats_bucket_size = 86400 * 1000
|
||||
self.stats_retention = sys.maxsize
|
||||
stats_config = config.get("stats", None)
|
||||
if stats_config:
|
||||
self.stats_enabled = stats_config.get("enabled", self.stats_enabled)
|
||||
self.stats_bucket_size = (
|
||||
self.parse_duration(stats_config.get("bucket_size", "1d")) / 1000
|
||||
self.stats_bucket_size = self.parse_duration(
|
||||
stats_config.get("bucket_size", "1d")
|
||||
)
|
||||
self.stats_retention = (
|
||||
self.parse_duration(
|
||||
stats_config.get("retention", "%ds" % (sys.maxsize,))
|
||||
)
|
||||
/ 1000
|
||||
self.stats_retention = self.parse_duration(
|
||||
stats_config.get("retention", "%ds" % (sys.maxsize,))
|
||||
)
|
||||
|
||||
def generate_config_section(self, config_dir_path, server_name, **kwargs):
|
||||
|
|
|
@ -2270,8 +2270,9 @@ class EventsStore(
|
|||
"room_aliases",
|
||||
"room_depth",
|
||||
"room_memberships",
|
||||
"room_state",
|
||||
"room_stats",
|
||||
"room_stats_state",
|
||||
"room_stats_current",
|
||||
"room_stats_historical",
|
||||
"room_stats_earliest_token",
|
||||
"rooms",
|
||||
"stream_ordering_to_exterm",
|
||||
|
|
|
@ -31,3 +31,112 @@ DELETE FROM background_updates WHERE update_name IN (
|
|||
'populate_stats_process_rooms',
|
||||
'populate_stats_cleanup'
|
||||
);
|
||||
|
||||
----- Create tables for our version of room stats.
|
||||
|
||||
-- single-row table to track position of incremental updates
|
||||
CREATE TABLE IF NOT EXISTS stats_incremental_position (
|
||||
-- the stream_id of the last-processed state delta
|
||||
state_delta_stream_id BIGINT,
|
||||
|
||||
-- the stream_ordering of the last-processed backfilled event
|
||||
-- (this is negative)
|
||||
total_events_min_stream_ordering BIGINT,
|
||||
|
||||
-- the stream_ordering of the last-processed normally-created event
|
||||
-- (this is positive)
|
||||
total_events_max_stream_ordering BIGINT,
|
||||
|
||||
-- If true, this represents the contract agreed upon by the stats
|
||||
-- regenerator.
|
||||
-- If false, this is suitable for use by the delta/incremental processor.
|
||||
is_background_contract BOOLEAN NOT NULL PRIMARY KEY
|
||||
);
|
||||
|
||||
-- insert a null row and make sure it is the only one.
|
||||
DELETE FROM stats_incremental_position;
|
||||
INSERT INTO stats_incremental_position (
|
||||
state_delta_stream_id,
|
||||
total_events_min_stream_ordering,
|
||||
total_events_max_stream_ordering,
|
||||
is_background_contract
|
||||
) VALUES (NULL, NULL, NULL, (0 = 1)), (NULL, NULL, NULL, (1 = 1));
|
||||
|
||||
-- represents PRESENT room statistics for a room
|
||||
-- only holds absolute fields
|
||||
CREATE TABLE IF NOT EXISTS room_stats_current (
|
||||
room_id TEXT NOT NULL PRIMARY KEY,
|
||||
|
||||
current_state_events INT NOT NULL,
|
||||
total_events INT NOT NULL,
|
||||
joined_members INT NOT NULL,
|
||||
invited_members INT NOT NULL,
|
||||
left_members INT NOT NULL,
|
||||
banned_members INT NOT NULL,
|
||||
|
||||
-- If initial stats regen is still to be performed: NULL
|
||||
-- If initial stats regen has been performed: the maximum delta stream
|
||||
-- position that this row takes into account.
|
||||
completed_delta_stream_id BIGINT
|
||||
);
|
||||
|
||||
|
||||
-- represents HISTORICAL room statistics for a room
|
||||
CREATE TABLE IF NOT EXISTS room_stats_historical (
|
||||
room_id TEXT NOT NULL,
|
||||
-- These stats cover the time from (end_ts - bucket_size)...end_ts (in ms).
|
||||
-- Note that end_ts is quantised.
|
||||
end_ts BIGINT NOT NULL,
|
||||
bucket_size INT NOT NULL,
|
||||
|
||||
current_state_events INT NOT NULL,
|
||||
total_events INT NOT NULL,
|
||||
joined_members INT NOT NULL,
|
||||
invited_members INT NOT NULL,
|
||||
left_members INT NOT NULL,
|
||||
banned_members INT NOT NULL,
|
||||
|
||||
PRIMARY KEY (room_id, end_ts)
|
||||
);
|
||||
|
||||
-- We use this index to speed up deletion of ancient room stats.
|
||||
CREATE INDEX IF NOT EXISTS room_stats_historical_end_ts ON room_stats_historical (end_ts);
|
||||
|
||||
-- We don't need an index on (room_id, end_ts) because PRIMARY KEY sorts that
|
||||
-- out for us. (We would want it to review stats for a particular room.)
|
||||
|
||||
|
||||
-- represents PRESENT statistics for a user
|
||||
-- only holds absolute fields
|
||||
CREATE TABLE IF NOT EXISTS user_stats_current (
|
||||
user_id TEXT NOT NULL PRIMARY KEY,
|
||||
|
||||
public_rooms INT NOT NULL,
|
||||
private_rooms INT NOT NULL,
|
||||
|
||||
-- If initial stats regen is still to be performed: NULL
|
||||
-- If initial stats regen has been performed: the maximum delta stream
|
||||
-- position that this row takes into account.
|
||||
completed_delta_stream_id BIGINT
|
||||
);
|
||||
|
||||
-- represents HISTORICAL statistics for a user
|
||||
CREATE TABLE IF NOT EXISTS user_stats_historical (
|
||||
user_id TEXT NOT NULL,
|
||||
end_ts BIGINT NOT NULL,
|
||||
bucket_size INT NOT NULL,
|
||||
|
||||
public_rooms INT NOT NULL,
|
||||
private_rooms INT NOT NULL,
|
||||
|
||||
PRIMARY KEY (user_id, end_ts)
|
||||
);
|
||||
|
||||
-- We use this index to speed up deletion of ancient user stats.
|
||||
CREATE INDEX IF NOT EXISTS user_stats_historical_end_ts ON user_stats_historical (end_ts);
|
||||
|
||||
-- We don't need an index on (user_id, end_ts) because PRIMARY KEY sorts that
|
||||
-- out for us. (We would want it to review stats for a particular user.)
|
||||
|
||||
-- Also rename room_state to room_stats_state to make its ownership clear.
|
||||
ALTER TABLE room_state RENAME TO room_stats_state;
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
/* Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- These partial indices helps us with finding incomplete stats row
|
||||
CREATE INDEX IF NOT EXISTS room_stats_not_complete
|
||||
ON room_stats_current (room_id)
|
||||
WHERE completed_delta_stream_id IS NULL;
|
||||
|
||||
CREATE INDEX IF NOT EXISTS user_stats_not_complete
|
||||
ON user_stats_current (user_id)
|
||||
WHERE completed_delta_stream_id IS NULL;
|
||||
|
27
synapse/storage/schema/delta/56/stats_separated2.sql.sqlite
Normal file
27
synapse/storage/schema/delta/56/stats_separated2.sql.sqlite
Normal file
|
@ -0,0 +1,27 @@
|
|||
/* Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- even though SQLite >= 3.8 can support partial indices, we won't enable
|
||||
-- them, in case the SQLite database may be later used on another system.
|
||||
-- It's also the case that SQLite is only likely to be used in small
|
||||
-- deployments or testing, where the optimisations gained by use of a
|
||||
-- partial index are not a big concern.
|
||||
|
||||
CREATE INDEX IF NOT EXISTS room_stats_not_complete
|
||||
ON room_stats_current (completed_delta_stream_id, room_id);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS user_stats_not_complete
|
||||
ON user_stats_current (completed_delta_stream_id, user_id);
|
||||
|
|
@ -1,5 +1,6 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2018, 2019 New Vector Ltd
|
||||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
|
@ -14,12 +15,16 @@
|
|||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from itertools import chain
|
||||
|
||||
from synapse.storage.state_deltas import StateDeltasStore
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# these fields track absolutes (e.g. total number of rooms on the server)
|
||||
# You can think of these as Prometheus Gauges.
|
||||
# You can draw these stats on a line graph.
|
||||
# Example: number of users in a room
|
||||
ABSOLUTE_STATS_FIELDS = {
|
||||
"room": (
|
||||
"current_state_events",
|
||||
|
@ -27,12 +32,17 @@ ABSOLUTE_STATS_FIELDS = {
|
|||
"invited_members",
|
||||
"left_members",
|
||||
"banned_members",
|
||||
"state_events",
|
||||
"total_events",
|
||||
),
|
||||
"user": ("public_rooms", "private_rooms"),
|
||||
}
|
||||
|
||||
TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
|
||||
# these fields are per-timeslice and so should be reset to 0 upon a new slice
|
||||
# You can draw these stats on a histogram.
|
||||
# Example: number of events sent locally during a time slice
|
||||
PER_SLICE_FIELDS = {"room": (), "user": ()}
|
||||
|
||||
TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
|
||||
|
||||
|
||||
class StatsStore(StateDeltasStore):
|
||||
|
@ -48,6 +58,22 @@ class StatsStore(StateDeltasStore):
|
|||
self.register_noop_background_update("populate_stats_process_rooms")
|
||||
self.register_noop_background_update("populate_stats_cleanup")
|
||||
|
||||
def quantise_stats_time(self, ts):
|
||||
"""
|
||||
Quantises a timestamp to be a multiple of the bucket size.
|
||||
|
||||
Args:
|
||||
ts (int): the timestamp to quantise, in milliseconds since the Unix
|
||||
Epoch
|
||||
|
||||
Returns:
|
||||
int: a timestamp which
|
||||
- is divisible by the bucket size;
|
||||
- is no later than `ts`; and
|
||||
- is the largest such timestamp.
|
||||
"""
|
||||
return (ts // self.stats_bucket_size) * self.stats_bucket_size
|
||||
|
||||
def update_room_state(self, room_id, fields):
|
||||
"""
|
||||
Args:
|
||||
|
@ -71,8 +97,271 @@ class StatsStore(StateDeltasStore):
|
|||
fields[col] = None
|
||||
|
||||
return self._simple_upsert(
|
||||
table="room_state",
|
||||
table="room_stats_state",
|
||||
keyvalues={"room_id": room_id},
|
||||
values=fields,
|
||||
desc="update_room_state",
|
||||
)
|
||||
|
||||
def update_stats_delta(
|
||||
self, ts, stats_type, stats_id, fields, complete_with_stream_id=None
|
||||
):
|
||||
"""
|
||||
Updates the statistics for a subject, with a delta (difference/relative
|
||||
change).
|
||||
|
||||
Args:
|
||||
ts (int): timestamp of the change
|
||||
stats_type (str): "room" or "user" – the kind of subject
|
||||
stats_id (str): the subject's ID (room ID or user ID)
|
||||
fields (dict[str, int]): Deltas of stats values.
|
||||
complete_with_stream_id (int, optional):
|
||||
If supplied, converts an incomplete row into a complete row,
|
||||
with the supplied stream_id marked as the stream_id where the
|
||||
row was completed.
|
||||
"""
|
||||
|
||||
return self.runInteraction(
|
||||
"update_stats_delta",
|
||||
self._update_stats_delta_txn,
|
||||
ts,
|
||||
stats_type,
|
||||
stats_id,
|
||||
fields,
|
||||
complete_with_stream_id=complete_with_stream_id,
|
||||
)
|
||||
|
||||
def _update_stats_delta_txn(
|
||||
self,
|
||||
txn,
|
||||
ts,
|
||||
stats_type,
|
||||
stats_id,
|
||||
fields,
|
||||
complete_with_stream_id=None,
|
||||
absolute_field_overrides=None,
|
||||
):
|
||||
"""
|
||||
See L{update_stats_delta}
|
||||
Additional Args:
|
||||
absolute_field_overrides (dict[str, int]): Current stats values
|
||||
(i.e. not deltas) of absolute fields.
|
||||
Does not work with per-slice fields.
|
||||
"""
|
||||
table, id_col = TYPE_TO_TABLE[stats_type]
|
||||
|
||||
quantised_ts = self.quantise_stats_time(int(ts))
|
||||
end_ts = quantised_ts + self.stats_bucket_size
|
||||
|
||||
abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type]
|
||||
slice_field_names = PER_SLICE_FIELDS[stats_type]
|
||||
for field in chain(fields.keys(), absolute_field_overrides.keys()):
|
||||
if field not in abs_field_names and field not in slice_field_names:
|
||||
# guard against potential SQL injection dodginess
|
||||
raise ValueError(
|
||||
"%s is not a recognised field"
|
||||
" for stats type %s" % (field, stats_type)
|
||||
)
|
||||
|
||||
# only absolute stats fields are tracked in the `_current` stats tables,
|
||||
# so those are the only ones that we process deltas for when
|
||||
# we upsert against the `_current` table.
|
||||
|
||||
# This calculates the deltas (`field = field + ?` values)
|
||||
# for absolute fields,
|
||||
# * defaulting to 0 if not specified
|
||||
# (required for the INSERT part of upserting to work)
|
||||
# * omitting overrides specified in `absolute_field_overrides`
|
||||
deltas_of_absolute_fields = {
|
||||
key: fields.get(key, 0)
|
||||
for key in abs_field_names
|
||||
if key not in absolute_field_overrides
|
||||
}
|
||||
|
||||
if absolute_field_overrides is None:
|
||||
absolute_field_overrides = {}
|
||||
|
||||
if complete_with_stream_id is not None:
|
||||
absolute_field_overrides = absolute_field_overrides.copy()
|
||||
absolute_field_overrides[
|
||||
"completed_delta_stream_id"
|
||||
] = complete_with_stream_id
|
||||
|
||||
# first upsert the `_current` table
|
||||
self._upsert_with_additive_relatives_txn(
|
||||
txn=txn,
|
||||
table=table + "_current",
|
||||
keyvalues={id_col: stats_id},
|
||||
absolutes=absolute_field_overrides,
|
||||
additive_relatives=deltas_of_absolute_fields,
|
||||
)
|
||||
|
||||
if self.has_completed_background_updates():
|
||||
# TODO want to check specifically for stats regenerator, not all
|
||||
# background updates…
|
||||
# then upsert the `_historical` table.
|
||||
# we don't support absolute_fields for per-slice fields as it makes
|
||||
# no sense.
|
||||
per_slice_additive_relatives = {
|
||||
key: fields.get(key, 0) for key in slice_field_names
|
||||
}
|
||||
self._upsert_copy_from_table_with_additive_relatives_txn(
|
||||
txn=txn,
|
||||
into_table=table + "_historical",
|
||||
keyvalues={id_col: stats_id},
|
||||
extra_dst_keyvalues={
|
||||
"end_ts": end_ts,
|
||||
"bucket_size": self.stats_bucket_size,
|
||||
},
|
||||
additive_relatives=per_slice_additive_relatives,
|
||||
src_table=table + "_current",
|
||||
copy_columns=abs_field_names,
|
||||
additional_where=" AND completed_delta_stream_id IS NOT NULL",
|
||||
)
|
||||
|
||||
def _upsert_with_additive_relatives_txn(
|
||||
self, txn, table, keyvalues, absolutes, additive_relatives
|
||||
):
|
||||
"""Used to update values in the stats tables.
|
||||
|
||||
Args:
|
||||
txn: Transaction
|
||||
table (str): Table name
|
||||
keyvalues (dict[str, any]): Row-identifying key values
|
||||
absolutes (dict[str, any]): Absolute (set) fields
|
||||
additive_relatives (dict[str, int]): Fields that will be added onto
|
||||
if existing row present.
|
||||
"""
|
||||
if self.database_engine.can_native_upsert:
|
||||
absolute_updates = [
|
||||
"%(field)s = EXCLUDED.%(field)s" % {"field": field}
|
||||
for field in absolutes.keys()
|
||||
]
|
||||
|
||||
relative_updates = [
|
||||
"%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s"
|
||||
% {"table": table, "field": field}
|
||||
for field in additive_relatives.keys()
|
||||
]
|
||||
|
||||
insert_cols = []
|
||||
qargs = [table]
|
||||
|
||||
for (key, val) in chain(
|
||||
keyvalues.items(), absolutes.items(), additive_relatives.items()
|
||||
):
|
||||
insert_cols.append(key)
|
||||
qargs.append(val)
|
||||
|
||||
sql = """
|
||||
INSERT INTO %(table)s (%(insert_cols_cs)s)
|
||||
VALUES (%(insert_vals_qs)s)
|
||||
ON CONFLICT DO UPDATE SET %(updates)s
|
||||
""" % {
|
||||
"table": table,
|
||||
"insert_cols_cs": ", ".join(insert_cols),
|
||||
"insert_vals_qs": ", ".join(
|
||||
["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
|
||||
),
|
||||
"updates": ", ".join(chain(absolute_updates, relative_updates)),
|
||||
}
|
||||
|
||||
txn.execute(sql, qargs)
|
||||
else:
|
||||
self.database_engine.lock_table(txn, table)
|
||||
retcols = chain(absolutes.keys(), additive_relatives.keys())
|
||||
current_row = self._simple_select_one_txn(
|
||||
txn, table, keyvalues, retcols, allow_none=True
|
||||
)
|
||||
if current_row is None:
|
||||
merged_dict = {**keyvalues, **absolutes, **additive_relatives}
|
||||
self._simple_insert_txn(txn, table, merged_dict)
|
||||
else:
|
||||
for (key, val) in additive_relatives.items():
|
||||
current_row[key] += val
|
||||
current_row.update(absolutes)
|
||||
self._simple_update_one_txn(txn, table, keyvalues, current_row)
|
||||
|
||||
def _upsert_copy_from_table_with_additive_relatives_txn(
|
||||
self,
|
||||
txn,
|
||||
into_table,
|
||||
keyvalues,
|
||||
extra_dst_keyvalues,
|
||||
additive_relatives,
|
||||
src_table,
|
||||
copy_columns,
|
||||
additional_where="",
|
||||
):
|
||||
"""
|
||||
Args:
|
||||
txn: Transaction
|
||||
into_table (str): The destination table to UPSERT the row into
|
||||
keyvalues (dict[str, any]): Row-identifying key values
|
||||
extra_dst_keyvalues (dict[str, any]): Additional keyvalues
|
||||
for `into_table`.
|
||||
additive_relatives (dict[str, any]): Fields that will be added onto
|
||||
if existing row present. (Must be disjoint from copy_columns.)
|
||||
src_table (str): The source table to copy from
|
||||
copy_columns (iterable[str]): The list of columns to copy
|
||||
additional_where (str): Additional SQL for where (prefix with AND
|
||||
if using).
|
||||
"""
|
||||
if self.database_engine.can_native_upsert:
|
||||
ins_columns = chain(
|
||||
keyvalues, copy_columns, additive_relatives, extra_dst_keyvalues
|
||||
)
|
||||
sel_exprs = chain(
|
||||
keyvalues,
|
||||
copy_columns,
|
||||
("?" for _ in chain(additive_relatives, extra_dst_keyvalues)),
|
||||
)
|
||||
keyvalues_where = ("%s = ?" % f for f in keyvalues)
|
||||
|
||||
sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns)
|
||||
sets_ar = (
|
||||
"%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f) for f in copy_columns
|
||||
)
|
||||
|
||||
sql = """
|
||||
INSERT INTO %(into_table)s (%(ins_columns)s)
|
||||
SELECT %(sel_exprs)s
|
||||
FROM %(src_table)s
|
||||
WHERE %(keyvalues_where)s %(additional_where)s
|
||||
ON CONFLICT (%(keyvalues)s)
|
||||
DO UPDATE SET %(sets)s
|
||||
""" % {
|
||||
"into_table": into_table,
|
||||
"ins_columns": ", ".join(ins_columns),
|
||||
"sel_exprs": ", ".join(sel_exprs),
|
||||
"keyvalues_where": " AND ".join(keyvalues_where),
|
||||
"src_table": src_table,
|
||||
"keyvalues": ", ".join(
|
||||
chain(keyvalues.keys(), extra_dst_keyvalues.keys())
|
||||
),
|
||||
"sets": ", ".join(chain(sets_cc, sets_ar)),
|
||||
"additional_where": additional_where,
|
||||
}
|
||||
|
||||
qargs = chain(additive_relatives.values(), keyvalues.values())
|
||||
txn.execute(sql, qargs)
|
||||
else:
|
||||
self.database_engine.lock_table(txn, into_table)
|
||||
src_row = self._simple_select_one_txn(
|
||||
txn, src_table, keyvalues, copy_columns
|
||||
)
|
||||
dest_current_row = self._simple_select_one_txn(
|
||||
txn,
|
||||
into_table,
|
||||
keyvalues,
|
||||
chain(additive_relatives.keys(), copy_columns),
|
||||
allow_none=True,
|
||||
)
|
||||
|
||||
if dest_current_row is None:
|
||||
merged_dict = {**keyvalues, **src_row, **additive_relatives}
|
||||
self._simple_insert_txn(txn, into_table, merged_dict)
|
||||
else:
|
||||
for (key, val) in additive_relatives.items():
|
||||
src_row[key] = dest_current_row[key] + val
|
||||
self._simple_update_txn(txn, into_table, keyvalues, src_row)
|
||||
|
|
Loading…
Reference in a new issue