From 8635a11293dd20909c59cbf3dca76d7fe54ed5eb Mon Sep 17 00:00:00 2001 From: "fox.cpp" Date: Sun, 16 Feb 2020 16:28:39 +0300 Subject: [PATCH] target/queue: Rework error handling to track retries per-recipient While was not strictly needed before, it is necessary in the presence of limits and other internal target errors that should cause the delivery to get rescheduled without increasing retries counter. --- internal/target/queue/queue.go | 131 +++++++++++++++------------- internal/target/queue/queue_test.go | 36 ++++---- 2 files changed, 88 insertions(+), 79 deletions(-) diff --git a/internal/target/queue/queue.go b/internal/target/queue/queue.go index fbf1389..f1c7cdd 100644 --- a/internal/target/queue/queue.go +++ b/internal/target/queue/queue.go @@ -9,11 +9,11 @@ Interfaces implemented: Implementation summary follows. All scheduled deliveries are attempted to the configured DeliveryTarget. -All metadata is preserved on disk so +All metadata is preserved on disk. Failure status is determined on per-recipient basis: - Delivery.Start fail handled as a failure for all recipients. -- Delivery.AddRcpt fail handled as a failure for corresponding recipient. +- Delivery.AddRcpt fail handled as a failure for the corresponding recipient. - Delivery.Body fail handled as a failure for all recipients. - If Delivery implements PartialDelivery, then PartialDelivery.BodyNonAtomic is used instead. Failures are determined based @@ -71,12 +71,8 @@ import ( // partialError describes state of partially successful message delivery. type partialError struct { - // Recipients for which delivery permanently failed. - Failed []string - // Recipients for which delivery temporary failed. - TemporaryFailed []string - // Underlying error objects. + // Underlying error objects for each recipient. 
Errs map[string]error // Fields can be accessed without holding this lock, but only after @@ -93,16 +89,11 @@ func (pe *partialError) SetStatus(rcptTo string, err error) { } pe.statusLock.Lock() defer pe.statusLock.Unlock() - if exterrors.IsTemporaryOrUnspec(err) { - pe.TemporaryFailed = append(pe.TemporaryFailed, rcptTo) - } else { - pe.Failed = append(pe.Failed, rcptTo) - } pe.Errs[rcptTo] = err } func (pe partialError) Error() string { - return fmt.Sprintf("delivery failed for some recipients (permanently: %v, temporary: %v): %v", pe.Failed, pe.TemporaryFailed, pe.Errs) + return fmt.Sprintf("delivery failed for some recipients: %v", pe.Errs) } // dontRecover controls the behavior of panic handlers, if it is set to true - @@ -163,7 +154,7 @@ type QueueMetadata struct { RcptErrs map[string]*smtp.SMTPError // Amount of times delivery *already tried*. - TriesCount int + TriesCount map[string]int FirstAttempt time.Time LastAttempt time.Time @@ -364,54 +355,74 @@ func toSMTPErr(err error) *smtp.SMTPError { func (q *Queue) tryDelivery(meta *QueueMetadata, header textproto.Header, body buffer.Buffer) { dl := target.DeliveryLogger(q.Log, meta.MsgMeta) - dl.Debugf("delivery attempt #%d", meta.TriesCount+1) partialErr := q.deliver(meta, header, body) - dl.Debugf("failures: permanently: %v, temporary: %v, errors: %v", - partialErr.Failed, partialErr.TemporaryFailed, partialErr.Errs) + dl.Debugf("errors: %v", partialErr.Errs) + // While iterating the list of recipients we also pick the smallest tries count + // and use it to calculate the delay for the next attempt. + smallestTriesCount := 999999 + + if meta.TriesCount == nil { + meta.TriesCount = make(map[string]int) + } + + // Check attempted recipients and corresponding errors. + // Split list into two parts: recipients that should be retried (newRcpts) + // and recipients DSN will be generated for. 
+ newRcpts := make([]string, 0, len(partialErr.Errs)) + failedRcpts := make([]string, 0, len(partialErr.Errs)) for _, rcpt := range meta.To { - if _, ok := partialErr.Errs[rcpt]; ok { + rcptErr, ok := partialErr.Errs[rcpt] + if !ok { + dl.Msg("delivered", "rcpt", rcpt, "attempt", meta.TriesCount[rcpt]) continue } - dl.Msg("delivered", "rcpt", rcpt, "attempt", meta.TriesCount+1) - } - - // Save errors information for reporting in bounce message. - meta.FailedRcpts = append(meta.FailedRcpts, partialErr.Failed...) - for rcpt, rcptErr := range partialErr.Errs { + // Save last error (either temporary or permanent) for reporting in the DSN. dl.Error("delivery attempt failed", rcptErr, "rcpt", rcpt) meta.RcptErrs[rcpt] = toSMTPErr(rcptErr) - } - meta.To = partialErr.TemporaryFailed - meta.LastAttempt = time.Now() - if meta.TriesCount == q.maxTries || len(partialErr.TemporaryFailed) == 0 { - // Attempt either fully succeeded or completely failed. - if meta.TriesCount == q.maxTries { - for _, rcpt := range meta.TemporaryFailedRcpts { - dl.Msg("not delivered, temporary error", "rcpt", rcpt) - } - } - for _, rcpt := range meta.FailedRcpts { + temporary := exterrors.IsTemporaryOrUnspec(rcptErr) + if !temporary || meta.TriesCount[rcpt] == q.maxTries { + delete(meta.TriesCount, rcpt) dl.Msg("not delivered, permanent error", "rcpt", rcpt) + failedRcpts = append(failedRcpts, rcpt) + continue } - if len(meta.FailedRcpts)+len(meta.TemporaryFailedRcpts) != 0 { - q.emitDSN(meta, header) + + // Temporary error, increase tries counter and requeue. + meta.TriesCount[rcpt]++ + newRcpts = append(newRcpts, rcpt) + + // See smallestTriesCount comment. + if count := meta.TriesCount[rcpt]; count < smallestTriesCount { + smallestTriesCount = count + } + } + + // Generate DSN for recipients that failed permanently this time. + if len(failedRcpts) != 0 { + q.emitDSN(meta, header, failedRcpts) + } + // No recipients to try, either all failed or all succeeded. 
+ if len(newRcpts) == 0 { q.removeFromDisk(meta.MsgMeta) return } - meta.TriesCount++ + meta.To = newRcpts + meta.LastAttempt = time.Now() if err := q.updateMetadataOnDisk(meta); err != nil { dl.Error("meta-data update", err) } nextTryTime := time.Now() - nextTryTime = nextTryTime.Add(q.initialRetryTime * time.Duration(math.Pow(q.retryTimeScale, float64(meta.TriesCount-1)))) + // Delay between retries grows exponentially, the formula is: + // initialRetryTime * retryTimeScale ^ (smallestTriesCount - 1) + scaleFactor := time.Duration(math.Pow(q.retryTimeScale, float64(smallestTriesCount-1))) + nextTryTime = nextTryTime.Add(q.initialRetryTime * scaleFactor) dl.Msg("will retry", "attempts_count", meta.TriesCount, "next_try_delay", time.Until(nextTryTime), @@ -436,7 +447,7 @@ func (q *Queue) deliver(meta *QueueMetadata, header textproto.Header, body buffe } msgMeta := meta.MsgMeta.DeepCopy() - msgMeta.ID = msgMeta.ID + "-" + strconv.Itoa(meta.TriesCount+1) + msgMeta.ID = msgMeta.ID + "-" + strconv.FormatInt(time.Now().Unix(), 16) dl.Debugf("using message ID = %s", msgMeta.ID) msgCtx, msgTask := trace.NewTask(context.Background(), "Queue delivery") @@ -447,7 +458,6 @@ func (q *Queue) deliver(meta *QueueMetadata, header textproto.Header, body buffe mailTask.End() if err != nil { dl.Debugf("target.Start failed: %v", err) - perr.Failed = append(perr.Failed, meta.To...) 
for _, rcpt := range meta.To { perr.Errs[rcpt] = err } @@ -459,11 +469,6 @@ func (q *Queue) deliver(meta *QueueMetadata, header textproto.Header, body buffe for _, rcpt := range meta.To { rcptCtx, rcptTask := trace.NewTask(msgCtx, "RCPT TO") if err := delivery.AddRcpt(rcptCtx, rcpt); err != nil { - if exterrors.IsTemporaryOrUnspec(err) { - perr.TemporaryFailed = append(perr.TemporaryFailed, rcpt) - } else { - perr.Failed = append(perr.Failed, rcpt) - } dl.Debugf("delivery.AddRcpt %s failed: %v", rcpt, err) perr.Errs[rcpt] = err } else { @@ -482,11 +487,6 @@ func (q *Queue) deliver(meta *QueueMetadata, header textproto.Header, body buffe } expandToPartialErr := func(err error) { - if exterrors.IsTemporaryOrUnspec(err) { - perr.TemporaryFailed = append(perr.TemporaryFailed, acceptedRcpts...) - } else { - perr.Failed = append(perr.Failed, acceptedRcpts...) - } for _, rcpt := range acceptedRcpts { perr.Errs[rcpt] = err } @@ -666,8 +666,15 @@ func (q *Queue) readDiskQueue() error { continue } + smallestTriesCount := 999999 + for _, count := range meta.TriesCount { + if smallestTriesCount > count { + smallestTriesCount = count + } + } nextTryTime := meta.LastAttempt - nextTryTime = nextTryTime.Add(q.initialRetryTime * time.Duration(math.Pow(q.retryTimeScale, float64(meta.TriesCount-1)))) + scaleFactor := time.Duration(math.Pow(q.retryTimeScale, float64(smallestTriesCount-1))) + nextTryTime = nextTryTime.Add(q.initialRetryTime * scaleFactor) if time.Until(nextTryTime) < q.postInitDelay { nextTryTime = time.Now().Add(q.postInitDelay) @@ -846,7 +853,7 @@ func (q *Queue) Name() string { return "queue" } -func (q *Queue) emitDSN(meta *QueueMetadata, header textproto.Header) { +func (q *Queue) emitDSN(meta *QueueMetadata, header textproto.Header, failedRcpts []string) { // If, apparently, we have no DSN msgpipeline configured - do nothing. 
if q.dsnPipeline == nil { return @@ -880,19 +887,21 @@ func (q *Queue) emitDSN(meta *QueueMetadata, header textproto.Header) { } rcptInfo := make([]dsn.RecipientInfo, 0, len(meta.RcptErrs)) - for rcpt, err := range meta.RcptErrs { - if meta.MsgMeta.OriginalRcpts != nil { - originalRcpt := meta.MsgMeta.OriginalRcpts[rcpt] - if originalRcpt != "" { - rcpt = originalRcpt - } + for _, rcpt := range failedRcpts { + rcptErr := meta.RcptErrs[rcpt] + // rcptErr is stored in RcptErrs using the effective recipient address, + // not the original one. + + originalRcpt := meta.MsgMeta.OriginalRcpts[rcpt] + if originalRcpt != "" { + rcpt = originalRcpt } rcptInfo = append(rcptInfo, dsn.RecipientInfo{ FinalRecipient: rcpt, Action: dsn.ActionFailed, - Status: err.EnhancedCode, - DiagnosticCode: err, + Status: rcptErr.EnhancedCode, + DiagnosticCode: rcptErr, }) } diff --git a/internal/target/queue/queue_test.go b/internal/target/queue/queue_test.go index 1ff9233..e97dfc9 100644 --- a/internal/target/queue/queue_test.go +++ b/internal/target/queue/queue_test.go @@ -229,7 +229,7 @@ func TestQueueDelivery(t *testing.T) { q := newTestQueue(t, &dt) defer cleanQueue(t, q) - encID := testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}) + testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}) // This is far from being a proper blackbox testing. // But I can't come up with a better way to inspect the Queue state. @@ -241,7 +241,7 @@ func TestQueueDelivery(t *testing.T) { msg := readMsgChanTimeout(t, dt.committed, 5*time.Second) q.Close() - testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}, encID+"-1") + testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}, "") // There should be no queued messages. 
checkQueueDir(t, q, []string{}) @@ -307,14 +307,14 @@ func TestQueueDelivery_TemporaryFail(t *testing.T) { q := newTestQueue(t, &dt) defer cleanQueue(t, q) - encID := testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}) + testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}) // Delivery should be aborted, because it failed for all recipients. readMsgChanTimeout(t, dt.aborted, 5*time.Second) // Second retry, should work fine. msg := readMsgChanTimeout(t, dt.committed, 5*time.Second) - testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}, encID+"-2") + testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}, "") q.Close() // No more retries scheduled, queue storage is clear. @@ -336,18 +336,18 @@ func TestQueueDelivery_TemporaryFail_Partial(t *testing.T) { q := newTestQueue(t, &dt) defer cleanQueue(t, q) - encID := testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}) + testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}) // Committed, tester1@example.org - ok. msg := readMsgChanTimeout(t, dt.committed, 5000*time.Second) // Side note: unreliableTarget adds recipients to the msg object even if they were rejected // later using a partial error. So slice below is all recipients that were submitted by // the queue. 
- testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}, encID+"-1") + testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}, "") // committed #2, tester2@example.org - ok msg = readMsgChanTimeout(t, dt.committed, 5000*time.Second) - testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, encID+"-2") + testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, "") q.Close() // No more retries scheduled, queue storage is clear. @@ -373,13 +373,13 @@ func TestQueueDelivery_MultipleAttempts(t *testing.T) { q := newTestQueue(t, &dt) defer cleanQueue(t, q) - encID := testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org", "tester3@example.org"}) + testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org", "tester3@example.org"}) // Committed because delivery to tester3@example.org is succeeded. msg := readMsgChanTimeout(t, dt.committed, 5*time.Second) // Side note: This slice contains all recipients submitted by the queue, even if // they were rejected later using partialError. - testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org", "tester2@example.org", "tester3@example.org"}, encID+"-1") + testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org", "tester2@example.org", "tester3@example.org"}, "") // tester1 is failed permanently, should not be retried. // tester2 is failed temporary, should be retried. @@ -387,7 +387,7 @@ func TestQueueDelivery_MultipleAttempts(t *testing.T) { // Third attempt... tester2 delivered. 
msg = readMsgChanTimeout(t, dt.committed, 5*time.Second) - testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, encID+"-3") + testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, "") q.Close() // No more retries should be scheduled. @@ -408,11 +408,11 @@ func TestQueueDelivery_PermanentRcptReject(t *testing.T) { q := newTestQueue(t, &dt) defer cleanQueue(t, q) - encID := testutils.DoTestDelivery(t, q, "tester@example.org", []string{"tester1@example.org", "tester2@example.org"}) + testutils.DoTestDelivery(t, q, "tester@example.org", []string{"tester1@example.org", "tester2@example.org"}) // Committed, tester2@example.org succeeded. msg := readMsgChanTimeout(t, dt.committed, 5*time.Second) - testutils.CheckMsgID(t, msg, "tester@example.org", []string{"tester2@example.org"}, encID+"-1") + testutils.CheckMsgID(t, msg, "tester@example.org", []string{"tester2@example.org"}, "") q.Close() // No more retries should be scheduled. @@ -438,15 +438,15 @@ func TestQueueDelivery_TemporaryRcptReject(t *testing.T) { // tester2 - ok // Second attempt: // tester1 - ok - encID := testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}) + testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}) msg := readMsgChanTimeout(t, dt.committed, 5*time.Second) // Unlike previous tests where unreliableTarget rejected recipients by partialError, here they are rejected // by AddRcpt directly, so they are NOT saved by the target. 
- testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, encID+"-1") + testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, "") msg = readMsgChanTimeout(t, dt.committed, 5*time.Second) - testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org"}, encID+"-2") + testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org"}, "") q.Close() // No more retries should be scheduled. @@ -479,7 +479,7 @@ func TestQueueDelivery_SerializationRoundtrip(t *testing.T) { // Standard partial delivery, retry will be scheduled for tester1@example.org. msg := readMsgChanTimeout(t, dt.committed, 5*time.Second) - testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, deliveryID+"-1") + testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, "") // Then stop it. q.Close() @@ -492,7 +492,7 @@ func TestQueueDelivery_SerializationRoundtrip(t *testing.T) { // Wait for retry and check it. msg = readMsgChanTimeout(t, dt.committed, 5*time.Second) - testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org"}, deliveryID+"-2") + testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org"}, "") // Close it again. q.Close() @@ -527,7 +527,7 @@ func TestQueueDelivery_DeserlizationCleanUp(t *testing.T) { // Standard partial delivery, retry will be scheduled for tester1@example.org. msg := readMsgChanTimeout(t, dt.committed, 5*time.Second) - testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, deliveryID+"-1") + testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, "") q.Close()