diff --git a/framer.go b/framer.go index 6803e0d7..fa2179f5 100644 --- a/framer.go +++ b/framer.go @@ -120,7 +120,7 @@ func (f *framerI) AppendStreamFrames(frames []*ackhandler.Frame, maxLen protocol // Therefore, we can pretend to have more bytes available when popping // the STREAM frame (which will always have the DataLen set). remainingLen += quicvarint.Len(uint64(remainingLen)) - frame, hasMoreData := str.popStreamFrame(remainingLen) + frame, hasMoreData := str.popStreamFrame(remainingLen, f.version) if hasMoreData { // put the stream back in the queue (at the end) f.streamQueue = append(f.streamQueue, id) } else { // no more data to send. Stream is not active any more diff --git a/framer_test.go b/framer_test.go index 4afd96ab..77ab90f7 100644 --- a/framer_test.go +++ b/framer_test.go @@ -33,7 +33,7 @@ var _ = Describe("Framer", func() { stream1.EXPECT().StreamID().Return(protocol.StreamID(5)).AnyTimes() stream2 = NewMockSendStreamI(mockCtrl) stream2.EXPECT().StreamID().Return(protocol.StreamID(6)).AnyTimes() - framer = newFramer(streamGetter, version) + framer = newFramer(streamGetter, protocol.Version1) }) Context("handling control frames", func() { @@ -124,7 +124,7 @@ var _ = Describe("Framer", func() { Offset: 42, DataLenPresent: true, } - stream1.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f}, false) + stream1.EXPECT().popStreamFrame(gomock.Any(), protocol.Version1).Return(&ackhandler.Frame{Frame: f}, false) framer.AddActiveStream(id1) fs, length := framer.AppendStreamFrames(nil, 1000) Expect(fs).To(HaveLen(1)) @@ -139,8 +139,8 @@ var _ = Describe("Framer", func() { Expect(framer.HasData()).To(BeTrue()) f1 := &wire.StreamFrame{StreamID: id1, Data: []byte("foo")} f2 := &wire.StreamFrame{StreamID: id1, Data: []byte("bar")} - stream1.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f1}, true) - stream1.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f2}, false) + stream1.EXPECT().popStreamFrame(gomock.Any(), protocol.Version1).Return(&ackhandler.Frame{Frame: f1}, true) + stream1.EXPECT().popStreamFrame(gomock.Any(), protocol.Version1).Return(&ackhandler.Frame{Frame: f2}, false) frames, _ := framer.AppendStreamFrames(nil, protocol.MaxByteCount) Expect(frames).To(HaveLen(1)) Expect(frames[0].Frame).To(Equal(f1)) @@ -158,7 +158,7 @@ var _ = Describe("Framer", func() { Data: []byte("foobar"), DataLenPresent: true, } - stream1.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f}, false) + stream1.EXPECT().popStreamFrame(gomock.Any(), protocol.Version1).Return(&ackhandler.Frame{Frame: f}, false) framer.AddActiveStream(id1) mdf := &wire.MaxDataFrame{MaximumData: 1337} frames := []*ackhandler.Frame{{Frame: mdf}} @@ -178,7 +178,7 @@ var _ = Describe("Framer", func() { Data: []byte("foobar"), DataLenPresent: true, } - stream2.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f}, false) + stream2.EXPECT().popStreamFrame(gomock.Any(), protocol.Version1).Return(&ackhandler.Frame{Frame: f}, false) framer.AddActiveStream(id1) framer.AddActiveStream(id2) frames, _ := framer.AppendStreamFrames(nil, 1000) @@ -194,8 +194,8 @@ var _ = Describe("Framer", func() { Data: []byte("foobar"), DataLenPresent: true, } - stream1.EXPECT().popStreamFrame(gomock.Any()).Return(nil, false) - stream2.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f}, false) + stream1.EXPECT().popStreamFrame(gomock.Any(), protocol.Version1).Return(nil, false) + stream2.EXPECT().popStreamFrame(gomock.Any(), protocol.Version1).Return(&ackhandler.Frame{Frame: f}, false) framer.AddActiveStream(id1) framer.AddActiveStream(id2) frames, _ := framer.AppendStreamFrames(nil, 1000) @@ -207,8 +207,8 @@ var _ = Describe("Framer", func() { streamGetter.EXPECT().GetOrOpenSendStream(id1).Return(stream1, nil).Times(2) f1 := &wire.StreamFrame{StreamID: id1, Data: []byte("foobar")} f2 := &wire.StreamFrame{StreamID: id1, Data: []byte("foobaz")} - stream1.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f1}, true) - stream1.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f2}, false) + stream1.EXPECT().popStreamFrame(gomock.Any(), protocol.Version1).Return(&ackhandler.Frame{Frame: f1}, true) + stream1.EXPECT().popStreamFrame(gomock.Any(), protocol.Version1).Return(&ackhandler.Frame{Frame: f2}, false) framer.AddActiveStream(id1) // only add it once frames, _ := framer.AppendStreamFrames(nil, protocol.MinStreamFrameSize) Expect(frames).To(HaveLen(1)) @@ -227,9 +227,9 @@ var _ = Describe("Framer", func() { f11 := &wire.StreamFrame{StreamID: id1, Data: []byte("foobar")} f12 := &wire.StreamFrame{StreamID: id1, Data: []byte("foobaz")} f2 := &wire.StreamFrame{StreamID: id2, Data: []byte("raboof")} - stream1.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f11}, true) - stream1.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f12}, false) - stream2.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f2}, false) + stream1.EXPECT().popStreamFrame(gomock.Any(), protocol.Version1).Return(&ackhandler.Frame{Frame: f11}, true) + stream1.EXPECT().popStreamFrame(gomock.Any(), protocol.Version1).Return(&ackhandler.Frame{Frame: f12}, false) + stream2.EXPECT().popStreamFrame(gomock.Any(), protocol.Version1).Return(&ackhandler.Frame{Frame: f2}, false) framer.AddActiveStream(id1) // only add it once framer.AddActiveStream(id2) // first a frame from stream 1 @@ -252,8 +252,8 @@ var _ = Describe("Framer", func() { f1 := &wire.StreamFrame{StreamID: id1, Data: []byte("foobar")} f2 := &wire.StreamFrame{StreamID: id2, Data: []byte("raboof")} // both streams have more data, and will be re-queued - stream1.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f1}, true) - stream2.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f2}, true) + stream1.EXPECT().popStreamFrame(gomock.Any(), protocol.Version1).Return(&ackhandler.Frame{Frame: f1}, true) + stream2.EXPECT().popStreamFrame(gomock.Any(), protocol.Version1).Return(&ackhandler.Frame{Frame: f2}, true) framer.AddActiveStream(id1) framer.AddActiveStream(id2) frames, length := framer.AppendStreamFrames(nil, 1000) @@ -268,8 +268,8 @@ var _ = Describe("Framer", func() { streamGetter.EXPECT().GetOrOpenSendStream(id2).Return(stream2, nil) f1 := &wire.StreamFrame{Data: []byte("foobar")} f2 := &wire.StreamFrame{Data: []byte("foobaz")} - stream1.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f1}, false) - stream2.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f2}, false) + stream1.EXPECT().popStreamFrame(gomock.Any(), protocol.Version1).Return(&ackhandler.Frame{Frame: f1}, false) + stream2.EXPECT().popStreamFrame(gomock.Any(), protocol.Version1).Return(&ackhandler.Frame{Frame: f2}, false) framer.AddActiveStream(id2) framer.AddActiveStream(id1) frames, _ := framer.AppendStreamFrames(nil, 1000) @@ -281,7 +281,7 @@ var _ = Describe("Framer", func() { It("only asks a stream for data once, even if it was reported active multiple times", func() { streamGetter.EXPECT().GetOrOpenSendStream(id1).Return(stream1, nil) f := &wire.StreamFrame{Data: []byte("foobar")} - stream1.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f}, false) // only one call to this function + stream1.EXPECT().popStreamFrame(gomock.Any(), protocol.Version1).Return(&ackhandler.Frame{Frame: f}, false) // only one call to this function framer.AddActiveStream(id1) framer.AddActiveStream(id1) frames, _ := framer.AppendStreamFrames(nil, 1000) @@ -297,12 +297,12 @@ var _ = Describe("Framer", func() { It("pops maximum size STREAM frames", func() { for i := protocol.MinStreamFrameSize; i < 2000; i++ { streamGetter.EXPECT().GetOrOpenSendStream(id1).Return(stream1, nil) - stream1.EXPECT().popStreamFrame(gomock.Any()).DoAndReturn(func(size protocol.ByteCount) (*ackhandler.Frame, bool) { + stream1.EXPECT().popStreamFrame(gomock.Any(), protocol.Version1).DoAndReturn(func(size protocol.ByteCount, v protocol.VersionNumber) (*ackhandler.Frame, bool) { f := &wire.StreamFrame{ StreamID: id1, DataLenPresent: true, } - f.Data = make([]byte, f.MaxDataLen(size, version)) + f.Data = make([]byte, f.MaxDataLen(size, v)) Expect(f.Length(version)).To(Equal(size)) return &ackhandler.Frame{Frame: f}, false }) @@ -319,20 +319,20 @@ var _ = Describe("Framer", func() { for i := 2 * protocol.MinStreamFrameSize; i < 2000; i++ { streamGetter.EXPECT().GetOrOpenSendStream(id1).Return(stream1, nil) streamGetter.EXPECT().GetOrOpenSendStream(id2).Return(stream2, nil) - stream1.EXPECT().popStreamFrame(gomock.Any()).DoAndReturn(func(size protocol.ByteCount) (*ackhandler.Frame, bool) { + stream1.EXPECT().popStreamFrame(gomock.Any(), protocol.Version1).DoAndReturn(func(size protocol.ByteCount, v protocol.VersionNumber) (*ackhandler.Frame, bool) { f := &wire.StreamFrame{ StreamID: id2, DataLenPresent: true, } - f.Data = make([]byte, f.MaxDataLen(protocol.MinStreamFrameSize, version)) + f.Data = make([]byte, f.MaxDataLen(protocol.MinStreamFrameSize, v)) return &ackhandler.Frame{Frame: f}, false }) - stream2.EXPECT().popStreamFrame(gomock.Any()).DoAndReturn(func(size protocol.ByteCount) (*ackhandler.Frame, bool) { + stream2.EXPECT().popStreamFrame(gomock.Any(), protocol.Version1).DoAndReturn(func(size protocol.ByteCount, v protocol.VersionNumber) (*ackhandler.Frame, bool) { f := &wire.StreamFrame{ StreamID: id2, DataLenPresent: true, } - f.Data = make([]byte, f.MaxDataLen(size, version)) + f.Data = make([]byte, f.MaxDataLen(size, v)) Expect(f.Length(version)).To(Equal(size)) return &ackhandler.Frame{Frame: f}, false }) @@ -351,7 +351,7 @@ var _ = Describe("Framer", func() { It("pops frames that when asked for the the minimum STREAM frame size", func() { streamGetter.EXPECT().GetOrOpenSendStream(id1).Return(stream1, nil) f := &wire.StreamFrame{Data: []byte("foobar")} - stream1.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f}, false) + stream1.EXPECT().popStreamFrame(gomock.Any(), protocol.Version1).Return(&ackhandler.Frame{Frame: f}, false) framer.AddActiveStream(id1) framer.AppendStreamFrames(nil, protocol.MinStreamFrameSize) }) @@ -369,7 +369,7 @@ var _ = Describe("Framer", func() { Data: bytes.Repeat([]byte("f"), int(500-protocol.MinStreamFrameSize)), DataLenPresent: true, } - stream1.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f}, false) + stream1.EXPECT().popStreamFrame(gomock.Any(), protocol.Version1).Return(&ackhandler.Frame{Frame: f}, false) framer.AddActiveStream(id1) fs, length := framer.AppendStreamFrames(nil, 500) Expect(fs).To(HaveLen(1)) diff --git a/mock_send_stream_internal_test.go b/mock_send_stream_internal_test.go index 764bcbaa..acedbcf4 100644 --- a/mock_send_stream_internal_test.go +++ b/mock_send_stream_internal_test.go @@ -160,18 +160,18 @@ func (mr *MockSendStreamIMockRecorder) hasData() *gomock.Call { } // popStreamFrame mocks base method. -func (m *MockSendStreamI) popStreamFrame(maxBytes protocol.ByteCount) (*ackhandler.Frame, bool) { +func (m *MockSendStreamI) popStreamFrame(maxBytes protocol.ByteCount, v protocol.VersionNumber) (*ackhandler.Frame, bool) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "popStreamFrame", maxBytes) + ret := m.ctrl.Call(m, "popStreamFrame", maxBytes, v) ret0, _ := ret[0].(*ackhandler.Frame) ret1, _ := ret[1].(bool) return ret0, ret1 } // popStreamFrame indicates an expected call of popStreamFrame. -func (mr *MockSendStreamIMockRecorder) popStreamFrame(maxBytes interface{}) *gomock.Call { +func (mr *MockSendStreamIMockRecorder) popStreamFrame(maxBytes, v interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "popStreamFrame", reflect.TypeOf((*MockSendStreamI)(nil).popStreamFrame), maxBytes) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "popStreamFrame", reflect.TypeOf((*MockSendStreamI)(nil).popStreamFrame), maxBytes, v) } // updateSendWindow mocks base method. diff --git a/mock_stream_internal_test.go b/mock_stream_internal_test.go index ff8a2f5f..e71f7fa5 100644 --- a/mock_stream_internal_test.go +++ b/mock_stream_internal_test.go @@ -257,18 +257,18 @@ func (mr *MockStreamIMockRecorder) hasData() *gomock.Call { } // popStreamFrame mocks base method. -func (m *MockStreamI) popStreamFrame(maxBytes protocol.ByteCount) (*ackhandler.Frame, bool) { +func (m *MockStreamI) popStreamFrame(maxBytes protocol.ByteCount, v protocol.VersionNumber) (*ackhandler.Frame, bool) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "popStreamFrame", maxBytes) + ret := m.ctrl.Call(m, "popStreamFrame", maxBytes, v) ret0, _ := ret[0].(*ackhandler.Frame) ret1, _ := ret[1].(bool) return ret0, ret1 } // popStreamFrame indicates an expected call of popStreamFrame. -func (mr *MockStreamIMockRecorder) popStreamFrame(maxBytes interface{}) *gomock.Call { +func (mr *MockStreamIMockRecorder) popStreamFrame(maxBytes, v interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "popStreamFrame", reflect.TypeOf((*MockStreamI)(nil).popStreamFrame), maxBytes) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "popStreamFrame", reflect.TypeOf((*MockStreamI)(nil).popStreamFrame), maxBytes, v) } // updateSendWindow mocks base method. diff --git a/send_stream.go b/send_stream.go index 20e12259..08083d30 100644 --- a/send_stream.go +++ b/send_stream.go @@ -18,7 +18,7 @@ type sendStreamI interface { SendStream handleStopSendingFrame(*wire.StopSendingFrame) hasData() bool - popStreamFrame(maxBytes protocol.ByteCount) (*ackhandler.Frame, bool) + popStreamFrame(maxBytes protocol.ByteCount, v protocol.VersionNumber) (*ackhandler.Frame, bool) closeForShutdown(error) updateSendWindow(protocol.ByteCount) } @@ -54,8 +54,6 @@ type sendStream struct { deadline time.Time flowController flowcontrol.StreamFlowController - - version protocol.VersionNumber } var ( @@ -67,7 +65,6 @@ func newSendStream( streamID protocol.StreamID, sender streamSender, flowController flowcontrol.StreamFlowController, - version protocol.VersionNumber, ) *sendStream { s := &sendStream{ streamID: streamID, @@ -75,7 +72,6 @@ func newSendStream( flowController: flowController, writeChan: make(chan struct{}, 1), writeOnce: make(chan struct{}, 1), // cap: 1, to protect against concurrent use of Write - version: version, } s.ctx, s.ctxCancel = context.WithCancel(context.Background()) return s @@ -204,9 +200,9 @@ func (s *sendStream) canBufferStreamFrame() bool { // popStreamFrame returns the next STREAM frame that is supposed to be sent on this stream // maxBytes is the maximum length this frame (including frame header) will have. -func (s *sendStream) popStreamFrame(maxBytes protocol.ByteCount) (*ackhandler.Frame, bool /* has more data to send */) { +func (s *sendStream) popStreamFrame(maxBytes protocol.ByteCount, v protocol.VersionNumber) (*ackhandler.Frame, bool /* has more data to send */) { s.mutex.Lock() - f, hasMoreData := s.popNewOrRetransmittedStreamFrame(maxBytes) + f, hasMoreData := s.popNewOrRetransmittedStreamFrame(maxBytes, v) if f != nil { s.numOutstandingFrames++ } @@ -222,13 +218,13 @@ func (s *sendStream) popStreamFrame(maxBytes protocol.ByteCount) (*ackhandler.Fr return af, hasMoreData } -func (s *sendStream) popNewOrRetransmittedStreamFrame(maxBytes protocol.ByteCount) (*wire.StreamFrame, bool /* has more data to send */) { +func (s *sendStream) popNewOrRetransmittedStreamFrame(maxBytes protocol.ByteCount, v protocol.VersionNumber) (*wire.StreamFrame, bool /* has more data to send */) { if s.canceledWrite || s.closeForShutdownErr != nil { return nil, false } if len(s.retransmissionQueue) > 0 { - f, hasMoreRetransmissions := s.maybeGetRetransmission(maxBytes) + f, hasMoreRetransmissions := s.maybeGetRetransmission(maxBytes, v) if f != nil || hasMoreRetransmissions { if f == nil { return nil, true @@ -264,7 +260,7 @@ func (s *sendStream) popNewOrRetransmittedStreamFrame(maxBytes protocol.ByteCoun return nil, true } - f, hasMoreData := s.popNewStreamFrame(maxBytes, sendWindow) + f, hasMoreData := s.popNewStreamFrame(maxBytes, sendWindow, v) if dataLen := f.DataLen(); dataLen > 0 { s.writeOffset += f.DataLen() s.flowController.AddBytesSent(f.DataLen()) @@ -276,12 +272,12 @@ func (s *sendStream) popNewOrRetransmittedStreamFrame(maxBytes protocol.ByteCoun return f, hasMoreData } -func (s *sendStream) popNewStreamFrame(maxBytes, sendWindow protocol.ByteCount) (*wire.StreamFrame, bool) { +func (s *sendStream) popNewStreamFrame(maxBytes, sendWindow protocol.ByteCount, v protocol.VersionNumber) (*wire.StreamFrame, bool) { if s.nextFrame != nil { nextFrame := s.nextFrame s.nextFrame = nil - maxDataLen := utils.Min(sendWindow, nextFrame.MaxDataLen(maxBytes, s.version)) + maxDataLen := utils.Min(sendWindow, nextFrame.MaxDataLen(maxBytes, v)) if nextFrame.DataLen() > maxDataLen { s.nextFrame = wire.GetStreamFrame() s.nextFrame.StreamID = s.streamID @@ -303,7 +299,7 @@ func (s *sendStream) popNewStreamFrame(maxBytes, sendWindow protocol.ByteCount) f.DataLenPresent = true f.Data = f.Data[:0] - hasMoreData := s.popNewStreamFrameWithoutBuffer(f, maxBytes, sendWindow) + hasMoreData := s.popNewStreamFrameWithoutBuffer(f, maxBytes, sendWindow, v) if len(f.Data) == 0 && !f.Fin { f.PutBack() return nil, hasMoreData @@ -311,8 +307,8 @@ func (s *sendStream) popNewStreamFrame(maxBytes, sendWindow protocol.ByteCount) return f, hasMoreData } -func (s *sendStream) popNewStreamFrameWithoutBuffer(f *wire.StreamFrame, maxBytes, sendWindow protocol.ByteCount) bool { - maxDataLen := f.MaxDataLen(maxBytes, s.version) +func (s *sendStream) popNewStreamFrameWithoutBuffer(f *wire.StreamFrame, maxBytes, sendWindow protocol.ByteCount, v protocol.VersionNumber) bool { + maxDataLen := f.MaxDataLen(maxBytes, v) if maxDataLen == 0 { // a STREAM frame must have at least one byte of data return s.dataForWriting != nil || s.nextFrame != nil || s.finishedWriting } @@ -321,9 +317,9 @@ func (s *sendStream) popNewStreamFrameWithoutBuffer(f *wire.StreamFrame, maxByte return s.dataForWriting != nil || s.nextFrame != nil || s.finishedWriting } -func (s *sendStream) maybeGetRetransmission(maxBytes protocol.ByteCount) (*wire.StreamFrame, bool /* has more retransmissions */) { +func (s *sendStream) maybeGetRetransmission(maxBytes protocol.ByteCount, v protocol.VersionNumber) (*wire.StreamFrame, bool /* has more retransmissions */) { f := s.retransmissionQueue[0] - newFrame, needsSplit := f.MaybeSplitOffFrame(maxBytes, s.version) + newFrame, needsSplit := f.MaybeSplitOffFrame(maxBytes, v) if needsSplit { return newFrame, true } diff --git a/send_stream_test.go b/send_stream_test.go index 786f5f37..bb1b8fb4 100644 --- a/send_stream_test.go +++ b/send_stream_test.go @@ -32,7 +32,7 @@ var _ = Describe("Send Stream", func() { BeforeEach(func() { mockSender = NewMockStreamSender(mockCtrl) mockFC = mocks.NewMockStreamFlowController(mockCtrl) - str = newSendStream(streamID, mockSender, mockFC, protocol.VersionWhatever) + str = newSendStream(streamID, mockSender, mockFC) timeout := scaleDuration(250 * time.Millisecond) strWithTimeout = gbytes.TimeoutWriter(str, timeout) @@ -85,7 +85,7 @@ var _ = Describe("Send Stream", func() { waitForWrite() mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount) mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6)) - frame, _ := str.popStreamFrame(protocol.MaxByteCount) + frame, _ := str.popStreamFrame(protocol.MaxByteCount, protocol.Version1) f := frame.Frame.(*wire.StreamFrame) Expect(f.Data).To(Equal([]byte("foobar"))) Expect(f.Fin).To(BeFalse()) @@ -109,19 +109,19 @@ var _ = Describe("Send Stream", func() { waitForWrite() mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount).Times(2) mockFC.EXPECT().AddBytesSent(protocol.ByteCount(3)).Times(2) - frame, _ := str.popStreamFrame(expectedFrameHeaderLen(0) + 3) + frame, _ := str.popStreamFrame(expectedFrameHeaderLen(0)+3, protocol.Version1) f := frame.Frame.(*wire.StreamFrame) Expect(f.Offset).To(BeZero()) Expect(f.Fin).To(BeFalse()) Expect(f.Data).To(Equal([]byte("foo"))) Expect(f.DataLenPresent).To(BeTrue()) - frame, _ = str.popStreamFrame(protocol.MaxByteCount) + frame, _ = str.popStreamFrame(protocol.MaxByteCount, protocol.Version1) f = frame.Frame.(*wire.StreamFrame) Expect(f.Data).To(Equal([]byte("bar"))) Expect(f.Fin).To(BeFalse()) Expect(f.Offset).To(Equal(protocol.ByteCount(3))) Expect(f.DataLenPresent).To(BeTrue()) - Expect(str.popStreamFrame(1000)).To(BeNil()) + Expect(str.popStreamFrame(1000, protocol.Version1)).To(BeNil()) Eventually(done).Should(BeClosed()) }) @@ -141,7 +141,7 @@ var _ = Describe("Send Stream", func() { Eventually(done).Should(BeClosed()) // both Write calls returned without any data having been dequeued yet mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount) mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6)) - frame, _ := str.popStreamFrame(protocol.MaxByteCount) + frame, _ := str.popStreamFrame(protocol.MaxByteCount, protocol.Version1) f := frame.Frame.(*wire.StreamFrame) Expect(f.Offset).To(BeZero()) Expect(f.Fin).To(BeFalse()) @@ -163,7 +163,7 @@ var _ = Describe("Send Stream", func() { }() waitForWrite() for i := 0; i < 5; i++ { - frame, _ := str.popStreamFrame(1100) + frame, _ := str.popStreamFrame(1100, protocol.Version1) f := frame.Frame.(*wire.StreamFrame) Expect(f.Offset).To(BeNumerically("~", 1100*i, 10*i)) Expect(f.Fin).To(BeFalse()) @@ -186,13 +186,13 @@ var _ = Describe("Send Stream", func() { waitForWrite() mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount).Times(2) mockFC.EXPECT().AddBytesSent(protocol.ByteCount(2)) - frame, hasMoreData := str.popStreamFrame(expectedFrameHeaderLen(0) + 2) + frame, hasMoreData := str.popStreamFrame(expectedFrameHeaderLen(0)+2, protocol.Version1) Expect(hasMoreData).To(BeTrue()) f := frame.Frame.(*wire.StreamFrame) Expect(f.DataLen()).To(Equal(protocol.ByteCount(2))) Consistently(done).ShouldNot(BeClosed()) mockFC.EXPECT().AddBytesSent(protocol.ByteCount(1)) - frame, hasMoreData = str.popStreamFrame(expectedFrameHeaderLen(1) + 1) + frame, hasMoreData = str.popStreamFrame(expectedFrameHeaderLen(1)+1, protocol.Version1) Expect(hasMoreData).To(BeTrue()) f = frame.Frame.(*wire.StreamFrame) Expect(f.DataLen()).To(Equal(protocol.ByteCount(1))) @@ -214,13 +214,13 @@ var _ = Describe("Send Stream", func() { waitForWrite() mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount).Times(2) mockFC.EXPECT().AddBytesSent(protocol.ByteCount(2)) - frame, hasMoreData := str.popStreamFrame(expectedFrameHeaderLen(0) + 2) + frame, hasMoreData := str.popStreamFrame(expectedFrameHeaderLen(0)+2, protocol.Version1) Expect(hasMoreData).To(BeTrue()) f := frame.Frame.(*wire.StreamFrame) Expect(f.Data).To(Equal([]byte("fo"))) Consistently(done).ShouldNot(BeClosed()) mockFC.EXPECT().AddBytesSent(protocol.ByteCount(4)) - frame, hasMoreData = str.popStreamFrame(expectedFrameHeaderLen(2) + 4) + frame, hasMoreData = str.popStreamFrame(expectedFrameHeaderLen(2)+4, protocol.Version1) Expect(hasMoreData).To(BeTrue()) f = frame.Frame.(*wire.StreamFrame) Expect(f.Data).To(Equal([]byte("obar"))) @@ -228,7 +228,7 @@ var _ = Describe("Send Stream", func() { }) It("popStreamFrame returns nil if no data is available", func() { - frame, hasMoreData := str.popStreamFrame(1000) + frame, hasMoreData := str.popStreamFrame(1000, protocol.Version1) Expect(frame).To(BeNil()) Expect(hasMoreData).To(BeFalse()) }) @@ -246,15 +246,15 @@ var _ = Describe("Send Stream", func() { waitForWrite() mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount).Times(2) mockFC.EXPECT().AddBytesSent(gomock.Any()).Times(2) - frame, hasMoreData := str.popStreamFrame(50) + frame, hasMoreData := str.popStreamFrame(50, protocol.Version1) Expect(frame).ToNot(BeNil()) Expect(frame.Frame.(*wire.StreamFrame).Fin).To(BeFalse()) Expect(hasMoreData).To(BeTrue()) - frame, hasMoreData = str.popStreamFrame(protocol.MaxByteCount) + frame, hasMoreData = str.popStreamFrame(protocol.MaxByteCount, protocol.Version1) Expect(frame).ToNot(BeNil()) Expect(frame.Frame.(*wire.StreamFrame).Fin).To(BeFalse()) Expect(hasMoreData).To(BeFalse()) - frame, _ = str.popStreamFrame(protocol.MaxByteCount) + frame, _ = str.popStreamFrame(protocol.MaxByteCount, protocol.Version1) Expect(frame).To(BeNil()) Eventually(done).Should(BeClosed()) }) @@ -275,10 +275,10 @@ var _ = Describe("Send Stream", func() { Expect(n).To(Equal(3)) }() waitForWrite() - frame, _ := str.popStreamFrame(frameHeaderSize + 1) + frame, _ := str.popStreamFrame(frameHeaderSize+1, protocol.Version1) f := frame.Frame.(*wire.StreamFrame) Expect(f.Data).To(Equal([]byte("f"))) - frame, _ = str.popStreamFrame(100) + frame, _ = str.popStreamFrame(100, protocol.Version1) Expect(frame).ToNot(BeNil()) f = frame.Frame.(*wire.StreamFrame) Expect(f.Data).To(Equal([]byte("oo"))) @@ -323,7 +323,7 @@ var _ = Describe("Send Stream", func() { Expect(err).ToNot(HaveOccurred()) }() waitForWrite() - f, hasMoreData := str.popStreamFrame(1000) + f, hasMoreData := str.popStreamFrame(1000, protocol.Version1) Expect(f).To(BeNil()) Expect(hasMoreData).To(BeFalse()) // make the Write go routine return @@ -345,7 +345,7 @@ var _ = Describe("Send Stream", func() { // first pop a STREAM frame of the maximum size allowed by flow control mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(3)) mockFC.EXPECT().AddBytesSent(protocol.ByteCount(3)) - f, hasMoreData := str.popStreamFrame(expectedFrameHeaderLen(0) + 3) + f, hasMoreData := str.popStreamFrame(expectedFrameHeaderLen(0)+3, protocol.Version1) Expect(f).ToNot(BeNil()) Expect(hasMoreData).To(BeTrue()) @@ -357,7 +357,7 @@ var _ = Describe("Send Stream", func() { StreamID: streamID, MaximumStreamData: 10, }) - f, hasMoreData = str.popStreamFrame(1000) + f, hasMoreData = str.popStreamFrame(1000, protocol.Version1) Expect(f).To(BeNil()) Expect(hasMoreData).To(BeFalse()) // make the Write go routine return @@ -416,7 +416,7 @@ var _ = Describe("Send Stream", func() { Expect(time.Now()).To(BeTemporally("~", deadline, scaleDuration(20*time.Millisecond))) }() waitForWrite() - frame, hasMoreData := str.popStreamFrame(50) + frame, hasMoreData := str.popStreamFrame(50, protocol.Version1) Expect(frame).ToNot(BeNil()) Expect(hasMoreData).To(BeTrue()) Eventually(writeReturned, scaleDuration(80*time.Millisecond)).Should(BeClosed()) @@ -437,11 +437,11 @@ var _ = Describe("Send Stream", func() { Expect(err).To(MatchError(errDeadline)) }() waitForWrite() - frame, hasMoreData := str.popStreamFrame(50) + frame, hasMoreData := str.popStreamFrame(50, protocol.Version1) Expect(frame).ToNot(BeNil()) Expect(hasMoreData).To(BeTrue()) Eventually(writeReturned, scaleDuration(80*time.Millisecond)).Should(BeClosed()) - frame, hasMoreData = str.popStreamFrame(50) + frame, hasMoreData = str.popStreamFrame(50, protocol.Version1) Expect(frame).To(BeNil()) Expect(hasMoreData).To(BeFalse()) }) @@ -529,7 +529,7 @@ var _ = Describe("Send Stream", func() { It("allows FIN", func() { mockSender.EXPECT().onHasStreamData(streamID) str.Close() - frame, hasMoreData := str.popStreamFrame(1000) + frame, hasMoreData := str.popStreamFrame(1000, protocol.Version1) Expect(frame).ToNot(BeNil()) f := frame.Frame.(*wire.StreamFrame) Expect(f.Data).To(BeEmpty()) @@ -546,12 +546,12 @@ var _ = Describe("Send Stream", func() { Expect(str.Close()).To(Succeed()) mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount).Times(2) mockFC.EXPECT().AddBytesSent(gomock.Any()).Times(2) - frame, _ := str.popStreamFrame(3 + frameHeaderLen) + frame, _ := str.popStreamFrame(3+frameHeaderLen, protocol.Version1) Expect(frame).ToNot(BeNil()) f := frame.Frame.(*wire.StreamFrame) Expect(f.Data).To(Equal([]byte("foo"))) Expect(f.Fin).To(BeFalse()) - frame, _ = str.popStreamFrame(protocol.MaxByteCount) + frame, _ = str.popStreamFrame(protocol.MaxByteCount, protocol.Version1) f = frame.Frame.(*wire.StreamFrame) Expect(f.Data).To(Equal([]byte("bar"))) Expect(f.Fin).To(BeTrue()) @@ -575,7 +575,7 @@ var _ = Describe("Send Stream", func() { if i == 5 { Eventually(done).Should(BeClosed()) } - frame, _ := str.popStreamFrame(1100) + frame, _ := str.popStreamFrame(1100, protocol.Version1) Expect(frame).ToNot(BeNil()) f := frame.Frame.(*wire.StreamFrame) Expect(f.Data).To(Equal(getDataAtOffset(f.Offset, f.DataLen()))) @@ -585,12 +585,12 @@ var _ = Describe("Send Stream", func() { It("doesn't allow FIN after it is closed for shutdown", func() { str.closeForShutdown(errors.New("test")) - f, hasMoreData := str.popStreamFrame(1000) + f, hasMoreData := str.popStreamFrame(1000, protocol.Version1) Expect(f).To(BeNil()) Expect(hasMoreData).To(BeFalse()) Expect(str.Close()).To(Succeed()) - f, hasMoreData = str.popStreamFrame(1000) + f, hasMoreData = str.popStreamFrame(1000, protocol.Version1) Expect(f).To(BeNil()) Expect(hasMoreData).To(BeFalse()) }) @@ -598,12 +598,12 @@ var _ = Describe("Send Stream", func() { It("doesn't allow FIN twice", func() { mockSender.EXPECT().onHasStreamData(streamID) str.Close() - frame, _ := str.popStreamFrame(1000) + frame, _ := str.popStreamFrame(1000, protocol.Version1) Expect(frame).ToNot(BeNil()) f := frame.Frame.(*wire.StreamFrame) Expect(f.Data).To(BeEmpty()) Expect(f.Fin).To(BeTrue()) - frame, hasMoreData := str.popStreamFrame(1000) + frame, hasMoreData := str.popStreamFrame(1000, protocol.Version1) Expect(frame).To(BeNil()) Expect(hasMoreData).To(BeFalse()) }) @@ -631,11 +631,11 @@ var _ = Describe("Send Stream", func() { close(done) }() waitForWrite() - frame, hasMoreData := str.popStreamFrame(50) // get a STREAM frame containing some data, but not all + frame, hasMoreData := str.popStreamFrame(50, protocol.Version1) // get a STREAM frame containing some data, but not all Expect(frame).ToNot(BeNil()) Expect(hasMoreData).To(BeTrue()) str.closeForShutdown(testErr) - frame, hasMoreData = str.popStreamFrame(1000) + frame, hasMoreData = str.popStreamFrame(1000, protocol.Version1) Expect(frame).To(BeNil()) Expect(hasMoreData).To(BeFalse()) Eventually(done).Should(BeClosed()) @@ -710,7 +710,7 @@ var _ = Describe("Send Stream", func() { }() runtime.Gosched() - go str.popStreamFrame(protocol.MaxByteCount) + go str.popStreamFrame(protocol.MaxByteCount, protocol.Version1) go str.CancelWrite(1234) Eventually(errChan).Should(Receive(Not(HaveOccurred()))) }) @@ -730,7 +730,7 @@ var _ = Describe("Send Stream", func() { close(writeReturned) }() waitForWrite() - frame, _ := str.popStreamFrame(50) + frame, _ := str.popStreamFrame(50, protocol.Version1) Expect(frame).ToNot(BeNil()) mockSender.EXPECT().onStreamCompleted(streamID) str.CancelWrite(1234) @@ -750,12 +750,12 @@ var _ = Describe("Send Stream", func() { close(writeReturned) }() waitForWrite() - frame, hasMoreData := str.popStreamFrame(50) + frame, hasMoreData := str.popStreamFrame(50, protocol.Version1) Expect(hasMoreData).To(BeTrue()) Expect(frame).ToNot(BeNil()) mockSender.EXPECT().onStreamCompleted(streamID) str.CancelWrite(1234) - frame, hasMoreData = str.popStreamFrame(10) + frame, hasMoreData = str.popStreamFrame(10, protocol.Version1) Expect(frame).To(BeNil()) Expect(hasMoreData).To(BeFalse()) Eventually(writeReturned).Should(BeClosed()) @@ -774,12 +774,12 @@ var _ = Describe("Send Stream", func() { close(writeReturned) }() waitForWrite() - frame, hasMoreData := str.popStreamFrame(50) + frame, hasMoreData := str.popStreamFrame(50, protocol.Version1) Expect(hasMoreData).To(BeTrue()) Expect(frame).ToNot(BeNil()) mockSender.EXPECT().onStreamCompleted(streamID) str.CancelWrite(1234) - frame, hasMoreData = str.popStreamFrame(10) + frame, hasMoreData = str.popStreamFrame(10, protocol.Version1) Expect(hasMoreData).To(BeFalse()) Expect(frame).To(BeNil()) Eventually(writeReturned).Should(BeClosed()) @@ -797,7 +797,7 @@ var _ = Describe("Send Stream", func() { close(writeReturned) }() waitForWrite() - frame, hasMoreData := str.popStreamFrame(50) + frame, hasMoreData := str.popStreamFrame(50, protocol.Version1) Expect(hasMoreData).To(BeTrue()) Expect(frame).ToNot(BeNil()) mockSender.EXPECT().onStreamCompleted(streamID) @@ -902,7 +902,7 @@ var _ = Describe("Send Stream", func() { } mockSender.EXPECT().onHasStreamData(streamID) str.queueRetransmission(f) - frame, _ := str.popStreamFrame(protocol.MaxByteCount) + frame, _ := str.popStreamFrame(protocol.MaxByteCount, protocol.Version1) Expect(frame).ToNot(BeNil()) f = frame.Frame.(*wire.StreamFrame) Expect(f.Offset).To(Equal(protocol.ByteCount(0x42))) @@ -919,14 +919,14 @@ var _ = Describe("Send Stream", func() { } mockSender.EXPECT().onHasStreamData(streamID) str.queueRetransmission(sf) - frame, hasMoreData := str.popStreamFrame(sf.Length(str.version) - 3) + frame, hasMoreData := str.popStreamFrame(sf.Length(protocol.Version1)-3, protocol.Version1) Expect(frame).ToNot(BeNil()) f := frame.Frame.(*wire.StreamFrame) Expect(hasMoreData).To(BeTrue()) Expect(f.Offset).To(Equal(protocol.ByteCount(0x42))) Expect(f.Data).To(Equal([]byte("foo"))) Expect(f.DataLenPresent).To(BeTrue()) - frame, _ = str.popStreamFrame(protocol.MaxByteCount) + frame, _ = str.popStreamFrame(protocol.MaxByteCount, protocol.Version1) Expect(frame).ToNot(BeNil()) f = frame.Frame.(*wire.StreamFrame) Expect(f.Offset).To(Equal(protocol.ByteCount(0x45))) @@ -943,7 +943,7 @@ var _ = Describe("Send Stream", func() { } mockSender.EXPECT().onHasStreamData(streamID) str.queueRetransmission(f) - frame, hasMoreData := str.popStreamFrame(2) + frame, hasMoreData := str.popStreamFrame(2, protocol.Version1) Expect(hasMoreData).To(BeTrue()) Expect(frame).To(BeNil()) }) @@ -960,7 +960,7 @@ var _ = Describe("Send Stream", func() { close(done) }() waitForWrite() - frame, _ := str.popStreamFrame(protocol.MaxByteCount) + frame, _ := str.popStreamFrame(protocol.MaxByteCount, protocol.Version1) Eventually(done).Should(BeClosed()) Expect(frame).ToNot(BeNil()) Expect(frame.Frame.(*wire.StreamFrame).Data).To(Equal([]byte("foobar"))) @@ -968,7 +968,7 @@ var _ = Describe("Send Stream", func() { // now lose the frame mockSender.EXPECT().onHasStreamData(streamID) frame.OnLost(frame.Frame) - newFrame, _ := str.popStreamFrame(protocol.MaxByteCount) + newFrame, _ := str.popStreamFrame(protocol.MaxByteCount, protocol.Version1) Expect(newFrame).ToNot(BeNil()) Expect(newFrame.Frame.(*wire.StreamFrame).Data).To(Equal([]byte("foobar"))) }) @@ -985,7 +985,7 @@ var _ = Describe("Send Stream", func() { close(done) }() waitForWrite() - f, _ := str.popStreamFrame(100) + f, _ := str.popStreamFrame(100, protocol.Version1) Expect(f).ToNot(BeNil()) gomock.InOrder( mockSender.EXPECT().queueControlFrame(gomock.Any()), @@ -1018,7 +1018,7 @@ var _ = Describe("Send Stream", func() { // get a bunch of small frames (max. 20 bytes) var frames []ackhandler.Frame for { - frame, hasMoreData := str.popStreamFrame(20) + frame, hasMoreData := str.popStreamFrame(20, protocol.Version1) if frame == nil { continue } @@ -1038,7 +1038,7 @@ var _ = Describe("Send Stream", func() { // Now close the stream and acknowledge the FIN. mockSender.EXPECT().onHasStreamData(streamID) Expect(str.Close()).To(Succeed()) - frame, _ := str.popStreamFrame(protocol.MaxByteCount) + frame, _ := str.popStreamFrame(protocol.MaxByteCount, protocol.Version1) Expect(frame).ToNot(BeNil()) mockSender.EXPECT().onStreamCompleted(streamID) frame.OnAcked(frame.Frame) @@ -1057,7 +1057,7 @@ var _ = Describe("Send Stream", func() { Eventually(done).Should(BeClosed()) Expect(str.Close()).To(Succeed()) - frame, hasMoreData := str.popStreamFrame(protocol.MaxByteCount) + frame, hasMoreData := str.popStreamFrame(protocol.MaxByteCount, protocol.Version1) Expect(hasMoreData).To(BeFalse()) Expect(frame).ToNot(BeNil()) Expect(frame.Frame.(*wire.StreamFrame).Fin).To(BeTrue()) @@ -1082,7 +1082,7 @@ var _ = Describe("Send Stream", func() { // get a bunch of small frames (max. 20 bytes) var frames []ackhandler.Frame for { - frame, _ := str.popStreamFrame(20) + frame, _ := str.popStreamFrame(20, protocol.Version1) if frame == nil { continue } @@ -1101,7 +1101,7 @@ var _ = Describe("Send Stream", func() { frames[0].OnLost(frames[0].Frame) // get the retransmission and acknowledge it - ret, _ := str.popStreamFrame(protocol.MaxByteCount) + ret, _ := str.popStreamFrame(protocol.MaxByteCount, protocol.Version1) Expect(ret).ToNot(BeNil()) mockSender.EXPECT().onStreamCompleted(streamID) ret.OnAcked(ret.Frame) @@ -1139,7 +1139,7 @@ var _ = Describe("Send Stream", func() { if completed { break } - f, _ := str.popStreamFrame(protocol.ByteCount(mrand.Intn(300) + 100)) + f, _ := str.popStreamFrame(protocol.ByteCount(mrand.Intn(300)+100), protocol.Version1) if f == nil { continue } diff --git a/stream.go b/stream.go index 95bbcb35..09e8ca6d 100644 --- a/stream.go +++ b/stream.go @@ -60,7 +60,7 @@ type streamI interface { // for sending hasData() bool handleStopSendingFrame(*wire.StopSendingFrame) - popStreamFrame(maxBytes protocol.ByteCount) (*ackhandler.Frame, bool) + popStreamFrame(maxBytes protocol.ByteCount, v protocol.VersionNumber) (*ackhandler.Frame, bool) updateSendWindow(protocol.ByteCount) } @@ -102,7 +102,7 @@ func newStream(streamID protocol.StreamID, s.completedMutex.Unlock() }, } - s.sendStream = *newSendStream(streamID, senderForSendStream, flowController, version) + s.sendStream = *newSendStream(streamID, senderForSendStream, flowController) senderForReceiveStream := &uniStreamSender{ streamSender: sender, onStreamCompletedImpl: func() { diff --git a/streams_map.go b/streams_map.go index e9f0c2e1..c938b5ba 100644 --- a/streams_map.go +++ b/streams_map.go @@ -106,7 +106,7 @@ func (m *streamsMap) initMaps() { protocol.StreamTypeUni, func(num protocol.StreamNum) sendStreamI { id := num.StreamID(protocol.StreamTypeUni, m.perspective) - return newSendStream(id, m.sender, m.newFlowController(id), m.version) + return newSendStream(id, m.sender, m.newFlowController(id)) }, m.sender.queueControlFrame, )