Merge pull request #1105 from apernet/wip-rbq

fix: BBR stuck in STARTUP phase due to incorrect app-limited logic
This commit is contained in:
Toby 2024-05-28 21:54:06 -07:00 committed by GitHub
commit 492145c124
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -4,6 +4,8 @@ import (
"fmt"
"math/rand"
"net"
"os"
"strconv"
"time"
"github.com/apernet/quic-go/congestion"
@ -37,6 +39,8 @@ const (
derivedHighGain = 2.773
// The newly derived CWND gain for STARTUP, 2.
derivedHighCWNDGain = 2.0
debugEnv = "HYSTERIA_BBR_DEBUG"
)
// The cycle of gains used during the PROBE_BW stage.
@ -61,7 +65,7 @@ const (
// Flag.
defaultStartupFullLossCount = 8
quicBbr2DefaultLossThreshold = 0.02
maxBbrBurstPackets = 3
maxBbrBurstPackets = 10
)
type bbrMode int
@ -237,6 +241,8 @@ type bbrSender struct {
maxDatagramSize congestion.ByteCount
// Recorded on packet sent. equivalent |unacked_packets_->bytes_in_flight()|
bytesInFlight congestion.ByteCount
debug bool
}
var _ congestion.CongestionControl = &bbrSender{}
@ -259,6 +265,7 @@ func newBbrSender(
initialCongestionWindow,
initialMaxCongestionWindow congestion.ByteCount,
) *bbrSender {
debug, _ := strconv.ParseBool(os.Getenv(debugEnv))
b := &bbrSender{
clock: clock,
mode: bbrModeStartup,
@ -284,6 +291,7 @@ func newBbrSender(
cwndToCalculateMinPacingRate: initialCongestionWindow,
maxCongestionWindowWithNetworkParametersAdjusted: initialMaxCongestionWindow,
maxDatagramSize: initialMaxDatagramSize,
debug: debug,
}
b.pacer = common.NewPacer(b.bandwidthForPacer)
@ -411,7 +419,7 @@ func (b *bbrSender) OnCongestionEventEx(priorInFlight congestion.ByteCount, even
// packet in lost_packets.
var lastPacketSendState sendTimeState
b.maybeApplimited(priorInFlight)
b.maybeAppLimited(priorInFlight)
// Update bytesInFlight
b.bytesInFlight = priorInFlight
@ -539,7 +547,7 @@ func (b *bbrSender) setDrainGain(drainGain float64) {
b.drainGain = drainGain
}
// What's the current estimated bandwidth in bytes per second.
// Get the current bandwidth estimate. Note that Bandwidth is in bits per second.
func (b *bbrSender) bandwidthEstimate() Bandwidth {
return b.maxBandwidth.GetBest()
}
@ -607,6 +615,10 @@ func (b *bbrSender) enterStartupMode(now time.Time) {
// b.maybeTraceStateChange(logging.CongestionStateStartup)
b.pacingGain = b.highGain
b.congestionWindowGain = b.highCwndGain
if b.debug {
b.debugPrint("Phase: STARTUP")
}
}
// Enters the PROBE_BW mode.
@ -625,6 +637,10 @@ func (b *bbrSender) enterProbeBandwidthMode(now time.Time) {
b.lastCycleStart = now
b.pacingGain = pacingGain[b.cycleCurrentOffset]
if b.debug {
b.debugPrint("Phase: PROBE_BW")
}
}
// Updates the round-trip counter if a round-trip has passed. Returns true if
@ -698,14 +714,8 @@ func (b *bbrSender) checkIfFullBandwidthReached(lastPacketSendState *sendTimeSta
}
}
func (b *bbrSender) maybeApplimited(bytesInFlight congestion.ByteCount) {
congestionWindow := b.GetCongestionWindow()
if bytesInFlight >= congestionWindow {
return
}
availableBytes := congestionWindow - bytesInFlight
drainLimited := b.mode == bbrModeDrain && bytesInFlight > congestionWindow/2
if !drainLimited || availableBytes > maxBbrBurstPackets*b.maxDatagramSize {
func (b *bbrSender) maybeAppLimited(bytesInFlight congestion.ByteCount) {
if bytesInFlight < b.getTargetCongestionWindow(1) {
b.sampler.OnAppLimited()
}
}
@ -718,6 +728,10 @@ func (b *bbrSender) maybeExitStartupOrDrain(now time.Time) {
// b.maybeTraceStateChange(logging.CongestionStateDrain)
b.pacingGain = b.drainGain
b.congestionWindowGain = b.highCwndGain
if b.debug {
b.debugPrint("Phase: DRAIN")
}
}
if b.mode == bbrModeDrain && b.bytesInFlight <= b.getTargetCongestionWindow(1) {
b.enterProbeBandwidthMode(now)
@ -733,6 +747,12 @@ func (b *bbrSender) maybeEnterOrExitProbeRtt(now time.Time, isRoundStart, minRtt
// Do not decide on the time to exit PROBE_RTT until the |bytes_in_flight|
// is at the target small value.
b.exitProbeRttAt = time.Time{}
if b.debug {
b.debugPrint("BandwidthEstimate: %s, CongestionWindowGain: %.2f, PacingGain: %.2f, PacingRate: %s",
formatSpeed(b.bandwidthEstimate()), b.congestionWindowGain, b.pacingGain, formatSpeed(b.PacingRate()))
b.debugPrint("Phase: PROBE_RTT")
}
}
if b.mode == bbrModeProbeRtt {
@ -754,6 +774,9 @@ func (b *bbrSender) maybeEnterOrExitProbeRtt(now time.Time, isRoundStart, minRtt
}
if now.Sub(b.exitProbeRttAt) >= 0 && b.probeRttRoundPassed {
b.minRttTimestamp = now
if b.debug {
b.debugPrint("MinRTT: %s", b.getMinRtt())
}
if !b.isAtFullBandwidth {
b.enterStartupMode(now)
} else {
@ -925,6 +948,12 @@ func (b *bbrSender) shouldExitStartupDueToLoss(lastPacketSendState *sendTimeStat
return false
}
func (b *bbrSender) debugPrint(format string, a ...any) {
fmt.Printf("[BBRSender] [%s] %s\n",
time.Now().Format("15:04:05"),
fmt.Sprintf(format, a...))
}
func bdpFromRttAndBandwidth(rtt time.Duration, bandwidth Bandwidth) congestion.ByteCount {
return congestion.ByteCount(rtt) * congestion.ByteCount(bandwidth) / congestion.ByteCount(BytesPerSecond) / congestion.ByteCount(time.Second)
}
@ -942,3 +971,14 @@ func GetInitialPacketSize(addr net.Addr) congestion.ByteCount {
return congestion.MinInitialPacketSize
}
}
func formatSpeed(bw Bandwidth) string {
bwf := float64(bw)
units := []string{"bps", "Kbps", "Mbps", "Gbps"}
unitIndex := 0
for bwf > 1024 && unitIndex < len(units)-1 {
bwf /= 1024
unitIndex++
}
return fmt.Sprintf("%.2f %s", bwf, units[unitIndex])
}