Allow external processes to mark a user as syncing. (#812)

* Add infrastructure to the presence handler to track sync requests in external processes

* Expire stale entries for dead external processes

* Add an http endpoint for making users as syncing

Add some docstrings and comments.

* Fixes
This commit is contained in:
Mark Haines 2016-06-02 15:20:15 +01:00
parent fb2193cc63
commit 70599ce925
4 changed files with 174 additions and 22 deletions

View file

@ -68,6 +68,10 @@ FEDERATION_TIMEOUT = 30 * 60 * 1000
# How often to resend presence to remote servers # How often to resend presence to remote servers
FEDERATION_PING_INTERVAL = 25 * 60 * 1000 FEDERATION_PING_INTERVAL = 25 * 60 * 1000
# How long we will wait before assuming that the syncs from an external process
# are dead.
EXTERNAL_PROCESS_EXPIRY = 5 * 60 * 1000
assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
@ -158,10 +162,21 @@ class PresenceHandler(object):
self.serial_to_user = {} self.serial_to_user = {}
self._next_serial = 1 self._next_serial = 1
# Keeps track of the number of *ongoing* syncs. While this is non zero # Keeps track of the number of *ongoing* syncs on this process. While
# a user will never go offline. # this is non zero a user will never go offline.
self.user_to_num_current_syncs = {} self.user_to_num_current_syncs = {}
# Keeps track of the number of *ongoing* syncs on other processes.
# While any sync is ongoing on another process the user will never
# go offline.
# Each process has a unique identifier and an update frequency. If
# no update is received from that process within the update period then
# we assume that all the sync requests on that process have stopped.
# Stored as a dict from process_id to set of user_id, and a dict of
# process_id to millisecond timestamp last updated.
self.external_process_to_current_syncs = {}
self.external_process_last_updated_ms = {}
# Start a LoopingCall in 30s that fires every 5s. # Start a LoopingCall in 30s that fires every 5s.
# The initial delay is to allow disconnected clients a chance to # The initial delay is to allow disconnected clients a chance to
# reconnect before we treat them as offline. # reconnect before we treat them as offline.
@ -272,13 +287,26 @@ class PresenceHandler(object):
# Fetch the list of users that *may* have timed out. Things may have # Fetch the list of users that *may* have timed out. Things may have
# changed since the timeout was set, so we won't necessarily have to # changed since the timeout was set, so we won't necessarily have to
# take any action. # take any action.
users_to_check = self.wheel_timer.fetch(now) users_to_check = set(self.wheel_timer.fetch(now))
# Check whether the lists of syncing processes from an external
# process have expired.
expired_process_ids = [
process_id for process_id, last_update
in self.external_process_last_update.items()
if now - last_update > EXTERNAL_PROCESS_EXPIRY
]
for process_id in expired_process_ids:
users_to_check.update(
self.external_process_to_current_syncs.pop(process_id, ())
)
self.external_process_last_update.pop(process_id)
states = [ states = [
self.user_to_current_state.get( self.user_to_current_state.get(
user_id, UserPresenceState.default(user_id) user_id, UserPresenceState.default(user_id)
) )
for user_id in set(users_to_check) for user_id in users_to_check
] ]
timers_fired_counter.inc_by(len(states)) timers_fired_counter.inc_by(len(states))
@ -286,7 +314,7 @@ class PresenceHandler(object):
changes = handle_timeouts( changes = handle_timeouts(
states, states,
is_mine_fn=self.is_mine_id, is_mine_fn=self.is_mine_id,
user_to_num_current_syncs=self.user_to_num_current_syncs, syncing_users=self.get_syncing_users(),
now=now, now=now,
) )
@ -363,6 +391,73 @@ class PresenceHandler(object):
defer.returnValue(_user_syncing()) defer.returnValue(_user_syncing())
def get_currently_syncing_users(self):
"""Get the set of user ids that are currently syncing on this HS.
Returns:
set(str): A set of user_id strings.
"""
syncing_user_ids = {
user_id for user_id, count in self.user_to_num_current_syncs.items()
if count
}
syncing_user_ids.update(self.external_process_to_current_syncs.values())
return syncing_user_ids
@defer.inlineCallbacks
def update_external_syncs(self, process_id, syncing_user_ids):
"""Update the syncing users for an external process
Args:
process_id(str): An identifier for the process the users are
syncing against. This allows synapse to process updates
as user start and stop syncing against a given process.
syncing_user_ids(set(str)): The set of user_ids that are
currently syncing on that server.
"""
# Grab the previous list of user_ids that were syncing on that process
prev_syncing_user_ids = (
self.external_process_to_current_syncs.get(process_id, set())
)
# Grab the current presence state for both the users that are syncing
# now and the users that were syncing before this update.
prev_states = yield self.current_state_for_users(
syncing_user_ids | prev_syncing_user_ids
)
updates = []
time_now_ms = self.clock.time_msec()
# For each new user that is syncing check if we need to mark them as
# being online.
for new_user_id in syncing_user_ids - prev_syncing_user_ids:
prev_state = prev_states[new_user_id]
if prev_state.state == PresenceState.OFFLINE:
updates.append(prev_state.copy_and_replace(
state=PresenceState.ONLINE,
last_active_ts=time_now_ms,
last_user_sync_ts=time_now_ms,
))
else:
updates.append(prev_state.copy_and_replace(
last_user_sync_ts=time_now_ms,
))
# For each user that is still syncing or stopped syncing update the
# last sync time so that we will correctly apply the grace period when
# they stop syncing.
for old_user_id in prev_syncing_user_ids:
prev_state = prev_states[old_user_id]
updates.append(prev_state.copy_and_replace(
last_user_sync_ts=time_now_ms,
))
yield self._update_states(updates)
# Update the last updated time for the process. We expire the entries
# if we don't receive an update in the given timeframe.
self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
self.external_process_to_current_syncs[process_id] = syncing_user_ids
@defer.inlineCallbacks @defer.inlineCallbacks
def current_state_for_user(self, user_id): def current_state_for_user(self, user_id):
"""Get the current presence state for a user. """Get the current presence state for a user.
@ -935,15 +1030,14 @@ class PresenceEventSource(object):
return self.get_new_events(user, from_key=None, include_offline=False) return self.get_new_events(user, from_key=None, include_offline=False)
def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now): def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
"""Checks the presence of users that have timed out and updates as """Checks the presence of users that have timed out and updates as
appropriate. appropriate.
Args: Args:
user_states(list): List of UserPresenceState's to check. user_states(list): List of UserPresenceState's to check.
is_mine_fn (fn): Function that returns if a user_id is ours is_mine_fn (fn): Function that returns if a user_id is ours
user_to_num_current_syncs (dict): Mapping of user_id to number of currently syncing_user_ids (set): Set of user_ids with active syncs.
active syncs.
now (int): Current time in ms. now (int): Current time in ms.
Returns: Returns:
@ -954,21 +1048,20 @@ def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now):
for state in user_states: for state in user_states:
is_mine = is_mine_fn(state.user_id) is_mine = is_mine_fn(state.user_id)
new_state = handle_timeout(state, is_mine, user_to_num_current_syncs, now) new_state = handle_timeout(state, is_mine, syncing_user_ids, now)
if new_state: if new_state:
changes[state.user_id] = new_state changes[state.user_id] = new_state
return changes.values() return changes.values()
def handle_timeout(state, is_mine, user_to_num_current_syncs, now): def handle_timeout(state, is_mine, syncing_user_ids, now):
"""Checks the presence of the user to see if any of the timers have elapsed """Checks the presence of the user to see if any of the timers have elapsed
Args: Args:
state (UserPresenceState) state (UserPresenceState)
is_mine (bool): Whether the user is ours is_mine (bool): Whether the user is ours
user_to_num_current_syncs (dict): Mapping of user_id to number of currently syncing_user_ids (set): Set of user_ids with active syncs.
active syncs.
now (int): Current time in ms. now (int): Current time in ms.
Returns: Returns:
@ -1002,7 +1095,7 @@ def handle_timeout(state, is_mine, user_to_num_current_syncs, now):
# If there are have been no sync for a while (and none ongoing), # If there are have been no sync for a while (and none ongoing),
# set presence to offline # set presence to offline
if not user_to_num_current_syncs.get(user_id, 0): if user_id not in syncing_user_ids:
if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT: if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT:
state = state.copy_and_replace( state = state.copy_and_replace(
state=PresenceState.OFFLINE, state=PresenceState.OFFLINE,

View file

@ -0,0 +1,59 @@
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.http.server import respond_with_json_bytes, request_handler
from synapse.http.servlet import parse_json_object_from_request
from twisted.web.resource import Resource
from twisted.web.server import NOT_DONE_YET
from twisted.internet import defer
class PresenceResource(Resource):
"""
HTTP endpoint for marking users as syncing.
POST /_synapse/replication/presence HTTP/1.1
Content-Type: application/json
{
"process_id": "<process_id>",
"syncing_users": ["<user_id>"]
}
"""
def __init__(self, hs):
Resource.__init__(self) # Resource is old-style, so no super()
self.version_string = hs.version_string
self.clock = hs.get_clock()
self.presence_handler = hs.get_presence_handler()
def render_POST(self, request):
self._async_render_POST(request)
return NOT_DONE_YET
@request_handler()
@defer.inlineCallbacks
def _async_render_POST(self, request):
content = parse_json_object_from_request(request)
process_id = content["process_id"]
syncing_user_ids = content["syncing_users"]
yield self.presence_handler.update_external_syncs(
process_id, set(syncing_user_ids)
)
respond_with_json_bytes(request, 200, "{}")

View file

@ -16,6 +16,7 @@
from synapse.http.servlet import parse_integer, parse_string from synapse.http.servlet import parse_integer, parse_string
from synapse.http.server import request_handler, finish_request from synapse.http.server import request_handler, finish_request
from synapse.replication.pusher_resource import PusherResource from synapse.replication.pusher_resource import PusherResource
from synapse.replication.presence_resource import PresenceResource
from twisted.web.resource import Resource from twisted.web.resource import Resource
from twisted.web.server import NOT_DONE_YET from twisted.web.server import NOT_DONE_YET
@ -115,6 +116,7 @@ class ReplicationResource(Resource):
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.putChild("remove_pushers", PusherResource(hs)) self.putChild("remove_pushers", PusherResource(hs))
self.putChild("syncing_users", PresenceResource(hs))
def render_GET(self, request): def render_GET(self, request):
self._async_render_GET(request) self._async_render_GET(request)

View file

@ -264,7 +264,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
) )
new_state = handle_timeout( new_state = handle_timeout(
state, is_mine=True, user_to_num_current_syncs={}, now=now state, is_mine=True, syncing_user_ids=set(), now=now
) )
self.assertIsNotNone(new_state) self.assertIsNotNone(new_state)
@ -282,7 +282,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
) )
new_state = handle_timeout( new_state = handle_timeout(
state, is_mine=True, user_to_num_current_syncs={}, now=now state, is_mine=True, syncing_user_ids=set(), now=now
) )
self.assertIsNotNone(new_state) self.assertIsNotNone(new_state)
@ -300,9 +300,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
) )
new_state = handle_timeout( new_state = handle_timeout(
state, is_mine=True, user_to_num_current_syncs={ state, is_mine=True, syncing_user_ids=set([user_id]), now=now
user_id: 1,
}, now=now
) )
self.assertIsNotNone(new_state) self.assertIsNotNone(new_state)
@ -321,7 +319,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
) )
new_state = handle_timeout( new_state = handle_timeout(
state, is_mine=True, user_to_num_current_syncs={}, now=now state, is_mine=True, syncing_user_ids=set(), now=now
) )
self.assertIsNotNone(new_state) self.assertIsNotNone(new_state)
@ -340,7 +338,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
) )
new_state = handle_timeout( new_state = handle_timeout(
state, is_mine=True, user_to_num_current_syncs={}, now=now state, is_mine=True, syncing_user_ids=set(), now=now
) )
self.assertIsNone(new_state) self.assertIsNone(new_state)
@ -358,7 +356,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
) )
new_state = handle_timeout( new_state = handle_timeout(
state, is_mine=False, user_to_num_current_syncs={}, now=now state, is_mine=False, syncing_user_ids=set(), now=now
) )
self.assertIsNotNone(new_state) self.assertIsNotNone(new_state)
@ -377,7 +375,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
) )
new_state = handle_timeout( new_state = handle_timeout(
state, is_mine=True, user_to_num_current_syncs={}, now=now state, is_mine=True, syncing_user_ids=set(), now=now
) )
self.assertIsNotNone(new_state) self.assertIsNotNone(new_state)