drop incoming packets when the server's receive queue is full

This commit is contained in:
Marten Seemann 2020-02-15 16:43:00 +07:00
parent 9b71878d53
commit 643f0b4c67
3 changed files with 55 additions and 2 deletions

View file

@ -39,6 +39,9 @@ const DefaultMaxIncomingStreams = 100
// DefaultMaxIncomingUniStreams is the maximum number of unidirectional streams that a peer may open
const DefaultMaxIncomingUniStreams = 100
// MaxServerUnprocessedPackets is the max number of packets stored in the server that are not yet processed.
const MaxServerUnprocessedPackets = 1024
// MaxSessionUnprocessedPackets is the max number of packets stored in each session that are not yet processed.
const MaxSessionUnprocessedPackets = MaxCongestionWindowPackets

View file

@ -179,7 +179,7 @@ func listen(conn net.PacketConn, tlsConf *tls.Config, config *Config, acceptEarl
zeroRTTQueue: newZeroRTTQueue(),
sessionQueue: make(chan quicSession),
errorChan: make(chan struct{}),
receivedPackets: make(chan *receivedPacket, 1000),
receivedPackets: make(chan *receivedPacket, protocol.MaxServerUnprocessedPackets),
newSession: newSession,
logger: utils.DefaultLogger.WithPrefix("server"),
acceptEarlySessions: acceptEarly,
@ -285,7 +285,11 @@ func (s *baseServer) Addr() net.Addr {
}
func (s *baseServer) handlePacket(p *receivedPacket) {
s.receivedPackets <- p
select {
case s.receivedPackets <- p:
default:
s.logger.Debugf("Dropping packet from %s (%d bytes). Server receive queue full.", p.remoteAddr, len(p.data))
}
}
func (s *baseServer) handlePacketImpl(p *receivedPacket) bool /* was the packet handled */ {

View file

@ -11,6 +11,7 @@ import (
"runtime/pprof"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/lucas-clemente/quic-go/qlog"
@ -438,6 +439,51 @@ var _ = Describe("Server", func() {
Expect(createdSession).To(BeTrue())
})
It("drops packets if the receive queue is full", func() {
phm.EXPECT().GetStatelessResetToken(gomock.Any()).AnyTimes()
phm.EXPECT().Add(gomock.Any(), gomock.Any()).AnyTimes()
serv.config.AcceptToken = func(net.Addr, *Token) bool { return true }
acceptSession := make(chan struct{})
var counter uint32 // to be used as an atomic, so we query it in Eventually
serv.newSession = func(
_ connection,
runner sessionRunner,
_ protocol.ConnectionID,
_ protocol.ConnectionID,
_ protocol.ConnectionID,
_ protocol.ConnectionID,
_ [16]byte,
_ *Config,
_ *tls.Config,
_ *handshake.TokenGenerator,
_ bool,
_ qlog.Tracer,
_ utils.Logger,
_ protocol.VersionNumber,
) quicSession {
<-acceptSession
atomic.AddUint32(&counter, 1)
return nil
}
serv.handlePacket(getInitial(protocol.ConnectionID{1, 2, 3, 4, 5, 6, 7, 8}))
var wg sync.WaitGroup
for i := 0; i < 3*protocol.MaxServerUnprocessedPackets; i++ {
wg.Add(1)
go func() {
defer GinkgoRecover()
defer wg.Done()
serv.handlePacket(getInitial(protocol.ConnectionID{1, 2, 3, 4, 5, 6, 7, 8}))
}()
}
wg.Wait()
close(acceptSession)
Eventually(func() uint32 { return atomic.LoadUint32(&counter) }).Should(BeEquivalentTo(protocol.MaxServerUnprocessedPackets + 1))
Consistently(func() uint32 { return atomic.LoadUint32(&counter) }).Should(BeEquivalentTo(protocol.MaxServerUnprocessedPackets + 1))
})
It("only creates a single session for a duplicate Initial", func() {
serv.config.AcceptToken = func(_ net.Addr, _ *Token) bool { return true }
var createdSession bool