SHHS - Room Join Complexity (#5072)

This commit is contained in:
Amber Brown 2019-05-20 17:01:50 -05:00 committed by GitHub
parent d142e51f76
commit c99c105158
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 462 additions and 24 deletions

2
.gitignore vendored
View file

@ -19,6 +19,7 @@ _trial_temp*/
/*.signing.key /*.signing.key
/env/ /env/
/homeserver*.yaml /homeserver*.yaml
/logs
/media_store/ /media_store/
/uploads /uploads
@ -37,4 +38,3 @@ _trial_temp*/
/docs/build/ /docs/build/
/htmlcov /htmlcov
/pip-wheel-metadata/ /pip-wheel-metadata/

1
changelog.d/5072.feature Normal file
View file

@ -0,0 +1 @@
Synapse can now be configured to not join remote rooms of a given "complexity" (currently, state events). This option can be used to prevent adverse performance on resource-constrained homeservers.

View file

@ -271,6 +271,17 @@ listeners:
# Used by phonehome stats to group together related servers. # Used by phonehome stats to group together related servers.
#server_context: context #server_context: context
# Resource-constrained Homeserver Settings
#
# If limit_large_remote_room_joins is True, the room complexity will be
# checked before a user joins a new remote room. If it is above
# limit_large_remote_room_complexity, it will disallow joining or
# instantly leave.
#
# Uncomment the below lines to enable:
#limit_large_remote_room_joins: True
#limit_large_remote_room_complexity: 1.0
# Whether to require a user to be in the room to add an alias to it. # Whether to require a user to be in the room to add an alias to it.
# Defaults to 'true'. # Defaults to 'true'.
# #

View file

@ -26,6 +26,7 @@ CLIENT_API_PREFIX = "/_matrix/client"
FEDERATION_PREFIX = "/_matrix/federation" FEDERATION_PREFIX = "/_matrix/federation"
FEDERATION_V1_PREFIX = FEDERATION_PREFIX + "/v1" FEDERATION_V1_PREFIX = FEDERATION_PREFIX + "/v1"
FEDERATION_V2_PREFIX = FEDERATION_PREFIX + "/v2" FEDERATION_V2_PREFIX = FEDERATION_PREFIX + "/v2"
FEDERATION_UNSTABLE_PREFIX = FEDERATION_PREFIX + "/unstable"
STATIC_PREFIX = "/_matrix/static" STATIC_PREFIX = "/_matrix/static"
WEB_CLIENT_PREFIX = "/_matrix/client" WEB_CLIENT_PREFIX = "/_matrix/client"
CONTENT_REPO_PREFIX = "/_matrix/content" CONTENT_REPO_PREFIX = "/_matrix/content"

View file

@ -221,6 +221,12 @@ class ServerConfig(Config):
self.gc_thresholds = read_gc_thresholds(config.get("gc_thresholds", None)) self.gc_thresholds = read_gc_thresholds(config.get("gc_thresholds", None))
# Resource-constrained Homeserver Configuration
self.limit_large_room_joins = config.get("limit_large_remote_room_joins", False)
self.limit_large_room_complexity = config.get(
"limit_large_remote_room_complexity", 1.0
)
bind_port = config.get("bind_port") bind_port = config.get("bind_port")
if bind_port: if bind_port:
if config.get("no_tls", False): if config.get("no_tls", False):
@ -572,6 +578,17 @@ class ServerConfig(Config):
# Used by phonehome stats to group together related servers. # Used by phonehome stats to group together related servers.
#server_context: context #server_context: context
# Resource-constrained Homeserver Settings
#
# If limit_large_remote_room_joins is True, the room complexity will be
# checked before a user joins a new remote room. If it is above
# limit_large_remote_room_complexity, it will disallow joining or
# instantly leave.
#
# Uncomment the below lines to enable:
#limit_large_remote_room_joins: True
#limit_large_remote_room_complexity: 1.0
# Whether to require a user to be in the room to add an alias to it. # Whether to require a user to be in the room to add an alias to it.
# Defaults to 'true'. # Defaults to 'true'.
# #

View file

@ -992,3 +992,39 @@ class FederationClient(FederationBase):
) )
raise RuntimeError("Failed to send to any server.") raise RuntimeError("Failed to send to any server.")
@defer.inlineCallbacks
def get_room_complexity(self, destination, room_id):
"""
Fetch the complexity of a remote room from another server.
Args:
destination (str): The remote server
room_id (str): The room ID to ask about.
Returns:
Deferred[dict] or Deferred[None]: Dict contains the complexity
metric versions, while None means we could not fetch the complexity.
"""
try:
complexity = yield self.transport_layer.get_room_complexity(
destination=destination,
room_id=room_id
)
defer.returnValue(complexity)
except CodeMessageException as e:
# We didn't manage to get it -- probably a 404. We are okay if other
# servers don't give it to us.
logger.debug(
"Failed to fetch room complexity via %s for %s, got a %d",
destination, room_id, e.code
)
except Exception:
logger.exception(
"Failed to fetch room complexity via %s for %s",
destination, room_id
)
# If we don't manage to find it, return None. It's not an error if a
# server doesn't give it to us.
defer.returnValue(None)

View file

@ -21,7 +21,11 @@ from six.moves import urllib
from twisted.internet import defer from twisted.internet import defer
from synapse.api.constants import Membership from synapse.api.constants import Membership
from synapse.api.urls import FEDERATION_V1_PREFIX, FEDERATION_V2_PREFIX from synapse.api.urls import (
FEDERATION_UNSTABLE_PREFIX,
FEDERATION_V1_PREFIX,
FEDERATION_V2_PREFIX,
)
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -959,6 +963,28 @@ class TransportLayerClient(object):
ignore_backoff=True, ignore_backoff=True,
) )
def get_room_complexity(self, destination, room_id):
"""
Args:
destination (str): The remote server
room_id (str): The room ID to ask about.
"""
path = _create_path(
FEDERATION_UNSTABLE_PREFIX, "/rooms/%s/complexity", room_id
)
return self.client.get_json(
destination=destination,
path=path
)
def _create_path(federation_prefix, path, *args):
"""
Ensures that all args are url encoded.
"""
return federation_prefix + path % tuple(urllib.parse.quote(arg, "") for arg in args)
def _create_v1_path(path, *args): def _create_v1_path(path, *args):
"""Creates a path against V1 federation API from the path template and """Creates a path against V1 federation API from the path template and
@ -975,10 +1001,7 @@ def _create_v1_path(path, *args):
Returns: Returns:
str str
""" """
return ( return _create_path(FEDERATION_V1_PREFIX, path, *args)
FEDERATION_V1_PREFIX
+ path % tuple(urllib.parse.quote(arg, "") for arg in args)
)
def _create_v2_path(path, *args): def _create_v2_path(path, *args):
@ -996,7 +1019,4 @@ def _create_v2_path(path, *args):
Returns: Returns:
str str
""" """
return ( return _create_path(FEDERATION_V2_PREFIX, path, *args)
FEDERATION_V2_PREFIX
+ path % tuple(urllib.parse.quote(arg, "") for arg in args)
)

View file

