feat: rework udp stuff

This commit is contained in:
Toby 2023-07-27 18:51:33 -07:00
parent fd4d095dcd
commit d4e3833641
10 changed files with 278 additions and 136 deletions

View file

@ -84,7 +84,7 @@ func (u *udpConn) Close() error {
type udpSessionManager struct {
io udpIO
mutex sync.Mutex
mutex sync.RWMutex
m map[uint32]*udpConn
nextID uint32
@ -123,8 +123,8 @@ func (m *udpSessionManager) closeCleanup() {
}
func (m *udpSessionManager) feed(msg *protocol.UDPMessage) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.mutex.RLock()
defer m.mutex.RUnlock()
conn, ok := m.m[msg.SessionID]
if !ok {
@ -162,9 +162,7 @@ func (m *udpSessionManager) NewUDP() (HyUDPConn, error) {
conn.CloseFunc = func() {
m.mutex.Lock()
defer m.mutex.Unlock()
if !conn.Closed {
m.close(conn)
}
m.close(conn)
}
m.m[id] = conn
@ -172,13 +170,15 @@ func (m *udpSessionManager) NewUDP() (HyUDPConn, error) {
}
func (m *udpSessionManager) close(conn *udpConn) {
conn.Closed = true
close(conn.ReceiveCh)
delete(m.m, conn.ID)
if !conn.Closed {
conn.Closed = true
close(conn.ReceiveCh)
delete(m.m, conn.ID)
}
}
func (m *udpSessionManager) Count() int {
m.mutex.Lock()
defer m.mutex.Unlock()
m.mutex.RLock()
defer m.mutex.RUnlock()
return len(m.m)
}

View file

@ -3,7 +3,6 @@ module github.com/apernet/hysteria/core
go 1.20
require (
github.com/magiconair/properties v1.8.7
github.com/quic-go/quic-go v0.0.0-00010101000000-000000000000
github.com/stretchr/testify v1.8.4
go.uber.org/goleak v1.2.1

View file

@ -23,8 +23,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/onsi/ginkgo/v2 v2.9.5 h1:+6Hr4uxzP4XIUyAkg61dWBw8lb/gc4/X5luuxN/EC+Q=

View file

@ -29,11 +29,11 @@ type udpEventLogger interface {
}
type udpSessionEntry struct {
ID uint32
Conn UDPConn
D *frag.Defragger
Last *utils.AtomicTime
Closed bool
ID uint32
Conn UDPConn
D *frag.Defragger
Last *utils.AtomicTime
Timeout bool // true if the session is closed due to timeout
}
// Feed feeds a UDP message to the session.
@ -111,7 +111,7 @@ type udpSessionManager struct {
eventLogger udpEventLogger
idleTimeout time.Duration
mutex sync.Mutex
mutex sync.RWMutex
m map[uint32]*udpSessionEntry
}
@ -155,30 +155,31 @@ func (m *udpSessionManager) idleCleanupLoop(stopCh <-chan struct{}) {
}
func (m *udpSessionManager) cleanup(idleOnly bool) {
m.mutex.Lock()
defer m.mutex.Unlock()
// We use RLock here as we are only scanning the map, not deleting from it.
m.mutex.RLock()
defer m.mutex.RUnlock()
now := time.Now()
for sessionID, entry := range m.m {
for _, entry := range m.m {
if !idleOnly || now.Sub(entry.Last.Get()) > m.idleTimeout {
entry.Closed = true
entry.Timeout = true
_ = entry.Conn.Close()
m.eventLogger.Close(sessionID, nil)
delete(m.m, sessionID)
// Closing the connection here will cause the ReceiveLoop to exit,
// and the session will be removed from the map there.
}
}
}
func (m *udpSessionManager) feed(msg *protocol.UDPMessage) {
m.mutex.Lock()
m.mutex.RLock()
entry := m.m[msg.SessionID]
m.mutex.RUnlock()
// Create a new session if not exists
if entry == nil {
// New session
m.eventLogger.New(msg.SessionID, msg.Addr)
conn, err := m.io.UDP(msg.Addr)
if err != nil {
m.mutex.Unlock()
m.eventLogger.Close(msg.SessionID, err)
return
}
@ -191,21 +192,26 @@ func (m *udpSessionManager) feed(msg *protocol.UDPMessage) {
// Start the receive loop for this session
go func() {
err := entry.ReceiveLoop(m.io)
// Receive loop stopped, remove the session
m.mutex.Lock()
if !entry.Closed {
entry.Closed = true
if !entry.Timeout {
_ = entry.Conn.Close()
m.eventLogger.Close(entry.ID, err)
delete(m.m, entry.ID)
} else {
// Connection already closed by timeout cleanup,
// no need to close again here.
// Use nil error to indicate timeout.
m.eventLogger.Close(entry.ID, nil)
}
// Remove the session from the map
m.mutex.Lock()
delete(m.m, entry.ID)
m.mutex.Unlock()
}()
// Insert the session into the map
m.mutex.Lock()
m.m[msg.SessionID] = entry
m.mutex.Unlock()
}
m.mutex.Unlock()
// Feed the message to the session
// Feed (send) errors are ignored for now,
// as some are temporary (e.g. invalid address)
@ -213,7 +219,7 @@ func (m *udpSessionManager) feed(msg *protocol.UDPMessage) {
}
func (m *udpSessionManager) Count() int {
m.mutex.Lock()
defer m.mutex.Unlock()
m.mutex.RLock()
defer m.mutex.RUnlock()
return len(m.m)
}