From 93e724434b65aa807fa8c8c01bcefc19093a17ce Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Thu, 13 Feb 2020 15:22:15 +0700 Subject: [PATCH] make sure that all packets in the send queue are sent before closing --- send_queue.go | 38 ++++++++++++++++++++++++-------------- send_queue_test.go | 26 ++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 14 deletions(-) diff --git a/send_queue.go b/send_queue.go index 957296aa..d4992cfe 100644 --- a/send_queue.go +++ b/send_queue.go @@ -1,16 +1,18 @@ package quic type sendQueue struct { - queue chan *packedPacket - closeChan chan struct{} - conn connection + queue chan *packedPacket + closeCalled chan struct{} // runStopped when Close() is called + runStopped chan struct{} // runStopped when the run loop returns + conn connection } func newSendQueue(conn connection) *sendQueue { s := &sendQueue{ - conn: conn, - closeChan: make(chan struct{}), - queue: make(chan *packedPacket, 1), + conn: conn, + runStopped: make(chan struct{}), + closeCalled: make(chan struct{}), + queue: make(chan *packedPacket, 1), } return s } @@ -20,20 +22,28 @@ func (h *sendQueue) Send(p *packedPacket) { } func (h *sendQueue) Run() error { - var p *packedPacket + defer close(h.runStopped) + var shouldClose bool for { - select { - case <-h.closeChan: + if shouldClose && len(h.queue) == 0 { return nil - case p = <-h.queue: } - if err := h.conn.Write(p.raw); err != nil { - return err + select { + case <-h.closeCalled: + h.closeCalled = nil // prevent this case from being selected again + // make sure that all queued packets are actually sent out + shouldClose = true + case p := <-h.queue: + if err := h.conn.Write(p.raw); err != nil { + return err + } + p.buffer.Release() } - p.buffer.Release() } } func (h *sendQueue) Close() { - close(h.closeChan) + close(h.closeCalled) + // wait until the run loop returned + <-h.runStopped } diff --git a/send_queue_test.go b/send_queue_test.go index b245ec68..37b5a2e4 100644 --- a/send_queue_test.go +++ b/send_queue_test.go @@ -70,4 +70,30 @@ var _ = Describe("Send Queue", func() { q.Close() Eventually(done).Should(BeClosed()) }) + + It("blocks Close() until the packet has been sent out", func() { + written := make(chan []byte) + c.EXPECT().Write(gomock.Any()).Do(func(p []byte) { written <- p }) + done := make(chan struct{}) + go func() { + defer GinkgoRecover() + q.Run() + close(done) + }() + + q.Send(getPacket([]byte("foobar"))) + + closed := make(chan struct{}) + go func() { + defer GinkgoRecover() + q.Close() + close(closed) + }() + + Consistently(closed).ShouldNot(BeClosed()) + // now write the packet + Expect(written).To(Receive()) + Eventually(done).Should(BeClosed()) + Eventually(closed).Should(BeClosed()) + }) })