Merge branch 'rei/rss_target' into rei/rss_inc3

Olivier Wilkinson (reivilibre) 2019-08-28 09:53:33 +01:00
commit dfb22fec48
3 changed files with 100 additions and 89 deletions

View file

@@ -2270,8 +2270,9 @@ class EventsStore(
             "room_aliases",
             "room_depth",
             "room_memberships",
-            "room_state",
-            "room_stats",
+            "room_stats_state",
+            "room_stats_current",
+            "room_stats_historical",
             "room_stats_earliest_token",
             "rooms",
             "stream_ordering_to_exterm",

View file

@ -137,3 +137,6 @@ CREATE INDEX IF NOT EXISTS user_stats_historical_end_ts ON user_stats_historical
-- We don't need an index on (user_id, end_ts) because PRIMARY KEY sorts that -- We don't need an index on (user_id, end_ts) because PRIMARY KEY sorts that
-- out for us. (We would want it to review stats for a particular user.) -- out for us. (We would want it to review stats for a particular user.)
-- Also rename room_state to room_stats_state to make its ownership clear.
ALTER TABLE room_state RENAME TO room_stats_state;

View file

@@ -25,6 +25,9 @@ from synapse.util.caches.descriptors import cached
 logger = logging.getLogger(__name__)
 
 # these fields track absolutes (e.g. total number of rooms on the server)
+# You can think of these as Prometheus Gauges.
+# You can draw these stats on a line graph.
+# Example: number of users in a room
 ABSOLUTE_STATS_FIELDS = {
     "room": (
         "current_state_events",
@@ -38,17 +41,13 @@ ABSOLUTE_STATS_FIELDS = {
 }
 
 # these fields are per-timeslice and so should be reset to 0 upon a new slice
+# You can draw these stats on a histogram.
+# Example: number of events sent locally during a time slice
 PER_SLICE_FIELDS = {"room": (), "user": ()}
 
 TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
 
 
-class OldCollectionRequired(Exception):
-    """ Signal that we need to collect old stats rows and retry. """
-
-    pass
-
-
 class StatsStore(StateDeltasStore):
     def __init__(self, db_conn, hs):
         super(StatsStore, self).__init__(db_conn, hs)
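
Editor's note: the gauge/histogram distinction in the comments above drives the whole schema split: absolute fields carry their running total forward from slice to slice, while per-slice fields restart at zero in each new time bucket. A minimal, self-contained sketch of that behaviour (illustration only, not Synapse code; Bucket, apply_delta, and BUCKET_SIZE are invented names):

    # Toy model of the two field kinds; not Synapse code.
    BUCKET_SIZE = 86_400_000  # assumed slice width: one day in milliseconds

    class Bucket:
        def __init__(self, end_ts, absolutes=None, per_slice=None):
            self.end_ts = end_ts
            self.absolutes = dict(absolutes or {})  # gauge-like: carried forward
            self.per_slice = dict(per_slice or {})  # histogram-like: reset to 0

    def apply_delta(bucket, ts, field, delta, kind):
        """Apply a delta, opening a new slice if ts falls past the bucket end."""
        if ts >= bucket.end_ts:
            # New slice: absolutes carry over, per-slice counters start from 0.
            bucket = Bucket(bucket.end_ts + BUCKET_SIZE, bucket.absolutes, {})
        target = bucket.absolutes if kind == "absolute" else bucket.per_slice
        target[field] = target.get(field, 0) + delta
        return bucket

    b = Bucket(end_ts=BUCKET_SIZE)
    b = apply_delta(b, 10, "joined_members", +1, "absolute")
    b = apply_delta(b, 20, "local_events", +5, "per_slice")
    b = apply_delta(b, BUCKET_SIZE + 1, "local_events", +2, "per_slice")
    print(b.absolutes, b.per_slice)  # {'joined_members': 1} {'local_events': 2}
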
@@ -188,7 +187,7 @@ class StatsStore(StateDeltasStore):
                 fields[col] = None
 
         return self._simple_upsert(
-            table="room_state",
+            table="room_stats_state",
             keyvalues={"room_id": room_id},
             values=fields,
             desc="update_room_state",
@@ -242,6 +241,94 @@ class StatsStore(StateDeltasStore):
             complete_with_stream_id=complete_with_stream_id,
         )
 
+    def _update_stats_delta_txn(
+        self,
+        txn,
+        ts,
+        stats_type,
+        stats_id,
+        fields,
+        complete_with_stream_id=None,
+        absolute_field_overrides=None,
+    ):
+        """
+        See L{update_stats_delta}
+        Additional Args:
+            absolute_field_overrides (dict[str, int]): Current stats values
+                (i.e. not deltas) of absolute fields.
+                Does not work with per-slice fields.
+        """
+        table, id_col = TYPE_TO_TABLE[stats_type]
+
+        quantised_ts = self.quantise_stats_time(int(ts))
+        end_ts = quantised_ts + self.stats_bucket_size
+
+        if absolute_field_overrides is None:
+            absolute_field_overrides = {}
+
+        abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type]
+        slice_field_names = PER_SLICE_FIELDS[stats_type]
+        for field in chain(fields.keys(), absolute_field_overrides.keys()):
+            if field not in abs_field_names and field not in slice_field_names:
+                # guard against potential SQL injection dodginess
+                raise ValueError(
+                    "%s is not a recognised field"
+                    " for stats type %s" % (field, stats_type)
+                )
+
+        # only absolute stats fields are tracked in the `_current` stats tables,
+        # so those are the only ones that we process deltas for when
+        # we upsert against the `_current` table.
+
+        # This calculates the deltas (`field = field + ?` values)
+        # for absolute fields,
+        # * defaulting to 0 if not specified
+        #   (required for the INSERT part of upserting to work)
+        # * omitting overrides specified in `absolute_field_overrides`
+        deltas_of_absolute_fields = {
+            key: fields.get(key, 0)
+            for key in abs_field_names
+            if key not in absolute_field_overrides
+        }
+
+        if complete_with_stream_id is not None:
+            absolute_field_overrides = absolute_field_overrides.copy()
+            absolute_field_overrides[
+                "completed_delta_stream_id"
+            ] = complete_with_stream_id
+
+        # first upsert the `_current` table
+        self._upsert_with_additive_relatives_txn(
+            txn=txn,
+            table=table + "_current",
+            keyvalues={id_col: stats_id},
+            absolutes=absolute_field_overrides,
+            additive_relatives=deltas_of_absolute_fields,
+        )
+
+        if self.has_completed_background_updates():
+            # TODO want to check specifically for stats regenerator, not all
+            #   background updates…
+            # then upsert the `_historical` table.
+            # we don't support absolute_fields for per-slice fields as it makes
+            # no sense.
+            per_slice_additive_relatives = {
+                key: fields.get(key, 0) for key in slice_field_names
+            }
+            self._upsert_copy_from_table_with_additive_relatives_txn(
+                txn=txn,
+                into_table=table + "_historical",
+                keyvalues={id_col: stats_id},
+                extra_dst_keyvalues={
+                    "end_ts": end_ts,
+                    "bucket_size": self.stats_bucket_size,
+                },
+                additive_relatives=per_slice_additive_relatives,
+                src_table=table + "_current",
+                copy_columns=abs_field_names,
+                additional_where=" AND completed_delta_stream_id IS NOT NULL",
+            )
+
     def _upsert_with_additive_relatives_txn(
         self, txn, table, keyvalues, absolutes, additive_relatives
     ):
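
Editor's note: to make the `_current` upsert in the hunk above concrete, here is the field-splitting step in isolation. Deltas for absolute fields default to 0 so the INSERT arm of the upsert always has a value for every column, and any field named in absolute_field_overrides is dropped from the delta set because it will be written outright instead. A standalone sketch with invented field names and values:

    # Standalone sketch of the delta/override split (values invented).
    abs_field_names = ("current_state_events", "joined_members")

    fields = {"joined_members": 2}                           # deltas from the caller
    absolute_field_overrides = {"current_state_events": 50}  # known current value

    # Default missing absolute fields to 0; omit anything overridden outright.
    deltas_of_absolute_fields = {
        key: fields.get(key, 0)
        for key in abs_field_names
        if key not in absolute_field_overrides
    }
    print(deltas_of_absolute_fields)  # {'joined_members': 2}
    # The `_current` upsert then writes current_state_events = 50 as an absolute
    # and applies joined_members = joined_members + 2 as an additive relative.
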
@@ -388,83 +475,3 @@ class StatsStore(StateDeltasStore):
         for (key, val) in additive_relatives.items():
             src_row[key] = dest_current_row[key] + val
         self._simple_update_txn(txn, into_table, keyvalues, src_row)
-
-    def _update_stats_delta_txn(
-        self,
-        txn,
-        ts,
-        stats_type,
-        stats_id,
-        fields,
-        complete_with_stream_id=None,
-        absolute_fields=None,
-    ):
-        """
-        See L{update_stats_delta}
-        Additional Args:
-            absolute_fields (dict[str, int]): Absolute current stats values
-                (i.e. not deltas). Does not work with per-slice fields.
-        """
-        table, id_col = TYPE_TO_TABLE[stats_type]
-
-        quantised_ts = self.quantise_stats_time(int(ts))
-        end_ts = quantised_ts + self.stats_bucket_size
-
-        abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type]
-        slice_field_names = PER_SLICE_FIELDS[stats_type]
-        for field in chain(fields.keys(), absolute_fields.keys()):
-            if field not in abs_field_names and field not in slice_field_names:
-                # guard against potential SQL injection dodginess
-                raise ValueError(
-                    "%s is not a recognised field"
-                    " for stats type %s" % (field, stats_type)
-                )
-
-        # only absolute stats fields are tracked in the `_current` stats tables,
-        # so those are the only ones that we process deltas for when
-        # we upsert against the `_current` table.
-        additive_relatives = {
-            key: fields.get(key, 0)
-            for key in abs_field_names
-            if key not in absolute_fields
-        }
-
-        if absolute_fields is None:
-            absolute_fields = {}
-        if complete_with_stream_id is not None:
-            absolute_fields = absolute_fields.copy()
-            absolute_fields["completed_delta_stream_id"] = complete_with_stream_id
-
-        self.get_earliest_token_for_stats.invalidate(stats_type, stats_id)
-
-        # first upsert the `_current` table
-        self._upsert_with_additive_relatives_txn(
-            txn=txn,
-            table=table + "_current",
-            keyvalues={id_col: stats_id},
-            absolutes=absolute_fields,
-            additive_relatives=additive_relatives,
-        )
-
-        if self.has_completed_background_updates():
-            # TODO want to check specifically for stats regenerator, not all
-            #   background updates…
-            # then upsert the `_historical` table.
-            # we don't support absolute_fields for per-slice fields as it makes
-            # no sense.
-            per_slice_additive_relatives = {
-                key: fields.get(key, 0) for key in slice_field_names
-            }
-            self._upsert_copy_from_table_with_additive_relatives_txn(
-                txn=txn,
-                into_table=table + "_historical",
-                keyvalues={id_col: stats_id},
-                extra_dst_keyvalues={
-                    "end_ts": end_ts,
-                    "bucket_size": self.stats_bucket_size,
-                },
-                additive_relatives=per_slice_additive_relatives,
-                src_table=table + "_current",
-                copy_columns=abs_field_names,
-                additional_where=" AND completed_delta_stream_id IS NOT NULL",
-            )
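
Editor's note: both `_upsert_with_additive_relatives_txn` and its copy-from-table variant rest on the same upsert pattern: absolute columns are overwritten with the supplied value, while additive-relative columns are incremented by it (`col = col + ?`). A runnable sketch of that pattern against an in-memory SQLite database (illustration only, not the SQL Synapse generates; requires SQLite 3.24+ for ON CONFLICT):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE room_stats_current ("
        "room_id TEXT PRIMARY KEY, "
        "completed_delta_stream_id INTEGER, "
        "joined_members INTEGER NOT NULL DEFAULT 0)"
    )

    def upsert_with_additive_relatives(room_id, absolutes, additive_relatives):
        # Absolute columns take the new value; relative columns accumulate.
        cols = list(absolutes) + list(additive_relatives)
        assignments = [f"{col} = excluded.{col}" for col in absolutes] + [
            f"{col} = {col} + excluded.{col}" for col in additive_relatives
        ]
        sql = (
            f"INSERT INTO room_stats_current (room_id, {', '.join(cols)}) "
            f"VALUES (?{', ?' * len(cols)}) "
            f"ON CONFLICT (room_id) DO UPDATE SET {', '.join(assignments)}"
        )
        params = [room_id] + list(absolutes.values()) + list(additive_relatives.values())
        conn.execute(sql, params)

    upsert_with_additive_relatives(
        "!r:example.org", {"completed_delta_stream_id": 41}, {"joined_members": 2}
    )
    upsert_with_additive_relatives(
        "!r:example.org", {"completed_delta_stream_id": 42}, {"joined_members": 3}
    )
    print(conn.execute("SELECT * FROM room_stats_current").fetchone())
    # ('!r:example.org', 42, 5): the absolute was overwritten, the relative accumulated

The `_historical` variant applies the same SET shape, except that absolute columns are copied across from the `_current` row rather than taken from the caller.
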