Add concurrency limit for task

This commit is contained in:
世界 2023-11-30 16:08:57 +08:00
parent 6512789f97
commit f3f99915cf
No known key found for this signature in database
GPG key ID: CD109927C34A63C4

View file

@ -23,6 +23,7 @@ type Group struct {
tasks []taskItem tasks []taskItem
cleanup func() cleanup func()
fastFail bool fastFail bool
queue chan struct{}
} }
func (g *Group) Append(name string, f func(ctx context.Context) error) { func (g *Group) Append(name string, f func(ctx context.Context) error) {
@ -46,6 +47,13 @@ func (g *Group) FastFail() {
g.fastFail = true g.fastFail = true
} }
func (g *Group) Concurrency(n int) {
g.queue = make(chan struct{}, n)
for i := 0; i < n; i++ {
g.queue <- struct{}{}
}
}
func (g *Group) Run(contextList ...context.Context) error { func (g *Group) Run(contextList ...context.Context) error {
return g.RunContextList(contextList) return g.RunContextList(contextList)
} }
@ -65,6 +73,14 @@ func (g *Group) RunContextList(contextList []context.Context) error {
for _, task := range g.tasks { for _, task := range g.tasks {
currentTask := task currentTask := task
go func() { go func() {
if g.queue != nil {
<-g.queue
select {
case <-taskCancelContext.Done():
return
default:
}
}
err := currentTask.Run(taskCancelContext) err := currentTask.Run(taskCancelContext)
errorAccess.Lock() errorAccess.Lock()
if err != nil { if err != nil {
@ -83,6 +99,9 @@ func (g *Group) RunContextList(contextList []context.Context) error {
taskCancel(errTaskSucceed{}) taskCancel(errTaskSucceed{})
taskFinish(errTaskSucceed{}) taskFinish(errTaskSucceed{})
} }
if g.queue != nil {
g.queue <- struct{}{}
}
}() }()
} }