@ -23,7 +23,11 @@ from twisted.internet import defer
import synapse import synapse
from synapse.api.errors import Codes, FederationDeniedError, SynapseError from synapse.api.errors import Codes, FederationDeniedError, SynapseError
from synapse.api.room_versions import RoomVersions from synapse.api.room_versions import RoomVersions
from synapse.api.urls import FEDERATION_V1_PREFIX, FEDERATION_V2_PREFIX from synapse.api.urls import (
FEDERATION_UNSTABLE_PREFIX,
FEDERATION_V1_PREFIX,
FEDERATION_V2_PREFIX,
)
from synapse.http.endpoint import parse_and_validate_server_name from synapse.http.endpoint import parse_and_validate_server_name
from synapse.http.server import JsonResource from synapse.http.server import JsonResource
from synapse.http.servlet import ( from synapse.http.servlet import (
@ -1304,6 +1308,26 @@ class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet):
defer.returnValue((200, new_content)) defer.returnValue((200, new_content))
class RoomComplexityServlet(BaseFederationServlet):
PATH = "/rooms/(?P<room_id>[^/]*)/complexity"
PREFIX = FEDERATION_UNSTABLE_PREFIX
@defer.inlineCallbacks
def on_GET(self, origin, content, query, room_id):
store = self.handler.hs.get_datastore()
is_public = yield store.is_room_world_readable_or_publicly_joinable(
room_id
)
if not is_public:
raise SynapseError(404, "Room not found", errcode=Codes.INVALID_PARAM)
complexity = yield store.get_room_complexity(room_id)
defer.returnValue((200, complexity))
FEDERATION_SERVLET_CLASSES = ( FEDERATION_SERVLET_CLASSES = (
FederationSendServlet, FederationSendServlet,
FederationEventServlet, FederationEventServlet,
@ -1327,6 +1351,7 @@ FEDERATION_SERVLET_CLASSES = (
FederationThirdPartyInviteExchangeServlet, FederationThirdPartyInviteExchangeServlet,
On3pidBindServlet, On3pidBindServlet,
FederationVersionServlet, FederationVersionServlet,
RoomComplexityServlet,
) )
OPENID_SERVLET_CLASSES = ( OPENID_SERVLET_CLASSES = (

View file

@ -2724,3 +2724,28 @@ class FederationHandler(BaseHandler):
) )
else: else:
return user_joined_room(self.distributor, user, room_id) return user_joined_room(self.distributor, user, room_id)
@defer.inlineCallbacks
def get_room_complexity(self, remote_room_hosts, room_id):
"""
Fetch the complexity of a remote room over federation.
Args:
remote_room_hosts (list[str]): The remote servers to ask.
room_id (str): The room ID to ask about.
Returns:
Deferred[dict] or Deferred[None]: Dict contains the complexity
metric versions, while None means we could not fetch the complexity.
"""
for host in remote_room_hosts:
res = yield self.federation_client.get_room_complexity(host, room_id)
# We got a result, return it.
if res:
defer.returnValue(res)
# We fell off the bottom, couldn't get the complexity from anyone. Oh
# well.
defer.returnValue(None)

View file

@ -26,8 +26,7 @@ from unpaddedbase64 import decode_base64
from twisted.internet import defer from twisted.internet import defer
import synapse.server from synapse import types
import synapse.types
from synapse.api.constants import EventTypes, Membership from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError from synapse.api.errors import AuthError, Codes, SynapseError
from synapse.types import RoomID, UserID from synapse.types import RoomID, UserID
@ -590,7 +589,7 @@ class RoomMemberHandler(object):
) )
assert self.hs.is_mine(sender), "Sender must be our own: %s" % (sender,) assert self.hs.is_mine(sender), "Sender must be our own: %s" % (sender,)
else: else:
requester = synapse.types.create_requester(target_user) requester = types.create_requester(target_user)
prev_event = yield self.event_creation_handler.deduplicate_state_event( prev_event = yield self.event_creation_handler.deduplicate_state_event(
event, context, event, context,
@ -1017,6 +1016,47 @@ class RoomMemberMasterHandler(RoomMemberHandler):
self.distributor.declare("user_joined_room") self.distributor.declare("user_joined_room")
self.distributor.declare("user_left_room") self.distributor.declare("user_left_room")
@defer.inlineCallbacks
def _is_remote_room_too_complex(self, room_id, remote_room_hosts):
"""
Check if complexity of a remote room is too great.
Args:
room_id (str)
remote_room_hosts (list[str])
Returns: bool of whether the complexity is too great, or None
if unable to be fetched
"""
max_complexity = self.hs.config.limit_large_room_complexity
complexity = yield self.federation_handler.get_room_complexity(
remote_room_hosts, room_id
)
if complexity:
if complexity["v1"] > max_complexity:
return True
return False
return None
@defer.inlineCallbacks
def _is_local_room_too_complex(self, room_id):
"""
Check if the complexity of a local room is too great.
Args:
room_id (str)
Returns: bool
"""
max_complexity = self.hs.config.limit_large_room_complexity
complexity = yield self.store.get_room_complexity(room_id)
if complexity["v1"] > max_complexity:
return True
return False
@defer.inlineCallbacks @defer.inlineCallbacks
def _remote_join(self, requester, remote_room_hosts, room_id, user, content): def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
"""Implements RoomMemberHandler._remote_join """Implements RoomMemberHandler._remote_join
@ -1024,7 +1064,6 @@ class RoomMemberMasterHandler(RoomMemberHandler):
# filter ourselves out of remote_room_hosts: do_invite_join ignores it # filter ourselves out of remote_room_hosts: do_invite_join ignores it
# and if it is the only entry we'd like to return a 404 rather than a # and if it is the only entry we'd like to return a 404 rather than a
# 500. # 500.
remote_room_hosts = [ remote_room_hosts = [
host for host in remote_room_hosts if host != self.hs.hostname host for host in remote_room_hosts if host != self.hs.hostname
] ]
@ -1032,6 +1071,18 @@ class RoomMemberMasterHandler(RoomMemberHandler):
if len(remote_room_hosts) == 0: if len(remote_room_hosts) == 0:
raise SynapseError(404, "No known servers") raise SynapseError(404, "No known servers")
if self.hs.config.limit_large_room_joins:
# Fetch the room complexity
too_complex = yield self._is_remote_room_too_complex(
room_id, remote_room_hosts
)
if too_complex is True:
msg = "Room too large (preflight)"
raise SynapseError(
code=400, msg=msg,
errcode=Codes.RESOURCE_LIMIT_EXCEEDED
)
# We don't do an auth check if we are doing an invite # We don't do an auth check if we are doing an invite
# join dance for now, since we're kinda implicitly checking # join dance for now, since we're kinda implicitly checking
# that we are allowed to join when we decide whether or not we # that we are allowed to join when we decide whether or not we
@ -1044,6 +1095,36 @@ class RoomMemberMasterHandler(RoomMemberHandler):
) )
yield self._user_joined_room(user, room_id) yield self._user_joined_room(user, room_id)
# Check the room we just joined wasn't too large, if we didn't fetch the
# complexity of it before.
if self.hs.config.limit_large_room_joins:
if too_complex is False:
# We checked, and we're under the limit.
return
# Check again, but with the local state events
too_complex = yield self._is_local_room_too_complex(room_id)
if too_complex is False:
# We're under the limit.
return
# The room is too large. Leave.
requester = types.create_requester(
user, None, False, None
)
yield self.update_membership(
requester=requester,
target=user,
room_id=room_id,
action="leave"
)
msg = "Room too large (postflight)"
raise SynapseError(
code=400, msg=msg,
errcode=Codes.RESOURCE_LIMIT_EXCEEDED
)
@defer.inlineCallbacks @defer.inlineCallbacks
def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target): def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
"""Implements RoomMemberHandler._remote_reject_invite """Implements RoomMemberHandler._remote_reject_invite

View file

@ -822,10 +822,16 @@ class AdminRestResource(JsonResource):
def __init__(self, hs): def __init__(self, hs):
JsonResource.__init__(self, hs, canonical_json=False) JsonResource.__init__(self, hs, canonical_json=False)
register_servlets(hs, self)
register_servlets_for_client_rest_resource(hs, self)
SendServerNoticeServlet(hs).register(self) def register_servlets(hs, http_server):
VersionServlet(hs).register(self) """
Register all the admin servlets.
"""
register_servlets_for_client_rest_resource(hs, http_server)
SendServerNoticeServlet(hs).register(http_server)
VersionServlet(hs).register(http_server)
def register_servlets_for_client_rest_resource(hs, http_server): def register_servlets_for_client_rest_resource(hs, http_server):

View file

@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from __future__ import division
import itertools import itertools
import logging import logging
from collections import namedtuple from collections import namedtuple
@ -611,3 +613,45 @@ class EventsWorkerStore(SQLBaseStore):
return res return res
return self.runInteraction("get_rejection_reasons", f) return self.runInteraction("get_rejection_reasons", f)
def _get_state_event_counts_txn(self, txn, room_id):
"""
See get_state_event_counts.
"""
sql = "SELECT COUNT(*) FROM current_state_events WHERE room_id=?"
txn.execute(sql, (room_id,))
row = txn.fetchone()
return row[0] if row else 0
def get_state_event_counts(self, room_id):
"""
Gets the total number of state events in a room.
Args:
room_id (str)
Returns:
Deferred[int]
"""
return self.runInteraction(
"get_state_event_counts", self._get_state_event_counts_txn, room_id
)
@defer.inlineCallbacks
def get_room_complexity(self, room_id):
"""
Get the complexity of a room.
Args:
room_id (str)
Returns:
Deferred[dict[str:int]] of complexity version to complexity.
"""
state_events = yield self.get_state_event_counts(room_id)
# Call this one "v1", so we can introduce new ones as we want to develop
# it.
complexity_v1 = round(state_events / 500, 2)
defer.returnValue({"v1": complexity_v1})

