diff --git a/synapse/config/registration.py b/synapse/config/registration.py index c30ab24eb9..34326718ad 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -49,6 +49,10 @@ class RegistrationConfig(Config): self.auto_join_rooms = config.get("auto_join_rooms", []) + self.replicate_user_profiles_to = config.get("replicate_user_profiles_to", []) + if not isinstance(self.replicate_user_profiles_to, list): + self.replicate_user_profiles_to = [self.replicate_user_profiles_to, ] + def default_config(self, **kwargs): registration_shared_secret = random_string_with_symbols(50) @@ -108,6 +112,12 @@ class RegistrationConfig(Config): - vector.im - riot.im + # If enabled, user IDs, display names and avatar URLs will be replicated + # to this server whenever they change. + # This is an experimental API currently implemented by sydent to support + # cross-homeserver user directories. + # replicate_user_profiles_to: example.com + # Users who register on this homeserver will automatically be joined # to these rooms #auto_join_rooms: diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 3465a787ab..7202d3c81d 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -15,12 +15,15 @@ import logging -from twisted.internet import defer +from twisted.internet import defer, reactor from synapse.api.errors import SynapseError, AuthError, CodeMessageException +from synapse.util.logcontext import run_in_background from synapse.types import UserID, get_domain_from_id from ._base import BaseHandler +from signedjson.sign import sign_json + logger = logging.getLogger(__name__) @@ -28,6 +31,8 @@ class ProfileHandler(BaseHandler): PROFILE_UPDATE_MS = 60 * 1000 PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000 + PROFILE_REPLICATE_INTERVAL = 2 * 60 * 1000 + def __init__(self, hs): super(ProfileHandler, self).__init__(hs) @@ -38,11 +43,82 @@ class ProfileHandler(BaseHandler): self.user_directory_handler = hs.get_user_directory_handler() + self.http_client = hs.get_simple_http_client() + if hs.config.worker_app is None: self.clock.looping_call( self._update_remote_profile_cache, self.PROFILE_UPDATE_MS, ) + if len(self.hs.config.replicate_user_profiles_to) > 0: + reactor.callWhenRunning(self._assign_profile_replication_batches) + reactor.callWhenRunning(self._replicate_profiles) + # Add a looping call to replicate_profiles: this handles retries + # if the replication is unsuccessful when the user updated their + # profile. + self.clock.looping_call( + self._replicate_profiles, self.PROFILE_REPLICATE_INTERVAL + ) + + @defer.inlineCallbacks + def _assign_profile_replication_batches(self): + """If no profile replication has been done yet, allocate replication batch + numbers to each profile to start the replication process. + """ + logger.info("Assigning profile batch numbers...") + total = 0 + while True: + assigned = yield self.store.assign_profile_batch() + total += assigned + if assigned == 0: + break + logger.info("Assigned %d profile batch numbers", total) + + @defer.inlineCallbacks + def _replicate_profiles(self): + """If any profile data has been updated and not pushed to the replication targets, + replicate it. + """ + host_batches = yield self.store.get_replication_hosts() + latest_batch = yield self.store.get_latest_profile_replication_batch_number() + for repl_host in self.hs.config.replicate_user_profiles_to: + if repl_host not in host_batches: + host_batches[repl_host] = -1 + try: + for i in xrange(host_batches[repl_host] + 1, latest_batch + 1): + yield self._replicate_host_profile_batch(repl_host, i) + except Exception: + logger.exception( + "Exception while replicating to %s: aborting for now", repl_host, + ) + + @defer.inlineCallbacks + def _replicate_host_profile_batch(self, host, batchnum): + logger.info("Replicating profile batch %d to %s", batchnum, host) + batch_rows = yield self.store.get_profile_batch(batchnum) + batch = { + UserID(r["user_id"], self.hs.hostname).to_string(): { + "display_name": r["displayname"], + "avatar_url": r["avatar_url"], + } for r in batch_rows + } + + url = "https://%s/_matrix/federation/v1/replicate_profiles" % (host,) + body = { + "batchnum": batchnum, + "batch": batch, + "origin_server": self.hs.hostname, + } + signed_body = sign_json(body, self.hs.hostname, self.hs.config.signing_key[0]) + try: + yield self.http_client.post_json_get_json(url, signed_body) + yield self.store.update_replication_batch_for_host(host, batchnum) + logger.info("Sucessfully replicated profile batch %d to %s", batchnum, host) + except Exception: + # This will get retried when the looping call next comes around + logger.exception("Failed to replicate profile batch %d to %s", batchnum, host) + raise + @defer.inlineCallbacks def get_profile(self, user_id): target_user = UserID.from_string(user_id) @@ -140,8 +216,14 @@ class ProfileHandler(BaseHandler): if new_displayname == '': new_displayname = None + if len(self.hs.config.replicate_user_profiles_to) > 0: + cur_batchnum = yield self.store.get_latest_profile_replication_batch_number() + new_batchnum = 0 if cur_batchnum is None else cur_batchnum + 1 + else: + new_batchnum = None + yield self.store.set_profile_displayname( - target_user.localpart, new_displayname + target_user.localpart, new_displayname, new_batchnum ) if self.hs.config.user_directory_search_all_users: @@ -152,6 +234,9 @@ class ProfileHandler(BaseHandler): yield self._update_join_states(requester, target_user) + # start a profile replication push + run_in_background(self._replicate_profiles) + @defer.inlineCallbacks def get_avatar_url(self, target_user): if self.hs.is_mine(target_user): @@ -190,8 +275,14 @@ class ProfileHandler(BaseHandler): if not by_admin and target_user != requester.user: raise AuthError(400, "Cannot set another user's avatar_url") + if len(self.hs.config.replicate_user_profiles_to) > 0: + cur_batchnum = yield self.store.get_latest_profile_replication_batch_number() + new_batchnum = 0 if cur_batchnum is None else cur_batchnum + 1 + else: + new_batchnum = None + yield self.store.set_profile_avatar_url( - target_user.localpart, new_avatar_url + target_user.localpart, new_avatar_url, new_batchnum, ) if self.hs.config.user_directory_search_all_users: @@ -202,6 +293,9 @@ class ProfileHandler(BaseHandler): yield self._update_join_states(requester, target_user) + # start a profile replication push + run_in_background(self._replicate_profiles) + @defer.inlineCallbacks def on_profile_query(self, args): user = UserID.from_string(args["user_id"]) diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py index 8612bd5ecc..12e2d44406 100644 --- a/synapse/storage/profile.py +++ b/synapse/storage/profile.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -20,6 +21,8 @@ from synapse.api.errors import StoreError from ._base import SQLBaseStore +BATCH_SIZE = 100 + class ProfileWorkerStore(SQLBaseStore): @defer.inlineCallbacks @@ -62,6 +65,55 @@ class ProfileWorkerStore(SQLBaseStore): desc="get_profile_avatar_url", ) + def get_latest_profile_replication_batch_number(self): + def f(txn): + txn.execute("SELECT MAX(batch) as maxbatch FROM profiles") + rows = self.cursor_to_dict(txn) + return rows[0]['maxbatch'] + return self.runInteraction( + "get_latest_profile_replication_batch_number", f, + ) + + def get_profile_batch(self, batchnum): + return self._simple_select_list( + table="profiles", + keyvalues={ + "batch": batchnum, + }, + retcols=("user_id", "displayname", "avatar_url"), + desc="get_profile_batch", + ) + + def assign_profile_batch(self): + def f(txn): + sql = ( + "UPDATE profiles SET batch = " + "(SELECT IFNULL(MAX(batch), -1) + 1 FROM profiles) " + "WHERE user_id in (" + " SELECT user_id FROM profiles WHERE batch is NULL limit ?" + ")" + ) + txn.execute(sql, (BATCH_SIZE,)) + return txn.rowcount + return self.runInteraction("assign_profile_batch", f) + + def get_replication_hosts(self): + def f(txn): + txn.execute("SELECT host, last_synced_batch FROM profile_replication_status") + rows = self.cursor_to_dict(txn) + return {r['host']: r['last_synced_batch'] for r in rows} + return self.runInteraction("get_replication_hosts", f) + + def update_replication_batch_for_host(self, host, last_synced_batch): + return self._simple_upsert( + table="profile_replication_status", + keyvalues={"host": host}, + values={ + "last_synced_batch": last_synced_batch, + }, + desc="update_replication_batch_for_host", + ) + def get_from_remote_profile_cache(self, user_id): return self._simple_select_one( table="remote_profile_cache", @@ -80,19 +132,25 @@ class ProfileStore(ProfileWorkerStore): desc="create_profile", ) - def set_profile_displayname(self, user_localpart, new_displayname): + def set_profile_displayname(self, user_localpart, new_displayname, batchnum): return self._simple_update_one( table="profiles", keyvalues={"user_id": user_localpart}, - updatevalues={"displayname": new_displayname}, + updatevalues={ + "displayname": new_displayname, + "batch": batchnum, + }, desc="set_profile_displayname", ) - def set_profile_avatar_url(self, user_localpart, new_avatar_url): + def set_profile_avatar_url(self, user_localpart, new_avatar_url, batchnum): return self._simple_update_one( table="profiles", keyvalues={"user_id": user_localpart}, - updatevalues={"avatar_url": new_avatar_url}, + updatevalues={ + "avatar_url": new_avatar_url, + "batch": batchnum, + }, desc="set_profile_avatar_url", ) diff --git a/synapse/storage/schema/delta/48/profiles_batch.sql b/synapse/storage/schema/delta/48/profiles_batch.sql new file mode 100644 index 0000000000..e744c02fe8 --- /dev/null +++ b/synapse/storage/schema/delta/48/profiles_batch.sql @@ -0,0 +1,36 @@ +/* Copyright 2018 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Add a batch number to track changes to profiles and the + * order they're made in so we can replicate user profiles + * to other hosts as they change + */ +ALTER TABLE profiles ADD COLUMN batch BIGINT DEFAULT NULL; + +/* + * Index on the batch number so we can get profiles + * by their batch + */ +CREATE INDEX profiles_batch_idx ON profiles(batch); + +/* + * A table to track what batch of user profiles has been + * synced to what profile replication target. + */ +CREATE TABLE profile_replication_status ( + host TEXT NOT NULL, + last_synced_batch BIGINT NOT NULL +); diff --git a/tests/handlers/test_profile.py b/tests/handlers/test_profile.py index 458296ee4c..8646c4e434 100644 --- a/tests/handlers/test_profile.py +++ b/tests/handlers/test_profile.py @@ -75,7 +75,7 @@ class ProfileTestCase(unittest.TestCase): @defer.inlineCallbacks def test_get_my_name(self): yield self.store.set_profile_displayname( - self.frank.localpart, "Frank" + self.frank.localpart, "Frank", 1, ) displayname = yield self.handler.get_displayname(self.frank) @@ -124,7 +124,7 @@ class ProfileTestCase(unittest.TestCase): @defer.inlineCallbacks def test_incoming_fed_query(self): yield self.store.create_profile("caroline") - yield self.store.set_profile_displayname("caroline", "Caroline") + yield self.store.set_profile_displayname("caroline", "Caroline", 1) response = yield self.query_handlers["profile"]( {"user_id": "@caroline:test", "field": "displayname"} @@ -135,7 +135,7 @@ class ProfileTestCase(unittest.TestCase): @defer.inlineCallbacks def test_get_my_avatar(self): yield self.store.set_profile_avatar_url( - self.frank.localpart, "http://my.server/me.png" + self.frank.localpart, "http://my.server/me.png", 1, ) avatar_url = yield self.handler.get_avatar_url(self.frank) diff --git a/tests/storage/test_profile.py b/tests/storage/test_profile.py index 423710c9c1..1bfabc15ad 100644 --- a/tests/storage/test_profile.py +++ b/tests/storage/test_profile.py @@ -40,7 +40,7 @@ class ProfileStoreTestCase(unittest.TestCase): ) yield self.store.set_profile_displayname( - self.u_frank.localpart, "Frank" + self.u_frank.localpart, "Frank", 1, ) self.assertEquals( @@ -55,7 +55,7 @@ class ProfileStoreTestCase(unittest.TestCase): ) yield self.store.set_profile_avatar_url( - self.u_frank.localpart, "http://my.site/here" + self.u_frank.localpart, "http://my.site/here", 1, ) self.assertEquals( diff --git a/tests/utils.py b/tests/utils.py index f15317d27b..1e532f4a27 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -60,6 +60,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs): config.block_non_admin_invites = False config.federation_domain_whitelist = None config.user_directory_search_all_users = False + config.replicate_user_profiles_to = [] # disable user directory updates, because they get done in the # background, which upsets the test runner.