// Package pl implements some Data Pipeline helper functions. // Reference: https://medium.com/amboss/applying-modern-go-concurrency-patterns-to-data-pipelines-b3b5327908d4#3a80 // // See also: // // https://www.oreilly.com/library/view/concurrency-in-go/9781491941294/ch04.html#fano_fani // https://www.youtube.com/watch?v=f6kdp27TYZs // https://www.youtube.com/watch?v=QDDwwePbDtw package pl import ( "context" "errors" "sync" "github.com/navidrome/navidrome/log" "golang.org/x/sync/semaphore" ) func Stage[In any, Out any]( ctx context.Context, maxWorkers int, inputChannel <-chan In, fn func(context.Context, In) (Out, error), ) (chan Out, chan error) { outputChannel := make(chan Out) errorChannel := make(chan error) limit := int64(maxWorkers) sem1 := semaphore.NewWeighted(limit) go func() { defer close(outputChannel) defer close(errorChannel) for s := range ReadOrDone(ctx, inputChannel) { if err := sem1.Acquire(ctx, 1); err != nil { if !errors.Is(err, context.Canceled) { log.Error(ctx, "Failed to acquire semaphore", err) } break } go func(s In) { defer sem1.Release(1) result, err := fn(ctx, s) if err != nil { if !errors.Is(err, context.Canceled) { errorChannel <- err } } else { outputChannel <- result } }(s) } // By using context.Background() here we are assuming the fn will stop when the context // is canceled. This is required so we can wait for the workers to finish and avoid closing // the outputChannel before they are done. if err := sem1.Acquire(context.Background(), limit); err != nil { log.Error(ctx, "Failed waiting for workers", err) } }() return outputChannel, errorChannel } func Sink[In any]( ctx context.Context, maxWorkers int, inputChannel <-chan In, fn func(context.Context, In) error, ) chan error { results, errC := Stage(ctx, maxWorkers, inputChannel, func(ctx context.Context, in In) (bool, error) { err := fn(ctx, in) return false, err // Only err is important, results will be discarded }) // Discard results go func() { for range ReadOrDone(ctx, results) { } }() return errC } func Merge[T any](ctx context.Context, cs ...<-chan T) <-chan T { var wg sync.WaitGroup out := make(chan T) output := func(c <-chan T) { defer wg.Done() for v := range ReadOrDone(ctx, c) { select { case out <- v: case <-ctx.Done(): return } } } wg.Add(len(cs)) for _, c := range cs { go output(c) } go func() { wg.Wait() close(out) }() return out } func SendOrDone[T any](ctx context.Context, out chan<- T, v T) { select { case out <- v: case <-ctx.Done(): return } } func ReadOrDone[T any](ctx context.Context, in <-chan T) <-chan T { valStream := make(chan T) go func() { defer close(valStream) for { select { case <-ctx.Done(): return case v, ok := <-in: if !ok { return } select { case valStream <- v: case <-ctx.Done(): } } } }() return valStream } func Tee[T any](ctx context.Context, in <-chan T) (<-chan T, <-chan T) { out1 := make(chan T) out2 := make(chan T) go func() { defer close(out1) defer close(out2) for val := range ReadOrDone(ctx, in) { var out1, out2 = out1, out2 for i := 0; i < 2; i++ { select { case <-ctx.Done(): case out1 <- val: out1 = nil case out2 <- val: out2 = nil } } } }() return out1, out2 } func FromSlice[T any](ctx context.Context, in []T) <-chan T { output := make(chan T, len(in)) for _, c := range in { output <- c } close(output) return output }