Relay & better logging

This commit is contained in:
Toby 2021-02-05 01:00:44 -08:00
parent 7d280393a3
commit 565d659338
6 changed files with 270 additions and 117 deletions

View file

@ -11,7 +11,9 @@ import (
"github.com/tobyxdd/hysteria/pkg/core"
hyHTTP "github.com/tobyxdd/hysteria/pkg/http"
"github.com/tobyxdd/hysteria/pkg/obfs"
"github.com/tobyxdd/hysteria/pkg/relay"
"github.com/tobyxdd/hysteria/pkg/socks5"
"io"
"io/ioutil"
"net"
"net/http"
@ -101,46 +103,28 @@ func client(config *clientConfig) {
return config.SOCKS5.User == user && config.SOCKS5.Password == password
}
}
socks5server, err := socks5.NewServer(client, config.SOCKS5.Listen, authFunc, config.SOCKS5.Timeout, aclEngine,
config.SOCKS5.DisableUDP,
socks5server, err := socks5.NewServer(client, config.SOCKS5.Listen, authFunc,
time.Duration(config.SOCKS5.Timeout)*time.Second, aclEngine, config.SOCKS5.DisableUDP,
func(addr net.Addr, reqAddr string, action acl.Action, arg string) {
logrus.WithFields(logrus.Fields{
"action": actionToString(action, arg),
"src": addr.String(),
"dst": reqAddr,
}).Debug("New SOCKS5 TCP request")
}).Debug("SOCKS5 TCP request")
},
func(addr net.Addr, reqAddr string, err error) {
logrus.WithFields(logrus.Fields{
"error": err,
"src": addr.String(),
"dst": reqAddr,
}).Debug("SOCKS5 TCP request closed")
},
func(addr net.Addr) {
logrus.WithFields(logrus.Fields{
"src": addr.String(),
}).Debug("New SOCKS5 UDP associate request")
},
func(addr net.Addr, err error) {
logrus.WithFields(logrus.Fields{
"error": err,
"src": addr.String(),
}).Debug("SOCKS5 UDP associate request closed")
},
func(addr net.Addr, reqAddr string, action acl.Action, arg string) {
logrus.WithFields(logrus.Fields{
"action": actionToString(action, arg),
"src": addr.String(),
"dst": reqAddr,
}).Debug("New SOCKS5 UDP tunnel")
},
func(addr net.Addr, reqAddr string, err error) {
logrus.WithFields(logrus.Fields{
"error": err,
"src": addr.String(),
"dst": reqAddr,
}).Debug("SOCKS5 UDP tunnel closed")
if err != io.EOF {
logrus.WithFields(logrus.Fields{
"error": err,
"src": addr.String(),
"dst": reqAddr,
}).Info("SOCKS5 TCP error")
} else {
logrus.WithFields(logrus.Fields{
"src": addr.String(),
"dst": reqAddr,
}).Debug("SOCKS5 TCP EOF")
}
})
if err != nil {
logrus.WithField("error", err).Fatal("Failed to initialize SOCKS5 server")
@ -163,7 +147,7 @@ func client(config *clientConfig) {
logrus.WithFields(logrus.Fields{
"action": actionToString(action, arg),
"dst": reqAddr,
}).Debug("New HTTP request")
}).Debug("HTTP request")
},
authFunc)
if err != nil {
@ -179,21 +163,36 @@ func client(config *clientConfig) {
}()
}
if len(config.Relay.Listen) > 0 {
go func() {
rl, err := relay.NewRelay(client, config.Relay.Listen, config.Relay.Remote,
time.Duration(config.Relay.Timeout)*time.Second,
func(addr net.Addr) {
logrus.WithFields(logrus.Fields{
"src": addr.String(),
}).Debug("TCP relay request")
},
func(addr net.Addr, err error) {
if err != io.EOF {
logrus.WithFields(logrus.Fields{
"error": err,
"src": addr.String(),
}).Info("TCP relay error")
} else {
logrus.WithFields(logrus.Fields{
"src": addr.String(),
}).Debug("TCP relay EOF")
}
})
if err != nil {
logrus.WithField("error", err).Fatal("Failed to initialize TCP relay")
}
logrus.WithField("addr", config.Relay.Listen).Info("TCP relay up and running")
errChan <- rl.ListenAndServe()
}()
}
err = <-errChan
logrus.WithField("error", err).Fatal("Client shutdown")
}
func actionToString(action acl.Action, arg string) string {
switch action {
case acl.ActionDirect:
return "Direct"
case acl.ActionProxy:
return "Proxy"
case acl.ActionBlock:
return "Block"
case acl.ActionHijack:
return "Hijack to " + arg
default:
return "Unknown"
}
}

