avoid duplicate mutex locking when reading data

This commit is contained in:
Marten Seemann 2020-11-09 09:15:21 +07:00
parent 5352cd509b
commit 655632b116
5 changed files with 22 additions and 30 deletions

View file

@ -15,6 +15,7 @@ type baseFlowController struct {
lastBlockedAt protocol.ByteCount lastBlockedAt protocol.ByteCount
// for receiving data // for receiving data
//nolint:structcheck // The mutex is used both by the stream and the connection flow controller
mutex sync.Mutex mutex sync.Mutex
bytesRead protocol.ByteCount bytesRead protocol.ByteCount
highestReceived protocol.ByteCount highestReceived protocol.ByteCount
@ -60,10 +61,8 @@ func (c *baseFlowController) sendWindowSize() protocol.ByteCount {
return c.sendWindow - c.bytesSent return c.sendWindow - c.bytesSent
} }
func (c *baseFlowController) AddBytesRead(n protocol.ByteCount) { // needs to be called with locked mutex
c.mutex.Lock() func (c *baseFlowController) addBytesRead(n protocol.ByteCount) {
defer c.mutex.Unlock()
// pretend we sent a WindowUpdate when reading the first byte // pretend we sent a WindowUpdate when reading the first byte
// this way auto-tuning of the window size already works for the first WindowUpdate // this way auto-tuning of the window size already works for the first WindowUpdate
if c.bytesRead == 0 { if c.bytesRead == 0 {

View file

@ -102,7 +102,7 @@ var _ = Describe("Base Flow controller", func() {
It("adds bytes read", func() { It("adds bytes read", func() {
controller.bytesRead = 5 controller.bytesRead = 5
controller.AddBytesRead(6) controller.addBytesRead(6)
Expect(controller.bytesRead).To(Equal(protocol.ByteCount(5 + 6))) Expect(controller.bytesRead).To(Equal(protocol.ByteCount(5 + 6)))
}) })
@ -147,7 +147,7 @@ var _ = Describe("Base Flow controller", func() {
It("doesn't increase the window size when no RTT estimate is available", func() { It("doesn't increase the window size when no RTT estimate is available", func() {
setRtt(0) setRtt(0)
controller.startNewAutoTuningEpoch(time.Now()) controller.startNewAutoTuningEpoch(time.Now())
controller.AddBytesRead(400) controller.addBytesRead(400)
offset := controller.getWindowUpdate() offset := controller.getWindowUpdate()
Expect(offset).ToNot(BeZero()) // make sure a window update is sent Expect(offset).ToNot(BeZero()) // make sure a window update is sent
Expect(controller.receiveWindowSize).To(Equal(oldWindowSize)) Expect(controller.receiveWindowSize).To(Equal(oldWindowSize))
@ -162,7 +162,7 @@ var _ = Describe("Base Flow controller", func() {
// ... in 4*2/3 of the RTT // ... in 4*2/3 of the RTT
controller.epochStartOffset = controller.bytesRead controller.epochStartOffset = controller.bytesRead
controller.epochStartTime = time.Now().Add(-rtt * 4 * 2 / 3) controller.epochStartTime = time.Now().Add(-rtt * 4 * 2 / 3)
controller.AddBytesRead(dataRead) controller.addBytesRead(dataRead)
offset := controller.getWindowUpdate() offset := controller.getWindowUpdate()
Expect(offset).ToNot(BeZero()) Expect(offset).ToNot(BeZero())
// check that the window size was increased // check that the window size was increased
@ -183,7 +183,7 @@ var _ = Describe("Base Flow controller", func() {
// ... in 4*2/3 of the RTT // ... in 4*2/3 of the RTT
controller.epochStartOffset = controller.bytesRead controller.epochStartOffset = controller.bytesRead
controller.epochStartTime = time.Now().Add(-rtt * 4 * 1 / 3) controller.epochStartTime = time.Now().Add(-rtt * 4 * 1 / 3)
controller.AddBytesRead(dataRead) controller.addBytesRead(dataRead)
offset := controller.getWindowUpdate() offset := controller.getWindowUpdate()
Expect(offset).ToNot(BeZero()) Expect(offset).ToNot(BeZero())
// check that the window size was not increased // check that the window size was not increased
@ -202,7 +202,7 @@ var _ = Describe("Base Flow controller", func() {
// ... in 4*2/3 of the RTT // ... in 4*2/3 of the RTT
controller.epochStartOffset = controller.bytesRead controller.epochStartOffset = controller.bytesRead
controller.epochStartTime = time.Now().Add(-rtt * 4 * 2 / 3) controller.epochStartTime = time.Now().Add(-rtt * 4 * 2 / 3)
controller.AddBytesRead(dataRead) controller.addBytesRead(dataRead)
offset := controller.getWindowUpdate() offset := controller.getWindowUpdate()
Expect(offset).ToNot(BeZero()) Expect(offset).ToNot(BeZero())
// check that the window size was not increased // check that the window size was not increased
@ -216,7 +216,7 @@ var _ = Describe("Base Flow controller", func() {
// make sure the next call to maybeAdjustWindowSize will increase the window // make sure the next call to maybeAdjustWindowSize will increase the window
controller.epochStartTime = time.Now().Add(-time.Millisecond) controller.epochStartTime = time.Now().Add(-time.Millisecond)
controller.epochStartOffset = controller.bytesRead controller.epochStartOffset = controller.bytesRead
controller.AddBytesRead(controller.receiveWindowSize/2 + 1) controller.addBytesRead(controller.receiveWindowSize/2 + 1)
} }
setRtt(scaleDuration(20 * time.Millisecond)) setRtt(scaleDuration(20 * time.Millisecond))
resetEpoch() resetEpoch()

View file

@ -55,15 +55,11 @@ func (c *connectionFlowController) IncrementHighestReceived(increment protocol.B
} }
func (c *connectionFlowController) AddBytesRead(n protocol.ByteCount) { func (c *connectionFlowController) AddBytesRead(n protocol.ByteCount) {
c.baseFlowController.AddBytesRead(n)
c.maybeQueueWindowUpdate()
}
func (c *connectionFlowController) maybeQueueWindowUpdate() {
c.mutex.Lock() c.mutex.Lock()
hasWindowUpdate := c.hasWindowUpdate() c.baseFlowController.addBytesRead(n)
shouldQueueWindowUpdate := c.hasWindowUpdate()
c.mutex.Unlock() c.mutex.Unlock()
if hasWindowUpdate { if shouldQueueWindowUpdate {
c.queueWindowUpdate() c.queueWindowUpdate()
} }
} }

View file

@ -86,8 +86,13 @@ func (c *streamFlowController) UpdateHighestReceived(offset protocol.ByteCount,
} }
func (c *streamFlowController) AddBytesRead(n protocol.ByteCount) { func (c *streamFlowController) AddBytesRead(n protocol.ByteCount) {
c.baseFlowController.AddBytesRead(n) c.mutex.Lock()
c.maybeQueueWindowUpdate() c.baseFlowController.addBytesRead(n)
shouldQueueWindowUpdate := c.shouldQueueWindowUpdate()
c.mutex.Unlock()
if shouldQueueWindowUpdate {
c.queueWindowUpdate()
}
c.connection.AddBytesRead(n) c.connection.AddBytesRead(n)
} }
@ -106,16 +111,8 @@ func (c *streamFlowController) SendWindowSize() protocol.ByteCount {
return utils.MinByteCount(c.baseFlowController.sendWindowSize(), c.connection.SendWindowSize()) return utils.MinByteCount(c.baseFlowController.sendWindowSize(), c.connection.SendWindowSize())
} }
func (c *streamFlowController) maybeQueueWindowUpdate() { func (c *streamFlowController) shouldQueueWindowUpdate() bool {
if c.receivedFinalOffset { return !c.receivedFinalOffset && c.hasWindowUpdate()
return
}
c.mutex.Lock()
hasWindowUpdate := c.hasWindowUpdate()
c.mutex.Unlock()
if hasWindowUpdate {
c.queueWindowUpdate()
}
} }
func (c *streamFlowController) GetWindowUpdate() protocol.ByteCount { func (c *streamFlowController) GetWindowUpdate() protocol.ByteCount {

View file

@ -43,7 +43,7 @@ var _ = Describe("Stream Flow controller", func() {
Expect(fc.sendWindow).To(Equal(sendWindow)) Expect(fc.sendWindow).To(Equal(sendWindow))
}) })
It("queues window updates with the correction stream ID", func() { It("queues window updates with the correct stream ID", func() {
var queued bool var queued bool
queueWindowUpdate := func(id protocol.StreamID) { queueWindowUpdate := func(id protocol.StreamID) {
Expect(id).To(Equal(protocol.StreamID(5))) Expect(id).To(Equal(protocol.StreamID(5)))