proxy: optimize packet sorting logic (#4923)

This commit is contained in:
Marten Seemann 2025-01-26 05:27:12 +01:00 committed by GitHub
parent 3e87ea3f50
commit f20b823154
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 48 additions and 16 deletions

View file

@ -5,7 +5,7 @@ import (
"fmt"
"net"
"os"
"sort"
"slices"
"sync"
"time"
@ -64,17 +64,11 @@ type packetEntry struct {
Raw []byte
}
type packetEntries []packetEntry
func (e packetEntries) Len() int { return len(e) }
func (e packetEntries) Less(i, j int) bool { return e[i].Time.Before(e[j].Time) }
func (e packetEntries) Swap(i, j int) { e[i], e[j] = e[j], e[i] }
type queue struct {
sync.Mutex
timer *utils.Timer
Packets packetEntries
Packets []packetEntry // sorted by the packetEntry.Time
}
func newQueue() *queue {
@ -83,15 +77,27 @@ func newQueue() *queue {
func (q *queue) Add(e packetEntry) {
q.Lock()
q.Packets = append(q.Packets, e)
if len(q.Packets) > 1 {
lastIndex := len(q.Packets) - 1
if q.Packets[lastIndex].Time.Before(q.Packets[lastIndex-1].Time) {
sort.Stable(q.Packets)
}
defer q.Unlock()
if len(q.Packets) == 0 {
q.Packets = append(q.Packets, e)
q.timer.Reset(e.Time)
return
}
// The packets slice is sorted by the packetEntry.Time.
// We only need to insert the packet at the correct position.
idx := slices.IndexFunc(q.Packets, func(p packetEntry) bool {
return p.Time.After(e.Time)
})
if idx == -1 {
q.Packets = append(q.Packets, e)
} else {
q.Packets = slices.Insert(q.Packets, idx, e)
}
if idx == 0 {
q.timer.Reset(q.Packets[0].Time)
}
q.timer.Reset(q.Packets[0].Time)
q.Unlock()
}
func (q *queue) Get() []byte {

View file

@ -13,6 +13,32 @@ import (
"github.com/stretchr/testify/require"
)
func TestPacketQueue(t *testing.T) {
q := newQueue()
getPackets := func() []string {
packets := make([]string, 0, len(q.Packets))
for _, p := range q.Packets {
packets = append(packets, string(p.Raw))
}
return packets
}
require.Empty(t, getPackets())
now := time.Now()
q.Add(packetEntry{Time: now, Raw: []byte("p3")})
require.Equal(t, []string{"p3"}, getPackets())
q.Add(packetEntry{Time: now.Add(time.Second), Raw: []byte("p4")})
require.Equal(t, []string{"p3", "p4"}, getPackets())
q.Add(packetEntry{Time: now.Add(-time.Second), Raw: []byte("p1")})
require.Equal(t, []string{"p1", "p3", "p4"}, getPackets())
q.Add(packetEntry{Time: now.Add(time.Second), Raw: []byte("p5")})
require.Equal(t, []string{"p1", "p3", "p4", "p5"}, getPackets())
q.Add(packetEntry{Time: now.Add(-time.Second), Raw: []byte("p2")})
require.Equal(t, []string{"p1", "p2", "p3", "p4", "p5"}, getPackets())
}
func newUPDConnLocalhost(t testing.TB) *net.UDPConn {
t.Helper()
conn, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0})