mirror of
https://github.com/element-hq/synapse.git
synced 2024-12-21 12:14:29 +03:00
Address code review comments
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
This commit is contained in:
parent
dfb22fec48
commit
81aa6d53b0
1 changed files with 30 additions and 24 deletions
|
@ -76,20 +76,18 @@ class StatsHandler(StateDeltasHandler):
|
|||
# we only want to run this process one-at-a-time,
|
||||
# and also, if the initial background updater wants us to keep out,
|
||||
# we should respect that.
|
||||
try:
|
||||
run_as_background_process("stats.notify_new_event", process)
|
||||
except: # noqa: E722 – re-raised so fine
|
||||
lock.release()
|
||||
raise
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _unsafe_process(self):
|
||||
# If self.pos is None then means we haven't fetched it from DB
|
||||
# If None is one of the values, then means that the stats regenerator has not (or had not) yet unwedged us
|
||||
# but note that this might be outdated, so we retrieve the positions again.
|
||||
if self.pos is None or None in self.pos.values():
|
||||
self.pos = yield self.store.get_stats_positions()
|
||||
|
||||
# If still None then the initial background update hasn't started yet
|
||||
if self.pos is None or None in self.pos.values():
|
||||
# If still contains a None position, then the stats regenerator hasn't started yet
|
||||
if None in self.pos.values():
|
||||
return None
|
||||
|
||||
# Loop round handling deltas until we're up to date
|
||||
|
@ -141,7 +139,10 @@ class StatsHandler(StateDeltasHandler):
|
|||
continue
|
||||
|
||||
if event_id is None and prev_event_id is None:
|
||||
# Errr...
|
||||
logger.error(
|
||||
"event ID is None and so is the previous event ID. stream_id: %s",
|
||||
stream_id,
|
||||
)
|
||||
continue
|
||||
|
||||
event_content = {}
|
||||
|
@ -153,11 +154,14 @@ class StatsHandler(StateDeltasHandler):
|
|||
|
||||
# We use stream_pos here rather than fetch by event_id as event_id
|
||||
# may be None
|
||||
now = yield self.store.get_received_ts_by_stream_pos(stream_pos)
|
||||
now = int(now)
|
||||
stream_timestamp = yield self.store.get_received_ts_by_stream_pos(
|
||||
stream_pos
|
||||
)
|
||||
stream_timestamp = int(stream_timestamp)
|
||||
|
||||
# All the values in this dict are deltas (RELATIVE changes)
|
||||
room_stats_delta = {}
|
||||
room_stats_complete = False
|
||||
is_newly_created = False
|
||||
|
||||
if prev_event_id is None:
|
||||
# this state event doesn't overwrite another,
|
||||
|
@ -198,9 +202,9 @@ class StatsHandler(StateDeltasHandler):
|
|||
elif prev_membership == Membership.BAN:
|
||||
room_stats_delta["banned_members"] = -1
|
||||
else:
|
||||
err = "%s is not a valid prev_membership" % (repr(prev_membership),)
|
||||
logger.error(err)
|
||||
raise ValueError(err)
|
||||
raise ValueError(
|
||||
"%r is not a valid prev_membership" % (prev_membership,)
|
||||
)
|
||||
|
||||
if membership == prev_membership:
|
||||
pass # noop
|
||||
|
@ -213,9 +217,7 @@ class StatsHandler(StateDeltasHandler):
|
|||
elif membership == Membership.BAN:
|
||||
room_stats_delta["banned_members"] = +1
|
||||
else:
|
||||
err = "%s is not a valid membership" % (repr(membership),)
|
||||
logger.error(err)
|
||||
raise ValueError(err)
|
||||
raise ValueError("%r is not a valid membership" % (membership,))
|
||||
|
||||
user_id = state_key
|
||||
if self.is_mine_id(user_id):
|
||||
|
@ -232,7 +234,7 @@ class StatsHandler(StateDeltasHandler):
|
|||
delta = +1 if membership == Membership.JOIN else -1
|
||||
|
||||
yield self.store.update_stats_delta(
|
||||
now, "user", user_id, {field: delta}
|
||||
stream_timestamp, "user", user_id, {field: delta}
|
||||
)
|
||||
|
||||
elif typ == EventTypes.Create:
|
||||
|
@ -250,7 +252,7 @@ class StatsHandler(StateDeltasHandler):
|
|||
},
|
||||
)
|
||||
|
||||
room_stats_complete = True
|
||||
is_newly_created = True
|
||||
|
||||
elif typ == EventTypes.JoinRules:
|
||||
old_room_state = yield self.store.get_room_state(room_id)
|
||||
|
@ -269,7 +271,9 @@ class StatsHandler(StateDeltasHandler):
|
|||
prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
|
||||
)
|
||||
if is_public is not None:
|
||||
yield self.update_public_room_stats(now, room_id, is_public)
|
||||
yield self.update_public_room_stats(
|
||||
stream_timestamp, room_id, is_public
|
||||
)
|
||||
|
||||
elif typ == EventTypes.RoomHistoryVisibility:
|
||||
old_room_state = yield self.store.get_room_state(room_id)
|
||||
|
@ -289,7 +293,9 @@ class StatsHandler(StateDeltasHandler):
|
|||
prev_event_id, event_id, "history_visibility", "world_readable"
|
||||
)
|
||||
if is_public is not None:
|
||||
yield self.update_public_room_stats(now, room_id, is_public)
|
||||
yield self.update_public_room_stats(
|
||||
stream_timestamp, room_id, is_public
|
||||
)
|
||||
|
||||
elif typ == EventTypes.Encryption:
|
||||
yield self.store.update_room_state(
|
||||
|
@ -312,9 +318,9 @@ class StatsHandler(StateDeltasHandler):
|
|||
room_id, {"canonical_alias": event_content.get("alias")}
|
||||
)
|
||||
|
||||
if room_stats_complete:
|
||||
if is_newly_created:
|
||||
yield self.store.update_stats_delta(
|
||||
now,
|
||||
stream_timestamp,
|
||||
"room",
|
||||
room_id,
|
||||
room_stats_delta,
|
||||
|
@ -323,7 +329,7 @@ class StatsHandler(StateDeltasHandler):
|
|||
|
||||
elif len(room_stats_delta) > 0:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, room_stats_delta
|
||||
stream_timestamp, "room", room_id, room_stats_delta
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
|
Loading…
Reference in a new issue