package quic import ( "bytes" "context" "errors" "io" mrand "math/rand" "runtime" "time" "golang.org/x/exp/rand" "github.com/quic-go/quic-go/internal/ackhandler" "github.com/quic-go/quic-go/internal/mocks" "github.com/quic-go/quic-go/internal/protocol" "github.com/quic-go/quic-go/internal/wire" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/onsi/gomega/gbytes" "go.uber.org/mock/gomock" ) var _ = Describe("Send Stream", func() { const streamID protocol.StreamID = 1337 var ( str *sendStream strWithTimeout io.Writer // str wrapped with gbytes.TimeoutWriter mockFC *mocks.MockStreamFlowController mockSender *MockStreamSender ) BeforeEach(func() { mockSender = NewMockStreamSender(mockCtrl) mockFC = mocks.NewMockStreamFlowController(mockCtrl) str = newSendStream(streamID, mockSender, mockFC) timeout := scaleDuration(250 * time.Millisecond) strWithTimeout = gbytes.TimeoutWriter(str, timeout) }) expectedFrameHeaderLen := func(offset protocol.ByteCount) protocol.ByteCount { return (&wire.StreamFrame{ StreamID: streamID, Offset: offset, DataLenPresent: true, }).Length(protocol.Version1) } waitForWrite := func() { EventuallyWithOffset(0, func() bool { str.mutex.Lock() hasData := str.dataForWriting != nil || str.nextFrame != nil str.mutex.Unlock() return hasData }).Should(BeTrue()) } getDataAtOffset := func(offset, length protocol.ByteCount) []byte { b := make([]byte, length) for i := protocol.ByteCount(0); i < length; i++ { b[i] = uint8(offset + i) } return b } getData := func(length protocol.ByteCount) []byte { return getDataAtOffset(0, length) } It("gets stream id", func() { Expect(str.StreamID()).To(Equal(protocol.StreamID(1337))) }) Context("writing", func() { It("writes and gets all data at once", func() { done := make(chan struct{}) go func() { defer GinkgoRecover() defer close(done) mockSender.EXPECT().onHasStreamData(streamID) n, err := strWithTimeout.Write([]byte("foobar")) Expect(err).ToNot(HaveOccurred()) Expect(n).To(Equal(6)) }() waitForWrite() mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount) mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6)) frame, ok, _ := str.popStreamFrame(protocol.MaxByteCount, protocol.Version1) Expect(ok).To(BeTrue()) f := frame.Frame Expect(f.Data).To(Equal([]byte("foobar"))) Expect(f.Fin).To(BeFalse()) Expect(f.Offset).To(BeZero()) Expect(f.DataLenPresent).To(BeTrue()) Expect(str.writeOffset).To(Equal(protocol.ByteCount(6))) Expect(str.dataForWriting).To(BeNil()) Eventually(done).Should(BeClosed()) }) It("writes and gets data in two turns", func() { done := make(chan struct{}) go func() { defer GinkgoRecover() mockSender.EXPECT().onHasStreamData(streamID) n, err := strWithTimeout.Write([]byte("foobar")) Expect(err).ToNot(HaveOccurred()) Expect(n).To(Equal(6)) close(done) }() waitForWrite() mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount).Times(2) mockFC.EXPECT().AddBytesSent(protocol.ByteCount(3)).Times(2) frame, ok, _ := str.popStreamFrame(expectedFrameHeaderLen(0)+3, protocol.Version1) Expect(ok).To(BeTrue()) f := frame.Frame Expect(f.Offset).To(BeZero()) Expect(f.Fin).To(BeFalse()) Expect(f.Data).To(Equal([]byte("foo"))) Expect(f.DataLenPresent).To(BeTrue()) frame, ok, _ = str.popStreamFrame(protocol.MaxByteCount, protocol.Version1) Expect(ok).To(BeTrue()) f = frame.Frame Expect(f.Data).To(Equal([]byte("bar"))) Expect(f.Fin).To(BeFalse()) Expect(f.Offset).To(Equal(protocol.ByteCount(3))) Expect(f.DataLenPresent).To(BeTrue()) _, ok, _ = str.popStreamFrame(1000, protocol.Version1) Expect(ok).To(BeFalse()) Eventually(done).Should(BeClosed()) }) It("bundles small writes", func() { done := make(chan struct{}) go func() { defer GinkgoRecover() mockSender.EXPECT().onHasStreamData(streamID).Times(2) n, err := strWithTimeout.Write([]byte("foo")) Expect(err).ToNot(HaveOccurred()) Expect(n).To(Equal(3)) n, err = strWithTimeout.Write([]byte("bar")) Expect(err).ToNot(HaveOccurred()) Expect(n).To(Equal(3)) close(done) }() Eventually(done).Should(BeClosed()) // both Write calls returned without any data having been dequeued yet mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount) mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6)) frame, ok, _ := str.popStreamFrame(protocol.MaxByteCount, protocol.Version1) Expect(ok).To(BeTrue()) f := frame.Frame Expect(f.Offset).To(BeZero()) Expect(f.Fin).To(BeFalse()) Expect(f.Data).To(Equal([]byte("foobar"))) }) It("writes and gets data in multiple turns, for large writes", func() { mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount).Times(5) var totalBytesSent protocol.ByteCount mockFC.EXPECT().AddBytesSent(gomock.Any()).Do(func(l protocol.ByteCount) { totalBytesSent += l }).Times(5) done := make(chan struct{}) go func() { defer GinkgoRecover() mockSender.EXPECT().onHasStreamData(streamID) n, err := strWithTimeout.Write(getData(5000)) Expect(err).ToNot(HaveOccurred()) Expect(n).To(Equal(5000)) close(done) }() waitForWrite() for i := 0; i < 5; i++ { frame, ok, _ := str.popStreamFrame(1100, protocol.Version1) Expect(ok).To(BeTrue()) f := frame.Frame Expect(f.Offset).To(BeNumerically("~", 1100*i, 10*i)) Expect(f.Fin).To(BeFalse()) Expect(f.Data).To(Equal(getDataAtOffset(f.Offset, f.DataLen()))) Expect(f.DataLenPresent).To(BeTrue()) } Expect(totalBytesSent).To(Equal(protocol.ByteCount(5000))) Eventually(done).Should(BeClosed()) }) It("unblocks Write as soon as a STREAM frame can be buffered", func() { done := make(chan struct{}) go func() { defer GinkgoRecover() defer close(done) mockSender.EXPECT().onHasStreamData(streamID) _, err := strWithTimeout.Write(getData(protocol.MaxPacketBufferSize + 3)) Expect(err).ToNot(HaveOccurred()) }() waitForWrite() mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount).Times(2) mockFC.EXPECT().AddBytesSent(protocol.ByteCount(2)) frame, ok, hasMoreData := str.popStreamFrame(expectedFrameHeaderLen(0)+2, protocol.Version1) Expect(ok).To(BeTrue()) Expect(hasMoreData).To(BeTrue()) f := frame.Frame Expect(f.DataLen()).To(Equal(protocol.ByteCount(2))) Consistently(done).ShouldNot(BeClosed()) mockFC.EXPECT().AddBytesSent(protocol.ByteCount(1)) frame, ok, hasMoreData = str.popStreamFrame(expectedFrameHeaderLen(1)+1, protocol.Version1) Expect(ok).To(BeTrue()) Expect(hasMoreData).To(BeTrue()) f = frame.Frame Expect(f.DataLen()).To(Equal(protocol.ByteCount(1))) Eventually(done).Should(BeClosed()) }) It("only unblocks Write once a previously buffered STREAM frame has been fully dequeued", func() { mockSender.EXPECT().onHasStreamData(streamID) _, err := strWithTimeout.Write([]byte("foobar")) Expect(err).ToNot(HaveOccurred()) done := make(chan struct{}) go func() { defer GinkgoRecover() defer close(done) mockSender.EXPECT().onHasStreamData(streamID) _, err := str.Write(getData(protocol.MaxPacketBufferSize)) Expect(err).ToNot(HaveOccurred()) }() waitForWrite() mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount).Times(2) mockFC.EXPECT().AddBytesSent(protocol.ByteCount(2)) frame, ok, hasMoreData := str.popStreamFrame(expectedFrameHeaderLen(0)+2, protocol.Version1) Expect(ok).To(BeTrue()) Expect(hasMoreData).To(BeTrue()) f := frame.Frame Expect(f.Data).To(Equal([]byte("fo"))) Consistently(done).ShouldNot(BeClosed()) mockFC.EXPECT().AddBytesSent(protocol.ByteCount(4)) frame, ok, hasMoreData = str.popStreamFrame(expectedFrameHeaderLen(2)+4, protocol.Version1) Expect(ok).To(BeTrue()) Expect(hasMoreData).To(BeTrue()) f = frame.Frame Expect(f.Data).To(Equal([]byte("obar"))) Eventually(done).Should(BeClosed()) }) It("popStreamFrame returns nil if no data is available", func() { _, ok, hasMoreData := str.popStreamFrame(1000, protocol.Version1) Expect(ok).To(BeFalse()) Expect(hasMoreData).To(BeFalse()) }) It("says if it has more data for writing", func() { done := make(chan struct{}) go func() { defer GinkgoRecover() defer close(done) mockSender.EXPECT().onHasStreamData(streamID) n, err := strWithTimeout.Write(bytes.Repeat([]byte{0}, 100)) Expect(err).ToNot(HaveOccurred()) Expect(n).To(Equal(100)) }() waitForWrite() mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount).Times(2) mockFC.EXPECT().AddBytesSent(gomock.Any()).Times(2) frame, ok, hasMoreData := str.popStreamFrame(50, protocol.Version1) Expect(ok).To(BeTrue()) Expect(frame.Frame.Fin).To(BeFalse()) Expect(hasMoreData).To(BeTrue()) frame, ok, hasMoreData = str.popStreamFrame(protocol.MaxByteCount, protocol.Version1) Expect(ok).To(BeTrue()) Expect(frame.Frame.Fin).To(BeFalse()) Expect(hasMoreData).To(BeFalse()) _, ok, _ = str.popStreamFrame(protocol.MaxByteCount, protocol.Version1) Expect(ok).To(BeFalse()) Eventually(done).Should(BeClosed()) }) It("copies the slice while writing", func() { frameHeaderSize := protocol.ByteCount(4) mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount).Times(2) mockFC.EXPECT().AddBytesSent(protocol.ByteCount(1)) mockFC.EXPECT().AddBytesSent(protocol.ByteCount(2)) s := []byte("foo") done := make(chan struct{}) go func() { defer GinkgoRecover() defer close(done) mockSender.EXPECT().onHasStreamData(streamID) n, err := strWithTimeout.Write(s) Expect(err).ToNot(HaveOccurred()) Expect(n).To(Equal(3)) }() waitForWrite() frame, ok, _ := str.popStreamFrame(frameHeaderSize+1, protocol.Version1) Expect(ok).To(BeTrue()) f := frame.Frame Expect(f.Data).To(Equal([]byte("f"))) frame, ok, _ = str.popStreamFrame(100, protocol.Version1) Expect(ok).To(BeTrue()) Expect(frame).ToNot(BeNil()) f = frame.Frame Expect(f.Data).To(Equal([]byte("oo"))) s[1] = 'e' Expect(f.Data).To(Equal([]byte("oo"))) Eventually(done).Should(BeClosed()) }) It("returns when given a nil input", func() { n, err := strWithTimeout.Write(nil) Expect(n).To(BeZero()) Expect(err).ToNot(HaveOccurred()) }) It("returns when given an empty slice", func() { n, err := strWithTimeout.Write([]byte("")) Expect(n).To(BeZero()) Expect(err).ToNot(HaveOccurred()) }) It("cancels the context when Close is called", func() { mockSender.EXPECT().onHasStreamData(streamID) Expect(str.Context().Done()).ToNot(BeClosed()) Expect(str.Close()).To(Succeed()) Expect(str.Context().Done()).To(BeClosed()) Expect(context.Cause(str.Context())).To(MatchError(context.Canceled)) }) Context("flow control blocking", func() { It("queues a BLOCKED frame if the stream is flow control blocked", func() { mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(0)) mockFC.EXPECT().IsNewlyBlocked().Return(true, protocol.ByteCount(12)) mockSender.EXPECT().queueControlFrame(&wire.StreamDataBlockedFrame{ StreamID: streamID, MaximumStreamData: 12, }) done := make(chan struct{}) go func() { defer GinkgoRecover() defer close(done) mockSender.EXPECT().onHasStreamData(streamID) _, err := str.Write([]byte("foobar")) Expect(err).ToNot(HaveOccurred()) }() waitForWrite() _, ok, hasMoreData := str.popStreamFrame(1000, protocol.Version1) Expect(ok).To(BeFalse()) Expect(hasMoreData).To(BeFalse()) // make the Write go routine return str.closeForShutdown(nil) Eventually(done).Should(BeClosed()) }) It("says that it doesn't have any more data, when it is flow control blocked", func() { done := make(chan struct{}) go func() { defer GinkgoRecover() defer close(done) mockSender.EXPECT().onHasStreamData(streamID) _, err := str.Write([]byte("foobar")) Expect(err).ToNot(HaveOccurred()) }() waitForWrite() // first pop a STREAM frame of the maximum size allowed by flow control mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(3)) mockFC.EXPECT().AddBytesSent(protocol.ByteCount(3)) f, ok, hasMoreData := str.popStreamFrame(expectedFrameHeaderLen(0)+3, protocol.Version1) Expect(ok).To(BeTrue()) Expect(f).ToNot(BeNil()) Expect(hasMoreData).To(BeTrue()) // try to pop again, this time noticing that we're blocked mockFC.EXPECT().SendWindowSize() // don't use offset 3 here, to make sure the BLOCKED frame contains the number returned by the flow controller mockFC.EXPECT().IsNewlyBlocked().Return(true, protocol.ByteCount(10)) mockSender.EXPECT().queueControlFrame(&wire.StreamDataBlockedFrame{ StreamID: streamID, MaximumStreamData: 10, }) _, ok, hasMoreData = str.popStreamFrame(1000, protocol.Version1) Expect(ok).To(BeFalse()) Expect(hasMoreData).To(BeFalse()) // make the Write go routine return str.closeForShutdown(nil) Eventually(done).Should(BeClosed()) }) }) Context("deadlines", func() { It("returns an error when Write is called after the deadline", func() { str.SetWriteDeadline(time.Now().Add(-time.Second)) n, err := strWithTimeout.Write([]byte("foobar")) Expect(err).To(MatchError(errDeadline)) Expect(n).To(BeZero()) }) It("unblocks after the deadline", func() { mockSender.EXPECT().onHasStreamData(streamID) deadline := time.Now().Add(scaleDuration(50 * time.Millisecond)) str.SetWriteDeadline(deadline) n, err := strWithTimeout.Write(getData(5000)) Expect(err).To(MatchError(errDeadline)) Expect(n).To(BeZero()) Expect(time.Now()).To(BeTemporally("~", deadline, scaleDuration(20*time.Millisecond))) }) It("unblocks when the deadline is changed to the past", func() { mockSender.EXPECT().onHasStreamData(streamID) str.SetWriteDeadline(time.Now().Add(time.Hour)) done := make(chan struct{}) go func() { defer GinkgoRecover() _, err := str.Write(getData(5000)) Expect(err).To(MatchError(errDeadline)) close(done) }() Consistently(done).ShouldNot(BeClosed()) str.SetWriteDeadline(time.Now().Add(-time.Hour)) Eventually(done).Should(BeClosed()) }) It("returns the number of bytes written, when the deadline expires", func() { mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount).AnyTimes() mockFC.EXPECT().AddBytesSent(gomock.Any()) deadline := time.Now().Add(scaleDuration(50 * time.Millisecond)) str.SetWriteDeadline(deadline) var n int writeReturned := make(chan struct{}) go func() { defer GinkgoRecover() defer close(writeReturned) mockSender.EXPECT().onHasStreamData(streamID) var err error n, err = strWithTimeout.Write(getData(5000)) Expect(err).To(MatchError(errDeadline)) Expect(time.Now()).To(BeTemporally("~", deadline, scaleDuration(20*time.Millisecond))) }() waitForWrite() frame, ok, hasMoreData := str.popStreamFrame(50, protocol.Version1) Expect(ok).To(BeTrue()) Expect(frame).ToNot(BeNil()) Expect(hasMoreData).To(BeTrue()) Eventually(writeReturned, scaleDuration(80*time.Millisecond)).Should(BeClosed()) Expect(n).To(BeEquivalentTo(frame.Frame.DataLen())) }) It("doesn't pop any data after the deadline expired", func() { mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount).AnyTimes() mockFC.EXPECT().AddBytesSent(gomock.Any()) deadline := time.Now().Add(scaleDuration(50 * time.Millisecond)) str.SetWriteDeadline(deadline) writeReturned := make(chan struct{}) go func() { defer GinkgoRecover() defer close(writeReturned) mockSender.EXPECT().onHasStreamData(streamID) _, err := strWithTimeout.Write(getData(5000)) Expect(err).To(MatchError(errDeadline)) }() waitForWrite() frame, ok, hasMoreData := str.popStreamFrame(50, protocol.Version1) Expect(ok).To(BeTrue()) Expect(frame).ToNot(BeNil()) Expect(hasMoreData).To(BeTrue()) Eventually(writeReturned, scaleDuration(80*time.Millisecond)).Should(BeClosed()) _, ok, hasMoreData = str.popStreamFrame(50, protocol.Version1) Expect(ok).To(BeFalse()) Expect(hasMoreData).To(BeFalse()) }) It("doesn't unblock if the deadline is changed before the first one expires", func() { mockSender.EXPECT().onHasStreamData(streamID) deadline1 := time.Now().Add(scaleDuration(50 * time.Millisecond)) deadline2 := time.Now().Add(scaleDuration(100 * time.Millisecond)) str.SetWriteDeadline(deadline1) done := make(chan struct{}) go func() { defer GinkgoRecover() time.Sleep(scaleDuration(20 * time.Millisecond)) str.SetWriteDeadline(deadline2) // make sure that this was actually execute before the deadline expires Expect(time.Now()).To(BeTemporally("<", deadline1)) close(done) }() runtime.Gosched() n, err := strWithTimeout.Write(getData(5000)) Expect(err).To(MatchError(errDeadline)) Expect(n).To(BeZero()) Expect(time.Now()).To(BeTemporally("~", deadline2, scaleDuration(20*time.Millisecond))) Eventually(done).Should(BeClosed()) }) It("unblocks earlier, when a new deadline is set", func() { mockSender.EXPECT().onHasStreamData(streamID) deadline1 := time.Now().Add(scaleDuration(200 * time.Millisecond)) deadline2 := time.Now().Add(scaleDuration(50 * time.Millisecond)) done := make(chan struct{}) go func() { defer GinkgoRecover() time.Sleep(scaleDuration(10 * time.Millisecond)) str.SetWriteDeadline(deadline2) // make sure that this was actually execute before the deadline expires Expect(time.Now()).To(BeTemporally("<", deadline2)) close(done) }() str.SetWriteDeadline(deadline1) runtime.Gosched() _, err := strWithTimeout.Write(getData(5000)) Expect(err).To(MatchError(errDeadline)) Expect(time.Now()).To(BeTemporally("~", deadline2, scaleDuration(20*time.Millisecond))) Eventually(done).Should(BeClosed()) }) It("doesn't unblock if the deadline is removed", func() { mockSender.EXPECT().onHasStreamData(streamID) deadline := time.Now().Add(scaleDuration(50 * time.Millisecond)) str.SetWriteDeadline(deadline) deadlineUnset := make(chan struct{}) go func() { defer GinkgoRecover() time.Sleep(scaleDuration(20 * time.Millisecond)) str.SetWriteDeadline(time.Time{}) // make sure that this was actually execute before the deadline expires Expect(time.Now()).To(BeTemporally("<", deadline)) close(deadlineUnset) }() done := make(chan struct{}) go func() { defer GinkgoRecover() _, err := strWithTimeout.Write(getData(5000)) Expect(err).To(MatchError("test done")) close(done) }() runtime.Gosched() Eventually(deadlineUnset).Should(BeClosed()) Consistently(done, scaleDuration(100*time.Millisecond)).ShouldNot(BeClosed()) // make the go routine return str.closeForShutdown(errors.New("test done")) Eventually(done).Should(BeClosed()) }) }) Context("closing", func() { It("doesn't allow writes after it has been closed", func() { mockSender.EXPECT().onHasStreamData(streamID) str.Close() _, err := strWithTimeout.Write([]byte("foobar")) Expect(err).To(MatchError("write on closed stream 1337")) }) It("allows FIN", func() { mockSender.EXPECT().onHasStreamData(streamID) str.Close() frame, ok, hasMoreData := str.popStreamFrame(1000, protocol.Version1) Expect(ok).To(BeTrue()) Expect(frame).ToNot(BeNil()) f := frame.Frame Expect(f.Data).To(BeEmpty()) Expect(f.Fin).To(BeTrue()) Expect(f.DataLenPresent).To(BeTrue()) Expect(hasMoreData).To(BeFalse()) }) It("doesn't send a FIN when there's still data", func() { const frameHeaderLen protocol.ByteCount = 4 mockSender.EXPECT().onHasStreamData(streamID).Times(2) _, err := strWithTimeout.Write([]byte("foobar")) Expect(err).ToNot(HaveOccurred()) Expect(str.Close()).To(Succeed()) mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount).Times(2) mockFC.EXPECT().AddBytesSent(gomock.Any()).Times(2) frame, ok, _ := str.popStreamFrame(3+frameHeaderLen, protocol.Version1) Expect(ok).To(BeTrue()) Expect(frame).ToNot(BeNil()) f := frame.Frame Expect(f.Data).To(Equal([]byte("foo"))) Expect(f.Fin).To(BeFalse()) frame, ok, _ = str.popStreamFrame(protocol.MaxByteCount, protocol.Version1) Expect(ok).To(BeTrue()) f = frame.Frame Expect(f.Data).To(Equal([]byte("bar"))) Expect(f.Fin).To(BeTrue()) }) It("doesn't send a FIN when there's still data, for long writes", func() { done := make(chan struct{}) go func() { defer GinkgoRecover() defer close(done) mockSender.EXPECT().onHasStreamData(streamID) _, err := strWithTimeout.Write(getData(5000)) Expect(err).ToNot(HaveOccurred()) mockSender.EXPECT().onHasStreamData(streamID) Expect(str.Close()).To(Succeed()) }() waitForWrite() for i := 1; i <= 5; i++ { mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount) mockFC.EXPECT().AddBytesSent(gomock.Any()) if i == 5 { Eventually(done).Should(BeClosed()) } frame, ok, _ := str.popStreamFrame(1100, protocol.Version1) Expect(ok).To(BeTrue()) Expect(frame).ToNot(BeNil()) f := frame.Frame Expect(f.Data).To(Equal(getDataAtOffset(f.Offset, f.DataLen()))) Expect(f.Fin).To(Equal(i == 5)) // the last frame should have the FIN bit set } }) It("doesn't allow FIN after it is closed for shutdown", func() { str.closeForShutdown(errors.New("test")) _, ok, hasMoreData := str.popStreamFrame(1000, protocol.Version1) Expect(ok).To(BeFalse()) Expect(hasMoreData).To(BeFalse()) Expect(str.Close()).To(Succeed()) _, ok, hasMoreData = str.popStreamFrame(1000, protocol.Version1) Expect(ok).To(BeFalse()) Expect(hasMoreData).To(BeFalse()) }) It("doesn't allow FIN twice", func() { mockSender.EXPECT().onHasStreamData(streamID) str.Close() frame, ok, _ := str.popStreamFrame(1000, protocol.Version1) Expect(ok).To(BeTrue()) Expect(frame).ToNot(BeNil()) f := frame.Frame Expect(f.Data).To(BeEmpty()) Expect(f.Fin).To(BeTrue()) _, ok, hasMoreData := str.popStreamFrame(1000, protocol.Version1) Expect(ok).To(BeFalse()) Expect(hasMoreData).To(BeFalse()) }) }) Context("closing for shutdown", func() { testErr := errors.New("test") It("returns errors when the stream is cancelled", func() { str.closeForShutdown(testErr) n, err := strWithTimeout.Write([]byte("foo")) Expect(n).To(BeZero()) Expect(err).To(MatchError(testErr)) }) It("doesn't get data for writing if an error occurred", func() { mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount) mockFC.EXPECT().AddBytesSent(gomock.Any()) mockSender.EXPECT().onHasStreamData(streamID) done := make(chan struct{}) go func() { defer GinkgoRecover() _, err := strWithTimeout.Write(getData(5000)) Expect(err).To(MatchError(testErr)) close(done) }() waitForWrite() frame, ok, hasMoreData := str.popStreamFrame(50, protocol.Version1) // get a STREAM frame containing some data, but not all Expect(ok).To(BeTrue()) Expect(frame).ToNot(BeNil()) Expect(hasMoreData).To(BeTrue()) str.closeForShutdown(testErr) _, ok, hasMoreData = str.popStreamFrame(1000, protocol.Version1) Expect(ok).To(BeFalse()) Expect(hasMoreData).To(BeFalse()) Eventually(done).Should(BeClosed()) }) It("cancels the context", func() { Expect(str.Context().Done()).ToNot(BeClosed()) str.closeForShutdown(testErr) Expect(str.Context().Done()).To(BeClosed()) Expect(context.Cause(str.Context())).To(MatchError(testErr)) }) }) }) Context("handling MAX_STREAM_DATA frames", func() { It("informs the flow controller", func() { mockFC.EXPECT().UpdateSendWindow(protocol.ByteCount(0x1337)) str.updateSendWindow(0x1337) }) It("says when it has data for sending", func() { mockFC.EXPECT().UpdateSendWindow(gomock.Any()) mockSender.EXPECT().onHasStreamData(streamID) done := make(chan struct{}) go func() { defer GinkgoRecover() _, err := str.Write([]byte("foobar")) Expect(err).ToNot(HaveOccurred()) close(done) }() waitForWrite() mockSender.EXPECT().onHasStreamData(streamID) str.updateSendWindow(42) // make sure the Write go routine returns str.closeForShutdown(nil) Eventually(done).Should(BeClosed()) }) }) Context("stream cancellations", func() { Context("canceling writing", func() { It("queues a RESET_STREAM frame", func() { gomock.InOrder( mockSender.EXPECT().queueControlFrame(&wire.ResetStreamFrame{ StreamID: streamID, FinalSize: 1234, ErrorCode: 9876, }), mockSender.EXPECT().onStreamCompleted(streamID), ) str.writeOffset = 1234 str.CancelWrite(9876) }) // This test is inherently racy, as it tests a concurrent call to Write() and CancelRead(). // A single successful run of this test therefore doesn't mean a lot, // for reliable results it has to be run many times. It("returns a nil error when the whole slice has been sent out", func() { mockSender.EXPECT().queueControlFrame(gomock.Any()).MaxTimes(1) mockSender.EXPECT().onHasStreamData(streamID).MaxTimes(1) mockSender.EXPECT().onStreamCompleted(streamID).MaxTimes(1) mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount).MaxTimes(1) mockFC.EXPECT().AddBytesSent(gomock.Any()).MaxTimes(1) errChan := make(chan error) go func() { defer GinkgoRecover() n, err := strWithTimeout.Write(getData(100)) if n == 0 { errChan <- nil return } errChan <- err }() runtime.Gosched() go str.popStreamFrame(protocol.MaxByteCount, protocol.Version1) go str.CancelWrite(1234) Eventually(errChan).Should(Receive(Not(HaveOccurred()))) }) It("unblocks Write", func() { mockSender.EXPECT().queueControlFrame(gomock.Any()) mockSender.EXPECT().onHasStreamData(streamID) mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount) mockFC.EXPECT().AddBytesSent(gomock.Any()) writeReturned := make(chan struct{}) var n int go func() { defer GinkgoRecover() var err error n, err = strWithTimeout.Write(getData(5000)) Expect(err).To(Equal(&StreamError{ StreamID: streamID, ErrorCode: 1234, Remote: false, })) close(writeReturned) }() waitForWrite() frame, ok, _ := str.popStreamFrame(50, protocol.Version1) Expect(ok).To(BeTrue()) Expect(frame).ToNot(BeNil()) mockSender.EXPECT().onStreamCompleted(streamID) str.CancelWrite(1234) Eventually(writeReturned).Should(BeClosed()) Expect(n).To(BeEquivalentTo(frame.Frame.DataLen())) }) It("doesn't pop STREAM frames after being canceled", func() { mockSender.EXPECT().queueControlFrame(gomock.Any()) mockSender.EXPECT().onHasStreamData(streamID) mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount) mockFC.EXPECT().AddBytesSent(gomock.Any()) writeReturned := make(chan struct{}) go func() { defer GinkgoRecover() strWithTimeout.Write(getData(100)) close(writeReturned) }() waitForWrite() frame, ok, hasMoreData := str.popStreamFrame(50, protocol.Version1) Expect(ok).To(BeTrue()) Expect(hasMoreData).To(BeTrue()) Expect(frame).ToNot(BeNil()) mockSender.EXPECT().onStreamCompleted(streamID) str.CancelWrite(1234) _, ok, hasMoreData = str.popStreamFrame(10, protocol.Version1) Expect(ok).To(BeFalse()) Expect(hasMoreData).To(BeFalse()) Eventually(writeReturned).Should(BeClosed()) }) It("doesn't pop STREAM frames after being canceled, for large writes", func() { mockSender.EXPECT().queueControlFrame(gomock.Any()) mockSender.EXPECT().onHasStreamData(streamID) mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount) mockFC.EXPECT().AddBytesSent(gomock.Any()) writeReturned := make(chan struct{}) go func() { defer GinkgoRecover() _, err := strWithTimeout.Write(getData(5000)) Expect(err).To(Equal(&StreamError{ StreamID: streamID, ErrorCode: 1234, Remote: false, })) close(writeReturned) }() waitForWrite() frame, ok, hasMoreData := str.popStreamFrame(50, protocol.Version1) Expect(ok).To(BeTrue()) Expect(hasMoreData).To(BeTrue()) Expect(frame).ToNot(BeNil()) mockSender.EXPECT().onStreamCompleted(streamID) str.CancelWrite(1234) _, ok, hasMoreData = str.popStreamFrame(10, protocol.Version1) Expect(ok).To(BeFalse()) Expect(hasMoreData).To(BeFalse()) Eventually(writeReturned).Should(BeClosed()) }) It("ignores acknowledgements for STREAM frames after it was cancelled", func() { mockSender.EXPECT().queueControlFrame(gomock.Any()) mockSender.EXPECT().onHasStreamData(streamID) mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount) mockFC.EXPECT().AddBytesSent(gomock.Any()) writeReturned := make(chan struct{}) go func() { defer GinkgoRecover() strWithTimeout.Write(getData(100)) close(writeReturned) }() waitForWrite() frame, ok, hasMoreData := str.popStreamFrame(50, protocol.Version1) Expect(ok).To(BeTrue()) Expect(hasMoreData).To(BeTrue()) Expect(frame).ToNot(BeNil()) mockSender.EXPECT().onStreamCompleted(streamID) str.CancelWrite(1234) frame.Handler.OnAcked(frame.Frame) }) It("cancels the context", func() { mockSender.EXPECT().queueControlFrame(gomock.Any()) mockSender.EXPECT().onStreamCompleted(gomock.Any()) Expect(str.Context().Done()).ToNot(BeClosed()) str.CancelWrite(1234) Expect(str.Context().Done()).To(BeClosed()) Expect(context.Cause(str.Context())).To(BeAssignableToTypeOf(&StreamError{})) Expect(context.Cause(str.Context()).(*StreamError).ErrorCode).To(Equal(StreamErrorCode(1234))) }) It("doesn't allow further calls to Write", func() { mockSender.EXPECT().queueControlFrame(gomock.Any()) mockSender.EXPECT().onStreamCompleted(gomock.Any()) str.CancelWrite(1234) _, err := strWithTimeout.Write([]byte("foobar")) Expect(err).To(MatchError(&StreamError{ StreamID: streamID, ErrorCode: 1234, Remote: false, })) }) It("only cancels once", func() { mockSender.EXPECT().queueControlFrame(&wire.ResetStreamFrame{StreamID: streamID, ErrorCode: 1234}) mockSender.EXPECT().onStreamCompleted(gomock.Any()) str.CancelWrite(1234) str.CancelWrite(4321) }) It("queues a RESET_STREAM frame, even if the stream was already closed", func() { mockSender.EXPECT().onHasStreamData(streamID) mockSender.EXPECT().queueControlFrame(gomock.Any()).Do(func(f wire.Frame) { Expect(f).To(BeAssignableToTypeOf(&wire.ResetStreamFrame{})) }) mockSender.EXPECT().onStreamCompleted(gomock.Any()) Expect(str.Close()).To(Succeed()) // don't EXPECT any calls to queueControlFrame str.CancelWrite(123) }) }) Context("receiving STOP_SENDING frames", func() { It("queues a RESET_STREAM frames, and copies the error code from the STOP_SENDING frame", func() { mockSender.EXPECT().queueControlFrame(&wire.ResetStreamFrame{ StreamID: streamID, ErrorCode: 101, }) mockSender.EXPECT().onStreamCompleted(gomock.Any()) str.handleStopSendingFrame(&wire.StopSendingFrame{ StreamID: streamID, ErrorCode: 101, }) }) It("unblocks Write", func() { mockSender.EXPECT().onHasStreamData(streamID) mockSender.EXPECT().queueControlFrame(gomock.Any()) mockSender.EXPECT().onStreamCompleted(gomock.Any()) done := make(chan struct{}) go func() { defer GinkgoRecover() _, err := str.Write(getData(5000)) Expect(err).To(Equal(&StreamError{ StreamID: streamID, ErrorCode: 123, Remote: true, })) close(done) }() waitForWrite() str.handleStopSendingFrame(&wire.StopSendingFrame{ StreamID: streamID, ErrorCode: 123, }) Eventually(done).Should(BeClosed()) }) It("doesn't allow further calls to Write", func() { mockSender.EXPECT().queueControlFrame(gomock.Any()) mockSender.EXPECT().onStreamCompleted(gomock.Any()) str.handleStopSendingFrame(&wire.StopSendingFrame{ StreamID: streamID, ErrorCode: 123, }) _, err := str.Write([]byte("foobar")) Expect(err).To(Equal(&StreamError{ StreamID: streamID, ErrorCode: 123, Remote: true, })) }) }) }) Context("retransmissions", func() { It("queues and retrieves frames", func() { str.numOutstandingFrames = 1 f := &wire.StreamFrame{ Data: []byte("foobar"), Offset: 0x42, DataLenPresent: false, } mockSender.EXPECT().onHasStreamData(streamID) (*sendStreamAckHandler)(str).OnLost(f) frame, ok, _ := str.popStreamFrame(protocol.MaxByteCount, protocol.Version1) Expect(ok).To(BeTrue()) Expect(frame).ToNot(BeNil()) f = frame.Frame Expect(f.Offset).To(Equal(protocol.ByteCount(0x42))) Expect(f.Data).To(Equal([]byte("foobar"))) Expect(f.DataLenPresent).To(BeTrue()) }) It("splits a retransmission", func() { str.numOutstandingFrames = 1 sf := &wire.StreamFrame{ Data: []byte("foobar"), Offset: 0x42, DataLenPresent: false, } mockSender.EXPECT().onHasStreamData(streamID) (*sendStreamAckHandler)(str).OnLost(sf) frame, ok, hasMoreData := str.popStreamFrame(sf.Length(protocol.Version1)-3, protocol.Version1) Expect(ok).To(BeTrue()) Expect(frame).ToNot(BeNil()) f := frame.Frame Expect(hasMoreData).To(BeTrue()) Expect(f.Offset).To(Equal(protocol.ByteCount(0x42))) Expect(f.Data).To(Equal([]byte("foo"))) Expect(f.DataLenPresent).To(BeTrue()) frame, ok, _ = str.popStreamFrame(protocol.MaxByteCount, protocol.Version1) Expect(ok).To(BeTrue()) Expect(frame).ToNot(BeNil()) f = frame.Frame Expect(f.Offset).To(Equal(protocol.ByteCount(0x45))) Expect(f.Data).To(Equal([]byte("bar"))) Expect(f.DataLenPresent).To(BeTrue()) }) It("returns nil if the size is too small", func() { str.numOutstandingFrames = 1 f := &wire.StreamFrame{ Data: []byte("foobar"), Offset: 0x42, DataLenPresent: false, } mockSender.EXPECT().onHasStreamData(streamID) (*sendStreamAckHandler)(str).OnLost(f) _, ok, hasMoreData := str.popStreamFrame(2, protocol.Version1) Expect(ok).To(BeFalse()) Expect(hasMoreData).To(BeTrue()) }) It("queues lost STREAM frames", func() { mockSender.EXPECT().onHasStreamData(streamID) mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)) mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6)) done := make(chan struct{}) go func() { defer GinkgoRecover() _, err := strWithTimeout.Write([]byte("foobar")) Expect(err).ToNot(HaveOccurred()) close(done) }() waitForWrite() frame, ok, _ := str.popStreamFrame(protocol.MaxByteCount, protocol.Version1) Expect(ok).To(BeTrue()) Eventually(done).Should(BeClosed()) Expect(frame).ToNot(BeNil()) Expect(frame.Frame.Data).To(Equal([]byte("foobar"))) // now lose the frame mockSender.EXPECT().onHasStreamData(streamID) frame.Handler.OnLost(frame.Frame) newFrame, ok, _ := str.popStreamFrame(protocol.MaxByteCount, protocol.Version1) Expect(ok).To(BeTrue()) Expect(newFrame).ToNot(BeNil()) Expect(newFrame.Frame.Data).To(Equal([]byte("foobar"))) }) It("doesn't queue retransmissions for a stream that was canceled", func() { mockSender.EXPECT().onHasStreamData(streamID) mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount) mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6)) done := make(chan struct{}) go func() { defer GinkgoRecover() _, err := str.Write([]byte("foobar")) Expect(err).ToNot(HaveOccurred()) close(done) }() waitForWrite() f, ok, _ := str.popStreamFrame(100, protocol.Version1) Expect(ok).To(BeTrue()) Eventually(done).Should(BeClosed()) Expect(f).ToNot(BeNil()) gomock.InOrder( mockSender.EXPECT().queueControlFrame(gomock.Any()), mockSender.EXPECT().onStreamCompleted(streamID), ) str.CancelWrite(9876) // don't EXPECT any calls to onHasStreamData f.Handler.OnLost(f.Frame) Expect(str.retransmissionQueue).To(BeEmpty()) }) }) Context("determining when a stream is completed", func() { BeforeEach(func() { mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount).AnyTimes() mockFC.EXPECT().AddBytesSent(gomock.Any()).AnyTimes() }) It("says when a stream is completed", func() { mockSender.EXPECT().onHasStreamData(streamID) done := make(chan struct{}) go func() { defer GinkgoRecover() _, err := strWithTimeout.Write(make([]byte, 100)) Expect(err).ToNot(HaveOccurred()) close(done) }() waitForWrite() // get a bunch of small frames (max. 20 bytes) var frames []ackhandler.StreamFrame for { frame, ok, hasMoreData := str.popStreamFrame(20, protocol.Version1) if !ok { continue } frames = append(frames, frame) if !hasMoreData { break } } Eventually(done).Should(BeClosed()) // Acknowledge all frames. // We don't expect the stream to be completed, since we still need to send the FIN. for _, f := range frames { f.Handler.OnAcked(f.Frame) } // Now close the stream and acknowledge the FIN. mockSender.EXPECT().onHasStreamData(streamID) Expect(str.Close()).To(Succeed()) frame, ok, _ := str.popStreamFrame(protocol.MaxByteCount, protocol.Version1) Expect(ok).To(BeTrue()) Expect(frame).ToNot(BeNil()) mockSender.EXPECT().onStreamCompleted(streamID) frame.Handler.OnAcked(frame.Frame) }) It("says when a stream is completed, if Close() is called before popping the frame", func() { mockSender.EXPECT().onHasStreamData(streamID).Times(2) done := make(chan struct{}) go func() { defer GinkgoRecover() _, err := strWithTimeout.Write(make([]byte, 100)) Expect(err).ToNot(HaveOccurred()) close(done) }() waitForWrite() Eventually(done).Should(BeClosed()) Expect(str.Close()).To(Succeed()) frame, ok, hasMoreData := str.popStreamFrame(protocol.MaxByteCount, protocol.Version1) Expect(ok).To(BeTrue()) Expect(hasMoreData).To(BeFalse()) Expect(frame).ToNot(BeNil()) Expect(frame.Frame.Fin).To(BeTrue()) mockSender.EXPECT().onStreamCompleted(streamID) frame.Handler.OnAcked(frame.Frame) }) It("doesn't say it's completed when there are frames waiting to be retransmitted", func() { mockSender.EXPECT().onHasStreamData(streamID) done := make(chan struct{}) go func() { defer GinkgoRecover() _, err := strWithTimeout.Write(getData(100)) Expect(err).ToNot(HaveOccurred()) mockSender.EXPECT().onHasStreamData(streamID) Expect(str.Close()).To(Succeed()) close(done) }() waitForWrite() // get a bunch of small frames (max. 20 bytes) var frames []ackhandler.StreamFrame for { frame, ok, _ := str.popStreamFrame(20, protocol.Version1) if !ok { continue } frames = append(frames, frame) if frame.Frame.Fin { break } } Eventually(done).Should(BeClosed()) // lose the first frame, acknowledge all others for _, f := range frames[1:] { f.Handler.OnAcked(f.Frame) } mockSender.EXPECT().onHasStreamData(streamID) frames[0].Handler.OnLost(frames[0].Frame) // get the retransmission and acknowledge it ret, ok, _ := str.popStreamFrame(protocol.MaxByteCount, protocol.Version1) Expect(ok).To(BeTrue()) Expect(ret).ToNot(BeNil()) mockSender.EXPECT().onStreamCompleted(streamID) ret.Handler.OnAcked(ret.Frame) }) // This test is kind of an integration test. // It writes 4 MB of data, and pops STREAM frames that sometimes are and sometimes aren't limited by flow control. // Half of these STREAM frames are then received and their content saved, while the other half is reported lost // and has to be retransmitted. It("retransmits data until everything has been acknowledged", func() { const dataLen = 1 << 22 // 4 MB mockSender.EXPECT().onHasStreamData(streamID).AnyTimes() mockFC.EXPECT().SendWindowSize().DoAndReturn(func() protocol.ByteCount { return protocol.ByteCount(mrand.Intn(500)) + 50 }).AnyTimes() mockFC.EXPECT().AddBytesSent(gomock.Any()).AnyTimes() data := make([]byte, dataLen) _, err := rand.Read(data) Expect(err).ToNot(HaveOccurred()) done := make(chan struct{}) go func() { defer GinkgoRecover() defer close(done) _, err := str.Write(data) Expect(err).ToNot(HaveOccurred()) str.Close() }() var completed bool mockSender.EXPECT().onStreamCompleted(streamID).Do(func(protocol.StreamID) { completed = true }) received := make([]byte, dataLen) for { if completed { break } f, ok, _ := str.popStreamFrame(protocol.ByteCount(mrand.Intn(300)+100), protocol.Version1) if !ok { continue } sf := f.Frame // 50%: acknowledge the frame and save the data // 50%: lose the frame if mrand.Intn(100) < 50 { copy(received[sf.Offset:sf.Offset+sf.DataLen()], sf.Data) f.Handler.OnAcked(f.Frame) } else { f.Handler.OnLost(f.Frame) } } Expect(received).To(Equal(data)) }) }) })