diff --git a/crypto_stream.go b/crypto_stream.go index b9263da6..c98bc6d4 100644 --- a/crypto_stream.go +++ b/crypto_stream.go @@ -19,7 +19,7 @@ type cryptoStreamI interface { // methods needed for flow control GetWindowUpdate() protocol.ByteCount HandleMaxStreamDataFrame(*wire.MaxStreamDataFrame) - IsFlowControlBlocked() bool + IsFlowControlBlocked() (bool, protocol.ByteCount) } type cryptoStream struct { diff --git a/internal/flowcontrol/base_flow_controller.go b/internal/flowcontrol/base_flow_controller.go index f7d70c7e..616e4bf5 100644 --- a/internal/flowcontrol/base_flow_controller.go +++ b/internal/flowcontrol/base_flow_controller.go @@ -79,11 +79,16 @@ func (c *baseFlowController) getWindowUpdate() protocol.ByteCount { return c.receiveWindow } -func (c *baseFlowController) IsBlocked() bool { +// IsBlocked says if it is blocked by flow control. +// If it is blocked, the offset is returned. +func (c *baseFlowController) IsBlocked() (bool, protocol.ByteCount) { c.mutex.RLock() defer c.mutex.RUnlock() - return c.sendWindowSize() == 0 + if c.sendWindowSize() != 0 { + return false, 0 + } + return true, c.sendWindow } // maybeAdjustWindowIncrement increases the receiveWindowIncrement if we're sending updates too often. diff --git a/internal/flowcontrol/base_flow_controller_test.go b/internal/flowcontrol/base_flow_controller_test.go index 9239722c..790d31f3 100644 --- a/internal/flowcontrol/base_flow_controller_test.go +++ b/internal/flowcontrol/base_flow_controller_test.go @@ -54,7 +54,9 @@ var _ = Describe("Base Flow controller", func() { controller.UpdateSendWindow(100) Expect(controller.IsBlocked()).To(BeFalse()) controller.AddBytesSent(100) - Expect(controller.IsBlocked()).To(BeTrue()) + blocked, offset := controller.IsBlocked() + Expect(blocked).To(BeTrue()) + Expect(offset).To(Equal(protocol.ByteCount(100))) }) }) diff --git a/internal/flowcontrol/interface.go b/internal/flowcontrol/interface.go index 75ec6fac..f5577283 100644 --- a/internal/flowcontrol/interface.go +++ b/internal/flowcontrol/interface.go @@ -5,7 +5,7 @@ import "github.com/lucas-clemente/quic-go/internal/protocol" type flowController interface { // for sending SendWindowSize() protocol.ByteCount - IsBlocked() bool + IsBlocked() (bool, protocol.ByteCount) UpdateSendWindow(protocol.ByteCount) AddBytesSent(protocol.ByteCount) // for receiving diff --git a/internal/flowcontrol/stream_flow_controller_test.go b/internal/flowcontrol/stream_flow_controller_test.go index ed51adcf..5938e5e6 100644 --- a/internal/flowcontrol/stream_flow_controller_test.go +++ b/internal/flowcontrol/stream_flow_controller_test.go @@ -233,7 +233,8 @@ var _ = Describe("Stream Flow controller", func() { controller.connection.UpdateSendWindow(50) controller.UpdateSendWindow(100) controller.AddBytesSent(50) - Expect(controller.connection.IsBlocked()).To(BeTrue()) + blocked, _ := controller.connection.IsBlocked() + Expect(blocked).To(BeTrue()) Expect(controller.IsBlocked()).To(BeFalse()) }) }) diff --git a/internal/mocks/connection_flow_controller.go b/internal/mocks/connection_flow_controller.go index 26c77cbe..cf99e9d1 100644 --- a/internal/mocks/connection_flow_controller.go +++ b/internal/mocks/connection_flow_controller.go @@ -66,10 +66,11 @@ func (_mr *MockConnectionFlowControllerMockRecorder) GetWindowUpdate() *gomock.C } // IsBlocked mocks base method -func (_m *MockConnectionFlowController) IsBlocked() bool { +func (_m *MockConnectionFlowController) IsBlocked() (bool, protocol.ByteCount) { ret := _m.ctrl.Call(_m, "IsBlocked") ret0, _ := ret[0].(bool) - return ret0 + ret1, _ := ret[1].(protocol.ByteCount) + return ret0, ret1 } // IsBlocked indicates an expected call of IsBlocked diff --git a/internal/mocks/stream.go b/internal/mocks/stream.go index 107d9177..0500861b 100644 --- a/internal/mocks/stream.go +++ b/internal/mocks/stream.go @@ -129,10 +129,11 @@ func (_mr *MockStreamIMockRecorder) HandleStreamFrame(arg0 interface{}) *gomock. } // IsFlowControlBlocked mocks base method -func (_m *MockStreamI) IsFlowControlBlocked() bool { +func (_m *MockStreamI) IsFlowControlBlocked() (bool, protocol.ByteCount) { ret := _m.ctrl.Call(_m, "IsFlowControlBlocked") ret0, _ := ret[0].(bool) - return ret0 + ret1, _ := ret[1].(protocol.ByteCount) + return ret0, ret1 } // IsFlowControlBlocked indicates an expected call of IsFlowControlBlocked diff --git a/internal/mocks/stream_flow_controller.go b/internal/mocks/stream_flow_controller.go index 7e970bb4..8bca7b9e 100644 --- a/internal/mocks/stream_flow_controller.go +++ b/internal/mocks/stream_flow_controller.go @@ -66,10 +66,11 @@ func (_mr *MockStreamFlowControllerMockRecorder) GetWindowUpdate() *gomock.Call } // IsBlocked mocks base method -func (_m *MockStreamFlowController) IsBlocked() bool { +func (_m *MockStreamFlowController) IsBlocked() (bool, protocol.ByteCount) { ret := _m.ctrl.Call(_m, "IsBlocked") ret0, _ := ret[0].(bool) - return ret0 + ret1, _ := ret[1].(protocol.ByteCount) + return ret0, ret1 } // IsBlocked indicates an expected call of IsBlocked diff --git a/stream.go b/stream.go index 6dacfd3f..aacacd77 100644 --- a/stream.go +++ b/stream.go @@ -25,7 +25,7 @@ type streamI interface { // methods needed for flow control GetWindowUpdate() protocol.ByteCount HandleMaxStreamDataFrame(*wire.MaxStreamDataFrame) - IsFlowControlBlocked() bool + IsFlowControlBlocked() (bool, protocol.ByteCount) } // A Stream assembles the data from StreamFrames and provides a super-convenient Read-Interface @@ -484,7 +484,7 @@ func (s *stream) HandleMaxStreamDataFrame(frame *wire.MaxStreamDataFrame) { s.flowController.UpdateSendWindow(frame.ByteOffset) } -func (s *stream) IsFlowControlBlocked() bool { +func (s *stream) IsFlowControlBlocked() (bool, protocol.ByteCount) { return s.flowController.IsBlocked() } diff --git a/stream_framer.go b/stream_framer.go index 9244bc3e..d5fa09a7 100644 --- a/stream_framer.go +++ b/stream_framer.go @@ -105,11 +105,16 @@ func (f *streamFramer) maybePopNormalFrames(maxTotalLen protocol.ByteCount) (res } // Finally, check if we are now FC blocked and should queue a BLOCKED frame - if !frame.FinBit && s.IsFlowControlBlocked() { - f.blockedFrameQueue = append(f.blockedFrameQueue, &wire.StreamBlockedFrame{StreamID: s.StreamID()}) + if !frame.FinBit { + if blocked, offset := s.IsFlowControlBlocked(); blocked { + f.blockedFrameQueue = append(f.blockedFrameQueue, &wire.StreamBlockedFrame{ + StreamID: s.StreamID(), + Offset: offset, + }) + } } - if f.connFlowController.IsBlocked() { - f.blockedFrameQueue = append(f.blockedFrameQueue, &wire.BlockedFrame{}) + if blocked, offset := f.connFlowController.IsBlocked(); blocked { + f.blockedFrameQueue = append(f.blockedFrameQueue, &wire.BlockedFrame{Offset: offset}) } res = append(res, frame) diff --git a/stream_framer_test.go b/stream_framer_test.go index cb5ba7c3..d3307cdd 100644 --- a/stream_framer_test.go +++ b/stream_framer_test.go @@ -72,8 +72,8 @@ var _ = Describe("Stream Framer", func() { BeforeEach(func() { // nothing is blocked here connFC.EXPECT().IsBlocked().AnyTimes() - stream1.EXPECT().IsFlowControlBlocked().Return(false).AnyTimes() - stream2.EXPECT().IsFlowControlBlocked().Return(false).AnyTimes() + stream1.EXPECT().IsFlowControlBlocked().Return(false, protocol.ByteCount(0)).AnyTimes() + stream2.EXPECT().IsFlowControlBlocked().Return(false, protocol.ByteCount(0)).AnyTimes() }) It("returns nil when popping an empty framer", func() { @@ -272,13 +272,13 @@ var _ = Describe("Stream Framer", func() { StreamID: id1, Data: []byte("foobar"), }) - stream1.EXPECT().IsFlowControlBlocked().Return(true) + stream1.EXPECT().IsFlowControlBlocked().Return(true, protocol.ByteCount(0x1234)) frames := framer.PopStreamFrames(1000) Expect(frames).To(HaveLen(1)) - f := framer.PopBlockedFrame() - Expect(f).To(BeAssignableToTypeOf(&wire.StreamBlockedFrame{})) - bf := f.(*wire.StreamBlockedFrame) - Expect(bf.StreamID).To(Equal(stream1.StreamID())) + Expect(framer.PopBlockedFrame()).To(Equal(&wire.StreamBlockedFrame{ + StreamID: stream1.StreamID(), + Offset: 0x1234, + })) Expect(framer.PopBlockedFrame()).To(BeNil()) }) @@ -300,15 +300,14 @@ var _ = Describe("Stream Framer", func() { It("queues and pops BLOCKED frames for connection blocked streams", func() { setNoData(stream2) - connFC.EXPECT().IsBlocked().Return(true) + connFC.EXPECT().IsBlocked().Return(true, protocol.ByteCount(0x4321)) stream1.EXPECT().PopStreamFrame(gomock.Any()).Return(&wire.StreamFrame{ StreamID: id1, Data: []byte("foo"), }) - stream1.EXPECT().IsFlowControlBlocked().Return(false) + stream1.EXPECT().IsFlowControlBlocked().Return(false, protocol.ByteCount(0)) framer.PopStreamFrames(1000) - f := framer.PopBlockedFrame() - Expect(f).To(BeAssignableToTypeOf(&wire.BlockedFrame{})) + Expect(framer.PopBlockedFrame()).To(Equal(&wire.BlockedFrame{Offset: 0x4321})) Expect(framer.PopBlockedFrame()).To(BeNil()) }) }) diff --git a/stream_test.go b/stream_test.go index 5bc197cb..b9ce75e0 100644 --- a/stream_test.go +++ b/stream_test.go @@ -1146,10 +1146,13 @@ var _ = Describe("Stream", func() { Context("flow control", func() { It("says when it's flow control blocked", func() { - mockFC.EXPECT().IsBlocked().Return(false) - Expect(str.IsFlowControlBlocked()).To(BeFalse()) - mockFC.EXPECT().IsBlocked().Return(true) - Expect(str.IsFlowControlBlocked()).To(BeTrue()) + mockFC.EXPECT().IsBlocked().Return(false, protocol.ByteCount(0)) + blocked, _ := str.IsFlowControlBlocked() + Expect(blocked).To(BeFalse()) + mockFC.EXPECT().IsBlocked().Return(true, protocol.ByteCount(0x1337)) + blocked, offset := str.IsFlowControlBlocked() + Expect(blocked).To(BeTrue()) + Expect(offset).To(Equal(protocol.ByteCount(0x1337))) }) It("updates the flow control window", func() {