mirror of
https://github.com/refraction-networking/uquic.git
synced 2025-04-03 20:27:35 +03:00
qlog: disentangle the ConnectionTracer from the qlog writer (#4300)
The qlog writer simply records events, puts them into a channel, and consumes these events in a separate Go routine (by serializing them). The ConnectionTracer is the one generating those events.
This commit is contained in:
parent
0344401de5
commit
225d2a3926
4 changed files with 915 additions and 880 deletions
|
@ -1,10 +1,7 @@
|
|||
package qlog
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
|
@ -16,32 +13,28 @@ import (
|
|||
"github.com/francoispqt/gojay"
|
||||
)
|
||||
|
||||
const eventChanSize = 50
|
||||
|
||||
type connectionTracer struct {
|
||||
w io.WriteCloser
|
||||
odcid protocol.ConnectionID
|
||||
perspective protocol.Perspective
|
||||
referenceTime time.Time
|
||||
|
||||
events chan event
|
||||
encodeErr error
|
||||
runStopped chan struct{}
|
||||
|
||||
w writer
|
||||
lastMetrics *metrics
|
||||
|
||||
perspective logging.Perspective
|
||||
}
|
||||
|
||||
// NewConnectionTracer creates a new tracer to record a qlog for a connection.
|
||||
func NewConnectionTracer(w io.WriteCloser, p protocol.Perspective, odcid protocol.ConnectionID) *logging.ConnectionTracer {
|
||||
t := connectionTracer{
|
||||
w: w,
|
||||
perspective: p,
|
||||
odcid: odcid,
|
||||
runStopped: make(chan struct{}),
|
||||
events: make(chan event, eventChanSize),
|
||||
referenceTime: time.Now(),
|
||||
func NewConnectionTracer(w io.WriteCloser, p logging.Perspective, odcid protocol.ConnectionID) *logging.ConnectionTracer {
|
||||
tr := &trace{
|
||||
VantagePoint: vantagePoint{Type: p},
|
||||
CommonFields: commonFields{
|
||||
ODCID: odcid,
|
||||
GroupID: odcid,
|
||||
ReferenceTime: time.Now(),
|
||||
},
|
||||
}
|
||||
go t.run()
|
||||
t := connectionTracer{
|
||||
w: *newWriter(w, tr),
|
||||
perspective: p,
|
||||
}
|
||||
go t.w.Run()
|
||||
return &logging.ConnectionTracer{
|
||||
StartedConnection: func(local, remote net.Addr, srcConnID, destConnID logging.ConnectionID) {
|
||||
t.StartedConnection(local, remote, srcConnID, destConnID)
|
||||
|
@ -125,65 +118,12 @@ func NewConnectionTracer(w io.WriteCloser, p protocol.Perspective, odcid protoco
|
|||
}
|
||||
}
|
||||
|
||||
func (t *connectionTracer) run() {
|
||||
defer close(t.runStopped)
|
||||
buf := &bytes.Buffer{}
|
||||
enc := gojay.NewEncoder(buf)
|
||||
tl := &topLevel{
|
||||
trace: trace{
|
||||
VantagePoint: vantagePoint{Type: t.perspective},
|
||||
CommonFields: commonFields{
|
||||
ODCID: t.odcid,
|
||||
GroupID: t.odcid,
|
||||
ReferenceTime: t.referenceTime,
|
||||
},
|
||||
},
|
||||
}
|
||||
if err := enc.Encode(tl); err != nil {
|
||||
panic(fmt.Sprintf("qlog encoding into a bytes.Buffer failed: %s", err))
|
||||
}
|
||||
if err := buf.WriteByte('\n'); err != nil {
|
||||
panic(fmt.Sprintf("qlog encoding into a bytes.Buffer failed: %s", err))
|
||||
}
|
||||
if _, err := t.w.Write(buf.Bytes()); err != nil {
|
||||
t.encodeErr = err
|
||||
}
|
||||
enc = gojay.NewEncoder(t.w)
|
||||
for ev := range t.events {
|
||||
if t.encodeErr != nil { // if encoding failed, just continue draining the event channel
|
||||
continue
|
||||
}
|
||||
if err := enc.Encode(ev); err != nil {
|
||||
t.encodeErr = err
|
||||
continue
|
||||
}
|
||||
if _, err := t.w.Write([]byte{'\n'}); err != nil {
|
||||
t.encodeErr = err
|
||||
}
|
||||
}
|
||||
func (t *connectionTracer) recordEvent(eventTime time.Time, details eventDetails) {
|
||||
t.w.RecordEvent(eventTime, details)
|
||||
}
|
||||
|
||||
func (t *connectionTracer) Close() {
|
||||
if err := t.export(); err != nil {
|
||||
log.Printf("exporting qlog failed: %s\n", err)
|
||||
}
|
||||
}
|
||||
|
||||
// export writes a qlog.
|
||||
func (t *connectionTracer) export() error {
|
||||
close(t.events)
|
||||
<-t.runStopped
|
||||
if t.encodeErr != nil {
|
||||
return t.encodeErr
|
||||
}
|
||||
return t.w.Close()
|
||||
}
|
||||
|
||||
func (t *connectionTracer) recordEvent(eventTime time.Time, details eventDetails) {
|
||||
t.events <- event{
|
||||
RelativeTime: eventTime.Sub(t.referenceTime),
|
||||
eventDetails: details,
|
||||
}
|
||||
t.w.Close()
|
||||
}
|
||||
|
||||
func (t *connectionTracer) StartedConnection(local, remote net.Addr, srcConnID, destConnID protocol.ConnectionID) {
|
||||
|
|
File diff suppressed because it is too large
Load diff
84
qlog/writer.go
Normal file
84
qlog/writer.go
Normal file
|
@ -0,0 +1,84 @@
|
|||
package qlog
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/francoispqt/gojay"
|
||||
)
|
||||
|
||||
const eventChanSize = 50
|
||||
|
||||
type writer struct {
|
||||
w io.WriteCloser
|
||||
|
||||
referenceTime time.Time
|
||||
tr *trace
|
||||
|
||||
events chan event
|
||||
encodeErr error
|
||||
runStopped chan struct{}
|
||||
}
|
||||
|
||||
func newWriter(w io.WriteCloser, tr *trace) *writer {
|
||||
return &writer{
|
||||
w: w,
|
||||
tr: tr,
|
||||
referenceTime: tr.CommonFields.ReferenceTime,
|
||||
runStopped: make(chan struct{}),
|
||||
events: make(chan event, eventChanSize),
|
||||
}
|
||||
}
|
||||
|
||||
func (w *writer) RecordEvent(eventTime time.Time, details eventDetails) {
|
||||
w.events <- event{
|
||||
RelativeTime: eventTime.Sub(w.referenceTime),
|
||||
eventDetails: details,
|
||||
}
|
||||
}
|
||||
|
||||
func (w *writer) Run() {
|
||||
defer close(w.runStopped)
|
||||
buf := &bytes.Buffer{}
|
||||
enc := gojay.NewEncoder(buf)
|
||||
if err := enc.Encode(&topLevel{trace: *w.tr}); err != nil {
|
||||
panic(fmt.Sprintf("qlog encoding into a bytes.Buffer failed: %s", err))
|
||||
}
|
||||
if err := buf.WriteByte('\n'); err != nil {
|
||||
panic(fmt.Sprintf("qlog encoding into a bytes.Buffer failed: %s", err))
|
||||
}
|
||||
if _, err := w.w.Write(buf.Bytes()); err != nil {
|
||||
w.encodeErr = err
|
||||
}
|
||||
enc = gojay.NewEncoder(w.w)
|
||||
for ev := range w.events {
|
||||
if w.encodeErr != nil { // if encoding failed, just continue draining the event channel
|
||||
continue
|
||||
}
|
||||
if err := enc.Encode(ev); err != nil {
|
||||
w.encodeErr = err
|
||||
continue
|
||||
}
|
||||
if _, err := w.w.Write([]byte{'\n'}); err != nil {
|
||||
w.encodeErr = err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (w *writer) Close() {
|
||||
if err := w.close(); err != nil {
|
||||
log.Printf("exporting qlog failed: %s\n", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (w *writer) close() error {
|
||||
close(w.events)
|
||||
<-w.runStopped
|
||||
if w.encodeErr != nil {
|
||||
return w.encodeErr
|
||||
}
|
||||
return w.w.Close()
|
||||
}
|
49
qlog/writer_test.go
Normal file
49
qlog/writer_test.go
Normal file
|
@ -0,0 +1,49 @@
|
|||
package qlog
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
|
||||
"github.com/quic-go/quic-go/internal/protocol"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
type limitedWriter struct {
|
||||
io.WriteCloser
|
||||
N int
|
||||
written int
|
||||
}
|
||||
|
||||
func (w *limitedWriter) Write(p []byte) (int, error) {
|
||||
if w.written+len(p) > w.N {
|
||||
return 0, errors.New("writer full")
|
||||
}
|
||||
n, err := w.WriteCloser.Write(p)
|
||||
w.written += n
|
||||
return n, err
|
||||
}
|
||||
|
||||
var _ = Describe("Writing", func() {
|
||||
It("stops writing when encountering an error", func() {
|
||||
buf := &bytes.Buffer{}
|
||||
t := NewConnectionTracer(
|
||||
&limitedWriter{WriteCloser: nopWriteCloser(buf), N: 250},
|
||||
protocol.PerspectiveServer,
|
||||
protocol.ParseConnectionID([]byte{0xde, 0xad, 0xbe, 0xef}),
|
||||
)
|
||||
for i := uint32(0); i < 1000; i++ {
|
||||
t.UpdatedPTOCount(i)
|
||||
}
|
||||
|
||||
b := &bytes.Buffer{}
|
||||
log.SetOutput(b)
|
||||
defer log.SetOutput(os.Stdout)
|
||||
t.Close()
|
||||
Expect(b.String()).To(ContainSubstring("writer full"))
|
||||
})
|
||||
})
|
Loading…
Add table
Add a link
Reference in a new issue