diff --git a/crypto_stream_test.go b/crypto_stream_test.go index 2debfe05..fc2a61a6 100644 --- a/crypto_stream_test.go +++ b/crypto_stream_test.go @@ -25,7 +25,7 @@ var _ = Describe("Crypto Stream", func() { }) It("says if it has data for writing", func() { - mockSender.EXPECT().scheduleSending() + mockSender.EXPECT().onHasStreamData(str.version.CryptoStreamID()) Expect(str.hasDataForWriting()).To(BeFalse()) done := make(chan struct{}) go func() { diff --git a/mock_stream_sender_test.go b/mock_stream_sender_test.go index 637e8cba..8549e343 100644 --- a/mock_stream_sender_test.go +++ b/mock_stream_sender_test.go @@ -35,6 +35,16 @@ func (m *MockStreamSender) EXPECT() *MockStreamSenderMockRecorder { return m.recorder } +// onHasStreamData mocks base method +func (m *MockStreamSender) onHasStreamData(arg0 protocol.StreamID) { + m.ctrl.Call(m, "onHasStreamData", arg0) +} + +// onHasStreamData indicates an expected call of onHasStreamData +func (mr *MockStreamSenderMockRecorder) onHasStreamData(arg0 interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "onHasStreamData", reflect.TypeOf((*MockStreamSender)(nil).onHasStreamData), arg0) +} + // onHasWindowUpdate mocks base method func (m *MockStreamSender) onHasWindowUpdate(arg0 protocol.StreamID) { m.ctrl.Call(m, "onHasWindowUpdate", arg0) @@ -54,13 +64,3 @@ func (m *MockStreamSender) queueControlFrame(arg0 wire.Frame) { func (mr *MockStreamSenderMockRecorder) queueControlFrame(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "queueControlFrame", reflect.TypeOf((*MockStreamSender)(nil).queueControlFrame), arg0) } - -// scheduleSending mocks base method -func (m *MockStreamSender) scheduleSending() { - m.ctrl.Call(m, "scheduleSending") -} - -// scheduleSending indicates an expected call of scheduleSending -func (mr *MockStreamSenderMockRecorder) scheduleSending() *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "scheduleSending", reflect.TypeOf((*MockStreamSender)(nil).scheduleSending)) -} diff --git a/packet_packer_test.go b/packet_packer_test.go index a75b2407..93b85493 100644 --- a/packet_packer_test.go +++ b/packet_packer_test.go @@ -4,6 +4,7 @@ import ( "bytes" "math" + "github.com/golang/mock/gomock" "github.com/lucas-clemente/quic-go/ackhandler" "github.com/lucas-clemente/quic-go/internal/flowcontrol" "github.com/lucas-clemente/quic-go/internal/handshake" @@ -62,7 +63,7 @@ var _ = Describe("Packet packer", func() { BeforeEach(func() { version := versionGQUICFrames mockSender := NewMockStreamSender(mockCtrl) - mockSender.EXPECT().scheduleSending().AnyTimes() + mockSender.EXPECT().onHasStreamData(gomock.Any()).AnyTimes() cryptoStream = newCryptoStream(mockSender, flowcontrol.NewStreamFlowController(version.CryptoStreamID(), false, flowcontrol.NewConnectionFlowController(1000, 1000, nil), 1000, 1000, 1000, nil), version) streamsMap := newStreamsMap(nil, protocol.PerspectiveServer, versionGQUICFrames) streamFramer = newStreamFramer(cryptoStream, streamsMap, versionGQUICFrames) diff --git a/send_stream.go b/send_stream.go index f9008087..3584febb 100644 --- a/send_stream.go +++ b/send_stream.go @@ -84,7 +84,7 @@ func (s *sendStream) Write(p []byte) (int, error) { s.dataForWriting = make([]byte, len(p)) copy(s.dataForWriting, p) - s.sender.scheduleSending() + s.sender.onHasStreamData(s.streamID) var bytesWritten int var err error @@ -191,7 +191,7 @@ func (s *sendStream) Close() error { return fmt.Errorf("Close called for canceled stream %d", s.streamID) } s.finishedWriting = true - s.sender.scheduleSending() + s.sender.onHasStreamData(s.streamID) // need to send the FIN s.ctxCancel() return nil } diff --git a/send_stream_test.go b/send_stream_test.go index 5d2b4472..51b0258f 100644 --- a/send_stream_test.go +++ b/send_stream_test.go @@ -42,7 +42,7 @@ var _ = Describe("Send Stream", func() { Context("writing", func() { It("writes and gets all data at once", func() { - mockSender.EXPECT().scheduleSending() + mockSender.EXPECT().onHasStreamData(streamID) mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)) mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6)) mockFC.EXPECT().IsNewlyBlocked() @@ -70,7 +70,7 @@ var _ = Describe("Send Stream", func() { }) It("writes and gets data in two turns", func() { - mockSender.EXPECT().scheduleSending() + mockSender.EXPECT().onHasStreamData(streamID) frameHeaderLen := protocol.ByteCount(4) mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2) mockFC.EXPECT().AddBytesSent(gomock.Any() /* protocol.ByteCount(3)*/).Times(2) @@ -107,7 +107,7 @@ var _ = Describe("Send Stream", func() { }) It("copies the slice while writing", func() { - mockSender.EXPECT().scheduleSending() + mockSender.EXPECT().onHasStreamData(streamID) frameHeaderSize := protocol.ByteCount(4) mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2) mockFC.EXPECT().AddBytesSent(protocol.ByteCount(1)) @@ -142,7 +142,7 @@ var _ = Describe("Send Stream", func() { }) It("cancels the context when Close is called", func() { - mockSender.EXPECT().scheduleSending() + mockSender.EXPECT().onHasStreamData(streamID) Expect(str.Context().Done()).ToNot(BeClosed()) str.Close() Expect(str.Context().Done()).To(BeClosed()) @@ -150,7 +150,7 @@ var _ = Describe("Send Stream", func() { Context("adding BLOCKED", func() { It("queues a BLOCKED frame if the stream is flow control blocked", func() { - mockSender.EXPECT().scheduleSending() + mockSender.EXPECT().onHasStreamData(streamID) mockSender.EXPECT().queueControlFrame(&wire.StreamBlockedFrame{ StreamID: streamID, Offset: 10, @@ -175,7 +175,7 @@ var _ = Describe("Send Stream", func() { }) It("doesn't queue a BLOCKED frame if the stream is flow control blocked, but the frame popped has the FIN bit set", func() { - mockSender.EXPECT().scheduleSending().Times(2) // once for the Write, once for the Close + mockSender.EXPECT().onHasStreamData(streamID).Times(2) // once for the Write, once for the Close mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)) mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6)) // don't EXPECT a call to mockFC.IsNewlyBlocked @@ -208,7 +208,7 @@ var _ = Describe("Send Stream", func() { }) It("unblocks after the deadline", func() { - mockSender.EXPECT().scheduleSending() + mockSender.EXPECT().onHasStreamData(streamID) deadline := time.Now().Add(scaleDuration(50 * time.Millisecond)) str.SetWriteDeadline(deadline) n, err := strWithTimeout.Write([]byte("foobar")) @@ -218,7 +218,7 @@ var _ = Describe("Send Stream", func() { }) It("returns the number of bytes written, when the deadline expires", func() { - mockSender.EXPECT().scheduleSending() + mockSender.EXPECT().onHasStreamData(streamID) mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(10000)).AnyTimes() mockFC.EXPECT().AddBytesSent(gomock.Any()) mockFC.EXPECT().IsNewlyBlocked() @@ -244,7 +244,7 @@ var _ = Describe("Send Stream", func() { }) It("doesn't pop any data after the deadline expired", func() { - mockSender.EXPECT().scheduleSending() + mockSender.EXPECT().onHasStreamData(streamID) mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(10000)).AnyTimes() mockFC.EXPECT().AddBytesSent(gomock.Any()) mockFC.EXPECT().IsNewlyBlocked() @@ -267,7 +267,7 @@ var _ = Describe("Send Stream", func() { }) It("doesn't unblock if the deadline is changed before the first one expires", func() { - mockSender.EXPECT().scheduleSending() + mockSender.EXPECT().onHasStreamData(streamID) deadline1 := time.Now().Add(scaleDuration(50 * time.Millisecond)) deadline2 := time.Now().Add(scaleDuration(100 * time.Millisecond)) str.SetWriteDeadline(deadline1) @@ -286,7 +286,7 @@ var _ = Describe("Send Stream", func() { }) It("unblocks earlier, when a new deadline is set", func() { - mockSender.EXPECT().scheduleSending() + mockSender.EXPECT().onHasStreamData(streamID) deadline1 := time.Now().Add(scaleDuration(200 * time.Millisecond)) deadline2 := time.Now().Add(scaleDuration(50 * time.Millisecond)) go func() { @@ -306,14 +306,14 @@ var _ = Describe("Send Stream", func() { Context("closing", func() { It("doesn't allow writes after it has been closed", func() { - mockSender.EXPECT().scheduleSending() + mockSender.EXPECT().onHasStreamData(streamID) str.Close() _, err := strWithTimeout.Write([]byte("foobar")) Expect(err).To(MatchError("write on closed stream 1337")) }) It("allows FIN", func() { - mockSender.EXPECT().scheduleSending() + mockSender.EXPECT().onHasStreamData(streamID) str.Close() f := str.popStreamFrame(1000) Expect(f).ToNot(BeNil()) @@ -322,7 +322,7 @@ var _ = Describe("Send Stream", func() { }) It("doesn't send a FIN when there's still data", func() { - mockSender.EXPECT().scheduleSending() + mockSender.EXPECT().onHasStreamData(streamID) frameHeaderLen := protocol.ByteCount(4) mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2) mockFC.EXPECT().AddBytesSent(gomock.Any()).Times(2) @@ -345,7 +345,7 @@ var _ = Describe("Send Stream", func() { }) It("doesn't allow FIN twice", func() { - mockSender.EXPECT().scheduleSending() + mockSender.EXPECT().onHasStreamData(streamID) str.Close() f := str.popStreamFrame(1000) Expect(f).ToNot(BeNil()) @@ -366,7 +366,7 @@ var _ = Describe("Send Stream", func() { }) It("doesn't get data for writing if an error occurred", func() { - mockSender.EXPECT().scheduleSending() + mockSender.EXPECT().onHasStreamData(streamID) mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)) mockFC.EXPECT().AddBytesSent(gomock.Any()) mockFC.EXPECT().IsNewlyBlocked() @@ -405,7 +405,7 @@ var _ = Describe("Send Stream", func() { }) It("unblocks Write", func() { - mockSender.EXPECT().scheduleSending() + mockSender.EXPECT().onHasStreamData(streamID) mockSender.EXPECT().queueControlFrame(gomock.Any()) mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount) mockFC.EXPECT().AddBytesSent(gomock.Any()) @@ -454,7 +454,7 @@ var _ = Describe("Send Stream", func() { }) It("doesn't cancel when the stream was already closed", func() { - mockSender.EXPECT().scheduleSending() + mockSender.EXPECT().onHasStreamData(streamID) err := str.Close() Expect(err).ToNot(HaveOccurred()) err = str.CancelWrite(123) @@ -475,7 +475,7 @@ var _ = Describe("Send Stream", func() { }) It("unblocks Write", func() { - mockSender.EXPECT().scheduleSending() + mockSender.EXPECT().onHasStreamData(streamID) mockSender.EXPECT().queueControlFrame(gomock.Any()) done := make(chan struct{}) go func() { @@ -517,7 +517,7 @@ var _ = Describe("Send Stream", func() { }) It("is finished after Close()", func() { - mockSender.EXPECT().scheduleSending() + mockSender.EXPECT().onHasStreamData(streamID) str.Close() f := str.popStreamFrame(1000) Expect(f.FinBit).To(BeTrue()) diff --git a/session.go b/session.go index 9b269c78..7d76790b 100644 --- a/session.go +++ b/session.go @@ -961,6 +961,10 @@ func (s *session) onHasWindowUpdate(id protocol.StreamID) { s.scheduleSending() } +func (s *session) onHasStreamData(streamID protocol.StreamID) { + s.scheduleSending() +} + func (s *session) LocalAddr() net.Addr { return s.conn.LocalAddr() } diff --git a/stream.go b/stream.go index 398557fd..69611ac2 100644 --- a/stream.go +++ b/stream.go @@ -16,9 +16,9 @@ const ( // The streamSender is notified by the stream about various events. type streamSender interface { - scheduleSending() queueControlFrame(wire.Frame) onHasWindowUpdate(protocol.StreamID) + onHasStreamData(protocol.StreamID) } type streamI interface { diff --git a/stream_test.go b/stream_test.go index 0a5802e4..9a7ad757 100644 --- a/stream_test.go +++ b/stream_test.go @@ -58,10 +58,6 @@ var _ = Describe("Stream", func() { // need some stream cancelation tests here, since gQUIC doesn't cleanly separate the two stream halves Context("stream cancelations", func() { - BeforeEach(func() { - mockSender.EXPECT().scheduleSending().AnyTimes() - }) - Context("for gQUIC", func() { BeforeEach(func() { str.version = versionGQUICFrames @@ -70,6 +66,7 @@ var _ = Describe("Stream", func() { }) It("unblocks Write when receiving a RST_STREAM frame with non-zero error code", func() { + mockSender.EXPECT().onHasStreamData(streamID) mockSender.EXPECT().queueControlFrame(&wire.RstStreamFrame{ StreamID: streamID, ByteOffset: 1000, @@ -99,6 +96,7 @@ var _ = Describe("Stream", func() { }) It("unblocks Write when receiving a RST_STREAM frame with error code 0", func() { + mockSender.EXPECT().onHasStreamData(streamID) mockSender.EXPECT().queueControlFrame(&wire.RstStreamFrame{ StreamID: streamID, ByteOffset: 1000, @@ -129,6 +127,7 @@ var _ = Describe("Stream", func() { It("sends a RST_STREAM with error code 0, after the stream is closed", func() { str.version = versionGQUICFrames + mockSender.EXPECT().onHasStreamData(streamID).Times(2) // once for the Write, once for the Close mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount).AnyTimes() mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6)) mockFC.EXPECT().IsNewlyBlocked() @@ -148,8 +147,7 @@ var _ = Describe("Stream", func() { ByteOffset: 6, ErrorCode: 0, }) - err = str.Close() - Expect(err).ToNot(HaveOccurred()) + Expect(str.Close()).To(Succeed()) }) }) @@ -159,6 +157,7 @@ var _ = Describe("Stream", func() { StreamID: streamID, ErrorCode: 1234, }) + mockSender.EXPECT().onHasStreamData(streamID) err := str.CancelRead(1234) Expect(err).ToNot(HaveOccurred()) Expect(str.Close()).To(Succeed())