implement a STREAM frame retransmission queue in the sendStream

This commit is contained in:
Marten Seemann 2019-08-29 12:04:31 +07:00
parent dd6e8a7424
commit 93d5d15e3b
3 changed files with 86 additions and 2 deletions

View file

@ -105,7 +105,10 @@ func (f *framerI) AppendStreamFrames(frames []wire.Frame, maxLen protocol.ByteCo
} else { // no more data to send. Stream is not active any more
delete(f.activeStreams, id)
}
if frame == nil { // can happen if the receiveStream was canceled after it said it had data
// The frame can be nil
// * if the receiveStream was canceled after it said it had data
// * the remaining size doesn't allow us to add another STREAM frame
if frame == nil {
continue
}
frames = append(frames, frame)

View file

@ -24,6 +24,8 @@ type sendStreamI interface {
type sendStream struct {
mutex sync.Mutex
retransmissionQueue []*wire.StreamFrame
ctx context.Context
ctxCancel context.CancelFunc
@ -147,6 +149,15 @@ func (s *sendStream) Write(p []byte) (int, error) {
// maxBytes is the maximum length this frame (including frame header) will have.
func (s *sendStream) popStreamFrame(maxBytes protocol.ByteCount) (*wire.StreamFrame, bool /* has more data to send */) {
s.mutex.Lock()
if len(s.retransmissionQueue) > 0 {
frame, hasMoreRetransmissions := s.maybeGetRetransmission(maxBytes)
if frame != nil || hasMoreRetransmissions {
s.mutex.Unlock()
// We always claim that we have more data to send.
// This might be incorrect, in which case there'll be a spurious call to popStreamFrame in the future.
return frame, true
}
}
completed, frame, hasMoreData := s.popStreamFrameImpl(maxBytes)
s.mutex.Unlock()
@ -194,6 +205,16 @@ func (s *sendStream) popStreamFrameImpl(maxBytes protocol.ByteCount) (bool /* co
return frame.FinBit, frame, s.dataForWriting != nil
}
func (s *sendStream) maybeGetRetransmission(maxBytes protocol.ByteCount) (*wire.StreamFrame, bool /* has more retransmissions */) {
f := s.retransmissionQueue[0]
newFrame, needsSplit := f.MaybeSplitOffFrame(maxBytes, s.version)
if needsSplit {
return newFrame, true
}
s.retransmissionQueue = s.retransmissionQueue[1:]
return f, len(s.retransmissionQueue) > 0
}
func (s *sendStream) hasData() bool {
s.mutex.Lock()
hasData := len(s.dataForWriting) > 0
@ -227,6 +248,15 @@ func (s *sendStream) getDataForWriting(maxBytes protocol.ByteCount) ([]byte, boo
return ret, s.finishedWriting && s.dataForWriting == nil && !s.finSent
}
func (s *sendStream) queueRetransmission(f *wire.StreamFrame) {
f.DataLenPresent = true
s.mutex.Lock()
s.retransmissionQueue = append(s.retransmissionQueue, f)
s.mutex.Unlock()
s.sender.onHasStreamData(s.streamID)
}
func (s *sendStream) Close() error {
s.mutex.Lock()
if s.canceledWrite {

View file

@ -517,7 +517,7 @@ var _ = Describe("Send Stream", func() {
})
})
Context("stream cancelations", func() {
Context("stream cancellations", func() {
Context("canceling writing", func() {
It("queues a RESET_STREAM frame", func() {
mockSender.EXPECT().queueControlFrame(&wire.ResetStreamFrame{
@ -658,4 +658,55 @@ var _ = Describe("Send Stream", func() {
})
})
})
Context("retransmissions", func() {
It("queues and retrieves frames", func() {
f := &wire.StreamFrame{
Data: []byte("foobar"),
Offset: 0x42,
DataLenPresent: false,
}
mockSender.EXPECT().onHasStreamData(streamID)
str.queueRetransmission(f)
frame, _ := str.popStreamFrame(protocol.MaxByteCount)
Expect(frame).ToNot(BeNil())
Expect(frame.Offset).To(Equal(protocol.ByteCount(0x42)))
Expect(frame.Data).To(Equal([]byte("foobar")))
Expect(frame.DataLenPresent).To(BeTrue())
})
It("splits a retransmission", func() {
f := &wire.StreamFrame{
Data: []byte("foobar"),
Offset: 0x42,
DataLenPresent: false,
}
mockSender.EXPECT().onHasStreamData(streamID)
str.queueRetransmission(f)
frame, hasMoreData := str.popStreamFrame(f.Length(str.version) - 3)
Expect(hasMoreData).To(BeTrue())
Expect(frame).ToNot(BeNil())
Expect(frame.Offset).To(Equal(protocol.ByteCount(0x42)))
Expect(frame.Data).To(Equal([]byte("foo")))
Expect(frame.DataLenPresent).To(BeTrue())
frame, _ = str.popStreamFrame(protocol.MaxByteCount)
Expect(frame).ToNot(BeNil())
Expect(frame.Offset).To(Equal(protocol.ByteCount(0x45)))
Expect(frame.Data).To(Equal([]byte("bar")))
Expect(frame.DataLenPresent).To(BeTrue())
})
It("returns nil if the size is too small", func() {
f := &wire.StreamFrame{
Data: []byte("foobar"),
Offset: 0x42,
DataLenPresent: false,
}
mockSender.EXPECT().onHasStreamData(streamID)
str.queueRetransmission(f)
frame, hasMoreData := str.popStreamFrame(2)
Expect(hasMoreData).To(BeTrue())
Expect(frame).To(BeNil())
})
})
})