implement sending of MAX_STREAM_ID frames

We can now impose a limit on the number of stream for IETF QUIC, and
advertise that in the transport parameters during the handshake.
This commit is contained in:
Marten Seemann 2018-02-05 21:09:05 +08:00
parent e36b8d8e30
commit 8e332c2e13
11 changed files with 208 additions and 41 deletions

View file

@ -171,9 +171,9 @@ func (c *client) dialTLS() error {
ConnectionFlowControlWindow: protocol.ReceiveConnectionFlowControlWindow,
IdleTimeout: c.config.IdleTimeout,
OmitConnectionID: c.config.RequestConnectionIDOmission,
// TODO(#1150): set reasonable limits
MaxBidiStreamID: 0xffffffff,
MaxUniStreamID: 0xffffffff,
// TODO(#523): make these values configurable
MaxBidiStreamID: protocol.MaxBidiStreamID(protocol.MaxIncomingStreams, protocol.PerspectiveClient),
MaxUniStreamID: protocol.MaxUniStreamID(protocol.MaxIncomingStreams, protocol.PerspectiveClient),
}
csc := handshake.NewCryptoStreamConn(nil)
extHandler := handshake.NewExtensionHandlerClient(params, c.initialVersion, c.config.Versions, c.version)

View file

@ -55,9 +55,6 @@ func (t PacketType) String() string {
// A ConnectionID in QUIC
type ConnectionID uint64
// A StreamID in QUIC
type StreamID uint64
// A ByteCount in QUIC
type ByteCount uint64

View file

@ -0,0 +1,36 @@
package protocol
// A StreamID in QUIC
type StreamID uint64
// MaxBidiStreamID is the highest stream ID that the peer is allowed to open,
// when it is allowed to open numStreams bidirectional streams.
// It is only valid for IETF QUIC.
func MaxBidiStreamID(numStreams int, pers Perspective) StreamID {
if numStreams == 0 {
return 0
}
var first StreamID
if pers == PerspectiveClient {
first = 1
} else {
first = 4
}
return first + 4*StreamID(numStreams-1)
}
// MaxUniStreamID is the highest stream ID that the peer is allowed to open,
// when it is allowed to open numStreams unidirectional streams.
// It is only valid for IETF QUIC.
func MaxUniStreamID(numStreams int, pers Perspective) StreamID {
if numStreams == 0 {
return 0
}
var first StreamID
if pers == PerspectiveClient {
first = 3
} else {
first = 2
}
return first + 4*StreamID(numStreams-1)
}

View file

@ -0,0 +1,42 @@
package protocol
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
var _ = Describe("Stream ID", func() {
Context("bidirectional streams", func() {
It("doesn't allow any", func() {
Expect(MaxBidiStreamID(0, PerspectiveClient)).To(Equal(StreamID(0)))
Expect(MaxBidiStreamID(0, PerspectiveServer)).To(Equal(StreamID(0)))
})
It("allows one", func() {
Expect(MaxBidiStreamID(1, PerspectiveClient)).To(Equal(StreamID(1)))
Expect(MaxBidiStreamID(1, PerspectiveServer)).To(Equal(StreamID(4)))
})
It("allows many", func() {
Expect(MaxBidiStreamID(100, PerspectiveClient)).To(Equal(StreamID(397)))
Expect(MaxBidiStreamID(100, PerspectiveServer)).To(Equal(StreamID(400)))
})
})
Context("unidirectional streams", func() {
It("doesn't allow any", func() {
Expect(MaxUniStreamID(0, PerspectiveClient)).To(Equal(StreamID(0)))
Expect(MaxUniStreamID(0, PerspectiveServer)).To(Equal(StreamID(0)))
})
It("allows one", func() {
Expect(MaxUniStreamID(1, PerspectiveClient)).To(Equal(StreamID(3)))
Expect(MaxUniStreamID(1, PerspectiveServer)).To(Equal(StreamID(2)))
})
It("allows many", func() {
Expect(MaxUniStreamID(100, PerspectiveClient)).To(Equal(StreamID(399)))
Expect(MaxUniStreamID(100, PerspectiveServer)).To(Equal(StreamID(398)))
})
})
})

View file

@ -67,9 +67,9 @@ func newServerTLS(
StreamFlowControlWindow: protocol.ReceiveStreamFlowControlWindow,
ConnectionFlowControlWindow: protocol.ReceiveConnectionFlowControlWindow,
IdleTimeout: config.IdleTimeout,
// TODO(#1150): set reasonable limits
MaxBidiStreamID: 0xffffffff,
MaxUniStreamID: 0xffffffff,
// TODO(#523): make these values configurable
MaxBidiStreamID: protocol.MaxBidiStreamID(protocol.MaxIncomingStreams, protocol.PerspectiveServer),
MaxUniStreamID: protocol.MaxUniStreamID(protocol.MaxIncomingStreams, protocol.PerspectiveServer),
},
}
s.newMintConn = s.newMintConnImpl

