diff --git a/internal/target/queue/queue.go b/internal/target/queue/queue.go index fbf1389..f1c7cdd 100644 --- a/internal/target/queue/queue.go +++ b/internal/target/queue/queue.go @@ -9,11 +9,11 @@ Interfaces implemented: Implementation summary follows. All scheduled deliveries are attempted to the configured DeliveryTarget. -All metadata is preserved on disk so +All metadata is preserved on disk. Failure status is determined on per-recipient basis: - Delivery.Start fail handled as a failure for all recipients. -- Delivery.AddRcpt fail handled as a failure for corresponding recipient. +- Delivery.AddRcpt fail handled as a failure for the corresponding recipient. - Delivery.Body fail handled as a failure for all recipients. - If Delivery implements PartialDelivery, then PartialDelivery.BodyNonAtomic is used instead. Failures are determined based @@ -71,12 +71,8 @@ import ( // partialError describes state of partially successful message delivery. type partialError struct { - // Recipients for which delivery permanently failed. - Failed []string - // Recipients for which delivery temporary failed. - TemporaryFailed []string - // Underlying error objects. + // Underlying error objects for each recipient. Errs map[string]error // Fields can be accessed without holding this lock, but only after @@ -93,16 +89,11 @@ func (pe *partialError) SetStatus(rcptTo string, err error) { } pe.statusLock.Lock() defer pe.statusLock.Unlock() - if exterrors.IsTemporaryOrUnspec(err) { - pe.TemporaryFailed = append(pe.TemporaryFailed, rcptTo) - } else { - pe.Failed = append(pe.Failed, rcptTo) - } pe.Errs[rcptTo] = err } func (pe partialError) Error() string { - return fmt.Sprintf("delivery failed for some recipients (permanently: %v, temporary: %v): %v", pe.Failed, pe.TemporaryFailed, pe.Errs) + return fmt.Sprintf("delivery failed for some recipients: %v", pe.Errs) } // dontRecover controls the behavior of panic handlers, if it is set to true - @@ -163,7 +154,7 @@ type QueueMetadata struct { RcptErrs map[string]*smtp.SMTPError // Amount of times delivery *already tried*. - TriesCount int + TriesCount map[string]int FirstAttempt time.Time LastAttempt time.Time @@ -364,54 +355,74 @@ func toSMTPErr(err error) *smtp.SMTPError { func (q *Queue) tryDelivery(meta *QueueMetadata, header textproto.Header, body buffer.Buffer) { dl := target.DeliveryLogger(q.Log, meta.MsgMeta) - dl.Debugf("delivery attempt #%d", meta.TriesCount+1) partialErr := q.deliver(meta, header, body) - dl.Debugf("failures: permanently: %v, temporary: %v, errors: %v", - partialErr.Failed, partialErr.TemporaryFailed, partialErr.Errs) + dl.Debugf("errors: %v", partialErr.Errs) + // While iterating the list of recipients we also pick the smallest tries count + // and use it to calculate the delay for the next attempt. + smallestTriesCount := 999999 + + if meta.TriesCount == nil { + meta.TriesCount = make(map[string]int) + } + + // Check attempted recipients and corresponding errors. + // Split list into two parts: recipients that should be retried (newRcpts) + // and recipients DSN will be generated for. + newRcpts := make([]string, 0, len(partialErr.Errs)) + failedRcpts := make([]string, 0, len(partialErr.Errs)) for _, rcpt := range meta.To { - if _, ok := partialErr.Errs[rcpt]; ok { + rcptErr, ok := partialErr.Errs[rcpt] + if !ok { + dl.Msg("delivered", "rcpt", rcpt, "attempt", meta.TriesCount[rcpt]) continue } - dl.Msg("delivered", "rcpt", rcpt, "attempt", meta.TriesCount+1) - } - - // Save errors information for reporting in bounce message. - meta.FailedRcpts = append(meta.FailedRcpts, partialErr.Failed...) - for rcpt, rcptErr := range partialErr.Errs { + // Save last error (either temporary or permanent) for reporting in the DSN. dl.Error("delivery attempt failed", rcptErr, "rcpt", rcpt) meta.RcptErrs[rcpt] = toSMTPErr(rcptErr) - } - meta.To = partialErr.TemporaryFailed - meta.LastAttempt = time.Now() - if meta.TriesCount == q.maxTries || len(partialErr.TemporaryFailed) == 0 { - // Attempt either fully succeeded or completely failed. - if meta.TriesCount == q.maxTries { - for _, rcpt := range meta.TemporaryFailedRcpts { - dl.Msg("not delivered, temporary error", "rcpt", rcpt) - } - } - for _, rcpt := range meta.FailedRcpts { + temporary := exterrors.IsTemporaryOrUnspec(rcptErr) + if !temporary || meta.TriesCount[rcpt] == q.maxTries { + delete(meta.TriesCount, rcpt) dl.Msg("not delivered, permanent error", "rcpt", rcpt) + failedRcpts = append(failedRcpts, rcpt) + continue } - if len(meta.FailedRcpts)+len(meta.TemporaryFailedRcpts) != 0 { - q.emitDSN(meta, header) + + // Temporary error, increase tries counter and requeue. + meta.TriesCount[rcpt]++ + newRcpts = append(newRcpts, rcpt) + + // See smallestTriesCount comment. + if count := meta.TriesCount[rcpt]; count > smallestTriesCount { + smallestTriesCount = count } + } + + // Generate DSN for recipients that failed permanently this time. + if len(failedRcpts) != 0 { + q.emitDSN(meta, header, failedRcpts) + } + // No recipients to try, either all failed or all succeeded. + if len(newRcpts) == 0 { q.removeFromDisk(meta.MsgMeta) return } - meta.TriesCount++ + meta.To = newRcpts + meta.LastAttempt = time.Now() if err := q.updateMetadataOnDisk(meta); err != nil { dl.Error("meta-data update", err) } nextTryTime := time.Now() - nextTryTime = nextTryTime.Add(q.initialRetryTime * time.Duration(math.Pow(q.retryTimeScale, float64(meta.TriesCount-1)))) + // Delay between retries grows exponentally, the formula is: + // initialRetryTime * retryTimeScale ^ (smallestTriesCount - 1) + scaleFactor := time.Duration(math.Pow(q.retryTimeScale, float64(smallestTriesCount-1))) + nextTryTime = nextTryTime.Add(q.initialRetryTime * scaleFactor) dl.Msg("will retry", "attempts_count", meta.TriesCount, "next_try_delay", time.Until(nextTryTime), @@ -436,7 +447,7 @@ func (q *Queue) deliver(meta *QueueMetadata, header textproto.Header, body buffe } msgMeta := meta.MsgMeta.DeepCopy() - msgMeta.ID = msgMeta.ID + "-" + strconv.Itoa(meta.TriesCount+1) + msgMeta.ID = msgMeta.ID + "-" + strconv.FormatInt(time.Now().Unix(), 16) dl.Debugf("using message ID = %s", msgMeta.ID) msgCtx, msgTask := trace.NewTask(context.Background(), "Queue delivery") @@ -447,7 +458,6 @@ func (q *Queue) deliver(meta *QueueMetadata, header textproto.Header, body buffe mailTask.End() if err != nil { dl.Debugf("target.Start failed: %v", err) - perr.Failed = append(perr.Failed, meta.To...) for _, rcpt := range meta.To { perr.Errs[rcpt] = err } @@ -459,11 +469,6 @@ func (q *Queue) deliver(meta *QueueMetadata, header textproto.Header, body buffe for _, rcpt := range meta.To { rcptCtx, rcptTask := trace.NewTask(msgCtx, "RCPT TO") if err := delivery.AddRcpt(rcptCtx, rcpt); err != nil { - if exterrors.IsTemporaryOrUnspec(err) { - perr.TemporaryFailed = append(perr.TemporaryFailed, rcpt) - } else { - perr.Failed = append(perr.Failed, rcpt) - } dl.Debugf("delivery.AddRcpt %s failed: %v", rcpt, err) perr.Errs[rcpt] = err } else { @@ -482,11 +487,6 @@ func (q *Queue) deliver(meta *QueueMetadata, header textproto.Header, body buffe } expandToPartialErr := func(err error) { - if exterrors.IsTemporaryOrUnspec(err) { - perr.TemporaryFailed = append(perr.TemporaryFailed, acceptedRcpts...) - } else { - perr.Failed = append(perr.Failed, acceptedRcpts...) - } for _, rcpt := range acceptedRcpts { perr.Errs[rcpt] = err } @@ -666,8 +666,15 @@ func (q *Queue) readDiskQueue() error { continue } + smallestTriesCount := 999999 + for _, count := range meta.TriesCount { + if smallestTriesCount > count { + smallestTriesCount = count + } + } nextTryTime := meta.LastAttempt - nextTryTime = nextTryTime.Add(q.initialRetryTime * time.Duration(math.Pow(q.retryTimeScale, float64(meta.TriesCount-1)))) + scaleFactor := time.Duration(math.Pow(q.retryTimeScale, float64(smallestTriesCount-1))) + nextTryTime = nextTryTime.Add(q.initialRetryTime * scaleFactor) if time.Until(nextTryTime) < q.postInitDelay { nextTryTime = time.Now().Add(q.postInitDelay) @@ -846,7 +853,7 @@ func (q *Queue) Name() string { return "queue" } -func (q *Queue) emitDSN(meta *QueueMetadata, header textproto.Header) { +func (q *Queue) emitDSN(meta *QueueMetadata, header textproto.Header, failedRcpts []string) { // If, apparently, we have no DSN msgpipeline configured - do nothing. if q.dsnPipeline == nil { return @@ -880,19 +887,21 @@ func (q *Queue) emitDSN(meta *QueueMetadata, header textproto.Header) { } rcptInfo := make([]dsn.RecipientInfo, 0, len(meta.RcptErrs)) - for rcpt, err := range meta.RcptErrs { - if meta.MsgMeta.OriginalRcpts != nil { - originalRcpt := meta.MsgMeta.OriginalRcpts[rcpt] - if originalRcpt != "" { - rcpt = originalRcpt - } + for _, rcpt := range failedRcpts { + rcptErr := meta.RcptErrs[rcpt] + // rcptErr is stored in RcptErrs using the effective recipient address, + // not the original one. + + originalRcpt := meta.MsgMeta.OriginalRcpts[rcpt] + if originalRcpt != "" { + rcpt = originalRcpt } rcptInfo = append(rcptInfo, dsn.RecipientInfo{ FinalRecipient: rcpt, Action: dsn.ActionFailed, - Status: err.EnhancedCode, - DiagnosticCode: err, + Status: rcptErr.EnhancedCode, + DiagnosticCode: rcptErr, }) } diff --git a/internal/target/queue/queue_test.go b/internal/target/queue/queue_test.go index 1ff9233..e97dfc9 100644 --- a/internal/target/queue/queue_test.go +++ b/internal/target/queue/queue_test.go @@ -229,7 +229,7 @@ func TestQueueDelivery(t *testing.T) { q := newTestQueue(t, &dt) defer cleanQueue(t, q) - encID := testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}) + testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}) // This is far from being a proper blackbox testing. // But I can't come up with a better way to inspect the Queue state. @@ -241,7 +241,7 @@ func TestQueueDelivery(t *testing.T) { msg := readMsgChanTimeout(t, dt.committed, 5*time.Second) q.Close() - testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}, encID+"-1") + testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}, "") // There should be no queued messages. checkQueueDir(t, q, []string{}) @@ -307,14 +307,14 @@ func TestQueueDelivery_TemporaryFail(t *testing.T) { q := newTestQueue(t, &dt) defer cleanQueue(t, q) - encID := testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}) + testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}) // Delivery should be aborted, because it failed for all recipients. readMsgChanTimeout(t, dt.aborted, 5*time.Second) // Second retry, should work fine. msg := readMsgChanTimeout(t, dt.committed, 5*time.Second) - testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}, encID+"-2") + testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}, "") q.Close() // No more retries scheduled, queue storage is clear. @@ -336,18 +336,18 @@ func TestQueueDelivery_TemporaryFail_Partial(t *testing.T) { q := newTestQueue(t, &dt) defer cleanQueue(t, q) - encID := testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}) + testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}) // Committed, tester1@example.org - ok. msg := readMsgChanTimeout(t, dt.committed, 5000*time.Second) // Side note: unreliableTarget adds recipients to the msg object even if they were rejected // later using a partial error. So slice below is all recipients that were submitted by // the queue. - testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}, encID+"-1") + testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}, "") // committed #2, tester2@example.org - ok msg = readMsgChanTimeout(t, dt.committed, 5000*time.Second) - testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, encID+"-2") + testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, "") q.Close() // No more retries scheduled, queue storage is clear. @@ -373,13 +373,13 @@ func TestQueueDelivery_MultipleAttempts(t *testing.T) { q := newTestQueue(t, &dt) defer cleanQueue(t, q) - encID := testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org", "tester3@example.org"}) + testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org", "tester3@example.org"}) // Committed because delivery to tester3@example.org is succeeded. msg := readMsgChanTimeout(t, dt.committed, 5*time.Second) // Side note: This slice contains all recipients submitted by the queue, even if // they were rejected later using partialError. - testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org", "tester2@example.org", "tester3@example.org"}, encID+"-1") + testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org", "tester2@example.org", "tester3@example.org"}, "") // tester1 is failed permanently, should not be retried. // tester2 is failed temporary, should be retried. @@ -387,7 +387,7 @@ func TestQueueDelivery_MultipleAttempts(t *testing.T) { // Third attempt... tester2 delivered. msg = readMsgChanTimeout(t, dt.committed, 5*time.Second) - testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, encID+"-3") + testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, "") q.Close() // No more retries should be scheduled. @@ -408,11 +408,11 @@ func TestQueueDelivery_PermanentRcptReject(t *testing.T) { q := newTestQueue(t, &dt) defer cleanQueue(t, q) - encID := testutils.DoTestDelivery(t, q, "tester@example.org", []string{"tester1@example.org", "tester2@example.org"}) + testutils.DoTestDelivery(t, q, "tester@example.org", []string{"tester1@example.org", "tester2@example.org"}) // Committed, tester2@example.org succeeded. msg := readMsgChanTimeout(t, dt.committed, 5*time.Second) - testutils.CheckMsgID(t, msg, "tester@example.org", []string{"tester2@example.org"}, encID+"-1") + testutils.CheckMsgID(t, msg, "tester@example.org", []string{"tester2@example.org"}, "") q.Close() // No more retries should be scheduled. @@ -438,15 +438,15 @@ func TestQueueDelivery_TemporaryRcptReject(t *testing.T) { // tester2 - ok // Second attempt: // tester1 - ok - encID := testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}) + testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}) msg := readMsgChanTimeout(t, dt.committed, 5*time.Second) // Unlike previous tests where unreliableTarget rejected recipients by partialError, here they are rejected // by AddRcpt directly, so they are NOT saved by the target. - testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, encID+"-1") + testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, "") msg = readMsgChanTimeout(t, dt.committed, 5*time.Second) - testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org"}, encID+"-2") + testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org"}, "") q.Close() // No more retries should be scheduled. @@ -479,7 +479,7 @@ func TestQueueDelivery_SerializationRoundtrip(t *testing.T) { // Standard partial delivery, retry will be scheduled for tester1@example.org. msg := readMsgChanTimeout(t, dt.committed, 5*time.Second) - testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, deliveryID+"-1") + testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, "") // Then stop it. q.Close() @@ -492,7 +492,7 @@ func TestQueueDelivery_SerializationRoundtrip(t *testing.T) { // Wait for retry and check it. msg = readMsgChanTimeout(t, dt.committed, 5*time.Second) - testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org"}, deliveryID+"-2") + testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org"}, "") // Close it again. q.Close() @@ -527,7 +527,7 @@ func TestQueueDelivery_DeserlizationCleanUp(t *testing.T) { // Standard partial delivery, retry will be scheduled for tester1@example.org. msg := readMsgChanTimeout(t, dt.committed, 5*time.Second) - testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, deliveryID+"-1") + testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, "") q.Close()