From ef910a0358d1a1bd608576cfc07edc0a4f2649aa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 18 May 2015 17:17:04 +0100 Subject: [PATCH 01/10] Do work in parellel when joining a room --- synapse/handlers/federation.py | 69 ++++++++++++++++++++++++---------- 1 file changed, 50 insertions(+), 19 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 880cbd77e7..78f2bfc212 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -516,30 +516,59 @@ class FederationHandler(BaseHandler): # FIXME pass + auth_ids_to_deferred = {} + + def process_auth_ev(ev): + auth_ids = [e_id for e_id, _ in ev.auth_events] + + prev_ds = [ + auth_ids_to_deferred[i] + for i in auth_ids + if i in auth_ids_to_deferred + ] + + d = defer.Deferred() + + auth_ids_to_deferred[ev.event_id] = d + + @defer.inlineCallbacks + def f(*_): + ev.internal_metadata.outlier = True + + try: + auth = { + (e.type, e.state_key): e for e in auth_chain + if e.event_id in auth_ids + } + + yield self._handle_new_event( + origin, ev, auth_events=auth + ) + except: + logger.exception( + "Failed to handle auth event %s", + ev.event_id, + ) + + d.callback(None) + + if prev_ds: + dx = defer.DeferredList(prev_ds) + dx.addBoth(f) + else: + f() + for e in auth_chain: - e.internal_metadata.outlier = True - if e.event_id == event.event_id: - continue + return + process_auth_ev(e) - try: - auth_ids = [e_id for e_id, _ in e.auth_events] - auth = { - (e.type, e.state_key): e for e in auth_chain - if e.event_id in auth_ids - } - yield self._handle_new_event( - origin, e, auth_events=auth - ) - except: - logger.exception( - "Failed to handle auth event %s", - e.event_id, - ) + yield defer.DeferredList(auth_ids_to_deferred.values()) - for e in state: + @defer.inlineCallbacks + def handle_state(e): if e.event_id == event.event_id: - continue + return e.internal_metadata.outlier = True try: @@ -557,6 +586,8 @@ class FederationHandler(BaseHandler): e.event_id, ) + yield defer.DeferredList([handle_state(e) for e in state]) + auth_ids = [e_id for e_id, _ in event.auth_events] auth_events = { (e.type, e.state_key): e for e in auth_chain From 62ccc6d95f0ce1f9f23d7458b2d7f950360b8fb9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 May 2015 11:58:04 +0100 Subject: [PATCH 02/10] Don't reuse var names --- synapse/crypto/keyring.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index c626f78f4b..1f24e58ba0 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -144,8 +144,8 @@ class Keyring(object): ) perspective_results = yield defer.gatherResults([ - get_key(name, keys) - for name, keys in self.perspective_servers.items() + get_key(p_name, p_keys) + for p_name, p_keys in self.perspective_servers.items() ]) for results in perspective_results: From 350b88656ab7a4ff871dfebc85b9db6d294a4295 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 May 2015 13:01:57 +0100 Subject: [PATCH 03/10] SYN-383: Actually, we expect this value to be a dict --- synapse/crypto/keyring.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index a061def16a..2a5a8914c0 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -312,8 +312,9 @@ class Keyring(object): time_now_ms = self.clock.time_msec() response_keys = {} verify_keys = {} - for key_id, key_base64 in response_json["verify_keys"].items(): + for key_id, key_data in response_json["verify_keys"].items(): if is_signing_algorithm_supported(key_id): + key_base64 = key_data["key"] key_bytes = decode_base64(key_base64) verify_key = decode_verify_key_bytes(key_id, key_bytes) verify_key.time_added = time_now_ms From 5ae4a84211e4ca0247ab3bca77b159f843d6ead2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 May 2015 13:43:34 +0100 Subject: [PATCH 04/10] Don't always hit get_server_verify_key_v1_direct --- synapse/crypto/keyring.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 2a5a8914c0..35f9ac3517 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -164,12 +164,17 @@ class Keyring(object): keys = yield self.get_server_verify_key_v2_direct( server_name, key_ids ) - except: - pass + except Exception as e: + logging.info( + "Unable to getting key %r for %r directly: %s %s", + key_ids, server_name, + type(e).__name__, str(e.message), + ) - keys = yield self.get_server_verify_key_v1_direct( - server_name, key_ids - ) + if keys is None: + keys = yield self.get_server_verify_key_v1_direct( + server_name, key_ids + ) for key_id in key_ids: if key_id in keys: From 722312991694846045fae31ec6e0cbbbc59c6a33 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 May 2015 14:15:05 +0100 Subject: [PATCH 05/10] Don't apply new room join hack if depth > 5 --- synapse/handlers/federation.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 03f8444ad5..d85b1cf5de 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -924,9 +924,12 @@ class FederationHandler(BaseHandler): # This is a hack to fix some old rooms where the initial join event # didn't reference the create event in its auth events. if event.type == EventTypes.Member and not event.auth_events: - if len(event.prev_events) == 1: - c = yield self.store.get_event(event.prev_events[0][0]) - if c.type == EventTypes.Create: + if len(event.prev_events) == 1 and event.depth < 5: + c = yield self.store.get_event( + event.prev_events[0][0], + allow_none=True, + ) + if c and c.type == EventTypes.Create: auth_events[(c.type, c.state_key)] = c try: From 6837c5edab94df93addc85d1900011ba2182e0f1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 May 2015 14:27:11 +0100 Subject: [PATCH 06/10] Handle the case when things return empty but non none things --- synapse/crypto/keyring.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 35f9ac3517..aff69c5f83 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -159,7 +159,7 @@ class Keyring(object): ) with limiter: - if keys is None: + if not keys: try: keys = yield self.get_server_verify_key_v2_direct( server_name, key_ids @@ -171,7 +171,7 @@ class Keyring(object): type(e).__name__, str(e.message), ) - if keys is None: + if not keys: keys = yield self.get_server_verify_key_v1_direct( server_name, key_ids ) From 5b1631a4a9ad4c1ed0adaff3ffc8238014359e95 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 May 2015 14:53:32 +0100 Subject: [PATCH 07/10] Add a timeout param to get_event --- synapse/federation/federation_base.py | 1 + synapse/federation/federation_client.py | 23 ++++++++++++++--------- synapse/federation/transport/client.py | 4 ++-- synapse/http/matrixfederationclient.py | 13 ++++++++----- 4 files changed, 25 insertions(+), 16 deletions(-) diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 5217d91aab..f0430b2cb1 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -80,6 +80,7 @@ class FederationBase(object): destinations=[pdu.origin], event_id=pdu.event_id, outlier=outlier, + timeout=10000, ) if new_pdu: diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 904c7c0945..a163b2674d 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -22,6 +22,7 @@ from .units import Edu from synapse.api.errors import ( CodeMessageException, HttpResponseException, SynapseError, ) +from synapse.util import unwrapFirstError from synapse.util.expiringcache import ExpiringCache from synapse.util.logutils import log_function from synapse.events import FrozenEvent @@ -173,7 +174,7 @@ class FederationClient(FederationBase): @defer.inlineCallbacks @log_function - def get_pdu(self, destinations, event_id, outlier=False): + def get_pdu(self, destinations, event_id, outlier=False, timeout=None): """Requests the PDU with given origin and ID from the remote home servers. @@ -212,7 +213,7 @@ class FederationClient(FederationBase): with limiter: transaction_data = yield self.transport_layer.get_event( - destination, event_id + destination, event_id, timeout=timeout, ) logger.debug("transaction_data %r", transaction_data) @@ -370,13 +371,17 @@ class FederationClient(FederationBase): for p in content.get("auth_chain", []) ] - signed_state = yield self._check_sigs_and_hash_and_fetch( - destination, state, outlier=True - ) - - signed_auth = yield self._check_sigs_and_hash_and_fetch( - destination, auth_chain, outlier=True - ) + signed_state, signed_auth = yield defer.gatherResults( + [ + self._check_sigs_and_hash_and_fetch( + destination, state, outlier=True + ), + self._check_sigs_and_hash_and_fetch( + destination, auth_chain, outlier=True + ) + ], + consumeErrors=True + ).addErrback(unwrapFirstError) auth_chain.sort(key=lambda e: e.depth) diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 80d03012b7..c2b53b78b2 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -50,7 +50,7 @@ class TransportLayerClient(object): ) @log_function - def get_event(self, destination, event_id): + def get_event(self, destination, event_id, timeout=None): """ Requests the pdu with give id and origin from the given server. Args: @@ -65,7 +65,7 @@ class TransportLayerClient(object): destination, event_id) path = PREFIX + "/event/%s/" % (event_id, ) - return self.client.get_json(destination, path=path) + return self.client.get_json(destination, path=path, timeout=timeout) @log_function def backfill(self, destination, room_id, event_tuples, limit): diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index c99d237c73..312bbcc6b8 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -110,7 +110,8 @@ class MatrixFederationHttpClient(object): @defer.inlineCallbacks def _create_request(self, destination, method, path_bytes, body_callback, headers_dict={}, param_bytes=b"", - query_bytes=b"", retry_on_dns_fail=True): + query_bytes=b"", retry_on_dns_fail=True, + timeout=None): """ Creates and sends a request to the given url """ headers_dict[b"User-Agent"] = [self.version_string] @@ -158,7 +159,7 @@ class MatrixFederationHttpClient(object): response = yield self.clock.time_bound_deferred( request_deferred, - time_out=60, + time_out=timeout/1000. if timeout else 60, ) logger.debug("Got response to %s", method) @@ -181,7 +182,7 @@ class MatrixFederationHttpClient(object): _flatten_response_never_received(e), ) - if retries_left: + if retries_left and not timeout: yield sleep(2 ** (5 - retries_left)) retries_left -= 1 else: @@ -334,7 +335,8 @@ class MatrixFederationHttpClient(object): defer.returnValue(json.loads(body)) @defer.inlineCallbacks - def get_json(self, destination, path, args={}, retry_on_dns_fail=True): + def get_json(self, destination, path, args={}, retry_on_dns_fail=True, + timeout=None): """ GETs some json from the given host homeserver and path Args: @@ -370,7 +372,8 @@ class MatrixFederationHttpClient(object): path.encode("ascii"), query_bytes=query_bytes, body_callback=body_callback, - retry_on_dns_fail=retry_on_dns_fail + retry_on_dns_fail=retry_on_dns_fail, + timeout=timeout, ) if 200 <= response.code < 300: From aa729349ddf23c0abb5096581a317783a8f60aa6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 May 2015 15:27:00 +0100 Subject: [PATCH 08/10] Fix event_backwards_extrem insertion to ignore outliers --- synapse/storage/event_federation.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index a1982dfbb5..2880850506 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -337,12 +337,13 @@ class EventFederationStore(SQLBaseStore): " WHERE event_id = ? AND room_id = ?" " )" " AND NOT EXISTS (" - " SELECT 1 FROM events WHERE event_id = ? AND room_id = ?" + " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? " + " AND outlier = ?" " )" ) txn.executemany(query, [ - (e_id, room_id, e_id, room_id, e_id, room_id, ) + (e_id, room_id, e_id, room_id, e_id, room_id, False) for e_id, _ in prev_events ]) From 3a653515ec3bba9d5b143e37bc9569d5caa50a5b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 May 2015 15:27:09 +0100 Subject: [PATCH 09/10] Add None check --- synapse/federation/federation_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index a163b2674d..4b3bf97835 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -523,7 +523,7 @@ class FederationClient(FederationBase): # Are we missing any? seen_events = set(earliest_events_ids) - seen_events.update(e.event_id for e in signed_events) + seen_events.update(e.event_id for e in signed_events if e) missing_events = {} for e in itertools.chain(latest_events, signed_events): From 284f55a7fbf597e32508301fe9571cd1b8523625 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 22 May 2015 15:18:04 +0100 Subject: [PATCH 10/10] Add doc strings --- synapse/federation/federation_client.py | 2 ++ synapse/federation/transport/client.py | 2 ++ synapse/http/matrixfederationclient.py | 3 +++ 3 files changed, 7 insertions(+) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index ecb6dbd770..3249060bcf 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -190,6 +190,8 @@ class FederationClient(FederationBase): outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if it's from an arbitary point in the context as opposed to part of the current block of PDUs. Defaults to `False` + timeout (int): How long to try (in ms) each destination for before + moving to the next destination. None indicates no timeout. Returns: Deferred: Results in the requested PDU. diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index c2b53b78b2..610a4c3163 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -57,6 +57,8 @@ class TransportLayerClient(object): destination (str): The host name of the remote home server we want to get the state from. event_id (str): The id of the event being requested. + timeout (int): How long to try (in ms) the destination for before + giving up. None indicates no timeout. Returns: Deferred: Results in a dict received from the remote homeserver. diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 312bbcc6b8..6f976d5ce8 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -345,6 +345,9 @@ class MatrixFederationHttpClient(object): path (str): The HTTP path. args (dict): A dictionary used to create query strings, defaults to None. + timeout (int): How long to try (in ms) the destination for before + giving up. None indicates no timeout and that the request will + be retried. Returns: Deferred: Succeeds when we get *any* HTTP response.