View file

@ -2,7 +2,6 @@ package quic
import (
"fmt"
"math"
"github.com/lucas-clemente/quic-go/internal/flowcontrol"
"github.com/lucas-clemente/quic-go/internal/handshake"
@ -66,11 +65,23 @@ func newStreamsMap(
return newReceiveStream(id, m.sender, m.newFlowController(id), version)
}
m.outgoingBidiStreams = newOutgoingBidiStreamsMap(firstOutgoingBidiStream, newBidiStream)
// TODO(#1150): use a reasonable stream limit
m.incomingBidiStreams = newIncomingBidiStreamsMap(firstIncomingBidiStream, protocol.StreamID(math.MaxUint32), newBidiStream)
// TODO(#523): make these values configurable
m.incomingBidiStreams = newIncomingBidiStreamsMap(
firstIncomingBidiStream,
protocol.MaxBidiStreamID(protocol.MaxIncomingStreams, perspective),
protocol.MaxIncomingStreams,
sender.queueControlFrame,
newBidiStream,
)
m.outgoingUniStreams = newOutgoingUniStreamsMap(firstOutgoingUniStream, newUniSendStream)
// TODO(#1150): use a reasonable stream limit
m.incomingUniStreams = newIncomingUniStreamsMap(firstIncomingUniStream, protocol.StreamID(math.MaxUint32), newUniReceiveStream)
// TODO(#523): make these values configurable
m.incomingUniStreams = newIncomingUniStreamsMap(
firstIncomingUniStream,
protocol.MaxUniStreamID(protocol.MaxIncomingStreams, perspective),
protocol.MaxIncomingStreams,
sender.queueControlFrame,
newUniReceiveStream,
)
return m
}

View file

@ -9,6 +9,7 @@ import (
"sync"
"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/wire"
)
type incomingBidiStreamsMap struct {
@ -20,21 +21,28 @@ type incomingBidiStreamsMap struct {
nextStream protocol.StreamID // the next stream that will be returned by AcceptStream()
highestStream protocol.StreamID // the highest stream that the peer openend
maxStream protocol.StreamID // the highest stream that the peer is allowed to open
newStream func(protocol.StreamID) streamI
maxNumStreams int // maximum number of streams
newStream func(protocol.StreamID) streamI
queueMaxStreamID func(*wire.MaxStreamIDFrame)
closeErr error
}
func newIncomingBidiStreamsMap(
nextStream protocol.StreamID,
maxStream protocol.StreamID,
initialMaxStreamID protocol.StreamID,
maxNumStreams int,
queueControlFrame func(wire.Frame),
newStream func(protocol.StreamID) streamI,
) *incomingBidiStreamsMap {
m := &incomingBidiStreamsMap{
streams: make(map[protocol.StreamID]streamI),
nextStream: nextStream,
maxStream: maxStream,
newStream: newStream,
streams: make(map[protocol.StreamID]streamI),
nextStream: nextStream,
maxStream: initialMaxStreamID,
maxNumStreams: maxNumStreams,
newStream: newStream,
queueMaxStreamID: func(f *wire.MaxStreamIDFrame) { queueControlFrame(f) },
}
m.cond.L = &m.mutex
return m
@ -99,6 +107,11 @@ func (m *incomingBidiStreamsMap) DeleteStream(id protocol.StreamID) error {
return fmt.Errorf("Tried to delete unknown stream %d", id)
}
delete(m.streams, id)
// queue a MAX_STREAM_ID frame, giving the peer the option to open a new stream
if numNewStreams := m.maxNumStreams - len(m.streams); numNewStreams > 0 {
m.maxStream = m.highestStream + protocol.StreamID(numNewStreams*4)
m.queueMaxStreamID(&wire.MaxStreamIDFrame{StreamID: m.maxStream})
}
return nil
}

View file

@ -5,6 +5,7 @@ import (
"sync"
"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/wire"
)
//go:generate genny -in $GOFILE -out streams_map_incoming_bidi.go gen "item=streamI Item=BidiStream"
@ -18,21 +19,28 @@ type incomingItemsMap struct {
nextStream protocol.StreamID // the next stream that will be returned by AcceptStream()
highestStream protocol.StreamID // the highest stream that the peer openend
maxStream protocol.StreamID // the highest stream that the peer is allowed to open
newStream func(protocol.StreamID) item
maxNumStreams int // maximum number of streams
newStream func(protocol.StreamID) item
queueMaxStreamID func(*wire.MaxStreamIDFrame)
closeErr error
}
func newIncomingItemsMap(
nextStream protocol.StreamID,
maxStream protocol.StreamID,
initialMaxStreamID protocol.StreamID,
maxNumStreams int,
queueControlFrame func(wire.Frame),
newStream func(protocol.StreamID) item,
) *incomingItemsMap {
m := &incomingItemsMap{
streams: make(map[protocol.StreamID]item),
nextStream: nextStream,
maxStream: maxStream,
newStream: newStream,
streams: make(map[protocol.StreamID]item),
nextStream: nextStream,
maxStream: initialMaxStreamID,
maxNumStreams: maxNumStreams,
newStream: newStream,
queueMaxStreamID: func(f *wire.MaxStreamIDFrame) { queueControlFrame(f) },
}
m.cond.L = &m.mutex
return m
@ -97,6 +105,11 @@ func (m *incomingItemsMap) DeleteStream(id protocol.StreamID) error {
return fmt.Errorf("Tried to delete unknown stream %d", id)
}
delete(m.streams, id)
// queue a MAX_STREAM_ID frame, giving the peer the option to open a new stream
if numNewStreams := m.maxNumStreams - len(m.streams); numNewStreams > 0 {
m.maxStream = m.highestStream + protocol.StreamID(numNewStreams*4)
m.queueMaxStreamID(&wire.MaxStreamIDFrame{StreamID: m.maxStream})
}
return nil
}

View file

@ -4,7 +4,9 @@ import (
"errors"
"fmt"
"github.com/golang/mock/gomock"
"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/wire"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
@ -12,14 +14,16 @@ import (
var _ = Describe("Streams Map (outgoing)", func() {
const (
firstNewStream protocol.StreamID = 20
maxStream protocol.StreamID = firstNewStream + 4*100
firstNewStream protocol.StreamID = 20
maxNumStreams int = 10
initialMaxStream protocol.StreamID = firstNewStream + 4*protocol.StreamID(maxNumStreams-1)
)
var (
m *incomingItemsMap
newItem func(id protocol.StreamID) item
newItemCounter int
mockSender *MockStreamSender
)
BeforeEach(func() {
@ -28,7 +32,8 @@ var _ = Describe("Streams Map (outgoing)", func() {
newItemCounter++
return id
}
m = newIncomingItemsMap(firstNewStream, maxStream, newItem)
mockSender = NewMockStreamSender(mockCtrl)
m = newIncomingItemsMap(firstNewStream, initialMaxStream, maxNumStreams, mockSender.queueControlFrame, newItem)
})
It("opens all streams up to the id on GetOrOpenStream", func() {
@ -59,14 +64,14 @@ var _ = Describe("Streams Map (outgoing)", func() {
})
It("allows opening the maximum stream ID", func() {
str, err := m.GetOrOpenStream(maxStream)
str, err := m.GetOrOpenStream(initialMaxStream)
Expect(err).ToNot(HaveOccurred())
Expect(str).To(Equal(maxStream))
Expect(str).To(Equal(initialMaxStream))
})
It("errors when trying to get a stream ID higher than the maximum", func() {
_, err := m.GetOrOpenStream(maxStream + 4)
Expect(err).To(MatchError(fmt.Errorf("peer tried to open stream %d (current limit: %d)", maxStream+4, maxStream)))
_, err := m.GetOrOpenStream(initialMaxStream + 4)
Expect(err).To(MatchError(fmt.Errorf("peer tried to open stream %d (current limit: %d)", initialMaxStream+4, initialMaxStream)))
})
It("blocks AcceptStream until a new stream is available", func() {
@ -106,6 +111,7 @@ var _ = Describe("Streams Map (outgoing)", func() {
})
It("deletes streams", func() {
mockSender.EXPECT().queueControlFrame(gomock.Any())
_, err := m.GetOrOpenStream(20)
Expect(err).ToNot(HaveOccurred())
err = m.DeleteStream(20)
@ -119,4 +125,14 @@ var _ = Describe("Streams Map (outgoing)", func() {
err := m.DeleteStream(1337)
Expect(err).To(MatchError("Tried to delete unknown stream 1337"))
})
It("sends MAX_STREAM_ID frames when streams are deleted", func() {
// open a bunch of streams
_, err := m.GetOrOpenStream(firstNewStream + 4*4)
Expect(err).ToNot(HaveOccurred())
mockSender.EXPECT().queueControlFrame(&wire.MaxStreamIDFrame{StreamID: initialMaxStream + 4})
Expect(m.DeleteStream(firstNewStream + 4)).To(Succeed())
mockSender.EXPECT().queueControlFrame(&wire.MaxStreamIDFrame{StreamID: initialMaxStream + 8})
Expect(m.DeleteStream(firstNewStream + 3*4)).To(Succeed())
})
})

View file

@ -9,6 +9,7 @@ import (
"sync"
"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/wire"
)
type incomingUniStreamsMap struct {
@ -20,21 +21,28 @@ type incomingUniStreamsMap struct {
nextStream protocol.StreamID // the next stream that will be returned by AcceptStream()
highestStream protocol.StreamID // the highest stream that the peer openend
maxStream protocol.StreamID // the highest stream that the peer is allowed to open
newStream func(protocol.StreamID) receiveStreamI
maxNumStreams int // maximum number of streams
newStream func(protocol.StreamID) receiveStreamI
queueMaxStreamID func(*wire.MaxStreamIDFrame)
closeErr error
}
func newIncomingUniStreamsMap(
nextStream protocol.StreamID,
maxStream protocol.StreamID,
initialMaxStreamID protocol.StreamID,
maxNumStreams int,
queueControlFrame func(wire.Frame),
newStream func(protocol.StreamID) receiveStreamI,
) *incomingUniStreamsMap {
m := &incomingUniStreamsMap{
streams: make(map[protocol.StreamID]receiveStreamI),
nextStream: nextStream,
maxStream: maxStream,
newStream: newStream,
streams: make(map[protocol.StreamID]receiveStreamI),
nextStream: nextStream,
maxStream: initialMaxStreamID,
maxNumStreams: maxNumStreams,
newStream: newStream,
queueMaxStreamID: func(f *wire.MaxStreamIDFrame) { queueControlFrame(f) },
}
m.cond.L = &m.mutex
return m
@ -99,6 +107,11 @@ func (m *incomingUniStreamsMap) DeleteStream(id protocol.StreamID) error {
return fmt.Errorf("Tried to delete unknown stream %d", id)
}
delete(m.streams, id)
// queue a MAX_STREAM_ID frame, giving the peer the option to open a new stream
if numNewStreams := m.maxNumStreams - len(m.streams); numNewStreams > 0 {
m.maxStream = m.highestStream + protocol.StreamID(numNewStreams*4)
m.queueMaxStreamID(&wire.MaxStreamIDFrame{StreamID: m.maxStream})
}
return nil
}

View file

@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"github.com/golang/mock/gomock"
"github.com/lucas-clemente/quic-go/internal/flowcontrol"
"github.com/lucas-clemente/quic-go/internal/handshake"
"github.com/lucas-clemente/quic-go/internal/mocks"
@ -50,7 +51,10 @@ var _ = Describe("Streams Map (for IETF QUIC)", func() {
}
Context(perspective.String(), func() {
var m *streamsMap
var (
m *streamsMap
mockSender *MockStreamSender
)
allowUnlimitedStreams := func() {
m.UpdateLimits(&handshake.TransportParameters{
@ -60,7 +64,8 @@ var _ = Describe("Streams Map (for IETF QUIC)", func() {
}
BeforeEach(func() {
m = newStreamsMap(nil, newFlowController, perspective, versionIETFFrames).(*streamsMap)
mockSender = NewMockStreamSender(mockCtrl)
m = newStreamsMap(mockSender, newFlowController, perspective, versionIETFFrames).(*streamsMap)
})
Context("opening", func() {
@ -111,6 +116,7 @@ var _ = Describe("Streams Map (for IETF QUIC)", func() {
Context("deleting", func() {
BeforeEach(func() {
mockSender.EXPECT().queueControlFrame(gomock.Any()).AnyTimes()
allowUnlimitedStreams()
})
@ -306,6 +312,26 @@ var _ = Describe("Streams Map (for IETF QUIC)", func() {
})
})
Context("sending MAX_STREAM_ID frames", func() {
It("sends MAX_STREAM_ID frames for bidirectional streams", func() {
_, err := m.GetOrOpenReceiveStream(ids.firstIncomingBidiStream + 4*10)
Expect(err).ToNot(HaveOccurred())
mockSender.EXPECT().queueControlFrame(&wire.MaxStreamIDFrame{
StreamID: protocol.MaxBidiStreamID(protocol.MaxIncomingStreams, perspective) + 4,
})
Expect(m.DeleteStream(ids.firstIncomingBidiStream)).To(Succeed())
})
It("sends MAX_STREAM_ID frames for unidirectional streams", func() {
_, err := m.GetOrOpenReceiveStream(ids.firstIncomingUniStream + 4*10)
Expect(err).ToNot(HaveOccurred())
mockSender.EXPECT().queueControlFrame(&wire.MaxStreamIDFrame{
StreamID: protocol.MaxUniStreamID(protocol.MaxIncomingStreams, perspective) + 4,
})
Expect(m.DeleteStream(ids.firstIncomingUniStream)).To(Succeed())
})
})
It("closes", func() {
testErr := errors.New("test error")
m.CloseWithError(testErr)