mirror of
https://github.com/navidrome/navidrome.git
synced 2025-04-03 20:47:35 +03:00
Add new Artwork Cache Warmer
This commit is contained in:
parent
8c1cd9c273
commit
b6eb60f019
11 changed files with 501 additions and 215 deletions
176
utils/pl/pipelines.go
Normal file
176
utils/pl/pipelines.go
Normal file
|
@ -0,0 +1,176 @@
|
|||
// Package pl implements some Data Pipeline helper functions.
|
||||
// Reference: https://medium.com/amboss/applying-modern-go-concurrency-patterns-to-data-pipelines-b3b5327908d4#3a80
|
||||
//
|
||||
// See also:
|
||||
//
|
||||
// https://www.oreilly.com/library/view/concurrency-in-go/9781491941294/ch04.html#fano_fani
|
||||
// https://www.youtube.com/watch?v=f6kdp27TYZs
|
||||
// https://www.youtube.com/watch?v=QDDwwePbDtw
|
||||
package pl
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
"github.com/navidrome/navidrome/log"
|
||||
"golang.org/x/sync/semaphore"
|
||||
)
|
||||
|
||||
func Stage[In any, Out any](
|
||||
ctx context.Context,
|
||||
maxWorkers int,
|
||||
inputChannel <-chan In,
|
||||
fn func(context.Context, In) (Out, error),
|
||||
) (chan Out, chan error) {
|
||||
outputChannel := make(chan Out)
|
||||
errorChannel := make(chan error)
|
||||
|
||||
limit := int64(maxWorkers)
|
||||
sem1 := semaphore.NewWeighted(limit)
|
||||
|
||||
go func() {
|
||||
defer close(outputChannel)
|
||||
defer close(errorChannel)
|
||||
|
||||
for s := range ReadOrDone(ctx, inputChannel) {
|
||||
if err := sem1.Acquire(ctx, 1); err != nil {
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
log.Error(ctx, "Failed to acquire semaphore", err)
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
go func(s In) {
|
||||
defer sem1.Release(1)
|
||||
|
||||
result, err := fn(ctx, s)
|
||||
if err != nil {
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
errorChannel <- err
|
||||
}
|
||||
} else {
|
||||
outputChannel <- result
|
||||
}
|
||||
}(s)
|
||||
}
|
||||
|
||||
// By using context.Background() here we are assuming the fn will stop when the context
|
||||
// is canceled. This is required so we can wait for the workers to finish and avoid closing
|
||||
// the outputChannel before they are done.
|
||||
if err := sem1.Acquire(context.Background(), limit); err != nil {
|
||||
log.Error(ctx, "Failed waiting for workers", err)
|
||||
}
|
||||
}()
|
||||
|
||||
return outputChannel, errorChannel
|
||||
}
|
||||
|
||||
func Sink[In any](
|
||||
ctx context.Context,
|
||||
maxWorkers int,
|
||||
inputChannel <-chan In,
|
||||
fn func(context.Context, In) error,
|
||||
) chan error {
|
||||
results, errC := Stage(ctx, maxWorkers, inputChannel, func(ctx context.Context, in In) (bool, error) {
|
||||
err := fn(ctx, in)
|
||||
return false, err // Only err is important, results will be discarded
|
||||
})
|
||||
|
||||
// Discard results
|
||||
go func() {
|
||||
for range ReadOrDone(ctx, results) {
|
||||
}
|
||||
}()
|
||||
|
||||
return errC
|
||||
}
|
||||
|
||||
func Merge[T any](ctx context.Context, cs ...<-chan T) <-chan T {
|
||||
var wg sync.WaitGroup
|
||||
out := make(chan T)
|
||||
|
||||
output := func(c <-chan T) {
|
||||
defer wg.Done()
|
||||
for v := range ReadOrDone(ctx, c) {
|
||||
select {
|
||||
case out <- v:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
wg.Add(len(cs))
|
||||
for _, c := range cs {
|
||||
go output(c)
|
||||
}
|
||||
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(out)
|
||||
}()
|
||||
|
||||
return out
|
||||
}
|
||||
|
||||
func SendOrDone[T any](ctx context.Context, out chan<- T, v T) {
|
||||
select {
|
||||
case out <- v:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func ReadOrDone[T any](ctx context.Context, in <-chan T) <-chan T {
|
||||
valStream := make(chan T)
|
||||
go func() {
|
||||
defer close(valStream)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case v, ok := <-in:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case valStream <- v:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
return valStream
|
||||
}
|
||||
|
||||
func Tee[T any](ctx context.Context, in <-chan T) (<-chan T, <-chan T) {
|
||||
out1 := make(chan T)
|
||||
out2 := make(chan T)
|
||||
go func() {
|
||||
defer close(out1)
|
||||
defer close(out2)
|
||||
for val := range ReadOrDone(ctx, in) {
|
||||
var out1, out2 = out1, out2
|
||||
for i := 0; i < 2; i++ {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case out1 <- val:
|
||||
out1 = nil
|
||||
case out2 <- val:
|
||||
out2 = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
return out1, out2
|
||||
}
|
||||
|
||||
func FromSlice[T any](ctx context.Context, in []T) <-chan T {
|
||||
output := make(chan T, len(in))
|
||||
for _, c := range in {
|
||||
output <- c
|
||||
}
|
||||
close(output)
|
||||
return output
|
||||
}
|
168
utils/pl/pipelines_test.go
Normal file
168
utils/pl/pipelines_test.go
Normal file
|
@ -0,0 +1,168 @@
|
|||
package pl_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/navidrome/navidrome/utils/pl"
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
func TestPipeline(t *testing.T) {
|
||||
RegisterFailHandler(Fail)
|
||||
RunSpecs(t, "Pipeline Tests Suite")
|
||||
}
|
||||
|
||||
var _ = Describe("Pipeline", func() {
|
||||
Describe("Stage", func() {
|
||||
Context("happy path", func() {
|
||||
It("calls the 'transform' function and returns values and errors", func() {
|
||||
inC := make(chan int, 4)
|
||||
for i := 0; i < 4; i++ {
|
||||
inC <- i
|
||||
}
|
||||
close(inC)
|
||||
|
||||
outC, errC := pl.Stage(context.Background(), 1, inC, func(ctx context.Context, i int) (int, error) {
|
||||
if i%2 == 0 {
|
||||
return 0, errors.New("even number")
|
||||
}
|
||||
return i * 2, nil
|
||||
})
|
||||
|
||||
Expect(<-errC).To(MatchError("even number"))
|
||||
Expect(<-outC).To(Equal(2))
|
||||
Expect(<-errC).To(MatchError("even number"))
|
||||
Expect(<-outC).To(Equal(6))
|
||||
|
||||
Eventually(outC).Should(BeClosed())
|
||||
Eventually(errC).Should(BeClosed())
|
||||
})
|
||||
})
|
||||
Context("Multiple workers", func() {
|
||||
const maxWorkers = 2
|
||||
const numJobs = 100
|
||||
It("starts multiple workers, respecting the limit", func() {
|
||||
inC := make(chan int, numJobs)
|
||||
for i := 0; i < numJobs; i++ {
|
||||
inC <- i
|
||||
}
|
||||
close(inC)
|
||||
|
||||
current := atomic.Int32{}
|
||||
count := atomic.Int32{}
|
||||
max := atomic.Int32{}
|
||||
outC, _ := pl.Stage(context.Background(), maxWorkers, inC, func(ctx context.Context, in int) (int, error) {
|
||||
defer current.Add(-1)
|
||||
c := current.Add(1)
|
||||
count.Add(1)
|
||||
if c > max.Load() {
|
||||
max.Store(c)
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond) // Slow process
|
||||
return 0, nil
|
||||
})
|
||||
// Discard output and wait for completion
|
||||
for range outC {
|
||||
}
|
||||
|
||||
Expect(count.Load()).To(Equal(int32(numJobs)))
|
||||
Expect(current.Load()).To(Equal(int32(0)))
|
||||
Expect(max.Load()).To(Equal(int32(maxWorkers)))
|
||||
})
|
||||
})
|
||||
When("the context is canceled", func() {
|
||||
It("closes its output", func() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
inC := make(chan int)
|
||||
outC, errC := pl.Stage(ctx, 1, inC, func(ctx context.Context, i int) (int, error) {
|
||||
return i, nil
|
||||
})
|
||||
cancel()
|
||||
Eventually(outC).Should(BeClosed())
|
||||
Eventually(errC).Should(BeClosed())
|
||||
})
|
||||
})
|
||||
|
||||
})
|
||||
Describe("Merge", func() {
|
||||
var in1, in2 chan int
|
||||
BeforeEach(func() {
|
||||
in1 = make(chan int, 4)
|
||||
in2 = make(chan int, 4)
|
||||
for i := 0; i < 4; i++ {
|
||||
in1 <- i
|
||||
in2 <- i + 4
|
||||
}
|
||||
close(in1)
|
||||
close(in2)
|
||||
})
|
||||
When("ranging through the output channel", func() {
|
||||
It("copies values from all input channels to its output channel", func() {
|
||||
var values []int
|
||||
for v := range pl.Merge(context.Background(), in1, in2) {
|
||||
values = append(values, v)
|
||||
}
|
||||
|
||||
Expect(values).To(ConsistOf(0, 1, 2, 3, 4, 5, 6, 7))
|
||||
})
|
||||
})
|
||||
When("there's a blocked channel and the context is closed", func() {
|
||||
It("closes its output", func() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
in3 := make(chan int)
|
||||
out := pl.Merge(ctx, in1, in2, in3)
|
||||
cancel()
|
||||
Eventually(out).Should(BeClosed())
|
||||
})
|
||||
})
|
||||
})
|
||||
Describe("ReadOrDone", func() {
|
||||
When("values are sent", func() {
|
||||
It("copies them to its output channel", func() {
|
||||
in := make(chan int)
|
||||
out := pl.ReadOrDone(context.Background(), in)
|
||||
for i := 0; i < 4; i++ {
|
||||
in <- i
|
||||
j := <-out
|
||||
Expect(i).To(Equal(j))
|
||||
}
|
||||
close(in)
|
||||
Eventually(out).Should(BeClosed())
|
||||
})
|
||||
})
|
||||
When("the context is canceled", func() {
|
||||
It("closes its output", func() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
in := make(chan int)
|
||||
out := pl.ReadOrDone(ctx, in)
|
||||
cancel()
|
||||
Eventually(out).Should(BeClosed())
|
||||
})
|
||||
})
|
||||
})
|
||||
Describe("SendOrDone", func() {
|
||||
When("out is unblocked", func() {
|
||||
It("puts the value in the channel", func() {
|
||||
out := make(chan int)
|
||||
value := 1234
|
||||
go pl.SendOrDone(context.Background(), out, value)
|
||||
Eventually(out).Should(Receive(&value))
|
||||
})
|
||||
})
|
||||
When("out is blocked", func() {
|
||||
It("can be canceled by the context", func() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
out := make(chan int)
|
||||
go pl.SendOrDone(ctx, out, 1234)
|
||||
cancel()
|
||||
|
||||
Consistently(out).ShouldNot(Receive())
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
Loading…
Add table
Add a link
Reference in a new issue