Add concurrency limit for task

This commit is contained in:
世界 2023-11-30 16:08:57 +08:00
parent 028dcd722c
commit 99d07d6e5a
No known key found for this signature in database
GPG key ID: CD109927C34A63C4

View file

@ -23,6 +23,7 @@ type Group struct {
tasks []taskItem
cleanup func()
fastFail bool
queue chan struct{}
}
func (g *Group) Append(name string, f func(ctx context.Context) error) {
@ -46,6 +47,13 @@ func (g *Group) FastFail() {
g.fastFail = true
}
func (g *Group) Concurrency(n int) {
g.queue = make(chan struct{}, n)
for i := 0; i < n; i++ {
g.queue <- struct{}{}
}
}
func (g *Group) Run(contextList ...context.Context) error {
return g.RunContextList(contextList)
}
@ -65,6 +73,14 @@ func (g *Group) RunContextList(contextList []context.Context) error {
for _, task := range g.tasks {
currentTask := task
go func() {
if g.queue != nil {
<-g.queue
select {
case <-taskCancelContext.Done():
return
default:
}
}
err := currentTask.Run(taskCancelContext)
errorAccess.Lock()
if err != nil {
@ -83,6 +99,9 @@ func (g *Group) RunContextList(contextList []context.Context) error {
taskCancel(errTaskSucceed{})
taskFinish(errTaskSucceed{})
}
if g.queue != nil {
g.queue <- struct{}{}
}
}()
}