mirror of
https://github.com/refraction-networking/uquic.git
synced 2025-04-03 20:27:35 +03:00
only send BLOCKED frames if there is more data to send
This commit is contained in:
parent
fc87fbd354
commit
50397a979c
11 changed files with 83 additions and 139 deletions
|
@ -11,8 +11,9 @@ import (
|
|||
|
||||
type baseFlowController struct {
|
||||
// for sending data
|
||||
bytesSent protocol.ByteCount
|
||||
sendWindow protocol.ByteCount
|
||||
bytesSent protocol.ByteCount
|
||||
sendWindow protocol.ByteCount
|
||||
lastBlockedAt protocol.ByteCount
|
||||
|
||||
// for receiving data
|
||||
mutex sync.RWMutex
|
||||
|
@ -29,6 +30,17 @@ type baseFlowController struct {
|
|||
logger utils.Logger
|
||||
}
|
||||
|
||||
// IsNewlyBlocked says if it is newly blocked by flow control.
|
||||
// For every offset, it only returns true once.
|
||||
// If it is blocked, the offset is returned.
|
||||
func (c *baseFlowController) IsNewlyBlocked() (bool, protocol.ByteCount) {
|
||||
if c.sendWindowSize() != 0 || c.sendWindow == c.lastBlockedAt {
|
||||
return false, 0
|
||||
}
|
||||
c.lastBlockedAt = c.sendWindow
|
||||
return true, c.sendWindow
|
||||
}
|
||||
|
||||
func (c *baseFlowController) AddBytesSent(n protocol.ByteCount) {
|
||||
c.bytesSent += n
|
||||
}
|
||||
|
|
|
@ -61,6 +61,29 @@ var _ = Describe("Base Flow controller", func() {
|
|||
controller.UpdateSendWindow(10)
|
||||
Expect(controller.sendWindowSize()).To(Equal(protocol.ByteCount(20)))
|
||||
})
|
||||
|
||||
It("says when it's blocked", func() {
|
||||
controller.UpdateSendWindow(100)
|
||||
Expect(controller.IsNewlyBlocked()).To(BeFalse())
|
||||
controller.AddBytesSent(100)
|
||||
blocked, offset := controller.IsNewlyBlocked()
|
||||
Expect(blocked).To(BeTrue())
|
||||
Expect(offset).To(Equal(protocol.ByteCount(100)))
|
||||
})
|
||||
|
||||
It("doesn't say that it's newly blocked multiple times for the same offset", func() {
|
||||
controller.UpdateSendWindow(100)
|
||||
controller.AddBytesSent(100)
|
||||
newlyBlocked, offset := controller.IsNewlyBlocked()
|
||||
Expect(newlyBlocked).To(BeTrue())
|
||||
Expect(offset).To(Equal(protocol.ByteCount(100)))
|
||||
newlyBlocked, _ = controller.IsNewlyBlocked()
|
||||
Expect(newlyBlocked).To(BeFalse())
|
||||
controller.UpdateSendWindow(150)
|
||||
controller.AddBytesSent(150)
|
||||
newlyBlocked, _ = controller.IsNewlyBlocked()
|
||||
Expect(newlyBlocked).To(BeTrue())
|
||||
})
|
||||
})
|
||||
|
||||
Context("receive flow control", func() {
|
||||
|
|
|
@ -10,7 +10,6 @@ import (
|
|||
)
|
||||
|
||||
type connectionFlowController struct {
|
||||
lastBlockedAt protocol.ByteCount
|
||||
baseFlowController
|
||||
|
||||
queueWindowUpdate func()
|
||||
|
@ -43,17 +42,6 @@ func (c *connectionFlowController) SendWindowSize() protocol.ByteCount {
|
|||
return c.baseFlowController.sendWindowSize()
|
||||
}
|
||||
|
||||
// IsNewlyBlocked says if it is newly blocked by flow control.
|
||||
// For every offset, it only returns true once.
|
||||
// If it is blocked, the offset is returned.
|
||||
func (c *connectionFlowController) IsNewlyBlocked() (bool, protocol.ByteCount) {
|
||||
if c.sendWindowSize() != 0 || c.sendWindow == c.lastBlockedAt {
|
||||
return false, 0
|
||||
}
|
||||
c.lastBlockedAt = c.sendWindow
|
||||
return true, c.sendWindow
|
||||
}
|
||||
|
||||
// IncrementHighestReceived adds an increment to the highestReceived value
|
||||
func (c *connectionFlowController) IncrementHighestReceived(increment protocol.ByteCount) error {
|
||||
c.mutex.Lock()
|
||||
|
|
|
@ -95,31 +95,6 @@ var _ = Describe("Connection Flow controller", func() {
|
|||
})
|
||||
})
|
||||
|
||||
Context("send flow control", func() {
|
||||
It("says when it's blocked", func() {
|
||||
controller.UpdateSendWindow(100)
|
||||
Expect(controller.IsNewlyBlocked()).To(BeFalse())
|
||||
controller.AddBytesSent(100)
|
||||
blocked, offset := controller.IsNewlyBlocked()
|
||||
Expect(blocked).To(BeTrue())
|
||||
Expect(offset).To(Equal(protocol.ByteCount(100)))
|
||||
})
|
||||
|
||||
It("doesn't say that it's newly blocked multiple times for the same offset", func() {
|
||||
controller.UpdateSendWindow(100)
|
||||
controller.AddBytesSent(100)
|
||||
newlyBlocked, offset := controller.IsNewlyBlocked()
|
||||
Expect(newlyBlocked).To(BeTrue())
|
||||
Expect(offset).To(Equal(protocol.ByteCount(100)))
|
||||
newlyBlocked, _ = controller.IsNewlyBlocked()
|
||||
Expect(newlyBlocked).To(BeFalse())
|
||||
controller.UpdateSendWindow(150)
|
||||
controller.AddBytesSent(150)
|
||||
newlyBlocked, _ = controller.IsNewlyBlocked()
|
||||
Expect(newlyBlocked).To(BeTrue())
|
||||
})
|
||||
})
|
||||
|
||||
Context("setting the minimum window size", func() {
|
||||
var (
|
||||
oldWindowSize protocol.ByteCount
|
||||
|
|
|
@ -11,13 +11,12 @@ type flowController interface {
|
|||
AddBytesRead(protocol.ByteCount)
|
||||
GetWindowUpdate() protocol.ByteCount // returns 0 if no update is necessary
|
||||
MaybeQueueWindowUpdate() // queues a window update, if necessary
|
||||
IsNewlyBlocked() (bool, protocol.ByteCount)
|
||||
}
|
||||
|
||||
// A StreamFlowController is a flow controller for a QUIC stream.
|
||||
type StreamFlowController interface {
|
||||
flowController
|
||||
// for sending
|
||||
IsBlocked() (bool, protocol.ByteCount)
|
||||
// for receiving
|
||||
// UpdateHighestReceived should be called when a new highest offset is received
|
||||
// final has to be to true if this is the final offset of the stream, as contained in a STREAM frame with FIN bit, and the RST_STREAM frame
|
||||
|
@ -27,8 +26,6 @@ type StreamFlowController interface {
|
|||
// The ConnectionFlowController is the flow controller for the connection.
|
||||
type ConnectionFlowController interface {
|
||||
flowController
|
||||
// for sending
|
||||
IsNewlyBlocked() (bool, protocol.ByteCount)
|
||||
}
|
||||
|
||||
type connectionFlowControllerI interface {
|
||||
|
|
|
@ -115,15 +115,6 @@ func (c *streamFlowController) SendWindowSize() protocol.ByteCount {
|
|||
return window
|
||||
}
|
||||
|
||||
// IsBlocked says if it is blocked by stream-level flow control.
|
||||
// If it is blocked, the offset is returned.
|
||||
func (c *streamFlowController) IsBlocked() (bool, protocol.ByteCount) {
|
||||
if c.sendWindowSize() != 0 {
|
||||
return false, 0
|
||||
}
|
||||
return true, c.sendWindow
|
||||
}
|
||||
|
||||
func (c *streamFlowController) MaybeQueueWindowUpdate() {
|
||||
c.mutex.Lock()
|
||||
hasWindowUpdate := !c.receivedFinalOffset && c.hasWindowUpdate()
|
||||
|
|
|
@ -284,7 +284,7 @@ var _ = Describe("Stream Flow controller", func() {
|
|||
controller.AddBytesSent(50)
|
||||
blocked, _ := controller.connection.IsNewlyBlocked()
|
||||
Expect(blocked).To(BeTrue())
|
||||
Expect(controller.IsBlocked()).To(BeFalse())
|
||||
Expect(controller.IsNewlyBlocked()).To(BeFalse())
|
||||
})
|
||||
})
|
||||
})
|
||||
|
|
|
@ -66,17 +66,17 @@ func (mr *MockStreamFlowControllerMockRecorder) GetWindowUpdate() *gomock.Call {
|
|||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetWindowUpdate", reflect.TypeOf((*MockStreamFlowController)(nil).GetWindowUpdate))
|
||||
}
|
||||
|
||||
// IsBlocked mocks base method
|
||||
func (m *MockStreamFlowController) IsBlocked() (bool, protocol.ByteCount) {
|
||||
ret := m.ctrl.Call(m, "IsBlocked")
|
||||
// IsNewlyBlocked mocks base method
|
||||
func (m *MockStreamFlowController) IsNewlyBlocked() (bool, protocol.ByteCount) {
|
||||
ret := m.ctrl.Call(m, "IsNewlyBlocked")
|
||||
ret0, _ := ret[0].(bool)
|
||||
ret1, _ := ret[1].(protocol.ByteCount)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// IsBlocked indicates an expected call of IsBlocked
|
||||
func (mr *MockStreamFlowControllerMockRecorder) IsBlocked() *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsBlocked", reflect.TypeOf((*MockStreamFlowController)(nil).IsBlocked))
|
||||
// IsNewlyBlocked indicates an expected call of IsNewlyBlocked
|
||||
func (mr *MockStreamFlowControllerMockRecorder) IsNewlyBlocked() *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsNewlyBlocked", reflect.TypeOf((*MockStreamFlowController)(nil).IsNewlyBlocked))
|
||||
}
|
||||
|
||||
// MaybeQueueWindowUpdate mocks base method
|
||||
|
|
|
@ -166,22 +166,19 @@ func (s *sendStream) popStreamFrameImpl(maxBytes protocol.ByteCount) (bool /* co
|
|||
if s.dataForWriting == nil {
|
||||
return false, nil, false
|
||||
}
|
||||
isBlocked, _ := s.flowController.IsBlocked()
|
||||
return false, nil, !isBlocked
|
||||
}
|
||||
if frame.FinBit {
|
||||
s.finSent = true
|
||||
return true, frame, s.dataForWriting != nil
|
||||
} else if s.streamID != s.version.CryptoStreamID() { // TODO(#657): Flow control for the crypto stream
|
||||
if isBlocked, offset := s.flowController.IsBlocked(); isBlocked {
|
||||
if isBlocked, offset := s.flowController.IsNewlyBlocked(); isBlocked {
|
||||
s.sender.queueControlFrame(&wire.StreamBlockedFrame{
|
||||
StreamID: s.streamID,
|
||||
Offset: offset,
|
||||
})
|
||||
return false, frame, false
|
||||
return false, nil, false
|
||||
}
|
||||
return false, nil, true
|
||||
}
|
||||
return false, frame, s.dataForWriting != nil
|
||||
if frame.FinBit {
|
||||
s.finSent = true
|
||||
}
|
||||
return frame.FinBit, frame, s.dataForWriting != nil
|
||||
}
|
||||
|
||||
func (s *sendStream) getDataForWriting(maxBytes protocol.ByteCount) ([]byte, bool /* should send FIN */) {
|
||||
|
|
|
@ -54,7 +54,6 @@ var _ = Describe("Send Stream", func() {
|
|||
mockSender.EXPECT().onHasStreamData(streamID)
|
||||
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999))
|
||||
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6))
|
||||
mockFC.EXPECT().IsBlocked()
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
|
@ -79,7 +78,6 @@ var _ = Describe("Send Stream", func() {
|
|||
frameHeaderLen := protocol.ByteCount(4)
|
||||
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2)
|
||||
mockFC.EXPECT().AddBytesSent(gomock.Any() /* protocol.ByteCount(3)*/).Times(2)
|
||||
mockFC.EXPECT().IsBlocked().Times(2)
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
|
@ -113,7 +111,6 @@ var _ = Describe("Send Stream", func() {
|
|||
mockSender.EXPECT().onHasStreamData(streamID)
|
||||
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2)
|
||||
mockFC.EXPECT().AddBytesSent(gomock.Any()).Times(2)
|
||||
mockFC.EXPECT().IsBlocked().Times(2)
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
|
@ -140,7 +137,6 @@ var _ = Describe("Send Stream", func() {
|
|||
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2)
|
||||
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(1))
|
||||
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(2))
|
||||
mockFC.EXPECT().IsBlocked().Times(2)
|
||||
s := []byte("foo")
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
|
@ -180,9 +176,13 @@ var _ = Describe("Send Stream", func() {
|
|||
})
|
||||
|
||||
Context("flow control blocking", func() {
|
||||
It("returns nil when it is blocked", func() {
|
||||
It("queues a BLOCKED frame if the stream is flow control blocked", func() {
|
||||
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(0))
|
||||
mockFC.EXPECT().IsBlocked().Return(true, protocol.ByteCount(10))
|
||||
mockFC.EXPECT().IsNewlyBlocked().Return(true, protocol.ByteCount(12))
|
||||
mockSender.EXPECT().queueControlFrame(&wire.StreamBlockedFrame{
|
||||
StreamID: streamID,
|
||||
Offset: 12,
|
||||
})
|
||||
mockSender.EXPECT().onHasStreamData(streamID)
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
|
@ -200,74 +200,41 @@ var _ = Describe("Send Stream", func() {
|
|||
Eventually(done).Should(BeClosed())
|
||||
})
|
||||
|
||||
It("queues a BLOCKED frame if the stream is flow control blocked", func() {
|
||||
It("says that it doesn't have any more data, when it is flow control blocked", func() {
|
||||
frameHeaderSize := protocol.ByteCount(4)
|
||||
mockSender.EXPECT().onHasStreamData(streamID)
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
_, err := str.Write([]byte("foobar"))
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
close(done)
|
||||
}()
|
||||
waitForWrite()
|
||||
|
||||
// first pop a STREAM frame of the maximum size allowed by flow control
|
||||
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(3))
|
||||
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(3))
|
||||
f, hasMoreData := str.popStreamFrame(frameHeaderSize + 3)
|
||||
Expect(f).ToNot(BeNil())
|
||||
Expect(hasMoreData).To(BeTrue())
|
||||
|
||||
// try to pop again, this time noticing that we're blocked
|
||||
mockFC.EXPECT().SendWindowSize()
|
||||
// don't use offset 3 here, to make sure the BLOCKED frame contains the number returned by the flow controller
|
||||
mockFC.EXPECT().IsNewlyBlocked().Return(true, protocol.ByteCount(10))
|
||||
mockSender.EXPECT().queueControlFrame(&wire.StreamBlockedFrame{
|
||||
StreamID: streamID,
|
||||
Offset: 10,
|
||||
})
|
||||
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999))
|
||||
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6))
|
||||
// don't use offset 6 here, to make sure the BLOCKED frame contains the number returned by the flow controller
|
||||
mockFC.EXPECT().IsBlocked().Return(true, protocol.ByteCount(10))
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
_, err := str.Write([]byte("foobar"))
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
close(done)
|
||||
}()
|
||||
waitForWrite()
|
||||
f, hasMoreData := str.popStreamFrame(1000)
|
||||
Expect(f).ToNot(BeNil())
|
||||
Expect(hasMoreData).To(BeFalse())
|
||||
Eventually(done).Should(BeClosed())
|
||||
})
|
||||
|
||||
It("says that it doesn't have any more data, when it is flow control blocked", func() {
|
||||
mockSender.EXPECT().onHasStreamData(streamID)
|
||||
mockSender.EXPECT().queueControlFrame(gomock.Any())
|
||||
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999))
|
||||
mockFC.EXPECT().AddBytesSent(gomock.Any())
|
||||
mockFC.EXPECT().IsBlocked().Return(true, protocol.ByteCount(10))
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
_, err := str.Write(bytes.Repeat([]byte{0}, 100))
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
close(done)
|
||||
}()
|
||||
waitForWrite()
|
||||
f, hasMoreData := str.popStreamFrame(50)
|
||||
Expect(f).ToNot(BeNil())
|
||||
f, hasMoreData = str.popStreamFrame(1000)
|
||||
Expect(f).To(BeNil())
|
||||
Expect(hasMoreData).To(BeFalse())
|
||||
// make the Write go routine return
|
||||
str.closeForShutdown(nil)
|
||||
Eventually(done).Should(BeClosed())
|
||||
})
|
||||
|
||||
It("doesn't queue a BLOCKED frame if the stream is flow control blocked, but the frame popped has the FIN bit set", func() {
|
||||
mockSender.EXPECT().onHasStreamData(streamID).Times(2) // once for the Write, once for the Close
|
||||
mockSender.EXPECT().onStreamCompleted(streamID)
|
||||
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999))
|
||||
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6))
|
||||
// don't EXPECT a call to mockFC.IsBlocked
|
||||
// don't EXPECT a call to mockSender.queueControlFrame
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
_, err := str.Write([]byte("foobar"))
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
close(done)
|
||||
}()
|
||||
waitForWrite()
|
||||
Expect(str.Close()).To(Succeed())
|
||||
f, hasMoreData := str.popStreamFrame(1000)
|
||||
Expect(hasMoreData).To(BeFalse())
|
||||
Expect(f).ToNot(BeNil())
|
||||
Expect(f.FinBit).To(BeTrue())
|
||||
Eventually(done).Should(BeClosed())
|
||||
})
|
||||
})
|
||||
|
||||
Context("deadlines", func() {
|
||||
|
@ -292,7 +259,6 @@ var _ = Describe("Send Stream", func() {
|
|||
mockSender.EXPECT().onHasStreamData(streamID)
|
||||
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(10000)).AnyTimes()
|
||||
mockFC.EXPECT().AddBytesSent(gomock.Any())
|
||||
mockFC.EXPECT().IsBlocked()
|
||||
deadline := time.Now().Add(scaleDuration(50 * time.Millisecond))
|
||||
str.SetWriteDeadline(deadline)
|
||||
var n int
|
||||
|
@ -317,7 +283,6 @@ var _ = Describe("Send Stream", func() {
|
|||
mockSender.EXPECT().onHasStreamData(streamID)
|
||||
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(10000)).AnyTimes()
|
||||
mockFC.EXPECT().AddBytesSent(gomock.Any())
|
||||
mockFC.EXPECT().IsBlocked()
|
||||
deadline := time.Now().Add(scaleDuration(50 * time.Millisecond))
|
||||
str.SetWriteDeadline(deadline)
|
||||
writeReturned := make(chan struct{})
|
||||
|
@ -405,7 +370,6 @@ var _ = Describe("Send Stream", func() {
|
|||
frameHeaderLen := protocol.ByteCount(4)
|
||||
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2)
|
||||
mockFC.EXPECT().AddBytesSent(gomock.Any()).Times(2)
|
||||
mockFC.EXPECT().IsBlocked()
|
||||
str.dataForWriting = []byte("foobar")
|
||||
Expect(str.Close()).To(Succeed())
|
||||
f, _ := str.popStreamFrame(3 + frameHeaderLen)
|
||||
|
@ -453,7 +417,6 @@ var _ = Describe("Send Stream", func() {
|
|||
mockSender.EXPECT().onHasStreamData(streamID)
|
||||
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999))
|
||||
mockFC.EXPECT().AddBytesSent(gomock.Any())
|
||||
mockFC.EXPECT().IsBlocked()
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
|
@ -530,7 +493,6 @@ var _ = Describe("Send Stream", func() {
|
|||
mockSender.EXPECT().queueControlFrame(gomock.Any())
|
||||
mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount)
|
||||
mockFC.EXPECT().AddBytesSent(gomock.Any())
|
||||
mockFC.EXPECT().IsBlocked()
|
||||
writeReturned := make(chan struct{})
|
||||
var n int
|
||||
go func() {
|
||||
|
|
|
@ -130,7 +130,6 @@ var _ = Describe("Stream", func() {
|
|||
mockSender.EXPECT().onHasStreamData(streamID).Times(2) // once for the Write, once for the Close
|
||||
mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount).AnyTimes()
|
||||
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6))
|
||||
mockFC.EXPECT().IsBlocked()
|
||||
err := str.CancelRead(1234)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
writeReturned := make(chan struct{})
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue