diff --git a/docs/man/maddy-smtp.5.scd b/docs/man/maddy-smtp.5.scd index b242a69..78ae6c9 100644 --- a/docs/man/maddy-smtp.5.scd +++ b/docs/man/maddy-smtp.5.scd @@ -21,7 +21,10 @@ smtp tcp://0.0.0.0:25 { auth pam defer_sender_reject yes dmarc yes - ratelimit 0 0s + limits { + endpoint rate 10 + endpoint concurrency 500 + } # Example pipeline ocnfiguration. destination example.org { @@ -81,25 +84,6 @@ I/O read timeout. I/O write timeout. -*Syntex*: ratelimit _burst_ _interval_ ++ -*Default*: 20 1s (up to 20 messages per second) - -Restrict the inbound messages rate to at most _burst_ messages in _interval_. -Specify both values to 0 to disable the limit. - -If the message waits for more than 5 seconds, it is rejected with code 451. - -*Syntax*: concurrency _max_ ++ -*Default*: 1000 - -Restrict the amount of concurrent inbound deliveries to handle. -Specify 0 to disable restriction. - -If the message waits for more than 5 seconds, it is rejected with code 451. - -Example: ratelimit 20 1s -Limit to 20 messages per second. - *Syntax*: max_message_size _size_ ++ *Default*: 32M @@ -136,6 +120,69 @@ check module. *NOTE*: DMARC needs apply_spf and verify_dkim checks to function correctly. Without these, DMARC check will not run. +## Rate & concurrency limiting + +*Syntax*: limits _config block_ ++ +*Default*: no limits + +This allows configuring a set of message flow restrictions including +max. concurrency and rate per-endpoint, per-source, per-destination. + +Limits are specified as directives inside the block: +``` +limits { + all rate 20 + destination concurrency 5 +} +``` + +Supported limits: + +- Rate limit + +*Syntax*: _scope_ rate _burst_ _[period]_ ++ +Restrict the amount of messages processed in _period_ to _burst_ messages. +If period is not specified, 1 second is used. + +- Concurrency limit + +*Syntax*: _scope_ concurrency _max_ ++ +Restrict the amount of messages processed in parallel to _max_. + +For each supported limitation, _scope_ determines whether it should be applied +for all messages ("all"), per-sender IP ("ip") or per-recipient domain +("destination"). Having a scope other than "all" means that the restriction +will be enforced independently for each group determined by scope. E.g. +"ip rate 20" means that the same IP cannot send more than 20 messages in a +scond. "destination concurrency 5" means that no more than 5 messages can be +sent in parallel to a single domain. + +*Note*: At the moment, SMTP endpoint does not support per-recipient limits. +They will be no-op. If you want to enforce a per-recipient restriction on +outbound messages, do so using 'limits' directive for the 'remote' module (see +*maddy-targets*(5)). + +It is possible to share limit counters between multiple endpoints (or any other +modules). To do so define a top-level configuration block for module "limits" +and reference it where needed using standard & syntax. E.g. +``` +limits inbound_limits { + all rate 20 +} + +smtp smtp://0.0.0.0:25 { + limits &inbound_limits + ... +} + +submission tls://0.0.0.0:465 { + limits &inbound_limits + ... +} +``` +Using an "all rate" restriction in such way means that no more than 20 +messages can enter the server through both endpoints in one second. + # Submission module (submission) Module 'submission' implements all functionality of the 'smtp' module and adds diff --git a/docs/man/maddy-targets.5.scd b/docs/man/maddy-targets.5.scd index 1a571f5..455a057 100644 --- a/docs/man/maddy-targets.5.scd +++ b/docs/man/maddy-targets.5.scd @@ -111,17 +111,13 @@ Hostname to use client greeting (EHLO/HELO command). Some servers require it to be FQDN, SPF-capable servers check whether it corresponds to the server IP address, so it is better to set it to a domain that resolves to the server IP. -*Syntax*: min_mx_level none|mtasts|dnssec ++ -*Default*: none +*Syntax*: limits _config block_ ++ +*Default*: no limits -Minimal MX records security level to require before using remote server for -delivery. See [Security levels](../../seclevels) page for details. +See 'limits' directive in *maddy-smtp*(5) for SMTP endpoint. +It works the same except for address domains used for +per-source/per-destination are as observed when message exits the server. -*Syntax*: min_tls_level none|encrypted|authenticated -*Default*: none - -Minimal MX records security level to require before using remote server for -delivery. See [Security levels](../../seclevels) page for details. *Syntax*: debug _boolean_ ++ *Default*: global directive value @@ -130,7 +126,7 @@ Enable verbose logging. ## Security policies -*Syntax*: mx_auth _config block_ +*Syntax*: mx_auth _config block_ ++ *Default*: no policies 'remote' module implements a number of of schemes and protocols necessary to diff --git a/internal/config/limiters.go b/internal/config/limiters.go deleted file mode 100644 index 33971d2..0000000 --- a/internal/config/limiters.go +++ /dev/null @@ -1,76 +0,0 @@ -package config - -import ( - "strconv" - "time" - - "github.com/foxcpp/maddy/internal/limiters" -) - -// GlobalRateLimit reads '... ' config directive and returns -// limiters.Rate created using arguments. -func GlobalRateLimit(m *Map, node *Node) (interface{}, error) { - if len(node.Args) != 2 { - return nil, m.MatchErr("need two arguments: ") - } - - burst, err := strconv.Atoi(node.Args[0]) - if err != nil { - return nil, m.MatchErr("%v", err) - } - - interval, err := time.ParseDuration(node.Args[1]) - if err != nil { - return nil, m.MatchErr("%v", err) - } - - return limiters.NewRate(burst, interval), nil -} - -func NoGlobalRateLimit() (interface{}, error) { - return limiters.NewRate(0, 0), nil -} - -// KeyRateLimit reads '... ' config directive and returns -// limiters.RateSet created using arguments, maxBuckets is currently hardcoded -// to be 20010 (slightly higher than the default max_recipients value). -func KeyRateLimit(m *Map, node *Node) (interface{}, error) { - if len(node.Args) != 2 { - return nil, m.MatchErr("need two arguments: ") - } - - burst, err := strconv.Atoi(node.Args[0]) - if err != nil { - return nil, m.MatchErr("%v", err) - } - - interval, err := time.ParseDuration(node.Args[1]) - if err != nil { - return nil, m.MatchErr("%v", err) - } - - return limiters.NewRateSet(burst, interval, 20010), nil -} - -func NoKeyRateLimit() (interface{}, error) { - return limiters.NewRateSet(0, 0, 20010), nil -} - -// ConcurrencyLimit reads '... ' config directive and returns limiters.Semaphore -// created using arguments. -func ConcurrencyLimit(m *Map, node *Node) (interface{}, error) { - if len(node.Args) != 0 { - return nil, m.MatchErr("need two arguments: ") - } - - max, err := strconv.Atoi(node.Args[0]) - if err != nil { - return nil, m.MatchErr("%v", err) - } - - return limiters.NewSemaphore(max), nil -} - -func NoConcurrencyLimit() (interface{}, error) { - return limiters.NewSemaphore(0), nil -} diff --git a/internal/endpoint/smtp/smtp.go b/internal/endpoint/smtp/smtp.go index df047e2..922b0e1 100644 --- a/internal/endpoint/smtp/smtp.go +++ b/internal/endpoint/smtp/smtp.go @@ -24,7 +24,7 @@ import ( "github.com/foxcpp/maddy/internal/dns" "github.com/foxcpp/maddy/internal/exterrors" "github.com/foxcpp/maddy/internal/future" - "github.com/foxcpp/maddy/internal/limiters" + "github.com/foxcpp/maddy/internal/limits" "github.com/foxcpp/maddy/internal/log" "github.com/foxcpp/maddy/internal/module" "github.com/foxcpp/maddy/internal/msgpipeline" @@ -64,8 +64,19 @@ func (s *Session) Reset() { s.endp.Log.DebugMsg("reset") } +func (s *Session) releaseLimits() { + _, domain, err := address.Split(s.mailFrom) + if err != nil { + return + } + addr, ok := s.msgMeta.Conn.RemoteAddr.(*net.TCPAddr) + if !ok { + addr = &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1)} + } + s.endp.limits.ReleaseMsg(addr.IP, domain) +} + func (s *Session) abort(ctx context.Context) { - s.endp.semaphore.Release() if err := s.delivery.Abort(ctx); err != nil { s.endp.Log.Error("delivery abort failed", err) } @@ -113,16 +124,7 @@ func (s *Session) startDelivery(ctx context.Context, from string, opts smtp.Mail if err != nil { return "", err } - msgMeta.OriginalFrom = cleanFrom - - limitersCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - if err := s.endp.ratelimit.TakeContext(limitersCtx); err != nil { - return "", err - } - if err := s.endp.semaphore.TakeContext(limitersCtx); err != nil { - return "", err - } + msgMeta.OriginalFrom = from if s.connState.AuthUser != "" { s.log.Msg("incoming message", @@ -142,6 +144,19 @@ func (s *Session) startDelivery(ctx context.Context, from string, opts smtp.Mail } s.msgCtx, s.msgTask = trace.NewTask(ctx, "Incoming Message") + + _, domain, err := address.Split(cleanFrom) + if err != nil { + return "", err + } + remoteIP, ok := msgMeta.Conn.RemoteAddr.(*net.TCPAddr) + if !ok { + remoteIP = &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1)} + } + if err := s.endp.limits.TakeMsg(s.msgCtx, remoteIP.IP, domain); err != nil { + return "", err + } + mailCtx, mailTask := trace.NewTask(s.msgCtx, "MAIL FROM") defer mailTask.End() @@ -338,7 +353,7 @@ func (s *Session) Data(r io.Reader) error { s.msgCtx = nil s.msgTask.End() s.msgTask = nil - s.endp.semaphore.Release() + s.releaseLimits() return nil } @@ -381,7 +396,7 @@ func (s *Session) LMTPData(r io.Reader, sc smtp.StatusCollector) error { s.msgCtx = nil s.msgTask.End() s.msgTask = nil - s.endp.semaphore.Release() + s.releaseLimits() return nil } @@ -391,7 +406,7 @@ func (endp *Endpoint) wrapErr(msgId string, mangleUTF8 bool, err error) error { return nil } - if err == context.DeadlineExceeded { + if errors.Is(err, context.DeadlineExceeded) { return &smtp.SMTPError{ Code: 451, EnhancedCode: smtp.EnhancedCode{4, 4, 5}, @@ -463,8 +478,7 @@ type Endpoint struct { listeners []net.Listener pipeline *msgpipeline.MsgPipeline resolver dns.Resolver - ratelimit limiters.Rate - semaphore limiters.Semaphore + limits *limits.Group authAlwaysRequired bool submission bool @@ -566,12 +580,15 @@ func (endp *Endpoint) setConfig(cfg *config.Map) error { cfg.Bool("debug", true, false, &endp.Log.Debug) cfg.Bool("defer_sender_reject", false, true, &endp.deferServerReject) cfg.Int("max_logged_rcpt_errors", false, false, 5, &endp.maxLoggedRcptErrors) - cfg.Custom("ratelimit", false, false, func() (interface{}, error) { - return limiters.NewRate(10, time.Second), nil - }, config.GlobalRateLimit, &endp.ratelimit) - cfg.Custom("concurrency", false, false, func() (interface{}, error) { - return limiters.NewSemaphore(1000), nil - }, config.ConcurrencyLimit, &endp.semaphore) + cfg.Custom("limits", false, false, func() (interface{}, error) { + return &limits.Group{}, nil + }, func(cfg *config.Map, n *config.Node) (interface{}, error) { + var g *limits.Group + if err := modconfig.GroupFromNode("limits", n.Args, n, cfg.Globals, &g); err != nil { + return nil, err + } + return g, nil + }, &endp.limits) cfg.AllowUnknown() unknown, err := cfg.Process() if err != nil { diff --git a/internal/limiters/bucket.go b/internal/limits/limiters/bucket.go similarity index 83% rename from internal/limiters/bucket.go rename to internal/limits/limiters/bucket.go index 7701bcb..2bccc35 100644 --- a/internal/limiters/bucket.go +++ b/internal/limits/limiters/bucket.go @@ -6,7 +6,7 @@ import ( "time" ) -// BucketSet combines a group of Limiters into a single key-indexed structure. +// BucketSet combines a group of Ls into a single key-indexed structure. // Basically, each unique key gets its own counter. The main use case for // BucketSet is to apply per-resource rate limiting. // @@ -19,11 +19,11 @@ import ( // A BucksetSet without a New function assigned is no-op: Take and TakeContext // always succeed and Release does nothing. type BucketSet struct { - // New function is used to construct underlying Limiter instances. + // New function is used to construct underlying L instances. // // It is safe to change it only when BucketSet is not used by any // goroutine. - New func() Limiter + New func() L // Time after which bucket is considered stale and can be removed from the // set. For safe use with Rate limiter, it should be at least as twice as @@ -34,18 +34,18 @@ type BucketSet struct { mLck sync.Mutex m map[string]*struct { - r Limiter + r L lastUse time.Time } } -func NewBucketSet(new_ func() Limiter, reapInterval time.Duration, maxBuckets int) *BucketSet { +func NewBucketSet(new_ func() L, reapInterval time.Duration, maxBuckets int) *BucketSet { return &BucketSet{ New: new_, ReapInterval: reapInterval, MaxBuckets: maxBuckets, m: map[string]*struct { - r Limiter + r L lastUse time.Time }{}, } @@ -60,7 +60,7 @@ func (r *BucketSet) Close() { } } -func (r *BucketSet) take(key string) Limiter { +func (r *BucketSet) take(key string) L { r.mLck.Lock() defer r.mLck.Unlock() @@ -88,7 +88,7 @@ func (r *BucketSet) take(key string) Limiter { bucket, ok := r.m[key] if !ok { r.m[key] = &struct { - r Limiter + r L lastUse time.Time }{ r: r.New(), @@ -110,6 +110,21 @@ func (r *BucketSet) Take(key string) bool { return bucket.Take() } +func (r *BucketSet) Release(key string) { + if r.New == nil { + return + } + + r.mLck.Lock() + defer r.mLck.Unlock() + + bucket, ok := r.m[key] + if !ok { + return + } + bucket.r.Release() +} + func (r *BucketSet) TakeContext(ctx context.Context, key string) error { if r.New == nil { return nil diff --git a/internal/limiters/concurrency.go b/internal/limits/limiters/concurrency.go similarity index 89% rename from internal/limiters/concurrency.go rename to internal/limits/limiters/concurrency.go index b92f367..bcaba30 100644 --- a/internal/limiters/concurrency.go +++ b/internal/limits/limiters/concurrency.go @@ -15,11 +15,12 @@ func NewSemaphore(max int) Semaphore { return Semaphore{c: make(chan struct{}, max)} } -func (s Semaphore) Take() { +func (s Semaphore) Take() bool { if cap(s.c) <= 0 { - return + return true } s.c <- struct{}{} + return true } func (s Semaphore) TakeContext(ctx context.Context) error { @@ -44,3 +45,6 @@ func (s Semaphore) Release() { panic("limiters: mismatched Release call") } } + +func (s Semaphore) Close() { +} diff --git a/internal/limiters/limiters.go b/internal/limits/limiters/limiters.go similarity index 58% rename from internal/limiters/limiters.go rename to internal/limits/limiters/limiters.go index 9e43c47..fd6b511 100644 --- a/internal/limiters/limiters.go +++ b/internal/limits/limiters/limiters.go @@ -4,14 +4,14 @@ package limiters import "context" -// The Limiter interface represents a blocking limiter that has some upper -// bound of resource use and blocks when it is exceeded until enough resources -// are freed. -type Limiter interface { +// The L interface represents a blocking limiter that has some upper bound of +// resource use and blocks when it is exceeded until enough resources are +// freed. +type L interface { Take() bool TakeContext(context.Context) error Release() // Close frees any resources used internally by Limiter for book-keeping. - Close() error + Close() } diff --git a/internal/limits/limiters/multilimit.go b/internal/limits/limiters/multilimit.go new file mode 100644 index 0000000..8d81585 --- /dev/null +++ b/internal/limits/limiters/multilimit.go @@ -0,0 +1,51 @@ +package limiters + +import "context" + +// MultiLimit wraps multiple L implementations into a single one, locking them +// in the specified order. +// +// It does not implement any deadlock detection or avoidance algorithms. +type MultiLimit struct { + Wrapped []L +} + +func (ml *MultiLimit) Take() bool { + for i := 0; i < len(ml.Wrapped); i++ { + if !ml.Wrapped[i].Take() { + // Acquire failed, undo acquire for all other resources we already + // got. + for _, l := range ml.Wrapped[:i] { + l.Release() + } + return false + } + } + return true +} + +func (ml *MultiLimit) TakeContext(ctx context.Context) error { + for i := 0; i < len(ml.Wrapped); i++ { + if err := ml.Wrapped[i].TakeContext(ctx); err != nil { + // Acquire failed, undo acquire for all other resources we already + // got. + for _, l := range ml.Wrapped[:i] { + l.Release() + } + return err + } + } + return nil +} + +func (ml *MultiLimit) Release() { + for _, l := range ml.Wrapped { + l.Release() + } +} + +func (ml *MultiLimit) Close() { + for _, l := range ml.Wrapped { + l.Close() + } +} diff --git a/internal/limiters/rate.go b/internal/limits/limiters/rate.go similarity index 98% rename from internal/limiters/rate.go rename to internal/limits/limiters/rate.go index 374f858..25bdd6f 100644 --- a/internal/limiters/rate.go +++ b/internal/limits/limiters/rate.go @@ -93,6 +93,9 @@ func (r Rate) TakeContext(ctx context.Context) error { } } +func (r Rate) Release() { +} + func (r Rate) Close() { close(r.stop) } diff --git a/internal/limits/limits.go b/internal/limits/limits.go new file mode 100644 index 0000000..0491b1c --- /dev/null +++ b/internal/limits/limits.go @@ -0,0 +1,213 @@ +// Package limit provides a module object that can be used to restrict the +// concurrency and rate of the messages flow globally or on per-source, +// per-destination basis. +// +// Note, all domain inputs are interpreted with the assumption they are already +// normalized. +// +// Low-level components are available in the limiters/ subpackage. +package limits + +import ( + "context" + "net" + "strconv" + "time" + + "github.com/foxcpp/maddy/internal/config" + "github.com/foxcpp/maddy/internal/limits/limiters" + "github.com/foxcpp/maddy/internal/module" +) + +type Group struct { + instName string + + global limiters.MultiLimit + ip *limiters.BucketSet // BucketSet of MultiLimit + source *limiters.BucketSet // BucketSet of MultiLimit + dest *limiters.BucketSet // BucketSet of MultiLimit +} + +func New(_, instName string, _, _ []string) (module.Module, error) { + return &Group{ + instName: instName, + }, nil +} + +func (g *Group) Init(cfg *config.Map) error { + var ( + globalL []limiters.L + ipL []func() limiters.L + sourceL []func() limiters.L + destL []func() limiters.L + ) + + for _, child := range cfg.Block.Children { + if len(child.Args) < 1 { + return config.NodeErr(&child, "at least two arguments are required") + } + + var ( + ctor func() limiters.L + err error + ) + switch kind := child.Args[0]; kind { + case "rate": + ctor, err = rateCtor(cfg, child.Args[1:]) + case "concurrency": + ctor, err = concurrencyCtor(cfg, child.Args[1:]) + default: + return config.NodeErr(&child, "unknown limit kind: %v", kind) + } + if err != nil { + return err + } + + switch scope := child.Name; scope { + case "all": + globalL = append(globalL, ctor()) + case "ip": + ipL = append(ipL, ctor) + case "source": + sourceL = append(sourceL, ctor) + case "destination": + destL = append(destL, ctor) + default: + return config.NodeErr(&child, "unknown limit scope: %v", scope) + } + } + + // 20010 is slightly higher than the default max. recipients count in + // endpoint/smtp. + g.global = limiters.MultiLimit{Wrapped: globalL} + if len(ipL) != 0 { + g.ip = limiters.NewBucketSet(func() limiters.L { + l := make([]limiters.L, 0, len(ipL)) + for _, ctor := range ipL { + l = append(l, ctor()) + } + return &limiters.MultiLimit{Wrapped: l} + }, 1*time.Minute, 20010) + } + if len(sourceL) != 0 { + g.source = limiters.NewBucketSet(func() limiters.L { + l := make([]limiters.L, 0, len(sourceL)) + for _, ctor := range sourceL { + l = append(l, ctor()) + } + return &limiters.MultiLimit{Wrapped: l} + }, 1*time.Minute, 20010) + } + if len(destL) != 0 { + g.dest = limiters.NewBucketSet(func() limiters.L { + l := make([]limiters.L, 0, len(sourceL)) + for _, ctor := range sourceL { + l = append(l, ctor()) + } + return &limiters.MultiLimit{Wrapped: l} + }, 1*time.Minute, 20010) + } + + return nil +} + +func rateCtor(cfg *config.Map, args []string) (func() limiters.L, error) { + period := 1 * time.Second + burst := 0 + + switch len(args) { + case 2: + var err error + period, err = time.ParseDuration(args[1]) + if err != nil { + return nil, cfg.MatchErr("%v", err) + } + case 1: + var err error + burst, err = strconv.Atoi(args[0]) + if err != nil { + return nil, cfg.MatchErr("%v", err) + } + case 0: + return nil, cfg.MatchErr("at least burst size is needed") + } + + return func() limiters.L { + return limiters.NewRate(burst, period) + }, nil +} + +func concurrencyCtor(cfg *config.Map, args []string) (func() limiters.L, error) { + if len(args) != 1 { + return nil, cfg.MatchErr("max concurrency value is needed") + } + max, err := strconv.Atoi(args[0]) + if err != nil { + return nil, cfg.MatchErr("%v", err) + } + return func() limiters.L { + return limiters.NewSemaphore(max) + }, nil +} + +func (g *Group) TakeMsg(ctx context.Context, addr net.IP, sourceDomain string) error { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + if err := g.global.TakeContext(ctx); err != nil { + return err + } + + if g.ip != nil { + if err := g.ip.TakeContext(ctx, addr.String()); err != nil { + g.global.Release() + return err + } + } + if g.source != nil { + if err := g.source.TakeContext(ctx, sourceDomain); err != nil { + g.global.Release() + g.ip.Release(addr.String()) + return err + } + } + return nil +} + +func (g *Group) TakeDest(ctx context.Context, domain string) error { + if g.dest == nil { + return nil + } + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + return g.dest.TakeContext(ctx, domain) +} + +func (g *Group) ReleaseMsg(addr net.IP, sourceDomain string) { + g.global.Release() + if g.ip != nil { + g.ip.Release(addr.String()) + } + if g.source != nil { + g.source.Release(sourceDomain) + } +} + +func (g *Group) ReleaseDest(domain string) { + if g.dest == nil { + return + } + g.dest.Release(domain) +} + +func (g *Group) Name() string { + return "limits" +} + +func (g *Group) InstanceName() string { + return g.instName +} + +func init() { + module.Register("limits", New) +} diff --git a/internal/target/remote/connect.go b/internal/target/remote/connect.go index b03f25d..ed45eeb 100644 --- a/internal/target/remote/connect.go +++ b/internal/target/remote/connect.go @@ -7,7 +7,6 @@ import ( "net" "runtime/trace" "sort" - "strings" "github.com/foxcpp/maddy/internal/config" "github.com/foxcpp/maddy/internal/exterrors" @@ -149,8 +148,6 @@ func (rd *remoteDelivery) attemptMX(ctx context.Context, conn mxConn, record *ne } func (rd *remoteDelivery) connectionForDomain(ctx context.Context, domain string) (*smtpconn.C, error) { - domain = strings.ToLower(domain) - if c, ok := rd.connections[domain]; ok { return c.C, nil } @@ -177,6 +174,13 @@ func (rd *remoteDelivery) connectionForDomain(ctx context.Context, domain string } conn.dnssecOk = dnssecOk + region = trace.StartRegion(ctx, "remote/limits.TakeDest") + if err := rd.rt.limits.TakeDest(ctx, domain); err != nil { + region.End() + return nil, err + } + region.End() + var lastErr error region = trace.StartRegion(ctx, "remote/Connect+TLS") for _, record := range records { diff --git a/internal/target/remote/remote.go b/internal/target/remote/remote.go index 4564b88..9c65246 100644 --- a/internal/target/remote/remote.go +++ b/internal/target/remote/remote.go @@ -22,6 +22,7 @@ import ( modconfig "github.com/foxcpp/maddy/internal/config/module" "github.com/foxcpp/maddy/internal/dns" "github.com/foxcpp/maddy/internal/exterrors" + "github.com/foxcpp/maddy/internal/limits" "github.com/foxcpp/maddy/internal/log" "github.com/foxcpp/maddy/internal/module" "github.com/foxcpp/maddy/internal/target" @@ -92,6 +93,7 @@ type Target struct { extResolver *dns.ExtResolver policies []Policy + limits *limits.Group Log log.Logger } @@ -134,6 +136,15 @@ func (rt *Target) Init(cfg *config.Map) error { } return p.L, nil }, &rt.policies) + cfg.Custom("limits", false, false, func() (interface{}, error) { + return &limits.Group{}, nil + }, func(cfg *config.Map, n *config.Node) (interface{}, error) { + var g *limits.Group + if err := modconfig.GroupFromNode("limits", n.Args, n, cfg.Globals, &g); err != nil { + return nil, err + } + return g, nil + }, &rt.limits) if _, err := cfg.Process(); err != nil { return err @@ -183,6 +194,36 @@ func (rt *Target) Start(ctx context.Context, msgMeta *module.MsgMetadata, mailFr policies = append(policies, p.Start(msgMeta)) } + _, domain, err := address.Split(mailFrom) + if err != nil { + return nil, &exterrors.SMTPError{ + Code: 501, + EnhancedCode: exterrors.EnhancedCode{5, 1, 8}, + Message: "Malformed sender address", + TargetName: "remote", + Err: err, + } + } + // Domain is already should be normalized by the message source (e.g. + // endpoint/smtp). + region := trace.StartRegion(ctx, "remote/limits.Take") + addr, ok := msgMeta.Conn.RemoteAddr.(*net.TCPAddr) + if !ok { + addr = &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1)} + } + if err := rt.limits.TakeMsg(ctx, addr.IP, domain); err != nil { + region.End() + return nil, &exterrors.SMTPError{ + Code: 451, + EnhancedCode: exterrors.EnhancedCode{4, 4, 5}, + Message: "High load, try again later", + Reason: "Global limit timeout", + TargetName: "remote", + Err: err, + } + } + region.End() + return &remoteDelivery{ rt: rt, mailFrom: mailFrom, @@ -365,8 +406,22 @@ func (rd *remoteDelivery) Commit(ctx context.Context) error { func (rd *remoteDelivery) Close() error { for _, conn := range rd.connections { rd.Log.Debugf("disconnected from %s", conn.ServerName()) + + rd.rt.limits.ReleaseDest(conn.domain) + conn.Close() } + + _, domain, err := address.Split(rd.mailFrom) + if err != nil { + return err + } + addr, ok := rd.msgMeta.Conn.RemoteAddr.(*net.TCPAddr) + if !ok { + addr = &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1)} + } + rd.rt.limits.ReleaseMsg(addr.IP, domain) + return nil } diff --git a/maddy.conf b/maddy.conf index f019352..2f5e8e5 100644 --- a/maddy.conf +++ b/maddy.conf @@ -51,8 +51,10 @@ sql local_mailboxes local_authdb { } smtp tcp://0.0.0.0:25 { - # Permit up to 20 messages per second to be enqueued. - ratelimit 20 1s + # Permit up to 20 messages per second to be accepted + limits { + all rate 20 + } check { # Verify that hostname in EHLO/HELO resolves to the source IP. Fail if it is not.