diff --git a/congestion/cubic.go b/congestion/cubic.go index fce7241f..80e9aa31 100644 --- a/congestion/cubic.go +++ b/congestion/cubic.go @@ -215,3 +215,8 @@ func (c *Cubic) CongestionWindowAfterAck(currentCongestionWindow protocol.Packet return targetCongestionWindow } + +// SetNumConnections sets the number of emulated connections +func (c *Cubic) SetNumConnections(n int) { + c.numConnections = n +} diff --git a/congestion/cubic_sender.go b/congestion/cubic_sender.go index aae8aff2..56985142 100644 --- a/congestion/cubic_sender.go +++ b/congestion/cubic_sender.go @@ -5,11 +5,13 @@ import ( "time" "github.com/lucas-clemente/quic-go/protocol" + "github.com/lucas-clemente/quic-go/utils" ) const ( maxBurstBytes = 3 * protocol.DefaultTCPMSS defaultMinimumCongestionWindow protocol.PacketNumber = 2 + renoBeta float32 = 0.7 // Reno backoff factor. ) type cubicSender struct { @@ -46,17 +48,27 @@ type cubicSender struct { // Maximum number of outstanding packets for tcp. maxTCPCongestionWindow protocol.PacketNumber + + // Number of connections to simulate. + numConnections int + + // ACK counter for the Reno implementation. + congestionWindowCount uint64 + + reno bool } // NewCubicSender makes a new cubic sender -func NewCubicSender(clock Clock, rttStats *RTTStats, initialCongestionWindow protocol.PacketNumber) SendAlgorithm { +func NewCubicSender(clock Clock, rttStats *RTTStats, reno bool, initialCongestionWindow, maxCongestionWindow protocol.PacketNumber) SendAlgorithm { return &cubicSender{ rttStats: rttStats, - minCongestionWindow: defaultMinimumCongestionWindow, congestionWindow: initialCongestionWindow, - maxTCPCongestionWindow: protocol.MaxCongestionWindow, - slowstartThreshold: protocol.MaxCongestionWindow, + minCongestionWindow: defaultMinimumCongestionWindow, + slowstartThreshold: maxCongestionWindow, + maxTCPCongestionWindow: maxCongestionWindow, + numConnections: defaultNumConnections, cubic: NewCubic(clock), + reno: reno, } } @@ -105,6 +117,10 @@ func (c *cubicSender) ExitSlowstart() { c.slowstartThreshold = c.congestionWindow } +func (c *cubicSender) SlowstartThreshold() protocol.PacketNumber { + return c.slowstartThreshold +} + // OnCongestionEvent indicates an update to the congestion state, caused either by an incoming // ack or loss event timeout. |rttUpdated| indicates whether a new // latest_rtt sample has been taken, |byte_in_flight| the bytes in flight @@ -159,6 +175,8 @@ func (c *cubicSender) onPacketLost(packetNumber protocol.PacketNumber, lostBytes // TODO(chromium): Separate out all of slow start into a separate class. if c.slowStartLargeReduction && c.InSlowStart() { c.congestionWindow = c.congestionWindow - 1 + } else if c.reno { + c.congestionWindow = protocol.PacketNumber(float32(c.congestionWindow) * c.renoBeta()) } else { c.congestionWindow = c.cubic.CongestionWindowAfterPacketLoss(c.congestionWindow) } @@ -168,6 +186,17 @@ func (c *cubicSender) onPacketLost(packetNumber protocol.PacketNumber, lostBytes } c.slowstartThreshold = c.congestionWindow c.largestSentAtLastCutback = c.largestSentPacketNumber + // reset packet count from congestion avoidance mode. We start + // counting again when we're out of recovery. + c.congestionWindowCount = 0 +} + +func (c *cubicSender) renoBeta() float32 { + // kNConnectionBeta is the backoff factor after loss for our N-connection + // emulation, which emulates the effective backoff of an ensemble of N + // TCP-Reno connections on a single loss event. The effective multiplier is + // computed as: + return (float32(c.numConnections) - 1. + renoBeta) / float32(c.numConnections) } // Called when we receive an ack. Normal TCP tracks how many packets one ack @@ -187,7 +216,18 @@ func (c *cubicSender) maybeIncreaseCwnd(ackedPacketNumber protocol.PacketNumber, c.congestionWindow++ return } - c.congestionWindow = protocol.MinPacketNumber(c.maxTCPCongestionWindow, c.cubic.CongestionWindowAfterAck(c.congestionWindow, c.rttStats.MinRTT())) + if c.reno { + // Classic Reno congestion avoidance. + c.congestionWindowCount++ + // Divide by num_connections to smoothly increase the CWND at a faster + // rate than conventional Reno. + if protocol.PacketNumber(c.congestionWindowCount*uint64(c.numConnections)) >= c.congestionWindow { + c.congestionWindow++ + c.congestionWindowCount = 0 + } + } else { + c.congestionWindow = protocol.MinPacketNumber(c.maxTCPCongestionWindow, c.cubic.CongestionWindowAfterAck(c.congestionWindow, c.rttStats.MinRTT())) + } } func (c *cubicSender) isCwndLimited(bytesInFlight uint64) bool { @@ -209,3 +249,26 @@ func (c *cubicSender) BandwidthEstimate() Bandwidth { } return BandwidthFromDelta(c.GetCongestionWindow(), srtt) } + +// HybridSlowStart returns the hybrid slow start instance for testing +func (c *cubicSender) HybridSlowStart() *HybridSlowStart { + return &c.hybridSlowStart +} + +// SetNumEmulatedConnections sets the number of emulated connections +func (c *cubicSender) SetNumEmulatedConnections(n int) { + c.numConnections = utils.Max(n, 1) + c.cubic.SetNumConnections(c.numConnections) +} + +// OnRetransmissionTimeout is called on an retransmission timeout +func (c *cubicSender) OnRetransmissionTimeout(packetsRetransmitted bool) { + c.largestSentAtLastCutback = 0 + if !packetsRetransmitted { + return + } + c.hybridSlowStart.Restart() + c.cubic.Reset() + c.slowstartThreshold = c.congestionWindow / 2 + c.congestionWindow = c.minCongestionWindow +} diff --git a/congestion/cubic_sender_test.go b/congestion/cubic_sender_test.go index b4db85c2..99dab140 100644 --- a/congestion/cubic_sender_test.go +++ b/congestion/cubic_sender_test.go @@ -11,6 +11,7 @@ import ( const initialCongestionWindowPackets protocol.PacketNumber = 10 const defaultWindowTCP = uint64(initialCongestionWindowPackets * protocol.DefaultTCPMSS) +const renoBeta float32 = 0.7 // Reno backoff factor. type mockClock time.Time @@ -35,12 +36,13 @@ var _ = Describe("Cubic Sender", func() { BeforeEach(func() { bytesInFlight = 0 packetNumber = 1 + ackedPacketNumber = 0 clock = mockClock{} rttStats = congestion.NewRTTStats() - sender = congestion.NewCubicSender(&clock, rttStats, initialCongestionWindowPackets) + sender = congestion.NewCubicSender(&clock, rttStats, true /*reno*/, initialCongestionWindowPackets, protocol.MaxCongestionWindow) }) - SendAvailableSendWindow := func(packetLength uint64) int { + SendAvailableSendWindow := func() int { // Send as long as TimeUntilSend returns Zero. packets_sent := 0 can_send := sender.TimeUntilSend(clock.Now(), bytesInFlight) == 0 @@ -68,6 +70,18 @@ var _ = Describe("Cubic Sender", func() { clock.Advance(time.Millisecond) } + LoseNPackets := func(n int) { + packetLength := uint64(protocol.DefaultTCPMSS) + var ackedPackets congestion.PacketVector + var lostPackets congestion.PacketVector + for i := 0; i < n; i++ { + ackedPacketNumber++ + lostPackets = append(lostPackets, congestion.PacketInfo{Number: ackedPacketNumber, Length: packetLength}) + } + sender.OnCongestionEvent(false, bytesInFlight, ackedPackets, lostPackets) + bytesInFlight -= uint64(n) * packetLength + } + It("simpler sender", func() { // At startup make sure we are at the default. Expect(sender.GetCongestionWindow()).To(Equal(defaultWindowTCP)) @@ -79,7 +93,7 @@ var _ = Describe("Cubic Sender", func() { Expect(sender.GetCongestionWindow()).To(Equal(defaultWindowTCP)) // Fill the send window with data, then verify that we can't send. - SendAvailableSendWindow(protocol.DefaultTCPMSS) + SendAvailableSendWindow() Expect(sender.TimeUntilSend(clock.Now(), sender.GetCongestionWindow())).ToNot(BeZero()) }) @@ -91,7 +105,7 @@ var _ = Describe("Cubic Sender", func() { // Make sure we can send. Expect(sender.TimeUntilSend(clock.Now(), 0)).To(BeZero()) - SendAvailableSendWindow(protocol.DefaultTCPMSS) + SendAvailableSendWindow() for i := 0; i < kNumberOfAcks; i++ { AckNPackets(2) } @@ -111,7 +125,7 @@ var _ = Describe("Cubic Sender", func() { for i := 0; i < kNumberOfAcks; i++ { // Send our full send window. - SendAvailableSendWindow(protocol.DefaultTCPMSS) + SendAvailableSendWindow() AckNPackets(2) } cwnd := sender.GetCongestionWindow() @@ -119,4 +133,181 @@ var _ = Describe("Cubic Sender", func() { Expect(sender.BandwidthEstimate()).To(Equal(congestion.BandwidthFromDelta(cwnd, rttStats.SmoothedRTT()))) }) + PIt("slow start packet loss", func() { + sender.SetNumEmulatedConnections(1) + const kNumberOfAcks = 10 + for i := 0; i < kNumberOfAcks; i++ { + // Send our full send window. + SendAvailableSendWindow() + AckNPackets(2) + } + SendAvailableSendWindow() + expected_send_window := defaultWindowTCP + (protocol.DefaultTCPMSS * 2 * kNumberOfAcks) + Expect(sender.GetCongestionWindow()).To(Equal(expected_send_window)) + + // Lose a packet to exit slow start. + LoseNPackets(1) + packets_in_recovery_window := expected_send_window / protocol.DefaultTCPMSS + + // We should now have fallen out of slow start with a reduced window. + expected_send_window = uint64(float32(expected_send_window) * renoBeta) + Expect(sender.GetCongestionWindow()).To(Equal(expected_send_window)) + + // Recovery phase. We need to ack every packet in the recovery window before + // we exit recovery. + number_of_packets_in_window := expected_send_window / protocol.DefaultTCPMSS + AckNPackets(int(packets_in_recovery_window)) + SendAvailableSendWindow() + Expect(sender.GetCongestionWindow()).To(Equal(expected_send_window)) + + // We need to ack an entire window before we increase CWND by 1. + AckNPackets(int(number_of_packets_in_window) - 2) + SendAvailableSendWindow() + Expect(sender.GetCongestionWindow()).To(Equal(expected_send_window)) + + // Next ack should increase cwnd by 1. + AckNPackets(1) + expected_send_window += protocol.DefaultTCPMSS + Expect(sender.GetCongestionWindow()).To(Equal(expected_send_window)) + + // Now RTO and ensure slow start gets reset. + Expect(sender.HybridSlowStart().Started()).To(BeTrue()) + sender.OnRetransmissionTimeout(true) + Expect(sender.HybridSlowStart().Started()).To(BeFalse()) + }) + + It("no PRR when less than one packet in flight", func() { + SendAvailableSendWindow() + LoseNPackets(int(initialCongestionWindowPackets) - 1) + AckNPackets(1) + // PRR will allow 2 packets for every ack during recovery. + Expect(SendAvailableSendWindow()).To(Equal(2)) + // Simulate abandoning all packets by supplying a bytes_in_flight of 0. + // PRR should now allow a packet to be sent, even though prr's state + // variables believe it has sent enough packets. + Expect(sender.TimeUntilSend(clock.Now(), 0)).To(BeZero()) + }) + + PIt("slow start packet loss PRR", func() { + sender.SetNumEmulatedConnections(1) + // Test based on the first example in RFC6937. + // Ack 10 packets in 5 acks to raise the CWND to 20, as in the example. + const kNumberOfAcks = 5 + for i := 0; i < kNumberOfAcks; i++ { + // Send our full send window. + SendAvailableSendWindow() + AckNPackets(2) + } + SendAvailableSendWindow() + expected_send_window := defaultWindowTCP + (protocol.DefaultTCPMSS * 2 * kNumberOfAcks) + Expect(sender.GetCongestionWindow()).To(Equal(expected_send_window)) + + LoseNPackets(1) + + // We should now have fallen out of slow start with a reduced window. + send_window_before_loss := expected_send_window + expected_send_window = uint64(float32(expected_send_window) * renoBeta) + Expect(sender.GetCongestionWindow()).To(Equal(expected_send_window)) + + // Testing TCP proportional rate reduction. + // We should send packets paced over the received acks for the remaining + // outstanding packets. The number of packets before we exit recovery is the + // original CWND minus the packet that has been lost and the one which + // triggered the loss. + remaining_packets_in_recovery := send_window_before_loss/protocol.DefaultTCPMSS - 2 + + for i := uint64(0); i < remaining_packets_in_recovery; i++ { + AckNPackets(1) + SendAvailableSendWindow() + Expect(sender.GetCongestionWindow()).To(Equal(expected_send_window)) + } + + // We need to ack another window before we increase CWND by 1. + number_of_packets_in_window := expected_send_window / protocol.DefaultTCPMSS + for i := uint64(0); i < number_of_packets_in_window; i++ { + AckNPackets(1) + Expect(SendAvailableSendWindow()).To(Equal(1)) + Expect(sender.GetCongestionWindow()).To(Equal(expected_send_window)) + } + + AckNPackets(1) + expected_send_window += protocol.DefaultTCPMSS + Expect(sender.GetCongestionWindow()).To(Equal(expected_send_window)) + }) + + It("RTO congestion window", func() { + Expect(sender.GetCongestionWindow()).To(Equal(defaultWindowTCP)) + Expect(sender.SlowstartThreshold()).To(Equal(protocol.MaxCongestionWindow)) + + // Expect the window to decrease to the minimum once the RTO fires + // and slow start threshold to be set to 1/2 of the CWND. + sender.OnRetransmissionTimeout(true) + Expect(sender.GetCongestionWindow()).To(Equal(uint64(2 * protocol.DefaultTCPMSS))) + Expect(sender.SlowstartThreshold()).To(Equal(protocol.PacketNumber(5))) + }) + + It("RTO congestion window no retransmission", func() { + Expect(sender.GetCongestionWindow()).To(Equal(defaultWindowTCP)) + + // Expect the window to remain unchanged if the RTO fires but no + // packets are retransmitted. + sender.OnRetransmissionTimeout(false) + Expect(sender.GetCongestionWindow()).To(Equal(defaultWindowTCP)) + }) + + It("slow start max send window", func() { + const kMaxCongestionWindowTCP = 50 + const kNumberOfAcks = 100 + sender = congestion.NewCubicSender(&clock, rttStats, false, initialCongestionWindowPackets, kMaxCongestionWindowTCP) + + for i := 0; i < kNumberOfAcks; i++ { + // Send our full send window. + SendAvailableSendWindow() + AckNPackets(2) + } + expected_send_window := kMaxCongestionWindowTCP * protocol.DefaultTCPMSS + Expect(sender.GetCongestionWindow()).To(Equal(uint64(expected_send_window))) + }) + + It("tcp reno max congestion window", func() { + const kMaxCongestionWindowTCP = 50 + const kNumberOfAcks = 1000 + sender = congestion.NewCubicSender(&clock, rttStats, false, initialCongestionWindowPackets, kMaxCongestionWindowTCP) + + SendAvailableSendWindow() + AckNPackets(2) + // Make sure we fall out of slow start. + LoseNPackets(1) + + for i := 0; i < kNumberOfAcks; i++ { + // Send our full send window. + SendAvailableSendWindow() + AckNPackets(2) + } + + expected_send_window := kMaxCongestionWindowTCP * protocol.DefaultTCPMSS + Expect(sender.GetCongestionWindow()).To(Equal(uint64(expected_send_window))) + }) + + It("tcp cubic max congestion window", func() { + const kMaxCongestionWindowTCP = 50 + // Set to 10000 to compensate for small cubic alpha. + const kNumberOfAcks = 10000 + + sender = congestion.NewCubicSender(&clock, rttStats, false, initialCongestionWindowPackets, kMaxCongestionWindowTCP) + + SendAvailableSendWindow() + AckNPackets(2) + // Make sure we fall out of slow start. + LoseNPackets(1) + + for i := 0; i < kNumberOfAcks; i++ { + // Send our full send window. + SendAvailableSendWindow() + AckNPackets(2) + } + + expected_send_window := kMaxCongestionWindowTCP * protocol.DefaultTCPMSS + Expect(sender.GetCongestionWindow()).To(Equal(uint64(expected_send_window))) + }) }) diff --git a/congestion/hybrid_slow_start.go b/congestion/hybrid_slow_start.go index a0e2306e..f1a2b529 100644 --- a/congestion/hybrid_slow_start.go +++ b/congestion/hybrid_slow_start.go @@ -98,3 +98,14 @@ func (s *HybridSlowStart) OnPacketAcked(ackedPacketNumber protocol.PacketNumber) s.started = false } } + +// Started returns true if started +func (s *HybridSlowStart) Started() bool { + return s.started +} + +// Restart the slow start phase +func (s *HybridSlowStart) Restart() { + s.started = false + s.hystartFound = false +} diff --git a/congestion/interface.go b/congestion/interface.go index a947ccaf..34ae5ffa 100644 --- a/congestion/interface.go +++ b/congestion/interface.go @@ -12,4 +12,9 @@ type SendAlgorithm interface { GetCongestionWindow() uint64 OnCongestionEvent(rttUpdated bool, bytesInFlight uint64, ackedPackets PacketVector, lostPackets PacketVector) BandwidthEstimate() Bandwidth + SetNumEmulatedConnections(n int) + OnRetransmissionTimeout(packetsRetransmitted bool) + + HybridSlowStart() *HybridSlowStart // only for testing + SlowstartThreshold() protocol.PacketNumber // only for testing } diff --git a/utils/utils.go b/utils/utils.go index 4962f2fa..829d28de 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -141,6 +141,14 @@ func Max(a, b int) int { return a } +// MaxUint32 returns the maximum of two uint32 +func MaxUint32(a, b uint32) uint32 { + if a < b { + return b + } + return a +} + // MaxUint64 returns the maximum of two uint64 func MaxUint64(a, b uint64) uint64 { if a < b { diff --git a/utils/utils_test.go b/utils/utils_test.go index b69985e6..0f56599d 100644 --- a/utils/utils_test.go +++ b/utils/utils_test.go @@ -127,6 +127,10 @@ var _ = Describe("Utils", func() { Expect(Max(5, 7)).To(Equal(7)) }) + It("returns the maximum uint32", func() { + Expect(MaxUint32(5, 7)).To(Equal(uint32(7))) + }) + It("returns the maximum uint64", func() { Expect(MaxUint64(5, 7)).To(Equal(uint64(7))) })