From 03483d5e715014e0ca30d08a43c0abedb4bf8e5e Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Tue, 24 Sep 2019 20:03:38 +0700 Subject: [PATCH] refactor how sessions are deleted Replacing sessions with different structs representing a closed session doesn't work if a session is using multiple connection IDs. --- closed_session.go | 101 ++++++++++++++-------------- closed_session_test.go | 56 +++++++++------ mock_packet_handler_manager_test.go | 12 ---- mock_session_runner_test.go | 12 ---- packet_handler_map.go | 14 +--- quic_suite_test.go | 18 +++++ server.go | 2 - session.go | 27 +++++--- session_test.go | 98 ++++++++++----------------- streams_map_incoming_bidi.go | 1 + streams_map_incoming_uni.go | 1 + streams_map_outgoing_bidi.go | 1 + streams_map_outgoing_uni.go | 1 + 13 files changed, 165 insertions(+), 179 deletions(-) diff --git a/closed_session.go b/closed_session.go index 96a22182..872e3701 100644 --- a/closed_session.go +++ b/closed_session.go @@ -3,44 +3,58 @@ package quic import ( "sync" - "github.com/lucas-clemente/quic-go/internal/protocol" "github.com/lucas-clemente/quic-go/internal/utils" ) +type closedSession interface { + destroy() +} + // A closedLocalSession is a session that we closed locally. // When receiving packets for such a session, we need to retransmit the packet containing the CONNECTION_CLOSE frame, // with an exponential backoff. -type closedLocalSession struct { - conn connection - connClosePacket []byte - +type closedBaseSession struct { closeOnce sync.Once closeChan chan struct{} // is closed when the session is closed or destroyed - receivedPackets chan *receivedPacket - counter uint64 // number of packets received + receivedPackets <-chan *receivedPacket +} - perspective protocol.Perspective +func (s *closedBaseSession) destroy() { + s.closeOnce.Do(func() { + close(s.closeChan) + }) +} + +func newClosedBaseSession(receivedPackets <-chan *receivedPacket) closedBaseSession { + return closedBaseSession{ + receivedPackets: receivedPackets, + closeChan: make(chan struct{}), + } +} + +type closedLocalSession struct { + closedBaseSession + + conn connection + connClosePacket []byte + counter uint64 // number of packets received logger utils.Logger } -var _ packetHandler = &closedLocalSession{} - // newClosedLocalSession creates a new closedLocalSession and runs it. func newClosedLocalSession( conn connection, + receivedPackets <-chan *receivedPacket, connClosePacket []byte, - perspective protocol.Perspective, logger utils.Logger, -) packetHandler { +) closedSession { s := &closedLocalSession{ - conn: conn, - connClosePacket: connClosePacket, - perspective: perspective, - logger: logger, - closeChan: make(chan struct{}), - receivedPackets: make(chan *receivedPacket, 64), + closedBaseSession: newClosedBaseSession(receivedPackets), + conn: conn, + connClosePacket: connClosePacket, + logger: logger, } go s.run() return s @@ -50,21 +64,14 @@ func (s *closedLocalSession) run() { for { select { case p := <-s.receivedPackets: - s.handlePacketImpl(p) + s.handlePacket(p) case <-s.closeChan: return } } } -func (s *closedLocalSession) handlePacket(p *receivedPacket) { - select { - case s.receivedPackets <- p: - default: - } -} - -func (s *closedLocalSession) handlePacketImpl(_ *receivedPacket) { +func (s *closedLocalSession) handlePacket(_ *receivedPacket) { s.counter++ // exponential backoff // only send a CONNECTION_CLOSE for the 1st, 2nd, 4th, 8th, 16th, ... packet arriving @@ -79,35 +86,29 @@ func (s *closedLocalSession) handlePacketImpl(_ *receivedPacket) { } } -func (s *closedLocalSession) Close() error { - s.destroy(nil) - return nil -} - -func (s *closedLocalSession) destroy(error) { - s.closeOnce.Do(func() { - close(s.closeChan) - }) -} - -func (s *closedLocalSession) getPerspective() protocol.Perspective { - return s.perspective -} - // A closedRemoteSession is a session that was closed remotely. // For such a session, we might receive reordered packets that were sent before the CONNECTION_CLOSE. // We can just ignore those packets. type closedRemoteSession struct { - perspective protocol.Perspective + closedBaseSession } -var _ packetHandler = &closedRemoteSession{} +var _ closedSession = &closedRemoteSession{} -func newClosedRemoteSession(pers protocol.Perspective) packetHandler { - return &closedRemoteSession{perspective: pers} +func newClosedRemoteSession(receivedPackets <-chan *receivedPacket) closedSession { + s := &closedRemoteSession{ + closedBaseSession: newClosedBaseSession(receivedPackets), + } + go s.run() + return s } -func (s *closedRemoteSession) handlePacket(*receivedPacket) {} -func (s *closedRemoteSession) Close() error { return nil } -func (s *closedRemoteSession) destroy(error) {} -func (s *closedRemoteSession) getPerspective() protocol.Perspective { return s.perspective } +func (s *closedRemoteSession) run() { + for { + select { + case <-s.receivedPackets: // discard packets + case <-s.closeChan: + return + } + } +} diff --git a/closed_session_test.go b/closed_session_test.go index 9cabfeec..674c2f18 100644 --- a/closed_session_test.go +++ b/closed_session_test.go @@ -1,40 +1,30 @@ package quic import ( - "errors" "time" - "github.com/lucas-clemente/quic-go/internal/protocol" "github.com/lucas-clemente/quic-go/internal/utils" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" ) -var _ = Describe("Closed local session", func() { +var _ = Describe("closed local session", func() { var ( - sess packetHandler - mconn *mockConnection + sess closedSession + mconn *mockConnection + receivedPackets chan *receivedPacket ) BeforeEach(func() { mconn = newMockConnection() - sess = newClosedLocalSession(mconn, []byte("close"), protocol.PerspectiveClient, utils.DefaultLogger) - }) - - AfterEach(func() { - Eventually(areClosedSessionsRunning).Should(BeFalse()) - }) - - It("tells its perspective", func() { - Expect(sess.getPerspective()).To(Equal(protocol.PerspectiveClient)) - // stop the session - Expect(sess.Close()).To(Succeed()) + receivedPackets = make(chan *receivedPacket, 10) + sess = newClosedLocalSession(mconn, receivedPackets, []byte("close"), utils.DefaultLogger) }) It("repeats the packet containing the CONNECTION_CLOSE frame", func() { for i := 1; i <= 20; i++ { - sess.handlePacket(&receivedPacket{}) + receivedPackets <- &receivedPacket{} if i == 1 || i == 2 || i == 4 || i == 8 || i == 16 { Eventually(mconn.written).Should(Receive(Equal([]byte("close")))) // receive the CONNECTION_CLOSE } else { @@ -42,12 +32,40 @@ var _ = Describe("Closed local session", func() { } } // stop the session - Expect(sess.Close()).To(Succeed()) + sess.destroy() + Eventually(areClosedSessionsRunning).Should(BeFalse()) }) It("destroys sessions", func() { Expect(areClosedSessionsRunning()).To(BeTrue()) - sess.destroy(errors.New("destroy")) + sess.destroy() + Eventually(areClosedSessionsRunning).Should(BeFalse()) + }) +}) + +var _ = Describe("closed remote session", func() { + var ( + sess closedSession + receivedPackets chan *receivedPacket + ) + + BeforeEach(func() { + receivedPackets = make(chan *receivedPacket, 10) + sess = newClosedRemoteSession(receivedPackets) + }) + + It("discards packets", func() { + for i := 0; i < 1000; i++ { + receivedPackets <- &receivedPacket{} + } + // stop the session + sess.destroy() + Eventually(areClosedSessionsRunning).Should(BeFalse()) + }) + + It("destroys sessions", func() { + Expect(areClosedSessionsRunning()).To(BeTrue()) + sess.destroy() Eventually(areClosedSessionsRunning).Should(BeFalse()) }) }) diff --git a/mock_packet_handler_manager_test.go b/mock_packet_handler_manager_test.go index 8d057df6..3793d270 100644 --- a/mock_packet_handler_manager_test.go +++ b/mock_packet_handler_manager_test.go @@ -122,18 +122,6 @@ func (mr *MockPacketHandlerManagerMockRecorder) RemoveResetToken(arg0 interface{ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveResetToken", reflect.TypeOf((*MockPacketHandlerManager)(nil).RemoveResetToken), arg0) } -// ReplaceWithClosed mocks base method -func (m *MockPacketHandlerManager) ReplaceWithClosed(arg0 protocol.ConnectionID, arg1 packetHandler) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "ReplaceWithClosed", arg0, arg1) -} - -// ReplaceWithClosed indicates an expected call of ReplaceWithClosed -func (mr *MockPacketHandlerManagerMockRecorder) ReplaceWithClosed(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReplaceWithClosed", reflect.TypeOf((*MockPacketHandlerManager)(nil).ReplaceWithClosed), arg0, arg1) -} - // Retire mocks base method func (m *MockPacketHandlerManager) Retire(arg0 protocol.ConnectionID) { m.ctrl.T.Helper() diff --git a/mock_session_runner_test.go b/mock_session_runner_test.go index d9f5d3f4..9bb54a05 100644 --- a/mock_session_runner_test.go +++ b/mock_session_runner_test.go @@ -70,18 +70,6 @@ func (mr *MockSessionRunnerMockRecorder) RemoveResetToken(arg0 interface{}) *gom return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveResetToken", reflect.TypeOf((*MockSessionRunner)(nil).RemoveResetToken), arg0) } -// ReplaceWithClosed mocks base method -func (m *MockSessionRunner) ReplaceWithClosed(arg0 protocol.ConnectionID, arg1 packetHandler) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "ReplaceWithClosed", arg0, arg1) -} - -// ReplaceWithClosed indicates an expected call of ReplaceWithClosed -func (mr *MockSessionRunnerMockRecorder) ReplaceWithClosed(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReplaceWithClosed", reflect.TypeOf((*MockSessionRunner)(nil).ReplaceWithClosed), arg0, arg1) -} - // Retire mocks base method func (m *MockSessionRunner) Retire(arg0 protocol.ConnectionID) { m.ctrl.T.Helper() diff --git a/packet_handler_map.go b/packet_handler_map.go index a64bd2ed..2ee08580 100644 --- a/packet_handler_map.go +++ b/packet_handler_map.go @@ -65,12 +65,8 @@ func newPacketHandlerMap( func (h *packetHandlerMap) Add(id protocol.ConnectionID, handler packetHandler) { h.mutex.Lock() - h.addLocked(id, handler) - h.mutex.Unlock() -} - -func (h *packetHandlerMap) addLocked(id protocol.ConnectionID, handler packetHandler) { h.handlers[string(id)] = handler + h.mutex.Unlock() } func (h *packetHandlerMap) Remove(id protocol.ConnectionID) { @@ -79,14 +75,6 @@ func (h *packetHandlerMap) Remove(id protocol.ConnectionID) { h.mutex.Unlock() } -func (h *packetHandlerMap) ReplaceWithClosed(id protocol.ConnectionID, handler packetHandler) { - h.mutex.Lock() - h.removeByConnectionIDAsString(string(id)) - h.addLocked(id, handler) - h.mutex.Unlock() - h.retireByConnectionIDAsString(string(id)) -} - func (h *packetHandlerMap) removeByConnectionIDAsString(id string) { delete(h.handlers, id) } diff --git a/quic_suite_test.go b/quic_suite_test.go index 1798fff7..c1a2f5fb 100644 --- a/quic_suite_test.go +++ b/quic_suite_test.go @@ -1,6 +1,9 @@ package quic import ( + "bytes" + "runtime/pprof" + "strings" "sync" "github.com/golang/mock/gomock" @@ -24,6 +27,21 @@ var _ = BeforeEach(func() { connMuxerOnce = *new(sync.Once) }) +func areSessionsRunning() bool { + var b bytes.Buffer + pprof.Lookup("goroutine").WriteTo(&b, 1) + return strings.Contains(b.String(), "quic-go.(*session).run") +} + +func areClosedSessionsRunning() bool { + var b bytes.Buffer + pprof.Lookup("goroutine").WriteTo(&b, 1) + return strings.Contains(b.String(), "quic-go.(*closedLocalSession).run") || + strings.Contains(b.String(), "quic-go.(*closedRemoteSession).run") +} + var _ = AfterEach(func() { mockCtrl.Finish() + Eventually(areSessionsRunning).Should(BeFalse()) + Eventually(areClosedSessionsRunning).Should(BeFalse()) }) diff --git a/server.go b/server.go index 91d375a5..1e713cd6 100644 --- a/server.go +++ b/server.go @@ -37,7 +37,6 @@ type packetHandlerManager interface { Add(protocol.ConnectionID, packetHandler) Retire(protocol.ConnectionID) Remove(protocol.ConnectionID) - ReplaceWithClosed(protocol.ConnectionID, packetHandler) AddResetToken([16]byte, packetHandler) RemoveResetToken([16]byte) GetStatelessResetToken(protocol.ConnectionID) [16]byte @@ -60,7 +59,6 @@ type quicSession interface { type sessionRunner interface { Retire(protocol.ConnectionID) Remove(protocol.ConnectionID) - ReplaceWithClosed(protocol.ConnectionID, packetHandler) AddResetToken([16]byte, packetHandler) RemoveResetToken([16]byte) } diff --git a/session.go b/session.go index e305b282..7b79d031 100644 --- a/session.go +++ b/session.go @@ -87,7 +87,7 @@ func (r *handshakeRunner) OnHandshakeComplete() { r.onHandshakeC type closeError struct { err error remote bool - sendClose bool + immediate bool } var errCloseForRecreating = errors.New("closing session in order to recreate it") @@ -131,7 +131,9 @@ type session struct { receivedPackets chan *receivedPacket sendingScheduled chan struct{} - closeOnce sync.Once + closeOnce sync.Once + closedSessionMutex sync.Mutex + closedSession closedSession // closeChan is used to notify the run loop that it should terminate closeChan chan closeError @@ -919,12 +921,17 @@ func (s *session) closeLocal(e error) { } else { s.logger.Errorf("Closing session with error: %s", e) } - s.closeChan <- closeError{err: e, sendClose: true, remote: false} + s.closeChan <- closeError{err: e, remote: false} }) } // destroy closes the session without sending the error on the wire func (s *session) destroy(e error) { + s.closedSessionMutex.Lock() + if s.closedSession != nil { + s.closedSession.destroy() + } + s.closedSessionMutex.Unlock() s.destroyImpl(e) <-s.ctx.Done() } @@ -937,7 +944,7 @@ func (s *session) destroyImpl(e error) { s.logger.Errorf("Destroying session %s with error: %s", s.destConnID, e) } s.sessionRunner.Remove(s.srcConnID) - s.closeChan <- closeError{err: e, sendClose: false, remote: false} + s.closeChan <- closeError{err: e, immediate: true, remote: false} }) } @@ -952,7 +959,6 @@ func (s *session) closeForRecreating() protocol.PacketNumber { func (s *session) closeRemote(e error) { s.closeOnce.Do(func() { s.logger.Errorf("Peer closed session with error: %s", e) - s.sessionRunner.ReplaceWithClosed(s.srcConnID, newClosedRemoteSession(s.perspective)) s.closeChan <- closeError{err: e, remote: true} }) } @@ -984,19 +990,24 @@ func (s *session) handleCloseError(closeErr closeError) { s.streamsMap.CloseWithError(quicErr) - if !closeErr.sendClose { + if closeErr.immediate { return } + s.sessionRunner.Retire(s.srcConnID) // If this is a remote close we're done here if closeErr.remote { + s.closedSessionMutex.Lock() + s.closedSession = newClosedRemoteSession(s.receivedPackets) + s.closedSessionMutex.Unlock() return } connClosePacket, err := s.sendConnectionClose(quicErr) if err != nil { s.logger.Debugf("Error sending CONNECTION_CLOSE: %s", err) } - cs := newClosedLocalSession(s.conn, connClosePacket, s.perspective, s.logger) - s.sessionRunner.ReplaceWithClosed(s.srcConnID, cs) + s.closedSessionMutex.Lock() + s.closedSession = newClosedLocalSession(s.conn, s.receivedPackets, connClosePacket, s.logger) + s.closedSessionMutex.Unlock() } func (s *session) dropEncryptionLevel(encLevel protocol.EncryptionLevel) { diff --git a/session_test.go b/session_test.go index d18cdf04..4ee5b813 100644 --- a/session_test.go +++ b/session_test.go @@ -7,8 +7,6 @@ import ( "crypto/tls" "errors" "net" - "runtime/pprof" - "strings" "time" . "github.com/onsi/ginkgo" @@ -58,18 +56,6 @@ func (m *mockConnection) LocalAddr() net.Addr { return m.localAddr } func (m *mockConnection) RemoteAddr() net.Addr { return m.remoteAddr } func (*mockConnection) Close() error { panic("not implemented") } -func areSessionsRunning() bool { - var b bytes.Buffer - pprof.Lookup("goroutine").WriteTo(&b, 1) - return strings.Contains(b.String(), "quic-go.(*session).run") -} - -func areClosedSessionsRunning() bool { - var b bytes.Buffer - pprof.Lookup("goroutine").WriteTo(&b, 1) - return strings.Contains(b.String(), "quic-go.(*closedLocalSession).run") -} - var _ = Describe("Session", func() { var ( sess *session @@ -91,17 +77,7 @@ var _ = Describe("Session", func() { } } - expectReplaceWithClosed := func() { - sessionRunner.EXPECT().ReplaceWithClosed(sess.srcConnID, gomock.Any()).Do(func(_ protocol.ConnectionID, s packetHandler) { - Expect(s).To(BeAssignableToTypeOf(&closedLocalSession{})) - Expect(s.Close()).To(Succeed()) - Eventually(areClosedSessionsRunning).Should(BeFalse()) - }) - } - BeforeEach(func() { - Eventually(areSessionsRunning).Should(BeFalse()) - sessionRunner = NewMockSessionRunner(mockCtrl) mconn = newMockConnection() tokenGenerator, err := handshake.NewTokenGenerator() @@ -131,7 +107,9 @@ var _ = Describe("Session", func() { }) AfterEach(func() { - Eventually(areSessionsRunning).Should(BeFalse()) + if sess.closedSession != nil { + sess.closedSession.destroy() + } }) Context("frame handling", func() { @@ -348,9 +326,7 @@ var _ = Describe("Session", func() { It("handles CONNECTION_CLOSE frames, with a transport error code", func() { testErr := qerr.Error(qerr.StreamLimitError, "foobar") streamManager.EXPECT().CloseWithError(testErr) - sessionRunner.EXPECT().ReplaceWithClosed(sess.srcConnID, gomock.Any()).Do(func(_ protocol.ConnectionID, s packetHandler) { - Expect(s).To(BeAssignableToTypeOf(&closedRemoteSession{})) - }) + sessionRunner.EXPECT().Retire(gomock.Any()) cryptoSetup.EXPECT().Close() go func() { @@ -369,9 +345,7 @@ var _ = Describe("Session", func() { It("handles CONNECTION_CLOSE frames, with an application error code", func() { testErr := qerr.ApplicationError(0x1337, "foobar") streamManager.EXPECT().CloseWithError(testErr) - sessionRunner.EXPECT().ReplaceWithClosed(sess.srcConnID, gomock.Any()).Do(func(_ protocol.ConnectionID, s packetHandler) { - Expect(s).To(BeAssignableToTypeOf(&closedRemoteSession{})) - }) + sessionRunner.EXPECT().Retire(gomock.Any()) cryptoSetup.EXPECT().Close() go func() { @@ -419,7 +393,7 @@ var _ = Describe("Session", func() { It("shuts down without error", func() { streamManager.EXPECT().CloseWithError(qerr.Error(qerr.NoError, "")) - expectReplaceWithClosed() + sessionRunner.EXPECT().Retire(gomock.Any()) cryptoSetup.EXPECT().Close() packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{raw: []byte("connection close")}, nil) Expect(sess.Close()).To(Succeed()) @@ -431,7 +405,7 @@ var _ = Describe("Session", func() { It("only closes once", func() { streamManager.EXPECT().CloseWithError(qerr.Error(qerr.NoError, "")) - expectReplaceWithClosed() + sessionRunner.EXPECT().Retire(gomock.Any()) cryptoSetup.EXPECT().Close() packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) Expect(sess.Close()).To(Succeed()) @@ -444,7 +418,7 @@ var _ = Describe("Session", func() { It("closes streams with proper error", func() { testErr := errors.New("test error") streamManager.EXPECT().CloseWithError(qerr.Error(0x1337, testErr.Error())) - expectReplaceWithClosed() + sessionRunner.EXPECT().Retire(gomock.Any()) cryptoSetup.EXPECT().Close() packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) sess.CloseWithError(0x1337, testErr.Error()) @@ -475,7 +449,7 @@ var _ = Describe("Session", func() { It("cancels the context when the run loop exists", func() { streamManager.EXPECT().CloseWithError(gomock.Any()) - expectReplaceWithClosed() + sessionRunner.EXPECT().Retire(gomock.Any()) cryptoSetup.EXPECT().Close() packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) returned := make(chan struct{}) @@ -571,7 +545,7 @@ var _ = Describe("Session", func() { cryptoSetup.EXPECT().RunHandshake().MaxTimes(1) sess.run() }() - expectReplaceWithClosed() + sessionRunner.EXPECT().Retire(gomock.Any()) sess.handlePacket(getPacket(&wire.ExtendedHeader{ Header: wire.Header{DestConnectionID: sess.srcConnID}, PacketNumberLen: protocol.PacketNumberLen1, @@ -596,7 +570,7 @@ var _ = Describe("Session", func() { Expect(err.(*qerr.QuicError).ErrorCode).To(Equal(qerr.ProtocolViolation)) close(done) }() - expectReplaceWithClosed() + sessionRunner.EXPECT().Retire(gomock.Any()) sess.handlePacket(getPacket(&wire.ExtendedHeader{ Header: wire.Header{DestConnectionID: sess.srcConnID}, PacketNumberLen: protocol.PacketNumberLen1, @@ -616,7 +590,7 @@ var _ = Describe("Session", func() { cryptoSetup.EXPECT().RunHandshake().MaxTimes(1) runErr <- sess.run() }() - expectReplaceWithClosed() + sessionRunner.EXPECT().Retire(gomock.Any()) sess.handlePacket(getPacket(&wire.ExtendedHeader{ Header: wire.Header{DestConnectionID: sess.srcConnID}, PacketNumberLen: protocol.PacketNumberLen1, @@ -643,7 +617,7 @@ var _ = Describe("Session", func() { Expect(err).To(MatchError("PROTOCOL_VIOLATION: empty packet")) close(done) }() - expectReplaceWithClosed() + sessionRunner.EXPECT().Retire(gomock.Any()) sess.handlePacket(getPacket(&wire.ExtendedHeader{ Header: wire.Header{DestConnectionID: sess.srcConnID}, PacketNumberLen: protocol.PacketNumberLen1, @@ -844,7 +818,7 @@ var _ = Describe("Session", func() { AfterEach(func() { streamManager.EXPECT().CloseWithError(gomock.Any()) packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) - expectReplaceWithClosed() + sessionRunner.EXPECT().Retire(gomock.Any()) cryptoSetup.EXPECT().Close() sess.Close() Eventually(sess.Context().Done()).Should(BeClosed()) @@ -946,7 +920,7 @@ var _ = Describe("Session", func() { AfterEach(func() { // make the go routine return packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) - expectReplaceWithClosed() + sessionRunner.EXPECT().Retire(gomock.Any()) cryptoSetup.EXPECT().Close() Expect(sess.Close()).To(Succeed()) Eventually(sess.Context().Done()).Should(BeClosed()) @@ -1063,7 +1037,7 @@ var _ = Describe("Session", func() { sess.scheduleSending() Eventually(mconn.written).Should(Receive()) // make the go routine return - expectReplaceWithClosed() + sessionRunner.EXPECT().Retire(gomock.Any()) streamManager.EXPECT().CloseWithError(gomock.Any()) packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) cryptoSetup.EXPECT().Close() @@ -1097,7 +1071,7 @@ var _ = Describe("Session", func() { Eventually(mconn.written).Should(Receive()) // make sure the go routine returns packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) - expectReplaceWithClosed() + sessionRunner.EXPECT().Retire(gomock.Any()) streamManager.EXPECT().CloseWithError(gomock.Any()) cryptoSetup.EXPECT().Close() sess.Close() @@ -1121,7 +1095,7 @@ var _ = Describe("Session", func() { Eventually(handshakeCtx.Done()).Should(BeClosed()) // make sure the go routine returns streamManager.EXPECT().CloseWithError(gomock.Any()) - expectReplaceWithClosed() + sessionRunner.EXPECT().Retire(gomock.Any()) packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) cryptoSetup.EXPECT().Close() Expect(sess.Close()).To(Succeed()) @@ -1131,7 +1105,7 @@ var _ = Describe("Session", func() { It("doesn't cancel the HandshakeComplete context when the handshake fails", func() { packer.EXPECT().PackPacket().AnyTimes() streamManager.EXPECT().CloseWithError(gomock.Any()) - expectReplaceWithClosed() + sessionRunner.EXPECT().Retire(gomock.Any()) packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) cryptoSetup.EXPECT().Close() go func() { @@ -1165,7 +1139,7 @@ var _ = Describe("Session", func() { Eventually(done).Should(BeClosed()) // make sure the go routine returns streamManager.EXPECT().CloseWithError(gomock.Any()) - expectReplaceWithClosed() + sessionRunner.EXPECT().Retire(gomock.Any()) packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) cryptoSetup.EXPECT().Close() Expect(sess.Close()).To(Succeed()) @@ -1181,7 +1155,7 @@ var _ = Describe("Session", func() { close(done) }() streamManager.EXPECT().CloseWithError(gomock.Any()) - expectReplaceWithClosed() + sessionRunner.EXPECT().Retire(gomock.Any()) packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) cryptoSetup.EXPECT().Close() Expect(sess.Close()).To(Succeed()) @@ -1199,7 +1173,7 @@ var _ = Describe("Session", func() { close(done) }() streamManager.EXPECT().CloseWithError(gomock.Any()) - expectReplaceWithClosed() + sessionRunner.EXPECT().Retire(gomock.Any()) packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) cryptoSetup.EXPECT().Close() Expect(sess.CloseWithError(0x1337, testErr.Error())).To(Succeed()) @@ -1216,7 +1190,7 @@ var _ = Describe("Session", func() { Expect(err.Error()).To(ContainSubstring("transport parameter")) }() streamManager.EXPECT().CloseWithError(gomock.Any()) - expectReplaceWithClosed() + sessionRunner.EXPECT().Retire(gomock.Any()) packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) cryptoSetup.EXPECT().Close() sess.processTransportParameters([]byte("invalid")) @@ -1244,7 +1218,7 @@ var _ = Describe("Session", func() { // make the go routine return streamManager.EXPECT().CloseWithError(gomock.Any()) - expectReplaceWithClosed() + sessionRunner.EXPECT().Retire(gomock.Any()) packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) cryptoSetup.EXPECT().Close() sess.Close() @@ -1263,7 +1237,7 @@ var _ = Describe("Session", func() { AfterEach(func() { // make the go routine return - expectReplaceWithClosed() + sessionRunner.EXPECT().Retire(gomock.Any()) streamManager.EXPECT().CloseWithError(gomock.Any()) packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) cryptoSetup.EXPECT().Close() @@ -1371,7 +1345,7 @@ var _ = Describe("Session", func() { }() Consistently(sess.Context().Done()).ShouldNot(BeClosed()) // make the go routine return - expectReplaceWithClosed() + sessionRunner.EXPECT().Retire(gomock.Any()) cryptoSetup.EXPECT().Close() sess.Close() Eventually(sess.Context().Done()).Should(BeClosed()) @@ -1410,7 +1384,7 @@ var _ = Describe("Session", func() { Consistently(sess.Context().Done()).ShouldNot(BeClosed()) // make the go routine return packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) - expectReplaceWithClosed() + sessionRunner.EXPECT().Retire(gomock.Any()) cryptoSetup.EXPECT().Close() sess.Close() Eventually(sess.Context().Done()).Should(BeClosed()) @@ -1512,13 +1486,6 @@ var _ = Describe("Client Session", func() { } } - expectReplaceWithClosed := func() { - sessionRunner.EXPECT().ReplaceWithClosed(sess.srcConnID, gomock.Any()).Do(func(_ protocol.ConnectionID, s packetHandler) { - Expect(s.Close()).To(Succeed()) - Eventually(areClosedSessionsRunning).Should(BeFalse()) - }) - } - BeforeEach(func() { quicConf = populateClientConfig(&Config{}, true) }) @@ -1552,6 +1519,12 @@ var _ = Describe("Client Session", func() { sess.cryptoStreamHandler = cryptoSetup }) + AfterEach(func() { + if sess.closedSession != nil { + sess.closedSession.destroy() + } + }) + It("changes the connection ID when receiving the first packet from the server", func() { unpacker := NewMockUnpacker(mockCtrl) unpacker.EXPECT().Unpack(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(hdr *wire.Header, _ time.Time, data []byte) (*unpackedPacket, error) { @@ -1581,7 +1554,7 @@ var _ = Describe("Client Session", func() { }, []byte{0}))).To(BeTrue()) // make sure the go routine returns packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) - expectReplaceWithClosed() + sessionRunner.EXPECT().Retire(gomock.Any()) cryptoSetup.EXPECT().Close() Expect(sess.Close()).To(Succeed()) Eventually(sess.Context().Done()).Should(BeClosed()) @@ -1663,7 +1636,7 @@ var _ = Describe("Client Session", func() { Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("transport parameter")) }() - expectReplaceWithClosed() + sessionRunner.EXPECT().Retire(gomock.Any()) packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) cryptoSetup.EXPECT().Close() sess.processTransportParameters([]byte("invalid")) @@ -1766,7 +1739,6 @@ var _ = Describe("Client Session", func() { // Illustrates that an injected Initial with a CONNECTION_CLOSE frame causes // the connection to immediately break down It("fails on Initial-level CONNECTION_CLOSE frame", func() { - sessionRunner.EXPECT().ReplaceWithClosed(gomock.Any(), gomock.Any()) connCloseFrame := testutils.ComposeConnCloseFrame() initialPacket := testutils.ComposeInitialPacket(sess.destConnID, sess.srcConnID, sess.version, sess.destConnID, []wire.Frame{connCloseFrame}) Expect(sess.handlePacketImpl(wrapPacket(initialPacket))).To(BeTrue()) diff --git a/streams_map_incoming_bidi.go b/streams_map_incoming_bidi.go index f24b9ec2..84f35581 100644 --- a/streams_map_incoming_bidi.go +++ b/streams_map_incoming_bidi.go @@ -2,6 +2,7 @@ // Any changes will be lost if this file is regenerated. // see https://github.com/cheekybits/genny +//nolint:unused package quic import ( diff --git a/streams_map_incoming_uni.go b/streams_map_incoming_uni.go index c146f3ab..1d7f501f 100644 --- a/streams_map_incoming_uni.go +++ b/streams_map_incoming_uni.go @@ -2,6 +2,7 @@ // Any changes will be lost if this file is regenerated. // see https://github.com/cheekybits/genny +//nolint:unused package quic import ( diff --git a/streams_map_outgoing_bidi.go b/streams_map_outgoing_bidi.go index 5c0ff71f..5178dca0 100644 --- a/streams_map_outgoing_bidi.go +++ b/streams_map_outgoing_bidi.go @@ -2,6 +2,7 @@ // Any changes will be lost if this file is regenerated. // see https://github.com/cheekybits/genny +//nolint:unused package quic import ( diff --git a/streams_map_outgoing_uni.go b/streams_map_outgoing_uni.go index 986cb8a9..c020fa1c 100644 --- a/streams_map_outgoing_uni.go +++ b/streams_map_outgoing_uni.go @@ -2,6 +2,7 @@ // Any changes will be lost if this file is regenerated. // see https://github.com/cheekybits/genny +//nolint:unused package quic import (