diff --git a/frame_sorter.go b/frame_sorter.go index f2fc6986..aeafa7d4 100644 --- a/frame_sorter.go +++ b/frame_sorter.go @@ -45,108 +45,105 @@ func (s *frameSorter) push(data []byte, offset protocol.ByteCount, doneCb func() return errDuplicateStreamData } - if oldEntry, ok := s.queue[offset]; ok { - if len(data) <= len(oldEntry.Data) { - return errDuplicateStreamData - } - // The data we currently have is shorter than the new data. - // Replace it. - if oldEntry.DoneCb != nil { - oldEntry.DoneCb() - } - s.queue[offset] = frameSorterEntry{Data: data, DoneCb: doneCb} - } - start := offset end := offset + protocol.ByteCount(len(data)) - // skip all gaps that are before this stream frame - var gap *utils.ByteIntervalElement - for gap = s.gaps.Front(); gap != nil; gap = gap.Next() { - // the frame is a duplicate. Ignore it - if end <= gap.Value.Start { - return errDuplicateStreamData - } - if end > gap.Value.Start && start <= gap.Value.End { - break - } + if end <= s.gaps.Front().Value.Start { + return errDuplicateStreamData } - if gap == nil { - return errors.New("StreamFrameSorter BUG: no gap found") + startGap, startsInGap := s.findStartGap(start) + endGap, endsInGap := s.findEndGap(startGap, end) + + startGapEqualsEndGap := startGap == endGap + + if (startGapEqualsEndGap && end <= startGap.Value.Start) || + (!startGapEqualsEndGap && startGap.Value.End >= endGap.Value.Start && end <= startGap.Value.Start) { + return errDuplicateStreamData } + startGapNext := startGap.Next() + startGapEnd := startGap.Value.End // save it, in case startGap is modified + endGapStart := endGap.Value.Start // save it, in case endGap is modified + endGapEnd := endGap.Value.End // save it, in case endGap is modified + var adjustedStartGapEnd bool var wasCut bool - if start < gap.Value.Start { - add := gap.Value.Start - start - offset += add - start += add - data = data[add:] - wasCut = true - } - // find the highest gaps whose Start lies before the end of the frame - endGap := gap - for end >= endGap.Value.End { - nextEndGap := endGap.Next() - if nextEndGap == nil { - return errors.New("StreamFrameSorter BUG: no end gap found") - } - if endGap != gap { - s.gaps.Remove(endGap) - } - if end < nextEndGap.Value.Start { + pos := start + var hasReplacedAtLeastOne bool + for { + oldEntry, ok := s.queue[pos] + if !ok { break } - // delete queued frames completely covered by the current frame - end := endGap.Value.End - if end != offset { - if cb := s.queue[end].DoneCb; cb != nil { - cb() + oldEntryLen := protocol.ByteCount(len(oldEntry.Data)) + if end-pos > oldEntryLen || (hasReplacedAtLeastOne && end-pos == oldEntryLen) { + // The existing frame is shorter than the new frame. Replace it. + delete(s.queue, pos) + pos += oldEntryLen + hasReplacedAtLeastOne = true + if oldEntry.DoneCb != nil { + oldEntry.DoneCb() } - delete(s.queue, end) + } else { + if !hasReplacedAtLeastOne { + return errDuplicateStreamData + } + // The existing frame is longer than the new frame. + // Cut the new frame such that the end aligns with the start of the existing frame. + data = data[:pos-start] + end = pos + wasCut = true + break } - endGap = nextEndGap } - if end > endGap.Value.End { - cutLen := end - endGap.Value.End - len := protocol.ByteCount(len(data)) - cutLen - end -= cutLen - data = data[:len] + if !startsInGap && !hasReplacedAtLeastOne { + // cut the frame, such that it starts at the start of the gap + data = data[startGap.Value.Start-start:] + start = startGap.Value.Start wasCut = true } + if start <= startGap.Value.Start { + if end >= startGap.Value.End { + // The frame covers the whole startGap. Delete the gap. + s.gaps.Remove(startGap) + } else { + startGap.Value.Start = end + } + } else if !hasReplacedAtLeastOne { + startGap.Value.End = start + adjustedStartGapEnd = true + } - if start == gap.Value.Start { - if end >= gap.Value.End { - // the frame completely fills this gap - // delete the gap + if !startGapEqualsEndGap { + s.deleteConsecutive(startGapEnd) + var nextGap *utils.ByteIntervalElement + for gap := startGapNext; gap.Value.End < endGapStart; gap = nextGap { + nextGap = gap.Next() + s.deleteConsecutive(gap.Value.End) s.gaps.Remove(gap) } - if end < endGap.Value.End { - // the frame covers the beginning of the gap - // adjust the Start value to shrink the gap - endGap.Value.Start = end - } - } else if end == endGap.Value.End { - // the frame covers the end of the gap - // adjust the End value to shrink the gap - gap.Value.End = start - } else { - if gap == endGap { - // the frame lies within the current gap, splitting it into two - // insert a new gap and adjust the current one - intv := utils.ByteInterval{Start: end, End: gap.Value.End} - s.gaps.InsertAfter(intv, gap) - gap.Value.End = start - } else { - gap.Value.End = start - endGap.Value.Start = end - } } - if s.gaps.Len() > protocol.MaxStreamFrameSorterGaps { - return errors.New("too many gaps in received data") + if !endsInGap && start != endGapEnd && end > endGapEnd { + // cut the frame, such that it ends at the end of the gap + data = data[:endGapEnd-start] + end = endGapEnd + wasCut = true + } + if end == endGapEnd { + if !startGapEqualsEndGap { + // The frame covers the whole endGap. Delete the gap. + s.gaps.Remove(endGap) + } + } else { + if startGapEqualsEndGap && adjustedStartGapEnd { + // The frame split the existing gap into two. + s.gaps.InsertAfter(utils.ByteInterval{Start: end, End: startGapEnd}, startGap) + } else if !startGapEqualsEndGap { + endGap.Value.Start = end + } } if wasCut && len(data) < protocol.MinStreamFrameBufferSize { @@ -159,10 +156,54 @@ func (s *frameSorter) push(data []byte, offset protocol.ByteCount, doneCb func() } } - s.queue[offset] = frameSorterEntry{Data: data, DoneCb: doneCb} + if s.gaps.Len() > protocol.MaxStreamFrameSorterGaps { + return errors.New("too many gaps in received data") + } + + s.queue[start] = frameSorterEntry{Data: data, DoneCb: doneCb} return nil } +func (s *frameSorter) findStartGap(offset protocol.ByteCount) (*utils.ByteIntervalElement, bool) { + for gap := s.gaps.Front(); gap != nil; gap = gap.Next() { + if offset >= gap.Value.Start && offset <= gap.Value.End { + return gap, true + } + if offset < gap.Value.Start { + return gap, false + } + } + panic("no gap found") +} + +func (s *frameSorter) findEndGap(startGap *utils.ByteIntervalElement, offset protocol.ByteCount) (*utils.ByteIntervalElement, bool) { + for gap := startGap; gap != nil; gap = gap.Next() { + if offset >= gap.Value.Start && offset < gap.Value.End { + return gap, true + } + if offset < gap.Value.Start { + return gap.Prev(), false + } + } + panic("no gap found") +} + +// deleteConsecutive deletes consecutive frames from the queue, starting at pos +func (s *frameSorter) deleteConsecutive(pos protocol.ByteCount) { + for { + oldEntry, ok := s.queue[pos] + if !ok { + break + } + oldEntryLen := protocol.ByteCount(len(oldEntry.Data)) + delete(s.queue, pos) + if oldEntry.DoneCb != nil { + oldEntry.DoneCb() + } + pos += oldEntryLen + } +} + func (s *frameSorter) Pop() (protocol.ByteCount, []byte, func()) { entry, ok := s.queue[s.readPos] if !ok { @@ -171,6 +212,9 @@ func (s *frameSorter) Pop() (protocol.ByteCount, []byte, func()) { delete(s.queue, s.readPos) offset := s.readPos s.readPos += protocol.ByteCount(len(entry.Data)) + if s.gaps.Front().Value.End <= s.readPos { + panic("frame sorter BUG: read position higher than a gap") + } return offset, entry.Data, entry.DoneCb } diff --git a/frame_sorter_test.go b/frame_sorter_test.go index de20bf2d..ccbf4217 100644 --- a/frame_sorter_test.go +++ b/frame_sorter_test.go @@ -2,6 +2,10 @@ package quic import ( "bytes" + "fmt" + "math" + "math/rand" + "time" "github.com/lucas-clemente/quic-go/internal/protocol" "github.com/lucas-clemente/quic-go/internal/utils" @@ -13,7 +17,13 @@ var _ = Describe("frame sorter", func() { var s *frameSorter checkGaps := func(expectedGaps []utils.ByteInterval) { - ExpectWithOffset(1, s.gaps.Len()).To(Equal(len(expectedGaps))) + if s.gaps.Len() != len(expectedGaps) { + fmt.Println("Gaps:") + for gap := s.gaps.Front(); gap != nil; gap = gap.Next() { + fmt.Printf("\t%d - %d\n", gap.Value.Start, gap.Value.End) + } + ExpectWithOffset(1, s.gaps.Len()).To(Equal(len(expectedGaps))) + } var i int for gap := s.gaps.Front(); gap != nil; gap = gap.Next() { ExpectWithOffset(1, gap.Value).To(Equal(expectedGaps[i])) @@ -21,21 +31,37 @@ var _ = Describe("frame sorter", func() { } } - getCallback := func() (func(), *bool) { - var called bool - return func() { called = true }, &called + type callbackTracker struct { + called *bool + cb func() } - checkCallback := func(cb func(), called *bool) { - ExpectWithOffset(1, cb).ToNot(BeNil()) - ExpectWithOffset(1, *called).To(BeFalse()) - cb() - ExpectWithOffset(1, *called).To(BeTrue()) + getCallback := func() (func(), callbackTracker) { + var called bool + cb := func() { + if called { + panic("double free") + } + called = true + } + return cb, callbackTracker{ + cb: cb, + called: &called, + } + } + + checkCallbackCalled := func(t callbackTracker) { + ExpectWithOffset(1, *t.called).To(BeTrue()) + } + + checkCallbackNotCalled := func(t callbackTracker) { + ExpectWithOffset(1, *t.called).To(BeFalse()) + t.cb() + ExpectWithOffset(1, *t.called).To(BeTrue()) } BeforeEach(func() { s = newFrameSorter() - _ = checkGaps }) It("returns nil when empty", func() { @@ -44,520 +70,1459 @@ var _ = Describe("frame sorter", func() { Expect(doneCb).To(BeNil()) }) - Context("Push", func() { - It("inserts and pops a single frame", func() { - cb, called := getCallback() - Expect(s.Push([]byte("foobar"), 0, cb)).To(Succeed()) - offset, data, doneCb := s.Pop() - Expect(offset).To(BeZero()) - Expect(data).To(Equal([]byte("foobar"))) - checkCallback(doneCb, called) - offset, data, doneCb = s.Pop() - Expect(offset).To(Equal(protocol.ByteCount(6))) - Expect(data).To(BeNil()) - Expect(doneCb).To(BeNil()) + It("inserts and pops a single frame", func() { + cb, t := getCallback() + Expect(s.Push([]byte("foobar"), 0, cb)).To(Succeed()) + offset, data, doneCb := s.Pop() + Expect(offset).To(BeZero()) + Expect(data).To(Equal([]byte("foobar"))) + Expect(doneCb).ToNot(BeNil()) + checkCallbackNotCalled(t) + offset, data, doneCb = s.Pop() + Expect(offset).To(Equal(protocol.ByteCount(6))) + Expect(data).To(BeNil()) + Expect(doneCb).To(BeNil()) + }) + + It("inserts and pops two consecutive frame", func() { + cb1, t1 := getCallback() + cb2, t2 := getCallback() + Expect(s.Push([]byte("bar"), 3, cb2)).To(Succeed()) + Expect(s.Push([]byte("foo"), 0, cb1)).To(Succeed()) + offset, data, doneCb := s.Pop() + Expect(offset).To(BeZero()) + Expect(data).To(Equal([]byte("foo"))) + Expect(doneCb).ToNot(BeNil()) + doneCb() + checkCallbackCalled(t1) + offset, data, doneCb = s.Pop() + Expect(offset).To(Equal(protocol.ByteCount(3))) + Expect(data).To(Equal([]byte("bar"))) + Expect(doneCb).ToNot(BeNil()) + doneCb() + checkCallbackCalled(t2) + offset, data, doneCb = s.Pop() + Expect(offset).To(Equal(protocol.ByteCount(6))) + Expect(data).To(BeNil()) + Expect(doneCb).To(BeNil()) + }) + + It("ignores empty frames", func() { + Expect(s.Push(nil, 0, nil)).To(Succeed()) + _, data, doneCb := s.Pop() + Expect(data).To(BeNil()) + Expect(doneCb).To(BeNil()) + }) + + It("says if has more data", func() { + Expect(s.HasMoreData()).To(BeFalse()) + Expect(s.Push([]byte("foo"), 0, nil)).To(Succeed()) + Expect(s.HasMoreData()).To(BeTrue()) + _, data, _ := s.Pop() + Expect(data).To(Equal([]byte("foo"))) + Expect(s.HasMoreData()).To(BeFalse()) + }) + + Context("Gap handling", func() { + var dataCounter uint8 + + BeforeEach(func() { + dataCounter = 0 }) - It("inserts and pops two consecutive frame", func() { - cb1, called1 := getCallback() - cb2, called2 := getCallback() - Expect(s.Push([]byte("bar"), 3, cb2)).To(Succeed()) - Expect(s.Push([]byte("foo"), 0, cb1)).To(Succeed()) - offset, data, doneCb := s.Pop() - Expect(offset).To(BeZero()) - Expect(data).To(Equal([]byte("foo"))) - checkCallback(doneCb, called1) - offset, data, doneCb = s.Pop() - Expect(offset).To(Equal(protocol.ByteCount(3))) - Expect(data).To(Equal([]byte("bar"))) - checkCallback(doneCb, called2) - offset, data, doneCb = s.Pop() - Expect(offset).To(Equal(protocol.ByteCount(6))) - Expect(data).To(BeNil()) - Expect(doneCb).To(BeNil()) - }) + checkQueue := func(m map[protocol.ByteCount][]byte) { + ExpectWithOffset(1, s.queue).To(HaveLen(len(m))) + for offset, data := range m { + ExpectWithOffset(1, s.queue).To(HaveKey(offset)) + ExpectWithOffset(1, s.queue[offset].Data).To(Equal(data)) + } + } - It("ignores empty frames", func() { - Expect(s.Push(nil, 0, nil)).To(Succeed()) - _, data, doneCb := s.Pop() - Expect(data).To(BeNil()) - Expect(doneCb).To(BeNil()) - }) + getData := func(l protocol.ByteCount) []byte { + dataCounter++ + return bytes.Repeat([]byte{dataCounter}, int(l)) + } - It("says if has more data", func() { - Expect(s.HasMoreData()).To(BeFalse()) - Expect(s.Push([]byte("foo"), 0, nil)).To(Succeed()) - Expect(s.HasMoreData()).To(BeTrue()) - _, data, _ := s.Pop() - Expect(data).To(Equal([]byte("foo"))) - Expect(s.HasMoreData()).To(BeFalse()) - }) - - Context("Gap handling", func() { - It("finds the first gap", func() { - Expect(s.Push([]byte("foobar"), 10, nil)).To(Succeed()) - checkGaps([]utils.ByteInterval{ - {Start: 0, End: 10}, - {Start: 16, End: protocol.MaxByteCount}, - }) + // ---xxx-------------- + // ++++++ + // => + // ---xxx++++++-------- + It("case 1", func() { + f1 := getData(3) + cb1, t1 := getCallback() + f2 := getData(5) + cb2, t2 := getCallback() + Expect(s.Push(f1, 3, cb1)).To(Succeed()) // 3 - 6 + Expect(s.Push(f2, 6, cb2)).To(Succeed()) // 6 - 11 + checkQueue(map[protocol.ByteCount][]byte{ + 3: f1, + 6: f2, }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 3}, + {Start: 11, End: protocol.MaxByteCount}, + }) + checkCallbackNotCalled(t1) + checkCallbackNotCalled(t2) + }) - It("correctly sets the first gap for a frame with offset 0", func() { + // ---xxx----------------- + // +++++++ + // => + // ---xxx---+++++++-------- + It("case 2", func() { + f1 := getData(3) + cb1, t1 := getCallback() + f2 := getData(5) + cb2, t2 := getCallback() + Expect(s.Push(f1, 3, cb1)).To(Succeed()) // 3 - 6 + Expect(s.Push(f2, 10, cb2)).To(Succeed()) // 10 -15 + checkQueue(map[protocol.ByteCount][]byte{ + 3: f1, + 10: f2, + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 3}, + {Start: 6, End: 10}, + {Start: 15, End: protocol.MaxByteCount}, + }) + checkCallbackNotCalled(t1) + checkCallbackNotCalled(t2) + }) + + // ---xxx----xxxxxx------- + // ++++ + // => + // ---xxx++++xxxxx-------- + It("case 3", func() { + f1 := getData(3) + cb1, t1 := getCallback() + f2 := getData(4) + cb2, t2 := getCallback() + f3 := getData(5) + cb3, t3 := getCallback() + Expect(s.Push(f1, 3, cb1)).To(Succeed()) // 3 - 6 + Expect(s.Push(f3, 10, cb2)).To(Succeed()) // 10 - 15 + Expect(s.Push(f2, 6, cb3)).To(Succeed()) // 6 - 10 + checkQueue(map[protocol.ByteCount][]byte{ + 3: f1, + 6: f2, + 10: f3, + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 3}, + {Start: 15, End: protocol.MaxByteCount}, + }) + checkCallbackNotCalled(t1) + checkCallbackNotCalled(t2) + checkCallbackNotCalled(t3) + }) + + // ----xxxx------- + // ++++ + // => + // ----xxxx++----- + It("case 4", func() { + f1 := getData(4) + cb1, t1 := getCallback() + f2 := getData(4) + cb2, t2 := getCallback() + Expect(s.Push(f1, 3, cb1)).To(Succeed()) // 3 - 7 + Expect(s.Push(f2, 5, cb2)).To(Succeed()) // 5 - 9 + checkQueue(map[protocol.ByteCount][]byte{ + 3: f1, + 7: f2[2:], + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 3}, + {Start: 9, End: protocol.MaxByteCount}, + }) + checkCallbackNotCalled(t1) + checkCallbackCalled(t2) + }) + + It("case 4, for long frames", func() { + mult := protocol.ByteCount(math.Ceil(float64(protocol.MinStreamFrameSize) / 2)) + f1 := getData(4 * mult) + cb1, t1 := getCallback() + f2 := getData(4 * mult) + cb2, t2 := getCallback() + Expect(s.Push(f1, 3*mult, cb1)).To(Succeed()) // 3 - 7 + Expect(s.Push(f2, 5*mult, cb2)).To(Succeed()) // 5 - 9 + checkQueue(map[protocol.ByteCount][]byte{ + 3 * mult: f1, + 7 * mult: f2[2*mult:], + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 3 * mult}, + {Start: 9 * mult, End: protocol.MaxByteCount}, + }) + checkCallbackNotCalled(t1) + checkCallbackNotCalled(t2) + }) + + // xxxx------- + // ++++ + // => + // xxxx+++----- + It("case 5", func() { + f1 := getData(4) + cb1, t1 := getCallback() + f2 := getData(4) + cb2, t2 := getCallback() + Expect(s.Push(f1, 0, cb1)).To(Succeed()) // 0 - 4 + Expect(s.Push(f2, 3, cb2)).To(Succeed()) // 3 - 7 + checkQueue(map[protocol.ByteCount][]byte{ + 0: f1, + 4: f2[1:], + }) + checkGaps([]utils.ByteInterval{ + {Start: 7, End: protocol.MaxByteCount}, + }) + checkCallbackNotCalled(t1) + checkCallbackCalled(t2) + }) + + It("case 5, for long frames", func() { + mult := protocol.ByteCount(math.Ceil(float64(protocol.MinStreamFrameSize) / 2)) + f1 := getData(4 * mult) + cb1, t1 := getCallback() + f2 := getData(4 * mult) + cb2, t2 := getCallback() + Expect(s.Push(f1, 0, cb1)).To(Succeed()) // 0 - 4 + Expect(s.Push(f2, 3*mult, cb2)).To(Succeed()) // 3 - 7 + checkQueue(map[protocol.ByteCount][]byte{ + 0: f1, + 4 * mult: f2[mult:], + }) + checkGaps([]utils.ByteInterval{ + {Start: 7 * mult, End: protocol.MaxByteCount}, + }) + checkCallbackNotCalled(t1) + checkCallbackNotCalled(t2) + }) + + // ----xxxx------- + // ++++ + // => + // --++xxxx------- + It("case 6", func() { + f1 := getData(4) + cb1, t1 := getCallback() + f2 := getData(4) + cb2, t2 := getCallback() + Expect(s.Push(f1, 5, cb1)).To(Succeed()) // 5 - 9 + Expect(s.Push(f2, 3, cb2)).To(Succeed()) // 3 - 7 + checkQueue(map[protocol.ByteCount][]byte{ + 3: f2[:2], + 5: f1, + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 3}, + {Start: 9, End: protocol.MaxByteCount}, + }) + checkCallbackNotCalled(t1) + checkCallbackCalled(t2) + }) + + It("case 6, for long frames", func() { + mult := protocol.ByteCount(math.Ceil(float64(protocol.MinStreamFrameSize) / 2)) + f1 := getData(4 * mult) + cb1, t1 := getCallback() + f2 := getData(4 * mult) + cb2, t2 := getCallback() + Expect(s.Push(f1, 5*mult, cb1)).To(Succeed()) // 5 - 9 + Expect(s.Push(f2, 3*mult, cb2)).To(Succeed()) // 3 - 7 + checkQueue(map[protocol.ByteCount][]byte{ + 3 * mult: f2[:2*mult], + 5 * mult: f1, + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 3 * mult}, + {Start: 9 * mult, End: protocol.MaxByteCount}, + }) + checkCallbackNotCalled(t1) + checkCallbackNotCalled(t2) + }) + + // ---xxx----xxxxxx------- + // ++ + // => + // ---xxx++--xxxxx-------- + It("case 7", func() { + f1 := getData(3) + cb1, t1 := getCallback() + f2 := getData(2) + cb2, t2 := getCallback() + f3 := getData(5) + cb3, t3 := getCallback() + Expect(s.Push(f1, 3, cb1)).To(Succeed()) // 3 - 6 + Expect(s.Push(f3, 10, cb2)).To(Succeed()) // 10 - 15 + Expect(s.Push(f2, 6, cb3)).To(Succeed()) // 6 - 8 + checkQueue(map[protocol.ByteCount][]byte{ + 3: f1, + 6: f2, + 10: f3, + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 3}, + {Start: 8, End: 10}, + {Start: 15, End: protocol.MaxByteCount}, + }) + checkCallbackNotCalled(t1) + checkCallbackNotCalled(t2) + checkCallbackNotCalled(t3) + }) + + // ---xxx---------xxxxxx-- + // ++ + // => + // ---xxx---++----xxxxx-- + It("case 8", func() { + f1 := getData(3) + cb1, t1 := getCallback() + f2 := getData(2) + cb2, t2 := getCallback() + f3 := getData(5) + cb3, t3 := getCallback() + Expect(s.Push(f1, 3, cb1)).To(Succeed()) // 3 - 6 + Expect(s.Push(f3, 15, cb2)).To(Succeed()) // 15 - 20 + Expect(s.Push(f2, 10, cb3)).To(Succeed()) // 10 - 12 + checkQueue(map[protocol.ByteCount][]byte{ + 3: f1, + 10: f2, + 15: f3, + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 3}, + {Start: 6, End: 10}, + {Start: 12, End: 15}, + {Start: 20, End: protocol.MaxByteCount}, + }) + checkCallbackNotCalled(t1) + checkCallbackNotCalled(t2) + checkCallbackNotCalled(t3) + }) + + // ---xxx----xxxxxx------- + // ++ + // => + // ---xxx--++xxxxx-------- + It("case 9", func() { + f1 := getData(3) + cb1, t1 := getCallback() + f2 := getData(2) + cb2, t2 := getCallback() + cb3, t3 := getCallback() + f3 := getData(5) + Expect(s.Push(f1, 3, cb1)).To(Succeed()) // 3 - 6 + Expect(s.Push(f3, 10, cb2)).To(Succeed()) // 10 - 15 + Expect(s.Push(f2, 8, cb3)).To(Succeed()) // 8 - 10 + checkQueue(map[protocol.ByteCount][]byte{ + 3: f1, + 8: f2, + 10: f3, + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 3}, + {Start: 6, End: 8}, + {Start: 15, End: protocol.MaxByteCount}, + }) + checkCallbackNotCalled(t1) + checkCallbackNotCalled(t2) + checkCallbackNotCalled(t3) + }) + + // ---xxx----=====------- + // +++++++ + // => + // ---xxx++++=====-------- + It("case 10", func() { + f1 := getData(3) + cb1, t1 := getCallback() + f2 := getData(5) + cb2, t2 := getCallback() + f3 := getData(6) + cb3, t3 := getCallback() + Expect(s.Push(f1, 3, cb1)).To(Succeed()) // 3 - 6 + Expect(s.Push(f2, 10, cb2)).To(Succeed()) // 10 - 15 + Expect(s.Push(f3, 5, cb3)).To(Succeed()) // 5 - 11 + checkQueue(map[protocol.ByteCount][]byte{ + 3: f1, + 6: f3[1:5], + 10: f2, + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 3}, + {Start: 15, End: protocol.MaxByteCount}, + }) + checkCallbackNotCalled(t1) + checkCallbackNotCalled(t2) + checkCallbackCalled(t3) + }) + + It("case 10, for long frames", func() { + mult := protocol.ByteCount(math.Ceil(float64(protocol.MinStreamFrameSize) / 4)) + f1 := getData(3 * mult) + cb1, t1 := getCallback() + f2 := getData(5 * mult) + cb2, t2 := getCallback() + f3 := getData(6 * mult) + cb3, t3 := getCallback() + Expect(s.Push(f1, 3*mult, cb1)).To(Succeed()) // 3 - 6 + Expect(s.Push(f2, 10*mult, cb2)).To(Succeed()) // 10 - 15 + Expect(s.Push(f3, 5*mult, cb3)).To(Succeed()) // 5 - 11 + checkQueue(map[protocol.ByteCount][]byte{ + 3 * mult: f1, + 6 * mult: f3[mult : 5*mult], + 10 * mult: f2, + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 3 * mult}, + {Start: 15 * mult, End: protocol.MaxByteCount}, + }) + checkCallbackNotCalled(t1) + checkCallbackNotCalled(t2) + checkCallbackNotCalled(t3) + }) + + // ---xxxx----=====------- + // ++++++ + // => + // ---xxx++++=====-------- + It("case 11", func() { + f1 := getData(4) + cb1, t1 := getCallback() + f2 := getData(5) + cb2, t2 := getCallback() + f3 := getData(5) + cb3, t3 := getCallback() + Expect(s.Push(f1, 3, cb1)).To(Succeed()) // 3 - 7 + Expect(s.Push(f2, 10, cb2)).To(Succeed()) // 10 - 15 + Expect(s.Push(f3, 5, cb3)).To(Succeed()) // 5 - 10 + checkQueue(map[protocol.ByteCount][]byte{ + 3: f1, + 7: f3[2:], + 10: f2, + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 3}, + {Start: 15, End: protocol.MaxByteCount}, + }) + checkCallbackNotCalled(t1) + checkCallbackNotCalled(t2) + checkCallbackCalled(t3) + }) + + // ---xxxx----=====------- + // ++++++ + // => + // ---xxx++++=====-------- + It("case 11, for long frames", func() { + mult := protocol.ByteCount(math.Ceil(float64(protocol.MinStreamFrameSize) / 3)) + f1 := getData(4 * mult) + cb1, t1 := getCallback() + f2 := getData(5 * mult) + cb2, t2 := getCallback() + f3 := getData(5 * mult) + cb3, t3 := getCallback() + Expect(s.Push(f1, 3*mult, cb1)).To(Succeed()) // 3 - 7 + Expect(s.Push(f2, 10*mult, cb2)).To(Succeed()) // 10 - 15 + Expect(s.Push(f3, 5*mult, cb3)).To(Succeed()) // 5 - 10 + checkQueue(map[protocol.ByteCount][]byte{ + 3 * mult: f1, + 7 * mult: f3[2*mult:], + 10 * mult: f2, + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 3 * mult}, + {Start: 15 * mult, End: protocol.MaxByteCount}, + }) + checkCallbackNotCalled(t1) + checkCallbackNotCalled(t2) + checkCallbackNotCalled(t3) + }) + + // ----xxxx------- + // +++++++ + // => + // ----+++++++----- + It("case 12", func() { + f1 := getData(4) + cb1, t1 := getCallback() + f2 := getData(7) + cb2, t2 := getCallback() + Expect(s.Push(f1, 3, cb1)).To(Succeed()) // 3 - 7 + Expect(s.Push(f2, 3, cb2)).To(Succeed()) // 3 - 10 + checkQueue(map[protocol.ByteCount][]byte{ + 3: f2, + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 3}, + {Start: 10, End: protocol.MaxByteCount}, + }) + checkCallbackCalled(t1) + checkCallbackNotCalled(t2) + }) + + // ----xxx===------- + // +++++++ + // => + // ----+++++++----- + It("case 13", func() { + f1 := getData(3) + cb1, t1 := getCallback() + f2 := getData(3) + cb2, t2 := getCallback() + f3 := getData(7) + cb3, t3 := getCallback() + Expect(s.Push(f1, 3, cb1)).To(Succeed()) // 3 - 6 + Expect(s.Push(f2, 6, cb2)).To(Succeed()) // 6 - 9 + Expect(s.Push(f3, 3, cb3)).To(Succeed()) // 3 - 10 + checkQueue(map[protocol.ByteCount][]byte{ + 3: f3, + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 3}, + {Start: 10, End: protocol.MaxByteCount}, + }) + checkCallbackCalled(t1) + checkCallbackCalled(t2) + checkCallbackNotCalled(t3) + }) + + // ----xxx====------- + // +++++ + // => + // ----+++====----- + It("case 14", func() { + f1 := getData(3) + cb1, t1 := getCallback() + f2 := getData(4) + cb2, t2 := getCallback() + f3 := getData(5) + cb3, t3 := getCallback() + Expect(s.Push(f1, 3, cb1)).To(Succeed()) // 3 - 6 + Expect(s.Push(f2, 6, cb2)).To(Succeed()) // 6 - 10 + Expect(s.Push(f3, 3, cb3)).To(Succeed()) // 3 - 8 + checkQueue(map[protocol.ByteCount][]byte{ + 3: f3[:3], + 6: f2, + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 3}, + {Start: 10, End: protocol.MaxByteCount}, + }) + checkCallbackCalled(t1) + checkCallbackNotCalled(t2) + checkCallbackCalled(t3) + }) + + It("case 14, for long frames", func() { + mult := protocol.ByteCount(math.Ceil(float64(protocol.MinStreamFrameSize) / 3)) + f1 := getData(3 * mult) + cb1, t1 := getCallback() + f2 := getData(4 * mult) + cb2, t2 := getCallback() + f3 := getData(5 * mult) + cb3, t3 := getCallback() + Expect(s.Push(f1, 3*mult, cb1)).To(Succeed()) // 3 - 6 + Expect(s.Push(f2, 6*mult, cb2)).To(Succeed()) // 6 - 10 + Expect(s.Push(f3, 3*mult, cb3)).To(Succeed()) // 3 - 8 + checkQueue(map[protocol.ByteCount][]byte{ + 3 * mult: f3[:3*mult], + 6 * mult: f2, + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 3 * mult}, + {Start: 10 * mult, End: protocol.MaxByteCount}, + }) + checkCallbackCalled(t1) + checkCallbackNotCalled(t2) + checkCallbackNotCalled(t3) + }) + + // ----xxx===------- + // ++++++ + // => + // ----++++++----- + It("case 15", func() { + f1 := getData(3) + cb1, t1 := getCallback() + f2 := getData(3) + cb2, t2 := getCallback() + f3 := getData(6) + cb3, t3 := getCallback() + Expect(s.Push(f1, 3, cb1)).To(Succeed()) // 3 - 6 + Expect(s.Push(f2, 6, cb2)).To(Succeed()) // 6 - 9 + Expect(s.Push(f3, 3, cb3)).To(Succeed()) // 3 - 9 + checkQueue(map[protocol.ByteCount][]byte{ + 3: f3, + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 3}, + {Start: 9, End: protocol.MaxByteCount}, + }) + checkCallbackCalled(t1) + checkCallbackCalled(t2) + checkCallbackNotCalled(t3) + }) + + // ---xxxx------- + // ++++ + // => + // ---xxxx----- + It("case 16", func() { + f1 := getData(4) + cb1, t1 := getCallback() + f2 := getData(4) + cb2, t2 := getCallback() + Expect(s.Push(f1, 5, cb1)).To(Succeed()) // 5 - 9 + Expect(s.Push(f2, 5, cb2)).To(Succeed()) // 5 - 9 + checkQueue(map[protocol.ByteCount][]byte{ + 5: f1, + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 5}, + {Start: 9, End: protocol.MaxByteCount}, + }) + checkCallbackNotCalled(t1) + checkCallbackCalled(t2) + }) + + // ----xxx===------- + // +++ + // => + // ----xxx===----- + It("case 17", func() { + f1 := getData(3) + cb1, t1 := getCallback() + f2 := getData(3) + cb2, t2 := getCallback() + f3 := getData(3) + cb3, t3 := getCallback() + Expect(s.Push(f1, 3, cb1)).To(Succeed()) // 3 - 6 + Expect(s.Push(f2, 6, cb2)).To(Succeed()) // 6 - 9 + Expect(s.Push(f3, 3, cb3)).To(Succeed()) // 3 - 6 + checkQueue(map[protocol.ByteCount][]byte{ + 3: f1, + 6: f2, + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 3}, + {Start: 9, End: protocol.MaxByteCount}, + }) + checkCallbackNotCalled(t1) + checkCallbackNotCalled(t2) + checkCallbackCalled(t3) + }) + + // ---xxxx------- + // ++ + // => + // ---xxxx----- + It("case 18", func() { + f1 := getData(4) + cb1, t1 := getCallback() + f2 := getData(2) + cb2, t2 := getCallback() + Expect(s.Push(f1, 5, cb1)).To(Succeed()) // 5 - 9 + Expect(s.Push(f2, 5, cb2)).To(Succeed()) // 5 - 7 + checkQueue(map[protocol.ByteCount][]byte{ + 5: f1, + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 5}, + {Start: 9, End: protocol.MaxByteCount}, + }) + checkCallbackNotCalled(t1) + checkCallbackCalled(t2) + }) + + // ---xxxxx------ + // ++ + // => + // ---xxxxx---- + It("case 19", func() { + f1 := getData(5) + cb1, t1 := getCallback() + f2 := getData(2) + cb2, t2 := getCallback() + Expect(s.Push(f1, 5, cb1)).To(Succeed()) // 5 - 10 + checkQueue(map[protocol.ByteCount][]byte{ + 5: f1, + }) + Expect(s.Push(f2, 6, cb2)).To(Succeed()) // 6 - 8 + checkQueue(map[protocol.ByteCount][]byte{ + 5: f1, + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 5}, + {Start: 10, End: protocol.MaxByteCount}, + }) + checkCallbackNotCalled(t1) + checkCallbackCalled(t2) + }) + + // xxxxx------ + // ++ + // => + // xxxxx------ + It("case 20", func() { + f1 := getData(10) + cb1, t1 := getCallback() + f2 := getData(4) + cb2, t2 := getCallback() + Expect(s.Push(f1, 0, cb1)).To(Succeed()) // 0 - 10 + Expect(s.Push(f2, 5, cb2)).To(Succeed()) // 5 - 9 + checkQueue(map[protocol.ByteCount][]byte{ + 0: f1, + }) + checkGaps([]utils.ByteInterval{ + {Start: 10, End: protocol.MaxByteCount}, + }) + checkCallbackNotCalled(t1) + checkCallbackCalled(t2) + }) + + // ---xxxxx--- + // +++ + // => + // ---xxxxx--- + It("case 21", func() { + f1 := getData(5) + cb1, t1 := getCallback() + f2 := getData(3) + cb2, t2 := getCallback() + Expect(s.Push(f1, 5, cb1)).To(Succeed()) // 5 - 10 + Expect(s.Push(f2, 7, cb2)).To(Succeed()) // 7 - 10 + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 5}, + {Start: 10, End: protocol.MaxByteCount}, + }) + checkQueue(map[protocol.ByteCount][]byte{ + 5: f1, + }) + checkCallbackNotCalled(t1) + checkCallbackCalled(t2) + }) + + // ----xxx------ + // +++++ + // => + // --+++++---- + It("case 22", func() { + f1 := getData(3) + cb1, t1 := getCallback() + f2 := getData(5) + cb2, t2 := getCallback() + Expect(s.Push(f1, 5, cb1)).To(Succeed()) // 5 - 8 + Expect(s.Push(f2, 3, cb2)).To(Succeed()) // 3 - 8 + checkQueue(map[protocol.ByteCount][]byte{ + 3: f2, + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 3}, + {Start: 8, End: protocol.MaxByteCount}, + }) + checkCallbackCalled(t1) + checkCallbackNotCalled(t2) + }) + + // ----xxx===------ + // ++++++++ + // => + // --++++++++---- + It("case 23", func() { + f1 := getData(3) + cb1, t1 := getCallback() + f2 := getData(3) + cb2, t2 := getCallback() + f3 := getData(8) + cb3, t3 := getCallback() + Expect(s.Push(f1, 5, cb1)).To(Succeed()) // 5 - 8 + Expect(s.Push(f2, 8, cb2)).To(Succeed()) // 8 - 11 + Expect(s.Push(f3, 3, cb3)).To(Succeed()) // 3 - 11 + checkQueue(map[protocol.ByteCount][]byte{ + 3: f3, + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 3}, + {Start: 11, End: protocol.MaxByteCount}, + }) + checkCallbackCalled(t1) + checkCallbackCalled(t2) + checkCallbackNotCalled(t3) + }) + + // --xxx---===--- + // ++++++ + // => + // --xxx++++++---- + It("case 24", func() { + f1 := getData(3) + cb1, t1 := getCallback() + f2 := getData(3) + cb2, t2 := getCallback() + f3 := getData(6) + cb3, t3 := getCallback() + Expect(s.Push(f1, 3, cb1)).To(Succeed()) // 3 - 6 + Expect(s.Push(f2, 9, cb2)).To(Succeed()) // 9 - 12 + Expect(s.Push(f3, 6, cb3)).To(Succeed()) // 6 - 12 + checkQueue(map[protocol.ByteCount][]byte{ + 3: f1, + 6: f3, + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 3}, + {Start: 12, End: protocol.MaxByteCount}, + }) + checkCallbackNotCalled(t1) + checkCallbackCalled(t2) + checkCallbackNotCalled(t3) + }) + + // --xxx---===---### + // +++++++++ + // => + // --xxx+++++++++### + It("case 25", func() { + f1 := getData(3) + cb1, t1 := getCallback() + f2 := getData(3) + cb2, t2 := getCallback() + f3 := getData(3) + cb3, t3 := getCallback() + f4 := getData(9) + cb4, t4 := getCallback() + Expect(s.Push(f1, 3, cb1)).To(Succeed()) // 3 - 6 + Expect(s.Push(f2, 9, cb2)).To(Succeed()) // 9 - 12 + Expect(s.Push(f3, 15, cb3)).To(Succeed()) // 15 - 18 + Expect(s.Push(f4, 6, cb4)).To(Succeed()) // 6 - 15 + checkQueue(map[protocol.ByteCount][]byte{ + 3: f1, + 6: f4, + 15: f3, + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 3}, + {Start: 18, End: protocol.MaxByteCount}, + }) + checkCallbackNotCalled(t1) + checkCallbackCalled(t2) + checkCallbackNotCalled(t3) + checkCallbackNotCalled(t4) + }) + + // ----xxx------ + // +++++++ + // => + // --+++++++--- + It("case 26", func() { + f1 := getData(3) + cb1, t1 := getCallback() + f2 := getData(10) + cb2, t2 := getCallback() + Expect(s.Push(f1, 5, cb1)).To(Succeed()) // 5 - 8 + Expect(s.Push(f2, 3, cb2)).To(Succeed()) // 3 - 13 + checkQueue(map[protocol.ByteCount][]byte{ + 3: f2, + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 3}, + {Start: 13, End: protocol.MaxByteCount}, + }) + checkCallbackCalled(t1) + checkCallbackNotCalled(t2) + }) + + // ---xxx====--- + // ++++ + // => + // --+xxx====--- + It("case 27", func() { + f1 := getData(3) + cb1, t1 := getCallback() + f2 := getData(4) + cb2, t2 := getCallback() + f3 := getData(4) + cb3, t3 := getCallback() + Expect(s.Push(f1, 3, cb1)).To(Succeed()) // 3 - 6 + Expect(s.Push(f2, 6, cb2)).To(Succeed()) // 6 - 10 + Expect(s.Push(f3, 2, cb3)).To(Succeed()) // 2 - 6 + checkQueue(map[protocol.ByteCount][]byte{ + 2: f3[:1], + 3: f1, + 6: f2, + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 2}, + {Start: 10, End: protocol.MaxByteCount}, + }) + checkCallbackNotCalled(t1) + checkCallbackNotCalled(t2) + checkCallbackCalled(t3) + }) + + It("case 27, for long frames", func() { + const mult = protocol.MinStreamFrameSize + f1 := getData(3 * mult) + cb1, t1 := getCallback() + f2 := getData(4 * mult) + cb2, t2 := getCallback() + f3 := getData(4 * mult) + cb3, t3 := getCallback() + Expect(s.Push(f1, 3*mult, cb1)).To(Succeed()) // 3 - 6 + Expect(s.Push(f2, 6*mult, cb2)).To(Succeed()) // 6 - 10 + Expect(s.Push(f3, 2*mult, cb3)).To(Succeed()) // 2 - 6 + checkQueue(map[protocol.ByteCount][]byte{ + 2 * mult: f3[:mult], + 3 * mult: f1, + 6 * mult: f2, + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 2 * mult}, + {Start: 10 * mult, End: protocol.MaxByteCount}, + }) + checkCallbackNotCalled(t1) + checkCallbackNotCalled(t2) + checkCallbackNotCalled(t3) + }) + + // ---xxx====--- + // ++++++ + // => + // --+xxx====--- + It("case 28", func() { + f1 := getData(3) + cb1, t1 := getCallback() + f2 := getData(4) + cb2, t2 := getCallback() + f3 := getData(6) + cb3, t3 := getCallback() + Expect(s.Push(f1, 3, cb1)).To(Succeed()) // 3 - 6 + Expect(s.Push(f2, 6, cb2)).To(Succeed()) // 6 - 10 + Expect(s.Push(f3, 2, cb3)).To(Succeed()) // 2 - 8 + checkQueue(map[protocol.ByteCount][]byte{ + 2: f3[:1], + 3: f1, + 6: f2, + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 2}, + {Start: 10, End: protocol.MaxByteCount}, + }) + checkCallbackNotCalled(t1) + checkCallbackNotCalled(t2) + checkCallbackCalled(t3) + }) + + It("case 28, for long frames", func() { + const mult = protocol.MinStreamFrameSize + f1 := getData(3 * mult) + cb1, t1 := getCallback() + f2 := getData(4 * mult) + cb2, t2 := getCallback() + f3 := getData(6 * mult) + cb3, t3 := getCallback() + Expect(s.Push(f1, 3*mult, cb1)).To(Succeed()) // 3 - 6 + Expect(s.Push(f2, 6*mult, cb2)).To(Succeed()) // 6 - 10 + Expect(s.Push(f3, 2*mult, cb3)).To(Succeed()) // 2 - 8 + checkQueue(map[protocol.ByteCount][]byte{ + 2 * mult: f3[:mult], + 3 * mult: f1, + 6 * mult: f2, + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 2 * mult}, + {Start: 10 * mult, End: protocol.MaxByteCount}, + }) + checkCallbackNotCalled(t1) + checkCallbackNotCalled(t2) + checkCallbackNotCalled(t3) + }) + + // ---xxx===----- + // +++++ + // => + // ---xxx+++++--- + It("case 29", func() { + f1 := getData(3) + cb1, t1 := getCallback() + f2 := getData(3) + cb2, t2 := getCallback() + f3 := getData(5) + cb3, t3 := getCallback() + Expect(s.Push(f1, 3, cb1)).To(Succeed()) // 3 - 6 + Expect(s.Push(f2, 6, cb2)).To(Succeed()) // 6 - 9 + Expect(s.Push(f3, 6, cb3)).To(Succeed()) // 6 - 11 + checkQueue(map[protocol.ByteCount][]byte{ + 3: f1, + 6: f3, + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 3}, + {Start: 11, End: protocol.MaxByteCount}, + }) + checkCallbackNotCalled(t1) + checkCallbackCalled(t2) + checkCallbackNotCalled(t3) + }) + + // ---xxx===---- + // ++++++ + // => + // ---xxx===++-- + It("case 30", func() { + f1 := getData(3) + cb1, t1 := getCallback() + f2 := getData(3) + cb2, t2 := getCallback() + f3 := getData(6) + cb3, t3 := getCallback() + Expect(s.Push(f1, 3, cb1)).To(Succeed()) // 3 - 6 + Expect(s.Push(f2, 6, cb2)).To(Succeed()) // 6 - 9 + Expect(s.Push(f3, 5, cb3)).To(Succeed()) // 5 - 11 + checkQueue(map[protocol.ByteCount][]byte{ + 3: f1, + 6: f2, + 9: f3[4:], + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 3}, + {Start: 11, End: protocol.MaxByteCount}, + }) + checkCallbackNotCalled(t1) + checkCallbackNotCalled(t2) + checkCallbackCalled(t3) + }) + + It("case 30, for long frames", func() { + mult := protocol.ByteCount(math.Ceil(float64(protocol.MinStreamFrameSize) / 2)) + f1 := getData(3 * mult) + cb1, t1 := getCallback() + f2 := getData(3 * mult) + cb2, t2 := getCallback() + f3 := getData(6 * mult) + cb3, t3 := getCallback() + Expect(s.Push(f1, 3*mult, cb1)).To(Succeed()) // 3 - 6 + Expect(s.Push(f2, 6*mult, cb2)).To(Succeed()) // 6 - 9 + Expect(s.Push(f3, 5*mult, cb3)).To(Succeed()) // 5 - 11 + checkQueue(map[protocol.ByteCount][]byte{ + 3 * mult: f1, + 6 * mult: f2, + 9 * mult: f3[4*mult:], + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 3 * mult}, + {Start: 11 * mult, End: protocol.MaxByteCount}, + }) + checkCallbackNotCalled(t1) + checkCallbackNotCalled(t2) + checkCallbackNotCalled(t3) + }) + + // ---xxx---===----- + // ++++++++++ + // => + // ---xxx++++++++--- + It("case 31", func() { + f1 := getData(3) + cb1, t1 := getCallback() + f2 := getData(3) + cb2, t2 := getCallback() + f3 := getData(10) + cb3, t3 := getCallback() + Expect(s.Push(f1, 3, cb1)).To(Succeed()) // 3 - 6 + Expect(s.Push(f2, 9, cb2)).To(Succeed()) // 9 - 12 + Expect(s.Push(f3, 5, cb3)).To(Succeed()) // 5 - 15 + checkQueue(map[protocol.ByteCount][]byte{ + 3: f1, + 6: f3[1:], + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 3}, + {Start: 15, End: protocol.MaxByteCount}, + }) + checkCallbackNotCalled(t1) + checkCallbackCalled(t2) + checkCallbackCalled(t3) + }) + + It("case 31, for long frames", func() { + mult := protocol.ByteCount(math.Ceil(float64(protocol.MinStreamFrameSize) / 9)) + f1 := getData(3 * mult) + cb1, t1 := getCallback() + f2 := getData(3 * mult) + cb2, t2 := getCallback() + f3 := getData(10 * mult) + cb3, t3 := getCallback() + Expect(s.Push(f1, 3*mult, cb1)).To(Succeed()) // 3 - 6 + Expect(s.Push(f2, 9*mult, cb2)).To(Succeed()) // 9 - 12 + Expect(s.Push(f3, 5*mult, cb3)).To(Succeed()) // 5 - 15 + checkQueue(map[protocol.ByteCount][]byte{ + 3 * mult: f1, + 6 * mult: f3[mult:], + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 3 * mult}, + {Start: 15 * mult, End: protocol.MaxByteCount}, + }) + checkCallbackNotCalled(t1) + checkCallbackCalled(t2) + checkCallbackNotCalled(t3) + }) + + // ---xxx---===----- + // +++++++++ + // => + // ---+++++++++--- + It("case 32", func() { + f1 := getData(3) + cb1, t1 := getCallback() + f2 := getData(3) + cb2, t2 := getCallback() + f3 := getData(9) + cb3, t3 := getCallback() + Expect(s.Push(f1, 3, cb1)).To(Succeed()) // 3 - 6 + Expect(s.Push(f2, 9, cb2)).To(Succeed()) // 9 - 12 + Expect(s.Push(f3, 3, cb3)).To(Succeed()) // 3 - 12 + checkQueue(map[protocol.ByteCount][]byte{ + 3: f3, + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 3}, + {Start: 12, End: protocol.MaxByteCount}, + }) + checkCallbackCalled(t1) + checkCallbackCalled(t2) + checkCallbackNotCalled(t3) + }) + + // ---xxx---===###----- + // ++++++++++++ + // => + // ---xxx++++++++++--- + It("case 33", func() { + f1 := getData(3) + cb1, t1 := getCallback() + f2 := getData(3) + cb2, t2 := getCallback() + f3 := getData(3) + cb3, t3 := getCallback() + f4 := getData(12) + cb4, t4 := getCallback() + Expect(s.Push(f1, 3, cb1)).To(Succeed()) // 3 - 6 + Expect(s.Push(f2, 9, cb2)).To(Succeed()) // 9 - 12 + Expect(s.Push(f3, 9, cb3)).To(Succeed()) // 12 - 15 + Expect(s.Push(f4, 5, cb4)).To(Succeed()) // 5 - 17 + checkQueue(map[protocol.ByteCount][]byte{ + 3: f1, + 6: f4[1:], + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 3}, + {Start: 17, End: protocol.MaxByteCount}, + }) + checkCallbackNotCalled(t1) + checkCallbackCalled(t2) + checkCallbackCalled(t3) + checkCallbackCalled(t4) + }) + + It("case 33, for long frames", func() { + mult := protocol.ByteCount(math.Ceil(float64(protocol.MinStreamFrameSize) / 11)) + f1 := getData(3 * mult) + cb1, t1 := getCallback() + f2 := getData(3 * mult) + cb2, t2 := getCallback() + f3 := getData(3 * mult) + cb3, t3 := getCallback() + f4 := getData(12 * mult) + cb4, t4 := getCallback() + Expect(s.Push(f1, 3*mult, cb1)).To(Succeed()) // 3 - 6 + Expect(s.Push(f2, 9*mult, cb2)).To(Succeed()) // 9 - 12 + Expect(s.Push(f3, 9*mult, cb3)).To(Succeed()) // 12 - 15 + Expect(s.Push(f4, 5*mult, cb4)).To(Succeed()) // 5 - 17 + checkQueue(map[protocol.ByteCount][]byte{ + 3 * mult: f1, + 6 * mult: f4[mult:], + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 3 * mult}, + {Start: 17 * mult, End: protocol.MaxByteCount}, + }) + checkCallbackNotCalled(t1) + checkCallbackCalled(t2) + checkCallbackCalled(t3) + checkCallbackNotCalled(t4) + }) + + // ---xxx===---### + // ++++++ + // => + // ---xxx++++++### + It("case 34", func() { + f1 := getData(5) + cb1, t1 := getCallback() + f2 := getData(5) + cb2, t2 := getCallback() + f3 := getData(10) + cb3, t3 := getCallback() + f4 := getData(5) + cb4, t4 := getCallback() + Expect(s.Push(f1, 5, cb1)).To(Succeed()) // 5 - 10 + Expect(s.Push(f2, 10, cb2)).To(Succeed()) // 10 - 15 + Expect(s.Push(f4, 20, cb3)).To(Succeed()) // 20 - 25 + Expect(s.Push(f3, 10, cb4)).To(Succeed()) // 10 - 20 + checkQueue(map[protocol.ByteCount][]byte{ + 5: f1, + 10: f3, + 20: f4, + }) + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 5}, + {Start: 25, End: protocol.MaxByteCount}, + }) + checkCallbackNotCalled(t1) + checkCallbackCalled(t2) + checkCallbackNotCalled(t3) + checkCallbackNotCalled(t4) + }) + + // ---xxx---####--- + // ++++++++ + // => + // ---++++++####--- + It("case 35", func() { + f1 := getData(3) + cb1, t1 := getCallback() + f2 := getData(4) + cb2, t2 := getCallback() + f3 := getData(8) + cb3, t3 := getCallback() + Expect(s.Push(f1, 3, cb1)).To(Succeed()) // 3 - 6 + Expect(s.Push(f2, 9, cb2)).To(Succeed()) // 9 - 13 + Expect(s.Push(f3, 3, cb3)).To(Succeed()) // 3 - 11 + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 3}, + {Start: 13, End: protocol.MaxByteCount}, + }) + checkQueue(map[protocol.ByteCount][]byte{ + 3: f3[:6], + 9: f2, + }) + checkCallbackCalled(t1) + checkCallbackNotCalled(t2) + checkCallbackCalled(t3) + }) + + It("case 35, for long frames", func() { + mult := protocol.ByteCount(math.Ceil(float64(protocol.MinStreamFrameSize) / 6)) + f1 := getData(3 * mult) + cb1, t1 := getCallback() + f2 := getData(4 * mult) + cb2, t2 := getCallback() + f3 := getData(8 * mult) + cb3, t3 := getCallback() + Expect(s.Push(f1, 3*mult, cb1)).To(Succeed()) // 3 - 6 + Expect(s.Push(f2, 9*mult, cb2)).To(Succeed()) // 9 - 13 + Expect(s.Push(f3, 3*mult, cb3)).To(Succeed()) // 3 - 11 + checkGaps([]utils.ByteInterval{ + {Start: 0, End: 3 * mult}, + {Start: 13 * mult, End: protocol.MaxByteCount}, + }) + checkQueue(map[protocol.ByteCount][]byte{ + 3 * mult: f3[:6*mult], + 9 * mult: f2, + }) + checkCallbackCalled(t1) + checkCallbackNotCalled(t2) + checkCallbackNotCalled(t3) + }) + + Context("receiving data after reads", func() { + It("ignores duplicate frames", func() { Expect(s.Push([]byte("foobar"), 0, nil)).To(Succeed()) + offset, data, _ := s.Pop() + Expect(offset).To(BeZero()) + Expect(data).To(Equal([]byte("foobar"))) + // now receive the duplicate + Expect(s.Push([]byte("foobar"), 0, nil)).To(Succeed()) + Expect(s.queue).To(BeEmpty()) checkGaps([]utils.ByteInterval{ {Start: 6, End: protocol.MaxByteCount}, }) }) - It("finds the two gaps", func() { - Expect(s.Push([]byte("foobar"), 10, nil)).To(Succeed()) - Expect(s.Push([]byte("foobar"), 20, nil)).To(Succeed()) - checkGaps([]utils.ByteInterval{ - {Start: 0, End: 10}, - {Start: 16, End: 20}, - {Start: 26, End: protocol.MaxByteCount}, - }) - }) - - It("finds the two gaps in reverse order", func() { - Expect(s.Push([]byte("foobar"), 20, nil)).To(Succeed()) - Expect(s.Push([]byte("foobar"), 10, nil)).To(Succeed()) - checkGaps([]utils.ByteInterval{ - {Start: 0, End: 10}, - {Start: 16, End: 20}, - {Start: 26, End: protocol.MaxByteCount}, - }) - }) - - It("shrinks a gap when it is partially filled", func() { - Expect(s.Push([]byte("test"), 10, nil)).To(Succeed()) - Expect(s.Push([]byte("foobar"), 4, nil)).To(Succeed()) - checkGaps([]utils.ByteInterval{ - {Start: 0, End: 4}, - {Start: 14, End: protocol.MaxByteCount}, - }) - }) - - It("deletes a gap at the beginning, when it is filled", func() { - Expect(s.Push([]byte("test"), 6, nil)).To(Succeed()) + It("ignores parts of frames that have already been read", func() { + Expect(s.Push([]byte("foo"), 0, nil)).To(Succeed()) + offset, data, _ := s.Pop() + Expect(offset).To(BeZero()) + Expect(data).To(Equal([]byte("foo"))) + // now receive the duplicate Expect(s.Push([]byte("foobar"), 0, nil)).To(Succeed()) + offset, data, _ = s.Pop() + Expect(offset).To(Equal(protocol.ByteCount(3))) + Expect(data).To(Equal([]byte("bar"))) + Expect(s.queue).To(BeEmpty()) checkGaps([]utils.ByteInterval{ - {Start: 10, End: protocol.MaxByteCount}, - }) - }) - - It("deletes a gap in the middle, when it is filled", func() { - Expect(s.Push([]byte("test"), 0, nil)).To(Succeed()) - Expect(s.Push([]byte("test2"), 10, nil)).To(Succeed()) - Expect(s.Push([]byte("foobar"), 4, nil)).To(Succeed()) - Expect(s.queue).To(HaveLen(3)) - checkGaps([]utils.ByteInterval{ - {Start: 15, End: protocol.MaxByteCount}, - }) - }) - - It("splits a gap into two", func() { - Expect(s.Push([]byte("test"), 100, nil)).To(Succeed()) - Expect(s.Push([]byte("foobar"), 50, nil)).To(Succeed()) - Expect(s.queue).To(HaveLen(2)) - checkGaps([]utils.ByteInterval{ - {Start: 0, End: 50}, - {Start: 56, End: 100}, - {Start: 104, End: protocol.MaxByteCount}, - }) - }) - - Context("Overlapping Stream Data detection", func() { - var initialCb1, initialCb2, initialCb3 func() - var initialCb1Called, initialCb2Called, initialCb3Called *bool - - // create gaps: 0-500, 1000-1500, 2000-2500, 3000-inf - BeforeEach(func() { - // make sure frames are not cut when we overlap a little bit - Expect(protocol.MinStreamFrameBufferSize).To(BeNumerically("<", 500/2)) - initialCb1, initialCb1Called = getCallback() - initialCb2, initialCb2Called = getCallback() - initialCb3, initialCb3Called = getCallback() - Expect(s.Push(bytes.Repeat([]byte{1}, 500), 500, initialCb1)).To(Succeed()) - Expect(s.Push(bytes.Repeat([]byte{2}, 500), 1500, initialCb2)).To(Succeed()) - Expect(s.Push(bytes.Repeat([]byte{3}, 500), 2500, initialCb3)).To(Succeed()) - checkGaps([]utils.ByteInterval{ - {Start: 0, End: 500}, - {Start: 1000, End: 1500}, - {Start: 2000, End: 2500}, - {Start: 3000, End: protocol.MaxByteCount}, - }) - }) - - It("cuts a frame with offset 0 that overlaps at the end", func() { - cb, called := getCallback() - // 0 - 505 - Expect(s.Push(bytes.Repeat([]byte{9}, 505), 0, cb)).To(Succeed()) - Expect(s.queue).To(HaveKey(protocol.ByteCount(0))) - Expect(s.queue[0].Data).To(Equal(bytes.Repeat([]byte{9}, 500))) // 0 to 500 - checkGaps([]utils.ByteInterval{ - {Start: 1000, End: 1500}, - {Start: 2000, End: 2500}, - {Start: 3000, End: protocol.MaxByteCount}, - }) - checkCallback(cb, called) - checkCallback(initialCb1, initialCb1Called) - checkCallback(initialCb2, initialCb2Called) - checkCallback(initialCb3, initialCb3Called) - }) - - It("cuts a frame that overlaps at the end", func() { - cb, called := getCallback() - // 100 to 600 - Expect(s.Push(bytes.Repeat([]byte{9}, 500), 100, cb)).To(Succeed()) - Expect(s.queue).To(HaveKey(protocol.ByteCount(100))) - Expect(s.queue[100].Data).To(Equal(bytes.Repeat([]byte{9}, 400))) // 100 to 500 - checkGaps([]utils.ByteInterval{ - {Start: 0, End: 100}, - {Start: 1000, End: 1500}, - {Start: 2000, End: 2500}, - {Start: 3000, End: protocol.MaxByteCount}, - }) - checkCallback(cb, called) - checkCallback(initialCb1, initialCb1Called) - checkCallback(initialCb2, initialCb2Called) - checkCallback(initialCb3, initialCb3Called) - }) - - It("cuts a frame that completely fills a gap, but overlaps at the end", func() { - // 1000 to 1600 - cb, called := getCallback() - Expect(s.Push(bytes.Repeat([]byte{9}, 600), 1000, cb)).To(Succeed()) - Expect(s.queue).To(HaveKey(protocol.ByteCount(1000))) - Expect(s.queue[1000].Data).To(Equal(bytes.Repeat([]byte{9}, 500))) // 1000 to 15000 - checkGaps([]utils.ByteInterval{ - {Start: 0, End: 500}, - {Start: 2000, End: 2500}, - {Start: 3000, End: protocol.MaxByteCount}, - }) - checkCallback(cb, called) - checkCallback(initialCb1, initialCb1Called) - checkCallback(initialCb2, initialCb2Called) - checkCallback(initialCb3, initialCb3Called) - }) - - It("cuts a frame that overlaps at the beginning", func() { - cb, called := getCallback() - // 900 to 1400 - Expect(s.Push(bytes.Repeat([]byte{9}, 500), 900, cb)).To(Succeed()) - Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(900))) - Expect(s.queue).To(HaveKey(protocol.ByteCount(1000))) - Expect(s.queue[1000].Data).To(Equal(bytes.Repeat([]byte{9}, 400))) // 1000 to 1400 - checkGaps([]utils.ByteInterval{ - {Start: 0, End: 500}, - {Start: 1400, End: 1500}, - {Start: 2000, End: 2500}, - {Start: 3000, End: protocol.MaxByteCount}, - }) - checkCallback(cb, called) - checkCallback(initialCb1, initialCb1Called) - checkCallback(initialCb2, initialCb2Called) - checkCallback(initialCb3, initialCb3Called) - }) - - It("processes a frame that overlaps at the beginning and at the end, starting in a gap", func() { - cb, called := getCallback() - // 300 to 1100 - Expect(s.Push(bytes.Repeat([]byte{9}, 800), 300, cb)).To(Succeed()) - Expect(s.queue).To(HaveKey(protocol.ByteCount(300))) - Expect(s.queue[300].Data).To(Equal(bytes.Repeat([]byte{9}, 800))) // 300 to 1100 - checkGaps([]utils.ByteInterval{ - {Start: 0, End: 300}, - {Start: 1100, End: 1500}, - {Start: 2000, End: 2500}, - {Start: 3000, End: protocol.MaxByteCount}, - }) - checkCallback(cb, called) - // initial1 spanned from 500 - 1000, and should have been deleted - Expect(*initialCb1Called).To(BeTrue()) - checkCallback(initialCb2, initialCb2Called) - checkCallback(initialCb3, initialCb3Called) - }) - - It("processes a frame that overlaps at the beginning and at the end, starting in a gap, ending in data", func() { - cb, called := getCallback() - // 400 to 1600 - Expect(s.Push(bytes.Repeat([]byte{9}, 1200), 400, cb)).To(Succeed()) - Expect(s.queue).To(HaveKey(protocol.ByteCount(400))) - Expect(s.queue).To(HaveKey(protocol.ByteCount(1500))) - Expect(s.queue[400].Data).To(Equal(bytes.Repeat([]byte{9}, 1100))) // 400 to 1500 - checkGaps([]utils.ByteInterval{ - {Start: 0, End: 400}, - {Start: 2000, End: 2500}, - {Start: 3000, End: protocol.MaxByteCount}, - }) - checkCallback(cb, called) - // initial1 spans from 500 - 1000, and should have been deleted - Expect(*initialCb1Called).To(BeTrue()) - checkCallback(initialCb2, initialCb2Called) - checkCallback(initialCb3, initialCb3Called) - }) - - It("processes a frame that overlaps at the beginning and at the end, starting in a gap, ending in data", func() { - cb, called := getCallback() - // 500 to 2100 - Expect(s.Push(bytes.Repeat([]byte{9}, 1600), 500, cb)).To(Succeed()) - Expect(s.queue).To(HaveKey(protocol.ByteCount(500))) - Expect(s.queue[500].Data).To(Equal(bytes.Repeat([]byte{9}, 1600))) // 500 to 2100 - Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(1000))) - Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(1500))) - checkGaps([]utils.ByteInterval{ - {Start: 0, End: 500}, - {Start: 2100, End: 2500}, - {Start: 3000, End: protocol.MaxByteCount}, - }) - checkCallback(cb, called) - // initial1 spans from 500 - 1000, and should have been deleted - Expect(*initialCb1Called).To(BeTrue()) - // initial2 spans from 1500 - 2000, and should have been deleted - Expect(*initialCb2Called).To(BeTrue()) - checkCallback(initialCb3, initialCb3Called) - }) - - It("processes a frame that closes multiple gaps, beginning in a gap", func() { - cb, called := getCallback() - // 400 to 3100 - Expect(s.Push(bytes.Repeat([]byte{9}, 2700), 400, cb)).To(Succeed()) - Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(500))) - Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(1500))) - Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(2500))) - Expect(s.queue).To(HaveKey(protocol.ByteCount(400))) - Expect(s.queue[400].Data).To(Equal(bytes.Repeat([]byte{9}, 2700))) // 400 to 3100 - checkGaps([]utils.ByteInterval{ - {Start: 0, End: 400}, - {Start: 3100, End: protocol.MaxByteCount}, - }) - checkCallback(cb, called) - // initial1 spans from 500 - 1000, and should have been deleted - Expect(*initialCb1Called).To(BeTrue()) - // initial2 spans from 1500 - 2000, and should have been deleted - Expect(*initialCb2Called).To(BeTrue()) - // initial3 spans from 2500 - 3100, and should have been deleted - Expect(*initialCb3Called).To(BeTrue()) - }) - - It("processes a frame that closes multiple gaps, beginning at the end of a gap", func() { - cb, called := getCallback() - // 500 to 2600 - Expect(s.Push(bytes.Repeat([]byte{9}, 2100), 500, cb)).To(Succeed()) - Expect(s.queue).To(HaveKey(protocol.ByteCount(500))) - Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(1000))) - Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(1500))) - Expect(s.queue).To(HaveKey(protocol.ByteCount(2500))) - Expect(s.queue[500].Data).To(Equal(bytes.Repeat([]byte{9}, 2000))) // 500 to 2500 - checkGaps([]utils.ByteInterval{ - {Start: 0, End: 500}, - {Start: 3000, End: protocol.MaxByteCount}, - }) - checkCallback(cb, called) - // initial1 spans from 500 - 1000, and should have been deleted - Expect(*initialCb1Called).To(BeTrue()) - // initial2 spans from 1500 - 2000, and should have been deleted - Expect(*initialCb2Called).To(BeTrue()) - checkCallback(initialCb3, initialCb3Called) - }) - - It("processes a frame that covers multiple gaps and ends at the end of a gap", func() { - cb, called := getCallback() - // 100 to 1500 - Expect(s.Push(bytes.Repeat([]byte{9}, 1400), 100, cb)).To(Succeed()) - Expect(s.queue).To(HaveKey(protocol.ByteCount(100))) - Expect(s.queue).To(HaveKey(protocol.ByteCount(1500))) - Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(500))) - Expect(s.queue[100].Data).To(Equal(bytes.Repeat([]byte{9}, 1400))) // 100 to 1500 - checkGaps([]utils.ByteInterval{ - {Start: 0, End: 100}, - {Start: 2000, End: 2500}, - {Start: 3000, End: protocol.MaxByteCount}, - }) - checkCallback(cb, called) - // initial1 spans from 500 - 1000, and should have been deleted - Expect(*initialCb1Called).To(BeTrue()) - checkCallback(initialCb2, initialCb2Called) - checkCallback(initialCb3, initialCb3Called) - }) - - It("processes a frame that closes all gaps (except for the last one)", func() { - cb, called := getCallback() - // 0 to 3100 - Expect(s.Push(bytes.Repeat([]byte{9}, 3100), 0, cb)).To(Succeed()) - Expect(s.queue).To(HaveLen(1)) - Expect(s.queue).To(HaveKey(protocol.ByteCount(0))) - Expect(s.queue[0].Data).To(Equal(bytes.Repeat([]byte{9}, 3100))) // 0 to 3100 - checkGaps([]utils.ByteInterval{ - {Start: 3100, End: protocol.MaxByteCount}, - }) - checkCallback(cb, called) - // initial1 spans from 500 - 1000, and should have been deleted - Expect(*initialCb1Called).To(BeTrue()) - Expect(*initialCb2Called).To(BeTrue()) - Expect(*initialCb3Called).To(BeTrue()) - }) - }) - - Context("duplicate data", func() { - var initialCb1, initialCb2 func() - var initialCb1Called, initialCb2Called *bool - - BeforeEach(func() { - // make sure frames are not cut when we overlap a little bit - Expect(protocol.MinStreamFrameBufferSize).To(BeNumerically("<", 500/2)) - initialCb1, initialCb1Called = getCallback() - initialCb2, initialCb2Called = getCallback() - // create gaps: 500 - 1000, 1500 - inf - Expect(s.Push(bytes.Repeat([]byte{1}, 500), 0, initialCb1)).To(Succeed()) - Expect(s.Push(bytes.Repeat([]byte{2}, 500), 1000, initialCb1)).To(Succeed()) - checkGaps([]utils.ByteInterval{ - {Start: 500, End: 1000}, - {Start: 1500, End: protocol.MaxByteCount}, - }) - }) - - AfterEach(func() { - // check that the gaps were not modified - checkGaps([]utils.ByteInterval{ - {Start: 500, End: 1000}, - {Start: 1500, End: protocol.MaxByteCount}, - }) - }) - - It("does not modify data when receiving a duplicate", func() { - cb, called := getCallback() - // 0 to 500 - Expect(s.Push(bytes.Repeat([]byte{9}, 500), 0, cb)).To(Succeed()) - Expect(s.queue).To(HaveKey(protocol.ByteCount(0))) - Expect(s.queue[0].Data).ToNot(Equal(bytes.Repeat([]byte{9}, 500))) - Expect(*called).To(BeTrue()) - checkCallback(initialCb1, initialCb1Called) - checkCallback(initialCb2, initialCb2Called) - }) - - It("detects a duplicate frame that is smaller than the original, starting at the beginning", func() { - cb, called := getCallback() - // 1000 to 1200 - Expect(s.Push(bytes.Repeat([]byte{9}, 200), 1000, cb)).To(Succeed()) - Expect(s.queue).To(HaveKey(protocol.ByteCount(1000))) - Expect(s.queue[1000].Data).ToNot(Equal(bytes.Repeat([]byte{9}, 200))) - Expect(*called).To(BeTrue()) - checkCallback(initialCb1, initialCb1Called) - checkCallback(initialCb2, initialCb2Called) - }) - - It("detects a duplicate frame that is smaller than the original, somewhere in the middle", func() { - cb, called := getCallback() - // 100 to 400 - Expect(s.Push(bytes.Repeat([]byte{9}, 300), 100, cb)).To(Succeed()) - Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(100))) - Expect(s.queue[0].Data).To(Equal(bytes.Repeat([]byte{1}, 500))) - Expect(*called).To(BeTrue()) - checkCallback(initialCb1, initialCb1Called) - checkCallback(initialCb2, initialCb2Called) - }) - - It("detects a duplicate frame that is smaller than the original, somewhere in the middle in the last block", func() { - cb, called := getCallback() - // 1100 to 1400 - Expect(s.Push(bytes.Repeat([]byte{9}, 300), 1100, cb)).To(Succeed()) - Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(1100))) - Expect(s.queue[1000].Data).To(Equal(bytes.Repeat([]byte{2}, 500))) - Expect(*called).To(BeTrue()) - checkCallback(initialCb1, initialCb1Called) - checkCallback(initialCb2, initialCb2Called) - }) - - It("detects a duplicate frame that is smaller than the original, somewhere in the middle in the last block", func() { - cb, called := getCallback() - // 1100 to 1500 - Expect(s.Push(bytes.Repeat([]byte{9}, 400), 1100, cb)).To(Succeed()) - Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(1100))) - Expect(s.queue[1000].Data).To(Equal(bytes.Repeat([]byte{2}, 500))) - Expect(*called).To(BeTrue()) - checkCallback(initialCb1, initialCb1Called) - checkCallback(initialCb2, initialCb2Called) - }) - - It("detects a duplicate frame that is smaller than the original, with aligned end", func() { - cb, called := getCallback() - // 300 to 500 - Expect(s.Push(bytes.Repeat([]byte{9}, 200), 300, cb)).To(Succeed()) - Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(300))) - Expect(s.queue[0].Data).To(Equal(bytes.Repeat([]byte{1}, 500))) - Expect(*called).To(BeTrue()) - checkCallback(initialCb1, initialCb1Called) - checkCallback(initialCb2, initialCb2Called) - }) - }) - - Context("cutting short frames", func() { - var initialCb1, initialCb2 func() - var initialCb1Called, initialCb2Called *bool - - // create gaps: 0-5, 10-15, 2000-inf - BeforeEach(func() { - // make sure frames are not cut when we overlap a little bit - Expect(protocol.MinStreamFrameBufferSize).To(BeNumerically(">", 10)) - initialCb1, initialCb1Called = getCallback() - initialCb2, initialCb2Called = getCallback() - Expect(s.Push(bytes.Repeat([]byte{1}, 5), 5, initialCb1)).To(Succeed()) - Expect(s.Push(bytes.Repeat([]byte{2}, 5), 15, initialCb2)).To(Succeed()) - checkGaps([]utils.ByteInterval{ - {Start: 0, End: 5}, - {Start: 10, End: 15}, - {Start: 20, End: protocol.MaxByteCount}, - }) - }) - - It("cuts a frame that overlaps with received data at the beginning", func() { - cb, called := getCallback() - // 9 to 12 - Expect(s.Push(bytes.Repeat([]byte{9}, 3), 9, cb)).To(Succeed()) - Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(9))) - Expect(s.queue).To(HaveKey(protocol.ByteCount(10))) - Expect(s.queue[10].Data).To(Equal(bytes.Repeat([]byte{9}, 2))) // 10 to 12 - Expect(s.queue[10].Data).To(HaveCap(2)) - checkGaps([]utils.ByteInterval{ - {Start: 0, End: 5}, - {Start: 12, End: 15}, - {Start: 20, End: protocol.MaxByteCount}, - }) - Expect(*called).To(BeTrue()) - checkCallback(initialCb1, initialCb1Called) - checkCallback(initialCb2, initialCb2Called) - }) - - It("cuts a frame that overlaps with received data at the end", func() { - cb, called := getCallback() - // 12 to 19 - Expect(s.Push(bytes.Repeat([]byte{9}, 7), 12, cb)).To(Succeed()) - Expect(s.queue).To(HaveKey(protocol.ByteCount(12))) - Expect(s.queue[12].Data).To(Equal(bytes.Repeat([]byte{9}, 3))) // 12 to 15 - Expect(s.queue[12].Data).To(HaveCap(3)) - checkGaps([]utils.ByteInterval{ - {Start: 0, End: 5}, - {Start: 10, End: 12}, - {Start: 20, End: protocol.MaxByteCount}, - }) - Expect(*called).To(BeTrue()) - checkCallback(initialCb1, initialCb1Called) - checkCallback(initialCb2, initialCb2Called) - }) - }) - - Context("DoS protection", func() { - It("errors when too many gaps are created", func() { - for i := 0; i < protocol.MaxStreamFrameSorterGaps; i++ { - Expect(s.Push([]byte("foobar"), protocol.ByteCount(i*7), nil)).To(Succeed()) - } - Expect(s.gaps.Len()).To(Equal(protocol.MaxStreamFrameSorterGaps)) - err := s.Push([]byte("foobar"), protocol.ByteCount(protocol.MaxStreamFrameSorterGaps*7)+100, nil) - Expect(err).To(MatchError("too many gaps in received data")) + {Start: 6, End: protocol.MaxByteCount}, }) }) }) + + Context("DoS protection", func() { + It("errors when too many gaps are created", func() { + for i := 0; i < protocol.MaxStreamFrameSorterGaps; i++ { + Expect(s.Push([]byte("foobar"), protocol.ByteCount(i*7), nil)).To(Succeed()) + } + Expect(s.gaps.Len()).To(Equal(protocol.MaxStreamFrameSorterGaps)) + err := s.Push([]byte("foobar"), protocol.ByteCount(protocol.MaxStreamFrameSorterGaps*7)+100, nil) + Expect(err).To(MatchError("too many gaps in received data")) + }) + }) + }) + + Context("stress testing", func() { + type frame struct { + offset protocol.ByteCount + data []byte + } + + for _, lf := range []bool{true, false} { + longFrames := lf + + const num = 1000 + + name := "short" + if longFrames { + name = "long" + } + + Context(fmt.Sprintf("using %s frames", name), func() { + var data []byte + var dataLen protocol.ByteCount + var callbacks []callbackTracker + + BeforeEach(func() { + callbacks = nil + dataLen = 25 + if longFrames { + dataLen = 2 * protocol.MinStreamFrameSize + } + + data = make([]byte, num*dataLen) + for i := 0; i < num; i++ { + for j := protocol.ByteCount(0); j < dataLen; j++ { + data[protocol.ByteCount(i)*dataLen+j] = uint8(i) + } + } + }) + + getRandomFrames := func() []frame { + frames := make([]frame, num) + for i := protocol.ByteCount(0); i < num; i++ { + b := make([]byte, dataLen) + Expect(copy(b, data[i*dataLen:])).To(BeEquivalentTo(dataLen)) + frames[i] = frame{ + offset: i * dataLen, + data: b, + } + } + rand.Shuffle(len(frames), func(i, j int) { frames[i], frames[j] = frames[j], frames[i] }) + return frames + } + + getData := func() []byte { + var data []byte + for { + offset, b, cb := s.Pop() + if b == nil { + break + } + Expect(offset).To(BeEquivalentTo(len(data))) + data = append(data, b...) + if cb != nil { + cb() + } + } + return data + } + + // push pushes data to the frame sorter + // It creates a new callback and adds the + push := func(data []byte, offset protocol.ByteCount) { + cb, t := getCallback() + fmt.Fprintf(GinkgoWriter, "Pushing %d bytes at offset %d - %d\n", len(data), offset, offset+protocol.ByteCount(len(data))) + ExpectWithOffset(1, s.Push(data, offset, cb)).To(Succeed()) + callbacks = append(callbacks, t) + } + + checkCallbacks := func() { + ExpectWithOffset(1, callbacks).ToNot(BeEmpty()) + for _, t := range callbacks { + checkCallbackCalled(t) + } + } + + It("inserting frames in a random order", func() { + frames := getRandomFrames() + + for _, f := range frames { + push(f.data, f.offset) + } + checkGaps([]utils.ByteInterval{{Start: num * dataLen, End: protocol.MaxByteCount}}) + + Expect(getData()).To(Equal(data)) + Expect(s.queue).To(BeEmpty()) + checkCallbacks() + }) + + It("inserting frames in a random order, with some duplicates", func() { + frames := getRandomFrames() + + for _, f := range frames { + push(f.data, f.offset) + if rand.Intn(10) < 5 { + df := frames[rand.Intn(len(frames))] + push(df.data, df.offset) + } + } + checkGaps([]utils.ByteInterval{{Start: num * dataLen, End: protocol.MaxByteCount}}) + + Expect(getData()).To(Equal(data)) + Expect(s.queue).To(BeEmpty()) + checkCallbacks() + }) + + It("inserting frames in a random order, with randomly cut retransmissions", func() { + seed := time.Now().UnixNano() + fmt.Fprintf(GinkgoWriter, "Seed: %d\n", seed) + rand.Seed(seed) + + frames := getRandomFrames() + + for _, f := range frames { + push(f.data, f.offset) + if rand.Intn(10) < 5 { + length := protocol.ByteCount(1 + rand.Intn(int(4*dataLen))) + if length >= num*dataLen { + length = num*dataLen - 1 + } + b := make([]byte, length) + offset := protocol.ByteCount(rand.Intn(int(num*dataLen - length))) + Expect(copy(b, data[offset:offset+length])).To(BeEquivalentTo(length)) + push(b, offset) + } + } + checkGaps([]utils.ByteInterval{{Start: num * dataLen, End: protocol.MaxByteCount}}) + + Expect(getData()).To(Equal(data)) + Expect(s.queue).To(BeEmpty()) + checkCallbacks() + }) + }) + } }) })