queue connection-level window updates from the flow controller directly

It is not sufficient to check for connection-level window updates every
time a packet is sent. When a connection-level window update needs to be
sent, we need to make sure that it gets sent immediately (i.e. call
scheduleSending() in the session).
This commit is contained in:
Marten Seemann 2018-05-06 10:51:51 +09:00
parent 2e8a5807ba
commit 08160ab18f
10 changed files with 126 additions and 55 deletions

View file

@ -12,6 +12,8 @@ import (
type connectionFlowController struct {
lastBlockedAt protocol.ByteCount
baseFlowController
queueWindowUpdate func()
}
var _ ConnectionFlowController = &connectionFlowController{}
@ -21,6 +23,7 @@ var _ ConnectionFlowController = &connectionFlowController{}
func NewConnectionFlowController(
receiveWindow protocol.ByteCount,
maxReceiveWindow protocol.ByteCount,
queueWindowUpdate func(),
rttStats *congestion.RTTStats,
logger utils.Logger,
) ConnectionFlowController {
@ -32,6 +35,7 @@ func NewConnectionFlowController(
maxReceiveWindowSize: maxReceiveWindow,
logger: logger,
},
queueWindowUpdate: queueWindowUpdate,
}
}
@ -62,6 +66,15 @@ func (c *connectionFlowController) IncrementHighestReceived(increment protocol.B
return nil
}
func (c *connectionFlowController) MaybeQueueWindowUpdate() {
c.mutex.Lock()
hasWindowUpdate := c.hasWindowUpdate()
c.mutex.Unlock()
if hasWindowUpdate {
c.queueWindowUpdate()
}
}
func (c *connectionFlowController) GetWindowUpdate() protocol.ByteCount {
c.mutex.Lock()
oldWindowSize := c.receiveWindowSize

View file

@ -11,7 +11,10 @@ import (
)
var _ = Describe("Connection Flow controller", func() {
var controller *connectionFlowController
var (
controller *connectionFlowController
queuedWindowUpdate bool
)
// update the congestion such that it returns a given value for the smoothed RTT
setRtt := func(t time.Duration) {
@ -23,6 +26,7 @@ var _ = Describe("Connection Flow controller", func() {
controller = &connectionFlowController{}
controller.rttStats = &congestion.RTTStats{}
controller.logger = utils.DefaultLogger
controller.queueWindowUpdate = func() { queuedWindowUpdate = true }
})
Context("Constructor", func() {
@ -32,7 +36,7 @@ var _ = Describe("Connection Flow controller", func() {
receiveWindow := protocol.ByteCount(2000)
maxReceiveWindow := protocol.ByteCount(3000)
fc := NewConnectionFlowController(receiveWindow, maxReceiveWindow, rttStats, utils.DefaultLogger).(*connectionFlowController)
fc := NewConnectionFlowController(receiveWindow, maxReceiveWindow, nil, rttStats, utils.DefaultLogger).(*connectionFlowController)
Expect(fc.receiveWindow).To(Equal(receiveWindow))
Expect(fc.maxReceiveWindowSize).To(Equal(maxReceiveWindow))
})
@ -53,6 +57,18 @@ var _ = Describe("Connection Flow controller", func() {
controller.bytesRead = 100 - 60
})
It("queues window updates", func() {
controller.MaybeQueueWindowUpdate()
Expect(queuedWindowUpdate).To(BeFalse())
controller.AddBytesRead(30)
controller.MaybeQueueWindowUpdate()
Expect(queuedWindowUpdate).To(BeTrue())
Expect(controller.GetWindowUpdate()).ToNot(BeZero())
queuedWindowUpdate = false
controller.MaybeQueueWindowUpdate()
Expect(queuedWindowUpdate).To(BeFalse())
})
It("gets a window update", func() {
windowSize := controller.receiveWindowSize
oldOffset := controller.bytesRead

View file

@ -10,6 +10,7 @@ type flowController interface {
// for receiving
AddBytesRead(protocol.ByteCount)
GetWindowUpdate() protocol.ByteCount // returns 0 if no update is necessary
MaybeQueueWindowUpdate() // queues a window update, if necessary
}
// A StreamFlowController is a flow controller for a QUIC stream.
@ -21,8 +22,6 @@ type StreamFlowController interface {
// UpdateHighestReceived should be called when a new highest offset is received
// final has to be to true if this is the final offset of the stream, as contained in a STREAM frame with FIN bit, and the RST_STREAM frame
UpdateHighestReceived(offset protocol.ByteCount, final bool) error
// MaybeQueueWindowUpdate queues a window update, if necessary
MaybeQueueWindowUpdate()
}
// The ConnectionFlowController is the flow controller for the connection.

View file

@ -131,6 +131,9 @@ func (c *streamFlowController) MaybeQueueWindowUpdate() {
if hasWindowUpdate {
c.queueWindowUpdate()
}
if c.contributesToConnection {
c.connection.MaybeQueueWindowUpdate()
}
}
func (c *streamFlowController) GetWindowUpdate() protocol.ByteCount {

View file

@ -13,16 +13,18 @@ import (
var _ = Describe("Stream Flow controller", func() {
var (
controller *streamFlowController
queuedWindowUpdate bool
controller *streamFlowController
queuedWindowUpdate bool
queuedConnWindowUpdate bool
)
BeforeEach(func() {
queuedWindowUpdate = false
queuedConnWindowUpdate = false
rttStats := &congestion.RTTStats{}
controller = &streamFlowController{
streamID: 10,
connection: NewConnectionFlowController(1000, 1000, rttStats, utils.DefaultLogger).(*connectionFlowController),
connection: NewConnectionFlowController(1000, 1000, func() { queuedConnWindowUpdate = true }, rttStats, utils.DefaultLogger).(*connectionFlowController),
}
controller.maxReceiveWindowSize = 10000
controller.rttStats = rttStats
@ -37,7 +39,7 @@ var _ = Describe("Stream Flow controller", func() {
sendWindow := protocol.ByteCount(4000)
It("sets the send and receive windows", func() {
cc := NewConnectionFlowController(0, 0, nil, utils.DefaultLogger)
cc := NewConnectionFlowController(0, 0, nil, nil, utils.DefaultLogger)
fc := NewStreamFlowController(5, true, cc, receiveWindow, maxReceiveWindow, sendWindow, nil, rttStats, utils.DefaultLogger).(*streamFlowController)
Expect(fc.streamID).To(Equal(protocol.StreamID(5)))
Expect(fc.receiveWindow).To(Equal(receiveWindow))
@ -53,7 +55,7 @@ var _ = Describe("Stream Flow controller", func() {
queued = true
}
cc := NewConnectionFlowController(0, 0, nil, utils.DefaultLogger)
cc := NewConnectionFlowController(0, 0, nil, nil, utils.DefaultLogger)
fc := NewStreamFlowController(5, true, cc, receiveWindow, maxReceiveWindow, sendWindow, queueWindowUpdate, rttStats, utils.DefaultLogger).(*streamFlowController)
fc.AddBytesRead(receiveWindow)
fc.MaybeQueueWindowUpdate()
@ -189,6 +191,7 @@ var _ = Describe("Stream Flow controller", func() {
controller.receiveWindow = 100
controller.receiveWindowSize = 60
controller.bytesRead = 100 - 60
controller.connection.(*connectionFlowController).receiveWindow = 100
controller.connection.(*connectionFlowController).receiveWindowSize = 120
oldWindowSize = controller.receiveWindowSize
})
@ -205,6 +208,15 @@ var _ = Describe("Stream Flow controller", func() {
Expect(queuedWindowUpdate).To(BeFalse())
})
It("queues connection-level window updates", func() {
controller.contributesToConnection = true
controller.MaybeQueueWindowUpdate()
Expect(queuedConnWindowUpdate).To(BeFalse())
controller.AddBytesRead(60)
controller.MaybeQueueWindowUpdate()
Expect(queuedConnWindowUpdate).To(BeTrue())
})
It("tells the connection flow controller when the window was autotuned", func() {
oldOffset := controller.bytesRead
controller.contributesToConnection = true

View file

@ -79,6 +79,16 @@ func (mr *MockConnectionFlowControllerMockRecorder) IsNewlyBlocked() *gomock.Cal
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsNewlyBlocked", reflect.TypeOf((*MockConnectionFlowController)(nil).IsNewlyBlocked))
}
// MaybeQueueWindowUpdate mocks base method
func (m *MockConnectionFlowController) MaybeQueueWindowUpdate() {
m.ctrl.Call(m, "MaybeQueueWindowUpdate")
}
// MaybeQueueWindowUpdate indicates an expected call of MaybeQueueWindowUpdate
func (mr *MockConnectionFlowControllerMockRecorder) MaybeQueueWindowUpdate() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaybeQueueWindowUpdate", reflect.TypeOf((*MockConnectionFlowController)(nil).MaybeQueueWindowUpdate))
}
// SendWindowSize mocks base method
func (m *MockConnectionFlowController) SendWindowSize() protocol.ByteCount {
ret := m.ctrl.Call(m, "SendWindowSize")

View file

@ -405,6 +405,7 @@ func (s *session) preSetup() {
s.connFlowController = flowcontrol.NewConnectionFlowController(
protocol.ReceiveConnectionFlowControlWindow,
protocol.ByteCount(s.config.MaxReceiveConnectionFlowControlWindow),
s.onHasConnectionWindowUpdate,
s.rttStats,
s.logger,
)
@ -425,7 +426,7 @@ func (s *session) postSetup() error {
s.sessionCreationTime = now
s.receivedPacketHandler = ackhandler.NewReceivedPacketHandler(s.rttStats, s.version)
s.windowUpdateQueue = newWindowUpdateQueue(s.streamsMap, s.cryptoStream, s.packer.QueueControlFrame)
s.windowUpdateQueue = newWindowUpdateQueue(s.streamsMap, s.cryptoStream, s.connFlowController, s.packer.QueueControlFrame)
return nil
}
@ -1021,9 +1022,6 @@ func (s *session) maybeSendRetransmission() (bool, error) {
}
func (s *session) sendPacket() (bool, error) {
if offset := s.connFlowController.GetWindowUpdate(); offset != 0 {
s.packer.QueueControlFrame(&wire.MaxDataFrame{ByteOffset: offset})
}
if isBlocked, offset := s.connFlowController.IsNewlyBlocked(); isBlocked {
s.packer.QueueControlFrame(&wire.BlockedFrame{Offset: offset})
}
@ -1137,7 +1135,7 @@ func (s *session) newFlowController(id protocol.StreamID) flowcontrol.StreamFlow
protocol.ReceiveStreamFlowControlWindow,
protocol.ByteCount(s.config.MaxReceiveStreamFlowControlWindow),
initialSendWindow,
s.onHasWindowUpdate,
s.onHasStreamWindowUpdate,
s.rttStats,
s.logger,
)
@ -1152,7 +1150,7 @@ func (s *session) newCryptoStream() cryptoStreamI {
protocol.ReceiveStreamFlowControlWindow,
protocol.ByteCount(s.config.MaxReceiveStreamFlowControlWindow),
0,
s.onHasWindowUpdate,
s.onHasStreamWindowUpdate,
s.rttStats,
s.logger,
)
@ -1202,8 +1200,13 @@ func (s *session) queueControlFrame(f wire.Frame) {
s.scheduleSending()
}
func (s *session) onHasWindowUpdate(id protocol.StreamID) {
s.windowUpdateQueue.Add(id)
func (s *session) onHasStreamWindowUpdate(id protocol.StreamID) {
s.windowUpdateQueue.AddStream(id)
s.scheduleSending()
}
func (s *session) onHasConnectionWindowUpdate() {
s.windowUpdateQueue.AddConnection()
s.scheduleSending()
}

View file

@ -691,24 +691,6 @@ var _ = Describe("Session", func() {
Expect(mconn.written).To(Receive(ContainSubstring(string([]byte{0x03, 0x5e}))))
})
It("adds a MAX_DATA frames", func() {
fc := mocks.NewMockConnectionFlowController(mockCtrl)
fc.EXPECT().GetWindowUpdate().Return(protocol.ByteCount(0x1337))
fc.EXPECT().IsNewlyBlocked()
sess.connFlowController = fc
sph := mockackhandler.NewMockSentPacketHandler(mockCtrl)
sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) {
Expect(p.Frames).To(Equal([]wire.Frame{
&wire.MaxDataFrame{ByteOffset: 0x1337},
}))
Expect(p.SendTime).To(BeTemporally("~", time.Now(), 100*time.Millisecond))
})
sess.sentPacketHandler = sph
sent, err := sess.sendPacket()
Expect(err).NotTo(HaveOccurred())
Expect(sent).To(BeTrue())
})
It("adds MAX_STREAM_DATA frames", func() {
sess.windowUpdateQueue.callback(&wire.MaxStreamDataFrame{
StreamID: 2,
@ -726,7 +708,6 @@ var _ = Describe("Session", func() {
It("adds a BLOCKED frame when it is connection-level flow control blocked", func() {
fc := mocks.NewMockConnectionFlowController(mockCtrl)
fc.EXPECT().GetWindowUpdate()
fc.EXPECT().IsNewlyBlocked().Return(true, protocol.ByteCount(1337))
sess.connFlowController = fc
sph := mockackhandler.NewMockSentPacketHandler(mockCtrl)

View file

@ -3,6 +3,7 @@ package quic
import (
"sync"
"github.com/lucas-clemente/quic-go/internal/flowcontrol"
"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/wire"
)
@ -10,29 +11,50 @@ import (
type windowUpdateQueue struct {
mutex sync.Mutex
queue map[protocol.StreamID]bool // used as a set
callback func(wire.Frame)
cryptoStream cryptoStreamI
streamGetter streamGetter
queue map[protocol.StreamID]bool // used as a set
queuedConn bool // connection-level window update
cryptoStream cryptoStreamI
streamGetter streamGetter
connFlowController flowcontrol.ConnectionFlowController
callback func(wire.Frame)
}
func newWindowUpdateQueue(streamGetter streamGetter, cryptoStream cryptoStreamI, cb func(wire.Frame)) *windowUpdateQueue {
func newWindowUpdateQueue(
streamGetter streamGetter,
cryptoStream cryptoStreamI,
connFC flowcontrol.ConnectionFlowController,
cb func(wire.Frame),
) *windowUpdateQueue {
return &windowUpdateQueue{
queue: make(map[protocol.StreamID]bool),
streamGetter: streamGetter,
cryptoStream: cryptoStream,
callback: cb,
queue: make(map[protocol.StreamID]bool),
streamGetter: streamGetter,
cryptoStream: cryptoStream,
connFlowController: connFC,
callback: cb,
}
}
func (q *windowUpdateQueue) Add(id protocol.StreamID) {
func (q *windowUpdateQueue) AddStream(id protocol.StreamID) {
q.mutex.Lock()
q.queue[id] = true
q.mutex.Unlock()
}
func (q *windowUpdateQueue) AddConnection() {
q.mutex.Lock()
q.queuedConn = true
q.mutex.Unlock()
}
func (q *windowUpdateQueue) QueueAll() {
q.mutex.Lock()
// queue a connection-level window update
if q.queuedConn {
q.callback(&wire.MaxDataFrame{ByteOffset: q.connFlowController.GetWindowUpdate()})
q.queuedConn = false
}
// queue all stream-level window updates
var offset protocol.ByteCount
for id := range q.queue {
if id == q.cryptoStream.StreamID() {

View file

@ -1,6 +1,7 @@
package quic
import (
"github.com/lucas-clemente/quic-go/internal/mocks"
"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/wire"
@ -12,6 +13,7 @@ var _ = Describe("Window Update Queue", func() {
var (
q *windowUpdateQueue
streamGetter *MockStreamGetter
connFC *mocks.MockConnectionFlowController
queuedFrames []wire.Frame
cryptoStream *MockCryptoStream
)
@ -19,9 +21,10 @@ var _ = Describe("Window Update Queue", func() {
BeforeEach(func() {
streamGetter = NewMockStreamGetter(mockCtrl)
cryptoStream = NewMockCryptoStream(mockCtrl)
connFC = mocks.NewMockConnectionFlowController(mockCtrl)
cryptoStream.EXPECT().StreamID().Return(protocol.StreamID(0)).AnyTimes()
queuedFrames = queuedFrames[:0]
q = newWindowUpdateQueue(streamGetter, cryptoStream, func(f wire.Frame) {
q = newWindowUpdateQueue(streamGetter, cryptoStream, connFC, func(f wire.Frame) {
queuedFrames = append(queuedFrames, f)
})
})
@ -33,8 +36,8 @@ var _ = Describe("Window Update Queue", func() {
stream3.EXPECT().getWindowUpdate().Return(protocol.ByteCount(30))
streamGetter.EXPECT().GetOrOpenReceiveStream(protocol.StreamID(3)).Return(stream3, nil)
streamGetter.EXPECT().GetOrOpenReceiveStream(protocol.StreamID(1)).Return(stream1, nil)
q.Add(3)
q.Add(1)
q.AddStream(3)
q.AddStream(1)
q.QueueAll()
Expect(queuedFrames).To(ContainElement(&wire.MaxStreamDataFrame{StreamID: 1, ByteOffset: 10}))
Expect(queuedFrames).To(ContainElement(&wire.MaxStreamDataFrame{StreamID: 3, ByteOffset: 30}))
@ -44,7 +47,7 @@ var _ = Describe("Window Update Queue", func() {
stream10 := NewMockStreamI(mockCtrl)
stream10.EXPECT().getWindowUpdate().Return(protocol.ByteCount(100))
streamGetter.EXPECT().GetOrOpenReceiveStream(protocol.StreamID(10)).Return(stream10, nil)
q.Add(10)
q.AddStream(10)
q.QueueAll()
Expect(queuedFrames).To(HaveLen(1))
q.QueueAll()
@ -53,7 +56,7 @@ var _ = Describe("Window Update Queue", func() {
It("doesn't queue a MAX_STREAM_DATA for a closed stream", func() {
streamGetter.EXPECT().GetOrOpenReceiveStream(protocol.StreamID(12)).Return(nil, nil)
q.Add(12)
q.AddStream(12)
q.QueueAll()
Expect(queuedFrames).To(BeEmpty())
})
@ -62,26 +65,35 @@ var _ = Describe("Window Update Queue", func() {
stream5 := NewMockStreamI(mockCtrl)
stream5.EXPECT().getWindowUpdate().Return(protocol.ByteCount(0))
streamGetter.EXPECT().GetOrOpenReceiveStream(protocol.StreamID(5)).Return(stream5, nil)
q.Add(5)
q.AddStream(5)
q.QueueAll()
Expect(queuedFrames).To(BeEmpty())
})
It("adds MAX_STREAM_DATA frames for the crypto stream", func() {
cryptoStream.EXPECT().getWindowUpdate().Return(protocol.ByteCount(42))
q.Add(0)
q.AddStream(0)
q.QueueAll()
Expect(queuedFrames).To(Equal([]wire.Frame{
&wire.MaxStreamDataFrame{StreamID: 0, ByteOffset: 42},
}))
})
It("queues MAX_DATA frames", func() {
connFC.EXPECT().GetWindowUpdate().Return(protocol.ByteCount(0x1337))
q.AddConnection()
q.QueueAll()
Expect(queuedFrames).To(Equal([]wire.Frame{
&wire.MaxDataFrame{ByteOffset: 0x1337},
}))
})
It("deduplicates", func() {
stream10 := NewMockStreamI(mockCtrl)
stream10.EXPECT().getWindowUpdate().Return(protocol.ByteCount(200))
streamGetter.EXPECT().GetOrOpenReceiveStream(protocol.StreamID(10)).Return(stream10, nil)
q.Add(10)
q.Add(10)
q.AddStream(10)
q.AddStream(10)
q.QueueAll()
Expect(queuedFrames).To(Equal([]wire.Frame{
&wire.MaxStreamDataFrame{StreamID: 10, ByteOffset: 200},