Implement read waiter for UDP

This commit is contained in:
世界 2023-12-06 23:55:54 +08:00
parent c098d42d8b
commit 499e257d3b
No known key found for this signature in database
GPG key ID: CD109927C34A63C4
8 changed files with 163 additions and 115 deletions

5
go.mod
View file

@ -5,9 +5,9 @@ go 1.20
require (
github.com/gofrs/uuid/v5 v5.0.0
github.com/sagernet/quic-go v0.40.0
github.com/sagernet/sing v0.2.18
github.com/sagernet/sing v0.3.0-beta.3
golang.org/x/crypto v0.16.0
golang.org/x/exp v0.0.0-20231127185646-65229373498e
golang.org/x/exp v0.0.0-20231214170342-aacd6d4b4611
)
require (
@ -16,7 +16,6 @@ require (
github.com/onsi/ginkgo/v2 v2.9.7 // indirect
github.com/quic-go/qpack v0.4.0 // indirect
github.com/quic-go/qtls-go1-20 v0.4.1 // indirect
github.com/stretchr/testify v1.8.4 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect

9
go.sum
View file

@ -23,16 +23,15 @@ github.com/quic-go/qtls-go1-20 v0.4.1 h1:D33340mCNDAIKBqXuAvexTNMUByrYmFYVfKfDN5
github.com/quic-go/qtls-go1-20 v0.4.1/go.mod h1:X9Nh97ZL80Z+bX/gUXMbipO6OxdiDi58b/fMC9mAL+k=
github.com/sagernet/quic-go v0.40.0 h1:DvQNPb72lzvNQDe9tcUyHTw8eRv6PLtM2mNYmdlzUMo=
github.com/sagernet/quic-go v0.40.0/go.mod h1:VqtdhlbkeeG5Okhb3eDMb/9o0EoglReHunNT9ukrJAI=
github.com/sagernet/sing v0.2.18 h1:2Ce4dl0pkWft+4914NGXPb8OiQpgA8UHQ9xFOmgvKuY=
github.com/sagernet/sing v0.2.18/go.mod h1:OL6k2F0vHmEzXz2KW19qQzu172FDgSbUSODylighuVo=
github.com/sagernet/sing v0.3.0-beta.3 h1:E2xBoJUducK/FE6EwMk95Rt2bkXeht9l1BTYRui+DXs=
github.com/sagernet/sing v0.3.0-beta.3/go.mod h1:9pfuAH6mZfgnz/YjP6xu5sxx882rfyjpcrTdUpd6w3g=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY=
golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/exp v0.0.0-20231127185646-65229373498e h1:Gvh4YaCaXNs6dKTlfgismwWZKyjVZXwOPfIyUaqU3No=
golang.org/x/exp v0.0.0-20231127185646-65229373498e/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI=
golang.org/x/exp v0.0.0-20231214170342-aacd6d4b4611 h1:qCEDpW1G+vcj3Y7Fy52pEM1AWm3abj8WimGYejI3SC4=
golang.org/x/exp v0.0.0-20231214170342-aacd6d4b4611/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI=
golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0=
golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U=

View file

@ -19,6 +19,7 @@ import (
"github.com/sagernet/sing/common/cache"
E "github.com/sagernet/sing/common/exceptions"
M "github.com/sagernet/sing/common/metadata"
N "github.com/sagernet/sing/common/network"
"github.com/sagernet/sing/common/rw"
)
@ -129,6 +130,7 @@ type udpPacketConn struct {
closeOnce sync.Once
defragger *udpDefragger
onDestroy func()
readWaitOptions N.ReadWaitOptions
}
func newUDPPacketConn(ctx context.Context, quicConn quic.Connection, onDestroy func()) *udpPacketConn {
@ -143,18 +145,6 @@ func newUDPPacketConn(ctx context.Context, quicConn quic.Connection, onDestroy f
}
}
func (c *udpPacketConn) ReadPacketThreadSafe() (buffer *buf.Buffer, destination M.Socksaddr, err error) {
select {
case p := <-c.data:
buffer = p.data
destination = M.ParseSocksaddrHostPort(p.host, p.port)
p.release()
return
case <-c.ctx.Done():
return nil, M.Socksaddr{}, io.ErrClosedPipe
}
}
func (c *udpPacketConn) ReadPacket(buffer *buf.Buffer) (destination M.Socksaddr, err error) {
select {
case p := <-c.data:
@ -167,18 +157,6 @@ func (c *udpPacketConn) ReadPacket(buffer *buf.Buffer) (destination M.Socksaddr,
}
}
func (c *udpPacketConn) WaitReadPacket(newBuffer func() *buf.Buffer) (destination M.Socksaddr, err error) {
select {
case p := <-c.data:
_, err = newBuffer().ReadOnceFrom(p.data)
destination = M.ParseSocksaddrHostPort(p.host, p.port)
p.releaseMessage()
return
case <-c.ctx.Done():
return M.Socksaddr{}, io.ErrClosedPipe
}
}
func (c *udpPacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
select {
case pkt := <-c.data:

37
hysteria/packet_wait.go Normal file
View file

@ -0,0 +1,37 @@
package hysteria
import (
"io"
"github.com/sagernet/sing/common/buf"
M "github.com/sagernet/sing/common/metadata"
N "github.com/sagernet/sing/common/network"
)
func (c *udpPacketConn) InitializeReadWaiter(options N.ReadWaitOptions) (needCopy bool) {
c.readWaitOptions = options
return options.NeedHeadroom()
}
func (c *udpPacketConn) WaitReadPacket() (buffer *buf.Buffer, destination M.Socksaddr, err error) {
select {
case p := <-c.data:
destination = M.ParseSocksaddrHostPort(p.host, p.port)
if c.readWaitOptions.NeedHeadroom() {
buffer = c.readWaitOptions.NewPacketBuffer()
_, err = buffer.Write(p.data.Bytes())
if err != nil {
buffer.Release()
return nil, M.Socksaddr{}, err
}
p.releaseMessage()
c.readWaitOptions.PostReturn(buffer)
} else {
buffer = p.data
p.release()
}
return
case <-c.ctx.Done():
return nil, M.Socksaddr{}, io.ErrClosedPipe
}
}

View file

@ -20,6 +20,7 @@ import (
"github.com/sagernet/sing/common/buf"
"github.com/sagernet/sing/common/cache"
M "github.com/sagernet/sing/common/metadata"
N "github.com/sagernet/sing/common/network"
)
var udpMessagePool = sync.Pool{
@ -125,6 +126,7 @@ type udpPacketConn struct {
closeOnce sync.Once
defragger *udpDefragger
onDestroy func()
readWaitOptions N.ReadWaitOptions
}
func newUDPPacketConn(ctx context.Context, quicConn quic.Connection, onDestroy func()) *udpPacketConn {
@ -139,18 +141,6 @@ func newUDPPacketConn(ctx context.Context, quicConn quic.Connection, onDestroy f
}
}
func (c *udpPacketConn) ReadPacketThreadSafe() (buffer *buf.Buffer, destination M.Socksaddr, err error) {
select {
case p := <-c.data:
buffer = p.data
destination = M.ParseSocksaddr(p.destination)
p.release()
return
case <-c.ctx.Done():
return nil, M.Socksaddr{}, io.ErrClosedPipe
}
}
func (c *udpPacketConn) ReadPacket(buffer *buf.Buffer) (destination M.Socksaddr, err error) {
select {
case p := <-c.data:
@ -163,18 +153,6 @@ func (c *udpPacketConn) ReadPacket(buffer *buf.Buffer) (destination M.Socksaddr,
}
}
func (c *udpPacketConn) WaitReadPacket(newBuffer func() *buf.Buffer) (destination M.Socksaddr, err error) {
select {
case p := <-c.data:
_, err = newBuffer().ReadOnceFrom(p.data)
destination = M.ParseSocksaddr(p.destination)
p.releaseMessage()
return
case <-c.ctx.Done():
return M.Socksaddr{}, io.ErrClosedPipe
}
}
func (c *udpPacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
select {
case pkt := <-c.data:

37
hysteria2/packet_wait.go Normal file
View file

@ -0,0 +1,37 @@
package hysteria2
import (
"io"
"github.com/sagernet/sing/common/buf"
M "github.com/sagernet/sing/common/metadata"
N "github.com/sagernet/sing/common/network"
)
func (c *udpPacketConn) InitializeReadWaiter(options N.ReadWaitOptions) (needCopy bool) {
c.readWaitOptions = options
return options.NeedHeadroom()
}
func (c *udpPacketConn) WaitReadPacket() (buffer *buf.Buffer, destination M.Socksaddr, err error) {
select {
case p := <-c.data:
destination = M.ParseSocksaddr(p.destination)
if c.readWaitOptions.NeedHeadroom() {
buffer = c.readWaitOptions.NewPacketBuffer()
_, err = buffer.Write(p.data.Bytes())
if err != nil {
buffer.Release()
return nil, M.Socksaddr{}, err
}
p.releaseMessage()
c.readWaitOptions.PostReturn(buffer)
} else {
buffer = p.data
p.release()
}
return
case <-c.ctx.Done():
return nil, M.Socksaddr{}, io.ErrClosedPipe
}
}

View file

@ -19,6 +19,7 @@ import (
"github.com/sagernet/sing/common/cache"
E "github.com/sagernet/sing/common/exceptions"
M "github.com/sagernet/sing/common/metadata"
N "github.com/sagernet/sing/common/network"
)
var udpMessagePool = sync.Pool{
@ -114,6 +115,11 @@ func fragUDPMessage(message *udpMessage, maxPacketSize int) []*udpMessage {
return fragments
}
var (
_ N.NetPacketConn = (*udpPacketConn)(nil)
_ N.PacketReadWaiter = (*udpPacketConn)(nil)
)
type udpPacketConn struct {
ctx context.Context
cancel common.ContextCancelCauseFunc
@ -128,6 +134,7 @@ type udpPacketConn struct {
isServer bool
defragger *udpDefragger
onDestroy func()
readWaitOptions N.ReadWaitOptions
}
func newUDPPacketConn(ctx context.Context, quicConn quic.Connection, udpStream bool, isServer bool, onDestroy func()) *udpPacketConn {
@ -144,18 +151,6 @@ func newUDPPacketConn(ctx context.Context, quicConn quic.Connection, udpStream b
}
}
func (c *udpPacketConn) ReadPacketThreadSafe() (buffer *buf.Buffer, destination M.Socksaddr, err error) {
select {
case p := <-c.data:
buffer = p.data
destination = p.destination
p.release()
return
case <-c.ctx.Done():
return nil, M.Socksaddr{}, io.ErrClosedPipe
}
}
func (c *udpPacketConn) ReadPacket(buffer *buf.Buffer) (destination M.Socksaddr, err error) {
select {
case p := <-c.data:
@ -168,18 +163,6 @@ func (c *udpPacketConn) ReadPacket(buffer *buf.Buffer) (destination M.Socksaddr,
}
}
func (c *udpPacketConn) WaitReadPacket(newBuffer func() *buf.Buffer) (destination M.Socksaddr, err error) {
select {
case p := <-c.data:
_, err = newBuffer().ReadOnceFrom(p.data)
destination = p.destination
p.releaseMessage()
return
case <-c.ctx.Done():
return M.Socksaddr{}, io.ErrClosedPipe
}
}
func (c *udpPacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
select {
case pkt := <-c.data:

37
tuic/packet_wait.go Normal file
View file

@ -0,0 +1,37 @@
package tuic
import (
"io"
"github.com/sagernet/sing/common/buf"
M "github.com/sagernet/sing/common/metadata"
N "github.com/sagernet/sing/common/network"
)
func (c *udpPacketConn) InitializeReadWaiter(options N.ReadWaitOptions) (needCopy bool) {
c.readWaitOptions = options
return options.NeedHeadroom()
}
func (c *udpPacketConn) WaitReadPacket() (buffer *buf.Buffer, destination M.Socksaddr, err error) {
select {
case p := <-c.data:
destination = p.destination
if c.readWaitOptions.NeedHeadroom() {
buffer = c.readWaitOptions.NewPacketBuffer()
_, err = buffer.Write(p.data.Bytes())
if err != nil {
buffer.Release()
return nil, M.Socksaddr{}, err
}
p.releaseMessage()
c.readWaitOptions.PostReturn(buffer)
} else {
buffer = p.data
p.release()
}
return
case <-c.ctx.Done():
return nil, M.Socksaddr{}, io.ErrClosedPipe
}
}