Docstrings in storage.stats.

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
This commit is contained in:
Olivier Wilkinson (reivilibre) 2019-08-12 16:22:18 +01:00
parent 314567d62d
commit 182cdcbf24

View file

@ -36,7 +36,7 @@ ABSOLUTE_STATS_FIELDS = {
"invited_members", "invited_members",
"left_members", "left_members",
"banned_members", "banned_members",
"total_events", # TODO review this list "total_events",
), ),
"user": ("public_rooms", "private_rooms"), "user": ("public_rooms", "private_rooms"),
} }
@ -78,6 +78,16 @@ class StatsStore(StateDeltasStore):
) )
def quantise_stats_time(self, ts): def quantise_stats_time(self, ts):
"""
Quantises a timestamp to be a multiple of the bucket size.
Args:
ts: the timestamp to quantise, in seconds since the Unix Epoch
Returns:
a timestamp which is divisible by the bucket size,
is no later than `ts` and is the largest such timestamp.
"""
return (ts // self.stats_bucket_size) * self.stats_bucket_size return (ts // self.stats_bucket_size) * self.stats_bucket_size
@defer.inlineCallbacks @defer.inlineCallbacks
@ -120,6 +130,10 @@ class StatsStore(StateDeltasStore):
@defer.inlineCallbacks @defer.inlineCallbacks
def _populate_stats_prepare(self, progress, batch_size): def _populate_stats_prepare(self, progress, batch_size):
"""
This is a background update, which prepares the database for
statistics regeneration.
"""
if not self.stats_enabled: if not self.stats_enabled:
yield self._end_background_update("populate_stats_prepare") yield self._end_background_update("populate_stats_prepare")
@ -167,6 +181,9 @@ class StatsStore(StateDeltasStore):
txn.execute(statement) txn.execute(statement)
def _delete_dirty_skeletons(txn): def _delete_dirty_skeletons(txn):
"""
Delete pre-existing rows which are incomplete.
"""
sqls = """ sqls = """
DELETE FROM room_stats_current DELETE FROM room_stats_current
WHERE completed_delta_stream_id IS NULL; WHERE completed_delta_stream_id IS NULL;
@ -202,14 +219,20 @@ class StatsStore(StateDeltasStore):
@defer.inlineCallbacks @defer.inlineCallbacks
def _populate_stats_cleanup(self, progress, batch_size): def _populate_stats_cleanup(self, progress, batch_size):
"""
This is a background update which cleans up after statistics regeneration.
"""
# TODO is there really no clean-up to be done? # TODO is there really no clean-up to be done?
# TODO if not self.stats_enabled…. # TODO if not self.stats_enabled cleanup.
yield self._end_background_update("populate_stats_cleanup") yield self._end_background_update("populate_stats_cleanup")
defer.returnValue(1) defer.returnValue(1)
@defer.inlineCallbacks @defer.inlineCallbacks
def _populate_stats_process_users(self, progress, batch_size): def _populate_stats_process_users(self, progress, batch_size):
"""
This is a background update which regenerates statistics for users.
"""
if not self.stats_enabled: if not self.stats_enabled:
yield self._end_background_update("populate_stats_process_users") yield self._end_background_update("populate_stats_process_users")
defer.returnValue(1) defer.returnValue(1)
@ -307,7 +330,6 @@ class StatsStore(StateDeltasStore):
"private_rooms": room_counts_by_publicness.get(False, 0), "private_rooms": room_counts_by_publicness.get(False, 0),
}, },
) )
# TODO CHECK: actually want to **overwrite** some of these!
except OldCollectionRequired: except OldCollectionRequired:
# this can't (shouldn't) actually happen # this can't (shouldn't) actually happen
# since we only run the background update for incomplete rows # since we only run the background update for incomplete rows
@ -347,6 +369,9 @@ class StatsStore(StateDeltasStore):
@defer.inlineCallbacks @defer.inlineCallbacks
def _populate_stats_process_rooms(self, progress, batch_size): def _populate_stats_process_rooms(self, progress, batch_size):
"""
This is a background update which regenerates statistics for rooms.
"""
if not self.stats_enabled: if not self.stats_enabled:
yield self._end_background_update("populate_stats_process_rooms") yield self._end_background_update("populate_stats_process_rooms")
defer.returnValue(1) defer.returnValue(1)
@ -490,7 +515,6 @@ class StatsStore(StateDeltasStore):
"banned_members": membership_counts.get(Membership.BAN, 0), "banned_members": membership_counts.get(Membership.BAN, 0),
}, },
) )
# TODO CHECK: actually want to **overwrite** some of these!
except OldCollectionRequired: except OldCollectionRequired:
# this can't (shouldn't) actually happen # this can't (shouldn't) actually happen
# since we only run the background update for incomplete rows # since we only run the background update for incomplete rows
@ -581,6 +605,18 @@ class StatsStore(StateDeltasStore):
continue continue
def _count_events_in_room_txn(self, txn, room_id, low_token, high_token): def _count_events_in_room_txn(self, txn, room_id, low_token, high_token):
"""
Count the number of events in a room between two tokens, inclusive.
Args:
txn (cursor): The database
room_id (str): The ID of the room to count events for
low_token (int): the minimum stream ordering to count
high_token (int): the maximum stream ordering to count
Returns (int):
the number of events
"""
sql = """ sql = """
SELECT COUNT(*) AS num_events SELECT COUNT(*) AS num_events
FROM events FROM events
@ -595,6 +631,7 @@ class StatsStore(StateDeltasStore):
""" """
Delete all statistics records. Delete all statistics records.
TODO obsolete? TODO obsolete?
TODO at least will need updating
""" """
def _delete_all_stats_txn(txn): def _delete_all_stats_txn(txn):
@ -606,6 +643,23 @@ class StatsStore(StateDeltasStore):
return self.runInteraction("delete_all_stats", _delete_all_stats_txn) return self.runInteraction("delete_all_stats", _delete_all_stats_txn)
def get_stats_positions(self, for_initial_processor=False): def get_stats_positions(self, for_initial_processor=False):
"""
Returns the stats processor positions.
Args:
for_initial_processor (bool, optional): If true, returns the position
promised by the latest stats regeneration, rather than the current
incremental processor's position.
Otherwise (if false), return the incremental processor's position.
Returns (dict):
Dict containing :-
state_delta_stream_id: stream_id of last-processed state delta
total_events_min_stream_ordering: stream_ordering of latest-processed
backfilled event, in the context of total_events counting.
total_events_max_stream_ordering: stream_ordering of latest-processed
non-backfilled event, in the context of total_events counting.
"""
return self._simple_select_one( return self._simple_select_one(
table="stats_incremental_position", table="stats_incremental_position",
keyvalues={"is_background_contract": for_initial_processor}, keyvalues={"is_background_contract": for_initial_processor},
@ -618,6 +672,12 @@ class StatsStore(StateDeltasStore):
) )
def _get_stats_positions_txn(self, txn, for_initial_processor=False): def _get_stats_positions_txn(self, txn, for_initial_processor=False):
"""
See L{get_stats_positions}.
Args:
txn (cursor): Database cursor
"""
return self._simple_select_one_txn( return self._simple_select_one_txn(
txn=txn, txn=txn,
table="stats_incremental_position", table="stats_incremental_position",
@ -630,6 +690,13 @@ class StatsStore(StateDeltasStore):
) )
def update_stats_positions(self, positions, for_initial_processor=False): def update_stats_positions(self, positions, for_initial_processor=False):
"""
Updates the stats processor positions.
Args:
positions: See L{get_stats_positions}
for_initial_processor: See L{get_stats_positions}
"""
if positions is None: if positions is None:
positions = { positions = {
"state_delta_stream_id": None, "state_delta_stream_id": None,
@ -644,6 +711,9 @@ class StatsStore(StateDeltasStore):
) )
def _update_stats_positions_txn(self, txn, positions, for_initial_processor=False): def _update_stats_positions_txn(self, txn, positions, for_initial_processor=False):
"""
See L{update_stats_positions}
"""
if positions is None: if positions is None:
positions = { positions = {
"state_delta_stream_id": None, "state_delta_stream_id": None,
@ -709,10 +779,12 @@ class StatsStore(StateDeltasStore):
size, size,
) )
# TODO fix and account for _current.
def _get_statistics_for_subject_txn( def _get_statistics_for_subject_txn(
self, txn, stats_type, stats_id, start, size=100 self, txn, stats_type, stats_id, start, size=100
): ):
"""
Transaction-bound version of L{get_statistics_for_subject}.
"""
table, id_col = TYPE_TO_TABLE[stats_type] table, id_col = TYPE_TO_TABLE[stats_type]
selected_columns = list( selected_columns = list(
@ -740,8 +812,15 @@ class StatsStore(StateDeltasStore):
allow_none=True, allow_none=True,
) )
if current is not None and current["end_ts"] is not None: if current is not None:
completed = current["completed_delta_stream_id"] is not None
dirty = current["end_ts"] is not None
if completed and dirty:
# it is dirty, so contains new information, so should be included # it is dirty, so contains new information, so should be included
# we don't accept incomplete rows as that would almost certainly
# be giving misinformation, since it is awaiting an
# initial background count
current["bucket_size"] = current["end_ts"] - current["start_ts"] current["bucket_size"] = current["end_ts"] - current["start_ts"]
del current["start_ts"] del current["start_ts"]
return [current] + slice_list return [current] + slice_list
@ -753,6 +832,17 @@ class StatsStore(StateDeltasStore):
) )
def get_room_state(self, room_id): def get_room_state(self, room_id):
"""
Returns the current room_state for a room.
Args:
room_id (str): The ID of the room to return state for.
Returns (dict):
Dictionary containing these keys:
"name", "topic", "canonical_alias", "avatar", "join_rules",
"history_visibility"
"""
return self._simple_select_one( return self._simple_select_one(
"room_state", "room_state",
{"room_id": room_id}, {"room_id": room_id},
@ -787,6 +877,14 @@ class StatsStore(StateDeltasStore):
) )
def _collect_old_txn(self, txn, stats_type, limit=500): def _collect_old_txn(self, txn, stats_type, limit=500):
"""
See {collect_old}. Runs only a small batch, specified by limit.
Returns (bool):
True iff there is possibly more to do (i.e. this needs re-running),
False otherwise.
"""
# we do them in batches to prevent concurrent updates from # we do them in batches to prevent concurrent updates from
# messing us over with lots of retries # messing us over with lots of retries
@ -801,11 +899,12 @@ class StatsStore(StateDeltasStore):
) )
) )
sql = ("SELECT %s FROM %s_current WHERE end_ts <= ? LIMIT %d FOR UPDATE") % ( # `end_ts IS NOT NULL` is for partial index optimisation
id_col, sql = (
table, "SELECT %s FROM %s_current"
limit, " WHERE end_ts <= ? AND end_ts IS NOT NULL"
) " LIMIT %d FOR UPDATE"
) % (id_col, table, limit)
txn.execute(sql, (quantised_ts,)) txn.execute(sql, (quantised_ts,))
maybe_more = txn.rowcount == limit maybe_more = txn.rowcount == limit
updates = txn.fetchall() updates = txn.fetchall()
@ -827,6 +926,19 @@ class StatsStore(StateDeltasStore):
@defer.inlineCallbacks @defer.inlineCallbacks
def collect_old(self, stats_type): def collect_old(self, stats_type):
"""
Run 'old collection' on current stats rows.
Old collection is the process of copying dirty (updated) stats rows
from the current table to the historical table, when those rows have
finished their stats time slice.
Collected rows are then cleared of their dirty status.
Args:
stats_type: "room" or "user" the type of stats to run old collection
on.
"""
while True: while True:
maybe_more = yield self.runInteraction( maybe_more = yield self.runInteraction(
"stats_collect_old", self._collect_old_txn, stats_type "stats_collect_old", self._collect_old_txn, stats_type
@ -839,16 +951,18 @@ class StatsStore(StateDeltasStore):
self, ts, stats_type, stats_id, fields, complete_with_stream_id=None self, ts, stats_type, stats_id, fields, complete_with_stream_id=None
): ):
""" """
Updates the statistics for a subject, with a delta (difference/relative
change).
Args: Args:
ts (int): ts (int): timestamp of the change
stats_type (str): stats_type (str): "room" or "user" the kind of subject
stats_id (str): stats_id (str): the subject's ID (room ID or user ID)
fields (dict[str, int]): Deltas of stats values. fields (dict[str, int]): Deltas of stats values.
complete_with_stream_id (int, optional): complete_with_stream_id (int, optional):
If supplied, converts an incomplete row into a complete row,
Returns: with the supplied stream_id marked as the stream_id where the
row was completed.
""" """
while True: while True:
@ -877,6 +991,11 @@ class StatsStore(StateDeltasStore):
complete_with_stream_id=None, complete_with_stream_id=None,
absolute_fields=None, absolute_fields=None,
): ):
"""
See L{update_stats_delta}
Additional Args:
absolute_fields (dict[str, int]): Absolute stats values (i.e. not deltas).
"""
table, id_col = TYPE_TO_TABLE[stats_type] table, id_col = TYPE_TO_TABLE[stats_type]
quantised_ts = self.quantise_stats_time(int(ts)) quantised_ts = self.quantise_stats_time(int(ts))
@ -923,7 +1042,7 @@ class StatsStore(StateDeltasStore):
insertee = { insertee = {
id_col: stats_id, id_col: stats_id,
"end_ts": end_ts, "end_ts": end_ts,
"start_ts": ts, # TODO or do we use qts? "start_ts": ts,
"completed_delta_stream_id": complete_with_stream_id, "completed_delta_stream_id": complete_with_stream_id,
} }
@ -968,6 +1087,19 @@ class StatsStore(StateDeltasStore):
raise OldCollectionRequired() raise OldCollectionRequired()
def incremental_update_total_events(self, in_positions): def incremental_update_total_events(self, in_positions):
"""
Counts the number of events per-room and then adds these to the respective
total_events room counts.
Args:
in_positions (dict): Positions,
as retrieved from L{get_stats_positions}.
Returns (dict):
The new positions. Note that this is for reference only
the new positions WILL be committed by this function.
"""
def incremental_update_total_events_txn(txn): def incremental_update_total_events_txn(txn):
positions = in_positions.copy() positions = in_positions.copy()