diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 4cb10dc9fb..5a7dfde926 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -17,8 +17,6 @@ import logging from itertools import chain -from twisted.internet import defer - from synapse.storage.state_deltas import StateDeltasStore logger = logging.getLogger(__name__) @@ -197,22 +195,41 @@ class StatsStore(StateDeltasStore): self._simple_update_one_txn(txn, table, keyvalues, current_row) def _upsert_copy_from_table_with_additive_relatives_txn( - self, txn, into_table, keyvalues, additive_relatives, src_table, copy_columns + self, + txn, + into_table, + keyvalues, + extra_dst_keyvalues, + additive_relatives, + src_table, + copy_columns, + additional_where="", ): """ Args: txn: Transaction into_table (str): The destination table to UPSERT the row into keyvalues (dict[str, any]): Row-identifying key values + extra_dst_keyvalues (dict[str, any]): Additional keyvalues + for `into_table`. additive_relatives (dict[str, any]): Fields that will be added onto if existing row present. (Must be disjoint from copy_columns.) src_table (str): The source table to copy from copy_columns (iterable[str]): The list of columns to copy + additional_where (str): Additional SQL for where (prefix with AND + if using). """ if self.database_engine.can_native_upsert: - ins_columns = chain(keyvalues, copy_columns, additive_relatives.keys()) + ins_columns = chain( + keyvalues, + copy_columns, + additive_relatives.keys(), + extra_dst_keyvalues.keys(), + ) sel_exprs = chain( - keyvalues, copy_columns, ("?" for _ in additive_relatives) + keyvalues, + copy_columns, + ("?" for _ in chain(additive_relatives, extra_dst_keyvalues)), ) keyvalues_where = ("%s = ?" % f for f in keyvalues) @@ -225,17 +242,20 @@ class StatsStore(StateDeltasStore): INSERT INTO %(into_table)s (%(ins_columns)s) SELECT %(sel_exprs)s FROM %(src_table)s - WHERE %(keyvalues_where)s + WHERE %(keyvalues_where)s %(additional_where)s ON CONFLICT (%(keyvalues)s) DO UPDATE SET %(sets)s """ % { "into_table": into_table, "ins_columns": ", ".join(ins_columns), "sel_exprs": ", ".join(sel_exprs), - "keyvalues_where": ", ".join(keyvalues_where), + "keyvalues_where": " AND ".join(keyvalues_where), "src_table": src_table, - "keyvalues": ", ".join(keyvalues.keys()), + "keyvalues": ", ".join( + chain(keyvalues.keys(), extra_dst_keyvalues.keys()) + ), "sets": ", ".join(chain(sets_cc, sets_ar)), + "additional_where": additional_where, } qargs = chain(additive_relatives.values(), keyvalues.values()) @@ -320,14 +340,15 @@ class StatsStore(StateDeltasStore): # we don't support absolute_fields for slice_field_names as it makes # no sense. per_slice_additive_relatives = { - key: fields.get(key, 0) - for key in slice_field_names + key: fields.get(key, 0) for key in slice_field_names } self._upsert_copy_from_table_with_additive_relatives_txn( txn, table + "_historical", {id_col: stats_id}, + {"end_ts": end_ts, "bucket_size": self.stats_bucket_size}, per_slice_additive_relatives, table + "_current", - abs_field_names + abs_field_names, + additional_where=" AND completed_delta_stream_id IS NOT NULL", )