qlog: remove unneeded mutex from the ConnectionTracer (#4299)

Events are appended to a channel, which is able to handle concurrect writes.
This commit is contained in:
Marten Seemann 2024-02-02 06:57:07 +07:00 committed by GitHub
parent 2fbe713bb6
commit f3c4be6b01
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -7,7 +7,6 @@ import (
"log" "log"
"net" "net"
"runtime/debug" "runtime/debug"
"sync"
"time" "time"
"github.com/quic-go/quic-go/internal/protocol" "github.com/quic-go/quic-go/internal/protocol"
@ -49,8 +48,6 @@ func init() {
const eventChanSize = 50 const eventChanSize = 50
type connectionTracer struct { type connectionTracer struct {
mutex sync.Mutex
w io.WriteCloser w io.WriteCloser
odcid protocol.ConnectionID odcid protocol.ConnectionID
perspective protocol.Perspective perspective protocol.Perspective
@ -146,9 +143,7 @@ func NewConnectionTracer(w io.WriteCloser, p protocol.Perspective, odcid protoco
t.ECNStateUpdated(state, trigger) t.ECNStateUpdated(state, trigger)
}, },
ChoseALPN: func(protocol string) { ChoseALPN: func(protocol string) {
t.mutex.Lock()
t.recordEvent(time.Now(), eventALPNInformation{chosenALPN: protocol}) t.recordEvent(time.Now(), eventALPNInformation{chosenALPN: protocol})
t.mutex.Unlock()
}, },
Debug: func(name, msg string) { Debug: func(name, msg string) {
t.Debug(name, msg) t.Debug(name, msg)
@ -230,14 +225,12 @@ func (t *connectionTracer) StartedConnection(local, remote net.Addr, srcConnID,
if !ok { if !ok {
return return
} }
t.mutex.Lock()
t.recordEvent(time.Now(), &eventConnectionStarted{ t.recordEvent(time.Now(), &eventConnectionStarted{
SrcAddr: localAddr, SrcAddr: localAddr,
DestAddr: remoteAddr, DestAddr: remoteAddr,
SrcConnectionID: srcConnID, SrcConnectionID: srcConnID,
DestConnectionID: destConnID, DestConnectionID: destConnID,
}) })
t.mutex.Unlock()
} }
func (t *connectionTracer) NegotiatedVersion(chosen logging.VersionNumber, client, server []logging.VersionNumber) { func (t *connectionTracer) NegotiatedVersion(chosen logging.VersionNumber, client, server []logging.VersionNumber) {
@ -254,19 +247,15 @@ func (t *connectionTracer) NegotiatedVersion(chosen logging.VersionNumber, clien
serverVersions[i] = versionNumber(v) serverVersions[i] = versionNumber(v)
} }
} }
t.mutex.Lock()
t.recordEvent(time.Now(), &eventVersionNegotiated{ t.recordEvent(time.Now(), &eventVersionNegotiated{
clientVersions: clientVersions, clientVersions: clientVersions,
serverVersions: serverVersions, serverVersions: serverVersions,
chosenVersion: versionNumber(chosen), chosenVersion: versionNumber(chosen),
}) })
t.mutex.Unlock()
} }
func (t *connectionTracer) ClosedConnection(e error) { func (t *connectionTracer) ClosedConnection(e error) {
t.mutex.Lock()
t.recordEvent(time.Now(), &eventConnectionClosed{e: e}) t.recordEvent(time.Now(), &eventConnectionClosed{e: e})
t.mutex.Unlock()
} }
func (t *connectionTracer) SentTransportParameters(tp *wire.TransportParameters) { func (t *connectionTracer) SentTransportParameters(tp *wire.TransportParameters) {
@ -281,9 +270,7 @@ func (t *connectionTracer) RestoredTransportParameters(tp *wire.TransportParamet
ev := t.toTransportParameters(tp) ev := t.toTransportParameters(tp)
ev.Restore = true ev.Restore = true
t.mutex.Lock()
t.recordEvent(time.Now(), ev) t.recordEvent(time.Now(), ev)
t.mutex.Unlock()
} }
func (t *connectionTracer) recordTransportParameters(sentBy protocol.Perspective, tp *wire.TransportParameters) { func (t *connectionTracer) recordTransportParameters(sentBy protocol.Perspective, tp *wire.TransportParameters) {
@ -294,9 +281,7 @@ func (t *connectionTracer) recordTransportParameters(sentBy protocol.Perspective
} }
ev.SentBy = sentBy ev.SentBy = sentBy
t.mutex.Lock()
t.recordEvent(time.Now(), ev) t.recordEvent(time.Now(), ev)
t.mutex.Unlock()
} }
func (t *connectionTracer) toTransportParameters(tp *wire.TransportParameters) *eventTransportParameters { func (t *connectionTracer) toTransportParameters(tp *wire.TransportParameters) *eventTransportParameters {
@ -369,7 +354,6 @@ func (t *connectionTracer) sentPacket(
for _, f := range frames { for _, f := range frames {
fs = append(fs, frame{Frame: f}) fs = append(fs, frame{Frame: f})
} }
t.mutex.Lock()
t.recordEvent(time.Now(), &eventPacketSent{ t.recordEvent(time.Now(), &eventPacketSent{
Header: hdr, Header: hdr,
Length: size, Length: size,
@ -377,7 +361,6 @@ func (t *connectionTracer) sentPacket(
ECN: ecn, ECN: ecn,
Frames: fs, Frames: fs,
}) })
t.mutex.Unlock()
} }
func (t *connectionTracer) ReceivedLongHeaderPacket(hdr *logging.ExtendedHeader, size logging.ByteCount, ecn logging.ECN, frames []logging.Frame) { func (t *connectionTracer) ReceivedLongHeaderPacket(hdr *logging.ExtendedHeader, size logging.ByteCount, ecn logging.ECN, frames []logging.Frame) {
@ -386,7 +369,6 @@ func (t *connectionTracer) ReceivedLongHeaderPacket(hdr *logging.ExtendedHeader,
fs[i] = frame{Frame: f} fs[i] = frame{Frame: f}
} }
header := *transformLongHeader(hdr) header := *transformLongHeader(hdr)
t.mutex.Lock()
t.recordEvent(time.Now(), &eventPacketReceived{ t.recordEvent(time.Now(), &eventPacketReceived{
Header: header, Header: header,
Length: size, Length: size,
@ -394,7 +376,6 @@ func (t *connectionTracer) ReceivedLongHeaderPacket(hdr *logging.ExtendedHeader,
ECN: ecn, ECN: ecn,
Frames: fs, Frames: fs,
}) })
t.mutex.Unlock()
} }
func (t *connectionTracer) ReceivedShortHeaderPacket(hdr *logging.ShortHeader, size logging.ByteCount, ecn logging.ECN, frames []logging.Frame) { func (t *connectionTracer) ReceivedShortHeaderPacket(hdr *logging.ShortHeader, size logging.ByteCount, ecn logging.ECN, frames []logging.Frame) {
@ -403,7 +384,6 @@ func (t *connectionTracer) ReceivedShortHeaderPacket(hdr *logging.ShortHeader, s
fs[i] = frame{Frame: f} fs[i] = frame{Frame: f}
} }
header := *transformShortHeader(hdr) header := *transformShortHeader(hdr)
t.mutex.Lock()
t.recordEvent(time.Now(), &eventPacketReceived{ t.recordEvent(time.Now(), &eventPacketReceived{
Header: header, Header: header,
Length: size, Length: size,
@ -411,15 +391,12 @@ func (t *connectionTracer) ReceivedShortHeaderPacket(hdr *logging.ShortHeader, s
ECN: ecn, ECN: ecn,
Frames: fs, Frames: fs,
}) })
t.mutex.Unlock()
} }
func (t *connectionTracer) ReceivedRetry(hdr *wire.Header) { func (t *connectionTracer) ReceivedRetry(hdr *wire.Header) {
t.mutex.Lock()
t.recordEvent(time.Now(), &eventRetryReceived{ t.recordEvent(time.Now(), &eventRetryReceived{
Header: *transformHeader(hdr), Header: *transformHeader(hdr),
}) })
t.mutex.Unlock()
} }
func (t *connectionTracer) ReceivedVersionNegotiationPacket(dest, src logging.ArbitraryLenConnectionID, versions []logging.VersionNumber) { func (t *connectionTracer) ReceivedVersionNegotiationPacket(dest, src logging.ArbitraryLenConnectionID, versions []logging.VersionNumber) {
@ -427,7 +404,6 @@ func (t *connectionTracer) ReceivedVersionNegotiationPacket(dest, src logging.Ar
for i, v := range versions { for i, v := range versions {
ver[i] = versionNumber(v) ver[i] = versionNumber(v)
} }
t.mutex.Lock()
t.recordEvent(time.Now(), &eventVersionNegotiationReceived{ t.recordEvent(time.Now(), &eventVersionNegotiationReceived{
Header: packetHeaderVersionNegotiation{ Header: packetHeaderVersionNegotiation{
SrcConnectionID: src, SrcConnectionID: src,
@ -435,27 +411,22 @@ func (t *connectionTracer) ReceivedVersionNegotiationPacket(dest, src logging.Ar
}, },
SupportedVersions: ver, SupportedVersions: ver,
}) })
t.mutex.Unlock()
} }
func (t *connectionTracer) BufferedPacket(pt logging.PacketType, size protocol.ByteCount) { func (t *connectionTracer) BufferedPacket(pt logging.PacketType, size protocol.ByteCount) {
t.mutex.Lock()
t.recordEvent(time.Now(), &eventPacketBuffered{ t.recordEvent(time.Now(), &eventPacketBuffered{
PacketType: pt, PacketType: pt,
PacketSize: size, PacketSize: size,
}) })
t.mutex.Unlock()
} }
func (t *connectionTracer) DroppedPacket(pt logging.PacketType, pn logging.PacketNumber, size protocol.ByteCount, reason logging.PacketDropReason) { func (t *connectionTracer) DroppedPacket(pt logging.PacketType, pn logging.PacketNumber, size protocol.ByteCount, reason logging.PacketDropReason) {
t.mutex.Lock()
t.recordEvent(time.Now(), &eventPacketDropped{ t.recordEvent(time.Now(), &eventPacketDropped{
PacketType: pt, PacketType: pt,
PacketNumber: pn, PacketNumber: pn,
PacketSize: size, PacketSize: size,
Trigger: packetDropReason(reason), Trigger: packetDropReason(reason),
}) })
t.mutex.Unlock()
} }
func (t *connectionTracer) UpdatedMetrics(rttStats *utils.RTTStats, cwnd, bytesInFlight protocol.ByteCount, packetsInFlight int) { func (t *connectionTracer) UpdatedMetrics(rttStats *utils.RTTStats, cwnd, bytesInFlight protocol.ByteCount, packetsInFlight int) {
@ -468,46 +439,36 @@ func (t *connectionTracer) UpdatedMetrics(rttStats *utils.RTTStats, cwnd, bytesI
BytesInFlight: bytesInFlight, BytesInFlight: bytesInFlight,
PacketsInFlight: packetsInFlight, PacketsInFlight: packetsInFlight,
} }
t.mutex.Lock()
t.recordEvent(time.Now(), &eventMetricsUpdated{ t.recordEvent(time.Now(), &eventMetricsUpdated{
Last: t.lastMetrics, Last: t.lastMetrics,
Current: m, Current: m,
}) })
t.lastMetrics = m t.lastMetrics = m
t.mutex.Unlock()
} }
func (t *connectionTracer) AcknowledgedPacket(protocol.EncryptionLevel, protocol.PacketNumber) {} func (t *connectionTracer) AcknowledgedPacket(protocol.EncryptionLevel, protocol.PacketNumber) {}
func (t *connectionTracer) LostPacket(encLevel protocol.EncryptionLevel, pn protocol.PacketNumber, lossReason logging.PacketLossReason) { func (t *connectionTracer) LostPacket(encLevel protocol.EncryptionLevel, pn protocol.PacketNumber, lossReason logging.PacketLossReason) {
t.mutex.Lock()
t.recordEvent(time.Now(), &eventPacketLost{ t.recordEvent(time.Now(), &eventPacketLost{
PacketType: getPacketTypeFromEncryptionLevel(encLevel), PacketType: getPacketTypeFromEncryptionLevel(encLevel),
PacketNumber: pn, PacketNumber: pn,
Trigger: packetLossReason(lossReason), Trigger: packetLossReason(lossReason),
}) })
t.mutex.Unlock()
} }
func (t *connectionTracer) UpdatedCongestionState(state logging.CongestionState) { func (t *connectionTracer) UpdatedCongestionState(state logging.CongestionState) {
t.mutex.Lock()
t.recordEvent(time.Now(), &eventCongestionStateUpdated{state: congestionState(state)}) t.recordEvent(time.Now(), &eventCongestionStateUpdated{state: congestionState(state)})
t.mutex.Unlock()
} }
func (t *connectionTracer) UpdatedPTOCount(value uint32) { func (t *connectionTracer) UpdatedPTOCount(value uint32) {
t.mutex.Lock()
t.recordEvent(time.Now(), &eventUpdatedPTO{Value: value}) t.recordEvent(time.Now(), &eventUpdatedPTO{Value: value})
t.mutex.Unlock()
} }
func (t *connectionTracer) UpdatedKeyFromTLS(encLevel protocol.EncryptionLevel, pers protocol.Perspective) { func (t *connectionTracer) UpdatedKeyFromTLS(encLevel protocol.EncryptionLevel, pers protocol.Perspective) {
t.mutex.Lock()
t.recordEvent(time.Now(), &eventKeyUpdated{ t.recordEvent(time.Now(), &eventKeyUpdated{
Trigger: keyUpdateTLS, Trigger: keyUpdateTLS,
KeyType: encLevelToKeyType(encLevel, pers), KeyType: encLevelToKeyType(encLevel, pers),
}) })
t.mutex.Unlock()
} }
func (t *connectionTracer) UpdatedKey(generation protocol.KeyPhase, remote bool) { func (t *connectionTracer) UpdatedKey(generation protocol.KeyPhase, remote bool) {
@ -515,7 +476,6 @@ func (t *connectionTracer) UpdatedKey(generation protocol.KeyPhase, remote bool)
if remote { if remote {
trigger = keyUpdateRemote trigger = keyUpdateRemote
} }
t.mutex.Lock()
now := time.Now() now := time.Now()
t.recordEvent(now, &eventKeyUpdated{ t.recordEvent(now, &eventKeyUpdated{
Trigger: trigger, Trigger: trigger,
@ -527,11 +487,9 @@ func (t *connectionTracer) UpdatedKey(generation protocol.KeyPhase, remote bool)
KeyType: keyTypeServer1RTT, KeyType: keyTypeServer1RTT,
Generation: generation, Generation: generation,
}) })
t.mutex.Unlock()
} }
func (t *connectionTracer) DroppedEncryptionLevel(encLevel protocol.EncryptionLevel) { func (t *connectionTracer) DroppedEncryptionLevel(encLevel protocol.EncryptionLevel) {
t.mutex.Lock()
now := time.Now() now := time.Now()
if encLevel == protocol.Encryption0RTT { if encLevel == protocol.Encryption0RTT {
t.recordEvent(now, &eventKeyDiscarded{KeyType: encLevelToKeyType(encLevel, t.perspective)}) t.recordEvent(now, &eventKeyDiscarded{KeyType: encLevelToKeyType(encLevel, t.perspective)})
@ -539,11 +497,9 @@ func (t *connectionTracer) DroppedEncryptionLevel(encLevel protocol.EncryptionLe
t.recordEvent(now, &eventKeyDiscarded{KeyType: encLevelToKeyType(encLevel, protocol.PerspectiveServer)}) t.recordEvent(now, &eventKeyDiscarded{KeyType: encLevelToKeyType(encLevel, protocol.PerspectiveServer)})
t.recordEvent(now, &eventKeyDiscarded{KeyType: encLevelToKeyType(encLevel, protocol.PerspectiveClient)}) t.recordEvent(now, &eventKeyDiscarded{KeyType: encLevelToKeyType(encLevel, protocol.PerspectiveClient)})
} }
t.mutex.Unlock()
} }
func (t *connectionTracer) DroppedKey(generation protocol.KeyPhase) { func (t *connectionTracer) DroppedKey(generation protocol.KeyPhase) {
t.mutex.Lock()
now := time.Now() now := time.Now()
t.recordEvent(now, &eventKeyDiscarded{ t.recordEvent(now, &eventKeyDiscarded{
KeyType: encLevelToKeyType(protocol.Encryption1RTT, protocol.PerspectiveServer), KeyType: encLevelToKeyType(protocol.Encryption1RTT, protocol.PerspectiveServer),
@ -553,46 +509,35 @@ func (t *connectionTracer) DroppedKey(generation protocol.KeyPhase) {
KeyType: encLevelToKeyType(protocol.Encryption1RTT, protocol.PerspectiveClient), KeyType: encLevelToKeyType(protocol.Encryption1RTT, protocol.PerspectiveClient),
Generation: generation, Generation: generation,
}) })
t.mutex.Unlock()
} }
func (t *connectionTracer) SetLossTimer(tt logging.TimerType, encLevel protocol.EncryptionLevel, timeout time.Time) { func (t *connectionTracer) SetLossTimer(tt logging.TimerType, encLevel protocol.EncryptionLevel, timeout time.Time) {
t.mutex.Lock()
now := time.Now() now := time.Now()
t.recordEvent(now, &eventLossTimerSet{ t.recordEvent(now, &eventLossTimerSet{
TimerType: timerType(tt), TimerType: timerType(tt),
EncLevel: encLevel, EncLevel: encLevel,
Delta: timeout.Sub(now), Delta: timeout.Sub(now),
}) })
t.mutex.Unlock()
} }
func (t *connectionTracer) LossTimerExpired(tt logging.TimerType, encLevel protocol.EncryptionLevel) { func (t *connectionTracer) LossTimerExpired(tt logging.TimerType, encLevel protocol.EncryptionLevel) {
t.mutex.Lock()
t.recordEvent(time.Now(), &eventLossTimerExpired{ t.recordEvent(time.Now(), &eventLossTimerExpired{
TimerType: timerType(tt), TimerType: timerType(tt),
EncLevel: encLevel, EncLevel: encLevel,
}) })
t.mutex.Unlock()
} }
func (t *connectionTracer) LossTimerCanceled() { func (t *connectionTracer) LossTimerCanceled() {
t.mutex.Lock()
t.recordEvent(time.Now(), &eventLossTimerCanceled{}) t.recordEvent(time.Now(), &eventLossTimerCanceled{})
t.mutex.Unlock()
} }
func (t *connectionTracer) ECNStateUpdated(state logging.ECNState, trigger logging.ECNStateTrigger) { func (t *connectionTracer) ECNStateUpdated(state logging.ECNState, trigger logging.ECNStateTrigger) {
t.mutex.Lock()
t.recordEvent(time.Now(), &eventECNStateUpdated{state: state, trigger: trigger}) t.recordEvent(time.Now(), &eventECNStateUpdated{state: state, trigger: trigger})
t.mutex.Unlock()
} }
func (t *connectionTracer) Debug(name, msg string) { func (t *connectionTracer) Debug(name, msg string) {
t.mutex.Lock()
t.recordEvent(time.Now(), &eventGeneric{ t.recordEvent(time.Now(), &eventGeneric{
name: name, name: name,
msg: msg, msg: msg,
}) })
t.mutex.Unlock()
} }