Mirror of https://github.com/element-hq/synapse.git, synced 2024-12-19 09:31:35 +03:00.
Handle state deltas and turn them into stats deltas
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
This commit is contained in:
parent
80a1c6e9e5
commit
e4cbea6c46
2 changed files with 439 additions and 2 deletions
|
@ -15,7 +15,14 @@
|
|||
|
||||
import logging
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.constants import EventTypes, JoinRules, Membership
|
||||
from synapse.handlers.state_deltas import StateDeltasHandler
|
||||
from synapse.metrics import event_processing_positions
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.types import UserID
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
@ -52,4 +59,324 @@ class StatsHandler(StateDeltasHandler):
|
|||
def notify_new_event(self):
    """Called when there may be more deltas to process.

    Kicks off a single background run of ``_unsafe_process``, guarded by
    the store's ``stats_delta_processing_lock`` so that at most one run is
    in flight at a time. No-op when stats collection is disabled.
    """
    # NOTE: a stray dead `pass` statement (leftover ahead of the real body)
    # has been removed; it had no effect but was misleading.
    if not self.hs.config.stats_enabled:
        return

    lock = self.store.stats_delta_processing_lock

    @defer.inlineCallbacks
    def process():
        try:
            yield self._unsafe_process()
        finally:
            # always release, even if processing raised
            lock.release()

    if lock.acquire(blocking=False):
        # we only want to run this process one-at-a-time,
        # and also, if the initial background updater wants us to keep out,
        # we should respect that.
        try:
            run_as_background_process("stats.notify_new_event", process)
        except:  # noqa: E722 – re-raised so fine
            # if scheduling itself failed, process() will never run and
            # never release the lock, so release it here before re-raising
            lock.release()
            raise
||||
|
||||
@defer.inlineCallbacks
def _unsafe_process(self):
    """Process state deltas until caught up, then persist our position.

    Not safe against concurrent invocation with itself: callers must
    serialise access (``notify_new_event`` guards this with
    ``stats_delta_processing_lock``).
    """
    # If self.pos is None then means we haven't fetched it from DB
    if self.pos is None or None in self.pos.values():
        self.pos = yield self.store.get_stats_positions()

    # If still None then the initial background update hasn't started yet
    if self.pos is None or None in self.pos.values():
        return None

    # Loop round handling deltas until we're up to date
    with Measure(self.clock, "stats_delta"):
        while True:
            deltas = yield self.store.get_current_state_deltas(
                self.pos["state_delta_stream_id"]
            )
            if not deltas:
                break

            logger.debug("Handling %d state deltas", len(deltas))
            yield self._handle_deltas(deltas)

            # advance past the batch we just handled
            self.pos["state_delta_stream_id"] = deltas[-1]["stream_id"]

            # expose progress as a metric
            event_processing_positions.labels("stats").set(
                self.pos["state_delta_stream_id"]
            )

    # persist the advanced position so we resume from here after restart
    if self.pos is not None:
        yield self.store.update_stats_positions(self.pos)
||||
|
||||
@defer.inlineCallbacks
def _handle_deltas(self, deltas):
    """
    Called with the state deltas to process.

    Dispatches on each delta's event type and turns it into updates to the
    per-room state cache (``update_room_state``) and to the room/user stats
    tables (``update_stats_delta``).

    Args:
        deltas (list[dict]): state deltas; each is read for its "type",
            "state_key", "room_id", "event_id", "stream_id" and
            "prev_event_id" keys (as produced by
            ``get_current_state_deltas`` — see ``_unsafe_process``).
    """
    for delta in deltas:
        typ = delta["type"]
        state_key = delta["state_key"]
        room_id = delta["room_id"]
        event_id = delta["event_id"]
        stream_id = delta["stream_id"]
        prev_event_id = delta["prev_event_id"]
        stream_pos = delta["stream_id"]

        logger.debug("Handling: %r %r, %s", typ, state_key, event_id)

        token = yield self.store.get_earliest_token_for_stats("room", room_id)

        # If the earliest token to begin from is larger than our current
        # stream ID, skip processing this delta.
        if token is not None and token >= stream_id:
            logger.debug(
                "Ignoring: %s as earlier than this room's initial ingestion event",
                event_id,
            )
            continue

        if event_id is None and prev_event_id is None:
            # neither a new event nor a replaced one — nothing to act on
            # (not expected to happen in practice)
            continue

        event_content = {}

        if event_id is not None:
            event = yield self.store.get_event(event_id, allow_none=True)
            if event:
                event_content = event.content or {}

        # We use stream_pos here rather than fetch by event_id as event_id
        # may be None
        now = yield self.store.get_received_ts_by_stream_pos(stream_pos)
        # convert to seconds — assumes get_received_ts_by_stream_pos
        # returns milliseconds (TODO confirm)
        now = int(now) // 1000

        # accumulated per-room counter changes for this one delta
        room_stats_delta = {}
        # set when this delta makes the room's stats row "complete"
        room_stats_complete = False

        if prev_event_id is None:
            # this state event doesn't overwrite another,
            # so it is a new effective/current state event
            room_stats_delta["current_state_events"] = (
                room_stats_delta.get("current_state_events", 0) + 1
            )

        if typ == EventTypes.Member:
            # we could use _get_key_change here but it's a bit inefficient
            # given we're not testing for a specific result; might as well
            # just grab the prev_membership and membership strings and
            # compare them.
            # We take None rather than leave as a previous membership
            # in the absence of a previous event because we do not want to
            # reduce the leave count when a new-to-the-room user joins.
            prev_membership = None
            if prev_event_id is not None:
                prev_event = yield self.store.get_event(
                    prev_event_id, allow_none=True
                )
                if prev_event:
                    prev_event_content = prev_event.content
                    prev_membership = prev_event_content.get(
                        "membership", Membership.LEAVE
                    )

            membership = event_content.get("membership", Membership.LEAVE)

            # undo the count contribution of the membership being replaced
            if prev_membership is None:
                logger.debug("No previous membership for this user.")
            elif prev_membership == Membership.JOIN:
                room_stats_delta["joined_members"] = (
                    room_stats_delta.get("joined_members", 0) - 1
                )
            elif prev_membership == Membership.INVITE:
                room_stats_delta["invited_members"] = (
                    room_stats_delta.get("invited_members", 0) - 1
                )
            elif prev_membership == Membership.LEAVE:
                room_stats_delta["left_members"] = (
                    room_stats_delta.get("left_members", 0) - 1
                )
            elif prev_membership == Membership.BAN:
                room_stats_delta["banned_members"] = (
                    room_stats_delta.get("banned_members", 0) - 1
                )
            else:
                err = "%s is not a valid prev_membership" % (repr(prev_membership),)
                logger.error(err)
                raise ValueError(err)

            # ...and count the new membership state
            if membership == Membership.JOIN:
                room_stats_delta["joined_members"] = (
                    room_stats_delta.get("joined_members", 0) + 1
                )
            elif membership == Membership.INVITE:
                room_stats_delta["invited_members"] = (
                    room_stats_delta.get("invited_members", 0) + 1
                )
            elif membership == Membership.LEAVE:
                room_stats_delta["left_members"] = (
                    room_stats_delta.get("left_members", 0) + 1
                )
            elif membership == Membership.BAN:
                room_stats_delta["banned_members"] = (
                    room_stats_delta.get("banned_members", 0) + 1
                )
            else:
                err = "%s is not a valid membership" % (repr(membership),)
                logger.error(err)
                raise ValueError(err)

            # for membership events the state_key is the affected user's id
            user_id = state_key
            if self.is_mine_id(user_id) and membership in (
                Membership.JOIN,
                Membership.LEAVE,
            ):
                # update user_stats as it's one of our users
                public = yield self._is_public_room(room_id)

                field = "public_rooms" if public else "private_rooms"
                delta = +1 if membership == Membership.JOIN else -1

                yield self.store.update_stats_delta(
                    now, "user", user_id, {field: delta}
                )

        elif typ == EventTypes.Create:
            # Newly created room. Add it with all blank portions.
            yield self.store.update_room_state(
                room_id,
                {
                    "join_rules": None,
                    "history_visibility": None,
                    "encryption": None,
                    "name": None,
                    "topic": None,
                    "avatar": None,
                    "canonical_alias": None,
                },
            )

            room_stats_complete = True

        elif typ == EventTypes.JoinRules:
            old_room_state = yield self.store.get_room_state(room_id)
            yield self.store.update_room_state(
                room_id, {"join_rules": event_content.get("join_rule")}
            )

            # whether the room would be public anyway,
            # because of history_visibility
            other_field_gives_publicity = (
                old_room_state["history_visibility"] == "world_readable"
            )

            if not other_field_gives_publicity:
                is_public = yield self._get_key_change(
                    prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
                )
                if is_public is not None:
                    yield self.update_public_room_stats(now, room_id, is_public)

        elif typ == EventTypes.RoomHistoryVisibility:
            old_room_state = yield self.store.get_room_state(room_id)
            yield self.store.update_room_state(
                room_id,
                {"history_visibility": event_content.get("history_visibility")},
            )

            # whether the room would be public anyway,
            # because of join_rule
            other_field_gives_publicity = (
                old_room_state["join_rules"] == JoinRules.PUBLIC
            )

            if not other_field_gives_publicity:
                is_public = yield self._get_key_change(
                    prev_event_id, event_id, "history_visibility", "world_readable"
                )
                if is_public is not None:
                    yield self.update_public_room_stats(now, room_id, is_public)

        elif typ == EventTypes.Encryption:
            yield self.store.update_room_state(
                room_id, {"encryption": event_content.get("algorithm")}
            )
        elif typ == EventTypes.Name:
            yield self.store.update_room_state(
                room_id, {"name": event_content.get("name")}
            )
        elif typ == EventTypes.Topic:
            yield self.store.update_room_state(
                room_id, {"topic": event_content.get("topic")}
            )
        elif typ == EventTypes.RoomAvatar:
            yield self.store.update_room_state(
                room_id, {"avatar": event_content.get("url")}
            )
        elif typ == EventTypes.CanonicalAlias:
            yield self.store.update_room_state(
                room_id, {"canonical_alias": event_content.get("alias")}
            )

        # flush accumulated room counter changes, marking the row complete
        # if this delta was the room's creation event
        if room_stats_complete:
            yield self.store.update_stats_delta(
                now,
                "room",
                room_id,
                room_stats_delta,
                complete_with_stream_id=stream_id,
            )

        elif len(room_stats_delta) > 0:
            yield self.store.update_stats_delta(
                now, "room", room_id, room_stats_delta
            )
||||
|
||||
@defer.inlineCallbacks
def update_public_room_stats(self, ts, room_id, is_public):
    """
    Adjust each local member's public/private room counts when a room they
    are in flips to or from public visibility.

    Args:
        ts (int): Timestamp in seconds
        room_id (str)
        is_public (bool)
    """
    # Gaining a public room means losing a private one, and vice versa.
    sign = 1 if is_public else -1

    # For now, blindly iterate over all local users in the room so that
    # we can handle the whole problem of copying buckets over as needed
    members = yield self.store.get_users_in_room(room_id)

    for member in members:
        if not self.hs.is_mine(UserID.from_string(member)):
            continue
        yield self.store.update_stats_delta(
            ts,
            "user",
            member,
            {"public_rooms": sign, "private_rooms": -sign},
        )
||||
|
||||
@defer.inlineCallbacks
def _is_public_room(self, room_id):
    """Return whether the room counts as public.

    A room is public when its join rule is public, or when its history
    visibility is "world_readable".

    Returns:
        Deferred[bool]
    """
    join_rules = yield self.state.get_current_state(room_id, EventTypes.JoinRules)
    history_visibility = yield self.state.get_current_state(
        room_id, EventTypes.RoomHistoryVisibility
    )

    joinable_by_anyone = bool(
        join_rules and join_rules.content.get("join_rule") == JoinRules.PUBLIC
    )
    readable_by_anyone = bool(
        history_visibility
        and history_visibility.content.get("history_visibility")
        == "world_readable"
    )

    return joinable_by_anyone or readable_by_anyone
||||
|
|
|
@ -15,9 +15,12 @@
|
|||
# limitations under the License.
|
||||
|
||||
# Duplicate `from twisted.internet import defer` removed and imports
# regrouped: stdlib / third-party / local (PEP 8).
import logging
from threading import Lock

from twisted.internet import defer

from synapse.storage.state_deltas import StateDeltasStore
from synapse.util.caches.descriptors import cached

logger = logging.getLogger(__name__)
|
||||
|
||||
|
@ -55,6 +58,8 @@ class StatsStore(StateDeltasStore):
|
|||
self.stats_enabled = hs.config.stats_enabled
|
||||
self.stats_bucket_size = hs.config.stats_bucket_size
|
||||
|
||||
self.stats_delta_processing_lock = Lock()
|
||||
|
||||
self.register_noop_background_update("populate_stats_createtables")
|
||||
self.register_noop_background_update("populate_stats_process_rooms")
|
||||
self.register_noop_background_update("populate_stats_cleanup")
|
||||
|
@ -74,6 +79,91 @@ class StatsStore(StateDeltasStore):
|
|||
"""
|
||||
return (ts // self.stats_bucket_size) * self.stats_bucket_size
|
||||
|
||||
def get_stats_positions(self, for_initial_processor=False):
    """
    Fetch the stored positions of the stats processors.

    Args:
        for_initial_processor (bool, optional): When True, read the
            position promised by the latest stats regeneration rather than
            the position the incremental processor has reached.

    Returns (dict):
        Dict with keys:
            state_delta_stream_id: stream_id of last-processed state delta
            total_events_min_stream_ordering: stream_ordering of
                latest-processed backfilled event (total_events counting).
            total_events_max_stream_ordering: stream_ordering of
                latest-processed non-backfilled event (total_events
                counting).
    """
    wanted_columns = (
        "state_delta_stream_id",
        "total_events_min_stream_ordering",
        "total_events_max_stream_ordering",
    )
    return self._simple_select_one(
        table="stats_incremental_position",
        keyvalues={"is_background_contract": for_initial_processor},
        retcols=wanted_columns,
        desc="stats_incremental_position",
    )
||||
|
||||
def _get_stats_positions_txn(self, txn, for_initial_processor=False):
|
||||
"""
|
||||
See L{get_stats_positions}.
|
||||
|
||||
Args:
|
||||
txn (cursor): Database cursor
|
||||
"""
|
||||
return self._simple_select_one_txn(
|
||||
txn=txn,
|
||||
table="stats_incremental_position",
|
||||
keyvalues={"is_background_contract": for_initial_processor},
|
||||
retcols=(
|
||||
"state_delta_stream_id",
|
||||
"total_events_min_stream_ordering",
|
||||
"total_events_max_stream_ordering",
|
||||
),
|
||||
)
|
||||
|
||||
def update_stats_positions(self, positions, for_initial_processor=False):
    """
    Persist the stats processor positions.

    Args:
        positions: See L{get_stats_positions}; passing None resets every
            tracked position to NULL.
        for_initial_processor: See L{get_stats_positions}
    """
    if positions is None:
        # reset all three tracked positions
        positions = dict.fromkeys(
            (
                "state_delta_stream_id",
                "total_events_min_stream_ordering",
                "total_events_max_stream_ordering",
            )
        )
    return self._simple_update_one(
        table="stats_incremental_position",
        keyvalues={"is_background_contract": for_initial_processor},
        updatevalues=positions,
        desc="update_stats_incremental_position",
    )
||||
|
||||
def _update_stats_positions_txn(self, txn, positions, for_initial_processor=False):
|
||||
"""
|
||||
See L{update_stats_positions}
|
||||
"""
|
||||
if positions is None:
|
||||
positions = {
|
||||
"state_delta_stream_id": None,
|
||||
"total_events_min_stream_ordering": None,
|
||||
"total_events_max_stream_ordering": None,
|
||||
}
|
||||
return self._simple_update_one_txn(
|
||||
txn,
|
||||
table="stats_incremental_position",
|
||||
keyvalues={"is_background_contract": for_initial_processor},
|
||||
updatevalues=positions,
|
||||
)
|
||||
|
||||
def update_room_state(self, room_id, fields):
|
||||
"""
|
||||
Args:
|
||||
|
@ -103,6 +193,26 @@ class StatsStore(StateDeltasStore):
|
|||
desc="update_room_state",
|
||||
)
|
||||
|
||||
@cached()
def get_earliest_token_for_stats(self, stats_type, id):
    """
    Look up the "earliest token" for a stats row. The room stats delta
    processor uses this to ignore deltas already processed between the
    start of the background task and a particular room's stats being
    calculated.

    Returns:
        Deferred[int]
    """
    stats_table, id_column = TYPE_TO_TABLE[stats_type]

    return self._simple_select_one_onecol(
        stats_table + "_current",
        {id_column: id},
        retcol="completed_delta_stream_id",
        allow_none=True,
    )
||||
|
||||
@defer.inlineCallbacks
|
||||
def update_stats_delta(
|
||||
self, ts, stats_type, stats_id, fields, complete_with_stream_id=None
|
||||
|
|
Loading…
Reference in a new issue