move tracking of the final stream offset to the stream

This commit is contained in:
Marten Seemann 2019-01-22 21:02:41 +07:00
parent 9888db457f
commit 1864e301ef
4 changed files with 90 additions and 113 deletions

View file

@ -53,11 +53,11 @@ func (s *cryptoStreamImpl) HandleCryptoFrame(f *wire.CryptoFrame) error {
return nil
}
s.highestOffset = utils.MaxByteCount(s.highestOffset, highestOffset)
if err := s.queue.Push(f.Data, f.Offset, false); err != nil {
if err := s.queue.Push(f.Data, f.Offset); err != nil {
return err
}
for {
data, _ := s.queue.Pop()
_, data := s.queue.Pop()
if data == nil {
return nil
}

View file

@ -8,36 +8,31 @@ import (
)
type frameSorter struct {
queue map[protocol.ByteCount][]byte
readPos protocol.ByteCount
finalOffset protocol.ByteCount
gaps *utils.ByteIntervalList
queue map[protocol.ByteCount][]byte
readPos protocol.ByteCount
gaps *utils.ByteIntervalList
}
var errDuplicateStreamData = errors.New("Duplicate Stream Data")
func newFrameSorter() *frameSorter {
s := frameSorter{
gaps: utils.NewByteIntervalList(),
queue: make(map[protocol.ByteCount][]byte),
finalOffset: protocol.MaxByteCount,
gaps: utils.NewByteIntervalList(),
queue: make(map[protocol.ByteCount][]byte),
}
s.gaps.PushFront(utils.ByteInterval{Start: 0, End: protocol.MaxByteCount})
return &s
}
func (s *frameSorter) Push(data []byte, offset protocol.ByteCount, fin bool) error {
err := s.push(data, offset, fin)
func (s *frameSorter) Push(data []byte, offset protocol.ByteCount) error {
err := s.push(data, offset)
if err == errDuplicateStreamData {
return nil
}
return err
}
func (s *frameSorter) push(data []byte, offset protocol.ByteCount, fin bool) error {
if fin {
s.finalOffset = offset + protocol.ByteCount(len(data))
}
func (s *frameSorter) push(data []byte, offset protocol.ByteCount) error {
if len(data) == 0 {
return nil
}
@ -147,14 +142,15 @@ func (s *frameSorter) push(data []byte, offset protocol.ByteCount, fin bool) err
return nil
}
func (s *frameSorter) Pop() ([]byte /* data */, bool /* fin */) {
func (s *frameSorter) Pop() (protocol.ByteCount, []byte) {
data, ok := s.queue[s.readPos]
if !ok {
return nil, s.readPos >= s.finalOffset
return s.readPos, nil
}
delete(s.queue, s.readPos)
offset := s.readPos
s.readPos += protocol.ByteCount(len(data))
return data, s.readPos >= s.finalOffset
return offset, data
}
// HasMoreData says if there is any more data queued at *any* offset.

View file

@ -25,81 +25,54 @@ var _ = Describe("STREAM frame sorter", func() {
s = newFrameSorter()
})
It("head returns nil when empty", func() {
Expect(s.Pop()).To(BeNil())
It("returns nil when empty", func() {
_, data := s.Pop()
Expect(data).To(BeNil())
})
Context("Push", func() {
It("inserts and pops a single frame", func() {
Expect(s.Push([]byte("foobar"), 0, false)).To(Succeed())
data, fin := s.Pop()
Expect(s.Push([]byte("foobar"), 0)).To(Succeed())
offset, data := s.Pop()
Expect(offset).To(BeZero())
Expect(data).To(Equal([]byte("foobar")))
Expect(fin).To(BeFalse())
Expect(s.Pop()).To(BeNil())
offset, data = s.Pop()
Expect(offset).To(Equal(protocol.ByteCount(6)))
Expect(data).To(BeNil())
})
It("inserts and pops two consecutive frame", func() {
Expect(s.Push([]byte("foo"), 0, false)).To(Succeed())
Expect(s.Push([]byte("bar"), 3, false)).To(Succeed())
data, fin := s.Pop()
Expect(s.Push([]byte("foo"), 0)).To(Succeed())
Expect(s.Push([]byte("bar"), 3)).To(Succeed())
offset, data := s.Pop()
Expect(offset).To(BeZero())
Expect(data).To(Equal([]byte("foo")))
Expect(fin).To(BeFalse())
data, fin = s.Pop()
offset, data = s.Pop()
Expect(offset).To(Equal(protocol.ByteCount(3)))
Expect(data).To(Equal([]byte("bar")))
Expect(fin).To(BeFalse())
Expect(s.Pop()).To(BeNil())
offset, data = s.Pop()
Expect(offset).To(Equal(protocol.ByteCount(6)))
Expect(data).To(BeNil())
})
It("ignores empty frames", func() {
Expect(s.Push(nil, 0, false)).To(Succeed())
Expect(s.Pop()).To(BeNil())
Expect(s.Push(nil, 0)).To(Succeed())
_, data := s.Pop()
Expect(data).To(BeNil())
})
It("says if has more data", func() {
Expect(s.HasMoreData()).To(BeFalse())
Expect(s.Push([]byte("foo"), 0, false)).To(Succeed())
Expect(s.Push([]byte("foo"), 0)).To(Succeed())
Expect(s.HasMoreData()).To(BeTrue())
data, _ := s.Pop()
_, data := s.Pop()
Expect(data).To(Equal([]byte("foo")))
Expect(s.HasMoreData()).To(BeFalse())
})
Context("FIN handling", func() {
It("saves a FIN at offset 0", func() {
Expect(s.Push(nil, 0, true)).To(Succeed())
data, fin := s.Pop()
Expect(data).To(BeEmpty())
Expect(fin).To(BeTrue())
data, fin = s.Pop()
Expect(data).To(BeNil())
Expect(fin).To(BeTrue())
})
It("saves a FIN frame at non-zero offset", func() {
Expect(s.Push([]byte("foobar"), 0, true)).To(Succeed())
data, fin := s.Pop()
Expect(data).To(Equal([]byte("foobar")))
Expect(fin).To(BeTrue())
data, fin = s.Pop()
Expect(data).To(BeNil())
Expect(fin).To(BeTrue())
})
It("sets the FIN if a stream is closed after receiving some data", func() {
Expect(s.Push([]byte("foobar"), 0, false)).To(Succeed())
Expect(s.Push(nil, 6, true)).To(Succeed())
data, fin := s.Pop()
Expect(data).To(Equal([]byte("foobar")))
Expect(fin).To(BeTrue())
data, fin = s.Pop()
Expect(data).To(BeNil())
Expect(fin).To(BeTrue())
})
})
Context("Gap handling", func() {
It("finds the first gap", func() {
Expect(s.Push([]byte("foobar"), 10, false)).To(Succeed())
Expect(s.Push([]byte("foobar"), 10)).To(Succeed())
checkGaps([]utils.ByteInterval{
{Start: 0, End: 10},
{Start: 16, End: protocol.MaxByteCount},
@ -107,15 +80,15 @@ var _ = Describe("STREAM frame sorter", func() {
})
It("correctly sets the first gap for a frame with offset 0", func() {
Expect(s.Push([]byte("foobar"), 0, false)).To(Succeed())
Expect(s.Push([]byte("foobar"), 0)).To(Succeed())
checkGaps([]utils.ByteInterval{
{Start: 6, End: protocol.MaxByteCount},
})
})
It("finds the two gaps", func() {
Expect(s.Push([]byte("foobar"), 10, false)).To(Succeed())
Expect(s.Push([]byte("foobar"), 20, false)).To(Succeed())
Expect(s.Push([]byte("foobar"), 10)).To(Succeed())
Expect(s.Push([]byte("foobar"), 20)).To(Succeed())
checkGaps([]utils.ByteInterval{
{Start: 0, End: 10},
{Start: 16, End: 20},
@ -124,8 +97,8 @@ var _ = Describe("STREAM frame sorter", func() {
})
It("finds the two gaps in reverse order", func() {
Expect(s.Push([]byte("foobar"), 20, false)).To(Succeed())
Expect(s.Push([]byte("foobar"), 10, false)).To(Succeed())
Expect(s.Push([]byte("foobar"), 20)).To(Succeed())
Expect(s.Push([]byte("foobar"), 10)).To(Succeed())
checkGaps([]utils.ByteInterval{
{Start: 0, End: 10},
{Start: 16, End: 20},
@ -134,8 +107,8 @@ var _ = Describe("STREAM frame sorter", func() {
})
It("shrinks a gap when it is partially filled", func() {
Expect(s.Push([]byte("test"), 10, false)).To(Succeed())
Expect(s.Push([]byte("foobar"), 4, false)).To(Succeed())
Expect(s.Push([]byte("test"), 10)).To(Succeed())
Expect(s.Push([]byte("foobar"), 4)).To(Succeed())
checkGaps([]utils.ByteInterval{
{Start: 0, End: 4},
{Start: 14, End: protocol.MaxByteCount},
@ -143,17 +116,17 @@ var _ = Describe("STREAM frame sorter", func() {
})
It("deletes a gap at the beginning, when it is filled", func() {
Expect(s.Push([]byte("test"), 6, false)).To(Succeed())
Expect(s.Push([]byte("foobar"), 0, false)).To(Succeed())
Expect(s.Push([]byte("test"), 6)).To(Succeed())
Expect(s.Push([]byte("foobar"), 0)).To(Succeed())
checkGaps([]utils.ByteInterval{
{Start: 10, End: protocol.MaxByteCount},
})
})
It("deletes a gap in the middle, when it is filled", func() {
Expect(s.Push([]byte("test"), 0, false)).To(Succeed())
Expect(s.Push([]byte("test2"), 10, false)).To(Succeed())
Expect(s.Push([]byte("foobar"), 4, false)).To(Succeed())
Expect(s.Push([]byte("test"), 0)).To(Succeed())
Expect(s.Push([]byte("test2"), 10)).To(Succeed())
Expect(s.Push([]byte("foobar"), 4)).To(Succeed())
Expect(s.queue).To(HaveLen(3))
checkGaps([]utils.ByteInterval{
{Start: 15, End: protocol.MaxByteCount},
@ -161,8 +134,8 @@ var _ = Describe("STREAM frame sorter", func() {
})
It("splits a gap into two", func() {
Expect(s.Push([]byte("test"), 100, false)).To(Succeed())
Expect(s.Push([]byte("foobar"), 50, false)).To(Succeed())
Expect(s.Push([]byte("test"), 100)).To(Succeed())
Expect(s.Push([]byte("foobar"), 50)).To(Succeed())
Expect(s.queue).To(HaveLen(2))
checkGaps([]utils.ByteInterval{
{Start: 0, End: 50},
@ -174,9 +147,9 @@ var _ = Describe("STREAM frame sorter", func() {
Context("Overlapping Stream Data detection", func() {
// create gaps: 0-5, 10-15, 20-25, 30-inf
BeforeEach(func() {
Expect(s.Push([]byte("12345"), 5, false)).To(Succeed())
Expect(s.Push([]byte("12345"), 15, false)).To(Succeed())
Expect(s.Push([]byte("12345"), 25, false)).To(Succeed())
Expect(s.Push([]byte("12345"), 5)).To(Succeed())
Expect(s.Push([]byte("12345"), 15)).To(Succeed())
Expect(s.Push([]byte("12345"), 25)).To(Succeed())
checkGaps([]utils.ByteInterval{
{Start: 0, End: 5},
{Start: 10, End: 15},
@ -186,7 +159,7 @@ var _ = Describe("STREAM frame sorter", func() {
})
It("cuts a frame with offset 0 that overlaps at the end", func() {
Expect(s.Push([]byte("foobar"), 0, false)).To(Succeed())
Expect(s.Push([]byte("foobar"), 0)).To(Succeed())
Expect(s.queue).To(HaveKey(protocol.ByteCount(0)))
Expect(s.queue[0]).To(Equal([]byte("fooba")))
Expect(s.queue[0]).To(HaveCap(5))
@ -199,7 +172,7 @@ var _ = Describe("STREAM frame sorter", func() {
It("cuts a frame that overlaps at the end", func() {
// 4 to 7
Expect(s.Push([]byte("foo"), 4, false)).To(Succeed())
Expect(s.Push([]byte("foo"), 4)).To(Succeed())
Expect(s.queue).To(HaveKey(protocol.ByteCount(4)))
Expect(s.queue[4]).To(Equal([]byte("f")))
Expect(s.queue[4]).To(HaveCap(1))
@ -213,7 +186,7 @@ var _ = Describe("STREAM frame sorter", func() {
It("cuts a frame that completely fills a gap, but overlaps at the end", func() {
// 10 to 16
Expect(s.Push([]byte("foobar"), 10, false)).To(Succeed())
Expect(s.Push([]byte("foobar"), 10)).To(Succeed())
Expect(s.queue).To(HaveKey(protocol.ByteCount(10)))
Expect(s.queue[10]).To(Equal([]byte("fooba")))
Expect(s.queue[10]).To(HaveCap(5))
@ -226,7 +199,7 @@ var _ = Describe("STREAM frame sorter", func() {
It("cuts a frame that overlaps at the beginning", func() {
// 8 to 14
Expect(s.Push([]byte("foobar"), 8, false)).To(Succeed())
Expect(s.Push([]byte("foobar"), 8)).To(Succeed())
Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(8)))
Expect(s.queue).To(HaveKey(protocol.ByteCount(10)))
Expect(s.queue[10]).To(Equal([]byte("obar")))
@ -241,7 +214,7 @@ var _ = Describe("STREAM frame sorter", func() {
It("processes a frame that overlaps at the beginning and at the end, starting in a gap", func() {
// 2 to 12
Expect(s.Push([]byte("1234567890"), 2, false)).To(Succeed())
Expect(s.Push([]byte("1234567890"), 2)).To(Succeed())
Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(5)))
Expect(s.queue).To(HaveKey(protocol.ByteCount(2)))
Expect(s.queue[2]).To(Equal([]byte("1234567890")))
@ -255,7 +228,7 @@ var _ = Describe("STREAM frame sorter", func() {
It("processes a frame that overlaps at the beginning and at the end, starting in a gap, ending in data", func() {
// 2 to 17
Expect(s.Push([]byte("123456789012345"), 2, false)).To(Succeed())
Expect(s.Push([]byte("123456789012345"), 2)).To(Succeed())
Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(5)))
Expect(s.queue).To(HaveKey(protocol.ByteCount(2)))
Expect(s.queue[2]).To(Equal([]byte("1234567890123")))
@ -269,7 +242,7 @@ var _ = Describe("STREAM frame sorter", func() {
It("processes a frame that overlaps at the beginning and at the end, starting in a gap, ending in data", func() {
// 5 to 22
Expect(s.Push([]byte("12345678901234567"), 5, false)).To(Succeed())
Expect(s.Push([]byte("12345678901234567"), 5)).To(Succeed())
Expect(s.queue).To(HaveKey(protocol.ByteCount(5)))
Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(15)))
Expect(s.queue[10]).To(Equal([]byte("678901234567")))
@ -282,7 +255,7 @@ var _ = Describe("STREAM frame sorter", func() {
It("processes a frame that closes multiple gaps", func() {
// 2 to 27
Expect(s.Push(bytes.Repeat([]byte{'e'}, 25), 2, false)).To(Succeed())
Expect(s.Push(bytes.Repeat([]byte{'e'}, 25), 2)).To(Succeed())
Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(5)))
Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(15)))
Expect(s.queue).To(HaveKey(protocol.ByteCount(25)))
@ -297,7 +270,7 @@ var _ = Describe("STREAM frame sorter", func() {
It("processes a frame that closes multiple gaps", func() {
// 5 to 27
Expect(s.Push(bytes.Repeat([]byte{'d'}, 22), 5, false)).To(Succeed())
Expect(s.Push(bytes.Repeat([]byte{'d'}, 22), 5)).To(Succeed())
Expect(s.queue).To(HaveKey(protocol.ByteCount(5)))
Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(15)))
Expect(s.queue).To(HaveKey(protocol.ByteCount(25)))
@ -313,7 +286,7 @@ var _ = Describe("STREAM frame sorter", func() {
It("processes a frame that covers multiple gaps and ends at the end of a gap", func() {
data := bytes.Repeat([]byte{'e'}, 14)
// 1 to 15
Expect(s.Push(data, 1, false)).To(Succeed())
Expect(s.Push(data, 1)).To(Succeed())
Expect(s.queue).To(HaveKey(protocol.ByteCount(1)))
Expect(s.queue).To(HaveKey(protocol.ByteCount(15)))
Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(5)))
@ -328,7 +301,7 @@ var _ = Describe("STREAM frame sorter", func() {
It("processes a frame that closes all gaps (except for the last one)", func() {
data := bytes.Repeat([]byte{'f'}, 32)
// 0 to 32
Expect(s.Push(data, 0, false)).To(Succeed())
Expect(s.Push(data, 0)).To(Succeed())
Expect(s.queue).To(HaveLen(1))
Expect(s.queue).To(HaveKey(protocol.ByteCount(0)))
Expect(s.queue[0]).To(Equal(data))
@ -339,7 +312,7 @@ var _ = Describe("STREAM frame sorter", func() {
It("cuts a frame that overlaps at the beginning and at the end, starting in data already received", func() {
// 8 to 17
Expect(s.Push([]byte("123456789"), 8, false)).To(Succeed())
Expect(s.Push([]byte("123456789"), 8)).To(Succeed())
Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(8)))
Expect(s.queue).To(HaveKey(protocol.ByteCount(10)))
Expect(s.queue[10]).To(Equal([]byte("34567")))
@ -353,7 +326,7 @@ var _ = Describe("STREAM frame sorter", func() {
It("cuts a frame that completely covers two gaps", func() {
// 10 to 20
Expect(s.Push([]byte("1234567890"), 10, false)).To(Succeed())
Expect(s.Push([]byte("1234567890"), 10)).To(Succeed())
Expect(s.queue).To(HaveKey(protocol.ByteCount(10)))
Expect(s.queue[10]).To(Equal([]byte("12345")))
Expect(s.queue[10]).To(HaveCap(5))
@ -373,8 +346,8 @@ var _ = Describe("STREAM frame sorter", func() {
BeforeEach(func() {
// create gaps: 5-10, 15-inf
Expect(s.Push([]byte("12345"), 0, false)).To(Succeed())
Expect(s.Push([]byte("12345"), 10, false)).To(Succeed())
Expect(s.Push([]byte("12345"), 0)).To(Succeed())
Expect(s.Push([]byte("12345"), 10)).To(Succeed())
checkGaps(expectedGaps)
})
@ -384,21 +357,21 @@ var _ = Describe("STREAM frame sorter", func() {
})
It("does not modify data when receiving a duplicate", func() {
err := s.push([]byte("fffff"), 0, false)
err := s.push([]byte("fffff"), 0)
Expect(err).To(MatchError(errDuplicateStreamData))
Expect(s.queue[0]).ToNot(Equal([]byte("fffff")))
})
It("detects a duplicate frame that is smaller than the original, starting at the beginning", func() {
// 10 to 12
err := s.push([]byte("12"), 10, false)
err := s.push([]byte("12"), 10)
Expect(err).To(MatchError(errDuplicateStreamData))
Expect(s.queue[10]).To(HaveLen(5))
})
It("detects a duplicate frame that is smaller than the original, somewhere in the middle", func() {
// 1 to 4
err := s.push([]byte("123"), 1, false)
err := s.push([]byte("123"), 1)
Expect(err).To(MatchError(errDuplicateStreamData))
Expect(s.queue[0]).To(HaveLen(5))
Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(1)))
@ -406,7 +379,7 @@ var _ = Describe("STREAM frame sorter", func() {
It("detects a duplicate frame that is smaller than the original, somewhere in the middle in the last block", func() {
// 11 to 14
err := s.push([]byte("123"), 11, false)
err := s.push([]byte("123"), 11)
Expect(err).To(MatchError(errDuplicateStreamData))
Expect(s.queue[10]).To(HaveLen(5))
Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(11)))
@ -414,7 +387,7 @@ var _ = Describe("STREAM frame sorter", func() {
It("detects a duplicate frame that is smaller than the original, with aligned end in the last block", func() {
// 11 to 15
err := s.push([]byte("1234"), 1, false)
err := s.push([]byte("1234"), 1)
Expect(err).To(MatchError(errDuplicateStreamData))
Expect(s.queue[10]).To(HaveLen(5))
Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(11)))
@ -422,7 +395,7 @@ var _ = Describe("STREAM frame sorter", func() {
It("detects a duplicate frame that is smaller than the original, with aligned end", func() {
// 3 to 5
err := s.push([]byte("12"), 3, false)
err := s.push([]byte("12"), 3)
Expect(err).To(MatchError(errDuplicateStreamData))
Expect(s.queue[0]).To(HaveLen(5))
Expect(s.queue).ToNot(HaveKey(protocol.ByteCount(3)))
@ -432,10 +405,10 @@ var _ = Describe("STREAM frame sorter", func() {
Context("DoS protection", func() {
It("errors when too many gaps are created", func() {
for i := 0; i < protocol.MaxStreamFrameSorterGaps; i++ {
Expect(s.Push([]byte("foobar"), protocol.ByteCount(i*7), false)).To(Succeed())
Expect(s.Push([]byte("foobar"), protocol.ByteCount(i*7))).To(Succeed())
}
Expect(s.gaps.Len()).To(Equal(protocol.MaxStreamFrameSorterGaps))
err := s.Push([]byte("foobar"), protocol.ByteCount(protocol.MaxStreamFrameSorterGaps*7)+100, false)
err := s.Push([]byte("foobar"), protocol.ByteCount(protocol.MaxStreamFrameSorterGaps*7)+100)
Expect(err).To(MatchError("Too many gaps in received data"))
})
})

View file

@ -28,8 +28,9 @@ type receiveStream struct {
sender streamSender
frameQueue *frameSorter
readOffset protocol.ByteCount
frameQueue *frameSorter
readOffset protocol.ByteCount
finalOffset protocol.ByteCount
currentFrame []byte
currentFrameIsLast bool // is the currentFrame the last frame on this stream
@ -66,6 +67,7 @@ func newReceiveStream(
flowController: flowController,
frameQueue: newFrameSorter(),
readChan: make(chan struct{}, 1),
finalOffset: protocol.MaxByteCount,
version: version,
}
}
@ -182,7 +184,9 @@ func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, err
}
func (s *receiveStream) dequeueNextFrame() {
s.currentFrame, s.currentFrameIsLast = s.frameQueue.Pop()
var offset protocol.ByteCount
offset, s.currentFrame = s.frameQueue.Pop()
s.currentFrameIsLast = offset+protocol.ByteCount(len(s.currentFrame)) >= s.finalOffset
s.readPosInFrame = 0
}
@ -211,9 +215,12 @@ func (s *receiveStream) handleStreamFrame(frame *wire.StreamFrame) error {
s.mutex.Lock()
defer s.mutex.Unlock()
if err := s.frameQueue.Push(frame.Data, frame.Offset, frame.FinBit); err != nil {
if err := s.frameQueue.Push(frame.Data, frame.Offset); err != nil {
return err
}
if frame.FinBit {
s.finalOffset = maxOffset
}
s.signalRead()
return nil
}
@ -236,6 +243,7 @@ func (s *receiveStream) handleResetStreamFrameImpl(frame *wire.ResetStreamFrame)
if err := s.flowController.UpdateHighestReceived(frame.ByteOffset, true); err != nil {
return false, err
}
s.finalOffset = frame.ByteOffset
// ignore duplicate RESET_STREAM frames for this stream (after checking their final offset)
if s.resetRemotely {