Merge pull request #2689 from matrix-org/rav/unlock_account_data_upsert

Avoid locking account_data tables for upserts
Richard van der Hoff 2017-11-21 13:39:14 +00:00 committed by GitHub
commit 0b4aa2dc21
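The change relies on _simple_upsert retrying when the unique constraint is violated, rather than taking a table lock before the usual UPDATE-then-INSERT dance. A minimal sketch of that retry pattern, using a plain DB-API connection (illustrative only, not Synapse's actual _simple_upsert; names and the retry limit are hypothetical):

    import sqlite3

    def upsert_room_account_data(conn, user_id, room_id, account_data_type,
                                 stream_id, content_json, max_attempts=5):
        # Try UPDATE first; if it matched a row, we are done. If not, INSERT.
        # A concurrent INSERT of the same key trips the unique constraint on
        # (user_id, room_id, account_data_type); we then retry, and the
        # UPDATE path wins on the next attempt. No table lock is needed.
        for _ in range(max_attempts):
            cur = conn.execute(
                "UPDATE room_account_data SET stream_id = ?, content = ?"
                " WHERE user_id = ? AND room_id = ? AND account_data_type = ?",
                (stream_id, content_json, user_id, room_id, account_data_type),
            )
            if cur.rowcount > 0:
                conn.commit()
                return
            try:
                conn.execute(
                    "INSERT INTO room_account_data"
                    " (user_id, room_id, account_data_type, stream_id, content)"
                    " VALUES (?, ?, ?, ?, ?)",
                    (user_id, room_id, account_data_type, stream_id, content_json),
                )
                conn.commit()
                return
            except sqlite3.IntegrityError:
                conn.rollback()  # lost the race; retry via the UPDATE path
        raise RuntimeError("upsert failed after %d attempts" % max_attempts)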

@@ -222,9 +222,12 @@ class AccountDataStore(SQLBaseStore):
         """
         content_json = json.dumps(content)

-        def add_account_data_txn(txn, next_id):
-            self._simple_upsert_txn(
-                txn,
+        with self._account_data_id_gen.get_next() as next_id:
+            # no need to lock here as room_account_data has a unique constraint
+            # on (user_id, room_id, account_data_type) so _simple_upsert will
+            # retry if there is a conflict.
+            yield self._simple_upsert(
+                desc="add_room_account_data",
                 table="room_account_data",
                 keyvalues={
                     "user_id": user_id,
@@ -234,19 +237,20 @@ class AccountDataStore(SQLBaseStore):
                 values={
                     "stream_id": next_id,
                     "content": content_json,
-                }
+                },
+                lock=False,
             )
-            txn.call_after(
-                self._account_data_stream_cache.entity_has_changed,
-                user_id, next_id,
-            )
-            txn.call_after(self.get_account_data_for_user.invalidate, (user_id,))
-            self._update_max_stream_id(txn, next_id)

-        with self._account_data_id_gen.get_next() as next_id:
-            yield self.runInteraction(
-                "add_room_account_data", add_account_data_txn, next_id
-            )
+            # it's theoretically possible for the above to succeed and the
+            # below to fail - in which case we might reuse a stream id on
+            # restart, and the above update might not get propagated. That
+            # doesn't sound any worse than the whole update getting lost,
+            # which is what would happen if we combined the two into one
+            # transaction.
+            yield self._update_max_stream_id(next_id)
+
+            self._account_data_stream_cache.entity_has_changed(user_id, next_id)
+            self.get_account_data_for_user.invalidate((user_id,))

         result = self._account_data_id_gen.get_current_token()
         defer.returnValue(result)
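The "reuse a stream id on restart" caveat in the comment above follows from the id generator being seeded from the persisted account_data_max_stream_id row. A toy sketch of why a crash between the upsert and _update_max_stream_id can hand the same id out again (hypothetical class; Synapse's StreamIdGenerator also tracks concurrent in-flight ids):

    class NaiveStreamIdGenerator(object):
        """Toy id generator seeded from the persisted max-stream-id table.

        If the server crashes after a row was written with id N but before
        account_data_max_stream_id was advanced to N, the generator is
        re-seeded below N on restart and may hand out N again - the scenario
        the comment accepts as no worse than losing the whole update.
        """
        def __init__(self, conn):
            row = conn.execute(
                "SELECT stream_id FROM account_data_max_stream_id"
            ).fetchone()
            self._current = row[0] if row else 0

        def get_next(self):
            self._current += 1
            return self._current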
@@ -263,9 +267,12 @@ class AccountDataStore(SQLBaseStore):
         """
         content_json = json.dumps(content)

-        def add_account_data_txn(txn, next_id):
-            self._simple_upsert_txn(
-                txn,
+        with self._account_data_id_gen.get_next() as next_id:
+            # no need to lock here as account_data has a unique constraint on
+            # (user_id, account_data_type) so _simple_upsert will retry if
+            # there is a conflict.
+            yield self._simple_upsert(
+                desc="add_user_account_data",
                 table="account_data",
                 keyvalues={
                     "user_id": user_id,
@@ -274,40 +281,46 @@ class AccountDataStore(SQLBaseStore):
                 values={
                     "stream_id": next_id,
                     "content": content_json,
-                }
+                },
+                lock=False,
             )
-            txn.call_after(
-                self._account_data_stream_cache.entity_has_changed,
-                user_id, next_id,
-            )
-            txn.call_after(self.get_account_data_for_user.invalidate, (user_id,))
-            txn.call_after(
-                self.get_global_account_data_by_type_for_user.invalidate,
-                (account_data_type, user_id,)
-            )
-            self._update_max_stream_id(txn, next_id)

-        with self._account_data_id_gen.get_next() as next_id:
-            yield self.runInteraction(
-                "add_user_account_data", add_account_data_txn, next_id
-            )
+            # it's theoretically possible for the above to succeed and the
+            # below to fail - in which case we might reuse a stream id on
+            # restart, and the above update might not get propagated. That
+            # doesn't sound any worse than the whole update getting lost,
+            # which is what would happen if we combined the two into one
+            # transaction.
+            yield self._update_max_stream_id(next_id)
+
+            self._account_data_stream_cache.entity_has_changed(
+                user_id, next_id,
+            )
+            self.get_account_data_for_user.invalidate((user_id,))
+            self.get_global_account_data_by_type_for_user.invalidate(
+                (account_data_type, user_id,)
+            )

         result = self._account_data_id_gen.get_current_token()
         defer.returnValue(result)

-    def _update_max_stream_id(self, txn, next_id):
+    def _update_max_stream_id(self, next_id):
         """Update the max stream_id

         Args:
-            txn: The database cursor
             next_id(int): The revision to advance to.
         """
-        update_max_id_sql = (
-            "UPDATE account_data_max_stream_id"
-            " SET stream_id = ?"
-            " WHERE stream_id < ?"
+        def _update(txn):
+            update_max_id_sql = (
+                "UPDATE account_data_max_stream_id"
+                " SET stream_id = ?"
+                " WHERE stream_id < ?"
+            )
+            txn.execute(update_max_id_sql, (next_id, next_id))
+        return self.runInteraction(
+            "update_account_data_max_stream_id",
+            _update,
         )
-        txn.execute(update_max_id_sql, (next_id, next_id))

     @cachedInlineCallbacks(num_args=2, cache_context=True, max_entries=5000)
     def is_ignored_by(self, ignored_user_id, ignorer_user_id, cache_context):
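With the refactored _update_max_stream_id, the cursor-level work moves into a closure run by runInteraction, so callers can yield the helper directly from inlineCallbacks code with no enclosing transaction. A schematic of that pattern (hypothetical store and table names, assuming the runInteraction/inlineCallbacks conventions visible in the diff above):

    from twisted.internet import defer

    class ExampleStore(SQLBaseStore):
        def _bump_max_id(self, next_id):
            # the cursor-level work lives in a closure; runInteraction runs it
            # inside its own database transaction and returns a Deferred
            def _update(txn):
                txn.execute(
                    "UPDATE example_max_stream_id SET stream_id = ?"
                    " WHERE stream_id < ?",
                    (next_id, next_id),
                )
            return self.runInteraction("bump_example_max_id", _update)

        @defer.inlineCallbacks
        def do_update(self, next_id):
            # the helper is self-contained, so no enclosing txn is needed here
            yield self._bump_max_id(next_id)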