queue: Fix race-conditions in queue tests

Improper queue initialization by filling fields manually instead of
calling NewQueue resulted in Queue objects created in tests having
no workersStop channel (nil), making Queue.Close no-op. This caused
race-conditions because test code assumes that Queue.Close will wait
for queue goroutines to stop before returning.

Queue late-initialization logic is factored out into Queue.start
function. Now newTestQueueDir calls NewQueue, modifies fields of
returned object and calls Queue.start.
This commit is contained in:
fox.cpp 2019-08-27 02:06:13 +03:00
parent 4c9433479b
commit d1df9f60be
No known key found for this signature in database
GPG key ID: E76D97CCEDE90B6C
2 changed files with 18 additions and 23 deletions

View file

@ -103,9 +103,7 @@ func NewQueue(_, instName string) (module.Module, error) {
}
func (q *Queue) Init(cfg *config.Map) error {
q.wheel = NewTimeWheel()
var workers int
cfg.Bool("debug", true, &q.Log.Debug)
cfg.Int("max_tries", false, false, 8, &q.maxTries)
cfg.Int("workers", false, false, 16, &workers)
@ -129,6 +127,13 @@ func (q *Queue) Init(cfg *config.Map) error {
if err := os.MkdirAll(q.location, os.ModePerm); err != nil {
return err
}
return q.start(workers)
}
func (q *Queue) start(workers int) error {
q.wheel = NewTimeWheel()
if err := q.readDiskQueue(); err != nil {
return err
}
@ -139,7 +144,6 @@ func (q *Queue) Init(cfg *config.Map) error {
q.workersWg.Add(1)
go q.worker()
}
return nil
}
@ -379,6 +383,7 @@ func (q *Queue) removeFromDisk(id string) {
if err := os.Remove(metaPath); err != nil {
q.Log.Printf("failed to remove meta-data from disk: %v (msg ID = %s)", err, id)
}
q.Log.Debugf("removed message from disk (msg ID = %s)", id)
}
func (q *Queue) readDiskQueue() error {

View file

@ -38,31 +38,21 @@ func cleanQueue(t *testing.T, q *Queue) {
}
func newTestQueueDir(t *testing.T, target module.DeliveryTarget, dir string) *Queue {
q := &Queue{
Log: testLogger(t, "queue"),
// Retry immediately since our tests do not rely on time anyhow
// This also a great opportunity to see whether TimeWheel handles
// edge case time values properly.
initialRetryTime: 0 * time.Millisecond,
retryTimeScale: 1,
maxTries: 5,
wheel: NewTimeWheel(),
location: dir,
Target: target,
}
mod, _ := NewQueue("", "queue")
q := mod.(*Queue)
q.Log = testLogger(t, "queue")
q.initialRetryTime = 0
q.retryTimeScale = 1
q.postInitDelay = 0
q.maxTries = 5
q.location = dir
q.Target = target
if !testing.Verbose() {
q.Log = log.Logger{Name: "", Out: log.WriterLog(ioutil.Discard)}
}
// Crippled version of Queue.Init logic.
if err := q.readDiskQueue(); err != nil {
t.Fatal("failed to read disk queue:", err)
}
q.workersWg.Add(1)
go q.worker()
q.start(1)
return q
}