Merge pull request #137 from matrix-org/erikj/executemany

executemany support
This commit is contained in:
Mark Haines 2015-05-05 18:30:35 +01:00
commit ecb26beda5
7 changed files with 100 additions and 55 deletions

View file

@ -99,5 +99,5 @@ class TransactionActions(object):
transaction.transaction_id,
transaction.destination,
response_code,
encode_canonical_json(response_dict)
response_dict,
)

View file

@ -206,18 +206,23 @@ class LoggingTransaction(object):
def __setattr__(self, name, value):
setattr(self.txn, name, value)
def execute(self, sql, *args, **kwargs):
def execute(self, sql, *args):
self._do_execute(self.txn.execute, sql, *args)
def executemany(self, sql, *args):
self._do_execute(self.txn.executemany, sql, *args)
def _do_execute(self, func, sql, *args):
# TODO(paul): Maybe use 'info' and 'debug' for values?
sql_logger.debug("[SQL] {%s} %s", self.name, sql)
sql = self.database_engine.convert_param_style(sql)
if args and args[0]:
if args:
try:
sql_logger.debug(
"[SQL values] {%s} " + ", ".join(("<%r>",) * len(args[0])),
self.name,
*args[0]
"[SQL values] {%s} %r",
self.name, args[0]
)
except:
# Don't let logging failures stop SQL from working
@ -226,8 +231,8 @@ class LoggingTransaction(object):
start = time.time() * 1000
try:
return self.txn.execute(
sql, *args, **kwargs
return func(
sql, *args
)
except Exception as e:
logger.debug("[SQL FAIL] {%s} %s", self.name, e)
@ -484,18 +489,49 @@ class SQLBaseStore(object):
@log_function
def _simple_insert_txn(self, txn, table, values):
keys, vals = zip(*values.items())
sql = "INSERT INTO %s (%s) VALUES(%s)" % (
table,
", ".join(k for k in values),
", ".join("?" for k in values)
", ".join(k for k in keys),
", ".join("?" for _ in keys)
)
logger.debug(
"[SQL] %s Args=%s",
sql, values.values(),
txn.execute(sql, vals)
def _simple_insert_many_txn(self, txn, table, values):
if not values:
return
# This is a *slight* abomination to get a list of tuples of key names
# and a list of tuples of value names.
#
# i.e. [{"a": 1, "b": 2}, {"c": 3, "d": 4}]
# => [("a", "b",), ("c", "d",)] and [(1, 2,), (3, 4,)]
#
# The sort is to ensure that we don't rely on dictionary iteration
# order.
keys, vals = zip(*[
zip(
*(sorted(i.items(), key=lambda kv: kv[0]))
)
for i in values
if i
])
for k in keys:
if k != keys[0]:
raise RuntimeError(
"All items must have the same keys"
)
sql = "INSERT INTO %s (%s) VALUES(%s)" % (
table,
", ".join(k for k in keys[0]),
", ".join("?" for _ in keys[0])
)
txn.execute(sql, values.values())
txn.executemany(sql, vals)
def _simple_upsert(self, table, keyvalues, values,
insertion_values={}, desc="_simple_upsert", lock=True):

View file

@ -262,18 +262,19 @@ class EventFederationStore(SQLBaseStore):
For the given event, update the event edges table and forward and
backward extremities tables.
"""
for e_id, _ in prev_events:
# TODO (erikj): This could be done as a bulk insert
self._simple_insert_txn(
txn,
table="event_edges",
values={
self._simple_insert_many_txn(
txn,
table="event_edges",
values=[
{
"event_id": event_id,
"prev_event_id": e_id,
"room_id": room_id,
"is_state": False,
},
)
}
for e_id, _ in prev_events
],
)
# Update the extremities table if this is not an outlier.
if not outlier:
@ -307,16 +308,17 @@ class EventFederationStore(SQLBaseStore):
# Insert all the prev_events as a backwards thing, they'll get
# deleted in a second if they're incorrect anyway.
for e_id, _ in prev_events:
# TODO (erikj): This could be done as a bulk insert
self._simple_insert_txn(
txn,
table="event_backward_extremities",
values={
self._simple_insert_many_txn(
txn,
table="event_backward_extremities",
values=[
{
"event_id": e_id,
"room_id": room_id,
},
)
}
for e_id, _ in prev_events
],
)
# Also delete from the backwards extremities table all ones that
# reference events that we have already seen

View file

@ -129,7 +129,7 @@ class EventsStore(SQLBaseStore):
"room_id": s.room_id,
"type": s.type,
"state_key": s.state_key,
},
}
)
if event.is_state() and is_new_state:
@ -306,16 +306,18 @@ class EventsStore(SQLBaseStore):
hash_bytes
)
for auth_id, _ in event.auth_events:
self._simple_insert_txn(
txn,
table="event_auth",
values={
self._simple_insert_many_txn(
txn,
table="event_auth",
values=[
{
"event_id": event.event_id,
"room_id": event.room_id,
"auth_id": auth_id,
},
)
}
for auth_id, _ in event.auth_events
],
)
(ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
self._store_event_reference_hash_txn(
@ -340,17 +342,19 @@ class EventsStore(SQLBaseStore):
vals,
)
for e_id, h in event.prev_state:
self._simple_insert_txn(
txn,
table="event_edges",
values={
self._simple_insert_many_txn(
txn,
table="event_edges",
values=[
{
"event_id": event.event_id,
"prev_event_id": e_id,
"room_id": event.room_id,
"is_state": True,
},
)
}
for e_id, h in event.prev_state
],
)
if is_new_state and not context.rejected:
self._simple_upsert_txn(

View file

@ -104,18 +104,20 @@ class StateStore(SQLBaseStore):
},
)
for state in state_events.values():
self._simple_insert_txn(
txn,
table="state_groups_state",
values={
self._simple_insert_many_txn(
txn,
table="state_groups_state",
values=[
{
"state_group": state_group,
"room_id": state.room_id,
"type": state.type,
"state_key": state.state_key,
"event_id": state.event_id,
},
)
}
for state in state_events.values()
],
)
self._simple_insert_txn(
txn,

View file

@ -162,7 +162,8 @@ class TransactionStore(SQLBaseStore):
return self.runInteraction(
"delivered_txn",
self._delivered_txn,
transaction_id, destination, code, response_dict
transaction_id, destination, code,
buffer(encode_canonical_json(response_dict)),
)
def _delivered_txn(self, txn, transaction_id, destination,

View file

@ -67,7 +67,7 @@ class SQLBaseStoreTestCase(unittest.TestCase):
self.mock_txn.execute.assert_called_with(
"INSERT INTO tablename (columname) VALUES(?)",
["Value"]
("Value",)
)
@defer.inlineCallbacks
@ -82,7 +82,7 @@ class SQLBaseStoreTestCase(unittest.TestCase):
self.mock_txn.execute.assert_called_with(
"INSERT INTO tablename (colA, colB, colC) VALUES(?, ?, ?)",
[1, 2, 3]
(1, 2, 3,)
)
@defer.inlineCallbacks