From 46e7627fd33ba3d65b01e92f9321d5f8ef2ba724 Mon Sep 17 00:00:00 2001 From: Deluan Date: Tue, 8 Mar 2016 14:18:17 -0500 Subject: [PATCH] Optimized import, only updating changed data and purging old data --- .gitignore | 1 + README.md | 1 + domain/album.go | 1 + domain/artist.go | 1 + domain/mediafile.go | 1 + main.go | 2 +- persistence/album_repository.go | 18 +++++ persistence/artist_repository.go | 18 +++++ persistence/ledis_repository.go | 103 +++++++++++++++++++++++---- persistence/ledis_repository_test.go | 70 ++++++++++++++---- persistence/mediafile_repository.go | 20 +++++- scanner/importer.go | 59 +++++++++------ scanner/itunes_scanner.go | 13 ++-- 13 files changed, 253 insertions(+), 55 deletions(-) diff --git a/.gitignore b/.gitignore index 345167000..5c63597ad 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ devDb /tmp .vendor wiki +TODO.md \ No newline at end of file diff --git a/README.md b/README.md index 5a144a60b..330c3738b 100644 --- a/README.md +++ b/README.md @@ -68,6 +68,7 @@ $ go test ./... -v #### Search * https://github.com/sunfmin/redisgosearch +* http://patshaughnessy.net/2011/11/29/two-ways-of-using-redis-to-build-a-nosql-autocomplete-search-index #### Testing * http://goconvey.co/ diff --git a/domain/album.go b/domain/album.go index 12b43fc6a..0a43499ef 100644 --- a/domain/album.go +++ b/domain/album.go @@ -29,4 +29,5 @@ type AlbumRepository interface { Get(id string) (*Album, error) FindByArtist(artistId string) (Albums, error) GetAll(QueryOptions) (Albums, error) + PurgeInactive(active *Albums) error } diff --git a/domain/artist.go b/domain/artist.go index 8f9777aa3..e4a03d8c8 100644 --- a/domain/artist.go +++ b/domain/artist.go @@ -10,6 +10,7 @@ type ArtistRepository interface { Put(m *Artist) error Get(id string) (*Artist, error) GetByName(name string) (*Artist, error) + PurgeInactive(active *Artists) error } type Artists []Artist diff --git a/domain/mediafile.go b/domain/mediafile.go index 32442d18b..f7fa3bd59 100644 --- a/domain/mediafile.go +++ b/domain/mediafile.go @@ -48,4 +48,5 @@ type MediaFileRepository interface { Put(m *MediaFile) error Get(id string) (*MediaFile, error) FindByAlbum(albumId string) (MediaFiles, error) + PurgeInactive(active *MediaFiles) error } diff --git a/main.go b/main.go index acdf6e856..2ea828c5a 100644 --- a/main.go +++ b/main.go @@ -6,7 +6,7 @@ import ( ) func main() { - beego.BConfig.Log.FileLineNum = false + //beego.BConfig.Log.FileLineNum = false if beego.BConfig.RunMode == "dev" { beego.BConfig.WebConfig.DirectoryIndex = true beego.BConfig.WebConfig.StaticDir["/swagger"] = "swagger" diff --git a/persistence/album_repository.go b/persistence/album_repository.go index d4595a76c..172975581 100644 --- a/persistence/album_repository.go +++ b/persistence/album_repository.go @@ -2,6 +2,7 @@ package persistence import ( "errors" + "github.com/deluan/gosonic/domain" ) @@ -40,4 +41,21 @@ func (r *albumRepository) GetAll(options domain.QueryOptions) (domain.Albums, er return as, err } +func (r *albumRepository) PurgeInactive(active *domain.Albums) error { + currentIds, err := r.GetAllIds() + if err != nil { + return err + } + for _, a := range *active { + currentIds[a.Id] = false + } + inactiveIds := make(map[string]bool) + for id, inactive := range currentIds { + if inactive { + inactiveIds[id] = true + } + } + return r.DeleteAll(inactiveIds) +} + var _ domain.AlbumRepository = (*albumRepository)(nil) diff --git a/persistence/artist_repository.go b/persistence/artist_repository.go index 7f096bfc4..c1aba8b1e 100644 --- a/persistence/artist_repository.go +++ b/persistence/artist_repository.go @@ -2,6 +2,7 @@ package persistence import ( "errors" + "github.com/deluan/gosonic/domain" ) @@ -33,4 +34,21 @@ func (r *artistRepository) GetByName(name string) (*domain.Artist, error) { return r.Get(id) } +func (r *artistRepository) PurgeInactive(active *domain.Artists) error { + currentIds, err := r.GetAllIds() + if err != nil { + return err + } + for _, a := range *active { + currentIds[a.Id] = false + } + inactiveIds := make(map[string]bool) + for id, inactive := range currentIds { + if inactive { + inactiveIds[id] = true + } + } + return r.DeleteAll(inactiveIds) +} + var _ domain.ArtistRepository = (*artistRepository)(nil) diff --git a/persistence/ledis_repository.go b/persistence/ledis_repository.go index 867c98a6a..f49028147 100644 --- a/persistence/ledis_repository.go +++ b/persistence/ledis_repository.go @@ -4,17 +4,20 @@ import ( "crypto/md5" "encoding/json" "fmt" + "reflect" + "strings" + "github.com/deluan/gosonic/domain" "github.com/deluan/gosonic/utils" "github.com/siddontang/ledisdb/ledis" - "reflect" - "strings" ) type ledisRepository struct { - table string - entityType reflect.Type - fieldNames []string + table string + entityType reflect.Type + fieldNames []string + parentTable string + parentIdField string } func (r *ledisRepository) init(table string, entity interface{}) { @@ -28,6 +31,7 @@ func (r *ledisRepository) init(table string, entity interface{}) { r.fieldNames[i] = k i++ } + r.parentTable, r.parentIdField, _ = r.getParent(entity) } // TODO Use annotations to specify fields to be used @@ -41,6 +45,62 @@ func (r *ledisRepository) CountAll() (int64, error) { return size, err } +func (r *ledisRepository) GetAllIds() (map[string]bool, error) { + m := make(map[string]bool) + pairs, err := db().ZRange([]byte(r.table+"s:all"), 0, -1) + if err != nil { + return m, err + } + for _, p := range pairs { + m[string(p.Member)] = true + } + return m, err +} + +func (r *ledisRepository) DeleteAll(ids map[string]bool) error { + allKey := r.table + "s:all" + keys := make([][]byte, len(ids)) + + i := 0 + for id, _ := range ids { + // Delete from parent:parentId:table (ZSet) + if r.parentTable != "" { + parentKey := []byte(fmt.Sprintf("%s:%s:%s", r.table, id, r.parentIdField)) + pid, err := db().Get(parentKey) + var parentId string + if err := json.Unmarshal(pid, &parentId); err != nil { + return err + } + if err != nil { + return err + } + parentKey = []byte(fmt.Sprintf("%s:%s:%ss", r.parentTable, parentId, r.table)) + if _, err := db().ZRem(parentKey, []byte(id)); err != nil { + return err + } + } + + // Delete record table:id:* (KV) + if err := r.deleteRecord(id); err != nil { + return err + } + keys[i] = []byte(id) + + i++ + } + + // Delete from table:all (ZSet) + _, err := db().ZRem([]byte(allKey), keys...) + + return err +} + +func (r *ledisRepository) deleteRecord(id string) error { + keys := r.getFieldKeys(id) + _, err := db().Del(keys...) + return err +} + func (r *ledisRepository) Exists(id string) (bool, error) { res, _ := db().ZScore([]byte(r.table+"s:all"), []byte(id)) return res != ledis.InvalidScore, nil @@ -68,25 +128,42 @@ func (r *ledisRepository) saveOrUpdate(id string, entity interface{}) error { return err } - if parentTable, parentId := r.getParent(entity); parentTable != "" { - parentCollectionKey := fmt.Sprintf("%s:%s:%ss", parentTable, parentId, r.table) + if parentCollectionKey := r.getParentRelationKey(entity); parentCollectionKey != "" { _, err = db().ZAdd([]byte(parentCollectionKey), sid) } return nil } +func (r *ledisRepository) getParentRelationKey(entity interface{}) string { + parentId := r.getParentId(entity) + if parentId != "" { + return fmt.Sprintf("%s:%s:%ss", r.parentTable, parentId, r.table) + } + return "" +} + // TODO Optimize -func (r *ledisRepository) getParent(entity interface{}) (table string, id string) { +func (r *ledisRepository) getParent(entity interface{}) (table string, idField string, id string) { dt := reflect.TypeOf(entity).Elem() for i := 0; i < dt.NumField(); i++ { f := dt.Field(i) - table := f.Tag.Get("parent") + table = f.Tag.Get("parent") if table != "" { + idField = f.Name dv := reflect.ValueOf(entity).Elem() - return table, dv.FieldByName(f.Name).String() + id = dv.FieldByName(f.Name).String() + return } } - return "", "" + return +} + +func (r *ledisRepository) getParentId(entity interface{}) string { + if r.parentTable != "" { + dv := reflect.ValueOf(entity).Elem() + return dv.FieldByName(r.parentIdField).String() + } + return "" } func (r *ledisRepository) getFieldKeys(id string) [][]byte { @@ -135,9 +212,9 @@ func (r *ledisRepository) loadAll(entities interface{}, qo ...domain.QueryOption return r.loadFromSet(setName, entities, qo...) } -func (r *ledisRepository) loadChildren(parentTable string, parentId string, entities interface{}, qo ...domain.QueryOptions) error { +func (r *ledisRepository) loadChildren(parentTable string, parentId string, emptyEntityArray interface{}, qo ...domain.QueryOptions) error { setName := fmt.Sprintf("%s:%s:%ss", parentTable, parentId, r.table) - return r.loadFromSet(setName, entities, qo...) + return r.loadFromSet(setName, emptyEntityArray, qo...) } // TODO Optimize it! Probably very slow (and confusing!) diff --git a/persistence/ledis_repository_test.go b/persistence/ledis_repository_test.go index a617cf5dd..0598c4aeb 100644 --- a/persistence/ledis_repository_test.go +++ b/persistence/ledis_repository_test.go @@ -2,15 +2,17 @@ package persistence import ( "fmt" - "github.com/deluan/gosonic/tests" - . "github.com/smartystreets/goconvey/convey" "strconv" "testing" + + "github.com/deluan/gosonic/tests" + . "github.com/smartystreets/goconvey/convey" ) type TestEntity struct { - Id string - Name string + Id string + Name string + ParentId string `parent:"parent"` } func shouldBeEqual(actualStruct interface{}, expectedStruct ...interface{}) string { @@ -65,19 +67,25 @@ func TestBaseRepository(t *testing.T) { Convey("Given an empty DB", func() { repo := createRepo() - Convey("When I save a new entity", func() { - entity := &TestEntity{"123", "My Name"} + Convey("When I save a new entity and a parent", func() { + entity := &TestEntity{"123", "My Name", "ABC"} err := repo.saveOrUpdate("123", entity) - - Convey("Then the method shouldn't return any errors", func() { + Convey("Then saving the entity shouldn't return any errors", func() { So(err, ShouldBeNil) }) - Convey("Then the number of entities should be 1", func() { + Convey("And the number of entities should be 1", func() { count, _ := repo.CountAll() So(count, ShouldEqual, 1) }) + Convey("And the number of children should be 1", func() { + children := make([]TestEntity, 0) + err := repo.loadChildren("parent", "ABC", &children) + So(err, ShouldBeNil) + So(len(children), ShouldEqual, 1) + }) + Convey("And this entity should be equal to the the saved one", func() { actualEntity, _ := repo.readEntity("123") So(actualEntity, shouldBeEqual, entity) @@ -89,11 +97,11 @@ func TestBaseRepository(t *testing.T) { Convey("Given a table with one entity", func() { repo := createRepo() - entity := &TestEntity{"111", "One Name"} + entity := &TestEntity{"111", "One Name", "AAA"} repo.saveOrUpdate(entity.Id, entity) Convey("When I save an entity with a different Id", func() { - newEntity := &TestEntity{"222", "Another Name"} + newEntity := &TestEntity{"222", "Another Name", "AAA"} repo.saveOrUpdate(newEntity.Id, newEntity) Convey("Then the number of entities should be 2", func() { @@ -104,7 +112,7 @@ func TestBaseRepository(t *testing.T) { }) Convey("When I save an entity with the same Id", func() { - newEntity := &TestEntity{"111", "New Name"} + newEntity := &TestEntity{"111", "New Name", "AAA"} repo.saveOrUpdate(newEntity.Id, newEntity) Convey("Then the number of entities should be 1", func() { @@ -125,7 +133,7 @@ func TestBaseRepository(t *testing.T) { Convey("Given a table with 3 entities", func() { repo := createRepo() for i := 1; i <= 3; i++ { - e := &TestEntity{strconv.Itoa(i), fmt.Sprintf("Name %d", i)} + e := &TestEntity{strconv.Itoa(i), fmt.Sprintf("Name %d", i), "AAA"} repo.saveOrUpdate(e.Id, e) } @@ -142,9 +150,45 @@ func TestBaseRepository(t *testing.T) { for _, e := range es { So(e.Id, ShouldBeIn, []string{"1", "2", "3"}) So(e.Name, ShouldBeIn, []string{"Name 1", "Name 2", "Name 3"}) + So(e.ParentId, ShouldEqual, "AAA") } }) }) + Convey("When I call GetAllIds", func() { + ids, err := repo.GetAllIds() + Convey("Then It should not return any error", func() { + So(err, ShouldBeNil) + }) + Convey("And I get all saved ids", func() { + So(len(ids), ShouldEqual, 3) + for k, _ := range ids { + So(k, ShouldBeIn, []string{"1", "2", "3"}) + } + }) + }) + + Convey("When I call DeletaAll with one of the entities", func() { + ids := make(map[string]bool) + ids["1"] = true + err := repo.DeleteAll(ids) + Convey("Then It should not return any error", func() { + So(err, ShouldBeNil) + }) + Convey("Then CountAll should return 2", func() { + count, _ := repo.CountAll() + So(count, ShouldEqual, 2) + }) + Convey("And the deleted record shouldn't be among the children", func() { + children := make([]TestEntity, 0) + err := repo.loadChildren("parent", "AAA", &children) + So(err, ShouldBeNil) + So(len(children), ShouldEqual, 2) + for _, e := range children { + So(e.Id, ShouldNotEqual, "1") + } + }) + + }) }) Reset(func() { diff --git a/persistence/mediafile_repository.go b/persistence/mediafile_repository.go index 12a7973ef..4f609a650 100644 --- a/persistence/mediafile_repository.go +++ b/persistence/mediafile_repository.go @@ -2,8 +2,9 @@ package persistence import ( "errors" - "github.com/deluan/gosonic/domain" "sort" + + "github.com/deluan/gosonic/domain" ) type mediaFileRepository struct { @@ -42,4 +43,21 @@ func (r *mediaFileRepository) FindByAlbum(albumId string) (domain.MediaFiles, er return mfs, err } +func (r *mediaFileRepository) PurgeInactive(active *domain.MediaFiles) error { + currentIds, err := r.GetAllIds() + if err != nil { + return err + } + for _, a := range *active { + currentIds[a.Id] = false + } + inactiveIds := make(map[string]bool) + for id, inactive := range currentIds { + if inactive { + inactiveIds[id] = true + } + } + return r.DeleteAll(inactiveIds) +} + var _ domain.MediaFileRepository = (*mediaFileRepository)(nil) diff --git a/scanner/importer.go b/scanner/importer.go index 99067d77f..edd8f2b01 100644 --- a/scanner/importer.go +++ b/scanner/importer.go @@ -2,14 +2,15 @@ package scanner import ( "fmt" + "strconv" + "strings" + "time" + "github.com/astaxie/beego" "github.com/deluan/gosonic/consts" "github.com/deluan/gosonic/domain" "github.com/deluan/gosonic/persistence" "github.com/deluan/gosonic/utils" - "strconv" - "strings" - "time" ) type Scanner interface { @@ -45,14 +46,16 @@ type Importer struct { artistRepo domain.ArtistRepository idxRepo domain.ArtistIndexRepository propertyRepo domain.PropertyRepository + lastScan time.Time } func (i *Importer) Run() { - if total, err := i.scanner.ScanLibrary(i.lastModifiedSince(), i.mediaFolder); err != nil { + i.lastScan = i.lastModifiedSince() + if total, err := i.scanner.ScanLibrary(i.lastScan, i.mediaFolder); err != nil { beego.Error("Error importing iTunes Library:", err) return } else { - beego.Info("Found", total, "tracks,", + beego.Debug("Found", total, "tracks,", len(i.scanner.MediaFiles()), "songs,", len(i.scanner.Albums()), "albums,", len(i.scanner.Artists()), "artists") @@ -76,20 +79,39 @@ func (i *Importer) lastModifiedSince() time.Time { func (i *Importer) importLibrary() (err error) { indexGroups := utils.ParseIndexGroups(beego.AppConfig.String("indexGroups")) artistIndex := make(map[string]tempIndex) + mfs := make(domain.MediaFiles, len(i.scanner.MediaFiles())) + als := make(domain.Albums, len(i.scanner.Albums())) + ars := make(domain.Artists, len(i.scanner.Artists())) + beego.Debug("Saving updated data") + j := 0 for _, mf := range i.scanner.MediaFiles() { + mfs[j] = *mf + j++ + if mf.UpdatedAt.Before(i.lastScan) { + continue + } if err := i.mfRepo.Put(mf); err != nil { beego.Error(err) } } + j = 0 for _, al := range i.scanner.Albums() { + als[j] = *al + j++ + if al.UpdatedAt.Before(i.lastScan) { + continue + } if err := i.albumRepo.Put(al); err != nil { beego.Error(err) } } + j = 0 for _, ar := range i.scanner.Artists() { + ars[j] = *ar + j++ if err := i.artistRepo.Put(ar); err != nil { beego.Error(err) } @@ -100,6 +122,17 @@ func (i *Importer) importLibrary() (err error) { beego.Error(err) } + beego.Debug("Purging old data") + if err := i.mfRepo.PurgeInactive(&mfs); err != nil { + beego.Error(err) + } + if err := i.albumRepo.PurgeInactive(&als); err != nil { + beego.Error(err) + } + if err := i.artistRepo.PurgeInactive(&ars); err != nil { + beego.Error(err) + } + c, _ := i.artistRepo.CountAll() beego.Info("Total Artists in database:", c) c, _ = i.albumRepo.CountAll() @@ -116,22 +149,6 @@ func (i *Importer) importLibrary() (err error) { return err } -func (i *Importer) persist(mf *domain.MediaFile, album *domain.Album, artist *domain.Artist) { - if err := i.artistRepo.Put(artist); err != nil { - beego.Error(err) - } - - album.ArtistId = artist.Id - if err := i.albumRepo.Put(album); err != nil { - beego.Error(err) - } - - mf.AlbumId = album.Id - if err := i.mfRepo.Put(mf); err != nil { - beego.Error(err) - } -} - func (i *Importer) collectIndex(ig utils.IndexGroups, a *domain.Artist, artistIndex map[string]tempIndex) { name := a.Name indexName := strings.ToLower(utils.NoArticle(name)) diff --git a/scanner/itunes_scanner.go b/scanner/itunes_scanner.go index 6f2de7bbc..3b8590772 100644 --- a/scanner/itunes_scanner.go +++ b/scanner/itunes_scanner.go @@ -3,16 +3,17 @@ package scanner import ( "crypto/md5" "fmt" - "github.com/astaxie/beego" - "github.com/deluan/gosonic/domain" - "github.com/deluan/itl" - "github.com/dhowden/tag" "net/url" "os" "path/filepath" "strconv" "strings" "time" + + "github.com/astaxie/beego" + "github.com/deluan/gosonic/domain" + "github.com/deluan/itl" + "github.com/dhowden/tag" ) type ItunesScanner struct { @@ -30,7 +31,7 @@ func (s *ItunesScanner) ScanLibrary(lastModifiedSince time.Time, path string) (i if err != nil { return 0, err } - beego.Info("Loaded", len(l.Tracks), "tracks") + beego.Debug("Loaded", len(l.Tracks), "tracks") s.lastModifiedSince = lastModifiedSince s.mediaFiles = make(map[string]*domain.MediaFile) @@ -46,7 +47,7 @@ func (s *ItunesScanner) ScanLibrary(lastModifiedSince time.Time, path string) (i } i++ if i%1000 == 0 { - beego.Info("Processed", i, "tracks.", len(s.artists), "artists,", len(s.albums), "albums", len(s.mediaFiles), "songs") + beego.Debug("Processed", i, "tracks.", len(s.artists), "artists,", len(s.albums), "albums", len(s.mediaFiles), "songs") } } return len(l.Tracks), nil