rework stream / connection erros for the client

This commit is contained in:
Marten Seemann 2019-09-14 10:17:15 +07:00
parent 917cfc0d39
commit b6330461d6
2 changed files with 62 additions and 42 deletions

View file

@ -130,8 +130,7 @@ func (c *client) maxHeaderBytes() uint64 {
return uint64(c.opts.MaxHeaderBytes)
}
// Roundtrip executes a request and returns a response
// TODO: handle request cancelations
// RoundTrip executes a request and returns a response
func (c *client) RoundTrip(req *http.Request) (*http.Response, error) {
if req.URL.Scheme != "https" {
return nil, errors.New("http3: unsupported scheme")
@ -153,7 +152,22 @@ func (c *client) RoundTrip(req *http.Request) (*http.Response, error) {
return nil, err
}
// Request Cancelation:
rsp, rerr := c.doRequest(req, str)
if rerr.streamErr != 0 {
str.CancelWrite(quic.ErrorCode(rerr.streamErr))
}
if rerr.connErr != 0 {
var reason string
if rerr.err != nil {
reason = rerr.err.Error()
}
c.session.CloseWithError(quic.ErrorCode(rerr.connErr), reason)
}
return rsp, rerr.err
}
func (c *client) doRequest(req *http.Request, str quic.Stream) (*http.Response, requestError) {
// Request Cancellation:
// This go routine keeps running even after RoundTrip() returns.
// It is shut down when the application is done processing the body.
reqDone := make(chan struct{})
@ -171,28 +185,30 @@ func (c *client) RoundTrip(req *http.Request) (*http.Response, error) {
requestGzip = true
}
if err := c.requestWriter.WriteRequest(str, req, requestGzip); err != nil {
return nil, err
return nil, newStreamError(errorInternalError, err)
}
frame, err := parseNextFrame(str)
if err != nil {
return nil, err
return nil, newStreamError(errorFrameError, err)
}
hf, ok := frame.(*headersFrame)
if !ok {
return nil, errors.New("not a HEADERS frame")
return nil, newConnError(errorUnexpectedFrame, errors.New("expected first frame to be a HEADERS frame"))
}
if hf.Length > c.maxHeaderBytes() {
return nil, fmt.Errorf("Headers frame too large: %d bytes (max: %d)", hf.Length, c.maxHeaderBytes())
return nil, newStreamError(errorFrameError, fmt.Errorf("HEADERS frame too large: %d bytes (max: %d)", hf.Length, c.maxHeaderBytes()))
}
headerBlock := make([]byte, hf.Length)
if _, err := io.ReadFull(str, headerBlock); err != nil {
return nil, err
return nil, newStreamError(errorRequestIncomplete, err)
}
hfs, err := c.decoder.DecodeFull(headerBlock)
if err != nil {
return nil, err
// TODO: use the right error code
return nil, newConnError(errorGeneralProtocolError, err)
}
res := &http.Response{
Proto: "HTTP/3",
ProtoMajor: 3,
@ -203,7 +219,7 @@ func (c *client) RoundTrip(req *http.Request) (*http.Response, error) {
case ":status":
status, err := strconv.Atoi(hf.Value)
if err != nil {
return nil, errors.New("malformed non-numeric status pseudo header")
return nil, newStreamError(errorGeneralProtocolError, errors.New("malformed non-numeric status pseudo header"))
}
res.StatusCode = status
res.Status = hf.Value + " " + http.StatusText(status)
@ -222,5 +238,5 @@ func (c *client) RoundTrip(req *http.Request) (*http.Response, error) {
res.Body = respBody
}
return res, nil
return res, requestError{}
}

View file

@ -181,23 +181,6 @@ var _ = Describe("Client", func() {
Expect(err).ToNot(HaveOccurred())
})
It("sends a request", func() {
sess.EXPECT().OpenStreamSync(context.Background()).Return(str, nil)
buf := &bytes.Buffer{}
str.EXPECT().Write(gomock.Any()).DoAndReturn(func(p []byte) (int, error) {
return buf.Write(p)
})
str.EXPECT().Close()
str.EXPECT().Read(gomock.Any()).Return(0, errors.New("test done"))
_, err := client.RoundTrip(request)
Expect(err).To(MatchError("test done"))
hfs := decodeHeader(buf)
Expect(hfs).To(HaveKeyWithValue(":scheme", "https"))
Expect(hfs).To(HaveKeyWithValue(":method", "GET"))
Expect(hfs).To(HaveKeyWithValue(":authority", "quic.clemente.io:1337"))
Expect(hfs).To(HaveKeyWithValue(":path", "/file1.dat"))
})
It("returns a response", func() {
rspBuf := &bytes.Buffer{}
rw := newResponseWriter(rspBuf, utils.DefaultLogger)
@ -250,7 +233,10 @@ var _ = Describe("Client", func() {
It("sends a request", func() {
done := make(chan struct{})
str.EXPECT().Close().Do(func() { close(done) })
gomock.InOrder(
str.EXPECT().Close().Do(func() { close(done) }),
str.EXPECT().CancelWrite(gomock.Any()).MaxTimes(1), // when reading the response errors
)
// the response body is sent asynchronously, while already reading the response
str.EXPECT().Read(gomock.Any()).DoAndReturn(func([]byte) (int, error) {
<-done
@ -266,9 +252,13 @@ var _ = Describe("Client", func() {
It("returns the error that occurred when reading the body", func() {
request.Body.(*mockBody).readErr = errors.New("testErr")
done := make(chan struct{})
str.EXPECT().CancelWrite(quic.ErrorCode(errorRequestCanceled)).Do(func(quic.ErrorCode) {
close(done)
})
gomock.InOrder(
str.EXPECT().CancelWrite(quic.ErrorCode(errorRequestCanceled)).Do(func(quic.ErrorCode) {
close(done)
}),
str.EXPECT().CancelWrite(gomock.Any()),
)
// the response body is sent asynchronously, while already reading the response
str.EXPECT().Read(gomock.Any()).DoAndReturn(func([]byte) (int, error) {
<-done
@ -278,30 +268,32 @@ var _ = Describe("Client", func() {
Expect(err).To(MatchError("test done"))
})
It("errors when the first frame is not a HEADERS frame", func() {
It("closes the connection when the first frame is not a HEADERS frame", func() {
buf := &bytes.Buffer{}
(&dataFrame{Length: 0x42}).Write(buf)
sess.EXPECT().CloseWithError(quic.ErrorCode(errorUnexpectedFrame), gomock.Any())
str.EXPECT().Close().MaxTimes(1)
str.EXPECT().Read(gomock.Any()).DoAndReturn(func(b []byte) (int, error) {
return buf.Read(b)
}).AnyTimes()
_, err := client.RoundTrip(request)
Expect(err).To(MatchError("not a HEADERS frame"))
Expect(err).To(MatchError("expected first frame to be a HEADERS frame"))
})
It("errors when the first frame is not a HEADERS frame", func() {
It("cancels the stream when the HEADERS frame is too large", func() {
buf := &bytes.Buffer{}
(&headersFrame{Length: 1338}).Write(buf)
str.EXPECT().CancelWrite(quic.ErrorCode(errorFrameError))
str.EXPECT().Close().MaxTimes(1)
str.EXPECT().Read(gomock.Any()).DoAndReturn(func(b []byte) (int, error) {
return buf.Read(b)
}).AnyTimes()
_, err := client.RoundTrip(request)
Expect(err).To(MatchError("Headers frame too large: 1338 bytes (max: 1337)"))
Expect(err).To(MatchError("HEADERS frame too large: 1338 bytes (max: 1337)"))
})
})
Context("request cancelations", func() {
Context("request cancellations", func() {
It("cancels a request while the request is still in flight", func() {
ctx, cancel := context.WithCancel(context.Background())
req := request.WithContext(ctx)
@ -309,14 +301,20 @@ var _ = Describe("Client", func() {
buf := &bytes.Buffer{}
str.EXPECT().Close().MaxTimes(1)
done := make(chan struct{})
str.EXPECT().Write(gomock.Any()).DoAndReturn(func(p []byte) (int, error) {
return buf.Write(p)
})
str.EXPECT().CancelWrite(quic.ErrorCode(errorRequestCanceled))
str.EXPECT().CancelRead(quic.ErrorCode(errorRequestCanceled)).Do(func(quic.ErrorCode) { close(done) })
done := make(chan struct{})
canceled := make(chan struct{})
gomock.InOrder(
str.EXPECT().CancelWrite(quic.ErrorCode(errorRequestCanceled)).Do(func(quic.ErrorCode) { close(canceled) }),
str.EXPECT().CancelWrite(gomock.Any()).MaxTimes(1).Do(func(quic.ErrorCode) { close(done) }),
)
str.EXPECT().CancelRead(quic.ErrorCode(errorRequestCanceled))
str.EXPECT().Read(gomock.Any()).DoAndReturn(func([]byte) (int, error) {
cancel()
<-canceled
return 0, errors.New("test done")
})
_, err := client.RoundTrip(req)
@ -375,7 +373,10 @@ var _ = Describe("Client", func() {
str.EXPECT().Write(gomock.Any()).DoAndReturn(func(p []byte) (int, error) {
return buf.Write(p)
})
str.EXPECT().Close()
gomock.InOrder(
str.EXPECT().Close(),
str.EXPECT().CancelWrite(gomock.Any()).MaxTimes(1), // when the Read errors
)
str.EXPECT().Read(gomock.Any()).Return(0, errors.New("test done"))
_, err := client.RoundTrip(request)
Expect(err).To(MatchError("test done"))
@ -390,7 +391,10 @@ var _ = Describe("Client", func() {
str.EXPECT().Write(gomock.Any()).DoAndReturn(func(p []byte) (int, error) {
return buf.Write(p)
})
str.EXPECT().Close()
gomock.InOrder(
str.EXPECT().Close(),
str.EXPECT().CancelWrite(gomock.Any()).MaxTimes(1), // when the Read errors
)
str.EXPECT().Read(gomock.Any()).Return(0, errors.New("test done"))
_, err := client.RoundTrip(request)
Expect(err).To(MatchError("test done"))