From 267d11ee66f5934948b6efca19e0163b34788563 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Fri, 5 Apr 2019 10:55:31 +0900 Subject: [PATCH] add tracing for sent and received packets --- client.go | 1 + client_test.go | 4 + interface.go | 4 + internal/ackhandler/interfaces.go | 12 ++ internal/ackhandler/sent_packet_handler.go | 10 + .../mocks/ackhandler/sent_packet_handler.go | 14 ++ quictrace/README.md | 5 + quictrace/interface.go | 39 ++++ quictrace/tracer.go | 189 ++++++++++++++++++ server.go | 1 + server_test.go | 4 + session.go | 31 +++ 12 files changed, 314 insertions(+) create mode 100644 quictrace/README.md create mode 100644 quictrace/interface.go create mode 100644 quictrace/tracer.go diff --git a/client.go b/client.go index 5f1ea92c..63d52393 100644 --- a/client.go +++ b/client.go @@ -245,6 +245,7 @@ func populateClientConfig(config *Config, createdPacketConn bool) *Config { MaxIncomingUniStreams: maxIncomingUniStreams, KeepAlive: config.KeepAlive, StatelessResetKey: config.StatelessResetKey, + QuicTracer: config.QuicTracer, } } diff --git a/client_test.go b/client_test.go index 6c1ce156..d17f5066 100644 --- a/client_test.go +++ b/client_test.go @@ -14,6 +14,7 @@ import ( "github.com/lucas-clemente/quic-go/internal/protocol" "github.com/lucas-clemente/quic-go/internal/utils" "github.com/lucas-clemente/quic-go/internal/wire" + "github.com/lucas-clemente/quic-go/quictrace" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -398,6 +399,7 @@ var _ = Describe("Client", func() { Context("quic.Config", func() { It("setups with the right values", func() { + tracer := quictrace.NewTracer() config := &Config{ HandshakeTimeout: 1337 * time.Minute, IdleTimeout: 42 * time.Hour, @@ -405,6 +407,7 @@ var _ = Describe("Client", func() { MaxIncomingUniStreams: 4321, ConnectionIDLength: 13, StatelessResetKey: []byte("foobar"), + QuicTracer: tracer, } c := populateClientConfig(config, false) Expect(c.HandshakeTimeout).To(Equal(1337 * time.Minute)) @@ -413,6 +416,7 @@ var _ = Describe("Client", func() { Expect(c.MaxIncomingUniStreams).To(Equal(4321)) Expect(c.ConnectionIDLength).To(Equal(13)) Expect(c.StatelessResetKey).To(Equal([]byte("foobar"))) + Expect(c.QuicTracer).To(Equal(tracer)) }) It("errors when the Config contains an invalid version", func() { diff --git a/interface.go b/interface.go index 9a7e1fba..5362e330 100644 --- a/interface.go +++ b/interface.go @@ -8,6 +8,7 @@ import ( "time" "github.com/lucas-clemente/quic-go/internal/protocol" + "github.com/lucas-clemente/quic-go/quictrace" ) // The StreamID is the ID of a QUIC stream. @@ -219,6 +220,9 @@ type Config struct { StatelessResetKey []byte // KeepAlive defines whether this peer will periodically send a packet to keep the connection alive. KeepAlive bool + // QUIC Event Tracer. + // Warning: Experimental. This API should not be considered stable and will change soon. + QuicTracer quictrace.Tracer } // A Listener for incoming QUIC connections diff --git a/internal/ackhandler/interfaces.go b/internal/ackhandler/interfaces.go index b1abad0a..372c59a4 100644 --- a/internal/ackhandler/interfaces.go +++ b/internal/ackhandler/interfaces.go @@ -39,6 +39,9 @@ type SentPacketHandler interface { GetAlarmTimeout() time.Time OnAlarm() error + + // report some congestion statistics. For tracing only. + GetStats() *State } // ReceivedPacketHandler handles ACKs needed to send for incoming packets @@ -50,3 +53,12 @@ type ReceivedPacketHandler interface { GetAlarmTimeout() time.Time GetAckFrame(protocol.EncryptionLevel) *wire.AckFrame } + +type State struct { + MinRTT time.Duration + SmoothedRTT time.Duration + LatestRTT time.Duration + + BytesInFlight protocol.ByteCount + CongestionWindow protocol.ByteCount +} diff --git a/internal/ackhandler/sent_packet_handler.go b/internal/ackhandler/sent_packet_handler.go index 3570ff98..30842c41 100644 --- a/internal/ackhandler/sent_packet_handler.go +++ b/internal/ackhandler/sent_packet_handler.go @@ -670,3 +670,13 @@ func (h *sentPacketHandler) ResetForRetry() error { h.updateLossDetectionAlarm() return nil } + +func (h *sentPacketHandler) GetStats() *State { + return &State{ + MinRTT: h.rttStats.MinRTT(), + SmoothedRTT: h.rttStats.SmoothedOrInitialRTT(), + LatestRTT: h.rttStats.LatestRTT(), + BytesInFlight: h.bytesInFlight, + CongestionWindow: h.congestion.GetCongestionWindow(), + } +} diff --git a/internal/mocks/ackhandler/sent_packet_handler.go b/internal/mocks/ackhandler/sent_packet_handler.go index fab8ff08..34a5ee61 100644 --- a/internal/mocks/ackhandler/sent_packet_handler.go +++ b/internal/mocks/ackhandler/sent_packet_handler.go @@ -106,6 +106,20 @@ func (mr *MockSentPacketHandlerMockRecorder) GetLowestPacketNotConfirmedAcked() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLowestPacketNotConfirmedAcked", reflect.TypeOf((*MockSentPacketHandler)(nil).GetLowestPacketNotConfirmedAcked)) } +// GetStats mocks base method +func (m *MockSentPacketHandler) GetStats() *ackhandler.State { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetStats") + ret0, _ := ret[0].(*ackhandler.State) + return ret0 +} + +// GetStats indicates an expected call of GetStats +func (mr *MockSentPacketHandlerMockRecorder) GetStats() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStats", reflect.TypeOf((*MockSentPacketHandler)(nil).GetStats)) +} + // OnAlarm mocks base method func (m *MockSentPacketHandler) OnAlarm() error { m.ctrl.T.Helper() diff --git a/quictrace/README.md b/quictrace/README.md new file mode 100644 index 00000000..ec00c5d4 --- /dev/null +++ b/quictrace/README.md @@ -0,0 +1,5 @@ +# quic-trace Adapter + +This is an experimental implementation of the log format consumed by [quic-trace](https://github.com/google/quic-trace). + +At this moment, this package comes with no API stability whatsoever. diff --git a/quictrace/interface.go b/quictrace/interface.go new file mode 100644 index 00000000..3e71a443 --- /dev/null +++ b/quictrace/interface.go @@ -0,0 +1,39 @@ +package quictrace + +import ( + "time" + + "github.com/lucas-clemente/quic-go/internal/ackhandler" + "github.com/lucas-clemente/quic-go/internal/protocol" + "github.com/lucas-clemente/quic-go/internal/wire" +) + +// A Tracer traces a QUIC connection +type Tracer interface { + Trace(protocol.ConnectionID, Event) + GetAllTraces() map[string][]byte +} + +// EventType is the type of an event +type EventType uint8 + +const ( + // PacketSent means that a packet was sent + PacketSent EventType = 1 + iota + // PacketReceived means that a packet was received + PacketReceived + // PacketLost means that a packet was lost + PacketLost +) + +// Event is a quic-traceable event +type Event struct { + Time time.Time + EventType EventType + + TransportState *ackhandler.State + EncryptionLevel protocol.EncryptionLevel + PacketNumber protocol.PacketNumber + PacketSize protocol.ByteCount + Frames []wire.Frame +} diff --git a/quictrace/tracer.go b/quictrace/tracer.go new file mode 100644 index 00000000..96135951 --- /dev/null +++ b/quictrace/tracer.go @@ -0,0 +1,189 @@ +package quictrace + +import ( + "fmt" + "time" + + "github.com/golang/protobuf/proto" + + "github.com/lucas-clemente/quic-go/internal/ackhandler" + "github.com/lucas-clemente/quic-go/internal/protocol" + "github.com/lucas-clemente/quic-go/internal/wire" + "github.com/lucas-clemente/quic-go/quictrace/pb" +) + +type traceEvent struct { + connID protocol.ConnectionID + ev Event +} + +// A tracer is used to trace a QUIC connection +type tracer struct { + eventQueue chan traceEvent + + events map[string] /* conn ID */ []traceEvent +} + +var _ Tracer = &tracer{} + +// NewTracer creates a new Tracer +func NewTracer() Tracer { + qt := &tracer{ + eventQueue: make(chan traceEvent, 1<<10), + events: make(map[string][]traceEvent), + } + go qt.run() + return qt +} + +// Trace traces an event +func (t *tracer) Trace(connID protocol.ConnectionID, ev Event) { + t.eventQueue <- traceEvent{connID: connID, ev: ev} +} + +func (t *tracer) run() { + for tev := range t.eventQueue { + key := string(tev.connID) + if _, ok := t.events[key]; !ok { + t.events[key] = make([]traceEvent, 0, 10*1<<10) + } + t.events[key] = append(t.events[key], tev) + } +} + +func (t *tracer) GetAllTraces() map[string][]byte { + traces := make(map[string][]byte) + for connID := range t.events { + data, err := t.emitByConnIDAsString(connID) + if err != nil { + panic(err) + } + traces[connID] = data + } + return traces +} + +// Emit emits the serialized protobuf that will be consumed by quic-trace +func (t *tracer) Emit(connID protocol.ConnectionID) ([]byte, error) { + return t.emitByConnIDAsString(string(connID)) +} + +func (t *tracer) emitByConnIDAsString(connID string) ([]byte, error) { + events, ok := t.events[connID] + if !ok { + return nil, fmt.Errorf("No trace found for connection ID %s", connID) + } + trace := &pb.Trace{ + DestinationConnectionId: []byte{1, 2, 3, 4, 5, 6, 7, 8}, + SourceConnectionId: []byte{1, 2, 3, 4, 5, 6, 7, 8}, + ProtocolVersion: []byte{0xff, 0, 0, 19}, + Events: make([]*pb.Event, len(events)), + } + var startTime time.Time + for i, ev := range events { + event := ev.ev + if i == 0 { + startTime = event.Time + } + + packetNumber := uint64(event.PacketNumber) + packetSize := uint64(event.PacketSize) + + trace.Events[i] = &pb.Event{ + TimeUs: durationToUs(event.Time.Sub(startTime)), + EventType: getEventType(event.EventType), + PacketSize: &packetSize, + PacketNumber: &packetNumber, + TransportState: getTransportState(event.TransportState), + EncryptionLevel: getEncryptionLevel(event.EncryptionLevel), + Frames: getFrames(event.Frames), + } + } + delete(t.events, connID) + return proto.Marshal(trace) +} + +func getEventType(evType EventType) *pb.EventType { + var t pb.EventType + switch evType { + case PacketSent: + t = pb.EventType_PACKET_SENT + case PacketReceived: + t = pb.EventType_PACKET_RECEIVED + case PacketLost: + t = pb.EventType_PACKET_LOST + default: + panic("unknown event type") + } + return &t +} + +func getEncryptionLevel(encLevel protocol.EncryptionLevel) *pb.EncryptionLevel { + enc := pb.EncryptionLevel_ENCRYPTION_UNKNOWN + switch encLevel { + case protocol.EncryptionInitial: + enc = pb.EncryptionLevel_ENCRYPTION_INITIAL + case protocol.EncryptionHandshake: + enc = pb.EncryptionLevel_ENCRYPTION_HANDSHAKE + case protocol.Encryption1RTT: + enc = pb.EncryptionLevel_ENCRYPTION_1RTT + } + return &enc +} + +func getFrames(wframes []wire.Frame) []*pb.Frame { + streamFrameType := pb.FrameType_STREAM + ackFrameType := pb.FrameType_ACK + var frames []*pb.Frame + for _, frame := range wframes { + switch f := frame.(type) { + case *wire.StreamFrame: + streamID := uint64(f.StreamID) + offset := uint64(f.Offset) + length := uint64(f.DataLen()) + frames = append(frames, &pb.Frame{ + FrameType: &streamFrameType, + StreamFrameInfo: &pb.StreamFrameInfo{ + StreamId: &streamID, + Offset: &offset, + Length: &length, + }, + }) + case *wire.AckFrame: + var ackedPackets []*pb.AckBlock + for _, ackBlock := range f.AckRanges { + firstPacket := uint64(ackBlock.Smallest) + lastPacket := uint64(ackBlock.Largest) + ackedPackets = append(ackedPackets, &pb.AckBlock{ + FirstPacket: &firstPacket, + LastPacket: &lastPacket, + }) + } + frames = append(frames, &pb.Frame{ + FrameType: &ackFrameType, + AckInfo: &pb.AckInfo{ + AckDelayUs: durationToUs(f.DelayTime), + AckedPackets: ackedPackets, + }, + }) + } + } + return frames +} + +func getTransportState(state *ackhandler.State) *pb.TransportState { + bytesInFlight := uint64(state.BytesInFlight) + congestionWindow := uint64(state.CongestionWindow) + return &pb.TransportState{ + MinRttUs: durationToUs(state.MinRTT), + SmoothedRttUs: durationToUs(state.SmoothedRTT), + LastRttUs: durationToUs(state.LatestRTT), + InFlightBytes: &bytesInFlight, + CwndBytes: &congestionWindow, + } +} + +func durationToUs(d time.Duration) *uint64 { + dur := uint64(d / 1000) + return &dur +} diff --git a/server.go b/server.go index f7cc198d..c080d063 100644 --- a/server.go +++ b/server.go @@ -279,6 +279,7 @@ func populateServerConfig(config *Config) *Config { MaxIncomingUniStreams: maxIncomingUniStreams, ConnectionIDLength: connIDLen, StatelessResetKey: config.StatelessResetKey, + QuicTracer: config.QuicTracer, } } diff --git a/server_test.go b/server_test.go index a8a26da5..008d7dcc 100644 --- a/server_test.go +++ b/server_test.go @@ -15,6 +15,7 @@ import ( "github.com/lucas-clemente/quic-go/internal/testdata" "github.com/lucas-clemente/quic-go/internal/utils" "github.com/lucas-clemente/quic-go/internal/wire" + "github.com/lucas-clemente/quic-go/quictrace" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -86,6 +87,7 @@ var _ = Describe("Server", func() { It("setups with the right values", func() { supportedVersions := []protocol.VersionNumber{protocol.VersionTLS} acceptToken := func(_ net.Addr, _ *Token) bool { return true } + tracer := quictrace.NewTracer() config := Config{ Versions: supportedVersions, AcceptToken: acceptToken, @@ -93,6 +95,7 @@ var _ = Describe("Server", func() { IdleTimeout: 42 * time.Minute, KeepAlive: true, StatelessResetKey: []byte("foobar"), + QuicTracer: tracer, } ln, err := Listen(conn, tlsConf, &config) Expect(err).ToNot(HaveOccurred()) @@ -104,6 +107,7 @@ var _ = Describe("Server", func() { Expect(reflect.ValueOf(server.config.AcceptToken)).To(Equal(reflect.ValueOf(acceptToken))) Expect(server.config.KeepAlive).To(BeTrue()) Expect(server.config.StatelessResetKey).To(Equal([]byte("foobar"))) + Expect(server.config.QuicTracer).To(Equal(tracer)) // stop the listener Expect(ln.Close()).To(Succeed()) }) diff --git a/session.go b/session.go index 8d7bdcbf..347c1597 100644 --- a/session.go +++ b/session.go @@ -20,6 +20,7 @@ import ( "github.com/lucas-clemente/quic-go/internal/qerr" "github.com/lucas-clemente/quic-go/internal/utils" "github.com/lucas-clemente/quic-go/internal/wire" + "github.com/lucas-clemente/quic-go/quictrace" ) type unpacker interface { @@ -665,6 +666,10 @@ func (s *session) handleUnpackedPacket(packet *unpackedPacket, rcvTime time.Time s.firstAckElicitingPacketAfterIdleSentTime = time.Time{} s.keepAlivePingSent = false + // Only used for tracing. + // If we're not tracing, this slice will always remain empty. + var frames []wire.Frame + r := bytes.NewReader(packet.data) var isAckEliciting bool for { @@ -678,11 +683,26 @@ func (s *session) handleUnpackedPacket(packet *unpackedPacket, rcvTime time.Time if ackhandler.IsFrameAckEliciting(frame) { isAckEliciting = true } + if s.config.QuicTracer != nil { + frames = append(frames, frame) + } if err := s.handleFrame(frame, packet.packetNumber, packet.encryptionLevel); err != nil { return err } } + if s.config.QuicTracer != nil { + s.config.QuicTracer.Trace(s.origDestConnID, quictrace.Event{ + Time: time.Now(), + EventType: quictrace.PacketReceived, + TransportState: s.sentPacketHandler.GetStats(), + EncryptionLevel: packet.encryptionLevel, + PacketNumber: packet.packetNumber, + PacketSize: protocol.ByteCount(len(packet.data)), + Frames: frames, + }) + } + if err := s.receivedPacketHandler.ReceivedPacket(packet.packetNumber, packet.encryptionLevel, rcvTime, isAckEliciting); err != nil { return err } @@ -1153,6 +1173,17 @@ func (s *session) sendPackedPacket(packet *packedPacket) error { if s.firstAckElicitingPacketAfterIdleSentTime.IsZero() && packet.IsAckEliciting() { s.firstAckElicitingPacketAfterIdleSentTime = time.Now() } + if s.config.QuicTracer != nil { + s.config.QuicTracer.Trace(s.origDestConnID, quictrace.Event{ + Time: time.Now(), + EventType: quictrace.PacketSent, + TransportState: s.sentPacketHandler.GetStats(), + EncryptionLevel: packet.EncryptionLevel(), + PacketNumber: packet.header.PacketNumber, + PacketSize: protocol.ByteCount(len(packet.raw)), + Frames: packet.frames, + }) + } s.logPacket(packet) return s.conn.Write(packet.raw) }