diff --git a/framer.go b/framer.go index 84cf1a8e..117f01c1 100644 --- a/framer.go +++ b/framer.go @@ -105,7 +105,10 @@ func (f *framerI) AppendStreamFrames(frames []wire.Frame, maxLen protocol.ByteCo } else { // no more data to send. Stream is not active any more delete(f.activeStreams, id) } - if frame == nil { // can happen if the receiveStream was canceled after it said it had data + // The frame can be nil + // * if the receiveStream was canceled after it said it had data + // * the remaining size doesn't allow us to add another STREAM frame + if frame == nil { continue } frames = append(frames, frame) diff --git a/send_stream.go b/send_stream.go index 3def448e..a4814d7d 100644 --- a/send_stream.go +++ b/send_stream.go @@ -24,6 +24,8 @@ type sendStreamI interface { type sendStream struct { mutex sync.Mutex + retransmissionQueue []*wire.StreamFrame + ctx context.Context ctxCancel context.CancelFunc @@ -147,6 +149,15 @@ func (s *sendStream) Write(p []byte) (int, error) { // maxBytes is the maximum length this frame (including frame header) will have. func (s *sendStream) popStreamFrame(maxBytes protocol.ByteCount) (*wire.StreamFrame, bool /* has more data to send */) { s.mutex.Lock() + if len(s.retransmissionQueue) > 0 { + frame, hasMoreRetransmissions := s.maybeGetRetransmission(maxBytes) + if frame != nil || hasMoreRetransmissions { + s.mutex.Unlock() + // We always claim that we have more data to send. + // This might be incorrect, in which case there'll be a spurious call to popStreamFrame in the future. + return frame, true + } + } completed, frame, hasMoreData := s.popStreamFrameImpl(maxBytes) s.mutex.Unlock() @@ -194,6 +205,16 @@ func (s *sendStream) popStreamFrameImpl(maxBytes protocol.ByteCount) (bool /* co return frame.FinBit, frame, s.dataForWriting != nil } +func (s *sendStream) maybeGetRetransmission(maxBytes protocol.ByteCount) (*wire.StreamFrame, bool /* has more retransmissions */) { + f := s.retransmissionQueue[0] + newFrame, needsSplit := f.MaybeSplitOffFrame(maxBytes, s.version) + if needsSplit { + return newFrame, true + } + s.retransmissionQueue = s.retransmissionQueue[1:] + return f, len(s.retransmissionQueue) > 0 +} + func (s *sendStream) hasData() bool { s.mutex.Lock() hasData := len(s.dataForWriting) > 0 @@ -227,6 +248,15 @@ func (s *sendStream) getDataForWriting(maxBytes protocol.ByteCount) ([]byte, boo return ret, s.finishedWriting && s.dataForWriting == nil && !s.finSent } +func (s *sendStream) queueRetransmission(f *wire.StreamFrame) { + f.DataLenPresent = true + s.mutex.Lock() + s.retransmissionQueue = append(s.retransmissionQueue, f) + s.mutex.Unlock() + + s.sender.onHasStreamData(s.streamID) +} + func (s *sendStream) Close() error { s.mutex.Lock() if s.canceledWrite { diff --git a/send_stream_test.go b/send_stream_test.go index efcd648c..97faa0d5 100644 --- a/send_stream_test.go +++ b/send_stream_test.go @@ -517,7 +517,7 @@ var _ = Describe("Send Stream", func() { }) }) - Context("stream cancelations", func() { + Context("stream cancellations", func() { Context("canceling writing", func() { It("queues a RESET_STREAM frame", func() { mockSender.EXPECT().queueControlFrame(&wire.ResetStreamFrame{ @@ -658,4 +658,55 @@ var _ = Describe("Send Stream", func() { }) }) }) + + Context("retransmissions", func() { + It("queues and retrieves frames", func() { + f := &wire.StreamFrame{ + Data: []byte("foobar"), + Offset: 0x42, + DataLenPresent: false, + } + mockSender.EXPECT().onHasStreamData(streamID) + str.queueRetransmission(f) + frame, _ := str.popStreamFrame(protocol.MaxByteCount) + Expect(frame).ToNot(BeNil()) + Expect(frame.Offset).To(Equal(protocol.ByteCount(0x42))) + Expect(frame.Data).To(Equal([]byte("foobar"))) + Expect(frame.DataLenPresent).To(BeTrue()) + }) + + It("splits a retransmission", func() { + f := &wire.StreamFrame{ + Data: []byte("foobar"), + Offset: 0x42, + DataLenPresent: false, + } + mockSender.EXPECT().onHasStreamData(streamID) + str.queueRetransmission(f) + frame, hasMoreData := str.popStreamFrame(f.Length(str.version) - 3) + Expect(hasMoreData).To(BeTrue()) + Expect(frame).ToNot(BeNil()) + Expect(frame.Offset).To(Equal(protocol.ByteCount(0x42))) + Expect(frame.Data).To(Equal([]byte("foo"))) + Expect(frame.DataLenPresent).To(BeTrue()) + frame, _ = str.popStreamFrame(protocol.MaxByteCount) + Expect(frame).ToNot(BeNil()) + Expect(frame.Offset).To(Equal(protocol.ByteCount(0x45))) + Expect(frame.Data).To(Equal([]byte("bar"))) + Expect(frame.DataLenPresent).To(BeTrue()) + }) + + It("returns nil if the size is too small", func() { + f := &wire.StreamFrame{ + Data: []byte("foobar"), + Offset: 0x42, + DataLenPresent: false, + } + mockSender.EXPECT().onHasStreamData(streamID) + str.queueRetransmission(f) + frame, hasMoreData := str.popStreamFrame(2) + Expect(hasMoreData).To(BeTrue()) + Expect(frame).To(BeNil()) + }) + }) })