only declare send stream completed when RESET_STREAM is acknowledged (#4613)

This commit is contained in:
Marten Seemann 2024-08-06 16:55:36 -07:00 committed by GitHub
parent 7c471aac74
commit 46fc42d0da
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 105 additions and 47 deletions

View file

@ -689,10 +689,7 @@ var _ = Describe("Send Stream", func() {
Context("stream cancellations", func() {
Context("canceling writing", func() {
It("queues a RESET_STREAM frame", func() {
gomock.InOrder(
mockSender.EXPECT().onHasStreamControlFrame(streamID, gomock.Any()),
mockSender.EXPECT().onStreamCompleted(streamID),
)
mockSender.EXPECT().onHasStreamControlFrame(streamID, gomock.Any())
str.writeOffset = 1234
str.CancelWrite(9876)
cf, ok, hasMore := str.getControlFrame()
@ -705,6 +702,20 @@ var _ = Describe("Send Stream", func() {
Expect(hasMore).To(BeFalse())
})
It("retransmits a RESET_STREAM frame", func() {
mockSender.EXPECT().onHasStreamControlFrame(streamID, gomock.Any())
str.CancelWrite(9876)
cf, ok, _ := str.getControlFrame()
Expect(ok).To(BeTrue())
Expect(cf.Frame).To(BeAssignableToTypeOf(&wire.ResetStreamFrame{}))
mockSender.EXPECT().onHasStreamControlFrame(streamID, gomock.Any())
cf.Handler.OnLost(cf.Frame)
cf2, ok, _ := str.getControlFrame()
Expect(ok).To(BeTrue())
Expect(cf2.Frame).To(Equal(cf.Frame))
})
// This test is inherently racy, as it tests a concurrent call to Write() and CancelRead().
// A single successful run of this test therefore doesn't mean a lot,
// for reliable results it has to be run many times.
@ -753,7 +764,6 @@ var _ = Describe("Send Stream", func() {
frame, ok, _ := str.popStreamFrame(50, protocol.Version1)
Expect(ok).To(BeTrue())
Expect(frame).ToNot(BeNil())
mockSender.EXPECT().onStreamCompleted(streamID)
str.CancelWrite(1234)
Eventually(writeReturned).Should(BeClosed())
Expect(n).To(BeEquivalentTo(frame.Frame.DataLen()))
@ -775,7 +785,6 @@ var _ = Describe("Send Stream", func() {
Expect(ok).To(BeTrue())
Expect(hasMoreData).To(BeTrue())
Expect(frame).ToNot(BeNil())
mockSender.EXPECT().onStreamCompleted(streamID)
str.CancelWrite(1234)
_, ok, hasMoreData = str.popStreamFrame(10, protocol.Version1)
Expect(ok).To(BeFalse())
@ -804,7 +813,6 @@ var _ = Describe("Send Stream", func() {
Expect(ok).To(BeTrue())
Expect(hasMoreData).To(BeTrue())
Expect(frame).ToNot(BeNil())
mockSender.EXPECT().onStreamCompleted(streamID)
str.CancelWrite(1234)
_, ok, hasMoreData = str.popStreamFrame(10, protocol.Version1)
Expect(ok).To(BeFalse())
@ -828,14 +836,12 @@ var _ = Describe("Send Stream", func() {
Expect(ok).To(BeTrue())
Expect(hasMoreData).To(BeTrue())
Expect(frame).ToNot(BeNil())
mockSender.EXPECT().onStreamCompleted(streamID)
str.CancelWrite(1234)
frame.Handler.OnAcked(frame.Frame)
})
It("cancels the context", func() {
mockSender.EXPECT().onHasStreamControlFrame(gomock.Any(), gomock.Any())
mockSender.EXPECT().onStreamCompleted(gomock.Any())
Expect(str.Context().Done()).ToNot(BeClosed())
str.CancelWrite(1234)
Expect(str.Context().Done()).To(BeClosed())
@ -845,7 +851,6 @@ var _ = Describe("Send Stream", func() {
It("doesn't allow further calls to Write", func() {
mockSender.EXPECT().onHasStreamControlFrame(gomock.Any(), gomock.Any())
mockSender.EXPECT().onStreamCompleted(gomock.Any())
str.CancelWrite(1234)
_, err := strWithTimeout.Write([]byte("foobar"))
Expect(err).To(MatchError(&StreamError{
@ -857,7 +862,6 @@ var _ = Describe("Send Stream", func() {
It("only cancels once", func() {
mockSender.EXPECT().onHasStreamControlFrame(streamID, gomock.Any())
mockSender.EXPECT().onStreamCompleted(gomock.Any())
str.CancelWrite(1234)
str.CancelWrite(4321)
cf, ok, hasMore := str.getControlFrame()
@ -873,7 +877,6 @@ var _ = Describe("Send Stream", func() {
It("queues a RESET_STREAM frame, even if the stream was already closed", func() {
mockSender.EXPECT().onHasStreamData(streamID, str)
mockSender.EXPECT().onHasStreamControlFrame(streamID, str)
mockSender.EXPECT().onStreamCompleted(gomock.Any())
Expect(str.Close()).To(Succeed())
// don't EXPECT any calls to queueControlFrame
str.CancelWrite(123)
@ -909,7 +912,6 @@ var _ = Describe("Send Stream", func() {
ErrorCode: 101,
})
mockSender.EXPECT().onStreamCompleted(gomock.Any())
str.CancelWrite(101)
})
@ -919,7 +921,6 @@ var _ = Describe("Send Stream", func() {
done := make(chan struct{})
go func() {
defer GinkgoRecover()
mockSender.EXPECT().onStreamCompleted(gomock.Any())
_, err := str.Write(getData(5000))
Expect(err).To(Equal(&StreamError{
StreamID: streamID,
@ -942,7 +943,6 @@ var _ = Describe("Send Stream", func() {
StreamID: streamID,
ErrorCode: 123,
})
mockSender.EXPECT().onStreamCompleted(gomock.Any())
_, err := str.Write([]byte("foobar"))
Expect(err).To(Equal(&StreamError{
StreamID: streamID,
@ -952,12 +952,11 @@ var _ = Describe("Send Stream", func() {
})
It("handles Close after STOP_SENDING", func() {
mockSender.EXPECT().onHasStreamControlFrame(gomock.Any(), gomock.Any())
mockSender.EXPECT().onHasStreamControlFrame(streamID, str)
str.handleStopSendingFrame(&wire.StopSendingFrame{
StreamID: streamID,
ErrorCode: 123,
})
mockSender.EXPECT().onStreamCompleted(gomock.Any())
str.Close()
})
@ -966,10 +965,7 @@ var _ = Describe("Send Stream", func() {
str.Close()
_, ok, _ := str.popStreamFrame(protocol.MaxByteCount, protocol.Version1)
Expect(ok).To(BeTrue())
gomock.InOrder(
mockSender.EXPECT().onHasStreamControlFrame(gomock.Any(), gomock.Any()),
mockSender.EXPECT().onStreamCompleted(gomock.Any()),
)
mockSender.EXPECT().onHasStreamControlFrame(gomock.Any(), gomock.Any())
str.handleStopSendingFrame(&wire.StopSendingFrame{
StreamID: streamID,
ErrorCode: 123,
@ -979,10 +975,7 @@ var _ = Describe("Send Stream", func() {
It("handles STOP_SENDING after Close, but before sending the FIN", func() {
mockSender.EXPECT().onHasStreamData(gomock.Any(), gomock.Any())
str.Close()
gomock.InOrder(
mockSender.EXPECT().onHasStreamControlFrame(gomock.Any(), gomock.Any()),
mockSender.EXPECT().onStreamCompleted(gomock.Any()),
)
mockSender.EXPECT().onHasStreamControlFrame(gomock.Any(), gomock.Any())
str.handleStopSendingFrame(&wire.StopSendingFrame{
StreamID: streamID,
ErrorCode: 123,
@ -1093,10 +1086,7 @@ var _ = Describe("Send Stream", func() {
Expect(ok).To(BeTrue())
Eventually(done).Should(BeClosed())
Expect(f).ToNot(BeNil())
gomock.InOrder(
mockSender.EXPECT().onHasStreamControlFrame(gomock.Any(), gomock.Any()),
mockSender.EXPECT().onStreamCompleted(streamID),
)
mockSender.EXPECT().onHasStreamControlFrame(gomock.Any(), gomock.Any())
str.CancelWrite(9876)
// don't EXPECT any calls to onHasStreamData
f.Handler.OnLost(f.Frame)
@ -1151,6 +1141,36 @@ var _ = Describe("Send Stream", func() {
frame.Handler.OnAcked(frame.Frame)
})
It("waits until a RESET_STREAM is acknowledged", func() {
mockSender.EXPECT().onHasStreamData(streamID, str)
done := make(chan struct{})
go func() {
defer GinkgoRecover()
_, err := strWithTimeout.Write([]byte("foobar"))
Expect(err).ToNot(HaveOccurred())
close(done)
}()
waitForWrite()
frame, ok, hasMoreData := str.popStreamFrame(20, protocol.Version1)
Expect(ok).To(BeTrue())
Expect(frame.Frame.DataLen()).To(BeEquivalentTo(6))
Expect(hasMoreData).To(BeFalse())
mockSender.EXPECT().onHasStreamControlFrame(streamID, str)
str.CancelWrite(1234)
cf, ok, _ := str.getControlFrame()
Expect(ok).To(BeTrue())
// Acknowledge the STREAM frame.
// This doesn't complete the stream, since we're still waiting for
// the acknowledgment of the RESET_STREAM frame.
frame.Handler.OnAcked(frame.Frame)
mockSender.EXPECT().onStreamCompleted(streamID)
cf.Handler.OnAcked(cf.Frame)
})
It("says when a stream is completed, if Close() is called before popping the frame", func() {
mockSender.EXPECT().onHasStreamData(streamID, str).Times(2)
done := make(chan struct{})