move frame parsing to the session

This way, we can handle every parsed frame directly, and avoid
allocating a slice of parsed frames.
This commit is contained in:
Marten Seemann 2019-01-01 16:04:38 +07:00
parent 05645b546c
commit 855b643c7c
4 changed files with 152 additions and 136 deletions

View file

@ -6,7 +6,6 @@ import (
"github.com/lucas-clemente/quic-go/internal/handshake"
"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/qerr"
"github.com/lucas-clemente/quic-go/internal/utils"
"github.com/lucas-clemente/quic-go/internal/wire"
)
@ -15,7 +14,7 @@ type unpackedPacket struct {
packetNumber protocol.PacketNumber // the decoded packet number
hdr *wire.ExtendedHeader
encryptionLevel protocol.EncryptionLevel
frames []wire.Frame
data []byte
}
// The packetUnpacker unpacks QUIC packets.
@ -94,36 +93,10 @@ func (u *packetUnpacker) Unpack(hdr *wire.Header, data []byte) (*unpackedPacket,
// Only do this after decrypting, so we are sure the packet is not attacker-controlled
u.largestRcvdPacketNumber = utils.MaxPacketNumber(u.largestRcvdPacketNumber, pn)
fs, err := u.parseFrames(decrypted)
if err != nil {
return nil, err
}
return &unpackedPacket{
hdr: extHdr,
packetNumber: pn,
encryptionLevel: encLevel,
frames: fs,
data: decrypted,
}, nil
}
func (u *packetUnpacker) parseFrames(decrypted []byte) ([]wire.Frame, error) {
r := bytes.NewReader(decrypted)
if r.Len() == 0 {
return nil, qerr.MissingPayload
}
fs := make([]wire.Frame, 0, 2)
// Read all frames in the packet
for {
frame, err := wire.ParseNextFrame(r, u.version)
if err != nil {
return nil, err
}
if frame == nil {
break
}
fs = append(fs, frame)
}
return fs, nil
}

View file

