synapse/synapse/util/iterutils.py
Erik Johnston 707d5e4e48
Encode JSON responses on a thread in C, mk2 (#10905)
Currently we use `JsonEncoder.iterencode` to write JSON responses, which ensures that we don't block the main reactor thread when encoding huge objects. The downside to this is that `iterencode` falls back to using a pure Python encoder that is *much* less efficient and can easily burn a lot of CPU for huge responses. To fix this, while still ensuring we don't block the reactor loop, we encode the JSON on a threadpool using the standard `JsonEncoder.encode` functions, which is backed by a C library.

Doing so, however, requires `respond_with_json` to have access to the reactor, which it previously didn't. There are two ways of doing this:

1. threading through the reactor object, which is a bit fiddly as e.g. `DirectServeJsonResource` doesn't currently take a reactor, but is exposed to modules and so is a PITA to change; or
2. expose the reactor in `SynapseRequest`, which requires updating a bunch of servlet types.

I went with the latter as that is just a mechanical change, and I think makes sense as a request already has a reactor associated with it (via its http channel).
2021-09-28 09:37:58 +00:00

110 lines
3.1 KiB
Python

# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import heapq
from itertools import islice
from typing import (
Collection,
Dict,
Generator,
Iterable,
Iterator,
Mapping,
Set,
Sized,
Tuple,
TypeVar,
)
from typing_extensions import Protocol
T = TypeVar("T")
S = TypeVar("S", bound="_SelfSlice")
class _SelfSlice(Sized, Protocol):
"""A helper protocol that matches types where taking a slice results in the
same type being returned.
This is more specific than `Sequence`, which allows another `Sequence` to be
returned.
"""
def __getitem__(self: S, i: slice) -> S:
...
def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
"""batch an iterable up into tuples with a maximum size
Args:
iterable: the iterable to slice
size: the maximum batch size
Returns:
an iterator over the chunks
"""
# make sure we can deal with iterables like lists too
sourceiter = iter(iterable)
# call islice until it returns an empty tuple
return iter(lambda: tuple(islice(sourceiter, size)), ())
def chunk_seq(iseq: S, maxlen: int) -> Iterator[S]:
"""Split the given sequence into chunks of the given size
The last chunk may be shorter than the given size.
If the input is empty, no chunks are returned.
"""
return (iseq[i : i + maxlen] for i in range(0, len(iseq), maxlen))
def sorted_topologically(
nodes: Iterable[T],
graph: Mapping[T, Collection[T]],
) -> Generator[T, None, None]:
"""Given a set of nodes and a graph, yield the nodes in toplogical order.
For example `sorted_topologically([1, 2], {1: [2]})` will yield `2, 1`.
"""
# This is implemented by Kahn's algorithm.
degree_map = {node: 0 for node in nodes}
reverse_graph: Dict[T, Set[T]] = {}
for node, edges in graph.items():
if node not in degree_map:
continue
for edge in set(edges):
if edge in degree_map:
degree_map[node] += 1
reverse_graph.setdefault(edge, set()).add(node)
reverse_graph.setdefault(node, set())
zero_degree = [node for node, degree in degree_map.items() if degree == 0]
heapq.heapify(zero_degree)
while zero_degree:
node = heapq.heappop(zero_degree)
yield node
for edge in reverse_graph.get(node, []):
if edge in degree_map:
degree_map[edge] -= 1
if degree_map[edge] == 0:
heapq.heappush(zero_degree, edge)