From c53a83535e87c677f83c9bb3f98247e05744b9cb Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Mon, 16 Oct 2017 17:37:11 +0700 Subject: [PATCH] split the flow controller in stream and connection flow controller --- internal/flowcontrol/base_flow_controller.go | 107 +++++++++++ ...r_test.go => base_flow_controller_test.go} | 96 +--------- .../flowcontrol/connection_flow_controller.go | 55 ++++++ .../connection_flow_controller_test.go | 88 +++++++++ internal/flowcontrol/flow_control_manager.go | 26 ++- .../flowcontrol/flow_control_manager_test.go | 1 - internal/flowcontrol/flow_controller.go | 179 ------------------ .../flowcontrol/stream_flow_controller.go | 66 +++++++ .../stream_flow_controller_test.go | 83 ++++++++ 9 files changed, 414 insertions(+), 287 deletions(-) create mode 100644 internal/flowcontrol/base_flow_controller.go rename internal/flowcontrol/{flow_controller_test.go => base_flow_controller_test.go} (66%) create mode 100644 internal/flowcontrol/connection_flow_controller.go create mode 100644 internal/flowcontrol/connection_flow_controller_test.go delete mode 100644 internal/flowcontrol/flow_controller.go create mode 100644 internal/flowcontrol/stream_flow_controller.go create mode 100644 internal/flowcontrol/stream_flow_controller_test.go diff --git a/internal/flowcontrol/base_flow_controller.go b/internal/flowcontrol/base_flow_controller.go new file mode 100644 index 00000000..baa8b045 --- /dev/null +++ b/internal/flowcontrol/base_flow_controller.go @@ -0,0 +1,107 @@ +package flowcontrol + +import ( + "errors" + "time" + + "github.com/lucas-clemente/quic-go/congestion" + "github.com/lucas-clemente/quic-go/internal/protocol" + "github.com/lucas-clemente/quic-go/internal/utils" +) + +type baseFlowController struct { + rttStats *congestion.RTTStats + + bytesSent protocol.ByteCount + sendWindow protocol.ByteCount + + lastWindowUpdateTime time.Time + + bytesRead protocol.ByteCount + highestReceived protocol.ByteCount + receiveWindow protocol.ByteCount + receiveWindowIncrement protocol.ByteCount + maxReceiveWindowIncrement protocol.ByteCount +} + +// ErrReceivedSmallerByteOffset occurs if the ByteOffset received is smaller than a ByteOffset that was set previously +var ErrReceivedSmallerByteOffset = errors.New("Received a smaller byte offset") + +func (c *baseFlowController) AddBytesSent(n protocol.ByteCount) { + c.bytesSent += n +} + +// UpdateSendWindow should be called after receiving a WindowUpdateFrame +// it returns true if the window was actually updated +func (c *baseFlowController) UpdateSendWindow(newOffset protocol.ByteCount) bool { + if newOffset > c.sendWindow { + c.sendWindow = newOffset + return true + } + return false +} + +func (c *baseFlowController) SendWindowSize() protocol.ByteCount { + // this only happens during connection establishment, when data is sent before we receive the peer's transport parameters + if c.bytesSent > c.sendWindow { + return 0 + } + return c.sendWindow - c.bytesSent +} + +func (c *baseFlowController) AddBytesRead(n protocol.ByteCount) { + // pretend we sent a WindowUpdate when reading the first byte + // this way auto-tuning of the window increment already works for the first WindowUpdate + if c.bytesRead == 0 { + c.lastWindowUpdateTime = time.Now() + } + c.bytesRead += n +} + +// MaybeUpdateWindow updates the receive window, if necessary +// if the receive window increment is changed, the new value is returned, otherwise a 0 +// the last return value is the new offset of the receive window +func (c *baseFlowController) MaybeUpdateWindow() (bool, protocol.ByteCount /* new increment */, protocol.ByteCount /* new offset */) { + diff := c.receiveWindow - c.bytesRead + + // Chromium implements the same threshold + if diff < (c.receiveWindowIncrement / 2) { + var newWindowIncrement protocol.ByteCount + oldWindowIncrement := c.receiveWindowIncrement + + c.maybeAdjustWindowIncrement() + if c.receiveWindowIncrement != oldWindowIncrement { + newWindowIncrement = c.receiveWindowIncrement + } + + c.lastWindowUpdateTime = time.Now() + c.receiveWindow = c.bytesRead + c.receiveWindowIncrement + return true, newWindowIncrement, c.receiveWindow + } + + return false, 0, 0 +} + +// maybeAdjustWindowIncrement increases the receiveWindowIncrement if we're sending WindowUpdates too often +func (c *baseFlowController) maybeAdjustWindowIncrement() { + if c.lastWindowUpdateTime.IsZero() { + return + } + + rtt := c.rttStats.SmoothedRTT() + if rtt == 0 { + return + } + + timeSinceLastWindowUpdate := time.Since(c.lastWindowUpdateTime) + + // interval between the window updates is sufficiently large, no need to increase the increment + if timeSinceLastWindowUpdate >= 2*rtt { + return + } + c.receiveWindowIncrement = utils.MinByteCount(2*c.receiveWindowIncrement, c.maxReceiveWindowIncrement) +} + +func (c *baseFlowController) CheckFlowControlViolation() bool { + return c.highestReceived > c.receiveWindow +} diff --git a/internal/flowcontrol/flow_controller_test.go b/internal/flowcontrol/base_flow_controller_test.go similarity index 66% rename from internal/flowcontrol/flow_controller_test.go rename to internal/flowcontrol/base_flow_controller_test.go index 161dafc1..e68c91a2 100644 --- a/internal/flowcontrol/flow_controller_test.go +++ b/internal/flowcontrol/base_flow_controller_test.go @@ -9,36 +9,14 @@ import ( . "github.com/onsi/gomega" ) -var _ = Describe("Flow controller", func() { - var controller *flowController +var _ = Describe("Base Flow controller", func() { + var controller *baseFlowController BeforeEach(func() { - controller = &flowController{} + controller = &baseFlowController{} controller.rttStats = &congestion.RTTStats{} }) - Context("Constructor", func() { - rttStats := &congestion.RTTStats{} - - It("sets the send and receive windows", func() { - receiveWindow := protocol.ByteCount(2000) - maxReceiveWindow := protocol.ByteCount(3000) - sendWindow := protocol.ByteCount(4000) - fc := newFlowController(5, true, receiveWindow, maxReceiveWindow, sendWindow, rttStats) - Expect(fc.streamID).To(Equal(protocol.StreamID(5))) - Expect(fc.receiveWindow).To(Equal(receiveWindow)) - Expect(fc.maxReceiveWindowIncrement).To(Equal(maxReceiveWindow)) - Expect(fc.sendWindow).To(Equal(sendWindow)) - }) - - It("says if it contributes to connection-level flow control", func() { - fc := newFlowController(1, false, protocol.MaxByteCount, protocol.MaxByteCount, protocol.MaxByteCount, rttStats) - Expect(fc.ContributesToConnection()).To(BeFalse()) - fc = newFlowController(5, true, protocol.MaxByteCount, protocol.MaxByteCount, protocol.MaxByteCount, rttStats) - Expect(fc.ContributesToConnection()).To(BeTrue()) - }) - }) - Context("send flow control", func() { It("adds bytes sent", func() { controller.bytesSent = 5 @@ -112,45 +90,6 @@ var _ = Describe("Flow controller", func() { Expect(controller.lastWindowUpdateTime).To(Equal(lastWindowUpdateTime)) }) - It("updates the highestReceived", func() { - controller.highestReceived = 1337 - increment, err := controller.UpdateHighestReceived(1338) - Expect(err).ToNot(HaveOccurred()) - Expect(increment).To(Equal(protocol.ByteCount(1338 - 1337))) - Expect(controller.highestReceived).To(Equal(protocol.ByteCount(1338))) - }) - - It("does not decrease the highestReceived", func() { - controller.highestReceived = 1337 - increment, err := controller.UpdateHighestReceived(1000) - Expect(err).To(MatchError(ErrReceivedSmallerByteOffset)) - Expect(increment).To(BeZero()) - Expect(controller.highestReceived).To(Equal(protocol.ByteCount(1337))) - }) - - It("does not error when setting the same byte offset", func() { - controller.highestReceived = 1337 - increment, err := controller.UpdateHighestReceived(1337) - Expect(err).ToNot(HaveOccurred()) - Expect(increment).To(BeZero()) - }) - - It("increases the highestReceived by a given increment", func() { - controller.highestReceived = 1337 - controller.IncrementHighestReceived(123) - Expect(controller.highestReceived).To(Equal(protocol.ByteCount(1337 + 123))) - }) - - It("detects a flow control violation", func() { - controller.UpdateHighestReceived(receiveWindow + 1) - Expect(controller.CheckFlowControlViolation()).To(BeTrue()) - }) - - It("does not give a flow control violation when using the window completely", func() { - controller.UpdateHighestReceived(receiveWindow) - Expect(controller.CheckFlowControlViolation()).To(BeFalse()) - }) - Context("receive window increment auto-tuning", func() { var oldIncrement protocol.ByteCount @@ -243,35 +182,6 @@ var _ = Describe("Flow controller", func() { Expect(controller.receiveWindowIncrement).To(Equal(oldIncrement)) Expect(offset).To(Equal(protocol.ByteCount(9900 + oldIncrement))) }) - - Context("setting the minimum increment", func() { - It("sets the minimum window increment", func() { - controller.EnsureMinimumWindowIncrement(1000) - Expect(controller.receiveWindowIncrement).To(Equal(protocol.ByteCount(1000))) - }) - - It("doesn't reduce the window increment", func() { - controller.EnsureMinimumWindowIncrement(1) - Expect(controller.receiveWindowIncrement).To(Equal(oldIncrement)) - }) - - It("doens't increase the increment beyong the maxReceiveWindowIncrement", func() { - max := controller.maxReceiveWindowIncrement - controller.EnsureMinimumWindowIncrement(2 * max) - Expect(controller.receiveWindowIncrement).To(Equal(max)) - }) - - It("doesn't auto-tune the window after the increment was increased", func() { - setRtt(20 * time.Millisecond) - controller.bytesRead = 9900 // receive window is 10000 - controller.lastWindowUpdateTime = time.Now().Add(-20 * time.Millisecond) - controller.EnsureMinimumWindowIncrement(912) - necessary, newIncrement, offset := controller.MaybeUpdateWindow() - Expect(necessary).To(BeTrue()) - Expect(newIncrement).To(BeZero()) // no auto-tuning - Expect(offset).To(Equal(protocol.ByteCount(9900 + 912))) - }) - }) }) }) }) diff --git a/internal/flowcontrol/connection_flow_controller.go b/internal/flowcontrol/connection_flow_controller.go new file mode 100644 index 00000000..74ab8f42 --- /dev/null +++ b/internal/flowcontrol/connection_flow_controller.go @@ -0,0 +1,55 @@ +package flowcontrol + +import ( + "time" + + "github.com/lucas-clemente/quic-go/congestion" + "github.com/lucas-clemente/quic-go/internal/protocol" + "github.com/lucas-clemente/quic-go/internal/utils" +) + +type connectionFlowController struct { + baseFlowController +} + +// newConnectionFlowController gets a new flow controller for the connection +func newConnectionFlowController( + receiveWindow protocol.ByteCount, + maxReceiveWindow protocol.ByteCount, + initialSendWindow protocol.ByteCount, + rttStats *congestion.RTTStats, +) *connectionFlowController { + return &connectionFlowController{ + baseFlowController: baseFlowController{ + rttStats: rttStats, + receiveWindow: receiveWindow, + receiveWindowIncrement: receiveWindow, + maxReceiveWindowIncrement: maxReceiveWindow, + sendWindow: initialSendWindow, + }, + } +} + +// EnsureMinimumWindowIncrement sets a minimum window increment +// it should make sure that the connection-level window is increased when a stream-level window grows +func (c *connectionFlowController) EnsureMinimumWindowIncrement(inc protocol.ByteCount) { + if inc > c.receiveWindowIncrement { + c.receiveWindowIncrement = utils.MinByteCount(inc, c.maxReceiveWindowIncrement) + c.lastWindowUpdateTime = time.Time{} // disables autotuning for the next window update + } +} + +// IncrementHighestReceived adds an increment to the highestReceived value +func (c *connectionFlowController) IncrementHighestReceived(increment protocol.ByteCount) { + c.highestReceived += increment +} + +func (c *connectionFlowController) MaybeUpdateWindow() (bool, protocol.ByteCount, protocol.ByteCount) { + oldWindowSize := c.receiveWindowIncrement + updated, newIncrement, newOffset := c.baseFlowController.MaybeUpdateWindow() + // debug log, if the window size was actually increased + if oldWindowSize < c.receiveWindowIncrement { + utils.Debugf("Increasing receive flow control window for the connection to %d kB", c.receiveWindowIncrement/(1<<10)) + } + return updated, newIncrement, newOffset +} diff --git a/internal/flowcontrol/connection_flow_controller_test.go b/internal/flowcontrol/connection_flow_controller_test.go new file mode 100644 index 00000000..89300e01 --- /dev/null +++ b/internal/flowcontrol/connection_flow_controller_test.go @@ -0,0 +1,88 @@ +package flowcontrol + +import ( + "time" + + "github.com/lucas-clemente/quic-go/congestion" + "github.com/lucas-clemente/quic-go/internal/protocol" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("Connection Flow controller", func() { + var controller *connectionFlowController + + BeforeEach(func() { + controller = &connectionFlowController{} + controller.rttStats = &congestion.RTTStats{} + }) + + Context("Constructor", func() { + rttStats := &congestion.RTTStats{} + + It("sets the send and receive windows", func() { + receiveWindow := protocol.ByteCount(2000) + maxReceiveWindow := protocol.ByteCount(3000) + sendWindow := protocol.ByteCount(4000) + + fc := newConnectionFlowController(receiveWindow, maxReceiveWindow, sendWindow, rttStats) + Expect(fc.receiveWindow).To(Equal(receiveWindow)) + Expect(fc.maxReceiveWindowIncrement).To(Equal(maxReceiveWindow)) + Expect(fc.sendWindow).To(Equal(sendWindow)) + }) + }) + + Context("receive flow control", func() { + It("increases the highestReceived by a given increment", func() { + controller.highestReceived = 1337 + controller.IncrementHighestReceived(123) + Expect(controller.highestReceived).To(Equal(protocol.ByteCount(1337 + 123))) + }) + }) + + Context("setting the minimum increment", func() { + var oldIncrement protocol.ByteCount + var receiveWindow protocol.ByteCount = 10000 + var receiveWindowIncrement protocol.ByteCount = 600 + + BeforeEach(func() { + controller.receiveWindow = receiveWindow + controller.receiveWindowIncrement = receiveWindowIncrement + oldIncrement = controller.receiveWindowIncrement + controller.maxReceiveWindowIncrement = 3000 + }) + + // update the congestion such that it returns a given value for the smoothed RTT + setRtt := func(t time.Duration) { + controller.rttStats.UpdateRTT(t, 0, time.Now()) + Expect(controller.rttStats.SmoothedRTT()).To(Equal(t)) // make sure it worked + } + + It("sets the minimum window increment", func() { + controller.EnsureMinimumWindowIncrement(1000) + Expect(controller.receiveWindowIncrement).To(Equal(protocol.ByteCount(1000))) + }) + + It("doesn't reduce the window increment", func() { + controller.EnsureMinimumWindowIncrement(1) + Expect(controller.receiveWindowIncrement).To(Equal(oldIncrement)) + }) + + It("doens't increase the increment beyond the maxReceiveWindowIncrement", func() { + max := controller.maxReceiveWindowIncrement + controller.EnsureMinimumWindowIncrement(2 * max) + Expect(controller.receiveWindowIncrement).To(Equal(max)) + }) + + It("doesn't auto-tune the window after the increment was increased", func() { + setRtt(20 * time.Millisecond) + controller.bytesRead = 9900 // receive window is 10000 + controller.lastWindowUpdateTime = time.Now().Add(-20 * time.Millisecond) + controller.EnsureMinimumWindowIncrement(912) + necessary, newIncrement, offset := controller.MaybeUpdateWindow() + Expect(necessary).To(BeTrue()) + Expect(newIncrement).To(BeZero()) // no auto-tuning + Expect(offset).To(Equal(protocol.ByteCount(9900 + 912))) + }) + }) +}) diff --git a/internal/flowcontrol/flow_control_manager.go b/internal/flowcontrol/flow_control_manager.go index 902eaa79..148aa9cc 100644 --- a/internal/flowcontrol/flow_control_manager.go +++ b/internal/flowcontrol/flow_control_manager.go @@ -16,8 +16,8 @@ type flowControlManager struct { rttStats *congestion.RTTStats maxReceiveStreamWindow protocol.ByteCount - streamFlowController map[protocol.StreamID]*flowController - connFlowController *flowController + streamFlowController map[protocol.StreamID]*streamFlowController + connFlowController *connectionFlowController mutex sync.RWMutex initialStreamSendWindow protocol.ByteCount @@ -36,8 +36,8 @@ func NewFlowControlManager( return &flowControlManager{ rttStats: rttStats, maxReceiveStreamWindow: maxReceiveStreamWindow, - streamFlowController: make(map[protocol.StreamID]*flowController), - connFlowController: newFlowController(0, false, protocol.ReceiveConnectionFlowControlWindow, maxReceiveConnectionWindow, 0, rttStats), + streamFlowController: make(map[protocol.StreamID]*streamFlowController), + connFlowController: newConnectionFlowController(protocol.ReceiveConnectionFlowControlWindow, maxReceiveConnectionWindow, 0, rttStats), } } @@ -50,7 +50,7 @@ func (f *flowControlManager) NewStream(streamID protocol.StreamID, contributesTo if _, ok := f.streamFlowController[streamID]; ok { return } - f.streamFlowController[streamID] = newFlowController(streamID, contributesToConnection, protocol.ReceiveStreamFlowControlWindow, f.maxReceiveStreamWindow, f.initialStreamSendWindow, f.rttStats) + f.streamFlowController[streamID] = newStreamFlowController(streamID, contributesToConnection, protocol.ReceiveStreamFlowControlWindow, f.maxReceiveStreamWindow, f.initialStreamSendWindow, f.rttStats) } // RemoveStream removes a closed stream from flow control @@ -233,20 +233,18 @@ func (f *flowControlManager) UpdateWindow(streamID protocol.StreamID, offset pro f.mutex.Lock() defer f.mutex.Unlock() - var fc *flowController if streamID == 0 { - fc = f.connFlowController - } else { - var err error - fc, err = f.getFlowController(streamID) - if err != nil { - return false, err - } + return f.connFlowController.UpdateSendWindow(offset), nil + } + + fc, err := f.getFlowController(streamID) + if err != nil { + return false, err } return fc.UpdateSendWindow(offset), nil } -func (f *flowControlManager) getFlowController(streamID protocol.StreamID) (*flowController, error) { +func (f *flowControlManager) getFlowController(streamID protocol.StreamID) (*streamFlowController, error) { streamFlowController, ok := f.streamFlowController[streamID] if !ok { return nil, errMapAccess diff --git a/internal/flowcontrol/flow_control_manager_test.go b/internal/flowcontrol/flow_control_manager_test.go index 09f2f64f..e135a5b7 100644 --- a/internal/flowcontrol/flow_control_manager_test.go +++ b/internal/flowcontrol/flow_control_manager_test.go @@ -25,7 +25,6 @@ var _ = Describe("Flow Control Manager", func() { It("creates a connection level flow controller", func() { Expect(fcm.streamFlowController).To(BeEmpty()) - Expect(fcm.connFlowController.ContributesToConnection()).To(BeFalse()) Expect(fcm.connFlowController.sendWindow).To(BeZero()) Expect(fcm.connFlowController.maxReceiveWindowIncrement).To(Equal(protocol.ByteCount(0x4000))) }) diff --git a/internal/flowcontrol/flow_controller.go b/internal/flowcontrol/flow_controller.go deleted file mode 100644 index 8bd42492..00000000 --- a/internal/flowcontrol/flow_controller.go +++ /dev/null @@ -1,179 +0,0 @@ -package flowcontrol - -import ( - "errors" - "time" - - "github.com/lucas-clemente/quic-go/congestion" - "github.com/lucas-clemente/quic-go/internal/protocol" - "github.com/lucas-clemente/quic-go/internal/utils" -) - -type flowController struct { - streamID protocol.StreamID - contributesToConnection bool // does the stream contribute to connection level flow control - - rttStats *congestion.RTTStats - - bytesSent protocol.ByteCount - sendWindow protocol.ByteCount - - lastWindowUpdateTime time.Time - - bytesRead protocol.ByteCount - highestReceived protocol.ByteCount - receiveWindow protocol.ByteCount - receiveWindowIncrement protocol.ByteCount - maxReceiveWindowIncrement protocol.ByteCount -} - -// ErrReceivedSmallerByteOffset occurs if the ByteOffset received is smaller than a ByteOffset that was set previously -var ErrReceivedSmallerByteOffset = errors.New("Received a smaller byte offset") - -// newFlowController gets a new flow controller -func newFlowController( - streamID protocol.StreamID, - contributesToConnection bool, - receiveWindow protocol.ByteCount, - maxReceiveWindow protocol.ByteCount, - initialSendWindow protocol.ByteCount, - rttStats *congestion.RTTStats, -) *flowController { - return &flowController{ - streamID: streamID, - contributesToConnection: contributesToConnection, - rttStats: rttStats, - receiveWindow: receiveWindow, - receiveWindowIncrement: receiveWindow, - maxReceiveWindowIncrement: maxReceiveWindow, - sendWindow: initialSendWindow, - } -} - -func (c *flowController) ContributesToConnection() bool { - return c.contributesToConnection -} - -func (c *flowController) AddBytesSent(n protocol.ByteCount) { - c.bytesSent += n -} - -// UpdateSendWindow should be called after receiving a WindowUpdateFrame -// it returns true if the window was actually updated -func (c *flowController) UpdateSendWindow(newOffset protocol.ByteCount) bool { - if newOffset > c.sendWindow { - c.sendWindow = newOffset - return true - } - return false -} - -func (c *flowController) SendWindowSize() protocol.ByteCount { - // this only happens during connection establishment, when data is sent before we receive the peer's transport parameters - if c.bytesSent > c.sendWindow { - return 0 - } - return c.sendWindow - c.bytesSent -} - -// UpdateHighestReceived updates the highestReceived value, if the byteOffset is higher -// Should **only** be used for the stream-level FlowController -// it returns an ErrReceivedSmallerByteOffset if the received byteOffset is smaller than any byteOffset received before -// This error occurs every time StreamFrames get reordered and has to be ignored in that case -// It should only be treated as an error when resetting a stream -func (c *flowController) UpdateHighestReceived(byteOffset protocol.ByteCount) (protocol.ByteCount, error) { - if byteOffset == c.highestReceived { - return 0, nil - } - if byteOffset > c.highestReceived { - increment := byteOffset - c.highestReceived - c.highestReceived = byteOffset - return increment, nil - } - return 0, ErrReceivedSmallerByteOffset -} - -// IncrementHighestReceived adds an increment to the highestReceived value -// Should **only** be used for the connection-level FlowController -func (c *flowController) IncrementHighestReceived(increment protocol.ByteCount) { - c.highestReceived += increment -} - -func (c *flowController) AddBytesRead(n protocol.ByteCount) { - // pretend we sent a WindowUpdate when reading the first byte - // this way auto-tuning of the window increment already works for the first WindowUpdate - if c.bytesRead == 0 { - c.lastWindowUpdateTime = time.Now() - } - c.bytesRead += n -} - -// MaybeUpdateWindow updates the receive window, if necessary -// if the receive window increment is changed, the new value is returned, otherwise a 0 -// the last return value is the new offset of the receive window -func (c *flowController) MaybeUpdateWindow() (bool, protocol.ByteCount /* new increment */, protocol.ByteCount /* new offset */) { - diff := c.receiveWindow - c.bytesRead - - // Chromium implements the same threshold - if diff < (c.receiveWindowIncrement / 2) { - var newWindowIncrement protocol.ByteCount - oldWindowIncrement := c.receiveWindowIncrement - - c.maybeAdjustWindowIncrement() - if c.receiveWindowIncrement != oldWindowIncrement { - newWindowIncrement = c.receiveWindowIncrement - } - - c.lastWindowUpdateTime = time.Now() - c.receiveWindow = c.bytesRead + c.receiveWindowIncrement - return true, newWindowIncrement, c.receiveWindow - } - - return false, 0, 0 -} - -// maybeAdjustWindowIncrement increases the receiveWindowIncrement if we're sending WindowUpdates too often -func (c *flowController) maybeAdjustWindowIncrement() { - if c.lastWindowUpdateTime.IsZero() { - return - } - - rtt := c.rttStats.SmoothedRTT() - if rtt == 0 { - return - } - - timeSinceLastWindowUpdate := time.Since(c.lastWindowUpdateTime) - - // interval between the window updates is sufficiently large, no need to increase the increment - if timeSinceLastWindowUpdate >= 2*rtt { - return - } - - oldWindowSize := c.receiveWindowIncrement - c.receiveWindowIncrement = utils.MinByteCount(2*c.receiveWindowIncrement, c.maxReceiveWindowIncrement) - - // debug log, if the window size was actually increased - if oldWindowSize < c.receiveWindowIncrement { - newWindowSize := c.receiveWindowIncrement / (1 << 10) - if c.streamID == 0 { - utils.Debugf("Increasing receive flow control window for the connection to %d kB", newWindowSize) - } else { - utils.Debugf("Increasing receive flow control window increment for stream %d to %d kB", c.streamID, newWindowSize) - } - } -} - -// EnsureMinimumWindowIncrement sets a minimum window increment -// it is intended be used for the connection-level flow controller -// it should make sure that the connection-level window is increased when a stream-level window grows -func (c *flowController) EnsureMinimumWindowIncrement(inc protocol.ByteCount) { - if inc > c.receiveWindowIncrement { - c.receiveWindowIncrement = utils.MinByteCount(inc, c.maxReceiveWindowIncrement) - c.lastWindowUpdateTime = time.Time{} // disables autotuning for the next window update - } -} - -func (c *flowController) CheckFlowControlViolation() bool { - return c.highestReceived > c.receiveWindow -} diff --git a/internal/flowcontrol/stream_flow_controller.go b/internal/flowcontrol/stream_flow_controller.go new file mode 100644 index 00000000..6ff80f28 --- /dev/null +++ b/internal/flowcontrol/stream_flow_controller.go @@ -0,0 +1,66 @@ +package flowcontrol + +import ( + "github.com/lucas-clemente/quic-go/congestion" + "github.com/lucas-clemente/quic-go/internal/protocol" + "github.com/lucas-clemente/quic-go/internal/utils" +) + +type streamFlowController struct { + baseFlowController + + streamID protocol.StreamID + contributesToConnection bool // does the stream contribute to connection level flow control +} + +// newStreamFlowController gets a new flow controller for a stream +func newStreamFlowController( + streamID protocol.StreamID, + contributesToConnection bool, + receiveWindow protocol.ByteCount, + maxReceiveWindow protocol.ByteCount, + initialSendWindow protocol.ByteCount, + rttStats *congestion.RTTStats, +) *streamFlowController { + return &streamFlowController{ + streamID: streamID, + contributesToConnection: contributesToConnection, + baseFlowController: baseFlowController{ + rttStats: rttStats, + receiveWindow: receiveWindow, + receiveWindowIncrement: receiveWindow, + maxReceiveWindowIncrement: maxReceiveWindow, + sendWindow: initialSendWindow, + }, + } +} + +func (c *streamFlowController) ContributesToConnection() bool { + return c.contributesToConnection +} + +// UpdateHighestReceived updates the highestReceived value, if the byteOffset is higher +// it returns an ErrReceivedSmallerByteOffset if the received byteOffset is smaller than any byteOffset received before +// This error occurs every time StreamFrames get reordered and has to be ignored in that case +// It should only be treated as an error when resetting a stream +func (c *streamFlowController) UpdateHighestReceived(byteOffset protocol.ByteCount) (protocol.ByteCount, error) { + if byteOffset == c.highestReceived { + return 0, nil + } + if byteOffset > c.highestReceived { + increment := byteOffset - c.highestReceived + c.highestReceived = byteOffset + return increment, nil + } + return 0, ErrReceivedSmallerByteOffset +} + +func (c *streamFlowController) MaybeUpdateWindow() (bool, protocol.ByteCount, protocol.ByteCount) { + oldWindowSize := c.receiveWindowIncrement + updated, newIncrement, newOffset := c.baseFlowController.MaybeUpdateWindow() + // debug log, if the window size was actually increased + if oldWindowSize < c.receiveWindowIncrement { + utils.Debugf("Increasing receive flow control window for the connection to %d kB", c.receiveWindowIncrement/(1<<10)) + } + return updated, newIncrement, newOffset +} diff --git a/internal/flowcontrol/stream_flow_controller_test.go b/internal/flowcontrol/stream_flow_controller_test.go new file mode 100644 index 00000000..1c273bda --- /dev/null +++ b/internal/flowcontrol/stream_flow_controller_test.go @@ -0,0 +1,83 @@ +package flowcontrol + +import ( + "github.com/lucas-clemente/quic-go/congestion" + "github.com/lucas-clemente/quic-go/internal/protocol" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("Stream Flow controller", func() { + var controller *streamFlowController + + BeforeEach(func() { + controller = &streamFlowController{} + controller.rttStats = &congestion.RTTStats{} + }) + + Context("Constructor", func() { + rttStats := &congestion.RTTStats{} + + It("sets the send and receive windows", func() { + receiveWindow := protocol.ByteCount(2000) + maxReceiveWindow := protocol.ByteCount(3000) + sendWindow := protocol.ByteCount(4000) + + fc := newStreamFlowController(5, true, receiveWindow, maxReceiveWindow, sendWindow, rttStats) + Expect(fc.streamID).To(Equal(protocol.StreamID(5))) + Expect(fc.receiveWindow).To(Equal(receiveWindow)) + Expect(fc.maxReceiveWindowIncrement).To(Equal(maxReceiveWindow)) + Expect(fc.sendWindow).To(Equal(sendWindow)) + }) + + It("says if it contributes to connection-level flow control", func() { + fc := newStreamFlowController(1, false, protocol.MaxByteCount, protocol.MaxByteCount, protocol.MaxByteCount, rttStats) + Expect(fc.ContributesToConnection()).To(BeFalse()) + fc = newStreamFlowController(5, true, protocol.MaxByteCount, protocol.MaxByteCount, protocol.MaxByteCount, rttStats) + Expect(fc.ContributesToConnection()).To(BeTrue()) + }) + }) + + Context("receive flow control", func() { + var receiveWindow protocol.ByteCount = 10000 + var receiveWindowIncrement protocol.ByteCount = 600 + + BeforeEach(func() { + controller.receiveWindow = receiveWindow + controller.receiveWindowIncrement = receiveWindowIncrement + }) + + It("updates the highestReceived", func() { + controller.highestReceived = 1337 + increment, err := controller.UpdateHighestReceived(1338) + Expect(err).ToNot(HaveOccurred()) + Expect(increment).To(Equal(protocol.ByteCount(1338 - 1337))) + Expect(controller.highestReceived).To(Equal(protocol.ByteCount(1338))) + }) + + It("does not decrease the highestReceived", func() { + controller.highestReceived = 1337 + increment, err := controller.UpdateHighestReceived(1000) + Expect(err).To(MatchError(ErrReceivedSmallerByteOffset)) + Expect(increment).To(BeZero()) + Expect(controller.highestReceived).To(Equal(protocol.ByteCount(1337))) + }) + + It("does not error when setting the same byte offset", func() { + controller.highestReceived = 1337 + increment, err := controller.UpdateHighestReceived(1337) + Expect(err).ToNot(HaveOccurred()) + Expect(increment).To(BeZero()) + }) + + It("detects a flow control violation", func() { + controller.UpdateHighestReceived(receiveWindow + 1) + Expect(controller.CheckFlowControlViolation()).To(BeTrue()) + }) + + It("does not give a flow control violation when using the window completely", func() { + controller.UpdateHighestReceived(receiveWindow) + Expect(controller.CheckFlowControlViolation()).To(BeFalse()) + }) + }) +})