add an internal queue to signal that a datagram frame has been dequeued

This commit is contained in:
Mathis Engelbart 2021-03-09 18:17:38 +01:00
parent eb3e100e80
commit bcc7baf111

View file

@ -15,14 +15,17 @@ type datagramQueue struct {
hasData func() hasData func()
dequeued chan struct{}
logger utils.Logger logger utils.Logger
} }
func newDatagramQueue(hasData func(), logger utils.Logger) *datagramQueue { func newDatagramQueue(hasData func(), logger utils.Logger) *datagramQueue {
return &datagramQueue{ return &datagramQueue{
hasData: hasData, hasData: hasData,
sendQueue: make(chan *wire.DatagramFrame), sendQueue: make(chan *wire.DatagramFrame, 1),
rcvQueue: make(chan []byte, protocol.DatagramRcvQueueLen), rcvQueue: make(chan []byte, protocol.DatagramRcvQueueLen),
dequeued: make(chan struct{}),
closed: make(chan struct{}), closed: make(chan struct{}),
logger: logger, logger: logger,
} }
@ -31,9 +34,15 @@ func newDatagramQueue(hasData func(), logger utils.Logger) *datagramQueue {
// AddAndWait queues a new DATAGRAM frame for sending. // AddAndWait queues a new DATAGRAM frame for sending.
// It blocks until the frame has been dequeued. // It blocks until the frame has been dequeued.
func (h *datagramQueue) AddAndWait(f *wire.DatagramFrame) error { func (h *datagramQueue) AddAndWait(f *wire.DatagramFrame) error {
h.hasData()
select { select {
case h.sendQueue <- f: case h.sendQueue <- f:
h.hasData()
case <-h.closed:
return h.closeErr
}
select {
case <-h.dequeued:
return nil return nil
case <-h.closed: case <-h.closed:
return h.closeErr return h.closeErr
@ -44,6 +53,7 @@ func (h *datagramQueue) AddAndWait(f *wire.DatagramFrame) error {
func (h *datagramQueue) Get() *wire.DatagramFrame { func (h *datagramQueue) Get() *wire.DatagramFrame {
select { select {
case f := <-h.sendQueue: case f := <-h.sendQueue:
h.dequeued <- struct{}{}
return f return f
default: default:
return nil return nil