remove the receive flow control window from the params negotiator

The receive windows are not negotiated, they are just announced to the
peer.
This commit is contained in:
Marten Seemann 2017-10-15 13:50:19 +07:00
parent f5acb690d3
commit 480db80da1
10 changed files with 45 additions and 96 deletions

View file

@ -38,7 +38,7 @@ func NewFlowControlManager(
rttStats: rttStats, rttStats: rttStats,
maxReceiveStreamWindow: maxReceiveStreamWindow, maxReceiveStreamWindow: maxReceiveStreamWindow,
streamFlowController: make(map[protocol.StreamID]*flowController), streamFlowController: make(map[protocol.StreamID]*flowController),
connFlowController: newFlowController(0, false, connParams, maxReceiveConnectionWindow, rttStats), connFlowController: newFlowController(0, false, connParams, protocol.ReceiveConnectionFlowControlWindow, maxReceiveConnectionWindow, rttStats),
} }
} }
@ -51,7 +51,7 @@ func (f *flowControlManager) NewStream(streamID protocol.StreamID, contributesTo
if _, ok := f.streamFlowController[streamID]; ok { if _, ok := f.streamFlowController[streamID]; ok {
return return
} }
f.streamFlowController[streamID] = newFlowController(streamID, contributesToConnection, f.connParams, f.maxReceiveStreamWindow, f.rttStats) f.streamFlowController[streamID] = newFlowController(streamID, contributesToConnection, f.connParams, protocol.ReceiveStreamFlowControlWindow, f.maxReceiveStreamWindow, f.rttStats)
} }
// RemoveStream removes a closed stream from flow control // RemoveStream removes a closed stream from flow control

View file

