Refactor FileCache, allow disabling Transcoding cache

This commit is contained in:
Deluan 2020-07-24 12:42:11 -04:00
parent b795ad55a3
commit 433e31acc8
5 changed files with 191 additions and 92 deletions

View file

@ -202,5 +202,5 @@ func readFromFile(path string) ([]byte, error) {
} }
func NewImageCache() (ImageCache, error) { func NewImageCache() (ImageCache, error) {
return newFileCache("Image", conf.Server.ImageCacheSize, consts.ImageCacheDir, consts.DefaultImageCacheMaxItems) return newFSCache("Image", conf.Server.ImageCacheSize, consts.ImageCacheDir, consts.DefaultImageCacheMaxItems)
} }

View file

@ -1,7 +1,9 @@
package core package core
import ( import (
"context"
"fmt" "fmt"
"io"
"path/filepath" "path/filepath"
"github.com/deluan/navidrome/conf" "github.com/deluan/navidrome/conf"
@ -11,7 +13,116 @@ import (
"github.com/dustin/go-humanize" "github.com/dustin/go-humanize"
) )
func newFileCache(name, cacheSize, cacheFolder string, maxItems int) (fscache.Cache, error) { type ReadFunc func(ctx context.Context, arg fmt.Stringer) (io.Reader, error)
// NewFileCache creates a FileCache that stores items produced by getReader in a
// filesystem-backed cache. If the configured cacheSize is "0", the underlying
// fscache is not created and the FileCache operates in pass-through (disabled) mode.
func NewFileCache(name, cacheSize, cacheFolder string, maxItems int, getReader ReadFunc) (*FileCache, error) {
	fsc, err := newFSCache(name, cacheSize, cacheFolder, maxItems)
	if err != nil {
		return nil, err
	}
	fc := &FileCache{name: name, cache: fsc, getReader: getReader}
	// newFSCache returns a nil cache (and nil error) when the cache is disabled.
	fc.disabled = fsc == nil
	return fc, nil
}
// FileCache is a disk-backed cache of streams. On a miss it fills the cache by
// invoking getReader; when disabled it always streams straight from getReader.
type FileCache struct {
disabled bool // true when the configured cache size is "0" (cache is nil)
name string // used only for log messages
cache fscache.Cache // nil when disabled
getReader ReadFunc // produces the source stream for a given key (arg.String())
}
// Get returns a CachedStream for the item identified by arg.String().
//
// Behavior:
//   - Cache disabled: streams directly from getReader (never seekable).
//   - Cache MISS: starts filling the cache in the background and returns a
//     non-seekable reader that tails the cache entry as it is written.
//   - Cache HIT, fully written: returns a seekable stream over the cached bytes.
//   - Cache HIT, still being written: returns a non-seekable reader.
//
// The caller is responsible for closing the returned CachedStream.
func (fc *FileCache) Get(ctx context.Context, arg fmt.Stringer) (*CachedStream, error) {
	if fc.disabled {
		log.Debug(ctx, "Cache disabled", "cache", fc.name)
		reader, err := fc.getReader(ctx, arg)
		if err != nil {
			return nil, err
		}
		return &CachedStream{Reader: reader}, nil
	}
	key := arg.String()
	r, w, err := fc.cache.Get(key)
	if err != nil {
		return nil, err
	}
	cached := w == nil
	if !cached {
		log.Trace(ctx, "Cache MISS", "cache", fc.name, "key", key)
		reader, err := fc.getReader(ctx, arg)
		if err != nil {
			// Abort the in-progress cache entry. Without this, r and w leak and
			// subsequent Gets for the same key would block on a stream that is
			// never written.
			_ = r.Close()
			_ = w.Close()
			_ = fc.cache.Remove(key)
			return nil, err
		}
		go copyAndClose(ctx, w, reader)
	}
	// If it is in the cache, check if the stream is done being written. If so, return a ReaderSeeker
	if cached {
		size := getFinalCachedSize(r)
		if size >= 0 {
			log.Trace(ctx, "Cache HIT", "cache", fc.name, "key", key, "size", size)
			sr := io.NewSectionReader(r, 0, size)
			// Pair the SectionReader with the underlying cache reader's Close, so
			// CachedStream.Close actually releases the cache file handle (a bare
			// SectionReader is not an io.Closer).
			return &CachedStream{
				Reader: struct {
					io.Reader
					io.Closer
				}{sr, r},
				Seeker: sr,
			}, nil
		}
		log.Trace(ctx, "Cache HIT", "cache", fc.name, "key", key)
	}
	// All other cases, just return a Reader, without Seek capabilities
	return &CachedStream{Reader: r}, nil
}
// CachedStream is the stream returned by FileCache.Get. Seeker is non-nil only
// when the item was fully cached at the time of the call.
type CachedStream struct {
io.Reader
io.Seeker
}
// Seekable reports whether the stream supports Seek (i.e. it was served from a complete cache entry).
func (s *CachedStream) Seekable() bool { return s.Seeker != nil }
// Close closes the underlying reader when it implements io.Closer; otherwise it is a no-op.
func (s *CachedStream) Close() error {
	c, ok := s.Reader.(io.Closer)
	if !ok {
		return nil
	}
	return c.Close()
}
// getFinalCachedSize returns the size of a fully-written cache entry, or -1 if
// the entry is still being written (or its size cannot be determined).
func getFinalCachedSize(r fscache.ReadAtCloser) int64 {
	if cr, ok := r.(*fscache.CacheReader); ok {
		if size, final, err := cr.Size(); final && err == nil {
			return size
		}
	}
	return -1
}
// copyAndClose drains r into w, then closes both (r only if it implements
// io.Closer). Errors are logged, not returned; this is meant to run in a
// background goroutine that fills a cache entry.
func copyAndClose(ctx context.Context, w io.WriteCloser, r io.Reader) {
	if _, err := io.Copy(w, r); err != nil {
		log.Error(ctx, "Error copying data to cache", err)
	}
	if c, ok := r.(io.Closer); ok {
		if err := c.Close(); err != nil {
			log.Error(ctx, "Error closing source stream", err)
		}
	}
	// Closing the cache writer marks the entry as complete.
	if err := w.Close(); err != nil {
		log.Error(ctx, "Error closing cache writer", err)
	}
}
func newFSCache(name, cacheSize, cacheFolder string, maxItems int) (fscache.Cache, error) {
if cacheSize == "0" { if cacheSize == "0" {
log.Warn(fmt.Sprintf("%s cache disabled", name)) log.Warn(fmt.Sprintf("%s cache disabled", name))
return nil, nil return nil, nil

View file

@ -18,20 +18,30 @@ var _ = Describe("File Caches", func() {
os.RemoveAll(conf.Server.DataFolder) os.RemoveAll(conf.Server.DataFolder)
}) })
Describe("newFileCache", func() { Describe("NewFileCache", func() {
It("creates the cache folder", func() { It("creates the cache folder", func() {
Expect(newFileCache("test", "1k", "test", 10)).ToNot(BeNil()) Expect(NewFileCache("test", "1k", "test", 10, nil)).ToNot(BeNil())
_, err := os.Stat(filepath.Join(conf.Server.DataFolder, "test")) _, err := os.Stat(filepath.Join(conf.Server.DataFolder, "test"))
Expect(os.IsNotExist(err)).To(BeFalse()) Expect(os.IsNotExist(err)).To(BeFalse())
}) })
It("creates the cache folder with invalid size", func() { It("creates the cache folder with invalid size", func() {
Expect(newFileCache("test", "abc", "test", 10)).ToNot(BeNil()) fc, err := NewFileCache("test", "abc", "test", 10, nil)
Expect(err).To(BeNil())
Expect(fc.cache).ToNot(BeNil())
Expect(fc.disabled).To(BeFalse())
}) })
It("returns empty if cache size is '0'", func() { It("returns empty if cache size is '0'", func() {
Expect(newFileCache("test", "0", "test", 10)).To(BeNil()) fc, err := NewFileCache("test", "0", "test", 10, nil)
Expect(err).To(BeNil())
Expect(fc.cache).To(BeNil())
Expect(fc.disabled).To(BeTrue())
}) })
}) })
Describe("FileCache", func() {
})
}) })

View file

@ -14,23 +14,31 @@ import (
"github.com/deluan/navidrome/log" "github.com/deluan/navidrome/log"
"github.com/deluan/navidrome/model" "github.com/deluan/navidrome/model"
"github.com/deluan/navidrome/model/request" "github.com/deluan/navidrome/model/request"
"github.com/djherbis/fscache"
) )
type MediaStreamer interface { type MediaStreamer interface {
NewStream(ctx context.Context, id string, reqFormat string, reqBitRate int) (*Stream, error) NewStream(ctx context.Context, id string, reqFormat string, reqBitRate int) (*Stream, error)
} }
type TranscodingCache fscache.Cache func NewMediaStreamer(ds model.DataStore, ffm transcoder.Transcoder, cache *FileCache) MediaStreamer {
func NewMediaStreamer(ds model.DataStore, ffm transcoder.Transcoder, cache TranscodingCache) MediaStreamer {
return &mediaStreamer{ds: ds, ffm: ffm, cache: cache} return &mediaStreamer{ds: ds, ffm: ffm, cache: cache}
} }
type mediaStreamer struct { type mediaStreamer struct {
ds model.DataStore ds model.DataStore
ffm transcoder.Transcoder ffm transcoder.Transcoder
cache fscache.Cache cache *FileCache
}
// streamJob describes one transcoding request; it is the cache key (via String)
// and carries everything the TranscodingCache's ReadFunc needs to start ffmpeg.
type streamJob struct {
ms *mediaStreamer
mf *model.MediaFile
format string
bitRate int
}
// String builds the cache key for this job: "<mediafile id>.<bitrate>.<format>".
func (j *streamJob) String() string {
return fmt.Sprintf("%s.%d.%s", j.mf.ID, j.bitRate, j.format)
} }
func (ms *mediaStreamer) NewStream(ctx context.Context, id string, reqFormat string, reqBitRate int) (*Stream, error) { func (ms *mediaStreamer) NewStream(ctx context.Context, id string, reqFormat string, reqBitRate int) (*Stream, error) {
@ -49,17 +57,13 @@ func (ms *mediaStreamer) NewStream(ctx context.Context, id string, reqFormat str
}() }()
format, bitRate = selectTranscodingOptions(ctx, ms.ds, mf, reqFormat, reqBitRate) format, bitRate = selectTranscodingOptions(ctx, ms.ds, mf, reqFormat, reqBitRate)
log.Trace(ctx, "Selected transcoding options",
"requestBitrate", reqBitRate, "requestFormat", reqFormat,
"originalBitrate", mf.BitRate, "originalFormat", mf.Suffix,
"selectedBitrate", bitRate, "selectedFormat", format,
)
s := &Stream{ctx: ctx, mf: mf, format: format, bitRate: bitRate} s := &Stream{ctx: ctx, mf: mf, format: format, bitRate: bitRate}
if format == "raw" { if format == "raw" {
log.Debug(ctx, "Streaming raw file", "id", mf.ID, "path", mf.Path, log.Debug(ctx, "Streaming RAW file", "id", mf.ID, "path", mf.Path,
"requestBitrate", reqBitRate, "requestFormat", reqFormat, "requestBitrate", reqBitRate, "requestFormat", reqFormat,
"originalBitrate", mf.BitRate, "originalFormat", mf.Suffix) "originalBitrate", mf.BitRate, "originalFormat", mf.Suffix,
"selectedBitrate", bitRate, "selectedFormat", format)
f, err := os.Open(mf.Path) f, err := os.Open(mf.Path)
if err != nil { if err != nil {
return nil, err return nil, err
@ -71,70 +75,30 @@ func (ms *mediaStreamer) NewStream(ctx context.Context, id string, reqFormat str
return s, nil return s, nil
} }
key := cacheKey(id, bitRate, format) job := &streamJob{
r, w, err := ms.cache.Get(key) ms: ms,
mf: mf,
format: format,
bitRate: bitRate,
}
r, err := ms.cache.Get(ctx, job)
if err != nil { if err != nil {
log.Error(ctx, "Error creating stream caching buffer", "id", mf.ID, err) log.Error(ctx, "Error accessing cache", "id", mf.ID, err)
return nil, err return nil, err
} }
cached = w == nil log.Debug(ctx, "Streaming TRANSCODED file", "id", mf.ID, "path", mf.Path,
// If this is a brand new transcoding request, not in the cache, start transcoding
if !cached {
log.Trace(ctx, "Cache miss. Starting new transcoding session", "id", mf.ID)
t, err := ms.ds.Transcoding(ctx).FindByFormat(format)
if err != nil {
log.Error(ctx, "Error loading transcoding command", "format", format, err)
return nil, os.ErrInvalid
}
out, err := ms.ffm.Start(ctx, t.Command, mf.Path, bitRate)
if err != nil {
log.Error(ctx, "Error starting transcoder", "id", mf.ID, err)
return nil, os.ErrInvalid
}
go copyAndClose(ctx, w, out)
}
// If it is in the cache, check if the stream is done being written. If so, return a ReaderSeeker
if cached {
size := getFinalCachedSize(r)
if size > 0 {
log.Debug(ctx, "Streaming cached file", "id", mf.ID, "path", mf.Path,
"requestBitrate", reqBitRate, "requestFormat", reqFormat,
"originalBitrate", mf.BitRate, "originalFormat", mf.Suffix, "size", size)
sr := io.NewSectionReader(r, 0, size)
s.Reader = sr
s.Closer = r
s.Seeker = sr
s.format = format
return s, nil
}
}
log.Debug(ctx, "Streaming transcoded file", "id", mf.ID, "path", mf.Path,
"requestBitrate", reqBitRate, "requestFormat", reqFormat, "requestBitrate", reqBitRate, "requestFormat", reqFormat,
"originalBitrate", mf.BitRate, "originalFormat", mf.Suffix) "originalBitrate", mf.BitRate, "originalFormat", mf.Suffix,
// All other cases, just return a ReadCloser, without Seek capabilities "selectedBitrate", bitRate, "selectedFormat", format)
s.Reader = r s.Reader = r
s.Closer = r s.Closer = r
s.format = format if r.Seekable() {
return s, nil s.Seeker = r
} }
func copyAndClose(ctx context.Context, w io.WriteCloser, r io.ReadCloser) { return s, nil
_, err := io.Copy(w, r)
if err != nil {
log.Error(ctx, "Error copying data to cache", err)
}
err = r.Close()
if err != nil {
log.Error(ctx, "Error closing transcode output", err)
}
err = w.Close()
if err != nil {
log.Error(ctx, "Error closing cache", err)
}
} }
type Stream struct { type Stream struct {
@ -202,21 +166,21 @@ func selectTranscodingOptions(ctx context.Context, ds model.DataStore, mf *model
return return
} }
func cacheKey(id string, bitRate int, format string) string { func NewTranscodingCache() (*FileCache, error) {
return fmt.Sprintf("%s.%d.%s", id, bitRate, format) return NewFileCache("Transcoding", conf.Server.TranscodingCacheSize,
} consts.TranscodingCacheDir, consts.DefaultTranscodingCacheMaxItems,
func(ctx context.Context, arg fmt.Stringer) (io.Reader, error) {
func getFinalCachedSize(r fscache.ReadAtCloser) int64 { job := arg.(*streamJob)
cr, ok := r.(*fscache.CacheReader) t, err := job.ms.ds.Transcoding(ctx).FindByFormat(job.format)
if ok { if err != nil {
size, final, err := cr.Size() log.Error(ctx, "Error loading transcoding command", "format", job.format, err)
if final && err == nil { return nil, os.ErrInvalid
return size }
} out, err := job.ms.ffm.Start(ctx, t.Command, job.mf.Path, job.bitRate)
} if err != nil {
return -1 log.Error(ctx, "Error starting transcoder", "id", job.mf.ID, err)
} return nil, os.ErrInvalid
}
func NewTranscodingCache() (TranscodingCache, error) { return out, nil
return newFileCache("Transcoding", conf.Server.TranscodingCacheSize, consts.TranscodingCacheDir, consts.DefaultTranscodingCacheMaxItems) })
} }

View file

@ -3,8 +3,11 @@ package core
import ( import (
"context" "context"
"io" "io"
"io/ioutil"
"os"
"strings" "strings"
"github.com/deluan/navidrome/conf"
"github.com/deluan/navidrome/log" "github.com/deluan/navidrome/log"
"github.com/deluan/navidrome/model" "github.com/deluan/navidrome/model"
"github.com/deluan/navidrome/model/request" "github.com/deluan/navidrome/model/request"
@ -18,13 +21,21 @@ var _ = Describe("MediaStreamer", func() {
var ds model.DataStore var ds model.DataStore
ffmpeg := &fakeFFmpeg{Data: "fake data"} ffmpeg := &fakeFFmpeg{Data: "fake data"}
ctx := log.NewContext(context.TODO()) ctx := log.NewContext(context.TODO())
log.SetLevel(log.LevelTrace)
BeforeEach(func() { BeforeEach(func() {
conf.Server.DataFolder, _ = ioutil.TempDir("", "file_caches")
conf.Server.TranscodingCacheSize = "100MB"
ds = &persistence.MockDataStore{MockedTranscoding: &mockTranscodingRepository{}} ds = &persistence.MockDataStore{MockedTranscoding: &mockTranscodingRepository{}}
ds.MediaFile(ctx).(*persistence.MockMediaFile).SetData(`[{"id": "123", "path": "tests/fixtures/test.mp3", "suffix": "mp3", "bitRate": 128, "duration": 257.0}]`) ds.MediaFile(ctx).(*persistence.MockMediaFile).SetData(`[{"id": "123", "path": "tests/fixtures/test.mp3", "suffix": "mp3", "bitRate": 128, "duration": 257.0}]`)
testCache, _ := NewTranscodingCache()
streamer = NewMediaStreamer(ds, ffmpeg, testCache) streamer = NewMediaStreamer(ds, ffmpeg, testCache)
}) })
AfterEach(func() {
os.RemoveAll(conf.Server.DataFolder)
})
Context("NewStream", func() { Context("NewStream", func() {
It("returns a seekable stream if format is 'raw'", func() { It("returns a seekable stream if format is 'raw'", func() {
s, err := streamer.NewStream(ctx, "123", "raw", 0) s, err := streamer.NewStream(ctx, "123", "raw", 0)
@ -48,8 +59,11 @@ var _ = Describe("MediaStreamer", func() {
Expect(s.Duration()).To(Equal(float32(257.0))) Expect(s.Duration()).To(Equal(float32(257.0)))
}) })
It("returns a seekable stream if the file is complete in the cache", func() { It("returns a seekable stream if the file is complete in the cache", func() {
s, err := streamer.NewStream(ctx, "123", "mp3", 32)
Expect(err).To(BeNil())
Eventually(func() bool { return ffmpeg.closed }, "3s").Should(BeTrue()) Eventually(func() bool { return ffmpeg.closed }, "3s").Should(BeTrue())
s, err := streamer.NewStream(ctx, "123", "mp3", 64)
s, err = streamer.NewStream(ctx, "123", "mp3", 32)
Expect(err).To(BeNil()) Expect(err).To(BeNil())
Expect(s.Seekable()).To(BeTrue()) Expect(s.Seekable()).To(BeTrue())
}) })