mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-24 10:35:46 +03:00
23740eaa3d
During the migration the automated script to update the copyright headers accidentally got rid of some of the existing copyright lines. Reinstate them.
417 lines
16 KiB
Python
417 lines
16 KiB
Python
#
|
|
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
|
#
|
|
# Copyright 2020-2021 The Matrix.org Foundation C.I.C.
|
|
# Copyright 2014-2016 OpenMarket Ltd
|
|
# Copyright (C) 2023 New Vector, Ltd
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as
|
|
# published by the Free Software Foundation, either version 3 of the
|
|
# License, or (at your option) any later version.
|
|
#
|
|
# See the GNU Affero General Public License for more details:
|
|
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
|
#
|
|
# Originally licensed under the Apache License, Version 2.0:
|
|
# <http://www.apache.org/licenses/LICENSE-2.0>.
|
|
#
|
|
# [This file includes modifications made by New Vector Limited]
|
|
#
|
|
#
|
|
|
|
import functools
|
|
import os
|
|
import re
|
|
import string
|
|
from typing import Any, Callable, List, TypeVar, Union, cast
|
|
|
|
NEW_FORMAT_ID_RE = re.compile(r"^\d\d\d\d-\d\d-\d\d")
|
|
|
|
|
|
F = TypeVar("F", bound=Callable[..., str])
|
|
|
|
|
|
def _wrap_in_base_path(func: F) -> F:
|
|
"""Takes a function that returns a relative path and turns it into an
|
|
absolute path based on the location of the primary media store
|
|
"""
|
|
|
|
@functools.wraps(func)
|
|
def _wrapped(self: "MediaFilePaths", *args: Any, **kwargs: Any) -> str:
|
|
path = func(self, *args, **kwargs)
|
|
return os.path.join(self.base_path, path)
|
|
|
|
return cast(F, _wrapped)
|
|
|
|
|
|
GetPathMethod = TypeVar(
|
|
"GetPathMethod", bound=Union[Callable[..., str], Callable[..., List[str]]]
|
|
)
|
|
|
|
|
|
def _wrap_with_jail_check(relative: bool) -> Callable[[GetPathMethod], GetPathMethod]:
|
|
"""Wraps a path-returning method to check that the returned path(s) do not escape
|
|
the media store directory.
|
|
|
|
The path-returning method may return either a single path, or a list of paths.
|
|
|
|
The check is not expected to ever fail, unless `func` is missing a call to
|
|
`_validate_path_component`, or `_validate_path_component` is buggy.
|
|
|
|
Args:
|
|
relative: A boolean indicating whether the wrapped method returns paths relative
|
|
to the media store directory.
|
|
|
|
Returns:
|
|
A method which will wrap a path-returning method, adding a check to ensure that
|
|
the returned path(s) lie within the media store directory. The check will raise
|
|
a `ValueError` if it fails.
|
|
"""
|
|
|
|
def _wrap_with_jail_check_inner(func: GetPathMethod) -> GetPathMethod:
|
|
@functools.wraps(func)
|
|
def _wrapped(
|
|
self: "MediaFilePaths", *args: Any, **kwargs: Any
|
|
) -> Union[str, List[str]]:
|
|
path_or_paths = func(self, *args, **kwargs)
|
|
|
|
if isinstance(path_or_paths, list):
|
|
paths_to_check = path_or_paths
|
|
else:
|
|
paths_to_check = [path_or_paths]
|
|
|
|
for path in paths_to_check:
|
|
# Construct the path that will ultimately be used.
|
|
# We cannot guess whether `path` is relative to the media store
|
|
# directory, since the media store directory may itself be a relative
|
|
# path.
|
|
if relative:
|
|
path = os.path.join(self.base_path, path)
|
|
normalized_path = os.path.normpath(path)
|
|
|
|
# Now that `normpath` has eliminated `../`s and `./`s from the path,
|
|
# `os.path.commonpath` can be used to check whether it lies within the
|
|
# media store directory.
|
|
if (
|
|
os.path.commonpath([normalized_path, self.normalized_base_path])
|
|
!= self.normalized_base_path
|
|
):
|
|
# The path resolves to outside the media store directory,
|
|
# or `self.base_path` is `.`, which is an unlikely configuration.
|
|
raise ValueError(f"Invalid media store path: {path!r}")
|
|
|
|
# Note that `os.path.normpath`/`abspath` has a subtle caveat:
|
|
# `a/b/c/../c` will normalize to `a/b/c`, but the former refers to a
|
|
# different path if `a/b/c` is a symlink. That is, the check above is
|
|
# not perfect and may allow a certain restricted subset of untrustworthy
|
|
# paths through. Since the check above is secondary to the main
|
|
# `_validate_path_component` checks, it's less important for it to be
|
|
# perfect.
|
|
#
|
|
# As an alternative, `os.path.realpath` will resolve symlinks, but
|
|
# proves problematic if there are symlinks inside the media store.
|
|
# eg. if `url_store/` is symlinked to elsewhere, its canonical path
|
|
# won't match that of the main media store directory.
|
|
|
|
return path_or_paths
|
|
|
|
return cast(GetPathMethod, _wrapped)
|
|
|
|
return _wrap_with_jail_check_inner
|
|
|
|
|
|
ALLOWED_CHARACTERS = set(
|
|
string.ascii_letters
|
|
+ string.digits
|
|
+ "_-"
|
|
+ ".[]:" # Domain names, IPv6 addresses and ports in server names
|
|
)
|
|
FORBIDDEN_NAMES = {
|
|
"",
|
|
os.path.curdir, # "." for the current platform
|
|
os.path.pardir, # ".." for the current platform
|
|
}
|
|
|
|
|
|
def _validate_path_component(name: str) -> str:
|
|
"""Checks that the given string can be safely used as a path component
|
|
|
|
Args:
|
|
name: The path component to check.
|
|
|
|
Returns:
|
|
The path component if valid.
|
|
|
|
Raises:
|
|
ValueError: If `name` cannot be safely used as a path component.
|
|
"""
|
|
if not ALLOWED_CHARACTERS.issuperset(name) or name in FORBIDDEN_NAMES:
|
|
raise ValueError(f"Invalid path component: {name!r}")
|
|
|
|
return name
|
|
|
|
|
|
class MediaFilePaths:
|
|
"""Describes where files are stored on disk.
|
|
|
|
Most of the functions have a `*_rel` variant which returns a file path that
|
|
is relative to the base media store path. This is mainly used when we want
|
|
to write to the backup media store (when one is configured)
|
|
"""
|
|
|
|
def __init__(self, primary_base_path: str):
|
|
self.base_path = primary_base_path
|
|
self.normalized_base_path = os.path.normpath(self.base_path)
|
|
|
|
# Refuse to initialize if paths cannot be validated correctly for the current
|
|
# platform.
|
|
assert os.path.sep not in ALLOWED_CHARACTERS
|
|
assert os.path.altsep not in ALLOWED_CHARACTERS
|
|
# On Windows, paths have all sorts of weirdness which `_validate_path_component`
|
|
# does not consider. In any case, the remote media store can't work correctly
|
|
# for certain homeservers there, since ":"s aren't allowed in paths.
|
|
assert os.name == "posix"
|
|
|
|
@_wrap_with_jail_check(relative=True)
|
|
def local_media_filepath_rel(self, media_id: str) -> str:
|
|
return os.path.join(
|
|
"local_content",
|
|
_validate_path_component(media_id[0:2]),
|
|
_validate_path_component(media_id[2:4]),
|
|
_validate_path_component(media_id[4:]),
|
|
)
|
|
|
|
local_media_filepath = _wrap_in_base_path(local_media_filepath_rel)
|
|
|
|
@_wrap_with_jail_check(relative=True)
|
|
def local_media_thumbnail_rel(
|
|
self, media_id: str, width: int, height: int, content_type: str, method: str
|
|
) -> str:
|
|
top_level_type, sub_type = content_type.split("/")
|
|
file_name = "%i-%i-%s-%s-%s" % (width, height, top_level_type, sub_type, method)
|
|
return os.path.join(
|
|
"local_thumbnails",
|
|
_validate_path_component(media_id[0:2]),
|
|
_validate_path_component(media_id[2:4]),
|
|
_validate_path_component(media_id[4:]),
|
|
_validate_path_component(file_name),
|
|
)
|
|
|
|
local_media_thumbnail = _wrap_in_base_path(local_media_thumbnail_rel)
|
|
|
|
@_wrap_with_jail_check(relative=False)
|
|
def local_media_thumbnail_dir(self, media_id: str) -> str:
|
|
"""
|
|
Retrieve the local store path of thumbnails of a given media_id
|
|
|
|
Args:
|
|
media_id: The media ID to query.
|
|
Returns:
|
|
Path of local_thumbnails from media_id
|
|
"""
|
|
return os.path.join(
|
|
self.base_path,
|
|
"local_thumbnails",
|
|
_validate_path_component(media_id[0:2]),
|
|
_validate_path_component(media_id[2:4]),
|
|
_validate_path_component(media_id[4:]),
|
|
)
|
|
|
|
@_wrap_with_jail_check(relative=True)
|
|
def remote_media_filepath_rel(self, server_name: str, file_id: str) -> str:
|
|
return os.path.join(
|
|
"remote_content",
|
|
_validate_path_component(server_name),
|
|
_validate_path_component(file_id[0:2]),
|
|
_validate_path_component(file_id[2:4]),
|
|
_validate_path_component(file_id[4:]),
|
|
)
|
|
|
|
remote_media_filepath = _wrap_in_base_path(remote_media_filepath_rel)
|
|
|
|
@_wrap_with_jail_check(relative=True)
|
|
def remote_media_thumbnail_rel(
|
|
self,
|
|
server_name: str,
|
|
file_id: str,
|
|
width: int,
|
|
height: int,
|
|
content_type: str,
|
|
method: str,
|
|
) -> str:
|
|
top_level_type, sub_type = content_type.split("/")
|
|
file_name = "%i-%i-%s-%s-%s" % (width, height, top_level_type, sub_type, method)
|
|
return os.path.join(
|
|
"remote_thumbnail",
|
|
_validate_path_component(server_name),
|
|
_validate_path_component(file_id[0:2]),
|
|
_validate_path_component(file_id[2:4]),
|
|
_validate_path_component(file_id[4:]),
|
|
_validate_path_component(file_name),
|
|
)
|
|
|
|
remote_media_thumbnail = _wrap_in_base_path(remote_media_thumbnail_rel)
|
|
|
|
# Legacy path that was used to store thumbnails previously.
|
|
# Should be removed after some time, when most of the thumbnails are stored
|
|
# using the new path.
|
|
@_wrap_with_jail_check(relative=True)
|
|
def remote_media_thumbnail_rel_legacy(
|
|
self, server_name: str, file_id: str, width: int, height: int, content_type: str
|
|
) -> str:
|
|
top_level_type, sub_type = content_type.split("/")
|
|
file_name = "%i-%i-%s-%s" % (width, height, top_level_type, sub_type)
|
|
return os.path.join(
|
|
"remote_thumbnail",
|
|
_validate_path_component(server_name),
|
|
_validate_path_component(file_id[0:2]),
|
|
_validate_path_component(file_id[2:4]),
|
|
_validate_path_component(file_id[4:]),
|
|
_validate_path_component(file_name),
|
|
)
|
|
|
|
@_wrap_with_jail_check(relative=False)
|
|
def remote_media_thumbnail_dir(self, server_name: str, file_id: str) -> str:
|
|
return os.path.join(
|
|
self.base_path,
|
|
"remote_thumbnail",
|
|
_validate_path_component(server_name),
|
|
_validate_path_component(file_id[0:2]),
|
|
_validate_path_component(file_id[2:4]),
|
|
_validate_path_component(file_id[4:]),
|
|
)
|
|
|
|
@_wrap_with_jail_check(relative=True)
|
|
def url_cache_filepath_rel(self, media_id: str) -> str:
|
|
if NEW_FORMAT_ID_RE.match(media_id):
|
|
# Media id is of the form <DATE><RANDOM_STRING>
|
|
# E.g.: 2017-09-28-fsdRDt24DS234dsf
|
|
return os.path.join(
|
|
"url_cache",
|
|
_validate_path_component(media_id[:10]),
|
|
_validate_path_component(media_id[11:]),
|
|
)
|
|
else:
|
|
return os.path.join(
|
|
"url_cache",
|
|
_validate_path_component(media_id[0:2]),
|
|
_validate_path_component(media_id[2:4]),
|
|
_validate_path_component(media_id[4:]),
|
|
)
|
|
|
|
url_cache_filepath = _wrap_in_base_path(url_cache_filepath_rel)
|
|
|
|
@_wrap_with_jail_check(relative=False)
|
|
def url_cache_filepath_dirs_to_delete(self, media_id: str) -> List[str]:
|
|
"The dirs to try and remove if we delete the media_id file"
|
|
if NEW_FORMAT_ID_RE.match(media_id):
|
|
return [
|
|
os.path.join(
|
|
self.base_path, "url_cache", _validate_path_component(media_id[:10])
|
|
)
|
|
]
|
|
else:
|
|
return [
|
|
os.path.join(
|
|
self.base_path,
|
|
"url_cache",
|
|
_validate_path_component(media_id[0:2]),
|
|
_validate_path_component(media_id[2:4]),
|
|
),
|
|
os.path.join(
|
|
self.base_path, "url_cache", _validate_path_component(media_id[0:2])
|
|
),
|
|
]
|
|
|
|
@_wrap_with_jail_check(relative=True)
|
|
def url_cache_thumbnail_rel(
|
|
self, media_id: str, width: int, height: int, content_type: str, method: str
|
|
) -> str:
|
|
# Media id is of the form <DATE><RANDOM_STRING>
|
|
# E.g.: 2017-09-28-fsdRDt24DS234dsf
|
|
|
|
top_level_type, sub_type = content_type.split("/")
|
|
file_name = "%i-%i-%s-%s-%s" % (width, height, top_level_type, sub_type, method)
|
|
|
|
if NEW_FORMAT_ID_RE.match(media_id):
|
|
return os.path.join(
|
|
"url_cache_thumbnails",
|
|
_validate_path_component(media_id[:10]),
|
|
_validate_path_component(media_id[11:]),
|
|
_validate_path_component(file_name),
|
|
)
|
|
else:
|
|
return os.path.join(
|
|
"url_cache_thumbnails",
|
|
_validate_path_component(media_id[0:2]),
|
|
_validate_path_component(media_id[2:4]),
|
|
_validate_path_component(media_id[4:]),
|
|
_validate_path_component(file_name),
|
|
)
|
|
|
|
url_cache_thumbnail = _wrap_in_base_path(url_cache_thumbnail_rel)
|
|
|
|
@_wrap_with_jail_check(relative=True)
|
|
def url_cache_thumbnail_directory_rel(self, media_id: str) -> str:
|
|
# Media id is of the form <DATE><RANDOM_STRING>
|
|
# E.g.: 2017-09-28-fsdRDt24DS234dsf
|
|
|
|
if NEW_FORMAT_ID_RE.match(media_id):
|
|
return os.path.join(
|
|
"url_cache_thumbnails",
|
|
_validate_path_component(media_id[:10]),
|
|
_validate_path_component(media_id[11:]),
|
|
)
|
|
else:
|
|
return os.path.join(
|
|
"url_cache_thumbnails",
|
|
_validate_path_component(media_id[0:2]),
|
|
_validate_path_component(media_id[2:4]),
|
|
_validate_path_component(media_id[4:]),
|
|
)
|
|
|
|
url_cache_thumbnail_directory = _wrap_in_base_path(
|
|
url_cache_thumbnail_directory_rel
|
|
)
|
|
|
|
@_wrap_with_jail_check(relative=False)
|
|
def url_cache_thumbnail_dirs_to_delete(self, media_id: str) -> List[str]:
|
|
"The dirs to try and remove if we delete the media_id thumbnails"
|
|
# Media id is of the form <DATE><RANDOM_STRING>
|
|
# E.g.: 2017-09-28-fsdRDt24DS234dsf
|
|
if NEW_FORMAT_ID_RE.match(media_id):
|
|
return [
|
|
os.path.join(
|
|
self.base_path,
|
|
"url_cache_thumbnails",
|
|
_validate_path_component(media_id[:10]),
|
|
_validate_path_component(media_id[11:]),
|
|
),
|
|
os.path.join(
|
|
self.base_path,
|
|
"url_cache_thumbnails",
|
|
_validate_path_component(media_id[:10]),
|
|
),
|
|
]
|
|
else:
|
|
return [
|
|
os.path.join(
|
|
self.base_path,
|
|
"url_cache_thumbnails",
|
|
_validate_path_component(media_id[0:2]),
|
|
_validate_path_component(media_id[2:4]),
|
|
_validate_path_component(media_id[4:]),
|
|
),
|
|
os.path.join(
|
|
self.base_path,
|
|
"url_cache_thumbnails",
|
|
_validate_path_component(media_id[0:2]),
|
|
_validate_path_component(media_id[2:4]),
|
|
),
|
|
os.path.join(
|
|
self.base_path,
|
|
"url_cache_thumbnails",
|
|
_validate_path_component(media_id[0:2]),
|
|
),
|
|
]
|