diff --git a/send_queue.go b/send_queue.go new file mode 100644 index 00000000..957296aa --- /dev/null +++ b/send_queue.go @@ -0,0 +1,39 @@ +package quic + +type sendQueue struct { + queue chan *packedPacket + closeChan chan struct{} + conn connection +} + +func newSendQueue(conn connection) *sendQueue { + s := &sendQueue{ + conn: conn, + closeChan: make(chan struct{}), + queue: make(chan *packedPacket, 1), + } + return s +} + +func (h *sendQueue) Send(p *packedPacket) { + h.queue <- p +} + +func (h *sendQueue) Run() error { + var p *packedPacket + for { + select { + case <-h.closeChan: + return nil + case p = <-h.queue: + } + if err := h.conn.Write(p.raw); err != nil { + return err + } + p.buffer.Release() + } +} + +func (h *sendQueue) Close() { + close(h.closeChan) +} diff --git a/send_queue_test.go b/send_queue_test.go new file mode 100644 index 00000000..91d49cb3 --- /dev/null +++ b/send_queue_test.go @@ -0,0 +1,66 @@ +package quic + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("Send Queue", func() { + var q *sendQueue + var c *mockConnection + + BeforeEach(func() { + c = newMockConnection() + q = newSendQueue(c) + }) + + getPacket := func(b []byte) *packedPacket { + buf := getPacketBuffer() + buf.Slice = buf.Slice[:len(b)] + copy(buf.Slice, b) + return &packedPacket{ + buffer: buf, + raw: buf.Slice, + } + } + + It("sends a packet", func() { + q.Send(getPacket([]byte("foobar"))) + + done := make(chan struct{}) + go func() { + defer GinkgoRecover() + q.Run() + close(done) + }() + + Eventually(c.written).Should(Receive(Equal([]byte("foobar")))) + q.Close() + Eventually(done).Should(BeClosed()) + }) + + It("blocks sending when too many packets are queued", func() { + q.Send(getPacket([]byte("foobar"))) + + sent := make(chan struct{}) + go func() { + defer GinkgoRecover() + q.Send(getPacket([]byte("raboof"))) + close(sent) + }() + + Consistently(sent).ShouldNot(BeClosed()) + + done := make(chan struct{}) + go func() { + defer GinkgoRecover() + q.Run() + close(done) + }() + + Eventually(c.written).Should(Receive(Equal([]byte("foobar")))) + Eventually(c.written).Should(Receive(Equal([]byte("raboof")))) + q.Close() + Eventually(done).Should(BeClosed()) + }) +}) diff --git a/session.go b/session.go index 998e2dca..7abb9640 100644 --- a/session.go +++ b/session.go @@ -105,7 +105,8 @@ type session struct { version protocol.VersionNumber config *Config - conn connection + conn connection + sendQueue *sendQueue streamsMap streamManager @@ -337,6 +338,7 @@ var newClientSession = func( } func (s *session) preSetup() { + s.sendQueue = newSendQueue(s.conn) s.frameParser = wire.NewFrameParser(s.version) s.rttStats = &congestion.RTTStats{} s.receivedPacketHandler = ackhandler.NewReceivedPacketHandler(s.rttStats, s.logger, s.version) @@ -377,6 +379,11 @@ func (s *session) run() error { defer s.ctxCancel() go s.cryptoStreamHandler.RunHandshake() + go func() { + if err := s.sendQueue.Run(); err != nil { + s.closeLocal(err) + } + }() if s.perspective == protocol.PerspectiveClient { select { @@ -468,6 +475,7 @@ runLoop: s.closed.Set(true) s.logger.Infof("Connection %s closed.", s.srcConnID) s.cryptoStreamHandler.Close() + s.sendQueue.Close() return closeErr.err } @@ -1123,7 +1131,8 @@ func (s *session) maybeSendAckOnlyPacket() error { return nil } s.sentPacketHandler.SentPacket(packet.ToAckHandlerPacket()) - return s.sendPackedPacket(packet) + s.sendQueue.Send(packet) + return nil } // maybeSendRetransmission sends retransmissions for at most one packet. @@ -1145,9 +1154,7 @@ func (s *session) maybeSendRetransmission() (bool, error) { } s.sentPacketHandler.SentPacketsAsRetransmission(ackhandlerPackets, retransmitPacket.PacketNumber) for _, packet := range packets { - if err := s.sendPackedPacket(packet); err != nil { - return false, err - } + s.sendPackedPacket(packet) } return true, nil } @@ -1169,9 +1176,7 @@ func (s *session) sendProbePacket() error { } s.sentPacketHandler.SentPacketsAsRetransmission(ackhandlerPackets, p.PacketNumber) for _, packet := range packets { - if err := s.sendPackedPacket(packet); err != nil { - return err - } + s.sendPackedPacket(packet) } return nil } @@ -1187,14 +1192,11 @@ func (s *session) sendPacket() (bool, error) { return false, err } s.sentPacketHandler.SentPacket(packet.ToAckHandlerPacket()) - if err := s.sendPackedPacket(packet); err != nil { - return false, err - } + s.sendPackedPacket(packet) return true, nil } -func (s *session) sendPackedPacket(packet *packedPacket) error { - defer packet.buffer.Release() +func (s *session) sendPackedPacket(packet *packedPacket) { if s.firstAckElicitingPacketAfterIdleSentTime.IsZero() && packet.IsAckEliciting() { s.firstAckElicitingPacketAfterIdleSentTime = time.Now() } @@ -1210,7 +1212,7 @@ func (s *session) sendPackedPacket(packet *packedPacket) error { }) } s.logPacket(packet) - return s.conn.Write(packet.raw) + s.sendQueue.Send(packet) } func (s *session) sendConnectionClose(quicErr *qerr.QuicError) error { diff --git a/session_test.go b/session_test.go index 56d13fab..3a49241d 100644 --- a/session_test.go +++ b/session_test.go @@ -73,6 +73,17 @@ var _ = Describe("Session", func() { cryptoSetup *mocks.MockCryptoSetup ) + getPacket := func(pn protocol.PacketNumber) *packedPacket { + buffer := getPacketBuffer() + data := buffer.Slice[:0] + data = append(data, []byte("foobar")...) + return &packedPacket{ + raw: data, + buffer: buffer, + header: &wire.ExtendedHeader{PacketNumber: pn}, + } + } + BeforeEach(func() { Eventually(areSessionsRunning).Should(BeFalse()) @@ -817,16 +828,22 @@ var _ = Describe("Session", func() { }) Context("sending packets", func() { - getPacket := func(pn protocol.PacketNumber) *packedPacket { - buffer := getPacketBuffer() - data := buffer.Slice[:0] - data = append(data, []byte("foobar")...) - return &packedPacket{ - raw: data, - buffer: buffer, - header: &wire.ExtendedHeader{PacketNumber: pn}, - } - } + BeforeEach(func() { + cryptoSetup.EXPECT().RunHandshake() + go func() { + defer GinkgoRecover() + sess.run() + }() + }) + + AfterEach(func() { + streamManager.EXPECT().CloseWithError(gomock.Any()) + packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) + sessionRunner.EXPECT().Retire(gomock.Any()) + cryptoSetup.EXPECT().Close() + sess.Close() + Eventually(sess.Context().Done()).Should(BeClosed()) + }) It("sends packets", func() { packer.EXPECT().PackPacket().Return(getPacket(1), nil) @@ -834,6 +851,7 @@ var _ = Describe("Session", func() { sent, err := sess.sendPacket() Expect(err).NotTo(HaveOccurred()) Expect(sent).To(BeTrue()) + Eventually(mconn.written).Should(Receive()) }) It("doesn't send packets if there's nothing to send", func() { @@ -872,6 +890,7 @@ var _ = Describe("Session", func() { newPacket := getPacket(234) sess.windowUpdateQueue.callback(&wire.MaxDataFrame{}) sph := mockackhandler.NewMockSentPacketHandler(mockCtrl) + sph.EXPECT().GetLossDetectionTimeout().AnyTimes() sph.EXPECT().DequeuePacketForRetransmission().Return(packetToRetransmit) sph.EXPECT().SendMode().Return(ackhandler.SendRetransmission) sph.EXPECT().SendMode().Return(ackhandler.SendAny) @@ -890,6 +909,7 @@ var _ = Describe("Session", func() { ) sess.sentPacketHandler = sph Expect(sess.sendPackets()).To(Succeed()) + Eventually(mconn.written).Should(HaveLen(2)) }) It("sends multiple packets, if the retransmission is split", func() { @@ -903,6 +923,7 @@ var _ = Describe("Session", func() { } retransmissions := []*packedPacket{getPacket(1337), getPacket(1338)} sph := mockackhandler.NewMockSentPacketHandler(mockCtrl) + sph.EXPECT().GetLossDetectionTimeout().AnyTimes() sph.EXPECT().DequeuePacketForRetransmission().Return(packet) packer.EXPECT().PackRetransmission(packet).Return(retransmissions, nil) sph.EXPECT().SentPacketsAsRetransmission(gomock.Any(), protocol.PacketNumber(42)).Do(func(packets []*ackhandler.Packet, _ protocol.PacketNumber) { @@ -914,13 +935,14 @@ var _ = Describe("Session", func() { sent, err := sess.maybeSendRetransmission() Expect(err).NotTo(HaveOccurred()) Expect(sent).To(BeTrue()) - Expect(mconn.written).To(HaveLen(2)) + Eventually(mconn.written).Should(HaveLen(2)) }) It("sends a probe packet", func() { packetToRetransmit := &ackhandler.Packet{PacketNumber: 0x42} retransmittedPacket := getPacket(123) sph := mockackhandler.NewMockSentPacketHandler(mockCtrl) + sph.EXPECT().GetLossDetectionTimeout().AnyTimes() sph.EXPECT().TimeUntilSend() sph.EXPECT().SendMode().Return(ackhandler.SendPTO) sph.EXPECT().ShouldSendNumPackets().Return(1) @@ -936,214 +958,215 @@ var _ = Describe("Session", func() { It("doesn't send when the SentPacketHandler doesn't allow it", func() { sph := mockackhandler.NewMockSentPacketHandler(mockCtrl) + sph.EXPECT().GetLossDetectionTimeout().AnyTimes() sph.EXPECT().SendMode().Return(ackhandler.SendNone) sess.sentPacketHandler = sph err := sess.sendPackets() Expect(err).ToNot(HaveOccurred()) }) + }) - Context("packet pacing", func() { - var sph *mockackhandler.MockSentPacketHandler + Context("packet pacing", func() { + var sph *mockackhandler.MockSentPacketHandler - BeforeEach(func() { - sph = mockackhandler.NewMockSentPacketHandler(mockCtrl) - sph.EXPECT().GetLossDetectionTimeout().AnyTimes() - sph.EXPECT().DequeuePacketForRetransmission().AnyTimes() - sess.sentPacketHandler = sph - streamManager.EXPECT().CloseWithError(gomock.Any()) - }) - - It("sends multiple packets one by one immediately", func() { - sph.EXPECT().SentPacket(gomock.Any()).Times(2) - sph.EXPECT().ShouldSendNumPackets().Return(1).Times(2) - sph.EXPECT().TimeUntilSend().Return(time.Now()).Times(2) - sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour)) - sph.EXPECT().SendMode().Return(ackhandler.SendAny).Times(2) // allow 2 packets... - packer.EXPECT().PackPacket().Return(getPacket(10), nil) - packer.EXPECT().PackPacket().Return(getPacket(11), nil) - done := make(chan struct{}) - go func() { - defer GinkgoRecover() - cryptoSetup.EXPECT().RunHandshake().MaxTimes(1) - sess.run() - close(done) - }() - sess.scheduleSending() - Eventually(mconn.written).Should(HaveLen(2)) - Consistently(mconn.written).Should(HaveLen(2)) - // make the go routine return - packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) - sessionRunner.EXPECT().Retire(gomock.Any()) - cryptoSetup.EXPECT().Close() - sess.Close() - Eventually(done).Should(BeClosed()) - }) - - // when becoming congestion limited, at some point the SendMode will change from SendAny to SendAck - // we shouldn't send the ACK in the same run - It("doesn't send an ACK right after becoming congestion limited", func() { - sph.EXPECT().SentPacket(gomock.Any()) - sph.EXPECT().ShouldSendNumPackets().Return(1000) - sph.EXPECT().TimeUntilSend().Return(time.Now()) - sph.EXPECT().SendMode().Return(ackhandler.SendAny) - sph.EXPECT().SendMode().Return(ackhandler.SendAck) - packer.EXPECT().PackPacket().Return(getPacket(100), nil) - done := make(chan struct{}) - go func() { - defer GinkgoRecover() - cryptoSetup.EXPECT().RunHandshake().MaxTimes(1) - sess.run() - close(done) - }() - sess.scheduleSending() - Eventually(mconn.written).Should(HaveLen(1)) - Consistently(mconn.written).Should(HaveLen(1)) - // make the go routine return - packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) - sessionRunner.EXPECT().Retire(gomock.Any()) - cryptoSetup.EXPECT().Close() - sess.Close() - Eventually(done).Should(BeClosed()) - }) - - It("paces packets", func() { - pacingDelay := scaleDuration(100 * time.Millisecond) - sph.EXPECT().SentPacket(gomock.Any()).Times(2) - sph.EXPECT().TimeUntilSend().Return(time.Now().Add(-time.Minute)) // send one packet immediately - sph.EXPECT().TimeUntilSend().Return(time.Now().Add(pacingDelay)) // send one - sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour)) - sph.EXPECT().ShouldSendNumPackets().Times(2).Return(1) - sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes() - packer.EXPECT().PackPacket().Return(getPacket(100), nil) - packer.EXPECT().PackPacket().Return(getPacket(101), nil) - done := make(chan struct{}) - go func() { - defer GinkgoRecover() - cryptoSetup.EXPECT().RunHandshake().MaxTimes(1) - sess.run() - close(done) - }() - sess.scheduleSending() - Eventually(mconn.written).Should(HaveLen(1)) - Consistently(mconn.written, pacingDelay/2).Should(HaveLen(1)) - Eventually(mconn.written, 2*pacingDelay).Should(HaveLen(2)) - // make the go routine return - packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) - sessionRunner.EXPECT().Retire(gomock.Any()) - cryptoSetup.EXPECT().Close() - sess.Close() - Eventually(done).Should(BeClosed()) - }) - - It("sends multiple packets at once", func() { - sph.EXPECT().SentPacket(gomock.Any()).Times(3) - sph.EXPECT().ShouldSendNumPackets().Return(3) - sph.EXPECT().TimeUntilSend().Return(time.Now()) - sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour)) - sph.EXPECT().SendMode().Return(ackhandler.SendAny).Times(3) - packer.EXPECT().PackPacket().Return(getPacket(1000), nil) - packer.EXPECT().PackPacket().Return(getPacket(1001), nil) - packer.EXPECT().PackPacket().Return(getPacket(1002), nil) - done := make(chan struct{}) - go func() { - defer GinkgoRecover() - cryptoSetup.EXPECT().RunHandshake().MaxTimes(1) - sess.run() - close(done) - }() - sess.scheduleSending() - Eventually(mconn.written).Should(HaveLen(3)) - // make the go routine return - packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) - sessionRunner.EXPECT().Retire(gomock.Any()) - cryptoSetup.EXPECT().Close() - sess.Close() - Eventually(done).Should(BeClosed()) - }) - - It("doesn't set a pacing timer when there is no data to send", func() { - sph.EXPECT().TimeUntilSend().Return(time.Now()) - sph.EXPECT().ShouldSendNumPackets().Return(1) - sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes() - packer.EXPECT().PackPacket() - done := make(chan struct{}) - go func() { - defer GinkgoRecover() - cryptoSetup.EXPECT().RunHandshake().MaxTimes(1) - sess.run() - close(done) - }() - sess.scheduleSending() // no packet will get sent - Consistently(mconn.written).ShouldNot(Receive()) - // make the go routine return - sessionRunner.EXPECT().Retire(gomock.Any()) - packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) - cryptoSetup.EXPECT().Close() - sess.Close() - Eventually(done).Should(BeClosed()) - }) + BeforeEach(func() { + sph = mockackhandler.NewMockSentPacketHandler(mockCtrl) + sph.EXPECT().GetLossDetectionTimeout().AnyTimes() + sph.EXPECT().DequeuePacketForRetransmission().AnyTimes() + sess.sentPacketHandler = sph + streamManager.EXPECT().CloseWithError(gomock.Any()) }) - Context("scheduling sending", func() { - It("sends when scheduleSending is called", func() { - sph := mockackhandler.NewMockSentPacketHandler(mockCtrl) - sph.EXPECT().GetLossDetectionTimeout().AnyTimes() - sph.EXPECT().TimeUntilSend().AnyTimes() - sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes() - sph.EXPECT().ShouldSendNumPackets().AnyTimes().Return(1) - sph.EXPECT().SentPacket(gomock.Any()) - sess.sentPacketHandler = sph - packer.EXPECT().PackPacket().Return(getPacket(1), nil) + It("sends multiple packets one by one immediately", func() { + sph.EXPECT().SentPacket(gomock.Any()).Times(2) + sph.EXPECT().ShouldSendNumPackets().Return(1).Times(2) + sph.EXPECT().TimeUntilSend().Return(time.Now()).Times(2) + sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour)) + sph.EXPECT().SendMode().Return(ackhandler.SendAny).Times(2) // allow 2 packets... + packer.EXPECT().PackPacket().Return(getPacket(10), nil) + packer.EXPECT().PackPacket().Return(getPacket(11), nil) + done := make(chan struct{}) + go func() { + defer GinkgoRecover() + cryptoSetup.EXPECT().RunHandshake().MaxTimes(1) + sess.run() + close(done) + }() + sess.scheduleSending() + Eventually(mconn.written).Should(HaveLen(2)) + Consistently(mconn.written).Should(HaveLen(2)) + // make the go routine return + packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) + sessionRunner.EXPECT().Retire(gomock.Any()) + cryptoSetup.EXPECT().Close() + sess.Close() + Eventually(done).Should(BeClosed()) + }) - go func() { - defer GinkgoRecover() - cryptoSetup.EXPECT().RunHandshake().MaxTimes(1) - sess.run() - }() - Consistently(mconn.written).ShouldNot(Receive()) - sess.scheduleSending() - Eventually(mconn.written).Should(Receive()) - // make the go routine return - sessionRunner.EXPECT().Retire(gomock.Any()) - streamManager.EXPECT().CloseWithError(gomock.Any()) - packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) - cryptoSetup.EXPECT().Close() - sess.Close() - Eventually(sess.Context().Done()).Should(BeClosed()) + // when becoming congestion limited, at some point the SendMode will change from SendAny to SendAck + // we shouldn't send the ACK in the same run + It("doesn't send an ACK right after becoming congestion limited", func() { + sph.EXPECT().SentPacket(gomock.Any()) + sph.EXPECT().ShouldSendNumPackets().Return(1000) + sph.EXPECT().TimeUntilSend().Return(time.Now()) + sph.EXPECT().SendMode().Return(ackhandler.SendAny) + sph.EXPECT().SendMode().Return(ackhandler.SendAck) + packer.EXPECT().PackPacket().Return(getPacket(100), nil) + done := make(chan struct{}) + go func() { + defer GinkgoRecover() + cryptoSetup.EXPECT().RunHandshake().MaxTimes(1) + sess.run() + close(done) + }() + sess.scheduleSending() + Eventually(mconn.written).Should(HaveLen(1)) + Consistently(mconn.written).Should(HaveLen(1)) + // make the go routine return + packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) + sessionRunner.EXPECT().Retire(gomock.Any()) + cryptoSetup.EXPECT().Close() + sess.Close() + Eventually(done).Should(BeClosed()) + }) + + It("paces packets", func() { + pacingDelay := scaleDuration(100 * time.Millisecond) + sph.EXPECT().SentPacket(gomock.Any()).Times(2) + sph.EXPECT().TimeUntilSend().Return(time.Now().Add(-time.Minute)) // send one packet immediately + sph.EXPECT().TimeUntilSend().Return(time.Now().Add(pacingDelay)) // send one + sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour)) + sph.EXPECT().ShouldSendNumPackets().Times(2).Return(1) + sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes() + packer.EXPECT().PackPacket().Return(getPacket(100), nil) + packer.EXPECT().PackPacket().Return(getPacket(101), nil) + done := make(chan struct{}) + go func() { + defer GinkgoRecover() + cryptoSetup.EXPECT().RunHandshake().MaxTimes(1) + sess.run() + close(done) + }() + sess.scheduleSending() + Eventually(mconn.written).Should(HaveLen(1)) + Consistently(mconn.written, pacingDelay/2).Should(HaveLen(1)) + Eventually(mconn.written, 2*pacingDelay).Should(HaveLen(2)) + // make the go routine return + packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) + sessionRunner.EXPECT().Retire(gomock.Any()) + cryptoSetup.EXPECT().Close() + sess.Close() + Eventually(done).Should(BeClosed()) + }) + + It("sends multiple packets at once", func() { + sph.EXPECT().SentPacket(gomock.Any()).Times(3) + sph.EXPECT().ShouldSendNumPackets().Return(3) + sph.EXPECT().TimeUntilSend().Return(time.Now()) + sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour)) + sph.EXPECT().SendMode().Return(ackhandler.SendAny).Times(3) + packer.EXPECT().PackPacket().Return(getPacket(1000), nil) + packer.EXPECT().PackPacket().Return(getPacket(1001), nil) + packer.EXPECT().PackPacket().Return(getPacket(1002), nil) + done := make(chan struct{}) + go func() { + defer GinkgoRecover() + cryptoSetup.EXPECT().RunHandshake().MaxTimes(1) + sess.run() + close(done) + }() + sess.scheduleSending() + Eventually(mconn.written).Should(HaveLen(3)) + // make the go routine return + packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) + sessionRunner.EXPECT().Retire(gomock.Any()) + cryptoSetup.EXPECT().Close() + sess.Close() + Eventually(done).Should(BeClosed()) + }) + + It("doesn't set a pacing timer when there is no data to send", func() { + sph.EXPECT().TimeUntilSend().Return(time.Now()) + sph.EXPECT().ShouldSendNumPackets().Return(1) + sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes() + packer.EXPECT().PackPacket() + done := make(chan struct{}) + go func() { + defer GinkgoRecover() + cryptoSetup.EXPECT().RunHandshake().MaxTimes(1) + sess.run() + close(done) + }() + sess.scheduleSending() // no packet will get sent + Consistently(mconn.written).ShouldNot(Receive()) + // make the go routine return + sessionRunner.EXPECT().Retire(gomock.Any()) + packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) + cryptoSetup.EXPECT().Close() + sess.Close() + Eventually(done).Should(BeClosed()) + }) + }) + + Context("scheduling sending", func() { + It("sends when scheduleSending is called", func() { + sph := mockackhandler.NewMockSentPacketHandler(mockCtrl) + sph.EXPECT().GetLossDetectionTimeout().AnyTimes() + sph.EXPECT().TimeUntilSend().AnyTimes() + sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes() + sph.EXPECT().ShouldSendNumPackets().AnyTimes().Return(1) + sph.EXPECT().SentPacket(gomock.Any()) + sess.sentPacketHandler = sph + packer.EXPECT().PackPacket().Return(getPacket(1), nil) + + go func() { + defer GinkgoRecover() + cryptoSetup.EXPECT().RunHandshake().MaxTimes(1) + sess.run() + }() + Consistently(mconn.written).ShouldNot(Receive()) + sess.scheduleSending() + Eventually(mconn.written).Should(Receive()) + // make the go routine return + sessionRunner.EXPECT().Retire(gomock.Any()) + streamManager.EXPECT().CloseWithError(gomock.Any()) + packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) + cryptoSetup.EXPECT().Close() + sess.Close() + Eventually(sess.Context().Done()).Should(BeClosed()) + }) + + It("sets the timer to the ack timer", func() { + packer.EXPECT().PackPacket().Return(getPacket(1234), nil) + sph := mockackhandler.NewMockSentPacketHandler(mockCtrl) + sph.EXPECT().TimeUntilSend().Return(time.Now()) + sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour)) + sph.EXPECT().GetLossDetectionTimeout().AnyTimes() + sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes() + sph.EXPECT().ShouldSendNumPackets().Return(1) + sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) { + Expect(p.PacketNumber).To(Equal(protocol.PacketNumber(1234))) }) + sess.sentPacketHandler = sph + rph := mockackhandler.NewMockReceivedPacketHandler(mockCtrl) + rph.EXPECT().GetAlarmTimeout().Return(time.Now().Add(10 * time.Millisecond)) + // make the run loop wait + rph.EXPECT().GetAlarmTimeout().Return(time.Now().Add(time.Hour)).MaxTimes(1) + sess.receivedPacketHandler = rph - It("sets the timer to the ack timer", func() { - packer.EXPECT().PackPacket().Return(getPacket(1234), nil) - sph := mockackhandler.NewMockSentPacketHandler(mockCtrl) - sph.EXPECT().TimeUntilSend().Return(time.Now()) - sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour)) - sph.EXPECT().GetLossDetectionTimeout().AnyTimes() - sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes() - sph.EXPECT().ShouldSendNumPackets().Return(1) - sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) { - Expect(p.PacketNumber).To(Equal(protocol.PacketNumber(1234))) - }) - sess.sentPacketHandler = sph - rph := mockackhandler.NewMockReceivedPacketHandler(mockCtrl) - rph.EXPECT().GetAlarmTimeout().Return(time.Now().Add(10 * time.Millisecond)) - // make the run loop wait - rph.EXPECT().GetAlarmTimeout().Return(time.Now().Add(time.Hour)).MaxTimes(1) - sess.receivedPacketHandler = rph - - go func() { - defer GinkgoRecover() - cryptoSetup.EXPECT().RunHandshake().MaxTimes(1) - sess.run() - }() - Eventually(mconn.written).Should(Receive()) - // make sure the go routine returns - packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) - sessionRunner.EXPECT().Retire(gomock.Any()) - streamManager.EXPECT().CloseWithError(gomock.Any()) - cryptoSetup.EXPECT().Close() - sess.Close() - Eventually(sess.Context().Done()).Should(BeClosed()) - }) + go func() { + defer GinkgoRecover() + cryptoSetup.EXPECT().RunHandshake().MaxTimes(1) + sess.run() + }() + Eventually(mconn.written).Should(Receive()) + // make sure the go routine returns + packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) + sessionRunner.EXPECT().Retire(gomock.Any()) + streamManager.EXPECT().CloseWithError(gomock.Any()) + cryptoSetup.EXPECT().Close() + sess.Close() + Eventually(sess.Context().Done()).Should(BeClosed()) }) })