diff --git a/internal/flowcontrol/stream_flow_controller.go b/internal/flowcontrol/stream_flow_controller.go index 5e58b566..ffee741b 100644 --- a/internal/flowcontrol/stream_flow_controller.go +++ b/internal/flowcontrol/stream_flow_controller.go @@ -49,40 +49,42 @@ func NewStreamFlowController( } } -// UpdateHighestReceived updates the highestReceived value, if the byteOffset is higher -// it returns an ErrReceivedSmallerByteOffset if the received byteOffset is smaller than any byteOffset received before -func (c *streamFlowController) UpdateHighestReceived(byteOffset protocol.ByteCount, final bool) error { +// UpdateHighestReceived updates the highestReceived value, if the offset is higher. +func (c *streamFlowController) UpdateHighestReceived(offset protocol.ByteCount, final bool) error { c.mutex.Lock() defer c.mutex.Unlock() - // when receiving a final offset, check that this final offset is consistent with a final offset we might have received earlier - if final && c.receivedFinalOffset && byteOffset != c.highestReceived { - return qerr.Error(qerr.StreamDataAfterTermination, fmt.Sprintf("Received inconsistent final offset for stream %d (old: %d, new: %d bytes)", c.streamID, c.highestReceived, byteOffset)) - } - // if we already received a final offset, check that the offset in the STREAM frames is below the final offset - if c.receivedFinalOffset && byteOffset > c.highestReceived { - return qerr.StreamDataAfterTermination + // If the final offset for this stream is already known, check for consistency. + if c.receivedFinalOffset { + // If we receive another final offset, check that it's the same. + if final && offset != c.highestReceived { + return qerr.Error(qerr.StreamDataAfterTermination, fmt.Sprintf("Received inconsistent final offset for stream %d (old: %#x, new: %#x bytes)", c.streamID, c.highestReceived, offset)) + } + // Check that the offset is below the final offset. + if offset > c.highestReceived { + return qerr.Error(qerr.StreamDataAfterTermination, fmt.Sprintf("Received offset %#x for stream %d. Final offset was already received at %#x", offset, c.streamID, c.highestReceived)) + } } + if final { c.receivedFinalOffset = true } - if byteOffset == c.highestReceived { + if offset == c.highestReceived { return nil } - if byteOffset <= c.highestReceived { - // a STREAM_FRAME with a higher offset was received before. + // A higher offset was received before. + // This can happen due to reordering. + if offset <= c.highestReceived { if final { - // If the current byteOffset is smaller than the offset in that STREAM_FRAME, this STREAM_FRAME contained data after the end of the stream - return qerr.StreamDataAfterTermination + return qerr.Error(qerr.StreamDataAfterTermination, fmt.Sprintf("Received final offset %#x for stream %d, but already received offset %#x before", offset, c.streamID, c.highestReceived)) } - // this is a reordered STREAM_FRAME return nil } - increment := byteOffset - c.highestReceived - c.highestReceived = byteOffset + increment := offset - c.highestReceived + c.highestReceived = offset if c.checkFlowControlViolation() { - return qerr.Error(qerr.FlowControlReceivedTooMuchData, fmt.Sprintf("Received %d bytes on stream %d, allowed %d bytes", byteOffset, c.streamID, c.receiveWindow)) + return qerr.Error(qerr.FlowControlReceivedTooMuchData, fmt.Sprintf("Received %#x bytes on stream %d, allowed %#x bytes", offset, c.streamID, c.receiveWindow)) } return c.connection.IncrementHighestReceived(increment) } diff --git a/internal/flowcontrol/stream_flow_controller_test.go b/internal/flowcontrol/stream_flow_controller_test.go index 35e1d143..810c9ade 100644 --- a/internal/flowcontrol/stream_flow_controller_test.go +++ b/internal/flowcontrol/stream_flow_controller_test.go @@ -5,7 +5,6 @@ import ( "github.com/lucas-clemente/quic-go/internal/congestion" "github.com/lucas-clemente/quic-go/internal/protocol" - "github.com/lucas-clemente/quic-go/internal/qerr" "github.com/lucas-clemente/quic-go/internal/utils" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -64,7 +63,7 @@ var _ = Describe("Stream Flow controller", func() { Context("receiving data", func() { Context("registering received offsets", func() { - var receiveWindow protocol.ByteCount = 10000 + var receiveWindow protocol.ByteCount = 0x10000 var receiveWindowSize protocol.ByteCount = 600 BeforeEach(func() { @@ -74,83 +73,67 @@ var _ = Describe("Stream Flow controller", func() { It("updates the highestReceived", func() { controller.highestReceived = 1337 - err := controller.UpdateHighestReceived(1338, false) - Expect(err).ToNot(HaveOccurred()) + Expect(controller.UpdateHighestReceived(1338, false)).To(Succeed()) Expect(controller.highestReceived).To(Equal(protocol.ByteCount(1338))) }) It("informs the connection flow controller about received data", func() { controller.highestReceived = 10 controller.connection.(*connectionFlowController).highestReceived = 100 - err := controller.UpdateHighestReceived(20, false) - Expect(err).ToNot(HaveOccurred()) + Expect(controller.UpdateHighestReceived(20, false)).To(Succeed()) Expect(controller.connection.(*connectionFlowController).highestReceived).To(Equal(protocol.ByteCount(100 + 10))) }) It("does not decrease the highestReceived", func() { controller.highestReceived = 1337 - err := controller.UpdateHighestReceived(1000, false) - Expect(err).ToNot(HaveOccurred()) + Expect(controller.UpdateHighestReceived(1000, false)).To(Succeed()) Expect(controller.highestReceived).To(Equal(protocol.ByteCount(1337))) }) It("does nothing when setting the same byte offset", func() { controller.highestReceived = 1337 - err := controller.UpdateHighestReceived(1337, false) - Expect(err).ToNot(HaveOccurred()) + Expect(controller.UpdateHighestReceived(1337, false)).To(Succeed()) }) It("does not give a flow control violation when using the window completely", func() { controller.connection.(*connectionFlowController).receiveWindow = receiveWindow - err := controller.UpdateHighestReceived(receiveWindow, false) - Expect(err).ToNot(HaveOccurred()) + Expect(controller.UpdateHighestReceived(receiveWindow, false)).To(Succeed()) }) It("detects a flow control violation", func() { - err := controller.UpdateHighestReceived(receiveWindow+1, false) - Expect(err).To(MatchError("FlowControlReceivedTooMuchData: Received 10001 bytes on stream 10, allowed 10000 bytes")) + Expect(controller.UpdateHighestReceived(receiveWindow+1, false)).To(MatchError("FlowControlReceivedTooMuchData: Received 0x10001 bytes on stream 10, allowed 0x10000 bytes")) }) It("accepts a final offset higher than the highest received", func() { - controller.highestReceived = 100 - err := controller.UpdateHighestReceived(101, true) - Expect(err).ToNot(HaveOccurred()) + Expect(controller.UpdateHighestReceived(100, false)).To(Succeed()) + Expect(controller.UpdateHighestReceived(101, true)).To(Succeed()) Expect(controller.highestReceived).To(Equal(protocol.ByteCount(101))) }) It("errors when receiving a final offset smaller than the highest offset received so far", func() { - controller.highestReceived = 100 - err := controller.UpdateHighestReceived(99, true) - Expect(err).To(MatchError(qerr.StreamDataAfterTermination)) + controller.UpdateHighestReceived(0x100, false) + Expect(controller.UpdateHighestReceived(0xff, true)).To(MatchError("StreamDataAfterTermination: Received final offset 0xff for stream 10, but already received offset 0x100 before")) }) It("accepts delayed data after receiving a final offset", func() { - err := controller.UpdateHighestReceived(300, true) - Expect(err).ToNot(HaveOccurred()) - err = controller.UpdateHighestReceived(250, false) - Expect(err).ToNot(HaveOccurred()) + Expect(controller.UpdateHighestReceived(300, true)).To(Succeed()) + Expect(controller.UpdateHighestReceived(250, false)).To(Succeed()) }) It("errors when receiving a higher offset after receiving a final offset", func() { - err := controller.UpdateHighestReceived(200, true) - Expect(err).ToNot(HaveOccurred()) - err = controller.UpdateHighestReceived(250, false) - Expect(err).To(MatchError(qerr.StreamDataAfterTermination)) + Expect(controller.UpdateHighestReceived(0x200, true)).To(Succeed()) + Expect(controller.UpdateHighestReceived(0x250, false)).To(MatchError("StreamDataAfterTermination: Received offset 0x250 for stream 10. Final offset was already received at 0x200")) }) It("accepts duplicate final offsets", func() { - err := controller.UpdateHighestReceived(200, true) - Expect(err).ToNot(HaveOccurred()) - err = controller.UpdateHighestReceived(200, true) - Expect(err).ToNot(HaveOccurred()) + Expect(controller.UpdateHighestReceived(200, true)).To(Succeed()) + Expect(controller.UpdateHighestReceived(200, true)).To(Succeed()) Expect(controller.highestReceived).To(Equal(protocol.ByteCount(200))) }) It("errors when receiving inconsistent final offsets", func() { - err := controller.UpdateHighestReceived(200, true) - Expect(err).ToNot(HaveOccurred()) - err = controller.UpdateHighestReceived(201, true) - Expect(err).To(MatchError("StreamDataAfterTermination: Received inconsistent final offset for stream 10 (old: 200, new: 201 bytes)")) + Expect(controller.UpdateHighestReceived(0x200, true)).To(Succeed()) + Expect(controller.UpdateHighestReceived(0x201, true)).To(MatchError("StreamDataAfterTermination: Received inconsistent final offset for stream 10 (old: 0x200, new: 0x201 bytes)")) }) })