mirror of
https://github.com/refraction-networking/uquic.git
synced 2025-04-05 05:07:36 +03:00
use type assertions to identify 0-RTT queues in packet handler map
This commit is contained in:
parent
57650fca7a
commit
19c6a1b252
1 changed files with 32 additions and 38 deletions
|
@ -57,11 +57,6 @@ type rawConn interface {
|
||||||
io.Closer
|
io.Closer
|
||||||
}
|
}
|
||||||
|
|
||||||
type packetHandlerMapEntry struct {
|
|
||||||
packetHandler packetHandler
|
|
||||||
is0RTTQueue bool
|
|
||||||
}
|
|
||||||
|
|
||||||
// The packetHandlerMap stores packetHandlers, identified by connection ID.
|
// The packetHandlerMap stores packetHandlers, identified by connection ID.
|
||||||
// It is used:
|
// It is used:
|
||||||
// * by the server to store connections
|
// * by the server to store connections
|
||||||
|
@ -72,7 +67,7 @@ type packetHandlerMap struct {
|
||||||
conn rawConn
|
conn rawConn
|
||||||
connIDLen int
|
connIDLen int
|
||||||
|
|
||||||
handlers map[string] /* string(ConnectionID)*/ packetHandlerMapEntry
|
handlers map[string] /* string(ConnectionID)*/ packetHandler
|
||||||
resetTokens map[protocol.StatelessResetToken] /* stateless reset token */ packetHandler
|
resetTokens map[protocol.StatelessResetToken] /* stateless reset token */ packetHandler
|
||||||
server unknownPacketHandler
|
server unknownPacketHandler
|
||||||
numZeroRTTEntries int
|
numZeroRTTEntries int
|
||||||
|
@ -151,7 +146,7 @@ func newPacketHandlerMap(
|
||||||
conn: conn,
|
conn: conn,
|
||||||
connIDLen: connIDLen,
|
connIDLen: connIDLen,
|
||||||
listening: make(chan struct{}),
|
listening: make(chan struct{}),
|
||||||
handlers: make(map[string]packetHandlerMapEntry),
|
handlers: make(map[string]packetHandler),
|
||||||
resetTokens: make(map[protocol.StatelessResetToken]packetHandler),
|
resetTokens: make(map[protocol.StatelessResetToken]packetHandler),
|
||||||
deleteRetiredConnsAfter: protocol.RetiredConnectionIDDeleteTimeout,
|
deleteRetiredConnsAfter: protocol.RetiredConnectionIDDeleteTimeout,
|
||||||
zeroRTTQueueDuration: protocol.Max0RTTQueueingDuration,
|
zeroRTTQueueDuration: protocol.Max0RTTQueueingDuration,
|
||||||
|
@ -202,7 +197,7 @@ func (h *packetHandlerMap) Add(id protocol.ConnectionID, handler packetHandler)
|
||||||
h.logger.Debugf("Not adding connection ID %s, as it already exists.", id)
|
h.logger.Debugf("Not adding connection ID %s, as it already exists.", id)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
h.handlers[string(id)] = packetHandlerMapEntry{packetHandler: handler}
|
h.handlers[string(id)] = handler
|
||||||
h.logger.Debugf("Adding connection ID %s.", id)
|
h.logger.Debugf("Adding connection ID %s.", id)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
@ -212,24 +207,24 @@ func (h *packetHandlerMap) AddWithConnID(clientDestConnID, newConnID protocol.Co
|
||||||
defer h.mutex.Unlock()
|
defer h.mutex.Unlock()
|
||||||
|
|
||||||
var q *zeroRTTQueue
|
var q *zeroRTTQueue
|
||||||
if entry, ok := h.handlers[string(clientDestConnID)]; ok {
|
if handler, ok := h.handlers[string(clientDestConnID)]; ok {
|
||||||
if !entry.is0RTTQueue {
|
q, ok = handler.(*zeroRTTQueue)
|
||||||
|
if !ok {
|
||||||
h.logger.Debugf("Not adding connection ID %s for a new connection, as it already exists.", clientDestConnID)
|
h.logger.Debugf("Not adding connection ID %s for a new connection, as it already exists.", clientDestConnID)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
q = entry.packetHandler.(*zeroRTTQueue)
|
|
||||||
q.retireTimer.Stop()
|
q.retireTimer.Stop()
|
||||||
h.numZeroRTTEntries--
|
h.numZeroRTTEntries--
|
||||||
if h.numZeroRTTEntries < 0 {
|
if h.numZeroRTTEntries < 0 {
|
||||||
panic("number of 0-RTT queues < 0")
|
panic("number of 0-RTT queues < 0")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
sess := fn()
|
conn := fn()
|
||||||
if q != nil {
|
if q != nil {
|
||||||
q.EnqueueAll(sess)
|
q.EnqueueAll(conn)
|
||||||
}
|
}
|
||||||
h.handlers[string(clientDestConnID)] = packetHandlerMapEntry{packetHandler: sess}
|
h.handlers[string(clientDestConnID)] = conn
|
||||||
h.handlers[string(newConnID)] = packetHandlerMapEntry{packetHandler: sess}
|
h.handlers[string(newConnID)] = conn
|
||||||
h.logger.Debugf("Adding connection IDs %s and %s for a new connection.", clientDestConnID, newConnID)
|
h.logger.Debugf("Adding connection IDs %s and %s for a new connection.", clientDestConnID, newConnID)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
@ -253,7 +248,7 @@ func (h *packetHandlerMap) Retire(id protocol.ConnectionID) {
|
||||||
|
|
||||||
func (h *packetHandlerMap) ReplaceWithClosed(id protocol.ConnectionID, handler packetHandler) {
|
func (h *packetHandlerMap) ReplaceWithClosed(id protocol.ConnectionID, handler packetHandler) {
|
||||||
h.mutex.Lock()
|
h.mutex.Lock()
|
||||||
h.handlers[string(id)] = packetHandlerMapEntry{packetHandler: handler}
|
h.handlers[string(id)] = handler
|
||||||
h.mutex.Unlock()
|
h.mutex.Unlock()
|
||||||
h.logger.Debugf("Replacing connection for connection ID %s with a closed connection.", id)
|
h.logger.Debugf("Replacing connection for connection ID %s with a closed connection.", id)
|
||||||
|
|
||||||
|
@ -292,14 +287,14 @@ func (h *packetHandlerMap) CloseServer() {
|
||||||
}
|
}
|
||||||
h.server = nil
|
h.server = nil
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
for _, entry := range h.handlers {
|
for _, handler := range h.handlers {
|
||||||
if entry.packetHandler.getPerspective() == protocol.PerspectiveServer {
|
if handler.getPerspective() == protocol.PerspectiveServer {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(handler packetHandler) {
|
go func(handler packetHandler) {
|
||||||
// blocks until the CONNECTION_CLOSE has been sent and the run-loop has stopped
|
// blocks until the CONNECTION_CLOSE has been sent and the run-loop has stopped
|
||||||
handler.shutdown()
|
handler.shutdown()
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}(entry.packetHandler)
|
}(handler)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
h.mutex.Unlock()
|
h.mutex.Unlock()
|
||||||
|
@ -324,12 +319,12 @@ func (h *packetHandlerMap) close(e error) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
for _, entry := range h.handlers {
|
for _, handler := range h.handlers {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(handler packetHandler) {
|
go func(handler packetHandler) {
|
||||||
handler.destroy(e)
|
handler.destroy(e)
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}(entry.packetHandler)
|
}(handler)
|
||||||
}
|
}
|
||||||
|
|
||||||
if h.server != nil {
|
if h.server != nil {
|
||||||
|
@ -379,14 +374,14 @@ func (h *packetHandlerMap) handlePacket(p *receivedPacket) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if entry, ok := h.handlers[string(connID)]; ok {
|
if handler, ok := h.handlers[string(connID)]; ok {
|
||||||
if entry.is0RTTQueue { // only enqueue 0-RTT packets in the 0-RTT queue
|
if ha, ok := handler.(*zeroRTTQueue); ok { // only enqueue 0-RTT packets in the 0-RTT queue
|
||||||
if wire.Is0RTTPacket(p.data) {
|
if wire.Is0RTTPacket(p.data) {
|
||||||
entry.packetHandler.handlePacket(p)
|
ha.handlePacket(p)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
} else { // existing connection
|
} else { // existing connection
|
||||||
entry.packetHandler.handlePacket(p)
|
handler.handlePacket(p)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -404,26 +399,25 @@ func (h *packetHandlerMap) handlePacket(p *receivedPacket) {
|
||||||
}
|
}
|
||||||
h.numZeroRTTEntries++
|
h.numZeroRTTEntries++
|
||||||
queue := &zeroRTTQueue{queue: make([]*receivedPacket, 0, 8)}
|
queue := &zeroRTTQueue{queue: make([]*receivedPacket, 0, 8)}
|
||||||
h.handlers[string(connID)] = packetHandlerMapEntry{
|
h.handlers[string(connID)] = queue
|
||||||
packetHandler: queue,
|
|
||||||
is0RTTQueue: true,
|
|
||||||
}
|
|
||||||
queue.retireTimer = time.AfterFunc(h.zeroRTTQueueDuration, func() {
|
queue.retireTimer = time.AfterFunc(h.zeroRTTQueueDuration, func() {
|
||||||
h.mutex.Lock()
|
h.mutex.Lock()
|
||||||
defer h.mutex.Unlock()
|
defer h.mutex.Unlock()
|
||||||
// The entry might have been replaced by an actual connection.
|
// The entry might have been replaced by an actual connection.
|
||||||
// Only delete it if it's still a 0-RTT queue.
|
// Only delete it if it's still a 0-RTT queue.
|
||||||
if entry, ok := h.handlers[string(connID)]; ok && entry.is0RTTQueue {
|
if handler, ok := h.handlers[string(connID)]; ok {
|
||||||
|
if q, ok := handler.(*zeroRTTQueue); ok {
|
||||||
delete(h.handlers, string(connID))
|
delete(h.handlers, string(connID))
|
||||||
h.numZeroRTTEntries--
|
h.numZeroRTTEntries--
|
||||||
if h.numZeroRTTEntries < 0 {
|
if h.numZeroRTTEntries < 0 {
|
||||||
panic("number of 0-RTT queues < 0")
|
panic("number of 0-RTT queues < 0")
|
||||||
}
|
}
|
||||||
entry.packetHandler.(*zeroRTTQueue).Clear()
|
q.Clear()
|
||||||
if h.logger.Debug() {
|
if h.logger.Debug() {
|
||||||
h.logger.Debugf("Removing 0-RTT queue for %s.", connID)
|
h.logger.Debugf("Removing 0-RTT queue for %s.", connID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
})
|
})
|
||||||
queue.handlePacket(p)
|
queue.handlePacket(p)
|
||||||
return
|
return
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue