pass RST_STREAM frames directly to the stream

This commit is contained in:
Marten Seemann 2017-12-12 10:12:54 +07:00
parent 2d31440510
commit 03977c1a25
5 changed files with 141 additions and 75 deletions

View file

@ -104,6 +104,18 @@ func (_mr *MockStreamIMockRecorder) HandleMaxStreamDataFrame(arg0 interface{}) *
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "HandleMaxStreamDataFrame", reflect.TypeOf((*MockStreamI)(nil).HandleMaxStreamDataFrame), arg0) return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "HandleMaxStreamDataFrame", reflect.TypeOf((*MockStreamI)(nil).HandleMaxStreamDataFrame), arg0)
} }
// HandleRstStreamFrame mocks base method
func (_m *MockStreamI) HandleRstStreamFrame(_param0 *wire.RstStreamFrame) error {
ret := _m.ctrl.Call(_m, "HandleRstStreamFrame", _param0)
ret0, _ := ret[0].(error)
return ret0
}
// HandleRstStreamFrame indicates an expected call of HandleRstStreamFrame
func (_mr *MockStreamIMockRecorder) HandleRstStreamFrame(arg0 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "HandleRstStreamFrame", reflect.TypeOf((*MockStreamI)(nil).HandleRstStreamFrame), arg0)
}
// HandleStreamFrame mocks base method // HandleStreamFrame mocks base method
func (_m *MockStreamI) HandleStreamFrame(_param0 *wire.StreamFrame) error { func (_m *MockStreamI) HandleStreamFrame(_param0 *wire.StreamFrame) error {
ret := _m.ctrl.Call(_m, "HandleStreamFrame", _param0) ret := _m.ctrl.Call(_m, "HandleStreamFrame", _param0)
@ -153,18 +165,6 @@ func (_mr *MockStreamIMockRecorder) Read(arg0 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "Read", reflect.TypeOf((*MockStreamI)(nil).Read), arg0) return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "Read", reflect.TypeOf((*MockStreamI)(nil).Read), arg0)
} }
// RegisterRemoteError mocks base method
func (_m *MockStreamI) RegisterRemoteError(_param0 error, _param1 protocol.ByteCount) error {
ret := _m.ctrl.Call(_m, "RegisterRemoteError", _param0, _param1)
ret0, _ := ret[0].(error)
return ret0
}
// RegisterRemoteError indicates an expected call of RegisterRemoteError
func (_mr *MockStreamIMockRecorder) RegisterRemoteError(arg0, arg1 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "RegisterRemoteError", reflect.TypeOf((*MockStreamI)(nil).RegisterRemoteError), arg0, arg1)
}
// Reset mocks base method // Reset mocks base method
func (_m *MockStreamI) Reset(_param0 error) { func (_m *MockStreamI) Reset(_param0 error) {
_m.ctrl.Call(_m, "Reset", _param0) _m.ctrl.Call(_m, "Reset", _param0)

View file

@ -4,7 +4,6 @@ import (
"context" "context"
"crypto/tls" "crypto/tls"
"errors" "errors"
"fmt"
"net" "net"
"sync" "sync"
"time" "time"
@ -613,7 +612,7 @@ func (s *session) handleRstStreamFrame(frame *wire.RstStreamFrame) error {
// stream is closed and already garbage collected // stream is closed and already garbage collected
return nil return nil
} }
return str.RegisterRemoteError(fmt.Errorf("RST_STREAM received with code %d", frame.ErrorCode), frame.ByteOffset) return str.HandleRstStreamFrame(frame)
} }
func (s *session) handleAckFrame(frame *wire.AckFrame, encLevel protocol.EncryptionLevel) error { func (s *session) handleAckFrame(frame *wire.AckFrame, encLevel protocol.EncryptionLevel) error {

View file

@ -343,29 +343,28 @@ var _ = Describe("Session", func() {
Context("handling RST_STREAM frames", func() { Context("handling RST_STREAM frames", func() {
It("closes the streams for writing", func() { It("closes the streams for writing", func() {
str, err := sess.GetOrOpenStream(5) f := &wire.RstStreamFrame{
Expect(err).ToNot(HaveOccurred())
str.(*mocks.MockStreamI).EXPECT().RegisterRemoteError(
errors.New("RST_STREAM received with code 42"),
protocol.ByteCount(0x1337),
)
err = sess.handleRstStreamFrame(&wire.RstStreamFrame{
StreamID: 5, StreamID: 5,
ErrorCode: 42, ErrorCode: 42,
ByteOffset: 0x1337, ByteOffset: 0x1337,
}) }
str, err := sess.GetOrOpenStream(5)
Expect(err).ToNot(HaveOccurred())
str.(*mocks.MockStreamI).EXPECT().HandleRstStreamFrame(f)
err = sess.handleRstStreamFrame(f)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
}) })
It("returns errors", func() { It("returns errors", func() {
f := &wire.RstStreamFrame{
StreamID: 5,
ByteOffset: 0x1337,
}
testErr := errors.New("flow control violation") testErr := errors.New("flow control violation")
str, err := sess.GetOrOpenStream(5) str, err := sess.GetOrOpenStream(5)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
str.(*mocks.MockStreamI).EXPECT().RegisterRemoteError(gomock.Any(), gomock.Any()).Return(testErr) str.(*mocks.MockStreamI).EXPECT().HandleRstStreamFrame(f).Return(testErr)
err = sess.handleRstStreamFrame(&wire.RstStreamFrame{ err = sess.handleRstStreamFrame(f)
StreamID: 5,
ByteOffset: 0x1337,
})
Expect(err).To(MatchError(testErr)) Expect(err).To(MatchError(testErr))
}) })

View file

@ -18,7 +18,7 @@ type streamI interface {
Stream Stream
HandleStreamFrame(*wire.StreamFrame) error HandleStreamFrame(*wire.StreamFrame) error
RegisterRemoteError(error, protocol.ByteCount) error HandleRstStreamFrame(*wire.RstStreamFrame) error
PopStreamFrame(maxBytes protocol.ByteCount) *wire.StreamFrame PopStreamFrame(maxBytes protocol.ByteCount) *wire.StreamFrame
Finished() bool Finished() bool
Cancel(error) Cancel(error)
@ -59,7 +59,7 @@ type stream struct {
finishedWriting utils.AtomicBool finishedWriting utils.AtomicBool
// resetLocally is set if Reset() is called // resetLocally is set if Reset() is called
resetLocally utils.AtomicBool resetLocally utils.AtomicBool
// resetRemotely is set if RegisterRemoteError() is called // resetRemotely is set if HandleRstStreamFrame() is called
resetRemotely utils.AtomicBool resetRemotely utils.AtomicBool
frameQueue *streamFrameSorter frameQueue *streamFrameSorter
@ -433,8 +433,7 @@ func (s *stream) Reset(err error) {
s.mutex.Unlock() s.mutex.Unlock()
} }
// resets the stream remotely func (s *stream) HandleRstStreamFrame(frame *wire.RstStreamFrame) error {
func (s *stream) RegisterRemoteError(err error, offset protocol.ByteCount) error {
if s.resetRemotely.Get() { if s.resetRemotely.Get() {
return nil return nil
} }
@ -443,10 +442,10 @@ func (s *stream) RegisterRemoteError(err error, offset protocol.ByteCount) error
s.ctxCancel() s.ctxCancel()
// errors must not be changed! // errors must not be changed!
if s.err == nil { if s.err == nil {
s.err = err s.err = fmt.Errorf("RST_STREAM received with code %d", frame.ErrorCode)
s.signalWrite() s.signalWrite()
} }
if err := s.flowController.UpdateHighestReceived(offset, true); err != nil { if err := s.flowController.UpdateHighestReceived(frame.ByteOffset, true); err != nil {
return err return err
} }
if s.shouldSendReset() { if s.shouldSendReset() {

View file

@ -478,8 +478,6 @@ var _ = Describe("Stream", func() {
}) })
Context("resetting", func() { Context("resetting", func() {
testErr := errors.New("testErr")
Context("reset by the peer", func() { Context("reset by the peer", func() {
It("continues reading after receiving a remote error", func() { It("continues reading after receiving a remote error", func() {
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false) mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
@ -489,22 +487,30 @@ var _ = Describe("Stream", func() {
Data: []byte{0xDE, 0xAD, 0xBE, 0xEF}, Data: []byte{0xDE, 0xAD, 0xBE, 0xEF},
} }
str.HandleStreamFrame(&frame) str.HandleStreamFrame(&frame)
str.RegisterRemoteError(testErr, 10) err := str.HandleRstStreamFrame(&wire.RstStreamFrame{
StreamID: streamID,
ByteOffset: 10,
})
Expect(err).ToNot(HaveOccurred())
b := make([]byte, 4) b := make([]byte, 4)
n, err := strWithTimeout.Read(b) n, err := strWithTimeout.Read(b)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(n).To(Equal(4)) Expect(n).To(Equal(4))
}) })
It("reads a delayed StreamFrame that arrives after receiving a remote error", func() { It("reads a delayed STREAM frame that arrives after receiving a remote error", func() {
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), true) mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), true)
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false) mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
str.RegisterRemoteError(testErr, 4) err := str.HandleRstStreamFrame(&wire.RstStreamFrame{
StreamID: streamID,
ByteOffset: 4,
})
Expect(err).ToNot(HaveOccurred())
frame := wire.StreamFrame{ frame := wire.StreamFrame{
Offset: 0, Offset: 0,
Data: []byte{0xDE, 0xAD, 0xBE, 0xEF}, Data: []byte{0xDE, 0xAD, 0xBE, 0xEF},
} }
err := str.HandleStreamFrame(&frame) err = str.HandleStreamFrame(&frame)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
b := make([]byte, 4) b := make([]byte, 4)
n, err := strWithTimeout.Read(b) n, err := strWithTimeout.Read(b)
@ -520,15 +526,20 @@ var _ = Describe("Stream", func() {
Data: []byte{0xDE, 0xAD, 0xBE, 0xEF}, Data: []byte{0xDE, 0xAD, 0xBE, 0xEF},
} }
str.HandleStreamFrame(&frame) str.HandleStreamFrame(&frame)
str.RegisterRemoteError(testErr, 8) err := str.HandleRstStreamFrame(&wire.RstStreamFrame{
StreamID: streamID,
ByteOffset: 8,
ErrorCode: 1337,
})
Expect(err).ToNot(HaveOccurred())
b := make([]byte, 10) b := make([]byte, 10)
n, err := strWithTimeout.Read(b) n, err := strWithTimeout.Read(b)
Expect(b[0:4]).To(Equal(frame.Data)) Expect(b[0:4]).To(Equal(frame.Data))
Expect(err).To(MatchError(testErr)) Expect(err).To(MatchError("RST_STREAM received with code 1337"))
Expect(n).To(Equal(4)) Expect(n).To(Equal(4))
}) })
It("returns an EOF when reading past the offset, if the stream received a finbit", func() { It("returns an EOF when reading past the offset, if the stream received a FIN bit", func() {
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), true) mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), true)
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(8), true) mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(8), true)
frame := wire.StreamFrame{ frame := wire.StreamFrame{
@ -537,7 +548,11 @@ var _ = Describe("Stream", func() {
FinBit: true, FinBit: true,
} }
str.HandleStreamFrame(&frame) str.HandleStreamFrame(&frame)
str.RegisterRemoteError(testErr, 8) err := str.HandleRstStreamFrame(&wire.RstStreamFrame{
StreamID: streamID,
ByteOffset: 8,
})
Expect(err).ToNot(HaveOccurred())
b := make([]byte, 10) b := make([]byte, 10)
n, err := strWithTimeout.Read(b) n, err := strWithTimeout.Read(b)
Expect(b[:4]).To(Equal(frame.Data)) Expect(b[:4]).To(Equal(frame.Data))
@ -554,9 +569,12 @@ var _ = Describe("Stream", func() {
FinBit: true, FinBit: true,
} }
str.HandleStreamFrame(&frame) str.HandleStreamFrame(&frame)
str.RegisterRemoteError(testErr, 4) err := str.HandleRstStreamFrame(&wire.RstStreamFrame{
StreamID: streamID,
ByteOffset: 4,
})
b := make([]byte, 3) b := make([]byte, 3)
_, err := strWithTimeout.Read(b) _, err = strWithTimeout.Read(b)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(b).To(Equal([]byte{0xde, 0xad, 0xbe})) Expect(b).To(Equal([]byte{0xde, 0xad, 0xbe}))
b = make([]byte, 3) b = make([]byte, 3)
@ -576,27 +594,36 @@ var _ = Describe("Stream", func() {
Data: []byte{0xDE, 0xAD, 0xBE, 0xEF}, Data: []byte{0xDE, 0xAD, 0xBE, 0xEF},
} }
str.HandleStreamFrame(&frame) str.HandleStreamFrame(&frame)
str.RegisterRemoteError(testErr, 10) err := str.HandleRstStreamFrame(&wire.RstStreamFrame{
StreamID: streamID,
ByteOffset: 10,
})
Expect(err).ToNot(HaveOccurred())
b := make([]byte, 3) b := make([]byte, 3)
_, err := strWithTimeout.Read(b) _, err = strWithTimeout.Read(b)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
}) })
It("stops writing after receiving a remote error", func() { It("stops writing after receiving a remote error", func() {
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(10), true) mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(8), true)
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
defer GinkgoRecover() defer GinkgoRecover()
n, err := strWithTimeout.Write([]byte("foobar")) n, err := strWithTimeout.Write([]byte("foobar"))
Expect(n).To(BeZero()) Expect(n).To(BeZero())
Expect(err).To(MatchError(testErr)) Expect(err).To(MatchError("RST_STREAM received with code 1337"))
close(done) close(done)
}() }()
str.RegisterRemoteError(testErr, 10) err := str.HandleRstStreamFrame(&wire.RstStreamFrame{
StreamID: streamID,
ByteOffset: 8,
ErrorCode: 1337,
})
Expect(err).ToNot(HaveOccurred())
Eventually(done).Should(BeClosed()) Eventually(done).Should(BeClosed())
}) })
It("returns how much was written when recieving a remote error", func() { It("returns how much was written when receiving a remote error", func() {
frameHeaderSize := protocol.ByteCount(4) frameHeaderSize := protocol.ByteCount(4)
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(10), true) mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(10), true)
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)) mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999))
@ -605,7 +632,7 @@ var _ = Describe("Stream", func() {
go func() { go func() {
defer GinkgoRecover() defer GinkgoRecover()
n, err := strWithTimeout.Write([]byte("foobar")) n, err := strWithTimeout.Write([]byte("foobar"))
Expect(err).To(MatchError(testErr)) Expect(err).To(MatchError("RST_STREAM received with code 1337"))
Expect(n).To(Equal(4)) Expect(n).To(Equal(4))
close(done) close(done)
}() }()
@ -614,22 +641,31 @@ var _ = Describe("Stream", func() {
Eventually(func() *wire.StreamFrame { frame = str.PopStreamFrame(4 + frameHeaderSize); return frame }).ShouldNot(BeNil()) Eventually(func() *wire.StreamFrame { frame = str.PopStreamFrame(4 + frameHeaderSize); return frame }).ShouldNot(BeNil())
Expect(frame).ToNot(BeNil()) Expect(frame).ToNot(BeNil())
Expect(frame.DataLen()).To(BeEquivalentTo(4)) Expect(frame.DataLen()).To(BeEquivalentTo(4))
str.RegisterRemoteError(testErr, 10) err := str.HandleRstStreamFrame(&wire.RstStreamFrame{
StreamID: streamID,
ByteOffset: 10,
ErrorCode: 1337,
})
Expect(err).ToNot(HaveOccurred())
Eventually(done).Should(BeClosed()) Eventually(done).Should(BeClosed())
}) })
It("calls onReset when receiving a remote error", func() { It("calls queues a RST_STREAM frame when receiving a remote error", func() {
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(0), true) mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(10), true)
done := make(chan struct{}) done := make(chan struct{})
str.writeOffset = 0x1000 str.writeOffset = 0x1000
go func() { go func() {
_, _ = strWithTimeout.Write([]byte("foobar")) _, _ = strWithTimeout.Write([]byte("foobar"))
close(done) close(done)
}() }()
str.RegisterRemoteError(testErr, 0) err := str.HandleRstStreamFrame(&wire.RstStreamFrame{
StreamID: streamID,
ByteOffset: 10,
})
Expect(err).ToNot(HaveOccurred())
Expect(queuedControlFrames).To(Equal([]wire.Frame{ Expect(queuedControlFrames).To(Equal([]wire.Frame{
&wire.RstStreamFrame{ &wire.RstStreamFrame{
StreamID: 1337, StreamID: streamID,
ByteOffset: 0x1000, ByteOffset: 0x1000,
}, },
})) }))
@ -637,32 +673,50 @@ var _ = Describe("Stream", func() {
}) })
It("doesn't call onReset if it already sent a FIN", func() { It("doesn't call onReset if it already sent a FIN", func() {
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(0), true) mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(10), true)
str.Close() str.Close()
f := str.PopStreamFrame(100) f := str.PopStreamFrame(100)
Expect(f.FinBit).To(BeTrue()) Expect(f.FinBit).To(BeTrue())
str.RegisterRemoteError(testErr, 0) err := str.HandleRstStreamFrame(&wire.RstStreamFrame{
StreamID: streamID,
ByteOffset: 10,
})
Expect(err).ToNot(HaveOccurred())
Expect(queuedControlFrames).To(BeEmpty()) Expect(queuedControlFrames).To(BeEmpty())
}) })
It("doesn't call queue a RST_STREAM if the stream was reset locally before", func() { It("doesn't queue a RST_STREAM if the stream was reset locally before", func() {
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(0), true) mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(10), true)
str.Reset(testErr) str.Reset(errors.New("reset"))
Expect(queuedControlFrames).To(HaveLen(1)) Expect(queuedControlFrames).To(HaveLen(1))
str.RegisterRemoteError(testErr, 0) err := str.HandleRstStreamFrame(&wire.RstStreamFrame{
StreamID: streamID,
ByteOffset: 10,
})
Expect(err).ToNot(HaveOccurred())
Expect(queuedControlFrames).To(HaveLen(1)) // no additional queued frame Expect(queuedControlFrames).To(HaveLen(1)) // no additional queued frame
}) })
It("doesn't queue two RST_STREAMs twice, when it gets two remote errors", func() { It("doesn't queue two RST_STREAMs twice, when it gets two remote errors", func() {
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(0), true) mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(8), true)
str.RegisterRemoteError(testErr, 0) err := str.HandleRstStreamFrame(&wire.RstStreamFrame{
StreamID: streamID,
ByteOffset: 8,
})
Expect(err).ToNot(HaveOccurred())
Expect(queuedControlFrames).To(HaveLen(1))
err = str.HandleRstStreamFrame(&wire.RstStreamFrame{
StreamID: streamID,
ByteOffset: 9,
})
Expect(err).ToNot(HaveOccurred())
Expect(queuedControlFrames).To(HaveLen(1)) Expect(queuedControlFrames).To(HaveLen(1))
str.RegisterRemoteError(testErr, 0)
Expect(queuedControlFrames).To(HaveLen(1)) // no additional queued frame
}) })
}) })
Context("reset locally", func() { Context("reset locally", func() {
testErr := errors.New("test error")
It("stops writing", func() { It("stops writing", func() {
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
@ -733,11 +787,14 @@ var _ = Describe("Stream", func() {
}) })
It("doesn't queue a new RST_STREAM, if the stream was reset remotely before", func() { It("doesn't queue a new RST_STREAM, if the stream was reset remotely before", func() {
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(0), true) mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(10), true)
str.RegisterRemoteError(testErr, 0) err := str.HandleRstStreamFrame(&wire.RstStreamFrame{
Expect(queuedControlFrames).To(HaveLen(1)) StreamID: streamID,
ByteOffset: 10,
})
Expect(err).ToNot(HaveOccurred())
str.Reset(testErr) str.Reset(testErr)
Expect(queuedControlFrames).To(HaveLen(1)) // no additional queued frame Expect(queuedControlFrames).To(HaveLen(1))
}) })
It("doesn't call onReset twice", func() { It("doesn't call onReset twice", func() {
@ -1037,7 +1094,10 @@ var _ = Describe("Stream", func() {
It("is finished after receiving a RST and sending one", func() { It("is finished after receiving a RST and sending one", func() {
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(0), true) mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(0), true)
// this directly sends a rst // this directly sends a rst
str.RegisterRemoteError(testErr, 0) str.HandleRstStreamFrame(&wire.RstStreamFrame{
StreamID: streamID,
ByteOffset: 0,
})
Expect(str.rstSent.Get()).To(BeTrue()) Expect(str.rstSent.Get()).To(BeTrue())
Expect(str.Finished()).To(BeTrue()) Expect(str.Finished()).To(BeTrue())
}) })
@ -1045,7 +1105,10 @@ var _ = Describe("Stream", func() {
It("cancels the context after receiving a RST", func() { It("cancels the context after receiving a RST", func() {
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(0), true) mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(0), true)
Expect(str.Context().Done()).ToNot(BeClosed()) Expect(str.Context().Done()).ToNot(BeClosed())
str.RegisterRemoteError(testErr, 0) str.HandleRstStreamFrame(&wire.RstStreamFrame{
StreamID: streamID,
ByteOffset: 0,
})
Expect(str.Context().Done()).To(BeClosed()) Expect(str.Context().Done()).To(BeClosed())
}) })
@ -1053,7 +1116,10 @@ var _ = Describe("Stream", func() {
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(13), true) mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(13), true)
str.Reset(testErr) str.Reset(testErr)
Expect(str.Finished()).To(BeFalse()) Expect(str.Finished()).To(BeFalse())
str.RegisterRemoteError(testErr, 13) str.HandleRstStreamFrame(&wire.RstStreamFrame{
StreamID: streamID,
ByteOffset: 13,
})
Expect(str.Finished()).To(BeTrue()) Expect(str.Finished()).To(BeTrue())
}) })
@ -1062,7 +1128,10 @@ var _ = Describe("Stream", func() {
str.Close() str.Close()
f := str.PopStreamFrame(1000) f := str.PopStreamFrame(1000)
Expect(f.FinBit).To(BeTrue()) Expect(f.FinBit).To(BeTrue())
str.RegisterRemoteError(testErr, 13) str.HandleRstStreamFrame(&wire.RstStreamFrame{
StreamID: streamID,
ByteOffset: 13,
})
Expect(str.Finished()).To(BeTrue()) Expect(str.Finished()).To(BeTrue())
}) })