use the actual maximum packet size in the Reno congestion controller

This commit is contained in:
Marten Seemann 2021-01-25 14:23:11 +08:00
parent 8895a79e30
commit dd8b21f264
7 changed files with 97 additions and 39 deletions

View file

@ -38,6 +38,7 @@ type SentPacketHandler interface {
TimeUntilSend() time.Time TimeUntilSend() time.Time
// HasPacingBudget says if the pacer allows sending of a (full size) packet at this moment. // HasPacingBudget says if the pacer allows sending of a (full size) packet at this moment.
HasPacingBudget() bool HasPacingBudget() bool
SetMaxDatagramSize(count protocol.ByteCount)
// only to be called once the handshake is complete // only to be called once the handshake is complete
QueueProbePacket(protocol.EncryptionLevel) bool /* was a packet queued */ QueueProbePacket(protocol.EncryptionLevel) bool /* was a packet queued */

View file

@ -707,6 +707,10 @@ func (h *sentPacketHandler) HasPacingBudget() bool {
return h.congestion.HasPacingBudget() return h.congestion.HasPacingBudget()
} }
func (h *sentPacketHandler) SetMaxDatagramSize(s protocol.ByteCount) {
h.congestion.SetMaxDatagramSize(s)
}
func (h *sentPacketHandler) isAmplificationLimited() bool { func (h *sentPacketHandler) isAmplificationLimited() bool {
if h.peerAddressValidated { if h.peerAddressValidated {
return false return false

View file

@ -11,12 +11,13 @@ import (
const ( const (
// maxDatagramSize is the default maximum packet size used in the Linux TCP implementation. // maxDatagramSize is the default maximum packet size used in the Linux TCP implementation.
// Used in QUIC for congestion window computations in bytes. // Used in QUIC for congestion window computations in bytes.
maxDatagramSize = protocol.ByteCount(protocol.InitialPacketSizeIPv4) maxDatagramSize = protocol.ByteCount(protocol.InitialPacketSizeIPv4)
maxBurstBytes = 3 * maxDatagramSize initialMaxDatagramSize = protocol.ByteCount(protocol.InitialPacketSizeIPv4)
renoBeta = 0.7 // Reno backoff factor. maxBurstPackets = 3
maxCongestionWindow = protocol.MaxCongestionWindowPackets * maxDatagramSize renoBeta = 0.7 // Reno backoff factor.
minCongestionWindow = 2 * maxDatagramSize initialMaxCongestionWindow = protocol.MaxCongestionWindowPackets * initialMaxDatagramSize
initialCongestionWindow = 32 * maxDatagramSize minCongestionWindowPackets = 2
initialCongestionWindow = 32 * initialMaxDatagramSize
) )
type cubicSender struct { type cubicSender struct {
@ -44,12 +45,6 @@ type cubicSender struct {
// Congestion window in packets. // Congestion window in packets.
congestionWindow protocol.ByteCount congestionWindow protocol.ByteCount
// Minimum congestion window in packets.
minCongestionWindow protocol.ByteCount
// Maximum congestion window.
maxCongestionWindow protocol.ByteCount
// Slow start congestion window in bytes, aka ssthresh. // Slow start congestion window in bytes, aka ssthresh.
slowStartThreshold protocol.ByteCount slowStartThreshold protocol.ByteCount
@ -59,6 +54,8 @@ type cubicSender struct {
initialCongestionWindow protocol.ByteCount initialCongestionWindow protocol.ByteCount
initialMaxCongestionWindow protocol.ByteCount initialMaxCongestionWindow protocol.ByteCount
maxDatagramSize protocol.ByteCount
lastState logging.CongestionState lastState logging.CongestionState
tracer logging.ConnectionTracer tracer logging.ConnectionTracer
} }
@ -70,7 +67,7 @@ var (
// NewCubicSender makes a new cubic sender // NewCubicSender makes a new cubic sender
func NewCubicSender(clock Clock, rttStats *utils.RTTStats, reno bool, tracer logging.ConnectionTracer) *cubicSender { func NewCubicSender(clock Clock, rttStats *utils.RTTStats, reno bool, tracer logging.ConnectionTracer) *cubicSender {
return newCubicSender(clock, rttStats, reno, initialCongestionWindow, maxCongestionWindow, tracer) return newCubicSender(clock, rttStats, reno, initialCongestionWindow, initialMaxCongestionWindow, tracer)
} }
func newCubicSender(clock Clock, rttStats *utils.RTTStats, reno bool, initialCongestionWindow, initialMaxCongestionWindow protocol.ByteCount, tracer logging.ConnectionTracer) *cubicSender { func newCubicSender(clock Clock, rttStats *utils.RTTStats, reno bool, initialCongestionWindow, initialMaxCongestionWindow protocol.ByteCount, tracer logging.ConnectionTracer) *cubicSender {
@ -82,13 +79,12 @@ func newCubicSender(clock Clock, rttStats *utils.RTTStats, reno bool, initialCon
initialCongestionWindow: initialCongestionWindow, initialCongestionWindow: initialCongestionWindow,
initialMaxCongestionWindow: initialMaxCongestionWindow, initialMaxCongestionWindow: initialMaxCongestionWindow,
congestionWindow: initialCongestionWindow, congestionWindow: initialCongestionWindow,
minCongestionWindow: minCongestionWindow,
slowStartThreshold: protocol.MaxByteCount, slowStartThreshold: protocol.MaxByteCount,
maxCongestionWindow: initialMaxCongestionWindow,
cubic: NewCubic(clock), cubic: NewCubic(clock),
clock: clock, clock: clock,
reno: reno, reno: reno,
tracer: tracer, tracer: tracer,
maxDatagramSize: initialMaxDatagramSize,
} }
c.pacer = newPacer(c.BandwidthEstimate) c.pacer = newPacer(c.BandwidthEstimate)
if c.tracer != nil { if c.tracer != nil {
@ -104,12 +100,20 @@ func (c *cubicSender) TimeUntilSend(_ protocol.ByteCount) time.Time {
} }
func (c *cubicSender) HasPacingBudget() bool { func (c *cubicSender) HasPacingBudget() bool {
return c.pacer.Budget(c.clock.Now()) >= maxDatagramSize return c.pacer.Budget(c.clock.Now()) >= c.maxDatagramSize
}
func (c *cubicSender) maxCongestionWindow() protocol.ByteCount {
return c.maxDatagramSize * protocol.MaxCongestionWindowPackets
}
func (c *cubicSender) minCongestionWindow() protocol.ByteCount {
return c.maxDatagramSize * minCongestionWindowPackets
} }
func (c *cubicSender) OnPacketSent( func (c *cubicSender) OnPacketSent(
sentTime time.Time, sentTime time.Time,
bytesInFlight protocol.ByteCount, _ protocol.ByteCount,
packetNumber protocol.PacketNumber, packetNumber protocol.PacketNumber,
bytes protocol.ByteCount, bytes protocol.ByteCount,
isRetransmittable bool, isRetransmittable bool,
@ -139,7 +143,8 @@ func (c *cubicSender) GetCongestionWindow() protocol.ByteCount {
} }
func (c *cubicSender) MaybeExitSlowStart() { func (c *cubicSender) MaybeExitSlowStart() {
if c.InSlowStart() && c.hybridSlowStart.ShouldExitSlowStart(c.rttStats.LatestRTT(), c.rttStats.MinRTT(), c.GetCongestionWindow()/maxDatagramSize) { if c.InSlowStart() &&
c.hybridSlowStart.ShouldExitSlowStart(c.rttStats.LatestRTT(), c.rttStats.MinRTT(), c.GetCongestionWindow()/c.maxDatagramSize) {
// exit slow start // exit slow start
c.slowStartThreshold = c.congestionWindow c.slowStartThreshold = c.congestionWindow
c.maybeTraceStateChange(logging.CongestionStateCongestionAvoidance) c.maybeTraceStateChange(logging.CongestionStateCongestionAvoidance)
@ -162,11 +167,7 @@ func (c *cubicSender) OnPacketAcked(
} }
} }
func (c *cubicSender) OnPacketLost( func (c *cubicSender) OnPacketLost(packetNumber protocol.PacketNumber, lostBytes, priorInFlight protocol.ByteCount) {
packetNumber protocol.PacketNumber,
lostBytes protocol.ByteCount,
priorInFlight protocol.ByteCount,
) {
// TCP NewReno (RFC6582) says that once a loss occurs, any losses in packets // TCP NewReno (RFC6582) says that once a loss occurs, any losses in packets
// already sent should be treated as a single loss event, since it's expected. // already sent should be treated as a single loss event, since it's expected.
if packetNumber <= c.largestSentAtLastCutback { if packetNumber <= c.largestSentAtLastCutback {
@ -180,8 +181,8 @@ func (c *cubicSender) OnPacketLost(
} else { } else {
c.congestionWindow = c.cubic.CongestionWindowAfterPacketLoss(c.congestionWindow) c.congestionWindow = c.cubic.CongestionWindowAfterPacketLoss(c.congestionWindow)
} }
if c.congestionWindow < c.minCongestionWindow { if minCwnd := c.minCongestionWindow(); c.congestionWindow < minCwnd {
c.congestionWindow = c.minCongestionWindow c.congestionWindow = minCwnd
} }
c.slowStartThreshold = c.congestionWindow c.slowStartThreshold = c.congestionWindow
c.largestSentAtLastCutback = c.largestSentPacketNumber c.largestSentAtLastCutback = c.largestSentPacketNumber
@ -205,12 +206,12 @@ func (c *cubicSender) maybeIncreaseCwnd(
c.maybeTraceStateChange(logging.CongestionStateApplicationLimited) c.maybeTraceStateChange(logging.CongestionStateApplicationLimited)
return return
} }
if c.congestionWindow >= c.maxCongestionWindow { if c.congestionWindow >= c.maxCongestionWindow() {
return return
} }
if c.InSlowStart() { if c.InSlowStart() {
// TCP slow start, exponential growth, increase by one for each ACK. // TCP slow start, exponential growth, increase by one for each ACK.
c.congestionWindow += maxDatagramSize c.congestionWindow += c.maxDatagramSize
c.maybeTraceStateChange(logging.CongestionStateSlowStart) c.maybeTraceStateChange(logging.CongestionStateSlowStart)
return return
} }
@ -219,12 +220,12 @@ func (c *cubicSender) maybeIncreaseCwnd(
if c.reno { if c.reno {
// Classic Reno congestion avoidance. // Classic Reno congestion avoidance.
c.numAckedPackets++ c.numAckedPackets++
if c.numAckedPackets >= uint64(c.congestionWindow/maxDatagramSize) { if c.numAckedPackets >= uint64(c.congestionWindow/c.maxDatagramSize) {
c.congestionWindow += maxDatagramSize c.congestionWindow += c.maxDatagramSize
c.numAckedPackets = 0 c.numAckedPackets = 0
} }
} else { } else {
c.congestionWindow = utils.MinByteCount(c.maxCongestionWindow, c.cubic.CongestionWindowAfterAck(ackedBytes, c.congestionWindow, c.rttStats.MinRTT(), eventTime)) c.congestionWindow = utils.MinByteCount(c.maxCongestionWindow(), c.cubic.CongestionWindowAfterAck(ackedBytes, c.congestionWindow, c.rttStats.MinRTT(), eventTime))
} }
} }
@ -235,7 +236,7 @@ func (c *cubicSender) isCwndLimited(bytesInFlight protocol.ByteCount) bool {
} }
availableBytes := congestionWindow - bytesInFlight availableBytes := congestionWindow - bytesInFlight
slowStartLimited := c.InSlowStart() && bytesInFlight > congestionWindow/2 slowStartLimited := c.InSlowStart() && bytesInFlight > congestionWindow/2
return slowStartLimited || availableBytes <= maxBurstBytes return slowStartLimited || availableBytes <= maxBurstPackets*c.maxDatagramSize
} }
// BandwidthEstimate returns the current bandwidth estimate // BandwidthEstimate returns the current bandwidth estimate
@ -257,7 +258,7 @@ func (c *cubicSender) OnRetransmissionTimeout(packetsRetransmitted bool) {
c.hybridSlowStart.Restart() c.hybridSlowStart.Restart()
c.cubic.Reset() c.cubic.Reset()
c.slowStartThreshold = c.congestionWindow / 2 c.slowStartThreshold = c.congestionWindow / 2
c.congestionWindow = c.minCongestionWindow c.congestionWindow = c.minCongestionWindow()
} }
// OnConnectionMigration is called when the connection is migrated (?) // OnConnectionMigration is called when the connection is migrated (?)
@ -271,7 +272,6 @@ func (c *cubicSender) OnConnectionMigration() {
c.numAckedPackets = 0 c.numAckedPackets = 0
c.congestionWindow = c.initialCongestionWindow c.congestionWindow = c.initialCongestionWindow
c.slowStartThreshold = c.initialMaxCongestionWindow c.slowStartThreshold = c.initialMaxCongestionWindow
c.maxCongestionWindow = c.initialMaxCongestionWindow
} }
func (c *cubicSender) maybeTraceStateChange(new logging.CongestionState) { func (c *cubicSender) maybeTraceStateChange(new logging.CongestionState) {
@ -281,3 +281,14 @@ func (c *cubicSender) maybeTraceStateChange(new logging.CongestionState) {
c.tracer.UpdatedCongestionState(new) c.tracer.UpdatedCongestionState(new)
c.lastState = new c.lastState = new
} }
func (c *cubicSender) SetMaxDatagramSize(s protocol.ByteCount) {
if s < c.maxDatagramSize {
panic("congestion BUG: decreased max datagram size")
}
cwndIsMinCwnd := c.congestionWindow == c.minCongestionWindow()
c.maxDatagramSize = s
if cwndIsMinCwnd {
c.congestionWindow = c.minCongestionWindow()
}
}

View file

@ -46,7 +46,7 @@ var _ = Describe("Cubic Sender", func() {
}) })
SendAvailableSendWindowLen := func(packetLength protocol.ByteCount) int { SendAvailableSendWindowLen := func(packetLength protocol.ByteCount) int {
packetsSent := 0 var packetsSent int
for sender.CanSend(bytesInFlight) { for sender.CanSend(bytesInFlight) {
sender.OnPacketSent(clock.Now(), bytesInFlight, packetNumber, packetLength, true) sender.OnPacketSent(clock.Now(), bytesInFlight, packetNumber, packetLength, true)
packetNumber++ packetNumber++
@ -449,15 +449,32 @@ var _ = Describe("Cubic Sender", func() {
Expect(sender.hybridSlowStart.Started()).To(BeFalse()) Expect(sender.hybridSlowStart.Started()).To(BeFalse())
}) })
It("default max cwnd", func() { It("slow starts up to the maximum congestion window", func() {
sender = newCubicSender(&clock, rttStats, true /*reno*/, initialCongestionWindowPackets*maxDatagramSize, maxCongestionWindow, nil) sender = newCubicSender(&clock, rttStats, true, initialCongestionWindowPackets*maxDatagramSize, initialMaxCongestionWindow, nil)
defaultMaxCongestionWindowPackets := maxCongestionWindow / maxDatagramSize for i := 1; i < protocol.MaxCongestionWindowPackets; i++ {
for i := 1; i < int(defaultMaxCongestionWindowPackets); i++ {
sender.MaybeExitSlowStart() sender.MaybeExitSlowStart()
sender.OnPacketAcked(protocol.PacketNumber(i), 1350, sender.GetCongestionWindow(), clock.Now()) sender.OnPacketAcked(protocol.PacketNumber(i), 1350, sender.GetCongestionWindow(), clock.Now())
} }
Expect(sender.GetCongestionWindow()).To(Equal(maxCongestionWindow)) Expect(sender.GetCongestionWindow()).To(Equal(initialMaxCongestionWindow))
})
It("doesn't allow reductions of the maximum packet size", func() {
Expect(func() { sender.SetMaxDatagramSize(initialMaxDatagramSize - 1) }).To(Panic())
})
It("slow starts up to maximum congestion window, if larger packets are sent", func() {
sender = newCubicSender(&clock, rttStats, true, initialCongestionWindowPackets*maxDatagramSize, initialMaxCongestionWindow, nil)
const packetSize = initialMaxDatagramSize + 100
sender.SetMaxDatagramSize(packetSize)
for i := 1; i < protocol.MaxCongestionWindowPackets; i++ {
sender.OnPacketAcked(protocol.PacketNumber(i), packetSize, sender.GetCongestionWindow(), clock.Now())
}
const maxCwnd = protocol.MaxCongestionWindowPackets * packetSize
Expect(sender.GetCongestionWindow()).To(And(
BeNumerically(">", maxCwnd),
BeNumerically("<=", maxCwnd+packetSize),
))
}) })
It("limit cwnd increase in congestion avoidance", func() { It("limit cwnd increase in congestion avoidance", func() {

View file

@ -16,6 +16,7 @@ type SendAlgorithm interface {
OnPacketAcked(number protocol.PacketNumber, ackedBytes protocol.ByteCount, priorInFlight protocol.ByteCount, eventTime time.Time) OnPacketAcked(number protocol.PacketNumber, ackedBytes protocol.ByteCount, priorInFlight protocol.ByteCount, eventTime time.Time)
OnPacketLost(number protocol.PacketNumber, lostBytes protocol.ByteCount, priorInFlight protocol.ByteCount) OnPacketLost(number protocol.PacketNumber, lostBytes protocol.ByteCount, priorInFlight protocol.ByteCount)
OnRetransmissionTimeout(packetsRetransmitted bool) OnRetransmissionTimeout(packetsRetransmitted bool)
SetMaxDatagramSize(protocol.ByteCount)
} }
// A SendAlgorithmWithDebugInfos is a SendAlgorithm that exposes some debug infos // A SendAlgorithmWithDebugInfos is a SendAlgorithm that exposes some debug infos

View file

@ -212,6 +212,18 @@ func (mr *MockSentPacketHandlerMockRecorder) SetHandshakeConfirmed() *gomock.Cal
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHandshakeConfirmed", reflect.TypeOf((*MockSentPacketHandler)(nil).SetHandshakeConfirmed)) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHandshakeConfirmed", reflect.TypeOf((*MockSentPacketHandler)(nil).SetHandshakeConfirmed))
} }
// SetMaxDatagramSize mocks base method.
func (m *MockSentPacketHandler) SetMaxDatagramSize(arg0 protocol.ByteCount) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "SetMaxDatagramSize", arg0)
}
// SetMaxDatagramSize indicates an expected call of SetMaxDatagramSize.
func (mr *MockSentPacketHandlerMockRecorder) SetMaxDatagramSize(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetMaxDatagramSize", reflect.TypeOf((*MockSentPacketHandler)(nil).SetMaxDatagramSize), arg0)
}
// TimeUntilSend mocks base method. // TimeUntilSend mocks base method.
func (m *MockSentPacketHandler) TimeUntilSend() time.Time { func (m *MockSentPacketHandler) TimeUntilSend() time.Time {
m.ctrl.T.Helper() m.ctrl.T.Helper()

View file

@ -165,6 +165,18 @@ func (mr *MockSendAlgorithmWithDebugInfosMockRecorder) OnRetransmissionTimeout(a
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnRetransmissionTimeout", reflect.TypeOf((*MockSendAlgorithmWithDebugInfos)(nil).OnRetransmissionTimeout), arg0) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnRetransmissionTimeout", reflect.TypeOf((*MockSendAlgorithmWithDebugInfos)(nil).OnRetransmissionTimeout), arg0)
} }
// SetMaxDatagramSize mocks base method.
func (m *MockSendAlgorithmWithDebugInfos) SetMaxDatagramSize(arg0 protocol.ByteCount) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "SetMaxDatagramSize", arg0)
}
// SetMaxDatagramSize indicates an expected call of SetMaxDatagramSize.
func (mr *MockSendAlgorithmWithDebugInfosMockRecorder) SetMaxDatagramSize(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetMaxDatagramSize", reflect.TypeOf((*MockSendAlgorithmWithDebugInfos)(nil).SetMaxDatagramSize), arg0)
}
// TimeUntilSend mocks base method. // TimeUntilSend mocks base method.
func (m *MockSendAlgorithmWithDebugInfos) TimeUntilSend(arg0 protocol.ByteCount) time.Time { func (m *MockSendAlgorithmWithDebugInfos) TimeUntilSend(arg0 protocol.ByteCount) time.Time {
m.ctrl.T.Helper() m.ctrl.T.Helper()