accept the control stream and parse SETTINGS frame, for the H3 server

This commit is contained in:
Marten Seemann 2020-12-21 15:18:09 +07:00
parent 9693a46d31
commit bf54ffe0df
3 changed files with 166 additions and 4 deletions

View file

@ -1,7 +1,10 @@
package http3
import (
"os"
"strconv"
"testing"
"time"
"github.com/golang/mock/gomock"
@ -23,3 +26,13 @@ var _ = BeforeEach(func() {
var _ = AfterEach(func() {
mockCtrl.Finish()
})
//nolint:unparam
func scaleDuration(t time.Duration) time.Duration {
scaleFactor := 1
if f, err := strconv.Atoi(os.Getenv("TIMESCALE_FACTOR")); err == nil { // parsing "" errors, so this works fine if the env is not set
scaleFactor = f
}
Expect(scaleFactor).ToNot(BeZero())
return time.Duration(scaleFactor) * t
}

View file

@ -37,8 +37,9 @@ var (
)
const (
nextProtoH3Draft29 = "h3-29"
nextProtoH3Draft32 = "h3-32"
nextProtoH3Draft29 = "h3-29"
nextProtoH3Draft32 = "h3-32"
streamTypeControlStream = 0
)
func versionToALPN(v protocol.VersionNumber) string {
@ -219,10 +220,13 @@ func (s *Server) handleConn(sess quic.EarlySession) {
s.logger.Debugf("Opening the control stream failed.")
return
}
buf := bytes.NewBuffer([]byte{0})
buf := &bytes.Buffer{}
utils.WriteVarInt(buf, streamTypeControlStream) // stream type
(&settingsFrame{}).Write(buf)
str.Write(buf.Bytes())
go s.handleUnidirectionalStreams(sess)
// Process all requests immediately.
// It's the client's responsibility to decide which requests are eligible for 0-RTT.
for {
@ -254,6 +258,35 @@ func (s *Server) handleConn(sess quic.EarlySession) {
}
}
func (s *Server) handleUnidirectionalStreams(sess quic.EarlySession) {
for {
str, err := sess.AcceptUniStream(context.Background())
if err != nil {
s.logger.Debugf("accepting unidirectional stream failed: %s", err)
return
}
go func(str quic.ReceiveStream) {
streamType, err := utils.ReadVarInt(&byteReaderImpl{str})
if err != nil {
s.logger.Debugf("reading stream type on stream %d failed: %s", str.StreamID(), err)
return
}
if streamType != streamTypeControlStream {
return
}
f, err := parseNextFrame(str)
if err != nil {
sess.CloseWithError(quic.ErrorCode(errorFrameError), "")
return
}
if _, ok := f.(*settingsFrame); !ok {
sess.CloseWithError(quic.ErrorCode(errorMissingSettings), "")
}
}(str)
}
}
func (s *Server) maxHeaderBytes() uint64 {
if s.Server.MaxHeaderBytes <= 0 {
return http.DefaultMaxHeaderBytes

View file

@ -181,21 +181,137 @@ var _ = Describe("Server", func() {
Expect(hfs).To(HaveKeyWithValue(":status", []string{"500"}))
})
Context("stream- and connection-level errors", func() {
Context("control stream handling", func() {
var sess *mockquic.MockEarlySession
testDone := make(chan struct{})
BeforeEach(func() {
sess = mockquic.NewMockEarlySession(mockCtrl)
controlStr := mockquic.NewMockStream(mockCtrl)
controlStr.EXPECT().Write(gomock.Any())
sess.EXPECT().OpenUniStream().Return(controlStr, nil)
sess.EXPECT().AcceptStream(gomock.Any()).Return(nil, errors.New("done"))
sess.EXPECT().RemoteAddr().Return(&net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 1337}).AnyTimes()
sess.EXPECT().LocalAddr().AnyTimes()
})
AfterEach(func() { testDone <- struct{}{} })
It("parses the SETTINGS frame", func() {
buf := &bytes.Buffer{}
utils.WriteVarInt(buf, streamTypeControlStream)
(&settingsFrame{}).Write(buf)
controlStr := mockquic.NewMockStream(mockCtrl)
controlStr.EXPECT().Read(gomock.Any()).DoAndReturn(buf.Read).AnyTimes()
sess.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
return controlStr, nil
})
sess.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
<-testDone
return nil, errors.New("test done")
})
s.handleConn(sess)
time.Sleep(scaleDuration(20 * time.Millisecond)) // don't EXPECT any calls to sess.CloseWithError
})
It("ignores streams other than the control stream", func() {
controlBuf := &bytes.Buffer{}
utils.WriteVarInt(controlBuf, streamTypeControlStream)
(&settingsFrame{}).Write(controlBuf)
controlStr := mockquic.NewMockStream(mockCtrl)
controlStr.EXPECT().Read(gomock.Any()).DoAndReturn(controlBuf.Read).AnyTimes()
otherBuf := &bytes.Buffer{}
utils.WriteVarInt(otherBuf, 1337)
otherStr := mockquic.NewMockStream(mockCtrl)
otherStr.EXPECT().Read(gomock.Any()).DoAndReturn(otherBuf.Read).AnyTimes()
sess.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
return otherStr, nil
})
sess.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
return controlStr, nil
})
sess.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
<-testDone
return nil, errors.New("test done")
})
s.handleConn(sess)
time.Sleep(scaleDuration(20 * time.Millisecond)) // don't EXPECT any calls to sess.CloseWithError
})
It("errors when the first frame on the control stream is not a SETTINGS frame", func() {
buf := &bytes.Buffer{}
utils.WriteVarInt(buf, streamTypeControlStream)
(&dataFrame{}).Write(buf)
controlStr := mockquic.NewMockStream(mockCtrl)
controlStr.EXPECT().Read(gomock.Any()).DoAndReturn(buf.Read).AnyTimes()
sess.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
return controlStr, nil
})
sess.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
<-testDone
return nil, errors.New("test done")
})
done := make(chan struct{})
sess.EXPECT().CloseWithError(gomock.Any(), gomock.Any()).Do(func(code quic.ErrorCode, _ string) {
defer GinkgoRecover()
Expect(code).To(BeEquivalentTo(errorMissingSettings))
close(done)
})
s.handleConn(sess)
Eventually(done).Should(BeClosed())
})
It("errors when parsing the frame on the control stream fails", func() {
buf := &bytes.Buffer{}
utils.WriteVarInt(buf, streamTypeControlStream)
b := &bytes.Buffer{}
(&settingsFrame{}).Write(b)
buf.Write(b.Bytes()[:b.Len()-1])
controlStr := mockquic.NewMockStream(mockCtrl)
controlStr.EXPECT().Read(gomock.Any()).DoAndReturn(buf.Read).AnyTimes()
sess.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
return controlStr, nil
})
sess.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
<-testDone
return nil, errors.New("test done")
})
done := make(chan struct{})
sess.EXPECT().CloseWithError(gomock.Any(), gomock.Any()).Do(func(code quic.ErrorCode, _ string) {
defer GinkgoRecover()
Expect(code).To(BeEquivalentTo(errorFrameError))
close(done)
})
s.handleConn(sess)
Eventually(done).Should(BeClosed())
})
})
Context("stream- and connection-level errors", func() {
var sess *mockquic.MockEarlySession
testDone := make(chan struct{})
BeforeEach(func() {
testDone = make(chan struct{})
addr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 1337}
sess = mockquic.NewMockEarlySession(mockCtrl)
controlStr := mockquic.NewMockStream(mockCtrl)
controlStr.EXPECT().Write(gomock.Any())
sess.EXPECT().OpenUniStream().Return(controlStr, nil)
sess.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
<-testDone
return nil, errors.New("test done")
})
sess.EXPECT().AcceptStream(gomock.Any()).Return(str, nil)
sess.EXPECT().AcceptStream(gomock.Any()).Return(nil, errors.New("done"))
sess.EXPECT().RemoteAddr().Return(addr).AnyTimes()
sess.EXPECT().LocalAddr().AnyTimes()
})
AfterEach(func() { testDone <- struct{}{} })
It("cancels reading when client sends a body in GET request", func() {
handlerCalled := make(chan struct{})
s.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {