mirror of
https://github.com/refraction-networking/uquic.git
synced 2025-04-04 20:57:36 +03:00
replace locally closed sessions with a dedicated closed session
This commit is contained in:
parent
d689f9a392
commit
22b12f199e
8 changed files with 259 additions and 146 deletions
92
closed_session.go
Normal file
92
closed_session.go
Normal file
|
@ -0,0 +1,92 @@
|
||||||
|
package quic
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/lucas-clemente/quic-go/internal/protocol"
|
||||||
|
"github.com/lucas-clemente/quic-go/internal/utils"
|
||||||
|
)
|
||||||
|
|
||||||
|
type closedLocalSession struct {
|
||||||
|
conn connection
|
||||||
|
connClosePacket []byte
|
||||||
|
|
||||||
|
closeOnce sync.Once
|
||||||
|
closeChan chan struct{} // is closed when the session is closed or destroyed
|
||||||
|
|
||||||
|
receivedPackets chan *receivedPacket
|
||||||
|
counter uint64 // number of packets received
|
||||||
|
|
||||||
|
perspective protocol.Perspective
|
||||||
|
|
||||||
|
logger utils.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ packetHandler = &closedLocalSession{}
|
||||||
|
|
||||||
|
// newClosedLocalSession creates a new closedLocalSession and runs it.
|
||||||
|
func newClosedLocalSession(
|
||||||
|
conn connection,
|
||||||
|
connClosePacket []byte,
|
||||||
|
perspective protocol.Perspective,
|
||||||
|
logger utils.Logger,
|
||||||
|
) packetHandler {
|
||||||
|
s := &closedLocalSession{
|
||||||
|
conn: conn,
|
||||||
|
connClosePacket: connClosePacket,
|
||||||
|
perspective: perspective,
|
||||||
|
logger: logger,
|
||||||
|
closeChan: make(chan struct{}),
|
||||||
|
receivedPackets: make(chan *receivedPacket, 64),
|
||||||
|
}
|
||||||
|
go s.run()
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *closedLocalSession) run() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case p := <-s.receivedPackets:
|
||||||
|
s.handlePacketImpl(p)
|
||||||
|
case <-s.closeChan:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *closedLocalSession) handlePacket(p *receivedPacket) {
|
||||||
|
select {
|
||||||
|
case s.receivedPackets <- p:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *closedLocalSession) handlePacketImpl(p *receivedPacket) {
|
||||||
|
s.counter++
|
||||||
|
// exponential backoff
|
||||||
|
// only send a CONNECTION_CLOSE for the 1st, 2nd, 4th, 8th, 16th, ... packet arriving
|
||||||
|
for n := s.counter; n > 1; n = n / 2 {
|
||||||
|
if n%2 != 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.logger.Debugf("Received %d packets after sending CONNECTION_CLOSE. Retransmitting.", s.counter)
|
||||||
|
if err := s.conn.Write(s.connClosePacket); err != nil {
|
||||||
|
s.logger.Debugf("Error retransmitting CONNECTION_CLOSE: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *closedLocalSession) Close() error {
|
||||||
|
s.destroy(nil)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *closedLocalSession) destroy(error) {
|
||||||
|
s.closeOnce.Do(func() {
|
||||||
|
close(s.closeChan)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *closedLocalSession) getPerspective() protocol.Perspective {
|
||||||
|
return s.perspective
|
||||||
|
}
|
53
closed_session_test.go
Normal file
53
closed_session_test.go
Normal file
|
@ -0,0 +1,53 @@
|
||||||
|
package quic
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/lucas-clemente/quic-go/internal/protocol"
|
||||||
|
"github.com/lucas-clemente/quic-go/internal/utils"
|
||||||
|
|
||||||
|
. "github.com/onsi/ginkgo"
|
||||||
|
. "github.com/onsi/gomega"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ = Describe("Closed local session", func() {
|
||||||
|
var (
|
||||||
|
sess packetHandler
|
||||||
|
mconn *mockConnection
|
||||||
|
)
|
||||||
|
|
||||||
|
BeforeEach(func() {
|
||||||
|
mconn = newMockConnection()
|
||||||
|
sess = newClosedLocalSession(mconn, []byte("close"), protocol.PerspectiveClient, utils.DefaultLogger)
|
||||||
|
})
|
||||||
|
|
||||||
|
AfterEach(func() {
|
||||||
|
Eventually(areClosedSessionsRunning).Should(BeFalse())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("tells its perspective", func() {
|
||||||
|
Expect(sess.getPerspective()).To(Equal(protocol.PerspectiveClient))
|
||||||
|
// stop the session
|
||||||
|
Expect(sess.Close()).To(Succeed())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("repeats the packet containing the CONNECTION_CLOSE frame", func() {
|
||||||
|
for i := 1; i <= 20; i++ {
|
||||||
|
sess.handlePacket(&receivedPacket{})
|
||||||
|
if i == 1 || i == 2 || i == 4 || i == 8 || i == 16 {
|
||||||
|
Eventually(mconn.written).Should(Receive(Equal([]byte("close")))) // receive the CONNECTION_CLOSE
|
||||||
|
} else {
|
||||||
|
Consistently(mconn.written, 10*time.Millisecond).Should(HaveLen(0))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// stop the session
|
||||||
|
Expect(sess.Close()).To(Succeed())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("destroys sessions", func() {
|
||||||
|
Expect(areClosedSessionsRunning()).To(BeTrue())
|
||||||
|
sess.destroy(errors.New("destroy"))
|
||||||
|
Eventually(areClosedSessionsRunning).Should(BeFalse())
|
||||||
|
})
|
||||||
|
})
|
|
@ -122,6 +122,18 @@ func (mr *MockPacketHandlerManagerMockRecorder) RemoveResetToken(arg0 interface{
|
||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveResetToken", reflect.TypeOf((*MockPacketHandlerManager)(nil).RemoveResetToken), arg0)
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveResetToken", reflect.TypeOf((*MockPacketHandlerManager)(nil).RemoveResetToken), arg0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ReplaceWithClosed mocks base method
|
||||||
|
func (m *MockPacketHandlerManager) ReplaceWithClosed(arg0 protocol.ConnectionID, arg1 packetHandler) {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
m.ctrl.Call(m, "ReplaceWithClosed", arg0, arg1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReplaceWithClosed indicates an expected call of ReplaceWithClosed
|
||||||
|
func (mr *MockPacketHandlerManagerMockRecorder) ReplaceWithClosed(arg0, arg1 interface{}) *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReplaceWithClosed", reflect.TypeOf((*MockPacketHandlerManager)(nil).ReplaceWithClosed), arg0, arg1)
|
||||||
|
}
|
||||||
|
|
||||||
// Retire mocks base method
|
// Retire mocks base method
|
||||||
func (m *MockPacketHandlerManager) Retire(arg0 protocol.ConnectionID) {
|
func (m *MockPacketHandlerManager) Retire(arg0 protocol.ConnectionID) {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
|
|
|
@ -70,6 +70,18 @@ func (mr *MockSessionRunnerMockRecorder) RemoveResetToken(arg0 interface{}) *gom
|
||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveResetToken", reflect.TypeOf((*MockSessionRunner)(nil).RemoveResetToken), arg0)
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveResetToken", reflect.TypeOf((*MockSessionRunner)(nil).RemoveResetToken), arg0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ReplaceWithClosed mocks base method
|
||||||
|
func (m *MockSessionRunner) ReplaceWithClosed(arg0 protocol.ConnectionID, arg1 packetHandler) {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
m.ctrl.Call(m, "ReplaceWithClosed", arg0, arg1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReplaceWithClosed indicates an expected call of ReplaceWithClosed
|
||||||
|
func (mr *MockSessionRunnerMockRecorder) ReplaceWithClosed(arg0, arg1 interface{}) *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReplaceWithClosed", reflect.TypeOf((*MockSessionRunner)(nil).ReplaceWithClosed), arg0, arg1)
|
||||||
|
}
|
||||||
|
|
||||||
// Retire mocks base method
|
// Retire mocks base method
|
||||||
func (m *MockSessionRunner) Retire(arg0 protocol.ConnectionID) {
|
func (m *MockSessionRunner) Retire(arg0 protocol.ConnectionID) {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
|
|
|
@ -65,18 +65,30 @@ func newPacketHandlerMap(
|
||||||
|
|
||||||
func (h *packetHandlerMap) Add(id protocol.ConnectionID, handler packetHandler) {
|
func (h *packetHandlerMap) Add(id protocol.ConnectionID, handler packetHandler) {
|
||||||
h.mutex.Lock()
|
h.mutex.Lock()
|
||||||
h.handlers[string(id)] = handler
|
h.addLocked(id, handler)
|
||||||
h.mutex.Unlock()
|
h.mutex.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *packetHandlerMap) addLocked(id protocol.ConnectionID, handler packetHandler) {
|
||||||
|
h.handlers[string(id)] = handler
|
||||||
|
}
|
||||||
|
|
||||||
func (h *packetHandlerMap) Remove(id protocol.ConnectionID) {
|
func (h *packetHandlerMap) Remove(id protocol.ConnectionID) {
|
||||||
|
h.mutex.Lock()
|
||||||
h.removeByConnectionIDAsString(string(id))
|
h.removeByConnectionIDAsString(string(id))
|
||||||
|
h.mutex.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *packetHandlerMap) ReplaceWithClosed(id protocol.ConnectionID, handler packetHandler) {
|
||||||
|
h.mutex.Lock()
|
||||||
|
h.removeByConnectionIDAsString(string(id))
|
||||||
|
h.addLocked(id, handler)
|
||||||
|
h.mutex.Unlock()
|
||||||
|
h.retireByConnectionIDAsString(string(id))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *packetHandlerMap) removeByConnectionIDAsString(id string) {
|
func (h *packetHandlerMap) removeByConnectionIDAsString(id string) {
|
||||||
h.mutex.Lock()
|
|
||||||
delete(h.handlers, id)
|
delete(h.handlers, id)
|
||||||
h.mutex.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *packetHandlerMap) Retire(id protocol.ConnectionID) {
|
func (h *packetHandlerMap) Retire(id protocol.ConnectionID) {
|
||||||
|
@ -85,7 +97,9 @@ func (h *packetHandlerMap) Retire(id protocol.ConnectionID) {
|
||||||
|
|
||||||
func (h *packetHandlerMap) retireByConnectionIDAsString(id string) {
|
func (h *packetHandlerMap) retireByConnectionIDAsString(id string) {
|
||||||
time.AfterFunc(h.deleteRetiredSessionsAfter, func() {
|
time.AfterFunc(h.deleteRetiredSessionsAfter, func() {
|
||||||
|
h.mutex.Lock()
|
||||||
h.removeByConnectionIDAsString(id)
|
h.removeByConnectionIDAsString(id)
|
||||||
|
h.mutex.Unlock()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,6 +37,7 @@ type packetHandlerManager interface {
|
||||||
Add(protocol.ConnectionID, packetHandler)
|
Add(protocol.ConnectionID, packetHandler)
|
||||||
Retire(protocol.ConnectionID)
|
Retire(protocol.ConnectionID)
|
||||||
Remove(protocol.ConnectionID)
|
Remove(protocol.ConnectionID)
|
||||||
|
ReplaceWithClosed(protocol.ConnectionID, packetHandler)
|
||||||
AddResetToken([16]byte, packetHandler)
|
AddResetToken([16]byte, packetHandler)
|
||||||
RemoveResetToken([16]byte)
|
RemoveResetToken([16]byte)
|
||||||
GetStatelessResetToken(protocol.ConnectionID) [16]byte
|
GetStatelessResetToken(protocol.ConnectionID) [16]byte
|
||||||
|
@ -59,6 +60,7 @@ type quicSession interface {
|
||||||
type sessionRunner interface {
|
type sessionRunner interface {
|
||||||
Retire(protocol.ConnectionID)
|
Retire(protocol.ConnectionID)
|
||||||
Remove(protocol.ConnectionID)
|
Remove(protocol.ConnectionID)
|
||||||
|
ReplaceWithClosed(protocol.ConnectionID, packetHandler)
|
||||||
AddResetToken([16]byte, packetHandler)
|
AddResetToken([16]byte, packetHandler)
|
||||||
RemoveResetToken([16]byte)
|
RemoveResetToken([16]byte)
|
||||||
}
|
}
|
||||||
|
|
40
session.go
40
session.go
|
@ -132,11 +132,8 @@ type session struct {
|
||||||
sendingScheduled chan struct{}
|
sendingScheduled chan struct{}
|
||||||
|
|
||||||
closeOnce sync.Once
|
closeOnce sync.Once
|
||||||
closed utils.AtomicBool
|
|
||||||
// closeChan is used to notify the run loop that it should terminate
|
// closeChan is used to notify the run loop that it should terminate
|
||||||
closeChan chan closeError
|
closeChan chan closeError
|
||||||
connectionClosePacket *packedPacket
|
|
||||||
packetsReceivedAfterClose int
|
|
||||||
|
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
ctxCancel context.CancelFunc
|
ctxCancel context.CancelFunc
|
||||||
|
@ -487,7 +484,6 @@ runLoop:
|
||||||
}
|
}
|
||||||
|
|
||||||
s.handleCloseError(closeErr)
|
s.handleCloseError(closeErr)
|
||||||
s.closed.Set(true)
|
|
||||||
s.logger.Infof("Connection %s closed.", s.srcConnID)
|
s.logger.Infof("Connection %s closed.", s.srcConnID)
|
||||||
s.cryptoStreamHandler.Close()
|
s.cryptoStreamHandler.Close()
|
||||||
s.sendQueue.Close()
|
s.sendQueue.Close()
|
||||||
|
@ -803,9 +799,6 @@ func (s *session) handleFrame(f wire.Frame, pn protocol.PacketNumber, encLevel p
|
||||||
|
|
||||||
// handlePacket is called by the server with a new packet
|
// handlePacket is called by the server with a new packet
|
||||||
func (s *session) handlePacket(p *receivedPacket) {
|
func (s *session) handlePacket(p *receivedPacket) {
|
||||||
if s.closed.Get() {
|
|
||||||
s.handlePacketAfterClosed(p)
|
|
||||||
}
|
|
||||||
// Discard packets once the amount of queued packets is larger than
|
// Discard packets once the amount of queued packets is larger than
|
||||||
// the channel size, protocol.MaxSessionUnprocessedPackets
|
// the channel size, protocol.MaxSessionUnprocessedPackets
|
||||||
select {
|
select {
|
||||||
|
@ -814,24 +807,6 @@ func (s *session) handlePacket(p *receivedPacket) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *session) handlePacketAfterClosed(p *receivedPacket) {
|
|
||||||
s.packetsReceivedAfterClose++
|
|
||||||
if s.connectionClosePacket == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// exponential backoff
|
|
||||||
// only send a CONNECTION_CLOSE for the 1st, 2nd, 4th, 8th, 16th, ... packet arriving
|
|
||||||
for n := s.packetsReceivedAfterClose; n > 1; n = n / 2 {
|
|
||||||
if n%2 != 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
s.logger.Debugf("Received %d packets after sending CONNECTION_CLOSE. Retransmitting.", s.packetsReceivedAfterClose)
|
|
||||||
if err := s.conn.Write(s.connectionClosePacket.raw); err != nil {
|
|
||||||
s.logger.Debugf("Error retransmitting CONNECTION_CLOSE: %s", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *session) handleConnectionCloseFrame(frame *wire.ConnectionCloseFrame) {
|
func (s *session) handleConnectionCloseFrame(frame *wire.ConnectionCloseFrame) {
|
||||||
var e error
|
var e error
|
||||||
if frame.IsApplicationError {
|
if frame.IsApplicationError {
|
||||||
|
@ -946,7 +921,6 @@ func (s *session) closeLocal(e error) {
|
||||||
} else {
|
} else {
|
||||||
s.logger.Errorf("Closing session with error: %s", e)
|
s.logger.Errorf("Closing session with error: %s", e)
|
||||||
}
|
}
|
||||||
s.sessionRunner.Retire(s.srcConnID)
|
|
||||||
s.closeChan <- closeError{err: e, sendClose: true, remote: false}
|
s.closeChan <- closeError{err: e, sendClose: true, remote: false}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -1019,9 +993,12 @@ func (s *session) handleCloseError(closeErr closeError) {
|
||||||
if closeErr.remote {
|
if closeErr.remote {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err := s.sendConnectionClose(quicErr); err != nil {
|
connClosePacket, err := s.sendConnectionClose(quicErr)
|
||||||
|
if err != nil {
|
||||||
s.logger.Debugf("Error sending CONNECTION_CLOSE: %s", err)
|
s.logger.Debugf("Error sending CONNECTION_CLOSE: %s", err)
|
||||||
}
|
}
|
||||||
|
cs := newClosedLocalSession(s.conn, connClosePacket, s.perspective, s.logger)
|
||||||
|
s.sessionRunner.ReplaceWithClosed(s.srcConnID, cs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *session) dropEncryptionLevel(encLevel protocol.EncryptionLevel) {
|
func (s *session) dropEncryptionLevel(encLevel protocol.EncryptionLevel) {
|
||||||
|
@ -1206,7 +1183,7 @@ func (s *session) sendPackedPacket(packet *packedPacket) {
|
||||||
s.sendQueue.Send(packet)
|
s.sendQueue.Send(packet)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *session) sendConnectionClose(quicErr *qerr.QuicError) error {
|
func (s *session) sendConnectionClose(quicErr *qerr.QuicError) ([]byte, error) {
|
||||||
var reason string
|
var reason string
|
||||||
// don't send details of crypto errors
|
// don't send details of crypto errors
|
||||||
if !quicErr.IsCryptoError() {
|
if !quicErr.IsCryptoError() {
|
||||||
|
@ -1217,11 +1194,10 @@ func (s *session) sendConnectionClose(quicErr *qerr.QuicError) error {
|
||||||
ReasonPhrase: reason,
|
ReasonPhrase: reason,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
s.connectionClosePacket = packet
|
|
||||||
s.logPacket(packet)
|
s.logPacket(packet)
|
||||||
return s.conn.Write(packet.raw)
|
return packet.raw, s.conn.Write(packet.raw)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *session) logPacket(packet *packedPacket) {
|
func (s *session) logPacket(packet *packedPacket) {
|
||||||
|
|
174
session_test.go
174
session_test.go
|
@ -64,6 +64,12 @@ func areSessionsRunning() bool {
|
||||||
return strings.Contains(b.String(), "quic-go.(*session).run")
|
return strings.Contains(b.String(), "quic-go.(*session).run")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func areClosedSessionsRunning() bool {
|
||||||
|
var b bytes.Buffer
|
||||||
|
pprof.Lookup("goroutine").WriteTo(&b, 1)
|
||||||
|
return strings.Contains(b.String(), "quic-go.(*closedLocalSession).run")
|
||||||
|
}
|
||||||
|
|
||||||
var _ = Describe("Session", func() {
|
var _ = Describe("Session", func() {
|
||||||
var (
|
var (
|
||||||
sess *session
|
sess *session
|
||||||
|
@ -85,6 +91,13 @@ var _ = Describe("Session", func() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
expectReplaceWithClosed := func() {
|
||||||
|
sessionRunner.EXPECT().ReplaceWithClosed(sess.srcConnID, gomock.Any()).Do(func(_ protocol.ConnectionID, s packetHandler) {
|
||||||
|
Expect(s.Close()).To(Succeed())
|
||||||
|
Eventually(areClosedSessionsRunning).Should(BeFalse())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
BeforeEach(func() {
|
BeforeEach(func() {
|
||||||
Eventually(areSessionsRunning).Should(BeFalse())
|
Eventually(areSessionsRunning).Should(BeFalse())
|
||||||
|
|
||||||
|
@ -404,7 +417,7 @@ var _ = Describe("Session", func() {
|
||||||
|
|
||||||
It("shuts down without error", func() {
|
It("shuts down without error", func() {
|
||||||
streamManager.EXPECT().CloseWithError(qerr.Error(qerr.NoError, ""))
|
streamManager.EXPECT().CloseWithError(qerr.Error(qerr.NoError, ""))
|
||||||
sessionRunner.EXPECT().Retire(gomock.Any())
|
expectReplaceWithClosed()
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{raw: []byte("connection close")}, nil)
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{raw: []byte("connection close")}, nil)
|
||||||
Expect(sess.Close()).To(Succeed())
|
Expect(sess.Close()).To(Succeed())
|
||||||
|
@ -416,7 +429,7 @@ var _ = Describe("Session", func() {
|
||||||
|
|
||||||
It("only closes once", func() {
|
It("only closes once", func() {
|
||||||
streamManager.EXPECT().CloseWithError(qerr.Error(qerr.NoError, ""))
|
streamManager.EXPECT().CloseWithError(qerr.Error(qerr.NoError, ""))
|
||||||
sessionRunner.EXPECT().Retire(gomock.Any())
|
expectReplaceWithClosed()
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
||||||
Expect(sess.Close()).To(Succeed())
|
Expect(sess.Close()).To(Succeed())
|
||||||
|
@ -429,7 +442,7 @@ var _ = Describe("Session", func() {
|
||||||
It("closes streams with proper error", func() {
|
It("closes streams with proper error", func() {
|
||||||
testErr := errors.New("test error")
|
testErr := errors.New("test error")
|
||||||
streamManager.EXPECT().CloseWithError(qerr.Error(0x1337, testErr.Error()))
|
streamManager.EXPECT().CloseWithError(qerr.Error(0x1337, testErr.Error()))
|
||||||
sessionRunner.EXPECT().Retire(gomock.Any())
|
expectReplaceWithClosed()
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
||||||
sess.CloseWithError(0x1337, testErr.Error())
|
sess.CloseWithError(0x1337, testErr.Error())
|
||||||
|
@ -460,7 +473,7 @@ var _ = Describe("Session", func() {
|
||||||
|
|
||||||
It("cancels the context when the run loop exists", func() {
|
It("cancels the context when the run loop exists", func() {
|
||||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||||
sessionRunner.EXPECT().Retire(gomock.Any())
|
expectReplaceWithClosed()
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
||||||
returned := make(chan struct{})
|
returned := make(chan struct{})
|
||||||
|
@ -475,24 +488,6 @@ var _ = Describe("Session", func() {
|
||||||
sess.Close()
|
sess.Close()
|
||||||
Eventually(returned).Should(BeClosed())
|
Eventually(returned).Should(BeClosed())
|
||||||
})
|
})
|
||||||
|
|
||||||
It("retransmits the CONNECTION_CLOSE packet if packets are arriving late", func() {
|
|
||||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
|
||||||
sessionRunner.EXPECT().Retire(gomock.Any())
|
|
||||||
cryptoSetup.EXPECT().Close()
|
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{raw: []byte("foobar")}, nil)
|
|
||||||
sess.Close()
|
|
||||||
Expect(mconn.written).To(Receive(Equal([]byte("foobar")))) // receive the CONNECTION_CLOSE
|
|
||||||
Eventually(sess.Context().Done()).Should(BeClosed())
|
|
||||||
for i := 1; i <= 20; i++ {
|
|
||||||
sess.handlePacket(&receivedPacket{})
|
|
||||||
if i == 1 || i == 2 || i == 4 || i == 8 || i == 16 {
|
|
||||||
Expect(mconn.written).To(Receive(Equal([]byte("foobar")))) // receive the CONNECTION_CLOSE
|
|
||||||
} else {
|
|
||||||
Expect(mconn.written).To(HaveLen(0))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
})
|
})
|
||||||
|
|
||||||
Context("receiving packets", func() {
|
Context("receiving packets", func() {
|
||||||
|
@ -574,7 +569,7 @@ var _ = Describe("Session", func() {
|
||||||
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
|
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
|
||||||
sess.run()
|
sess.run()
|
||||||
}()
|
}()
|
||||||
sessionRunner.EXPECT().Retire(gomock.Any())
|
expectReplaceWithClosed()
|
||||||
sess.handlePacket(getPacket(&wire.ExtendedHeader{
|
sess.handlePacket(getPacket(&wire.ExtendedHeader{
|
||||||
Header: wire.Header{DestConnectionID: sess.srcConnID},
|
Header: wire.Header{DestConnectionID: sess.srcConnID},
|
||||||
PacketNumberLen: protocol.PacketNumberLen1,
|
PacketNumberLen: protocol.PacketNumberLen1,
|
||||||
|
@ -599,7 +594,7 @@ var _ = Describe("Session", func() {
|
||||||
Expect(err.(*qerr.QuicError).ErrorCode).To(Equal(qerr.ProtocolViolation))
|
Expect(err.(*qerr.QuicError).ErrorCode).To(Equal(qerr.ProtocolViolation))
|
||||||
close(done)
|
close(done)
|
||||||
}()
|
}()
|
||||||
sessionRunner.EXPECT().Retire(gomock.Any())
|
expectReplaceWithClosed()
|
||||||
sess.handlePacket(getPacket(&wire.ExtendedHeader{
|
sess.handlePacket(getPacket(&wire.ExtendedHeader{
|
||||||
Header: wire.Header{DestConnectionID: sess.srcConnID},
|
Header: wire.Header{DestConnectionID: sess.srcConnID},
|
||||||
PacketNumberLen: protocol.PacketNumberLen1,
|
PacketNumberLen: protocol.PacketNumberLen1,
|
||||||
|
@ -619,7 +614,7 @@ var _ = Describe("Session", func() {
|
||||||
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
|
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
|
||||||
runErr <- sess.run()
|
runErr <- sess.run()
|
||||||
}()
|
}()
|
||||||
sessionRunner.EXPECT().Retire(gomock.Any())
|
expectReplaceWithClosed()
|
||||||
sess.handlePacket(getPacket(&wire.ExtendedHeader{
|
sess.handlePacket(getPacket(&wire.ExtendedHeader{
|
||||||
Header: wire.Header{DestConnectionID: sess.srcConnID},
|
Header: wire.Header{DestConnectionID: sess.srcConnID},
|
||||||
PacketNumberLen: protocol.PacketNumberLen1,
|
PacketNumberLen: protocol.PacketNumberLen1,
|
||||||
|
@ -646,7 +641,7 @@ var _ = Describe("Session", func() {
|
||||||
Expect(err).To(MatchError("PROTOCOL_VIOLATION: empty packet"))
|
Expect(err).To(MatchError("PROTOCOL_VIOLATION: empty packet"))
|
||||||
close(done)
|
close(done)
|
||||||
}()
|
}()
|
||||||
sessionRunner.EXPECT().Retire(gomock.Any())
|
expectReplaceWithClosed()
|
||||||
sess.handlePacket(getPacket(&wire.ExtendedHeader{
|
sess.handlePacket(getPacket(&wire.ExtendedHeader{
|
||||||
Header: wire.Header{DestConnectionID: sess.srcConnID},
|
Header: wire.Header{DestConnectionID: sess.srcConnID},
|
||||||
PacketNumberLen: protocol.PacketNumberLen1,
|
PacketNumberLen: protocol.PacketNumberLen1,
|
||||||
|
@ -847,7 +842,7 @@ var _ = Describe("Session", func() {
|
||||||
AfterEach(func() {
|
AfterEach(func() {
|
||||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
||||||
sessionRunner.EXPECT().Retire(gomock.Any())
|
expectReplaceWithClosed()
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
sess.Close()
|
sess.Close()
|
||||||
Eventually(sess.Context().Done()).Should(BeClosed())
|
Eventually(sess.Context().Done()).Should(BeClosed())
|
||||||
|
@ -946,6 +941,15 @@ var _ = Describe("Session", func() {
|
||||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
AfterEach(func() {
|
||||||
|
// make the go routine return
|
||||||
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
||||||
|
expectReplaceWithClosed()
|
||||||
|
cryptoSetup.EXPECT().Close()
|
||||||
|
Expect(sess.Close()).To(Succeed())
|
||||||
|
Eventually(sess.Context().Done()).Should(BeClosed())
|
||||||
|
})
|
||||||
|
|
||||||
It("sends multiple packets one by one immediately", func() {
|
It("sends multiple packets one by one immediately", func() {
|
||||||
sph.EXPECT().SentPacket(gomock.Any()).Times(2)
|
sph.EXPECT().SentPacket(gomock.Any()).Times(2)
|
||||||
sph.EXPECT().ShouldSendNumPackets().Return(1).Times(2)
|
sph.EXPECT().ShouldSendNumPackets().Return(1).Times(2)
|
||||||
|
@ -954,22 +958,14 @@ var _ = Describe("Session", func() {
|
||||||
sph.EXPECT().SendMode().Return(ackhandler.SendAny).Times(2) // allow 2 packets...
|
sph.EXPECT().SendMode().Return(ackhandler.SendAny).Times(2) // allow 2 packets...
|
||||||
packer.EXPECT().PackPacket().Return(getPacket(10), nil)
|
packer.EXPECT().PackPacket().Return(getPacket(10), nil)
|
||||||
packer.EXPECT().PackPacket().Return(getPacket(11), nil)
|
packer.EXPECT().PackPacket().Return(getPacket(11), nil)
|
||||||
done := make(chan struct{})
|
|
||||||
go func() {
|
go func() {
|
||||||
defer GinkgoRecover()
|
defer GinkgoRecover()
|
||||||
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
|
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
|
||||||
sess.run()
|
sess.run()
|
||||||
close(done)
|
|
||||||
}()
|
}()
|
||||||
sess.scheduleSending()
|
sess.scheduleSending()
|
||||||
Eventually(mconn.written).Should(HaveLen(2))
|
Eventually(mconn.written).Should(HaveLen(2))
|
||||||
Consistently(mconn.written).Should(HaveLen(2))
|
Consistently(mconn.written).Should(HaveLen(2))
|
||||||
// make the go routine return
|
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
|
||||||
sessionRunner.EXPECT().Retire(gomock.Any())
|
|
||||||
cryptoSetup.EXPECT().Close()
|
|
||||||
sess.Close()
|
|
||||||
Eventually(done).Should(BeClosed())
|
|
||||||
})
|
})
|
||||||
|
|
||||||
// when becoming congestion limited, at some point the SendMode will change from SendAny to SendAck
|
// when becoming congestion limited, at some point the SendMode will change from SendAny to SendAck
|
||||||
|
@ -981,22 +977,14 @@ var _ = Describe("Session", func() {
|
||||||
sph.EXPECT().SendMode().Return(ackhandler.SendAny)
|
sph.EXPECT().SendMode().Return(ackhandler.SendAny)
|
||||||
sph.EXPECT().SendMode().Return(ackhandler.SendAck)
|
sph.EXPECT().SendMode().Return(ackhandler.SendAck)
|
||||||
packer.EXPECT().PackPacket().Return(getPacket(100), nil)
|
packer.EXPECT().PackPacket().Return(getPacket(100), nil)
|
||||||
done := make(chan struct{})
|
|
||||||
go func() {
|
go func() {
|
||||||
defer GinkgoRecover()
|
defer GinkgoRecover()
|
||||||
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
|
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
|
||||||
sess.run()
|
sess.run()
|
||||||
close(done)
|
|
||||||
}()
|
}()
|
||||||
sess.scheduleSending()
|
sess.scheduleSending()
|
||||||
Eventually(mconn.written).Should(HaveLen(1))
|
Eventually(mconn.written).Should(HaveLen(1))
|
||||||
Consistently(mconn.written).Should(HaveLen(1))
|
Consistently(mconn.written).Should(HaveLen(1))
|
||||||
// make the go routine return
|
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
|
||||||
sessionRunner.EXPECT().Retire(gomock.Any())
|
|
||||||
cryptoSetup.EXPECT().Close()
|
|
||||||
sess.Close()
|
|
||||||
Eventually(done).Should(BeClosed())
|
|
||||||
})
|
})
|
||||||
|
|
||||||
It("paces packets", func() {
|
It("paces packets", func() {
|
||||||
|
@ -1009,23 +997,15 @@ var _ = Describe("Session", func() {
|
||||||
sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes()
|
sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes()
|
||||||
packer.EXPECT().PackPacket().Return(getPacket(100), nil)
|
packer.EXPECT().PackPacket().Return(getPacket(100), nil)
|
||||||
packer.EXPECT().PackPacket().Return(getPacket(101), nil)
|
packer.EXPECT().PackPacket().Return(getPacket(101), nil)
|
||||||
done := make(chan struct{})
|
|
||||||
go func() {
|
go func() {
|
||||||
defer GinkgoRecover()
|
defer GinkgoRecover()
|
||||||
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
|
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
|
||||||
sess.run()
|
sess.run()
|
||||||
close(done)
|
|
||||||
}()
|
}()
|
||||||
sess.scheduleSending()
|
sess.scheduleSending()
|
||||||
Eventually(mconn.written).Should(HaveLen(1))
|
Eventually(mconn.written).Should(HaveLen(1))
|
||||||
Consistently(mconn.written, pacingDelay/2).Should(HaveLen(1))
|
Consistently(mconn.written, pacingDelay/2).Should(HaveLen(1))
|
||||||
Eventually(mconn.written, 2*pacingDelay).Should(HaveLen(2))
|
Eventually(mconn.written, 2*pacingDelay).Should(HaveLen(2))
|
||||||
// make the go routine return
|
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
|
||||||
sessionRunner.EXPECT().Retire(gomock.Any())
|
|
||||||
cryptoSetup.EXPECT().Close()
|
|
||||||
sess.Close()
|
|
||||||
Eventually(done).Should(BeClosed())
|
|
||||||
})
|
})
|
||||||
|
|
||||||
It("sends multiple packets at once", func() {
|
It("sends multiple packets at once", func() {
|
||||||
|
@ -1037,21 +1017,13 @@ var _ = Describe("Session", func() {
|
||||||
packer.EXPECT().PackPacket().Return(getPacket(1000), nil)
|
packer.EXPECT().PackPacket().Return(getPacket(1000), nil)
|
||||||
packer.EXPECT().PackPacket().Return(getPacket(1001), nil)
|
packer.EXPECT().PackPacket().Return(getPacket(1001), nil)
|
||||||
packer.EXPECT().PackPacket().Return(getPacket(1002), nil)
|
packer.EXPECT().PackPacket().Return(getPacket(1002), nil)
|
||||||
done := make(chan struct{})
|
|
||||||
go func() {
|
go func() {
|
||||||
defer GinkgoRecover()
|
defer GinkgoRecover()
|
||||||
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
|
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
|
||||||
sess.run()
|
sess.run()
|
||||||
close(done)
|
|
||||||
}()
|
}()
|
||||||
sess.scheduleSending()
|
sess.scheduleSending()
|
||||||
Eventually(mconn.written).Should(HaveLen(3))
|
Eventually(mconn.written).Should(HaveLen(3))
|
||||||
// make the go routine return
|
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
|
||||||
sessionRunner.EXPECT().Retire(gomock.Any())
|
|
||||||
cryptoSetup.EXPECT().Close()
|
|
||||||
sess.Close()
|
|
||||||
Eventually(done).Should(BeClosed())
|
|
||||||
})
|
})
|
||||||
|
|
||||||
It("doesn't set a pacing timer when there is no data to send", func() {
|
It("doesn't set a pacing timer when there is no data to send", func() {
|
||||||
|
@ -1059,21 +1031,13 @@ var _ = Describe("Session", func() {
|
||||||
sph.EXPECT().ShouldSendNumPackets().Return(1)
|
sph.EXPECT().ShouldSendNumPackets().Return(1)
|
||||||
sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes()
|
sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes()
|
||||||
packer.EXPECT().PackPacket()
|
packer.EXPECT().PackPacket()
|
||||||
done := make(chan struct{})
|
|
||||||
go func() {
|
go func() {
|
||||||
defer GinkgoRecover()
|
defer GinkgoRecover()
|
||||||
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
|
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
|
||||||
sess.run()
|
sess.run()
|
||||||
close(done)
|
|
||||||
}()
|
}()
|
||||||
sess.scheduleSending() // no packet will get sent
|
sess.scheduleSending() // no packet will get sent
|
||||||
Consistently(mconn.written).ShouldNot(Receive())
|
Consistently(mconn.written).ShouldNot(Receive())
|
||||||
// make the go routine return
|
|
||||||
sessionRunner.EXPECT().Retire(gomock.Any())
|
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
|
||||||
cryptoSetup.EXPECT().Close()
|
|
||||||
sess.Close()
|
|
||||||
Eventually(done).Should(BeClosed())
|
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -1097,7 +1061,7 @@ var _ = Describe("Session", func() {
|
||||||
sess.scheduleSending()
|
sess.scheduleSending()
|
||||||
Eventually(mconn.written).Should(Receive())
|
Eventually(mconn.written).Should(Receive())
|
||||||
// make the go routine return
|
// make the go routine return
|
||||||
sessionRunner.EXPECT().Retire(gomock.Any())
|
expectReplaceWithClosed()
|
||||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
|
@ -1131,7 +1095,7 @@ var _ = Describe("Session", func() {
|
||||||
Eventually(mconn.written).Should(Receive())
|
Eventually(mconn.written).Should(Receive())
|
||||||
// make sure the go routine returns
|
// make sure the go routine returns
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
||||||
sessionRunner.EXPECT().Retire(gomock.Any())
|
expectReplaceWithClosed()
|
||||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
sess.Close()
|
sess.Close()
|
||||||
|
@ -1153,9 +1117,9 @@ var _ = Describe("Session", func() {
|
||||||
Consistently(handshakeCtx.Done()).ShouldNot(BeClosed())
|
Consistently(handshakeCtx.Done()).ShouldNot(BeClosed())
|
||||||
close(finishHandshake)
|
close(finishHandshake)
|
||||||
Eventually(handshakeCtx.Done()).Should(BeClosed())
|
Eventually(handshakeCtx.Done()).Should(BeClosed())
|
||||||
//make sure the go routine returns
|
// make sure the go routine returns
|
||||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||||
sessionRunner.EXPECT().Retire(gomock.Any())
|
expectReplaceWithClosed()
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
Expect(sess.Close()).To(Succeed())
|
Expect(sess.Close()).To(Succeed())
|
||||||
|
@ -1165,7 +1129,7 @@ var _ = Describe("Session", func() {
|
||||||
It("doesn't cancel the HandshakeComplete context when the handshake fails", func() {
|
It("doesn't cancel the HandshakeComplete context when the handshake fails", func() {
|
||||||
packer.EXPECT().PackPacket().AnyTimes()
|
packer.EXPECT().PackPacket().AnyTimes()
|
||||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||||
sessionRunner.EXPECT().Retire(gomock.Any())
|
expectReplaceWithClosed()
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -1197,9 +1161,9 @@ var _ = Describe("Session", func() {
|
||||||
sess.run()
|
sess.run()
|
||||||
}()
|
}()
|
||||||
Eventually(done).Should(BeClosed())
|
Eventually(done).Should(BeClosed())
|
||||||
//make sure the go routine returns
|
// make sure the go routine returns
|
||||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||||
sessionRunner.EXPECT().Retire(gomock.Any())
|
expectReplaceWithClosed()
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
Expect(sess.Close()).To(Succeed())
|
Expect(sess.Close()).To(Succeed())
|
||||||
|
@ -1215,7 +1179,7 @@ var _ = Describe("Session", func() {
|
||||||
close(done)
|
close(done)
|
||||||
}()
|
}()
|
||||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||||
sessionRunner.EXPECT().Retire(gomock.Any())
|
expectReplaceWithClosed()
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
Expect(sess.Close()).To(Succeed())
|
Expect(sess.Close()).To(Succeed())
|
||||||
|
@ -1233,7 +1197,7 @@ var _ = Describe("Session", func() {
|
||||||
close(done)
|
close(done)
|
||||||
}()
|
}()
|
||||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||||
sessionRunner.EXPECT().Retire(gomock.Any())
|
expectReplaceWithClosed()
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
Expect(sess.CloseWithError(0x1337, testErr.Error())).To(Succeed())
|
Expect(sess.CloseWithError(0x1337, testErr.Error())).To(Succeed())
|
||||||
|
@ -1250,7 +1214,7 @@ var _ = Describe("Session", func() {
|
||||||
Expect(err.Error()).To(ContainSubstring("transport parameter"))
|
Expect(err.Error()).To(ContainSubstring("transport parameter"))
|
||||||
}()
|
}()
|
||||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||||
sessionRunner.EXPECT().Retire(gomock.Any())
|
expectReplaceWithClosed()
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
sess.processTransportParameters([]byte("invalid"))
|
sess.processTransportParameters([]byte("invalid"))
|
||||||
|
@ -1278,7 +1242,7 @@ var _ = Describe("Session", func() {
|
||||||
|
|
||||||
// make the go routine return
|
// make the go routine return
|
||||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||||
sessionRunner.EXPECT().Retire(gomock.Any())
|
expectReplaceWithClosed()
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
sess.Close()
|
sess.Close()
|
||||||
|
@ -1295,6 +1259,16 @@ var _ = Describe("Session", func() {
|
||||||
sess.peerParams = &handshake.TransportParameters{IdleTimeout: remoteIdleTimeout}
|
sess.peerParams = &handshake.TransportParameters{IdleTimeout: remoteIdleTimeout}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
AfterEach(func() {
|
||||||
|
// make the go routine return
|
||||||
|
expectReplaceWithClosed()
|
||||||
|
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||||
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
||||||
|
cryptoSetup.EXPECT().Close()
|
||||||
|
Expect(sess.Close()).To(Succeed())
|
||||||
|
Eventually(sess.Context().Done()).Should(BeClosed())
|
||||||
|
})
|
||||||
|
|
||||||
It("sends a PING as a keep-alive", func() {
|
It("sends a PING as a keep-alive", func() {
|
||||||
sess.handshakeComplete = true
|
sess.handshakeComplete = true
|
||||||
sess.config.KeepAlive = true
|
sess.config.KeepAlive = true
|
||||||
|
@ -1304,63 +1278,36 @@ var _ = Describe("Session", func() {
|
||||||
close(sent)
|
close(sent)
|
||||||
return nil, nil
|
return nil, nil
|
||||||
})
|
})
|
||||||
done := make(chan struct{})
|
|
||||||
go func() {
|
go func() {
|
||||||
defer GinkgoRecover()
|
defer GinkgoRecover()
|
||||||
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
|
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
|
||||||
sess.run()
|
sess.run()
|
||||||
close(done)
|
|
||||||
}()
|
}()
|
||||||
Eventually(sent).Should(BeClosed())
|
Eventually(sent).Should(BeClosed())
|
||||||
// make the go routine return
|
|
||||||
sessionRunner.EXPECT().Retire(gomock.Any())
|
|
||||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
|
||||||
cryptoSetup.EXPECT().Close()
|
|
||||||
sess.Close()
|
|
||||||
Eventually(done).Should(BeClosed())
|
|
||||||
})
|
})
|
||||||
|
|
||||||
It("doesn't send a PING packet if keep-alive is disabled", func() {
|
It("doesn't send a PING packet if keep-alive is disabled", func() {
|
||||||
sess.handshakeComplete = true
|
sess.handshakeComplete = true
|
||||||
sess.config.KeepAlive = false
|
sess.config.KeepAlive = false
|
||||||
sess.lastPacketReceivedTime = time.Now().Add(-remoteIdleTimeout / 2)
|
sess.lastPacketReceivedTime = time.Now().Add(-remoteIdleTimeout / 2)
|
||||||
done := make(chan struct{})
|
|
||||||
go func() {
|
go func() {
|
||||||
defer GinkgoRecover()
|
defer GinkgoRecover()
|
||||||
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
|
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
|
||||||
sess.run()
|
sess.run()
|
||||||
close(done)
|
|
||||||
}()
|
}()
|
||||||
Consistently(mconn.written).ShouldNot(Receive())
|
Consistently(mconn.written).ShouldNot(Receive())
|
||||||
// make the go routine return
|
|
||||||
sessionRunner.EXPECT().Retire(gomock.Any())
|
|
||||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
|
||||||
cryptoSetup.EXPECT().Close()
|
|
||||||
sess.Close()
|
|
||||||
Eventually(done).Should(BeClosed())
|
|
||||||
})
|
})
|
||||||
|
|
||||||
It("doesn't send a PING if the handshake isn't completed yet", func() {
|
It("doesn't send a PING if the handshake isn't completed yet", func() {
|
||||||
sess.handshakeComplete = false
|
sess.handshakeComplete = false
|
||||||
sess.config.KeepAlive = true
|
sess.config.KeepAlive = true
|
||||||
sess.lastPacketReceivedTime = time.Now().Add(-remoteIdleTimeout / 2)
|
sess.lastPacketReceivedTime = time.Now().Add(-remoteIdleTimeout / 2)
|
||||||
done := make(chan struct{})
|
|
||||||
go func() {
|
go func() {
|
||||||
defer GinkgoRecover()
|
defer GinkgoRecover()
|
||||||
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
|
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
|
||||||
sess.run()
|
sess.run()
|
||||||
close(done)
|
|
||||||
}()
|
}()
|
||||||
Consistently(mconn.written).ShouldNot(Receive())
|
Consistently(mconn.written).ShouldNot(Receive())
|
||||||
// make the go routine return
|
|
||||||
sessionRunner.EXPECT().Retire(gomock.Any())
|
|
||||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
|
||||||
cryptoSetup.EXPECT().Close()
|
|
||||||
sess.Close()
|
|
||||||
Eventually(done).Should(BeClosed())
|
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -1422,7 +1369,7 @@ var _ = Describe("Session", func() {
|
||||||
}()
|
}()
|
||||||
Consistently(sess.Context().Done()).ShouldNot(BeClosed())
|
Consistently(sess.Context().Done()).ShouldNot(BeClosed())
|
||||||
// make the go routine return
|
// make the go routine return
|
||||||
sessionRunner.EXPECT().Retire(gomock.Any())
|
expectReplaceWithClosed()
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
sess.Close()
|
sess.Close()
|
||||||
Eventually(sess.Context().Done()).Should(BeClosed())
|
Eventually(sess.Context().Done()).Should(BeClosed())
|
||||||
|
@ -1461,7 +1408,7 @@ var _ = Describe("Session", func() {
|
||||||
Consistently(sess.Context().Done()).ShouldNot(BeClosed())
|
Consistently(sess.Context().Done()).ShouldNot(BeClosed())
|
||||||
// make the go routine return
|
// make the go routine return
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
||||||
sessionRunner.EXPECT().Retire(gomock.Any())
|
expectReplaceWithClosed()
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
sess.Close()
|
sess.Close()
|
||||||
Eventually(sess.Context().Done()).Should(BeClosed())
|
Eventually(sess.Context().Done()).Should(BeClosed())
|
||||||
|
@ -1563,6 +1510,13 @@ var _ = Describe("Client Session", func() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
expectReplaceWithClosed := func() {
|
||||||
|
sessionRunner.EXPECT().ReplaceWithClosed(sess.srcConnID, gomock.Any()).Do(func(_ protocol.ConnectionID, s packetHandler) {
|
||||||
|
Expect(s.Close()).To(Succeed())
|
||||||
|
Eventually(areClosedSessionsRunning).Should(BeFalse())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
BeforeEach(func() {
|
BeforeEach(func() {
|
||||||
quicConf = populateClientConfig(&Config{}, true)
|
quicConf = populateClientConfig(&Config{}, true)
|
||||||
})
|
})
|
||||||
|
@ -1625,7 +1579,7 @@ var _ = Describe("Client Session", func() {
|
||||||
}, []byte{0}))).To(BeTrue())
|
}, []byte{0}))).To(BeTrue())
|
||||||
// make sure the go routine returns
|
// make sure the go routine returns
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
||||||
sessionRunner.EXPECT().Retire(gomock.Any())
|
expectReplaceWithClosed()
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
Expect(sess.Close()).To(Succeed())
|
Expect(sess.Close()).To(Succeed())
|
||||||
Eventually(sess.Context().Done()).Should(BeClosed())
|
Eventually(sess.Context().Done()).Should(BeClosed())
|
||||||
|
@ -1707,8 +1661,7 @@ var _ = Describe("Client Session", func() {
|
||||||
Expect(err).To(HaveOccurred())
|
Expect(err).To(HaveOccurred())
|
||||||
Expect(err.Error()).To(ContainSubstring("transport parameter"))
|
Expect(err.Error()).To(ContainSubstring("transport parameter"))
|
||||||
}()
|
}()
|
||||||
// streamManager.EXPECT().CloseWithError(gomock.Any())
|
expectReplaceWithClosed()
|
||||||
sessionRunner.EXPECT().Retire(gomock.Any())
|
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
sess.processTransportParameters([]byte("invalid"))
|
sess.processTransportParameters([]byte("invalid"))
|
||||||
|
@ -1803,7 +1756,6 @@ var _ = Describe("Client Session", func() {
|
||||||
// Illustrates that an injected Initial with an ACK frame for an unsent packet causes
|
// Illustrates that an injected Initial with an ACK frame for an unsent packet causes
|
||||||
// the connection to immediately break down
|
// the connection to immediately break down
|
||||||
It("fails on Initial-level ACK for unsent packet", func() {
|
It("fails on Initial-level ACK for unsent packet", func() {
|
||||||
sessionRunner.EXPECT().Retire(gomock.Any())
|
|
||||||
ackFrame := testutils.ComposeAckFrame(0, 0)
|
ackFrame := testutils.ComposeAckFrame(0, 0)
|
||||||
initialPacket := testutils.ComposeInitialPacket(sess.destConnID, sess.srcConnID, sess.version, sess.destConnID, []wire.Frame{ackFrame})
|
initialPacket := testutils.ComposeInitialPacket(sess.destConnID, sess.srcConnID, sess.version, sess.destConnID, []wire.Frame{ackFrame})
|
||||||
Expect(sess.handlePacketImpl(wrapPacket(initialPacket))).To(BeFalse())
|
Expect(sess.handlePacketImpl(wrapPacket(initialPacket))).To(BeFalse())
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue