rewrite flow control auto-tuning

This commit is contained in:
Marten Seemann 2017-12-21 15:29:47 +07:00
parent ac05343b00
commit f7526b9883
5 changed files with 118 additions and 78 deletions

View file

@ -22,7 +22,9 @@ type baseFlowController struct {
receiveWindow protocol.ByteCount receiveWindow protocol.ByteCount
receiveWindowSize protocol.ByteCount receiveWindowSize protocol.ByteCount
maxReceiveWindowSize protocol.ByteCount maxReceiveWindowSize protocol.ByteCount
lastWindowUpdateTime time.Time
epochStartTime time.Time
epochStartOffset protocol.ByteCount
rttStats *congestion.RTTStats rttStats *congestion.RTTStats
} }
@ -53,7 +55,7 @@ func (c *baseFlowController) AddBytesRead(n protocol.ByteCount) {
// pretend we sent a WindowUpdate when reading the first byte // pretend we sent a WindowUpdate when reading the first byte
// this way auto-tuning of the window size already works for the first WindowUpdate // this way auto-tuning of the window size already works for the first WindowUpdate
if c.bytesRead == 0 { if c.bytesRead == 0 {
c.lastWindowUpdateTime = time.Now() c.startNewAutoTuningEpoch()
} }
c.bytesRead += n c.bytesRead += n
} }
@ -69,7 +71,6 @@ func (c *baseFlowController) getWindowUpdate() protocol.ByteCount {
c.maybeAdjustWindowSize() c.maybeAdjustWindowSize()
c.receiveWindow = c.bytesRead + c.receiveWindowSize c.receiveWindow = c.bytesRead + c.receiveWindowSize
c.lastWindowUpdateTime = time.Now()
return c.receiveWindow return c.receiveWindow
} }
@ -85,24 +86,30 @@ func (c *baseFlowController) IsNewlyBlocked() (bool, protocol.ByteCount) {
} }
// maybeAdjustWindowSize increases the receiveWindowSize if we're sending updates too often. // maybeAdjustWindowSize increases the receiveWindowSize if we're sending updates too often.
// For details about auto-tuning, see https://docs.google.com/document/d/1F2YfdDXKpy20WVKJueEf4abn_LVZHhMUMS5gX6Pgjl4/edit#heading=h.hcm2y5x4qmqt. // For details about auto-tuning, see https://docs.google.com/document/d/1SExkMmGiz8VYzV3s9E35JQlJ73vhzCekKkDi85F1qCE/edit?usp=sharing.
func (c *baseFlowController) maybeAdjustWindowSize() { func (c *baseFlowController) maybeAdjustWindowSize() {
if c.lastWindowUpdateTime.IsZero() { bytesReadInEpoch := c.bytesRead - c.epochStartOffset
// don't do anything if less than half the window has been consumed
if bytesReadInEpoch <= c.receiveWindowSize/2 {
return return
} }
rtt := c.rttStats.SmoothedRTT() rtt := c.rttStats.SmoothedRTT()
if rtt == 0 { if rtt == 0 {
return return
} }
timeSinceLastWindowUpdate := time.Since(c.lastWindowUpdateTime) fraction := float64(bytesReadInEpoch) / float64(c.receiveWindowSize)
// interval between the updates is sufficiently large, no need to increase the window size if time.Since(c.epochStartTime) < time.Duration(4*fraction*float64(rtt)) {
if timeSinceLastWindowUpdate >= 4*protocol.WindowUpdateThreshold*rtt { // window is consumed too fast, try to increase the window size
return
}
c.receiveWindowSize = utils.MinByteCount(2*c.receiveWindowSize, c.maxReceiveWindowSize) c.receiveWindowSize = utils.MinByteCount(2*c.receiveWindowSize, c.maxReceiveWindowSize)
} }
c.startNewAutoTuningEpoch()
}
func (c *baseFlowController) startNewAutoTuningEpoch() {
c.epochStartTime = time.Now()
c.epochStartOffset = c.bytesRead
}
func (c *baseFlowController) checkFlowControlViolation() bool { func (c *baseFlowController) checkFlowControlViolation() bool {
return c.highestReceived > c.receiveWindow return c.highestReceived > c.receiveWindow

View file

@ -77,10 +77,11 @@ var _ = Describe("Base Flow controller", func() {
Context("receive flow control", func() { Context("receive flow control", func() {
var ( var (
receiveWindow protocol.ByteCount = 10000 receiveWindow protocol.ByteCount = 10000
receiveWindowSize protocol.ByteCount = 600 receiveWindowSize protocol.ByteCount = 1000
) )
BeforeEach(func() { BeforeEach(func() {
controller.bytesRead = receiveWindow - receiveWindowSize
controller.receiveWindow = receiveWindow controller.receiveWindow = receiveWindow
controller.receiveWindowSize = receiveWindowSize controller.receiveWindowSize = receiveWindowSize
}) })
@ -92,7 +93,6 @@ var _ = Describe("Base Flow controller", func() {
}) })
It("triggers a window update when necessary", func() { It("triggers a window update when necessary", func() {
controller.lastWindowUpdateTime = time.Now().Add(-time.Hour)
bytesConsumed := float64(receiveWindowSize)*protocol.WindowUpdateThreshold + 1 // consumed 1 byte more than the threshold bytesConsumed := float64(receiveWindowSize)*protocol.WindowUpdateThreshold + 1 // consumed 1 byte more than the threshold
bytesRemaining := receiveWindowSize - protocol.ByteCount(bytesConsumed) bytesRemaining := receiveWindowSize - protocol.ByteCount(bytesConsumed)
readPosition := receiveWindow - bytesRemaining readPosition := receiveWindow - bytesRemaining
@ -100,19 +100,15 @@ var _ = Describe("Base Flow controller", func() {
offset := controller.getWindowUpdate() offset := controller.getWindowUpdate()
Expect(offset).To(Equal(readPosition + receiveWindowSize)) Expect(offset).To(Equal(readPosition + receiveWindowSize))
Expect(controller.receiveWindow).To(Equal(readPosition + receiveWindowSize)) Expect(controller.receiveWindow).To(Equal(readPosition + receiveWindowSize))
Expect(controller.lastWindowUpdateTime).To(BeTemporally("~", time.Now(), 20*time.Millisecond))
}) })
It("doesn't trigger a window update when not necessary", func() { It("doesn't trigger a window update when not necessary", func() {
lastWindowUpdateTime := time.Now().Add(-time.Hour)
controller.lastWindowUpdateTime = lastWindowUpdateTime
bytesConsumed := float64(receiveWindowSize)*protocol.WindowUpdateThreshold - 1 // consumed 1 byte less than the threshold bytesConsumed := float64(receiveWindowSize)*protocol.WindowUpdateThreshold - 1 // consumed 1 byte less than the threshold
bytesRemaining := receiveWindowSize - protocol.ByteCount(bytesConsumed) bytesRemaining := receiveWindowSize - protocol.ByteCount(bytesConsumed)
readPosition := receiveWindow - bytesRemaining readPosition := receiveWindow - bytesRemaining
controller.bytesRead = readPosition controller.bytesRead = readPosition
offset := controller.getWindowUpdate() offset := controller.getWindowUpdate()
Expect(offset).To(BeZero()) Expect(offset).To(BeZero())
Expect(controller.lastWindowUpdateTime).To(Equal(lastWindowUpdateTime))
}) })
Context("receive window size auto-tuning", func() { Context("receive window size auto-tuning", func() {
@ -120,7 +116,7 @@ var _ = Describe("Base Flow controller", func() {
BeforeEach(func() { BeforeEach(func() {
oldWindowSize = controller.receiveWindowSize oldWindowSize = controller.receiveWindowSize
controller.maxReceiveWindowSize = 3000 controller.maxReceiveWindowSize = 5000
}) })
// update the congestion such that it returns a given value for the smoothed RTT // update the congestion such that it returns a given value for the smoothed RTT
@ -136,62 +132,91 @@ var _ = Describe("Base Flow controller", func() {
It("doesn't increase the window size when no RTT estimate is available", func() { It("doesn't increase the window size when no RTT estimate is available", func() {
setRtt(0) setRtt(0)
controller.lastWindowUpdateTime = time.Now() controller.startNewAutoTuningEpoch()
controller.maybeAdjustWindowSize() controller.AddBytesRead(400)
offset := controller.getWindowUpdate()
Expect(offset).ToNot(BeZero()) // make sure a window update is sent
Expect(controller.receiveWindowSize).To(Equal(oldWindowSize)) Expect(controller.receiveWindowSize).To(Equal(oldWindowSize))
}) })
It("increases the window size when the last WindowUpdate was sent less than (4 * threshold) RTTs ago", func() { It("increases the window size if read so fast that the window would be consumed in less than 4 RTTs", func() {
bytesRead := controller.bytesRead
rtt := 20 * time.Millisecond rtt := 20 * time.Millisecond
setRtt(rtt) setRtt(rtt)
controller.AddBytesRead(9900) // receive window is 10000 // consume more than 2/3 of the window...
controller.lastWindowUpdateTime = time.Now().Add(-4*protocol.WindowUpdateThreshold*rtt + time.Millisecond) dataRead := receiveWindowSize*2/3 + 1
// ... in 4*2/3 of the RTT
controller.epochStartOffset = controller.bytesRead
controller.epochStartTime = time.Now().Add(-rtt * 4 * 2 / 3)
controller.AddBytesRead(dataRead)
offset := controller.getWindowUpdate() offset := controller.getWindowUpdate()
Expect(offset).ToNot(BeZero()) Expect(offset).ToNot(BeZero())
// check that the window size was increased // check that the window size was increased
newWindowSize := controller.receiveWindowSize newWindowSize := controller.receiveWindowSize
Expect(newWindowSize).To(Equal(2 * oldWindowSize)) Expect(newWindowSize).To(Equal(2 * oldWindowSize))
// check that the new window size was used to increase the offset // check that the new window size was used to increase the offset
Expect(offset).To(Equal(protocol.ByteCount(9900 + newWindowSize))) Expect(offset).To(Equal(protocol.ByteCount(bytesRead + dataRead + newWindowSize)))
}) })
It("doesn't increase the increase window size when the last WindowUpdate was sent more than (4 * threshold) RTTs ago", func() { It("doesn't increase the window size if data is read so fast that the window would be consumed in less than 4 RTTs, but less than half the window has been read", func() {
// this test only makes sense if a window update is triggered before half of the window has been consumed
Expect(protocol.WindowUpdateThreshold).To(BeNumerically(">", 1/3))
bytesRead := controller.bytesRead
rtt := 20 * time.Millisecond rtt := 20 * time.Millisecond
setRtt(rtt) setRtt(rtt)
controller.lastWindowUpdateTime = time.Now().Add(-4*protocol.WindowUpdateThreshold*rtt - time.Millisecond) // consume more than 2/3 of the window...
controller.maybeAdjustWindowSize() dataRead := receiveWindowSize*1/3 + 1
// ... in 4*2/3 of the RTT
controller.epochStartOffset = controller.bytesRead
controller.epochStartTime = time.Now().Add(-rtt * 4 * 1 / 3)
controller.AddBytesRead(dataRead)
offset := controller.getWindowUpdate()
Expect(offset).ToNot(BeZero())
// check that the window size was not increased
newWindowSize := controller.receiveWindowSize
Expect(newWindowSize).To(Equal(oldWindowSize))
// check that the new window size was used to increase the offset
Expect(offset).To(Equal(protocol.ByteCount(bytesRead + dataRead + newWindowSize)))
})
It("doesn't increase the window size if read too slowly", func() {
bytesRead := controller.bytesRead
rtt := 20 * time.Millisecond
setRtt(rtt)
// consume less than 2/3 of the window...
dataRead := receiveWindowSize*2/3 - 1
// ... in 4*2/3 of the RTT
controller.epochStartOffset = controller.bytesRead
controller.epochStartTime = time.Now().Add(-rtt * 4 * 2 / 3)
controller.AddBytesRead(dataRead)
offset := controller.getWindowUpdate()
Expect(offset).ToNot(BeZero())
// check that the window size was not increased
Expect(controller.receiveWindowSize).To(Equal(oldWindowSize)) Expect(controller.receiveWindowSize).To(Equal(oldWindowSize))
// check that the new window size was used to increase the offset
Expect(offset).To(Equal(protocol.ByteCount(bytesRead + dataRead + oldWindowSize)))
}) })
It("doesn't increase the window size to a value higher than the maxReceiveWindowSize", func() { It("doesn't increase the window size to a value higher than the maxReceiveWindowSize", func() {
resetEpoch := func() {
// make sure the next call to maybeAdjustWindowSize will increase the window
controller.epochStartTime = time.Now().Add(-time.Millisecond)
controller.epochStartOffset = controller.bytesRead
controller.AddBytesRead(controller.receiveWindowSize/2 + 1)
}
setRtt(20 * time.Millisecond) setRtt(20 * time.Millisecond)
controller.lastWindowUpdateTime = time.Now().Add(-time.Millisecond) resetEpoch()
controller.maybeAdjustWindowSize() controller.maybeAdjustWindowSize()
Expect(controller.receiveWindowSize).To(Equal(2 * oldWindowSize)) // 1200 Expect(controller.receiveWindowSize).To(Equal(2 * oldWindowSize)) // 2000
// because the lastWindowUpdateTime is updated by MaybeTriggerWindowUpdate(), we can just call maybeAdjustWindowSize() multiple times and get an increase of the window size every time // because the lastWindowUpdateTime is updated by MaybeTriggerWindowUpdate(), we can just call maybeAdjustWindowSize() multiple times and get an increase of the window size every time
resetEpoch()
controller.maybeAdjustWindowSize() controller.maybeAdjustWindowSize()
Expect(controller.receiveWindowSize).To(Equal(2 * 2 * oldWindowSize)) // 2400 Expect(controller.receiveWindowSize).To(Equal(2 * 2 * oldWindowSize)) // 4000
resetEpoch()
controller.maybeAdjustWindowSize() controller.maybeAdjustWindowSize()
Expect(controller.receiveWindowSize).To(Equal(controller.maxReceiveWindowSize)) // 3000 Expect(controller.receiveWindowSize).To(Equal(controller.maxReceiveWindowSize)) // 5000
controller.maybeAdjustWindowSize() controller.maybeAdjustWindowSize()
Expect(controller.receiveWindowSize).To(Equal(controller.maxReceiveWindowSize)) // 3000 Expect(controller.receiveWindowSize).To(Equal(controller.maxReceiveWindowSize)) // 5000
})
It("increases the window size sent in the first WindowUpdate, if data is read fast enough", func() {
setRtt(20 * time.Millisecond)
controller.AddBytesRead(9900)
offset := controller.getWindowUpdate()
Expect(offset).ToNot(BeZero())
Expect(controller.receiveWindowSize).To(Equal(2 * oldWindowSize))
})
It("doesn't increase the window size sent in the first WindowUpdate, if data is read slowly", func() {
setRtt(5 * time.Millisecond)
controller.AddBytesRead(9900)
time.Sleep(15 * time.Millisecond) // more than 2x RTT
offset := controller.getWindowUpdate()
Expect(offset).ToNot(BeZero())
Expect(controller.receiveWindowSize).To(Equal(oldWindowSize))
}) })
}) })
}) })

View file

@ -2,7 +2,6 @@ package flowcontrol
import ( import (
"fmt" "fmt"
"time"
"github.com/lucas-clemente/quic-go/congestion" "github.com/lucas-clemente/quic-go/congestion"
"github.com/lucas-clemente/quic-go/internal/protocol" "github.com/lucas-clemente/quic-go/internal/protocol"
@ -66,7 +65,7 @@ func (c *connectionFlowController) EnsureMinimumWindowSize(inc protocol.ByteCoun
c.mutex.Lock() c.mutex.Lock()
if inc > c.receiveWindowSize { if inc > c.receiveWindowSize {
c.receiveWindowSize = utils.MinByteCount(inc, c.maxReceiveWindowSize) c.receiveWindowSize = utils.MinByteCount(inc, c.maxReceiveWindowSize)
c.lastWindowUpdateTime = time.Time{} // disables autotuning for the next window update c.startNewAutoTuningEpoch()
} }
c.mutex.Unlock() c.mutex.Unlock()
} }

View file

@ -48,21 +48,31 @@ var _ = Describe("Connection Flow controller", func() {
controller.receiveWindow = 100 controller.receiveWindow = 100
controller.receiveWindowSize = 60 controller.receiveWindowSize = 60
controller.maxReceiveWindowSize = 1000 controller.maxReceiveWindowSize = 1000
controller.bytesRead = 100 - 60
}) })
It("gets a window update", func() { It("gets a window update", func() {
controller.AddBytesRead(80) windowSize := controller.receiveWindowSize
oldOffset := controller.bytesRead
dataRead := windowSize/2 - 1 // make sure not to trigger auto-tuning
controller.AddBytesRead(dataRead)
offset := controller.GetWindowUpdate() offset := controller.GetWindowUpdate()
Expect(offset).To(Equal(protocol.ByteCount(80 + 60))) Expect(offset).To(Equal(protocol.ByteCount(oldOffset + dataRead + 60)))
}) })
It("autotunes the window", func() { It("autotunes the window", func() {
controller.AddBytesRead(80) oldOffset := controller.bytesRead
oldWindowSize := controller.receiveWindowSize
rtt := 20 * time.Millisecond rtt := 20 * time.Millisecond
setRtt(rtt) setRtt(rtt)
controller.lastWindowUpdateTime = time.Now().Add(-4*protocol.WindowUpdateThreshold*rtt + time.Millisecond) controller.epochStartTime = time.Now().Add(-time.Millisecond)
controller.epochStartOffset = oldOffset
dataRead := oldWindowSize/2 + 1
controller.AddBytesRead(dataRead)
offset := controller.GetWindowUpdate() offset := controller.GetWindowUpdate()
Expect(offset).To(Equal(protocol.ByteCount(80 + 2*60))) newWindowSize := controller.receiveWindowSize
Expect(newWindowSize).To(Equal(2 * oldWindowSize))
Expect(offset).To(Equal(protocol.ByteCount(oldOffset + dataRead + newWindowSize)))
}) })
}) })
}) })
@ -71,10 +81,11 @@ var _ = Describe("Connection Flow controller", func() {
var ( var (
oldWindowSize protocol.ByteCount oldWindowSize protocol.ByteCount
receiveWindow protocol.ByteCount = 10000 receiveWindow protocol.ByteCount = 10000
receiveWindowSize protocol.ByteCount = 600 receiveWindowSize protocol.ByteCount = 1000
) )
BeforeEach(func() { BeforeEach(func() {
controller.bytesRead = receiveWindowSize - receiveWindowSize
controller.receiveWindow = receiveWindow controller.receiveWindow = receiveWindow
controller.receiveWindowSize = receiveWindowSize controller.receiveWindowSize = receiveWindowSize
oldWindowSize = controller.receiveWindowSize oldWindowSize = controller.receiveWindowSize
@ -82,8 +93,8 @@ var _ = Describe("Connection Flow controller", func() {
}) })
It("sets the minimum window window size", func() { It("sets the minimum window window size", func() {
controller.EnsureMinimumWindowSize(1000) controller.EnsureMinimumWindowSize(1800)
Expect(controller.receiveWindowSize).To(Equal(protocol.ByteCount(1000))) Expect(controller.receiveWindowSize).To(Equal(protocol.ByteCount(1800)))
}) })
It("doesn't reduce the window window size", func() { It("doesn't reduce the window window size", func() {
@ -97,14 +108,9 @@ var _ = Describe("Connection Flow controller", func() {
Expect(controller.receiveWindowSize).To(Equal(max)) Expect(controller.receiveWindowSize).To(Equal(max))
}) })
It("doesn't auto-tune the window after the window size was increased", func() { It("starts a new epoch after the window size was increased", func() {
setRtt(20 * time.Millisecond) controller.EnsureMinimumWindowSize(1912)
controller.bytesRead = 9900 // receive window is 10000 Expect(controller.epochStartTime).To(BeTemporally("~", time.Now(), 100*time.Millisecond))
controller.lastWindowUpdateTime = time.Now().Add(-20 * time.Millisecond)
controller.EnsureMinimumWindowSize(912)
offset := controller.getWindowUpdate()
Expect(controller.receiveWindowSize).To(Equal(protocol.ByteCount(912))) // no auto-tuning
Expect(offset).To(Equal(protocol.ByteCount(9900 + 912)))
}) })
}) })
}) })

View file

@ -168,32 +168,35 @@ var _ = Describe("Stream Flow controller", func() {
BeforeEach(func() { BeforeEach(func() {
controller.receiveWindow = 100 controller.receiveWindow = 100
controller.receiveWindowSize = 60 controller.receiveWindowSize = 60
controller.bytesRead = 100 - 60
controller.connection.(*connectionFlowController).receiveWindowSize = 120 controller.connection.(*connectionFlowController).receiveWindowSize = 120
oldWindowSize = controller.receiveWindowSize oldWindowSize = controller.receiveWindowSize
}) })
It("tells the connection flow controller when the window was autotuned", func() { It("tells the connection flow controller when the window was autotuned", func() {
oldOffset := controller.bytesRead
controller.contributesToConnection = true controller.contributesToConnection = true
controller.AddBytesRead(75) setRtt(20 * time.Millisecond)
rtt := 20 * time.Millisecond controller.epochStartOffset = oldOffset
setRtt(rtt) controller.epochStartTime = time.Now().Add(-time.Millisecond)
controller.lastWindowUpdateTime = time.Now().Add(-4*protocol.WindowUpdateThreshold*rtt + time.Millisecond) controller.AddBytesRead(55)
offset := controller.GetWindowUpdate() offset := controller.GetWindowUpdate()
Expect(offset).To(Equal(protocol.ByteCount(75 + 2*60))) Expect(offset).To(Equal(protocol.ByteCount(oldOffset + 55 + 2*oldWindowSize)))
Expect(controller.receiveWindowSize).To(Equal(2 * oldWindowSize)) Expect(controller.receiveWindowSize).To(Equal(2 * oldWindowSize))
Expect(controller.connection.(*connectionFlowController).receiveWindowSize).To(Equal(protocol.ByteCount(float64(controller.receiveWindowSize) * protocol.ConnectionFlowControlMultiplier))) Expect(controller.connection.(*connectionFlowController).receiveWindowSize).To(Equal(protocol.ByteCount(float64(controller.receiveWindowSize) * protocol.ConnectionFlowControlMultiplier)))
}) })
It("doesn't tell the connection flow controller if it doesn't contribute", func() { It("doesn't tell the connection flow controller if it doesn't contribute", func() {
oldOffset := controller.bytesRead
controller.contributesToConnection = false controller.contributesToConnection = false
controller.AddBytesRead(75) setRtt(20 * time.Millisecond)
rtt := 20 * time.Millisecond controller.epochStartOffset = oldOffset
setRtt(rtt) controller.epochStartTime = time.Now().Add(-time.Millisecond)
controller.lastWindowUpdateTime = time.Now().Add(-4*protocol.WindowUpdateThreshold*rtt + time.Millisecond) controller.AddBytesRead(55)
offset := controller.GetWindowUpdate() offset := controller.GetWindowUpdate()
Expect(offset).ToNot(BeZero()) Expect(offset).ToNot(BeZero())
Expect(controller.receiveWindowSize).To(Equal(2 * oldWindowSize)) Expect(controller.receiveWindowSize).To(Equal(2 * oldWindowSize))
Expect(controller.connection.(*connectionFlowController).receiveWindowSize).To(Equal(protocol.ByteCount(120))) // unchanged Expect(controller.connection.(*connectionFlowController).receiveWindowSize).To(Equal(protocol.ByteCount(2 * oldWindowSize))) // unchanged
}) })
It("doesn't increase the window after a final offset was already received", func() { It("doesn't increase the window after a final offset was already received", func() {