diff --git a/datagram_queue.go b/datagram_queue.go index 92b5c3b0..b1cbbf6d 100644 --- a/datagram_queue.go +++ b/datagram_queue.go @@ -15,14 +15,17 @@ type datagramQueue struct { hasData func() + dequeued chan struct{} + logger utils.Logger } func newDatagramQueue(hasData func(), logger utils.Logger) *datagramQueue { return &datagramQueue{ hasData: hasData, - sendQueue: make(chan *wire.DatagramFrame), + sendQueue: make(chan *wire.DatagramFrame, 1), rcvQueue: make(chan []byte, protocol.DatagramRcvQueueLen), + dequeued: make(chan struct{}), closed: make(chan struct{}), logger: logger, } @@ -31,9 +34,15 @@ func newDatagramQueue(hasData func(), logger utils.Logger) *datagramQueue { // AddAndWait queues a new DATAGRAM frame for sending. // It blocks until the frame has been dequeued. func (h *datagramQueue) AddAndWait(f *wire.DatagramFrame) error { - h.hasData() select { case h.sendQueue <- f: + h.hasData() + case <-h.closed: + return h.closeErr + } + + select { + case <-h.dequeued: return nil case <-h.closed: return h.closeErr @@ -44,6 +53,7 @@ func (h *datagramQueue) AddAndWait(f *wire.DatagramFrame) error { func (h *datagramQueue) Get() *wire.DatagramFrame { select { case f := <-h.sendQueue: + h.dequeued <- struct{}{} return f default: return nil