mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-29 07:28:55 +03:00
Merge pull request #61 from matrix-org/timeout-federation-requests
Timeout federation requests
This commit is contained in:
commit
c52e8d395b
3 changed files with 63 additions and 2 deletions
|
@ -72,5 +72,7 @@ class ReplicationLayer(FederationClient, FederationServer):
|
||||||
|
|
||||||
self._order = 0
|
self._order = 0
|
||||||
|
|
||||||
|
self.hs = hs
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return "<ReplicationLayer(%s)>" % self.server_name
|
return "<ReplicationLayer(%s)>" % self.server_name
|
||||||
|
|
|
@ -79,6 +79,7 @@ class MatrixFederationHttpClient(object):
|
||||||
self.signing_key = hs.config.signing_key[0]
|
self.signing_key = hs.config.signing_key[0]
|
||||||
self.server_name = hs.hostname
|
self.server_name = hs.hostname
|
||||||
self.agent = MatrixFederationHttpAgent(reactor)
|
self.agent = MatrixFederationHttpAgent(reactor)
|
||||||
|
self.clock = hs.get_clock()
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _create_request(self, destination, method, path_bytes,
|
def _create_request(self, destination, method, path_bytes,
|
||||||
|
@ -118,7 +119,7 @@ class MatrixFederationHttpClient(object):
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with PreserveLoggingContext():
|
with PreserveLoggingContext():
|
||||||
response = yield self.agent.request(
|
request_deferred = self.agent.request(
|
||||||
destination,
|
destination,
|
||||||
endpoint,
|
endpoint,
|
||||||
method,
|
method,
|
||||||
|
@ -129,6 +130,11 @@ class MatrixFederationHttpClient(object):
|
||||||
producer
|
producer
|
||||||
)
|
)
|
||||||
|
|
||||||
|
response = yield self.clock.time_bound_deferred(
|
||||||
|
request_deferred,
|
||||||
|
time_out=60,
|
||||||
|
)
|
||||||
|
|
||||||
logger.debug("Got response to %s", method)
|
logger.debug("Got response to %s", method)
|
||||||
break
|
break
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|
|
@ -15,9 +15,12 @@
|
||||||
|
|
||||||
from synapse.util.logcontext import LoggingContext
|
from synapse.util.logcontext import LoggingContext
|
||||||
|
|
||||||
from twisted.internet import reactor, task
|
from twisted.internet import defer, reactor, task
|
||||||
|
|
||||||
import time
|
import time
|
||||||
|
import logging
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class Clock(object):
|
class Clock(object):
|
||||||
|
@ -53,3 +56,53 @@ class Clock(object):
|
||||||
|
|
||||||
def cancel_call_later(self, timer):
|
def cancel_call_later(self, timer):
|
||||||
timer.cancel()
|
timer.cancel()
|
||||||
|
|
||||||
|
def time_bound_deferred(self, given_deferred, time_out):
|
||||||
|
if given_deferred.called:
|
||||||
|
return given_deferred
|
||||||
|
|
||||||
|
ret_deferred = defer.Deferred()
|
||||||
|
|
||||||
|
def timed_out_fn():
|
||||||
|
try:
|
||||||
|
ret_deferred.errback(RuntimeError("Timed out"))
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
try:
|
||||||
|
given_deferred.cancel()
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
timer = None
|
||||||
|
|
||||||
|
def cancel(res):
|
||||||
|
try:
|
||||||
|
self.cancel_call_later(timer)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
return res
|
||||||
|
|
||||||
|
ret_deferred.addBoth(cancel)
|
||||||
|
|
||||||
|
def sucess(res):
|
||||||
|
try:
|
||||||
|
ret_deferred.callback(res)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return res
|
||||||
|
|
||||||
|
def err(res):
|
||||||
|
try:
|
||||||
|
ret_deferred.errback(res)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return res
|
||||||
|
|
||||||
|
given_deferred.addCallbacks(callback=sucess, errback=err)
|
||||||
|
|
||||||
|
timer = self.call_later(time_out, timed_out_fn)
|
||||||
|
|
||||||
|
return ret_deferred
|
||||||
|
|
Loading…
Reference in a new issue