mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-25 02:55:46 +03:00
Combine Limiter and Linearizer
Linearizer was effectively a Limiter with max_count=1, so rather than maintaining two sets of code, let's combine them.
This commit is contained in:
parent
8462c26485
commit
7c712f95bb
4 changed files with 59 additions and 161 deletions
|
@ -33,7 +33,7 @@ from synapse.events.utils import serialize_event
|
||||||
from synapse.events.validator import EventValidator
|
from synapse.events.validator import EventValidator
|
||||||
from synapse.replication.http.send_event import send_event_to_master
|
from synapse.replication.http.send_event import send_event_to_master
|
||||||
from synapse.types import RoomAlias, RoomStreamToken, UserID
|
from synapse.types import RoomAlias, RoomStreamToken, UserID
|
||||||
from synapse.util.async import Limiter, ReadWriteLock
|
from synapse.util.async import Linearizer, ReadWriteLock
|
||||||
from synapse.util.frozenutils import frozendict_json_encoder
|
from synapse.util.frozenutils import frozendict_json_encoder
|
||||||
from synapse.util.logcontext import run_in_background
|
from synapse.util.logcontext import run_in_background
|
||||||
from synapse.util.metrics import measure_func
|
from synapse.util.metrics import measure_func
|
||||||
|
@ -427,7 +427,7 @@ class EventCreationHandler(object):
|
||||||
|
|
||||||
# We arbitrarily limit concurrent event creation for a room to 5.
|
# We arbitrarily limit concurrent event creation for a room to 5.
|
||||||
# This is to stop us from diverging history *too* much.
|
# This is to stop us from diverging history *too* much.
|
||||||
self.limiter = Limiter(max_count=5, name="room_event_creation_limit")
|
self.limiter = Linearizer(max_count=5, name="room_event_creation_limit")
|
||||||
|
|
||||||
self.action_generator = hs.get_action_generator()
|
self.action_generator = hs.get_action_generator()
|
||||||
|
|
||||||
|
|
|
@ -157,91 +157,8 @@ def concurrently_execute(func, args, limit):
|
||||||
|
|
||||||
|
|
||||||
class Linearizer(object):
|
class Linearizer(object):
|
||||||
"""Linearizes access to resources based on a key. Useful to ensure only one
|
|
||||||
thing is happening at a time on a given resource.
|
|
||||||
|
|
||||||
Example:
|
|
||||||
|
|
||||||
with (yield linearizer.queue("test_key")):
|
|
||||||
# do some work.
|
|
||||||
|
|
||||||
"""
|
|
||||||
def __init__(self, name=None, clock=None):
|
|
||||||
if name is None:
|
|
||||||
self.name = id(self)
|
|
||||||
else:
|
|
||||||
self.name = name
|
|
||||||
self.key_to_defer = {}
|
|
||||||
|
|
||||||
if not clock:
|
|
||||||
from twisted.internet import reactor
|
|
||||||
clock = Clock(reactor)
|
|
||||||
self._clock = clock
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def queue(self, key):
|
|
||||||
# If there is already a deferred in the queue, we pull it out so that
|
|
||||||
# we can wait on it later.
|
|
||||||
# Then we replace it with a deferred that we resolve *after* the
|
|
||||||
# context manager has exited.
|
|
||||||
# We only return the context manager after the previous deferred has
|
|
||||||
# resolved.
|
|
||||||
# This all has the net effect of creating a chain of deferreds that
|
|
||||||
# wait for the previous deferred before starting their work.
|
|
||||||
current_defer = self.key_to_defer.get(key)
|
|
||||||
|
|
||||||
new_defer = defer.Deferred()
|
|
||||||
self.key_to_defer[key] = new_defer
|
|
||||||
|
|
||||||
if current_defer:
|
|
||||||
logger.info(
|
|
||||||
"Waiting to acquire linearizer lock %r for key %r", self.name, key
|
|
||||||
)
|
|
||||||
try:
|
|
||||||
with PreserveLoggingContext():
|
|
||||||
yield current_defer
|
|
||||||
except Exception:
|
|
||||||
logger.exception("Unexpected exception in Linearizer")
|
|
||||||
|
|
||||||
logger.info("Acquired linearizer lock %r for key %r", self.name,
|
|
||||||
key)
|
|
||||||
|
|
||||||
# if the code holding the lock completes synchronously, then it
|
|
||||||
# will recursively run the next claimant on the list. That can
|
|
||||||
# relatively rapidly lead to stack exhaustion. This is essentially
|
|
||||||
# the same problem as http://twistedmatrix.com/trac/ticket/9304.
|
|
||||||
#
|
|
||||||
# In order to break the cycle, we add a cheeky sleep(0) here to
|
|
||||||
# ensure that we fall back to the reactor between each iteration.
|
|
||||||
#
|
|
||||||
# (There's no particular need for it to happen before we return
|
|
||||||
# the context manager, but it needs to happen while we hold the
|
|
||||||
# lock, and the context manager's exit code must be synchronous,
|
|
||||||
# so actually this is the only sensible place.
|
|
||||||
yield self._clock.sleep(0)
|
|
||||||
|
|
||||||
else:
|
|
||||||
logger.info("Acquired uncontended linearizer lock %r for key %r",
|
|
||||||
self.name, key)
|
|
||||||
|
|
||||||
@contextmanager
|
|
||||||
def _ctx_manager():
|
|
||||||
try:
|
|
||||||
yield
|
|
||||||
finally:
|
|
||||||
logger.info("Releasing linearizer lock %r for key %r", self.name, key)
|
|
||||||
with PreserveLoggingContext():
|
|
||||||
new_defer.callback(None)
|
|
||||||
current_d = self.key_to_defer.get(key)
|
|
||||||
if current_d is new_defer:
|
|
||||||
self.key_to_defer.pop(key, None)
|
|
||||||
|
|
||||||
defer.returnValue(_ctx_manager())
|
|
||||||
|
|
||||||
|
|
||||||
class Limiter(object):
|
|
||||||
"""Limits concurrent access to resources based on a key. Useful to ensure
|
"""Limits concurrent access to resources based on a key. Useful to ensure
|
||||||
only a few thing happen at a time on a given resource.
|
only a few things happen at a time on a given resource.
|
||||||
|
|
||||||
Example:
|
Example:
|
||||||
|
|
||||||
|
@ -249,7 +166,7 @@ class Limiter(object):
|
||||||
# do some work.
|
# do some work.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
def __init__(self, max_count=1, name=None, clock=None):
|
def __init__(self, name=None, max_count=1, clock=None):
|
||||||
"""
|
"""
|
||||||
Args:
|
Args:
|
||||||
max_count(int): The maximum number of concurrent accesses
|
max_count(int): The maximum number of concurrent accesses
|
||||||
|
@ -283,10 +200,12 @@ class Limiter(object):
|
||||||
new_defer = defer.Deferred()
|
new_defer = defer.Deferred()
|
||||||
entry[1].append(new_defer)
|
entry[1].append(new_defer)
|
||||||
|
|
||||||
logger.info("Waiting to acquire limiter lock %r for key %r", self.name, key)
|
logger.info(
|
||||||
|
"Waiting to acquire linearizer lock %r for key %r", self.name, key,
|
||||||
|
)
|
||||||
yield make_deferred_yieldable(new_defer)
|
yield make_deferred_yieldable(new_defer)
|
||||||
|
|
||||||
logger.info("Acquired limiter lock %r for key %r", self.name, key)
|
logger.info("Acquired linearizer lock %r for key %r", self.name, key)
|
||||||
entry[0] += 1
|
entry[0] += 1
|
||||||
|
|
||||||
# if the code holding the lock completes synchronously, then it
|
# if the code holding the lock completes synchronously, then it
|
||||||
|
@ -302,7 +221,9 @@ class Limiter(object):
|
||||||
yield self._clock.sleep(0)
|
yield self._clock.sleep(0)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
logger.info("Acquired uncontended limiter lock %r for key %r", self.name, key)
|
logger.info(
|
||||||
|
"Acquired uncontended linearizer lock %r for key %r", self.name, key,
|
||||||
|
)
|
||||||
entry[0] += 1
|
entry[0] += 1
|
||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
|
@ -310,7 +231,7 @@ class Limiter(object):
|
||||||
try:
|
try:
|
||||||
yield
|
yield
|
||||||
finally:
|
finally:
|
||||||
logger.info("Releasing limiter lock %r for key %r", self.name, key)
|
logger.info("Releasing linearizer lock %r for key %r", self.name, key)
|
||||||
|
|
||||||
# We've finished executing so check if there are any things
|
# We've finished executing so check if there are any things
|
||||||
# blocked waiting to execute and start one of them
|
# blocked waiting to execute and start one of them
|
||||||
|
|
|
@ -1,70 +0,0 @@
|
||||||
# -*- coding: utf-8 -*-
|
|
||||||
# Copyright 2016 OpenMarket Ltd
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
# you may not use this file except in compliance with the License.
|
|
||||||
# You may obtain a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
# See the License for the specific language governing permissions and
|
|
||||||
# limitations under the License.
|
|
||||||
|
|
||||||
|
|
||||||
from twisted.internet import defer
|
|
||||||
|
|
||||||
from synapse.util.async import Limiter
|
|
||||||
|
|
||||||
from tests import unittest
|
|
||||||
|
|
||||||
|
|
||||||
class LimiterTestCase(unittest.TestCase):
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def test_limiter(self):
|
|
||||||
limiter = Limiter(3)
|
|
||||||
|
|
||||||
key = object()
|
|
||||||
|
|
||||||
d1 = limiter.queue(key)
|
|
||||||
cm1 = yield d1
|
|
||||||
|
|
||||||
d2 = limiter.queue(key)
|
|
||||||
cm2 = yield d2
|
|
||||||
|
|
||||||
d3 = limiter.queue(key)
|
|
||||||
cm3 = yield d3
|
|
||||||
|
|
||||||
d4 = limiter.queue(key)
|
|
||||||
self.assertFalse(d4.called)
|
|
||||||
|
|
||||||
d5 = limiter.queue(key)
|
|
||||||
self.assertFalse(d5.called)
|
|
||||||
|
|
||||||
with cm1:
|
|
||||||
self.assertFalse(d4.called)
|
|
||||||
self.assertFalse(d5.called)
|
|
||||||
|
|
||||||
cm4 = yield d4
|
|
||||||
self.assertFalse(d5.called)
|
|
||||||
|
|
||||||
with cm3:
|
|
||||||
self.assertFalse(d5.called)
|
|
||||||
|
|
||||||
cm5 = yield d5
|
|
||||||
|
|
||||||
with cm2:
|
|
||||||
pass
|
|
||||||
|
|
||||||
with cm4:
|
|
||||||
pass
|
|
||||||
|
|
||||||
with cm5:
|
|
||||||
pass
|
|
||||||
|
|
||||||
d6 = limiter.queue(key)
|
|
||||||
with (yield d6):
|
|
||||||
pass
|
|
|
@ -1,5 +1,6 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
# Copyright 2016 OpenMarket Ltd
|
# Copyright 2016 OpenMarket Ltd
|
||||||
|
# Copyright 2018 New Vector Ltd.
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
# you may not use this file except in compliance with the License.
|
# you may not use this file except in compliance with the License.
|
||||||
|
@ -65,3 +66,49 @@ class LinearizerTestCase(unittest.TestCase):
|
||||||
func(i)
|
func(i)
|
||||||
|
|
||||||
return func(1000)
|
return func(1000)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def test_multiple_entries(self):
|
||||||
|
limiter = Linearizer(max_count=3)
|
||||||
|
|
||||||
|
key = object()
|
||||||
|
|
||||||
|
d1 = limiter.queue(key)
|
||||||
|
cm1 = yield d1
|
||||||
|
|
||||||
|
d2 = limiter.queue(key)
|
||||||
|
cm2 = yield d2
|
||||||
|
|
||||||
|
d3 = limiter.queue(key)
|
||||||
|
cm3 = yield d3
|
||||||
|
|
||||||
|
d4 = limiter.queue(key)
|
||||||
|
self.assertFalse(d4.called)
|
||||||
|
|
||||||
|
d5 = limiter.queue(key)
|
||||||
|
self.assertFalse(d5.called)
|
||||||
|
|
||||||
|
with cm1:
|
||||||
|
self.assertFalse(d4.called)
|
||||||
|
self.assertFalse(d5.called)
|
||||||
|
|
||||||
|
cm4 = yield d4
|
||||||
|
self.assertFalse(d5.called)
|
||||||
|
|
||||||
|
with cm3:
|
||||||
|
self.assertFalse(d5.called)
|
||||||
|
|
||||||
|
cm5 = yield d5
|
||||||
|
|
||||||
|
with cm2:
|
||||||
|
pass
|
||||||
|
|
||||||
|
with cm4:
|
||||||
|
pass
|
||||||
|
|
||||||
|
with cm5:
|
||||||
|
pass
|
||||||
|
|
||||||
|
d6 = limiter.queue(key)
|
||||||
|
with (yield d6):
|
||||||
|
pass
|
||||||
|
|
Loading…
Reference in a new issue