mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-22 01:25:44 +03:00
Partial revert #17044 to fix sql performance regression
For details see #17129. Compared to a full revert, we keep the class method instroduced in #17129, as its used elsewhere by now Fixes: #17129 Signed-off-by: Christoph Settgast <csett86_git@quicksands.de>
This commit is contained in:
parent
7c9ac01eb5
commit
a2c74d05bc
1 changed files with 96 additions and 3 deletions
|
@ -280,16 +280,64 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
|
|||
|
||||
# Now we look up all links for the chains we have, adding chains that
|
||||
# are reachable from any event.
|
||||
#
|
||||
# This query is structured to first get all chain IDs reachable, and
|
||||
# then pull out all links from those chains. This does pull out more
|
||||
# rows than is strictly necessary, however there isn't a way of
|
||||
# structuring the recursive part of query to pull out the links without
|
||||
# also returning large quantities of redundant data (which can make it a
|
||||
# lot slower).
|
||||
sql = """
|
||||
WITH RECURSIVE links(chain_id) AS (
|
||||
SELECT
|
||||
DISTINCT origin_chain_id
|
||||
FROM event_auth_chain_links WHERE %s
|
||||
UNION
|
||||
SELECT
|
||||
target_chain_id
|
||||
FROM event_auth_chain_links
|
||||
INNER JOIN links ON (chain_id = origin_chain_id)
|
||||
)
|
||||
SELECT
|
||||
origin_chain_id, origin_sequence_number,
|
||||
target_chain_id, target_sequence_number
|
||||
FROM links
|
||||
INNER JOIN event_auth_chain_links ON (chain_id = origin_chain_id)
|
||||
"""
|
||||
|
||||
# A map from chain ID to max sequence number *reachable* from any event ID.
|
||||
chains: Dict[int, int] = {}
|
||||
for links in self._get_chain_links(txn, set(event_chains.keys())):
|
||||
|
||||
# Add all linked chains reachable from initial set of chains.
|
||||
chains_to_fetch = set(event_chains.keys())
|
||||
while chains_to_fetch:
|
||||
batch2 = tuple(itertools.islice(chains_to_fetch, 1000))
|
||||
chains_to_fetch.difference_update(batch2)
|
||||
clause, args = make_in_list_sql_clause(
|
||||
txn.database_engine, "origin_chain_id", batch2
|
||||
)
|
||||
txn.execute(sql % (clause,), args)
|
||||
|
||||
links: Dict[int, List[Tuple[int, int, int]]] = {}
|
||||
|
||||
for (
|
||||
origin_chain_id,
|
||||
origin_sequence_number,
|
||||
target_chain_id,
|
||||
target_sequence_number,
|
||||
) in txn:
|
||||
links.setdefault(origin_chain_id, []).append(
|
||||
(origin_sequence_number, target_chain_id, target_sequence_number)
|
||||
)
|
||||
|
||||
for chain_id in links:
|
||||
if chain_id not in event_chains:
|
||||
continue
|
||||
|
||||
_materialize(chain_id, event_chains[chain_id], links, chains)
|
||||
|
||||
chains_to_fetch.difference_update(chains)
|
||||
|
||||
# Add the initial set of chains, excluding the sequence corresponding to
|
||||
# initial event.
|
||||
for chain_id, seq_no in event_chains.items():
|
||||
|
@ -579,9 +627,53 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
|
|||
|
||||
# Now we look up all links for the chains we have, adding chains that
|
||||
# are reachable from any event.
|
||||
#
|
||||
# This query is structured to first get all chain IDs reachable, and
|
||||
# then pull out all links from those chains. This does pull out more
|
||||
# rows than is strictly necessary, however there isn't a way of
|
||||
# structuring the recursive part of query to pull out the links without
|
||||
# also returning large quantities of redundant data (which can make it a
|
||||
# lot slower).
|
||||
sql = """
|
||||
WITH RECURSIVE links(chain_id) AS (
|
||||
SELECT
|
||||
DISTINCT origin_chain_id
|
||||
FROM event_auth_chain_links WHERE %s
|
||||
UNION
|
||||
SELECT
|
||||
target_chain_id
|
||||
FROM event_auth_chain_links
|
||||
INNER JOIN links ON (chain_id = origin_chain_id)
|
||||
)
|
||||
SELECT
|
||||
origin_chain_id, origin_sequence_number,
|
||||
target_chain_id, target_sequence_number
|
||||
FROM links
|
||||
INNER JOIN event_auth_chain_links ON (chain_id = origin_chain_id)
|
||||
"""
|
||||
|
||||
# (We need to take a copy of `seen_chains` as we want to mutate it in
|
||||
# the loop)
|
||||
chains_to_fetch = set(seen_chains)
|
||||
while chains_to_fetch:
|
||||
batch2 = tuple(itertools.islice(chains_to_fetch, 1000))
|
||||
clause, args = make_in_list_sql_clause(
|
||||
txn.database_engine, "origin_chain_id", batch2
|
||||
)
|
||||
txn.execute(sql % (clause,), args)
|
||||
|
||||
links: Dict[int, List[Tuple[int, int, int]]] = {}
|
||||
|
||||
for (
|
||||
origin_chain_id,
|
||||
origin_sequence_number,
|
||||
target_chain_id,
|
||||
target_sequence_number,
|
||||
) in txn:
|
||||
links.setdefault(origin_chain_id, []).append(
|
||||
(origin_sequence_number, target_chain_id, target_sequence_number)
|
||||
)
|
||||
|
||||
# (We need to take a copy of `seen_chains` as the function mutates it)
|
||||
for links in self._get_chain_links(txn, set(seen_chains)):
|
||||
for chains in set_to_chain:
|
||||
for chain_id in links:
|
||||
if chain_id not in chains:
|
||||
|
@ -589,6 +681,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
|
|||
|
||||
_materialize(chain_id, chains[chain_id], links, chains)
|
||||
|
||||
chains_to_fetch.difference_update(chains)
|
||||
seen_chains.update(chains)
|
||||
|
||||
# Now for each chain we figure out the maximum sequence number reachable
|
||||
|
|
Loading…
Reference in a new issue