simplify sending of (connection-level) BLOCKED frames

This commit is contained in:
Marten Seemann 2017-12-18 14:54:09 +07:00
parent d49ad2d0cc
commit 092908d3e0
15 changed files with 80 additions and 148 deletions

View file

@ -13,6 +13,7 @@ type baseFlowController struct {
// for sending data
bytesSent protocol.ByteCount
sendWindow protocol.ByteCount
lastBlockedAt protocol.ByteCount
// for receiving data
mutex sync.RWMutex
@ -72,12 +73,14 @@ func (c *baseFlowController) getWindowUpdate() protocol.ByteCount {
return c.receiveWindow
}
// IsBlocked says if it is blocked by flow control.
// IsBlocked says if it is newly blocked by flow control.
// For every offset, it only returns true once.
// If it is blocked, the offset is returned.
func (c *baseFlowController) IsBlocked() (bool, protocol.ByteCount) {
if c.sendWindowSize() != 0 {
func (c *baseFlowController) IsNewlyBlocked() (bool, protocol.ByteCount) {
if c.sendWindowSize() != 0 || c.sendWindow == c.lastBlockedAt {
return false, 0
}
c.lastBlockedAt = c.sendWindow
return true, c.sendWindow
}

View file

@ -52,12 +52,26 @@ var _ = Describe("Base Flow controller", func() {
It("says when it's blocked", func() {
controller.UpdateSendWindow(100)
Expect(controller.IsBlocked()).To(BeFalse())
Expect(controller.IsNewlyBlocked()).To(BeFalse())
controller.AddBytesSent(100)
blocked, offset := controller.IsBlocked()
blocked, offset := controller.IsNewlyBlocked()
Expect(blocked).To(BeTrue())
Expect(offset).To(Equal(protocol.ByteCount(100)))
})
It("doesn't say that it's newly blocked multiple times for the same offset", func() {
controller.UpdateSendWindow(100)
controller.AddBytesSent(100)
newlyBlocked, offset := controller.IsNewlyBlocked()
Expect(newlyBlocked).To(BeTrue())
Expect(offset).To(Equal(protocol.ByteCount(100)))
newlyBlocked, _ = controller.IsNewlyBlocked()
Expect(newlyBlocked).To(BeFalse())
controller.UpdateSendWindow(150)
controller.AddBytesSent(150)
newlyBlocked, offset = controller.IsNewlyBlocked()
Expect(newlyBlocked).To(BeTrue())
})
})
Context("receive flow control", func() {

View file

@ -5,7 +5,7 @@ import "github.com/lucas-clemente/quic-go/internal/protocol"
type flowController interface {
// for sending
SendWindowSize() protocol.ByteCount
IsBlocked() (bool, protocol.ByteCount)
IsNewlyBlocked() (bool, protocol.ByteCount)
UpdateSendWindow(protocol.ByteCount)
AddBytesSent(protocol.ByteCount)
// for receiving

View file

@ -233,9 +233,9 @@ var _ = Describe("Stream Flow controller", func() {
controller.connection.UpdateSendWindow(50)
controller.UpdateSendWindow(100)
controller.AddBytesSent(50)
blocked, _ := controller.connection.IsBlocked()
blocked, _ := controller.connection.IsNewlyBlocked()
Expect(blocked).To(BeTrue())
Expect(controller.IsBlocked()).To(BeFalse())
Expect(controller.IsNewlyBlocked()).To(BeFalse())
})
})
})

View file

@ -65,17 +65,17 @@ func (_mr *MockConnectionFlowControllerMockRecorder) GetWindowUpdate() *gomock.C
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "GetWindowUpdate", reflect.TypeOf((*MockConnectionFlowController)(nil).GetWindowUpdate))
}
// IsBlocked mocks base method
func (_m *MockConnectionFlowController) IsBlocked() (bool, protocol.ByteCount) {
ret := _m.ctrl.Call(_m, "IsBlocked")
// IsNewlyBlocked mocks base method
func (_m *MockConnectionFlowController) IsNewlyBlocked() (bool, protocol.ByteCount) {
ret := _m.ctrl.Call(_m, "IsNewlyBlocked")
ret0, _ := ret[0].(bool)
ret1, _ := ret[1].(protocol.ByteCount)
return ret0, ret1
}
// IsBlocked indicates an expected call of IsBlocked
func (_mr *MockConnectionFlowControllerMockRecorder) IsBlocked() *gomock.Call {
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "IsBlocked", reflect.TypeOf((*MockConnectionFlowController)(nil).IsBlocked))
// IsNewlyBlocked indicates an expected call of IsNewlyBlocked
func (_mr *MockConnectionFlowControllerMockRecorder) IsNewlyBlocked() *gomock.Call {
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "IsNewlyBlocked", reflect.TypeOf((*MockConnectionFlowController)(nil).IsNewlyBlocked))
}
// SendWindowSize mocks base method

View file

@ -65,17 +65,17 @@ func (_mr *MockStreamFlowControllerMockRecorder) GetWindowUpdate() *gomock.Call
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "GetWindowUpdate", reflect.TypeOf((*MockStreamFlowController)(nil).GetWindowUpdate))
}
// IsBlocked mocks base method
func (_m *MockStreamFlowController) IsBlocked() (bool, protocol.ByteCount) {
ret := _m.ctrl.Call(_m, "IsBlocked")
// IsNewlyBlocked mocks base method
func (_m *MockStreamFlowController) IsNewlyBlocked() (bool, protocol.ByteCount) {
ret := _m.ctrl.Call(_m, "IsNewlyBlocked")
ret0, _ := ret[0].(bool)
ret1, _ := ret[1].(protocol.ByteCount)
return ret0, ret1
}
// IsBlocked indicates an expected call of IsBlocked
func (_mr *MockStreamFlowControllerMockRecorder) IsBlocked() *gomock.Call {
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "IsBlocked", reflect.TypeOf((*MockStreamFlowController)(nil).IsBlocked))
// IsNewlyBlocked indicates an expected call of IsNewlyBlocked
func (_mr *MockStreamFlowControllerMockRecorder) IsNewlyBlocked() *gomock.Call {
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "IsNewlyBlocked", reflect.TypeOf((*MockStreamFlowController)(nil).IsNewlyBlocked))
}
// SendWindowSize mocks base method

View file

@ -265,17 +265,9 @@ func (p *packetPacker) composeNextPacket(
fs[len(fs)-1].DataLenPresent = false
}
// TODO: Simplify
for _, f := range fs {
payloadFrames = append(payloadFrames, f)
}
for b := p.streamFramer.PopBlockedFrame(); b != nil; b = p.streamFramer.PopBlockedFrame() {
p.controlFrameMutex.Lock()
p.controlFrames = append(p.controlFrames, b)
p.controlFrameMutex.Unlock()
}
return payloadFrames, nil
}

View file

@ -63,7 +63,7 @@ var _ = Describe("Packet packer", func() {
version := versionGQUICFrames
cryptoStream = newCryptoStream(func() {}, flowcontrol.NewStreamFlowController(version.CryptoStreamID(), false, flowcontrol.NewConnectionFlowController(1000, 1000, nil), 1000, 1000, 1000, nil), version)
streamsMap := newStreamsMap(nil, protocol.PerspectiveServer, versionGQUICFrames)
streamFramer = newStreamFramer(cryptoStream, streamsMap, nil, versionGQUICFrames)
streamFramer = newStreamFramer(cryptoStream, streamsMap, versionGQUICFrames)
packer = &packetPacker{
cryptoSetup: &mockCryptoSetup{encLevelSeal: protocol.EncryptionForwardSecure},
@ -690,47 +690,6 @@ var _ = Describe("Packet packer", func() {
})
})
Context("BLOCKED frames", func() {
It("queues a BLOCKED frame", func() {
length := 100
streamFramer.blockedFrameQueue = []*wire.BlockedFrame{&wire.BlockedFrame{Offset: 555}}
f := &wire.StreamFrame{
StreamID: 5,
Data: bytes.Repeat([]byte{'f'}, length),
}
streamFramer.AddFrameForRetransmission(f)
_, err := packer.composeNextPacket(maxFrameSize, true)
Expect(err).ToNot(HaveOccurred())
Expect(packer.controlFrames[0]).To(Equal(&wire.BlockedFrame{Offset: 555}))
})
It("removes the dataLen attribute from the last STREAM frame, even if it queued a BLOCKED frame", func() {
length := 100
streamFramer.blockedFrameQueue = []*wire.BlockedFrame{&wire.BlockedFrame{Offset: 50}}
f := &wire.StreamFrame{
StreamID: 5,
Data: bytes.Repeat([]byte{'f'}, length),
}
streamFramer.AddFrameForRetransmission(f)
p, err := packer.composeNextPacket(maxFrameSize, true)
Expect(err).ToNot(HaveOccurred())
Expect(p).To(HaveLen(1))
Expect(p[0].(*wire.StreamFrame).DataLenPresent).To(BeFalse())
})
It("packs a connection-level BlockedFrame", func() {
streamFramer.blockedFrameQueue = []*wire.BlockedFrame{&wire.BlockedFrame{}}
f := &wire.StreamFrame{
StreamID: 5,
Data: []byte("foobar"),
}
streamFramer.AddFrameForRetransmission(f)
_, err := packer.composeNextPacket(maxFrameSize, true)
Expect(err).ToNot(HaveOccurred())
Expect(packer.controlFrames[0]).To(Equal(&wire.BlockedFrame{}))
})
})
It("returns nil if we only have a single STOP_WAITING", func() {
packer.QueueControlFrame(&wire.StopWaitingFrame{})
p, err := packer.PackPacket()

View file

@ -146,7 +146,7 @@ func (s *sendStream) PopStreamFrame(maxBytes protocol.ByteCount) *wire.StreamFra
if frame.FinBit {
s.finSent = true
} else if s.streamID != s.version.CryptoStreamID() { // TODO(#657): Flow control for the crypto stream
if isBlocked, offset := s.flowController.IsBlocked(); isBlocked {
if isBlocked, offset := s.flowController.IsNewlyBlocked(); isBlocked {
s.queueControlFrame(&wire.StreamBlockedFrame{
StreamID: s.streamID,
Offset: offset,

View file

@ -49,7 +49,7 @@ var _ = Describe("Send Stream", func() {
It("writes and gets all data at once", func() {
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999))
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6))
mockFC.EXPECT().IsBlocked()
mockFC.EXPECT().IsNewlyBlocked()
done := make(chan struct{})
go func() {
defer GinkgoRecover()
@ -78,7 +78,7 @@ var _ = Describe("Send Stream", func() {
frameHeaderLen := protocol.ByteCount(4)
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2)
mockFC.EXPECT().AddBytesSent(gomock.Any() /* protocol.ByteCount(3)*/).Times(2)
mockFC.EXPECT().IsBlocked().Times(2)
mockFC.EXPECT().IsNewlyBlocked().Times(2)
done := make(chan struct{})
go func() {
defer GinkgoRecover()
@ -115,7 +115,7 @@ var _ = Describe("Send Stream", func() {
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2)
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(1))
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(2))
mockFC.EXPECT().IsBlocked().Times(2)
mockFC.EXPECT().IsNewlyBlocked().Times(2)
s := []byte("foo")
go func() {
defer GinkgoRecover()
@ -155,7 +155,7 @@ var _ = Describe("Send Stream", func() {
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999))
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6))
// don't use offset 6 here, to make sure the BLOCKED frame contains the number returned by the flow controller
mockFC.EXPECT().IsBlocked().Return(true, protocol.ByteCount(10))
mockFC.EXPECT().IsNewlyBlocked().Return(true, protocol.ByteCount(10))
done := make(chan struct{})
go func() {
defer GinkgoRecover()
@ -181,7 +181,7 @@ var _ = Describe("Send Stream", func() {
It("doesn't queue a BLOCKED frame if the stream is flow control blocked, but the frame popped has the FIN bit set", func() {
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999))
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6))
// don't EXPECT a call to IsBlocked
// don't EXPECT a call to IsNewlyBlocked
done := make(chan struct{})
go func() {
defer GinkgoRecover()
@ -222,7 +222,7 @@ var _ = Describe("Send Stream", func() {
It("returns the number of bytes written, when the deadline expires", func() {
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(10000)).AnyTimes()
mockFC.EXPECT().AddBytesSent(gomock.Any())
mockFC.EXPECT().IsBlocked()
mockFC.EXPECT().IsNewlyBlocked()
deadline := time.Now().Add(scaleDuration(50 * time.Millisecond))
str.SetWriteDeadline(deadline)
var n int
@ -299,7 +299,7 @@ var _ = Describe("Send Stream", func() {
frameHeaderLen := protocol.ByteCount(4)
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2)
mockFC.EXPECT().AddBytesSent(gomock.Any()).Times(2)
mockFC.EXPECT().IsBlocked()
mockFC.EXPECT().IsNewlyBlocked()
str.dataForWriting = []byte("foobar")
str.Close()
f := str.PopStreamFrame(3 + frameHeaderLen)
@ -340,7 +340,7 @@ var _ = Describe("Send Stream", func() {
It("doesn't get data for writing if an error occurred", func() {
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999))
mockFC.EXPECT().AddBytesSent(gomock.Any())
mockFC.EXPECT().IsBlocked()
mockFC.EXPECT().IsNewlyBlocked()
done := make(chan struct{})
go func() {
defer GinkgoRecover()
@ -380,7 +380,7 @@ var _ = Describe("Send Stream", func() {
It("unblocks Write", func() {
mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount)
mockFC.EXPECT().AddBytesSent(gomock.Any())
mockFC.EXPECT().IsBlocked()
mockFC.EXPECT().IsNewlyBlocked()
writeReturned := make(chan struct{})
var n int
go func() {

View file

@ -314,7 +314,7 @@ func (s *session) postSetup(initialPacketNumber protocol.PacketNumber) error {
s.receivedPacketHandler = ackhandler.NewReceivedPacketHandler(s.version)
s.streamsMap = newStreamsMap(s.newStream, s.perspective, s.version)
s.streamFramer = newStreamFramer(s.cryptoStream, s.streamsMap, s.connFlowController, s.version)
s.streamFramer = newStreamFramer(s.cryptoStream, s.streamsMap, s.version)
s.packer = newPacketPacker(s.connectionID,
initialPacketNumber,
@ -721,6 +721,9 @@ func (s *session) sendPacket() error {
for _, f := range s.getWindowUpdates() {
s.packer.QueueControlFrame(f)
}
if isBlocked, offset := s.connFlowController.IsNewlyBlocked(); isBlocked {
s.packer.QueueControlFrame(&wire.BlockedFrame{Offset: offset})
}
ack := s.receivedPacketHandler.GetAckFrame()
if ack != nil {

View file

@ -857,6 +857,27 @@ var _ = Describe("Session", func() {
Expect(mconn.written).To(HaveLen(1))
})
It("adds a BLOCKED frame when it is connection-level flow control blocked", func() {
fc := mocks.NewMockConnectionFlowController(mockCtrl)
fc.EXPECT().GetWindowUpdate()
fc.EXPECT().IsNewlyBlocked().Return(true, protocol.ByteCount(1337))
sess.connFlowController = fc
sph := mocks.NewMockSentPacketHandler(mockCtrl)
sph.EXPECT().GetLeastUnacked().AnyTimes()
sph.EXPECT().SendingAllowed().Return(true)
sph.EXPECT().SendingAllowed()
sph.EXPECT().DequeuePacketForRetransmission()
sph.EXPECT().ShouldSendRetransmittablePacket()
sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) {
Expect(p.Frames).To(Equal([]wire.Frame{
&wire.BlockedFrame{Offset: 1337},
}))
})
sess.sentPacketHandler = sph
err := sess.sendPacket()
Expect(err).ToNot(HaveOccurred())
})
It("sends public reset", func() {
err := sess.sendPublicReset(1)
Expect(err).NotTo(HaveOccurred())

View file

@ -1,7 +1,6 @@
package quic
import (
"github.com/lucas-clemente/quic-go/internal/flowcontrol"
"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/wire"
)
@ -11,22 +10,17 @@ type streamFramer struct {
cryptoStream cryptoStreamI
version protocol.VersionNumber
connFlowController flowcontrol.ConnectionFlowController
retransmissionQueue []*wire.StreamFrame
blockedFrameQueue []*wire.BlockedFrame
}
func newStreamFramer(
cryptoStream cryptoStreamI,
streamsMap *streamsMap,
cfc flowcontrol.ConnectionFlowController,
v protocol.VersionNumber,
) *streamFramer {
return &streamFramer{
streamsMap: streamsMap,
cryptoStream: cryptoStream,
connFlowController: cfc,
version: v,
}
}
@ -40,15 +34,6 @@ func (f *streamFramer) PopStreamFrames(maxLen protocol.ByteCount) []*wire.Stream
return append(fs, f.maybePopNormalFrames(maxLen-currentLen)...)
}
func (f *streamFramer) PopBlockedFrame() wire.Frame {
if len(f.blockedFrameQueue) == 0 {
return nil
}
frame := f.blockedFrameQueue[0]
f.blockedFrameQueue = f.blockedFrameQueue[1:]
return frame
}
func (f *streamFramer) HasFramesForRetransmission() bool {
return len(f.retransmissionQueue) > 0
}
@ -103,19 +88,11 @@ func (f *streamFramer) maybePopNormalFrames(maxTotalLen protocol.ByteCount) (res
if frame == nil {
return true, nil
}
if blocked, offset := f.connFlowController.IsBlocked(); blocked {
f.blockedFrameQueue = append(f.blockedFrameQueue, &wire.BlockedFrame{Offset: offset})
}
res = append(res, frame)
currentLen += frame.MinLength(f.version) + frame.DataLen()
if currentLen == maxTotalLen {
return false, nil
}
frame = &wire.StreamFrame{DataLenPresent: true}
return true, nil
}

View file

@ -23,7 +23,6 @@ var _ = Describe("Stream Framer", func() {
framer *streamFramer
streamsMap *streamsMap
stream1, stream2 *mocks.MockStreamI
connFC *mocks.MockConnectionFlowController
)
setNoData := func(str *mocks.MockStreamI) {
@ -49,8 +48,7 @@ var _ = Describe("Stream Framer", func() {
streamsMap.putStream(stream1)
streamsMap.putStream(stream2)
connFC = mocks.NewMockConnectionFlowController(mockCtrl)
framer = newStreamFramer(nil, streamsMap, connFC, versionGQUICFrames)
framer = newStreamFramer(nil, streamsMap, versionGQUICFrames)
})
It("says if it has retransmissions", func() {
@ -69,11 +67,6 @@ var _ = Describe("Stream Framer", func() {
})
Context("Popping", func() {
BeforeEach(func() {
// we're not connection-level flow control blocked
connFC.EXPECT().IsBlocked().AnyTimes()
})
It("returns nil when popping an empty framer", func() {
setNoData(stream1)
setNoData(stream2)
@ -257,34 +250,4 @@ var _ = Describe("Stream Framer", func() {
})
})
})
Context("BLOCKED frames", func() {
It("doesn't queue a stream-level BLOCKED frame after sending the FIN bit frame", func() {
setNoData(stream2)
f := &wire.StreamFrame{
StreamID: id1,
Data: []byte("foobar"),
FinBit: true,
}
connFC.EXPECT().IsBlocked()
stream1.EXPECT().PopStreamFrame(gomock.Any()).Return(f)
// no call to IsFlowControlBlocked()
frames := framer.PopStreamFrames(1000)
Expect(frames).To(Equal([]*wire.StreamFrame{f}))
blockedFrame := framer.PopBlockedFrame()
Expect(blockedFrame).To(BeNil())
})
It("queues and pops BLOCKED frames for connection blocked streams", func() {
setNoData(stream2)
connFC.EXPECT().IsBlocked().Return(true, protocol.ByteCount(0x4321))
stream1.EXPECT().PopStreamFrame(gomock.Any()).Return(&wire.StreamFrame{
StreamID: id1,
Data: []byte("foo"),
})
framer.PopStreamFrames(1000)
Expect(framer.PopBlockedFrame()).To(Equal(&wire.BlockedFrame{Offset: 0x4321}))
Expect(framer.PopBlockedFrame()).To(BeNil())
})
})
})

View file

@ -138,7 +138,7 @@ var _ = Describe("Stream", func() {
str.version = versionGQUICFrames
mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount).AnyTimes()
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6))
mockFC.EXPECT().IsBlocked()
mockFC.EXPECT().IsNewlyBlocked()
err := str.CancelRead(1234)
Expect(err).ToNot(HaveOccurred())
Expect(queuedControlFrames).To(BeEmpty()) // no RST_STREAM frame queued yet