queue: Drop worker goroutines idea

Simply start a goroutine when needed and use a semaphore
to limit concurrency.

Closes #110.
This commit is contained in:
fox.cpp 2019-09-20 16:46:13 +03:00
parent 3b46a88aa0
commit e253cca130
No known key found for this signature in database
GPG key ID: E76D97CCEDE90B6C
3 changed files with 43 additions and 43 deletions

View file

@ -80,9 +80,10 @@ type Queue struct {
Log log.Logger
Target module.DeliveryTarget
workersWg sync.WaitGroup
// Closed from Queue.Close.
workersStop chan struct{}
deliveryWg sync.WaitGroup
// Buffered channel used to restrict count of deliveries attempted
// in parallel.
deliverySemaphore chan struct{}
}
type QueueMetadata struct {
@ -118,15 +119,14 @@ func NewQueue(_, instName string, _ []string) (module.Module, error) {
retryTimeScale: 2,
postInitDelay: 10 * time.Second,
Log: log.Logger{Name: "queue"},
workersStop: make(chan struct{}),
}, nil
}
func (q *Queue) Init(cfg *config.Map) error {
var workers int
var maxParallelism int
cfg.Bool("debug", true, false, &q.Log.Debug)
cfg.Int("max_tries", false, false, 8, &q.maxTries)
cfg.Int("workers", false, false, 16, &workers)
cfg.Int("max_parallelism", false, false, 16, &maxParallelism)
cfg.String("location", false, false, "", &q.location)
cfg.Custom("target", false, true, nil, modconfig.DeliveryDirective, &q.Target)
cfg.String("hostname", true, true, "", &q.hostname)
@ -161,11 +161,12 @@ func (q *Queue) Init(cfg *config.Map) error {
return err
}
return q.start(workers)
return q.start(maxParallelism)
}
func (q *Queue) start(workers int) error {
func (q *Queue) start(maxParallelism int) error {
q.wheel = NewTimeWheel()
q.deliverySemaphore = make(chan struct{}, maxParallelism)
if err := q.readDiskQueue(); err != nil {
return err
@ -173,44 +174,39 @@ func (q *Queue) start(workers int) error {
q.Log.Debugf("delivery target: %T", q.Target)
for i := 0; i < workers; i++ {
q.workersWg.Add(1)
go q.worker()
}
go q.dispatch()
return nil
}
func (q *Queue) Close() error {
// Make Close function idempotent. This makes it more
// convenient to use in certain situations (see queue tests).
if q.workersStop == nil {
return nil
}
close(q.workersStop)
q.workersWg.Wait()
q.workersStop = nil
q.deliveryWg.Wait()
q.wheel.Close()
return nil
}
func (q *Queue) worker() {
for {
select {
case <-q.workersStop:
q.workersWg.Done()
return
case slot := <-q.wheel.Dispatch():
q.Log.Debugln("worker woke up for", slot.Value)
id := slot.Value.(string)
func (q *Queue) dispatch() {
for slot := range q.wheel.Dispatch() {
q.Log.Debugln("starting delivery for", slot.Value)
id := slot.Value.(string)
q.deliveryWg.Add(1)
go func() {
q.Log.Debugln("waiting on delivery semaphore for", slot.Value)
q.deliverySemaphore <- struct{}{}
defer func() {
<-q.deliverySemaphore
q.deliveryWg.Done()
}()
q.Log.Debugln("delivery semaphore acquired for", slot.Value)
meta, header, body, err := q.openMessage(id)
if err != nil {
q.Log.Printf("failed to read message: %v", err)
continue
return
}
q.tryDelivery(meta, header, body)
}
}()
}
}
@ -383,18 +379,15 @@ func (qd *queueDelivery) Abort() error {
}
func (qd *queueDelivery) Commit() error {
// workersWg counter in incremented to make sure there will be no race with Close.
// e.g. it will not close the wheel before we complete first attempt.
// Also the first attempt is not scheduled using time wheel because
// the "normal" code path requires re-reading and re-parsing of header
// which is kinda expensive.
// FIXME: Though this is temporary solution, the correct fix
// would be to ditch "worker goroutines" altogether and enforce
// concurrent deliveries limit using a semaphore.
qd.q.workersWg.Add(1)
qd.q.deliveryWg.Add(1)
go func() {
qd.q.deliverySemaphore <- struct{}{}
defer func() {
<-qd.q.deliverySemaphore
qd.q.deliveryWg.Done()
}()
qd.q.tryDelivery(qd.meta, qd.header, qd.body)
qd.q.workersWg.Done()
}()
return nil
}