mirror of
https://github.com/refraction-networking/uquic.git
synced 2025-04-03 20:27:35 +03:00
never increase the flow control limit for the crypto stream
This commit is contained in:
parent
f78c0035b9
commit
8bd6168511
5 changed files with 9 additions and 28 deletions
|
@ -164,7 +164,9 @@ func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, err
|
||||||
s.flowController.AddBytesRead(protocol.ByteCount(m))
|
s.flowController.AddBytesRead(protocol.ByteCount(m))
|
||||||
}
|
}
|
||||||
// increase the flow control window, if necessary
|
// increase the flow control window, if necessary
|
||||||
s.flowController.MaybeQueueWindowUpdate()
|
if s.streamID != s.version.CryptoStreamID() {
|
||||||
|
s.flowController.MaybeQueueWindowUpdate()
|
||||||
|
}
|
||||||
|
|
||||||
if s.readPosInFrame >= len(s.currentFrame) && s.currentFrameIsLast {
|
if s.readPosInFrame >= len(s.currentFrame) && s.currentFrameIsLast {
|
||||||
s.finRead = true
|
s.finRead = true
|
||||||
|
|
|
@ -186,7 +186,6 @@ func (s *sendStream) getDataForWriting(maxBytes protocol.ByteCount) ([]byte, boo
|
||||||
return nil, s.finishedWriting && !s.finSent
|
return nil, s.finishedWriting && !s.finSent
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(#657): Flow control for the crypto stream
|
|
||||||
if s.streamID != s.version.CryptoStreamID() {
|
if s.streamID != s.version.CryptoStreamID() {
|
||||||
maxBytes = utils.MinByteCount(maxBytes, s.flowController.SendWindowSize())
|
maxBytes = utils.MinByteCount(maxBytes, s.flowController.SendWindowSize())
|
||||||
}
|
}
|
||||||
|
|
|
@ -440,7 +440,7 @@ func (s *session) postSetup() error {
|
||||||
s.sessionCreationTime = now
|
s.sessionCreationTime = now
|
||||||
|
|
||||||
s.receivedPacketHandler = ackhandler.NewReceivedPacketHandler(s.rttStats, s.logger, s.version)
|
s.receivedPacketHandler = ackhandler.NewReceivedPacketHandler(s.rttStats, s.logger, s.version)
|
||||||
s.windowUpdateQueue = newWindowUpdateQueue(s.streamsMap, s.cryptoStream, s.connFlowController, s.packer.QueueControlFrame)
|
s.windowUpdateQueue = newWindowUpdateQueue(s.streamsMap, s.connFlowController, s.packer.QueueControlFrame)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -14,7 +14,6 @@ type windowUpdateQueue struct {
|
||||||
queue map[protocol.StreamID]bool // used as a set
|
queue map[protocol.StreamID]bool // used as a set
|
||||||
queuedConn bool // connection-level window update
|
queuedConn bool // connection-level window update
|
||||||
|
|
||||||
cryptoStream cryptoStream
|
|
||||||
streamGetter streamGetter
|
streamGetter streamGetter
|
||||||
connFlowController flowcontrol.ConnectionFlowController
|
connFlowController flowcontrol.ConnectionFlowController
|
||||||
callback func(wire.Frame)
|
callback func(wire.Frame)
|
||||||
|
@ -22,14 +21,12 @@ type windowUpdateQueue struct {
|
||||||
|
|
||||||
func newWindowUpdateQueue(
|
func newWindowUpdateQueue(
|
||||||
streamGetter streamGetter,
|
streamGetter streamGetter,
|
||||||
cryptoStream cryptoStream,
|
|
||||||
connFC flowcontrol.ConnectionFlowController,
|
connFC flowcontrol.ConnectionFlowController,
|
||||||
cb func(wire.Frame),
|
cb func(wire.Frame),
|
||||||
) *windowUpdateQueue {
|
) *windowUpdateQueue {
|
||||||
return &windowUpdateQueue{
|
return &windowUpdateQueue{
|
||||||
queue: make(map[protocol.StreamID]bool),
|
queue: make(map[protocol.StreamID]bool),
|
||||||
streamGetter: streamGetter,
|
streamGetter: streamGetter,
|
||||||
cryptoStream: cryptoStream,
|
|
||||||
connFlowController: connFC,
|
connFlowController: connFC,
|
||||||
callback: cb,
|
callback: cb,
|
||||||
}
|
}
|
||||||
|
@ -55,17 +52,12 @@ func (q *windowUpdateQueue) QueueAll() {
|
||||||
q.queuedConn = false
|
q.queuedConn = false
|
||||||
}
|
}
|
||||||
// queue all stream-level window updates
|
// queue all stream-level window updates
|
||||||
var offset protocol.ByteCount
|
|
||||||
for id := range q.queue {
|
for id := range q.queue {
|
||||||
if id == q.cryptoStream.StreamID() {
|
str, err := q.streamGetter.GetOrOpenReceiveStream(id)
|
||||||
offset = q.cryptoStream.getWindowUpdate()
|
if err != nil || str == nil { // the stream can be nil if it was completed before dequeing the window update
|
||||||
} else {
|
continue
|
||||||
str, err := q.streamGetter.GetOrOpenReceiveStream(id)
|
|
||||||
if err != nil || str == nil { // the stream can be nil if it was completed before dequeing the window update
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
offset = str.getWindowUpdate()
|
|
||||||
}
|
}
|
||||||
|
offset := str.getWindowUpdate()
|
||||||
if offset == 0 { // can happen if we received a final offset, right after queueing the window update
|
if offset == 0 { // can happen if we received a final offset, right after queueing the window update
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,16 +15,13 @@ var _ = Describe("Window Update Queue", func() {
|
||||||
streamGetter *MockStreamGetter
|
streamGetter *MockStreamGetter
|
||||||
connFC *mocks.MockConnectionFlowController
|
connFC *mocks.MockConnectionFlowController
|
||||||
queuedFrames []wire.Frame
|
queuedFrames []wire.Frame
|
||||||
cryptoStream *MockCryptoStream
|
|
||||||
)
|
)
|
||||||
|
|
||||||
BeforeEach(func() {
|
BeforeEach(func() {
|
||||||
streamGetter = NewMockStreamGetter(mockCtrl)
|
streamGetter = NewMockStreamGetter(mockCtrl)
|
||||||
cryptoStream = NewMockCryptoStream(mockCtrl)
|
|
||||||
connFC = mocks.NewMockConnectionFlowController(mockCtrl)
|
connFC = mocks.NewMockConnectionFlowController(mockCtrl)
|
||||||
cryptoStream.EXPECT().StreamID().Return(protocol.StreamID(0)).AnyTimes()
|
|
||||||
queuedFrames = queuedFrames[:0]
|
queuedFrames = queuedFrames[:0]
|
||||||
q = newWindowUpdateQueue(streamGetter, cryptoStream, connFC, func(f wire.Frame) {
|
q = newWindowUpdateQueue(streamGetter, connFC, func(f wire.Frame) {
|
||||||
queuedFrames = append(queuedFrames, f)
|
queuedFrames = append(queuedFrames, f)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
@ -70,15 +67,6 @@ var _ = Describe("Window Update Queue", func() {
|
||||||
Expect(queuedFrames).To(BeEmpty())
|
Expect(queuedFrames).To(BeEmpty())
|
||||||
})
|
})
|
||||||
|
|
||||||
It("adds MAX_STREAM_DATA frames for the crypto stream", func() {
|
|
||||||
cryptoStream.EXPECT().getWindowUpdate().Return(protocol.ByteCount(42))
|
|
||||||
q.AddStream(0)
|
|
||||||
q.QueueAll()
|
|
||||||
Expect(queuedFrames).To(Equal([]wire.Frame{
|
|
||||||
&wire.MaxStreamDataFrame{StreamID: 0, ByteOffset: 42},
|
|
||||||
}))
|
|
||||||
})
|
|
||||||
|
|
||||||
It("queues MAX_DATA frames", func() {
|
It("queues MAX_DATA frames", func() {
|
||||||
connFC.EXPECT().GetWindowUpdate().Return(protocol.ByteCount(0x1337))
|
connFC.EXPECT().GetWindowUpdate().Return(protocol.ByteCount(0x1337))
|
||||||
q.AddConnection()
|
q.AddConnection()
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue