From f4e7545d8863e24a2327636181a49589d1e78ed8 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Tue, 28 Feb 2017 18:11:52 +0000 Subject: [PATCH 01/11] No longer need to request all the sub-components to be split when running sytest+dendron --- jenkins-dendron-postgres.sh | 6 ------ 1 file changed, 6 deletions(-) diff --git a/jenkins-dendron-postgres.sh b/jenkins-dendron-postgres.sh index 55ff31fd18..37ae746f4b 100755 --- a/jenkins-dendron-postgres.sh +++ b/jenkins-dendron-postgres.sh @@ -17,9 +17,3 @@ export SYNAPSE_CACHE_FACTOR=1 ./sytest/jenkins/install_and_run.sh \ --synapse-directory $WORKSPACE \ --dendron $WORKSPACE/dendron/bin/dendron \ - --pusher \ - --synchrotron \ - --federation-reader \ - --client-reader \ - --appservice \ - --federation-sender \ From 6b1ffa5f3de93618ae748391215e7139099b265f Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Tue, 28 Feb 2017 18:14:13 +0000 Subject: [PATCH 02/11] Added also a control script to run via the crazy dendron+haproxy hybrid we're temporarily using --- jenkins-dendron-haproxy-postgres.sh | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100755 jenkins-dendron-haproxy-postgres.sh diff --git a/jenkins-dendron-haproxy-postgres.sh b/jenkins-dendron-haproxy-postgres.sh new file mode 100755 index 0000000000..d64b2d2c9d --- /dev/null +++ b/jenkins-dendron-haproxy-postgres.sh @@ -0,0 +1,22 @@ +#!/bin/bash + +set -eux + +: ${WORKSPACE:="$(pwd)"} + +export WORKSPACE +export PYTHONDONTWRITEBYTECODE=yep +export SYNAPSE_CACHE_FACTOR=1 + +export HAPROXY_BIN=/home/haproxy/haproxy-1.6.11/haproxy + +./jenkins/prepare_synapse.sh +./jenkins/clone.sh sytest https://github.com/matrix-org/sytest.git +./jenkins/clone.sh dendron https://github.com/matrix-org/dendron.git +./dendron/jenkins/build_dendron.sh +./sytest/jenkins/prep_sytest_for_postgres.sh + +./sytest/jenkins/install_and_run.sh \ + --synapse-directory $WORKSPACE \ + --dendron $WORKSPACE/dendron/bin/dendron \ + --haproxy \ From 3365117151217f0ea96291aabce222f8b58bd850 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 1 Mar 2017 10:21:30 +0000 Subject: [PATCH 03/11] Clobber old device list stream entries --- synapse/storage/devices.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 7b5903bf8e..f9ed18d2aa 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -546,6 +546,16 @@ class DeviceStore(SQLBaseStore): host, stream_id, ) + # Delete older entries in the table, as we really only care about + # when the latest change happened. + txn.executemany( + """ + DELETE FROM device_lists_stream + WHERE user_id = ? AND device_id = ? AND stream_id < ? 
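+                -- only the latest change per (user_id, device_id) matters;
+                -- older rows are superseded by the insert performed below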
+ """, + [(user_id, device_id, stream_id) for device_id in device_ids] + ) + self._simple_insert_many_txn( txn, table="device_lists_stream", From 36be39b8b39f4fc9b3a272faa306660b95145dad Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 1 Mar 2017 14:12:11 +0000 Subject: [PATCH 04/11] Fix device list update to not constantly resync --- synapse/handlers/device.py | 177 ++++++++++++++++++++++++++----------- 1 file changed, 123 insertions(+), 54 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index ca7137f315..23dab53df1 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -34,10 +34,11 @@ class DeviceHandler(BaseHandler): self.state = hs.get_state_handler() self.federation_sender = hs.get_federation_sender() self.federation = hs.get_replication_layer() - self._remote_edue_linearizer = Linearizer(name="remote_device_list") + + self._edu_updater = DeviceListEduUpdater(hs, self) self.federation.register_edu_handler( - "m.device_list_update", self._incoming_device_list_update, + "m.device_list_update", self._edu_updater.incoming_device_list_update, ) self.federation.register_query_handler( "user_devices", self.on_federation_query_user_devices, @@ -299,58 +300,6 @@ class DeviceHandler(BaseHandler): # and those that actually still share a room with the user defer.returnValue(users_who_share_room & possibly_changed) - @measure_func("_incoming_device_list_update") - @defer.inlineCallbacks - def _incoming_device_list_update(self, origin, edu_content): - user_id = edu_content["user_id"] - device_id = edu_content["device_id"] - stream_id = edu_content["stream_id"] - prev_ids = edu_content.get("prev_id", []) - - if get_domain_from_id(user_id) != origin: - # TODO: Raise? - logger.warning("Got device list update edu for %r from %r", user_id, origin) - return - - rooms = yield self.store.get_rooms_for_user(user_id) - if not rooms: - # We don't share any rooms with this user. Ignore update, as we - # probably won't get any further updates. - return - - with (yield self._remote_edue_linearizer.queue(user_id)): - # If the prev id matches whats in our cache table, then we don't need - # to resync the users device list, otherwise we do. - resync = True - if len(prev_ids) == 1: - extremity = yield self.store.get_device_list_last_stream_id_for_remote( - user_id - ) - logger.info("Extrem: %r, prev_ids: %r", extremity, prev_ids) - if str(extremity) == str(prev_ids[0]): - resync = False - - if resync: - # Fetch all devices for the user. 
- result = yield self.federation.query_user_devices(origin, user_id) - stream_id = result["stream_id"] - devices = result["devices"] - yield self.store.update_remote_device_list_cache( - user_id, devices, stream_id, - ) - device_ids = [device["device_id"] for device in devices] - yield self.notify_device_update(user_id, device_ids) - else: - # Simply update the single device, since we know that is the only - # change (becuase of the single prev_id matching the current cache) - content = dict(edu_content) - for key in ("user_id", "device_id", "stream_id", "prev_ids"): - content.pop(key, None) - yield self.store.update_remote_device_list_cache_entry( - user_id, device_id, content, stream_id, - ) - yield self.notify_device_update(user_id, [device_id]) - @defer.inlineCallbacks def on_federation_query_user_devices(self, user_id): stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id) @@ -376,3 +325,123 @@ def _update_device_from_client_ips(device, client_ips): "last_seen_ts": ip.get("last_seen"), "last_seen_ip": ip.get("ip"), }) + + +class DeviceListEduUpdater(object): + "Handles incoming device list updates from federation and updates the DB" + + def __init__(self, hs, device_handler): + self.store = hs.get_datastore() + self.federation = hs.get_replication_layer() + self.clock = hs.get_clock() + self.device_handler = device_handler + + self._remote_edue_linearizer = Linearizer(name="remote_device_list") + + # user_id -> list of updates waiting to be handled. + self._pending_updates = {} + + # Recently seen stream ids. We don't bother keeping these in the DB, + # but they're useful to have them about to reduce the number of spurious + # resyncs. + self._seen_updates = {} + + @defer.inlineCallbacks + def incoming_device_list_update(self, origin, edu_content): + """Called on incoming device list update from federation. Responsible + for parsing the EDU and adding to pending updates list. + """ + + user_id = edu_content.pop("user_id") + device_id = edu_content.pop("device_id") + stream_id = str(edu_content.pop("stream_id")) # They may come as ints + prev_ids = edu_content.pop("prev_id", []) + prev_ids = [str(p) for p in prev_ids] # They may come as ints + + if get_domain_from_id(user_id) != origin: + # TODO: Raise? + logger.warning("Got device list update edu for %r from %r", user_id, origin) + return + + rooms = yield self.store.get_rooms_for_user(user_id) + if not rooms: + # We don't share any rooms with this user. Ignore update, as we + # probably won't get any further updates. + return + + self._pending_updates.setdefault(user_id, []).append( + (device_id, stream_id, prev_ids, edu_content) + ) + + yield self._handle_device_updates(user_id) + + @measure_func("_incoming_device_list_update") + @defer.inlineCallbacks + def _handle_device_updates(self, user_id): + "Actually handle pending updates." + + with (yield self._remote_edue_linearizer.queue(user_id)): + pending_updates = self._pending_updates.pop(user_id, []) + if not pending_updates: + # This can happen since we batch updates + return + + resync = yield self._need_to_do_resync(user_id, pending_updates) + + if resync: + # Fetch all devices for the user. 
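+                # (reached when _need_to_do_resync saw a prev_id gap, so the
+                # cached list cannot safely be patched with a delta)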
+ origin = get_domain_from_id(user_id) + result = yield self.federation.query_user_devices(origin, user_id) + stream_id = result["stream_id"] + devices = result["devices"] + yield self.store.update_remote_device_list_cache( + user_id, devices, stream_id, + ) + device_ids = [device["device_id"] for device in devices] + yield self.device_handler.notify_device_update(user_id, device_ids) + else: + # Simply update the single device, since we know that is the only + # change (becuase of the single prev_id matching the current cache) + for device_id, stream_id, prev_ids, content in pending_updates: + yield self.store.update_remote_device_list_cache_entry( + user_id, device_id, content, stream_id, + ) + + yield self.device_handler.notify_device_update( + user_id, [device_id for device_id, _, _, _ in pending_updates] + ) + + self._seen_updates.setdefault(user_id, set()).update( + [stream_id for _, stream_id, _, _ in pending_updates] + ) + + @defer.inlineCallbacks + def _need_to_do_resync(self, user_id, updates): + """Given a list of updates for a user figure out if we need to do a full + resync, or whether we have enough data that we can just apply the delta. + """ + seen_updates = self._seen_updates.get(user_id, set()) + + extremity = yield self.store.get_device_list_last_stream_id_for_remote( + user_id + ) + + stream_id_in_updates = set() # stream_ids in updates list + for _, stream_id, prev_ids, _ in updates: + if not prev_ids: + # We always do a resync if there are no previous IDs + defer.returnValue(True) + + for prev_id in prev_ids: + if prev_id == extremity: + continue + elif prev_id in seen_updates: + continue + elif prev_id in stream_id_in_updates: + continue + else: + defer.returnValue(True) + + stream_id_in_updates.add(stream_id) + + defer.returnValue(False) From d766343668e63a7572dcfe571d38ea3e143f3c1c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 1 Mar 2017 15:56:30 +0000 Subject: [PATCH 05/11] Add index to device_lists_stream --- synapse/storage/deviceinbox.py | 8 ++++---- synapse/storage/devices.py | 7 +++++++ .../schema/delta/41/device_list_stream_idx.sql | 17 +++++++++++++++++ 3 files changed, 28 insertions(+), 4 deletions(-) create mode 100644 synapse/storage/schema/delta/41/device_list_stream_idx.sql diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index bde3b5cbbc..1951de0ce1 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -31,10 +31,10 @@ class DeviceInboxStore(BackgroundUpdateStore): super(DeviceInboxStore, self).__init__(hs) self.register_background_index_update( - "device_inbox_stream_index", - index_name="device_inbox_stream_id_user_id", - table="device_inbox", - columns=["stream_id", "user_id"], + "device_lists_stream_idx", + index_name="device_lists_stream_user_id", + table="device_lists_stream", + columns=["user_id", "device_id"], ) self.register_background_update_handler( diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index f9ed18d2aa..ed659b7001 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -33,6 +33,13 @@ class DeviceStore(SQLBaseStore): self._prune_old_outbound_device_pokes, 60 * 60 * 1000 ) + self.register_background_index_update( + "device_inbox_stream_index", + index_name="device_inbox_stream_id_user_id", + table="device_inbox", + columns=["stream_id", "user_id"], + ) + @defer.inlineCallbacks def store_device(self, user_id, device_id, initial_device_display_name): diff --git a/synapse/storage/schema/delta/41/device_list_stream_idx.sql 
b/synapse/storage/schema/delta/41/device_list_stream_idx.sql new file mode 100644 index 0000000000..b7bee8b692 --- /dev/null +++ b/synapse/storage/schema/delta/41/device_list_stream_idx.sql @@ -0,0 +1,17 @@ +/* Copyright 2017 Vector Creations Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +INSERT into background_updates (update_name, progress_json) + VALUES ('device_lists_stream_idx', '{}'); From 856a18f7a8076860b59060912a431bb2b80bec82 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 1 Mar 2017 16:24:11 +0000 Subject: [PATCH 06/11] kick jenkins From da52d3af317f03cd95c2f3158bf95290d828f2aa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 3 Mar 2017 15:29:13 +0000 Subject: [PATCH 07/11] Fix up --- synapse/storage/deviceinbox.py | 8 ++++---- synapse/storage/devices.py | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 1951de0ce1..bde3b5cbbc 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -31,10 +31,10 @@ class DeviceInboxStore(BackgroundUpdateStore): super(DeviceInboxStore, self).__init__(hs) self.register_background_index_update( - "device_lists_stream_idx", - index_name="device_lists_stream_user_id", - table="device_lists_stream", - columns=["user_id", "device_id"], + "device_inbox_stream_index", + index_name="device_inbox_stream_id_user_id", + table="device_inbox", + columns=["stream_id", "user_id"], ) self.register_background_update_handler( diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index ed659b7001..81c43d31f6 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -34,10 +34,10 @@ class DeviceStore(SQLBaseStore): ) self.register_background_index_update( - "device_inbox_stream_index", - index_name="device_inbox_stream_id_user_id", - table="device_inbox", - columns=["stream_id", "user_id"], + "device_lists_stream_idx", + index_name="device_lists_stream_user_id", + table="device_lists_stream", + columns=["user_id", "device_id"], ) @defer.inlineCallbacks From 9834367eeaecd7f356cf7cda1e1b3eb79f8f2ea2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 3 Mar 2017 15:31:57 +0000 Subject: [PATCH 08/11] Spelling --- synapse/handlers/device.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 23dab53df1..540b438797 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -336,7 +336,7 @@ class DeviceListEduUpdater(object): self.clock = hs.get_clock() self.device_handler = device_handler - self._remote_edue_linearizer = Linearizer(name="remote_device_list") + self._remote_edu_linearizer = Linearizer(name="remote_device_list") # user_id -> list of updates waiting to be handled. self._pending_updates = {} @@ -380,7 +380,7 @@ class DeviceListEduUpdater(object): def _handle_device_updates(self, user_id): "Actually handle pending updates." 
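         # The linearizer serialises handling per user: any updates queued
         # for this user while we waited are popped and drained as one batch.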
- with (yield self._remote_edue_linearizer.queue(user_id)): + with (yield self._remote_edu_linearizer.queue(user_id)): pending_updates = self._pending_updates.pop(user_id, []) if not pending_updates: # This can happen since we batch updates From f2581ee8b81e72c49b0c16ca41071c87292c0227 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 3 Mar 2017 16:02:53 +0000 Subject: [PATCH 09/11] Don't keep around old stream IDs forever --- synapse/handlers/device.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 540b438797..e859b3165f 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -16,6 +16,7 @@ from synapse.api import errors from synapse.api.constants import EventTypes from synapse.util import stringutils from synapse.util.async import Linearizer +from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.metrics import measure_func from synapse.types import get_domain_from_id, RoomStreamToken from twisted.internet import defer @@ -344,7 +345,13 @@ class DeviceListEduUpdater(object): # Recently seen stream ids. We don't bother keeping these in the DB, # but they're useful to have them about to reduce the number of spurious # resyncs. - self._seen_updates = {} + self._seen_updates = ExpiringCache( + cache_name="device_update_edu", + clock=self.clock, + max_len=10000, + expiry_ms=30 * 60 * 1000, + iterable=True, + ) @defer.inlineCallbacks def incoming_device_list_update(self, origin, edu_content): @@ -412,7 +419,7 @@ class DeviceListEduUpdater(object): ) self._seen_updates.setdefault(user_id, set()).update( - [stream_id for _, stream_id, _, _ in pending_updates] + stream_id for _, stream_id, _, _ in pending_updates ) @defer.inlineCallbacks From ac5491f56308405890530ae09ac6ffcf93ad48b7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 6 Mar 2017 11:10:10 +0000 Subject: [PATCH 10/11] Select distinct devices from DB Otherwise we might pull out tonnes of duplicate user_ids and this can make synapse sad. --- synapse/storage/devices.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 81c43d31f6..bd56ba2515 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -508,7 +508,7 @@ class DeviceStore(SQLBaseStore): defer.returnValue(set(changed)) sql = """ - SELECT user_id FROM device_lists_stream WHERE stream_id > ? + SELECT DISTINCT user_id FROM device_lists_stream WHERE stream_id > ? """ rows = yield self._execute("get_user_whose_devices_changed", None, sql, from_key) defer.returnValue(set(row[0] for row in rows)) From 6ad71cc29d8e0f8f5cc7976a84fa0b953edf0f82 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 8 Mar 2017 18:00:44 +0000 Subject: [PATCH 11/11] Remove spurious SQL logging (#1972) looks like the upsert function was accidentally sending sql logging to the general logger. We already log the sql in `txn.execute`. --- synapse/storage/_base.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 4410cd9e62..a7a8ec9b7b 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -488,10 +488,6 @@ class SQLBaseStore(object): " AND ".join("%s = ?" 
% (k,) for k in keyvalues) ) sqlargs = values.values() + keyvalues.values() - logger.debug( - "[SQL] %s Args=%s", - sql, sqlargs, - ) txn.execute(sql, sqlargs) if txn.rowcount == 0: @@ -506,10 +502,6 @@ class SQLBaseStore(object): ", ".join(k for k in allvalues), ", ".join("?" for _ in allvalues) ) - logger.debug( - "[SQL] %s Args=%s", - sql, keyvalues.values(), - ) txn.execute(sql, allvalues.values()) return True
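Two implementation notes on the code paths these patches touch, with simplified
sketches. Both are illustrative only, not the actual Synapse implementations;
the function names below are invented for the sketch.

The `_simple_upsert` helper that patch 11 quiets emulates an upsert: it
attempts an UPDATE first, and falls back to an INSERT only when no existing
row matched. A minimal standalone sketch of that shape, assuming a DB-API
style cursor and caller-supplied table/column names:

    def emulated_upsert(txn, table, keyvalues, values):
        # Try to UPDATE an existing row identified by `keyvalues`...
        sql = "UPDATE %s SET %s WHERE %s" % (
            table,
            ", ".join("%s = ?" % (k,) for k in values),
            " AND ".join("%s = ?" % (k,) for k in keyvalues),
        )
        txn.execute(sql, list(values.values()) + list(keyvalues.values()))
        if txn.rowcount == 0:
            # ...and INSERT the full row only if nothing matched.
            allvalues = dict(keyvalues, **values)
            sql = "INSERT INTO %s (%s) VALUES (%s)" % (
                table,
                ", ".join(allvalues),
                ", ".join("?" for _ in allvalues),
            )
            txn.execute(sql, list(allvalues.values()))

The resync rule from patches 04 and 09 reduces to: a batch of device list
updates may be applied as a delta only if every prev_id is already accounted
for (the cached extremity, a recently seen stream_id, or an earlier update in
the same batch); otherwise the full device list must be refetched. A plain
restatement of `DeviceListEduUpdater._need_to_do_resync`:

    def need_resync(extremity, seen, updates):
        """updates: list of (device_id, stream_id, prev_ids, content)."""
        handled_in_batch = set()
        for _, stream_id, prev_ids, _ in updates:
            if not prev_ids:
                return True  # no reference point at all: full resync
            if any(p != extremity and p not in seen
                   and p not in handled_in_batch for p in prev_ids):
                return True  # gap in the update stream: full resync
            handled_in_batch.add(stream_id)
        return False  # safe to apply the batch as a delta

    # e.g. need_resync("5", set(), [("DEV1", "6", ["5"], {})]) -> False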