diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 90519e33e3..4d0515ddfb 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -50,6 +50,8 @@ class MessageHandler(BaseHandler): self.pagination_lock = ReadWriteLock() + # We arbitrarily limit concurrent event creation for a room to 5. + # This is to stop us from diverging history *too* much. self.limiter = Limiter(max_count=5) @defer.inlineCallbacks diff --git a/synapse/util/async.py b/synapse/util/async.py index 2a680fff5e..16ed183d4c 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -213,12 +213,21 @@ class Limiter(object): max_count(int): The maximum number of concurrent access """ self.max_count = max_count + + # key_to_defer is a map from the key to a 2 element list where + # the first element is the number of things executing + # the second element is a list of deferreds for the things blocked from + # executing. self.key_to_defer = {} @defer.inlineCallbacks def queue(self, key): entry = self.key_to_defer.setdefault(key, [0, []]) + # If the number of things executing is greater than the maximum + # then add a deferred to the list of blocked items + # When on of the things currently executing finishes it will callback + # this item so that it can continue executing. if entry[0] >= self.max_count: new_defer = defer.Deferred() entry[1].append(new_defer) @@ -232,10 +241,14 @@ class Limiter(object): try: yield finally: + # We've finished executing so check if there are any things + # blocked waiting to execute and start one of them entry[0] -= 1 try: entry[1].pop(0).callback(None) except IndexError: + # If nothing else is executing for this key then remove it + # from the map if entry[0] == 0: self.key_to_defer.pop(key, None)