implement a send queue to send packet asynchronously

This commit is contained in:
Marten Seemann 2019-08-20 12:00:05 +07:00
parent f4621e280e
commit 00c19f7241
4 changed files with 351 additions and 221 deletions

39
send_queue.go Normal file
View file

@ -0,0 +1,39 @@
package quic
type sendQueue struct {
queue chan *packedPacket
closeChan chan struct{}
conn connection
}
func newSendQueue(conn connection) *sendQueue {
s := &sendQueue{
conn: conn,
closeChan: make(chan struct{}),
queue: make(chan *packedPacket, 1),
}
return s
}
func (h *sendQueue) Send(p *packedPacket) {
h.queue <- p
}
func (h *sendQueue) Run() error {
var p *packedPacket
for {
select {
case <-h.closeChan:
return nil
case p = <-h.queue:
}
if err := h.conn.Write(p.raw); err != nil {
return err
}
p.buffer.Release()
}
}
func (h *sendQueue) Close() {
close(h.closeChan)
}

66
send_queue_test.go Normal file
View file

@ -0,0 +1,66 @@
package quic
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
var _ = Describe("Send Queue", func() {
var q *sendQueue
var c *mockConnection
BeforeEach(func() {
c = newMockConnection()
q = newSendQueue(c)
})
getPacket := func(b []byte) *packedPacket {
buf := getPacketBuffer()
buf.Slice = buf.Slice[:len(b)]
copy(buf.Slice, b)
return &packedPacket{
buffer: buf,
raw: buf.Slice,
}
}
It("sends a packet", func() {
q.Send(getPacket([]byte("foobar")))
done := make(chan struct{})
go func() {
defer GinkgoRecover()
q.Run()
close(done)
}()
Eventually(c.written).Should(Receive(Equal([]byte("foobar"))))
q.Close()
Eventually(done).Should(BeClosed())
})
It("blocks sending when too many packets are queued", func() {
q.Send(getPacket([]byte("foobar")))
sent := make(chan struct{})
go func() {
defer GinkgoRecover()
q.Send(getPacket([]byte("raboof")))
close(sent)
}()
Consistently(sent).ShouldNot(BeClosed())
done := make(chan struct{})
go func() {
defer GinkgoRecover()
q.Run()
close(done)
}()
Eventually(c.written).Should(Receive(Equal([]byte("foobar"))))
Eventually(c.written).Should(Receive(Equal([]byte("raboof"))))
q.Close()
Eventually(done).Should(BeClosed())
})
})

View file

@ -105,7 +105,8 @@ type session struct {
version protocol.VersionNumber version protocol.VersionNumber
config *Config config *Config
conn connection conn connection
sendQueue *sendQueue
streamsMap streamManager streamsMap streamManager
@ -337,6 +338,7 @@ var newClientSession = func(
} }
func (s *session) preSetup() { func (s *session) preSetup() {
s.sendQueue = newSendQueue(s.conn)
s.frameParser = wire.NewFrameParser(s.version) s.frameParser = wire.NewFrameParser(s.version)
s.rttStats = &congestion.RTTStats{} s.rttStats = &congestion.RTTStats{}
s.receivedPacketHandler = ackhandler.NewReceivedPacketHandler(s.rttStats, s.logger, s.version) s.receivedPacketHandler = ackhandler.NewReceivedPacketHandler(s.rttStats, s.logger, s.version)
@ -377,6 +379,11 @@ func (s *session) run() error {
defer s.ctxCancel() defer s.ctxCancel()
go s.cryptoStreamHandler.RunHandshake() go s.cryptoStreamHandler.RunHandshake()
go func() {
if err := s.sendQueue.Run(); err != nil {
s.closeLocal(err)
}
}()
if s.perspective == protocol.PerspectiveClient { if s.perspective == protocol.PerspectiveClient {
select { select {
@ -468,6 +475,7 @@ runLoop:
s.closed.Set(true) s.closed.Set(true)
s.logger.Infof("Connection %s closed.", s.srcConnID) s.logger.Infof("Connection %s closed.", s.srcConnID)
s.cryptoStreamHandler.Close() s.cryptoStreamHandler.Close()
s.sendQueue.Close()
return closeErr.err return closeErr.err
} }
@ -1123,7 +1131,8 @@ func (s *session) maybeSendAckOnlyPacket() error {
return nil return nil
} }
s.sentPacketHandler.SentPacket(packet.ToAckHandlerPacket()) s.sentPacketHandler.SentPacket(packet.ToAckHandlerPacket())
return s.sendPackedPacket(packet) s.sendQueue.Send(packet)
return nil
} }
// maybeSendRetransmission sends retransmissions for at most one packet. // maybeSendRetransmission sends retransmissions for at most one packet.
@ -1145,9 +1154,7 @@ func (s *session) maybeSendRetransmission() (bool, error) {
} }
s.sentPacketHandler.SentPacketsAsRetransmission(ackhandlerPackets, retransmitPacket.PacketNumber) s.sentPacketHandler.SentPacketsAsRetransmission(ackhandlerPackets, retransmitPacket.PacketNumber)
for _, packet := range packets { for _, packet := range packets {
if err := s.sendPackedPacket(packet); err != nil { s.sendPackedPacket(packet)
return false, err
}
} }
return true, nil return true, nil
} }
@ -1169,9 +1176,7 @@ func (s *session) sendProbePacket() error {
} }
s.sentPacketHandler.SentPacketsAsRetransmission(ackhandlerPackets, p.PacketNumber) s.sentPacketHandler.SentPacketsAsRetransmission(ackhandlerPackets, p.PacketNumber)
for _, packet := range packets { for _, packet := range packets {
if err := s.sendPackedPacket(packet); err != nil { s.sendPackedPacket(packet)
return err
}
} }
return nil return nil
} }
@ -1187,14 +1192,11 @@ func (s *session) sendPacket() (bool, error) {
return false, err return false, err
} }
s.sentPacketHandler.SentPacket(packet.ToAckHandlerPacket()) s.sentPacketHandler.SentPacket(packet.ToAckHandlerPacket())
if err := s.sendPackedPacket(packet); err != nil { s.sendPackedPacket(packet)
return false, err
}
return true, nil return true, nil
} }
func (s *session) sendPackedPacket(packet *packedPacket) error { func (s *session) sendPackedPacket(packet *packedPacket) {
defer packet.buffer.Release()
if s.firstAckElicitingPacketAfterIdleSentTime.IsZero() && packet.IsAckEliciting() { if s.firstAckElicitingPacketAfterIdleSentTime.IsZero() && packet.IsAckEliciting() {
s.firstAckElicitingPacketAfterIdleSentTime = time.Now() s.firstAckElicitingPacketAfterIdleSentTime = time.Now()
} }
@ -1210,7 +1212,7 @@ func (s *session) sendPackedPacket(packet *packedPacket) error {
}) })
} }
s.logPacket(packet) s.logPacket(packet)
return s.conn.Write(packet.raw) s.sendQueue.Send(packet)
} }
func (s *session) sendConnectionClose(quicErr *qerr.QuicError) error { func (s *session) sendConnectionClose(quicErr *qerr.QuicError) error {

View file

@ -73,6 +73,17 @@ var _ = Describe("Session", func() {
cryptoSetup *mocks.MockCryptoSetup cryptoSetup *mocks.MockCryptoSetup
) )
getPacket := func(pn protocol.PacketNumber) *packedPacket {
buffer := getPacketBuffer()
data := buffer.Slice[:0]
data = append(data, []byte("foobar")...)
return &packedPacket{
raw: data,
buffer: buffer,
header: &wire.ExtendedHeader{PacketNumber: pn},
}
}
BeforeEach(func() { BeforeEach(func() {
Eventually(areSessionsRunning).Should(BeFalse()) Eventually(areSessionsRunning).Should(BeFalse())
@ -817,16 +828,22 @@ var _ = Describe("Session", func() {
}) })
Context("sending packets", func() { Context("sending packets", func() {
getPacket := func(pn protocol.PacketNumber) *packedPacket { BeforeEach(func() {
buffer := getPacketBuffer() cryptoSetup.EXPECT().RunHandshake()
data := buffer.Slice[:0] go func() {
data = append(data, []byte("foobar")...) defer GinkgoRecover()
return &packedPacket{ sess.run()
raw: data, }()
buffer: buffer, })
header: &wire.ExtendedHeader{PacketNumber: pn},
} AfterEach(func() {
} streamManager.EXPECT().CloseWithError(gomock.Any())
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
sessionRunner.EXPECT().Retire(gomock.Any())
cryptoSetup.EXPECT().Close()
sess.Close()
Eventually(sess.Context().Done()).Should(BeClosed())
})
It("sends packets", func() { It("sends packets", func() {
packer.EXPECT().PackPacket().Return(getPacket(1), nil) packer.EXPECT().PackPacket().Return(getPacket(1), nil)
@ -834,6 +851,7 @@ var _ = Describe("Session", func() {
sent, err := sess.sendPacket() sent, err := sess.sendPacket()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(sent).To(BeTrue()) Expect(sent).To(BeTrue())
Eventually(mconn.written).Should(Receive())
}) })
It("doesn't send packets if there's nothing to send", func() { It("doesn't send packets if there's nothing to send", func() {
@ -872,6 +890,7 @@ var _ = Describe("Session", func() {
newPacket := getPacket(234) newPacket := getPacket(234)
sess.windowUpdateQueue.callback(&wire.MaxDataFrame{}) sess.windowUpdateQueue.callback(&wire.MaxDataFrame{})
sph := mockackhandler.NewMockSentPacketHandler(mockCtrl) sph := mockackhandler.NewMockSentPacketHandler(mockCtrl)
sph.EXPECT().GetLossDetectionTimeout().AnyTimes()
sph.EXPECT().DequeuePacketForRetransmission().Return(packetToRetransmit) sph.EXPECT().DequeuePacketForRetransmission().Return(packetToRetransmit)
sph.EXPECT().SendMode().Return(ackhandler.SendRetransmission) sph.EXPECT().SendMode().Return(ackhandler.SendRetransmission)
sph.EXPECT().SendMode().Return(ackhandler.SendAny) sph.EXPECT().SendMode().Return(ackhandler.SendAny)
@ -890,6 +909,7 @@ var _ = Describe("Session", func() {
) )
sess.sentPacketHandler = sph sess.sentPacketHandler = sph
Expect(sess.sendPackets()).To(Succeed()) Expect(sess.sendPackets()).To(Succeed())
Eventually(mconn.written).Should(HaveLen(2))
}) })
It("sends multiple packets, if the retransmission is split", func() { It("sends multiple packets, if the retransmission is split", func() {
@ -903,6 +923,7 @@ var _ = Describe("Session", func() {
} }
retransmissions := []*packedPacket{getPacket(1337), getPacket(1338)} retransmissions := []*packedPacket{getPacket(1337), getPacket(1338)}
sph := mockackhandler.NewMockSentPacketHandler(mockCtrl) sph := mockackhandler.NewMockSentPacketHandler(mockCtrl)
sph.EXPECT().GetLossDetectionTimeout().AnyTimes()
sph.EXPECT().DequeuePacketForRetransmission().Return(packet) sph.EXPECT().DequeuePacketForRetransmission().Return(packet)
packer.EXPECT().PackRetransmission(packet).Return(retransmissions, nil) packer.EXPECT().PackRetransmission(packet).Return(retransmissions, nil)
sph.EXPECT().SentPacketsAsRetransmission(gomock.Any(), protocol.PacketNumber(42)).Do(func(packets []*ackhandler.Packet, _ protocol.PacketNumber) { sph.EXPECT().SentPacketsAsRetransmission(gomock.Any(), protocol.PacketNumber(42)).Do(func(packets []*ackhandler.Packet, _ protocol.PacketNumber) {
@ -914,13 +935,14 @@ var _ = Describe("Session", func() {
sent, err := sess.maybeSendRetransmission() sent, err := sess.maybeSendRetransmission()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(sent).To(BeTrue()) Expect(sent).To(BeTrue())
Expect(mconn.written).To(HaveLen(2)) Eventually(mconn.written).Should(HaveLen(2))
}) })
It("sends a probe packet", func() { It("sends a probe packet", func() {
packetToRetransmit := &ackhandler.Packet{PacketNumber: 0x42} packetToRetransmit := &ackhandler.Packet{PacketNumber: 0x42}
retransmittedPacket := getPacket(123) retransmittedPacket := getPacket(123)
sph := mockackhandler.NewMockSentPacketHandler(mockCtrl) sph := mockackhandler.NewMockSentPacketHandler(mockCtrl)
sph.EXPECT().GetLossDetectionTimeout().AnyTimes()
sph.EXPECT().TimeUntilSend() sph.EXPECT().TimeUntilSend()
sph.EXPECT().SendMode().Return(ackhandler.SendPTO) sph.EXPECT().SendMode().Return(ackhandler.SendPTO)
sph.EXPECT().ShouldSendNumPackets().Return(1) sph.EXPECT().ShouldSendNumPackets().Return(1)
@ -936,214 +958,215 @@ var _ = Describe("Session", func() {
It("doesn't send when the SentPacketHandler doesn't allow it", func() { It("doesn't send when the SentPacketHandler doesn't allow it", func() {
sph := mockackhandler.NewMockSentPacketHandler(mockCtrl) sph := mockackhandler.NewMockSentPacketHandler(mockCtrl)
sph.EXPECT().GetLossDetectionTimeout().AnyTimes()
sph.EXPECT().SendMode().Return(ackhandler.SendNone) sph.EXPECT().SendMode().Return(ackhandler.SendNone)
sess.sentPacketHandler = sph sess.sentPacketHandler = sph
err := sess.sendPackets() err := sess.sendPackets()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
}) })
})
Context("packet pacing", func() { Context("packet pacing", func() {
var sph *mockackhandler.MockSentPacketHandler var sph *mockackhandler.MockSentPacketHandler
BeforeEach(func() { BeforeEach(func() {
sph = mockackhandler.NewMockSentPacketHandler(mockCtrl) sph = mockackhandler.NewMockSentPacketHandler(mockCtrl)
sph.EXPECT().GetLossDetectionTimeout().AnyTimes() sph.EXPECT().GetLossDetectionTimeout().AnyTimes()
sph.EXPECT().DequeuePacketForRetransmission().AnyTimes() sph.EXPECT().DequeuePacketForRetransmission().AnyTimes()
sess.sentPacketHandler = sph sess.sentPacketHandler = sph
streamManager.EXPECT().CloseWithError(gomock.Any()) streamManager.EXPECT().CloseWithError(gomock.Any())
})
It("sends multiple packets one by one immediately", func() {
sph.EXPECT().SentPacket(gomock.Any()).Times(2)
sph.EXPECT().ShouldSendNumPackets().Return(1).Times(2)
sph.EXPECT().TimeUntilSend().Return(time.Now()).Times(2)
sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour))
sph.EXPECT().SendMode().Return(ackhandler.SendAny).Times(2) // allow 2 packets...
packer.EXPECT().PackPacket().Return(getPacket(10), nil)
packer.EXPECT().PackPacket().Return(getPacket(11), nil)
done := make(chan struct{})
go func() {
defer GinkgoRecover()
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
sess.run()
close(done)
}()
sess.scheduleSending()
Eventually(mconn.written).Should(HaveLen(2))
Consistently(mconn.written).Should(HaveLen(2))
// make the go routine return
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
sessionRunner.EXPECT().Retire(gomock.Any())
cryptoSetup.EXPECT().Close()
sess.Close()
Eventually(done).Should(BeClosed())
})
// when becoming congestion limited, at some point the SendMode will change from SendAny to SendAck
// we shouldn't send the ACK in the same run
It("doesn't send an ACK right after becoming congestion limited", func() {
sph.EXPECT().SentPacket(gomock.Any())
sph.EXPECT().ShouldSendNumPackets().Return(1000)
sph.EXPECT().TimeUntilSend().Return(time.Now())
sph.EXPECT().SendMode().Return(ackhandler.SendAny)
sph.EXPECT().SendMode().Return(ackhandler.SendAck)
packer.EXPECT().PackPacket().Return(getPacket(100), nil)
done := make(chan struct{})
go func() {
defer GinkgoRecover()
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
sess.run()
close(done)
}()
sess.scheduleSending()
Eventually(mconn.written).Should(HaveLen(1))
Consistently(mconn.written).Should(HaveLen(1))
// make the go routine return
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
sessionRunner.EXPECT().Retire(gomock.Any())
cryptoSetup.EXPECT().Close()
sess.Close()
Eventually(done).Should(BeClosed())
})
It("paces packets", func() {
pacingDelay := scaleDuration(100 * time.Millisecond)
sph.EXPECT().SentPacket(gomock.Any()).Times(2)
sph.EXPECT().TimeUntilSend().Return(time.Now().Add(-time.Minute)) // send one packet immediately
sph.EXPECT().TimeUntilSend().Return(time.Now().Add(pacingDelay)) // send one
sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour))
sph.EXPECT().ShouldSendNumPackets().Times(2).Return(1)
sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes()
packer.EXPECT().PackPacket().Return(getPacket(100), nil)
packer.EXPECT().PackPacket().Return(getPacket(101), nil)
done := make(chan struct{})
go func() {
defer GinkgoRecover()
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
sess.run()
close(done)
}()
sess.scheduleSending()
Eventually(mconn.written).Should(HaveLen(1))
Consistently(mconn.written, pacingDelay/2).Should(HaveLen(1))
Eventually(mconn.written, 2*pacingDelay).Should(HaveLen(2))
// make the go routine return
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
sessionRunner.EXPECT().Retire(gomock.Any())
cryptoSetup.EXPECT().Close()
sess.Close()
Eventually(done).Should(BeClosed())
})
It("sends multiple packets at once", func() {
sph.EXPECT().SentPacket(gomock.Any()).Times(3)
sph.EXPECT().ShouldSendNumPackets().Return(3)
sph.EXPECT().TimeUntilSend().Return(time.Now())
sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour))
sph.EXPECT().SendMode().Return(ackhandler.SendAny).Times(3)
packer.EXPECT().PackPacket().Return(getPacket(1000), nil)
packer.EXPECT().PackPacket().Return(getPacket(1001), nil)
packer.EXPECT().PackPacket().Return(getPacket(1002), nil)
done := make(chan struct{})
go func() {
defer GinkgoRecover()
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
sess.run()
close(done)
}()
sess.scheduleSending()
Eventually(mconn.written).Should(HaveLen(3))
// make the go routine return
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
sessionRunner.EXPECT().Retire(gomock.Any())
cryptoSetup.EXPECT().Close()
sess.Close()
Eventually(done).Should(BeClosed())
})
It("doesn't set a pacing timer when there is no data to send", func() {
sph.EXPECT().TimeUntilSend().Return(time.Now())
sph.EXPECT().ShouldSendNumPackets().Return(1)
sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes()
packer.EXPECT().PackPacket()
done := make(chan struct{})
go func() {
defer GinkgoRecover()
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
sess.run()
close(done)
}()
sess.scheduleSending() // no packet will get sent
Consistently(mconn.written).ShouldNot(Receive())
// make the go routine return
sessionRunner.EXPECT().Retire(gomock.Any())
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
cryptoSetup.EXPECT().Close()
sess.Close()
Eventually(done).Should(BeClosed())
})
}) })
Context("scheduling sending", func() { It("sends multiple packets one by one immediately", func() {
It("sends when scheduleSending is called", func() { sph.EXPECT().SentPacket(gomock.Any()).Times(2)
sph := mockackhandler.NewMockSentPacketHandler(mockCtrl) sph.EXPECT().ShouldSendNumPackets().Return(1).Times(2)
sph.EXPECT().GetLossDetectionTimeout().AnyTimes() sph.EXPECT().TimeUntilSend().Return(time.Now()).Times(2)
sph.EXPECT().TimeUntilSend().AnyTimes() sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour))
sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes() sph.EXPECT().SendMode().Return(ackhandler.SendAny).Times(2) // allow 2 packets...
sph.EXPECT().ShouldSendNumPackets().AnyTimes().Return(1) packer.EXPECT().PackPacket().Return(getPacket(10), nil)
sph.EXPECT().SentPacket(gomock.Any()) packer.EXPECT().PackPacket().Return(getPacket(11), nil)
sess.sentPacketHandler = sph done := make(chan struct{})
packer.EXPECT().PackPacket().Return(getPacket(1), nil) go func() {
defer GinkgoRecover()
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
sess.run()
close(done)
}()
sess.scheduleSending()
Eventually(mconn.written).Should(HaveLen(2))
Consistently(mconn.written).Should(HaveLen(2))
// make the go routine return
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
sessionRunner.EXPECT().Retire(gomock.Any())
cryptoSetup.EXPECT().Close()
sess.Close()
Eventually(done).Should(BeClosed())
})
go func() { // when becoming congestion limited, at some point the SendMode will change from SendAny to SendAck
defer GinkgoRecover() // we shouldn't send the ACK in the same run
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1) It("doesn't send an ACK right after becoming congestion limited", func() {
sess.run() sph.EXPECT().SentPacket(gomock.Any())
}() sph.EXPECT().ShouldSendNumPackets().Return(1000)
Consistently(mconn.written).ShouldNot(Receive()) sph.EXPECT().TimeUntilSend().Return(time.Now())
sess.scheduleSending() sph.EXPECT().SendMode().Return(ackhandler.SendAny)
Eventually(mconn.written).Should(Receive()) sph.EXPECT().SendMode().Return(ackhandler.SendAck)
// make the go routine return packer.EXPECT().PackPacket().Return(getPacket(100), nil)
sessionRunner.EXPECT().Retire(gomock.Any()) done := make(chan struct{})
streamManager.EXPECT().CloseWithError(gomock.Any()) go func() {
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) defer GinkgoRecover()
cryptoSetup.EXPECT().Close() cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
sess.Close() sess.run()
Eventually(sess.Context().Done()).Should(BeClosed()) close(done)
}()
sess.scheduleSending()
Eventually(mconn.written).Should(HaveLen(1))
Consistently(mconn.written).Should(HaveLen(1))
// make the go routine return
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
sessionRunner.EXPECT().Retire(gomock.Any())
cryptoSetup.EXPECT().Close()
sess.Close()
Eventually(done).Should(BeClosed())
})
It("paces packets", func() {
pacingDelay := scaleDuration(100 * time.Millisecond)
sph.EXPECT().SentPacket(gomock.Any()).Times(2)
sph.EXPECT().TimeUntilSend().Return(time.Now().Add(-time.Minute)) // send one packet immediately
sph.EXPECT().TimeUntilSend().Return(time.Now().Add(pacingDelay)) // send one
sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour))
sph.EXPECT().ShouldSendNumPackets().Times(2).Return(1)
sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes()
packer.EXPECT().PackPacket().Return(getPacket(100), nil)
packer.EXPECT().PackPacket().Return(getPacket(101), nil)
done := make(chan struct{})
go func() {
defer GinkgoRecover()
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
sess.run()
close(done)
}()
sess.scheduleSending()
Eventually(mconn.written).Should(HaveLen(1))
Consistently(mconn.written, pacingDelay/2).Should(HaveLen(1))
Eventually(mconn.written, 2*pacingDelay).Should(HaveLen(2))
// make the go routine return
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
sessionRunner.EXPECT().Retire(gomock.Any())
cryptoSetup.EXPECT().Close()
sess.Close()
Eventually(done).Should(BeClosed())
})
It("sends multiple packets at once", func() {
sph.EXPECT().SentPacket(gomock.Any()).Times(3)
sph.EXPECT().ShouldSendNumPackets().Return(3)
sph.EXPECT().TimeUntilSend().Return(time.Now())
sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour))
sph.EXPECT().SendMode().Return(ackhandler.SendAny).Times(3)
packer.EXPECT().PackPacket().Return(getPacket(1000), nil)
packer.EXPECT().PackPacket().Return(getPacket(1001), nil)
packer.EXPECT().PackPacket().Return(getPacket(1002), nil)
done := make(chan struct{})
go func() {
defer GinkgoRecover()
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
sess.run()
close(done)
}()
sess.scheduleSending()
Eventually(mconn.written).Should(HaveLen(3))
// make the go routine return
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
sessionRunner.EXPECT().Retire(gomock.Any())
cryptoSetup.EXPECT().Close()
sess.Close()
Eventually(done).Should(BeClosed())
})
It("doesn't set a pacing timer when there is no data to send", func() {
sph.EXPECT().TimeUntilSend().Return(time.Now())
sph.EXPECT().ShouldSendNumPackets().Return(1)
sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes()
packer.EXPECT().PackPacket()
done := make(chan struct{})
go func() {
defer GinkgoRecover()
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
sess.run()
close(done)
}()
sess.scheduleSending() // no packet will get sent
Consistently(mconn.written).ShouldNot(Receive())
// make the go routine return
sessionRunner.EXPECT().Retire(gomock.Any())
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
cryptoSetup.EXPECT().Close()
sess.Close()
Eventually(done).Should(BeClosed())
})
})
Context("scheduling sending", func() {
It("sends when scheduleSending is called", func() {
sph := mockackhandler.NewMockSentPacketHandler(mockCtrl)
sph.EXPECT().GetLossDetectionTimeout().AnyTimes()
sph.EXPECT().TimeUntilSend().AnyTimes()
sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes()
sph.EXPECT().ShouldSendNumPackets().AnyTimes().Return(1)
sph.EXPECT().SentPacket(gomock.Any())
sess.sentPacketHandler = sph
packer.EXPECT().PackPacket().Return(getPacket(1), nil)
go func() {
defer GinkgoRecover()
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
sess.run()
}()
Consistently(mconn.written).ShouldNot(Receive())
sess.scheduleSending()
Eventually(mconn.written).Should(Receive())
// make the go routine return
sessionRunner.EXPECT().Retire(gomock.Any())
streamManager.EXPECT().CloseWithError(gomock.Any())
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
cryptoSetup.EXPECT().Close()
sess.Close()
Eventually(sess.Context().Done()).Should(BeClosed())
})
It("sets the timer to the ack timer", func() {
packer.EXPECT().PackPacket().Return(getPacket(1234), nil)
sph := mockackhandler.NewMockSentPacketHandler(mockCtrl)
sph.EXPECT().TimeUntilSend().Return(time.Now())
sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour))
sph.EXPECT().GetLossDetectionTimeout().AnyTimes()
sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes()
sph.EXPECT().ShouldSendNumPackets().Return(1)
sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) {
Expect(p.PacketNumber).To(Equal(protocol.PacketNumber(1234)))
}) })
sess.sentPacketHandler = sph
rph := mockackhandler.NewMockReceivedPacketHandler(mockCtrl)
rph.EXPECT().GetAlarmTimeout().Return(time.Now().Add(10 * time.Millisecond))
// make the run loop wait
rph.EXPECT().GetAlarmTimeout().Return(time.Now().Add(time.Hour)).MaxTimes(1)
sess.receivedPacketHandler = rph
It("sets the timer to the ack timer", func() { go func() {
packer.EXPECT().PackPacket().Return(getPacket(1234), nil) defer GinkgoRecover()
sph := mockackhandler.NewMockSentPacketHandler(mockCtrl) cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
sph.EXPECT().TimeUntilSend().Return(time.Now()) sess.run()
sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour)) }()
sph.EXPECT().GetLossDetectionTimeout().AnyTimes() Eventually(mconn.written).Should(Receive())
sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes() // make sure the go routine returns
sph.EXPECT().ShouldSendNumPackets().Return(1) packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) { sessionRunner.EXPECT().Retire(gomock.Any())
Expect(p.PacketNumber).To(Equal(protocol.PacketNumber(1234))) streamManager.EXPECT().CloseWithError(gomock.Any())
}) cryptoSetup.EXPECT().Close()
sess.sentPacketHandler = sph sess.Close()
rph := mockackhandler.NewMockReceivedPacketHandler(mockCtrl) Eventually(sess.Context().Done()).Should(BeClosed())
rph.EXPECT().GetAlarmTimeout().Return(time.Now().Add(10 * time.Millisecond))
// make the run loop wait
rph.EXPECT().GetAlarmTimeout().Return(time.Now().Add(time.Hour)).MaxTimes(1)
sess.receivedPacketHandler = rph
go func() {
defer GinkgoRecover()
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
sess.run()
}()
Eventually(mconn.written).Should(Receive())
// make sure the go routine returns
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
sessionRunner.EXPECT().Retire(gomock.Any())
streamManager.EXPECT().CloseWithError(gomock.Any())
cryptoSetup.EXPECT().Close()
sess.Close()
Eventually(sess.Context().Done()).Should(BeClosed())
})
}) })
}) })