refactor connection timer logic (#4927)

This commit is contained in:
Marten Seemann 2025-01-28 05:08:05 +01:00 committed by GitHub
parent 10a541bfc0
commit 6b9921bbfc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 59 additions and 46 deletions

View file

@ -544,10 +544,15 @@ runLoop:
default:
}
s.maybeResetTimer()
// no need to set a timer if we can send packets immediately
if s.pacingDeadline != deadlineSendImmediately {
s.maybeResetTimer()
}
var processedUndecryptablePacket bool
// 1st: handle undecryptable packets, if any.
// This can only occur before completion of the handshake.
if len(s.undecryptablePacketsToProcess) > 0 {
var processedUndecryptablePacket bool
queue := s.undecryptablePacketsToProcess
s.undecryptablePacketsToProcess = nil
for _, p := range queue {
@ -560,19 +565,36 @@ runLoop:
processedUndecryptablePacket = true
}
}
if processedUndecryptablePacket {
// if we processed any undecryptable packets, jump to the resetting of the timers directly
continue
}
}
// If we processed any undecryptable packets, jump to the resetting of the timers directly.
if !processedUndecryptablePacket {
// 2nd: receive packets.
processed, err := s.handlePackets() // don't check receivedPackets.Len() in the run loop to avoid locking the mutex
if err != nil {
s.setCloseError(&closeError{err: err})
break runLoop
}
// We don't need to wait for new events if:
// * we processed packets: we probably need to send an ACK, and potentially more data
// * the pacer allows us to send more packets immediately
shouldProceedImmediately := sendQueueAvailable == nil && (processed || s.pacingDeadline == deadlineSendImmediately)
if !shouldProceedImmediately {
// 3rd: wait for something to happen:
// * closing of the connection
// * timer firing
// * sending scheduled
// * send queue available
// * received packets
select {
case <-s.closeChan:
break runLoop
case <-s.timer.Chan():
s.timer.SetRead()
// We do all the interesting stuff after the switch statement, so
// nothing to see here.
case <-s.sendingScheduled:
// We do all the interesting stuff after the switch statement, so
// nothing to see here.
case <-sendQueueAvailable:
case <-s.notifyReceivedPacket:
wasProcessed, err := s.handlePackets()
@ -580,19 +602,17 @@ runLoop:
s.setCloseError(&closeError{err: err})
break runLoop
}
// Only reset the timers if this packet was actually processed.
// This avoids modifying any state when handling undecryptable packets,
// which could be injected by an attacker.
// if we processed any undecryptable packets, jump to the resetting of the timers directly
if !wasProcessed {
continue
}
}
}
// Check for loss detection timeout.
// This could cause packets to be declared lost, and retransmissions to be enqueued.
now := time.Now()
if timeout := s.sentPacketHandler.GetLossDetectionTimeout(); !timeout.IsZero() && timeout.Before(now) {
// This could cause packets to be retransmitted.
// Check it before trying to send packets.
if err := s.sentPacketHandler.OnLossDetectionTimeout(now); err != nil {
s.setCloseError(&closeError{err: err})
break runLoop

View file

@ -1399,48 +1399,52 @@ func TestConnection0RTTTransportParameters(t *testing.T) {
func TestConnectionReceivePrioritization(t *testing.T) {
t.Run("handshake complete", func(t *testing.T) {
counter := testConnectionReceivePrioritization(t, true)
require.Equal(t, 10, counter)
events := testConnectionReceivePrioritization(t, true, 5)
require.Equal(t, []string{"unpack", "unpack", "unpack", "unpack", "unpack", "pack"}, events)
})
// before handshake completion, we trigger packing of a new packet every time we receive a packet
t.Run("handshake not complete", func(t *testing.T) {
counter := testConnectionReceivePrioritization(t, false)
require.Equal(t, 1, counter)
events := testConnectionReceivePrioritization(t, false, 5)
require.Equal(t, []string{
"unpack", "pack",
"unpack", "pack",
"unpack", "pack",
"unpack", "pack",
"unpack", "pack",
}, events)
})
}
func testConnectionReceivePrioritization(t *testing.T, handshakeComplete bool) int {
func testConnectionReceivePrioritization(t *testing.T, handshakeComplete bool, numPackets int) []string {
mockCtrl := gomock.NewController(t)
unpacker := NewMockUnpacker(mockCtrl)
opts := []testConnectionOpt{connectionOptUnpacker(unpacker)}
if handshakeComplete {
opts = append(opts, connectionOptHandshakeConfirmed())
}
tc := newServerTestConnection(t,
mockCtrl,
nil,
false,
opts...,
)
tc := newServerTestConnection(t, mockCtrl, nil, false, opts...)
var events []string
var counter int
var packedFirst bool
var testDone bool
done := make(chan struct{})
unpacker.EXPECT().UnpackShortHeader(gomock.Any(), gomock.Any()).DoAndReturn(
func(rcvTime time.Time, data []byte) (protocol.PacketNumber, protocol.PacketNumberLen, protocol.KeyPhaseBit, []byte, error) {
if !packedFirst {
counter++
counter++
if counter == numPackets {
testDone = true
}
events = append(events, "unpack")
return protocol.PacketNumber(counter), protocol.PacketNumberLen2, protocol.KeyPhaseZero, []byte{0, 1} /* PADDING, PING */, nil
},
).AnyTimes()
).Times(numPackets)
switch handshakeComplete {
case false:
tc.packer.EXPECT().PackCoalescedPacket(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(b bool, bc protocol.ByteCount, t time.Time, v protocol.Version) (*coalescedPacket, error) {
if !packedFirst {
packedFirst = true
events = append(events, "pack")
if testDone {
close(done)
}
return nil, nil
@ -1449,8 +1453,8 @@ func testConnectionReceivePrioritization(t *testing.T, handshakeComplete bool) i
case true:
tc.packer.EXPECT().AppendPacket(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(b *packetBuffer, bc protocol.ByteCount, t time.Time, v protocol.Version) (shortHeaderPacket, error) {
if !packedFirst {
packedFirst = true
events = append(events, "pack")
if testDone {
close(done)
}
return shortHeaderPacket{}, errNothingToPack
@ -1458,10 +1462,11 @@ func testConnectionReceivePrioritization(t *testing.T, handshakeComplete bool) i
).AnyTimes()
}
for i := 0; i < 10; i++ {
for i := range numPackets {
tc.conn.handlePacket(getShortHeaderPacket(t, tc.srcConnID, protocol.PacketNumber(i), []byte("foobar")))
}
tc.connRunner.EXPECT().Remove(gomock.Any()).AnyTimes()
errChan := make(chan error, 1)
go func() { errChan <- tc.conn.run() }()
@ -1480,8 +1485,7 @@ func testConnectionReceivePrioritization(t *testing.T, handshakeComplete bool) i
case <-time.After(time.Second):
t.Fatal("timeout")
}
return counter
return events
}
func TestConnectionPacketBuffering(t *testing.T) {

View file

@ -47,14 +47,3 @@ func TestConnectionTimerReset(t *testing.T) {
timer.SetTimer(now.Add(time.Hour), now.Add(time.Minute), time.Time{}, time.Time{})
require.Equal(t, now.Add(time.Hour), timer.Deadline())
}
func TestConnectionTimerSendImmediately(t *testing.T) {
now := time.Now()
timer := newTimer()
timer.SetTimer(now.Add(time.Hour), now.Add(time.Minute), time.Time{}, time.Time{})
require.Equal(t, now.Add(time.Minute), timer.Deadline())
timer.SetRead()
timer.SetTimer(now.Add(time.Hour), now.Add(time.Minute), time.Time{}, deadlineSendImmediately)
require.Equal(t, deadlineSendImmediately, timer.Deadline())
}