diff --git a/synapse/config/registration.py b/synapse/config/registration.py index 5c21287998..34326718ad 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -114,6 +114,8 @@ class RegistrationConfig(Config): # If enabled, user IDs, display names and avatar URLs will be replicated # to this server whenever they change. + # This is an experimental API currently implemented by sydent to support + # cross-homeserver user directories. # replicate_user_profiles_to: example.com # Users who register on this homeserver will automatically be joined diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 8332771c15..7202d3c81d 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -53,6 +53,9 @@ class ProfileHandler(BaseHandler): if len(self.hs.config.replicate_user_profiles_to) > 0: reactor.callWhenRunning(self._assign_profile_replication_batches) reactor.callWhenRunning(self._replicate_profiles) + # Add a looping call to replicate_profiles: this handles retries + # if the replication is unsuccessful when the user updated their + # profile. self.clock.looping_call( self._replicate_profiles, self.PROFILE_REPLICATE_INTERVAL ) @@ -109,7 +112,7 @@ class ProfileHandler(BaseHandler): signed_body = sign_json(body, self.hs.hostname, self.hs.config.signing_key[0]) try: yield self.http_client.post_json_get_json(url, signed_body) - self.store.update_replication_batch_for_host(host, batchnum) + yield self.store.update_replication_batch_for_host(host, batchnum) logger.info("Sucessfully replicated profile batch %d to %s", batchnum, host) except Exception: # This will get retried when the looping call next comes around diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py index 048f48dcc1..12e2d44406 100644 --- a/synapse/storage/profile.py +++ b/synapse/storage/profile.py @@ -65,16 +65,14 @@ class ProfileWorkerStore(SQLBaseStore): desc="get_profile_avatar_url", ) - @defer.inlineCallbacks def get_latest_profile_replication_batch_number(self): def f(txn): txn.execute("SELECT MAX(batch) as maxbatch FROM profiles") rows = self.cursor_to_dict(txn) return rows[0]['maxbatch'] - max_batch = yield self.runInteraction( + return self.runInteraction( "get_latest_profile_replication_batch_number", f, ) - defer.returnValue(max_batch) def get_profile_batch(self, batchnum): return self._simple_select_list( @@ -86,7 +84,6 @@ class ProfileWorkerStore(SQLBaseStore): desc="get_profile_batch", ) - @defer.inlineCallbacks def assign_profile_batch(self): def f(txn): sql = ( @@ -98,17 +95,14 @@ class ProfileWorkerStore(SQLBaseStore): ) txn.execute(sql, (BATCH_SIZE,)) return txn.rowcount - assigned = yield self.runInteraction("assign_profile_batch", f) - defer.returnValue(assigned) + return self.runInteraction("assign_profile_batch", f) - @defer.inlineCallbacks def get_replication_hosts(self): def f(txn): txn.execute("SELECT host, last_synced_batch FROM profile_replication_status") rows = self.cursor_to_dict(txn) return {r['host']: r['last_synced_batch'] for r in rows} - result = yield self.runInteraction("get_replication_hosts", f) - defer.returnValue(result) + return self.runInteraction("get_replication_hosts", f) def update_replication_batch_for_host(self, host, last_synced_batch): return self._simple_upsert( diff --git a/synapse/storage/schema/delta/48/profiles_batch.sql b/synapse/storage/schema/delta/48/profiles_batch.sql index 7639ff22d5..e744c02fe8 100644 --- a/synapse/storage/schema/delta/48/profiles_batch.sql +++ b/synapse/storage/schema/delta/48/profiles_batch.sql @@ -13,10 +13,23 @@ * limitations under the License. */ +/* + * Add a batch number to track changes to profiles and the + * order they're made in so we can replicate user profiles + * to other hosts as they change + */ ALTER TABLE profiles ADD COLUMN batch BIGINT DEFAULT NULL; +/* + * Index on the batch number so we can get profiles + * by their batch + */ CREATE INDEX profiles_batch_idx ON profiles(batch); +/* + * A table to track what batch of user profiles has been + * synced to what profile replication target. + */ CREATE TABLE profile_replication_status ( host TEXT NOT NULL, last_synced_batch BIGINT NOT NULL diff --git a/tests/handlers/test_profile.py b/tests/handlers/test_profile.py index 205190f8d6..8646c4e434 100644 --- a/tests/handlers/test_profile.py +++ b/tests/handlers/test_profile.py @@ -75,7 +75,7 @@ class ProfileTestCase(unittest.TestCase): @defer.inlineCallbacks def test_get_my_name(self): yield self.store.set_profile_displayname( - self.frank.localpart, "Frank", 1 + self.frank.localpart, "Frank", 1, ) displayname = yield self.handler.get_displayname(self.frank) @@ -135,7 +135,7 @@ class ProfileTestCase(unittest.TestCase): @defer.inlineCallbacks def test_get_my_avatar(self): yield self.store.set_profile_avatar_url( - self.frank.localpart, "http://my.server/me.png", 1 + self.frank.localpart, "http://my.server/me.png", 1, ) avatar_url = yield self.handler.get_avatar_url(self.frank) diff --git a/tests/storage/test_profile.py b/tests/storage/test_profile.py index 6b0cc17010..1bfabc15ad 100644 --- a/tests/storage/test_profile.py +++ b/tests/storage/test_profile.py @@ -40,7 +40,7 @@ class ProfileStoreTestCase(unittest.TestCase): ) yield self.store.set_profile_displayname( - self.u_frank.localpart, "Frank", 1 + self.u_frank.localpart, "Frank", 1, ) self.assertEquals( @@ -55,7 +55,7 @@ class ProfileStoreTestCase(unittest.TestCase): ) yield self.store.set_profile_avatar_url( - self.u_frank.localpart, "http://my.site/here", 1 + self.u_frank.localpart, "http://my.site/here", 1, ) self.assertEquals(