Simplify table structure

This obviates the need for old collection, but comes at the minor cost
of not being able to track historical stats or per-slice fields until
after the statistics regenerator is finished.

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
Olivier Wilkinson (reivilibre) 2019-08-22 15:40:58 +01:00
parent 18a4c03c50
commit 7b657f1148
3 changed files with 175 additions and 138 deletions


@@ -63,14 +63,10 @@ INSERT INTO stats_incremental_position (
 ) VALUES (NULL, NULL, NULL, (0 = 1)), (NULL, NULL, NULL, (1 = 1));
 -- represents PRESENT room statistics for a room
 -- only holds absolute fields
 CREATE TABLE IF NOT EXISTS room_stats_current (
     room_id TEXT NOT NULL PRIMARY KEY,
-    -- These starts cover the time from start_ts...end_ts (in seconds).
-    -- Note that end_ts is quantised, and start_ts usually so.
-    start_ts BIGINT,
-    end_ts BIGINT,
     current_state_events INT NOT NULL,
     total_events INT NOT NULL,
     joined_members INT NOT NULL,
@@ -82,8 +78,6 @@ CREATE TABLE IF NOT EXISTS room_stats_current (
     -- If initial stats regen has been performed: the maximum delta stream
     -- position that this row takes into account.
     completed_delta_stream_id BIGINT,
-    CONSTRAINT timestamp_nullity_equality CHECK ((start_ts IS NULL) = (end_ts IS NULL))
 );
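With the timestamp columns gone, a room_stats_current row no longer represents a time bucket, so the timestamp_nullity_equality constraint has nothing left to guard and the row can simply be upserted in place, keyed on room_id. A minimal sketch of such an upsert (illustrative values only; a real insert must also supply the remaining NOT NULL columns omitted from this hunk, and PostgreSQL 9.5+ ON CONFLICT syntax is assumed):

    INSERT INTO room_stats_current (room_id, current_state_events, total_events, joined_members)
    VALUES ('!abc:example.org', 5, 1, 1)
    ON CONFLICT (room_id) DO UPDATE SET
        -- absolute field: overwrite with the new value
        current_state_events = EXCLUDED.current_state_events,
        -- delta fields: add onto the existing row
        total_events = room_stats_current.total_events + EXCLUDED.total_events,
        joined_members = room_stats_current.joined_members + EXCLUDED.joined_members;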
@@ -91,7 +85,7 @@ CREATE TABLE IF NOT EXISTS room_stats_current (
 CREATE TABLE IF NOT EXISTS room_stats_historical (
     room_id TEXT NOT NULL,
     -- These stats cover the time from (end_ts - bucket_size)...end_ts (in seconds).
-    -- Note that end_ts is quantised, and start_ts usually so.
+    -- Note that end_ts is quantised.
     end_ts BIGINT NOT NULL,
     bucket_size INT NOT NULL,
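Quantisation here means rounding a timestamp down to a bucket boundary, so deltas applied at slightly different times land in the same historical bucket. Assuming the quantisation is plain integer-division bucketing, as the quantise_stats_time call removed later in this commit suggests, the arithmetic for a 3600-second bucket looks like:

    -- integer division truncates, giving the bucket's start; the bucket then
    -- covers (end_ts - bucket_size)...end_ts
    SELECT (1566484858 / 3600) * 3600        AS bucket_start,  -- 1566482400
           (1566484858 / 3600) * 3600 + 3600 AS end_ts;        -- 1566486000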
@@ -113,13 +107,10 @@ CREATE INDEX IF NOT EXISTS room_stats_historical_end_ts ON room_stats_historical
 -- represents PRESENT statistics for a user
 -- only holds absolute fields
 CREATE TABLE IF NOT EXISTS user_stats_current (
     user_id TEXT NOT NULL PRIMARY KEY,
-    -- The timestamp that represents the start of the
-    start_ts BIGINT,
-    end_ts BIGINT,
     public_rooms INT NOT NULL,
     private_rooms INT NOT NULL,


@@ -32,13 +32,6 @@ def _run_create_generic(stats_type, cursor, database_engine):
         # It's also the case that SQLite is only likely to be used in small
         # deployments or testing, where the optimisations gained by use of a
         # partial index are not a big concern.
-        cursor.execute(
-            """
-            CREATE INDEX IF NOT EXISTS %s_stats_current_dirty
-                ON %s_stats_current (end_ts);
-            """
-            % (stats_type, stats_type)
-        )
         cursor.execute(
             """
             CREATE INDEX IF NOT EXISTS %s_stats_not_complete
@@ -47,16 +40,7 @@ def _run_create_generic(stats_type, cursor, database_engine):
             % (stats_type, stats_type, stats_type)
         )
     elif isinstance(database_engine, PostgresEngine):
-        # This partial index helps us with finding dirty stats rows
-        cursor.execute(
-            """
-            CREATE INDEX IF NOT EXISTS %s_stats_current_dirty
-                ON %s_stats_current (end_ts)
-                WHERE end_ts IS NOT NULL;
-            """
-            % (stats_type, stats_type)
-        )
-        # This partial index helps us with old collection
+        # This partial index helps us with finding incomplete stats rows
         cursor.execute(
             """
             CREATE INDEX IF NOT EXISTS %s_stats_not_complete

@@ -105,7 +105,6 @@ class StatsStore(StateDeltasStore):
             desc="update_room_state",
         )
 
-    @defer.inlineCallbacks
     def update_stats_delta(
         self, ts, stats_type, stats_id, fields, complete_with_stream_id=None
     ):
@@ -124,22 +123,142 @@ class StatsStore(StateDeltasStore):
                 row was completed.
         """
-        while True:
-            try:
-                res = yield self.runInteraction(
-                    "update_stats_delta",
-                    self._update_stats_delta_txn,
-                    ts,
-                    stats_type,
-                    stats_id,
-                    fields,
-                    complete_with_stream_id=complete_with_stream_id,
-                )
-                return res
-            except OldCollectionRequired:
-                # retry after collecting old rows
-                # TODO (implement later)
-                raise NotImplementedError("old collection not in this PR")
+        return self.runInteraction(
+            "update_stats_delta",
+            self._update_stats_delta_txn,
+            ts,
+            stats_type,
+            stats_id,
+            fields,
+            complete_with_stream_id=complete_with_stream_id,
+        )
+
+    def _upsert_with_additive_relatives_txn(
+        self, txn, table, keyvalues, absolutes, additive_relatives
+    ):
+        """Upserts a row, overwriting the absolute fields and adding the
+        relative fields onto any existing values.
+
+        Args:
+            txn: Transaction
+            table (str): Table name
+            keyvalues (dict[str, any]): Row-identifying key values
+            absolutes (dict[str, any]): Absolute (set) fields
+            additive_relatives (dict[str, int]): Fields that will be added onto
+                if existing row present.
+        """
+        if self.database_engine.can_native_upsert:
+            absolute_updates = [
+                "%(field)s = EXCLUDED.%(field)s" % {"field": field}
+                for field in absolutes.keys()
+            ]
+
+            relative_updates = [
+                "%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s"
+                % {"table": table, "field": field}
+                for field in additive_relatives.keys()
+            ]
+
+            insert_cols = []
+            qargs = []
+
+            for (key, val) in chain(
+                keyvalues.items(), absolutes.items(), additive_relatives.items()
+            ):
+                insert_cols.append(key)
+                qargs.append(val)
+
+            sql = """
+                INSERT INTO %(table)s (%(insert_cols_cs)s)
+                VALUES (%(insert_vals_qs)s)
+                ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
+            """ % {
+                "table": table,
+                "insert_cols_cs": ", ".join(insert_cols),
+                "insert_vals_qs": ", ".join(
+                    ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
+                ),
+                "key_columns": ", ".join(keyvalues.keys()),
+                "updates": ", ".join(chain(absolute_updates, relative_updates)),
+            }
+
+            txn.execute(sql, qargs)
+        else:
+            retcols = list(chain(absolutes.keys(), additive_relatives.keys()))
+
+            current_row = self._simple_select_one_txn(
+                txn, table, keyvalues, retcols, allow_none=True
+            )
+            if current_row is None:
+                merged_dict = {**keyvalues, **absolutes, **additive_relatives}
+                self._simple_insert_txn(txn, table, merged_dict)
+            else:
+                for (key, val) in additive_relatives.items():
+                    current_row[key] += val
+                for (key, val) in absolutes.items():
+                    current_row[key] = val
+                self._simple_update_one_txn(txn, table, keyvalues, current_row)
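For a concrete sense of what _upsert_with_additive_relatives_txn emits on the native-upsert path: a call with table="room_stats_current", keyvalues={"room_id": ...}, absolutes={"completed_delta_stream_id": 7} and additive_relatives={"total_events": 1} renders roughly as follows (the ? placeholders are rewritten to the engine's paramstyle by the database engine wrapper):

    INSERT INTO room_stats_current (room_id, completed_delta_stream_id, total_events)
    VALUES (?, ?, ?)
    ON CONFLICT (room_id) DO UPDATE SET
        completed_delta_stream_id = EXCLUDED.completed_delta_stream_id,
        total_events = EXCLUDED.total_events + room_stats_current.total_events;

Note that PostgreSQL requires the conflict target (the key columns) for ON CONFLICT ... DO UPDATE, which is why the generated SQL names them explicitly.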
+
+    def _upsert_copy_from_table_with_additive_relatives_txn(
+        self, txn, into_table, keyvalues, additive_relatives, src_table, copy_columns
+    ):
+        """Upserts a row, copying the "absolute" columns from a source table
+        and adding the relative fields onto any existing values.
+
+        Args:
+            txn: Transaction
+            into_table (str): The destination table to UPSERT the row into
+            keyvalues (dict[str, any]): Row-identifying key values
+            additive_relatives (dict[str, any]): Fields that will be added onto
+                if existing row present. (Must be disjoint from copy_columns.)
+            src_table (str): The source table to copy from
+            copy_columns (iterable[str]): The list of columns to copy
+        """
+        if self.database_engine.can_native_upsert:
+            ins_columns = chain(keyvalues, copy_columns, additive_relatives.keys())
+            sel_exprs = chain(
+                keyvalues, copy_columns, ("?" for _ in additive_relatives)
+            )
+            keyvalues_where = ("%s = ?" % f for f in keyvalues)
+
+            sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns)
+            sets_ar = (
+                "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f)
+                for f in additive_relatives
+            )
+
+            sql = """
+                INSERT INTO %(into_table)s (%(ins_columns)s)
+                SELECT %(sel_exprs)s
+                FROM %(src_table)s
+                WHERE %(keyvalues_where)s
+                ON CONFLICT (%(keyvalues)s)
+                DO UPDATE SET %(sets)s
+            """ % {
+                "into_table": into_table,
+                "ins_columns": ", ".join(ins_columns),
+                "sel_exprs": ", ".join(sel_exprs),
+                "keyvalues_where": " AND ".join(keyvalues_where),
+                "src_table": src_table,
+                "keyvalues": ", ".join(keyvalues.keys()),
+                "sets": ", ".join(chain(sets_cc, sets_ar)),
+            }
+
+            qargs = list(chain(additive_relatives.values(), keyvalues.values()))
+            txn.execute(sql, qargs)
+        else:
+            src_row = self._simple_select_one_txn(
+                txn, src_table, keyvalues, copy_columns
+            )
+            dest_current_row = self._simple_select_one_txn(
+                txn,
+                into_table,
+                keyvalues,
+                chain(additive_relatives.keys(), copy_columns),
+                allow_none=True,
+            )
+            if dest_current_row is None:
+                merged_dict = {**keyvalues, **src_row, **additive_relatives}
+                self._simple_insert_txn(txn, into_table, merged_dict)
+            else:
+                for (key, val) in additive_relatives.items():
+                    src_row[key] = dest_current_row[key] + val
+                self._simple_update_txn(txn, into_table, keyvalues, src_row)
 
     def _update_stats_delta_txn(
         self,
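_upsert_copy_from_table_with_additive_relatives_txn does the same dance across two tables: the key values and copied columns come from the source row via SELECT, while the per-slice deltas are bound as parameters. A hypothetical rendering for into_table="room_stats_historical", src_table="room_stats_current", keyvalues={"room_id": ...}, copy_columns=("joined_members",) and a hypothetical per-slice column sent_events:

    INSERT INTO room_stats_historical (room_id, joined_members, sent_events)
    SELECT room_id, joined_members, ?
    FROM room_stats_current
    WHERE room_id = ?
    ON CONFLICT (room_id)
    DO UPDATE SET
        joined_members = EXCLUDED.joined_members,
        sent_events = EXCLUDED.sent_events + room_stats_historical.sent_events;

The parameter order matches qargs: the additive values first (feeding the SELECT), then the key values (feeding the WHERE).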
@@ -154,118 +273,61 @@
         """
         See L{update_stats_delta}
         Additional Args:
-            absolute_fields (dict[str, int]): Absolute stats values (i.e. not deltas).
+            absolute_fields (dict[str, int]): Absolute current stats values
+                (i.e. not deltas). Does not work with per-slice fields.
         """
         table, id_col = TYPE_TO_TABLE[stats_type]
 
-        quantised_ts = self.quantise_stats_time(int(ts))
-        end_ts = quantised_ts + self.stats_bucket_size
-
+        if absolute_fields is None:
+            absolute_fields = {}
+        elif complete_with_stream_id is not None:
+            # copy, to avoid mutating the caller's dict when we add the
+            # stream ID below
+            absolute_fields = absolute_fields.copy()
+
+        abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type]
+        slice_field_names = PER_SLICE_FIELDS[stats_type]
+
         for field in chain(fields.keys(), absolute_fields.keys()):
-            if (
-                field not in ABSOLUTE_STATS_FIELDS[stats_type]
-                and field not in PER_SLICE_FIELDS[stats_type]
-            ):
+            if field not in abs_field_names and field not in slice_field_names:
                 # guard against potential SQL injection dodginess
                 raise ValueError(
                     "%s is not a recognised field"
                     " for stats type %s" % (field, stats_type)
                 )
 
-        field_sqls = ["%s = %s + ?" % (field, field) for field in fields.keys()]
-        field_values = list(fields.values())
-
-        if absolute_fields is not None:
-            field_sqls += ["%s = ?" % (field,) for field in absolute_fields.keys()]
-            field_values += list(absolute_fields.values())
-
+        additive_relatives = {
+            key: fields.get(key, 0)
+            for key in abs_field_names
+            if key not in absolute_fields
+        }
+
         if complete_with_stream_id is not None:
-            field_sqls.append("completed_delta_stream_id = ?")
-            field_values.append(complete_with_stream_id)
+            absolute_fields["completed_delta_stream_id"] = complete_with_stream_id
 
-        # update current row, but only if it is either:
-        #  - dirty and complete but not old
-        #    If it is old, we should old-collect it first and retry.
-        #  - dirty and incomplete
-        #    Incomplete rows can't be old-collected (as this would commit
-        #    false statistics into the _historical table).
-        #    Instead, their `end_ts` is extended, whilst we wait for them to
-        #    become complete at the hand of the stats regenerator.
-        sql = (
-            "UPDATE %s_current SET end_ts = ?, %s"
-            " WHERE (end_ts IS NOT NULL AND (end_ts >= ? OR completed_delta_stream_id IS NULL))"
-            " AND %s = ?"
-        ) % (table, ", ".join(field_sqls), id_col)
-
-        qargs = [end_ts] + list(field_values) + [end_ts, stats_id]
-
-        txn.execute(sql, qargs)
-
-        if txn.rowcount > 0:
-            # success.
-            return
-
-        # if we're here, it's because we didn't succeed in updating a stats
-        # row. Why? Let's find out…
-
-        current_row = self._simple_select_one_txn(
+        # first upsert the current table
+        self._upsert_with_additive_relatives_txn(
             txn,
             table + "_current",
             {id_col: stats_id},
-            ("end_ts", "completed_delta_stream_id"),
-            allow_none=True,
+            absolute_fields,
+            additive_relatives,
         )
 
-        if current_row is None:
-            # Failure reason: There is no row.
-            # Solution:
-            #  we need to insert a row! (insert a dirty, incomplete row)
-            insertee = {
-                id_col: stats_id,
-                "end_ts": end_ts,
-                "start_ts": ts,
-                "completed_delta_stream_id": complete_with_stream_id,
+        if self.has_completed_background_updates():
+            # TODO want to check specifically for stats regenerator, not all
+            #   background updates…
+            # then upsert the historical table.
+            # we don't support absolute_fields for slice_field_names as it makes
+            # no sense.
+            per_slice_additive_relatives = {
+                key: fields.get(key, 0)
+                for key in slice_field_names
             }
-
-            # we assume that, by default, blank fields should be zero.
-            for field_name in ABSOLUTE_STATS_FIELDS[stats_type]:
-                insertee[field_name] = 0
-
-            for field_name in PER_SLICE_FIELDS[stats_type]:
-                insertee[field_name] = 0
-
-            for (field, value) in fields.items():
-                insertee[field] = value
-
-            if absolute_fields is not None:
-                for (field, value) in absolute_fields.items():
-                    insertee[field] = value
-
-            self._simple_insert_txn(txn, table + "_current", insertee)
-        elif current_row["end_ts"] is None:
-            # Failure reason: The row is not dirty.
-            # Solution:
-            #  update the row, including `start_ts`, to make it dirty.
-            sql = (
-                "UPDATE %s_current SET start_ts = ?, end_ts = ?, %s"
-                " WHERE end_ts IS NULL AND %s = ?"
-            ) % (table, ", ".join(field_sqls), id_col)
-
-            qargs = (
-                [end_ts - self.stats_bucket_size, end_ts]
-                + list(field_values)
-                + [stats_id]
+            self._upsert_copy_from_table_with_additive_relatives_txn(
+                txn,
+                table + "_historical",
+                {id_col: stats_id},
+                per_slice_additive_relatives,
+                table + "_current",
+                abs_field_names,
             )
-
-            txn.execute(sql, qargs)
-
-            if txn.rowcount == 0:
-                raise RuntimeError(
-                    "Should be impossible: No rows updated"
-                    " but all conditions are known to be met."
-                )
-        elif current_row["end_ts"] < end_ts:
-            # Failure reason: The row is complete and old.
-            # Solution: We need to perform old collection first
-            raise OldCollectionRequired()
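On engines without native upsert support, both helpers fall back to an emulated read-modify-write inside the transaction. In SQL terms the fallback branch amounts to the following sketch, for the same example row as above:

    -- 1. read the existing row, if any
    SELECT completed_delta_stream_id, total_events
        FROM room_stats_current WHERE room_id = ?;

    -- 2a. no row yet: insert keys, absolutes and deltas merged together
    INSERT INTO room_stats_current (room_id, completed_delta_stream_id, total_events)
        VALUES (?, ?, ?);

    -- 2b. row exists: overwrite absolutes, add deltas onto the values read
    --     in step 1, and write the result back
    UPDATE room_stats_current
        SET completed_delta_stream_id = ?, total_events = ?
        WHERE room_id = ?;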