call the onStreamCompleted callback without holding the stream mutex

This commit is contained in:
Marten Seemann 2019-01-24 13:11:20 +07:00
parent 770beb55ab
commit 46b1d7a1fc

View file

@ -191,14 +191,17 @@ func (s *receiveStream) dequeueNextFrame() {
} }
func (s *receiveStream) CancelRead(errorCode protocol.ApplicationErrorCode) { func (s *receiveStream) CancelRead(errorCode protocol.ApplicationErrorCode) {
if completed := s.cancelReadImpl(errorCode); completed {
s.streamCompleted()
}
}
func (s *receiveStream) cancelReadImpl(errorCode protocol.ApplicationErrorCode) bool /* completed */ {
s.mutex.Lock() s.mutex.Lock()
defer s.mutex.Unlock() defer s.mutex.Unlock()
if s.finRead || s.canceledRead || s.resetRemotely { if s.finRead || s.canceledRead || s.resetRemotely {
return return false
}
if s.finalOffset != protocol.MaxByteCount { // final offset was already received
s.streamCompleted()
} }
s.canceledRead = true s.canceledRead = true
s.cancelReadErr = fmt.Errorf("Read on stream %d canceled with error code %d", s.streamID, errorCode) s.cancelReadErr = fmt.Errorf("Read on stream %d canceled with error code %d", s.streamID, errorCode)
@ -207,34 +210,37 @@ func (s *receiveStream) CancelRead(errorCode protocol.ApplicationErrorCode) {
StreamID: s.streamID, StreamID: s.streamID,
ErrorCode: errorCode, ErrorCode: errorCode,
}) })
// We're done with this stream if the final offset was already received.
return s.finalOffset != protocol.MaxByteCount
} }
func (s *receiveStream) handleStreamFrame(frame *wire.StreamFrame) error { func (s *receiveStream) handleStreamFrame(frame *wire.StreamFrame) error {
maxOffset := frame.Offset + frame.DataLen() completed, err := s.handleStreamFrameImpl(frame)
if completed {
s.streamCompleted()
}
return err
}
func (s *receiveStream) handleStreamFrameImpl(frame *wire.StreamFrame) (bool /* completed */, error) {
s.mutex.Lock() s.mutex.Lock()
defer s.mutex.Unlock() defer s.mutex.Unlock()
maxOffset := frame.Offset + frame.DataLen()
if err := s.flowController.UpdateHighestReceived(maxOffset, frame.FinBit); err != nil { if err := s.flowController.UpdateHighestReceived(maxOffset, frame.FinBit); err != nil {
return err return false, err
} }
if frame.FinBit { if frame.FinBit {
s.finalOffset = maxOffset s.finalOffset = maxOffset
} }
if s.canceledRead { if s.canceledRead {
if frame.FinBit { return frame.FinBit, nil
s.streamCompleted()
}
return nil
} }
if err := s.frameQueue.Push(frame.Data, frame.Offset); err != nil { if err := s.frameQueue.Push(frame.Data, frame.Offset); err != nil {
return err return false, err
}
if frame.FinBit {
s.finalOffset = maxOffset
} }
s.signalRead() s.signalRead()
return nil return false, nil
} }
func (s *receiveStream) handleResetStreamFrame(frame *wire.ResetStreamFrame) error { func (s *receiveStream) handleResetStreamFrame(frame *wire.ResetStreamFrame) error {
@ -298,7 +304,11 @@ func (s *receiveStream) getWindowUpdate() protocol.ByteCount {
} }
func (s *receiveStream) streamCompleted() { func (s *receiveStream) streamCompleted() {
if !s.finRead { s.mutex.Lock()
finRead := s.finRead
s.mutex.Unlock()
if !finRead {
s.flowController.Abandon() s.flowController.Abandon()
} }
s.sender.onStreamCompleted(s.streamID) s.sender.onStreamCompleted(s.streamID)