Add a NotifierUserStream to hold all the notification listeners for a user

This commit is contained in:
Mark Haines 2015-05-12 11:00:37 +01:00
parent e269c511f6
commit 5c75adff95

View file

@ -43,28 +43,18 @@ def count(func, l):
class _NotificationListener(object): class _NotificationListener(object):
""" This represents a single client connection to the events stream. """ This represents a single client connection to the events stream.
The events stream handler will have yielded to the deferred, so to The events stream handler will have yielded to the deferred, so to
notify the handler it is sufficient to resolve the deferred. notify the handler it is sufficient to resolve the deferred.
This listener will also keep track of which rooms it is listening in
so that it can remove itself from the indexes in the Notifier class.
""" """
def __init__(self, user, rooms, deferred, appservice=None): def __init__(self, deferred):
self.user = user
self.appservice = appservice
self.deferred = deferred self.deferred = deferred
self.rooms = rooms
self.timer = None
def notified(self): def notified(self):
return self.deferred.called return self.deferred.called
def notify(self, notifier): def notify(self):
""" Inform whoever is listening about the new events. This will """ Inform whoever is listening about the new events.
also remove this listener from all the indexes in the Notifier
it knows about.
""" """
try: try:
@ -72,27 +62,45 @@ class _NotificationListener(object):
except defer.AlreadyCalledError: except defer.AlreadyCalledError:
pass pass
# Should the following be done be using intrusively linked lists?
# -- erikj class _NotifierUserStream(object):
"""This represents a user connected to the event stream.
It tracks the most recent stream token for that user.
At a given point a user may have a number of streams listening for
events.
This listener will also keep track of which rooms it is listening in
so that it can remove itself from the indexes in the Notifier class.
"""
def __init__(self, user, rooms, current_token, appservice=None):
self.user = user
self.appservice = appservice
self.listeners = set()
self.rooms = rooms
self.current_token = current_token
def notify(self, new_token):
for listener in self.listeners:
listener.notify(new_token)
self.listeners.clear()
def remove(self, notifier):
""" Remove this listener from all the indexes in the Notifier
it knows about.
"""
for room in self.rooms: for room in self.rooms:
lst = notifier.room_to_listeners.get(room, set()) lst = notifier.room_to_user_streams.get(room, set())
lst.discard(self) lst.discard(self)
notifier.user_to_listeners.get(self.user, set()).discard(self) notifier.user_to_user_streams.get(self.user, set()).discard(self)
if self.appservice: if self.appservice:
notifier.appservice_to_listeners.get( notifier.appservice_to_user_streams.get(
self.appservice, set() self.appservice, set()
).discard(self) ).discard(self)
# Cancel the timeout for this notifer if one exists.
if self.timer is not None:
try:
notifier.clock.cancel_call_later(self.timer)
except:
logger.warn("Failed to cancel notifier timer")
class Notifier(object): class Notifier(object):
""" This class is responsible for notifying any listeners when there are """ This class is responsible for notifying any listeners when there are
@ -104,11 +112,12 @@ class Notifier(object):
def __init__(self, hs): def __init__(self, hs):
self.hs = hs self.hs = hs
self.room_to_listeners = {} self.user_to_user_stream = {}
self.user_to_listeners = {} self.room_to_user_streams = {}
self.appservice_to_listeners = {} self.appservice_to_user_streams = {}
self.event_sources = hs.get_event_sources() self.event_sources = hs.get_event_sources()
self.store = hs.get_datastore()
self.clock = hs.get_clock() self.clock = hs.get_clock()
@ -120,34 +129,34 @@ class Notifier(object):
# when rendering the metrics page, which is likely once per minute at # when rendering the metrics page, which is likely once per minute at
# most when scraping it. # most when scraping it.
def count_listeners(): def count_listeners():
all_listeners = set() all_user_streams = set()
for x in self.room_to_listeners.values(): for x in self.room_to_user_streams.values():
all_listeners |= x all_user_streams |= x
for x in self.user_to_listeners.values(): for x in self.user_to_user_streams.values():
all_listeners |= x all_user_streams |= x
for x in self.appservice_to_listeners.values(): for x in self.appservice_to_user_streams.values():
all_listeners |= x all_user_streams |= x
return len(all_listeners) return sum(len(stream.listeners) for stream in all_user_streams)
metrics.register_callback("listeners", count_listeners) metrics.register_callback("listeners", count_listeners)
metrics.register_callback( metrics.register_callback(
"rooms", "rooms",
lambda: count(bool, self.room_to_listeners.values()), lambda: count(bool, self.room_to_user_streams.values()),
) )
metrics.register_callback( metrics.register_callback(
"users", "users",
lambda: count(bool, self.user_to_listeners.values()), lambda: len(self.user_to_user_stream),
) )
metrics.register_callback( metrics.register_callback(
"appservices", "appservices",
lambda: count(bool, self.appservice_to_listeners.values()), lambda: count(bool, self.appservice_to_user_streams.values()),
) )
@log_function @log_function
@defer.inlineCallbacks @defer.inlineCallbacks
def on_new_room_event(self, event, extra_users=[]): def on_new_room_event(self, event, new_token, extra_users=[]):
""" Used by handlers to inform the notifier something has happened """ Used by handlers to inform the notifier something has happened
in the room, room event wise. in the room, room event wise.
@ -155,6 +164,7 @@ class Notifier(object):
listening to the room, and any listeners for the users in the listening to the room, and any listeners for the users in the
`extra_users` param. `extra_users` param.
""" """
assert isinstance(new_token, StreamToken)
yield run_on_reactor() yield run_on_reactor()
# poke any interested application service. # poke any interested application service.
self.hs.get_handlers().appservice_handler.notify_interested_services( self.hs.get_handlers().appservice_handler.notify_interested_services(
@ -163,72 +173,60 @@ class Notifier(object):
room_id = event.room_id room_id = event.room_id
room_listeners = self.room_to_listeners.get(room_id, set()) room_user_streams = self.room_to_user_streams.get(room_id, set())
_discard_if_notified(room_listeners) user_streams = room_user_streams.copy()
listeners = room_listeners.copy()
for user in extra_users: for user in extra_users:
user_listeners = self.user_to_listeners.get(user, set()) user_stream = self.user_to_user_stream.get(user)
if user_stream is not None:
user_streams.add(user_stream)
_discard_if_notified(user_listeners) for appservice in self.appservice_to_user_streams:
listeners |= user_listeners
for appservice in self.appservice_to_listeners:
# TODO (kegan): Redundant appservice listener checks? # TODO (kegan): Redundant appservice listener checks?
# App services will already be in the room_to_listeners set, but # App services will already be in the room_to_user_streams set, but
# that isn't enough. They need to be checked here in order to # that isn't enough. They need to be checked here in order to
# receive *invites* for users they are interested in. Does this # receive *invites* for users they are interested in. Does this
# make the room_to_listeners check somewhat obselete? # make the room_to_user_streams check somewhat obselete?
if appservice.is_interested(event): if appservice.is_interested(event):
app_listeners = self.appservice_to_listeners.get( app_user_streams = self.appservice_to_user_streams.get(
appservice, set() appservice, set()
) )
user_streams |= app_user_streams
_discard_if_notified(app_listeners) logger.debug("on_new_room_event listeners %s", user_streams)
listeners |= app_listeners
logger.debug("on_new_room_event listeners %s", listeners)
with PreserveLoggingContext(): with PreserveLoggingContext():
for listener in listeners: for user_stream in user_streams:
try: try:
listener.notify(self) user_stream.notify(new_token)
except: except:
logger.exception("Failed to notify listener") logger.exception("Failed to notify listener")
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def on_new_user_event(self, users=[], rooms=[]): def on_new_user_event(self, new_token, users=[], rooms=[]):
""" Used to inform listeners that something has happend """ Used to inform listeners that something has happend
presence/user event wise. presence/user event wise.
Will wake up all listeners for the given users and rooms. Will wake up all listeners for the given users and rooms.
""" """
assert isinstance(new_token, StreamToken)
yield run_on_reactor() yield run_on_reactor()
listeners = set() user_streams = set()
for user in users: for user in users:
user_listeners = self.user_to_listeners.get(user, set()) user_stream = self.user_to_user_stream.get(user)
if user_stream:
_discard_if_notified(user_listeners) user_stream.add(user_stream)
listeners |= user_listeners
for room in rooms: for room in rooms:
room_listeners = self.room_to_listeners.get(room, set()) user_streams |= self.room_to_user_streams.get(room, set())
_discard_if_notified(room_listeners)
listeners |= room_listeners
with PreserveLoggingContext(): with PreserveLoggingContext():
for listener in listeners: for user_stream in user_streams:
try: try:
listener.notify(self) user_streams.notify(new_token)
except: except:
logger.exception("Failed to notify listener") logger.exception("Failed to notify listener")
@ -240,21 +238,32 @@ class Notifier(object):
""" """
deferred = defer.Deferred() deferred = defer.Deferred()
appservice = yield self.hs.get_datastore().get_app_service_by_user_id(
user.to_string()
)
listener = [_NotificationListener( user_stream = self.user_to_user_streams.get(user)
user=user, if user_stream is None:
rooms=rooms, appservice = yield self.store.get_app_service_by_user_id(
deferred=deferred, user.to_string()
appservice=appservice, )
)] current_token = yield self.event_sources.get_current_token()
user_stream = _NotifierUserStream(
user=user,
rooms=rooms,
appservice=appservice,
current_token=current_token,
)
self._register_with_keys(user_stream)
else:
current_token = user_stream.current_token
if timeout: if timeout and not current_token.is_after(from_token):
self._register_with_keys(listener[0]) listener = [_NotificationListener(deferred)]
user_stream.listeners.add(listener[0])
if current_token.is_after(from_token):
result = yield callback(from_token, current_token)
else:
result = None
result = yield callback()
timer = [None] timer = [None]
if timeout: if timeout:
@ -263,23 +272,19 @@ class Notifier(object):
def _timeout_listener(): def _timeout_listener():
timed_out[0] = True timed_out[0] = True
timer[0] = None timer[0] = None
listener[0].notify(self) listener[0].notify(user_stream)
# We create multiple notification listeners so we have to manage # We create multiple notification listeners so we have to manage
# canceling the timeout ourselves. # canceling the timeout ourselves.
timer[0] = self.clock.call_later(timeout/1000., _timeout_listener) timer[0] = self.clock.call_later(timeout/1000., _timeout_listener)
while not result and not timed_out[0]: while not result and not timed_out[0]:
yield deferred new_token = yield deferred
deferred = defer.Deferred() deferred = defer.Deferred()
listener[0] = _NotificationListener( listener[0] = _NotificationListener(deferred)
user=user, user_stream.listeners.add(listener[0])
rooms=rooms, result = yield callback(current_token, new_token)
deferred=deferred, current_token = new_token
appservice=appservice,
)
self._register_with_keys(listener[0])
result = yield callback()
if timer[0] is not None: if timer[0] is not None:
try: try:
@ -302,7 +307,7 @@ class Notifier(object):
limit = pagination_config.limit limit = pagination_config.limit
@defer.inlineCallbacks @defer.inlineCallbacks
def check_for_updates(): def check_for_updates(start_token, end_token):
events = [] events = []
end_token = from_token end_token = from_token
for name, source in self.event_sources.sources.items(): for name, source in self.event_sources.sources.items():
@ -328,26 +333,23 @@ class Notifier(object):
defer.returnValue(result) defer.returnValue(result)
@log_function @log_function
def _register_with_keys(self, listener): def _register_with_keys(self, user_stream):
for room in listener.rooms: self.user_to_user_stream[user_stream.user] = user_stream
s = self.room_to_listeners.setdefault(room, set())
s.add(listener)
self.user_to_listeners.setdefault(listener.user, set()).add(listener) for room in user_stream.rooms:
s = self.room_to_user_stream.setdefault(room, set())
s.add(user_stream)
if listener.appservice: if user_stream.appservice:
self.appservice_to_listeners.setdefault( self.appservice_to_user_stream.setdefault(
listener.appservice, set() user_stream.appservice, set()
).add(listener) ).add(user_stream)
def _user_joined_room(self, user, room_id): def _user_joined_room(self, user, room_id):
new_listeners = self.user_to_listeners.get(user, set()) new_user_stream = self.user_to_user_stream.get(user)
room_streams = self.room_to_user_streams.setdefault(room_id, set())
listeners = self.room_to_listeners.setdefault(room_id, set()) room_streams.add(new_user_stream)
listeners |= new_listeners new_user_stream.rooms.add(room_id)
for l in new_listeners:
l.rooms.add(room_id)
def _discard_if_notified(listener_set): def _discard_if_notified(listener_set):