feat(wip): test reworks

This commit is contained in:
Toby 2023-07-25 16:11:11 -07:00
parent e48bb98024
commit 1c7cb23389
11 changed files with 899 additions and 331 deletions

View file

@ -2,260 +2,174 @@ package server
import (
"errors"
"fmt"
"testing"
"time"
"github.com/apernet/hysteria/core/internal/protocol"
"github.com/magiconair/properties/assert"
"github.com/stretchr/testify/mock"
"go.uber.org/goleak"
"github.com/apernet/hysteria/core/internal/protocol"
)
var (
errUDPBlocked = errors.New("blocked")
errUDPClosed = errors.New("closed")
)
type echoUDPConnPkt struct {
Data []byte
Addr string
Close bool
}
type echoUDPConn struct {
PktCh chan echoUDPConnPkt
}
func (c *echoUDPConn) ReadFrom(b []byte) (int, string, error) {
pkt := <-c.PktCh
if pkt.Close {
return 0, "", errUDPClosed
}
n := copy(b, pkt.Data)
return n, pkt.Addr, nil
}
func (c *echoUDPConn) WriteTo(b []byte, addr string) (int, error) {
nb := make([]byte, len(b))
copy(nb, b)
c.PktCh <- echoUDPConnPkt{
Data: nb,
Addr: addr,
}
return len(b), nil
}
func (c *echoUDPConn) Close() error {
c.PktCh <- echoUDPConnPkt{
Close: true,
}
return nil
}
type udpMockIO struct {
ReceiveCh <-chan *protocol.UDPMessage
SendCh chan<- *protocol.UDPMessage
UDPClose bool // ReadFrom() returns error immediately
BlockUDP bool // Block UDP connection creation
}
func (io *udpMockIO) ReceiveMessage() (*protocol.UDPMessage, error) {
m := <-io.ReceiveCh
if m == nil {
return nil, errUDPClosed
}
return m, nil
}
func (io *udpMockIO) SendMessage(buf []byte, msg *protocol.UDPMessage) error {
nMsg := *msg
nMsg.Data = make([]byte, len(msg.Data))
copy(nMsg.Data, msg.Data)
io.SendCh <- &nMsg
return nil
}
func (io *udpMockIO) UDP(reqAddr string) (UDPConn, error) {
if io.BlockUDP {
return nil, errUDPBlocked
}
conn := &echoUDPConn{
PktCh: make(chan echoUDPConnPkt, 10),
}
if io.UDPClose {
conn.PktCh <- echoUDPConnPkt{
Close: true,
}
}
return conn, nil
}
type udpMockEventNew struct {
SessionID uint32
ReqAddr string
}
type udpMockEventClose struct {
SessionID uint32
Err error
}
type udpMockEventLogger struct {
NewCh chan<- udpMockEventNew
CloseCh chan<- udpMockEventClose
}
func (l *udpMockEventLogger) New(sessionID uint32, reqAddr string) {
l.NewCh <- udpMockEventNew{sessionID, reqAddr}
}
func (l *udpMockEventLogger) Close(sessionID uint32, err error) {
l.CloseCh <- udpMockEventClose{sessionID, err}
}
func TestUDPSessionManager(t *testing.T) {
msgReceiveCh := make(chan *protocol.UDPMessage, 10)
msgSendCh := make(chan *protocol.UDPMessage, 10)
io := &udpMockIO{
ReceiveCh: msgReceiveCh,
SendCh: msgSendCh,
}
eventNewCh := make(chan udpMockEventNew, 10)
eventCloseCh := make(chan udpMockEventClose, 10)
eventLogger := &udpMockEventLogger{
NewCh: eventNewCh,
CloseCh: eventCloseCh,
}
io := newMockUDPIO(t)
eventLogger := newMockUDPEventLogger(t)
sm := newUDPSessionManager(io, eventLogger, 2*time.Second)
msgCh := make(chan *protocol.UDPMessage, 4)
io.EXPECT().ReceiveMessage().RunAndReturn(func() (*protocol.UDPMessage, error) {
m := <-msgCh
if m == nil {
return nil, errors.New("closed")
}
return m, nil
})
go sm.Run()
t.Run("session creation & timeout", func(t *testing.T) {
ms := []*protocol.UDPMessage{
{
SessionID: 1234,
PacketID: 0,
FragID: 0,
FragCount: 1,
Addr: "example.com:5353",
Data: []byte("hello"),
},
{
SessionID: 5678,
PacketID: 0,
FragID: 0,
FragCount: 1,
Addr: "example.com:9999",
Data: []byte("goodbye"),
},
{
SessionID: 1234,
PacketID: 0,
FragID: 0,
FragCount: 1,
Addr: "example.com:5353",
Data: []byte(" world"),
},
{
SessionID: 5678,
PacketID: 0,
FragID: 0,
FragCount: 1,
Addr: "example.com:9999",
Data: []byte(" girl"),
},
}
for _, m := range ms {
msgReceiveCh <- m
}
// New event order should be consistent with message order
for i := 0; i < 2; i++ {
newEvent := <-eventNewCh
if newEvent.SessionID != ms[i].SessionID || newEvent.ReqAddr != ms[i].Addr {
t.Errorf("unexpected new event value: %d:%s", newEvent.SessionID, newEvent.ReqAddr)
}
}
// Message order is not guaranteed so we use a map
msgMap := make(map[string]bool)
for i := 0; i < 4; i++ {
msg := <-msgSendCh
msgMap[fmt.Sprintf("%d:%s:%s", msg.SessionID, msg.Addr, string(msg.Data))] = true
}
for _, m := range ms {
key := fmt.Sprintf("%d:%s:%s", m.SessionID, m.Addr, string(m.Data))
if !msgMap[key] {
t.Errorf("missing message: %s", key)
}
}
// Timeout check
startTime := time.Now()
closeMap := make(map[uint32]bool)
for i := 0; i < 2; i++ {
closeEvent := <-eventCloseCh
closeMap[closeEvent.SessionID] = true
}
for i := 0; i < 2; i++ {
if !closeMap[ms[i].SessionID] {
t.Errorf("missing close event: %d", ms[i].SessionID)
}
}
dur := time.Since(startTime)
if dur < 2*time.Second || dur > 4*time.Second {
t.Errorf("unexpected timeout duration: %s", dur)
udpReadFunc := func(addr string, ch chan []byte, b []byte) (int, string, error) {
bs := <-ch
if bs == nil {
return 0, "", errors.New("closed")
}
n := copy(b, bs)
return n, addr, nil
}
// Test normal session creation & timeout
msg1 := &protocol.UDPMessage{
SessionID: 1234,
PacketID: 0,
FragID: 0,
FragCount: 1,
Addr: "address1.com:9000",
Data: []byte("hello"),
}
eventLogger.EXPECT().New(msg1.SessionID, msg1.Addr).Return().Once()
udpConn1 := newMockUDPConn(t)
udpConn1Ch := make(chan []byte, 1)
io.EXPECT().UDP(msg1.Addr).Return(udpConn1, nil).Once()
udpConn1.EXPECT().WriteTo(msg1.Data, msg1.Addr).Return(5, nil).Once()
udpConn1.EXPECT().ReadFrom(mock.Anything).RunAndReturn(func(b []byte) (int, string, error) {
return udpReadFunc(msg1.Addr, udpConn1Ch, b)
})
io.EXPECT().SendMessage(mock.Anything, &protocol.UDPMessage{
SessionID: msg1.SessionID,
PacketID: 0,
FragID: 0,
FragCount: 1,
Addr: msg1.Addr,
Data: []byte("hi back"),
}).Return(nil).Once()
msgCh <- msg1
udpConn1Ch <- []byte("hi back")
t.Run("UDP connection close", func(t *testing.T) {
// Close UDP connection immediately after creation
io.UDPClose = true
m := &protocol.UDPMessage{
SessionID: 8888,
PacketID: 0,
FragID: 0,
FragCount: 1,
Addr: "mygod.org:1514",
Data: []byte("goodnight"),
}
msgReceiveCh <- m
// Should have both new and close events immediately
newEvent := <-eventNewCh
if newEvent.SessionID != m.SessionID || newEvent.ReqAddr != m.Addr {
t.Errorf("unexpected new event value: %d:%s", newEvent.SessionID, newEvent.ReqAddr)
}
closeEvent := <-eventCloseCh
if closeEvent.SessionID != m.SessionID || closeEvent.Err != errUDPClosed {
t.Errorf("unexpected close event value: %d:%s", closeEvent.SessionID, closeEvent.Err)
}
msg2 := &protocol.UDPMessage{
SessionID: 5678,
PacketID: 0,
FragID: 0,
FragCount: 1,
Addr: "address2.net:12450",
Data: []byte("how are you"),
}
eventLogger.EXPECT().New(msg2.SessionID, msg2.Addr).Return().Once()
udpConn2 := newMockUDPConn(t)
udpConn2Ch := make(chan []byte, 1)
io.EXPECT().UDP(msg2.Addr).Return(udpConn2, nil).Once()
udpConn2.EXPECT().WriteTo(msg2.Data, msg2.Addr).Return(11, nil).Once()
udpConn2.EXPECT().ReadFrom(mock.Anything).RunAndReturn(func(b []byte) (int, string, error) {
return udpReadFunc(msg2.Addr, udpConn2Ch, b)
})
io.EXPECT().SendMessage(mock.Anything, &protocol.UDPMessage{
SessionID: msg2.SessionID,
PacketID: 0,
FragID: 0,
FragCount: 1,
Addr: msg2.Addr,
Data: []byte("im fine"),
}).Return(nil).Once()
msgCh <- msg2
udpConn2Ch <- []byte("im fine")
t.Run("UDP IO failure", func(t *testing.T) {
// Block UDP connection creation
io.BlockUDP = true
msg3 := &protocol.UDPMessage{
SessionID: 1234,
PacketID: 0,
FragID: 0,
FragCount: 1,
Addr: "address1.com:9000",
Data: []byte("who are you?"),
}
udpConn1.EXPECT().WriteTo(msg3.Data, msg3.Addr).Return(12, nil).Once()
io.EXPECT().SendMessage(mock.Anything, &protocol.UDPMessage{
SessionID: msg3.SessionID,
PacketID: 0,
FragID: 0,
FragCount: 1,
Addr: msg3.Addr,
Data: []byte("im your father"),
}).Return(nil).Once()
msgCh <- msg3
udpConn1Ch <- []byte("im your father")
m := &protocol.UDPMessage{
SessionID: 9999,
PacketID: 0,
FragID: 0,
FragCount: 1,
Addr: "xxx.net:12450",
Data: []byte("nope"),
}
msgReceiveCh <- m
// Should have both new and close events immediately
newEvent := <-eventNewCh
if newEvent.SessionID != m.SessionID || newEvent.ReqAddr != m.Addr {
t.Errorf("unexpected new event value: %d:%s", newEvent.SessionID, newEvent.ReqAddr)
}
closeEvent := <-eventCloseCh
if closeEvent.SessionID != m.SessionID || closeEvent.Err != errUDPBlocked {
t.Errorf("unexpected close event value: %d:%s", closeEvent.SessionID, closeEvent.Err)
}
})
// Make sure timeout works (connections closed & close events emitted)
udpConn1.EXPECT().Close().RunAndReturn(func() error {
close(udpConn1Ch)
return nil
}).Once()
udpConn2.EXPECT().Close().RunAndReturn(func() error {
close(udpConn2Ch)
return nil
}).Once()
eventLogger.EXPECT().Close(msg1.SessionID, nil).Once()
eventLogger.EXPECT().Close(msg2.SessionID, nil).Once()
time.Sleep(3 * time.Second) // Wait for timeout
mock.AssertExpectationsForObjects(t, io, eventLogger, udpConn1, udpConn2)
// Test UDP connection close error propagation
errUDPClosed := errors.New("UDP connection closed")
msg4 := &protocol.UDPMessage{
SessionID: 666,
PacketID: 0,
FragID: 0,
FragCount: 1,
Addr: "oh-no.com:27015",
Data: []byte("dont say bye"),
}
eventLogger.EXPECT().New(msg4.SessionID, msg4.Addr).Return().Once()
udpConn4 := newMockUDPConn(t)
io.EXPECT().UDP(msg4.Addr).Return(udpConn4, nil).Once()
udpConn4.EXPECT().WriteTo(msg4.Data, msg4.Addr).Return(12, nil).Once()
udpConn4.EXPECT().ReadFrom(mock.Anything).Return(0, "", errUDPClosed).Once()
udpConn4.EXPECT().Close().Return(nil).Once()
eventLogger.EXPECT().Close(msg4.SessionID, errUDPClosed).Once()
msgCh <- msg4
time.Sleep(1 * time.Second)
mock.AssertExpectationsForObjects(t, io, eventLogger, udpConn4)
// Test UDP connection creation error propagation
errUDPIO := errors.New("UDP IO error")
msg5 := &protocol.UDPMessage{
SessionID: 777,
PacketID: 0,
FragID: 0,
FragCount: 1,
Addr: "callmemaybe.com:15353",
Data: []byte("babe i miss you"),
}
eventLogger.EXPECT().New(msg5.SessionID, msg5.Addr).Return().Once()
io.EXPECT().UDP(msg5.Addr).Return(nil, errUDPIO).Once()
eventLogger.EXPECT().Close(msg5.SessionID, errUDPIO).Once()
msgCh <- msg5
time.Sleep(1 * time.Second)
mock.AssertExpectationsForObjects(t, io, eventLogger)
// Leak checks
msgReceiveCh <- nil
time.Sleep(1 * time.Second) // Give some time for the goroutines to exit
if sm.Count() != 0 {
t.Error("session count should be 0")
}
close(msgCh) // This will return error from ReceiveMessage(), should stop the session manager
time.Sleep(1 * time.Second) // Wait one more second just to be sure
assert.Equal(t, sm.Count(), 0, "session count should be 0")
goleak.VerifyNone(t)
}