From 64835a637bdc004a08fdc5e53916f6e7ae4dad17 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=96=E7=95=8C?= Date: Tue, 14 Jun 2022 11:29:29 +0800 Subject: [PATCH] Improve bufio --- common/bufio/conn.go | 33 ++++++++++++++++---------- common/bufio/limit.go | 50 ++++++++++++++++++++++++++++++++++++++++ common/metadata/addr.go | 3 +++ common/network/conn.go | 5 ++++ go.mod | 2 +- go.sum | 4 ++-- protocol/socks/packet.go | 2 +- 7 files changed, 83 insertions(+), 16 deletions(-) create mode 100644 common/bufio/limit.go diff --git a/common/bufio/conn.go b/common/bufio/conn.go index 38be03d..b518474 100644 --- a/common/bufio/conn.go +++ b/common/bufio/conn.go @@ -299,8 +299,10 @@ func CopyPacketConnTimeout(ctx context.Context, conn N.PacketConn, dest N.Packet }) } -func NewPacketConn(conn net.PacketConn) N.PacketConn { - if udpConn, ok := conn.(*net.UDPConn); ok { +func NewPacketConn(conn net.PacketConn) N.NetPacketConn { + if packetConn, ok := conn.(N.NetPacketConn); ok { + return packetConn + } else if udpConn, ok := conn.(*net.UDPConn); ok { return &ExtendedUDPConn{udpConn} } else { return &ExtendedPacketConn{udpConn} @@ -380,37 +382,44 @@ func (c *BindPacketConn) Upstream() any { } type UnbindPacketConn struct { - net.Conn + N.ExtendedConn + Addr M.Socksaddr } func (c *UnbindPacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) { - n, err = c.Conn.Read(p) + n, err = c.ExtendedConn.Read(p) if err == nil { - addr = c.RemoteAddr() + addr = c.Addr.UDPAddr() } return } func (c *UnbindPacketConn) WriteTo(p []byte, _ net.Addr) (n int, err error) { - return c.Write(p) + return c.ExtendedConn.Write(p) } func (c *UnbindPacketConn) ReadPacket(buffer *buf.Buffer) (destination M.Socksaddr, err error) { - _, err = buffer.ReadFrom(c.Conn) + err = c.ExtendedConn.ReadBuffer(buffer) if err != nil { return } - destination = M.SocksaddrFromNet(c.RemoteAddr()) + destination = c.Addr return } -func (c *UnbindPacketConn) WritePacket(buffer *buf.Buffer, destination M.Socksaddr) error { - defer buffer.Release() - return common.Error(c.Conn.Write(buffer.Bytes())) +func (c *UnbindPacketConn) WritePacket(buffer *buf.Buffer, _ M.Socksaddr) error { + return c.ExtendedConn.WriteBuffer(buffer) } func (c *UnbindPacketConn) Upstream() any { - return c.Conn + return c.ExtendedConn +} + +func NewUnbindPacketConn(conn net.Conn) N.NetPacketConn { + return &UnbindPacketConn{ + NewExtendedConn(conn), + M.SocksaddrFromNet(conn.RemoteAddr()), + } } type ExtendedReaderWrapper struct { diff --git a/common/bufio/limit.go b/common/bufio/limit.go new file mode 100644 index 0000000..98e3e42 --- /dev/null +++ b/common/bufio/limit.go @@ -0,0 +1,50 @@ +package bufio + +import ( + "io" + + "github.com/sagernet/sing/common" + "github.com/sagernet/sing/common/buf" + N "github.com/sagernet/sing/common/network" +) + +type LimitedWriter struct { + upstream N.ExtendedWriter + maxChunkLength int +} + +func NewLimitedWriter(writer io.Writer, maxChunkLength int) *LimitedWriter { + return &LimitedWriter{ + upstream: NewExtendedWriter(writer), + maxChunkLength: maxChunkLength, + } +} + +func (w *LimitedWriter) Write(p []byte) (n int, err error) { + for pLen := len(p); pLen > 0; { + var data []byte + if pLen > w.maxChunkLength { + data = p[:w.maxChunkLength] + p = p[w.maxChunkLength:] + pLen -= w.maxChunkLength + } else { + data = p + pLen = 0 + } + var writeN int + writeN, err = w.upstream.Write(data) + if err != nil { + return + } + n += writeN + } + return +} + +func (w *LimitedWriter) WriteBuffer(buffer *buf.Buffer) error { + if buffer.Len() <= w.maxChunkLength { + return w.upstream.WriteBuffer(buffer) + } + defer buffer.Release() + return common.Error(w.Write(buffer.Bytes())) +} diff --git a/common/metadata/addr.go b/common/metadata/addr.go index 6bd92d0..f1a2b57 100644 --- a/common/metadata/addr.go +++ b/common/metadata/addr.go @@ -116,6 +116,9 @@ func SocksaddrFromNetIP(ap netip.AddrPort) Socksaddr { } func SocksaddrFromNet(ap net.Addr) Socksaddr { + if ap == nil { + return Socksaddr{} + } if socksAddr, ok := ap.(Socksaddr); ok { return socksAddr } diff --git a/common/network/conn.go b/common/network/conn.go index 51e7e34..271c9ae 100644 --- a/common/network/conn.go +++ b/common/network/conn.go @@ -60,6 +60,11 @@ type NetPacketConn interface { PacketConn } +type BindPacketConn interface { + NetPacketConn + net.Conn +} + type UDPHandler interface { NewPacket(ctx context.Context, conn PacketConn, buffer *buf.Buffer, metadata M.Metadata) error } diff --git a/go.mod b/go.mod index c10d1fe..5a21e74 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,4 @@ module github.com/sagernet/sing go 1.18 -require golang.org/x/sys v0.0.0-20220608164250-635b8c9b7f68 +require golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d diff --git a/go.sum b/go.sum index d9787ef..057668c 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,2 @@ -golang.org/x/sys v0.0.0-20220608164250-635b8c9b7f68 h1:z8Hj/bl9cOV2grsOpEaQFUaly0JWN3i97mo3jXKJNp0= -golang.org/x/sys v0.0.0-20220608164250-635b8c9b7f68/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d h1:Zu/JngovGLVi6t2J3nmAf3AoTDwuzw85YZ3b9o4yU7s= +golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/protocol/socks/packet.go b/protocol/socks/packet.go index 5b97d33..8ce654b 100644 --- a/protocol/socks/packet.go +++ b/protocol/socks/packet.go @@ -33,7 +33,7 @@ func NewAssociatePacketConn(conn net.PacketConn, remoteAddr M.Socksaddr, underly func NewAssociateConn(conn net.Conn, remoteAddr M.Socksaddr, underlying net.Conn) *AssociatePacketConn { return &AssociatePacketConn{ - PacketConn: &bufio.UnbindPacketConn{Conn: conn}, + PacketConn: bufio.NewUnbindPacketConn(conn), remoteAddr: remoteAddr, underlying: underlying, }