use a token bucket pacing algorithm

This commit is contained in:
Marten Seemann 2020-06-17 11:32:50 +07:00
parent 312b8d37f0
commit fe622dd780
9 changed files with 104 additions and 148 deletions

View file

@ -36,12 +36,8 @@ type SentPacketHandler interface {
// TimeUntilSend is the time when the next packet should be sent. // TimeUntilSend is the time when the next packet should be sent.
// It is used for pacing packets. // It is used for pacing packets.
TimeUntilSend() time.Time TimeUntilSend() time.Time
// ShouldSendNumPackets returns the number of packets that should be sent immediately. // HasPacingBudget says if the pacer allows sending of a (full size) packet at this moment.
// It always returns a number greater or equal than 1. HasPacingBudget() bool
// A number greater than 1 is returned when the pacing delay is smaller than the minimum pacing delay.
// Note that the number of packets is only calculated based on the pacing algorithm.
// Before sending any packet, SendingAllowed() must be called to learn if we can actually send it.
ShouldSendNumPackets() int
// only to be called once the handshake is complete // only to be called once the handshake is complete
QueueProbePacket(protocol.EncryptionLevel) bool /* was a packet queued */ QueueProbePacket(protocol.EncryptionLevel) bool /* was a packet queued */

View file

@ -3,7 +3,6 @@ package ackhandler
import ( import (
"errors" "errors"
"fmt" "fmt"
"math"
"time" "time"
"github.com/lucas-clemente/quic-go/internal/congestion" "github.com/lucas-clemente/quic-go/internal/congestion"
@ -46,8 +45,6 @@ func newPacketNumberSpace(initialPN protocol.PacketNumber) *packetNumberSpace {
} }
type sentPacketHandler struct { type sentPacketHandler struct {
nextSendTime time.Time
initialPackets *packetNumberSpace initialPackets *packetNumberSpace
handshakePackets *packetNumberSpace handshakePackets *packetNumberSpace
appDataPackets *packetNumberSpace appDataPackets *packetNumberSpace
@ -254,7 +251,6 @@ func (h *sentPacketHandler) sentPacketImpl(packet *Packet) bool /* is ack-elicit
} }
h.congestion.OnPacketSent(packet.SendTime, h.bytesInFlight, packet.PacketNumber, packet.Length, isAckEliciting) h.congestion.OnPacketSent(packet.SendTime, h.bytesInFlight, packet.PacketNumber, packet.Length, isAckEliciting)
h.nextSendTime = utils.MaxTime(h.nextSendTime, packet.SendTime).Add(h.congestion.TimeUntilSend(h.bytesInFlight))
return isAckEliciting return isAckEliciting
} }
@ -720,19 +716,11 @@ func (h *sentPacketHandler) SendMode() SendMode {
} }
func (h *sentPacketHandler) TimeUntilSend() time.Time { func (h *sentPacketHandler) TimeUntilSend() time.Time {
return h.nextSendTime return h.congestion.TimeUntilSend(h.bytesInFlight)
} }
func (h *sentPacketHandler) ShouldSendNumPackets() int { func (h *sentPacketHandler) HasPacingBudget() bool {
if h.numProbesToSend > 0 { return h.congestion.HasPacingBudget()
// RTO probes should not be paced, but must be sent immediately.
return h.numProbesToSend
}
delay := h.congestion.TimeUntilSend(h.bytesInFlight)
if delay == 0 || delay > protocol.MinPacingDelay {
return 1
}
return int(math.Ceil(float64(protocol.MinPacingDelay) / float64(delay)))
} }
func (h *sentPacketHandler) AmplificationWindow() protocol.ByteCount { func (h *sentPacketHandler) AmplificationWindow() protocol.ByteCount {

View file

@ -10,6 +10,7 @@ import (
"github.com/lucas-clemente/quic-go/internal/protocol" "github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/utils" "github.com/lucas-clemente/quic-go/internal/utils"
"github.com/lucas-clemente/quic-go/internal/wire" "github.com/lucas-clemente/quic-go/internal/wire"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
) )
@ -427,7 +428,6 @@ var _ = Describe("SentPacketHandler", func() {
protocol.ByteCount(42), protocol.ByteCount(42),
true, true,
) )
cong.EXPECT().TimeUntilSend(gomock.Any())
handler.SentPacket(&Packet{ handler.SentPacket(&Packet{
PacketNumber: 1, PacketNumber: 1,
Length: 42, Length: 42,
@ -439,7 +439,6 @@ var _ = Describe("SentPacketHandler", func() {
It("should call MaybeExitSlowStart and OnPacketAcked", func() { It("should call MaybeExitSlowStart and OnPacketAcked", func() {
rcvTime := time.Now().Add(-5 * time.Second) rcvTime := time.Now().Add(-5 * time.Second)
cong.EXPECT().OnPacketSent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(3) cong.EXPECT().OnPacketSent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(3)
cong.EXPECT().TimeUntilSend(gomock.Any()).Times(3)
gomock.InOrder( gomock.InOrder(
cong.EXPECT().MaybeExitSlowStart(), // must be called before packets are acked cong.EXPECT().MaybeExitSlowStart(), // must be called before packets are acked
cong.EXPECT().OnPacketAcked(protocol.PacketNumber(1), protocol.ByteCount(1), protocol.ByteCount(3), rcvTime), cong.EXPECT().OnPacketAcked(protocol.PacketNumber(1), protocol.ByteCount(1), protocol.ByteCount(3), rcvTime),
@ -454,7 +453,6 @@ var _ = Describe("SentPacketHandler", func() {
It("doesn't call OnPacketAcked when a retransmitted packet is acked", func() { It("doesn't call OnPacketAcked when a retransmitted packet is acked", func() {
cong.EXPECT().OnPacketSent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(2) cong.EXPECT().OnPacketSent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(2)
cong.EXPECT().TimeUntilSend(gomock.Any()).Times(2)
handler.SentPacket(ackElicitingPacket(&Packet{PacketNumber: 1, SendTime: time.Now().Add(-time.Hour)})) handler.SentPacket(ackElicitingPacket(&Packet{PacketNumber: 1, SendTime: time.Now().Add(-time.Hour)}))
handler.SentPacket(ackElicitingPacket(&Packet{PacketNumber: 2})) handler.SentPacket(ackElicitingPacket(&Packet{PacketNumber: 2}))
// lose packet 1 // lose packet 1
@ -472,7 +470,6 @@ var _ = Describe("SentPacketHandler", func() {
It("calls OnPacketAcked and OnPacketLost with the right bytes_in_flight value", func() { It("calls OnPacketAcked and OnPacketLost with the right bytes_in_flight value", func() {
cong.EXPECT().OnPacketSent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(4) cong.EXPECT().OnPacketSent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(4)
cong.EXPECT().TimeUntilSend(gomock.Any()).Times(4)
handler.SentPacket(ackElicitingPacket(&Packet{PacketNumber: 1, SendTime: time.Now().Add(-time.Hour)})) handler.SentPacket(ackElicitingPacket(&Packet{PacketNumber: 1, SendTime: time.Now().Add(-time.Hour)}))
handler.SentPacket(ackElicitingPacket(&Packet{PacketNumber: 2, SendTime: time.Now().Add(-30 * time.Minute)})) handler.SentPacket(ackElicitingPacket(&Packet{PacketNumber: 2, SendTime: time.Now().Add(-30 * time.Minute)}))
handler.SentPacket(ackElicitingPacket(&Packet{PacketNumber: 3, SendTime: time.Now().Add(-30 * time.Minute)})) handler.SentPacket(ackElicitingPacket(&Packet{PacketNumber: 3, SendTime: time.Now().Add(-30 * time.Minute)}))
@ -498,7 +495,6 @@ var _ = Describe("SentPacketHandler", func() {
It("passes the bytes in flight to the congestion controller", func() { It("passes the bytes in flight to the congestion controller", func() {
handler.ReceivedPacket(protocol.EncryptionHandshake) handler.ReceivedPacket(protocol.EncryptionHandshake)
cong.EXPECT().OnPacketSent(gomock.Any(), protocol.ByteCount(42), gomock.Any(), protocol.ByteCount(42), true) cong.EXPECT().OnPacketSent(gomock.Any(), protocol.ByteCount(42), gomock.Any(), protocol.ByteCount(42), true)
cong.EXPECT().TimeUntilSend(gomock.Any())
handler.SentPacket(&Packet{ handler.SentPacket(&Packet{
Length: 42, Length: 42,
EncryptionLevel: protocol.EncryptionInitial, EncryptionLevel: protocol.EncryptionInitial,
@ -512,7 +508,6 @@ var _ = Describe("SentPacketHandler", func() {
It("returns SendNone if limited by the 3x limit", func() { It("returns SendNone if limited by the 3x limit", func() {
handler.ReceivedBytes(100) handler.ReceivedBytes(100)
cong.EXPECT().OnPacketSent(gomock.Any(), protocol.ByteCount(300), gomock.Any(), protocol.ByteCount(300), true) cong.EXPECT().OnPacketSent(gomock.Any(), protocol.ByteCount(300), gomock.Any(), protocol.ByteCount(300), true)
cong.EXPECT().TimeUntilSend(gomock.Any())
handler.SentPacket(&Packet{ handler.SentPacket(&Packet{
Length: 300, Length: 300,
EncryptionLevel: protocol.EncryptionInitial, EncryptionLevel: protocol.EncryptionInitial,
@ -527,7 +522,6 @@ var _ = Describe("SentPacketHandler", func() {
It("limits the window to 3x the bytes received, to avoid amplification attacks", func() { It("limits the window to 3x the bytes received, to avoid amplification attacks", func() {
handler.ReceivedPacket(protocol.EncryptionInitial) // receiving an Initial packet doesn't validate the client's address handler.ReceivedPacket(protocol.EncryptionInitial) // receiving an Initial packet doesn't validate the client's address
cong.EXPECT().OnPacketSent(gomock.Any(), protocol.ByteCount(50), gomock.Any(), protocol.ByteCount(50), true) cong.EXPECT().OnPacketSent(gomock.Any(), protocol.ByteCount(50), gomock.Any(), protocol.ByteCount(50), true)
cong.EXPECT().TimeUntilSend(gomock.Any())
handler.SentPacket(&Packet{ handler.SentPacket(&Packet{
Length: 50, Length: 50,
EncryptionLevel: protocol.EncryptionInitial, EncryptionLevel: protocol.EncryptionInitial,
@ -549,7 +543,6 @@ var _ = Describe("SentPacketHandler", func() {
It("allows sending of ACKs when we're keeping track of MaxOutstandingSentPackets packets", func() { It("allows sending of ACKs when we're keeping track of MaxOutstandingSentPackets packets", func() {
handler.ReceivedPacket(protocol.EncryptionHandshake) handler.ReceivedPacket(protocol.EncryptionHandshake)
cong.EXPECT().CanSend(gomock.Any()).Return(true).AnyTimes() cong.EXPECT().CanSend(gomock.Any()).Return(true).AnyTimes()
cong.EXPECT().TimeUntilSend(gomock.Any()).AnyTimes()
cong.EXPECT().OnPacketSent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() cong.EXPECT().OnPacketSent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
for i := protocol.PacketNumber(1); i < protocol.MaxOutstandingSentPackets; i++ { for i := protocol.PacketNumber(1); i < protocol.MaxOutstandingSentPackets; i++ {
handler.SentPacket(ackElicitingPacket(&Packet{PacketNumber: i})) handler.SentPacket(ackElicitingPacket(&Packet{PacketNumber: i}))
@ -568,35 +561,17 @@ var _ = Describe("SentPacketHandler", func() {
Expect(handler.SendMode()).To(Equal(SendPTOHandshake)) Expect(handler.SendMode()).To(Equal(SendPTOHandshake))
}) })
It("gets the pacing delay", func() { It("says if it has pacing budget", func() {
sendTime := time.Now().Add(-time.Minute) cong.EXPECT().HasPacingBudget().Return(true)
handler.bytesInFlight = 100 Expect(handler.HasPacingBudget()).To(BeTrue())
cong.EXPECT().OnPacketSent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()) cong.EXPECT().HasPacingBudget().Return(false)
cong.EXPECT().TimeUntilSend(protocol.ByteCount(100)).Return(time.Hour) Expect(handler.HasPacingBudget()).To(BeFalse())
handler.SentPacket(&Packet{PacketNumber: 1, SendTime: sendTime, EncryptionLevel: protocol.Encryption1RTT})
Expect(handler.TimeUntilSend()).To(Equal(sendTime.Add(time.Hour)))
}) })
It("allows sending of all RTO probe packets", func() { It("returns the pacing delay", func() {
handler.numProbesToSend = 5 t := time.Now()
Expect(handler.ShouldSendNumPackets()).To(Equal(5)) cong.EXPECT().TimeUntilSend(gomock.Any()).Return(t)
}) Expect(handler.TimeUntilSend()).To(Equal(t))
It("allows sending of one packet, if it should be sent immediately", func() {
cong.EXPECT().TimeUntilSend(gomock.Any()).Return(time.Duration(0))
Expect(handler.ShouldSendNumPackets()).To(Equal(1))
})
It("allows sending of multiple packets, if the pacing delay is smaller than the minimum", func() {
pacingDelay := protocol.MinPacingDelay / 10
cong.EXPECT().TimeUntilSend(gomock.Any()).Return(pacingDelay)
Expect(handler.ShouldSendNumPackets()).To(Equal(10))
})
It("allows sending of multiple packets, if the pacing delay is smaller than the minimum, and not a fraction", func() {
pacingDelay := protocol.MinPacingDelay * 2 / 5
cong.EXPECT().TimeUntilSend(gomock.Any()).Return(pacingDelay)
Expect(handler.ShouldSendNumPackets()).To(Equal(3))
}) })
}) })
@ -694,7 +669,6 @@ var _ = Describe("SentPacketHandler", func() {
})) }))
Expect(handler.OnLossDetectionTimeout()).To(Succeed()) Expect(handler.OnLossDetectionTimeout()).To(Succeed())
Expect(handler.SendMode()).To(Equal(SendPTOAppData)) Expect(handler.SendMode()).To(Equal(SendPTOAppData))
Expect(handler.ShouldSendNumPackets()).To(Equal(2))
handler.SentPacket(ackElicitingPacket(&Packet{PacketNumber: 2})) handler.SentPacket(ackElicitingPacket(&Packet{PacketNumber: 2}))
Expect(handler.SendMode()).To(Equal(SendPTOAppData)) Expect(handler.SendMode()).To(Equal(SendPTOAppData))
handler.SentPacket(ackElicitingPacket(&Packet{PacketNumber: 3})) handler.SentPacket(ackElicitingPacket(&Packet{PacketNumber: 3}))
@ -707,7 +681,6 @@ var _ = Describe("SentPacketHandler", func() {
handler.SentPacket(ackElicitingPacket(&Packet{PacketNumber: 1, SendTime: time.Now().Add(-time.Hour)})) handler.SentPacket(ackElicitingPacket(&Packet{PacketNumber: 1, SendTime: time.Now().Add(-time.Hour)}))
Expect(handler.OnLossDetectionTimeout()).To(Succeed()) Expect(handler.OnLossDetectionTimeout()).To(Succeed())
Expect(handler.SendMode()).To(Equal(SendPTOAppData)) Expect(handler.SendMode()).To(Equal(SendPTOAppData))
Expect(handler.ShouldSendNumPackets()).To(Equal(2))
handler.SentPacket(ackElicitingPacket(&Packet{PacketNumber: 2})) handler.SentPacket(ackElicitingPacket(&Packet{PacketNumber: 2}))
Expect(handler.SendMode()).To(Equal(SendPTOAppData)) Expect(handler.SendMode()).To(Equal(SendPTOAppData))
for p := protocol.PacketNumber(3); p < 30; p++ { for p := protocol.PacketNumber(3); p < 30; p++ {

View file

@ -23,6 +23,8 @@ type cubicSender struct {
rttStats *RTTStats rttStats *RTTStats
stats connectionStats stats connectionStats
cubic *Cubic cubic *Cubic
pacer *pacer
clock Clock
reno bool reno bool
@ -75,7 +77,7 @@ func NewCubicSender(clock Clock, rttStats *RTTStats, reno bool) *cubicSender {
} }
func newCubicSender(clock Clock, rttStats *RTTStats, reno bool, initialCongestionWindow, initialMaxCongestionWindow protocol.ByteCount) *cubicSender { func newCubicSender(clock Clock, rttStats *RTTStats, reno bool, initialCongestionWindow, initialMaxCongestionWindow protocol.ByteCount) *cubicSender {
return &cubicSender{ c := &cubicSender{
rttStats: rttStats, rttStats: rttStats,
largestSentPacketNumber: protocol.InvalidPacketNumber, largestSentPacketNumber: protocol.InvalidPacketNumber,
largestAckedPacketNumber: protocol.InvalidPacketNumber, largestAckedPacketNumber: protocol.InvalidPacketNumber,
@ -88,13 +90,20 @@ func newCubicSender(clock Clock, rttStats *RTTStats, reno bool, initialCongestio
maxCongestionWindow: initialMaxCongestionWindow, maxCongestionWindow: initialMaxCongestionWindow,
numConnections: defaultNumConnections, numConnections: defaultNumConnections,
cubic: NewCubic(clock), cubic: NewCubic(clock),
clock: clock,
reno: reno, reno: reno,
} }
c.pacer = newPacer(c.BandwidthEstimate)
return c
} }
// TimeUntilSend returns when the next packet should be sent. // TimeUntilSend returns when the next packet should be sent.
func (c *cubicSender) TimeUntilSend(bytesInFlight protocol.ByteCount) time.Duration { func (c *cubicSender) TimeUntilSend(_ protocol.ByteCount) time.Time {
return c.rttStats.SmoothedRTT() * time.Duration(maxDatagramSize) / time.Duration(2*c.GetCongestionWindow()) return c.pacer.TimeUntilSend()
}
func (c *cubicSender) HasPacingBudget() bool {
return c.pacer.Budget(c.clock.Now()) >= maxDatagramSize
} }
func (c *cubicSender) OnPacketSent( func (c *cubicSender) OnPacketSent(
@ -104,6 +113,7 @@ func (c *cubicSender) OnPacketSent(
bytes protocol.ByteCount, bytes protocol.ByteCount,
isRetransmittable bool, isRetransmittable bool,
) { ) {
c.pacer.SentPacket(sentTime, bytes)
if !isRetransmittable { if !isRetransmittable {
return return
} }

View file

@ -8,7 +8,8 @@ import (
// A SendAlgorithm performs congestion control // A SendAlgorithm performs congestion control
type SendAlgorithm interface { type SendAlgorithm interface {
TimeUntilSend(bytesInFlight protocol.ByteCount) time.Duration TimeUntilSend(bytesInFlight protocol.ByteCount) time.Time
HasPacingBudget() bool
OnPacketSent(sentTime time.Time, bytesInFlight protocol.ByteCount, packetNumber protocol.PacketNumber, bytes protocol.ByteCount, isRetransmittable bool) OnPacketSent(sentTime time.Time, bytesInFlight protocol.ByteCount, packetNumber protocol.PacketNumber, bytes protocol.ByteCount, isRetransmittable bool)
CanSend(bytesInFlight protocol.ByteCount) bool CanSend(bytesInFlight protocol.ByteCount) bool
MaybeExitSlowStart() MaybeExitSlowStart()

View file

@ -92,6 +92,20 @@ func (mr *MockSentPacketHandlerMockRecorder) GetStats() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStats", reflect.TypeOf((*MockSentPacketHandler)(nil).GetStats)) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStats", reflect.TypeOf((*MockSentPacketHandler)(nil).GetStats))
} }
// HasPacingBudget mocks base method
func (m *MockSentPacketHandler) HasPacingBudget() bool {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "HasPacingBudget")
ret0, _ := ret[0].(bool)
return ret0
}
// HasPacingBudget indicates an expected call of HasPacingBudget
func (mr *MockSentPacketHandlerMockRecorder) HasPacingBudget() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasPacingBudget", reflect.TypeOf((*MockSentPacketHandler)(nil).HasPacingBudget))
}
// OnLossDetectionTimeout mocks base method // OnLossDetectionTimeout mocks base method
func (m *MockSentPacketHandler) OnLossDetectionTimeout() error { func (m *MockSentPacketHandler) OnLossDetectionTimeout() error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
@ -227,20 +241,6 @@ func (mr *MockSentPacketHandlerMockRecorder) SetHandshakeComplete() *gomock.Call
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHandshakeComplete", reflect.TypeOf((*MockSentPacketHandler)(nil).SetHandshakeComplete)) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHandshakeComplete", reflect.TypeOf((*MockSentPacketHandler)(nil).SetHandshakeComplete))
} }
// ShouldSendNumPackets mocks base method
func (m *MockSentPacketHandler) ShouldSendNumPackets() int {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ShouldSendNumPackets")
ret0, _ := ret[0].(int)
return ret0
}
// ShouldSendNumPackets indicates an expected call of ShouldSendNumPackets
func (mr *MockSentPacketHandlerMockRecorder) ShouldSendNumPackets() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ShouldSendNumPackets", reflect.TypeOf((*MockSentPacketHandler)(nil).ShouldSendNumPackets))
}
// TimeUntilSend mocks base method // TimeUntilSend mocks base method
func (m *MockSentPacketHandler) TimeUntilSend() time.Time { func (m *MockSentPacketHandler) TimeUntilSend() time.Time {
m.ctrl.T.Helper() m.ctrl.T.Helper()

View file

@ -63,6 +63,20 @@ func (mr *MockSendAlgorithmWithDebugInfosMockRecorder) GetCongestionWindow() *go
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetCongestionWindow", reflect.TypeOf((*MockSendAlgorithmWithDebugInfos)(nil).GetCongestionWindow)) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetCongestionWindow", reflect.TypeOf((*MockSendAlgorithmWithDebugInfos)(nil).GetCongestionWindow))
} }
// HasPacingBudget mocks base method
func (m *MockSendAlgorithmWithDebugInfos) HasPacingBudget() bool {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "HasPacingBudget")
ret0, _ := ret[0].(bool)
return ret0
}
// HasPacingBudget indicates an expected call of HasPacingBudget
func (mr *MockSendAlgorithmWithDebugInfosMockRecorder) HasPacingBudget() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasPacingBudget", reflect.TypeOf((*MockSendAlgorithmWithDebugInfos)(nil).HasPacingBudget))
}
// InRecovery mocks base method // InRecovery mocks base method
func (m *MockSendAlgorithmWithDebugInfos) InRecovery() bool { func (m *MockSendAlgorithmWithDebugInfos) InRecovery() bool {
m.ctrl.T.Helper() m.ctrl.T.Helper()
@ -152,10 +166,10 @@ func (mr *MockSendAlgorithmWithDebugInfosMockRecorder) OnRetransmissionTimeout(a
} }
// TimeUntilSend mocks base method // TimeUntilSend mocks base method
func (m *MockSendAlgorithmWithDebugInfos) TimeUntilSend(arg0 protocol.ByteCount) time.Duration { func (m *MockSendAlgorithmWithDebugInfos) TimeUntilSend(arg0 protocol.ByteCount) time.Time {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "TimeUntilSend", arg0) ret := m.ctrl.Call(m, "TimeUntilSend", arg0)
ret0, _ := ret[0].(time.Duration) ret0, _ := ret[0].(time.Time)
return ret0 return ret0
} }

View file

@ -570,10 +570,6 @@ runLoop:
} }
} }
var pacingDeadline time.Time
if s.pacingDeadline.IsZero() { // the timer didn't have a pacing deadline set
pacingDeadline = s.sentPacketHandler.TimeUntilSend()
}
if keepAliveTime := s.nextKeepAliveTime(); !keepAliveTime.IsZero() && !now.Before(keepAliveTime) { if keepAliveTime := s.nextKeepAliveTime(); !keepAliveTime.IsZero() && !now.Before(keepAliveTime) {
// send a PING frame since there is no activity in the session // send a PING frame since there is no activity in the session
s.logger.Debugf("Sending a keep-alive PING to keep the connection alive.") s.logger.Debugf("Sending a keep-alive PING to keep the connection alive.")
@ -591,12 +587,6 @@ runLoop:
} }
s.destroyImpl(qerr.NewTimeoutError("No recent network activity")) s.destroyImpl(qerr.NewTimeoutError("No recent network activity"))
continue continue
} else if !pacingDeadline.IsZero() && now.Before(pacingDeadline) {
// If we get to this point before the pacing deadline, we should wait until that deadline.
// This can happen when scheduleSending is called, or a packet is received.
// Set the timer and restart the run loop.
s.pacingDeadline = pacingDeadline
continue
} }
if err := s.sendPackets(); err != nil { if err := s.sendPackets(); err != nil {
@ -1356,23 +1346,16 @@ func (s *session) processTransportParametersImpl(params *wire.TransportParameter
func (s *session) sendPackets() error { func (s *session) sendPackets() error {
s.pacingDeadline = time.Time{} s.pacingDeadline = time.Time{}
sendMode := s.sentPacketHandler.SendMode() var sentPacket bool // only used in for packets sent in send mode SendAny
if sendMode == ackhandler.SendNone { // shortcut: return immediately if there's nothing to send
return nil
}
numPackets := s.sentPacketHandler.ShouldSendNumPackets()
var numPacketsSent int
sendLoop:
for { for {
switch sendMode { switch sendMode := s.sentPacketHandler.SendMode(); sendMode {
case ackhandler.SendNone: case ackhandler.SendNone:
break sendLoop return nil
case ackhandler.SendAck: case ackhandler.SendAck:
// If we already sent packets, and the send mode switches to SendAck, // If we already sent packets, and the send mode switches to SendAck,
// we've just become congestion limited. // as we've just become congestion limited.
// There's no need to try to send an ACK at this moment. // There's no need to try to send an ACK at this moment.
if numPacketsSent > 0 { if sentPacket {
return nil return nil
} }
// We can at most send a single ACK only packet. // We can at most send a single ACK only packet.
@ -1383,40 +1366,28 @@ sendLoop:
if err := s.sendProbePacket(protocol.EncryptionInitial); err != nil { if err := s.sendProbePacket(protocol.EncryptionInitial); err != nil {
return err return err
} }
numPacketsSent++
case ackhandler.SendPTOHandshake: case ackhandler.SendPTOHandshake:
if err := s.sendProbePacket(protocol.EncryptionHandshake); err != nil { if err := s.sendProbePacket(protocol.EncryptionHandshake); err != nil {
return err return err
} }
numPacketsSent++
case ackhandler.SendPTOAppData: case ackhandler.SendPTOAppData:
if err := s.sendProbePacket(protocol.Encryption1RTT); err != nil { if err := s.sendProbePacket(protocol.Encryption1RTT); err != nil {
return err return err
} }
numPacketsSent++
case ackhandler.SendAny: case ackhandler.SendAny:
sentPacket, err := s.sendPacket() if s.handshakeComplete && !s.sentPacketHandler.HasPacingBudget() {
if err != nil { s.pacingDeadline = s.sentPacketHandler.TimeUntilSend()
return nil
}
sent, err := s.sendPacket()
if err != nil || !sent {
return err return err
} }
if !sentPacket { sentPacket = true
break sendLoop
}
numPacketsSent++
default: default:
return fmt.Errorf("BUG: invalid send mode %d", sendMode) return fmt.Errorf("BUG: invalid send mode %d", sendMode)
} }
if numPacketsSent >= numPackets {
break
}
sendMode = s.sentPacketHandler.SendMode()
} }
// Only start the pacing timer if we sent as many packets as we were allowed.
// There will probably be more to send when calling sendPacket again.
if numPacketsSent == numPackets {
s.pacingDeadline = s.sentPacketHandler.TimeUntilSend()
}
return nil
} }
func (s *session) maybeSendAckOnlyPacket() error { func (s *session) maybeSendAckOnlyPacket() error {

View file

@ -1067,7 +1067,7 @@ var _ = Describe("Session", func() {
sph.EXPECT().TimeUntilSend().AnyTimes() sph.EXPECT().TimeUntilSend().AnyTimes()
sph.EXPECT().GetLossDetectionTimeout().AnyTimes() sph.EXPECT().GetLossDetectionTimeout().AnyTimes()
sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes() sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes()
sph.EXPECT().ShouldSendNumPackets().Return(1000) sph.EXPECT().HasPacingBudget().Return(true).AnyTimes()
sph.EXPECT().SentPacket(gomock.Any()) sph.EXPECT().SentPacket(gomock.Any())
sess.sentPacketHandler = sph sess.sentPacketHandler = sph
runSession() runSession()
@ -1095,7 +1095,6 @@ var _ = Describe("Session", func() {
sph.EXPECT().TimeUntilSend().AnyTimes() sph.EXPECT().TimeUntilSend().AnyTimes()
sph.EXPECT().GetLossDetectionTimeout().AnyTimes() sph.EXPECT().GetLossDetectionTimeout().AnyTimes()
sph.EXPECT().SendMode().Return(ackhandler.SendAck) sph.EXPECT().SendMode().Return(ackhandler.SendAck)
sph.EXPECT().ShouldSendNumPackets().Return(1000)
done := make(chan struct{}) done := make(chan struct{})
packer.EXPECT().MaybePackAckPacket(false).Do(func(bool) { close(done) }) packer.EXPECT().MaybePackAckPacket(false).Do(func(bool) { close(done) })
sess.sentPacketHandler = sph sess.sentPacketHandler = sph
@ -1110,7 +1109,7 @@ var _ = Describe("Session", func() {
sph.EXPECT().TimeUntilSend().AnyTimes() sph.EXPECT().TimeUntilSend().AnyTimes()
sph.EXPECT().GetLossDetectionTimeout().AnyTimes() sph.EXPECT().GetLossDetectionTimeout().AnyTimes()
sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes() sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes()
sph.EXPECT().ShouldSendNumPackets().Return(1000) sph.EXPECT().HasPacingBudget().Return(true).AnyTimes()
sph.EXPECT().SentPacket(gomock.Any()) sph.EXPECT().SentPacket(gomock.Any())
sess.sentPacketHandler = sph sess.sentPacketHandler = sph
fc := mocks.NewMockConnectionFlowController(mockCtrl) fc := mocks.NewMockConnectionFlowController(mockCtrl)
@ -1133,7 +1132,7 @@ var _ = Describe("Session", func() {
It("doesn't send when the SentPacketHandler doesn't allow it", func() { It("doesn't send when the SentPacketHandler doesn't allow it", func() {
sph := mockackhandler.NewMockSentPacketHandler(mockCtrl) sph := mockackhandler.NewMockSentPacketHandler(mockCtrl)
sph.EXPECT().GetLossDetectionTimeout().AnyTimes() sph.EXPECT().GetLossDetectionTimeout().AnyTimes()
sph.EXPECT().SendMode().Return(ackhandler.SendNone) sph.EXPECT().SendMode().Return(ackhandler.SendNone).AnyTimes()
sph.EXPECT().TimeUntilSend().AnyTimes() sph.EXPECT().TimeUntilSend().AnyTimes()
sess.sentPacketHandler = sph sess.sentPacketHandler = sph
runSession() runSession()
@ -1167,7 +1166,7 @@ var _ = Describe("Session", func() {
sph.EXPECT().GetLossDetectionTimeout().AnyTimes() sph.EXPECT().GetLossDetectionTimeout().AnyTimes()
sph.EXPECT().TimeUntilSend().AnyTimes() sph.EXPECT().TimeUntilSend().AnyTimes()
sph.EXPECT().SendMode().Return(sendMode) sph.EXPECT().SendMode().Return(sendMode)
sph.EXPECT().ShouldSendNumPackets().Return(1) sph.EXPECT().SendMode().Return(ackhandler.SendNone)
sph.EXPECT().QueueProbePacket(encLevel) sph.EXPECT().QueueProbePacket(encLevel)
p := getPacket(123) p := getPacket(123)
packer.EXPECT().MaybePackProbePacket(encLevel).Return(p, nil) packer.EXPECT().MaybePackProbePacket(encLevel).Return(p, nil)
@ -1188,7 +1187,7 @@ var _ = Describe("Session", func() {
sph.EXPECT().GetLossDetectionTimeout().AnyTimes() sph.EXPECT().GetLossDetectionTimeout().AnyTimes()
sph.EXPECT().TimeUntilSend().AnyTimes() sph.EXPECT().TimeUntilSend().AnyTimes()
sph.EXPECT().SendMode().Return(sendMode) sph.EXPECT().SendMode().Return(sendMode)
sph.EXPECT().ShouldSendNumPackets().Return(1) sph.EXPECT().SendMode().Return(ackhandler.SendNone)
sph.EXPECT().QueueProbePacket(encLevel).Return(false) sph.EXPECT().QueueProbePacket(encLevel).Return(false)
p := getPacket(123) p := getPacket(123)
packer.EXPECT().MaybePackProbePacket(encLevel).Return(p, nil) packer.EXPECT().MaybePackProbePacket(encLevel).Return(p, nil)
@ -1218,6 +1217,7 @@ var _ = Describe("Session", func() {
sph = mockackhandler.NewMockSentPacketHandler(mockCtrl) sph = mockackhandler.NewMockSentPacketHandler(mockCtrl)
sph.EXPECT().GetLossDetectionTimeout().AnyTimes() sph.EXPECT().GetLossDetectionTimeout().AnyTimes()
sess.handshakeConfirmed = true sess.handshakeConfirmed = true
sess.handshakeComplete = true
sess.sentPacketHandler = sph sess.sentPacketHandler = sph
streamManager.EXPECT().CloseWithError(gomock.Any()) streamManager.EXPECT().CloseWithError(gomock.Any())
}) })
@ -1235,10 +1235,10 @@ var _ = Describe("Session", func() {
It("sends multiple packets one by one immediately", func() { It("sends multiple packets one by one immediately", func() {
sph.EXPECT().SentPacket(gomock.Any()).Times(2) sph.EXPECT().SentPacket(gomock.Any()).Times(2)
sph.EXPECT().ShouldSendNumPackets().Return(1).Times(2) sph.EXPECT().HasPacingBudget().Return(true).Times(2)
sph.EXPECT().TimeUntilSend().Return(time.Now()).Times(2) sph.EXPECT().HasPacingBudget()
sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour)) sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour))
sph.EXPECT().SendMode().Return(ackhandler.SendAny).Times(2) // allow 2 packets... sph.EXPECT().SendMode().Return(ackhandler.SendAny).Times(3)
packer.EXPECT().PackPacket().Return(getPacket(10), nil) packer.EXPECT().PackPacket().Return(getPacket(10), nil)
packer.EXPECT().PackPacket().Return(getPacket(11), nil) packer.EXPECT().PackPacket().Return(getPacket(11), nil)
mconn.EXPECT().Write(gomock.Any()).Times(2) mconn.EXPECT().Write(gomock.Any()).Times(2)
@ -1255,8 +1255,7 @@ var _ = Describe("Session", func() {
// we shouldn't send the ACK in the same run // we shouldn't send the ACK in the same run
It("doesn't send an ACK right after becoming congestion limited", func() { It("doesn't send an ACK right after becoming congestion limited", func() {
sph.EXPECT().SentPacket(gomock.Any()) sph.EXPECT().SentPacket(gomock.Any())
sph.EXPECT().ShouldSendNumPackets().Return(1000) sph.EXPECT().HasPacingBudget().Return(true)
sph.EXPECT().TimeUntilSend().Return(time.Now())
sph.EXPECT().SendMode().Return(ackhandler.SendAny) sph.EXPECT().SendMode().Return(ackhandler.SendAny)
sph.EXPECT().SendMode().Return(ackhandler.SendAck) sph.EXPECT().SendMode().Return(ackhandler.SendAck)
packer.EXPECT().PackPacket().Return(getPacket(100), nil) packer.EXPECT().PackPacket().Return(getPacket(100), nil)
@ -1272,14 +1271,19 @@ var _ = Describe("Session", func() {
It("paces packets", func() { It("paces packets", func() {
pacingDelay := scaleDuration(100 * time.Millisecond) pacingDelay := scaleDuration(100 * time.Millisecond)
sph.EXPECT().SentPacket(gomock.Any()).Times(2)
sph.EXPECT().TimeUntilSend().Return(time.Now().Add(-time.Minute)) // send one packet immediately
sph.EXPECT().TimeUntilSend().Return(time.Now().Add(pacingDelay)) // send one
sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour))
sph.EXPECT().ShouldSendNumPackets().Times(2).Return(1)
sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes() sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes()
packer.EXPECT().PackPacket().Return(getPacket(100), nil) gomock.InOrder(
packer.EXPECT().PackPacket().Return(getPacket(101), nil) sph.EXPECT().HasPacingBudget().Return(true),
packer.EXPECT().PackPacket().Return(getPacket(100), nil),
sph.EXPECT().SentPacket(gomock.Any()),
sph.EXPECT().HasPacingBudget(),
sph.EXPECT().TimeUntilSend().Return(time.Now().Add(pacingDelay)),
sph.EXPECT().HasPacingBudget().Return(true),
packer.EXPECT().PackPacket().Return(getPacket(101), nil),
sph.EXPECT().SentPacket(gomock.Any()),
sph.EXPECT().HasPacingBudget(),
sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour)),
)
written := make(chan struct{}, 2) written := make(chan struct{}, 2)
mconn.EXPECT().Write(gomock.Any()).DoAndReturn(func(p []byte) (int, error) { mconn.EXPECT().Write(gomock.Any()).DoAndReturn(func(p []byte) (int, error) {
written <- struct{}{} written <- struct{}{}
@ -1298,10 +1302,10 @@ var _ = Describe("Session", func() {
It("sends multiple packets at once", func() { It("sends multiple packets at once", func() {
sph.EXPECT().SentPacket(gomock.Any()).Times(3) sph.EXPECT().SentPacket(gomock.Any()).Times(3)
sph.EXPECT().ShouldSendNumPackets().Return(3) sph.EXPECT().HasPacingBudget().Return(true).Times(3)
sph.EXPECT().TimeUntilSend().Return(time.Now()) sph.EXPECT().HasPacingBudget()
sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour)) sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour))
sph.EXPECT().SendMode().Return(ackhandler.SendAny).Times(3) sph.EXPECT().SendMode().Return(ackhandler.SendAny).Times(4)
packer.EXPECT().PackPacket().Return(getPacket(1000), nil) packer.EXPECT().PackPacket().Return(getPacket(1000), nil)
packer.EXPECT().PackPacket().Return(getPacket(1001), nil) packer.EXPECT().PackPacket().Return(getPacket(1001), nil)
packer.EXPECT().PackPacket().Return(getPacket(1002), nil) packer.EXPECT().PackPacket().Return(getPacket(1002), nil)
@ -1320,8 +1324,7 @@ var _ = Describe("Session", func() {
}) })
It("doesn't set a pacing timer when there is no data to send", func() { It("doesn't set a pacing timer when there is no data to send", func() {
sph.EXPECT().TimeUntilSend().Return(time.Now()) sph.EXPECT().HasPacingBudget().Return(true)
sph.EXPECT().ShouldSendNumPackets().Return(1)
sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes() sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes()
packer.EXPECT().PackPacket() packer.EXPECT().PackPacket()
// don't EXPECT any calls to mconn.Write() // don't EXPECT any calls to mconn.Write()
@ -1357,10 +1360,11 @@ var _ = Describe("Session", func() {
sph.EXPECT().GetLossDetectionTimeout().AnyTimes() sph.EXPECT().GetLossDetectionTimeout().AnyTimes()
sph.EXPECT().TimeUntilSend().AnyTimes() sph.EXPECT().TimeUntilSend().AnyTimes()
sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes() sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes()
sph.EXPECT().ShouldSendNumPackets().AnyTimes().Return(1) sph.EXPECT().HasPacingBudget().Return(true).AnyTimes()
sph.EXPECT().SentPacket(gomock.Any()) sph.EXPECT().SentPacket(gomock.Any())
sess.sentPacketHandler = sph sess.sentPacketHandler = sph
packer.EXPECT().PackPacket().Return(getPacket(1), nil) packer.EXPECT().PackPacket().Return(getPacket(1), nil)
packer.EXPECT().PackPacket().Return(nil, nil)
go func() { go func() {
defer GinkgoRecover() defer GinkgoRecover()
@ -1379,12 +1383,11 @@ var _ = Describe("Session", func() {
It("sets the timer to the ack timer", func() { It("sets the timer to the ack timer", func() {
packer.EXPECT().PackPacket().Return(getPacket(1234), nil) packer.EXPECT().PackPacket().Return(getPacket(1234), nil)
packer.EXPECT().PackPacket().Return(nil, nil)
sph := mockackhandler.NewMockSentPacketHandler(mockCtrl) sph := mockackhandler.NewMockSentPacketHandler(mockCtrl)
sph.EXPECT().TimeUntilSend().Return(time.Now())
sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour))
sph.EXPECT().GetLossDetectionTimeout().AnyTimes() sph.EXPECT().GetLossDetectionTimeout().AnyTimes()
sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes() sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes()
sph.EXPECT().ShouldSendNumPackets().Return(1) sph.EXPECT().HasPacingBudget().Return(true).AnyTimes()
sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) { sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) {
Expect(p.PacketNumber).To(Equal(protocol.PacketNumber(1234))) Expect(p.PacketNumber).To(Equal(protocol.PacketNumber(1234)))
}) })
@ -1408,6 +1411,7 @@ var _ = Describe("Session", func() {
}) })
It("sends coalesced packets before the handshake is confirmed", func() { It("sends coalesced packets before the handshake is confirmed", func() {
sess.handshakeComplete = false
sess.handshakeConfirmed = false sess.handshakeConfirmed = false
sph := mockackhandler.NewMockSentPacketHandler(mockCtrl) sph := mockackhandler.NewMockSentPacketHandler(mockCtrl)
const window protocol.ByteCount = 321 const window protocol.ByteCount = 321
@ -1445,7 +1449,6 @@ var _ = Describe("Session", func() {
sph.EXPECT().GetLossDetectionTimeout().AnyTimes() sph.EXPECT().GetLossDetectionTimeout().AnyTimes()
sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes() sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes()
sph.EXPECT().TimeUntilSend().Return(time.Now()).AnyTimes() sph.EXPECT().TimeUntilSend().Return(time.Now()).AnyTimes()
sph.EXPECT().ShouldSendNumPackets().Return(1).AnyTimes()
gomock.InOrder( gomock.InOrder(
sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) { sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) {
Expect(p.EncryptionLevel).To(Equal(protocol.EncryptionInitial)) Expect(p.EncryptionLevel).To(Equal(protocol.EncryptionInitial))
@ -1597,7 +1600,7 @@ var _ = Describe("Session", func() {
sph.EXPECT().SetHandshakeComplete() sph.EXPECT().SetHandshakeComplete()
sph.EXPECT().GetLossDetectionTimeout().AnyTimes() sph.EXPECT().GetLossDetectionTimeout().AnyTimes()
sph.EXPECT().TimeUntilSend().AnyTimes() sph.EXPECT().TimeUntilSend().AnyTimes()
sph.EXPECT().ShouldSendNumPackets().AnyTimes().Return(10) sph.EXPECT().HasPacingBudget().Return(true)
sess.sentPacketHandler = sph sess.sentPacketHandler = sph
done := make(chan struct{}) done := make(chan struct{})
sessionRunner.EXPECT().Retire(clientDestConnID) sessionRunner.EXPECT().Retire(clientDestConnID)