correctly read MIDS and MSPC parameter

fixes #367
This commit is contained in:
Marten Seemann 2016-12-08 16:15:31 +07:00
parent 11d786fe28
commit 5af50c8fd0
No known key found for this signature in database
GPG key ID: 3603F40B121FCDEA
10 changed files with 170 additions and 77 deletions

View file

@ -17,15 +17,19 @@ import (
type ConnectionParametersManager struct {
mutex sync.RWMutex
flowControlNegotiated bool // have the flow control parameters for sending already been negotiated
version protocol.VersionNumber
truncateConnectionID bool
maxStreamsPerConnection uint32
idleConnectionStateLifetime time.Duration
sendStreamFlowControlWindow protocol.ByteCount
sendConnectionFlowControlWindow protocol.ByteCount
receiveStreamFlowControlWindow protocol.ByteCount
receiveConnectionFlowControlWindow protocol.ByteCount
flowControlNegotiated bool
hasReceivedMaxIncomingDynamicStreams bool
truncateConnectionID bool
maxStreamsPerConnection uint32
maxIncomingDynamicStreamsPerConnection uint32
idleConnectionStateLifetime time.Duration
sendStreamFlowControlWindow protocol.ByteCount
sendConnectionFlowControlWindow protocol.ByteCount
receiveStreamFlowControlWindow protocol.ByteCount
receiveConnectionFlowControlWindow protocol.ByteCount
}
var errTagNotInConnectionParameterMap = errors.New("ConnectionParametersManager: Tag not found in ConnectionsParameter map")
@ -37,14 +41,16 @@ var (
)
// NewConnectionParamatersManager creates a new connection parameters manager
func NewConnectionParamatersManager() *ConnectionParametersManager {
func NewConnectionParamatersManager(v protocol.VersionNumber) *ConnectionParametersManager {
return &ConnectionParametersManager{
idleConnectionStateLifetime: protocol.DefaultIdleTimeout,
sendStreamFlowControlWindow: protocol.InitialStreamFlowControlWindow, // can only be changed by the client
sendConnectionFlowControlWindow: protocol.InitialConnectionFlowControlWindow, // can only be changed by the client
receiveStreamFlowControlWindow: protocol.ReceiveStreamFlowControlWindow,
receiveConnectionFlowControlWindow: protocol.ReceiveConnectionFlowControlWindow,
maxStreamsPerConnection: protocol.MaxStreamsPerConnection,
version: v,
idleConnectionStateLifetime: protocol.DefaultIdleTimeout,
sendStreamFlowControlWindow: protocol.InitialStreamFlowControlWindow, // can only be changed by the client
sendConnectionFlowControlWindow: protocol.InitialConnectionFlowControlWindow, // can only be changed by the client
receiveStreamFlowControlWindow: protocol.ReceiveStreamFlowControlWindow,
receiveConnectionFlowControlWindow: protocol.ReceiveConnectionFlowControlWindow,
maxStreamsPerConnection: protocol.MaxStreamsPerConnection, // this is the value negotiated based on what the client sent
maxIncomingDynamicStreamsPerConnection: protocol.MaxStreamsPerConnection, // "incoming" seen from the client's perspective
}
}
@ -67,6 +73,13 @@ func (h *ConnectionParametersManager) SetFromMap(params map[Tag][]byte) error {
return ErrMalformedTag
}
h.maxStreamsPerConnection = h.negotiateMaxStreamsPerConnection(clientValue)
case TagMIDS:
clientValue, err := utils.ReadUint32(bytes.NewBuffer(value))
if err != nil {
return ErrMalformedTag
}
h.maxIncomingDynamicStreamsPerConnection = h.negotiateMaxIncomingDynamicStreamsPerConnection(clientValue)
h.hasReceivedMaxIncomingDynamicStreams = true
case TagICSL:
clientValue, err := utils.ReadUint32(bytes.NewBuffer(value))
if err != nil {
@ -107,6 +120,10 @@ func (h *ConnectionParametersManager) negotiateMaxStreamsPerConnection(clientVal
return utils.MinUint32(clientValue, protocol.MaxStreamsPerConnection)
}
func (h *ConnectionParametersManager) negotiateMaxIncomingDynamicStreamsPerConnection(clientValue uint32) uint32 {
return utils.MinUint32(clientValue, protocol.MaxIncomingDynamicStreamsPerConnection)
}
func (h *ConnectionParametersManager) negotiateIdleConnectionStateLifetime(clientValue time.Duration) time.Duration {
return utils.MinDuration(clientValue, protocol.MaxIdleTimeout)
}
@ -118,19 +135,24 @@ func (h *ConnectionParametersManager) GetSHLOMap() map[Tag][]byte {
cfcw := bytes.NewBuffer([]byte{})
utils.WriteUint32(cfcw, uint32(h.GetReceiveConnectionFlowControlWindow()))
mspc := bytes.NewBuffer([]byte{})
utils.WriteUint32(mspc, h.GetMaxStreamsPerConnection())
mids := bytes.NewBuffer([]byte{})
utils.WriteUint32(mids, protocol.MaxIncomingDynamicStreams)
utils.WriteUint32(mspc, h.maxStreamsPerConnection)
icsl := bytes.NewBuffer([]byte{})
utils.WriteUint32(icsl, uint32(h.GetIdleConnectionStateLifetime()/time.Second))
return map[Tag][]byte{
tags := map[Tag][]byte{
TagICSL: icsl.Bytes(),
TagMSPC: mspc.Bytes(),
TagMIDS: mids.Bytes(),
TagCFCW: cfcw.Bytes(),
TagSFCW: sfcw.Bytes(),
}
if h.version > protocol.Version34 {
mids := bytes.NewBuffer([]byte{})
utils.WriteUint32(mids, protocol.MaxIncomingDynamicStreamsPerConnection)
tags[TagMIDS] = mids.Bytes()
}
return tags
}
// GetSendStreamFlowControlWindow gets the size of the stream-level flow control window for sending data
@ -161,13 +183,32 @@ func (h *ConnectionParametersManager) GetReceiveConnectionFlowControlWindow() pr
return h.receiveConnectionFlowControlWindow
}
// GetMaxStreamsPerConnection gets the maximum number of streams per connection
func (h *ConnectionParametersManager) GetMaxStreamsPerConnection() uint32 {
// GetMaxOutgoingStreams gets the maximum number of outgoing streams per connection
func (h *ConnectionParametersManager) GetMaxOutgoingStreams() uint32 {
h.mutex.RLock()
defer h.mutex.RUnlock()
if h.version > protocol.Version34 && h.hasReceivedMaxIncomingDynamicStreams {
return h.maxIncomingDynamicStreamsPerConnection
}
return h.maxStreamsPerConnection
}
// GetMaxIncomingStreams get the maximum number of incoming streams per connection
func (h *ConnectionParametersManager) GetMaxIncomingStreams() uint32 {
h.mutex.RLock()
defer h.mutex.RUnlock()
var val uint32
if h.version <= protocol.Version34 {
val = h.maxStreamsPerConnection
} else {
val = protocol.MaxIncomingDynamicStreamsPerConnection
}
return utils.MaxUint32(val+protocol.MaxStreamsMinimumIncrement, uint32(float64(val)*protocol.MaxStreamsMultiplier))
}
// GetIdleConnectionStateLifetime gets the idle timeout
func (h *ConnectionParametersManager) GetIdleConnectionStateLifetime() time.Duration {
h.mutex.RLock()

View file

@ -11,7 +11,7 @@ import (
var _ = Describe("ConnectionsParameterManager", func() {
var cpm *ConnectionParametersManager
BeforeEach(func() {
cpm = NewConnectionParamatersManager()
cpm = NewConnectionParamatersManager(protocol.Version36)
})
Context("SHLO", func() {
@ -22,6 +22,12 @@ var _ = Describe("ConnectionsParameterManager", func() {
Expect(entryMap).To(HaveKey(TagMIDS))
})
It("doesn't add the MaximumIncomingDynamicStreams tag for QUIC 34", func() {
cpm.version = protocol.Version34
entryMap := cpm.GetSHLOMap()
Expect(entryMap).ToNot(HaveKey(TagMIDS))
})
It("sets the stream-level flow control windows in SHLO", func() {
cpm.receiveStreamFlowControlWindow = 0xDEADBEEF
entryMap := cpm.GetSHLOMap()
@ -43,17 +49,20 @@ var _ = Describe("ConnectionsParameterManager", func() {
Expect(entryMap[TagICSL]).To(Equal([]byte{0xAD, 0xFB, 0xCA, 0xDE}))
})
It("sets the maximum streams per connection in SHLO", func() {
cpm.maxStreamsPerConnection = 0xDEADBEEF
It("sets the negotiated value for maximum streams in the SHLO", func() {
val := 50
Expect(val).To(BeNumerically("<", protocol.MaxStreamsPerConnection))
err := cpm.SetFromMap(map[Tag][]byte{TagMSPC: []byte{byte(val), 0, 0, 0}})
Expect(err).ToNot(HaveOccurred())
entryMap := cpm.GetSHLOMap()
Expect(entryMap).To(HaveKey(TagMSPC))
Expect(entryMap[TagMSPC]).To(Equal([]byte{0xEF, 0xBE, 0xAD, 0xDE}))
Expect(entryMap[TagMSPC]).To(Equal([]byte{byte(val), 0, 0, 0}))
})
It("sets the maximum incoming dynamic streams per connection in SHLO", func() {
It("always sends its own value for the maximum incoming dynamic streams in the SHLO", func() {
err := cpm.SetFromMap(map[Tag][]byte{TagMIDS: []byte{5, 0, 0, 0}})
Expect(err).ToNot(HaveOccurred())
entryMap := cpm.GetSHLOMap()
Expect(entryMap).To(HaveKey(TagMIDS))
Expect(entryMap[TagMIDS]).To(Equal([]byte{100, 0, 0, 0}))
Expect(entryMap[TagMIDS]).To(Equal([]byte{byte(protocol.MaxIncomingDynamicStreamsPerConnection), 0, 0, 0}))
})
})
@ -182,24 +191,6 @@ var _ = Describe("ConnectionsParameterManager", func() {
})
Context("max streams per connection", func() {
It("negotiates correctly when the client wants a larger number", func() {
Expect(cpm.negotiateMaxStreamsPerConnection(protocol.MaxStreamsPerConnection + 10)).To(Equal(uint32(protocol.MaxStreamsPerConnection)))
})
It("negotiates correctly when the client wants a smaller number", func() {
Expect(cpm.negotiateMaxStreamsPerConnection(protocol.MaxStreamsPerConnection - 1)).To(Equal(uint32(protocol.MaxStreamsPerConnection - 1)))
})
It("sets the negotiated max streams per connection value", func() {
// this test only works if the value given here is smaller than protocol.MaxStreamsPerConnection
values := map[Tag][]byte{
TagMSPC: {2, 0, 0, 0},
}
err := cpm.SetFromMap(values)
Expect(err).ToNot(HaveOccurred())
Expect(cpm.GetMaxStreamsPerConnection()).To(Equal(uint32(2)))
})
It("errors when given an invalid max streams per connection value", func() {
values := map[Tag][]byte{
TagMSPC: {2, 0, 0}, // 1 byte too short
@ -208,10 +199,59 @@ var _ = Describe("ConnectionsParameterManager", func() {
Expect(err).To(MatchError(ErrMalformedTag))
})
It("gets the max streams per connection value", func() {
var value uint32 = 0xDECAFBAD
cpm.maxStreamsPerConnection = value
Expect(cpm.GetMaxStreamsPerConnection()).To(Equal(value))
It("errors when given an invalid max dynamic incoming streams per connection value", func() {
values := map[Tag][]byte{
TagMIDS: {2, 0, 0}, // 1 byte too short
}
err := cpm.SetFromMap(values)
Expect(err).To(MatchError(ErrMalformedTag))
})
Context("outgoing connections", func() {
It("sets the negotiated max streams per connection value", func() {
// this test only works if the value given here is smaller than protocol.MaxStreamsPerConnection
err := cpm.SetFromMap(map[Tag][]byte{
TagMIDS: {2, 0, 0, 0},
TagMSPC: {1, 0, 0, 0},
})
Expect(err).ToNot(HaveOccurred())
Expect(cpm.GetMaxOutgoingStreams()).To(Equal(uint32(2)))
})
It("uses the the MSPC value, if no MIDS is given", func() {
err := cpm.SetFromMap(map[Tag][]byte{TagMIDS: {3, 0, 0, 0}})
Expect(err).ToNot(HaveOccurred())
Expect(cpm.GetMaxOutgoingStreams()).To(Equal(uint32(3)))
})
It("uses the MSPC value for QUIC 34", func() {
cpm.version = protocol.Version34
err := cpm.SetFromMap(map[Tag][]byte{
TagMIDS: {2, 0, 0, 0},
TagMSPC: {1, 0, 0, 0},
})
Expect(err).ToNot(HaveOccurred())
Expect(cpm.GetMaxOutgoingStreams()).To(Equal(uint32(1)))
})
})
Context("incoming connections", func() {
It("always uses the constant value, no matter what the client sent", func() {
err := cpm.SetFromMap(map[Tag][]byte{
TagMSPC: {3, 0, 0, 0},
TagMIDS: {3, 0, 0, 0},
})
Expect(err).ToNot(HaveOccurred())
Expect(cpm.GetMaxIncomingStreams()).To(BeNumerically(">", protocol.MaxStreamsPerConnection))
})
It("uses the negotiated MSCP value, for QUIC 34", func() {
cpm.version = protocol.Version34
err := cpm.SetFromMap(map[Tag][]byte{TagMSPC: {60, 0, 0, 0}})
Expect(err).ToNot(HaveOccurred())
Expect(cpm.GetMaxIncomingStreams()).To(BeNumerically("~", 60*protocol.MaxStreamsMultiplier, 10))
})
})
})
})

View file

@ -166,7 +166,7 @@ var _ = Describe("Crypto setup", func() {
Expect(err).NotTo(HaveOccurred())
scfg.stkSource = &mockStkSource{}
v := protocol.SupportedVersions[len(protocol.SupportedVersions)-1]
cpm = NewConnectionParamatersManager()
cpm = NewConnectionParamatersManager(protocol.Version36)
cs, err = NewCryptoSetup(protocol.ConnectionID(42), ip, v, scfg, stream, cpm, aeadChanged)
Expect(err).NotTo(HaveOccurred())
cs.keyDerivation = mockKeyDerivation

View file

@ -23,11 +23,12 @@ var _ = Describe("Packet packer", func() {
fcm.sendWindowSizes[5] = protocol.MaxByteCount
fcm.sendWindowSizes[7] = protocol.MaxByteCount
streamFramer = newStreamFramer(newStreamsMap(nil), fcm)
cpm := handshake.NewConnectionParamatersManager(protocol.VersionWhatever)
streamFramer = newStreamFramer(newStreamsMap(nil, cpm), fcm)
packer = &packetPacker{
cryptoSetup: &handshake.CryptoSetup{},
connectionParametersManager: handshake.NewConnectionParamatersManager(),
connectionParametersManager: cpm,
packetNumberGenerator: newPacketNumberGenerator(protocol.SkipPacketAveragePeriodLength),
streamFramer: streamFramer,
}

View file

@ -34,8 +34,8 @@ const MaxReceiveConnectionFlowControlWindow ByteCount = 1.5 * (1 << 20) // 1.5 M
// MaxStreamsPerConnection is the maximum value accepted for the number of streams per connection
const MaxStreamsPerConnection = 100
// MaxIncomingDynamicStreams is the maximum value accepted for the incoming number of dynamic streams per connection
const MaxIncomingDynamicStreams = 100
// MaxIncomingDynamicStreamsPerConnection is the maximum value accepted for the incoming number of dynamic streams per connection
const MaxIncomingDynamicStreamsPerConnection = 100
// MaxStreamsMultiplier is the slack the client is allowed for the maximum number of streams per connection, needed e.g. when packets are out of order or dropped. The minimum of this procentual increase and the absolute increment specified by MaxStreamsMinimumIncrement is used.
const MaxStreamsMultiplier = 1.1

View file

@ -94,7 +94,7 @@ type Session struct {
// newSession makes a new session
func newSession(conn connection, v protocol.VersionNumber, connectionID protocol.ConnectionID, sCfg *handshake.ServerConfig, streamCallback StreamCallback, closeCallback closeCallback) (packetHandler, error) {
connectionParametersManager := handshake.NewConnectionParamatersManager()
connectionParametersManager := handshake.NewConnectionParamatersManager(v)
var sentPacketHandler ackhandler.SentPacketHandler
var receivedPacketHandler ackhandler.ReceivedPacketHandler
@ -130,7 +130,7 @@ func newSession(conn connection, v protocol.VersionNumber, connectionID protocol
sessionCreationTime: now,
}
session.streamsMap = newStreamsMap(session.newStream)
session.streamsMap = newStreamsMap(session.newStream, session.connectionParametersManager)
cryptoStream, _ := session.GetOrOpenStream(1)
var err error

View file

@ -4,6 +4,7 @@ import (
"bytes"
"github.com/lucas-clemente/quic-go/frames"
"github.com/lucas-clemente/quic-go/handshake"
"github.com/lucas-clemente/quic-go/protocol"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
@ -31,7 +32,7 @@ var _ = Describe("Stream Framer", func() {
stream1 = &stream{streamID: 10}
stream2 = &stream{streamID: 11}
streamsMap = newStreamsMap(nil)
streamsMap = newStreamsMap(nil, handshake.NewConnectionParamatersManager(protocol.VersionWhatever))
streamsMap.putStream(stream1)
streamsMap.putStream(stream2)

View file

@ -110,7 +110,7 @@ var _ = Describe("Stream", func() {
BeforeEach(func() {
onDataCalled = false
var streamID protocol.StreamID = 1337
cpm := handshake.NewConnectionParamatersManager()
cpm := handshake.NewConnectionParamatersManager(protocol.VersionWhatever)
flowControlManager := flowcontrol.NewFlowControlManager(cpm, &congestion.RTTStats{})
flowControlManager.NewStream(streamID, true)
str, _ = newStream(streamID, onData, flowControlManager)

View file

@ -5,22 +5,25 @@ import (
"fmt"
"sync"
"github.com/lucas-clemente/quic-go/handshake"
"github.com/lucas-clemente/quic-go/protocol"
"github.com/lucas-clemente/quic-go/qerr"
"github.com/lucas-clemente/quic-go/utils"
)
type streamsMap struct {
mutex sync.RWMutex
connectionParameters *handshake.ConnectionParametersManager
streams map[protocol.StreamID]*stream
openStreams []protocol.StreamID
highestStreamOpenedByClient protocol.StreamID
streamsOpenedAfterLastGarbageCollect int
newStream newStreamLambda
maxNumStreams int
newStream newStreamLambda
maxOpenOutgoingStreams uint32
maxIncomingStreams uint32
roundRobinIndex int
}
@ -32,14 +35,12 @@ var (
errMapAccess = errors.New("streamsMap: Error accessing the streams map")
)
func newStreamsMap(newStream newStreamLambda) *streamsMap {
maxNumStreams := utils.Max(int(float32(protocol.MaxIncomingDynamicStreams)*protocol.MaxStreamsMultiplier), int(protocol.MaxIncomingDynamicStreams))
func newStreamsMap(newStream newStreamLambda, cpm *handshake.ConnectionParametersManager) *streamsMap {
return &streamsMap{
streams: map[protocol.StreamID]*stream{},
openStreams: make([]protocol.StreamID, 0, maxNumStreams),
newStream: newStream,
maxNumStreams: maxNumStreams,
streams: map[protocol.StreamID]*stream{},
openStreams: make([]protocol.StreamID, 0),
newStream: newStream,
connectionParameters: cpm,
}
}
@ -61,7 +62,7 @@ func (m *streamsMap) GetOrOpenStream(id protocol.StreamID) (*stream, error) {
if ok {
return s, nil
}
if len(m.openStreams) == m.maxNumStreams {
if uint32(len(m.openStreams)) == m.connectionParameters.GetMaxIncomingStreams() {
return nil, qerr.TooManyOpenStreams
}
if id%2 == 0 {

View file

@ -3,6 +3,7 @@ package quic
import (
"errors"
"github.com/lucas-clemente/quic-go/handshake"
"github.com/lucas-clemente/quic-go/protocol"
"github.com/lucas-clemente/quic-go/qerr"
. "github.com/onsi/ginkgo"
@ -11,11 +12,13 @@ import (
var _ = Describe("Streams Map", func() {
var (
m *streamsMap
cpm *handshake.ConnectionParametersManager
m *streamsMap
)
BeforeEach(func() {
m = newStreamsMap(nil)
cpm = handshake.NewConnectionParamatersManager(protocol.VersionWhatever)
m = newStreamsMap(nil, cpm)
})
Context("getting and creating streams", func() {
@ -59,17 +62,23 @@ var _ = Describe("Streams Map", func() {
})
Context("counting streams", func() {
var maxNumStreams int
BeforeEach(func() {
maxNumStreams = int(cpm.GetMaxIncomingStreams())
})
It("errors when too many streams are opened", func() {
for i := 0; i < m.maxNumStreams; i++ {
for i := 0; i < maxNumStreams; i++ {
_, err := m.GetOrOpenStream(protocol.StreamID(i*2 + 1))
Expect(err).NotTo(HaveOccurred())
}
_, err := m.GetOrOpenStream(protocol.StreamID(m.maxNumStreams))
_, err := m.GetOrOpenStream(protocol.StreamID(maxNumStreams))
Expect(err).To(MatchError(qerr.TooManyOpenStreams))
})
It("does not error when many streams are opened and closed", func() {
for i := 2; i < 10*m.maxNumStreams; i++ {
for i := 2; i < 10*maxNumStreams; i++ {
_, err := m.GetOrOpenStream(protocol.StreamID(i*2 + 1))
Expect(err).NotTo(HaveOccurred())
m.RemoveStream(protocol.StreamID(i*2 + 1))