Fix udpnat timeout

This commit is contained in:
世界 2023-04-18 09:19:09 +08:00
parent 0b4d134fe9
commit f196b4303e
No known key found for this signature in database
GPG key ID: CD109927C34A63C4
2 changed files with 45 additions and 18 deletions

View file

@ -3,6 +3,9 @@ package cache
// Modified by https://github.com/die-net/lrucache // Modified by https://github.com/die-net/lrucache
import ( import (
"context"
"log"
"runtime/debug"
"sync" "sync"
"time" "time"
@ -38,13 +41,14 @@ func WithSize[K comparable, V any](maxSize int) Option[K, V] {
} }
} }
func WithStale[K comparable, V any](stale bool) Option[K, V] { func WithContext[K comparable, V any](ctx context.Context) Option[K, V] {
return func(l *LruCache[K, V]) { return func(l *LruCache[K, V]) {
l.staleReturn = stale l.ctx = ctx
} }
} }
type LruCache[K comparable, V any] struct { type LruCache[K comparable, V any] struct {
ctx context.Context
maxAge int64 maxAge int64
maxSize int maxSize int
mu sync.Mutex mu sync.Mutex
@ -64,6 +68,14 @@ func New[K comparable, V any](options ...Option[K, V]) *LruCache[K, V] {
option(lc) option(lc)
} }
if lc.maxAge > 0 {
if lc.ctx == nil {
lc.ctx = context.Background()
log.Println("your lru cache is going to leak")
debug.PrintStack()
}
go lc.loopCheckTimeout()
}
return lc return lc
} }
@ -107,8 +119,6 @@ create:
e := &entry[K, V]{key: key, value: value, expires: time.Now().Unix() + c.maxAge} e := &entry[K, V]{key: key, value: value, expires: time.Now().Unix() + c.maxAge}
c.cache[key] = c.lru.PushBack(e) c.cache[key] = c.lru.PushBack(e)
} }
c.maybeDeleteOldest()
return value, false return value, false
} }
@ -146,8 +156,6 @@ create:
e := &entry[K, V]{key: key, value: value, expires: time.Now().Unix() + c.maxAge} e := &entry[K, V]{key: key, value: value, expires: time.Now().Unix() + c.maxAge}
c.cache[key] = c.lru.PushBack(e) c.cache[key] = c.lru.PushBack(e)
} }
c.maybeDeleteOldest()
return value, false return value, false
} }
@ -195,8 +203,6 @@ func (c *LruCache[K, V]) StoreWithExpire(key K, value V, expires time.Time) {
} }
} }
} }
c.maybeDeleteOldest()
} }
func (c *LruCache[K, V]) CloneTo(n *LruCache[K, V]) { func (c *LruCache[K, V]) CloneTo(n *LruCache[K, V]) {
@ -234,8 +240,6 @@ func (c *LruCache[K, V]) get(key K) *entry[K, V] {
if !c.staleReturn && c.maxAge > 0 && le.Value.expires <= time.Now().Unix() { if !c.staleReturn && c.maxAge > 0 && le.Value.expires <= time.Now().Unix() {
c.deleteElement(le) c.deleteElement(le)
c.maybeDeleteOldest()
return nil return nil
} }
@ -258,15 +262,34 @@ func (c *LruCache[K, V]) Delete(key K) {
c.mu.Unlock() c.mu.Unlock()
} }
func (c *LruCache[K, V]) maybeDeleteOldest() { func (c *LruCache[K, V]) loopCheckTimeout() {
if !c.staleReturn && c.maxAge > 0 { ticker := time.NewTicker(time.Second * time.Duration(c.maxAge))
now := time.Now().Unix() defer ticker.Stop()
for le := c.lru.Front(); le != nil && le.Value.expires <= now; le = c.lru.Front() { for {
c.deleteElement(le) select {
case <-ticker.C:
c.checkTimeout()
case <-c.ctx.Done():
return
} }
} }
} }
func (c *LruCache[K, V]) checkTimeout() {
c.mu.Lock()
defer c.mu.Unlock()
now := time.Now().Unix()
var toDelete []*list.Element[*entry[K, V]]
for it := c.lru.Front(); it != nil; it = it.Next() {
if it.Value.expires <= now {
toDelete = append(toDelete, it)
}
}
for _, it := range toDelete {
c.deleteElement(it)
}
}
func (c *LruCache[K, V]) deleteElement(le *list.Element[*entry[K, V]]) { func (c *LruCache[K, V]) deleteElement(le *list.Element[*entry[K, V]]) {
c.lru.Remove(le) c.lru.Remove(le)
e := le.Value e := le.Value

View file

@ -25,9 +25,10 @@ type Service[K comparable] struct {
handler Handler handler Handler
} }
func New[K comparable](maxAge int64, handler Handler) *Service[K] { func New[K comparable](ctx context.Context, maxAge int64, handler Handler) *Service[K] {
return &Service[K]{ return &Service[K]{
nat: cache.New( nat: cache.New(
cache.WithContext[K, *conn](ctx),
cache.WithAge[K, *conn](maxAge), cache.WithAge[K, *conn](maxAge),
cache.WithUpdateAgeOnGet[K, *conn](), cache.WithUpdateAgeOnGet[K, *conn](),
cache.WithEvict[K, *conn](func(key K, conn *conn) { cache.WithEvict[K, *conn](func(key K, conn *conn) {
@ -102,6 +103,10 @@ func (s *Service[T]) NewContextPacket(ctx context.Context, key T, buffer *buf.Bu
} }
} }
func (s *Service[T]) Close() error {
return common.Close(common.PtrOrNil(s.nat))
}
type packet struct { type packet struct {
data *buf.Buffer data *buf.Buffer
destination M.Socksaddr destination M.Socksaddr
@ -159,10 +164,9 @@ func (c *conn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
func (c *conn) Close() error { func (c *conn) Close() error {
select { select {
case <-c.ctx.Done(): case <-c.ctx.Done():
return os.ErrClosed
default: default:
c.cancel(net.ErrClosed)
} }
c.cancel(net.ErrClosed)
if sourceCloser, sourceIsCloser := c.source.(io.Closer); sourceIsCloser { if sourceCloser, sourceIsCloser := c.source.(io.Closer); sourceIsCloser {
return sourceCloser.Close() return sourceCloser.Close()
} }