From defafec71ec126850279465ef341ade6464e7941 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Tue, 7 Aug 2018 18:34:31 +0700 Subject: [PATCH] use retransmissions as TLP packets --- internal/ackhandler/interfaces.go | 3 +- internal/ackhandler/sent_packet_handler.go | 31 +++++----- .../ackhandler/sent_packet_handler_test.go | 37 +++++------ .../mocks/ackhandler/sent_packet_handler.go | 13 ++++ session.go | 57 ++++++++--------- session_test.go | 62 ++++--------------- 6 files changed, 88 insertions(+), 115 deletions(-) diff --git a/internal/ackhandler/interfaces.go b/internal/ackhandler/interfaces.go index 43027dcf..1924cdc9 100644 --- a/internal/ackhandler/interfaces.go +++ b/internal/ackhandler/interfaces.go @@ -29,7 +29,8 @@ type SentPacketHandler interface { GetStopWaitingFrame(force bool) *wire.StopWaitingFrame GetLowestPacketNotConfirmedAcked() protocol.PacketNumber - DequeuePacketForRetransmission() (packet *Packet) + DequeuePacketForRetransmission() *Packet + DequeueProbePacket() (*Packet, error) GetPacketNumberLen(protocol.PacketNumber) protocol.PacketNumberLen GetAlarmTimeout() time.Time diff --git a/internal/ackhandler/sent_packet_handler.go b/internal/ackhandler/sent_packet_handler.go index 3ed4a7ed..4fdb8c36 100644 --- a/internal/ackhandler/sent_packet_handler.go +++ b/internal/ackhandler/sent_packet_handler.go @@ -1,6 +1,7 @@ package ackhandler import ( + "errors" "fmt" "math" "time" @@ -415,7 +416,6 @@ func (h *sentPacketHandler) onVerifiedAlarm() error { } h.rtoCount++ h.numRTOs += 2 - err = h.queueRTOs() } return err } @@ -506,6 +506,19 @@ func (h *sentPacketHandler) DequeuePacketForRetransmission() *Packet { return packet } +func (h *sentPacketHandler) DequeueProbePacket() (*Packet, error) { + if len(h.retransmissionQueue) == 0 { + p := h.packetHistory.FirstOutstanding() + if p == nil { + return nil, errors.New("cannot dequeue a probe packet. No outstanding packets") + } + if err := h.queuePacketForRetransmission(p); err != nil { + return nil, err + } + } + return h.DequeuePacketForRetransmission(), nil +} + func (h *sentPacketHandler) GetPacketNumberLen(p protocol.PacketNumber) protocol.PacketNumberLen { return protocol.GetPacketNumberLengthForHeader(p, h.lowestUnacked(), h.version) } @@ -569,22 +582,6 @@ func (h *sentPacketHandler) ShouldSendNumPackets() int { return int(math.Ceil(float64(protocol.MinPacingDelay) / float64(delay))) } -// retransmit the oldest two packets -func (h *sentPacketHandler) queueRTOs() error { - // Queue the first two outstanding packets for retransmission. - // This does NOT declare this packets as lost: - // They are still tracked in the packet history and count towards the bytes in flight. - for i := 0; i < 2; i++ { - if p := h.packetHistory.FirstOutstanding(); p != nil { - h.logger.Debugf("Queueing packet %#x for retransmission (RTO)", p.PacketNumber) - if err := h.queuePacketForRetransmission(p); err != nil { - return err - } - } - } - return nil -} - func (h *sentPacketHandler) queueHandshakePacketsForRetransmission() error { var handshakePackets []*Packet h.packetHistory.Iterate(func(p *Packet) (bool, error) { diff --git a/internal/ackhandler/sent_packet_handler_test.go b/internal/ackhandler/sent_packet_handler_test.go index ff014db9..9960e537 100644 --- a/internal/ackhandler/sent_packet_handler_test.go +++ b/internal/ackhandler/sent_packet_handler_test.go @@ -591,12 +591,8 @@ var _ = Describe("SentPacketHandler", func() { handler.SentPacket(retransmittablePacket(&Packet{PacketNumber: i})) } handler.OnAlarm() // TLP - Expect(handler.DequeuePacketForRetransmission()).To(BeNil()) handler.OnAlarm() // TLP - Expect(handler.DequeuePacketForRetransmission()).To(BeNil()) handler.OnAlarm() // RTO - Expect(handler.DequeuePacketForRetransmission()).ToNot(BeNil()) - Expect(handler.DequeuePacketForRetransmission()).ToNot(BeNil()) }) It("declares all lower packets lost and call OnRetransmissionTimeout when verifying an RTO", func() { @@ -609,8 +605,6 @@ var _ = Describe("SentPacketHandler", func() { handler.OnAlarm() // TLP handler.OnAlarm() // TLP handler.OnAlarm() // RTO - Expect(handler.DequeuePacketForRetransmission()).ToNot(BeNil()) - Expect(handler.DequeuePacketForRetransmission()).ToNot(BeNil()) // send one probe packet and receive an ACK for it rcvTime := time.Now() gomock.InOrder( @@ -636,10 +630,8 @@ var _ = Describe("SentPacketHandler", func() { handler.OnAlarm() // TLP handler.OnAlarm() // TLP handler.OnAlarm() // RTO - Expect(handler.DequeuePacketForRetransmission()).ToNot(BeNil()) - Expect(handler.DequeuePacketForRetransmission()).ToNot(BeNil()) - // send one probe packet + // send one probe packet handler.SentPacket(retransmittablePacket(&Packet{PacketNumber: 3})) // receive an ACK for a packet send *before* the probe packet // don't EXPECT any call to OnRetransmissionTimeout @@ -873,7 +865,7 @@ var _ = Describe("SentPacketHandler", func() { Expect(handler.computeRTOTimeout()).To(Equal(4 * defaultRTOTimeout)) }) - It("queues two packets if RTO expires", func() { + It("gets two probe packets if RTO expires", func() { handler.SentPacket(retransmittablePacket(&Packet{PacketNumber: 1})) handler.SentPacket(retransmittablePacket(&Packet{PacketNumber: 2})) @@ -884,10 +876,12 @@ var _ = Describe("SentPacketHandler", func() { handler.OnAlarm() // TLP handler.OnAlarm() // TLP handler.OnAlarm() // RTO - p := handler.DequeuePacketForRetransmission() + p, err := handler.DequeueProbePacket() + Expect(err).ToNot(HaveOccurred()) Expect(p).ToNot(BeNil()) Expect(p.PacketNumber).To(Equal(protocol.PacketNumber(1))) - p = handler.DequeuePacketForRetransmission() + p, err = handler.DequeueProbePacket() + Expect(err).ToNot(HaveOccurred()) Expect(p).ToNot(BeNil()) Expect(p.PacketNumber).To(Equal(protocol.PacketNumber(2))) Expect(handler.bytesInFlight).To(Equal(protocol.ByteCount(2))) @@ -902,15 +896,17 @@ var _ = Describe("SentPacketHandler", func() { handler.OnAlarm() // TLP handler.OnAlarm() // TLP handler.OnAlarm() // RTO - Expect(handler.DequeuePacketForRetransmission()).ToNot(BeNil()) - Expect(handler.DequeuePacketForRetransmission()).ToNot(BeNil()) + _, err := handler.DequeueProbePacket() + Expect(err).ToNot(HaveOccurred()) + _, err = handler.DequeueProbePacket() + Expect(err).ToNot(HaveOccurred()) expectInPacketHistory([]protocol.PacketNumber{1, 2}) Expect(handler.bytesInFlight).To(Equal(protocol.ByteCount(2))) // Send a probe packet and receive an ACK for it. // This verifies the RTO. handler.SentPacket(retransmittablePacket(&Packet{PacketNumber: 3})) ack := &wire.AckFrame{AckRanges: []wire.AckRange{{Smallest: 3, Largest: 3}}} - err := handler.ReceivedAck(ack, 1, protocol.EncryptionForwardSecure, time.Now()) + err = handler.ReceivedAck(ack, 1, protocol.EncryptionForwardSecure, time.Now()) Expect(err).ToNot(HaveOccurred()) Expect(handler.packetHistory.Len()).To(BeZero()) Expect(handler.bytesInFlight).To(BeZero()) @@ -931,7 +927,7 @@ var _ = Describe("SentPacketHandler", func() { Expect(handler.SendMode()).ToNot(Equal(SendRTO)) }) - It("queues packets sent before the probe packet for retransmission", func() { + It("gets packets sent before the probe packet for retransmission", func() { handler.SentPacket(retransmittablePacket(&Packet{PacketNumber: 1, SendTime: time.Now().Add(-time.Hour)})) handler.SentPacket(retransmittablePacket(&Packet{PacketNumber: 2, SendTime: time.Now().Add(-time.Hour)})) handler.SentPacket(retransmittablePacket(&Packet{PacketNumber: 3, SendTime: time.Now().Add(-time.Hour)})) @@ -940,14 +936,16 @@ var _ = Describe("SentPacketHandler", func() { handler.OnAlarm() // TLP handler.OnAlarm() // TLP handler.OnAlarm() // RTO - Expect(handler.DequeuePacketForRetransmission()).ToNot(BeNil()) - Expect(handler.DequeuePacketForRetransmission()).ToNot(BeNil()) + _, err := handler.DequeueProbePacket() + Expect(err).ToNot(HaveOccurred()) + _, err = handler.DequeueProbePacket() + Expect(err).ToNot(HaveOccurred()) expectInPacketHistory([]protocol.PacketNumber{1, 2, 3, 4, 5}) // Send a probe packet and receive an ACK for it. // This verifies the RTO. handler.SentPacket(retransmittablePacket(&Packet{PacketNumber: 6})) ack := &wire.AckFrame{AckRanges: []wire.AckRange{{Smallest: 6, Largest: 6}}} - err := handler.ReceivedAck(ack, 1, protocol.EncryptionForwardSecure, time.Now()) + err = handler.ReceivedAck(ack, 1, protocol.EncryptionForwardSecure, time.Now()) Expect(err).ToNot(HaveOccurred()) Expect(handler.packetHistory.Len()).To(BeZero()) Expect(handler.bytesInFlight).To(BeZero()) @@ -960,7 +958,6 @@ var _ = Describe("SentPacketHandler", func() { handler.OnAlarm() // TLP handler.OnAlarm() // TLP handler.OnAlarm() // RTO - Expect(handler.DequeuePacketForRetransmission()).ToNot(BeNil()) handler.SentPacketsAsRetransmission([]*Packet{retransmittablePacket(&Packet{PacketNumber: 6})}, 5) ack := &wire.AckFrame{AckRanges: []wire.AckRange{{Smallest: 5, Largest: 5}}} err := handler.ReceivedAck(ack, 1, protocol.EncryptionForwardSecure, time.Now()) diff --git a/internal/mocks/ackhandler/sent_packet_handler.go b/internal/mocks/ackhandler/sent_packet_handler.go index b11d999a..aff5a1e1 100644 --- a/internal/mocks/ackhandler/sent_packet_handler.go +++ b/internal/mocks/ackhandler/sent_packet_handler.go @@ -49,6 +49,19 @@ func (mr *MockSentPacketHandlerMockRecorder) DequeuePacketForRetransmission() *g return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DequeuePacketForRetransmission", reflect.TypeOf((*MockSentPacketHandler)(nil).DequeuePacketForRetransmission)) } +// DequeueProbePacket mocks base method +func (m *MockSentPacketHandler) DequeueProbePacket() (*ackhandler.Packet, error) { + ret := m.ctrl.Call(m, "DequeueProbePacket") + ret0, _ := ret[0].(*ackhandler.Packet) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DequeueProbePacket indicates an expected call of DequeueProbePacket +func (mr *MockSentPacketHandlerMockRecorder) DequeueProbePacket() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DequeueProbePacket", reflect.TypeOf((*MockSentPacketHandler)(nil).DequeueProbePacket)) +} + // GetAlarmTimeout mocks base method func (m *MockSentPacketHandler) GetAlarmTimeout() time.Time { ret := m.ctrl.Call(m, "GetAlarmTimeout") diff --git a/session.go b/session.go index 419edb88..928cd141 100644 --- a/session.go +++ b/session.go @@ -935,37 +935,11 @@ sendLoop: // There will only be a new ACK after receiving new packets. // SendAck is only returned when we're congestion limited, so we don't need to set the pacingt timer. return s.maybeSendAckOnlyPacket() - case ackhandler.SendRTO: - // try to send a retransmission first - sentPacket, err := s.maybeSendRetransmission() - if err != nil { + case ackhandler.SendTLP, ackhandler.SendRTO: + if err := s.sendProbePacket(); err != nil { return err } - if !sentPacket { - // In RTO mode, a probe packet has to be sent. - // Add a PING frame to make sure a (retransmittable) packet will be sent. - s.queueControlFrame(&wire.PingFrame{}) - sentPacket, err := s.sendPacket() - if err != nil { - return err - } - if !sentPacket { - return errors.New("session BUG: expected a packet to be sent in RTO mode") - } - } numPacketsSent++ - case ackhandler.SendTLP: - // In TLP mode, a probe packet has to be sent. - // Add a PING frame to make sure a (retransmittable) packet will be sent. - s.queueControlFrame(&wire.PingFrame{}) - sentPacket, err := s.sendPacket() - if err != nil { - return err - } - if !sentPacket { - return errors.New("session BUG: expected a packet to be sent in TLP mode") - } - return nil case ackhandler.SendRetransmission: sentPacket, err := s.maybeSendRetransmission() if err != nil { @@ -1067,6 +1041,33 @@ func (s *session) maybeSendRetransmission() (bool, error) { return true, nil } +func (s *session) sendProbePacket() error { + p, err := s.sentPacketHandler.DequeueProbePacket() + if err != nil { + return err + } + s.logger.Debugf("Sending a retransmission for %#x as a probe packet.", p.PacketNumber) + + if s.version.UsesStopWaitingFrames() { + s.packer.QueueControlFrame(s.sentPacketHandler.GetStopWaitingFrame(true)) + } + packets, err := s.packer.PackRetransmission(p) + if err != nil { + return err + } + ackhandlerPackets := make([]*ackhandler.Packet, len(packets)) + for i, packet := range packets { + ackhandlerPackets[i] = packet.ToAckHandlerPacket() + } + s.sentPacketHandler.SentPacketsAsRetransmission(ackhandlerPackets, p.PacketNumber) + for _, packet := range packets { + if err := s.sendPackedPacket(packet); err != nil { + return err + } + } + return nil +} + func (s *session) sendPacket() (bool, error) { if isBlocked, offset := s.connFlowController.IsNewlyBlocked(); isBlocked { s.packer.QueueControlFrame(&wire.BlockedFrame{Offset: offset}) diff --git a/session_test.go b/session_test.go index 9c13766a..300b2c53 100644 --- a/session_test.go +++ b/session_test.go @@ -810,65 +810,29 @@ var _ = Describe("Session", func() { Expect(err).ToNot(HaveOccurred()) }) - It("sends a TLP probe packet", func() { + It("sends a probe packet", func() { + f := &wire.MaxDataFrame{ByteOffset: 1337} sph := mockackhandler.NewMockSentPacketHandler(mockCtrl) + sph.EXPECT().TimeUntilSend() sph.EXPECT().GetPacketNumberLen(gomock.Any()).Return(protocol.PacketNumberLen2).AnyTimes() sph.EXPECT().SendMode().Return(ackhandler.SendTLP) sph.EXPECT().ShouldSendNumPackets().Return(1) - sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) { - Expect(p.Frames).To(HaveLen(1)) - Expect(p.Frames[0]).To(BeAssignableToTypeOf(&wire.PingFrame{})) + sph.EXPECT().DequeueProbePacket().Return(&ackhandler.Packet{ + PacketNumber: 0x42, + Frames: []wire.Frame{f}, + }, nil) + sph.EXPECT().GetStopWaitingFrame(true).Return(&wire.StopWaitingFrame{}) + sph.EXPECT().SentPacketsAsRetransmission(gomock.Any(), protocol.PacketNumber(0x42)).Do(func(packets []*ackhandler.Packet, _ protocol.PacketNumber) { + Expect(packets).To(HaveLen(1)) + p := packets[0] + Expect(p.Frames).To(HaveLen(2)) + Expect(p.Frames[1]).To(Equal(f)) }) sess.sentPacketHandler = sph err := sess.sendPackets() Expect(err).ToNot(HaveOccurred()) }) - It("sends an RTO probe packets", func() { - sph := mockackhandler.NewMockSentPacketHandler(mockCtrl) - sph.EXPECT().GetPacketNumberLen(gomock.Any()).Return(protocol.PacketNumberLen2).AnyTimes() - sph.EXPECT().TimeUntilSend() - sph.EXPECT().DequeuePacketForRetransmission().Return(&ackhandler.Packet{ - PacketNumber: 10, - }) - sph.EXPECT().DequeuePacketForRetransmission().Return(&ackhandler.Packet{ - PacketNumber: 11, - }) - sph.EXPECT().SendMode().Return(ackhandler.SendRTO).Times(2) - sph.EXPECT().ShouldSendNumPackets().Return(2) - sph.EXPECT().GetStopWaitingFrame(gomock.Any()).Return(&wire.StopWaitingFrame{}).Times(2) - gomock.InOrder( - sph.EXPECT().SentPacketsAsRetransmission(gomock.Any(), protocol.PacketNumber(10)), - sph.EXPECT().SentPacketsAsRetransmission(gomock.Any(), protocol.PacketNumber(11)), - ) - sess.sentPacketHandler = sph - err := sess.sendPackets() - Expect(err).ToNot(HaveOccurred()) - }) - - It("sends RTO probe packets with new data, if no retransmission is available", func() { - sph := mockackhandler.NewMockSentPacketHandler(mockCtrl) - sph.EXPECT().GetPacketNumberLen(gomock.Any()).Return(protocol.PacketNumberLen2).AnyTimes() - sph.EXPECT().TimeUntilSend() - sph.EXPECT().DequeuePacketForRetransmission().Return(&ackhandler.Packet{ - PacketNumber: 10, - }) - sph.EXPECT().DequeuePacketForRetransmission() - sph.EXPECT().SendMode().Return(ackhandler.SendRTO).Times(2) - sph.EXPECT().ShouldSendNumPackets().Return(2) - sph.EXPECT().GetStopWaitingFrame(gomock.Any()).Return(&wire.StopWaitingFrame{}) - gomock.InOrder( - sph.EXPECT().SentPacketsAsRetransmission(gomock.Any(), protocol.PacketNumber(10)), - sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) { - Expect(p.Frames).To(HaveLen(1)) - Expect(p.Frames[0]).To(BeAssignableToTypeOf(&wire.PingFrame{})) - }), - ) - sess.sentPacketHandler = sph - err := sess.sendPackets() - Expect(err).ToNot(HaveOccurred()) - }) - It("doesn't send when the SentPacketHandler doesn't allow it", func() { sph := mockackhandler.NewMockSentPacketHandler(mockCtrl) sph.EXPECT().SendMode().Return(ackhandler.SendNone)