From f18373dc5d6c5431bbf79760818b6ebc3467c7ba Mon Sep 17 00:00:00 2001 From: Kenny Keslar Date: Wed, 26 Jul 2017 22:44:19 -0500 Subject: [PATCH 01/53] Fix iteration of requests_missing_keys; list doesn't have .values() Signed-off-by: Kenny Keslar --- synapse/crypto/keyring.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 1bb27edc0f..c900f4d6df 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -305,7 +305,7 @@ class Keyring(object): if not missing_keys: break - for verify_request in requests_missing_keys.values(): + for verify_request in requests_missing_keys: verify_request.deferred.errback(SynapseError( 401, "No key for %s with id %s" % ( From 5ed109d59f46c5185395f7c76050274fdd6abc15 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 19 Sep 2017 12:20:11 +0100 Subject: [PATCH 02/53] PoC for filtering spammy events (#2456) Demonstration of how you might add some hooks to filter out spammy events. --- synapse/events/spamcheck.py | 38 ++++++++++++++++++++++++ synapse/federation/federation_base.py | 42 ++++++++++++++++----------- synapse/handlers/message.py | 8 ++++- 3 files changed, 70 insertions(+), 18 deletions(-) create mode 100644 synapse/events/spamcheck.py diff --git a/synapse/events/spamcheck.py b/synapse/events/spamcheck.py new file mode 100644 index 0000000000..3eb4eab26a --- /dev/null +++ b/synapse/events/spamcheck.py @@ -0,0 +1,38 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 New Vector Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def check_event_for_spam(event): + """Checks if a given event is considered "spammy" by this server. + + If the server considers an event spammy, then it will be rejected if + sent by a local user. If it is sent by a user on another server, then + users + + Args: + event (synapse.events.EventBase): the event to be checked + + Returns: + bool: True if the event is spammy. + """ + if not hasattr(event, "content") or "body" not in event.content: + return False + + # for example: + # + # if "the third flower is green" in event.content["body"]: + # return True + + return False diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 2339cc9034..28eaab2cef 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -12,21 +12,15 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - - -from twisted.internet import defer - -from synapse.events.utils import prune_event - -from synapse.crypto.event_signing import check_event_content_hash - -from synapse.api.errors import SynapseError - -from synapse.util import unwrapFirstError -from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred - import logging +from synapse.api.errors import SynapseError +from synapse.crypto.event_signing import check_event_content_hash +from synapse.events import spamcheck +from synapse.events.utils import prune_event +from synapse.util import unwrapFirstError +from synapse.util.logcontext import preserve_context_over_deferred, preserve_fn +from twisted.internet import defer logger = logging.getLogger(__name__) @@ -117,12 +111,18 @@ class FederationBase(object): return self._check_sigs_and_hashes([pdu])[0] def _check_sigs_and_hashes(self, pdus): - """Throws a SynapseError if a PDU does not have the correct - signatures. + """Checks that each of the received events is correctly signed by the + sending server. + + Args: + pdus (list[FrozenEvent]): the events to be checked Returns: - FrozenEvent: Either the given event or it redacted if it failed the - content hash check. + list[Deferred]: for each input event, a deferred which: + * returns the original event if the checks pass + * returns a redacted version of the event (if the signature + matched but the hash did not) + * throws a SynapseError if the signature check failed. """ redacted_pdus = [ @@ -142,6 +142,14 @@ class FederationBase(object): pdu.event_id, pdu.get_pdu_json() ) return redacted + + if spamcheck.check_event_for_spam(pdu): + logger.warn( + "Event contains spam, redacting %s: %s", + pdu.event_id, pdu.get_pdu_json() + ) + return redacted + return pdu def errback(failure, pdu): diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index be4f123c54..da18bf23db 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -12,7 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +from synapse.events import spamcheck from twisted.internet import defer from synapse.api.constants import EventTypes, Membership @@ -321,6 +321,12 @@ class MessageHandler(BaseHandler): token_id=requester.access_token_id, txn_id=txn_id ) + + if spamcheck.check_event_for_spam(event): + raise SynapseError( + 403, "Spam is not permitted here", Codes.FORBIDDEN + ) + yield self.send_nonmember_event( requester, event, From 2eabdf3f9860c78598d026574807da463bf40f2e Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 19 Sep 2017 12:18:01 +0100 Subject: [PATCH 03/53] add some comments to on_exchange_third_party_invite_request --- synapse/handlers/federation.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 4669199b2d..2637f41dcd 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2090,6 +2090,14 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function def on_exchange_third_party_invite_request(self, origin, room_id, event_dict): + """Handle an exchange_third_party_invite request from a remote server + + The remote server will call this when it wants to turn a 3pid invite + into a normal m.room.member invite. + + Returns: + Deferred: resolves (to None) + """ builder = self.event_builder_factory.new(event_dict) message_handler = self.hs.get_handlers().message_handler @@ -2108,9 +2116,12 @@ class FederationHandler(BaseHandler): raise e yield self._check_signature(event, context) + # XXX we send the invite here, but send_membership_event also sends it, + # so we end up making two requests. I think this is redundant. returned_invite = yield self.send_invite(origin, event) # TODO: Make sure the signatures actually are correct. event.signatures.update(returned_invite.signatures) + member_handler = self.hs.get_handlers().room_member_handler yield member_handler.send_membership_event(None, event, context) From aa620d09a01c226d7a6fbc0d839d8abd347a2b2e Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 19 Sep 2017 16:08:14 +0100 Subject: [PATCH 04/53] Add a config option to block all room invites (#2457) - allows sysadmins the ability to lock down their servers so that people can't send their users room invites. --- synapse/api/auth.py | 8 ++++++++ synapse/config/server.py | 10 ++++++++++ synapse/handlers/federation.py | 3 +++ synapse/handlers/room_member.py | 22 ++++++++++++++++++++++ tests/utils.py | 1 + 5 files changed, 44 insertions(+) diff --git a/synapse/api/auth.py b/synapse/api/auth.py index e3da45b416..72858cca1f 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -519,6 +519,14 @@ class Auth(object): ) def is_server_admin(self, user): + """ Check if the given user is a local server admin. + + Args: + user (str): mxid of user to check + + Returns: + bool: True if the user is an admin + """ return self.store.is_server_admin(user) @defer.inlineCallbacks diff --git a/synapse/config/server.py b/synapse/config/server.py index 89d61a0503..c9a1715f1f 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -43,6 +43,12 @@ class ServerConfig(Config): self.filter_timeline_limit = config.get("filter_timeline_limit", -1) + # Whether we should block invites sent to users on this server + # (other than those sent by local server admins) + self.block_non_admin_invites = config.get( + "block_non_admin_invites", False, + ) + if self.public_baseurl is not None: if self.public_baseurl[-1] != '/': self.public_baseurl += '/' @@ -194,6 +200,10 @@ class ServerConfig(Config): # and sync operations. The default value is -1, means no upper limit. # filter_timeline_limit: 5000 + # Whether room invites to users on this server should be blocked + # (except those sent by local server admins). The default is False. + # block_non_admin_invites: True + # List of ports that Synapse should listen on, their purpose and their # configuration. listeners: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 2637f41dcd..18f87cad67 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1074,6 +1074,9 @@ class FederationHandler(BaseHandler): if is_blocked: raise SynapseError(403, "This room has been blocked on this server") + if self.hs.config.block_non_admin_invites: + raise SynapseError(403, "This server does not accept room invites") + membership = event.content.get("membership") if event.type != EventTypes.Member or membership != Membership.INVITE: raise SynapseError(400, "The event was not an m.room.member invite event") diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index b3f979b246..9a498c2d3e 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -191,6 +191,8 @@ class RoomMemberHandler(BaseHandler): if action in ["kick", "unban"]: effective_membership_state = "leave" + # if this is a join with a 3pid signature, we may need to turn a 3pid + # invite into a normal invite before we can handle the join. if third_party_signed is not None: replication = self.hs.get_replication_layer() yield replication.exchange_third_party_invite( @@ -208,6 +210,16 @@ class RoomMemberHandler(BaseHandler): if is_blocked: raise SynapseError(403, "This room has been blocked on this server") + if (effective_membership_state == "invite" and + self.hs.config.block_non_admin_invites): + is_requester_admin = yield self.auth.is_server_admin( + requester.user, + ) + if not is_requester_admin: + raise SynapseError( + 403, "Invites have been disabled on this server", + ) + latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) current_state_ids = yield self.state_handler.get_current_state_ids( room_id, latest_event_ids=latest_event_ids, @@ -471,6 +483,16 @@ class RoomMemberHandler(BaseHandler): requester, txn_id ): + if self.hs.config.block_non_admin_invites: + is_requester_admin = yield self.auth.is_server_admin( + requester.user, + ) + if not is_requester_admin: + raise SynapseError( + 403, "Invites have been disabled on this server", + Codes.FORBIDDEN, + ) + invitee = yield self._lookup_3pid( id_server, medium, address ) diff --git a/tests/utils.py b/tests/utils.py index 4f7e32b3ab..3c81a3e16d 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -56,6 +56,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs): config.worker_replication_url = "" config.worker_app = None config.email_enable_notifs = False + config.block_non_admin_invites = False config.use_frozen_dicts = True config.database_config = {"name": "sqlite3"} From 9864efa5321ad5afa522d9ecb3eb48e1f50fb852 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 19 Sep 2017 23:25:44 +0100 Subject: [PATCH 05/53] Fix concurrent server_key requests (#2458) Fix a bug where we could end up firing off multiple requests for server_keys for the same server at the same time. --- synapse/crypto/keyring.py | 4 ++- tests/crypto/test_keyring.py | 58 ++++++++++++++++++++++++++++++++++-- 2 files changed, 58 insertions(+), 4 deletions(-) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 51851d04e5..ebf4e2e7a6 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -201,7 +201,9 @@ class Keyring(object): server_name = verify_request.server_name request_id = id(verify_request) server_to_request_ids.setdefault(server_name, set()).add(request_id) - deferred.addBoth(remove_deferreds, server_name, verify_request) + verify_request.deferred.addBoth( + remove_deferreds, server_name, verify_request, + ) # Pass those keys to handle_key_deferred so that the json object # signatures can be verified diff --git a/tests/crypto/test_keyring.py b/tests/crypto/test_keyring.py index da2c9e44e7..2e5878f087 100644 --- a/tests/crypto/test_keyring.py +++ b/tests/crypto/test_keyring.py @@ -12,17 +12,27 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import signedjson +from mock import Mock +from synapse.api.errors import SynapseError from synapse.crypto import keyring +from synapse.util import async from synapse.util.logcontext import LoggingContext -from tests import utils, unittest +from tests import unittest, utils from twisted.internet import defer class KeyringTestCase(unittest.TestCase): @defer.inlineCallbacks def setUp(self): - self.hs = yield utils.setup_test_homeserver(handlers=None) + self.http_client = Mock() + self.hs = yield utils.setup_test_homeserver( + handlers=None, + http_client=self.http_client, + ) + self.hs.config.perspectives = { + "persp_server": {"k": "v"} + } @defer.inlineCallbacks def test_wait_for_previous_lookups(self): @@ -72,3 +82,45 @@ class KeyringTestCase(unittest.TestCase): # now the second wait should complete and restore our # loggingcontext. yield wait_2_deferred + + @defer.inlineCallbacks + def test_verify_json_objects_for_server_awaits_previous_requests(self): + key1 = signedjson.key.generate_signing_key(1) + + kr = keyring.Keyring(self.hs) + json1 = {} + signedjson.sign.sign_json(json1, "server1", key1) + + self.http_client.post_json.return_value = defer.Deferred() + + # start off a first set of lookups + res_deferreds = kr.verify_json_objects_for_server( + [("server1", json1), + ("server2", {}) + ] + ) + + # the unsigned json should be rejected pretty quickly + try: + yield res_deferreds[1] + self.assertFalse("unsigned json didn't cause a failure") + except SynapseError: + pass + + self.assertFalse(res_deferreds[0].called) + + # wait a tick for it to send the request to the perspectives server + # (it first tries the datastore) + yield async.sleep(0.005) + self.http_client.post_json.assert_called_once() + + # a second request for a server with outstanding requests should + # block rather than start a second call + self.http_client.post_json.reset_mock() + self.http_client.post_json.return_value = defer.Deferred() + + kr.verify_json_objects_for_server( + [("server1", json1)], + ) + yield async.sleep(0.005) + self.http_client.post_json.assert_not_called() From fcf2c0fd1aa4d85df0bdb43bc8411ad4ad988a6f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 20 Sep 2017 01:32:42 +0100 Subject: [PATCH 06/53] Remove redundant `preserve_fn` preserve_fn is a no-op unless the wrapped function returns a Deferred. verify_json_objects_for_server returns a list, so this is doing nothing. --- synapse/federation/federation_base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 28eaab2cef..cabed33f74 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -19,7 +19,7 @@ from synapse.crypto.event_signing import check_event_content_hash from synapse.events import spamcheck from synapse.events.utils import prune_event from synapse.util import unwrapFirstError -from synapse.util.logcontext import preserve_context_over_deferred, preserve_fn +from synapse.util.logcontext import preserve_context_over_deferred from twisted.internet import defer logger = logging.getLogger(__name__) @@ -130,7 +130,7 @@ class FederationBase(object): for pdu in pdus ] - deferreds = preserve_fn(self.keyring.verify_json_objects_for_server)([ + deferreds = self.keyring.verify_json_objects_for_server([ (p.origin, p.get_pdu_json()) for p in redacted_pdus ]) From e76d1135dd26305e0ff4c5d8e41b9dff204d72cf Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 20 Sep 2017 01:32:42 +0100 Subject: [PATCH 07/53] Invalidate signing key cache when we gat an update This might make the cache slightly more efficient. --- synapse/storage/keys.py | 41 ++++++++++++++++++++++++----------------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index 3b5e0a4fb9..87aeaf71d6 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -113,30 +113,37 @@ class KeyStore(SQLBaseStore): keys[key_id] = key defer.returnValue(keys) - @defer.inlineCallbacks def store_server_verify_key(self, server_name, from_server, time_now_ms, verify_key): """Stores a NACL verification key for the given server. Args: server_name (str): The name of the server. - key_id (str): The version of the key for the server. from_server (str): Where the verification key was looked up - ts_now_ms (int): The time now in milliseconds - verification_key (VerifyKey): The NACL verify key. + time_now_ms (int): The time now in milliseconds + verify_key (nacl.signing.VerifyKey): The NACL verify key. """ - yield self._simple_upsert( - table="server_signature_keys", - keyvalues={ - "server_name": server_name, - "key_id": "%s:%s" % (verify_key.alg, verify_key.version), - }, - values={ - "from_server": from_server, - "ts_added_ms": time_now_ms, - "verify_key": buffer(verify_key.encode()), - }, - desc="store_server_verify_key", - ) + key_id = "%s:%s" % (verify_key.alg, verify_key.version) + + def _txn(txn): + self._simple_upsert_txn( + txn, + table="server_signature_keys", + keyvalues={ + "server_name": server_name, + "key_id": key_id, + }, + values={ + "from_server": from_server, + "ts_added_ms": time_now_ms, + "verify_key": buffer(verify_key.encode()), + }, + ) + txn.call_after( + self._get_server_verify_key.invalidate, + (server_name, key_id) + ) + + return self.runInteraction("store_server_verify_key", _txn) def store_server_keys_json(self, server_name, key_id, from_server, ts_now_ms, ts_expires_ms, key_json_bytes): From dd1ea9763a79f49403964667114a60f71ac1f0bf Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 20 Sep 2017 01:32:42 +0100 Subject: [PATCH 08/53] Fix incorrect key_ids in error message --- synapse/crypto/keyring.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index ebf4e2e7a6..7d142c1b96 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -144,7 +144,7 @@ class Keyring(object): ) raise SynapseError( 401, - "No key for %s with id %s" % (server_name, key_ids), + "No key for %s with id %s" % (server_name, verify_request.key_ids), Codes.UNAUTHORIZED, ) From 2d511defd9aa85b56222381efedc63c9f6045087 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 20 Sep 2017 01:32:42 +0100 Subject: [PATCH 09/53] pull out handle_key_deferred to top level There's no need for this to be a nested definition; pulling it out not only makes it more efficient, but makes it easier to check that it's not accessing any local variables it shouldn't be. --- synapse/crypto/keyring.py | 87 ++++++++++++++++++++------------------- 1 file changed, 44 insertions(+), 43 deletions(-) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 7d142c1b96..0033ba06ba 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -122,48 +122,6 @@ class Keyring(object): verify_requests.append(verify_request) - @defer.inlineCallbacks - def handle_key_deferred(verify_request): - server_name = verify_request.server_name - try: - _, key_id, verify_key = yield verify_request.deferred - except IOError as e: - logger.warn( - "Got IOError when downloading keys for %s: %s %s", - server_name, type(e).__name__, str(e.message), - ) - raise SynapseError( - 502, - "Error downloading keys for %s" % (server_name,), - Codes.UNAUTHORIZED, - ) - except Exception as e: - logger.exception( - "Got Exception when downloading keys for %s: %s %s", - server_name, type(e).__name__, str(e.message), - ) - raise SynapseError( - 401, - "No key for %s with id %s" % (server_name, verify_request.key_ids), - Codes.UNAUTHORIZED, - ) - - json_object = verify_request.json_object - - logger.debug("Got key %s %s:%s for server %s, verifying" % ( - key_id, verify_key.alg, verify_key.version, server_name, - )) - try: - verify_signed_json(json_object, server_name, verify_key) - except: - raise SynapseError( - 401, - "Invalid signature for server %s with key %s:%s" % ( - server_name, verify_key.alg, verify_key.version - ), - Codes.UNAUTHORIZED, - ) - server_to_deferred = { server_name: defer.Deferred() for server_name, _ in server_and_json @@ -208,7 +166,7 @@ class Keyring(object): # Pass those keys to handle_key_deferred so that the json object # signatures can be verified return [ - preserve_context_over_fn(handle_key_deferred, verify_request) + preserve_context_over_fn(_handle_key_deferred, verify_request) for verify_request in verify_requests ] @@ -740,3 +698,46 @@ class Keyring(object): ], consumeErrors=True, ).addErrback(unwrapFirstError)) + + +@defer.inlineCallbacks +def _handle_key_deferred(verify_request): + server_name = verify_request.server_name + try: + _, key_id, verify_key = yield verify_request.deferred + except IOError as e: + logger.warn( + "Got IOError when downloading keys for %s: %s %s", + server_name, type(e).__name__, str(e.message), + ) + raise SynapseError( + 502, + "Error downloading keys for %s" % (server_name,), + Codes.UNAUTHORIZED, + ) + except Exception as e: + logger.exception( + "Got Exception when downloading keys for %s: %s %s", + server_name, type(e).__name__, str(e.message), + ) + raise SynapseError( + 401, + "No key for %s with id %s" % (server_name, verify_request.key_ids), + Codes.UNAUTHORIZED, + ) + + json_object = verify_request.json_object + + logger.debug("Got key %s %s:%s for server %s, verifying" % ( + key_id, verify_key.alg, verify_key.version, server_name, + )) + try: + verify_signed_json(json_object, server_name, verify_key) + except: + raise SynapseError( + 401, + "Invalid signature for server %s with key %s:%s" % ( + server_name, verify_key.alg, verify_key.version + ), + Codes.UNAUTHORIZED, + ) From fde63b880d32937b52a80815a08342449d9c4842 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 20 Sep 2017 01:32:42 +0100 Subject: [PATCH 10/53] Replace `server_and_json` with `verify_requests` This is a precursor to factoring some of this code out. --- synapse/crypto/keyring.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 0033ba06ba..32b107b17d 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -123,8 +123,8 @@ class Keyring(object): verify_requests.append(verify_request) server_to_deferred = { - server_name: defer.Deferred() - for server_name, _ in server_and_json + rq.server_name: defer.Deferred() + for rq in verify_requests } with PreserveLoggingContext(): @@ -132,7 +132,7 @@ class Keyring(object): # We want to wait for any previous lookups to complete before # proceeding. wait_on_deferred = self.wait_for_previous_lookups( - [server_name for server_name, _ in server_and_json], + [rq.server_name for rq in verify_requests], server_to_deferred, ) From 3b98439ecaab4707c2224d7912b3f4513c2af8b7 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 20 Sep 2017 01:32:42 +0100 Subject: [PATCH 11/53] Factor out _start_key_lookups ... to make it easier to see what's going on. --- synapse/crypto/keyring.py | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 32b107b17d..105de2b58b 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -122,6 +122,23 @@ class Keyring(object): verify_requests.append(verify_request) + self._start_key_lookups(verify_requests) + + # Pass those keys to handle_key_deferred so that the json object + # signatures can be verified + return [ + preserve_context_over_fn(_handle_key_deferred, rq) + for rq in verify_requests + ] + + def _start_key_lookups(self, verify_requests): + """Sets off the key fetches for each verify request + + Once each fetch completes, verify_request.deferred will be resolved. + + Args: + verify_requests (List[VerifyKeyRequest]): + """ server_to_deferred = { rq.server_name: defer.Deferred() for rq in verify_requests @@ -163,13 +180,6 @@ class Keyring(object): remove_deferreds, server_name, verify_request, ) - # Pass those keys to handle_key_deferred so that the json object - # signatures can be verified - return [ - preserve_context_over_fn(_handle_key_deferred, verify_request) - for verify_request in verify_requests - ] - @defer.inlineCallbacks def wait_for_previous_lookups(self, server_names, server_to_deferred): """Waits for any previous key lookups for the given servers to finish. From 2a4b9ea233cfffa556fa63a37cffb24bfe133d82 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 20 Sep 2017 01:32:42 +0100 Subject: [PATCH 12/53] Consistency for how verify_request.deferred is called Define that it is run with no log context, and make sure that happens. If we aren't careful to reset the logcontext, we can't bung the deferreds into defer.gatherResults etc. We don't actually do that directly, but we *do* resolve other deferreds from affected callbacks (notably the server_to_deferred map in _start_key_lookups), and those *do* get passed into defer.gatherResults. It turns out that this way ends up being least confusing. --- synapse/crypto/keyring.py | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 105de2b58b..22bb325cfd 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -57,7 +57,8 @@ Attributes: json_object(dict): The JSON object to verify. deferred(twisted.internet.defer.Deferred): A deferred (server_name, key_id, verify_key) tuple that resolves when - a verify key has been fetched + a verify key has been fetched. The deferreds' callbacks are run with no + logcontext. """ @@ -284,19 +285,21 @@ class Keyring(object): if not missing_keys: break - for verify_request in requests_missing_keys.values(): - verify_request.deferred.errback(SynapseError( - 401, - "No key for %s with id %s" % ( - verify_request.server_name, verify_request.key_ids, - ), - Codes.UNAUTHORIZED, - )) + with PreserveLoggingContext(): + for verify_request in requests_missing_keys.values(): + verify_request.deferred.errback(SynapseError( + 401, + "No key for %s with id %s" % ( + verify_request.server_name, verify_request.key_ids, + ), + Codes.UNAUTHORIZED, + )) def on_err(err): - for verify_request in verify_requests: - if not verify_request.deferred.called: - verify_request.deferred.errback(err) + with PreserveLoggingContext(): + for verify_request in verify_requests: + if not verify_request.deferred.called: + verify_request.deferred.errback(err) do_iterations().addErrback(on_err) @@ -714,7 +717,8 @@ class Keyring(object): def _handle_key_deferred(verify_request): server_name = verify_request.server_name try: - _, key_id, verify_key = yield verify_request.deferred + with PreserveLoggingContext(): + _, key_id, verify_key = yield verify_request.deferred except IOError as e: logger.warn( "Got IOError when downloading keys for %s: %s %s", From afbd773dc66d43d066d5a0b4639075a2d09cb4e5 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 20 Sep 2017 01:32:42 +0100 Subject: [PATCH 13/53] Add some comments to _start_key_lookups --- synapse/crypto/keyring.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 22bb325cfd..d7fd831bf9 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -140,6 +140,12 @@ class Keyring(object): Args: verify_requests (List[VerifyKeyRequest]): """ + + # create a deferred for each server we're going to look up the keys + # for; we'll resolve them once we have completed our lookups. + # These will be passed into wait_for_previous_lookups to block + # any other lookups until we have finished. + # The deferreds are called with no logcontext. server_to_deferred = { rq.server_name: defer.Deferred() for rq in verify_requests @@ -162,6 +168,8 @@ class Keyring(object): # When we've finished fetching all the keys for a given server_name, # resolve the deferred passed to `wait_for_previous_lookups` so that # any lookups waiting will proceed. + # + # map from server name to a set of request ids server_to_request_ids = {} def remove_deferreds(res, server_name, verify_request): From abdefb8a01bf67b3055e9fbe52bb11a02ffd8d9a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 20 Sep 2017 01:32:42 +0100 Subject: [PATCH 14/53] Fix potential race in _start_key_lookups If the verify_request.deferred has already completed, then `remove_deferreds` will be called immediately. It therefore might resolve the server_to_deferred deferred while there are still other requests for that server in flight. To avoid that, we should build the complete list of requests, and *then* add the callbacks. --- synapse/crypto/keyring.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index d7fd831bf9..0e381c4710 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -172,7 +172,13 @@ class Keyring(object): # map from server name to a set of request ids server_to_request_ids = {} - def remove_deferreds(res, server_name, verify_request): + for verify_request in verify_requests: + server_name = verify_request.server_name + request_id = id(verify_request) + server_to_request_ids.setdefault(server_name, set()).add(request_id) + + def remove_deferreds(res, verify_request): + server_name = verify_request.server_name request_id = id(verify_request) server_to_request_ids[server_name].discard(request_id) if not server_to_request_ids[server_name]: @@ -182,11 +188,8 @@ class Keyring(object): return res for verify_request in verify_requests: - server_name = verify_request.server_name - request_id = id(verify_request) - server_to_request_ids.setdefault(server_name, set()).add(request_id) verify_request.deferred.addBoth( - remove_deferreds, server_name, verify_request, + remove_deferreds, verify_request, ) @defer.inlineCallbacks From c5b0e9f48542516a4fa82247c81e499894340cf5 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 20 Sep 2017 01:32:42 +0100 Subject: [PATCH 15/53] Turn _start_key_lookups into an inlineCallbacks function ... which means that logcontexts can be correctly preserved for the stuff it does. get_server_verify_keys is now called with the logcontext, so needs to preserve_fn when it fires off its nested inlineCallbacks function. Also renames get_server_verify_keys to reflect the fact it's meant to be private. --- synapse/crypto/keyring.py | 79 +++++++++++++++++++-------------------- 1 file changed, 38 insertions(+), 41 deletions(-) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 0e381c4710..7e4cef13c1 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -123,7 +123,7 @@ class Keyring(object): verify_requests.append(verify_request) - self._start_key_lookups(verify_requests) + preserve_fn(self._start_key_lookups)(verify_requests) # Pass those keys to handle_key_deferred so that the json object # signatures can be verified @@ -132,6 +132,7 @@ class Keyring(object): for rq in verify_requests ] + @defer.inlineCallbacks def _start_key_lookups(self, verify_requests): """Sets off the key fetches for each verify request @@ -151,47 +152,43 @@ class Keyring(object): for rq in verify_requests } - with PreserveLoggingContext(): + # We want to wait for any previous lookups to complete before + # proceeding. + yield self.wait_for_previous_lookups( + [rq.server_name for rq in verify_requests], + server_to_deferred, + ) - # We want to wait for any previous lookups to complete before - # proceeding. - wait_on_deferred = self.wait_for_previous_lookups( - [rq.server_name for rq in verify_requests], - server_to_deferred, + # Actually start fetching keys. + self._get_server_verify_keys(verify_requests) + + # When we've finished fetching all the keys for a given server_name, + # resolve the deferred passed to `wait_for_previous_lookups` so that + # any lookups waiting will proceed. + # + # map from server name to a set of request ids + server_to_request_ids = {} + + for verify_request in verify_requests: + server_name = verify_request.server_name + request_id = id(verify_request) + server_to_request_ids.setdefault(server_name, set()).add(request_id) + + def remove_deferreds(res, verify_request): + server_name = verify_request.server_name + request_id = id(verify_request) + server_to_request_ids[server_name].discard(request_id) + if not server_to_request_ids[server_name]: + d = server_to_deferred.pop(server_name, None) + if d: + d.callback(None) + return res + + for verify_request in verify_requests: + verify_request.deferred.addBoth( + remove_deferreds, verify_request, ) - # Actually start fetching keys. - wait_on_deferred.addBoth( - lambda _: self.get_server_verify_keys(verify_requests) - ) - - # When we've finished fetching all the keys for a given server_name, - # resolve the deferred passed to `wait_for_previous_lookups` so that - # any lookups waiting will proceed. - # - # map from server name to a set of request ids - server_to_request_ids = {} - - for verify_request in verify_requests: - server_name = verify_request.server_name - request_id = id(verify_request) - server_to_request_ids.setdefault(server_name, set()).add(request_id) - - def remove_deferreds(res, verify_request): - server_name = verify_request.server_name - request_id = id(verify_request) - server_to_request_ids[server_name].discard(request_id) - if not server_to_request_ids[server_name]: - d = server_to_deferred.pop(server_name, None) - if d: - d.callback(None) - return res - - for verify_request in verify_requests: - verify_request.deferred.addBoth( - remove_deferreds, verify_request, - ) - @defer.inlineCallbacks def wait_for_previous_lookups(self, server_names, server_to_deferred): """Waits for any previous key lookups for the given servers to finish. @@ -227,7 +224,7 @@ class Keyring(object): self.key_downloads[server_name] = deferred deferred.addBoth(rm, server_name) - def get_server_verify_keys(self, verify_requests): + def _get_server_verify_keys(self, verify_requests): """Tries to find at least one key for each verify request For each verify_request, verify_request.deferred is called back with @@ -312,7 +309,7 @@ class Keyring(object): if not verify_request.deferred.called: verify_request.deferred.errback(err) - do_iterations().addErrback(on_err) + preserve_fn(do_iterations)().addErrback(on_err) @defer.inlineCallbacks def get_keys_from_store(self, server_name_and_key_ids): From c5c24c239b63d06a6e312d86c338da60cfcee814 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 20 Sep 2017 01:32:42 +0100 Subject: [PATCH 16/53] Fix logcontext handling in verify_json_objects_for_server preserve_context_over_fn is essentially broken, because (a) it pointlessly drops the current logcontext before calling its wrapped function, which means we don't get any useful logcontexts for _handle_key_deferred; (b) it wraps the resulting deferred in a _PreservingContextDeferred, which is very dangerous because you then can't yield on it without leaking context back into the reactor. Instead, let's specify that the resultant deferreds call their callbacks with no logcontext. --- synapse/crypto/keyring.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 7e4cef13c1..2a1d383078 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -18,7 +18,7 @@ from synapse.crypto.keyclient import fetch_server_key from synapse.api.errors import SynapseError, Codes from synapse.util import unwrapFirstError, logcontext from synapse.util.logcontext import ( - preserve_context_over_fn, PreserveLoggingContext, + PreserveLoggingContext, preserve_fn ) from synapse.util.metrics import Measure @@ -83,9 +83,11 @@ class Keyring(object): self.key_downloads = {} def verify_json_for_server(self, server_name, json_object): - return self.verify_json_objects_for_server( - [(server_name, json_object)] - )[0] + return logcontext.make_deferred_yieldable( + self.verify_json_objects_for_server( + [(server_name, json_object)] + )[0] + ) def verify_json_objects_for_server(self, server_and_json): """Bulk verifies signatures of json objects, bulk fetching keys as @@ -95,8 +97,10 @@ class Keyring(object): server_and_json (list): List of pairs of (server_name, json_object) Returns: - list of deferreds indicating success or failure to verify each - json object's signature for the given server_name. + List: for each input pair, a deferred indicating success + or failure to verify each json object's signature for the given + server_name. The deferreds run their callbacks in the sentinel + logcontext. """ verify_requests = [] @@ -127,9 +131,9 @@ class Keyring(object): # Pass those keys to handle_key_deferred so that the json object # signatures can be verified + handle = preserve_fn(_handle_key_deferred) return [ - preserve_context_over_fn(_handle_key_deferred, rq) - for rq in verify_requests + handle(rq) for rq in verify_requests ] @defer.inlineCallbacks From 72472456d82d956d957c4a68c23554f4b43eec54 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 20 Sep 2017 01:32:42 +0100 Subject: [PATCH 17/53] Add some more tests for Keyring --- tests/crypto/test_keyring.py | 177 +++++++++++++++++++++++++++-------- 1 file changed, 140 insertions(+), 37 deletions(-) diff --git a/tests/crypto/test_keyring.py b/tests/crypto/test_keyring.py index 2e5878f087..570312da84 100644 --- a/tests/crypto/test_keyring.py +++ b/tests/crypto/test_keyring.py @@ -12,39 +12,72 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import signedjson +import time + +import signedjson.key +import signedjson.sign from mock import Mock from synapse.api.errors import SynapseError from synapse.crypto import keyring -from synapse.util import async +from synapse.util import async, logcontext from synapse.util.logcontext import LoggingContext from tests import unittest, utils from twisted.internet import defer +class MockPerspectiveServer(object): + def __init__(self): + self.server_name = "mock_server" + self.key = signedjson.key.generate_signing_key(0) + + def get_verify_keys(self): + vk = signedjson.key.get_verify_key(self.key) + return { + "%s:%s" % (vk.alg, vk.version): vk, + } + + def get_signed_key(self, server_name, verify_key): + key_id = "%s:%s" % (verify_key.alg, verify_key.version) + res = { + "server_name": server_name, + "old_verify_keys": {}, + "valid_until_ts": time.time() * 1000 + 3600, + "verify_keys": { + key_id: { + "key": signedjson.key.encode_verify_key_base64(verify_key) + } + } + } + signedjson.sign.sign_json(res, self.server_name, self.key) + return res + + class KeyringTestCase(unittest.TestCase): @defer.inlineCallbacks def setUp(self): + self.mock_perspective_server = MockPerspectiveServer() self.http_client = Mock() self.hs = yield utils.setup_test_homeserver( handlers=None, http_client=self.http_client, ) self.hs.config.perspectives = { - "persp_server": {"k": "v"} + self.mock_perspective_server.server_name: + self.mock_perspective_server.get_verify_keys() } + def check_context(self, _, expected): + self.assertEquals( + getattr(LoggingContext.current_context(), "test_key", None), + expected + ) + @defer.inlineCallbacks def test_wait_for_previous_lookups(self): sentinel_context = LoggingContext.current_context() kr = keyring.Keyring(self.hs) - def check_context(_, expected): - self.assertEquals( - LoggingContext.current_context().test_key, expected - ) - lookup_1_deferred = defer.Deferred() lookup_2_deferred = defer.Deferred() @@ -60,7 +93,7 @@ class KeyringTestCase(unittest.TestCase): self.assertTrue(wait_1_deferred.called) # ... so we should have preserved the LoggingContext. self.assertIs(LoggingContext.current_context(), context_one) - wait_1_deferred.addBoth(check_context, "one") + wait_1_deferred.addBoth(self.check_context, "one") with LoggingContext("two") as context_two: context_two.test_key = "two" @@ -74,7 +107,7 @@ class KeyringTestCase(unittest.TestCase): self.assertFalse(wait_2_deferred.called) # ... so we should have reset the LoggingContext. self.assertIs(LoggingContext.current_context(), sentinel_context) - wait_2_deferred.addBoth(check_context, "two") + wait_2_deferred.addBoth(self.check_context, "two") # let the first lookup complete (in the sentinel context) lookup_1_deferred.callback(None) @@ -89,38 +122,108 @@ class KeyringTestCase(unittest.TestCase): kr = keyring.Keyring(self.hs) json1 = {} - signedjson.sign.sign_json(json1, "server1", key1) + signedjson.sign.sign_json(json1, "server10", key1) - self.http_client.post_json.return_value = defer.Deferred() + persp_resp = { + "server_keys": [ + self.mock_perspective_server.get_signed_key( + "server10", + signedjson.key.get_verify_key(key1) + ), + ] + } + persp_deferred = defer.Deferred() - # start off a first set of lookups - res_deferreds = kr.verify_json_objects_for_server( - [("server1", json1), - ("server2", {}) - ] + @defer.inlineCallbacks + def get_perspectives(**kwargs): + self.assertEquals( + LoggingContext.current_context().test_key, "11", + ) + with logcontext.PreserveLoggingContext(): + yield persp_deferred + defer.returnValue(persp_resp) + self.http_client.post_json.side_effect = get_perspectives + + with LoggingContext("11") as context_11: + context_11.test_key = "11" + + # start off a first set of lookups + res_deferreds = kr.verify_json_objects_for_server( + [("server10", json1), + ("server11", {}) + ] + ) + + # the unsigned json should be rejected pretty quickly + self.assertTrue(res_deferreds[1].called) + try: + yield res_deferreds[1] + self.assertFalse("unsigned json didn't cause a failure") + except SynapseError: + pass + + self.assertFalse(res_deferreds[0].called) + res_deferreds[0].addBoth(self.check_context, None) + + # wait a tick for it to send the request to the perspectives server + # (it first tries the datastore) + yield async.sleep(0.005) + self.http_client.post_json.assert_called_once() + + self.assertIs(LoggingContext.current_context(), context_11) + + context_12 = LoggingContext("12") + context_12.test_key = "12" + with logcontext.PreserveLoggingContext(context_12): + # a second request for a server with outstanding requests + # should block rather than start a second call + self.http_client.post_json.reset_mock() + self.http_client.post_json.return_value = defer.Deferred() + + res_deferreds_2 = kr.verify_json_objects_for_server( + [("server10", json1)], + ) + yield async.sleep(0.005) + self.http_client.post_json.assert_not_called() + res_deferreds_2[0].addBoth(self.check_context, None) + + # complete the first request + with logcontext.PreserveLoggingContext(): + persp_deferred.callback(persp_resp) + self.assertIs(LoggingContext.current_context(), context_11) + + with logcontext.PreserveLoggingContext(): + yield res_deferreds[0] + yield res_deferreds_2[0] + + @defer.inlineCallbacks + def test_verify_json_for_server(self): + kr = keyring.Keyring(self.hs) + + key1 = signedjson.key.generate_signing_key(1) + yield self.hs.datastore.store_server_verify_key( + "server9", "", time.time() * 1000, + signedjson.key.get_verify_key(key1), ) + json1 = {} + signedjson.sign.sign_json(json1, "server9", key1) - # the unsigned json should be rejected pretty quickly - try: - yield res_deferreds[1] - self.assertFalse("unsigned json didn't cause a failure") - except SynapseError: - pass + sentinel_context = LoggingContext.current_context() - self.assertFalse(res_deferreds[0].called) + with LoggingContext("one") as context_one: + context_one.test_key = "one" - # wait a tick for it to send the request to the perspectives server - # (it first tries the datastore) - yield async.sleep(0.005) - self.http_client.post_json.assert_called_once() + defer = kr.verify_json_for_server("server9", {}) + try: + yield defer + self.fail("should fail on unsigned json") + except SynapseError: + pass + self.assertIs(LoggingContext.current_context(), context_one) - # a second request for a server with outstanding requests should - # block rather than start a second call - self.http_client.post_json.reset_mock() - self.http_client.post_json.return_value = defer.Deferred() + defer = kr.verify_json_for_server("server9", json1) + self.assertFalse(defer.called) + self.assertIs(LoggingContext.current_context(), sentinel_context) + yield defer - kr.verify_json_objects_for_server( - [("server1", json1)], - ) - yield async.sleep(0.005) - self.http_client.post_json.assert_not_called() + self.assertIs(LoggingContext.current_context(), context_one) From 6de74ea6d7394b63c9475e9dfff943188a9ed73b Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 20 Sep 2017 01:32:42 +0100 Subject: [PATCH 18/53] Fix logcontexts in _check_sigs_and_hashes --- synapse/federation/federation_base.py | 108 ++++++++++++------------ synapse/federation/federation_client.py | 8 +- 2 files changed, 59 insertions(+), 57 deletions(-) diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index cabed33f74..babd9ea078 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -18,8 +18,7 @@ from synapse.api.errors import SynapseError from synapse.crypto.event_signing import check_event_content_hash from synapse.events import spamcheck from synapse.events.utils import prune_event -from synapse.util import unwrapFirstError -from synapse.util.logcontext import preserve_context_over_deferred +from synapse.util import unwrapFirstError, logcontext from twisted.internet import defer logger = logging.getLogger(__name__) @@ -51,56 +50,52 @@ class FederationBase(object): """ deferreds = self._check_sigs_and_hashes(pdus) - def callback(pdu): - return pdu + @defer.inlineCallbacks + def handle_check_result(pdu, deferred): + try: + res = yield logcontext.make_deferred_yieldable(deferred) + except SynapseError: + res = None - def errback(failure, pdu): - failure.trap(SynapseError) - return None - - def try_local_db(res, pdu): if not res: # Check local db. - return self.store.get_event( + res = yield self.store.get_event( pdu.event_id, allow_rejected=True, allow_none=True, ) - return res - def try_remote(res, pdu): if not res and pdu.origin != origin: - return self.get_pdu( - destinations=[pdu.origin], - event_id=pdu.event_id, - outlier=outlier, - timeout=10000, - ).addErrback(lambda e: None) - return res + try: + res = yield self.get_pdu( + destinations=[pdu.origin], + event_id=pdu.event_id, + outlier=outlier, + timeout=10000, + ) + except SynapseError: + pass - def warn(res, pdu): if not res: logger.warn( "Failed to find copy of %s with valid signature", pdu.event_id, ) - return res - for pdu, deferred in zip(pdus, deferreds): - deferred.addCallbacks( - callback, errback, errbackArgs=[pdu] - ).addCallback( - try_local_db, pdu - ).addCallback( - try_remote, pdu - ).addCallback( - warn, pdu + defer.returnValue(res) + + handle = logcontext.preserve_fn(handle_check_result) + deferreds2 = [ + handle(pdu, deferred) + for pdu, deferred in zip(pdus, deferreds) + ] + + valid_pdus = yield logcontext.make_deferred_yieldable( + defer.gatherResults( + deferreds2, + consumeErrors=True, ) - - valid_pdus = yield preserve_context_over_deferred(defer.gatherResults( - deferreds, - consumeErrors=True - )).addErrback(unwrapFirstError) + ).addErrback(unwrapFirstError) if include_none: defer.returnValue(valid_pdus) @@ -108,7 +103,9 @@ class FederationBase(object): defer.returnValue([p for p in valid_pdus if p]) def _check_sigs_and_hash(self, pdu): - return self._check_sigs_and_hashes([pdu])[0] + return logcontext.make_deferred_yieldable( + self._check_sigs_and_hashes([pdu])[0], + ) def _check_sigs_and_hashes(self, pdus): """Checks that each of the received events is correctly signed by the @@ -123,6 +120,7 @@ class FederationBase(object): * returns a redacted version of the event (if the signature matched but the hash did not) * throws a SynapseError if the signature check failed. + The deferreds run their callbacks in the sentinel logcontext. """ redacted_pdus = [ @@ -135,29 +133,33 @@ class FederationBase(object): for p in redacted_pdus ]) + ctx = logcontext.LoggingContext.current_context() + def callback(_, pdu, redacted): - if not check_event_content_hash(pdu): - logger.warn( - "Event content has been tampered, redacting %s: %s", - pdu.event_id, pdu.get_pdu_json() - ) - return redacted + with logcontext.PreserveLoggingContext(ctx): + if not check_event_content_hash(pdu): + logger.warn( + "Event content has been tampered, redacting %s: %s", + pdu.event_id, pdu.get_pdu_json() + ) + return redacted - if spamcheck.check_event_for_spam(pdu): - logger.warn( - "Event contains spam, redacting %s: %s", - pdu.event_id, pdu.get_pdu_json() - ) - return redacted + if spamcheck.check_event_for_spam(pdu): + logger.warn( + "Event contains spam, redacting %s: %s", + pdu.event_id, pdu.get_pdu_json() + ) + return redacted - return pdu + return pdu def errback(failure, pdu): failure.trap(SynapseError) - logger.warn( - "Signature check failed for %s", - pdu.event_id, - ) + with logcontext.PreserveLoggingContext(ctx): + logger.warn( + "Signature check failed for %s", + pdu.event_id, + ) return failure for deferred, pdu, redacted in zip(deferreds, pdus, redacted_pdus): diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 861441708b..7c5e5d957f 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -22,7 +22,7 @@ from synapse.api.constants import Membership from synapse.api.errors import ( CodeMessageException, HttpResponseException, SynapseError, ) -from synapse.util import unwrapFirstError +from synapse.util import unwrapFirstError, logcontext from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.logutils import log_function from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred @@ -189,10 +189,10 @@ class FederationClient(FederationBase): ] # FIXME: We should handle signature failures more gracefully. - pdus[:] = yield preserve_context_over_deferred(defer.gatherResults( + pdus[:] = yield logcontext.make_deferred_yieldable(defer.gatherResults( self._check_sigs_and_hashes(pdus), consumeErrors=True, - )).addErrback(unwrapFirstError) + ).addErrback(unwrapFirstError)) defer.returnValue(pdus) @@ -252,7 +252,7 @@ class FederationClient(FederationBase): pdu = pdu_list[0] # Check signatures are correct. - signed_pdu = yield self._check_sigs_and_hashes([pdu])[0] + signed_pdu = yield self._check_sigs_and_hash(pdu) break From 3166ed55b23d0939f08337336439d9222117c9e6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 22 Sep 2017 14:44:17 +0100 Subject: [PATCH 19/53] Fix device list when rejoining room (#2461) --- synapse/handlers/sync.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index bb78c25ee5..af1b527840 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -980,7 +980,18 @@ class SyncHandler(object): # We want to figure out if we joined the room at some point since # the last sync (even if we have since left). This is to make sure # we do send down the room, and with full state, where necessary + old_state_ids = None + if room_id in joined_room_ids and non_joins: + # Always include if the user (re)joined the room, especially + # important so that device list changes are calculated correctly. + # If there are non join member events, but we are still in the room, + # then the user must have left and joined + newly_joined_rooms.append(room_id) + + # User is in the room so we don't need to do the invite/leave checks + continue + if room_id in joined_room_ids or has_join: old_state_ids = yield self.get_state_at(room_id, since_token) old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None) @@ -992,8 +1003,9 @@ class SyncHandler(object): if not old_mem_ev or old_mem_ev.membership != Membership.JOIN: newly_joined_rooms.append(room_id) - if room_id in joined_room_ids: - continue + # If user is in the room then we don't need to do the invite/leave checks + if room_id in joined_room_ids: + continue if not non_joins: continue From f496399ac4a54410a88d3aba8fe66b54e74bc3cf Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Fri, 22 Sep 2017 15:34:14 +0100 Subject: [PATCH 20/53] fix thinko'd docstring --- synapse/events/spamcheck.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/events/spamcheck.py b/synapse/events/spamcheck.py index 3eb4eab26a..56fa9e556e 100644 --- a/synapse/events/spamcheck.py +++ b/synapse/events/spamcheck.py @@ -19,7 +19,7 @@ def check_event_for_spam(event): If the server considers an event spammy, then it will be rejected if sent by a local user. If it is sent by a user on another server, then - users + users receive a blank event. Args: event (synapse.events.EventBase): the event to be checked From f65e31d22fe9a0b07053ee15004e106ca787048b Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 22 Sep 2017 20:26:47 +0100 Subject: [PATCH 21/53] Do an AAAA lookup on SRV record targets (#2462) Support SRV records which point at AAAA records, as well as A records. Fixes https://github.com/matrix-org/synapse/issues/2405 --- synapse/http/endpoint.py | 116 ++++++++++++++++++++++++++++++++------- tests/test_dns.py | 26 +++++++-- 2 files changed, 118 insertions(+), 24 deletions(-) diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py index d8923c9abb..241b17f2cb 100644 --- a/synapse/http/endpoint.py +++ b/synapse/http/endpoint.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import socket from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS from twisted.internet import defer, reactor @@ -30,7 +31,10 @@ logger = logging.getLogger(__name__) SERVER_CACHE = {} - +# our record of an individual server which can be tried to reach a destination. +# +# "host" is actually a dotted-quad or ipv6 address string. Except when there's +# no SRV record, in which case it is the original hostname. _Server = collections.namedtuple( "_Server", "priority weight host port expires" ) @@ -219,9 +223,10 @@ class SRVClientEndpoint(object): return self.default_server else: raise ConnectError( - "Not server available for %s" % self.service_name + "No server available for %s" % self.service_name ) + # look for all servers with the same priority min_priority = self.servers[0].priority weight_indexes = list( (index, server.weight + 1) @@ -231,11 +236,22 @@ class SRVClientEndpoint(object): total_weight = sum(weight for index, weight in weight_indexes) target_weight = random.randint(0, total_weight) - for index, weight in weight_indexes: target_weight -= weight if target_weight <= 0: server = self.servers[index] + # XXX: this looks totally dubious: + # + # (a) we never reuse a server until we have been through + # all of the servers at the same priority, so if the + # weights are A: 100, B:1, we always do ABABAB instead of + # AAAA...AAAB (approximately). + # + # (b) After using all the servers at the lowest priority, + # we move onto the next priority. We should only use the + # second priority if servers at the top priority are + # unreachable. + # del self.servers[index] self.used_servers.append(server) return server @@ -280,26 +296,21 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=t continue payload = answer.payload - host = str(payload.target) - srv_ttl = answer.ttl - try: - answers, _, _ = yield dns_client.lookupAddress(host) - except DNSNameError: - continue + hosts = yield _get_hosts_for_srv_record( + dns_client, str(payload.target) + ) - for answer in answers: - if answer.type == dns.A and answer.payload: - ip = answer.payload.dottedQuad() - host_ttl = min(srv_ttl, answer.ttl) + for (ip, ttl) in hosts: + host_ttl = min(answer.ttl, ttl) - servers.append(_Server( - host=ip, - port=int(payload.port), - priority=int(payload.priority), - weight=int(payload.weight), - expires=int(clock.time()) + host_ttl, - )) + servers.append(_Server( + host=ip, + port=int(payload.port), + priority=int(payload.priority), + weight=int(payload.weight), + expires=int(clock.time()) + host_ttl, + )) servers.sort() cache[service_name] = list(servers) @@ -317,3 +328,68 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=t raise e defer.returnValue(servers) + + +@defer.inlineCallbacks +def _get_hosts_for_srv_record(dns_client, host): + """Look up each of the hosts in a SRV record + + Args: + dns_client (twisted.names.dns.IResolver): + host (basestring): host to look up + + Returns: + Deferred[list[(str, int)]]: a list of (host, ttl) pairs + + """ + ip4_servers = [] + ip6_servers = [] + + def cb(res): + # lookupAddress and lookupIP6Address return a three-tuple + # giving the answer, authority, and additional sections of the + # response. + # + # we only care about the answers. + + return res[0] + + def eb(res): + res.trap(DNSNameError) + return [] + + # no logcontexts here, so we can safely fire these off and gatherResults + d1 = dns_client.lookupAddress(host).addCallbacks(cb, eb) + d2 = dns_client.lookupIPV6Address(host).addCallbacks(cb, eb) + results = yield defer.gatherResults([d1, d2], consumeErrors=True) + + for result in results: + for answer in result: + if not answer.payload: + continue + + try: + if answer.type == dns.A: + ip = answer.payload.dottedQuad() + ip4_servers.append((ip, answer.ttl)) + elif answer.type == dns.AAAA: + ip = socket.inet_ntop( + socket.AF_INET6, answer.payload.address, + ) + ip6_servers.append((ip, answer.ttl)) + else: + # the most likely candidate here is a CNAME record. + # rfc2782 says srvs may not point to aliases. + logger.warn( + "Ignoring unexpected DNS record type %s for %s", + answer.type, host, + ) + continue + except Exception as e: + logger.warn("Ignoring invalid DNS response for %s: %s", + host, e) + continue + + # keep the ipv4 results before the ipv6 results, mostly to match historical + # behaviour. + defer.returnValue(ip4_servers + ip6_servers) diff --git a/tests/test_dns.py b/tests/test_dns.py index c394c57ee7..d08b0f4333 100644 --- a/tests/test_dns.py +++ b/tests/test_dns.py @@ -24,15 +24,17 @@ from synapse.http.endpoint import resolve_service from tests.utils import MockClock +@unittest.DEBUG class DnsTestCase(unittest.TestCase): @defer.inlineCallbacks def test_resolve(self): dns_client_mock = Mock() - service_name = "test_service.examle.com" + service_name = "test_service.example.com" host_name = "example.com" ip_address = "127.0.0.1" + ip6_address = "::1" answer_srv = dns.RRHeader( type=dns.SRV, @@ -48,8 +50,22 @@ class DnsTestCase(unittest.TestCase): ) ) - dns_client_mock.lookupService.return_value = ([answer_srv], None, None) - dns_client_mock.lookupAddress.return_value = ([answer_a], None, None) + answer_aaaa = dns.RRHeader( + type=dns.AAAA, + payload=dns.Record_AAAA( + address=ip6_address, + ) + ) + + dns_client_mock.lookupService.return_value = defer.succeed( + ([answer_srv], None, None), + ) + dns_client_mock.lookupAddress.return_value = defer.succeed( + ([answer_a], None, None), + ) + dns_client_mock.lookupIPV6Address.return_value = defer.succeed( + ([answer_aaaa], None, None), + ) cache = {} @@ -59,10 +75,12 @@ class DnsTestCase(unittest.TestCase): dns_client_mock.lookupService.assert_called_once_with(service_name) dns_client_mock.lookupAddress.assert_called_once_with(host_name) + dns_client_mock.lookupIPV6Address.assert_called_once_with(host_name) - self.assertEquals(len(servers), 1) + self.assertEquals(len(servers), 2) self.assertEquals(servers, cache[service_name]) self.assertEquals(servers[0].host, ip_address) + self.assertEquals(servers[1].host, ip6_address) @defer.inlineCallbacks def test_from_cache_expired_and_dns_fail(self): From 79b3cf3e02a3816791a8a0674bbac261b46abea9 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 25 Sep 2017 09:51:39 +0100 Subject: [PATCH 22/53] Fix logcontxt leak in keyclient (#2465) preserve_context_over_function doesn't do what you want it to do. --- synapse/crypto/keyclient.py | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/synapse/crypto/keyclient.py b/synapse/crypto/keyclient.py index c2bd64d6c2..f1fd488b90 100644 --- a/synapse/crypto/keyclient.py +++ b/synapse/crypto/keyclient.py @@ -13,14 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. - +from synapse.util import logcontext from twisted.web.http import HTTPClient from twisted.internet.protocol import Factory from twisted.internet import defer, reactor from synapse.http.endpoint import matrix_federation_endpoint -from synapse.util.logcontext import ( - preserve_context_over_fn, preserve_context_over_deferred -) import simplejson as json import logging @@ -43,14 +40,10 @@ def fetch_server_key(server_name, ssl_context_factory, path=KEY_API_V1): for i in range(5): try: - protocol = yield preserve_context_over_fn( - endpoint.connect, factory - ) - server_response, server_certificate = yield preserve_context_over_deferred( - protocol.remote_key - ) - defer.returnValue((server_response, server_certificate)) - return + with logcontext.PreserveLoggingContext(): + protocol = yield endpoint.connect(factory) + server_response, server_certificate = yield protocol.remote_key + defer.returnValue((server_response, server_certificate)) except SynapseKeyClientError as e: logger.exception("Error getting key for %r" % (server_name,)) if e.status.startswith("4"): From ba8fdc925c0d6271d339be8fc27ef3a15a3f01c0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 25 Sep 2017 11:01:31 +0100 Subject: [PATCH 23/53] Bump version and changes --- CHANGES.rst | 24 ++++++++++++++++++++++++ synapse/__init__.py | 2 +- 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/CHANGES.rst b/CHANGES.rst index a415944756..2ba396fc23 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,3 +1,27 @@ +Changes in synapse v0.23.0-rc1 (2017-09-25) +=========================================== + +Changes: + +* Use bcrypt module instead of py-bcrypt (PR #2288) Thanks to @kyrias! +* Improve performance of generating push notifications (PR #2343, #2357, #2365, + #2366, #2371) +* Add a frontend proxy worker (PR #2344) +* Improve DB performance for device list handling in sync (PR #2362) +* Add sample prometheus config (PR #2416) +* Document known to work postgres version (PR #2433) Thanks to @ptman! +* Add support for event_id_only push format (PR #2450) + + +Bug fixes: + +* Fix caching error in the push evaluator (PR #2332) +* Fix bug where pusherpool didn't start and broke some rooms (PR #2342) +* Fix port script for user directory tables (PR #2375) +* Fix device lists notifications when user rejoins a room (PR #2443, #2449) +* Fix sync to always send down current state events in timeline (PR #2451) + + Changes in synapse v0.22.1 (2017-07-06) ======================================= diff --git a/synapse/__init__.py b/synapse/__init__.py index dbf22eca00..30f78c11d1 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.22.1" +__version__ = "0.23.0-rc1" From b15c2b7971b582c7e5ec136a01715d8e860bfe30 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 25 Sep 2017 11:34:12 +0100 Subject: [PATCH 24/53] Update CHANGES --- CHANGES.rst | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index 2ba396fc23..b7abe32519 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,16 +1,22 @@ Changes in synapse v0.23.0-rc1 (2017-09-25) =========================================== +Features: + +* Add a frontend proxy worker (PR #2344) +* Add support for event_id_only push format (PR #2450) +* Add a PoC for filtering spammy events (PR #2456) +* Add a config option to block all room invites (PR #2457) + + Changes: * Use bcrypt module instead of py-bcrypt (PR #2288) Thanks to @kyrias! * Improve performance of generating push notifications (PR #2343, #2357, #2365, #2366, #2371) -* Add a frontend proxy worker (PR #2344) * Improve DB performance for device list handling in sync (PR #2362) -* Add sample prometheus config (PR #2416) +* Include a sample prometheus config (PR #2416) * Document known to work postgres version (PR #2433) Thanks to @ptman! -* Add support for event_id_only push format (PR #2450) Bug fixes: @@ -20,6 +26,8 @@ Bug fixes: * Fix port script for user directory tables (PR #2375) * Fix device lists notifications when user rejoins a room (PR #2443, #2449) * Fix sync to always send down current state events in timeline (PR #2451) +* Fix bug where guest users were incorrectly kicked (PR #2453) +* Fix bug talking to IPv6 only servers using SRV records (PR #2462) Changes in synapse v0.22.1 (2017-07-06) From 7141f1a5cc40a6b2d76edacfdc66fe656565666c Mon Sep 17 00:00:00 2001 From: Max Dor Date: Mon, 25 Sep 2017 16:20:23 +0200 Subject: [PATCH 25/53] Clarify recommended network setup --- README.rst | 39 ++++++++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/README.rst b/README.rst index 4491b45181..8ca1e25d43 100644 --- a/README.rst +++ b/README.rst @@ -200,19 +200,21 @@ different. See `the spec`__ for more information on key management.) .. __: `key_management`_ The default configuration exposes two HTTP ports: 8008 and 8448. Port 8008 is -configured without TLS; it is not recommended this be exposed outside your -local network. Port 8448 is configured to use TLS with a self-signed -certificate. This is fine for testing with but, to avoid your clients -complaining about the certificate, you will almost certainly want to use -another certificate for production purposes. (Note that a self-signed +configured without TLS; it should be behind a reverse proxy for TLS/SSL +termination on port 443 which in turn should be used for clients. Port 8448 +is configured to use TLS with a self-signed certificate. If you would like +to do initial test with a client without having to setup a reverse proxy, +you can temporarly use another certificate. (Note that a self-signed certificate is fine for `Federation`_). You can do so by changing ``tls_certificate_path``, ``tls_private_key_path`` and ``tls_dh_params_path`` -in ``homeserver.yaml``; alternatively, you can use a reverse-proxy, but be sure -to read `Using a reverse proxy with Synapse`_ when doing so. +in ``homeserver.yaml``; Apart from port 8448 using TLS, both ports are the same in the default configuration. +See https://github.com/matrix-org/synapse/issues/2438 for the recommended +production configuration. + Registering a user ------------------ @@ -283,10 +285,16 @@ Connecting to Synapse from a client The easiest way to try out your new Synapse installation is by connecting to it from a web client. The easiest option is probably the one at http://riot.im/app. You will need to specify a "Custom server" when you log on -or register: set this to ``https://localhost:8448`` - remember to specify the -port (``:8448``) unless you changed the configuration. (Leave the identity +or register: set this to ``https://domain.tld`` if you setup a reverse proxy +following the recommended setup, or ``https://localhost:8448`` - remember to specify the +port (``:8448``) if not ``:443`` unless you changed the configuration. (Leave the identity server as the default - see `Identity servers`_.) +If using port 8448 you will run into errors until you accept the self-signed +certificate. You can easily do this by going to ``https://localhost:8448`` +directly with your browser and accept the presented certificate. You can then +go back in your web client and proceed further. + If all goes well you should at least be able to log in, create a room, and start sending messages. @@ -593,8 +601,9 @@ you to run your server on a machine that might not have the same name as your domain name. For example, you might want to run your server at ``synapse.example.com``, but have your Matrix user-ids look like ``@user:example.com``. (A SRV record also allows you to change the port from -the default 8448. However, if you are thinking of using a reverse-proxy, be -sure to read `Reverse-proxying the federation port`_ first.) +the default 8448. However, if you are thinking of using a reverse-proxy on the +federation port, which is highly not recommended, be sure to read +`Reverse-proxying the federation port`_ first.) To use a SRV record, first create your SRV record and publish it in DNS. This should have the format ``_matrix._tcp. IN SRV 10 0 @@ -674,7 +683,7 @@ For information on how to install and use PostgreSQL, please see Using a reverse proxy with Synapse ================================== -It is possible to put a reverse proxy such as +It is recommended to put a reverse proxy such as `nginx `_, `Apache `_ or `HAProxy `_ in front of Synapse. One advantage of @@ -692,9 +701,9 @@ federation port has a number of pitfalls. It is possible, but be sure to read `Reverse-proxying the federation port`_. The recommended setup is therefore to configure your reverse-proxy on port 443 -for client connections, but to also expose port 8448 for server-server -connections. All the Matrix endpoints begin ``/_matrix``, so an example nginx -configuration might look like:: +to port 8008 of synapse for client connections, but to also directly expose port +8448 for server-server connections. All the Matrix endpoints begin ``/_matrix``, +so an example nginx configuration might look like:: server { listen 443 ssl; From e591f7b3f06ba4de55c439e0741b4fe4ef445556 Mon Sep 17 00:00:00 2001 From: Max Dor Date: Mon, 25 Sep 2017 16:42:26 +0200 Subject: [PATCH 26/53] Include review feedback --- README.rst | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/README.rst b/README.rst index 8ca1e25d43..9da8c7f7a8 100644 --- a/README.rst +++ b/README.rst @@ -207,14 +207,12 @@ to do initial test with a client without having to setup a reverse proxy, you can temporarly use another certificate. (Note that a self-signed certificate is fine for `Federation`_). You can do so by changing ``tls_certificate_path``, ``tls_private_key_path`` and ``tls_dh_params_path`` -in ``homeserver.yaml``; +in ``homeserver.yaml``; alternatively, you can use a reverse-proxy, but be sure +to read `Using a reverse proxy with Synapse`_ when doing so. Apart from port 8448 using TLS, both ports are the same in the default configuration. -See https://github.com/matrix-org/synapse/issues/2438 for the recommended -production configuration. - Registering a user ------------------ @@ -602,7 +600,7 @@ domain name. For example, you might want to run your server at ``synapse.example.com``, but have your Matrix user-ids look like ``@user:example.com``. (A SRV record also allows you to change the port from the default 8448. However, if you are thinking of using a reverse-proxy on the -federation port, which is highly not recommended, be sure to read +federation port, which is not recommended, be sure to read `Reverse-proxying the federation port`_ first.) To use a SRV record, first create your SRV record and publish it in DNS. This From e3edca3b5d23e52d4b51afe5fa9fe2da79f09700 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 25 Sep 2017 17:35:39 +0100 Subject: [PATCH 27/53] Refactor to speed up incremental syncs --- synapse/handlers/sync.py | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index af1b527840..dd0ec00ae6 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -293,11 +293,6 @@ class SyncHandler(object): timeline_limit = sync_config.filter_collection.timeline_limit() block_all_timeline = sync_config.filter_collection.blocks_all_room_timeline() - # Pull out the current state, as we always want to include those events - # in the timeline if they're there. - current_state_ids = yield self.state.get_current_state_ids(room_id) - current_state_ids = frozenset(current_state_ids.itervalues()) - if recents is None or newly_joined_room or timeline_limit < len(recents): limited = True else: @@ -305,6 +300,15 @@ class SyncHandler(object): if recents: recents = sync_config.filter_collection.filter_room_timeline(recents) + + # We check if there are any state events, if there are then we pass + # all current state events to the filter_events function. This is to + # ensure that we always include current state in the timeline + current_state_ids = frozenset() + if any(e.is_state() for e in recents): + current_state_ids = yield self.state.get_current_state_ids(room_id) + current_state_ids = frozenset(current_state_ids.itervalues()) + recents = yield filter_events_for_client( self.store, sync_config.user.to_string(), @@ -341,6 +345,15 @@ class SyncHandler(object): loaded_recents = sync_config.filter_collection.filter_room_timeline( events ) + + # We check if there are any state events, if there are then we pass + # all current state events to the filter_events function. This is to + # ensure that we always include current state in the timeline + current_state_ids = frozenset() + if any(e.is_state() for e in loaded_recents): + current_state_ids = yield self.state.get_current_state_ids(room_id) + current_state_ids = frozenset(current_state_ids.itervalues()) + loaded_recents = yield filter_events_for_client( self.store, sync_config.user.to_string(), From f4c8cd5e85192bb7bf1f979ac6e1a0134766763f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 26 Sep 2017 10:02:48 +0100 Subject: [PATCH 28/53] Bump changelog and version --- CHANGES.rst | 8 ++++++++ synapse/__init__.py | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/CHANGES.rst b/CHANGES.rst index b7abe32519..6291fedb9a 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,3 +1,11 @@ +Changes in synapse v0.23.0-rc2 (2017-09-26) +=========================================== + +Bug fixes: + +* Fix regression in performance of syncs (PR #2470) + + Changes in synapse v0.23.0-rc1 (2017-09-25) =========================================== diff --git a/synapse/__init__.py b/synapse/__init__.py index 30f78c11d1..ec83e6adb7 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.23.0-rc1" +__version__ = "0.23.0-rc2" From 4824a33c31c32a055fc5b8ff4d1197c0bd3933c5 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 26 Sep 2017 17:51:26 +0100 Subject: [PATCH 29/53] Factor out module loading to a separate place So it can be reused --- synapse/config/password_auth_providers.py | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/synapse/config/password_auth_providers.py b/synapse/config/password_auth_providers.py index 83762d089a..90824cab7f 100644 --- a/synapse/config/password_auth_providers.py +++ b/synapse/config/password_auth_providers.py @@ -15,13 +15,15 @@ from ._base import Config, ConfigError -import importlib +from synapse.util.module_loader import load_module class PasswordAuthProviderConfig(Config): def read_config(self, config): self.password_providers = [] + provider_config = None + # We want to be backwards compatible with the old `ldap_config` # param. ldap_config = config.get("ldap_config", {}) @@ -38,19 +40,15 @@ class PasswordAuthProviderConfig(Config): if provider['module'] == "synapse.util.ldap_auth_provider.LdapAuthProvider": from ldap_auth_provider import LdapAuthProvider provider_class = LdapAuthProvider + try: + provider_config = provider_class.parse_config(provider["config"]) + except Exception as e: + raise ConfigError( + "Failed to parse config for %r: %r" % (provider['module'], e) + ) else: - # We need to import the module, and then pick the class out of - # that, so we split based on the last dot. - module, clz = provider['module'].rsplit(".", 1) - module = importlib.import_module(module) - provider_class = getattr(module, clz) + (provider_class, provider_config) = load_module(provider) - try: - provider_config = provider_class.parse_config(provider["config"]) - except Exception as e: - raise ConfigError( - "Failed to parse config for %r: %r" % (provider['module'], e) - ) self.password_providers.append((provider_class, provider_config)) def default_config(self, **kwargs): From 0b03a9770829247055fe8eaf66c24bb1892a3c50 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 26 Sep 2017 17:56:41 +0100 Subject: [PATCH 30/53] Add module_loader.py --- synapse/util/module_loader.py | 41 +++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 synapse/util/module_loader.py diff --git a/synapse/util/module_loader.py b/synapse/util/module_loader.py new file mode 100644 index 0000000000..b4464790ee --- /dev/null +++ b/synapse/util/module_loader.py @@ -0,0 +1,41 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import importlib + +from synapse.config._base import ConfigError + +def load_module(provider): + """ Loads a module with its config + Take a dict with keys 'module' (the module name) and 'config' + (the config dict). + + Returns + Tuple of (provider class, parsed config object) + """ + # We need to import the module, and then pick the class out of + # that, so we split based on the last dot. + module, clz = provider['module'].rsplit(".", 1) + module = importlib.import_module(module) + provider_class = getattr(module, clz) + + try: + provider_config = provider_class.parse_config(provider["config"]) + except Exception as e: + raise ConfigError( + "Failed to parse config for %r: %r" % (provider['module'], e) + ) + + return (provider_class, provider_config) From 9fd086e506ae3cb3db7f1b1c7317c7602a4d71e3 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 26 Sep 2017 17:59:46 +0100 Subject: [PATCH 31/53] unnecessary parens --- synapse/util/module_loader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/util/module_loader.py b/synapse/util/module_loader.py index b4464790ee..4b51d7a77b 100644 --- a/synapse/util/module_loader.py +++ b/synapse/util/module_loader.py @@ -38,4 +38,4 @@ def load_module(provider): "Failed to parse config for %r: %r" % (provider['module'], e) ) - return (provider_class, provider_config) + return provider_class, provider_config From 6cd5fcd5366cfef4959d107e818d0e20d78aa483 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 26 Sep 2017 19:20:23 +0100 Subject: [PATCH 32/53] Make the spam checker a module --- synapse/config/homeserver.py | 4 ++- synapse/events/spamcheck.py | 37 +++++++++++++++------------ synapse/federation/federation_base.py | 5 ++-- synapse/handlers/message.py | 5 ++-- synapse/server.py | 5 ++++ 5 files changed, 33 insertions(+), 23 deletions(-) diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py index b22cacf8dc..3f9d9d5f8b 100644 --- a/synapse/config/homeserver.py +++ b/synapse/config/homeserver.py @@ -34,6 +34,7 @@ from .password_auth_providers import PasswordAuthProviderConfig from .emailconfig import EmailConfig from .workers import WorkerConfig from .push import PushConfig +from .spam_checker import SpamCheckerConfig class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig, @@ -41,7 +42,8 @@ class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig, VoipConfig, RegistrationConfig, MetricsConfig, ApiConfig, AppServiceConfig, KeyConfig, SAML2Config, CasConfig, JWTConfig, PasswordConfig, EmailConfig, - WorkerConfig, PasswordAuthProviderConfig, PushConfig,): + WorkerConfig, PasswordAuthProviderConfig, PushConfig, + SpamCheckerConfig,): pass diff --git a/synapse/events/spamcheck.py b/synapse/events/spamcheck.py index 56fa9e556e..7b22b3413a 100644 --- a/synapse/events/spamcheck.py +++ b/synapse/events/spamcheck.py @@ -13,26 +13,29 @@ # See the License for the specific language governing permissions and # limitations under the License. +class SpamChecker(object): + def __init__(self, hs): + self.spam_checker = None -def check_event_for_spam(event): - """Checks if a given event is considered "spammy" by this server. + if hs.config.spam_checker is not None: + module, config = hs.config.spam_checker + print("cfg %r", config) + self.spam_checker = module(config=config) - If the server considers an event spammy, then it will be rejected if - sent by a local user. If it is sent by a user on another server, then - users receive a blank event. + def check_event_for_spam(self, event): + """Checks if a given event is considered "spammy" by this server. - Args: - event (synapse.events.EventBase): the event to be checked + If the server considers an event spammy, then it will be rejected if + sent by a local user. If it is sent by a user on another server, then + users receive a blank event. - Returns: - bool: True if the event is spammy. - """ - if not hasattr(event, "content") or "body" not in event.content: - return False + Args: + event (synapse.events.EventBase): the event to be checked - # for example: - # - # if "the third flower is green" in event.content["body"]: - # return True + Returns: + bool: True if the event is spammy. + """ + if self.spam_checker is None: + return False - return False + return self.spam_checker.check_event_for_spam(event) diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index babd9ea078..a0f5d40eb3 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -16,7 +16,6 @@ import logging from synapse.api.errors import SynapseError from synapse.crypto.event_signing import check_event_content_hash -from synapse.events import spamcheck from synapse.events.utils import prune_event from synapse.util import unwrapFirstError, logcontext from twisted.internet import defer @@ -26,7 +25,7 @@ logger = logging.getLogger(__name__) class FederationBase(object): def __init__(self, hs): - pass + self.spam_checker = hs.get_spam_checker() @defer.inlineCallbacks def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False, @@ -144,7 +143,7 @@ class FederationBase(object): ) return redacted - if spamcheck.check_event_for_spam(pdu): + if self.spam_checker.check_event_for_spam(pdu): logger.warn( "Event contains spam, redacting %s: %s", pdu.event_id, pdu.get_pdu_json() diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index da18bf23db..37f0a2772a 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from synapse.events import spamcheck from twisted.internet import defer from synapse.api.constants import EventTypes, Membership @@ -58,6 +57,8 @@ class MessageHandler(BaseHandler): self.action_generator = hs.get_action_generator() + self.spam_checker = hs.get_spam_checker() + @defer.inlineCallbacks def purge_history(self, room_id, event_id): event = yield self.store.get_event(event_id) @@ -322,7 +323,7 @@ class MessageHandler(BaseHandler): txn_id=txn_id ) - if spamcheck.check_event_for_spam(event): + if self.spam_checker.check_event_for_spam(event): raise SynapseError( 403, "Spam is not permitted here", Codes.FORBIDDEN ) diff --git a/synapse/server.py b/synapse/server.py index a38e5179e0..4d44af745e 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -31,6 +31,7 @@ from synapse.appservice.api import ApplicationServiceApi from synapse.appservice.scheduler import ApplicationServiceScheduler from synapse.crypto.keyring import Keyring from synapse.events.builder import EventBuilderFactory +from synapse.events.spamcheck import SpamChecker from synapse.federation import initialize_http_replication from synapse.federation.send_queue import FederationRemoteSendQueue from synapse.federation.transport.client import TransportLayerClient @@ -139,6 +140,7 @@ class HomeServer(object): 'read_marker_handler', 'action_generator', 'user_directory_handler', + 'spam_checker', ] def __init__(self, hostname, **kwargs): @@ -309,6 +311,9 @@ class HomeServer(object): def build_user_directory_handler(self): return UserDirectoyHandler(self) + def build_spam_checker(self): + return SpamChecker(self) + def remove_pusher(self, app_id, push_key, user_id): return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) From 8ad5f34908df99804b27bd045fde5b9d5625d784 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 26 Sep 2017 19:21:41 +0100 Subject: [PATCH 33/53] pep8 --- synapse/util/module_loader.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/util/module_loader.py b/synapse/util/module_loader.py index 4b51d7a77b..4288312b8a 100644 --- a/synapse/util/module_loader.py +++ b/synapse/util/module_loader.py @@ -17,6 +17,7 @@ import importlib from synapse.config._base import ConfigError + def load_module(provider): """ Loads a module with its config Take a dict with keys 'module' (the module name) and 'config' From 1786b0e768877a608a6f44a6a37cc36e598eda4e Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 27 Sep 2017 10:22:54 +0100 Subject: [PATCH 34/53] Forgot the new file again :( --- synapse/config/spam_checker.py | 35 ++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 synapse/config/spam_checker.py diff --git a/synapse/config/spam_checker.py b/synapse/config/spam_checker.py new file mode 100644 index 0000000000..3fec42bdb0 --- /dev/null +++ b/synapse/config/spam_checker.py @@ -0,0 +1,35 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.util.module_loader import load_module + +from ._base import Config + + +class SpamCheckerConfig(Config): + def read_config(self, config): + self.spam_checker = None + + provider = config.get("spam_checker", None) + if provider is not None: + self.spam_checker = load_module(provider) + + def default_config(self, **kwargs): + return """\ + # spam_checker: + # module: "my_custom_project.SuperSpamChecker" + # config: + # example_option: 'things' + """ From 60c78666abbf82c3adfaa3bb4faf86f867eb18ea Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 27 Sep 2017 10:26:13 +0100 Subject: [PATCH 35/53] pep8 --- synapse/events/spamcheck.py | 1 + synapse/util/module_loader.py | 1 + 2 files changed, 2 insertions(+) diff --git a/synapse/events/spamcheck.py b/synapse/events/spamcheck.py index 7b22b3413a..a876bcb816 100644 --- a/synapse/events/spamcheck.py +++ b/synapse/events/spamcheck.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. + class SpamChecker(object): def __init__(self, hs): self.spam_checker = None diff --git a/synapse/util/module_loader.py b/synapse/util/module_loader.py index 4b51d7a77b..4288312b8a 100644 --- a/synapse/util/module_loader.py +++ b/synapse/util/module_loader.py @@ -17,6 +17,7 @@ import importlib from synapse.config._base import ConfigError + def load_module(provider): """ Loads a module with its config Take a dict with keys 'module' (the module name) and 'config' From 8c06dd607165c109de23aa41098a8b45e02dcbd8 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 27 Sep 2017 10:31:14 +0100 Subject: [PATCH 36/53] Remove unintentional debugging --- synapse/events/spamcheck.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/events/spamcheck.py b/synapse/events/spamcheck.py index a876bcb816..8ddbf2ca38 100644 --- a/synapse/events/spamcheck.py +++ b/synapse/events/spamcheck.py @@ -20,7 +20,6 @@ class SpamChecker(object): if hs.config.spam_checker is not None: module, config = hs.config.spam_checker - print("cfg %r", config) self.spam_checker = module(config=config) def check_event_for_spam(self, event): From ef3a5ae787e2fa25cc753b7c5dc9f31ba3bf4316 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 27 Sep 2017 11:24:19 +0100 Subject: [PATCH 37/53] Don't test is spam_checker not None Sometimes it's a Mock object which is not none but is still not what we're after --- synapse/events/spamcheck.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/synapse/events/spamcheck.py b/synapse/events/spamcheck.py index 8ddbf2ca38..e739f105b2 100644 --- a/synapse/events/spamcheck.py +++ b/synapse/events/spamcheck.py @@ -18,8 +18,14 @@ class SpamChecker(object): def __init__(self, hs): self.spam_checker = None - if hs.config.spam_checker is not None: + module = None + config = None + try: module, config = hs.config.spam_checker + except: + pass + + if module is not None: self.spam_checker = module(config=config) def check_event_for_spam(self, event): From adec03395d1c9a8e237a74ea420966bae8ea0002 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 27 Sep 2017 15:01:25 +0100 Subject: [PATCH 38/53] Fix bug where /joined_members didn't check user was in room --- synapse/handlers/message.py | 31 +++++++++++++++++++++++++++++++ synapse/rest/client/v1/room.py | 17 +++++++---------- 2 files changed, 38 insertions(+), 10 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 37f0a2772a..f6740544c1 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -419,6 +419,37 @@ class MessageHandler(BaseHandler): [serialize_event(c, now) for c in room_state.values()] ) + @defer.inlineCallbacks + def get_joined_members(self, user_id, room_id): + """Get all the joined members in the room and their profile information. + + If the user has left the room return the state events from when they left. + + Args: + user_id(str): The user requesting state events. + room_id(str): The room ID to get all state events from. + Returns: + A dict of user_id to profile info + """ + membership, membership_event_id = yield self._check_in_room_or_world_readable( + room_id, user_id + ) + + if membership == Membership.JOIN: + users_with_profile = yield self.state.get_current_user_in_room(room_id) + else: + raise NotImplementedError( + "Getting joined members after leaving is not implemented" + ) + + defer.returnValue({ + user_id: { + "avatar_url": profile.avatar_url, + "display_name": profile.display_name, + } + for user_id, profile in users_with_profile.iteritems() + }) + @measure_func("_create_new_client_event") @defer.inlineCallbacks def _create_new_client_event(self, builder, requester=None, prev_event_ids=None): diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index cd388770c8..4be0fee38d 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -398,22 +398,19 @@ class JoinedRoomMemberListRestServlet(ClientV1RestServlet): def __init__(self, hs): super(JoinedRoomMemberListRestServlet, self).__init__(hs) - self.state = hs.get_state_handler() + self.message_handler = hs.get_handlers().message_handler @defer.inlineCallbacks def on_GET(self, request, room_id): - yield self.auth.get_user_by_req(request) + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() - users_with_profile = yield self.state.get_current_user_in_room(room_id) + users_with_profile = yield self.message_handler.get_joined_members( + user_id, room_id, + ) defer.returnValue((200, { - "joined": { - user_id: { - "avatar_url": profile.avatar_url, - "display_name": profile.display_name, - } - for user_id, profile in users_with_profile.iteritems() - } + "joined": users_with_profile, })) From 8090fd4664de87bad636ace6774dad8c33bd5276 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Sep 2017 10:09:32 +0100 Subject: [PATCH 39/53] Fix /joined_members to work with AS users --- synapse/handlers/message.py | 36 +++++++++++++++++++++++----------- synapse/rest/client/v1/room.py | 3 +-- 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index f6740544c1..ca8c6c55bb 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -420,27 +420,41 @@ class MessageHandler(BaseHandler): ) @defer.inlineCallbacks - def get_joined_members(self, user_id, room_id): + def get_joined_members(self, requester, room_id): """Get all the joined members in the room and their profile information. If the user has left the room return the state events from when they left. Args: - user_id(str): The user requesting state events. + requester(Requester): The user requesting state events. room_id(str): The room ID to get all state events from. Returns: A dict of user_id to profile info """ - membership, membership_event_id = yield self._check_in_room_or_world_readable( - room_id, user_id - ) - - if membership == Membership.JOIN: - users_with_profile = yield self.state.get_current_user_in_room(room_id) - else: - raise NotImplementedError( - "Getting joined members after leaving is not implemented" + user_id = requester.user.to_string() + if not requester.app_service: + # We check AS auth after fetching the room membership, as it + # requires us to pull out all joined members anyway. + membership, _ = yield self._check_in_room_or_world_readable( + room_id, user_id ) + if membership != Membership.JOIN: + raise NotImplementedError( + "Getting joined members after leaving is not implemented" + ) + + users_with_profile = yield self.state.get_current_user_in_room(room_id) + + # If this is an AS, double check that they are allowed to see the members. + # This can either be because the AS user is in the room or becuase there + # is a user in the room that the AS is "interested in" + if requester.app_service and user_id not in users_with_profile: + for uid in users_with_profile: + if requester.app_service.is_interested_in_user(uid): + break + else: + # Loop fell through, AS has no interested users in room + raise AuthError(403, "Appservice not in room") defer.returnValue({ user_id: { diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 4be0fee38d..6c379d53ac 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -403,10 +403,9 @@ class JoinedRoomMemberListRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_GET(self, request, room_id): requester = yield self.auth.get_user_by_req(request) - user_id = requester.user.to_string() users_with_profile = yield self.message_handler.get_joined_members( - user_id, room_id, + requester, room_id, ) defer.returnValue((200, { From 9ccb4226ba0824a1468c4e8f0abe91aca3381862 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Sep 2017 12:18:06 +0100 Subject: [PATCH 40/53] Delete expired url cache data --- synapse/rest/media/v1/filepath.py | 43 ++++++++- synapse/rest/media/v1/preview_url_resource.py | 90 ++++++++++++++++++- synapse/storage/media_repository.py | 61 +++++++++++++ synapse/storage/prepare_database.py | 2 +- .../schema/delta/44/expire_url_cache.sql | 17 ++++ 5 files changed, 208 insertions(+), 5 deletions(-) create mode 100644 synapse/storage/schema/delta/44/expire_url_cache.sql diff --git a/synapse/rest/media/v1/filepath.py b/synapse/rest/media/v1/filepath.py index d92b7ff337..c5d43209f9 100644 --- a/synapse/rest/media/v1/filepath.py +++ b/synapse/rest/media/v1/filepath.py @@ -73,19 +73,58 @@ class MediaFilePaths(object): ) def url_cache_filepath(self, media_id): + # Media id is of the form + # E.g.: 2017-09-28-fsdRDt24DS234dsf return os.path.join( self.base_path, "url_cache", - media_id[0:2], media_id[2:4], media_id[4:] + media_id[:10], media_id[11:] ) + def url_cache_filepath_dirs_to_delete(self, media_id): + "The dirs to try and remove if we delete the media_id file" + return [ + os.path.join( + self.base_path, "url_cache", + media_id[:10], + ), + ] + def url_cache_thumbnail(self, media_id, width, height, content_type, method): + # Media id is of the form + # E.g.: 2017-09-28-fsdRDt24DS234dsf + top_level_type, sub_type = content_type.split("/") file_name = "%i-%i-%s-%s-%s" % ( width, height, top_level_type, sub_type, method ) + return os.path.join( self.base_path, "url_cache_thumbnails", - media_id[0:2], media_id[2:4], media_id[4:], + media_id[:10], media_id[11:], file_name ) + + def url_cache_thumbnail_directory(self, media_id): + # Media id is of the form + # E.g.: 2017-09-28-fsdRDt24DS234dsf + + return os.path.join( + self.base_path, "url_cache_thumbnails", + media_id[:10], media_id[11:], + ) + + def url_cache_thumbnail_dirs_to_delete(self, media_id): + "The dirs to try and remove if we delete the media_id thumbnails" + # Media id is of the form + # E.g.: 2017-09-28-fsdRDt24DS234dsf + return [ + os.path.join( + self.base_path, "url_cache_thumbnails", + media_id[:10], media_id[11:], + ), + os.path.join( + self.base_path, "url_cache_thumbnails", + media_id[:10], + ), + ] diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index b81a336c5d..c5ba83ddfd 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -36,6 +36,9 @@ import cgi import ujson as json import urlparse import itertools +import datetime +import errno +import shutil import logging logger = logging.getLogger(__name__) @@ -70,6 +73,10 @@ class PreviewUrlResource(Resource): self.downloads = {} + self._cleaner_loop = self.clock.looping_call( + self._expire_url_cache_data, 30 * 10000 + ) + def render_GET(self, request): self._async_render_GET(request) return NOT_DONE_YET @@ -253,8 +260,7 @@ class PreviewUrlResource(Resource): # we're most likely being explicitly triggered by a human rather than a # bot, so are we really a robot? - # XXX: horrible duplication with base_resource's _download_remote_file() - file_id = random_string(24) + file_id = datetime.date.today().isoformat() + '_' + random_string(16) fname = self.filepaths.url_cache_filepath(file_id) self.media_repo._makedirs(fname) @@ -328,6 +334,86 @@ class PreviewUrlResource(Resource): "etag": headers["ETag"][0] if "ETag" in headers else None, }) + @defer.inlineCallbacks + def _expire_url_cache_data(self): + """Clean up expired url cache content, media and thumbnails. + """ + now = self.clock.time_msec() + + # First we delete expired url cache entries + media_ids = yield self.store.get_expired_url_cache(now) + + removed_media = [] + for media_id in media_ids: + fname = self.filepaths.url_cache_filepath(media_id) + try: + os.remove(fname) + except OSError as e: + # If the path doesn't exist, meh + if e.errno != errno.ENOENT: + logger.warn("Failed to remove media: %r: %s", media_id, e) + continue + + removed_media.append(media_id) + + try: + dirs = self.filepaths.url_cache_filepath_dirs_to_delete(media_id) + for dir in dirs: + os.rmdir(dir) + except: + pass + + yield self.store.delete_url_cache(removed_media) + + logger.info("Deleted %d entries from url cache", len(removed_media)) + + # Now we delete old images associated with the url cache. + # These may be cached for a bit on the client (i.e., they + # may have a room open with a preview url thing open). + # So we wait a couple of days before deleting, just in case. + expire_before = now - 2 * 24 * 60 * 60 * 1000 + yield self.store.get_url_cache_media_before(expire_before) + + removed_media = [] + for media_id in media_ids: + fname = self.filepaths.url_cache_filepath(media_id) + try: + os.remove(fname) + except OSError as e: + # If the path doesn't exist, meh + if e.errno != errno.ENOENT: + logger.warn("Failed to remove media: %r: %s", media_id, e) + continue + + try: + dirs = self.filepaths.url_cache_filepath_dirs_to_delete(media_id) + for dir in dirs: + os.rmdir(dir) + except: + pass + + thumbnail_dir = self.filepaths.url_cache_thumbnail_directory(media_id) + try: + shutil.rmtree(thumbnail_dir) + except OSError as e: + # If the path doesn't exist, meh + if e.errno != errno.ENOENT: + logger.warn("Failed to remove media: %r: %s", media_id, e) + continue + + removed_media.append(media_id) + + try: + dirs = self.filepaths.url_cache_thumbnail_dirs_to_delete(media_id) + for dir in dirs: + os.rmdir(dir) + except: + pass + + yield self.store.delete_url_cache_media(removed_media) + + logger.info("Deleted %d media from url cache", len(removed_media)) + def decode_and_calc_og(body, media_uri, request_encoding=None): from lxml import etree diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py index 82bb61b811..5cca14ccb2 100644 --- a/synapse/storage/media_repository.py +++ b/synapse/storage/media_repository.py @@ -238,3 +238,64 @@ class MediaRepositoryStore(SQLBaseStore): }, ) return self.runInteraction("delete_remote_media", delete_remote_media_txn) + + def get_expired_url_cache(self, now_ts): + sql = ( + "SELECT media_id FROM local_media_repository_url_cache" + " WHERE download_ts + expires < ?" + " ORDER BY download_ts + expires ASC" + " LIMIT 100" + ) + + def _get_expired_url_cache_txn(txn): + txn.execute(sql, (now_ts,)) + return [row[0] for row in txn] + + return self.runInteraction("get_expired_url_cache", _get_expired_url_cache_txn) + + def delete_url_cache(self, media_ids): + sql = ( + "DELETE FROM local_media_repository_url_cache" + " WHERE media_id = ?" + ) + + def _delete_url_cache_txn(txn): + txn.executemany(sql, [(media_id) for media_id in media_ids]) + + return self.runInteraction("delete_url_cache", _delete_url_cache_txn) + + def get_url_cache_media_before(self, before_ts): + sql = ( + "SELECT media_id FROM local_media_repository" + " WHERE created_ts < ?" + " ORDER BY created_ts ASC" + " LIMIT 100" + ) + + def _get_url_cache_media_before_txn(txn): + txn.execute(sql, (before_ts,)) + return [row[0] for row in txn] + + return self.runInteraction( + "get_url_cache_media_before", _get_url_cache_media_before_txn, + ) + + def delete_url_cache_media(self, media_ids): + def _delete_url_cache_media_txn(txn): + sql = ( + "DELETE FROM local_media_repository" + " WHERE media_id = ?" + ) + + txn.executemany(sql, [(media_id) for media_id in media_ids]) + + sql = ( + "DELETE FROM local_media_repository_thumbnails" + " WHERE media_id = ?" + ) + + txn.executemany(sql, [(media_id) for media_id in media_ids]) + + return self.runInteraction( + "delete_url_cache_media", _delete_url_cache_media_txn, + ) diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 72b670b83b..a0af8456f5 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 43 +SCHEMA_VERSION = 44 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/schema/delta/44/expire_url_cache.sql b/synapse/storage/schema/delta/44/expire_url_cache.sql new file mode 100644 index 0000000000..96202bd2a6 --- /dev/null +++ b/synapse/storage/schema/delta/44/expire_url_cache.sql @@ -0,0 +1,17 @@ +/* Copyright 2017 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE INDEX local_media_repository_url_idx ON local_media_repository(created_ts) WHERE url_cache IS NOT NULL; +CREATE INDEX local_media_repository_url_cache_expires_idx ON local_media_repository_url_cache(download_ts + expires); From 77f1d24de3c696f52bc1ba6d0f61e82f03a9de7a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Sep 2017 12:23:15 +0100 Subject: [PATCH 41/53] More brackets --- synapse/storage/schema/delta/44/expire_url_cache.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/schema/delta/44/expire_url_cache.sql b/synapse/storage/schema/delta/44/expire_url_cache.sql index 96202bd2a6..997e790b6d 100644 --- a/synapse/storage/schema/delta/44/expire_url_cache.sql +++ b/synapse/storage/schema/delta/44/expire_url_cache.sql @@ -14,4 +14,4 @@ */ CREATE INDEX local_media_repository_url_idx ON local_media_repository(created_ts) WHERE url_cache IS NOT NULL; -CREATE INDEX local_media_repository_url_cache_expires_idx ON local_media_repository_url_cache(download_ts + expires); +CREATE INDEX local_media_repository_url_cache_expires_idx ON local_media_repository_url_cache((download_ts + expires)); From ae79764fe55ab15156b4f28658326bd2c9c0b937 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Sep 2017 12:37:53 +0100 Subject: [PATCH 42/53] Change expires column to expires_ts --- synapse/rest/media/v1/preview_url_resource.py | 4 ++-- synapse/storage/media_repository.py | 14 ++++++------- .../schema/delta/44/expire_url_cache.sql | 21 ++++++++++++++++++- 3 files changed, 29 insertions(+), 10 deletions(-) diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index c5ba83ddfd..6f896ffb53 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -137,7 +137,7 @@ class PreviewUrlResource(Resource): cache_result = yield self.store.get_url_cache(url, ts) if ( cache_result and - cache_result["download_ts"] + cache_result["expires"] > ts and + cache_result["expires_ts"] > ts and cache_result["response_code"] / 100 == 2 ): respond_with_json_bytes( @@ -246,7 +246,7 @@ class PreviewUrlResource(Resource): url, media_info["response_code"], media_info["etag"], - media_info["expires"], + media_info["expires"] + media_info["created_ts"], json.dumps(og), media_info["filesystem_id"], media_info["created_ts"], diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py index 5cca14ccb2..b8a0dd0762 100644 --- a/synapse/storage/media_repository.py +++ b/synapse/storage/media_repository.py @@ -62,7 +62,7 @@ class MediaRepositoryStore(SQLBaseStore): def get_url_cache_txn(txn): # get the most recently cached result (relative to the given ts) sql = ( - "SELECT response_code, etag, expires, og, media_id, download_ts" + "SELECT response_code, etag, expires_ts, og, media_id, download_ts" " FROM local_media_repository_url_cache" " WHERE url = ? AND download_ts <= ?" " ORDER BY download_ts DESC LIMIT 1" @@ -74,7 +74,7 @@ class MediaRepositoryStore(SQLBaseStore): # ...or if we've requested a timestamp older than the oldest # copy in the cache, return the oldest copy (if any) sql = ( - "SELECT response_code, etag, expires, og, media_id, download_ts" + "SELECT response_code, etag, expires_ts, og, media_id, download_ts" " FROM local_media_repository_url_cache" " WHERE url = ? AND download_ts > ?" " ORDER BY download_ts ASC LIMIT 1" @@ -86,14 +86,14 @@ class MediaRepositoryStore(SQLBaseStore): return None return dict(zip(( - 'response_code', 'etag', 'expires', 'og', 'media_id', 'download_ts' + 'response_code', 'etag', 'expires_ts', 'og', 'media_id', 'download_ts' ), row)) return self.runInteraction( "get_url_cache", get_url_cache_txn ) - def store_url_cache(self, url, response_code, etag, expires, og, media_id, + def store_url_cache(self, url, response_code, etag, expires_ts, og, media_id, download_ts): return self._simple_insert( "local_media_repository_url_cache", @@ -101,7 +101,7 @@ class MediaRepositoryStore(SQLBaseStore): "url": url, "response_code": response_code, "etag": etag, - "expires": expires, + "expires_ts": expires_ts, "og": og, "media_id": media_id, "download_ts": download_ts, @@ -242,8 +242,8 @@ class MediaRepositoryStore(SQLBaseStore): def get_expired_url_cache(self, now_ts): sql = ( "SELECT media_id FROM local_media_repository_url_cache" - " WHERE download_ts + expires < ?" - " ORDER BY download_ts + expires ASC" + " WHERE expires_ts < ?" + " ORDER BY expires_ts ASC" " LIMIT 100" ) diff --git a/synapse/storage/schema/delta/44/expire_url_cache.sql b/synapse/storage/schema/delta/44/expire_url_cache.sql index 997e790b6d..9475d53e84 100644 --- a/synapse/storage/schema/delta/44/expire_url_cache.sql +++ b/synapse/storage/schema/delta/44/expire_url_cache.sql @@ -14,4 +14,23 @@ */ CREATE INDEX local_media_repository_url_idx ON local_media_repository(created_ts) WHERE url_cache IS NOT NULL; -CREATE INDEX local_media_repository_url_cache_expires_idx ON local_media_repository_url_cache((download_ts + expires)); + +-- we need to change `expires` to `expires_ts` so that we can index on it. SQLite doesn't support +-- indices on expressions until 3.9. +CREATE TABLE local_media_repository_url_cache_new( + url TEXT, + response_code INTEGER, + etag TEXT, + expires_ts BIGINT, + og TEXT, + media_id TEXT, + download_ts BIGINT +); + +INSERT INTO local_media_repository_url_cache_new + SELECT url, response_code, etag, expires + download_ts, og, media_id, download_ts FROM local_media_repository_url_cache; + +DROP TABLE local_media_repository_url_cache; +ALTER TABLE local_media_repository_url_cache_new RENAME TO local_media_repository_url_cache; + +CREATE INDEX local_media_repository_url_cache_expires_idx ON local_media_repository_url_cache(expires_ts); From 7a44c01d894d85a0eb829b4a82d1aeaff9a39ec9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Sep 2017 12:46:04 +0100 Subject: [PATCH 43/53] Fix typo --- synapse/storage/media_repository.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py index b8a0dd0762..5e39daa210 100644 --- a/synapse/storage/media_repository.py +++ b/synapse/storage/media_repository.py @@ -260,7 +260,7 @@ class MediaRepositoryStore(SQLBaseStore): ) def _delete_url_cache_txn(txn): - txn.executemany(sql, [(media_id) for media_id in media_ids]) + txn.executemany(sql, [(media_id,) for media_id in media_ids]) return self.runInteraction("delete_url_cache", _delete_url_cache_txn) @@ -287,14 +287,14 @@ class MediaRepositoryStore(SQLBaseStore): " WHERE media_id = ?" ) - txn.executemany(sql, [(media_id) for media_id in media_ids]) + txn.executemany(sql, [(media_id,) for media_id in media_ids]) sql = ( "DELETE FROM local_media_repository_thumbnails" " WHERE media_id = ?" ) - txn.executemany(sql, [(media_id) for media_id in media_ids]) + txn.executemany(sql, [(media_id,) for media_id in media_ids]) return self.runInteraction( "delete_url_cache_media", _delete_url_cache_media_txn, From ace807908602cb955fc7a2cae63dc6e64bf90cc5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Sep 2017 12:52:51 +0100 Subject: [PATCH 44/53] Support new and old style media id formats --- synapse/rest/media/v1/filepath.py | 112 +++++++++++++++++++++--------- 1 file changed, 81 insertions(+), 31 deletions(-) diff --git a/synapse/rest/media/v1/filepath.py b/synapse/rest/media/v1/filepath.py index c5d43209f9..d5cec10127 100644 --- a/synapse/rest/media/v1/filepath.py +++ b/synapse/rest/media/v1/filepath.py @@ -14,6 +14,9 @@ # limitations under the License. import os +import re + +NEW_FORMAT_ID_RE = re.compile(r"^\d\d\d\d-\d\d-\d\d") class MediaFilePaths(object): @@ -73,21 +76,39 @@ class MediaFilePaths(object): ) def url_cache_filepath(self, media_id): - # Media id is of the form - # E.g.: 2017-09-28-fsdRDt24DS234dsf - return os.path.join( - self.base_path, "url_cache", - media_id[:10], media_id[11:] - ) + if NEW_FORMAT_ID_RE.match(media_id): + # Media id is of the form + # E.g.: 2017-09-28-fsdRDt24DS234dsf + return os.path.join( + self.base_path, "url_cache", + media_id[:10], media_id[11:] + ) + else: + return os.path.join( + self.base_path, "url_cache", + media_id[0:2], media_id[2:4], media_id[4:], + ) def url_cache_filepath_dirs_to_delete(self, media_id): "The dirs to try and remove if we delete the media_id file" - return [ - os.path.join( - self.base_path, "url_cache", - media_id[:10], - ), - ] + if NEW_FORMAT_ID_RE.match(media_id): + return [ + os.path.join( + self.base_path, "url_cache", + media_id[:10], + ), + ] + else: + return [ + os.path.join( + self.base_path, "url_cache", + media_id[0:2], media_id[2:4], + ), + os.path.join( + self.base_path, "url_cache", + media_id[0:2], + ), + ] def url_cache_thumbnail(self, media_id, width, height, content_type, method): @@ -99,32 +120,61 @@ class MediaFilePaths(object): width, height, top_level_type, sub_type, method ) - return os.path.join( - self.base_path, "url_cache_thumbnails", - media_id[:10], media_id[11:], - file_name - ) + if NEW_FORMAT_ID_RE.match(media_id): + return os.path.join( + self.base_path, "url_cache_thumbnails", + media_id[:10], media_id[11:], + file_name + ) + else: + return os.path.join( + self.base_path, "url_cache_thumbnails", + media_id[0:2], media_id[2:4], media_id[4:], + file_name + ) def url_cache_thumbnail_directory(self, media_id): # Media id is of the form # E.g.: 2017-09-28-fsdRDt24DS234dsf - return os.path.join( - self.base_path, "url_cache_thumbnails", - media_id[:10], media_id[11:], - ) + if NEW_FORMAT_ID_RE.match(media_id): + return os.path.join( + self.base_path, "url_cache_thumbnails", + media_id[:10], media_id[11:], + ) + else: + return os.path.join( + self.base_path, "url_cache_thumbnails", + media_id[0:2], media_id[2:4], media_id[4:], + ) def url_cache_thumbnail_dirs_to_delete(self, media_id): "The dirs to try and remove if we delete the media_id thumbnails" # Media id is of the form # E.g.: 2017-09-28-fsdRDt24DS234dsf - return [ - os.path.join( - self.base_path, "url_cache_thumbnails", - media_id[:10], media_id[11:], - ), - os.path.join( - self.base_path, "url_cache_thumbnails", - media_id[:10], - ), - ] + if NEW_FORMAT_ID_RE.match(media_id): + return [ + os.path.join( + self.base_path, "url_cache_thumbnails", + media_id[:10], media_id[11:], + ), + os.path.join( + self.base_path, "url_cache_thumbnails", + media_id[:10], + ), + ] + else: + return [ + os.path.join( + self.base_path, "url_cache_thumbnails", + media_id[0:2], media_id[2:4], media_id[4:], + ), + os.path.join( + self.base_path, "url_cache_thumbnails", + media_id[0:2], media_id[2:4], + ), + os.path.join( + self.base_path, "url_cache_thumbnails", + media_id[0:2], + ), + ] From 5f501ec7e2645abe232bd6bab407ac863e3250c2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Sep 2017 12:59:01 +0100 Subject: [PATCH 45/53] Fix typo in url cache expiry timer --- synapse/rest/media/v1/preview_url_resource.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 6f896ffb53..1616809e8f 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -74,7 +74,7 @@ class PreviewUrlResource(Resource): self.downloads = {} self._cleaner_loop = self.clock.looping_call( - self._expire_url_cache_data, 30 * 10000 + self._expire_url_cache_data, 30 * 1000 ) def render_GET(self, request): From 93247a424a5068b088567fa98b6990e47608b7cb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Sep 2017 13:48:14 +0100 Subject: [PATCH 46/53] Only pull out local media that were for url cache --- synapse/storage/media_repository.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py index 5e39daa210..1f2eab98e3 100644 --- a/synapse/storage/media_repository.py +++ b/synapse/storage/media_repository.py @@ -267,7 +267,7 @@ class MediaRepositoryStore(SQLBaseStore): def get_url_cache_media_before(self, before_ts): sql = ( "SELECT media_id FROM local_media_repository" - " WHERE created_ts < ?" + " WHERE created_ts < ? AND url_cache IS NOT NULL" " ORDER BY created_ts ASC" " LIMIT 100" ) From e1e7d76cf16858d998884f19b141f90a0415d297 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Sep 2017 13:55:29 +0100 Subject: [PATCH 47/53] Actually assign result to variable --- synapse/rest/media/v1/preview_url_resource.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 1616809e8f..0123369a7f 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -372,7 +372,7 @@ class PreviewUrlResource(Resource): # may have a room open with a preview url thing open). # So we wait a couple of days before deleting, just in case. expire_before = now - 2 * 24 * 60 * 60 * 1000 - yield self.store.get_url_cache_media_before(expire_before) + media_ids = yield self.store.get_url_cache_media_before(expire_before) removed_media = [] for media_id in media_ids: From 7cc483aa0ef9e51bd3839768e44b449cf6d24136 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Sep 2017 13:56:53 +0100 Subject: [PATCH 48/53] Clear up expired url cache every 10s --- synapse/rest/media/v1/preview_url_resource.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 0123369a7f..2300c263e0 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -74,7 +74,7 @@ class PreviewUrlResource(Resource): self.downloads = {} self._cleaner_loop = self.clock.looping_call( - self._expire_url_cache_data, 30 * 1000 + self._expire_url_cache_data, 10 * 1000 ) def render_GET(self, request): From 4dc07e93a85f0f6e09a6763a7833ef935be1c417 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Sep 2017 14:10:33 +0100 Subject: [PATCH 49/53] Add old indices --- synapse/storage/schema/delta/44/expire_url_cache.sql | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/storage/schema/delta/44/expire_url_cache.sql b/synapse/storage/schema/delta/44/expire_url_cache.sql index 9475d53e84..e2b775f038 100644 --- a/synapse/storage/schema/delta/44/expire_url_cache.sql +++ b/synapse/storage/schema/delta/44/expire_url_cache.sql @@ -34,3 +34,5 @@ DROP TABLE local_media_repository_url_cache; ALTER TABLE local_media_repository_url_cache_new RENAME TO local_media_repository_url_cache; CREATE INDEX local_media_repository_url_cache_expires_idx ON local_media_repository_url_cache(expires_ts); +CREATE INDEX local_media_repository_url_cache_by_url_download_ts ON local_media_repository_url_cache(url, download_ts); +CREATE INDEX local_media_repository_url_cache_media_idx ON local_media_repository_url_cache(media_id); From 768f00dedbee83dd6bfb7c37bfadc511f7aeb10e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Sep 2017 14:27:27 +0100 Subject: [PATCH 50/53] Up the limits on number of url cache entries to delete at one time --- synapse/storage/media_repository.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py index 1f2eab98e3..7110a71279 100644 --- a/synapse/storage/media_repository.py +++ b/synapse/storage/media_repository.py @@ -244,7 +244,7 @@ class MediaRepositoryStore(SQLBaseStore): "SELECT media_id FROM local_media_repository_url_cache" " WHERE expires_ts < ?" " ORDER BY expires_ts ASC" - " LIMIT 100" + " LIMIT 500" ) def _get_expired_url_cache_txn(txn): @@ -269,7 +269,7 @@ class MediaRepositoryStore(SQLBaseStore): "SELECT media_id FROM local_media_repository" " WHERE created_ts < ? AND url_cache IS NOT NULL" " ORDER BY created_ts ASC" - " LIMIT 100" + " LIMIT 500" ) def _get_url_cache_media_before_txn(txn): From 75e67b9ee4526bc8e5ffd9251ad0370604db13cb Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 28 Sep 2017 15:24:00 +0100 Subject: [PATCH 51/53] Handle SERVFAILs when doing AAAA lookups for federation (#2477) ... to cope with people with broken dnssec setups, mostly --- synapse/http/endpoint.py | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py index 241b17f2cb..a97532162f 100644 --- a/synapse/http/endpoint.py +++ b/synapse/http/endpoint.py @@ -354,16 +354,28 @@ def _get_hosts_for_srv_record(dns_client, host): return res[0] - def eb(res): - res.trap(DNSNameError) - return [] + def eb(res, record_type): + if res.check(DNSNameError): + return [] + logger.warn("Error looking up %s for %s: %s", + record_type, host, res, res.value) + return res # no logcontexts here, so we can safely fire these off and gatherResults d1 = dns_client.lookupAddress(host).addCallbacks(cb, eb) d2 = dns_client.lookupIPV6Address(host).addCallbacks(cb, eb) - results = yield defer.gatherResults([d1, d2], consumeErrors=True) + results = yield defer.DeferredList( + [d1, d2], consumeErrors=True) + + # if all of the lookups failed, raise an exception rather than blowing out + # the cache with an empty result. + if results and all(s == defer.FAILURE for (s, _) in results): + defer.returnValue(results[0][1]) + + for (success, result) in results: + if success == defer.FAILURE: + continue - for result in results: for answer in result: if not answer.payload: continue From e43de3ae4b33fb2fad7a4db042f413ecd7448545 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 28 Sep 2017 13:44:47 +0100 Subject: [PATCH 52/53] Improve logging of failures in matrixfederationclient * don't log exception types twice * not all exceptions have a meaningful 'message'. Use the repr rather than attempting to build a string ourselves. --- synapse/http/matrixfederationclient.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 747a791f83..6fc3a41c29 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -204,18 +204,15 @@ class MatrixFederationHttpClient(object): raise logger.warn( - "{%s} Sending request failed to %s: %s %s: %s - %s", + "{%s} Sending request failed to %s: %s %s: %s", txn_id, destination, method, url_bytes, - type(e).__name__, _flatten_response_never_received(e), ) - log_result = "%s - %s" % ( - type(e).__name__, _flatten_response_never_received(e), - ) + log_result = _flatten_response_never_received(e) if retries_left and not timeout: if long_retries: @@ -578,12 +575,14 @@ class _JsonProducer(object): def _flatten_response_never_received(e): if hasattr(e, "reasons"): - return ", ".join( + reasons = ", ".join( _flatten_response_never_received(f.value) for f in e.reasons ) + + return "%s:[%s]" % (type(e).__name__, reasons) else: - return "%s: %s" % (type(e).__name__, e.message,) + return repr(e) def check_content_type_is_json(headers): From d5694ac5fa3266a777fa171f33bebc0d7477c12a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Sep 2017 16:08:08 +0100 Subject: [PATCH 53/53] Only log if we've removed media --- synapse/rest/media/v1/preview_url_resource.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 2300c263e0..895b480d5c 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -365,7 +365,8 @@ class PreviewUrlResource(Resource): yield self.store.delete_url_cache(removed_media) - logger.info("Deleted %d entries from url cache", len(removed_media)) + if removed_media: + logger.info("Deleted %d entries from url cache", len(removed_media)) # Now we delete old images associated with the url cache. # These may be cached for a bit on the client (i.e., they @@ -412,7 +413,8 @@ class PreviewUrlResource(Resource): yield self.store.delete_url_cache_media(removed_media) - logger.info("Deleted %d media from url cache", len(removed_media)) + if removed_media: + logger.info("Deleted %d media from url cache", len(removed_media)) def decode_and_calc_og(body, media_uri, request_encoding=None):