transport: send stateless reset packets from a single Go routine (#3842)

* don't spawn new go routine for every stateless packet

* pass *receivedPacket around
This commit is contained in:
Sukun 2023-05-20 13:23:18 +05:30 committed by GitHub
parent cf267ff7d7
commit f401a73d27
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -67,6 +67,7 @@ type Transport struct {
conn rawConn conn rawConn
closeQueue chan closePacket closeQueue chan closePacket
statelessResetQueue chan *receivedPacket
listening chan struct{} // is closed when listen returns listening chan struct{} // is closed when listen returns
closed bool closed bool
@ -182,6 +183,7 @@ func (t *Transport) init(isServer bool) error {
t.listening = make(chan struct{}) t.listening = make(chan struct{})
t.closeQueue = make(chan closePacket, 4) t.closeQueue = make(chan closePacket, 4)
t.statelessResetQueue = make(chan *receivedPacket, 4)
if t.ConnectionIDGenerator != nil { if t.ConnectionIDGenerator != nil {
t.connIDGenerator = t.ConnectionIDGenerator t.connIDGenerator = t.ConnectionIDGenerator
@ -196,7 +198,7 @@ func (t *Transport) init(isServer bool) error {
} }
go t.listen(conn) go t.listen(conn)
go t.runCloseQueue() go t.runSendQueue()
}) })
return t.initErr return t.initErr
} }
@ -210,13 +212,15 @@ func (t *Transport) enqueueClosePacket(p closePacket) {
} }
} }
func (t *Transport) runCloseQueue() { func (t *Transport) runSendQueue() {
for { for {
select { select {
case <-t.listening: case <-t.listening:
return return
case p := <-t.closeQueue: case p := <-t.closeQueue:
t.conn.WritePacket(p.payload, p.addr, p.info.OOB()) t.conn.WritePacket(p.payload, p.addr, p.info.OOB())
case p := <-t.statelessResetQueue:
t.sendStatelessReset(p)
} }
} }
} }
@ -340,7 +344,7 @@ func (t *Transport) handlePacket(p *receivedPacket) {
return return
} }
if !wire.IsLongHeaderPacket(p.data[0]) { if !wire.IsLongHeaderPacket(p.data[0]) {
go t.maybeSendStatelessReset(p, connID) t.maybeSendStatelessReset(p)
return return
} }
@ -353,14 +357,33 @@ func (t *Transport) handlePacket(p *receivedPacket) {
t.server.handlePacket(p) t.server.handlePacket(p)
} }
func (t *Transport) maybeSendStatelessReset(p *receivedPacket, connID protocol.ConnectionID) { func (t *Transport) maybeSendStatelessReset(p *receivedPacket) {
defer p.buffer.Release()
if t.StatelessResetKey == nil { if t.StatelessResetKey == nil {
p.buffer.Release()
return return
} }
// Don't send a stateless reset in response to very small packets. // Don't send a stateless reset in response to very small packets.
// This includes packets that could be stateless resets. // This includes packets that could be stateless resets.
if len(p.data) <= protocol.MinStatelessResetSize { if len(p.data) <= protocol.MinStatelessResetSize {
p.buffer.Release()
return
}
select {
case t.statelessResetQueue <- p:
default:
// it's fine to not send a stateless reset when we're busy
p.buffer.Release()
}
}
func (t *Transport) sendStatelessReset(p *receivedPacket) {
defer p.buffer.Release()
connID, err := wire.ParseConnectionID(p.data, t.connIDLen)
if err != nil {
t.logger.Errorf("error parsing connection ID on packet from %s: %s", p.remoteAddr, err)
return return
} }
token := t.handlerMap.GetStatelessResetToken(connID) token := t.handlerMap.GetStatelessResetToken(connID)
@ -370,7 +393,7 @@ func (t *Transport) maybeSendStatelessReset(p *receivedPacket, connID protocol.C
data[0] = (data[0] & 0x7f) | 0x40 data[0] = (data[0] & 0x7f) | 0x40
data = append(data, token[:]...) data = append(data, token[:]...)
if _, err := t.conn.WritePacket(data, p.remoteAddr, p.info.OOB()); err != nil { if _, err := t.conn.WritePacket(data, p.remoteAddr, p.info.OOB()); err != nil {
t.logger.Debugf("Error sending Stateless Reset: %s", err) t.logger.Debugf("Error sending Stateless Reset to %s: %s", p.remoteAddr, err)
} }
} }