mirror of
https://github.com/refraction-networking/uquic.git
synced 2025-04-04 20:57:36 +03:00
rewrite the frame sorter
This commit is contained in:
parent
fa69438124
commit
39fe927ef5
2 changed files with 1598 additions and 589 deletions
206
frame_sorter.go
206
frame_sorter.go
|
@ -45,108 +45,105 @@ func (s *frameSorter) push(data []byte, offset protocol.ByteCount, doneCb func()
|
|||
return errDuplicateStreamData
|
||||
}
|
||||
|
||||
if oldEntry, ok := s.queue[offset]; ok {
|
||||
if len(data) <= len(oldEntry.Data) {
|
||||
return errDuplicateStreamData
|
||||
}
|
||||
// The data we currently have is shorter than the new data.
|
||||
// Replace it.
|
||||
if oldEntry.DoneCb != nil {
|
||||
oldEntry.DoneCb()
|
||||
}
|
||||
s.queue[offset] = frameSorterEntry{Data: data, DoneCb: doneCb}
|
||||
}
|
||||
|
||||
start := offset
|
||||
end := offset + protocol.ByteCount(len(data))
|
||||
|
||||
// skip all gaps that are before this stream frame
|
||||
var gap *utils.ByteIntervalElement
|
||||
for gap = s.gaps.Front(); gap != nil; gap = gap.Next() {
|
||||
// the frame is a duplicate. Ignore it
|
||||
if end <= gap.Value.Start {
|
||||
if end <= s.gaps.Front().Value.Start {
|
||||
return errDuplicateStreamData
|
||||
}
|
||||
if end > gap.Value.Start && start <= gap.Value.End {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if gap == nil {
|
||||
return errors.New("StreamFrameSorter BUG: no gap found")
|
||||
|
||||
startGap, startsInGap := s.findStartGap(start)
|
||||
endGap, endsInGap := s.findEndGap(startGap, end)
|
||||
|
||||
startGapEqualsEndGap := startGap == endGap
|
||||
|
||||
if (startGapEqualsEndGap && end <= startGap.Value.Start) ||
|
||||
(!startGapEqualsEndGap && startGap.Value.End >= endGap.Value.Start && end <= startGap.Value.Start) {
|
||||
return errDuplicateStreamData
|
||||
}
|
||||
|
||||
startGapNext := startGap.Next()
|
||||
startGapEnd := startGap.Value.End // save it, in case startGap is modified
|
||||
endGapStart := endGap.Value.Start // save it, in case endGap is modified
|
||||
endGapEnd := endGap.Value.End // save it, in case endGap is modified
|
||||
var adjustedStartGapEnd bool
|
||||
var wasCut bool
|
||||
if start < gap.Value.Start {
|
||||
add := gap.Value.Start - start
|
||||
offset += add
|
||||
start += add
|
||||
data = data[add:]
|
||||
wasCut = true
|
||||
}
|
||||
|
||||
// find the highest gaps whose Start lies before the end of the frame
|
||||
endGap := gap
|
||||
for end >= endGap.Value.End {
|
||||
nextEndGap := endGap.Next()
|
||||
if nextEndGap == nil {
|
||||
return errors.New("StreamFrameSorter BUG: no end gap found")
|
||||
}
|
||||
if endGap != gap {
|
||||
s.gaps.Remove(endGap)
|
||||
}
|
||||
if end < nextEndGap.Value.Start {
|
||||
pos := start
|
||||
var hasReplacedAtLeastOne bool
|
||||
for {
|
||||
oldEntry, ok := s.queue[pos]
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
// delete queued frames completely covered by the current frame
|
||||
end := endGap.Value.End
|
||||
if end != offset {
|
||||
if cb := s.queue[end].DoneCb; cb != nil {
|
||||
cb()
|
||||
oldEntryLen := protocol.ByteCount(len(oldEntry.Data))
|
||||
if end-pos > oldEntryLen || (hasReplacedAtLeastOne && end-pos == oldEntryLen) {
|
||||
// The existing frame is shorter than the new frame. Replace it.
|
||||
delete(s.queue, pos)
|
||||
pos += oldEntryLen
|
||||
hasReplacedAtLeastOne = true
|
||||
if oldEntry.DoneCb != nil {
|
||||
oldEntry.DoneCb()
|
||||
}
|
||||
delete(s.queue, end)
|
||||
} else {
|
||||
if !hasReplacedAtLeastOne {
|
||||
return errDuplicateStreamData
|
||||
}
|
||||
// The existing frame is longer than the new frame.
|
||||
// Cut the new frame such that the end aligns with the start of the existing frame.
|
||||
data = data[:pos-start]
|
||||
end = pos
|
||||
wasCut = true
|
||||
break
|
||||
}
|
||||
endGap = nextEndGap
|
||||
}
|
||||
|
||||
if end > endGap.Value.End {
|
||||
cutLen := end - endGap.Value.End
|
||||
len := protocol.ByteCount(len(data)) - cutLen
|
||||
end -= cutLen
|
||||
data = data[:len]
|
||||
if !startsInGap && !hasReplacedAtLeastOne {
|
||||
// cut the frame, such that it starts at the start of the gap
|
||||
data = data[startGap.Value.Start-start:]
|
||||
start = startGap.Value.Start
|
||||
wasCut = true
|
||||
}
|
||||
if start <= startGap.Value.Start {
|
||||
if end >= startGap.Value.End {
|
||||
// The frame covers the whole startGap. Delete the gap.
|
||||
s.gaps.Remove(startGap)
|
||||
} else {
|
||||
startGap.Value.Start = end
|
||||
}
|
||||
} else if !hasReplacedAtLeastOne {
|
||||
startGap.Value.End = start
|
||||
adjustedStartGapEnd = true
|
||||
}
|
||||
|
||||
if start == gap.Value.Start {
|
||||
if end >= gap.Value.End {
|
||||
// the frame completely fills this gap
|
||||
// delete the gap
|
||||
if !startGapEqualsEndGap {
|
||||
s.deleteConsecutive(startGapEnd)
|
||||
var nextGap *utils.ByteIntervalElement
|
||||
for gap := startGapNext; gap.Value.End < endGapStart; gap = nextGap {
|
||||
nextGap = gap.Next()
|
||||
s.deleteConsecutive(gap.Value.End)
|
||||
s.gaps.Remove(gap)
|
||||
}
|
||||
if end < endGap.Value.End {
|
||||
// the frame covers the beginning of the gap
|
||||
// adjust the Start value to shrink the gap
|
||||
endGap.Value.Start = end
|
||||
}
|
||||
} else if end == endGap.Value.End {
|
||||
// the frame covers the end of the gap
|
||||
// adjust the End value to shrink the gap
|
||||
gap.Value.End = start
|
||||
} else {
|
||||
if gap == endGap {
|
||||
// the frame lies within the current gap, splitting it into two
|
||||
// insert a new gap and adjust the current one
|
||||
intv := utils.ByteInterval{Start: end, End: gap.Value.End}
|
||||
s.gaps.InsertAfter(intv, gap)
|
||||
gap.Value.End = start
|
||||
} else {
|
||||
gap.Value.End = start
|
||||
endGap.Value.Start = end
|
||||
}
|
||||
}
|
||||
|
||||
if s.gaps.Len() > protocol.MaxStreamFrameSorterGaps {
|
||||
return errors.New("too many gaps in received data")
|
||||
if !endsInGap && start != endGapEnd && end > endGapEnd {
|
||||
// cut the frame, such that it ends at the end of the gap
|
||||
data = data[:endGapEnd-start]
|
||||
end = endGapEnd
|
||||
wasCut = true
|
||||
}
|
||||
if end == endGapEnd {
|
||||
if !startGapEqualsEndGap {
|
||||
// The frame covers the whole endGap. Delete the gap.
|
||||
s.gaps.Remove(endGap)
|
||||
}
|
||||
} else {
|
||||
if startGapEqualsEndGap && adjustedStartGapEnd {
|
||||
// The frame split the existing gap into two.
|
||||
s.gaps.InsertAfter(utils.ByteInterval{Start: end, End: startGapEnd}, startGap)
|
||||
} else if !startGapEqualsEndGap {
|
||||
endGap.Value.Start = end
|
||||
}
|
||||
}
|
||||
|
||||
if wasCut && len(data) < protocol.MinStreamFrameBufferSize {
|
||||
|
@ -159,10 +156,54 @@ func (s *frameSorter) push(data []byte, offset protocol.ByteCount, doneCb func()
|
|||
}
|
||||
}
|
||||
|
||||
s.queue[offset] = frameSorterEntry{Data: data, DoneCb: doneCb}
|
||||
if s.gaps.Len() > protocol.MaxStreamFrameSorterGaps {
|
||||
return errors.New("too many gaps in received data")
|
||||
}
|
||||
|
||||
s.queue[start] = frameSorterEntry{Data: data, DoneCb: doneCb}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *frameSorter) findStartGap(offset protocol.ByteCount) (*utils.ByteIntervalElement, bool) {
|
||||
for gap := s.gaps.Front(); gap != nil; gap = gap.Next() {
|
||||
if offset >= gap.Value.Start && offset <= gap.Value.End {
|
||||
return gap, true
|
||||
}
|
||||
if offset < gap.Value.Start {
|
||||
return gap, false
|
||||
}
|
||||
}
|
||||
panic("no gap found")
|
||||
}
|
||||
|
||||
func (s *frameSorter) findEndGap(startGap *utils.ByteIntervalElement, offset protocol.ByteCount) (*utils.ByteIntervalElement, bool) {
|
||||
for gap := startGap; gap != nil; gap = gap.Next() {
|
||||
if offset >= gap.Value.Start && offset < gap.Value.End {
|
||||
return gap, true
|
||||
}
|
||||
if offset < gap.Value.Start {
|
||||
return gap.Prev(), false
|
||||
}
|
||||
}
|
||||
panic("no gap found")
|
||||
}
|
||||
|
||||
// deleteConsecutive deletes consecutive frames from the queue, starting at pos
|
||||
func (s *frameSorter) deleteConsecutive(pos protocol.ByteCount) {
|
||||
for {
|
||||
oldEntry, ok := s.queue[pos]
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
oldEntryLen := protocol.ByteCount(len(oldEntry.Data))
|
||||
delete(s.queue, pos)
|
||||
if oldEntry.DoneCb != nil {
|
||||
oldEntry.DoneCb()
|
||||
}
|
||||
pos += oldEntryLen
|
||||
}
|
||||
}
|
||||
|
||||
func (s *frameSorter) Pop() (protocol.ByteCount, []byte, func()) {
|
||||
entry, ok := s.queue[s.readPos]
|
||||
if !ok {
|
||||
|
@ -171,6 +212,9 @@ func (s *frameSorter) Pop() (protocol.ByteCount, []byte, func()) {
|
|||
delete(s.queue, s.readPos)
|
||||
offset := s.readPos
|
||||
s.readPos += protocol.ByteCount(len(entry.Data))
|
||||
if s.gaps.Front().Value.End <= s.readPos {
|
||||
panic("frame sorter BUG: read position higher than a gap")
|
||||
}
|
||||
return offset, entry.Data, entry.DoneCb
|
||||
}
|
||||
|
||||
|
|
1877
frame_sorter_test.go
1877
frame_sorter_test.go
File diff suppressed because it is too large
Load diff
Loading…
Add table
Add a link
Reference in a new issue