implement packet send modes to determine what kind of packets are sent

This commit is contained in:
Marten Seemann 2018-03-09 17:20:02 +07:00
parent 6a2c4548f7
commit b5977236ff
9 changed files with 205 additions and 92 deletions

View file

@ -14,11 +14,8 @@ type SentPacketHandler interface {
ReceivedAck(ackFrame *wire.AckFrame, withPacketNumber protocol.PacketNumber, encLevel protocol.EncryptionLevel, recvTime time.Time) error
SetHandshakeComplete()
// SendingAllowed says if a packet can be sent.
// Sending packets might not be possible because:
// * we're congestion limited
// * we're tracking the maximum number of sent packets
SendingAllowed() bool
// The SendMode determines if and what kind of packets can be sent.
SendMode() SendMode
// TimeUntilSend is the time when the next packet should be sent.
// It is used for pacing packets.
TimeUntilSend() time.Time

View file

@ -0,0 +1,32 @@
package ackhandler
import "fmt"
// The SendMode says what kind of packets can be sent.
type SendMode uint8
const (
// SendNone means that no packets should be sent
SendNone SendMode = iota
// SendAck means an ACK-only packet should be sent
SendAck
// SendRetransmission means that retransmissions should be sent
SendRetransmission
// SendAny packet should be sent
SendAny
)
func (s SendMode) String() string {
switch s {
case SendNone:
return "none"
case SendAck:
return "ack"
case SendRetransmission:
return "retransmission"
case SendAny:
return "any"
default:
return fmt.Sprintf("invalid send mode: %d", s)
}
}

View file

@ -0,0 +1,16 @@
package ackhandler
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
var _ = Describe("Send Mode", func() {
It("has a string representation", func() {
Expect(SendNone.String()).To(Equal("none"))
Expect(SendAny.String()).To(Equal("any"))
Expect(SendAck.String()).To(Equal("ack"))
Expect(SendRetransmission.String()).To(Equal("retransmission"))
Expect(SendMode(123).String()).To(Equal("invalid send mode: 123"))
})
})

View file

@ -357,18 +357,31 @@ func (h *sentPacketHandler) GetStopWaitingFrame(force bool) *wire.StopWaitingFra
return h.stopWaitingManager.GetStopWaitingFrame(force)
}
func (h *sentPacketHandler) SendingAllowed() bool {
cwnd := h.congestion.GetCongestionWindow()
congestionLimited := h.bytesInFlight > cwnd
maxTrackedLimited := protocol.PacketNumber(len(h.retransmissionQueue)+h.packetHistory.Len()) >= protocol.MaxTrackedSentPackets
if congestionLimited {
utils.Debugf("Congestion limited: bytes in flight %d, window %d", h.bytesInFlight, cwnd)
func (h *sentPacketHandler) SendMode() SendMode {
numTrackedPackets := len(h.retransmissionQueue) + h.packetHistory.Len()
// Don't send any packets if we're keeping track of the maximum number of packets.
// Note that since MaxOutstandingSentPackets is smaller than MaxTrackedSentPackets,
// we will stop sending out new data when reaching MaxOutstandingSentPackets,
// but still allow sending of retransmissions and ACKs.
if numTrackedPackets >= protocol.MaxTrackedSentPackets {
utils.Debugf("Limited by the number of tracked packets: tracking %d packets, maximum %d", numTrackedPackets, protocol.MaxTrackedSentPackets)
return SendNone
}
// Workaround for #555:
// Always allow sending of retransmissions. This should probably be limited
// to RTOs, but we currently don't have a nice way of distinguishing them.
haveRetransmissions := len(h.retransmissionQueue) > 0
return !maxTrackedLimited && (!congestionLimited || haveRetransmissions)
// Send retransmissions first, if there are any.
if len(h.retransmissionQueue) > 0 {
return SendRetransmission
}
// Only send ACKs if we're congestion limited.
if cwnd := h.congestion.GetCongestionWindow(); h.bytesInFlight > cwnd {
utils.Debugf("Congestion limited: bytes in flight %d, window %d", h.bytesInFlight, cwnd)
return SendAck
}
if numTrackedPackets >= protocol.MaxOutstandingSentPackets {
utils.Debugf("Max outstanding limited: tracking %d packets, maximum: %d", numTrackedPackets, protocol.MaxOutstandingSentPackets)
return SendAck
}
return SendAny
}
func (h *sentPacketHandler) TimeUntilSend() time.Time {

View file

@ -694,28 +694,39 @@ var _ = Describe("SentPacketHandler", func() {
handler.OnAlarm() // RTO, meaning 2 lost packets
})
It("allows or denies sending based on congestion", func() {
It("only allows sending of ACKs when congestion limited", func() {
handler.bytesInFlight = 100
cong.EXPECT().GetCongestionWindow().Return(protocol.ByteCount(200))
Expect(handler.SendingAllowed()).To(BeTrue())
Expect(handler.SendMode()).To(Equal(SendAny))
cong.EXPECT().GetCongestionWindow().Return(protocol.ByteCount(75))
Expect(handler.SendingAllowed()).To(BeFalse())
Expect(handler.SendMode()).To(Equal(SendAck))
})
It("allows or denies sending based on the number of tracked packets", func() {
cong.EXPECT().GetCongestionWindow().Times(2)
Expect(handler.SendingAllowed()).To(BeTrue())
handler.retransmissionQueue = make([]*Packet, protocol.MaxTrackedSentPackets)
Expect(handler.SendingAllowed()).To(BeFalse())
It("only allows sending of ACKs when we're keeping track of MaxOutstandingSentPackets packets", func() {
cong.EXPECT().GetCongestionWindow().Return(protocol.MaxByteCount).AnyTimes()
cong.EXPECT().TimeUntilSend(gomock.Any()).AnyTimes()
cong.EXPECT().OnPacketSent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
for i := 1; i < protocol.MaxOutstandingSentPackets; i++ {
handler.SentPacket(retransmittablePacket(protocol.PacketNumber(i)))
Expect(handler.SendMode()).To(Equal(SendAny))
}
handler.SentPacket(retransmittablePacket(protocol.MaxOutstandingSentPackets))
Expect(handler.SendMode()).To(Equal(SendAck))
})
It("allows sending if there are retransmisisons outstanding", func() {
cong.EXPECT().GetCongestionWindow().Times(2)
handler.bytesInFlight = 100
Expect(handler.retransmissionQueue).To(BeEmpty())
Expect(handler.SendingAllowed()).To(BeFalse())
It("allows sending retransmissions", func() {
// note that we don't EXPECT a call to GetCongestionWindow
// that means retransmissions are sent without considering the congestion window
handler.retransmissionQueue = []*Packet{{PacketNumber: 3}}
Expect(handler.SendingAllowed()).To(BeTrue())
Expect(handler.SendMode()).To(Equal(SendRetransmission))
})
It("allow retransmissions, if we're keeping track of between MaxOutstandingSentPackets and MaxTrackedSentPackets packets", func() {
Expect(protocol.MaxOutstandingSentPackets).To(BeNumerically("<", protocol.MaxTrackedSentPackets))
handler.retransmissionQueue = make([]*Packet, protocol.MaxOutstandingSentPackets+10)
Expect(handler.SendMode()).To(Equal(SendRetransmission))
handler.retransmissionQueue = make([]*Packet, protocol.MaxTrackedSentPackets)
Expect(handler.SendMode()).To(Equal(SendNone))
})
It("gets the pacing delay", func() {

View file

@ -119,16 +119,16 @@ func (mr *MockSentPacketHandlerMockRecorder) ReceivedAck(arg0, arg1, arg2, arg3
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReceivedAck", reflect.TypeOf((*MockSentPacketHandler)(nil).ReceivedAck), arg0, arg1, arg2, arg3)
}
// SendingAllowed mocks base method
func (m *MockSentPacketHandler) SendingAllowed() bool {
ret := m.ctrl.Call(m, "SendingAllowed")
ret0, _ := ret[0].(bool)
// SendMode mocks base method
func (m *MockSentPacketHandler) SendMode() ackhandler.SendMode {
ret := m.ctrl.Call(m, "SendMode")
ret0, _ := ret[0].(ackhandler.SendMode)
return ret0
}
// SendingAllowed indicates an expected call of SendingAllowed
func (mr *MockSentPacketHandlerMockRecorder) SendingAllowed() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendingAllowed", reflect.TypeOf((*MockSentPacketHandler)(nil).SendingAllowed))
// SendMode indicates an expected call of SendMode
func (mr *MockSentPacketHandlerMockRecorder) SendMode() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMode", reflect.TypeOf((*MockSentPacketHandler)(nil).SendMode))
}
// SentPacket mocks base method

View file

@ -81,8 +81,15 @@ const MaxTrackedSkippedPackets = 10
// CookieExpiryTime is the valid time of a cookie
const CookieExpiryTime = 24 * time.Hour
// MaxTrackedSentPackets is maximum number of sent packets saved for either later retransmission or entropy calculation
const MaxTrackedSentPackets = 2 * DefaultMaxCongestionWindow
// MaxOutstandingSentPackets is maximum number of packets saved for retransmission.
// When reached, it imposes a soft limit on sending new packets:
// Sending ACKs and retransmission is still allowed, but now new regular packets can be sent.
const MaxOutstandingSentPackets = 2 * DefaultMaxCongestionWindow
// MaxTrackedSentPackets is maximum number of sent packets saved for retransmission.
// When reached, no more packets will be sent.
// This value *must* be larger than MaxOutstandingSentPackets.
const MaxTrackedSentPackets = MaxOutstandingSentPackets * 5 / 4
// MaxTrackedReceivedAckRanges is the maximum number of ACK ranges tracked
const MaxTrackedReceivedAckRanges = DefaultMaxCongestionWindow

View file

@ -766,40 +766,56 @@ func (s *session) processTransportParameters(params *handshake.TransportParamete
func (s *session) sendPackets() error {
s.pacingDeadline = time.Time{}
if !s.sentPacketHandler.SendingAllowed() { // if congestion limited, at least try sending an ACK frame
return s.maybeSendAckOnlyPacket()
sendMode := s.sentPacketHandler.SendMode()
if sendMode == ackhandler.SendNone { // shortcut: return immediately if there's nothing to send
return nil
}
numPackets := s.sentPacketHandler.ShouldSendNumPackets()
var numPacketsSent int
// Send retransmissions, until
// * we're congestion limited, or
// * there are no more retransmissions, or
// * the maximum number of packets was reached
for ; numPacketsSent < numPackets; numPacketsSent++ {
sentPacket, err := s.maybeSendRetransmission()
if err != nil {
return err
sendLoop:
for {
switch sendMode {
case ackhandler.SendNone:
break sendLoop
case ackhandler.SendAck:
// We can at most send a single ACK only packet.
// There will only be a new ACK after receiving new packets.
// SendAck is only returned when we're congestion limited, so we don't need to set the pacingt timer.
return s.maybeSendAckOnlyPacket()
case ackhandler.SendRetransmission:
sentPacket, err := s.maybeSendRetransmission()
if err != nil {
return err
}
if sentPacket {
numPacketsSent++
// This can happen if a retransmission queued, but it wasn't necessary to send it.
// e.g. when an Initial is queued, but we already received a packet from the server.
}
case ackhandler.SendAny:
sentPacket, err := s.sendPacket()
if err != nil {
return err
}
if !sentPacket {
break sendLoop
}
numPacketsSent++
default:
return fmt.Errorf("BUG: invalid send mode %d", sendMode)
}
if !sentPacket { // no more retransmission to send. Proceed to send new data.
if numPacketsSent >= numPackets {
break
}
if !s.sentPacketHandler.SendingAllowed() {
return nil
}
}
for ; numPacketsSent < numPackets; numPacketsSent++ {
sentPacket, err := s.sendPacket()
if err != nil {
return err
}
// If no packet was sent, or we're congestion limit, we're done here.
if !sentPacket || !s.sentPacketHandler.SendingAllowed() {
return nil
}
sendMode = s.sentPacketHandler.SendMode()
}
// Only start the pacing timer if we sent as many packets as we were allowed.
// There will probably be more to send when calling sendPacket again.
s.pacingDeadline = s.sentPacketHandler.TimeUntilSend()
if numPacketsSent == numPackets {
s.pacingDeadline = s.sentPacketHandler.TimeUntilSend()
}
return nil
}
@ -837,7 +853,6 @@ func (s *session) maybeSendRetransmission() (bool, error) {
// As soon as we receive one response, we don't need to send any more Initials.
if s.receivedFirstPacket && retransmitPacket.PacketType == protocol.PacketTypeInitial {
utils.Debugf("Skipping retransmission of packet %d. Already received a response to an Initial.", retransmitPacket.PacketNumber)
retransmitPacket = nil
continue
}
break

View file

@ -754,10 +754,10 @@ var _ = Describe("Session", func() {
PacketNumber: 10,
PacketType: protocol.PacketTypeHandshake,
})
sph.EXPECT().DequeuePacketForRetransmission()
sph.EXPECT().SendingAllowed().Return(true).Times(2)
sph.EXPECT().SendingAllowed()
sph.EXPECT().SendMode().Return(ackhandler.SendRetransmission)
sph.EXPECT().SendMode().Return(ackhandler.SendAny)
sph.EXPECT().ShouldSendNumPackets().Return(2)
sph.EXPECT().TimeUntilSend()
sph.EXPECT().GetStopWaitingFrame(gomock.Any()).Return(&wire.StopWaitingFrame{})
sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) {
// retransmitted packet
@ -773,6 +773,14 @@ var _ = Describe("Session", func() {
err := sess.sendPackets()
Expect(err).ToNot(HaveOccurred())
})
It("doesn't send when the SentPacketHandler doesn't allow it", func() {
sph := mockackhandler.NewMockSentPacketHandler(mockCtrl)
sph.EXPECT().SendMode().Return(ackhandler.SendNone)
sess.sentPacketHandler = sph
err := sess.sendPackets()
Expect(err).ToNot(HaveOccurred())
})
})
Context("packet pacing", func() {
@ -792,12 +800,11 @@ var _ = Describe("Session", func() {
sph.EXPECT().SentPacket(gomock.Any()).Times(2)
sph.EXPECT().ShouldSendNumPackets().Return(1).Times(2)
sph.EXPECT().TimeUntilSend().Return(time.Now()).Times(2)
sph.EXPECT().SendingAllowed().Do(func() {
sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour))
sph.EXPECT().SendMode().Return(ackhandler.SendAny).Do(func() {
// make sure there's something to send
sess.packer.QueueControlFrame(&wire.MaxDataFrame{ByteOffset: 1})
}).Return(true).Times(3) // allow 2 packets...
// ...then report that we're congestion limited
sph.EXPECT().SendingAllowed()
}).Times(2) // allow 2 packets...
done := make(chan struct{})
go func() {
defer GinkgoRecover()
@ -812,6 +819,29 @@ var _ = Describe("Session", func() {
Eventually(done).Should(BeClosed())
})
// when becoming congestion limited, at some point the SendMode will change from SendAny to SendAck
// we shouldn't send the ACK in the same run
It("doesn't send an ACK right after becoming congestion limited", func() {
sess.packer.QueueControlFrame(&wire.MaxDataFrame{ByteOffset: 1})
sph.EXPECT().SentPacket(gomock.Any())
sph.EXPECT().ShouldSendNumPackets().Return(1000)
sph.EXPECT().TimeUntilSend().Return(time.Now())
sph.EXPECT().SendMode().Return(ackhandler.SendAny)
sph.EXPECT().SendMode().Return(ackhandler.SendAck)
done := make(chan struct{})
go func() {
defer GinkgoRecover()
sess.run()
close(done)
}()
sess.scheduleSending()
Eventually(mconn.written).Should(HaveLen(1))
Consistently(mconn.written).Should(HaveLen(1))
// make the go routine return
sess.Close(nil)
Eventually(done).Should(BeClosed())
})
It("paces packets", func() {
pacingDelay := scaleDuration(100 * time.Millisecond)
sess.packer.QueueControlFrame(&wire.MaxDataFrame{ByteOffset: 1})
@ -820,10 +850,10 @@ var _ = Describe("Session", func() {
sph.EXPECT().TimeUntilSend().Return(time.Now().Add(pacingDelay)) // send one
sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour))
sph.EXPECT().ShouldSendNumPackets().Times(2).Return(1)
sph.EXPECT().SendingAllowed().Do(func() { // after sending the first packet
sph.EXPECT().SendMode().Return(ackhandler.SendAny).Do(func() { // after sending the first packet
// make sure there's something to send
sess.packer.QueueControlFrame(&wire.MaxDataFrame{ByteOffset: 2})
}).Return(true).AnyTimes()
}).AnyTimes()
done := make(chan struct{})
go func() {
defer GinkgoRecover()
@ -844,10 +874,10 @@ var _ = Describe("Session", func() {
sph.EXPECT().ShouldSendNumPackets().Return(3)
sph.EXPECT().TimeUntilSend().Return(time.Now())
sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour))
sph.EXPECT().SendingAllowed().Do(func() {
sph.EXPECT().SendMode().Return(ackhandler.SendAny).Do(func() {
// make sure there's something to send
sess.packer.QueueControlFrame(&wire.MaxDataFrame{ByteOffset: 1})
}).Return(true).Times(4)
}).Times(3)
done := make(chan struct{})
go func() {
defer GinkgoRecover()
@ -864,7 +894,7 @@ var _ = Describe("Session", func() {
It("doesn't set a pacing timer when there is no data to send", func() {
sph.EXPECT().TimeUntilSend().Return(time.Now())
sph.EXPECT().ShouldSendNumPackets().Return(1)
sph.EXPECT().SendingAllowed().Return(true).AnyTimes()
sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes()
done := make(chan struct{})
go func() {
defer GinkgoRecover()
@ -896,7 +926,8 @@ var _ = Describe("Session", func() {
sph := mockackhandler.NewMockSentPacketHandler(mockCtrl)
sph.EXPECT().GetPacketNumberLen(gomock.Any()).Return(protocol.PacketNumberLen2).AnyTimes()
sph.EXPECT().GetAlarmTimeout().AnyTimes()
sph.EXPECT().SendingAllowed()
sph.EXPECT().SendMode().Return(ackhandler.SendAck)
sph.EXPECT().ShouldSendNumPackets().Return(1000)
sph.EXPECT().GetStopWaitingFrame(false).Return(swf)
sph.EXPECT().TimeUntilSend()
sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) {
@ -927,7 +958,8 @@ var _ = Describe("Session", func() {
sph := mockackhandler.NewMockSentPacketHandler(mockCtrl)
sph.EXPECT().GetPacketNumberLen(gomock.Any()).Return(protocol.PacketNumberLen2).AnyTimes()
sph.EXPECT().GetAlarmTimeout().AnyTimes()
sph.EXPECT().SendingAllowed()
sph.EXPECT().SendMode().Return(ackhandler.SendAck)
sph.EXPECT().ShouldSendNumPackets().Return(1000)
sph.EXPECT().TimeUntilSend()
sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) {
Expect(p.Frames).To(HaveLen(1))
@ -1105,22 +1137,13 @@ var _ = Describe("Session", func() {
It("sends when scheduleSending is called", func() {
sess.packer.packetNumberGenerator.next = 10000
f := &wire.StreamFrame{
StreamID: 0x5,
Data: []byte("foobar"),
}
sess.packer.QueueControlFrame(&wire.BlockedFrame{})
sph := mockackhandler.NewMockSentPacketHandler(mockCtrl)
sph.EXPECT().GetAlarmTimeout().AnyTimes()
sph.EXPECT().TimeUntilSend().AnyTimes()
sph.EXPECT().SendingAllowed().AnyTimes().Return(true)
sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes()
sph.EXPECT().ShouldSendNumPackets().AnyTimes().Return(1)
sph.EXPECT().GetPacketNumberLen(gomock.Any()).Return(protocol.PacketNumberLen2).AnyTimes()
sph.EXPECT().GetStopWaitingFrame(true).Return(&wire.StopWaitingFrame{LeastUnacked: 10})
sph.EXPECT().DequeuePacketForRetransmission().Return(&ackhandler.Packet{
PacketNumber: 0x1337,
Frames: []wire.Frame{f},
EncryptionLevel: protocol.EncryptionForwardSecure,
})
sph.EXPECT().SentPacket(gomock.Any())
sess.sentPacketHandler = sph
@ -1144,8 +1167,7 @@ var _ = Describe("Session", func() {
sph.EXPECT().TimeUntilSend().Return(time.Now())
sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour))
sph.EXPECT().GetAlarmTimeout().AnyTimes()
sph.EXPECT().SendingAllowed().Return(true).AnyTimes()
sph.EXPECT().DequeuePacketForRetransmission()
sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes()
sph.EXPECT().GetStopWaitingFrame(gomock.Any())
sph.EXPECT().ShouldSendNumPackets().Return(1)
sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) {