trace congestion state changes

This commit is contained in:
Marten Seemann 2020-07-22 14:59:32 +07:00
parent 741dc28d74
commit 0b7efe10d1
12 changed files with 99 additions and 24 deletions

View file

@ -103,6 +103,7 @@ func newSentPacketHandler(
congestion.DefaultClock{}, congestion.DefaultClock{},
rttStats, rttStats,
true, // use Reno true, // use Reno
tracer,
) )
return &sentPacketHandler{ return &sentPacketHandler{

View file

@ -5,6 +5,7 @@ import (
"github.com/lucas-clemente/quic-go/internal/protocol" "github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/utils" "github.com/lucas-clemente/quic-go/internal/utils"
"github.com/lucas-clemente/quic-go/logging"
) )
const ( const (
@ -57,17 +58,20 @@ type cubicSender struct {
initialCongestionWindow protocol.ByteCount initialCongestionWindow protocol.ByteCount
initialMaxCongestionWindow protocol.ByteCount initialMaxCongestionWindow protocol.ByteCount
lastState logging.CongestionState
tracer logging.ConnectionTracer
} }
var _ SendAlgorithm = &cubicSender{} var _ SendAlgorithm = &cubicSender{}
var _ SendAlgorithmWithDebugInfos = &cubicSender{} var _ SendAlgorithmWithDebugInfos = &cubicSender{}
// NewCubicSender makes a new cubic sender // NewCubicSender makes a new cubic sender
func NewCubicSender(clock Clock, rttStats *utils.RTTStats, reno bool) *cubicSender { func NewCubicSender(clock Clock, rttStats *utils.RTTStats, reno bool, tracer logging.ConnectionTracer) *cubicSender {
return newCubicSender(clock, rttStats, reno, initialCongestionWindow, maxCongestionWindow) return newCubicSender(clock, rttStats, reno, initialCongestionWindow, maxCongestionWindow, tracer)
} }
func newCubicSender(clock Clock, rttStats *utils.RTTStats, reno bool, initialCongestionWindow, initialMaxCongestionWindow protocol.ByteCount) *cubicSender { func newCubicSender(clock Clock, rttStats *utils.RTTStats, reno bool, initialCongestionWindow, initialMaxCongestionWindow protocol.ByteCount, tracer logging.ConnectionTracer) *cubicSender {
c := &cubicSender{ c := &cubicSender{
rttStats: rttStats, rttStats: rttStats,
largestSentPacketNumber: protocol.InvalidPacketNumber, largestSentPacketNumber: protocol.InvalidPacketNumber,
@ -82,8 +86,13 @@ func newCubicSender(clock Clock, rttStats *utils.RTTStats, reno bool, initialCon
cubic: NewCubic(clock), cubic: NewCubic(clock),
clock: clock, clock: clock,
reno: reno, reno: reno,
tracer: tracer,
} }
c.pacer = newPacer(c.BandwidthEstimate) c.pacer = newPacer(c.BandwidthEstimate)
if c.tracer != nil {
c.lastState = logging.CongestionStateSlowStart
c.tracer.UpdatedCongestionState(logging.CongestionStateSlowStart)
}
return c return c
} }
@ -131,6 +140,7 @@ func (c *cubicSender) MaybeExitSlowStart() {
if c.InSlowStart() && c.hybridSlowStart.ShouldExitSlowStart(c.rttStats.LatestRTT(), c.rttStats.MinRTT(), c.GetCongestionWindow()/maxDatagramSize) { if c.InSlowStart() && c.hybridSlowStart.ShouldExitSlowStart(c.rttStats.LatestRTT(), c.rttStats.MinRTT(), c.GetCongestionWindow()/maxDatagramSize) {
// exit slow start // exit slow start
c.slowStartThreshold = c.congestionWindow c.slowStartThreshold = c.congestionWindow
c.maybeTraceStateChange(logging.CongestionStateCongestionAvoidance)
} }
} }
@ -161,6 +171,7 @@ func (c *cubicSender) OnPacketLost(
return return
} }
c.lastCutbackExitedSlowstart = c.InSlowStart() c.lastCutbackExitedSlowstart = c.InSlowStart()
c.maybeTraceStateChange(logging.CongestionStateRecovery)
if c.reno { if c.reno {
c.congestionWindow = protocol.ByteCount(float64(c.congestionWindow) * renoBeta) c.congestionWindow = protocol.ByteCount(float64(c.congestionWindow) * renoBeta)
@ -189,6 +200,7 @@ func (c *cubicSender) maybeIncreaseCwnd(
// the current window. // the current window.
if !c.isCwndLimited(priorInFlight) { if !c.isCwndLimited(priorInFlight) {
c.cubic.OnApplicationLimited() c.cubic.OnApplicationLimited()
c.maybeTraceStateChange(logging.CongestionStateApplicationLimited)
return return
} }
if c.congestionWindow >= c.maxCongestionWindow { if c.congestionWindow >= c.maxCongestionWindow {
@ -197,9 +209,11 @@ func (c *cubicSender) maybeIncreaseCwnd(
if c.InSlowStart() { if c.InSlowStart() {
// TCP slow start, exponential growth, increase by one for each ACK. // TCP slow start, exponential growth, increase by one for each ACK.
c.congestionWindow += maxDatagramSize c.congestionWindow += maxDatagramSize
c.maybeTraceStateChange(logging.CongestionStateSlowStart)
return return
} }
// Congestion avoidance // Congestion avoidance
c.maybeTraceStateChange(logging.CongestionStateCongestionAvoidance)
if c.reno { if c.reno {
// Classic Reno congestion avoidance. // Classic Reno congestion avoidance.
c.numAckedPackets++ c.numAckedPackets++
@ -257,3 +271,10 @@ func (c *cubicSender) OnConnectionMigration() {
c.slowStartThreshold = c.initialMaxCongestionWindow c.slowStartThreshold = c.initialMaxCongestionWindow
c.maxCongestionWindow = c.initialMaxCongestionWindow c.maxCongestionWindow = c.initialMaxCongestionWindow
} }
func (c *cubicSender) maybeTraceStateChange(new logging.CongestionState) {
if c.tracer == nil || new == c.lastState {
return
}
c.tracer.UpdatedCongestionState(new)
}

View file

@ -40,7 +40,7 @@ var _ = Describe("Cubic Sender", func() {
ackedPacketNumber = 0 ackedPacketNumber = 0
clock = mockClock{} clock = mockClock{}
rttStats = utils.NewRTTStats() rttStats = utils.NewRTTStats()
sender = newCubicSender(&clock, rttStats, true /*reno*/, initialCongestionWindowPackets*maxDatagramSize, MaxCongestionWindow) sender = newCubicSender(&clock, rttStats, true /*reno*/, initialCongestionWindowPackets*maxDatagramSize, MaxCongestionWindow, nil)
}) })
SendAvailableSendWindowLen := func(packetLength protocol.ByteCount) int { SendAvailableSendWindowLen := func(packetLength protocol.ByteCount) int {
@ -308,7 +308,7 @@ var _ = Describe("Cubic Sender", func() {
It("tcp cubic reset epoch on quiescence", func() { It("tcp cubic reset epoch on quiescence", func() {
const maxCongestionWindow = 50 const maxCongestionWindow = 50
const maxCongestionWindowBytes = maxCongestionWindow * maxDatagramSize const maxCongestionWindowBytes = maxCongestionWindow * maxDatagramSize
sender = newCubicSender(&clock, rttStats, false, initialCongestionWindowPackets*maxDatagramSize, maxCongestionWindowBytes) sender = newCubicSender(&clock, rttStats, false, initialCongestionWindowPackets*maxDatagramSize, maxCongestionWindowBytes, nil)
numSent := SendAvailableSendWindow() numSent := SendAvailableSendWindow()
@ -448,7 +448,7 @@ var _ = Describe("Cubic Sender", func() {
}) })
It("default max cwnd", func() { It("default max cwnd", func() {
sender = newCubicSender(&clock, rttStats, true /*reno*/, initialCongestionWindowPackets*maxDatagramSize, maxCongestionWindow) sender = newCubicSender(&clock, rttStats, true /*reno*/, initialCongestionWindowPackets*maxDatagramSize, maxCongestionWindow, nil)
defaultMaxCongestionWindowPackets := maxCongestionWindow / maxDatagramSize defaultMaxCongestionWindowPackets := maxCongestionWindow / maxDatagramSize
for i := 1; i < int(defaultMaxCongestionWindowPackets); i++ { for i := 1; i < int(defaultMaxCongestionWindowPackets); i++ {
@ -460,7 +460,7 @@ var _ = Describe("Cubic Sender", func() {
It("limit cwnd increase in congestion avoidance", func() { It("limit cwnd increase in congestion avoidance", func() {
// Enable Cubic. // Enable Cubic.
sender = newCubicSender(&clock, rttStats, false, initialCongestionWindowPackets*maxDatagramSize, MaxCongestionWindow) sender = newCubicSender(&clock, rttStats, false, initialCongestionWindowPackets*maxDatagramSize, MaxCongestionWindow, nil)
numSent := SendAvailableSendWindow() numSent := SendAvailableSendWindow()
// Make sure we fall out of slow start. // Make sure we fall out of slow start.

View file

@ -5,16 +5,15 @@
package mocks package mocks
import ( import (
"net" net "net"
"reflect" reflect "reflect"
"time" time "time"
"github.com/lucas-clemente/quic-go/internal/utils" gomock "github.com/golang/mock/gomock"
protocol "github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/golang/mock/gomock" utils "github.com/lucas-clemente/quic-go/internal/utils"
"github.com/lucas-clemente/quic-go/internal/protocol" wire "github.com/lucas-clemente/quic-go/internal/wire"
"github.com/lucas-clemente/quic-go/internal/wire" logging "github.com/lucas-clemente/quic-go/logging"
"github.com/lucas-clemente/quic-go/logging"
) )
// MockConnectionTracer is a mock of ConnectionTracer interface // MockConnectionTracer is a mock of ConnectionTracer interface
@ -232,6 +231,18 @@ func (mr *MockConnectionTracerMockRecorder) StartedConnection(arg0, arg1, arg2,
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartedConnection", reflect.TypeOf((*MockConnectionTracer)(nil).StartedConnection), arg0, arg1, arg2, arg3, arg4) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartedConnection", reflect.TypeOf((*MockConnectionTracer)(nil).StartedConnection), arg0, arg1, arg2, arg3, arg4)
} }
// UpdatedCongestionState mocks base method
func (m *MockConnectionTracer) UpdatedCongestionState(arg0 logging.CongestionState) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "UpdatedCongestionState", arg0)
}
// UpdatedCongestionState indicates an expected call of UpdatedCongestionState
func (mr *MockConnectionTracerMockRecorder) UpdatedCongestionState(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdatedCongestionState", reflect.TypeOf((*MockConnectionTracer)(nil).UpdatedCongestionState), arg0)
}
// UpdatedKey mocks base method // UpdatedKey mocks base method
func (m *MockConnectionTracer) UpdatedKey(arg0 protocol.KeyPhase, arg1 bool) { func (m *MockConnectionTracer) UpdatedKey(arg0 protocol.KeyPhase, arg1 bool) {
m.ctrl.T.Helper() m.ctrl.T.Helper()

View file

@ -106,6 +106,7 @@ type ConnectionTracer interface {
DroppedPacket(PacketType, ByteCount, PacketDropReason) DroppedPacket(PacketType, ByteCount, PacketDropReason)
UpdatedMetrics(rttStats *RTTStats, cwnd, bytesInFlight ByteCount, packetsInFlight int) UpdatedMetrics(rttStats *RTTStats, cwnd, bytesInFlight ByteCount, packetsInFlight int)
LostPacket(EncryptionLevel, PacketNumber, PacketLossReason) LostPacket(EncryptionLevel, PacketNumber, PacketLossReason)
UpdatedCongestionState(CongestionState)
UpdatedPTOCount(value uint32) UpdatedPTOCount(value uint32)
UpdatedKeyFromTLS(EncryptionLevel, Perspective) UpdatedKeyFromTLS(EncryptionLevel, Perspective)
UpdatedKey(generation KeyPhase, remote bool) UpdatedKey(generation KeyPhase, remote bool)

View file

@ -5,14 +5,13 @@
package logging package logging
import ( import (
"net" gomock "github.com/golang/mock/gomock"
"reflect" protocol "github.com/lucas-clemente/quic-go/internal/protocol"
"time" utils "github.com/lucas-clemente/quic-go/internal/utils"
wire "github.com/lucas-clemente/quic-go/internal/wire"
"github.com/golang/mock/gomock" net "net"
"github.com/lucas-clemente/quic-go/internal/protocol" reflect "reflect"
"github.com/lucas-clemente/quic-go/internal/utils" time "time"
"github.com/lucas-clemente/quic-go/internal/wire"
) )
// MockConnectionTracer is a mock of ConnectionTracer interface // MockConnectionTracer is a mock of ConnectionTracer interface
@ -230,6 +229,18 @@ func (mr *MockConnectionTracerMockRecorder) StartedConnection(arg0, arg1, arg2,
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartedConnection", reflect.TypeOf((*MockConnectionTracer)(nil).StartedConnection), arg0, arg1, arg2, arg3, arg4) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartedConnection", reflect.TypeOf((*MockConnectionTracer)(nil).StartedConnection), arg0, arg1, arg2, arg3, arg4)
} }
// UpdatedCongestionState mocks base method
func (m *MockConnectionTracer) UpdatedCongestionState(arg0 CongestionState) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "UpdatedCongestionState", arg0)
}
// UpdatedCongestionState indicates an expected call of UpdatedCongestionState
func (mr *MockConnectionTracerMockRecorder) UpdatedCongestionState(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdatedCongestionState", reflect.TypeOf((*MockConnectionTracer)(nil).UpdatedCongestionState), arg0)
}
// UpdatedKey mocks base method // UpdatedKey mocks base method
func (m *MockConnectionTracer) UpdatedKey(arg0 protocol.KeyPhase, arg1 bool) { func (m *MockConnectionTracer) UpdatedKey(arg0 protocol.KeyPhase, arg1 bool) {
m.ctrl.T.Helper() m.ctrl.T.Helper()

View file

@ -120,6 +120,12 @@ func (m *connTracerMultiplexer) DroppedPacket(typ PacketType, size ByteCount, re
} }
} }
func (m *connTracerMultiplexer) UpdatedCongestionState(state CongestionState) {
for _, t := range m.tracers {
t.UpdatedCongestionState(state)
}
}
func (m *connTracerMultiplexer) UpdatedMetrics(rttStats *RTTStats, cwnd, bytesInFLight ByteCount, packetsInFlight int) { func (m *connTracerMultiplexer) UpdatedMetrics(rttStats *RTTStats, cwnd, bytesInFLight ByteCount, packetsInFlight int) {
for _, t := range m.tracers { for _, t := range m.tracers {
t.UpdatedMetrics(rttStats, cwnd, bytesInFLight, packetsInFlight) t.UpdatedMetrics(rttStats, cwnd, bytesInFLight, packetsInFlight)

View file

@ -169,6 +169,12 @@ var _ = Describe("Tracing", func() {
tracer.DroppedPacket(PacketTypeInitial, 1337, PacketDropHeaderParseError) tracer.DroppedPacket(PacketTypeInitial, 1337, PacketDropHeaderParseError)
}) })
It("traces the UpdatedCongestionState event", func() {
tr1.EXPECT().UpdatedCongestionState(CongestionStateRecovery)
tr2.EXPECT().UpdatedCongestionState(CongestionStateRecovery)
tracer.UpdatedCongestionState(CongestionStateRecovery)
})
It("traces the UpdatedMetrics event", func() { It("traces the UpdatedMetrics event", func() {
rttStats := &RTTStats{} rttStats := &RTTStats{}
rttStats.UpdateRTT(time.Second, 0, time.Now()) rttStats.UpdateRTT(time.Second, 0, time.Now())

View file

@ -81,3 +81,16 @@ const (
// This reason is not defined in the qlog draft, but very useful for debugging. // This reason is not defined in the qlog draft, but very useful for debugging.
TimeoutReasonIdle TimeoutReasonIdle
) )
type CongestionState uint8
const (
// CongestionStateSlowStart is the slow start phase of Reno / Cubic
CongestionStateSlowStart CongestionState = iota
// CongestionStateCongestionAvoidance is the slow start phase of Reno / Cubic
CongestionStateCongestionAvoidance
// CongestionStateCongestionAvoidance is the recovery phase of Reno / Cubic
CongestionStateRecovery
// CongestionStateApplicationLimited means that the congestion controller is application limited
CongestionStateApplicationLimited
)

View file

@ -172,6 +172,7 @@ func (t *connTracer) ReceivedPacket(*logging.ExtendedHeader, logging.ByteCount,
} }
func (t *connTracer) BufferedPacket(logging.PacketType) {} func (t *connTracer) BufferedPacket(logging.PacketType) {}
func (t *connTracer) DroppedPacket(logging.PacketType, logging.ByteCount, logging.PacketDropReason) {} func (t *connTracer) DroppedPacket(logging.PacketType, logging.ByteCount, logging.PacketDropReason) {}
func (t *connTracer) UpdatedCongestionState(logging.CongestionState) {}
func (t *connTracer) UpdatedMetrics(*logging.RTTStats, logging.ByteCount, logging.ByteCount, int) {} func (t *connTracer) UpdatedMetrics(*logging.RTTStats, logging.ByteCount, logging.ByteCount, int) {}
func (t *connTracer) LostPacket(encLevel logging.EncryptionLevel, _ logging.PacketNumber, reason logging.PacketLossReason) { func (t *connTracer) LostPacket(encLevel logging.EncryptionLevel, _ logging.PacketNumber, reason logging.PacketLossReason) {
stats.RecordWithTags( stats.RecordWithTags(

View file

@ -316,6 +316,8 @@ func (t *connectionTracer) LostPacket(encLevel protocol.EncryptionLevel, pn prot
t.mutex.Unlock() t.mutex.Unlock()
} }
func (t *connectionTracer) UpdatedCongestionState(logging.CongestionState) {}
func (t *connectionTracer) UpdatedPTOCount(value uint32) { func (t *connectionTracer) UpdatedPTOCount(value uint32) {
t.mutex.Lock() t.mutex.Lock()
t.recordEvent(time.Now(), &eventUpdatedPTO{Value: value}) t.recordEvent(time.Now(), &eventUpdatedPTO{Value: value})

View file

@ -91,6 +91,7 @@ var _ = Describe("Session", func() {
tracer = mocks.NewMockConnectionTracer(mockCtrl) tracer = mocks.NewMockConnectionTracer(mockCtrl)
tracer.EXPECT().SentTransportParameters(gomock.Any()) tracer.EXPECT().SentTransportParameters(gomock.Any())
tracer.EXPECT().UpdatedKeyFromTLS(gomock.Any(), gomock.Any()).AnyTimes() tracer.EXPECT().UpdatedKeyFromTLS(gomock.Any(), gomock.Any()).AnyTimes()
tracer.EXPECT().UpdatedCongestionState(gomock.Any())
sess = newSession( sess = newSession(
mconn, mconn,
sessionRunner, sessionRunner,
@ -2154,6 +2155,7 @@ var _ = Describe("Client Session", func() {
tracer = mocks.NewMockConnectionTracer(mockCtrl) tracer = mocks.NewMockConnectionTracer(mockCtrl)
tracer.EXPECT().SentTransportParameters(gomock.Any()) tracer.EXPECT().SentTransportParameters(gomock.Any())
tracer.EXPECT().UpdatedKeyFromTLS(gomock.Any(), gomock.Any()).AnyTimes() tracer.EXPECT().UpdatedKeyFromTLS(gomock.Any(), gomock.Any()).AnyTimes()
tracer.EXPECT().UpdatedCongestionState(gomock.Any())
sess = newClientSession( sess = newClientSession(
mconn, mconn,
sessionRunner, sessionRunner,