From d49ad2d0cc50e9669e3ab7f070b29d11f075701c Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Mon, 18 Dec 2017 13:45:05 +0700 Subject: [PATCH] queue STREAM_BLOCKED frames from the stream, when popping a STREAM frame --- crypto_stream.go | 1 - internal/mocks/stream.go | 13 ----- packet_packer_test.go | 10 ++-- send_stream.go | 12 +++-- send_stream_test.go | 103 ++++++++++++++++++++++++++------------- stream.go | 1 - stream_framer.go | 11 +---- stream_framer_test.go | 26 +--------- stream_test.go | 1 + 9 files changed, 86 insertions(+), 92 deletions(-) diff --git a/crypto_stream.go b/crypto_stream.go index 395794e2..4d967000 100644 --- a/crypto_stream.go +++ b/crypto_stream.go @@ -19,7 +19,6 @@ type cryptoStreamI interface { // methods needed for flow control GetWindowUpdate() protocol.ByteCount HandleMaxStreamDataFrame(*wire.MaxStreamDataFrame) - IsFlowControlBlocked() (bool, protocol.ByteCount) } type cryptoStream struct { diff --git a/internal/mocks/stream.go b/internal/mocks/stream.go index 2c36be0b..65227b68 100644 --- a/internal/mocks/stream.go +++ b/internal/mocks/stream.go @@ -162,19 +162,6 @@ func (_mr *MockStreamIMockRecorder) HandleStreamFrame(arg0 interface{}) *gomock. return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "HandleStreamFrame", reflect.TypeOf((*MockStreamI)(nil).HandleStreamFrame), arg0) } -// IsFlowControlBlocked mocks base method -func (_m *MockStreamI) IsFlowControlBlocked() (bool, protocol.ByteCount) { - ret := _m.ctrl.Call(_m, "IsFlowControlBlocked") - ret0, _ := ret[0].(bool) - ret1, _ := ret[1].(protocol.ByteCount) - return ret0, ret1 -} - -// IsFlowControlBlocked indicates an expected call of IsFlowControlBlocked -func (_mr *MockStreamIMockRecorder) IsFlowControlBlocked() *gomock.Call { - return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "IsFlowControlBlocked", reflect.TypeOf((*MockStreamI)(nil).IsFlowControlBlocked)) -} - // PopStreamFrame mocks base method func (_m *MockStreamI) PopStreamFrame(_param0 protocol.ByteCount) *wire.StreamFrame { ret := _m.ctrl.Call(_m, "PopStreamFrame", _param0) diff --git a/packet_packer_test.go b/packet_packer_test.go index 454fe37b..e4c80fcb 100644 --- a/packet_packer_test.go +++ b/packet_packer_test.go @@ -693,7 +693,7 @@ var _ = Describe("Packet packer", func() { Context("BLOCKED frames", func() { It("queues a BLOCKED frame", func() { length := 100 - streamFramer.blockedFrameQueue = []wire.Frame{&wire.StreamBlockedFrame{StreamID: 5}} + streamFramer.blockedFrameQueue = []*wire.BlockedFrame{&wire.BlockedFrame{Offset: 555}} f := &wire.StreamFrame{ StreamID: 5, Data: bytes.Repeat([]byte{'f'}, length), @@ -701,12 +701,12 @@ var _ = Describe("Packet packer", func() { streamFramer.AddFrameForRetransmission(f) _, err := packer.composeNextPacket(maxFrameSize, true) Expect(err).ToNot(HaveOccurred()) - Expect(packer.controlFrames[0]).To(Equal(&wire.StreamBlockedFrame{StreamID: 5})) + Expect(packer.controlFrames[0]).To(Equal(&wire.BlockedFrame{Offset: 555})) }) - It("removes the dataLen attribute from the last StreamFrame, even if it queued a BLOCKED frame", func() { + It("removes the dataLen attribute from the last STREAM frame, even if it queued a BLOCKED frame", func() { length := 100 - streamFramer.blockedFrameQueue = []wire.Frame{&wire.StreamBlockedFrame{StreamID: 5}} + streamFramer.blockedFrameQueue = []*wire.BlockedFrame{&wire.BlockedFrame{Offset: 50}} f := &wire.StreamFrame{ StreamID: 5, Data: bytes.Repeat([]byte{'f'}, length), @@ -719,7 +719,7 @@ var _ = Describe("Packet packer", func() { }) It("packs a connection-level BlockedFrame", func() { - streamFramer.blockedFrameQueue = []wire.Frame{&wire.BlockedFrame{}} + streamFramer.blockedFrameQueue = []*wire.BlockedFrame{&wire.BlockedFrame{}} f := &wire.StreamFrame{ StreamID: 5, Data: []byte("foobar"), diff --git a/send_stream.go b/send_stream.go index 78f75b44..f09e975f 100644 --- a/send_stream.go +++ b/send_stream.go @@ -145,6 +145,14 @@ func (s *sendStream) PopStreamFrame(maxBytes protocol.ByteCount) *wire.StreamFra } if frame.FinBit { s.finSent = true + } else if s.streamID != s.version.CryptoStreamID() { // TODO(#657): Flow control for the crypto stream + if isBlocked, offset := s.flowController.IsBlocked(); isBlocked { + s.queueControlFrame(&wire.StreamBlockedFrame{ + StreamID: s.streamID, + Offset: offset, + }) + s.onData() + } } return frame } @@ -228,10 +236,6 @@ func (s *sendStream) HandleMaxStreamDataFrame(frame *wire.MaxStreamDataFrame) { s.flowController.UpdateSendWindow(frame.ByteOffset) } -func (s *sendStream) IsFlowControlBlocked() (bool, protocol.ByteCount) { - return s.flowController.IsBlocked() -} - // must be called after locking the mutex func (s *sendStream) handleStopSendingFrameImpl(frame *wire.StopSendingFrame) { writeErr := streamCanceledError{ diff --git a/send_stream_test.go b/send_stream_test.go index e594a88c..c80d91c9 100644 --- a/send_stream_test.go +++ b/send_stream_test.go @@ -49,6 +49,7 @@ var _ = Describe("Send Stream", func() { It("writes and gets all data at once", func() { mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)) mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6)) + mockFC.EXPECT().IsBlocked() done := make(chan struct{}) go func() { defer GinkgoRecover() @@ -57,14 +58,13 @@ var _ = Describe("Send Stream", func() { Expect(n).To(Equal(6)) close(done) }() - Eventually(func() []byte { - str.mutex.Lock() - defer str.mutex.Unlock() - return str.dataForWriting - }).Should(Equal([]byte("foobar"))) Consistently(done).ShouldNot(BeClosed()) + var f *wire.StreamFrame + Eventually(func() *wire.StreamFrame { + f = str.PopStreamFrame(1000) + return f + }).ShouldNot(BeNil()) Expect(onDataCalled).To(BeTrue()) - f := str.PopStreamFrame(1000) Expect(f.Data).To(Equal([]byte("foobar"))) Expect(f.FinBit).To(BeFalse()) Expect(f.Offset).To(BeZero()) @@ -78,6 +78,7 @@ var _ = Describe("Send Stream", func() { frameHeaderLen := protocol.ByteCount(4) mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2) mockFC.EXPECT().AddBytesSent(gomock.Any() /* protocol.ByteCount(3)*/).Times(2) + mockFC.EXPECT().IsBlocked().Times(2) done := make(chan struct{}) go func() { defer GinkgoRecover() @@ -86,13 +87,12 @@ var _ = Describe("Send Stream", func() { Expect(n).To(Equal(6)) close(done) }() - Eventually(func() []byte { - str.mutex.Lock() - defer str.mutex.Unlock() - return str.dataForWriting - }).Should(Equal([]byte("foobar"))) Consistently(done).ShouldNot(BeClosed()) - f := str.PopStreamFrame(3 + frameHeaderLen) + var f *wire.StreamFrame + Eventually(func() *wire.StreamFrame { + f = str.PopStreamFrame(3 + frameHeaderLen) + return f + }).ShouldNot(BeNil()) Expect(f.Data).To(Equal([]byte("foo"))) Expect(f.FinBit).To(BeFalse()) Expect(f.Offset).To(BeZero()) @@ -115,6 +115,7 @@ var _ = Describe("Send Stream", func() { mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2) mockFC.EXPECT().AddBytesSent(protocol.ByteCount(1)) mockFC.EXPECT().AddBytesSent(protocol.ByteCount(2)) + mockFC.EXPECT().IsBlocked().Times(2) s := []byte("foo") go func() { defer GinkgoRecover() @@ -149,6 +150,58 @@ var _ = Describe("Send Stream", func() { Expect(str.Context().Done()).To(BeClosed()) }) + Context("adding BLOCKED", func() { + It("queues a BLOCKED frame if the stream is flow control blocked", func() { + mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)) + mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6)) + // don't use offset 6 here, to make sure the BLOCKED frame contains the number returned by the flow controller + mockFC.EXPECT().IsBlocked().Return(true, protocol.ByteCount(10)) + done := make(chan struct{}) + go func() { + defer GinkgoRecover() + _, err := str.Write([]byte("foobar")) + Expect(err).ToNot(HaveOccurred()) + close(done) + }() + var f *wire.StreamFrame + Eventually(func() *wire.StreamFrame { + f = str.PopStreamFrame(1000) + return f + }).ShouldNot(BeNil()) + Expect(queuedControlFrames).To(Equal([]wire.Frame{ + &wire.StreamBlockedFrame{ + StreamID: streamID, + Offset: 10, + }, + })) + Expect(onDataCalled).To(BeTrue()) + Eventually(done).Should(BeClosed()) + }) + + It("doesn't queue a BLOCKED frame if the stream is flow control blocked, but the frame popped has the FIN bit set", func() { + mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)) + mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6)) + // don't EXPECT a call to IsBlocked + done := make(chan struct{}) + go func() { + defer GinkgoRecover() + _, err := str.Write([]byte("foobar")) + Expect(err).ToNot(HaveOccurred()) + close(done) + }() + Consistently(done).ShouldNot(BeClosed()) + Expect(str.Close()).To(Succeed()) + var f *wire.StreamFrame + Eventually(func() *wire.StreamFrame { + f = str.PopStreamFrame(1000) + return f + }).ShouldNot(BeNil()) + Expect(f.FinBit).To(BeTrue()) + Expect(queuedControlFrames).To(BeEmpty()) + Eventually(done).Should(BeClosed()) + }) + }) + Context("deadlines", func() { It("returns an error when Write is called after the deadline", func() { str.SetWriteDeadline(time.Now().Add(-time.Second)) @@ -169,6 +222,7 @@ var _ = Describe("Send Stream", func() { It("returns the number of bytes written, when the deadline expires", func() { mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(10000)).AnyTimes() mockFC.EXPECT().AddBytesSent(gomock.Any()) + mockFC.EXPECT().IsBlocked() deadline := time.Now().Add(scaleDuration(50 * time.Millisecond)) str.SetWriteDeadline(deadline) var n int @@ -241,10 +295,11 @@ var _ = Describe("Send Stream", func() { Expect(f.FinBit).To(BeTrue()) }) - It("doesn't allow FIN when there's still data", func() { + It("doesn't send a FIN when there's still data", func() { frameHeaderLen := protocol.ByteCount(4) mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2) mockFC.EXPECT().AddBytesSent(gomock.Any()).Times(2) + mockFC.EXPECT().IsBlocked() str.dataForWriting = []byte("foobar") str.Close() f := str.PopStreamFrame(3 + frameHeaderLen) @@ -285,6 +340,7 @@ var _ = Describe("Send Stream", func() { It("doesn't get data for writing if an error occurred", func() { mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)) mockFC.EXPECT().AddBytesSent(gomock.Any()) + mockFC.EXPECT().IsBlocked() done := make(chan struct{}) go func() { defer GinkgoRecover() @@ -324,6 +380,7 @@ var _ = Describe("Send Stream", func() { It("unblocks Write", func() { mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount) mockFC.EXPECT().AddBytesSent(gomock.Any()) + mockFC.EXPECT().IsBlocked() writeReturned := make(chan struct{}) var n int go func() { @@ -421,26 +478,6 @@ var _ = Describe("Send Stream", func() { }) }) - Context("flow control", func() { - It("says when it's flow control blocked", func() { - mockFC.EXPECT().IsBlocked().Return(false, protocol.ByteCount(0)) - blocked, _ := str.IsFlowControlBlocked() - Expect(blocked).To(BeFalse()) - mockFC.EXPECT().IsBlocked().Return(true, protocol.ByteCount(0x1337)) - blocked, offset := str.IsFlowControlBlocked() - Expect(blocked).To(BeTrue()) - Expect(offset).To(Equal(protocol.ByteCount(0x1337))) - }) - - It("updates the flow control window", func() { - mockFC.EXPECT().UpdateSendWindow(protocol.ByteCount(0x42)) - str.HandleMaxStreamDataFrame(&wire.MaxStreamDataFrame{ - StreamID: streamID, - ByteOffset: 0x42, - }) - }) - }) - Context("saying if it is finished", func() { It("is finished after it is closed for shutdown", func() { str.CloseForShutdown(errors.New("testErr")) diff --git a/stream.go b/stream.go index 0d9915c0..d2936e02 100644 --- a/stream.go +++ b/stream.go @@ -26,7 +26,6 @@ type streamI interface { // methods needed for flow control GetWindowUpdate() protocol.ByteCount HandleMaxStreamDataFrame(*wire.MaxStreamDataFrame) - IsFlowControlBlocked() (bool, protocol.ByteCount) } // A Stream assembles the data from StreamFrames and provides a super-convenient Read-Interface diff --git a/stream_framer.go b/stream_framer.go index d5fa09a7..8e032f45 100644 --- a/stream_framer.go +++ b/stream_framer.go @@ -14,7 +14,7 @@ type streamFramer struct { connFlowController flowcontrol.ConnectionFlowController retransmissionQueue []*wire.StreamFrame - blockedFrameQueue []wire.Frame + blockedFrameQueue []*wire.BlockedFrame } func newStreamFramer( @@ -104,15 +104,6 @@ func (f *streamFramer) maybePopNormalFrames(maxTotalLen protocol.ByteCount) (res return true, nil } - // Finally, check if we are now FC blocked and should queue a BLOCKED frame - if !frame.FinBit { - if blocked, offset := s.IsFlowControlBlocked(); blocked { - f.blockedFrameQueue = append(f.blockedFrameQueue, &wire.StreamBlockedFrame{ - StreamID: s.StreamID(), - Offset: offset, - }) - } - } if blocked, offset := f.connFlowController.IsBlocked(); blocked { f.blockedFrameQueue = append(f.blockedFrameQueue, &wire.BlockedFrame{Offset: offset}) } diff --git a/stream_framer_test.go b/stream_framer_test.go index d3307cdd..a2d33f78 100644 --- a/stream_framer_test.go +++ b/stream_framer_test.go @@ -70,10 +70,8 @@ var _ = Describe("Stream Framer", func() { Context("Popping", func() { BeforeEach(func() { - // nothing is blocked here + // we're not connection-level flow control blocked connFC.EXPECT().IsBlocked().AnyTimes() - stream1.EXPECT().IsFlowControlBlocked().Return(false, protocol.ByteCount(0)).AnyTimes() - stream2.EXPECT().IsFlowControlBlocked().Return(false, protocol.ByteCount(0)).AnyTimes() }) It("returns nil when popping an empty framer", func() { @@ -261,27 +259,6 @@ var _ = Describe("Stream Framer", func() { }) Context("BLOCKED frames", func() { - It("Pop returns nil if no frame is queued", func() { - Expect(framer.PopBlockedFrame()).To(BeNil()) - }) - - It("queues and pops BLOCKED frames for individually blocked streams", func() { - setNoData(stream2) - connFC.EXPECT().IsBlocked() - stream1.EXPECT().PopStreamFrame(gomock.Any()).Return(&wire.StreamFrame{ - StreamID: id1, - Data: []byte("foobar"), - }) - stream1.EXPECT().IsFlowControlBlocked().Return(true, protocol.ByteCount(0x1234)) - frames := framer.PopStreamFrames(1000) - Expect(frames).To(HaveLen(1)) - Expect(framer.PopBlockedFrame()).To(Equal(&wire.StreamBlockedFrame{ - StreamID: stream1.StreamID(), - Offset: 0x1234, - })) - Expect(framer.PopBlockedFrame()).To(BeNil()) - }) - It("doesn't queue a stream-level BLOCKED frame after sending the FIN bit frame", func() { setNoData(stream2) f := &wire.StreamFrame{ @@ -305,7 +282,6 @@ var _ = Describe("Stream Framer", func() { StreamID: id1, Data: []byte("foo"), }) - stream1.EXPECT().IsFlowControlBlocked().Return(false, protocol.ByteCount(0)) framer.PopStreamFrames(1000) Expect(framer.PopBlockedFrame()).To(Equal(&wire.BlockedFrame{Offset: 0x4321})) Expect(framer.PopBlockedFrame()).To(BeNil()) diff --git a/stream_test.go b/stream_test.go index a0c94dec..2e4a9204 100644 --- a/stream_test.go +++ b/stream_test.go @@ -138,6 +138,7 @@ var _ = Describe("Stream", func() { str.version = versionGQUICFrames mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount).AnyTimes() mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6)) + mockFC.EXPECT().IsBlocked() err := str.CancelRead(1234) Expect(err).ToNot(HaveOccurred()) Expect(queuedControlFrames).To(BeEmpty()) // no RST_STREAM frame queued yet