mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-22 17:46:08 +03:00
876 lines
30 KiB
Python
876 lines
30 KiB
Python
#
|
|
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
|
#
|
|
# Copyright 2021 The Matrix.org Foundation C.I.C.
|
|
# Copyright 2020 Dirk Klimpel
|
|
# Copyright (C) 2023 New Vector, Ltd
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as
|
|
# published by the Free Software Foundation, either version 3 of the
|
|
# License, or (at your option) any later version.
|
|
#
|
|
# See the GNU Affero General Public License for more details:
|
|
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
|
#
|
|
# Originally licensed under the Apache License, Version 2.0:
|
|
# <http://www.apache.org/licenses/LICENSE-2.0>.
|
|
#
|
|
# [This file includes modifications made by New Vector Limited]
|
|
#
|
|
#
|
|
import os
|
|
from typing import Dict
|
|
|
|
from parameterized import parameterized
|
|
|
|
from twisted.test.proto_helpers import MemoryReactor
|
|
from twisted.web.resource import Resource
|
|
|
|
import synapse.rest.admin
|
|
from synapse.api.errors import Codes
|
|
from synapse.media.filepath import MediaFilePaths
|
|
from synapse.rest.client import login, profile, room
|
|
from synapse.server import HomeServer
|
|
from synapse.util import Clock
|
|
|
|
from tests import unittest
|
|
from tests.test_utils import SMALL_PNG
|
|
|
|
VALID_TIMESTAMP = 1609459200000 # 2021-01-01 in milliseconds
|
|
INVALID_TIMESTAMP_IN_S = 1893456000 # 2030-01-01 in seconds
|
|
|
|
|
|
class _AdminMediaTests(unittest.HomeserverTestCase):
|
|
servlets = [
|
|
synapse.rest.admin.register_servlets,
|
|
synapse.rest.admin.register_servlets_for_media_repo,
|
|
login.register_servlets,
|
|
]
|
|
|
|
def create_resource_dict(self) -> Dict[str, Resource]:
|
|
resources = super().create_resource_dict()
|
|
resources["/_matrix/media"] = self.hs.get_media_repository_resource()
|
|
return resources
|
|
|
|
|
|
class DeleteMediaByIDTestCase(_AdminMediaTests):
|
|
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
|
self.server_name = hs.hostname
|
|
|
|
self.admin_user = self.register_user("admin", "pass", admin=True)
|
|
self.admin_user_tok = self.login("admin", "pass")
|
|
|
|
self.filepaths = MediaFilePaths(hs.config.media.media_store_path)
|
|
|
|
def test_no_auth(self) -> None:
|
|
"""
|
|
Try to delete media without authentication.
|
|
"""
|
|
url = "/_synapse/admin/v1/media/%s/%s" % (self.server_name, "12345")
|
|
|
|
channel = self.make_request("DELETE", url, b"{}")
|
|
|
|
self.assertEqual(
|
|
401,
|
|
channel.code,
|
|
msg=channel.json_body,
|
|
)
|
|
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
|
|
|
|
def test_requester_is_no_admin(self) -> None:
|
|
"""
|
|
If the user is not a server admin, an error is returned.
|
|
"""
|
|
self.other_user = self.register_user("user", "pass")
|
|
self.other_user_token = self.login("user", "pass")
|
|
|
|
url = "/_synapse/admin/v1/media/%s/%s" % (self.server_name, "12345")
|
|
|
|
channel = self.make_request(
|
|
"DELETE",
|
|
url,
|
|
access_token=self.other_user_token,
|
|
)
|
|
|
|
self.assertEqual(403, channel.code, msg=channel.json_body)
|
|
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
|
|
|
|
def test_media_does_not_exist(self) -> None:
|
|
"""
|
|
Tests that a lookup for a media that does not exist returns a 404
|
|
"""
|
|
url = "/_synapse/admin/v1/media/%s/%s" % (self.server_name, "12345")
|
|
|
|
channel = self.make_request(
|
|
"DELETE",
|
|
url,
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
|
|
self.assertEqual(404, channel.code, msg=channel.json_body)
|
|
self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
|
|
|
|
def test_media_is_not_local(self) -> None:
|
|
"""
|
|
Tests that a lookup for a media that is not a local returns a 400
|
|
"""
|
|
url = "/_synapse/admin/v1/media/%s/%s" % ("unknown_domain", "12345")
|
|
|
|
channel = self.make_request(
|
|
"DELETE",
|
|
url,
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
|
|
self.assertEqual(400, channel.code, msg=channel.json_body)
|
|
self.assertEqual("Can only delete local media", channel.json_body["error"])
|
|
|
|
def test_delete_media(self) -> None:
|
|
"""
|
|
Tests that delete a media is successfully
|
|
"""
|
|
|
|
# Upload some media into the room
|
|
response = self.helper.upload_media(
|
|
SMALL_PNG,
|
|
tok=self.admin_user_tok,
|
|
expect_code=200,
|
|
)
|
|
# Extract media ID from the response
|
|
server_and_media_id = response["content_uri"][6:] # Cut off 'mxc://'
|
|
server_name, media_id = server_and_media_id.split("/")
|
|
|
|
self.assertEqual(server_name, self.server_name)
|
|
|
|
# Attempt to access media
|
|
channel = self.make_request(
|
|
"GET",
|
|
f"/_matrix/media/v3/download/{server_and_media_id}",
|
|
shorthand=False,
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
|
|
# Should be successful
|
|
self.assertEqual(
|
|
200,
|
|
channel.code,
|
|
msg=(
|
|
"Expected to receive a 200 on accessing media: %s" % server_and_media_id
|
|
),
|
|
)
|
|
|
|
# Test if the file exists
|
|
local_path = self.filepaths.local_media_filepath(media_id)
|
|
self.assertTrue(os.path.exists(local_path))
|
|
|
|
url = "/_synapse/admin/v1/media/%s/%s" % (self.server_name, media_id)
|
|
|
|
# Delete media
|
|
channel = self.make_request(
|
|
"DELETE",
|
|
url,
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
|
|
self.assertEqual(200, channel.code, msg=channel.json_body)
|
|
self.assertEqual(1, channel.json_body["total"])
|
|
self.assertEqual(
|
|
media_id,
|
|
channel.json_body["deleted_media"][0],
|
|
)
|
|
|
|
# Attempt to access media
|
|
channel = self.make_request(
|
|
"GET",
|
|
f"/_matrix/media/v3/download/{server_and_media_id}",
|
|
shorthand=False,
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
self.assertEqual(
|
|
404,
|
|
channel.code,
|
|
msg=(
|
|
"Expected to receive a 404 on accessing deleted media: %s"
|
|
% server_and_media_id
|
|
),
|
|
)
|
|
|
|
# Test if the file is deleted
|
|
self.assertFalse(os.path.exists(local_path))
|
|
|
|
|
|
class DeleteMediaByDateSizeTestCase(_AdminMediaTests):
|
|
servlets = [
|
|
synapse.rest.admin.register_servlets,
|
|
synapse.rest.admin.register_servlets_for_media_repo,
|
|
login.register_servlets,
|
|
profile.register_servlets,
|
|
room.register_servlets,
|
|
]
|
|
|
|
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
|
self.media_repo = hs.get_media_repository_resource()
|
|
self.server_name = hs.hostname
|
|
|
|
self.admin_user = self.register_user("admin", "pass", admin=True)
|
|
self.admin_user_tok = self.login("admin", "pass")
|
|
|
|
self.filepaths = MediaFilePaths(hs.config.media.media_store_path)
|
|
self.url = "/_synapse/admin/v1/media/delete"
|
|
self.legacy_url = "/_synapse/admin/v1/media/%s/delete" % self.server_name
|
|
|
|
# Move clock up to somewhat realistic time
|
|
self.reactor.advance(1000000000)
|
|
|
|
def test_no_auth(self) -> None:
|
|
"""
|
|
Try to delete media without authentication.
|
|
"""
|
|
|
|
channel = self.make_request("POST", self.url, b"{}")
|
|
|
|
self.assertEqual(401, channel.code, msg=channel.json_body)
|
|
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
|
|
|
|
def test_requester_is_no_admin(self) -> None:
|
|
"""
|
|
If the user is not a server admin, an error is returned.
|
|
"""
|
|
self.other_user = self.register_user("user", "pass")
|
|
self.other_user_token = self.login("user", "pass")
|
|
|
|
channel = self.make_request(
|
|
"POST",
|
|
self.url,
|
|
access_token=self.other_user_token,
|
|
)
|
|
|
|
self.assertEqual(403, channel.code, msg=channel.json_body)
|
|
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
|
|
|
|
def test_media_is_not_local(self) -> None:
|
|
"""
|
|
Tests that a lookup for media that is not local returns a 400
|
|
"""
|
|
url = "/_synapse/admin/v1/media/%s/delete" % "unknown_domain"
|
|
|
|
channel = self.make_request(
|
|
"POST",
|
|
url + f"?before_ts={VALID_TIMESTAMP}",
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
|
|
self.assertEqual(400, channel.code, msg=channel.json_body)
|
|
self.assertEqual("Can only delete local media", channel.json_body["error"])
|
|
|
|
def test_missing_parameter(self) -> None:
|
|
"""
|
|
If the parameter `before_ts` is missing, an error is returned.
|
|
"""
|
|
channel = self.make_request(
|
|
"POST",
|
|
self.url,
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
|
|
self.assertEqual(400, channel.code, msg=channel.json_body)
|
|
self.assertEqual(Codes.MISSING_PARAM, channel.json_body["errcode"])
|
|
self.assertEqual(
|
|
"Missing required integer query parameter before_ts",
|
|
channel.json_body["error"],
|
|
)
|
|
|
|
def test_invalid_parameter(self) -> None:
|
|
"""
|
|
If parameters are invalid, an error is returned.
|
|
"""
|
|
channel = self.make_request(
|
|
"POST",
|
|
self.url + "?before_ts=-1234",
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
|
|
self.assertEqual(400, channel.code, msg=channel.json_body)
|
|
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
|
|
self.assertEqual(
|
|
"Query parameter before_ts must be a positive integer.",
|
|
channel.json_body["error"],
|
|
)
|
|
|
|
channel = self.make_request(
|
|
"POST",
|
|
self.url + f"?before_ts={INVALID_TIMESTAMP_IN_S}",
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
|
|
self.assertEqual(400, channel.code, msg=channel.json_body)
|
|
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
|
|
self.assertEqual(
|
|
"Query parameter before_ts you provided is from the year 1970. "
|
|
+ "Double check that you are providing a timestamp in milliseconds.",
|
|
channel.json_body["error"],
|
|
)
|
|
|
|
channel = self.make_request(
|
|
"POST",
|
|
self.url + f"?before_ts={VALID_TIMESTAMP}&size_gt=-1234",
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
|
|
self.assertEqual(400, channel.code, msg=channel.json_body)
|
|
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
|
|
self.assertEqual(
|
|
"Query parameter size_gt must be a positive integer.",
|
|
channel.json_body["error"],
|
|
)
|
|
|
|
channel = self.make_request(
|
|
"POST",
|
|
self.url + f"?before_ts={VALID_TIMESTAMP}&keep_profiles=not_bool",
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
|
|
self.assertEqual(400, channel.code, msg=channel.json_body)
|
|
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
|
|
self.assertEqual(
|
|
"Boolean query parameter 'keep_profiles' must be one of ['true', 'false']",
|
|
channel.json_body["error"],
|
|
)
|
|
|
|
@parameterized.expand([(True,), (False,)])
|
|
def test_delete_media_never_accessed(self, use_legacy_url: bool) -> None:
|
|
"""
|
|
Tests that media deleted if it is older than `before_ts` and never accessed
|
|
`last_access_ts` is `NULL` and `created_ts` < `before_ts`
|
|
"""
|
|
url = self.legacy_url if use_legacy_url else self.url
|
|
|
|
# upload and do not access
|
|
server_and_media_id = self._create_media()
|
|
self.pump(1.0)
|
|
|
|
# test that the file exists
|
|
media_id = server_and_media_id.split("/")[1]
|
|
local_path = self.filepaths.local_media_filepath(media_id)
|
|
self.assertTrue(os.path.exists(local_path))
|
|
|
|
# timestamp after upload/create
|
|
now_ms = self.clock.time_msec()
|
|
channel = self.make_request(
|
|
"POST",
|
|
url + "?before_ts=" + str(now_ms),
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
self.assertEqual(200, channel.code, msg=channel.json_body)
|
|
self.assertEqual(1, channel.json_body["total"])
|
|
self.assertEqual(
|
|
media_id,
|
|
channel.json_body["deleted_media"][0],
|
|
)
|
|
|
|
self._access_media(server_and_media_id, False)
|
|
|
|
def test_keep_media_by_date(self) -> None:
|
|
"""
|
|
Tests that media is not deleted if it is newer than `before_ts`
|
|
"""
|
|
|
|
# timestamp before upload
|
|
now_ms = self.clock.time_msec()
|
|
server_and_media_id = self._create_media()
|
|
|
|
self._access_media(server_and_media_id)
|
|
|
|
channel = self.make_request(
|
|
"POST",
|
|
self.url + "?before_ts=" + str(now_ms),
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
self.assertEqual(200, channel.code, msg=channel.json_body)
|
|
self.assertEqual(0, channel.json_body["total"])
|
|
|
|
self._access_media(server_and_media_id)
|
|
|
|
# timestamp after upload
|
|
now_ms = self.clock.time_msec()
|
|
channel = self.make_request(
|
|
"POST",
|
|
self.url + "?before_ts=" + str(now_ms),
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
self.assertEqual(200, channel.code, msg=channel.json_body)
|
|
self.assertEqual(1, channel.json_body["total"])
|
|
self.assertEqual(
|
|
server_and_media_id.split("/")[1],
|
|
channel.json_body["deleted_media"][0],
|
|
)
|
|
|
|
self._access_media(server_and_media_id, False)
|
|
|
|
def test_keep_media_by_size(self) -> None:
|
|
"""
|
|
Tests that media is not deleted if its size is smaller than or equal
|
|
to `size_gt`
|
|
"""
|
|
server_and_media_id = self._create_media()
|
|
|
|
self._access_media(server_and_media_id)
|
|
|
|
now_ms = self.clock.time_msec()
|
|
channel = self.make_request(
|
|
"POST",
|
|
self.url + "?before_ts=" + str(now_ms) + "&size_gt=67",
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
self.assertEqual(200, channel.code, msg=channel.json_body)
|
|
self.assertEqual(0, channel.json_body["total"])
|
|
|
|
self._access_media(server_and_media_id)
|
|
|
|
now_ms = self.clock.time_msec()
|
|
channel = self.make_request(
|
|
"POST",
|
|
self.url + "?before_ts=" + str(now_ms) + "&size_gt=66",
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
self.assertEqual(200, channel.code, msg=channel.json_body)
|
|
self.assertEqual(1, channel.json_body["total"])
|
|
self.assertEqual(
|
|
server_and_media_id.split("/")[1],
|
|
channel.json_body["deleted_media"][0],
|
|
)
|
|
|
|
self._access_media(server_and_media_id, False)
|
|
|
|
def test_keep_media_by_user_avatar(self) -> None:
|
|
"""
|
|
Tests that we do not delete media if is used as a user avatar
|
|
Tests parameter `keep_profiles`
|
|
"""
|
|
server_and_media_id = self._create_media()
|
|
|
|
self._access_media(server_and_media_id)
|
|
|
|
# set media as avatar
|
|
channel = self.make_request(
|
|
"PUT",
|
|
"/profile/%s/avatar_url" % (self.admin_user,),
|
|
content={"avatar_url": "mxc://%s" % (server_and_media_id,)},
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
self.assertEqual(200, channel.code, msg=channel.json_body)
|
|
|
|
now_ms = self.clock.time_msec()
|
|
channel = self.make_request(
|
|
"POST",
|
|
self.url + "?before_ts=" + str(now_ms) + "&keep_profiles=true",
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
self.assertEqual(200, channel.code, msg=channel.json_body)
|
|
self.assertEqual(0, channel.json_body["total"])
|
|
|
|
self._access_media(server_and_media_id)
|
|
|
|
now_ms = self.clock.time_msec()
|
|
channel = self.make_request(
|
|
"POST",
|
|
self.url + "?before_ts=" + str(now_ms) + "&keep_profiles=false",
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
self.assertEqual(200, channel.code, msg=channel.json_body)
|
|
self.assertEqual(1, channel.json_body["total"])
|
|
self.assertEqual(
|
|
server_and_media_id.split("/")[1],
|
|
channel.json_body["deleted_media"][0],
|
|
)
|
|
|
|
self._access_media(server_and_media_id, False)
|
|
|
|
def test_keep_media_by_room_avatar(self) -> None:
|
|
"""
|
|
Tests that we do not delete media if it is used as a room avatar
|
|
Tests parameter `keep_profiles`
|
|
"""
|
|
server_and_media_id = self._create_media()
|
|
|
|
self._access_media(server_and_media_id)
|
|
|
|
# set media as room avatar
|
|
room_id = self.helper.create_room_as(self.admin_user, tok=self.admin_user_tok)
|
|
channel = self.make_request(
|
|
"PUT",
|
|
"/rooms/%s/state/m.room.avatar" % (room_id,),
|
|
content={"url": "mxc://%s" % (server_and_media_id,)},
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
self.assertEqual(200, channel.code, msg=channel.json_body)
|
|
|
|
now_ms = self.clock.time_msec()
|
|
channel = self.make_request(
|
|
"POST",
|
|
self.url + "?before_ts=" + str(now_ms) + "&keep_profiles=true",
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
self.assertEqual(200, channel.code, msg=channel.json_body)
|
|
self.assertEqual(0, channel.json_body["total"])
|
|
|
|
self._access_media(server_and_media_id)
|
|
|
|
now_ms = self.clock.time_msec()
|
|
channel = self.make_request(
|
|
"POST",
|
|
self.url + "?before_ts=" + str(now_ms) + "&keep_profiles=false",
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
self.assertEqual(200, channel.code, msg=channel.json_body)
|
|
self.assertEqual(1, channel.json_body["total"])
|
|
self.assertEqual(
|
|
server_and_media_id.split("/")[1],
|
|
channel.json_body["deleted_media"][0],
|
|
)
|
|
|
|
self._access_media(server_and_media_id, False)
|
|
|
|
def _create_media(self) -> str:
|
|
"""
|
|
Create a media and return media_id and server_and_media_id
|
|
"""
|
|
# Upload some media into the room
|
|
response = self.helper.upload_media(
|
|
SMALL_PNG,
|
|
tok=self.admin_user_tok,
|
|
expect_code=200,
|
|
)
|
|
# Extract media ID from the response
|
|
server_and_media_id = response["content_uri"][6:] # Cut off 'mxc://'
|
|
server_name = server_and_media_id.split("/")[0]
|
|
|
|
# Check that new media is a local and not remote
|
|
self.assertEqual(server_name, self.server_name)
|
|
|
|
return server_and_media_id
|
|
|
|
def _access_media(
|
|
self, server_and_media_id: str, expect_success: bool = True
|
|
) -> None:
|
|
"""
|
|
Try to access a media and check the result
|
|
"""
|
|
media_id = server_and_media_id.split("/")[1]
|
|
local_path = self.filepaths.local_media_filepath(media_id)
|
|
|
|
channel = self.make_request(
|
|
"GET",
|
|
f"/_matrix/media/v3/download/{server_and_media_id}",
|
|
shorthand=False,
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
|
|
if expect_success:
|
|
self.assertEqual(
|
|
200,
|
|
channel.code,
|
|
msg=(
|
|
"Expected to receive a 200 on accessing media: %s"
|
|
% server_and_media_id
|
|
),
|
|
)
|
|
# Test that the file exists
|
|
self.assertTrue(os.path.exists(local_path))
|
|
else:
|
|
self.assertEqual(
|
|
404,
|
|
channel.code,
|
|
msg=(
|
|
"Expected to receive a 404 on accessing deleted media: %s"
|
|
% (server_and_media_id)
|
|
),
|
|
)
|
|
# Test that the file is deleted
|
|
self.assertFalse(os.path.exists(local_path))
|
|
|
|
|
|
class QuarantineMediaByIDTestCase(_AdminMediaTests):
|
|
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
|
self.store = hs.get_datastores().main
|
|
self.server_name = hs.hostname
|
|
|
|
self.admin_user = self.register_user("admin", "pass", admin=True)
|
|
self.admin_user_tok = self.login("admin", "pass")
|
|
|
|
# Upload some media into the room
|
|
response = self.helper.upload_media(
|
|
SMALL_PNG,
|
|
tok=self.admin_user_tok,
|
|
expect_code=200,
|
|
)
|
|
# Extract media ID from the response
|
|
server_and_media_id = response["content_uri"][6:] # Cut off 'mxc://'
|
|
self.media_id = server_and_media_id.split("/")[1]
|
|
|
|
self.url = "/_synapse/admin/v1/media/%s/%s/%s"
|
|
|
|
@parameterized.expand(["quarantine", "unquarantine"])
|
|
def test_no_auth(self, action: str) -> None:
|
|
"""
|
|
Try to protect media without authentication.
|
|
"""
|
|
|
|
channel = self.make_request(
|
|
"POST",
|
|
self.url % (action, self.server_name, self.media_id),
|
|
b"{}",
|
|
)
|
|
|
|
self.assertEqual(401, channel.code, msg=channel.json_body)
|
|
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
|
|
|
|
@parameterized.expand(["quarantine", "unquarantine"])
|
|
def test_requester_is_no_admin(self, action: str) -> None:
|
|
"""
|
|
If the user is not a server admin, an error is returned.
|
|
"""
|
|
self.other_user = self.register_user("user", "pass")
|
|
self.other_user_token = self.login("user", "pass")
|
|
|
|
channel = self.make_request(
|
|
"POST",
|
|
self.url % (action, self.server_name, self.media_id),
|
|
access_token=self.other_user_token,
|
|
)
|
|
|
|
self.assertEqual(403, channel.code, msg=channel.json_body)
|
|
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
|
|
|
|
def test_quarantine_media(self) -> None:
|
|
"""
|
|
Tests that quarantining and remove from quarantine a media is successfully
|
|
"""
|
|
|
|
media_info = self.get_success(self.store.get_local_media(self.media_id))
|
|
assert media_info is not None
|
|
self.assertFalse(media_info.quarantined_by)
|
|
|
|
# quarantining
|
|
channel = self.make_request(
|
|
"POST",
|
|
self.url % ("quarantine", self.server_name, self.media_id),
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
|
|
self.assertEqual(200, channel.code, msg=channel.json_body)
|
|
self.assertFalse(channel.json_body)
|
|
|
|
media_info = self.get_success(self.store.get_local_media(self.media_id))
|
|
assert media_info is not None
|
|
self.assertTrue(media_info.quarantined_by)
|
|
|
|
# remove from quarantine
|
|
channel = self.make_request(
|
|
"POST",
|
|
self.url % ("unquarantine", self.server_name, self.media_id),
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
|
|
self.assertEqual(200, channel.code, msg=channel.json_body)
|
|
self.assertFalse(channel.json_body)
|
|
|
|
media_info = self.get_success(self.store.get_local_media(self.media_id))
|
|
assert media_info is not None
|
|
self.assertFalse(media_info.quarantined_by)
|
|
|
|
def test_quarantine_protected_media(self) -> None:
|
|
"""
|
|
Tests that quarantining from protected media fails
|
|
"""
|
|
|
|
# protect
|
|
self.get_success(self.store.mark_local_media_as_safe(self.media_id, safe=True))
|
|
|
|
# verify protection
|
|
media_info = self.get_success(self.store.get_local_media(self.media_id))
|
|
assert media_info is not None
|
|
self.assertTrue(media_info.safe_from_quarantine)
|
|
|
|
# quarantining
|
|
channel = self.make_request(
|
|
"POST",
|
|
self.url % ("quarantine", self.server_name, self.media_id),
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
|
|
self.assertEqual(200, channel.code, msg=channel.json_body)
|
|
self.assertFalse(channel.json_body)
|
|
|
|
# verify that is not in quarantine
|
|
media_info = self.get_success(self.store.get_local_media(self.media_id))
|
|
assert media_info is not None
|
|
self.assertFalse(media_info.quarantined_by)
|
|
|
|
|
|
class ProtectMediaByIDTestCase(_AdminMediaTests):
|
|
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
|
hs.get_media_repository_resource()
|
|
self.store = hs.get_datastores().main
|
|
|
|
self.admin_user = self.register_user("admin", "pass", admin=True)
|
|
self.admin_user_tok = self.login("admin", "pass")
|
|
|
|
# Upload some media into the room
|
|
response = self.helper.upload_media(
|
|
SMALL_PNG,
|
|
tok=self.admin_user_tok,
|
|
expect_code=200,
|
|
)
|
|
# Extract media ID from the response
|
|
server_and_media_id = response["content_uri"][6:] # Cut off 'mxc://'
|
|
self.media_id = server_and_media_id.split("/")[1]
|
|
|
|
self.url = "/_synapse/admin/v1/media/%s/%s"
|
|
|
|
@parameterized.expand(["protect", "unprotect"])
|
|
def test_no_auth(self, action: str) -> None:
|
|
"""
|
|
Try to protect media without authentication.
|
|
"""
|
|
|
|
channel = self.make_request("POST", self.url % (action, self.media_id), b"{}")
|
|
|
|
self.assertEqual(401, channel.code, msg=channel.json_body)
|
|
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
|
|
|
|
@parameterized.expand(["protect", "unprotect"])
|
|
def test_requester_is_no_admin(self, action: str) -> None:
|
|
"""
|
|
If the user is not a server admin, an error is returned.
|
|
"""
|
|
self.other_user = self.register_user("user", "pass")
|
|
self.other_user_token = self.login("user", "pass")
|
|
|
|
channel = self.make_request(
|
|
"POST",
|
|
self.url % (action, self.media_id),
|
|
access_token=self.other_user_token,
|
|
)
|
|
|
|
self.assertEqual(403, channel.code, msg=channel.json_body)
|
|
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
|
|
|
|
def test_protect_media(self) -> None:
|
|
"""
|
|
Tests that protect and unprotect a media is successfully
|
|
"""
|
|
|
|
media_info = self.get_success(self.store.get_local_media(self.media_id))
|
|
assert media_info is not None
|
|
self.assertFalse(media_info.safe_from_quarantine)
|
|
|
|
# protect
|
|
channel = self.make_request(
|
|
"POST",
|
|
self.url % ("protect", self.media_id),
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
|
|
self.assertEqual(200, channel.code, msg=channel.json_body)
|
|
self.assertFalse(channel.json_body)
|
|
|
|
media_info = self.get_success(self.store.get_local_media(self.media_id))
|
|
assert media_info is not None
|
|
self.assertTrue(media_info.safe_from_quarantine)
|
|
|
|
# unprotect
|
|
channel = self.make_request(
|
|
"POST",
|
|
self.url % ("unprotect", self.media_id),
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
|
|
self.assertEqual(200, channel.code, msg=channel.json_body)
|
|
self.assertFalse(channel.json_body)
|
|
|
|
media_info = self.get_success(self.store.get_local_media(self.media_id))
|
|
assert media_info is not None
|
|
self.assertFalse(media_info.safe_from_quarantine)
|
|
|
|
|
|
class PurgeMediaCacheTestCase(_AdminMediaTests):
|
|
servlets = [
|
|
synapse.rest.admin.register_servlets,
|
|
synapse.rest.admin.register_servlets_for_media_repo,
|
|
login.register_servlets,
|
|
profile.register_servlets,
|
|
room.register_servlets,
|
|
]
|
|
|
|
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
|
self.media_repo = hs.get_media_repository_resource()
|
|
self.server_name = hs.hostname
|
|
|
|
self.admin_user = self.register_user("admin", "pass", admin=True)
|
|
self.admin_user_tok = self.login("admin", "pass")
|
|
|
|
self.filepaths = MediaFilePaths(hs.config.media.media_store_path)
|
|
self.url = "/_synapse/admin/v1/purge_media_cache"
|
|
|
|
def test_no_auth(self) -> None:
|
|
"""
|
|
Try to delete media without authentication.
|
|
"""
|
|
|
|
channel = self.make_request("POST", self.url, b"{}")
|
|
|
|
self.assertEqual(
|
|
401,
|
|
channel.code,
|
|
msg=channel.json_body,
|
|
)
|
|
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
|
|
|
|
def test_requester_is_not_admin(self) -> None:
|
|
"""
|
|
If the user is not a server admin, an error is returned.
|
|
"""
|
|
self.other_user = self.register_user("user", "pass")
|
|
self.other_user_token = self.login("user", "pass")
|
|
|
|
channel = self.make_request(
|
|
"POST",
|
|
self.url,
|
|
access_token=self.other_user_token,
|
|
)
|
|
|
|
self.assertEqual(403, channel.code, msg=channel.json_body)
|
|
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
|
|
|
|
def test_invalid_parameter(self) -> None:
|
|
"""
|
|
If parameters are invalid, an error is returned.
|
|
"""
|
|
channel = self.make_request(
|
|
"POST",
|
|
self.url + "?before_ts=-1234",
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
|
|
self.assertEqual(400, channel.code, msg=channel.json_body)
|
|
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
|
|
self.assertEqual(
|
|
"Query parameter before_ts must be a positive integer.",
|
|
channel.json_body["error"],
|
|
)
|
|
|
|
channel = self.make_request(
|
|
"POST",
|
|
self.url + f"?before_ts={INVALID_TIMESTAMP_IN_S}",
|
|
access_token=self.admin_user_tok,
|
|
)
|
|
|
|
self.assertEqual(400, channel.code, msg=channel.json_body)
|
|
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
|
|
self.assertEqual(
|
|
"Query parameter before_ts you provided is from the year 1970. "
|
|
+ "Double check that you are providing a timestamp in milliseconds.",
|
|
channel.json_body["error"],
|
|
)
|