diff --git a/cmd/wire_gen.go b/cmd/wire_gen.go index f3333fc9b..f4e4a164a 100644 --- a/cmd/wire_gen.go +++ b/cmd/wire_gen.go @@ -27,7 +27,10 @@ func CreateServer(musicFolder string) *server.Server { func CreateScanner(musicFolder string) scanner.Scanner { dataStore := persistence.New() - scannerScanner := scanner.New(dataStore) + artworkCache := core.NewImageCache() + artwork := core.NewArtwork(dataStore, artworkCache) + cacheWarmer := core.NewCacheWarmer(artworkCache, artwork) + scannerScanner := scanner.New(dataStore, cacheWarmer) return scannerScanner } diff --git a/conf/configuration.go b/conf/configuration.go index 08459fe6d..86051de30 100644 --- a/conf/configuration.go +++ b/conf/configuration.go @@ -48,6 +48,7 @@ type configOptions struct { // DevFlags. These are used to enable/disable debugging and incomplete features DevLogSourceLine bool DevAutoCreateAdminPassword string + DevPreCacheAlbumArtwork bool } type scannerOptions struct { @@ -132,7 +133,7 @@ func init() { // DevFlags. These are used to enable/disable debugging and incomplete features viper.SetDefault("devlogsourceline", false) viper.SetDefault("devautocreateadminpassword", "") - viper.SetDefault("devoldscanner", false) + viper.SetDefault("devprecachealbumartwork", false) } func InitConfig(cfgFile string) { diff --git a/core/cache_warmer.go b/core/cache_warmer.go new file mode 100644 index 000000000..58e5735b9 --- /dev/null +++ b/core/cache_warmer.go @@ -0,0 +1,72 @@ +package core + +import ( + "context" + "io/ioutil" + + "github.com/deluan/navidrome/conf" + "github.com/deluan/navidrome/core/pool" + "github.com/deluan/navidrome/log" +) + +type CacheWarmer interface { + AddAlbum(ctx context.Context, albumID string) + Flush(ctx context.Context) +} + +func NewCacheWarmer(cache ArtworkCache, artwork Artwork) CacheWarmer { + w := &warmer{ + artwork: artwork, + cache: cache, + albums: map[string]struct{}{}, + } + p, err := pool.NewPool("artwork", 3, &artworkItem{}, w.execute) + if err != nil { + log.Error(context.Background(), "Error creating pool for Album Artwork Cache Warmer", err) + } else { + w.pool = p + } + + return w +} + +type warmer struct { + pool *pool.Pool + artwork Artwork + cache ArtworkCache + albums map[string]struct{} +} + +func (w *warmer) AddAlbum(ctx context.Context, albumID string) { + if albumID == "" { + return + } + w.albums[albumID] = struct{}{} +} + +func (w *warmer) Flush(ctx context.Context) { + if conf.Server.DevPreCacheAlbumArtwork { + if w.pool == nil || len(w.albums) == 0 { + return + } + log.Info(ctx, "Pre-caching album artworks", "numAlbums", len(w.albums)) + for id := range w.albums { + w.pool.Submit(artworkItem{albumID: id}) + } + } + w.albums = map[string]struct{}{} +} + +func (w *warmer) execute(workload interface{}) { + ctx := context.Background() + item := workload.(artworkItem) + log.Trace(ctx, "Pre-caching album artwork", "albumID", item.albumID) + err := w.artwork.Get(ctx, item.albumID, 0, ioutil.Discard) + if err != nil { + log.Warn("Error pre-caching artwork from album", "id", item.albumID, err) + } +} + +type artworkItem struct { + albumID string +} diff --git a/core/pool/pool.go b/core/pool/pool.go new file mode 100644 index 000000000..f781480ab --- /dev/null +++ b/core/pool/pool.go @@ -0,0 +1,99 @@ +package pool + +type Executor func(workload interface{}) + +type Pool struct { + name string + item interface{} + workers []worker + exec Executor + //queue *dque.DQue + queue chan work // receives jobs to send to workers + end chan bool // when receives bool stops workers +} + +func NewPool(name string, workerCount int, item interface{}, exec Executor) (*Pool, error) { + p := &Pool{ + name: name, + item: item, + exec: exec, + queue: make(chan work), + end: make(chan bool), + } + + //q, err := dque.NewOrOpen(name, filepath.Join(conf.Server.DataFolder, "queues", name), 50, p.itemBuilder) + //if err != nil { + // return nil, err + //} + //p.queue = q + for i := 0; i < workerCount; i++ { + worker := worker{ + p: p, + id: i, + channel: make(chan work), + workerChannel: workerChannel, + end: make(chan bool)} + worker.Start() + p.workers = append(p.workers, worker) + } + + // start pool + go func() { + for { + select { + case <-p.end: + for _, w := range p.workers { + w.Stop() // stop worker + } + return + case work := <-p.queue: + worker := <-workerChannel // wait for available channel + worker <- work // dispatch work to worker + } + } + }() + return p, nil +} + +func (p *Pool) Submit(workload interface{}) { + p.queue <- work{workload} +} + +//func (p *Pool) itemBuilder() interface{} { +// t := reflect.TypeOf(p.item) +// return reflect.New(t).Interface() +//} +// +var workerChannel = make(chan chan work) + +type work struct { + workload interface{} +} + +type worker struct { + id int + p *Pool + workerChannel chan chan work // used to communicate between dispatcher and workers + channel chan work + end chan bool +} + +// start worker +func (w *worker) Start() { + go func() { + for { + w.workerChannel <- w.channel // when the worker is available place channel in queue + select { + case job := <-w.channel: // worker has received job + w.p.exec(job.workload) // do work + case <-w.end: + return + } + } + }() +} + +// end worker +func (w *worker) Stop() { + w.end <- true +} diff --git a/core/pool/pool_test.go b/core/pool/pool_test.go new file mode 100644 index 000000000..a6ce3fc29 --- /dev/null +++ b/core/pool/pool_test.go @@ -0,0 +1,21 @@ +package pool + +import ( + "testing" + + "github.com/deluan/navidrome/log" + "github.com/deluan/navidrome/tests" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +func TestCore(t *testing.T) { + tests.Init(t, false) + log.SetLevel(log.LevelCritical) + RegisterFailHandler(Fail) + RunSpecs(t, "Core Suite") +} + +var _ = Describe("Pool", func() { + +}) diff --git a/core/wire_providers.go b/core/wire_providers.go index c16f4d1cc..217fa56c2 100644 --- a/core/wire_providers.go +++ b/core/wire_providers.go @@ -17,6 +17,7 @@ var Set = wire.NewSet( NewImageCache, NewArchiver, NewExternalInfo, + NewCacheWarmer, LastFMNewClient, SpotifyNewClient, transcoder.New, diff --git a/db/migration/20201025222059_purge_cache.go b/db/migration/20201025222059_purge_cache.go new file mode 100644 index 000000000..c8c74c87c --- /dev/null +++ b/db/migration/20201025222059_purge_cache.go @@ -0,0 +1,25 @@ +package migration + +import ( + "database/sql" + "os" + "path/filepath" + + "github.com/deluan/navidrome/conf" + "github.com/pressly/goose" +) + +func init() { + goose.AddMigration(Up20201025222059, Down20201025222059) +} + +func Up20201025222059(tx *sql.Tx) error { + cacheFolder := filepath.Join(conf.Server.DataFolder, "cache") + notice(tx, "Purging all cache entries, as the format of the cache changed.") + return os.RemoveAll(cacheFolder) +} + +func Down20201025222059(tx *sql.Tx) error { + // This code is executed when the migration is rolled back. + return nil +} diff --git a/scanner/refresh_buffer.go b/scanner/refresh_buffer.go index b588c159b..0aca43f57 100644 --- a/scanner/refresh_buffer.go +++ b/scanner/refresh_buffer.go @@ -25,8 +25,12 @@ func newRefreshBuffer(ctx context.Context, ds model.DataStore) *refreshBuffer { } func (f *refreshBuffer) accumulate(mf model.MediaFile) { - f.album[mf.AlbumID] = struct{}{} - f.artist[mf.AlbumArtistID] = struct{}{} + if mf.AlbumID != "" { + f.album[mf.AlbumID] = struct{}{} + } + if mf.AlbumArtistID != "" { + f.artist[mf.AlbumArtistID] = struct{}{} + } } type refreshCallbackFunc = func(ids ...string) error diff --git a/scanner/scanner.go b/scanner/scanner.go index 6826152a0..f213ed288 100644 --- a/scanner/scanner.go +++ b/scanner/scanner.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/deluan/navidrome/core" "github.com/deluan/navidrome/log" "github.com/deluan/navidrome/model" ) @@ -32,12 +33,13 @@ type FolderScanner interface { } type scanner struct { - folders map[string]FolderScanner - status map[string]*scanStatus - lock *sync.RWMutex - ds model.DataStore - done chan bool - scan chan bool + folders map[string]FolderScanner + status map[string]*scanStatus + lock *sync.RWMutex + ds model.DataStore + cacheWarmer core.CacheWarmer + done chan bool + scan chan bool } type scanStatus struct { @@ -46,14 +48,15 @@ type scanStatus struct { lastUpdate time.Time } -func New(ds model.DataStore) Scanner { +func New(ds model.DataStore, cacheWarmer core.CacheWarmer) Scanner { s := &scanner{ - ds: ds, - folders: map[string]FolderScanner{}, - status: map[string]*scanStatus{}, - lock: &sync.RWMutex{}, - done: make(chan bool), - scan: make(chan bool), + ds: ds, + cacheWarmer: cacheWarmer, + folders: map[string]FolderScanner{}, + status: map[string]*scanStatus{}, + lock: &sync.RWMutex{}, + done: make(chan bool), + scan: make(chan bool), } s.loadFolders() return s @@ -213,5 +216,5 @@ func (s *scanner) loadFolders() { } func (s *scanner) newScanner(f model.MediaFolder) FolderScanner { - return NewTagScanner(f.Path, s.ds) + return NewTagScanner(f.Path, s.ds, s.cacheWarmer) } diff --git a/scanner/tag_scanner.go b/scanner/tag_scanner.go index b9369f9f9..312f52a3b 100644 --- a/scanner/tag_scanner.go +++ b/scanner/tag_scanner.go @@ -9,6 +9,7 @@ import ( "time" "github.com/deluan/navidrome/conf" + "github.com/deluan/navidrome/core" "github.com/deluan/navidrome/log" "github.com/deluan/navidrome/model" "github.com/deluan/navidrome/model/request" @@ -17,19 +18,21 @@ import ( ) type TagScanner struct { - rootFolder string - ds model.DataStore - mapper *mediaFileMapper - plsSync *playlistSync - cnt *counters + rootFolder string + ds model.DataStore + mapper *mediaFileMapper + plsSync *playlistSync + cnt *counters + cacheWarmer core.CacheWarmer } -func NewTagScanner(rootFolder string, ds model.DataStore) *TagScanner { +func NewTagScanner(rootFolder string, ds model.DataStore, cacheWarmer core.CacheWarmer) *TagScanner { return &TagScanner{ - rootFolder: rootFolder, - mapper: newMediaFileMapper(rootFolder), - plsSync: newPlaylistSync(ds), - ds: ds, + rootFolder: rootFolder, + mapper: newMediaFileMapper(rootFolder), + plsSync: newPlaylistSync(ds), + ds: ds, + cacheWarmer: cacheWarmer, } } @@ -62,6 +65,7 @@ const ( // Delete all empty albums, delete all empty artists, clean-up playlists func (s *TagScanner) Scan(ctx context.Context, lastModifiedSince time.Time) error { ctx = s.withAdminUser(ctx) + defer s.cacheWarmer.Flush(ctx) start := time.Now() allFSDirs, err := s.getDirTree(ctx) @@ -209,6 +213,7 @@ func (s *TagScanner) processDeletedDir(ctx context.Context, dir string) error { for _, t := range mfs { buffer.accumulate(t) + s.cacheWarmer.AddAlbum(ctx, t.AlbumID) } err = buffer.flush() @@ -285,6 +290,11 @@ func (s *TagScanner) processChangedDir(ctx context.Context, dir string) error { } } + // Pre cache all changed album artwork + for albumID := range buffer.album { + s.cacheWarmer.AddAlbum(ctx, albumID) + } + err = buffer.flush() log.Info(ctx, "Finished processing changed folder", "dir", dir, "updated", numUpdatedTracks, "purged", numPurgedTracks, "elapsed", time.Since(start))