pass a callback containing the callbacks to the stream

This commit is contained in:
Marten Seemann 2017-12-18 18:37:37 +07:00
parent 44cff87e53
commit fc8fafd15e
12 changed files with 218 additions and 165 deletions

View file

@ -6,6 +6,7 @@ import (
"runtime"
"time"
"github.com/golang/mock/gomock"
"github.com/lucas-clemente/quic-go/internal/mocks"
"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/wire"
@ -19,21 +20,16 @@ var _ = Describe("Receive Stream", func() {
const streamID protocol.StreamID = 1337
var (
str *receiveStream
strWithTimeout io.Reader // str wrapped with gbytes.TimeoutReader
onDataCalled bool
queuedControlFrames []wire.Frame
mockFC *mocks.MockStreamFlowController
str *receiveStream
strWithTimeout io.Reader // str wrapped with gbytes.TimeoutReader
mockFC *mocks.MockStreamFlowController
mockSender *MockStreamSender
)
onData := func() { onDataCalled = true }
queueControlFrame := func(f wire.Frame) { queuedControlFrames = append(queuedControlFrames, f) }
BeforeEach(func() {
queuedControlFrames = queuedControlFrames[:0]
onDataCalled = false
mockSender = NewMockStreamSender(mockCtrl)
mockFC = mocks.NewMockStreamFlowController(mockCtrl)
str = newReceiveStream(streamID, onData, queueControlFrame, mockFC)
str = newReceiveStream(streamID, mockSender, mockFC)
timeout := scaleDuration(250 * time.Millisecond)
strWithTimeout = gbytes.TimeoutReader(str, timeout)
@ -45,6 +41,7 @@ var _ = Describe("Receive Stream", func() {
Context("reading", func() {
It("reads a single STREAM frame", func() {
mockSender.EXPECT().scheduleSending()
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(4))
frame := wire.StreamFrame{
@ -61,6 +58,7 @@ var _ = Describe("Receive Stream", func() {
})
It("reads a single STREAM frame in multiple goes", func() {
mockSender.EXPECT().scheduleSending().Times(2)
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2))
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2))
@ -82,6 +80,7 @@ var _ = Describe("Receive Stream", func() {
})
It("reads all data available", func() {
mockSender.EXPECT().scheduleSending().Times(2)
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2)
@ -104,7 +103,8 @@ var _ = Describe("Receive Stream", func() {
Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF, 0x00, 0x00}))
})
It("assembles multiple StreamFrames", func() {
It("assembles multiple STREAM frames", func() {
mockSender.EXPECT().scheduleSending().Times(2)
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2)
@ -128,6 +128,7 @@ var _ = Describe("Receive Stream", func() {
})
It("waits until data is available", func() {
mockSender.EXPECT().scheduleSending()
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2))
go func() {
@ -144,6 +145,7 @@ var _ = Describe("Receive Stream", func() {
})
It("handles STREAM frames in wrong order", func() {
mockSender.EXPECT().scheduleSending().Times(2)
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2)
@ -167,6 +169,7 @@ var _ = Describe("Receive Stream", func() {
})
It("ignores duplicate STREAM frames", func() {
mockSender.EXPECT().scheduleSending().Times(2)
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
@ -197,6 +200,7 @@ var _ = Describe("Receive Stream", func() {
})
It("doesn't rejects a STREAM frames with an overlapping data range", func() {
mockSender.EXPECT().scheduleSending().Times(2)
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(6), false)
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2))
@ -220,21 +224,6 @@ var _ = Describe("Receive Stream", func() {
Expect(b).To(Equal([]byte("foobar")))
})
It("calls onData", func() {
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(4))
frame := wire.StreamFrame{
Offset: 0,
Data: []byte{0xDE, 0xAD, 0xBE, 0xEF},
}
err := str.handleStreamFrame(&frame)
Expect(err).ToNot(HaveOccurred())
b := make([]byte, 4)
_, err = strWithTimeout.Read(b)
Expect(err).ToNot(HaveOccurred())
Expect(onDataCalled).To(BeTrue())
})
It("passes on errors from the streamFrameSorter", func() {
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(0), false)
err := str.handleStreamFrame(&wire.StreamFrame{StreamID: streamID}) // STREAM frame without data
@ -311,6 +300,7 @@ var _ = Describe("Receive Stream", func() {
Context("closing", func() {
Context("with FIN bit", func() {
It("returns EOFs", func() {
mockSender.EXPECT().scheduleSending()
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), true)
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(4))
frame := wire.StreamFrame{
@ -330,6 +320,7 @@ var _ = Describe("Receive Stream", func() {
})
It("handles out-of-order frames", func() {
mockSender.EXPECT().scheduleSending().Times(2)
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), true)
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2)
@ -357,6 +348,7 @@ var _ = Describe("Receive Stream", func() {
})
It("returns EOFs with partial read", func() {
mockSender.EXPECT().scheduleSending()
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), true)
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2))
frame := wire.StreamFrame{
@ -374,6 +366,7 @@ var _ = Describe("Receive Stream", func() {
})
It("handles immediate FINs", func() {
mockSender.EXPECT().scheduleSending()
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(0), true)
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(0))
frame := wire.StreamFrame{
@ -391,6 +384,7 @@ var _ = Describe("Receive Stream", func() {
})
It("closes when CloseRemote is called", func() {
mockSender.EXPECT().scheduleSending()
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(0), true)
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(0))
str.CloseRemote(0)
@ -432,6 +426,7 @@ var _ = Describe("Receive Stream", func() {
Context("stream cancelations", func() {
Context("canceling read", func() {
It("unblocks Read", func() {
mockSender.EXPECT().queueControlFrame(gomock.Any())
done := make(chan struct{})
go func() {
defer GinkgoRecover()
@ -446,6 +441,7 @@ var _ = Describe("Receive Stream", func() {
})
It("doesn't allow further calls to Read", func() {
mockSender.EXPECT().queueControlFrame(gomock.Any())
err := str.CancelRead(1234)
Expect(err).ToNot(HaveOccurred())
_, err = strWithTimeout.Read([]byte{0})
@ -453,6 +449,7 @@ var _ = Describe("Receive Stream", func() {
})
It("does nothing when CancelRead is called twice", func() {
mockSender.EXPECT().queueControlFrame(gomock.Any())
err := str.CancelRead(1234)
Expect(err).ToNot(HaveOccurred())
err = str.CancelRead(2345)
@ -462,8 +459,10 @@ var _ = Describe("Receive Stream", func() {
})
It("doesn't send a RST_STREAM frame, if the FIN was already read", func() {
mockSender.EXPECT().scheduleSending()
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(6), true)
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(6))
// no calls to mockSender.queueControlFrame
err := str.handleStreamFrame(&wire.StreamFrame{
StreamID: streamID,
Data: []byte("foobar"),
@ -474,20 +473,19 @@ var _ = Describe("Receive Stream", func() {
Expect(err).To(MatchError(io.EOF))
err = str.CancelRead(1234)
Expect(err).ToNot(HaveOccurred())
Expect(queuedControlFrames).To(BeEmpty()) // no RST_STREAM frame queued yet
})
Context("for IETF QUIC", func() {
It("queues a STOP_SENDING frame", func() {
err := str.CancelRead(1234)
Expect(err).ToNot(HaveOccurred())
Expect(queuedControlFrames).To(Equal([]wire.Frame{
&wire.StopSendingFrame{
StreamID: streamID,
ErrorCode: 1234,
},
}))
It("queues a STOP_SENDING frame, for IETF QUIC", func() {
mockSender.EXPECT().queueControlFrame(&wire.StopSendingFrame{
StreamID: streamID,
ErrorCode: 1234,
})
err := str.CancelRead(1234)
Expect(err).ToNot(HaveOccurred())
})
It("doesn't queue a STOP_SENDING frame, for gQUIC", func() {
})
})
@ -570,7 +568,8 @@ var _ = Describe("Receive Stream", func() {
Eventually(readReturned).Should(BeClosed())
})
It("sends a RST_STREAM and continues reading until the end when receiving a RST_STREAM frame with error code 0", func() {
It("continues reading until the end when receiving a RST_STREAM frame with error code 0", func() {
mockSender.EXPECT().scheduleSending().Times(2)
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(6), true).Times(2)
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(4))
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2))
@ -600,7 +599,6 @@ var _ = Describe("Receive Stream", func() {
Expect(err).ToNot(HaveOccurred())
Eventually(readReturned).Should(BeClosed())
})
})
})
})
@ -639,6 +637,7 @@ var _ = Describe("Receive Stream", func() {
})
It("is finished if it is only closed for reading", func() {
mockSender.EXPECT().scheduleSending()
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(0))
finishReading()
Expect(str.finished()).To(BeTrue())
@ -647,6 +646,7 @@ var _ = Describe("Receive Stream", func() {
// the stream still needs to stay alive until we receive the final offset
// (either by receiving a STREAM frame with FIN, or a RST_STREAM)
It("is not finished after CancelRead", func() {
mockSender.EXPECT().queueControlFrame(gomock.Any())
err := str.CancelRead(123)
Expect(err).ToNot(HaveOccurred())
Expect(str.finished()).To(BeFalse())