add a http3.HTTPStreamer, allowing users to take over the HTTP/3 stream

This commit is contained in:
Marten Seemann 2022-05-30 18:34:33 +02:00
parent 5fd6fa7cae
commit 6fa7494c2f
2 changed files with 59 additions and 2 deletions

View file

@ -8,6 +8,14 @@ import (
"github.com/lucas-clemente/quic-go"
)
// The HTTPStreamer allows taking over a HTTP/3 stream. The interface is implemented by:
// * for the server: the http.Request.Body
// * for the client: the http.Response.Body
// On the client side, the stream will be closed for writing, unless the DontCloseRequestStream RoundTripOpt was set.
type HTTPStreamer interface {
HTTPStream() Stream
}
type StreamCreator interface {
OpenStream() (quic.Stream, error)
OpenStreamSync(context.Context) (quic.Stream, error)
@ -30,12 +38,19 @@ type body struct {
str quic.Stream
}
var _ io.ReadCloser = &body{}
var (
_ io.ReadCloser = &body{}
_ HTTPStreamer = &body{}
)
func newRequestBody(str Stream) *body {
return &body{str: str}
}
func (r *body) HTTPStream() Stream {
return r.str
}
func (r *body) Read(b []byte) (int, error) {
return r.str.Read(b)
}
@ -56,7 +71,10 @@ type hijackableBody struct {
reqDoneClosed bool
}
var _ Hijacker = &hijackableBody{}
var (
_ Hijacker = &hijackableBody{}
_ HTTPStreamer = &hijackableBody{}
)
func newResponseBody(str Stream, conn quic.Connection, done chan<- struct{}) *hijackableBody {
return &hijackableBody{
@ -98,3 +116,7 @@ func (r *hijackableBody) Close() error {
r.str.CancelRead(quic.StreamErrorCode(errorRequestCanceled))
return nil
}
func (r *hijackableBody) HTTPStream() Stream {
return r.str
}

View file

@ -311,6 +311,41 @@ var _ = Describe("HTTP tests", func() {
Expect(req.Body.Close()).To(Succeed())
Eventually(done).Should(BeClosed())
})
It("allows taking over the stream", func() {
mux.HandleFunc("/httpstreamer", func(w http.ResponseWriter, r *http.Request) {
defer GinkgoRecover()
w.WriteHeader(200)
w.(http.Flusher).Flush()
str := r.Body.(http3.HTTPStreamer).HTTPStream()
str.Write([]byte("foobar"))
_, err := io.Copy(str, str)
Expect(err).ToNot(HaveOccurred())
Expect(str.Close()).To(Succeed())
})
req, err := http.NewRequest(http.MethodGet, "https://localhost:"+port+"/httpstreamer", nil)
Expect(err).ToNot(HaveOccurred())
rsp, err := client.Transport.(*http3.RoundTripper).RoundTripOpt(req, http3.RoundTripOpt{DontCloseRequestStream: true})
Expect(err).ToNot(HaveOccurred())
Expect(rsp.StatusCode).To(Equal(200))
str := rsp.Body.(http3.HTTPStreamer).HTTPStream()
b := make([]byte, 6)
_, err = io.ReadFull(str, b)
Expect(err).ToNot(HaveOccurred())
Expect(b).To(Equal([]byte("foobar")))
data := GeneratePRData(8 * 1024)
_, err = str.Write(data)
Expect(err).ToNot(HaveOccurred())
Expect(str.Close()).To(Succeed())
repl, err := io.ReadAll(str)
Expect(err).ToNot(HaveOccurred())
Expect(repl).To(Equal(data))
})
})
}
})