SOCKS5 UDP implementation

This commit is contained in:
Toby 2020-04-24 19:47:56 -07:00
parent 6b5fc2862a
commit e02ede3076
6 changed files with 207 additions and 402 deletions

View file

@ -7,10 +7,12 @@ import (
"os"
"reflect"
"strings"
"time"
)
const (
mbpsToBps = 125000
mbpsToBps = 125000
dialTimeout = 10 * time.Second
DefaultMaxReceiveStreamFlowControlWindow = 33554432
DefaultMaxReceiveConnectionFlowControlWindow = 67108864

View file

@ -11,6 +11,7 @@ import (
"github.com/tobyxdd/hysteria/pkg/socks5"
"io/ioutil"
"log"
"net"
)
func proxyClient(args []string) {
@ -69,19 +70,29 @@ func proxyClient(args []string) {
defer client.Close()
log.Println("Connected to", config.ServerAddr)
socks5server, err := socks5.NewServer(config.SOCKS5Addr, "", nil, config.SOCKS5Timeout, 0, 0)
socks5server, err := socks5.NewServer(client, config.SOCKS5Addr, nil, config.SOCKS5Timeout,
func(addr net.Addr, reqAddr string) {
log.Printf("[TCP] %s <-> %s\n", addr.String(), reqAddr)
},
func(addr net.Addr, reqAddr string, err error) {
log.Printf("Closed [TCP] %s <-> %s: %s\n", addr.String(), reqAddr, err.Error())
},
func(addr net.Addr) {
log.Printf("[UDP] Associate %s\n", addr.String())
},
func(addr net.Addr, err error) {
log.Printf("Closed [UDP] Associate %s: %s\n", addr.String(), err.Error())
},
func(addr net.Addr, reqAddr string) {
log.Printf("[UDP] %s <-> %s\n", addr.String(), reqAddr)
},
func(addr net.Addr, reqAddr string, err error) {
log.Printf("Closed [UDP] %s <-> %s: %s\n", addr.String(), reqAddr, err.Error())
})
if err != nil {
log.Fatalln("SOCKS5 server initialization failed:", err)
}
log.Println("SOCKS5 server up and running on", config.SOCKS5Addr)
log.Fatalln(socks5server.ListenAndServe(&socks5.HyHandler{
Client: client,
NewTCPRequestFunc: func(addr, reqAddr string) {
log.Printf("[TCP] %s <-> %s\n", addr, reqAddr)
},
TCPRequestClosedFunc: func(addr, reqAddr string, err error) {
log.Printf("Closed [TCP] %s <-> %s: %s\n", addr, reqAddr, err.Error())
},
}))
log.Fatalln(socks5server.ListenAndServe())
}

View file

@ -96,7 +96,7 @@ func proxyServer(args []string) {
if !packet {
// TCP
log.Printf("%s (%s): [TCP] %s\n", addr.String(), username, reqAddr)
conn, err := net.Dial("tcp", reqAddr)
conn, err := net.DialTimeout("tcp", reqAddr, dialTimeout)
if err != nil {
log.Printf("TCP error %s: %s\n", reqAddr, err.Error())
return core.ConnFailed, err.Error(), nil

View file

@ -74,7 +74,7 @@ func relayServer(args []string) {
if packet {
return core.ConnBlocked, "unsupported", nil
}
conn, err := net.Dial("tcp", config.RemoteAddr)
conn, err := net.DialTimeout("tcp", config.RemoteAddr, dialTimeout)
if err != nil {
log.Printf("TCP error %s: %s\n", config.RemoteAddr, err.Error())
return core.ConnFailed, err.Error(), nil

View file

@ -1,86 +0,0 @@
package socks5
import (
"github.com/tobyxdd/hysteria/internal/utils"
"github.com/tobyxdd/hysteria/pkg/core"
"github.com/txthinking/socks5"
"io"
"net"
"time"
)
type HyHandler struct {
Client core.Client
NewTCPRequestFunc func(addr, reqAddr string)
TCPRequestClosedFunc func(addr, reqAddr string, err error)
}
func (h *HyHandler) TCPHandle(server *Server, conn *net.TCPConn, request *socks5.Request) error {
if request.Cmd == socks5.CmdConnect {
h.NewTCPRequestFunc(conn.RemoteAddr().String(), request.Address())
var closeErr error
defer func() {
h.TCPRequestClosedFunc(conn.RemoteAddr().String(), request.Address(), closeErr)
}()
rc, err := h.Client.Dial(false, request.Address())
if err != nil {
_ = sendReply(request, conn, socks5.RepHostUnreachable)
closeErr = err
return err
}
// All good
_ = sendReply(request, conn, socks5.RepSuccess)
defer rc.Close()
closeErr = pipePair(conn, rc, server.TCPDeadline)
return nil
} else {
_ = sendReply(request, conn, socks5.RepCommandNotSupported)
return ErrUnsupportedCmd
}
}
func (h *HyHandler) UDPHandle(server *Server, addr *net.UDPAddr, datagram *socks5.Datagram) error {
// Not supported for now
return nil
}
func sendReply(request *socks5.Request, conn *net.TCPConn, rep byte) error {
var p *socks5.Reply
if request.Atyp == socks5.ATYPIPv4 || request.Atyp == socks5.ATYPDomain {
p = socks5.NewReply(rep, socks5.ATYPIPv4, []byte{0x00, 0x00, 0x00, 0x00}, []byte{0x00, 0x00})
} else {
p = socks5.NewReply(rep, socks5.ATYPIPv6, net.IPv6zero, []byte{0x00, 0x00})
}
_, err := p.WriteTo(conn)
return err
}
func pipePair(conn *net.TCPConn, stream io.ReadWriteCloser, deadline int) error {
errChan := make(chan error, 2)
// TCP to stream
go func() {
buf := make([]byte, utils.PipeBufferSize)
for {
if deadline != 0 {
_ = conn.SetDeadline(time.Now().Add(time.Duration(deadline) * time.Second))
}
rn, err := conn.Read(buf)
if rn > 0 {
_, err := stream.Write(buf[:rn])
if err != nil {
errChan <- err
return
}
}
if err != nil {
errChan <- err
return
}
}
}()
// Stream to TCP
go func() {
errChan <- utils.Pipe(stream, conn, nil)
}()
return <-errChan
}

View file

@ -1,17 +1,16 @@
package socks5
import "errors"
// Modified based on https://github.com/txthinking/socks5/blob/master/server.go
import (
"errors"
"github.com/tobyxdd/hysteria/internal/utils"
"github.com/tobyxdd/hysteria/pkg/core"
"io"
)
import (
"github.com/txthinking/socks5"
"log"
"net"
"time"
"github.com/patrickmn/go-cache"
"github.com/txthinking/runnergroup"
)
var (
@ -19,75 +18,53 @@ var (
ErrUserPassAuth = errors.New("invalid username or password")
)
// Server is socks5 server wrapper
type Server struct {
AuthFunc func(username, password string) bool
Method byte
SupportedCommands []byte
TCPAddr *net.TCPAddr
UDPAddr *net.UDPAddr
ServerAddr *net.UDPAddr
TCPListen *net.TCPListener
UDPConn *net.UDPConn
UDPExchanges *cache.Cache
TCPDeadline int
UDPDeadline int
UDPSessionTime int // If client does't send address, use this fixed time
Handle Handler
TCPUDPAssociate *cache.Cache
RunnerGroup *runnergroup.RunnerGroup
HyClient core.Client
AuthFunc func(username, password string) bool
Method byte
TCPAddr *net.TCPAddr
TCPDeadline int
NewRequestFunc func(addr net.Addr, reqAddr string)
RequestClosedFunc func(addr net.Addr, reqAddr string, err error)
NewUDPAssociateFunc func(addr net.Addr)
UDPAssociateClosedFunc func(addr net.Addr, err error)
NewUDPTunnelFunc func(addr net.Addr, reqAddr string)
UDPTunnelClosedFunc func(addr net.Addr, reqAddr string, err error)
tcpListener *net.TCPListener
}
// UDPExchange used to store client address and remote connection
type UDPExchange struct {
ClientAddr *net.UDPAddr
RemoteConn *net.UDPConn
}
func NewServer(hyClient core.Client, addr string, authFunc func(username, password string) bool, tcpDeadline int,
newReqFunc func(addr net.Addr, reqAddr string), reqClosedFunc func(addr net.Addr, reqAddr string, err error),
newUDPAssociateFunc func(addr net.Addr), udpAssociateClosedFunc func(addr net.Addr, err error),
newUDPTunnelFunc func(addr net.Addr, reqAddr string), udpTunnelClosedFunc func(addr net.Addr, reqAddr string, err error)) (*Server, error) {
func NewServer(addr, ip string, authFunc func(username, password string) bool, tcpDeadline, udpDeadline, udpSessionTime int) (*Server, error) {
_, p, err := net.SplitHostPort(addr)
if err != nil {
return nil, err
}
taddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
return nil, err
}
uaddr, err := net.ResolveUDPAddr("udp", addr)
if err != nil {
return nil, err
}
saddr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(ip, p))
if err != nil {
return nil, err
}
m := socks5.MethodNone
if authFunc != nil {
m = socks5.MethodUsernamePassword
}
cs := cache.New(cache.NoExpiration, cache.NoExpiration)
cs1 := cache.New(cache.NoExpiration, cache.NoExpiration)
s := &Server{
Method: m,
AuthFunc: authFunc,
SupportedCommands: []byte{socks5.CmdConnect, socks5.CmdUDP},
TCPAddr: taddr,
UDPAddr: uaddr,
ServerAddr: saddr,
UDPExchanges: cs,
TCPDeadline: tcpDeadline,
UDPDeadline: udpDeadline,
UDPSessionTime: udpSessionTime,
TCPUDPAssociate: cs1,
RunnerGroup: runnergroup.New(),
HyClient: hyClient,
AuthFunc: authFunc,
Method: m,
TCPAddr: taddr,
TCPDeadline: tcpDeadline,
NewRequestFunc: newReqFunc,
RequestClosedFunc: reqClosedFunc,
NewUDPAssociateFunc: newUDPAssociateFunc,
UDPAssociateClosedFunc: udpAssociateClosedFunc,
NewUDPTunnelFunc: newUDPTunnelFunc,
UDPTunnelClosedFunc: udpTunnelClosedFunc,
}
return s, nil
}
// Negotiate handle negotiate packet.
// This method do not handle gssapi(0x01) method now.
// Error or OK both replied.
func (s *Server) Negotiate(c *net.TCPConn) error {
func (s *Server) negotiate(c *net.TCPConn) error {
rq, err := socks5.NewNegotiationRequestFrom(c)
if err != nil {
return err
@ -130,77 +107,15 @@ func (s *Server) Negotiate(c *net.TCPConn) error {
return nil
}
// GetRequest get request packet from client, and check command according to SupportedCommands
// Error replied.
func (s *Server) GetRequest(c *net.TCPConn) (*socks5.Request, error) {
r, err := socks5.NewRequestFrom(c)
if err != nil {
return nil, err
}
var supported bool
for _, c := range s.SupportedCommands {
if r.Cmd == c {
supported = true
break
}
}
if !supported {
var p *socks5.Reply
if r.Atyp == socks5.ATYPIPv4 || r.Atyp == socks5.ATYPDomain {
p = socks5.NewReply(socks5.RepCommandNotSupported, socks5.ATYPIPv4, net.IPv4zero, []byte{0x00, 0x00})
} else {
p = socks5.NewReply(socks5.RepCommandNotSupported, socks5.ATYPIPv6, net.IPv6zero, []byte{0x00, 0x00})
}
if _, err := p.WriteTo(c); err != nil {
return nil, err
}
return nil, ErrUnsupportedCmd
}
return r, nil
}
// Run server
func (s *Server) ListenAndServe(h Handler) error {
if h == nil {
s.Handle = &DefaultHandle{}
} else {
s.Handle = h
}
s.RunnerGroup.Add(&runnergroup.Runner{
Start: func() error {
return s.RunTCPServer()
},
Stop: func() error {
if s.TCPListen != nil {
return s.TCPListen.Close()
}
return nil
},
})
s.RunnerGroup.Add(&runnergroup.Runner{
Start: func() error {
return s.RunUDPServer()
},
Stop: func() error {
if s.UDPConn != nil {
return s.UDPConn.Close()
}
return nil
},
})
return s.RunnerGroup.Wait()
}
// RunTCPServer starts tcp server
func (s *Server) RunTCPServer() error {
func (s *Server) ListenAndServe() error {
var err error
s.TCPListen, err = net.ListenTCP("tcp", s.TCPAddr)
s.tcpListener, err = net.ListenTCP("tcp", s.TCPAddr)
if err != nil {
return err
}
defer s.TCPListen.Close()
defer s.tcpListener.Close()
for {
c, err := s.TCPListen.AcceptTCP()
c, err := s.tcpListener.AcceptTCP()
if err != nil {
return err
}
@ -211,219 +126,182 @@ func (s *Server) RunTCPServer() error {
return
}
}
if err := s.Negotiate(c); err != nil {
if err := s.negotiate(c); err != nil {
return
}
r, err := s.GetRequest(c)
r, err := socks5.NewRequestFrom(c)
if err != nil {
return
}
_ = s.Handle.TCPHandle(s, c, r)
_ = s.handle(c, r)
}(c)
}
}
// RunUDPServer starts udp server
func (s *Server) RunUDPServer() error {
var err error
s.UDPConn, err = net.ListenUDP("udp", s.UDPAddr)
if err != nil {
return err
}
defer s.UDPConn.Close()
for {
b := make([]byte, 65536)
n, addr, err := s.UDPConn.ReadFromUDP(b)
if err != nil {
return err
}
go func(addr *net.UDPAddr, b []byte) {
d, err := socks5.NewDatagramFromBytes(b)
if err != nil {
return
}
if d.Frag != 0x00 {
return
}
_ = s.Handle.UDPHandle(s, addr, d)
}(addr, b[0:n])
}
}
// Stop server
func (s *Server) Shutdown() error {
return s.RunnerGroup.Done()
}
// TCP connection waits for associated UDP to close
func (s *Server) TCPWaitsForUDP(addr *net.UDPAddr) error {
_, p, err := net.SplitHostPort(addr.String())
if err != nil {
return err
}
if p == "0" {
time.Sleep(time.Duration(s.UDPSessionTime) * time.Second)
return nil
}
ch := make(chan byte)
s.TCPUDPAssociate.Set(addr.String(), ch, cache.DefaultExpiration)
<-ch
return nil
}
// UDP releases associated TCP
func (s *Server) UDPReleasesTCP(addr *net.UDPAddr) {
v, ok := s.TCPUDPAssociate.Get(addr.String())
if ok {
ch := v.(chan byte)
ch <- 0x00
s.TCPUDPAssociate.Delete(addr.String())
}
}
// Handler handle tcp, udp request
type Handler interface {
// Request has not been replied yet
TCPHandle(*Server, *net.TCPConn, *socks5.Request) error
UDPHandle(*Server, *net.UDPAddr, *socks5.Datagram) error
}
// DefaultHandle implements Handler interface
type DefaultHandle struct {
}
// TCPHandle auto handle request. You may prefer to do yourself.
func (h *DefaultHandle) TCPHandle(s *Server, c *net.TCPConn, r *socks5.Request) error {
func (s *Server) handle(c *net.TCPConn, r *socks5.Request) error {
if r.Cmd == socks5.CmdConnect {
rc, err := r.Connect(c)
// TCP
s.NewRequestFunc(c.RemoteAddr(), r.Address())
var closeErr error
defer func() {
s.RequestClosedFunc(c.RemoteAddr(), r.Address(), closeErr)
}()
rc, err := s.HyClient.Dial(false, r.Address())
if err != nil {
_ = sendReply(c, socks5.RepHostUnreachable)
closeErr = err
return err
}
defer rc.Close()
go func() {
var bf [1024 * 2]byte
for {
if s.TCPDeadline != 0 {
if err := rc.SetDeadline(time.Now().Add(time.Duration(s.TCPDeadline) * time.Second)); err != nil {
return
}
}
i, err := rc.Read(bf[:])
if err != nil {
return
}
if _, err := c.Write(bf[0:i]); err != nil {
return
}
}
// All good
_ = sendReply(c, socks5.RepSuccess)
closeErr = pipePair(c, rc, s.TCPDeadline)
return nil
} else if r.Cmd == socks5.CmdUDP {
// UDP
s.NewUDPAssociateFunc(c.RemoteAddr())
var closeErr error
defer func() {
s.UDPAssociateClosedFunc(c.RemoteAddr(), closeErr)
}()
var bf [1024 * 2]byte
udpConn, err := net.ListenUDP("udp", &net.UDPAddr{
IP: s.TCPAddr.IP,
Zone: s.TCPAddr.Zone,
})
if err != nil {
_ = sendReply(c, socks5.RepServerFailure)
closeErr = err
return err
}
defer udpConn.Close()
// Send UDP server addr to the client
atyp, addr, port, err := socks5.ParseAddress(udpConn.LocalAddr().String())
if err != nil {
_ = sendReply(c, socks5.RepServerFailure)
closeErr = err
return err
}
_, _ = socks5.NewReply(socks5.RepSuccess, atyp, addr, port).WriteTo(c)
// Let UDP server do its job, we hold the TCP connection here
go s.handleUDP(udpConn)
buf := make([]byte, 1024)
for {
if s.TCPDeadline != 0 {
if err := c.SetDeadline(time.Now().Add(time.Duration(s.TCPDeadline) * time.Second)); err != nil {
return nil
}
_ = c.SetDeadline(time.Now().Add(time.Duration(s.TCPDeadline) * time.Second))
}
i, err := c.Read(bf[:])
_, err := c.Read(buf)
if err != nil {
return nil
}
if _, err := rc.Write(bf[0:i]); err != nil {
return nil
closeErr = err
break
}
}
}
if r.Cmd == socks5.CmdUDP {
caddr, err := r.UDP(c, s.ServerAddr)
if err != nil {
return err
}
if err := s.TCPWaitsForUDP(caddr); err != nil {
return err
}
// As the TCP connection closes, so does the UDP listener
return nil
} else {
_ = sendReply(c, socks5.RepCommandNotSupported)
return ErrUnsupportedCmd
}
return ErrUnsupportedCmd
}
// UDPHandle auto handle packet. You may prefer to do yourself.
func (h *DefaultHandle) UDPHandle(s *Server, addr *net.UDPAddr, d *socks5.Datagram) error {
send := func(ue *UDPExchange, data []byte) error {
_, err := ue.RemoteConn.Write(data)
func (s *Server) handleUDP(c *net.UDPConn) {
var clientAddr *net.UDPAddr
remoteMap := make(map[string]io.ReadWriteCloser) // Remote addr <-> Remote conn
buf := make([]byte, utils.PipeBufferSize)
var closeErr error
for {
n, caddr, err := c.ReadFromUDP(buf)
if err != nil {
return err
closeErr = err
break
}
if socks5.Debug {
log.Printf("Sent UDP data to remote. client: %#v server: %#v remote: %#v data: %#v\n", ue.ClientAddr.String(), ue.RemoteConn.LocalAddr().String(), ue.RemoteConn.RemoteAddr().String(), data)
d, err := socks5.NewDatagramFromBytes(buf[:n])
if err != nil || d.Frag != 0 {
// Ignore bad packets
continue
}
if clientAddr == nil {
// Whoever sends the first valid packet is our client :P
clientAddr = caddr
} else if caddr.String() != clientAddr.String() {
// We already have a client and you're not it!
continue
}
rc := remoteMap[d.Address()]
if rc == nil {
// Need a new entry
rc, err = s.HyClient.Dial(true, d.Address())
if err != nil {
// Failed to establish a connection, silently ignore
continue
}
// The other direction
go udpReversePipe(clientAddr, c, rc)
remoteMap[d.Address()] = rc
s.NewUDPTunnelFunc(clientAddr, d.Address())
}
_, err = rc.Write(d.Data)
if err != nil {
// The connection is no longer valid, close & remove from map
_ = rc.Close()
delete(remoteMap, d.Address())
s.UDPTunnelClosedFunc(clientAddr, d.Address(), err)
}
return nil
}
// Close all remote connections
for raddr, rc := range remoteMap {
_ = rc.Close()
s.UDPTunnelClosedFunc(clientAddr, raddr, closeErr)
}
}
var ue *UDPExchange
iue, ok := s.UDPExchanges.Get(addr.String())
if ok {
ue = iue.(*UDPExchange)
return send(ue, d.Data)
}
func sendReply(conn *net.TCPConn, rep byte) error {
p := socks5.NewReply(rep, socks5.ATYPIPv4, []byte{0x00, 0x00, 0x00, 0x00}, []byte{0x00, 0x00})
_, err := p.WriteTo(conn)
return err
}
if socks5.Debug {
log.Printf("Call udp: %#v\n", d.Address())
}
c, err := socks5.Dial.Dial("udp", d.Address())
if err != nil {
s.UDPReleasesTCP(addr)
return err
}
// A UDP association terminates when the TCP connection that the UDP
// ASSOCIATE request arrived on terminates.
rc := c.(*net.UDPConn)
ue = &UDPExchange{
ClientAddr: addr,
RemoteConn: rc,
}
if socks5.Debug {
log.Printf("Created remote UDP conn for client. client: %#v server: %#v remote: %#v\n", addr.String(), ue.RemoteConn.LocalAddr().String(), d.Address())
}
if err := send(ue, d.Data); err != nil {
s.UDPReleasesTCP(ue.ClientAddr)
ue.RemoteConn.Close()
return err
}
s.UDPExchanges.Set(ue.ClientAddr.String(), ue, cache.DefaultExpiration)
go func(ue *UDPExchange) {
defer func() {
s.UDPReleasesTCP(ue.ClientAddr)
s.UDPExchanges.Delete(ue.ClientAddr.String())
ue.RemoteConn.Close()
}()
var b [65536]byte
func pipePair(conn *net.TCPConn, stream io.ReadWriteCloser, deadline int) error {
errChan := make(chan error, 2)
// TCP to stream
go func() {
buf := make([]byte, utils.PipeBufferSize)
for {
if s.UDPDeadline != 0 {
if err := ue.RemoteConn.SetDeadline(time.Now().Add(time.Duration(s.UDPDeadline) * time.Second)); err != nil {
log.Println(err)
break
if deadline != 0 {
_ = conn.SetDeadline(time.Now().Add(time.Duration(deadline) * time.Second))
}
rn, err := conn.Read(buf)
if rn > 0 {
_, err := stream.Write(buf[:rn])
if err != nil {
errChan <- err
return
}
}
n, err := ue.RemoteConn.Read(b[:])
if err != nil {
break
}
if socks5.Debug {
log.Printf("Got UDP data from remote. client: %#v server: %#v remote: %#v data: %#v\n", ue.ClientAddr.String(), ue.RemoteConn.LocalAddr().String(), ue.RemoteConn.RemoteAddr().String(), b[0:n])
}
a, addr, port, err := socks5.ParseAddress(ue.ClientAddr.String())
if err != nil {
log.Println(err)
break
}
d1 := socks5.NewDatagram(a, addr, port, b[0:n])
if _, err := s.UDPConn.WriteToUDP(d1.Bytes(), ue.ClientAddr); err != nil {
break
}
if socks5.Debug {
log.Printf("Sent Datagram. client: %#v server: %#v remote: %#v data: %#v %#v %#v %#v %#v %#v datagram address: %#v\n", ue.ClientAddr.String(), ue.RemoteConn.LocalAddr().String(), ue.RemoteConn.RemoteAddr().String(), d1.Rsv, d1.Frag, d1.Atyp, d1.DstAddr, d1.DstPort, d1.Data, d1.Address())
errChan <- err
return
}
}
}(ue)
return nil
}()
// Stream to TCP
go func() {
errChan <- utils.Pipe(stream, conn, nil)
}()
return <-errChan
}
func udpReversePipe(clientAddr *net.UDPAddr, c *net.UDPConn, rc io.ReadWriteCloser) {
buf := make([]byte, utils.PipeBufferSize)
for {
n, err := rc.Read(buf)
if err != nil {
break
}
d := socks5.NewDatagram(socks5.ATYPIPv4, []byte{0x00, 0x00, 0x00, 0x00}, []byte{0x00, 0x00}, buf[:n])
_, err = c.WriteTo(d.Bytes(), clientAddr)
if err != nil {
break
}
}
}