always queue window updates when data is being read

There's no need to have a separate call in the flow controller interface
for this.
This commit is contained in:
Marten Seemann 2019-01-22 15:58:43 +07:00
parent ff9bb5bd96
commit bfbf0bca48
9 changed files with 29 additions and 65 deletions

View file

@ -54,7 +54,12 @@ func (c *connectionFlowController) IncrementHighestReceived(increment protocol.B
return nil
}
func (c *connectionFlowController) MaybeQueueWindowUpdate() {
func (c *connectionFlowController) AddBytesRead(n protocol.ByteCount) {
c.baseFlowController.AddBytesRead(n)
c.maybeQueueWindowUpdate()
}
func (c *connectionFlowController) maybeQueueWindowUpdate() {
c.mutex.Lock()
hasWindowUpdate := c.hasWindowUpdate()
c.mutex.Unlock()

View file

@ -23,6 +23,7 @@ var _ = Describe("Connection Flow controller", func() {
}
BeforeEach(func() {
queuedWindowUpdate = false
controller = &connectionFlowController{}
controller.rttStats = &congestion.RTTStats{}
controller.logger = utils.DefaultLogger
@ -58,14 +59,13 @@ var _ = Describe("Connection Flow controller", func() {
})
It("queues window updates", func() {
controller.MaybeQueueWindowUpdate()
controller.AddBytesRead(1)
Expect(queuedWindowUpdate).To(BeFalse())
controller.AddBytesRead(30)
controller.MaybeQueueWindowUpdate()
controller.AddBytesRead(29)
Expect(queuedWindowUpdate).To(BeTrue())
Expect(controller.GetWindowUpdate()).ToNot(BeZero())
queuedWindowUpdate = false
controller.MaybeQueueWindowUpdate()
controller.AddBytesRead(1)
Expect(queuedWindowUpdate).To(BeFalse())
})

View file

@ -10,7 +10,6 @@ type flowController interface {
// for receiving
AddBytesRead(protocol.ByteCount)
GetWindowUpdate() protocol.ByteCount // returns 0 if no update is necessary
MaybeQueueWindowUpdate() // queues a window update, if necessary
IsNewlyBlocked() (bool, protocol.ByteCount)
}

View file

@ -89,6 +89,7 @@ func (c *streamFlowController) UpdateHighestReceived(byteOffset protocol.ByteCou
func (c *streamFlowController) AddBytesRead(n protocol.ByteCount) {
c.baseFlowController.AddBytesRead(n)
c.maybeQueueWindowUpdate()
c.connection.AddBytesRead(n)
}
@ -101,14 +102,13 @@ func (c *streamFlowController) SendWindowSize() protocol.ByteCount {
return utils.MinByteCount(c.baseFlowController.sendWindowSize(), c.connection.SendWindowSize())
}
func (c *streamFlowController) MaybeQueueWindowUpdate() {
func (c *streamFlowController) maybeQueueWindowUpdate() {
c.mutex.Lock()
hasWindowUpdate := !c.receivedFinalOffset && c.hasWindowUpdate()
c.mutex.Unlock()
if hasWindowUpdate {
c.queueWindowUpdate()
}
c.connection.MaybeQueueWindowUpdate()
}
func (c *streamFlowController) GetWindowUpdate() protocol.ByteCount {

View file

@ -13,18 +13,16 @@ import (
var _ = Describe("Stream Flow controller", func() {
var (
controller *streamFlowController
queuedWindowUpdate bool
queuedConnWindowUpdate bool
controller *streamFlowController
queuedWindowUpdate bool
)
BeforeEach(func() {
queuedWindowUpdate = false
queuedConnWindowUpdate = false
rttStats := &congestion.RTTStats{}
controller = &streamFlowController{
streamID: 10,
connection: NewConnectionFlowController(1000, 1000, func() { queuedConnWindowUpdate = true }, rttStats, utils.DefaultLogger).(*connectionFlowController),
connection: NewConnectionFlowController(1000, 1000, func() {}, rttStats, utils.DefaultLogger).(*connectionFlowController),
}
controller.maxReceiveWindowSize = 10000
controller.rttStats = rttStats
@ -57,7 +55,6 @@ var _ = Describe("Stream Flow controller", func() {
cc := NewConnectionFlowController(0, 0, nil, nil, utils.DefaultLogger)
fc := NewStreamFlowController(5, cc, receiveWindow, maxReceiveWindow, sendWindow, queueWindowUpdate, rttStats, utils.DefaultLogger).(*streamFlowController)
fc.AddBytesRead(receiveWindow)
fc.MaybeQueueWindowUpdate()
Expect(queued).To(BeTrue())
})
})
@ -179,25 +176,16 @@ var _ = Describe("Stream Flow controller", func() {
})
It("queues window updates", func() {
controller.MaybeQueueWindowUpdate()
controller.AddBytesRead(1)
Expect(queuedWindowUpdate).To(BeFalse())
controller.AddBytesRead(30)
controller.MaybeQueueWindowUpdate()
controller.AddBytesRead(29)
Expect(queuedWindowUpdate).To(BeTrue())
Expect(controller.GetWindowUpdate()).ToNot(BeZero())
queuedWindowUpdate = false
controller.MaybeQueueWindowUpdate()
controller.AddBytesRead(1)
Expect(queuedWindowUpdate).To(BeFalse())
})
It("queues connection-level window updates", func() {
controller.MaybeQueueWindowUpdate()
Expect(queuedConnWindowUpdate).To(BeFalse())
controller.AddBytesRead(60)
controller.MaybeQueueWindowUpdate()
Expect(queuedConnWindowUpdate).To(BeTrue())
})
It("tells the connection flow controller when the window was autotuned", func() {
oldOffset := controller.bytesRead
setRtt(scaleDuration(20 * time.Millisecond))
@ -211,10 +199,8 @@ var _ = Describe("Stream Flow controller", func() {
})
It("doesn't increase the window after a final offset was already received", func() {
Expect(controller.UpdateHighestReceived(90, true)).To(Succeed())
controller.AddBytesRead(30)
err := controller.UpdateHighestReceived(90, true)
Expect(err).ToNot(HaveOccurred())
controller.MaybeQueueWindowUpdate()
Expect(queuedWindowUpdate).To(BeFalse())
offset := controller.GetWindowUpdate()
Expect(offset).To(BeZero())

View file

@ -79,16 +79,6 @@ func (mr *MockConnectionFlowControllerMockRecorder) IsNewlyBlocked() *gomock.Cal
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsNewlyBlocked", reflect.TypeOf((*MockConnectionFlowController)(nil).IsNewlyBlocked))
}
// MaybeQueueWindowUpdate mocks base method
func (m *MockConnectionFlowController) MaybeQueueWindowUpdate() {
m.ctrl.Call(m, "MaybeQueueWindowUpdate")
}
// MaybeQueueWindowUpdate indicates an expected call of MaybeQueueWindowUpdate
func (mr *MockConnectionFlowControllerMockRecorder) MaybeQueueWindowUpdate() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaybeQueueWindowUpdate", reflect.TypeOf((*MockConnectionFlowController)(nil).MaybeQueueWindowUpdate))
}
// SendWindowSize mocks base method
func (m *MockConnectionFlowController) SendWindowSize() protocol.ByteCount {
ret := m.ctrl.Call(m, "SendWindowSize")

View file

@ -34,6 +34,16 @@ func (m *MockStreamFlowController) EXPECT() *MockStreamFlowControllerMockRecorde
return m.recorder
}
// Abandon mocks base method
func (m *MockStreamFlowController) Abandon() {
m.ctrl.Call(m, "Abandon")
}
// Abandon indicates an expected call of Abandon
func (mr *MockStreamFlowControllerMockRecorder) Abandon() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Abandon", reflect.TypeOf((*MockStreamFlowController)(nil).Abandon))
}
// AddBytesRead mocks base method
func (m *MockStreamFlowController) AddBytesRead(arg0 protocol.ByteCount) {
m.ctrl.Call(m, "AddBytesRead", arg0)
@ -79,16 +89,6 @@ func (mr *MockStreamFlowControllerMockRecorder) IsNewlyBlocked() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsNewlyBlocked", reflect.TypeOf((*MockStreamFlowController)(nil).IsNewlyBlocked))
}
// MaybeQueueWindowUpdate mocks base method
func (m *MockStreamFlowController) MaybeQueueWindowUpdate() {
m.ctrl.Call(m, "MaybeQueueWindowUpdate")
}
// MaybeQueueWindowUpdate indicates an expected call of MaybeQueueWindowUpdate
func (mr *MockStreamFlowControllerMockRecorder) MaybeQueueWindowUpdate() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaybeQueueWindowUpdate", reflect.TypeOf((*MockStreamFlowController)(nil).MaybeQueueWindowUpdate))
}
// SendWindowSize mocks base method
func (m *MockStreamFlowController) SendWindowSize() protocol.ByteCount {
ret := m.ctrl.Call(m, "SendWindowSize")

View file

@ -172,8 +172,6 @@ func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, err
if !s.resetRemotely {
s.flowController.AddBytesRead(protocol.ByteCount(m))
}
// increase the flow control window, if necessary
s.flowController.MaybeQueueWindowUpdate()
if s.readPosInFrame >= len(s.currentFrame) && s.currentFrameIsLast {
s.finRead = true

View file

@ -43,7 +43,6 @@ var _ = Describe("Receive Stream", func() {
It("reads a single STREAM frame", func() {
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(4))
mockFC.EXPECT().MaybeQueueWindowUpdate()
frame := wire.StreamFrame{
Offset: 0,
Data: []byte{0xDE, 0xAD, 0xBE, 0xEF},
@ -61,7 +60,6 @@ var _ = Describe("Receive Stream", func() {
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2))
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2))
mockFC.EXPECT().MaybeQueueWindowUpdate().Times(2)
frame := wire.StreamFrame{
Offset: 0,
Data: []byte{0xDE, 0xAD, 0xBE, 0xEF},
@ -83,7 +81,6 @@ var _ = Describe("Receive Stream", func() {
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2)
mockFC.EXPECT().MaybeQueueWindowUpdate().Times(2)
frame1 := wire.StreamFrame{
Offset: 0,
Data: []byte{0xDE, 0xAD},
@ -107,7 +104,6 @@ var _ = Describe("Receive Stream", func() {
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2)
mockFC.EXPECT().MaybeQueueWindowUpdate().Times(2)
frame1 := wire.StreamFrame{
Offset: 0,
Data: []byte{0xDE, 0xAD},
@ -130,7 +126,6 @@ var _ = Describe("Receive Stream", func() {
It("waits until data is available", func() {
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2))
mockFC.EXPECT().MaybeQueueWindowUpdate()
go func() {
defer GinkgoRecover()
frame := wire.StreamFrame{Data: []byte{0xDE, 0xAD}}
@ -148,7 +143,6 @@ var _ = Describe("Receive Stream", func() {
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2)
mockFC.EXPECT().MaybeQueueWindowUpdate().Times(2)
frame1 := wire.StreamFrame{
Offset: 2,
Data: []byte{0xBE, 0xEF},
@ -173,7 +167,6 @@ var _ = Describe("Receive Stream", func() {
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2)
mockFC.EXPECT().MaybeQueueWindowUpdate().Times(2)
frame1 := wire.StreamFrame{
Offset: 0,
Data: []byte{0xDE, 0xAD},
@ -204,7 +197,6 @@ var _ = Describe("Receive Stream", func() {
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(6), false)
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2))
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(4))
mockFC.EXPECT().MaybeQueueWindowUpdate().Times(2)
frame1 := wire.StreamFrame{
Offset: 0,
Data: []byte("foob"),
@ -337,7 +329,6 @@ var _ = Describe("Receive Stream", func() {
It("returns EOFs", func() {
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), true)
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(4))
mockFC.EXPECT().MaybeQueueWindowUpdate()
str.handleStreamFrame(&wire.StreamFrame{
Offset: 0,
Data: []byte{0xDE, 0xAD, 0xBE, 0xEF},
@ -358,7 +349,6 @@ var _ = Describe("Receive Stream", func() {
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), true)
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2)
mockFC.EXPECT().MaybeQueueWindowUpdate().Times(2)
frame1 := wire.StreamFrame{
Offset: 2,
Data: []byte{0xBE, 0xEF},
@ -386,7 +376,6 @@ var _ = Describe("Receive Stream", func() {
It("returns EOFs with partial read", func() {
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), true)
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2))
mockFC.EXPECT().MaybeQueueWindowUpdate()
err := str.handleStreamFrame(&wire.StreamFrame{
Offset: 0,
Data: []byte{0xde, 0xad},
@ -404,7 +393,6 @@ var _ = Describe("Receive Stream", func() {
It("handles immediate FINs", func() {
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(0), true)
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(0))
mockFC.EXPECT().MaybeQueueWindowUpdate()
err := str.handleStreamFrame(&wire.StreamFrame{
Offset: 0,
FinBit: true,
@ -421,7 +409,6 @@ var _ = Describe("Receive Stream", func() {
It("closes when CloseRemote is called", func() {
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(0), true)
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(0))
mockFC.EXPECT().MaybeQueueWindowUpdate()
str.CloseRemote(0)
mockSender.EXPECT().onStreamCompleted(streamID)
b := make([]byte, 8)
@ -497,7 +484,6 @@ var _ = Describe("Receive Stream", func() {
It("doesn't send a RESET_STREAM frame, if the FIN was already read", func() {
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(6), true)
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(6))
mockFC.EXPECT().MaybeQueueWindowUpdate()
// no calls to mockSender.queueControlFrame
err := str.handleStreamFrame(&wire.StreamFrame{
StreamID: streamID,