add a HasWindowUpdate method to the stream flow controller

This commit is contained in:
Marten Seemann 2017-12-21 17:06:38 +07:00
parent 8759fa2d8e
commit 15ab0ae443
5 changed files with 38 additions and 4 deletions

View file

@ -60,12 +60,16 @@ func (c *baseFlowController) AddBytesRead(n protocol.ByteCount) {
c.bytesRead += n
}
func (c *baseFlowController) hasWindowUpdate() bool {
bytesRemaining := c.receiveWindow - c.bytesRead
// update the window when more than the threshold was consumed
return bytesRemaining <= protocol.ByteCount((float64(c.receiveWindowSize) * float64((1 - protocol.WindowUpdateThreshold))))
}
// getWindowUpdate updates the receive window, if necessary
// it returns the new offset
func (c *baseFlowController) getWindowUpdate() protocol.ByteCount {
bytesRemaining := c.receiveWindow - c.bytesRead
// update the window when more than the threshold was consumed
if bytesRemaining >= protocol.ByteCount((float64(c.receiveWindowSize) * float64((1 - protocol.WindowUpdateThreshold)))) {
if !c.hasWindowUpdate() {
return 0
}

View file

@ -20,6 +20,8 @@ type StreamFlowController interface {
// UpdateHighestReceived should be called when a new highest offset is received
// final has to be to true if this is the final offset of the stream, as contained in a STREAM frame with FIN bit, and the RST_STREAM frame
UpdateHighestReceived(offset protocol.ByteCount, final bool) error
// HasWindowUpdate says if it is necessary to update the window
HasWindowUpdate() bool
}
// The ConnectionFlowController is the flow controller for the connection.

View file

@ -109,6 +109,13 @@ func (c *streamFlowController) SendWindowSize() protocol.ByteCount {
return window
}
func (c *streamFlowController) HasWindowUpdate() bool {
c.mutex.Lock()
hasWindowUpdate := !c.receivedFinalOffset && c.hasWindowUpdate()
c.mutex.Unlock()
return hasWindowUpdate
}
func (c *streamFlowController) GetWindowUpdate() protocol.ByteCount {
// don't use defer for unlocking the mutex here, GetWindowUpdate() is called frequently and defer shows up in the profiler
c.mutex.Lock()

View file

@ -173,6 +173,14 @@ var _ = Describe("Stream Flow controller", func() {
oldWindowSize = controller.receiveWindowSize
})
It("tells if it has window updates", func() {
Expect(controller.HasWindowUpdate()).To(BeFalse())
controller.AddBytesRead(30)
Expect(controller.HasWindowUpdate()).To(BeTrue())
Expect(controller.GetWindowUpdate()).ToNot(BeZero())
Expect(controller.HasWindowUpdate()).To(BeFalse())
})
It("tells the connection flow controller when the window was autotuned", func() {
oldOffset := controller.bytesRead
controller.contributesToConnection = true
@ -200,9 +208,10 @@ var _ = Describe("Stream Flow controller", func() {
})
It("doesn't increase the window after a final offset was already received", func() {
controller.AddBytesRead(80)
controller.AddBytesRead(30)
err := controller.UpdateHighestReceived(90, true)
Expect(err).ToNot(HaveOccurred())
Expect(controller.HasWindowUpdate()).To(BeFalse())
offset := controller.GetWindowUpdate()
Expect(offset).To(BeZero())
})

View file

@ -66,6 +66,18 @@ func (mr *MockStreamFlowControllerMockRecorder) GetWindowUpdate() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetWindowUpdate", reflect.TypeOf((*MockStreamFlowController)(nil).GetWindowUpdate))
}
// HasWindowUpdate mocks base method
func (m *MockStreamFlowController) HasWindowUpdate() bool {
ret := m.ctrl.Call(m, "HasWindowUpdate")
ret0, _ := ret[0].(bool)
return ret0
}
// HasWindowUpdate indicates an expected call of HasWindowUpdate
func (mr *MockStreamFlowControllerMockRecorder) HasWindowUpdate() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasWindowUpdate", reflect.TypeOf((*MockStreamFlowController)(nil).HasWindowUpdate))
}
// IsNewlyBlocked mocks base method
func (m *MockStreamFlowController) IsNewlyBlocked() (bool, protocol.ByteCount) {
ret := m.ctrl.Call(m, "IsNewlyBlocked")