diff --git a/internal/flowcontrol/base_flow_controller.go b/internal/flowcontrol/base_flow_controller.go index da6a7125..2bf14fdc 100644 --- a/internal/flowcontrol/base_flow_controller.go +++ b/internal/flowcontrol/base_flow_controller.go @@ -23,6 +23,8 @@ type baseFlowController struct { receiveWindowSize protocol.ByteCount maxReceiveWindowSize protocol.ByteCount + allowWindowIncrease func(size protocol.ByteCount) bool + epochStartTime time.Time epochStartOffset protocol.ByteCount rttStats *utils.RTTStats @@ -105,7 +107,10 @@ func (c *baseFlowController) maybeAdjustWindowSize() { now := time.Now() if now.Sub(c.epochStartTime) < time.Duration(4*fraction*float64(rtt)) { // window is consumed too fast, try to increase the window size - c.receiveWindowSize = utils.MinByteCount(2*c.receiveWindowSize, c.maxReceiveWindowSize) + newSize := utils.MinByteCount(2*c.receiveWindowSize, c.maxReceiveWindowSize) + if newSize > c.receiveWindowSize && (c.allowWindowIncrease == nil || c.allowWindowIncrease(newSize-c.receiveWindowSize)) { + c.receiveWindowSize = newSize + } } c.startNewAutoTuningEpoch(now) } diff --git a/internal/flowcontrol/connection_flow_controller.go b/internal/flowcontrol/connection_flow_controller.go index 609ca97c..6bf2241b 100644 --- a/internal/flowcontrol/connection_flow_controller.go +++ b/internal/flowcontrol/connection_flow_controller.go @@ -24,6 +24,7 @@ func NewConnectionFlowController( receiveWindow protocol.ByteCount, maxReceiveWindow protocol.ByteCount, queueWindowUpdate func(), + allowWindowIncrease func(size protocol.ByteCount) bool, rttStats *utils.RTTStats, logger utils.Logger, ) ConnectionFlowController { @@ -33,6 +34,7 @@ func NewConnectionFlowController( receiveWindow: receiveWindow, receiveWindowSize: receiveWindow, maxReceiveWindowSize: maxReceiveWindow, + allowWindowIncrease: allowWindowIncrease, logger: logger, }, queueWindowUpdate: queueWindowUpdate, @@ -85,13 +87,16 @@ func (c *connectionFlowController) EnsureMinimumWindowSize(inc protocol.ByteCoun c.mutex.Lock() if inc > c.receiveWindowSize { c.logger.Debugf("Increasing receive flow control window for the connection to %d kB, in response to stream flow control window increase", c.receiveWindowSize/(1<<10)) - c.receiveWindowSize = utils.MinByteCount(inc, c.maxReceiveWindowSize) + newSize := utils.MinByteCount(inc, c.maxReceiveWindowSize) + if delta := newSize - c.receiveWindowSize; delta > 0 && c.allowWindowIncrease(delta) { + c.receiveWindowSize = newSize + } c.startNewAutoTuningEpoch(time.Now()) } c.mutex.Unlock() } -// The flow controller is reset when 0-RTT is rejected. +// Reset rests the flow controller. This happens when 0-RTT is rejected. // All stream data is invalidated, it's if we had never opened a stream and never sent any data. // At that point, we only have sent stream data, but we didn't have the keys to open 1-RTT keys yet. func (c *connectionFlowController) Reset() error { diff --git a/internal/flowcontrol/connection_flow_controller_test.go b/internal/flowcontrol/connection_flow_controller_test.go index 936f129c..a874e1b7 100644 --- a/internal/flowcontrol/connection_flow_controller_test.go +++ b/internal/flowcontrol/connection_flow_controller_test.go @@ -27,6 +27,7 @@ var _ = Describe("Connection Flow controller", func() { controller.rttStats = &utils.RTTStats{} controller.logger = utils.DefaultLogger controller.queueWindowUpdate = func() { queuedWindowUpdate = true } + controller.allowWindowIncrease = func(protocol.ByteCount) bool { return true } }) Context("Constructor", func() { @@ -36,7 +37,13 @@ var _ = Describe("Connection Flow controller", func() { receiveWindow := protocol.ByteCount(2000) maxReceiveWindow := protocol.ByteCount(3000) - fc := NewConnectionFlowController(receiveWindow, maxReceiveWindow, nil, rttStats, utils.DefaultLogger).(*connectionFlowController) + fc := NewConnectionFlowController( + receiveWindow, + maxReceiveWindow, + nil, + func(protocol.ByteCount) bool { return true }, + rttStats, + utils.DefaultLogger).(*connectionFlowController) Expect(fc.receiveWindow).To(Equal(receiveWindow)) Expect(fc.maxReceiveWindowSize).To(Equal(maxReceiveWindow)) }) @@ -78,6 +85,11 @@ var _ = Describe("Connection Flow controller", func() { }) It("auto-tunes the window", func() { + var allowed protocol.ByteCount + controller.allowWindowIncrease = func(size protocol.ByteCount) bool { + allowed = size + return true + } oldOffset := controller.bytesRead oldWindowSize := controller.receiveWindowSize rtt := scaleDuration(20 * time.Millisecond) @@ -90,6 +102,23 @@ var _ = Describe("Connection Flow controller", func() { newWindowSize := controller.receiveWindowSize Expect(newWindowSize).To(Equal(2 * oldWindowSize)) Expect(offset).To(Equal(oldOffset + dataRead + newWindowSize)) + Expect(allowed).To(Equal(oldWindowSize)) + }) + + It("doesn't auto-tune the window if it's not allowed", func() { + controller.allowWindowIncrease = func(protocol.ByteCount) bool { return false } + oldOffset := controller.bytesRead + oldWindowSize := controller.receiveWindowSize + rtt := scaleDuration(20 * time.Millisecond) + setRtt(rtt) + controller.epochStartTime = time.Now().Add(-time.Millisecond) + controller.epochStartOffset = oldOffset + dataRead := oldWindowSize/2 + 1 + controller.AddBytesRead(dataRead) + offset := controller.GetWindowUpdate() + newWindowSize := controller.receiveWindowSize + Expect(newWindowSize).To(Equal(oldWindowSize)) + Expect(offset).To(Equal(oldOffset + dataRead + newWindowSize)) }) }) }) diff --git a/internal/flowcontrol/stream_flow_controller_test.go b/internal/flowcontrol/stream_flow_controller_test.go index 72e5843e..99228a25 100644 --- a/internal/flowcontrol/stream_flow_controller_test.go +++ b/internal/flowcontrol/stream_flow_controller_test.go @@ -21,8 +21,15 @@ var _ = Describe("Stream Flow controller", func() { queuedWindowUpdate = false rttStats := &utils.RTTStats{} controller = &streamFlowController{ - streamID: 10, - connection: NewConnectionFlowController(1000, 1000, func() {}, rttStats, utils.DefaultLogger).(*connectionFlowController), + streamID: 10, + connection: NewConnectionFlowController( + 1000, + 1000, + func() {}, + func(protocol.ByteCount) bool { return true }, + rttStats, + utils.DefaultLogger, + ).(*connectionFlowController), } controller.maxReceiveWindowSize = 10000 controller.rttStats = rttStats @@ -37,7 +44,7 @@ var _ = Describe("Stream Flow controller", func() { const sendWindow protocol.ByteCount = 4000 It("sets the send and receive windows", func() { - cc := NewConnectionFlowController(0, 0, nil, nil, utils.DefaultLogger) + cc := NewConnectionFlowController(0, 0, nil, func(protocol.ByteCount) bool { return true }, nil, utils.DefaultLogger) fc := NewStreamFlowController(5, cc, receiveWindow, maxReceiveWindow, sendWindow, nil, rttStats, utils.DefaultLogger).(*streamFlowController) Expect(fc.streamID).To(Equal(protocol.StreamID(5))) Expect(fc.receiveWindow).To(Equal(receiveWindow)) @@ -52,7 +59,7 @@ var _ = Describe("Stream Flow controller", func() { queued = true } - cc := NewConnectionFlowController(receiveWindow, maxReceiveWindow, func() {}, nil, utils.DefaultLogger) + cc := NewConnectionFlowController(receiveWindow, maxReceiveWindow, func() {}, func(protocol.ByteCount) bool { return true }, nil, utils.DefaultLogger) fc := NewStreamFlowController(5, cc, receiveWindow, maxReceiveWindow, sendWindow, queueWindowUpdate, rttStats, utils.DefaultLogger).(*streamFlowController) fc.AddBytesRead(receiveWindow) Expect(queued).To(BeTrue()) @@ -190,6 +197,11 @@ var _ = Describe("Stream Flow controller", func() { }) It("tells the connection flow controller when the window was auto-tuned", func() { + var allowed protocol.ByteCount + controller.connection.(*connectionFlowController).allowWindowIncrease = func(size protocol.ByteCount) bool { + allowed = size + return true + } oldOffset := controller.bytesRead setRtt(scaleDuration(20 * time.Millisecond)) controller.epochStartOffset = oldOffset @@ -198,9 +210,24 @@ var _ = Describe("Stream Flow controller", func() { offset := controller.GetWindowUpdate() Expect(offset).To(Equal(oldOffset + 55 + 2*oldWindowSize)) Expect(controller.receiveWindowSize).To(Equal(2 * oldWindowSize)) + Expect(allowed).To(Equal(oldWindowSize)) Expect(controller.connection.(*connectionFlowController).receiveWindowSize).To(Equal(protocol.ByteCount(float64(controller.receiveWindowSize) * protocol.ConnectionFlowControlMultiplier))) }) + It("doesn't increase the connection flow control window if it's not allowed", func() { + oldOffset := controller.bytesRead + oldConnectionSize := controller.connection.(*connectionFlowController).receiveWindowSize + controller.connection.(*connectionFlowController).allowWindowIncrease = func(protocol.ByteCount) bool { return false } + setRtt(scaleDuration(20 * time.Millisecond)) + controller.epochStartOffset = oldOffset + controller.epochStartTime = time.Now().Add(-time.Millisecond) + controller.AddBytesRead(55) + offset := controller.GetWindowUpdate() + Expect(offset).To(Equal(oldOffset + 55 + 2*oldWindowSize)) + Expect(controller.receiveWindowSize).To(Equal(2 * oldWindowSize)) + Expect(controller.connection.(*connectionFlowController).receiveWindowSize).To(Equal(oldConnectionSize)) + }) + It("sends a connection-level window update when a large stream is abandoned", func() { Expect(controller.UpdateHighestReceived(90, true)).To(Succeed()) Expect(controller.connection.GetWindowUpdate()).To(BeZero()) diff --git a/session.go b/session.go index 713f15fc..c2f6e65e 100644 --- a/session.go +++ b/session.go @@ -506,6 +506,7 @@ func (s *session) preSetup() { protocol.ByteCount(s.config.InitialConnectionReceiveWindow), protocol.ByteCount(s.config.MaxConnectionReceiveWindow), s.onHasConnectionWindowUpdate, + func(protocol.ByteCount) bool { return true }, s.rttStats, s.logger, )