Implement read waiter for UDP

This commit is contained in:
世界 2023-12-06 23:55:54 +08:00
parent 316055ea0f
commit cb26be3e2a
No known key found for this signature in database
GPG key ID: CD109927C34A63C4
8 changed files with 161 additions and 111 deletions

View file

@ -19,6 +19,7 @@ import (
"github.com/sagernet/sing/common/cache"
E "github.com/sagernet/sing/common/exceptions"
M "github.com/sagernet/sing/common/metadata"
N "github.com/sagernet/sing/common/network"
"github.com/sagernet/sing/common/rw"
)
@ -118,17 +119,18 @@ func fragUDPMessage(message *udpMessage, maxPacketSize int) []*udpMessage {
}
type udpPacketConn struct {
ctx context.Context
cancel common.ContextCancelCauseFunc
sessionID uint32
quicConn quic.Connection
data chan *udpMessage
udpMTU int
udpMTUTime time.Time
packetId atomic.Uint32
closeOnce sync.Once
defragger *udpDefragger
onDestroy func()
ctx context.Context
cancel common.ContextCancelCauseFunc
sessionID uint32
quicConn quic.Connection
data chan *udpMessage
udpMTU int
udpMTUTime time.Time
packetId atomic.Uint32
closeOnce sync.Once
defragger *udpDefragger
onDestroy func()
readWaitOptions N.ReadWaitOptions
}
func newUDPPacketConn(ctx context.Context, quicConn quic.Connection, onDestroy func()) *udpPacketConn {
@ -143,18 +145,6 @@ func newUDPPacketConn(ctx context.Context, quicConn quic.Connection, onDestroy f
}
}
func (c *udpPacketConn) ReadPacketThreadSafe() (buffer *buf.Buffer, destination M.Socksaddr, err error) {
select {
case p := <-c.data:
buffer = p.data
destination = M.ParseSocksaddrHostPort(p.host, p.port)
p.release()
return
case <-c.ctx.Done():
return nil, M.Socksaddr{}, io.ErrClosedPipe
}
}
func (c *udpPacketConn) ReadPacket(buffer *buf.Buffer) (destination M.Socksaddr, err error) {
select {
case p := <-c.data:
@ -167,18 +157,6 @@ func (c *udpPacketConn) ReadPacket(buffer *buf.Buffer) (destination M.Socksaddr,
}
}
func (c *udpPacketConn) WaitReadPacket(newBuffer func() *buf.Buffer) (destination M.Socksaddr, err error) {
select {
case p := <-c.data:
_, err = newBuffer().ReadOnceFrom(p.data)
destination = M.ParseSocksaddrHostPort(p.host, p.port)
p.releaseMessage()
return
case <-c.ctx.Done():
return M.Socksaddr{}, io.ErrClosedPipe
}
}
func (c *udpPacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
select {
case pkt := <-c.data:

37
hysteria/packet_wait.go Normal file
View file

@ -0,0 +1,37 @@
package hysteria
import (
"io"
"github.com/sagernet/sing/common/buf"
M "github.com/sagernet/sing/common/metadata"
N "github.com/sagernet/sing/common/network"
)
func (c *udpPacketConn) InitializeReadWaiter(options N.ReadWaitOptions) (needCopy bool) {
c.readWaitOptions = options
return options.NeedHeadroom()
}
func (c *udpPacketConn) WaitReadPacket() (buffer *buf.Buffer, destination M.Socksaddr, err error) {
select {
case p := <-c.data:
destination = M.ParseSocksaddrHostPort(p.host, p.port)
if c.readWaitOptions.NeedHeadroom() {
buffer = c.readWaitOptions.NewPacketBuffer()
_, err = buffer.Write(p.data.Bytes())
if err != nil {
buffer.Release()
return nil, M.Socksaddr{}, err
}
p.releaseMessage()
c.readWaitOptions.PostReturn(buffer)
} else {
buffer = p.data
p.release()
}
return
case <-c.ctx.Done():
return nil, M.Socksaddr{}, io.ErrClosedPipe
}
}