uquic/framer.go
Marten Seemann c0286b0c2e fix calculation of the length of appended STREAM frames
For the last STREAM frame we omit the Length field. When packing STREAM
frames, we need to account for this byte saving when calculating the
length of the payload.
2019-08-10 14:55:27 +07:00

124 lines
3.7 KiB
Go

package quic
import (
"sync"
"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/utils"
"github.com/lucas-clemente/quic-go/internal/wire"
)
type framer interface {
QueueControlFrame(wire.Frame)
AppendControlFrames([]wire.Frame, protocol.ByteCount) ([]wire.Frame, protocol.ByteCount)
AddActiveStream(protocol.StreamID)
AppendStreamFrames([]wire.Frame, protocol.ByteCount) ([]wire.Frame, protocol.ByteCount)
}
type framerI struct {
mutex sync.Mutex
streamGetter streamGetter
version protocol.VersionNumber
activeStreams map[protocol.StreamID]struct{}
streamQueue []protocol.StreamID
controlFrameMutex sync.Mutex
controlFrames []wire.Frame
}
var _ framer = &framerI{}
func newFramer(
streamGetter streamGetter,
v protocol.VersionNumber,
) framer {
return &framerI{
streamGetter: streamGetter,
activeStreams: make(map[protocol.StreamID]struct{}),
version: v,
}
}
func (f *framerI) QueueControlFrame(frame wire.Frame) {
f.controlFrameMutex.Lock()
f.controlFrames = append(f.controlFrames, frame)
f.controlFrameMutex.Unlock()
}
func (f *framerI) AppendControlFrames(frames []wire.Frame, maxLen protocol.ByteCount) ([]wire.Frame, protocol.ByteCount) {
var length protocol.ByteCount
f.controlFrameMutex.Lock()
for len(f.controlFrames) > 0 {
frame := f.controlFrames[len(f.controlFrames)-1]
frameLen := frame.Length(f.version)
if length+frameLen > maxLen {
break
}
frames = append(frames, frame)
length += frameLen
f.controlFrames = f.controlFrames[:len(f.controlFrames)-1]
}
f.controlFrameMutex.Unlock()
return frames, length
}
func (f *framerI) AddActiveStream(id protocol.StreamID) {
f.mutex.Lock()
if _, ok := f.activeStreams[id]; !ok {
f.streamQueue = append(f.streamQueue, id)
f.activeStreams[id] = struct{}{}
}
f.mutex.Unlock()
}
func (f *framerI) AppendStreamFrames(frames []wire.Frame, maxLen protocol.ByteCount) ([]wire.Frame, protocol.ByteCount) {
var length protocol.ByteCount
var frameAdded bool
f.mutex.Lock()
// pop STREAM frames, until less than MinStreamFrameSize bytes are left in the packet
numActiveStreams := len(f.streamQueue)
for i := 0; i < numActiveStreams; i++ {
if protocol.MinStreamFrameSize+length > maxLen {
break
}
id := f.streamQueue[0]
f.streamQueue = f.streamQueue[1:]
// This should never return an error. Better check it anyway.
// The stream will only be in the streamQueue, if it enqueued itself there.
str, err := f.streamGetter.GetOrOpenSendStream(id)
// The stream can be nil if it completed after it said it had data.
if str == nil || err != nil {
delete(f.activeStreams, id)
continue
}
remainingLen := maxLen - length
// For the last STREAM frame, we'll remove the DataLen field later.
// Therefore, we can pretend to have more bytes avaibalbe when popping
// the STREAM frame (which will always have the DataLen set).
remainingLen += utils.VarIntLen(uint64(remainingLen))
frame, hasMoreData := str.popStreamFrame(remainingLen)
if hasMoreData { // put the stream back in the queue (at the end)
f.streamQueue = append(f.streamQueue, id)
} else { // no more data to send. Stream is not active any more
delete(f.activeStreams, id)
}
if frame == nil { // can happen if the receiveStream was canceled after it said it had data
continue
}
frames = append(frames, frame)
length += frame.Length(f.version)
frameAdded = true
}
f.mutex.Unlock()
if frameAdded {
lastFrame := frames[len(frames)-1].(*wire.StreamFrame)
lastFrameLen := lastFrame.Length(f.version)
// acount for the smaller size of the last STREAM frame
lastFrame.DataLenPresent = false
length += lastFrame.Length(f.version) - lastFrameLen
}
return frames, length
}