update the flow control windows of streams opened in 0-RTT

The server might have increased the initial flow control window. We need
to make sure to inform all streams opened during during the 0-RTT
period.
This commit is contained in:
Marten Seemann 2021-03-12 15:09:50 +08:00
parent c27f5a55b3
commit 7cd4dea764
8 changed files with 85 additions and 2 deletions

View file

@ -278,7 +278,9 @@ func (m *streamsMap) HandleMaxStreamsFrame(f *wire.MaxStreamsFrame) {
}
func (m *streamsMap) UpdateLimits(p *wire.TransportParameters) {
m.outgoingBidiStreams.UpdateSendWindow(p.InitialMaxStreamDataBidiRemote)
m.outgoingBidiStreams.SetMaxStream(p.MaxBidiStreamNum)
m.outgoingUniStreams.UpdateSendWindow(p.InitialMaxStreamDataUni)
m.outgoingUniStreams.SetMaxStream(p.MaxUniStreamNum)
}

View file

@ -11,6 +11,7 @@ import (
// This definition must be in a file that Genny doesn't process.
type item interface {
generic.Type
updateSendWindow(protocol.ByteCount)
closeForShutdown(error)
}

View file

@ -18,8 +18,9 @@ import (
type mockGenericStream struct {
num protocol.StreamNum
closed bool
closeErr error
closed bool
closeErr error
sendWindow protocol.ByteCount
}
func (s *mockGenericStream) closeForShutdown(err error) {
@ -27,6 +28,10 @@ func (s *mockGenericStream) closeForShutdown(err error) {
s.closeErr = err
}
func (s *mockGenericStream) updateSendWindow(limit protocol.ByteCount) {
s.sendWindow = limit
}
var _ = Describe("Streams Map (incoming)", func() {
var (
m *incomingItemsMap

View file

@ -180,6 +180,17 @@ func (m *outgoingBidiStreamsMap) SetMaxStream(num protocol.StreamNum) {
m.unblockOpenSync()
}
// UpdateSendWindow is called when the peer's transport parameters are received.
// Only in the case of a 0-RTT handshake will we have open streams at this point.
// We might need to update the send window, in case the server increased it.
func (m *outgoingBidiStreamsMap) UpdateSendWindow(limit protocol.ByteCount) {
m.mutex.Lock()
for _, str := range m.streams {
str.updateSendWindow(limit)
}
m.mutex.Unlock()
}
// unblockOpenSync unblocks the next OpenStreamSync go-routine to open a new stream
func (m *outgoingBidiStreamsMap) unblockOpenSync() {
if len(m.openQueue) == 0 {

View file

@ -178,6 +178,17 @@ func (m *outgoingItemsMap) SetMaxStream(num protocol.StreamNum) {
m.unblockOpenSync()
}
// UpdateSendWindow is called when the peer's transport parameters are received.
// Only in the case of a 0-RTT handshake will we have open streams at this point.
// We might need to update the send window, in case the server increased it.
func (m *outgoingItemsMap) UpdateSendWindow(limit protocol.ByteCount) {
m.mutex.Lock()
for _, str := range m.streams {
str.updateSendWindow(limit)
}
m.mutex.Unlock()
}
// unblockOpenSync unblocks the next OpenStreamSync go-routine to open a new stream
func (m *outgoingItemsMap) unblockOpenSync() {
if len(m.openQueue) == 0 {

View file

@ -112,6 +112,16 @@ var _ = Describe("Streams Map (outgoing)", func() {
Expect(str2.(*mockGenericStream).closed).To(BeTrue())
Expect(str2.(*mockGenericStream).closeErr).To(MatchError(testErr))
})
It("updates the send window", func() {
str1, err := m.OpenStream()
Expect(err).ToNot(HaveOccurred())
str2, err := m.OpenStream()
Expect(err).ToNot(HaveOccurred())
m.UpdateSendWindow(1337)
Expect(str1.(*mockGenericStream).sendWindow).To(BeEquivalentTo(1337))
Expect(str2.(*mockGenericStream).sendWindow).To(BeEquivalentTo(1337))
})
})
Context("with stream ID limits", func() {

View file

@ -180,6 +180,17 @@ func (m *outgoingUniStreamsMap) SetMaxStream(num protocol.StreamNum) {
m.unblockOpenSync()
}
// UpdateSendWindow is called when the peer's transport parameters are received.
// Only in the case of a 0-RTT handshake will we have open streams at this point.
// We might need to update the send window, in case the server increased it.
func (m *outgoingUniStreamsMap) UpdateSendWindow(limit protocol.ByteCount) {
m.mutex.Lock()
for _, str := range m.streams {
str.updateSendWindow(limit)
}
m.mutex.Unlock()
}
// unblockOpenSync unblocks the next OpenStreamSync go-routine to open a new stream
func (m *outgoingUniStreamsMap) unblockOpenSync() {
if len(m.openQueue) == 0 {

View file

@ -348,6 +348,38 @@ var _ = Describe("Streams Map", func() {
expectTooManyStreamsError(err)
})
if perspective == protocol.PerspectiveClient {
It("applies parameters to existing streams (needed for 0-RTT)", func() {
m.UpdateLimits(&wire.TransportParameters{
MaxBidiStreamNum: 1000,
MaxUniStreamNum: 1000,
})
flowControllers := make(map[protocol.StreamID]*mocks.MockStreamFlowController)
m.newFlowController = func(id protocol.StreamID) flowcontrol.StreamFlowController {
fc := mocks.NewMockStreamFlowController(mockCtrl)
flowControllers[id] = fc
return fc
}
str, err := m.OpenStream()
Expect(err).ToNot(HaveOccurred())
unistr, err := m.OpenUniStream()
Expect(err).ToNot(HaveOccurred())
Expect(flowControllers).To(HaveKey(str.StreamID()))
flowControllers[str.StreamID()].EXPECT().UpdateSendWindow(protocol.ByteCount(4321))
Expect(flowControllers).To(HaveKey(unistr.StreamID()))
flowControllers[unistr.StreamID()].EXPECT().UpdateSendWindow(protocol.ByteCount(1234))
m.UpdateLimits(&wire.TransportParameters{
MaxBidiStreamNum: 1000,
InitialMaxStreamDataUni: 1234,
MaxUniStreamNum: 1000,
InitialMaxStreamDataBidiRemote: 4321,
})
})
}
Context("handling MAX_STREAMS frames", func() {
BeforeEach(func() {
mockSender.EXPECT().queueControlFrame(gomock.Any()).AnyTimes()