send MAX_{STREAM}_DATA frames more frequently

WINDOW_UPDATEs are relatively small, and it doesn't cost much to grant
the peer more flow control credit earlier.
This commit is contained in:
Marten Seemann 2017-12-08 18:12:49 +07:00
parent 9b4cdf66fc
commit eb3e253be2
5 changed files with 46 additions and 36 deletions

View file

@ -67,9 +67,9 @@ func (c *baseFlowController) AddBytesRead(n protocol.ByteCount) {
// getWindowUpdate updates the receive window, if necessary // getWindowUpdate updates the receive window, if necessary
// it returns the new offset // it returns the new offset
func (c *baseFlowController) getWindowUpdate() protocol.ByteCount { func (c *baseFlowController) getWindowUpdate() protocol.ByteCount {
diff := c.receiveWindow - c.bytesRead bytesRemaining := c.receiveWindow - c.bytesRead
// update the window when more than half of it was already consumed // update the window when more than the threshold was consumed
if diff >= (c.receiveWindowIncrement / 2) { if bytesRemaining >= protocol.ByteCount((float64(c.receiveWindowIncrement) * float64((1 - protocol.WindowUpdateThreshold)))) {
return 0 return 0
} }
@ -86,7 +86,8 @@ func (c *baseFlowController) IsBlocked() bool {
return c.sendWindowSize() == 0 return c.sendWindowSize() == 0
} }
// maybeAdjustWindowIncrement increases the receiveWindowIncrement if we're sending WindowUpdates too often // maybeAdjustWindowIncrement increases the receiveWindowIncrement if we're sending updates too often.
// For details about auto-tuning, see https://docs.google.com/document/d/1F2YfdDXKpy20WVKJueEf4abn_LVZHhMUMS5gX6Pgjl4/edit#heading=h.hcm2y5x4qmqt.
func (c *baseFlowController) maybeAdjustWindowIncrement() { func (c *baseFlowController) maybeAdjustWindowIncrement() {
if c.lastWindowUpdateTime.IsZero() { if c.lastWindowUpdateTime.IsZero() {
return return
@ -98,8 +99,8 @@ func (c *baseFlowController) maybeAdjustWindowIncrement() {
} }
timeSinceLastWindowUpdate := time.Since(c.lastWindowUpdateTime) timeSinceLastWindowUpdate := time.Since(c.lastWindowUpdateTime)
// interval between the window updates is sufficiently large, no need to increase the increment // interval between the updates is sufficiently large, no need to increase the increment
if timeSinceLastWindowUpdate >= 2*rtt { if timeSinceLastWindowUpdate >= 4*protocol.WindowUpdateThreshold*rtt {
return return
} }
c.receiveWindowIncrement = utils.MinByteCount(2*c.receiveWindowIncrement, c.maxReceiveWindowIncrement) c.receiveWindowIncrement = utils.MinByteCount(2*c.receiveWindowIncrement, c.maxReceiveWindowIncrement)

View file

@ -59,8 +59,10 @@ var _ = Describe("Base Flow controller", func() {
}) })
Context("receive flow control", func() { Context("receive flow control", func() {
var receiveWindow protocol.ByteCount = 10000 var (
var receiveWindowIncrement protocol.ByteCount = 600 receiveWindow protocol.ByteCount = 10000
receiveWindowIncrement protocol.ByteCount = 600
)
BeforeEach(func() { BeforeEach(func() {
controller.receiveWindow = receiveWindow controller.receiveWindow = receiveWindow
@ -75,7 +77,9 @@ var _ = Describe("Base Flow controller", func() {
It("triggers a window update when necessary", func() { It("triggers a window update when necessary", func() {
controller.lastWindowUpdateTime = time.Now().Add(-time.Hour) controller.lastWindowUpdateTime = time.Now().Add(-time.Hour)
readPosition := receiveWindow - receiveWindowIncrement/2 + 1 bytesConsumed := float64(receiveWindowIncrement)*protocol.WindowUpdateThreshold + 1 // consumed 1 byte more than the threshold
bytesRemaining := receiveWindowIncrement - protocol.ByteCount(bytesConsumed)
readPosition := receiveWindow - bytesRemaining
controller.bytesRead = readPosition controller.bytesRead = readPosition
offset := controller.getWindowUpdate() offset := controller.getWindowUpdate()
Expect(offset).To(Equal(readPosition + receiveWindowIncrement)) Expect(offset).To(Equal(readPosition + receiveWindowIncrement))
@ -86,7 +90,9 @@ var _ = Describe("Base Flow controller", func() {
It("doesn't trigger a window update when not necessary", func() { It("doesn't trigger a window update when not necessary", func() {
lastWindowUpdateTime := time.Now().Add(-time.Hour) lastWindowUpdateTime := time.Now().Add(-time.Hour)
controller.lastWindowUpdateTime = lastWindowUpdateTime controller.lastWindowUpdateTime = lastWindowUpdateTime
readPosition := receiveWindow - receiveWindow/2 - 1 bytesConsumed := float64(receiveWindowIncrement)*protocol.WindowUpdateThreshold - 1 // consumed 1 byte less than the threshold
bytesRemaining := receiveWindowIncrement - protocol.ByteCount(bytesConsumed)
readPosition := receiveWindow - bytesRemaining
controller.bytesRead = readPosition controller.bytesRead = readPosition
offset := controller.getWindowUpdate() offset := controller.getWindowUpdate()
Expect(offset).To(BeZero()) Expect(offset).To(BeZero())
@ -119,23 +125,31 @@ var _ = Describe("Base Flow controller", func() {
Expect(controller.receiveWindowIncrement).To(Equal(oldIncrement)) Expect(controller.receiveWindowIncrement).To(Equal(oldIncrement))
}) })
It("increases the increment when the last WindowUpdate was sent less than two RTTs ago", func() { It("increases the increment when the last WindowUpdate was sent less than (4 * threshold) RTTs ago", func() {
setRtt(20 * time.Millisecond) rtt := 20 * time.Millisecond
controller.lastWindowUpdateTime = time.Now().Add(-35 * time.Millisecond) setRtt(rtt)
controller.maybeAdjustWindowIncrement() controller.AddBytesRead(9900) // receive window is 10000
Expect(controller.receiveWindowIncrement).To(Equal(2 * oldIncrement)) controller.lastWindowUpdateTime = time.Now().Add(-4*protocol.WindowUpdateThreshold*rtt + time.Millisecond)
offset := controller.getWindowUpdate()
Expect(offset).ToNot(BeZero())
// check that the increment was increased
newIncrement := controller.receiveWindowIncrement
Expect(newIncrement).To(Equal(2 * oldIncrement))
// check that the new increment was used to increase the offset
Expect(offset).To(Equal(protocol.ByteCount(9900 + newIncrement)))
}) })
It("doesn't increase the increase increment when the last WindowUpdate was sent more than two RTTs ago", func() { It("doesn't increase the increase increment when the last WindowUpdate was sent more than (4 * threshold) RTTs ago", func() {
setRtt(20 * time.Millisecond) rtt := 20 * time.Millisecond
controller.lastWindowUpdateTime = time.Now().Add(-45 * time.Millisecond) setRtt(rtt)
controller.lastWindowUpdateTime = time.Now().Add(-4*protocol.WindowUpdateThreshold*rtt - time.Millisecond)
controller.maybeAdjustWindowIncrement() controller.maybeAdjustWindowIncrement()
Expect(controller.receiveWindowIncrement).To(Equal(oldIncrement)) Expect(controller.receiveWindowIncrement).To(Equal(oldIncrement))
}) })
It("doesn't increase the increment to a value higher than the maxReceiveWindowIncrement", func() { It("doesn't increase the increment to a value higher than the maxReceiveWindowIncrement", func() {
setRtt(20 * time.Millisecond) setRtt(20 * time.Millisecond)
controller.lastWindowUpdateTime = time.Now().Add(-35 * time.Millisecond) controller.lastWindowUpdateTime = time.Now().Add(-time.Millisecond)
controller.maybeAdjustWindowIncrement() controller.maybeAdjustWindowIncrement()
Expect(controller.receiveWindowIncrement).To(Equal(2 * oldIncrement)) // 1200 Expect(controller.receiveWindowIncrement).To(Equal(2 * oldIncrement)) // 1200
// because the lastWindowUpdateTime is updated by MaybeTriggerWindowUpdate(), we can just call maybeAdjustWindowIncrement() multiple times and get an increase of the increment every time // because the lastWindowUpdateTime is updated by MaybeTriggerWindowUpdate(), we can just call maybeAdjustWindowIncrement() multiple times and get an increase of the increment every time
@ -147,17 +161,6 @@ var _ = Describe("Base Flow controller", func() {
Expect(controller.receiveWindowIncrement).To(Equal(controller.maxReceiveWindowIncrement)) // 3000 Expect(controller.receiveWindowIncrement).To(Equal(controller.maxReceiveWindowIncrement)) // 3000
}) })
It("returns the new increment when updating the window", func() {
setRtt(20 * time.Millisecond)
controller.AddBytesRead(9900) // receive window is 10000
controller.lastWindowUpdateTime = time.Now().Add(-35 * time.Millisecond)
offset := controller.getWindowUpdate()
Expect(offset).ToNot(BeZero())
newIncrement := controller.receiveWindowIncrement
Expect(newIncrement).To(Equal(2 * oldIncrement))
Expect(offset).To(Equal(protocol.ByteCount(9900 + newIncrement)))
})
It("increases the increment sent in the first WindowUpdate, if data is read fast enough", func() { It("increases the increment sent in the first WindowUpdate, if data is read fast enough", func() {
setRtt(20 * time.Millisecond) setRtt(20 * time.Millisecond)
controller.AddBytesRead(9900) controller.AddBytesRead(9900)

View file

@ -58,8 +58,9 @@ var _ = Describe("Connection Flow controller", func() {
It("autotunes the window", func() { It("autotunes the window", func() {
controller.AddBytesRead(80) controller.AddBytesRead(80)
setRtt(20 * time.Millisecond) rtt := 20 * time.Millisecond
controller.lastWindowUpdateTime = time.Now().Add(-35 * time.Millisecond) setRtt(rtt)
controller.lastWindowUpdateTime = time.Now().Add(-4*protocol.WindowUpdateThreshold*rtt + time.Millisecond)
offset := controller.GetWindowUpdate() offset := controller.GetWindowUpdate()
Expect(offset).To(Equal(protocol.ByteCount(80 + 2*60))) Expect(offset).To(Equal(protocol.ByteCount(80 + 2*60)))
}) })

View file

@ -175,8 +175,9 @@ var _ = Describe("Stream Flow controller", func() {
It("tells the connection flow controller when the window was autotuned", func() { It("tells the connection flow controller when the window was autotuned", func() {
controller.contributesToConnection = true controller.contributesToConnection = true
controller.AddBytesRead(75) controller.AddBytesRead(75)
setRtt(20 * time.Millisecond) rtt := 20 * time.Millisecond
controller.lastWindowUpdateTime = time.Now().Add(-35 * time.Millisecond) setRtt(rtt)
controller.lastWindowUpdateTime = time.Now().Add(-4*protocol.WindowUpdateThreshold*rtt + time.Millisecond)
offset := controller.GetWindowUpdate() offset := controller.GetWindowUpdate()
Expect(offset).To(Equal(protocol.ByteCount(75 + 2*60))) Expect(offset).To(Equal(protocol.ByteCount(75 + 2*60)))
Expect(controller.receiveWindowIncrement).To(Equal(2 * oldIncrement)) Expect(controller.receiveWindowIncrement).To(Equal(2 * oldIncrement))
@ -186,8 +187,9 @@ var _ = Describe("Stream Flow controller", func() {
It("doesn't tell the connection flow controller if it doesn't contribute", func() { It("doesn't tell the connection flow controller if it doesn't contribute", func() {
controller.contributesToConnection = false controller.contributesToConnection = false
controller.AddBytesRead(75) controller.AddBytesRead(75)
setRtt(20 * time.Millisecond) rtt := 20 * time.Millisecond
controller.lastWindowUpdateTime = time.Now().Add(-35 * time.Millisecond) setRtt(rtt)
controller.lastWindowUpdateTime = time.Now().Add(-4*protocol.WindowUpdateThreshold*rtt + time.Millisecond)
offset := controller.GetWindowUpdate() offset := controller.GetWindowUpdate()
Expect(offset).ToNot(BeZero()) Expect(offset).ToNot(BeZero())
Expect(controller.receiveWindowIncrement).To(Equal(2 * oldIncrement)) Expect(controller.receiveWindowIncrement).To(Equal(2 * oldIncrement))

View file

@ -56,6 +56,9 @@ const DefaultMaxReceiveConnectionFlowControlWindowClient = 15 * (1 << 20) // 15
// This is the value that Chromium is using // This is the value that Chromium is using
const ConnectionFlowControlMultiplier = 1.5 const ConnectionFlowControlMultiplier = 1.5
// WindowUpdateThreshold is the fraction of the receive window that has to be consumed before an higher offset is advertised to the client
const WindowUpdateThreshold = 0.25
// MaxIncomingStreams is the maximum number of streams that a peer may open // MaxIncomingStreams is the maximum number of streams that a peer may open
const MaxIncomingStreams = 100 const MaxIncomingStreams = 100