diff --git a/receive_stream.go b/receive_stream.go index f166ee56..de76335e 100644 --- a/receive_stream.go +++ b/receive_stream.go @@ -78,7 +78,10 @@ func (s *receiveStream) StreamID() protocol.StreamID { // Read implements io.Reader. It is not thread safe! func (s *receiveStream) Read(p []byte) (int, error) { + s.mutex.Lock() completed, n, err := s.readImpl(p) + s.mutex.Unlock() + if completed { s.streamCompleted() } @@ -86,9 +89,6 @@ func (s *receiveStream) Read(p []byte) (int, error) { } func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, error) { - s.mutex.Lock() - defer s.mutex.Unlock() - if s.finRead { return false, 0, io.EOF } @@ -191,15 +191,16 @@ func (s *receiveStream) dequeueNextFrame() { } func (s *receiveStream) CancelRead(errorCode protocol.ApplicationErrorCode) { - if completed := s.cancelReadImpl(errorCode); completed { + s.mutex.Lock() + completed := s.cancelReadImpl(errorCode) + s.mutex.Unlock() + + if completed { s.streamCompleted() } } func (s *receiveStream) cancelReadImpl(errorCode protocol.ApplicationErrorCode) bool /* completed */ { - s.mutex.Lock() - defer s.mutex.Unlock() - if s.finRead || s.canceledRead || s.resetRemotely { return false } @@ -215,7 +216,10 @@ func (s *receiveStream) cancelReadImpl(errorCode protocol.ApplicationErrorCode) } func (s *receiveStream) handleStreamFrame(frame *wire.StreamFrame) error { + s.mutex.Lock() completed, err := s.handleStreamFrameImpl(frame) + s.mutex.Unlock() + if completed { s.streamCompleted() } @@ -223,9 +227,6 @@ func (s *receiveStream) handleStreamFrame(frame *wire.StreamFrame) error { } func (s *receiveStream) handleStreamFrameImpl(frame *wire.StreamFrame) (bool /* completed */, error) { - s.mutex.Lock() - defer s.mutex.Unlock() - maxOffset := frame.Offset + frame.DataLen() if err := s.flowController.UpdateHighestReceived(maxOffset, frame.FinBit); err != nil { return false, err @@ -244,7 +245,10 @@ func (s *receiveStream) handleStreamFrameImpl(frame *wire.StreamFrame) (bool /* } func (s *receiveStream) handleResetStreamFrame(frame *wire.ResetStreamFrame) error { + s.mutex.Lock() completed, err := s.handleResetStreamFrameImpl(frame) + s.mutex.Unlock() + if completed { s.streamCompleted() } @@ -252,9 +256,6 @@ func (s *receiveStream) handleResetStreamFrame(frame *wire.ResetStreamFrame) err } func (s *receiveStream) handleResetStreamFrameImpl(frame *wire.ResetStreamFrame) (bool /*completed */, error) { - s.mutex.Lock() - defer s.mutex.Unlock() - if s.closedForShutdown { return false, nil } diff --git a/send_stream.go b/send_stream.go index 056fcf56..92387c86 100644 --- a/send_stream.go +++ b/send_stream.go @@ -146,7 +146,10 @@ func (s *sendStream) Write(p []byte) (int, error) { // popStreamFrame returns the next STREAM frame that is supposed to be sent on this stream // maxBytes is the maximum length this frame (including frame header) will have. func (s *sendStream) popStreamFrame(maxBytes protocol.ByteCount) (*wire.StreamFrame, bool /* has more data to send */) { + s.mutex.Lock() completed, frame, hasMoreData := s.popStreamFrameImpl(maxBytes) + s.mutex.Unlock() + if completed { s.sender.onStreamCompleted(s.streamID) } @@ -154,9 +157,6 @@ func (s *sendStream) popStreamFrame(maxBytes protocol.ByteCount) (*wire.StreamFr } func (s *sendStream) popStreamFrameImpl(maxBytes protocol.ByteCount) (bool /* completed */, *wire.StreamFrame, bool /* has more data to send */) { - s.mutex.Lock() - defer s.mutex.Unlock() - if s.canceledWrite || s.closeForShutdownErr != nil { return false, nil, false } @@ -273,6 +273,7 @@ func (s *sendStream) handleMaxStreamDataFrame(frame *wire.MaxStreamDataFrame) { s.mutex.Lock() hasStreamData := s.dataForWriting != nil s.mutex.Unlock() + s.flowController.UpdateSendWindow(frame.ByteOffset) if hasStreamData { s.sender.onHasStreamData(s.streamID) @@ -280,16 +281,17 @@ func (s *sendStream) handleMaxStreamDataFrame(frame *wire.MaxStreamDataFrame) { } func (s *sendStream) handleStopSendingFrame(frame *wire.StopSendingFrame) { - if completed := s.handleStopSendingFrameImpl(frame); completed { + s.mutex.Lock() + completed := s.handleStopSendingFrameImpl(frame) + s.mutex.Unlock() + + if completed { s.sender.onStreamCompleted(s.streamID) } } // must be called after locking the mutex func (s *sendStream) handleStopSendingFrameImpl(frame *wire.StopSendingFrame) bool /*completed*/ { - s.mutex.Lock() - defer s.mutex.Unlock() - writeErr := streamCanceledError{ errorCode: frame.ErrorCode, error: fmt.Errorf("Stream %d was reset with error code %d", s.streamID, frame.ErrorCode),