1
0
Fork 0
mirror of https://github.com/element-hq/synapse.git synced 2024-12-22 04:34:28 +03:00
This commit is contained in:
Erik Johnston 2016-11-11 10:42:08 +00:00
parent 2bd4513a4d
commit 64038b806c
2 changed files with 15 additions and 0 deletions
synapse
handlers
util

View file

@ -50,6 +50,8 @@ class MessageHandler(BaseHandler):
self.pagination_lock = ReadWriteLock() self.pagination_lock = ReadWriteLock()
# We arbitrarily limit concurrent event creation for a room to 5.
# This is to stop us from diverging history *too* much.
self.limiter = Limiter(max_count=5) self.limiter = Limiter(max_count=5)
@defer.inlineCallbacks @defer.inlineCallbacks

View file

@ -213,12 +213,21 @@ class Limiter(object):
max_count(int): The maximum number of concurrent access max_count(int): The maximum number of concurrent access
""" """
self.max_count = max_count self.max_count = max_count
# key_to_defer is a map from the key to a 2 element list where
# the first element is the number of things executing
# the second element is a list of deferreds for the things blocked from
# executing.
self.key_to_defer = {} self.key_to_defer = {}
@defer.inlineCallbacks @defer.inlineCallbacks
def queue(self, key): def queue(self, key):
entry = self.key_to_defer.setdefault(key, [0, []]) entry = self.key_to_defer.setdefault(key, [0, []])
# If the number of things executing is greater than the maximum
# then add a deferred to the list of blocked items
# When on of the things currently executing finishes it will callback
# this item so that it can continue executing.
if entry[0] >= self.max_count: if entry[0] >= self.max_count:
new_defer = defer.Deferred() new_defer = defer.Deferred()
entry[1].append(new_defer) entry[1].append(new_defer)
@ -232,10 +241,14 @@ class Limiter(object):
try: try:
yield yield
finally: finally:
# We've finished executing so check if there are any things
# blocked waiting to execute and start one of them
entry[0] -= 1 entry[0] -= 1
try: try:
entry[1].pop(0).callback(None) entry[1].pop(0).callback(None)
except IndexError: except IndexError:
# If nothing else is executing for this key then remove it
# from the map
if entry[0] == 0: if entry[0] == 0:
self.key_to_defer.pop(key, None) self.key_to_defer.pop(key, None)