handle Retry packets in the session

This commit is contained in:
Marten Seemann 2019-02-02 12:44:43 +08:00
parent 7a7e7ca6eb
commit 17f4ebad64
11 changed files with 199 additions and 194 deletions

View file

@ -24,8 +24,6 @@ type client struct {
packetHandlers packetHandlerManager
token []byte
versionNegotiated utils.AtomicBool // has the server accepted our version
receivedVersionNegotiationPacket bool
negotiatedVersions []protocol.VersionNumber // the list of versions from the version negotiation packet
@ -33,9 +31,8 @@ type client struct {
tlsConf *tls.Config
config *Config
srcConnID protocol.ConnectionID
destConnID protocol.ConnectionID
origDestConnID protocol.ConnectionID // the destination conn ID used on the first Initial (before a Retry)
srcConnID protocol.ConnectionID
destConnID protocol.ConnectionID
initialPacketNumber protocol.PacketNumber
@ -262,7 +259,7 @@ func (c *client) dial(ctx context.Context) error {
// establishSecureConnection runs the session, and tries to establish a secure connection
// It returns:
// - errCloseSessionRecreating when the server sends a version negotiation packet, or a stateless retry is performed
// - errCloseForRecreating when the server sends a version negotiation packet
// - any other error that might occur
// - when the connection is forward-secure
func (c *client) establishSecureConnection(ctx context.Context) error {
@ -295,11 +292,6 @@ func (c *client) handlePacket(p *receivedPacket) {
return
}
if p.hdr.Type == protocol.PacketTypeRetry {
go c.handleRetryPacket(p.hdr)
return
}
// this is the first packet we are receiving
// since it is not a Version Negotiation Packet, this means the server supports the suggested version
if !c.versionNegotiated.Get() {
@ -345,32 +337,6 @@ func (c *client) handleVersionNegotiationPacket(hdr *wire.Header) {
c.initialPacketNumber = c.session.closeForRecreating()
}
func (c *client) handleRetryPacket(hdr *wire.Header) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.logger.Debugf("<- Received Retry")
(&wire.ExtendedHeader{Header: *hdr}).Log(c.logger)
if !hdr.OrigDestConnectionID.Equal(c.destConnID) {
c.logger.Debugf("Ignoring spoofed Retry. Original Destination Connection ID: %s, expected: %s", hdr.OrigDestConnectionID, c.destConnID)
return
}
if hdr.SrcConnectionID.Equal(c.destConnID) {
c.logger.Debugf("Ignoring Retry, since the server didn't change the Source Connection ID.")
return
}
// If a token is already set, this means that we already received a Retry from the server.
// Ignore this Retry packet.
if len(c.token) > 0 {
c.logger.Debugf("Ignoring Retry, since a Retry was already received.")
return
}
c.origDestConnID = c.destConnID
c.destConnID = hdr.SrcConnectionID
c.token = hdr.Token
c.initialPacketNumber = c.session.closeForRecreating()
}
func (c *client) createNewTLSSession(version protocol.VersionNumber) error {
params := &handshake.TransportParameters{
InitialMaxStreamDataBidiRemote: protocol.InitialMaxStreamData,
@ -394,8 +360,6 @@ func (c *client) createNewTLSSession(version protocol.VersionNumber) error {
sess, err := newClientSession(
c.conn,
runner,
c.token,
c.origDestConnID,
c.destConnID,
c.srcConnID,
c.config,

View file

@ -32,8 +32,6 @@ var _ = Describe("Client", func() {
originalClientSessConstructor func(
conn connection,
runner sessionRunner,
token []byte,
origDestConnID protocol.ConnectionID,
destConnID protocol.ConnectionID,
srcConnID protocol.ConnectionID,
conf *Config,
@ -138,8 +136,6 @@ var _ = Describe("Client", func() {
newClientSession = func(
conn connection,
_ sessionRunner,
_ []byte, // token
_ protocol.ConnectionID,
_ protocol.ConnectionID,
_ protocol.ConnectionID,
_ *Config,
@ -170,8 +166,6 @@ var _ = Describe("Client", func() {
newClientSession = func(
_ connection,
_ sessionRunner,
_ []byte, // token
_ protocol.ConnectionID,
_ protocol.ConnectionID,
_ protocol.ConnectionID,
_ *Config,
@ -201,8 +195,6 @@ var _ = Describe("Client", func() {
newClientSession = func(
_ connection,
runner sessionRunner,
_ []byte, // token
_ protocol.ConnectionID,
_ protocol.ConnectionID,
_ protocol.ConnectionID,
_ *Config,
@ -239,8 +231,6 @@ var _ = Describe("Client", func() {
newClientSession = func(
_ connection,
_ sessionRunner,
_ []byte, // token
_ protocol.ConnectionID,
_ protocol.ConnectionID,
_ protocol.ConnectionID,
_ *Config,
@ -280,8 +270,6 @@ var _ = Describe("Client", func() {
newClientSession = func(
_ connection,
_ sessionRunner,
_ []byte, // token
_ protocol.ConnectionID,
_ protocol.ConnectionID,
_ protocol.ConnectionID,
_ *Config,
@ -326,8 +314,6 @@ var _ = Describe("Client", func() {
newClientSession = func(
_ connection,
runnerP sessionRunner,
_ []byte, // token
_ protocol.ConnectionID,
_ protocol.ConnectionID,
_ protocol.ConnectionID,
_ *Config,
@ -372,8 +358,6 @@ var _ = Describe("Client", func() {
newClientSession = func(
connP connection,
_ sessionRunner,
_ []byte, // token
_ protocol.ConnectionID,
_ protocol.ConnectionID,
_ protocol.ConnectionID,
_ *Config,
@ -485,8 +469,6 @@ var _ = Describe("Client", func() {
newClientSession = func(
connP connection,
_ sessionRunner,
tokenP []byte,
_ protocol.ConnectionID,
_ protocol.ConnectionID,
_ protocol.ConnectionID,
configP *Config,
@ -514,132 +496,6 @@ var _ = Describe("Client", func() {
Expect(conf.Versions).To(Equal(config.Versions))
})
It("creates a new session when the server performs a retry", func() {
manager := NewMockPacketHandlerManager(mockCtrl)
manager.EXPECT().Add(gomock.Any(), gomock.Any()).Do(func(id protocol.ConnectionID, handler packetHandler) {
go handler.handlePacket(&receivedPacket{
hdr: &wire.Header{
IsLongHeader: true,
Type: protocol.PacketTypeRetry,
Version: cl.version,
Token: []byte("foobar"),
OrigDestConnectionID: connID,
DestConnectionID: id,
},
})
})
manager.EXPECT().Add(gomock.Any(), gomock.Any())
mockMultiplexer.EXPECT().AddConn(packetConn, gomock.Any()).Return(manager, nil)
config := &Config{Versions: []protocol.VersionNumber{protocol.VersionTLS}}
cl.config = config
run1 := make(chan error)
sess1 := NewMockQuicSession(mockCtrl)
sess1.EXPECT().run().DoAndReturn(func() error {
return <-run1
})
sess1.EXPECT().closeForRecreating().DoAndReturn(func() protocol.PacketNumber {
run1 <- errCloseForRecreating
return 42
})
sess2 := NewMockQuicSession(mockCtrl)
sess2.EXPECT().run()
sessions := make(chan quicSession, 2)
sessions <- sess1
sessions <- sess2
newClientSession = func(
conn connection,
_ sessionRunner,
_ []byte, // token
origDestConnID protocol.ConnectionID,
destConnID protocol.ConnectionID,
_ protocol.ConnectionID,
_ *Config,
_ *tls.Config,
initialPacketNumber protocol.PacketNumber,
_ *handshake.TransportParameters,
_ protocol.VersionNumber,
_ utils.Logger,
_ protocol.VersionNumber,
) (quicSession, error) {
switch len(sessions) {
case 2: // for the first session
Expect(initialPacketNumber).To(BeZero())
Expect(origDestConnID).To(BeNil())
Expect(destConnID).ToNot(BeNil())
case 1: // for the second session
Expect(initialPacketNumber).To(Equal(protocol.PacketNumber(42)))
Expect(origDestConnID).To(Equal(connID))
Expect(destConnID).ToNot(Equal(connID))
}
return <-sessions, nil
}
_, err := Dial(packetConn, addr, "localhost:1337", nil, config)
Expect(err).ToNot(HaveOccurred())
Expect(sessions).To(BeEmpty())
})
It("only accepts a single retry", func() {
manager := NewMockPacketHandlerManager(mockCtrl)
manager.EXPECT().Add(gomock.Any(), gomock.Any()).Do(func(id protocol.ConnectionID, handler packetHandler) {
go handler.handlePacket(&receivedPacket{
hdr: &wire.Header{
IsLongHeader: true,
Type: protocol.PacketTypeRetry,
SrcConnectionID: protocol.ConnectionID{1, 2, 3, 4},
DestConnectionID: id,
OrigDestConnectionID: connID,
Token: []byte("foobar"),
Version: cl.version,
},
})
}).AnyTimes()
manager.EXPECT().Add(gomock.Any(), gomock.Any()).AnyTimes()
mockMultiplexer.EXPECT().AddConn(packetConn, gomock.Any()).Return(manager, nil)
config := &Config{Versions: []protocol.VersionNumber{protocol.VersionTLS}}
cl.config = config
sessions := make(chan quicSession, 2)
run := make(chan error)
sess := NewMockQuicSession(mockCtrl)
sess.EXPECT().run().DoAndReturn(func() error {
defer GinkgoRecover()
var err error
Eventually(run).Should(Receive(&err))
return err
})
sess.EXPECT().closeForRecreating().Do(func() {
run <- errCloseForRecreating
})
sessions <- sess
doneErr := errors.New("nothing to do")
sess = NewMockQuicSession(mockCtrl)
sess.EXPECT().run().Return(doneErr)
sessions <- sess
newClientSession = func(
conn connection,
_ sessionRunner,
_ []byte, // token
_ protocol.ConnectionID,
_ protocol.ConnectionID,
_ protocol.ConnectionID,
_ *Config,
_ *tls.Config,
_ protocol.PacketNumber,
_ *handshake.TransportParameters,
_ protocol.VersionNumber,
_ utils.Logger,
_ protocol.VersionNumber,
) (quicSession, error) {
return <-sessions, nil
}
_, err := Dial(packetConn, addr, "localhost:1337", nil, config)
Expect(err).To(MatchError(doneErr))
Expect(sessions).To(BeEmpty())
})
Context("version negotiation", func() {
var origSupportedVersions []protocol.VersionNumber
@ -661,8 +517,6 @@ var _ = Describe("Client", func() {
newClientSession = func(
conn connection,
_ sessionRunner,
_ []byte, // token
_ protocol.ConnectionID,
_ protocol.ConnectionID,
_ protocol.ConnectionID,
_ *Config,

View file

@ -14,6 +14,7 @@ type SentPacketHandler interface {
SentPacketsAsRetransmission(packets []*Packet, retransmissionOf protocol.PacketNumber)
ReceivedAck(ackFrame *wire.AckFrame, withPacketNumber protocol.PacketNumber, encLevel protocol.EncryptionLevel, recvTime time.Time) error
SetHandshakeComplete()
ResetForRetry() error
// The SendMode determines if and what kind of packets can be sent.
SendMode() SendMode

View file

@ -578,3 +578,22 @@ func (h *sentPacketHandler) computePTOTimeout() time.Duration {
duration := utils.MaxDuration(h.rttStats.SmoothedOrInitialRTT()+4*h.rttStats.MeanDeviation(), granularity)
return duration << h.ptoCount
}
func (h *sentPacketHandler) ResetForRetry() error {
h.cryptoCount = 0
h.bytesInFlight = 0
var packets []*Packet
h.packetHistory.Iterate(func(p *Packet) (bool, error) {
if p.canBeRetransmitted {
packets = append(packets, p)
}
return true, nil
})
for _, p := range packets {
h.logger.Debugf("Queueing packet %#x for retransmission.", p.PacketNumber)
h.retransmissionQueue = append(h.retransmissionQueue, p)
}
h.packetHistory = newSentPacketHistory()
h.updateLossDetectionAlarm()
return nil
}

View file

@ -873,4 +873,28 @@ var _ = Describe("SentPacketHandler", func() {
Expect(handler.PopPacketNumber()).To(BeNumerically(">", 42))
})
})
Context("resetting for retry", func() {
It("queues outstanding packets for retransmission and cancels alarms", func() {
packet := &Packet{
PacketNumber: 42,
EncryptionLevel: protocol.EncryptionInitial,
Frames: []wire.Frame{&wire.CryptoFrame{Data: []byte("foobar")}},
Length: 100,
}
handler.SentPacket(packet)
Expect(handler.GetAlarmTimeout()).ToNot(BeZero())
Expect(handler.bytesInFlight).ToNot(BeZero())
Expect(handler.DequeuePacketForRetransmission()).To(BeNil())
Expect(handler.SendMode()).To(Equal(SendAny))
// now receive a Retry
Expect(handler.ResetForRetry()).To(Succeed())
Expect(handler.bytesInFlight).To(BeZero())
Expect(handler.GetAlarmTimeout()).To(BeZero())
Expect(handler.SendMode()).To(Equal(SendRetransmission))
p := handler.DequeuePacketForRetransmission()
Expect(p.PacketNumber).To(Equal(packet.PacketNumber))
Expect(p.Frames).To(Equal(packet.Frames))
})
})
})

View file

@ -199,6 +199,16 @@ func newCryptoSetup(
return cs, cs.clientHelloWrittenChan, nil
}
func (h *cryptoSetup) ChangeConnectionID(id protocol.ConnectionID) error {
initialSealer, initialOpener, err := NewInitialAEAD(id, h.perspective)
if err != nil {
return err
}
h.initialSealer = initialSealer
h.initialOpener = initialOpener
return nil
}
func (h *cryptoSetup) RunHandshake() error {
// Handle errors that might occur when HandleData() is called.
handshakeErrChan := make(chan error, 1)

View file

@ -32,6 +32,7 @@ type tlsExtensionHandler interface {
type CryptoSetup interface {
RunHandshake() error
io.Closer
ChangeConnectionID(protocol.ConnectionID) error
HandleMessage([]byte, protocol.EncryptionLevel) bool
ConnectionState() ConnectionState

View file

@ -135,6 +135,18 @@ func (mr *MockSentPacketHandlerMockRecorder) ReceivedAck(arg0, arg1, arg2, arg3
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReceivedAck", reflect.TypeOf((*MockSentPacketHandler)(nil).ReceivedAck), arg0, arg1, arg2, arg3)
}
// ResetForRetry mocks base method
func (m *MockSentPacketHandler) ResetForRetry() error {
ret := m.ctrl.Call(m, "ResetForRetry")
ret0, _ := ret[0].(error)
return ret0
}
// ResetForRetry indicates an expected call of ResetForRetry
func (mr *MockSentPacketHandlerMockRecorder) ResetForRetry() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResetForRetry", reflect.TypeOf((*MockSentPacketHandler)(nil).ResetForRetry))
}
// SendMode mocks base method
func (m *MockSentPacketHandler) SendMode() ackhandler.SendMode {
ret := m.ctrl.Call(m, "SendMode")

View file

@ -35,6 +35,18 @@ func (m *MockCryptoSetup) EXPECT() *MockCryptoSetupMockRecorder {
return m.recorder
}
// ChangeConnectionID mocks base method
func (m *MockCryptoSetup) ChangeConnectionID(arg0 protocol.ConnectionID) error {
ret := m.ctrl.Call(m, "ChangeConnectionID", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// ChangeConnectionID indicates an expected call of ChangeConnectionID
func (mr *MockCryptoSetupMockRecorder) ChangeConnectionID(arg0 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChangeConnectionID", reflect.TypeOf((*MockCryptoSetup)(nil).ChangeConnectionID), arg0)
}
// Close mocks base method
func (m *MockCryptoSetup) Close() error {
ret := m.ctrl.Call(m, "Close")

View file

@ -48,6 +48,7 @@ type streamManager interface {
type cryptoStreamHandler interface {
RunHandshake() error
ChangeConnectionID(protocol.ConnectionID) error
io.Closer
ConnectionState() handshake.ConnectionState
}
@ -120,6 +121,7 @@ type session struct {
handshakeCompleteChan chan struct{} // is closed when the handshake completes
handshakeComplete bool
receivedRetry bool
receivedFirstPacket bool
receivedFirstForwardSecurePacket bool
@ -221,8 +223,6 @@ var newSession = func(
var newClientSession = func(
conn connection,
runner sessionRunner,
token []byte,
origDestConnID protocol.ConnectionID,
destConnID protocol.ConnectionID,
srcConnID protocol.ConnectionID,
conf *Config,
@ -239,7 +239,6 @@ var newClientSession = func(
config: conf,
srcConnID: srcConnID,
destConnID: destConnID,
origDestConnID: origDestConnID,
perspective: protocol.PerspectiveClient,
handshakeCompleteChan: make(chan struct{}),
logger: logger,
@ -292,7 +291,6 @@ var newClientSession = func(
s.perspective,
s.version,
)
s.packer.SetToken(token)
return s, s.postSetup()
}
@ -491,6 +489,10 @@ func (s *session) handlePacketImpl(p *receivedPacket) bool /* was the packet suc
}
}()
if p.hdr.Type == protocol.PacketTypeRetry {
return s.handleRetryPacket(p)
}
// The server can change the source connection ID with the first Handshake packet.
// After this, all packets with a different source connection have to be ignored.
if s.receivedFirstPacket && p.hdr.IsLongHeader && !p.hdr.SrcConnectionID.Equal(s.destConnID) {
@ -529,6 +531,47 @@ func (s *session) handlePacketImpl(p *receivedPacket) bool /* was the packet suc
return true
}
func (s *session) handleRetryPacket(p *receivedPacket) bool /* was this a valid Retry */ {
if s.perspective == protocol.PerspectiveServer {
s.logger.Debugf("Ignoring Retry.")
return false
}
if s.receivedFirstPacket {
s.logger.Debugf("Ignoring Retry, since we already received a packet.")
return false
}
hdr := p.hdr
(&wire.ExtendedHeader{Header: *hdr}).Log(s.logger)
if !hdr.OrigDestConnectionID.Equal(s.destConnID) {
s.logger.Debugf("Ignoring spoofed Retry. Original Destination Connection ID: %s, expected: %s", hdr.OrigDestConnectionID, s.destConnID)
return false
}
if hdr.SrcConnectionID.Equal(s.destConnID) {
s.logger.Debugf("Ignoring Retry, since the server didn't change the Source Connection ID.")
return false
}
// If a token is already set, this means that we already received a Retry from the server.
// Ignore this Retry packet.
if s.receivedRetry {
s.logger.Debugf("Ignoring Retry, since a Retry was already received.")
return false
}
s.logger.Debugf("<- Received Retry")
s.logger.Debugf("Switching destination connection ID to: %s", hdr.SrcConnectionID)
s.origDestConnID = s.destConnID
s.destConnID = hdr.SrcConnectionID
s.receivedRetry = true
if err := s.sentPacketHandler.ResetForRetry(); err != nil {
s.closeLocal(err)
return false
}
s.cryptoStreamHandler.ChangeConnectionID(s.destConnID)
s.packer.SetToken(hdr.Token)
s.packer.ChangeDestConnectionID(s.destConnID)
s.scheduleSending()
return true
}
func (s *session) handleUnpackedPacket(packet *unpackedPacket, rcvTime time.Time) error {
if len(packet.data) == 0 {
return qerr.MissingPayload

View file

@ -4,7 +4,6 @@ import (
"bytes"
"context"
"errors"
"fmt"
"net"
"runtime/pprof"
"strings"
@ -355,6 +354,20 @@ var _ = Describe("Session", func() {
Expect(str).To(Equal(mstr))
})
It("drops Retry packets", func() {
hdr := wire.Header{
IsLongHeader: true,
Type: protocol.PacketTypeRetry,
}
buf := &bytes.Buffer{}
(&wire.ExtendedHeader{Header: hdr}).Write(buf, sess.version)
Expect(sess.handlePacketImpl(&receivedPacket{
hdr: &hdr,
data: buf.Bytes(),
buffer: getPacketBuffer(),
})).To(BeFalse())
})
Context("closing", func() {
var (
runErr error
@ -1431,10 +1444,8 @@ var _ = Describe("Client Session", func() {
sessP, err := newClientSession(
mconn,
sessionRunner,
[]byte("token"),
protocol.ConnectionID{8, 7, 6, 5, 4, 3, 2, 1},
protocol.ConnectionID{8, 7, 6, 5, 4, 3, 2, 1},
protocol.ConnectionID{8, 7, 6, 5, 4, 3, 2, 1},
protocol.ConnectionID{1, 2, 3, 4, 5, 6, 7, 8},
populateClientConfig(&Config{}, true),
nil, // tls.Config
42, // initial packet number
@ -1486,6 +1497,60 @@ var _ = Describe("Client Session", func() {
Eventually(sess.Context().Done()).Should(BeClosed())
})
Context("handling Retry", func() {
var validRetryHdr *wire.Header
BeforeEach(func() {
validRetryHdr = &wire.Header{
IsLongHeader: true,
Type: protocol.PacketTypeRetry,
SrcConnectionID: protocol.ConnectionID{0xde, 0xad, 0xbe, 0xef},
DestConnectionID: protocol.ConnectionID{1, 2, 3, 4, 5, 6, 7, 8},
OrigDestConnectionID: protocol.ConnectionID{8, 7, 6, 5, 4, 3, 2, 1},
Token: []byte("foobar"),
}
})
getPacket := func(hdr *wire.Header) *receivedPacket {
buf := &bytes.Buffer{}
(&wire.ExtendedHeader{Header: *hdr}).Write(buf, sess.version)
return &receivedPacket{
hdr: hdr,
data: buf.Bytes(),
buffer: getPacketBuffer(),
}
}
It("handles Retry packets", func() {
cryptoSetup.EXPECT().ChangeConnectionID(protocol.ConnectionID{0xde, 0xad, 0xbe, 0xef})
packer.EXPECT().SetToken([]byte("foobar"))
packer.EXPECT().ChangeDestConnectionID(protocol.ConnectionID{0xde, 0xad, 0xbe, 0xef})
Expect(sess.handlePacketImpl(getPacket(validRetryHdr))).To(BeTrue())
})
It("ignores Retry packets after receiving a regular packet", func() {
sess.receivedFirstPacket = true
Expect(sess.handlePacketImpl(getPacket(validRetryHdr))).To(BeFalse())
})
It("ignores Retry packets if the server didn't change the connection ID", func() {
validRetryHdr.SrcConnectionID = sess.destConnID
Expect(sess.handlePacketImpl(getPacket(validRetryHdr))).To(BeFalse())
})
It("ignores Retry packets with the wrong original destination connection ID", func() {
hdr := &wire.Header{
IsLongHeader: true,
Type: protocol.PacketTypeRetry,
SrcConnectionID: protocol.ConnectionID{0xde, 0xad, 0xbe, 0xef},
DestConnectionID: protocol.ConnectionID{1, 2, 3, 4, 5, 6, 7, 8},
OrigDestConnectionID: protocol.ConnectionID{1, 2, 3, 4},
Token: []byte("foobar"),
}
Expect(sess.handlePacketImpl(getPacket(hdr))).To(BeFalse())
})
})
Context("transport parameters", func() {
It("errors if it can't unmarshal the TransportParameters", func() {
go func() {
@ -1522,7 +1587,7 @@ var _ = Describe("Client Session", func() {
},
}
_, err := sess.processTransportParametersForClient(eetp.Marshal())
Expect(err).To(MatchError(fmt.Sprintf("expected original_connection_id to equal %s, is 0xdecafbad", sess.destConnID)))
Expect(err).To(MatchError("expected original_connection_id to equal (empty), is 0xdecafbad"))
})
It("errors if the TransportParameters contain an original_connection_id, although no Retry was performed", func() {