From d1df9f60be929d86ed83c181ff9a0ccecde03abc Mon Sep 17 00:00:00 2001 From: "fox.cpp" Date: Tue, 27 Aug 2019 02:06:13 +0300 Subject: [PATCH] queue: Fix race-conditions in queue tests Improper queue initialization by filling fields manually instead of calling NewQueue resulted in Queue objects created in tests having no workersStop channel (nil), making Queue.Close no-op. This caused race-conditions because test code assumes that Queue.Close will wait for queue goroutines to stop before returning. Queue late-initialization logic is factored out into Queue.start function. Now newTestQueueDir calls NewQueue, modifies fields of returned object and calls Queue.start. --- queue.go | 11 ++++++++--- queue_test.go | 30 ++++++++++-------------------- 2 files changed, 18 insertions(+), 23 deletions(-) diff --git a/queue.go b/queue.go index 5926b8f..7a281ae 100644 --- a/queue.go +++ b/queue.go @@ -103,9 +103,7 @@ func NewQueue(_, instName string) (module.Module, error) { } func (q *Queue) Init(cfg *config.Map) error { - q.wheel = NewTimeWheel() var workers int - cfg.Bool("debug", true, &q.Log.Debug) cfg.Int("max_tries", false, false, 8, &q.maxTries) cfg.Int("workers", false, false, 16, &workers) @@ -129,6 +127,13 @@ func (q *Queue) Init(cfg *config.Map) error { if err := os.MkdirAll(q.location, os.ModePerm); err != nil { return err } + + return q.start(workers) +} + +func (q *Queue) start(workers int) error { + q.wheel = NewTimeWheel() + if err := q.readDiskQueue(); err != nil { return err } @@ -139,7 +144,6 @@ func (q *Queue) Init(cfg *config.Map) error { q.workersWg.Add(1) go q.worker() } - return nil } @@ -379,6 +383,7 @@ func (q *Queue) removeFromDisk(id string) { if err := os.Remove(metaPath); err != nil { q.Log.Printf("failed to remove meta-data from disk: %v (msg ID = %s)", err, id) } + q.Log.Debugf("removed message from disk (msg ID = %s)", id) } func (q *Queue) readDiskQueue() error { diff --git a/queue_test.go b/queue_test.go index 61ab599..7dfb9db 100644 --- a/queue_test.go +++ b/queue_test.go @@ -38,31 +38,21 @@ func cleanQueue(t *testing.T, q *Queue) { } func newTestQueueDir(t *testing.T, target module.DeliveryTarget, dir string) *Queue { - q := &Queue{ - Log: testLogger(t, "queue"), - // Retry immediately since our tests do not rely on time anyhow - // This also a great opportunity to see whether TimeWheel handles - // edge case time values properly. - initialRetryTime: 0 * time.Millisecond, - retryTimeScale: 1, - maxTries: 5, - wheel: NewTimeWheel(), - location: dir, - Target: target, - } + mod, _ := NewQueue("", "queue") + q := mod.(*Queue) + q.Log = testLogger(t, "queue") + q.initialRetryTime = 0 + q.retryTimeScale = 1 + q.postInitDelay = 0 + q.maxTries = 5 + q.location = dir + q.Target = target if !testing.Verbose() { q.Log = log.Logger{Name: "", Out: log.WriterLog(ioutil.Discard)} } - // Crippled version of Queue.Init logic. - - if err := q.readDiskQueue(); err != nil { - t.Fatal("failed to read disk queue:", err) - } - - q.workersWg.Add(1) - go q.worker() + q.start(1) return q }