mirror of
https://github.com/refraction-networking/uquic.git
synced 2025-04-04 20:57:36 +03:00
Merge pull request #3066 from lucas-clemente/zero-rtt-rejection
require the application to handle 0-RTT rejection
This commit is contained in:
commit
b2c2e4940f
16 changed files with 489 additions and 85 deletions
29
framer.go
29
framer.go
|
@ -1,6 +1,7 @@
|
||||||
package quic
|
package quic
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/lucas-clemente/quic-go/internal/ackhandler"
|
"github.com/lucas-clemente/quic-go/internal/ackhandler"
|
||||||
|
@ -17,6 +18,8 @@ type framer interface {
|
||||||
|
|
||||||
AddActiveStream(protocol.StreamID)
|
AddActiveStream(protocol.StreamID)
|
||||||
AppendStreamFrames([]ackhandler.Frame, protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount)
|
AppendStreamFrames([]ackhandler.Frame, protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount)
|
||||||
|
|
||||||
|
Handle0RTTRejection() error
|
||||||
}
|
}
|
||||||
|
|
||||||
type framerI struct {
|
type framerI struct {
|
||||||
|
@ -140,3 +143,29 @@ func (f *framerI) AppendStreamFrames(frames []ackhandler.Frame, maxLen protocol.
|
||||||
}
|
}
|
||||||
return frames, length
|
return frames, length
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *framerI) Handle0RTTRejection() error {
|
||||||
|
f.mutex.Lock()
|
||||||
|
defer f.mutex.Unlock()
|
||||||
|
|
||||||
|
f.controlFrameMutex.Lock()
|
||||||
|
f.streamQueue = f.streamQueue[:0]
|
||||||
|
for id := range f.activeStreams {
|
||||||
|
delete(f.activeStreams, id)
|
||||||
|
}
|
||||||
|
var j int
|
||||||
|
for i, frame := range f.controlFrames {
|
||||||
|
switch frame.(type) {
|
||||||
|
case *wire.MaxDataFrame, *wire.MaxStreamDataFrame, *wire.MaxStreamsFrame:
|
||||||
|
return errors.New("didn't expect MAX_DATA / MAX_STREAM_DATA / MAX_STREAMS frame to be sent in 0-RTT")
|
||||||
|
case *wire.DataBlockedFrame, *wire.StreamDataBlockedFrame, *wire.StreamsBlockedFrame:
|
||||||
|
continue
|
||||||
|
default:
|
||||||
|
f.controlFrames[j] = f.controlFrames[i]
|
||||||
|
j++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
f.controlFrames = f.controlFrames[:j]
|
||||||
|
f.controlFrameMutex.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -2,14 +2,14 @@ package quic
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"math/rand"
|
||||||
|
|
||||||
"github.com/lucas-clemente/quic-go/internal/ackhandler"
|
"github.com/lucas-clemente/quic-go/internal/ackhandler"
|
||||||
|
|
||||||
"github.com/golang/mock/gomock"
|
|
||||||
|
|
||||||
"github.com/lucas-clemente/quic-go/internal/protocol"
|
"github.com/lucas-clemente/quic-go/internal/protocol"
|
||||||
"github.com/lucas-clemente/quic-go/internal/wire"
|
"github.com/lucas-clemente/quic-go/internal/wire"
|
||||||
|
|
||||||
|
"github.com/golang/mock/gomock"
|
||||||
|
|
||||||
. "github.com/onsi/ginkgo"
|
. "github.com/onsi/ginkgo"
|
||||||
. "github.com/onsi/gomega"
|
. "github.com/onsi/gomega"
|
||||||
)
|
)
|
||||||
|
@ -86,6 +86,26 @@ var _ = Describe("Framer", func() {
|
||||||
Expect(frames).To(HaveLen(1))
|
Expect(frames).To(HaveLen(1))
|
||||||
Expect(length).To(Equal(bfLen))
|
Expect(length).To(Equal(bfLen))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("drops *_BLOCKED frames when 0-RTT is rejected", func() {
|
||||||
|
ping := &wire.PingFrame{}
|
||||||
|
ncid := &wire.NewConnectionIDFrame{SequenceNumber: 10, ConnectionID: protocol.ConnectionID{0xde, 0xad, 0xbe, 0xef}}
|
||||||
|
frames := []wire.Frame{
|
||||||
|
&wire.DataBlockedFrame{MaximumData: 1337},
|
||||||
|
&wire.StreamDataBlockedFrame{StreamID: 42, MaximumStreamData: 1337},
|
||||||
|
&wire.StreamsBlockedFrame{StreamLimit: 13},
|
||||||
|
ping,
|
||||||
|
ncid,
|
||||||
|
}
|
||||||
|
rand.Shuffle(len(frames), func(i, j int) { frames[i], frames[j] = frames[j], frames[i] })
|
||||||
|
for _, f := range frames {
|
||||||
|
framer.QueueControlFrame(f)
|
||||||
|
}
|
||||||
|
Expect(framer.Handle0RTTRejection()).To(Succeed())
|
||||||
|
fs, length := framer.AppendControlFrames(nil, protocol.MaxByteCount)
|
||||||
|
Expect(fs).To(HaveLen(2))
|
||||||
|
Expect(length).To(Equal(ping.Length(version) + ncid.Length(version)))
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
Context("popping STREAM frames", func() {
|
Context("popping STREAM frames", func() {
|
||||||
|
@ -353,5 +373,13 @@ var _ = Describe("Framer", func() {
|
||||||
Expect(fs[0].Frame).To(Equal(f))
|
Expect(fs[0].Frame).To(Equal(f))
|
||||||
Expect(length).To(Equal(f.Length(version)))
|
Expect(length).To(Equal(f.Length(version)))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("drops all STREAM frames when 0-RTT is rejected", func() {
|
||||||
|
framer.AddActiveStream(id1)
|
||||||
|
Expect(framer.Handle0RTTRejection()).To(Succeed())
|
||||||
|
fs, length := framer.AppendStreamFrames(nil, protocol.MaxByteCount)
|
||||||
|
Expect(fs).To(BeEmpty())
|
||||||
|
Expect(length).To(BeZero())
|
||||||
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
|
@ -21,11 +21,22 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
var _ = Describe("0-RTT", func() {
|
var _ = Describe("0-RTT", func() {
|
||||||
const rtt = 50 * time.Millisecond
|
rtt := scaleDuration(5 * time.Millisecond)
|
||||||
|
|
||||||
for _, v := range protocol.SupportedVersions {
|
for _, v := range protocol.SupportedVersions {
|
||||||
version := v
|
version := v
|
||||||
|
|
||||||
Context(fmt.Sprintf("with QUIC version %s", version), func() {
|
Context(fmt.Sprintf("with QUIC version %s", version), func() {
|
||||||
|
runDelayProxy := func(serverPort int) *quicproxy.QuicProxy {
|
||||||
|
proxy, err := quicproxy.NewQuicProxy("localhost:0", &quicproxy.Opts{
|
||||||
|
RemoteAddr: fmt.Sprintf("localhost:%d", serverPort),
|
||||||
|
DelayPacket: func(_ quicproxy.Direction, data []byte) time.Duration { return rtt / 2 },
|
||||||
|
})
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
|
return proxy
|
||||||
|
}
|
||||||
|
|
||||||
runCountingProxy := func(serverPort int) (*quicproxy.QuicProxy, *uint32) {
|
runCountingProxy := func(serverPort int) (*quicproxy.QuicProxy, *uint32) {
|
||||||
var num0RTTPackets uint32 // to be used as an atomic
|
var num0RTTPackets uint32 // to be used as an atomic
|
||||||
proxy, err := quicproxy.NewQuicProxy("localhost:0", &quicproxy.Opts{
|
proxy, err := quicproxy.NewQuicProxy("localhost:0", &quicproxy.Opts{
|
||||||
|
@ -105,6 +116,34 @@ var _ = Describe("0-RTT", func() {
|
||||||
Eventually(done).Should(BeClosed())
|
Eventually(done).Should(BeClosed())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
check0RTTRejected := func(
|
||||||
|
ln quic.EarlyListener,
|
||||||
|
proxyPort int,
|
||||||
|
clientConf *tls.Config,
|
||||||
|
) {
|
||||||
|
sess, err := quic.DialAddrEarly(
|
||||||
|
fmt.Sprintf("localhost:%d", proxyPort),
|
||||||
|
clientConf,
|
||||||
|
getQuicConfig(&quic.Config{Versions: []protocol.VersionNumber{version}}),
|
||||||
|
)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
str, err := sess.OpenUniStream()
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
_, err = str.Write(make([]byte, 3000))
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(str.Close()).To(Succeed())
|
||||||
|
Expect(sess.ConnectionState().TLS.Used0RTT).To(BeFalse())
|
||||||
|
|
||||||
|
// make sure the server doesn't process the data
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), scaleDuration(50*time.Millisecond))
|
||||||
|
defer cancel()
|
||||||
|
serverSess, err := ln.Accept(ctx)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(serverSess.ConnectionState().TLS.Used0RTT).To(BeFalse())
|
||||||
|
_, err = serverSess.AcceptUniStream(ctx)
|
||||||
|
Expect(err).To(Equal(context.DeadlineExceeded))
|
||||||
|
}
|
||||||
|
|
||||||
It("transfers 0-RTT data", func() {
|
It("transfers 0-RTT data", func() {
|
||||||
ln, err := quic.ListenAddrEarly(
|
ln, err := quic.ListenAddrEarly(
|
||||||
"localhost:0",
|
"localhost:0",
|
||||||
|
@ -354,7 +393,7 @@ var _ = Describe("0-RTT", func() {
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
proxy, num0RTTPackets := runCountingProxy(ln.Addr().(*net.UDPAddr).Port)
|
proxy, num0RTTPackets := runCountingProxy(ln.Addr().(*net.UDPAddr).Port)
|
||||||
defer proxy.Close()
|
defer proxy.Close()
|
||||||
transfer0RTTData(ln, proxy.LocalPort(), clientConf, PRData, false)
|
check0RTTRejected(ln, proxy.LocalPort(), clientConf)
|
||||||
|
|
||||||
// The client should send 0-RTT packets, but the server doesn't process them.
|
// The client should send 0-RTT packets, but the server doesn't process them.
|
||||||
num0RTT := atomic.LoadUint32(num0RTTPackets)
|
num0RTT := atomic.LoadUint32(num0RTTPackets)
|
||||||
|
@ -374,7 +413,9 @@ var _ = Describe("0-RTT", func() {
|
||||||
)
|
)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
clientConf := dialAndReceiveSessionTicket(ln, ln.Addr().(*net.UDPAddr).Port)
|
delayProxy := runDelayProxy(ln.Addr().(*net.UDPAddr).Port)
|
||||||
|
defer delayProxy.Close()
|
||||||
|
clientConf := dialAndReceiveSessionTicket(ln, delayProxy.LocalPort())
|
||||||
|
|
||||||
// now close the listener and dial new connection with a different ALPN
|
// now close the listener and dial new connection with a different ALPN
|
||||||
Expect(ln.Close()).To(Succeed())
|
Expect(ln.Close()).To(Succeed())
|
||||||
|
@ -391,7 +432,91 @@ var _ = Describe("0-RTT", func() {
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
proxy, num0RTTPackets := runCountingProxy(ln.Addr().(*net.UDPAddr).Port)
|
proxy, num0RTTPackets := runCountingProxy(ln.Addr().(*net.UDPAddr).Port)
|
||||||
defer proxy.Close()
|
defer proxy.Close()
|
||||||
transfer0RTTData(ln, proxy.LocalPort(), clientConf, PRData, false)
|
|
||||||
|
check0RTTRejected(ln, proxy.LocalPort(), clientConf)
|
||||||
|
|
||||||
|
// The client should send 0-RTT packets, but the server doesn't process them.
|
||||||
|
num0RTT := atomic.LoadUint32(num0RTTPackets)
|
||||||
|
fmt.Fprintf(GinkgoWriter, "Sent %d 0-RTT packets.", num0RTT)
|
||||||
|
Expect(num0RTT).ToNot(BeZero())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("correctly deals with 0-RTT rejections", func() {
|
||||||
|
tlsConf := getTLSConfig()
|
||||||
|
ln, err := quic.ListenAddrEarly(
|
||||||
|
"localhost:0",
|
||||||
|
tlsConf,
|
||||||
|
getQuicConfig(&quic.Config{
|
||||||
|
Versions: []protocol.VersionNumber{version},
|
||||||
|
MaxIncomingUniStreams: 2,
|
||||||
|
AcceptToken: func(_ net.Addr, _ *quic.Token) bool { return true },
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
|
delayProxy := runDelayProxy(ln.Addr().(*net.UDPAddr).Port)
|
||||||
|
defer delayProxy.Close()
|
||||||
|
clientConf := dialAndReceiveSessionTicket(ln, delayProxy.LocalPort())
|
||||||
|
// now close the listener and dial new connection with different transport parameters
|
||||||
|
Expect(ln.Close()).To(Succeed())
|
||||||
|
ln, err = quic.ListenAddrEarly(
|
||||||
|
"localhost:0",
|
||||||
|
tlsConf,
|
||||||
|
getQuicConfig(&quic.Config{
|
||||||
|
Versions: []protocol.VersionNumber{version},
|
||||||
|
MaxIncomingUniStreams: 1,
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
proxy, num0RTTPackets := runCountingProxy(ln.Addr().(*net.UDPAddr).Port)
|
||||||
|
defer proxy.Close()
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer GinkgoRecover()
|
||||||
|
defer close(done)
|
||||||
|
sess, err := ln.Accept(context.Background())
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
str, err := sess.AcceptUniStream(context.Background())
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
data, err := ioutil.ReadAll(str)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(string(data)).To(Equal("second flight"))
|
||||||
|
}()
|
||||||
|
|
||||||
|
sess, err := quic.DialAddrEarly(
|
||||||
|
fmt.Sprintf("localhost:%d", proxy.LocalPort()),
|
||||||
|
clientConf,
|
||||||
|
getQuicConfig(&quic.Config{Versions: []protocol.VersionNumber{version}}),
|
||||||
|
)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
// The client remembers that it was allowed to open 2 uni-directional streams.
|
||||||
|
for i := 0; i < 2; i++ {
|
||||||
|
str, err := sess.OpenUniStream()
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
go func() {
|
||||||
|
defer GinkgoRecover()
|
||||||
|
_, err = str.Write([]byte("first flight"))
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||||
|
defer cancel()
|
||||||
|
_, err = sess.AcceptStream(ctx)
|
||||||
|
Expect(err).To(Equal(quic.Err0RTTRejected))
|
||||||
|
|
||||||
|
newSess := sess.NextSession()
|
||||||
|
str, err := newSess.OpenUniStream()
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
_, err = newSess.OpenUniStream()
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
Expect(err.Error()).To(ContainSubstring("too many open streams"))
|
||||||
|
_, err = str.Write([]byte("second flight"))
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(str.Close()).To(Succeed())
|
||||||
|
|
||||||
|
Eventually(done).Should(BeClosed())
|
||||||
|
|
||||||
// The client should send 0-RTT packets, but the server doesn't process them.
|
// The client should send 0-RTT packets, but the server doesn't process them.
|
||||||
num0RTT := atomic.LoadUint32(num0RTTPackets)
|
num0RTT := atomic.LoadUint32(num0RTTPackets)
|
||||||
|
|
10
interface.go
10
interface.go
|
@ -2,6 +2,7 @@ package quic
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
"time"
|
"time"
|
||||||
|
@ -65,6 +66,13 @@ type TokenStore interface {
|
||||||
// Valid values range between 0 and MAX_UINT62.
|
// Valid values range between 0 and MAX_UINT62.
|
||||||
type ErrorCode = protocol.ApplicationErrorCode
|
type ErrorCode = protocol.ApplicationErrorCode
|
||||||
|
|
||||||
|
// Err0RTTRejected is the returned from:
|
||||||
|
// * Open{Uni}Stream{Sync}
|
||||||
|
// * Accept{Uni}Stream
|
||||||
|
// * Stream.Read and Stream.Write
|
||||||
|
// when the server rejects a 0-RTT connection attempt.
|
||||||
|
var Err0RTTRejected = errors.New("0-RTT rejected")
|
||||||
|
|
||||||
// Stream is the interface implemented by QUIC streams
|
// Stream is the interface implemented by QUIC streams
|
||||||
type Stream interface {
|
type Stream interface {
|
||||||
ReceiveStream
|
ReceiveStream
|
||||||
|
@ -207,6 +215,8 @@ type EarlySession interface {
|
||||||
// Data sent before completion of the handshake is encrypted with 1-RTT keys.
|
// Data sent before completion of the handshake is encrypted with 1-RTT keys.
|
||||||
// Note that the client's identity hasn't been verified yet.
|
// Note that the client's identity hasn't been verified yet.
|
||||||
HandshakeComplete() context.Context
|
HandshakeComplete() context.Context
|
||||||
|
|
||||||
|
NextSession() Session
|
||||||
}
|
}
|
||||||
|
|
||||||
// Config contains all configuration data needed for a QUIC server or client.
|
// Config contains all configuration data needed for a QUIC server or client.
|
||||||
|
|
|
@ -169,15 +169,14 @@ func (h *sentPacketHandler) dropPackets(encLevel protocol.EncryptionLevel) {
|
||||||
case protocol.EncryptionHandshake:
|
case protocol.EncryptionHandshake:
|
||||||
h.handshakePackets = nil
|
h.handshakePackets = nil
|
||||||
case protocol.Encryption0RTT:
|
case protocol.Encryption0RTT:
|
||||||
// TODO(#2067): invalidate sent data
|
// This function is only called when 0-RTT is rejected,
|
||||||
|
// and not when the client drops 0-RTT keys when the handshake completes.
|
||||||
|
// When 0-RTT is rejected, all application data sent so far becomes invalid.
|
||||||
|
// Delete the packets from the history and remove them from bytes_in_flight.
|
||||||
h.appDataPackets.history.Iterate(func(p *Packet) (bool, error) {
|
h.appDataPackets.history.Iterate(func(p *Packet) (bool, error) {
|
||||||
if p.skippedPacket {
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
if p.EncryptionLevel != protocol.Encryption0RTT {
|
if p.EncryptionLevel != protocol.Encryption0RTT {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
h.queueFramesForRetransmission(p)
|
|
||||||
h.removeFromBytesInFlight(p)
|
h.removeFromBytesInFlight(p)
|
||||||
h.appDataPackets.history.Remove(p.PacketNumber)
|
h.appDataPackets.history.Remove(p.PacketNumber)
|
||||||
return true, nil
|
return true, nil
|
||||||
|
|
|
@ -1078,8 +1078,7 @@ var _ = Describe("SentPacketHandler", func() {
|
||||||
Expect(handler.handshakePackets).To(BeNil())
|
Expect(handler.handshakePackets).To(BeNil())
|
||||||
})
|
})
|
||||||
|
|
||||||
// TODO(#2067): invalidate 0-RTT data when 0-RTT is rejected
|
It("doesn't retransmit 0-RTT packets when 0-RTT keys are dropped", func() {
|
||||||
It("retransmits 0-RTT packets when 0-RTT keys are dropped", func() {
|
|
||||||
for i := protocol.PacketNumber(0); i < 6; i++ {
|
for i := protocol.PacketNumber(0); i < 6; i++ {
|
||||||
if i == 3 {
|
if i == 3 {
|
||||||
continue
|
continue
|
||||||
|
@ -1094,7 +1093,7 @@ var _ = Describe("SentPacketHandler", func() {
|
||||||
}
|
}
|
||||||
Expect(handler.bytesInFlight).To(Equal(protocol.ByteCount(11)))
|
Expect(handler.bytesInFlight).To(Equal(protocol.ByteCount(11)))
|
||||||
handler.DropPackets(protocol.Encryption0RTT)
|
handler.DropPackets(protocol.Encryption0RTT)
|
||||||
Expect(lostPackets).To(Equal([]protocol.PacketNumber{0, 1, 2, 4, 5}))
|
Expect(lostPackets).To(BeEmpty())
|
||||||
Expect(handler.bytesInFlight).To(Equal(protocol.ByteCount(6)))
|
Expect(handler.bytesInFlight).To(Equal(protocol.ByteCount(6)))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package flowcontrol
|
package flowcontrol
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -86,3 +87,18 @@ func (c *connectionFlowController) EnsureMinimumWindowSize(inc protocol.ByteCoun
|
||||||
}
|
}
|
||||||
c.mutex.Unlock()
|
c.mutex.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The flow controller is reset when 0-RTT is rejected.
|
||||||
|
// All stream data is invalidated, it's if we had never opened a stream and never sent any data.
|
||||||
|
// At that point, we only have sent stream data, but we didn't have the keys to open 1-RTT keys yet.
|
||||||
|
func (c *connectionFlowController) Reset() error {
|
||||||
|
c.mutex.Lock()
|
||||||
|
defer c.mutex.Unlock()
|
||||||
|
|
||||||
|
if c.bytesRead > 0 || c.highestReceived > 0 || !c.epochStartTime.IsZero() {
|
||||||
|
return errors.New("flow controller reset after reading data")
|
||||||
|
}
|
||||||
|
c.bytesSent = 0
|
||||||
|
c.lastBlockedAt = 0
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -129,4 +129,28 @@ var _ = Describe("Connection Flow controller", func() {
|
||||||
Expect(controller.epochStartTime).To(BeTemporally("~", time.Now(), 100*time.Millisecond))
|
Expect(controller.epochStartTime).To(BeTemporally("~", time.Now(), 100*time.Millisecond))
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
Context("resetting", func() {
|
||||||
|
It("resets", func() {
|
||||||
|
const initialWindow protocol.ByteCount = 1337
|
||||||
|
controller.UpdateSendWindow(initialWindow)
|
||||||
|
controller.AddBytesSent(1000)
|
||||||
|
Expect(controller.SendWindowSize()).To(Equal(initialWindow - 1000))
|
||||||
|
Expect(controller.Reset()).To(Succeed())
|
||||||
|
Expect(controller.SendWindowSize()).To(Equal(initialWindow))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("says if is blocked after resetting", func() {
|
||||||
|
const initialWindow protocol.ByteCount = 1337
|
||||||
|
controller.UpdateSendWindow(initialWindow)
|
||||||
|
controller.AddBytesSent(initialWindow)
|
||||||
|
blocked, _ := controller.IsNewlyBlocked()
|
||||||
|
Expect(blocked).To(BeTrue())
|
||||||
|
Expect(controller.Reset()).To(Succeed())
|
||||||
|
controller.AddBytesSent(initialWindow)
|
||||||
|
blocked, blockedAt := controller.IsNewlyBlocked()
|
||||||
|
Expect(blocked).To(BeTrue())
|
||||||
|
Expect(blockedAt).To(Equal(initialWindow))
|
||||||
|
})
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
|
@ -29,6 +29,7 @@ type StreamFlowController interface {
|
||||||
// The ConnectionFlowController is the flow controller for the connection.
|
// The ConnectionFlowController is the flow controller for the connection.
|
||||||
type ConnectionFlowController interface {
|
type ConnectionFlowController interface {
|
||||||
flowController
|
flowController
|
||||||
|
Reset() error
|
||||||
}
|
}
|
||||||
|
|
||||||
type connectionFlowControllerI interface {
|
type connectionFlowControllerI interface {
|
||||||
|
|
|
@ -87,6 +87,20 @@ func (mr *MockConnectionFlowControllerMockRecorder) IsNewlyBlocked() *gomock.Cal
|
||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsNewlyBlocked", reflect.TypeOf((*MockConnectionFlowController)(nil).IsNewlyBlocked))
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsNewlyBlocked", reflect.TypeOf((*MockConnectionFlowController)(nil).IsNewlyBlocked))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Reset mocks base method.
|
||||||
|
func (m *MockConnectionFlowController) Reset() error {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "Reset")
|
||||||
|
ret0, _ := ret[0].(error)
|
||||||
|
return ret0
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset indicates an expected call of Reset.
|
||||||
|
func (mr *MockConnectionFlowControllerMockRecorder) Reset() *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reset", reflect.TypeOf((*MockConnectionFlowController)(nil).Reset))
|
||||||
|
}
|
||||||
|
|
||||||
// SendWindowSize mocks base method.
|
// SendWindowSize mocks base method.
|
||||||
func (m *MockConnectionFlowController) SendWindowSize() protocol.ByteCount {
|
func (m *MockConnectionFlowController) SendWindowSize() protocol.ByteCount {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
|
|
|
@ -137,6 +137,20 @@ func (mr *MockEarlySessionMockRecorder) LocalAddr() *gomock.Call {
|
||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LocalAddr", reflect.TypeOf((*MockEarlySession)(nil).LocalAddr))
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LocalAddr", reflect.TypeOf((*MockEarlySession)(nil).LocalAddr))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NextSession mocks base method.
|
||||||
|
func (m *MockEarlySession) NextSession() quic.Session {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "NextSession")
|
||||||
|
ret0, _ := ret[0].(quic.Session)
|
||||||
|
return ret0
|
||||||
|
}
|
||||||
|
|
||||||
|
// NextSession indicates an expected call of NextSession.
|
||||||
|
func (mr *MockEarlySessionMockRecorder) NextSession() *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NextSession", reflect.TypeOf((*MockEarlySession)(nil).NextSession))
|
||||||
|
}
|
||||||
|
|
||||||
// OpenStream mocks base method.
|
// OpenStream mocks base method.
|
||||||
func (m *MockEarlySession) OpenStream() (quic.Stream, error) {
|
func (m *MockEarlySession) OpenStream() (quic.Stream, error) {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
|
|
|
@ -150,6 +150,20 @@ func (mr *MockQuicSessionMockRecorder) LocalAddr() *gomock.Call {
|
||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LocalAddr", reflect.TypeOf((*MockQuicSession)(nil).LocalAddr))
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LocalAddr", reflect.TypeOf((*MockQuicSession)(nil).LocalAddr))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NextSession mocks base method.
|
||||||
|
func (m *MockQuicSession) NextSession() Session {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "NextSession")
|
||||||
|
ret0, _ := ret[0].(Session)
|
||||||
|
return ret0
|
||||||
|
}
|
||||||
|
|
||||||
|
// NextSession indicates an expected call of NextSession.
|
||||||
|
func (mr *MockQuicSessionMockRecorder) NextSession() *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NextSession", reflect.TypeOf((*MockQuicSession)(nil).NextSession))
|
||||||
|
}
|
||||||
|
|
||||||
// OpenStream mocks base method.
|
// OpenStream mocks base method.
|
||||||
func (m *MockQuicSession) OpenStream() (Stream, error) {
|
func (m *MockQuicSession) OpenStream() (Stream, error) {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
|
|
|
@ -194,6 +194,18 @@ func (mr *MockStreamManagerMockRecorder) OpenUniStreamSync(arg0 interface{}) *go
|
||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OpenUniStreamSync", reflect.TypeOf((*MockStreamManager)(nil).OpenUniStreamSync), arg0)
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OpenUniStreamSync", reflect.TypeOf((*MockStreamManager)(nil).OpenUniStreamSync), arg0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ResetFor0RTT mocks base method.
|
||||||
|
func (m *MockStreamManager) ResetFor0RTT() {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
m.ctrl.Call(m, "ResetFor0RTT")
|
||||||
|
}
|
||||||
|
|
||||||
|
// ResetFor0RTT indicates an expected call of ResetFor0RTT.
|
||||||
|
func (mr *MockStreamManagerMockRecorder) ResetFor0RTT() *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResetFor0RTT", reflect.TypeOf((*MockStreamManager)(nil).ResetFor0RTT))
|
||||||
|
}
|
||||||
|
|
||||||
// UpdateLimits mocks base method.
|
// UpdateLimits mocks base method.
|
||||||
func (m *MockStreamManager) UpdateLimits(arg0 *wire.TransportParameters) {
|
func (m *MockStreamManager) UpdateLimits(arg0 *wire.TransportParameters) {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
|
@ -205,3 +217,15 @@ func (mr *MockStreamManagerMockRecorder) UpdateLimits(arg0 interface{}) *gomock.
|
||||||
mr.mock.ctrl.T.Helper()
|
mr.mock.ctrl.T.Helper()
|
||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateLimits", reflect.TypeOf((*MockStreamManager)(nil).UpdateLimits), arg0)
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateLimits", reflect.TypeOf((*MockStreamManager)(nil).UpdateLimits), arg0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UseResetMaps mocks base method.
|
||||||
|
func (m *MockStreamManager) UseResetMaps() {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
m.ctrl.Call(m, "UseResetMaps")
|
||||||
|
}
|
||||||
|
|
||||||
|
// UseResetMaps indicates an expected call of UseResetMaps.
|
||||||
|
func (mr *MockStreamManagerMockRecorder) UseResetMaps() *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UseResetMaps", reflect.TypeOf((*MockStreamManager)(nil).UseResetMaps))
|
||||||
|
}
|
||||||
|
|
17
session.go
17
session.go
|
@ -45,6 +45,8 @@ type streamManager interface {
|
||||||
UpdateLimits(*wire.TransportParameters)
|
UpdateLimits(*wire.TransportParameters)
|
||||||
HandleMaxStreamsFrame(*wire.MaxStreamsFrame)
|
HandleMaxStreamsFrame(*wire.MaxStreamsFrame)
|
||||||
CloseWithError(error)
|
CloseWithError(error)
|
||||||
|
ResetFor0RTT()
|
||||||
|
UseResetMaps()
|
||||||
}
|
}
|
||||||
|
|
||||||
type cryptoStreamHandler interface {
|
type cryptoStreamHandler interface {
|
||||||
|
@ -1456,6 +1458,15 @@ func (s *session) dropEncryptionLevel(encLevel protocol.EncryptionLevel) {
|
||||||
if s.tracer != nil {
|
if s.tracer != nil {
|
||||||
s.tracer.DroppedEncryptionLevel(encLevel)
|
s.tracer.DroppedEncryptionLevel(encLevel)
|
||||||
}
|
}
|
||||||
|
if encLevel == protocol.Encryption0RTT {
|
||||||
|
s.streamsMap.ResetFor0RTT()
|
||||||
|
if err := s.connFlowController.Reset(); err != nil {
|
||||||
|
s.closeLocal(err)
|
||||||
|
}
|
||||||
|
if err := s.framer.Handle0RTTRejection(); err != nil {
|
||||||
|
s.closeLocal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// is called for the client, when restoring transport parameters saved for 0-RTT
|
// is called for the client, when restoring transport parameters saved for 0-RTT
|
||||||
|
@ -1882,3 +1893,9 @@ func (s *session) getPerspective() protocol.Perspective {
|
||||||
func (s *session) GetVersion() protocol.VersionNumber {
|
func (s *session) GetVersion() protocol.VersionNumber {
|
||||||
return s.version
|
return s.version
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *session) NextSession() Session {
|
||||||
|
<-s.HandshakeComplete().Done()
|
||||||
|
s.streamsMap.UseResetMaps()
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
153
streams_map.go
153
streams_map.go
|
@ -5,6 +5,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/lucas-clemente/quic-go/internal/flowcontrol"
|
"github.com/lucas-clemente/quic-go/internal/flowcontrol"
|
||||||
"github.com/lucas-clemente/quic-go/internal/protocol"
|
"github.com/lucas-clemente/quic-go/internal/protocol"
|
||||||
|
@ -45,14 +46,20 @@ var errTooManyOpenStreams = errors.New("too many open streams")
|
||||||
|
|
||||||
type streamsMap struct {
|
type streamsMap struct {
|
||||||
perspective protocol.Perspective
|
perspective protocol.Perspective
|
||||||
|
version protocol.VersionNumber
|
||||||
|
|
||||||
|
maxIncomingBidiStreams uint64
|
||||||
|
maxIncomingUniStreams uint64
|
||||||
|
|
||||||
sender streamSender
|
sender streamSender
|
||||||
newFlowController func(protocol.StreamID) flowcontrol.StreamFlowController
|
newFlowController func(protocol.StreamID) flowcontrol.StreamFlowController
|
||||||
|
|
||||||
|
mutex sync.Mutex
|
||||||
outgoingBidiStreams *outgoingBidiStreamsMap
|
outgoingBidiStreams *outgoingBidiStreamsMap
|
||||||
outgoingUniStreams *outgoingUniStreamsMap
|
outgoingUniStreams *outgoingUniStreamsMap
|
||||||
incomingBidiStreams *incomingBidiStreamsMap
|
incomingBidiStreams *incomingBidiStreamsMap
|
||||||
incomingUniStreams *incomingUniStreamsMap
|
incomingUniStreams *incomingUniStreamsMap
|
||||||
|
reset bool
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ streamManager = &streamsMap{}
|
var _ streamManager = &streamsMap{}
|
||||||
|
@ -66,70 +73,119 @@ func newStreamsMap(
|
||||||
version protocol.VersionNumber,
|
version protocol.VersionNumber,
|
||||||
) streamManager {
|
) streamManager {
|
||||||
m := &streamsMap{
|
m := &streamsMap{
|
||||||
perspective: perspective,
|
perspective: perspective,
|
||||||
newFlowController: newFlowController,
|
newFlowController: newFlowController,
|
||||||
sender: sender,
|
maxIncomingBidiStreams: maxIncomingBidiStreams,
|
||||||
|
maxIncomingUniStreams: maxIncomingUniStreams,
|
||||||
|
sender: sender,
|
||||||
|
version: version,
|
||||||
}
|
}
|
||||||
m.outgoingBidiStreams = newOutgoingBidiStreamsMap(
|
m.initMaps()
|
||||||
func(num protocol.StreamNum) streamI {
|
|
||||||
id := num.StreamID(protocol.StreamTypeBidi, perspective)
|
|
||||||
return newStream(id, m.sender, m.newFlowController(id), version)
|
|
||||||
},
|
|
||||||
sender.queueControlFrame,
|
|
||||||
)
|
|
||||||
m.incomingBidiStreams = newIncomingBidiStreamsMap(
|
|
||||||
func(num protocol.StreamNum) streamI {
|
|
||||||
id := num.StreamID(protocol.StreamTypeBidi, perspective.Opposite())
|
|
||||||
return newStream(id, m.sender, m.newFlowController(id), version)
|
|
||||||
},
|
|
||||||
maxIncomingBidiStreams,
|
|
||||||
sender.queueControlFrame,
|
|
||||||
)
|
|
||||||
m.outgoingUniStreams = newOutgoingUniStreamsMap(
|
|
||||||
func(num protocol.StreamNum) sendStreamI {
|
|
||||||
id := num.StreamID(protocol.StreamTypeUni, perspective)
|
|
||||||
return newSendStream(id, m.sender, m.newFlowController(id), version)
|
|
||||||
},
|
|
||||||
sender.queueControlFrame,
|
|
||||||
)
|
|
||||||
m.incomingUniStreams = newIncomingUniStreamsMap(
|
|
||||||
func(num protocol.StreamNum) receiveStreamI {
|
|
||||||
id := num.StreamID(protocol.StreamTypeUni, perspective.Opposite())
|
|
||||||
return newReceiveStream(id, m.sender, m.newFlowController(id), version)
|
|
||||||
},
|
|
||||||
maxIncomingUniStreams,
|
|
||||||
sender.queueControlFrame,
|
|
||||||
)
|
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *streamsMap) initMaps() {
|
||||||
|
m.outgoingBidiStreams = newOutgoingBidiStreamsMap(
|
||||||
|
func(num protocol.StreamNum) streamI {
|
||||||
|
id := num.StreamID(protocol.StreamTypeBidi, m.perspective)
|
||||||
|
return newStream(id, m.sender, m.newFlowController(id), m.version)
|
||||||
|
},
|
||||||
|
m.sender.queueControlFrame,
|
||||||
|
)
|
||||||
|
m.incomingBidiStreams = newIncomingBidiStreamsMap(
|
||||||
|
func(num protocol.StreamNum) streamI {
|
||||||
|
id := num.StreamID(protocol.StreamTypeBidi, m.perspective.Opposite())
|
||||||
|
return newStream(id, m.sender, m.newFlowController(id), m.version)
|
||||||
|
},
|
||||||
|
m.maxIncomingBidiStreams,
|
||||||
|
m.sender.queueControlFrame,
|
||||||
|
)
|
||||||
|
m.outgoingUniStreams = newOutgoingUniStreamsMap(
|
||||||
|
func(num protocol.StreamNum) sendStreamI {
|
||||||
|
id := num.StreamID(protocol.StreamTypeUni, m.perspective)
|
||||||
|
return newSendStream(id, m.sender, m.newFlowController(id), m.version)
|
||||||
|
},
|
||||||
|
m.sender.queueControlFrame,
|
||||||
|
)
|
||||||
|
m.incomingUniStreams = newIncomingUniStreamsMap(
|
||||||
|
func(num protocol.StreamNum) receiveStreamI {
|
||||||
|
id := num.StreamID(protocol.StreamTypeUni, m.perspective.Opposite())
|
||||||
|
return newReceiveStream(id, m.sender, m.newFlowController(id), m.version)
|
||||||
|
},
|
||||||
|
m.maxIncomingUniStreams,
|
||||||
|
m.sender.queueControlFrame,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
func (m *streamsMap) OpenStream() (Stream, error) {
|
func (m *streamsMap) OpenStream() (Stream, error) {
|
||||||
str, err := m.outgoingBidiStreams.OpenStream()
|
m.mutex.Lock()
|
||||||
|
reset := m.reset
|
||||||
|
mm := m.outgoingBidiStreams
|
||||||
|
m.mutex.Unlock()
|
||||||
|
if reset {
|
||||||
|
return nil, Err0RTTRejected
|
||||||
|
}
|
||||||
|
str, err := mm.OpenStream()
|
||||||
return str, convertStreamError(err, protocol.StreamTypeBidi, m.perspective)
|
return str, convertStreamError(err, protocol.StreamTypeBidi, m.perspective)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *streamsMap) OpenStreamSync(ctx context.Context) (Stream, error) {
|
func (m *streamsMap) OpenStreamSync(ctx context.Context) (Stream, error) {
|
||||||
str, err := m.outgoingBidiStreams.OpenStreamSync(ctx)
|
m.mutex.Lock()
|
||||||
|
reset := m.reset
|
||||||
|
mm := m.outgoingBidiStreams
|
||||||
|
m.mutex.Unlock()
|
||||||
|
if reset {
|
||||||
|
return nil, Err0RTTRejected
|
||||||
|
}
|
||||||
|
str, err := mm.OpenStreamSync(ctx)
|
||||||
return str, convertStreamError(err, protocol.StreamTypeBidi, m.perspective)
|
return str, convertStreamError(err, protocol.StreamTypeBidi, m.perspective)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *streamsMap) OpenUniStream() (SendStream, error) {
|
func (m *streamsMap) OpenUniStream() (SendStream, error) {
|
||||||
str, err := m.outgoingUniStreams.OpenStream()
|
m.mutex.Lock()
|
||||||
|
reset := m.reset
|
||||||
|
mm := m.outgoingUniStreams
|
||||||
|
m.mutex.Unlock()
|
||||||
|
if reset {
|
||||||
|
return nil, Err0RTTRejected
|
||||||
|
}
|
||||||
|
str, err := mm.OpenStream()
|
||||||
return str, convertStreamError(err, protocol.StreamTypeBidi, m.perspective)
|
return str, convertStreamError(err, protocol.StreamTypeBidi, m.perspective)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *streamsMap) OpenUniStreamSync(ctx context.Context) (SendStream, error) {
|
func (m *streamsMap) OpenUniStreamSync(ctx context.Context) (SendStream, error) {
|
||||||
str, err := m.outgoingUniStreams.OpenStreamSync(ctx)
|
m.mutex.Lock()
|
||||||
|
reset := m.reset
|
||||||
|
mm := m.outgoingUniStreams
|
||||||
|
m.mutex.Unlock()
|
||||||
|
if reset {
|
||||||
|
return nil, Err0RTTRejected
|
||||||
|
}
|
||||||
|
str, err := mm.OpenStreamSync(ctx)
|
||||||
return str, convertStreamError(err, protocol.StreamTypeUni, m.perspective)
|
return str, convertStreamError(err, protocol.StreamTypeUni, m.perspective)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *streamsMap) AcceptStream(ctx context.Context) (Stream, error) {
|
func (m *streamsMap) AcceptStream(ctx context.Context) (Stream, error) {
|
||||||
str, err := m.incomingBidiStreams.AcceptStream(ctx)
|
m.mutex.Lock()
|
||||||
|
reset := m.reset
|
||||||
|
mm := m.incomingBidiStreams
|
||||||
|
m.mutex.Unlock()
|
||||||
|
if reset {
|
||||||
|
return nil, Err0RTTRejected
|
||||||
|
}
|
||||||
|
str, err := mm.AcceptStream(ctx)
|
||||||
return str, convertStreamError(err, protocol.StreamTypeBidi, m.perspective.Opposite())
|
return str, convertStreamError(err, protocol.StreamTypeBidi, m.perspective.Opposite())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *streamsMap) AcceptUniStream(ctx context.Context) (ReceiveStream, error) {
|
func (m *streamsMap) AcceptUniStream(ctx context.Context) (ReceiveStream, error) {
|
||||||
str, err := m.incomingUniStreams.AcceptStream(ctx)
|
m.mutex.Lock()
|
||||||
|
reset := m.reset
|
||||||
|
mm := m.incomingUniStreams
|
||||||
|
m.mutex.Unlock()
|
||||||
|
if reset {
|
||||||
|
return nil, Err0RTTRejected
|
||||||
|
}
|
||||||
|
str, err := mm.AcceptStream(ctx)
|
||||||
return str, convertStreamError(err, protocol.StreamTypeUni, m.perspective.Opposite())
|
return str, convertStreamError(err, protocol.StreamTypeUni, m.perspective.Opposite())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -232,3 +288,22 @@ func (m *streamsMap) CloseWithError(err error) {
|
||||||
m.incomingBidiStreams.CloseWithError(err)
|
m.incomingBidiStreams.CloseWithError(err)
|
||||||
m.incomingUniStreams.CloseWithError(err)
|
m.incomingUniStreams.CloseWithError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ResetFor0RTT resets is used when 0-RTT is rejected. In that case, the streams maps are
|
||||||
|
// 1. closed with an Err0RTTRejected, making calls to Open{Uni}Stream{Sync} / Accept{Uni}Stream return that error.
|
||||||
|
// 2. reset to their initial state, such that we can immediately process new incoming stream data.
|
||||||
|
// Afterwards, calls to Open{Uni}Stream{Sync} / Accept{Uni}Stream will continue to return the error,
|
||||||
|
// until UseResetMaps() has been called.
|
||||||
|
func (m *streamsMap) ResetFor0RTT() {
|
||||||
|
m.mutex.Lock()
|
||||||
|
defer m.mutex.Unlock()
|
||||||
|
m.reset = true
|
||||||
|
m.CloseWithError(Err0RTTRejected)
|
||||||
|
m.initMaps()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *streamsMap) UseResetMaps() {
|
||||||
|
m.mutex.Lock()
|
||||||
|
m.reset = false
|
||||||
|
m.mutex.Unlock()
|
||||||
|
}
|
||||||
|
|
|
@ -320,39 +320,32 @@ var _ = Describe("Streams Map", func() {
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
Context("updating stream ID limits", func() {
|
It("processes the parameter for outgoing streams", func() {
|
||||||
for _, p := range []protocol.Perspective{protocol.PerspectiveClient, protocol.PerspectiveServer} {
|
mockSender.EXPECT().queueControlFrame(gomock.Any())
|
||||||
pers := p
|
_, err := m.OpenStream()
|
||||||
|
expectTooManyStreamsError(err)
|
||||||
|
m.UpdateLimits(&wire.TransportParameters{
|
||||||
|
MaxBidiStreamNum: 5,
|
||||||
|
MaxUniStreamNum: 8,
|
||||||
|
})
|
||||||
|
|
||||||
It(fmt.Sprintf("processes the parameter for outgoing streams, as a %s", pers), func() {
|
mockSender.EXPECT().queueControlFrame(gomock.Any()).Times(2)
|
||||||
mockSender.EXPECT().queueControlFrame(gomock.Any())
|
// test we can only 5 bidirectional streams
|
||||||
m.perspective = pers
|
for i := 0; i < 5; i++ {
|
||||||
_, err := m.OpenStream()
|
str, err := m.OpenStream()
|
||||||
expectTooManyStreamsError(err)
|
Expect(err).ToNot(HaveOccurred())
|
||||||
m.UpdateLimits(&wire.TransportParameters{
|
Expect(str.StreamID()).To(Equal(ids.firstOutgoingBidiStream + protocol.StreamID(4*i)))
|
||||||
MaxBidiStreamNum: 5,
|
|
||||||
MaxUniStreamNum: 8,
|
|
||||||
})
|
|
||||||
|
|
||||||
mockSender.EXPECT().queueControlFrame(gomock.Any()).Times(2)
|
|
||||||
// test we can only 5 bidirectional streams
|
|
||||||
for i := 0; i < 5; i++ {
|
|
||||||
str, err := m.OpenStream()
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
Expect(str.StreamID()).To(Equal(ids.firstOutgoingBidiStream + protocol.StreamID(4*i)))
|
|
||||||
}
|
|
||||||
_, err = m.OpenStream()
|
|
||||||
expectTooManyStreamsError(err)
|
|
||||||
// test we can only 8 unidirectional streams
|
|
||||||
for i := 0; i < 8; i++ {
|
|
||||||
str, err := m.OpenUniStream()
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
Expect(str.StreamID()).To(Equal(ids.firstOutgoingUniStream + protocol.StreamID(4*i)))
|
|
||||||
}
|
|
||||||
_, err = m.OpenUniStream()
|
|
||||||
expectTooManyStreamsError(err)
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
_, err = m.OpenStream()
|
||||||
|
expectTooManyStreamsError(err)
|
||||||
|
// test we can only 8 unidirectional streams
|
||||||
|
for i := 0; i < 8; i++ {
|
||||||
|
str, err := m.OpenUniStream()
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(str.StreamID()).To(Equal(ids.firstOutgoingUniStream + protocol.StreamID(4*i)))
|
||||||
|
}
|
||||||
|
_, err = m.OpenUniStream()
|
||||||
|
expectTooManyStreamsError(err)
|
||||||
})
|
})
|
||||||
|
|
||||||
Context("handling MAX_STREAMS frames", func() {
|
Context("handling MAX_STREAMS frames", func() {
|
||||||
|
@ -431,6 +424,28 @@ var _ = Describe("Streams Map", func() {
|
||||||
Expect(err).To(HaveOccurred())
|
Expect(err).To(HaveOccurred())
|
||||||
Expect(err.Error()).To(Equal(testErr.Error()))
|
Expect(err.Error()).To(Equal(testErr.Error()))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
if perspective == protocol.PerspectiveClient {
|
||||||
|
It("resets for 0-RTT", func() {
|
||||||
|
mockSender.EXPECT().queueControlFrame(gomock.Any()).AnyTimes()
|
||||||
|
m.ResetFor0RTT()
|
||||||
|
// make sure that calls to open / accept streams fail
|
||||||
|
_, err := m.OpenStream()
|
||||||
|
Expect(err).To(MatchError(Err0RTTRejected))
|
||||||
|
_, err = m.AcceptStream(context.Background())
|
||||||
|
Expect(err).To(MatchError(Err0RTTRejected))
|
||||||
|
// make sure that we can still get new streams, as the server might be sending us data
|
||||||
|
str, err := m.GetOrOpenReceiveStream(3)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(str).ToNot(BeNil())
|
||||||
|
|
||||||
|
// now switch to using the new streams map
|
||||||
|
m.UseResetMaps()
|
||||||
|
_, err = m.OpenStream()
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
Expect(err.Error()).To(ContainSubstring("too many open streams"))
|
||||||
|
})
|
||||||
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue