From c3ebbb05a07abe496535941ad8cd7173f9511a18 Mon Sep 17 00:00:00 2001 From: "fox.cpp" Date: Sat, 15 Feb 2020 14:03:35 +0300 Subject: [PATCH] Generalize message flow restrictions Set of flow restrictions is represented as a "limits" module instance that can be either created inline via "limits" directive in some modules (including "remote" target and "smtp" endpoint) or defined globally and referenced in configuration of modules mentioned above. This permits a variety of use cases, including shared and separate counters for various endpoints and also "modules group" style sharing described in #195. --- docs/man/maddy-smtp.5.scd | 87 +++++-- docs/man/maddy-targets.5.scd | 16 +- internal/config/limiters.go | 76 ------- internal/endpoint/smtp/smtp.go | 63 ++++-- internal/{ => limits}/limiters/bucket.go | 31 ++- internal/{ => limits}/limiters/concurrency.go | 8 +- internal/{ => limits}/limiters/limiters.go | 10 +- internal/limits/limiters/multilimit.go | 51 +++++ internal/{ => limits}/limiters/rate.go | 3 + internal/limits/limits.go | 213 ++++++++++++++++++ internal/target/remote/connect.go | 10 +- internal/target/remote/remote.go | 55 +++++ maddy.conf | 6 +- 13 files changed, 480 insertions(+), 149 deletions(-) delete mode 100644 internal/config/limiters.go rename internal/{ => limits}/limiters/bucket.go (83%) rename internal/{ => limits}/limiters/concurrency.go (89%) rename internal/{ => limits}/limiters/limiters.go (58%) create mode 100644 internal/limits/limiters/multilimit.go rename internal/{ => limits}/limiters/rate.go (98%) create mode 100644 internal/limits/limits.go diff --git a/docs/man/maddy-smtp.5.scd b/docs/man/maddy-smtp.5.scd index b242a69..78ae6c9 100644 --- a/docs/man/maddy-smtp.5.scd +++ b/docs/man/maddy-smtp.5.scd @@ -21,7 +21,10 @@ smtp tcp://0.0.0.0:25 { auth pam defer_sender_reject yes dmarc yes - ratelimit 0 0s + limits { + endpoint rate 10 + endpoint concurrency 500 + } # Example pipeline ocnfiguration. destination example.org { @@ -81,25 +84,6 @@ I/O read timeout. I/O write timeout. -*Syntex*: ratelimit _burst_ _interval_ ++ -*Default*: 20 1s (up to 20 messages per second) - -Restrict the inbound messages rate to at most _burst_ messages in _interval_. -Specify both values to 0 to disable the limit. - -If the message waits for more than 5 seconds, it is rejected with code 451. - -*Syntax*: concurrency _max_ ++ -*Default*: 1000 - -Restrict the amount of concurrent inbound deliveries to handle. -Specify 0 to disable restriction. - -If the message waits for more than 5 seconds, it is rejected with code 451. - -Example: ratelimit 20 1s -Limit to 20 messages per second. - *Syntax*: max_message_size _size_ ++ *Default*: 32M @@ -136,6 +120,69 @@ check module. *NOTE*: DMARC needs apply_spf and verify_dkim checks to function correctly. Without these, DMARC check will not run. +## Rate & concurrency limiting + +*Syntax*: limits _config block_ ++ +*Default*: no limits + +This allows configuring a set of message flow restrictions including +max. concurrency and rate per-endpoint, per-source, per-destination. + +Limits are specified as directives inside the block: +``` +limits { + all rate 20 + destination concurrency 5 +} +``` + +Supported limits: + +- Rate limit + +*Syntax*: _scope_ rate _burst_ _[period]_ ++ +Restrict the amount of messages processed in _period_ to _burst_ messages. +If period is not specified, 1 second is used. + +- Concurrency limit + +*Syntax*: _scope_ concurrency _max_ ++ +Restrict the amount of messages processed in parallel to _max_. + +For each supported limitation, _scope_ determines whether it should be applied +for all messages ("all"), per-sender IP ("ip") or per-recipient domain +("destination"). Having a scope other than "all" means that the restriction +will be enforced independently for each group determined by scope. E.g. +"ip rate 20" means that the same IP cannot send more than 20 messages in a +scond. "destination concurrency 5" means that no more than 5 messages can be +sent in parallel to a single domain. + +*Note*: At the moment, SMTP endpoint does not support per-recipient limits. +They will be no-op. If you want to enforce a per-recipient restriction on +outbound messages, do so using 'limits' directive for the 'remote' module (see +*maddy-targets*(5)). + +It is possible to share limit counters between multiple endpoints (or any other +modules). To do so define a top-level configuration block for module "limits" +and reference it where needed using standard & syntax. E.g. +``` +limits inbound_limits { + all rate 20 +} + +smtp smtp://0.0.0.0:25 { + limits &inbound_limits + ... +} + +submission tls://0.0.0.0:465 { + limits &inbound_limits + ... +} +``` +Using an "all rate" restriction in such way means that no more than 20 +messages can enter the server through both endpoints in one second. + # Submission module (submission) Module 'submission' implements all functionality of the 'smtp' module and adds diff --git a/docs/man/maddy-targets.5.scd b/docs/man/maddy-targets.5.scd index 1a571f5..455a057 100644 --- a/docs/man/maddy-targets.5.scd +++ b/docs/man/maddy-targets.5.scd @@ -111,17 +111,13 @@ Hostname to use client greeting (EHLO/HELO command). Some servers require it to be FQDN, SPF-capable servers check whether it corresponds to the server IP address, so it is better to set it to a domain that resolves to the server IP. -*Syntax*: min_mx_level none|mtasts|dnssec ++ -*Default*: none +*Syntax*: limits _config block_ ++ +*Default*: no limits -Minimal MX records security level to require before using remote server for -delivery. See [Security levels](../../seclevels) page for details. +See 'limits' directive in *maddy-smtp*(5) for SMTP endpoint. +It works the same except for address domains used for +per-source/per-destination are as observed when message exits the server. -*Syntax*: min_tls_level none|encrypted|authenticated -*Default*: none - -Minimal MX records security level to require before using remote server for -delivery. See [Security levels](../../seclevels) page for details. *Syntax*: debug _boolean_ ++ *Default*: global directive value @@ -130,7 +126,7 @@ Enable verbose logging. ## Security policies -*Syntax*: mx_auth _config block_ +*Syntax*: mx_auth _config block_ ++ *Default*: no policies 'remote' module implements a number of of schemes and protocols necessary to diff --git a/internal/config/limiters.go b/internal/config/limiters.go deleted file mode 100644 index 33971d2..0000000 --- a/internal/config/limiters.go +++ /dev/null @@ -1,76 +0,0 @@ -package config - -import ( - "strconv" - "time" - - "github.com/foxcpp/maddy/internal/limiters" -) - -// GlobalRateLimit reads '... ' config directive and returns -// limiters.Rate created using arguments. -func GlobalRateLimit(m *Map, node *Node) (interface{}, error) { - if len(node.Args) != 2 { - return nil, m.MatchErr("need two arguments: ") - } - - burst, err := strconv.Atoi(node.Args[0]) - if err != nil { - return nil, m.MatchErr("%v", err) - } - - interval, err := time.ParseDuration(node.Args[1]) - if err != nil { - return nil, m.MatchErr("%v", err) - } - - return limiters.NewRate(burst, interval), nil -} - -func NoGlobalRateLimit() (interface{}, error) { - return limiters.NewRate(0, 0), nil -} - -// KeyRateLimit reads '... ' config directive and returns -// limiters.RateSet created using arguments, maxBuckets is currently hardcoded -// to be 20010 (slightly higher than the default max_recipients value). -func KeyRateLimit(m *Map, node *Node) (interface{}, error) { - if len(node.Args) != 2 { - return nil, m.MatchErr("need two arguments: ") - } - - burst, err := strconv.Atoi(node.Args[0]) - if err != nil { - return nil, m.MatchErr("%v", err) - } - - interval, err := time.ParseDuration(node.Args[1]) - if err != nil { - return nil, m.MatchErr("%v", err) - } - - return limiters.NewRateSet(burst, interval, 20010), nil -} - -func NoKeyRateLimit() (interface{}, error) { - return limiters.NewRateSet(0, 0, 20010), nil -} - -// ConcurrencyLimit reads '... ' config directive and returns limiters.Semaphore -// created using arguments. -func ConcurrencyLimit(m *Map, node *Node) (interface{}, error) { - if len(node.Args) != 0 { - return nil, m.MatchErr("need two arguments: ") - } - - max, err := strconv.Atoi(node.Args[0]) - if err != nil { - return nil, m.MatchErr("%v", err) - } - - return limiters.NewSemaphore(max), nil -} - -func NoConcurrencyLimit() (interface{}, error) { - return limiters.NewSemaphore(0), nil -} diff --git a/internal/endpoint/smtp/smtp.go b/internal/endpoint/smtp/smtp.go index df047e2..922b0e1 100644 --- a/internal/endpoint/smtp/smtp.go +++ b/internal/endpoint/smtp/smtp.go @@ -24,7 +24,7 @@ import ( "github.com/foxcpp/maddy/internal/dns" "github.com/foxcpp/maddy/internal/exterrors" "github.com/foxcpp/maddy/internal/future" - "github.com/foxcpp/maddy/internal/limiters" + "github.com/foxcpp/maddy/internal/limits" "github.com/foxcpp/maddy/internal/log" "github.com/foxcpp/maddy/internal/module" "github.com/foxcpp/maddy/internal/msgpipeline" @@ -64,8 +64,19 @@ func (s *Session) Reset() { s.endp.Log.DebugMsg("reset") } +func (s *Session) releaseLimits() { + _, domain, err := address.Split(s.mailFrom) + if err != nil { + return + } + addr, ok := s.msgMeta.Conn.RemoteAddr.(*net.TCPAddr) + if !ok { + addr = &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1)} + } + s.endp.limits.ReleaseMsg(addr.IP, domain) +} + func (s *Session) abort(ctx context.Context) { - s.endp.semaphore.Release() if err := s.delivery.Abort(ctx); err != nil { s.endp.Log.Error("delivery abort failed", err) } @@ -113,16 +124,7 @@ func (s *Session) startDelivery(ctx context.Context, from string, opts smtp.Mail if err != nil { return "", err } - msgMeta.OriginalFrom = cleanFrom - - limitersCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - if err := s.endp.ratelimit.TakeContext(limitersCtx); err != nil { - return "", err - } - if err := s.endp.semaphore.TakeContext(limitersCtx); err != nil { - return "", err - } + msgMeta.OriginalFrom = from if s.connState.AuthUser != "" { s.log.Msg("incoming message", @@ -142,6 +144,19 @@ func (s *Session) startDelivery(ctx context.Context, from string, opts smtp.Mail } s.msgCtx, s.msgTask = trace.NewTask(ctx, "Incoming Message") + + _, domain, err := address.Split(cleanFrom) + if err != nil { + return "", err + } + remoteIP, ok := msgMeta.Conn.RemoteAddr.(*net.TCPAddr) + if !ok { + remoteIP = &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1)} + } + if err := s.endp.limits.TakeMsg(s.msgCtx, remoteIP.IP, domain); err != nil { + return "", err + } + mailCtx, mailTask := trace.NewTask(s.msgCtx, "MAIL FROM") defer mailTask.End() @@ -338,7 +353,7 @@ func (s *Session) Data(r io.Reader) error { s.msgCtx = nil s.msgTask.End() s.msgTask = nil - s.endp.semaphore.Release() + s.releaseLimits() return nil } @@ -381,7 +396,7 @@ func (s *Session) LMTPData(r io.Reader, sc smtp.StatusCollector) error { s.msgCtx = nil s.msgTask.End() s.msgTask = nil - s.endp.semaphore.Release() + s.releaseLimits() return nil } @@ -391,7 +406,7 @@ func (endp *Endpoint) wrapErr(msgId string, mangleUTF8 bool, err error) error { return nil } - if err == context.DeadlineExceeded { + if errors.Is(err, context.DeadlineExceeded) { return &smtp.SMTPError{ Code: 451, EnhancedCode: smtp.EnhancedCode{4, 4, 5}, @@ -463,8 +478,7 @@ type Endpoint struct { listeners []net.Listener pipeline *msgpipeline.MsgPipeline resolver dns.Resolver - ratelimit limiters.Rate - semaphore limiters.Semaphore + limits *limits.Group authAlwaysRequired bool submission bool @@ -566,12 +580,15 @@ func (endp *Endpoint) setConfig(cfg *config.Map) error { cfg.Bool("debug", true, false, &endp.Log.Debug) cfg.Bool("defer_sender_reject", false, true, &endp.deferServerReject) cfg.Int("max_logged_rcpt_errors", false, false, 5, &endp.maxLoggedRcptErrors) - cfg.Custom("ratelimit", false, false, func() (interface{}, error) { - return limiters.NewRate(10, time.Second), nil - }, config.GlobalRateLimit, &endp.ratelimit) - cfg.Custom("concurrency", false, false, func() (interface{}, error) { - return limiters.NewSemaphore(1000), nil - }, config.ConcurrencyLimit, &endp.semaphore) + cfg.Custom("limits", false, false, func() (interface{}, error) { + return &limits.Group{}, nil + }, func(cfg *config.Map, n *config.Node) (interface{}, error) { + var g *limits.Group + if err := modconfig.GroupFromNode("limits", n.Args, n, cfg.Globals, &g); err != nil { + return nil, err + } + return g, nil + }, &endp.limits) cfg.AllowUnknown() unknown, err := cfg.Process() if err != nil { diff --git a/internal/limiters/bucket.go b/internal/limits/limiters/bucket.go similarity index 83% rename from internal/limiters/bucket.go rename to internal/limits/limiters/bucket.go index 7701bcb..2bccc35 100644 --- a/internal/limiters/bucket.go +++ b/internal/limits/limiters/bucket.go @@ -6,7 +6,7 @@ import ( "time" ) -// BucketSet combines a group of Limiters into a single key-indexed structure. +// BucketSet combines a group of Ls into a single key-indexed structure. // Basically, each unique key gets its own counter. The main use case for // BucketSet is to apply per-resource rate limiting. // @@ -19,11 +19,11 @@ import ( // A BucksetSet without a New function assigned is no-op: Take and TakeContext // always succeed and Release does nothing. type BucketSet struct { - // New function is used to construct underlying Limiter instances. + // New function is used to construct underlying L instances. // // It is safe to change it only when BucketSet is not used by any // goroutine. - New func() Limiter + New func() L // Time after which bucket is considered stale and can be removed from the // set. For safe use with Rate limiter, it should be at least as twice as @@ -34,18 +34,18 @@ type BucketSet struct { mLck sync.Mutex m map[string]*struct { - r Limiter + r L lastUse time.Time } } -func NewBucketSet(new_ func() Limiter, reapInterval time.Duration, maxBuckets int) *BucketSet { +func NewBucketSet(new_ func() L, reapInterval time.Duration, maxBuckets int) *BucketSet { return &BucketSet{ New: new_, ReapInterval: reapInterval, MaxBuckets: maxBuckets, m: map[string]*struct { - r Limiter + r L lastUse time.Time }{}, } @@ -60,7 +60,7 @@ func (r *BucketSet) Close() { } } -func (r *BucketSet) take(key string) Limiter { +func (r *BucketSet) take(key string) L { r.mLck.Lock() defer r.mLck.Unlock() @@ -88,7 +88,7 @@ func (r *BucketSet) take(key string) Limiter { bucket, ok := r.m[key] if !ok { r.m[key] = &struct { - r Limiter + r L lastUse time.Time }{ r: r.New(), @@ -110,6 +110,21 @@ func (r *BucketSet) Take(key string) bool { return bucket.Take() } +func (r *BucketSet) Release(key string) { + if r.New == nil { + return + } + + r.mLck.Lock() + defer r.mLck.Unlock() + + bucket, ok := r.m[key] + if !ok { + return + } + bucket.r.Release() +} + func (r *BucketSet) TakeContext(ctx context.Context, key string) error { if r.New == nil { return nil diff --git a/internal/limiters/concurrency.go b/internal/limits/limiters/concurrency.go similarity index 89% rename from internal/limiters/concurrency.go rename to internal/limits/limiters/concurrency.go index b92f367..bcaba30 100644 --- a/internal/limiters/concurrency.go +++ b/internal/limits/limiters/concurrency.go @@ -15,11 +15,12 @@ func NewSemaphore(max int) Semaphore { return Semaphore{c: make(chan struct{}, max)} } -func (s Semaphore) Take() { +func (s Semaphore) Take() bool { if cap(s.c) <= 0 { - return + return true } s.c <- struct{}{} + return true } func (s Semaphore) TakeContext(ctx context.Context) error { @@ -44,3 +45,6 @@ func (s Semaphore) Release() { panic("limiters: mismatched Release call") } } + +func (s Semaphore) Close() { +} diff --git a/internal/limiters/limiters.go b/internal/limits/limiters/limiters.go similarity index 58% rename from internal/limiters/limiters.go rename to internal/limits/limiters/limiters.go index 9e43c47..fd6b511 100644 --- a/internal/limiters/limiters.go +++ b/internal/limits/limiters/limiters.go @@ -4,14 +4,14 @@ package limiters import "context" -// The Limiter interface represents a blocking limiter that has some upper -// bound of resource use and blocks when it is exceeded until enough resources -// are freed. -type Limiter interface { +// The L interface represents a blocking limiter that has some upper bound of +// resource use and blocks when it is exceeded until enough resources are +// freed. +type L interface { Take() bool TakeContext(context.Context) error Release() // Close frees any resources used internally by Limiter for book-keeping. - Close() error + Close() } diff --git a/internal/limits/limiters/multilimit.go b/internal/limits/limiters/multilimit.go new file mode 100644 index 0000000..8d81585 --- /dev/null +++ b/internal/limits/limiters/multilimit.go @@ -0,0 +1,51 @@ +package limiters + +import "context" + +// MultiLimit wraps multiple L implementations into a single one, locking them +// in the specified order. +// +// It does not implement any deadlock detection or avoidance algorithms. +type MultiLimit struct { + Wrapped []L +} + +func (ml *MultiLimit) Take() bool { + for i := 0; i < len(ml.Wrapped); i++ { + if !ml.Wrapped[i].Take() { + // Acquire failed, undo acquire for all other resources we already + // got. + for _, l := range ml.Wrapped[:i] { + l.Release() + } + return false + } + } + return true +} + +func (ml *MultiLimit) TakeContext(ctx context.Context) error { + for i := 0; i < len(ml.Wrapped); i++ { + if err := ml.Wrapped[i].TakeContext(ctx); err != nil { + // Acquire failed, undo acquire for all other resources we already + // got. + for _, l := range ml.Wrapped[:i] { + l.Release() + } + return err + } + } + return nil +} + +func (ml *MultiLimit) Release() { + for _, l := range ml.Wrapped { + l.Release() + } +} + +func (ml *MultiLimit) Close() { + for _, l := range ml.Wrapped { + l.Close() + } +} diff --git a/internal/limiters/rate.go b/internal/limits/limiters/rate.go similarity index 98% rename from internal/limiters/rate.go rename to internal/limits/limiters/rate.go index 374f858..25bdd6f 100644 --- a/internal/limiters/rate.go +++ b/internal/limits/limiters/rate.go @@ -93,6 +93,9 @@ func (r Rate) TakeContext(ctx context.Context) error { } } +func (r Rate) Release() { +} + func (r Rate) Close() { close(r.stop) } diff --git a/internal/limits/limits.go b/internal/limits/limits.go new file mode 100644 index 0000000..0491b1c --- /dev/null +++ b/internal/limits/limits.go @@ -0,0 +1,213 @@ +// Package limit provides a module object that can be used to restrict the +// concurrency and rate of the messages flow globally or on per-source, +// per-destination basis. +// +// Note, all domain inputs are interpreted with the assumption they are already +// normalized. +// +// Low-level components are available in the limiters/ subpackage. +package limits + +import ( + "context" + "net" + "strconv" + "time" + + "github.com/foxcpp/maddy/internal/config" + "github.com/foxcpp/maddy/internal/limits/limiters" + "github.com/foxcpp/maddy/internal/module" +) + +type Group struct { + instName string + + global limiters.MultiLimit + ip *limiters.BucketSet // BucketSet of MultiLimit + source *limiters.BucketSet // BucketSet of MultiLimit + dest *limiters.BucketSet // BucketSet of MultiLimit +} + +func New(_, instName string, _, _ []string) (module.Module, error) { + return &Group{ + instName: instName, + }, nil +} + +func (g *Group) Init(cfg *config.Map) error { + var ( + globalL []limiters.L + ipL []func() limiters.L + sourceL []func() limiters.L + destL []func() limiters.L + ) + + for _, child := range cfg.Block.Children { + if len(child.Args) < 1 { + return config.NodeErr(&child, "at least two arguments are required") + } + + var ( + ctor func() limiters.L + err error + ) + switch kind := child.Args[0]; kind { + case "rate": + ctor, err = rateCtor(cfg, child.Args[1:]) + case "concurrency": + ctor, err = concurrencyCtor(cfg, child.Args[1:]) + default: + return config.NodeErr(&child, "unknown limit kind: %v", kind) + } + if err != nil { + return err + } + + switch scope := child.Name; scope { + case "all": + globalL = append(globalL, ctor()) + case "ip": + ipL = append(ipL, ctor) + case "source": + sourceL = append(sourceL, ctor) + case "destination": + destL = append(destL, ctor) + default: + return config.NodeErr(&child, "unknown limit scope: %v", scope) + } + } + + // 20010 is slightly higher than the default max. recipients count in + // endpoint/smtp. + g.global = limiters.MultiLimit{Wrapped: globalL} + if len(ipL) != 0 { + g.ip = limiters.NewBucketSet(func() limiters.L { + l := make([]limiters.L, 0, len(ipL)) + for _, ctor := range ipL { + l = append(l, ctor()) + } + return &limiters.MultiLimit{Wrapped: l} + }, 1*time.Minute, 20010) + } + if len(sourceL) != 0 { + g.source = limiters.NewBucketSet(func() limiters.L { + l := make([]limiters.L, 0, len(sourceL)) + for _, ctor := range sourceL { + l = append(l, ctor()) + } + return &limiters.MultiLimit{Wrapped: l} + }, 1*time.Minute, 20010) + } + if len(destL) != 0 { + g.dest = limiters.NewBucketSet(func() limiters.L { + l := make([]limiters.L, 0, len(sourceL)) + for _, ctor := range sourceL { + l = append(l, ctor()) + } + return &limiters.MultiLimit{Wrapped: l} + }, 1*time.Minute, 20010) + } + + return nil +} + +func rateCtor(cfg *config.Map, args []string) (func() limiters.L, error) { + period := 1 * time.Second + burst := 0 + + switch len(args) { + case 2: + var err error + period, err = time.ParseDuration(args[1]) + if err != nil { + return nil, cfg.MatchErr("%v", err) + } + case 1: + var err error + burst, err = strconv.Atoi(args[0]) + if err != nil { + return nil, cfg.MatchErr("%v", err) + } + case 0: + return nil, cfg.MatchErr("at least burst size is needed") + } + + return func() limiters.L { + return limiters.NewRate(burst, period) + }, nil +} + +func concurrencyCtor(cfg *config.Map, args []string) (func() limiters.L, error) { + if len(args) != 1 { + return nil, cfg.MatchErr("max concurrency value is needed") + } + max, err := strconv.Atoi(args[0]) + if err != nil { + return nil, cfg.MatchErr("%v", err) + } + return func() limiters.L { + return limiters.NewSemaphore(max) + }, nil +} + +func (g *Group) TakeMsg(ctx context.Context, addr net.IP, sourceDomain string) error { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + if err := g.global.TakeContext(ctx); err != nil { + return err + } + + if g.ip != nil { + if err := g.ip.TakeContext(ctx, addr.String()); err != nil { + g.global.Release() + return err + } + } + if g.source != nil { + if err := g.source.TakeContext(ctx, sourceDomain); err != nil { + g.global.Release() + g.ip.Release(addr.String()) + return err + } + } + return nil +} + +func (g *Group) TakeDest(ctx context.Context, domain string) error { + if g.dest == nil { + return nil + } + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + return g.dest.TakeContext(ctx, domain) +} + +func (g *Group) ReleaseMsg(addr net.IP, sourceDomain string) { + g.global.Release() + if g.ip != nil { + g.ip.Release(addr.String()) + } + if g.source != nil { + g.source.Release(sourceDomain) + } +} + +func (g *Group) ReleaseDest(domain string) { + if g.dest == nil { + return + } + g.dest.Release(domain) +} + +func (g *Group) Name() string { + return "limits" +} + +func (g *Group) InstanceName() string { + return g.instName +} + +func init() { + module.Register("limits", New) +} diff --git a/internal/target/remote/connect.go b/internal/target/remote/connect.go index b03f25d..ed45eeb 100644 --- a/internal/target/remote/connect.go +++ b/internal/target/remote/connect.go @@ -7,7 +7,6 @@ import ( "net" "runtime/trace" "sort" - "strings" "github.com/foxcpp/maddy/internal/config" "github.com/foxcpp/maddy/internal/exterrors" @@ -149,8 +148,6 @@ func (rd *remoteDelivery) attemptMX(ctx context.Context, conn mxConn, record *ne } func (rd *remoteDelivery) connectionForDomain(ctx context.Context, domain string) (*smtpconn.C, error) { - domain = strings.ToLower(domain) - if c, ok := rd.connections[domain]; ok { return c.C, nil } @@ -177,6 +174,13 @@ func (rd *remoteDelivery) connectionForDomain(ctx context.Context, domain string } conn.dnssecOk = dnssecOk + region = trace.StartRegion(ctx, "remote/limits.TakeDest") + if err := rd.rt.limits.TakeDest(ctx, domain); err != nil { + region.End() + return nil, err + } + region.End() + var lastErr error region = trace.StartRegion(ctx, "remote/Connect+TLS") for _, record := range records { diff --git a/internal/target/remote/remote.go b/internal/target/remote/remote.go index 4564b88..9c65246 100644 --- a/internal/target/remote/remote.go +++ b/internal/target/remote/remote.go @@ -22,6 +22,7 @@ import ( modconfig "github.com/foxcpp/maddy/internal/config/module" "github.com/foxcpp/maddy/internal/dns" "github.com/foxcpp/maddy/internal/exterrors" + "github.com/foxcpp/maddy/internal/limits" "github.com/foxcpp/maddy/internal/log" "github.com/foxcpp/maddy/internal/module" "github.com/foxcpp/maddy/internal/target" @@ -92,6 +93,7 @@ type Target struct { extResolver *dns.ExtResolver policies []Policy + limits *limits.Group Log log.Logger } @@ -134,6 +136,15 @@ func (rt *Target) Init(cfg *config.Map) error { } return p.L, nil }, &rt.policies) + cfg.Custom("limits", false, false, func() (interface{}, error) { + return &limits.Group{}, nil + }, func(cfg *config.Map, n *config.Node) (interface{}, error) { + var g *limits.Group + if err := modconfig.GroupFromNode("limits", n.Args, n, cfg.Globals, &g); err != nil { + return nil, err + } + return g, nil + }, &rt.limits) if _, err := cfg.Process(); err != nil { return err @@ -183,6 +194,36 @@ func (rt *Target) Start(ctx context.Context, msgMeta *module.MsgMetadata, mailFr policies = append(policies, p.Start(msgMeta)) } + _, domain, err := address.Split(mailFrom) + if err != nil { + return nil, &exterrors.SMTPError{ + Code: 501, + EnhancedCode: exterrors.EnhancedCode{5, 1, 8}, + Message: "Malformed sender address", + TargetName: "remote", + Err: err, + } + } + // Domain is already should be normalized by the message source (e.g. + // endpoint/smtp). + region := trace.StartRegion(ctx, "remote/limits.Take") + addr, ok := msgMeta.Conn.RemoteAddr.(*net.TCPAddr) + if !ok { + addr = &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1)} + } + if err := rt.limits.TakeMsg(ctx, addr.IP, domain); err != nil { + region.End() + return nil, &exterrors.SMTPError{ + Code: 451, + EnhancedCode: exterrors.EnhancedCode{4, 4, 5}, + Message: "High load, try again later", + Reason: "Global limit timeout", + TargetName: "remote", + Err: err, + } + } + region.End() + return &remoteDelivery{ rt: rt, mailFrom: mailFrom, @@ -365,8 +406,22 @@ func (rd *remoteDelivery) Commit(ctx context.Context) error { func (rd *remoteDelivery) Close() error { for _, conn := range rd.connections { rd.Log.Debugf("disconnected from %s", conn.ServerName()) + + rd.rt.limits.ReleaseDest(conn.domain) + conn.Close() } + + _, domain, err := address.Split(rd.mailFrom) + if err != nil { + return err + } + addr, ok := rd.msgMeta.Conn.RemoteAddr.(*net.TCPAddr) + if !ok { + addr = &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1)} + } + rd.rt.limits.ReleaseMsg(addr.IP, domain) + return nil } diff --git a/maddy.conf b/maddy.conf index f019352..2f5e8e5 100644 --- a/maddy.conf +++ b/maddy.conf @@ -51,8 +51,10 @@ sql local_mailboxes local_authdb { } smtp tcp://0.0.0.0:25 { - # Permit up to 20 messages per second to be enqueued. - ratelimit 20 1s + # Permit up to 20 messages per second to be accepted + limits { + all rate 20 + } check { # Verify that hostname in EHLO/HELO resolves to the source IP. Fail if it is not.