don't use closures for passing OnLost and OnAcked STREAM frame callbacks (#3833)

This commit is contained in:
Marten Seemann 2023-06-02 14:14:04 +03:00 committed by GitHub
parent ad79149738
commit f8d24ef1e9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 327 additions and 250 deletions

View file

@ -85,8 +85,9 @@ var _ = Describe("Send Stream", func() {
waitForWrite()
mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount)
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6))
frame, _ := str.popStreamFrame(protocol.MaxByteCount, protocol.Version1)
f := frame.Frame.(*wire.StreamFrame)
frame, ok, _ := str.popStreamFrame(protocol.MaxByteCount, protocol.Version1)
Expect(ok).To(BeTrue())
f := frame.Frame
Expect(f.Data).To(Equal([]byte("foobar")))
Expect(f.Fin).To(BeFalse())
Expect(f.Offset).To(BeZero())
@ -109,19 +110,22 @@ var _ = Describe("Send Stream", func() {
waitForWrite()
mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount).Times(2)
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(3)).Times(2)
frame, _ := str.popStreamFrame(expectedFrameHeaderLen(0)+3, protocol.Version1)
f := frame.Frame.(*wire.StreamFrame)
frame, ok, _ := str.popStreamFrame(expectedFrameHeaderLen(0)+3, protocol.Version1)
Expect(ok).To(BeTrue())
f := frame.Frame
Expect(f.Offset).To(BeZero())
Expect(f.Fin).To(BeFalse())
Expect(f.Data).To(Equal([]byte("foo")))
Expect(f.DataLenPresent).To(BeTrue())
frame, _ = str.popStreamFrame(protocol.MaxByteCount, protocol.Version1)
f = frame.Frame.(*wire.StreamFrame)
frame, ok, _ = str.popStreamFrame(protocol.MaxByteCount, protocol.Version1)
Expect(ok).To(BeTrue())
f = frame.Frame
Expect(f.Data).To(Equal([]byte("bar")))
Expect(f.Fin).To(BeFalse())
Expect(f.Offset).To(Equal(protocol.ByteCount(3)))
Expect(f.DataLenPresent).To(BeTrue())
Expect(str.popStreamFrame(1000, protocol.Version1)).To(BeNil())
_, ok, _ = str.popStreamFrame(1000, protocol.Version1)
Expect(ok).To(BeFalse())
Eventually(done).Should(BeClosed())
})
@ -141,8 +145,9 @@ var _ = Describe("Send Stream", func() {
Eventually(done).Should(BeClosed()) // both Write calls returned without any data having been dequeued yet
mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount)
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6))
frame, _ := str.popStreamFrame(protocol.MaxByteCount, protocol.Version1)
f := frame.Frame.(*wire.StreamFrame)
frame, ok, _ := str.popStreamFrame(protocol.MaxByteCount, protocol.Version1)
Expect(ok).To(BeTrue())
f := frame.Frame
Expect(f.Offset).To(BeZero())
Expect(f.Fin).To(BeFalse())
Expect(f.Data).To(Equal([]byte("foobar")))
@ -163,8 +168,9 @@ var _ = Describe("Send Stream", func() {
}()
waitForWrite()
for i := 0; i < 5; i++ {
frame, _ := str.popStreamFrame(1100, protocol.Version1)
f := frame.Frame.(*wire.StreamFrame)
frame, ok, _ := str.popStreamFrame(1100, protocol.Version1)
Expect(ok).To(BeTrue())
f := frame.Frame
Expect(f.Offset).To(BeNumerically("~", 1100*i, 10*i))
Expect(f.Fin).To(BeFalse())
Expect(f.Data).To(Equal(getDataAtOffset(f.Offset, f.DataLen())))
@ -186,15 +192,17 @@ var _ = Describe("Send Stream", func() {
waitForWrite()
mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount).Times(2)
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(2))
frame, hasMoreData := str.popStreamFrame(expectedFrameHeaderLen(0)+2, protocol.Version1)
frame, ok, hasMoreData := str.popStreamFrame(expectedFrameHeaderLen(0)+2, protocol.Version1)
Expect(ok).To(BeTrue())
Expect(hasMoreData).To(BeTrue())
f := frame.Frame.(*wire.StreamFrame)
f := frame.Frame
Expect(f.DataLen()).To(Equal(protocol.ByteCount(2)))
Consistently(done).ShouldNot(BeClosed())
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(1))
frame, hasMoreData = str.popStreamFrame(expectedFrameHeaderLen(1)+1, protocol.Version1)
frame, ok, hasMoreData = str.popStreamFrame(expectedFrameHeaderLen(1)+1, protocol.Version1)
Expect(ok).To(BeTrue())
Expect(hasMoreData).To(BeTrue())
f = frame.Frame.(*wire.StreamFrame)
f = frame.Frame
Expect(f.DataLen()).To(Equal(protocol.ByteCount(1)))
Eventually(done).Should(BeClosed())
})
@ -214,22 +222,24 @@ var _ = Describe("Send Stream", func() {
waitForWrite()
mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount).Times(2)
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(2))
frame, hasMoreData := str.popStreamFrame(expectedFrameHeaderLen(0)+2, protocol.Version1)
frame, ok, hasMoreData := str.popStreamFrame(expectedFrameHeaderLen(0)+2, protocol.Version1)
Expect(ok).To(BeTrue())
Expect(hasMoreData).To(BeTrue())
f := frame.Frame.(*wire.StreamFrame)
f := frame.Frame
Expect(f.Data).To(Equal([]byte("fo")))
Consistently(done).ShouldNot(BeClosed())
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(4))
frame, hasMoreData = str.popStreamFrame(expectedFrameHeaderLen(2)+4, protocol.Version1)
frame, ok, hasMoreData = str.popStreamFrame(expectedFrameHeaderLen(2)+4, protocol.Version1)
Expect(ok).To(BeTrue())
Expect(hasMoreData).To(BeTrue())
f = frame.Frame.(*wire.StreamFrame)
f = frame.Frame
Expect(f.Data).To(Equal([]byte("obar")))
Eventually(done).Should(BeClosed())
})
It("popStreamFrame returns nil if no data is available", func() {
frame, hasMoreData := str.popStreamFrame(1000, protocol.Version1)
Expect(frame).To(BeNil())
_, ok, hasMoreData := str.popStreamFrame(1000, protocol.Version1)
Expect(ok).To(BeFalse())
Expect(hasMoreData).To(BeFalse())
})
@ -246,16 +256,16 @@ var _ = Describe("Send Stream", func() {
waitForWrite()
mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount).Times(2)
mockFC.EXPECT().AddBytesSent(gomock.Any()).Times(2)
frame, hasMoreData := str.popStreamFrame(50, protocol.Version1)
Expect(frame).ToNot(BeNil())
Expect(frame.Frame.(*wire.StreamFrame).Fin).To(BeFalse())
frame, ok, hasMoreData := str.popStreamFrame(50, protocol.Version1)
Expect(ok).To(BeTrue())
Expect(frame.Frame.Fin).To(BeFalse())
Expect(hasMoreData).To(BeTrue())
frame, hasMoreData = str.popStreamFrame(protocol.MaxByteCount, protocol.Version1)
Expect(frame).ToNot(BeNil())
Expect(frame.Frame.(*wire.StreamFrame).Fin).To(BeFalse())
frame, ok, hasMoreData = str.popStreamFrame(protocol.MaxByteCount, protocol.Version1)
Expect(ok).To(BeTrue())
Expect(frame.Frame.Fin).To(BeFalse())
Expect(hasMoreData).To(BeFalse())
frame, _ = str.popStreamFrame(protocol.MaxByteCount, protocol.Version1)
Expect(frame).To(BeNil())
_, ok, _ = str.popStreamFrame(protocol.MaxByteCount, protocol.Version1)
Expect(ok).To(BeFalse())
Eventually(done).Should(BeClosed())
})
@ -275,12 +285,14 @@ var _ = Describe("Send Stream", func() {
Expect(n).To(Equal(3))
}()
waitForWrite()
frame, _ := str.popStreamFrame(frameHeaderSize+1, protocol.Version1)
f := frame.Frame.(*wire.StreamFrame)
frame, ok, _ := str.popStreamFrame(frameHeaderSize+1, protocol.Version1)
Expect(ok).To(BeTrue())
f := frame.Frame
Expect(f.Data).To(Equal([]byte("f")))
frame, _ = str.popStreamFrame(100, protocol.Version1)
frame, ok, _ = str.popStreamFrame(100, protocol.Version1)
Expect(ok).To(BeTrue())
Expect(frame).ToNot(BeNil())
f = frame.Frame.(*wire.StreamFrame)
f = frame.Frame
Expect(f.Data).To(Equal([]byte("oo")))
s[1] = 'e'
Expect(f.Data).To(Equal([]byte("oo")))
@ -323,8 +335,8 @@ var _ = Describe("Send Stream", func() {
Expect(err).ToNot(HaveOccurred())
}()
waitForWrite()
f, hasMoreData := str.popStreamFrame(1000, protocol.Version1)
Expect(f).To(BeNil())
_, ok, hasMoreData := str.popStreamFrame(1000, protocol.Version1)
Expect(ok).To(BeFalse())
Expect(hasMoreData).To(BeFalse())
// make the Write go routine return
str.closeForShutdown(nil)
@ -345,7 +357,8 @@ var _ = Describe("Send Stream", func() {
// first pop a STREAM frame of the maximum size allowed by flow control
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(3))
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(3))
f, hasMoreData := str.popStreamFrame(expectedFrameHeaderLen(0)+3, protocol.Version1)
f, ok, hasMoreData := str.popStreamFrame(expectedFrameHeaderLen(0)+3, protocol.Version1)
Expect(ok).To(BeTrue())
Expect(f).ToNot(BeNil())
Expect(hasMoreData).To(BeTrue())
@ -357,8 +370,8 @@ var _ = Describe("Send Stream", func() {
StreamID: streamID,
MaximumStreamData: 10,
})
f, hasMoreData = str.popStreamFrame(1000, protocol.Version1)
Expect(f).To(BeNil())
_, ok, hasMoreData = str.popStreamFrame(1000, protocol.Version1)
Expect(ok).To(BeFalse())
Expect(hasMoreData).To(BeFalse())
// make the Write go routine return
str.closeForShutdown(nil)
@ -416,11 +429,12 @@ var _ = Describe("Send Stream", func() {
Expect(time.Now()).To(BeTemporally("~", deadline, scaleDuration(20*time.Millisecond)))
}()
waitForWrite()
frame, hasMoreData := str.popStreamFrame(50, protocol.Version1)
frame, ok, hasMoreData := str.popStreamFrame(50, protocol.Version1)
Expect(ok).To(BeTrue())
Expect(frame).ToNot(BeNil())
Expect(hasMoreData).To(BeTrue())
Eventually(writeReturned, scaleDuration(80*time.Millisecond)).Should(BeClosed())
Expect(n).To(BeEquivalentTo(frame.Frame.(*wire.StreamFrame).DataLen()))
Expect(n).To(BeEquivalentTo(frame.Frame.DataLen()))
})
It("doesn't pop any data after the deadline expired", func() {
@ -437,12 +451,13 @@ var _ = Describe("Send Stream", func() {
Expect(err).To(MatchError(errDeadline))
}()
waitForWrite()
frame, hasMoreData := str.popStreamFrame(50, protocol.Version1)
frame, ok, hasMoreData := str.popStreamFrame(50, protocol.Version1)
Expect(ok).To(BeTrue())
Expect(frame).ToNot(BeNil())
Expect(hasMoreData).To(BeTrue())
Eventually(writeReturned, scaleDuration(80*time.Millisecond)).Should(BeClosed())
frame, hasMoreData = str.popStreamFrame(50, protocol.Version1)
Expect(frame).To(BeNil())
_, ok, hasMoreData = str.popStreamFrame(50, protocol.Version1)
Expect(ok).To(BeFalse())
Expect(hasMoreData).To(BeFalse())
})
@ -529,9 +544,10 @@ var _ = Describe("Send Stream", func() {
It("allows FIN", func() {
mockSender.EXPECT().onHasStreamData(streamID)
str.Close()
frame, hasMoreData := str.popStreamFrame(1000, protocol.Version1)
frame, ok, hasMoreData := str.popStreamFrame(1000, protocol.Version1)
Expect(ok).To(BeTrue())
Expect(frame).ToNot(BeNil())
f := frame.Frame.(*wire.StreamFrame)
f := frame.Frame
Expect(f.Data).To(BeEmpty())
Expect(f.Fin).To(BeTrue())
Expect(f.DataLenPresent).To(BeTrue())
@ -546,13 +562,15 @@ var _ = Describe("Send Stream", func() {
Expect(str.Close()).To(Succeed())
mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount).Times(2)
mockFC.EXPECT().AddBytesSent(gomock.Any()).Times(2)
frame, _ := str.popStreamFrame(3+frameHeaderLen, protocol.Version1)
frame, ok, _ := str.popStreamFrame(3+frameHeaderLen, protocol.Version1)
Expect(ok).To(BeTrue())
Expect(frame).ToNot(BeNil())
f := frame.Frame.(*wire.StreamFrame)
f := frame.Frame
Expect(f.Data).To(Equal([]byte("foo")))
Expect(f.Fin).To(BeFalse())
frame, _ = str.popStreamFrame(protocol.MaxByteCount, protocol.Version1)
f = frame.Frame.(*wire.StreamFrame)
frame, ok, _ = str.popStreamFrame(protocol.MaxByteCount, protocol.Version1)
Expect(ok).To(BeTrue())
f = frame.Frame
Expect(f.Data).To(Equal([]byte("bar")))
Expect(f.Fin).To(BeTrue())
})
@ -575,9 +593,10 @@ var _ = Describe("Send Stream", func() {
if i == 5 {
Eventually(done).Should(BeClosed())
}
frame, _ := str.popStreamFrame(1100, protocol.Version1)
frame, ok, _ := str.popStreamFrame(1100, protocol.Version1)
Expect(ok).To(BeTrue())
Expect(frame).ToNot(BeNil())
f := frame.Frame.(*wire.StreamFrame)
f := frame.Frame
Expect(f.Data).To(Equal(getDataAtOffset(f.Offset, f.DataLen())))
Expect(f.Fin).To(Equal(i == 5)) // the last frame should have the FIN bit set
}
@ -585,26 +604,27 @@ var _ = Describe("Send Stream", func() {
It("doesn't allow FIN after it is closed for shutdown", func() {
str.closeForShutdown(errors.New("test"))
f, hasMoreData := str.popStreamFrame(1000, protocol.Version1)
Expect(f).To(BeNil())
_, ok, hasMoreData := str.popStreamFrame(1000, protocol.Version1)
Expect(ok).To(BeFalse())
Expect(hasMoreData).To(BeFalse())
Expect(str.Close()).To(Succeed())
f, hasMoreData = str.popStreamFrame(1000, protocol.Version1)
Expect(f).To(BeNil())
_, ok, hasMoreData = str.popStreamFrame(1000, protocol.Version1)
Expect(ok).To(BeFalse())
Expect(hasMoreData).To(BeFalse())
})
It("doesn't allow FIN twice", func() {
mockSender.EXPECT().onHasStreamData(streamID)
str.Close()
frame, _ := str.popStreamFrame(1000, protocol.Version1)
frame, ok, _ := str.popStreamFrame(1000, protocol.Version1)
Expect(ok).To(BeTrue())
Expect(frame).ToNot(BeNil())
f := frame.Frame.(*wire.StreamFrame)
f := frame.Frame
Expect(f.Data).To(BeEmpty())
Expect(f.Fin).To(BeTrue())
frame, hasMoreData := str.popStreamFrame(1000, protocol.Version1)
Expect(frame).To(BeNil())
_, ok, hasMoreData := str.popStreamFrame(1000, protocol.Version1)
Expect(ok).To(BeFalse())
Expect(hasMoreData).To(BeFalse())
})
})
@ -631,12 +651,13 @@ var _ = Describe("Send Stream", func() {
close(done)
}()
waitForWrite()
frame, hasMoreData := str.popStreamFrame(50, protocol.Version1) // get a STREAM frame containing some data, but not all
frame, ok, hasMoreData := str.popStreamFrame(50, protocol.Version1) // get a STREAM frame containing some data, but not all
Expect(ok).To(BeTrue())
Expect(frame).ToNot(BeNil())
Expect(hasMoreData).To(BeTrue())
str.closeForShutdown(testErr)
frame, hasMoreData = str.popStreamFrame(1000, protocol.Version1)
Expect(frame).To(BeNil())
_, ok, hasMoreData = str.popStreamFrame(1000, protocol.Version1)
Expect(ok).To(BeFalse())
Expect(hasMoreData).To(BeFalse())
Eventually(done).Should(BeClosed())
})
@ -734,12 +755,13 @@ var _ = Describe("Send Stream", func() {
close(writeReturned)
}()
waitForWrite()
frame, _ := str.popStreamFrame(50, protocol.Version1)
frame, ok, _ := str.popStreamFrame(50, protocol.Version1)
Expect(ok).To(BeTrue())
Expect(frame).ToNot(BeNil())
mockSender.EXPECT().onStreamCompleted(streamID)
str.CancelWrite(1234)
Eventually(writeReturned).Should(BeClosed())
Expect(n).To(BeEquivalentTo(frame.Frame.(*wire.StreamFrame).DataLen()))
Expect(n).To(BeEquivalentTo(frame.Frame.DataLen()))
})
It("doesn't pop STREAM frames after being canceled", func() {
@ -754,13 +776,14 @@ var _ = Describe("Send Stream", func() {
close(writeReturned)
}()
waitForWrite()
frame, hasMoreData := str.popStreamFrame(50, protocol.Version1)
frame, ok, hasMoreData := str.popStreamFrame(50, protocol.Version1)
Expect(ok).To(BeTrue())
Expect(hasMoreData).To(BeTrue())
Expect(frame).ToNot(BeNil())
mockSender.EXPECT().onStreamCompleted(streamID)
str.CancelWrite(1234)
frame, hasMoreData = str.popStreamFrame(10, protocol.Version1)
Expect(frame).To(BeNil())
_, ok, hasMoreData = str.popStreamFrame(10, protocol.Version1)
Expect(ok).To(BeFalse())
Expect(hasMoreData).To(BeFalse())
Eventually(writeReturned).Should(BeClosed())
})
@ -782,14 +805,15 @@ var _ = Describe("Send Stream", func() {
close(writeReturned)
}()
waitForWrite()
frame, hasMoreData := str.popStreamFrame(50, protocol.Version1)
frame, ok, hasMoreData := str.popStreamFrame(50, protocol.Version1)
Expect(ok).To(BeTrue())
Expect(hasMoreData).To(BeTrue())
Expect(frame).ToNot(BeNil())
mockSender.EXPECT().onStreamCompleted(streamID)
str.CancelWrite(1234)
frame, hasMoreData = str.popStreamFrame(10, protocol.Version1)
_, ok, hasMoreData = str.popStreamFrame(10, protocol.Version1)
Expect(ok).To(BeFalse())
Expect(hasMoreData).To(BeFalse())
Expect(frame).To(BeNil())
Eventually(writeReturned).Should(BeClosed())
})
@ -805,12 +829,13 @@ var _ = Describe("Send Stream", func() {
close(writeReturned)
}()
waitForWrite()
frame, hasMoreData := str.popStreamFrame(50, protocol.Version1)
frame, ok, hasMoreData := str.popStreamFrame(50, protocol.Version1)
Expect(ok).To(BeTrue())
Expect(hasMoreData).To(BeTrue())
Expect(frame).ToNot(BeNil())
mockSender.EXPECT().onStreamCompleted(streamID)
str.CancelWrite(1234)
frame.OnAcked(frame.Frame)
frame.Handler.OnAcked(frame.Frame)
})
It("cancels the context", func() {
@ -915,10 +940,11 @@ var _ = Describe("Send Stream", func() {
DataLenPresent: false,
}
mockSender.EXPECT().onHasStreamData(streamID)
str.queueRetransmission(f)
frame, _ := str.popStreamFrame(protocol.MaxByteCount, protocol.Version1)
str.OnLost(f)
frame, ok, _ := str.popStreamFrame(protocol.MaxByteCount, protocol.Version1)
Expect(ok).To(BeTrue())
Expect(frame).ToNot(BeNil())
f = frame.Frame.(*wire.StreamFrame)
f = frame.Frame
Expect(f.Offset).To(Equal(protocol.ByteCount(0x42)))
Expect(f.Data).To(Equal([]byte("foobar")))
Expect(f.DataLenPresent).To(BeTrue())
@ -932,17 +958,19 @@ var _ = Describe("Send Stream", func() {
DataLenPresent: false,
}
mockSender.EXPECT().onHasStreamData(streamID)
str.queueRetransmission(sf)
frame, hasMoreData := str.popStreamFrame(sf.Length(protocol.Version1)-3, protocol.Version1)
str.OnLost(sf)
frame, ok, hasMoreData := str.popStreamFrame(sf.Length(protocol.Version1)-3, protocol.Version1)
Expect(ok).To(BeTrue())
Expect(frame).ToNot(BeNil())
f := frame.Frame.(*wire.StreamFrame)
f := frame.Frame
Expect(hasMoreData).To(BeTrue())
Expect(f.Offset).To(Equal(protocol.ByteCount(0x42)))
Expect(f.Data).To(Equal([]byte("foo")))
Expect(f.DataLenPresent).To(BeTrue())
frame, _ = str.popStreamFrame(protocol.MaxByteCount, protocol.Version1)
frame, ok, _ = str.popStreamFrame(protocol.MaxByteCount, protocol.Version1)
Expect(ok).To(BeTrue())
Expect(frame).ToNot(BeNil())
f = frame.Frame.(*wire.StreamFrame)
f = frame.Frame
Expect(f.Offset).To(Equal(protocol.ByteCount(0x45)))
Expect(f.Data).To(Equal([]byte("bar")))
Expect(f.DataLenPresent).To(BeTrue())
@ -956,10 +984,10 @@ var _ = Describe("Send Stream", func() {
DataLenPresent: false,
}
mockSender.EXPECT().onHasStreamData(streamID)
str.queueRetransmission(f)
frame, hasMoreData := str.popStreamFrame(2, protocol.Version1)
str.OnLost(f)
_, ok, hasMoreData := str.popStreamFrame(2, protocol.Version1)
Expect(ok).To(BeFalse())
Expect(hasMoreData).To(BeTrue())
Expect(frame).To(BeNil())
})
It("queues lost STREAM frames", func() {
@ -974,17 +1002,19 @@ var _ = Describe("Send Stream", func() {
close(done)
}()
waitForWrite()
frame, _ := str.popStreamFrame(protocol.MaxByteCount, protocol.Version1)
frame, ok, _ := str.popStreamFrame(protocol.MaxByteCount, protocol.Version1)
Expect(ok).To(BeTrue())
Eventually(done).Should(BeClosed())
Expect(frame).ToNot(BeNil())
Expect(frame.Frame.(*wire.StreamFrame).Data).To(Equal([]byte("foobar")))
Expect(frame.Frame.Data).To(Equal([]byte("foobar")))
// now lose the frame
mockSender.EXPECT().onHasStreamData(streamID)
frame.OnLost(frame.Frame)
newFrame, _ := str.popStreamFrame(protocol.MaxByteCount, protocol.Version1)
frame.Handler.OnLost(frame.Frame)
newFrame, ok, _ := str.popStreamFrame(protocol.MaxByteCount, protocol.Version1)
Expect(ok).To(BeTrue())
Expect(newFrame).ToNot(BeNil())
Expect(newFrame.Frame.(*wire.StreamFrame).Data).To(Equal([]byte("foobar")))
Expect(newFrame.Frame.Data).To(Equal([]byte("foobar")))
})
It("doesn't queue retransmissions for a stream that was canceled", func() {
@ -999,7 +1029,8 @@ var _ = Describe("Send Stream", func() {
close(done)
}()
waitForWrite()
f, _ := str.popStreamFrame(100, protocol.Version1)
f, ok, _ := str.popStreamFrame(100, protocol.Version1)
Expect(ok).To(BeTrue())
Eventually(done).Should(BeClosed())
Expect(f).ToNot(BeNil())
gomock.InOrder(
@ -1008,7 +1039,7 @@ var _ = Describe("Send Stream", func() {
)
str.CancelWrite(9876)
// don't EXPECT any calls to onHasStreamData
f.OnLost(f.Frame)
f.Handler.OnLost(f.Frame)
Expect(str.retransmissionQueue).To(BeEmpty())
})
})
@ -1031,13 +1062,13 @@ var _ = Describe("Send Stream", func() {
waitForWrite()
// get a bunch of small frames (max. 20 bytes)
var frames []ackhandler.Frame
var frames []ackhandler.StreamFrame
for {
frame, hasMoreData := str.popStreamFrame(20, protocol.Version1)
if frame == nil {
frame, ok, hasMoreData := str.popStreamFrame(20, protocol.Version1)
if !ok {
continue
}
frames = append(frames, *frame)
frames = append(frames, frame)
if !hasMoreData {
break
}
@ -1047,16 +1078,17 @@ var _ = Describe("Send Stream", func() {
// Acknowledge all frames.
// We don't expect the stream to be completed, since we still need to send the FIN.
for _, f := range frames {
f.OnAcked(f.Frame)
f.Handler.OnAcked(f.Frame)
}
// Now close the stream and acknowledge the FIN.
mockSender.EXPECT().onHasStreamData(streamID)
Expect(str.Close()).To(Succeed())
frame, _ := str.popStreamFrame(protocol.MaxByteCount, protocol.Version1)
frame, ok, _ := str.popStreamFrame(protocol.MaxByteCount, protocol.Version1)
Expect(ok).To(BeTrue())
Expect(frame).ToNot(BeNil())
mockSender.EXPECT().onStreamCompleted(streamID)
frame.OnAcked(frame.Frame)
frame.Handler.OnAcked(frame.Frame)
})
It("says when a stream is completed, if Close() is called before popping the frame", func() {
@ -1072,13 +1104,14 @@ var _ = Describe("Send Stream", func() {
Eventually(done).Should(BeClosed())
Expect(str.Close()).To(Succeed())
frame, hasMoreData := str.popStreamFrame(protocol.MaxByteCount, protocol.Version1)
frame, ok, hasMoreData := str.popStreamFrame(protocol.MaxByteCount, protocol.Version1)
Expect(ok).To(BeTrue())
Expect(hasMoreData).To(BeFalse())
Expect(frame).ToNot(BeNil())
Expect(frame.Frame.(*wire.StreamFrame).Fin).To(BeTrue())
Expect(frame.Frame.Fin).To(BeTrue())
mockSender.EXPECT().onStreamCompleted(streamID)
frame.OnAcked(frame.Frame)
frame.Handler.OnAcked(frame.Frame)
})
It("doesn't say it's completed when there are frames waiting to be retransmitted", func() {
@ -1095,14 +1128,14 @@ var _ = Describe("Send Stream", func() {
waitForWrite()
// get a bunch of small frames (max. 20 bytes)
var frames []ackhandler.Frame
var frames []ackhandler.StreamFrame
for {
frame, _ := str.popStreamFrame(20, protocol.Version1)
if frame == nil {
frame, ok, _ := str.popStreamFrame(20, protocol.Version1)
if !ok {
continue
}
frames = append(frames, *frame)
if frame.Frame.(*wire.StreamFrame).Fin {
frames = append(frames, frame)
if frame.Frame.Fin {
break
}
}
@ -1110,16 +1143,17 @@ var _ = Describe("Send Stream", func() {
// lose the first frame, acknowledge all others
for _, f := range frames[1:] {
f.OnAcked(f.Frame)
f.Handler.OnAcked(f.Frame)
}
mockSender.EXPECT().onHasStreamData(streamID)
frames[0].OnLost(frames[0].Frame)
frames[0].Handler.OnLost(frames[0].Frame)
// get the retransmission and acknowledge it
ret, _ := str.popStreamFrame(protocol.MaxByteCount, protocol.Version1)
ret, ok, _ := str.popStreamFrame(protocol.MaxByteCount, protocol.Version1)
Expect(ok).To(BeTrue())
Expect(ret).ToNot(BeNil())
mockSender.EXPECT().onStreamCompleted(streamID)
ret.OnAcked(ret.Frame)
ret.Handler.OnAcked(ret.Frame)
})
// This test is kind of an integration test.
@ -1154,18 +1188,18 @@ var _ = Describe("Send Stream", func() {
if completed {
break
}
f, _ := str.popStreamFrame(protocol.ByteCount(mrand.Intn(300)+100), protocol.Version1)
if f == nil {
f, ok, _ := str.popStreamFrame(protocol.ByteCount(mrand.Intn(300)+100), protocol.Version1)
if !ok {
continue
}
sf := f.Frame.(*wire.StreamFrame)
sf := f.Frame
// 50%: acknowledge the frame and save the data
// 50%: lose the frame
if mrand.Intn(100) < 50 {
copy(received[sf.Offset:sf.Offset+sf.DataLen()], sf.Data)
f.OnAcked(f.Frame)
f.Handler.OnAcked(f.Frame)
} else {
f.OnLost(f.Frame)
f.Handler.OnLost(f.Frame)
}
}
Expect(received).To(Equal(data))