View file

@ -0,0 +1,160 @@
# -*- coding: utf-8 -*-
# Copyright 2019 Matrix.org Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mock import Mock
from twisted.internet import defer
from synapse.api.errors import Codes, SynapseError
from synapse.config.ratelimiting import FederationRateLimitConfig
from synapse.federation.transport import server
from synapse.rest import admin
from synapse.rest.client.v1 import login, room
from synapse.types import UserID
from synapse.util.ratelimitutils import FederationRateLimiter
from tests import unittest
class RoomComplexityTests(unittest.HomeserverTestCase):
servlets = [
admin.register_servlets,
room.register_servlets,
login.register_servlets,
]
def default_config(self, name='test'):
config = super().default_config(name=name)
config["limit_large_remote_room_joins"] = True
config["limit_large_remote_room_complexity"] = 0.05
return config
def prepare(self, reactor, clock, homeserver):
class Authenticator(object):
def authenticate_request(self, request, content):
return defer.succeed("otherserver.nottld")
ratelimiter = FederationRateLimiter(
clock,
FederationRateLimitConfig(
window_size=1,
sleep_limit=1,
sleep_msec=1,
reject_limit=1000,
concurrent_requests=1000,
),
)
server.register_servlets(
homeserver, self.resource, Authenticator(), ratelimiter
)
def test_complexity_simple(self):
u1 = self.register_user("u1", "pass")
u1_token = self.login("u1", "pass")
room_1 = self.helper.create_room_as(u1, tok=u1_token)
self.helper.send_state(
room_1, event_type="m.room.topic", body={"topic": "foo"}, tok=u1_token
)
# Get the room complexity
request, channel = self.make_request(
"GET", "/_matrix/federation/unstable/rooms/%s/complexity" % (room_1,)
)
self.render(request)
self.assertEquals(200, channel.code)
complexity = channel.json_body["v1"]
self.assertTrue(complexity > 0, complexity)
# Artificially raise the complexity
store = self.hs.get_datastore()
store.get_state_event_counts = lambda x: defer.succeed(500 * 1.23)
# Get the room complexity again -- make sure it's our artificial value
request, channel = self.make_request(
"GET", "/_matrix/federation/unstable/rooms/%s/complexity" % (room_1,)
)
self.render(request)
self.assertEquals(200, channel.code)
complexity = channel.json_body["v1"]
self.assertEqual(complexity, 1.23)
def test_join_too_large(self):
u1 = self.register_user("u1", "pass")
handler = self.hs.get_room_member_handler()
fed_transport = self.hs.get_federation_transport_client()
# Mock out some things, because we don't want to test the whole join
fed_transport.client.get_json = Mock(return_value=defer.succeed({"v1": 9999}))
handler.federation_handler.do_invite_join = Mock(return_value=defer.succeed(1))
d = handler._remote_join(
None,
["otherserver.example"],
"roomid",
UserID.from_string(u1),
{"membership": "join"},
)
self.pump()
# The request failed with a SynapseError saying the resource limit was
# exceeded.
f = self.get_failure(d, SynapseError)
self.assertEqual(f.value.code, 400, f.value)
self.assertEqual(f.value.errcode, Codes.RESOURCE_LIMIT_EXCEEDED)
def test_join_too_large_once_joined(self):
u1 = self.register_user("u1", "pass")
u1_token = self.login("u1", "pass")
# Ok, this might seem a bit weird -- I want to test that we actually
# leave the room, but I don't want to simulate two servers. So, we make
# a local room, which we say we're joining remotely, even if there's no
# remote, because we mock that out. Then, we'll leave the (actually
# local) room, which will be propagated over federation in a real
# scenario.
room_1 = self.helper.create_room_as(u1, tok=u1_token)
handler = self.hs.get_room_member_handler()
fed_transport = self.hs.get_federation_transport_client()
# Mock out some things, because we don't want to test the whole join
fed_transport.client.get_json = Mock(return_value=defer.succeed(None))
handler.federation_handler.do_invite_join = Mock(return_value=defer.succeed(1))
# Artificially raise the complexity
self.hs.get_datastore().get_state_event_counts = lambda x: defer.succeed(600)
d = handler._remote_join(
None,
["otherserver.example"],
room_1,
UserID.from_string(u1),
{"membership": "join"},
)
self.pump()
# The request failed with a SynapseError saying the resource limit was
# exceeded.
f = self.get_failure(d, SynapseError)
self.assertEqual(f.value.code, 400)
self.assertEqual(f.value.errcode, Codes.RESOURCE_LIMIT_EXCEEDED)

