Remove preserve_context_over_{fn, deferred}

Both of these functions ae known to leak logcontexts. Replace the remaining
calls to them and kill them off.
This commit is contained in:
Richard van der Hoff 2017-11-14 11:22:42 +00:00
parent 7bd6c87eca
commit 7e6fa29cb5
10 changed files with 24 additions and 67 deletions

View file

@ -298,10 +298,6 @@ It can be used like this:
# this will now be logged against the request context
logger.debug("Request handling complete")
XXX: I think ``preserve_context_over_fn`` is supposed to do the first option,
but the fact that it does ``preserve_context_over_deferred`` on its results
means that its use is fraught with difficulty.
Passing synapse deferreds into third-party functions
----------------------------------------------------

View file

@ -25,7 +25,7 @@ from synapse.api.errors import (
from synapse.util import unwrapFirstError, logcontext
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.logutils import log_function
from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.events import FrozenEvent, builder
import synapse.metrics
@ -420,7 +420,7 @@ class FederationClient(FederationBase):
for e_id in batch
]
res = yield preserve_context_over_deferred(
res = yield make_deferred_yieldable(
defer.DeferredList(deferreds, consumeErrors=True)
)
for success, result in res:

View file

@ -17,7 +17,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes
from synapse.util.metrics import Measure
from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
import logging
@ -159,7 +159,7 @@ class ApplicationServicesHandler(object):
def query_3pe(self, kind, protocol, fields):
services = yield self._get_services_for_3pn(protocol)
results = yield preserve_context_over_deferred(defer.DeferredList([
results = yield make_deferred_yieldable(defer.DeferredList([
preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields)
for service in services
], consumeErrors=True))

View file

@ -27,7 +27,7 @@ from synapse.types import (
from synapse.util import unwrapFirstError
from synapse.util.async import concurrently_execute
from synapse.util.caches.snapshot_cache import SnapshotCache
from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
@ -163,7 +163,7 @@ class InitialSyncHandler(BaseHandler):
lambda states: states[event.event_id]
)
(messages, token), current_state = yield preserve_context_over_deferred(
(messages, token), current_state = yield make_deferred_yieldable(
defer.gatherResults(
[
preserve_fn(self.store.get_recent_events_for_room)(

View file

@ -17,7 +17,7 @@
from twisted.internet import defer
from .pusher import PusherFactory
from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.async import run_on_reactor
import logging
@ -136,7 +136,7 @@ class PusherPool:
)
)
yield preserve_context_over_deferred(defer.gatherResults(deferreds))
yield make_deferred_yieldable(defer.gatherResults(deferreds))
except Exception:
logger.exception("Exception in pusher on_new_notifications")
@ -161,7 +161,7 @@ class PusherPool:
preserve_fn(p.on_new_receipts)(min_stream_id, max_stream_id)
)
yield preserve_context_over_deferred(defer.gatherResults(deferreds))
yield make_deferred_yieldable(defer.gatherResults(deferreds))
except Exception:
logger.exception("Exception in pusher on_new_receipts")

View file

@ -39,7 +39,7 @@ from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cached
from synapse.api.constants import EventTypes
from synapse.types import RoomStreamToken
from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
import logging
@ -234,7 +234,7 @@ class StreamStore(SQLBaseStore):
results = {}
room_ids = list(room_ids)
for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)):
res = yield preserve_context_over_deferred(defer.gatherResults([
res = yield make_deferred_yieldable(defer.gatherResults([
preserve_fn(self.get_room_events_stream_for_room)(
room_id, from_key, to_key, limit, order=order,
)

View file

@ -17,7 +17,7 @@
from twisted.internet import defer, reactor
from .logcontext import (
PreserveLoggingContext, preserve_fn, preserve_context_over_deferred,
PreserveLoggingContext, make_deferred_yieldable, preserve_fn
)
from synapse.util import logcontext, unwrapFirstError
@ -351,7 +351,7 @@ class ReadWriteLock(object):
# We wait for the latest writer to finish writing. We can safely ignore
# any existing readers... as they're readers.
yield curr_writer
yield make_deferred_yieldable(curr_writer)
@contextmanager
def _ctx_manager():
@ -380,7 +380,7 @@ class ReadWriteLock(object):
curr_readers.clear()
self.key_to_current_writer[key] = new_defer
yield preserve_context_over_deferred(defer.gatherResults(to_wait_on))
yield make_deferred_yieldable(defer.gatherResults(to_wait_on))
@contextmanager
def _ctx_manager():

View file

@ -13,32 +13,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
from synapse.util.logcontext import (
PreserveLoggingContext, preserve_context_over_fn
)
from synapse.util import unwrapFirstError
import logging
from twisted.internet import defer
from synapse.util import unwrapFirstError
from synapse.util.logcontext import PreserveLoggingContext
logger = logging.getLogger(__name__)
def user_left_room(distributor, user, room_id):
return preserve_context_over_fn(
distributor.fire,
"user_left_room", user=user, room_id=room_id
)
with PreserveLoggingContext():
distributor.fire("user_left_room", user=user, room_id=room_id)
def user_joined_room(distributor, user, room_id):
return preserve_context_over_fn(
distributor.fire,
"user_joined_room", user=user, room_id=room_id
)
with PreserveLoggingContext():
distributor.fire("user_joined_room", user=user, room_id=room_id)
class Distributor(object):

View file

@ -291,37 +291,6 @@ class _PreservingContextDeferred(defer.Deferred):
return g
def preserve_context_over_fn(fn, *args, **kwargs):
"""Takes a function and invokes it with the given arguments, but removes
and restores the current logging context while doing so.
If the result is a deferred, call preserve_context_over_deferred before
returning it.
"""
with PreserveLoggingContext():
res = fn(*args, **kwargs)
if isinstance(res, defer.Deferred):
return preserve_context_over_deferred(res)
else:
return res
def preserve_context_over_deferred(deferred, context=None):
"""Given a deferred wrap it such that any callbacks added later to it will
be invoked with the current context.
Deprecated: this almost certainly doesn't do want you want, ie make
the deferred follow the synapse logcontext rules: try
``make_deferred_yieldable`` instead.
"""
if context is None:
context = LoggingContext.current_context()
d = _PreservingContextDeferred(context)
deferred.chainDeferred(d)
return d
def preserve_fn(f):
"""Wraps a function, to ensure that the current context is restored after
return from the function, and that the sentinel context is set once the

View file

@ -17,7 +17,7 @@ from twisted.internet import defer
from synapse.api.constants import Membership, EventTypes
from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
import logging
@ -58,7 +58,7 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state,
always_include_ids (set(event_id)): set of event ids to specifically
include (unless sender is ignored)
"""
forgotten = yield preserve_context_over_deferred(defer.gatherResults([
forgotten = yield make_deferred_yieldable(defer.gatherResults([
defer.maybeDeferred(
preserve_fn(store.who_forgot_in_room),
room_id,