From 69998c19cbbdcfe98167899c722afdf8c7d0a933 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Thu, 14 Dec 2017 14:30:14 +0700 Subject: [PATCH 1/2] add the offset field to the BLOCKED and STREAM_BLOCKED frames The value is not yet set, but parsing and writing works. --- internal/wire/blocked_frame.go | 20 +++++++++++---- internal/wire/blocked_frame_test.go | 29 ++++++++++++++++------ internal/wire/stream_blocked_frame.go | 15 ++++++++--- internal/wire/stream_blocked_frame_test.go | 10 ++++++-- packet_unpacker.go | 3 +++ packet_unpacker_test.go | 8 ++++-- 6 files changed, 65 insertions(+), 20 deletions(-) diff --git a/internal/wire/blocked_frame.go b/internal/wire/blocked_frame.go index 72c8a056..04dd29d8 100644 --- a/internal/wire/blocked_frame.go +++ b/internal/wire/blocked_frame.go @@ -4,17 +4,26 @@ import ( "bytes" "github.com/lucas-clemente/quic-go/internal/protocol" + "github.com/lucas-clemente/quic-go/internal/utils" ) // A BlockedFrame is a BLOCKED frame -type BlockedFrame struct{} +type BlockedFrame struct { + Offset protocol.ByteCount +} // ParseBlockedFrame parses a BLOCKED frame -func ParseBlockedFrame(r *bytes.Reader, version protocol.VersionNumber) (*BlockedFrame, error) { +func ParseBlockedFrame(r *bytes.Reader, _ protocol.VersionNumber) (*BlockedFrame, error) { if _, err := r.ReadByte(); err != nil { return nil, err } - return &BlockedFrame{}, nil + offset, err := utils.ReadVarInt(r) + if err != nil { + return nil, err + } + return &BlockedFrame{ + Offset: protocol.ByteCount(offset), + }, nil } func (f *BlockedFrame) Write(b *bytes.Buffer, version protocol.VersionNumber) error { @@ -23,13 +32,14 @@ func (f *BlockedFrame) Write(b *bytes.Buffer, version protocol.VersionNumber) er } typeByte := uint8(0x08) b.WriteByte(typeByte) + utils.WriteVarInt(b, uint64(f.Offset)) return nil } // MinLength of a written frame func (f *BlockedFrame) MinLength(version protocol.VersionNumber) protocol.ByteCount { - if !version.UsesIETFFrameFormat() { // writing this frame would result in a legacy BLOCKED being written, which is longer + if !version.UsesIETFFrameFormat() { return 1 + 4 } - return 1 + return 1 + utils.VarIntLen(uint64(f.Offset)) } diff --git a/internal/wire/blocked_frame_test.go b/internal/wire/blocked_frame_test.go index 9a3e2dde..ce58820b 100644 --- a/internal/wire/blocked_frame_test.go +++ b/internal/wire/blocked_frame_test.go @@ -2,8 +2,10 @@ package wire import ( "bytes" + "io" "github.com/lucas-clemente/quic-go/internal/protocol" + "github.com/lucas-clemente/quic-go/internal/utils" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -12,30 +14,41 @@ import ( var _ = Describe("BLOCKED frame", func() { Context("when parsing", func() { It("accepts sample frame", func() { - b := bytes.NewReader([]byte{0x08}) - _, err := ParseBlockedFrame(b, protocol.VersionWhatever) + data := []byte{0x08} + data = append(data, encodeVarInt(0x12345678)...) + b := bytes.NewReader(data) + frame, err := ParseBlockedFrame(b, versionIETFFrames) Expect(err).ToNot(HaveOccurred()) + Expect(frame.Offset).To(Equal(protocol.ByteCount(0x12345678))) Expect(b.Len()).To(BeZero()) }) It("errors on EOFs", func() { - _, err := ParseBlockedFrame(bytes.NewReader(nil), protocol.VersionWhatever) - Expect(err).To(HaveOccurred()) + data := []byte{0x08} + data = append(data, encodeVarInt(0x12345678)...) + _, err := ParseBlockedFrame(bytes.NewReader(data), versionIETFFrames) + Expect(err).ToNot(HaveOccurred()) + for i := range data { + _, err := ParseBlockedFrame(bytes.NewReader(data[:i]), versionIETFFrames) + Expect(err).To(MatchError(io.EOF)) + } }) }) Context("when writing", func() { It("writes a sample frame", func() { b := &bytes.Buffer{} - frame := BlockedFrame{} + frame := BlockedFrame{Offset: 0xdeadbeef} err := frame.Write(b, protocol.VersionWhatever) Expect(err).ToNot(HaveOccurred()) - Expect(b.Bytes()).To(Equal([]byte{0x08})) + expected := []byte{0x08} + expected = append(expected, encodeVarInt(0xdeadbeef)...) + Expect(b.Bytes()).To(Equal(expected)) }) It("has the correct min length", func() { - frame := BlockedFrame{} - Expect(frame.MinLength(versionIETFFrames)).To(Equal(protocol.ByteCount(1))) + frame := BlockedFrame{Offset: 0x12345} + Expect(frame.MinLength(versionIETFFrames)).To(Equal(1 + utils.VarIntLen(0x12345))) }) }) }) diff --git a/internal/wire/stream_blocked_frame.go b/internal/wire/stream_blocked_frame.go index b2ecd584..b67bd24e 100644 --- a/internal/wire/stream_blocked_frame.go +++ b/internal/wire/stream_blocked_frame.go @@ -10,10 +10,11 @@ import ( // A StreamBlockedFrame in QUIC type StreamBlockedFrame struct { StreamID protocol.StreamID + Offset protocol.ByteCount } // ParseStreamBlockedFrame parses a STREAM_BLOCKED frame -func ParseStreamBlockedFrame(r *bytes.Reader, version protocol.VersionNumber) (*StreamBlockedFrame, error) { +func ParseStreamBlockedFrame(r *bytes.Reader, _ protocol.VersionNumber) (*StreamBlockedFrame, error) { if _, err := r.ReadByte(); err != nil { // read the TypeByte return nil, err } @@ -21,7 +22,14 @@ func ParseStreamBlockedFrame(r *bytes.Reader, version protocol.VersionNumber) (* if err != nil { return nil, err } - return &StreamBlockedFrame{StreamID: protocol.StreamID(sid)}, nil + offset, err := utils.ReadVarInt(r) + if err != nil { + return nil, err + } + return &StreamBlockedFrame{ + StreamID: protocol.StreamID(sid), + Offset: protocol.ByteCount(offset), + }, nil } // Write writes a STREAM_BLOCKED frame @@ -31,6 +39,7 @@ func (f *StreamBlockedFrame) Write(b *bytes.Buffer, version protocol.VersionNumb } b.WriteByte(0x09) utils.WriteVarInt(b, uint64(f.StreamID)) + utils.WriteVarInt(b, uint64(f.Offset)) return nil } @@ -39,5 +48,5 @@ func (f *StreamBlockedFrame) MinLength(version protocol.VersionNumber) protocol. if !version.UsesIETFFrameFormat() { return 1 + 4 } - return 1 + utils.VarIntLen(uint64(f.StreamID)) + return 1 + utils.VarIntLen(uint64(f.StreamID)) + utils.VarIntLen(uint64(f.Offset)) } diff --git a/internal/wire/stream_blocked_frame_test.go b/internal/wire/stream_blocked_frame_test.go index d31ce787..42b2046a 100644 --- a/internal/wire/stream_blocked_frame_test.go +++ b/internal/wire/stream_blocked_frame_test.go @@ -14,17 +14,20 @@ var _ = Describe("STREAM_BLOCKED frame", func() { Context("parsing", func() { It("accepts sample frame", func() { data := []byte{0x9} - data = append(data, encodeVarInt(0xdeadbeef)...) + data = append(data, encodeVarInt(0xdeadbeef)...) // stream ID + data = append(data, encodeVarInt(0xdecafbad)...) // offset b := bytes.NewReader(data) frame, err := ParseStreamBlockedFrame(b, versionIETFFrames) Expect(err).ToNot(HaveOccurred()) Expect(frame.StreamID).To(Equal(protocol.StreamID(0xdeadbeef))) + Expect(frame.Offset).To(Equal(protocol.ByteCount(0xdecafbad))) Expect(b.Len()).To(BeZero()) }) It("errors on EOFs", func() { data := []byte{0x9} data = append(data, encodeVarInt(0xdeadbeef)...) + data = append(data, encodeVarInt(0xc0010ff)...) _, err := ParseStreamBlockedFrame(bytes.NewReader(data), versionIETFFrames) Expect(err).NotTo(HaveOccurred()) for i := range data { @@ -38,19 +41,22 @@ var _ = Describe("STREAM_BLOCKED frame", func() { It("has proper min length", func() { f := &StreamBlockedFrame{ StreamID: 0x1337, + Offset: 0xdeadbeef, } - Expect(f.MinLength(0)).To(Equal(1 + utils.VarIntLen(0x1337))) + Expect(f.MinLength(0)).To(Equal(1 + utils.VarIntLen(0x1337) + utils.VarIntLen(0xdeadbeef))) }) It("writes a sample frame", func() { b := &bytes.Buffer{} f := &StreamBlockedFrame{ StreamID: 0xdecafbad, + Offset: 0x1337, } err := f.Write(b, versionIETFFrames) Expect(err).ToNot(HaveOccurred()) expected := []byte{0x9} expected = append(expected, encodeVarInt(uint64(f.StreamID))...) + expected = append(expected, encodeVarInt(uint64(f.Offset))...) Expect(b.Bytes()).To(Equal(expected)) }) }) diff --git a/packet_unpacker.go b/packet_unpacker.go index 9b11085f..9d09c373 100644 --- a/packet_unpacker.go +++ b/packet_unpacker.go @@ -117,6 +117,9 @@ func (u *packetUnpacker) parseIETFFrame(r *bytes.Reader, typeByte byte, hdr *wir frame, err = wire.ParsePingFrame(r, u.version) case 0x8: frame, err = wire.ParseBlockedFrame(r, u.version) + if err != nil { + err = qerr.Error(qerr.InvalidBlockedData, err.Error()) + } case 0x9: frame, err = wire.ParseStreamBlockedFrame(r, u.version) if err != nil { diff --git a/packet_unpacker_test.go b/packet_unpacker_test.go index 22816948..8934bb7e 100644 --- a/packet_unpacker_test.go +++ b/packet_unpacker_test.go @@ -343,7 +343,7 @@ var _ = Describe("Packet unpacker", func() { }) It("unpacks connection-level BLOCKED frames", func() { - f := &wire.BlockedFrame{} + f := &wire.BlockedFrame{Offset: 0x1234} buf := &bytes.Buffer{} err := f.Write(buf, versionIETFFrames) Expect(err).ToNot(HaveOccurred()) @@ -354,7 +354,10 @@ var _ = Describe("Packet unpacker", func() { }) It("unpacks stream-level BLOCKED frames", func() { - f := &wire.StreamBlockedFrame{StreamID: 0xdeadbeef} + f := &wire.StreamBlockedFrame{ + StreamID: 0xdeadbeef, + Offset: 0xdead, + } buf := &bytes.Buffer{} err := f.Write(buf, versionIETFFrames) Expect(err).ToNot(HaveOccurred()) @@ -403,6 +406,7 @@ var _ = Describe("Packet unpacker", func() { 0x02: qerr.InvalidConnectionCloseData, 0x04: qerr.InvalidWindowUpdateData, 0x05: qerr.InvalidWindowUpdateData, + 0x08: qerr.InvalidBlockedData, 0x09: qerr.InvalidBlockedData, 0x0c: qerr.InvalidFrameData, 0x0e: qerr.InvalidAckData, From 00edfb7461b6b02efb0ef2f5e9296a7ea629b066 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Thu, 14 Dec 2017 15:19:55 +0700 Subject: [PATCH 2/2] set the offset in BLOCKED and STREAM_BLOCKED frames --- crypto_stream.go | 2 +- internal/flowcontrol/base_flow_controller.go | 9 ++++++-- .../flowcontrol/base_flow_controller_test.go | 4 +++- internal/flowcontrol/interface.go | 2 +- .../stream_flow_controller_test.go | 3 ++- internal/mocks/connection_flow_controller.go | 5 +++-- internal/mocks/stream.go | 5 +++-- internal/mocks/stream_flow_controller.go | 5 +++-- stream.go | 4 ++-- stream_framer.go | 13 ++++++++---- stream_framer_test.go | 21 +++++++++---------- stream_test.go | 11 ++++++---- 12 files changed, 51 insertions(+), 33 deletions(-) diff --git a/crypto_stream.go b/crypto_stream.go index b9263da6..c98bc6d4 100644 --- a/crypto_stream.go +++ b/crypto_stream.go @@ -19,7 +19,7 @@ type cryptoStreamI interface { // methods needed for flow control GetWindowUpdate() protocol.ByteCount HandleMaxStreamDataFrame(*wire.MaxStreamDataFrame) - IsFlowControlBlocked() bool + IsFlowControlBlocked() (bool, protocol.ByteCount) } type cryptoStream struct { diff --git a/internal/flowcontrol/base_flow_controller.go b/internal/flowcontrol/base_flow_controller.go index f7d70c7e..616e4bf5 100644 --- a/internal/flowcontrol/base_flow_controller.go +++ b/internal/flowcontrol/base_flow_controller.go @@ -79,11 +79,16 @@ func (c *baseFlowController) getWindowUpdate() protocol.ByteCount { return c.receiveWindow } -func (c *baseFlowController) IsBlocked() bool { +// IsBlocked says if it is blocked by flow control. +// If it is blocked, the offset is returned. +func (c *baseFlowController) IsBlocked() (bool, protocol.ByteCount) { c.mutex.RLock() defer c.mutex.RUnlock() - return c.sendWindowSize() == 0 + if c.sendWindowSize() != 0 { + return false, 0 + } + return true, c.sendWindow } // maybeAdjustWindowIncrement increases the receiveWindowIncrement if we're sending updates too often. diff --git a/internal/flowcontrol/base_flow_controller_test.go b/internal/flowcontrol/base_flow_controller_test.go index 9239722c..790d31f3 100644 --- a/internal/flowcontrol/base_flow_controller_test.go +++ b/internal/flowcontrol/base_flow_controller_test.go @@ -54,7 +54,9 @@ var _ = Describe("Base Flow controller", func() { controller.UpdateSendWindow(100) Expect(controller.IsBlocked()).To(BeFalse()) controller.AddBytesSent(100) - Expect(controller.IsBlocked()).To(BeTrue()) + blocked, offset := controller.IsBlocked() + Expect(blocked).To(BeTrue()) + Expect(offset).To(Equal(protocol.ByteCount(100))) }) }) diff --git a/internal/flowcontrol/interface.go b/internal/flowcontrol/interface.go index 75ec6fac..f5577283 100644 --- a/internal/flowcontrol/interface.go +++ b/internal/flowcontrol/interface.go @@ -5,7 +5,7 @@ import "github.com/lucas-clemente/quic-go/internal/protocol" type flowController interface { // for sending SendWindowSize() protocol.ByteCount - IsBlocked() bool + IsBlocked() (bool, protocol.ByteCount) UpdateSendWindow(protocol.ByteCount) AddBytesSent(protocol.ByteCount) // for receiving diff --git a/internal/flowcontrol/stream_flow_controller_test.go b/internal/flowcontrol/stream_flow_controller_test.go index ed51adcf..5938e5e6 100644 --- a/internal/flowcontrol/stream_flow_controller_test.go +++ b/internal/flowcontrol/stream_flow_controller_test.go @@ -233,7 +233,8 @@ var _ = Describe("Stream Flow controller", func() { controller.connection.UpdateSendWindow(50) controller.UpdateSendWindow(100) controller.AddBytesSent(50) - Expect(controller.connection.IsBlocked()).To(BeTrue()) + blocked, _ := controller.connection.IsBlocked() + Expect(blocked).To(BeTrue()) Expect(controller.IsBlocked()).To(BeFalse()) }) }) diff --git a/internal/mocks/connection_flow_controller.go b/internal/mocks/connection_flow_controller.go index 26c77cbe..cf99e9d1 100644 --- a/internal/mocks/connection_flow_controller.go +++ b/internal/mocks/connection_flow_controller.go @@ -66,10 +66,11 @@ func (_mr *MockConnectionFlowControllerMockRecorder) GetWindowUpdate() *gomock.C } // IsBlocked mocks base method -func (_m *MockConnectionFlowController) IsBlocked() bool { +func (_m *MockConnectionFlowController) IsBlocked() (bool, protocol.ByteCount) { ret := _m.ctrl.Call(_m, "IsBlocked") ret0, _ := ret[0].(bool) - return ret0 + ret1, _ := ret[1].(protocol.ByteCount) + return ret0, ret1 } // IsBlocked indicates an expected call of IsBlocked diff --git a/internal/mocks/stream.go b/internal/mocks/stream.go index 107d9177..0500861b 100644 --- a/internal/mocks/stream.go +++ b/internal/mocks/stream.go @@ -129,10 +129,11 @@ func (_mr *MockStreamIMockRecorder) HandleStreamFrame(arg0 interface{}) *gomock. } // IsFlowControlBlocked mocks base method -func (_m *MockStreamI) IsFlowControlBlocked() bool { +func (_m *MockStreamI) IsFlowControlBlocked() (bool, protocol.ByteCount) { ret := _m.ctrl.Call(_m, "IsFlowControlBlocked") ret0, _ := ret[0].(bool) - return ret0 + ret1, _ := ret[1].(protocol.ByteCount) + return ret0, ret1 } // IsFlowControlBlocked indicates an expected call of IsFlowControlBlocked diff --git a/internal/mocks/stream_flow_controller.go b/internal/mocks/stream_flow_controller.go index 7e970bb4..8bca7b9e 100644 --- a/internal/mocks/stream_flow_controller.go +++ b/internal/mocks/stream_flow_controller.go @@ -66,10 +66,11 @@ func (_mr *MockStreamFlowControllerMockRecorder) GetWindowUpdate() *gomock.Call } // IsBlocked mocks base method -func (_m *MockStreamFlowController) IsBlocked() bool { +func (_m *MockStreamFlowController) IsBlocked() (bool, protocol.ByteCount) { ret := _m.ctrl.Call(_m, "IsBlocked") ret0, _ := ret[0].(bool) - return ret0 + ret1, _ := ret[1].(protocol.ByteCount) + return ret0, ret1 } // IsBlocked indicates an expected call of IsBlocked diff --git a/stream.go b/stream.go index 6dacfd3f..aacacd77 100644 --- a/stream.go +++ b/stream.go @@ -25,7 +25,7 @@ type streamI interface { // methods needed for flow control GetWindowUpdate() protocol.ByteCount HandleMaxStreamDataFrame(*wire.MaxStreamDataFrame) - IsFlowControlBlocked() bool + IsFlowControlBlocked() (bool, protocol.ByteCount) } // A Stream assembles the data from StreamFrames and provides a super-convenient Read-Interface @@ -484,7 +484,7 @@ func (s *stream) HandleMaxStreamDataFrame(frame *wire.MaxStreamDataFrame) { s.flowController.UpdateSendWindow(frame.ByteOffset) } -func (s *stream) IsFlowControlBlocked() bool { +func (s *stream) IsFlowControlBlocked() (bool, protocol.ByteCount) { return s.flowController.IsBlocked() } diff --git a/stream_framer.go b/stream_framer.go index 9244bc3e..d5fa09a7 100644 --- a/stream_framer.go +++ b/stream_framer.go @@ -105,11 +105,16 @@ func (f *streamFramer) maybePopNormalFrames(maxTotalLen protocol.ByteCount) (res } // Finally, check if we are now FC blocked and should queue a BLOCKED frame - if !frame.FinBit && s.IsFlowControlBlocked() { - f.blockedFrameQueue = append(f.blockedFrameQueue, &wire.StreamBlockedFrame{StreamID: s.StreamID()}) + if !frame.FinBit { + if blocked, offset := s.IsFlowControlBlocked(); blocked { + f.blockedFrameQueue = append(f.blockedFrameQueue, &wire.StreamBlockedFrame{ + StreamID: s.StreamID(), + Offset: offset, + }) + } } - if f.connFlowController.IsBlocked() { - f.blockedFrameQueue = append(f.blockedFrameQueue, &wire.BlockedFrame{}) + if blocked, offset := f.connFlowController.IsBlocked(); blocked { + f.blockedFrameQueue = append(f.blockedFrameQueue, &wire.BlockedFrame{Offset: offset}) } res = append(res, frame) diff --git a/stream_framer_test.go b/stream_framer_test.go index cb5ba7c3..d3307cdd 100644 --- a/stream_framer_test.go +++ b/stream_framer_test.go @@ -72,8 +72,8 @@ var _ = Describe("Stream Framer", func() { BeforeEach(func() { // nothing is blocked here connFC.EXPECT().IsBlocked().AnyTimes() - stream1.EXPECT().IsFlowControlBlocked().Return(false).AnyTimes() - stream2.EXPECT().IsFlowControlBlocked().Return(false).AnyTimes() + stream1.EXPECT().IsFlowControlBlocked().Return(false, protocol.ByteCount(0)).AnyTimes() + stream2.EXPECT().IsFlowControlBlocked().Return(false, protocol.ByteCount(0)).AnyTimes() }) It("returns nil when popping an empty framer", func() { @@ -272,13 +272,13 @@ var _ = Describe("Stream Framer", func() { StreamID: id1, Data: []byte("foobar"), }) - stream1.EXPECT().IsFlowControlBlocked().Return(true) + stream1.EXPECT().IsFlowControlBlocked().Return(true, protocol.ByteCount(0x1234)) frames := framer.PopStreamFrames(1000) Expect(frames).To(HaveLen(1)) - f := framer.PopBlockedFrame() - Expect(f).To(BeAssignableToTypeOf(&wire.StreamBlockedFrame{})) - bf := f.(*wire.StreamBlockedFrame) - Expect(bf.StreamID).To(Equal(stream1.StreamID())) + Expect(framer.PopBlockedFrame()).To(Equal(&wire.StreamBlockedFrame{ + StreamID: stream1.StreamID(), + Offset: 0x1234, + })) Expect(framer.PopBlockedFrame()).To(BeNil()) }) @@ -300,15 +300,14 @@ var _ = Describe("Stream Framer", func() { It("queues and pops BLOCKED frames for connection blocked streams", func() { setNoData(stream2) - connFC.EXPECT().IsBlocked().Return(true) + connFC.EXPECT().IsBlocked().Return(true, protocol.ByteCount(0x4321)) stream1.EXPECT().PopStreamFrame(gomock.Any()).Return(&wire.StreamFrame{ StreamID: id1, Data: []byte("foo"), }) - stream1.EXPECT().IsFlowControlBlocked().Return(false) + stream1.EXPECT().IsFlowControlBlocked().Return(false, protocol.ByteCount(0)) framer.PopStreamFrames(1000) - f := framer.PopBlockedFrame() - Expect(f).To(BeAssignableToTypeOf(&wire.BlockedFrame{})) + Expect(framer.PopBlockedFrame()).To(Equal(&wire.BlockedFrame{Offset: 0x4321})) Expect(framer.PopBlockedFrame()).To(BeNil()) }) }) diff --git a/stream_test.go b/stream_test.go index 5bc197cb..b9ce75e0 100644 --- a/stream_test.go +++ b/stream_test.go @@ -1146,10 +1146,13 @@ var _ = Describe("Stream", func() { Context("flow control", func() { It("says when it's flow control blocked", func() { - mockFC.EXPECT().IsBlocked().Return(false) - Expect(str.IsFlowControlBlocked()).To(BeFalse()) - mockFC.EXPECT().IsBlocked().Return(true) - Expect(str.IsFlowControlBlocked()).To(BeTrue()) + mockFC.EXPECT().IsBlocked().Return(false, protocol.ByteCount(0)) + blocked, _ := str.IsFlowControlBlocked() + Expect(blocked).To(BeFalse()) + mockFC.EXPECT().IsBlocked().Return(true, protocol.ByteCount(0x1337)) + blocked, offset := str.IsFlowControlBlocked() + Expect(blocked).To(BeTrue()) + Expect(offset).To(Equal(protocol.ByteCount(0x1337))) }) It("updates the flow control window", func() {