remove stream from BlockManager when a WindowUpdate increases its offset

fixes #130
This commit is contained in:
Marten Seemann 2016-05-20 12:16:33 +07:00
parent 7f907a17c2
commit d571c0bfd5
7 changed files with 91 additions and 20 deletions

View file

@ -25,6 +25,13 @@ func (m *blockedManager) AddBlockedStream(streamID protocol.StreamID, offset pro
m.blockedStreams[streamID] = offset m.blockedStreams[streamID] = offset
} }
func (m *blockedManager) RemoveBlockedStream(streamID protocol.StreamID) {
m.mutex.Lock()
defer m.mutex.Unlock()
delete(m.blockedStreams, streamID)
}
func (m *blockedManager) GetBlockedFrame(streamID protocol.StreamID, offset protocol.ByteCount) *frames.BlockedFrame { func (m *blockedManager) GetBlockedFrame(streamID protocol.StreamID, offset protocol.ByteCount) *frames.BlockedFrame {
m.mutex.RLock() m.mutex.RLock()
defer m.mutex.RUnlock() defer m.mutex.RUnlock()

View file

@ -31,6 +31,17 @@ var _ = Describe("WindowUpdateManager", func() {
Expect(bm.GetBlockedFrame(1337, 0x1337)).To(BeNil()) Expect(bm.GetBlockedFrame(1337, 0x1337)).To(BeNil())
}) })
It("removes the blocked entry when the stream is not blocked anymore", func() {
bm.AddBlockedStream(1337, 0x1337)
bm.RemoveBlockedStream(1337)
Expect(bm.GetBlockedFrame(1337, 0x1337)).To(BeNil())
})
It("doesn't care if the stream was previously blocked when removing the block", func() {
bm.RemoveBlockedStream(1337)
Expect(bm.GetBlockedFrame(1337, 0x1337)).To(BeNil())
})
It("doesn't get a blocked frame for smaller offsets", func() { It("doesn't get a blocked frame for smaller offsets", func() {
bm.AddBlockedStream(1337, 0x1337) bm.AddBlockedStream(1337, 0x1337)
Expect(bm.GetBlockedFrame(1337, 0x1336)).To(BeNil()) Expect(bm.GetBlockedFrame(1337, 0x1336)).To(BeNil())

View file

@ -35,14 +35,14 @@ type packetPacker struct {
lastPacketNumber protocol.PacketNumber lastPacketNumber protocol.PacketNumber
} }
func newPacketPacker(connectionID protocol.ConnectionID, aead crypto.AEAD, sentPacketHandler ackhandler.SentPacketHandler, connectionParametersHandler *handshake.ConnectionParametersManager, version protocol.VersionNumber) *packetPacker { func newPacketPacker(connectionID protocol.ConnectionID, aead crypto.AEAD, sentPacketHandler ackhandler.SentPacketHandler, connectionParametersHandler *handshake.ConnectionParametersManager, blockedManager *blockedManager, version protocol.VersionNumber) *packetPacker {
return &packetPacker{ return &packetPacker{
aead: aead, aead: aead,
connectionID: connectionID, connectionID: connectionID,
connectionParametersManager: connectionParametersHandler, connectionParametersManager: connectionParametersHandler,
version: version, version: version,
sentPacketHandler: sentPacketHandler, sentPacketHandler: sentPacketHandler,
blockedManager: newBlockedManager(), blockedManager: blockedManager,
} }
} }
@ -57,6 +57,7 @@ func (p *packetPacker) AddHighPrioStreamFrame(f frames.StreamFrame) {
func (p *packetPacker) AddBlocked(streamID protocol.StreamID, byteOffset protocol.ByteCount) { func (p *packetPacker) AddBlocked(streamID protocol.StreamID, byteOffset protocol.ByteCount) {
// TODO: send out connection-level BlockedFrames at the right time // TODO: send out connection-level BlockedFrames at the right time
// see https://github.com/lucas-clemente/quic-go/issues/113 // see https://github.com/lucas-clemente/quic-go/issues/113
// TODO: remove this function completely once #113 is resolved
if streamID == 0 { if streamID == 0 {
p.controlFrames = append(p.controlFrames, &frames.BlockedFrame{StreamID: 0}) p.controlFrames = append(p.controlFrames, &frames.BlockedFrame{StreamID: 0})
} }

View file

@ -51,6 +51,7 @@ type Session struct {
receivedPacketHandler ackhandler.ReceivedPacketHandler receivedPacketHandler ackhandler.ReceivedPacketHandler
stopWaitingManager ackhandler.StopWaitingManager stopWaitingManager ackhandler.StopWaitingManager
windowUpdateManager *windowUpdateManager windowUpdateManager *windowUpdateManager
blockedManager *blockedManager
flowController flowcontrol.FlowController // connection level flow controller flowController flowcontrol.FlowController // connection level flow controller
@ -96,6 +97,7 @@ func newSession(conn connection, v protocol.VersionNumber, connectionID protocol
stopWaitingManager: stopWaitingManager, stopWaitingManager: stopWaitingManager,
flowController: flowcontrol.NewFlowController(0, connectionParametersManager), flowController: flowcontrol.NewFlowController(0, connectionParametersManager),
windowUpdateManager: newWindowUpdateManager(), windowUpdateManager: newWindowUpdateManager(),
blockedManager: newBlockedManager(),
receivedPackets: make(chan receivedPacket, protocol.MaxSessionUnprocessedPackets), receivedPackets: make(chan receivedPacket, protocol.MaxSessionUnprocessedPackets),
closeChan: make(chan struct{}, 1), closeChan: make(chan struct{}, 1),
sendingScheduled: make(chan struct{}, 1), sendingScheduled: make(chan struct{}, 1),
@ -113,7 +115,7 @@ func newSession(conn connection, v protocol.VersionNumber, connectionID protocol
return nil, err return nil, err
} }
session.packer = newPacketPacker(connectionID, session.cryptoSetup, session.sentPacketHandler, session.connectionParametersManager, v) session.packer = newPacketPacker(connectionID, session.cryptoSetup, session.sentPacketHandler, session.connectionParametersManager, session.blockedManager, v)
session.unpacker = &packetUnpacker{aead: session.cryptoSetup, version: v} session.unpacker = &packetUnpacker{aead: session.cryptoSetup, version: v}
return session, err return session, err
@ -302,7 +304,10 @@ func (s *Session) isValidStreamID(streamID protocol.StreamID) bool {
func (s *Session) handleWindowUpdateFrame(frame *frames.WindowUpdateFrame) error { func (s *Session) handleWindowUpdateFrame(frame *frames.WindowUpdateFrame) error {
if frame.StreamID == 0 { if frame.StreamID == 0 {
s.flowController.UpdateSendWindow(frame.ByteOffset) updated := s.flowController.UpdateSendWindow(frame.ByteOffset)
if updated {
s.blockedManager.RemoveBlockedStream(0)
}
s.streamsMutex.RLock() s.streamsMutex.RLock()
// tell all streams that the connection-level was updated // tell all streams that the connection-level was updated
for _, stream := range s.streams { for _, stream := range s.streams {
@ -322,7 +327,10 @@ func (s *Session) handleWindowUpdateFrame(frame *frames.WindowUpdateFrame) error
} }
s.streamsMutex.RUnlock() s.streamsMutex.RUnlock()
stream.UpdateSendFlowControlWindow(frame.ByteOffset) updated := stream.UpdateSendFlowControlWindow(frame.ByteOffset)
if updated {
s.blockedManager.RemoveBlockedStream(frame.StreamID)
}
} }
return nil return nil

View file

@ -381,26 +381,54 @@ var _ = Describe("Session", func() {
Expect(conn.written).To(HaveLen(int(protocol.WindowUpdateNumRepitions))) // no packet was sent Expect(conn.written).To(HaveLen(int(protocol.WindowUpdateNumRepitions))) // no packet was sent
}) })
It("queues a Blocked frames", func() {
len := 500
frame := frames.StreamFrame{
StreamID: 0x1337,
Data: bytes.Repeat([]byte{'f'}, len),
}
session.streamBlocked(0x1337, protocol.ByteCount(len))
session.packer.AddStreamFrame(frame)
err := session.sendPacket()
Expect(err).NotTo(HaveOccurred())
Expect(conn.written).To(HaveLen(1))
Expect(conn.written[0]).To(ContainSubstring(string([]byte{0x05, 0x37, 0x13, 0, 0})))
})
It("sends public reset", func() { It("sends public reset", func() {
err := session.sendPublicReset(1) err := session.sendPublicReset(1)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(conn.written).To(HaveLen(1)) Expect(conn.written).To(HaveLen(1))
Expect(conn.written[0]).To(ContainSubstring(string([]byte("PRST")))) Expect(conn.written[0]).To(ContainSubstring(string([]byte("PRST"))))
}) })
Context("Blocked", func() {
It("queues a Blocked frames", func() {
len := 500
frame := frames.StreamFrame{
StreamID: 0x1337,
Data: bytes.Repeat([]byte{'f'}, len),
}
session.streamBlocked(0x1337, protocol.ByteCount(len))
session.packer.AddStreamFrame(frame)
err := session.sendPacket()
Expect(err).NotTo(HaveOccurred())
Expect(conn.written).To(HaveLen(1))
Expect(conn.written[0]).To(ContainSubstring(string([]byte{0x05, 0x37, 0x13, 0, 0})))
})
It("does not send a blocked frame for a stream if a WindowUpdate arrived before", func() {
len := 500
_, err := session.OpenStream(0x1337)
Expect(err).ToNot(HaveOccurred())
session.streamBlocked(0x1337, protocol.ByteCount(len))
wuf := frames.WindowUpdateFrame{
StreamID: 0x1337,
ByteOffset: protocol.ByteCount(len * 2),
}
err = session.handleWindowUpdateFrame(&wuf)
Expect(err).ToNot(HaveOccurred())
Expect(session.blockedManager.GetBlockedFrame(0x1337, protocol.ByteCount(len))).To(BeNil())
})
It("does not send a blocked frame for the connection if a WindowUpdate arrived before", func() {
len := 500
session.streamBlocked(0, protocol.ByteCount(len))
wuf := frames.WindowUpdateFrame{
StreamID: 0,
ByteOffset: protocol.ByteCount(len * 2),
}
err := session.handleWindowUpdateFrame(&wuf)
Expect(err).ToNot(HaveOccurred())
Expect(session.blockedManager.GetBlockedFrame(0, protocol.ByteCount(len))).To(BeNil())
})
})
}) })
Context("scheduling sending", func() { Context("scheduling sending", func() {

View file

@ -161,10 +161,12 @@ func (s *stream) ConnectionFlowControlWindowUpdated() {
s.windowUpdateOrErrCond.Broadcast() s.windowUpdateOrErrCond.Broadcast()
} }
func (s *stream) UpdateSendFlowControlWindow(n protocol.ByteCount) { func (s *stream) UpdateSendFlowControlWindow(n protocol.ByteCount) bool {
if s.flowController.UpdateSendWindow(n) { if s.flowController.UpdateSendWindow(n) {
s.windowUpdateOrErrCond.Broadcast() s.windowUpdateOrErrCond.Broadcast()
return true
} }
return false
} }
func (s *stream) Write(p []byte) (int, error) { func (s *stream) Write(p []byte) (int, error) {

View file

@ -352,6 +352,20 @@ var _ = Describe("Stream", func() {
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
}) })
It("returns true when the flow control window was updated", func() {
updated := str.flowController.UpdateSendWindow(4)
Expect(updated).To(BeTrue())
updated = str.UpdateSendFlowControlWindow(5)
Expect(updated).To(BeTrue())
})
It("returns false when the flow control window was not updated", func() {
updated := str.flowController.UpdateSendWindow(4)
Expect(updated).To(BeTrue())
updated = str.UpdateSendFlowControlWindow(3)
Expect(updated).To(BeFalse())
})
It("waits for a stream flow control window update", func() { It("waits for a stream flow control window update", func() {
var b bool var b bool
updated := str.flowController.UpdateSendWindow(1) updated := str.flowController.UpdateSendWindow(1)