mirror of
https://github.com/refraction-networking/uquic.git
synced 2025-04-04 12:47:36 +03:00
accept the control stream and parse SETTINGS frame, for the H3 client
This commit is contained in:
parent
f92b0ec74a
commit
808671e2d4
3 changed files with 205 additions and 10 deletions
|
@ -2,6 +2,7 @@ package http3
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
@ -109,7 +110,7 @@ func (c *client) dial() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// run the sesssion setup using 0-RTT data
|
// send the SETTINGs frame, using 0-RTT data, if possible
|
||||||
go func() {
|
go func() {
|
||||||
if err := c.setupSession(); err != nil {
|
if err := c.setupSession(); err != nil {
|
||||||
c.logger.Debugf("Setting up session failed: %s", err)
|
c.logger.Debugf("Setting up session failed: %s", err)
|
||||||
|
@ -117,6 +118,7 @@ func (c *client) dial() error {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
go c.handleUnidirectionalStreams()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -127,15 +129,47 @@ func (c *client) setupSession() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
buf := &bytes.Buffer{}
|
buf := &bytes.Buffer{}
|
||||||
// write the type byte
|
utils.WriteVarInt(buf, streamTypeControlStream)
|
||||||
buf.Write([]byte{0x0})
|
|
||||||
// send the SETTINGS frame
|
// send the SETTINGS frame
|
||||||
(&settingsFrame{}).Write(buf)
|
(&settingsFrame{}).Write(buf)
|
||||||
if _, err := str.Write(buf.Bytes()); err != nil {
|
_, err = str.Write(buf.Bytes())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
func (c *client) handleUnidirectionalStreams() {
|
||||||
|
for {
|
||||||
|
str, err := c.session.AcceptUniStream(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
c.logger.Debugf("accepting unidirectional stream failed: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
streamType, err := utils.ReadVarInt(&byteReaderImpl{str})
|
||||||
|
if err != nil {
|
||||||
|
c.logger.Debugf("reading stream type on stream %d failed: %s", str.StreamID(), err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// We're only interested in the control stream here.
|
||||||
|
switch streamType {
|
||||||
|
case streamTypeControlStream:
|
||||||
|
case streamTypePushStream:
|
||||||
|
// We never increased the Push ID, so we don't expect any push streams.
|
||||||
|
c.session.CloseWithError(quic.ErrorCode(errorIDError), "")
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
f, err := parseNextFrame(str)
|
||||||
|
if err != nil {
|
||||||
|
c.session.CloseWithError(quic.ErrorCode(errorFrameError), "")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if _, ok := f.(*settingsFrame); !ok {
|
||||||
|
c.session.CloseWithError(quic.ErrorCode(errorMissingSettings), "")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *client) Close() error {
|
func (c *client) Close() error {
|
||||||
|
|
|
@ -169,6 +169,155 @@ var _ = Describe("Client", func() {
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
Context("control stream handling", func() {
|
||||||
|
var (
|
||||||
|
request *http.Request
|
||||||
|
sess *mockquic.MockEarlySession
|
||||||
|
settingsFrameWritten chan struct{}
|
||||||
|
)
|
||||||
|
testDone := make(chan struct{})
|
||||||
|
|
||||||
|
BeforeEach(func() {
|
||||||
|
settingsFrameWritten = make(chan struct{})
|
||||||
|
controlStr := mockquic.NewMockStream(mockCtrl)
|
||||||
|
controlStr.EXPECT().Write(gomock.Any()).Do(func(b []byte) {
|
||||||
|
defer GinkgoRecover()
|
||||||
|
close(settingsFrameWritten)
|
||||||
|
})
|
||||||
|
sess = mockquic.NewMockEarlySession(mockCtrl)
|
||||||
|
sess.EXPECT().OpenUniStream().Return(controlStr, nil)
|
||||||
|
sess.EXPECT().HandshakeComplete().Return(handshakeCtx)
|
||||||
|
sess.EXPECT().OpenStreamSync(gomock.Any()).Return(nil, errors.New("done"))
|
||||||
|
dialAddr = func(hostname string, _ *tls.Config, _ *quic.Config) (quic.EarlySession, error) { return sess, nil }
|
||||||
|
var err error
|
||||||
|
request, err = http.NewRequest("GET", "https://quic.clemente.io:1337/file1.dat", nil)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
AfterEach(func() {
|
||||||
|
testDone <- struct{}{}
|
||||||
|
Eventually(settingsFrameWritten).Should(BeClosed())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("parses the SETTINGS frame", func() {
|
||||||
|
buf := &bytes.Buffer{}
|
||||||
|
utils.WriteVarInt(buf, streamTypeControlStream)
|
||||||
|
(&settingsFrame{}).Write(buf)
|
||||||
|
controlStr := mockquic.NewMockStream(mockCtrl)
|
||||||
|
controlStr.EXPECT().Read(gomock.Any()).DoAndReturn(buf.Read).AnyTimes()
|
||||||
|
sess.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
|
||||||
|
return controlStr, nil
|
||||||
|
})
|
||||||
|
sess.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
|
||||||
|
<-testDone
|
||||||
|
return nil, errors.New("test done")
|
||||||
|
})
|
||||||
|
_, err := client.RoundTrip(request)
|
||||||
|
Expect(err).To(MatchError("done"))
|
||||||
|
time.Sleep(scaleDuration(20 * time.Millisecond)) // don't EXPECT any calls to sess.CloseWithError
|
||||||
|
})
|
||||||
|
|
||||||
|
It("ignores streams other than the control stream", func() {
|
||||||
|
controlBuf := &bytes.Buffer{}
|
||||||
|
utils.WriteVarInt(controlBuf, streamTypeControlStream)
|
||||||
|
(&settingsFrame{}).Write(controlBuf)
|
||||||
|
controlStr := mockquic.NewMockStream(mockCtrl)
|
||||||
|
controlStr.EXPECT().Read(gomock.Any()).DoAndReturn(controlBuf.Read).AnyTimes()
|
||||||
|
|
||||||
|
otherBuf := &bytes.Buffer{}
|
||||||
|
utils.WriteVarInt(otherBuf, 1337)
|
||||||
|
otherStr := mockquic.NewMockStream(mockCtrl)
|
||||||
|
otherStr.EXPECT().Read(gomock.Any()).DoAndReturn(otherBuf.Read).AnyTimes()
|
||||||
|
|
||||||
|
sess.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
|
||||||
|
return otherStr, nil
|
||||||
|
})
|
||||||
|
sess.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
|
||||||
|
return controlStr, nil
|
||||||
|
})
|
||||||
|
sess.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
|
||||||
|
<-testDone
|
||||||
|
return nil, errors.New("test done")
|
||||||
|
})
|
||||||
|
_, err := client.RoundTrip(request)
|
||||||
|
Expect(err).To(MatchError("done"))
|
||||||
|
time.Sleep(scaleDuration(20 * time.Millisecond)) // don't EXPECT any calls to sess.CloseWithError
|
||||||
|
})
|
||||||
|
|
||||||
|
It("errors when the first frame on the control stream is not a SETTINGS frame", func() {
|
||||||
|
buf := &bytes.Buffer{}
|
||||||
|
utils.WriteVarInt(buf, streamTypeControlStream)
|
||||||
|
(&dataFrame{}).Write(buf)
|
||||||
|
controlStr := mockquic.NewMockStream(mockCtrl)
|
||||||
|
controlStr.EXPECT().Read(gomock.Any()).DoAndReturn(buf.Read).AnyTimes()
|
||||||
|
sess.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
|
||||||
|
return controlStr, nil
|
||||||
|
})
|
||||||
|
sess.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
|
||||||
|
<-testDone
|
||||||
|
return nil, errors.New("test done")
|
||||||
|
})
|
||||||
|
done := make(chan struct{})
|
||||||
|
sess.EXPECT().CloseWithError(gomock.Any(), gomock.Any()).Do(func(code quic.ErrorCode, _ string) {
|
||||||
|
defer GinkgoRecover()
|
||||||
|
Expect(code).To(BeEquivalentTo(errorMissingSettings))
|
||||||
|
close(done)
|
||||||
|
})
|
||||||
|
_, err := client.RoundTrip(request)
|
||||||
|
Expect(err).To(MatchError("done"))
|
||||||
|
Eventually(done).Should(BeClosed())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("errors when parsing the frame on the control stream fails", func() {
|
||||||
|
buf := &bytes.Buffer{}
|
||||||
|
utils.WriteVarInt(buf, streamTypeControlStream)
|
||||||
|
b := &bytes.Buffer{}
|
||||||
|
(&settingsFrame{}).Write(b)
|
||||||
|
buf.Write(b.Bytes()[:b.Len()-1])
|
||||||
|
controlStr := mockquic.NewMockStream(mockCtrl)
|
||||||
|
controlStr.EXPECT().Read(gomock.Any()).DoAndReturn(buf.Read).AnyTimes()
|
||||||
|
sess.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
|
||||||
|
return controlStr, nil
|
||||||
|
})
|
||||||
|
sess.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
|
||||||
|
<-testDone
|
||||||
|
return nil, errors.New("test done")
|
||||||
|
})
|
||||||
|
done := make(chan struct{})
|
||||||
|
sess.EXPECT().CloseWithError(gomock.Any(), gomock.Any()).Do(func(code quic.ErrorCode, _ string) {
|
||||||
|
defer GinkgoRecover()
|
||||||
|
Expect(code).To(BeEquivalentTo(errorFrameError))
|
||||||
|
close(done)
|
||||||
|
})
|
||||||
|
_, err := client.RoundTrip(request)
|
||||||
|
Expect(err).To(MatchError("done"))
|
||||||
|
Eventually(done).Should(BeClosed())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("errors when parsing the server opens a push stream", func() {
|
||||||
|
buf := &bytes.Buffer{}
|
||||||
|
utils.WriteVarInt(buf, streamTypePushStream)
|
||||||
|
controlStr := mockquic.NewMockStream(mockCtrl)
|
||||||
|
controlStr.EXPECT().Read(gomock.Any()).DoAndReturn(buf.Read).AnyTimes()
|
||||||
|
sess.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
|
||||||
|
return controlStr, nil
|
||||||
|
})
|
||||||
|
sess.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
|
||||||
|
<-testDone
|
||||||
|
return nil, errors.New("test done")
|
||||||
|
})
|
||||||
|
done := make(chan struct{})
|
||||||
|
sess.EXPECT().CloseWithError(gomock.Any(), gomock.Any()).Do(func(code quic.ErrorCode, _ string) {
|
||||||
|
defer GinkgoRecover()
|
||||||
|
Expect(code).To(BeEquivalentTo(errorIDError))
|
||||||
|
close(done)
|
||||||
|
})
|
||||||
|
_, err := client.RoundTrip(request)
|
||||||
|
Expect(err).To(MatchError("done"))
|
||||||
|
Eventually(done).Should(BeClosed())
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
Context("Doing requests", func() {
|
Context("Doing requests", func() {
|
||||||
var (
|
var (
|
||||||
request *http.Request
|
request *http.Request
|
||||||
|
@ -176,6 +325,7 @@ var _ = Describe("Client", func() {
|
||||||
sess *mockquic.MockEarlySession
|
sess *mockquic.MockEarlySession
|
||||||
settingsFrameWritten chan struct{}
|
settingsFrameWritten chan struct{}
|
||||||
)
|
)
|
||||||
|
testDone := make(chan struct{})
|
||||||
|
|
||||||
decodeHeader := func(str io.Reader) map[string]string {
|
decodeHeader := func(str io.Reader) map[string]string {
|
||||||
fields := make(map[string]string)
|
fields := make(map[string]string)
|
||||||
|
@ -210,15 +360,18 @@ var _ = Describe("Client", func() {
|
||||||
str = mockquic.NewMockStream(mockCtrl)
|
str = mockquic.NewMockStream(mockCtrl)
|
||||||
sess = mockquic.NewMockEarlySession(mockCtrl)
|
sess = mockquic.NewMockEarlySession(mockCtrl)
|
||||||
sess.EXPECT().OpenUniStream().Return(controlStr, nil)
|
sess.EXPECT().OpenUniStream().Return(controlStr, nil)
|
||||||
dialAddr = func(hostname string, _ *tls.Config, _ *quic.Config) (quic.EarlySession, error) {
|
sess.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
|
||||||
return sess, nil
|
<-testDone
|
||||||
}
|
return nil, errors.New("test done")
|
||||||
|
})
|
||||||
|
dialAddr = func(hostname string, _ *tls.Config, _ *quic.Config) (quic.EarlySession, error) { return sess, nil }
|
||||||
var err error
|
var err error
|
||||||
request, err = http.NewRequest("GET", "https://quic.clemente.io:1337/file1.dat", nil)
|
request, err = http.NewRequest("GET", "https://quic.clemente.io:1337/file1.dat", nil)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
})
|
})
|
||||||
|
|
||||||
AfterEach(func() {
|
AfterEach(func() {
|
||||||
|
testDone <- struct{}{}
|
||||||
Eventually(settingsFrameWritten).Should(BeClosed())
|
Eventually(settingsFrameWritten).Should(BeClosed())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
@ -101,6 +101,10 @@ var _ = Describe("RoundTripper", func() {
|
||||||
session.EXPECT().OpenUniStream().AnyTimes().Return(nil, testErr)
|
session.EXPECT().OpenUniStream().AnyTimes().Return(nil, testErr)
|
||||||
session.EXPECT().HandshakeComplete().Return(handshakeCtx)
|
session.EXPECT().HandshakeComplete().Return(handshakeCtx)
|
||||||
session.EXPECT().OpenStreamSync(context.Background()).Return(nil, testErr)
|
session.EXPECT().OpenStreamSync(context.Background()).Return(nil, testErr)
|
||||||
|
session.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
|
||||||
|
<-closed
|
||||||
|
return nil, errors.New("test done")
|
||||||
|
}).MaxTimes(1)
|
||||||
session.EXPECT().CloseWithError(gomock.Any(), gomock.Any()).Do(func(quic.ErrorCode, string) { close(closed) })
|
session.EXPECT().CloseWithError(gomock.Any(), gomock.Any()).Do(func(quic.ErrorCode, string) { close(closed) })
|
||||||
_, err = rt.RoundTrip(req)
|
_, err = rt.RoundTrip(req)
|
||||||
Expect(err).To(MatchError(testErr))
|
Expect(err).To(MatchError(testErr))
|
||||||
|
@ -139,6 +143,10 @@ var _ = Describe("RoundTripper", func() {
|
||||||
session.EXPECT().OpenUniStream().AnyTimes().Return(nil, testErr)
|
session.EXPECT().OpenUniStream().AnyTimes().Return(nil, testErr)
|
||||||
session.EXPECT().HandshakeComplete().Return(handshakeCtx).Times(2)
|
session.EXPECT().HandshakeComplete().Return(handshakeCtx).Times(2)
|
||||||
session.EXPECT().OpenStreamSync(context.Background()).Return(nil, testErr).Times(2)
|
session.EXPECT().OpenStreamSync(context.Background()).Return(nil, testErr).Times(2)
|
||||||
|
session.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
|
||||||
|
<-closed
|
||||||
|
return nil, errors.New("test done")
|
||||||
|
}).MaxTimes(1)
|
||||||
session.EXPECT().CloseWithError(gomock.Any(), gomock.Any()).Do(func(quic.ErrorCode, string) { close(closed) })
|
session.EXPECT().CloseWithError(gomock.Any(), gomock.Any()).Do(func(quic.ErrorCode, string) { close(closed) })
|
||||||
req, err := http.NewRequest("GET", "https://quic.clemente.io/file1.html", nil)
|
req, err := http.NewRequest("GET", "https://quic.clemente.io/file1.html", nil)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue