diff --git a/stream.go b/stream.go index ce0ffdb2..9ee9b3a3 100644 --- a/stream.go +++ b/stream.go @@ -1,9 +1,11 @@ package quic import ( + "errors" "fmt" "io" "sync" + "time" "github.com/lucas-clemente/quic-go/flowcontrol" "github.com/lucas-clemente/quic-go/frames" @@ -40,8 +42,9 @@ type stream struct { // resetRemotely is set if RegisterRemoteError() is called resetRemotely utils.AtomicBool - frameQueue *streamFrameSorter - readChan chan struct{} + frameQueue *streamFrameSorter + readChan chan struct{} + readDeadline time.Time dataForWriting []byte finSent utils.AtomicBool @@ -51,6 +54,8 @@ type stream struct { flowControlManager flowcontrol.FlowControlManager } +var errDeadline = errors.New("deadline exceeded") + // newStream creates a new Stream func newStream(StreamID protocol.StreamID, onData func(), @@ -83,10 +88,10 @@ func (s *stream) Read(p []byte) (int, error) { for bytesRead < len(p) { s.mutex.Lock() frame := s.frameQueue.Head() - s.mutex.Unlock() - if frame == nil && bytesRead > 0 { - return bytesRead, s.err + err = s.err + s.mutex.Unlock() + return bytesRead, err } var err error @@ -96,15 +101,31 @@ func (s *stream) Read(p []byte) (int, error) { err = s.err break } + + deadline := s.readDeadline + if !deadline.IsZero() && !time.Now().Before(deadline) { + err = errDeadline + break + } + if frame != nil { s.readPosInFrame = int(s.readOffset - frame.Offset) break } - <-s.readChan + + s.mutex.Unlock() + if deadline.IsZero() { + <-s.readChan + } else { + select { + case <-s.readChan: + case <-time.After(deadline.Sub(time.Now())): + } + } s.mutex.Lock() frame = s.frameQueue.Head() - s.mutex.Unlock() } + s.mutex.Unlock() if err != nil { return bytesRead, err @@ -272,6 +293,21 @@ func (s *stream) signalWrite() { } } +// SetReadDeadline sets the deadline for future Read calls and +// any currently-blocked Read call. +// A zero value for t means Read will not time out. +func (s *stream) SetReadDeadline(t time.Time) error { + s.mutex.Lock() + oldDeadline := s.readDeadline + s.readDeadline = t + s.mutex.Unlock() + // if the new deadline is before the currently set deadline, wake up Read() + if t.Before(oldDeadline) { + s.signalRead() + } + return nil +} + // CloseRemote makes the stream receive a "virtual" FIN stream frame at a given offset func (s *stream) CloseRemote(offset protocol.ByteCount) { s.AddStreamFrame(&frames.StreamFrame{FinBit: true, Offset: offset}) diff --git a/stream_test.go b/stream_test.go index 4f4f273a..6b7cdf72 100644 --- a/stream_test.go +++ b/stream_test.go @@ -3,6 +3,7 @@ package quic import ( "errors" "io" + "runtime" "time" "github.com/lucas-clemente/quic-go/frames" @@ -238,6 +239,67 @@ var _ = Describe("Stream", func() { Expect(onDataCalled).To(BeTrue()) }) + Context("deadlines", func() { + It("returns an error when Read is called after the deadline", func() { + mockFcm.EXPECT().UpdateHighestReceived(streamID, protocol.ByteCount(6)).AnyTimes() + f := &frames.StreamFrame{Data: []byte("foobar")} + err := str.AddStreamFrame(f) + Expect(err).ToNot(HaveOccurred()) + str.SetReadDeadline(time.Now().Add(-time.Second)) + b := make([]byte, 6) + n, err := str.Read(b) + Expect(err).To(MatchError(errDeadline)) + Expect(n).To(BeZero()) + }) + + It("unblocks read after the deadline", func() { + deadline := time.Now().Add(200 * time.Millisecond) + str.SetReadDeadline(deadline) + b := make([]byte, 6) + n, err := str.Read(b) + Expect(err).To(MatchError(errDeadline)) + Expect(n).To(BeZero()) + Expect(time.Now()).To(BeTemporally("~", deadline, 50*time.Millisecond)) + }) + + It("doesn't unblock if the deadline is changed before the first one expires", func() { + deadline1 := time.Now().Add(200 * time.Millisecond) + deadline2 := time.Now().Add(400 * time.Millisecond) + str.SetReadDeadline(deadline1) + go func() { + defer GinkgoRecover() + time.Sleep(50 * time.Millisecond) + str.SetReadDeadline(deadline2) + // make sure that this was actually execute before the deadline expires + Expect(time.Now()).To(BeTemporally("<", deadline1)) + }() + runtime.Gosched() + b := make([]byte, 10) + n, err := str.Read(b) + Expect(err).To(MatchError(errDeadline)) + Expect(n).To(BeZero()) + Expect(time.Now()).To(BeTemporally("~", deadline2, 50*time.Millisecond)) + }) + + It("unblocks earlier, when a new deadline is set", func() { + deadline1 := time.Now().Add(1200 * time.Millisecond) + deadline2 := time.Now().Add(300 * time.Millisecond) + go func() { + defer GinkgoRecover() + time.Sleep(50 * time.Millisecond) + str.SetReadDeadline(deadline2) + // make sure that this was actually execute before the deadline expires + Expect(time.Now()).To(BeTemporally("<", deadline2)) + }() + str.SetReadDeadline(deadline1) + runtime.Gosched() + b := make([]byte, 10) + _, err := str.Read(b) + Expect(err).To(MatchError(errDeadline)) + Expect(time.Now()).To(BeTemporally("~", deadline2, 50*time.Millisecond)) + }) + }) + Context("closing", func() { Context("with FIN bit", func() { It("returns EOFs", func() {