From 2aa71ff76b7a117bc21d69e6acdb3c396295dc48 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Tue, 17 Jan 2023 23:15:02 -0800 Subject: [PATCH] use a sync.Pool for ackhandler.Frames (#3656) --- connection.go | 2 +- connection_test.go | 8 +-- framer.go | 14 ++-- framer_test.go | 4 +- internal/ackhandler/ack_eliciting.go | 2 +- internal/ackhandler/ack_eliciting_test.go | 2 +- internal/ackhandler/frame.go | 22 ++++++- internal/ackhandler/packet.go | 10 ++- .../ackhandler/sent_packet_handler_test.go | 36 +++++------ mock_frame_source_test.go | 8 +-- packet_packer.go | 41 +++++++----- packet_packer_test.go | 64 +++++++++---------- send_stream.go | 6 +- 13 files changed, 129 insertions(+), 90 deletions(-) diff --git a/connection.go b/connection.go index ed01defa..b5435633 100644 --- a/connection.go +++ b/connection.go @@ -2012,7 +2012,7 @@ func (s *connection) logLongHeaderPacket(p *longHeaderPacket) { func (s *connection) logShortHeaderPacket( destConnID protocol.ConnectionID, ackFrame *wire.AckFrame, - frames []ackhandler.Frame, + frames []*ackhandler.Frame, pn protocol.PacketNumber, pnLen protocol.PacketNumberLen, kp protocol.KeyPhaseBit, diff --git a/connection_test.go b/connection_test.go index 320f123b..d2e65ca4 100644 --- a/connection_test.go +++ b/connection_test.go @@ -309,7 +309,7 @@ var _ = Describe("Connection", func() { err := conn.handleFrame(&wire.PathChallengeFrame{Data: data}, protocol.Encryption1RTT, protocol.ConnectionID{}) Expect(err).ToNot(HaveOccurred()) frames, _ := conn.framer.AppendControlFrames(nil, 1000) - Expect(frames).To(Equal([]ackhandler.Frame{{Frame: &wire.PathResponseFrame{Data: data}}})) + Expect(frames).To(Equal([]*ackhandler.Frame{{Frame: &wire.PathResponseFrame{Data: data}}})) }) It("rejects NEW_TOKEN frames", func() { @@ -1270,7 +1270,7 @@ var _ = Describe("Connection", func() { conn.scheduleSending() Eventually(sent).Should(BeClosed()) frames, _ := conn.framer.AppendControlFrames(nil, 1000) - Expect(frames).To(Equal([]ackhandler.Frame{{Frame: &logging.DataBlockedFrame{MaximumData: 1337}}})) + Expect(frames).To(Equal([]*ackhandler.Frame{{Frame: &logging.DataBlockedFrame{MaximumData: 1337}}})) }) It("doesn't send when the SentPacketHandler doesn't allow it", func() { @@ -1867,8 +1867,8 @@ var _ = Describe("Connection", func() { handshakeCtx := conn.HandshakeComplete() Consistently(handshakeCtx.Done()).ShouldNot(BeClosed()) close(finishHandshake) - var frames []ackhandler.Frame - Eventually(func() []ackhandler.Frame { + var frames []*ackhandler.Frame + Eventually(func() []*ackhandler.Frame { frames, _ = conn.framer.AppendControlFrames(nil, protocol.MaxByteCount) return frames }).ShouldNot(BeEmpty()) diff --git a/framer.go b/framer.go index 29d36b85..6803e0d7 100644 --- a/framer.go +++ b/framer.go @@ -14,10 +14,10 @@ type framer interface { HasData() bool QueueControlFrame(wire.Frame) - AppendControlFrames([]ackhandler.Frame, protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount) + AppendControlFrames([]*ackhandler.Frame, protocol.ByteCount) ([]*ackhandler.Frame, protocol.ByteCount) AddActiveStream(protocol.StreamID) - AppendStreamFrames([]ackhandler.Frame, protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount) + AppendStreamFrames([]*ackhandler.Frame, protocol.ByteCount) ([]*ackhandler.Frame, protocol.ByteCount) Handle0RTTRejection() error } @@ -67,7 +67,7 @@ func (f *framerI) QueueControlFrame(frame wire.Frame) { f.controlFrameMutex.Unlock() } -func (f *framerI) AppendControlFrames(frames []ackhandler.Frame, maxLen protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount) { +func (f *framerI) AppendControlFrames(frames []*ackhandler.Frame, maxLen protocol.ByteCount) ([]*ackhandler.Frame, protocol.ByteCount) { var length protocol.ByteCount f.controlFrameMutex.Lock() for len(f.controlFrames) > 0 { @@ -76,7 +76,9 @@ func (f *framerI) AppendControlFrames(frames []ackhandler.Frame, maxLen protocol if length+frameLen > maxLen { break } - frames = append(frames, ackhandler.Frame{Frame: frame}) + af := ackhandler.GetFrame() + af.Frame = frame + frames = append(frames, af) length += frameLen f.controlFrames = f.controlFrames[:len(f.controlFrames)-1] } @@ -93,7 +95,7 @@ func (f *framerI) AddActiveStream(id protocol.StreamID) { f.mutex.Unlock() } -func (f *framerI) AppendStreamFrames(frames []ackhandler.Frame, maxLen protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount) { +func (f *framerI) AppendStreamFrames(frames []*ackhandler.Frame, maxLen protocol.ByteCount) ([]*ackhandler.Frame, protocol.ByteCount) { var length protocol.ByteCount var lastFrame *ackhandler.Frame f.mutex.Lock() @@ -130,7 +132,7 @@ func (f *framerI) AppendStreamFrames(frames []ackhandler.Frame, maxLen protocol. if frame == nil { continue } - frames = append(frames, *frame) + frames = append(frames, frame) length += frame.Length(f.version) lastFrame = frame } diff --git a/framer_test.go b/framer_test.go index 181ea7c2..4afd96ab 100644 --- a/framer_test.go +++ b/framer_test.go @@ -64,7 +64,7 @@ var _ = Describe("Framer", func() { ping := &wire.PingFrame{} mdf := &wire.MaxDataFrame{MaximumData: 0x42} framer.QueueControlFrame(mdf) - frames, length := framer.AppendControlFrames([]ackhandler.Frame{{Frame: ping}}, 1000) + frames, length := framer.AppendControlFrames([]*ackhandler.Frame{{Frame: ping}}, 1000) Expect(frames).To(HaveLen(2)) Expect(frames[0].Frame).To(Equal(ping)) Expect(frames[1].Frame).To(Equal(mdf)) @@ -161,7 +161,7 @@ var _ = Describe("Framer", func() { stream1.EXPECT().popStreamFrame(gomock.Any()).Return(&ackhandler.Frame{Frame: f}, false) framer.AddActiveStream(id1) mdf := &wire.MaxDataFrame{MaximumData: 1337} - frames := []ackhandler.Frame{{Frame: mdf}} + frames := []*ackhandler.Frame{{Frame: mdf}} fs, length := framer.AppendStreamFrames(frames, 1000) Expect(fs).To(HaveLen(2)) Expect(fs[0].Frame).To(Equal(mdf)) diff --git a/internal/ackhandler/ack_eliciting.go b/internal/ackhandler/ack_eliciting.go index b8cd558a..96152708 100644 --- a/internal/ackhandler/ack_eliciting.go +++ b/internal/ackhandler/ack_eliciting.go @@ -10,7 +10,7 @@ func IsFrameAckEliciting(f wire.Frame) bool { } // HasAckElicitingFrames returns true if at least one frame is ack-eliciting. -func HasAckElicitingFrames(fs []Frame) bool { +func HasAckElicitingFrames(fs []*Frame) bool { for _, f := range fs { if IsFrameAckEliciting(f.Frame) { return true diff --git a/internal/ackhandler/ack_eliciting_test.go b/internal/ackhandler/ack_eliciting_test.go index d6c4474a..d440969a 100644 --- a/internal/ackhandler/ack_eliciting_test.go +++ b/internal/ackhandler/ack_eliciting_test.go @@ -28,7 +28,7 @@ var _ = Describe("ack-eliciting frames", func() { }) It("HasAckElicitingFrames works for "+fName, func() { - Expect(HasAckElicitingFrames([]Frame{{Frame: f}})).To(Equal(e)) + Expect(HasAckElicitingFrames([]*Frame{{Frame: f}})).To(Equal(e)) }) } }) diff --git a/internal/ackhandler/frame.go b/internal/ackhandler/frame.go index aed6038d..6695a804 100644 --- a/internal/ackhandler/frame.go +++ b/internal/ackhandler/frame.go @@ -1,9 +1,29 @@ package ackhandler -import "github.com/lucas-clemente/quic-go/internal/wire" +import ( + "sync" + + "github.com/lucas-clemente/quic-go/internal/wire" +) type Frame struct { wire.Frame // nil if the frame has already been acknowledged in another packet OnLost func(wire.Frame) OnAcked func(wire.Frame) } + +var framePool = sync.Pool{New: func() any { return &Frame{} }} + +func GetFrame() *Frame { + f := framePool.Get().(*Frame) + f.OnLost = nil + f.OnAcked = nil + return f +} + +func putFrame(f *Frame) { + f.Frame = nil + f.OnLost = nil + f.OnAcked = nil + framePool.Put(f) +} diff --git a/internal/ackhandler/packet.go b/internal/ackhandler/packet.go index b8a47b7a..b951c59c 100644 --- a/internal/ackhandler/packet.go +++ b/internal/ackhandler/packet.go @@ -10,7 +10,7 @@ import ( // A Packet is a packet type Packet struct { PacketNumber protocol.PacketNumber - Frames []Frame + Frames []*Frame LargestAcked protocol.PacketNumber // InvalidPacketNumber if the packet doesn't contain an ACK Length protocol.ByteCount EncryptionLevel protocol.EncryptionLevel @@ -46,4 +46,10 @@ func GetPacket() *Packet { // We currently only return Packets back into the pool when they're acknowledged (not when they're lost). // This simplifies the code, and gives the vast majority of the performance benefit we can gain from using the pool. -func putPacket(p *Packet) { packetPool.Put(p) } +func putPacket(p *Packet) { + for _, f := range p.Frames { + putFrame(f) + } + p.Frames = nil + packetPool.Put(p) +} diff --git a/internal/ackhandler/sent_packet_handler_test.go b/internal/ackhandler/sent_packet_handler_test.go index 8f767cbb..ab386a69 100644 --- a/internal/ackhandler/sent_packet_handler_test.go +++ b/internal/ackhandler/sent_packet_handler_test.go @@ -54,7 +54,7 @@ var _ = Describe("SentPacketHandler", func() { p.SendTime = time.Now() } if len(p.Frames) == 0 { - p.Frames = []Frame{ + p.Frames = []*Frame{ {Frame: &wire.PingFrame{}, OnLost: func(wire.Frame) { lostPackets = append(lostPackets, p.PacketNumber) }}, } } @@ -276,7 +276,7 @@ var _ = Describe("SentPacketHandler", func() { ping := &wire.PingFrame{} handler.SentPacket(ackElicitingPacket(&Packet{ PacketNumber: 13, - Frames: []Frame{{ + Frames: []*Frame{{ Frame: ping, OnAcked: func(f wire.Frame) { Expect(f).To(Equal(ping)) acked = true @@ -428,20 +428,20 @@ var _ = Describe("SentPacketHandler", func() { { PacketNumber: 13, LargestAcked: 100, - Frames: []Frame{{Frame: &streamFrame, OnLost: func(wire.Frame) {}}}, + Frames: []*Frame{{Frame: &streamFrame, OnLost: func(wire.Frame) {}}}, Length: 1, EncryptionLevel: protocol.Encryption1RTT, }, { PacketNumber: 14, LargestAcked: 200, - Frames: []Frame{{Frame: &streamFrame, OnLost: func(wire.Frame) {}}}, + Frames: []*Frame{{Frame: &streamFrame, OnLost: func(wire.Frame) {}}}, Length: 1, EncryptionLevel: protocol.Encryption1RTT, }, { PacketNumber: 15, - Frames: []Frame{{Frame: &streamFrame, OnLost: func(wire.Frame) {}}}, + Frames: []*Frame{{Frame: &streamFrame, OnLost: func(wire.Frame) {}}}, Length: 1, EncryptionLevel: protocol.Encryption1RTT, }, @@ -501,7 +501,7 @@ var _ = Describe("SentPacketHandler", func() { handler.SentPacket(&Packet{ PacketNumber: 1, Length: 42, - Frames: []Frame{{Frame: &wire.PingFrame{}, OnLost: func(wire.Frame) {}}}, + Frames: []*Frame{{Frame: &wire.PingFrame{}, OnLost: func(wire.Frame) {}}}, EncryptionLevel: protocol.Encryption1RTT, }) }) @@ -548,7 +548,7 @@ var _ = Describe("SentPacketHandler", func() { PacketNumber: 1, SendTime: time.Now().Add(-time.Hour), IsPathMTUProbePacket: true, - Frames: []Frame{{Frame: &wire.PingFrame{}, OnLost: func(wire.Frame) { mtuPacketDeclaredLost = true }}}, + Frames: []*Frame{{Frame: &wire.PingFrame{}, OnLost: func(wire.Frame) { mtuPacketDeclaredLost = true }}}, })) handler.SentPacket(ackElicitingPacket(&Packet{PacketNumber: 2})) // lose packet 1, but don't EXPECT any calls to OnPacketLost() @@ -595,7 +595,7 @@ var _ = Describe("SentPacketHandler", func() { handler.SentPacket(&Packet{ Length: 42, EncryptionLevel: protocol.EncryptionInitial, - Frames: []Frame{{Frame: &wire.PingFrame{}}}, + Frames: []*Frame{{Frame: &wire.PingFrame{}}}, SendTime: time.Now(), }) cong.EXPECT().CanSend(protocol.ByteCount(42)).Return(true) @@ -755,7 +755,7 @@ var _ = Describe("SentPacketHandler", func() { handler.SentPacket(ackElicitingPacket(&Packet{ PacketNumber: 1, SendTime: time.Now().Add(-time.Hour), - Frames: []Frame{ + Frames: []*Frame{ {Frame: &wire.PingFrame{}, OnLost: func(wire.Frame) { lostPackets = append(lostPackets, 1) }}, }, })) @@ -775,7 +775,7 @@ var _ = Describe("SentPacketHandler", func() { handler.SentPacket(ackElicitingPacket(&Packet{ PacketNumber: pn, SendTime: time.Now().Add(-time.Hour), - Frames: []Frame{ + Frames: []*Frame{ {Frame: &wire.PingFrame{}, OnLost: func(wire.Frame) { lostPackets = append(lostPackets, 1) }}, }, })) @@ -895,7 +895,7 @@ var _ = Describe("SentPacketHandler", func() { PacketNumber: 1, Length: 599, EncryptionLevel: protocol.EncryptionInitial, - Frames: []Frame{{Frame: &wire.PingFrame{}}}, + Frames: []*Frame{{Frame: &wire.PingFrame{}}}, SendTime: time.Now(), }) Expect(handler.SendMode()).To(Equal(SendAny)) @@ -903,7 +903,7 @@ var _ = Describe("SentPacketHandler", func() { PacketNumber: 2, Length: 1, EncryptionLevel: protocol.EncryptionInitial, - Frames: []Frame{{Frame: &wire.PingFrame{}}}, + Frames: []*Frame{{Frame: &wire.PingFrame{}}}, SendTime: time.Now(), }) Expect(handler.SendMode()).To(Equal(SendNone)) @@ -915,7 +915,7 @@ var _ = Describe("SentPacketHandler", func() { PacketNumber: 1, Length: 900, EncryptionLevel: protocol.EncryptionInitial, - Frames: []Frame{{Frame: &wire.PingFrame{}}}, + Frames: []*Frame{{Frame: &wire.PingFrame{}}}, SendTime: time.Now(), }) // Amplification limited. We don't need to set a timer now. @@ -931,7 +931,7 @@ var _ = Describe("SentPacketHandler", func() { PacketNumber: 1, Length: 900, EncryptionLevel: protocol.EncryptionHandshake, - Frames: []Frame{{Frame: &wire.PingFrame{}}}, + Frames: []*Frame{{Frame: &wire.PingFrame{}}}, SendTime: time.Now(), }) // Amplification limited. We don't need to set a timer now. @@ -965,7 +965,7 @@ var _ = Describe("SentPacketHandler", func() { PacketNumber: 1, Length: 900, EncryptionLevel: protocol.EncryptionInitial, - Frames: []Frame{{Frame: &wire.PingFrame{}}}, + Frames: []*Frame{{Frame: &wire.PingFrame{}}}, SendTime: time.Now(), }) Expect(handler.SendMode()).To(Equal(SendAny)) @@ -1158,7 +1158,7 @@ var _ = Describe("SentPacketHandler", func() { PacketNumber: 1, SendTime: now.Add(-3 * time.Second), IsPathMTUProbePacket: true, - Frames: []Frame{{Frame: &wire.PingFrame{}, OnLost: func(wire.Frame) { mtuPacketDeclaredLost = true }}}, + Frames: []*Frame{{Frame: &wire.PingFrame{}, OnLost: func(wire.Frame) { mtuPacketDeclaredLost = true }}}, })) handler.SentPacket(ackElicitingPacket(&Packet{PacketNumber: 2, SendTime: now.Add(-3 * time.Second)})) ack := &wire.AckFrame{AckRanges: []wire.AckRange{{Smallest: 2, Largest: 2}}} @@ -1341,7 +1341,7 @@ var _ = Describe("SentPacketHandler", func() { handler.SentPacket(&Packet{ PacketNumber: 13, EncryptionLevel: protocol.EncryptionInitial, - Frames: []Frame{ + Frames: []*Frame{ {Frame: &wire.CryptoFrame{Data: []byte("foobar")}, OnLost: func(wire.Frame) { lostInitial = true }}, }, Length: 100, @@ -1350,7 +1350,7 @@ var _ = Describe("SentPacketHandler", func() { handler.SentPacket(&Packet{ PacketNumber: pn, EncryptionLevel: protocol.Encryption0RTT, - Frames: []Frame{ + Frames: []*Frame{ {Frame: &wire.StreamFrame{Data: []byte("foobar")}, OnLost: func(wire.Frame) { lost0RTT = true }}, }, Length: 999, diff --git a/mock_frame_source_test.go b/mock_frame_source_test.go index 616af4c1..ed2efa67 100644 --- a/mock_frame_source_test.go +++ b/mock_frame_source_test.go @@ -36,10 +36,10 @@ func (m *MockFrameSource) EXPECT() *MockFrameSourceMockRecorder { } // AppendControlFrames mocks base method. -func (m *MockFrameSource) AppendControlFrames(arg0 []ackhandler.Frame, arg1 protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount) { +func (m *MockFrameSource) AppendControlFrames(arg0 []*ackhandler.Frame, arg1 protocol.ByteCount) ([]*ackhandler.Frame, protocol.ByteCount) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "AppendControlFrames", arg0, arg1) - ret0, _ := ret[0].([]ackhandler.Frame) + ret0, _ := ret[0].([]*ackhandler.Frame) ret1, _ := ret[1].(protocol.ByteCount) return ret0, ret1 } @@ -51,10 +51,10 @@ func (mr *MockFrameSourceMockRecorder) AppendControlFrames(arg0, arg1 interface{ } // AppendStreamFrames mocks base method. -func (m *MockFrameSource) AppendStreamFrames(arg0 []ackhandler.Frame, arg1 protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount) { +func (m *MockFrameSource) AppendStreamFrames(arg0 []*ackhandler.Frame, arg1 protocol.ByteCount) ([]*ackhandler.Frame, protocol.ByteCount) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "AppendStreamFrames", arg0, arg1) - ret0, _ := ret[0].([]ackhandler.Frame) + ret0, _ := ret[0].([]*ackhandler.Frame) ret1, _ := ret[1].(protocol.ByteCount) return ret0, ret1 } diff --git a/packet_packer.go b/packet_packer.go index f2223e12..0a11bac4 100644 --- a/packet_packer.go +++ b/packet_packer.go @@ -35,7 +35,7 @@ type sealer interface { } type payload struct { - frames []ackhandler.Frame + frames []*ackhandler.Frame ack *wire.AckFrame length protocol.ByteCount } @@ -43,7 +43,7 @@ type payload struct { type longHeaderPacket struct { header *wire.ExtendedHeader ack *wire.AckFrame - frames []ackhandler.Frame + frames []*ackhandler.Frame length protocol.ByteCount @@ -143,8 +143,8 @@ type sealingManager interface { type frameSource interface { HasData() bool - AppendStreamFrames([]ackhandler.Frame, protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount) - AppendControlFrames([]ackhandler.Frame, protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount) + AppendStreamFrames([]*ackhandler.Frame, protocol.ByteCount) ([]*ackhandler.Frame, protocol.ByteCount) + AppendControlFrames([]*ackhandler.Frame, protocol.ByteCount) ([]*ackhandler.Frame, protocol.ByteCount) } type ackFrameSource interface { @@ -256,7 +256,7 @@ func (p *packetPacker) packConnectionClose( ccf.ReasonPhrase = "" } pl := payload{ - frames: []ackhandler.Frame{{Frame: ccf}}, + frames: []*ackhandler.Frame{{Frame: ccf}}, length: ccf.Length(p.version), } @@ -357,7 +357,7 @@ func (p *packetPacker) shortHeaderPacketLength(connID protocol.ConnectionID, pnL } // size is the expected size of the packet, if no padding was applied. -func (p *packetPacker) initialPaddingLen(frames []ackhandler.Frame, size protocol.ByteCount) protocol.ByteCount { +func (p *packetPacker) initialPaddingLen(frames []*ackhandler.Frame, size protocol.ByteCount) protocol.ByteCount { // For the server, only ack-eliciting Initial packets need to be padded. if p.perspective == protocol.PerspectiveServer && !ackhandler.HasAckElicitingFrames(frames) { return 0 @@ -576,14 +576,16 @@ func (p *packetPacker) maybeGetCryptoPacket(maxPacketSize protocol.ByteCount, en if f == nil { break } - pl.frames = append(pl.frames, ackhandler.Frame{Frame: f}) + af := ackhandler.GetFrame() + af.Frame = f + pl.frames = append(pl.frames, af) frameLen := f.Length(p.version) pl.length += frameLen maxPacketSize -= frameLen } } else if s.HasData() { cf := s.PopCryptoFrame(maxPacketSize) - pl.frames = []ackhandler.Frame{{Frame: cf}} + pl.frames = []*ackhandler.Frame{{Frame: cf}} pl.length += cf.Length(p.version) } return hdr, pl @@ -616,7 +618,10 @@ func (p *packetPacker) maybeGetAppDataPacket(maxPayloadSize protocol.ByteCount, if p.numNonAckElicitingAcks >= protocol.MaxNonAckElicitingAcks { ping := &wire.PingFrame{} // don't retransmit the PING frame when it is lost - pl.frames = append(pl.frames, ackhandler.Frame{Frame: ping, OnLost: func(wire.Frame) {}}) + af := ackhandler.GetFrame() + af.Frame = ping + af.OnLost = func(wire.Frame) {} + pl.frames = append(pl.frames, af) pl.length += ping.Length(p.version) p.numNonAckElicitingAcks = 0 } else { @@ -639,7 +644,7 @@ func (p *packetPacker) composeNextPacket(maxFrameSize protocol.ByteCount, onlyAc return payload{} } - pl := payload{frames: make([]ackhandler.Frame, 0, 1)} + pl := payload{frames: make([]*ackhandler.Frame, 0, 1)} hasData := p.framer.HasData() hasRetransmission := p.retransmissionQueue.HasAppData() @@ -657,11 +662,11 @@ func (p *packetPacker) composeNextPacket(maxFrameSize protocol.ByteCount, onlyAc if f := p.datagramQueue.Peek(); f != nil { size := f.Length(p.version) if size <= maxFrameSize-pl.length { - pl.frames = append(pl.frames, ackhandler.Frame{ - Frame: f, - // set it to a no-op. Then we won't set the default callback, which would retransmit the frame. - OnLost: func(wire.Frame) {}, - }) + af := ackhandler.GetFrame() + af.Frame = f + // set it to a no-op. Then we won't set the default callback, which would retransmit the frame. + af.OnLost = func(wire.Frame) {} + pl.frames = append(pl.frames, af) pl.length += size p.datagramQueue.Pop() } @@ -682,7 +687,9 @@ func (p *packetPacker) composeNextPacket(maxFrameSize protocol.ByteCount, onlyAc if f == nil { break } - pl.frames = append(pl.frames, ackhandler.Frame{Frame: f}) + af := ackhandler.GetFrame() + af.Frame = f + pl.frames = append(pl.frames, af) pl.length += f.Length(p.version) } } @@ -772,7 +779,7 @@ func (p *packetPacker) MaybePackProbePacket(encLevel protocol.EncryptionLevel) ( func (p *packetPacker) PackMTUProbePacket(ping ackhandler.Frame, size protocol.ByteCount, now time.Time) (shortHeaderPacket, *packetBuffer, error) { pl := payload{ - frames: []ackhandler.Frame{ping}, + frames: []*ackhandler.Frame{&ping}, length: ping.Length(p.version), } buffer := getPacketBuffer() diff --git a/packet_packer_test.go b/packet_packer_test.go index 947163af..7f07a0c7 100644 --- a/packet_packer_test.go +++ b/packet_packer_test.go @@ -63,7 +63,7 @@ var _ = Describe("Packet packer", func() { ExpectWithOffset(1, len(data)-l+int(pnLen)).To(BeNumerically(">=", 4)) } - appendFrames := func(fs, frames []ackhandler.Frame) ([]ackhandler.Frame, protocol.ByteCount) { + appendFrames := func(fs, frames []*ackhandler.Frame) ([]*ackhandler.Frame, protocol.ByteCount) { var length protocol.ByteCount for _, f := range frames { length += f.Frame.Length(packer.version) @@ -71,14 +71,14 @@ var _ = Describe("Packet packer", func() { return append(fs, frames...), length } - expectAppendStreamFrames := func(frames ...ackhandler.Frame) { - framer.EXPECT().AppendStreamFrames(gomock.Any(), gomock.Any()).DoAndReturn(func(fs []ackhandler.Frame, _ protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount) { + expectAppendStreamFrames := func(frames ...*ackhandler.Frame) { + framer.EXPECT().AppendStreamFrames(gomock.Any(), gomock.Any()).DoAndReturn(func(fs []*ackhandler.Frame, _ protocol.ByteCount) ([]*ackhandler.Frame, protocol.ByteCount) { return appendFrames(fs, frames) }) } - expectAppendControlFrames := func(frames ...ackhandler.Frame) { - framer.EXPECT().AppendControlFrames(gomock.Any(), gomock.Any()).DoAndReturn(func(fs []ackhandler.Frame, _ protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount) { + expectAppendControlFrames := func(frames ...*ackhandler.Frame) { + framer.EXPECT().AppendControlFrames(gomock.Any(), gomock.Any()).DoAndReturn(func(fs []*ackhandler.Frame, _ protocol.ByteCount) ([]*ackhandler.Frame, protocol.ByteCount) { return appendFrames(fs, frames) }) } @@ -183,7 +183,7 @@ var _ = Describe("Packet packer", func() { ackFramer.EXPECT().GetAckFrame(protocol.Encryption1RTT, false) expectAppendControlFrames() f := &wire.StreamFrame{Data: []byte{0xde, 0xca, 0xfb, 0xad}} - expectAppendStreamFrames(ackhandler.Frame{Frame: f}) + expectAppendStreamFrames(&ackhandler.Frame{Frame: f}) p, err := packer.PackCoalescedPacket(false) Expect(err).ToNot(HaveOccurred()) Expect(p).ToNot(BeNil()) @@ -313,14 +313,14 @@ var _ = Describe("Packet packer", func() { sealingManager.EXPECT().Get0RTTSealer().Return(getSealer(), nil).AnyTimes() pnManager.EXPECT().PeekPacketNumber(protocol.Encryption0RTT).Return(protocol.PacketNumber(0x42), protocol.PacketNumberLen2) pnManager.EXPECT().PopPacketNumber(protocol.Encryption0RTT).Return(protocol.PacketNumber(0x42)) - cf := ackhandler.Frame{Frame: &wire.MaxDataFrame{MaximumData: 0x1337}} + cf := &ackhandler.Frame{Frame: &wire.MaxDataFrame{MaximumData: 0x1337}} framer.EXPECT().HasData().Return(true) - framer.EXPECT().AppendControlFrames(gomock.Any(), gomock.Any()).DoAndReturn(func(frames []ackhandler.Frame, _ protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount) { + framer.EXPECT().AppendControlFrames(gomock.Any(), gomock.Any()).DoAndReturn(func(frames []*ackhandler.Frame, _ protocol.ByteCount) ([]*ackhandler.Frame, protocol.ByteCount) { Expect(frames).To(BeEmpty()) return append(frames, cf), cf.Length(packer.version) }) // TODO: check sizes - framer.EXPECT().AppendStreamFrames(gomock.Any(), gomock.Any()).DoAndReturn(func(frames []ackhandler.Frame, _ protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount) { + framer.EXPECT().AppendStreamFrames(gomock.Any(), gomock.Any()).DoAndReturn(func(frames []*ackhandler.Frame, _ protocol.ByteCount) ([]*ackhandler.Frame, protocol.ByteCount) { return frames, 0 }) p, err := packer.PackCoalescedPacket(false) @@ -329,7 +329,7 @@ var _ = Describe("Packet packer", func() { Expect(p.longHdrPackets).To(HaveLen(1)) Expect(p.longHdrPackets[0].header.Type).To(Equal(protocol.PacketType0RTT)) Expect(p.longHdrPackets[0].EncryptionLevel()).To(Equal(protocol.Encryption0RTT)) - Expect(p.longHdrPackets[0].frames).To(Equal([]ackhandler.Frame{cf})) + Expect(p.longHdrPackets[0].frames).To(Equal([]*ackhandler.Frame{cf})) }) }) @@ -517,7 +517,7 @@ var _ = Describe("Packet packer", func() { StreamID: 5, Data: []byte{0xde, 0xca, 0xfb, 0xad}, } - expectAppendStreamFrames(ackhandler.Frame{Frame: f}) + expectAppendStreamFrames(&ackhandler.Frame{Frame: f}) p, buffer, err := packer.PackPacket(false, time.Now()) Expect(err).ToNot(HaveOccurred()) Expect(p).ToNot(BeNil()) @@ -547,7 +547,7 @@ var _ = Describe("Packet packer", func() { sealingManager.EXPECT().Get1RTTSealer().Return(getSealer(), nil) framer.EXPECT().HasData().Return(true) ackFramer.EXPECT().GetAckFrame(protocol.Encryption1RTT, false) - frames := []ackhandler.Frame{ + frames := []*ackhandler.Frame{ {Frame: &wire.ResetStreamFrame{}}, {Frame: &wire.MaxDataFrame{}}, } @@ -627,11 +627,11 @@ var _ = Describe("Packet packer", func() { ackFramer.EXPECT().GetAckFrame(protocol.Encryption1RTT, false) var maxSize protocol.ByteCount gomock.InOrder( - framer.EXPECT().AppendControlFrames(gomock.Any(), gomock.Any()).DoAndReturn(func(fs []ackhandler.Frame, maxLen protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount) { + framer.EXPECT().AppendControlFrames(gomock.Any(), gomock.Any()).DoAndReturn(func(fs []*ackhandler.Frame, maxLen protocol.ByteCount) ([]*ackhandler.Frame, protocol.ByteCount) { maxSize = maxLen return fs, 444 }), - framer.EXPECT().AppendStreamFrames(gomock.Any(), gomock.Any()).Do(func(fs []ackhandler.Frame, maxLen protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount) { + framer.EXPECT().AppendStreamFrames(gomock.Any(), gomock.Any()).Do(func(fs []*ackhandler.Frame, maxLen protocol.ByteCount) ([]*ackhandler.Frame, protocol.ByteCount) { Expect(maxLen).To(Equal(maxSize - 444)) return fs, 0 }), @@ -692,7 +692,7 @@ var _ = Describe("Packet packer", func() { framer.EXPECT().HasData().Return(true) ackFramer.EXPECT().GetAckFrame(protocol.Encryption1RTT, false) expectAppendControlFrames() - expectAppendStreamFrames(ackhandler.Frame{Frame: f}) + expectAppendStreamFrames(&ackhandler.Frame{Frame: f}) _, buffer, err := packer.PackPacket(false, time.Now()) Expect(err).ToNot(HaveOccurred()) // cut off the tag that the mock sealer added @@ -741,7 +741,7 @@ var _ = Describe("Packet packer", func() { framer.EXPECT().HasData().Return(true) ackFramer.EXPECT().GetAckFrame(protocol.Encryption1RTT, false) expectAppendControlFrames() - expectAppendStreamFrames(ackhandler.Frame{Frame: f1}, ackhandler.Frame{Frame: f2}, ackhandler.Frame{Frame: f3}) + expectAppendStreamFrames(&ackhandler.Frame{Frame: f1}, &ackhandler.Frame{Frame: f2}, &ackhandler.Frame{Frame: f3}) p, _, err := packer.PackPacket(false, time.Now()) Expect(p).ToNot(BeNil()) Expect(err).ToNot(HaveOccurred()) @@ -845,7 +845,7 @@ var _ = Describe("Packet packer", func() { framer.EXPECT().HasData().Return(true) ackFramer.EXPECT().GetAckFrame(protocol.Encryption1RTT, false) expectAppendStreamFrames() - expectAppendControlFrames(ackhandler.Frame{Frame: &wire.MaxDataFrame{}}) + expectAppendControlFrames(&ackhandler.Frame{Frame: &wire.MaxDataFrame{}}) p, _, err := packer.PackPacket(false, time.Now()) Expect(err).ToNot(HaveOccurred()) Expect(p).ToNot(BeNil()) @@ -860,7 +860,7 @@ var _ = Describe("Packet packer", func() { framer.EXPECT().HasData().Return(true).Times(2) ackFramer.EXPECT().GetAckFrame(protocol.Encryption1RTT, false).Times(2) var initialMaxPacketSize protocol.ByteCount - framer.EXPECT().AppendControlFrames(gomock.Any(), gomock.Any()).Do(func(_ []ackhandler.Frame, maxLen protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount) { + framer.EXPECT().AppendControlFrames(gomock.Any(), gomock.Any()).Do(func(_ []*ackhandler.Frame, maxLen protocol.ByteCount) ([]*ackhandler.Frame, protocol.ByteCount) { initialMaxPacketSize = maxLen return nil, 0 }) @@ -871,7 +871,7 @@ var _ = Describe("Packet packer", func() { packer.HandleTransportParameters(&wire.TransportParameters{ MaxUDPPayloadSize: maxPacketSize - 10, }) - framer.EXPECT().AppendControlFrames(gomock.Any(), gomock.Any()).Do(func(_ []ackhandler.Frame, maxLen protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount) { + framer.EXPECT().AppendControlFrames(gomock.Any(), gomock.Any()).Do(func(_ []*ackhandler.Frame, maxLen protocol.ByteCount) ([]*ackhandler.Frame, protocol.ByteCount) { Expect(maxLen).To(Equal(initialMaxPacketSize - 10)) return nil, 0 }) @@ -886,7 +886,7 @@ var _ = Describe("Packet packer", func() { framer.EXPECT().HasData().Return(true).Times(2) ackFramer.EXPECT().GetAckFrame(protocol.Encryption1RTT, false).Times(2) var initialMaxPacketSize protocol.ByteCount - framer.EXPECT().AppendControlFrames(gomock.Any(), gomock.Any()).Do(func(_ []ackhandler.Frame, maxLen protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount) { + framer.EXPECT().AppendControlFrames(gomock.Any(), gomock.Any()).Do(func(_ []*ackhandler.Frame, maxLen protocol.ByteCount) ([]*ackhandler.Frame, protocol.ByteCount) { initialMaxPacketSize = maxLen return nil, 0 }) @@ -897,7 +897,7 @@ var _ = Describe("Packet packer", func() { packer.HandleTransportParameters(&wire.TransportParameters{ MaxUDPPayloadSize: maxPacketSize + 10, }) - framer.EXPECT().AppendControlFrames(gomock.Any(), gomock.Any()).Do(func(_ []ackhandler.Frame, maxLen protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount) { + framer.EXPECT().AppendControlFrames(gomock.Any(), gomock.Any()).Do(func(_ []*ackhandler.Frame, maxLen protocol.ByteCount) ([]*ackhandler.Frame, protocol.ByteCount) { Expect(maxLen).To(Equal(initialMaxPacketSize)) return nil, 0 }) @@ -914,7 +914,7 @@ var _ = Describe("Packet packer", func() { framer.EXPECT().HasData().Return(true).Times(2) ackFramer.EXPECT().GetAckFrame(protocol.Encryption1RTT, false).Times(2) var initialMaxPacketSize protocol.ByteCount - framer.EXPECT().AppendControlFrames(gomock.Any(), gomock.Any()).Do(func(_ []ackhandler.Frame, maxLen protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount) { + framer.EXPECT().AppendControlFrames(gomock.Any(), gomock.Any()).Do(func(_ []*ackhandler.Frame, maxLen protocol.ByteCount) ([]*ackhandler.Frame, protocol.ByteCount) { initialMaxPacketSize = maxLen return nil, 0 }) @@ -924,7 +924,7 @@ var _ = Describe("Packet packer", func() { // now reduce the maxPacketSize const packetSizeIncrease = 50 packer.SetMaxPacketSize(maxPacketSize + packetSizeIncrease) - framer.EXPECT().AppendControlFrames(gomock.Any(), gomock.Any()).Do(func(_ []ackhandler.Frame, maxLen protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount) { + framer.EXPECT().AppendControlFrames(gomock.Any(), gomock.Any()).Do(func(_ []*ackhandler.Frame, maxLen protocol.ByteCount) ([]*ackhandler.Frame, protocol.ByteCount) { Expect(maxLen).To(Equal(initialMaxPacketSize + packetSizeIncrease)) return nil, 0 }) @@ -1158,7 +1158,7 @@ var _ = Describe("Packet packer", func() { return &wire.CryptoFrame{Offset: 0x42, Data: []byte("initial")} }) expectAppendControlFrames() - expectAppendStreamFrames(ackhandler.Frame{Frame: &wire.StreamFrame{Data: []byte("foobar")}}) + expectAppendStreamFrames(&ackhandler.Frame{Frame: &wire.StreamFrame{Data: []byte("foobar")}}) p, err := packer.PackCoalescedPacket(false) Expect(err).ToNot(HaveOccurred()) Expect(p.buffer.Len()).To(BeNumerically(">=", protocol.MinInitialPacketSize)) @@ -1193,7 +1193,7 @@ var _ = Describe("Packet packer", func() { return &wire.CryptoFrame{Offset: 0x1337, Data: []byte("handshake")} }) expectAppendControlFrames() - expectAppendStreamFrames(ackhandler.Frame{Frame: &wire.StreamFrame{Data: []byte("foobar")}}) + expectAppendStreamFrames(&ackhandler.Frame{Frame: &wire.StreamFrame{Data: []byte("foobar")}}) p, err := packer.PackCoalescedPacket(false) Expect(err).ToNot(HaveOccurred()) Expect(p.buffer.Len()).To(BeNumerically("<", 100)) @@ -1290,7 +1290,7 @@ var _ = Describe("Packet packer", func() { Expect(err).ToNot(HaveOccurred()) Expect(p.longHdrPackets).To(HaveLen(1)) Expect(p.longHdrPackets[0].EncryptionLevel()).To(Equal(protocol.EncryptionInitial)) - Expect(p.longHdrPackets[0].frames).To(Equal([]ackhandler.Frame{{Frame: f}})) + Expect(p.longHdrPackets[0].frames).To(Equal([]*ackhandler.Frame{{Frame: f}})) }) It("sends an Initial packet containing only an ACK", func() { @@ -1491,7 +1491,7 @@ var _ = Describe("Packet packer", func() { pnManager.EXPECT().PopPacketNumber(protocol.Encryption1RTT).Return(protocol.PacketNumber(0x42)) framer.EXPECT().HasData().Return(true) expectAppendControlFrames() - expectAppendStreamFrames(ackhandler.Frame{Frame: f}) + expectAppendStreamFrames(&ackhandler.Frame{Frame: f}) p, err := packer.MaybePackProbePacket(protocol.Encryption1RTT) Expect(err).ToNot(HaveOccurred()) @@ -1512,10 +1512,10 @@ var _ = Describe("Packet packer", func() { pnManager.EXPECT().PopPacketNumber(protocol.Encryption1RTT).Return(protocol.PacketNumber(0x42)) framer.EXPECT().HasData().Return(true) expectAppendControlFrames() - framer.EXPECT().AppendStreamFrames(gomock.Any(), gomock.Any()).DoAndReturn(func(fs []ackhandler.Frame, maxSize protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount) { + framer.EXPECT().AppendStreamFrames(gomock.Any(), gomock.Any()).DoAndReturn(func(fs []*ackhandler.Frame, maxSize protocol.ByteCount) ([]*ackhandler.Frame, protocol.ByteCount) { sf, split := f.MaybeSplitOffFrame(maxSize, packer.version) Expect(split).To(BeTrue()) - return append(fs, ackhandler.Frame{Frame: sf}), sf.Length(packer.version) + return append(fs, &ackhandler.Frame{Frame: sf}), sf.Length(packer.version) }) p, err := packer.MaybePackProbePacket(protocol.Encryption1RTT) @@ -1561,7 +1561,7 @@ var _ = Describe("Converting to ackhandler.Packet", func() { It("convert a packet", func() { packet := &longHeaderPacket{ header: &wire.ExtendedHeader{Header: wire.Header{Type: protocol.PacketTypeInitial}}, - frames: []ackhandler.Frame{{Frame: &wire.MaxDataFrame{}}, {Frame: &wire.PingFrame{}}}, + frames: []*ackhandler.Frame{{Frame: &wire.MaxDataFrame{}}, {Frame: &wire.PingFrame{}}}, ack: &wire.AckFrame{AckRanges: []wire.AckRange{{Largest: 100, Smallest: 80}}}, length: 42, } @@ -1576,7 +1576,7 @@ var _ = Describe("Converting to ackhandler.Packet", func() { It("sets the LargestAcked to invalid, if the packet doesn't have an ACK frame", func() { packet := &longHeaderPacket{ header: &wire.ExtendedHeader{Header: wire.Header{Type: protocol.PacketTypeHandshake}}, - frames: []ackhandler.Frame{{Frame: &wire.MaxDataFrame{}}, {Frame: &wire.PingFrame{}}}, + frames: []*ackhandler.Frame{{Frame: &wire.MaxDataFrame{}}, {Frame: &wire.PingFrame{}}}, } p := packet.ToAckHandlerPacket(time.Now(), nil) Expect(p.LargestAcked).To(Equal(protocol.InvalidPacketNumber)) @@ -1588,7 +1588,7 @@ var _ = Describe("Converting to ackhandler.Packet", func() { var pingLost bool packet := &longHeaderPacket{ header: &wire.ExtendedHeader{Header: hdr}, - frames: []ackhandler.Frame{ + frames: []*ackhandler.Frame{ {Frame: &wire.MaxDataFrame{}}, {Frame: &wire.PingFrame{}, OnLost: func(wire.Frame) { pingLost = true }}, }, diff --git a/send_stream.go b/send_stream.go index 66807927..20e12259 100644 --- a/send_stream.go +++ b/send_stream.go @@ -215,7 +215,11 @@ func (s *sendStream) popStreamFrame(maxBytes protocol.ByteCount) (*ackhandler.Fr if f == nil { return nil, hasMoreData } - return &ackhandler.Frame{Frame: f, OnLost: s.queueRetransmission, OnAcked: s.frameAcked}, hasMoreData + af := ackhandler.GetFrame() + af.Frame = f + af.OnLost = s.queueRetransmission + af.OnAcked = s.frameAcked + return af, hasMoreData } func (s *sendStream) popNewOrRetransmittedStreamFrame(maxBytes protocol.ByteCount) (*wire.StreamFrame, bool /* has more data to send */) {