From bcc7baf111179581de3ce2f356986acf61ab3f33 Mon Sep 17 00:00:00 2001 From: Mathis Engelbart Date: Tue, 9 Mar 2021 18:17:38 +0100 Subject: [PATCH] add an internal queue to signal that a datagram frame has been dequeued --- datagram_queue.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/datagram_queue.go b/datagram_queue.go index 92b5c3b0..b1cbbf6d 100644 --- a/datagram_queue.go +++ b/datagram_queue.go @@ -15,14 +15,17 @@ type datagramQueue struct { hasData func() + dequeued chan struct{} + logger utils.Logger } func newDatagramQueue(hasData func(), logger utils.Logger) *datagramQueue { return &datagramQueue{ hasData: hasData, - sendQueue: make(chan *wire.DatagramFrame), + sendQueue: make(chan *wire.DatagramFrame, 1), rcvQueue: make(chan []byte, protocol.DatagramRcvQueueLen), + dequeued: make(chan struct{}), closed: make(chan struct{}), logger: logger, } @@ -31,9 +34,15 @@ func newDatagramQueue(hasData func(), logger utils.Logger) *datagramQueue { // AddAndWait queues a new DATAGRAM frame for sending. // It blocks until the frame has been dequeued. func (h *datagramQueue) AddAndWait(f *wire.DatagramFrame) error { - h.hasData() select { case h.sendQueue <- f: + h.hasData() + case <-h.closed: + return h.closeErr + } + + select { + case <-h.dequeued: return nil case <-h.closed: return h.closeErr @@ -44,6 +53,7 @@ func (h *datagramQueue) AddAndWait(f *wire.DatagramFrame) error { func (h *datagramQueue) Get() *wire.DatagramFrame { select { case f := <-h.sendQueue: + h.dequeued <- struct{}{} return f default: return nil