diff --git a/streams_map.go b/streams_map.go index 86d945fc..bf9374b4 100644 --- a/streams_map.go +++ b/streams_map.go @@ -7,7 +7,6 @@ import ( "github.com/lucas-clemente/quic-go/internal/handshake" "github.com/lucas-clemente/quic-go/internal/protocol" - "github.com/lucas-clemente/quic-go/internal/utils" "github.com/lucas-clemente/quic-go/internal/wire" "github.com/lucas-clemente/quic-go/qerr" ) @@ -28,11 +27,6 @@ type streamsMap struct { nextStreamToAccept protocol.StreamID newStream newStreamLambda - - numOutgoingStreams uint32 - numIncomingStreams uint32 - maxIncomingStreams uint32 - maxOutgoingStreams uint32 } var _ streamManager = &streamsMap{} @@ -42,17 +36,10 @@ type newStreamLambda func(protocol.StreamID) streamI var errMapAccess = errors.New("streamsMap: Error accessing the streams map") func newStreamsMap(newStream newStreamLambda, pers protocol.Perspective) streamManager { - // add some tolerance to the maximum incoming streams value - maxStreams := uint32(protocol.MaxIncomingStreams) - maxIncomingStreams := utils.MaxUint32( - maxStreams+protocol.MaxStreamsMinimumIncrement, - uint32(float64(maxStreams)*float64(protocol.MaxStreamsMultiplier)), - ) sm := streamsMap{ - perspective: pers, - streams: make(map[protocol.StreamID]streamI), - newStream: newStream, - maxIncomingStreams: maxIncomingStreams, + perspective: pers, + streams: make(map[protocol.StreamID]streamI), + newStream: newStream, } sm.nextStreamOrErrCond.L = &sm.mutex sm.openStreamOrErrCond.L = &sm.mutex @@ -134,29 +121,18 @@ func (m *streamsMap) GetOrOpenStream(id protocol.StreamID) (streamI, error) { } func (m *streamsMap) openRemoteStream(id protocol.StreamID) (streamI, error) { - if m.numIncomingStreams >= m.maxIncomingStreams { - return nil, qerr.TooManyOpenStreams - } if id+protocol.MaxNewStreamIDDelta < m.highestStreamOpenedByPeer { return nil, qerr.Error(qerr.InvalidStreamID, fmt.Sprintf("attempted to open stream %d, which is a lot smaller than the highest opened stream, %d", id, m.highestStreamOpenedByPeer)) } - - m.numIncomingStreams++ if id > m.highestStreamOpenedByPeer { m.highestStreamOpenedByPeer = id } - s := m.newStream(id) m.putStream(s) return s, nil } func (m *streamsMap) openStreamImpl() (streamI, error) { - if m.numOutgoingStreams >= m.maxOutgoingStreams { - return nil, qerr.TooManyOpenStreams - } - - m.numOutgoingStreams++ s := m.newStream(m.nextStreamToOpen) m.putStream(s) m.nextStreamToOpen = m.nextStreamID(m.nextStreamToOpen) @@ -222,11 +198,6 @@ func (m *streamsMap) DeleteStream(id protocol.StreamID) error { return errMapAccess } delete(m.streams, id) - if m.streamInitiatedBy(id) == m.perspective { - m.numOutgoingStreams-- - } else { - m.numIncomingStreams-- - } m.openStreamOrErrCond.Signal() return nil } @@ -254,7 +225,6 @@ func (m *streamsMap) CloseWithError(err error) { // TODO(#952): this won't be needed when gQUIC supports stateless handshakes func (m *streamsMap) UpdateLimits(params *handshake.TransportParameters) { m.mutex.Lock() - m.maxOutgoingStreams = params.MaxStreams for id, str := range m.streams { str.handleMaxStreamDataFrame(&wire.MaxStreamDataFrame{ StreamID: id, diff --git a/streams_map_test.go b/streams_map_test.go index ab5b5bc0..9a1fa32b 100644 --- a/streams_map_test.go +++ b/streams_map_test.go @@ -7,7 +7,6 @@ import ( "github.com/lucas-clemente/quic-go/internal/handshake" "github.com/lucas-clemente/quic-go/internal/protocol" "github.com/lucas-clemente/quic-go/internal/wire" - "github.com/lucas-clemente/quic-go/qerr" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -43,8 +42,6 @@ var _ = Describe("Streams Map (for IETF QUIC)", func() { Expect(s).ToNot(BeNil()) Expect(s.StreamID()).To(Equal(protocol.StreamID(1))) Expect(m.streams).To(HaveLen(1)) - Expect(m.numIncomingStreams).To(BeEquivalentTo(1)) - Expect(m.numOutgoingStreams).To(BeZero()) }) It("rejects streams with even IDs", func() { @@ -62,11 +59,11 @@ var _ = Describe("Streams Map (for IETF QUIC)", func() { It("gets existing streams", func() { s, err := m.GetOrOpenStream(5) Expect(err).NotTo(HaveOccurred()) - numStreams := m.numIncomingStreams + numStreams := len(m.streams) s, err = m.GetOrOpenStream(5) Expect(err).NotTo(HaveOccurred()) Expect(s.StreamID()).To(Equal(protocol.StreamID(5))) - Expect(m.numIncomingStreams).To(Equal(numStreams)) + Expect(m.streams).To(HaveLen(numStreams)) }) It("returns nil for closed streams", func() { @@ -95,46 +92,14 @@ var _ = Describe("Streams Map (for IETF QUIC)", func() { Expect(err).ToNot(HaveOccurred()) Expect(str).To(BeNil()) }) - - Context("counting streams", func() { - It("errors when too many streams are opened", func() { - for i := uint32(0); i < m.maxIncomingStreams; i++ { - _, err := m.GetOrOpenStream(protocol.StreamID(i*2 + 1)) - Expect(err).NotTo(HaveOccurred()) - } - _, err := m.GetOrOpenStream(protocol.StreamID(2*m.maxIncomingStreams + 3)) - Expect(err).To(MatchError(qerr.TooManyOpenStreams)) - }) - - It("errors when too many streams are opened implicitely", func() { - _, err := m.GetOrOpenStream(protocol.StreamID(m.maxIncomingStreams*2 + 3)) - Expect(err).To(MatchError(qerr.TooManyOpenStreams)) - }) - - It("does not error when many streams are opened and closed", func() { - for i := uint32(2); i < 10*m.maxIncomingStreams; i++ { - str, err := m.GetOrOpenStream(protocol.StreamID(i*2 + 1)) - Expect(err).NotTo(HaveOccurred()) - deleteStream(str.StreamID()) - } - }) - }) }) Context("server-side streams", func() { - It("doesn't allow opening streams before receiving the transport parameters", func() { - _, err := m.OpenStream() - Expect(err).To(MatchError(qerr.TooManyOpenStreams)) - }) - It("opens a stream 2 first", func() { - m.UpdateLimits(&handshake.TransportParameters{MaxStreams: 10000}) s, err := m.OpenStream() Expect(err).ToNot(HaveOccurred()) Expect(s).ToNot(BeNil()) Expect(s.StreamID()).To(Equal(protocol.StreamID(2))) - Expect(m.numIncomingStreams).To(BeZero()) - Expect(m.numOutgoingStreams).To(BeEquivalentTo(1)) }) It("returns the error when the streamsMap was closed", func() { @@ -145,7 +110,6 @@ var _ = Describe("Streams Map (for IETF QUIC)", func() { }) It("doesn't reopen an already closed stream", func() { - m.UpdateLimits(&handshake.TransportParameters{MaxStreams: 10000}) str, err := m.OpenStream() Expect(err).ToNot(HaveOccurred()) Expect(str.StreamID()).To(Equal(protocol.StreamID(2))) @@ -156,96 +120,7 @@ var _ = Describe("Streams Map (for IETF QUIC)", func() { Expect(str).To(BeNil()) }) - Context("counting streams", func() { - const maxOutgoingStreams = 50 - - BeforeEach(func() { - m.UpdateLimits(&handshake.TransportParameters{MaxStreams: maxOutgoingStreams}) - - }) - - It("errors when too many streams are opened", func() { - for i := 1; i <= maxOutgoingStreams; i++ { - _, err := m.OpenStream() - Expect(err).NotTo(HaveOccurred()) - } - _, err := m.OpenStream() - Expect(err).To(MatchError(qerr.TooManyOpenStreams)) - }) - - It("does not error when many streams are opened and closed", func() { - for i := 2; i < 10*maxOutgoingStreams; i++ { - str, err := m.OpenStream() - Expect(err).NotTo(HaveOccurred()) - deleteStream(str.StreamID()) - } - }) - - It("allows many server- and client-side streams at the same time", func() { - for i := 1; i < maxOutgoingStreams; i++ { - _, err := m.OpenStream() - Expect(err).ToNot(HaveOccurred()) - } - for i := 0; i < maxOutgoingStreams; i++ { - _, err := m.GetOrOpenStream(protocol.StreamID(2*i + 1)) - Expect(err).ToNot(HaveOccurred()) - } - }) - }) - Context("opening streams synchronously", func() { - const maxOutgoingStreams = 10 - - BeforeEach(func() { - m.UpdateLimits(&handshake.TransportParameters{MaxStreams: maxOutgoingStreams}) - }) - - openMaxNumStreams := func() { - for i := 1; i <= maxOutgoingStreams; i++ { - _, err := m.OpenStream() - Expect(err).NotTo(HaveOccurred()) - } - _, err := m.OpenStream() - Expect(err).To(MatchError(qerr.TooManyOpenStreams)) - } - - It("waits until another stream is closed", func() { - openMaxNumStreams() - var str Stream - done := make(chan struct{}) - go func() { - defer GinkgoRecover() - var err error - str, err = m.OpenStreamSync() - Expect(err).ToNot(HaveOccurred()) - close(done) - }() - Consistently(done).ShouldNot(BeClosed()) - deleteStream(6) - Eventually(done).Should(BeClosed()) - Expect(str.StreamID()).To(Equal(protocol.StreamID(2*maxOutgoingStreams + 2))) - }) - - It("stops waiting when an error is registered", func() { - testErr := errors.New("test error") - openMaxNumStreams() - for _, str := range m.streams { - str.(*MockStreamI).EXPECT().closeForShutdown(testErr) - } - - done := make(chan struct{}) - go func() { - defer GinkgoRecover() - _, err := m.OpenStreamSync() - Expect(err).To(MatchError(testErr)) - close(done) - }() - - Consistently(done).ShouldNot(BeClosed()) - m.CloseWithError(testErr) - Eventually(done).Should(BeClosed()) - }) - It("immediately returns when OpenStreamSync is called after an error was registered", func() { testErr := errors.New("test error") m.CloseWithError(testErr) @@ -404,7 +279,6 @@ var _ = Describe("Streams Map (for IETF QUIC)", func() { Context("as a client", func() { BeforeEach(func() { setNewStreamsMap(protocol.PerspectiveClient) - m.UpdateLimits(&handshake.TransportParameters{MaxStreams: 10000}) }) Context("server-side streams", func() { @@ -425,8 +299,6 @@ var _ = Describe("Streams Map (for IETF QUIC)", func() { Expect(err).NotTo(HaveOccurred()) Expect(s.StreamID()).To(Equal(protocol.StreamID(2))) Expect(m.streams).To(HaveLen(1)) - Expect(m.numOutgoingStreams).To(BeZero()) - Expect(m.numIncomingStreams).To(BeEquivalentTo(1)) }) It("opens skipped streams", func() { @@ -435,8 +307,6 @@ var _ = Describe("Streams Map (for IETF QUIC)", func() { Expect(m.streams).To(HaveKey(protocol.StreamID(2))) Expect(m.streams).To(HaveKey(protocol.StreamID(4))) Expect(m.streams).To(HaveKey(protocol.StreamID(6))) - Expect(m.numOutgoingStreams).To(BeZero()) - Expect(m.numIncomingStreams).To(BeEquivalentTo(3)) }) It("doesn't reopen an already closed stream", func() { @@ -454,13 +324,10 @@ var _ = Describe("Streams Map (for IETF QUIC)", func() { Context("client-side streams", func() { It("starts with stream 1", func() { setNewStreamsMap(protocol.PerspectiveClient) - m.UpdateLimits(&handshake.TransportParameters{MaxStreams: 10000}) s, err := m.OpenStream() Expect(err).ToNot(HaveOccurred()) Expect(s).ToNot(BeNil()) Expect(s.StreamID()).To(BeEquivalentTo(1)) - Expect(m.numOutgoingStreams).To(BeEquivalentTo(1)) - Expect(m.numIncomingStreams).To(BeZero()) }) It("opens multiple streams", func() { @@ -510,24 +377,19 @@ var _ = Describe("Streams Map (for IETF QUIC)", func() { It("deletes an incoming stream", func() { _, err := m.GetOrOpenStream(3) // open stream 1 and 3 Expect(err).ToNot(HaveOccurred()) - Expect(m.numIncomingStreams).To(BeEquivalentTo(2)) err = m.DeleteStream(1) Expect(err).ToNot(HaveOccurred()) Expect(m.streams).To(HaveLen(1)) Expect(m.streams).To(HaveKey(protocol.StreamID(3))) - Expect(m.numIncomingStreams).To(BeEquivalentTo(1)) }) It("deletes an outgoing stream", func() { - m.UpdateLimits(&handshake.TransportParameters{MaxStreams: 10000}) _, err := m.OpenStream() // open stream 2 Expect(err).ToNot(HaveOccurred()) _, err = m.OpenStream() Expect(err).ToNot(HaveOccurred()) - Expect(m.numOutgoingStreams).To(BeEquivalentTo(2)) err = m.DeleteStream(2) Expect(err).ToNot(HaveOccurred()) - Expect(m.numOutgoingStreams).To(BeEquivalentTo(1)) }) It("errors when the stream doesn't exist", func() {