return ackhandler.Frames from sendStream.popStreamFrame

This commit is contained in:
Marten Seemann 2019-08-29 18:50:04 +07:00
parent e622207822
commit 0edb3f2b93
7 changed files with 89 additions and 64 deletions

View file

@ -77,7 +77,7 @@ func (f *framerI) AddActiveStream(id protocol.StreamID) {
func (f *framerI) AppendStreamFrames(frames []ackhandler.Frame, maxLen protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount) { func (f *framerI) AppendStreamFrames(frames []ackhandler.Frame, maxLen protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount) {
var length protocol.ByteCount var length protocol.ByteCount
var lastFrame *wire.StreamFrame var lastFrame *ackhandler.Frame
f.mutex.Lock() f.mutex.Lock()
// pop STREAM frames, until less than MinStreamFrameSize bytes are left in the packet // pop STREAM frames, until less than MinStreamFrameSize bytes are left in the packet
numActiveStreams := len(f.streamQueue) numActiveStreams := len(f.streamQueue)
@ -112,7 +112,7 @@ func (f *framerI) AppendStreamFrames(frames []ackhandler.Frame, maxLen protocol.
if frame == nil { if frame == nil {
continue continue
} }
frames = append(frames, ackhandler.Frame{Frame: frame}) frames = append(frames, *frame)
length += frame.Length(f.version) length += frame.Length(f.version)
lastFrame = frame lastFrame = frame
} }
@ -120,7 +120,7 @@ func (f *framerI) AppendStreamFrames(frames []ackhandler.Frame, maxLen protocol.
if lastFrame != nil { if lastFrame != nil {
lastFrameLen := lastFrame.Length(f.version) lastFrameLen := lastFrame.Length(f.version)
// account for the smaller size of the last STREAM frame // account for the smaller size of the last STREAM frame
lastFrame.DataLenPresent = false lastFrame.Frame.(*wire.StreamFrame).DataLenPresent = false
length += lastFrame.Length(f.version) - lastFrameLen length += lastFrame.Length(f.version) - lastFrameLen
} }
return frames, length return frames, length

View file

@ -91,7 +91,7 @@ var _ = Describe("Framer", func() {
Offset: 42, Offset: 42,
DataLenPresent: true, DataLenPresent: true,
} }
stream1.EXPECT().popStreamFrame(gomock.Any()).Return(f, false) stream1.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f}, false)
framer.AddActiveStream(id1) framer.AddActiveStream(id1)
fs, length := framer.AppendStreamFrames(nil, 1000) fs, length := framer.AppendStreamFrames(nil, 1000)
Expect(fs).To(HaveLen(1)) Expect(fs).To(HaveLen(1))
@ -106,7 +106,7 @@ var _ = Describe("Framer", func() {
Data: []byte("foobar"), Data: []byte("foobar"),
DataLenPresent: true, DataLenPresent: true,
} }
stream1.EXPECT().popStreamFrame(gomock.Any()).Return(f, false) stream1.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f}, false)
framer.AddActiveStream(id1) framer.AddActiveStream(id1)
mdf := &wire.MaxDataFrame{ByteOffset: 1337} mdf := &wire.MaxDataFrame{ByteOffset: 1337}
frames := []ackhandler.Frame{{Frame: mdf}} frames := []ackhandler.Frame{{Frame: mdf}}
@ -126,7 +126,7 @@ var _ = Describe("Framer", func() {
Data: []byte("foobar"), Data: []byte("foobar"),
DataLenPresent: true, DataLenPresent: true,
} }
stream2.EXPECT().popStreamFrame(gomock.Any()).Return(f, false) stream2.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f}, false)
framer.AddActiveStream(id1) framer.AddActiveStream(id1)
framer.AddActiveStream(id2) framer.AddActiveStream(id2)
frames, _ := framer.AppendStreamFrames(nil, 1000) frames, _ := framer.AppendStreamFrames(nil, 1000)
@ -143,7 +143,7 @@ var _ = Describe("Framer", func() {
DataLenPresent: true, DataLenPresent: true,
} }
stream1.EXPECT().popStreamFrame(gomock.Any()).Return(nil, false) stream1.EXPECT().popStreamFrame(gomock.Any()).Return(nil, false)
stream2.EXPECT().popStreamFrame(gomock.Any()).Return(f, false) stream2.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f}, false)
framer.AddActiveStream(id1) framer.AddActiveStream(id1)
framer.AddActiveStream(id2) framer.AddActiveStream(id2)
frames, _ := framer.AppendStreamFrames(nil, 1000) frames, _ := framer.AppendStreamFrames(nil, 1000)
@ -155,8 +155,8 @@ var _ = Describe("Framer", func() {
streamGetter.EXPECT().GetOrOpenSendStream(id1).Return(stream1, nil).Times(2) streamGetter.EXPECT().GetOrOpenSendStream(id1).Return(stream1, nil).Times(2)
f1 := &wire.StreamFrame{StreamID: id1, Data: []byte("foobar")} f1 := &wire.StreamFrame{StreamID: id1, Data: []byte("foobar")}
f2 := &wire.StreamFrame{StreamID: id1, Data: []byte("foobaz")} f2 := &wire.StreamFrame{StreamID: id1, Data: []byte("foobaz")}
stream1.EXPECT().popStreamFrame(gomock.Any()).Return(f1, true) stream1.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f1}, true)
stream1.EXPECT().popStreamFrame(gomock.Any()).Return(f2, false) stream1.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f2}, false)
framer.AddActiveStream(id1) // only add it once framer.AddActiveStream(id1) // only add it once
frames, _ := framer.AppendStreamFrames(nil, protocol.MinStreamFrameSize) frames, _ := framer.AppendStreamFrames(nil, protocol.MinStreamFrameSize)
Expect(frames).To(HaveLen(1)) Expect(frames).To(HaveLen(1))
@ -175,9 +175,9 @@ var _ = Describe("Framer", func() {
f11 := &wire.StreamFrame{StreamID: id1, Data: []byte("foobar")} f11 := &wire.StreamFrame{StreamID: id1, Data: []byte("foobar")}
f12 := &wire.StreamFrame{StreamID: id1, Data: []byte("foobaz")} f12 := &wire.StreamFrame{StreamID: id1, Data: []byte("foobaz")}
f2 := &wire.StreamFrame{StreamID: id2, Data: []byte("raboof")} f2 := &wire.StreamFrame{StreamID: id2, Data: []byte("raboof")}
stream1.EXPECT().popStreamFrame(gomock.Any()).Return(f11, true) stream1.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f11}, true)
stream1.EXPECT().popStreamFrame(gomock.Any()).Return(f12, false) stream1.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f12}, false)
stream2.EXPECT().popStreamFrame(gomock.Any()).Return(f2, false) stream2.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f2}, false)
framer.AddActiveStream(id1) // only add it once framer.AddActiveStream(id1) // only add it once
framer.AddActiveStream(id2) framer.AddActiveStream(id2)
// first a frame from stream 1 // first a frame from stream 1
@ -200,8 +200,8 @@ var _ = Describe("Framer", func() {
f1 := &wire.StreamFrame{StreamID: id1, Data: []byte("foobar")} f1 := &wire.StreamFrame{StreamID: id1, Data: []byte("foobar")}
f2 := &wire.StreamFrame{StreamID: id2, Data: []byte("raboof")} f2 := &wire.StreamFrame{StreamID: id2, Data: []byte("raboof")}
// both streams have more data, and will be re-queued // both streams have more data, and will be re-queued
stream1.EXPECT().popStreamFrame(gomock.Any()).Return(f1, true) stream1.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f1}, true)
stream2.EXPECT().popStreamFrame(gomock.Any()).Return(f2, true) stream2.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f2}, true)
framer.AddActiveStream(id1) framer.AddActiveStream(id1)
framer.AddActiveStream(id2) framer.AddActiveStream(id2)
frames, length := framer.AppendStreamFrames(nil, 1000) frames, length := framer.AppendStreamFrames(nil, 1000)
@ -216,8 +216,8 @@ var _ = Describe("Framer", func() {
streamGetter.EXPECT().GetOrOpenSendStream(id2).Return(stream2, nil) streamGetter.EXPECT().GetOrOpenSendStream(id2).Return(stream2, nil)
f1 := &wire.StreamFrame{Data: []byte("foobar")} f1 := &wire.StreamFrame{Data: []byte("foobar")}
f2 := &wire.StreamFrame{Data: []byte("foobaz")} f2 := &wire.StreamFrame{Data: []byte("foobaz")}
stream1.EXPECT().popStreamFrame(gomock.Any()).Return(f1, false) stream1.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f1}, false)
stream2.EXPECT().popStreamFrame(gomock.Any()).Return(f2, false) stream2.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f2}, false)
framer.AddActiveStream(id2) framer.AddActiveStream(id2)
framer.AddActiveStream(id1) framer.AddActiveStream(id1)
frames, _ := framer.AppendStreamFrames(nil, 1000) frames, _ := framer.AppendStreamFrames(nil, 1000)
@ -229,7 +229,7 @@ var _ = Describe("Framer", func() {
It("only asks a stream for data once, even if it was reported active multiple times", func() { It("only asks a stream for data once, even if it was reported active multiple times", func() {
streamGetter.EXPECT().GetOrOpenSendStream(id1).Return(stream1, nil) streamGetter.EXPECT().GetOrOpenSendStream(id1).Return(stream1, nil)
f := &wire.StreamFrame{Data: []byte("foobar")} f := &wire.StreamFrame{Data: []byte("foobar")}
stream1.EXPECT().popStreamFrame(gomock.Any()).Return(f, false) // only one call to this function stream1.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f}, false) // only one call to this function
framer.AddActiveStream(id1) framer.AddActiveStream(id1)
framer.AddActiveStream(id1) framer.AddActiveStream(id1)
frames, _ := framer.AppendStreamFrames(nil, 1000) frames, _ := framer.AppendStreamFrames(nil, 1000)
@ -245,14 +245,14 @@ var _ = Describe("Framer", func() {
It("pops maximum size STREAM frames", func() { It("pops maximum size STREAM frames", func() {
for i := protocol.MinStreamFrameSize; i < 2000; i++ { for i := protocol.MinStreamFrameSize; i < 2000; i++ {
streamGetter.EXPECT().GetOrOpenSendStream(id1).Return(stream1, nil) streamGetter.EXPECT().GetOrOpenSendStream(id1).Return(stream1, nil)
stream1.EXPECT().popStreamFrame(gomock.Any()).DoAndReturn(func(size protocol.ByteCount) (*wire.StreamFrame, bool) { stream1.EXPECT().popStreamFrame(gomock.Any()).DoAndReturn(func(size protocol.ByteCount) (*ackhandler.Frame, bool) {
f := &wire.StreamFrame{ f := &wire.StreamFrame{
StreamID: id1, StreamID: id1,
DataLenPresent: true, DataLenPresent: true,
} }
f.Data = make([]byte, f.MaxDataLen(size, version)) f.Data = make([]byte, f.MaxDataLen(size, version))
Expect(f.Length(version)).To(Equal(size)) Expect(f.Length(version)).To(Equal(size))
return f, false return &ackhandler.Frame{Frame: f}, false
}) })
framer.AddActiveStream(id1) framer.AddActiveStream(id1)
frames, _ := framer.AppendStreamFrames(nil, i) frames, _ := framer.AppendStreamFrames(nil, i)
@ -267,22 +267,22 @@ var _ = Describe("Framer", func() {
for i := 2 * protocol.MinStreamFrameSize; i < 2000; i++ { for i := 2 * protocol.MinStreamFrameSize; i < 2000; i++ {
streamGetter.EXPECT().GetOrOpenSendStream(id1).Return(stream1, nil) streamGetter.EXPECT().GetOrOpenSendStream(id1).Return(stream1, nil)
streamGetter.EXPECT().GetOrOpenSendStream(id2).Return(stream2, nil) streamGetter.EXPECT().GetOrOpenSendStream(id2).Return(stream2, nil)
stream1.EXPECT().popStreamFrame(gomock.Any()).DoAndReturn(func(size protocol.ByteCount) (*wire.StreamFrame, bool) { stream1.EXPECT().popStreamFrame(gomock.Any()).DoAndReturn(func(size protocol.ByteCount) (*ackhandler.Frame, bool) {
f := &wire.StreamFrame{ f := &wire.StreamFrame{
StreamID: id2, StreamID: id2,
DataLenPresent: true, DataLenPresent: true,
} }
f.Data = make([]byte, f.MaxDataLen(protocol.MinStreamFrameSize, version)) f.Data = make([]byte, f.MaxDataLen(protocol.MinStreamFrameSize, version))
return f, false return &ackhandler.Frame{Frame: f}, false
}) })
stream2.EXPECT().popStreamFrame(gomock.Any()).DoAndReturn(func(size protocol.ByteCount) (*wire.StreamFrame, bool) { stream2.EXPECT().popStreamFrame(gomock.Any()).DoAndReturn(func(size protocol.ByteCount) (*ackhandler.Frame, bool) {
f := &wire.StreamFrame{ f := &wire.StreamFrame{
StreamID: id2, StreamID: id2,
DataLenPresent: true, DataLenPresent: true,
} }
f.Data = make([]byte, f.MaxDataLen(size, version)) f.Data = make([]byte, f.MaxDataLen(size, version))
Expect(f.Length(version)).To(Equal(size)) Expect(f.Length(version)).To(Equal(size))
return f, false return &ackhandler.Frame{Frame: f}, false
}) })
framer.AddActiveStream(id1) framer.AddActiveStream(id1)
framer.AddActiveStream(id2) framer.AddActiveStream(id2)
@ -298,7 +298,8 @@ var _ = Describe("Framer", func() {
It("pops frames that when asked for the the minimum STREAM frame size", func() { It("pops frames that when asked for the the minimum STREAM frame size", func() {
streamGetter.EXPECT().GetOrOpenSendStream(id1).Return(stream1, nil) streamGetter.EXPECT().GetOrOpenSendStream(id1).Return(stream1, nil)
stream1.EXPECT().popStreamFrame(gomock.Any()).Return(&wire.StreamFrame{Data: []byte("foobar")}, false) f := &wire.StreamFrame{Data: []byte("foobar")}
stream1.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f}, false)
framer.AddActiveStream(id1) framer.AddActiveStream(id1)
framer.AppendStreamFrames(nil, protocol.MinStreamFrameSize) framer.AppendStreamFrames(nil, protocol.MinStreamFrameSize)
}) })
@ -316,7 +317,7 @@ var _ = Describe("Framer", func() {
Data: bytes.Repeat([]byte("f"), int(500-protocol.MinStreamFrameSize)), Data: bytes.Repeat([]byte("f"), int(500-protocol.MinStreamFrameSize)),
DataLenPresent: true, DataLenPresent: true,
} }
stream1.EXPECT().popStreamFrame(gomock.Any()).Return(f, false) stream1.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f}, false)
framer.AddActiveStream(id1) framer.AddActiveStream(id1)
fs, length := framer.AppendStreamFrames(nil, 500) fs, length := framer.AppendStreamFrames(nil, 500)
Expect(fs).To(HaveLen(1)) Expect(fs).To(HaveLen(1))

View file

@ -10,6 +10,7 @@ import (
time "time" time "time"
gomock "github.com/golang/mock/gomock" gomock "github.com/golang/mock/gomock"
ackhandler "github.com/lucas-clemente/quic-go/internal/ackhandler"
protocol "github.com/lucas-clemente/quic-go/internal/protocol" protocol "github.com/lucas-clemente/quic-go/internal/protocol"
wire "github.com/lucas-clemente/quic-go/internal/wire" wire "github.com/lucas-clemente/quic-go/internal/wire"
) )
@ -171,10 +172,10 @@ func (mr *MockSendStreamIMockRecorder) hasData() *gomock.Call {
} }
// popStreamFrame mocks base method // popStreamFrame mocks base method
func (m *MockSendStreamI) popStreamFrame(arg0 protocol.ByteCount) (*wire.StreamFrame, bool) { func (m *MockSendStreamI) popStreamFrame(arg0 protocol.ByteCount) (*ackhandler.Frame, bool) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "popStreamFrame", arg0) ret := m.ctrl.Call(m, "popStreamFrame", arg0)
ret0, _ := ret[0].(*wire.StreamFrame) ret0, _ := ret[0].(*ackhandler.Frame)
ret1, _ := ret[1].(bool) ret1, _ := ret[1].(bool)
return ret0, ret1 return ret0, ret1
} }

View file

@ -10,6 +10,7 @@ import (
time "time" time "time"
gomock "github.com/golang/mock/gomock" gomock "github.com/golang/mock/gomock"
ackhandler "github.com/lucas-clemente/quic-go/internal/ackhandler"
protocol "github.com/lucas-clemente/quic-go/internal/protocol" protocol "github.com/lucas-clemente/quic-go/internal/protocol"
wire "github.com/lucas-clemente/quic-go/internal/wire" wire "github.com/lucas-clemente/quic-go/internal/wire"
) )
@ -268,10 +269,10 @@ func (mr *MockStreamIMockRecorder) hasData() *gomock.Call {
} }
// popStreamFrame mocks base method // popStreamFrame mocks base method
func (m *MockStreamI) popStreamFrame(arg0 protocol.ByteCount) (*wire.StreamFrame, bool) { func (m *MockStreamI) popStreamFrame(arg0 protocol.ByteCount) (*ackhandler.Frame, bool) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "popStreamFrame", arg0) ret := m.ctrl.Call(m, "popStreamFrame", arg0)
ret0, _ := ret[0].(*wire.StreamFrame) ret0, _ := ret[0].(*ackhandler.Frame)
ret1, _ := ret[1].(bool) ret1, _ := ret[1].(bool)
return ret0, ret1 return ret0, ret1
} }

View file

@ -6,6 +6,8 @@ import (
"sync" "sync"
"time" "time"
"github.com/lucas-clemente/quic-go/internal/ackhandler"
"github.com/lucas-clemente/quic-go/internal/flowcontrol" "github.com/lucas-clemente/quic-go/internal/flowcontrol"
"github.com/lucas-clemente/quic-go/internal/protocol" "github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/utils" "github.com/lucas-clemente/quic-go/internal/utils"
@ -16,7 +18,7 @@ type sendStreamI interface {
SendStream SendStream
handleStopSendingFrame(*wire.StopSendingFrame) handleStopSendingFrame(*wire.StopSendingFrame)
hasData() bool hasData() bool
popStreamFrame(maxBytes protocol.ByteCount) (*wire.StreamFrame, bool) popStreamFrame(maxBytes protocol.ByteCount) (*ackhandler.Frame, bool)
closeForShutdown(error) closeForShutdown(error)
handleMaxStreamDataFrame(*wire.MaxStreamDataFrame) handleMaxStreamDataFrame(*wire.MaxStreamDataFrame)
} }
@ -147,7 +149,7 @@ func (s *sendStream) Write(p []byte) (int, error) {
// popStreamFrame returns the next STREAM frame that is supposed to be sent on this stream // popStreamFrame returns the next STREAM frame that is supposed to be sent on this stream
// maxBytes is the maximum length this frame (including frame header) will have. // maxBytes is the maximum length this frame (including frame header) will have.
func (s *sendStream) popStreamFrame(maxBytes protocol.ByteCount) (*wire.StreamFrame, bool /* has more data to send */) { func (s *sendStream) popStreamFrame(maxBytes protocol.ByteCount) (*ackhandler.Frame, bool /* has more data to send */) {
s.mutex.Lock() s.mutex.Lock()
if len(s.retransmissionQueue) > 0 { if len(s.retransmissionQueue) > 0 {
frame, hasMoreRetransmissions := s.maybeGetRetransmission(maxBytes) frame, hasMoreRetransmissions := s.maybeGetRetransmission(maxBytes)
@ -155,7 +157,10 @@ func (s *sendStream) popStreamFrame(maxBytes protocol.ByteCount) (*wire.StreamFr
s.mutex.Unlock() s.mutex.Unlock()
// We always claim that we have more data to send. // We always claim that we have more data to send.
// This might be incorrect, in which case there'll be a spurious call to popStreamFrame in the future. // This might be incorrect, in which case there'll be a spurious call to popStreamFrame in the future.
return frame, true if frame == nil {
return nil, true
}
return &ackhandler.Frame{Frame: frame}, true
} }
} }
completed, frame, hasMoreData := s.popStreamFrameImpl(maxBytes) completed, frame, hasMoreData := s.popStreamFrameImpl(maxBytes)
@ -164,7 +169,10 @@ func (s *sendStream) popStreamFrame(maxBytes protocol.ByteCount) (*wire.StreamFr
if completed { if completed {
s.sender.onStreamCompleted(s.streamID) s.sender.onStreamCompleted(s.streamID)
} }
return frame, hasMoreData if frame == nil {
return nil, hasMoreData
}
return &ackhandler.Frame{Frame: frame}, hasMoreData
} }
func (s *sendStream) popStreamFrameImpl(maxBytes protocol.ByteCount) (bool /* completed */, *wire.StreamFrame, bool /* has more data to send */) { func (s *sendStream) popStreamFrameImpl(maxBytes protocol.ByteCount) (bool /* completed */, *wire.StreamFrame, bool /* has more data to send */) {
@ -248,10 +256,11 @@ func (s *sendStream) getDataForWriting(maxBytes protocol.ByteCount) ([]byte, boo
return ret, s.finishedWriting && s.dataForWriting == nil && !s.finSent return ret, s.finishedWriting && s.dataForWriting == nil && !s.finSent
} }
func (s *sendStream) queueRetransmission(f *wire.StreamFrame) { func (s *sendStream) queueRetransmission(f wire.Frame) {
f.DataLenPresent = true sf := f.(*wire.StreamFrame)
sf.DataLenPresent = true
s.mutex.Lock() s.mutex.Lock()
s.retransmissionQueue = append(s.retransmissionQueue, f) s.retransmissionQueue = append(s.retransmissionQueue, sf)
s.mutex.Unlock() s.mutex.Unlock()
s.sender.onHasStreamData(s.streamID) s.sender.onHasStreamData(s.streamID)

View file

@ -63,7 +63,8 @@ var _ = Describe("Send Stream", func() {
close(done) close(done)
}() }()
waitForWrite() waitForWrite()
f, _ := str.popStreamFrame(1000) frame, _ := str.popStreamFrame(1000)
f := frame.Frame.(*wire.StreamFrame)
Expect(f.Data).To(Equal([]byte("foobar"))) Expect(f.Data).To(Equal([]byte("foobar")))
Expect(f.FinBit).To(BeFalse()) Expect(f.FinBit).To(BeFalse())
Expect(f.Offset).To(BeZero()) Expect(f.Offset).To(BeZero())
@ -87,12 +88,14 @@ var _ = Describe("Send Stream", func() {
close(done) close(done)
}() }()
waitForWrite() waitForWrite()
f, _ := str.popStreamFrame(3 + frameHeaderLen) frame, _ := str.popStreamFrame(3 + frameHeaderLen)
f := frame.Frame.(*wire.StreamFrame)
Expect(f.Data).To(Equal([]byte("foo"))) Expect(f.Data).To(Equal([]byte("foo")))
Expect(f.FinBit).To(BeFalse()) Expect(f.FinBit).To(BeFalse())
Expect(f.Offset).To(BeZero()) Expect(f.Offset).To(BeZero())
Expect(f.DataLenPresent).To(BeTrue()) Expect(f.DataLenPresent).To(BeTrue())
f, _ = str.popStreamFrame(100) frame, _ = str.popStreamFrame(100)
f = frame.Frame.(*wire.StreamFrame)
Expect(f.Data).To(Equal([]byte("bar"))) Expect(f.Data).To(Equal([]byte("bar")))
Expect(f.FinBit).To(BeFalse()) Expect(f.FinBit).To(BeFalse())
Expect(f.Offset).To(Equal(protocol.ByteCount(3))) Expect(f.Offset).To(Equal(protocol.ByteCount(3)))
@ -148,9 +151,11 @@ var _ = Describe("Send Stream", func() {
}() }()
waitForWrite() waitForWrite()
frame, _ := str.popStreamFrame(frameHeaderSize + 1) frame, _ := str.popStreamFrame(frameHeaderSize + 1)
Expect(frame.Data).To(Equal([]byte("f"))) f := frame.Frame.(*wire.StreamFrame)
f, _ := str.popStreamFrame(100) Expect(f.Data).To(Equal([]byte("f")))
Expect(f).ToNot(BeNil()) frame, _ = str.popStreamFrame(100)
Expect(frame).ToNot(BeNil())
f = frame.Frame.(*wire.StreamFrame)
Expect(f.Data).To(Equal([]byte("oo"))) Expect(f.Data).To(Equal([]byte("oo")))
s[1] = 'e' s[1] = 'e'
Expect(f.Data).To(Equal([]byte("oo"))) Expect(f.Data).To(Equal([]byte("oo")))
@ -292,7 +297,7 @@ var _ = Describe("Send Stream", func() {
Expect(frame).ToNot(BeNil()) Expect(frame).ToNot(BeNil())
Expect(hasMoreData).To(BeTrue()) Expect(hasMoreData).To(BeTrue())
Eventually(writeReturned, scaleDuration(80*time.Millisecond)).Should(BeClosed()) Eventually(writeReturned, scaleDuration(80*time.Millisecond)).Should(BeClosed())
Expect(n).To(BeEquivalentTo(frame.DataLen())) Expect(n).To(BeEquivalentTo(frame.Frame.(*wire.StreamFrame).DataLen()))
}) })
It("doesn't pop any data after the deadline expired", func() { It("doesn't pop any data after the deadline expired", func() {
@ -402,7 +407,8 @@ var _ = Describe("Send Stream", func() {
mockSender.EXPECT().onHasStreamData(streamID) mockSender.EXPECT().onHasStreamData(streamID)
mockSender.EXPECT().onStreamCompleted(streamID) mockSender.EXPECT().onStreamCompleted(streamID)
str.Close() str.Close()
f, hasMoreData := str.popStreamFrame(1000) frame, hasMoreData := str.popStreamFrame(1000)
f := frame.Frame.(*wire.StreamFrame)
Expect(f).ToNot(BeNil()) Expect(f).ToNot(BeNil())
Expect(f.Data).To(BeEmpty()) Expect(f.Data).To(BeEmpty())
Expect(f.FinBit).To(BeTrue()) Expect(f.FinBit).To(BeTrue())
@ -416,12 +422,14 @@ var _ = Describe("Send Stream", func() {
mockFC.EXPECT().AddBytesSent(gomock.Any()).Times(2) mockFC.EXPECT().AddBytesSent(gomock.Any()).Times(2)
str.dataForWriting = []byte("foobar") str.dataForWriting = []byte("foobar")
Expect(str.Close()).To(Succeed()) Expect(str.Close()).To(Succeed())
f, _ := str.popStreamFrame(3 + frameHeaderLen) frame, _ := str.popStreamFrame(3 + frameHeaderLen)
f := frame.Frame.(*wire.StreamFrame)
Expect(f).ToNot(BeNil()) Expect(f).ToNot(BeNil())
Expect(f.Data).To(Equal([]byte("foo"))) Expect(f.Data).To(Equal([]byte("foo")))
Expect(f.FinBit).To(BeFalse()) Expect(f.FinBit).To(BeFalse())
mockSender.EXPECT().onStreamCompleted(streamID) mockSender.EXPECT().onStreamCompleted(streamID)
f, _ = str.popStreamFrame(100) frame, _ = str.popStreamFrame(100)
f = frame.Frame.(*wire.StreamFrame)
Expect(f.Data).To(Equal([]byte("bar"))) Expect(f.Data).To(Equal([]byte("bar")))
Expect(f.FinBit).To(BeTrue()) Expect(f.FinBit).To(BeTrue())
}) })
@ -437,12 +445,13 @@ var _ = Describe("Send Stream", func() {
mockSender.EXPECT().onHasStreamData(streamID) mockSender.EXPECT().onHasStreamData(streamID)
mockSender.EXPECT().onStreamCompleted(streamID) mockSender.EXPECT().onStreamCompleted(streamID)
str.Close() str.Close()
f, _ := str.popStreamFrame(1000) frame, _ := str.popStreamFrame(1000)
f := frame.Frame.(*wire.StreamFrame)
Expect(f).ToNot(BeNil()) Expect(f).ToNot(BeNil())
Expect(f.Data).To(BeEmpty()) Expect(f.Data).To(BeEmpty())
Expect(f.FinBit).To(BeTrue()) Expect(f.FinBit).To(BeTrue())
f, hasMoreData := str.popStreamFrame(1000) frame, hasMoreData := str.popStreamFrame(1000)
Expect(f).To(BeNil()) Expect(frame).To(BeNil())
Expect(hasMoreData).To(BeFalse()) Expect(hasMoreData).To(BeFalse())
}) })
}) })
@ -550,7 +559,7 @@ var _ = Describe("Send Stream", func() {
Expect(frame).ToNot(BeNil()) Expect(frame).ToNot(BeNil())
str.CancelWrite(1234) str.CancelWrite(1234)
Eventually(writeReturned).Should(BeClosed()) Eventually(writeReturned).Should(BeClosed())
Expect(n).To(BeEquivalentTo(frame.DataLen())) Expect(n).To(BeEquivalentTo(frame.Frame.(*wire.StreamFrame).DataLen()))
}) })
It("doesn't pop STREAM frames after being canceled", func() { It("doesn't pop STREAM frames after being canceled", func() {
@ -670,30 +679,33 @@ var _ = Describe("Send Stream", func() {
str.queueRetransmission(f) str.queueRetransmission(f)
frame, _ := str.popStreamFrame(protocol.MaxByteCount) frame, _ := str.popStreamFrame(protocol.MaxByteCount)
Expect(frame).ToNot(BeNil()) Expect(frame).ToNot(BeNil())
Expect(frame.Offset).To(Equal(protocol.ByteCount(0x42))) f = frame.Frame.(*wire.StreamFrame)
Expect(frame.Data).To(Equal([]byte("foobar"))) Expect(f.Offset).To(Equal(protocol.ByteCount(0x42)))
Expect(frame.DataLenPresent).To(BeTrue()) Expect(f.Data).To(Equal([]byte("foobar")))
Expect(f.DataLenPresent).To(BeTrue())
}) })
It("splits a retransmission", func() { It("splits a retransmission", func() {
f := &wire.StreamFrame{ sf := &wire.StreamFrame{
Data: []byte("foobar"), Data: []byte("foobar"),
Offset: 0x42, Offset: 0x42,
DataLenPresent: false, DataLenPresent: false,
} }
mockSender.EXPECT().onHasStreamData(streamID) mockSender.EXPECT().onHasStreamData(streamID)
str.queueRetransmission(f) str.queueRetransmission(sf)
frame, hasMoreData := str.popStreamFrame(f.Length(str.version) - 3) frame, hasMoreData := str.popStreamFrame(sf.Length(str.version) - 3)
Expect(hasMoreData).To(BeTrue())
Expect(frame).ToNot(BeNil()) Expect(frame).ToNot(BeNil())
Expect(frame.Offset).To(Equal(protocol.ByteCount(0x42))) f := frame.Frame.(*wire.StreamFrame)
Expect(frame.Data).To(Equal([]byte("foo"))) Expect(hasMoreData).To(BeTrue())
Expect(frame.DataLenPresent).To(BeTrue()) Expect(f.Offset).To(Equal(protocol.ByteCount(0x42)))
Expect(f.Data).To(Equal([]byte("foo")))
Expect(f.DataLenPresent).To(BeTrue())
frame, _ = str.popStreamFrame(protocol.MaxByteCount) frame, _ = str.popStreamFrame(protocol.MaxByteCount)
Expect(frame).ToNot(BeNil()) Expect(frame).ToNot(BeNil())
Expect(frame.Offset).To(Equal(protocol.ByteCount(0x45))) f = frame.Frame.(*wire.StreamFrame)
Expect(frame.Data).To(Equal([]byte("bar"))) Expect(f.Offset).To(Equal(protocol.ByteCount(0x45)))
Expect(frame.DataLenPresent).To(BeTrue()) Expect(f.Data).To(Equal([]byte("bar")))
Expect(f.DataLenPresent).To(BeTrue())
}) })
It("returns nil if the size is too small", func() { It("returns nil if the size is too small", func() {

View file

@ -5,6 +5,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/lucas-clemente/quic-go/internal/ackhandler"
"github.com/lucas-clemente/quic-go/internal/flowcontrol" "github.com/lucas-clemente/quic-go/internal/flowcontrol"
"github.com/lucas-clemente/quic-go/internal/protocol" "github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/wire" "github.com/lucas-clemente/quic-go/internal/wire"
@ -49,7 +50,7 @@ type streamI interface {
// for sending // for sending
hasData() bool hasData() bool
handleStopSendingFrame(*wire.StopSendingFrame) handleStopSendingFrame(*wire.StopSendingFrame)
popStreamFrame(maxBytes protocol.ByteCount) (*wire.StreamFrame, bool) popStreamFrame(maxBytes protocol.ByteCount) (*ackhandler.Frame, bool)
handleMaxStreamDataFrame(*wire.MaxStreamDataFrame) handleMaxStreamDataFrame(*wire.MaxStreamDataFrame)
} }