mirror of
https://github.com/refraction-networking/uquic.git
synced 2025-04-04 12:47:36 +03:00
make the outgoing streams map work with stream 0
Stream 0 is now a valid stream ID used for application data, so the streams map must be able to (block on) opening this stream.
This commit is contained in:
parent
08008b680e
commit
5fc2e12038
4 changed files with 29 additions and 6 deletions
|
@ -21,6 +21,7 @@ type outgoingBidiStreamsMap struct {
|
||||||
|
|
||||||
nextStream protocol.StreamID // stream ID of the stream returned by OpenStream(Sync)
|
nextStream protocol.StreamID // stream ID of the stream returned by OpenStream(Sync)
|
||||||
maxStream protocol.StreamID // the maximum stream ID we're allowed to open
|
maxStream protocol.StreamID // the maximum stream ID we're allowed to open
|
||||||
|
maxStreamSet bool // was maxStream set. If not, it's not possible to any stream (also works for stream 0)
|
||||||
highestBlocked protocol.StreamID // the highest stream ID that we queued a STREAM_ID_BLOCKED frame for
|
highestBlocked protocol.StreamID // the highest stream ID that we queued a STREAM_ID_BLOCKED frame for
|
||||||
|
|
||||||
newStream func(protocol.StreamID) streamI
|
newStream func(protocol.StreamID) streamI
|
||||||
|
@ -71,7 +72,7 @@ func (m *outgoingBidiStreamsMap) openStreamImpl() (streamI, error) {
|
||||||
if m.closeErr != nil {
|
if m.closeErr != nil {
|
||||||
return nil, m.closeErr
|
return nil, m.closeErr
|
||||||
}
|
}
|
||||||
if m.nextStream > m.maxStream {
|
if !m.maxStreamSet || m.nextStream > m.maxStream {
|
||||||
if m.maxStream == 0 || m.highestBlocked < m.maxStream {
|
if m.maxStream == 0 || m.highestBlocked < m.maxStream {
|
||||||
m.queueStreamIDBlocked(&wire.StreamIDBlockedFrame{StreamID: m.maxStream})
|
m.queueStreamIDBlocked(&wire.StreamIDBlockedFrame{StreamID: m.maxStream})
|
||||||
m.highestBlocked = m.maxStream
|
m.highestBlocked = m.maxStream
|
||||||
|
@ -108,8 +109,9 @@ func (m *outgoingBidiStreamsMap) DeleteStream(id protocol.StreamID) error {
|
||||||
|
|
||||||
func (m *outgoingBidiStreamsMap) SetMaxStream(id protocol.StreamID) {
|
func (m *outgoingBidiStreamsMap) SetMaxStream(id protocol.StreamID) {
|
||||||
m.mutex.Lock()
|
m.mutex.Lock()
|
||||||
if id > m.maxStream {
|
if !m.maxStreamSet || id > m.maxStream {
|
||||||
m.maxStream = id
|
m.maxStream = id
|
||||||
|
m.maxStreamSet = true
|
||||||
m.cond.Broadcast()
|
m.cond.Broadcast()
|
||||||
}
|
}
|
||||||
m.mutex.Unlock()
|
m.mutex.Unlock()
|
||||||
|
|
|
@ -19,6 +19,7 @@ type outgoingItemsMap struct {
|
||||||
|
|
||||||
nextStream protocol.StreamID // stream ID of the stream returned by OpenStream(Sync)
|
nextStream protocol.StreamID // stream ID of the stream returned by OpenStream(Sync)
|
||||||
maxStream protocol.StreamID // the maximum stream ID we're allowed to open
|
maxStream protocol.StreamID // the maximum stream ID we're allowed to open
|
||||||
|
maxStreamSet bool // was maxStream set. If not, it's not possible to any stream (also works for stream 0)
|
||||||
highestBlocked protocol.StreamID // the highest stream ID that we queued a STREAM_ID_BLOCKED frame for
|
highestBlocked protocol.StreamID // the highest stream ID that we queued a STREAM_ID_BLOCKED frame for
|
||||||
|
|
||||||
newStream func(protocol.StreamID) item
|
newStream func(protocol.StreamID) item
|
||||||
|
@ -69,7 +70,7 @@ func (m *outgoingItemsMap) openStreamImpl() (item, error) {
|
||||||
if m.closeErr != nil {
|
if m.closeErr != nil {
|
||||||
return nil, m.closeErr
|
return nil, m.closeErr
|
||||||
}
|
}
|
||||||
if m.nextStream > m.maxStream {
|
if !m.maxStreamSet || m.nextStream > m.maxStream {
|
||||||
if m.maxStream == 0 || m.highestBlocked < m.maxStream {
|
if m.maxStream == 0 || m.highestBlocked < m.maxStream {
|
||||||
m.queueStreamIDBlocked(&wire.StreamIDBlockedFrame{StreamID: m.maxStream})
|
m.queueStreamIDBlocked(&wire.StreamIDBlockedFrame{StreamID: m.maxStream})
|
||||||
m.highestBlocked = m.maxStream
|
m.highestBlocked = m.maxStream
|
||||||
|
@ -106,8 +107,9 @@ func (m *outgoingItemsMap) DeleteStream(id protocol.StreamID) error {
|
||||||
|
|
||||||
func (m *outgoingItemsMap) SetMaxStream(id protocol.StreamID) {
|
func (m *outgoingItemsMap) SetMaxStream(id protocol.StreamID) {
|
||||||
m.mutex.Lock()
|
m.mutex.Lock()
|
||||||
if id > m.maxStream {
|
if !m.maxStreamSet || id > m.maxStream {
|
||||||
m.maxStream = id
|
m.maxStream = id
|
||||||
|
m.maxStreamSet = true
|
||||||
m.cond.Broadcast()
|
m.cond.Broadcast()
|
||||||
}
|
}
|
||||||
m.mutex.Unlock()
|
m.mutex.Unlock()
|
||||||
|
|
|
@ -122,6 +122,23 @@ var _ = Describe("Streams Map (outgoing)", func() {
|
||||||
Eventually(done).Should(BeClosed())
|
Eventually(done).Should(BeClosed())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("works with stream 0", func() {
|
||||||
|
m = newOutgoingItemsMap(0, newItem, mockSender.queueControlFrame)
|
||||||
|
mockSender.EXPECT().queueControlFrame(&wire.StreamIDBlockedFrame{StreamID: 0})
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer GinkgoRecover()
|
||||||
|
str, err := m.OpenStreamSync()
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(str.(*mockGenericStream).id).To(BeZero())
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
Consistently(done).ShouldNot(BeClosed())
|
||||||
|
m.SetMaxStream(0)
|
||||||
|
Eventually(done).Should(BeClosed())
|
||||||
|
})
|
||||||
|
|
||||||
It("stops opening synchronously when it is closed", func() {
|
It("stops opening synchronously when it is closed", func() {
|
||||||
mockSender.EXPECT().queueControlFrame(gomock.Any())
|
mockSender.EXPECT().queueControlFrame(gomock.Any())
|
||||||
testErr := errors.New("test error")
|
testErr := errors.New("test error")
|
||||||
|
|
|
@ -21,6 +21,7 @@ type outgoingUniStreamsMap struct {
|
||||||
|
|
||||||
nextStream protocol.StreamID // stream ID of the stream returned by OpenStream(Sync)
|
nextStream protocol.StreamID // stream ID of the stream returned by OpenStream(Sync)
|
||||||
maxStream protocol.StreamID // the maximum stream ID we're allowed to open
|
maxStream protocol.StreamID // the maximum stream ID we're allowed to open
|
||||||
|
maxStreamSet bool // was maxStream set. If not, it's not possible to any stream (also works for stream 0)
|
||||||
highestBlocked protocol.StreamID // the highest stream ID that we queued a STREAM_ID_BLOCKED frame for
|
highestBlocked protocol.StreamID // the highest stream ID that we queued a STREAM_ID_BLOCKED frame for
|
||||||
|
|
||||||
newStream func(protocol.StreamID) sendStreamI
|
newStream func(protocol.StreamID) sendStreamI
|
||||||
|
@ -71,7 +72,7 @@ func (m *outgoingUniStreamsMap) openStreamImpl() (sendStreamI, error) {
|
||||||
if m.closeErr != nil {
|
if m.closeErr != nil {
|
||||||
return nil, m.closeErr
|
return nil, m.closeErr
|
||||||
}
|
}
|
||||||
if m.nextStream > m.maxStream {
|
if !m.maxStreamSet || m.nextStream > m.maxStream {
|
||||||
if m.maxStream == 0 || m.highestBlocked < m.maxStream {
|
if m.maxStream == 0 || m.highestBlocked < m.maxStream {
|
||||||
m.queueStreamIDBlocked(&wire.StreamIDBlockedFrame{StreamID: m.maxStream})
|
m.queueStreamIDBlocked(&wire.StreamIDBlockedFrame{StreamID: m.maxStream})
|
||||||
m.highestBlocked = m.maxStream
|
m.highestBlocked = m.maxStream
|
||||||
|
@ -108,8 +109,9 @@ func (m *outgoingUniStreamsMap) DeleteStream(id protocol.StreamID) error {
|
||||||
|
|
||||||
func (m *outgoingUniStreamsMap) SetMaxStream(id protocol.StreamID) {
|
func (m *outgoingUniStreamsMap) SetMaxStream(id protocol.StreamID) {
|
||||||
m.mutex.Lock()
|
m.mutex.Lock()
|
||||||
if id > m.maxStream {
|
if !m.maxStreamSet || id > m.maxStream {
|
||||||
m.maxStream = id
|
m.maxStream = id
|
||||||
|
m.maxStreamSet = true
|
||||||
m.cond.Broadcast()
|
m.cond.Broadcast()
|
||||||
}
|
}
|
||||||
m.mutex.Unlock()
|
m.mutex.Unlock()
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue