mirror of
https://github.com/superseriousbusiness/gotosocial.git
synced 2024-11-21 16:55:38 +03:00
[bugfix] media.Processor{}.GetFile() returning 404s on first call, correctly loading on 2nd (#3129)
* refactor file handling a tiny bit * whoops * make processing media / emoji defers a bit clear to see that it's the "on finished processing" path * some wording * add some debug logging * add mutex locks for processing remote media * try removing freshness check * fix derefMedia not being allocated * fix log format string * handle case of empty file paths (i.e. not stored) * remove media / emoji once finished processing from dereferencer maps * whoops, fix the cached / force checks * move url parsing outside of 'process___Safely()' funcs to prevalidate url * use emoji.ShortcodeDomain() * update RefreshEmoji() to also match RefreshMedia() changes --------- Co-authored-by: tobi <tobi.smethurst@protonmail.com>
This commit is contained in:
parent
5338825d2b
commit
31294f7c78
9 changed files with 385 additions and 293 deletions
|
@ -59,12 +59,19 @@ func (c *Cleaner) Media() *Media {
|
|||
|
||||
// haveFiles returns whether all of the provided files exist within current storage.
|
||||
func (c *Cleaner) haveFiles(ctx context.Context, files ...string) (bool, error) {
|
||||
for _, file := range files {
|
||||
for _, path := range files {
|
||||
if path == "" {
|
||||
// File not stored.
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Check whether each file exists in storage.
|
||||
have, err := c.state.Storage.Has(ctx, file)
|
||||
have, err := c.state.Storage.Has(ctx, path)
|
||||
if err != nil {
|
||||
return false, gtserror.Newf("error checking storage for %s: %w", file, err)
|
||||
} else if !have {
|
||||
return false, gtserror.Newf("error checking storage for %s: %w", path, err)
|
||||
}
|
||||
|
||||
if !have {
|
||||
// Missing file(s).
|
||||
return false, nil
|
||||
}
|
||||
|
@ -80,29 +87,34 @@ func (c *Cleaner) removeFiles(ctx context.Context, files ...string) (int, error)
|
|||
}
|
||||
|
||||
var (
|
||||
errs gtserror.MultiError
|
||||
errCount int
|
||||
errs gtserror.MultiError
|
||||
count int
|
||||
)
|
||||
|
||||
for _, path := range files {
|
||||
if path == "" {
|
||||
// not stored.
|
||||
continue
|
||||
}
|
||||
|
||||
// Remove each provided storage path.
|
||||
log.Debugf(ctx, "removing file: %s", path)
|
||||
err := c.state.Storage.Delete(ctx, path)
|
||||
if err != nil && !storage.IsNotFound(err) {
|
||||
errs.Appendf("error removing %s: %w", path, err)
|
||||
errCount++
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Calculate no. files removed.
|
||||
diff := len(files) - errCount
|
||||
// Incr.
|
||||
count++
|
||||
}
|
||||
|
||||
// Wrap the combined error slice.
|
||||
if err := errs.Combine(); err != nil {
|
||||
return diff, gtserror.Newf("error(s) removing files: %w", err)
|
||||
return count, gtserror.Newf("error(s) removing files: %w", err)
|
||||
}
|
||||
|
||||
return diff, nil
|
||||
return count, nil
|
||||
}
|
||||
|
||||
// ScheduleJobs schedules cleaning
|
||||
|
|
|
@ -97,7 +97,7 @@ func (m *Media) PruneOrphaned(ctx context.Context) (int, error) {
|
|||
if err := m.state.Storage.WalkKeys(ctx, func(path string) error {
|
||||
// Check for our expected fileserver path format.
|
||||
if !regexes.FilePath.MatchString(path) {
|
||||
log.Warn(ctx, "unexpected storage item: %s", path)
|
||||
log.Warnf(ctx, "unexpected storage item: %s", path)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -85,12 +85,9 @@ type Dereferencer struct {
|
|||
mediaManager *media.Manager
|
||||
visibility *visibility.Filter
|
||||
|
||||
// in-progress dereferencing emoji. we already perform
|
||||
// locks per-status and per-account so we don't need
|
||||
// processing maps for other media which won't often
|
||||
// end up being repeated. worst case we run into an
|
||||
// db.ErrAlreadyExists error which then gets handled
|
||||
// appropriately by enrich{Account,Status}Safely().
|
||||
// in-progress dereferencing media / emoji
|
||||
derefMedia map[string]*media.ProcessingMedia
|
||||
derefMediaMu sync.Mutex
|
||||
derefEmojis map[string]*media.ProcessingEmoji
|
||||
derefEmojisMu sync.Mutex
|
||||
|
||||
|
@ -119,6 +116,7 @@ func NewDereferencer(
|
|||
transportController: transportController,
|
||||
mediaManager: mediaManager,
|
||||
visibility: visFilter,
|
||||
derefMedia: make(map[string]*media.ProcessingMedia),
|
||||
derefEmojis: make(map[string]*media.ProcessingEmoji),
|
||||
handshakes: make(map[string][]*url.URL),
|
||||
}
|
||||
|
|
|
@ -77,32 +77,34 @@ func (d *Dereferencer) GetEmoji(
|
|||
// Generate shortcode domain for locks + logging.
|
||||
shortcodeDomain := shortcode + "@" + domain
|
||||
|
||||
// Ensure we have a valid remote URL.
|
||||
url, err := url.Parse(remoteURL)
|
||||
if err != nil {
|
||||
err := gtserror.Newf("invalid image remote url %s for emoji %s: %w", remoteURL, shortcodeDomain, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Acquire new instance account transport for emoji dereferencing.
|
||||
tsport, err := d.transportController.NewTransportForUsername(ctx, "")
|
||||
if err != nil {
|
||||
err := gtserror.Newf("error getting instance transport: %w", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Get maximum supported remote emoji size.
|
||||
maxsz := config.GetMediaEmojiRemoteMaxSize()
|
||||
|
||||
// Prepare data function to dereference remote emoji media.
|
||||
data := func(context.Context) (io.ReadCloser, error) {
|
||||
return tsport.DereferenceMedia(ctx, url, int64(maxsz))
|
||||
}
|
||||
|
||||
// Pass along for safe processing.
|
||||
return d.processEmojiSafely(ctx,
|
||||
shortcodeDomain,
|
||||
func() (*media.ProcessingEmoji, error) {
|
||||
|
||||
// Ensure we have a valid remote URL.
|
||||
url, err := url.Parse(remoteURL)
|
||||
if err != nil {
|
||||
err := gtserror.Newf("invalid image remote url %s for emoji %s: %w", remoteURL, shortcodeDomain, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Acquire new instance account transport for emoji dereferencing.
|
||||
tsport, err := d.transportController.NewTransportForUsername(ctx, "")
|
||||
if err != nil {
|
||||
err := gtserror.Newf("error getting instance transport: %w", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Get maximum supported remote emoji size.
|
||||
maxsz := config.GetMediaEmojiRemoteMaxSize()
|
||||
|
||||
// Prepare data function to dereference remote emoji media.
|
||||
data := func(context.Context) (io.ReadCloser, error) {
|
||||
return tsport.DereferenceMedia(ctx, url, int64(maxsz))
|
||||
}
|
||||
|
||||
// Create new emoji with prepared info.
|
||||
return d.mediaManager.CreateEmoji(ctx,
|
||||
shortcode,
|
||||
domain,
|
||||
|
@ -142,24 +144,25 @@ func (d *Dereferencer) RefreshEmoji(
|
|||
switch {
|
||||
case info.URI != nil &&
|
||||
*info.URI != emoji.URI:
|
||||
emoji.URI = *info.URI
|
||||
force = true
|
||||
case info.ImageRemoteURL != nil &&
|
||||
*info.ImageRemoteURL != emoji.ImageRemoteURL:
|
||||
emoji.ImageRemoteURL = *info.ImageRemoteURL
|
||||
force = true
|
||||
case info.ImageStaticRemoteURL != nil &&
|
||||
*info.ImageStaticRemoteURL != emoji.ImageStaticRemoteURL:
|
||||
emoji.ImageStaticRemoteURL = *info.ImageStaticRemoteURL
|
||||
force = true
|
||||
}
|
||||
|
||||
// Check if needs updating.
|
||||
if !force && *emoji.Cached {
|
||||
if *emoji.Cached && !force {
|
||||
return emoji, nil
|
||||
}
|
||||
|
||||
// TODO: more finegrained freshness checks.
|
||||
|
||||
// Generate shortcode domain for locks + logging.
|
||||
shortcodeDomain := emoji.Shortcode + "@" + emoji.Domain
|
||||
// Get shortcode domain for locks + logging.
|
||||
shortcodeDomain := emoji.ShortcodeDomain()
|
||||
|
||||
// Ensure we have a valid image remote URL.
|
||||
url, err := url.Parse(emoji.ImageRemoteURL)
|
||||
|
@ -168,25 +171,27 @@ func (d *Dereferencer) RefreshEmoji(
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// Acquire new instance account transport for emoji dereferencing.
|
||||
tsport, err := d.transportController.NewTransportForUsername(ctx, "")
|
||||
if err != nil {
|
||||
err := gtserror.Newf("error getting instance transport: %w", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Get maximum supported remote emoji size.
|
||||
maxsz := config.GetMediaEmojiRemoteMaxSize()
|
||||
|
||||
// Prepare data function to dereference remote emoji media.
|
||||
data := func(context.Context) (io.ReadCloser, error) {
|
||||
return tsport.DereferenceMedia(ctx, url, int64(maxsz))
|
||||
}
|
||||
|
||||
// Pass along for safe processing.
|
||||
return d.processEmojiSafely(ctx,
|
||||
shortcodeDomain,
|
||||
func() (*media.ProcessingEmoji, error) {
|
||||
|
||||
// Acquire new instance account transport for emoji dereferencing.
|
||||
tsport, err := d.transportController.NewTransportForUsername(ctx, "")
|
||||
if err != nil {
|
||||
err := gtserror.Newf("error getting instance transport: %w", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Get maximum supported remote emoji size.
|
||||
maxsz := config.GetMediaEmojiRemoteMaxSize()
|
||||
|
||||
// Prepare data function to dereference remote emoji media.
|
||||
data := func(context.Context) (io.ReadCloser, error) {
|
||||
return tsport.DereferenceMedia(ctx, url, int64(maxsz))
|
||||
}
|
||||
|
||||
// Refresh emoji with prepared info.
|
||||
return d.mediaManager.RefreshEmoji(ctx,
|
||||
emoji,
|
||||
data,
|
||||
|
@ -226,6 +231,13 @@ func (d *Dereferencer) processEmojiSafely(
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
// Remove on finish.
|
||||
d.derefEmojisMu.Lock()
|
||||
delete(d.derefEmojis, shortcodeDomain)
|
||||
d.derefEmojisMu.Unlock()
|
||||
}()
|
||||
}
|
||||
|
||||
// Unlock map.
|
||||
|
@ -240,10 +252,7 @@ func (d *Dereferencer) processEmojiSafely(
|
|||
// which can determine if loading error should allow remaining placeholder.
|
||||
}
|
||||
|
||||
// Return a COPY of emoji.
|
||||
emoji2 := new(gtsmodel.Emoji)
|
||||
*emoji2 = *emoji
|
||||
return emoji2, err
|
||||
return
|
||||
}
|
||||
|
||||
func (d *Dereferencer) fetchEmojis(
|
||||
|
|
|
@ -26,6 +26,7 @@ import (
|
|||
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/media"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/util"
|
||||
)
|
||||
|
||||
// GetMedia fetches the media at given remote URL by
|
||||
|
@ -56,46 +57,39 @@ func (d *Dereferencer) GetMedia(
|
|||
*gtsmodel.MediaAttachment,
|
||||
error,
|
||||
) {
|
||||
// Parse str as valid URL object.
|
||||
// Ensure we have a valid remote URL.
|
||||
url, err := url.Parse(remoteURL)
|
||||
if err != nil {
|
||||
return nil, gtserror.Newf("invalid remote media url %q: %v", remoteURL, err)
|
||||
}
|
||||
|
||||
// Fetch transport for the provided request user from controller.
|
||||
tsport, err := d.transportController.NewTransportForUsername(ctx,
|
||||
requestUser,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, gtserror.Newf("failed getting transport for %s: %w", requestUser, err)
|
||||
}
|
||||
|
||||
// Get maximum supported remote media size.
|
||||
maxsz := config.GetMediaRemoteMaxSize()
|
||||
|
||||
// Start processing remote attachment at URL.
|
||||
processing, err := d.mediaManager.CreateMedia(
|
||||
ctx,
|
||||
accountID,
|
||||
func(ctx context.Context) (io.ReadCloser, error) {
|
||||
return tsport.DereferenceMedia(ctx, url, int64(maxsz))
|
||||
},
|
||||
info,
|
||||
)
|
||||
if err != nil {
|
||||
err := gtserror.Newf("invalid media remote url %s: %w", remoteURL, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Perform media load operation.
|
||||
media, err := processing.Load(ctx)
|
||||
if err != nil {
|
||||
err = gtserror.Newf("error loading media %s: %w", media.RemoteURL, err)
|
||||
return d.processMediaSafeley(ctx,
|
||||
remoteURL,
|
||||
func() (*media.ProcessingMedia, error) {
|
||||
|
||||
// TODO: in time we should return checkable flags by gtserror.Is___()
|
||||
// which can determine if loading error should allow remaining placeholder.
|
||||
}
|
||||
// Fetch transport for the provided request user from controller.
|
||||
tsport, err := d.transportController.NewTransportForUsername(ctx,
|
||||
requestUser,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, gtserror.Newf("failed getting transport for %s: %w", requestUser, err)
|
||||
}
|
||||
|
||||
return media, err
|
||||
// Get maximum supported remote media size.
|
||||
maxsz := config.GetMediaRemoteMaxSize()
|
||||
|
||||
// Create media with prepared info.
|
||||
return d.mediaManager.CreateMedia(
|
||||
ctx,
|
||||
accountID,
|
||||
func(ctx context.Context) (io.ReadCloser, error) {
|
||||
return tsport.DereferenceMedia(ctx, url, int64(maxsz))
|
||||
},
|
||||
info,
|
||||
)
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
// RefreshMedia ensures that given media is up-to-date,
|
||||
|
@ -119,7 +113,7 @@ func (d *Dereferencer) GetMedia(
|
|||
func (d *Dereferencer) RefreshMedia(
|
||||
ctx context.Context,
|
||||
requestUser string,
|
||||
media *gtsmodel.MediaAttachment,
|
||||
attach *gtsmodel.MediaAttachment,
|
||||
info media.AdditionalMediaInfo,
|
||||
force bool,
|
||||
) (
|
||||
|
@ -127,67 +121,65 @@ func (d *Dereferencer) RefreshMedia(
|
|||
error,
|
||||
) {
|
||||
// Can't refresh local.
|
||||
if media.IsLocal() {
|
||||
return media, nil
|
||||
if attach.IsLocal() {
|
||||
return attach, nil
|
||||
}
|
||||
|
||||
// Check emoji is up-to-date
|
||||
// with provided extra info.
|
||||
switch {
|
||||
case info.Blurhash != nil &&
|
||||
*info.Blurhash != media.Blurhash:
|
||||
*info.Blurhash != attach.Blurhash:
|
||||
attach.Blurhash = *info.Blurhash
|
||||
force = true
|
||||
case info.Description != nil &&
|
||||
*info.Description != media.Description:
|
||||
*info.Description != attach.Description:
|
||||
attach.Description = *info.Description
|
||||
force = true
|
||||
case info.RemoteURL != nil &&
|
||||
*info.RemoteURL != media.RemoteURL:
|
||||
*info.RemoteURL != attach.RemoteURL:
|
||||
attach.RemoteURL = *info.RemoteURL
|
||||
force = true
|
||||
}
|
||||
|
||||
// Check if needs updating.
|
||||
if !force && *media.Cached {
|
||||
return media, nil
|
||||
if *attach.Cached && !force {
|
||||
return attach, nil
|
||||
}
|
||||
|
||||
// TODO: more finegrained freshness checks.
|
||||
|
||||
// Ensure we have a valid remote URL.
|
||||
url, err := url.Parse(media.RemoteURL)
|
||||
url, err := url.Parse(attach.RemoteURL)
|
||||
if err != nil {
|
||||
err := gtserror.Newf("invalid media remote url %s: %w", media.RemoteURL, err)
|
||||
err := gtserror.Newf("invalid media remote url %s: %w", attach.RemoteURL, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Fetch transport for the provided request user from controller.
|
||||
tsport, err := d.transportController.NewTransportForUsername(ctx,
|
||||
requestUser,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, gtserror.Newf("failed getting transport for %s: %w", requestUser, err)
|
||||
}
|
||||
// Pass along for safe processing.
|
||||
return d.processMediaSafeley(ctx,
|
||||
attach.RemoteURL,
|
||||
func() (*media.ProcessingMedia, error) {
|
||||
|
||||
// Get maximum supported remote media size.
|
||||
maxsz := config.GetMediaRemoteMaxSize()
|
||||
// Fetch transport for the provided request user from controller.
|
||||
tsport, err := d.transportController.NewTransportForUsername(ctx,
|
||||
requestUser,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, gtserror.Newf("failed getting transport for %s: %w", requestUser, err)
|
||||
}
|
||||
|
||||
// Start processing remote attachment recache.
|
||||
processing := d.mediaManager.RecacheMedia(
|
||||
media,
|
||||
func(ctx context.Context) (io.ReadCloser, error) {
|
||||
return tsport.DereferenceMedia(ctx, url, int64(maxsz))
|
||||
// Get maximum supported remote media size.
|
||||
maxsz := config.GetMediaRemoteMaxSize()
|
||||
|
||||
// Recache media with prepared info,
|
||||
// this will also update media in db.
|
||||
return d.mediaManager.RecacheMedia(
|
||||
attach,
|
||||
func(ctx context.Context) (io.ReadCloser, error) {
|
||||
return tsport.DereferenceMedia(ctx, url, int64(maxsz))
|
||||
},
|
||||
), nil
|
||||
},
|
||||
)
|
||||
|
||||
// Perform media load operation.
|
||||
media, err = processing.Load(ctx)
|
||||
if err != nil {
|
||||
err = gtserror.Newf("error loading media %s: %w", media.RemoteURL, err)
|
||||
|
||||
// TODO: in time we should return checkable flags by gtserror.Is___()
|
||||
// which can determine if loading error should allow remaining placeholder.
|
||||
}
|
||||
|
||||
return media, err
|
||||
}
|
||||
|
||||
// updateAttachment handles the case of an existing media attachment
|
||||
|
@ -220,3 +212,57 @@ func (d *Dereferencer) updateAttachment(
|
|||
false,
|
||||
)
|
||||
}
|
||||
|
||||
// processingEmojiSafely provides concurrency-safe processing of
|
||||
// an emoji with given shortcode+domain. if a copy of the emoji is
|
||||
// not already being processed, the given 'process' callback will
|
||||
// be used to generate new *media.ProcessingEmoji{} instance.
|
||||
func (d *Dereferencer) processMediaSafeley(
|
||||
ctx context.Context,
|
||||
remoteURL string,
|
||||
process func() (*media.ProcessingMedia, error),
|
||||
) (
|
||||
media *gtsmodel.MediaAttachment,
|
||||
err error,
|
||||
) {
|
||||
|
||||
// Acquire map lock.
|
||||
d.derefMediaMu.Lock()
|
||||
|
||||
// Ensure unlock only done once.
|
||||
unlock := d.derefMediaMu.Unlock
|
||||
unlock = util.DoOnce(unlock)
|
||||
defer unlock()
|
||||
|
||||
// Look for an existing deref in progress.
|
||||
processing, ok := d.derefMedia[remoteURL]
|
||||
|
||||
if !ok {
|
||||
// Start new processing emoji.
|
||||
processing, err = process()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
// Remove on finish.
|
||||
d.derefMediaMu.Lock()
|
||||
delete(d.derefMedia, remoteURL)
|
||||
d.derefMediaMu.Unlock()
|
||||
}()
|
||||
}
|
||||
|
||||
// Unlock map.
|
||||
unlock()
|
||||
|
||||
// Perform media load operation.
|
||||
media, err = processing.Load(ctx)
|
||||
if err != nil {
|
||||
err = gtserror.Newf("error loading media %s: %w", remoteURL, err)
|
||||
|
||||
// TODO: in time we should return checkable flags by gtserror.Is___()
|
||||
// which can determine if loading error should allow remaining placeholder.
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
|
|
@ -22,7 +22,6 @@ import (
|
|||
|
||||
errorsv2 "codeberg.org/gruf/go-errors/v2"
|
||||
"codeberg.org/gruf/go-runners"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/log"
|
||||
|
@ -77,42 +76,34 @@ func (p *ProcessingEmoji) load(ctx context.Context) (
|
|||
|
||||
defer func() {
|
||||
// This is only done when ctx NOT cancelled.
|
||||
done = (err == nil || !errorsv2.IsV2(err,
|
||||
if done = (err == nil || !errorsv2.IsV2(err,
|
||||
context.Canceled,
|
||||
context.DeadlineExceeded,
|
||||
))
|
||||
)); done {
|
||||
// Processing finished,
|
||||
// whether error or not!
|
||||
|
||||
if !done {
|
||||
return
|
||||
// Anything from here, we
|
||||
// need to ensure happens
|
||||
// (i.e. no ctx canceled).
|
||||
ctx = context.WithoutCancel(ctx)
|
||||
|
||||
// On error, clean
|
||||
// downloaded files.
|
||||
if err != nil {
|
||||
p.cleanup(ctx)
|
||||
}
|
||||
|
||||
// Update with latest details, whatever happened.
|
||||
e := p.mgr.state.DB.UpdateEmoji(ctx, p.emoji)
|
||||
if e != nil {
|
||||
log.Errorf(ctx, "error updating emoji in db: %v", e)
|
||||
}
|
||||
|
||||
// Store values.
|
||||
p.done = true
|
||||
p.err = err
|
||||
}
|
||||
|
||||
// Anything from here, we
|
||||
// need to ensure happens
|
||||
// (i.e. no ctx canceled).
|
||||
ctx = gtscontext.WithValues(
|
||||
context.Background(),
|
||||
ctx, // values
|
||||
)
|
||||
|
||||
// On error, clean
|
||||
// downloaded files.
|
||||
if err != nil {
|
||||
p.cleanup(ctx)
|
||||
}
|
||||
|
||||
if !done {
|
||||
return
|
||||
}
|
||||
|
||||
// Update with latest details, whatever happened.
|
||||
e := p.mgr.state.DB.UpdateEmoji(ctx, p.emoji)
|
||||
if e != nil {
|
||||
log.Errorf(ctx, "error updating emoji in db: %v", e)
|
||||
}
|
||||
|
||||
// Store final values.
|
||||
p.done = true
|
||||
p.err = err
|
||||
}()
|
||||
|
||||
// Attempt to store media and calculate
|
||||
|
@ -122,7 +113,10 @@ func (p *ProcessingEmoji) load(ctx context.Context) (
|
|||
err = p.store(ctx)
|
||||
return err
|
||||
})
|
||||
emoji = p.emoji
|
||||
|
||||
// Return a copy of emoji.
|
||||
emoji = new(gtsmodel.Emoji)
|
||||
*emoji = *p.emoji
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -265,11 +259,11 @@ func (p *ProcessingEmoji) store(ctx context.Context) error {
|
|||
// cleanup will remove any traces of processing emoji from storage,
|
||||
// and perform any other necessary cleanup steps after failure.
|
||||
func (p *ProcessingEmoji) cleanup(ctx context.Context) {
|
||||
var err error
|
||||
log.Debugf(ctx, "running cleanup of emoji %s", p.emoji.ID)
|
||||
|
||||
if p.emoji.ImagePath != "" {
|
||||
// Ensure emoji file at path is deleted from storage.
|
||||
err = p.mgr.state.Storage.Delete(ctx, p.emoji.ImagePath)
|
||||
err := p.mgr.state.Storage.Delete(ctx, p.emoji.ImagePath)
|
||||
if err != nil && !storage.IsNotFound(err) {
|
||||
log.Errorf(ctx, "error deleting %s: %v", p.emoji.ImagePath, err)
|
||||
}
|
||||
|
@ -277,7 +271,7 @@ func (p *ProcessingEmoji) cleanup(ctx context.Context) {
|
|||
|
||||
if p.emoji.ImageStaticPath != "" {
|
||||
// Ensure emoji static file at path is deleted from storage.
|
||||
err = p.mgr.state.Storage.Delete(ctx, p.emoji.ImageStaticPath)
|
||||
err := p.mgr.state.Storage.Delete(ctx, p.emoji.ImageStaticPath)
|
||||
if err != nil && !storage.IsNotFound(err) {
|
||||
log.Errorf(ctx, "error deleting %s: %v", p.emoji.ImageStaticPath, err)
|
||||
}
|
||||
|
|
|
@ -19,12 +19,10 @@ package media
|
|||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
errorsv2 "codeberg.org/gruf/go-errors/v2"
|
||||
"codeberg.org/gruf/go-runners"
|
||||
|
||||
"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/log"
|
||||
|
@ -63,6 +61,7 @@ func (p *ProcessingMedia) Load(ctx context.Context) (*gtsmodel.MediaAttachment,
|
|||
media, done, err := p.load(ctx)
|
||||
if !done {
|
||||
// On a context-canceled error (marked as !done), requeue for loading.
|
||||
log.Warnf(ctx, "reprocessing media %s after canceled ctx", p.media.ID)
|
||||
p.mgr.state.Workers.Dereference.Queue.Push(func(ctx context.Context) {
|
||||
if _, _, err := p.load(ctx); err != nil {
|
||||
log.Errorf(ctx, "error loading media: %v", err)
|
||||
|
@ -86,55 +85,35 @@ func (p *ProcessingMedia) load(ctx context.Context) (
|
|||
|
||||
defer func() {
|
||||
// This is only done when ctx NOT cancelled.
|
||||
done = (err == nil || !errorsv2.IsV2(err,
|
||||
if done = (err == nil || !errorsv2.IsV2(err,
|
||||
context.Canceled,
|
||||
context.DeadlineExceeded,
|
||||
))
|
||||
)); done {
|
||||
// Processing finished,
|
||||
// whether error or not!
|
||||
|
||||
if !done {
|
||||
return
|
||||
// Anything from here, we
|
||||
// need to ensure happens
|
||||
// (i.e. no ctx canceled).
|
||||
ctx = context.WithoutCancel(ctx)
|
||||
|
||||
// On error or unknown media types, perform error cleanup.
|
||||
if err != nil || p.media.Type == gtsmodel.FileTypeUnknown {
|
||||
p.cleanup(ctx)
|
||||
}
|
||||
|
||||
// Update with latest details, whatever happened.
|
||||
e := p.mgr.state.DB.UpdateAttachment(ctx, p.media)
|
||||
if e != nil {
|
||||
log.Errorf(ctx, "error updating media in db: %v", e)
|
||||
}
|
||||
|
||||
// Store values.
|
||||
p.done = true
|
||||
p.err = err
|
||||
}
|
||||
|
||||
// Anything from here, we
|
||||
// need to ensure happens
|
||||
// (i.e. no ctx canceled).
|
||||
ctx = gtscontext.WithValues(
|
||||
context.Background(),
|
||||
ctx, // values
|
||||
)
|
||||
|
||||
// On error or unknown media types, perform error cleanup.
|
||||
if err != nil || p.media.Type == gtsmodel.FileTypeUnknown {
|
||||
p.cleanup(ctx)
|
||||
}
|
||||
|
||||
// Update with latest details, whatever happened.
|
||||
e := p.mgr.state.DB.UpdateAttachment(ctx, p.media)
|
||||
if e != nil {
|
||||
log.Errorf(ctx, "error updating media in db: %v", e)
|
||||
}
|
||||
|
||||
// Store final values.
|
||||
p.done = true
|
||||
p.err = err
|
||||
}()
|
||||
|
||||
// TODO: in time update this
|
||||
// to perhaps follow a similar
|
||||
// freshness window to statuses
|
||||
// / accounts? But that's a big
|
||||
// maybe, media don't change in
|
||||
// the same way so this is largely
|
||||
// just to slow down fail retries.
|
||||
const maxfreq = 6 * time.Hour
|
||||
|
||||
// Check whether media is uncached but repeatedly failing,
|
||||
// specifically limit the frequency at which we allow this.
|
||||
if !p.media.UpdatedAt.Equal(p.media.CreatedAt) && // i.e. not new
|
||||
p.media.UpdatedAt.Add(maxfreq).Before(time.Now()) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Attempt to store media and calculate
|
||||
// full-size media attachment details.
|
||||
//
|
||||
|
@ -142,7 +121,10 @@ func (p *ProcessingMedia) load(ctx context.Context) (
|
|||
err = p.store(ctx)
|
||||
return err
|
||||
})
|
||||
media = p.media
|
||||
|
||||
// Return a copy of media attachment.
|
||||
media = new(gtsmodel.MediaAttachment)
|
||||
*media = *p.media
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -331,11 +313,9 @@ func (p *ProcessingMedia) store(ctx context.Context) error {
|
|||
// cleanup will remove any traces of processing media from storage.
|
||||
// and perform any other necessary cleanup steps after failure.
|
||||
func (p *ProcessingMedia) cleanup(ctx context.Context) {
|
||||
var err error
|
||||
|
||||
if p.media.File.Path != "" {
|
||||
// Ensure media file at path is deleted from storage.
|
||||
err = p.mgr.state.Storage.Delete(ctx, p.media.File.Path)
|
||||
err := p.mgr.state.Storage.Delete(ctx, p.media.File.Path)
|
||||
if err != nil && !storage.IsNotFound(err) {
|
||||
log.Errorf(ctx, "error deleting %s: %v", p.media.File.Path, err)
|
||||
}
|
||||
|
@ -343,7 +323,7 @@ func (p *ProcessingMedia) cleanup(ctx context.Context) {
|
|||
|
||||
if p.media.Thumbnail.Path != "" {
|
||||
// Ensure media thumbnail at path is deleted from storage.
|
||||
err = p.mgr.state.Storage.Delete(ctx, p.media.Thumbnail.Path)
|
||||
err := p.mgr.state.Storage.Delete(ctx, p.media.Thumbnail.Path)
|
||||
if err != nil && !storage.IsNotFound(err) {
|
||||
log.Errorf(ctx, "error deleting %s: %v", p.media.Thumbnail.Path, err)
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ import (
|
|||
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/media"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/regexes"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/storage"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/uris"
|
||||
)
|
||||
|
@ -41,79 +42,97 @@ func (p *Processor) GetFile(
|
|||
requester *gtsmodel.Account,
|
||||
form *apimodel.GetContentRequestForm,
|
||||
) (*apimodel.Content, gtserror.WithCode) {
|
||||
// parse the form fields
|
||||
// Parse media size (small, static, original).
|
||||
mediaSize, err := parseSize(form.MediaSize)
|
||||
if err != nil {
|
||||
return nil, gtserror.NewErrorNotFound(fmt.Errorf("media size %s not valid", form.MediaSize))
|
||||
err := gtserror.Newf("media size %s not valid", form.MediaSize)
|
||||
return nil, gtserror.NewErrorNotFound(err)
|
||||
}
|
||||
|
||||
// Parse media type (emoji, header, avatar, attachment).
|
||||
mediaType, err := parseType(form.MediaType)
|
||||
if err != nil {
|
||||
return nil, gtserror.NewErrorNotFound(fmt.Errorf("media type %s not valid", form.MediaType))
|
||||
err := gtserror.Newf("media type %s not valid", form.MediaType)
|
||||
return nil, gtserror.NewErrorNotFound(err)
|
||||
}
|
||||
|
||||
spl := strings.Split(form.FileName, ".")
|
||||
if len(spl) != 2 || spl[0] == "" || spl[1] == "" {
|
||||
return nil, gtserror.NewErrorNotFound(fmt.Errorf("file name %s not parseable", form.FileName))
|
||||
}
|
||||
wantedMediaID := spl[0]
|
||||
owningAccountID := form.AccountID
|
||||
|
||||
// get the account that owns the media and make sure it's not suspended
|
||||
owningAccount, err := p.state.DB.GetAccountByID(ctx, owningAccountID)
|
||||
// Parse media ID from file name.
|
||||
mediaID, _, err := parseFileName(form.FileName)
|
||||
if err != nil {
|
||||
return nil, gtserror.NewErrorNotFound(fmt.Errorf("account with id %s could not be selected from the db: %s", owningAccountID, err))
|
||||
}
|
||||
if !owningAccount.SuspendedAt.IsZero() {
|
||||
return nil, gtserror.NewErrorNotFound(fmt.Errorf("account with id %s is suspended", owningAccountID))
|
||||
err := gtserror.Newf("media file name %s not valid", form.FileName)
|
||||
return nil, gtserror.NewErrorNotFound(err)
|
||||
}
|
||||
|
||||
// make sure the requesting account and the media account don't block each other
|
||||
// Get the account that owns the media
|
||||
// and make sure it's not suspended.
|
||||
acctID := form.AccountID
|
||||
acct, err := p.state.DB.GetAccountByID(ctx, acctID)
|
||||
if err != nil {
|
||||
err := gtserror.Newf("db error getting account %s: %w", acctID, err)
|
||||
return nil, gtserror.NewErrorNotFound(err)
|
||||
}
|
||||
|
||||
if acct.IsSuspended() {
|
||||
err := gtserror.Newf("account %s is suspended", acctID)
|
||||
return nil, gtserror.NewErrorNotFound(err)
|
||||
}
|
||||
|
||||
// If requester was authenticated, ensure media
|
||||
// owner and requester don't block each other.
|
||||
if requester != nil {
|
||||
blocked, err := p.state.DB.IsEitherBlocked(ctx, requester.ID, owningAccountID)
|
||||
blocked, err := p.state.DB.IsEitherBlocked(ctx, requester.ID, acctID)
|
||||
if err != nil {
|
||||
return nil, gtserror.NewErrorNotFound(fmt.Errorf("block status could not be established between accounts %s and %s: %s", owningAccountID, requester.ID, err))
|
||||
err := gtserror.Newf("db error checking block between %s and %s: %w", acctID, requester.ID, err)
|
||||
return nil, gtserror.NewErrorNotFound(err)
|
||||
}
|
||||
|
||||
if blocked {
|
||||
return nil, gtserror.NewErrorNotFound(fmt.Errorf("block exists between accounts %s and %s", owningAccountID, requester.ID))
|
||||
err := gtserror.Newf("block exists between %s and %s", acctID, requester.ID)
|
||||
return nil, gtserror.NewErrorNotFound(err)
|
||||
}
|
||||
}
|
||||
|
||||
// the way we store emojis is a little different from the way we store other attachments,
|
||||
// so we need to take different steps depending on the media type being requested
|
||||
// The way we store emojis is a bit different
|
||||
// from the way we store other attachments,
|
||||
// so we need to take different steps depending
|
||||
// on the media type being requested.
|
||||
switch mediaType {
|
||||
|
||||
case media.TypeEmoji:
|
||||
return p.getEmojiContent(ctx,
|
||||
owningAccountID,
|
||||
wantedMediaID,
|
||||
acctID,
|
||||
mediaSize,
|
||||
mediaID,
|
||||
)
|
||||
|
||||
case media.TypeAttachment, media.TypeHeader, media.TypeAvatar:
|
||||
return p.getAttachmentContent(ctx,
|
||||
requester,
|
||||
owningAccountID,
|
||||
wantedMediaID,
|
||||
acctID,
|
||||
mediaSize,
|
||||
mediaID,
|
||||
)
|
||||
|
||||
default:
|
||||
return nil, gtserror.NewErrorNotFound(fmt.Errorf("media type %s not recognized", mediaType))
|
||||
err := gtserror.Newf("media type %s not recognized", mediaType)
|
||||
return nil, gtserror.NewErrorNotFound(err)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Processor) getAttachmentContent(
|
||||
ctx context.Context,
|
||||
requester *gtsmodel.Account,
|
||||
ownerID string,
|
||||
mediaID string,
|
||||
acctID string,
|
||||
sizeStr media.Size,
|
||||
mediaID string,
|
||||
) (
|
||||
*apimodel.Content,
|
||||
gtserror.WithCode,
|
||||
) {
|
||||
// Search for media with given ID in the database.
|
||||
// Get attachment with given ID from the database.
|
||||
attach, err := p.state.DB.GetAttachmentByID(ctx, mediaID)
|
||||
if err != nil && !errors.Is(err, db.ErrNoEntries) {
|
||||
err := gtserror.Newf("error fetching media from database: %w", err)
|
||||
err := gtserror.Newf("db error getting attachment %s: %w", mediaID, err)
|
||||
return nil, gtserror.NewErrorInternalError(err)
|
||||
}
|
||||
|
||||
|
@ -122,51 +141,24 @@ func (p *Processor) getAttachmentContent(
|
|||
return nil, gtserror.NewErrorNotFound(errors.New(text), text)
|
||||
}
|
||||
|
||||
// Ensure the 'owner' owns media.
|
||||
if attach.AccountID != ownerID {
|
||||
// Ensure the account
|
||||
// actually owns the media.
|
||||
if attach.AccountID != acctID {
|
||||
const text = "media was not owned by passed account id"
|
||||
return nil, gtserror.NewErrorNotFound(errors.New(text) /* no help text! */)
|
||||
}
|
||||
|
||||
var remoteURL *url.URL
|
||||
if attach.RemoteURL != "" {
|
||||
|
||||
// Parse media remote URL to valid URL object.
|
||||
remoteURL, err = url.Parse(attach.RemoteURL)
|
||||
if err != nil {
|
||||
err := gtserror.Newf("invalid media remote url %s: %w", attach.RemoteURL, err)
|
||||
return nil, gtserror.NewErrorInternalError(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Uknown file types indicate no *locally*
|
||||
// Unknown file types indicate no *locally*
|
||||
// stored data we can serve. Handle separately.
|
||||
if attach.Type == gtsmodel.FileTypeUnknown {
|
||||
if remoteURL == nil {
|
||||
err := gtserror.Newf("missing remote url for unknown type media %s: %w", attach.ID, err)
|
||||
return nil, gtserror.NewErrorInternalError(err)
|
||||
}
|
||||
|
||||
// If this is an "Unknown" file type, ie., one we
|
||||
// tried to process and couldn't, or one we refused
|
||||
// to process because it wasn't supported, then we
|
||||
// can skip a lot of steps here by simply forwarding
|
||||
// the request to the remote URL.
|
||||
url := &storage.PresignedURL{
|
||||
URL: remoteURL,
|
||||
|
||||
// We might manage to cache the media
|
||||
// at some point, so set a low-ish expiry.
|
||||
Expiry: time.Now().Add(2 * time.Hour),
|
||||
}
|
||||
|
||||
return &apimodel.Content{URL: url}, nil
|
||||
return handleUnknown(attach)
|
||||
}
|
||||
|
||||
// If requester was provided, use their username
|
||||
// to create a transport to potentially re-fetch
|
||||
// the media. Else falls back to instance account.
|
||||
var requestUser string
|
||||
|
||||
if requester != nil {
|
||||
// Set requesting acc username.
|
||||
requestUser = requester.Username
|
||||
}
|
||||
|
||||
|
@ -217,10 +209,9 @@ func (p *Processor) getAttachmentContent(
|
|||
|
||||
func (p *Processor) getEmojiContent(
|
||||
ctx context.Context,
|
||||
|
||||
ownerID string,
|
||||
emojiID string,
|
||||
acctID string,
|
||||
sizeStr media.Size,
|
||||
emojiID string,
|
||||
) (
|
||||
*apimodel.Content,
|
||||
gtserror.WithCode,
|
||||
|
@ -229,7 +220,7 @@ func (p *Processor) getEmojiContent(
|
|||
// As refreshed emojis use a newly generated path ID to
|
||||
// differentiate them (cache-wise) from the original.
|
||||
staticURL := uris.URIForAttachment(
|
||||
ownerID,
|
||||
acctID,
|
||||
string(media.TypeEmoji),
|
||||
string(media.SizeStatic),
|
||||
emojiID,
|
||||
|
@ -323,8 +314,9 @@ func (p *Processor) getContent(
|
|||
|
||||
// Ensure found.
|
||||
if rc == nil {
|
||||
err := gtserror.Newf("file not found at %s", path)
|
||||
const text = "file not found"
|
||||
return nil, gtserror.NewErrorNotFound(errors.New(text), text)
|
||||
return nil, gtserror.NewErrorNotFound(err, text)
|
||||
}
|
||||
|
||||
// Return with stream.
|
||||
|
@ -332,6 +324,41 @@ func (p *Processor) getContent(
|
|||
return content, nil
|
||||
}
|
||||
|
||||
// handles serving Content for "unknown" file
|
||||
// type, ie., a file we couldn't cache (this time).
|
||||
func handleUnknown(
|
||||
attach *gtsmodel.MediaAttachment,
|
||||
) (*apimodel.Content, gtserror.WithCode) {
|
||||
if attach.RemoteURL == "" {
|
||||
err := gtserror.Newf("empty remote url for %s", attach.ID)
|
||||
return nil, gtserror.NewErrorInternalError(err)
|
||||
}
|
||||
|
||||
// Parse media remote URL to valid URL object.
|
||||
remoteURL, err := url.Parse(attach.RemoteURL)
|
||||
if err != nil {
|
||||
err := gtserror.Newf("invalid remote url for %s: %w", attach.ID, err)
|
||||
return nil, gtserror.NewErrorInternalError(err)
|
||||
}
|
||||
|
||||
if remoteURL == nil {
|
||||
err := gtserror.Newf("nil remote url for %s", attach.ID)
|
||||
return nil, gtserror.NewErrorInternalError(err)
|
||||
}
|
||||
|
||||
// Just forward the request to the remote URL,
|
||||
// since this is a type we couldn't process.
|
||||
url := &storage.PresignedURL{
|
||||
URL: remoteURL,
|
||||
|
||||
// We might manage to cache the media
|
||||
// at some point, so set a low-ish expiry.
|
||||
Expiry: time.Now().Add(2 * time.Hour),
|
||||
}
|
||||
|
||||
return &apimodel.Content{URL: url}, nil
|
||||
}
|
||||
|
||||
func parseType(s string) (media.Type, error) {
|
||||
switch s {
|
||||
case string(media.TypeAttachment):
|
||||
|
@ -357,3 +384,23 @@ func parseSize(s string) (media.Size, error) {
|
|||
}
|
||||
return "", fmt.Errorf("%s not a recognized media.Size", s)
|
||||
}
|
||||
|
||||
// Extract the mediaID and file extension from
|
||||
// a string like "01J3CTH8CZ6ATDNMG6CPRC36XE.gif"
|
||||
func parseFileName(s string) (string, string, error) {
|
||||
spl := strings.Split(s, ".")
|
||||
if len(spl) != 2 || spl[0] == "" || spl[1] == "" {
|
||||
return "", "", errors.New("file name not splittable on '.'")
|
||||
}
|
||||
|
||||
var (
|
||||
mediaID = spl[0]
|
||||
mediaExt = spl[1]
|
||||
)
|
||||
|
||||
if !regexes.ULID.MatchString(mediaID) {
|
||||
return "", "", fmt.Errorf("%s not a valid ULID", mediaID)
|
||||
}
|
||||
|
||||
return mediaID, mediaExt, nil
|
||||
}
|
||||
|
|
|
@ -52,6 +52,12 @@ type PresignedURL struct {
|
|||
Expiry time.Time // link expires at this time
|
||||
}
|
||||
|
||||
// IsInvalidKey returns whether error is an invalid-key
|
||||
// type error returned by the underlying storage library.
|
||||
func IsInvalidKey(err error) bool {
|
||||
return errors.Is(err, storage.ErrInvalidKey)
|
||||
}
|
||||
|
||||
// IsAlreadyExist returns whether error is an already-exists
|
||||
// type error returned by the underlying storage library.
|
||||
func IsAlreadyExist(err error) bool {
|
||||
|
|
Loading…
Reference in a new issue