use a ringbuffer to store received packets in the connection (#4928)

Reduces memory usage by replacing a fixed-capacity channel (256) with a
ring buffer (initial capacity: 8). The buffer grows to 256 only when
many packets are enqueued.
This commit is contained in:
Marten Seemann 2025-01-28 04:54:04 +01:00 committed by GitHub
parent 02e276ed70
commit 10a541bfc0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -19,6 +19,7 @@ import (
"github.com/quic-go/quic-go/internal/protocol"
"github.com/quic-go/quic-go/internal/qerr"
"github.com/quic-go/quic-go/internal/utils"
"github.com/quic-go/quic-go/internal/utils/ringbuffer"
"github.com/quic-go/quic-go/internal/wire"
"github.com/quic-go/quic-go/logging"
)
@ -154,8 +155,10 @@ type connection struct {
oneRTTStream *cryptoStream // only set for the server
cryptoStreamHandler cryptoStreamHandler
receivedPackets chan receivedPacket
sendingScheduled chan struct{}
notifyReceivedPacket chan struct{}
sendingScheduled chan struct{}
receivedPacketMx sync.Mutex
receivedPackets ringbuffer.RingBuffer[receivedPacket]
// closeChan is used to notify the run loop that it should terminate
closeChan chan struct{}
@ -478,7 +481,8 @@ func (s *connection) preSetup() {
s.perspective,
)
s.framer = newFramer(s.connFlowController)
s.receivedPackets = make(chan receivedPacket, protocol.MaxConnUnprocessedPackets)
s.receivedPackets.Init(8)
s.notifyReceivedPacket = make(chan struct{}, 1)
s.closeChan = make(chan struct{}, 1)
s.sendingScheduled = make(chan struct{}, 1)
s.handshakeCompleteChan = make(chan struct{})
@ -496,18 +500,14 @@ func (s *connection) run() (err error) {
defer func() { s.ctxCancel(err) }()
defer func() {
// Drain queued packets that will never be processed.
for {
select {
case p, ok := <-s.receivedPackets:
if !ok {
return
}
p.buffer.Decrement()
p.buffer.MaybeRelease()
default:
return
}
// drain queued packets that will never be processed
s.receivedPacketMx.Lock()
defer s.receivedPacketMx.Unlock()
for !s.receivedPackets.Empty() {
p := s.receivedPackets.PopFront()
p.buffer.Decrement()
p.buffer.MaybeRelease()
}
}()
@ -574,8 +574,8 @@ runLoop:
// We do all the interesting stuff after the switch statement, so
// nothing to see here.
case <-sendQueueAvailable:
case firstPacket := <-s.receivedPackets:
wasProcessed, err := s.handlePackets(firstPacket)
case <-s.notifyReceivedPacket:
wasProcessed, err := s.handlePackets()
if err != nil {
s.setCloseError(&closeError{err: err})
break runLoop
@ -784,32 +784,45 @@ func (s *connection) handleHandshakeConfirmed(now time.Time) error {
return nil
}
func (s *connection) handlePackets(firstPacket receivedPacket) (wasProcessed bool, _ error) {
wasProcessed, err := s.handleOnePacket(firstPacket)
if err != nil {
return false, err
}
// only process a single packet at a time before handshake completion
if !s.handshakeComplete {
return wasProcessed, nil
}
func (s *connection) handlePackets() (wasProcessed bool, _ error) {
// Now process all packets in the receivedPackets channel.
// Limit the number of packets to the length of the receivedPackets channel,
// so we eventually get a chance to send out an ACK when receiving a lot of packets.
numPackets := len(s.receivedPackets)
receiveLoop:
s.receivedPacketMx.Lock()
numPackets := s.receivedPackets.Len()
if numPackets == 0 {
s.receivedPacketMx.Unlock()
return false, nil
}
var hasMorePackets bool
for i := 0; i < numPackets; i++ {
if i > 0 {
s.receivedPacketMx.Lock()
}
p := s.receivedPackets.PopFront()
hasMorePackets = !s.receivedPackets.Empty()
s.receivedPacketMx.Unlock()
processed, err := s.handleOnePacket(p)
if err != nil {
return false, err
}
if processed {
wasProcessed = true
}
if !hasMorePackets {
break
}
// only process a single packet at a time before handshake completion
if !s.handshakeComplete {
break
}
}
if hasMorePackets {
select {
case p := <-s.receivedPackets:
processed, err := s.handleOnePacket(p)
if err != nil {
return false, err
}
if processed {
wasProcessed = true
}
case s.notifyReceivedPacket <- struct{}{}:
default:
break receiveLoop
}
}
return wasProcessed, nil
@ -1392,14 +1405,22 @@ func (s *connection) handleFrame(
// handlePacket is called by the server with a new packet
func (s *connection) handlePacket(p receivedPacket) {
s.receivedPacketMx.Lock()
// Discard packets once the amount of queued packets is larger than
// the channel size, protocol.MaxConnUnprocessedPackets
select {
case s.receivedPackets <- p:
default:
if s.receivedPackets.Len() >= protocol.MaxConnUnprocessedPackets {
if s.tracer != nil && s.tracer.DroppedPacket != nil {
s.tracer.DroppedPacket(logging.PacketTypeNotDetermined, protocol.InvalidPacketNumber, p.Size(), logging.PacketDropDOSPrevention)
}
s.receivedPacketMx.Unlock()
return
}
s.receivedPackets.PushBack(p)
s.receivedPacketMx.Unlock()
select {
case s.notifyReceivedPacket <- struct{}{}:
default:
}
}
@ -1978,7 +1999,10 @@ func (s *connection) sendPacketsWithoutGSO(now time.Time) error {
return nil
}
// Prioritize receiving of packets over sending out more packets.
if len(s.receivedPackets) > 0 {
s.receivedPacketMx.Lock()
hasPackets := !s.receivedPackets.Empty()
s.receivedPacketMx.Unlock()
if hasPackets {
s.pacingDeadline = deadlineSendImmediately
return nil
}
@ -2036,7 +2060,10 @@ func (s *connection) sendPacketsWithGSO(now time.Time) error {
}
// Prioritize receiving of packets over sending out more packets.
if len(s.receivedPackets) > 0 {
s.receivedPacketMx.Lock()
hasPackets := !s.receivedPackets.Empty()
s.receivedPacketMx.Unlock()
if hasPackets {
s.pacingDeadline = deadlineSendImmediately
return nil
}