mirror of
https://github.com/navidrome/navidrome.git
synced 2025-04-05 05:27:37 +03:00
Don't cache transcoded files if the request was cancelled (#2041)
* Don't cache transcoded files if the request was cancelled (or there was a transcoding error) * Add context to logs * Simplify Wait error handling * Fix flaky test * Change log level for "populating cache" error message * Small cleanups
This commit is contained in:
parent
54395e7e6a
commit
24d520882e
4 changed files with 157 additions and 34 deletions
|
@ -2,6 +2,8 @@ package transcoder
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
|
@ -12,7 +14,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type Transcoder interface {
|
type Transcoder interface {
|
||||||
Start(ctx context.Context, command, path string, maxBitRate int) (f io.ReadCloser, err error)
|
Start(ctx context.Context, command, path string, maxBitRate int) (io.ReadCloser, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func New() Transcoder {
|
func New() Transcoder {
|
||||||
|
@ -21,23 +23,55 @@ func New() Transcoder {
|
||||||
|
|
||||||
type externalTranscoder struct{}
|
type externalTranscoder struct{}
|
||||||
|
|
||||||
func (e *externalTranscoder) Start(ctx context.Context, command, path string, maxBitRate int) (f io.ReadCloser, err error) {
|
func (e *externalTranscoder) Start(ctx context.Context, command, path string, maxBitRate int) (io.ReadCloser, error) {
|
||||||
args := createTranscodeCommand(command, path, maxBitRate)
|
args := createTranscodeCommand(command, path, maxBitRate)
|
||||||
|
|
||||||
log.Trace(ctx, "Executing transcoding command", "cmd", args)
|
log.Trace(ctx, "Executing transcoding command", "cmd", args)
|
||||||
cmd := exec.CommandContext(ctx, args[0], args[1:]...) // #nosec
|
j := &Cmd{ctx: ctx, args: args}
|
||||||
|
j.PipeReader, j.out = io.Pipe()
|
||||||
|
err := j.start()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
go j.wait()
|
||||||
|
return j, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type Cmd struct {
|
||||||
|
*io.PipeReader
|
||||||
|
out *io.PipeWriter
|
||||||
|
ctx context.Context
|
||||||
|
args []string
|
||||||
|
cmd *exec.Cmd
|
||||||
|
}
|
||||||
|
|
||||||
|
func (j *Cmd) start() error {
|
||||||
|
cmd := exec.CommandContext(j.ctx, j.args[0], j.args[1:]...) // #nosec
|
||||||
|
cmd.Stdout = j.out
|
||||||
cmd.Stderr = os.Stderr
|
cmd.Stderr = os.Stderr
|
||||||
if f, err = cmd.StdoutPipe(); err != nil {
|
j.cmd = cmd
|
||||||
return
|
|
||||||
|
if err := cmd.Start(); err != nil {
|
||||||
|
return fmt.Errorf("starting cmd: %w", err)
|
||||||
}
|
}
|
||||||
if err = cmd.Start(); err != nil {
|
return nil
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() { _ = cmd.Wait() }() // prevent zombies
|
func (j *Cmd) wait() {
|
||||||
|
if err := j.cmd.Wait(); err != nil {
|
||||||
|
var exitErr *exec.ExitError
|
||||||
|
if errors.As(err, &exitErr) {
|
||||||
|
_ = j.out.CloseWithError(fmt.Errorf("%s exited with non-zero status code: %d", j.args[0], exitErr.ExitCode()))
|
||||||
|
} else {
|
||||||
|
_ = j.out.CloseWithError(fmt.Errorf("waiting %s cmd: %w", j.args[0], err))
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if j.ctx.Err() != nil {
|
||||||
|
_ = j.out.CloseWithError(j.ctx.Err())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
_ = j.out.Close()
|
||||||
|
}
|
||||||
|
|
||||||
// Path will always be an absolute path
|
// Path will always be an absolute path
|
||||||
func createTranscodeCommand(cmd, path string, maxBitRate int) []string {
|
func createTranscodeCommand(cmd, path string, maxBitRate int) []string {
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -24,6 +24,7 @@ require (
|
||||||
github.com/golangci/golangci-lint v1.50.1
|
github.com/golangci/golangci-lint v1.50.1
|
||||||
github.com/google/uuid v1.3.0
|
github.com/google/uuid v1.3.0
|
||||||
github.com/google/wire v0.5.0
|
github.com/google/wire v0.5.0
|
||||||
|
github.com/hashicorp/go-multierror v1.1.1
|
||||||
github.com/kennygrant/sanitize v0.0.0-20170120101633-6a0bfdde8629
|
github.com/kennygrant/sanitize v0.0.0-20170120101633-6a0bfdde8629
|
||||||
github.com/kr/pretty v0.3.1
|
github.com/kr/pretty v0.3.1
|
||||||
github.com/lestrrat-go/jwx/v2 v2.0.8
|
github.com/lestrrat-go/jwx/v2 v2.0.8
|
||||||
|
@ -127,7 +128,6 @@ require (
|
||||||
github.com/gostaticanalysis/forcetypeassert v0.1.0 // indirect
|
github.com/gostaticanalysis/forcetypeassert v0.1.0 // indirect
|
||||||
github.com/gostaticanalysis/nilerr v0.1.1 // indirect
|
github.com/gostaticanalysis/nilerr v0.1.1 // indirect
|
||||||
github.com/hashicorp/errwrap v1.0.0 // indirect
|
github.com/hashicorp/errwrap v1.0.0 // indirect
|
||||||
github.com/hashicorp/go-multierror v1.1.1 // indirect
|
|
||||||
github.com/hashicorp/go-version v1.6.0 // indirect
|
github.com/hashicorp/go-version v1.6.0 // indirect
|
||||||
github.com/hashicorp/golang-lru v0.5.4 // indirect
|
github.com/hashicorp/golang-lru v0.5.4 // indirect
|
||||||
github.com/hashicorp/hcl v1.0.0 // indirect
|
github.com/hashicorp/hcl v1.0.0 // indirect
|
||||||
|
|
43
utils/cache/file_caches.go
vendored
43
utils/cache/file_caches.go
vendored
|
@ -10,6 +10,7 @@ import (
|
||||||
|
|
||||||
"github.com/djherbis/fscache"
|
"github.com/djherbis/fscache"
|
||||||
"github.com/dustin/go-humanize"
|
"github.com/dustin/go-humanize"
|
||||||
|
"github.com/hashicorp/go-multierror"
|
||||||
"github.com/navidrome/navidrome/conf"
|
"github.com/navidrome/navidrome/conf"
|
||||||
"github.com/navidrome/navidrome/consts"
|
"github.com/navidrome/navidrome/consts"
|
||||||
"github.com/navidrome/navidrome/log"
|
"github.com/navidrome/navidrome/log"
|
||||||
|
@ -27,7 +28,7 @@ type FileCache interface {
|
||||||
Available(ctx context.Context) bool
|
Available(ctx context.Context) bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFileCache(name, cacheSize, cacheFolder string, maxItems int, getReader ReadFunc) *fileCache {
|
func NewFileCache(name, cacheSize, cacheFolder string, maxItems int, getReader ReadFunc) FileCache {
|
||||||
fc := &fileCache{
|
fc := &fileCache{
|
||||||
name: name,
|
name: name,
|
||||||
cacheSize: cacheSize,
|
cacheSize: cacheSize,
|
||||||
|
@ -86,6 +87,16 @@ func (fc *fileCache) Available(ctx context.Context) bool {
|
||||||
return fc.ready && !fc.disabled
|
return fc.ready && !fc.disabled
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (fc *fileCache) invalidate(ctx context.Context, key string) error {
|
||||||
|
if !fc.Available(ctx) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if !fc.cache.Exists(key) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return fc.cache.Remove(key)
|
||||||
|
}
|
||||||
|
|
||||||
func (fc *fileCache) Get(ctx context.Context, arg Item) (*CachedStream, error) {
|
func (fc *fileCache) Get(ctx context.Context, arg Item) (*CachedStream, error) {
|
||||||
if !fc.Available(ctx) {
|
if !fc.Available(ctx) {
|
||||||
reader, err := fc.getReader(ctx, arg)
|
reader, err := fc.getReader(ctx, arg)
|
||||||
|
@ -109,10 +120,17 @@ func (fc *fileCache) Get(ctx context.Context, arg Item) (*CachedStream, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
go copyAndClose(ctx, w, reader)
|
go func() {
|
||||||
|
if err := copyAndClose(w, reader); err != nil {
|
||||||
|
log.Debug(ctx, "Error populating cache", "key", key, err)
|
||||||
|
if err = fc.invalidate(ctx, key); err != nil {
|
||||||
|
log.Warn(ctx, "Error removing key from cache", "key", key, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
// If it is in the cache, check if the stream is done being written. If so, return a ReaderSeeker
|
// If it is in the cache, check if the stream is done being written. If so, return a ReadSeeker
|
||||||
if cached {
|
if cached {
|
||||||
size := getFinalCachedSize(r)
|
size := getFinalCachedSize(r)
|
||||||
if size >= 0 {
|
if size >= 0 {
|
||||||
|
@ -129,7 +147,7 @@ func (fc *fileCache) Get(ctx context.Context, arg Item) (*CachedStream, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// All other cases, just return a Reader, without Seek capabilities
|
// All other cases, just return the cache reader, without Seek capabilities
|
||||||
return &CachedStream{Reader: r, Cached: cached}, nil
|
return &CachedStream{Reader: r, Cached: cached}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -140,7 +158,6 @@ type CachedStream struct {
|
||||||
Cached bool
|
Cached bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *CachedStream) Seekable() bool { return s.Seeker != nil }
|
|
||||||
func (s *CachedStream) Close() error {
|
func (s *CachedStream) Close() error {
|
||||||
if s.Closer != nil {
|
if s.Closer != nil {
|
||||||
return s.Closer.Close()
|
return s.Closer.Close()
|
||||||
|
@ -162,21 +179,21 @@ func getFinalCachedSize(r fscache.ReadAtCloser) int64 {
|
||||||
return -1
|
return -1
|
||||||
}
|
}
|
||||||
|
|
||||||
func copyAndClose(ctx context.Context, w io.WriteCloser, r io.Reader) {
|
func copyAndClose(w io.WriteCloser, r io.Reader) error {
|
||||||
_, err := io.Copy(w, r)
|
_, err := io.Copy(w, r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(ctx, "Error copying data to cache", err)
|
err = fmt.Errorf("copying data to cache: %w", err)
|
||||||
}
|
}
|
||||||
if c, ok := r.(io.Closer); ok {
|
if c, ok := r.(io.Closer); ok {
|
||||||
err = c.Close()
|
if cErr := c.Close(); cErr != nil {
|
||||||
if err != nil {
|
err = multierror.Append(err, fmt.Errorf("closing source stream: %w", cErr))
|
||||||
log.Error(ctx, "Error closing source stream", err)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
err = w.Close()
|
|
||||||
if err != nil {
|
if cErr := w.Close(); cErr != nil {
|
||||||
log.Error(ctx, "Error closing cache writer", err)
|
err = multierror.Append(err, fmt.Errorf("closing cache writer: %w", cErr))
|
||||||
}
|
}
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func newFSCache(name, cacheSize, cacheFolder string, maxItems int) (fscache.Cache, error) {
|
func newFSCache(name, cacheSize, cacheFolder string, maxItems int) (fscache.Cache, error) {
|
||||||
|
|
90
utils/cache/file_caches_test.go
vendored
90
utils/cache/file_caches_test.go
vendored
|
@ -2,12 +2,14 @@ package cache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/navidrome/navidrome/conf"
|
"github.com/navidrome/navidrome/conf"
|
||||||
|
"github.com/navidrome/navidrome/conf/configtest"
|
||||||
. "github.com/onsi/ginkgo/v2"
|
. "github.com/onsi/ginkgo/v2"
|
||||||
. "github.com/onsi/gomega"
|
. "github.com/onsi/gomega"
|
||||||
)
|
)
|
||||||
|
@ -15,16 +17,18 @@ import (
|
||||||
// Call NewFileCache and wait for it to be ready
|
// Call NewFileCache and wait for it to be ready
|
||||||
func callNewFileCache(name, cacheSize, cacheFolder string, maxItems int, getReader ReadFunc) *fileCache {
|
func callNewFileCache(name, cacheSize, cacheFolder string, maxItems int, getReader ReadFunc) *fileCache {
|
||||||
fc := NewFileCache(name, cacheSize, cacheFolder, maxItems, getReader)
|
fc := NewFileCache(name, cacheSize, cacheFolder, maxItems, getReader)
|
||||||
Eventually(func() bool { return fc.Ready(context.TODO()) }).Should(BeTrue())
|
Eventually(func() bool { return fc.Ready(context.Background()) }).Should(BeTrue())
|
||||||
return fc
|
return fc.(*fileCache)
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ = Describe("File Caches", func() {
|
var _ = Describe("File Caches", func() {
|
||||||
BeforeEach(func() {
|
BeforeEach(func() {
|
||||||
conf.Server.DataFolder, _ = os.MkdirTemp("", "file_caches")
|
tmpDir, _ := os.MkdirTemp("", "file_caches")
|
||||||
|
DeferCleanup(func() {
|
||||||
|
configtest.SetupConfig()
|
||||||
|
_ = os.RemoveAll(tmpDir)
|
||||||
})
|
})
|
||||||
AfterEach(func() {
|
conf.Server.DataFolder = tmpDir
|
||||||
_ = os.RemoveAll(conf.Server.DataFolder)
|
|
||||||
})
|
})
|
||||||
|
|
||||||
Describe("NewFileCache", func() {
|
Describe("NewFileCache", func() {
|
||||||
|
@ -56,7 +60,7 @@ var _ = Describe("File Caches", func() {
|
||||||
return strings.NewReader(arg.Key()), nil
|
return strings.NewReader(arg.Key()), nil
|
||||||
})
|
})
|
||||||
// First call is a MISS
|
// First call is a MISS
|
||||||
s, err := fc.Get(context.TODO(), &testArg{"test"})
|
s, err := fc.Get(context.Background(), &testArg{"test"})
|
||||||
Expect(err).To(BeNil())
|
Expect(err).To(BeNil())
|
||||||
Expect(s.Cached).To(BeFalse())
|
Expect(s.Cached).To(BeFalse())
|
||||||
Expect(s.Closer).To(BeNil())
|
Expect(s.Closer).To(BeNil())
|
||||||
|
@ -64,7 +68,7 @@ var _ = Describe("File Caches", func() {
|
||||||
|
|
||||||
// Second call is a HIT
|
// Second call is a HIT
|
||||||
called = false
|
called = false
|
||||||
s, err = fc.Get(context.TODO(), &testArg{"test"})
|
s, err = fc.Get(context.Background(), &testArg{"test"})
|
||||||
Expect(err).To(BeNil())
|
Expect(err).To(BeNil())
|
||||||
Expect(io.ReadAll(s)).To(Equal([]byte("test")))
|
Expect(io.ReadAll(s)).To(Equal([]byte("test")))
|
||||||
Expect(s.Cached).To(BeTrue())
|
Expect(s.Cached).To(BeTrue())
|
||||||
|
@ -79,22 +83,90 @@ var _ = Describe("File Caches", func() {
|
||||||
return strings.NewReader(arg.Key()), nil
|
return strings.NewReader(arg.Key()), nil
|
||||||
})
|
})
|
||||||
// First call is a MISS
|
// First call is a MISS
|
||||||
s, err := fc.Get(context.TODO(), &testArg{"test"})
|
s, err := fc.Get(context.Background(), &testArg{"test"})
|
||||||
Expect(err).To(BeNil())
|
Expect(err).To(BeNil())
|
||||||
Expect(s.Cached).To(BeFalse())
|
Expect(s.Cached).To(BeFalse())
|
||||||
Expect(io.ReadAll(s)).To(Equal([]byte("test")))
|
Expect(io.ReadAll(s)).To(Equal([]byte("test")))
|
||||||
|
|
||||||
// Second call is also a MISS
|
// Second call is also a MISS
|
||||||
called = false
|
called = false
|
||||||
s, err = fc.Get(context.TODO(), &testArg{"test"})
|
s, err = fc.Get(context.Background(), &testArg{"test"})
|
||||||
Expect(err).To(BeNil())
|
Expect(err).To(BeNil())
|
||||||
Expect(io.ReadAll(s)).To(Equal([]byte("test")))
|
Expect(io.ReadAll(s)).To(Equal([]byte("test")))
|
||||||
Expect(s.Cached).To(BeFalse())
|
Expect(s.Cached).To(BeFalse())
|
||||||
Expect(called).To(BeTrue())
|
Expect(called).To(BeTrue())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
Context("reader errors", func() {
|
||||||
|
When("creating a reader fails", func() {
|
||||||
|
It("does not cache", func() {
|
||||||
|
fc := callNewFileCache("test", "1KB", "test", 0, func(ctx context.Context, arg Item) (io.Reader, error) {
|
||||||
|
return nil, errors.New("failed")
|
||||||
|
})
|
||||||
|
|
||||||
|
_, err := fc.Get(context.Background(), &testArg{"test"})
|
||||||
|
Expect(err).To(MatchError("failed"))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
When("reader returns error", func() {
|
||||||
|
It("does not cache", func() {
|
||||||
|
fc := callNewFileCache("test", "1KB", "test", 0, func(ctx context.Context, arg Item) (io.Reader, error) {
|
||||||
|
return errFakeReader{errors.New("read failure")}, nil
|
||||||
|
})
|
||||||
|
|
||||||
|
s, err := fc.Get(context.Background(), &testArg{"test"})
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
_, _ = io.Copy(io.Discard, s)
|
||||||
|
// TODO How to make the fscache reader return the underlying reader error?
|
||||||
|
//Expect(err).To(MatchError("read failure"))
|
||||||
|
|
||||||
|
// Data should not be cached (or eventually be removed from cache)
|
||||||
|
Eventually(func() bool {
|
||||||
|
s, _ = fc.Get(context.Background(), &testArg{"test"})
|
||||||
|
if s != nil {
|
||||||
|
return s.Cached
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}).Should(BeFalse())
|
||||||
|
})
|
||||||
|
})
|
||||||
|
When("context is canceled", func() {
|
||||||
|
It("does not cache", func() {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
fc := callNewFileCache("test", "1KB", "test", 0, func(ctx context.Context, arg Item) (io.Reader, error) {
|
||||||
|
return &ctxFakeReader{ctx}, nil
|
||||||
|
})
|
||||||
|
|
||||||
|
s, err := fc.Get(ctx, &testArg{"test"})
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
cancel()
|
||||||
|
b := make([]byte, 10)
|
||||||
|
_, err = s.Read(b)
|
||||||
|
// TODO Should be context.Canceled error
|
||||||
|
Expect(err).To(MatchError(io.EOF))
|
||||||
|
|
||||||
|
// Data should not be cached (or eventually be removed from cache)
|
||||||
|
Eventually(func() bool {
|
||||||
|
s, _ = fc.Get(context.Background(), &testArg{"test"})
|
||||||
|
if s != nil {
|
||||||
|
return s.Cached
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}).Should(BeFalse())
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
type testArg struct{ s string }
|
type testArg struct{ s string }
|
||||||
|
|
||||||
func (t *testArg) Key() string { return t.s }
|
func (t *testArg) Key() string { return t.s }
|
||||||
|
|
||||||
|
type errFakeReader struct{ err error }
|
||||||
|
|
||||||
|
func (e errFakeReader) Read([]byte) (int, error) { return 0, e.err }
|
||||||
|
|
||||||
|
type ctxFakeReader struct{ ctx context.Context }
|
||||||
|
|
||||||
|
func (e *ctxFakeReader) Read([]byte) (int, error) { return 0, e.ctx.Err() }
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue