diff --git a/core/transcoder/transcoder.go b/core/transcoder/transcoder.go index ad4028460..1231df390 100644 --- a/core/transcoder/transcoder.go +++ b/core/transcoder/transcoder.go @@ -2,6 +2,8 @@ package transcoder import ( "context" + "errors" + "fmt" "io" "os" "os/exec" @@ -12,7 +14,7 @@ import ( ) type Transcoder interface { - Start(ctx context.Context, command, path string, maxBitRate int) (f io.ReadCloser, err error) + Start(ctx context.Context, command, path string, maxBitRate int) (io.ReadCloser, error) } func New() Transcoder { @@ -21,22 +23,54 @@ func New() Transcoder { type externalTranscoder struct{} -func (e *externalTranscoder) Start(ctx context.Context, command, path string, maxBitRate int) (f io.ReadCloser, err error) { +func (e *externalTranscoder) Start(ctx context.Context, command, path string, maxBitRate int) (io.ReadCloser, error) { args := createTranscodeCommand(command, path, maxBitRate) - log.Trace(ctx, "Executing transcoding command", "cmd", args) - cmd := exec.CommandContext(ctx, args[0], args[1:]...) // #nosec + j := &Cmd{ctx: ctx, args: args} + j.PipeReader, j.out = io.Pipe() + err := j.start() + if err != nil { + return nil, err + } + go j.wait() + return j, nil +} + +type Cmd struct { + *io.PipeReader + out *io.PipeWriter + ctx context.Context + args []string + cmd *exec.Cmd +} + +func (j *Cmd) start() error { + cmd := exec.CommandContext(j.ctx, j.args[0], j.args[1:]...) // #nosec + cmd.Stdout = j.out cmd.Stderr = os.Stderr - if f, err = cmd.StdoutPipe(); err != nil { + j.cmd = cmd + + if err := cmd.Start(); err != nil { + return fmt.Errorf("starting cmd: %w", err) + } + return nil +} + +func (j *Cmd) wait() { + if err := j.cmd.Wait(); err != nil { + var exitErr *exec.ExitError + if errors.As(err, &exitErr) { + _ = j.out.CloseWithError(fmt.Errorf("%s exited with non-zero status code: %d", j.args[0], exitErr.ExitCode())) + } else { + _ = j.out.CloseWithError(fmt.Errorf("waiting %s cmd: %w", j.args[0], err)) + } return } - if err = cmd.Start(); err != nil { + if j.ctx.Err() != nil { + _ = j.out.CloseWithError(j.ctx.Err()) return } - - go func() { _ = cmd.Wait() }() // prevent zombies - - return + _ = j.out.Close() } // Path will always be an absolute path diff --git a/go.mod b/go.mod index ee855888d..234941ca4 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,7 @@ require ( github.com/golangci/golangci-lint v1.50.1 github.com/google/uuid v1.3.0 github.com/google/wire v0.5.0 + github.com/hashicorp/go-multierror v1.1.1 github.com/kennygrant/sanitize v0.0.0-20170120101633-6a0bfdde8629 github.com/kr/pretty v0.3.1 github.com/lestrrat-go/jwx/v2 v2.0.8 @@ -127,7 +128,6 @@ require ( github.com/gostaticanalysis/forcetypeassert v0.1.0 // indirect github.com/gostaticanalysis/nilerr v0.1.1 // indirect github.com/hashicorp/errwrap v1.0.0 // indirect - github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-version v1.6.0 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/hashicorp/hcl v1.0.0 // indirect diff --git a/utils/cache/file_caches.go b/utils/cache/file_caches.go index 0bcaf124b..97a3f6f65 100644 --- a/utils/cache/file_caches.go +++ b/utils/cache/file_caches.go @@ -10,6 +10,7 @@ import ( "github.com/djherbis/fscache" "github.com/dustin/go-humanize" + "github.com/hashicorp/go-multierror" "github.com/navidrome/navidrome/conf" "github.com/navidrome/navidrome/consts" "github.com/navidrome/navidrome/log" @@ -27,7 +28,7 @@ type FileCache interface { Available(ctx context.Context) bool } -func NewFileCache(name, cacheSize, cacheFolder string, maxItems int, getReader ReadFunc) *fileCache { +func NewFileCache(name, cacheSize, cacheFolder string, maxItems int, getReader ReadFunc) FileCache { fc := &fileCache{ name: name, cacheSize: cacheSize, @@ -86,6 +87,16 @@ func (fc *fileCache) Available(ctx context.Context) bool { return fc.ready && !fc.disabled } +func (fc *fileCache) invalidate(ctx context.Context, key string) error { + if !fc.Available(ctx) { + return nil + } + if !fc.cache.Exists(key) { + return nil + } + return fc.cache.Remove(key) +} + func (fc *fileCache) Get(ctx context.Context, arg Item) (*CachedStream, error) { if !fc.Available(ctx) { reader, err := fc.getReader(ctx, arg) @@ -109,10 +120,17 @@ func (fc *fileCache) Get(ctx context.Context, arg Item) (*CachedStream, error) { if err != nil { return nil, err } - go copyAndClose(ctx, w, reader) + go func() { + if err := copyAndClose(w, reader); err != nil { + log.Debug(ctx, "Error populating cache", "key", key, err) + if err = fc.invalidate(ctx, key); err != nil { + log.Warn(ctx, "Error removing key from cache", "key", key, err) + } + } + }() } - // If it is in the cache, check if the stream is done being written. If so, return a ReaderSeeker + // If it is in the cache, check if the stream is done being written. If so, return a ReadSeeker if cached { size := getFinalCachedSize(r) if size >= 0 { @@ -129,7 +147,7 @@ func (fc *fileCache) Get(ctx context.Context, arg Item) (*CachedStream, error) { } } - // All other cases, just return a Reader, without Seek capabilities + // All other cases, just return the cache reader, without Seek capabilities return &CachedStream{Reader: r, Cached: cached}, nil } @@ -140,7 +158,6 @@ type CachedStream struct { Cached bool } -func (s *CachedStream) Seekable() bool { return s.Seeker != nil } func (s *CachedStream) Close() error { if s.Closer != nil { return s.Closer.Close() @@ -162,21 +179,21 @@ func getFinalCachedSize(r fscache.ReadAtCloser) int64 { return -1 } -func copyAndClose(ctx context.Context, w io.WriteCloser, r io.Reader) { +func copyAndClose(w io.WriteCloser, r io.Reader) error { _, err := io.Copy(w, r) if err != nil { - log.Error(ctx, "Error copying data to cache", err) + err = fmt.Errorf("copying data to cache: %w", err) } if c, ok := r.(io.Closer); ok { - err = c.Close() - if err != nil { - log.Error(ctx, "Error closing source stream", err) + if cErr := c.Close(); cErr != nil { + err = multierror.Append(err, fmt.Errorf("closing source stream: %w", cErr)) } } - err = w.Close() - if err != nil { - log.Error(ctx, "Error closing cache writer", err) + + if cErr := w.Close(); cErr != nil { + err = multierror.Append(err, fmt.Errorf("closing cache writer: %w", cErr)) } + return err } func newFSCache(name, cacheSize, cacheFolder string, maxItems int) (fscache.Cache, error) { diff --git a/utils/cache/file_caches_test.go b/utils/cache/file_caches_test.go index 788dbf438..853a92f2c 100644 --- a/utils/cache/file_caches_test.go +++ b/utils/cache/file_caches_test.go @@ -2,12 +2,14 @@ package cache import ( "context" + "errors" "io" "os" "path/filepath" "strings" "github.com/navidrome/navidrome/conf" + "github.com/navidrome/navidrome/conf/configtest" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" ) @@ -15,16 +17,18 @@ import ( // Call NewFileCache and wait for it to be ready func callNewFileCache(name, cacheSize, cacheFolder string, maxItems int, getReader ReadFunc) *fileCache { fc := NewFileCache(name, cacheSize, cacheFolder, maxItems, getReader) - Eventually(func() bool { return fc.Ready(context.TODO()) }).Should(BeTrue()) - return fc + Eventually(func() bool { return fc.Ready(context.Background()) }).Should(BeTrue()) + return fc.(*fileCache) } var _ = Describe("File Caches", func() { BeforeEach(func() { - conf.Server.DataFolder, _ = os.MkdirTemp("", "file_caches") - }) - AfterEach(func() { - _ = os.RemoveAll(conf.Server.DataFolder) + tmpDir, _ := os.MkdirTemp("", "file_caches") + DeferCleanup(func() { + configtest.SetupConfig() + _ = os.RemoveAll(tmpDir) + }) + conf.Server.DataFolder = tmpDir }) Describe("NewFileCache", func() { @@ -56,7 +60,7 @@ var _ = Describe("File Caches", func() { return strings.NewReader(arg.Key()), nil }) // First call is a MISS - s, err := fc.Get(context.TODO(), &testArg{"test"}) + s, err := fc.Get(context.Background(), &testArg{"test"}) Expect(err).To(BeNil()) Expect(s.Cached).To(BeFalse()) Expect(s.Closer).To(BeNil()) @@ -64,7 +68,7 @@ var _ = Describe("File Caches", func() { // Second call is a HIT called = false - s, err = fc.Get(context.TODO(), &testArg{"test"}) + s, err = fc.Get(context.Background(), &testArg{"test"}) Expect(err).To(BeNil()) Expect(io.ReadAll(s)).To(Equal([]byte("test"))) Expect(s.Cached).To(BeTrue()) @@ -79,22 +83,90 @@ var _ = Describe("File Caches", func() { return strings.NewReader(arg.Key()), nil }) // First call is a MISS - s, err := fc.Get(context.TODO(), &testArg{"test"}) + s, err := fc.Get(context.Background(), &testArg{"test"}) Expect(err).To(BeNil()) Expect(s.Cached).To(BeFalse()) Expect(io.ReadAll(s)).To(Equal([]byte("test"))) // Second call is also a MISS called = false - s, err = fc.Get(context.TODO(), &testArg{"test"}) + s, err = fc.Get(context.Background(), &testArg{"test"}) Expect(err).To(BeNil()) Expect(io.ReadAll(s)).To(Equal([]byte("test"))) Expect(s.Cached).To(BeFalse()) Expect(called).To(BeTrue()) }) + + Context("reader errors", func() { + When("creating a reader fails", func() { + It("does not cache", func() { + fc := callNewFileCache("test", "1KB", "test", 0, func(ctx context.Context, arg Item) (io.Reader, error) { + return nil, errors.New("failed") + }) + + _, err := fc.Get(context.Background(), &testArg{"test"}) + Expect(err).To(MatchError("failed")) + }) + }) + When("reader returns error", func() { + It("does not cache", func() { + fc := callNewFileCache("test", "1KB", "test", 0, func(ctx context.Context, arg Item) (io.Reader, error) { + return errFakeReader{errors.New("read failure")}, nil + }) + + s, err := fc.Get(context.Background(), &testArg{"test"}) + Expect(err).ToNot(HaveOccurred()) + _, _ = io.Copy(io.Discard, s) + // TODO How to make the fscache reader return the underlying reader error? + //Expect(err).To(MatchError("read failure")) + + // Data should not be cached (or eventually be removed from cache) + Eventually(func() bool { + s, _ = fc.Get(context.Background(), &testArg{"test"}) + if s != nil { + return s.Cached + } + return false + }).Should(BeFalse()) + }) + }) + When("context is canceled", func() { + It("does not cache", func() { + ctx, cancel := context.WithCancel(context.Background()) + fc := callNewFileCache("test", "1KB", "test", 0, func(ctx context.Context, arg Item) (io.Reader, error) { + return &ctxFakeReader{ctx}, nil + }) + + s, err := fc.Get(ctx, &testArg{"test"}) + Expect(err).ToNot(HaveOccurred()) + cancel() + b := make([]byte, 10) + _, err = s.Read(b) + // TODO Should be context.Canceled error + Expect(err).To(MatchError(io.EOF)) + + // Data should not be cached (or eventually be removed from cache) + Eventually(func() bool { + s, _ = fc.Get(context.Background(), &testArg{"test"}) + if s != nil { + return s.Cached + } + return false + }).Should(BeFalse()) + }) + }) + }) }) }) type testArg struct{ s string } func (t *testArg) Key() string { return t.s } + +type errFakeReader struct{ err error } + +func (e errFakeReader) Read([]byte) (int, error) { return 0, e.err } + +type ctxFakeReader struct{ ctx context.Context } + +func (e *ctxFakeReader) Read([]byte) (int, error) { return 0, e.ctx.Err() }