don't reset the QPACK encoder / decoder streams

We don't need these streams, since our QPACK implementation doesn't use the
dynamic table yet. However, we MUST NOT close / reset these streams. Instead,
just ignore them.
This commit is contained in:
Marten Seemann 2021-03-04 09:47:57 +08:00
parent f513437854
commit 6f32d2df1d
4 changed files with 73 additions and 7 deletions

View file

@ -156,6 +156,10 @@ func (c *client) handleUnidirectionalStreams() {
// We're only interested in the control stream here. // We're only interested in the control stream here.
switch streamType { switch streamType {
case streamTypeControlStream: case streamTypeControlStream:
case streamTypeQPACKEncoderStream, streamTypeQPACKDecoderStream:
// Our QPACK implementation doesn't use the dynamic table yet.
// TODO: check that only one stream of each type is opened.
return
case streamTypePushStream: case streamTypePushStream:
// We never increased the Push ID, so we don't expect any push streams. // We never increased the Push ID, so we don't expect any push streams.
c.session.CloseWithError(quic.ErrorCode(errorIDError), "") c.session.CloseWithError(quic.ErrorCode(errorIDError), "")

View file

@ -6,6 +6,7 @@ import (
"context" "context"
"crypto/tls" "crypto/tls"
"errors" "errors"
"fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
@ -234,7 +235,33 @@ var _ = Describe("Client", func() {
time.Sleep(scaleDuration(20 * time.Millisecond)) // don't EXPECT any calls to sess.CloseWithError time.Sleep(scaleDuration(20 * time.Millisecond)) // don't EXPECT any calls to sess.CloseWithError
}) })
It("ignores streams other than the control stream", func() { for _, t := range []uint64{streamTypeQPACKEncoderStream, streamTypeQPACKDecoderStream} {
streamType := t
name := "encoder"
if streamType == streamTypeQPACKDecoderStream {
name = "decoder"
}
It(fmt.Sprintf("ignores the QPACK %s streams", name), func() {
buf := &bytes.Buffer{}
quicvarint.Write(buf, streamType)
str := mockquic.NewMockStream(mockCtrl)
str.EXPECT().Read(gomock.Any()).DoAndReturn(buf.Read).AnyTimes()
sess.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
return str, nil
})
sess.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
<-testDone
return nil, errors.New("test done")
})
_, err := client.RoundTrip(request)
Expect(err).To(MatchError("done"))
time.Sleep(scaleDuration(20 * time.Millisecond)) // don't EXPECT any calls to str.CancelRead
})
}
It("resets streams other than the control stream and the QPACK streams", func() {
buf := &bytes.Buffer{} buf := &bytes.Buffer{}
quicvarint.Write(buf, 1337) quicvarint.Write(buf, 1337)
str := mockquic.NewMockStream(mockCtrl) str := mockquic.NewMockStream(mockCtrl)

View file

@ -30,11 +30,16 @@ var (
) )
const ( const (
nextProtoH3Draft29 = "h3-29" nextProtoH3Draft29 = "h3-29"
nextProtoH3Draft32 = "h3-32" nextProtoH3Draft32 = "h3-32"
nextProtoH3Draft34 = "h3-34" nextProtoH3Draft34 = "h3-34"
streamTypeControlStream = 0 )
streamTypePushStream = 1
const (
streamTypeControlStream = 0
streamTypePushStream = 1
streamTypeQPACKEncoderStream = 2
streamTypeQPACKDecoderStream = 3
) )
func versionToALPN(v protocol.VersionNumber) string { func versionToALPN(v protocol.VersionNumber) string {
@ -291,6 +296,10 @@ func (s *Server) handleUnidirectionalStreams(sess quic.EarlySession) {
// We're only interested in the control stream here. // We're only interested in the control stream here.
switch streamType { switch streamType {
case streamTypeControlStream: case streamTypeControlStream:
case streamTypeQPACKEncoderStream, streamTypeQPACKDecoderStream:
// Our QPACK implementation doesn't use the dynamic table yet.
// TODO: check that only one stream of each type is opened.
return
case streamTypePushStream: // only the server can push case streamTypePushStream: // only the server can push
sess.CloseWithError(quic.ErrorCode(errorStreamCreationError), "") sess.CloseWithError(quic.ErrorCode(errorStreamCreationError), "")
return return

View file

@ -5,6 +5,7 @@ import (
"context" "context"
"crypto/tls" "crypto/tls"
"errors" "errors"
"fmt"
"io" "io"
"net" "net"
"net/http" "net/http"
@ -225,7 +226,32 @@ var _ = Describe("Server", func() {
time.Sleep(scaleDuration(20 * time.Millisecond)) // don't EXPECT any calls to sess.CloseWithError time.Sleep(scaleDuration(20 * time.Millisecond)) // don't EXPECT any calls to sess.CloseWithError
}) })
It("ignores streams other than the control stream", func() { for _, t := range []uint64{streamTypeQPACKEncoderStream, streamTypeQPACKDecoderStream} {
streamType := t
name := "encoder"
if streamType == streamTypeQPACKDecoderStream {
name = "decoder"
}
It(fmt.Sprintf("ignores the QPACK %s streams", name), func() {
buf := &bytes.Buffer{}
quicvarint.Write(buf, streamType)
str := mockquic.NewMockStream(mockCtrl)
str.EXPECT().Read(gomock.Any()).DoAndReturn(buf.Read).AnyTimes()
sess.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
return str, nil
})
sess.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
<-testDone
return nil, errors.New("test done")
})
s.handleConn(sess)
time.Sleep(scaleDuration(20 * time.Millisecond)) // don't EXPECT any calls to str.CancelRead
})
}
It("reset streams other than the control stream and the QPACK streams", func() {
buf := &bytes.Buffer{} buf := &bytes.Buffer{}
quicvarint.Write(buf, 1337) quicvarint.Write(buf, 1337)
str := mockquic.NewMockStream(mockCtrl) str := mockquic.NewMockStream(mockCtrl)