mirror of
https://github.com/element-hq/synapse.git
synced 2024-12-19 17:56:19 +03:00
Clean up code with improved naming and hoist around functions.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
This commit is contained in:
parent
324f21b216
commit
1af7866562
1 changed files with 91 additions and 79 deletions
|
@ -22,6 +22,9 @@ from synapse.storage.state_deltas import StateDeltasStore
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
# these fields track absolutes (e.g. total number of rooms on the server)
|
# these fields track absolutes (e.g. total number of rooms on the server)
|
||||||
|
# You can think of these as Prometheus Gauges.
|
||||||
|
# You can draw these stats on a line graph.
|
||||||
|
# Example: number of users in a room
|
||||||
ABSOLUTE_STATS_FIELDS = {
|
ABSOLUTE_STATS_FIELDS = {
|
||||||
"room": (
|
"room": (
|
||||||
"current_state_events",
|
"current_state_events",
|
||||||
|
@ -35,6 +38,8 @@ ABSOLUTE_STATS_FIELDS = {
|
||||||
}
|
}
|
||||||
|
|
||||||
# these fields are per-timeslice and so should be reset to 0 upon a new slice
|
# these fields are per-timeslice and so should be reset to 0 upon a new slice
|
||||||
|
# You can draw these stats on a histogram.
|
||||||
|
# Example: number of events sent locally during a time slice
|
||||||
PER_SLICE_FIELDS = {"room": (), "user": ()}
|
PER_SLICE_FIELDS = {"room": (), "user": ()}
|
||||||
|
|
||||||
TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
|
TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
|
||||||
|
@ -126,6 +131,92 @@ class StatsStore(StateDeltasStore):
|
||||||
complete_with_stream_id=complete_with_stream_id,
|
complete_with_stream_id=complete_with_stream_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def _update_stats_delta_txn(
|
||||||
|
self,
|
||||||
|
txn,
|
||||||
|
ts,
|
||||||
|
stats_type,
|
||||||
|
stats_id,
|
||||||
|
fields,
|
||||||
|
complete_with_stream_id=None,
|
||||||
|
absolute_field_overrides=None,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
See L{update_stats_delta}
|
||||||
|
Additional Args:
|
||||||
|
absolute_fields (dict[str, int]): Current stats values
|
||||||
|
(i.e. not deltas) of absolute fields.
|
||||||
|
Does not work with per-slice fields.
|
||||||
|
"""
|
||||||
|
table, id_col = TYPE_TO_TABLE[stats_type]
|
||||||
|
|
||||||
|
quantised_ts = self.quantise_stats_time(int(ts))
|
||||||
|
end_ts = quantised_ts + self.stats_bucket_size
|
||||||
|
|
||||||
|
abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type]
|
||||||
|
slice_field_names = PER_SLICE_FIELDS[stats_type]
|
||||||
|
for field in chain(fields.keys(), absolute_field_overrides.keys()):
|
||||||
|
if field not in abs_field_names and field not in slice_field_names:
|
||||||
|
# guard against potential SQL injection dodginess
|
||||||
|
raise ValueError(
|
||||||
|
"%s is not a recognised field"
|
||||||
|
" for stats type %s" % (field, stats_type)
|
||||||
|
)
|
||||||
|
|
||||||
|
# only absolute stats fields are tracked in the `_current` stats tables,
|
||||||
|
# so those are the only ones that we process deltas for when
|
||||||
|
# we upsert against the `_current` table.
|
||||||
|
|
||||||
|
# This calculates the deltas (`field = field + ?` values)
|
||||||
|
# for absolute fields,
|
||||||
|
# * defaulting to 0 if not specified
|
||||||
|
# (required for the INSERT part of upserting to work)
|
||||||
|
# * omitting overrides specified in `absolute_field_overrides`
|
||||||
|
deltas_of_absolute_fields = {
|
||||||
|
key: fields.get(key, 0)
|
||||||
|
for key in abs_field_names
|
||||||
|
if key not in absolute_field_overrides
|
||||||
|
}
|
||||||
|
|
||||||
|
if absolute_field_overrides is None:
|
||||||
|
absolute_field_overrides = {}
|
||||||
|
|
||||||
|
if complete_with_stream_id is not None:
|
||||||
|
absolute_field_overrides = absolute_field_overrides.copy()
|
||||||
|
absolute_field_overrides["completed_delta_stream_id"] = complete_with_stream_id
|
||||||
|
|
||||||
|
# first upsert the `_current` table
|
||||||
|
self._upsert_with_additive_relatives_txn(
|
||||||
|
txn=txn,
|
||||||
|
table=table + "_current",
|
||||||
|
keyvalues={id_col: stats_id},
|
||||||
|
absolutes=absolute_field_overrides,
|
||||||
|
additive_relatives=deltas_of_absolute_fields,
|
||||||
|
)
|
||||||
|
|
||||||
|
if self.has_completed_background_updates():
|
||||||
|
# TODO want to check specifically for stats regenerator, not all
|
||||||
|
# background updates…
|
||||||
|
# then upsert the `_historical` table.
|
||||||
|
# we don't support absolute_fields for per-slice fields as it makes
|
||||||
|
# no sense.
|
||||||
|
per_slice_additive_relatives = {
|
||||||
|
key: fields.get(key, 0) for key in slice_field_names
|
||||||
|
}
|
||||||
|
self._upsert_copy_from_table_with_additive_relatives_txn(
|
||||||
|
txn=txn,
|
||||||
|
into_table=table + "_historical",
|
||||||
|
keyvalues={id_col: stats_id},
|
||||||
|
extra_dst_keyvalues={
|
||||||
|
"end_ts": end_ts,
|
||||||
|
"bucket_size": self.stats_bucket_size,
|
||||||
|
},
|
||||||
|
additive_relatives=per_slice_additive_relatives,
|
||||||
|
src_table=table + "_current",
|
||||||
|
copy_columns=abs_field_names,
|
||||||
|
additional_where=" AND completed_delta_stream_id IS NOT NULL",
|
||||||
|
)
|
||||||
|
|
||||||
def _upsert_with_additive_relatives_txn(
|
def _upsert_with_additive_relatives_txn(
|
||||||
self, txn, table, keyvalues, absolutes, additive_relatives
|
self, txn, table, keyvalues, absolutes, additive_relatives
|
||||||
):
|
):
|
||||||
|
@ -272,82 +363,3 @@ class StatsStore(StateDeltasStore):
|
||||||
for (key, val) in additive_relatives.items():
|
for (key, val) in additive_relatives.items():
|
||||||
src_row[key] = dest_current_row[key] + val
|
src_row[key] = dest_current_row[key] + val
|
||||||
self._simple_update_txn(txn, into_table, keyvalues, src_row)
|
self._simple_update_txn(txn, into_table, keyvalues, src_row)
|
||||||
|
|
||||||
def _update_stats_delta_txn(
|
|
||||||
self,
|
|
||||||
txn,
|
|
||||||
ts,
|
|
||||||
stats_type,
|
|
||||||
stats_id,
|
|
||||||
fields,
|
|
||||||
complete_with_stream_id=None,
|
|
||||||
absolute_fields=None,
|
|
||||||
):
|
|
||||||
"""
|
|
||||||
See L{update_stats_delta}
|
|
||||||
Additional Args:
|
|
||||||
absolute_fields (dict[str, int]): Absolute current stats values
|
|
||||||
(i.e. not deltas). Does not work with per-slice fields.
|
|
||||||
"""
|
|
||||||
table, id_col = TYPE_TO_TABLE[stats_type]
|
|
||||||
|
|
||||||
quantised_ts = self.quantise_stats_time(int(ts))
|
|
||||||
end_ts = quantised_ts + self.stats_bucket_size
|
|
||||||
|
|
||||||
abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type]
|
|
||||||
slice_field_names = PER_SLICE_FIELDS[stats_type]
|
|
||||||
for field in chain(fields.keys(), absolute_fields.keys()):
|
|
||||||
if field not in abs_field_names and field not in slice_field_names:
|
|
||||||
# guard against potential SQL injection dodginess
|
|
||||||
raise ValueError(
|
|
||||||
"%s is not a recognised field"
|
|
||||||
" for stats type %s" % (field, stats_type)
|
|
||||||
)
|
|
||||||
|
|
||||||
# only absolute stats fields are tracked in the `_current` stats tables,
|
|
||||||
# so those are the only ones that we process deltas for when
|
|
||||||
# we upsert against the `_current` table.
|
|
||||||
additive_relatives = {
|
|
||||||
key: fields.get(key, 0)
|
|
||||||
for key in abs_field_names
|
|
||||||
if key not in absolute_fields
|
|
||||||
}
|
|
||||||
|
|
||||||
if absolute_fields is None:
|
|
||||||
absolute_fields = {}
|
|
||||||
|
|
||||||
if complete_with_stream_id is not None:
|
|
||||||
absolute_fields = absolute_fields.copy()
|
|
||||||
absolute_fields["completed_delta_stream_id"] = complete_with_stream_id
|
|
||||||
|
|
||||||
# first upsert the `_current` table
|
|
||||||
self._upsert_with_additive_relatives_txn(
|
|
||||||
txn=txn,
|
|
||||||
table=table + "_current",
|
|
||||||
keyvalues={id_col: stats_id},
|
|
||||||
absolutes=absolute_fields,
|
|
||||||
additive_relatives=additive_relatives,
|
|
||||||
)
|
|
||||||
|
|
||||||
if self.has_completed_background_updates():
|
|
||||||
# TODO want to check specifically for stats regenerator, not all
|
|
||||||
# background updates…
|
|
||||||
# then upsert the `_historical` table.
|
|
||||||
# we don't support absolute_fields for per-slice fields as it makes
|
|
||||||
# no sense.
|
|
||||||
per_slice_additive_relatives = {
|
|
||||||
key: fields.get(key, 0) for key in slice_field_names
|
|
||||||
}
|
|
||||||
self._upsert_copy_from_table_with_additive_relatives_txn(
|
|
||||||
txn=txn,
|
|
||||||
into_table=table + "_historical",
|
|
||||||
keyvalues={id_col: stats_id},
|
|
||||||
extra_dst_keyvalues={
|
|
||||||
"end_ts": end_ts,
|
|
||||||
"bucket_size": self.stats_bucket_size,
|
|
||||||
},
|
|
||||||
additive_relatives=per_slice_additive_relatives,
|
|
||||||
src_table=table + "_current",
|
|
||||||
copy_columns=abs_field_names,
|
|
||||||
additional_where=" AND completed_delta_stream_id IS NOT NULL",
|
|
||||||
)
|
|
||||||
|
|
Loading…
Reference in a new issue