mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-23 18:15:53 +03:00
Merge pull request #2767 from matrix-org/erikj/media_storage_refactor
Refactor MediaRepository to separate out storage
This commit is contained in:
commit
1159abbdd2
7 changed files with 839 additions and 342 deletions
|
@ -70,6 +70,32 @@ def respond_with_file(request, media_type, file_path,
|
||||||
logger.debug("Responding with %r", file_path)
|
logger.debug("Responding with %r", file_path)
|
||||||
|
|
||||||
if os.path.isfile(file_path):
|
if os.path.isfile(file_path):
|
||||||
|
if file_size is None:
|
||||||
|
stat = os.stat(file_path)
|
||||||
|
file_size = stat.st_size
|
||||||
|
|
||||||
|
add_file_headers(request, media_type, file_size, upload_name)
|
||||||
|
|
||||||
|
with open(file_path, "rb") as f:
|
||||||
|
yield logcontext.make_deferred_yieldable(
|
||||||
|
FileSender().beginFileTransfer(f, request)
|
||||||
|
)
|
||||||
|
|
||||||
|
finish_request(request)
|
||||||
|
else:
|
||||||
|
respond_404(request)
|
||||||
|
|
||||||
|
|
||||||
|
def add_file_headers(request, media_type, file_size, upload_name):
|
||||||
|
"""Adds the correct response headers in preparation for responding with the
|
||||||
|
media.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
request (twisted.web.http.Request)
|
||||||
|
media_type (str): The media/content type.
|
||||||
|
file_size (int): Size in bytes of the media, if known.
|
||||||
|
upload_name (str): The name of the requested file, if any.
|
||||||
|
"""
|
||||||
request.setHeader(b"Content-Type", media_type.encode("UTF-8"))
|
request.setHeader(b"Content-Type", media_type.encode("UTF-8"))
|
||||||
if upload_name:
|
if upload_name:
|
||||||
if is_ascii(upload_name):
|
if is_ascii(upload_name):
|
||||||
|
@ -95,19 +121,81 @@ def respond_with_file(request, media_type, file_path,
|
||||||
request.setHeader(
|
request.setHeader(
|
||||||
b"Cache-Control", b"public,max-age=86400,s-maxage=86400"
|
b"Cache-Control", b"public,max-age=86400,s-maxage=86400"
|
||||||
)
|
)
|
||||||
if file_size is None:
|
|
||||||
stat = os.stat(file_path)
|
|
||||||
file_size = stat.st_size
|
|
||||||
|
|
||||||
request.setHeader(
|
request.setHeader(
|
||||||
b"Content-Length", b"%d" % (file_size,)
|
b"Content-Length", b"%d" % (file_size,)
|
||||||
)
|
)
|
||||||
|
|
||||||
with open(file_path, "rb") as f:
|
|
||||||
yield logcontext.make_deferred_yieldable(
|
|
||||||
FileSender().beginFileTransfer(f, request)
|
|
||||||
)
|
|
||||||
|
|
||||||
finish_request(request)
|
@defer.inlineCallbacks
|
||||||
else:
|
def respond_with_responder(request, responder, media_type, file_size, upload_name=None):
|
||||||
|
"""Responds to the request with given responder. If responder is None then
|
||||||
|
returns 404.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
request (twisted.web.http.Request)
|
||||||
|
responder (Responder|None)
|
||||||
|
media_type (str): The media/content type.
|
||||||
|
file_size (int|None): Size in bytes of the media. If not known it should be None
|
||||||
|
upload_name (str|None): The name of the requested file, if any.
|
||||||
|
"""
|
||||||
|
if not responder:
|
||||||
respond_404(request)
|
respond_404(request)
|
||||||
|
return
|
||||||
|
|
||||||
|
add_file_headers(request, media_type, file_size, upload_name)
|
||||||
|
with responder:
|
||||||
|
yield responder.write_to_consumer(request)
|
||||||
|
finish_request(request)
|
||||||
|
|
||||||
|
|
||||||
|
class Responder(object):
|
||||||
|
"""Represents a response that can be streamed to the requester.
|
||||||
|
|
||||||
|
Responder is a context manager which *must* be used, so that any resources
|
||||||
|
held can be cleaned up.
|
||||||
|
"""
|
||||||
|
def write_to_consumer(self, consumer):
|
||||||
|
"""Stream response into consumer
|
||||||
|
|
||||||
|
Args:
|
||||||
|
consumer (IConsumer)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred: Resolves once the response has finished being written
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class FileInfo(object):
|
||||||
|
"""Details about a requested/uploaded file.
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
server_name (str): The server name where the media originated from,
|
||||||
|
or None if local.
|
||||||
|
file_id (str): The local ID of the file. For local files this is the
|
||||||
|
same as the media_id
|
||||||
|
url_cache (bool): If the file is for the url preview cache
|
||||||
|
thumbnail (bool): Whether the file is a thumbnail or not.
|
||||||
|
thumbnail_width (int)
|
||||||
|
thumbnail_height (int)
|
||||||
|
thumbnail_method (str)
|
||||||
|
thumbnail_type (str): Content type of thumbnail, e.g. image/png
|
||||||
|
"""
|
||||||
|
def __init__(self, server_name, file_id, url_cache=False,
|
||||||
|
thumbnail=False, thumbnail_width=None, thumbnail_height=None,
|
||||||
|
thumbnail_method=None, thumbnail_type=None):
|
||||||
|
self.server_name = server_name
|
||||||
|
self.file_id = file_id
|
||||||
|
self.url_cache = url_cache
|
||||||
|
self.thumbnail = thumbnail
|
||||||
|
self.thumbnail_width = thumbnail_width
|
||||||
|
self.thumbnail_height = thumbnail_height
|
||||||
|
self.thumbnail_method = thumbnail_method
|
||||||
|
self.thumbnail_type = thumbnail_type
|
||||||
|
|
|
@ -14,7 +14,7 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
import synapse.http.servlet
|
import synapse.http.servlet
|
||||||
|
|
||||||
from ._base import parse_media_id, respond_with_file, respond_404
|
from ._base import parse_media_id, respond_404
|
||||||
from twisted.web.resource import Resource
|
from twisted.web.resource import Resource
|
||||||
from synapse.http.server import request_handler, set_cors_headers
|
from synapse.http.server import request_handler, set_cors_headers
|
||||||
|
|
||||||
|
@ -32,12 +32,12 @@ class DownloadResource(Resource):
|
||||||
def __init__(self, hs, media_repo):
|
def __init__(self, hs, media_repo):
|
||||||
Resource.__init__(self)
|
Resource.__init__(self)
|
||||||
|
|
||||||
self.filepaths = media_repo.filepaths
|
|
||||||
self.media_repo = media_repo
|
self.media_repo = media_repo
|
||||||
self.server_name = hs.hostname
|
self.server_name = hs.hostname
|
||||||
self.store = hs.get_datastore()
|
|
||||||
self.version_string = hs.version_string
|
# Both of these are expected by @request_handler()
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
|
self.version_string = hs.version_string
|
||||||
|
|
||||||
def render_GET(self, request):
|
def render_GET(self, request):
|
||||||
self._async_render_GET(request)
|
self._async_render_GET(request)
|
||||||
|
@ -57,37 +57,8 @@ class DownloadResource(Resource):
|
||||||
)
|
)
|
||||||
server_name, media_id, name = parse_media_id(request)
|
server_name, media_id, name = parse_media_id(request)
|
||||||
if server_name == self.server_name:
|
if server_name == self.server_name:
|
||||||
yield self._respond_local_file(request, media_id, name)
|
yield self.media_repo.get_local_media(request, media_id, name)
|
||||||
else:
|
else:
|
||||||
yield self._respond_remote_file(
|
|
||||||
request, server_name, media_id, name
|
|
||||||
)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def _respond_local_file(self, request, media_id, name):
|
|
||||||
media_info = yield self.store.get_local_media(media_id)
|
|
||||||
if not media_info or media_info["quarantined_by"]:
|
|
||||||
respond_404(request)
|
|
||||||
return
|
|
||||||
|
|
||||||
media_type = media_info["media_type"]
|
|
||||||
media_length = media_info["media_length"]
|
|
||||||
upload_name = name if name else media_info["upload_name"]
|
|
||||||
if media_info["url_cache"]:
|
|
||||||
# TODO: Check the file still exists, if it doesn't we can redownload
|
|
||||||
# it from the url `media_info["url_cache"]`
|
|
||||||
file_path = self.filepaths.url_cache_filepath(media_id)
|
|
||||||
else:
|
|
||||||
file_path = self.filepaths.local_media_filepath(media_id)
|
|
||||||
|
|
||||||
yield respond_with_file(
|
|
||||||
request, media_type, file_path, media_length,
|
|
||||||
upload_name=upload_name,
|
|
||||||
)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def _respond_remote_file(self, request, server_name, media_id, name):
|
|
||||||
# don't forward requests for remote media if allow_remote is false
|
|
||||||
allow_remote = synapse.http.servlet.parse_boolean(
|
allow_remote = synapse.http.servlet.parse_boolean(
|
||||||
request, "allow_remote", default=True)
|
request, "allow_remote", default=True)
|
||||||
if not allow_remote:
|
if not allow_remote:
|
||||||
|
@ -98,18 +69,4 @@ class DownloadResource(Resource):
|
||||||
respond_404(request)
|
respond_404(request)
|
||||||
return
|
return
|
||||||
|
|
||||||
media_info = yield self.media_repo.get_remote_media(server_name, media_id)
|
yield self.media_repo.get_remote_media(request, server_name, media_id, name)
|
||||||
|
|
||||||
media_type = media_info["media_type"]
|
|
||||||
media_length = media_info["media_length"]
|
|
||||||
filesystem_id = media_info["filesystem_id"]
|
|
||||||
upload_name = name if name else media_info["upload_name"]
|
|
||||||
|
|
||||||
file_path = self.filepaths.remote_media_filepath(
|
|
||||||
server_name, filesystem_id
|
|
||||||
)
|
|
||||||
|
|
||||||
yield respond_with_file(
|
|
||||||
request, media_type, file_path, media_length,
|
|
||||||
upload_name=upload_name,
|
|
||||||
)
|
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
# Copyright 2014-2016 OpenMarket Ltd
|
# Copyright 2014-2016 OpenMarket Ltd
|
||||||
|
# Copyright 2018 New Vector Ltd
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
# you may not use this file except in compliance with the License.
|
# you may not use this file except in compliance with the License.
|
||||||
|
@ -18,6 +19,7 @@ import twisted.internet.error
|
||||||
import twisted.web.http
|
import twisted.web.http
|
||||||
from twisted.web.resource import Resource
|
from twisted.web.resource import Resource
|
||||||
|
|
||||||
|
from ._base import respond_404, FileInfo, respond_with_responder
|
||||||
from .upload_resource import UploadResource
|
from .upload_resource import UploadResource
|
||||||
from .download_resource import DownloadResource
|
from .download_resource import DownloadResource
|
||||||
from .thumbnail_resource import ThumbnailResource
|
from .thumbnail_resource import ThumbnailResource
|
||||||
|
@ -25,6 +27,10 @@ from .identicon_resource import IdenticonResource
|
||||||
from .preview_url_resource import PreviewUrlResource
|
from .preview_url_resource import PreviewUrlResource
|
||||||
from .filepath import MediaFilePaths
|
from .filepath import MediaFilePaths
|
||||||
from .thumbnailer import Thumbnailer
|
from .thumbnailer import Thumbnailer
|
||||||
|
from .storage_provider import (
|
||||||
|
StorageProviderWrapper, FileStorageProviderBackend,
|
||||||
|
)
|
||||||
|
from .media_storage import MediaStorage
|
||||||
|
|
||||||
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
|
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
|
||||||
from synapse.util.stringutils import random_string
|
from synapse.util.stringutils import random_string
|
||||||
|
@ -33,7 +39,7 @@ from synapse.api.errors import SynapseError, HttpResponseException, \
|
||||||
|
|
||||||
from synapse.util.async import Linearizer
|
from synapse.util.async import Linearizer
|
||||||
from synapse.util.stringutils import is_ascii
|
from synapse.util.stringutils import is_ascii
|
||||||
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
|
from synapse.util.logcontext import make_deferred_yieldable
|
||||||
from synapse.util.retryutils import NotRetryingDestination
|
from synapse.util.retryutils import NotRetryingDestination
|
||||||
|
|
||||||
import os
|
import os
|
||||||
|
@ -63,10 +69,6 @@ class MediaRepository(object):
|
||||||
self.primary_base_path = hs.config.media_store_path
|
self.primary_base_path = hs.config.media_store_path
|
||||||
self.filepaths = MediaFilePaths(self.primary_base_path)
|
self.filepaths = MediaFilePaths(self.primary_base_path)
|
||||||
|
|
||||||
self.backup_base_path = hs.config.backup_media_store_path
|
|
||||||
|
|
||||||
self.synchronous_backup_media_store = hs.config.synchronous_backup_media_store
|
|
||||||
|
|
||||||
self.dynamic_thumbnails = hs.config.dynamic_thumbnails
|
self.dynamic_thumbnails = hs.config.dynamic_thumbnails
|
||||||
self.thumbnail_requirements = hs.config.thumbnail_requirements
|
self.thumbnail_requirements = hs.config.thumbnail_requirements
|
||||||
|
|
||||||
|
@ -74,6 +76,28 @@ class MediaRepository(object):
|
||||||
|
|
||||||
self.recently_accessed_remotes = set()
|
self.recently_accessed_remotes = set()
|
||||||
|
|
||||||
|
# List of StorageProviders where we should search for media and
|
||||||
|
# potentially upload to.
|
||||||
|
storage_providers = []
|
||||||
|
|
||||||
|
# TODO: Move this into config and allow other storage providers to be
|
||||||
|
# defined.
|
||||||
|
if hs.config.backup_media_store_path:
|
||||||
|
backend = FileStorageProviderBackend(
|
||||||
|
self.primary_base_path, hs.config.backup_media_store_path,
|
||||||
|
)
|
||||||
|
provider = StorageProviderWrapper(
|
||||||
|
backend,
|
||||||
|
store=True,
|
||||||
|
store_synchronous=hs.config.synchronous_backup_media_store,
|
||||||
|
store_remote=True,
|
||||||
|
)
|
||||||
|
storage_providers.append(provider)
|
||||||
|
|
||||||
|
self.media_storage = MediaStorage(
|
||||||
|
self.primary_base_path, self.filepaths, storage_providers,
|
||||||
|
)
|
||||||
|
|
||||||
self.clock.looping_call(
|
self.clock.looping_call(
|
||||||
self._update_recently_accessed_remotes,
|
self._update_recently_accessed_remotes,
|
||||||
UPDATE_RECENTLY_ACCESSED_REMOTES_TS
|
UPDATE_RECENTLY_ACCESSED_REMOTES_TS
|
||||||
|
@ -88,72 +112,6 @@ class MediaRepository(object):
|
||||||
media, self.clock.time_msec()
|
media, self.clock.time_msec()
|
||||||
)
|
)
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _makedirs(filepath):
|
|
||||||
dirname = os.path.dirname(filepath)
|
|
||||||
if not os.path.exists(dirname):
|
|
||||||
os.makedirs(dirname)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _write_file_synchronously(source, fname):
|
|
||||||
"""Write `source` to the path `fname` synchronously. Should be called
|
|
||||||
from a thread.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
source: A file like object to be written
|
|
||||||
fname (str): Path to write to
|
|
||||||
"""
|
|
||||||
MediaRepository._makedirs(fname)
|
|
||||||
source.seek(0) # Ensure we read from the start of the file
|
|
||||||
with open(fname, "wb") as f:
|
|
||||||
shutil.copyfileobj(source, f)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def write_to_file_and_backup(self, source, path):
|
|
||||||
"""Write `source` to the on disk media store, and also the backup store
|
|
||||||
if configured.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
source: A file like object that should be written
|
|
||||||
path (str): Relative path to write file to
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Deferred[str]: the file path written to in the primary media store
|
|
||||||
"""
|
|
||||||
fname = os.path.join(self.primary_base_path, path)
|
|
||||||
|
|
||||||
# Write to the main repository
|
|
||||||
yield make_deferred_yieldable(threads.deferToThread(
|
|
||||||
self._write_file_synchronously, source, fname,
|
|
||||||
))
|
|
||||||
|
|
||||||
# Write to backup repository
|
|
||||||
yield self.copy_to_backup(path)
|
|
||||||
|
|
||||||
defer.returnValue(fname)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def copy_to_backup(self, path):
|
|
||||||
"""Copy a file from the primary to backup media store, if configured.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
path(str): Relative path to write file to
|
|
||||||
"""
|
|
||||||
if self.backup_base_path:
|
|
||||||
primary_fname = os.path.join(self.primary_base_path, path)
|
|
||||||
backup_fname = os.path.join(self.backup_base_path, path)
|
|
||||||
|
|
||||||
# We can either wait for successful writing to the backup repository
|
|
||||||
# or write in the background and immediately return
|
|
||||||
if self.synchronous_backup_media_store:
|
|
||||||
yield make_deferred_yieldable(threads.deferToThread(
|
|
||||||
shutil.copyfile, primary_fname, backup_fname,
|
|
||||||
))
|
|
||||||
else:
|
|
||||||
preserve_fn(threads.deferToThread)(
|
|
||||||
shutil.copyfile, primary_fname, backup_fname,
|
|
||||||
)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def create_content(self, media_type, upload_name, content, content_length,
|
def create_content(self, media_type, upload_name, content, content_length,
|
||||||
auth_user):
|
auth_user):
|
||||||
|
@ -171,10 +129,13 @@ class MediaRepository(object):
|
||||||
"""
|
"""
|
||||||
media_id = random_string(24)
|
media_id = random_string(24)
|
||||||
|
|
||||||
fname = yield self.write_to_file_and_backup(
|
file_info = FileInfo(
|
||||||
content, self.filepaths.local_media_filepath_rel(media_id)
|
server_name=None,
|
||||||
|
file_id=media_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
fname = yield self.media_storage.store_file(content, file_info)
|
||||||
|
|
||||||
logger.info("Stored local media in file %r", fname)
|
logger.info("Stored local media in file %r", fname)
|
||||||
|
|
||||||
yield self.store.store_local_media(
|
yield self.store.store_local_media(
|
||||||
|
@ -195,42 +156,144 @@ class MediaRepository(object):
|
||||||
defer.returnValue("mxc://%s/%s" % (self.server_name, media_id))
|
defer.returnValue("mxc://%s/%s" % (self.server_name, media_id))
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_remote_media(self, server_name, media_id):
|
def get_local_media(self, request, media_id, name):
|
||||||
|
"""Responds to reqests for local media, if exists, or returns 404.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
request(twisted.web.http.Request)
|
||||||
|
media_id (str): The media ID of the content. (This is the same as
|
||||||
|
the file_id for local content.)
|
||||||
|
name (str|None): Optional name that, if specified, will be used as
|
||||||
|
the filename in the Content-Disposition header of the response.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred: Resolves once a response has successfully been written
|
||||||
|
to request
|
||||||
|
"""
|
||||||
|
media_info = yield self.store.get_local_media(media_id)
|
||||||
|
if not media_info or media_info["quarantined_by"]:
|
||||||
|
respond_404(request)
|
||||||
|
return
|
||||||
|
|
||||||
|
media_type = media_info["media_type"]
|
||||||
|
media_length = media_info["media_length"]
|
||||||
|
upload_name = name if name else media_info["upload_name"]
|
||||||
|
url_cache = media_info["url_cache"]
|
||||||
|
|
||||||
|
file_info = FileInfo(
|
||||||
|
None, media_id,
|
||||||
|
url_cache=url_cache,
|
||||||
|
)
|
||||||
|
|
||||||
|
responder = yield self.media_storage.fetch_media(file_info)
|
||||||
|
yield respond_with_responder(
|
||||||
|
request, responder, media_type, media_length, upload_name,
|
||||||
|
)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def get_remote_media(self, request, server_name, media_id, name):
|
||||||
|
"""Respond to requests for remote media.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
request(twisted.web.http.Request)
|
||||||
|
server_name (str): Remote server_name where the media originated.
|
||||||
|
media_id (str): The media ID of the content (as defined by the
|
||||||
|
remote server).
|
||||||
|
name (str|None): Optional name that, if specified, will be used as
|
||||||
|
the filename in the Content-Disposition header of the response.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred: Resolves once a response has successfully been written
|
||||||
|
to request
|
||||||
|
"""
|
||||||
|
self.recently_accessed_remotes.add((server_name, media_id))
|
||||||
|
|
||||||
|
# We linearize here to ensure that we don't try and download remote
|
||||||
|
# media multiple times concurrently
|
||||||
key = (server_name, media_id)
|
key = (server_name, media_id)
|
||||||
with (yield self.remote_media_linearizer.queue(key)):
|
with (yield self.remote_media_linearizer.queue(key)):
|
||||||
media_info = yield self._get_remote_media_impl(server_name, media_id)
|
responder, media_info = yield self._get_remote_media_impl(
|
||||||
defer.returnValue(media_info)
|
server_name, media_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
# We deliberately stream the file outside the lock
|
||||||
|
if responder:
|
||||||
|
media_type = media_info["media_type"]
|
||||||
|
media_length = media_info["media_length"]
|
||||||
|
upload_name = name if name else media_info["upload_name"]
|
||||||
|
yield respond_with_responder(
|
||||||
|
request, responder, media_type, media_length, upload_name,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
respond_404(request)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _get_remote_media_impl(self, server_name, media_id):
|
def _get_remote_media_impl(self, server_name, media_id):
|
||||||
|
"""Looks for media in local cache, if not there then attempt to
|
||||||
|
download from remote server.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
server_name (str): Remote server_name where the media originated.
|
||||||
|
media_id (str): The media ID of the content (as defined by the
|
||||||
|
remote server).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred[(Responder, media_info)]
|
||||||
|
"""
|
||||||
media_info = yield self.store.get_cached_remote_media(
|
media_info = yield self.store.get_cached_remote_media(
|
||||||
server_name, media_id
|
server_name, media_id
|
||||||
)
|
)
|
||||||
if not media_info:
|
|
||||||
media_info = yield self._download_remote_file(
|
|
||||||
server_name, media_id
|
|
||||||
)
|
|
||||||
elif media_info["quarantined_by"]:
|
|
||||||
raise NotFoundError()
|
|
||||||
else:
|
|
||||||
self.recently_accessed_remotes.add((server_name, media_id))
|
|
||||||
yield self.store.update_cached_last_access_time(
|
|
||||||
[(server_name, media_id)], self.clock.time_msec()
|
|
||||||
)
|
|
||||||
defer.returnValue(media_info)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
# file_id is the ID we use to track the file locally. If we've already
|
||||||
def _download_remote_file(self, server_name, media_id):
|
# seen the file then reuse the existing ID, otherwise genereate a new
|
||||||
|
# one.
|
||||||
|
if media_info:
|
||||||
|
file_id = media_info["filesystem_id"]
|
||||||
|
else:
|
||||||
file_id = random_string(24)
|
file_id = random_string(24)
|
||||||
|
|
||||||
fpath = self.filepaths.remote_media_filepath_rel(
|
file_info = FileInfo(server_name, file_id)
|
||||||
server_name, file_id
|
|
||||||
)
|
|
||||||
fname = os.path.join(self.primary_base_path, fpath)
|
|
||||||
self._makedirs(fname)
|
|
||||||
|
|
||||||
try:
|
# If we have an entry in the DB, try and look for it
|
||||||
with open(fname, "wb") as f:
|
if media_info:
|
||||||
|
if media_info["quarantined_by"]:
|
||||||
|
raise NotFoundError()
|
||||||
|
|
||||||
|
responder = yield self.media_storage.fetch_media(file_info)
|
||||||
|
if responder:
|
||||||
|
defer.returnValue((responder, media_info))
|
||||||
|
|
||||||
|
# Failed to find the file anywhere, lets download it.
|
||||||
|
|
||||||
|
media_info = yield self._download_remote_file(
|
||||||
|
server_name, media_id, file_id
|
||||||
|
)
|
||||||
|
|
||||||
|
responder = yield self.media_storage.fetch_media(file_info)
|
||||||
|
defer.returnValue((responder, media_info))
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def _download_remote_file(self, server_name, media_id, file_id):
|
||||||
|
"""Attempt to download the remote file from the given server name,
|
||||||
|
using the given file_id as the local id.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
server_name (str): Originating server
|
||||||
|
media_id (str): The media ID of the content (as defined by the
|
||||||
|
remote server). This is different than the file_id, which is
|
||||||
|
locally generated.
|
||||||
|
file_id (str): Local file ID
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred[MediaInfo]
|
||||||
|
"""
|
||||||
|
|
||||||
|
file_info = FileInfo(
|
||||||
|
server_name=server_name,
|
||||||
|
file_id=file_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
with self.media_storage.store_into_file(file_info) as (f, fname, finish):
|
||||||
request_path = "/".join((
|
request_path = "/".join((
|
||||||
"/_matrix/media/v1/download", server_name, media_id,
|
"/_matrix/media/v1/download", server_name, media_id,
|
||||||
))
|
))
|
||||||
|
@ -268,9 +331,10 @@ class MediaRepository(object):
|
||||||
server_name, media_id)
|
server_name, media_id)
|
||||||
raise SynapseError(502, "Failed to fetch remote media")
|
raise SynapseError(502, "Failed to fetch remote media")
|
||||||
|
|
||||||
yield self.copy_to_backup(fpath)
|
yield finish()
|
||||||
|
|
||||||
media_type = headers["Content-Type"][0]
|
media_type = headers["Content-Type"][0]
|
||||||
|
|
||||||
time_now_ms = self.clock.time_msec()
|
time_now_ms = self.clock.time_msec()
|
||||||
|
|
||||||
content_disposition = headers.get("Content-Disposition", None)
|
content_disposition = headers.get("Content-Disposition", None)
|
||||||
|
@ -310,9 +374,6 @@ class MediaRepository(object):
|
||||||
media_length=length,
|
media_length=length,
|
||||||
filesystem_id=file_id,
|
filesystem_id=file_id,
|
||||||
)
|
)
|
||||||
except Exception:
|
|
||||||
os.remove(fname)
|
|
||||||
raise
|
|
||||||
|
|
||||||
media_info = {
|
media_info = {
|
||||||
"media_type": media_type,
|
"media_type": media_type,
|
||||||
|
@ -368,11 +429,18 @@ class MediaRepository(object):
|
||||||
|
|
||||||
if t_byte_source:
|
if t_byte_source:
|
||||||
try:
|
try:
|
||||||
output_path = yield self.write_to_file_and_backup(
|
file_info = FileInfo(
|
||||||
t_byte_source,
|
server_name=None,
|
||||||
self.filepaths.local_media_thumbnail_rel(
|
file_id=media_id,
|
||||||
media_id, t_width, t_height, t_type, t_method
|
thumbnail=True,
|
||||||
|
thumbnail_width=t_width,
|
||||||
|
thumbnail_height=t_height,
|
||||||
|
thumbnail_method=t_method,
|
||||||
|
thumbnail_type=t_type,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
output_path = yield self.media_storage.store_file(
|
||||||
|
t_byte_source, file_info,
|
||||||
)
|
)
|
||||||
finally:
|
finally:
|
||||||
t_byte_source.close()
|
t_byte_source.close()
|
||||||
|
@ -400,11 +468,18 @@ class MediaRepository(object):
|
||||||
|
|
||||||
if t_byte_source:
|
if t_byte_source:
|
||||||
try:
|
try:
|
||||||
output_path = yield self.write_to_file_and_backup(
|
file_info = FileInfo(
|
||||||
t_byte_source,
|
server_name=server_name,
|
||||||
self.filepaths.remote_media_thumbnail_rel(
|
file_id=media_id,
|
||||||
server_name, file_id, t_width, t_height, t_type, t_method
|
thumbnail=True,
|
||||||
|
thumbnail_width=t_width,
|
||||||
|
thumbnail_height=t_height,
|
||||||
|
thumbnail_method=t_method,
|
||||||
|
thumbnail_type=t_type,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
output_path = yield self.media_storage.store_file(
|
||||||
|
t_byte_source, file_info,
|
||||||
)
|
)
|
||||||
finally:
|
finally:
|
||||||
t_byte_source.close()
|
t_byte_source.close()
|
||||||
|
@ -472,20 +547,6 @@ class MediaRepository(object):
|
||||||
|
|
||||||
# Now we generate the thumbnails for each dimension, store it
|
# Now we generate the thumbnails for each dimension, store it
|
||||||
for (t_width, t_height, t_type), t_method in thumbnails.iteritems():
|
for (t_width, t_height, t_type), t_method in thumbnails.iteritems():
|
||||||
# Work out the correct file name for thumbnail
|
|
||||||
if server_name:
|
|
||||||
file_path = self.filepaths.remote_media_thumbnail_rel(
|
|
||||||
server_name, file_id, t_width, t_height, t_type, t_method
|
|
||||||
)
|
|
||||||
elif url_cache:
|
|
||||||
file_path = self.filepaths.url_cache_thumbnail_rel(
|
|
||||||
media_id, t_width, t_height, t_type, t_method
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
file_path = self.filepaths.local_media_thumbnail_rel(
|
|
||||||
media_id, t_width, t_height, t_type, t_method
|
|
||||||
)
|
|
||||||
|
|
||||||
# Generate the thumbnail
|
# Generate the thumbnail
|
||||||
if t_method == "crop":
|
if t_method == "crop":
|
||||||
t_byte_source = yield make_deferred_yieldable(threads.deferToThread(
|
t_byte_source = yield make_deferred_yieldable(threads.deferToThread(
|
||||||
|
@ -505,9 +566,19 @@ class MediaRepository(object):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Write to disk
|
file_info = FileInfo(
|
||||||
output_path = yield self.write_to_file_and_backup(
|
server_name=server_name,
|
||||||
t_byte_source, file_path,
|
file_id=media_id,
|
||||||
|
thumbnail=True,
|
||||||
|
thumbnail_width=t_width,
|
||||||
|
thumbnail_height=t_height,
|
||||||
|
thumbnail_method=t_method,
|
||||||
|
thumbnail_type=t_type,
|
||||||
|
url_cache=url_cache,
|
||||||
|
)
|
||||||
|
|
||||||
|
output_path = yield self.media_storage.store_file(
|
||||||
|
t_byte_source, file_info,
|
||||||
)
|
)
|
||||||
finally:
|
finally:
|
||||||
t_byte_source.close()
|
t_byte_source.close()
|
||||||
|
@ -620,7 +691,11 @@ class MediaRepositoryResource(Resource):
|
||||||
|
|
||||||
self.putChild("upload", UploadResource(hs, media_repo))
|
self.putChild("upload", UploadResource(hs, media_repo))
|
||||||
self.putChild("download", DownloadResource(hs, media_repo))
|
self.putChild("download", DownloadResource(hs, media_repo))
|
||||||
self.putChild("thumbnail", ThumbnailResource(hs, media_repo))
|
self.putChild("thumbnail", ThumbnailResource(
|
||||||
|
hs, media_repo, media_repo.media_storage,
|
||||||
|
))
|
||||||
self.putChild("identicon", IdenticonResource())
|
self.putChild("identicon", IdenticonResource())
|
||||||
if hs.config.url_preview_enabled:
|
if hs.config.url_preview_enabled:
|
||||||
self.putChild("preview_url", PreviewUrlResource(hs, media_repo))
|
self.putChild("preview_url", PreviewUrlResource(
|
||||||
|
hs, media_repo, media_repo.media_storage,
|
||||||
|
))
|
||||||
|
|
228
synapse/rest/media/v1/media_storage.py
Normal file
228
synapse/rest/media/v1/media_storage.py
Normal file
|
@ -0,0 +1,228 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# Copyright 2018 New Vecotr Ltd
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from twisted.internet import defer, threads
|
||||||
|
from twisted.protocols.basic import FileSender
|
||||||
|
|
||||||
|
from ._base import Responder
|
||||||
|
|
||||||
|
from synapse.util.logcontext import make_deferred_yieldable
|
||||||
|
|
||||||
|
import contextlib
|
||||||
|
import os
|
||||||
|
import logging
|
||||||
|
import shutil
|
||||||
|
import sys
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class MediaStorage(object):
|
||||||
|
"""Responsible for storing/fetching files from local sources.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
local_media_directory (str): Base path where we store media on disk
|
||||||
|
filepaths (MediaFilePaths)
|
||||||
|
storage_providers ([StorageProvider]): List of StorageProvider that are
|
||||||
|
used to fetch and store files.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, local_media_directory, filepaths, storage_providers):
|
||||||
|
self.local_media_directory = local_media_directory
|
||||||
|
self.filepaths = filepaths
|
||||||
|
self.storage_providers = storage_providers
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def store_file(self, source, file_info):
|
||||||
|
"""Write `source` to the on disk media store, and also any other
|
||||||
|
configured storage providers
|
||||||
|
|
||||||
|
Args:
|
||||||
|
source: A file like object that should be written
|
||||||
|
file_info (FileInfo): Info about the file to store
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred[str]: the file path written to in the primary media store
|
||||||
|
"""
|
||||||
|
path = self._file_info_to_path(file_info)
|
||||||
|
fname = os.path.join(self.local_media_directory, path)
|
||||||
|
|
||||||
|
dirname = os.path.dirname(fname)
|
||||||
|
if not os.path.exists(dirname):
|
||||||
|
os.makedirs(dirname)
|
||||||
|
|
||||||
|
# Write to the main repository
|
||||||
|
yield make_deferred_yieldable(threads.deferToThread(
|
||||||
|
_write_file_synchronously, source, fname,
|
||||||
|
))
|
||||||
|
|
||||||
|
defer.returnValue(fname)
|
||||||
|
|
||||||
|
@contextlib.contextmanager
|
||||||
|
def store_into_file(self, file_info):
|
||||||
|
"""Context manager used to get a file like object to write into, as
|
||||||
|
described by file_info.
|
||||||
|
|
||||||
|
Actually yields a 3-tuple (file, fname, finish_cb), where file is a file
|
||||||
|
like object that can be written to, fname is the absolute path of file
|
||||||
|
on disk, and finish_cb is a function that returns a Deferred.
|
||||||
|
|
||||||
|
fname can be used to read the contents from after upload, e.g. to
|
||||||
|
generate thumbnails.
|
||||||
|
|
||||||
|
finish_cb must be called and waited on after the file has been
|
||||||
|
successfully been written to. Should not be called if there was an
|
||||||
|
error.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
file_info (FileInfo): Info about the file to store
|
||||||
|
|
||||||
|
Example:
|
||||||
|
|
||||||
|
with media_storage.store_into_file(info) as (f, fname, finish_cb):
|
||||||
|
# .. write into f ...
|
||||||
|
yield finish_cb()
|
||||||
|
"""
|
||||||
|
|
||||||
|
path = self._file_info_to_path(file_info)
|
||||||
|
fname = os.path.join(self.local_media_directory, path)
|
||||||
|
|
||||||
|
dirname = os.path.dirname(fname)
|
||||||
|
if not os.path.exists(dirname):
|
||||||
|
os.makedirs(dirname)
|
||||||
|
|
||||||
|
finished_called = [False]
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def finish():
|
||||||
|
for provider in self.storage_providers:
|
||||||
|
yield provider.store_file(path, file_info)
|
||||||
|
|
||||||
|
finished_called[0] = True
|
||||||
|
|
||||||
|
try:
|
||||||
|
with open(fname, "wb") as f:
|
||||||
|
yield f, fname, finish
|
||||||
|
except Exception:
|
||||||
|
t, v, tb = sys.exc_info()
|
||||||
|
try:
|
||||||
|
os.remove(fname)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
raise t, v, tb
|
||||||
|
|
||||||
|
if not finished_called:
|
||||||
|
raise Exception("Finished callback not called")
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def fetch_media(self, file_info):
|
||||||
|
"""Attempts to fetch media described by file_info from the local cache
|
||||||
|
and configured storage providers.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
file_info (FileInfo)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred[Responder|None]: Returns a Responder if the file was found,
|
||||||
|
otherwise None.
|
||||||
|
"""
|
||||||
|
|
||||||
|
path = self._file_info_to_path(file_info)
|
||||||
|
local_path = os.path.join(self.local_media_directory, path)
|
||||||
|
if os.path.exists(local_path):
|
||||||
|
defer.returnValue(FileResponder(open(local_path, "rb")))
|
||||||
|
|
||||||
|
for provider in self.storage_providers:
|
||||||
|
res = yield provider.fetch(path, file_info)
|
||||||
|
if res:
|
||||||
|
defer.returnValue(res)
|
||||||
|
|
||||||
|
defer.returnValue(None)
|
||||||
|
|
||||||
|
def _file_info_to_path(self, file_info):
|
||||||
|
"""Converts file_info into a relative path.
|
||||||
|
|
||||||
|
The path is suitable for storing files under a directory, e.g. used to
|
||||||
|
store files on local FS under the base media repository directory.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
file_info (FileInfo)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str
|
||||||
|
"""
|
||||||
|
if file_info.url_cache:
|
||||||
|
return self.filepaths.url_cache_filepath_rel(file_info.file_id)
|
||||||
|
|
||||||
|
if file_info.server_name:
|
||||||
|
if file_info.thumbnail:
|
||||||
|
return self.filepaths.remote_media_thumbnail_rel(
|
||||||
|
server_name=file_info.server_name,
|
||||||
|
file_id=file_info.file_id,
|
||||||
|
width=file_info.thumbnail_width,
|
||||||
|
height=file_info.thumbnail_height,
|
||||||
|
content_type=file_info.thumbnail_type,
|
||||||
|
method=file_info.thumbnail_method
|
||||||
|
)
|
||||||
|
return self.filepaths.remote_media_filepath_rel(
|
||||||
|
file_info.server_name, file_info.file_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
if file_info.thumbnail:
|
||||||
|
return self.filepaths.local_media_thumbnail_rel(
|
||||||
|
media_id=file_info.file_id,
|
||||||
|
width=file_info.thumbnail_width,
|
||||||
|
height=file_info.thumbnail_height,
|
||||||
|
content_type=file_info.thumbnail_type,
|
||||||
|
method=file_info.thumbnail_method
|
||||||
|
)
|
||||||
|
return self.filepaths.local_media_filepath_rel(
|
||||||
|
file_info.file_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _write_file_synchronously(source, fname):
|
||||||
|
"""Write `source` to the path `fname` synchronously. Should be called
|
||||||
|
from a thread.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
source: A file like object to be written
|
||||||
|
fname (str): Path to write to
|
||||||
|
"""
|
||||||
|
dirname = os.path.dirname(fname)
|
||||||
|
if not os.path.exists(dirname):
|
||||||
|
os.makedirs(dirname)
|
||||||
|
|
||||||
|
source.seek(0) # Ensure we read from the start of the file
|
||||||
|
with open(fname, "wb") as f:
|
||||||
|
shutil.copyfileobj(source, f)
|
||||||
|
|
||||||
|
|
||||||
|
class FileResponder(Responder):
|
||||||
|
"""Wraps an open file that can be sent to a request.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
open_file (file): A file like object to be streamed ot the client,
|
||||||
|
is closed when finished streaming.
|
||||||
|
"""
|
||||||
|
def __init__(self, open_file):
|
||||||
|
self.open_file = open_file
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def write_to_consumer(self, consumer):
|
||||||
|
yield FileSender().beginFileTransfer(self.open_file, consumer)
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||||
|
self.open_file.close()
|
|
@ -17,6 +17,8 @@ from twisted.web.server import NOT_DONE_YET
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
from twisted.web.resource import Resource
|
from twisted.web.resource import Resource
|
||||||
|
|
||||||
|
from ._base import FileInfo
|
||||||
|
|
||||||
from synapse.api.errors import (
|
from synapse.api.errors import (
|
||||||
SynapseError, Codes,
|
SynapseError, Codes,
|
||||||
)
|
)
|
||||||
|
@ -49,7 +51,7 @@ logger = logging.getLogger(__name__)
|
||||||
class PreviewUrlResource(Resource):
|
class PreviewUrlResource(Resource):
|
||||||
isLeaf = True
|
isLeaf = True
|
||||||
|
|
||||||
def __init__(self, hs, media_repo):
|
def __init__(self, hs, media_repo, media_storage):
|
||||||
Resource.__init__(self)
|
Resource.__init__(self)
|
||||||
|
|
||||||
self.auth = hs.get_auth()
|
self.auth = hs.get_auth()
|
||||||
|
@ -62,6 +64,7 @@ class PreviewUrlResource(Resource):
|
||||||
self.client = SpiderHttpClient(hs)
|
self.client = SpiderHttpClient(hs)
|
||||||
self.media_repo = media_repo
|
self.media_repo = media_repo
|
||||||
self.primary_base_path = media_repo.primary_base_path
|
self.primary_base_path = media_repo.primary_base_path
|
||||||
|
self.media_storage = media_storage
|
||||||
|
|
||||||
self.url_preview_url_blacklist = hs.config.url_preview_url_blacklist
|
self.url_preview_url_blacklist = hs.config.url_preview_url_blacklist
|
||||||
|
|
||||||
|
@ -273,19 +276,21 @@ class PreviewUrlResource(Resource):
|
||||||
|
|
||||||
file_id = datetime.date.today().isoformat() + '_' + random_string(16)
|
file_id = datetime.date.today().isoformat() + '_' + random_string(16)
|
||||||
|
|
||||||
fpath = self.filepaths.url_cache_filepath_rel(file_id)
|
file_info = FileInfo(
|
||||||
fname = os.path.join(self.primary_base_path, fpath)
|
server_name=None,
|
||||||
self.media_repo._makedirs(fname)
|
file_id=file_id,
|
||||||
|
url_cache=True,
|
||||||
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with open(fname, "wb") as f:
|
with self.media_storage.store_into_file(file_info) as (f, fname, finish):
|
||||||
logger.debug("Trying to get url '%s'" % url)
|
logger.debug("Trying to get url '%s'" % url)
|
||||||
length, headers, uri, code = yield self.client.get_file(
|
length, headers, uri, code = yield self.client.get_file(
|
||||||
url, output_stream=f, max_size=self.max_spider_size,
|
url, output_stream=f, max_size=self.max_spider_size,
|
||||||
)
|
)
|
||||||
# FIXME: pass through 404s and other error messages nicely
|
# FIXME: pass through 404s and other error messages nicely
|
||||||
|
|
||||||
yield self.media_repo.copy_to_backup(fpath)
|
yield finish()
|
||||||
|
|
||||||
media_type = headers["Content-Type"][0]
|
media_type = headers["Content-Type"][0]
|
||||||
time_now_ms = self.clock.time_msec()
|
time_now_ms = self.clock.time_msec()
|
||||||
|
@ -327,7 +332,6 @@ class PreviewUrlResource(Resource):
|
||||||
)
|
)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
os.remove(fname)
|
|
||||||
raise SynapseError(
|
raise SynapseError(
|
||||||
500, ("Failed to download content: %s" % e),
|
500, ("Failed to download content: %s" % e),
|
||||||
Codes.UNKNOWN
|
Codes.UNKNOWN
|
||||||
|
|
127
synapse/rest/media/v1/storage_provider.py
Normal file
127
synapse/rest/media/v1/storage_provider.py
Normal file
|
@ -0,0 +1,127 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# Copyright 2018 New Vector Ltd
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from twisted.internet import defer, threads
|
||||||
|
|
||||||
|
from .media_storage import FileResponder
|
||||||
|
|
||||||
|
from synapse.util.logcontext import preserve_fn
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import shutil
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class StorageProvider(object):
|
||||||
|
"""A storage provider is a service that can store uploaded media and
|
||||||
|
retrieve them.
|
||||||
|
"""
|
||||||
|
def store_file(self, path, file_info):
|
||||||
|
"""Store the file described by file_info. The actual contents can be
|
||||||
|
retrieved by reading the file in file_info.upload_path.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
path (str): Relative path of file in local cache
|
||||||
|
file_info (FileInfo)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
def fetch(self, path, file_info):
|
||||||
|
"""Attempt to fetch the file described by file_info and stream it
|
||||||
|
into writer.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
path (str): Relative path of file in local cache
|
||||||
|
file_info (FileInfo)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred(Responder): Returns a Responder if the provider has the file,
|
||||||
|
otherwise returns None.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class StorageProviderWrapper(StorageProvider):
|
||||||
|
"""Wraps a storage provider and provides various config options
|
||||||
|
|
||||||
|
Args:
|
||||||
|
backend (StorageProvider)
|
||||||
|
store (bool): Whether to store new files or not.
|
||||||
|
store_synchronous (bool): Whether to wait for file to be successfully
|
||||||
|
uploaded, or todo the upload in the backgroud.
|
||||||
|
store_remote (bool): Whether remote media should be uploaded
|
||||||
|
"""
|
||||||
|
def __init__(self, backend, store, store_synchronous, store_remote):
|
||||||
|
self.backend = backend
|
||||||
|
self.store = store
|
||||||
|
self.store_synchronous = store_synchronous
|
||||||
|
self.store_remote = store_remote
|
||||||
|
|
||||||
|
def store_file(self, path, file_info):
|
||||||
|
if not self.store:
|
||||||
|
return defer.succeed(None)
|
||||||
|
|
||||||
|
if file_info.server_name and not self.store_remote:
|
||||||
|
return defer.succeed(None)
|
||||||
|
|
||||||
|
if self.store_synchronous:
|
||||||
|
return self.backend.store_file(path, file_info)
|
||||||
|
else:
|
||||||
|
# TODO: Handle errors.
|
||||||
|
preserve_fn(self.backend.store_file)(path, file_info)
|
||||||
|
return defer.succeed(None)
|
||||||
|
|
||||||
|
def fetch(self, path, file_info):
|
||||||
|
return self.backend.fetch(path, file_info)
|
||||||
|
|
||||||
|
|
||||||
|
class FileStorageProviderBackend(StorageProvider):
|
||||||
|
"""A storage provider that stores files in a directory on a filesystem.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
cache_directory (str): Base path of the local media repository
|
||||||
|
base_directory (str): Base path to store new files
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, cache_directory, base_directory):
|
||||||
|
self.cache_directory = cache_directory
|
||||||
|
self.base_directory = base_directory
|
||||||
|
|
||||||
|
def store_file(self, path, file_info):
|
||||||
|
"""See StorageProvider.store_file"""
|
||||||
|
|
||||||
|
primary_fname = os.path.join(self.cache_directory, path)
|
||||||
|
backup_fname = os.path.join(self.base_directory, path)
|
||||||
|
|
||||||
|
dirname = os.path.dirname(backup_fname)
|
||||||
|
if not os.path.exists(dirname):
|
||||||
|
os.makedirs(dirname)
|
||||||
|
|
||||||
|
return threads.deferToThread(
|
||||||
|
shutil.copyfile, primary_fname, backup_fname,
|
||||||
|
)
|
||||||
|
|
||||||
|
def fetch(self, path, file_info):
|
||||||
|
"""See StorageProvider.fetch"""
|
||||||
|
|
||||||
|
backup_fname = os.path.join(self.base_directory, path)
|
||||||
|
if os.path.isfile(backup_fname):
|
||||||
|
return FileResponder(open(backup_fname, "rb"))
|
|
@ -14,7 +14,10 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
|
||||||
from ._base import parse_media_id, respond_404, respond_with_file
|
from ._base import (
|
||||||
|
parse_media_id, respond_404, respond_with_file, FileInfo,
|
||||||
|
respond_with_responder,
|
||||||
|
)
|
||||||
from twisted.web.resource import Resource
|
from twisted.web.resource import Resource
|
||||||
from synapse.http.servlet import parse_string, parse_integer
|
from synapse.http.servlet import parse_string, parse_integer
|
||||||
from synapse.http.server import request_handler, set_cors_headers
|
from synapse.http.server import request_handler, set_cors_headers
|
||||||
|
@ -30,12 +33,12 @@ logger = logging.getLogger(__name__)
|
||||||
class ThumbnailResource(Resource):
|
class ThumbnailResource(Resource):
|
||||||
isLeaf = True
|
isLeaf = True
|
||||||
|
|
||||||
def __init__(self, hs, media_repo):
|
def __init__(self, hs, media_repo, media_storage):
|
||||||
Resource.__init__(self)
|
Resource.__init__(self)
|
||||||
|
|
||||||
self.store = hs.get_datastore()
|
self.store = hs.get_datastore()
|
||||||
self.filepaths = media_repo.filepaths
|
|
||||||
self.media_repo = media_repo
|
self.media_repo = media_repo
|
||||||
|
self.media_storage = media_storage
|
||||||
self.dynamic_thumbnails = hs.config.dynamic_thumbnails
|
self.dynamic_thumbnails = hs.config.dynamic_thumbnails
|
||||||
self.server_name = hs.hostname
|
self.server_name = hs.hostname
|
||||||
self.version_string = hs.version_string
|
self.version_string = hs.version_string
|
||||||
|
@ -91,23 +94,22 @@ class ThumbnailResource(Resource):
|
||||||
thumbnail_info = self._select_thumbnail(
|
thumbnail_info = self._select_thumbnail(
|
||||||
width, height, method, m_type, thumbnail_infos
|
width, height, method, m_type, thumbnail_infos
|
||||||
)
|
)
|
||||||
t_width = thumbnail_info["thumbnail_width"]
|
|
||||||
t_height = thumbnail_info["thumbnail_height"]
|
|
||||||
t_type = thumbnail_info["thumbnail_type"]
|
|
||||||
t_method = thumbnail_info["thumbnail_method"]
|
|
||||||
|
|
||||||
if media_info["url_cache"]:
|
file_info = FileInfo(
|
||||||
# TODO: Check the file still exists, if it doesn't we can redownload
|
server_name=None, file_id=media_id,
|
||||||
# it from the url `media_info["url_cache"]`
|
url_cache=media_info["url_cache"],
|
||||||
file_path = self.filepaths.url_cache_thumbnail(
|
thumbnail=True,
|
||||||
media_id, t_width, t_height, t_type, t_method,
|
thumbnail_width=thumbnail_info["thumbnail_width"],
|
||||||
|
thumbnail_height=thumbnail_info["thumbnail_height"],
|
||||||
|
thumbnail_type=thumbnail_info["thumbnail_type"],
|
||||||
|
thumbnail_method=thumbnail_info["thumbnail_method"],
|
||||||
)
|
)
|
||||||
else:
|
|
||||||
file_path = self.filepaths.local_media_thumbnail(
|
|
||||||
media_id, t_width, t_height, t_type, t_method,
|
|
||||||
)
|
|
||||||
yield respond_with_file(request, t_type, file_path)
|
|
||||||
|
|
||||||
|
t_type = file_info.thumbnail_type
|
||||||
|
t_length = thumbnail_info["thumbnail_length"]
|
||||||
|
|
||||||
|
responder = yield self.media_storage.fetch_media(file_info)
|
||||||
|
yield respond_with_responder(request, responder, t_type, t_length)
|
||||||
else:
|
else:
|
||||||
respond_404(request)
|
respond_404(request)
|
||||||
|
|
||||||
|
@ -129,19 +131,22 @@ class ThumbnailResource(Resource):
|
||||||
t_type = info["thumbnail_type"] == desired_type
|
t_type = info["thumbnail_type"] == desired_type
|
||||||
|
|
||||||
if t_w and t_h and t_method and t_type:
|
if t_w and t_h and t_method and t_type:
|
||||||
if media_info["url_cache"]:
|
file_info = FileInfo(
|
||||||
# TODO: Check the file still exists, if it doesn't we can redownload
|
server_name=None, file_id=media_id,
|
||||||
# it from the url `media_info["url_cache"]`
|
url_cache=media_info["url_cache"],
|
||||||
file_path = self.filepaths.url_cache_thumbnail(
|
thumbnail=True,
|
||||||
media_id, desired_width, desired_height, desired_type,
|
thumbnail_width=info["thumbnail_width"],
|
||||||
desired_method,
|
thumbnail_height=info["thumbnail_height"],
|
||||||
|
thumbnail_type=info["thumbnail_type"],
|
||||||
|
thumbnail_method=info["thumbnail_method"],
|
||||||
)
|
)
|
||||||
else:
|
|
||||||
file_path = self.filepaths.local_media_thumbnail(
|
t_type = file_info.thumbnail_type
|
||||||
media_id, desired_width, desired_height, desired_type,
|
t_length = info["thumbnail_length"]
|
||||||
desired_method,
|
|
||||||
)
|
responder = yield self.media_storage.fetch_media(file_info)
|
||||||
yield respond_with_file(request, desired_type, file_path)
|
if responder:
|
||||||
|
yield respond_with_responder(request, responder, t_type, t_length)
|
||||||
return
|
return
|
||||||
|
|
||||||
logger.debug("We don't have a local thumbnail of that size. Generating")
|
logger.debug("We don't have a local thumbnail of that size. Generating")
|
||||||
|
@ -175,11 +180,21 @@ class ThumbnailResource(Resource):
|
||||||
t_type = info["thumbnail_type"] == desired_type
|
t_type = info["thumbnail_type"] == desired_type
|
||||||
|
|
||||||
if t_w and t_h and t_method and t_type:
|
if t_w and t_h and t_method and t_type:
|
||||||
file_path = self.filepaths.remote_media_thumbnail(
|
file_info = FileInfo(
|
||||||
server_name, file_id, desired_width, desired_height,
|
server_name=None, file_id=media_id,
|
||||||
desired_type, desired_method,
|
thumbnail=True,
|
||||||
|
thumbnail_width=info["thumbnail_width"],
|
||||||
|
thumbnail_height=info["thumbnail_height"],
|
||||||
|
thumbnail_type=info["thumbnail_type"],
|
||||||
|
thumbnail_method=info["thumbnail_method"],
|
||||||
)
|
)
|
||||||
yield respond_with_file(request, desired_type, file_path)
|
|
||||||
|
t_type = file_info.thumbnail_type
|
||||||
|
t_length = info["thumbnail_length"]
|
||||||
|
|
||||||
|
responder = yield self.media_storage.fetch_media(file_info)
|
||||||
|
if responder:
|
||||||
|
yield respond_with_responder(request, responder, t_type, t_length)
|
||||||
return
|
return
|
||||||
|
|
||||||
logger.debug("We don't have a local thumbnail of that size. Generating")
|
logger.debug("We don't have a local thumbnail of that size. Generating")
|
||||||
|
@ -211,17 +226,20 @@ class ThumbnailResource(Resource):
|
||||||
thumbnail_info = self._select_thumbnail(
|
thumbnail_info = self._select_thumbnail(
|
||||||
width, height, method, m_type, thumbnail_infos
|
width, height, method, m_type, thumbnail_infos
|
||||||
)
|
)
|
||||||
t_width = thumbnail_info["thumbnail_width"]
|
file_info = FileInfo(
|
||||||
t_height = thumbnail_info["thumbnail_height"]
|
server_name=None, file_id=media_id,
|
||||||
t_type = thumbnail_info["thumbnail_type"]
|
thumbnail=True,
|
||||||
t_method = thumbnail_info["thumbnail_method"]
|
thumbnail_width=thumbnail_info["thumbnail_width"],
|
||||||
file_id = thumbnail_info["filesystem_id"]
|
thumbnail_height=thumbnail_info["thumbnail_height"],
|
||||||
|
thumbnail_type=thumbnail_info["thumbnail_type"],
|
||||||
|
thumbnail_method=thumbnail_info["thumbnail_method"],
|
||||||
|
)
|
||||||
|
|
||||||
|
t_type = file_info.thumbnail_type
|
||||||
t_length = thumbnail_info["thumbnail_length"]
|
t_length = thumbnail_info["thumbnail_length"]
|
||||||
|
|
||||||
file_path = self.filepaths.remote_media_thumbnail(
|
responder = yield self.media_storage.fetch_media(file_info)
|
||||||
server_name, file_id, t_width, t_height, t_type, t_method,
|
yield respond_with_responder(request, responder, t_type, t_length)
|
||||||
)
|
|
||||||
yield respond_with_file(request, t_type, file_path, t_length)
|
|
||||||
else:
|
else:
|
||||||
respond_404(request)
|
respond_404(request)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue