mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-27 20:22:07 +03:00
Add transaction level logging and timing information. Add a _simple_delete method
This commit is contained in:
parent
967ce43b59
commit
da1dda3e1d
9 changed files with 91 additions and 21 deletions
|
@ -109,6 +109,7 @@ class DataStore(RoomMemberStore, RoomStore,
|
|||
|
||||
try:
|
||||
yield self.runInteraction(
|
||||
"persist_event",
|
||||
self._persist_pdu_event_txn,
|
||||
pdu=pdu,
|
||||
event=event,
|
||||
|
@ -394,7 +395,7 @@ class DataStore(RoomMemberStore, RoomStore,
|
|||
prev_state_pdu=prev_state_pdu,
|
||||
)
|
||||
|
||||
return self.runInteraction(_snapshot)
|
||||
return self.runInteraction("snapshot_room", _snapshot)
|
||||
|
||||
|
||||
class Snapshot(object):
|
||||
|
|
|
@ -29,15 +29,17 @@ import time
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
sql_logger = logging.getLogger("synapse.storage.SQL")
|
||||
transaction_logger = logging.getLogger("synapse.storage.txn")
|
||||
|
||||
|
||||
class LoggingTransaction(object):
|
||||
"""An object that almost-transparently proxies for the 'txn' object
|
||||
passed to the constructor. Adds logging to the .execute() method."""
|
||||
__slots__ = ["txn"]
|
||||
__slots__ = ["txn", "name"]
|
||||
|
||||
def __init__(self, txn):
|
||||
def __init__(self, txn, name):
|
||||
object.__setattr__(self, "txn", txn)
|
||||
object.__setattr__(self, "name", name)
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self.txn, name)
|
||||
|
@ -47,12 +49,15 @@ class LoggingTransaction(object):
|
|||
|
||||
def execute(self, sql, *args, **kwargs):
|
||||
# TODO(paul): Maybe use 'info' and 'debug' for values?
|
||||
sql_logger.debug("[SQL] %s", sql)
|
||||
sql_logger.debug("[SQL] {%s} %s", self.name, sql)
|
||||
try:
|
||||
if args and args[0]:
|
||||
values = args[0]
|
||||
sql_logger.debug("[SQL values] " +
|
||||
", ".join(("<%s>",) * len(values)), *values)
|
||||
sql_logger.debug(
|
||||
"[SQL values] {%s} " + ", ".join(("<%s>",) * len(values)),
|
||||
self.name,
|
||||
*values
|
||||
)
|
||||
except:
|
||||
# Don't let logging failures stop SQL from working
|
||||
pass
|
||||
|
@ -64,10 +69,11 @@ class LoggingTransaction(object):
|
|||
)
|
||||
finally:
|
||||
end = time.clock() * 1000
|
||||
sql_logger.debug("[SQL time] %f", end - start)
|
||||
sql_logger.debug("[SQL time] {%s} %f", self.name, end - start)
|
||||
|
||||
|
||||
class SQLBaseStore(object):
|
||||
_TXN_ID = 0
|
||||
|
||||
def __init__(self, hs):
|
||||
self.hs = hs
|
||||
|
@ -75,10 +81,24 @@ class SQLBaseStore(object):
|
|||
self.event_factory = hs.get_event_factory()
|
||||
self._clock = hs.get_clock()
|
||||
|
||||
def runInteraction(self, func, *args, **kwargs):
|
||||
def runInteraction(self, desc, func, *args, **kwargs):
|
||||
"""Wraps the .runInteraction() method on the underlying db_pool."""
|
||||
def inner_func(txn, *args, **kwargs):
|
||||
return func(LoggingTransaction(txn), *args, **kwargs)
|
||||
start = time.clock() * 1000
|
||||
txn_id = str(SQLBaseStore._TXN_ID)
|
||||
SQLBaseStore._TXN_ID += 1
|
||||
|
||||
name = "%s-%s" % (desc, txn_id, )
|
||||
|
||||
transaction_logger.debug("[TXN START] {%s}", name)
|
||||
try:
|
||||
return func(LoggingTransaction(txn, name), *args, **kwargs)
|
||||
finally:
|
||||
end = time.clock() * 1000
|
||||
transaction_logger.debug(
|
||||
"[TXN END] {%s} %f",
|
||||
name, end - start
|
||||
)
|
||||
|
||||
return self._db_pool.runInteraction(inner_func, *args, **kwargs)
|
||||
|
||||
|
@ -114,7 +134,7 @@ class SQLBaseStore(object):
|
|||
else:
|
||||
return cursor.fetchall()
|
||||
|
||||
return self.runInteraction(interaction)
|
||||
return self.runInteraction("_execute", interaction)
|
||||
|
||||
def _execute_and_decode(self, query, *args):
|
||||
return self._execute(self.cursor_to_dict, query, *args)
|
||||
|
@ -131,6 +151,7 @@ class SQLBaseStore(object):
|
|||
or_replace : bool; if True performs an INSERT OR REPLACE
|
||||
"""
|
||||
return self.runInteraction(
|
||||
"_simple_insert",
|
||||
self._simple_insert_txn, table, values, or_replace=or_replace,
|
||||
or_ignore=or_ignore,
|
||||
)
|
||||
|
@ -168,6 +189,7 @@ class SQLBaseStore(object):
|
|||
statement returns no rows
|
||||
"""
|
||||
return self._simple_selectupdate_one(
|
||||
"_simple_select_one",
|
||||
table, keyvalues, retcols=retcols, allow_none=allow_none
|
||||
)
|
||||
|
||||
|
@ -217,7 +239,7 @@ class SQLBaseStore(object):
|
|||
txn.execute(sql, keyvalues.values())
|
||||
return txn.fetchall()
|
||||
|
||||
res = yield self.runInteraction(func)
|
||||
res = yield self.runInteraction("_simple_select_onecol", func)
|
||||
|
||||
defer.returnValue([r[0] for r in res])
|
||||
|
||||
|
@ -240,7 +262,7 @@ class SQLBaseStore(object):
|
|||
txn.execute(sql, keyvalues.values())
|
||||
return self.cursor_to_dict(txn)
|
||||
|
||||
return self.runInteraction(func)
|
||||
return self.runInteraction("_simple_select_list", func)
|
||||
|
||||
def _simple_update_one(self, table, keyvalues, updatevalues,
|
||||
retcols=None):
|
||||
|
@ -308,7 +330,7 @@ class SQLBaseStore(object):
|
|||
raise StoreError(500, "More than one row matched")
|
||||
|
||||
return ret
|
||||
return self.runInteraction(func)
|
||||
return self.runInteraction("_simple_selectupdate_one", func)
|
||||
|
||||
def _simple_delete_one(self, table, keyvalues):
|
||||
"""Executes a DELETE query on the named table, expecting to delete a
|
||||
|
@ -320,7 +342,7 @@ class SQLBaseStore(object):
|
|||
"""
|
||||
sql = "DELETE FROM %s WHERE %s" % (
|
||||
table,
|
||||
" AND ".join("%s = ?" % (k) for k in keyvalues)
|
||||
" AND ".join("%s = ?" % (k, ) for k in keyvalues)
|
||||
)
|
||||
|
||||
def func(txn):
|
||||
|
@ -329,7 +351,25 @@ class SQLBaseStore(object):
|
|||
raise StoreError(404, "No row found")
|
||||
if txn.rowcount > 1:
|
||||
raise StoreError(500, "more than one row matched")
|
||||
return self.runInteraction(func)
|
||||
return self.runInteraction("_simple_delete_one", func)
|
||||
|
||||
def _simple_delete(self, table, keyvalues):
|
||||
"""Executes a DELETE query on the named table.
|
||||
|
||||
Args:
|
||||
table : string giving the table name
|
||||
keyvalues : dict of column names and values to select the row with
|
||||
"""
|
||||
|
||||
return self.runInteraction("_simple_delete", self._simple_delete_txn)
|
||||
|
||||
def _simple_delete_txn(self, txn, table, keyvalues):
|
||||
sql = "DELETE FROM %s WHERE %s" % (
|
||||
table,
|
||||
" AND ".join("%s = ?" % (k, ) for k in keyvalues)
|
||||
)
|
||||
|
||||
return txn.execute(sql, keyvalues.values())
|
||||
|
||||
def _simple_max_id(self, table):
|
||||
"""Executes a SELECT query on the named table, expecting to return the
|
||||
|
@ -347,7 +387,7 @@ class SQLBaseStore(object):
|
|||
return 0
|
||||
return max_id
|
||||
|
||||
return self.runInteraction(func)
|
||||
return self.runInteraction("_simple_max_id", func)
|
||||
|
||||
def _parse_event_from_row(self, row_dict):
|
||||
d = copy.deepcopy({k: v for k, v in row_dict.items()})
|
||||
|
@ -371,7 +411,9 @@ class SQLBaseStore(object):
|
|||
)
|
||||
|
||||
def _parse_events(self, rows):
|
||||
return self.runInteraction(self._parse_events_txn, rows)
|
||||
return self.runInteraction(
|
||||
"_parse_events", self._parse_events_txn, rows
|
||||
)
|
||||
|
||||
def _parse_events_txn(self, txn, rows):
|
||||
events = [self._parse_event_from_row(r) for r in rows]
|
||||
|
|
|
@ -95,6 +95,7 @@ class DirectoryStore(SQLBaseStore):
|
|||
|
||||
def delete_room_alias(self, room_alias):
|
||||
return self.runInteraction(
|
||||
"delete_room_alias",
|
||||
self._delete_room_alias_txn,
|
||||
room_alias,
|
||||
)
|
||||
|
|
|
@ -47,7 +47,7 @@ class PduStore(SQLBaseStore):
|
|||
"""
|
||||
|
||||
return self.runInteraction(
|
||||
self._get_pdu_tuple, pdu_id, origin
|
||||
"get_pdu", self._get_pdu_tuple, pdu_id, origin
|
||||
)
|
||||
|
||||
def _get_pdu_tuple(self, txn, pdu_id, origin):
|
||||
|
@ -108,6 +108,7 @@ class PduStore(SQLBaseStore):
|
|||
"""
|
||||
|
||||
return self.runInteraction(
|
||||
"get_current_state_for_context",
|
||||
self._get_current_state_for_context,
|
||||
context
|
||||
)
|
||||
|
@ -156,6 +157,7 @@ class PduStore(SQLBaseStore):
|
|||
"""
|
||||
|
||||
return self.runInteraction(
|
||||
"mark_pdu_as_processed",
|
||||
self._mark_as_processed, pdu_id, pdu_origin
|
||||
)
|
||||
|
||||
|
@ -165,6 +167,7 @@ class PduStore(SQLBaseStore):
|
|||
def get_all_pdus_from_context(self, context):
|
||||
"""Get a list of all PDUs for a given context."""
|
||||
return self.runInteraction(
|
||||
"get_all_pdus_from_context",
|
||||
self._get_all_pdus_from_context, context,
|
||||
)
|
||||
|
||||
|
@ -192,6 +195,7 @@ class PduStore(SQLBaseStore):
|
|||
list: A list of PduTuples
|
||||
"""
|
||||
return self.runInteraction(
|
||||
"get_backfill",
|
||||
self._get_backfill, context, pdu_list, limit
|
||||
)
|
||||
|
||||
|
@ -253,6 +257,7 @@ class PduStore(SQLBaseStore):
|
|||
context (str)
|
||||
"""
|
||||
return self.runInteraction(
|
||||
"get_min_depth_for_context",
|
||||
self._get_min_depth_for_context, context
|
||||
)
|
||||
|
||||
|
@ -291,6 +296,7 @@ class PduStore(SQLBaseStore):
|
|||
|
||||
def get_latest_pdus_in_context(self, context):
|
||||
return self.runInteraction(
|
||||
"get_latest_pdus_in_context",
|
||||
self._get_latest_pdus_in_context,
|
||||
context
|
||||
)
|
||||
|
@ -370,6 +376,7 @@ class PduStore(SQLBaseStore):
|
|||
"""
|
||||
|
||||
return self.runInteraction(
|
||||
"is_pdu_new",
|
||||
self._is_pdu_new,
|
||||
pdu_id=pdu_id,
|
||||
origin=origin,
|
||||
|
@ -523,6 +530,7 @@ class StatePduStore(SQLBaseStore):
|
|||
|
||||
def get_unresolved_state_tree(self, new_state_pdu):
|
||||
return self.runInteraction(
|
||||
"get_unresolved_state_tree",
|
||||
self._get_unresolved_state_tree, new_state_pdu
|
||||
)
|
||||
|
||||
|
@ -562,6 +570,7 @@ class StatePduStore(SQLBaseStore):
|
|||
def update_current_state(self, pdu_id, origin, context, pdu_type,
|
||||
state_key):
|
||||
return self.runInteraction(
|
||||
"update_current_state",
|
||||
self._update_current_state,
|
||||
pdu_id, origin, context, pdu_type, state_key
|
||||
)
|
||||
|
@ -601,6 +610,7 @@ class StatePduStore(SQLBaseStore):
|
|||
"""
|
||||
|
||||
return self.runInteraction(
|
||||
"get_current_state_pdu",
|
||||
self._get_current_state_pdu, context, pdu_type, state_key
|
||||
)
|
||||
|
||||
|
@ -660,6 +670,7 @@ class StatePduStore(SQLBaseStore):
|
|||
bool: True if the new_pdu clobbered the current state, False if not
|
||||
"""
|
||||
return self.runInteraction(
|
||||
"handle_new_state",
|
||||
self._handle_new_state, new_pdu
|
||||
)
|
||||
|
||||
|
|
|
@ -62,8 +62,10 @@ class RegistrationStore(SQLBaseStore):
|
|||
Raises:
|
||||
StoreError if the user_id could not be registered.
|
||||
"""
|
||||
yield self.runInteraction(self._register, user_id, token,
|
||||
password_hash)
|
||||
yield self.runInteraction(
|
||||
"register",
|
||||
self._register, user_id, token, password_hash
|
||||
)
|
||||
|
||||
def _register(self, txn, user_id, token, password_hash):
|
||||
now = int(self.clock.time())
|
||||
|
@ -100,6 +102,7 @@ class RegistrationStore(SQLBaseStore):
|
|||
StoreError if no user was found.
|
||||
"""
|
||||
return self.runInteraction(
|
||||
"get_user_by_token",
|
||||
self._query_for_auth,
|
||||
token
|
||||
)
|
||||
|
|
|
@ -150,6 +150,7 @@ class RoomStore(SQLBaseStore):
|
|||
|
||||
def get_power_level(self, room_id, user_id):
|
||||
return self.runInteraction(
|
||||
"get_power_level",
|
||||
self._get_power_level,
|
||||
room_id, user_id,
|
||||
)
|
||||
|
@ -183,6 +184,7 @@ class RoomStore(SQLBaseStore):
|
|||
|
||||
def get_ops_levels(self, room_id):
|
||||
return self.runInteraction(
|
||||
"get_ops_levels",
|
||||
self._get_ops_levels,
|
||||
room_id,
|
||||
)
|
||||
|
|
|
@ -59,6 +59,7 @@ class StateStore(SQLBaseStore):
|
|||
|
||||
def store_state_groups(self, event):
|
||||
return self.runInteraction(
|
||||
"store_state_groups",
|
||||
self._store_state_groups_txn, event
|
||||
)
|
||||
|
||||
|
|
|
@ -309,7 +309,10 @@ class StreamStore(SQLBaseStore):
|
|||
defer.returnValue(ret)
|
||||
|
||||
def get_room_events_max_id(self):
|
||||
return self.runInteraction(self._get_room_events_max_id_txn)
|
||||
return self.runInteraction(
|
||||
"get_room_events_max_id",
|
||||
self._get_room_events_max_id_txn
|
||||
)
|
||||
|
||||
def _get_room_events_max_id_txn(self, txn):
|
||||
txn.execute(
|
||||
|
|
|
@ -42,6 +42,7 @@ class TransactionStore(SQLBaseStore):
|
|||
"""
|
||||
|
||||
return self.runInteraction(
|
||||
"get_received_txn_response",
|
||||
self._get_received_txn_response, transaction_id, origin
|
||||
)
|
||||
|
||||
|
@ -73,6 +74,7 @@ class TransactionStore(SQLBaseStore):
|
|||
"""
|
||||
|
||||
return self.runInteraction(
|
||||
"set_received_txn_response",
|
||||
self._set_received_txn_response,
|
||||
transaction_id, origin, code, response_dict
|
||||
)
|
||||
|
@ -106,6 +108,7 @@ class TransactionStore(SQLBaseStore):
|
|||
"""
|
||||
|
||||
return self.runInteraction(
|
||||
"prep_send_transaction",
|
||||
self._prep_send_transaction,
|
||||
transaction_id, destination, origin_server_ts, pdu_list
|
||||
)
|
||||
|
@ -161,6 +164,7 @@ class TransactionStore(SQLBaseStore):
|
|||
response_json (str)
|
||||
"""
|
||||
return self.runInteraction(
|
||||
"delivered_txn",
|
||||
self._delivered_txn,
|
||||
transaction_id, destination, code, response_dict
|
||||
)
|
||||
|
@ -186,6 +190,7 @@ class TransactionStore(SQLBaseStore):
|
|||
list: A list of `ReceivedTransactionsTable.EntryType`
|
||||
"""
|
||||
return self.runInteraction(
|
||||
"get_transactions_after",
|
||||
self._get_transactions_after, transaction_id, destination
|
||||
)
|
||||
|
||||
|
@ -216,6 +221,7 @@ class TransactionStore(SQLBaseStore):
|
|||
list: A list of PduTuple
|
||||
"""
|
||||
return self.runInteraction(
|
||||
"get_pdus_after_transaction",
|
||||
self._get_pdus_after_transaction,
|
||||
transaction_id, destination
|
||||
)
|
||||
|
|
Loading…
Reference in a new issue