From c46139a17edd0237ab3e6243346e5b5e201e4673 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 16 Nov 2017 18:07:01 +0000 Subject: [PATCH 01/23] Avoid locking account_data tables for upserts --- synapse/storage/account_data.py | 85 +++++++++++++++++++-------------- 1 file changed, 49 insertions(+), 36 deletions(-) diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py index c8a1eb016b..56a0bde549 100644 --- a/synapse/storage/account_data.py +++ b/synapse/storage/account_data.py @@ -222,9 +222,12 @@ class AccountDataStore(SQLBaseStore): """ content_json = json.dumps(content) - def add_account_data_txn(txn, next_id): - self._simple_upsert_txn( - txn, + with self._account_data_id_gen.get_next() as next_id: + # no need to lock here as room_account_data has a unique constraint + # on (user_id, room_id, account_data_type) so _simple_upsert will + # retry if there is a conflict. + yield self._simple_upsert( + desc="add_room_account_data", table="room_account_data", keyvalues={ "user_id": user_id, @@ -234,19 +237,20 @@ class AccountDataStore(SQLBaseStore): values={ "stream_id": next_id, "content": content_json, - } + }, + lock=False, ) - txn.call_after( - self._account_data_stream_cache.entity_has_changed, - user_id, next_id, - ) - txn.call_after(self.get_account_data_for_user.invalidate, (user_id,)) - self._update_max_stream_id(txn, next_id) - with self._account_data_id_gen.get_next() as next_id: - yield self.runInteraction( - "add_room_account_data", add_account_data_txn, next_id - ) + # it's theoretically possible for the above to succeed and the + # below to fail - in which case we might reuse a stream id on + # restart, and the above update might not get propagated. That + # doesn't sound any worse than the whole update getting lost, + # which is what would happen if we combined the two into one + # transaction. + yield self._update_max_stream_id(next_id) + + self._account_data_stream_cache.entity_has_changed(user_id, next_id) + self.get_account_data_for_user.invalidate((user_id,)) result = self._account_data_id_gen.get_current_token() defer.returnValue(result) @@ -263,9 +267,12 @@ class AccountDataStore(SQLBaseStore): """ content_json = json.dumps(content) - def add_account_data_txn(txn, next_id): - self._simple_upsert_txn( - txn, + with self._account_data_id_gen.get_next() as next_id: + # no need to lock here as account_data has a unique constraint on + # (user_id, account_data_type) so _simple_upsert will retry if + # there is a conflict. + yield self._simple_upsert( + desc="add_user_account_data", table="account_data", keyvalues={ "user_id": user_id, @@ -274,40 +281,46 @@ class AccountDataStore(SQLBaseStore): values={ "stream_id": next_id, "content": content_json, - } + }, + lock=False, ) - txn.call_after( - self._account_data_stream_cache.entity_has_changed, + + # it's theoretically possible for the above to succeed and the + # below to fail - in which case we might reuse a stream id on + # restart, and the above update might not get propagated. That + # doesn't sound any worse than the whole update getting lost, + # which is what would happen if we combined the two into one + # transaction. + yield self._update_max_stream_id(next_id) + + self._account_data_stream_cache.entity_has_changed( user_id, next_id, ) - txn.call_after(self.get_account_data_for_user.invalidate, (user_id,)) - txn.call_after( - self.get_global_account_data_by_type_for_user.invalidate, + self.get_account_data_for_user.invalidate((user_id,)) + self.get_global_account_data_by_type_for_user.invalidate( (account_data_type, user_id,) ) - self._update_max_stream_id(txn, next_id) - - with self._account_data_id_gen.get_next() as next_id: - yield self.runInteraction( - "add_user_account_data", add_account_data_txn, next_id - ) result = self._account_data_id_gen.get_current_token() defer.returnValue(result) - def _update_max_stream_id(self, txn, next_id): + def _update_max_stream_id(self, next_id): """Update the max stream_id Args: - txn: The database cursor next_id(int): The the revision to advance to. """ - update_max_id_sql = ( - "UPDATE account_data_max_stream_id" - " SET stream_id = ?" - " WHERE stream_id < ?" + def _update(txn): + update_max_id_sql = ( + "UPDATE account_data_max_stream_id" + " SET stream_id = ?" + " WHERE stream_id < ?" + ) + txn.execute(update_max_id_sql, (next_id, next_id)) + return self.runInteraction( + "update_account_data_max_stream_id", + _update, ) - txn.execute(update_max_id_sql, (next_id, next_id)) @cachedInlineCallbacks(num_args=2, cache_context=True, max_entries=5000) def is_ignored_by(self, ignored_user_id, ignorer_user_id, cache_context): From 7098b65cb8c7e0b41a3bcb8ac7d2cc9e63f06f82 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 21 Nov 2017 11:03:21 +0000 Subject: [PATCH 02/23] Fix error on sqlite 3.7 Create the url_cache index on local_media_repository as a background update, so that we can detect whether we are on sqlite or not and create a partial or complete index accordingly. To avoid running the cleanup job before we have built the index, add a bailout which will defer the cleanup if the bg updates are still running. Fixes https://github.com/matrix-org/synapse/issues/2572. --- synapse/rest/media/v1/preview_url_resource.py | 10 +++++--- synapse/storage/background_updates.py | 12 +++++++++- synapse/storage/media_repository.py | 16 ++++++++++--- .../schema/delta/44/expire_url_cache.sql | 5 +++- .../46/local_media_repository_url_idx.sql | 24 +++++++++++++++++++ 5 files changed, 59 insertions(+), 8 deletions(-) create mode 100644 synapse/storage/schema/delta/46/local_media_repository_url_idx.sql diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 723f7043f4..dd76e3f7d5 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -348,11 +348,16 @@ class PreviewUrlResource(Resource): def _expire_url_cache_data(self): """Clean up expired url cache content, media and thumbnails. """ - # TODO: Delete from backup media store now = self.clock.time_msec() + logger.info("Running url preview cache expiry") + + if not self.store.has_completed_background_updates(): + logger.info("Still running DB updates; skipping expiry") + return + # First we delete expired url cache entries media_ids = yield self.store.get_expired_url_cache(now) @@ -426,8 +431,7 @@ class PreviewUrlResource(Resource): yield self.store.delete_url_cache_media(removed_media) - if removed_media: - logger.info("Deleted %d media from url cache", len(removed_media)) + logger.info("Deleted %d media from url cache", len(removed_media)) def decode_and_calc_og(body, media_uri, request_encoding=None): diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 6f235ac051..e755afc18e 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -85,6 +85,7 @@ class BackgroundUpdateStore(SQLBaseStore): self._background_update_performance = {} self._background_update_queue = [] self._background_update_handlers = {} + self._all_done = False @defer.inlineCallbacks def start_doing_background_updates(self): @@ -106,8 +107,17 @@ class BackgroundUpdateStore(SQLBaseStore): "No more background updates to do." " Unscheduling background update task." ) + self._all_done = True defer.returnValue(None) + def has_completed_background_updates(self): + """Check if all the background updates have completed + + Returns: + bool: True if all background updates have completed + """ + return self._all_done + @defer.inlineCallbacks def do_next_background_update(self, desired_duration_ms): """Does some amount of work on the next queued background update @@ -269,7 +279,7 @@ class BackgroundUpdateStore(SQLBaseStore): # Sqlite doesn't support concurrent creation of indexes. # # We don't use partial indices on SQLite as it wasn't introduced - # until 3.8, and wheezy has 3.7 + # until 3.8, and wheezy and CentOS 7 have 3.7 # # We assume that sqlite doesn't give us invalid indices; however # we may still end up with the index existing but the diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py index 52e5cdad70..a66ff7c1e0 100644 --- a/synapse/storage/media_repository.py +++ b/synapse/storage/media_repository.py @@ -12,13 +12,23 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -from ._base import SQLBaseStore +from synapse.storage.background_updates import BackgroundUpdateStore -class MediaRepositoryStore(SQLBaseStore): +class MediaRepositoryStore(BackgroundUpdateStore): """Persistence for attachments and avatars""" + def __init__(self, db_conn, hs): + super(MediaRepositoryStore, self).__init__(db_conn, hs) + + self.register_background_index_update( + update_name='local_media_repository_url_idx', + index_name='local_media_repository_url_idx', + table='local_media_repository', + columns=['created_ts'], + where_clause='url_cache IS NOT NULL', + ) + def get_default_thumbnails(self, top_level_type, sub_type): return [] diff --git a/synapse/storage/schema/delta/44/expire_url_cache.sql b/synapse/storage/schema/delta/44/expire_url_cache.sql index e2b775f038..b12f9b2ebf 100644 --- a/synapse/storage/schema/delta/44/expire_url_cache.sql +++ b/synapse/storage/schema/delta/44/expire_url_cache.sql @@ -13,7 +13,10 @@ * limitations under the License. */ -CREATE INDEX local_media_repository_url_idx ON local_media_repository(created_ts) WHERE url_cache IS NOT NULL; +-- this didn't work on SQLite 3.7 (because of lack of partial indexes), so was +-- removed and replaced with 46/local_media_repository_url_idx.sql. +-- +-- CREATE INDEX local_media_repository_url_idx ON local_media_repository(created_ts) WHERE url_cache IS NOT NULL; -- we need to change `expires` to `expires_ts` so that we can index on it. SQLite doesn't support -- indices on expressions until 3.9. diff --git a/synapse/storage/schema/delta/46/local_media_repository_url_idx.sql b/synapse/storage/schema/delta/46/local_media_repository_url_idx.sql new file mode 100644 index 0000000000..bbfc7f5d1a --- /dev/null +++ b/synapse/storage/schema/delta/46/local_media_repository_url_idx.sql @@ -0,0 +1,24 @@ +/* Copyright 2017 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- register a background update which will recreate the +-- local_media_repository_url_idx index. +-- +-- We do this as a bg update not because it is a particularly onerous +-- operation, but because we'd like it to be a partial index if possible, and +-- the background_index_update code will understand whether we are on +-- postgres or sqlite and behave accordingly. +INSERT INTO background_updates (update_name, progress_json) VALUES + ('local_media_repository_url_idx', '{}'); From 7298ed7c5145ee11cf8a8d866562170c3161c63c Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 21 Nov 2017 10:50:23 +0000 Subject: [PATCH 03/23] Clean up dependency list remove those that aren't used at all, and replace the ones that don't have builders with simple getters rather than dynamically-generated methods. --- synapse/server.py | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/synapse/server.py b/synapse/server.py index 10e3e9a4f1..4746cc7b6c 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -90,17 +90,12 @@ class HomeServer(object): """ DEPENDENCIES = [ - 'config', - 'clock', 'http_client', 'db_pool', - 'persistence_service', 'replication_layer', - 'datastore', 'handlers', 'v1auth', 'auth', - 'rest_servlet_factory', 'state_handler', 'presence_handler', 'sync_handler', @@ -118,18 +113,7 @@ class HomeServer(object): 'device_message_handler', 'profile_handler', 'notifier', - 'distributor', - 'client_resource', - 'resource_for_federation', - 'resource_for_static_content', - 'resource_for_web_client', - 'resource_for_content_repo', - 'resource_for_server_key', - 'resource_for_server_key_v2', - 'resource_for_media_repository', - 'resource_for_metrics', 'event_sources', - 'ratelimiter', 'keyring', 'pusherpool', 'event_builder_factory', @@ -183,6 +167,21 @@ class HomeServer(object): def is_mine_id(self, string): return string.split(":", 1)[1] == self.hostname + def get_clock(self): + return self.clock + + def get_datastore(self): + return self.datastore + + def get_config(self): + return self.config + + def get_distributor(self): + return self.distributor + + def get_ratelimiter(self): + return self.ratelimiter + def build_replication_layer(self): return initialize_http_replication(self) From a0c668897612d04a7739d3c5d37a20187d881e5f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 21 Nov 2017 13:22:43 +0000 Subject: [PATCH 04/23] Improve documentation of workers Fixes https://github.com/matrix-org/synapse/issues/2554 --- docs/workers.rst | 154 ++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 133 insertions(+), 21 deletions(-) diff --git a/docs/workers.rst b/docs/workers.rst index 2d3df91593..3cc8b3d82e 100644 --- a/docs/workers.rst +++ b/docs/workers.rst @@ -1,11 +1,15 @@ Scaling synapse via workers ---------------------------- +=========================== Synapse has experimental support for splitting out functionality into multiple separate python processes, helping greatly with scalability. These processes are called 'workers', and are (eventually) intended to scale horizontally independently. +All of the below is highly experimental and subject to change as Synapse evolves, +but documenting it here to help folks needing highly scalable Synapses similar +to the one running matrix.org! + All processes continue to share the same database instance, and as such, workers only work with postgres based synapse deployments (sharing a single sqlite across multiple processes is a recipe for disaster, plus you should be using @@ -16,6 +20,16 @@ TCP protocol called 'replication' - analogous to MySQL or Postgres style database replication; feeding a stream of relevant data to the workers so they can be kept in sync with the main synapse process and database state. +Configuration +------------- + +To make effective use of the workers, you will need to configure an HTTP +reverse-proxy such as nginx or haproxy, which will direct incoming requests to +the correct worker, or to the main synapse instance. Note that this includes +requests made to the federation port. The caveats regarding running a +reverse-proxy on the federation port still apply (see +https://github.com/matrix-org/synapse/blob/master/README.rst#reverse-proxying-the-federation-port). + To enable workers, you need to add a replication listener to the master synapse, e.g.:: listeners: @@ -27,26 +41,19 @@ Under **no circumstances** should this replication API listener be exposed to th public internet; it currently implements no authentication whatsoever and is unencrypted. -You then create a set of configs for the various worker processes. These should be -worker configuration files should be stored in a dedicated subdirectory, to allow -synctl to manipulate them. - -The current available worker applications are: - * synapse.app.pusher - handles sending push notifications to sygnal and email - * synapse.app.synchrotron - handles /sync endpoints. can scales horizontally through multiple instances. - * synapse.app.appservice - handles output traffic to Application Services - * synapse.app.federation_reader - handles receiving federation traffic (including public_rooms API) - * synapse.app.media_repository - handles the media repository. - * synapse.app.client_reader - handles client API endpoints like /publicRooms +You then create a set of configs for the various worker processes. These +should be worker configuration files, and should be stored in a dedicated +subdirectory, to allow synctl to manipulate them. Each worker configuration file inherits the configuration of the main homeserver configuration file. You can then override configuration specific to that worker, e.g. the HTTP listener that it provides (if any); logging configuration; etc. You should minimise the number of overrides though to maintain a usable config. -You must specify the type of worker application (worker_app) and the replication -endpoint that it's talking to on the main synapse process (worker_replication_host -and worker_replication_port). +You must specify the type of worker application (``worker_app``). The currently +available worker applications are listed below. You must also specify the +replication endpoint that it's talking to on the main synapse process +(``worker_replication_host`` and ``worker_replication_port``). For instance:: @@ -68,11 +75,11 @@ For instance:: worker_log_config: /home/matrix/synapse/config/synchrotron_log_config.yaml ...is a full configuration for a synchrotron worker instance, which will expose a -plain HTTP /sync endpoint on port 8083 separately from the /sync endpoint provided +plain HTTP ``/sync`` endpoint on port 8083 separately from the ``/sync`` endpoint provided by the main synapse. -Obviously you should configure your loadbalancer to route the /sync endpoint to -the synchrotron instance(s) in this instance. +Obviously you should configure your reverse-proxy to route the relevant +endpoints to the worker (``localhost:8083`` in the above example). Finally, to actually run your worker-based synapse, you must pass synctl the -a commandline option to tell it to operate on all the worker configurations found @@ -89,6 +96,111 @@ To manipulate a specific worker, you pass the -w option to synctl:: synctl -w $CONFIG/workers/synchrotron.yaml restart -All of the above is highly experimental and subject to change as Synapse evolves, -but documenting it here to help folks needing highly scalable Synapses similar -to the one running matrix.org! + +Available worker applications +----------------------------- + +``synapse.app.pusher`` +~~~~~~~~~~~~~~~~~~~~~~ + +Handles sending push notifications to sygnal and email. Doesn't handle any +REST endpoints itself, but you should set ``start_pushers: False`` in the +shared configuration file to stop the main synapse sending these notifications. + +Note this worker cannot be load-balanced: only one instance should be active. + +``synapse.app.synchrotron`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The synchrotron handles ``sync`` requests from clients. In particular, it can +handle REST endpoints matching the following regular expressions:: + + ^/_matrix/client/(v2_alpha|r0)/sync$ + ^/_matrix/client/(api/v1|v2_alpha|r0)/events$ + ^/_matrix/client/(api/v1|r0)/initialSync$ + ^/_matrix/client/(api/v1|r0)/rooms/[^/]+/initialSync$ + +The above endpoints should all be routed to the synchrotron worker by the +reverse-proxy configuration. + +It is possible to run multiple instances of the synchrotron to scale +horizontally. In this case the reverse-proxy should be configured to +load-balance across the instances, though it will be more efficient if all +requests from a particular user are routed to a single instance. Extracting +a userid from the access token is currently left as an exercise for the reader. + +``synapse.app.appservice`` +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Handles sending output traffic to Application Services. Doesn't handle any +REST endpoints itself, but you should set ``notify_appservices: False`` in the +shared configuration file to stop the main synapse sending these notifications. + +Note this worker cannot be load-balanced: only one instance should be active. + +``synapse.app.federation_reader`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Handles a subset of federation endpoints. In particular, it can handle REST +endpoints matching the following regular expressions:: + + ^/_matrix/federation/v1/event/ + ^/_matrix/federation/v1/state/ + ^/_matrix/federation/v1/state_ids/ + ^/_matrix/federation/v1/backfill/ + ^/_matrix/federation/v1/get_missing_events/ + ^/_matrix/federation/v1/publicRooms + +The above endpoints should all be routed to the federation_reader worker by the +reverse-proxy configuration. + +``synapse.app.federation_sender`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Handles sending federation traffic to other servers. Doesn't handle any +REST endpoints itself, but you should set ``send_federation: False`` in the +shared configuration file to stop the main synapse sending this traffic. + +Note this worker cannot be load-balanced: only one instance should be active. + +``synapse.app.media_repository`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Handles the media repository. It can handle all endpoints starting with:: + + /_matrix/media/ + +Note this worker cannot be load-balanced: only one instance should be active. + +``synapse.app.client_reader`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Handles client API endpoints. It can handle REST endpoints matching the +following regular expressions:: + + ^/_matrix/client/(api/v1|r0|unstable)/publicRooms$ + +``synapse.app.user_dir`` +~~~~~~~~~~~~~~~~~~~~~~~~ + +Handles searches in the user directory. It can handle REST endpoints matching +the following regular expressions:: + + ^/_matrix/client/(api/v1|r0|unstable)/user_directory/search$ + +``synapse.app.frontend_proxy`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Proxies some frequently-requested client endpoints to add caching and remove +load from the main synapse. It can handle REST endpoints matching the following +regular expressions:: + + ^/_matrix/client/(api/v1|r0|unstable)/keys/upload + +It will proxy any requests it cannot handle to the main synapse instance. It +must therefore be configured with the location of the main instance, via +the ``worker_main_http_uri`` setting in the frontend_proxy worker configuration +file. For example:: + + worker_main_http_uri: http://127.0.0.1:8008 + From e1fd4751de8e96907ea97afaf91525e68ce22227 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 21 Nov 2017 11:08:08 +0000 Subject: [PATCH 05/23] Build MediaRepositoryResource as a homeserver dependency This avoids the scenario where we have four different PreviewUrlResources configured on a single app, each of which have their own caches and cache clearing jobs. --- synapse/app/homeserver.py | 3 +-- synapse/app/media_repository.py | 3 +-- synapse/server.py | 11 ++++++++++- synapse/server.pyi | 7 +++++++ 4 files changed, 19 insertions(+), 5 deletions(-) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 9e26146338..4b6164baa2 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -43,7 +43,6 @@ from synapse.rest import ClientRestResource from synapse.rest.key.v1.server_key_resource import LocalKey from synapse.rest.key.v2 import KeyApiV2Resource from synapse.rest.media.v0.content_repository import ContentRepoResource -from synapse.rest.media.v1.media_repository import MediaRepositoryResource from synapse.server import HomeServer from synapse.storage import are_all_users_on_domain from synapse.storage.engines import IncorrectDatabaseSetup, create_engine @@ -195,7 +194,7 @@ class SynapseHomeServer(HomeServer): }) if name in ["media", "federation", "client"]: - media_repo = MediaRepositoryResource(self) + media_repo = self.get_media_repository_resource() resources.update({ MEDIA_PREFIX: media_repo, LEGACY_MEDIA_PREFIX: media_repo, diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index 36c18bdbcb..f54beeb15d 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -35,7 +35,6 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.rest.media.v0.content_repository import ContentRepoResource -from synapse.rest.media.v1.media_repository import MediaRepositoryResource from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.storage.media_repository import MediaRepositoryStore @@ -89,7 +88,7 @@ class MediaRepositoryServer(HomeServer): if name == "metrics": resources[METRICS_PREFIX] = MetricsResource(self) elif name == "media": - media_repo = MediaRepositoryResource(self) + media_repo = self.get_media_repository_resource() resources.update({ MEDIA_PREFIX: media_repo, LEGACY_MEDIA_PREFIX: media_repo, diff --git a/synapse/server.py b/synapse/server.py index 4746cc7b6c..853f4647b7 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -60,7 +60,10 @@ from synapse.http.matrixfederationclient import MatrixFederationHttpClient from synapse.notifier import Notifier from synapse.push.action_generator import ActionGenerator from synapse.push.pusherpool import PusherPool -from synapse.rest.media.v1.media_repository import MediaRepository +from synapse.rest.media.v1.media_repository import ( + MediaRepository, + MediaRepositoryResource, +) from synapse.state import StateHandler from synapse.storage import DataStore from synapse.streams.events import EventSources @@ -121,6 +124,7 @@ class HomeServer(object): 'http_client_context_factory', 'simple_http_client', 'media_repository', + 'media_repository_resource', 'federation_transport_client', 'federation_sender', 'receipts_handler', @@ -293,6 +297,11 @@ class HomeServer(object): **self.db_config.get("args", {}) ) + def build_media_repository_resource(self): + # build the media repo resource. This indirects through the HomeServer + # to ensure that we only have a single instance of + return MediaRepositoryResource(self) + def build_media_repository(self): return MediaRepository(self) diff --git a/synapse/server.pyi b/synapse/server.pyi index e8c0386b7f..3064a497eb 100644 --- a/synapse/server.pyi +++ b/synapse/server.pyi @@ -5,6 +5,7 @@ import synapse.handlers import synapse.handlers.auth import synapse.handlers.device import synapse.handlers.e2e_keys +import synapse.rest.media.v1.media_repository import synapse.storage import synapse.state @@ -35,3 +36,9 @@ class HomeServer(object): def get_federation_transport_client(self) -> synapse.federation.transport.client.TransportLayerClient: pass + + def get_media_repository_resource(self) -> synapse.rest.media.v1.media_repository.MediaRepositoryResource: + pass + + def get_media_repository(self) -> synapse.rest.media.v1.media_repository.MediaRepository: + pass From 68ca8641419ee42606192787b92152353f5c112e Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 21 Nov 2017 13:29:39 +0000 Subject: [PATCH 06/23] Add config option to disable media_repo on main synapse ... to stop us doing the cache cleanup jobs on the master. --- docs/workers.rst | 5 ++++- synapse/app/homeserver.py | 21 +++++++++++++-------- synapse/app/media_repository.py | 7 +++++++ synapse/config/server.py | 6 ++++++ 4 files changed, 30 insertions(+), 9 deletions(-) diff --git a/docs/workers.rst b/docs/workers.rst index 3cc8b3d82e..b39f79058e 100644 --- a/docs/workers.rst +++ b/docs/workers.rst @@ -170,6 +170,10 @@ Handles the media repository. It can handle all endpoints starting with:: /_matrix/media/ +You should also set ``enable_media_repo: False`` in the shared configuration +file to stop the main synapse running background jobs related to managing the +media repository. + Note this worker cannot be load-balanced: only one instance should be active. ``synapse.app.client_reader`` @@ -203,4 +207,3 @@ the ``worker_main_http_uri`` setting in the frontend_proxy worker configuration file. For example:: worker_main_http_uri: http://127.0.0.1:8008 - diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 4b6164baa2..6b8875afb4 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -194,14 +194,19 @@ class SynapseHomeServer(HomeServer): }) if name in ["media", "federation", "client"]: - media_repo = self.get_media_repository_resource() - resources.update({ - MEDIA_PREFIX: media_repo, - LEGACY_MEDIA_PREFIX: media_repo, - CONTENT_REPO_PREFIX: ContentRepoResource( - self, self.config.uploads_path - ), - }) + if self.get_config().enable_media_repo: + media_repo = self.get_media_repository_resource() + resources.update({ + MEDIA_PREFIX: media_repo, + LEGACY_MEDIA_PREFIX: media_repo, + CONTENT_REPO_PREFIX: ContentRepoResource( + self, self.config.uploads_path + ), + }) + elif name == "media": + raise ConfigError( + "'media' resource conflicts with enable_media_repo=False", + ) if name in ["keys", "federation"]: resources.update({ diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index f54beeb15d..c4e5f0965d 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -150,6 +150,13 @@ def start(config_options): assert config.worker_app == "synapse.app.media_repository" + if config.enable_media_repo: + _base.quit_with_error( + "enable_media_repo must be disabled in the main synapse process\n" + "before the media repo can be run in a separate worker.\n" + "Please add ``enable_media_repo: false`` to the main config\n" + ) + setup_logging(config, use_worker_options=True) events.USE_FROZEN_DICTS = config.use_frozen_dicts diff --git a/synapse/config/server.py b/synapse/config/server.py index 4d9193536d..edb90a1348 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -41,6 +41,12 @@ class ServerConfig(Config): # false only if we are updating the user directory in a worker self.update_user_directory = config.get("update_user_directory", True) + # whether to enable the media repository endpoints. This should be set + # to false if the media repository is running as a separate endpoint; + # doing so ensures that we will not run cache cleanup jobs on the + # master, potentially causing inconsistency. + self.enable_media_repo = config.get("enable_media_repo", True) + self.filter_timeline_limit = config.get("filter_timeline_limit", -1) # Whether we should block invites sent to users on this server From 2908f955d12e8c9d6081a8d72096c85683fe1ebf Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 22 Nov 2017 18:02:15 +0000 Subject: [PATCH 07/23] Check database in has_completed_background_updates so that the right thing happens on workers. --- synapse/rest/media/v1/preview_url_resource.py | 2 +- synapse/storage/_base.py | 16 +++++------ synapse/storage/background_updates.py | 27 +++++++++++++++++-- 3 files changed, 33 insertions(+), 12 deletions(-) diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index dd76e3f7d5..385e4079ec 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -354,7 +354,7 @@ class PreviewUrlResource(Resource): logger.info("Running url preview cache expiry") - if not self.store.has_completed_background_updates(): + if not (yield self.store.has_completed_background_updates()): logger.info("Still running DB updates; skipping expiry") return diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index e6eefdd6fe..476c84c621 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -600,20 +600,18 @@ class SQLBaseStore(object): @staticmethod def _simple_select_onecol_txn(txn, table, keyvalues, retcol): - if keyvalues: - where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys()) - else: - where = "" - sql = ( - "SELECT %(retcol)s FROM %(table)s %(where)s" + "SELECT %(retcol)s FROM %(table)s" ) % { "retcol": retcol, "table": table, - "where": where, } - txn.execute(sql, keyvalues.values()) + if keyvalues: + sql += "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys()) + txn.execute(sql, keyvalues.values()) + else: + txn.execute(sql) return [r[0] for r in txn] @@ -624,7 +622,7 @@ class SQLBaseStore(object): Args: table (str): table name - keyvalues (dict): column names and values to select the rows with + keyvalues (dict|None): column names and values to select the rows with retcol (str): column whos value we wish to retrieve. Returns: diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index e755afc18e..11a1b942f1 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -110,13 +110,36 @@ class BackgroundUpdateStore(SQLBaseStore): self._all_done = True defer.returnValue(None) + @defer.inlineCallbacks def has_completed_background_updates(self): """Check if all the background updates have completed Returns: - bool: True if all background updates have completed + Deferred[bool]: True if all background updates have completed """ - return self._all_done + # if we've previously determined that there is nothing left to do, that + # is easy + if self._all_done: + defer.returnValue(True) + + # obviously, if we have things in our queue, we're not done. + if self._background_update_queue: + defer.returnValue(False) + + # otherwise, check if there are updates to be run. This is important, + # as we may be running on a worker which doesn't perform the bg updates + # itself, but still wants to wait for them to happen. + updates = yield self._simple_select_onecol( + "background_updates", + keyvalues=None, + retcol="1", + desc="check_background_updates", + ) + if not updates: + self._all_done = True + defer.returnValue(True) + + defer.returnValue(False) @defer.inlineCallbacks def do_next_background_update(self, desired_duration_ms): From 6b48b3e277b2fe7c14493ddd6c07f43890584955 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 22 Nov 2017 18:06:24 +0000 Subject: [PATCH 08/23] fix sql fails --- synapse/storage/_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 476c84c621..470f7881ab 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -608,7 +608,7 @@ class SQLBaseStore(object): } if keyvalues: - sql += "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys()) + sql += " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys()) txn.execute(sql, keyvalues.values()) else: txn.execute(sql) From 8132a6b7ac909b6771ee78ab3593d7d7f7e7ef2c Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 23 Nov 2017 17:52:31 +0000 Subject: [PATCH 09/23] Fix OPTIONS on preview_url Fixes #2706 --- synapse/rest/media/v1/preview_url_resource.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 723f7043f4..65f86be205 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -25,7 +25,8 @@ from synapse.util.stringutils import random_string from synapse.util.caches.expiringcache import ExpiringCache from synapse.http.client import SpiderHttpClient from synapse.http.server import ( - request_handler, respond_with_json_bytes + request_handler, respond_with_json_bytes, + respond_with_json, ) from synapse.util.async import ObservableDeferred from synapse.util.stringutils import is_ascii @@ -78,6 +79,9 @@ class PreviewUrlResource(Resource): self._expire_url_cache_data, 10 * 1000 ) + def render_OPTIONS(self, request): + return respond_with_json(request, 200, {}, send_cors=True) + def render_GET(self, request): self._async_render_GET(request) return NOT_DONE_YET From 0edf085b683c8ba0ad8cf207d5bc8489b1e8cdbe Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 23 Nov 2017 23:19:43 +0000 Subject: [PATCH 10/23] Fix some logcontext leaks in replication resource The @measure_func annotations rely on the wrapped function respecting the logcontext rules. Add the necessary yields to make this work. --- synapse/replication/tcp/resource.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 1d03e79b85..786c3fe864 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -216,11 +216,12 @@ class ReplicationStreamer(object): self.federation_sender.federation_ack(token) @measure_func("repl.on_user_sync") + @defer.inlineCallbacks def on_user_sync(self, conn_id, user_id, is_syncing, last_sync_ms): """A client has started/stopped syncing on a worker. """ user_sync_counter.inc() - self.presence_handler.update_external_syncs_row( + yield self.presence_handler.update_external_syncs_row( conn_id, user_id, is_syncing, last_sync_ms, ) @@ -244,11 +245,12 @@ class ReplicationStreamer(object): getattr(self.store, cache_func).invalidate(tuple(keys)) @measure_func("repl.on_user_ip") + @defer.inlineCallbacks def on_user_ip(self, user_id, access_token, ip, user_agent, device_id, last_seen): """The client saw a user request """ user_ip_cache_counter.inc() - self.store.insert_client_ip( + yield self.store.insert_client_ip( user_id, access_token, ip, user_agent, device_id, last_seen, ) From 7f14f0ae3861bf9596358614133ed3bb7b8cefc4 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 24 Nov 2017 00:32:04 +0000 Subject: [PATCH 11/23] Remove dead sync_callback This is never used; let's remove it to stop confusing things. --- synapse/app/synchrotron.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 576ac6fb7e..f68f45ea4f 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -343,8 +343,6 @@ class SyncReplicationHandler(ReplicationClientHandler): self.presence_handler = hs.get_presence_handler() self.notifier = hs.get_notifier() - self.presence_handler.sync_callback = self.send_user_sync - def on_rdata(self, stream_name, token, rows): super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows) From 795b0849f3654d292132e9ead691d12158bd0304 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 24 Nov 2017 00:34:56 +0000 Subject: [PATCH 12/23] Add a comment which might save some confusion --- synapse/app/synchrotron.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index f68f45ea4f..323fddee21 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -340,6 +340,7 @@ class SyncReplicationHandler(ReplicationClientHandler): self.store = hs.get_datastore() self.typing_handler = hs.get_typing_handler() + # NB this is a SynchrotronPresence, not a normal PresenceHandler self.presence_handler = hs.get_presence_handler() self.notifier = hs.get_notifier() From 8b38096a897bc142378d8a1f01fc28c459decc7e Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 24 Nov 2017 16:44:56 +0000 Subject: [PATCH 13/23] Fix error handling on dns lookup pass the right arguments to the errback handler Fixes "TypeError('eb() takes exactly 2 arguments (1 given)',)" --- synapse/http/endpoint.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py index a97532162f..e2b99ef3bd 100644 --- a/synapse/http/endpoint.py +++ b/synapse/http/endpoint.py @@ -362,8 +362,10 @@ def _get_hosts_for_srv_record(dns_client, host): return res # no logcontexts here, so we can safely fire these off and gatherResults - d1 = dns_client.lookupAddress(host).addCallbacks(cb, eb) - d2 = dns_client.lookupIPV6Address(host).addCallbacks(cb, eb) + d1 = dns_client.lookupAddress(host).addCallbacks( + cb, eb, errbackArgs=("A", )) + d2 = dns_client.lookupIPV6Address(host).addCallbacks( + cb, eb, errbackArgs=("AAAA", )) results = yield defer.DeferredList( [d1, d2], consumeErrors=True) From 63ccaa58736ca098a0d52000bdbc2df589b8cdaa Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 27 Nov 2017 11:56:57 +0000 Subject: [PATCH 14/23] Avoid retrying forever on IntegrityError --- synapse/storage/_base.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index e6eefdd6fe..662c30187d 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -495,6 +495,7 @@ class SQLBaseStore(object): Deferred(bool): True if a new entry was created, False if an existing one was updated. """ + attempts = 0 while True: try: result = yield self.runInteraction( @@ -504,6 +505,12 @@ class SQLBaseStore(object): ) defer.returnValue(result) except self.database_engine.module.IntegrityError as e: + attempts += 1 + if attempts >= 5: + # don't retry forever, because things other than races + # can cause IntegrityErrors + raise + # presumably we raced with another transaction: let's retry. logger.warn( "IntegrityError when upserting into %s; retrying: %s", From 6be01f599b951fb0ef92d17d5bf7af71ec9d375a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 23 Nov 2017 15:20:52 +0000 Subject: [PATCH 15/23] Improve tracebacks on exceptions Use failure.Failure to recover our failure, which will give us a useful stacktrace, unlike the rethrown exception. --- synapse/http/server.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/synapse/http/server.py b/synapse/http/server.py index 3ca1c9947c..25466cd292 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -28,6 +28,7 @@ from canonicaljson import ( ) from twisted.internet import defer +from twisted.python import failure from twisted.web import server, resource from twisted.web.server import NOT_DONE_YET from twisted.web.util import redirectTo @@ -131,12 +132,17 @@ def wrap_request_handler(request_handler, include_metrics=False): version_string=self.version_string, ) except Exception: - logger.exception( - "Failed handle request %s.%s on %r: %r", + # failure.Failure() fishes the original Failure out + # of our stack, and thus gives us a sensible stack + # trace. + f = failure.Failure() + logger.error( + "Failed handle request %s.%s on %r: %r: %s", request_handler.__module__, request_handler.__name__, self, - request + request, + f.getTraceback().rstrip(), ) respond_with_json( request, From ab1b2d0ff238be60aa152dfafe758716a078cbf5 Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Tue, 28 Nov 2017 11:23:00 +0000 Subject: [PATCH 16/23] Allow guest access to group APIs for reading --- synapse/rest/client/v2_alpha/groups.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/synapse/rest/client/v2_alpha/groups.py b/synapse/rest/client/v2_alpha/groups.py index 089ec71c81..f762dbfa9a 100644 --- a/synapse/rest/client/v2_alpha/groups.py +++ b/synapse/rest/client/v2_alpha/groups.py @@ -38,7 +38,7 @@ class GroupServlet(RestServlet): @defer.inlineCallbacks def on_GET(self, request, group_id): - requester = yield self.auth.get_user_by_req(request) + requester = yield self.auth.get_user_by_req(request, allow_guest=True) requester_user_id = requester.user.to_string() group_description = yield self.groups_handler.get_group_profile( @@ -74,7 +74,7 @@ class GroupSummaryServlet(RestServlet): @defer.inlineCallbacks def on_GET(self, request, group_id): - requester = yield self.auth.get_user_by_req(request) + requester = yield self.auth.get_user_by_req(request, allow_guest=True) requester_user_id = requester.user.to_string() get_group_summary = yield self.groups_handler.get_group_summary( @@ -148,7 +148,7 @@ class GroupCategoryServlet(RestServlet): @defer.inlineCallbacks def on_GET(self, request, group_id, category_id): - requester = yield self.auth.get_user_by_req(request) + requester = yield self.auth.get_user_by_req(request, allow_guest=True) requester_user_id = requester.user.to_string() category = yield self.groups_handler.get_group_category( @@ -200,7 +200,7 @@ class GroupCategoriesServlet(RestServlet): @defer.inlineCallbacks def on_GET(self, request, group_id): - requester = yield self.auth.get_user_by_req(request) + requester = yield self.auth.get_user_by_req(request, allow_guest=True) requester_user_id = requester.user.to_string() category = yield self.groups_handler.get_group_categories( @@ -225,7 +225,7 @@ class GroupRoleServlet(RestServlet): @defer.inlineCallbacks def on_GET(self, request, group_id, role_id): - requester = yield self.auth.get_user_by_req(request) + requester = yield self.auth.get_user_by_req(request, allow_guest=True) requester_user_id = requester.user.to_string() category = yield self.groups_handler.get_group_role( @@ -277,7 +277,7 @@ class GroupRolesServlet(RestServlet): @defer.inlineCallbacks def on_GET(self, request, group_id): - requester = yield self.auth.get_user_by_req(request) + requester = yield self.auth.get_user_by_req(request, allow_guest=True) requester_user_id = requester.user.to_string() category = yield self.groups_handler.get_group_roles( @@ -348,7 +348,7 @@ class GroupRoomServlet(RestServlet): @defer.inlineCallbacks def on_GET(self, request, group_id): - requester = yield self.auth.get_user_by_req(request) + requester = yield self.auth.get_user_by_req(request, allow_guest=True) requester_user_id = requester.user.to_string() result = yield self.groups_handler.get_rooms_in_group(group_id, requester_user_id) @@ -369,7 +369,7 @@ class GroupUsersServlet(RestServlet): @defer.inlineCallbacks def on_GET(self, request, group_id): - requester = yield self.auth.get_user_by_req(request) + requester = yield self.auth.get_user_by_req(request, allow_guest=True) requester_user_id = requester.user.to_string() result = yield self.groups_handler.get_users_in_group(group_id, requester_user_id) @@ -672,7 +672,7 @@ class PublicisedGroupsForUserServlet(RestServlet): @defer.inlineCallbacks def on_GET(self, request, user_id): - yield self.auth.get_user_by_req(request) + yield self.auth.get_user_by_req(request, allow_guest=True) result = yield self.groups_handler.get_publicised_groups_for_user( user_id @@ -697,7 +697,7 @@ class PublicisedGroupsForUsersServlet(RestServlet): @defer.inlineCallbacks def on_POST(self, request): - yield self.auth.get_user_by_req(request) + yield self.auth.get_user_by_req(request, allow_guest=True) content = parse_json_object_from_request(request) user_ids = content["user_ids"] @@ -724,7 +724,7 @@ class GroupsForUserServlet(RestServlet): @defer.inlineCallbacks def on_GET(self, request): - requester = yield self.auth.get_user_by_req(request) + requester = yield self.auth.get_user_by_req(request, allow_guest=True) requester_user_id = requester.user.to_string() result = yield self.groups_handler.get_joined_groups(requester_user_id) From d4fb4f7c52e24c36204c6c3299dfa04cc05242c8 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 28 Nov 2017 14:37:11 +0000 Subject: [PATCH 17/23] Clear logcontext before starting fed txn queue runner These processes take a long time compared to the request, so there is lots of "Entering|Restoring dead context" in the logs. Let's try to shut it up a bit. --- synapse/federation/transaction_queue.py | 10 ++++++++-- synapse/notifier.py | 17 ++++++++++------- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 7a3c9cbb70..3e7809b04f 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -20,7 +20,7 @@ from .persistence import TransactionActions from .units import Transaction, Edu from synapse.api.errors import HttpResponseException -from synapse.util import logcontext +from synapse.util import logcontext, PreserveLoggingContext from synapse.util.async import run_on_reactor from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter from synapse.util.metrics import measure_func @@ -146,7 +146,6 @@ class TransactionQueue(object): else: return not destination.startswith("localhost") - @defer.inlineCallbacks def notify_new_events(self, current_id): """This gets called when we have some new events we might want to send out to other servers. @@ -156,6 +155,13 @@ class TransactionQueue(object): if self._is_processing: return + # fire off a processing loop in the background. It's likely it will + # outlast the current request, so run it in the sentinel logcontext. + with PreserveLoggingContext(): + self._process_event_queue_loop() + + @defer.inlineCallbacks + def _process_event_queue_loop(self): try: self._is_processing = True while True: diff --git a/synapse/notifier.py b/synapse/notifier.py index 626da778cd..ef042681bc 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -255,9 +255,7 @@ class Notifier(object): ) if self.federation_sender: - preserve_fn(self.federation_sender.notify_new_events)( - room_stream_id - ) + self.federation_sender.notify_new_events(room_stream_id) if event.type == EventTypes.Member and event.membership == Membership.JOIN: self._user_joined_room(event.state_key, event.room_id) @@ -297,8 +295,7 @@ class Notifier(object): def on_new_replication_data(self): """Used to inform replication listeners that something has happend without waking up any of the normal user event streams""" - with PreserveLoggingContext(): - self.notify_replication() + self.notify_replication() @defer.inlineCallbacks def wait_for_events(self, user_id, timeout, callback, room_ids=None, @@ -516,8 +513,14 @@ class Notifier(object): self.replication_deferred = ObservableDeferred(defer.Deferred()) deferred.callback(None) - for cb in self.replication_callbacks: - preserve_fn(cb)() + # the callbacks may well outlast the current request, so we run + # them in the sentinel logcontext. + # + # (ideally it would be up to the callbacks to know if they were + # starting off background processes and drop the logcontext + # accordingly, but that requires more changes) + for cb in self.replication_callbacks: + cb() @defer.inlineCallbacks def wait_for_replication(self, callback, timeout): From da562bd6a105618e4358cb9ff08c1fb93d270947 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 29 Nov 2017 15:41:20 +0000 Subject: [PATCH 18/23] Improve comments on get_user_by_access_token because I have to reverse-engineer this every time. --- synapse/api/auth.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 72858cca1f..ac0a3655a5 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -270,7 +270,11 @@ class Auth(object): rights (str): The operation being performed; the access token must allow this. Returns: - dict : dict that includes the user and the ID of their access token. + Deferred[dict]: dict that includes: + `user` (UserID) + `is_guest` (bool) + `token_id` (int|None): access token id. May be None if guest + `device_id` (str|None): device corresponding to access token Raises: AuthError if no user by that token exists or the token is invalid. """ From 7303ed65e1cb036ce410fa6157d131dfaf00171a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 28 Nov 2017 14:06:12 +0000 Subject: [PATCH 19/23] Fix 500 when joining matrix-dev matrix-dev has an event (`$/6ANj/9QWQyd71N6DpRQPf+SDUu11+HVMeKSpMzBCwM:zemos.net`) which has no `hashes` member. Check for missing `hashes` element in events. --- synapse/crypto/event_signing.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/synapse/crypto/event_signing.py b/synapse/crypto/event_signing.py index 0d0e7b5286..aaa3efaca3 100644 --- a/synapse/crypto/event_signing.py +++ b/synapse/crypto/event_signing.py @@ -32,15 +32,22 @@ def check_event_content_hash(event, hash_algorithm=hashlib.sha256): """Check whether the hash for this PDU matches the contents""" name, expected_hash = compute_content_hash(event, hash_algorithm) logger.debug("Expecting hash: %s", encode_base64(expected_hash)) - if name not in event.hashes: + + # some malformed events lack a 'hashes'. Protect against it being missing + # or a weird type by basically treating it the same as an unhashed event. + hashes = event.get("hashes") + if not isinstance(hashes, dict): + raise SynapseError(400, "Malformed 'hashes'", Codes.UNAUTHORIZED) + + if name not in hashes: raise SynapseError( 400, "Algorithm %s not in hashes %s" % ( - name, list(event.hashes), + name, list(hashes), ), Codes.UNAUTHORIZED, ) - message_hash_base64 = event.hashes[name] + message_hash_base64 = hashes[name] try: message_hash_bytes = decode_base64(message_hash_base64) except Exception: From 2c6d63922a5033a17c7d53a928892fbbcbd6fa63 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 29 Nov 2017 14:33:05 +0000 Subject: [PATCH 20/23] Remove pushers when deleting access tokens Whenever an access token is invalidated, we should remove the associated pushers. --- synapse/handlers/auth.py | 16 ++++++++++++---- synapse/push/pusherpool.py | 24 +++++++++++++++--------- synapse/storage/registration.py | 10 +++++----- 3 files changed, 32 insertions(+), 18 deletions(-) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 080eb14271..0ba66bc947 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -664,9 +664,6 @@ class AuthHandler(BaseHandler): yield self.delete_access_tokens_for_user( user_id, except_token_id=except_access_token_id, ) - yield self.hs.get_pusherpool().remove_pushers_by_user( - user_id, except_access_token_id - ) @defer.inlineCallbacks def deactivate_account(self, user_id): @@ -706,6 +703,12 @@ class AuthHandler(BaseHandler): access_token=access_token, ) + # delete pushers associated with this access token + if user_info["token_id"] is not None: + yield self.hs.get_pusherpool().remove_pushers_by_access_token( + str(user_info["user"]), (user_info["token_id"], ) + ) + @defer.inlineCallbacks def delete_access_tokens_for_user(self, user_id, except_token_id=None, device_id=None): @@ -728,13 +731,18 @@ class AuthHandler(BaseHandler): # see if any of our auth providers want to know about this for provider in self.password_providers: if hasattr(provider, "on_logged_out"): - for token, device_id in tokens_and_devices: + for token, token_id, device_id in tokens_and_devices: yield provider.on_logged_out( user_id=user_id, device_id=device_id, access_token=token, ) + # delete pushers associated with the access tokens + yield self.hs.get_pusherpool().remove_pushers_by_access_token( + user_id, (token_id for _, token_id, _ in tokens_and_devices), + ) + @defer.inlineCallbacks def add_threepid(self, user_id, medium, address, validated_at): # 'Canonicalise' email addresses down to lower case. diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 34cb108dcb..134e89b371 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -103,19 +103,25 @@ class PusherPool: yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name']) @defer.inlineCallbacks - def remove_pushers_by_user(self, user_id, except_access_token_id=None): - all = yield self.store.get_all_pushers() - logger.info( - "Removing all pushers for user %s except access tokens id %r", - user_id, except_access_token_id - ) - for p in all: - if p['user_name'] == user_id and p['access_token'] != except_access_token_id: + def remove_pushers_by_access_token(self, user_id, access_tokens): + """Remove the pushers for a given user corresponding to a set of + access_tokens. + + Args: + user_id (str): user to remove pushers for + access_tokens (Iterable[int]): access token *ids* to remove pushers + for + """ + tokens = set(access_tokens) + for p in (yield self.store.get_pushers_by_user_id(user_id)): + if p['access_token'] in tokens: logger.info( "Removing pusher for app id %s, pushkey %s, user %s", p['app_id'], p['pushkey'], p['user_name'] ) - yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name']) + yield self.remove_pusher( + p['app_id'], p['pushkey'], p['user_name'], + ) @defer.inlineCallbacks def on_new_notifications(self, min_stream_id, max_stream_id): diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 8b9544c209..3aa810981f 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -254,8 +254,8 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): If None, tokens associated with any device (or no device) will be deleted Returns: - defer.Deferred[list[str, str|None]]: a list of the deleted tokens - and device IDs + defer.Deferred[list[str, int, str|None, int]]: a list of + (token, token id, device id) for each of the deleted tokens """ def f(txn): keyvalues = { @@ -272,12 +272,12 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): values.append(except_token_id) txn.execute( - "SELECT token, device_id FROM access_tokens WHERE %s" % where_clause, + "SELECT token, id, device_id FROM access_tokens WHERE %s" % where_clause, values ) - tokens_and_devices = [(r[0], r[1]) for r in txn] + tokens_and_devices = [(r[0], r[1], r[2]) for r in txn] - for token, _ in tokens_and_devices: + for token, _, _ in tokens_and_devices: self._invalidate_cache_and_stream( txn, self.get_user_by_access_token, (token,) ) From 7ca5c682338e073060050f4ff78a1ab83530f9f2 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 29 Nov 2017 11:48:43 +0000 Subject: [PATCH 21/23] Move deactivate_account into its own handler Non-functional refactoring to move deactivate_account. This means that we'll be able to properly deactivate devices and access tokens without introducing a dependency loop. --- synapse/handlers/auth.py | 16 --------- synapse/handlers/deactivate_account.py | 44 +++++++++++++++++++++++++ synapse/rest/client/v1/admin.py | 4 +-- synapse/rest/client/v2_alpha/account.py | 7 ++-- synapse/server.py | 5 +++ synapse/server.pyi | 7 +++- 6 files changed, 61 insertions(+), 22 deletions(-) create mode 100644 synapse/handlers/deactivate_account.py diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 0ba66bc947..cfcb4ea2a0 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -665,22 +665,6 @@ class AuthHandler(BaseHandler): user_id, except_token_id=except_access_token_id, ) - @defer.inlineCallbacks - def deactivate_account(self, user_id): - """Deactivate a user's account - - Args: - user_id (str): ID of user to be deactivated - - Returns: - Deferred - """ - # FIXME: Theoretically there is a race here wherein user resets - # password using threepid. - yield self.delete_access_tokens_for_user(user_id) - yield self.store.user_delete_threepids(user_id) - yield self.store.user_set_password_hash(user_id, None) - @defer.inlineCallbacks def delete_access_token(self, access_token): """Invalidate a single access token diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py new file mode 100644 index 0000000000..70f02c9b76 --- /dev/null +++ b/synapse/handlers/deactivate_account.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from twisted.internet import defer + +from ._base import BaseHandler + +import logging + +logger = logging.getLogger(__name__) + + +class DeactivateAccountHandler(BaseHandler): + """Handler which deals with deactivating user accounts.""" + def __init__(self, hs): + super(DeactivateAccountHandler, self).__init__(hs) + self._auth_handler = hs.get_auth_handler() + + @defer.inlineCallbacks + def deactivate_account(self, user_id): + """Deactivate a user's account + + Args: + user_id (str): ID of user to be deactivated + + Returns: + Deferred + """ + # FIXME: Theoretically there is a race here wherein user resets + # password using threepid. + yield self._auth_handler.delete_access_tokens_for_user(user_id) + yield self.store.user_delete_threepids(user_id) + yield self.store.user_set_password_hash(user_id, None) diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index 1197158fdc..a67e22790b 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -137,8 +137,8 @@ class DeactivateAccountRestServlet(ClientV1RestServlet): PATTERNS = client_path_patterns("/admin/deactivate/(?P[^/]*)") def __init__(self, hs): - self._auth_handler = hs.get_auth_handler() super(DeactivateAccountRestServlet, self).__init__(hs) + self._deactivate_account_handler = hs.get_deactivate_account_handler() @defer.inlineCallbacks def on_POST(self, request, target_user_id): @@ -149,7 +149,7 @@ class DeactivateAccountRestServlet(ClientV1RestServlet): if not is_admin: raise AuthError(403, "You are not a server admin") - yield self._auth_handler.deactivate_account(target_user_id) + yield self._deactivate_account_handler.deactivate_account(target_user_id) defer.returnValue((200, {})) diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index 726e0a2826..6202e8849d 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -161,10 +161,11 @@ class DeactivateAccountRestServlet(RestServlet): PATTERNS = client_v2_patterns("/account/deactivate$") def __init__(self, hs): + super(DeactivateAccountRestServlet, self).__init__() self.hs = hs self.auth = hs.get_auth() self.auth_handler = hs.get_auth_handler() - super(DeactivateAccountRestServlet, self).__init__() + self._deactivate_account_handler = hs.get_deactivate_account_handler() @defer.inlineCallbacks def on_POST(self, request): @@ -179,7 +180,7 @@ class DeactivateAccountRestServlet(RestServlet): # allow ASes to dectivate their own users if requester and requester.app_service: - yield self.auth_handler.deactivate_account( + yield self._deactivate_account_handler.deactivate_account( requester.user.to_string() ) defer.returnValue((200, {})) @@ -206,7 +207,7 @@ class DeactivateAccountRestServlet(RestServlet): logger.error("Auth succeeded but no known type!", result.keys()) raise SynapseError(500, "", Codes.UNKNOWN) - yield self.auth_handler.deactivate_account(user_id) + yield self._deactivate_account_handler.deactivate_account(user_id) defer.returnValue((200, {})) diff --git a/synapse/server.py b/synapse/server.py index 853f4647b7..fea19e68d1 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -39,6 +39,7 @@ from synapse.federation.transaction_queue import TransactionQueue from synapse.handlers import Handlers from synapse.handlers.appservice import ApplicationServicesHandler from synapse.handlers.auth import AuthHandler, MacaroonGeneartor +from synapse.handlers.deactivate_account import DeactivateAccountHandler from synapse.handlers.devicemessage import DeviceMessageHandler from synapse.handlers.device import DeviceHandler from synapse.handlers.e2e_keys import E2eKeysHandler @@ -115,6 +116,7 @@ class HomeServer(object): 'application_service_handler', 'device_message_handler', 'profile_handler', + 'deactivate_account_handler', 'notifier', 'event_sources', 'keyring', @@ -268,6 +270,9 @@ class HomeServer(object): def build_profile_handler(self): return ProfileHandler(self) + def build_deactivate_account_handler(self): + return DeactivateAccountHandler(self) + def build_event_sources(self): return EventSources(self) diff --git a/synapse/server.pyi b/synapse/server.pyi index 3064a497eb..e1d0a71fd4 100644 --- a/synapse/server.pyi +++ b/synapse/server.pyi @@ -3,11 +3,13 @@ import synapse.federation.transaction_queue import synapse.federation.transport.client import synapse.handlers import synapse.handlers.auth +import synapse.handlers.deactivate_account import synapse.handlers.device import synapse.handlers.e2e_keys import synapse.rest.media.v1.media_repository -import synapse.storage import synapse.state +import synapse.storage + class HomeServer(object): def get_auth(self) -> synapse.api.auth.Auth: @@ -31,6 +33,9 @@ class HomeServer(object): def get_state_handler(self) -> synapse.state.StateHandler: pass + def get_deactivate_account_handler(self) -> synapse.handlers.deactivate_account.DeactivateAccountHandler: + pass + def get_federation_sender(self) -> synapse.federation.transaction_queue.TransactionQueue: pass From ae31f8ce4507e8c68e5c1aea3363789dbd8ca999 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 29 Nov 2017 14:10:46 +0000 Subject: [PATCH 22/23] Move set_password into its own handler Non-functional refactoring to move set_password. This means that we'll be able to properly deactivate devices and access tokens without introducing a dependency loop. --- synapse/handlers/auth.py | 16 --------- synapse/handlers/set_password.py | 45 +++++++++++++++++++++++++ synapse/rest/client/v1/admin.py | 4 +-- synapse/rest/client/v2_alpha/account.py | 3 +- synapse/server.py | 5 +++ synapse/server.pyi | 4 +++ 6 files changed, 58 insertions(+), 19 deletions(-) create mode 100644 synapse/handlers/set_password.py diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index cfcb4ea2a0..2f30f183ce 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -649,22 +649,6 @@ class AuthHandler(BaseHandler): except Exception: raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN) - @defer.inlineCallbacks - def set_password(self, user_id, newpassword, requester=None): - password_hash = self.hash(newpassword) - - except_access_token_id = requester.access_token_id if requester else None - - try: - yield self.store.user_set_password_hash(user_id, password_hash) - except StoreError as e: - if e.code == 404: - raise SynapseError(404, "Unknown user", Codes.NOT_FOUND) - raise e - yield self.delete_access_tokens_for_user( - user_id, except_token_id=except_access_token_id, - ) - @defer.inlineCallbacks def delete_access_token(self, access_token): """Invalidate a single access token diff --git a/synapse/handlers/set_password.py b/synapse/handlers/set_password.py new file mode 100644 index 0000000000..c0d83f229c --- /dev/null +++ b/synapse/handlers/set_password.py @@ -0,0 +1,45 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +from twisted.internet import defer + +from synapse.api.errors import Codes, StoreError, SynapseError +from ._base import BaseHandler + +logger = logging.getLogger(__name__) + + +class SetPasswordHandler(BaseHandler): + """Handler which deals with changing user account passwords""" + def __init__(self, hs): + super(SetPasswordHandler, self).__init__(hs) + self._auth_handler = hs.get_auth_handler() + + @defer.inlineCallbacks + def set_password(self, user_id, newpassword, requester=None): + password_hash = self._auth_handler.hash(newpassword) + + except_access_token_id = requester.access_token_id if requester else None + + try: + yield self.store.user_set_password_hash(user_id, password_hash) + except StoreError as e: + if e.code == 404: + raise SynapseError(404, "Unknown user", Codes.NOT_FOUND) + raise e + yield self._auth_handler.delete_access_tokens_for_user( + user_id, except_token_id=except_access_token_id, + ) diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index a67e22790b..5022808ea9 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -309,7 +309,7 @@ class ResetPasswordRestServlet(ClientV1RestServlet): super(ResetPasswordRestServlet, self).__init__(hs) self.hs = hs self.auth = hs.get_auth() - self.auth_handler = hs.get_auth_handler() + self._set_password_handler = hs.get_set_password_handler() @defer.inlineCallbacks def on_POST(self, request, target_user_id): @@ -330,7 +330,7 @@ class ResetPasswordRestServlet(ClientV1RestServlet): logger.info("new_password: %r", new_password) - yield self.auth_handler.set_password( + yield self._set_password_handler.set_password( target_user_id, new_password, requester ) defer.returnValue((200, {})) diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index 6202e8849d..c26ce63bcf 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -98,6 +98,7 @@ class PasswordRestServlet(RestServlet): self.auth = hs.get_auth() self.auth_handler = hs.get_auth_handler() self.datastore = self.hs.get_datastore() + self._set_password_handler = hs.get_set_password_handler() @defer.inlineCallbacks def on_POST(self, request): @@ -147,7 +148,7 @@ class PasswordRestServlet(RestServlet): raise SynapseError(400, "", Codes.MISSING_PARAM) new_password = params['new_password'] - yield self.auth_handler.set_password( + yield self._set_password_handler.set_password( user_id, new_password, requester ) diff --git a/synapse/server.py b/synapse/server.py index fea19e68d1..18c72d21a8 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -45,6 +45,7 @@ from synapse.handlers.device import DeviceHandler from synapse.handlers.e2e_keys import E2eKeysHandler from synapse.handlers.presence import PresenceHandler from synapse.handlers.room_list import RoomListHandler +from synapse.handlers.set_password import SetPasswordHandler from synapse.handlers.sync import SyncHandler from synapse.handlers.typing import TypingHandler from synapse.handlers.events import EventHandler, EventStreamHandler @@ -117,6 +118,7 @@ class HomeServer(object): 'device_message_handler', 'profile_handler', 'deactivate_account_handler', + 'set_password_handler', 'notifier', 'event_sources', 'keyring', @@ -273,6 +275,9 @@ class HomeServer(object): def build_deactivate_account_handler(self): return DeactivateAccountHandler(self) + def build_set_password_handler(self): + return SetPasswordHandler(self) + def build_event_sources(self): return EventSources(self) diff --git a/synapse/server.pyi b/synapse/server.pyi index e1d0a71fd4..41416ef252 100644 --- a/synapse/server.pyi +++ b/synapse/server.pyi @@ -6,6 +6,7 @@ import synapse.handlers.auth import synapse.handlers.deactivate_account import synapse.handlers.device import synapse.handlers.e2e_keys +import synapse.handlers.set_password import synapse.rest.media.v1.media_repository import synapse.state import synapse.storage @@ -36,6 +37,9 @@ class HomeServer(object): def get_deactivate_account_handler(self) -> synapse.handlers.deactivate_account.DeactivateAccountHandler: pass + def get_set_password_handler(self) -> synapse.handlers.set_password.SetPasswordHandler: + pass + def get_federation_sender(self) -> synapse.federation.transaction_queue.TransactionQueue: pass From ad7e570d07e498e7a9395800650aeef0f9fbc914 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 29 Nov 2017 15:44:59 +0000 Subject: [PATCH 23/23] Delete devices in various logout situations Make sure that we delete devices whenever a user is logged out due to any of the following situations: * /logout * /logout_all * change password * deactivate account (by the user or by an admin) * invalidate access token from a dynamic module Fixes #2672. --- synapse/handlers/deactivate_account.py | 8 ++++++++ synapse/handlers/device.py | 20 ++++++++++++++++++- synapse/handlers/set_password.py | 11 +++++++++++ synapse/module_api/__init__.py | 14 +++++++++++-- synapse/rest/client/v1/logout.py | 27 ++++++++++++++++++++++++-- 5 files changed, 75 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py index 70f02c9b76..b1d3814909 100644 --- a/synapse/handlers/deactivate_account.py +++ b/synapse/handlers/deactivate_account.py @@ -26,6 +26,7 @@ class DeactivateAccountHandler(BaseHandler): def __init__(self, hs): super(DeactivateAccountHandler, self).__init__(hs) self._auth_handler = hs.get_auth_handler() + self._device_handler = hs.get_device_handler() @defer.inlineCallbacks def deactivate_account(self, user_id): @@ -39,6 +40,13 @@ class DeactivateAccountHandler(BaseHandler): """ # FIXME: Theoretically there is a race here wherein user resets # password using threepid. + + # first delete any devices belonging to the user, which will also + # delete corresponding access tokens. + yield self._device_handler.delete_all_devices_for_user(user_id) + # then delete any remaining access tokens which weren't associated with + # a device. yield self._auth_handler.delete_access_tokens_for_user(user_id) + yield self.store.user_delete_threepids(user_id) yield self.store.user_set_password_hash(user_id, None) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 579d8477ba..2152efc692 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -170,13 +170,31 @@ class DeviceHandler(BaseHandler): yield self.notify_device_update(user_id, [device_id]) + @defer.inlineCallbacks + def delete_all_devices_for_user(self, user_id, except_device_id=None): + """Delete all of the user's devices + + Args: + user_id (str): + except_device_id (str|None): optional device id which should not + be deleted + + Returns: + defer.Deferred: + """ + device_map = yield self.store.get_devices_by_user(user_id) + device_ids = device_map.keys() + if except_device_id is not None: + device_ids = [d for d in device_ids if d != except_device_id] + yield self.delete_devices(user_id, device_ids) + @defer.inlineCallbacks def delete_devices(self, user_id, device_ids): """ Delete several devices Args: user_id (str): - device_ids (str): The list of device IDs to delete + device_ids (List[str]): The list of device IDs to delete Returns: defer.Deferred: diff --git a/synapse/handlers/set_password.py b/synapse/handlers/set_password.py index c0d83f229c..44414e1dc1 100644 --- a/synapse/handlers/set_password.py +++ b/synapse/handlers/set_password.py @@ -27,11 +27,13 @@ class SetPasswordHandler(BaseHandler): def __init__(self, hs): super(SetPasswordHandler, self).__init__(hs) self._auth_handler = hs.get_auth_handler() + self._device_handler = hs.get_device_handler() @defer.inlineCallbacks def set_password(self, user_id, newpassword, requester=None): password_hash = self._auth_handler.hash(newpassword) + except_device_id = requester.device_id if requester else None except_access_token_id = requester.access_token_id if requester else None try: @@ -40,6 +42,15 @@ class SetPasswordHandler(BaseHandler): if e.code == 404: raise SynapseError(404, "Unknown user", Codes.NOT_FOUND) raise e + + # we want to log out all of the user's other sessions. First delete + # all his other devices. + yield self._device_handler.delete_all_devices_for_user( + user_id, except_device_id=except_device_id, + ) + + # and now delete any access tokens which weren't associated with + # devices (or were associated with this device). yield self._auth_handler.delete_access_tokens_for_user( user_id, except_token_id=except_access_token_id, ) diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index dc680ddf43..097c844d31 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer from synapse.types import UserID @@ -81,6 +82,7 @@ class ModuleApi(object): reg = self.hs.get_handlers().registration_handler return reg.register(localpart=localpart) + @defer.inlineCallbacks def invalidate_access_token(self, access_token): """Invalidate an access token for a user @@ -94,8 +96,16 @@ class ModuleApi(object): Raises: synapse.api.errors.AuthError: the access token is invalid """ - - return self._auth_handler.delete_access_token(access_token) + # see if the access token corresponds to a device + user_info = yield self._auth.get_user_by_access_token(access_token) + device_id = user_info.get("device_id") + user_id = user_info["user"].to_string() + if device_id: + # delete the device, which will also delete its access tokens + yield self.hs.get_device_handler().delete_device(user_id, device_id) + else: + # no associated device. Just delete the access token. + yield self._auth_handler.delete_access_token(access_token) def run_db_interaction(self, desc, func, *args, **kwargs): """Run a function with a database connection diff --git a/synapse/rest/client/v1/logout.py b/synapse/rest/client/v1/logout.py index 6add754782..ca49955935 100644 --- a/synapse/rest/client/v1/logout.py +++ b/synapse/rest/client/v1/logout.py @@ -16,6 +16,7 @@ from twisted.internet import defer from synapse.api.auth import get_access_token_from_request +from synapse.api.errors import AuthError from .base import ClientV1RestServlet, client_path_patterns @@ -30,15 +31,30 @@ class LogoutRestServlet(ClientV1RestServlet): def __init__(self, hs): super(LogoutRestServlet, self).__init__(hs) + self._auth = hs.get_auth() self._auth_handler = hs.get_auth_handler() + self._device_handler = hs.get_device_handler() def on_OPTIONS(self, request): return (200, {}) @defer.inlineCallbacks def on_POST(self, request): - access_token = get_access_token_from_request(request) - yield self._auth_handler.delete_access_token(access_token) + try: + requester = yield self.auth.get_user_by_req(request) + except AuthError: + # this implies the access token has already been deleted. + pass + else: + if requester.device_id is None: + # the acccess token wasn't associated with a device. + # Just delete the access token + access_token = get_access_token_from_request(request) + yield self._auth_handler.delete_access_token(access_token) + else: + yield self._device_handler.delete_device( + requester.user.to_string(), requester.device_id) + defer.returnValue((200, {})) @@ -49,6 +65,7 @@ class LogoutAllRestServlet(ClientV1RestServlet): super(LogoutAllRestServlet, self).__init__(hs) self.auth = hs.get_auth() self._auth_handler = hs.get_auth_handler() + self._device_handler = hs.get_device_handler() def on_OPTIONS(self, request): return (200, {}) @@ -57,6 +74,12 @@ class LogoutAllRestServlet(ClientV1RestServlet): def on_POST(self, request): requester = yield self.auth.get_user_by_req(request) user_id = requester.user.to_string() + + # first delete all of the user's devices + yield self._device_handler.delete_all_devices_for_user(user_id) + + # .. and then delete any access tokens which weren't associated with + # devices. yield self._auth_handler.delete_access_tokens_for_user(user_id) defer.returnValue((200, {}))