send multiple packets at once, if the pacing delay is very small

This is an optimization to avoid waking of the run loop every couple of
microseconds.
This commit is contained in:
Marten Seemann 2018-01-08 10:03:26 +07:00
parent 5ef89733ae
commit 9ef3a47da5
9 changed files with 214 additions and 88 deletions

View file

@ -14,8 +14,21 @@ type SentPacketHandler interface {
ReceivedAck(ackFrame *wire.AckFrame, withPacketNumber protocol.PacketNumber, encLevel protocol.EncryptionLevel, recvTime time.Time) error
SetHandshakeComplete()
// SendingAllowed says if a packet can be sent.
// Sending packets might not be possible because:
// * we're congestion limited
// * we're tracking the maximum number of sent packets
SendingAllowed() bool
// TimeUntilSend is the time when the next packet should be sent.
// It is used for pacing packets.
TimeUntilSend() time.Time
// ShouldSendNumPackets returns the number of packets that should be sent immediately.
// It always returns a number greater or equal than 1.
// A number greater than 1 is returned when the pacing delay is smaller than the minimum pacing delay.
// Note that the number of packets is only calculated based on the pacing algorithm.
// Before sending any packet, SendingAllowed() must be called to learn if we can actually send it.
ShouldSendNumPackets() int
GetStopWaitingFrame(force bool) *wire.StopWaitingFrame
GetLowestPacketNotConfirmedAcked() protocol.PacketNumber
ShouldSendRetransmittablePacket() bool

View file

@ -3,6 +3,7 @@ package ackhandler
import (
"errors"
"fmt"
"math"
"time"
"github.com/lucas-clemente/quic-go/congestion"
@ -34,7 +35,7 @@ var ErrDuplicateOrOutOfOrderAck = errors.New("SentPacketHandler: Duplicate or ou
type sentPacketHandler struct {
lastSentPacketNumber protocol.PacketNumber
lastPacketSentTime time.Time
nextPacketSendTime time.Time
skippedPackets []protocol.PacketNumber
numNonRetransmittablePackets int // number of non-retransmittable packets since the last retransmittable packet
@ -125,7 +126,6 @@ func (h *sentPacketHandler) SentPacket(packet *Packet) error {
now := time.Now()
h.lastSentPacketNumber = packet.PacketNumber
h.lastPacketSentTime = now
var largestAcked protocol.PacketNumber
if len(packet.Frames) > 0 {
@ -155,6 +155,8 @@ func (h *sentPacketHandler) SentPacket(packet *Packet) error {
isRetransmittable,
)
h.nextPacketSendTime = utils.MaxTime(h.nextPacketSendTime, now).Add(h.congestion.TimeUntilSend(h.bytesInFlight))
h.updateLossDetectionAlarm(now)
return nil
}
@ -389,7 +391,15 @@ func (h *sentPacketHandler) SendingAllowed() bool {
}
func (h *sentPacketHandler) TimeUntilSend() time.Time {
return h.lastPacketSentTime.Add(h.congestion.TimeUntilSend(h.bytesInFlight))
return h.nextPacketSendTime
}
func (h *sentPacketHandler) ShouldSendNumPackets() int {
delay := h.congestion.TimeUntilSend(h.bytesInFlight)
if delay == 0 || delay > protocol.MinPacingDelay {
return 1
}
return int(math.Ceil(float64(protocol.MinPacingDelay) / float64(delay)))
}
func (h *sentPacketHandler) retransmitOldestTwoPackets() {

View file

@ -680,9 +680,7 @@ var _ = Describe("SentPacketHandler", func() {
})
Context("congestion", func() {
var (
cong *mocks.MockSendAlgorithm
)
var cong *mocks.MockSendAlgorithm
BeforeEach(func() {
cong = mocks.NewMockSendAlgorithm(mockCtrl)
@ -698,6 +696,7 @@ var _ = Describe("SentPacketHandler", func() {
protocol.ByteCount(42),
true,
)
cong.EXPECT().TimeUntilSend(gomock.Any())
p := &Packet{
PacketNumber: 1,
Length: 42,
@ -709,6 +708,7 @@ var _ = Describe("SentPacketHandler", func() {
It("should call MaybeExitSlowStart and OnPacketAcked", func() {
cong.EXPECT().OnPacketSent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(2)
cong.EXPECT().TimeUntilSend(gomock.Any()).Times(2)
cong.EXPECT().MaybeExitSlowStart()
cong.EXPECT().OnPacketAcked(
protocol.PacketNumber(1),
@ -723,6 +723,7 @@ var _ = Describe("SentPacketHandler", func() {
It("should call MaybeExitSlowStart and OnPacketLost", func() {
cong.EXPECT().OnPacketSent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(3)
cong.EXPECT().TimeUntilSend(gomock.Any()).Times(3)
cong.EXPECT().OnRetransmissionTimeout(true).Times(2)
cong.EXPECT().OnPacketLost(
protocol.PacketNumber(1),
@ -765,12 +766,29 @@ var _ = Describe("SentPacketHandler", func() {
})
It("gets the pacing delay", func() {
handler.bytesInFlight = 100
cong.EXPECT().OnPacketSent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any())
cong.EXPECT().TimeUntilSend(protocol.ByteCount(100)).Return(time.Hour)
handler.SentPacket(&Packet{PacketNumber: 1})
handler.bytesInFlight = protocol.ByteCount(100)
cong.EXPECT().TimeUntilSend(handler.bytesInFlight).Return(time.Hour)
Expect(handler.TimeUntilSend()).To(BeTemporally("~", time.Now().Add(time.Hour), time.Second))
})
It("allows sending of one packet, if it should be sent immediately", func() {
cong.EXPECT().TimeUntilSend(gomock.Any()).Return(time.Duration(0))
Expect(handler.ShouldSendNumPackets()).To(Equal(1))
})
It("allows sending of multiple packets, if the pacing delay is smaller than the minimum", func() {
pacingDelay := protocol.MinPacingDelay / 10
cong.EXPECT().TimeUntilSend(gomock.Any()).Return(pacingDelay)
Expect(handler.ShouldSendNumPackets()).To(Equal(10))
})
It("allows sending of multiple packets, if the pacing delay is smaller than the minimum, and not a fraction", func() {
pacingDelay := protocol.MinPacingDelay * 2 / 5
cong.EXPECT().TimeUntilSend(gomock.Any()).Return(pacingDelay)
Expect(handler.ShouldSendNumPackets()).To(Equal(3))
})
})
Context("calculating RTO", func() {

View file

@ -153,6 +153,18 @@ func (mr *MockSentPacketHandlerMockRecorder) SetHandshakeComplete() *gomock.Call
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHandshakeComplete", reflect.TypeOf((*MockSentPacketHandler)(nil).SetHandshakeComplete))
}
// ShouldSendNumPackets mocks base method
func (m *MockSentPacketHandler) ShouldSendNumPackets() int {
ret := m.ctrl.Call(m, "ShouldSendNumPackets")
ret0, _ := ret[0].(int)
return ret0
}
// ShouldSendNumPackets indicates an expected call of ShouldSendNumPackets
func (mr *MockSentPacketHandlerMockRecorder) ShouldSendNumPackets() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ShouldSendNumPackets", reflect.TypeOf((*MockSentPacketHandler)(nil).ShouldSendNumPackets))
}
// ShouldSendRetransmittablePacket mocks base method
func (m *MockSentPacketHandler) ShouldSendRetransmittablePacket() bool {
ret := m.ctrl.Call(m, "ShouldSendRetransmittablePacket")

View file

@ -131,3 +131,8 @@ const NumCachedCertificates = 128
// 1. it reduces the framing overhead
// 2. it reduces the head-of-line blocking, when a packet is lost
const MinStreamFrameSize ByteCount = 128
// MinPacingDelay is the minimum duration that is used for packet pacing
// If the packet packing frequency is higher, multiple packets might be sent at once.
// Example: For a packet pacing delay of 20 microseconds, we would send 5 packets at once, wait for 100 microseconds, and so forth.
const MinPacingDelay time.Duration = 100 * time.Microsecond

View file

@ -114,6 +114,14 @@ func MinTime(a, b time.Time) time.Time {
return a
}
// MaxTime returns the later time
func MaxTime(a, b time.Time) time.Time {
if a.After(b) {
return a
}
return b
}
// MaxPacketNumber returns the max packet number
func MaxPacketNumber(a, b protocol.PacketNumber) protocol.PacketNumber {
if a > b {

View file

@ -49,6 +49,13 @@ var _ = Describe("Min / Max", func() {
Expect(MaxPacketNumber(1, 2)).To(Equal(protocol.PacketNumber(2)))
Expect(MaxPacketNumber(2, 1)).To(Equal(protocol.PacketNumber(2)))
})
It("returns the maximum time", func() {
a := time.Now()
b := a.Add(time.Second)
Expect(MaxTime(a, b)).To(Equal(b))
Expect(MaxTime(b, a)).To(Equal(b))
})
})
Context("Min", func() {

View file

@ -432,22 +432,8 @@ runLoop:
continue
}
s.pacingDeadline = time.Time{}
sendingAllowed := s.sentPacketHandler.SendingAllowed()
if !sendingAllowed { // if congestion limited, at least try sending an ACK frame
if err := s.maybeSendAckOnlyPacket(); err != nil {
s.closeLocal(err)
}
} else {
sentPacket, err := s.sendPacket()
if err != nil {
s.closeLocal(err)
}
if sentPacket {
// Only start the pacing timer if actually a packet was sent.
// If one packet was sent, there will probably be more to send when calling sendPacket again.
s.pacingDeadline = s.sentPacketHandler.TimeUntilSend()
}
if err := s.sendPackets(); err != nil {
s.closeLocal(err)
}
if !s.receivedTooManyUndecrytablePacketsTime.IsZero() && s.receivedTooManyUndecrytablePacketsTime.Add(protocol.PublicResetTimeout).Before(now) && len(s.undecryptablePackets) != 0 {
@ -755,6 +741,28 @@ func (s *session) processTransportParameters(params *handshake.TransportParamete
// so we don't need to update stream flow control windows
}
func (s *session) sendPackets() error {
s.pacingDeadline = time.Time{}
if !s.sentPacketHandler.SendingAllowed() { // if congestion limited, at least try sending an ACK frame
return s.maybeSendAckOnlyPacket()
}
numPackets := s.sentPacketHandler.ShouldSendNumPackets()
for i := 0; i < numPackets; i++ {
sentPacket, err := s.sendPacket()
if err != nil {
return err
}
// If no packet was sent, or we're congestion limit, we're done here.
if !sentPacket || !s.sentPacketHandler.SendingAllowed() {
return nil
}
}
// Only start the pacing timer if we sent as many packets as we were allowed.
// There will probably be more to send when calling sendPacket again.
s.pacingDeadline = s.sentPacketHandler.TimeUntilSend()
return nil
}
func (s *session) maybeSendAckOnlyPacket() error {
ack := s.receivedPacketHandler.GetAckFrame()
if ack == nil {

View file

@ -704,70 +704,6 @@ var _ = Describe("Session", func() {
Expect(sent).To(BeTrue())
})
It("sends multiple packets", func() {
sess.queueControlFrame(&wire.MaxDataFrame{ByteOffset: 1})
sph := mockackhandler.NewMockSentPacketHandler(mockCtrl)
sph.EXPECT().DequeuePacketForRetransmission().Times(2)
sph.EXPECT().GetAlarmTimeout().AnyTimes()
sph.EXPECT().GetLeastUnacked().AnyTimes()
sph.EXPECT().ShouldSendRetransmittablePacket().Times(2)
sph.EXPECT().SentPacket(gomock.Any()).Times(2)
sph.EXPECT().TimeUntilSend().MinTimes(2).MaxTimes(3).Return(time.Now()) // the test might be completed before the last call
sph.EXPECT().SendingAllowed().Do(func() { // after sending the first packet
// make sure there's something to send
sess.packer.QueueControlFrame(&wire.MaxDataFrame{ByteOffset: 2})
}).Return(true).Times(2) // allow 2 packets...
// ...then report that we're congestion limited
// (at most once, the test might be completed before the run loop executes this)
sph.EXPECT().SendingAllowed().MaxTimes(1)
sess.sentPacketHandler = sph
done := make(chan struct{})
go func() {
defer GinkgoRecover()
sess.run()
close(done)
}()
sess.scheduleSending()
Eventually(mconn.written).Should(HaveLen(2))
Consistently(mconn.written).Should(HaveLen(2))
// make the go routine return
streamManager.EXPECT().CloseWithError(gomock.Any())
sess.Close(nil)
Eventually(done).Should(BeClosed())
})
It("paces packets", func() {
sess.queueControlFrame(&wire.MaxDataFrame{ByteOffset: 1})
sph := mockackhandler.NewMockSentPacketHandler(mockCtrl)
sph.EXPECT().DequeuePacketForRetransmission().Times(2)
sph.EXPECT().GetAlarmTimeout().AnyTimes()
sph.EXPECT().GetLeastUnacked().AnyTimes()
sph.EXPECT().ShouldSendRetransmittablePacket().Times(2)
sph.EXPECT().SentPacket(gomock.Any()).Times(2)
sph.EXPECT().TimeUntilSend().Return(time.Now().Add(-time.Minute))
sph.EXPECT().TimeUntilSend().Return(time.Now().Add(300 * time.Millisecond))
sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour))
sph.EXPECT().SendingAllowed().Do(func() { // after sending the first packet
// make sure there's something to send
sess.packer.QueueControlFrame(&wire.MaxDataFrame{ByteOffset: 2})
}).Return(true).Times(2) // allow 2 packets...
sess.sentPacketHandler = sph
done := make(chan struct{})
go func() {
defer GinkgoRecover()
sess.run()
close(done)
}()
sess.scheduleSending()
Eventually(mconn.written).Should(HaveLen(1))
Consistently(mconn.written, 100*time.Millisecond).Should(HaveLen(1))
Eventually(mconn.written).Should(HaveLen(2))
// make the go routine return
streamManager.EXPECT().CloseWithError(gomock.Any())
sess.Close(nil)
Eventually(done).Should(BeClosed())
})
It("sends public reset", func() {
err := sess.sendPublicReset(1)
Expect(err).NotTo(HaveOccurred())
@ -805,6 +741,114 @@ var _ = Describe("Session", func() {
})
})
Context("packet pacing", func() {
var sph *mockackhandler.MockSentPacketHandler
BeforeEach(func() {
sph = mockackhandler.NewMockSentPacketHandler(mockCtrl)
sph.EXPECT().GetAlarmTimeout().AnyTimes()
sph.EXPECT().GetLeastUnacked().AnyTimes()
sph.EXPECT().DequeuePacketForRetransmission().AnyTimes()
sph.EXPECT().ShouldSendRetransmittablePacket().AnyTimes()
sess.sentPacketHandler = sph
sess.packer.hasSentPacket = true
streamManager.EXPECT().CloseWithError(gomock.Any())
})
It("sends multiple packets one by one immediately", func() {
// sess.queueControlFrame(&wire.MaxDataFrame{ByteOffset: 1})
sph.EXPECT().SentPacket(gomock.Any()).Times(2)
sph.EXPECT().ShouldSendNumPackets().Return(1).Times(2)
sph.EXPECT().TimeUntilSend().Return(time.Now()).Times(2)
sph.EXPECT().SendingAllowed().Do(func() {
// make sure there's something to send
sess.packer.QueueControlFrame(&wire.MaxDataFrame{ByteOffset: 1})
}).Return(true).Times(3) // allow 2 packets...
// ...then report that we're congestion limited
sph.EXPECT().SendingAllowed()
done := make(chan struct{})
go func() {
defer GinkgoRecover()
sess.run()
close(done)
}()
sess.scheduleSending()
Eventually(mconn.written).Should(HaveLen(2))
Consistently(mconn.written).Should(HaveLen(2))
// make the go routine return
sess.Close(nil)
Eventually(done).Should(BeClosed())
})
It("paces packets", func() {
sess.queueControlFrame(&wire.MaxDataFrame{ByteOffset: 1})
sph.EXPECT().SentPacket(gomock.Any()).Times(2)
sph.EXPECT().TimeUntilSend().Return(time.Now().Add(-time.Minute))
sph.EXPECT().TimeUntilSend().Return(time.Now().Add(300 * time.Millisecond))
sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour))
sph.EXPECT().ShouldSendNumPackets().Times(2).Return(1)
sph.EXPECT().SendingAllowed().Do(func() { // after sending the first packet
// make sure there's something to send
sess.packer.QueueControlFrame(&wire.MaxDataFrame{ByteOffset: 2})
}).Return(true).AnyTimes()
done := make(chan struct{})
go func() {
defer GinkgoRecover()
sess.run()
close(done)
}()
sess.scheduleSending()
Eventually(mconn.written).Should(HaveLen(1))
Consistently(mconn.written, 100*time.Millisecond).Should(HaveLen(1))
Eventually(mconn.written).Should(HaveLen(2))
// make the go routine return
sess.Close(nil)
Eventually(done).Should(BeClosed())
})
It("sends multiple packets at once", func() {
sph.EXPECT().SentPacket(gomock.Any()).Times(3)
sph.EXPECT().ShouldSendNumPackets().Return(3)
sph.EXPECT().TimeUntilSend().Return(time.Now())
sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour))
sph.EXPECT().SendingAllowed().Do(func() {
// make sure there's something to send
sess.packer.QueueControlFrame(&wire.MaxDataFrame{ByteOffset: 1})
}).Return(true).Times(4)
done := make(chan struct{})
go func() {
defer GinkgoRecover()
sess.run()
close(done)
}()
sess.scheduleSending()
Eventually(mconn.written).Should(HaveLen(3))
// make the go routine return
sess.Close(nil)
Eventually(done).Should(BeClosed())
})
It("doesn't set a pacing timer when there is no data to send", func() {
sph.EXPECT().TimeUntilSend().Return(time.Now())
sph.EXPECT().ShouldSendNumPackets().Return(1)
sph.EXPECT().SendingAllowed().Return(true).AnyTimes()
done := make(chan struct{})
go func() {
defer GinkgoRecover()
sess.run()
close(done)
}()
sess.scheduleSending() // no packet will get sent
Consistently(mconn.written).ShouldNot(Receive())
// queue a frame, and expect that it won't be sent
sess.packer.QueueControlFrame(&wire.MaxDataFrame{ByteOffset: 1})
Consistently(mconn.written).ShouldNot(Receive())
// make the go routine return
sess.Close(nil)
Eventually(done).Should(BeClosed())
})
})
Context("sending ACK only packets", func() {
It("doesn't do anything if there's no ACK to be sent", func() {
sph := mockackhandler.NewMockSentPacketHandler(mockCtrl)
@ -1073,6 +1117,7 @@ var _ = Describe("Session", func() {
sph.EXPECT().DequeuePacketForRetransmission()
sph.EXPECT().GetStopWaitingFrame(gomock.Any())
sph.EXPECT().ShouldSendRetransmittablePacket()
sph.EXPECT().ShouldSendNumPackets().Return(1)
sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) {
Expect(p.Frames[0]).To(BeAssignableToTypeOf(&wire.AckFrame{}))
Expect(p.Frames[0].(*wire.AckFrame).LargestAcked).To(Equal(protocol.PacketNumber(0x1337)))