mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-22 09:35:45 +03:00
Use deferred.addTimeout instead of time_bound_deferred
This doesn't feel like a wheel we need to reinvent.
This commit is contained in:
parent
dc875d2712
commit
1ea904b9f0
6 changed files with 60 additions and 131 deletions
|
@ -1,5 +1,6 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2014-2016 OpenMarket Ltd
|
||||
# Copyright 2018 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
|
@ -12,3 +13,24 @@
|
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from twisted.internet.defer import CancelledError
|
||||
from twisted.python import failure
|
||||
|
||||
from synapse.api.errors import SynapseError
|
||||
|
||||
|
||||
class RequestTimedOutError(SynapseError):
|
||||
"""Exception representing timeout of an outbound request"""
|
||||
def __init__(self):
|
||||
super(RequestTimedOutError, self).__init__(504, "Timed out")
|
||||
|
||||
|
||||
def cancelled_to_request_timed_out_error(value):
|
||||
"""Turns CancelledErrors into RequestTimedOutErrors.
|
||||
|
||||
For use with deferred.addTimeout()
|
||||
"""
|
||||
if isinstance(value, failure.Failure):
|
||||
value.trap(CancelledError)
|
||||
raise RequestTimedOutError()
|
||||
return value
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2014-2016 OpenMarket Ltd
|
||||
# Copyright 2018 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
|
@ -18,9 +19,9 @@ from OpenSSL.SSL import VERIFY_NONE
|
|||
from synapse.api.errors import (
|
||||
CodeMessageException, MatrixCodeMessageException, SynapseError, Codes,
|
||||
)
|
||||
from synapse.http import cancelled_to_request_timed_out_error
|
||||
from synapse.util.caches import CACHE_SIZE_FACTOR
|
||||
from synapse.util.logcontext import make_deferred_yieldable
|
||||
from synapse.util import logcontext
|
||||
import synapse.metrics
|
||||
from synapse.http.endpoint import SpiderEndpoint
|
||||
|
||||
|
@ -95,21 +96,16 @@ class SimpleHttpClient(object):
|
|||
# counters to it
|
||||
outgoing_requests_counter.inc(method)
|
||||
|
||||
def send_request():
|
||||
request_deferred = self.agent.request(
|
||||
method, uri, *args, **kwargs
|
||||
)
|
||||
|
||||
return self.clock.time_bound_deferred(
|
||||
request_deferred,
|
||||
time_out=60,
|
||||
)
|
||||
|
||||
logger.info("Sending request %s %s", method, uri)
|
||||
|
||||
try:
|
||||
with logcontext.PreserveLoggingContext():
|
||||
response = yield send_request()
|
||||
request_deferred = self.agent.request(
|
||||
method, uri, *args, **kwargs
|
||||
)
|
||||
request_deferred.addTimeout(
|
||||
60, reactor, cancelled_to_request_timed_out_error,
|
||||
)
|
||||
response = yield make_deferred_yieldable(request_deferred)
|
||||
|
||||
incoming_responses_counter.inc(method, response.code)
|
||||
logger.info(
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2014-2016 OpenMarket Ltd
|
||||
# Copyright 2018 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
|
@ -12,17 +13,19 @@
|
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import synapse.util.retryutils
|
||||
from twisted.internet import defer, reactor, protocol
|
||||
from twisted.internet.error import DNSLookupError
|
||||
from twisted.web.client import readBody, HTTPConnectionPool, Agent
|
||||
from twisted.web.http_headers import Headers
|
||||
from twisted.web._newclient import ResponseDone
|
||||
|
||||
from synapse.http import cancelled_to_request_timed_out_error
|
||||
from synapse.http.endpoint import matrix_federation_endpoint
|
||||
import synapse.metrics
|
||||
from synapse.util.async import sleep
|
||||
from synapse.util import logcontext
|
||||
import synapse.metrics
|
||||
from synapse.util.logcontext import make_deferred_yieldable
|
||||
import synapse.util.retryutils
|
||||
|
||||
from canonicaljson import encode_canonical_json
|
||||
|
||||
|
@ -184,21 +187,19 @@ class MatrixFederationHttpClient(object):
|
|||
producer = body_callback(method, http_url_bytes, headers_dict)
|
||||
|
||||
try:
|
||||
def send_request():
|
||||
request_deferred = self.agent.request(
|
||||
method,
|
||||
url_bytes,
|
||||
Headers(headers_dict),
|
||||
producer
|
||||
)
|
||||
|
||||
return self.clock.time_bound_deferred(
|
||||
request_deferred,
|
||||
time_out=timeout / 1000. if timeout else 60,
|
||||
)
|
||||
|
||||
with logcontext.PreserveLoggingContext():
|
||||
response = yield send_request()
|
||||
request_deferred = self.agent.request(
|
||||
method,
|
||||
url_bytes,
|
||||
Headers(headers_dict),
|
||||
producer
|
||||
)
|
||||
request_deferred.addTimeout(
|
||||
timeout / 1000. if timeout else 60,
|
||||
reactor, cancelled_to_request_timed_out_error,
|
||||
)
|
||||
response = yield make_deferred_yieldable(
|
||||
request_deferred,
|
||||
)
|
||||
|
||||
log_result = "%d %s" % (response.code, response.phrase,)
|
||||
break
|
||||
|
|
|
@ -13,12 +13,13 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.internet import defer, reactor
|
||||
from twisted.internet.defer import TimeoutError
|
||||
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.api.errors import AuthError
|
||||
from synapse.handlers.presence import format_user_presence_state
|
||||
|
||||
from synapse.util import DeferredTimedOutError
|
||||
from synapse.util.logutils import log_function
|
||||
from synapse.util.async import ObservableDeferred
|
||||
from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
|
||||
|
@ -331,11 +332,11 @@ class Notifier(object):
|
|||
# Now we wait for the _NotifierUserStream to be told there
|
||||
# is a new token.
|
||||
listener = user_stream.new_listener(prev_token)
|
||||
listener.deferred.addTimeout(
|
||||
(end_time - now) / 1000., reactor,
|
||||
)
|
||||
with PreserveLoggingContext():
|
||||
yield self.clock.time_bound_deferred(
|
||||
listener.deferred,
|
||||
time_out=(end_time - now) / 1000.
|
||||
)
|
||||
yield listener.deferred
|
||||
|
||||
current_token = user_stream.current_token
|
||||
|
||||
|
@ -346,7 +347,7 @@ class Notifier(object):
|
|||
# Update the prev_token to the current_token since nothing
|
||||
# has happened between the old prev_token and the current_token
|
||||
prev_token = current_token
|
||||
except DeferredTimedOutError:
|
||||
except TimeoutError:
|
||||
break
|
||||
except defer.CancelledError:
|
||||
break
|
||||
|
@ -551,13 +552,11 @@ class Notifier(object):
|
|||
if end_time <= now:
|
||||
break
|
||||
|
||||
listener.deferred.addTimeout((end_time - now) / 1000., reactor)
|
||||
try:
|
||||
with PreserveLoggingContext():
|
||||
yield self.clock.time_bound_deferred(
|
||||
listener.deferred,
|
||||
time_out=(end_time - now) / 1000.
|
||||
)
|
||||
except DeferredTimedOutError:
|
||||
yield listener.deferred
|
||||
except TimeoutError:
|
||||
break
|
||||
except defer.CancelledError:
|
||||
break
|
||||
|
|
|
@ -13,7 +13,6 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.util.logcontext import PreserveLoggingContext
|
||||
|
||||
from twisted.internet import defer, reactor, task
|
||||
|
@ -24,11 +23,6 @@ import logging
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DeferredTimedOutError(SynapseError):
|
||||
def __init__(self):
|
||||
super(DeferredTimedOutError, self).__init__(504, "Timed out")
|
||||
|
||||
|
||||
def unwrapFirstError(failure):
|
||||
# defer.gatherResults and DeferredLists wrap failures.
|
||||
failure.trap(defer.FirstError)
|
||||
|
@ -85,53 +79,3 @@ class Clock(object):
|
|||
except Exception:
|
||||
if not ignore_errs:
|
||||
raise
|
||||
|
||||
def time_bound_deferred(self, given_deferred, time_out):
|
||||
if given_deferred.called:
|
||||
return given_deferred
|
||||
|
||||
ret_deferred = defer.Deferred()
|
||||
|
||||
def timed_out_fn():
|
||||
e = DeferredTimedOutError()
|
||||
|
||||
try:
|
||||
ret_deferred.errback(e)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
try:
|
||||
given_deferred.cancel()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
timer = None
|
||||
|
||||
def cancel(res):
|
||||
try:
|
||||
self.cancel_call_later(timer)
|
||||
except Exception:
|
||||
pass
|
||||
return res
|
||||
|
||||
ret_deferred.addBoth(cancel)
|
||||
|
||||
def success(res):
|
||||
try:
|
||||
ret_deferred.callback(res)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return res
|
||||
|
||||
def err(res):
|
||||
try:
|
||||
ret_deferred.errback(res)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
given_deferred.addCallbacks(callback=success, errback=err)
|
||||
|
||||
timer = self.call_later(time_out, timed_out_fn)
|
||||
|
||||
return ret_deferred
|
||||
|
|
|
@ -1,33 +0,0 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2017 Vector Creations Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from synapse import util
|
||||
from twisted.internet import defer
|
||||
from tests import unittest
|
||||
|
||||
|
||||
class ClockTestCase(unittest.TestCase):
|
||||
@defer.inlineCallbacks
|
||||
def test_time_bound_deferred(self):
|
||||
# just a deferred which never resolves
|
||||
slow_deferred = defer.Deferred()
|
||||
|
||||
clock = util.Clock()
|
||||
time_bound = clock.time_bound_deferred(slow_deferred, 0.001)
|
||||
|
||||
try:
|
||||
yield time_bound
|
||||
self.fail("Expected timedout error, but got nothing")
|
||||
except util.DeferredTimedOutError:
|
||||
pass
|
Loading…
Reference in a new issue