mirror of
https://github.com/refraction-networking/uquic.git
synced 2025-04-04 04:37:36 +03:00
refactor how sessions are deleted
Replacing sessions with different structs representing a closed session doesn't work if a session is using multiple connection IDs.
This commit is contained in:
parent
9e6bff0b98
commit
03483d5e71
13 changed files with 165 additions and 179 deletions
|
@ -3,44 +3,58 @@ package quic
|
||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/lucas-clemente/quic-go/internal/protocol"
|
|
||||||
"github.com/lucas-clemente/quic-go/internal/utils"
|
"github.com/lucas-clemente/quic-go/internal/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type closedSession interface {
|
||||||
|
destroy()
|
||||||
|
}
|
||||||
|
|
||||||
// A closedLocalSession is a session that we closed locally.
|
// A closedLocalSession is a session that we closed locally.
|
||||||
// When receiving packets for such a session, we need to retransmit the packet containing the CONNECTION_CLOSE frame,
|
// When receiving packets for such a session, we need to retransmit the packet containing the CONNECTION_CLOSE frame,
|
||||||
// with an exponential backoff.
|
// with an exponential backoff.
|
||||||
type closedLocalSession struct {
|
type closedBaseSession struct {
|
||||||
conn connection
|
|
||||||
connClosePacket []byte
|
|
||||||
|
|
||||||
closeOnce sync.Once
|
closeOnce sync.Once
|
||||||
closeChan chan struct{} // is closed when the session is closed or destroyed
|
closeChan chan struct{} // is closed when the session is closed or destroyed
|
||||||
|
|
||||||
receivedPackets chan *receivedPacket
|
receivedPackets <-chan *receivedPacket
|
||||||
counter uint64 // number of packets received
|
}
|
||||||
|
|
||||||
perspective protocol.Perspective
|
func (s *closedBaseSession) destroy() {
|
||||||
|
s.closeOnce.Do(func() {
|
||||||
|
close(s.closeChan)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func newClosedBaseSession(receivedPackets <-chan *receivedPacket) closedBaseSession {
|
||||||
|
return closedBaseSession{
|
||||||
|
receivedPackets: receivedPackets,
|
||||||
|
closeChan: make(chan struct{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type closedLocalSession struct {
|
||||||
|
closedBaseSession
|
||||||
|
|
||||||
|
conn connection
|
||||||
|
connClosePacket []byte
|
||||||
|
counter uint64 // number of packets received
|
||||||
|
|
||||||
logger utils.Logger
|
logger utils.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ packetHandler = &closedLocalSession{}
|
|
||||||
|
|
||||||
// newClosedLocalSession creates a new closedLocalSession and runs it.
|
// newClosedLocalSession creates a new closedLocalSession and runs it.
|
||||||
func newClosedLocalSession(
|
func newClosedLocalSession(
|
||||||
conn connection,
|
conn connection,
|
||||||
|
receivedPackets <-chan *receivedPacket,
|
||||||
connClosePacket []byte,
|
connClosePacket []byte,
|
||||||
perspective protocol.Perspective,
|
|
||||||
logger utils.Logger,
|
logger utils.Logger,
|
||||||
) packetHandler {
|
) closedSession {
|
||||||
s := &closedLocalSession{
|
s := &closedLocalSession{
|
||||||
conn: conn,
|
closedBaseSession: newClosedBaseSession(receivedPackets),
|
||||||
connClosePacket: connClosePacket,
|
conn: conn,
|
||||||
perspective: perspective,
|
connClosePacket: connClosePacket,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
closeChan: make(chan struct{}),
|
|
||||||
receivedPackets: make(chan *receivedPacket, 64),
|
|
||||||
}
|
}
|
||||||
go s.run()
|
go s.run()
|
||||||
return s
|
return s
|
||||||
|
@ -50,21 +64,14 @@ func (s *closedLocalSession) run() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case p := <-s.receivedPackets:
|
case p := <-s.receivedPackets:
|
||||||
s.handlePacketImpl(p)
|
s.handlePacket(p)
|
||||||
case <-s.closeChan:
|
case <-s.closeChan:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *closedLocalSession) handlePacket(p *receivedPacket) {
|
func (s *closedLocalSession) handlePacket(_ *receivedPacket) {
|
||||||
select {
|
|
||||||
case s.receivedPackets <- p:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *closedLocalSession) handlePacketImpl(_ *receivedPacket) {
|
|
||||||
s.counter++
|
s.counter++
|
||||||
// exponential backoff
|
// exponential backoff
|
||||||
// only send a CONNECTION_CLOSE for the 1st, 2nd, 4th, 8th, 16th, ... packet arriving
|
// only send a CONNECTION_CLOSE for the 1st, 2nd, 4th, 8th, 16th, ... packet arriving
|
||||||
|
@ -79,35 +86,29 @@ func (s *closedLocalSession) handlePacketImpl(_ *receivedPacket) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *closedLocalSession) Close() error {
|
|
||||||
s.destroy(nil)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *closedLocalSession) destroy(error) {
|
|
||||||
s.closeOnce.Do(func() {
|
|
||||||
close(s.closeChan)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *closedLocalSession) getPerspective() protocol.Perspective {
|
|
||||||
return s.perspective
|
|
||||||
}
|
|
||||||
|
|
||||||
// A closedRemoteSession is a session that was closed remotely.
|
// A closedRemoteSession is a session that was closed remotely.
|
||||||
// For such a session, we might receive reordered packets that were sent before the CONNECTION_CLOSE.
|
// For such a session, we might receive reordered packets that were sent before the CONNECTION_CLOSE.
|
||||||
// We can just ignore those packets.
|
// We can just ignore those packets.
|
||||||
type closedRemoteSession struct {
|
type closedRemoteSession struct {
|
||||||
perspective protocol.Perspective
|
closedBaseSession
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ packetHandler = &closedRemoteSession{}
|
var _ closedSession = &closedRemoteSession{}
|
||||||
|
|
||||||
func newClosedRemoteSession(pers protocol.Perspective) packetHandler {
|
func newClosedRemoteSession(receivedPackets <-chan *receivedPacket) closedSession {
|
||||||
return &closedRemoteSession{perspective: pers}
|
s := &closedRemoteSession{
|
||||||
|
closedBaseSession: newClosedBaseSession(receivedPackets),
|
||||||
|
}
|
||||||
|
go s.run()
|
||||||
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *closedRemoteSession) handlePacket(*receivedPacket) {}
|
func (s *closedRemoteSession) run() {
|
||||||
func (s *closedRemoteSession) Close() error { return nil }
|
for {
|
||||||
func (s *closedRemoteSession) destroy(error) {}
|
select {
|
||||||
func (s *closedRemoteSession) getPerspective() protocol.Perspective { return s.perspective }
|
case <-s.receivedPackets: // discard packets
|
||||||
|
case <-s.closeChan:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -1,40 +1,30 @@
|
||||||
package quic
|
package quic
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/lucas-clemente/quic-go/internal/protocol"
|
|
||||||
"github.com/lucas-clemente/quic-go/internal/utils"
|
"github.com/lucas-clemente/quic-go/internal/utils"
|
||||||
|
|
||||||
. "github.com/onsi/ginkgo"
|
. "github.com/onsi/ginkgo"
|
||||||
. "github.com/onsi/gomega"
|
. "github.com/onsi/gomega"
|
||||||
)
|
)
|
||||||
|
|
||||||
var _ = Describe("Closed local session", func() {
|
var _ = Describe("closed local session", func() {
|
||||||
var (
|
var (
|
||||||
sess packetHandler
|
sess closedSession
|
||||||
mconn *mockConnection
|
mconn *mockConnection
|
||||||
|
receivedPackets chan *receivedPacket
|
||||||
)
|
)
|
||||||
|
|
||||||
BeforeEach(func() {
|
BeforeEach(func() {
|
||||||
mconn = newMockConnection()
|
mconn = newMockConnection()
|
||||||
sess = newClosedLocalSession(mconn, []byte("close"), protocol.PerspectiveClient, utils.DefaultLogger)
|
receivedPackets = make(chan *receivedPacket, 10)
|
||||||
})
|
sess = newClosedLocalSession(mconn, receivedPackets, []byte("close"), utils.DefaultLogger)
|
||||||
|
|
||||||
AfterEach(func() {
|
|
||||||
Eventually(areClosedSessionsRunning).Should(BeFalse())
|
|
||||||
})
|
|
||||||
|
|
||||||
It("tells its perspective", func() {
|
|
||||||
Expect(sess.getPerspective()).To(Equal(protocol.PerspectiveClient))
|
|
||||||
// stop the session
|
|
||||||
Expect(sess.Close()).To(Succeed())
|
|
||||||
})
|
})
|
||||||
|
|
||||||
It("repeats the packet containing the CONNECTION_CLOSE frame", func() {
|
It("repeats the packet containing the CONNECTION_CLOSE frame", func() {
|
||||||
for i := 1; i <= 20; i++ {
|
for i := 1; i <= 20; i++ {
|
||||||
sess.handlePacket(&receivedPacket{})
|
receivedPackets <- &receivedPacket{}
|
||||||
if i == 1 || i == 2 || i == 4 || i == 8 || i == 16 {
|
if i == 1 || i == 2 || i == 4 || i == 8 || i == 16 {
|
||||||
Eventually(mconn.written).Should(Receive(Equal([]byte("close")))) // receive the CONNECTION_CLOSE
|
Eventually(mconn.written).Should(Receive(Equal([]byte("close")))) // receive the CONNECTION_CLOSE
|
||||||
} else {
|
} else {
|
||||||
|
@ -42,12 +32,40 @@ var _ = Describe("Closed local session", func() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// stop the session
|
// stop the session
|
||||||
Expect(sess.Close()).To(Succeed())
|
sess.destroy()
|
||||||
|
Eventually(areClosedSessionsRunning).Should(BeFalse())
|
||||||
})
|
})
|
||||||
|
|
||||||
It("destroys sessions", func() {
|
It("destroys sessions", func() {
|
||||||
Expect(areClosedSessionsRunning()).To(BeTrue())
|
Expect(areClosedSessionsRunning()).To(BeTrue())
|
||||||
sess.destroy(errors.New("destroy"))
|
sess.destroy()
|
||||||
|
Eventually(areClosedSessionsRunning).Should(BeFalse())
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
var _ = Describe("closed remote session", func() {
|
||||||
|
var (
|
||||||
|
sess closedSession
|
||||||
|
receivedPackets chan *receivedPacket
|
||||||
|
)
|
||||||
|
|
||||||
|
BeforeEach(func() {
|
||||||
|
receivedPackets = make(chan *receivedPacket, 10)
|
||||||
|
sess = newClosedRemoteSession(receivedPackets)
|
||||||
|
})
|
||||||
|
|
||||||
|
It("discards packets", func() {
|
||||||
|
for i := 0; i < 1000; i++ {
|
||||||
|
receivedPackets <- &receivedPacket{}
|
||||||
|
}
|
||||||
|
// stop the session
|
||||||
|
sess.destroy()
|
||||||
|
Eventually(areClosedSessionsRunning).Should(BeFalse())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("destroys sessions", func() {
|
||||||
|
Expect(areClosedSessionsRunning()).To(BeTrue())
|
||||||
|
sess.destroy()
|
||||||
Eventually(areClosedSessionsRunning).Should(BeFalse())
|
Eventually(areClosedSessionsRunning).Should(BeFalse())
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
|
@ -122,18 +122,6 @@ func (mr *MockPacketHandlerManagerMockRecorder) RemoveResetToken(arg0 interface{
|
||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveResetToken", reflect.TypeOf((*MockPacketHandlerManager)(nil).RemoveResetToken), arg0)
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveResetToken", reflect.TypeOf((*MockPacketHandlerManager)(nil).RemoveResetToken), arg0)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReplaceWithClosed mocks base method
|
|
||||||
func (m *MockPacketHandlerManager) ReplaceWithClosed(arg0 protocol.ConnectionID, arg1 packetHandler) {
|
|
||||||
m.ctrl.T.Helper()
|
|
||||||
m.ctrl.Call(m, "ReplaceWithClosed", arg0, arg1)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ReplaceWithClosed indicates an expected call of ReplaceWithClosed
|
|
||||||
func (mr *MockPacketHandlerManagerMockRecorder) ReplaceWithClosed(arg0, arg1 interface{}) *gomock.Call {
|
|
||||||
mr.mock.ctrl.T.Helper()
|
|
||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReplaceWithClosed", reflect.TypeOf((*MockPacketHandlerManager)(nil).ReplaceWithClosed), arg0, arg1)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Retire mocks base method
|
// Retire mocks base method
|
||||||
func (m *MockPacketHandlerManager) Retire(arg0 protocol.ConnectionID) {
|
func (m *MockPacketHandlerManager) Retire(arg0 protocol.ConnectionID) {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
|
|
|
@ -70,18 +70,6 @@ func (mr *MockSessionRunnerMockRecorder) RemoveResetToken(arg0 interface{}) *gom
|
||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveResetToken", reflect.TypeOf((*MockSessionRunner)(nil).RemoveResetToken), arg0)
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveResetToken", reflect.TypeOf((*MockSessionRunner)(nil).RemoveResetToken), arg0)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReplaceWithClosed mocks base method
|
|
||||||
func (m *MockSessionRunner) ReplaceWithClosed(arg0 protocol.ConnectionID, arg1 packetHandler) {
|
|
||||||
m.ctrl.T.Helper()
|
|
||||||
m.ctrl.Call(m, "ReplaceWithClosed", arg0, arg1)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ReplaceWithClosed indicates an expected call of ReplaceWithClosed
|
|
||||||
func (mr *MockSessionRunnerMockRecorder) ReplaceWithClosed(arg0, arg1 interface{}) *gomock.Call {
|
|
||||||
mr.mock.ctrl.T.Helper()
|
|
||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReplaceWithClosed", reflect.TypeOf((*MockSessionRunner)(nil).ReplaceWithClosed), arg0, arg1)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Retire mocks base method
|
// Retire mocks base method
|
||||||
func (m *MockSessionRunner) Retire(arg0 protocol.ConnectionID) {
|
func (m *MockSessionRunner) Retire(arg0 protocol.ConnectionID) {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
|
|
|
@ -65,12 +65,8 @@ func newPacketHandlerMap(
|
||||||
|
|
||||||
func (h *packetHandlerMap) Add(id protocol.ConnectionID, handler packetHandler) {
|
func (h *packetHandlerMap) Add(id protocol.ConnectionID, handler packetHandler) {
|
||||||
h.mutex.Lock()
|
h.mutex.Lock()
|
||||||
h.addLocked(id, handler)
|
|
||||||
h.mutex.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *packetHandlerMap) addLocked(id protocol.ConnectionID, handler packetHandler) {
|
|
||||||
h.handlers[string(id)] = handler
|
h.handlers[string(id)] = handler
|
||||||
|
h.mutex.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *packetHandlerMap) Remove(id protocol.ConnectionID) {
|
func (h *packetHandlerMap) Remove(id protocol.ConnectionID) {
|
||||||
|
@ -79,14 +75,6 @@ func (h *packetHandlerMap) Remove(id protocol.ConnectionID) {
|
||||||
h.mutex.Unlock()
|
h.mutex.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *packetHandlerMap) ReplaceWithClosed(id protocol.ConnectionID, handler packetHandler) {
|
|
||||||
h.mutex.Lock()
|
|
||||||
h.removeByConnectionIDAsString(string(id))
|
|
||||||
h.addLocked(id, handler)
|
|
||||||
h.mutex.Unlock()
|
|
||||||
h.retireByConnectionIDAsString(string(id))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *packetHandlerMap) removeByConnectionIDAsString(id string) {
|
func (h *packetHandlerMap) removeByConnectionIDAsString(id string) {
|
||||||
delete(h.handlers, id)
|
delete(h.handlers, id)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,9 @@
|
||||||
package quic
|
package quic
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
|
"runtime/pprof"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/golang/mock/gomock"
|
"github.com/golang/mock/gomock"
|
||||||
|
@ -24,6 +27,21 @@ var _ = BeforeEach(func() {
|
||||||
connMuxerOnce = *new(sync.Once)
|
connMuxerOnce = *new(sync.Once)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
func areSessionsRunning() bool {
|
||||||
|
var b bytes.Buffer
|
||||||
|
pprof.Lookup("goroutine").WriteTo(&b, 1)
|
||||||
|
return strings.Contains(b.String(), "quic-go.(*session).run")
|
||||||
|
}
|
||||||
|
|
||||||
|
func areClosedSessionsRunning() bool {
|
||||||
|
var b bytes.Buffer
|
||||||
|
pprof.Lookup("goroutine").WriteTo(&b, 1)
|
||||||
|
return strings.Contains(b.String(), "quic-go.(*closedLocalSession).run") ||
|
||||||
|
strings.Contains(b.String(), "quic-go.(*closedRemoteSession).run")
|
||||||
|
}
|
||||||
|
|
||||||
var _ = AfterEach(func() {
|
var _ = AfterEach(func() {
|
||||||
mockCtrl.Finish()
|
mockCtrl.Finish()
|
||||||
|
Eventually(areSessionsRunning).Should(BeFalse())
|
||||||
|
Eventually(areClosedSessionsRunning).Should(BeFalse())
|
||||||
})
|
})
|
||||||
|
|
|
@ -37,7 +37,6 @@ type packetHandlerManager interface {
|
||||||
Add(protocol.ConnectionID, packetHandler)
|
Add(protocol.ConnectionID, packetHandler)
|
||||||
Retire(protocol.ConnectionID)
|
Retire(protocol.ConnectionID)
|
||||||
Remove(protocol.ConnectionID)
|
Remove(protocol.ConnectionID)
|
||||||
ReplaceWithClosed(protocol.ConnectionID, packetHandler)
|
|
||||||
AddResetToken([16]byte, packetHandler)
|
AddResetToken([16]byte, packetHandler)
|
||||||
RemoveResetToken([16]byte)
|
RemoveResetToken([16]byte)
|
||||||
GetStatelessResetToken(protocol.ConnectionID) [16]byte
|
GetStatelessResetToken(protocol.ConnectionID) [16]byte
|
||||||
|
@ -60,7 +59,6 @@ type quicSession interface {
|
||||||
type sessionRunner interface {
|
type sessionRunner interface {
|
||||||
Retire(protocol.ConnectionID)
|
Retire(protocol.ConnectionID)
|
||||||
Remove(protocol.ConnectionID)
|
Remove(protocol.ConnectionID)
|
||||||
ReplaceWithClosed(protocol.ConnectionID, packetHandler)
|
|
||||||
AddResetToken([16]byte, packetHandler)
|
AddResetToken([16]byte, packetHandler)
|
||||||
RemoveResetToken([16]byte)
|
RemoveResetToken([16]byte)
|
||||||
}
|
}
|
||||||
|
|
27
session.go
27
session.go
|
@ -87,7 +87,7 @@ func (r *handshakeRunner) OnHandshakeComplete() { r.onHandshakeC
|
||||||
type closeError struct {
|
type closeError struct {
|
||||||
err error
|
err error
|
||||||
remote bool
|
remote bool
|
||||||
sendClose bool
|
immediate bool
|
||||||
}
|
}
|
||||||
|
|
||||||
var errCloseForRecreating = errors.New("closing session in order to recreate it")
|
var errCloseForRecreating = errors.New("closing session in order to recreate it")
|
||||||
|
@ -131,7 +131,9 @@ type session struct {
|
||||||
receivedPackets chan *receivedPacket
|
receivedPackets chan *receivedPacket
|
||||||
sendingScheduled chan struct{}
|
sendingScheduled chan struct{}
|
||||||
|
|
||||||
closeOnce sync.Once
|
closeOnce sync.Once
|
||||||
|
closedSessionMutex sync.Mutex
|
||||||
|
closedSession closedSession
|
||||||
// closeChan is used to notify the run loop that it should terminate
|
// closeChan is used to notify the run loop that it should terminate
|
||||||
closeChan chan closeError
|
closeChan chan closeError
|
||||||
|
|
||||||
|
@ -919,12 +921,17 @@ func (s *session) closeLocal(e error) {
|
||||||
} else {
|
} else {
|
||||||
s.logger.Errorf("Closing session with error: %s", e)
|
s.logger.Errorf("Closing session with error: %s", e)
|
||||||
}
|
}
|
||||||
s.closeChan <- closeError{err: e, sendClose: true, remote: false}
|
s.closeChan <- closeError{err: e, remote: false}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// destroy closes the session without sending the error on the wire
|
// destroy closes the session without sending the error on the wire
|
||||||
func (s *session) destroy(e error) {
|
func (s *session) destroy(e error) {
|
||||||
|
s.closedSessionMutex.Lock()
|
||||||
|
if s.closedSession != nil {
|
||||||
|
s.closedSession.destroy()
|
||||||
|
}
|
||||||
|
s.closedSessionMutex.Unlock()
|
||||||
s.destroyImpl(e)
|
s.destroyImpl(e)
|
||||||
<-s.ctx.Done()
|
<-s.ctx.Done()
|
||||||
}
|
}
|
||||||
|
@ -937,7 +944,7 @@ func (s *session) destroyImpl(e error) {
|
||||||
s.logger.Errorf("Destroying session %s with error: %s", s.destConnID, e)
|
s.logger.Errorf("Destroying session %s with error: %s", s.destConnID, e)
|
||||||
}
|
}
|
||||||
s.sessionRunner.Remove(s.srcConnID)
|
s.sessionRunner.Remove(s.srcConnID)
|
||||||
s.closeChan <- closeError{err: e, sendClose: false, remote: false}
|
s.closeChan <- closeError{err: e, immediate: true, remote: false}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -952,7 +959,6 @@ func (s *session) closeForRecreating() protocol.PacketNumber {
|
||||||
func (s *session) closeRemote(e error) {
|
func (s *session) closeRemote(e error) {
|
||||||
s.closeOnce.Do(func() {
|
s.closeOnce.Do(func() {
|
||||||
s.logger.Errorf("Peer closed session with error: %s", e)
|
s.logger.Errorf("Peer closed session with error: %s", e)
|
||||||
s.sessionRunner.ReplaceWithClosed(s.srcConnID, newClosedRemoteSession(s.perspective))
|
|
||||||
s.closeChan <- closeError{err: e, remote: true}
|
s.closeChan <- closeError{err: e, remote: true}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -984,19 +990,24 @@ func (s *session) handleCloseError(closeErr closeError) {
|
||||||
|
|
||||||
s.streamsMap.CloseWithError(quicErr)
|
s.streamsMap.CloseWithError(quicErr)
|
||||||
|
|
||||||
if !closeErr.sendClose {
|
if closeErr.immediate {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
s.sessionRunner.Retire(s.srcConnID)
|
||||||
// If this is a remote close we're done here
|
// If this is a remote close we're done here
|
||||||
if closeErr.remote {
|
if closeErr.remote {
|
||||||
|
s.closedSessionMutex.Lock()
|
||||||
|
s.closedSession = newClosedRemoteSession(s.receivedPackets)
|
||||||
|
s.closedSessionMutex.Unlock()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
connClosePacket, err := s.sendConnectionClose(quicErr)
|
connClosePacket, err := s.sendConnectionClose(quicErr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.logger.Debugf("Error sending CONNECTION_CLOSE: %s", err)
|
s.logger.Debugf("Error sending CONNECTION_CLOSE: %s", err)
|
||||||
}
|
}
|
||||||
cs := newClosedLocalSession(s.conn, connClosePacket, s.perspective, s.logger)
|
s.closedSessionMutex.Lock()
|
||||||
s.sessionRunner.ReplaceWithClosed(s.srcConnID, cs)
|
s.closedSession = newClosedLocalSession(s.conn, s.receivedPackets, connClosePacket, s.logger)
|
||||||
|
s.closedSessionMutex.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *session) dropEncryptionLevel(encLevel protocol.EncryptionLevel) {
|
func (s *session) dropEncryptionLevel(encLevel protocol.EncryptionLevel) {
|
||||||
|
|
|
@ -7,8 +7,6 @@ import (
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"errors"
|
"errors"
|
||||||
"net"
|
"net"
|
||||||
"runtime/pprof"
|
|
||||||
"strings"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
. "github.com/onsi/ginkgo"
|
. "github.com/onsi/ginkgo"
|
||||||
|
@ -58,18 +56,6 @@ func (m *mockConnection) LocalAddr() net.Addr { return m.localAddr }
|
||||||
func (m *mockConnection) RemoteAddr() net.Addr { return m.remoteAddr }
|
func (m *mockConnection) RemoteAddr() net.Addr { return m.remoteAddr }
|
||||||
func (*mockConnection) Close() error { panic("not implemented") }
|
func (*mockConnection) Close() error { panic("not implemented") }
|
||||||
|
|
||||||
func areSessionsRunning() bool {
|
|
||||||
var b bytes.Buffer
|
|
||||||
pprof.Lookup("goroutine").WriteTo(&b, 1)
|
|
||||||
return strings.Contains(b.String(), "quic-go.(*session).run")
|
|
||||||
}
|
|
||||||
|
|
||||||
func areClosedSessionsRunning() bool {
|
|
||||||
var b bytes.Buffer
|
|
||||||
pprof.Lookup("goroutine").WriteTo(&b, 1)
|
|
||||||
return strings.Contains(b.String(), "quic-go.(*closedLocalSession).run")
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ = Describe("Session", func() {
|
var _ = Describe("Session", func() {
|
||||||
var (
|
var (
|
||||||
sess *session
|
sess *session
|
||||||
|
@ -91,17 +77,7 @@ var _ = Describe("Session", func() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
expectReplaceWithClosed := func() {
|
|
||||||
sessionRunner.EXPECT().ReplaceWithClosed(sess.srcConnID, gomock.Any()).Do(func(_ protocol.ConnectionID, s packetHandler) {
|
|
||||||
Expect(s).To(BeAssignableToTypeOf(&closedLocalSession{}))
|
|
||||||
Expect(s.Close()).To(Succeed())
|
|
||||||
Eventually(areClosedSessionsRunning).Should(BeFalse())
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
BeforeEach(func() {
|
BeforeEach(func() {
|
||||||
Eventually(areSessionsRunning).Should(BeFalse())
|
|
||||||
|
|
||||||
sessionRunner = NewMockSessionRunner(mockCtrl)
|
sessionRunner = NewMockSessionRunner(mockCtrl)
|
||||||
mconn = newMockConnection()
|
mconn = newMockConnection()
|
||||||
tokenGenerator, err := handshake.NewTokenGenerator()
|
tokenGenerator, err := handshake.NewTokenGenerator()
|
||||||
|
@ -131,7 +107,9 @@ var _ = Describe("Session", func() {
|
||||||
})
|
})
|
||||||
|
|
||||||
AfterEach(func() {
|
AfterEach(func() {
|
||||||
Eventually(areSessionsRunning).Should(BeFalse())
|
if sess.closedSession != nil {
|
||||||
|
sess.closedSession.destroy()
|
||||||
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
Context("frame handling", func() {
|
Context("frame handling", func() {
|
||||||
|
@ -348,9 +326,7 @@ var _ = Describe("Session", func() {
|
||||||
It("handles CONNECTION_CLOSE frames, with a transport error code", func() {
|
It("handles CONNECTION_CLOSE frames, with a transport error code", func() {
|
||||||
testErr := qerr.Error(qerr.StreamLimitError, "foobar")
|
testErr := qerr.Error(qerr.StreamLimitError, "foobar")
|
||||||
streamManager.EXPECT().CloseWithError(testErr)
|
streamManager.EXPECT().CloseWithError(testErr)
|
||||||
sessionRunner.EXPECT().ReplaceWithClosed(sess.srcConnID, gomock.Any()).Do(func(_ protocol.ConnectionID, s packetHandler) {
|
sessionRunner.EXPECT().Retire(gomock.Any())
|
||||||
Expect(s).To(BeAssignableToTypeOf(&closedRemoteSession{}))
|
|
||||||
})
|
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -369,9 +345,7 @@ var _ = Describe("Session", func() {
|
||||||
It("handles CONNECTION_CLOSE frames, with an application error code", func() {
|
It("handles CONNECTION_CLOSE frames, with an application error code", func() {
|
||||||
testErr := qerr.ApplicationError(0x1337, "foobar")
|
testErr := qerr.ApplicationError(0x1337, "foobar")
|
||||||
streamManager.EXPECT().CloseWithError(testErr)
|
streamManager.EXPECT().CloseWithError(testErr)
|
||||||
sessionRunner.EXPECT().ReplaceWithClosed(sess.srcConnID, gomock.Any()).Do(func(_ protocol.ConnectionID, s packetHandler) {
|
sessionRunner.EXPECT().Retire(gomock.Any())
|
||||||
Expect(s).To(BeAssignableToTypeOf(&closedRemoteSession{}))
|
|
||||||
})
|
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -419,7 +393,7 @@ var _ = Describe("Session", func() {
|
||||||
|
|
||||||
It("shuts down without error", func() {
|
It("shuts down without error", func() {
|
||||||
streamManager.EXPECT().CloseWithError(qerr.Error(qerr.NoError, ""))
|
streamManager.EXPECT().CloseWithError(qerr.Error(qerr.NoError, ""))
|
||||||
expectReplaceWithClosed()
|
sessionRunner.EXPECT().Retire(gomock.Any())
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{raw: []byte("connection close")}, nil)
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{raw: []byte("connection close")}, nil)
|
||||||
Expect(sess.Close()).To(Succeed())
|
Expect(sess.Close()).To(Succeed())
|
||||||
|
@ -431,7 +405,7 @@ var _ = Describe("Session", func() {
|
||||||
|
|
||||||
It("only closes once", func() {
|
It("only closes once", func() {
|
||||||
streamManager.EXPECT().CloseWithError(qerr.Error(qerr.NoError, ""))
|
streamManager.EXPECT().CloseWithError(qerr.Error(qerr.NoError, ""))
|
||||||
expectReplaceWithClosed()
|
sessionRunner.EXPECT().Retire(gomock.Any())
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
||||||
Expect(sess.Close()).To(Succeed())
|
Expect(sess.Close()).To(Succeed())
|
||||||
|
@ -444,7 +418,7 @@ var _ = Describe("Session", func() {
|
||||||
It("closes streams with proper error", func() {
|
It("closes streams with proper error", func() {
|
||||||
testErr := errors.New("test error")
|
testErr := errors.New("test error")
|
||||||
streamManager.EXPECT().CloseWithError(qerr.Error(0x1337, testErr.Error()))
|
streamManager.EXPECT().CloseWithError(qerr.Error(0x1337, testErr.Error()))
|
||||||
expectReplaceWithClosed()
|
sessionRunner.EXPECT().Retire(gomock.Any())
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
||||||
sess.CloseWithError(0x1337, testErr.Error())
|
sess.CloseWithError(0x1337, testErr.Error())
|
||||||
|
@ -475,7 +449,7 @@ var _ = Describe("Session", func() {
|
||||||
|
|
||||||
It("cancels the context when the run loop exists", func() {
|
It("cancels the context when the run loop exists", func() {
|
||||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||||
expectReplaceWithClosed()
|
sessionRunner.EXPECT().Retire(gomock.Any())
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
||||||
returned := make(chan struct{})
|
returned := make(chan struct{})
|
||||||
|
@ -571,7 +545,7 @@ var _ = Describe("Session", func() {
|
||||||
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
|
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
|
||||||
sess.run()
|
sess.run()
|
||||||
}()
|
}()
|
||||||
expectReplaceWithClosed()
|
sessionRunner.EXPECT().Retire(gomock.Any())
|
||||||
sess.handlePacket(getPacket(&wire.ExtendedHeader{
|
sess.handlePacket(getPacket(&wire.ExtendedHeader{
|
||||||
Header: wire.Header{DestConnectionID: sess.srcConnID},
|
Header: wire.Header{DestConnectionID: sess.srcConnID},
|
||||||
PacketNumberLen: protocol.PacketNumberLen1,
|
PacketNumberLen: protocol.PacketNumberLen1,
|
||||||
|
@ -596,7 +570,7 @@ var _ = Describe("Session", func() {
|
||||||
Expect(err.(*qerr.QuicError).ErrorCode).To(Equal(qerr.ProtocolViolation))
|
Expect(err.(*qerr.QuicError).ErrorCode).To(Equal(qerr.ProtocolViolation))
|
||||||
close(done)
|
close(done)
|
||||||
}()
|
}()
|
||||||
expectReplaceWithClosed()
|
sessionRunner.EXPECT().Retire(gomock.Any())
|
||||||
sess.handlePacket(getPacket(&wire.ExtendedHeader{
|
sess.handlePacket(getPacket(&wire.ExtendedHeader{
|
||||||
Header: wire.Header{DestConnectionID: sess.srcConnID},
|
Header: wire.Header{DestConnectionID: sess.srcConnID},
|
||||||
PacketNumberLen: protocol.PacketNumberLen1,
|
PacketNumberLen: protocol.PacketNumberLen1,
|
||||||
|
@ -616,7 +590,7 @@ var _ = Describe("Session", func() {
|
||||||
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
|
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
|
||||||
runErr <- sess.run()
|
runErr <- sess.run()
|
||||||
}()
|
}()
|
||||||
expectReplaceWithClosed()
|
sessionRunner.EXPECT().Retire(gomock.Any())
|
||||||
sess.handlePacket(getPacket(&wire.ExtendedHeader{
|
sess.handlePacket(getPacket(&wire.ExtendedHeader{
|
||||||
Header: wire.Header{DestConnectionID: sess.srcConnID},
|
Header: wire.Header{DestConnectionID: sess.srcConnID},
|
||||||
PacketNumberLen: protocol.PacketNumberLen1,
|
PacketNumberLen: protocol.PacketNumberLen1,
|
||||||
|
@ -643,7 +617,7 @@ var _ = Describe("Session", func() {
|
||||||
Expect(err).To(MatchError("PROTOCOL_VIOLATION: empty packet"))
|
Expect(err).To(MatchError("PROTOCOL_VIOLATION: empty packet"))
|
||||||
close(done)
|
close(done)
|
||||||
}()
|
}()
|
||||||
expectReplaceWithClosed()
|
sessionRunner.EXPECT().Retire(gomock.Any())
|
||||||
sess.handlePacket(getPacket(&wire.ExtendedHeader{
|
sess.handlePacket(getPacket(&wire.ExtendedHeader{
|
||||||
Header: wire.Header{DestConnectionID: sess.srcConnID},
|
Header: wire.Header{DestConnectionID: sess.srcConnID},
|
||||||
PacketNumberLen: protocol.PacketNumberLen1,
|
PacketNumberLen: protocol.PacketNumberLen1,
|
||||||
|
@ -844,7 +818,7 @@ var _ = Describe("Session", func() {
|
||||||
AfterEach(func() {
|
AfterEach(func() {
|
||||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
||||||
expectReplaceWithClosed()
|
sessionRunner.EXPECT().Retire(gomock.Any())
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
sess.Close()
|
sess.Close()
|
||||||
Eventually(sess.Context().Done()).Should(BeClosed())
|
Eventually(sess.Context().Done()).Should(BeClosed())
|
||||||
|
@ -946,7 +920,7 @@ var _ = Describe("Session", func() {
|
||||||
AfterEach(func() {
|
AfterEach(func() {
|
||||||
// make the go routine return
|
// make the go routine return
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
||||||
expectReplaceWithClosed()
|
sessionRunner.EXPECT().Retire(gomock.Any())
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
Expect(sess.Close()).To(Succeed())
|
Expect(sess.Close()).To(Succeed())
|
||||||
Eventually(sess.Context().Done()).Should(BeClosed())
|
Eventually(sess.Context().Done()).Should(BeClosed())
|
||||||
|
@ -1063,7 +1037,7 @@ var _ = Describe("Session", func() {
|
||||||
sess.scheduleSending()
|
sess.scheduleSending()
|
||||||
Eventually(mconn.written).Should(Receive())
|
Eventually(mconn.written).Should(Receive())
|
||||||
// make the go routine return
|
// make the go routine return
|
||||||
expectReplaceWithClosed()
|
sessionRunner.EXPECT().Retire(gomock.Any())
|
||||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
|
@ -1097,7 +1071,7 @@ var _ = Describe("Session", func() {
|
||||||
Eventually(mconn.written).Should(Receive())
|
Eventually(mconn.written).Should(Receive())
|
||||||
// make sure the go routine returns
|
// make sure the go routine returns
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
||||||
expectReplaceWithClosed()
|
sessionRunner.EXPECT().Retire(gomock.Any())
|
||||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
sess.Close()
|
sess.Close()
|
||||||
|
@ -1121,7 +1095,7 @@ var _ = Describe("Session", func() {
|
||||||
Eventually(handshakeCtx.Done()).Should(BeClosed())
|
Eventually(handshakeCtx.Done()).Should(BeClosed())
|
||||||
// make sure the go routine returns
|
// make sure the go routine returns
|
||||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||||
expectReplaceWithClosed()
|
sessionRunner.EXPECT().Retire(gomock.Any())
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
Expect(sess.Close()).To(Succeed())
|
Expect(sess.Close()).To(Succeed())
|
||||||
|
@ -1131,7 +1105,7 @@ var _ = Describe("Session", func() {
|
||||||
It("doesn't cancel the HandshakeComplete context when the handshake fails", func() {
|
It("doesn't cancel the HandshakeComplete context when the handshake fails", func() {
|
||||||
packer.EXPECT().PackPacket().AnyTimes()
|
packer.EXPECT().PackPacket().AnyTimes()
|
||||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||||
expectReplaceWithClosed()
|
sessionRunner.EXPECT().Retire(gomock.Any())
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -1165,7 +1139,7 @@ var _ = Describe("Session", func() {
|
||||||
Eventually(done).Should(BeClosed())
|
Eventually(done).Should(BeClosed())
|
||||||
// make sure the go routine returns
|
// make sure the go routine returns
|
||||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||||
expectReplaceWithClosed()
|
sessionRunner.EXPECT().Retire(gomock.Any())
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
Expect(sess.Close()).To(Succeed())
|
Expect(sess.Close()).To(Succeed())
|
||||||
|
@ -1181,7 +1155,7 @@ var _ = Describe("Session", func() {
|
||||||
close(done)
|
close(done)
|
||||||
}()
|
}()
|
||||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||||
expectReplaceWithClosed()
|
sessionRunner.EXPECT().Retire(gomock.Any())
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
Expect(sess.Close()).To(Succeed())
|
Expect(sess.Close()).To(Succeed())
|
||||||
|
@ -1199,7 +1173,7 @@ var _ = Describe("Session", func() {
|
||||||
close(done)
|
close(done)
|
||||||
}()
|
}()
|
||||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||||
expectReplaceWithClosed()
|
sessionRunner.EXPECT().Retire(gomock.Any())
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
Expect(sess.CloseWithError(0x1337, testErr.Error())).To(Succeed())
|
Expect(sess.CloseWithError(0x1337, testErr.Error())).To(Succeed())
|
||||||
|
@ -1216,7 +1190,7 @@ var _ = Describe("Session", func() {
|
||||||
Expect(err.Error()).To(ContainSubstring("transport parameter"))
|
Expect(err.Error()).To(ContainSubstring("transport parameter"))
|
||||||
}()
|
}()
|
||||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||||
expectReplaceWithClosed()
|
sessionRunner.EXPECT().Retire(gomock.Any())
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
sess.processTransportParameters([]byte("invalid"))
|
sess.processTransportParameters([]byte("invalid"))
|
||||||
|
@ -1244,7 +1218,7 @@ var _ = Describe("Session", func() {
|
||||||
|
|
||||||
// make the go routine return
|
// make the go routine return
|
||||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||||
expectReplaceWithClosed()
|
sessionRunner.EXPECT().Retire(gomock.Any())
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
sess.Close()
|
sess.Close()
|
||||||
|
@ -1263,7 +1237,7 @@ var _ = Describe("Session", func() {
|
||||||
|
|
||||||
AfterEach(func() {
|
AfterEach(func() {
|
||||||
// make the go routine return
|
// make the go routine return
|
||||||
expectReplaceWithClosed()
|
sessionRunner.EXPECT().Retire(gomock.Any())
|
||||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
|
@ -1371,7 +1345,7 @@ var _ = Describe("Session", func() {
|
||||||
}()
|
}()
|
||||||
Consistently(sess.Context().Done()).ShouldNot(BeClosed())
|
Consistently(sess.Context().Done()).ShouldNot(BeClosed())
|
||||||
// make the go routine return
|
// make the go routine return
|
||||||
expectReplaceWithClosed()
|
sessionRunner.EXPECT().Retire(gomock.Any())
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
sess.Close()
|
sess.Close()
|
||||||
Eventually(sess.Context().Done()).Should(BeClosed())
|
Eventually(sess.Context().Done()).Should(BeClosed())
|
||||||
|
@ -1410,7 +1384,7 @@ var _ = Describe("Session", func() {
|
||||||
Consistently(sess.Context().Done()).ShouldNot(BeClosed())
|
Consistently(sess.Context().Done()).ShouldNot(BeClosed())
|
||||||
// make the go routine return
|
// make the go routine return
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
||||||
expectReplaceWithClosed()
|
sessionRunner.EXPECT().Retire(gomock.Any())
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
sess.Close()
|
sess.Close()
|
||||||
Eventually(sess.Context().Done()).Should(BeClosed())
|
Eventually(sess.Context().Done()).Should(BeClosed())
|
||||||
|
@ -1512,13 +1486,6 @@ var _ = Describe("Client Session", func() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
expectReplaceWithClosed := func() {
|
|
||||||
sessionRunner.EXPECT().ReplaceWithClosed(sess.srcConnID, gomock.Any()).Do(func(_ protocol.ConnectionID, s packetHandler) {
|
|
||||||
Expect(s.Close()).To(Succeed())
|
|
||||||
Eventually(areClosedSessionsRunning).Should(BeFalse())
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
BeforeEach(func() {
|
BeforeEach(func() {
|
||||||
quicConf = populateClientConfig(&Config{}, true)
|
quicConf = populateClientConfig(&Config{}, true)
|
||||||
})
|
})
|
||||||
|
@ -1552,6 +1519,12 @@ var _ = Describe("Client Session", func() {
|
||||||
sess.cryptoStreamHandler = cryptoSetup
|
sess.cryptoStreamHandler = cryptoSetup
|
||||||
})
|
})
|
||||||
|
|
||||||
|
AfterEach(func() {
|
||||||
|
if sess.closedSession != nil {
|
||||||
|
sess.closedSession.destroy()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
It("changes the connection ID when receiving the first packet from the server", func() {
|
It("changes the connection ID when receiving the first packet from the server", func() {
|
||||||
unpacker := NewMockUnpacker(mockCtrl)
|
unpacker := NewMockUnpacker(mockCtrl)
|
||||||
unpacker.EXPECT().Unpack(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(hdr *wire.Header, _ time.Time, data []byte) (*unpackedPacket, error) {
|
unpacker.EXPECT().Unpack(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(hdr *wire.Header, _ time.Time, data []byte) (*unpackedPacket, error) {
|
||||||
|
@ -1581,7 +1554,7 @@ var _ = Describe("Client Session", func() {
|
||||||
}, []byte{0}))).To(BeTrue())
|
}, []byte{0}))).To(BeTrue())
|
||||||
// make sure the go routine returns
|
// make sure the go routine returns
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
||||||
expectReplaceWithClosed()
|
sessionRunner.EXPECT().Retire(gomock.Any())
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
Expect(sess.Close()).To(Succeed())
|
Expect(sess.Close()).To(Succeed())
|
||||||
Eventually(sess.Context().Done()).Should(BeClosed())
|
Eventually(sess.Context().Done()).Should(BeClosed())
|
||||||
|
@ -1663,7 +1636,7 @@ var _ = Describe("Client Session", func() {
|
||||||
Expect(err).To(HaveOccurred())
|
Expect(err).To(HaveOccurred())
|
||||||
Expect(err.Error()).To(ContainSubstring("transport parameter"))
|
Expect(err.Error()).To(ContainSubstring("transport parameter"))
|
||||||
}()
|
}()
|
||||||
expectReplaceWithClosed()
|
sessionRunner.EXPECT().Retire(gomock.Any())
|
||||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
sess.processTransportParameters([]byte("invalid"))
|
sess.processTransportParameters([]byte("invalid"))
|
||||||
|
@ -1766,7 +1739,6 @@ var _ = Describe("Client Session", func() {
|
||||||
// Illustrates that an injected Initial with a CONNECTION_CLOSE frame causes
|
// Illustrates that an injected Initial with a CONNECTION_CLOSE frame causes
|
||||||
// the connection to immediately break down
|
// the connection to immediately break down
|
||||||
It("fails on Initial-level CONNECTION_CLOSE frame", func() {
|
It("fails on Initial-level CONNECTION_CLOSE frame", func() {
|
||||||
sessionRunner.EXPECT().ReplaceWithClosed(gomock.Any(), gomock.Any())
|
|
||||||
connCloseFrame := testutils.ComposeConnCloseFrame()
|
connCloseFrame := testutils.ComposeConnCloseFrame()
|
||||||
initialPacket := testutils.ComposeInitialPacket(sess.destConnID, sess.srcConnID, sess.version, sess.destConnID, []wire.Frame{connCloseFrame})
|
initialPacket := testutils.ComposeInitialPacket(sess.destConnID, sess.srcConnID, sess.version, sess.destConnID, []wire.Frame{connCloseFrame})
|
||||||
Expect(sess.handlePacketImpl(wrapPacket(initialPacket))).To(BeTrue())
|
Expect(sess.handlePacketImpl(wrapPacket(initialPacket))).To(BeTrue())
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
// Any changes will be lost if this file is regenerated.
|
// Any changes will be lost if this file is regenerated.
|
||||||
// see https://github.com/cheekybits/genny
|
// see https://github.com/cheekybits/genny
|
||||||
|
|
||||||
|
//nolint:unused
|
||||||
package quic
|
package quic
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
// Any changes will be lost if this file is regenerated.
|
// Any changes will be lost if this file is regenerated.
|
||||||
// see https://github.com/cheekybits/genny
|
// see https://github.com/cheekybits/genny
|
||||||
|
|
||||||
|
//nolint:unused
|
||||||
package quic
|
package quic
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
// Any changes will be lost if this file is regenerated.
|
// Any changes will be lost if this file is regenerated.
|
||||||
// see https://github.com/cheekybits/genny
|
// see https://github.com/cheekybits/genny
|
||||||
|
|
||||||
|
//nolint:unused
|
||||||
package quic
|
package quic
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
// Any changes will be lost if this file is regenerated.
|
// Any changes will be lost if this file is regenerated.
|
||||||
// see https://github.com/cheekybits/genny
|
// see https://github.com/cheekybits/genny
|
||||||
|
|
||||||
|
//nolint:unused
|
||||||
package quic
|
package quic
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue