mirror of
https://github.com/navidrome/navidrome.git
synced 2025-04-03 04:27:37 +03:00
176 lines
3.5 KiB
Go
176 lines
3.5 KiB
Go
// Package pl implements some Data Pipeline helper functions.
|
|
// Reference: https://medium.com/amboss/applying-modern-go-concurrency-patterns-to-data-pipelines-b3b5327908d4#3a80
|
|
//
|
|
// See also:
|
|
//
|
|
// https://www.oreilly.com/library/view/concurrency-in-go/9781491941294/ch04.html#fano_fani
|
|
// https://www.youtube.com/watch?v=f6kdp27TYZs
|
|
// https://www.youtube.com/watch?v=QDDwwePbDtw
|
|
package pl
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sync"
|
|
|
|
"github.com/navidrome/navidrome/log"
|
|
"golang.org/x/sync/semaphore"
|
|
)
|
|
|
|
func Stage[In any, Out any](
|
|
ctx context.Context,
|
|
maxWorkers int,
|
|
inputChannel <-chan In,
|
|
fn func(context.Context, In) (Out, error),
|
|
) (chan Out, chan error) {
|
|
outputChannel := make(chan Out)
|
|
errorChannel := make(chan error)
|
|
|
|
limit := int64(maxWorkers)
|
|
sem1 := semaphore.NewWeighted(limit)
|
|
|
|
go func() {
|
|
defer close(outputChannel)
|
|
defer close(errorChannel)
|
|
|
|
for s := range ReadOrDone(ctx, inputChannel) {
|
|
if err := sem1.Acquire(ctx, 1); err != nil {
|
|
if !errors.Is(err, context.Canceled) {
|
|
log.Error(ctx, "Failed to acquire semaphore", err)
|
|
}
|
|
break
|
|
}
|
|
|
|
go func(s In) {
|
|
defer sem1.Release(1)
|
|
|
|
result, err := fn(ctx, s)
|
|
if err != nil {
|
|
if !errors.Is(err, context.Canceled) {
|
|
errorChannel <- err
|
|
}
|
|
} else {
|
|
outputChannel <- result
|
|
}
|
|
}(s)
|
|
}
|
|
|
|
// By using context.Background() here we are assuming the fn will stop when the context
|
|
// is canceled. This is required so we can wait for the workers to finish and avoid closing
|
|
// the outputChannel before they are done.
|
|
if err := sem1.Acquire(context.Background(), limit); err != nil {
|
|
log.Error(ctx, "Failed waiting for workers", err)
|
|
}
|
|
}()
|
|
|
|
return outputChannel, errorChannel
|
|
}
|
|
|
|
func Sink[In any](
|
|
ctx context.Context,
|
|
maxWorkers int,
|
|
inputChannel <-chan In,
|
|
fn func(context.Context, In) error,
|
|
) chan error {
|
|
results, errC := Stage(ctx, maxWorkers, inputChannel, func(ctx context.Context, in In) (bool, error) {
|
|
err := fn(ctx, in)
|
|
return false, err // Only err is important, results will be discarded
|
|
})
|
|
|
|
// Discard results
|
|
go func() {
|
|
for range ReadOrDone(ctx, results) {
|
|
}
|
|
}()
|
|
|
|
return errC
|
|
}
|
|
|
|
func Merge[T any](ctx context.Context, cs ...<-chan T) <-chan T {
|
|
var wg sync.WaitGroup
|
|
out := make(chan T)
|
|
|
|
output := func(c <-chan T) {
|
|
defer wg.Done()
|
|
for v := range ReadOrDone(ctx, c) {
|
|
select {
|
|
case out <- v:
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
wg.Add(len(cs))
|
|
for _, c := range cs {
|
|
go output(c)
|
|
}
|
|
|
|
go func() {
|
|
wg.Wait()
|
|
close(out)
|
|
}()
|
|
|
|
return out
|
|
}
|
|
|
|
func SendOrDone[T any](ctx context.Context, out chan<- T, v T) {
|
|
select {
|
|
case out <- v:
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
|
|
func ReadOrDone[T any](ctx context.Context, in <-chan T) <-chan T {
|
|
valStream := make(chan T)
|
|
go func() {
|
|
defer close(valStream)
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case v, ok := <-in:
|
|
if !ok {
|
|
return
|
|
}
|
|
select {
|
|
case valStream <- v:
|
|
case <-ctx.Done():
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
return valStream
|
|
}
|
|
|
|
func Tee[T any](ctx context.Context, in <-chan T) (<-chan T, <-chan T) {
|
|
out1 := make(chan T)
|
|
out2 := make(chan T)
|
|
go func() {
|
|
defer close(out1)
|
|
defer close(out2)
|
|
for val := range ReadOrDone(ctx, in) {
|
|
var out1, out2 = out1, out2
|
|
for i := 0; i < 2; i++ {
|
|
select {
|
|
case <-ctx.Done():
|
|
case out1 <- val:
|
|
out1 = nil
|
|
case out2 <- val:
|
|
out2 = nil
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
return out1, out2
|
|
}
|
|
|
|
func FromSlice[T any](ctx context.Context, in []T) <-chan T {
|
|
output := make(chan T, len(in))
|
|
for _, c := range in {
|
|
output <- c
|
|
}
|
|
close(output)
|
|
return output
|
|
}
|