mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-23 01:55:53 +03:00
Replace uses of simple_insert_many with simple_insert_many_values. (#11742)
This should be (slightly) more efficient and it is simpler to have a single method for inserting multiple values.
This commit is contained in:
parent
d70169bf9b
commit
3e0536cd2a
19 changed files with 263 additions and 298 deletions
1
changelog.d/11742.misc
Normal file
1
changelog.d/11742.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Minor efficiency improvements when inserting many values into the database.
|
|
@ -123,34 +123,25 @@ class BackgroundUpdateStartJobRestServlet(RestServlet):
|
||||||
job_name = body["job_name"]
|
job_name = body["job_name"]
|
||||||
|
|
||||||
if job_name == "populate_stats_process_rooms":
|
if job_name == "populate_stats_process_rooms":
|
||||||
jobs = [
|
jobs = [("populate_stats_process_rooms", "{}", "")]
|
||||||
{
|
|
||||||
"update_name": "populate_stats_process_rooms",
|
|
||||||
"progress_json": "{}",
|
|
||||||
},
|
|
||||||
]
|
|
||||||
elif job_name == "regenerate_directory":
|
elif job_name == "regenerate_directory":
|
||||||
jobs = [
|
jobs = [
|
||||||
{
|
("populate_user_directory_createtables", "{}", ""),
|
||||||
"update_name": "populate_user_directory_createtables",
|
(
|
||||||
"progress_json": "{}",
|
"populate_user_directory_process_rooms",
|
||||||
"depends_on": "",
|
"{}",
|
||||||
},
|
"populate_user_directory_createtables",
|
||||||
{
|
),
|
||||||
"update_name": "populate_user_directory_process_rooms",
|
(
|
||||||
"progress_json": "{}",
|
"populate_user_directory_process_users",
|
||||||
"depends_on": "populate_user_directory_createtables",
|
"{}",
|
||||||
},
|
"populate_user_directory_process_rooms",
|
||||||
{
|
),
|
||||||
"update_name": "populate_user_directory_process_users",
|
(
|
||||||
"progress_json": "{}",
|
"populate_user_directory_cleanup",
|
||||||
"depends_on": "populate_user_directory_process_rooms",
|
"{}",
|
||||||
},
|
"populate_user_directory_process_users",
|
||||||
{
|
),
|
||||||
"update_name": "populate_user_directory_cleanup",
|
|
||||||
"progress_json": "{}",
|
|
||||||
"depends_on": "populate_user_directory_process_users",
|
|
||||||
},
|
|
||||||
]
|
]
|
||||||
else:
|
else:
|
||||||
raise SynapseError(HTTPStatus.BAD_REQUEST, "Invalid job_name")
|
raise SynapseError(HTTPStatus.BAD_REQUEST, "Invalid job_name")
|
||||||
|
@ -158,6 +149,7 @@ class BackgroundUpdateStartJobRestServlet(RestServlet):
|
||||||
try:
|
try:
|
||||||
await self._store.db_pool.simple_insert_many(
|
await self._store.db_pool.simple_insert_many(
|
||||||
table="background_updates",
|
table="background_updates",
|
||||||
|
keys=("update_name", "progress_json", "depends_on"),
|
||||||
values=jobs,
|
values=jobs,
|
||||||
desc=f"admin_api_run_{job_name}",
|
desc=f"admin_api_run_{job_name}",
|
||||||
)
|
)
|
||||||
|
|
|
@ -934,56 +934,6 @@ class DatabasePool:
|
||||||
txn.execute(sql, vals)
|
txn.execute(sql, vals)
|
||||||
|
|
||||||
async def simple_insert_many(
|
async def simple_insert_many(
|
||||||
self, table: str, values: List[Dict[str, Any]], desc: str
|
|
||||||
) -> None:
|
|
||||||
"""Executes an INSERT query on the named table.
|
|
||||||
|
|
||||||
The input is given as a list of dicts, with one dict per row.
|
|
||||||
Generally simple_insert_many_values should be preferred for new code.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
table: string giving the table name
|
|
||||||
values: dict of new column names and values for them
|
|
||||||
desc: description of the transaction, for logging and metrics
|
|
||||||
"""
|
|
||||||
await self.runInteraction(desc, self.simple_insert_many_txn, table, values)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def simple_insert_many_txn(
|
|
||||||
txn: LoggingTransaction, table: str, values: List[Dict[str, Any]]
|
|
||||||
) -> None:
|
|
||||||
"""Executes an INSERT query on the named table.
|
|
||||||
|
|
||||||
The input is given as a list of dicts, with one dict per row.
|
|
||||||
Generally simple_insert_many_values_txn should be preferred for new code.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
txn: The transaction to use.
|
|
||||||
table: string giving the table name
|
|
||||||
values: dict of new column names and values for them
|
|
||||||
"""
|
|
||||||
if not values:
|
|
||||||
return
|
|
||||||
|
|
||||||
# This is a *slight* abomination to get a list of tuples of key names
|
|
||||||
# and a list of tuples of value names.
|
|
||||||
#
|
|
||||||
# i.e. [{"a": 1, "b": 2}, {"c": 3, "d": 4}]
|
|
||||||
# => [("a", "b",), ("c", "d",)] and [(1, 2,), (3, 4,)]
|
|
||||||
#
|
|
||||||
# The sort is to ensure that we don't rely on dictionary iteration
|
|
||||||
# order.
|
|
||||||
keys, vals = zip(
|
|
||||||
*(zip(*(sorted(i.items(), key=lambda kv: kv[0]))) for i in values if i)
|
|
||||||
)
|
|
||||||
|
|
||||||
for k in keys:
|
|
||||||
if k != keys[0]:
|
|
||||||
raise RuntimeError("All items must have the same keys")
|
|
||||||
|
|
||||||
return DatabasePool.simple_insert_many_values_txn(txn, table, keys[0], vals)
|
|
||||||
|
|
||||||
async def simple_insert_many_values(
|
|
||||||
self,
|
self,
|
||||||
table: str,
|
table: str,
|
||||||
keys: Collection[str],
|
keys: Collection[str],
|
||||||
|
@ -1002,11 +952,11 @@ class DatabasePool:
|
||||||
desc: description of the transaction, for logging and metrics
|
desc: description of the transaction, for logging and metrics
|
||||||
"""
|
"""
|
||||||
await self.runInteraction(
|
await self.runInteraction(
|
||||||
desc, self.simple_insert_many_values_txn, table, keys, values
|
desc, self.simple_insert_many_txn, table, keys, values
|
||||||
)
|
)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def simple_insert_many_values_txn(
|
def simple_insert_many_txn(
|
||||||
txn: LoggingTransaction,
|
txn: LoggingTransaction,
|
||||||
table: str,
|
table: str,
|
||||||
keys: Collection[str],
|
keys: Collection[str],
|
||||||
|
|
|
@ -536,9 +536,9 @@ class AccountDataWorkerStore(CacheInvalidationWorkerStore):
|
||||||
self.db_pool.simple_insert_many_txn(
|
self.db_pool.simple_insert_many_txn(
|
||||||
txn,
|
txn,
|
||||||
table="ignored_users",
|
table="ignored_users",
|
||||||
|
keys=("ignorer_user_id", "ignored_user_id"),
|
||||||
values=[
|
values=[
|
||||||
{"ignorer_user_id": user_id, "ignored_user_id": u}
|
(user_id, u) for u in currently_ignored_users - previously_ignored_users
|
||||||
for u in currently_ignored_users - previously_ignored_users
|
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -432,14 +432,21 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||||
self.db_pool.simple_insert_many_txn(
|
self.db_pool.simple_insert_many_txn(
|
||||||
txn,
|
txn,
|
||||||
table="device_federation_outbox",
|
table="device_federation_outbox",
|
||||||
|
keys=(
|
||||||
|
"destination",
|
||||||
|
"stream_id",
|
||||||
|
"queued_ts",
|
||||||
|
"messages_json",
|
||||||
|
"instance_name",
|
||||||
|
),
|
||||||
values=[
|
values=[
|
||||||
{
|
(
|
||||||
"destination": destination,
|
destination,
|
||||||
"stream_id": stream_id,
|
stream_id,
|
||||||
"queued_ts": now_ms,
|
now_ms,
|
||||||
"messages_json": json_encoder.encode(edu),
|
json_encoder.encode(edu),
|
||||||
"instance_name": self._instance_name,
|
self._instance_name,
|
||||||
}
|
)
|
||||||
for destination, edu in remote_messages_by_destination.items()
|
for destination, edu in remote_messages_by_destination.items()
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -571,14 +578,9 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||||
self.db_pool.simple_insert_many_txn(
|
self.db_pool.simple_insert_many_txn(
|
||||||
txn,
|
txn,
|
||||||
table="device_inbox",
|
table="device_inbox",
|
||||||
|
keys=("user_id", "device_id", "stream_id", "message_json", "instance_name"),
|
||||||
values=[
|
values=[
|
||||||
{
|
(user_id, device_id, stream_id, message_json, self._instance_name)
|
||||||
"user_id": user_id,
|
|
||||||
"device_id": device_id,
|
|
||||||
"stream_id": stream_id,
|
|
||||||
"message_json": message_json,
|
|
||||||
"instance_name": self._instance_name,
|
|
||||||
}
|
|
||||||
for user_id, messages_by_device in local_by_user_then_device.items()
|
for user_id, messages_by_device in local_by_user_then_device.items()
|
||||||
for device_id, message_json in messages_by_device.items()
|
for device_id, message_json in messages_by_device.items()
|
||||||
],
|
],
|
||||||
|
|
|
@ -1386,12 +1386,9 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
|
||||||
self.db_pool.simple_insert_many_txn(
|
self.db_pool.simple_insert_many_txn(
|
||||||
txn,
|
txn,
|
||||||
table="device_lists_remote_cache",
|
table="device_lists_remote_cache",
|
||||||
|
keys=("user_id", "device_id", "content"),
|
||||||
values=[
|
values=[
|
||||||
{
|
(user_id, content["device_id"], json_encoder.encode(content))
|
||||||
"user_id": user_id,
|
|
||||||
"device_id": content["device_id"],
|
|
||||||
"content": json_encoder.encode(content),
|
|
||||||
}
|
|
||||||
for content in devices
|
for content in devices
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -1479,8 +1476,9 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
|
||||||
self.db_pool.simple_insert_many_txn(
|
self.db_pool.simple_insert_many_txn(
|
||||||
txn,
|
txn,
|
||||||
table="device_lists_stream",
|
table="device_lists_stream",
|
||||||
|
keys=("stream_id", "user_id", "device_id"),
|
||||||
values=[
|
values=[
|
||||||
{"stream_id": stream_id, "user_id": user_id, "device_id": device_id}
|
(stream_id, user_id, device_id)
|
||||||
for stream_id, device_id in zip(stream_ids, device_ids)
|
for stream_id, device_id in zip(stream_ids, device_ids)
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -1507,18 +1505,27 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
|
||||||
self.db_pool.simple_insert_many_txn(
|
self.db_pool.simple_insert_many_txn(
|
||||||
txn,
|
txn,
|
||||||
table="device_lists_outbound_pokes",
|
table="device_lists_outbound_pokes",
|
||||||
|
keys=(
|
||||||
|
"destination",
|
||||||
|
"stream_id",
|
||||||
|
"user_id",
|
||||||
|
"device_id",
|
||||||
|
"sent",
|
||||||
|
"ts",
|
||||||
|
"opentracing_context",
|
||||||
|
),
|
||||||
values=[
|
values=[
|
||||||
{
|
(
|
||||||
"destination": destination,
|
destination,
|
||||||
"stream_id": next(next_stream_id),
|
next(next_stream_id),
|
||||||
"user_id": user_id,
|
user_id,
|
||||||
"device_id": device_id,
|
device_id,
|
||||||
"sent": False,
|
False,
|
||||||
"ts": now,
|
now,
|
||||||
"opentracing_context": json_encoder.encode(context)
|
json_encoder.encode(context)
|
||||||
if whitelisted_homeserver(destination)
|
if whitelisted_homeserver(destination)
|
||||||
else "{}",
|
else "{}",
|
||||||
}
|
)
|
||||||
for destination in hosts
|
for destination in hosts
|
||||||
for device_id in device_ids
|
for device_id in device_ids
|
||||||
],
|
],
|
||||||
|
|
|
@ -112,10 +112,8 @@ class DirectoryWorkerStore(CacheInvalidationWorkerStore):
|
||||||
self.db_pool.simple_insert_many_txn(
|
self.db_pool.simple_insert_many_txn(
|
||||||
txn,
|
txn,
|
||||||
table="room_alias_servers",
|
table="room_alias_servers",
|
||||||
values=[
|
keys=("room_alias", "server"),
|
||||||
{"room_alias": room_alias.to_string(), "server": server}
|
values=[(room_alias.to_string(), server) for server in servers],
|
||||||
for server in servers
|
|
||||||
],
|
|
||||||
)
|
)
|
||||||
|
|
||||||
self._invalidate_cache_and_stream(
|
self._invalidate_cache_and_stream(
|
||||||
|
|
|
@ -110,16 +110,16 @@ class EndToEndRoomKeyStore(SQLBaseStore):
|
||||||
values = []
|
values = []
|
||||||
for (room_id, session_id, room_key) in room_keys:
|
for (room_id, session_id, room_key) in room_keys:
|
||||||
values.append(
|
values.append(
|
||||||
{
|
(
|
||||||
"user_id": user_id,
|
user_id,
|
||||||
"version": version_int,
|
version_int,
|
||||||
"room_id": room_id,
|
room_id,
|
||||||
"session_id": session_id,
|
session_id,
|
||||||
"first_message_index": room_key["first_message_index"],
|
room_key["first_message_index"],
|
||||||
"forwarded_count": room_key["forwarded_count"],
|
room_key["forwarded_count"],
|
||||||
"is_verified": room_key["is_verified"],
|
room_key["is_verified"],
|
||||||
"session_data": json_encoder.encode(room_key["session_data"]),
|
json_encoder.encode(room_key["session_data"]),
|
||||||
}
|
)
|
||||||
)
|
)
|
||||||
log_kv(
|
log_kv(
|
||||||
{
|
{
|
||||||
|
@ -131,7 +131,19 @@ class EndToEndRoomKeyStore(SQLBaseStore):
|
||||||
)
|
)
|
||||||
|
|
||||||
await self.db_pool.simple_insert_many(
|
await self.db_pool.simple_insert_many(
|
||||||
table="e2e_room_keys", values=values, desc="add_e2e_room_keys"
|
table="e2e_room_keys",
|
||||||
|
keys=(
|
||||||
|
"user_id",
|
||||||
|
"version",
|
||||||
|
"room_id",
|
||||||
|
"session_id",
|
||||||
|
"first_message_index",
|
||||||
|
"forwarded_count",
|
||||||
|
"is_verified",
|
||||||
|
"session_data",
|
||||||
|
),
|
||||||
|
values=values,
|
||||||
|
desc="add_e2e_room_keys",
|
||||||
)
|
)
|
||||||
|
|
||||||
@trace
|
@trace
|
||||||
|
|
|
@ -387,15 +387,16 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
|
||||||
self.db_pool.simple_insert_many_txn(
|
self.db_pool.simple_insert_many_txn(
|
||||||
txn,
|
txn,
|
||||||
table="e2e_one_time_keys_json",
|
table="e2e_one_time_keys_json",
|
||||||
|
keys=(
|
||||||
|
"user_id",
|
||||||
|
"device_id",
|
||||||
|
"algorithm",
|
||||||
|
"key_id",
|
||||||
|
"ts_added_ms",
|
||||||
|
"key_json",
|
||||||
|
),
|
||||||
values=[
|
values=[
|
||||||
{
|
(user_id, device_id, algorithm, key_id, time_now, json_bytes)
|
||||||
"user_id": user_id,
|
|
||||||
"device_id": device_id,
|
|
||||||
"algorithm": algorithm,
|
|
||||||
"key_id": key_id,
|
|
||||||
"ts_added_ms": time_now,
|
|
||||||
"key_json": json_bytes,
|
|
||||||
}
|
|
||||||
for algorithm, key_id, json_bytes in new_keys
|
for algorithm, key_id, json_bytes in new_keys
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -1186,15 +1187,22 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
|
||||||
"""
|
"""
|
||||||
await self.db_pool.simple_insert_many(
|
await self.db_pool.simple_insert_many(
|
||||||
"e2e_cross_signing_signatures",
|
"e2e_cross_signing_signatures",
|
||||||
[
|
keys=(
|
||||||
{
|
"user_id",
|
||||||
"user_id": user_id,
|
"key_id",
|
||||||
"key_id": item.signing_key_id,
|
"target_user_id",
|
||||||
"target_user_id": item.target_user_id,
|
"target_device_id",
|
||||||
"target_device_id": item.target_device_id,
|
"signature",
|
||||||
"signature": item.signature,
|
),
|
||||||
}
|
values=[
|
||||||
|
(
|
||||||
|
user_id,
|
||||||
|
item.signing_key_id,
|
||||||
|
item.target_user_id,
|
||||||
|
item.target_device_id,
|
||||||
|
item.signature,
|
||||||
|
)
|
||||||
for item in signatures
|
for item in signatures
|
||||||
],
|
],
|
||||||
"add_e2e_signing_key",
|
desc="add_e2e_signing_key",
|
||||||
)
|
)
|
||||||
|
|
|
@ -875,14 +875,21 @@ class EventPushActionsWorkerStore(SQLBaseStore):
|
||||||
self.db_pool.simple_insert_many_txn(
|
self.db_pool.simple_insert_many_txn(
|
||||||
txn,
|
txn,
|
||||||
table="event_push_summary",
|
table="event_push_summary",
|
||||||
|
keys=(
|
||||||
|
"user_id",
|
||||||
|
"room_id",
|
||||||
|
"notif_count",
|
||||||
|
"unread_count",
|
||||||
|
"stream_ordering",
|
||||||
|
),
|
||||||
values=[
|
values=[
|
||||||
{
|
(
|
||||||
"user_id": user_id,
|
user_id,
|
||||||
"room_id": room_id,
|
room_id,
|
||||||
"notif_count": summary.notif_count,
|
summary.notif_count,
|
||||||
"unread_count": summary.unread_count,
|
summary.unread_count,
|
||||||
"stream_ordering": summary.stream_ordering,
|
summary.stream_ordering,
|
||||||
}
|
)
|
||||||
for ((user_id, room_id), summary) in summaries.items()
|
for ((user_id, room_id), summary) in summaries.items()
|
||||||
if summary.old_user_id is None
|
if summary.old_user_id is None
|
||||||
],
|
],
|
||||||
|
|
|
@ -442,12 +442,9 @@ class PersistEventsStore:
|
||||||
self.db_pool.simple_insert_many_txn(
|
self.db_pool.simple_insert_many_txn(
|
||||||
txn,
|
txn,
|
||||||
table="event_auth",
|
table="event_auth",
|
||||||
|
keys=("event_id", "room_id", "auth_id"),
|
||||||
values=[
|
values=[
|
||||||
{
|
(event.event_id, event.room_id, auth_id)
|
||||||
"event_id": event.event_id,
|
|
||||||
"room_id": event.room_id,
|
|
||||||
"auth_id": auth_id,
|
|
||||||
}
|
|
||||||
for event in events
|
for event in events
|
||||||
for auth_id in event.auth_event_ids()
|
for auth_id in event.auth_event_ids()
|
||||||
if event.is_state()
|
if event.is_state()
|
||||||
|
@ -675,8 +672,9 @@ class PersistEventsStore:
|
||||||
db_pool.simple_insert_many_txn(
|
db_pool.simple_insert_many_txn(
|
||||||
txn,
|
txn,
|
||||||
table="event_auth_chains",
|
table="event_auth_chains",
|
||||||
|
keys=("event_id", "chain_id", "sequence_number"),
|
||||||
values=[
|
values=[
|
||||||
{"event_id": event_id, "chain_id": c_id, "sequence_number": seq}
|
(event_id, c_id, seq)
|
||||||
for event_id, (c_id, seq) in new_chain_tuples.items()
|
for event_id, (c_id, seq) in new_chain_tuples.items()
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -782,13 +780,14 @@ class PersistEventsStore:
|
||||||
db_pool.simple_insert_many_txn(
|
db_pool.simple_insert_many_txn(
|
||||||
txn,
|
txn,
|
||||||
table="event_auth_chain_links",
|
table="event_auth_chain_links",
|
||||||
|
keys=(
|
||||||
|
"origin_chain_id",
|
||||||
|
"origin_sequence_number",
|
||||||
|
"target_chain_id",
|
||||||
|
"target_sequence_number",
|
||||||
|
),
|
||||||
values=[
|
values=[
|
||||||
{
|
(source_id, source_seq, target_id, target_seq)
|
||||||
"origin_chain_id": source_id,
|
|
||||||
"origin_sequence_number": source_seq,
|
|
||||||
"target_chain_id": target_id,
|
|
||||||
"target_sequence_number": target_seq,
|
|
||||||
}
|
|
||||||
for (
|
for (
|
||||||
source_id,
|
source_id,
|
||||||
source_seq,
|
source_seq,
|
||||||
|
@ -943,20 +942,28 @@ class PersistEventsStore:
|
||||||
txn_id = getattr(event.internal_metadata, "txn_id", None)
|
txn_id = getattr(event.internal_metadata, "txn_id", None)
|
||||||
if token_id and txn_id:
|
if token_id and txn_id:
|
||||||
to_insert.append(
|
to_insert.append(
|
||||||
{
|
(
|
||||||
"event_id": event.event_id,
|
event.event_id,
|
||||||
"room_id": event.room_id,
|
event.room_id,
|
||||||
"user_id": event.sender,
|
event.sender,
|
||||||
"token_id": token_id,
|
token_id,
|
||||||
"txn_id": txn_id,
|
txn_id,
|
||||||
"inserted_ts": self._clock.time_msec(),
|
self._clock.time_msec(),
|
||||||
}
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
if to_insert:
|
if to_insert:
|
||||||
self.db_pool.simple_insert_many_txn(
|
self.db_pool.simple_insert_many_txn(
|
||||||
txn,
|
txn,
|
||||||
table="event_txn_id",
|
table="event_txn_id",
|
||||||
|
keys=(
|
||||||
|
"event_id",
|
||||||
|
"room_id",
|
||||||
|
"user_id",
|
||||||
|
"token_id",
|
||||||
|
"txn_id",
|
||||||
|
"inserted_ts",
|
||||||
|
),
|
||||||
values=to_insert,
|
values=to_insert,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -1161,8 +1168,9 @@ class PersistEventsStore:
|
||||||
self.db_pool.simple_insert_many_txn(
|
self.db_pool.simple_insert_many_txn(
|
||||||
txn,
|
txn,
|
||||||
table="event_forward_extremities",
|
table="event_forward_extremities",
|
||||||
|
keys=("event_id", "room_id"),
|
||||||
values=[
|
values=[
|
||||||
{"event_id": ev_id, "room_id": room_id}
|
(ev_id, room_id)
|
||||||
for room_id, new_extrem in new_forward_extremities.items()
|
for room_id, new_extrem in new_forward_extremities.items()
|
||||||
for ev_id in new_extrem
|
for ev_id in new_extrem
|
||||||
],
|
],
|
||||||
|
@ -1174,12 +1182,9 @@ class PersistEventsStore:
|
||||||
self.db_pool.simple_insert_many_txn(
|
self.db_pool.simple_insert_many_txn(
|
||||||
txn,
|
txn,
|
||||||
table="stream_ordering_to_exterm",
|
table="stream_ordering_to_exterm",
|
||||||
|
keys=("room_id", "event_id", "stream_ordering"),
|
||||||
values=[
|
values=[
|
||||||
{
|
(room_id, event_id, max_stream_order)
|
||||||
"room_id": room_id,
|
|
||||||
"event_id": event_id,
|
|
||||||
"stream_ordering": max_stream_order,
|
|
||||||
}
|
|
||||||
for room_id, new_extrem in new_forward_extremities.items()
|
for room_id, new_extrem in new_forward_extremities.items()
|
||||||
for event_id in new_extrem
|
for event_id in new_extrem
|
||||||
],
|
],
|
||||||
|
@ -1342,7 +1347,7 @@ class PersistEventsStore:
|
||||||
d.pop("redacted_because", None)
|
d.pop("redacted_because", None)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
self.db_pool.simple_insert_many_values_txn(
|
self.db_pool.simple_insert_many_txn(
|
||||||
txn,
|
txn,
|
||||||
table="event_json",
|
table="event_json",
|
||||||
keys=("event_id", "room_id", "internal_metadata", "json", "format_version"),
|
keys=("event_id", "room_id", "internal_metadata", "json", "format_version"),
|
||||||
|
@ -1358,7 +1363,7 @@ class PersistEventsStore:
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
self.db_pool.simple_insert_many_values_txn(
|
self.db_pool.simple_insert_many_txn(
|
||||||
txn,
|
txn,
|
||||||
table="events",
|
table="events",
|
||||||
keys=(
|
keys=(
|
||||||
|
@ -1412,7 +1417,7 @@ class PersistEventsStore:
|
||||||
)
|
)
|
||||||
txn.execute(sql + clause, [False] + args)
|
txn.execute(sql + clause, [False] + args)
|
||||||
|
|
||||||
self.db_pool.simple_insert_many_values_txn(
|
self.db_pool.simple_insert_many_txn(
|
||||||
txn,
|
txn,
|
||||||
table="state_events",
|
table="state_events",
|
||||||
keys=("event_id", "room_id", "type", "state_key"),
|
keys=("event_id", "room_id", "type", "state_key"),
|
||||||
|
@ -1622,14 +1627,9 @@ class PersistEventsStore:
|
||||||
return self.db_pool.simple_insert_many_txn(
|
return self.db_pool.simple_insert_many_txn(
|
||||||
txn=txn,
|
txn=txn,
|
||||||
table="event_labels",
|
table="event_labels",
|
||||||
|
keys=("event_id", "label", "room_id", "topological_ordering"),
|
||||||
values=[
|
values=[
|
||||||
{
|
(event_id, label, room_id, topological_ordering) for label in labels
|
||||||
"event_id": event_id,
|
|
||||||
"label": label,
|
|
||||||
"room_id": room_id,
|
|
||||||
"topological_ordering": topological_ordering,
|
|
||||||
}
|
|
||||||
for label in labels
|
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -1657,16 +1657,13 @@ class PersistEventsStore:
|
||||||
vals = []
|
vals = []
|
||||||
for event in events:
|
for event in events:
|
||||||
ref_alg, ref_hash_bytes = compute_event_reference_hash(event)
|
ref_alg, ref_hash_bytes = compute_event_reference_hash(event)
|
||||||
vals.append(
|
vals.append((event.event_id, ref_alg, memoryview(ref_hash_bytes)))
|
||||||
{
|
|
||||||
"event_id": event.event_id,
|
|
||||||
"algorithm": ref_alg,
|
|
||||||
"hash": memoryview(ref_hash_bytes),
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
self.db_pool.simple_insert_many_txn(
|
self.db_pool.simple_insert_many_txn(
|
||||||
txn, table="event_reference_hashes", values=vals
|
txn,
|
||||||
|
table="event_reference_hashes",
|
||||||
|
keys=("event_id", "algorithm", "hash"),
|
||||||
|
values=vals,
|
||||||
)
|
)
|
||||||
|
|
||||||
def _store_room_members_txn(
|
def _store_room_members_txn(
|
||||||
|
@ -1689,18 +1686,25 @@ class PersistEventsStore:
|
||||||
self.db_pool.simple_insert_many_txn(
|
self.db_pool.simple_insert_many_txn(
|
||||||
txn,
|
txn,
|
||||||
table="room_memberships",
|
table="room_memberships",
|
||||||
|
keys=(
|
||||||
|
"event_id",
|
||||||
|
"user_id",
|
||||||
|
"sender",
|
||||||
|
"room_id",
|
||||||
|
"membership",
|
||||||
|
"display_name",
|
||||||
|
"avatar_url",
|
||||||
|
),
|
||||||
values=[
|
values=[
|
||||||
{
|
(
|
||||||
"event_id": event.event_id,
|
event.event_id,
|
||||||
"user_id": event.state_key,
|
event.state_key,
|
||||||
"sender": event.user_id,
|
event.user_id,
|
||||||
"room_id": event.room_id,
|
event.room_id,
|
||||||
"membership": event.membership,
|
event.membership,
|
||||||
"display_name": non_null_str_or_none(
|
non_null_str_or_none(event.content.get("displayname")),
|
||||||
event.content.get("displayname")
|
non_null_str_or_none(event.content.get("avatar_url")),
|
||||||
),
|
)
|
||||||
"avatar_url": non_null_str_or_none(event.content.get("avatar_url")),
|
|
||||||
}
|
|
||||||
for event in events
|
for event in events
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -2163,13 +2167,9 @@ class PersistEventsStore:
|
||||||
self.db_pool.simple_insert_many_txn(
|
self.db_pool.simple_insert_many_txn(
|
||||||
txn,
|
txn,
|
||||||
table="event_edges",
|
table="event_edges",
|
||||||
|
keys=("event_id", "prev_event_id", "room_id", "is_state"),
|
||||||
values=[
|
values=[
|
||||||
{
|
(ev.event_id, e_id, ev.room_id, False)
|
||||||
"event_id": ev.event_id,
|
|
||||||
"prev_event_id": e_id,
|
|
||||||
"room_id": ev.room_id,
|
|
||||||
"is_state": False,
|
|
||||||
}
|
|
||||||
for ev in events
|
for ev in events
|
||||||
for e_id in ev.prev_event_ids()
|
for e_id in ev.prev_event_ids()
|
||||||
],
|
],
|
||||||
|
|
|
@ -684,13 +684,14 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
|
||||||
self.db_pool.simple_insert_many_txn(
|
self.db_pool.simple_insert_many_txn(
|
||||||
txn=txn,
|
txn=txn,
|
||||||
table="event_labels",
|
table="event_labels",
|
||||||
|
keys=("event_id", "label", "room_id", "topological_ordering"),
|
||||||
values=[
|
values=[
|
||||||
{
|
(
|
||||||
"event_id": event_id,
|
event_id,
|
||||||
"label": label,
|
label,
|
||||||
"room_id": event_json["room_id"],
|
event_json["room_id"],
|
||||||
"topological_ordering": event_json["depth"],
|
event_json["depth"],
|
||||||
}
|
)
|
||||||
for label in event_json["content"].get(
|
for label in event_json["content"].get(
|
||||||
EventContentFields.LABELS, []
|
EventContentFields.LABELS, []
|
||||||
)
|
)
|
||||||
|
@ -803,29 +804,19 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
|
||||||
|
|
||||||
if not has_state:
|
if not has_state:
|
||||||
state_events.append(
|
state_events.append(
|
||||||
{
|
(event.event_id, event.room_id, event.type, event.state_key)
|
||||||
"event_id": event.event_id,
|
|
||||||
"room_id": event.room_id,
|
|
||||||
"type": event.type,
|
|
||||||
"state_key": event.state_key,
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
|
|
||||||
if not has_event_auth:
|
if not has_event_auth:
|
||||||
# Old, dodgy, events may have duplicate auth events, which we
|
# Old, dodgy, events may have duplicate auth events, which we
|
||||||
# need to deduplicate as we have a unique constraint.
|
# need to deduplicate as we have a unique constraint.
|
||||||
for auth_id in set(event.auth_event_ids()):
|
for auth_id in set(event.auth_event_ids()):
|
||||||
auth_events.append(
|
auth_events.append((event.event_id, event.room_id, auth_id))
|
||||||
{
|
|
||||||
"room_id": event.room_id,
|
|
||||||
"event_id": event.event_id,
|
|
||||||
"auth_id": auth_id,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
if state_events:
|
if state_events:
|
||||||
await self.db_pool.simple_insert_many(
|
await self.db_pool.simple_insert_many(
|
||||||
table="state_events",
|
table="state_events",
|
||||||
|
keys=("event_id", "room_id", "type", "state_key"),
|
||||||
values=state_events,
|
values=state_events,
|
||||||
desc="_rejected_events_metadata_state_events",
|
desc="_rejected_events_metadata_state_events",
|
||||||
)
|
)
|
||||||
|
@ -833,6 +824,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
|
||||||
if auth_events:
|
if auth_events:
|
||||||
await self.db_pool.simple_insert_many(
|
await self.db_pool.simple_insert_many(
|
||||||
table="event_auth",
|
table="event_auth",
|
||||||
|
keys=("event_id", "room_id", "auth_id"),
|
||||||
values=auth_events,
|
values=auth_events,
|
||||||
desc="_rejected_events_metadata_event_auth",
|
desc="_rejected_events_metadata_event_auth",
|
||||||
)
|
)
|
||||||
|
|
|
@ -129,18 +129,29 @@ class PresenceStore(PresenceBackgroundUpdateStore):
|
||||||
self.db_pool.simple_insert_many_txn(
|
self.db_pool.simple_insert_many_txn(
|
||||||
txn,
|
txn,
|
||||||
table="presence_stream",
|
table="presence_stream",
|
||||||
|
keys=(
|
||||||
|
"stream_id",
|
||||||
|
"user_id",
|
||||||
|
"state",
|
||||||
|
"last_active_ts",
|
||||||
|
"last_federation_update_ts",
|
||||||
|
"last_user_sync_ts",
|
||||||
|
"status_msg",
|
||||||
|
"currently_active",
|
||||||
|
"instance_name",
|
||||||
|
),
|
||||||
values=[
|
values=[
|
||||||
{
|
(
|
||||||
"stream_id": stream_id,
|
stream_id,
|
||||||
"user_id": state.user_id,
|
state.user_id,
|
||||||
"state": state.state,
|
state.state,
|
||||||
"last_active_ts": state.last_active_ts,
|
state.last_active_ts,
|
||||||
"last_federation_update_ts": state.last_federation_update_ts,
|
state.last_federation_update_ts,
|
||||||
"last_user_sync_ts": state.last_user_sync_ts,
|
state.last_user_sync_ts,
|
||||||
"status_msg": state.status_msg,
|
state.status_msg,
|
||||||
"currently_active": state.currently_active,
|
state.currently_active,
|
||||||
"instance_name": self._instance_name,
|
self._instance_name,
|
||||||
}
|
)
|
||||||
for stream_id, state in zip(stream_orderings, presence_states)
|
for stream_id, state in zip(stream_orderings, presence_states)
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
|
@ -561,13 +561,9 @@ class PusherStore(PusherWorkerStore):
|
||||||
self.db_pool.simple_insert_many_txn(
|
self.db_pool.simple_insert_many_txn(
|
||||||
txn,
|
txn,
|
||||||
table="deleted_pushers",
|
table="deleted_pushers",
|
||||||
|
keys=("stream_id", "app_id", "pushkey", "user_id"),
|
||||||
values=[
|
values=[
|
||||||
{
|
(stream_id, pusher.app_id, pusher.pushkey, user_id)
|
||||||
"stream_id": stream_id,
|
|
||||||
"app_id": pusher.app_id,
|
|
||||||
"pushkey": pusher.pushkey,
|
|
||||||
"user_id": user_id,
|
|
||||||
}
|
|
||||||
for stream_id, pusher in zip(stream_ids, pushers)
|
for stream_id, pusher in zip(stream_ids, pushers)
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
|
@ -105,8 +105,10 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
|
||||||
GROUP BY room_id
|
GROUP BY room_id
|
||||||
"""
|
"""
|
||||||
txn.execute(sql)
|
txn.execute(sql)
|
||||||
rooms = [{"room_id": x[0], "events": x[1]} for x in txn.fetchall()]
|
rooms = list(txn.fetchall())
|
||||||
self.db_pool.simple_insert_many_txn(txn, TEMP_TABLE + "_rooms", rooms)
|
self.db_pool.simple_insert_many_txn(
|
||||||
|
txn, TEMP_TABLE + "_rooms", keys=("room_id", "events"), values=rooms
|
||||||
|
)
|
||||||
del rooms
|
del rooms
|
||||||
|
|
||||||
sql = (
|
sql = (
|
||||||
|
@ -117,9 +119,11 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
|
||||||
txn.execute(sql)
|
txn.execute(sql)
|
||||||
|
|
||||||
txn.execute("SELECT name FROM users")
|
txn.execute("SELECT name FROM users")
|
||||||
users = [{"user_id": x[0]} for x in txn.fetchall()]
|
users = list(txn.fetchall())
|
||||||
|
|
||||||
self.db_pool.simple_insert_many_txn(txn, TEMP_TABLE + "_users", users)
|
self.db_pool.simple_insert_many_txn(
|
||||||
|
txn, TEMP_TABLE + "_users", keys=("user_id",), values=users
|
||||||
|
)
|
||||||
|
|
||||||
new_pos = await self.get_max_stream_id_in_current_state_deltas()
|
new_pos = await self.get_max_stream_id_in_current_state_deltas()
|
||||||
await self.db_pool.runInteraction(
|
await self.db_pool.runInteraction(
|
||||||
|
|
|
@ -327,14 +327,15 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
|
||||||
self.db_pool.simple_insert_many_txn(
|
self.db_pool.simple_insert_many_txn(
|
||||||
txn,
|
txn,
|
||||||
table="state_groups_state",
|
table="state_groups_state",
|
||||||
|
keys=(
|
||||||
|
"state_group",
|
||||||
|
"room_id",
|
||||||
|
"type",
|
||||||
|
"state_key",
|
||||||
|
"event_id",
|
||||||
|
),
|
||||||
values=[
|
values=[
|
||||||
{
|
(state_group, room_id, key[0], key[1], state_id)
|
||||||
"state_group": state_group,
|
|
||||||
"room_id": room_id,
|
|
||||||
"type": key[0],
|
|
||||||
"state_key": key[1],
|
|
||||||
"event_id": state_id,
|
|
||||||
}
|
|
||||||
for key, state_id in delta_state.items()
|
for key, state_id in delta_state.items()
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
|
@ -460,14 +460,9 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
|
||||||
self.db_pool.simple_insert_many_txn(
|
self.db_pool.simple_insert_many_txn(
|
||||||
txn,
|
txn,
|
||||||
table="state_groups_state",
|
table="state_groups_state",
|
||||||
|
keys=("state_group", "room_id", "type", "state_key", "event_id"),
|
||||||
values=[
|
values=[
|
||||||
{
|
(state_group, room_id, key[0], key[1], state_id)
|
||||||
"state_group": state_group,
|
|
||||||
"room_id": room_id,
|
|
||||||
"type": key[0],
|
|
||||||
"state_key": key[1],
|
|
||||||
"event_id": state_id,
|
|
||||||
}
|
|
||||||
for key, state_id in delta_ids.items()
|
for key, state_id in delta_ids.items()
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -475,14 +470,9 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
|
||||||
self.db_pool.simple_insert_many_txn(
|
self.db_pool.simple_insert_many_txn(
|
||||||
txn,
|
txn,
|
||||||
table="state_groups_state",
|
table="state_groups_state",
|
||||||
|
keys=("state_group", "room_id", "type", "state_key", "event_id"),
|
||||||
values=[
|
values=[
|
||||||
{
|
(state_group, room_id, key[0], key[1], state_id)
|
||||||
"state_group": state_group,
|
|
||||||
"room_id": room_id,
|
|
||||||
"type": key[0],
|
|
||||||
"state_key": key[1],
|
|
||||||
"event_id": state_id,
|
|
||||||
}
|
|
||||||
for key, state_id in current_state_ids.items()
|
for key, state_id in current_state_ids.items()
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -589,14 +579,9 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
|
||||||
self.db_pool.simple_insert_many_txn(
|
self.db_pool.simple_insert_many_txn(
|
||||||
txn,
|
txn,
|
||||||
table="state_groups_state",
|
table="state_groups_state",
|
||||||
|
keys=("state_group", "room_id", "type", "state_key", "event_id"),
|
||||||
values=[
|
values=[
|
||||||
{
|
(sg, room_id, key[0], key[1], state_id)
|
||||||
"state_group": sg,
|
|
||||||
"room_id": room_id,
|
|
||||||
"type": key[0],
|
|
||||||
"state_key": key[1],
|
|
||||||
"event_id": state_id,
|
|
||||||
}
|
|
||||||
for key, state_id in curr_state.items()
|
for key, state_id in curr_state.items()
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
|
@ -223,20 +223,13 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
|
||||||
# Create all possible single character tokens
|
# Create all possible single character tokens
|
||||||
tokens = []
|
tokens = []
|
||||||
for c in string.ascii_letters + string.digits + "._~-":
|
for c in string.ascii_letters + string.digits + "._~-":
|
||||||
tokens.append(
|
tokens.append((c, None, 0, 0, None))
|
||||||
{
|
|
||||||
"token": c,
|
|
||||||
"uses_allowed": None,
|
|
||||||
"pending": 0,
|
|
||||||
"completed": 0,
|
|
||||||
"expiry_time": None,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
self.get_success(
|
self.get_success(
|
||||||
self.store.db_pool.simple_insert_many(
|
self.store.db_pool.simple_insert_many(
|
||||||
"registration_tokens",
|
"registration_tokens",
|
||||||
tokens,
|
keys=("token", "uses_allowed", "pending", "completed", "expiry_time"),
|
||||||
"create_all_registration_tokens",
|
values=tokens,
|
||||||
|
desc="create_all_registration_tokens",
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -515,17 +515,23 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
|
||||||
self.get_success(
|
self.get_success(
|
||||||
self.store.db_pool.simple_insert_many(
|
self.store.db_pool.simple_insert_many(
|
||||||
table="federation_inbound_events_staging",
|
table="federation_inbound_events_staging",
|
||||||
|
keys=(
|
||||||
|
"origin",
|
||||||
|
"room_id",
|
||||||
|
"received_ts",
|
||||||
|
"event_id",
|
||||||
|
"event_json",
|
||||||
|
"internal_metadata",
|
||||||
|
),
|
||||||
values=[
|
values=[
|
||||||
{
|
(
|
||||||
"origin": "some_origin",
|
"some_origin",
|
||||||
"room_id": room_id,
|
room_id,
|
||||||
"received_ts": 0,
|
0,
|
||||||
"event_id": f"$fake_event_id_{i + 1}",
|
f"$fake_event_id_{i + 1}",
|
||||||
"event_json": json_encoder.encode(
|
json_encoder.encode({"prev_events": [f"$fake_event_id_{i}"]}),
|
||||||
{"prev_events": [f"$fake_event_id_{i}"]}
|
"{}",
|
||||||
),
|
)
|
||||||
"internal_metadata": "{}",
|
|
||||||
}
|
|
||||||
for i in range(500)
|
for i in range(500)
|
||||||
],
|
],
|
||||||
desc="test_prune_inbound_federation_queue",
|
desc="test_prune_inbound_federation_queue",
|
||||||
|
|
Loading…
Reference in a new issue