store stream data, not STREAM frames, in the streamFrameSorter

This commit is contained in:
Marten Seemann 2018-08-26 08:59:48 +07:00
parent ac59e284dd
commit dbada7ad02
5 changed files with 150 additions and 115 deletions

View file

@ -37,5 +37,5 @@ func newCryptoStream(sender streamSender, flowController flowcontrol.StreamFlowC
// It must not be called concurrently with any other stream methods, especially Read and Write. // It must not be called concurrently with any other stream methods, especially Read and Write.
func (s *cryptoStreamImpl) setReadOffset(offset protocol.ByteCount) { func (s *cryptoStreamImpl) setReadOffset(offset protocol.ByteCount) {
s.receiveStream.readOffset = offset s.receiveStream.readOffset = offset
s.receiveStream.frameQueue.readPosition = offset s.receiveStream.frameQueue.readPos = offset
} }

View file

@ -21,6 +21,6 @@ var _ = Describe("Crypto Stream", func() {
It("sets the read offset", func() { It("sets the read offset", func() {
str.setReadOffset(0x42) str.setReadOffset(0x42)
Expect(str.receiveStream.readOffset).To(Equal(protocol.ByteCount(0x42))) Expect(str.receiveStream.readOffset).To(Equal(protocol.ByteCount(0x42)))
Expect(str.receiveStream.frameQueue.readPosition).To(Equal(protocol.ByteCount(0x42))) Expect(str.receiveStream.frameQueue.readPos).To(Equal(protocol.ByteCount(0x42)))
}) })
}) })

View file

@ -30,8 +30,9 @@ type receiveStream struct {
frameQueue *streamFrameSorter frameQueue *streamFrameSorter
readOffset protocol.ByteCount readOffset protocol.ByteCount
currentFrame *wire.StreamFrame currentFrame []byte
readPosInFrame int currentFrameIsLast bool // is the currentFrame the last frame on this stream
readPosInFrame int
closeForShutdownErr error closeForShutdownErr error
cancelReadErr error cancelReadErr error
@ -100,7 +101,7 @@ func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, err
bytesRead := 0 bytesRead := 0
for bytesRead < len(p) { for bytesRead < len(p) {
if s.currentFrame == nil || s.readPosInFrame >= int(s.currentFrame.DataLen()) { if s.currentFrame == nil || s.readPosInFrame >= len(s.currentFrame) {
s.dequeueNextFrame() s.dequeueNextFrame()
} }
if s.currentFrame == nil && bytesRead > 0 { if s.currentFrame == nil && bytesRead > 0 {
@ -124,7 +125,7 @@ func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, err
return false, bytesRead, errDeadline return false, bytesRead, errDeadline
} }
if s.currentFrame != nil { if s.currentFrame != nil || s.currentFrameIsLast {
break break
} }
@ -146,13 +147,13 @@ func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, err
if bytesRead > len(p) { if bytesRead > len(p) {
return false, bytesRead, fmt.Errorf("BUG: bytesRead (%d) > len(p) (%d) in stream.Read", bytesRead, len(p)) return false, bytesRead, fmt.Errorf("BUG: bytesRead (%d) > len(p) (%d) in stream.Read", bytesRead, len(p))
} }
if s.readPosInFrame > int(s.currentFrame.DataLen()) { if s.readPosInFrame > len(s.currentFrame) {
return false, bytesRead, fmt.Errorf("BUG: readPosInFrame (%d) > frame.DataLen (%d) in stream.Read", s.readPosInFrame, s.currentFrame.DataLen()) return false, bytesRead, fmt.Errorf("BUG: readPosInFrame (%d) > frame.DataLen (%d) in stream.Read", s.readPosInFrame, len(s.currentFrame))
} }
s.mutex.Unlock() s.mutex.Unlock()
m := copy(p[bytesRead:], s.currentFrame.Data[s.readPosInFrame:]) m := copy(p[bytesRead:], s.currentFrame[s.readPosInFrame:])
s.readPosInFrame += m s.readPosInFrame += m
bytesRead += m bytesRead += m
s.readOffset += protocol.ByteCount(m) s.readOffset += protocol.ByteCount(m)
@ -165,18 +166,16 @@ func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, err
// increase the flow control window, if necessary // increase the flow control window, if necessary
s.flowController.MaybeQueueWindowUpdate() s.flowController.MaybeQueueWindowUpdate()
if s.readPosInFrame >= int(s.currentFrame.DataLen()) { if s.readPosInFrame >= len(s.currentFrame) && s.currentFrameIsLast {
if s.currentFrame.FinBit { s.finRead = true
s.finRead = true return true, bytesRead, io.EOF
return true, bytesRead, io.EOF
}
} }
} }
return false, bytesRead, nil return false, bytesRead, nil
} }
func (s *receiveStream) dequeueNextFrame() { func (s *receiveStream) dequeueNextFrame() {
s.currentFrame = s.frameQueue.Pop() s.currentFrame, s.currentFrameIsLast = s.frameQueue.Pop()
s.readPosInFrame = 0 s.readPosInFrame = 0
} }

View file

@ -9,9 +9,10 @@ import (
) )
type streamFrameSorter struct { type streamFrameSorter struct {
queuedFrames map[protocol.ByteCount]*wire.StreamFrame queue map[protocol.ByteCount][]byte
readPosition protocol.ByteCount readPos protocol.ByteCount
gaps *utils.ByteIntervalList finalOffset protocol.ByteCount
gaps *utils.ByteIntervalList
} }
var ( var (
@ -21,33 +22,38 @@ var (
func newStreamFrameSorter() *streamFrameSorter { func newStreamFrameSorter() *streamFrameSorter {
s := streamFrameSorter{ s := streamFrameSorter{
gaps: utils.NewByteIntervalList(), gaps: utils.NewByteIntervalList(),
queuedFrames: make(map[protocol.ByteCount]*wire.StreamFrame), queue: make(map[protocol.ByteCount][]byte),
finalOffset: protocol.MaxByteCount,
} }
s.gaps.PushFront(utils.ByteInterval{Start: 0, End: protocol.MaxByteCount}) s.gaps.PushFront(utils.ByteInterval{Start: 0, End: protocol.MaxByteCount})
return &s return &s
} }
func (s *streamFrameSorter) Push(frame *wire.StreamFrame) error { func (s *streamFrameSorter) Push(frame *wire.StreamFrame) error {
if frame.DataLen() == 0 { return s.push(frame.Data, frame.Offset, frame.FinBit)
if frame.FinBit { }
s.queuedFrames[frame.Offset] = frame
} func (s *streamFrameSorter) push(data []byte, offset protocol.ByteCount, fin bool) error {
if fin {
s.finalOffset = offset + protocol.ByteCount(len(data))
}
if len(data) == 0 {
return nil return nil
} }
var wasCut bool var wasCut bool
if oldFrame, ok := s.queuedFrames[frame.Offset]; ok { if oldData, ok := s.queue[offset]; ok {
if frame.DataLen() <= oldFrame.DataLen() { if len(data) <= len(oldData) {
return errDuplicateStreamData return errDuplicateStreamData
} }
frame.Data = frame.Data[oldFrame.DataLen():] data = data[len(oldData):]
frame.Offset += oldFrame.DataLen() offset += protocol.ByteCount(len(oldData))
wasCut = true wasCut = true
} }
start := frame.Offset start := offset
end := frame.Offset + frame.DataLen() end := offset + protocol.ByteCount(len(data))
// skip all gaps that are before this stream frame // skip all gaps that are before this stream frame
var gap *utils.ByteIntervalElement var gap *utils.ByteIntervalElement
@ -67,9 +73,9 @@ func (s *streamFrameSorter) Push(frame *wire.StreamFrame) error {
if start < gap.Value.Start { if start < gap.Value.Start {
add := gap.Value.Start - start add := gap.Value.Start - start
frame.Offset += add offset += add
start += add start += add
frame.Data = frame.Data[add:] data = data[add:]
wasCut = true wasCut = true
} }
@ -87,15 +93,15 @@ func (s *streamFrameSorter) Push(frame *wire.StreamFrame) error {
break break
} }
// delete queued frames completely covered by the current frame // delete queued frames completely covered by the current frame
delete(s.queuedFrames, endGap.Value.End) delete(s.queue, endGap.Value.End)
endGap = nextEndGap endGap = nextEndGap
} }
if end > endGap.Value.End { if end > endGap.Value.End {
cutLen := end - endGap.Value.End cutLen := end - endGap.Value.End
len := frame.DataLen() - cutLen len := protocol.ByteCount(len(data)) - cutLen
end -= cutLen end -= cutLen
frame.Data = frame.Data[:len] data = data[:len]
wasCut = true wasCut = true
} }
@ -132,21 +138,21 @@ func (s *streamFrameSorter) Push(frame *wire.StreamFrame) error {
} }
if wasCut { if wasCut {
data := make([]byte, frame.DataLen()) newData := make([]byte, len(data))
copy(data, frame.Data) copy(newData, data)
frame.Data = data data = newData
} }
s.queuedFrames[frame.Offset] = frame s.queue[offset] = data
return nil return nil
} }
func (s *streamFrameSorter) Pop() *wire.StreamFrame { func (s *streamFrameSorter) Pop() ([]byte /* data */, bool /* fin */) {
frame, ok := s.queuedFrames[s.readPosition] data, ok := s.queue[s.readPos]
if !ok { if !ok {
return nil return nil, s.readPos >= s.finalOffset
} }
s.readPosition += frame.DataLen() delete(s.queue, s.readPos)
delete(s.queuedFrames, frame.Offset) s.readPos += protocol.ByteCount(len(data))
return frame return data, s.readPos >= s.finalOffset
} }

View file

@ -37,7 +37,9 @@ var _ = Describe("STREAM frame sorter", func() {
Data: []byte("foobar"), Data: []byte("foobar"),
} }
Expect(s.Push(f)).To(Succeed()) Expect(s.Push(f)).To(Succeed())
Expect(s.Pop()).To(Equal(f)) data, fin := s.Pop()
Expect(data).To(Equal(f.Data))
Expect(fin).To(BeFalse())
Expect(s.Pop()).To(BeNil()) Expect(s.Pop()).To(BeNil())
}) })
@ -52,8 +54,12 @@ var _ = Describe("STREAM frame sorter", func() {
} }
Expect(s.Push(f1)).To(Succeed()) Expect(s.Push(f1)).To(Succeed())
Expect(s.Push(f2)).To(Succeed()) Expect(s.Push(f2)).To(Succeed())
Expect(s.Pop()).To(Equal(f1)) data, fin := s.Pop()
Expect(s.Pop()).To(Equal(f2)) Expect(data).To(Equal(f1.Data))
Expect(fin).To(BeFalse())
data, fin = s.Pop()
Expect(data).To(Equal(f2.Data))
Expect(fin).To(BeFalse())
Expect(s.Pop()).To(BeNil()) Expect(s.Pop()).To(BeNil())
}) })
@ -63,17 +69,37 @@ var _ = Describe("STREAM frame sorter", func() {
Expect(s.Pop()).To(BeNil()) Expect(s.Pop()).To(BeNil())
}) })
Context("FinBit handling", func() { Context("FIN handling", func() {
It("saves a FinBit frame at offset 0", func() { It("saves a FIN frame at offset 0", func() {
f := &wire.StreamFrame{ f := &wire.StreamFrame{
Offset: 0, Offset: 0,
FinBit: true, FinBit: true,
} }
Expect(s.Push(f)).To(Succeed()) Expect(s.Push(f)).To(Succeed())
Expect(s.Pop()).To(Equal(f)) data, fin := s.Pop()
Expect(data).To(Equal(f.Data))
Expect(fin).To(BeTrue())
data, fin = s.Pop()
Expect(data).To(BeNil())
Expect(fin).To(BeTrue())
}) })
It("sets the FinBit if a stream is closed after receiving some data", func() { It("saves a FIN frame at non-zero offset", func() {
f := &wire.StreamFrame{
Offset: 0,
Data: []byte("foobar"),
FinBit: true,
}
Expect(s.Push(f)).To(Succeed())
data, fin := s.Pop()
Expect(data).To(Equal(f.Data))
Expect(fin).To(BeTrue())
data, fin = s.Pop()
Expect(data).To(BeNil())
Expect(fin).To(BeTrue())
})
It("sets the FIN if a stream is closed after receiving some data", func() {
f1 := &wire.StreamFrame{ f1 := &wire.StreamFrame{
Offset: 0, Offset: 0,
Data: []byte("foobar"), Data: []byte("foobar"),
@ -84,8 +110,12 @@ var _ = Describe("STREAM frame sorter", func() {
FinBit: true, FinBit: true,
} }
Expect(s.Push(f2)).To(Succeed()) Expect(s.Push(f2)).To(Succeed())
Expect(s.Pop()).To(Equal(f1)) data, fin := s.Pop()
Expect(s.Pop()).To(Equal(f2)) Expect(data).To(Equal(f1.Data))
Expect(fin).To(BeTrue())
data, fin = s.Pop()
Expect(data).To(BeNil())
Expect(fin).To(BeTrue())
}) })
}) })
@ -211,7 +241,7 @@ var _ = Describe("STREAM frame sorter", func() {
} }
err = s.Push(f3) err = s.Push(f3)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(s.queuedFrames).To(HaveLen(3)) Expect(s.queue).To(HaveLen(3))
checkGaps([]utils.ByteInterval{ checkGaps([]utils.ByteInterval{
{Start: 15, End: protocol.MaxByteCount}, {Start: 15, End: protocol.MaxByteCount},
}) })
@ -230,7 +260,7 @@ var _ = Describe("STREAM frame sorter", func() {
} }
err = s.Push(f2) err = s.Push(f2)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(s.queuedFrames).To(HaveLen(2)) Expect(s.queue).To(HaveLen(2))
checkGaps([]utils.ByteInterval{ checkGaps([]utils.ByteInterval{
{Start: 0, End: 50}, {Start: 0, End: 50},
{Start: 56, End: 100}, {Start: 56, End: 100},
@ -262,9 +292,9 @@ var _ = Describe("STREAM frame sorter", func() {
} }
err := s.Push(f) err := s.Push(f)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(s.queuedFrames).To(HaveKey(protocol.ByteCount(0))) Expect(s.queue).To(HaveKey(protocol.ByteCount(0)))
Expect(s.queuedFrames[0].Data).To(Equal([]byte("fooba"))) Expect(s.queue[0]).To(Equal([]byte("fooba")))
Expect(s.queuedFrames[0].Data).To(HaveCap(5)) Expect(s.queue[0]).To(HaveCap(5))
checkGaps([]utils.ByteInterval{ checkGaps([]utils.ByteInterval{
{Start: 10, End: 15}, {Start: 10, End: 15},
{Start: 20, End: 25}, {Start: 20, End: 25},
@ -280,9 +310,9 @@ var _ = Describe("STREAM frame sorter", func() {
} }
err := s.Push(f) err := s.Push(f)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(s.queuedFrames).To(HaveKey(protocol.ByteCount(4))) Expect(s.queue).To(HaveKey(protocol.ByteCount(4)))
Expect(s.queuedFrames[4].Data).To(Equal([]byte("f"))) Expect(s.queue[4]).To(Equal([]byte("f")))
Expect(s.queuedFrames[4].Data).To(HaveCap(1)) Expect(s.queue[4]).To(HaveCap(1))
checkGaps([]utils.ByteInterval{ checkGaps([]utils.ByteInterval{
{Start: 0, End: 4}, {Start: 0, End: 4},
{Start: 10, End: 15}, {Start: 10, End: 15},
@ -299,9 +329,9 @@ var _ = Describe("STREAM frame sorter", func() {
} }
err := s.Push(f) err := s.Push(f)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(s.queuedFrames).To(HaveKey(protocol.ByteCount(10))) Expect(s.queue).To(HaveKey(protocol.ByteCount(10)))
Expect(s.queuedFrames[10].Data).To(Equal([]byte("fooba"))) Expect(s.queue[10]).To(Equal([]byte("fooba")))
Expect(s.queuedFrames[10].Data).To(HaveCap(5)) Expect(s.queue[10]).To(HaveCap(5))
checkGaps([]utils.ByteInterval{ checkGaps([]utils.ByteInterval{
{Start: 0, End: 5}, {Start: 0, End: 5},
{Start: 20, End: 25}, {Start: 20, End: 25},
@ -317,10 +347,10 @@ var _ = Describe("STREAM frame sorter", func() {
} }
err := s.Push(f) err := s.Push(f)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(s.queuedFrames).ToNot(HaveKey(protocol.ByteCount(8))) Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(8)))
Expect(s.queuedFrames).To(HaveKey(protocol.ByteCount(10))) Expect(s.queue).To(HaveKey(protocol.ByteCount(10)))
Expect(s.queuedFrames[10].Data).To(Equal([]byte("obar"))) Expect(s.queue[10]).To(Equal([]byte("obar")))
Expect(s.queuedFrames[10].Data).To(HaveCap(4)) Expect(s.queue[10]).To(HaveCap(4))
checkGaps([]utils.ByteInterval{ checkGaps([]utils.ByteInterval{
{Start: 0, End: 5}, {Start: 0, End: 5},
{Start: 14, End: 15}, {Start: 14, End: 15},
@ -337,9 +367,9 @@ var _ = Describe("STREAM frame sorter", func() {
} }
err := s.Push(f) err := s.Push(f)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(s.queuedFrames).ToNot(HaveKey(protocol.ByteCount(5))) Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(5)))
Expect(s.queuedFrames).To(HaveKey(protocol.ByteCount(2))) Expect(s.queue).To(HaveKey(protocol.ByteCount(2)))
Expect(s.queuedFrames[2].Data).To(Equal([]byte("1234567890"))) Expect(s.queue[2]).To(Equal([]byte("1234567890")))
checkGaps([]utils.ByteInterval{ checkGaps([]utils.ByteInterval{
{Start: 0, End: 2}, {Start: 0, End: 2},
{Start: 12, End: 15}, {Start: 12, End: 15},
@ -356,10 +386,10 @@ var _ = Describe("STREAM frame sorter", func() {
} }
err := s.Push(f) err := s.Push(f)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(s.queuedFrames).ToNot(HaveKey(protocol.ByteCount(5))) Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(5)))
Expect(s.queuedFrames).To(HaveKey(protocol.ByteCount(2))) Expect(s.queue).To(HaveKey(protocol.ByteCount(2)))
Expect(s.queuedFrames[2].Data).To(Equal([]byte("1234567890123"))) Expect(s.queue[2]).To(Equal([]byte("1234567890123")))
Expect(s.queuedFrames[2].Data).To(HaveCap(13)) Expect(s.queue[2]).To(HaveCap(13))
checkGaps([]utils.ByteInterval{ checkGaps([]utils.ByteInterval{
{Start: 0, End: 2}, {Start: 0, End: 2},
{Start: 20, End: 25}, {Start: 20, End: 25},
@ -375,9 +405,9 @@ var _ = Describe("STREAM frame sorter", func() {
} }
err := s.Push(f) err := s.Push(f)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(s.queuedFrames).To(HaveKey(protocol.ByteCount(5))) Expect(s.queue).To(HaveKey(protocol.ByteCount(5)))
Expect(s.queuedFrames).ToNot(HaveKey(protocol.ByteCount(15))) Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(15)))
Expect(s.queuedFrames[10].Data).To(Equal([]byte("678901234567"))) Expect(s.queue[10]).To(Equal([]byte("678901234567")))
checkGaps([]utils.ByteInterval{ checkGaps([]utils.ByteInterval{
{Start: 0, End: 5}, {Start: 0, End: 5},
{Start: 22, End: 25}, {Start: 22, End: 25},
@ -393,12 +423,12 @@ var _ = Describe("STREAM frame sorter", func() {
} }
err := s.Push(f) err := s.Push(f)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(s.queuedFrames).ToNot(HaveKey(protocol.ByteCount(5))) Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(5)))
Expect(s.queuedFrames).ToNot(HaveKey(protocol.ByteCount(15))) Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(15)))
Expect(s.queuedFrames).To(HaveKey(protocol.ByteCount(25))) Expect(s.queue).To(HaveKey(protocol.ByteCount(25)))
Expect(s.queuedFrames).To(HaveKey(protocol.ByteCount(2))) Expect(s.queue).To(HaveKey(protocol.ByteCount(2)))
Expect(s.queuedFrames[2].Data).To(Equal(bytes.Repeat([]byte{'e'}, 23))) Expect(s.queue[2]).To(Equal(bytes.Repeat([]byte{'e'}, 23)))
Expect(s.queuedFrames[2].Data).To(HaveCap(23)) Expect(s.queue[2]).To(HaveCap(23))
checkGaps([]utils.ByteInterval{ checkGaps([]utils.ByteInterval{
{Start: 0, End: 2}, {Start: 0, End: 2},
{Start: 30, End: protocol.MaxByteCount}, {Start: 30, End: protocol.MaxByteCount},
@ -413,12 +443,12 @@ var _ = Describe("STREAM frame sorter", func() {
} }
err := s.Push(f) err := s.Push(f)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(s.queuedFrames).To(HaveKey(protocol.ByteCount(5))) Expect(s.queue).To(HaveKey(protocol.ByteCount(5)))
Expect(s.queuedFrames).ToNot(HaveKey(protocol.ByteCount(15))) Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(15)))
Expect(s.queuedFrames).To(HaveKey(protocol.ByteCount(25))) Expect(s.queue).To(HaveKey(protocol.ByteCount(25)))
Expect(s.queuedFrames).To(HaveKey(protocol.ByteCount(10))) Expect(s.queue).To(HaveKey(protocol.ByteCount(10)))
Expect(s.queuedFrames[10].Data).To(Equal(bytes.Repeat([]byte{'d'}, 15))) Expect(s.queue[10]).To(Equal(bytes.Repeat([]byte{'d'}, 15)))
Expect(s.queuedFrames[10].Data).To(HaveCap(15)) Expect(s.queue[10]).To(HaveCap(15))
checkGaps([]utils.ByteInterval{ checkGaps([]utils.ByteInterval{
{Start: 0, End: 5}, {Start: 0, End: 5},
{Start: 30, End: protocol.MaxByteCount}, {Start: 30, End: protocol.MaxByteCount},
@ -433,10 +463,10 @@ var _ = Describe("STREAM frame sorter", func() {
} }
err := s.Push(f) err := s.Push(f)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(s.queuedFrames).To(HaveKey(protocol.ByteCount(1))) Expect(s.queue).To(HaveKey(protocol.ByteCount(1)))
Expect(s.queuedFrames).To(HaveKey(protocol.ByteCount(15))) Expect(s.queue).To(HaveKey(protocol.ByteCount(15)))
Expect(s.queuedFrames).ToNot(HaveKey(protocol.ByteCount(5))) Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(5)))
Expect(s.queuedFrames[1].Data).To(Equal(f.Data)) Expect(s.queue[1]).To(Equal(f.Data))
checkGaps([]utils.ByteInterval{ checkGaps([]utils.ByteInterval{
{Start: 0, End: 1}, {Start: 0, End: 1},
{Start: 20, End: 25}, {Start: 20, End: 25},
@ -452,9 +482,9 @@ var _ = Describe("STREAM frame sorter", func() {
} }
err := s.Push(f) err := s.Push(f)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(s.queuedFrames).To(HaveLen(1)) Expect(s.queue).To(HaveLen(1))
Expect(s.queuedFrames).To(HaveKey(protocol.ByteCount(0))) Expect(s.queue).To(HaveKey(protocol.ByteCount(0)))
Expect(s.queuedFrames[0].Data).To(Equal(f.Data)) Expect(s.queue[0]).To(Equal(f.Data))
checkGaps([]utils.ByteInterval{ checkGaps([]utils.ByteInterval{
{Start: 32, End: protocol.MaxByteCount}, {Start: 32, End: protocol.MaxByteCount},
}) })
@ -468,10 +498,10 @@ var _ = Describe("STREAM frame sorter", func() {
} }
err := s.Push(f) err := s.Push(f)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(s.queuedFrames).ToNot(HaveKey(protocol.ByteCount(8))) Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(8)))
Expect(s.queuedFrames).To(HaveKey(protocol.ByteCount(10))) Expect(s.queue).To(HaveKey(protocol.ByteCount(10)))
Expect(s.queuedFrames[10].Data).To(Equal([]byte("34567"))) Expect(s.queue[10]).To(Equal([]byte("34567")))
Expect(s.queuedFrames[10].Data).To(HaveCap(5)) Expect(s.queue[10]).To(HaveCap(5))
checkGaps([]utils.ByteInterval{ checkGaps([]utils.ByteInterval{
{Start: 0, End: 5}, {Start: 0, End: 5},
{Start: 20, End: 25}, {Start: 20, End: 25},
@ -487,9 +517,9 @@ var _ = Describe("STREAM frame sorter", func() {
} }
err := s.Push(f) err := s.Push(f)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(s.queuedFrames).To(HaveKey(protocol.ByteCount(10))) Expect(s.queue).To(HaveKey(protocol.ByteCount(10)))
Expect(s.queuedFrames[10].Data).To(Equal([]byte("12345"))) Expect(s.queue[10]).To(Equal([]byte("12345")))
Expect(s.queuedFrames[10].Data).To(HaveCap(5)) Expect(s.queue[10]).To(HaveCap(5))
checkGaps([]utils.ByteInterval{ checkGaps([]utils.ByteInterval{
{Start: 0, End: 5}, {Start: 0, End: 5},
{Start: 20, End: 25}, {Start: 20, End: 25},
@ -521,46 +551,46 @@ var _ = Describe("STREAM frame sorter", func() {
It("does not modify data when receiving a duplicate", func() { It("does not modify data when receiving a duplicate", func() {
err := s.Push(&wire.StreamFrame{Offset: 0, Data: []byte("fffff")}) err := s.Push(&wire.StreamFrame{Offset: 0, Data: []byte("fffff")})
Expect(err).To(MatchError(errDuplicateStreamData)) Expect(err).To(MatchError(errDuplicateStreamData))
Expect(s.queuedFrames[0].Data).ToNot(Equal([]byte("fffff"))) Expect(s.queue[0]).ToNot(Equal([]byte("fffff")))
}) })
It("detects a duplicate frame that is smaller than the original, starting at the beginning", func() { It("detects a duplicate frame that is smaller than the original, starting at the beginning", func() {
// 10 to 12 // 10 to 12
err := s.Push(&wire.StreamFrame{Offset: 10, Data: []byte("12")}) err := s.Push(&wire.StreamFrame{Offset: 10, Data: []byte("12")})
Expect(err).To(MatchError(errDuplicateStreamData)) Expect(err).To(MatchError(errDuplicateStreamData))
Expect(s.queuedFrames[10].Data).To(HaveLen(5)) Expect(s.queue[10]).To(HaveLen(5))
}) })
It("detects a duplicate frame that is smaller than the original, somewhere in the middle", func() { It("detects a duplicate frame that is smaller than the original, somewhere in the middle", func() {
// 1 to 4 // 1 to 4
err := s.Push(&wire.StreamFrame{Offset: 1, Data: []byte("123")}) err := s.Push(&wire.StreamFrame{Offset: 1, Data: []byte("123")})
Expect(err).To(MatchError(errDuplicateStreamData)) Expect(err).To(MatchError(errDuplicateStreamData))
Expect(s.queuedFrames[0].Data).To(HaveLen(5)) Expect(s.queue[0]).To(HaveLen(5))
Expect(s.queuedFrames).ToNot(HaveKey(protocol.ByteCount(1))) Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(1)))
}) })
It("detects a duplicate frame that is smaller than the original, somewhere in the middle in the last block", func() { It("detects a duplicate frame that is smaller than the original, somewhere in the middle in the last block", func() {
// 11 to 14 // 11 to 14
err := s.Push(&wire.StreamFrame{Offset: 11, Data: []byte("123")}) err := s.Push(&wire.StreamFrame{Offset: 11, Data: []byte("123")})
Expect(err).To(MatchError(errDuplicateStreamData)) Expect(err).To(MatchError(errDuplicateStreamData))
Expect(s.queuedFrames[10].Data).To(HaveLen(5)) Expect(s.queue[10]).To(HaveLen(5))
Expect(s.queuedFrames).ToNot(HaveKey(protocol.ByteCount(11))) Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(11)))
}) })
It("detects a duplicate frame that is smaller than the original, with aligned end in the last block", func() { It("detects a duplicate frame that is smaller than the original, with aligned end in the last block", func() {
// 11 to 14 // 11 to 14
err := s.Push(&wire.StreamFrame{Offset: 11, Data: []byte("1234")}) err := s.Push(&wire.StreamFrame{Offset: 11, Data: []byte("1234")})
Expect(err).To(MatchError(errDuplicateStreamData)) Expect(err).To(MatchError(errDuplicateStreamData))
Expect(s.queuedFrames[10].Data).To(HaveLen(5)) Expect(s.queue[10]).To(HaveLen(5))
Expect(s.queuedFrames).ToNot(HaveKey(protocol.ByteCount(11))) Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(11)))
}) })
It("detects a duplicate frame that is smaller than the original, with aligned end", func() { It("detects a duplicate frame that is smaller than the original, with aligned end", func() {
// 3 to 5 // 3 to 5
err := s.Push(&wire.StreamFrame{Offset: 3, Data: []byte("12")}) err := s.Push(&wire.StreamFrame{Offset: 3, Data: []byte("12")})
Expect(err).To(MatchError(errDuplicateStreamData)) Expect(err).To(MatchError(errDuplicateStreamData))
Expect(s.queuedFrames[0].Data).To(HaveLen(5)) Expect(s.queue[0]).To(HaveLen(5))
Expect(s.queuedFrames).ToNot(HaveKey(protocol.ByteCount(3))) Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(3)))
}) })
}) })