return a Stream, not streamI, from streamsMap Open and Accept methods

This commit is contained in:
Marten Seemann 2017-12-26 09:41:00 +07:00
parent 78d7f6fdba
commit 7a3209b3a4
2 changed files with 41 additions and 26 deletions

View file

@ -160,7 +160,7 @@ func (m *streamsMap) openStreamImpl() (streamI, error) {
}
// OpenStream opens the next available stream
func (m *streamsMap) OpenStream() (streamI, error) {
func (m *streamsMap) OpenStream() (Stream, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
@ -170,7 +170,7 @@ func (m *streamsMap) OpenStream() (streamI, error) {
return m.openStreamImpl()
}
func (m *streamsMap) OpenStreamSync() (streamI, error) {
func (m *streamsMap) OpenStreamSync() (Stream, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
@ -191,7 +191,7 @@ func (m *streamsMap) OpenStreamSync() (streamI, error) {
// AcceptStream returns the next stream opened by the peer
// it blocks until a new stream is opened
func (m *streamsMap) AcceptStream() (streamI, error) {
func (m *streamsMap) AcceptStream() (Stream, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
var str streamI

View file

@ -210,19 +210,18 @@ var _ = Describe("Streams Map", func() {
It("waits until another stream is closed", func() {
openMaxNumStreams()
var returned bool
var str streamI
var str Stream
done := make(chan struct{})
go func() {
defer GinkgoRecover()
var err error
str, err = m.OpenStreamSync()
Expect(err).ToNot(HaveOccurred())
returned = true
close(done)
}()
Consistently(func() bool { return returned }).Should(BeFalse())
Consistently(done).ShouldNot(BeClosed())
deleteStream(6)
Eventually(func() bool { return returned }).Should(BeTrue())
Eventually(done).Should(BeClosed())
Expect(str.StreamID()).To(Equal(protocol.StreamID(2*maxOutgoingStreams + 2)))
})
@ -267,95 +266,109 @@ var _ = Describe("Streams Map", func() {
It("starts with stream 1, if the crypto stream is stream 0", func() {
setNewStreamsMap(protocol.PerspectiveServer, versionIETFFrames)
var str streamI
var str Stream
done := make(chan struct{})
go func() {
defer GinkgoRecover()
var err error
str, err = m.AcceptStream()
Expect(err).ToNot(HaveOccurred())
close(done)
}()
_, err := m.GetOrOpenStream(1)
Expect(err).ToNot(HaveOccurred())
Eventually(func() Stream { return str }).ShouldNot(BeNil())
Eventually(done).Should(BeClosed())
Expect(str.StreamID()).To(Equal(protocol.StreamID(1)))
})
It("starts with stream 3, if the crypto stream is stream 1", func() {
var str streamI
var str Stream
done := make(chan struct{})
go func() {
defer GinkgoRecover()
var err error
str, err = m.AcceptStream()
Expect(err).ToNot(HaveOccurred())
close(done)
}()
_, err := m.GetOrOpenStream(3)
Expect(err).ToNot(HaveOccurred())
Eventually(func() Stream { return str }).ShouldNot(BeNil())
Eventually(done).Should(BeClosed())
Expect(str.StreamID()).To(Equal(protocol.StreamID(3)))
})
It("returns an implicitly opened stream, if a stream number is skipped", func() {
var str streamI
var str Stream
done := make(chan struct{})
go func() {
defer GinkgoRecover()
var err error
str, err = m.AcceptStream()
Expect(err).ToNot(HaveOccurred())
close(done)
}()
_, err := m.GetOrOpenStream(5)
Expect(err).ToNot(HaveOccurred())
Eventually(func() Stream { return str }).ShouldNot(BeNil())
Eventually(done).Should(BeClosed())
Expect(str.StreamID()).To(Equal(protocol.StreamID(3)))
})
It("returns to multiple accepts", func() {
var str1, str2 streamI
var str1, str2 Stream
done1 := make(chan struct{})
done2 := make(chan struct{})
go func() {
defer GinkgoRecover()
var err error
str1, err = m.AcceptStream()
Expect(err).ToNot(HaveOccurred())
close(done1)
}()
go func() {
defer GinkgoRecover()
var err error
str2, err = m.AcceptStream()
Expect(err).ToNot(HaveOccurred())
close(done2)
}()
_, err := m.GetOrOpenStream(5) // opens stream 3 and 5
Expect(err).ToNot(HaveOccurred())
Eventually(func() streamI { return str1 }).ShouldNot(BeNil())
Eventually(func() streamI { return str2 }).ShouldNot(BeNil())
Eventually(done1).Should(BeClosed())
Eventually(done2).Should(BeClosed())
Expect(str1.StreamID()).ToNot(Equal(str2.StreamID()))
Expect(str1.StreamID() + str2.StreamID()).To(BeEquivalentTo(3 + 5))
})
It("waits a new stream is available", func() {
var str streamI
It("waits until a new stream is available", func() {
var str Stream
done := make(chan struct{})
go func() {
defer GinkgoRecover()
var err error
str, err = m.AcceptStream()
Expect(err).ToNot(HaveOccurred())
close(done)
}()
Consistently(func() streamI { return str }).Should(BeNil())
Consistently(done).ShouldNot(BeClosed())
_, err := m.GetOrOpenStream(3)
Expect(err).ToNot(HaveOccurred())
Eventually(func() streamI { return str }).ShouldNot(BeNil())
Eventually(done).Should(BeClosed())
Expect(str.StreamID()).To(Equal(protocol.StreamID(3)))
})
It("returns multiple streams on subsequent Accept calls, if available", func() {
var str streamI
var str Stream
done := make(chan struct{})
go func() {
defer GinkgoRecover()
var err error
str, err = m.AcceptStream()
Expect(err).ToNot(HaveOccurred())
close(done)
}()
_, err := m.GetOrOpenStream(5)
Expect(err).ToNot(HaveOccurred())
Eventually(func() streamI { return str }).ShouldNot(BeNil())
Eventually(done).Should(BeClosed())
Expect(str.StreamID()).To(Equal(protocol.StreamID(3)))
str, err = m.AcceptStream()
Expect(err).ToNot(HaveOccurred())
@ -489,16 +502,18 @@ var _ = Describe("Streams Map", func() {
Context("accepting streams", func() {
It("accepts stream 2 first", func() {
var str streamI
var str Stream
done := make(chan struct{})
go func() {
defer GinkgoRecover()
var err error
str, err = m.AcceptStream()
Expect(err).ToNot(HaveOccurred())
close(done)
}()
_, err := m.GetOrOpenStream(2)
Expect(err).ToNot(HaveOccurred())
Eventually(func() streamI { return str }).ShouldNot(BeNil())
Eventually(done).Should(BeClosed())
Expect(str.StreamID()).To(Equal(protocol.StreamID(2)))
})
})