mirror of
https://github.com/refraction-networking/uquic.git
synced 2025-04-03 20:27:35 +03:00
release connection-level flow control credit when a stream is reset
This commit is contained in:
parent
1864e301ef
commit
1d7d532035
5 changed files with 99 additions and 26 deletions
|
@ -18,8 +18,12 @@ type StreamFlowController interface {
|
|||
flowController
|
||||
// for receiving
|
||||
// UpdateHighestReceived should be called when a new highest offset is received
|
||||
// final has to be to true if this is the final offset of the stream, as contained in a STREAM frame with FIN bit, and the RESET_STREAM frame
|
||||
// final has to be to true if this is the final offset of the stream,
|
||||
// as contained in a STREAM frame with FIN bit, and the RESET_STREAM frame
|
||||
UpdateHighestReceived(offset protocol.ByteCount, final bool) error
|
||||
// Abandon should be called when reading from the stream is aborted early,
|
||||
// and there won't be any further calls to AddBytesRead.
|
||||
Abandon()
|
||||
}
|
||||
|
||||
// The ConnectionFlowController is the flow controller for the connection.
|
||||
|
|
|
@ -93,6 +93,12 @@ func (c *streamFlowController) AddBytesRead(n protocol.ByteCount) {
|
|||
c.connection.AddBytesRead(n)
|
||||
}
|
||||
|
||||
func (c *streamFlowController) Abandon() {
|
||||
if unread := c.highestReceived - c.bytesRead; unread > 0 {
|
||||
c.connection.AddBytesRead(unread)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *streamFlowController) AddBytesSent(n protocol.ByteCount) {
|
||||
c.baseFlowController.AddBytesSent(n)
|
||||
c.connection.AddBytesSent(n)
|
||||
|
|
|
@ -149,6 +149,13 @@ var _ = Describe("Stream Flow controller", func() {
|
|||
err = controller.UpdateHighestReceived(201, true)
|
||||
Expect(err).To(MatchError("StreamDataAfterTermination: Received inconsistent final offset for stream 10 (old: 200, new: 201 bytes)"))
|
||||
})
|
||||
|
||||
It("tells the connection flow controller when a stream is abandoned", func() {
|
||||
controller.AddBytesRead(5)
|
||||
Expect(controller.UpdateHighestReceived(100, true)).To(Succeed())
|
||||
controller.Abandon()
|
||||
Expect(controller.connection.(*connectionFlowController).bytesRead).To(Equal(protocol.ByteCount(100)))
|
||||
})
|
||||
})
|
||||
|
||||
It("saves when data is read", func() {
|
||||
|
@ -198,6 +205,13 @@ var _ = Describe("Stream Flow controller", func() {
|
|||
Expect(controller.connection.(*connectionFlowController).receiveWindowSize).To(Equal(protocol.ByteCount(float64(controller.receiveWindowSize) * protocol.ConnectionFlowControlMultiplier)))
|
||||
})
|
||||
|
||||
It("sends a connection-level window update when a large stream is abandoned", func() {
|
||||
Expect(controller.UpdateHighestReceived(90, true)).To(Succeed())
|
||||
Expect(controller.connection.GetWindowUpdate()).To(BeZero())
|
||||
controller.Abandon()
|
||||
Expect(controller.connection.GetWindowUpdate()).ToNot(BeZero())
|
||||
})
|
||||
|
||||
It("doesn't increase the window after a final offset was already received", func() {
|
||||
Expect(controller.UpdateHighestReceived(90, true)).To(Succeed())
|
||||
controller.AddBytesRead(30)
|
||||
|
|
|
@ -80,7 +80,7 @@ func (s *receiveStream) StreamID() protocol.StreamID {
|
|||
func (s *receiveStream) Read(p []byte) (int, error) {
|
||||
completed, n, err := s.readImpl(p)
|
||||
if completed {
|
||||
s.sender.onStreamCompleted(s.streamID)
|
||||
s.streamCompleted()
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
@ -197,6 +197,9 @@ func (s *receiveStream) CancelRead(errorCode protocol.ApplicationErrorCode) erro
|
|||
if s.finRead || s.canceledRead || s.resetRemotely {
|
||||
return nil
|
||||
}
|
||||
if s.finalOffset != protocol.MaxByteCount { // final offset was already received
|
||||
s.streamCompleted()
|
||||
}
|
||||
s.canceledRead = true
|
||||
s.cancelReadErr = fmt.Errorf("Read on stream %d canceled with error code %d", s.streamID, errorCode)
|
||||
s.signalRead()
|
||||
|
@ -209,12 +212,22 @@ func (s *receiveStream) CancelRead(errorCode protocol.ApplicationErrorCode) erro
|
|||
|
||||
func (s *receiveStream) handleStreamFrame(frame *wire.StreamFrame) error {
|
||||
maxOffset := frame.Offset + frame.DataLen()
|
||||
if err := s.flowController.UpdateHighestReceived(maxOffset, frame.FinBit); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.mutex.Lock()
|
||||
defer s.mutex.Unlock()
|
||||
|
||||
if err := s.flowController.UpdateHighestReceived(maxOffset, frame.FinBit); err != nil {
|
||||
return err
|
||||
}
|
||||
if frame.FinBit {
|
||||
s.finalOffset = maxOffset
|
||||
}
|
||||
if s.canceledRead {
|
||||
if frame.FinBit {
|
||||
s.streamCompleted()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
if err := s.frameQueue.Push(frame.Data, frame.Offset); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -228,7 +241,7 @@ func (s *receiveStream) handleStreamFrame(frame *wire.StreamFrame) error {
|
|||
func (s *receiveStream) handleResetStreamFrame(frame *wire.ResetStreamFrame) error {
|
||||
completed, err := s.handleResetStreamFrameImpl(frame)
|
||||
if completed {
|
||||
s.sender.onStreamCompleted(s.streamID)
|
||||
s.streamCompleted()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
@ -285,6 +298,13 @@ func (s *receiveStream) getWindowUpdate() protocol.ByteCount {
|
|||
return s.flowController.GetWindowUpdate()
|
||||
}
|
||||
|
||||
func (s *receiveStream) streamCompleted() {
|
||||
if !s.finRead {
|
||||
s.flowController.Abandon()
|
||||
}
|
||||
s.sender.onStreamCompleted(s.streamID)
|
||||
}
|
||||
|
||||
// signalRead performs a non-blocking send on the readChan
|
||||
func (s *receiveStream) signalRead() {
|
||||
select {
|
||||
|
|
|
@ -458,26 +458,22 @@ var _ = Describe("Receive Stream", func() {
|
|||
close(done)
|
||||
}()
|
||||
Consistently(done).ShouldNot(BeClosed())
|
||||
err := str.CancelRead(1234)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(str.CancelRead(1234)).To(Succeed())
|
||||
Eventually(done).Should(BeClosed())
|
||||
})
|
||||
|
||||
It("doesn't allow further calls to Read", func() {
|
||||
mockSender.EXPECT().queueControlFrame(gomock.Any())
|
||||
err := str.CancelRead(1234)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
_, err = strWithTimeout.Read([]byte{0})
|
||||
Expect(str.CancelRead(1234)).To(Succeed())
|
||||
_, err := strWithTimeout.Read([]byte{0})
|
||||
Expect(err).To(MatchError("Read on stream 1337 canceled with error code 1234"))
|
||||
})
|
||||
|
||||
It("does nothing when CancelRead is called twice", func() {
|
||||
mockSender.EXPECT().queueControlFrame(gomock.Any())
|
||||
err := str.CancelRead(1234)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = str.CancelRead(2345)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
_, err = strWithTimeout.Read([]byte{0})
|
||||
Expect(str.CancelRead(1234)).To(Succeed())
|
||||
Expect(str.CancelRead(1234)).To(Succeed())
|
||||
_, err := strWithTimeout.Read([]byte{0})
|
||||
Expect(err).To(MatchError("Read on stream 1337 canceled with error code 1234"))
|
||||
})
|
||||
|
||||
|
@ -505,7 +501,10 @@ var _ = Describe("Receive Stream", func() {
|
|||
})
|
||||
|
||||
It("doesn't send a STOP_SENDING frame, if the stream was already reset", func() {
|
||||
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true)
|
||||
gomock.InOrder(
|
||||
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true),
|
||||
mockFC.EXPECT().Abandon(),
|
||||
)
|
||||
mockSender.EXPECT().onStreamCompleted(streamID)
|
||||
Expect(str.handleResetStreamFrame(&wire.ResetStreamFrame{
|
||||
StreamID: streamID,
|
||||
|
@ -513,6 +512,32 @@ var _ = Describe("Receive Stream", func() {
|
|||
})).To(Succeed())
|
||||
Expect(str.CancelRead(1234)).To(Succeed())
|
||||
})
|
||||
|
||||
It("sends a STOP_SENDING and completes the stream after receiving the final offset", func() {
|
||||
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(1000), true)
|
||||
Expect(str.handleStreamFrame(&wire.StreamFrame{
|
||||
Offset: 1000,
|
||||
FinBit: true,
|
||||
})).To(Succeed())
|
||||
mockFC.EXPECT().Abandon()
|
||||
mockSender.EXPECT().queueControlFrame(gomock.Any())
|
||||
mockSender.EXPECT().onStreamCompleted(streamID)
|
||||
Expect(str.CancelRead(1234)).To(Succeed())
|
||||
})
|
||||
|
||||
It("completes the stream when receiving the FinBit after the stream was canceled", func() {
|
||||
mockSender.EXPECT().queueControlFrame(gomock.Any())
|
||||
Expect(str.CancelRead(1234)).To(Succeed())
|
||||
gomock.InOrder(
|
||||
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(1000), true),
|
||||
mockFC.EXPECT().Abandon(),
|
||||
)
|
||||
mockSender.EXPECT().onStreamCompleted(streamID)
|
||||
Expect(str.handleStreamFrame(&wire.StreamFrame{
|
||||
Offset: 1000,
|
||||
FinBit: true,
|
||||
})).To(Succeed())
|
||||
})
|
||||
})
|
||||
|
||||
Context("receiving RESET_STREAM frames", func() {
|
||||
|
@ -523,7 +548,6 @@ var _ = Describe("Receive Stream", func() {
|
|||
}
|
||||
|
||||
It("unblocks Read", func() {
|
||||
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true)
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
|
@ -536,16 +560,22 @@ var _ = Describe("Receive Stream", func() {
|
|||
}()
|
||||
Consistently(done).ShouldNot(BeClosed())
|
||||
mockSender.EXPECT().onStreamCompleted(streamID)
|
||||
gomock.InOrder(
|
||||
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true),
|
||||
mockFC.EXPECT().Abandon(),
|
||||
)
|
||||
str.handleResetStreamFrame(rst)
|
||||
Eventually(done).Should(BeClosed())
|
||||
})
|
||||
|
||||
It("doesn't allow further calls to Read", func() {
|
||||
mockSender.EXPECT().onStreamCompleted(streamID)
|
||||
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true)
|
||||
err := str.handleResetStreamFrame(rst)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
_, err = strWithTimeout.Read([]byte{0})
|
||||
gomock.InOrder(
|
||||
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true),
|
||||
mockFC.EXPECT().Abandon(),
|
||||
)
|
||||
Expect(str.handleResetStreamFrame(rst)).To(Succeed())
|
||||
_, err := strWithTimeout.Read([]byte{0})
|
||||
Expect(err).To(MatchError("Stream 1337 was reset with error code 1234"))
|
||||
Expect(err).To(BeAssignableToTypeOf(streamCanceledError{}))
|
||||
Expect(err.(streamCanceledError).Canceled()).To(BeTrue())
|
||||
|
@ -561,11 +591,10 @@ var _ = Describe("Receive Stream", func() {
|
|||
|
||||
It("ignores duplicate RESET_STREAM frames", func() {
|
||||
mockSender.EXPECT().onStreamCompleted(streamID)
|
||||
mockFC.EXPECT().Abandon()
|
||||
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true).Times(2)
|
||||
err := str.handleResetStreamFrame(rst)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = str.handleResetStreamFrame(rst)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(str.handleResetStreamFrame(rst)).To(Succeed())
|
||||
Expect(str.handleResetStreamFrame(rst)).To(Succeed())
|
||||
})
|
||||
|
||||
It("doesn't do anyting when it was closed for shutdown", func() {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue