diff --git a/ackhandler/interfaces.go b/ackhandler/interfaces.go index cf86f1a0..248eb76e 100644 --- a/ackhandler/interfaces.go +++ b/ackhandler/interfaces.go @@ -14,8 +14,21 @@ type SentPacketHandler interface { ReceivedAck(ackFrame *wire.AckFrame, withPacketNumber protocol.PacketNumber, encLevel protocol.EncryptionLevel, recvTime time.Time) error SetHandshakeComplete() + // SendingAllowed says if a packet can be sent. + // Sending packets might not be possible because: + // * we're congestion limited + // * we're tracking the maximum number of sent packets SendingAllowed() bool + // TimeUntilSend is the time when the next packet should be sent. + // It is used for pacing packets. TimeUntilSend() time.Time + // ShouldSendNumPackets returns the number of packets that should be sent immediately. + // It always returns a number greater or equal than 1. + // A number greater than 1 is returned when the pacing delay is smaller than the minimum pacing delay. + // Note that the number of packets is only calculated based on the pacing algorithm. + // Before sending any packet, SendingAllowed() must be called to learn if we can actually send it. + ShouldSendNumPackets() int + GetStopWaitingFrame(force bool) *wire.StopWaitingFrame GetLowestPacketNotConfirmedAcked() protocol.PacketNumber ShouldSendRetransmittablePacket() bool diff --git a/ackhandler/sent_packet_handler.go b/ackhandler/sent_packet_handler.go index 00c51be2..6c0fd019 100644 --- a/ackhandler/sent_packet_handler.go +++ b/ackhandler/sent_packet_handler.go @@ -3,6 +3,7 @@ package ackhandler import ( "errors" "fmt" + "math" "time" "github.com/lucas-clemente/quic-go/congestion" @@ -34,7 +35,7 @@ var ErrDuplicateOrOutOfOrderAck = errors.New("SentPacketHandler: Duplicate or ou type sentPacketHandler struct { lastSentPacketNumber protocol.PacketNumber - lastPacketSentTime time.Time + nextPacketSendTime time.Time skippedPackets []protocol.PacketNumber numNonRetransmittablePackets int // number of non-retransmittable packets since the last retransmittable packet @@ -125,7 +126,6 @@ func (h *sentPacketHandler) SentPacket(packet *Packet) error { now := time.Now() h.lastSentPacketNumber = packet.PacketNumber - h.lastPacketSentTime = now var largestAcked protocol.PacketNumber if len(packet.Frames) > 0 { @@ -155,6 +155,8 @@ func (h *sentPacketHandler) SentPacket(packet *Packet) error { isRetransmittable, ) + h.nextPacketSendTime = utils.MaxTime(h.nextPacketSendTime, now).Add(h.congestion.TimeUntilSend(h.bytesInFlight)) + h.updateLossDetectionAlarm(now) return nil } @@ -389,7 +391,15 @@ func (h *sentPacketHandler) SendingAllowed() bool { } func (h *sentPacketHandler) TimeUntilSend() time.Time { - return h.lastPacketSentTime.Add(h.congestion.TimeUntilSend(h.bytesInFlight)) + return h.nextPacketSendTime +} + +func (h *sentPacketHandler) ShouldSendNumPackets() int { + delay := h.congestion.TimeUntilSend(h.bytesInFlight) + if delay == 0 || delay > protocol.MinPacingDelay { + return 1 + } + return int(math.Ceil(float64(protocol.MinPacingDelay) / float64(delay))) } func (h *sentPacketHandler) retransmitOldestTwoPackets() { diff --git a/ackhandler/sent_packet_handler_test.go b/ackhandler/sent_packet_handler_test.go index d9b290f4..6601d38d 100644 --- a/ackhandler/sent_packet_handler_test.go +++ b/ackhandler/sent_packet_handler_test.go @@ -680,9 +680,7 @@ var _ = Describe("SentPacketHandler", func() { }) Context("congestion", func() { - var ( - cong *mocks.MockSendAlgorithm - ) + var cong *mocks.MockSendAlgorithm BeforeEach(func() { cong = mocks.NewMockSendAlgorithm(mockCtrl) @@ -698,6 +696,7 @@ var _ = Describe("SentPacketHandler", func() { protocol.ByteCount(42), true, ) + cong.EXPECT().TimeUntilSend(gomock.Any()) p := &Packet{ PacketNumber: 1, Length: 42, @@ -709,6 +708,7 @@ var _ = Describe("SentPacketHandler", func() { It("should call MaybeExitSlowStart and OnPacketAcked", func() { cong.EXPECT().OnPacketSent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(2) + cong.EXPECT().TimeUntilSend(gomock.Any()).Times(2) cong.EXPECT().MaybeExitSlowStart() cong.EXPECT().OnPacketAcked( protocol.PacketNumber(1), @@ -723,6 +723,7 @@ var _ = Describe("SentPacketHandler", func() { It("should call MaybeExitSlowStart and OnPacketLost", func() { cong.EXPECT().OnPacketSent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(3) + cong.EXPECT().TimeUntilSend(gomock.Any()).Times(3) cong.EXPECT().OnRetransmissionTimeout(true).Times(2) cong.EXPECT().OnPacketLost( protocol.PacketNumber(1), @@ -765,12 +766,29 @@ var _ = Describe("SentPacketHandler", func() { }) It("gets the pacing delay", func() { + handler.bytesInFlight = 100 cong.EXPECT().OnPacketSent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()) + cong.EXPECT().TimeUntilSend(protocol.ByteCount(100)).Return(time.Hour) handler.SentPacket(&Packet{PacketNumber: 1}) - handler.bytesInFlight = protocol.ByteCount(100) - cong.EXPECT().TimeUntilSend(handler.bytesInFlight).Return(time.Hour) Expect(handler.TimeUntilSend()).To(BeTemporally("~", time.Now().Add(time.Hour), time.Second)) }) + + It("allows sending of one packet, if it should be sent immediately", func() { + cong.EXPECT().TimeUntilSend(gomock.Any()).Return(time.Duration(0)) + Expect(handler.ShouldSendNumPackets()).To(Equal(1)) + }) + + It("allows sending of multiple packets, if the pacing delay is smaller than the minimum", func() { + pacingDelay := protocol.MinPacingDelay / 10 + cong.EXPECT().TimeUntilSend(gomock.Any()).Return(pacingDelay) + Expect(handler.ShouldSendNumPackets()).To(Equal(10)) + }) + + It("allows sending of multiple packets, if the pacing delay is smaller than the minimum, and not a fraction", func() { + pacingDelay := protocol.MinPacingDelay * 2 / 5 + cong.EXPECT().TimeUntilSend(gomock.Any()).Return(pacingDelay) + Expect(handler.ShouldSendNumPackets()).To(Equal(3)) + }) }) Context("calculating RTO", func() { diff --git a/internal/mocks/ackhandler/sent_packet_handler.go b/internal/mocks/ackhandler/sent_packet_handler.go index d71009dc..bad045ed 100644 --- a/internal/mocks/ackhandler/sent_packet_handler.go +++ b/internal/mocks/ackhandler/sent_packet_handler.go @@ -153,6 +153,18 @@ func (mr *MockSentPacketHandlerMockRecorder) SetHandshakeComplete() *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHandshakeComplete", reflect.TypeOf((*MockSentPacketHandler)(nil).SetHandshakeComplete)) } +// ShouldSendNumPackets mocks base method +func (m *MockSentPacketHandler) ShouldSendNumPackets() int { + ret := m.ctrl.Call(m, "ShouldSendNumPackets") + ret0, _ := ret[0].(int) + return ret0 +} + +// ShouldSendNumPackets indicates an expected call of ShouldSendNumPackets +func (mr *MockSentPacketHandlerMockRecorder) ShouldSendNumPackets() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ShouldSendNumPackets", reflect.TypeOf((*MockSentPacketHandler)(nil).ShouldSendNumPackets)) +} + // ShouldSendRetransmittablePacket mocks base method func (m *MockSentPacketHandler) ShouldSendRetransmittablePacket() bool { ret := m.ctrl.Call(m, "ShouldSendRetransmittablePacket") diff --git a/internal/protocol/server_parameters.go b/internal/protocol/server_parameters.go index 7886482b..e5c1cd5c 100644 --- a/internal/protocol/server_parameters.go +++ b/internal/protocol/server_parameters.go @@ -131,3 +131,8 @@ const NumCachedCertificates = 128 // 1. it reduces the framing overhead // 2. it reduces the head-of-line blocking, when a packet is lost const MinStreamFrameSize ByteCount = 128 + +// MinPacingDelay is the minimum duration that is used for packet pacing +// If the packet packing frequency is higher, multiple packets might be sent at once. +// Example: For a packet pacing delay of 20 microseconds, we would send 5 packets at once, wait for 100 microseconds, and so forth. +const MinPacingDelay time.Duration = 100 * time.Microsecond diff --git a/internal/utils/minmax.go b/internal/utils/minmax.go index c984a3c7..ef71c7fa 100644 --- a/internal/utils/minmax.go +++ b/internal/utils/minmax.go @@ -114,6 +114,14 @@ func MinTime(a, b time.Time) time.Time { return a } +// MaxTime returns the later time +func MaxTime(a, b time.Time) time.Time { + if a.After(b) { + return a + } + return b +} + // MaxPacketNumber returns the max packet number func MaxPacketNumber(a, b protocol.PacketNumber) protocol.PacketNumber { if a > b { diff --git a/internal/utils/minmax_test.go b/internal/utils/minmax_test.go index f9041472..0a5dbb08 100644 --- a/internal/utils/minmax_test.go +++ b/internal/utils/minmax_test.go @@ -49,6 +49,13 @@ var _ = Describe("Min / Max", func() { Expect(MaxPacketNumber(1, 2)).To(Equal(protocol.PacketNumber(2))) Expect(MaxPacketNumber(2, 1)).To(Equal(protocol.PacketNumber(2))) }) + + It("returns the maximum time", func() { + a := time.Now() + b := a.Add(time.Second) + Expect(MaxTime(a, b)).To(Equal(b)) + Expect(MaxTime(b, a)).To(Equal(b)) + }) }) Context("Min", func() { diff --git a/session.go b/session.go index 73d87c3e..2127b295 100644 --- a/session.go +++ b/session.go @@ -432,22 +432,8 @@ runLoop: continue } - s.pacingDeadline = time.Time{} - sendingAllowed := s.sentPacketHandler.SendingAllowed() - if !sendingAllowed { // if congestion limited, at least try sending an ACK frame - if err := s.maybeSendAckOnlyPacket(); err != nil { - s.closeLocal(err) - } - } else { - sentPacket, err := s.sendPacket() - if err != nil { - s.closeLocal(err) - } - if sentPacket { - // Only start the pacing timer if actually a packet was sent. - // If one packet was sent, there will probably be more to send when calling sendPacket again. - s.pacingDeadline = s.sentPacketHandler.TimeUntilSend() - } + if err := s.sendPackets(); err != nil { + s.closeLocal(err) } if !s.receivedTooManyUndecrytablePacketsTime.IsZero() && s.receivedTooManyUndecrytablePacketsTime.Add(protocol.PublicResetTimeout).Before(now) && len(s.undecryptablePackets) != 0 { @@ -755,6 +741,28 @@ func (s *session) processTransportParameters(params *handshake.TransportParamete // so we don't need to update stream flow control windows } +func (s *session) sendPackets() error { + s.pacingDeadline = time.Time{} + if !s.sentPacketHandler.SendingAllowed() { // if congestion limited, at least try sending an ACK frame + return s.maybeSendAckOnlyPacket() + } + numPackets := s.sentPacketHandler.ShouldSendNumPackets() + for i := 0; i < numPackets; i++ { + sentPacket, err := s.sendPacket() + if err != nil { + return err + } + // If no packet was sent, or we're congestion limit, we're done here. + if !sentPacket || !s.sentPacketHandler.SendingAllowed() { + return nil + } + } + // Only start the pacing timer if we sent as many packets as we were allowed. + // There will probably be more to send when calling sendPacket again. + s.pacingDeadline = s.sentPacketHandler.TimeUntilSend() + return nil +} + func (s *session) maybeSendAckOnlyPacket() error { ack := s.receivedPacketHandler.GetAckFrame() if ack == nil { diff --git a/session_test.go b/session_test.go index a9618266..5030ca89 100644 --- a/session_test.go +++ b/session_test.go @@ -704,70 +704,6 @@ var _ = Describe("Session", func() { Expect(sent).To(BeTrue()) }) - It("sends multiple packets", func() { - sess.queueControlFrame(&wire.MaxDataFrame{ByteOffset: 1}) - sph := mockackhandler.NewMockSentPacketHandler(mockCtrl) - sph.EXPECT().DequeuePacketForRetransmission().Times(2) - sph.EXPECT().GetAlarmTimeout().AnyTimes() - sph.EXPECT().GetLeastUnacked().AnyTimes() - sph.EXPECT().ShouldSendRetransmittablePacket().Times(2) - sph.EXPECT().SentPacket(gomock.Any()).Times(2) - sph.EXPECT().TimeUntilSend().MinTimes(2).MaxTimes(3).Return(time.Now()) // the test might be completed before the last call - sph.EXPECT().SendingAllowed().Do(func() { // after sending the first packet - // make sure there's something to send - sess.packer.QueueControlFrame(&wire.MaxDataFrame{ByteOffset: 2}) - }).Return(true).Times(2) // allow 2 packets... - // ...then report that we're congestion limited - // (at most once, the test might be completed before the run loop executes this) - sph.EXPECT().SendingAllowed().MaxTimes(1) - sess.sentPacketHandler = sph - done := make(chan struct{}) - go func() { - defer GinkgoRecover() - sess.run() - close(done) - }() - sess.scheduleSending() - Eventually(mconn.written).Should(HaveLen(2)) - Consistently(mconn.written).Should(HaveLen(2)) - // make the go routine return - streamManager.EXPECT().CloseWithError(gomock.Any()) - sess.Close(nil) - Eventually(done).Should(BeClosed()) - }) - - It("paces packets", func() { - sess.queueControlFrame(&wire.MaxDataFrame{ByteOffset: 1}) - sph := mockackhandler.NewMockSentPacketHandler(mockCtrl) - sph.EXPECT().DequeuePacketForRetransmission().Times(2) - sph.EXPECT().GetAlarmTimeout().AnyTimes() - sph.EXPECT().GetLeastUnacked().AnyTimes() - sph.EXPECT().ShouldSendRetransmittablePacket().Times(2) - sph.EXPECT().SentPacket(gomock.Any()).Times(2) - sph.EXPECT().TimeUntilSend().Return(time.Now().Add(-time.Minute)) - sph.EXPECT().TimeUntilSend().Return(time.Now().Add(300 * time.Millisecond)) - sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour)) - sph.EXPECT().SendingAllowed().Do(func() { // after sending the first packet - // make sure there's something to send - sess.packer.QueueControlFrame(&wire.MaxDataFrame{ByteOffset: 2}) - }).Return(true).Times(2) // allow 2 packets... - sess.sentPacketHandler = sph - done := make(chan struct{}) - go func() { - defer GinkgoRecover() - sess.run() - close(done) - }() - sess.scheduleSending() - Eventually(mconn.written).Should(HaveLen(1)) - Consistently(mconn.written, 100*time.Millisecond).Should(HaveLen(1)) - Eventually(mconn.written).Should(HaveLen(2)) - // make the go routine return - streamManager.EXPECT().CloseWithError(gomock.Any()) - sess.Close(nil) - Eventually(done).Should(BeClosed()) - }) - It("sends public reset", func() { err := sess.sendPublicReset(1) Expect(err).NotTo(HaveOccurred()) @@ -805,6 +741,114 @@ var _ = Describe("Session", func() { }) }) + Context("packet pacing", func() { + var sph *mockackhandler.MockSentPacketHandler + + BeforeEach(func() { + sph = mockackhandler.NewMockSentPacketHandler(mockCtrl) + sph.EXPECT().GetAlarmTimeout().AnyTimes() + sph.EXPECT().GetLeastUnacked().AnyTimes() + sph.EXPECT().DequeuePacketForRetransmission().AnyTimes() + sph.EXPECT().ShouldSendRetransmittablePacket().AnyTimes() + sess.sentPacketHandler = sph + sess.packer.hasSentPacket = true + streamManager.EXPECT().CloseWithError(gomock.Any()) + }) + + It("sends multiple packets one by one immediately", func() { + // sess.queueControlFrame(&wire.MaxDataFrame{ByteOffset: 1}) + sph.EXPECT().SentPacket(gomock.Any()).Times(2) + sph.EXPECT().ShouldSendNumPackets().Return(1).Times(2) + sph.EXPECT().TimeUntilSend().Return(time.Now()).Times(2) + sph.EXPECT().SendingAllowed().Do(func() { + // make sure there's something to send + sess.packer.QueueControlFrame(&wire.MaxDataFrame{ByteOffset: 1}) + }).Return(true).Times(3) // allow 2 packets... + // ...then report that we're congestion limited + sph.EXPECT().SendingAllowed() + done := make(chan struct{}) + go func() { + defer GinkgoRecover() + sess.run() + close(done) + }() + sess.scheduleSending() + Eventually(mconn.written).Should(HaveLen(2)) + Consistently(mconn.written).Should(HaveLen(2)) + // make the go routine return + sess.Close(nil) + Eventually(done).Should(BeClosed()) + }) + + It("paces packets", func() { + sess.queueControlFrame(&wire.MaxDataFrame{ByteOffset: 1}) + sph.EXPECT().SentPacket(gomock.Any()).Times(2) + sph.EXPECT().TimeUntilSend().Return(time.Now().Add(-time.Minute)) + sph.EXPECT().TimeUntilSend().Return(time.Now().Add(300 * time.Millisecond)) + sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour)) + sph.EXPECT().ShouldSendNumPackets().Times(2).Return(1) + sph.EXPECT().SendingAllowed().Do(func() { // after sending the first packet + // make sure there's something to send + sess.packer.QueueControlFrame(&wire.MaxDataFrame{ByteOffset: 2}) + }).Return(true).AnyTimes() + done := make(chan struct{}) + go func() { + defer GinkgoRecover() + sess.run() + close(done) + }() + sess.scheduleSending() + Eventually(mconn.written).Should(HaveLen(1)) + Consistently(mconn.written, 100*time.Millisecond).Should(HaveLen(1)) + Eventually(mconn.written).Should(HaveLen(2)) + // make the go routine return + sess.Close(nil) + Eventually(done).Should(BeClosed()) + }) + + It("sends multiple packets at once", func() { + sph.EXPECT().SentPacket(gomock.Any()).Times(3) + sph.EXPECT().ShouldSendNumPackets().Return(3) + sph.EXPECT().TimeUntilSend().Return(time.Now()) + sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour)) + sph.EXPECT().SendingAllowed().Do(func() { + // make sure there's something to send + sess.packer.QueueControlFrame(&wire.MaxDataFrame{ByteOffset: 1}) + }).Return(true).Times(4) + done := make(chan struct{}) + go func() { + defer GinkgoRecover() + sess.run() + close(done) + }() + sess.scheduleSending() + Eventually(mconn.written).Should(HaveLen(3)) + // make the go routine return + sess.Close(nil) + Eventually(done).Should(BeClosed()) + }) + + It("doesn't set a pacing timer when there is no data to send", func() { + sph.EXPECT().TimeUntilSend().Return(time.Now()) + sph.EXPECT().ShouldSendNumPackets().Return(1) + sph.EXPECT().SendingAllowed().Return(true).AnyTimes() + done := make(chan struct{}) + go func() { + defer GinkgoRecover() + sess.run() + close(done) + }() + sess.scheduleSending() // no packet will get sent + Consistently(mconn.written).ShouldNot(Receive()) + // queue a frame, and expect that it won't be sent + sess.packer.QueueControlFrame(&wire.MaxDataFrame{ByteOffset: 1}) + Consistently(mconn.written).ShouldNot(Receive()) + // make the go routine return + sess.Close(nil) + Eventually(done).Should(BeClosed()) + }) + }) + Context("sending ACK only packets", func() { It("doesn't do anything if there's no ACK to be sent", func() { sph := mockackhandler.NewMockSentPacketHandler(mockCtrl) @@ -1073,6 +1117,7 @@ var _ = Describe("Session", func() { sph.EXPECT().DequeuePacketForRetransmission() sph.EXPECT().GetStopWaitingFrame(gomock.Any()) sph.EXPECT().ShouldSendRetransmittablePacket() + sph.EXPECT().ShouldSendNumPackets().Return(1) sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) { Expect(p.Frames[0]).To(BeAssignableToTypeOf(&wire.AckFrame{})) Expect(p.Frames[0].(*wire.AckFrame).LargestAcked).To(Equal(protocol.PacketNumber(0x1337)))