diff --git a/synapse/config/registration.py b/synapse/config/registration.py index 2854a48d0a..8f4d051fba 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -49,6 +49,10 @@ class RegistrationConfig(Config): self.auto_join_rooms = config.get("auto_join_rooms", []) + self.replicate_user_profiles_to = config.get("replicate_user_profiles_to", []) + if not isinstance(self.replicate_user_profiles_to, list): + self.replicate_user_profiles_to = [self.replicate_user_profiles_to,] + def default_config(self, **kwargs): registration_shared_secret = random_string_with_symbols(50) @@ -106,6 +110,10 @@ class RegistrationConfig(Config): - vector.im - riot.im + # If enabled, user IDs, display names and avatar URLs will be replicated + # to this server whenever they change. + # replicate_user_profiles_to: example.com + # Users who register on this homeserver will automatically be joined # to these rooms #auto_join_rooms: diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index cb710fe796..be9d1d8c41 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -15,12 +15,14 @@ import logging -from twisted.internet import defer +from twisted.internet import defer, reactor from synapse.api.errors import SynapseError, AuthError, CodeMessageException from synapse.types import UserID, get_domain_from_id from ._base import BaseHandler +from signedjson.sign import sign_json + logger = logging.getLogger(__name__) @@ -28,6 +30,8 @@ class ProfileHandler(BaseHandler): PROFILE_UPDATE_MS = 60 * 1000 PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000 + PROFILE_REPLICATE_INTERVAL = 2 * 60 * 1000 + def __init__(self, hs): super(ProfileHandler, self).__init__(hs) @@ -38,8 +42,72 @@ class ProfileHandler(BaseHandler): self.user_directory_handler = hs.get_user_directory_handler() + self.http_client = hs.get_simple_http_client() + self.clock.looping_call(self._update_remote_profile_cache, self.PROFILE_UPDATE_MS) + reactor.callWhenRunning(self._assign_profile_replication_batches) + reactor.callWhenRunning(self._replicate_profiles) + self.clock.looping_call(self._replicate_profiles, self.PROFILE_REPLICATE_INTERVAL) + + @defer.inlineCallbacks + def _assign_profile_replication_batches(self): + """If no profile replication has been done yet, allocate replication batch + numbers to each profile to start the replication process. + """ + logger.info("Assigning profile batch numbers...") + total = 0 + while True: + assigned = yield self.store.assign_profile_batch() + total += assigned + if assigned == 0: + break + logger.info("Assigned %d profile batch numbers", total) + + @defer.inlineCallbacks + def _replicate_profiles(self): + """If any profile data has been updated and not pushed to the replication targets, + replicate it. + """ + host_batches = yield self.store.get_replication_hosts() + latest_batch = yield self.store.get_latest_profile_replication_batch_number() + for repl_host in self.hs.config.replicate_user_profiles_to: + if repl_host not in host_batches: + host_batches[repl_host] = -1 + try: + for i in xrange(host_batches[repl_host] + 1, latest_batch + 1): + yield self._replicate_host_profile_batch(repl_host, i) + except: + logger.exception( + "Exception while replicating to %s: aborting for now", repl_host, + ) + + @defer.inlineCallbacks + def _replicate_host_profile_batch(self, host, batchnum): + logger.info("Replicating profile batch %d to %s", batchnum, host) + batch_rows = yield self.store.get_profile_batch(batchnum) + batch = { + UserID(r["user_id"], self.hs.hostname).to_string(): { + "displayname": r["displayname"], + "avatar_url": r["avatar_url"], + } for r in batch_rows + } + + url = "https://%s/_matrix/federation/v1/replicate_profiles" % (host,) + signed_batch = { + "batchnum": batchnum, + "signed_batch": sign_json(batch, self.hs.hostname, self.hs.config.signing_key[0]), + "origin_server": self.hs.hostname, + } + try: + yield self.http_client.post_json_get_json(url, signed_batch) + self.store.update_replication_batch_for_host(host, batchnum) + logger.info("Sucessfully replicated profile batch %d to %s", batchnum, host) + except: + # This will get retried when the looping call next comes around + logger.exception("Failed to replicate profile batch %d to %s", batchnum, host) + raise + @defer.inlineCallbacks def get_profile(self, user_id): target_user = UserID.from_string(user_id) diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index c845a0cec5..68675e15d2 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 47 +SCHEMA_VERSION = 48 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py index ec02e73bc2..cfc4a0606e 100644 --- a/synapse/storage/profile.py +++ b/synapse/storage/profile.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -20,6 +21,8 @@ from synapse.api.errors import StoreError from ._base import SQLBaseStore +BATCH_SIZE = 100 + class ProfileStore(SQLBaseStore): def create_profile(self, user_localpart): @@ -85,6 +88,59 @@ class ProfileStore(SQLBaseStore): desc="set_profile_avatar_url", ) + @defer.inlineCallbacks + def get_latest_profile_replication_batch_number(self): + def f(txn): + txn.execute("SELECT MAX(batch) as maxbatch FROM profiles") + rows = self.cursor_to_dict(txn) + return rows[0]['maxbatch'] + max_batch = yield self.runInteraction("get_latest_profile_replication_batch_number", f) + defer.returnValue(max_batch) + + def get_profile_batch(self, batchnum): + return self._simple_select_list( + table="profiles", + keyvalues={ + "batch": batchnum, + }, + retcols=("user_id", "displayname", "avatar_url"), + desc="get_profile_batch", + ) + + @defer.inlineCallbacks + def assign_profile_batch(self): + def f(txn): + sql = ( + "UPDATE profiles SET batch = " + "(SELECT IFNULL(MAX(batch), -1) + 1 FROM profiles) " + "WHERE user_id in (" + " SELECT user_id FROM profiles WHERE batch is NULL limit ?" + ")" + ) + txn.execute(sql, (BATCH_SIZE,)) + return txn.rowcount + assigned = yield self.runInteraction("assign_profile_batch", f) + defer.returnValue(assigned) + + @defer.inlineCallbacks + def get_replication_hosts(self): + def f(txn): + txn.execute("SELECT host, last_synced_batch FROM profile_replication_status") + rows = self.cursor_to_dict(txn) + return { r['host']: r['last_synced_batch'] for r in rows } + result = yield self.runInteraction("get_replication_hosts", f) + defer.returnValue(result) + + def update_replication_batch_for_host(self, host, last_synced_batch): + return self._simple_upsert( + table="profile_replication_status", + keyvalues={"host": host}, + values={ + "last_synced_batch": last_synced_batch, + }, + desc="update_replication_batch_for_host", + ) + def get_from_remote_profile_cache(self, user_id): return self._simple_select_one( table="remote_profile_cache", diff --git a/synapse/storage/schema/delta/48/profiles_batch.sql b/synapse/storage/schema/delta/48/profiles_batch.sql new file mode 100644 index 0000000000..7639ff22d5 --- /dev/null +++ b/synapse/storage/schema/delta/48/profiles_batch.sql @@ -0,0 +1,23 @@ +/* Copyright 2018 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE profiles ADD COLUMN batch BIGINT DEFAULT NULL; + +CREATE INDEX profiles_batch_idx ON profiles(batch); + +CREATE TABLE profile_replication_status ( + host TEXT NOT NULL, + last_synced_batch BIGINT NOT NULL +);