diff --git a/docs/log_contexts.rst b/docs/log_contexts.rst index 0046e171be..8d04a973de 100644 --- a/docs/log_contexts.rst +++ b/docs/log_contexts.rst @@ -1,10 +1,441 @@ -What do I do about "Unexpected logging context" debug log-lines everywhere? +Log contexts +============ - The logging context lives in thread local storage - Sometimes it gets out of sync with what it should actually be, usually because something scheduled something to run on the reactor without preserving the logging context. - what is the impact of it getting out of sync? and how and when should we preserve log context? - The impact is that some of the CPU and database metrics will be under-reported, and some log lines will be mis-attributed. - It should happen auto-magically in all the APIs that do IO or otherwise defer to the reactor. - Mjark: the other place is if we branch, e.g. using defer.gatherResults +.. contents:: -Unanswered: how and when should we preserve log context? \ No newline at end of file +To help track the processing of individual requests, synapse uses a +'log context' to track which request it is handling at any given moment. This +is done via a thread-local variable; a ``logging.Filter`` is then used to fish +the information back out of the thread-local variable and add it to each log +record. + +Logcontexts are also used for CPU and database accounting, so that we can track +which requests were responsible for high CPU use or database activity. + +The ``synapse.util.logcontext`` module provides a facilities for managing the +current log context (as well as providing the ``LoggingContextFilter`` class). + +Deferreds make the whole thing complicated, so this document describes how it +all works, and how to write code which follows the rules. + +Logcontexts without Deferreds +----------------------------- + +In the absence of any Deferred voodoo, things are simple enough. As with any +code of this nature, the rule is that our function should leave things as it +found them: + +.. code:: python + + from synapse.util import logcontext # omitted from future snippets + + def handle_request(request_id): + request_context = logcontext.LoggingContext() + + calling_context = logcontext.LoggingContext.current_context() + logcontext.LoggingContext.set_current_context(request_context) + try: + request_context.request = request_id + do_request_handling() + logger.debug("finished") + finally: + logcontext.LoggingContext.set_current_context(calling_context) + + def do_request_handling(): + logger.debug("phew") # this will be logged against request_id + + +LoggingContext implements the context management methods, so the above can be +written much more succinctly as: + +.. code:: python + + def handle_request(request_id): + with logcontext.LoggingContext() as request_context: + request_context.request = request_id + do_request_handling() + logger.debug("finished") + + def do_request_handling(): + logger.debug("phew") + + +Using logcontexts with Deferreds +-------------------------------- + +Deferreds — and in particular, ``defer.inlineCallbacks`` — break +the linear flow of code so that there is no longer a single entry point where +we should set the logcontext and a single exit point where we should remove it. + +Consider the example above, where ``do_request_handling`` needs to do some +blocking operation, and returns a deferred: + +.. code:: python + + @defer.inlineCallbacks + def handle_request(request_id): + with logcontext.LoggingContext() as request_context: + request_context.request = request_id + yield do_request_handling() + logger.debug("finished") + + +In the above flow: + +* The logcontext is set +* ``do_request_handling`` is called, and returns a deferred +* ``handle_request`` yields the deferred +* The ``inlineCallbacks`` wrapper of ``handle_request`` returns a deferred + +So we have stopped processing the request (and will probably go on to start +processing the next), without clearing the logcontext. + +To circumvent this problem, synapse code assumes that, wherever you have a +deferred, you will want to yield on it. To that end, whereever functions return +a deferred, we adopt the following conventions: + +**Rules for functions returning deferreds:** + + * If the deferred is already complete, the function returns with the same + logcontext it started with. + * If the deferred is incomplete, the function clears the logcontext before + returning; when the deferred completes, it restores the logcontext before + running any callbacks. + +That sounds complicated, but actually it means a lot of code (including the +example above) "just works". There are two cases: + +* If ``do_request_handling`` returns a completed deferred, then the logcontext + will still be in place. In this case, execution will continue immediately + after the ``yield``; the "finished" line will be logged against the right + context, and the ``with`` block restores the original context before we + return to the caller. + +* If the returned deferred is incomplete, ``do_request_handling`` clears the + logcontext before returning. The logcontext is therefore clear when + ``handle_request`` yields the deferred. At that point, the ``inlineCallbacks`` + wrapper adds a callback to the deferred, and returns another (incomplete) + deferred to the caller, and it is safe to begin processing the next request. + + Once ``do_request_handling``'s deferred completes, it will reinstate the + logcontext, before running the callback added by the ``inlineCallbacks`` + wrapper. That callback runs the second half of ``handle_request``, so again + the "finished" line will be logged against the right + context, and the ``with`` block restores the original context. + +As an aside, it's worth noting that ``handle_request`` follows our rules - +though that only matters if the caller has its own logcontext which it cares +about. + +The following sections describe pitfalls and helpful patterns when implementing +these rules. + +Always yield your deferreds +--------------------------- + +Whenever you get a deferred back from a function, you should ``yield`` on it +as soon as possible. (Returning it directly to your caller is ok too, if you're +not doing ``inlineCallbacks``.) Do not pass go; do not do any logging; do not +call any other functions. + +.. code:: python + + @defer.inlineCallbacks + def fun(): + logger.debug("starting") + yield do_some_stuff() # just like this + + d = more_stuff() + result = yield d # also fine, of course + + defer.returnValue(result) + + def nonInlineCallbacksFun(): + logger.debug("just a wrapper really") + return do_some_stuff() # this is ok too - the caller will yield on + # it anyway. + +Provided this pattern is followed all the way back up to the callchain to where +the logcontext was set, this will make things work out ok: provided +``do_some_stuff`` and ``more_stuff`` follow the rules above, then so will +``fun`` (as wrapped by ``inlineCallbacks``) and ``nonInlineCallbacksFun``. + +It's all too easy to forget to ``yield``: for instance if we forgot that +``do_some_stuff`` returned a deferred, we might plough on regardless. This +leads to a mess; it will probably work itself out eventually, but not before +a load of stuff has been logged against the wrong content. (Normally, other +things will break, more obviously, if you forget to ``yield``, so this tends +not to be a major problem in practice.) + +Of course sometimes you need to do something a bit fancier with your Deferreds +- not all code follows the linear A-then-B-then-C pattern. Notes on +implementing more complex patterns are in later sections. + +Where you create a new Deferred, make it follow the rules +--------------------------------------------------------- + +Most of the time, a Deferred comes from another synapse function. Sometimes, +though, we need to make up a new Deferred, or we get a Deferred back from +external code. We need to make it follow our rules. + +The easy way to do it is with a combination of ``defer.inlineCallbacks``, and +``logcontext.PreserveLoggingContext``. Suppose we want to implement ``sleep``, +which returns a deferred which will run its callbacks after a given number of +seconds. That might look like: + +.. code:: python + + # not a logcontext-rules-compliant function + def get_sleep_deferred(seconds): + d = defer.Deferred() + reactor.callLater(seconds, d.callback, None) + return d + +That doesn't follow the rules, but we can fix it by wrapping it with +``PreserveLoggingContext`` and ``yield`` ing on it: + +.. code:: python + + @defer.inlineCallbacks + def sleep(seconds): + with PreserveLoggingContext(): + yield get_sleep_deferred(seconds) + +This technique works equally for external functions which return deferreds, +or deferreds we have made ourselves. + +XXX: think this is what ``preserve_context_over_deferred`` is supposed to do, +though it is broken, in that it only restores the logcontext for the duration +of the callbacks, which doesn't comply with the logcontext rules. + +Fire-and-forget +--------------- + +Sometimes you want to fire off a chain of execution, but not wait for its +result. That might look a bit like this: + +.. code:: python + + @defer.inlineCallbacks + def do_request_handling(): + yield foreground_operation() + + # *don't* do this + background_operation() + + logger.debug("Request handling complete") + + @defer.inlineCallbacks + def background_operation(): + yield first_background_step() + logger.debug("Completed first step") + yield second_background_step() + logger.debug("Completed second step") + +The above code does a couple of steps in the background after +``do_request_handling`` has finished. The log lines are still logged against +the ``request_context`` logcontext, which may or may not be desirable. There +are two big problems with the above, however. The first problem is that, if +``background_operation`` returns an incomplete Deferred, it will expect its +caller to ``yield`` immediately, so will have cleared the logcontext. In this +example, that means that 'Request handling complete' will be logged without any +context. + +The second problem, which is potentially even worse, is that when the Deferred +returned by ``background_operation`` completes, it will restore the original +logcontext. There is nothing waiting on that Deferred, so the logcontext will +leak into the reactor and possibly get attached to some arbitrary future +operation. + +There are two potential solutions to this. + +One option is to surround the call to ``background_operation`` with a +``PreserveLoggingContext`` call. That will reset the logcontext before +starting ``background_operation`` (so the context restored when the deferred +completes will be the empty logcontext), and will restore the current +logcontext before continuing the foreground process: + +.. code:: python + + @defer.inlineCallbacks + def do_request_handling(): + yield foreground_operation() + + # start background_operation off in the empty logcontext, to + # avoid leaking the current context into the reactor. + with PreserveLoggingContext(): + background_operation() + + # this will now be logged against the request context + logger.debug("Request handling complete") + +Obviously that option means that the operations done in +``background_operation`` would be not be logged against a logcontext (though +that might be fixed by setting a different logcontext via a ``with +LoggingContext(...)`` in ``background_operation``). + +The second option is to use ``logcontext.preserve_fn``, which wraps a function +so that it doesn't reset the logcontext even when it returns an incomplete +deferred, and adds a callback to the returned deferred to reset the +logcontext. In other words, it turns a function that follows the Synapse rules +about logcontexts and Deferreds into one which behaves more like an external +function — the opposite operation to that described in the previous section. +It can be used like this: + +.. code:: python + + @defer.inlineCallbacks + def do_request_handling(): + yield foreground_operation() + + logcontext.preserve_fn(background_operation)() + + # this will now be logged against the request context + logger.debug("Request handling complete") + +XXX: I think ``preserve_context_over_fn`` is supposed to do the first option, +but the fact that it does ``preserve_context_over_deferred`` on its results +means that its use is fraught with difficulty. + +Passing synapse deferreds into third-party functions +---------------------------------------------------- + +A typical example of this is where we want to collect together two or more +deferred via ``defer.gatherResults``: + +.. code:: python + + d1 = operation1() + d2 = operation2() + d3 = defer.gatherResults([d1, d2]) + +This is really a variation of the fire-and-forget problem above, in that we are +firing off ``d1`` and ``d2`` without yielding on them. The difference +is that we now have third-party code attached to their callbacks. Anyway either +technique given in the `Fire-and-forget`_ section will work. + +Of course, the new Deferred returned by ``gatherResults`` needs to be wrapped +in order to make it follow the logcontext rules before we can yield it, as +described in `Where you create a new Deferred, make it follow the rules`_. + +So, option one: reset the logcontext before starting the operations to be +gathered: + +.. code:: python + + @defer.inlineCallbacks + def do_request_handling(): + with PreserveLoggingContext(): + d1 = operation1() + d2 = operation2() + result = yield defer.gatherResults([d1, d2]) + +In this case particularly, though, option two, of using +``logcontext.preserve_fn`` almost certainly makes more sense, so that +``operation1`` and ``operation2`` are both logged against the original +logcontext. This looks like: + +.. code:: python + + @defer.inlineCallbacks + def do_request_handling(): + d1 = logcontext.preserve_fn(operation1)() + d2 = logcontext.preserve_fn(operation2)() + + with PreserveLoggingContext(): + result = yield defer.gatherResults([d1, d2]) + + +Was all this really necessary? +------------------------------ + +The conventions used work fine for a linear flow where everything happens in +series via ``defer.inlineCallbacks`` and ``yield``, but are certainly tricky to +follow for any more exotic flows. It's hard not to wonder if we could have done +something else. + +We're not going to rewrite Synapse now, so the following is entirely of +academic interest, but I'd like to record some thoughts on an alternative +approach. + +I briefly prototyped some code following an alternative set of rules. I think +it would work, but I certainly didn't get as far as thinking how it would +interact with concepts as complicated as the cache descriptors. + +My alternative rules were: + +* functions always preserve the logcontext of their caller, whether or not they + are returning a Deferred. + +* Deferreds returned by synapse functions run their callbacks in the same + context as the function was orignally called in. + +The main point of this scheme is that everywhere that sets the logcontext is +responsible for clearing it before returning control to the reactor. + +So, for example, if you were the function which started a ``with +LoggingContext`` block, you wouldn't ``yield`` within it — instead you'd start +off the background process, and then leave the ``with`` block to wait for it: + +.. code:: python + + def handle_request(request_id): + with logcontext.LoggingContext() as request_context: + request_context.request = request_id + d = do_request_handling() + + def cb(r): + logger.debug("finished") + + d.addCallback(cb) + return d + +(in general, mixing ``with LoggingContext`` blocks and +``defer.inlineCallbacks`` in the same function leads to slighly +counter-intuitive code, under this scheme). + +Because we leave the original ``with`` block as soon as the Deferred is +returned (as opposed to waiting for it to be resolved, as we do today), the +logcontext is cleared before control passes back to the reactor; so if there is +some code within ``do_request_handling`` which needs to wait for a Deferred to +complete, there is no need for it to worry about clearing the logcontext before +doing so: + +.. code:: python + + def handle_request(): + r = do_some_stuff() + r.addCallback(do_some_more_stuff) + return r + +— and provided ``do_some_stuff`` follows the rules of returning a Deferred which +runs its callbacks in the original logcontext, all is happy. + +The business of a Deferred which runs its callbacks in the original logcontext +isn't hard to achieve — we have it today, in the shape of +``logcontext._PreservingContextDeferred``: + +.. code:: python + + def do_some_stuff(): + deferred = do_some_io() + pcd = _PreservingContextDeferred(LoggingContext.current_context()) + deferred.chainDeferred(pcd) + return pcd + +It turns out that, thanks to the way that Deferreds chain together, we +automatically get the property of a context-preserving deferred with +``defer.inlineCallbacks``, provided the final Defered the function ``yields`` +on has that property. So we can just write: + +.. code:: python + + @defer.inlineCallbacks + def handle_request(): + yield do_some_stuff() + yield do_some_more_stuff() + +To conclude: I think this scheme would have worked equally well, with less +danger of messing it up, and probably made some more esoteric code easier to +write. But again — changing the conventions of the entire Synapse codebase is +not a sensible option for the marginal improvement offered. diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index 83ee3e3ce3..a6f1e7594e 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -29,7 +29,7 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto from synapse.storage.engines import create_engine from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -187,7 +187,11 @@ def start(config_options): ps.start_listening(config.worker_listeners) def run(): - with LoggingContext("run"): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index 7ed0de4117..a821a6ce62 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -35,7 +35,7 @@ from synapse.storage.client_ips import ClientIpStore from synapse.storage.engines import create_engine from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -193,7 +193,11 @@ def start(config_options): ss.start_listening(config.worker_listeners) def run(): - with LoggingContext("run"): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index ca742de6b2..e52b0f240d 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -31,7 +31,7 @@ from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -184,7 +184,11 @@ def start(config_options): ss.start_listening(config.worker_listeners) def run(): - with LoggingContext("run"): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 0cf5b196e6..76c4cc54d1 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -35,7 +35,7 @@ from synapse.storage.engines import create_engine from synapse.storage.presence import UserPresenceState from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -193,7 +193,11 @@ def start(config_options): ps.start_listening(config.worker_listeners) def run(): - with LoggingContext("run"): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 0b9d78c13c..2cdd2d39ff 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -52,7 +52,7 @@ from synapse.api.urls import ( ) from synapse.config.homeserver import HomeServerConfig from synapse.crypto import context_factory -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.metrics import register_memory_metrics, get_metrics_for from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX @@ -456,7 +456,12 @@ def run(hs): def in_thread(): # Uncomment to enable tracing of log context changes. # sys.settrace(logcontext_tracer) - with LoggingContext("run"): + + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): change_resource_limit(hs.config.soft_file_limit) if hs.config.gc_thresholds: gc.set_threshold(*hs.config.gc_thresholds) diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index c5579d9e38..3c7984a237 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -32,7 +32,7 @@ from synapse.storage.engines import create_engine from synapse.storage.media_repository import MediaRepositoryStore from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -190,7 +190,11 @@ def start(config_options): ss.start_listening(config.worker_listeners) def run(): - with LoggingContext("run"): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index b025db54d4..ab682e52ec 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -31,7 +31,8 @@ from synapse.storage.engines import create_engine from synapse.storage import DataStore from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, preserve_fn, \ + PreserveLoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -275,7 +276,11 @@ def start(config_options): ps.start_listening(config.worker_listeners) def run(): - with LoggingContext("run"): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 449fac771b..a68bb873fe 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -48,7 +48,8 @@ from synapse.storage.presence import PresenceStore, UserPresenceState from synapse.storage.roommember import RoomMemberStore from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, preserve_fn, \ + PreserveLoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.stringutils import random_string @@ -496,7 +497,11 @@ def start(config_options): ss.start_listening(config.worker_listeners) def run(): - with LoggingContext("run"): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index d7211ee9b3..80f27f8c53 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -96,10 +96,11 @@ class Keyring(object): verify_requests = [] for server_name, json_object in server_and_json: - logger.debug("Verifying for %s", server_name) key_ids = signature_ids(json_object, server_name) if not key_ids: + logger.warn("Request from %s: no supported signature keys", + server_name) deferred = defer.fail(SynapseError( 400, "Not signed with a supported algorithm", @@ -108,6 +109,9 @@ class Keyring(object): else: deferred = defer.Deferred() + logger.debug("Verifying for %s with key_ids %s", + server_name, key_ids) + verify_request = VerifyKeyRequest( server_name, key_ids, json_object, deferred ) @@ -142,6 +146,9 @@ class Keyring(object): json_object = verify_request.json_object + logger.debug("Got key %s %s:%s for server %s, verifying" % ( + key_id, verify_key.alg, verify_key.version, server_name, + )) try: verify_signed_json(json_object, server_name, verify_key) except: diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index 11605b34a3..6be18880b9 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -15,6 +15,32 @@ class EventContext(object): + """ + Attributes: + current_state_ids (dict[(str, str), str]): + The current state map including the current event. + (type, state_key) -> event_id + + prev_state_ids (dict[(str, str), str]): + The current state map excluding the current event. + (type, state_key) -> event_id + + state_group (int): state group id + rejected (bool|str): A rejection reason if the event was rejected, else + False + + push_actions (list[(str, list[object])]): list of (user_id, actions) + tuples + + prev_group (int): Previously persisted state group. ``None`` for an + outlier. + delta_ids (dict[(str, str), str]): Delta from ``prev_group``. + (type, state_key) -> event_id. ``None`` for an outlier. + + prev_state_events (?): XXX: is this ever set to anything other than + the empty list? + """ + __slots__ = [ "current_state_ids", "prev_state_ids", diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index d0c2b4d6ed..888dd01240 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -14,6 +14,7 @@ # limitations under the License. """Contains handlers for federation events.""" +import synapse.util.logcontext from signedjson.key import decode_verify_key_bytes from signedjson.sign import verify_signed_json from unpaddedbase64 import decode_base64 @@ -114,6 +115,14 @@ class FederationHandler(BaseHandler): logger.debug("Already seen pdu %s", pdu.event_id) return + # If we are currently in the process of joining this room, then we + # queue up events for later processing. + if pdu.room_id in self.room_queues: + logger.info("Ignoring PDU %s for room %s from %s for now; join " + "in progress", pdu.event_id, pdu.room_id, origin) + self.room_queues[pdu.room_id].append((pdu, origin)) + return + state = None auth_chain = [] @@ -274,26 +283,13 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def _process_received_pdu(self, origin, pdu, state=None, auth_chain=None): + def _process_received_pdu(self, origin, pdu, state, auth_chain): """ Called when we have a new pdu. We need to do auth checks and put it through the StateHandler. - - auth_chain and state are None if we already have the necessary state - and prev_events in the db """ event = pdu - logger.debug("Got event: %s", event.event_id) - - # If we are currently in the process of joining this room, then we - # queue up events for later processing. - if event.room_id in self.room_queues: - self.room_queues[event.room_id].append((pdu, origin)) - return - - logger.debug("Processing event: %s", event.event_id) - - logger.debug("Event: %s", event) + logger.debug("Processing event: %s", event) # FIXME (erikj): Awful hack to make the case where we are not currently # in the room work @@ -862,8 +858,6 @@ class FederationHandler(BaseHandler): """ logger.debug("Joining %s to %s", joinee, room_id) - yield self.store.clean_room_for_join(room_id) - origin, event = yield self._make_and_verify_event( target_hosts, room_id, @@ -872,7 +866,15 @@ class FederationHandler(BaseHandler): content, ) + # This shouldn't happen, because the RoomMemberHandler has a + # linearizer lock which only allows one operation per user per room + # at a time - so this is just paranoia. + assert (room_id not in self.room_queues) + self.room_queues[room_id] = [] + + yield self.store.clean_room_for_join(room_id) + handled_events = set() try: @@ -925,17 +927,36 @@ class FederationHandler(BaseHandler): room_queue = self.room_queues[room_id] del self.room_queues[room_id] - for p, origin in room_queue: - if p.event_id in handled_events: - continue + # we don't need to wait for the queued events to be processed - + # it's just a best-effort thing at this point. We do want to do + # them roughly in order, though, otherwise we'll end up making + # lots of requests for missing prev_events which we do actually + # have. Hence we fire off the deferred, but don't wait for it. - try: - self._process_received_pdu(origin, p) - except: - logger.exception("Couldn't handle pdu") + synapse.util.logcontext.preserve_fn(self._handle_queued_pdus)( + room_queue + ) defer.returnValue(True) + @defer.inlineCallbacks + def _handle_queued_pdus(self, room_queue): + """Process PDUs which got queued up while we were busy send_joining. + + Args: + room_queue (list[FrozenEvent, str]): list of PDUs to be processed + and the servers that sent them + """ + for p, origin in room_queue: + try: + logger.info("Processing queued PDU %s which was received " + "while we were joining %s", p.event_id, p.room_id) + yield self.on_receive_pdu(origin, p) + except Exception as e: + logger.warn( + "Error handling queued PDU %s from %s: %s", + p.event_id, origin, e) + @defer.inlineCallbacks @log_function def on_make_join_request(self, room_id, user_id): @@ -1517,7 +1538,17 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def _prep_event(self, origin, event, state=None, auth_events=None): + """ + Args: + origin: + event: + state: + auth_events: + + Returns: + Deferred, which resolves to synapse.events.snapshot.EventContext + """ context = yield self.state_handler.compute_event_context( event, old_state=state, ) diff --git a/synapse/state.py b/synapse/state.py index 383d32b163..9a523a1b89 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -177,17 +177,12 @@ class StateHandler(object): @defer.inlineCallbacks def compute_event_context(self, event, old_state=None): - """ Fills out the context with the `current state` of the graph. The - `current state` here is defined to be the state of the event graph - just before the event - i.e. it never includes `event` - - If `event` has `auth_events` then this will also fill out the - `auth_events` field on `context` from the `current_state`. + """Build an EventContext structure for the event. Args: - event (EventBase) + event (synapse.events.EventBase): Returns: - an EventContext + synapse.events.snapshot.EventContext: """ context = EventContext() diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 94b2bcc54a..813ad59e56 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import synapse.util.async from ._base import SQLBaseStore from . import engines @@ -84,24 +85,14 @@ class BackgroundUpdateStore(SQLBaseStore): self._background_update_performance = {} self._background_update_queue = [] self._background_update_handlers = {} - self._background_update_timer = None @defer.inlineCallbacks def start_doing_background_updates(self): - assert self._background_update_timer is None, \ - "background updates already running" - logger.info("Starting background schema updates") while True: - sleep = defer.Deferred() - self._background_update_timer = self._clock.call_later( - self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback, None - ) - try: - yield sleep - finally: - self._background_update_timer = None + yield synapse.util.async.sleep( + self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.) try: result = yield self.do_next_background_update( diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 256e50dc20..0d97de2fe7 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -201,19 +201,19 @@ class EventFederationStore(SQLBaseStore): def _update_min_depth_for_room_txn(self, txn, room_id, depth): min_depth = self._get_min_depth_interaction(txn, room_id) - do_insert = depth < min_depth if min_depth else True + if min_depth and depth >= min_depth: + return - if do_insert: - self._simple_upsert_txn( - txn, - table="room_depth", - keyvalues={ - "room_id": room_id, - }, - values={ - "min_depth": depth, - }, - ) + self._simple_upsert_txn( + txn, + table="room_depth", + keyvalues={ + "room_id": room_id, + }, + values={ + "min_depth": depth, + }, + ) def _handle_mult_prev_events(self, txn, events): """ diff --git a/synapse/storage/events.py b/synapse/storage/events.py index e8ad5f2eac..ea2f4f3d05 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -34,14 +34,16 @@ from canonicaljson import encode_canonical_json from collections import deque, namedtuple, OrderedDict from functools import wraps -import synapse import synapse.metrics - import logging import math import ujson as json +# these are only included to make the type annotations work +from synapse.events import EventBase # noqa: F401 +from synapse.events.snapshot import EventContext # noqa: F401 + logger = logging.getLogger(__name__) @@ -82,6 +84,11 @@ class _EventPeristenceQueue(object): def add_to_queue(self, room_id, events_and_contexts, backfilled): """Add events to the queue, with the given persist_event options. + + Args: + room_id (str): + events_and_contexts (list[(EventBase, EventContext)]): + backfilled (bool): """ queue = self._event_persist_queues.setdefault(room_id, deque()) if queue: @@ -227,6 +234,17 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks @log_function def persist_event(self, event, context, backfilled=False): + """ + + Args: + event (EventBase): + context (EventContext): + backfilled (bool): + + Returns: + Deferred: resolves to (int, int): the stream ordering of ``event``, + and the stream ordering of the latest persisted event + """ deferred = self._event_persist_queue.add_to_queue( event.room_id, [(event, context)], backfilled=backfilled, @@ -253,6 +271,16 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks def _persist_events(self, events_and_contexts, backfilled=False, delete_existing=False): + """Persist events to db + + Args: + events_and_contexts (list[(EventBase, EventContext)]): + backfilled (bool): + delete_existing (bool): + + Returns: + Deferred: resolves when the events have been persisted + """ if not events_and_contexts: return @@ -554,11 +582,91 @@ class EventsStore(SQLBaseStore): and the rejections table. Things reading from those table will need to check whether the event was rejected. - If delete_existing is True then existing events will be purged from the - database before insertion. This is useful when retrying due to IntegrityError. + Args: + txn (twisted.enterprise.adbapi.Connection): db connection + events_and_contexts (list[(EventBase, EventContext)]): + events to persist + backfilled (bool): True if the events were backfilled + delete_existing (bool): True to purge existing table rows for the + events from the database. This is useful when retrying due to + IntegrityError. + current_state_for_room (dict[str, (list[str], list[str])]): + The current-state delta for each room. For each room, a tuple + (to_delete, to_insert), being a list of event ids to be removed + from the current state, and a list of event ids to be added to + the current state. + new_forward_extremeties (dict[str, list[str]]): + The new forward extremities for each room. For each room, a + list of the event ids which are the forward extremities. + """ + self._update_current_state_txn(txn, current_state_for_room) + max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering - for room_id, current_state_tuple in current_state_for_room.iteritems(): + self._update_forward_extremities_txn( + txn, + new_forward_extremities=new_forward_extremeties, + max_stream_order=max_stream_order, + ) + + # Ensure that we don't have the same event twice. + events_and_contexts = self._filter_events_and_contexts_for_duplicates( + events_and_contexts, + ) + + self._update_room_depths_txn( + txn, + events_and_contexts=events_and_contexts, + backfilled=backfilled, + ) + + # _update_outliers_txn filters out any events which have already been + # persisted, and returns the filtered list. + events_and_contexts = self._update_outliers_txn( + txn, + events_and_contexts=events_and_contexts, + ) + + # From this point onwards the events are only events that we haven't + # seen before. + + if delete_existing: + # For paranoia reasons, we go and delete all the existing entries + # for these events so we can reinsert them. + # This gets around any problems with some tables already having + # entries. + self._delete_existing_rows_txn( + txn, + events_and_contexts=events_and_contexts, + ) + + self._store_event_txn( + txn, + events_and_contexts=events_and_contexts, + ) + + # Insert into the state_groups, state_groups_state, and + # event_to_state_groups tables. + self._store_mult_state_groups_txn(txn, events_and_contexts) + + # _store_rejected_events_txn filters out any events which were + # rejected, and returns the filtered list. + events_and_contexts = self._store_rejected_events_txn( + txn, + events_and_contexts=events_and_contexts, + ) + + # From this point onwards the events are only ones that weren't + # rejected. + + self._update_metadata_tables_txn( + txn, + events_and_contexts=events_and_contexts, + backfilled=backfilled, + ) + + def _update_current_state_txn(self, txn, state_delta_by_room): + for room_id, current_state_tuple in state_delta_by_room.iteritems(): to_delete, to_insert = current_state_tuple txn.executemany( "DELETE FROM current_state_events WHERE event_id = ?", @@ -608,7 +716,9 @@ class EventsStore(SQLBaseStore): txn, self.get_current_state_ids, (room_id,) ) - for room_id, new_extrem in new_forward_extremeties.items(): + def _update_forward_extremities_txn(self, txn, new_forward_extremities, + max_stream_order): + for room_id, new_extrem in new_forward_extremities.items(): self._simple_delete_txn( txn, table="event_forward_extremities", @@ -626,7 +736,7 @@ class EventsStore(SQLBaseStore): "event_id": ev_id, "room_id": room_id, } - for room_id, new_extrem in new_forward_extremeties.items() + for room_id, new_extrem in new_forward_extremities.items() for ev_id in new_extrem ], ) @@ -643,13 +753,22 @@ class EventsStore(SQLBaseStore): "event_id": event_id, "stream_ordering": max_stream_order, } - for room_id, new_extrem in new_forward_extremeties.items() + for room_id, new_extrem in new_forward_extremities.items() for event_id in new_extrem ] ) - # Ensure that we don't have the same event twice. - # Pick the earliest non-outlier if there is one, else the earliest one. + @classmethod + def _filter_events_and_contexts_for_duplicates(cls, events_and_contexts): + """Ensure that we don't have the same event twice. + + Pick the earliest non-outlier if there is one, else the earliest one. + + Args: + events_and_contexts (list[(EventBase, EventContext)]): + Returns: + list[(EventBase, EventContext)]: filtered list + """ new_events_and_contexts = OrderedDict() for event, context in events_and_contexts: prev_event_context = new_events_and_contexts.get(event.event_id) @@ -662,9 +781,17 @@ class EventsStore(SQLBaseStore): new_events_and_contexts[event.event_id] = (event, context) else: new_events_and_contexts[event.event_id] = (event, context) + return new_events_and_contexts.values() - events_and_contexts = new_events_and_contexts.values() + def _update_room_depths_txn(self, txn, events_and_contexts, backfilled): + """Update min_depth for each room + Args: + txn (twisted.enterprise.adbapi.Connection): db connection + events_and_contexts (list[(EventBase, EventContext)]): events + we are persisting + backfilled (bool): True if the events were backfilled + """ depth_updates = {} for event, context in events_and_contexts: # Remove the any existing cache entries for the event_ids @@ -683,6 +810,21 @@ class EventsStore(SQLBaseStore): for room_id, depth in depth_updates.items(): self._update_min_depth_for_room_txn(txn, room_id, depth) + def _update_outliers_txn(self, txn, events_and_contexts): + """Update any outliers with new event info. + + This turns outliers into ex-outliers (unless the new event was + rejected). + + Args: + txn (twisted.enterprise.adbapi.Connection): db connection + events_and_contexts (list[(EventBase, EventContext)]): events + we are persisting + + Returns: + list[(EventBase, EventContext)] new list, without events which + are already in the events table. + """ txn.execute( "SELECT event_id, outlier FROM events WHERE event_id in (%s)" % ( ",".join(["?"] * len(events_and_contexts)), @@ -697,19 +839,16 @@ class EventsStore(SQLBaseStore): to_remove = set() for event, context in events_and_contexts: - if context.rejected: - # If the event is rejected then we don't care if the event - # was an outlier or not. - if event.event_id in have_persisted: - # If we have already seen the event then ignore it. - to_remove.add(event) - continue - if event.event_id not in have_persisted: continue to_remove.add(event) + if context.rejected: + # If the event is rejected then we don't care if the event + # was an outlier or not. + continue + outlier_persisted = have_persisted[event.event_id] if not event.internal_metadata.is_outlier() and outlier_persisted: # We received a copy of an event that we had already stored as @@ -764,37 +903,19 @@ class EventsStore(SQLBaseStore): # event isn't an outlier any more. self._update_backward_extremeties(txn, [event]) - events_and_contexts = [ + return [ ec for ec in events_and_contexts if ec[0] not in to_remove ] + @classmethod + def _delete_existing_rows_txn(cls, txn, events_and_contexts): if not events_and_contexts: - # Make sure we don't pass an empty list to functions that expect to - # be storing at least one element. + # nothing to do here return - # From this point onwards the events are only events that we haven't - # seen before. + logger.info("Deleting existing") - def event_dict(event): - return { - k: v - for k, v in event.get_dict().items() - if k not in [ - "redacted", - "redacted_because", - ] - } - - if delete_existing: - # For paranoia reasons, we go and delete all the existing entries - # for these events so we can reinsert them. - # This gets around any problems with some tables already having - # entries. - - logger.info("Deleting existing") - - for table in ( + for table in ( "events", "event_auth", "event_json", @@ -816,18 +937,34 @@ class EventsStore(SQLBaseStore): "redactions", "room_memberships", "topics" - ): - txn.executemany( - "DELETE FROM %s WHERE event_id = ?" % (table,), - [(ev.event_id,) for ev, _ in events_and_contexts] - ) - for table in ( - "event_push_actions", - ): - txn.executemany( - "DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,), - [(ev.event_id,) for ev, _ in events_and_contexts] - ) + ): + txn.executemany( + "DELETE FROM %s WHERE event_id = ?" % (table,), + [(ev.event_id,) for ev, _ in events_and_contexts] + ) + + def _store_event_txn(self, txn, events_and_contexts): + """Insert new events into the event and event_json tables + + Args: + txn (twisted.enterprise.adbapi.Connection): db connection + events_and_contexts (list[(EventBase, EventContext)]): events + we are persisting + """ + + if not events_and_contexts: + # nothing to do here + return + + def event_dict(event): + return { + k: v + for k, v in event.get_dict().items() + if k not in [ + "redacted", + "redacted_because", + ] + } self._simple_insert_many_txn( txn, @@ -871,6 +1008,19 @@ class EventsStore(SQLBaseStore): ], ) + def _store_rejected_events_txn(self, txn, events_and_contexts): + """Add rows to the 'rejections' table for received events which were + rejected + + Args: + txn (twisted.enterprise.adbapi.Connection): db connection + events_and_contexts (list[(EventBase, EventContext)]): events + we are persisting + + Returns: + list[(EventBase, EventContext)] new list, without the rejected + events. + """ # Remove the rejected events from the list now that we've added them # to the events table and the events_json table. to_remove = set() @@ -882,16 +1032,23 @@ class EventsStore(SQLBaseStore): ) to_remove.add(event) - events_and_contexts = [ + return [ ec for ec in events_and_contexts if ec[0] not in to_remove ] - if not events_and_contexts: - # Make sure we don't pass an empty list to functions that expect to - # be storing at least one element. - return + def _update_metadata_tables_txn(self, txn, events_and_contexts, backfilled): + """Update all the miscellaneous tables for new events - # From this point onwards the events are only ones that weren't rejected. + Args: + txn (twisted.enterprise.adbapi.Connection): db connection + events_and_contexts (list[(EventBase, EventContext)]): events + we are persisting + backfilled (bool): True if the events were backfilled + """ + + if not events_and_contexts: + # nothing to do here + return for event, context in events_and_contexts: # Insert all the push actions into the event_push_actions table. @@ -921,10 +1078,6 @@ class EventsStore(SQLBaseStore): ], ) - # Insert into the state_groups, state_groups_state, and - # event_to_state_groups tables. - self._store_mult_state_groups_txn(txn, events_and_contexts) - # Update the event_forward_extremities, event_backward_extremities and # event_edges tables. self._handle_mult_prev_events( @@ -1011,13 +1164,6 @@ class EventsStore(SQLBaseStore): # Prefill the event cache self._add_to_cache(txn, events_and_contexts) - if backfilled: - # Backfilled events come before the current state so we don't need - # to update the current state table - return - - return - def _add_to_cache(self, txn, events_and_contexts): to_prefill = [] diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 27f1ec89ec..1b42bea07a 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -136,6 +136,16 @@ class StateStore(SQLBaseStore): continue if context.current_state_ids is None: + # AFAIK, this can never happen + logger.error( + "Non-outlier event %s had current_state_ids==None", + event.event_id) + continue + + # if the event was rejected, just give it the same state as its + # predecessor. + if context.rejected: + state_groups[event.event_id] = context.prev_group continue state_groups[event.event_id] = context.state_group diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 6c83eb213d..ff67b1d794 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -12,6 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +""" Thread-local-alike tracking of log contexts within synapse + +This module provides objects and utilities for tracking contexts through +synapse code, so that log lines can include a request identifier, and so that +CPU and database activity can be accounted for against the request that caused +them. + +See doc/log_contexts.rst for details on how this works. +""" + from twisted.internet import defer import threading @@ -309,21 +319,43 @@ def preserve_context_over_deferred(deferred, context=None): def preserve_fn(f): - """Ensures that function is called with correct context and that context is - restored after return. Useful for wrapping functions that return a deferred - which you don't yield on. + """Wraps a function, to ensure that the current context is restored after + return from the function, and that the sentinel context is set once the + deferred returned by the funtion completes. + + Useful for wrapping functions that return a deferred which you don't yield + on. """ + def reset_context(result): + LoggingContext.set_current_context(LoggingContext.sentinel) + return result + + # XXX: why is this here rather than inside g? surely we want to preserve + # the context from the time the function was called, not when it was + # wrapped? current = LoggingContext.current_context() def g(*args, **kwargs): - with PreserveLoggingContext(current): - res = f(*args, **kwargs) - if isinstance(res, defer.Deferred): - return preserve_context_over_deferred( - res, context=LoggingContext.sentinel - ) - else: - return res + res = f(*args, **kwargs) + if isinstance(res, defer.Deferred) and not res.called: + # The function will have reset the context before returning, so + # we need to restore it now. + LoggingContext.set_current_context(current) + + # The original context will be restored when the deferred + # completes, but there is nothing waiting for it, so it will + # get leaked into the reactor or some other function which + # wasn't expecting it. We therefore need to reset the context + # here. + # + # (If this feels asymmetric, consider it this way: we are + # effectively forking a new thread of execution. We are + # probably currently within a ``with LoggingContext()`` block, + # which is supposed to have a single entry and exit point. But + # by spawning off another deferred, we are effectively + # adding a new exit point.) + res.addBoth(reset_context) + return res return g diff --git a/tests/util/test_log_context.py b/tests/util/test_log_context.py index 65a330a0e9..9ffe209c4d 100644 --- a/tests/util/test_log_context.py +++ b/tests/util/test_log_context.py @@ -1,8 +1,10 @@ +import twisted.python.failure from twisted.internet import defer from twisted.internet import reactor from .. import unittest from synapse.util.async import sleep +from synapse.util import logcontext from synapse.util.logcontext import LoggingContext @@ -33,3 +35,62 @@ class LoggingContextTestCase(unittest.TestCase): context_one.test_key = "one" yield sleep(0) self._check_test_key("one") + + def _test_preserve_fn(self, function): + sentinel_context = LoggingContext.current_context() + + callback_completed = [False] + + @defer.inlineCallbacks + def cb(): + context_one.test_key = "one" + yield function() + self._check_test_key("one") + + callback_completed[0] = True + + with LoggingContext() as context_one: + context_one.test_key = "one" + + # fire off function, but don't wait on it. + logcontext.preserve_fn(cb)() + + self._check_test_key("one") + + # now wait for the function under test to have run, and check that + # the logcontext is left in a sane state. + d2 = defer.Deferred() + + def check_logcontext(): + if not callback_completed[0]: + reactor.callLater(0.01, check_logcontext) + return + + # make sure that the context was reset before it got thrown back + # into the reactor + try: + self.assertIs(LoggingContext.current_context(), + sentinel_context) + d2.callback(None) + except BaseException: + d2.errback(twisted.python.failure.Failure()) + + reactor.callLater(0.01, check_logcontext) + + # test is done once d2 finishes + return d2 + + def test_preserve_fn_with_blocking_fn(self): + @defer.inlineCallbacks + def blocking_function(): + yield sleep(0) + + return self._test_preserve_fn(blocking_function) + + def test_preserve_fn_with_non_blocking_fn(self): + @defer.inlineCallbacks + def nonblocking_function(): + with logcontext.PreserveLoggingContext(): + yield defer.succeed(None) + + return self._test_preserve_fn(nonblocking_function)