mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-23 01:55:53 +03:00
Fix TaskStopped exceptions when outbound requests time out (#4690)
This commit is contained in:
parent
8d98dc8ffe
commit
c88bc53903
4 changed files with 30 additions and 18 deletions
1
changelog.d/4690.bugfix
Normal file
1
changelog.d/4690.bugfix
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Fix TaskStopped exceptions in logs when outbound requests time out.
|
|
@ -15,8 +15,10 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
import re
|
import re
|
||||||
|
|
||||||
|
from twisted.internet import task
|
||||||
from twisted.internet.defer import CancelledError
|
from twisted.internet.defer import CancelledError
|
||||||
from twisted.python import failure
|
from twisted.python import failure
|
||||||
|
from twisted.web.client import FileBodyProducer
|
||||||
|
|
||||||
from synapse.api.errors import SynapseError
|
from synapse.api.errors import SynapseError
|
||||||
|
|
||||||
|
@ -47,3 +49,16 @@ def redact_uri(uri):
|
||||||
r'\1<redacted>\3',
|
r'\1<redacted>\3',
|
||||||
uri
|
uri
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class QuieterFileBodyProducer(FileBodyProducer):
|
||||||
|
"""Wrapper for FileBodyProducer that avoids CRITICAL errors when the connection drops.
|
||||||
|
|
||||||
|
Workaround for https://github.com/matrix-org/synapse/issues/4003 /
|
||||||
|
https://twistedmatrix.com/trac/ticket/6528
|
||||||
|
"""
|
||||||
|
def stopProducing(self):
|
||||||
|
try:
|
||||||
|
FileBodyProducer.stopProducing(self)
|
||||||
|
except task.TaskStopped:
|
||||||
|
pass
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
from io import BytesIO
|
||||||
|
|
||||||
from six import text_type
|
from six import text_type
|
||||||
from six.moves import urllib
|
from six.moves import urllib
|
||||||
|
@ -39,7 +40,11 @@ from twisted.web.http import PotentialDataLoss
|
||||||
from twisted.web.http_headers import Headers
|
from twisted.web.http_headers import Headers
|
||||||
|
|
||||||
from synapse.api.errors import Codes, HttpResponseException, SynapseError
|
from synapse.api.errors import Codes, HttpResponseException, SynapseError
|
||||||
from synapse.http import cancelled_to_request_timed_out_error, redact_uri
|
from synapse.http import (
|
||||||
|
QuieterFileBodyProducer,
|
||||||
|
cancelled_to_request_timed_out_error,
|
||||||
|
redact_uri,
|
||||||
|
)
|
||||||
from synapse.util.async_helpers import timeout_deferred
|
from synapse.util.async_helpers import timeout_deferred
|
||||||
from synapse.util.caches import CACHE_SIZE_FACTOR
|
from synapse.util.caches import CACHE_SIZE_FACTOR
|
||||||
from synapse.util.logcontext import make_deferred_yieldable
|
from synapse.util.logcontext import make_deferred_yieldable
|
||||||
|
@ -246,7 +251,7 @@ class SimpleHttpClient(object):
|
||||||
)
|
)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def request(self, method, uri, data=b'', headers=None):
|
def request(self, method, uri, data=None, headers=None):
|
||||||
"""
|
"""
|
||||||
Args:
|
Args:
|
||||||
method (str): HTTP method to use.
|
method (str): HTTP method to use.
|
||||||
|
@ -265,11 +270,15 @@ class SimpleHttpClient(object):
|
||||||
logger.info("Sending request %s %s", method, redact_uri(uri))
|
logger.info("Sending request %s %s", method, redact_uri(uri))
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
body_producer = None
|
||||||
|
if data is not None:
|
||||||
|
body_producer = QuieterFileBodyProducer(BytesIO(data))
|
||||||
|
|
||||||
request_deferred = treq.request(
|
request_deferred = treq.request(
|
||||||
method,
|
method,
|
||||||
uri,
|
uri,
|
||||||
agent=self.agent,
|
agent=self.agent,
|
||||||
data=data,
|
data=body_producer,
|
||||||
headers=headers,
|
headers=headers,
|
||||||
**self._extra_treq_args
|
**self._extra_treq_args
|
||||||
)
|
)
|
||||||
|
|
|
@ -28,11 +28,10 @@ from canonicaljson import encode_canonical_json
|
||||||
from prometheus_client import Counter
|
from prometheus_client import Counter
|
||||||
from signedjson.sign import sign_json
|
from signedjson.sign import sign_json
|
||||||
|
|
||||||
from twisted.internet import defer, protocol, task
|
from twisted.internet import defer, protocol
|
||||||
from twisted.internet.error import DNSLookupError
|
from twisted.internet.error import DNSLookupError
|
||||||
from twisted.internet.task import _EPSILON, Cooperator
|
from twisted.internet.task import _EPSILON, Cooperator
|
||||||
from twisted.web._newclient import ResponseDone
|
from twisted.web._newclient import ResponseDone
|
||||||
from twisted.web.client import FileBodyProducer
|
|
||||||
from twisted.web.http_headers import Headers
|
from twisted.web.http_headers import Headers
|
||||||
|
|
||||||
import synapse.metrics
|
import synapse.metrics
|
||||||
|
@ -44,6 +43,7 @@ from synapse.api.errors import (
|
||||||
RequestSendFailed,
|
RequestSendFailed,
|
||||||
SynapseError,
|
SynapseError,
|
||||||
)
|
)
|
||||||
|
from synapse.http import QuieterFileBodyProducer
|
||||||
from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
|
from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
|
||||||
from synapse.util.async_helpers import timeout_deferred
|
from synapse.util.async_helpers import timeout_deferred
|
||||||
from synapse.util.logcontext import make_deferred_yieldable
|
from synapse.util.logcontext import make_deferred_yieldable
|
||||||
|
@ -839,16 +839,3 @@ def encode_query_args(args):
|
||||||
query_bytes = urllib.parse.urlencode(encoded_args, True)
|
query_bytes = urllib.parse.urlencode(encoded_args, True)
|
||||||
|
|
||||||
return query_bytes.encode('utf8')
|
return query_bytes.encode('utf8')
|
||||||
|
|
||||||
|
|
||||||
class QuieterFileBodyProducer(FileBodyProducer):
|
|
||||||
"""Wrapper for FileBodyProducer that avoids CRITICAL errors when the connection drops.
|
|
||||||
|
|
||||||
Workaround for https://github.com/matrix-org/synapse/issues/4003 /
|
|
||||||
https://twistedmatrix.com/trac/ticket/6528
|
|
||||||
"""
|
|
||||||
def stopProducing(self):
|
|
||||||
try:
|
|
||||||
FileBodyProducer.stopProducing(self)
|
|
||||||
except task.TaskStopped:
|
|
||||||
pass
|
|
||||||
|
|
Loading…
Reference in a new issue