add a http3.Hijacker that allows stream creation on a QUIC session from a http.Response.Body

This commit is contained in:
Marten Seemann 2022-03-22 09:34:11 +01:00
parent 332473668a
commit 57461e01b5
3 changed files with 38 additions and 8 deletions

View file

@ -1,12 +1,28 @@
package http3 package http3
import ( import (
"context"
"fmt" "fmt"
"io" "io"
"github.com/lucas-clemente/quic-go" "github.com/lucas-clemente/quic-go"
) )
type StreamCreator interface {
OpenStream() (quic.Stream, error)
OpenStreamSync(context.Context) (quic.Stream, error)
OpenUniStream() (quic.SendStream, error)
OpenUniStreamSync(context.Context) (quic.SendStream, error)
}
var _ StreamCreator = quic.Connection(nil)
// A Hijacker allows hijacking of the stream creating part of a quic.Session from a http.Response.Body.
// It is used by WebTransport to create WebTransport streams after a session has been established.
type Hijacker interface {
StreamCreator() StreamCreator
}
// The body of a http.Request or http.Response. // The body of a http.Request or http.Response.
type body struct { type body struct {
str quic.Stream str quic.Stream
@ -24,6 +40,13 @@ type body struct {
var _ io.ReadCloser = &body{} var _ io.ReadCloser = &body{}
type hijackableBody struct {
body
conn quic.Connection // only needed to implement Hijacker
}
var _ Hijacker = &hijackableBody{}
func newRequestBody(str quic.Stream, onFrameError func()) *body { func newRequestBody(str quic.Stream, onFrameError func()) *body {
return &body{ return &body{
str: str, str: str,
@ -31,14 +54,21 @@ func newRequestBody(str quic.Stream, onFrameError func()) *body {
} }
} }
func newResponseBody(str quic.Stream, done chan<- struct{}, onFrameError func()) *body { func newResponseBody(str quic.Stream, conn quic.Connection, done chan<- struct{}, onFrameError func()) *hijackableBody {
return &body{ return &hijackableBody{
str: str, body: body{
onFrameError: onFrameError, str: str,
reqDone: done, onFrameError: onFrameError,
reqDone: done,
},
conn: conn,
} }
} }
func (r *hijackableBody) StreamCreator() StreamCreator {
return r.conn
}
func (r *body) Read(b []byte) (int, error) { func (r *body) Read(b []byte) (int, error) {
n, err := r.readImpl(b) n, err := r.readImpl(b)
if err != nil { if err != nil {

View file

@ -29,7 +29,7 @@ func (t bodyType) String() string {
var _ = Describe("Body", func() { var _ = Describe("Body", func() {
var ( var (
rb *body rb io.ReadCloser
str *mockquic.MockStream str *mockquic.MockStream
buf *bytes.Buffer buf *bytes.Buffer
reqDone chan struct{} reqDone chan struct{}
@ -68,7 +68,7 @@ var _ = Describe("Body", func() {
rb = newRequestBody(str, errorCb) rb = newRequestBody(str, errorCb)
case bodyTypeResponse: case bodyTypeResponse:
reqDone = make(chan struct{}) reqDone = make(chan struct{})
rb = newResponseBody(str, reqDone, errorCb) rb = newResponseBody(str, nil, reqDone, errorCb)
} }
}) })

View file

@ -316,7 +316,7 @@ func (c *client) doRequest(
res.Header.Add(hf.Name, hf.Value) res.Header.Add(hf.Name, hf.Value)
} }
} }
respBody := newResponseBody(str, reqDone, func() { respBody := newResponseBody(str, c.conn, reqDone, func() {
c.conn.CloseWithError(quic.ApplicationErrorCode(errorFrameUnexpected), "") c.conn.CloseWithError(quic.ApplicationErrorCode(errorFrameUnexpected), "")
}) })