mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-23 01:55:53 +03:00
Add Measures to presence
This commit is contained in:
parent
4a95eb0a12
commit
24d9f2c140
1 changed files with 63 additions and 58 deletions
|
@ -31,6 +31,7 @@ from synapse.storage.presence import UserPresenceState
|
|||
|
||||
from synapse.util.logcontext import preserve_fn
|
||||
from synapse.util.logutils import log_function
|
||||
from synapse.util.metrics import Measure
|
||||
from synapse.util.wheel_timer import WheelTimer
|
||||
from synapse.types import UserID
|
||||
import synapse.metrics
|
||||
|
@ -209,57 +210,60 @@ class PresenceHandler(BaseHandler):
|
|||
"""
|
||||
now = self.clock.time_msec()
|
||||
|
||||
# NOTE: We purposefully don't yield between now and when we've
|
||||
# calculated what we want to do with the new states, to avoid races.
|
||||
with Measure(self.clock, "presence_update_states"):
|
||||
|
||||
to_notify = {} # Changes we want to notify everyone about
|
||||
to_federation_ping = {} # These need sending keep-alives
|
||||
for new_state in new_states:
|
||||
user_id = new_state.user_id
|
||||
# NOTE: We purposefully don't yield between now and when we've
|
||||
# calculated what we want to do with the new states, to avoid races.
|
||||
|
||||
# Its fine to not hit the database here, as the only thing not in
|
||||
# the current state cache are OFFLINE states, where the only field
|
||||
# of interest is last_active which is safe enough to assume is 0
|
||||
# here.
|
||||
prev_state = self.user_to_current_state.get(
|
||||
user_id, UserPresenceState.default(user_id)
|
||||
)
|
||||
to_notify = {} # Changes we want to notify everyone about
|
||||
to_federation_ping = {} # These need sending keep-alives
|
||||
|
||||
new_state, should_notify, should_ping = handle_update(
|
||||
prev_state, new_state,
|
||||
is_mine=self.hs.is_mine_id(user_id),
|
||||
wheel_timer=self.wheel_timer,
|
||||
now=now
|
||||
)
|
||||
for new_state in new_states:
|
||||
user_id = new_state.user_id
|
||||
|
||||
self.user_to_current_state[user_id] = new_state
|
||||
# Its fine to not hit the database here, as the only thing not in
|
||||
# the current state cache are OFFLINE states, where the only field
|
||||
# of interest is last_active which is safe enough to assume is 0
|
||||
# here.
|
||||
prev_state = self.user_to_current_state.get(
|
||||
user_id, UserPresenceState.default(user_id)
|
||||
)
|
||||
|
||||
if should_notify:
|
||||
to_notify[user_id] = new_state
|
||||
elif should_ping:
|
||||
to_federation_ping[user_id] = new_state
|
||||
new_state, should_notify, should_ping = handle_update(
|
||||
prev_state, new_state,
|
||||
is_mine=self.hs.is_mine_id(user_id),
|
||||
wheel_timer=self.wheel_timer,
|
||||
now=now
|
||||
)
|
||||
|
||||
# TODO: We should probably ensure there are no races hereafter
|
||||
self.user_to_current_state[user_id] = new_state
|
||||
|
||||
presence_updates_counter.inc_by(len(new_states))
|
||||
if should_notify:
|
||||
to_notify[user_id] = new_state
|
||||
elif should_ping:
|
||||
to_federation_ping[user_id] = new_state
|
||||
|
||||
if to_notify:
|
||||
notified_presence_counter.inc_by(len(to_notify))
|
||||
yield self._persist_and_notify(to_notify.values())
|
||||
# TODO: We should probably ensure there are no races hereafter
|
||||
|
||||
self.unpersisted_users_changes |= set(s.user_id for s in new_states)
|
||||
self.unpersisted_users_changes -= set(to_notify.keys())
|
||||
presence_updates_counter.inc_by(len(new_states))
|
||||
|
||||
to_federation_ping = {
|
||||
user_id: state for user_id, state in to_federation_ping.items()
|
||||
if user_id not in to_notify
|
||||
}
|
||||
if to_federation_ping:
|
||||
_, _, hosts_to_states = yield self._get_interested_parties(
|
||||
to_federation_ping.values()
|
||||
)
|
||||
if to_notify:
|
||||
notified_presence_counter.inc_by(len(to_notify))
|
||||
yield self._persist_and_notify(to_notify.values())
|
||||
|
||||
self._push_to_remotes(hosts_to_states)
|
||||
self.unpersisted_users_changes |= set(s.user_id for s in new_states)
|
||||
self.unpersisted_users_changes -= set(to_notify.keys())
|
||||
|
||||
to_federation_ping = {
|
||||
user_id: state for user_id, state in to_federation_ping.items()
|
||||
if user_id not in to_notify
|
||||
}
|
||||
if to_federation_ping:
|
||||
_, _, hosts_to_states = yield self._get_interested_parties(
|
||||
to_federation_ping.values()
|
||||
)
|
||||
|
||||
self._push_to_remotes(hosts_to_states)
|
||||
|
||||
def _handle_timeouts(self):
|
||||
"""Checks the presence of users that have timed out and updates as
|
||||
|
@ -267,26 +271,27 @@ class PresenceHandler(BaseHandler):
|
|||
"""
|
||||
now = self.clock.time_msec()
|
||||
|
||||
# Fetch the list of users that *may* have timed out. Things may have
|
||||
# changed since the timeout was set, so we won't necessarily have to
|
||||
# take any action.
|
||||
users_to_check = self.wheel_timer.fetch(now)
|
||||
with Measure(self.clock, "presence_handle_timeouts"):
|
||||
# Fetch the list of users that *may* have timed out. Things may have
|
||||
# changed since the timeout was set, so we won't necessarily have to
|
||||
# take any action.
|
||||
users_to_check = self.wheel_timer.fetch(now)
|
||||
|
||||
states = [
|
||||
self.user_to_current_state.get(
|
||||
user_id, UserPresenceState.default(user_id)
|
||||
states = [
|
||||
self.user_to_current_state.get(
|
||||
user_id, UserPresenceState.default(user_id)
|
||||
)
|
||||
for user_id in set(users_to_check)
|
||||
]
|
||||
|
||||
timers_fired_counter.inc_by(len(states))
|
||||
|
||||
changes = handle_timeouts(
|
||||
states,
|
||||
is_mine_fn=self.hs.is_mine_id,
|
||||
user_to_num_current_syncs=self.user_to_num_current_syncs,
|
||||
now=now,
|
||||
)
|
||||
for user_id in set(users_to_check)
|
||||
]
|
||||
|
||||
timers_fired_counter.inc_by(len(states))
|
||||
|
||||
changes = handle_timeouts(
|
||||
states,
|
||||
is_mine_fn=self.hs.is_mine_id,
|
||||
user_to_num_current_syncs=self.user_to_num_current_syncs,
|
||||
now=now,
|
||||
)
|
||||
|
||||
preserve_fn(self._update_states)(changes)
|
||||
|
||||
|
|
Loading…
Reference in a new issue