@ -16,8 +16,6 @@ var _ = Describe("Flow Control Manager", func() {
BeforeEach(func() { BeforeEach(func() {
mockPn := mocks.NewMockParamsNegotiator(mockCtrl) mockPn := mocks.NewMockParamsNegotiator(mockCtrl)
mockPn.EXPECT().GetReceiveStreamFlowControlWindow().AnyTimes().Return(protocol.ByteCount(100))
mockPn.EXPECT().GetReceiveConnectionFlowControlWindow().AnyTimes().Return(protocol.ByteCount(200))
fcm = NewFlowControlManager(mockPn, protocol.MaxByteCount, protocol.MaxByteCount, &congestion.RTTStats{}).(*flowControlManager) fcm = NewFlowControlManager(mockPn, protocol.MaxByteCount, protocol.MaxByteCount, &congestion.RTTStats{}).(*flowControlManager)
}) })
@ -58,6 +56,13 @@ var _ = Describe("Flow Control Manager", func() {
fcm.NewStream(1, false) fcm.NewStream(1, false)
fcm.NewStream(4, true) fcm.NewStream(4, true)
fcm.NewStream(6, true) fcm.NewStream(6, true)
for _, fc := range fcm.streamFlowController {
fc.receiveWindow = 100
fc.receiveWindowIncrement = 100
}
fcm.connFlowController.receiveWindow = 200
fcm.connFlowController.receiveWindowIncrement = 200
}) })
It("updates the connection level flow controller if the stream contributes", func() { It("updates the connection level flow controller if the stream contributes", func() {
@ -195,6 +200,13 @@ var _ = Describe("Flow Control Manager", func() {
fcm.NewStream(6, true) fcm.NewStream(6, true)
fcm.streamFlowController[1].bytesSent = 41 fcm.streamFlowController[1].bytesSent = 41
fcm.streamFlowController[4].bytesSent = 42 fcm.streamFlowController[4].bytesSent = 42
for _, fc := range fcm.streamFlowController {
fc.receiveWindow = 100
fc.receiveWindowIncrement = 100
}
fcm.connFlowController.receiveWindow = 200
fcm.connFlowController.receiveWindowIncrement = 200
}) })
It("updates the connection level flow controller if the stream contributes", func() { It("updates the connection level flow controller if the stream contributes", func() {

View file

@ -37,26 +37,19 @@ func newFlowController(
streamID protocol.StreamID, streamID protocol.StreamID,
contributesToConnection bool, contributesToConnection bool,
connParams handshake.ParamsNegotiator, connParams handshake.ParamsNegotiator,
receiveWindow protocol.ByteCount,
maxReceiveWindow protocol.ByteCount, maxReceiveWindow protocol.ByteCount,
rttStats *congestion.RTTStats, rttStats *congestion.RTTStats,
) *flowController { ) *flowController {
fc := flowController{ return &flowController{
streamID: streamID, streamID: streamID,
contributesToConnection: contributesToConnection, contributesToConnection: contributesToConnection,
connParams: connParams, connParams: connParams,
rttStats: rttStats, rttStats: rttStats,
receiveWindow: receiveWindow,
receiveWindowIncrement: receiveWindow,
maxReceiveWindowIncrement: maxReceiveWindow, maxReceiveWindowIncrement: maxReceiveWindow,
} }
if streamID == 0 {
fc.receiveWindow = connParams.GetReceiveConnectionFlowControlWindow()
fc.receiveWindowIncrement = fc.receiveWindow
} else {
fc.receiveWindow = connParams.GetReceiveStreamFlowControlWindow()
fc.receiveWindowIncrement = fc.receiveWindow
}
return &fc
} }
func (c *flowController) ContributesToConnection() bool { func (c *flowController) ContributesToConnection() bool {

View file

@ -22,46 +22,46 @@ var _ = Describe("Flow controller", func() {
var rttStats *congestion.RTTStats var rttStats *congestion.RTTStats
var mockPn *mocks.MockParamsNegotiator var mockPn *mocks.MockParamsNegotiator
receiveStreamWindow := protocol.ByteCount(2000)
receiveConnectionWindow := protocol.ByteCount(4000)
maxReceiveStreamWindow := protocol.ByteCount(8000) maxReceiveStreamWindow := protocol.ByteCount(8000)
maxReceiveConnectionWindow := protocol.ByteCount(9000) maxReceiveConnectionWindow := protocol.ByteCount(9000)
BeforeEach(func() { BeforeEach(func() {
mockPn = mocks.NewMockParamsNegotiator(mockCtrl) mockPn = mocks.NewMockParamsNegotiator(mockCtrl)
mockPn.EXPECT().GetSendStreamFlowControlWindow().AnyTimes().Return(protocol.ByteCount(1000)) mockPn.EXPECT().GetSendStreamFlowControlWindow().AnyTimes().Return(protocol.ByteCount(1000))
mockPn.EXPECT().GetReceiveStreamFlowControlWindow().AnyTimes().Return(protocol.ByteCount(2000))
mockPn.EXPECT().GetSendConnectionFlowControlWindow().AnyTimes().Return(protocol.ByteCount(3000)) mockPn.EXPECT().GetSendConnectionFlowControlWindow().AnyTimes().Return(protocol.ByteCount(3000))
mockPn.EXPECT().GetReceiveConnectionFlowControlWindow().AnyTimes().Return(protocol.ByteCount(4000))
rttStats = &congestion.RTTStats{} rttStats = &congestion.RTTStats{}
}) })
It("reads the stream send and receive windows when acting as stream-level flow controller", func() { It("reads the stream send and receive windows when acting as stream-level flow controller", func() {
fc := newFlowController(5, true, mockPn, maxReceiveStreamWindow, rttStats) fc := newFlowController(5, true, mockPn, receiveStreamWindow, maxReceiveStreamWindow, rttStats)
Expect(fc.streamID).To(Equal(protocol.StreamID(5))) Expect(fc.streamID).To(Equal(protocol.StreamID(5)))
Expect(fc.receiveWindow).To(Equal(protocol.ByteCount(2000))) Expect(fc.receiveWindow).To(Equal(receiveStreamWindow))
Expect(fc.maxReceiveWindowIncrement).To(Equal(maxReceiveStreamWindow)) Expect(fc.maxReceiveWindowIncrement).To(Equal(maxReceiveStreamWindow))
}) })
It("reads the stream send and receive windows when acting as connection-level flow controller", func() { It("reads the stream send and receive windows when acting as connection-level flow controller", func() {
fc := newFlowController(0, false, mockPn, maxReceiveConnectionWindow, rttStats) fc := newFlowController(0, false, mockPn, receiveConnectionWindow, maxReceiveConnectionWindow, rttStats)
Expect(fc.streamID).To(Equal(protocol.StreamID(0))) Expect(fc.streamID).To(Equal(protocol.StreamID(0)))
Expect(fc.receiveWindow).To(Equal(protocol.ByteCount(4000))) Expect(fc.receiveWindow).To(Equal(receiveConnectionWindow))
Expect(fc.maxReceiveWindowIncrement).To(Equal(maxReceiveConnectionWindow)) Expect(fc.maxReceiveWindowIncrement).To(Equal(maxReceiveConnectionWindow))
}) })
It("does not set the stream flow control windows for sending", func() { It("does not set the stream flow control windows for sending", func() {
fc := newFlowController(5, true, mockPn, protocol.MaxByteCount, rttStats) fc := newFlowController(5, true, mockPn, protocol.MaxByteCount, protocol.MaxByteCount, rttStats)
Expect(fc.sendWindow).To(BeZero()) Expect(fc.sendWindow).To(BeZero())
}) })
It("does not set the connection flow control windows for sending", func() { It("does not set the connection flow control windows for sending", func() {
fc := newFlowController(0, false, mockPn, protocol.MaxByteCount, rttStats) fc := newFlowController(0, false, mockPn, protocol.MaxByteCount, protocol.MaxByteCount, rttStats)
Expect(fc.sendWindow).To(BeZero()) Expect(fc.sendWindow).To(BeZero())
}) })
It("says if it contributes to connection-level flow control", func() { It("says if it contributes to connection-level flow control", func() {
fc := newFlowController(1, false, mockPn, protocol.MaxByteCount, rttStats) fc := newFlowController(1, false, mockPn, protocol.MaxByteCount, protocol.MaxByteCount, rttStats)
Expect(fc.ContributesToConnection()).To(BeFalse()) Expect(fc.ContributesToConnection()).To(BeFalse())
fc = newFlowController(5, true, mockPn, protocol.MaxByteCount, rttStats) fc = newFlowController(5, true, mockPn, protocol.MaxByteCount, protocol.MaxByteCount, rttStats)
Expect(fc.ContributesToConnection()).To(BeTrue()) Expect(fc.ContributesToConnection()).To(BeTrue())
}) })
}) })

View file

@ -79,9 +79,9 @@ func (h *paramsNegotiator) SetFromTransportParameters(params []transportParamete
func (h *paramsNegotiator) GetTransportParameters() []transportParameter { func (h *paramsNegotiator) GetTransportParameters() []transportParameter {
initialMaxStreamData := make([]byte, 4) initialMaxStreamData := make([]byte, 4)
binary.BigEndian.PutUint32(initialMaxStreamData, uint32(h.GetReceiveStreamFlowControlWindow())) binary.BigEndian.PutUint32(initialMaxStreamData, uint32(protocol.ReceiveStreamFlowControlWindow))
initialMaxData := make([]byte, 4) initialMaxData := make([]byte, 4)
binary.BigEndian.PutUint32(initialMaxData, uint32(h.GetReceiveConnectionFlowControlWindow())) binary.BigEndian.PutUint32(initialMaxData, uint32(protocol.ReceiveConnectionFlowControlWindow))
initialMaxStreamID := make([]byte, 4) initialMaxStreamID := make([]byte, 4)
// TODO: use a reasonable value here // TODO: use a reasonable value here
binary.BigEndian.PutUint32(initialMaxStreamID, math.MaxUint32) binary.BigEndian.PutUint32(initialMaxStreamID, math.MaxUint32)

View file

@ -13,8 +13,6 @@ import (
type ParamsNegotiator interface { type ParamsNegotiator interface {
GetSendStreamFlowControlWindow() protocol.ByteCount GetSendStreamFlowControlWindow() protocol.ByteCount
GetSendConnectionFlowControlWindow() protocol.ByteCount GetSendConnectionFlowControlWindow() protocol.ByteCount
GetReceiveStreamFlowControlWindow() protocol.ByteCount
GetReceiveConnectionFlowControlWindow() protocol.ByteCount
GetMaxOutgoingStreams() uint32 GetMaxOutgoingStreams() uint32
// get the idle timeout that was sent by the peer // get the idle timeout that was sent by the peer
GetRemoteIdleTimeout() time.Duration GetRemoteIdleTimeout() time.Duration
@ -39,20 +37,16 @@ type paramsNegotiatorBase struct {
omitConnectionID bool omitConnectionID bool
requestConnectionIDOmission bool requestConnectionIDOmission bool
maxOutgoingStreams uint32 maxOutgoingStreams uint32
idleTimeout time.Duration idleTimeout time.Duration
remoteIdleTimeout time.Duration remoteIdleTimeout time.Duration
sendStreamFlowControlWindow protocol.ByteCount sendStreamFlowControlWindow protocol.ByteCount
sendConnectionFlowControlWindow protocol.ByteCount sendConnectionFlowControlWindow protocol.ByteCount
receiveStreamFlowControlWindow protocol.ByteCount
receiveConnectionFlowControlWindow protocol.ByteCount
} }
func (h *paramsNegotiatorBase) init(params *TransportParameters) { func (h *paramsNegotiatorBase) init(params *TransportParameters) {
h.sendStreamFlowControlWindow = protocol.InitialStreamFlowControlWindow // can only be changed by the client h.sendStreamFlowControlWindow = protocol.InitialStreamFlowControlWindow // can only be changed by the client
h.sendConnectionFlowControlWindow = protocol.InitialConnectionFlowControlWindow // can only be changed by the client h.sendConnectionFlowControlWindow = protocol.InitialConnectionFlowControlWindow // can only be changed by the client
h.receiveStreamFlowControlWindow = protocol.ReceiveStreamFlowControlWindow
h.receiveConnectionFlowControlWindow = protocol.ReceiveConnectionFlowControlWindow
h.requestConnectionIDOmission = params.RequestConnectionIDOmission h.requestConnectionIDOmission = params.RequestConnectionIDOmission
h.idleTimeout = params.IdleTimeout h.idleTimeout = params.IdleTimeout
@ -74,19 +68,6 @@ func (h *paramsNegotiatorBase) GetSendConnectionFlowControlWindow() protocol.Byt
return h.sendConnectionFlowControlWindow return h.sendConnectionFlowControlWindow
} }
func (h *paramsNegotiatorBase) GetReceiveStreamFlowControlWindow() protocol.ByteCount {
h.mutex.RLock()
defer h.mutex.RUnlock()
return h.receiveStreamFlowControlWindow
}
// GetReceiveConnectionFlowControlWindow gets the size of the stream-level flow control window for receiving data
func (h *paramsNegotiatorBase) GetReceiveConnectionFlowControlWindow() protocol.ByteCount {
h.mutex.RLock()
defer h.mutex.RUnlock()
return h.receiveConnectionFlowControlWindow
}
func (h *paramsNegotiatorBase) GetMaxOutgoingStreams() uint32 { func (h *paramsNegotiatorBase) GetMaxOutgoingStreams() uint32 {
h.mutex.RLock() h.mutex.RLock()
defer h.mutex.RUnlock() defer h.mutex.RUnlock()

View file

@ -89,9 +89,9 @@ func (h *paramsNegotiatorGQUIC) SetFromMap(params map[Tag][]byte) error {
// GetHelloMap gets all parameters needed for the Hello message. // GetHelloMap gets all parameters needed for the Hello message.
func (h *paramsNegotiatorGQUIC) GetHelloMap() (map[Tag][]byte, error) { func (h *paramsNegotiatorGQUIC) GetHelloMap() (map[Tag][]byte, error) {
sfcw := bytes.NewBuffer([]byte{}) sfcw := bytes.NewBuffer([]byte{})
utils.LittleEndian.WriteUint32(sfcw, uint32(h.GetReceiveStreamFlowControlWindow())) utils.LittleEndian.WriteUint32(sfcw, uint32(protocol.ReceiveStreamFlowControlWindow))
cfcw := bytes.NewBuffer([]byte{}) cfcw := bytes.NewBuffer([]byte{})
utils.LittleEndian.WriteUint32(cfcw, uint32(h.GetReceiveConnectionFlowControlWindow())) utils.LittleEndian.WriteUint32(cfcw, uint32(protocol.ReceiveConnectionFlowControlWindow))
mids := bytes.NewBuffer([]byte{}) mids := bytes.NewBuffer([]byte{})
utils.LittleEndian.WriteUint32(mids, protocol.MaxIncomingStreams) utils.LittleEndian.WriteUint32(mids, protocol.MaxIncomingStreams)
icsl := bytes.NewBuffer([]byte{}) icsl := bytes.NewBuffer([]byte{})

View file

@ -44,19 +44,19 @@ var _ = Describe("Params Negotiator (for gQUIC)", func() {
}) })
It("sets the stream-level flow control windows in SHLO", func() { It("sets the stream-level flow control windows in SHLO", func() {
pn.receiveStreamFlowControlWindow = 0xDEADBEEF
entryMap, err := pn.GetHelloMap() entryMap, err := pn.GetHelloMap()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(entryMap).To(HaveKey(TagSFCW)) expected := make([]byte, 4)
Expect(entryMap[TagSFCW]).To(Equal([]byte{0xEF, 0xBE, 0xAD, 0xDE})) binary.LittleEndian.PutUint32(expected, uint32(protocol.ReceiveStreamFlowControlWindow))
Expect(entryMap).To(HaveKeyWithValue(TagSFCW, expected))
}) })
It("sets the connection-level flow control windows in SHLO", func() { It("sets the connection-level flow control windows in SHLO", func() {
pn.receiveConnectionFlowControlWindow = 0xDECAFBAD
entryMap, err := pn.GetHelloMap() entryMap, err := pn.GetHelloMap()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(entryMap).To(HaveKey(TagCFCW)) expected := make([]byte, 4)
Expect(entryMap[TagCFCW]).To(Equal([]byte{0xAD, 0xFB, 0xCA, 0xDE})) binary.LittleEndian.PutUint32(expected, uint32(protocol.ReceiveConnectionFlowControlWindow))
Expect(entryMap).To(HaveKeyWithValue(TagCFCW, expected))
}) })
It("sets the connection-level flow control windows in SHLO", func() { It("sets the connection-level flow control windows in SHLO", func() {
@ -128,13 +128,6 @@ var _ = Describe("Params Negotiator (for gQUIC)", func() {
Expect(pnClient.GetSendConnectionFlowControlWindow()).To(Equal(protocol.InitialConnectionFlowControlWindow)) Expect(pnClient.GetSendConnectionFlowControlWindow()).To(Equal(protocol.InitialConnectionFlowControlWindow))
}) })
It("has the correct default flow control windows for receiving", func() {
Expect(pn.GetReceiveStreamFlowControlWindow()).To(BeEquivalentTo(protocol.ReceiveStreamFlowControlWindow))
Expect(pn.GetReceiveConnectionFlowControlWindow()).To(BeEquivalentTo(protocol.ReceiveConnectionFlowControlWindow))
Expect(pnClient.GetReceiveStreamFlowControlWindow()).To(BeEquivalentTo(protocol.ReceiveStreamFlowControlWindow))
Expect(pnClient.GetReceiveConnectionFlowControlWindow()).To(BeEquivalentTo(protocol.ReceiveConnectionFlowControlWindow))
})
It("sets a new stream-level flow control window for sending", func() { It("sets a new stream-level flow control window for sending", func() {
values := map[Tag][]byte{TagSFCW: {0xDE, 0xAD, 0xBE, 0xEF}} values := map[Tag][]byte{TagSFCW: {0xDE, 0xAD, 0xBE, 0xEF}}
err := pn.SetFromMap(values) err := pn.SetFromMap(values)

View file

@ -59,30 +59,6 @@ func (mr *MockParamsNegotiatorMockRecorder) GetSendConnectionFlowControlWindow()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSendConnectionFlowControlWindow", reflect.TypeOf((*MockParamsNegotiator)(nil).GetSendConnectionFlowControlWindow)) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSendConnectionFlowControlWindow", reflect.TypeOf((*MockParamsNegotiator)(nil).GetSendConnectionFlowControlWindow))
} }
// GetReceiveStreamFlowControlWindow mocks base method
func (m *MockParamsNegotiator) GetReceiveStreamFlowControlWindow() protocol.ByteCount {
ret := m.ctrl.Call(m, "GetReceiveStreamFlowControlWindow")
ret0, _ := ret[0].(protocol.ByteCount)
return ret0
}
// GetReceiveStreamFlowControlWindow indicates an expected call of GetReceiveStreamFlowControlWindow
func (mr *MockParamsNegotiatorMockRecorder) GetReceiveStreamFlowControlWindow() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetReceiveStreamFlowControlWindow", reflect.TypeOf((*MockParamsNegotiator)(nil).GetReceiveStreamFlowControlWindow))
}
// GetReceiveConnectionFlowControlWindow mocks base method
func (m *MockParamsNegotiator) GetReceiveConnectionFlowControlWindow() protocol.ByteCount {
ret := m.ctrl.Call(m, "GetReceiveConnectionFlowControlWindow")
ret0, _ := ret[0].(protocol.ByteCount)
return ret0
}
// GetReceiveConnectionFlowControlWindow indicates an expected call of GetReceiveConnectionFlowControlWindow
func (mr *MockParamsNegotiatorMockRecorder) GetReceiveConnectionFlowControlWindow() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetReceiveConnectionFlowControlWindow", reflect.TypeOf((*MockParamsNegotiator)(nil).GetReceiveConnectionFlowControlWindow))
}
// GetMaxOutgoingStreams mocks base method // GetMaxOutgoingStreams mocks base method
func (m *MockParamsNegotiator) GetMaxOutgoingStreams() uint32 { func (m *MockParamsNegotiator) GetMaxOutgoingStreams() uint32 {
ret := m.ctrl.Call(m, "GetMaxOutgoingStreams") ret := m.ctrl.Call(m, "GetMaxOutgoingStreams")

View file

@ -152,12 +152,6 @@ func (m *mockParamsNegotiator) GetSendStreamFlowControlWindow() protocol.ByteCou
func (m *mockParamsNegotiator) GetSendConnectionFlowControlWindow() protocol.ByteCount { func (m *mockParamsNegotiator) GetSendConnectionFlowControlWindow() protocol.ByteCount {
return protocol.InitialConnectionFlowControlWindow return protocol.InitialConnectionFlowControlWindow
} }
func (m *mockParamsNegotiator) GetReceiveStreamFlowControlWindow() protocol.ByteCount {
return protocol.ReceiveStreamFlowControlWindow
}
func (m *mockParamsNegotiator) GetReceiveConnectionFlowControlWindow() protocol.ByteCount {
return protocol.ReceiveConnectionFlowControlWindow
}
func (m *mockParamsNegotiator) GetMaxOutgoingStreams() uint32 { return 100 } func (m *mockParamsNegotiator) GetMaxOutgoingStreams() uint32 { return 100 }
func (m *mockParamsNegotiator) GetRemoteIdleTimeout() time.Duration { return time.Hour } func (m *mockParamsNegotiator) GetRemoteIdleTimeout() time.Duration { return time.Hour }
func (m *mockParamsNegotiator) OmitConnectionID() bool { return false } func (m *mockParamsNegotiator) OmitConnectionID() bool { return false }