mirror of
https://github.com/apernet/hysteria.git
synced 2025-04-03 20:47:38 +03:00
321 lines
9 KiB
Go
321 lines
9 KiB
Go
package server
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"net/http"
|
|
"sync"
|
|
|
|
"github.com/apernet/quic-go"
|
|
"github.com/apernet/quic-go/http3"
|
|
|
|
"github.com/apernet/hysteria/core/internal/congestion"
|
|
"github.com/apernet/hysteria/core/internal/protocol"
|
|
"github.com/apernet/hysteria/core/internal/utils"
|
|
)
|
|
|
|
const (
|
|
closeErrCodeOK = 0x100 // HTTP3 ErrCodeNoError
|
|
closeErrCodeTrafficLimitReached = 0x107 // HTTP3 ErrCodeExcessiveLoad
|
|
)
|
|
|
|
type Server interface {
|
|
Serve() error
|
|
Close() error
|
|
}
|
|
|
|
func NewServer(config *Config) (Server, error) {
|
|
if err := config.fill(); err != nil {
|
|
return nil, err
|
|
}
|
|
tlsConfig := http3.ConfigureTLSConfig(&tls.Config{
|
|
Certificates: config.TLSConfig.Certificates,
|
|
GetCertificate: config.TLSConfig.GetCertificate,
|
|
})
|
|
quicConfig := &quic.Config{
|
|
InitialStreamReceiveWindow: config.QUICConfig.InitialStreamReceiveWindow,
|
|
MaxStreamReceiveWindow: config.QUICConfig.MaxStreamReceiveWindow,
|
|
InitialConnectionReceiveWindow: config.QUICConfig.InitialConnectionReceiveWindow,
|
|
MaxConnectionReceiveWindow: config.QUICConfig.MaxConnectionReceiveWindow,
|
|
MaxIdleTimeout: config.QUICConfig.MaxIdleTimeout,
|
|
MaxIncomingStreams: config.QUICConfig.MaxIncomingStreams,
|
|
DisablePathMTUDiscovery: config.QUICConfig.DisablePathMTUDiscovery,
|
|
EnableDatagrams: true,
|
|
}
|
|
listener, err := quic.Listen(config.Conn, tlsConfig, quicConfig)
|
|
if err != nil {
|
|
_ = config.Conn.Close()
|
|
return nil, err
|
|
}
|
|
return &serverImpl{
|
|
config: config,
|
|
listener: listener,
|
|
}, nil
|
|
}
|
|
|
|
type serverImpl struct {
|
|
config *Config
|
|
listener *quic.Listener
|
|
}
|
|
|
|
func (s *serverImpl) Serve() error {
|
|
for {
|
|
conn, err := s.listener.Accept(context.Background())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
go s.handleClient(conn)
|
|
}
|
|
}
|
|
|
|
func (s *serverImpl) Close() error {
|
|
err := s.listener.Close()
|
|
_ = s.config.Conn.Close()
|
|
return err
|
|
}
|
|
|
|
func (s *serverImpl) handleClient(conn quic.Connection) {
|
|
handler := newH3sHandler(s.config, conn)
|
|
h3s := http3.Server{
|
|
Handler: handler,
|
|
StreamHijacker: handler.ProxyStreamHijacker,
|
|
}
|
|
err := h3s.ServeQUICConn(conn)
|
|
// If the client is authenticated, we need to log the disconnect event
|
|
if handler.authenticated && s.config.EventLogger != nil {
|
|
s.config.TrafficLogger.LogOffline(handler.authID, conn.RemoteAddr())
|
|
s.config.EventLogger.Disconnect(conn.RemoteAddr(), handler.authID, err)
|
|
}
|
|
_ = conn.CloseWithError(closeErrCodeOK, "")
|
|
}
|
|
|
|
type h3sHandler struct {
|
|
config *Config
|
|
conn quic.Connection
|
|
|
|
authenticated bool
|
|
authMutex sync.Mutex
|
|
authID string
|
|
|
|
udpSM *udpSessionManager // Only set after authentication
|
|
}
|
|
|
|
func newH3sHandler(config *Config, conn quic.Connection) *h3sHandler {
|
|
return &h3sHandler{
|
|
config: config,
|
|
conn: conn,
|
|
}
|
|
}
|
|
|
|
func (h *h3sHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method == http.MethodPost && r.Host == protocol.URLHost && r.URL.Path == protocol.URLPath {
|
|
h.authMutex.Lock()
|
|
defer h.authMutex.Unlock()
|
|
if h.authenticated {
|
|
// Already authenticated
|
|
protocol.AuthResponseToHeader(w.Header(), protocol.AuthResponse{
|
|
UDPEnabled: !h.config.DisableUDP,
|
|
Rx: h.config.BandwidthConfig.MaxRx,
|
|
RxAuto: h.config.IgnoreClientBandwidth,
|
|
})
|
|
w.WriteHeader(protocol.StatusAuthOK)
|
|
return
|
|
}
|
|
authReq := protocol.AuthRequestFromHeader(r.Header)
|
|
actualTx := authReq.Rx
|
|
ok, id := h.config.Authenticator.Authenticate(h.conn.RemoteAddr(), authReq.Auth, actualTx)
|
|
if ok {
|
|
// Set authenticated flag
|
|
h.authenticated = true
|
|
h.authID = id
|
|
if h.config.IgnoreClientBandwidth {
|
|
// Ignore client bandwidth, always use BBR
|
|
congestion.UseBBR(h.conn)
|
|
actualTx = 0
|
|
} else {
|
|
// actualTx = min(serverTx, clientRx)
|
|
if h.config.BandwidthConfig.MaxTx > 0 && actualTx > h.config.BandwidthConfig.MaxTx {
|
|
// We have a maxTx limit and the client is asking for more than that,
|
|
// return and use the limit instead
|
|
actualTx = h.config.BandwidthConfig.MaxTx
|
|
}
|
|
if actualTx > 0 {
|
|
congestion.UseBrutal(h.conn, actualTx)
|
|
} else {
|
|
// Client doesn't know its own bandwidth, use BBR
|
|
congestion.UseBBR(h.conn)
|
|
}
|
|
}
|
|
// Auth OK, send response
|
|
protocol.AuthResponseToHeader(w.Header(), protocol.AuthResponse{
|
|
UDPEnabled: !h.config.DisableUDP,
|
|
Rx: h.config.BandwidthConfig.MaxRx,
|
|
RxAuto: h.config.IgnoreClientBandwidth,
|
|
})
|
|
w.WriteHeader(protocol.StatusAuthOK)
|
|
// Call event logger
|
|
if h.config.EventLogger != nil {
|
|
h.config.TrafficLogger.LogOnline(id, h.conn.RemoteAddr())
|
|
h.config.EventLogger.Connect(h.conn.RemoteAddr(), id, actualTx)
|
|
}
|
|
// Initialize UDP session manager (if UDP is enabled)
|
|
// We use sync.Once to make sure that only one goroutine is started,
|
|
// as ServeHTTP may be called by multiple goroutines simultaneously
|
|
if !h.config.DisableUDP {
|
|
go func() {
|
|
sm := newUDPSessionManager(
|
|
&udpIOImpl{h.conn, id, h.config.TrafficLogger, h.config.Outbound},
|
|
&udpEventLoggerImpl{h.conn, id, h.config.EventLogger},
|
|
h.config.UDPIdleTimeout)
|
|
h.udpSM = sm
|
|
go sm.Run()
|
|
}()
|
|
}
|
|
} else {
|
|
// Auth failed, pretend to be a normal HTTP server
|
|
h.masqHandler(w, r)
|
|
}
|
|
} else {
|
|
// Not an auth request, pretend to be a normal HTTP server
|
|
h.masqHandler(w, r)
|
|
}
|
|
}
|
|
|
|
func (h *h3sHandler) ProxyStreamHijacker(ft http3.FrameType, id quic.ConnectionTracingID, stream quic.Stream, err error) (bool, error) {
|
|
if err != nil || !h.authenticated {
|
|
return false, nil
|
|
}
|
|
|
|
// Wraps the stream with QStream, which handles Close() properly
|
|
stream = &utils.QStream{Stream: stream}
|
|
|
|
switch ft {
|
|
case protocol.FrameTypeTCPRequest:
|
|
go h.handleTCPRequest(stream)
|
|
return true, nil
|
|
default:
|
|
return false, nil
|
|
}
|
|
}
|
|
|
|
func (h *h3sHandler) handleTCPRequest(stream quic.Stream) {
|
|
// Read request
|
|
reqAddr, err := protocol.ReadTCPRequest(stream)
|
|
if err != nil {
|
|
_ = stream.Close()
|
|
return
|
|
}
|
|
// Log the event
|
|
if h.config.EventLogger != nil {
|
|
h.config.EventLogger.TCPRequest(h.conn.RemoteAddr(), h.authID, reqAddr)
|
|
}
|
|
// Dial target
|
|
tConn, err := h.config.Outbound.TCP(reqAddr)
|
|
if err != nil {
|
|
_ = protocol.WriteTCPResponse(stream, false, err.Error())
|
|
_ = stream.Close()
|
|
// Log the error
|
|
if h.config.EventLogger != nil {
|
|
h.config.EventLogger.TCPError(h.conn.RemoteAddr(), h.authID, reqAddr, err)
|
|
}
|
|
return
|
|
}
|
|
_ = protocol.WriteTCPResponse(stream, true, "")
|
|
// Start proxying
|
|
if h.config.TrafficLogger != nil {
|
|
err = copyTwoWayWithLogger(h.authID, stream, tConn, h.config.TrafficLogger)
|
|
} else {
|
|
// Use the fast path if no traffic logger is set
|
|
err = copyTwoWay(stream, tConn)
|
|
}
|
|
if h.config.EventLogger != nil {
|
|
h.config.EventLogger.TCPError(h.conn.RemoteAddr(), h.authID, reqAddr, err)
|
|
}
|
|
// Cleanup
|
|
_ = tConn.Close()
|
|
_ = stream.Close()
|
|
// Disconnect the client if TrafficLogger requested
|
|
if err == errDisconnect {
|
|
_ = h.conn.CloseWithError(closeErrCodeTrafficLimitReached, "")
|
|
}
|
|
}
|
|
|
|
func (h *h3sHandler) masqHandler(w http.ResponseWriter, r *http.Request) {
|
|
if h.config.MasqHandler != nil {
|
|
h.config.MasqHandler.ServeHTTP(w, r)
|
|
} else {
|
|
// Return 404 for everything
|
|
http.NotFound(w, r)
|
|
}
|
|
}
|
|
|
|
// udpIOImpl is the IO implementation for udpSessionManager with TrafficLogger support
|
|
type udpIOImpl struct {
|
|
Conn quic.Connection
|
|
AuthID string
|
|
TrafficLogger TrafficLogger
|
|
Outbound Outbound
|
|
}
|
|
|
|
func (io *udpIOImpl) ReceiveMessage() (*protocol.UDPMessage, error) {
|
|
for {
|
|
msg, err := io.Conn.ReceiveDatagram(context.Background())
|
|
if err != nil {
|
|
// Connection error, this will stop the session manager
|
|
return nil, err
|
|
}
|
|
udpMsg, err := protocol.ParseUDPMessage(msg)
|
|
if err != nil {
|
|
// Invalid message, this is fine - just wait for the next
|
|
continue
|
|
}
|
|
if io.TrafficLogger != nil {
|
|
ok := io.TrafficLogger.Log(io.AuthID, uint64(len(udpMsg.Data)), 0)
|
|
if !ok {
|
|
// TrafficLogger requested to disconnect the client
|
|
_ = io.Conn.CloseWithError(closeErrCodeTrafficLimitReached, "")
|
|
return nil, errDisconnect
|
|
}
|
|
}
|
|
return udpMsg, nil
|
|
}
|
|
}
|
|
|
|
func (io *udpIOImpl) SendMessage(buf []byte, msg *protocol.UDPMessage) error {
|
|
if io.TrafficLogger != nil {
|
|
ok := io.TrafficLogger.Log(io.AuthID, 0, uint64(len(msg.Data)))
|
|
if !ok {
|
|
// TrafficLogger requested to disconnect the client
|
|
_ = io.Conn.CloseWithError(closeErrCodeTrafficLimitReached, "")
|
|
return errDisconnect
|
|
}
|
|
}
|
|
msgN := msg.Serialize(buf)
|
|
if msgN < 0 {
|
|
// Message larger than buffer, silent drop
|
|
return nil
|
|
}
|
|
return io.Conn.SendDatagram(buf[:msgN])
|
|
}
|
|
|
|
func (io *udpIOImpl) UDP(reqAddr string) (UDPConn, error) {
|
|
return io.Outbound.UDP(reqAddr)
|
|
}
|
|
|
|
type udpEventLoggerImpl struct {
|
|
Conn quic.Connection
|
|
AuthID string
|
|
EventLogger EventLogger
|
|
}
|
|
|
|
func (l *udpEventLoggerImpl) New(sessionID uint32, reqAddr string) {
|
|
if l.EventLogger != nil {
|
|
l.EventLogger.UDPRequest(l.Conn.RemoteAddr(), l.AuthID, sessionID, reqAddr)
|
|
}
|
|
}
|
|
|
|
func (l *udpEventLoggerImpl) Close(sessionID uint32, err error) {
|
|
if l.EventLogger != nil {
|
|
l.EventLogger.UDPError(l.Conn.RemoteAddr(), l.AuthID, sessionID, err)
|
|
}
|
|
}
|