View file

@ -9,6 +9,7 @@ import (
hyCongestion "github.com/tobyxdd/hysteria/pkg/congestion"
"github.com/tobyxdd/hysteria/pkg/core"
"github.com/tobyxdd/hysteria/pkg/obfs"
"io"
"net"
"strings"
)
@ -78,16 +79,7 @@ func server(config *serverConfig) {
uint64(config.UpMbps)*mbpsToBps, uint64(config.DownMbps)*mbpsToBps,
func(refBPS uint64) congestion.CongestionControl {
return hyCongestion.NewBrutalSender(congestion.ByteCount(refBPS))
}, aclEngine, obfuscator, authFunc, func(addr net.Addr, auth []byte, udp bool, reqAddr string) {
if !udp {
logrus.WithFields(logrus.Fields{
"src": addr.String(),
"dst": reqAddr,
}).Debug("New TCP request")
} else {
// TODO
}
})
}, aclEngine, obfuscator, authFunc, tcpRequestFunc, tcpErrorFunc)
if err != nil {
logrus.WithField("error", err).Fatal("Failed to initialize server")
}
@ -97,3 +89,41 @@ func server(config *serverConfig) {
err = server.Serve()
logrus.WithField("error", err).Fatal("Server shutdown")
}
func tcpRequestFunc(addr net.Addr, auth []byte, reqAddr string, action acl.Action, arg string) {
logrus.WithFields(logrus.Fields{
"src": addr.String(),
"dst": reqAddr,
"action": actionToString(action, arg),
}).Debug("TCP request")
}
func tcpErrorFunc(addr net.Addr, auth []byte, reqAddr string, err error) {
if err != io.EOF {
logrus.WithFields(logrus.Fields{
"src": addr.String(),
"dst": reqAddr,
"error": err,
}).Info("TCP error")
} else {
logrus.WithFields(logrus.Fields{
"src": addr.String(),
"dst": reqAddr,
}).Debug("TCP EOF")
}
}
func actionToString(action acl.Action, arg string) string {
switch action {
case acl.ActionDirect:
return "Direct"
case acl.ActionProxy:
return "Proxy"
case acl.ActionBlock:
return "Block"
case acl.ActionHijack:
return "Hijack to " + arg
default:
return "Unknown"
}
}

View file

@ -15,21 +15,24 @@ import (
const dialTimeout = 10 * time.Second
type AuthFunc func(addr net.Addr, auth []byte, sSend uint64, sRecv uint64) (bool, string)
type RequestFunc func(addr net.Addr, auth []byte, udp bool, reqAddr string)
type TCPRequestFunc func(addr net.Addr, auth []byte, reqAddr string, action acl.Action, arg string)
type TCPErrorFunc func(addr net.Addr, auth []byte, reqAddr string, err error)
type Server struct {
sendBPS, recvBPS uint64
congestionFactory CongestionFactory
authFunc AuthFunc
requestFunc RequestFunc
aclEngine *acl.Engine
authFunc AuthFunc
tcpRequestFunc TCPRequestFunc
tcpErrorFunc TCPErrorFunc
listener quic.Listener
}
func NewServer(addr string, tlsConfig *tls.Config, quicConfig *quic.Config,
sendBPS uint64, recvBPS uint64, congestionFactory CongestionFactory, aclEngine *acl.Engine,
obfuscator Obfuscator, authFunc AuthFunc, requestFunc RequestFunc) (*Server, error) {
obfuscator Obfuscator, authFunc AuthFunc, tcpRequestFunc TCPRequestFunc, tcpErrorFunc TCPErrorFunc) (*Server, error) {
packetConn, err := net.ListenPacket("udp", addr)
if err != nil {
return nil, err
@ -50,9 +53,10 @@ func NewServer(addr string, tlsConfig *tls.Config, quicConfig *quic.Config,
sendBPS: sendBPS,
recvBPS: recvBPS,
congestionFactory: congestionFactory,
authFunc: authFunc,
requestFunc: requestFunc,
aclEngine: aclEngine,
authFunc: authFunc,
tcpRequestFunc: tcpRequestFunc,
tcpErrorFunc: tcpErrorFunc,
}
return s, nil
}
@ -148,23 +152,23 @@ func (s *Server) handleStream(remoteAddr net.Addr, auth []byte, stream quic.Stre
if err != nil {
return
}
s.requestFunc(remoteAddr, auth, req.UDP, req.Address)
if !req.UDP {
// TCP connection
s.handleTCP(stream, req.Address)
s.handleTCP(remoteAddr, auth, stream, req.Address)
} else {
// UDP connection
// TODO
}
}
func (s *Server) handleTCP(stream quic.Stream, reqAddr string) {
func (s *Server) handleTCP(remoteAddr net.Addr, auth []byte, stream quic.Stream, reqAddr string) {
host, port, err := net.SplitHostPort(reqAddr)
if err != nil {
_ = struc.Pack(stream, &serverResponse{
OK: false,
Message: "invalid address",
})
s.tcpErrorFunc(remoteAddr, auth, reqAddr, err)
return
}
ip := net.ParseIP(host)
@ -176,6 +180,7 @@ func (s *Server) handleTCP(stream quic.Stream, reqAddr string) {
if s.aclEngine != nil {
action, arg = s.aclEngine.Lookup(host, ip)
}
s.tcpRequestFunc(remoteAddr, auth, reqAddr, action, arg)
var conn net.Conn // Connection to be piped
switch action {
@ -186,6 +191,7 @@ func (s *Server) handleTCP(stream quic.Stream, reqAddr string) {
OK: false,
Message: err.Error(),
})
s.tcpErrorFunc(remoteAddr, auth, reqAddr, err)
return
}
case acl.ActionBlock:
@ -202,6 +208,7 @@ func (s *Server) handleTCP(stream quic.Stream, reqAddr string) {
OK: false,
Message: err.Error(),
})
s.tcpErrorFunc(remoteAddr, auth, reqAddr, err)
return
}
default:
@ -212,11 +219,13 @@ func (s *Server) handleTCP(stream quic.Stream, reqAddr string) {
return
}
// So far so good if we reach here
defer conn.Close()
err = struc.Pack(stream, &serverResponse{
OK: true,
})
if err != nil {
return
}
_ = utils.Pipe2Way(stream, conn)
err = utils.Pipe2Way(stream, conn)
s.tcpErrorFunc(remoteAddr, auth, reqAddr, err)
}

View file

@ -52,7 +52,7 @@ func NewProxyHTTPServer(hyClient *core.Client, idleTimeout time.Duration, aclEng
}
},
IdleConnTimeout: idleTimeout,
// TODO: Disable HTTP2 support? ref: https://github.com/elazarl/goproxy/issues/361
// Disable HTTP2 support? ref: https://github.com/elazarl/goproxy/issues/361
}
proxy.ConnectDial = nil
if basicAuthFunc != nil {

112
pkg/relay/relay.go Normal file
View file

@ -0,0 +1,112 @@
package relay
import (
"github.com/tobyxdd/hysteria/pkg/core"
"github.com/tobyxdd/hysteria/pkg/utils"
"io"
"net"
"time"
)
type Relay struct {
HyClient *core.Client
ListenAddr *net.TCPAddr
Remote string
Timeout time.Duration
ConnFunc func(addr net.Addr)
ErrorFunc func(addr net.Addr, err error)
tcpListener *net.TCPListener
}
func NewRelay(hyClient *core.Client, listen, remote string, timeout time.Duration,
connFunc func(addr net.Addr), errorFunc func(addr net.Addr, err error)) (*Relay, error) {
tAddr, err := net.ResolveTCPAddr("tcp", listen)
if err != nil {
return nil, err
}
r := &Relay{
HyClient: hyClient,
ListenAddr: tAddr,
Remote: remote,
Timeout: timeout,
ConnFunc: connFunc,
ErrorFunc: errorFunc,
}
return r, nil
}
func (r *Relay) ListenAndServe() error {
var err error
r.tcpListener, err = net.ListenTCP("tcp", r.ListenAddr)
if err != nil {
return err
}
defer r.tcpListener.Close()
for {
c, err := r.tcpListener.AcceptTCP()
if err != nil {
return err
}
go func(c *net.TCPConn) {
defer c.Close()
r.ConnFunc(c.RemoteAddr())
rc, err := r.HyClient.DialTCP(r.Remote)
if err != nil {
r.ErrorFunc(c.RemoteAddr(), err)
return
}
defer rc.Close()
err = pipePair(c, rc, r.Timeout)
r.ErrorFunc(c.RemoteAddr(), err)
}(c)
}
}
func pipePair(conn *net.TCPConn, stream io.ReadWriteCloser, timeout time.Duration) error {
errChan := make(chan error, 2)
// TCP to stream
go func() {
buf := make([]byte, utils.PipeBufferSize)
for {
if timeout != 0 {
_ = conn.SetDeadline(time.Now().Add(timeout))
}
rn, err := conn.Read(buf)
if rn > 0 {
_, err := stream.Write(buf[:rn])
if err != nil {
errChan <- err
return
}
}
if err != nil {
errChan <- err
return
}
}
}()
// Stream to TCP
go func() {
buf := make([]byte, utils.PipeBufferSize)
for {
rn, err := stream.Read(buf)
if rn > 0 {
_, err := conn.Write(buf[:rn])
if err != nil {
errChan <- err
return
}
if timeout != 0 {
_ = conn.SetDeadline(time.Now().Add(timeout))
}
}
if err != nil {
errChan <- err
return
}
}
}()
return <-errChan
}

View file

@ -23,34 +23,25 @@ var (
)
type Server struct {
HyClient *core.Client
AuthFunc func(username, password string) bool
Method byte
TCPAddr *net.TCPAddr
TCPDeadline int
ACLEngine *acl.Engine
DisableUDP bool
HyClient *core.Client
AuthFunc func(username, password string) bool
Method byte
TCPAddr *net.TCPAddr
TCPTimeout time.Duration
ACLEngine *acl.Engine
DisableUDP bool
NewRequestFunc func(addr net.Addr, reqAddr string, action acl.Action, arg string)
RequestClosedFunc func(addr net.Addr, reqAddr string, err error)
NewUDPAssociateFunc func(addr net.Addr)
UDPAssociateClosedFunc func(addr net.Addr, err error)
NewUDPTunnelFunc func(addr net.Addr, reqAddr string, action acl.Action, arg string)
UDPTunnelClosedFunc func(addr net.Addr, reqAddr string, err error)
TCPRequestFunc func(addr net.Addr, reqAddr string, action acl.Action, arg string)
TCPErrorFunc func(addr net.Addr, reqAddr string, err error)
tcpListener *net.TCPListener
}
func NewServer(hyClient *core.Client, addr string, authFunc func(username, password string) bool, tcpDeadline int,
func NewServer(hyClient *core.Client, addr string, authFunc func(username, password string) bool, tcpTimeout time.Duration,
aclEngine *acl.Engine, disableUDP bool,
newReqFunc func(addr net.Addr, reqAddr string, action acl.Action, arg string),
reqClosedFunc func(addr net.Addr, reqAddr string, err error),
newUDPAssociateFunc func(addr net.Addr),
udpAssociateClosedFunc func(addr net.Addr, err error),
newUDPTunnelFunc func(addr net.Addr, reqAddr string, action acl.Action, arg string),
udpTunnelClosedFunc func(addr net.Addr, reqAddr string, err error)) (*Server, error) {
taddr, err := net.ResolveTCPAddr("tcp", addr)
tcpReqFunc func(addr net.Addr, reqAddr string, action acl.Action, arg string),
tcpErrorFunc func(addr net.Addr, reqAddr string, err error)) (*Server, error) {
tAddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
return nil, err
}
@ -59,19 +50,15 @@ func NewServer(hyClient *core.Client, addr string, authFunc func(username, passw
m = socks5.MethodUsernamePassword
}
s := &Server{
HyClient: hyClient,
AuthFunc: authFunc,
Method: m,
TCPAddr: taddr,
TCPDeadline: tcpDeadline,
ACLEngine: aclEngine,
DisableUDP: disableUDP,
NewRequestFunc: newReqFunc,
RequestClosedFunc: reqClosedFunc,
NewUDPAssociateFunc: newUDPAssociateFunc,
UDPAssociateClosedFunc: udpAssociateClosedFunc,
NewUDPTunnelFunc: newUDPTunnelFunc,
UDPTunnelClosedFunc: udpTunnelClosedFunc,
HyClient: hyClient,
AuthFunc: authFunc,
Method: m,
TCPAddr: tAddr,
TCPTimeout: tcpTimeout,
ACLEngine: aclEngine,
DisableUDP: disableUDP,
TCPRequestFunc: tcpReqFunc,
TCPErrorFunc: tcpErrorFunc,
}
return s, nil
}
@ -133,8 +120,8 @@ func (s *Server) ListenAndServe() error {
}
go func(c *net.TCPConn) {
defer c.Close()
if s.TCPDeadline != 0 {
if err := c.SetDeadline(time.Now().Add(time.Duration(s.TCPDeadline) * time.Second)); err != nil {
if s.TCPTimeout != 0 {
if err := c.SetDeadline(time.Now().Add(s.TCPTimeout)); err != nil {
return
}
}
@ -170,10 +157,10 @@ func (s *Server) handleTCP(c *net.TCPConn, r *socks5.Request) error {
if s.ACLEngine != nil {
action, arg = s.ACLEngine.Lookup(domain, ip)
}
s.NewRequestFunc(c.RemoteAddr(), addr, action, arg)
s.TCPRequestFunc(c.RemoteAddr(), addr, action, arg)
var closeErr error
defer func() {
s.RequestClosedFunc(c.RemoteAddr(), addr, closeErr)
s.TCPErrorFunc(c.RemoteAddr(), addr, closeErr)
}()
// Handle according to the action
switch action {
@ -186,7 +173,7 @@ func (s *Server) handleTCP(c *net.TCPConn, r *socks5.Request) error {
}
defer rc.Close()
_ = sendReply(c, socks5.RepSuccess)
closeErr = pipePair(c, rc, s.TCPDeadline)
closeErr = pipePair(c, rc, s.TCPTimeout)
return nil
case acl.ActionProxy:
rc, err := s.HyClient.DialTCP(addr)
@ -197,7 +184,7 @@ func (s *Server) handleTCP(c *net.TCPConn, r *socks5.Request) error {
}
defer rc.Close()
_ = sendReply(c, socks5.RepSuccess)
closeErr = pipePair(c, rc, s.TCPDeadline)
closeErr = pipePair(c, rc, s.TCPTimeout)
return nil
case acl.ActionBlock:
_ = sendReply(c, socks5.RepHostUnreachable)
@ -212,7 +199,7 @@ func (s *Server) handleTCP(c *net.TCPConn, r *socks5.Request) error {
}
defer rc.Close()
_ = sendReply(c, socks5.RepSuccess)
closeErr = pipePair(c, rc, s.TCPDeadline)
closeErr = pipePair(c, rc, s.TCPTimeout)
return nil
default:
_ = sendReply(c, socks5.RepServerFailure)
@ -237,15 +224,14 @@ func parseRequestAddress(r *socks5.Request) (domain string, ip net.IP, port stri
}
}
func pipePair(conn *net.TCPConn, stream io.ReadWriteCloser, deadline int) error {
deadlineDuration := time.Duration(deadline) * time.Second
func pipePair(conn *net.TCPConn, stream io.ReadWriteCloser, timeout time.Duration) error {
errChan := make(chan error, 2)
// TCP to stream
go func() {
buf := make([]byte, utils.PipeBufferSize)
for {
if deadline != 0 {
_ = conn.SetDeadline(time.Now().Add(deadlineDuration))
if timeout != 0 {
_ = conn.SetDeadline(time.Now().Add(timeout))
}
rn, err := conn.Read(buf)
if rn > 0 {
@ -263,7 +249,24 @@ func pipePair(conn *net.TCPConn, stream io.ReadWriteCloser, deadline int) error
}()
// Stream to TCP
go func() {
errChan <- utils.Pipe(stream, conn)
buf := make([]byte, utils.PipeBufferSize)
for {
rn, err := stream.Read(buf)
if rn > 0 {
_, err := conn.Write(buf[:rn])
if err != nil {
errChan <- err
return
}
if timeout != 0 {
_ = conn.SetDeadline(time.Now().Add(timeout))
}
}
if err != nil {
errChan <- err
return
}
}
}()
return <-errChan
}