send retransmission as separate packets

This commit is contained in:
Marten Seemann 2018-02-26 21:34:38 +08:00
parent 33c2ff59e5
commit ca7291e8cf
6 changed files with 293 additions and 226 deletions

View file

@ -99,11 +99,97 @@ func (p *packetPacker) PackAckPacket() (*packedPacket, error) {
}, err
}
// PackHandshakeRetransmission retransmits a handshake packet, that was sent with less than forward-secure encryption
func (p *packetPacker) PackHandshakeRetransmission(packet *ackhandler.Packet) (*packedPacket, error) {
if packet.EncryptionLevel == protocol.EncryptionForwardSecure {
return nil, errors.New("PacketPacker BUG: forward-secure encrypted handshake packets don't need special treatment")
// PackRetransmission packs a retransmission
// For packets sent after completion of the handshake, it might happen that 2 packets have to be sent.
// This can happen e.g. when a longer packet number is used in the header.
func (p *packetPacker) PackRetransmission(packet *ackhandler.Packet) ([]*packedPacket, error) {
if packet.EncryptionLevel != protocol.EncryptionForwardSecure {
p, err := p.packHandshakeRetransmission(packet)
return []*packedPacket{p}, err
}
var controlFrames []wire.Frame
var streamFrames []*wire.StreamFrame
for _, f := range packet.Frames {
if sf, ok := f.(*wire.StreamFrame); ok {
sf.DataLenPresent = true
streamFrames = append(streamFrames, sf)
} else {
controlFrames = append(controlFrames, f)
}
}
var packets []*packedPacket
encLevel, sealer := p.cryptoSetup.GetSealer()
for len(controlFrames) > 0 || len(streamFrames) > 0 {
var frames []wire.Frame
var payloadLength protocol.ByteCount
header := p.getHeader(encLevel)
headerLength, err := header.GetLength(p.perspective, p.version)
if err != nil {
return nil, err
}
maxSize := protocol.MaxPacketSize - protocol.ByteCount(sealer.Overhead()) - headerLength
// for gQUIC: add a STOP_WAITING for *every* retransmission
if !p.version.UsesIETFFrameFormat() {
if p.stopWaiting == nil {
return nil, errors.New("PacketPacker BUG: Handshake retransmissions must contain a STOP_WAITING frame")
}
// create a new StopWaitingFrame, since we might need to send more than one packet as a retransmission
swf := &wire.StopWaitingFrame{
LeastUnacked: p.stopWaiting.LeastUnacked,
PacketNumber: header.PacketNumber,
PacketNumberLen: header.PacketNumberLen,
}
payloadLength += swf.Length(p.version)
frames = append(frames, swf)
}
for len(controlFrames) > 0 {
frame := controlFrames[0]
length := frame.Length(p.version)
if payloadLength+length > maxSize {
break
}
payloadLength += length
frames = append(frames, frame)
controlFrames = controlFrames[1:]
}
for len(streamFrames) > 0 && payloadLength+protocol.MinStreamFrameSize < maxSize {
// TODO: optimize by setting DataLenPresent = false on all but the last STREAM frame
frame := streamFrames[0]
frameToAdd := frame
sf, err := frame.MaybeSplitOffFrame(maxSize-payloadLength, p.version)
if err != nil {
return nil, err
}
if sf != nil {
frameToAdd = sf
} else {
streamFrames = streamFrames[1:]
}
payloadLength += frameToAdd.Length(p.version)
frames = append(frames, frameToAdd)
}
raw, err := p.writeAndSealPacket(header, frames, sealer)
if err != nil {
return nil, err
}
packets = append(packets, &packedPacket{
header: header,
raw: raw,
frames: frames,
encryptionLevel: encLevel,
})
}
p.stopWaiting = nil
return packets, nil
}
// packHandshakeRetransmission retransmits a handshake packet, that was sent with less than forward-secure encryption
func (p *packetPacker) packHandshakeRetransmission(packet *ackhandler.Packet) (*packedPacket, error) {
sealer, err := p.cryptoSetup.GetSealerWithEncryptionLevel(packet.EncryptionLevel)
if err != nil {
return nil, err

View file

@ -619,10 +619,11 @@ var _ = Describe("Packet packer", func() {
EncryptionLevel: protocol.EncryptionUnencrypted,
Frames: []wire.Frame{sf},
}
p, err := packer.PackHandshakeRetransmission(packet)
p, err := packer.PackRetransmission(packet)
Expect(err).ToNot(HaveOccurred())
Expect(p.frames).To(Equal([]wire.Frame{swf, sf}))
Expect(p.encryptionLevel).To(Equal(protocol.EncryptionUnencrypted))
Expect(p).To(HaveLen(1))
Expect(p[0].frames).To(Equal([]wire.Frame{swf, sf}))
Expect(p[0].encryptionLevel).To(Equal(protocol.EncryptionUnencrypted))
})
It("doesn't add a STOP_WAITING frame for IETF QUIC", func() {
@ -631,10 +632,11 @@ var _ = Describe("Packet packer", func() {
EncryptionLevel: protocol.EncryptionUnencrypted,
Frames: []wire.Frame{sf},
}
p, err := packer.PackHandshakeRetransmission(packet)
p, err := packer.PackRetransmission(packet)
Expect(err).ToNot(HaveOccurred())
Expect(p.frames).To(Equal([]wire.Frame{sf}))
Expect(p.encryptionLevel).To(Equal(protocol.EncryptionUnencrypted))
Expect(p).To(HaveLen(1))
Expect(p[0].frames).To(Equal([]wire.Frame{sf}))
Expect(p[0].encryptionLevel).To(Equal(protocol.EncryptionUnencrypted))
})
It("packs a retransmission for a packet sent with initial encryption", func() {
@ -644,13 +646,14 @@ var _ = Describe("Packet packer", func() {
EncryptionLevel: protocol.EncryptionSecure,
Frames: []wire.Frame{sf},
}
p, err := packer.PackHandshakeRetransmission(packet)
p, err := packer.PackRetransmission(packet)
Expect(err).ToNot(HaveOccurred())
Expect(p.frames).To(Equal([]wire.Frame{swf, sf}))
Expect(p.encryptionLevel).To(Equal(protocol.EncryptionSecure))
Expect(p).To(HaveLen(1))
Expect(p[0].frames).To(Equal([]wire.Frame{swf, sf}))
Expect(p[0].encryptionLevel).To(Equal(protocol.EncryptionSecure))
// a packet sent by the server with initial encryption contains the SHLO
// it needs to have a diversification nonce
Expect(p.raw).To(ContainSubstring(string(nonce)))
Expect(p[0].raw).To(ContainSubstring(string(nonce)))
})
It("includes the diversification nonce on packets sent with initial encryption", func() {
@ -658,9 +661,10 @@ var _ = Describe("Packet packer", func() {
EncryptionLevel: protocol.EncryptionSecure,
Frames: []wire.Frame{sf},
}
p, err := packer.PackHandshakeRetransmission(packet)
p, err := packer.PackRetransmission(packet)
Expect(err).ToNot(HaveOccurred())
Expect(p.encryptionLevel).To(Equal(protocol.EncryptionSecure))
Expect(p).To(HaveLen(1))
Expect(p[0].encryptionLevel).To(Equal(protocol.EncryptionSecure))
})
// this should never happen, since non forward-secure packets are limited to a size smaller than MaxPacketSize, such that it is always possible to retransmit them without splitting the StreamFrame
@ -675,7 +679,7 @@ var _ = Describe("Packet packer", func() {
},
},
}
_, err := packer.PackHandshakeRetransmission(packet)
_, err := packer.PackRetransmission(packet)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("PacketPacker BUG: packet too large"))
})
@ -708,30 +712,140 @@ var _ = Describe("Packet packer", func() {
EncryptionLevel: protocol.EncryptionUnencrypted,
Frames: []wire.Frame{sf},
}
p, err := packer.PackHandshakeRetransmission(packet)
p, err := packer.PackRetransmission(packet)
Expect(err).ToNot(HaveOccurred())
Expect(p.frames).To(Equal([]wire.Frame{sf}))
Expect(p.encryptionLevel).To(Equal(protocol.EncryptionUnencrypted))
Expect(p.header.Type).To(Equal(protocol.PacketTypeInitial))
})
It("refuses to retransmit packets that were sent with forward-secure encryption", func() {
p := &ackhandler.Packet{
EncryptionLevel: protocol.EncryptionForwardSecure,
}
_, err := packer.PackHandshakeRetransmission(p)
Expect(err).To(MatchError("PacketPacker BUG: forward-secure encrypted handshake packets don't need special treatment"))
Expect(p).To(HaveLen(1))
Expect(p[0].frames).To(Equal([]wire.Frame{sf}))
Expect(p[0].encryptionLevel).To(Equal(protocol.EncryptionUnencrypted))
Expect(p[0].header.Type).To(Equal(protocol.PacketTypeInitial))
})
It("refuses to retransmit packets without a STOP_WAITING Frame", func() {
packer.stopWaiting = nil
_, err := packer.PackHandshakeRetransmission(&ackhandler.Packet{
_, err := packer.PackRetransmission(&ackhandler.Packet{
EncryptionLevel: protocol.EncryptionSecure,
})
Expect(err).To(MatchError("PacketPacker BUG: Handshake retransmissions must contain a STOP_WAITING frame"))
})
})
Context("retransmission of forward-secure packets", func() {
BeforeEach(func() {
packer.packetNumberGenerator.next = 15
packer.stopWaiting = &wire.StopWaitingFrame{LeastUnacked: 7}
})
It("retransmits a small packet", func() {
frames := []wire.Frame{
&wire.MaxDataFrame{ByteOffset: 0x1234},
&wire.StreamFrame{StreamID: 42, Data: []byte("foobar")},
}
packets, err := packer.PackRetransmission(&ackhandler.Packet{
EncryptionLevel: protocol.EncryptionForwardSecure,
Frames: frames,
})
Expect(err).ToNot(HaveOccurred())
Expect(packets).To(HaveLen(1))
p := packets[0]
Expect(p.encryptionLevel).To(Equal(protocol.EncryptionForwardSecure))
Expect(p.frames).To(HaveLen(3))
Expect(p.frames[0]).To(BeAssignableToTypeOf(&wire.StopWaitingFrame{}))
Expect(p.frames[0].(*wire.StopWaitingFrame).LeastUnacked).To(Equal(protocol.PacketNumber(7)))
Expect(p.frames[0].(*wire.StopWaitingFrame).PacketNumber).To(Equal(p.header.PacketNumber))
Expect(p.frames[0].(*wire.StopWaitingFrame).PacketNumberLen).To(Equal(p.header.PacketNumberLen))
Expect(p.frames[1:]).To(Equal(frames))
})
It("refuses to retransmit packets without a STOP_WAITING Frame", func() {
packer.stopWaiting = nil
_, err := packer.PackRetransmission(&ackhandler.Packet{
EncryptionLevel: protocol.EncryptionForwardSecure,
Frames: []wire.Frame{&wire.MaxDataFrame{ByteOffset: 0x1234}},
})
Expect(err).To(MatchError("PacketPacker BUG: Handshake retransmissions must contain a STOP_WAITING frame"))
})
It("packs two packets for retransmission if the original packet contained many control frames", func() {
var frames []wire.Frame
var totalLen protocol.ByteCount
// pack a bunch of control frames, such that the packet is way bigger than a single packet
for i := 0; totalLen < protocol.MaxPacketSize*3/2; i++ {
f := &wire.MaxStreamDataFrame{StreamID: protocol.StreamID(i), ByteOffset: protocol.ByteCount(i)}
frames = append(frames, f)
totalLen += f.Length(packer.version)
}
packets, err := packer.PackRetransmission(&ackhandler.Packet{
EncryptionLevel: protocol.EncryptionForwardSecure,
Frames: frames,
})
Expect(err).ToNot(HaveOccurred())
Expect(packets).To(HaveLen(2))
Expect(len(packets[0].frames) + len(packets[1].frames)).To(Equal(len(frames) + 2)) // all frames, plus 2 STOP_WAITING frames
Expect(packets[0].frames[0]).To(BeAssignableToTypeOf(&wire.StopWaitingFrame{}))
Expect(packets[1].frames[0]).To(BeAssignableToTypeOf(&wire.StopWaitingFrame{}))
Expect(packets[0].frames[1:]).To(Equal(frames[:len(packets[0].frames)-1]))
Expect(packets[1].frames[1:]).To(Equal(frames[len(packets[0].frames)-1:]))
// check that the first packet was filled up as far as possible:
// if the first frame (after the STOP_WAITING) was packed into the first packet, it would have overflown the MaxPacketSize
Expect(len(packets[0].raw) + int(packets[1].frames[1].Length(packer.version))).To(BeNumerically(">", protocol.MaxPacketSize))
})
It("splits a STREAM frame that doesn't fit", func() {
packets, err := packer.PackRetransmission(&ackhandler.Packet{
EncryptionLevel: protocol.EncryptionForwardSecure,
Frames: []wire.Frame{&wire.StreamFrame{
StreamID: 42,
Offset: 1337,
Data: bytes.Repeat([]byte{'a'}, int(protocol.MaxPacketSize)*3/2),
}},
})
Expect(err).ToNot(HaveOccurred())
Expect(packets).To(HaveLen(2))
Expect(packets[0].frames[0]).To(BeAssignableToTypeOf(&wire.StopWaitingFrame{}))
Expect(packets[1].frames[0]).To(BeAssignableToTypeOf(&wire.StopWaitingFrame{}))
Expect(packets[0].frames[1]).To(BeAssignableToTypeOf(&wire.StreamFrame{}))
Expect(packets[1].frames[1]).To(BeAssignableToTypeOf(&wire.StreamFrame{}))
sf1 := packets[0].frames[1].(*wire.StreamFrame)
sf2 := packets[1].frames[1].(*wire.StreamFrame)
Expect(sf1.StreamID).To(Equal(protocol.StreamID(42)))
Expect(sf1.Offset).To(Equal(protocol.ByteCount(1337)))
Expect(sf2.StreamID).To(Equal(protocol.StreamID(42)))
Expect(sf2.Offset).To(Equal(protocol.ByteCount(1337) + sf1.DataLen()))
Expect(sf1.DataLen() + sf2.DataLen()).To(Equal(protocol.MaxPacketSize * 3 / 2))
Expect(packets[0].raw).To(HaveLen(int(protocol.MaxPacketSize)))
})
It("packs two packets for retransmission if the original packet contained many STREAM frames", func() {
var frames []wire.Frame
var totalLen protocol.ByteCount
// pack a bunch of control frames, such that the packet is way bigger than a single packet
for i := 0; totalLen < protocol.MaxPacketSize*3/2; i++ {
f := &wire.StreamFrame{
StreamID: protocol.StreamID(i),
Data: []byte("foobar"),
DataLenPresent: true,
}
frames = append(frames, f)
totalLen += f.Length(packer.version)
}
packets, err := packer.PackRetransmission(&ackhandler.Packet{
EncryptionLevel: protocol.EncryptionForwardSecure,
Frames: frames,
})
Expect(err).ToNot(HaveOccurred())
Expect(packets).To(HaveLen(2))
Expect(len(packets[0].frames) + len(packets[1].frames)).To(Equal(len(frames) + 2)) // all frames, plus 2 STOP_WAITING frames
Expect(packets[0].frames[0]).To(BeAssignableToTypeOf(&wire.StopWaitingFrame{}))
Expect(packets[1].frames[0]).To(BeAssignableToTypeOf(&wire.StopWaitingFrame{}))
Expect(packets[0].frames[1:]).To(Equal(frames[:len(packets[0].frames)-1]))
Expect(packets[1].frames[1:]).To(Equal(frames[len(packets[0].frames)-1:]))
// check that the first packet was filled up as far as possible:
// if the first frame (after the STOP_WAITING) was packed into the first packet, it would have overflown the MaxPacketSize
Expect(len(packets[0].raw) + int(packets[1].frames[1].Length(packer.version))).To(BeNumerically(">", protocol.MaxPacketSize-protocol.MinStreamFrameSize))
})
})
Context("packing ACK packets", func() {
It("packs ACK packets", func() {
packer.QueueControlFrame(&wire.AckFrame{})

View file

@ -838,39 +838,30 @@ func (s *session) sendPacket() (bool, error) {
continue
}
// retransmit handshake packets
// retransmit packets
if retransmitPacket.EncryptionLevel != protocol.EncryptionForwardSecure {
utils.Debugf("\tDequeueing handshake retransmission for packet 0x%x", retransmitPacket.PacketNumber)
if !s.version.UsesIETFFrameFormat() {
s.packer.QueueControlFrame(s.sentPacketHandler.GetStopWaitingFrame(true))
}
packet, err := s.packer.PackHandshakeRetransmission(retransmitPacket)
if err != nil {
return false, err
}
} else {
utils.Debugf("\tDequeueing retransmission for packet 0x%x", retransmitPacket.PacketNumber)
}
if !s.version.UsesIETFFrameFormat() {
s.packer.QueueControlFrame(s.sentPacketHandler.GetStopWaitingFrame(true))
}
packets, err := s.packer.PackRetransmission(retransmitPacket)
if err != nil {
return false, err
}
for _, packet := range packets {
if err := s.sendPackedPacket(packet); err != nil {
return false, err
}
return true, nil
}
// queue all retransmittable frames sent in forward-secure packets
utils.Debugf("\tDequeueing retransmission for packet 0x%x", retransmitPacket.PacketNumber)
// resend the frames that were in the packet
for _, frame := range retransmitPacket.GetFramesForRetransmission() {
// TODO: only retransmit WINDOW_UPDATEs if they actually enlarge the window
switch f := frame.(type) {
case *wire.StreamFrame:
s.streamFramer.AddFrameForRetransmission(f)
default:
s.packer.QueueControlFrame(frame)
}
}
return true, nil
}
hasRetransmission := s.streamFramer.HasFramesForRetransmission()
if !s.version.UsesIETFFrameFormat() && (ack != nil || hasRetransmission) {
if swf := s.sentPacketHandler.GetStopWaitingFrame(hasRetransmission); swf != nil {
if !s.version.UsesIETFFrameFormat() && ack != nil {
if swf := s.sentPacketHandler.GetStopWaitingFrame(false); swf != nil {
s.packer.QueueControlFrame(swf)
}
}

View file

@ -736,8 +736,11 @@ var _ = Describe("Session", func() {
var sentPacket *ackhandler.Packet
sph := mockackhandler.NewMockSentPacketHandler(mockCtrl)
sph.EXPECT().GetLeastUnacked().AnyTimes()
sph.EXPECT().GetStopWaitingFrame(gomock.Any())
sph.EXPECT().DequeuePacketForRetransmission()
sph.EXPECT().GetStopWaitingFrame(gomock.Any()).Return(&wire.StopWaitingFrame{})
sph.EXPECT().DequeuePacketForRetransmission().Return(&ackhandler.Packet{
EncryptionLevel: protocol.EncryptionForwardSecure,
Frames: []wire.Frame{f},
})
sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) {
sentPacket = p
})
@ -745,7 +748,6 @@ var _ = Describe("Session", func() {
sess.packer.packetNumberGenerator.next = 0x1337 + 9
sess.packer.cryptoSetup = &mockCryptoSetup{encLevelSeal: protocol.EncryptionForwardSecure}
sess.streamFramer.AddFrameForRetransmission(f)
sent, err := sess.sendPacket()
Expect(err).NotTo(HaveOccurred())
Expect(sent).To(BeTrue())
@ -1021,9 +1023,10 @@ var _ = Describe("Session", func() {
Frames: []wire.Frame{f},
EncryptionLevel: protocol.EncryptionForwardSecure,
})
sph.EXPECT().DequeuePacketForRetransmission()
sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) {
Expect(p.Frames).To(Equal([]wire.Frame{swf, f}))
Expect(p.Frames).To(HaveLen(2))
Expect(p.Frames[0]).To(BeAssignableToTypeOf(&wire.StopWaitingFrame{}))
Expect(p.Frames[1]).To(Equal(f))
Expect(p.EncryptionLevel).To(Equal(protocol.EncryptionForwardSecure))
})
sent, err := sess.sendPacket()
@ -1043,7 +1046,6 @@ var _ = Describe("Session", func() {
Frames: []wire.Frame{f},
EncryptionLevel: protocol.EncryptionForwardSecure,
})
sph.EXPECT().DequeuePacketForRetransmission()
sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) {
Expect(p.Frames).To(Equal([]wire.Frame{f}))
Expect(p.EncryptionLevel).To(Equal(protocol.EncryptionForwardSecure))
@ -1054,39 +1056,26 @@ var _ = Describe("Session", func() {
Expect(mconn.written).To(HaveLen(1))
})
It("sends a STREAM frame from a packet queued for retransmission", func() {
f1 := wire.StreamFrame{
It("sends multiple packets, if the retransmission is split", func() {
sess.version = versionIETFFrames
sess.packer.version = versionIETFFrames
f := &wire.StreamFrame{
StreamID: 0x5,
Data: []byte("foobar"),
Data: bytes.Repeat([]byte{'b'}, int(protocol.MaxPacketSize)*3/2),
}
f2 := wire.StreamFrame{
StreamID: 0x7,
Data: []byte("loremipsum"),
}
p1 := &ackhandler.Packet{
PacketNumber: 0x1337,
Frames: []wire.Frame{&f1},
sph.EXPECT().DequeuePacketForRetransmission().Return(&ackhandler.Packet{
Frames: []wire.Frame{f},
EncryptionLevel: protocol.EncryptionForwardSecure,
}
p2 := &ackhandler.Packet{
PacketNumber: 0x1338,
Frames: []wire.Frame{&f2},
EncryptionLevel: protocol.EncryptionForwardSecure,
}
sph.EXPECT().DequeuePacketForRetransmission().Return(p1)
sph.EXPECT().DequeuePacketForRetransmission().Return(p2)
sph.EXPECT().DequeuePacketForRetransmission()
sph.EXPECT().GetStopWaitingFrame(true).Return(&wire.StopWaitingFrame{})
sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) {
Expect(p.Frames).To(HaveLen(3))
})
sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) {
Expect(p.Frames).To(HaveLen(1))
Expect(p.Frames[0]).To(BeAssignableToTypeOf(&wire.StreamFrame{}))
Expect(p.EncryptionLevel).To(Equal(protocol.EncryptionForwardSecure))
}).Times(2)
sent, err := sess.sendPacket()
Expect(err).NotTo(HaveOccurred())
Expect(sent).To(BeTrue())
Expect(mconn.written).To(HaveLen(1))
packet := <-mconn.written
Expect(packet).To(ContainSubstring("foobar"))
Expect(packet).To(ContainSubstring("loremipsum"))
Expect(mconn.written).To(HaveLen(2))
})
})
})
@ -1126,16 +1115,32 @@ var _ = Describe("Session", func() {
})
It("sends when scheduleSending is called", func() {
sess.packer.packetNumberGenerator.next = 10000
f := &wire.StreamFrame{
StreamID: 0x5,
Data: []byte("foobar"),
}
sph := mockackhandler.NewMockSentPacketHandler(mockCtrl)
sph.EXPECT().GetAlarmTimeout().AnyTimes()
sph.EXPECT().TimeUntilSend().AnyTimes()
sph.EXPECT().SendingAllowed().AnyTimes().Return(true)
sph.EXPECT().ShouldSendNumPackets().AnyTimes().Return(1)
sph.EXPECT().GetLeastUnacked().AnyTimes()
sph.EXPECT().GetStopWaitingFrame(true).Return(&wire.StopWaitingFrame{LeastUnacked: 10})
sph.EXPECT().DequeuePacketForRetransmission().Return(&ackhandler.Packet{
PacketNumber: 0x1337,
Frames: []wire.Frame{f},
EncryptionLevel: protocol.EncryptionForwardSecure,
})
sph.EXPECT().SentPacket(gomock.Any())
sess.sentPacketHandler = sph
done := make(chan struct{})
go func() {
defer GinkgoRecover()
sess.run()
close(done)
}()
sess.streamFramer.AddFrameForRetransmission(&wire.StreamFrame{
StreamID: 5,
Data: []byte("foobar"),
})
Consistently(mconn.written).ShouldNot(Receive())
sess.scheduleSending()
Eventually(mconn.written).Should(Receive())

View file

@ -12,8 +12,6 @@ type streamFramer struct {
cryptoStream cryptoStreamI
version protocol.VersionNumber
retransmissionQueue []*wire.StreamFrame
streamQueueMutex sync.Mutex
activeStreams map[protocol.StreamID]struct{}
streamQueue []protocol.StreamID
@ -33,10 +31,6 @@ func newStreamFramer(
}
}
func (f *streamFramer) AddFrameForRetransmission(frame *wire.StreamFrame) {
f.retransmissionQueue = append(f.retransmissionQueue, frame)
}
func (f *streamFramer) AddActiveStream(id protocol.StreamID) {
if id == f.version.CryptoStreamID() { // the crypto stream is handled separately
f.streamQueueMutex.Lock()
@ -52,15 +46,6 @@ func (f *streamFramer) AddActiveStream(id protocol.StreamID) {
f.streamQueueMutex.Unlock()
}
func (f *streamFramer) PopStreamFrames(maxLen protocol.ByteCount) []*wire.StreamFrame {
fs, currentLen := f.maybePopFramesForRetransmission(maxLen)
return append(fs, f.maybePopNormalFrames(maxLen-currentLen)...)
}
func (f *streamFramer) HasFramesForRetransmission() bool {
return len(f.retransmissionQueue) > 0
}
func (f *streamFramer) HasCryptoStreamData() bool {
f.streamQueueMutex.Lock()
hasCryptoStreamData := f.hasCryptoStreamData
@ -76,34 +61,7 @@ func (f *streamFramer) PopCryptoStreamFrame(maxLen protocol.ByteCount) *wire.Str
return frame
}
func (f *streamFramer) maybePopFramesForRetransmission(maxTotalLen protocol.ByteCount) (res []*wire.StreamFrame, currentLen protocol.ByteCount) {
for len(f.retransmissionQueue) > 0 {
frame := f.retransmissionQueue[0]
frame.DataLenPresent = true
maxLen := maxTotalLen - currentLen
if frame.Length(f.version) > maxLen && maxLen < protocol.MinStreamFrameSize {
break
}
splitFrame, err := frame.MaybeSplitOffFrame(maxLen, f.version)
if err != nil { // maxLen is too small. Can't split frame
break
}
if splitFrame != nil { // frame was split
res = append(res, splitFrame)
currentLen += splitFrame.Length(f.version)
break
}
f.retransmissionQueue = f.retransmissionQueue[1:]
res = append(res, frame)
currentLen += frame.Length(f.version)
}
return
}
func (f *streamFramer) maybePopNormalFrames(maxTotalLen protocol.ByteCount) []*wire.StreamFrame {
func (f *streamFramer) PopStreamFrames(maxTotalLen protocol.ByteCount) []*wire.StreamFrame {
var currentLen protocol.ByteCount
var frames []*wire.StreamFrame
f.streamQueueMutex.Lock()

View file

@ -18,24 +18,14 @@ var _ = Describe("Stream Framer", func() {
)
var (
retransmittedFrame1, retransmittedFrame2 *wire.StreamFrame
framer *streamFramer
cryptoStream *MockCryptoStream
stream1, stream2 *MockSendStreamI
streamGetter *MockStreamGetter
framer *streamFramer
cryptoStream *MockCryptoStream
stream1, stream2 *MockSendStreamI
streamGetter *MockStreamGetter
)
BeforeEach(func() {
streamGetter = NewMockStreamGetter(mockCtrl)
retransmittedFrame1 = &wire.StreamFrame{
StreamID: 5,
Data: []byte{0x13, 0x37},
}
retransmittedFrame2 = &wire.StreamFrame{
StreamID: 6,
Data: []byte{0xDE, 0xCA, 0xFB, 0xAD},
}
stream1 = NewMockSendStreamI(mockCtrl)
stream1.EXPECT().StreamID().Return(protocol.StreamID(5)).AnyTimes()
stream2 = NewMockSendStreamI(mockCtrl)
@ -44,19 +34,6 @@ var _ = Describe("Stream Framer", func() {
framer = newStreamFramer(cryptoStream, streamGetter, versionGQUICFrames)
})
It("says if it has retransmissions", func() {
Expect(framer.HasFramesForRetransmission()).To(BeFalse())
framer.AddFrameForRetransmission(retransmittedFrame1)
Expect(framer.HasFramesForRetransmission()).To(BeTrue())
})
It("sets the DataLenPresent for dequeued retransmitted frames", func() {
framer.AddFrameForRetransmission(retransmittedFrame1)
fs := framer.PopStreamFrames(protocol.MaxByteCount)
Expect(fs).To(HaveLen(1))
Expect(fs[0].DataLenPresent).To(BeTrue())
})
Context("handling the crypto stream", func() {
It("says if it has crypto stream data", func() {
Expect(framer.HasCryptoStreamData()).To(BeFalse())
@ -94,37 +71,7 @@ var _ = Describe("Stream Framer", func() {
Expect(framer.PopStreamFrames(1000)).To(BeEmpty())
})
It("pops frames for retransmission", func() {
framer.AddFrameForRetransmission(retransmittedFrame1)
framer.AddFrameForRetransmission(retransmittedFrame2)
fs := framer.PopStreamFrames(1000)
Expect(fs).To(Equal([]*wire.StreamFrame{retransmittedFrame1, retransmittedFrame2}))
// make sure the frames are actually removed, and not returned a second time
Expect(framer.PopStreamFrames(1000)).To(BeEmpty())
})
It("doesn't pop frames for retransmission, if the size would be smaller than the minimum STREAM frame size", func() {
framer.AddFrameForRetransmission(&wire.StreamFrame{
StreamID: id1,
Data: bytes.Repeat([]byte{'a'}, int(protocol.MinStreamFrameSize)),
})
fs := framer.PopStreamFrames(protocol.MinStreamFrameSize - 1)
Expect(fs).To(BeEmpty())
})
It("pops frames for retransmission, even if the remaining space in the packet is too small, if the frame doesn't need to be split", func() {
framer.AddFrameForRetransmission(retransmittedFrame1)
fs := framer.PopStreamFrames(protocol.MinStreamFrameSize - 1)
Expect(fs).To(Equal([]*wire.StreamFrame{retransmittedFrame1}))
})
It("pops frames for retransmission, if the remaining size is the miniumum STREAM frame size", func() {
framer.AddFrameForRetransmission(retransmittedFrame1)
fs := framer.PopStreamFrames(protocol.MinStreamFrameSize)
Expect(fs).To(Equal([]*wire.StreamFrame{retransmittedFrame1}))
})
It("returns normal frames", func() {
It("returns STREAM frames", func() {
streamGetter.EXPECT().GetOrOpenSendStream(id1).Return(stream1, nil)
f := &wire.StreamFrame{
StreamID: id1,
@ -227,16 +174,6 @@ var _ = Describe("Stream Framer", func() {
Expect(framer.PopStreamFrames(1000)).To(HaveLen(1))
})
It("returns retransmission frames before normal frames", func() {
streamGetter.EXPECT().GetOrOpenSendStream(id1).Return(stream1, nil)
framer.AddActiveStream(id1)
f1 := &wire.StreamFrame{Data: []byte("foobar")}
stream1.EXPECT().popStreamFrame(gomock.Any()).Return(f1, false)
framer.AddFrameForRetransmission(retransmittedFrame1)
fs := framer.PopStreamFrames(1000)
Expect(fs).To(Equal([]*wire.StreamFrame{retransmittedFrame1, f1}))
})
It("does not pop empty frames", func() {
fs := framer.PopStreamFrames(500)
Expect(fs).To(BeEmpty())
@ -266,29 +203,5 @@ var _ = Describe("Stream Framer", func() {
fs := framer.PopStreamFrames(500)
Expect(fs).To(Equal([]*wire.StreamFrame{f}))
})
Context("splitting of frames", func() {
It("splits a frame", func() {
framer.AddFrameForRetransmission(&wire.StreamFrame{Data: make([]byte, 600)})
fs := framer.PopStreamFrames(500)
Expect(fs).To(HaveLen(1))
Expect(fs[0].Length(framer.version)).To(Equal(protocol.ByteCount(500)))
Expect(framer.retransmissionQueue[0].Data).To(HaveLen(int(600 - fs[0].DataLen())))
Expect(framer.retransmissionQueue[0].Offset).To(Equal(fs[0].DataLen()))
})
It("only removes a frame from the framer after returning all split parts", func() {
frameHeaderLen := protocol.ByteCount(4)
frame := &wire.StreamFrame{Data: bytes.Repeat([]byte{0}, int(501-frameHeaderLen))}
framer.AddFrameForRetransmission(frame)
fs := framer.PopStreamFrames(500)
Expect(fs).To(HaveLen(1))
Expect(framer.retransmissionQueue).ToNot(BeEmpty())
fs = framer.PopStreamFrames(500)
Expect(fs).To(HaveLen(1))
Expect(fs[0].DataLen()).To(BeEquivalentTo(1))
Expect(framer.retransmissionQueue).To(BeEmpty())
})
})
})
})