From dd9ce2e6687c243adbcafad4a0ec859c0b3f0219 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Fri, 9 Nov 2018 18:57:41 +0700 Subject: [PATCH] replace the STREAM_ID_BLOCKED with the STREAMS_BLOCKED frame --- internal/wire/frame_parser.go | 4 +- internal/wire/frame_parser_test.go | 7 +- internal/wire/stream_id_blocked_frame.go | 37 --------- internal/wire/stream_id_blocked_frame_test.go | 53 ------------- internal/wire/streams_blocked_frame.go | 52 ++++++++++++ internal/wire/streams_blocked_frame_test.go | 79 +++++++++++++++++++ session.go | 2 +- session_test.go | 2 +- streams_map_outgoing_bidi.go | 29 ++++--- streams_map_outgoing_generic.go | 33 +++++--- streams_map_outgoing_generic_test.go | 49 +++++++----- streams_map_outgoing_uni.go | 29 ++++--- 12 files changed, 233 insertions(+), 143 deletions(-) delete mode 100644 internal/wire/stream_id_blocked_frame.go delete mode 100644 internal/wire/stream_id_blocked_frame_test.go create mode 100644 internal/wire/streams_blocked_frame.go create mode 100644 internal/wire/streams_blocked_frame_test.go diff --git a/internal/wire/frame_parser.go b/internal/wire/frame_parser.go index 71ba202f..e96626f5 100644 --- a/internal/wire/frame_parser.go +++ b/internal/wire/frame_parser.go @@ -67,8 +67,8 @@ func parseFrame(r *bytes.Reader, typeByte byte, v protocol.VersionNumber) (Frame if err != nil { err = qerr.Error(qerr.InvalidBlockedData, err.Error()) } - case 0xa: - frame, err = parseStreamIDBlockedFrame(r, v) + case 0xa, 0xb: + frame, err = parseStreamsBlockedFrame(r, v) if err != nil { err = qerr.Error(qerr.InvalidFrameData, err.Error()) } diff --git a/internal/wire/frame_parser_test.go b/internal/wire/frame_parser_test.go index b56bf323..1db1f043 100644 --- a/internal/wire/frame_parser_test.go +++ b/internal/wire/frame_parser_test.go @@ -133,8 +133,11 @@ var _ = Describe("Frame parsing", func() { Expect(frame).To(Equal(f)) }) - It("unpacks STREAM_ID_BLOCKED frames", func() { - f := &StreamIDBlockedFrame{StreamID: 0x1234567} + It("unpacks STREAMS_BLOCKED frames", func() { + f := &StreamsBlockedFrame{ + Type: protocol.StreamTypeBidi, + StreamLimit: 0x1234567, + } buf := &bytes.Buffer{} err := f.Write(buf, versionIETFFrames) Expect(err).ToNot(HaveOccurred()) diff --git a/internal/wire/stream_id_blocked_frame.go b/internal/wire/stream_id_blocked_frame.go deleted file mode 100644 index 6476eb9d..00000000 --- a/internal/wire/stream_id_blocked_frame.go +++ /dev/null @@ -1,37 +0,0 @@ -package wire - -import ( - "bytes" - - "github.com/lucas-clemente/quic-go/internal/protocol" - "github.com/lucas-clemente/quic-go/internal/utils" -) - -// A StreamIDBlockedFrame is a STREAM_ID_BLOCKED frame -type StreamIDBlockedFrame struct { - StreamID protocol.StreamID -} - -// parseStreamIDBlockedFrame parses a STREAM_ID_BLOCKED frame -func parseStreamIDBlockedFrame(r *bytes.Reader, _ protocol.VersionNumber) (*StreamIDBlockedFrame, error) { - if _, err := r.ReadByte(); err != nil { - return nil, err - } - streamID, err := utils.ReadVarInt(r) - if err != nil { - return nil, err - } - return &StreamIDBlockedFrame{StreamID: protocol.StreamID(streamID)}, nil -} - -func (f *StreamIDBlockedFrame) Write(b *bytes.Buffer, _ protocol.VersionNumber) error { - typeByte := uint8(0x0a) - b.WriteByte(typeByte) - utils.WriteVarInt(b, uint64(f.StreamID)) - return nil -} - -// Length of a written frame -func (f *StreamIDBlockedFrame) Length(_ protocol.VersionNumber) protocol.ByteCount { - return 1 + utils.VarIntLen(uint64(f.StreamID)) -} diff --git a/internal/wire/stream_id_blocked_frame_test.go b/internal/wire/stream_id_blocked_frame_test.go deleted file mode 100644 index 7fb1cfb1..00000000 --- a/internal/wire/stream_id_blocked_frame_test.go +++ /dev/null @@ -1,53 +0,0 @@ -package wire - -import ( - "bytes" - "io" - - "github.com/lucas-clemente/quic-go/internal/protocol" - "github.com/lucas-clemente/quic-go/internal/utils" - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" -) - -var _ = Describe("STREAM_ID_BLOCKED frame", func() { - Context("parsing", func() { - It("accepts sample frame", func() { - expected := []byte{0xa} - expected = append(expected, encodeVarInt(0xdecafbad)...) - b := bytes.NewReader(expected) - frame, err := parseStreamIDBlockedFrame(b, protocol.VersionWhatever) - Expect(err).ToNot(HaveOccurred()) - Expect(frame.StreamID).To(Equal(protocol.StreamID(0xdecafbad))) - Expect(b.Len()).To(BeZero()) - }) - - It("errors on EOFs", func() { - data := []byte{0xa} - data = append(data, encodeVarInt(0x12345678)...) - _, err := parseStreamIDBlockedFrame(bytes.NewReader(data), versionIETFFrames) - Expect(err).ToNot(HaveOccurred()) - for i := range data { - _, err := parseStreamIDBlockedFrame(bytes.NewReader(data[:i]), versionIETFFrames) - Expect(err).To(MatchError(io.EOF)) - } - }) - }) - - Context("writing", func() { - It("writes a sample frame", func() { - b := &bytes.Buffer{} - frame := StreamIDBlockedFrame{StreamID: 0xdeadbeefcafe} - err := frame.Write(b, protocol.VersionWhatever) - Expect(err).ToNot(HaveOccurred()) - expected := []byte{0xa} - expected = append(expected, encodeVarInt(0xdeadbeefcafe)...) - Expect(b.Bytes()).To(Equal(expected)) - }) - - It("has the correct min length", func() { - frame := StreamIDBlockedFrame{StreamID: 0x123456} - Expect(frame.Length(0)).To(Equal(protocol.ByteCount(1) + utils.VarIntLen(0x123456))) - }) - }) -}) diff --git a/internal/wire/streams_blocked_frame.go b/internal/wire/streams_blocked_frame.go new file mode 100644 index 00000000..6a79d9a8 --- /dev/null +++ b/internal/wire/streams_blocked_frame.go @@ -0,0 +1,52 @@ +package wire + +import ( + "bytes" + + "github.com/lucas-clemente/quic-go/internal/protocol" + "github.com/lucas-clemente/quic-go/internal/utils" +) + +// A StreamsBlockedFrame is a STREAMS_BLOCKED frame +type StreamsBlockedFrame struct { + Type protocol.StreamType + StreamLimit uint64 +} + +func parseStreamsBlockedFrame(r *bytes.Reader, _ protocol.VersionNumber) (*StreamsBlockedFrame, error) { + typeByte, err := r.ReadByte() + if err != nil { + return nil, err + } + + f := &StreamsBlockedFrame{} + switch typeByte { + case 0xa: + f.Type = protocol.StreamTypeBidi + case 0xb: + f.Type = protocol.StreamTypeUni + } + streamLimit, err := utils.ReadVarInt(r) + if err != nil { + return nil, err + } + f.StreamLimit = streamLimit + + return f, nil +} + +func (f *StreamsBlockedFrame) Write(b *bytes.Buffer, _ protocol.VersionNumber) error { + switch f.Type { + case protocol.StreamTypeBidi: + b.WriteByte(0xa) + case protocol.StreamTypeUni: + b.WriteByte(0xb) + } + utils.WriteVarInt(b, f.StreamLimit) + return nil +} + +// Length of a written frame +func (f *StreamsBlockedFrame) Length(_ protocol.VersionNumber) protocol.ByteCount { + return 1 + utils.VarIntLen(f.StreamLimit) +} diff --git a/internal/wire/streams_blocked_frame_test.go b/internal/wire/streams_blocked_frame_test.go new file mode 100644 index 00000000..875ca69e --- /dev/null +++ b/internal/wire/streams_blocked_frame_test.go @@ -0,0 +1,79 @@ +package wire + +import ( + "bytes" + "io" + + "github.com/lucas-clemente/quic-go/internal/protocol" + "github.com/lucas-clemente/quic-go/internal/utils" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("STREAMS_BLOCKED frame", func() { + Context("parsing", func() { + It("accepts a frame for bidirectional streams", func() { + expected := []byte{0xa} + expected = append(expected, encodeVarInt(0x1337)...) + b := bytes.NewReader(expected) + f, err := parseStreamsBlockedFrame(b, protocol.VersionWhatever) + Expect(err).ToNot(HaveOccurred()) + Expect(f.Type).To(Equal(protocol.StreamTypeBidi)) + Expect(f.StreamLimit).To(BeEquivalentTo(0x1337)) + Expect(b.Len()).To(BeZero()) + }) + + It("accepts a frame for unidirectional streams", func() { + expected := []byte{0xb} + expected = append(expected, encodeVarInt(0x7331)...) + b := bytes.NewReader(expected) + f, err := parseStreamsBlockedFrame(b, protocol.VersionWhatever) + Expect(err).ToNot(HaveOccurred()) + Expect(f.Type).To(Equal(protocol.StreamTypeUni)) + Expect(f.StreamLimit).To(BeEquivalentTo(0x7331)) + Expect(b.Len()).To(BeZero()) + }) + + It("errors on EOFs", func() { + data := []byte{0xa} + data = append(data, encodeVarInt(0x12345678)...) + _, err := parseStreamsBlockedFrame(bytes.NewReader(data), versionIETFFrames) + Expect(err).ToNot(HaveOccurred()) + for i := range data { + _, err := parseStreamsBlockedFrame(bytes.NewReader(data[:i]), versionIETFFrames) + Expect(err).To(MatchError(io.EOF)) + } + }) + }) + + Context("writing", func() { + It("writes a frame for bidirectional streams", func() { + b := &bytes.Buffer{} + f := StreamsBlockedFrame{ + Type: protocol.StreamTypeBidi, + StreamLimit: 0xdeadbeefcafe, + } + Expect(f.Write(b, protocol.VersionWhatever)).To(Succeed()) + expected := []byte{0xa} + expected = append(expected, encodeVarInt(0xdeadbeefcafe)...) + Expect(b.Bytes()).To(Equal(expected)) + }) + + It("writes a frame for unidirectional streams", func() { + b := &bytes.Buffer{} + f := StreamsBlockedFrame{ + Type: protocol.StreamTypeUni, + StreamLimit: 0xdeadbeefcafe, + } + Expect(f.Write(b, protocol.VersionWhatever)).To(Succeed()) + expected := []byte{0xb} + expected = append(expected, encodeVarInt(0xdeadbeefcafe)...) + Expect(b.Bytes()).To(Equal(expected)) + }) + + It("has the correct min length", func() { + frame := StreamsBlockedFrame{StreamLimit: 0x123456} + Expect(frame.Length(0)).To(Equal(protocol.ByteCount(1) + utils.VarIntLen(0x123456))) + }) + }) +}) diff --git a/session.go b/session.go index c4394ef9..31a89c78 100644 --- a/session.go +++ b/session.go @@ -567,7 +567,7 @@ func (s *session) handleFrames(fs []wire.Frame, encLevel protocol.EncryptionLeve err = s.handleMaxStreamsFrame(frame) case *wire.DataBlockedFrame: case *wire.StreamDataBlockedFrame: - case *wire.StreamIDBlockedFrame: + case *wire.StreamsBlockedFrame: case *wire.StopSendingFrame: err = s.handleStopSendingFrame(frame) case *wire.PingFrame: diff --git a/session_test.go b/session_test.go index 796c0086..3fd5bdcf 100644 --- a/session_test.go +++ b/session_test.go @@ -319,7 +319,7 @@ var _ = Describe("Session", func() { }) It("handles STREAM_ID_BLOCKED frames", func() { - err := sess.handleFrames([]wire.Frame{&wire.StreamIDBlockedFrame{}}, protocol.EncryptionUnspecified) + err := sess.handleFrames([]wire.Frame{&wire.StreamsBlockedFrame{}}, protocol.EncryptionUnspecified) Expect(err).NotTo(HaveOccurred()) }) diff --git a/streams_map_outgoing_bidi.go b/streams_map_outgoing_bidi.go index ce3cd8c1..79b8efd5 100644 --- a/streams_map_outgoing_bidi.go +++ b/streams_map_outgoing_bidi.go @@ -19,13 +19,13 @@ type outgoingBidiStreamsMap struct { streams map[protocol.StreamID]streamI - nextStream protocol.StreamID // stream ID of the stream returned by OpenStream(Sync) - maxStream protocol.StreamID // the maximum stream ID we're allowed to open - maxStreamSet bool // was maxStream set. If not, it's not possible to any stream (also works for stream 0) - highestBlocked protocol.StreamID // the highest stream ID that we queued a STREAM_ID_BLOCKED frame for + nextStream protocol.StreamID // stream ID of the stream returned by OpenStream(Sync) + maxStream protocol.StreamID // the maximum stream ID we're allowed to open + maxStreamSet bool // was maxStream set. If not, it's not possible to any stream (also works for stream 0) + blockedSent bool // was a STREAMS_BLOCKED sent for the current maxStream newStream func(protocol.StreamID) streamI - queueStreamIDBlocked func(*wire.StreamIDBlockedFrame) + queueStreamIDBlocked func(*wire.StreamsBlockedFrame) closeErr error } @@ -39,7 +39,7 @@ func newOutgoingBidiStreamsMap( streams: make(map[protocol.StreamID]streamI), nextStream: nextStream, newStream: newStream, - queueStreamIDBlocked: func(f *wire.StreamIDBlockedFrame) { queueControlFrame(f) }, + queueStreamIDBlocked: func(f *wire.StreamsBlockedFrame) { queueControlFrame(f) }, } m.cond.L = &m.mutex return m @@ -73,9 +73,19 @@ func (m *outgoingBidiStreamsMap) openStreamImpl() (streamI, error) { return nil, m.closeErr } if !m.maxStreamSet || m.nextStream > m.maxStream { - if m.maxStream == 0 || m.highestBlocked < m.maxStream { - m.queueStreamIDBlocked(&wire.StreamIDBlockedFrame{StreamID: m.maxStream}) - m.highestBlocked = m.maxStream + if !m.blockedSent { + if m.maxStreamSet { + m.queueStreamIDBlocked(&wire.StreamsBlockedFrame{ + Type: protocol.StreamTypeBidi, + StreamLimit: m.maxStream.StreamNum(), + }) + } else { + m.queueStreamIDBlocked(&wire.StreamsBlockedFrame{ + Type: protocol.StreamTypeBidi, + StreamLimit: 0, + }) + } + m.blockedSent = true } return nil, qerr.TooManyOpenStreams } @@ -112,6 +122,7 @@ func (m *outgoingBidiStreamsMap) SetMaxStream(id protocol.StreamID) { if !m.maxStreamSet || id > m.maxStream { m.maxStream = id m.maxStreamSet = true + m.blockedSent = false m.cond.Broadcast() } m.mutex.Unlock() diff --git a/streams_map_outgoing_generic.go b/streams_map_outgoing_generic.go index 5df1d685..cfaad7c9 100644 --- a/streams_map_outgoing_generic.go +++ b/streams_map_outgoing_generic.go @@ -9,21 +9,21 @@ import ( "github.com/lucas-clemente/quic-go/qerr" ) -//go:generate genny -in $GOFILE -out streams_map_outgoing_bidi.go gen "item=streamI Item=BidiStream" -//go:generate genny -in $GOFILE -out streams_map_outgoing_uni.go gen "item=sendStreamI Item=UniStream" +//go:generate genny -in $GOFILE -out streams_map_outgoing_bidi.go gen "item=streamI Item=BidiStream streamTypeGeneric=protocol.StreamTypeBidi" +//go:generate genny -in $GOFILE -out streams_map_outgoing_uni.go gen "item=sendStreamI Item=UniStream streamTypeGeneric=protocol.StreamTypeUni" type outgoingItemsMap struct { mutex sync.RWMutex cond sync.Cond streams map[protocol.StreamID]item - nextStream protocol.StreamID // stream ID of the stream returned by OpenStream(Sync) - maxStream protocol.StreamID // the maximum stream ID we're allowed to open - maxStreamSet bool // was maxStream set. If not, it's not possible to any stream (also works for stream 0) - highestBlocked protocol.StreamID // the highest stream ID that we queued a STREAM_ID_BLOCKED frame for + nextStream protocol.StreamID // stream ID of the stream returned by OpenStream(Sync) + maxStream protocol.StreamID // the maximum stream ID we're allowed to open + maxStreamSet bool // was maxStream set. If not, it's not possible to any stream (also works for stream 0) + blockedSent bool // was a STREAMS_BLOCKED sent for the current maxStream newStream func(protocol.StreamID) item - queueStreamIDBlocked func(*wire.StreamIDBlockedFrame) + queueStreamIDBlocked func(*wire.StreamsBlockedFrame) closeErr error } @@ -37,7 +37,7 @@ func newOutgoingItemsMap( streams: make(map[protocol.StreamID]item), nextStream: nextStream, newStream: newStream, - queueStreamIDBlocked: func(f *wire.StreamIDBlockedFrame) { queueControlFrame(f) }, + queueStreamIDBlocked: func(f *wire.StreamsBlockedFrame) { queueControlFrame(f) }, } m.cond.L = &m.mutex return m @@ -71,9 +71,19 @@ func (m *outgoingItemsMap) openStreamImpl() (item, error) { return nil, m.closeErr } if !m.maxStreamSet || m.nextStream > m.maxStream { - if m.maxStream == 0 || m.highestBlocked < m.maxStream { - m.queueStreamIDBlocked(&wire.StreamIDBlockedFrame{StreamID: m.maxStream}) - m.highestBlocked = m.maxStream + if !m.blockedSent { + if m.maxStreamSet { + m.queueStreamIDBlocked(&wire.StreamsBlockedFrame{ + Type: streamTypeGeneric, + StreamLimit: m.maxStream.StreamNum(), + }) + } else { + m.queueStreamIDBlocked(&wire.StreamsBlockedFrame{ + Type: streamTypeGeneric, + StreamLimit: 0, + }) + } + m.blockedSent = true } return nil, qerr.TooManyOpenStreams } @@ -110,6 +120,7 @@ func (m *outgoingItemsMap) SetMaxStream(id protocol.StreamID) { if !m.maxStreamSet || id > m.maxStream { m.maxStream = id m.maxStreamSet = true + m.blockedSent = false m.cond.Broadcast() } m.mutex.Unlock() diff --git a/streams_map_outgoing_generic_test.go b/streams_map_outgoing_generic_test.go index 231f8cfe..4bb9a338 100644 --- a/streams_map_outgoing_generic_test.go +++ b/streams_map_outgoing_generic_test.go @@ -12,7 +12,8 @@ import ( ) var _ = Describe("Streams Map (outgoing)", func() { - const firstNewStream protocol.StreamID = 10 + const firstNewStream protocol.StreamID = 3 + var ( m *outgoingItemsMap newItem func(id protocol.StreamID) item @@ -57,16 +58,16 @@ var _ = Describe("Streams Map (outgoing)", func() { }) It("errors when trying to get a stream that has not yet been opened", func() { - _, err := m.GetStream(10) - Expect(err).To(MatchError(qerr.Error(qerr.InvalidStreamID, "peer attempted to open stream 10"))) + _, err := m.GetStream(firstNewStream) + Expect(err).To(MatchError(qerr.Error(qerr.InvalidStreamID, "peer attempted to open stream 3"))) }) It("deletes streams", func() { - _, err := m.OpenStream() // opens stream 10 + _, err := m.OpenStream() // opens firstNewStream Expect(err).ToNot(HaveOccurred()) - err = m.DeleteStream(10) + err = m.DeleteStream(firstNewStream) Expect(err).ToNot(HaveOccurred()) - str, err := m.GetStream(10) + str, err := m.GetStream(firstNewStream) Expect(err).ToNot(HaveOccurred()) Expect(str).To(BeNil()) }) @@ -77,12 +78,12 @@ var _ = Describe("Streams Map (outgoing)", func() { }) It("errors when deleting a stream twice", func() { - _, err := m.OpenStream() // opens stream 10 + _, err := m.OpenStream() // opens firstNewStream Expect(err).ToNot(HaveOccurred()) - err = m.DeleteStream(10) + err = m.DeleteStream(firstNewStream) Expect(err).ToNot(HaveOccurred()) - err = m.DeleteStream(10) - Expect(err).To(MatchError("Tried to delete unknown stream 10")) + err = m.DeleteStream(firstNewStream) + Expect(err).To(MatchError("Tried to delete unknown stream 3")) }) It("closes all streams when CloseWithError is called", func() { @@ -124,7 +125,9 @@ var _ = Describe("Streams Map (outgoing)", func() { It("works with stream 0", func() { m = newOutgoingItemsMap(0, newItem, mockSender.queueControlFrame) - mockSender.EXPECT().queueControlFrame(&wire.StreamIDBlockedFrame{StreamID: 0}) + mockSender.EXPECT().queueControlFrame(gomock.Any()).Do(func(f wire.Frame) { + Expect(f.(*wire.StreamsBlockedFrame).StreamLimit).To(BeZero()) + }) done := make(chan struct{}) go func() { defer GinkgoRecover() @@ -156,25 +159,35 @@ var _ = Describe("Streams Map (outgoing)", func() { }) It("doesn't reduce the stream limit", func() { + m.SetMaxStream(firstNewStream + 4) m.SetMaxStream(firstNewStream) - m.SetMaxStream(firstNewStream - 4) + _, err := m.OpenStream() + Expect(err).ToNot(HaveOccurred()) str, err := m.OpenStream() Expect(err).ToNot(HaveOccurred()) - Expect(str.(*mockGenericStream).id).To(Equal(firstNewStream)) + Expect(str.(*mockGenericStream).id).To(Equal(firstNewStream + 4)) }) It("queues a STREAM_ID_BLOCKED frame if no stream can be opened", func() { - m.SetMaxStream(firstNewStream) - mockSender.EXPECT().queueControlFrame(&wire.StreamIDBlockedFrame{StreamID: firstNewStream}) + m.SetMaxStream(firstNewStream + 5*4) + // open the 6 allowed streams + for i := 0; i < 6; i++ { + _, err := m.OpenStream() + Expect(err).ToNot(HaveOccurred()) + } + + mockSender.EXPECT().queueControlFrame(gomock.Any()).Do(func(f wire.Frame) { + Expect(f.(*wire.StreamsBlockedFrame).StreamLimit).To(BeEquivalentTo(6)) + }) _, err := m.OpenStream() - Expect(err).ToNot(HaveOccurred()) - _, err = m.OpenStream() Expect(err).To(MatchError(qerr.TooManyOpenStreams)) }) It("only sends one STREAM_ID_BLOCKED frame for one stream ID", func() { m.SetMaxStream(firstNewStream) - mockSender.EXPECT().queueControlFrame(&wire.StreamIDBlockedFrame{StreamID: firstNewStream}) + mockSender.EXPECT().queueControlFrame(gomock.Any()).Do(func(f wire.Frame) { + Expect(f.(*wire.StreamsBlockedFrame).StreamLimit).To(BeEquivalentTo(1)) + }) _, err := m.OpenStream() Expect(err).ToNot(HaveOccurred()) // try to open a stream twice, but expect only one STREAM_ID_BLOCKED to be sent diff --git a/streams_map_outgoing_uni.go b/streams_map_outgoing_uni.go index 34243887..dc31387d 100644 --- a/streams_map_outgoing_uni.go +++ b/streams_map_outgoing_uni.go @@ -19,13 +19,13 @@ type outgoingUniStreamsMap struct { streams map[protocol.StreamID]sendStreamI - nextStream protocol.StreamID // stream ID of the stream returned by OpenStream(Sync) - maxStream protocol.StreamID // the maximum stream ID we're allowed to open - maxStreamSet bool // was maxStream set. If not, it's not possible to any stream (also works for stream 0) - highestBlocked protocol.StreamID // the highest stream ID that we queued a STREAM_ID_BLOCKED frame for + nextStream protocol.StreamID // stream ID of the stream returned by OpenStream(Sync) + maxStream protocol.StreamID // the maximum stream ID we're allowed to open + maxStreamSet bool // was maxStream set. If not, it's not possible to any stream (also works for stream 0) + blockedSent bool // was a STREAMS_BLOCKED sent for the current maxStream newStream func(protocol.StreamID) sendStreamI - queueStreamIDBlocked func(*wire.StreamIDBlockedFrame) + queueStreamIDBlocked func(*wire.StreamsBlockedFrame) closeErr error } @@ -39,7 +39,7 @@ func newOutgoingUniStreamsMap( streams: make(map[protocol.StreamID]sendStreamI), nextStream: nextStream, newStream: newStream, - queueStreamIDBlocked: func(f *wire.StreamIDBlockedFrame) { queueControlFrame(f) }, + queueStreamIDBlocked: func(f *wire.StreamsBlockedFrame) { queueControlFrame(f) }, } m.cond.L = &m.mutex return m @@ -73,9 +73,19 @@ func (m *outgoingUniStreamsMap) openStreamImpl() (sendStreamI, error) { return nil, m.closeErr } if !m.maxStreamSet || m.nextStream > m.maxStream { - if m.maxStream == 0 || m.highestBlocked < m.maxStream { - m.queueStreamIDBlocked(&wire.StreamIDBlockedFrame{StreamID: m.maxStream}) - m.highestBlocked = m.maxStream + if !m.blockedSent { + if m.maxStreamSet { + m.queueStreamIDBlocked(&wire.StreamsBlockedFrame{ + Type: protocol.StreamTypeUni, + StreamLimit: m.maxStream.StreamNum(), + }) + } else { + m.queueStreamIDBlocked(&wire.StreamsBlockedFrame{ + Type: protocol.StreamTypeUni, + StreamLimit: 0, + }) + } + m.blockedSent = true } return nil, qerr.TooManyOpenStreams } @@ -112,6 +122,7 @@ func (m *outgoingUniStreamsMap) SetMaxStream(id protocol.StreamID) { if !m.maxStreamSet || id > m.maxStream { m.maxStream = id m.maxStreamSet = true + m.blockedSent = false m.cond.Broadcast() } m.mutex.Unlock()