fix(server): race condition when updating artist/album from external sources

Signed-off-by: Deluan <deluan@navidrome.org>
This commit is contained in:
Deluan 2024-12-01 20:16:40 -05:00
parent 8fb09e71b6
commit 2b0bfbd75a

View file

@ -43,8 +43,8 @@ type ExternalMetadata interface {
type externalMetadata struct { type externalMetadata struct {
ds model.DataStore ds model.DataStore
ag *agents.Agents ag *agents.Agents
artistQueue refreshQueue[*auxArtist] artistQueue refreshQueue[auxArtist]
albumQueue refreshQueue[*auxAlbum] albumQueue refreshQueue[auxAlbum]
} }
type auxAlbum struct { type auxAlbum struct {
@ -94,7 +94,7 @@ func (e *externalMetadata) UpdateAlbumInfo(ctx context.Context, id string) (*mod
updatedAt := V(album.ExternalInfoUpdatedAt) updatedAt := V(album.ExternalInfoUpdatedAt)
if updatedAt.IsZero() { if updatedAt.IsZero() {
log.Debug(ctx, "AlbumInfo not cached. Retrieving it now", "updatedAt", updatedAt, "id", id, "name", album.Name) log.Debug(ctx, "AlbumInfo not cached. Retrieving it now", "updatedAt", updatedAt, "id", id, "name", album.Name)
err = e.populateAlbumInfo(ctx, album) err = e.populateAlbumInfo(ctx, *album)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -109,7 +109,7 @@ func (e *externalMetadata) UpdateAlbumInfo(ctx context.Context, id string) (*mod
return &album.Album, nil return &album.Album, nil
} }
func (e *externalMetadata) populateAlbumInfo(ctx context.Context, album *auxAlbum) error { func (e *externalMetadata) populateAlbumInfo(ctx context.Context, album auxAlbum) error {
start := time.Now() start := time.Now()
info, err := e.ag.GetAlbumInfo(ctx, album.Name, album.AlbumArtist, album.MbzAlbumID) info, err := e.ag.GetAlbumInfo(ctx, album.Name, album.AlbumArtist, album.MbzAlbumID)
if errors.Is(err, agents.ErrNotFound) { if errors.Is(err, agents.ErrNotFound) {
@ -197,7 +197,7 @@ func (e *externalMetadata) refreshArtistInfo(ctx context.Context, id string) (*a
updatedAt := V(artist.ExternalInfoUpdatedAt) updatedAt := V(artist.ExternalInfoUpdatedAt)
if updatedAt.IsZero() { if updatedAt.IsZero() {
log.Debug(ctx, "ArtistInfo not cached. Retrieving it now", "updatedAt", updatedAt, "id", id, "name", artist.Name) log.Debug(ctx, "ArtistInfo not cached. Retrieving it now", "updatedAt", updatedAt, "id", id, "name", artist.Name)
err := e.populateArtistInfo(ctx, artist) err := e.populateArtistInfo(ctx, *artist)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -211,7 +211,7 @@ func (e *externalMetadata) refreshArtistInfo(ctx context.Context, id string) (*a
return artist, nil return artist, nil
} }
func (e *externalMetadata) populateArtistInfo(ctx context.Context, artist *auxArtist) error { func (e *externalMetadata) populateArtistInfo(ctx context.Context, artist auxArtist) error {
start := time.Now() start := time.Now()
// Get MBID first, if it is not yet available // Get MBID first, if it is not yet available
if artist.MbzArtistID == "" { if artist.MbzArtistID == "" {
@ -224,10 +224,10 @@ func (e *externalMetadata) populateArtistInfo(ctx context.Context, artist *auxAr
// Call all registered agents and collect information // Call all registered agents and collect information
g := errgroup.Group{} g := errgroup.Group{}
g.SetLimit(2) g.SetLimit(2)
g.Go(func() error { e.callGetImage(ctx, e.ag, artist); return nil }) g.Go(func() error { e.callGetImage(ctx, e.ag, &artist); return nil })
g.Go(func() error { e.callGetBiography(ctx, e.ag, artist); return nil }) g.Go(func() error { e.callGetBiography(ctx, e.ag, &artist); return nil })
g.Go(func() error { e.callGetURL(ctx, e.ag, artist); return nil }) g.Go(func() error { e.callGetURL(ctx, e.ag, &artist); return nil })
g.Go(func() error { e.callGetSimilar(ctx, e.ag, artist, maxSimilarArtists, true); return nil }) g.Go(func() error { e.callGetSimilar(ctx, e.ag, &artist, maxSimilarArtists, true); return nil })
_ = g.Wait() _ = g.Wait()
if utils.IsCtxDone(ctx) { if utils.IsCtxDone(ctx) {
@ -552,10 +552,10 @@ func (e *externalMetadata) loadSimilar(ctx context.Context, artist *auxArtist, c
return nil return nil
} }
type refreshQueue[T any] chan<- T type refreshQueue[T any] chan<- *T
func newRefreshQueue[T any](ctx context.Context, processFn func(context.Context, T) error) refreshQueue[T] { func newRefreshQueue[T any](ctx context.Context, processFn func(context.Context, T) error) refreshQueue[T] {
queue := make(chan T, refreshQueueLength) queue := make(chan *T, refreshQueueLength)
go func() { go func() {
for { for {
select { select {
@ -565,7 +565,7 @@ func newRefreshQueue[T any](ctx context.Context, processFn func(context.Context,
ctx, cancel := context.WithTimeout(ctx, refreshTimeout) ctx, cancel := context.WithTimeout(ctx, refreshTimeout)
select { select {
case item := <-queue: case item := <-queue:
_ = processFn(ctx, item) _ = processFn(ctx, *item)
cancel() cancel()
case <-ctx.Done(): case <-ctx.Done():
cancel() cancel()
@ -575,7 +575,8 @@ func newRefreshQueue[T any](ctx context.Context, processFn func(context.Context,
}() }()
return queue return queue
} }
func (q *refreshQueue[T]) enqueue(item T) {
func (q *refreshQueue[T]) enqueue(item *T) {
select { select {
case *q <- item: case *q <- item:
default: // It is ok to miss a refresh request default: // It is ok to miss a refresh request