Fix up historical stats support.

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
This commit is contained in:
Olivier Wilkinson (reivilibre) 2019-08-22 16:10:32 +01:00
parent e8fc180d4d
commit 79252d1c83

View file

@ -17,8 +17,6 @@
import logging
from itertools import chain
from twisted.internet import defer
from synapse.storage.state_deltas import StateDeltasStore
logger = logging.getLogger(__name__)
@ -197,22 +195,41 @@ class StatsStore(StateDeltasStore):
self._simple_update_one_txn(txn, table, keyvalues, current_row)
def _upsert_copy_from_table_with_additive_relatives_txn(
self, txn, into_table, keyvalues, additive_relatives, src_table, copy_columns
self,
txn,
into_table,
keyvalues,
extra_dst_keyvalues,
additive_relatives,
src_table,
copy_columns,
additional_where="",
):
"""
Args:
txn: Transaction
into_table (str): The destination table to UPSERT the row into
keyvalues (dict[str, any]): Row-identifying key values
extra_dst_keyvalues (dict[str, any]): Additional keyvalues
for `into_table`.
additive_relatives (dict[str, any]): Fields that will be added onto
if existing row present. (Must be disjoint from copy_columns.)
src_table (str): The source table to copy from
copy_columns (iterable[str]): The list of columns to copy
additional_where (str): Additional SQL for where (prefix with AND
if using).
"""
if self.database_engine.can_native_upsert:
ins_columns = chain(keyvalues, copy_columns, additive_relatives.keys())
ins_columns = chain(
keyvalues,
copy_columns,
additive_relatives.keys(),
extra_dst_keyvalues.keys(),
)
sel_exprs = chain(
keyvalues, copy_columns, ("?" for _ in additive_relatives)
keyvalues,
copy_columns,
("?" for _ in chain(additive_relatives, extra_dst_keyvalues)),
)
keyvalues_where = ("%s = ?" % f for f in keyvalues)
@ -225,17 +242,20 @@ class StatsStore(StateDeltasStore):
INSERT INTO %(into_table)s (%(ins_columns)s)
SELECT %(sel_exprs)s
FROM %(src_table)s
WHERE %(keyvalues_where)s
WHERE %(keyvalues_where)s %(additional_where)s
ON CONFLICT (%(keyvalues)s)
DO UPDATE SET %(sets)s
""" % {
"into_table": into_table,
"ins_columns": ", ".join(ins_columns),
"sel_exprs": ", ".join(sel_exprs),
"keyvalues_where": ", ".join(keyvalues_where),
"keyvalues_where": " AND ".join(keyvalues_where),
"src_table": src_table,
"keyvalues": ", ".join(keyvalues.keys()),
"keyvalues": ", ".join(
chain(keyvalues.keys(), extra_dst_keyvalues.keys())
),
"sets": ", ".join(chain(sets_cc, sets_ar)),
"additional_where": additional_where,
}
qargs = chain(additive_relatives.values(), keyvalues.values())
@ -320,14 +340,15 @@ class StatsStore(StateDeltasStore):
# we don't support absolute_fields for slice_field_names as it makes
# no sense.
per_slice_additive_relatives = {
key: fields.get(key, 0)
for key in slice_field_names
key: fields.get(key, 0) for key in slice_field_names
}
self._upsert_copy_from_table_with_additive_relatives_txn(
txn,
table + "_historical",
{id_col: stats_id},
{"end_ts": end_ts, "bucket_size": self.stats_bucket_size},
per_slice_additive_relatives,
table + "_current",
abs_field_names
abs_field_names,
additional_where=" AND completed_delta_stream_id IS NOT NULL",
)