mirror of
https://github.com/refraction-networking/uquic.git
synced 2025-04-03 20:27:35 +03:00
change streamFramer to pop as many stream frames as possible at once
ref #217
This commit is contained in:
parent
6f657c02e4
commit
7adf760fa4
4 changed files with 164 additions and 259 deletions
|
@ -89,11 +89,9 @@ func (f *streamFramer) EstimatedDataLen() protocol.ByteCount {
|
|||
return l
|
||||
}
|
||||
|
||||
func (f *streamFramer) PopStreamFrame(maxLen protocol.ByteCount) (*frames.StreamFrame, error) {
|
||||
if frame := f.maybePopFrameForRetransmission(maxLen); frame != nil {
|
||||
return frame, nil
|
||||
}
|
||||
return f.maybePopNormalFrame(maxLen)
|
||||
func (f *streamFramer) PopStreamFrames(maxLen protocol.ByteCount) []*frames.StreamFrame {
|
||||
fs, currentLen := f.maybePopFramesForRetransmission(maxLen)
|
||||
return append(fs, f.maybePopNormalFrames(maxLen-currentLen)...)
|
||||
}
|
||||
|
||||
func (f *streamFramer) PopBlockedFrame() *frames.BlockedFrame {
|
||||
|
@ -105,32 +103,39 @@ func (f *streamFramer) PopBlockedFrame() *frames.BlockedFrame {
|
|||
return frame
|
||||
}
|
||||
|
||||
func (f *streamFramer) maybePopFrameForRetransmission(maxLen protocol.ByteCount) *frames.StreamFrame {
|
||||
if len(f.retransmissionQueue) == 0 {
|
||||
return nil
|
||||
func (f *streamFramer) maybePopFramesForRetransmission(maxLen protocol.ByteCount) (res []*frames.StreamFrame, currentLen protocol.ByteCount) {
|
||||
for len(f.retransmissionQueue) > 0 {
|
||||
frame := f.retransmissionQueue[0]
|
||||
frame.DataLenPresent = true
|
||||
|
||||
frameHeaderLen, _ := frame.MinLength(protocol.VersionWhatever) // can never error
|
||||
if currentLen+frameHeaderLen > maxLen {
|
||||
break
|
||||
}
|
||||
|
||||
currentLen += frameHeaderLen
|
||||
|
||||
splitFrame := maybeSplitOffFrame(frame, maxLen-currentLen)
|
||||
if splitFrame != nil { // StreamFrame was split
|
||||
res = append(res, splitFrame)
|
||||
currentLen += splitFrame.DataLen()
|
||||
break
|
||||
}
|
||||
|
||||
f.retransmissionQueue = f.retransmissionQueue[1:]
|
||||
res = append(res, frame)
|
||||
currentLen += frame.DataLen()
|
||||
}
|
||||
|
||||
frame := f.retransmissionQueue[0]
|
||||
frame.DataLenPresent = true
|
||||
|
||||
frameHeaderLen, _ := frame.MinLength(protocol.VersionWhatever) // can never error
|
||||
if maxLen < frameHeaderLen {
|
||||
return nil
|
||||
}
|
||||
|
||||
splitFrame := maybeSplitOffFrame(frame, maxLen-frameHeaderLen)
|
||||
if splitFrame != nil { // StreamFrame was split
|
||||
return splitFrame
|
||||
}
|
||||
|
||||
f.retransmissionQueue = f.retransmissionQueue[1:]
|
||||
return frame
|
||||
return
|
||||
}
|
||||
|
||||
func (f *streamFramer) maybePopNormalFrame(maxBytes protocol.ByteCount) (*frames.StreamFrame, error) {
|
||||
frame := &frames.StreamFrame{DataLenPresent: true}
|
||||
func (f *streamFramer) maybePopNormalFrames(maxBytes protocol.ByteCount) (res []*frames.StreamFrame) {
|
||||
f.streamsMutex.RLock()
|
||||
defer f.streamsMutex.RUnlock()
|
||||
|
||||
frame := &frames.StreamFrame{DataLenPresent: true}
|
||||
var currentLen protocol.ByteCount
|
||||
|
||||
for _, s := range *f.streams {
|
||||
if s == nil {
|
||||
continue
|
||||
|
@ -140,16 +145,13 @@ func (f *streamFramer) maybePopNormalFrame(maxBytes protocol.ByteCount) (*frames
|
|||
// not perfect, but thread-safe since writeOffset is only written when getting data
|
||||
frame.Offset = s.writeOffset
|
||||
frameHeaderBytes, _ := frame.MinLength(protocol.VersionWhatever) // can never error
|
||||
if maxBytes < frameHeaderBytes {
|
||||
continue
|
||||
if currentLen+frameHeaderBytes > maxBytes {
|
||||
return // theoretically, we could find another stream that fits, but this is quite unlikely, so we stop here
|
||||
}
|
||||
maxLen := maxBytes - frameHeaderBytes
|
||||
maxLen := maxBytes - currentLen - frameHeaderBytes
|
||||
|
||||
if s.lenOfDataForWriting() != 0 {
|
||||
fcAllowance, err := f.getFCAllowanceForStream(s)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fcAllowance, _ := f.getFCAllowanceForStream(s) // can never error
|
||||
maxLen = utils.MinByteCount(maxLen, fcAllowance)
|
||||
}
|
||||
|
||||
|
@ -162,15 +164,15 @@ func (f *streamFramer) maybePopNormalFrame(maxBytes protocol.ByteCount) (*frames
|
|||
if s.shouldSendFin() {
|
||||
frame.FinBit = true
|
||||
s.sentFin()
|
||||
return frame, nil
|
||||
res = append(res, frame)
|
||||
currentLen += frameHeaderBytes + frame.DataLen()
|
||||
frame = &frames.StreamFrame{DataLenPresent: true}
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
frame.Data = data
|
||||
if err := f.flowControlManager.AddBytesSent(s.streamID, protocol.ByteCount(len(data))); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
f.flowControlManager.AddBytesSent(s.streamID, protocol.ByteCount(len(data)))
|
||||
|
||||
// Finally, check if we are now FC blocked and should queue a BLOCKED frame
|
||||
individualFcOffset, _ := f.flowControlManager.SendWindowSize(s.streamID) // can never error
|
||||
|
@ -183,9 +185,11 @@ func (f *streamFramer) maybePopNormalFrame(maxBytes protocol.ByteCount) (*frames
|
|||
f.blockedFrameQueue = append(f.blockedFrameQueue, &frames.BlockedFrame{StreamID: 0})
|
||||
}
|
||||
|
||||
return frame, nil
|
||||
res = append(res, frame)
|
||||
currentLen += frameHeaderBytes + frame.DataLen()
|
||||
frame = &frames.StreamFrame{DataLenPresent: true}
|
||||
}
|
||||
return nil, nil
|
||||
return
|
||||
}
|
||||
|
||||
func (f *streamFramer) getFCAllowanceForStream(s *stream) (protocol.ByteCount, error) {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue