mirror of
https://github.com/refraction-networking/uquic.git
synced 2025-04-04 20:57:36 +03:00
remove some defer statements in the stream
This commit is contained in:
parent
46b1d7a1fc
commit
2787a6051a
2 changed files with 23 additions and 20 deletions
|
@ -78,7 +78,10 @@ func (s *receiveStream) StreamID() protocol.StreamID {
|
||||||
|
|
||||||
// Read implements io.Reader. It is not thread safe!
|
// Read implements io.Reader. It is not thread safe!
|
||||||
func (s *receiveStream) Read(p []byte) (int, error) {
|
func (s *receiveStream) Read(p []byte) (int, error) {
|
||||||
|
s.mutex.Lock()
|
||||||
completed, n, err := s.readImpl(p)
|
completed, n, err := s.readImpl(p)
|
||||||
|
s.mutex.Unlock()
|
||||||
|
|
||||||
if completed {
|
if completed {
|
||||||
s.streamCompleted()
|
s.streamCompleted()
|
||||||
}
|
}
|
||||||
|
@ -86,9 +89,6 @@ func (s *receiveStream) Read(p []byte) (int, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, error) {
|
func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, error) {
|
||||||
s.mutex.Lock()
|
|
||||||
defer s.mutex.Unlock()
|
|
||||||
|
|
||||||
if s.finRead {
|
if s.finRead {
|
||||||
return false, 0, io.EOF
|
return false, 0, io.EOF
|
||||||
}
|
}
|
||||||
|
@ -191,15 +191,16 @@ func (s *receiveStream) dequeueNextFrame() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *receiveStream) CancelRead(errorCode protocol.ApplicationErrorCode) {
|
func (s *receiveStream) CancelRead(errorCode protocol.ApplicationErrorCode) {
|
||||||
if completed := s.cancelReadImpl(errorCode); completed {
|
s.mutex.Lock()
|
||||||
|
completed := s.cancelReadImpl(errorCode)
|
||||||
|
s.mutex.Unlock()
|
||||||
|
|
||||||
|
if completed {
|
||||||
s.streamCompleted()
|
s.streamCompleted()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *receiveStream) cancelReadImpl(errorCode protocol.ApplicationErrorCode) bool /* completed */ {
|
func (s *receiveStream) cancelReadImpl(errorCode protocol.ApplicationErrorCode) bool /* completed */ {
|
||||||
s.mutex.Lock()
|
|
||||||
defer s.mutex.Unlock()
|
|
||||||
|
|
||||||
if s.finRead || s.canceledRead || s.resetRemotely {
|
if s.finRead || s.canceledRead || s.resetRemotely {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
@ -215,7 +216,10 @@ func (s *receiveStream) cancelReadImpl(errorCode protocol.ApplicationErrorCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *receiveStream) handleStreamFrame(frame *wire.StreamFrame) error {
|
func (s *receiveStream) handleStreamFrame(frame *wire.StreamFrame) error {
|
||||||
|
s.mutex.Lock()
|
||||||
completed, err := s.handleStreamFrameImpl(frame)
|
completed, err := s.handleStreamFrameImpl(frame)
|
||||||
|
s.mutex.Unlock()
|
||||||
|
|
||||||
if completed {
|
if completed {
|
||||||
s.streamCompleted()
|
s.streamCompleted()
|
||||||
}
|
}
|
||||||
|
@ -223,9 +227,6 @@ func (s *receiveStream) handleStreamFrame(frame *wire.StreamFrame) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *receiveStream) handleStreamFrameImpl(frame *wire.StreamFrame) (bool /* completed */, error) {
|
func (s *receiveStream) handleStreamFrameImpl(frame *wire.StreamFrame) (bool /* completed */, error) {
|
||||||
s.mutex.Lock()
|
|
||||||
defer s.mutex.Unlock()
|
|
||||||
|
|
||||||
maxOffset := frame.Offset + frame.DataLen()
|
maxOffset := frame.Offset + frame.DataLen()
|
||||||
if err := s.flowController.UpdateHighestReceived(maxOffset, frame.FinBit); err != nil {
|
if err := s.flowController.UpdateHighestReceived(maxOffset, frame.FinBit); err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
|
@ -244,7 +245,10 @@ func (s *receiveStream) handleStreamFrameImpl(frame *wire.StreamFrame) (bool /*
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *receiveStream) handleResetStreamFrame(frame *wire.ResetStreamFrame) error {
|
func (s *receiveStream) handleResetStreamFrame(frame *wire.ResetStreamFrame) error {
|
||||||
|
s.mutex.Lock()
|
||||||
completed, err := s.handleResetStreamFrameImpl(frame)
|
completed, err := s.handleResetStreamFrameImpl(frame)
|
||||||
|
s.mutex.Unlock()
|
||||||
|
|
||||||
if completed {
|
if completed {
|
||||||
s.streamCompleted()
|
s.streamCompleted()
|
||||||
}
|
}
|
||||||
|
@ -252,9 +256,6 @@ func (s *receiveStream) handleResetStreamFrame(frame *wire.ResetStreamFrame) err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *receiveStream) handleResetStreamFrameImpl(frame *wire.ResetStreamFrame) (bool /*completed */, error) {
|
func (s *receiveStream) handleResetStreamFrameImpl(frame *wire.ResetStreamFrame) (bool /*completed */, error) {
|
||||||
s.mutex.Lock()
|
|
||||||
defer s.mutex.Unlock()
|
|
||||||
|
|
||||||
if s.closedForShutdown {
|
if s.closedForShutdown {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -146,7 +146,10 @@ func (s *sendStream) Write(p []byte) (int, error) {
|
||||||
// popStreamFrame returns the next STREAM frame that is supposed to be sent on this stream
|
// popStreamFrame returns the next STREAM frame that is supposed to be sent on this stream
|
||||||
// maxBytes is the maximum length this frame (including frame header) will have.
|
// maxBytes is the maximum length this frame (including frame header) will have.
|
||||||
func (s *sendStream) popStreamFrame(maxBytes protocol.ByteCount) (*wire.StreamFrame, bool /* has more data to send */) {
|
func (s *sendStream) popStreamFrame(maxBytes protocol.ByteCount) (*wire.StreamFrame, bool /* has more data to send */) {
|
||||||
|
s.mutex.Lock()
|
||||||
completed, frame, hasMoreData := s.popStreamFrameImpl(maxBytes)
|
completed, frame, hasMoreData := s.popStreamFrameImpl(maxBytes)
|
||||||
|
s.mutex.Unlock()
|
||||||
|
|
||||||
if completed {
|
if completed {
|
||||||
s.sender.onStreamCompleted(s.streamID)
|
s.sender.onStreamCompleted(s.streamID)
|
||||||
}
|
}
|
||||||
|
@ -154,9 +157,6 @@ func (s *sendStream) popStreamFrame(maxBytes protocol.ByteCount) (*wire.StreamFr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *sendStream) popStreamFrameImpl(maxBytes protocol.ByteCount) (bool /* completed */, *wire.StreamFrame, bool /* has more data to send */) {
|
func (s *sendStream) popStreamFrameImpl(maxBytes protocol.ByteCount) (bool /* completed */, *wire.StreamFrame, bool /* has more data to send */) {
|
||||||
s.mutex.Lock()
|
|
||||||
defer s.mutex.Unlock()
|
|
||||||
|
|
||||||
if s.canceledWrite || s.closeForShutdownErr != nil {
|
if s.canceledWrite || s.closeForShutdownErr != nil {
|
||||||
return false, nil, false
|
return false, nil, false
|
||||||
}
|
}
|
||||||
|
@ -273,6 +273,7 @@ func (s *sendStream) handleMaxStreamDataFrame(frame *wire.MaxStreamDataFrame) {
|
||||||
s.mutex.Lock()
|
s.mutex.Lock()
|
||||||
hasStreamData := s.dataForWriting != nil
|
hasStreamData := s.dataForWriting != nil
|
||||||
s.mutex.Unlock()
|
s.mutex.Unlock()
|
||||||
|
|
||||||
s.flowController.UpdateSendWindow(frame.ByteOffset)
|
s.flowController.UpdateSendWindow(frame.ByteOffset)
|
||||||
if hasStreamData {
|
if hasStreamData {
|
||||||
s.sender.onHasStreamData(s.streamID)
|
s.sender.onHasStreamData(s.streamID)
|
||||||
|
@ -280,16 +281,17 @@ func (s *sendStream) handleMaxStreamDataFrame(frame *wire.MaxStreamDataFrame) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *sendStream) handleStopSendingFrame(frame *wire.StopSendingFrame) {
|
func (s *sendStream) handleStopSendingFrame(frame *wire.StopSendingFrame) {
|
||||||
if completed := s.handleStopSendingFrameImpl(frame); completed {
|
s.mutex.Lock()
|
||||||
|
completed := s.handleStopSendingFrameImpl(frame)
|
||||||
|
s.mutex.Unlock()
|
||||||
|
|
||||||
|
if completed {
|
||||||
s.sender.onStreamCompleted(s.streamID)
|
s.sender.onStreamCompleted(s.streamID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// must be called after locking the mutex
|
// must be called after locking the mutex
|
||||||
func (s *sendStream) handleStopSendingFrameImpl(frame *wire.StopSendingFrame) bool /*completed*/ {
|
func (s *sendStream) handleStopSendingFrameImpl(frame *wire.StopSendingFrame) bool /*completed*/ {
|
||||||
s.mutex.Lock()
|
|
||||||
defer s.mutex.Unlock()
|
|
||||||
|
|
||||||
writeErr := streamCanceledError{
|
writeErr := streamCanceledError{
|
||||||
errorCode: frame.ErrorCode,
|
errorCode: frame.ErrorCode,
|
||||||
error: fmt.Errorf("Stream %d was reset with error code %d", s.streamID, frame.ErrorCode),
|
error: fmt.Errorf("Stream %d was reset with error code %d", s.streamID, frame.ErrorCode),
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue