From 198de32ef64a31e8cb459ed408e98678c2de40e4 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sat, 3 Feb 2024 13:02:13 +0700 Subject: [PATCH] don't enqueue stream for sending on reordered MAX_STREAM_DATA frames (#4269) --- internal/flowcontrol/base_flow_controller.go | 4 +++- .../flowcontrol/base_flow_controller_test.go | 4 ++-- internal/flowcontrol/interface.go | 7 +++---- internal/mocks/connection_flow_controller.go | 14 +++++++------ internal/mocks/stream_flow_controller.go | 14 +++++++------ send_stream.go | 6 ++++-- send_stream_test.go | 20 ++++++++++++++++++- 7 files changed, 47 insertions(+), 22 deletions(-) diff --git a/internal/flowcontrol/base_flow_controller.go b/internal/flowcontrol/base_flow_controller.go index 184aad34..3d88d577 100644 --- a/internal/flowcontrol/base_flow_controller.go +++ b/internal/flowcontrol/base_flow_controller.go @@ -48,10 +48,12 @@ func (c *baseFlowController) AddBytesSent(n protocol.ByteCount) { } // UpdateSendWindow is called after receiving a MAX_{STREAM_}DATA frame. -func (c *baseFlowController) UpdateSendWindow(offset protocol.ByteCount) { +func (c *baseFlowController) UpdateSendWindow(offset protocol.ByteCount) (updated bool) { if offset > c.sendWindow { c.sendWindow = offset + return true } + return false } func (c *baseFlowController) sendWindowSize() protocol.ByteCount { diff --git a/internal/flowcontrol/base_flow_controller_test.go b/internal/flowcontrol/base_flow_controller_test.go index 6c26e7f8..da6cf357 100644 --- a/internal/flowcontrol/base_flow_controller_test.go +++ b/internal/flowcontrol/base_flow_controller_test.go @@ -59,9 +59,9 @@ var _ = Describe("Base Flow controller", func() { }) It("does not decrease the flow control window", func() { - controller.UpdateSendWindow(20) + Expect(controller.UpdateSendWindow(20)).To(BeTrue()) Expect(controller.sendWindowSize()).To(Equal(protocol.ByteCount(20))) - controller.UpdateSendWindow(10) + Expect(controller.UpdateSendWindow(10)).To(BeFalse()) Expect(controller.sendWindowSize()).To(Equal(protocol.ByteCount(20))) }) diff --git a/internal/flowcontrol/interface.go b/internal/flowcontrol/interface.go index 946519d5..fc5f9de0 100644 --- a/internal/flowcontrol/interface.go +++ b/internal/flowcontrol/interface.go @@ -5,7 +5,7 @@ import "github.com/quic-go/quic-go/internal/protocol" type flowController interface { // for sending SendWindowSize() protocol.ByteCount - UpdateSendWindow(protocol.ByteCount) + UpdateSendWindow(protocol.ByteCount) (updated bool) AddBytesSent(protocol.ByteCount) // for receiving AddBytesRead(protocol.ByteCount) @@ -16,12 +16,11 @@ type flowController interface { // A StreamFlowController is a flow controller for a QUIC stream. type StreamFlowController interface { flowController - // for receiving - // UpdateHighestReceived should be called when a new highest offset is received + // UpdateHighestReceived is called when a new highest offset is received // final has to be to true if this is the final offset of the stream, // as contained in a STREAM frame with FIN bit, and the RESET_STREAM frame UpdateHighestReceived(offset protocol.ByteCount, final bool) error - // Abandon should be called when reading from the stream is aborted early, + // Abandon is called when reading from the stream is aborted early, // and there won't be any further calls to AddBytesRead. Abandon() } diff --git a/internal/mocks/connection_flow_controller.go b/internal/mocks/connection_flow_controller.go index ac328602..5f6cb2aa 100644 --- a/internal/mocks/connection_flow_controller.go +++ b/internal/mocks/connection_flow_controller.go @@ -264,9 +264,11 @@ func (c *ConnectionFlowControllerSendWindowSizeCall) DoAndReturn(f func() protoc } // UpdateSendWindow mocks base method. -func (m *MockConnectionFlowController) UpdateSendWindow(arg0 protocol.ByteCount) { +func (m *MockConnectionFlowController) UpdateSendWindow(arg0 protocol.ByteCount) bool { m.ctrl.T.Helper() - m.ctrl.Call(m, "UpdateSendWindow", arg0) + ret := m.ctrl.Call(m, "UpdateSendWindow", arg0) + ret0, _ := ret[0].(bool) + return ret0 } // UpdateSendWindow indicates an expected call of UpdateSendWindow. @@ -282,19 +284,19 @@ type ConnectionFlowControllerUpdateSendWindowCall struct { } // Return rewrite *gomock.Call.Return -func (c *ConnectionFlowControllerUpdateSendWindowCall) Return() *ConnectionFlowControllerUpdateSendWindowCall { - c.Call = c.Call.Return() +func (c *ConnectionFlowControllerUpdateSendWindowCall) Return(arg0 bool) *ConnectionFlowControllerUpdateSendWindowCall { + c.Call = c.Call.Return(arg0) return c } // Do rewrite *gomock.Call.Do -func (c *ConnectionFlowControllerUpdateSendWindowCall) Do(f func(protocol.ByteCount)) *ConnectionFlowControllerUpdateSendWindowCall { +func (c *ConnectionFlowControllerUpdateSendWindowCall) Do(f func(protocol.ByteCount) bool) *ConnectionFlowControllerUpdateSendWindowCall { c.Call = c.Call.Do(f) return c } // DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *ConnectionFlowControllerUpdateSendWindowCall) DoAndReturn(f func(protocol.ByteCount)) *ConnectionFlowControllerUpdateSendWindowCall { +func (c *ConnectionFlowControllerUpdateSendWindowCall) DoAndReturn(f func(protocol.ByteCount) bool) *ConnectionFlowControllerUpdateSendWindowCall { c.Call = c.Call.DoAndReturn(f) return c } diff --git a/internal/mocks/stream_flow_controller.go b/internal/mocks/stream_flow_controller.go index 7207d760..2e244dfb 100644 --- a/internal/mocks/stream_flow_controller.go +++ b/internal/mocks/stream_flow_controller.go @@ -300,9 +300,11 @@ func (c *StreamFlowControllerUpdateHighestReceivedCall) DoAndReturn(f func(proto } // UpdateSendWindow mocks base method. -func (m *MockStreamFlowController) UpdateSendWindow(arg0 protocol.ByteCount) { +func (m *MockStreamFlowController) UpdateSendWindow(arg0 protocol.ByteCount) bool { m.ctrl.T.Helper() - m.ctrl.Call(m, "UpdateSendWindow", arg0) + ret := m.ctrl.Call(m, "UpdateSendWindow", arg0) + ret0, _ := ret[0].(bool) + return ret0 } // UpdateSendWindow indicates an expected call of UpdateSendWindow. @@ -318,19 +320,19 @@ type StreamFlowControllerUpdateSendWindowCall struct { } // Return rewrite *gomock.Call.Return -func (c *StreamFlowControllerUpdateSendWindowCall) Return() *StreamFlowControllerUpdateSendWindowCall { - c.Call = c.Call.Return() +func (c *StreamFlowControllerUpdateSendWindowCall) Return(arg0 bool) *StreamFlowControllerUpdateSendWindowCall { + c.Call = c.Call.Return(arg0) return c } // Do rewrite *gomock.Call.Do -func (c *StreamFlowControllerUpdateSendWindowCall) Do(f func(protocol.ByteCount)) *StreamFlowControllerUpdateSendWindowCall { +func (c *StreamFlowControllerUpdateSendWindowCall) Do(f func(protocol.ByteCount) bool) *StreamFlowControllerUpdateSendWindowCall { c.Call = c.Call.Do(f) return c } // DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *StreamFlowControllerUpdateSendWindowCall) DoAndReturn(f func(protocol.ByteCount)) *StreamFlowControllerUpdateSendWindowCall { +func (c *StreamFlowControllerUpdateSendWindowCall) DoAndReturn(f func(protocol.ByteCount) bool) *StreamFlowControllerUpdateSendWindowCall { c.Call = c.Call.DoAndReturn(f) return c } diff --git a/send_stream.go b/send_stream.go index f4ae5b2a..e1ce3e67 100644 --- a/send_stream.go +++ b/send_stream.go @@ -404,11 +404,13 @@ func (s *sendStream) cancelWriteImpl(errorCode qerr.StreamErrorCode, remote bool } func (s *sendStream) updateSendWindow(limit protocol.ByteCount) { + updated := s.flowController.UpdateSendWindow(limit) + if !updated { // duplicate or reordered MAX_STREAM_DATA frame + return + } s.mutex.Lock() hasStreamData := s.dataForWriting != nil || s.nextFrame != nil s.mutex.Unlock() - - s.flowController.UpdateSendWindow(limit) if hasStreamData { s.sender.onHasStreamData(s.streamID) } diff --git a/send_stream_test.go b/send_stream_test.go index ad1d0469..50713308 100644 --- a/send_stream_test.go +++ b/send_stream_test.go @@ -682,7 +682,7 @@ var _ = Describe("Send Stream", func() { }) It("says when it has data for sending", func() { - mockFC.EXPECT().UpdateSendWindow(gomock.Any()) + mockFC.EXPECT().UpdateSendWindow(gomock.Any()).Return(true) mockSender.EXPECT().onHasStreamData(streamID) done := make(chan struct{}) go func() { @@ -698,6 +698,24 @@ var _ = Describe("Send Stream", func() { str.closeForShutdown(nil) Eventually(done).Should(BeClosed()) }) + + It("doesn't say it has data for sending if the MAX_STREAM_DATA frame was reordered", func() { + mockFC.EXPECT().UpdateSendWindow(gomock.Any()).Return(false) // reordered frame + mockSender.EXPECT().onHasStreamData(streamID) + done := make(chan struct{}) + go func() { + defer GinkgoRecover() + _, err := str.Write([]byte("foobar")) + Expect(err).ToNot(HaveOccurred()) + close(done) + }() + waitForWrite() + // don't expect any calls to onHasStreamData + str.updateSendWindow(42) + // make sure the Write go routine returns + str.closeForShutdown(nil) + Eventually(done).Should(BeClosed()) + }) }) Context("stream cancellations", func() {