From f49451ce3c3251b0526497a16bca618cf7439b1a Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Thu, 29 Aug 2019 19:09:05 +0700 Subject: [PATCH] queue lost STREAM frames in the stream's retransmission queue --- packet_packer.go | 3 + packet_packer_test.go | 38 ++++------- retransmission_queue.go | 22 +------ retransmission_queue_test.go | 38 ----------- send_stream.go | 90 +++++++++++++------------ send_stream_test.go | 124 +++++++++++++++++++++++++++++++---- 6 files changed, 178 insertions(+), 137 deletions(-) diff --git a/packet_packer.go b/packet_packer.go index 999fa610..adaced6e 100644 --- a/packet_packer.go +++ b/packet_packer.go @@ -68,6 +68,9 @@ func (p *packedPacket) ToAckHandlerPacket(q *retransmissionQueue) *ackhandler.Pa } encLevel := p.EncryptionLevel() for i := range p.frames { + if p.frames[i].OnLost != nil { + continue + } switch encLevel { case protocol.EncryptionInitial: p.frames[i].OnLost = q.AddInitial diff --git a/packet_packer_test.go b/packet_packer_test.go index db6b30a6..669ba1b9 100644 --- a/packet_packer_test.go +++ b/packet_packer_test.go @@ -447,29 +447,6 @@ var _ = Describe("Packet packer", func() { Expect(p.frames[2].Frame.(*wire.StreamFrame).Data).To(Equal([]byte("frame 3"))) }) - It("adds retransmissions", func() { - f1 := &wire.StreamFrame{Data: []byte("frame 1")} - cf := &wire.MaxDataFrame{ByteOffset: 0x42} - retransmissionQueue.AddAppData(f1) - retransmissionQueue.AddAppData(cf) - pnManager.EXPECT().PeekPacketNumber(protocol.Encryption1RTT).Return(protocol.PacketNumber(0x42), protocol.PacketNumberLen2) - pnManager.EXPECT().PopPacketNumber(protocol.Encryption1RTT).Return(protocol.PacketNumber(0x42)) - sealingManager.EXPECT().Get1RTTSealer().Return(sealer, nil) - ackFramer.EXPECT().GetAckFrame(protocol.Encryption1RTT) - expectAppendControlFrames() - f2 := &wire.StreamFrame{Data: []byte("frame 2")} - expectAppendStreamFrames(ackhandler.Frame{Frame: f2}) - p, err := packer.PackPacket() - Expect(p).ToNot(BeNil()) - Expect(err).ToNot(HaveOccurred()) - Expect(p.frames).To(HaveLen(3)) - Expect(p.frames[0].Frame).To(BeAssignableToTypeOf(&wire.StreamFrame{})) - Expect(p.frames[0].Frame.(*wire.StreamFrame).Data).To(Equal([]byte("frame 1"))) - Expect(p.frames[1].Frame).To(Equal(cf)) - Expect(p.frames[2].Frame).To(BeAssignableToTypeOf(&wire.StreamFrame{})) - Expect(p.frames[2].Frame.(*wire.StreamFrame).Data).To(Equal([]byte("frame 2"))) - }) - Context("making ACK packets ack-eliciting", func() { sendMaxNumNonAckElicitingAcks := func() { for i := 0; i < protocol.MaxNonAckElicitingAcks; i++ { @@ -807,13 +784,20 @@ var _ = Describe("Converting to AckHandler packets", func() { Expect(p.LargestAcked).To(Equal(protocol.InvalidPacketNumber)) }) - It("sets the OnLost callback", func() { + It("doesn't overwrite the OnLost callback, if it is set", func() { + var pingLost bool packet := &packedPacket{ - header: &wire.ExtendedHeader{Header: wire.Header{}}, - frames: []ackhandler.Frame{{Frame: &wire.MaxDataFrame{}}}, - raw: []byte("foobar"), + header: &wire.ExtendedHeader{Header: wire.Header{Type: protocol.PacketTypeHandshake}}, + frames: []ackhandler.Frame{ + {Frame: &wire.MaxDataFrame{}}, + {Frame: &wire.PingFrame{}, OnLost: func(wire.Frame) { pingLost = true }}, + }, + raw: []byte("foobar"), } p := packet.ToAckHandlerPacket(newRetransmissionQueue(protocol.VersionTLS)) + Expect(p.Frames).To(HaveLen(2)) Expect(p.Frames[0].OnLost).ToNot(BeNil()) + p.Frames[1].OnLost(nil) + Expect(pingLost).To(BeTrue()) }) }) diff --git a/retransmission_queue.go b/retransmission_queue.go index 0de60d1f..741a57a1 100644 --- a/retransmission_queue.go +++ b/retransmission_queue.go @@ -14,8 +14,7 @@ type retransmissionQueue struct { handshake []wire.Frame handshakeCryptoData []*wire.CryptoFrame - appData []wire.Frame - streamData []*wire.StreamFrame + appData []wire.Frame version protocol.VersionNumber } @@ -49,10 +48,8 @@ func (q *retransmissionQueue) HasHandshakeData() bool { } func (q *retransmissionQueue) AddAppData(f wire.Frame) { - if sf, ok := f.(*wire.StreamFrame); ok { - sf.DataLenPresent = true - q.streamData = append(q.streamData, sf) - return + if _, ok := f.(*wire.StreamFrame); ok { + panic("STREAM frames are handled with their respective streams.") } q.appData = append(q.appData, f) } @@ -94,19 +91,6 @@ func (q *retransmissionQueue) GetHandshakeFrame(maxLen protocol.ByteCount) wire. } func (q *retransmissionQueue) GetAppDataFrame(maxLen protocol.ByteCount) wire.Frame { - if len(q.streamData) > 0 { - f := q.streamData[0] - if f.Length(q.version) <= maxLen { - q.streamData = q.streamData[1:] - return f - } - if maxLen >= protocol.MinStreamFrameSize { - newFrame, needsSplit := f.MaybeSplitOffFrame(maxLen, q.version) - if needsSplit && newFrame != nil { - return newFrame - } - } - } if len(q.appData) == 0 { return nil } diff --git a/retransmission_queue_test.go b/retransmission_queue_test.go index c6c4b963..9bfafe18 100644 --- a/retransmission_queue_test.go +++ b/retransmission_queue_test.go @@ -1,8 +1,6 @@ package quic import ( - "math/rand" - "github.com/lucas-clemente/quic-go/internal/protocol" "github.com/lucas-clemente/quic-go/internal/wire" @@ -118,41 +116,5 @@ var _ = Describe("Retransmission queue", func() { Expect(q.GetAppDataFrame(f.Length(version) - 1)).To(BeNil()) Expect(q.GetAppDataFrame(f.Length(version))).To(Equal(f)) }) - - It("queues and retrieves a STREAM frame", func() { - f := &wire.StreamFrame{Data: []byte("foobar")} - q.AddAppData(f) - Expect(q.GetAppDataFrame(f.Length(version) - 1)).To(BeNil()) - Expect(q.GetAppDataFrame(f.Length(version))).To(Equal(f)) - }) - - It("splits STREAM frames larger than MinStreamFrameSize", func() { - data := make([]byte, 1000) - rand.Read(data) - f := &wire.StreamFrame{ - Data: data, - FinBit: true, - } - q.AddAppData(f) - Expect(q.GetAppDataFrame(protocol.MinStreamFrameSize - 1)).To(BeNil()) - f1 := q.GetAppDataFrame(protocol.MinStreamFrameSize).(*wire.StreamFrame) - Expect(f1).ToNot(BeNil()) - Expect(f1.Length(version)).To(Equal(protocol.MinStreamFrameSize)) - Expect(f1.FinBit).To(BeFalse()) - Expect(f1.Data).To(Equal(data[:f1.DataLen()])) - f2 := q.GetAppDataFrame(protocol.MaxByteCount).(*wire.StreamFrame) - Expect(f2).ToNot(BeNil()) - Expect(f2.FinBit).To(BeTrue()) - Expect(f1.DataLen() + f2.DataLen()).To(BeEquivalentTo(1000)) - Expect(f2.Data).To(Equal(data[f1.DataLen():])) - Expect(q.GetAppDataFrame(protocol.MaxByteCount)).To(BeNil()) - }) - - It("returns a control frame if it doesn't split a STREAM frame", func() { - cf := &wire.MaxDataFrame{ByteOffset: 0x42} - q.AddAppData(&wire.StreamFrame{Data: make([]byte, 1000)}) - q.AddAppData(cf) - Expect(q.GetAppDataFrame(protocol.MinStreamFrameSize - 1)).To(Equal(cf)) - }) }) }) diff --git a/send_stream.go b/send_stream.go index d618ccd8..c674e236 100644 --- a/send_stream.go +++ b/send_stream.go @@ -26,7 +26,8 @@ type sendStreamI interface { type sendStream struct { mutex sync.Mutex - retransmissionQueue []*wire.StreamFrame + numOutstandingFrames int64 + retransmissionQueue []*wire.StreamFrame ctx context.Context ctxCancel context.CancelFunc @@ -151,33 +152,30 @@ func (s *sendStream) Write(p []byte) (int, error) { // maxBytes is the maximum length this frame (including frame header) will have. func (s *sendStream) popStreamFrame(maxBytes protocol.ByteCount) (*ackhandler.Frame, bool /* has more data to send */) { s.mutex.Lock() + frame, hasMoreData := s.popStreamFrameImpl(maxBytes) + if frame != nil { + s.numOutstandingFrames++ + } + s.mutex.Unlock() + + return frame, hasMoreData +} + +func (s *sendStream) popStreamFrameImpl(maxBytes protocol.ByteCount) (*ackhandler.Frame, bool /* has more data to send */) { if len(s.retransmissionQueue) > 0 { frame, hasMoreRetransmissions := s.maybeGetRetransmission(maxBytes) if frame != nil || hasMoreRetransmissions { - s.mutex.Unlock() - // We always claim that we have more data to send. - // This might be incorrect, in which case there'll be a spurious call to popStreamFrame in the future. if frame == nil { return nil, true } - return &ackhandler.Frame{Frame: frame}, true + // We always claim that we have more data to send. + // This might be incorrect, in which case there'll be a spurious call to popStreamFrame in the future. + return &ackhandler.Frame{Frame: frame, OnLost: s.queueRetransmission, OnAcked: s.frameAcked}, true } } - completed, frame, hasMoreData := s.popStreamFrameImpl(maxBytes) - s.mutex.Unlock() - if completed { - s.sender.onStreamCompleted(s.streamID) - } - if frame == nil { - return nil, hasMoreData - } - return &ackhandler.Frame{Frame: frame}, hasMoreData -} - -func (s *sendStream) popStreamFrameImpl(maxBytes protocol.ByteCount) (bool /* completed */, *wire.StreamFrame, bool /* has more data to send */) { if s.canceledWrite || s.closeForShutdownErr != nil { - return false, nil, false + return nil, false } frame := &wire.StreamFrame{ @@ -187,7 +185,7 @@ func (s *sendStream) popStreamFrameImpl(maxBytes protocol.ByteCount) (bool /* co } maxDataLen := frame.MaxDataLen(maxBytes, s.version) if maxDataLen == 0 { // a STREAM frame must have at least one byte of data - return false, nil, s.dataForWriting != nil + return nil, s.dataForWriting != nil } frame.Data, frame.FinBit = s.getDataForWriting(maxDataLen) if len(frame.Data) == 0 && !frame.FinBit { @@ -196,21 +194,21 @@ func (s *sendStream) popStreamFrameImpl(maxBytes protocol.ByteCount) (bool /* co // - there's data for writing, but the stream is stream-level flow control blocked // - there's data for writing, but the stream is connection-level flow control blocked if s.dataForWriting == nil { - return false, nil, false + return nil, false } if isBlocked, offset := s.flowController.IsNewlyBlocked(); isBlocked { s.sender.queueControlFrame(&wire.StreamDataBlockedFrame{ StreamID: s.streamID, DataLimit: offset, }) - return false, nil, false + return nil, false } - return false, nil, true + return nil, true } if frame.FinBit { s.finSent = true } - return frame.FinBit, frame, s.dataForWriting != nil + return &ackhandler.Frame{Frame: frame, OnLost: s.queueRetransmission, OnAcked: s.frameAcked}, s.dataForWriting != nil } func (s *sendStream) maybeGetRetransmission(maxBytes protocol.ByteCount) (*wire.StreamFrame, bool /* has more retransmissions */) { @@ -256,11 +254,32 @@ func (s *sendStream) getDataForWriting(maxBytes protocol.ByteCount) ([]byte, boo return ret, s.finishedWriting && s.dataForWriting == nil && !s.finSent } +func (s *sendStream) frameAcked() { + var streamCompleted bool + s.mutex.Lock() + s.numOutstandingFrames-- + if s.numOutstandingFrames < 0 { + panic("numOutStandingFrames negative") + } + if (s.finSent || s.canceledWrite) && s.numOutstandingFrames == 0 && len(s.retransmissionQueue) == 0 { + streamCompleted = true + } + s.mutex.Unlock() + + if streamCompleted { + s.sender.onStreamCompleted(s.streamID) + } +} + func (s *sendStream) queueRetransmission(f wire.Frame) { sf := f.(*wire.StreamFrame) sf.DataLenPresent = true s.mutex.Lock() s.retransmissionQueue = append(s.retransmissionQueue, sf) + s.numOutstandingFrames-- + if s.numOutstandingFrames < 0 { + panic("numOutStandingFrames negative") + } s.mutex.Unlock() s.sender.onHasStreamData(s.streamID) @@ -280,20 +299,17 @@ func (s *sendStream) Close() error { return nil } +// TODO: also complete the stream when this is called after all outstanding data has been acknowledged func (s *sendStream) CancelWrite(errorCode protocol.ApplicationErrorCode) { s.mutex.Lock() - completed := s.cancelWriteImpl(errorCode, fmt.Errorf("Write on stream %d canceled with error code %d", s.streamID, errorCode)) + s.cancelWriteImpl(errorCode, fmt.Errorf("Write on stream %d canceled with error code %d", s.streamID, errorCode)) s.mutex.Unlock() - - if completed { - s.sender.onStreamCompleted(s.streamID) // must be called without holding the mutex - } } // must be called after locking the mutex -func (s *sendStream) cancelWriteImpl(errorCode protocol.ApplicationErrorCode, writeErr error) bool /*completed */ { +func (s *sendStream) cancelWriteImpl(errorCode protocol.ApplicationErrorCode, writeErr error) { if s.canceledWrite || s.finishedWriting { - return false + return } s.canceledWrite = true s.cancelWriteErr = writeErr @@ -305,7 +321,6 @@ func (s *sendStream) cancelWriteImpl(errorCode protocol.ApplicationErrorCode, wr }) // TODO(#991): cancel retransmissions for this stream s.ctxCancel() - return true } func (s *sendStream) handleMaxStreamDataFrame(frame *wire.MaxStreamDataFrame) { @@ -319,23 +334,16 @@ func (s *sendStream) handleMaxStreamDataFrame(frame *wire.MaxStreamDataFrame) { } } +// TODO: also complete the stream when the frame is received after all outstanding data has been acknowledged func (s *sendStream) handleStopSendingFrame(frame *wire.StopSendingFrame) { s.mutex.Lock() - completed := s.handleStopSendingFrameImpl(frame) - s.mutex.Unlock() + defer s.mutex.Unlock() - if completed { - s.sender.onStreamCompleted(s.streamID) - } -} - -// must be called after locking the mutex -func (s *sendStream) handleStopSendingFrameImpl(frame *wire.StopSendingFrame) bool /*completed*/ { writeErr := streamCanceledError{ errorCode: frame.ErrorCode, error: fmt.Errorf("Stream %d was reset with error code %d", s.streamID, frame.ErrorCode), } - return s.cancelWriteImpl(frame.ErrorCode, writeErr) + s.cancelWriteImpl(frame.ErrorCode, writeErr) } func (s *sendStream) Context() context.Context { diff --git a/send_stream_test.go b/send_stream_test.go index 089f074b..db3c27fe 100644 --- a/send_stream_test.go +++ b/send_stream_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/golang/mock/gomock" + "github.com/lucas-clemente/quic-go/internal/ackhandler" "github.com/lucas-clemente/quic-go/internal/mocks" "github.com/lucas-clemente/quic-go/internal/protocol" "github.com/lucas-clemente/quic-go/internal/wire" @@ -405,7 +406,6 @@ var _ = Describe("Send Stream", func() { It("allows FIN", func() { mockSender.EXPECT().onHasStreamData(streamID) - mockSender.EXPECT().onStreamCompleted(streamID) str.Close() frame, hasMoreData := str.popStreamFrame(1000) f := frame.Frame.(*wire.StreamFrame) @@ -427,7 +427,6 @@ var _ = Describe("Send Stream", func() { Expect(f).ToNot(BeNil()) Expect(f.Data).To(Equal([]byte("foo"))) Expect(f.FinBit).To(BeFalse()) - mockSender.EXPECT().onStreamCompleted(streamID) frame, _ = str.popStreamFrame(100) f = frame.Frame.(*wire.StreamFrame) Expect(f.Data).To(Equal([]byte("bar"))) @@ -443,7 +442,6 @@ var _ = Describe("Send Stream", func() { It("doesn't allow FIN twice", func() { mockSender.EXPECT().onHasStreamData(streamID) - mockSender.EXPECT().onStreamCompleted(streamID) str.Close() frame, _ := str.popStreamFrame(1000) f := frame.Frame.(*wire.StreamFrame) @@ -534,14 +532,12 @@ var _ = Describe("Send Stream", func() { ByteOffset: 1234, ErrorCode: 9876, }) - mockSender.EXPECT().onStreamCompleted(streamID) str.writeOffset = 1234 str.CancelWrite(9876) }) It("unblocks Write", func() { mockSender.EXPECT().onHasStreamData(streamID) - mockSender.EXPECT().onStreamCompleted(streamID) mockSender.EXPECT().queueControlFrame(gomock.Any()) mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount) mockFC.EXPECT().AddBytesSent(gomock.Any()) @@ -564,7 +560,6 @@ var _ = Describe("Send Stream", func() { It("doesn't pop STREAM frames after being canceled", func() { mockSender.EXPECT().onHasStreamData(streamID) - mockSender.EXPECT().onStreamCompleted(streamID) mockSender.EXPECT().queueControlFrame(gomock.Any()) mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount) mockFC.EXPECT().AddBytesSent(gomock.Any()) @@ -588,7 +583,6 @@ var _ = Describe("Send Stream", func() { It("cancels the context", func() { mockSender.EXPECT().queueControlFrame(gomock.Any()) - mockSender.EXPECT().onStreamCompleted(streamID) Expect(str.Context().Done()).ToNot(BeClosed()) str.CancelWrite(1234) Expect(str.Context().Done()).To(BeClosed()) @@ -596,7 +590,6 @@ var _ = Describe("Send Stream", func() { It("doesn't allow further calls to Write", func() { mockSender.EXPECT().queueControlFrame(gomock.Any()) - mockSender.EXPECT().onStreamCompleted(streamID) str.CancelWrite(1234) _, err := strWithTimeout.Write([]byte("foobar")) Expect(err).To(MatchError("Write on stream 1337 canceled with error code 1234")) @@ -604,7 +597,6 @@ var _ = Describe("Send Stream", func() { It("only cancels once", func() { mockSender.EXPECT().queueControlFrame(&wire.ResetStreamFrame{StreamID: streamID, ErrorCode: 1234}) - mockSender.EXPECT().onStreamCompleted(streamID) str.CancelWrite(1234) str.CancelWrite(4321) }) @@ -623,7 +615,6 @@ var _ = Describe("Send Stream", func() { StreamID: streamID, ErrorCode: 101, }) - mockSender.EXPECT().onStreamCompleted(streamID) str.handleStopSendingFrame(&wire.StopSendingFrame{ StreamID: streamID, ErrorCode: 101, @@ -644,7 +635,6 @@ var _ = Describe("Send Stream", func() { close(done) }() waitForWrite() - mockSender.EXPECT().onStreamCompleted(streamID) str.handleStopSendingFrame(&wire.StopSendingFrame{ StreamID: streamID, ErrorCode: 123, @@ -654,7 +644,6 @@ var _ = Describe("Send Stream", func() { It("doesn't allow further calls to Write", func() { mockSender.EXPECT().queueControlFrame(gomock.Any()) - mockSender.EXPECT().onStreamCompleted(streamID) str.handleStopSendingFrame(&wire.StopSendingFrame{ StreamID: streamID, ErrorCode: 123, @@ -670,6 +659,7 @@ var _ = Describe("Send Stream", func() { Context("retransmissions", func() { It("queues and retrieves frames", func() { + str.numOutstandingFrames = 1 f := &wire.StreamFrame{ Data: []byte("foobar"), Offset: 0x42, @@ -686,6 +676,7 @@ var _ = Describe("Send Stream", func() { }) It("splits a retransmission", func() { + str.numOutstandingFrames = 1 sf := &wire.StreamFrame{ Data: []byte("foobar"), Offset: 0x42, @@ -709,6 +700,7 @@ var _ = Describe("Send Stream", func() { }) It("returns nil if the size is too small", func() { + str.numOutstandingFrames = 1 f := &wire.StreamFrame{ Data: []byte("foobar"), Offset: 0x42, @@ -720,5 +712,113 @@ var _ = Describe("Send Stream", func() { Expect(hasMoreData).To(BeTrue()) Expect(frame).To(BeNil()) }) + + It("queues lost STREAM frames", func() { + mockSender.EXPECT().onHasStreamData(streamID).Times(2) + mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)) + mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6)) + done := make(chan struct{}) + go func() { + defer GinkgoRecover() + _, err := strWithTimeout.Write([]byte("foobar")) + Expect(err).ToNot(HaveOccurred()) + close(done) + }() + waitForWrite() + frame, _ := str.popStreamFrame(protocol.MaxByteCount) + Eventually(done).Should(BeClosed()) + Expect(frame).ToNot(BeNil()) + Expect(frame.Frame.(*wire.StreamFrame).Data).To(Equal([]byte("foobar"))) + + // now lose the frame + frame.OnLost(frame.Frame) + newFrame, _ := str.popStreamFrame(protocol.MaxByteCount) + Expect(newFrame).ToNot(BeNil()) + Expect(newFrame.Frame.(*wire.StreamFrame).Data).To(Equal([]byte("foobar"))) + }) + }) + + Context("determining when a stream is completed", func() { + BeforeEach(func() { + mockSender.EXPECT().onHasStreamData(streamID).AnyTimes() + mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount).AnyTimes() + mockFC.EXPECT().AddBytesSent(gomock.Any()).AnyTimes() + }) + + It("says when a stream is completed", func() { + done := make(chan struct{}) + go func() { + defer GinkgoRecover() + _, err := strWithTimeout.Write(make([]byte, 1000)) + Expect(err).ToNot(HaveOccurred()) + close(done) + }() + waitForWrite() + + // get a bunch of small frames (max. 20 bytes) + var frames []ackhandler.Frame + for { + frame, hasMoreData := str.popStreamFrame(200) + if frame == nil { + continue + } + frames = append(frames, *frame) + if !hasMoreData { + break + } + } + Eventually(done).Should(BeClosed()) + + // Acknowledge all frames. + // We don't expect the stream to be completed, since we still need to send the FIN. + for _, f := range frames { + f.OnAcked() + } + + // Now close the stream and acknowledge the FIN. + Expect(str.Close()).To(Succeed()) + frame, _ := str.popStreamFrame(protocol.MaxByteCount) + Expect(frame).ToNot(BeNil()) + mockSender.EXPECT().onStreamCompleted(streamID) + frame.OnAcked() + }) + + It("doesn't say it's completed when there are frames waiting to be retransmitted", func() { + done := make(chan struct{}) + go func() { + defer GinkgoRecover() + _, err := strWithTimeout.Write(make([]byte, 10)) + Expect(err).ToNot(HaveOccurred()) + Expect(str.Close()).To(Succeed()) + close(done) + }() + waitForWrite() + + // get a bunch of small frames (max. 20 bytes) + var frames []ackhandler.Frame + for { + frame, _ := str.popStreamFrame(protocol.MaxByteCount) + if frame == nil { + continue + } + frames = append(frames, *frame) + if frame.Frame.(*wire.StreamFrame).FinBit { + break + } + } + Eventually(done).Should(BeClosed()) + + // lose the first frame, acknowledge all others + for _, f := range frames[1:] { + f.OnAcked() + } + frames[0].OnLost(frames[0].Frame) + + // get the retransmission and acknowledge it + ret, _ := str.popStreamFrame(protocol.MaxByteCount) + Expect(ret).ToNot(BeNil()) + mockSender.EXPECT().onStreamCompleted(streamID) + ret.OnAcked() + }) }) })