@ -8,7 +8,6 @@ import (
"github.com/lucas-clemente/quic-go/internal/handshake"
"github.com/lucas-clemente/quic-go/internal/mocks"
"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/qerr"
"github.com/lucas-clemente/quic-go/internal/wire"
. "github.com/onsi/ginkgo"
@ -37,22 +36,6 @@ var _ = Describe("Packet Unpacker", func() {
unpacker = newPacketUnpacker(cs, version).(*packetUnpacker)
})
It("errors if the packet doesn't contain any payload", func() {
extHdr := &wire.ExtendedHeader{
Header: wire.Header{DestConnectionID: connID},
PacketNumber: 42,
PacketNumberLen: protocol.PacketNumberLen2,
}
hdr, hdrRaw := getHeader(extHdr)
// return an empty (unencrypted) payload
opener := mocks.NewMockOpener(mockCtrl)
cs.EXPECT().GetOpener(protocol.Encryption1RTT).Return(opener, nil)
opener.EXPECT().DecryptHeader(gomock.Any(), gomock.Any(), gomock.Any())
opener.EXPECT().Open(gomock.Any(), payload, extHdr.PacketNumber, hdrRaw).Return([]byte{}, nil)
_, err := unpacker.Unpack(hdr, append(hdrRaw, payload...))
Expect(err).To(MatchError(qerr.MissingPayload))
})
It("errors when the packet is too small to obtain the header decryption sample", func() {
extHdr := &wire.ExtendedHeader{
Header: wire.Header{DestConnectionID: connID},
@ -83,10 +66,11 @@ var _ = Describe("Packet Unpacker", func() {
opener := mocks.NewMockOpener(mockCtrl)
cs.EXPECT().GetOpener(protocol.EncryptionInitial).Return(opener, nil)
opener.EXPECT().DecryptHeader(gomock.Any(), gomock.Any(), gomock.Any())
opener.EXPECT().Open(gomock.Any(), payload, extHdr.PacketNumber, hdrRaw).Return([]byte{0}, nil)
opener.EXPECT().Open(gomock.Any(), payload, extHdr.PacketNumber, hdrRaw).Return([]byte("decrypted"), nil)
packet, err := unpacker.Unpack(hdr, append(hdrRaw, payload...))
Expect(err).ToNot(HaveOccurred())
Expect(packet.encryptionLevel).To(Equal(protocol.EncryptionInitial))
Expect(packet.data).To(Equal([]byte("decrypted")))
})
It("returns the error when getting the sealer fails", func() {
@ -193,23 +177,4 @@ var _ = Describe("Packet Unpacker", func() {
Expect(err).ToNot(HaveOccurred())
Expect(packet.packetNumber).To(Equal(protocol.PacketNumber(0x1338)))
})
It("unpacks the frames", func() {
extHdr := &wire.ExtendedHeader{
Header: wire.Header{DestConnectionID: connID},
PacketNumber: 0x1337,
PacketNumberLen: 2,
}
buf := &bytes.Buffer{}
(&wire.PingFrame{}).Write(buf, protocol.VersionWhatever)
(&wire.DataBlockedFrame{}).Write(buf, protocol.VersionWhatever)
hdr, hdrRaw := getHeader(extHdr)
opener := mocks.NewMockOpener(mockCtrl)
opener.EXPECT().DecryptHeader(gomock.Any(), gomock.Any(), gomock.Any())
cs.EXPECT().GetOpener(protocol.Encryption1RTT).Return(opener, nil)
opener.EXPECT().Open(gomock.Any(), gomock.Any(), extHdr.PacketNumber, hdrRaw).Return(buf.Bytes(), nil)
packet, err := unpacker.Unpack(hdr, append(hdrRaw, payload...))
Expect(err).ToNot(HaveOccurred())
Expect(packet.frames).To(Equal([]wire.Frame{&wire.PingFrame{}, &wire.DataBlockedFrame{}}))
})
})

View file

@ -1,12 +1,14 @@
package quic
import (
"bytes"
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"net"
"reflect"
"sync"
"time"
@ -519,6 +521,10 @@ func (s *session) handlePacketImpl(p *receivedPacket) bool /* was the packet suc
}
func (s *session) handleUnpackedPacket(packet *unpackedPacket, rcvTime time.Time) error {
if len(packet.data) == 0 {
return qerr.MissingPayload
}
// The server can change the source connection ID with the first Handshake packet.
if s.perspective == protocol.PerspectiveClient && !s.receivedFirstPacket && packet.hdr.IsLongHeader && !packet.hdr.SrcConnectionID.Equal(s.destConnID) {
s.logger.Debugf("Received first packet. Switching destination connection ID to: %s", packet.hdr.SrcConnectionID)
@ -539,62 +545,72 @@ func (s *session) handleUnpackedPacket(packet *unpackedPacket, rcvTime time.Time
}
}
isRetransmittable := ackhandler.HasRetransmittableFrames(packet.frames)
if err := s.receivedPacketHandler.ReceivedPacket(packet.packetNumber, rcvTime, isRetransmittable); err != nil {
return err
}
return s.handleFrames(packet.frames, packet.packetNumber, packet.encryptionLevel)
}
func (s *session) handleFrames(fs []wire.Frame, pn protocol.PacketNumber, encLevel protocol.EncryptionLevel) error {
for _, ff := range fs {
var err error
wire.LogFrame(s.logger, ff, false)
switch frame := ff.(type) {
case *wire.CryptoFrame:
err = s.handleCryptoFrame(frame, encLevel)
case *wire.StreamFrame:
err = s.handleStreamFrame(frame, encLevel)
case *wire.AckFrame:
err = s.handleAckFrame(frame, pn, encLevel)
case *wire.ConnectionCloseFrame:
s.closeRemote(qerr.Error(frame.ErrorCode, frame.ReasonPhrase))
case *wire.ResetStreamFrame:
err = s.handleResetStreamFrame(frame)
case *wire.MaxDataFrame:
s.handleMaxDataFrame(frame)
case *wire.MaxStreamDataFrame:
err = s.handleMaxStreamDataFrame(frame)
case *wire.MaxStreamsFrame:
err = s.handleMaxStreamsFrame(frame)
case *wire.DataBlockedFrame:
case *wire.StreamDataBlockedFrame:
case *wire.StreamsBlockedFrame:
case *wire.StopSendingFrame:
err = s.handleStopSendingFrame(frame)
case *wire.PingFrame:
case *wire.PathChallengeFrame:
s.handlePathChallengeFrame(frame)
case *wire.PathResponseFrame:
// since we don't send PATH_CHALLENGEs, we don't expect PATH_RESPONSEs
err = errors.New("unexpected PATH_RESPONSE frame")
case *wire.NewTokenFrame:
case *wire.NewConnectionIDFrame:
case *wire.RetireConnectionIDFrame:
// since we don't send new connection IDs, we don't expect retirements
err = errors.New("unexpected RETIRE_CONNECTION_ID frame")
default:
return errors.New("Session BUG: unexpected frame type")
}
r := bytes.NewReader(packet.data)
var isRetransmittable bool
for {
frame, err := wire.ParseNextFrame(r, s.version)
if err != nil {
return err
}
if frame == nil {
break
}
if ackhandler.IsFrameRetransmittable(frame) {
isRetransmittable = true
}
if err := s.handleFrame(frame, packet.packetNumber, packet.encryptionLevel); err != nil {
return err
}
}
if err := s.receivedPacketHandler.ReceivedPacket(packet.packetNumber, rcvTime, isRetransmittable); err != nil {
return err
}
return nil
}
func (s *session) handleFrame(f wire.Frame, pn protocol.PacketNumber, encLevel protocol.EncryptionLevel) error {
var err error
wire.LogFrame(s.logger, f, false)
switch frame := f.(type) {
case *wire.CryptoFrame:
err = s.handleCryptoFrame(frame, encLevel)
case *wire.StreamFrame:
err = s.handleStreamFrame(frame, encLevel)
case *wire.AckFrame:
err = s.handleAckFrame(frame, pn, encLevel)
case *wire.ConnectionCloseFrame:
s.closeRemote(qerr.Error(frame.ErrorCode, frame.ReasonPhrase))
case *wire.ResetStreamFrame:
err = s.handleResetStreamFrame(frame)
case *wire.MaxDataFrame:
s.handleMaxDataFrame(frame)
case *wire.MaxStreamDataFrame:
err = s.handleMaxStreamDataFrame(frame)
case *wire.MaxStreamsFrame:
err = s.handleMaxStreamsFrame(frame)
case *wire.DataBlockedFrame:
case *wire.StreamDataBlockedFrame:
case *wire.StreamsBlockedFrame:
case *wire.StopSendingFrame:
err = s.handleStopSendingFrame(frame)
case *wire.PingFrame:
case *wire.PathChallengeFrame:
s.handlePathChallengeFrame(frame)
case *wire.PathResponseFrame:
// since we don't send PATH_CHALLENGEs, we don't expect PATH_RESPONSEs
err = errors.New("unexpected PATH_RESPONSE frame")
case *wire.NewTokenFrame:
case *wire.NewConnectionIDFrame:
case *wire.RetireConnectionIDFrame:
// since we don't send new connection IDs, we don't expect retirements
err = errors.New("unexpected RETIRE_CONNECTION_ID frame")
default:
err = fmt.Errorf("unexpected frame type: %s", reflect.ValueOf(&frame).Elem().Type().Name())
}
return err
}
// handlePacket is called by the server with a new packet
func (s *session) handlePacket(p *receivedPacket) {
if s.closed.Get() {

View file

@ -16,7 +16,7 @@ import (
"github.com/lucas-clemente/quic-go/internal/ackhandler"
"github.com/lucas-clemente/quic-go/internal/handshake"
"github.com/lucas-clemente/quic-go/internal/mocks"
"github.com/lucas-clemente/quic-go/internal/mocks/ackhandler"
mockackhandler "github.com/lucas-clemente/quic-go/internal/mocks/ackhandler"
"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/qerr"
"github.com/lucas-clemente/quic-go/internal/utils"
@ -207,10 +207,10 @@ var _ = Describe("Session", func() {
It("ignores RESET_STREAM frames for closed streams", func() {
streamManager.EXPECT().GetOrOpenReceiveStream(protocol.StreamID(3)).Return(nil, nil)
Expect(sess.handleFrames([]wire.Frame{&wire.ResetStreamFrame{
Expect(sess.handleFrame(&wire.ResetStreamFrame{
StreamID: 3,
ErrorCode: 42,
}}, 0, protocol.EncryptionUnspecified)).To(Succeed())
}, 0, protocol.EncryptionUnspecified)).To(Succeed())
})
})
@ -242,11 +242,10 @@ var _ = Describe("Session", func() {
It("ignores MAX_STREAM_DATA frames for a closed stream", func() {
streamManager.EXPECT().GetOrOpenSendStream(protocol.StreamID(10)).Return(nil, nil)
err := sess.handleFrames([]wire.Frame{&wire.MaxStreamDataFrame{
Expect(sess.handleFrame(&wire.MaxStreamDataFrame{
StreamID: 10,
ByteOffset: 1337,
}}, 0, protocol.EncryptionUnspecified)
Expect(err).NotTo(HaveOccurred())
}, 0, protocol.EncryptionUnspecified)).To(Succeed())
})
})
@ -285,43 +284,43 @@ var _ = Describe("Session", func() {
It("ignores STOP_SENDING frames for a closed stream", func() {
streamManager.EXPECT().GetOrOpenSendStream(protocol.StreamID(3)).Return(nil, nil)
Expect(sess.handleFrames([]wire.Frame{&wire.StopSendingFrame{
Expect(sess.handleFrame(&wire.StopSendingFrame{
StreamID: 3,
ErrorCode: 1337,
}}, 0, protocol.EncryptionUnspecified)).To(Succeed())
}, 0, protocol.EncryptionUnspecified)).To(Succeed())
})
})
It("handles PING frames", func() {
err := sess.handleFrames([]wire.Frame{&wire.PingFrame{}}, 0, protocol.EncryptionUnspecified)
err := sess.handleFrame(&wire.PingFrame{}, 0, protocol.EncryptionUnspecified)
Expect(err).NotTo(HaveOccurred())
})
It("rejects PATH_RESPONSE frames", func() {
err := sess.handleFrames([]wire.Frame{&wire.PathResponseFrame{Data: [8]byte{1, 2, 3, 4, 5, 6, 7, 8}}}, 0, protocol.EncryptionUnspecified)
err := sess.handleFrame(&wire.PathResponseFrame{Data: [8]byte{1, 2, 3, 4, 5, 6, 7, 8}}, 0, protocol.EncryptionUnspecified)
Expect(err).To(MatchError("unexpected PATH_RESPONSE frame"))
})
It("handles PATH_CHALLENGE frames", func() {
data := [8]byte{1, 2, 3, 4, 5, 6, 7, 8}
err := sess.handleFrames([]wire.Frame{&wire.PathChallengeFrame{Data: data}}, 0, protocol.EncryptionUnspecified)
err := sess.handleFrame(&wire.PathChallengeFrame{Data: data}, 0, protocol.EncryptionUnspecified)
Expect(err).ToNot(HaveOccurred())
frames, _ := sess.framer.AppendControlFrames(nil, 1000)
Expect(frames).To(Equal([]wire.Frame{&wire.PathResponseFrame{Data: data}}))
})
It("handles BLOCKED frames", func() {
err := sess.handleFrames([]wire.Frame{&wire.DataBlockedFrame{}}, 0, protocol.EncryptionUnspecified)
err := sess.handleFrame(&wire.DataBlockedFrame{}, 0, protocol.EncryptionUnspecified)
Expect(err).NotTo(HaveOccurred())
})
It("handles STREAM_BLOCKED frames", func() {
err := sess.handleFrames([]wire.Frame{&wire.StreamDataBlockedFrame{}}, 0, protocol.EncryptionUnspecified)
err := sess.handleFrame(&wire.StreamDataBlockedFrame{}, 0, protocol.EncryptionUnspecified)
Expect(err).NotTo(HaveOccurred())
})
It("handles STREAM_ID_BLOCKED frames", func() {
err := sess.handleFrames([]wire.Frame{&wire.StreamsBlockedFrame{}}, 0, protocol.EncryptionUnspecified)
err := sess.handleFrame(&wire.StreamsBlockedFrame{}, 0, protocol.EncryptionUnspecified)
Expect(err).NotTo(HaveOccurred())
})
@ -337,7 +336,7 @@ var _ = Describe("Session", func() {
err := sess.run()
Expect(err).To(MatchError(testErr))
}()
err := sess.handleFrames([]wire.Frame{&wire.ConnectionCloseFrame{ErrorCode: qerr.ProofInvalid, ReasonPhrase: "foobar"}}, 0, protocol.EncryptionUnspecified)
err := sess.handleFrame(&wire.ConnectionCloseFrame{ErrorCode: qerr.ProofInvalid, ReasonPhrase: "foobar"}, 0, protocol.EncryptionUnspecified)
Expect(err).NotTo(HaveOccurred())
Eventually(sess.Context().Done()).Should(BeClosed())
})
@ -490,13 +489,17 @@ var _ = Describe("Session", func() {
return buf.Bytes()
}
It("informs the ReceivedPacketHandler", func() {
It("informs the ReceivedPacketHandler about non-retransmittable packets", func() {
hdr := &wire.ExtendedHeader{
PacketNumber: 0x37,
PacketNumberLen: protocol.PacketNumberLen1,
}
rcvTime := time.Now().Add(-10 * time.Second)
unpacker.EXPECT().Unpack(gomock.Any(), gomock.Any()).Return(&unpackedPacket{packetNumber: 0x1337, hdr: hdr}, nil)
unpacker.EXPECT().Unpack(gomock.Any(), gomock.Any()).Return(&unpackedPacket{
packetNumber: 0x1337,
hdr: hdr,
data: []byte{0}, // one PADDING frame
}, nil)
rph := mockackhandler.NewMockReceivedPacketHandler(mockCtrl)
rph.EXPECT().ReceivedPacket(protocol.PacketNumber(0x1337), rcvTime, false)
sess.receivedPacketHandler = rph
@ -507,6 +510,29 @@ var _ = Describe("Session", func() {
}))).To(BeTrue())
})
It("informs the ReceivedPacketHandler about retransmittable packets", func() {
hdr := &wire.ExtendedHeader{
PacketNumber: 0x37,
PacketNumberLen: protocol.PacketNumberLen1,
}
rcvTime := time.Now().Add(-10 * time.Second)
buf := &bytes.Buffer{}
Expect((&wire.PingFrame{}).Write(buf, sess.version)).To(Succeed())
unpacker.EXPECT().Unpack(gomock.Any(), gomock.Any()).Return(&unpackedPacket{
packetNumber: 0x1337,
hdr: hdr,
data: buf.Bytes(),
}, nil)
rph := mockackhandler.NewMockReceivedPacketHandler(mockCtrl)
rph.EXPECT().ReceivedPacket(protocol.PacketNumber(0x1337), rcvTime, true)
sess.receivedPacketHandler = rph
Expect(sess.handlePacketImpl(insertPacketBuffer(&receivedPacket{
rcvTime: rcvTime,
hdr: &hdr.Header,
data: getData(hdr),
}))).To(BeTrue())
})
It("closes when handling a packet fails", func() {
testErr := errors.New("unpack error")
unpacker.EXPECT().Unpack(gomock.Any(), gomock.Any()).Return(nil, testErr)
@ -529,12 +555,39 @@ var _ = Describe("Session", func() {
Eventually(done).Should(BeClosed())
})
It("rejects packets with empty payload", func() {
unpacker.EXPECT().Unpack(gomock.Any(), gomock.Any()).Return(&unpackedPacket{
hdr: &wire.ExtendedHeader{},
data: []byte{}, // no payload
}, nil)
streamManager.EXPECT().CloseWithError(gomock.Any())
cryptoSetup.EXPECT().Close()
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil)
done := make(chan struct{})
go func() {
defer GinkgoRecover()
cryptoSetup.EXPECT().RunHandshake().Do(func() { <-sess.Context().Done() })
err := sess.run()
Expect(err).To(MatchError(qerr.MissingPayload))
close(done)
}()
sessionRunner.EXPECT().retireConnectionID(gomock.Any())
sess.handlePacket(insertPacketBuffer(&receivedPacket{
hdr: &wire.Header{},
data: getData(&wire.ExtendedHeader{PacketNumberLen: protocol.PacketNumberLen1}),
}))
Eventually(done).Should(BeClosed())
})
It("handles duplicate packets", func() {
hdr := &wire.ExtendedHeader{
PacketNumber: 5,
PacketNumberLen: protocol.PacketNumberLen1,
}
unpacker.EXPECT().Unpack(gomock.Any(), gomock.Any()).Return(&unpackedPacket{hdr: hdr}, nil).Times(2)
unpacker.EXPECT().Unpack(gomock.Any(), gomock.Any()).Return(&unpackedPacket{
hdr: hdr,
data: []byte{0}, // one PADDING frame
}, nil).Times(2)
Expect(sess.handlePacketImpl(insertPacketBuffer(&receivedPacket{hdr: &hdr.Header, data: getData(hdr)}))).To(BeTrue())
Expect(sess.handlePacketImpl(insertPacketBuffer(&receivedPacket{hdr: &hdr.Header, data: getData(hdr)}))).To(BeTrue())
})
@ -558,7 +611,10 @@ var _ = Describe("Session", func() {
}
// Send one packet, which might change the connection ID.
// only EXPECT one call to the unpacker
unpacker.EXPECT().Unpack(gomock.Any(), gomock.Any()).Return(&unpackedPacket{hdr: &wire.ExtendedHeader{Header: *hdr}}, nil)
unpacker.EXPECT().Unpack(gomock.Any(), gomock.Any()).Return(&unpackedPacket{
hdr: &wire.ExtendedHeader{Header: *hdr},
data: []byte{0}, // one PADDING frame
}, nil)
Expect(sess.handlePacketImpl(insertPacketBuffer(&receivedPacket{
hdr: hdr,
data: getData(&wire.ExtendedHeader{PacketNumberLen: protocol.PacketNumberLen1}),
@ -577,7 +633,10 @@ var _ = Describe("Session", func() {
Context("updating the remote address", func() {
It("doesn't support connection migration", func() {
unpacker.EXPECT().Unpack(gomock.Any(), gomock.Any()).Return(&unpackedPacket{hdr: &wire.ExtendedHeader{}}, nil)
unpacker.EXPECT().Unpack(gomock.Any(), gomock.Any()).Return(&unpackedPacket{
hdr: &wire.ExtendedHeader{},
data: []byte{0}, // one PADDING frame
}, nil)
origAddr := sess.conn.(*mockConnection).remoteAddr
remoteIP := &net.IPAddr{IP: net.IPv4(192, 168, 0, 100)}
Expect(origAddr).ToNot(Equal(remoteIP))
@ -1350,7 +1409,10 @@ var _ = Describe("Client Session", func() {
It("changes the connection ID when receiving the first packet from the server", func() {
unpacker := NewMockUnpacker(mockCtrl)
unpacker.EXPECT().Unpack(gomock.Any(), gomock.Any()).DoAndReturn(func(hdr *wire.Header, data []byte) (*unpackedPacket, error) {
return &unpackedPacket{hdr: &wire.ExtendedHeader{Header: *hdr}}, nil
return &unpackedPacket{
hdr: &wire.ExtendedHeader{Header: *hdr},
data: []byte{0}, // one PADDING frame
}, nil
})
sess.unpacker = unpacker
go func() {