diff --git a/connection.go b/connection.go index b62f8be5..082e95d7 100644 --- a/connection.go +++ b/connection.go @@ -2359,7 +2359,7 @@ func (s *connection) SendDatagram(p []byte) error { } f.Data = make([]byte, len(p)) copy(f.Data, p) - return s.datagramQueue.AddAndWait(f) + return s.datagramQueue.Add(f) } func (s *connection) ReceiveDatagram(ctx context.Context) ([]byte, error) { diff --git a/datagram_queue.go b/datagram_queue.go index ca80d404..43026b25 100644 --- a/datagram_queue.go +++ b/datagram_queue.go @@ -4,14 +4,19 @@ import ( "context" "sync" - "github.com/quic-go/quic-go/internal/protocol" "github.com/quic-go/quic-go/internal/utils" "github.com/quic-go/quic-go/internal/wire" ) +const ( + maxDatagramSendQueueLen = 32 + maxDatagramRcvQueueLen = 128 +) + type datagramQueue struct { - sendQueue chan *wire.DatagramFrame - nextFrame *wire.DatagramFrame + sendMx sync.Mutex + sendQueue []*wire.DatagramFrame // TODO: this could be a ring buffer + sent chan struct{} // used to notify Add that a datagram was dequeued rcvMx sync.Mutex rcvQueue [][]byte @@ -22,60 +27,68 @@ type datagramQueue struct { hasData func() - dequeued chan struct{} - logger utils.Logger } func newDatagramQueue(hasData func(), logger utils.Logger) *datagramQueue { return &datagramQueue{ - hasData: hasData, - sendQueue: make(chan *wire.DatagramFrame, 1), - rcvd: make(chan struct{}, 1), - dequeued: make(chan struct{}), - closed: make(chan struct{}), - logger: logger, + hasData: hasData, + rcvd: make(chan struct{}, 1), + sent: make(chan struct{}, 1), + closed: make(chan struct{}), + logger: logger, } } -// AddAndWait queues a new DATAGRAM frame for sending. -// It blocks until the frame has been dequeued. -func (h *datagramQueue) AddAndWait(f *wire.DatagramFrame) error { - select { - case h.sendQueue <- f: - h.hasData() - case <-h.closed: - return h.closeErr - } +// Add queues a new DATAGRAM frame for sending. +// Up to 32 DATAGRAM frames will be queued. +// Once that limit is reached, Add blocks until the queue size has reduced. +func (h *datagramQueue) Add(f *wire.DatagramFrame) error { + h.sendMx.Lock() - select { - case <-h.dequeued: - return nil - case <-h.closed: - return h.closeErr + for { + if len(h.sendQueue) < maxDatagramSendQueueLen { + h.sendQueue = append(h.sendQueue, f) + h.sendMx.Unlock() + h.hasData() + return nil + } + select { + case <-h.sent: // drain the queue so we don't loop immediately + default: + } + h.sendMx.Unlock() + select { + case <-h.closed: + return h.closeErr + case <-h.sent: + } + h.sendMx.Lock() } } // Peek gets the next DATAGRAM frame for sending. // If actually sent out, Pop needs to be called before the next call to Peek. func (h *datagramQueue) Peek() *wire.DatagramFrame { - if h.nextFrame != nil { - return h.nextFrame - } - select { - case h.nextFrame = <-h.sendQueue: - h.dequeued <- struct{}{} - default: + h.sendMx.Lock() + defer h.sendMx.Unlock() + if len(h.sendQueue) == 0 { return nil } - return h.nextFrame + return h.sendQueue[0] } func (h *datagramQueue) Pop() { - if h.nextFrame == nil { + h.sendMx.Lock() + defer h.sendMx.Unlock() + if len(h.sendQueue) == 0 { panic("datagramQueue BUG: Pop called for nil frame") } - h.nextFrame = nil + h.sendQueue = h.sendQueue[1:] + select { + case h.sent <- struct{}{}: + default: + } } // HandleDatagramFrame handles a received DATAGRAM frame. @@ -84,7 +97,7 @@ func (h *datagramQueue) HandleDatagramFrame(f *wire.DatagramFrame) { copy(data, f.Data) var queued bool h.rcvMx.Lock() - if len(h.rcvQueue) < protocol.DatagramRcvQueueLen { + if len(h.rcvQueue) < maxDatagramRcvQueueLen { h.rcvQueue = append(h.rcvQueue, data) queued = true select { @@ -94,7 +107,7 @@ func (h *datagramQueue) HandleDatagramFrame(f *wire.DatagramFrame) { } h.rcvMx.Unlock() if !queued && h.logger.Debug() { - h.logger.Debugf("Discarding DATAGRAM frame (%d bytes payload)", len(f.Data)) + h.logger.Debugf("Discarding received DATAGRAM frame (%d bytes payload)", len(f.Data)) } } diff --git a/datagram_queue_test.go b/datagram_queue_test.go index de3f8f57..fe8db59f 100644 --- a/datagram_queue_test.go +++ b/datagram_queue_test.go @@ -3,6 +3,7 @@ package quic import ( "context" "errors" + "time" "github.com/quic-go/quic-go/internal/utils" "github.com/quic-go/quic-go/internal/wire" @@ -26,55 +27,65 @@ var _ = Describe("Datagram Queue", func() { }) It("queues a datagram", func() { - done := make(chan struct{}) frame := &wire.DatagramFrame{Data: []byte("foobar")} - go func() { - defer GinkgoRecover() - defer close(done) - Expect(queue.AddAndWait(frame)).To(Succeed()) - }() - - Eventually(queued).Should(HaveLen(1)) - Consistently(done).ShouldNot(BeClosed()) + Expect(queue.Add(frame)).To(Succeed()) + Expect(queued).To(HaveLen(1)) f := queue.Peek() Expect(f.Data).To(Equal([]byte("foobar"))) - Eventually(done).Should(BeClosed()) queue.Pop() Expect(queue.Peek()).To(BeNil()) }) - It("returns the same datagram multiple times, when Pop isn't called", func() { - sent := make(chan struct{}, 1) + It("blocks when the maximum number of datagrams have been queued", func() { + for i := 0; i < maxDatagramSendQueueLen; i++ { + Expect(queue.Add(&wire.DatagramFrame{Data: []byte{0}})).To(Succeed()) + } + errChan := make(chan error, 1) go func() { defer GinkgoRecover() - Expect(queue.AddAndWait(&wire.DatagramFrame{Data: []byte("foo")})).To(Succeed()) - sent <- struct{}{} - Expect(queue.AddAndWait(&wire.DatagramFrame{Data: []byte("bar")})).To(Succeed()) - sent <- struct{}{} + errChan <- queue.Add(&wire.DatagramFrame{Data: []byte("foobar")}) }() + Consistently(errChan, 50*time.Millisecond).ShouldNot(Receive()) + Expect(queue.Peek()).ToNot(BeNil()) + Consistently(errChan, 50*time.Millisecond).ShouldNot(Receive()) + queue.Pop() + Eventually(errChan).Should(Receive(BeNil())) + for i := 1; i < maxDatagramSendQueueLen; i++ { + queue.Pop() + } + f := queue.Peek() + Expect(f).ToNot(BeNil()) + Expect(f.Data).To(Equal([]byte("foobar"))) + }) - Eventually(queued).Should(HaveLen(1)) + It("returns the same datagram multiple times, when Pop isn't called", func() { + Expect(queue.Add(&wire.DatagramFrame{Data: []byte("foo")})).To(Succeed()) + Expect(queue.Add(&wire.DatagramFrame{Data: []byte("bar")})).To(Succeed()) + + Eventually(queued).Should(HaveLen(2)) f := queue.Peek() Expect(f.Data).To(Equal([]byte("foo"))) - Eventually(sent).Should(Receive()) Expect(queue.Peek()).To(Equal(f)) Expect(queue.Peek()).To(Equal(f)) queue.Pop() - Eventually(func() *wire.DatagramFrame { f = queue.Peek(); return f }).ShouldNot(BeNil()) f = queue.Peek() + Expect(f).ToNot(BeNil()) Expect(f.Data).To(Equal([]byte("bar"))) }) It("closes", func() { + for i := 0; i < maxDatagramSendQueueLen; i++ { + Expect(queue.Add(&wire.DatagramFrame{Data: []byte("foo")})).To(Succeed()) + } errChan := make(chan error, 1) go func() { defer GinkgoRecover() - errChan <- queue.AddAndWait(&wire.DatagramFrame{Data: []byte("foobar")}) + errChan <- queue.Add(&wire.DatagramFrame{Data: []byte("foo")}) }() - - Consistently(errChan).ShouldNot(Receive()) - queue.CloseWithError(errors.New("test error")) - Eventually(errChan).Should(Receive(MatchError("test error"))) + Consistently(errChan, 25*time.Millisecond).ShouldNot(Receive()) + testErr := errors.New("test error") + queue.CloseWithError(testErr) + Eventually(errChan).Should(Receive(MatchError(testErr))) }) }) diff --git a/internal/protocol/params.go b/internal/protocol/params.go index 28b6da7c..487cbc06 100644 --- a/internal/protocol/params.go +++ b/internal/protocol/params.go @@ -129,9 +129,6 @@ const MaxPostHandshakeCryptoFrameSize = 1000 // but must ensure that a maximum size ACK frame fits into one packet. const MaxAckFrameSize ByteCount = 1000 -// DatagramRcvQueueLen is the length of the receive queue for DATAGRAM frames (RFC 9221) -const DatagramRcvQueueLen = 128 - // MaxNumAckRanges is the maximum number of ACK ranges that we send in an ACK frame. // It also serves as a limit for the packet history. // If at any point we keep track of more ranges, old ranges are discarded. diff --git a/packet_packer_test.go b/packet_packer_test.go index 677760ad..5b227b73 100644 --- a/packet_packer_test.go +++ b/packet_packer_test.go @@ -602,7 +602,7 @@ var _ = Describe("Packet packer", func() { go func() { defer GinkgoRecover() defer close(done) - datagramQueue.AddAndWait(f) + datagramQueue.Add(f) }() // make sure the DATAGRAM has actually been queued time.Sleep(scaleDuration(20 * time.Millisecond)) @@ -630,7 +630,7 @@ var _ = Describe("Packet packer", func() { go func() { defer GinkgoRecover() defer close(done) - datagramQueue.AddAndWait(f) + datagramQueue.Add(f) }() // make sure the DATAGRAM has actually been queued time.Sleep(scaleDuration(20 * time.Millisecond)) @@ -659,7 +659,7 @@ var _ = Describe("Packet packer", func() { go func() { defer GinkgoRecover() defer close(done) - datagramQueue.AddAndWait(f) + datagramQueue.Add(f) }() // make sure the DATAGRAM has actually been queued time.Sleep(scaleDuration(20 * time.Millisecond))