add tracing for sent and received packets

This commit is contained in:
Marten Seemann 2019-04-05 10:55:31 +09:00
parent 8926531f7e
commit 267d11ee66
12 changed files with 314 additions and 0 deletions

View file

@ -245,6 +245,7 @@ func populateClientConfig(config *Config, createdPacketConn bool) *Config {
MaxIncomingUniStreams: maxIncomingUniStreams,
KeepAlive: config.KeepAlive,
StatelessResetKey: config.StatelessResetKey,
QuicTracer: config.QuicTracer,
}
}

View file

@ -14,6 +14,7 @@ import (
"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/utils"
"github.com/lucas-clemente/quic-go/internal/wire"
"github.com/lucas-clemente/quic-go/quictrace"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
@ -398,6 +399,7 @@ var _ = Describe("Client", func() {
Context("quic.Config", func() {
It("setups with the right values", func() {
tracer := quictrace.NewTracer()
config := &Config{
HandshakeTimeout: 1337 * time.Minute,
IdleTimeout: 42 * time.Hour,
@ -405,6 +407,7 @@ var _ = Describe("Client", func() {
MaxIncomingUniStreams: 4321,
ConnectionIDLength: 13,
StatelessResetKey: []byte("foobar"),
QuicTracer: tracer,
}
c := populateClientConfig(config, false)
Expect(c.HandshakeTimeout).To(Equal(1337 * time.Minute))
@ -413,6 +416,7 @@ var _ = Describe("Client", func() {
Expect(c.MaxIncomingUniStreams).To(Equal(4321))
Expect(c.ConnectionIDLength).To(Equal(13))
Expect(c.StatelessResetKey).To(Equal([]byte("foobar")))
Expect(c.QuicTracer).To(Equal(tracer))
})
It("errors when the Config contains an invalid version", func() {

View file

@ -8,6 +8,7 @@ import (
"time"
"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/quictrace"
)
// The StreamID is the ID of a QUIC stream.
@ -219,6 +220,9 @@ type Config struct {
StatelessResetKey []byte
// KeepAlive defines whether this peer will periodically send a packet to keep the connection alive.
KeepAlive bool
// QUIC Event Tracer.
// Warning: Experimental. This API should not be considered stable and will change soon.
QuicTracer quictrace.Tracer
}
// A Listener for incoming QUIC connections

View file

@ -39,6 +39,9 @@ type SentPacketHandler interface {
GetAlarmTimeout() time.Time
OnAlarm() error
// report some congestion statistics. For tracing only.
GetStats() *State
}
// ReceivedPacketHandler handles ACKs needed to send for incoming packets
@ -50,3 +53,12 @@ type ReceivedPacketHandler interface {
GetAlarmTimeout() time.Time
GetAckFrame(protocol.EncryptionLevel) *wire.AckFrame
}
type State struct {
MinRTT time.Duration
SmoothedRTT time.Duration
LatestRTT time.Duration
BytesInFlight protocol.ByteCount
CongestionWindow protocol.ByteCount
}

View file

@ -670,3 +670,13 @@ func (h *sentPacketHandler) ResetForRetry() error {
h.updateLossDetectionAlarm()
return nil
}
func (h *sentPacketHandler) GetStats() *State {
return &State{
MinRTT: h.rttStats.MinRTT(),
SmoothedRTT: h.rttStats.SmoothedOrInitialRTT(),
LatestRTT: h.rttStats.LatestRTT(),
BytesInFlight: h.bytesInFlight,
CongestionWindow: h.congestion.GetCongestionWindow(),
}
}

View file

@ -106,6 +106,20 @@ func (mr *MockSentPacketHandlerMockRecorder) GetLowestPacketNotConfirmedAcked()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLowestPacketNotConfirmedAcked", reflect.TypeOf((*MockSentPacketHandler)(nil).GetLowestPacketNotConfirmedAcked))
}
// GetStats mocks base method
func (m *MockSentPacketHandler) GetStats() *ackhandler.State {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetStats")
ret0, _ := ret[0].(*ackhandler.State)
return ret0
}
// GetStats indicates an expected call of GetStats
func (mr *MockSentPacketHandlerMockRecorder) GetStats() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStats", reflect.TypeOf((*MockSentPacketHandler)(nil).GetStats))
}
// OnAlarm mocks base method
func (m *MockSentPacketHandler) OnAlarm() error {
m.ctrl.T.Helper()

5
quictrace/README.md Normal file
View file

@ -0,0 +1,5 @@
# quic-trace Adapter
This is an experimental implementation of the log format consumed by [quic-trace](https://github.com/google/quic-trace).
At this moment, this package comes with no API stability whatsoever.

39
quictrace/interface.go Normal file
View file

@ -0,0 +1,39 @@
package quictrace
import (
"time"
"github.com/lucas-clemente/quic-go/internal/ackhandler"
"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/wire"
)
// A Tracer traces a QUIC connection
type Tracer interface {
Trace(protocol.ConnectionID, Event)
GetAllTraces() map[string][]byte
}
// EventType is the type of an event
type EventType uint8
const (
// PacketSent means that a packet was sent
PacketSent EventType = 1 + iota
// PacketReceived means that a packet was received
PacketReceived
// PacketLost means that a packet was lost
PacketLost
)
// Event is a quic-traceable event
type Event struct {
Time time.Time
EventType EventType
TransportState *ackhandler.State
EncryptionLevel protocol.EncryptionLevel
PacketNumber protocol.PacketNumber
PacketSize protocol.ByteCount
Frames []wire.Frame
}

189
quictrace/tracer.go Normal file
View file

@ -0,0 +1,189 @@
package quictrace
import (
"fmt"
"time"
"github.com/golang/protobuf/proto"
"github.com/lucas-clemente/quic-go/internal/ackhandler"
"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/wire"
"github.com/lucas-clemente/quic-go/quictrace/pb"
)
type traceEvent struct {
connID protocol.ConnectionID
ev Event
}
// A tracer is used to trace a QUIC connection
type tracer struct {
eventQueue chan traceEvent
events map[string] /* conn ID */ []traceEvent
}
var _ Tracer = &tracer{}
// NewTracer creates a new Tracer
func NewTracer() Tracer {
qt := &tracer{
eventQueue: make(chan traceEvent, 1<<10),
events: make(map[string][]traceEvent),
}
go qt.run()
return qt
}
// Trace traces an event
func (t *tracer) Trace(connID protocol.ConnectionID, ev Event) {
t.eventQueue <- traceEvent{connID: connID, ev: ev}
}
func (t *tracer) run() {
for tev := range t.eventQueue {
key := string(tev.connID)
if _, ok := t.events[key]; !ok {
t.events[key] = make([]traceEvent, 0, 10*1<<10)
}
t.events[key] = append(t.events[key], tev)
}
}
func (t *tracer) GetAllTraces() map[string][]byte {
traces := make(map[string][]byte)
for connID := range t.events {
data, err := t.emitByConnIDAsString(connID)
if err != nil {
panic(err)
}
traces[connID] = data
}
return traces
}
// Emit emits the serialized protobuf that will be consumed by quic-trace
func (t *tracer) Emit(connID protocol.ConnectionID) ([]byte, error) {
return t.emitByConnIDAsString(string(connID))
}
func (t *tracer) emitByConnIDAsString(connID string) ([]byte, error) {
events, ok := t.events[connID]
if !ok {
return nil, fmt.Errorf("No trace found for connection ID %s", connID)
}
trace := &pb.Trace{
DestinationConnectionId: []byte{1, 2, 3, 4, 5, 6, 7, 8},
SourceConnectionId: []byte{1, 2, 3, 4, 5, 6, 7, 8},
ProtocolVersion: []byte{0xff, 0, 0, 19},
Events: make([]*pb.Event, len(events)),
}
var startTime time.Time
for i, ev := range events {
event := ev.ev
if i == 0 {
startTime = event.Time
}
packetNumber := uint64(event.PacketNumber)
packetSize := uint64(event.PacketSize)
trace.Events[i] = &pb.Event{
TimeUs: durationToUs(event.Time.Sub(startTime)),
EventType: getEventType(event.EventType),
PacketSize: &packetSize,
PacketNumber: &packetNumber,
TransportState: getTransportState(event.TransportState),
EncryptionLevel: getEncryptionLevel(event.EncryptionLevel),
Frames: getFrames(event.Frames),
}
}
delete(t.events, connID)
return proto.Marshal(trace)
}
func getEventType(evType EventType) *pb.EventType {
var t pb.EventType
switch evType {
case PacketSent:
t = pb.EventType_PACKET_SENT
case PacketReceived:
t = pb.EventType_PACKET_RECEIVED
case PacketLost:
t = pb.EventType_PACKET_LOST
default:
panic("unknown event type")
}
return &t
}
func getEncryptionLevel(encLevel protocol.EncryptionLevel) *pb.EncryptionLevel {
enc := pb.EncryptionLevel_ENCRYPTION_UNKNOWN
switch encLevel {
case protocol.EncryptionInitial:
enc = pb.EncryptionLevel_ENCRYPTION_INITIAL
case protocol.EncryptionHandshake:
enc = pb.EncryptionLevel_ENCRYPTION_HANDSHAKE
case protocol.Encryption1RTT:
enc = pb.EncryptionLevel_ENCRYPTION_1RTT
}
return &enc
}
func getFrames(wframes []wire.Frame) []*pb.Frame {
streamFrameType := pb.FrameType_STREAM
ackFrameType := pb.FrameType_ACK
var frames []*pb.Frame
for _, frame := range wframes {
switch f := frame.(type) {
case *wire.StreamFrame:
streamID := uint64(f.StreamID)
offset := uint64(f.Offset)
length := uint64(f.DataLen())
frames = append(frames, &pb.Frame{
FrameType: &streamFrameType,
StreamFrameInfo: &pb.StreamFrameInfo{
StreamId: &streamID,
Offset: &offset,
Length: &length,
},
})
case *wire.AckFrame:
var ackedPackets []*pb.AckBlock
for _, ackBlock := range f.AckRanges {
firstPacket := uint64(ackBlock.Smallest)
lastPacket := uint64(ackBlock.Largest)
ackedPackets = append(ackedPackets, &pb.AckBlock{
FirstPacket: &firstPacket,
LastPacket: &lastPacket,
})
}
frames = append(frames, &pb.Frame{
FrameType: &ackFrameType,
AckInfo: &pb.AckInfo{
AckDelayUs: durationToUs(f.DelayTime),
AckedPackets: ackedPackets,
},
})
}
}
return frames
}
func getTransportState(state *ackhandler.State) *pb.TransportState {
bytesInFlight := uint64(state.BytesInFlight)
congestionWindow := uint64(state.CongestionWindow)
return &pb.TransportState{
MinRttUs: durationToUs(state.MinRTT),
SmoothedRttUs: durationToUs(state.SmoothedRTT),
LastRttUs: durationToUs(state.LatestRTT),
InFlightBytes: &bytesInFlight,
CwndBytes: &congestionWindow,
}
}
func durationToUs(d time.Duration) *uint64 {
dur := uint64(d / 1000)
return &dur
}

View file

@ -279,6 +279,7 @@ func populateServerConfig(config *Config) *Config {
MaxIncomingUniStreams: maxIncomingUniStreams,
ConnectionIDLength: connIDLen,
StatelessResetKey: config.StatelessResetKey,
QuicTracer: config.QuicTracer,
}
}

View file

@ -15,6 +15,7 @@ import (
"github.com/lucas-clemente/quic-go/internal/testdata"
"github.com/lucas-clemente/quic-go/internal/utils"
"github.com/lucas-clemente/quic-go/internal/wire"
"github.com/lucas-clemente/quic-go/quictrace"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
@ -86,6 +87,7 @@ var _ = Describe("Server", func() {
It("setups with the right values", func() {
supportedVersions := []protocol.VersionNumber{protocol.VersionTLS}
acceptToken := func(_ net.Addr, _ *Token) bool { return true }
tracer := quictrace.NewTracer()
config := Config{
Versions: supportedVersions,
AcceptToken: acceptToken,
@ -93,6 +95,7 @@ var _ = Describe("Server", func() {
IdleTimeout: 42 * time.Minute,
KeepAlive: true,
StatelessResetKey: []byte("foobar"),
QuicTracer: tracer,
}
ln, err := Listen(conn, tlsConf, &config)
Expect(err).ToNot(HaveOccurred())
@ -104,6 +107,7 @@ var _ = Describe("Server", func() {
Expect(reflect.ValueOf(server.config.AcceptToken)).To(Equal(reflect.ValueOf(acceptToken)))
Expect(server.config.KeepAlive).To(BeTrue())
Expect(server.config.StatelessResetKey).To(Equal([]byte("foobar")))
Expect(server.config.QuicTracer).To(Equal(tracer))
// stop the listener
Expect(ln.Close()).To(Succeed())
})

View file

@ -20,6 +20,7 @@ import (
"github.com/lucas-clemente/quic-go/internal/qerr"
"github.com/lucas-clemente/quic-go/internal/utils"
"github.com/lucas-clemente/quic-go/internal/wire"
"github.com/lucas-clemente/quic-go/quictrace"
)
type unpacker interface {
@ -665,6 +666,10 @@ func (s *session) handleUnpackedPacket(packet *unpackedPacket, rcvTime time.Time
s.firstAckElicitingPacketAfterIdleSentTime = time.Time{}
s.keepAlivePingSent = false
// Only used for tracing.
// If we're not tracing, this slice will always remain empty.
var frames []wire.Frame
r := bytes.NewReader(packet.data)
var isAckEliciting bool
for {
@ -678,11 +683,26 @@ func (s *session) handleUnpackedPacket(packet *unpackedPacket, rcvTime time.Time
if ackhandler.IsFrameAckEliciting(frame) {
isAckEliciting = true
}
if s.config.QuicTracer != nil {
frames = append(frames, frame)
}
if err := s.handleFrame(frame, packet.packetNumber, packet.encryptionLevel); err != nil {
return err
}
}
if s.config.QuicTracer != nil {
s.config.QuicTracer.Trace(s.origDestConnID, quictrace.Event{
Time: time.Now(),
EventType: quictrace.PacketReceived,
TransportState: s.sentPacketHandler.GetStats(),
EncryptionLevel: packet.encryptionLevel,
PacketNumber: packet.packetNumber,
PacketSize: protocol.ByteCount(len(packet.data)),
Frames: frames,
})
}
if err := s.receivedPacketHandler.ReceivedPacket(packet.packetNumber, packet.encryptionLevel, rcvTime, isAckEliciting); err != nil {
return err
}
@ -1153,6 +1173,17 @@ func (s *session) sendPackedPacket(packet *packedPacket) error {
if s.firstAckElicitingPacketAfterIdleSentTime.IsZero() && packet.IsAckEliciting() {
s.firstAckElicitingPacketAfterIdleSentTime = time.Now()
}
if s.config.QuicTracer != nil {
s.config.QuicTracer.Trace(s.origDestConnID, quictrace.Event{
Time: time.Now(),
EventType: quictrace.PacketSent,
TransportState: s.sentPacketHandler.GetStats(),
EncryptionLevel: packet.EncryptionLevel(),
PacketNumber: packet.header.PacketNumber,
PacketSize: protocol.ByteCount(len(packet.raw)),
Frames: packet.frames,
})
}
s.logPacket(packet)
return s.conn.Write(packet.raw)
}