use batched reads

This commit is contained in:
Marten Seemann 2020-11-04 12:33:10 +07:00
parent e9d12b7f83
commit 870c759515
7 changed files with 163 additions and 21 deletions

View file

@ -15,3 +15,7 @@ const (
msgTypeIPv4PKTINFO = unix.IP_PKTINFO
msgTypeIPv6PKTINFO = 0x2e
)
// ReadBatch only returns a single packet on OSX,
// see https://godoc.org/golang.org/x/net/ipv4#PacketConn.ReadBatch.
const batchSize = 1

View file

@ -15,3 +15,5 @@ const (
msgTypeIPv4PKTINFO = 0x7
msgTypeIPv6PKTINFO = 0x2e
)
const batchSize = 8

View file

@ -15,3 +15,5 @@ const (
msgTypeIPv4PKTINFO = unix.IP_PKTINFO
msgTypeIPv6PKTINFO = unix.IPV6_PKTINFO
)
const batchSize = 8 // needs to smaller than MaxUint8 (otherwise the type of oobConn.readPos has to be changed)

View file

@ -12,13 +12,27 @@ import (
"time"
"unsafe"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
"golang.org/x/sys/unix"
"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/utils"
)
const ecnMask uint8 = 0x3
const (
ecnMask = 0x3
oobBufferSize = 128
)
// Contrary to what the naming suggests, the ipv{4,6}.Message is not dependent on the IP version.
// They're both just aliases for x/net/internal/socket.Message.
// This means we can use this struct to read from a socket that receives both IPv4 and IPv6 messages.
var _ ipv4.Message = ipv6.Message{}
type batchConn interface {
ReadBatch(ms []ipv4.Message, flags int) (int, error)
}
func inspectReadBuffer(c interface{}) (int, error) {
conn, ok := c.(interface {
@ -43,7 +57,12 @@ func inspectReadBuffer(c interface{}) (int, error) {
type oobConn struct {
OOBCapablePacketConn
oobBuffer []byte
batchConn batchConn
readPos uint8
// Packets received from the kernel, but not yet returned by ReadPacket().
messages []ipv4.Message
buffers [batchSize]*packetBuffer
}
var _ connection = &oobConn{}
@ -94,23 +113,41 @@ func newConn(c OOBCapablePacketConn) (*oobConn, error) {
return nil, errors.New("activating packet info failed for both IPv4 and IPv6")
}
}
return &oobConn{
oobConn := &oobConn{
OOBCapablePacketConn: c,
oobBuffer: make([]byte, 128),
}, nil
batchConn: ipv4.NewPacketConn(c),
messages: make([]ipv4.Message, batchSize),
readPos: batchSize,
}
for i := 0; i < batchSize; i++ {
oobConn.messages[i].OOB = make([]byte, oobBufferSize)
}
return oobConn, nil
}
func (c *oobConn) ReadPacket() (*receivedPacket, error) {
buffer := getPacketBuffer()
// The packet size should not exceed protocol.MaxPacketBufferSize bytes
// If it does, we only read a truncated packet, which will then end up undecryptable
buffer.Data = buffer.Data[:protocol.MaxPacketBufferSize]
c.oobBuffer = c.oobBuffer[:cap(c.oobBuffer)]
n, oobn, _, addr, err := c.OOBCapablePacketConn.ReadMsgUDP(buffer.Data, c.oobBuffer)
if err != nil {
return nil, err
if len(c.messages) == int(c.readPos) { // all messages read. Read the next batch of messages.
c.messages = c.messages[:batchSize]
// replace buffers data buffers up to the packet that has been consumed during the last ReadBatch call
for i := uint8(0); i < c.readPos; i++ {
buffer := getPacketBuffer()
buffer.Data = buffer.Data[:protocol.MaxPacketBufferSize]
c.buffers[i] = buffer
c.messages[i].Buffers = [][]byte{c.buffers[i].Data}
}
c.readPos = 0
n, err := c.batchConn.ReadBatch(c.messages, 0)
if n == 0 || err != nil {
return nil, err
}
c.messages = c.messages[:n]
}
ctrlMsgs, err := unix.ParseSocketControlMessage(c.oobBuffer[:oobn])
msg := c.messages[c.readPos]
buffer := c.buffers[c.readPos]
c.readPos++
ctrlMsgs, err := unix.ParseSocketControlMessage(msg.OOB[:msg.NN])
if err != nil {
return nil, err
}
@ -129,13 +166,15 @@ func (c *oobConn) ReadPacket() (*receivedPacket, error) {
// struct in_addr ipi_addr; /* Header Destination
// address */
// };
ip := make([]byte, 4)
if len(ctrlMsg.Data) == 12 {
ifIndex = binary.LittleEndian.Uint32(ctrlMsg.Data)
destIP = net.IP(ctrlMsg.Data[8:12])
copy(ip, ctrlMsg.Data[8:12])
} else if len(ctrlMsg.Data) == 4 {
// FreeBSD
destIP = net.IP(ctrlMsg.Data)
copy(ip, ctrlMsg.Data)
}
destIP = net.IP(ip)
}
}
if ctrlMsg.Header.Level == unix.IPPROTO_IPV6 {
@ -148,7 +187,9 @@ func (c *oobConn) ReadPacket() (*receivedPacket, error) {
// unsigned int ipi6_ifindex; /* send/recv interface index */
// };
if len(ctrlMsg.Data) == 20 {
destIP = net.IP(ctrlMsg.Data[:16])
ip := make([]byte, 16)
copy(ip, ctrlMsg.Data[:16])
destIP = net.IP(ip)
ifIndex = binary.LittleEndian.Uint32(ctrlMsg.Data[16:])
}
}
@ -162,9 +203,9 @@ func (c *oobConn) ReadPacket() (*receivedPacket, error) {
}
}
return &receivedPacket{
remoteAddr: addr,
remoteAddr: msg.Addr,
rcvTime: time.Now(),
data: buffer.Data[:n],
data: msg.Buffers[0][:msg.N],
ecn: ecn,
info: info,
buffer: buffer,

View file

@ -3,11 +3,14 @@
package quic
import (
"fmt"
"net"
"time"
"golang.org/x/net/ipv4"
"golang.org/x/sys/unix"
"github.com/golang/mock/gomock"
"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/utils"
@ -21,14 +24,14 @@ var _ = Describe("OOB Conn Test", func() {
Expect(err).ToNot(HaveOccurred())
udpConn, err := net.ListenUDP(network, addr)
Expect(err).ToNot(HaveOccurred())
ecnConn, err := newConn(udpConn)
oobConn, err := newConn(udpConn)
Expect(err).ToNot(HaveOccurred())
packetChan := make(chan *receivedPacket)
go func() {
defer GinkgoRecover()
for {
p, err := ecnConn.ReadPacket()
p, err := oobConn.ReadPacket()
if err != nil {
return
}
@ -197,4 +200,43 @@ var _ = Describe("OOB Conn Test", func() {
Expect(p.info.addr).To(Equal(ip6))
})
})
Context("Batch Reading", func() {
var batchConn *MockBatchConn
BeforeEach(func() {
batchConn = NewMockBatchConn(mockCtrl)
})
It("reads multiple messages in one batch", func() {
const numMsgRead = batchSize/2 + 1
var counter int
batchConn.EXPECT().ReadBatch(gomock.Any(), gomock.Any()).DoAndReturn(func(ms []ipv4.Message, flags int) (int, error) {
Expect(ms).To(HaveLen(batchSize))
for i := 0; i < numMsgRead; i++ {
Expect(ms[i].Buffers).To(HaveLen(1))
Expect(ms[i].Buffers[0]).To(HaveLen(int(protocol.MaxPacketBufferSize)))
data := []byte(fmt.Sprintf("message %d", counter))
counter++
ms[i].Buffers[0] = data
ms[i].N = len(data)
}
return numMsgRead, nil
}).Times(2)
addr, err := net.ResolveUDPAddr("udp", "localhost:0")
Expect(err).ToNot(HaveOccurred())
udpConn, err := net.ListenUDP("udp", addr)
Expect(err).ToNot(HaveOccurred())
oobConn, err := newConn(udpConn)
Expect(err).ToNot(HaveOccurred())
oobConn.batchConn = batchConn
for i := 0; i < batchSize+1; i++ {
p, err := oobConn.ReadPacket()
Expect(err).ToNot(HaveOccurred())
Expect(string(p.data)).To(Equal(fmt.Sprintf("message %d", i)))
}
})
})
})

50
mock_batch_conn_test.go Normal file
View file

@ -0,0 +1,50 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: conn_oob.go
// Package quic is a generated GoMock package.
package quic
import (
reflect "reflect"
gomock "github.com/golang/mock/gomock"
ipv4 "golang.org/x/net/ipv4"
)
// MockBatchConn is a mock of BatchConn interface.
type MockBatchConn struct {
ctrl *gomock.Controller
recorder *MockBatchConnMockRecorder
}
// MockBatchConnMockRecorder is the mock recorder for MockBatchConn.
type MockBatchConnMockRecorder struct {
mock *MockBatchConn
}
// NewMockBatchConn creates a new mock instance.
func NewMockBatchConn(ctrl *gomock.Controller) *MockBatchConn {
mock := &MockBatchConn{ctrl: ctrl}
mock.recorder = &MockBatchConnMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockBatchConn) EXPECT() *MockBatchConnMockRecorder {
return m.recorder
}
// ReadBatch mocks base method.
func (m *MockBatchConn) ReadBatch(ms []ipv4.Message, flags int) (int, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ReadBatch", ms, flags)
ret0, _ := ret[0].(int)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// ReadBatch indicates an expected call of ReadBatch.
func (mr *MockBatchConnMockRecorder) ReadBatch(ms, flags interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadBatch", reflect.TypeOf((*MockBatchConn)(nil).ReadBatch), ms, flags)
}

View file

@ -22,5 +22,6 @@ package quic
//go:generate sh -c "./mockgen_private.sh quic mock_unknown_packet_handler_test.go github.com/lucas-clemente/quic-go unknownPacketHandler"
//go:generate sh -c "./mockgen_private.sh quic mock_packet_handler_manager_test.go github.com/lucas-clemente/quic-go packetHandlerManager"
//go:generate sh -c "./mockgen_private.sh quic mock_multiplexer_test.go github.com/lucas-clemente/quic-go multiplexer"
//go:generate sh -c "./mockgen_private.sh quic mock_batch_conn_test.go github.com/lucas-clemente/quic-go batchConn"
//go:generate sh -c "mockgen -package quic -self_package github.com/lucas-clemente/quic-go -destination mock_token_store_test.go github.com/lucas-clemente/quic-go TokenStore && goimports -w mock_token_store_test.go"
//go:generate sh -c "mockgen -package quic -self_package github.com/lucas-clemente/quic-go -destination mock_packetconn_test.go net PacketConn && goimports -w mock_packetconn_test.go"