Optimized import, only updating changed data and purging old data

This commit is contained in:
Deluan 2016-03-08 14:18:17 -05:00
parent df9687bf02
commit 46e7627fd3
13 changed files with 253 additions and 55 deletions

1
.gitignore vendored
View file

@ -5,3 +5,4 @@ devDb
/tmp /tmp
.vendor .vendor
wiki wiki
TODO.md

View file

@ -68,6 +68,7 @@ $ go test ./... -v
#### Search #### Search
* https://github.com/sunfmin/redisgosearch * https://github.com/sunfmin/redisgosearch
* http://patshaughnessy.net/2011/11/29/two-ways-of-using-redis-to-build-a-nosql-autocomplete-search-index
#### Testing #### Testing
* http://goconvey.co/ * http://goconvey.co/

View file

@ -29,4 +29,5 @@ type AlbumRepository interface {
Get(id string) (*Album, error) Get(id string) (*Album, error)
FindByArtist(artistId string) (Albums, error) FindByArtist(artistId string) (Albums, error)
GetAll(QueryOptions) (Albums, error) GetAll(QueryOptions) (Albums, error)
PurgeInactive(active *Albums) error
} }

View file

@ -10,6 +10,7 @@ type ArtistRepository interface {
Put(m *Artist) error Put(m *Artist) error
Get(id string) (*Artist, error) Get(id string) (*Artist, error)
GetByName(name string) (*Artist, error) GetByName(name string) (*Artist, error)
PurgeInactive(active *Artists) error
} }
type Artists []Artist type Artists []Artist

View file

@ -48,4 +48,5 @@ type MediaFileRepository interface {
Put(m *MediaFile) error Put(m *MediaFile) error
Get(id string) (*MediaFile, error) Get(id string) (*MediaFile, error)
FindByAlbum(albumId string) (MediaFiles, error) FindByAlbum(albumId string) (MediaFiles, error)
PurgeInactive(active *MediaFiles) error
} }

View file