View file

@ -127,3 +127,20 @@ class RestHelper(object):
) )
return channel.json_body return channel.json_body
def send_state(self, room_id, event_type, body, tok, expect_code=200):
path = "/_matrix/client/r0/rooms/%s/state/%s" % (room_id, event_type)
if tok:
path = path + "?access_token=%s" % tok
request, channel = make_request(
self.hs.get_reactor(), "PUT", path, json.dumps(body).encode('utf8')
)
render(request, self.resource, self.hs.get_reactor())
assert int(channel.result["code"]) == expect_code, (
"Expected: %d, got: %d, resp: %r"
% (expect_code, int(channel.result["code"]), channel.result["body"])
)
return channel.json_body

View file

@ -22,8 +22,6 @@ from mock import Mock
from canonicaljson import json from canonicaljson import json
import twisted
import twisted.logger
from twisted.internet.defer import Deferred from twisted.internet.defer import Deferred
from twisted.trial import unittest from twisted.trial import unittest
@ -77,10 +75,6 @@ class TestCase(unittest.TestCase):
@around(self) @around(self)
def setUp(orig): def setUp(orig):
# enable debugging of delayed calls - this means that we get a
# traceback when a unit test exits leaving things on the reactor.
twisted.internet.base.DelayedCall.debug = True
# if we're not starting in the sentinel logcontext, then to be honest # if we're not starting in the sentinel logcontext, then to be honest
# all future bets are off. # all future bets are off.
if LoggingContext.current_context() is not LoggingContext.sentinel: if LoggingContext.current_context() is not LoggingContext.sentinel: