queue STREAM_BLOCKED frames from the stream, when popping a STREAM frame

This commit is contained in:
Marten Seemann 2017-12-18 13:45:05 +07:00
parent 1514e42e11
commit d49ad2d0cc
9 changed files with 86 additions and 92 deletions

View file

@ -19,7 +19,6 @@ type cryptoStreamI interface {
// methods needed for flow control
GetWindowUpdate() protocol.ByteCount
HandleMaxStreamDataFrame(*wire.MaxStreamDataFrame)
IsFlowControlBlocked() (bool, protocol.ByteCount)
}
type cryptoStream struct {

View file

@ -162,19 +162,6 @@ func (_mr *MockStreamIMockRecorder) HandleStreamFrame(arg0 interface{}) *gomock.
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "HandleStreamFrame", reflect.TypeOf((*MockStreamI)(nil).HandleStreamFrame), arg0)
}
// IsFlowControlBlocked mocks base method
func (_m *MockStreamI) IsFlowControlBlocked() (bool, protocol.ByteCount) {
ret := _m.ctrl.Call(_m, "IsFlowControlBlocked")
ret0, _ := ret[0].(bool)
ret1, _ := ret[1].(protocol.ByteCount)
return ret0, ret1
}
// IsFlowControlBlocked indicates an expected call of IsFlowControlBlocked
func (_mr *MockStreamIMockRecorder) IsFlowControlBlocked() *gomock.Call {
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "IsFlowControlBlocked", reflect.TypeOf((*MockStreamI)(nil).IsFlowControlBlocked))
}
// PopStreamFrame mocks base method
func (_m *MockStreamI) PopStreamFrame(_param0 protocol.ByteCount) *wire.StreamFrame {
ret := _m.ctrl.Call(_m, "PopStreamFrame", _param0)

View file

@ -693,7 +693,7 @@ var _ = Describe("Packet packer", func() {
Context("BLOCKED frames", func() {
It("queues a BLOCKED frame", func() {
length := 100
streamFramer.blockedFrameQueue = []wire.Frame{&wire.StreamBlockedFrame{StreamID: 5}}
streamFramer.blockedFrameQueue = []*wire.BlockedFrame{&wire.BlockedFrame{Offset: 555}}
f := &wire.StreamFrame{
StreamID: 5,
Data: bytes.Repeat([]byte{'f'}, length),
@ -701,12 +701,12 @@ var _ = Describe("Packet packer", func() {
streamFramer.AddFrameForRetransmission(f)
_, err := packer.composeNextPacket(maxFrameSize, true)
Expect(err).ToNot(HaveOccurred())
Expect(packer.controlFrames[0]).To(Equal(&wire.StreamBlockedFrame{StreamID: 5}))
Expect(packer.controlFrames[0]).To(Equal(&wire.BlockedFrame{Offset: 555}))
})
It("removes the dataLen attribute from the last StreamFrame, even if it queued a BLOCKED frame", func() {
It("removes the dataLen attribute from the last STREAM frame, even if it queued a BLOCKED frame", func() {
length := 100
streamFramer.blockedFrameQueue = []wire.Frame{&wire.StreamBlockedFrame{StreamID: 5}}
streamFramer.blockedFrameQueue = []*wire.BlockedFrame{&wire.BlockedFrame{Offset: 50}}
f := &wire.StreamFrame{
StreamID: 5,
Data: bytes.Repeat([]byte{'f'}, length),
@ -719,7 +719,7 @@ var _ = Describe("Packet packer", func() {
})
It("packs a connection-level BlockedFrame", func() {
streamFramer.blockedFrameQueue = []wire.Frame{&wire.BlockedFrame{}}
streamFramer.blockedFrameQueue = []*wire.BlockedFrame{&wire.BlockedFrame{}}
f := &wire.StreamFrame{
StreamID: 5,
Data: []byte("foobar"),

View file

@ -145,6 +145,14 @@ func (s *sendStream) PopStreamFrame(maxBytes protocol.ByteCount) *wire.StreamFra
}
if frame.FinBit {
s.finSent = true
} else if s.streamID != s.version.CryptoStreamID() { // TODO(#657): Flow control for the crypto stream
if isBlocked, offset := s.flowController.IsBlocked(); isBlocked {
s.queueControlFrame(&wire.StreamBlockedFrame{
StreamID: s.streamID,
Offset: offset,
})
s.onData()
}
}
return frame
}
@ -228,10 +236,6 @@ func (s *sendStream) HandleMaxStreamDataFrame(frame *wire.MaxStreamDataFrame) {
s.flowController.UpdateSendWindow(frame.ByteOffset)
}
func (s *sendStream) IsFlowControlBlocked() (bool, protocol.ByteCount) {
return s.flowController.IsBlocked()
}
// must be called after locking the mutex
func (s *sendStream) handleStopSendingFrameImpl(frame *wire.StopSendingFrame) {
writeErr := streamCanceledError{

View file

@ -49,6 +49,7 @@ var _ = Describe("Send Stream", func() {
It("writes and gets all data at once", func() {
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999))
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6))
mockFC.EXPECT().IsBlocked()
done := make(chan struct{})
go func() {
defer GinkgoRecover()
@ -57,14 +58,13 @@ var _ = Describe("Send Stream", func() {
Expect(n).To(Equal(6))
close(done)
}()
Eventually(func() []byte {
str.mutex.Lock()
defer str.mutex.Unlock()
return str.dataForWriting
}).Should(Equal([]byte("foobar")))
Consistently(done).ShouldNot(BeClosed())
var f *wire.StreamFrame
Eventually(func() *wire.StreamFrame {
f = str.PopStreamFrame(1000)
return f
}).ShouldNot(BeNil())
Expect(onDataCalled).To(BeTrue())
f := str.PopStreamFrame(1000)
Expect(f.Data).To(Equal([]byte("foobar")))
Expect(f.FinBit).To(BeFalse())
Expect(f.Offset).To(BeZero())
@ -78,6 +78,7 @@ var _ = Describe("Send Stream", func() {
frameHeaderLen := protocol.ByteCount(4)
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2)
mockFC.EXPECT().AddBytesSent(gomock.Any() /* protocol.ByteCount(3)*/).Times(2)
mockFC.EXPECT().IsBlocked().Times(2)
done := make(chan struct{})
go func() {
defer GinkgoRecover()
@ -86,13 +87,12 @@ var _ = Describe("Send Stream", func() {
Expect(n).To(Equal(6))
close(done)
}()
Eventually(func() []byte {
str.mutex.Lock()
defer str.mutex.Unlock()
return str.dataForWriting
}).Should(Equal([]byte("foobar")))
Consistently(done).ShouldNot(BeClosed())
f := str.PopStreamFrame(3 + frameHeaderLen)
var f *wire.StreamFrame
Eventually(func() *wire.StreamFrame {
f = str.PopStreamFrame(3 + frameHeaderLen)
return f
}).ShouldNot(BeNil())
Expect(f.Data).To(Equal([]byte("foo")))
Expect(f.FinBit).To(BeFalse())
Expect(f.Offset).To(BeZero())
@ -115,6 +115,7 @@ var _ = Describe("Send Stream", func() {
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2)
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(1))
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(2))
mockFC.EXPECT().IsBlocked().Times(2)
s := []byte("foo")
go func() {
defer GinkgoRecover()
@ -149,6 +150,58 @@ var _ = Describe("Send Stream", func() {
Expect(str.Context().Done()).To(BeClosed())
})
Context("adding BLOCKED", func() {
It("queues a BLOCKED frame if the stream is flow control blocked", func() {
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999))
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6))
// don't use offset 6 here, to make sure the BLOCKED frame contains the number returned by the flow controller
mockFC.EXPECT().IsBlocked().Return(true, protocol.ByteCount(10))
done := make(chan struct{})
go func() {
defer GinkgoRecover()
_, err := str.Write([]byte("foobar"))
Expect(err).ToNot(HaveOccurred())
close(done)
}()
var f *wire.StreamFrame
Eventually(func() *wire.StreamFrame {
f = str.PopStreamFrame(1000)
return f
}).ShouldNot(BeNil())
Expect(queuedControlFrames).To(Equal([]wire.Frame{
&wire.StreamBlockedFrame{
StreamID: streamID,
Offset: 10,
},
}))
Expect(onDataCalled).To(BeTrue())
Eventually(done).Should(BeClosed())
})
It("doesn't queue a BLOCKED frame if the stream is flow control blocked, but the frame popped has the FIN bit set", func() {
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999))
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6))
// don't EXPECT a call to IsBlocked
done := make(chan struct{})
go func() {
defer GinkgoRecover()
_, err := str.Write([]byte("foobar"))
Expect(err).ToNot(HaveOccurred())
close(done)
}()
Consistently(done).ShouldNot(BeClosed())
Expect(str.Close()).To(Succeed())
var f *wire.StreamFrame
Eventually(func() *wire.StreamFrame {
f = str.PopStreamFrame(1000)
return f
}).ShouldNot(BeNil())
Expect(f.FinBit).To(BeTrue())
Expect(queuedControlFrames).To(BeEmpty())
Eventually(done).Should(BeClosed())
})
})
Context("deadlines", func() {
It("returns an error when Write is called after the deadline", func() {
str.SetWriteDeadline(time.Now().Add(-time.Second))
@ -169,6 +222,7 @@ var _ = Describe("Send Stream", func() {
It("returns the number of bytes written, when the deadline expires", func() {
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(10000)).AnyTimes()
mockFC.EXPECT().AddBytesSent(gomock.Any())
mockFC.EXPECT().IsBlocked()
deadline := time.Now().Add(scaleDuration(50 * time.Millisecond))
str.SetWriteDeadline(deadline)
var n int
@ -241,10 +295,11 @@ var _ = Describe("Send Stream", func() {
Expect(f.FinBit).To(BeTrue())
})
It("doesn't allow FIN when there's still data", func() {
It("doesn't send a FIN when there's still data", func() {
frameHeaderLen := protocol.ByteCount(4)
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2)
mockFC.EXPECT().AddBytesSent(gomock.Any()).Times(2)
mockFC.EXPECT().IsBlocked()
str.dataForWriting = []byte("foobar")
str.Close()
f := str.PopStreamFrame(3 + frameHeaderLen)
@ -285,6 +340,7 @@ var _ = Describe("Send Stream", func() {
It("doesn't get data for writing if an error occurred", func() {
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999))
mockFC.EXPECT().AddBytesSent(gomock.Any())
mockFC.EXPECT().IsBlocked()
done := make(chan struct{})
go func() {
defer GinkgoRecover()
@ -324,6 +380,7 @@ var _ = Describe("Send Stream", func() {
It("unblocks Write", func() {
mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount)
mockFC.EXPECT().AddBytesSent(gomock.Any())
mockFC.EXPECT().IsBlocked()
writeReturned := make(chan struct{})
var n int
go func() {
@ -421,26 +478,6 @@ var _ = Describe("Send Stream", func() {
})
})
Context("flow control", func() {
It("says when it's flow control blocked", func() {
mockFC.EXPECT().IsBlocked().Return(false, protocol.ByteCount(0))
blocked, _ := str.IsFlowControlBlocked()
Expect(blocked).To(BeFalse())
mockFC.EXPECT().IsBlocked().Return(true, protocol.ByteCount(0x1337))
blocked, offset := str.IsFlowControlBlocked()
Expect(blocked).To(BeTrue())
Expect(offset).To(Equal(protocol.ByteCount(0x1337)))
})
It("updates the flow control window", func() {
mockFC.EXPECT().UpdateSendWindow(protocol.ByteCount(0x42))
str.HandleMaxStreamDataFrame(&wire.MaxStreamDataFrame{
StreamID: streamID,
ByteOffset: 0x42,
})
})
})
Context("saying if it is finished", func() {
It("is finished after it is closed for shutdown", func() {
str.CloseForShutdown(errors.New("testErr"))

View file

@ -26,7 +26,6 @@ type streamI interface {
// methods needed for flow control
GetWindowUpdate() protocol.ByteCount
HandleMaxStreamDataFrame(*wire.MaxStreamDataFrame)
IsFlowControlBlocked() (bool, protocol.ByteCount)
}
// A Stream assembles the data from StreamFrames and provides a super-convenient Read-Interface

View file

@ -14,7 +14,7 @@ type streamFramer struct {
connFlowController flowcontrol.ConnectionFlowController
retransmissionQueue []*wire.StreamFrame
blockedFrameQueue []wire.Frame
blockedFrameQueue []*wire.BlockedFrame
}
func newStreamFramer(
@ -104,15 +104,6 @@ func (f *streamFramer) maybePopNormalFrames(maxTotalLen protocol.ByteCount) (res
return true, nil
}
// Finally, check if we are now FC blocked and should queue a BLOCKED frame
if !frame.FinBit {
if blocked, offset := s.IsFlowControlBlocked(); blocked {
f.blockedFrameQueue = append(f.blockedFrameQueue, &wire.StreamBlockedFrame{
StreamID: s.StreamID(),
Offset: offset,
})
}
}
if blocked, offset := f.connFlowController.IsBlocked(); blocked {
f.blockedFrameQueue = append(f.blockedFrameQueue, &wire.BlockedFrame{Offset: offset})
}

View file

@ -70,10 +70,8 @@ var _ = Describe("Stream Framer", func() {
Context("Popping", func() {
BeforeEach(func() {
// nothing is blocked here
// we're not connection-level flow control blocked
connFC.EXPECT().IsBlocked().AnyTimes()
stream1.EXPECT().IsFlowControlBlocked().Return(false, protocol.ByteCount(0)).AnyTimes()
stream2.EXPECT().IsFlowControlBlocked().Return(false, protocol.ByteCount(0)).AnyTimes()
})
It("returns nil when popping an empty framer", func() {
@ -261,27 +259,6 @@ var _ = Describe("Stream Framer", func() {
})
Context("BLOCKED frames", func() {
It("Pop returns nil if no frame is queued", func() {
Expect(framer.PopBlockedFrame()).To(BeNil())
})
It("queues and pops BLOCKED frames for individually blocked streams", func() {
setNoData(stream2)
connFC.EXPECT().IsBlocked()
stream1.EXPECT().PopStreamFrame(gomock.Any()).Return(&wire.StreamFrame{
StreamID: id1,
Data: []byte("foobar"),
})
stream1.EXPECT().IsFlowControlBlocked().Return(true, protocol.ByteCount(0x1234))
frames := framer.PopStreamFrames(1000)
Expect(frames).To(HaveLen(1))
Expect(framer.PopBlockedFrame()).To(Equal(&wire.StreamBlockedFrame{
StreamID: stream1.StreamID(),
Offset: 0x1234,
}))
Expect(framer.PopBlockedFrame()).To(BeNil())
})
It("doesn't queue a stream-level BLOCKED frame after sending the FIN bit frame", func() {
setNoData(stream2)
f := &wire.StreamFrame{
@ -305,7 +282,6 @@ var _ = Describe("Stream Framer", func() {
StreamID: id1,
Data: []byte("foo"),
})
stream1.EXPECT().IsFlowControlBlocked().Return(false, protocol.ByteCount(0))
framer.PopStreamFrames(1000)
Expect(framer.PopBlockedFrame()).To(Equal(&wire.BlockedFrame{Offset: 0x4321}))
Expect(framer.PopBlockedFrame()).To(BeNil())

View file

@ -138,6 +138,7 @@ var _ = Describe("Stream", func() {
str.version = versionGQUICFrames
mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount).AnyTimes()
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6))
mockFC.EXPECT().IsBlocked()
err := str.CancelRead(1234)
Expect(err).ToNot(HaveOccurred())
Expect(queuedControlFrames).To(BeEmpty()) // no RST_STREAM frame queued yet