From ec5717caf59eb72caf6f82f1643f492f328a4be5 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 22 Jul 2016 13:14:03 +0100 Subject: [PATCH 1/4] Create index on user_ips in the background user_ips is kinda big, so really we want to add the index in the background once we're running. Replace the schema delta with one which will do that. I've done this in a way that's reasonably easy to reuse as there a few other indexes I need, and I don't suppose they will be the last. --- synapse/storage/background_updates.py | 73 +++++++++++++++++-- synapse/storage/client_ips.py | 16 +++- .../schema/delta/33/user_ips_index.sql | 3 +- 3 files changed, 80 insertions(+), 12 deletions(-) diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 66a995157d..75951d0173 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -14,6 +14,7 @@ # limitations under the License. from ._base import SQLBaseStore +from . import engines from twisted.internet import defer @@ -106,13 +107,13 @@ class BackgroundUpdateStore(SQLBaseStore): ) except: logger.exception("Error doing update") - - if result is None: - logger.info( - "No more background updates to do." - " Unscheduling background update task." - ) - return + else: + if result is None: + logger.info( + "No more background updates to do." + " Unscheduling background update task." + ) + return @defer.inlineCallbacks def do_background_update(self, desired_duration_ms): @@ -202,6 +203,64 @@ class BackgroundUpdateStore(SQLBaseStore): """ self._background_update_handlers[update_name] = update_handler + def register_background_index_update(self, update_name, index_name, + table, columns): + """Helper for store classes to do a background index addition + + To use: + + 1. use a schema delta file to add a background update. Example: + INSERT INTO background_updates (update_name, progress_json) VALUES + ('my_new_index', '{}'); + + 2. In the Store constructor, call this method + + Args: + update_name (str): update_name to register for + index_name (str): name of index to add + table (str): table to add index to + columns (list[str]): columns/expressions to include in index + """ + + # if this is postgres, we add the indexes concurrently. Otherwise + # we fall back to doing it inline + if isinstance(self.database_engine, engines.PostgresEngine): + conc = True + else: + conc = False + + sql = "CREATE INDEX %(conc)s %(name)s ON %(table)s (%(columns)s)" \ + % { + "conc": "CONCURRENTLY" if conc else "", + "name": index_name, + "table": table, + "columns": ", ".join(columns), + } + + def create_index_concurrently(conn): + conn.rollback() + # postgres insists on autocommit for the index + conn.set_session(autocommit=True) + c = conn.cursor() + c.execute(sql) + conn.set_session(autocommit=False) + + def create_index(conn): + c = conn.cursor() + c.execute(sql) + + @defer.inlineCallbacks + def updater(progress, batch_size): + logger.info("Adding index %s to %s", index_name, table) + if conc: + yield self.runWithConnection(create_index_concurrently) + else: + yield self.runWithConnection(create_index) + yield self._end_background_update(update_name) + defer.returnValue(1) + + self.register_background_update_handler(update_name, updater) + def start_background_update(self, update_name, progress): """Starts a background update running. diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index e31fa53c3f..20eb9ac15f 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -15,10 +15,11 @@ import logging -from ._base import SQLBaseStore, Cache - from twisted.internet import defer +from ._base import Cache +from . import background_updates + logger = logging.getLogger(__name__) # Number of msec of granularity to store the user IP 'last seen' time. Smaller @@ -27,8 +28,7 @@ logger = logging.getLogger(__name__) LAST_SEEN_GRANULARITY = 120 * 1000 -class ClientIpStore(SQLBaseStore): - +class ClientIpStore(background_updates.BackgroundUpdateStore): def __init__(self, hs): self.client_ip_last_seen = Cache( name="client_ip_last_seen", @@ -37,6 +37,14 @@ class ClientIpStore(SQLBaseStore): super(ClientIpStore, self).__init__(hs) + self.register_background_index_update( + "user_ips_device_index", + index_name="user_ips_device_id", + table="user_ips", + columns=["user_id", "device_id", "last_seen"], + ) + + @defer.inlineCallbacks def insert_client_ip(self, user, access_token, ip, user_agent, device_id): now = int(self._clock.time_msec()) diff --git a/synapse/storage/schema/delta/33/user_ips_index.sql b/synapse/storage/schema/delta/33/user_ips_index.sql index 8a05677d42..473f75a78e 100644 --- a/synapse/storage/schema/delta/33/user_ips_index.sql +++ b/synapse/storage/schema/delta/33/user_ips_index.sql @@ -13,4 +13,5 @@ * limitations under the License. */ -CREATE INDEX user_ips_device_id ON user_ips(user_id, device_id, last_seen); +INSERT INTO background_updates (update_name, progress_json) VALUES + ('user_ips_device_index', '{}'); From 363786845b728bcd7146b3d949a86021a96eb2d2 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 22 Jul 2016 13:21:07 +0100 Subject: [PATCH 2/4] PEP8 --- synapse/storage/client_ips.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index 20eb9ac15f..71e5ea112f 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -44,7 +44,6 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): columns=["user_id", "device_id", "last_seen"], ) - @defer.inlineCallbacks def insert_client_ip(self, user, access_token, ip, user_agent, device_id): now = int(self._clock.time_msec()) From 465117d7ca40ba9906697aa023897798f7833830 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 25 Jul 2016 12:10:42 +0100 Subject: [PATCH 3/4] Fix background_update tests A bit of a cleanup for background_updates, and make sure that the real background updates have run before we start the unit tests, so that they don't interfere with the tests. --- synapse/storage/background_updates.py | 27 ++++++++++++++++++------- tests/storage/test_background_update.py | 22 ++++++++++++++------ 2 files changed, 36 insertions(+), 13 deletions(-) diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 75951d0173..2771f7c3c1 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -88,10 +88,12 @@ class BackgroundUpdateStore(SQLBaseStore): @defer.inlineCallbacks def start_doing_background_updates(self): - while True: - if self._background_update_timer is not None: - return + assert(self._background_update_timer is not None, + "background updates already running") + logger.info("Starting background schema updates") + + while True: sleep = defer.Deferred() self._background_update_timer = self._clock.call_later( self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback, None @@ -102,7 +104,7 @@ class BackgroundUpdateStore(SQLBaseStore): self._background_update_timer = None try: - result = yield self.do_background_update( + result = yield self.do_next_background_update( self.BACKGROUND_UPDATE_DURATION_MS ) except: @@ -113,11 +115,12 @@ class BackgroundUpdateStore(SQLBaseStore): "No more background updates to do." " Unscheduling background update task." ) - return + defer.returnValue() @defer.inlineCallbacks - def do_background_update(self, desired_duration_ms): - """Does some amount of work on a background update + def do_next_background_update(self, desired_duration_ms): + """Does some amount of work on the next queued background update + Args: desired_duration_ms(float): How long we want to spend updating. @@ -136,11 +139,21 @@ class BackgroundUpdateStore(SQLBaseStore): self._background_update_queue.append(update['update_name']) if not self._background_update_queue: + # no work left to do defer.returnValue(None) + # pop from the front, and add back to the back update_name = self._background_update_queue.pop(0) self._background_update_queue.append(update_name) + res = yield self._do_background_update(update_name, desired_duration_ms) + defer.returnValue(res) + + @defer.inlineCallbacks + def _do_background_update(self, update_name, desired_duration_ms): + logger.info("Starting update batch on background update '%s'", + update_name) + update_handler = self._background_update_handlers[update_name] performance = self._background_update_performance.get(update_name) diff --git a/tests/storage/test_background_update.py b/tests/storage/test_background_update.py index 6e4d9b1373..4944cb0d2e 100644 --- a/tests/storage/test_background_update.py +++ b/tests/storage/test_background_update.py @@ -10,7 +10,7 @@ class BackgroundUpdateTestCase(unittest.TestCase): @defer.inlineCallbacks def setUp(self): - hs = yield setup_test_homeserver() + hs = yield setup_test_homeserver() # type: synapse.server.HomeServer self.store = hs.get_datastore() self.clock = hs.get_clock() @@ -20,11 +20,20 @@ class BackgroundUpdateTestCase(unittest.TestCase): "test_update", self.update_handler ) + # run the real background updates, to get them out the way + # (perhaps we should run them as part of the test HS setup, since we + # run all of the other schema setup stuff there?) + while True: + res = yield self.store.do_next_background_update(1000) + if res is None: + break + @defer.inlineCallbacks def test_do_background_update(self): desired_count = 1000 duration_ms = 42 + # first step: make a bit of progress @defer.inlineCallbacks def update(progress, count): self.clock.advance_time_msec(count * duration_ms) @@ -42,7 +51,7 @@ class BackgroundUpdateTestCase(unittest.TestCase): yield self.store.start_background_update("test_update", {"my_key": 1}) self.update_handler.reset_mock() - result = yield self.store.do_background_update( + result = yield self.store.do_next_background_update( duration_ms * desired_count ) self.assertIsNotNone(result) @@ -50,24 +59,25 @@ class BackgroundUpdateTestCase(unittest.TestCase): {"my_key": 1}, self.store.DEFAULT_BACKGROUND_BATCH_SIZE ) + # second step: complete the update @defer.inlineCallbacks def update(progress, count): yield self.store._end_background_update("test_update") defer.returnValue(count) self.update_handler.side_effect = update - self.update_handler.reset_mock() - result = yield self.store.do_background_update( - duration_ms * desired_count + result = yield self.store.do_next_background_update( + duration_ms * desired_count ) self.assertIsNotNone(result) self.update_handler.assert_called_once_with( {"my_key": 2}, desired_count ) + # third step: we don't expect to be called any more self.update_handler.reset_mock() - result = yield self.store.do_background_update( + result = yield self.store.do_next_background_update( duration_ms * desired_count ) self.assertIsNone(result) From 42f4feb2b709671bb2dbbabfe1aad7e951479652 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 25 Jul 2016 12:25:06 +0100 Subject: [PATCH 4/4] PEP8 --- tests/storage/test_background_update.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/storage/test_background_update.py b/tests/storage/test_background_update.py index 4944cb0d2e..1286b4ce2d 100644 --- a/tests/storage/test_background_update.py +++ b/tests/storage/test_background_update.py @@ -68,7 +68,7 @@ class BackgroundUpdateTestCase(unittest.TestCase): self.update_handler.side_effect = update self.update_handler.reset_mock() result = yield self.store.do_next_background_update( - duration_ms * desired_count + duration_ms * desired_count ) self.assertIsNotNone(result) self.update_handler.assert_called_once_with(