target/queue: Rework error handling to track retries per-recipient

While was not strictly needed before, it is necessary in the presence of
limits and other internal target errors that should cause the delivery
to get rescheduled without increasing retries counter.
This commit is contained in:
fox.cpp 2020-02-16 16:28:39 +03:00
parent 497384efd2
commit 8635a11293
No known key found for this signature in database
GPG key ID: E76D97CCEDE90B6C
2 changed files with 88 additions and 79 deletions

View file

@ -9,11 +9,11 @@ Interfaces implemented:
Implementation summary follows.
All scheduled deliveries are attempted to the configured DeliveryTarget.
All metadata is preserved on disk so
All metadata is preserved on disk.
Failure status is determined on per-recipient basis:
- Delivery.Start fail handled as a failure for all recipients.
- Delivery.AddRcpt fail handled as a failure for corresponding recipient.
- Delivery.AddRcpt fail handled as a failure for the corresponding recipient.
- Delivery.Body fail handled as a failure for all recipients.
- If Delivery implements PartialDelivery, then
PartialDelivery.BodyNonAtomic is used instead. Failures are determined based
@ -71,12 +71,8 @@ import (
// partialError describes state of partially successful message delivery.
type partialError struct {
// Recipients for which delivery permanently failed.
Failed []string
// Recipients for which delivery temporary failed.
TemporaryFailed []string
// Underlying error objects.
// Underlying error objects for each recipient.
Errs map[string]error
// Fields can be accessed without holding this lock, but only after
@ -93,16 +89,11 @@ func (pe *partialError) SetStatus(rcptTo string, err error) {
}
pe.statusLock.Lock()
defer pe.statusLock.Unlock()
if exterrors.IsTemporaryOrUnspec(err) {
pe.TemporaryFailed = append(pe.TemporaryFailed, rcptTo)
} else {
pe.Failed = append(pe.Failed, rcptTo)
}
pe.Errs[rcptTo] = err
}
func (pe partialError) Error() string {
return fmt.Sprintf("delivery failed for some recipients (permanently: %v, temporary: %v): %v", pe.Failed, pe.TemporaryFailed, pe.Errs)
return fmt.Sprintf("delivery failed for some recipients: %v", pe.Errs)
}
// dontRecover controls the behavior of panic handlers, if it is set to true -
@ -163,7 +154,7 @@ type QueueMetadata struct {
RcptErrs map[string]*smtp.SMTPError
// Amount of times delivery *already tried*.
TriesCount int
TriesCount map[string]int
FirstAttempt time.Time
LastAttempt time.Time
@ -364,54 +355,74 @@ func toSMTPErr(err error) *smtp.SMTPError {
func (q *Queue) tryDelivery(meta *QueueMetadata, header textproto.Header, body buffer.Buffer) {
dl := target.DeliveryLogger(q.Log, meta.MsgMeta)
dl.Debugf("delivery attempt #%d", meta.TriesCount+1)
partialErr := q.deliver(meta, header, body)
dl.Debugf("failures: permanently: %v, temporary: %v, errors: %v",
partialErr.Failed, partialErr.TemporaryFailed, partialErr.Errs)
dl.Debugf("errors: %v", partialErr.Errs)
// While iterating the list of recipients we also pick the smallest tries count
// and use it to calculate the delay for the next attempt.
smallestTriesCount := 999999
if meta.TriesCount == nil {
meta.TriesCount = make(map[string]int)
}
// Check attempted recipients and corresponding errors.
// Split list into two parts: recipients that should be retried (newRcpts)
// and recipients DSN will be generated for.
newRcpts := make([]string, 0, len(partialErr.Errs))
failedRcpts := make([]string, 0, len(partialErr.Errs))
for _, rcpt := range meta.To {
if _, ok := partialErr.Errs[rcpt]; ok {
rcptErr, ok := partialErr.Errs[rcpt]
if !ok {
dl.Msg("delivered", "rcpt", rcpt, "attempt", meta.TriesCount[rcpt])
continue
}
dl.Msg("delivered", "rcpt", rcpt, "attempt", meta.TriesCount+1)
}
// Save errors information for reporting in bounce message.
meta.FailedRcpts = append(meta.FailedRcpts, partialErr.Failed...)
for rcpt, rcptErr := range partialErr.Errs {
// Save last error (either temporary or permanent) for reporting in the DSN.
dl.Error("delivery attempt failed", rcptErr, "rcpt", rcpt)
meta.RcptErrs[rcpt] = toSMTPErr(rcptErr)
}
meta.To = partialErr.TemporaryFailed
meta.LastAttempt = time.Now()
if meta.TriesCount == q.maxTries || len(partialErr.TemporaryFailed) == 0 {
// Attempt either fully succeeded or completely failed.
if meta.TriesCount == q.maxTries {
for _, rcpt := range meta.TemporaryFailedRcpts {
dl.Msg("not delivered, temporary error", "rcpt", rcpt)
}
}
for _, rcpt := range meta.FailedRcpts {
temporary := exterrors.IsTemporaryOrUnspec(rcptErr)
if !temporary || meta.TriesCount[rcpt] == q.maxTries {
delete(meta.TriesCount, rcpt)
dl.Msg("not delivered, permanent error", "rcpt", rcpt)
failedRcpts = append(failedRcpts, rcpt)
continue
}
if len(meta.FailedRcpts)+len(meta.TemporaryFailedRcpts) != 0 {
q.emitDSN(meta, header)
// Temporary error, increase tries counter and requeue.
meta.TriesCount[rcpt]++
newRcpts = append(newRcpts, rcpt)
// See smallestTriesCount comment.
if count := meta.TriesCount[rcpt]; count > smallestTriesCount {
smallestTriesCount = count
}
}
// Generate DSN for recipients that failed permanently this time.
if len(failedRcpts) != 0 {
q.emitDSN(meta, header, failedRcpts)
}
// No recipients to try, either all failed or all succeeded.
if len(newRcpts) == 0 {
q.removeFromDisk(meta.MsgMeta)
return
}
meta.TriesCount++
meta.To = newRcpts
meta.LastAttempt = time.Now()
if err := q.updateMetadataOnDisk(meta); err != nil {
dl.Error("meta-data update", err)
}
nextTryTime := time.Now()
nextTryTime = nextTryTime.Add(q.initialRetryTime * time.Duration(math.Pow(q.retryTimeScale, float64(meta.TriesCount-1))))
// Delay between retries grows exponentally, the formula is:
// initialRetryTime * retryTimeScale ^ (smallestTriesCount - 1)
scaleFactor := time.Duration(math.Pow(q.retryTimeScale, float64(smallestTriesCount-1)))
nextTryTime = nextTryTime.Add(q.initialRetryTime * scaleFactor)
dl.Msg("will retry",
"attempts_count", meta.TriesCount,
"next_try_delay", time.Until(nextTryTime),
@ -436,7 +447,7 @@ func (q *Queue) deliver(meta *QueueMetadata, header textproto.Header, body buffe
}
msgMeta := meta.MsgMeta.DeepCopy()
msgMeta.ID = msgMeta.ID + "-" + strconv.Itoa(meta.TriesCount+1)
msgMeta.ID = msgMeta.ID + "-" + strconv.FormatInt(time.Now().Unix(), 16)
dl.Debugf("using message ID = %s", msgMeta.ID)
msgCtx, msgTask := trace.NewTask(context.Background(), "Queue delivery")
@ -447,7 +458,6 @@ func (q *Queue) deliver(meta *QueueMetadata, header textproto.Header, body buffe
mailTask.End()
if err != nil {
dl.Debugf("target.Start failed: %v", err)
perr.Failed = append(perr.Failed, meta.To...)
for _, rcpt := range meta.To {
perr.Errs[rcpt] = err
}
@ -459,11 +469,6 @@ func (q *Queue) deliver(meta *QueueMetadata, header textproto.Header, body buffe
for _, rcpt := range meta.To {
rcptCtx, rcptTask := trace.NewTask(msgCtx, "RCPT TO")
if err := delivery.AddRcpt(rcptCtx, rcpt); err != nil {
if exterrors.IsTemporaryOrUnspec(err) {
perr.TemporaryFailed = append(perr.TemporaryFailed, rcpt)
} else {
perr.Failed = append(perr.Failed, rcpt)
}
dl.Debugf("delivery.AddRcpt %s failed: %v", rcpt, err)
perr.Errs[rcpt] = err
} else {
@ -482,11 +487,6 @@ func (q *Queue) deliver(meta *QueueMetadata, header textproto.Header, body buffe
}
expandToPartialErr := func(err error) {
if exterrors.IsTemporaryOrUnspec(err) {
perr.TemporaryFailed = append(perr.TemporaryFailed, acceptedRcpts...)
} else {
perr.Failed = append(perr.Failed, acceptedRcpts...)
}
for _, rcpt := range acceptedRcpts {
perr.Errs[rcpt] = err
}
@ -666,8 +666,15 @@ func (q *Queue) readDiskQueue() error {
continue
}
smallestTriesCount := 999999
for _, count := range meta.TriesCount {
if smallestTriesCount > count {
smallestTriesCount = count
}
}
nextTryTime := meta.LastAttempt
nextTryTime = nextTryTime.Add(q.initialRetryTime * time.Duration(math.Pow(q.retryTimeScale, float64(meta.TriesCount-1))))
scaleFactor := time.Duration(math.Pow(q.retryTimeScale, float64(smallestTriesCount-1)))
nextTryTime = nextTryTime.Add(q.initialRetryTime * scaleFactor)
if time.Until(nextTryTime) < q.postInitDelay {
nextTryTime = time.Now().Add(q.postInitDelay)
@ -846,7 +853,7 @@ func (q *Queue) Name() string {
return "queue"
}
func (q *Queue) emitDSN(meta *QueueMetadata, header textproto.Header) {
func (q *Queue) emitDSN(meta *QueueMetadata, header textproto.Header, failedRcpts []string) {
// If, apparently, we have no DSN msgpipeline configured - do nothing.
if q.dsnPipeline == nil {
return
@ -880,19 +887,21 @@ func (q *Queue) emitDSN(meta *QueueMetadata, header textproto.Header) {
}
rcptInfo := make([]dsn.RecipientInfo, 0, len(meta.RcptErrs))
for rcpt, err := range meta.RcptErrs {
if meta.MsgMeta.OriginalRcpts != nil {
originalRcpt := meta.MsgMeta.OriginalRcpts[rcpt]
if originalRcpt != "" {
rcpt = originalRcpt
}
for _, rcpt := range failedRcpts {
rcptErr := meta.RcptErrs[rcpt]
// rcptErr is stored in RcptErrs using the effective recipient address,
// not the original one.
originalRcpt := meta.MsgMeta.OriginalRcpts[rcpt]
if originalRcpt != "" {
rcpt = originalRcpt
}
rcptInfo = append(rcptInfo, dsn.RecipientInfo{
FinalRecipient: rcpt,
Action: dsn.ActionFailed,
Status: err.EnhancedCode,
DiagnosticCode: err,
Status: rcptErr.EnhancedCode,
DiagnosticCode: rcptErr,
})
}

View file

@ -229,7 +229,7 @@ func TestQueueDelivery(t *testing.T) {
q := newTestQueue(t, &dt)
defer cleanQueue(t, q)
encID := testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"})
testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"})
// This is far from being a proper blackbox testing.
// But I can't come up with a better way to inspect the Queue state.
@ -241,7 +241,7 @@ func TestQueueDelivery(t *testing.T) {
msg := readMsgChanTimeout(t, dt.committed, 5*time.Second)
q.Close()
testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}, encID+"-1")
testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}, "")
// There should be no queued messages.
checkQueueDir(t, q, []string{})
@ -307,14 +307,14 @@ func TestQueueDelivery_TemporaryFail(t *testing.T) {
q := newTestQueue(t, &dt)
defer cleanQueue(t, q)
encID := testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"})
testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"})
// Delivery should be aborted, because it failed for all recipients.
readMsgChanTimeout(t, dt.aborted, 5*time.Second)
// Second retry, should work fine.
msg := readMsgChanTimeout(t, dt.committed, 5*time.Second)
testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}, encID+"-2")
testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}, "")
q.Close()
// No more retries scheduled, queue storage is clear.
@ -336,18 +336,18 @@ func TestQueueDelivery_TemporaryFail_Partial(t *testing.T) {
q := newTestQueue(t, &dt)
defer cleanQueue(t, q)
encID := testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"})
testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"})
// Committed, tester1@example.org - ok.
msg := readMsgChanTimeout(t, dt.committed, 5000*time.Second)
// Side note: unreliableTarget adds recipients to the msg object even if they were rejected
// later using a partial error. So slice below is all recipients that were submitted by
// the queue.
testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}, encID+"-1")
testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}, "")
// committed #2, tester2@example.org - ok
msg = readMsgChanTimeout(t, dt.committed, 5000*time.Second)
testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, encID+"-2")
testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, "")
q.Close()
// No more retries scheduled, queue storage is clear.
@ -373,13 +373,13 @@ func TestQueueDelivery_MultipleAttempts(t *testing.T) {
q := newTestQueue(t, &dt)
defer cleanQueue(t, q)
encID := testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org", "tester3@example.org"})
testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org", "tester3@example.org"})
// Committed because delivery to tester3@example.org is succeeded.
msg := readMsgChanTimeout(t, dt.committed, 5*time.Second)
// Side note: This slice contains all recipients submitted by the queue, even if
// they were rejected later using partialError.
testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org", "tester2@example.org", "tester3@example.org"}, encID+"-1")
testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org", "tester2@example.org", "tester3@example.org"}, "")
// tester1 is failed permanently, should not be retried.
// tester2 is failed temporary, should be retried.
@ -387,7 +387,7 @@ func TestQueueDelivery_MultipleAttempts(t *testing.T) {
// Third attempt... tester2 delivered.
msg = readMsgChanTimeout(t, dt.committed, 5*time.Second)
testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, encID+"-3")
testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, "")
q.Close()
// No more retries should be scheduled.
@ -408,11 +408,11 @@ func TestQueueDelivery_PermanentRcptReject(t *testing.T) {
q := newTestQueue(t, &dt)
defer cleanQueue(t, q)
encID := testutils.DoTestDelivery(t, q, "tester@example.org", []string{"tester1@example.org", "tester2@example.org"})
testutils.DoTestDelivery(t, q, "tester@example.org", []string{"tester1@example.org", "tester2@example.org"})
// Committed, tester2@example.org succeeded.
msg := readMsgChanTimeout(t, dt.committed, 5*time.Second)
testutils.CheckMsgID(t, msg, "tester@example.org", []string{"tester2@example.org"}, encID+"-1")
testutils.CheckMsgID(t, msg, "tester@example.org", []string{"tester2@example.org"}, "")
q.Close()
// No more retries should be scheduled.
@ -438,15 +438,15 @@ func TestQueueDelivery_TemporaryRcptReject(t *testing.T) {
// tester2 - ok
// Second attempt:
// tester1 - ok
encID := testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"})
testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"})
msg := readMsgChanTimeout(t, dt.committed, 5*time.Second)
// Unlike previous tests where unreliableTarget rejected recipients by partialError, here they are rejected
// by AddRcpt directly, so they are NOT saved by the target.
testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, encID+"-1")
testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, "")
msg = readMsgChanTimeout(t, dt.committed, 5*time.Second)
testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org"}, encID+"-2")
testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org"}, "")
q.Close()
// No more retries should be scheduled.
@ -479,7 +479,7 @@ func TestQueueDelivery_SerializationRoundtrip(t *testing.T) {
// Standard partial delivery, retry will be scheduled for tester1@example.org.
msg := readMsgChanTimeout(t, dt.committed, 5*time.Second)
testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, deliveryID+"-1")
testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, "")
// Then stop it.
q.Close()
@ -492,7 +492,7 @@ func TestQueueDelivery_SerializationRoundtrip(t *testing.T) {
// Wait for retry and check it.
msg = readMsgChanTimeout(t, dt.committed, 5*time.Second)
testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org"}, deliveryID+"-2")
testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org"}, "")
// Close it again.
q.Close()
@ -527,7 +527,7 @@ func TestQueueDelivery_DeserlizationCleanUp(t *testing.T) {
// Standard partial delivery, retry will be scheduled for tester1@example.org.
msg := readMsgChanTimeout(t, dt.committed, 5*time.Second)
testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, deliveryID+"-1")
testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, "")
q.Close()