From f401a73d271b76a353313de227fb19ff53a8bedd Mon Sep 17 00:00:00 2001 From: Sukun Date: Sat, 20 May 2023 13:23:18 +0530 Subject: [PATCH] transport: send stateless reset packets from a single Go routine (#3842) * don't spawn new go routine for every stateless packet * pass *receivedPacket around --- transport.go | 37 ++++++++++++++++++++++++++++++------- 1 file changed, 30 insertions(+), 7 deletions(-) diff --git a/transport.go b/transport.go index 55bc7c48..c62ee388 100644 --- a/transport.go +++ b/transport.go @@ -66,7 +66,8 @@ type Transport struct { conn rawConn - closeQueue chan closePacket + closeQueue chan closePacket + statelessResetQueue chan *receivedPacket listening chan struct{} // is closed when listen returns closed bool @@ -182,6 +183,7 @@ func (t *Transport) init(isServer bool) error { t.listening = make(chan struct{}) t.closeQueue = make(chan closePacket, 4) + t.statelessResetQueue = make(chan *receivedPacket, 4) if t.ConnectionIDGenerator != nil { t.connIDGenerator = t.ConnectionIDGenerator @@ -196,7 +198,7 @@ func (t *Transport) init(isServer bool) error { } go t.listen(conn) - go t.runCloseQueue() + go t.runSendQueue() }) return t.initErr } @@ -210,13 +212,15 @@ func (t *Transport) enqueueClosePacket(p closePacket) { } } -func (t *Transport) runCloseQueue() { +func (t *Transport) runSendQueue() { for { select { case <-t.listening: return case p := <-t.closeQueue: t.conn.WritePacket(p.payload, p.addr, p.info.OOB()) + case p := <-t.statelessResetQueue: + t.sendStatelessReset(p) } } } @@ -340,7 +344,7 @@ func (t *Transport) handlePacket(p *receivedPacket) { return } if !wire.IsLongHeaderPacket(p.data[0]) { - go t.maybeSendStatelessReset(p, connID) + t.maybeSendStatelessReset(p) return } @@ -353,14 +357,33 @@ func (t *Transport) handlePacket(p *receivedPacket) { t.server.handlePacket(p) } -func (t *Transport) maybeSendStatelessReset(p *receivedPacket, connID protocol.ConnectionID) { - defer p.buffer.Release() +func (t *Transport) maybeSendStatelessReset(p *receivedPacket) { if t.StatelessResetKey == nil { + p.buffer.Release() return } + // Don't send a stateless reset in response to very small packets. // This includes packets that could be stateless resets. if len(p.data) <= protocol.MinStatelessResetSize { + p.buffer.Release() + return + } + + select { + case t.statelessResetQueue <- p: + default: + // it's fine to not send a stateless reset when we're busy + p.buffer.Release() + } +} + +func (t *Transport) sendStatelessReset(p *receivedPacket) { + defer p.buffer.Release() + + connID, err := wire.ParseConnectionID(p.data, t.connIDLen) + if err != nil { + t.logger.Errorf("error parsing connection ID on packet from %s: %s", p.remoteAddr, err) return } token := t.handlerMap.GetStatelessResetToken(connID) @@ -370,7 +393,7 @@ func (t *Transport) maybeSendStatelessReset(p *receivedPacket, connID protocol.C data[0] = (data[0] & 0x7f) | 0x40 data = append(data, token[:]...) if _, err := t.conn.WritePacket(data, p.remoteAddr, p.info.OOB()); err != nil { - t.logger.Debugf("Error sending Stateless Reset: %s", err) + t.logger.Debugf("Error sending Stateless Reset to %s: %s", p.remoteAddr, err) } }