mirror of
https://github.com/refraction-networking/uquic.git
synced 2025-04-06 05:37:36 +03:00
Merge pull request #1749 from lucas-clemente/dont-send-after-reset
don't send stream data after a stream was canceled
This commit is contained in:
commit
d6521fa3cc
4 changed files with 72 additions and 64 deletions
|
@ -49,40 +49,42 @@ func NewStreamFlowController(
|
|||
}
|
||||
}
|
||||
|
||||
// UpdateHighestReceived updates the highestReceived value, if the byteOffset is higher
|
||||
// it returns an ErrReceivedSmallerByteOffset if the received byteOffset is smaller than any byteOffset received before
|
||||
func (c *streamFlowController) UpdateHighestReceived(byteOffset protocol.ByteCount, final bool) error {
|
||||
// UpdateHighestReceived updates the highestReceived value, if the offset is higher.
|
||||
func (c *streamFlowController) UpdateHighestReceived(offset protocol.ByteCount, final bool) error {
|
||||
c.mutex.Lock()
|
||||
defer c.mutex.Unlock()
|
||||
|
||||
// when receiving a final offset, check that this final offset is consistent with a final offset we might have received earlier
|
||||
if final && c.receivedFinalOffset && byteOffset != c.highestReceived {
|
||||
return qerr.Error(qerr.StreamDataAfterTermination, fmt.Sprintf("Received inconsistent final offset for stream %d (old: %d, new: %d bytes)", c.streamID, c.highestReceived, byteOffset))
|
||||
// If the final offset for this stream is already known, check for consistency.
|
||||
if c.receivedFinalOffset {
|
||||
// If we receive another final offset, check that it's the same.
|
||||
if final && offset != c.highestReceived {
|
||||
return qerr.Error(qerr.StreamDataAfterTermination, fmt.Sprintf("Received inconsistent final offset for stream %d (old: %#x, new: %#x bytes)", c.streamID, c.highestReceived, offset))
|
||||
}
|
||||
// if we already received a final offset, check that the offset in the STREAM frames is below the final offset
|
||||
if c.receivedFinalOffset && byteOffset > c.highestReceived {
|
||||
return qerr.StreamDataAfterTermination
|
||||
// Check that the offset is below the final offset.
|
||||
if offset > c.highestReceived {
|
||||
return qerr.Error(qerr.StreamDataAfterTermination, fmt.Sprintf("Received offset %#x for stream %d. Final offset was already received at %#x", offset, c.streamID, c.highestReceived))
|
||||
}
|
||||
}
|
||||
|
||||
if final {
|
||||
c.receivedFinalOffset = true
|
||||
}
|
||||
if byteOffset == c.highestReceived {
|
||||
if offset == c.highestReceived {
|
||||
return nil
|
||||
}
|
||||
if byteOffset <= c.highestReceived {
|
||||
// a STREAM_FRAME with a higher offset was received before.
|
||||
// A higher offset was received before.
|
||||
// This can happen due to reordering.
|
||||
if offset <= c.highestReceived {
|
||||
if final {
|
||||
// If the current byteOffset is smaller than the offset in that STREAM_FRAME, this STREAM_FRAME contained data after the end of the stream
|
||||
return qerr.StreamDataAfterTermination
|
||||
return qerr.Error(qerr.StreamDataAfterTermination, fmt.Sprintf("Received final offset %#x for stream %d, but already received offset %#x before", offset, c.streamID, c.highestReceived))
|
||||
}
|
||||
// this is a reordered STREAM_FRAME
|
||||
return nil
|
||||
}
|
||||
|
||||
increment := byteOffset - c.highestReceived
|
||||
c.highestReceived = byteOffset
|
||||
increment := offset - c.highestReceived
|
||||
c.highestReceived = offset
|
||||
if c.checkFlowControlViolation() {
|
||||
return qerr.Error(qerr.FlowControlReceivedTooMuchData, fmt.Sprintf("Received %d bytes on stream %d, allowed %d bytes", byteOffset, c.streamID, c.receiveWindow))
|
||||
return qerr.Error(qerr.FlowControlReceivedTooMuchData, fmt.Sprintf("Received %#x bytes on stream %d, allowed %#x bytes", offset, c.streamID, c.receiveWindow))
|
||||
}
|
||||
return c.connection.IncrementHighestReceived(increment)
|
||||
}
|
||||
|
|
|
@ -5,7 +5,6 @@ import (
|
|||
|
||||
"github.com/lucas-clemente/quic-go/internal/congestion"
|
||||
"github.com/lucas-clemente/quic-go/internal/protocol"
|
||||
"github.com/lucas-clemente/quic-go/internal/qerr"
|
||||
"github.com/lucas-clemente/quic-go/internal/utils"
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
|
@ -61,7 +60,7 @@ var _ = Describe("Stream Flow controller", func() {
|
|||
|
||||
Context("receiving data", func() {
|
||||
Context("registering received offsets", func() {
|
||||
var receiveWindow protocol.ByteCount = 10000
|
||||
var receiveWindow protocol.ByteCount = 0x10000
|
||||
var receiveWindowSize protocol.ByteCount = 600
|
||||
|
||||
BeforeEach(func() {
|
||||
|
@ -71,83 +70,67 @@ var _ = Describe("Stream Flow controller", func() {
|
|||
|
||||
It("updates the highestReceived", func() {
|
||||
controller.highestReceived = 1337
|
||||
err := controller.UpdateHighestReceived(1338, false)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(controller.UpdateHighestReceived(1338, false)).To(Succeed())
|
||||
Expect(controller.highestReceived).To(Equal(protocol.ByteCount(1338)))
|
||||
})
|
||||
|
||||
It("informs the connection flow controller about received data", func() {
|
||||
controller.highestReceived = 10
|
||||
controller.connection.(*connectionFlowController).highestReceived = 100
|
||||
err := controller.UpdateHighestReceived(20, false)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(controller.UpdateHighestReceived(20, false)).To(Succeed())
|
||||
Expect(controller.connection.(*connectionFlowController).highestReceived).To(Equal(protocol.ByteCount(100 + 10)))
|
||||
})
|
||||
|
||||
It("does not decrease the highestReceived", func() {
|
||||
controller.highestReceived = 1337
|
||||
err := controller.UpdateHighestReceived(1000, false)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(controller.UpdateHighestReceived(1000, false)).To(Succeed())
|
||||
Expect(controller.highestReceived).To(Equal(protocol.ByteCount(1337)))
|
||||
})
|
||||
|
||||
It("does nothing when setting the same byte offset", func() {
|
||||
controller.highestReceived = 1337
|
||||
err := controller.UpdateHighestReceived(1337, false)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(controller.UpdateHighestReceived(1337, false)).To(Succeed())
|
||||
})
|
||||
|
||||
It("does not give a flow control violation when using the window completely", func() {
|
||||
controller.connection.(*connectionFlowController).receiveWindow = receiveWindow
|
||||
err := controller.UpdateHighestReceived(receiveWindow, false)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(controller.UpdateHighestReceived(receiveWindow, false)).To(Succeed())
|
||||
})
|
||||
|
||||
It("detects a flow control violation", func() {
|
||||
err := controller.UpdateHighestReceived(receiveWindow+1, false)
|
||||
Expect(err).To(MatchError("FlowControlReceivedTooMuchData: Received 10001 bytes on stream 10, allowed 10000 bytes"))
|
||||
Expect(controller.UpdateHighestReceived(receiveWindow+1, false)).To(MatchError("FlowControlReceivedTooMuchData: Received 0x10001 bytes on stream 10, allowed 0x10000 bytes"))
|
||||
})
|
||||
|
||||
It("accepts a final offset higher than the highest received", func() {
|
||||
controller.highestReceived = 100
|
||||
err := controller.UpdateHighestReceived(101, true)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(controller.UpdateHighestReceived(100, false)).To(Succeed())
|
||||
Expect(controller.UpdateHighestReceived(101, true)).To(Succeed())
|
||||
Expect(controller.highestReceived).To(Equal(protocol.ByteCount(101)))
|
||||
})
|
||||
|
||||
It("errors when receiving a final offset smaller than the highest offset received so far", func() {
|
||||
controller.highestReceived = 100
|
||||
err := controller.UpdateHighestReceived(99, true)
|
||||
Expect(err).To(MatchError(qerr.StreamDataAfterTermination))
|
||||
controller.UpdateHighestReceived(0x100, false)
|
||||
Expect(controller.UpdateHighestReceived(0xff, true)).To(MatchError("StreamDataAfterTermination: Received final offset 0xff for stream 10, but already received offset 0x100 before"))
|
||||
})
|
||||
|
||||
It("accepts delayed data after receiving a final offset", func() {
|
||||
err := controller.UpdateHighestReceived(300, true)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = controller.UpdateHighestReceived(250, false)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(controller.UpdateHighestReceived(300, true)).To(Succeed())
|
||||
Expect(controller.UpdateHighestReceived(250, false)).To(Succeed())
|
||||
})
|
||||
|
||||
It("errors when receiving a higher offset after receiving a final offset", func() {
|
||||
err := controller.UpdateHighestReceived(200, true)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = controller.UpdateHighestReceived(250, false)
|
||||
Expect(err).To(MatchError(qerr.StreamDataAfterTermination))
|
||||
Expect(controller.UpdateHighestReceived(0x200, true)).To(Succeed())
|
||||
Expect(controller.UpdateHighestReceived(0x250, false)).To(MatchError("StreamDataAfterTermination: Received offset 0x250 for stream 10. Final offset was already received at 0x200"))
|
||||
})
|
||||
|
||||
It("accepts duplicate final offsets", func() {
|
||||
err := controller.UpdateHighestReceived(200, true)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = controller.UpdateHighestReceived(200, true)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(controller.UpdateHighestReceived(200, true)).To(Succeed())
|
||||
Expect(controller.UpdateHighestReceived(200, true)).To(Succeed())
|
||||
Expect(controller.highestReceived).To(Equal(protocol.ByteCount(200)))
|
||||
})
|
||||
|
||||
It("errors when receiving inconsistent final offsets", func() {
|
||||
err := controller.UpdateHighestReceived(200, true)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = controller.UpdateHighestReceived(201, true)
|
||||
Expect(err).To(MatchError("StreamDataAfterTermination: Received inconsistent final offset for stream 10 (old: 200, new: 201 bytes)"))
|
||||
Expect(controller.UpdateHighestReceived(0x200, true)).To(Succeed())
|
||||
Expect(controller.UpdateHighestReceived(0x201, true)).To(MatchError("StreamDataAfterTermination: Received inconsistent final offset for stream 10 (old: 0x200, new: 0x201 bytes)"))
|
||||
})
|
||||
|
||||
It("tells the connection flow controller when a stream is abandoned", func() {
|
||||
|
|
|
@ -157,7 +157,7 @@ func (s *sendStream) popStreamFrameImpl(maxBytes protocol.ByteCount) (bool /* co
|
|||
s.mutex.Lock()
|
||||
defer s.mutex.Unlock()
|
||||
|
||||
if s.closeForShutdownErr != nil {
|
||||
if s.canceledWrite || s.closeForShutdownErr != nil {
|
||||
return false, nil, false
|
||||
}
|
||||
|
||||
|
@ -273,12 +273,6 @@ func (s *sendStream) cancelWriteImpl(errorCode protocol.ApplicationErrorCode, wr
|
|||
return true, nil
|
||||
}
|
||||
|
||||
func (s *sendStream) handleStopSendingFrame(frame *wire.StopSendingFrame) {
|
||||
if completed := s.handleStopSendingFrameImpl(frame); completed {
|
||||
s.sender.onStreamCompleted(s.streamID)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *sendStream) handleMaxStreamDataFrame(frame *wire.MaxStreamDataFrame) {
|
||||
s.mutex.Lock()
|
||||
hasStreamData := s.dataForWriting != nil
|
||||
|
@ -289,6 +283,12 @@ func (s *sendStream) handleMaxStreamDataFrame(frame *wire.MaxStreamDataFrame) {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *sendStream) handleStopSendingFrame(frame *wire.StopSendingFrame) {
|
||||
if completed := s.handleStopSendingFrameImpl(frame); completed {
|
||||
s.sender.onStreamCompleted(s.streamID)
|
||||
}
|
||||
}
|
||||
|
||||
// must be called after locking the mutex
|
||||
func (s *sendStream) handleStopSendingFrameImpl(frame *wire.StopSendingFrame) bool /*completed*/ {
|
||||
s.mutex.Lock()
|
||||
|
|
|
@ -549,12 +549,35 @@ var _ = Describe("Send Stream", func() {
|
|||
waitForWrite()
|
||||
frame, _ := str.popStreamFrame(50)
|
||||
Expect(frame).ToNot(BeNil())
|
||||
err := str.CancelWrite(1234)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(str.CancelWrite(1234)).To(Succeed())
|
||||
Eventually(writeReturned).Should(BeClosed())
|
||||
Expect(n).To(BeEquivalentTo(frame.DataLen()))
|
||||
})
|
||||
|
||||
It("doesn't pop STREAM frames after being canceled", func() {
|
||||
mockSender.EXPECT().onHasStreamData(streamID)
|
||||
mockSender.EXPECT().onStreamCompleted(streamID)
|
||||
mockSender.EXPECT().queueControlFrame(gomock.Any())
|
||||
mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount)
|
||||
mockFC.EXPECT().AddBytesSent(gomock.Any())
|
||||
writeReturned := make(chan struct{})
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
_, err := strWithTimeout.Write(bytes.Repeat([]byte{0}, 100))
|
||||
Expect(err).To(MatchError("Write on stream 1337 canceled with error code 1234"))
|
||||
close(writeReturned)
|
||||
}()
|
||||
waitForWrite()
|
||||
frame, hasMoreData := str.popStreamFrame(50)
|
||||
Expect(hasMoreData).To(BeTrue())
|
||||
Expect(frame).ToNot(BeNil())
|
||||
Expect(str.CancelWrite(1234)).To(Succeed())
|
||||
frame, hasMoreData = str.popStreamFrame(10)
|
||||
Expect(hasMoreData).To(BeFalse())
|
||||
Expect(frame).To(BeNil())
|
||||
Eventually(writeReturned).Should(BeClosed())
|
||||
})
|
||||
|
||||
It("cancels the context", func() {
|
||||
mockSender.EXPECT().queueControlFrame(gomock.Any())
|
||||
mockSender.EXPECT().onStreamCompleted(streamID)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue