use the new gbytes.TimeoutReader and TimeoutWriter in the stream tests

This makes sure that our tests fail, even if stream.Read or stream.Write
block.
This commit is contained in:
Marten Seemann 2017-07-17 15:31:06 +07:00
parent d4fb1c3402
commit 315aa14ab5

View file

@ -14,14 +14,16 @@ import (
"github.com/lucas-clemente/quic-go/protocol"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gbytes"
)
var _ = Describe("Stream", func() {
const streamID protocol.StreamID = 1337
var (
str *stream
onDataCalled bool
str *stream
strWithTimeout io.ReadWriter // str wrapped with gbytes.Timeout{Reader,Writer}
onDataCalled bool
resetCalled bool
resetCalledForStream protocol.StreamID
@ -57,6 +59,15 @@ var _ = Describe("Stream", func() {
resetCalled = false
mockFcm = mocks_fc.NewMockFlowControlManager(mockCtrl)
str = newStream(streamID, onData, onReset, mockFcm)
timeout := scaleDuration(250 * time.Millisecond)
strWithTimeout = struct {
io.Reader
io.Writer
}{
gbytes.TimeoutReader(str, timeout),
gbytes.TimeoutWriter(str, timeout),
}
})
It("gets stream id", func() {
@ -74,7 +85,7 @@ var _ = Describe("Stream", func() {
err := str.AddStreamFrame(&frame)
Expect(err).ToNot(HaveOccurred())
b := make([]byte, 4)
n, err := str.Read(b)
n, err := strWithTimeout.Read(b)
Expect(err).ToNot(HaveOccurred())
Expect(n).To(Equal(4))
Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF}))
@ -91,11 +102,11 @@ var _ = Describe("Stream", func() {
err := str.AddStreamFrame(&frame)
Expect(err).ToNot(HaveOccurred())
b := make([]byte, 2)
n, err := str.Read(b)
n, err := strWithTimeout.Read(b)
Expect(err).ToNot(HaveOccurred())
Expect(n).To(Equal(2))
Expect(b).To(Equal([]byte{0xDE, 0xAD}))
n, err = str.Read(b)
n, err = strWithTimeout.Read(b)
Expect(err).ToNot(HaveOccurred())
Expect(n).To(Equal(2))
Expect(b).To(Equal([]byte{0xBE, 0xEF}))
@ -118,7 +129,7 @@ var _ = Describe("Stream", func() {
err = str.AddStreamFrame(&frame2)
Expect(err).ToNot(HaveOccurred())
b := make([]byte, 6)
n, err := str.Read(b)
n, err := strWithTimeout.Read(b)
Expect(err).ToNot(HaveOccurred())
Expect(n).To(Equal(4))
Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF, 0x00, 0x00}))
@ -141,7 +152,7 @@ var _ = Describe("Stream", func() {
err = str.AddStreamFrame(&frame2)
Expect(err).ToNot(HaveOccurred())
b := make([]byte, 4)
n, err := str.Read(b)
n, err := strWithTimeout.Read(b)
Expect(err).ToNot(HaveOccurred())
Expect(n).To(Equal(4))
Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF}))
@ -158,7 +169,7 @@ var _ = Describe("Stream", func() {
Expect(err).ToNot(HaveOccurred())
}()
b := make([]byte, 2)
n, err := str.Read(b)
n, err := strWithTimeout.Read(b)
Expect(err).ToNot(HaveOccurred())
Expect(n).To(Equal(2))
})
@ -180,7 +191,7 @@ var _ = Describe("Stream", func() {
err = str.AddStreamFrame(&frame2)
Expect(err).ToNot(HaveOccurred())
b := make([]byte, 4)
n, err := str.Read(b)
n, err := strWithTimeout.Read(b)
Expect(err).ToNot(HaveOccurred())
Expect(n).To(Equal(4))
Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF}))
@ -210,7 +221,7 @@ var _ = Describe("Stream", func() {
err = str.AddStreamFrame(&frame3)
Expect(err).ToNot(HaveOccurred())
b := make([]byte, 4)
n, err := str.Read(b)
n, err := strWithTimeout.Read(b)
Expect(err).ToNot(HaveOccurred())
Expect(n).To(Equal(4))
Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF}))
@ -234,7 +245,7 @@ var _ = Describe("Stream", func() {
err = str.AddStreamFrame(&frame2)
Expect(err).ToNot(HaveOccurred())
b := make([]byte, 6)
n, err := str.Read(b)
n, err := strWithTimeout.Read(b)
Expect(err).ToNot(HaveOccurred())
Expect(n).To(Equal(6))
Expect(b).To(Equal([]byte("foobar")))
@ -249,7 +260,7 @@ var _ = Describe("Stream", func() {
}
str.AddStreamFrame(&frame)
b := make([]byte, 4)
_, err := str.Read(b)
_, err := strWithTimeout.Read(b)
Expect(err).ToNot(HaveOccurred())
Expect(onDataCalled).To(BeTrue())
})
@ -267,7 +278,7 @@ var _ = Describe("Stream", func() {
Expect(err).ToNot(HaveOccurred())
str.SetReadDeadline(time.Now().Add(-time.Second))
b := make([]byte, 6)
n, err := str.Read(b)
n, err := strWithTimeout.Read(b)
Expect(err).To(MatchError(errDeadline))
Expect(n).To(BeZero())
})
@ -276,7 +287,7 @@ var _ = Describe("Stream", func() {
deadline := time.Now().Add(scaleDuration(50 * time.Millisecond))
str.SetReadDeadline(deadline)
b := make([]byte, 6)
n, err := str.Read(b)
n, err := strWithTimeout.Read(b)
Expect(err).To(MatchError(errDeadline))
Expect(n).To(BeZero())
Expect(time.Now()).To(BeTemporally("~", deadline, scaleDuration(10*time.Millisecond)))
@ -295,7 +306,7 @@ var _ = Describe("Stream", func() {
}()
runtime.Gosched()
b := make([]byte, 10)
n, err := str.Read(b)
n, err := strWithTimeout.Read(b)
Expect(err).To(MatchError(errDeadline))
Expect(n).To(BeZero())
Expect(time.Now()).To(BeTemporally("~", deadline2, scaleDuration(20*time.Millisecond)))
@ -314,7 +325,7 @@ var _ = Describe("Stream", func() {
str.SetReadDeadline(deadline1)
runtime.Gosched()
b := make([]byte, 10)
_, err := str.Read(b)
_, err := strWithTimeout.Read(b)
Expect(err).To(MatchError(errDeadline))
Expect(time.Now()).To(BeTemporally("~", deadline2, scaleDuration(25*time.Millisecond)))
})
@ -326,7 +337,7 @@ var _ = Describe("Stream", func() {
Expect(err).ToNot(HaveOccurred())
str.SetDeadline(time.Now().Add(-time.Second))
b := make([]byte, 6)
n, err := str.Read(b)
n, err := strWithTimeout.Read(b)
Expect(err).To(MatchError(errDeadline))
Expect(n).To(BeZero())
})
@ -344,11 +355,11 @@ var _ = Describe("Stream", func() {
}
str.AddStreamFrame(&frame)
b := make([]byte, 4)
n, err := str.Read(b)
n, err := strWithTimeout.Read(b)
Expect(err).To(MatchError(io.EOF))
Expect(n).To(Equal(4))
Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF}))
n, err = str.Read(b)
n, err = strWithTimeout.Read(b)
Expect(n).To(BeZero())
Expect(err).To(MatchError(io.EOF))
})
@ -371,11 +382,11 @@ var _ = Describe("Stream", func() {
err = str.AddStreamFrame(&frame2)
Expect(err).ToNot(HaveOccurred())
b := make([]byte, 4)
n, err := str.Read(b)
n, err := strWithTimeout.Read(b)
Expect(err).To(MatchError(io.EOF))
Expect(n).To(Equal(4))
Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF}))
n, err = str.Read(b)
n, err = strWithTimeout.Read(b)
Expect(n).To(BeZero())
Expect(err).To(MatchError(io.EOF))
})
@ -391,7 +402,7 @@ var _ = Describe("Stream", func() {
err := str.AddStreamFrame(&frame)
Expect(err).ToNot(HaveOccurred())
b := make([]byte, 4)
n, err := str.Read(b)
n, err := strWithTimeout.Read(b)
Expect(err).To(MatchError(io.EOF))
Expect(n).To(Equal(2))
Expect(b[:n]).To(Equal([]byte{0xDE, 0xAD}))
@ -408,7 +419,7 @@ var _ = Describe("Stream", func() {
err := str.AddStreamFrame(&frame)
Expect(err).ToNot(HaveOccurred())
b := make([]byte, 4)
n, err := str.Read(b)
n, err := strWithTimeout.Read(b)
Expect(n).To(BeZero())
Expect(err).To(MatchError(io.EOF))
})
@ -420,7 +431,7 @@ var _ = Describe("Stream", func() {
mockFcm.EXPECT().AddBytesRead(streamID, protocol.ByteCount(0))
str.CloseRemote(0)
b := make([]byte, 8)
n, err := str.Read(b)
n, err := strWithTimeout.Read(b)
Expect(n).To(BeZero())
Expect(err).To(MatchError(io.EOF))
})
@ -431,25 +442,24 @@ var _ = Describe("Stream", func() {
testErr := errors.New("test error")
It("immediately returns all reads", func() {
var readReturned bool
var n int
var err error
done := make(chan struct{})
b := make([]byte, 4)
go func() {
n, err = str.Read(b)
readReturned = true
defer GinkgoRecover()
n, err := strWithTimeout.Read(b)
Expect(n).To(BeZero())
Expect(err).To(MatchError(testErr))
close(done)
}()
Consistently(func() bool { return readReturned }).Should(BeFalse())
Consistently(done).ShouldNot(BeClosed())
str.Cancel(testErr)
Eventually(func() bool { return readReturned }).Should(BeTrue())
Expect(n).To(BeZero())
Expect(err).To(MatchError(testErr))
Eventually(done).Should(BeClosed())
})
It("errors for all following reads", func() {
str.Cancel(testErr)
b := make([]byte, 1)
n, err := str.Read(b)
n, err := strWithTimeout.Read(b)
Expect(n).To(BeZero())
Expect(err).To(MatchError(testErr))
})
@ -469,7 +479,7 @@ var _ = Describe("Stream", func() {
str.AddStreamFrame(&frame)
str.RegisterRemoteError(testErr)
b := make([]byte, 4)
n, err := str.Read(b)
n, err := strWithTimeout.Read(b)
Expect(err).ToNot(HaveOccurred())
Expect(n).To(Equal(4))
})
@ -484,7 +494,7 @@ var _ = Describe("Stream", func() {
err := str.AddStreamFrame(&frame)
Expect(err).ToNot(HaveOccurred())
b := make([]byte, 4)
n, err := str.Read(b)
n, err := strWithTimeout.Read(b)
Expect(err).ToNot(HaveOccurred())
Expect(n).To(Equal(4))
})
@ -498,7 +508,7 @@ var _ = Describe("Stream", func() {
str.AddStreamFrame(&frame)
str.RegisterRemoteError(testErr)
b := make([]byte, 10)
n, err := str.Read(b)
n, err := strWithTimeout.Read(b)
Expect(b[0:4]).To(Equal(frame.Data))
Expect(err).To(MatchError(testErr))
Expect(n).To(Equal(4))
@ -514,7 +524,7 @@ var _ = Describe("Stream", func() {
str.AddStreamFrame(&frame)
str.RegisterRemoteError(testErr)
b := make([]byte, 10)
n, err := str.Read(b)
n, err := strWithTimeout.Read(b)
Expect(b[:4]).To(Equal(frame.Data))
Expect(err).To(MatchError(io.EOF))
Expect(n).To(Equal(4))
@ -530,11 +540,11 @@ var _ = Describe("Stream", func() {
str.AddStreamFrame(&frame)
str.RegisterRemoteError(testErr)
b := make([]byte, 3)
_, err := str.Read(b)
_, err := strWithTimeout.Read(b)
Expect(err).ToNot(HaveOccurred())
Expect(b).To(Equal([]byte{0xde, 0xad, 0xbe}))
b = make([]byte, 3)
n, err := str.Read(b)
n, err := strWithTimeout.Read(b)
Expect(err).To(MatchError(io.EOF))
Expect(b[:1]).To(Equal([]byte{0xef}))
Expect(n).To(Equal(1))
@ -551,54 +561,51 @@ var _ = Describe("Stream", func() {
str.AddStreamFrame(&frame)
str.RegisterRemoteError(testErr)
b := make([]byte, 3)
_, err := str.Read(b)
_, err := strWithTimeout.Read(b)
Expect(err).ToNot(HaveOccurred())
})
It("stops writing after receiving a remote error", func() {
var writeReturned bool
var n int
var err error
done := make(chan struct{})
go func() {
n, err = str.Write([]byte("foobar"))
writeReturned = true
defer GinkgoRecover()
n, err := strWithTimeout.Write([]byte("foobar"))
Expect(n).To(BeZero())
Expect(err).To(MatchError(testErr))
close(done)
}()
str.RegisterRemoteError(testErr)
Eventually(func() bool { return writeReturned }).Should(BeTrue())
Expect(n).To(BeZero())
Expect(err).To(MatchError(testErr))
Eventually(done).Should(BeClosed())
})
It("returns how much was written when recieving a remote error", func() {
var writeReturned bool
var n int
var err error
done := make(chan struct{})
go func() {
n, err = str.Write([]byte("foobar"))
writeReturned = true
defer GinkgoRecover()
n, err := strWithTimeout.Write([]byte("foobar"))
Expect(err).To(MatchError(testErr))
Expect(n).To(Equal(4))
close(done)
}()
Eventually(func() []byte { return str.getDataForWriting(4) }).ShouldNot(BeEmpty())
str.RegisterRemoteError(testErr)
Eventually(func() bool { return writeReturned }).Should(BeTrue())
Expect(err).To(MatchError(testErr))
Expect(n).To(Equal(4))
Eventually(done).Should(BeClosed())
})
It("calls onReset when receiving a remote error", func() {
var writeReturned bool
done := make(chan struct{})
str.writeOffset = 0x1000
go func() {
str.Write([]byte("foobar"))
writeReturned = true
_, _ = strWithTimeout.Write([]byte("foobar"))
close(done)
}()
str.RegisterRemoteError(testErr)
Expect(resetCalled).To(BeTrue())
Expect(resetCalledForStream).To(Equal(protocol.StreamID(1337)))
Expect(resetCalledAtOffset).To(Equal(protocol.ByteCount(0x1000)))
Eventually(func() bool { return writeReturned }).Should(BeTrue())
Eventually(done).Should(BeClosed())
})
It("doesn't call onReset if it already sent a FIN", func() {
@ -627,45 +634,41 @@ var _ = Describe("Stream", func() {
Context("reset locally", func() {
It("stops writing", func() {
var writeReturned bool
var n int
var err error
done := make(chan struct{})
go func() {
n, err = str.Write([]byte("foobar"))
writeReturned = true
defer GinkgoRecover()
n, err := strWithTimeout.Write([]byte("foobar"))
Expect(n).To(BeZero())
Expect(err).To(MatchError(testErr))
close(done)
}()
Consistently(func() bool { return writeReturned }).Should(BeFalse())
Consistently(done).ShouldNot(BeClosed())
str.Reset(testErr)
Expect(str.getDataForWriting(6)).To(BeNil())
Eventually(func() bool { return writeReturned }).Should(BeTrue())
Expect(n).To(BeZero())
Expect(err).To(MatchError(testErr))
Eventually(done).Should(BeClosed())
})
It("doesn't allow further writes", func() {
str.Reset(testErr)
n, err := str.Write([]byte("foobar"))
n, err := strWithTimeout.Write([]byte("foobar"))
Expect(n).To(BeZero())
Expect(err).To(MatchError(testErr))
Expect(str.getDataForWriting(6)).To(BeNil())
})
It("stops reading", func() {
var readReturned bool
var n int
var err error
done := make(chan struct{})
go func() {
defer GinkgoRecover()
b := make([]byte, 4)
n, err = str.Read(b)
readReturned = true
n, err := strWithTimeout.Read(b)
Expect(n).To(BeZero())
Expect(err).To(MatchError(testErr))
close(done)
}()
Consistently(func() bool { return readReturned }).Should(BeFalse())
Consistently(done).ShouldNot(BeClosed())
str.Reset(testErr)
Eventually(func() bool { return readReturned }).Should(BeTrue())
Expect(n).To(BeZero())
Expect(err).To(MatchError(testErr))
Eventually(done).Should(BeClosed())
})
It("doesn't allow further reads", func() {
@ -675,7 +678,7 @@ var _ = Describe("Stream", func() {
})
str.Reset(testErr)
b := make([]byte, 6)
n, err := str.Read(b)
n, err := strWithTimeout.Read(b)
Expect(n).To(BeZero())
Expect(err).To(MatchError(testErr))
})
@ -714,44 +717,45 @@ var _ = Describe("Stream", func() {
})
Context("writing", func() {
It("writes and gets all data at once", func(done Done) {
var writeReturned bool
It("writes and gets all data at once", func() {
done := make(chan struct{})
go func() {
n, err := str.Write([]byte("foobar"))
defer GinkgoRecover()
n, err := strWithTimeout.Write([]byte("foobar"))
Expect(err).ToNot(HaveOccurred())
Expect(n).To(Equal(6))
writeReturned = true
close(done)
}()
Eventually(func() []byte {
str.mutex.Lock()
defer str.mutex.Unlock()
return str.dataForWriting
}).Should(Equal([]byte("foobar")))
Consistently(func() bool { return writeReturned }).Should(BeFalse())
Consistently(done).ShouldNot(BeClosed())
Expect(onDataCalled).To(BeTrue())
Expect(str.lenOfDataForWriting()).To(Equal(protocol.ByteCount(6)))
data := str.getDataForWriting(1000)
Expect(data).To(Equal([]byte("foobar")))
Expect(str.writeOffset).To(Equal(protocol.ByteCount(6)))
Expect(str.dataForWriting).To(BeNil())
Eventually(func() bool { return writeReturned }).Should(BeTrue())
close(done)
Eventually(done).Should(BeClosed())
})
It("writes and gets data in two turns", func(done Done) {
var writeReturned bool
It("writes and gets data in two turns", func() {
done := make(chan struct{})
go func() {
n, err := str.Write([]byte("foobar"))
defer GinkgoRecover()
n, err := strWithTimeout.Write([]byte("foobar"))
Expect(err).ToNot(HaveOccurred())
Expect(n).To(Equal(6))
writeReturned = true
close(done)
}()
Eventually(func() []byte {
str.mutex.Lock()
defer str.mutex.Unlock()
return str.dataForWriting
}).Should(Equal([]byte("foobar")))
Consistently(func() bool { return writeReturned }).Should(BeFalse())
Consistently(done).ShouldNot(BeClosed())
Expect(str.lenOfDataForWriting()).To(Equal(protocol.ByteCount(6)))
data := str.getDataForWriting(3)
Expect(data).To(Equal([]byte("foo")))
@ -763,8 +767,7 @@ var _ = Describe("Stream", func() {
Expect(str.writeOffset).To(Equal(protocol.ByteCount(6)))
Expect(str.dataForWriting).To(BeNil())
Expect(str.lenOfDataForWriting()).To(Equal(protocol.ByteCount(0)))
Eventually(func() bool { return writeReturned }).Should(BeTrue())
close(done)
Eventually(done).Should(BeClosed())
})
It("getDataForWriting returns nil if no data is available", func() {
@ -774,7 +777,8 @@ var _ = Describe("Stream", func() {
It("copies the slice while writing", func() {
s := []byte("foo")
go func() {
n, err := str.Write(s)
defer GinkgoRecover()
n, err := strWithTimeout.Write(s)
Expect(err).ToNot(HaveOccurred())
Expect(n).To(Equal(3))
}()
@ -784,13 +788,13 @@ var _ = Describe("Stream", func() {
})
It("returns when given a nil input", func() {
n, err := str.Write(nil)
n, err := strWithTimeout.Write(nil)
Expect(n).To(BeZero())
Expect(err).ToNot(HaveOccurred())
})
It("returns when given an empty slice", func() {
n, err := str.Write([]byte(""))
n, err := strWithTimeout.Write([]byte(""))
Expect(n).To(BeZero())
Expect(err).ToNot(HaveOccurred())
})
@ -798,7 +802,7 @@ var _ = Describe("Stream", func() {
Context("deadlines", func() {
It("returns an error when Write is called after the deadline", func() {
str.SetWriteDeadline(time.Now().Add(-time.Second))
n, err := str.Write([]byte("foobar"))
n, err := strWithTimeout.Write([]byte("foobar"))
Expect(err).To(MatchError(errDeadline))
Expect(n).To(BeZero())
})
@ -806,7 +810,7 @@ var _ = Describe("Stream", func() {
It("unblocks after the deadline", func() {
deadline := time.Now().Add(scaleDuration(50 * time.Millisecond))
str.SetWriteDeadline(deadline)
n, err := str.Write([]byte("foobar"))
n, err := strWithTimeout.Write([]byte("foobar"))
Expect(err).To(MatchError(errDeadline))
Expect(n).To(BeZero())
Expect(time.Now()).To(BeTemporally("~", deadline, scaleDuration(20*time.Millisecond)))
@ -824,7 +828,7 @@ var _ = Describe("Stream", func() {
Expect(time.Now()).To(BeTemporally("<", deadline1))
}()
runtime.Gosched()
n, err := str.Write([]byte("foobar"))
n, err := strWithTimeout.Write([]byte("foobar"))
Expect(err).To(MatchError(errDeadline))
Expect(n).To(BeZero())
Expect(time.Now()).To(BeTemporally("~", deadline2, scaleDuration(20*time.Millisecond)))
@ -842,14 +846,14 @@ var _ = Describe("Stream", func() {
}()
str.SetWriteDeadline(deadline1)
runtime.Gosched()
_, err := str.Write([]byte("foobar"))
_, err := strWithTimeout.Write([]byte("foobar"))
Expect(err).To(MatchError(errDeadline))
Expect(time.Now()).To(BeTemporally("~", deadline2, scaleDuration(20*time.Millisecond)))
})
It("sets a read deadline, when SetDeadline is called", func() {
str.SetDeadline(time.Now().Add(-time.Second))
n, err := str.Write([]byte("foobar"))
n, err := strWithTimeout.Write([]byte("foobar"))
Expect(err).To(MatchError(errDeadline))
Expect(n).To(BeZero())
})
@ -894,14 +898,15 @@ var _ = Describe("Stream", func() {
It("returns errors when the stream is cancelled", func() {
str.Cancel(testErr)
n, err := str.Write([]byte("foo"))
n, err := strWithTimeout.Write([]byte("foo"))
Expect(n).To(BeZero())
Expect(err).To(MatchError(testErr))
})
It("doesn't get data for writing if an error occurred", func() {
go func() {
_, err := str.Write([]byte("foobar"))
defer GinkgoRecover()
_, err := strWithTimeout.Write([]byte("foobar"))
Expect(err).To(MatchError(testErr))
}()
Eventually(func() []byte { return str.dataForWriting }).ShouldNot(BeNil())
@ -932,7 +937,7 @@ var _ = Describe("Stream", func() {
err := str.AddStreamFrame(&frames.StreamFrame{FinBit: true})
Expect(err).ToNot(HaveOccurred())
b := make([]byte, 100)
_, err = str.Read(b)
_, err = strWithTimeout.Read(b)
Expect(err).To(MatchError(io.EOF))
}