send RST_STREAM frames

ref #380
This commit is contained in:
Marten Seemann 2017-01-06 18:31:32 +07:00
parent c705a19f11
commit 7a91794292
No known key found for this signature in database
GPG key ID: 3603F40B121FCDEA
7 changed files with 176 additions and 38 deletions

View file

@ -31,6 +31,8 @@ type mockFlowControlHandler struct {
triggerConnectionWindowUpdate bool
}
var _ flowcontrol.FlowControlManager = &mockFlowControlHandler{}
func newMockFlowControlHandler() *mockFlowControlHandler {
return &mockFlowControlHandler{
sendWindowSizes: make(map[protocol.StreamID]protocol.ByteCount),
@ -61,8 +63,8 @@ func (m *mockFlowControlHandler) AddBytesRead(streamID protocol.StreamID, n prot
return nil
}
func (m *mockFlowControlHandler) ResetStream(streamID protocol.StreamID, byteOffset protocol.ByteCount) (protocol.ByteCount, error) {
return m.bytesSent, m.UpdateHighestReceived(streamID, byteOffset)
func (m *mockFlowControlHandler) ResetStream(streamID protocol.StreamID, byteOffset protocol.ByteCount) error {
return m.UpdateHighestReceived(streamID, byteOffset)
}
func (m *mockFlowControlHandler) UpdateHighestReceived(streamID protocol.StreamID, byteOffset protocol.ByteCount) error {
@ -108,19 +110,30 @@ var _ = Describe("Stream", func() {
var (
str *stream
onDataCalled bool
resetCalled bool
resetCalledForStream protocol.StreamID
resetCalledAtOffset protocol.ByteCount
)
onData := func() {
onDataCalled = true
}
onReset := func(id protocol.StreamID, offset protocol.ByteCount) {
resetCalled = true
resetCalledForStream = id
resetCalledAtOffset = offset
}
BeforeEach(func() {
onDataCalled = false
resetCalled = false
var streamID protocol.StreamID = 1337
cpm := &mockConnectionParametersManager{}
flowControlManager := flowcontrol.NewFlowControlManager(cpm, &congestion.RTTStats{})
flowControlManager.NewStream(streamID, true)
str, _ = newStream(streamID, onData, flowControlManager)
str, _ = newStream(streamID, onData, onReset, flowControlManager)
})
It("gets stream id", func() {
@ -432,6 +445,43 @@ var _ = Describe("Stream", func() {
Expect(n).To(BeZero())
Expect(err).To(MatchError(testErr))
})
It("calls onReset when receiving a remote error", func() {
var writeReturned bool
str.writeOffset = 0x1000
go func() {
str.Write([]byte("foobar"))
writeReturned = true
}()
str.RegisterRemoteError(testErr)
Expect(resetCalled).To(BeTrue())
Expect(resetCalledForStream).To(Equal(protocol.StreamID(1337)))
Expect(resetCalledAtOffset).To(Equal(protocol.ByteCount(0x1000)))
Eventually(func() bool { return writeReturned }).Should(BeTrue())
})
It("doesn't call onReset if it already sent a FIN", func() {
str.Close()
str.sentFin()
str.RegisterRemoteError(testErr)
Expect(resetCalled).To(BeFalse())
})
It("doesn't call onReset if the stream was reset locally before", func() {
str.Reset(testErr)
Expect(resetCalled).To(BeTrue())
resetCalled = false
str.RegisterRemoteError(testErr)
Expect(resetCalled).To(BeFalse())
})
It("doesn't call onReset twice, when it gets two remote errors", func() {
str.RegisterRemoteError(testErr)
Expect(resetCalled).To(BeTrue())
resetCalled = false
str.RegisterRemoteError(testErr)
Expect(resetCalled).To(BeFalse())
})
})
Context("reset locally", func() {
@ -446,8 +496,7 @@ var _ = Describe("Stream", func() {
}()
Consistently(func() bool { return writeReturned }).Should(BeFalse())
str.Reset(testErr)
data := str.getDataForWriting(6)
Expect(data).To(BeNil())
Expect(str.getDataForWriting(6)).To(BeNil())
Eventually(func() bool { return writeReturned }).Should(BeTrue())
Expect(n).To(BeZero())
Expect(err).To(MatchError(testErr))
@ -458,6 +507,7 @@ var _ = Describe("Stream", func() {
n, err := str.Write([]byte("foobar"))
Expect(n).To(BeZero())
Expect(err).To(MatchError(testErr))
Expect(str.getDataForWriting(6)).To(BeNil())
})
It("stops reading", func() {
@ -487,6 +537,37 @@ var _ = Describe("Stream", func() {
Expect(n).To(BeZero())
Expect(err).To(MatchError(testErr))
})
It("calls onReset", func() {
str.writeOffset = 0x1000
str.Reset(testErr)
Expect(resetCalled).To(BeTrue())
Expect(resetCalledForStream).To(Equal(protocol.StreamID(1337)))
Expect(resetCalledAtOffset).To(Equal(protocol.ByteCount(0x1000)))
})
It("doesn't call onReset if it already sent a FIN", func() {
str.Close()
str.sentFin()
str.Reset(testErr)
Expect(resetCalled).To(BeFalse())
})
It("doesn't call onReset if the stream was reset remotely before", func() {
str.RegisterRemoteError(testErr)
Expect(resetCalled).To(BeTrue())
resetCalled = false
str.Reset(testErr)
Expect(resetCalled).To(BeFalse())
})
It("doesn't call onReset twice", func() {
str.Reset(testErr)
Expect(resetCalled).To(BeTrue())
resetCalled = false
str.Reset(testErr)
Expect(resetCalled).To(BeFalse())
})
})
})