maddy/internal/endpoint/smtp/session.go
fox.cpp 9b279735e8
Address several TODO comments
Several comments were removed since they are not worth the trouble.
A few minor issues were addressed.
Most of remaining comments got corresponding GitHub issues assigned.
2020-03-05 02:26:30 +03:00

505 lines
12 KiB
Go

package smtp
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"net"
"runtime/trace"
"strings"
"sync"
"github.com/emersion/go-message/textproto"
"github.com/emersion/go-smtp"
"github.com/foxcpp/maddy/internal/address"
"github.com/foxcpp/maddy/internal/buffer"
"github.com/foxcpp/maddy/internal/dns"
"github.com/foxcpp/maddy/internal/exterrors"
"github.com/foxcpp/maddy/internal/log"
"github.com/foxcpp/maddy/internal/module"
)
type Session struct {
endp *Endpoint
// Specific for this session.
// sessionCtx is not used for cancellation or timeouts, only for tracing.
sessionCtx context.Context
cancelRDNS func()
connState module.ConnState
repeatedMailErrs int
loggedRcptErrors int
// Specific for the currently handled message.
// msgCtx is not used for cancellation or timeouts, only for tracing.
// It is the subcontext of sessionCtx.
// Mutex is used to prevent Close from accessing inconsistent state when it
// is called asynchronously to any SMTP command.
msgLock sync.Mutex
msgCtx context.Context
msgTask *trace.Task
mailFrom string
opts smtp.MailOptions
msgMeta *module.MsgMetadata
delivery module.Delivery
deliveryErr error
log log.Logger
}
func (s *Session) Reset() {
s.msgLock.Lock()
defer s.msgLock.Unlock()
if s.delivery != nil {
s.abort(s.msgCtx)
}
s.endp.Log.DebugMsg("reset")
}
func (s *Session) releaseLimits() {
_, domain, err := address.Split(s.mailFrom)
if err != nil {
return
}
addr, ok := s.msgMeta.Conn.RemoteAddr.(*net.TCPAddr)
if !ok {
addr = &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1)}
}
s.endp.limits.ReleaseMsg(addr.IP, domain)
}
func (s *Session) abort(ctx context.Context) {
if err := s.delivery.Abort(ctx); err != nil {
s.endp.Log.Error("delivery abort failed", err)
}
s.log.Msg("aborted", "msg_id", s.msgMeta.ID)
s.mailFrom = ""
s.opts = smtp.MailOptions{}
s.msgMeta = nil
s.delivery = nil
s.deliveryErr = nil
s.msgCtx = nil
s.msgTask.End()
}
func (s *Session) startDelivery(ctx context.Context, from string, opts smtp.MailOptions) (string, error) {
var err error
msgMeta := &module.MsgMetadata{
Conn: &s.connState,
SMTPOpts: opts,
}
// INTERNATIONALIZATION: Do not permit non-ASCII addresses unless SMTPUTF8 is
// used.
for _, ch := range from {
if ch > 128 && !opts.UTF8 {
return "", &exterrors.SMTPError{
Code: 550,
EnhancedCode: exterrors.EnhancedCode{5, 6, 7},
Message: "SMTPUTF8 is required for non-ASCII senders",
}
}
}
// Decode punycode, normalize to NFC and case-fold address.
cleanFrom, err := address.CleanDomain(from)
if err != nil {
return "", &exterrors.SMTPError{
Code: 553,
EnhancedCode: exterrors.EnhancedCode{5, 1, 7},
Message: "Unable to normalize the sender address",
}
}
msgMeta.ID, err = module.GenerateMsgID()
if err != nil {
return "", err
}
msgMeta.OriginalFrom = from
_, domain, err := address.Split(cleanFrom)
if err != nil {
return "", err
}
remoteIP, ok := msgMeta.Conn.RemoteAddr.(*net.TCPAddr)
if !ok {
remoteIP = &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1)}
}
if err := s.endp.limits.TakeMsg(context.Background(), remoteIP.IP, domain); err != nil {
return "", err
}
if s.connState.AuthUser != "" {
s.log.Msg("incoming message",
"src_host", msgMeta.Conn.Hostname,
"src_ip", msgMeta.Conn.RemoteAddr.String(),
"sender", from,
"msg_id", msgMeta.ID,
"username", s.connState.AuthUser,
)
} else {
s.log.Msg("incoming message",
"src_host", msgMeta.Conn.Hostname,
"src_ip", msgMeta.Conn.RemoteAddr.String(),
"sender", from,
"msg_id", msgMeta.ID,
)
}
s.msgCtx, s.msgTask = trace.NewTask(ctx, "Incoming Message")
mailCtx, mailTask := trace.NewTask(s.msgCtx, "MAIL FROM")
defer mailTask.End()
delivery, err := s.endp.pipeline.Start(mailCtx, msgMeta, cleanFrom)
if err != nil {
s.msgCtx = nil
s.msgTask.End()
return msgMeta.ID, err
}
s.msgMeta = msgMeta
s.mailFrom = cleanFrom
s.delivery = delivery
return msgMeta.ID, nil
}
func (s *Session) Mail(from string, opts smtp.MailOptions) error {
s.msgLock.Lock()
defer s.msgLock.Unlock()
if !s.endp.deferServerReject {
// Will initialize s.msgCtx.
msgID, err := s.startDelivery(s.sessionCtx, from, opts)
if err != nil {
if err != context.DeadlineExceeded {
s.log.Error("MAIL FROM error", err, "msg_id", msgID)
}
return s.endp.wrapErr(msgID, !opts.UTF8, err)
}
}
// Keep the MAIL FROM argument for deferred startDelivery.
s.mailFrom = from
s.opts = opts
return nil
}
func (s *Session) fetchRDNSName(ctx context.Context) {
defer trace.StartRegion(ctx, "rDNS fetch").End()
tcpAddr, ok := s.connState.RemoteAddr.(*net.TCPAddr)
if !ok {
s.connState.RDNSName.Set(nil, nil)
return
}
name, err := dns.LookupAddr(ctx, s.endp.resolver, tcpAddr.IP)
if err != nil {
dnsErr, ok := err.(*net.DNSError)
if ok && dnsErr.IsNotFound {
s.connState.RDNSName.Set(nil, nil)
return
}
reason, misc := exterrors.UnwrapDNSErr(err)
misc["reason"] = reason
s.log.Error("rDNS error", exterrors.WithFields(err, misc), "src_ip", s.connState.RemoteAddr)
s.connState.RDNSName.Set(nil, err)
return
}
s.connState.RDNSName.Set(name, nil)
}
func (s *Session) Rcpt(to string) error {
s.msgLock.Lock()
defer s.msgLock.Unlock()
// deferServerReject = true and this is the first RCPT TO command.
if s.delivery == nil {
// If we already attempted to initialize the delivery -
// fail again.
if s.deliveryErr != nil {
s.repeatedMailErrs++
// The deliveryErr is already wrapped.
return s.deliveryErr
}
// It will initialize s.msgCtx.
msgID, err := s.startDelivery(s.sessionCtx, s.mailFrom, s.opts)
if err != nil {
if err != context.DeadlineExceeded {
s.log.Error("MAIL FROM error (deferred)", err, "rcpt", to, "msg_id", msgID)
}
s.deliveryErr = s.endp.wrapErr(msgID, !s.opts.UTF8, err)
return s.deliveryErr
}
}
rcptCtx, rcptTask := trace.NewTask(s.msgCtx, "RCPT TO")
defer rcptTask.End()
if err := s.rcpt(rcptCtx, to); err != nil {
if s.loggedRcptErrors < s.endp.maxLoggedRcptErrors {
s.log.Error("RCPT error", err, "rcpt", to)
s.loggedRcptErrors++
if s.loggedRcptErrors == s.endp.maxLoggedRcptErrors {
s.log.Msg("too many RCPT errors, possible dictonary attack", "src_ip", s.connState.RemoteAddr, "msg_id", s.msgMeta.ID)
}
}
return s.endp.wrapErr(s.msgMeta.ID, !s.opts.UTF8, err)
}
s.endp.Log.Msg("RCPT ok", "rcpt", to, "msg_id", s.msgMeta.ID)
return nil
}
func (s *Session) rcpt(ctx context.Context, to string) error {
// INTERNATIONALIZATION: Do not permit non-ASCII addresses unless SMTPUTF8 is
// used.
if !address.IsASCII(to) && !s.opts.UTF8 {
return &exterrors.SMTPError{
Code: 553,
EnhancedCode: exterrors.EnhancedCode{5, 6, 7},
Message: "SMTPUTF8 is required for non-ASCII recipients",
}
}
cleanTo, err := address.CleanDomain(to)
if err != nil {
return &exterrors.SMTPError{
Code: 501,
EnhancedCode: exterrors.EnhancedCode{5, 1, 2},
Message: "Unable to normalize the recipient address",
}
}
return s.delivery.AddRcpt(ctx, cleanTo)
}
func (s *Session) Logout() error {
s.msgLock.Lock()
defer s.msgLock.Unlock()
if s.delivery != nil {
s.abort(s.msgCtx)
if s.repeatedMailErrs > s.endp.maxLoggedRcptErrors {
s.log.Msg("MAIL FROM repeated error a lot of times, possible dictonary attack", "count", s.repeatedMailErrs, "src_ip", s.connState.RemoteAddr)
}
}
if s.cancelRDNS != nil {
s.cancelRDNS()
}
return nil
}
func (s *Session) prepareBody(ctx context.Context, r io.Reader) (textproto.Header, buffer.Buffer, error) {
bufr := bufio.NewReader(r)
header, err := textproto.ReadHeader(bufr)
if err != nil {
return textproto.Header{}, nil, err
}
if s.endp.submission {
// The MsgMetadata is passed by pointer all the way down.
if err := s.submissionPrepare(s.msgMeta, &header); err != nil {
return textproto.Header{}, nil, err
}
}
buf, err := s.endp.buffer(bufr)
if err != nil {
return textproto.Header{}, nil, err
}
return header, buf, nil
}
func (s *Session) Data(r io.Reader) error {
bodyCtx, bodyTask := trace.NewTask(s.msgCtx, "DATA")
defer bodyTask.End()
wrapErr := func(err error) error {
s.log.Error("DATA error", err, "msg_id", s.msgMeta.ID)
return s.endp.wrapErr(s.msgMeta.ID, !s.opts.UTF8, err)
}
header, buf, err := s.prepareBody(bodyCtx, r)
if err != nil {
return wrapErr(err)
}
defer func() {
if err := buf.Remove(); err != nil {
s.log.Error("failed to remove buffered body", err)
}
// go-smtp will call Reset, but it will call Abort if delivery is non-nil.
s.delivery = nil
s.msgCtx = nil
s.msgTask.End()
s.msgTask = nil
s.releaseLimits()
}()
if err := s.checkRoutingLoops(header); err != nil {
return wrapErr(err)
}
if err := s.delivery.Body(bodyCtx, header, buf); err != nil {
return wrapErr(err)
}
if err := s.delivery.Commit(bodyCtx); err != nil {
return wrapErr(err)
}
s.log.Msg("accepted", "msg_id", s.msgMeta.ID)
return nil
}
type statusWrapper struct {
sc smtp.StatusCollector
s *Session
}
func (sw statusWrapper) SetStatus(rcpt string, err error) {
sw.sc.SetStatus(rcpt, sw.s.endp.wrapErr(sw.s.msgMeta.ID, !sw.s.opts.UTF8, err))
}
func (s *Session) LMTPData(r io.Reader, sc smtp.StatusCollector) error {
bodyCtx, bodyTask := trace.NewTask(s.msgCtx, "DATA")
defer bodyTask.End()
wrapErr := func(err error) error {
s.log.Error("DATA error", err, "msg_id", s.msgMeta.ID)
return s.endp.wrapErr(s.msgMeta.ID, !s.opts.UTF8, err)
}
header, buf, err := s.prepareBody(bodyCtx, r)
if err != nil {
return wrapErr(err)
}
defer func() {
if err := buf.Remove(); err != nil {
s.log.Error("failed to remove buffered body", err)
}
// go-smtp will call Reset, but it will call Abort if delivery is non-nil.
s.delivery = nil
s.msgCtx = nil
s.msgTask.End()
s.msgTask = nil
s.releaseLimits()
}()
if err := s.checkRoutingLoops(header); err != nil {
return wrapErr(err)
}
s.delivery.(module.PartialDelivery).BodyNonAtomic(bodyCtx, statusWrapper{sc, s}, header, buf)
// We can't really tell whether it is failed completely or succeeded
// so always commit. Should be harmless, anyway.
if err := s.delivery.Commit(bodyCtx); err != nil {
return wrapErr(err)
}
s.log.Msg("accepted", "msg_id", s.msgMeta.ID)
return nil
}
func (s *Session) checkRoutingLoops(header textproto.Header) error {
// RFC 5321 Section 6.3:
// >Simple counting of the number of "Received:" header fields in a
// >message has proven to be an effective, although rarely optimal,
// >method of detecting loops in mail systems.
receivedCount := 0
for f := header.FieldsByKey("Received"); f.Next(); {
receivedCount++
}
if receivedCount > s.endp.maxReceived {
return &exterrors.SMTPError{
Code: 554,
EnhancedCode: exterrors.EnhancedCode{5, 4, 6},
Message: fmt.Sprintf("Too many Received header fields (%d), possible forwarding loop", receivedCount),
}
}
return nil
}
func (endp *Endpoint) wrapErr(msgId string, mangleUTF8 bool, err error) error {
if err == nil {
return nil
}
if errors.Is(err, context.DeadlineExceeded) {
return &smtp.SMTPError{
Code: 451,
EnhancedCode: smtp.EnhancedCode{4, 4, 5},
Message: "High load, try again later",
}
}
res := &smtp.SMTPError{
Code: 554,
EnhancedCode: smtp.EnhancedCodeNotSet,
// Err on the side of caution if the error lacks SMTP annotations. If
// we just pass the error text through, we might accidenetally disclose
// details of server configuration.
Message: "Internal server error",
}
if exterrors.IsTemporary(err) {
res.Code = 451
}
ctxInfo := exterrors.Fields(err)
ctxCode, ok := ctxInfo["smtp_code"].(int)
if ok {
res.Code = ctxCode
}
ctxEnchCode, ok := ctxInfo["smtp_enchcode"].(exterrors.EnhancedCode)
if ok {
res.EnhancedCode = smtp.EnhancedCode(ctxEnchCode)
}
ctxMsg, ok := ctxInfo["smtp_msg"].(string)
if ok {
res.Message = ctxMsg
}
if smtpErr, ok := err.(*smtp.SMTPError); ok {
endp.Log.Printf("plain SMTP error returned, this is deprecated")
res.Code = smtpErr.Code
res.EnhancedCode = smtpErr.EnhancedCode
res.Message = smtpErr.Message
}
if msgId != "" {
res.Message += " (msg ID = " + msgId + ")"
}
// INTERNATIONALIZATION: See RFC 6531 Section 3.7.4.1.
if mangleUTF8 {
b := strings.Builder{}
b.Grow(len(res.Message))
for _, ch := range res.Message {
if ch > 128 {
b.WriteRune('?')
} else {
b.WriteRune(ch)
}
}
res.Message = b.String()
}
return res
}