From aa091fe672c37448b47d9d3908ac9745c7bb3b10 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Tue, 14 Feb 2023 10:57:00 +1300 Subject: [PATCH] remove unneeded tracking variables from streams, optimize memory layout (#3699) * remove the closedForShutdown boolean in the send stream * remove the canceledWrite boolean in the send stream * remove the closedForShutdown boolean in the receive stream * remove the canceledRead boolean in the receive stream * remove the resetRemotely boolean in the receive stream * optimize memory layout of the receiveStream This brings it down from 200 to 192 bytes. --- receive_stream.go | 36 +++++++++++++++--------------------- receive_stream_test.go | 2 +- send_stream.go | 28 ++++++++++++---------------- 3 files changed, 28 insertions(+), 38 deletions(-) diff --git a/receive_stream.go b/receive_stream.go index 5d220e22..0a7e9416 100644 --- a/receive_stream.go +++ b/receive_stream.go @@ -34,18 +34,14 @@ type receiveStream struct { currentFrame []byte currentFrameDone func() - currentFrameIsLast bool // is the currentFrame the last frame on this stream readPosInFrame int + currentFrameIsLast bool // is the currentFrame the last frame on this stream + finRead bool // set once we read a frame with a Fin closeForShutdownErr error cancelReadErr error resetRemotelyErr *StreamError - closedForShutdown bool // set when CloseForShutdown() is called - finRead bool // set once we read a frame with a Fin - canceledRead bool // set when CancelRead() is called - resetRemotely bool // set when handleResetStreamFrame() is called - readChan chan struct{} readOnce chan struct{} // cap: 1, to protect against concurrent use of Read deadline time.Time @@ -100,13 +96,13 @@ func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, err if s.finRead { return false, 0, io.EOF } - if s.canceledRead { + if s.cancelReadErr != nil { return false, 0, s.cancelReadErr } - if s.resetRemotely { + if s.resetRemotelyErr != nil { return false, 0, s.resetRemotelyErr } - if s.closedForShutdown { + if s.closeForShutdownErr != nil { return false, 0, s.closeForShutdownErr } @@ -122,13 +118,13 @@ func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, err for { // Stop waiting on errors - if s.closedForShutdown { + if s.closeForShutdownErr != nil { return false, bytesRead, s.closeForShutdownErr } - if s.canceledRead { + if s.cancelReadErr != nil { return false, bytesRead, s.cancelReadErr } - if s.resetRemotely { + if s.resetRemotelyErr != nil { return false, bytesRead, s.resetRemotelyErr } @@ -175,8 +171,9 @@ func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, err s.readPosInFrame += m bytesRead += m - // when a RESET_STREAM was received, the was already informed about the final byteOffset for this stream - if !s.resetRemotely { + // when a RESET_STREAM was received, the flow controller was already + // informed about the final byteOffset for this stream + if s.resetRemotelyErr == nil { s.flowController.AddBytesRead(protocol.ByteCount(m)) } @@ -211,10 +208,9 @@ func (s *receiveStream) CancelRead(errorCode StreamErrorCode) { } func (s *receiveStream) cancelReadImpl(errorCode qerr.StreamErrorCode) bool /* completed */ { - if s.finRead || s.canceledRead || s.resetRemotely { + if s.finRead || s.cancelReadErr != nil || s.resetRemotelyErr != nil { return false } - s.canceledRead = true s.cancelReadErr = &StreamError{StreamID: s.streamID, ErrorCode: errorCode, Remote: false} s.signalRead() s.sender.queueControlFrame(&wire.StopSendingFrame{ @@ -247,7 +243,7 @@ func (s *receiveStream) handleStreamFrameImpl(frame *wire.StreamFrame) (bool /* newlyRcvdFinalOffset = s.finalOffset == protocol.MaxByteCount s.finalOffset = maxOffset } - if s.canceledRead { + if s.cancelReadErr != nil { return newlyRcvdFinalOffset, nil } if err := s.frameQueue.Push(frame.Data, frame.Offset, frame.PutBack); err != nil { @@ -270,7 +266,7 @@ func (s *receiveStream) handleResetStreamFrame(frame *wire.ResetStreamFrame) err } func (s *receiveStream) handleResetStreamFrameImpl(frame *wire.ResetStreamFrame) (bool /*completed */, error) { - if s.closedForShutdown { + if s.closeForShutdownErr != nil { return false, nil } if err := s.flowController.UpdateHighestReceived(frame.FinalSize, true); err != nil { @@ -280,10 +276,9 @@ func (s *receiveStream) handleResetStreamFrameImpl(frame *wire.ResetStreamFrame) s.finalOffset = frame.FinalSize // ignore duplicate RESET_STREAM frames for this stream (after checking their final offset) - if s.resetRemotely { + if s.resetRemotelyErr != nil { return false, nil } - s.resetRemotely = true s.resetRemotelyErr = &StreamError{ StreamID: s.streamID, ErrorCode: frame.ErrorCode, @@ -310,7 +305,6 @@ func (s *receiveStream) SetReadDeadline(t time.Time) error { // The peer will NOT be informed about this: the stream is closed without sending a FIN or RESET. func (s *receiveStream) closeForShutdown(err error) { s.mutex.Lock() - s.closedForShutdown = true s.closeForShutdownErr = err s.mutex.Unlock() s.signalRead() diff --git a/receive_stream_test.go b/receive_stream_test.go index 3987eea6..f3c515e6 100644 --- a/receive_stream_test.go +++ b/receive_stream_test.go @@ -682,7 +682,7 @@ var _ = Describe("Receive Stream", func() { }) It("doesn't do anything when it was closed for shutdown", func() { - str.closeForShutdown(nil) + str.closeForShutdown(errors.New("shutdown")) err := str.handleResetStreamFrame(rst) Expect(err).ToNot(HaveOccurred()) }) diff --git a/send_stream.go b/send_stream.go index 6b1d7b17..cebe30ef 100644 --- a/send_stream.go +++ b/send_stream.go @@ -40,11 +40,9 @@ type sendStream struct { cancelWriteErr error closeForShutdownErr error - closedForShutdown bool // set when CloseForShutdown() is called - finishedWriting bool // set once Close() is called - canceledWrite bool // set when CancelWrite() is called, or a STOP_SENDING frame is received - finSent bool // set when a STREAM_FRAME with FIN bit has been sent - completed bool // set when this stream has been reported to the streamSender as completed + finishedWriting bool // set once Close() is called + finSent bool // set when a STREAM_FRAME with FIN bit has been sent + completed bool // set when this stream has been reported to the streamSender as completed dataForWriting []byte // during a Write() call, this slice is the part of p that still needs to be sent out nextFrame *wire.StreamFrame @@ -94,7 +92,7 @@ func (s *sendStream) Write(p []byte) (int, error) { if s.finishedWriting { return 0, fmt.Errorf("write on closed stream %d", s.streamID) } - if s.canceledWrite { + if s.cancelWriteErr != nil { return 0, s.cancelWriteErr } if s.closeForShutdownErr != nil { @@ -153,7 +151,7 @@ func (s *sendStream) Write(p []byte) (int, error) { } deadlineTimer.Reset(deadline) } - if s.dataForWriting == nil || s.canceledWrite || s.closedForShutdown { + if s.dataForWriting == nil || s.cancelWriteErr != nil || s.closeForShutdownErr != nil { break } } @@ -219,7 +217,7 @@ func (s *sendStream) popStreamFrame(maxBytes protocol.ByteCount, v protocol.Vers } func (s *sendStream) popNewOrRetransmittedStreamFrame(maxBytes protocol.ByteCount, v protocol.VersionNumber) (*wire.StreamFrame, bool /* has more data to send */) { - if s.canceledWrite || s.closeForShutdownErr != nil { + if s.cancelWriteErr != nil || s.closeForShutdownErr != nil { return nil, false } @@ -354,7 +352,7 @@ func (s *sendStream) frameAcked(f wire.Frame) { f.(*wire.StreamFrame).PutBack() s.mutex.Lock() - if s.canceledWrite { + if s.cancelWriteErr != nil { s.mutex.Unlock() return } @@ -371,7 +369,7 @@ func (s *sendStream) frameAcked(f wire.Frame) { } func (s *sendStream) isNewlyCompleted() bool { - completed := (s.finSent || s.canceledWrite) && s.numOutstandingFrames == 0 && len(s.retransmissionQueue) == 0 + completed := (s.finSent || s.cancelWriteErr != nil) && s.numOutstandingFrames == 0 && len(s.retransmissionQueue) == 0 if completed && !s.completed { s.completed = true return true @@ -383,7 +381,7 @@ func (s *sendStream) queueRetransmission(f wire.Frame) { sf := f.(*wire.StreamFrame) sf.DataLenPresent = true s.mutex.Lock() - if s.canceledWrite { + if s.cancelWriteErr != nil { s.mutex.Unlock() return } @@ -399,11 +397,11 @@ func (s *sendStream) queueRetransmission(f wire.Frame) { func (s *sendStream) Close() error { s.mutex.Lock() - if s.closedForShutdown { + if s.closeForShutdownErr != nil { s.mutex.Unlock() return nil } - if s.canceledWrite { + if s.cancelWriteErr != nil { s.mutex.Unlock() return fmt.Errorf("close called for canceled stream %d", s.streamID) } @@ -422,12 +420,11 @@ func (s *sendStream) CancelWrite(errorCode StreamErrorCode) { // must be called after locking the mutex func (s *sendStream) cancelWriteImpl(errorCode qerr.StreamErrorCode, remote bool) { s.mutex.Lock() - if s.canceledWrite { + if s.cancelWriteErr != nil { s.mutex.Unlock() return } s.ctxCancel() - s.canceledWrite = true s.cancelWriteErr = &StreamError{StreamID: s.streamID, ErrorCode: errorCode, Remote: remote} s.numOutstandingFrames = 0 s.retransmissionQueue = nil @@ -478,7 +475,6 @@ func (s *sendStream) SetWriteDeadline(t time.Time) error { func (s *sendStream) closeForShutdown(err error) { s.mutex.Lock() s.ctxCancel() - s.closedForShutdown = true s.closeForShutdownErr = err s.mutex.Unlock() s.signalWrite()