mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-29 07:28:55 +03:00
Fix well-known lookups with the federation certificate whitelist (#5997)
This commit is contained in:
parent
c755955f33
commit
850dcfd2d3
5 changed files with 63 additions and 15 deletions
1
changelog.d/5996.bugfix
Normal file
1
changelog.d/5996.bugfix
Normal file
|
@ -0,0 +1 @@
|
||||||
|
federation_certificate_verification_whitelist now will not cause TypeErrors to be raised (a regression in 1.3). Additionally, it now supports internationalised domain names in their non-canonical representation.
|
|
@ -110,8 +110,15 @@ class TlsConfig(Config):
|
||||||
# Support globs (*) in whitelist values
|
# Support globs (*) in whitelist values
|
||||||
self.federation_certificate_verification_whitelist = []
|
self.federation_certificate_verification_whitelist = []
|
||||||
for entry in fed_whitelist_entries:
|
for entry in fed_whitelist_entries:
|
||||||
|
try:
|
||||||
|
entry_regex = glob_to_regex(entry.encode("ascii").decode("ascii"))
|
||||||
|
except UnicodeEncodeError:
|
||||||
|
raise ConfigError(
|
||||||
|
"IDNA domain names are not allowed in the "
|
||||||
|
"federation_certificate_verification_whitelist: %s" % (entry,)
|
||||||
|
)
|
||||||
|
|
||||||
# Convert globs to regex
|
# Convert globs to regex
|
||||||
entry_regex = glob_to_regex(entry)
|
|
||||||
self.federation_certificate_verification_whitelist.append(entry_regex)
|
self.federation_certificate_verification_whitelist.append(entry_regex)
|
||||||
|
|
||||||
# List of custom certificate authorities for federation traffic validation
|
# List of custom certificate authorities for federation traffic validation
|
||||||
|
|
|
@ -15,7 +15,6 @@
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
import idna
|
|
||||||
from service_identity import VerificationError
|
from service_identity import VerificationError
|
||||||
from service_identity.pyopenssl import verify_hostname, verify_ip_address
|
from service_identity.pyopenssl import verify_hostname, verify_ip_address
|
||||||
from zope.interface import implementer
|
from zope.interface import implementer
|
||||||
|
@ -114,14 +113,20 @@ class ClientTLSOptionsFactory(object):
|
||||||
self._no_verify_ssl_context = self._no_verify_ssl.getContext()
|
self._no_verify_ssl_context = self._no_verify_ssl.getContext()
|
||||||
self._no_verify_ssl_context.set_info_callback(self._context_info_cb)
|
self._no_verify_ssl_context.set_info_callback(self._context_info_cb)
|
||||||
|
|
||||||
def get_options(self, host):
|
def get_options(self, host: bytes):
|
||||||
|
|
||||||
|
# IPolicyForHTTPS.get_options takes bytes, but we want to compare
|
||||||
|
# against the str whitelist. The hostnames in the whitelist are already
|
||||||
|
# IDNA-encoded like the hosts will be here.
|
||||||
|
ascii_host = host.decode("ascii")
|
||||||
|
|
||||||
# Check if certificate verification has been enabled
|
# Check if certificate verification has been enabled
|
||||||
should_verify = self._config.federation_verify_certificates
|
should_verify = self._config.federation_verify_certificates
|
||||||
|
|
||||||
# Check if we've disabled certificate verification for this host
|
# Check if we've disabled certificate verification for this host
|
||||||
if should_verify:
|
if should_verify:
|
||||||
for regex in self._config.federation_certificate_verification_whitelist:
|
for regex in self._config.federation_certificate_verification_whitelist:
|
||||||
if regex.match(host):
|
if regex.match(ascii_host):
|
||||||
should_verify = False
|
should_verify = False
|
||||||
break
|
break
|
||||||
|
|
||||||
|
@ -162,7 +167,7 @@ class SSLClientConnectionCreator(object):
|
||||||
Replaces twisted.internet.ssl.ClientTLSOptions
|
Replaces twisted.internet.ssl.ClientTLSOptions
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, hostname, ctx, verify_certs):
|
def __init__(self, hostname: bytes, ctx, verify_certs: bool):
|
||||||
self._ctx = ctx
|
self._ctx = ctx
|
||||||
self._verifier = ConnectionVerifier(hostname, verify_certs)
|
self._verifier = ConnectionVerifier(hostname, verify_certs)
|
||||||
|
|
||||||
|
@ -190,21 +195,16 @@ class ConnectionVerifier(object):
|
||||||
|
|
||||||
# This code is based on twisted.internet.ssl.ClientTLSOptions.
|
# This code is based on twisted.internet.ssl.ClientTLSOptions.
|
||||||
|
|
||||||
def __init__(self, hostname, verify_certs):
|
def __init__(self, hostname: bytes, verify_certs):
|
||||||
self._verify_certs = verify_certs
|
self._verify_certs = verify_certs
|
||||||
|
|
||||||
if isIPAddress(hostname) or isIPv6Address(hostname):
|
_decoded = hostname.decode("ascii")
|
||||||
self._hostnameBytes = hostname.encode("ascii")
|
if isIPAddress(_decoded) or isIPv6Address(_decoded):
|
||||||
self._is_ip_address = True
|
self._is_ip_address = True
|
||||||
else:
|
else:
|
||||||
# twisted's ClientTLSOptions falls back to the stdlib impl here if
|
|
||||||
# idna is not installed, but points out that lacks support for
|
|
||||||
# IDNA2008 (http://bugs.python.org/issue17305).
|
|
||||||
#
|
|
||||||
# We can rely on having idna.
|
|
||||||
self._hostnameBytes = idna.encode(hostname)
|
|
||||||
self._is_ip_address = False
|
self._is_ip_address = False
|
||||||
|
|
||||||
|
self._hostnameBytes = hostname
|
||||||
self._hostnameASCII = self._hostnameBytes.decode("ascii")
|
self._hostnameASCII = self._hostnameBytes.decode("ascii")
|
||||||
|
|
||||||
def verify_context_info_cb(self, ssl_connection, where):
|
def verify_context_info_cb(self, ssl_connection, where):
|
||||||
|
|
|
@ -217,7 +217,7 @@ class MatrixHostnameEndpoint(object):
|
||||||
self._tls_options = None
|
self._tls_options = None
|
||||||
else:
|
else:
|
||||||
self._tls_options = tls_client_options_factory.get_options(
|
self._tls_options = tls_client_options_factory.get_options(
|
||||||
self._parsed_uri.host.decode("ascii")
|
self._parsed_uri.host
|
||||||
)
|
)
|
||||||
|
|
||||||
self._srv_resolver = srv_resolver
|
self._srv_resolver = srv_resolver
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
|
|
||||||
import os
|
import os
|
||||||
|
|
||||||
|
import idna
|
||||||
import yaml
|
import yaml
|
||||||
|
|
||||||
from OpenSSL import SSL
|
from OpenSSL import SSL
|
||||||
|
@ -235,3 +236,42 @@ s4niecZKPBizL6aucT59CsunNmmb5Glq8rlAcU+1ZTZZzGYqVYhF6axB9Qg=
|
||||||
)
|
)
|
||||||
|
|
||||||
self.assertTrue(conf.acme_enabled)
|
self.assertTrue(conf.acme_enabled)
|
||||||
|
|
||||||
|
def test_whitelist_idna_failure(self):
|
||||||
|
"""
|
||||||
|
The federation certificate whitelist will not allow IDNA domain names.
|
||||||
|
"""
|
||||||
|
config = {
|
||||||
|
"federation_certificate_verification_whitelist": [
|
||||||
|
"example.com",
|
||||||
|
"*.ドメイン.テスト",
|
||||||
|
]
|
||||||
|
}
|
||||||
|
t = TestConfig()
|
||||||
|
e = self.assertRaises(
|
||||||
|
ConfigError, t.read_config, config, config_dir_path="", data_dir_path=""
|
||||||
|
)
|
||||||
|
self.assertIn("IDNA domain names", str(e))
|
||||||
|
|
||||||
|
def test_whitelist_idna_result(self):
|
||||||
|
"""
|
||||||
|
The federation certificate whitelist will match on IDNA encoded names.
|
||||||
|
"""
|
||||||
|
config = {
|
||||||
|
"federation_certificate_verification_whitelist": [
|
||||||
|
"example.com",
|
||||||
|
"*.xn--eckwd4c7c.xn--zckzah",
|
||||||
|
]
|
||||||
|
}
|
||||||
|
t = TestConfig()
|
||||||
|
t.read_config(config, config_dir_path="", data_dir_path="")
|
||||||
|
|
||||||
|
cf = ClientTLSOptionsFactory(t)
|
||||||
|
|
||||||
|
# Not in the whitelist
|
||||||
|
opts = cf.get_options(b"notexample.com")
|
||||||
|
self.assertTrue(opts._verifier._verify_certs)
|
||||||
|
|
||||||
|
# Caught by the wildcard
|
||||||
|
opts = cf.get_options(idna.encode("テスト.ドメイン.テスト"))
|
||||||
|
self.assertFalse(opts._verifier._verify_certs)
|
||||||
|
|
Loading…
Reference in a new issue