mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-26 19:47:05 +03:00
Use READ COMMITTED isolation level when purging rooms (#12942)
To close: #10294. Signed off by Nick @ Beeper.
This commit is contained in:
parent
c5f487b7cb
commit
6785b0f39d
2 changed files with 32 additions and 2 deletions
1
changelog.d/12942.misc
Normal file
1
changelog.d/12942.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Use lower isolation level when purging rooms to avoid serialization errors. Contributed by Nick @ Beeper.
|
|
@ -19,6 +19,8 @@ from synapse.api.errors import SynapseError
|
||||||
from synapse.storage.database import LoggingTransaction
|
from synapse.storage.database import LoggingTransaction
|
||||||
from synapse.storage.databases.main import CacheInvalidationWorkerStore
|
from synapse.storage.databases.main import CacheInvalidationWorkerStore
|
||||||
from synapse.storage.databases.main.state import StateGroupWorkerStore
|
from synapse.storage.databases.main.state import StateGroupWorkerStore
|
||||||
|
from synapse.storage.engines import PostgresEngine
|
||||||
|
from synapse.storage.engines._base import IsolationLevel
|
||||||
from synapse.types import RoomStreamToken
|
from synapse.types import RoomStreamToken
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
@ -317,11 +319,38 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
||||||
Returns:
|
Returns:
|
||||||
The list of state groups to delete.
|
The list of state groups to delete.
|
||||||
"""
|
"""
|
||||||
return await self.db_pool.runInteraction(
|
|
||||||
"purge_room", self._purge_room_txn, room_id
|
# This first runs the purge transaction with READ_COMMITTED isolation level,
|
||||||
|
# meaning any new rows in the tables will not trigger a serialization error.
|
||||||
|
# We then run the same purge a second time without this isolation level to
|
||||||
|
# purge any of those rows which were added during the first.
|
||||||
|
|
||||||
|
state_groups_to_delete = await self.db_pool.runInteraction(
|
||||||
|
"purge_room",
|
||||||
|
self._purge_room_txn,
|
||||||
|
room_id=room_id,
|
||||||
|
isolation_level=IsolationLevel.READ_COMMITTED,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
state_groups_to_delete.extend(
|
||||||
|
await self.db_pool.runInteraction(
|
||||||
|
"purge_room",
|
||||||
|
self._purge_room_txn,
|
||||||
|
room_id=room_id,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
return state_groups_to_delete
|
||||||
|
|
||||||
def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]:
|
def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]:
|
||||||
|
# This collides with event persistence so we cannot write new events and metadata into
|
||||||
|
# a room while deleting it or this transaction will fail.
|
||||||
|
if isinstance(self.database_engine, PostgresEngine):
|
||||||
|
txn.execute(
|
||||||
|
"SELECT room_version FROM rooms WHERE room_id = ? FOR UPDATE",
|
||||||
|
(room_id,),
|
||||||
|
)
|
||||||
|
|
||||||
# First, fetch all the state groups that should be deleted, before
|
# First, fetch all the state groups that should be deleted, before
|
||||||
# we delete that information.
|
# we delete that information.
|
||||||
txn.execute(
|
txn.execute(
|
||||||
|
|
Loading…
Reference in a new issue