mirror of
https://github.com/refraction-networking/uquic.git
synced 2025-04-03 20:27:35 +03:00
parent
d547034ed3
commit
bdbf8d3ebe
3 changed files with 83 additions and 35 deletions
|
@ -17,6 +17,7 @@ type streamFramer struct {
|
|||
flowControlManager flowcontrol.FlowControlManager
|
||||
|
||||
retransmissionQueue []*frames.StreamFrame
|
||||
blockedFrameQueue []*frames.BlockedFrame
|
||||
}
|
||||
|
||||
func newStreamFramer(streams *map[protocol.StreamID]*stream, streamsMutex *sync.RWMutex, flowControlManager flowcontrol.FlowControlManager) *streamFramer {
|
||||
|
@ -95,6 +96,15 @@ func (f *streamFramer) PopStreamFrame(maxLen protocol.ByteCount) (*frames.Stream
|
|||
return f.maybePopNormalFrame(maxLen)
|
||||
}
|
||||
|
||||
func (f *streamFramer) PopBlockedFrame() *frames.BlockedFrame {
|
||||
if len(f.blockedFrameQueue) == 0 {
|
||||
return nil
|
||||
}
|
||||
frame := f.blockedFrameQueue[0]
|
||||
f.blockedFrameQueue = f.blockedFrameQueue[1:]
|
||||
return frame
|
||||
}
|
||||
|
||||
func (f *streamFramer) maybePopFrameForRetransmission(maxLen protocol.ByteCount) *frames.StreamFrame {
|
||||
if len(f.retransmissionQueue) == 0 {
|
||||
return nil
|
||||
|
@ -159,6 +169,18 @@ func (f *streamFramer) maybePopNormalFrame(maxBytes protocol.ByteCount) (*frames
|
|||
if err := f.flowControlManager.AddBytesSent(s.streamID, protocol.ByteCount(len(data))); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Finally, check if we are now FC blocked and should queue a BLOCKED frame
|
||||
individualFcOffset, _ := f.flowControlManager.SendWindowSize(s.streamID) // can never error
|
||||
if s.writeOffset == individualFcOffset {
|
||||
// We are now stream-level FC blocked
|
||||
f.blockedFrameQueue = append(f.blockedFrameQueue, &frames.BlockedFrame{StreamID: s.StreamID()})
|
||||
}
|
||||
if f.flowControlManager.RemainingConnectionWindowSize() == 0 {
|
||||
// We are now connection-level FC blocked
|
||||
f.blockedFrameQueue = append(f.blockedFrameQueue, &frames.BlockedFrame{StreamID: 0})
|
||||
}
|
||||
|
||||
return frame, nil
|
||||
}
|
||||
return nil, nil
|
||||
|
|
|
@ -395,39 +395,59 @@ var _ = Describe("Stream Framer", func() {
|
|||
Expect(frame).To(BeNil())
|
||||
})
|
||||
})
|
||||
|
||||
Context("BLOCKED frames", func() {
|
||||
BeforeEach(func() {
|
||||
fcm.remainingConnectionWindowSize = protocol.MaxByteCount
|
||||
})
|
||||
|
||||
It("Pop returns nil if no frame is queued", func() {
|
||||
Expect(framer.PopBlockedFrame()).To(BeNil())
|
||||
})
|
||||
|
||||
It("queues and pops BLOCKED frames for individually blocked streams", func() {
|
||||
fcm.sendWindowSizes[stream1.StreamID()] = 3
|
||||
stream1.dataForWriting = []byte("foo")
|
||||
_, err := framer.PopStreamFrame(1000)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
blockedFrame := framer.PopBlockedFrame()
|
||||
Expect(blockedFrame).ToNot(BeNil())
|
||||
Expect(blockedFrame.StreamID).To(Equal(stream1.StreamID()))
|
||||
Expect(framer.PopBlockedFrame()).To(BeNil())
|
||||
})
|
||||
|
||||
It("queues and pops BLOCKED frames for connection blocked streams", func() {
|
||||
fcm.remainingConnectionWindowSize = 3
|
||||
fcm.streamsContributing = []protocol.StreamID{stream1.StreamID()}
|
||||
stream1.dataForWriting = []byte("foo")
|
||||
_, err := framer.PopStreamFrame(1000)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
blockedFrame := framer.PopBlockedFrame()
|
||||
Expect(blockedFrame).ToNot(BeNil())
|
||||
Expect(blockedFrame.StreamID).To(BeZero())
|
||||
Expect(framer.PopBlockedFrame()).To(BeNil())
|
||||
})
|
||||
|
||||
It("does not queue BLOCKED frames for non-contributing streams", func() {
|
||||
fcm.remainingConnectionWindowSize = 3
|
||||
stream1.dataForWriting = []byte("foo")
|
||||
_, err := framer.PopStreamFrame(1000)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(framer.PopBlockedFrame()).To(BeNil())
|
||||
})
|
||||
|
||||
It("does not queue BLOCKED frames twice", func() {
|
||||
fcm.sendWindowSizes[stream1.StreamID()] = 3
|
||||
stream1.dataForWriting = []byte("foobar")
|
||||
_, err := framer.PopStreamFrame(1000)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
frame, err := framer.PopStreamFrame(1000)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(frame).To(BeNil())
|
||||
blockedFrame := framer.PopBlockedFrame()
|
||||
Expect(blockedFrame).ToNot(BeNil())
|
||||
Expect(blockedFrame.StreamID).To(Equal(stream1.StreamID()))
|
||||
Expect(framer.PopBlockedFrame()).To(BeNil())
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
// Old stream tests
|
||||
|
||||
// PContext("Blocked streams", func() {
|
||||
// It("notifies the session when a stream is flow control blocked", func() {
|
||||
// updated, err := str.flowControlManager.UpdateWindow(str.streamID, 1337)
|
||||
// Expect(err).ToNot(HaveOccurred())
|
||||
// Expect(updated).To(BeTrue())
|
||||
// str.flowControlManager.AddBytesSent(str.streamID, 1337)
|
||||
// str.maybeTriggerBlocked()
|
||||
// Expect(handler.receivedBlockedCalled).To(BeTrue())
|
||||
// Expect(handler.receivedBlockedForStream).To(Equal(str.streamID))
|
||||
// })
|
||||
//
|
||||
// It("notifies the session as soon as a stream is reaching the end of the window", func() {
|
||||
// updated, err := str.flowControlManager.UpdateWindow(str.streamID, 4)
|
||||
// Expect(err).ToNot(HaveOccurred())
|
||||
// Expect(updated).To(BeTrue())
|
||||
// str.Write([]byte{0xDE, 0xCA, 0xFB, 0xAD})
|
||||
// Expect(handler.receivedBlockedCalled).To(BeTrue())
|
||||
// Expect(handler.receivedBlockedForStream).To(Equal(str.streamID))
|
||||
// })
|
||||
//
|
||||
// It("notifies the session as soon as a stream is flow control blocked", func() {
|
||||
// updated, err := str.flowControlManager.UpdateWindow(str.streamID, 2)
|
||||
// Expect(err).ToNot(HaveOccurred())
|
||||
// Expect(updated).To(BeTrue())
|
||||
// go func() {
|
||||
// str.Write([]byte{0xDE, 0xCA, 0xFB, 0xAD})
|
||||
// }()
|
||||
// time.Sleep(time.Millisecond)
|
||||
// Expect(handler.receivedBlockedCalled).To(BeTrue())
|
||||
// Expect(handler.receivedBlockedForStream).To(Equal(str.streamID))
|
||||
// })
|
||||
// })
|
||||
|
|
|
@ -75,6 +75,12 @@ func (m *mockFlowControlHandler) UpdateHighestReceived(streamID protocol.StreamI
|
|||
|
||||
func (m *mockFlowControlHandler) AddBytesSent(streamID protocol.StreamID, n protocol.ByteCount) error {
|
||||
m.bytesSent += n
|
||||
for _, s := range m.streamsContributing {
|
||||
if s == streamID {
|
||||
m.remainingConnectionWindowSize -= n
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue