mirror of
https://github.com/refraction-networking/uquic.git
synced 2025-04-04 12:47:36 +03:00
don't report blocked streams as active for sending data
This commit is contained in:
parent
c626137608
commit
5371f804f8
11 changed files with 161 additions and 61 deletions
|
@ -11,9 +11,8 @@ import (
|
||||||
|
|
||||||
type baseFlowController struct {
|
type baseFlowController struct {
|
||||||
// for sending data
|
// for sending data
|
||||||
bytesSent protocol.ByteCount
|
bytesSent protocol.ByteCount
|
||||||
sendWindow protocol.ByteCount
|
sendWindow protocol.ByteCount
|
||||||
lastBlockedAt protocol.ByteCount
|
|
||||||
|
|
||||||
// for receiving data
|
// for receiving data
|
||||||
mutex sync.RWMutex
|
mutex sync.RWMutex
|
||||||
|
@ -78,17 +77,6 @@ func (c *baseFlowController) getWindowUpdate() protocol.ByteCount {
|
||||||
return c.receiveWindow
|
return c.receiveWindow
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsBlocked says if it is newly blocked by flow control.
|
|
||||||
// For every offset, it only returns true once.
|
|
||||||
// If it is blocked, the offset is returned.
|
|
||||||
func (c *baseFlowController) IsNewlyBlocked() (bool, protocol.ByteCount) {
|
|
||||||
if c.sendWindowSize() != 0 || c.sendWindow == c.lastBlockedAt {
|
|
||||||
return false, 0
|
|
||||||
}
|
|
||||||
c.lastBlockedAt = c.sendWindow
|
|
||||||
return true, c.sendWindow
|
|
||||||
}
|
|
||||||
|
|
||||||
// maybeAdjustWindowSize increases the receiveWindowSize if we're sending updates too often.
|
// maybeAdjustWindowSize increases the receiveWindowSize if we're sending updates too often.
|
||||||
// For details about auto-tuning, see https://docs.google.com/document/d/1SExkMmGiz8VYzV3s9E35JQlJ73vhzCekKkDi85F1qCE/edit?usp=sharing.
|
// For details about auto-tuning, see https://docs.google.com/document/d/1SExkMmGiz8VYzV3s9E35JQlJ73vhzCekKkDi85F1qCE/edit?usp=sharing.
|
||||||
func (c *baseFlowController) maybeAdjustWindowSize() {
|
func (c *baseFlowController) maybeAdjustWindowSize() {
|
||||||
|
|
|
@ -49,29 +49,6 @@ var _ = Describe("Base Flow controller", func() {
|
||||||
controller.UpdateSendWindow(10)
|
controller.UpdateSendWindow(10)
|
||||||
Expect(controller.sendWindowSize()).To(Equal(protocol.ByteCount(20)))
|
Expect(controller.sendWindowSize()).To(Equal(protocol.ByteCount(20)))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("says when it's blocked", func() {
|
|
||||||
controller.UpdateSendWindow(100)
|
|
||||||
Expect(controller.IsNewlyBlocked()).To(BeFalse())
|
|
||||||
controller.AddBytesSent(100)
|
|
||||||
blocked, offset := controller.IsNewlyBlocked()
|
|
||||||
Expect(blocked).To(BeTrue())
|
|
||||||
Expect(offset).To(Equal(protocol.ByteCount(100)))
|
|
||||||
})
|
|
||||||
|
|
||||||
It("doesn't say that it's newly blocked multiple times for the same offset", func() {
|
|
||||||
controller.UpdateSendWindow(100)
|
|
||||||
controller.AddBytesSent(100)
|
|
||||||
newlyBlocked, offset := controller.IsNewlyBlocked()
|
|
||||||
Expect(newlyBlocked).To(BeTrue())
|
|
||||||
Expect(offset).To(Equal(protocol.ByteCount(100)))
|
|
||||||
newlyBlocked, _ = controller.IsNewlyBlocked()
|
|
||||||
Expect(newlyBlocked).To(BeFalse())
|
|
||||||
controller.UpdateSendWindow(150)
|
|
||||||
controller.AddBytesSent(150)
|
|
||||||
newlyBlocked, offset = controller.IsNewlyBlocked()
|
|
||||||
Expect(newlyBlocked).To(BeTrue())
|
|
||||||
})
|
|
||||||
})
|
})
|
||||||
|
|
||||||
Context("receive flow control", func() {
|
Context("receive flow control", func() {
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type connectionFlowController struct {
|
type connectionFlowController struct {
|
||||||
|
lastBlockedAt protocol.ByteCount
|
||||||
baseFlowController
|
baseFlowController
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -36,6 +37,17 @@ func (c *connectionFlowController) SendWindowSize() protocol.ByteCount {
|
||||||
return c.baseFlowController.sendWindowSize()
|
return c.baseFlowController.sendWindowSize()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IsNewlyBlocked says if it is newly blocked by flow control.
|
||||||
|
// For every offset, it only returns true once.
|
||||||
|
// If it is blocked, the offset is returned.
|
||||||
|
func (c *connectionFlowController) IsNewlyBlocked() (bool, protocol.ByteCount) {
|
||||||
|
if c.sendWindowSize() != 0 || c.sendWindow == c.lastBlockedAt {
|
||||||
|
return false, 0
|
||||||
|
}
|
||||||
|
c.lastBlockedAt = c.sendWindow
|
||||||
|
return true, c.sendWindow
|
||||||
|
}
|
||||||
|
|
||||||
// IncrementHighestReceived adds an increment to the highestReceived value
|
// IncrementHighestReceived adds an increment to the highestReceived value
|
||||||
func (c *connectionFlowController) IncrementHighestReceived(increment protocol.ByteCount) error {
|
func (c *connectionFlowController) IncrementHighestReceived(increment protocol.ByteCount) error {
|
||||||
c.mutex.Lock()
|
c.mutex.Lock()
|
||||||
|
|
|
@ -77,6 +77,31 @@ var _ = Describe("Connection Flow controller", func() {
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
Context("send flow control", func() {
|
||||||
|
It("says when it's blocked", func() {
|
||||||
|
controller.UpdateSendWindow(100)
|
||||||
|
Expect(controller.IsNewlyBlocked()).To(BeFalse())
|
||||||
|
controller.AddBytesSent(100)
|
||||||
|
blocked, offset := controller.IsNewlyBlocked()
|
||||||
|
Expect(blocked).To(BeTrue())
|
||||||
|
Expect(offset).To(Equal(protocol.ByteCount(100)))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("doesn't say that it's newly blocked multiple times for the same offset", func() {
|
||||||
|
controller.UpdateSendWindow(100)
|
||||||
|
controller.AddBytesSent(100)
|
||||||
|
newlyBlocked, offset := controller.IsNewlyBlocked()
|
||||||
|
Expect(newlyBlocked).To(BeTrue())
|
||||||
|
Expect(offset).To(Equal(protocol.ByteCount(100)))
|
||||||
|
newlyBlocked, _ = controller.IsNewlyBlocked()
|
||||||
|
Expect(newlyBlocked).To(BeFalse())
|
||||||
|
controller.UpdateSendWindow(150)
|
||||||
|
controller.AddBytesSent(150)
|
||||||
|
newlyBlocked, offset = controller.IsNewlyBlocked()
|
||||||
|
Expect(newlyBlocked).To(BeTrue())
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
Context("setting the minimum window size", func() {
|
Context("setting the minimum window size", func() {
|
||||||
var (
|
var (
|
||||||
oldWindowSize protocol.ByteCount
|
oldWindowSize protocol.ByteCount
|
||||||
|
|
|
@ -5,7 +5,6 @@ import "github.com/lucas-clemente/quic-go/internal/protocol"
|
||||||
type flowController interface {
|
type flowController interface {
|
||||||
// for sending
|
// for sending
|
||||||
SendWindowSize() protocol.ByteCount
|
SendWindowSize() protocol.ByteCount
|
||||||
IsNewlyBlocked() (bool, protocol.ByteCount)
|
|
||||||
UpdateSendWindow(protocol.ByteCount)
|
UpdateSendWindow(protocol.ByteCount)
|
||||||
AddBytesSent(protocol.ByteCount)
|
AddBytesSent(protocol.ByteCount)
|
||||||
// for receiving
|
// for receiving
|
||||||
|
@ -16,6 +15,8 @@ type flowController interface {
|
||||||
// A StreamFlowController is a flow controller for a QUIC stream.
|
// A StreamFlowController is a flow controller for a QUIC stream.
|
||||||
type StreamFlowController interface {
|
type StreamFlowController interface {
|
||||||
flowController
|
flowController
|
||||||
|
// for sending
|
||||||
|
IsBlocked() (bool, protocol.ByteCount)
|
||||||
// for receiving
|
// for receiving
|
||||||
// UpdateHighestReceived should be called when a new highest offset is received
|
// UpdateHighestReceived should be called when a new highest offset is received
|
||||||
// final has to be to true if this is the final offset of the stream, as contained in a STREAM frame with FIN bit, and the RST_STREAM frame
|
// final has to be to true if this is the final offset of the stream, as contained in a STREAM frame with FIN bit, and the RST_STREAM frame
|
||||||
|
@ -27,6 +28,8 @@ type StreamFlowController interface {
|
||||||
// The ConnectionFlowController is the flow controller for the connection.
|
// The ConnectionFlowController is the flow controller for the connection.
|
||||||
type ConnectionFlowController interface {
|
type ConnectionFlowController interface {
|
||||||
flowController
|
flowController
|
||||||
|
// for sending
|
||||||
|
IsNewlyBlocked() (bool, protocol.ByteCount)
|
||||||
}
|
}
|
||||||
|
|
||||||
type connectionFlowControllerI interface {
|
type connectionFlowControllerI interface {
|
||||||
|
|
|
@ -109,6 +109,15 @@ func (c *streamFlowController) SendWindowSize() protocol.ByteCount {
|
||||||
return window
|
return window
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IsBlocked says if it is blocked by stream-level flow control.
|
||||||
|
// If it is blocked, the offset is returned.
|
||||||
|
func (c *streamFlowController) IsBlocked() (bool, protocol.ByteCount) {
|
||||||
|
if c.sendWindowSize() != 0 {
|
||||||
|
return false, 0
|
||||||
|
}
|
||||||
|
return true, c.sendWindow
|
||||||
|
}
|
||||||
|
|
||||||
func (c *streamFlowController) HasWindowUpdate() bool {
|
func (c *streamFlowController) HasWindowUpdate() bool {
|
||||||
c.mutex.Lock()
|
c.mutex.Lock()
|
||||||
hasWindowUpdate := !c.receivedFinalOffset && c.hasWindowUpdate()
|
hasWindowUpdate := !c.receivedFinalOffset && c.hasWindowUpdate()
|
||||||
|
|
|
@ -247,7 +247,7 @@ var _ = Describe("Stream Flow controller", func() {
|
||||||
controller.AddBytesSent(50)
|
controller.AddBytesSent(50)
|
||||||
blocked, _ := controller.connection.IsNewlyBlocked()
|
blocked, _ := controller.connection.IsNewlyBlocked()
|
||||||
Expect(blocked).To(BeTrue())
|
Expect(blocked).To(BeTrue())
|
||||||
Expect(controller.IsNewlyBlocked()).To(BeFalse())
|
Expect(controller.IsBlocked()).To(BeFalse())
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
|
@ -78,17 +78,17 @@ func (mr *MockStreamFlowControllerMockRecorder) HasWindowUpdate() *gomock.Call {
|
||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasWindowUpdate", reflect.TypeOf((*MockStreamFlowController)(nil).HasWindowUpdate))
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasWindowUpdate", reflect.TypeOf((*MockStreamFlowController)(nil).HasWindowUpdate))
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsNewlyBlocked mocks base method
|
// IsBlocked mocks base method
|
||||||
func (m *MockStreamFlowController) IsNewlyBlocked() (bool, protocol.ByteCount) {
|
func (m *MockStreamFlowController) IsBlocked() (bool, protocol.ByteCount) {
|
||||||
ret := m.ctrl.Call(m, "IsNewlyBlocked")
|
ret := m.ctrl.Call(m, "IsBlocked")
|
||||||
ret0, _ := ret[0].(bool)
|
ret0, _ := ret[0].(bool)
|
||||||
ret1, _ := ret[1].(protocol.ByteCount)
|
ret1, _ := ret[1].(protocol.ByteCount)
|
||||||
return ret0, ret1
|
return ret0, ret1
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsNewlyBlocked indicates an expected call of IsNewlyBlocked
|
// IsBlocked indicates an expected call of IsBlocked
|
||||||
func (mr *MockStreamFlowControllerMockRecorder) IsNewlyBlocked() *gomock.Call {
|
func (mr *MockStreamFlowControllerMockRecorder) IsBlocked() *gomock.Call {
|
||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsNewlyBlocked", reflect.TypeOf((*MockStreamFlowController)(nil).IsNewlyBlocked))
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsBlocked", reflect.TypeOf((*MockStreamFlowController)(nil).IsBlocked))
|
||||||
}
|
}
|
||||||
|
|
||||||
// SendWindowSize mocks base method
|
// SendWindowSize mocks base method
|
||||||
|
|
|
@ -36,7 +36,8 @@ type sendStream struct {
|
||||||
writeDeadline time.Time
|
writeDeadline time.Time
|
||||||
|
|
||||||
flowController flowcontrol.StreamFlowController
|
flowController flowcontrol.StreamFlowController
|
||||||
version protocol.VersionNumber
|
|
||||||
|
version protocol.VersionNumber
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ SendStream = &sendStream{}
|
var _ SendStream = &sendStream{}
|
||||||
|
@ -141,16 +142,25 @@ func (s *sendStream) popStreamFrame(maxBytes protocol.ByteCount) (*wire.StreamFr
|
||||||
}
|
}
|
||||||
frame.Data, frame.FinBit = s.getDataForWriting(maxBytes - frameLen)
|
frame.Data, frame.FinBit = s.getDataForWriting(maxBytes - frameLen)
|
||||||
if len(frame.Data) == 0 && !frame.FinBit {
|
if len(frame.Data) == 0 && !frame.FinBit {
|
||||||
return nil, s.dataForWriting != nil
|
// this can happen if:
|
||||||
|
// - popStreamFrame is called but there's no data for writing
|
||||||
|
// - there's data for writing, but the stream is stream-level flow control blocked
|
||||||
|
// - there's data for writing, but the stream is connection-level flow control blocked
|
||||||
|
if s.dataForWriting == nil {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
isBlocked, _ := s.flowController.IsBlocked()
|
||||||
|
return nil, !isBlocked
|
||||||
}
|
}
|
||||||
if frame.FinBit {
|
if frame.FinBit {
|
||||||
s.finSent = true
|
s.finSent = true
|
||||||
} else if s.streamID != s.version.CryptoStreamID() { // TODO(#657): Flow control for the crypto stream
|
} else if s.streamID != s.version.CryptoStreamID() { // TODO(#657): Flow control for the crypto stream
|
||||||
if isBlocked, offset := s.flowController.IsNewlyBlocked(); isBlocked {
|
if isBlocked, offset := s.flowController.IsBlocked(); isBlocked {
|
||||||
s.sender.queueControlFrame(&wire.StreamBlockedFrame{
|
s.sender.queueControlFrame(&wire.StreamBlockedFrame{
|
||||||
StreamID: s.streamID,
|
StreamID: s.streamID,
|
||||||
Offset: offset,
|
Offset: offset,
|
||||||
})
|
})
|
||||||
|
return frame, false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return frame, s.dataForWriting != nil
|
return frame, s.dataForWriting != nil
|
||||||
|
@ -232,6 +242,11 @@ func (s *sendStream) handleStopSendingFrame(frame *wire.StopSendingFrame) {
|
||||||
|
|
||||||
func (s *sendStream) handleMaxStreamDataFrame(frame *wire.MaxStreamDataFrame) {
|
func (s *sendStream) handleMaxStreamDataFrame(frame *wire.MaxStreamDataFrame) {
|
||||||
s.flowController.UpdateSendWindow(frame.ByteOffset)
|
s.flowController.UpdateSendWindow(frame.ByteOffset)
|
||||||
|
s.mutex.Lock()
|
||||||
|
if s.dataForWriting != nil {
|
||||||
|
s.sender.onHasStreamData(s.streamID)
|
||||||
|
}
|
||||||
|
s.mutex.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// must be called after locking the mutex
|
// must be called after locking the mutex
|
||||||
|
|
|
@ -45,7 +45,7 @@ var _ = Describe("Send Stream", func() {
|
||||||
mockSender.EXPECT().onHasStreamData(streamID)
|
mockSender.EXPECT().onHasStreamData(streamID)
|
||||||
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999))
|
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999))
|
||||||
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6))
|
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6))
|
||||||
mockFC.EXPECT().IsNewlyBlocked()
|
mockFC.EXPECT().IsBlocked()
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
defer GinkgoRecover()
|
defer GinkgoRecover()
|
||||||
|
@ -74,7 +74,7 @@ var _ = Describe("Send Stream", func() {
|
||||||
frameHeaderLen := protocol.ByteCount(4)
|
frameHeaderLen := protocol.ByteCount(4)
|
||||||
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2)
|
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2)
|
||||||
mockFC.EXPECT().AddBytesSent(gomock.Any() /* protocol.ByteCount(3)*/).Times(2)
|
mockFC.EXPECT().AddBytesSent(gomock.Any() /* protocol.ByteCount(3)*/).Times(2)
|
||||||
mockFC.EXPECT().IsNewlyBlocked().Times(2)
|
mockFC.EXPECT().IsBlocked().Times(2)
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
defer GinkgoRecover()
|
defer GinkgoRecover()
|
||||||
|
@ -112,7 +112,7 @@ var _ = Describe("Send Stream", func() {
|
||||||
mockSender.EXPECT().onHasStreamData(streamID)
|
mockSender.EXPECT().onHasStreamData(streamID)
|
||||||
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2)
|
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2)
|
||||||
mockFC.EXPECT().AddBytesSent(gomock.Any()).Times(2)
|
mockFC.EXPECT().AddBytesSent(gomock.Any()).Times(2)
|
||||||
mockFC.EXPECT().IsNewlyBlocked().Times(2)
|
mockFC.EXPECT().IsBlocked().Times(2)
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
defer GinkgoRecover()
|
defer GinkgoRecover()
|
||||||
|
@ -142,7 +142,7 @@ var _ = Describe("Send Stream", func() {
|
||||||
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2)
|
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2)
|
||||||
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(1))
|
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(1))
|
||||||
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(2))
|
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(2))
|
||||||
mockFC.EXPECT().IsNewlyBlocked().Times(2)
|
mockFC.EXPECT().IsBlocked().Times(2)
|
||||||
s := []byte("foo")
|
s := []byte("foo")
|
||||||
go func() {
|
go func() {
|
||||||
defer GinkgoRecover()
|
defer GinkgoRecover()
|
||||||
|
@ -178,7 +178,27 @@ var _ = Describe("Send Stream", func() {
|
||||||
Expect(str.Context().Done()).To(BeClosed())
|
Expect(str.Context().Done()).To(BeClosed())
|
||||||
})
|
})
|
||||||
|
|
||||||
Context("adding BLOCKED", func() {
|
Context("flow control blocking", func() {
|
||||||
|
It("returns nil when it is blocked", func() {
|
||||||
|
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(0))
|
||||||
|
mockFC.EXPECT().IsBlocked().Return(true, protocol.ByteCount(10))
|
||||||
|
mockSender.EXPECT().onHasStreamData(streamID)
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer GinkgoRecover()
|
||||||
|
_, err := str.Write([]byte("foobar"))
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
Consistently(done).ShouldNot(BeClosed())
|
||||||
|
f, hasMoreData := str.popStreamFrame(1000)
|
||||||
|
Expect(f).To(BeNil())
|
||||||
|
Expect(hasMoreData).To(BeFalse())
|
||||||
|
// make the Write go routine return
|
||||||
|
str.closeForShutdown(nil)
|
||||||
|
Eventually(done).Should(BeClosed())
|
||||||
|
})
|
||||||
|
|
||||||
It("queues a BLOCKED frame if the stream is flow control blocked", func() {
|
It("queues a BLOCKED frame if the stream is flow control blocked", func() {
|
||||||
mockSender.EXPECT().onHasStreamData(streamID)
|
mockSender.EXPECT().onHasStreamData(streamID)
|
||||||
mockSender.EXPECT().queueControlFrame(&wire.StreamBlockedFrame{
|
mockSender.EXPECT().queueControlFrame(&wire.StreamBlockedFrame{
|
||||||
|
@ -188,7 +208,7 @@ var _ = Describe("Send Stream", func() {
|
||||||
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999))
|
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999))
|
||||||
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6))
|
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6))
|
||||||
// don't use offset 6 here, to make sure the BLOCKED frame contains the number returned by the flow controller
|
// don't use offset 6 here, to make sure the BLOCKED frame contains the number returned by the flow controller
|
||||||
mockFC.EXPECT().IsNewlyBlocked().Return(true, protocol.ByteCount(10))
|
mockFC.EXPECT().IsBlocked().Return(true, protocol.ByteCount(10))
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
defer GinkgoRecover()
|
defer GinkgoRecover()
|
||||||
|
@ -204,11 +224,34 @@ var _ = Describe("Send Stream", func() {
|
||||||
Eventually(done).Should(BeClosed())
|
Eventually(done).Should(BeClosed())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("says that it doesn't have any more data, when it is flow control blocked", func() {
|
||||||
|
mockSender.EXPECT().onHasStreamData(streamID)
|
||||||
|
mockSender.EXPECT().queueControlFrame(gomock.Any())
|
||||||
|
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999))
|
||||||
|
mockFC.EXPECT().AddBytesSent(gomock.Any())
|
||||||
|
mockFC.EXPECT().IsBlocked().Return(true, protocol.ByteCount(10))
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer GinkgoRecover()
|
||||||
|
_, err := str.Write(bytes.Repeat([]byte{0}, 100))
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
Eventually(func() *wire.StreamFrame {
|
||||||
|
f, hasMoreData := str.popStreamFrame(50)
|
||||||
|
Expect(hasMoreData).To(BeFalse())
|
||||||
|
return f
|
||||||
|
}).ShouldNot(BeNil())
|
||||||
|
// make the Write go routine return
|
||||||
|
str.closeForShutdown(nil)
|
||||||
|
Eventually(done).Should(BeClosed())
|
||||||
|
})
|
||||||
|
|
||||||
It("doesn't queue a BLOCKED frame if the stream is flow control blocked, but the frame popped has the FIN bit set", func() {
|
It("doesn't queue a BLOCKED frame if the stream is flow control blocked, but the frame popped has the FIN bit set", func() {
|
||||||
mockSender.EXPECT().onHasStreamData(streamID).Times(2) // once for the Write, once for the Close
|
mockSender.EXPECT().onHasStreamData(streamID).Times(2) // once for the Write, once for the Close
|
||||||
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999))
|
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999))
|
||||||
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6))
|
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6))
|
||||||
// don't EXPECT a call to mockFC.IsNewlyBlocked
|
// don't EXPECT a call to mockFC.IsBlocked
|
||||||
// don't EXPECT a call to mockSender.queueControlFrame
|
// don't EXPECT a call to mockSender.queueControlFrame
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -251,7 +294,7 @@ var _ = Describe("Send Stream", func() {
|
||||||
mockSender.EXPECT().onHasStreamData(streamID)
|
mockSender.EXPECT().onHasStreamData(streamID)
|
||||||
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(10000)).AnyTimes()
|
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(10000)).AnyTimes()
|
||||||
mockFC.EXPECT().AddBytesSent(gomock.Any())
|
mockFC.EXPECT().AddBytesSent(gomock.Any())
|
||||||
mockFC.EXPECT().IsNewlyBlocked()
|
mockFC.EXPECT().IsBlocked()
|
||||||
deadline := time.Now().Add(scaleDuration(50 * time.Millisecond))
|
deadline := time.Now().Add(scaleDuration(50 * time.Millisecond))
|
||||||
str.SetWriteDeadline(deadline)
|
str.SetWriteDeadline(deadline)
|
||||||
var n int
|
var n int
|
||||||
|
@ -277,7 +320,7 @@ var _ = Describe("Send Stream", func() {
|
||||||
mockSender.EXPECT().onHasStreamData(streamID)
|
mockSender.EXPECT().onHasStreamData(streamID)
|
||||||
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(10000)).AnyTimes()
|
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(10000)).AnyTimes()
|
||||||
mockFC.EXPECT().AddBytesSent(gomock.Any())
|
mockFC.EXPECT().AddBytesSent(gomock.Any())
|
||||||
mockFC.EXPECT().IsNewlyBlocked()
|
mockFC.EXPECT().IsBlocked()
|
||||||
deadline := time.Now().Add(scaleDuration(50 * time.Millisecond))
|
deadline := time.Now().Add(scaleDuration(50 * time.Millisecond))
|
||||||
str.SetWriteDeadline(deadline)
|
str.SetWriteDeadline(deadline)
|
||||||
writeReturned := make(chan struct{})
|
writeReturned := make(chan struct{})
|
||||||
|
@ -359,7 +402,7 @@ var _ = Describe("Send Stream", func() {
|
||||||
frameHeaderLen := protocol.ByteCount(4)
|
frameHeaderLen := protocol.ByteCount(4)
|
||||||
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2)
|
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2)
|
||||||
mockFC.EXPECT().AddBytesSent(gomock.Any()).Times(2)
|
mockFC.EXPECT().AddBytesSent(gomock.Any()).Times(2)
|
||||||
mockFC.EXPECT().IsNewlyBlocked()
|
mockFC.EXPECT().IsBlocked()
|
||||||
str.dataForWriting = []byte("foobar")
|
str.dataForWriting = []byte("foobar")
|
||||||
Expect(str.Close()).To(Succeed())
|
Expect(str.Close()).To(Succeed())
|
||||||
f, _ := str.popStreamFrame(3 + frameHeaderLen)
|
f, _ := str.popStreamFrame(3 + frameHeaderLen)
|
||||||
|
@ -405,7 +448,7 @@ var _ = Describe("Send Stream", func() {
|
||||||
mockSender.EXPECT().onHasStreamData(streamID)
|
mockSender.EXPECT().onHasStreamData(streamID)
|
||||||
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999))
|
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999))
|
||||||
mockFC.EXPECT().AddBytesSent(gomock.Any())
|
mockFC.EXPECT().AddBytesSent(gomock.Any())
|
||||||
mockFC.EXPECT().IsNewlyBlocked()
|
mockFC.EXPECT().IsBlocked()
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
defer GinkgoRecover()
|
defer GinkgoRecover()
|
||||||
|
@ -432,6 +475,34 @@ var _ = Describe("Send Stream", func() {
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
Context("hanlding MAX_STREAM_DATA frames", func() {
|
||||||
|
It("informs the flow controller", func() {
|
||||||
|
mockFC.EXPECT().UpdateSendWindow(protocol.ByteCount(0x1337))
|
||||||
|
str.handleMaxStreamDataFrame(&wire.MaxStreamDataFrame{
|
||||||
|
StreamID: streamID,
|
||||||
|
ByteOffset: 0x1337,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
It("says when it has data for sending", func() {
|
||||||
|
mockFC.EXPECT().UpdateSendWindow(gomock.Any())
|
||||||
|
mockSender.EXPECT().onHasStreamData(streamID)
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer GinkgoRecover()
|
||||||
|
_, _ = str.Write([]byte("foobar"))
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
str.handleMaxStreamDataFrame(&wire.MaxStreamDataFrame{
|
||||||
|
StreamID: streamID,
|
||||||
|
ByteOffset: 42,
|
||||||
|
})
|
||||||
|
// make sure the Write go routine returns
|
||||||
|
str.closeForShutdown(nil)
|
||||||
|
Eventually(done).Should(BeClosed())
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
Context("stream cancelations", func() {
|
Context("stream cancelations", func() {
|
||||||
Context("canceling writing", func() {
|
Context("canceling writing", func() {
|
||||||
It("queues a RST_STREAM frame", func() {
|
It("queues a RST_STREAM frame", func() {
|
||||||
|
@ -450,7 +521,7 @@ var _ = Describe("Send Stream", func() {
|
||||||
mockSender.EXPECT().queueControlFrame(gomock.Any())
|
mockSender.EXPECT().queueControlFrame(gomock.Any())
|
||||||
mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount)
|
mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount)
|
||||||
mockFC.EXPECT().AddBytesSent(gomock.Any())
|
mockFC.EXPECT().AddBytesSent(gomock.Any())
|
||||||
mockFC.EXPECT().IsNewlyBlocked()
|
mockFC.EXPECT().IsBlocked()
|
||||||
writeReturned := make(chan struct{})
|
writeReturned := make(chan struct{})
|
||||||
var n int
|
var n int
|
||||||
go func() {
|
go func() {
|
||||||
|
|
|
@ -130,7 +130,7 @@ var _ = Describe("Stream", func() {
|
||||||
mockSender.EXPECT().onHasStreamData(streamID).Times(2) // once for the Write, once for the Close
|
mockSender.EXPECT().onHasStreamData(streamID).Times(2) // once for the Write, once for the Close
|
||||||
mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount).AnyTimes()
|
mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount).AnyTimes()
|
||||||
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6))
|
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6))
|
||||||
mockFC.EXPECT().IsNewlyBlocked()
|
mockFC.EXPECT().IsBlocked()
|
||||||
err := str.CancelRead(1234)
|
err := str.CancelRead(1234)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
writeReturned := make(chan struct{})
|
writeReturned := make(chan struct{})
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue