diff --git a/internal/flowcontrol/base_flow_controller.go b/internal/flowcontrol/base_flow_controller.go index eaa971e5..435a2c47 100644 --- a/internal/flowcontrol/base_flow_controller.go +++ b/internal/flowcontrol/base_flow_controller.go @@ -15,6 +15,7 @@ type baseFlowController struct { lastBlockedAt protocol.ByteCount // for receiving data + //nolint:structcheck // The mutex is used both by the stream and the connection flow controller mutex sync.Mutex bytesRead protocol.ByteCount highestReceived protocol.ByteCount @@ -60,10 +61,8 @@ func (c *baseFlowController) sendWindowSize() protocol.ByteCount { return c.sendWindow - c.bytesSent } -func (c *baseFlowController) AddBytesRead(n protocol.ByteCount) { - c.mutex.Lock() - defer c.mutex.Unlock() - +// needs to be called with locked mutex +func (c *baseFlowController) addBytesRead(n protocol.ByteCount) { // pretend we sent a WindowUpdate when reading the first byte // this way auto-tuning of the window size already works for the first WindowUpdate if c.bytesRead == 0 { diff --git a/internal/flowcontrol/base_flow_controller_test.go b/internal/flowcontrol/base_flow_controller_test.go index 9971d778..0f4b0228 100644 --- a/internal/flowcontrol/base_flow_controller_test.go +++ b/internal/flowcontrol/base_flow_controller_test.go @@ -102,7 +102,7 @@ var _ = Describe("Base Flow controller", func() { It("adds bytes read", func() { controller.bytesRead = 5 - controller.AddBytesRead(6) + controller.addBytesRead(6) Expect(controller.bytesRead).To(Equal(protocol.ByteCount(5 + 6))) }) @@ -147,7 +147,7 @@ var _ = Describe("Base Flow controller", func() { It("doesn't increase the window size when no RTT estimate is available", func() { setRtt(0) controller.startNewAutoTuningEpoch(time.Now()) - controller.AddBytesRead(400) + controller.addBytesRead(400) offset := controller.getWindowUpdate() Expect(offset).ToNot(BeZero()) // make sure a window update is sent Expect(controller.receiveWindowSize).To(Equal(oldWindowSize)) @@ -162,7 +162,7 @@ var _ = Describe("Base Flow controller", func() { // ... in 4*2/3 of the RTT controller.epochStartOffset = controller.bytesRead controller.epochStartTime = time.Now().Add(-rtt * 4 * 2 / 3) - controller.AddBytesRead(dataRead) + controller.addBytesRead(dataRead) offset := controller.getWindowUpdate() Expect(offset).ToNot(BeZero()) // check that the window size was increased @@ -183,7 +183,7 @@ var _ = Describe("Base Flow controller", func() { // ... in 4*2/3 of the RTT controller.epochStartOffset = controller.bytesRead controller.epochStartTime = time.Now().Add(-rtt * 4 * 1 / 3) - controller.AddBytesRead(dataRead) + controller.addBytesRead(dataRead) offset := controller.getWindowUpdate() Expect(offset).ToNot(BeZero()) // check that the window size was not increased @@ -202,7 +202,7 @@ var _ = Describe("Base Flow controller", func() { // ... in 4*2/3 of the RTT controller.epochStartOffset = controller.bytesRead controller.epochStartTime = time.Now().Add(-rtt * 4 * 2 / 3) - controller.AddBytesRead(dataRead) + controller.addBytesRead(dataRead) offset := controller.getWindowUpdate() Expect(offset).ToNot(BeZero()) // check that the window size was not increased @@ -216,7 +216,7 @@ var _ = Describe("Base Flow controller", func() { // make sure the next call to maybeAdjustWindowSize will increase the window controller.epochStartTime = time.Now().Add(-time.Millisecond) controller.epochStartOffset = controller.bytesRead - controller.AddBytesRead(controller.receiveWindowSize/2 + 1) + controller.addBytesRead(controller.receiveWindowSize/2 + 1) } setRtt(scaleDuration(20 * time.Millisecond)) resetEpoch() diff --git a/internal/flowcontrol/connection_flow_controller.go b/internal/flowcontrol/connection_flow_controller.go index 0c42c800..253d3156 100644 --- a/internal/flowcontrol/connection_flow_controller.go +++ b/internal/flowcontrol/connection_flow_controller.go @@ -55,15 +55,11 @@ func (c *connectionFlowController) IncrementHighestReceived(increment protocol.B } func (c *connectionFlowController) AddBytesRead(n protocol.ByteCount) { - c.baseFlowController.AddBytesRead(n) - c.maybeQueueWindowUpdate() -} - -func (c *connectionFlowController) maybeQueueWindowUpdate() { c.mutex.Lock() - hasWindowUpdate := c.hasWindowUpdate() + c.baseFlowController.addBytesRead(n) + shouldQueueWindowUpdate := c.hasWindowUpdate() c.mutex.Unlock() - if hasWindowUpdate { + if shouldQueueWindowUpdate { c.queueWindowUpdate() } } diff --git a/internal/flowcontrol/stream_flow_controller.go b/internal/flowcontrol/stream_flow_controller.go index 90b04e72..46a911cf 100644 --- a/internal/flowcontrol/stream_flow_controller.go +++ b/internal/flowcontrol/stream_flow_controller.go @@ -50,9 +50,6 @@ func NewStreamFlowController( // UpdateHighestReceived updates the highestReceived value, if the offset is higher. func (c *streamFlowController) UpdateHighestReceived(offset protocol.ByteCount, final bool) error { - c.mutex.Lock() - defer c.mutex.Unlock() - // If the final offset for this stream is already known, check for consistency. if c.receivedFinalOffset { // If we receive another final offset, check that it's the same. @@ -89,8 +86,13 @@ func (c *streamFlowController) UpdateHighestReceived(offset protocol.ByteCount, } func (c *streamFlowController) AddBytesRead(n protocol.ByteCount) { - c.baseFlowController.AddBytesRead(n) - c.maybeQueueWindowUpdate() + c.mutex.Lock() + c.baseFlowController.addBytesRead(n) + shouldQueueWindowUpdate := c.shouldQueueWindowUpdate() + c.mutex.Unlock() + if shouldQueueWindowUpdate { + c.queueWindowUpdate() + } c.connection.AddBytesRead(n) } @@ -109,24 +111,18 @@ func (c *streamFlowController) SendWindowSize() protocol.ByteCount { return utils.MinByteCount(c.baseFlowController.sendWindowSize(), c.connection.SendWindowSize()) } -func (c *streamFlowController) maybeQueueWindowUpdate() { - c.mutex.Lock() - hasWindowUpdate := !c.receivedFinalOffset && c.hasWindowUpdate() - c.mutex.Unlock() - if hasWindowUpdate { - c.queueWindowUpdate() - } +func (c *streamFlowController) shouldQueueWindowUpdate() bool { + return !c.receivedFinalOffset && c.hasWindowUpdate() } func (c *streamFlowController) GetWindowUpdate() protocol.ByteCount { - // don't use defer for unlocking the mutex here, GetWindowUpdate() is called frequently and defer shows up in the profiler - c.mutex.Lock() - // if we already received the final offset for this stream, the peer won't need any additional flow control credit + // If we already received the final offset for this stream, the peer won't need any additional flow control credit. if c.receivedFinalOffset { - c.mutex.Unlock() return 0 } + // Don't use defer for unlocking the mutex here, GetWindowUpdate() is called frequently and defer shows up in the profiler + c.mutex.Lock() oldWindowSize := c.receiveWindowSize offset := c.baseFlowController.getWindowUpdate() if c.receiveWindowSize > oldWindowSize { // auto-tuning enlarged the window size diff --git a/internal/flowcontrol/stream_flow_controller_test.go b/internal/flowcontrol/stream_flow_controller_test.go index fca80949..8195c71e 100644 --- a/internal/flowcontrol/stream_flow_controller_test.go +++ b/internal/flowcontrol/stream_flow_controller_test.go @@ -43,7 +43,7 @@ var _ = Describe("Stream Flow controller", func() { Expect(fc.sendWindow).To(Equal(sendWindow)) }) - It("queues window updates with the correction stream ID", func() { + It("queues window updates with the correct stream ID", func() { var queued bool queueWindowUpdate := func(id protocol.StreamID) { Expect(id).To(Equal(protocol.StreamID(5)))