diff --git a/.gitignore b/.gitignore index ec91fce..8dc59ee 100644 --- a/.gitignore +++ b/.gitignore @@ -34,5 +34,5 @@ cmd/maddy-*-helper/maddy-*-helper # Some directories that may be created during test-runs # in repo directory. -mtasts-cache -*queue +cmd/maddy/*mtasts-cache +cmd/maddy/*queue diff --git a/target/queue/queue.go b/target/queue/queue.go new file mode 100644 index 0000000..65053d0 --- /dev/null +++ b/target/queue/queue.go @@ -0,0 +1,744 @@ +// Package queue implements module which keeps messages on disk +// and tries delivery to the configured target (usually remote) +// multiple times until all recipients are succeeded. +// +// Interfaces implemented: +// - module.DeliveryTarget +package queue + +import ( + "bufio" + "bytes" + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + "math" + "net" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/emersion/go-message/textproto" + "github.com/emersion/go-smtp" + "github.com/foxcpp/maddy/buffer" + "github.com/foxcpp/maddy/config" + modconfig "github.com/foxcpp/maddy/config/module" + "github.com/foxcpp/maddy/dispatcher" + "github.com/foxcpp/maddy/dsn" + "github.com/foxcpp/maddy/log" + "github.com/foxcpp/maddy/module" + "github.com/foxcpp/maddy/target" +) + +// PartialError describes state of partially successful message delivery. +type PartialError struct { + // Recipients for which delivery permanently failed. + Failed []string + // Recipients for which delivery temporary failed. + TemporaryFailed []string + + // Underlying error objects. 
+	Errs map[string]error
+}
+
+func (pe PartialError) Error() string {
+	return fmt.Sprintf("delivery failed for some recipients (permanently: %v, temporary: %v): %v", pe.Failed, pe.TemporaryFailed, pe.Errs)
+}
+
+type Queue struct {
+	name             string
+	location         string
+	hostname         string
+	autogenMsgDomain string
+	wheel            *TimeWheel
+
+	dsnDispatcher *dispatcher.Dispatcher
+
+	// Retry delay is calculated using the following formula:
+	// initialRetryTime * retryTimeScale ^ (TriesCount - 1)
+
+	initialRetryTime time.Duration
+	retryTimeScale   float64
+	maxTries         int
+
+	// If any delivery is scheduled in less than postInitDelay
+	// after Init, its delay will be increased by postInitDelay.
+	//
+	// Say, if postInitDelay is 10 secs.
+	// Then if some message is scheduled to be delivered 5 seconds
+	// after init, it will be actually delivered 15 seconds
+	// after start-up.
+	//
+	// This delay is added to make sure that if maddy is killed shortly
+	// after start-up for whatever reason it will not affect the queue.
+	postInitDelay time.Duration
+
+	Log    log.Logger
+	Target module.DeliveryTarget
+
+	workersWg sync.WaitGroup
+	// Closed from Queue.Close.
+	workersStop chan struct{}
+}
+
+type QueueMetadata struct {
+	MsgMeta *module.MsgMetadata
+	From    string
+
+	// Recipients that should be tried next.
+	// May or may not be equal to PartialError.TemporaryFailed.
+	To []string
+
+	// Information about previous failures.
+	// Preserved to be included in a bounce message.
+	FailedRcpts          []string
+	TemporaryFailedRcpts []string
+	// All errors are converted to SMTPError we can serialize and
+	// also it is directly usable for bounce messages.
+	RcptErrs map[string]*smtp.SMTPError
+
+	// Amount of times delivery *already tried*.
+	TriesCount int
+
+	FirstAttempt time.Time
+	LastAttempt  time.Time
+
+	// Whether this is a delivery notification.
+ DSN bool +} + +func NewQueue(_, instName string, _ []string) (module.Module, error) { + return &Queue{ + name: instName, + initialRetryTime: 15 * time.Minute, + retryTimeScale: 2, + postInitDelay: 10 * time.Second, + Log: log.Logger{Name: "queue"}, + workersStop: make(chan struct{}), + }, nil +} + +func (q *Queue) Init(cfg *config.Map) error { + var workers int + cfg.Bool("debug", true, &q.Log.Debug) + cfg.Int("max_tries", false, false, 8, &q.maxTries) + cfg.Int("workers", false, false, 16, &workers) + cfg.String("location", false, false, "", &q.location) + cfg.Custom("target", false, true, nil, modconfig.DeliveryDirective, &q.Target) + cfg.String("hostname", true, true, "", &q.hostname) + cfg.String("autogenerated_msg_domain", true, false, "", &q.autogenMsgDomain) + cfg.Custom("bounce", false, false, nil, func(m *config.Map, node *config.Node) (interface{}, error) { + return dispatcher.NewDispatcher(m.Globals, node.Children) + }, &q.dsnDispatcher) + if _, err := cfg.Process(); err != nil { + return err + } + + if q.dsnDispatcher != nil { + if q.autogenMsgDomain == "" { + return errors.New("queue: autogenerated_msg_domain is required if bounce {} is specified") + } + + q.dsnDispatcher.Hostname = q.hostname + q.dsnDispatcher.Log = log.Logger{Name: "queue/dispatcher", Debug: q.Log.Debug} + } + if q.location == "" && q.name == "" { + return errors.New("queue: need explicit location directive or config block name if defined inline") + } + if q.location == "" { + q.location = filepath.Join(config.StateDirectory(cfg.Globals), q.name) + } + if !filepath.IsAbs(q.location) { + q.location = filepath.Join(config.StateDirectory(cfg.Globals), q.location) + } + + // TODO: Check location write permissions. 
+ if err := os.MkdirAll(q.location, os.ModePerm); err != nil { + return err + } + + return q.start(workers) +} + +func (q *Queue) start(workers int) error { + q.wheel = NewTimeWheel() + + if err := q.readDiskQueue(); err != nil { + return err + } + + q.Log.Debugf("delivery target: %T", q.Target) + + for i := 0; i < workers; i++ { + q.workersWg.Add(1) + go q.worker() + } + return nil +} + +func (q *Queue) Close() error { + // Make Close function idempotent. This makes it more + // convenient to use in certain situations (see queue tests). + if q.workersStop == nil { + return nil + } + close(q.workersStop) + q.workersWg.Wait() + q.workersStop = nil + q.wheel.Close() + return nil +} + +func (q *Queue) worker() { + for { + select { + case <-q.workersStop: + q.workersWg.Done() + return + case slot := <-q.wheel.Dispatch(): + q.Log.Debugln("worker woke up for", slot.Value) + id := slot.Value.(string) + + meta, header, body, err := q.openMessage(id) + if err != nil { + q.Log.Printf("failed to read message: %v", err) + continue + } + + q.tryDelivery(meta, header, body) + } + } +} + +func (q *Queue) tryDelivery(meta *QueueMetadata, header textproto.Header, body buffer.Buffer) { + dl := target.DeliveryLogger(q.Log, meta.MsgMeta) + dl.Debugf("delivery attempt #%d", meta.TriesCount+1) + + partialErr := q.deliver(meta, header, body) + dl.Debugf("failures: permanently: %v, temporary: %v, errors: %v", + partialErr.Failed, partialErr.TemporaryFailed, partialErr.Errs) + + // Save permanent errors information for reporting in bounce message. + meta.FailedRcpts = append(meta.FailedRcpts, partialErr.Failed...) 
+ for rcpt, rcptErr := range partialErr.Errs { + var smtpErr *smtp.SMTPError + var ok bool + if smtpErr, ok = rcptErr.(*smtp.SMTPError); !ok { + smtpErr = &smtp.SMTPError{ + Code: 554, + EnhancedCode: smtp.EnhancedCode{5, 0, 0}, + Message: rcptErr.Error(), + } + if target.IsTemporaryErr(rcptErr) { + smtpErr.Code = 451 + smtpErr.EnhancedCode = smtp.EnhancedCode{4, 0, 0} + } + } + + meta.RcptErrs[rcpt] = smtpErr + } + meta.To = partialErr.TemporaryFailed + + meta.LastAttempt = time.Now() + if meta.TriesCount == q.maxTries || len(partialErr.TemporaryFailed) == 0 { + // Attempt either fully succeeded or completely failed. + if meta.TriesCount == q.maxTries { + dl.Printf("gave up trying to deliver to %v, errors: %v", meta.TemporaryFailedRcpts, meta.RcptErrs) + } + if len(meta.FailedRcpts) != 0 { + dl.Printf("permanently failed to deliver to %v, errors: %v", meta.FailedRcpts, meta.RcptErrs) + } + if !meta.DSN { + q.emitDSN(meta, header) + } + q.removeFromDisk(meta.MsgMeta) + return + } + + meta.TriesCount++ + + if err := q.updateMetadataOnDisk(meta); err != nil { + dl.Printf("failed to update meta-data: %v", err) + } + + nextTryTime := time.Now() + nextTryTime = nextTryTime.Add(q.initialRetryTime * time.Duration(math.Pow(q.retryTimeScale, float64(meta.TriesCount-1)))) + dl.Printf("%d attempt failed, will retry in %v (at %v)", meta.TriesCount, time.Until(nextTryTime), nextTryTime) + + q.wheel.Add(nextTryTime, meta.MsgMeta.ID) +} + +func (q *Queue) deliver(meta *QueueMetadata, header textproto.Header, body buffer.Buffer) PartialError { + dl := target.DeliveryLogger(q.Log, meta.MsgMeta) + perr := PartialError{ + Errs: map[string]error{}, + } + + deliveryTarget := q.Target + if meta.DSN { + deliveryTarget = q.dsnDispatcher + } + + delivery, err := deliveryTarget.Start(meta.MsgMeta, meta.From) + if err != nil { + perr.Failed = append(perr.Failed, meta.To...) 
+ for _, rcpt := range meta.To { + perr.Errs[rcpt] = err + } + return perr + } + + var acceptedRcpts []string + for _, rcpt := range meta.To { + if err := delivery.AddRcpt(rcpt); err != nil { + if target.IsTemporaryErr(err) { + perr.TemporaryFailed = append(perr.TemporaryFailed, rcpt) + } else { + perr.Failed = append(perr.Failed, rcpt) + } + perr.Errs[rcpt] = err + } else { + acceptedRcpts = append(acceptedRcpts, rcpt) + } + } + + if len(acceptedRcpts) == 0 { + if err := delivery.Abort(); err != nil { + dl.Printf("delivery.Abort failed: %v", err) + } + return perr + } + + expandToPartialErr := func(err error) { + if expandedPerr, ok := err.(PartialError); ok { + perr.TemporaryFailed = append(perr.TemporaryFailed, expandedPerr.TemporaryFailed...) + perr.Failed = append(perr.Failed, expandedPerr.Failed...) + for rcpt, rcptErr := range expandedPerr.Errs { + perr.Errs[rcpt] = rcptErr + } + } else { + if target.IsTemporaryErr(err) { + perr.TemporaryFailed = append(perr.TemporaryFailed, acceptedRcpts...) + } else { + perr.Failed = append(perr.Failed, acceptedRcpts...) + } + for _, rcpt := range acceptedRcpts { + perr.Errs[rcpt] = err + } + } + } + + if err := delivery.Body(header, body); err != nil { + expandToPartialErr(err) + // No recipients succeeded. + if len(perr.TemporaryFailed)+len(perr.Failed) == len(acceptedRcpts) { + if err := delivery.Abort(); err != nil { + dl.Printf("delivery.Abort failed: %v", err) + } + return perr + } + } + if err := delivery.Commit(); err != nil { + expandToPartialErr(err) + } + + return perr +} + +type queueDelivery struct { + q *Queue + meta *QueueMetadata + + header textproto.Header + body buffer.Buffer +} + +func (qd *queueDelivery) AddRcpt(rcptTo string) error { + qd.meta.To = append(qd.meta.To, rcptTo) + return nil +} + +func (qd *queueDelivery) Body(header textproto.Header, body buffer.Buffer) error { + // Body buffer initially passed to us may not be valid after "delivery" to queue completes. 
+	// storeNewMessage returns a new buffer object created from message blob stored on disk.
+	storedBody, err := qd.q.storeNewMessage(qd.meta, header, body)
+	if err != nil {
+		return err
+	}
+
+	qd.body = storedBody
+	qd.header = header
+	return nil
+}
+
+func (qd *queueDelivery) Abort() error {
+	if qd.body != nil {
+		qd.q.removeFromDisk(qd.meta.MsgMeta)
+	}
+	return nil
+}
+
+func (qd *queueDelivery) Commit() error {
+	// workersWg counter is incremented to make sure there will be no race with Close.
+	// e.g. it will not close the wheel before we complete first attempt.
+	// Also the first attempt is not scheduled using time wheel because
+	// the "normal" code path requires re-reading and re-parsing of header
+	// which is kinda expensive.
+	// FIXME: Though this is temporary solution, the correct fix
+	// would be to ditch "worker goroutines" altogether and enforce
+	// concurrent deliveries limit using a semaphore.
+	qd.q.workersWg.Add(1)
+	go func() {
+		qd.q.tryDelivery(qd.meta, qd.header, qd.body)
+		qd.q.workersWg.Done()
+	}()
+	return nil
+}
+
+func (q *Queue) Start(msgMeta *module.MsgMetadata, mailFrom string) (module.Delivery, error) {
+	meta := &QueueMetadata{
+		MsgMeta:      msgMeta,
+		From:         mailFrom,
+		RcptErrs:     map[string]*smtp.SMTPError{},
+		FirstAttempt: time.Now(),
+		LastAttempt:  time.Now(),
+	}
+	return &queueDelivery{q: q, meta: meta}, nil
+}
+
+func (q *Queue) StartDSN(msgMeta *module.MsgMetadata, mailFrom string) (module.Delivery, error) {
+	meta := &QueueMetadata{
+		DSN:          true,
+		MsgMeta:      msgMeta,
+		From:         mailFrom,
+		RcptErrs:     map[string]*smtp.SMTPError{},
+		FirstAttempt: time.Now(),
+		LastAttempt:  time.Now(),
+	}
+	return &queueDelivery{q: q, meta: meta}, nil
+}
+
+func (q *Queue) removeFromDisk(msgMeta *module.MsgMetadata) {
+	id := msgMeta.ID
+	dl := target.DeliveryLogger(q.Log, msgMeta)
+
+	// Order is important.
+	// If we remove header and body but can't remove meta now - readDiskQueue
+	// will detect and report it.
+ headerPath := filepath.Join(q.location, id+".header") + if err := os.Remove(headerPath); err != nil { + dl.Printf("failed to remove header from disk: %v", err) + } + bodyPath := filepath.Join(q.location, id+".body") + if err := os.Remove(bodyPath); err != nil { + dl.Printf("failed to remove body from disk: %v", err) + } + metaPath := filepath.Join(q.location, id+".meta") + if err := os.Remove(metaPath); err != nil { + dl.Printf("failed to remove meta-data from disk: %v", err) + } + dl.Debugf("removed message from disk") +} + +func (q *Queue) readDiskQueue() error { + dirInfo, err := ioutil.ReadDir(q.location) + if err != nil { + return err + } + + // TODO: Rewrite this function to pass all sub-tests in TestQueueDelivery_DeserializationCleanUp/NoMeta. + + loadedCount := 0 + for _, entry := range dirInfo { + // We start loading from meta-data files and then check whether ID.header and ID.body exist. + // This allows us to properly detect dangling body files. + if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".meta") { + continue + } + id := entry.Name()[:len(entry.Name())-5] + + meta, err := q.readMessageMeta(id) + if err != nil { + q.Log.Printf("failed to read meta-data, skipping: %v (msg ID = %s)", err, id) + continue + } + + // Check header file existence. + if _, err := os.Stat(filepath.Join(q.location, id+".header")); err != nil { + if os.IsNotExist(err) { + q.Log.Printf("header file doesn't exist for msg ID = %s", id) + q.tryRemoveDanglingFile(id + ".meta") + q.tryRemoveDanglingFile(id + ".body") + } else { + q.Log.Printf("skipping nonstat'able header file: %v (msg ID = %s)", err, id) + } + continue + } + + // Check body file existence. 
+ if _, err := os.Stat(filepath.Join(q.location, id+".body")); err != nil { + if os.IsNotExist(err) { + q.Log.Printf("body file doesn't exist for msg ID = %s", id) + q.tryRemoveDanglingFile(id + ".meta") + q.tryRemoveDanglingFile(id + ".header") + } else { + q.Log.Printf("skipping nonstat'able body file: %v (msg ID = %s)", err, id) + } + continue + } + + nextTryTime := meta.LastAttempt + nextTryTime = nextTryTime.Add(q.initialRetryTime * time.Duration(math.Pow(q.retryTimeScale, float64(meta.TriesCount-1)))) + + if time.Until(nextTryTime) < q.postInitDelay { + nextTryTime = time.Now().Add(q.postInitDelay) + } + + q.Log.Debugf("will try to deliver (msg ID = %s) in %v (%v)", id, time.Until(nextTryTime), nextTryTime) + q.wheel.Add(nextTryTime, id) + loadedCount++ + } + + if loadedCount != 0 { + q.Log.Printf("loaded %d saved queue entries", loadedCount) + } + + return nil +} + +func (q *Queue) storeNewMessage(meta *QueueMetadata, header textproto.Header, body buffer.Buffer) (buffer.Buffer, error) { + id := meta.MsgMeta.ID + + headerPath := filepath.Join(q.location, id+".header") + headerFile, err := os.Create(headerPath) + if err != nil { + return nil, err + } + defer headerFile.Close() + + if err := textproto.WriteHeader(headerFile, header); err != nil { + q.tryRemoveDanglingFile(id + ".header") + return nil, err + } + + bodyReader, err := body.Open() + if err != nil { + q.tryRemoveDanglingFile(id + ".header") + return nil, err + } + defer bodyReader.Close() + + bodyPath := filepath.Join(q.location, id+".body") + bodyFile, err := os.Create(bodyPath) + if err != nil { + return nil, err + } + defer bodyFile.Close() + + if _, err := io.Copy(bodyFile, bodyReader); err != nil { + q.tryRemoveDanglingFile(id + ".body") + q.tryRemoveDanglingFile(id + ".header") + return nil, err + } + + if err := q.updateMetadataOnDisk(meta); err != nil { + q.tryRemoveDanglingFile(id + ".body") + q.tryRemoveDanglingFile(id + ".header") + return nil, err + } + + return buffer.FileBuffer{Path: 
bodyPath}, nil +} + +func (q *Queue) updateMetadataOnDisk(meta *QueueMetadata) error { + metaPath := filepath.Join(q.location, meta.MsgMeta.ID+".meta") + file, err := os.Create(metaPath) + if err != nil { + return err + } + defer file.Close() + + metaCopy := *meta + metaCopy.MsgMeta = meta.MsgMeta.DeepCopy() + + if _, ok := metaCopy.MsgMeta.SrcAddr.(*net.TCPAddr); !ok { + meta.MsgMeta.SrcAddr = nil + } + + if err := json.NewEncoder(file).Encode(metaCopy); err != nil { + return err + } + return nil +} + +func (q *Queue) readMessageMeta(id string) (*QueueMetadata, error) { + metaPath := filepath.Join(q.location, id+".meta") + file, err := os.Open(metaPath) + if err != nil { + return nil, err + } + defer file.Close() + + meta := &QueueMetadata{} + + // net.Addr can't be deserialized because we don't know concrete type. For + // this reason we assume that SrcAddr is TCPAddr, if it is not - we drop it + // during serialization (see updateMetadataOnDisk). + meta.MsgMeta = &module.MsgMetadata{} + meta.MsgMeta.SrcAddr = &net.TCPAddr{} + + if err := json.NewDecoder(file).Decode(meta); err != nil { + return nil, err + } + + return meta, nil +} + +type BufferedReadCloser struct { + *bufio.Reader + io.Closer +} + +func (q *Queue) tryRemoveDanglingFile(name string) { + if err := os.Remove(filepath.Join(q.location, name)); err != nil { + q.Log.Println(err) + return + } + q.Log.Printf("removed dangling file %s", name) +} + +func (q *Queue) openMessage(id string) (*QueueMetadata, textproto.Header, buffer.Buffer, error) { + meta, err := q.readMessageMeta(id) + if err != nil { + return nil, textproto.Header{}, nil, err + } + + bodyPath := filepath.Join(q.location, id+".body") + _, err = os.Stat(bodyPath) + if err != nil { + if os.IsNotExist(err) { + q.tryRemoveDanglingFile(id + ".meta") + } + return nil, textproto.Header{}, nil, nil + } + body := buffer.FileBuffer{Path: bodyPath} + + headerPath := filepath.Join(q.location, id+".header") + headerFile, err := os.Open(headerPath) + if 
err != nil { + if os.IsNotExist(err) { + q.tryRemoveDanglingFile(id + ".meta") + q.tryRemoveDanglingFile(id + ".body") + } + return nil, textproto.Header{}, nil, nil + } + + bufferedHeader := bufio.NewReader(headerFile) + header, err := textproto.ReadHeader(bufferedHeader) + if err != nil { + return nil, textproto.Header{}, nil, nil + } + + return meta, header, body, nil +} + +func (q *Queue) InstanceName() string { + return q.name +} + +func (q *Queue) Name() string { + return "queue" +} + +func (q *Queue) emitDSN(meta *QueueMetadata, header textproto.Header) { + // If, apparently, we have no DSN dispatcher configured - do nothing. + if q.dsnDispatcher == nil { + return + } + + dsnID, err := dispatcher.GenerateMsgID() + if err != nil { + q.Log.Printf("rand.Rand error: %v", err) + return + } + + dsnEnvelope := dsn.Envelope{ + MsgID: "<" + dsnID + "@" + q.autogenMsgDomain + ">", + From: "MAILER-DAEMON@" + q.autogenMsgDomain, + To: meta.From, + } + mtaInfo := dsn.ReportingMTAInfo{ + ReportingMTA: q.hostname, + XSender: meta.From, + XMessageID: meta.MsgMeta.ID, + ArrivalDate: meta.FirstAttempt, + LastAttemptDate: meta.LastAttempt, + } + if !meta.MsgMeta.DontTraceSender { + mtaInfo.ReceivedFromMTA = meta.MsgMeta.SrcHostname + } + + rcptInfo := make([]dsn.RecipientInfo, 0, len(meta.RcptErrs)) + for rcpt, err := range meta.RcptErrs { + if meta.MsgMeta.OriginalRcpts != nil { + originalRcpt := meta.MsgMeta.OriginalRcpts[rcpt] + if originalRcpt != "" { + rcpt = originalRcpt + } + } + + rcptInfo = append(rcptInfo, dsn.RecipientInfo{ + FinalRecipient: rcpt, + Action: dsn.ActionFailed, + Status: err.EnhancedCode, + DiagnosticCode: err, + }) + } + + var dsnBodyBlob bytes.Buffer + dl := target.DeliveryLogger(q.Log, meta.MsgMeta) + dsnHeader, err := dsn.GenerateDSN(dsnEnvelope, mtaInfo, rcptInfo, header, &dsnBodyBlob) + if err != nil { + dl.Printf("failed to generate fail DSN: %v", err) + return + } + dsnBody := buffer.MemoryBuffer{Slice: dsnBodyBlob.Bytes()} + + dsnMeta := 
&module.MsgMetadata{
+		ID:          dsnID,
+		SrcProto:    "",
+		SrcHostname: q.hostname,
+		OurHostname: q.hostname,
+	}
+	dl.Printf("generated failed DSN, DSN ID = %s", dsnID)
+
+	dsnDelivery, err := q.StartDSN(dsnMeta, "MAILER-DAEMON@"+q.autogenMsgDomain)
+	if err != nil {
+		dl.Printf("failed to enqueue DSN: %v", err)
+		return
+	}
+	defer func() {
+		if err != nil {
+			dl.Printf("failed to enqueue DSN: %v", err)
+			dsnDelivery.Abort()
+		}
+	}()
+
+	if err = dsnDelivery.AddRcpt(meta.From); err != nil {
+		return
+	}
+	if err = dsnDelivery.Body(dsnHeader, dsnBody); err != nil {
+		return
+	}
+	if err = dsnDelivery.Commit(); err != nil {
+		return
+	}
+}
+
+func init() {
+	module.Register("queue", NewQueue)
+}
diff --git a/target/queue/queue_test.go b/target/queue/queue_test.go
new file mode 100644
index 0000000..d03734a
--- /dev/null
+++ b/target/queue/queue_test.go
@@ -0,0 +1,550 @@
+package queue
+
+import (
+	"errors"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/emersion/go-message/textproto"
+	"github.com/emersion/go-smtp"
+	"github.com/foxcpp/maddy/buffer"
+	"github.com/foxcpp/maddy/log"
+	"github.com/foxcpp/maddy/module"
+	"github.com/foxcpp/maddy/testutils"
+)
+
+// newTestQueue returns properly initialized Queue object usable for testing.
+//
+// See newTestQueueDir to create testing queue from an existing directory.
+// It is the caller's responsibility to remove the queue directory created by this function.
+func newTestQueue(t *testing.T, target module.DeliveryTarget) *Queue { + dir, err := ioutil.TempDir("", "maddy-tests-queue") + if err != nil { + t.Fatal("failed to create temporary directory for queue:", err) + } + return newTestQueueDir(t, target, dir) +} + +func cleanQueue(t *testing.T, q *Queue) { + if err := q.Close(); err != nil { + t.Fatal("queue.Close:", err) + } + if err := os.RemoveAll(q.location); err != nil { + t.Fatal("os.RemoveAll", err) + } +} + +func newTestQueueDir(t *testing.T, target module.DeliveryTarget, dir string) *Queue { + mod, _ := NewQueue("", "queue", nil) + q := mod.(*Queue) + q.Log = testutils.Logger(t, "queue") + q.initialRetryTime = 0 + q.retryTimeScale = 1 + q.postInitDelay = 0 + q.maxTries = 5 + q.location = dir + q.Target = target + + if !testing.Verbose() { + q.Log = log.Logger{Name: "", Out: log.WriterLog(ioutil.Discard)} + } + + q.start(1) + + return q +} + +// unreliableTarget is a module.DeliveryTarget implementation that stores +// messages to a slice and sometimes fails with the specified error. +type unreliableTarget struct { + committed chan testutils.Msg + aborted chan testutils.Msg + + // Amount of completed deliveries (both failed and succeeded) + passedMessages int + + // To make unreliableTarget fail Commit for N-th delivery, set N-1-th + // element of this slice to wanted error object. If slice is + // nil/empty or N is bigger than its size - delivery will succeed. 
+	bodyFailures []error
+	rcptFailures []map[string]error
+}
+
+type unreliableTargetDelivery struct {
+	ut  *unreliableTarget
+	msg testutils.Msg
+}
+
+func (utd *unreliableTargetDelivery) AddRcpt(rcptTo string) error {
+	if len(utd.ut.rcptFailures) > utd.ut.passedMessages {
+		rcptErrs := utd.ut.rcptFailures[utd.ut.passedMessages]
+		if err := rcptErrs[rcptTo]; err != nil {
+			return err
+		}
+	}
+
+	utd.msg.RcptTo = append(utd.msg.RcptTo, rcptTo)
+	return nil
+}
+
+func (utd *unreliableTargetDelivery) Body(header textproto.Header, body buffer.Buffer) error {
+	r, _ := body.Open()
+	utd.msg.Body, _ = ioutil.ReadAll(r)
+
+	if len(utd.ut.bodyFailures) > utd.ut.passedMessages {
+		return utd.ut.bodyFailures[utd.ut.passedMessages]
+	}
+
+	return nil
+}
+
+func (utd *unreliableTargetDelivery) Abort() error {
+	utd.ut.passedMessages++
+	if utd.ut.aborted != nil {
+		utd.ut.aborted <- utd.msg
+	}
+	return nil
+}
+
+func (utd *unreliableTargetDelivery) Commit() error {
+	utd.ut.passedMessages++
+	if utd.ut.committed != nil {
+		utd.ut.committed <- utd.msg
+	}
+	return nil
+}
+
+func (ut *unreliableTarget) Start(msgMeta *module.MsgMetadata, mailFrom string) (module.Delivery, error) {
+	return &unreliableTargetDelivery{
+		ut: ut,
+		msg: testutils.Msg{
+			MsgMeta:  msgMeta,
+			MailFrom: mailFrom,
+		},
+	}, nil
+}
+
+func readMsgChanTimeout(t *testing.T, ch <-chan testutils.Msg, timeout time.Duration) *testutils.Msg {
+	t.Helper()
+	timer := time.NewTimer(timeout)
+	select {
+	case msg := <-ch:
+		return &msg
+	case <-timer.C:
+		t.Fatal("chan read timed out")
+		return nil
+	}
+}
+
+func checkQueueDir(t *testing.T, q *Queue, expectedIDs []string) {
+	t.Helper()
+	// We use a map for lookups and also to mark messages we found,
+	// so we can report missing entries.
+ expectedMap := make(map[string]bool, len(expectedIDs)) + for _, id := range expectedIDs { + expectedMap[id] = false + } + + dir, err := ioutil.ReadDir(q.location) + if err != nil { + t.Fatalf("failed to read queue directory: %v", err) + } + + // Queue implementation uses file names in the following format: + // DELIVERY_ID.SOMETHING + for _, file := range dir { + if file.IsDir() { + t.Fatalf("queue should not create subdirectories in the store, but there is %s dir in it", file.Name()) + } + + nameParts := strings.Split(file.Name(), ".") + if len(nameParts) != 2 { + t.Fatalf("did the queue files name format changed? got %s", file.Name()) + } + + _, ok := expectedMap[nameParts[0]] + if !ok { + t.Errorf("message with unexpected Msg ID %s is stored in queue store", nameParts[0]) + continue + } + + expectedMap[nameParts[0]] = true + } + + for id, found := range expectedMap { + if !found { + t.Errorf("expected message with Msg ID %s is missing from queue store", id) + } + } +} + +func TestQueueDelivery(t *testing.T) { + t.Parallel() + + dt := unreliableTarget{committed: make(chan testutils.Msg, 10)} + q := newTestQueue(t, &dt) + defer cleanQueue(t, q) + + testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}) + + // This is far from being a proper blackbox testing. + // But I can't come up with a better way to inspect the Queue state. + // This probably will be improved when bounce messages will be implemented. + // For now, this is a dirty hack. Close the Queue and inspect serialized state. + // FIXME. + + // Wait for the delivery to complete and stop processing. + msg := readMsgChanTimeout(t, dt.committed, 5*time.Second) + q.Close() + + testutils.CheckMsg(t, msg, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}) + + // There should be no queued messages. 
+	checkQueueDir(t, q, []string{})
+}
+
+func TestQueueDelivery_PermanentFail_NonPartial(t *testing.T) {
+	t.Parallel()
+
+	dt := unreliableTarget{
+		bodyFailures: []error{
+			&smtp.SMTPError{
+				Code:         500,
+				EnhancedCode: smtp.EnhancedCode{5, 0, 0},
+				Message:      "you shall not pass",
+			},
+		},
+		aborted: make(chan testutils.Msg, 10),
+	}
+	q := newTestQueue(t, &dt)
+	defer cleanQueue(t, q)
+
+	testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"})
+
+	// Queue will abort a delivery if it fails for all recipients.
+	readMsgChanTimeout(t, dt.aborted, 5*time.Second)
+	q.Close()
+
+	// Delivery is failed permanently, hence no retry should be rescheduled.
+	checkQueueDir(t, q, []string{})
+}
+
+func TestQueueDelivery_PermanentFail_Partial(t *testing.T) {
+	t.Parallel()
+
+	dt := unreliableTarget{
+		bodyFailures: []error{
+			PartialError{
+				Failed: []string{"tester1@example.org", "tester2@example.org"},
+				Errs: map[string]error{
+					"tester1@example.org": errors.New("you shall not pass"),
+					"tester2@example.org": errors.New("you shall not pass"),
+				},
+			},
+		},
+		aborted: make(chan testutils.Msg, 10),
+	}
+	q := newTestQueue(t, &dt)
+	defer cleanQueue(t, q)
+
+	testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"})
+
+	// This is similar to the previous test, but checks PartialErr processing logic.
+	// Here delivery fails for recipients too, but this is reported using PartialErr.
+ + readMsgChanTimeout(t, dt.aborted, 5*time.Second) + q.Close() + checkQueueDir(t, q, []string{}) +} + +func TestQueueDelivery_TemporaryFail(t *testing.T) { + t.Parallel() + + dt := unreliableTarget{ + bodyFailures: []error{ + PartialError{ + TemporaryFailed: []string{"tester1@example.org", "tester2@example.org"}, + Errs: map[string]error{ + "tester1@example.org": errors.New("you shall not pass"), + "tester2@example.org": errors.New("you shall not pass"), + }, + }, + }, + aborted: make(chan testutils.Msg, 10), + committed: make(chan testutils.Msg, 10), + } + q := newTestQueue(t, &dt) + defer cleanQueue(t, q) + + testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}) + + // Delivery should be aborted, because it failed for all recipients. + readMsgChanTimeout(t, dt.aborted, 5*time.Second) + + // Second retry, should work fine. + msg := readMsgChanTimeout(t, dt.committed, 5*time.Second) + testutils.CheckMsg(t, msg, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}) + + q.Close() + // No more retries scheduled, queue storage is clear. + defer checkQueueDir(t, q, []string{}) +} + +func TestQueueDelivery_TemporaryFail_Partial(t *testing.T) { + t.Parallel() + + dt := unreliableTarget{ + bodyFailures: []error{ + PartialError{ + TemporaryFailed: []string{"tester2@example.org"}, + Errs: map[string]error{ + "tester2@example.org": &smtp.SMTPError{ + Code: 400, + Message: "go away", + }, + }, + }, + }, + aborted: make(chan testutils.Msg, 10), + committed: make(chan testutils.Msg, 10), + } + q := newTestQueue(t, &dt) + defer cleanQueue(t, q) + + testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}) + + // Committed, tester1@example.org - ok. + msg := readMsgChanTimeout(t, dt.committed, 5000*time.Second) + // Side note: unreliableTarget adds recipients to the msg object even if they were rejected + // later using a partial error. 
So slice below is all recipients that were submitted by + // the queue. + testutils.CheckMsg(t, msg, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}) + + // committed #2, tester2@example.org - ok + msg = readMsgChanTimeout(t, dt.committed, 5000*time.Second) + testutils.CheckMsg(t, msg, "tester@example.com", []string{"tester2@example.org"}) + + q.Close() + // No more retries scheduled, queue storage is clear. + checkQueueDir(t, q, []string{}) +} + +func TestQueueDelivery_MultipleAttempts(t *testing.T) { + t.Parallel() + + dt := unreliableTarget{ + bodyFailures: []error{ + PartialError{ + Failed: []string{"tester1@example.org"}, + TemporaryFailed: []string{"tester2@example.org"}, + Errs: map[string]error{ + "tester1@example.org": errors.New("you shall not pass"), + "tester2@example.org": errors.New("you shall not pass"), + }, + }, + PartialError{ + TemporaryFailed: []string{"tester2@example.org"}, + Errs: map[string]error{ + "tester2@example.org": errors.New("you shall not pass"), + }, + }, + }, + committed: make(chan testutils.Msg, 10), + } + q := newTestQueue(t, &dt) + defer cleanQueue(t, q) + + testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org", "tester3@example.org"}) + + // Committed because delivery to tester3@example.org is succeeded. + msg := readMsgChanTimeout(t, dt.committed, 5*time.Second) + // Side note: This slice contains all recipients submitted by the queue, even if + // they were rejected later using PartialError. + testutils.CheckMsg(t, msg, "tester@example.com", []string{"tester1@example.org", "tester2@example.org", "tester3@example.org"}) + + // tester1 is failed permanently, should not be retried. + // tester2 is failed temporary, should be retried. + msg = readMsgChanTimeout(t, dt.committed, 5*time.Second) + testutils.CheckMsg(t, msg, "tester@example.com", []string{"tester2@example.org"}) + + q.Close() + // No more retries should be scheduled. 
+ checkQueueDir(t, q, []string{})
+}
+
+func TestQueueDelivery_PermanentRcptReject(t *testing.T) {
+ t.Parallel()
+
+ dt := unreliableTarget{
+ rcptFailures: []map[string]error{ // rejected from AddRcpt directly, not via PartialError
+ {
+ "tester1@example.org": &smtp.SMTPError{
+ Code: 500,
+ Message: "go away",
+ },
+ },
+ },
+ committed: make(chan testutils.Msg, 10),
+ }
+ q := newTestQueue(t, &dt)
+ defer cleanQueue(t, q)
+
+ testutils.DoTestDelivery(t, q, "tester@example.org", []string{"tester1@example.org", "tester2@example.org"})
+
+ // Committed on the first attempt: tester2@example.org succeeded; tester1@example.org got a 5xx and is dropped for good.
+ msg := readMsgChanTimeout(t, dt.committed, 5*time.Second)
+ testutils.CheckMsg(t, msg, "tester@example.org", []string{"tester2@example.org"})
+
+ q.Close()
+ // No more retries should be scheduled (the only failure was permanent).
+ checkQueueDir(t, q, []string{})
+}
+
+func TestQueueDelivery_TemporaryRcptReject(t *testing.T) {
+ t.Parallel()
+
+ dt := unreliableTarget{
+ rcptFailures: []map[string]error{
+ {
+ "tester1@example.org": &smtp.SMTPError{
+ Code: 400,
+ Message: "go away",
+ },
+ },
+ },
+ committed: make(chan testutils.Msg, 10),
+ }
+ q := newTestQueue(t, &dt)
+ defer cleanQueue(t, q)
+
+ // First attempt:
+ // tester1 - temp. fail
+ // tester2 - ok
+ // Second attempt:
+ // tester1 - ok
+ testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"})
+
+ msg := readMsgChanTimeout(t, dt.committed, 5*time.Second)
+ // Unlike previous tests where unreliableTarget rejected recipients by PartialError, here they are rejected
+ // by AddRcpt directly, so they are NOT saved by the target.
+ testutils.CheckMsg(t, msg, "tester@example.com", []string{"tester2@example.org"})
+
+ msg = readMsgChanTimeout(t, dt.committed, 5*time.Second)
+ testutils.CheckMsg(t, msg, "tester@example.com", []string{"tester1@example.org"})
+
+ q.Close()
+ // No more retries should be scheduled. 
+ checkQueueDir(t, q, []string{})
+}
+
+func TestQueueDelivery_SerializationRoundtrip(t *testing.T) {
+ t.Parallel()
+
+ dt := unreliableTarget{
+ rcptFailures: []map[string]error{
+ {
+ "tester1@example.org": &smtp.SMTPError{
+ Code: 400,
+ Message: "go away",
+ },
+ },
+ },
+ committed: make(chan testutils.Msg, 10),
+ }
+ q := newTestQueue(t, &dt)
+ defer cleanQueue(t, q)
+
+ // This is the most tricky test because it is racy and I have no idea what can be done to avoid it.
+ // It relies on us calling Close before queue dispatcher decides to retry delivery.
+ // Hence retry delay is increased from 0ms used in other tests to make it reliable.
+ q.initialRetryTime = 1 * time.Second
+
+ // To make sure we will not time out due to post-init delay.
+ q.postInitDelay = 0
+
+ deliveryID := testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"})
+
+ // Standard partial delivery, retry will be scheduled for tester1@example.org.
+ msg := readMsgChanTimeout(t, dt.committed, 5*time.Second)
+ testutils.CheckMsg(t, msg, "tester@example.com", []string{"tester2@example.org"})
+
+ // Then stop it before the retry fires.
+ q.Close()
+
+ // Make sure the pending delivery is persisted on disk under its delivery ID.
+ checkQueueDir(t, q, []string{deliveryID})
+
+ // Then reinit the queue from the same directory to exercise deserialization of the saved state.
+ q = newTestQueueDir(t, &dt, q.location)
+
+ // Wait for retry and check it.
+ msg = readMsgChanTimeout(t, dt.committed, 5*time.Second)
+ testutils.CheckMsg(t, msg, "tester@example.com", []string{"tester1@example.org"})
+
+ // Close it again.
+ q.Close()
+ // No more retries should be scheduled. 
+ checkQueueDir(t, q, []string{})
+}
+
+func TestQueueDelivery_DeserializationCleanUp(t *testing.T) {
+ t.Parallel()
+
+ test := func(t *testing.T, fileSuffix string) {
+ dt := unreliableTarget{
+ rcptFailures: []map[string]error{
+ {
+ "tester1@example.org": &smtp.SMTPError{
+ Code: 400,
+ Message: "go away",
+ },
+ },
+ },
+ committed: make(chan testutils.Msg, 10),
+ }
+ q := newTestQueue(t, &dt)
+ defer cleanQueue(t, q)
+
+ // This is the most tricky test because it is racy and I have no idea what can be done to avoid it.
+ // It relies on us calling Close before queue dispatcher decides to retry delivery.
+ // Hence retry delay is increased from 0ms used in other tests to make it reliable.
+ q.initialRetryTime = 1 * time.Second
+
+ // To make sure we will not time out due to post-init delay.
+ q.postInitDelay = 0
+
+ deliveryID := testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"})
+
+ // Standard partial delivery, retry will be scheduled for tester1@example.org.
+ msg := readMsgChanTimeout(t, dt.committed, 5*time.Second)
+ testutils.CheckMsg(t, msg, "tester@example.com", []string{"tester2@example.org"})
+
+ q.Close()
+
+ if err := os.Remove(filepath.Join(q.location, deliveryID+fileSuffix)); err != nil {
+ t.Fatal(err)
+ }
+
+ // Dangling files should be removed during load.
+ q = newTestQueueDir(t, &dt, q.location)
+ q.Close()
+
+ // Nothing should be left. 
+ checkQueueDir(t, q, []string{})
+ }
+
+ t.Run("NoMeta", func(t *testing.T) {
+ t.Skip("Not implemented")
+ test(t, ".meta")
+ })
+ t.Run("NoBody", func(t *testing.T) {
+ test(t, ".body")
+ })
+ t.Run("NoHeader", func(t *testing.T) {
+ test(t, ".header")
+ })
+}
diff --git a/target/queue/timewheel.go b/target/queue/timewheel.go
new file mode 100644
index 0000000..8440b52
--- /dev/null
+++ b/target/queue/timewheel.go
@@ -0,0 +1,115 @@
+package queue
+
+import (
+ "container/list"
+ "sync"
+ "time"
+)
+
+// TimeSlot is a single scheduled entry: an opaque Value to be dispatched at Time.
+type TimeSlot struct {
+ Time time.Time
+ Value interface{}
+}
+
+// TimeWheel dispatches stored values once their target time is reached.
+type TimeWheel struct {
+ slots *list.List // list of TimeSlot; unordered, tick() scans for the earliest
+ slotsLock sync.Mutex // guards slots
+
+ updateNotify chan time.Time // wakes tick() after Add so it can reconsider the closest slot
+ stopNotify chan struct{} // Close/tick handshake channel
+
+ dispatch chan TimeSlot
+}
+
+func NewTimeWheel() *TimeWheel {
+ tw := &TimeWheel{
+ slots: list.New(),
+ stopNotify: make(chan struct{}),
+ updateNotify: make(chan time.Time),
+ dispatch: make(chan TimeSlot, 10),
+ }
+ go tw.tick()
+ return tw
+}
+
+// Add schedules value to be sent on Dispatch() at (or after) target. value must not be nil.
+func (tw *TimeWheel) Add(target time.Time, value interface{}) {
+ if value == nil {
+ panic("can't insert nil objects into TimeWheel queue")
+ }
+
+ tw.slotsLock.Lock()
+ tw.slots.PushBack(TimeSlot{Time: target, Value: value})
+ tw.slotsLock.Unlock()
+
+ tw.updateNotify <- target // blocks until tick() receives; must not be called after Close
+}
+
+// Close stops the tick goroutine and closes the channels. Add must not be called afterwards.
+func (tw *TimeWheel) Close() {
+ tw.stopNotify <- struct{}{} // ask tick() to exit
+ <-tw.stopNotify // wait for tick() to confirm it stopped
+
+ close(tw.updateNotify)
+ close(tw.dispatch)
+}
+
+func (tw *TimeWheel) tick() {
+ for {
+ now := time.Now()
+ // Find the slot with the earliest target time (linear scan over the unordered list).
+ tw.slotsLock.Lock()
+ var closestSlot TimeSlot
+ var closestEl *list.Element
+ for e := tw.slots.Front(); e != nil; e = e.Next() {
+ slot := e.Value.(TimeSlot)
+ if slot.Time.Sub(now) < closestSlot.Time.Sub(now) || closestSlot.Value == nil {
+ closestSlot = slot
+ closestEl = e
+ }
+ }
+ tw.slotsLock.Unlock()
+ // Only this goroutine removes elements from the list, so closestEl/closestSlot stay valid after unlocking.
+
+ // Queue is empty. Just wait until update. 
+ if closestEl == nil {
+ select {
+ case <-tw.updateNotify:
+ continue
+ case <-tw.stopNotify:
+ tw.stopNotify <- struct{}{}
+ return
+ }
+ }
+
+ timer := time.NewTimer(closestSlot.Time.Sub(now))
+
+ for {
+ select {
+ case <-timer.C:
+ tw.slotsLock.Lock()
+ tw.slots.Remove(closestEl)
+ tw.slotsLock.Unlock()
+ tw.dispatch <- closestSlot
+
+ // break inside of select exits select, not for loop
+ goto breakinnerloop
+ case newTarget := <-tw.updateNotify:
+ // Avoid unnecessary restarts if new target is not going to affect our
+ // current wait time.
+ if closestSlot.Time.Sub(now) <= newTarget.Sub(now) { // both sides use the same now, so this compares target times
+ continue
+ }
+
+ timer.Stop()
+ goto breakinnerloop // new target is earlier: re-scan the list; re-entering select with a stopped timer would hang
+ case <-tw.stopNotify:
+ tw.stopNotify <- struct{}{}
+ return
+ }
+ }
+ breakinnerloop:
+ }
+}
+
+// Dispatch returns the channel on which due TimeSlots are delivered.
+func (tw *TimeWheel) Dispatch() <-chan TimeSlot {
+ return tw.dispatch
+}