replace the sync.Cond for stream.Read() by a channel

This commit is contained in:
Marten Seemann 2017-04-15 12:13:12 +07:00
parent bd693193ed
commit 5fbd52158f
2 changed files with 21 additions and 14 deletions

View file

@ -41,7 +41,7 @@ type stream struct {
resetRemotely utils.AtomicBool
frameQueue *streamFrameSorter
newFrameOrErrCond sync.Cond
readChan chan struct{}
dataForWriting []byte
finSent utils.AtomicBool
@ -62,8 +62,8 @@ func newStream(StreamID protocol.StreamID,
streamID: StreamID,
flowControlManager: flowControlManager,
frameQueue: newStreamFrameSorter(),
readChan: make(chan struct{}, 1),
}
s.newFrameOrErrCond.L = &s.mutex
s.doneWritingOrErrCond.L = &s.mutex
return s
}
@ -84,9 +84,9 @@ func (s *stream) Read(p []byte) (int, error) {
for bytesRead < len(p) {
s.mutex.Lock()
frame := s.frameQueue.Head()
s.mutex.Unlock()
if frame == nil && bytesRead > 0 {
s.mutex.Unlock()
return bytesRead, s.err
}
@ -101,10 +101,11 @@ func (s *stream) Read(p []byte) (int, error) {
s.readPosInFrame = int(s.readOffset - frame.Offset)
break
}
s.newFrameOrErrCond.Wait()
<-s.readChan
s.mutex.Lock()
frame = s.frameQueue.Head()
}
s.mutex.Unlock()
}
if err != nil {
return bytesRead, err
@ -250,10 +251,18 @@ func (s *stream) AddStreamFrame(frame *frames.StreamFrame) error {
if err != nil && err != errDuplicateStreamData {
return err
}
s.newFrameOrErrCond.Signal()
s.signalRead()
return nil
}
// signalRead performs a non-blocking send on the readChan
func (s *stream) signalRead() {
select {
case s.readChan <- struct{}{}:
default:
}
}
// CloseRemote makes the stream receive a "virtual" FIN stream frame at a given offset
func (s *stream) CloseRemote(offset protocol.ByteCount) {
s.AddStreamFrame(&frames.StreamFrame{FinBit: true, Offset: offset})
@ -267,7 +276,7 @@ func (s *stream) Cancel(err error) {
// errors must not be changed!
if s.err == nil {
s.err = err
s.newFrameOrErrCond.Signal()
s.signalRead()
s.doneWritingOrErrCond.Signal()
}
s.mutex.Unlock()
@ -283,7 +292,7 @@ func (s *stream) Reset(err error) {
// errors must not be changed!
if s.err == nil {
s.err = err
s.newFrameOrErrCond.Signal()
s.signalRead()
s.doneWritingOrErrCond.Signal()
}
if s.shouldSendReset() {

View file

@ -135,11 +135,9 @@ var _ = Describe("Stream", func() {
mockFcm.EXPECT().UpdateHighestReceived(streamID, protocol.ByteCount(2))
mockFcm.EXPECT().AddBytesRead(streamID, protocol.ByteCount(2))
go func() {
frame := frames.StreamFrame{
Offset: 0,
Data: []byte{0xDE, 0xAD},
}
time.Sleep(time.Millisecond)
defer GinkgoRecover()
frame := frames.StreamFrame{Data: []byte{0xDE, 0xAD}}
time.Sleep(10 * time.Millisecond)
err := str.AddStreamFrame(&frame)
Expect(err).ToNot(HaveOccurred())
}()