From 46b1d7a1fc34bbbe29d83815526a48722b957b96 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Thu, 24 Jan 2019 13:11:20 +0700 Subject: [PATCH] call the onStreamCompleted callback without holding the stream mutex --- receive_stream.go | 42 ++++++++++++++++++++++++++---------------- 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/receive_stream.go b/receive_stream.go index 59deb744..f166ee56 100644 --- a/receive_stream.go +++ b/receive_stream.go @@ -191,14 +191,17 @@ func (s *receiveStream) dequeueNextFrame() { } func (s *receiveStream) CancelRead(errorCode protocol.ApplicationErrorCode) { + if completed := s.cancelReadImpl(errorCode); completed { + s.streamCompleted() + } +} + +func (s *receiveStream) cancelReadImpl(errorCode protocol.ApplicationErrorCode) bool /* completed */ { s.mutex.Lock() defer s.mutex.Unlock() if s.finRead || s.canceledRead || s.resetRemotely { - return - } - if s.finalOffset != protocol.MaxByteCount { // final offset was already received - s.streamCompleted() + return false } s.canceledRead = true s.cancelReadErr = fmt.Errorf("Read on stream %d canceled with error code %d", s.streamID, errorCode) @@ -207,34 +210,37 @@ func (s *receiveStream) CancelRead(errorCode protocol.ApplicationErrorCode) { StreamID: s.streamID, ErrorCode: errorCode, }) + // We're done with this stream if the final offset was already received. + return s.finalOffset != protocol.MaxByteCount } func (s *receiveStream) handleStreamFrame(frame *wire.StreamFrame) error { - maxOffset := frame.Offset + frame.DataLen() + completed, err := s.handleStreamFrameImpl(frame) + if completed { + s.streamCompleted() + } + return err +} +func (s *receiveStream) handleStreamFrameImpl(frame *wire.StreamFrame) (bool /* completed */, error) { s.mutex.Lock() defer s.mutex.Unlock() + maxOffset := frame.Offset + frame.DataLen() if err := s.flowController.UpdateHighestReceived(maxOffset, frame.FinBit); err != nil { - return err + return false, err } if frame.FinBit { s.finalOffset = maxOffset } if s.canceledRead { - if frame.FinBit { - s.streamCompleted() - } - return nil + return frame.FinBit, nil } if err := s.frameQueue.Push(frame.Data, frame.Offset); err != nil { - return err - } - if frame.FinBit { - s.finalOffset = maxOffset + return false, err } s.signalRead() - return nil + return false, nil } func (s *receiveStream) handleResetStreamFrame(frame *wire.ResetStreamFrame) error { @@ -298,7 +304,11 @@ func (s *receiveStream) getWindowUpdate() protocol.ByteCount { } func (s *receiveStream) streamCompleted() { - if !s.finRead { + s.mutex.Lock() + finRead := s.finRead + s.mutex.Unlock() + + if !finRead { s.flowController.Abandon() } s.sender.onStreamCompleted(s.streamID)