From c741b6fc092ed77a5cd3669bb248b208c32d62af Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Mon, 1 Mar 2021 11:23:19 +0800 Subject: [PATCH 1/5] drop STREAM and *_BLOCKED frames from queue when 0-RTT is rejected --- framer.go | 29 +++++++++++++++++++++++++++++ framer_test.go | 34 +++++++++++++++++++++++++++++++--- 2 files changed, 60 insertions(+), 3 deletions(-) diff --git a/framer.go b/framer.go index 4fbb9ef9..29d36b85 100644 --- a/framer.go +++ b/framer.go @@ -1,6 +1,7 @@ package quic import ( + "errors" "sync" "github.com/lucas-clemente/quic-go/internal/ackhandler" @@ -17,6 +18,8 @@ type framer interface { AddActiveStream(protocol.StreamID) AppendStreamFrames([]ackhandler.Frame, protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount) + + Handle0RTTRejection() error } type framerI struct { @@ -140,3 +143,29 @@ func (f *framerI) AppendStreamFrames(frames []ackhandler.Frame, maxLen protocol. } return frames, length } + +func (f *framerI) Handle0RTTRejection() error { + f.mutex.Lock() + defer f.mutex.Unlock() + + f.controlFrameMutex.Lock() + f.streamQueue = f.streamQueue[:0] + for id := range f.activeStreams { + delete(f.activeStreams, id) + } + var j int + for i, frame := range f.controlFrames { + switch frame.(type) { + case *wire.MaxDataFrame, *wire.MaxStreamDataFrame, *wire.MaxStreamsFrame: + return errors.New("didn't expect MAX_DATA / MAX_STREAM_DATA / MAX_STREAMS frame to be sent in 0-RTT") + case *wire.DataBlockedFrame, *wire.StreamDataBlockedFrame, *wire.StreamsBlockedFrame: + continue + default: + f.controlFrames[j] = f.controlFrames[i] + j++ + } + } + f.controlFrames = f.controlFrames[:j] + f.controlFrameMutex.Unlock() + return nil +} diff --git a/framer_test.go b/framer_test.go index 28590275..71f7b2aa 100644 --- a/framer_test.go +++ b/framer_test.go @@ -2,14 +2,14 @@ package quic import ( "bytes" + "math/rand" "github.com/lucas-clemente/quic-go/internal/ackhandler" - - "github.com/golang/mock/gomock" - "github.com/lucas-clemente/quic-go/internal/protocol" "github.com/lucas-clemente/quic-go/internal/wire" + "github.com/golang/mock/gomock" + . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" ) @@ -86,6 +86,26 @@ var _ = Describe("Framer", func() { Expect(frames).To(HaveLen(1)) Expect(length).To(Equal(bfLen)) }) + + It("drops *_BLOCKED frames when 0-RTT is rejected", func() { + ping := &wire.PingFrame{} + ncid := &wire.NewConnectionIDFrame{SequenceNumber: 10, ConnectionID: protocol.ConnectionID{0xde, 0xad, 0xbe, 0xef}} + frames := []wire.Frame{ + &wire.DataBlockedFrame{MaximumData: 1337}, + &wire.StreamDataBlockedFrame{StreamID: 42, MaximumStreamData: 1337}, + &wire.StreamsBlockedFrame{StreamLimit: 13}, + ping, + ncid, + } + rand.Shuffle(len(frames), func(i, j int) { frames[i], frames[j] = frames[j], frames[i] }) + for _, f := range frames { + framer.QueueControlFrame(f) + } + Expect(framer.Handle0RTTRejection()).To(Succeed()) + fs, length := framer.AppendControlFrames(nil, protocol.MaxByteCount) + Expect(fs).To(HaveLen(2)) + Expect(length).To(Equal(ping.Length(version) + ncid.Length(version))) + }) }) Context("popping STREAM frames", func() { @@ -353,5 +373,13 @@ var _ = Describe("Framer", func() { Expect(fs[0].Frame).To(Equal(f)) Expect(length).To(Equal(f.Length(version))) }) + + It("drops all STREAM frames when 0-RTT is rejected", func() { + framer.AddActiveStream(id1) + Expect(framer.Handle0RTTRejection()).To(Succeed()) + fs, length := framer.AppendStreamFrames(nil, protocol.MaxByteCount) + Expect(fs).To(BeEmpty()) + Expect(length).To(BeZero()) + }) }) }) From a04a0072fbe7e7835157e5fdac8c598fe780036f Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Mon, 1 Mar 2021 11:55:02 +0800 Subject: [PATCH 2/5] add a function to reset the connection flow controller --- .../flowcontrol/connection_flow_controller.go | 16 +++++++++++++ .../connection_flow_controller_test.go | 24 +++++++++++++++++++ internal/flowcontrol/interface.go | 1 + internal/mocks/connection_flow_controller.go | 14 +++++++++++ 4 files changed, 55 insertions(+) diff --git a/internal/flowcontrol/connection_flow_controller.go b/internal/flowcontrol/connection_flow_controller.go index 253d3156..3519817a 100644 --- a/internal/flowcontrol/connection_flow_controller.go +++ b/internal/flowcontrol/connection_flow_controller.go @@ -1,6 +1,7 @@ package flowcontrol import ( + "errors" "fmt" "time" @@ -86,3 +87,18 @@ func (c *connectionFlowController) EnsureMinimumWindowSize(inc protocol.ByteCoun } c.mutex.Unlock() } + +// The flow controller is reset when 0-RTT is rejected. +// All stream data is invalidated, it's if we had never opened a stream and never sent any data. +// At that point, we only have sent stream data, but we didn't have the keys to open 1-RTT keys yet. +func (c *connectionFlowController) Reset() error { + c.mutex.Lock() + defer c.mutex.Unlock() + + if c.bytesRead > 0 || c.highestReceived > 0 || !c.epochStartTime.IsZero() { + return errors.New("flow controller reset after reading data") + } + c.bytesSent = 0 + c.lastBlockedAt = 0 + return nil +} diff --git a/internal/flowcontrol/connection_flow_controller_test.go b/internal/flowcontrol/connection_flow_controller_test.go index 4dc1b692..e7782783 100644 --- a/internal/flowcontrol/connection_flow_controller_test.go +++ b/internal/flowcontrol/connection_flow_controller_test.go @@ -129,4 +129,28 @@ var _ = Describe("Connection Flow controller", func() { Expect(controller.epochStartTime).To(BeTemporally("~", time.Now(), 100*time.Millisecond)) }) }) + + Context("resetting", func() { + It("resets", func() { + const initialWindow protocol.ByteCount = 1337 + controller.UpdateSendWindow(initialWindow) + controller.AddBytesSent(1000) + Expect(controller.SendWindowSize()).To(Equal(initialWindow - 1000)) + Expect(controller.Reset()).To(Succeed()) + Expect(controller.SendWindowSize()).To(Equal(initialWindow)) + }) + + It("says if is blocked after resetting", func() { + const initialWindow protocol.ByteCount = 1337 + controller.UpdateSendWindow(initialWindow) + controller.AddBytesSent(initialWindow) + blocked, _ := controller.IsNewlyBlocked() + Expect(blocked).To(BeTrue()) + Expect(controller.Reset()).To(Succeed()) + controller.AddBytesSent(initialWindow) + blocked, blockedAt := controller.IsNewlyBlocked() + Expect(blocked).To(BeTrue()) + Expect(blockedAt).To(Equal(initialWindow)) + }) + }) }) diff --git a/internal/flowcontrol/interface.go b/internal/flowcontrol/interface.go index a47e7cd7..1eeaee9f 100644 --- a/internal/flowcontrol/interface.go +++ b/internal/flowcontrol/interface.go @@ -29,6 +29,7 @@ type StreamFlowController interface { // The ConnectionFlowController is the flow controller for the connection. type ConnectionFlowController interface { flowController + Reset() error } type connectionFlowControllerI interface { diff --git a/internal/mocks/connection_flow_controller.go b/internal/mocks/connection_flow_controller.go index 2fb5b0d8..ae5848a8 100644 --- a/internal/mocks/connection_flow_controller.go +++ b/internal/mocks/connection_flow_controller.go @@ -87,6 +87,20 @@ func (mr *MockConnectionFlowControllerMockRecorder) IsNewlyBlocked() *gomock.Cal return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsNewlyBlocked", reflect.TypeOf((*MockConnectionFlowController)(nil).IsNewlyBlocked)) } +// Reset mocks base method. +func (m *MockConnectionFlowController) Reset() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Reset") + ret0, _ := ret[0].(error) + return ret0 +} + +// Reset indicates an expected call of Reset. +func (mr *MockConnectionFlowControllerMockRecorder) Reset() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reset", reflect.TypeOf((*MockConnectionFlowController)(nil).Reset)) +} + // SendWindowSize mocks base method. func (m *MockConnectionFlowController) SendWindowSize() protocol.ByteCount { m.ctrl.T.Helper() From 8b630396649e6a7a1d5392225a33853a1b46a728 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Thu, 4 Mar 2021 11:57:39 +0800 Subject: [PATCH 3/5] add a function to reset the streams map after 0-RTT rejection --- interface.go | 8 ++ mock_stream_manager_test.go | 24 ++++++ session.go | 2 + streams_map.go | 153 +++++++++++++++++++++++++++--------- streams_map_test.go | 77 ++++++++++-------- 5 files changed, 194 insertions(+), 70 deletions(-) diff --git a/interface.go b/interface.go index 6b5a0176..84ae906c 100644 --- a/interface.go +++ b/interface.go @@ -2,6 +2,7 @@ package quic import ( "context" + "errors" "io" "net" "time" @@ -65,6 +66,13 @@ type TokenStore interface { // Valid values range between 0 and MAX_UINT62. type ErrorCode = protocol.ApplicationErrorCode +// Err0RTTRejected is the returned from: +// * Open{Uni}Stream{Sync} +// * Accept{Uni}Stream +// * Stream.Read and Stream.Write +// when the server rejects a 0-RTT connection attempt. +var Err0RTTRejected = errors.New("0-RTT rejected") + // Stream is the interface implemented by QUIC streams type Stream interface { ReceiveStream diff --git a/mock_stream_manager_test.go b/mock_stream_manager_test.go index 44226e02..92c31da9 100644 --- a/mock_stream_manager_test.go +++ b/mock_stream_manager_test.go @@ -194,6 +194,18 @@ func (mr *MockStreamManagerMockRecorder) OpenUniStreamSync(arg0 interface{}) *go return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OpenUniStreamSync", reflect.TypeOf((*MockStreamManager)(nil).OpenUniStreamSync), arg0) } +// ResetFor0RTT mocks base method. +func (m *MockStreamManager) ResetFor0RTT() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "ResetFor0RTT") +} + +// ResetFor0RTT indicates an expected call of ResetFor0RTT. +func (mr *MockStreamManagerMockRecorder) ResetFor0RTT() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResetFor0RTT", reflect.TypeOf((*MockStreamManager)(nil).ResetFor0RTT)) +} + // UpdateLimits mocks base method. func (m *MockStreamManager) UpdateLimits(arg0 *wire.TransportParameters) { m.ctrl.T.Helper() @@ -205,3 +217,15 @@ func (mr *MockStreamManagerMockRecorder) UpdateLimits(arg0 interface{}) *gomock. mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateLimits", reflect.TypeOf((*MockStreamManager)(nil).UpdateLimits), arg0) } + +// UseResetMaps mocks base method. +func (m *MockStreamManager) UseResetMaps() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "UseResetMaps") +} + +// UseResetMaps indicates an expected call of UseResetMaps. +func (mr *MockStreamManagerMockRecorder) UseResetMaps() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UseResetMaps", reflect.TypeOf((*MockStreamManager)(nil).UseResetMaps)) +} diff --git a/session.go b/session.go index 7146aa2a..395eadfb 100644 --- a/session.go +++ b/session.go @@ -45,6 +45,8 @@ type streamManager interface { UpdateLimits(*wire.TransportParameters) HandleMaxStreamsFrame(*wire.MaxStreamsFrame) CloseWithError(error) + ResetFor0RTT() + UseResetMaps() } type cryptoStreamHandler interface { diff --git a/streams_map.go b/streams_map.go index de4d850b..61a5398e 100644 --- a/streams_map.go +++ b/streams_map.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "net" + "sync" "github.com/lucas-clemente/quic-go/internal/flowcontrol" "github.com/lucas-clemente/quic-go/internal/protocol" @@ -45,14 +46,20 @@ var errTooManyOpenStreams = errors.New("too many open streams") type streamsMap struct { perspective protocol.Perspective + version protocol.VersionNumber + + maxIncomingBidiStreams uint64 + maxIncomingUniStreams uint64 sender streamSender newFlowController func(protocol.StreamID) flowcontrol.StreamFlowController + mutex sync.Mutex outgoingBidiStreams *outgoingBidiStreamsMap outgoingUniStreams *outgoingUniStreamsMap incomingBidiStreams *incomingBidiStreamsMap incomingUniStreams *incomingUniStreamsMap + reset bool } var _ streamManager = &streamsMap{} @@ -66,70 +73,119 @@ func newStreamsMap( version protocol.VersionNumber, ) streamManager { m := &streamsMap{ - perspective: perspective, - newFlowController: newFlowController, - sender: sender, + perspective: perspective, + newFlowController: newFlowController, + maxIncomingBidiStreams: maxIncomingBidiStreams, + maxIncomingUniStreams: maxIncomingUniStreams, + sender: sender, + version: version, } - m.outgoingBidiStreams = newOutgoingBidiStreamsMap( - func(num protocol.StreamNum) streamI { - id := num.StreamID(protocol.StreamTypeBidi, perspective) - return newStream(id, m.sender, m.newFlowController(id), version) - }, - sender.queueControlFrame, - ) - m.incomingBidiStreams = newIncomingBidiStreamsMap( - func(num protocol.StreamNum) streamI { - id := num.StreamID(protocol.StreamTypeBidi, perspective.Opposite()) - return newStream(id, m.sender, m.newFlowController(id), version) - }, - maxIncomingBidiStreams, - sender.queueControlFrame, - ) - m.outgoingUniStreams = newOutgoingUniStreamsMap( - func(num protocol.StreamNum) sendStreamI { - id := num.StreamID(protocol.StreamTypeUni, perspective) - return newSendStream(id, m.sender, m.newFlowController(id), version) - }, - sender.queueControlFrame, - ) - m.incomingUniStreams = newIncomingUniStreamsMap( - func(num protocol.StreamNum) receiveStreamI { - id := num.StreamID(protocol.StreamTypeUni, perspective.Opposite()) - return newReceiveStream(id, m.sender, m.newFlowController(id), version) - }, - maxIncomingUniStreams, - sender.queueControlFrame, - ) + m.initMaps() return m } +func (m *streamsMap) initMaps() { + m.outgoingBidiStreams = newOutgoingBidiStreamsMap( + func(num protocol.StreamNum) streamI { + id := num.StreamID(protocol.StreamTypeBidi, m.perspective) + return newStream(id, m.sender, m.newFlowController(id), m.version) + }, + m.sender.queueControlFrame, + ) + m.incomingBidiStreams = newIncomingBidiStreamsMap( + func(num protocol.StreamNum) streamI { + id := num.StreamID(protocol.StreamTypeBidi, m.perspective.Opposite()) + return newStream(id, m.sender, m.newFlowController(id), m.version) + }, + m.maxIncomingBidiStreams, + m.sender.queueControlFrame, + ) + m.outgoingUniStreams = newOutgoingUniStreamsMap( + func(num protocol.StreamNum) sendStreamI { + id := num.StreamID(protocol.StreamTypeUni, m.perspective) + return newSendStream(id, m.sender, m.newFlowController(id), m.version) + }, + m.sender.queueControlFrame, + ) + m.incomingUniStreams = newIncomingUniStreamsMap( + func(num protocol.StreamNum) receiveStreamI { + id := num.StreamID(protocol.StreamTypeUni, m.perspective.Opposite()) + return newReceiveStream(id, m.sender, m.newFlowController(id), m.version) + }, + m.maxIncomingUniStreams, + m.sender.queueControlFrame, + ) +} + func (m *streamsMap) OpenStream() (Stream, error) { - str, err := m.outgoingBidiStreams.OpenStream() + m.mutex.Lock() + reset := m.reset + mm := m.outgoingBidiStreams + m.mutex.Unlock() + if reset { + return nil, Err0RTTRejected + } + str, err := mm.OpenStream() return str, convertStreamError(err, protocol.StreamTypeBidi, m.perspective) } func (m *streamsMap) OpenStreamSync(ctx context.Context) (Stream, error) { - str, err := m.outgoingBidiStreams.OpenStreamSync(ctx) + m.mutex.Lock() + reset := m.reset + mm := m.outgoingBidiStreams + m.mutex.Unlock() + if reset { + return nil, Err0RTTRejected + } + str, err := mm.OpenStreamSync(ctx) return str, convertStreamError(err, protocol.StreamTypeBidi, m.perspective) } func (m *streamsMap) OpenUniStream() (SendStream, error) { - str, err := m.outgoingUniStreams.OpenStream() + m.mutex.Lock() + reset := m.reset + mm := m.outgoingUniStreams + m.mutex.Unlock() + if reset { + return nil, Err0RTTRejected + } + str, err := mm.OpenStream() return str, convertStreamError(err, protocol.StreamTypeBidi, m.perspective) } func (m *streamsMap) OpenUniStreamSync(ctx context.Context) (SendStream, error) { - str, err := m.outgoingUniStreams.OpenStreamSync(ctx) + m.mutex.Lock() + reset := m.reset + mm := m.outgoingUniStreams + m.mutex.Unlock() + if reset { + return nil, Err0RTTRejected + } + str, err := mm.OpenStreamSync(ctx) return str, convertStreamError(err, protocol.StreamTypeUni, m.perspective) } func (m *streamsMap) AcceptStream(ctx context.Context) (Stream, error) { - str, err := m.incomingBidiStreams.AcceptStream(ctx) + m.mutex.Lock() + reset := m.reset + mm := m.incomingBidiStreams + m.mutex.Unlock() + if reset { + return nil, Err0RTTRejected + } + str, err := mm.AcceptStream(ctx) return str, convertStreamError(err, protocol.StreamTypeBidi, m.perspective.Opposite()) } func (m *streamsMap) AcceptUniStream(ctx context.Context) (ReceiveStream, error) { - str, err := m.incomingUniStreams.AcceptStream(ctx) + m.mutex.Lock() + reset := m.reset + mm := m.incomingUniStreams + m.mutex.Unlock() + if reset { + return nil, Err0RTTRejected + } + str, err := mm.AcceptStream(ctx) return str, convertStreamError(err, protocol.StreamTypeUni, m.perspective.Opposite()) } @@ -232,3 +288,22 @@ func (m *streamsMap) CloseWithError(err error) { m.incomingBidiStreams.CloseWithError(err) m.incomingUniStreams.CloseWithError(err) } + +// ResetFor0RTT resets is used when 0-RTT is rejected. In that case, the streams maps are +// 1. closed with an Err0RTTRejected, making calls to Open{Uni}Stream{Sync} / Accept{Uni}Stream return that error. +// 2. reset to their initial state, such that we can immediately process new incoming stream data. +// Afterwards, calls to Open{Uni}Stream{Sync} / Accept{Uni}Stream will continue to return the error, +// until UseResetMaps() has been called. +func (m *streamsMap) ResetFor0RTT() { + m.mutex.Lock() + defer m.mutex.Unlock() + m.reset = true + m.CloseWithError(Err0RTTRejected) + m.initMaps() +} + +func (m *streamsMap) UseResetMaps() { + m.mutex.Lock() + m.reset = false + m.mutex.Unlock() +} diff --git a/streams_map_test.go b/streams_map_test.go index 0ba630b5..81e4b128 100644 --- a/streams_map_test.go +++ b/streams_map_test.go @@ -320,39 +320,32 @@ var _ = Describe("Streams Map", func() { }) }) - Context("updating stream ID limits", func() { - for _, p := range []protocol.Perspective{protocol.PerspectiveClient, protocol.PerspectiveServer} { - pers := p + It("processes the parameter for outgoing streams", func() { + mockSender.EXPECT().queueControlFrame(gomock.Any()) + _, err := m.OpenStream() + expectTooManyStreamsError(err) + m.UpdateLimits(&wire.TransportParameters{ + MaxBidiStreamNum: 5, + MaxUniStreamNum: 8, + }) - It(fmt.Sprintf("processes the parameter for outgoing streams, as a %s", pers), func() { - mockSender.EXPECT().queueControlFrame(gomock.Any()) - m.perspective = pers - _, err := m.OpenStream() - expectTooManyStreamsError(err) - m.UpdateLimits(&wire.TransportParameters{ - MaxBidiStreamNum: 5, - MaxUniStreamNum: 8, - }) - - mockSender.EXPECT().queueControlFrame(gomock.Any()).Times(2) - // test we can only 5 bidirectional streams - for i := 0; i < 5; i++ { - str, err := m.OpenStream() - Expect(err).ToNot(HaveOccurred()) - Expect(str.StreamID()).To(Equal(ids.firstOutgoingBidiStream + protocol.StreamID(4*i))) - } - _, err = m.OpenStream() - expectTooManyStreamsError(err) - // test we can only 8 unidirectional streams - for i := 0; i < 8; i++ { - str, err := m.OpenUniStream() - Expect(err).ToNot(HaveOccurred()) - Expect(str.StreamID()).To(Equal(ids.firstOutgoingUniStream + protocol.StreamID(4*i))) - } - _, err = m.OpenUniStream() - expectTooManyStreamsError(err) - }) + mockSender.EXPECT().queueControlFrame(gomock.Any()).Times(2) + // test we can only 5 bidirectional streams + for i := 0; i < 5; i++ { + str, err := m.OpenStream() + Expect(err).ToNot(HaveOccurred()) + Expect(str.StreamID()).To(Equal(ids.firstOutgoingBidiStream + protocol.StreamID(4*i))) } + _, err = m.OpenStream() + expectTooManyStreamsError(err) + // test we can only 8 unidirectional streams + for i := 0; i < 8; i++ { + str, err := m.OpenUniStream() + Expect(err).ToNot(HaveOccurred()) + Expect(str.StreamID()).To(Equal(ids.firstOutgoingUniStream + protocol.StreamID(4*i))) + } + _, err = m.OpenUniStream() + expectTooManyStreamsError(err) }) Context("handling MAX_STREAMS frames", func() { @@ -431,6 +424,28 @@ var _ = Describe("Streams Map", func() { Expect(err).To(HaveOccurred()) Expect(err.Error()).To(Equal(testErr.Error())) }) + + if perspective == protocol.PerspectiveClient { + It("resets for 0-RTT", func() { + mockSender.EXPECT().queueControlFrame(gomock.Any()).AnyTimes() + m.ResetFor0RTT() + // make sure that calls to open / accept streams fail + _, err := m.OpenStream() + Expect(err).To(MatchError(Err0RTTRejected)) + _, err = m.AcceptStream(context.Background()) + Expect(err).To(MatchError(Err0RTTRejected)) + // make sure that we can still get new streams, as the server might be sending us data + str, err := m.GetOrOpenReceiveStream(3) + Expect(err).ToNot(HaveOccurred()) + Expect(str).ToNot(BeNil()) + + // now switch to using the new streams map + m.UseResetMaps() + _, err = m.OpenStream() + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("too many open streams")) + }) + } }) } }) From 97ab0144790f69fa5074f6de2620f66aaafb01ce Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Mon, 1 Mar 2021 10:46:53 +0800 Subject: [PATCH 4/5] don't retransmit 0-RTT packets when 0-RTT is rejected --- internal/ackhandler/sent_packet_handler.go | 9 ++++----- internal/ackhandler/sent_packet_handler_test.go | 5 ++--- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/internal/ackhandler/sent_packet_handler.go b/internal/ackhandler/sent_packet_handler.go index cfec0c1e..91ff05a3 100644 --- a/internal/ackhandler/sent_packet_handler.go +++ b/internal/ackhandler/sent_packet_handler.go @@ -169,15 +169,14 @@ func (h *sentPacketHandler) dropPackets(encLevel protocol.EncryptionLevel) { case protocol.EncryptionHandshake: h.handshakePackets = nil case protocol.Encryption0RTT: - // TODO(#2067): invalidate sent data + // This function is only called when 0-RTT is rejected, + // and not when the client drops 0-RTT keys when the handshake completes. + // When 0-RTT is rejected, all application data sent so far becomes invalid. + // Delete the packets from the history and remove them from bytes_in_flight. h.appDataPackets.history.Iterate(func(p *Packet) (bool, error) { - if p.skippedPacket { - return true, nil - } if p.EncryptionLevel != protocol.Encryption0RTT { return false, nil } - h.queueFramesForRetransmission(p) h.removeFromBytesInFlight(p) h.appDataPackets.history.Remove(p.PacketNumber) return true, nil diff --git a/internal/ackhandler/sent_packet_handler_test.go b/internal/ackhandler/sent_packet_handler_test.go index db093584..380be3c8 100644 --- a/internal/ackhandler/sent_packet_handler_test.go +++ b/internal/ackhandler/sent_packet_handler_test.go @@ -1078,8 +1078,7 @@ var _ = Describe("SentPacketHandler", func() { Expect(handler.handshakePackets).To(BeNil()) }) - // TODO(#2067): invalidate 0-RTT data when 0-RTT is rejected - It("retransmits 0-RTT packets when 0-RTT keys are dropped", func() { + It("doesn't retransmit 0-RTT packets when 0-RTT keys are dropped", func() { for i := protocol.PacketNumber(0); i < 6; i++ { if i == 3 { continue @@ -1094,7 +1093,7 @@ var _ = Describe("SentPacketHandler", func() { } Expect(handler.bytesInFlight).To(Equal(protocol.ByteCount(11))) handler.DropPackets(protocol.Encryption0RTT) - Expect(lostPackets).To(Equal([]protocol.PacketNumber{0, 1, 2, 4, 5})) + Expect(lostPackets).To(BeEmpty()) Expect(handler.bytesInFlight).To(Equal(protocol.ByteCount(6))) }) From f8313d868fcb80d77a8edc7db67bf16e6a95df33 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Thu, 4 Mar 2021 18:14:25 +0800 Subject: [PATCH 5/5] return an Err0RTTRejected when the server rejects a 0-RTT connection --- integrationtests/self/zero_rtt_test.go | 133 ++++++++++++++++++++++++- interface.go | 2 + internal/mocks/quic/early_session.go | 14 +++ mock_quic_session_test.go | 14 +++ session.go | 15 +++ 5 files changed, 174 insertions(+), 4 deletions(-) diff --git a/integrationtests/self/zero_rtt_test.go b/integrationtests/self/zero_rtt_test.go index 3ab95efb..904ea907 100644 --- a/integrationtests/self/zero_rtt_test.go +++ b/integrationtests/self/zero_rtt_test.go @@ -21,11 +21,22 @@ import ( ) var _ = Describe("0-RTT", func() { - const rtt = 50 * time.Millisecond + rtt := scaleDuration(5 * time.Millisecond) + for _, v := range protocol.SupportedVersions { version := v Context(fmt.Sprintf("with QUIC version %s", version), func() { + runDelayProxy := func(serverPort int) *quicproxy.QuicProxy { + proxy, err := quicproxy.NewQuicProxy("localhost:0", &quicproxy.Opts{ + RemoteAddr: fmt.Sprintf("localhost:%d", serverPort), + DelayPacket: func(_ quicproxy.Direction, data []byte) time.Duration { return rtt / 2 }, + }) + Expect(err).ToNot(HaveOccurred()) + + return proxy + } + runCountingProxy := func(serverPort int) (*quicproxy.QuicProxy, *uint32) { var num0RTTPackets uint32 // to be used as an atomic proxy, err := quicproxy.NewQuicProxy("localhost:0", &quicproxy.Opts{ @@ -105,6 +116,34 @@ var _ = Describe("0-RTT", func() { Eventually(done).Should(BeClosed()) } + check0RTTRejected := func( + ln quic.EarlyListener, + proxyPort int, + clientConf *tls.Config, + ) { + sess, err := quic.DialAddrEarly( + fmt.Sprintf("localhost:%d", proxyPort), + clientConf, + getQuicConfig(&quic.Config{Versions: []protocol.VersionNumber{version}}), + ) + Expect(err).ToNot(HaveOccurred()) + str, err := sess.OpenUniStream() + Expect(err).ToNot(HaveOccurred()) + _, err = str.Write(make([]byte, 3000)) + Expect(err).ToNot(HaveOccurred()) + Expect(str.Close()).To(Succeed()) + Expect(sess.ConnectionState().TLS.Used0RTT).To(BeFalse()) + + // make sure the server doesn't process the data + ctx, cancel := context.WithTimeout(context.Background(), scaleDuration(50*time.Millisecond)) + defer cancel() + serverSess, err := ln.Accept(ctx) + Expect(err).ToNot(HaveOccurred()) + Expect(serverSess.ConnectionState().TLS.Used0RTT).To(BeFalse()) + _, err = serverSess.AcceptUniStream(ctx) + Expect(err).To(Equal(context.DeadlineExceeded)) + } + It("transfers 0-RTT data", func() { ln, err := quic.ListenAddrEarly( "localhost:0", @@ -354,7 +393,7 @@ var _ = Describe("0-RTT", func() { Expect(err).ToNot(HaveOccurred()) proxy, num0RTTPackets := runCountingProxy(ln.Addr().(*net.UDPAddr).Port) defer proxy.Close() - transfer0RTTData(ln, proxy.LocalPort(), clientConf, PRData, false) + check0RTTRejected(ln, proxy.LocalPort(), clientConf) // The client should send 0-RTT packets, but the server doesn't process them. num0RTT := atomic.LoadUint32(num0RTTPackets) @@ -374,7 +413,9 @@ var _ = Describe("0-RTT", func() { ) Expect(err).ToNot(HaveOccurred()) - clientConf := dialAndReceiveSessionTicket(ln, ln.Addr().(*net.UDPAddr).Port) + delayProxy := runDelayProxy(ln.Addr().(*net.UDPAddr).Port) + defer delayProxy.Close() + clientConf := dialAndReceiveSessionTicket(ln, delayProxy.LocalPort()) // now close the listener and dial new connection with a different ALPN Expect(ln.Close()).To(Succeed()) @@ -391,7 +432,91 @@ var _ = Describe("0-RTT", func() { Expect(err).ToNot(HaveOccurred()) proxy, num0RTTPackets := runCountingProxy(ln.Addr().(*net.UDPAddr).Port) defer proxy.Close() - transfer0RTTData(ln, proxy.LocalPort(), clientConf, PRData, false) + + check0RTTRejected(ln, proxy.LocalPort(), clientConf) + + // The client should send 0-RTT packets, but the server doesn't process them. + num0RTT := atomic.LoadUint32(num0RTTPackets) + fmt.Fprintf(GinkgoWriter, "Sent %d 0-RTT packets.", num0RTT) + Expect(num0RTT).ToNot(BeZero()) + }) + + It("correctly deals with 0-RTT rejections", func() { + tlsConf := getTLSConfig() + ln, err := quic.ListenAddrEarly( + "localhost:0", + tlsConf, + getQuicConfig(&quic.Config{ + Versions: []protocol.VersionNumber{version}, + MaxIncomingUniStreams: 2, + AcceptToken: func(_ net.Addr, _ *quic.Token) bool { return true }, + }), + ) + Expect(err).ToNot(HaveOccurred()) + + delayProxy := runDelayProxy(ln.Addr().(*net.UDPAddr).Port) + defer delayProxy.Close() + clientConf := dialAndReceiveSessionTicket(ln, delayProxy.LocalPort()) + // now close the listener and dial new connection with different transport parameters + Expect(ln.Close()).To(Succeed()) + ln, err = quic.ListenAddrEarly( + "localhost:0", + tlsConf, + getQuicConfig(&quic.Config{ + Versions: []protocol.VersionNumber{version}, + MaxIncomingUniStreams: 1, + }), + ) + Expect(err).ToNot(HaveOccurred()) + proxy, num0RTTPackets := runCountingProxy(ln.Addr().(*net.UDPAddr).Port) + defer proxy.Close() + + done := make(chan struct{}) + go func() { + defer GinkgoRecover() + defer close(done) + sess, err := ln.Accept(context.Background()) + Expect(err).ToNot(HaveOccurred()) + str, err := sess.AcceptUniStream(context.Background()) + Expect(err).ToNot(HaveOccurred()) + data, err := ioutil.ReadAll(str) + Expect(err).ToNot(HaveOccurred()) + Expect(string(data)).To(Equal("second flight")) + }() + + sess, err := quic.DialAddrEarly( + fmt.Sprintf("localhost:%d", proxy.LocalPort()), + clientConf, + getQuicConfig(&quic.Config{Versions: []protocol.VersionNumber{version}}), + ) + Expect(err).ToNot(HaveOccurred()) + // The client remembers that it was allowed to open 2 uni-directional streams. + for i := 0; i < 2; i++ { + str, err := sess.OpenUniStream() + Expect(err).ToNot(HaveOccurred()) + go func() { + defer GinkgoRecover() + _, err = str.Write([]byte("first flight")) + Expect(err).ToNot(HaveOccurred()) + }() + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + _, err = sess.AcceptStream(ctx) + Expect(err).To(Equal(quic.Err0RTTRejected)) + + newSess := sess.NextSession() + str, err := newSess.OpenUniStream() + Expect(err).ToNot(HaveOccurred()) + _, err = newSess.OpenUniStream() + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("too many open streams")) + _, err = str.Write([]byte("second flight")) + Expect(err).ToNot(HaveOccurred()) + Expect(str.Close()).To(Succeed()) + + Eventually(done).Should(BeClosed()) // The client should send 0-RTT packets, but the server doesn't process them. num0RTT := atomic.LoadUint32(num0RTTPackets) diff --git a/interface.go b/interface.go index 84ae906c..b6df74ff 100644 --- a/interface.go +++ b/interface.go @@ -215,6 +215,8 @@ type EarlySession interface { // Data sent before completion of the handshake is encrypted with 1-RTT keys. // Note that the client's identity hasn't been verified yet. HandshakeComplete() context.Context + + NextSession() Session } // Config contains all configuration data needed for a QUIC server or client. diff --git a/internal/mocks/quic/early_session.go b/internal/mocks/quic/early_session.go index 3d5e5519..5d24a0b4 100644 --- a/internal/mocks/quic/early_session.go +++ b/internal/mocks/quic/early_session.go @@ -137,6 +137,20 @@ func (mr *MockEarlySessionMockRecorder) LocalAddr() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LocalAddr", reflect.TypeOf((*MockEarlySession)(nil).LocalAddr)) } +// NextSession mocks base method. +func (m *MockEarlySession) NextSession() quic.Session { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NextSession") + ret0, _ := ret[0].(quic.Session) + return ret0 +} + +// NextSession indicates an expected call of NextSession. +func (mr *MockEarlySessionMockRecorder) NextSession() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NextSession", reflect.TypeOf((*MockEarlySession)(nil).NextSession)) +} + // OpenStream mocks base method. func (m *MockEarlySession) OpenStream() (quic.Stream, error) { m.ctrl.T.Helper() diff --git a/mock_quic_session_test.go b/mock_quic_session_test.go index b36a2627..e91d0c3b 100644 --- a/mock_quic_session_test.go +++ b/mock_quic_session_test.go @@ -150,6 +150,20 @@ func (mr *MockQuicSessionMockRecorder) LocalAddr() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LocalAddr", reflect.TypeOf((*MockQuicSession)(nil).LocalAddr)) } +// NextSession mocks base method. +func (m *MockQuicSession) NextSession() Session { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NextSession") + ret0, _ := ret[0].(Session) + return ret0 +} + +// NextSession indicates an expected call of NextSession. +func (mr *MockQuicSessionMockRecorder) NextSession() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NextSession", reflect.TypeOf((*MockQuicSession)(nil).NextSession)) +} + // OpenStream mocks base method. func (m *MockQuicSession) OpenStream() (Stream, error) { m.ctrl.T.Helper() diff --git a/session.go b/session.go index 395eadfb..aedb16ab 100644 --- a/session.go +++ b/session.go @@ -1458,6 +1458,15 @@ func (s *session) dropEncryptionLevel(encLevel protocol.EncryptionLevel) { if s.tracer != nil { s.tracer.DroppedEncryptionLevel(encLevel) } + if encLevel == protocol.Encryption0RTT { + s.streamsMap.ResetFor0RTT() + if err := s.connFlowController.Reset(); err != nil { + s.closeLocal(err) + } + if err := s.framer.Handle0RTTRejection(); err != nil { + s.closeLocal(err) + } + } } // is called for the client, when restoring transport parameters saved for 0-RTT @@ -1884,3 +1893,9 @@ func (s *session) getPerspective() protocol.Perspective { func (s *session) GetVersion() protocol.VersionNumber { return s.version } + +func (s *session) NextSession() Session { + <-s.HandshakeComplete().Done() + s.streamsMap.UseResetMaps() + return s +}