synapse/synapse/http/proxy.py
Erik Johnston 3a30846bd0
Fix mypy on latest Twisted release (#17036)
`ITransport.abortConnection` isn't a thing, but
`HTTPChannel.forceAbortClient` calls it, so lets just use that

Fixes https://github.com/element-hq/synapse/issues/16728
2024-04-11 16:03:45 +01:00

290 lines
10 KiB
Python

#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright 2023 The Matrix.org Foundation C.I.C.
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#
import json
import logging
import urllib.parse
from typing import TYPE_CHECKING, Any, Optional, Set, Tuple, cast
from twisted.internet import protocol
from twisted.internet.interfaces import ITCPTransport
from twisted.internet.protocol import connectionDone
from twisted.python import failure
from twisted.python.failure import Failure
from twisted.web.client import ResponseDone
from twisted.web.http_headers import Headers
from twisted.web.iweb import IResponse
from twisted.web.resource import IResource
from twisted.web.server import Request, Site
from synapse.api.errors import Codes, InvalidProxyCredentialsError
from synapse.http import QuieterFileBodyProducer
from synapse.http.server import _AsyncResource
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.types import ISynapseReactor
from synapse.util.async_helpers import timeout_deferred
if TYPE_CHECKING:
from synapse.http.site import SynapseRequest
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
# "Hop-by-hop" headers (as opposed to "end-to-end" headers) as defined by RFC2616
# section 13.5.1 and referenced in RFC9110 section 7.6.1. These are meant to only be
# consumed by the immediate recipient and not be forwarded on.
HOP_BY_HOP_HEADERS = {
"Connection",
"Keep-Alive",
"Proxy-Authenticate",
"Proxy-Authorization",
"TE",
"Trailers",
"Transfer-Encoding",
"Upgrade",
}
def parse_connection_header_value(
connection_header_value: Optional[bytes],
) -> Set[str]:
"""
Parse the `Connection` header to determine which headers we should not be copied
over from the remote response.
As defined by RFC2616 section 14.10 and RFC9110 section 7.6.1
Example: `Connection: close, X-Foo, X-Bar` will return `{"Close", "X-Foo", "X-Bar"}`
Even though "close" is a special directive, let's just treat it as just another
header for simplicity. If people want to check for this directive, they can simply
check for `"Close" in headers`.
Args:
connection_header_value: The value of the `Connection` header.
Returns:
The set of header names that should not be copied over from the remote response.
The keys are capitalized in canonical capitalization.
"""
headers = Headers()
extra_headers_to_remove: Set[str] = set()
if connection_header_value:
extra_headers_to_remove = {
headers._canonicalNameCaps(connection_option.strip()).decode("ascii")
for connection_option in connection_header_value.split(b",")
}
return extra_headers_to_remove
class ProxyResource(_AsyncResource):
"""
A stub resource that proxies any requests with a `matrix-federation://` scheme
through the given `federation_agent` to the remote homeserver and ferries back the
info.
"""
isLeaf = True
def __init__(self, reactor: ISynapseReactor, hs: "HomeServer"):
super().__init__(True)
self.reactor = reactor
self.agent = hs.get_federation_http_client().agent
self._proxy_authorization_secret = hs.config.worker.worker_replication_secret
def _check_auth(self, request: Request) -> None:
# The `matrix-federation://` proxy functionality can only be used with auth.
# Protect homserver admins forgetting to configure a secret.
assert self._proxy_authorization_secret is not None
# Get the authorization header.
auth_headers = request.requestHeaders.getRawHeaders(b"Proxy-Authorization")
if not auth_headers:
raise InvalidProxyCredentialsError(
"Missing Proxy-Authorization header.", Codes.MISSING_TOKEN
)
if len(auth_headers) > 1:
raise InvalidProxyCredentialsError(
"Too many Proxy-Authorization headers.", Codes.UNAUTHORIZED
)
parts = auth_headers[0].split(b" ")
if parts[0] == b"Bearer" and len(parts) == 2:
received_secret = parts[1].decode("ascii")
if self._proxy_authorization_secret == received_secret:
# Success!
return
raise InvalidProxyCredentialsError(
"Invalid Proxy-Authorization header.", Codes.UNAUTHORIZED
)
async def _async_render(self, request: "SynapseRequest") -> Tuple[int, Any]:
uri = urllib.parse.urlparse(request.uri)
assert uri.scheme == b"matrix-federation"
# Check the authorization headers before handling the request.
self._check_auth(request)
headers = Headers()
for header_name in (b"User-Agent", b"Authorization", b"Content-Type"):
header_value = request.getHeader(header_name)
if header_value:
headers.addRawHeader(header_name, header_value)
request_deferred = run_in_background(
self.agent.request,
request.method,
request.uri,
headers=headers,
bodyProducer=QuieterFileBodyProducer(request.content),
)
request_deferred = timeout_deferred(
request_deferred,
# This should be set longer than the timeout in `MatrixFederationHttpClient`
# so that it has enough time to complete and pass us the data before we give
# up.
timeout=90,
reactor=self.reactor,
)
response = await make_deferred_yieldable(request_deferred)
return response.code, response
def _send_response(
self,
request: "SynapseRequest",
code: int,
response_object: Any,
) -> None:
response = cast(IResponse, response_object)
response_headers = cast(Headers, response.headers)
request.setResponseCode(code)
# The `Connection` header also defines which headers should not be copied over.
connection_header = response_headers.getRawHeaders(b"connection")
extra_headers_to_remove = parse_connection_header_value(
connection_header[0] if connection_header else None
)
# Copy headers.
for k, v in response_headers.getAllRawHeaders():
# Do not copy over any hop-by-hop headers. These are meant to only be
# consumed by the immediate recipient and not be forwarded on.
header_key = k.decode("ascii")
if (
header_key in HOP_BY_HOP_HEADERS
or header_key in extra_headers_to_remove
):
continue
request.responseHeaders.setRawHeaders(k, v)
response.deliverBody(_ProxyResponseBody(request))
def _send_error_response(
self,
f: failure.Failure,
request: "SynapseRequest",
) -> None:
if isinstance(f.value, InvalidProxyCredentialsError):
error_response_code = f.value.code
error_response_json = {"errcode": f.value.errcode, "err": f.value.msg}
else:
error_response_code = 502
error_response_json = {
"errcode": Codes.UNKNOWN,
"err": "ProxyResource: Error when proxying request: %s %s -> %s"
% (
request.method.decode("ascii"),
request.uri.decode("ascii"),
f,
),
}
request.setResponseCode(error_response_code)
request.setHeader(b"Content-Type", b"application/json")
request.write((json.dumps(error_response_json)).encode())
request.finish()
class _ProxyResponseBody(protocol.Protocol):
"""
A protocol that proxies the given remote response data back out to the given local
request.
"""
transport: Optional[ITCPTransport] = None
def __init__(self, request: "SynapseRequest") -> None:
self._request = request
def dataReceived(self, data: bytes) -> None:
# Avoid sending response data to the local request that already disconnected
if self._request._disconnected and self.transport is not None:
# Close the connection (forcefully) since all the data will get
# discarded anyway.
self.transport.abortConnection()
return
self._request.write(data)
def connectionLost(self, reason: Failure = connectionDone) -> None:
# If the local request is already finished (successfully or failed), don't
# worry about sending anything back.
if self._request.finished:
return
if reason.check(ResponseDone):
self._request.finish()
else:
# Abort the underlying request since our remote request also failed.
if self._request.channel:
self._request.channel.forceAbortClient()
class ProxySite(Site):
"""
Proxies any requests with a `matrix-federation://` scheme through the given
`federation_agent`. Otherwise, behaves like a normal `Site`.
"""
def __init__(
self,
resource: IResource,
reactor: ISynapseReactor,
hs: "HomeServer",
):
super().__init__(resource, reactor=reactor)
self._proxy_resource = ProxyResource(reactor, hs=hs)
def getResourceFor(self, request: "SynapseRequest") -> IResource:
uri = urllib.parse.urlparse(request.uri)
if uri.scheme == b"matrix-federation":
return self._proxy_resource
return super().getResourceFor(request)