mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-22 17:46:08 +03:00
23740eaa3d
During the migration the automated script to update the copyright headers accidentally got rid of some of the existing copyright lines. Reinstate them.
295 lines
9.5 KiB
Python
295 lines
9.5 KiB
Python
#
|
|
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
|
#
|
|
# Copyright 2020 The Matrix.org Foundation C.I.C.
|
|
# Copyright (C) 2023 New Vector, Ltd
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as
|
|
# published by the Free Software Foundation, either version 3 of the
|
|
# License, or (at your option) any later version.
|
|
#
|
|
# See the GNU Affero General Public License for more details:
|
|
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
|
#
|
|
# Originally licensed under the Apache License, Version 2.0:
|
|
# <http://www.apache.org/licenses/LICENSE-2.0>.
|
|
#
|
|
# [This file includes modifications made by New Vector Limited]
|
|
#
|
|
#
|
|
from http import HTTPStatus
|
|
|
|
from twisted.test.proto_helpers import MemoryReactor
|
|
|
|
from synapse.api.errors import Codes
|
|
from synapse.events.utils import CANONICALJSON_MAX_INT, CANONICALJSON_MIN_INT
|
|
from synapse.rest import admin
|
|
from synapse.rest.client import login, room, sync
|
|
from synapse.server import HomeServer
|
|
from synapse.util import Clock
|
|
|
|
from tests.unittest import HomeserverTestCase
|
|
|
|
|
|
class PowerLevelsTestCase(HomeserverTestCase):
    """Tests that power levels are enforced in various situations"""

    servlets = [
        admin.register_servlets,
        room.register_servlets,
        login.register_servlets,
        sync.register_servlets,
    ]

    def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
        """Set up the test homeserver using the default test configuration."""
        config = self.default_config()

        return self.setup_test_homeserver(config=config)

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        """Create the users and the room shared by every test in this class.

        Registers and logs in "admin", "mod" and "user", has "admin" create a
        room and invite the other two, has both of them join, and finally
        sets "mod" to power level 50 in that room.
        """
        # register a room admin, moderator and regular user
        self.admin_user_id = self.register_user("admin", "pass")
        self.admin_access_token = self.login("admin", "pass")
        self.mod_user_id = self.register_user("mod", "pass")
        self.mod_access_token = self.login("mod", "pass")
        self.user_user_id = self.register_user("user", "pass")
        self.user_access_token = self.login("user", "pass")

        # Create a room
        self.room_id = self.helper.create_room_as(
            self.admin_user_id, tok=self.admin_access_token
        )

        # Invite the other users
        self.helper.invite(
            room=self.room_id,
            src=self.admin_user_id,
            tok=self.admin_access_token,
            targ=self.mod_user_id,
        )
        self.helper.invite(
            room=self.room_id,
            src=self.admin_user_id,
            tok=self.admin_access_token,
            targ=self.user_user_id,
        )

        # Make the other users join the room
        self.helper.join(
            room=self.room_id, user=self.mod_user_id, tok=self.mod_access_token
        )
        self.helper.join(
            room=self.room_id, user=self.user_user_id, tok=self.user_access_token
        )

        # Mod the mod
        room_power_levels = self.helper.get_state(
            self.room_id,
            "m.room.power_levels",
            tok=self.admin_access_token,
        )

        # Update existing power levels with mod at PL50
        room_power_levels["users"].update({self.mod_user_id: 50})

        self.helper.send_state(
            self.room_id,
            "m.room.power_levels",
            room_power_levels,
            tok=self.admin_access_token,
        )

    def _assert_user_power_level_rejected(self, power_level: object) -> None:
        """Check that the admin cannot give the regular user `power_level`.

        Fetches the room's current power levels, replaces the regular user's
        entry with `power_level`, sends the result back as the admin and
        asserts that the request fails with 400 and `Codes.BAD_JSON`.

        Args:
            power_level: the (invalid) value to put in the `users` map for
                the regular user.
        """
        room_power_levels = self.helper.get_state(
            self.room_id,
            "m.room.power_levels",
            tok=self.admin_access_token,
        )

        room_power_levels["users"].update({self.user_user_id: power_level})

        body = self.helper.send_state(
            self.room_id,
            "m.room.power_levels",
            room_power_levels,
            tok=self.admin_access_token,
            expect_code=HTTPStatus.BAD_REQUEST,  # expect failure
        )

        # Pass the whole body as the failure message to ease debugging.
        self.assertEqual(
            body["errcode"],
            Codes.BAD_JSON,
            body,
        )

    def test_non_admins_cannot_enable_room_encryption(self) -> None:
        """Neither the mod nor the regular user may send m.room.encryption."""
        # have the mod try to enable room encryption
        self.helper.send_state(
            self.room_id,
            "m.room.encryption",
            {"algorithm": "m.megolm.v1.aes-sha2"},
            tok=self.mod_access_token,
            expect_code=HTTPStatus.FORBIDDEN,  # expect failure
        )

        # have the user try to enable room encryption
        self.helper.send_state(
            self.room_id,
            "m.room.encryption",
            {"algorithm": "m.megolm.v1.aes-sha2"},
            tok=self.user_access_token,
            expect_code=HTTPStatus.FORBIDDEN,  # expect failure
        )

    def test_non_admins_cannot_send_server_acl(self) -> None:
        """Neither the mod nor the regular user may send m.room.server_acl."""
        # have the mod try to send a server ACL
        self.helper.send_state(
            self.room_id,
            "m.room.server_acl",
            {
                "allow": ["*"],
                "allow_ip_literals": False,
                "deny": ["*.evil.com", "evil.com"],
            },
            tok=self.mod_access_token,
            expect_code=HTTPStatus.FORBIDDEN,  # expect failure
        )

        # have the user try to send a server ACL
        self.helper.send_state(
            self.room_id,
            "m.room.server_acl",
            {
                "allow": ["*"],
                "allow_ip_literals": False,
                "deny": ["*.evil.com", "evil.com"],
            },
            tok=self.user_access_token,
            expect_code=HTTPStatus.FORBIDDEN,  # expect failure
        )

    def test_non_admins_cannot_tombstone_room(self) -> None:
        """Neither the mod nor the regular user may send m.room.tombstone."""
        # Create another room that will serve as our "upgraded room"
        upgraded_room_id = self.helper.create_room_as(
            self.admin_user_id, tok=self.admin_access_token
        )

        # have the mod try to send a tombstone event
        self.helper.send_state(
            self.room_id,
            "m.room.tombstone",
            {
                "body": "This room has been replaced",
                "replacement_room": upgraded_room_id,
            },
            tok=self.mod_access_token,
            expect_code=HTTPStatus.FORBIDDEN,  # expect failure
        )

        # have the user try to send a tombstone event
        self.helper.send_state(
            self.room_id,
            "m.room.tombstone",
            {
                "body": "This room has been replaced",
                "replacement_room": upgraded_room_id,
            },
            tok=self.user_access_token,
            expect_code=HTTPStatus.FORBIDDEN,  # expect failure
        )

    def test_admins_can_enable_room_encryption(self) -> None:
        """The room admin may send m.room.encryption."""
        # have the admin try to enable room encryption
        self.helper.send_state(
            self.room_id,
            "m.room.encryption",
            {"algorithm": "m.megolm.v1.aes-sha2"},
            tok=self.admin_access_token,
            expect_code=HTTPStatus.OK,  # expect success
        )

    def test_admins_can_send_server_acl(self) -> None:
        """The room admin may send m.room.server_acl."""
        # have the admin try to send a server ACL
        self.helper.send_state(
            self.room_id,
            "m.room.server_acl",
            {
                "allow": ["*"],
                "allow_ip_literals": False,
                "deny": ["*.evil.com", "evil.com"],
            },
            tok=self.admin_access_token,
            expect_code=HTTPStatus.OK,  # expect success
        )

    def test_admins_can_tombstone_room(self) -> None:
        """The room admin may send m.room.tombstone."""
        # Create another room that will serve as our "upgraded room"
        upgraded_room_id = self.helper.create_room_as(
            self.admin_user_id, tok=self.admin_access_token
        )

        # have the admin try to send a tombstone event
        self.helper.send_state(
            self.room_id,
            "m.room.tombstone",
            {
                "body": "This room has been replaced",
                "replacement_room": upgraded_room_id,
            },
            tok=self.admin_access_token,
            expect_code=HTTPStatus.OK,  # expect success
        )

    def test_cannot_set_string_power_levels(self) -> None:
        """A power level given as a string is rejected."""
        # Update existing power levels with user at PL "0"
        self._assert_user_power_level_rejected("0")

    def test_cannot_set_unsafe_large_power_levels(self) -> None:
        """A power level above CANONICALJSON_MAX_INT is rejected."""
        # Update existing power levels with user at PL above the max safe integer
        self._assert_user_power_level_rejected(CANONICALJSON_MAX_INT + 1)

    def test_cannot_set_unsafe_small_power_levels(self) -> None:
        """A power level below CANONICALJSON_MIN_INT is rejected."""
        # Update existing power levels with user at PL below the minimum safe integer
        self._assert_user_power_level_rejected(CANONICALJSON_MIN_INT - 1)
|