From d9edacf7119e3e02119ccf79c0edb3336a5adc88 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Mon, 15 Oct 2018 18:23:01 +0100 Subject: [PATCH] use a time.Timer for read deadlines --- receive_stream.go | 31 ++++++++++++++++++++----------- receive_stream_test.go | 14 ++++++++++++++ 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/receive_stream.go b/receive_stream.go index 55c463cc..f16174f3 100644 --- a/receive_stream.go +++ b/receive_stream.go @@ -43,8 +43,9 @@ type receiveStream struct { canceledRead bool // set when CancelRead() is called resetRemotely bool // set when HandleRstStreamFrame() is called - readChan chan struct{} - readDeadline time.Time + readChan chan struct{} + deadline time.Time + deadlineTimer *time.Timer // initialized by SetReadDeadline() flowController flowcontrol.StreamFlowController version protocol.VersionNumber @@ -120,8 +121,7 @@ func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, err return false, bytesRead, s.resetRemotelyErr } - deadline := s.readDeadline - if !deadline.IsZero() && !time.Now().Before(deadline) { + if !s.deadline.IsZero() && !time.Now().Before(s.deadline) { return false, bytesRead, errDeadline } @@ -130,12 +130,12 @@ func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, err } s.mutex.Unlock() - if deadline.IsZero() { + if s.deadline.IsZero() { <-s.readChan } else { select { case <-s.readChan: - case <-time.After(time.Until(deadline)): + case <-s.deadlineTimer.C: } } s.mutex.Lock() @@ -272,13 +272,22 @@ func (s *receiveStream) onClose(offset protocol.ByteCount) { func (s *receiveStream) SetReadDeadline(t time.Time) error { s.mutex.Lock() - oldDeadline := s.readDeadline - s.readDeadline = t - s.mutex.Unlock() - // if the new deadline is before the currently set deadline, wake up Read() - if t.Before(oldDeadline) { + defer s.mutex.Unlock() + s.deadline = t + if s.deadline.IsZero() { // skip if there's no deadline to set s.signalRead() + return nil } + // Lazily initialize the deadline timer. + if s.deadlineTimer == nil { + s.deadlineTimer = time.NewTimer(time.Until(t)) + return nil + } + // reset the timer to the new deadline + if !s.deadlineTimer.Stop() { + <-s.deadlineTimer.C + } + s.deadlineTimer.Reset(time.Until(t)) return nil } diff --git a/receive_stream_test.go b/receive_stream_test.go index 1b3f5c35..6c9b3469 100644 --- a/receive_stream_test.go +++ b/receive_stream_test.go @@ -243,6 +243,20 @@ var _ = Describe("Receive Stream", func() { Expect(n).To(BeZero()) }) + It("unblocks when the deadline is changed to the past", func() { + str.SetReadDeadline(time.Now().Add(time.Hour)) + done := make(chan struct{}) + go func() { + defer GinkgoRecover() + _, err := str.Read(make([]byte, 6)) + Expect(err).To(MatchError(errDeadline)) + close(done) + }() + Consistently(done).ShouldNot(BeClosed()) + str.SetReadDeadline(time.Now().Add(-time.Hour)) + Eventually(done).Should(BeClosed()) + }) + It("unblocks after the deadline", func() { deadline := time.Now().Add(scaleDuration(50 * time.Millisecond)) str.SetReadDeadline(deadline)