From 7134ca7daa7ead8104e3c51ba4bc730d99d098e3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 4 Nov 2019 13:36:57 +0000 Subject: [PATCH] Change to not require a state_groups.room_id index. This does mean that we won't clean up orphaned state groups (i.e. state groups that were persisted but the associated event wasn't). --- synapse/storage/data_stores/main/events.py | 70 ++++++++++++------- .../schema/delta/56/state_group_room_idx.sql | 17 ----- synapse/storage/data_stores/main/state.py | 7 -- synapse/storage/purge_events.py | 4 +- 4 files changed, 45 insertions(+), 53 deletions(-) delete mode 100644 synapse/storage/data_stores/main/schema/delta/56/state_group_room_idx.sql diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index 68f27078c4..3049a21dc5 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -1624,7 +1624,10 @@ class EventsStore( """Deletes all record of a room Args: - room_id (str): + room_id (str) + + Returns: + Deferred[List[int]]: The list of state groups to delete. """ return self.runInteraction("purge_room", self._purge_room_txn, room_id) @@ -1714,10 +1717,24 @@ class EventsStore( # index on them. In any case we should be clearing out 'stream' tables # periodically anyway (#5888) + # Now we fetch all the state groups that should be deleted. + txn.execute( + """ + SELECT DISTINCT state_group FROM events + INNER JOIN event_to_state_groups USING(event_id) + WHERE events.room_id = ? + """, + (room_id,), + ) + + state_groups = [row[0] for row in txn] + # TODO: we could probably usefully do a bunch of cache invalidation here logger.info("[purge] done") + return state_groups + def purge_unreferenced_state_groups( self, room_id: str, state_groups_to_delete: Set[int] ) -> defer.Deferred: @@ -1825,54 +1842,53 @@ class EventsStore( return {row["state_group"]: row["prev_state_group"] for row in rows} - def purge_room_state(self, room_id): + def purge_room_state(self, room_id, state_groups_to_delete): """Deletes all record of a room from state tables Args: room_id (str): + state_groups_to_delete (list[int]): State groups to delete """ return self.runInteraction( - "purge_room_state", self._purge_room_state_txn, room_id + "purge_room_state", + self._purge_room_state_txn, + room_id, + state_groups_to_delete, ) - def _purge_room_state_txn(self, txn, room_id): + def _purge_room_state_txn(self, txn, room_id, state_groups_to_delete): # first we have to delete the state groups states logger.info("[purge] removing %s from state_groups_state", room_id) - txn.execute( - """ - DELETE FROM state_groups_state - WHERE state_group IN ( - SELECT state_group FROM state_groups - WHERE room_id = ? - ) - """, - (room_id,), + self._simple_delete_many_txn( + txn, + table="state_groups_state", + column="state_group", + iterable=state_groups_to_delete, + keyvalues={}, ) # ... and the state group edges logger.info("[purge] removing %s from state_group_edges", room_id) - txn.execute( - """ - DELETE FROM state_group_edges - WHERE state_group IN ( - SELECT state_group FROM state_groups - WHERE room_id = ? - ) - """, - (room_id,), + self._simple_delete_many_txn( + txn, + table="state_group_edges", + column="state_group", + iterable=state_groups_to_delete, + keyvalues={}, ) # ... and the state groups logger.info("[purge] removing %s from state_groups", room_id) - txn.execute( - """ - DELETE FROM state_groups WHERE room_id = ? - """, - (room_id,), + self._simple_delete_many_txn( + txn, + table="state_groups", + column="id", + iterable=state_groups_to_delete, + keyvalues={}, ) async def is_event_after(self, event_id1, event_id2): diff --git a/synapse/storage/data_stores/main/schema/delta/56/state_group_room_idx.sql b/synapse/storage/data_stores/main/schema/delta/56/state_group_room_idx.sql deleted file mode 100644 index 7916ef18b2..0000000000 --- a/synapse/storage/data_stores/main/schema/delta/56/state_group_room_idx.sql +++ /dev/null @@ -1,17 +0,0 @@ -/* Copyright 2019 The Matrix.org Foundation C.I.C. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -INSERT INTO background_updates (update_name, progress_json) VALUES - ('state_groups_room_id_idx', '{}'); diff --git a/synapse/storage/data_stores/main/state.py b/synapse/storage/data_stores/main/state.py index e1d3041c7c..5c5b15840e 100644 --- a/synapse/storage/data_stores/main/state.py +++ b/synapse/storage/data_stores/main/state.py @@ -1023,7 +1023,6 @@ class StateBackgroundUpdateStore( STATE_GROUP_INDEX_UPDATE_NAME = "state_group_state_type_index" CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx" EVENT_STATE_GROUP_INDEX_UPDATE_NAME = "event_to_state_groups_sg_index" - STATE_GROUPS_ROOM_INDEX_UPDATE_NAME = "state_groups_room_id_idx" def __init__(self, db_conn, hs): super(StateBackgroundUpdateStore, self).__init__(db_conn, hs) @@ -1047,12 +1046,6 @@ class StateBackgroundUpdateStore( table="event_to_state_groups", columns=["state_group"], ) - self.register_background_index_update( - self.STATE_GROUPS_ROOM_INDEX_UPDATE_NAME, - index_name="state_groups_room_id_idx", - table="state_groups", - columns=["room_id"], - ) @defer.inlineCallbacks def _background_deduplicate_state(self, progress, batch_size): diff --git a/synapse/storage/purge_events.py b/synapse/storage/purge_events.py index dd45df0c88..a368182034 100644 --- a/synapse/storage/purge_events.py +++ b/synapse/storage/purge_events.py @@ -33,8 +33,8 @@ class PurgeEventsStorage(object): """Deletes all record of a room """ - yield self.stores.main.purge_room(room_id) - yield self.stores.main.purge_room_state(room_id) + state_groups_to_delete = yield self.stores.main.purge_room(room_id) + yield self.stores.main.purge_room_state(room_id, state_groups_to_delete) @defer.inlineCallbacks def purge_history(self, room_id, token, delete_local_events):