implement read deadlines

This commit is contained in:
Marten Seemann 2017-04-16 11:12:21 +07:00
parent a70ae86f5a
commit 5720e8af7d
2 changed files with 105 additions and 7 deletions

View file

@ -1,9 +1,11 @@
package quic
import (
"errors"
"fmt"
"io"
"sync"
"time"
"github.com/lucas-clemente/quic-go/flowcontrol"
"github.com/lucas-clemente/quic-go/frames"
@ -40,8 +42,9 @@ type stream struct {
// resetRemotely is set if RegisterRemoteError() is called
resetRemotely utils.AtomicBool
frameQueue *streamFrameSorter
readChan chan struct{}
frameQueue *streamFrameSorter
readChan chan struct{}
readDeadline time.Time
dataForWriting []byte
finSent utils.AtomicBool
@ -51,6 +54,8 @@ type stream struct {
flowControlManager flowcontrol.FlowControlManager
}
var errDeadline = errors.New("deadline exceeded")
// newStream creates a new Stream
func newStream(StreamID protocol.StreamID,
onData func(),
@ -83,10 +88,10 @@ func (s *stream) Read(p []byte) (int, error) {
for bytesRead < len(p) {
s.mutex.Lock()
frame := s.frameQueue.Head()
s.mutex.Unlock()
if frame == nil && bytesRead > 0 {
return bytesRead, s.err
err = s.err
s.mutex.Unlock()
return bytesRead, err
}
var err error
@ -96,15 +101,31 @@ func (s *stream) Read(p []byte) (int, error) {
err = s.err
break
}
deadline := s.readDeadline
if !deadline.IsZero() && !time.Now().Before(deadline) {
err = errDeadline
break
}
if frame != nil {
s.readPosInFrame = int(s.readOffset - frame.Offset)
break
}
<-s.readChan
s.mutex.Unlock()
if deadline.IsZero() {
<-s.readChan
} else {
select {
case <-s.readChan:
case <-time.After(deadline.Sub(time.Now())):
}
}
s.mutex.Lock()
frame = s.frameQueue.Head()
s.mutex.Unlock()
}
s.mutex.Unlock()
if err != nil {
return bytesRead, err
@ -272,6 +293,21 @@ func (s *stream) signalWrite() {
}
}
// SetReadDeadline sets the deadline for future Read calls and
// any currently-blocked Read call.
// A zero value for t means Read will not time out.
func (s *stream) SetReadDeadline(t time.Time) error {
s.mutex.Lock()
oldDeadline := s.readDeadline
s.readDeadline = t
s.mutex.Unlock()
// if the new deadline is before the currently set deadline, wake up Read()
if t.Before(oldDeadline) {
s.signalRead()
}
return nil
}
// CloseRemote makes the stream receive a "virtual" FIN stream frame at a given offset
func (s *stream) CloseRemote(offset protocol.ByteCount) {
s.AddStreamFrame(&frames.StreamFrame{FinBit: true, Offset: offset})

View file

@ -3,6 +3,7 @@ package quic
import (
"errors"
"io"
"runtime"
"time"
"github.com/lucas-clemente/quic-go/frames"
@ -238,6 +239,67 @@ var _ = Describe("Stream", func() {
Expect(onDataCalled).To(BeTrue())
})
Context("deadlines", func() {
It("returns an error when Read is called after the deadline", func() {
mockFcm.EXPECT().UpdateHighestReceived(streamID, protocol.ByteCount(6)).AnyTimes()
f := &frames.StreamFrame{Data: []byte("foobar")}
err := str.AddStreamFrame(f)
Expect(err).ToNot(HaveOccurred())
str.SetReadDeadline(time.Now().Add(-time.Second))
b := make([]byte, 6)
n, err := str.Read(b)
Expect(err).To(MatchError(errDeadline))
Expect(n).To(BeZero())
})
It("unblocks read after the deadline", func() {
deadline := time.Now().Add(200 * time.Millisecond)
str.SetReadDeadline(deadline)
b := make([]byte, 6)
n, err := str.Read(b)
Expect(err).To(MatchError(errDeadline))
Expect(n).To(BeZero())
Expect(time.Now()).To(BeTemporally("~", deadline, 50*time.Millisecond))
})
It("doesn't unblock if the deadline is changed before the first one expires", func() {
deadline1 := time.Now().Add(200 * time.Millisecond)
deadline2 := time.Now().Add(400 * time.Millisecond)
str.SetReadDeadline(deadline1)
go func() {
defer GinkgoRecover()
time.Sleep(50 * time.Millisecond)
str.SetReadDeadline(deadline2)
// make sure that this was actually execute before the deadline expires
Expect(time.Now()).To(BeTemporally("<", deadline1))
}()
runtime.Gosched()
b := make([]byte, 10)
n, err := str.Read(b)
Expect(err).To(MatchError(errDeadline))
Expect(n).To(BeZero())
Expect(time.Now()).To(BeTemporally("~", deadline2, 50*time.Millisecond))
})
It("unblocks earlier, when a new deadline is set", func() {
deadline1 := time.Now().Add(1200 * time.Millisecond)
deadline2 := time.Now().Add(300 * time.Millisecond)
go func() {
defer GinkgoRecover()
time.Sleep(50 * time.Millisecond)
str.SetReadDeadline(deadline2)
// make sure that this was actually execute before the deadline expires
Expect(time.Now()).To(BeTemporally("<", deadline2))
}()
str.SetReadDeadline(deadline1)
runtime.Gosched()
b := make([]byte, 10)
_, err := str.Read(b)
Expect(err).To(MatchError(errDeadline))
Expect(time.Now()).To(BeTemporally("~", deadline2, 50*time.Millisecond))
})
})
Context("closing", func() {
Context("with FIN bit", func() {
It("returns EOFs", func() {