diff --git a/datagram_queue.go b/datagram_queue.go index 58aad3b8..59c7d069 100644 --- a/datagram_queue.go +++ b/datagram_queue.go @@ -1,6 +1,8 @@ package quic import ( + "sync" + "github.com/quic-go/quic-go/internal/protocol" "github.com/quic-go/quic-go/internal/utils" "github.com/quic-go/quic-go/internal/wire" @@ -9,7 +11,10 @@ import ( type datagramQueue struct { sendQueue chan *wire.DatagramFrame nextFrame *wire.DatagramFrame - rcvQueue chan []byte + + rcvMx sync.Mutex + rcvQueue [][]byte + rcvd chan struct{} // used to notify Receive that a new datagram was received closeErr error closed chan struct{} @@ -25,7 +30,7 @@ func newDatagramQueue(hasData func(), logger utils.Logger) *datagramQueue { return &datagramQueue{ hasData: hasData, sendQueue: make(chan *wire.DatagramFrame, 1), - rcvQueue: make(chan []byte, protocol.DatagramRcvQueueLen), + rcvd: make(chan struct{}, 1), dequeued: make(chan struct{}), closed: make(chan struct{}), logger: logger, @@ -76,20 +81,39 @@ func (h *datagramQueue) Pop() { func (h *datagramQueue) HandleDatagramFrame(f *wire.DatagramFrame) { data := make([]byte, len(f.Data)) copy(data, f.Data) - select { - case h.rcvQueue <- data: - default: + var queued bool + h.rcvMx.Lock() + if len(h.rcvQueue) < protocol.DatagramRcvQueueLen { + h.rcvQueue = append(h.rcvQueue, data) + queued = true + select { + case h.rcvd <- struct{}{}: + default: + } + } + h.rcvMx.Unlock() + if !queued && h.logger.Debug() { h.logger.Debugf("Discarding DATAGRAM frame (%d bytes payload)", len(f.Data)) } } // Receive gets a received DATAGRAM frame. func (h *datagramQueue) Receive() ([]byte, error) { - select { - case data := <-h.rcvQueue: - return data, nil - case <-h.closed: - return nil, h.closeErr + for { + h.rcvMx.Lock() + if len(h.rcvQueue) > 0 { + data := h.rcvQueue[0] + h.rcvQueue = h.rcvQueue[1:] + h.rcvMx.Unlock() + return data, nil + } + h.rcvMx.Unlock() + select { + case <-h.rcvd: + continue + case <-h.closed: + return nil, h.closeErr + } } }