2023-05-09 21:08:51 +03:00
|
|
|
#
|
2023-11-21 23:29:58 +03:00
|
|
|
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
|
|
|
#
|
2024-01-23 14:26:48 +03:00
|
|
|
# Copyright 2023 The Matrix.org Foundation C.I.C.
|
2023-11-21 23:29:58 +03:00
|
|
|
# Copyright (C) 2023 New Vector, Ltd
|
|
|
|
#
|
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
|
|
# it under the terms of the GNU Affero General Public License as
|
|
|
|
# published by the Free Software Foundation, either version 3 of the
|
|
|
|
# License, or (at your option) any later version.
|
|
|
|
#
|
|
|
|
# See the GNU Affero General Public License for more details:
|
|
|
|
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
|
|
|
#
|
|
|
|
# Originally licensed under the Apache License, Version 2.0:
|
|
|
|
# <http://www.apache.org/licenses/LICENSE-2.0>.
|
|
|
|
#
|
|
|
|
# [This file includes modifications made by New Vector Limited]
|
2023-05-09 21:08:51 +03:00
|
|
|
#
|
|
|
|
#
|
|
|
|
from typing import Dict
|
|
|
|
|
|
|
|
from twisted.test.proto_helpers import MemoryReactor
|
|
|
|
from twisted.web.resource import Resource
|
|
|
|
|
|
|
|
from synapse.media._base import FileInfo
|
|
|
|
from synapse.server import HomeServer
|
|
|
|
from synapse.util import Clock
|
|
|
|
|
|
|
|
from tests import unittest
|
|
|
|
from tests.test_utils import SMALL_PNG
|
|
|
|
from tests.unittest import override_config
|
|
|
|
|
|
|
|
|
|
|
|
class MediaDomainBlockingTests(unittest.HomeserverTestCase):
    """
    Tests for the `prevent_media_downloads_from` config option.

    Each test requests a piece of pre-cached remote media (or a thumbnail of
    it) from `remote_server_name` and checks the response code: 404 when that
    domain is listed in `prevent_media_downloads_from`, 200 when only some
    other domain is listed.
    """

    remote_media_id = "doesnotmatter"
    remote_server_name = "evil.com"

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        """
        Store a small PNG in media storage and record it in the main datastore
        as cached remote media originating from `remote_server_name`.
        """
        self.store = hs.get_datastores().main

        # Inject a piece of media. We'll use this to ensure we're returning a sane
        # response when we're not supposed to block it, distinguishing a media block
        # from a regular 404.
        file_id = "abcdefg12345"
        file_info = FileInfo(server_name=self.remote_server_name, file_id=file_id)
        with hs.get_media_repository().media_storage.store_into_file(file_info) as (
            f,
            _fname,
            finish,
        ):
            f.write(SMALL_PNG)
            self.get_success(finish())

        self.get_success(
            self.store.store_cached_remote_media(
                origin=self.remote_server_name,
                media_id=self.remote_media_id,
                media_type="image/png",
                media_length=1,
                time_now_ms=clock.time_msec(),
                upload_name="test.png",
                filesystem_id=file_id,
            )
        )

    def create_resource_dict(self) -> Dict[str, Resource]:
        # We need to manually set the resource tree to include media, the
        # default only does `/_matrix/client` APIs.
        return {"/_matrix/media": self.hs.get_media_repository_resource()}

    def _download_path(self) -> str:
        """Build the download path for the media injected in `prepare`."""
        return (
            f"/_matrix/media/v3/download/"
            f"{self.remote_server_name}/{self.remote_media_id}"
        )

    def _thumbnail_path(self) -> str:
        """Build the 100x100 thumbnail path for the media injected in `prepare`."""
        return (
            f"/_matrix/media/v3/thumbnail/"
            f"{self.remote_server_name}/{self.remote_media_id}"
            "?width=100&height=100"
        )

    @override_config(
        {
            # Disable downloads from the domain we'll be trying to download from.
            # Should result in a 404. Must match `remote_server_name`.
            "prevent_media_downloads_from": ["evil.com"]
        }
    )
    def test_cannot_download_blocked_media(self) -> None:
        """
        Tests to ensure that remote media which is blocked cannot be downloaded.
        """
        response = self.make_request(
            "GET",
            self._download_path(),
            shorthand=False,
        )
        self.assertEqual(response.code, 404)

    @override_config(
        {
            # Disable downloads from a domain we won't be requesting downloads from.
            # This proves we haven't broken anything.
            "prevent_media_downloads_from": ["not-listed.com"]
        }
    )
    def test_remote_media_normally_unblocked(self) -> None:
        """
        Tests to ensure that remote media is normally able to be downloaded
        when no domain block is in place.
        """
        response = self.make_request(
            "GET",
            self._download_path(),
            shorthand=False,
        )
        self.assertEqual(response.code, 200)

    @override_config(
        {
            # Disable downloads from the domain we'll be trying to download from.
            # Should result in a 404. Must match `remote_server_name`.
            "prevent_media_downloads_from": ["evil.com"],
            "dynamic_thumbnails": True,
        }
    )
    def test_cannot_download_blocked_media_thumbnail(self) -> None:
        """
        Same test as test_cannot_download_blocked_media but for thumbnails.
        """
        # The thumbnail dimensions travel in the query string; a GET request
        # needs no body, matching the unblocked thumbnail test below.
        response = self.make_request(
            "GET",
            self._thumbnail_path(),
            shorthand=False,
        )
        self.assertEqual(response.code, 404)

    @override_config(
        {
            # Disable downloads from a domain we won't be requesting downloads from.
            # This proves we haven't broken anything.
            "prevent_media_downloads_from": ["not-listed.com"],
            "dynamic_thumbnails": True,
        }
    )
    def test_remote_media_thumbnail_normally_unblocked(self) -> None:
        """
        Same test as test_remote_media_normally_unblocked but for thumbnails.
        """
        response = self.make_request(
            "GET",
            self._thumbnail_path(),
            shorthand=False,
        )
        self.assertEqual(response.code, 200)
|