@ -6,7 +6,7 @@ import (
) )
func main() { func main() {
beego.BConfig.Log.FileLineNum = false //beego.BConfig.Log.FileLineNum = false
if beego.BConfig.RunMode == "dev" { if beego.BConfig.RunMode == "dev" {
beego.BConfig.WebConfig.DirectoryIndex = true beego.BConfig.WebConfig.DirectoryIndex = true
beego.BConfig.WebConfig.StaticDir["/swagger"] = "swagger" beego.BConfig.WebConfig.StaticDir["/swagger"] = "swagger"

View file

@ -2,6 +2,7 @@ package persistence
import ( import (
"errors" "errors"
"github.com/deluan/gosonic/domain" "github.com/deluan/gosonic/domain"
) )
@ -40,4 +41,21 @@ func (r *albumRepository) GetAll(options domain.QueryOptions) (domain.Albums, er
return as, err return as, err
} }
// PurgeInactive deletes every stored album whose id does not appear in active.
// It returns the error from listing the stored ids, or from the deletion.
func (r *albumRepository) PurgeInactive(active *domain.Albums) error {
	stored, err := r.GetAllIds()
	if err != nil {
		return err
	}

	// Index the ids that must survive the purge.
	keep := make(map[string]struct{}, len(*active))
	for i := range *active {
		keep[(*active)[i].Id] = struct{}{}
	}

	// Anything flagged as stored but not in the keep set is stale.
	stale := make(map[string]bool)
	for id, present := range stored {
		if _, isActive := keep[id]; present && !isActive {
			stale[id] = true
		}
	}
	return r.DeleteAll(stale)
}
var _ domain.AlbumRepository = (*albumRepository)(nil) var _ domain.AlbumRepository = (*albumRepository)(nil)

View file

@ -2,6 +2,7 @@ package persistence
import ( import (
"errors" "errors"
"github.com/deluan/gosonic/domain" "github.com/deluan/gosonic/domain"
) )
@ -33,4 +34,21 @@ func (r *artistRepository) GetByName(name string) (*domain.Artist, error) {
return r.Get(id) return r.Get(id)
} }
// PurgeInactive deletes every stored artist whose id does not appear in active.
// It returns the error from listing the stored ids, or from the deletion.
func (r *artistRepository) PurgeInactive(active *domain.Artists) error {
	stored, err := r.GetAllIds()
	if err != nil {
		return err
	}

	// Index the ids that must survive the purge.
	keep := make(map[string]struct{}, len(*active))
	for i := range *active {
		keep[(*active)[i].Id] = struct{}{}
	}

	// Anything flagged as stored but not in the keep set is stale.
	stale := make(map[string]bool)
	for id, present := range stored {
		if _, isActive := keep[id]; present && !isActive {
			stale[id] = true
		}
	}
	return r.DeleteAll(stale)
}
var _ domain.ArtistRepository = (*artistRepository)(nil) var _ domain.ArtistRepository = (*artistRepository)(nil)

View file

@ -4,17 +4,20 @@ import (
"crypto/md5" "crypto/md5"
"encoding/json" "encoding/json"
"fmt" "fmt"
"reflect"
"strings"
"github.com/deluan/gosonic/domain" "github.com/deluan/gosonic/domain"
"github.com/deluan/gosonic/utils" "github.com/deluan/gosonic/utils"
"github.com/siddontang/ledisdb/ledis" "github.com/siddontang/ledisdb/ledis"
"reflect"
"strings"
) )
type ledisRepository struct { type ledisRepository struct {
table string table string
entityType reflect.Type entityType reflect.Type
fieldNames []string fieldNames []string
parentTable string
parentIdField string
} }
func (r *ledisRepository) init(table string, entity interface{}) { func (r *ledisRepository) init(table string, entity interface{}) {
@ -28,6 +31,7 @@ func (r *ledisRepository) init(table string, entity interface{}) {
r.fieldNames[i] = k r.fieldNames[i] = k
i++ i++
} }
r.parentTable, r.parentIdField, _ = r.getParent(entity)
} }
// TODO Use annotations to specify fields to be used // TODO Use annotations to specify fields to be used
@ -41,6 +45,62 @@ func (r *ledisRepository) CountAll() (int64, error) {
return size, err return size, err
} }
// GetAllIds returns the members of the "<table>s:all" sorted set as a map
// keyed by id, with every value set to true. If reading the set fails, an
// empty map is returned together with the error.
func (r *ledisRepository) GetAllIds() (map[string]bool, error) {
	allKey := []byte(r.table + "s:all")

	// Range 0..-1 asks for the whole sorted set.
	members, err := db().ZRange(allKey, 0, -1)
	if err != nil {
		return map[string]bool{}, err
	}

	ids := make(map[string]bool, len(members))
	for i := range members {
		ids[string(members[i].Member)] = true
	}
	return ids, nil
}
// DeleteAll removes every record whose id is a key of ids; the boolean values
// of the map are ignored.
//
// For each id it:
//  1. removes the id from the parent's children sorted set
//     ("<parentTable>:<parentId>:<table>s"), when the repository has a parent
//     table. The parent id is read from the record's own parent-id field key
//     ("<table>:<id>:<parentIdField>"), whose value is JSON-decoded;
//  2. deletes the record's field keys via deleteRecord.
//
// After all ids are processed they are removed from the "<table>s:all" sorted
// set in a single call.
//
// Processing stops at the first error. Records handled before the failure
// have already been deleted, but no id has been removed from "<table>s:all"
// at that point.
func (r *ledisRepository) DeleteAll(ids map[string]bool) error {
	// Nothing to delete: avoid issuing a ZRem with no members.
	if len(ids) == 0 {
		return nil
	}

	keys := make([][]byte, 0, len(ids))
	for id := range ids {
		// Delete from parent:parentId:table (ZSet)
		if r.parentTable != "" {
			fieldKey := []byte(fmt.Sprintf("%s:%s:%s", r.table, id, r.parentIdField))
			pid, err := db().Get(fieldKey)
			// Check the read error before decoding; otherwise a failed Get
			// would surface as a JSON error from decoding the missing value.
			if err != nil {
				return err
			}
			var parentId string
			if err := json.Unmarshal(pid, &parentId); err != nil {
				return err
			}
			parentKey := []byte(fmt.Sprintf("%s:%s:%ss", r.parentTable, parentId, r.table))
			if _, err := db().ZRem(parentKey, []byte(id)); err != nil {
				return err
			}
		}

		// Delete record table:id:* (KV)
		if err := r.deleteRecord(id); err != nil {
			return err
		}
		keys = append(keys, []byte(id))
	}

	// Delete from table:all (ZSet)
	_, err := db().ZRem([]byte(r.table+"s:all"), keys...)
	return err
}
// deleteRecord deletes all field keys of the record identified by id, as
// listed by getFieldKeys, and returns the error from that deletion, if any.
func (r *ledisRepository) deleteRecord(id string) error {
	if _, err := db().Del(r.getFieldKeys(id)...); err != nil {
		return err
	}
	return nil
}
func (r *ledisRepository) Exists(id string) (bool, error) { func (r *ledisRepository) Exists(id string) (bool, error) {
res, _ := db().ZScore([]byte(r.table+"s:all"), []byte(id)) res, _ := db().ZScore([]byte(r.table+"s:all"), []byte(id))
return res != ledis.InvalidScore, nil return res != ledis.InvalidScore, nil
@ -68,25 +128,42 @@ func (r *ledisRepository) saveOrUpdate(id string, entity interface{}) error {
return err return err
} }
if parentTable, parentId := r.getParent(entity); parentTable != "" { if parentCollectionKey := r.getParentRelationKey(entity); parentCollectionKey != "" {
parentCollectionKey := fmt.Sprintf("%s:%s:%ss", parentTable, parentId, r.table)
_, err = db().ZAdd([]byte(parentCollectionKey), sid) _, err = db().ZAdd([]byte(parentCollectionKey), sid)
} }
return nil return nil
} }
// getParentRelationKey builds the key of the parent's children set for entity,
// in the form "<parentTable>:<parentId>:<table>s". It returns "" when
// getParentId yields an empty parent id.
func (r *ledisRepository) getParentRelationKey(entity interface{}) string {
	pid := r.getParentId(entity)
	if pid == "" {
		return ""
	}
	return fmt.Sprintf("%s:%s:%ss", r.parentTable, pid, r.table)
}
// TODO Optimize // TODO Optimize
func (r *ledisRepository) getParent(entity interface{}) (table string, id string) { func (r *ledisRepository) getParent(entity interface{}) (table string, idField string, id string) {
dt := reflect.TypeOf(entity).Elem() dt := reflect.TypeOf(entity).Elem()
for i := 0; i < dt.NumField(); i++ { for i := 0; i < dt.NumField(); i++ {
f := dt.Field(i) f := dt.Field(i)
table := f.Tag.Get("parent") table = f.Tag.Get("parent")
if table != "" { if table != "" {
idField = f.Name
dv := reflect.ValueOf(entity).Elem() dv := reflect.ValueOf(entity).Elem()
return table, dv.FieldByName(f.Name).String() id = dv.FieldByName(f.Name).String()
return
} }
} }
return "", "" return
}
func (r *ledisRepository) getParentId(entity interface{}) string {
if r.parentTable != "" {
dv := reflect.ValueOf(entity).Elem()
return dv.FieldByName(r.parentIdField).String()
}
return ""
} }
func (r *ledisRepository) getFieldKeys(id string) [][]byte { func (r *ledisRepository) getFieldKeys(id string) [][]byte {
@ -135,9 +212,9 @@ func (r *ledisRepository) loadAll(entities interface{}, qo ...domain.QueryOption
return r.loadFromSet(setName, entities, qo...) return r.loadFromSet(setName, entities, qo...)
} }
func (r *ledisRepository) loadChildren(parentTable string, parentId string, entities interface{}, qo ...domain.QueryOptions) error { func (r *ledisRepository) loadChildren(parentTable string, parentId string, emptyEntityArray interface{}, qo ...domain.QueryOptions) error {
setName := fmt.Sprintf("%s:%s:%ss", parentTable, parentId, r.table) setName := fmt.Sprintf("%s:%s:%ss", parentTable, parentId, r.table)
return r.loadFromSet(setName, entities, qo...) return r.loadFromSet(setName, emptyEntityArray, qo...)
} }
// TODO Optimize it! Probably very slow (and confusing!) // TODO Optimize it! Probably very slow (and confusing!)

View file

@ -2,15 +2,17 @@ package persistence
import ( import (
"fmt" "fmt"
"github.com/deluan/gosonic/tests"
. "github.com/smartystreets/goconvey/convey"
"strconv" "strconv"
"testing" "testing"
"github.com/deluan/gosonic/tests"
. "github.com/smartystreets/goconvey/convey"
) )
type TestEntity struct { type TestEntity struct {
Id string Id string
Name string Name string
ParentId string `parent:"parent"`
} }
func shouldBeEqual(actualStruct interface{}, expectedStruct ...interface{}) string { func shouldBeEqual(actualStruct interface{}, expectedStruct ...interface{}) string {
@ -65,19 +67,25 @@ func TestBaseRepository(t *testing.T) {
Convey("Given an empty DB", func() { Convey("Given an empty DB", func() {
repo := createRepo() repo := createRepo()
Convey("When I save a new entity", func() { Convey("When I save a new entity and a parent", func() {
entity := &TestEntity{"123", "My Name"} entity := &TestEntity{"123", "My Name", "ABC"}
err := repo.saveOrUpdate("123", entity) err := repo.saveOrUpdate("123", entity)
Convey("Then saving the entity shouldn't return any errors", func() {
Convey("Then the method shouldn't return any errors", func() {
So(err, ShouldBeNil) So(err, ShouldBeNil)
}) })
Convey("Then the number of entities should be 1", func() { Convey("And the number of entities should be 1", func() {
count, _ := repo.CountAll() count, _ := repo.CountAll()
So(count, ShouldEqual, 1) So(count, ShouldEqual, 1)
}) })
Convey("And the number of children should be 1", func() {
children := make([]TestEntity, 0)
err := repo.loadChildren("parent", "ABC", &children)
So(err, ShouldBeNil)
So(len(children), ShouldEqual, 1)
})
Convey("And this entity should be equal to the the saved one", func() { Convey("And this entity should be equal to the the saved one", func() {
actualEntity, _ := repo.readEntity("123") actualEntity, _ := repo.readEntity("123")
So(actualEntity, shouldBeEqual, entity) So(actualEntity, shouldBeEqual, entity)
@ -89,11 +97,11 @@ func TestBaseRepository(t *testing.T) {
Convey("Given a table with one entity", func() { Convey("Given a table with one entity", func() {
repo := createRepo() repo := createRepo()
entity := &TestEntity{"111", "One Name"} entity := &TestEntity{"111", "One Name", "AAA"}
repo.saveOrUpdate(entity.Id, entity) repo.saveOrUpdate(entity.Id, entity)
Convey("When I save an entity with a different Id", func() { Convey("When I save an entity with a different Id", func() {
newEntity := &TestEntity{"222", "Another Name"} newEntity := &TestEntity{"222", "Another Name", "AAA"}
repo.saveOrUpdate(newEntity.Id, newEntity) repo.saveOrUpdate(newEntity.Id, newEntity)
Convey("Then the number of entities should be 2", func() { Convey("Then the number of entities should be 2", func() {
@ -104,7 +112,7 @@ func TestBaseRepository(t *testing.T) {
}) })
Convey("When I save an entity with the same Id", func() { Convey("When I save an entity with the same Id", func() {
newEntity := &TestEntity{"111", "New Name"} newEntity := &TestEntity{"111", "New Name", "AAA"}
repo.saveOrUpdate(newEntity.Id, newEntity) repo.saveOrUpdate(newEntity.Id, newEntity)
Convey("Then the number of entities should be 1", func() { Convey("Then the number of entities should be 1", func() {
@ -125,7 +133,7 @@ func TestBaseRepository(t *testing.T) {
Convey("Given a table with 3 entities", func() { Convey("Given a table with 3 entities", func() {
repo := createRepo() repo := createRepo()
for i := 1; i <= 3; i++ { for i := 1; i <= 3; i++ {
e := &TestEntity{strconv.Itoa(i), fmt.Sprintf("Name %d", i)} e := &TestEntity{strconv.Itoa(i), fmt.Sprintf("Name %d", i), "AAA"}
repo.saveOrUpdate(e.Id, e) repo.saveOrUpdate(e.Id, e)
} }
@ -142,9 +150,45 @@ func TestBaseRepository(t *testing.T) {
for _, e := range es { for _, e := range es {
So(e.Id, ShouldBeIn, []string{"1", "2", "3"}) So(e.Id, ShouldBeIn, []string{"1", "2", "3"})
So(e.Name, ShouldBeIn, []string{"Name 1", "Name 2", "Name 3"}) So(e.Name, ShouldBeIn, []string{"Name 1", "Name 2", "Name 3"})
So(e.ParentId, ShouldEqual, "AAA")
} }
}) })
}) })
Convey("When I call GetAllIds", func() {
ids, err := repo.GetAllIds()
Convey("Then It should not return any error", func() {
So(err, ShouldBeNil)
})
Convey("And I get all saved ids", func() {
So(len(ids), ShouldEqual, 3)
for k, _ := range ids {
So(k, ShouldBeIn, []string{"1", "2", "3"})
}
})
})
Convey("When I call DeletaAll with one of the entities", func() {
ids := make(map[string]bool)
ids["1"] = true
err := repo.DeleteAll(ids)
Convey("Then It should not return any error", func() {
So(err, ShouldBeNil)
})
Convey("Then CountAll should return 2", func() {
count, _ := repo.CountAll()
So(count, ShouldEqual, 2)
})
Convey("And the deleted record shouldn't be among the children", func() {
children := make([]TestEntity, 0)
err := repo.loadChildren("parent", "AAA", &children)
So(err, ShouldBeNil)
So(len(children), ShouldEqual, 2)
for _, e := range children {
So(e.Id, ShouldNotEqual, "1")
}
})
})
}) })
Reset(func() { Reset(func() {

View file

@ -2,8 +2,9 @@ package persistence
import ( import (
"errors" "errors"
"github.com/deluan/gosonic/domain"
"sort" "sort"
"github.com/deluan/gosonic/domain"
) )
type mediaFileRepository struct { type mediaFileRepository struct {
@ -42,4 +43,21 @@ func (r *mediaFileRepository) FindByAlbum(albumId string) (domain.MediaFiles, er
return mfs, err return mfs, err
} }
// PurgeInactive deletes every stored media file whose id does not appear in
// active. It returns the error from listing the stored ids, or from the
// deletion.
func (r *mediaFileRepository) PurgeInactive(active *domain.MediaFiles) error {
	stored, err := r.GetAllIds()
	if err != nil {
		return err
	}

	// Index the ids that must survive the purge.
	keep := make(map[string]struct{}, len(*active))
	for i := range *active {
		keep[(*active)[i].Id] = struct{}{}
	}

	// Anything flagged as stored but not in the keep set is stale.
	stale := make(map[string]bool)
	for id, present := range stored {
		if _, isActive := keep[id]; present && !isActive {
			stale[id] = true
		}
	}
	return r.DeleteAll(stale)
}
var _ domain.MediaFileRepository = (*mediaFileRepository)(nil) var _ domain.MediaFileRepository = (*mediaFileRepository)(nil)

View file

@ -2,14 +2,15 @@ package scanner
import ( import (
"fmt" "fmt"
"strconv"
"strings"
"time"
"github.com/astaxie/beego" "github.com/astaxie/beego"
"github.com/deluan/gosonic/consts" "github.com/deluan/gosonic/consts"
"github.com/deluan/gosonic/domain" "github.com/deluan/gosonic/domain"
"github.com/deluan/gosonic/persistence" "github.com/deluan/gosonic/persistence"
"github.com/deluan/gosonic/utils" "github.com/deluan/gosonic/utils"
"strconv"
"strings"
"time"
) )
type Scanner interface { type Scanner interface {
@ -45,14 +46,16 @@ type Importer struct {
artistRepo domain.ArtistRepository artistRepo domain.ArtistRepository
idxRepo domain.ArtistIndexRepository idxRepo domain.ArtistIndexRepository
propertyRepo domain.PropertyRepository propertyRepo domain.PropertyRepository
lastScan time.Time
} }
func (i *Importer) Run() { func (i *Importer) Run() {
if total, err := i.scanner.ScanLibrary(i.lastModifiedSince(), i.mediaFolder); err != nil { i.lastScan = i.lastModifiedSince()
if total, err := i.scanner.ScanLibrary(i.lastScan, i.mediaFolder); err != nil {
beego.Error("Error importing iTunes Library:", err) beego.Error("Error importing iTunes Library:", err)
return return
} else { } else {
beego.Info("Found", total, "tracks,", beego.Debug("Found", total, "tracks,",
len(i.scanner.MediaFiles()), "songs,", len(i.scanner.MediaFiles()), "songs,",
len(i.scanner.Albums()), "albums,", len(i.scanner.Albums()), "albums,",
len(i.scanner.Artists()), "artists") len(i.scanner.Artists()), "artists")
@ -76,20 +79,39 @@ func (i *Importer) lastModifiedSince() time.Time {
func (i *Importer) importLibrary() (err error) { func (i *Importer) importLibrary() (err error) {
indexGroups := utils.ParseIndexGroups(beego.AppConfig.String("indexGroups")) indexGroups := utils.ParseIndexGroups(beego.AppConfig.String("indexGroups"))
artistIndex := make(map[string]tempIndex) artistIndex := make(map[string]tempIndex)
mfs := make(domain.MediaFiles, len(i.scanner.MediaFiles()))
als := make(domain.Albums, len(i.scanner.Albums()))
ars := make(domain.Artists, len(i.scanner.Artists()))
beego.Debug("Saving updated data")
j := 0
for _, mf := range i.scanner.MediaFiles() { for _, mf := range i.scanner.MediaFiles() {
mfs[j] = *mf
j++
if mf.UpdatedAt.Before(i.lastScan) {
continue
}
if err := i.mfRepo.Put(mf); err != nil { if err := i.mfRepo.Put(mf); err != nil {
beego.Error(err) beego.Error(err)
} }
} }
j = 0
for _, al := range i.scanner.Albums() { for _, al := range i.scanner.Albums() {
als[j] = *al
j++
if al.UpdatedAt.Before(i.lastScan) {
continue
}
if err := i.albumRepo.Put(al); err != nil { if err := i.albumRepo.Put(al); err != nil {
beego.Error(err) beego.Error(err)
} }
} }
j = 0
for _, ar := range i.scanner.Artists() { for _, ar := range i.scanner.Artists() {
ars[j] = *ar
j++
if err := i.artistRepo.Put(ar); err != nil { if err := i.artistRepo.Put(ar); err != nil {
beego.Error(err) beego.Error(err)
} }
@ -100,6 +122,17 @@ func (i *Importer) importLibrary() (err error) {
beego.Error(err) beego.Error(err)
} }
beego.Debug("Purging old data")
if err := i.mfRepo.PurgeInactive(&mfs); err != nil {
beego.Error(err)
}
if err := i.albumRepo.PurgeInactive(&als); err != nil {
beego.Error(err)
}
if err := i.artistRepo.PurgeInactive(&ars); err != nil {
beego.Error(err)
}
c, _ := i.artistRepo.CountAll() c, _ := i.artistRepo.CountAll()
beego.Info("Total Artists in database:", c) beego.Info("Total Artists in database:", c)
c, _ = i.albumRepo.CountAll() c, _ = i.albumRepo.CountAll()
@ -116,22 +149,6 @@ func (i *Importer) importLibrary() (err error) {
return err return err
} }
func (i *Importer) persist(mf *domain.MediaFile, album *domain.Album, artist *domain.Artist) {
if err := i.artistRepo.Put(artist); err != nil {
beego.Error(err)
}
album.ArtistId = artist.Id
if err := i.albumRepo.Put(album); err != nil {
beego.Error(err)
}
mf.AlbumId = album.Id
if err := i.mfRepo.Put(mf); err != nil {
beego.Error(err)
}
}
func (i *Importer) collectIndex(ig utils.IndexGroups, a *domain.Artist, artistIndex map[string]tempIndex) { func (i *Importer) collectIndex(ig utils.IndexGroups, a *domain.Artist, artistIndex map[string]tempIndex) {
name := a.Name name := a.Name
indexName := strings.ToLower(utils.NoArticle(name)) indexName := strings.ToLower(utils.NoArticle(name))

View file

@ -3,16 +3,17 @@ package scanner
import ( import (
"crypto/md5" "crypto/md5"
"fmt" "fmt"
"github.com/astaxie/beego"
"github.com/deluan/gosonic/domain"
"github.com/deluan/itl"
"github.com/dhowden/tag"
"net/url" "net/url"
"os" "os"
"path/filepath" "path/filepath"
"strconv" "strconv"
"strings" "strings"
"time" "time"
"github.com/astaxie/beego"
"github.com/deluan/gosonic/domain"
"github.com/deluan/itl"
"github.com/dhowden/tag"
) )
type ItunesScanner struct { type ItunesScanner struct {
@ -30,7 +31,7 @@ func (s *ItunesScanner) ScanLibrary(lastModifiedSince time.Time, path string) (i
if err != nil { if err != nil {
return 0, err return 0, err
} }
beego.Info("Loaded", len(l.Tracks), "tracks") beego.Debug("Loaded", len(l.Tracks), "tracks")
s.lastModifiedSince = lastModifiedSince s.lastModifiedSince = lastModifiedSince
s.mediaFiles = make(map[string]*domain.MediaFile) s.mediaFiles = make(map[string]*domain.MediaFile)
@ -46,7 +47,7 @@ func (s *ItunesScanner) ScanLibrary(lastModifiedSince time.Time, path string) (i
} }
i++ i++
if i%1000 == 0 { if i%1000 == 0 {
beego.Info("Processed", i, "tracks.", len(s.artists), "artists,", len(s.albums), "albums", len(s.mediaFiles), "songs") beego.Debug("Processed", i, "tracks.", len(s.artists), "artists,", len(s.albums), "albums", len(s.mediaFiles), "songs")
} }
} }
return len(l.Tracks), nil return len(l.Tracks), nil