rename stream.Cancel to stream.CloseForShutdown

This commit is contained in:
Marten Seemann 2017-12-13 16:32:32 +07:00
parent d28f09837e
commit 8c5741ae79
8 changed files with 35 additions and 34 deletions

View file

@ -13,7 +13,7 @@ type cryptoStreamI interface {
io.Writer io.Writer
HandleStreamFrame(*wire.StreamFrame) error HandleStreamFrame(*wire.StreamFrame) error
PopStreamFrame(protocol.ByteCount) *wire.StreamFrame PopStreamFrame(protocol.ByteCount) *wire.StreamFrame
Cancel(error) CloseForShutdown(error)
HasDataForWriting() bool HasDataForWriting() bool
SetReadOffset(protocol.ByteCount) SetReadOffset(protocol.ByteCount)
// methods needed for flow control // methods needed for flow control

View file

@ -36,16 +36,6 @@ func (_m *MockStreamI) EXPECT() *MockStreamIMockRecorder {
return _m.recorder return _m.recorder
} }
// Cancel mocks base method
func (_m *MockStreamI) Cancel(_param0 error) {
_m.ctrl.Call(_m, "Cancel", _param0)
}
// Cancel indicates an expected call of Cancel
func (_mr *MockStreamIMockRecorder) Cancel(arg0 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "Cancel", reflect.TypeOf((*MockStreamI)(nil).Cancel), arg0)
}
// Close mocks base method // Close mocks base method
func (_m *MockStreamI) Close() error { func (_m *MockStreamI) Close() error {
ret := _m.ctrl.Call(_m, "Close") ret := _m.ctrl.Call(_m, "Close")
@ -58,6 +48,16 @@ func (_mr *MockStreamIMockRecorder) Close() *gomock.Call {
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "Close", reflect.TypeOf((*MockStreamI)(nil).Close)) return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "Close", reflect.TypeOf((*MockStreamI)(nil).Close))
} }
// CloseForShutdown mocks base method
func (_m *MockStreamI) CloseForShutdown(_param0 error) {
_m.ctrl.Call(_m, "CloseForShutdown", _param0)
}
// CloseForShutdown indicates an expected call of CloseForShutdown
func (_mr *MockStreamIMockRecorder) CloseForShutdown(arg0 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "CloseForShutdown", reflect.TypeOf((*MockStreamI)(nil).CloseForShutdown), arg0)
}
// Context mocks base method // Context mocks base method
func (_m *MockStreamI) Context() context.Context { func (_m *MockStreamI) Context() context.Context {
ret := _m.ctrl.Call(_m, "Context") ret := _m.ctrl.Call(_m, "Context")

View file

@ -656,7 +656,7 @@ func (s *session) handleCloseError(closeErr closeError) error {
utils.Errorf("Closing session with error: %s", closeErr.err.Error()) utils.Errorf("Closing session with error: %s", closeErr.err.Error())
} }
s.cryptoStream.Cancel(quicErr) s.cryptoStream.CloseForShutdown(quicErr)
s.streamsMap.CloseWithError(quicErr) s.streamsMap.CloseWithError(quicErr)
if closeErr.err == errCloseSessionForNewVersion || closeErr.err == handshake.ErrCloseSessionForRetry { if closeErr.err == errCloseSessionForNewVersion || closeErr.err == handshake.ErrCloseSessionForRetry {

View file

@ -498,7 +498,7 @@ var _ = Describe("Session", func() {
_, err := sess.GetOrOpenStream(5) _, err := sess.GetOrOpenStream(5)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
sess.streamsMap.Range(func(s streamI) { sess.streamsMap.Range(func(s streamI) {
s.(*mocks.MockStreamI).EXPECT().Cancel(gomock.Any()) s.(*mocks.MockStreamI).EXPECT().CloseForShutdown(gomock.Any())
}) })
err = sess.handleFrames([]wire.Frame{&wire.ConnectionCloseFrame{ErrorCode: qerr.ProofInvalid, ReasonPhrase: "foobar"}}, protocol.EncryptionUnspecified) err = sess.handleFrames([]wire.Frame{&wire.ConnectionCloseFrame{ErrorCode: qerr.ProofInvalid, ReasonPhrase: "foobar"}}, protocol.EncryptionUnspecified)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
@ -1397,7 +1397,7 @@ var _ = Describe("Session", func() {
str, err := sess.GetOrOpenStream(9) str, err := sess.GetOrOpenStream(9)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
str.Close() str.Close()
str.(*stream).Cancel(nil) str.(*stream).CloseForShutdown(nil)
Expect(str.(*stream).Finished()).To(BeTrue()) Expect(str.(*stream).Finished()).To(BeTrue())
err = sess.streamsMap.DeleteClosedStreams() err = sess.streamsMap.DeleteClosedStreams()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())

View file

@ -21,7 +21,7 @@ type streamI interface {
HandleRstStreamFrame(*wire.RstStreamFrame) error HandleRstStreamFrame(*wire.RstStreamFrame) error
PopStreamFrame(maxBytes protocol.ByteCount) *wire.StreamFrame PopStreamFrame(maxBytes protocol.ByteCount) *wire.StreamFrame
Finished() bool Finished() bool
Cancel(error) CloseForShutdown(error)
// methods needed for flow control // methods needed for flow control
GetWindowUpdate() protocol.ByteCount GetWindowUpdate() protocol.ByteCount
HandleMaxStreamDataFrame(*wire.MaxStreamDataFrame) HandleMaxStreamDataFrame(*wire.MaxStreamDataFrame)
@ -51,8 +51,8 @@ type stream struct {
// Once set, the errors must not be changed! // Once set, the errors must not be changed!
err error err error
// cancelled is set when Cancel() is called // closedForShutdown is set when Cancel() is called
cancelled utils.AtomicBool closedForShutdown utils.AtomicBool
// finishedReading is set once we read a frame with a FinBit // finishedReading is set once we read a frame with a FinBit
finishedReading utils.AtomicBool finishedReading utils.AtomicBool
// finisedWriting is set once Close() is called // finisedWriting is set once Close() is called
@ -113,7 +113,7 @@ func (s *stream) Read(p []byte) (int, error) {
s.mutex.Lock() s.mutex.Lock()
err := s.err err := s.err
s.mutex.Unlock() s.mutex.Unlock()
if s.cancelled.Get() || s.resetLocally.Get() { if s.closedForShutdown.Get() || s.resetLocally.Get() {
return 0, err return 0, err
} }
if s.finishedReading.Get() { if s.finishedReading.Get() {
@ -133,7 +133,7 @@ func (s *stream) Read(p []byte) (int, error) {
var err error var err error
for { for {
// Stop waiting on errors // Stop waiting on errors
if s.resetLocally.Get() || s.cancelled.Get() { if s.resetLocally.Get() || s.closedForShutdown.Get() {
err = s.err err = s.err
break break
} }
@ -393,11 +393,12 @@ func (s *stream) CloseRemote(offset protocol.ByteCount) {
s.HandleStreamFrame(&wire.StreamFrame{FinBit: true, Offset: offset}) s.HandleStreamFrame(&wire.StreamFrame{FinBit: true, Offset: offset})
} }
// Cancel is called by session to indicate that an error occurred // CloseForShutdown closes a stream abruptly.
// The stream should will be closed immediately // It makes Read and Write unblock (and return the error) immediately.
func (s *stream) Cancel(err error) { // The peer will NOT be informed about this: the stream is closed without sending a FIN or RST.
func (s *stream) CloseForShutdown(err error) {
s.mutex.Lock() s.mutex.Lock()
s.cancelled.Set(true) s.closedForShutdown.Set(true)
s.ctxCancel() s.ctxCancel()
// errors must not be changed! // errors must not be changed!
if s.err == nil { if s.err == nil {
@ -465,7 +466,7 @@ func (s *stream) finishedWriteAndSentFin() bool {
} }
func (s *stream) Finished() bool { func (s *stream) Finished() bool {
return s.cancelled.Get() || return s.closedForShutdown.Get() ||
(s.finishedReading.Get() && s.finishedWriteAndSentFin()) || (s.finishedReading.Get() && s.finishedWriteAndSentFin()) ||
(s.resetRemotely.Get() && s.rstSent.Get()) || (s.resetRemotely.Get() && s.rstSent.Get()) ||
(s.finishedReading.Get() && s.rstSent.Get()) || (s.finishedReading.Get() && s.rstSent.Get()) ||

View file

@ -457,12 +457,12 @@ var _ = Describe("Stream", func() {
close(done) close(done)
}() }()
Consistently(done).ShouldNot(BeClosed()) Consistently(done).ShouldNot(BeClosed())
str.Cancel(testErr) str.CloseForShutdown(testErr)
Eventually(done).Should(BeClosed()) Eventually(done).Should(BeClosed())
}) })
It("errors for all following reads", func() { It("errors for all following reads", func() {
str.Cancel(testErr) str.CloseForShutdown(testErr)
b := make([]byte, 1) b := make([]byte, 1)
n, err := strWithTimeout.Read(b) n, err := strWithTimeout.Read(b)
Expect(n).To(BeZero()) Expect(n).To(BeZero())
@ -471,7 +471,7 @@ var _ = Describe("Stream", func() {
It("cancels the context", func() { It("cancels the context", func() {
Expect(str.Context().Done()).ToNot(BeClosed()) Expect(str.Context().Done()).ToNot(BeClosed())
str.Cancel(testErr) str.CloseForShutdown(testErr)
Expect(str.Context().Done()).To(BeClosed()) Expect(str.Context().Done()).To(BeClosed())
}) })
}) })
@ -1001,7 +1001,7 @@ var _ = Describe("Stream", func() {
}) })
It("doesn't allow FIN after an error", func() { It("doesn't allow FIN after an error", func() {
str.Cancel(errors.New("test")) str.CloseForShutdown(errors.New("test"))
f := str.PopStreamFrame(1000) f := str.PopStreamFrame(1000)
Expect(f).To(BeNil()) Expect(f).To(BeNil())
}) })
@ -1016,11 +1016,11 @@ var _ = Describe("Stream", func() {
}) })
}) })
Context("cancelling", func() { Context("closing abruptly", func() {
testErr := errors.New("test") testErr := errors.New("test")
It("returns errors when the stream is cancelled", func() { It("returns errors when the stream is cancelled", func() {
str.Cancel(testErr) str.CloseForShutdown(testErr)
n, err := strWithTimeout.Write([]byte("foo")) n, err := strWithTimeout.Write([]byte("foo"))
Expect(n).To(BeZero()) Expect(n).To(BeZero())
Expect(err).To(MatchError(testErr)) Expect(err).To(MatchError(testErr))
@ -1037,7 +1037,7 @@ var _ = Describe("Stream", func() {
close(done) close(done)
}() }()
Eventually(func() *wire.StreamFrame { return str.PopStreamFrame(50) }).ShouldNot(BeNil()) // get a STREAM frame containing some data, but not all Eventually(func() *wire.StreamFrame { return str.PopStreamFrame(50) }).ShouldNot(BeNil()) // get a STREAM frame containing some data, but not all
str.Cancel(testErr) str.CloseForShutdown(testErr)
Expect(str.PopStreamFrame(1000)).To(BeNil()) Expect(str.PopStreamFrame(1000)).To(BeNil())
Eventually(done).Should(BeClosed()) Eventually(done).Should(BeClosed())
}) })
@ -1068,7 +1068,7 @@ var _ = Describe("Stream", func() {
} }
It("is finished after it is canceled", func() { It("is finished after it is canceled", func() {
str.Cancel(testErr) str.CloseForShutdown(testErr)
Expect(str.Finished()).To(BeTrue()) Expect(str.Finished()).To(BeTrue())
}) })

View file

@ -317,7 +317,7 @@ func (m *streamsMap) CloseWithError(err error) {
m.nextStreamOrErrCond.Broadcast() m.nextStreamOrErrCond.Broadcast()
m.openStreamOrErrCond.Broadcast() m.openStreamOrErrCond.Broadcast()
for _, s := range m.openStreams { for _, s := range m.openStreams {
m.streams[s].Cancel(err) m.streams[s].CloseForShutdown(err)
} }
} }

View file

@ -245,7 +245,7 @@ var _ = Describe("Streams Map", func() {
testErr := errors.New("test error") testErr := errors.New("test error")
openMaxNumStreams() openMaxNumStreams()
for _, str := range m.streams { for _, str := range m.streams {
str.(*mocks.MockStreamI).EXPECT().Cancel(testErr) str.(*mocks.MockStreamI).EXPECT().CloseForShutdown(testErr)
} }
done := make(chan struct{}) done := make(chan struct{})