package storageproviders

import (
	"fmt"
	"net/http"
	"os"
	"path"
	"path/filepath"
	"sort"
	"strings"
	"time"

	"github.com/owncast/owncast/config"
	"github.com/owncast/owncast/core/data"
	"github.com/owncast/owncast/utils"
	"github.com/pkg/errors"
	log "github.com/sirupsen/logrus"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
)

// S3Storage is the s3 implementation of a storage provider.
type S3Storage struct {
	streamId string
	sess     *session.Session
	s3Client *s3.S3
	host     string

	s3Endpoint        string
	s3ServingEndpoint string
	s3Region          string
	s3Bucket          string
	s3AccessKey       string
	s3Secret          string
	s3ACL             string
	s3PathPrefix      string
	s3ForcePathStyle  bool

	// If we try to upload a playlist but it is not yet on disk
	// then keep a reference to it here.
	queuedPlaylistUpdates map[string]string

	uploader *s3manager.Uploader
}

// NewS3Storage returns a new S3Storage instance.
func NewS3Storage() *S3Storage {
	return &S3Storage{
		queuedPlaylistUpdates: make(map[string]string),
	}
}

// SetStreamId sets the stream id for this storage provider.
func (s *S3Storage) SetStreamId(streamId string) {
	s.streamId = streamId
}

// Setup sets up the s3 storage for saving the video to s3.
func (s *S3Storage) Setup() error {
	log.Trace("Setting up S3 for external storage of video...")

	s3Config := data.GetS3Config()
	customVideoServingEndpoint := data.GetVideoServingEndpoint()

	if customVideoServingEndpoint != "" {
		s.host = customVideoServingEndpoint
	} else {
		s.host = fmt.Sprintf("%s/%s", s3Config.Endpoint, s3Config.Bucket)
	}

	s.s3Endpoint = s3Config.Endpoint
	s.s3ServingEndpoint = s3Config.ServingEndpoint
	s.s3Region = s3Config.Region
	s.s3Bucket = s3Config.Bucket
	s.s3AccessKey = s3Config.AccessKey
	s.s3Secret = s3Config.Secret
	s.s3ACL = s3Config.ACL
	s.s3PathPrefix = s3Config.PathPrefix
	s.s3ForcePathStyle = s3Config.ForcePathStyle

	s.sess = s.connectAWS()
	s.s3Client = s3.New(s.sess)
	s.uploader = s3manager.NewUploader(s.sess)

	return nil
}

// SegmentWritten is called when a single segment of video is written.
func (s *S3Storage) SegmentWritten(localFilePath string) (string, int, error) {
	index := utils.GetIndexFromFilePath(localFilePath)
	performanceMonitorKey := "s3upload-" + index
	utils.StartPerformanceMonitor(performanceMonitorKey)

	// Upload the segment
	remoteDestinationPath := s.GetRemoteDestinationPathFromLocalFilePath(localFilePath)
	remotePath, err := s.Save(localFilePath, remoteDestinationPath, 0)
	if err != nil {
		log.Errorln(err)
		return "", 0, err
	}

	averagePerformance := utils.GetAveragePerformance(performanceMonitorKey)

	// Warn the user about long-running save operations
	if averagePerformance != 0 {
		if averagePerformance > float64(data.GetStreamLatencyLevel().SecondsPerSegment)*0.9 {
			log.Warnln("Possible slow uploads: average S3 upload duration", averagePerformance, "s. Troubleshoot this issue by visiting https://owncast.online/docs/troubleshooting/")
		}
	}

	// Upload the variant playlist for this segment
	// so the segments and the HLS playlist referencing
	// them are in sync.
	playlistPath := filepath.Join(filepath.Dir(localFilePath), "stream.m3u8")
	playlistRemoteDestinationPath := s.GetRemoteDestinationPathFromLocalFilePath(playlistPath)

	if _, err := s.Save(playlistPath, playlistRemoteDestinationPath, 0); err != nil {
		s.queuedPlaylistUpdates[playlistPath] = playlistPath
		if pErr, ok := err.(*os.PathError); ok {
			log.Debugln(pErr.Path, "does not yet exist locally when trying to upload to S3 storage.")
			return remotePath, 0, pErr.Err
		}
	}

	return remotePath, 0, nil
}

// VariantPlaylistWritten is called when a variant hls playlist is written.
func (s *S3Storage) VariantPlaylistWritten(localFilePath string) {
	// We are uploading the variant playlist after uploading the segment
	// to make sure we're not referring to files in a playlist that don't
	// yet exist. See SegmentWritten.
	if _, ok := s.queuedPlaylistUpdates[localFilePath]; ok {
		remoteDestinationPath := s.GetRemoteDestinationPathFromLocalFilePath(localFilePath)
		if _, err := s.Save(localFilePath, remoteDestinationPath, 0); err != nil {
			log.Errorln(err)
			s.queuedPlaylistUpdates[localFilePath] = localFilePath
		}
		delete(s.queuedPlaylistUpdates, localFilePath)
	}
}

// MasterPlaylistWritten is called when the master hls playlist is written.
func (s *S3Storage) MasterPlaylistWritten(localFilePath string) {
	// Rewrite the playlist to use absolute remote S3 URLs
	pathPrefix := filepath.Join("hls", s.streamId)
	if s.s3PathPrefix != "" {
		pathPrefix = filepath.Join(s.s3PathPrefix, pathPrefix)
	}

	if err := rewriteRemotePlaylist(localFilePath, s.host, pathPrefix); err != nil {
		log.Warnln(err)
	}
}

// Save saves the file to the s3 bucket.
func (s *S3Storage) Save(localFilePath, remoteDestinationPath string, retryCount int) (string, error) {
	file, err := os.Open(localFilePath) // nolint
	if err != nil {
		return "", err
	}
	defer file.Close()

	// Convert the local path to the variant/file path by stripping the local storage location.
	normalizedPath := strings.TrimPrefix(localFilePath, config.HLSStoragePath)
	// Build the remote path by adding the "hls" path prefix.
	remotePath := strings.Join([]string{"hls", normalizedPath}, "")

	// If a custom path prefix is set, prepend it.
	if s.s3PathPrefix != "" {
		prefix := strings.TrimPrefix(s.s3PathPrefix, "/")
		remotePath = strings.Join([]string{prefix, remotePath}, "/")
	}

	maxAgeSeconds := utils.GetCacheDurationSecondsForPath(localFilePath)
	cacheControlHeader := fmt.Sprintf("max-age=%d", maxAgeSeconds)

	uploadInput := &s3manager.UploadInput{
		Bucket:       aws.String(s.s3Bucket), // Bucket to be used
		Key:          aws.String(remotePath), // Name of the file to be saved
		Body:         file,                   // File body
		CacheControl: &cacheControlHeader,
	}

	// Playlists should never be cached so clients always pick up changes.
	if path.Ext(localFilePath) == ".m3u8" {
		noCacheHeader := "no-cache, no-store, must-revalidate"
		contentType := "application/x-mpegURL"
		uploadInput.CacheControl = &noCacheHeader
		uploadInput.ContentType = &contentType
	}

	if s.s3ACL != "" {
		uploadInput.ACL = aws.String(s.s3ACL)
	} else {
		// Default ACL
		uploadInput.ACL = aws.String("public-read")
	}

	response, err := s.uploader.Upload(uploadInput)
	if err != nil {
		log.Traceln("error uploading segment", err.Error())
		if retryCount < 4 {
			log.Traceln("Retrying...")
			return s.Save(localFilePath, remoteDestinationPath, retryCount+1)
		}

		// Upload failure. Remove the local file.
		s.removeLocalFile(localFilePath)

		return "", fmt.Errorf("giving up uploading %s to object storage %s", localFilePath, s.s3Endpoint)
	}

	// Upload success. Remove the local file.
	s.removeLocalFile(localFilePath)

	return response.Location, nil
}
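
// Cleanup removes old video segments from the S3 bucket, keeping only the
// most recent segments needed for the current latency level plus a buffer.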
func (s *S3Storage) Cleanup() error {
	// If we're recording, don't perform the cleanup.
	if config.EnableReplayFeatures {
		return nil
	}

	// Determine how many files we should keep on S3 storage
	maxNumber := data.GetStreamLatencyLevel().SegmentCount
	buffer := 20

	keys, err := s.getDeletableVideoSegmentsWithOffset(maxNumber + buffer)
	if err != nil {
		return err
	}

	if len(keys) > 0 {
		s.deleteObjects(keys)
	}

	return nil
}
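
// connectAWS creates an AWS session using the configured credentials,
// region and endpoint.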
func (s *S3Storage) connectAWS() *session.Session {
	t := http.DefaultTransport.(*http.Transport).Clone()
	t.MaxIdleConnsPerHost = 100
	httpClient := &http.Client{
		Timeout:   10 * time.Second,
		Transport: t,
	}

	creds := credentials.NewStaticCredentials(s.s3AccessKey, s.s3Secret, "")
	_, err := creds.Get()
	if err != nil {
		log.Panicln(err)
	}

	sess, err := session.NewSession(
		&aws.Config{
			HTTPClient:       httpClient,
			Region:           aws.String(s.s3Region),
			Credentials:      creds,
			Endpoint:         aws.String(s.s3Endpoint),
			S3ForcePathStyle: aws.Bool(s.s3ForcePathStyle),
		},
	)
	if err != nil {
		log.Panicln(err)
	}

	return sess
}
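
// getDeletableVideoSegmentsWithOffset returns the video segment objects in the
// bucket, sorted newest first, skipping the most recent `offset` segments.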
func (s *S3Storage) getDeletableVideoSegmentsWithOffset(offset int) ([]s3object, error) {
	objectsToDelete, err := s.retrieveAllVideoSegments()
	if err != nil {
		return nil, err
	}

	if offset > len(objectsToDelete)-1 {
		offset = len(objectsToDelete) - 1
	}

	objectsToDelete = objectsToDelete[offset : len(objectsToDelete)-1]

	return objectsToDelete, nil
}
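
// removeLocalFile deletes a file from local disk, logging any error at debug level.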
func (s *S3Storage) removeLocalFile(filePath string) {
	cleanFilepath := filepath.Clean(filePath)

	if err := os.Remove(cleanFilepath); err != nil {
		log.Debugln(err)
	}
}
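
// deleteObjects performs a batch delete of the given objects from the S3 bucket.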
func (s *S3Storage) deleteObjects(objects []s3object) {
	keys := make([]*s3.ObjectIdentifier, len(objects))
	for i, object := range objects {
		keys[i] = &s3.ObjectIdentifier{Key: aws.String(object.key)}
	}

	log.Debugln("Deleting", len(keys), "objects from S3 bucket:", s.s3Bucket)

	deleteObjectsRequest := &s3.DeleteObjectsInput{
		Bucket: aws.String(s.s3Bucket),
		Delete: &s3.Delete{
			Objects: keys,
			Quiet:   aws.Bool(true),
		},
	}

	_, err := s.s3Client.DeleteObjects(deleteObjectsRequest)
	if err != nil {
		log.Errorf("Unable to delete objects from bucket %q, %v\n", s.s3Bucket, err)
	}
}
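
// retrieveAllVideoSegments lists all of the .ts video segments in the bucket,
// sorted by last modified time with the newest first.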
func (s *S3Storage) retrieveAllVideoSegments() ([]s3object, error) {
	allObjectsListRequest := &s3.ListObjectsInput{
		Bucket: aws.String(s.s3Bucket),
	}

	// Fetch all objects in the bucket
	allObjectsListResponse, err := s.s3Client.ListObjects(allObjectsListRequest)
	if err != nil {
		return nil, errors.Wrap(err, "unable to fetch list of items in bucket for cleanup")
	}

	// Filter out non-video segments
	allObjects := []s3object{}
	for _, item := range allObjectsListResponse.Contents {
		if !strings.HasSuffix(*item.Key, ".ts") {
			continue
		}

		allObjects = append(allObjects, s3object{
			key:          *item.Key,
			lastModified: *item.LastModified,
		})
	}

	// Sort the results by timestamp, newest first
	sort.Slice(allObjects, func(i, j int) bool {
		return allObjects[i].lastModified.After(allObjects[j].lastModified)
	})

	return allObjects, nil
}
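
// GetRemoteDestinationPathFromLocalFilePath returns the remote storage key for a
// local file path by stripping the local HLS storage directory and prefixing it with "hls".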
func (s *S3Storage) GetRemoteDestinationPathFromLocalFilePath(localFilePath string) string {
	// Convert the local path to the variant/file path by stripping the local storage location.
	normalizedPath := strings.TrimPrefix(localFilePath, config.HLSStoragePath)
	// Build the remote path by adding the "hls" path prefix.
	remoteDestinationPath := strings.Join([]string{"hls", normalizedPath}, "")
	return remoteDestinationPath
}
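
// s3object holds the key and last modified time of a single object in the S3 bucket.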
type s3object struct {
	key          string
	lastModified time.Time
}