Deduplicate joins

This commit is contained in:
Erik Johnston 2016-04-06 15:44:22 +01:00
parent 87a30890a3
commit af03ecf352
4 changed files with 118 additions and 1 deletions

View file

@ -24,6 +24,7 @@ from synapse.api.constants import (
) )
from synapse.api.errors import AuthError, SynapseError, Codes from synapse.api.errors import AuthError, SynapseError, Codes
from synapse.util.logcontext import preserve_context_over_fn from synapse.util.logcontext import preserve_context_over_fn
from synapse.util.async import Linearizer
from signedjson.sign import verify_signed_json from signedjson.sign import verify_signed_json
from signedjson.key import decode_verify_key_bytes from signedjson.key import decode_verify_key_bytes
@ -60,6 +61,8 @@ class RoomMemberHandler(BaseHandler):
def __init__(self, hs): def __init__(self, hs):
super(RoomMemberHandler, self).__init__(hs) super(RoomMemberHandler, self).__init__(hs)
self.member_linearizer = Linearizer()
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.distributor = hs.get_distributor() self.distributor = hs.get_distributor()
@ -182,6 +185,34 @@ class RoomMemberHandler(BaseHandler):
remote_room_hosts=None, remote_room_hosts=None,
third_party_signed=None, third_party_signed=None,
ratelimit=True, ratelimit=True,
):
key = (target, room_id,)
with (yield self.member_linearizer.queue(key)):
result = yield self._update_membership(
requester,
target,
room_id,
action,
txn_id=txn_id,
remote_room_hosts=remote_room_hosts,
third_party_signed=third_party_signed,
ratelimit=ratelimit,
)
defer.returnValue(result)
@defer.inlineCallbacks
def _update_membership(
self,
requester,
target,
room_id,
action,
txn_id=None,
remote_room_hosts=None,
third_party_signed=None,
ratelimit=True,
): ):
effective_membership_state = action effective_membership_state = action
if action in ["kick", "unban"]: if action in ["kick", "unban"]:

View file

@ -19,6 +19,8 @@ from twisted.internet import defer, reactor
from .logcontext import PreserveLoggingContext, preserve_fn from .logcontext import PreserveLoggingContext, preserve_fn
from synapse.util import unwrapFirstError from synapse.util import unwrapFirstError
from contextlib import contextmanager
@defer.inlineCallbacks @defer.inlineCallbacks
def sleep(seconds): def sleep(seconds):
@ -137,3 +139,43 @@ def concurrently_execute(func, args, limit):
preserve_fn(_concurrently_execute_inner)() preserve_fn(_concurrently_execute_inner)()
for _ in xrange(limit) for _ in xrange(limit)
], consumeErrors=True).addErrback(unwrapFirstError) ], consumeErrors=True).addErrback(unwrapFirstError)
@contextmanager
def _trigger_defer_manager(d):
try:
yield
finally:
d.callback(None)
class Linearizer(object):
"""Linearizes access to resources based on a key. Useful to ensure only one
thing is happening at a time on a given resource.
Example:
with (yield linearizer.queue("test_key")):
# do some work.
"""
def __init__(self):
self.key_to_defer = {}
@defer.inlineCallbacks
def queue(self, key):
current_defer = self.key_to_defer.get(key)
new_defer = defer.Deferred()
self.key_to_defer[key] = new_defer
def remove_if_current(_):
d = self.key_to_defer.get(key)
if d is new_defer:
self.key_to_defer.pop(key, None)
new_defer.addBoth(remove_if_current)
yield current_defer
defer.returnValue(_trigger_defer_manager(new_defer))

View file

@ -35,7 +35,7 @@ class ResponseCache(object):
return None return None
def set(self, key, deferred): def set(self, key, deferred):
result = ObservableDeferred(deferred) result = ObservableDeferred(deferred, consumeErrors=True)
self.pending_result_cache[key] = result self.pending_result_cache[key] = result
def remove(r): def remove(r):

View file

@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from tests import unittest
from twisted.internet import defer
from synapse.util.async import Linearizer
class LinearizerTestCase(unittest.TestCase):
@defer.inlineCallbacks
def test_linearizer(self):
linearizer = Linearizer()
key = object()
d1 = linearizer.queue(key)
cm1 = yield d1
d2 = linearizer.queue(key)
self.assertFalse(d2.called)
with cm1:
self.assertFalse(d2.called)
self.assertTrue(d2.called)
with (yield d2):
pass