mirror of
https://github.com/refraction-networking/uquic.git
synced 2025-04-04 20:57:36 +03:00
remove the http3.DataStreamer (#3435)
This commit is contained in:
parent
e27fa1c9cf
commit
ccf897e519
3 changed files with 4 additions and 73 deletions
|
@ -12,25 +12,14 @@ import (
|
||||||
"github.com/marten-seemann/qpack"
|
"github.com/marten-seemann/qpack"
|
||||||
)
|
)
|
||||||
|
|
||||||
// DataStreamer lets the caller take over the stream. After a call to DataStream
|
|
||||||
// the HTTP server library will not do anything else with the connection.
|
|
||||||
//
|
|
||||||
// It becomes the caller's responsibility to manage and close the stream.
|
|
||||||
//
|
|
||||||
// After a call to DataStream, the original Request.Body must not be used.
|
|
||||||
type DataStreamer interface {
|
|
||||||
DataStream() quic.Stream
|
|
||||||
}
|
|
||||||
|
|
||||||
type responseWriter struct {
|
type responseWriter struct {
|
||||||
conn quic.Connection
|
conn quic.Connection
|
||||||
stream quic.Stream // needed for DataStream()
|
stream quic.Stream // needed for DataStream()
|
||||||
bufferedStream *bufio.Writer
|
bufferedStream *bufio.Writer
|
||||||
|
|
||||||
header http.Header
|
header http.Header
|
||||||
status int // status code passed to WriteHeader
|
status int // status code passed to WriteHeader
|
||||||
headerWritten bool
|
headerWritten bool
|
||||||
dataStreamUsed bool // set when DataSteam() is called
|
|
||||||
|
|
||||||
logger utils.Logger
|
logger utils.Logger
|
||||||
}
|
}
|
||||||
|
@ -38,7 +27,6 @@ type responseWriter struct {
|
||||||
var (
|
var (
|
||||||
_ http.ResponseWriter = &responseWriter{}
|
_ http.ResponseWriter = &responseWriter{}
|
||||||
_ http.Flusher = &responseWriter{}
|
_ http.Flusher = &responseWriter{}
|
||||||
_ DataStreamer = &responseWriter{}
|
|
||||||
_ Hijacker = &responseWriter{}
|
_ Hijacker = &responseWriter{}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -112,16 +100,6 @@ func (w *responseWriter) Flush() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *responseWriter) usedDataStream() bool {
|
|
||||||
return w.dataStreamUsed
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *responseWriter) DataStream() quic.Stream {
|
|
||||||
w.dataStreamUsed = true
|
|
||||||
w.Flush()
|
|
||||||
return w.stream
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *responseWriter) StreamID() quic.StreamID {
|
func (w *responseWriter) StreamID() quic.StreamID {
|
||||||
return w.stream.StreamID()
|
return w.stream.StreamID()
|
||||||
}
|
}
|
||||||
|
|
|
@ -562,11 +562,7 @@ func (s *Server) handleRequest(conn quic.Connection, str quic.Stream, decoder *q
|
||||||
ctx = context.WithValue(ctx, http.LocalAddrContextKey, conn.LocalAddr())
|
ctx = context.WithValue(ctx, http.LocalAddrContextKey, conn.LocalAddr())
|
||||||
req = req.WithContext(ctx)
|
req = req.WithContext(ctx)
|
||||||
r := newResponseWriter(str, conn, s.logger)
|
r := newResponseWriter(str, conn, s.logger)
|
||||||
defer func() {
|
defer r.Flush()
|
||||||
if !r.usedDataStream() {
|
|
||||||
r.Flush()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
handler := s.Handler
|
handler := s.Handler
|
||||||
if handler == nil {
|
if handler == nil {
|
||||||
handler = http.DefaultServeMux
|
handler = http.DefaultServeMux
|
||||||
|
@ -587,10 +583,6 @@ func (s *Server) handleRequest(conn quic.Connection, str quic.Stream, decoder *q
|
||||||
handler.ServeHTTP(r, req)
|
handler.ServeHTTP(r, req)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
if r.usedDataStream() {
|
|
||||||
return requestError{err: errHijacked}
|
|
||||||
}
|
|
||||||
|
|
||||||
if panicked {
|
if panicked {
|
||||||
r.WriteHeader(500)
|
r.WriteHeader(500)
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -222,45 +222,6 @@ var _ = Describe("Server", func() {
|
||||||
Expect(hfs).To(HaveKeyWithValue(":status", []string{"500"}))
|
Expect(hfs).To(HaveKeyWithValue(":status", []string{"500"}))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("doesn't close the stream if the handler called DataStream()", func() {
|
|
||||||
s.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
str := w.(DataStreamer).DataStream()
|
|
||||||
str.Write([]byte("foobar"))
|
|
||||||
})
|
|
||||||
|
|
||||||
rspWritten := make(chan struct{})
|
|
||||||
setRequest(encodeRequest(exampleGetRequest))
|
|
||||||
str.EXPECT().Context().Return(reqContext)
|
|
||||||
str.EXPECT().Write([]byte("foobar")).Do(func(b []byte) (int, error) {
|
|
||||||
close(rspWritten)
|
|
||||||
return len(b), nil
|
|
||||||
})
|
|
||||||
// don't EXPECT CancelRead()
|
|
||||||
|
|
||||||
ctrlStr := mockquic.NewMockStream(mockCtrl)
|
|
||||||
ctrlStr.EXPECT().Write(gomock.Any()).AnyTimes()
|
|
||||||
conn.EXPECT().OpenUniStream().Return(ctrlStr, nil)
|
|
||||||
conn.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
|
|
||||||
<-rspWritten
|
|
||||||
return nil, errors.New("done")
|
|
||||||
})
|
|
||||||
conn.EXPECT().AcceptStream(gomock.Any()).Return(str, nil)
|
|
||||||
conn.EXPECT().AcceptStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.Stream, error) {
|
|
||||||
<-rspWritten
|
|
||||||
return nil, errors.New("done")
|
|
||||||
})
|
|
||||||
|
|
||||||
done := make(chan struct{})
|
|
||||||
go func() {
|
|
||||||
defer GinkgoRecover()
|
|
||||||
defer close(done)
|
|
||||||
s.handleConn(conn)
|
|
||||||
}()
|
|
||||||
Eventually(rspWritten).Should(BeClosed())
|
|
||||||
time.Sleep(50 * time.Millisecond) // make sure that after str.Write there are no further calls to stream methods
|
|
||||||
Eventually(done).Should(BeClosed())
|
|
||||||
})
|
|
||||||
|
|
||||||
Context("hijacking bidirectional streams", func() {
|
Context("hijacking bidirectional streams", func() {
|
||||||
var conn *mockquic.MockEarlyConnection
|
var conn *mockquic.MockEarlyConnection
|
||||||
testDone := make(chan struct{})
|
testDone := make(chan struct{})
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue