From 5c75adff951b27744528d9c095f4ff1f8df1f77a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 12 May 2015 11:00:37 +0100 Subject: [PATCH 01/11] Add a NotifierUserStream to hold all the notification listeners for a user --- synapse/notifier.py | 228 ++++++++++++++++++++++---------------------- 1 file changed, 115 insertions(+), 113 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index abe12b1434..0b50898f3f 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -43,28 +43,18 @@ def count(func, l): class _NotificationListener(object): """ This represents a single client connection to the events stream. - The events stream handler will have yielded to the deferred, so to notify the handler it is sufficient to resolve the deferred. - - This listener will also keep track of which rooms it is listening in - so that it can remove itself from the indexes in the Notifier class. """ - def __init__(self, user, rooms, deferred, appservice=None): - self.user = user - self.appservice = appservice + def __init__(self, deferred): self.deferred = deferred - self.rooms = rooms - self.timer = None def notified(self): return self.deferred.called - def notify(self, notifier): - """ Inform whoever is listening about the new events. This will - also remove this listener from all the indexes in the Notifier - it knows about. + def notify(self): + """ Inform whoever is listening about the new events. """ try: @@ -72,27 +62,45 @@ class _NotificationListener(object): except defer.AlreadyCalledError: pass - # Should the following be done be using intrusively linked lists? - # -- erikj + +class _NotifierUserStream(object): + """This represents a user connected to the event stream. + It tracks the most recent stream token for that user. + At a given point a user may have a number of streams listening for + events. + + This listener will also keep track of which rooms it is listening in + so that it can remove itself from the indexes in the Notifier class. + """ + + def __init__(self, user, rooms, current_token, appservice=None): + self.user = user + self.appservice = appservice + self.listeners = set() + self.rooms = rooms + self.current_token = current_token + + def notify(self, new_token): + for listener in self.listeners: + listener.notify(new_token) + self.listeners.clear() + + def remove(self, notifier): + """ Remove this listener from all the indexes in the Notifier + it knows about. + """ for room in self.rooms: - lst = notifier.room_to_listeners.get(room, set()) + lst = notifier.room_to_user_streams.get(room, set()) lst.discard(self) - notifier.user_to_listeners.get(self.user, set()).discard(self) + notifier.user_to_user_streams.get(self.user, set()).discard(self) if self.appservice: - notifier.appservice_to_listeners.get( + notifier.appservice_to_user_streams.get( self.appservice, set() ).discard(self) - # Cancel the timeout for this notifer if one exists. - if self.timer is not None: - try: - notifier.clock.cancel_call_later(self.timer) - except: - logger.warn("Failed to cancel notifier timer") - class Notifier(object): """ This class is responsible for notifying any listeners when there are @@ -104,11 +112,12 @@ class Notifier(object): def __init__(self, hs): self.hs = hs - self.room_to_listeners = {} - self.user_to_listeners = {} - self.appservice_to_listeners = {} + self.user_to_user_stream = {} + self.room_to_user_streams = {} + self.appservice_to_user_streams = {} self.event_sources = hs.get_event_sources() + self.store = hs.get_datastore() self.clock = hs.get_clock() @@ -120,34 +129,34 @@ class Notifier(object): # when rendering the metrics page, which is likely once per minute at # most when scraping it. def count_listeners(): - all_listeners = set() + all_user_streams = set() - for x in self.room_to_listeners.values(): - all_listeners |= x - for x in self.user_to_listeners.values(): - all_listeners |= x - for x in self.appservice_to_listeners.values(): - all_listeners |= x + for x in self.room_to_user_streams.values(): + all_user_streams |= x + for x in self.user_to_user_streams.values(): + all_user_streams |= x + for x in self.appservice_to_user_streams.values(): + all_user_streams |= x - return len(all_listeners) + return sum(len(stream.listeners) for stream in all_user_streams) metrics.register_callback("listeners", count_listeners) metrics.register_callback( "rooms", - lambda: count(bool, self.room_to_listeners.values()), + lambda: count(bool, self.room_to_user_streams.values()), ) metrics.register_callback( "users", - lambda: count(bool, self.user_to_listeners.values()), + lambda: len(self.user_to_user_stream), ) metrics.register_callback( "appservices", - lambda: count(bool, self.appservice_to_listeners.values()), + lambda: count(bool, self.appservice_to_user_streams.values()), ) @log_function @defer.inlineCallbacks - def on_new_room_event(self, event, extra_users=[]): + def on_new_room_event(self, event, new_token, extra_users=[]): """ Used by handlers to inform the notifier something has happened in the room, room event wise. @@ -155,6 +164,7 @@ class Notifier(object): listening to the room, and any listeners for the users in the `extra_users` param. """ + assert isinstance(new_token, StreamToken) yield run_on_reactor() # poke any interested application service. self.hs.get_handlers().appservice_handler.notify_interested_services( @@ -163,72 +173,60 @@ class Notifier(object): room_id = event.room_id - room_listeners = self.room_to_listeners.get(room_id, set()) + room_user_streams = self.room_to_user_streams.get(room_id, set()) - _discard_if_notified(room_listeners) - - listeners = room_listeners.copy() + user_streams = room_user_streams.copy() for user in extra_users: - user_listeners = self.user_to_listeners.get(user, set()) + user_stream = self.user_to_user_stream.get(user) + if user_stream is not None: + user_streams.add(user_stream) - _discard_if_notified(user_listeners) - - listeners |= user_listeners - - for appservice in self.appservice_to_listeners: + for appservice in self.appservice_to_user_streams: # TODO (kegan): Redundant appservice listener checks? - # App services will already be in the room_to_listeners set, but + # App services will already be in the room_to_user_streams set, but # that isn't enough. They need to be checked here in order to # receive *invites* for users they are interested in. Does this - # make the room_to_listeners check somewhat obselete? + # make the room_to_user_streams check somewhat obselete? if appservice.is_interested(event): - app_listeners = self.appservice_to_listeners.get( + app_user_streams = self.appservice_to_user_streams.get( appservice, set() ) + user_streams |= app_user_streams - _discard_if_notified(app_listeners) - - listeners |= app_listeners - - logger.debug("on_new_room_event listeners %s", listeners) + logger.debug("on_new_room_event listeners %s", user_streams) with PreserveLoggingContext(): - for listener in listeners: + for user_stream in user_streams: try: - listener.notify(self) + user_stream.notify(new_token) except: logger.exception("Failed to notify listener") @defer.inlineCallbacks @log_function - def on_new_user_event(self, users=[], rooms=[]): + def on_new_user_event(self, new_token, users=[], rooms=[]): """ Used to inform listeners that something has happend presence/user event wise. Will wake up all listeners for the given users and rooms. """ + assert isinstance(new_token, StreamToken) yield run_on_reactor() - listeners = set() + user_streams = set() for user in users: - user_listeners = self.user_to_listeners.get(user, set()) - - _discard_if_notified(user_listeners) - - listeners |= user_listeners + user_stream = self.user_to_user_stream.get(user) + if user_stream: + user_stream.add(user_stream) for room in rooms: - room_listeners = self.room_to_listeners.get(room, set()) - - _discard_if_notified(room_listeners) - - listeners |= room_listeners + user_streams |= self.room_to_user_streams.get(room, set()) with PreserveLoggingContext(): - for listener in listeners: + for user_stream in user_streams: try: - listener.notify(self) + user_streams.notify(new_token) except: logger.exception("Failed to notify listener") @@ -240,21 +238,32 @@ class Notifier(object): """ deferred = defer.Deferred() - appservice = yield self.hs.get_datastore().get_app_service_by_user_id( - user.to_string() - ) - listener = [_NotificationListener( - user=user, - rooms=rooms, - deferred=deferred, - appservice=appservice, - )] + user_stream = self.user_to_user_streams.get(user) + if user_stream is None: + appservice = yield self.store.get_app_service_by_user_id( + user.to_string() + ) + current_token = yield self.event_sources.get_current_token() + user_stream = _NotifierUserStream( + user=user, + rooms=rooms, + appservice=appservice, + current_token=current_token, + ) + self._register_with_keys(user_stream) + else: + current_token = user_stream.current_token - if timeout: - self._register_with_keys(listener[0]) + if timeout and not current_token.is_after(from_token): + listener = [_NotificationListener(deferred)] + user_stream.listeners.add(listener[0]) + + if current_token.is_after(from_token): + result = yield callback(from_token, current_token) + else: + result = None - result = yield callback() timer = [None] if timeout: @@ -263,23 +272,19 @@ class Notifier(object): def _timeout_listener(): timed_out[0] = True timer[0] = None - listener[0].notify(self) + listener[0].notify(user_stream) # We create multiple notification listeners so we have to manage # canceling the timeout ourselves. timer[0] = self.clock.call_later(timeout/1000., _timeout_listener) while not result and not timed_out[0]: - yield deferred + new_token = yield deferred deferred = defer.Deferred() - listener[0] = _NotificationListener( - user=user, - rooms=rooms, - deferred=deferred, - appservice=appservice, - ) - self._register_with_keys(listener[0]) - result = yield callback() + listener[0] = _NotificationListener(deferred) + user_stream.listeners.add(listener[0]) + result = yield callback(current_token, new_token) + current_token = new_token if timer[0] is not None: try: @@ -302,7 +307,7 @@ class Notifier(object): limit = pagination_config.limit @defer.inlineCallbacks - def check_for_updates(): + def check_for_updates(start_token, end_token): events = [] end_token = from_token for name, source in self.event_sources.sources.items(): @@ -328,26 +333,23 @@ class Notifier(object): defer.returnValue(result) @log_function - def _register_with_keys(self, listener): - for room in listener.rooms: - s = self.room_to_listeners.setdefault(room, set()) - s.add(listener) + def _register_with_keys(self, user_stream): + self.user_to_user_stream[user_stream.user] = user_stream - self.user_to_listeners.setdefault(listener.user, set()).add(listener) + for room in user_stream.rooms: + s = self.room_to_user_stream.setdefault(room, set()) + s.add(user_stream) - if listener.appservice: - self.appservice_to_listeners.setdefault( - listener.appservice, set() - ).add(listener) + if user_stream.appservice: + self.appservice_to_user_stream.setdefault( + user_stream.appservice, set() + ).add(user_stream) def _user_joined_room(self, user, room_id): - new_listeners = self.user_to_listeners.get(user, set()) - - listeners = self.room_to_listeners.setdefault(room_id, set()) - listeners |= new_listeners - - for l in new_listeners: - l.rooms.add(room_id) + new_user_stream = self.user_to_user_stream.get(user) + room_streams = self.room_to_user_streams.setdefault(room_id, set()) + room_streams.add(new_user_stream) + new_user_stream.rooms.add(room_id) def _discard_if_notified(listener_set): From 63878c03794d33a8767425e114845159e5c1cb9a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 13 May 2015 13:42:21 +0100 Subject: [PATCH 02/11] Don't bother checking for updates if the stream token hasn't advanced for a user --- synapse/handlers/_base.py | 7 ++- synapse/handlers/federation.py | 25 +++++---- synapse/handlers/presence.py | 4 ++ synapse/handlers/typing.py | 4 +- synapse/notifier.py | 75 ++++++++++++++++++--------- synapse/storage/events.py | 3 ++ synapse/types.py | 19 ++++++- tests/handlers/test_federation.py | 4 +- tests/handlers/test_room.py | 8 +-- tests/handlers/test_typing.py | 12 ++--- tests/rest/client/v1/test_presence.py | 15 ++++-- tests/utils.py | 2 +- 12 files changed, 123 insertions(+), 55 deletions(-) diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index ddc5c21e7d..833ff41377 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -105,7 +105,9 @@ class BaseHandler(object): if not suppress_auth: self.auth.check(event, auth_events=context.current_state) - yield self.store.persist_event(event, context=context) + (event_stream_id, max_stream_id) = yield self.store.persist_event( + event, context=context + ) federation_handler = self.hs.get_handlers().federation_handler @@ -142,7 +144,8 @@ class BaseHandler(object): with PreserveLoggingContext(): # Don't block waiting on waking up all the listeners. notify_d = self.notifier.on_new_room_event( - event, extra_users=extra_users + event, event_stream_id, max_stream_id, + extra_users=extra_users ) def log_failure(f): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 7d9906039e..bc0f7b0ee7 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -160,7 +160,7 @@ class FederationHandler(BaseHandler): ) try: - yield self._handle_new_event( + _, event_stream_id, max_stream_id = yield self._handle_new_event( origin, event, state=state, @@ -203,7 +203,8 @@ class FederationHandler(BaseHandler): with PreserveLoggingContext(): d = self.notifier.on_new_room_event( - event, extra_users=extra_users + event, event_stream_id, max_stream_id, + extra_users=extra_users ) def log_failure(f): @@ -561,7 +562,7 @@ class FederationHandler(BaseHandler): if e.event_id in auth_ids } - yield self._handle_new_event( + _, event_stream_id, max_stream_id = yield self._handle_new_event( origin, new_event, state=state, @@ -571,7 +572,8 @@ class FederationHandler(BaseHandler): with PreserveLoggingContext(): d = self.notifier.on_new_room_event( - new_event, extra_users=[joinee] + new_event, event_stream_id, max_stream_id, + extra_users=[joinee] ) def log_failure(f): @@ -637,7 +639,9 @@ class FederationHandler(BaseHandler): event.internal_metadata.outlier = False - context = yield self._handle_new_event(origin, event) + context, event_stream_id, max_stream_id = yield self._handle_new_event( + origin, event + ) logger.debug( "on_send_join_request: After _handle_new_event: %s, sigs: %s", @@ -653,7 +657,7 @@ class FederationHandler(BaseHandler): with PreserveLoggingContext(): d = self.notifier.on_new_room_event( - event, extra_users=extra_users + event, event_stream_id, max_stream_id, extra_users=extra_users ) def log_failure(f): @@ -727,7 +731,7 @@ class FederationHandler(BaseHandler): context = yield self.state_handler.compute_event_context(event) - yield self.store.persist_event( + event_stream_id, max_stream_id = yield self.store.persist_event( event, context=context, backfilled=False, @@ -736,7 +740,8 @@ class FederationHandler(BaseHandler): target_user = UserID.from_string(event.state_key) with PreserveLoggingContext(): d = self.notifier.on_new_room_event( - event, extra_users=[target_user], + event, event_stream_id, max_stream_id, + extra_users=[target_user], ) def log_failure(f): @@ -914,7 +919,7 @@ class FederationHandler(BaseHandler): ) raise - yield self.store.persist_event( + event_stream_id, max_stream_id = yield self.store.persist_event( event, context=context, backfilled=backfilled, @@ -922,7 +927,7 @@ class FederationHandler(BaseHandler): current_state=current_state, ) - defer.returnValue(context) + defer.returnValue((context, event_stream_id, max_stream_id)) @defer.inlineCallbacks def on_query_auth(self, origin, event_id, remote_auth_chain, rejects, diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 28688d532d..7db4b062d2 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -345,6 +345,8 @@ class PresenceHandler(BaseHandler): curr_users = yield rm_handler.get_room_members(room_id) for local_user in [c for c in curr_users if self.hs.is_mine(c)]: + statuscache = self._get_or_offline_usercache(local_user) + statuscache.update({}, serial=self._user_cachemap_latest_serial) self.push_update_to_local_and_remote( observed_user=local_user, users_to_push=[user], @@ -820,6 +822,8 @@ class PresenceHandler(BaseHandler): room_ids=[], statuscache=None): with PreserveLoggingContext(): self.notifier.on_new_user_event( + "presence_key", + self._user_cachemap_latest_serial, users_to_push, room_ids, ) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 64fe51aa3e..a9895292c2 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -218,7 +218,9 @@ class TypingNotificationHandler(BaseHandler): self._room_serials[room_id] = self._latest_room_serial with PreserveLoggingContext(): - self.notifier.on_new_user_event(rooms=[room_id]) + self.notifier.on_new_user_event( + "typing_key", self._latest_room_serial, rooms=[room_id] + ) class TypingNotificationEventSource(object): diff --git a/synapse/notifier.py b/synapse/notifier.py index 214a2b28ca..4d10c05038 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -52,12 +52,11 @@ class _NotificationListener(object): def notified(self): return self.deferred.called - def notify(self): + def notify(self, token): """ Inform whoever is listening about the new events. """ - try: - self.deferred.callback(None) + self.deferred.callback(token) except defer.AlreadyCalledError: pass @@ -73,15 +72,18 @@ class _NotifierUserStream(object): """ def __init__(self, user, rooms, current_token, appservice=None): - self.user = user + self.user = str(user) self.appservice = appservice self.listeners = set() - self.rooms = rooms + self.rooms = set(rooms) self.current_token = current_token - def notify(self, new_token): + def notify(self, stream_key, stream_id): + self.current_token = self.current_token.copy_and_replace( + stream_key, stream_id + ) for listener in self.listeners: - listener.notify(new_token) + listener.notify(self.current_token) self.listeners.clear() def remove(self, notifier): @@ -117,6 +119,7 @@ class Notifier(object): self.event_sources = hs.get_event_sources() self.store = hs.get_datastore() + self.pending_new_room_events = [] self.clock = hs.get_clock() @@ -153,9 +156,21 @@ class Notifier(object): lambda: count(bool, self.appservice_to_user_streams.values()), ) + def notify_pending_new_room_events(self, max_room_stream_id): + pending = sorted(self.pending_new_room_events) + self.pending_new_room_events = [] + for event, room_stream_id, extra_users in pending: + if room_stream_id > max_room_stream_id: + self.pending_new_room_events.append(( + event, room_stream_id, extra_users + )) + else: + self._on_new_room_event(event, room_stream_id, extra_users) + @log_function @defer.inlineCallbacks - def on_new_room_event(self, event, new_token, extra_users=[]): + def on_new_room_event(self, event, room_stream_id, max_room_stream_id, + extra_users=[]): """ Used by handlers to inform the notifier something has happened in the room, room event wise. @@ -163,8 +178,18 @@ class Notifier(object): listening to the room, and any listeners for the users in the `extra_users` param. """ - assert isinstance(new_token, StreamToken) yield run_on_reactor() + + self.notify_pending_new_room_events(max_room_stream_id) + + if room_stream_id > max_room_stream_id: + self.pending_new_room_events.append(( + event, room_stream_id, extra_users + )) + else: + self._on_new_room_event(event, room_stream_id, extra_users) + + def _on_new_room_event(self, event, room_stream_id, extra_users=[]): # poke any interested application service. self.hs.get_handlers().appservice_handler.notify_interested_services( event @@ -197,33 +222,32 @@ class Notifier(object): for user_stream in user_streams: try: - user_stream.notify(new_token) + user_stream.notify("room_key", "s%d" % (room_stream_id,)) except: logger.exception("Failed to notify listener") @defer.inlineCallbacks @log_function - def on_new_user_event(self, new_token, users=[], rooms=[]): + def on_new_user_event(self, stream_key, new_token, users=[], rooms=[]): """ Used to inform listeners that something has happend presence/user event wise. Will wake up all listeners for the given users and rooms. """ - assert isinstance(new_token, StreamToken) yield run_on_reactor() user_streams = set() for user in users: user_stream = self.user_to_user_stream.get(user) - if user_stream: - user_stream.add(user_stream) + if user_stream is not None: + user_streams.add(user_stream) for room in rooms: user_streams |= self.room_to_user_streams.get(room, set()) for user_stream in user_streams: try: - user_streams.notify(new_token) + user_stream.notify(stream_key, new_token) except: logger.exception("Failed to notify listener") @@ -236,12 +260,12 @@ class Notifier(object): deferred = defer.Deferred() - user_stream = self.user_to_user_streams.get(user) + user = str(user) + user_stream = self.user_to_user_stream.get(user) if user_stream is None: - appservice = yield self.store.get_app_service_by_user_id( - user.to_string() - ) + appservice = yield self.store.get_app_service_by_user_id(user) current_token = yield self.event_sources.get_current_token() + rooms = yield self.store.get_rooms_for_user(user) user_stream = _NotifierUserStream( user=user, rooms=rooms, @@ -252,8 +276,9 @@ class Notifier(object): else: current_token = user_stream.current_token + listener = [_NotificationListener(deferred)] + if timeout and not current_token.is_after(from_token): - listener = [_NotificationListener(deferred)] user_stream.listeners.add(listener[0]) if current_token.is_after(from_token): @@ -334,7 +359,7 @@ class Notifier(object): self.user_to_user_stream[user_stream.user] = user_stream for room in user_stream.rooms: - s = self.room_to_user_stream.setdefault(room, set()) + s = self.room_to_user_streams.setdefault(room, set()) s.add(user_stream) if user_stream.appservice: @@ -343,10 +368,12 @@ class Notifier(object): ).add(user_stream) def _user_joined_room(self, user, room_id): + user = str(user) new_user_stream = self.user_to_user_stream.get(user) - room_streams = self.room_to_user_streams.setdefault(room_id, set()) - room_streams.add(new_user_stream) - new_user_stream.rooms.add(room_id) + if new_user_stream is not None: + room_streams = self.room_to_user_streams.setdefault(room_id, set()) + room_streams.add(new_user_stream) + new_user_stream.rooms.add(room_id) def _discard_if_notified(listener_set): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index a5a6869079..7d6df5f4c6 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -64,6 +64,9 @@ class EventsStore(SQLBaseStore): except _RollbackButIsFineException: pass + max_persisted_id = yield self._stream_id_gen.get_max_token(self) + defer.returnValue((stream_ordering, max_persisted_id)) + @defer.inlineCallbacks def get_event(self, event_id, check_redacted=True, get_prev_content=False, allow_rejected=False, diff --git a/synapse/types.py b/synapse/types.py index 0f16867d75..d89a04f7c3 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -70,6 +70,8 @@ class DomainSpecificString( """Return a string encoding the fields of the structure object.""" return "%s%s:%s" % (self.SIGIL, self.localpart, self.domain) + __str__ = to_string + @classmethod def create(cls, localpart, domain,): return cls(localpart=localpart, domain=domain) @@ -107,7 +109,6 @@ class StreamToken( def from_string(cls, string): try: keys = string.split(cls._SEPARATOR) - return cls(*keys) except: raise SynapseError(400, "Invalid Token") @@ -115,6 +116,22 @@ class StreamToken( def to_string(self): return self._SEPARATOR.join([str(k) for k in self]) + @property + def room_stream_id(self): + # TODO(markjh): Awful hack to work around hacks in the presence tests + if type(self.room_key) is int: + return self.room_key + else: + return int(self.room_key[1:].split("-")[-1]) + + def is_after(self, other_token): + """Does this token contain events that the other doesn't?""" + return ( + (other_token.room_stream_id < self.room_stream_id) + or (int(other_token.presence_key) < int(self.presence_key)) + or (int(other_token.typing_key) < int(self.typing_key)) + ) + def copy_and_replace(self, key, new_value): d = self._asdict() d[key] = new_value diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index 08d2404b6c..f3821242bc 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -83,7 +83,7 @@ class FederationTestCase(unittest.TestCase): "hashes": {"sha256":"AcLrgtUIqqwaGoHhrEvYG1YLDIsVPYJdSRGhkp3jJp8"}, }) - self.datastore.persist_event.return_value = defer.succeed(None) + self.datastore.persist_event.return_value = defer.succeed((1,1)) self.datastore.get_room.return_value = defer.succeed(True) self.auth.check_host_in_room.return_value = defer.succeed(True) @@ -126,5 +126,5 @@ class FederationTestCase(unittest.TestCase): self.auth.check.assert_called_once_with(ANY, auth_events={}) self.notifier.on_new_room_event.assert_called_once_with( - ANY, extra_users=[] + ANY, 1, 1, extra_users=[] ) diff --git a/tests/handlers/test_room.py b/tests/handlers/test_room.py index 6417f73309..a2d7635995 100644 --- a/tests/handlers/test_room.py +++ b/tests/handlers/test_room.py @@ -87,6 +87,8 @@ class RoomMemberHandlerTestCase(unittest.TestCase): self.ratelimiter = hs.get_ratelimiter() self.ratelimiter.send_message.return_value = (True, 0) + self.datastore.persist_event.return_value = (1,1) + @defer.inlineCallbacks def test_invite(self): room_id = "!foo:red" @@ -160,7 +162,7 @@ class RoomMemberHandlerTestCase(unittest.TestCase): event, context=context, ) self.notifier.on_new_room_event.assert_called_once_with( - event, extra_users=[UserID.from_string(target_user_id)] + event, 1, 1, extra_users=[UserID.from_string(target_user_id)] ) self.assertFalse(self.datastore.get_room.called) self.assertFalse(self.datastore.store_room.called) @@ -226,7 +228,7 @@ class RoomMemberHandlerTestCase(unittest.TestCase): event, context=context ) self.notifier.on_new_room_event.assert_called_once_with( - event, extra_users=[user] + event, 1, 1, extra_users=[user] ) join_signal_observer.assert_called_with( @@ -304,7 +306,7 @@ class RoomMemberHandlerTestCase(unittest.TestCase): event, context=context ) self.notifier.on_new_room_event.assert_called_once_with( - event, extra_users=[user] + event, 1, 1, extra_users=[user] ) leave_signal_observer.assert_called_with( diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index b318d4944a..7ccbe2ea9c 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -183,7 +183,7 @@ class TypingNotificationsTestCase(unittest.TestCase): ) self.on_new_user_event.assert_has_calls([ - call(rooms=[self.room_id]), + call('typing_key', 1, rooms=[self.room_id]), ]) self.assertEquals(self.event_source.get_current_key(), 1) @@ -246,7 +246,7 @@ class TypingNotificationsTestCase(unittest.TestCase): ) self.on_new_user_event.assert_has_calls([ - call(rooms=[self.room_id]), + call('typing_key', 1, rooms=[self.room_id]), ]) self.assertEquals(self.event_source.get_current_key(), 1) @@ -300,7 +300,7 @@ class TypingNotificationsTestCase(unittest.TestCase): ) self.on_new_user_event.assert_has_calls([ - call(rooms=[self.room_id]), + call('typing_key', 1, rooms=[self.room_id]), ]) yield put_json.await_calls() @@ -332,7 +332,7 @@ class TypingNotificationsTestCase(unittest.TestCase): ) self.on_new_user_event.assert_has_calls([ - call(rooms=[self.room_id]), + call('typing_key', 1, rooms=[self.room_id]), ]) self.on_new_user_event.reset_mock() @@ -352,7 +352,7 @@ class TypingNotificationsTestCase(unittest.TestCase): self.clock.advance_time(11) self.on_new_user_event.assert_has_calls([ - call(rooms=[self.room_id]), + call('typing_key', 2, rooms=[self.room_id]), ]) self.assertEquals(self.event_source.get_current_key(), 2) @@ -378,7 +378,7 @@ class TypingNotificationsTestCase(unittest.TestCase): ) self.on_new_user_event.assert_has_calls([ - call(rooms=[self.room_id]), + call('typing_key', 3, rooms=[self.room_id]), ]) self.on_new_user_event.reset_mock() diff --git a/tests/rest/client/v1/test_presence.py b/tests/rest/client/v1/test_presence.py index 8e0c5fa630..c0c52796ad 100644 --- a/tests/rest/client/v1/test_presence.py +++ b/tests/rest/client/v1/test_presence.py @@ -27,6 +27,7 @@ from synapse.handlers.presence import PresenceHandler from synapse.rest.client.v1 import presence from synapse.rest.client.v1 import events from synapse.types import UserID +from synapse.util.async import run_on_reactor OFFLINE = PresenceState.OFFLINE @@ -264,6 +265,7 @@ class PresenceEventStreamTestCase(unittest.TestCase): datastore=Mock(spec=[ "set_presence_state", "get_presence_list", + "get_rooms_for_user", ]), clock=Mock(spec=[ "call_later", @@ -298,6 +300,9 @@ class PresenceEventStreamTestCase(unittest.TestCase): self.mock_datastore.get_app_service_by_user_id = Mock( return_value=defer.succeed(None) ) + self.mock_datastore.get_rooms_for_user = ( + lambda u: get_rooms_for_user(UserID.from_string(u)) + ) def get_profile_displayname(user_id): return defer.succeed("Frank") @@ -350,19 +355,19 @@ class PresenceEventStreamTestCase(unittest.TestCase): self.mock_datastore.set_presence_state.return_value = defer.succeed( {"state": ONLINE} ) - self.mock_datastore.get_presence_list.return_value = defer.succeed( - [] - ) + self.mock_datastore.get_presence_list.return_value = defer.succeed([]) yield self.presence.set_state(self.u_banana, self.u_banana, state={"presence": ONLINE} ) + yield run_on_reactor() + (code, response) = yield self.mock_resource.trigger("GET", - "/events?from=0_1_0&timeout=0", None) + "/events?from=s0_1_0&timeout=0", None) self.assertEquals(200, code) - self.assertEquals({"start": "0_1_0", "end": "0_2_0", "chunk": [ + self.assertEquals({"start": "s0_1_0", "end": "s0_2_0", "chunk": [ {"type": "m.presence", "content": { "user_id": "@banana:test", diff --git a/tests/utils.py b/tests/utils.py index cc038fecf1..a67530bd63 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -355,7 +355,7 @@ class MemoryDataStore(object): return [] def get_room_events_max_id(self): - return 0 # TODO (erikj) + return "s0" # TODO (erikj) def get_send_event_level(self, room_id): return defer.succeed(0) From df6db5c8025e67c410a60994ac87eb39d842af30 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 13 May 2015 15:08:24 +0100 Subject: [PATCH 03/11] Don't bother checking for new events from a source if the stream token hasn't advanced for that source --- synapse/notifier.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 4d10c05038..d2fefea756 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -294,7 +294,7 @@ class Notifier(object): def _timeout_listener(): timed_out[0] = True timer[0] = None - listener[0].notify(user_stream) + listener[0].notify(from_token) # We create multiple notification listeners so we have to manage # canceling the timeout ourselves. @@ -329,11 +329,15 @@ class Notifier(object): limit = pagination_config.limit @defer.inlineCallbacks - def check_for_updates(start_token, end_token): + def check_for_updates(before_token, after_token): events = [] end_token = from_token for name, source in self.event_sources.sources.items(): keyname = "%s_key" % name + before_id = getattr(before_token, keyname) + after_id = getattr(after_token, keyname) + if before_id == after_id: + continue stuff, new_key = yield source.get_new_events_for_user( user, getattr(from_token, keyname), limit, ) From 9af432257da39385ce03c846b3c76f7fc7778ff0 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 13 May 2015 15:42:13 +0100 Subject: [PATCH 04/11] Don't set a timer if there's already a result to return --- synapse/notifier.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index d2fefea756..6fcb7767a0 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -82,9 +82,10 @@ class _NotifierUserStream(object): self.current_token = self.current_token.copy_and_replace( stream_key, stream_id ) - for listener in self.listeners: + listeners = self.listeners + self.listeners = set() + for listener in listeners: listener.notify(self.current_token) - self.listeners.clear() def remove(self, notifier): """ Remove this listener from all the indexes in the Notifier @@ -202,7 +203,7 @@ class Notifier(object): user_streams = room_user_streams.copy() for user in extra_users: - user_stream = self.user_to_user_stream.get(user) + user_stream = self.user_to_user_stream.get(str(user)) if user_stream is not None: user_streams.add(user_stream) @@ -288,12 +289,18 @@ class Notifier(object): timer = [None] + if result: + user_stream.listeners.discard(listener[0]) + defer.returnValue(result) + return + if timeout: timed_out = [False] def _timeout_listener(): timed_out[0] = True timer[0] = None + user_stream.listeners.discard(listener[0]) listener[0].notify(from_token) # We create multiple notification listeners so we have to manage From f1b83d88a3d3ad596631e51852a9802d0a7270a0 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 13 May 2015 16:54:02 +0100 Subject: [PATCH 05/11] Discard unused NotifierUserStreams --- synapse/notifier.py | 50 ++++++++++++++++++--------- tests/rest/client/v1/test_presence.py | 1 + tests/utils.py | 3 ++ 3 files changed, 38 insertions(+), 16 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 6fcb7767a0..344dd03172 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -71,14 +71,17 @@ class _NotifierUserStream(object): so that it can remove itself from the indexes in the Notifier class. """ - def __init__(self, user, rooms, current_token, appservice=None): + def __init__(self, user, rooms, current_token, time_now_ms, + appservice=None): self.user = str(user) self.appservice = appservice self.listeners = set() self.rooms = set(rooms) self.current_token = current_token + self.last_notified_ms = time_now_ms - def notify(self, stream_key, stream_id): + def notify(self, stream_key, stream_id, time_now_ms): + self.last_notified_ms = time_now_ms self.current_token = self.current_token.copy_and_replace( stream_key, stream_id ) @@ -96,7 +99,7 @@ class _NotifierUserStream(object): lst = notifier.room_to_user_streams.get(room, set()) lst.discard(self) - notifier.user_to_user_streams.get(self.user, set()).discard(self) + notifier.user_to_user_stream.pop(self.user) if self.appservice: notifier.appservice_to_user_streams.get( @@ -111,6 +114,8 @@ class Notifier(object): Primarily used from the /events stream. """ + UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000 + def __init__(self, hs): self.hs = hs @@ -128,6 +133,10 @@ class Notifier(object): "user_joined_room", self._user_joined_room ) + self.clock.looping_call( + self.remove_expired_streams, self.UNUSED_STREAM_EXPIRY_MS + ) + # This is not a very cheap test to perform, but it's only executed # when rendering the metrics page, which is likely once per minute at # most when scraping it. @@ -221,9 +230,12 @@ class Notifier(object): logger.debug("on_new_room_event listeners %s", user_streams) + time_now_ms = self.clock.time_msec() for user_stream in user_streams: try: - user_stream.notify("room_key", "s%d" % (room_stream_id,)) + user_stream.notify( + "room_key", "s%d" % (room_stream_id,), time_now_ms + ) except: logger.exception("Failed to notify listener") @@ -246,9 +258,10 @@ class Notifier(object): for room in rooms: user_streams |= self.room_to_user_streams.get(room, set()) + time_now_ms = self.clock.time_msec() for user_stream in user_streams: try: - user_stream.notify(stream_key, new_token) + user_stream.notify(stream_key, new_token, time_now_ms) except: logger.exception("Failed to notify listener") @@ -260,6 +273,7 @@ class Notifier(object): """ deferred = defer.Deferred() + time_now_ms = self.clock.time_msec() user = str(user) user_stream = self.user_to_user_stream.get(user) @@ -272,6 +286,7 @@ class Notifier(object): rooms=rooms, appservice=appservice, current_token=current_token, + time_now_ms=time_now_ms, ) self._register_with_keys(user_stream) else: @@ -365,6 +380,20 @@ class Notifier(object): defer.returnValue(result) + @log_function + def remove_expired_streams(self): + time_now_ms = self.clock.time_msec() + expired_streams = [] + expire_before_ts = time_now_ms - self.UNUSED_STREAM_EXPIRY_MS + for stream in self.user_to_user_stream.values(): + if stream.listeners: + continue + if stream.last_notified_ms < expire_before_ts: + expired_streams.append(stream) + + for expired_stream in expired_streams: + expired_stream.remove(self) + @log_function def _register_with_keys(self, user_stream): self.user_to_user_stream[user_stream.user] = user_stream @@ -385,14 +414,3 @@ class Notifier(object): room_streams = self.room_to_user_streams.setdefault(room_id, set()) room_streams.add(new_user_stream) new_user_stream.rooms.add(room_id) - - -def _discard_if_notified(listener_set): - """Remove any 'stale' listeners from the given set. - """ - to_discard = set() - for l in listener_set: - if l.notified(): - to_discard.add(l) - - listener_set -= to_discard diff --git a/tests/rest/client/v1/test_presence.py b/tests/rest/client/v1/test_presence.py index c0c52796ad..29c0038f06 100644 --- a/tests/rest/client/v1/test_presence.py +++ b/tests/rest/client/v1/test_presence.py @@ -271,6 +271,7 @@ class PresenceEventStreamTestCase(unittest.TestCase): "call_later", "cancel_call_later", "time_msec", + "looping_call", ]), ) diff --git a/tests/utils.py b/tests/utils.py index a67530bd63..3b5c335911 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -197,6 +197,9 @@ class MockClock(object): return t + def looping_call(self, function, interval): + pass + def cancel_call_later(self, timer): if timer[2]: raise Exception("Cannot cancel an expired timer") From 5e0c533672f63990f7fdcf0b2c3cce15c3682d78 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 13 May 2015 17:20:28 +0100 Subject: [PATCH 06/11] Fix metric counter --- synapse/notifier.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 344dd03172..1f7f0a143f 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -145,8 +145,8 @@ class Notifier(object): for x in self.room_to_user_streams.values(): all_user_streams |= x - for x in self.user_to_user_streams.values(): - all_user_streams |= x + for x in self.user_to_user_stream: + all_user_streams.add(x) for x in self.appservice_to_user_streams.values(): all_user_streams |= x From 3edd2d5c93ccbec46f101e65c6c7874a90bf0018 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 14 May 2015 11:25:30 +0100 Subject: [PATCH 07/11] Fix v2 sync, update the last_notified_ms only if there was an active listener --- synapse/handlers/sync.py | 2 +- synapse/notifier.py | 11 ++++++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 35a62fda47..bd8c603681 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -92,7 +92,7 @@ class SyncHandler(BaseHandler): result = yield self.current_sync_for_user(sync_config, since_token) defer.returnValue(result) else: - def current_sync_callback(): + def current_sync_callback(before_token, after_token): return self.current_sync_for_user(sync_config, since_token) rm_handler = self.hs.get_handlers().room_member_handler diff --git a/synapse/notifier.py b/synapse/notifier.py index 1f7f0a143f..2de7dca8a5 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -81,14 +81,15 @@ class _NotifierUserStream(object): self.last_notified_ms = time_now_ms def notify(self, stream_key, stream_id, time_now_ms): - self.last_notified_ms = time_now_ms self.current_token = self.current_token.copy_and_replace( stream_key, stream_id ) - listeners = self.listeners - self.listeners = set() - for listener in listeners: - listener.notify(self.current_token) + if self.listeners: + self.last_notified_ms = time_now_ms + listeners = self.listeners + self.listeners = set() + for listener in listeners: + listener.notify(self.current_token) def remove(self, notifier): """ Remove this listener from all the indexes in the Notifier From 084c365c3ad7f3a30ecfccca6cc79fb2e0c652ad Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 14 May 2015 12:03:26 +0100 Subject: [PATCH 08/11] Use the current token when timing out a notifier, make sure the user_id is a string in on_new_user_event --- synapse/notifier.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 2de7dca8a5..3dbd6f984d 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -252,7 +252,7 @@ class Notifier(object): user_streams = set() for user in users: - user_stream = self.user_to_user_stream.get(user) + user_stream = self.user_to_user_stream.get(str(user)) if user_stream is not None: user_streams.add(user_stream) @@ -317,7 +317,7 @@ class Notifier(object): timed_out[0] = True timer[0] = None user_stream.listeners.discard(listener[0]) - listener[0].notify(from_token) + listener[0].notify(current_token) # We create multiple notification listeners so we have to manage # canceling the timeout ourselves. From 0ad1c672344e09a2a09690a1ab1bb61fe0397293 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 14 May 2015 14:35:07 +0100 Subject: [PATCH 09/11] Add some doc-strings to notifier --- synapse/notifier.py | 50 +++++++++++++++++++++++++++++---------------- 1 file changed, 32 insertions(+), 18 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 3dbd6f984d..862b42cfc8 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -81,6 +81,13 @@ class _NotifierUserStream(object): self.last_notified_ms = time_now_ms def notify(self, stream_key, stream_id, time_now_ms): + """Notify any listeners for this user of a new event from an + event source. + Args: + stream_key(str): The stream the event came from. + stream_id(str): The new id for the stream the event came from. + time_now_ms(int): The current time in milliseconds. + """ self.current_token = self.current_token.copy_and_replace( stream_key, stream_id ) @@ -167,17 +174,6 @@ class Notifier(object): lambda: count(bool, self.appservice_to_user_streams.values()), ) - def notify_pending_new_room_events(self, max_room_stream_id): - pending = sorted(self.pending_new_room_events) - self.pending_new_room_events = [] - for event, room_stream_id, extra_users in pending: - if room_stream_id > max_room_stream_id: - self.pending_new_room_events.append(( - event, room_stream_id, extra_users - )) - else: - self._on_new_room_event(event, room_stream_id, extra_users) - @log_function @defer.inlineCallbacks def on_new_room_event(self, event, room_stream_id, max_room_stream_id, @@ -188,19 +184,37 @@ class Notifier(object): This triggers the notifier to wake up any listeners that are listening to the room, and any listeners for the users in the `extra_users` param. + + The events can be peristed out of order. The notifier will wait + until all previous events have been persisted before notifying + the client streams. """ yield run_on_reactor() - self.notify_pending_new_room_events(max_room_stream_id) + self.pending_new_room_events.append(( + event, room_stream_id, extra_users + )) + self._notify_pending_new_room_events(max_room_stream_id) - if room_stream_id > max_room_stream_id: - self.pending_new_room_events.append(( - event, room_stream_id, extra_users - )) - else: - self._on_new_room_event(event, room_stream_id, extra_users) + def _notify_pending_new_room_events(self, max_room_stream_id): + """Notify for the room events that were queued waiting for a previous + event to be persisted. + Args: + max_room_stream_id(int): The highest stream_id below which all + events have been persisted. + """ + pending = sorted(self.pending_new_room_events) + self.pending_new_room_events = [] + for event, room_stream_id, extra_users in pending: + if room_stream_id > max_room_stream_id: + self.pending_new_room_events.append(( + event, room_stream_id, extra_users + )) + else: + self._on_new_room_event(event, room_stream_id, extra_users) def _on_new_room_event(self, event, room_stream_id, extra_users=[]): + """Notify any user streams that are interested in this room event""" # poke any interested application service. self.hs.get_handlers().appservice_handler.notify_interested_services( event From 1e90715a3d5f8a910c187dec888283e110a3c04a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 18 May 2015 13:17:36 +0100 Subject: [PATCH 10/11] Make sure the notifier stream token goes forward when it is updated. Sort the pending events by the correct room_stream_id --- synapse/notifier.py | 8 ++++---- synapse/types.py | 17 +++++++++++++++++ 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 862b42cfc8..0b5d97521e 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -88,7 +88,7 @@ class _NotifierUserStream(object): stream_id(str): The new id for the stream the event came from. time_now_ms(int): The current time in milliseconds. """ - self.current_token = self.current_token.copy_and_replace( + self.current_token = self.current_token.copy_and_advance( stream_key, stream_id ) if self.listeners: @@ -192,7 +192,7 @@ class Notifier(object): yield run_on_reactor() self.pending_new_room_events.append(( - event, room_stream_id, extra_users + room_stream_id, event, extra_users )) self._notify_pending_new_room_events(max_room_stream_id) @@ -205,10 +205,10 @@ class Notifier(object): """ pending = sorted(self.pending_new_room_events) self.pending_new_room_events = [] - for event, room_stream_id, extra_users in pending: + for room_stream_id, event, extra_users in pending: if room_stream_id > max_room_stream_id: self.pending_new_room_events.append(( - event, room_stream_id, extra_users + room_stream_id, event, extra_users )) else: self._on_new_room_event(event, room_stream_id, extra_users) diff --git a/synapse/types.py b/synapse/types.py index d89a04f7c3..1b21160c57 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -119,6 +119,7 @@ class StreamToken( @property def room_stream_id(self): # TODO(markjh): Awful hack to work around hacks in the presence tests + # which assume that the keys are integers. if type(self.room_key) is int: return self.room_key else: @@ -132,6 +133,22 @@ class StreamToken( or (int(other_token.typing_key) < int(self.typing_key)) ) + def copy_and_advance(self, key, new_value): + """Advance the given key in the token to a new value if and only if the + new value is after the old value. + """ + new_token = self.copy_and_replace(key, new_value) + if key == "room_key": + new_id = new_token.room_stream_id + old_id = self.room_stream_id + else: + new_id = int(getattr(new_token, key)) + old_id = int(getattr(self, key)) + if old_id < new_id: + return new_token + else: + return self + def copy_and_replace(self, key, new_value): d = self._asdict() d[key] = new_value From ad31fa304022e593b6c9354d3a42df761597e69b Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 18 May 2015 14:04:58 +0100 Subject: [PATCH 11/11] Don't bother sorting by the room_stream_ids, it shouldn't matter which order they are notified in --- synapse/notifier.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 0b5d97521e..1e73d52c4d 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -203,7 +203,7 @@ class Notifier(object): max_room_stream_id(int): The highest stream_id below which all events have been persisted. """ - pending = sorted(self.pending_new_room_events) + pending = self.pending_new_room_events self.pending_new_room_events = [] for room_stream_id, event, extra_users in pending: if room_stream_id > max_room_stream_id: