mirror of
https://github.com/refraction-networking/uquic.git
synced 2025-04-04 20:57:36 +03:00
introduce a streamManager interface for the streamsMap
This commit is contained in:
parent
e802491a8f
commit
69437a0e78
5 changed files with 343 additions and 252 deletions
146
mock_stream_manager_test.go
Normal file
146
mock_stream_manager_test.go
Normal file
|
@ -0,0 +1,146 @@
|
|||
// Code generated by MockGen. DO NOT EDIT.
|
||||
// Source: github.com/lucas-clemente/quic-go (interfaces: StreamManager)
|
||||
|
||||
// Package quic is a generated GoMock package.
|
||||
package quic
|
||||
|
||||
import (
|
||||
reflect "reflect"
|
||||
|
||||
gomock "github.com/golang/mock/gomock"
|
||||
handshake "github.com/lucas-clemente/quic-go/internal/handshake"
|
||||
protocol "github.com/lucas-clemente/quic-go/internal/protocol"
|
||||
)
|
||||
|
||||
// MockStreamManager is a mock of StreamManager interface
|
||||
type MockStreamManager struct {
|
||||
ctrl *gomock.Controller
|
||||
recorder *MockStreamManagerMockRecorder
|
||||
}
|
||||
|
||||
// MockStreamManagerMockRecorder is the mock recorder for MockStreamManager
|
||||
type MockStreamManagerMockRecorder struct {
|
||||
mock *MockStreamManager
|
||||
}
|
||||
|
||||
// NewMockStreamManager creates a new mock instance
|
||||
func NewMockStreamManager(ctrl *gomock.Controller) *MockStreamManager {
|
||||
mock := &MockStreamManager{ctrl: ctrl}
|
||||
mock.recorder = &MockStreamManagerMockRecorder{mock}
|
||||
return mock
|
||||
}
|
||||
|
||||
// EXPECT returns an object that allows the caller to indicate expected use
|
||||
func (m *MockStreamManager) EXPECT() *MockStreamManagerMockRecorder {
|
||||
return m.recorder
|
||||
}
|
||||
|
||||
// AcceptStream mocks base method
|
||||
func (m *MockStreamManager) AcceptStream() (Stream, error) {
|
||||
ret := m.ctrl.Call(m, "AcceptStream")
|
||||
ret0, _ := ret[0].(Stream)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// AcceptStream indicates an expected call of AcceptStream
|
||||
func (mr *MockStreamManagerMockRecorder) AcceptStream() *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AcceptStream", reflect.TypeOf((*MockStreamManager)(nil).AcceptStream))
|
||||
}
|
||||
|
||||
// CloseWithError mocks base method
|
||||
func (m *MockStreamManager) CloseWithError(arg0 error) {
|
||||
m.ctrl.Call(m, "CloseWithError", arg0)
|
||||
}
|
||||
|
||||
// CloseWithError indicates an expected call of CloseWithError
|
||||
func (mr *MockStreamManagerMockRecorder) CloseWithError(arg0 interface{}) *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseWithError", reflect.TypeOf((*MockStreamManager)(nil).CloseWithError), arg0)
|
||||
}
|
||||
|
||||
// DeleteStream mocks base method
|
||||
func (m *MockStreamManager) DeleteStream(arg0 protocol.StreamID) error {
|
||||
ret := m.ctrl.Call(m, "DeleteStream", arg0)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// DeleteStream indicates an expected call of DeleteStream
|
||||
func (mr *MockStreamManagerMockRecorder) DeleteStream(arg0 interface{}) *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteStream", reflect.TypeOf((*MockStreamManager)(nil).DeleteStream), arg0)
|
||||
}
|
||||
|
||||
// GetOrOpenReceiveStream mocks base method
|
||||
func (m *MockStreamManager) GetOrOpenReceiveStream(arg0 protocol.StreamID) (receiveStreamI, error) {
|
||||
ret := m.ctrl.Call(m, "GetOrOpenReceiveStream", arg0)
|
||||
ret0, _ := ret[0].(receiveStreamI)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// GetOrOpenReceiveStream indicates an expected call of GetOrOpenReceiveStream
|
||||
func (mr *MockStreamManagerMockRecorder) GetOrOpenReceiveStream(arg0 interface{}) *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOrOpenReceiveStream", reflect.TypeOf((*MockStreamManager)(nil).GetOrOpenReceiveStream), arg0)
|
||||
}
|
||||
|
||||
// GetOrOpenSendStream mocks base method
|
||||
func (m *MockStreamManager) GetOrOpenSendStream(arg0 protocol.StreamID) (sendStreamI, error) {
|
||||
ret := m.ctrl.Call(m, "GetOrOpenSendStream", arg0)
|
||||
ret0, _ := ret[0].(sendStreamI)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// GetOrOpenSendStream indicates an expected call of GetOrOpenSendStream
|
||||
func (mr *MockStreamManagerMockRecorder) GetOrOpenSendStream(arg0 interface{}) *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOrOpenSendStream", reflect.TypeOf((*MockStreamManager)(nil).GetOrOpenSendStream), arg0)
|
||||
}
|
||||
|
||||
// GetOrOpenStream mocks base method
|
||||
func (m *MockStreamManager) GetOrOpenStream(arg0 protocol.StreamID) (streamI, error) {
|
||||
ret := m.ctrl.Call(m, "GetOrOpenStream", arg0)
|
||||
ret0, _ := ret[0].(streamI)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// GetOrOpenStream indicates an expected call of GetOrOpenStream
|
||||
func (mr *MockStreamManagerMockRecorder) GetOrOpenStream(arg0 interface{}) *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOrOpenStream", reflect.TypeOf((*MockStreamManager)(nil).GetOrOpenStream), arg0)
|
||||
}
|
||||
|
||||
// OpenStream mocks base method
|
||||
func (m *MockStreamManager) OpenStream() (Stream, error) {
|
||||
ret := m.ctrl.Call(m, "OpenStream")
|
||||
ret0, _ := ret[0].(Stream)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// OpenStream indicates an expected call of OpenStream
|
||||
func (mr *MockStreamManagerMockRecorder) OpenStream() *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OpenStream", reflect.TypeOf((*MockStreamManager)(nil).OpenStream))
|
||||
}
|
||||
|
||||
// OpenStreamSync mocks base method
|
||||
func (m *MockStreamManager) OpenStreamSync() (Stream, error) {
|
||||
ret := m.ctrl.Call(m, "OpenStreamSync")
|
||||
ret0, _ := ret[0].(Stream)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// OpenStreamSync indicates an expected call of OpenStreamSync
|
||||
func (mr *MockStreamManagerMockRecorder) OpenStreamSync() *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OpenStreamSync", reflect.TypeOf((*MockStreamManager)(nil).OpenStreamSync))
|
||||
}
|
||||
|
||||
// UpdateLimits mocks base method
|
||||
func (m *MockStreamManager) UpdateLimits(arg0 *handshake.TransportParameters) {
|
||||
m.ctrl.Call(m, "UpdateLimits", arg0)
|
||||
}
|
||||
|
||||
// UpdateLimits indicates an expected call of UpdateLimits
|
||||
func (mr *MockStreamManagerMockRecorder) UpdateLimits(arg0 interface{}) *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateLimits", reflect.TypeOf((*MockStreamManager)(nil).UpdateLimits), arg0)
|
||||
}
|
|
@ -7,5 +7,6 @@ package quic
|
|||
//go:generate sh -c "./mockgen_private.sh quic mock_stream_getter_test.go github.com/lucas-clemente/quic-go streamGetter StreamGetter"
|
||||
//go:generate sh -c "./mockgen_private.sh quic mock_stream_frame_source_test.go github.com/lucas-clemente/quic-go streamFrameSource StreamFrameSource"
|
||||
//go:generate sh -c "./mockgen_private.sh quic mock_crypto_stream_test.go github.com/lucas-clemente/quic-go cryptoStreamI CryptoStream"
|
||||
//go:generate sh -c "sed -i '' 's/quic_go.//g' mock_stream_getter_test.go"
|
||||
//go:generate sh -c "./mockgen_private.sh quic mock_stream_manager_test.go github.com/lucas-clemente/quic-go streamManager StreamManager"
|
||||
//go:generate sh -c "sed -i '' 's/quic_go.//g' mock_stream_getter_test.go mock_stream_manager_test.go"
|
||||
//go:generate sh -c "goimports -w mock*_test.go"
|
||||
|
|
|
@ -54,7 +54,7 @@ type session struct {
|
|||
|
||||
conn connection
|
||||
|
||||
streamsMap *streamsMap
|
||||
streamsMap streamManager
|
||||
cryptoStream cryptoStreamI
|
||||
|
||||
rttStats *congestion.RTTStats
|
||||
|
@ -855,7 +855,7 @@ func (s *session) GetOrOpenStream(id protocol.StreamID) (Stream, error) {
|
|||
return str, err
|
||||
}
|
||||
// make sure to return an actual nil value here, not an Stream with value nil
|
||||
return nil, err
|
||||
return str, err
|
||||
}
|
||||
|
||||
// AcceptStream returns the next stream openend by the peer
|
||||
|
|
428
session_test.go
428
session_test.go
|
@ -83,6 +83,7 @@ var _ = Describe("Session", func() {
|
|||
scfg *handshake.ServerConfig
|
||||
mconn *mockConnection
|
||||
cryptoSetup *mockCryptoSetup
|
||||
streamManager *MockStreamManager
|
||||
handshakeChan chan<- struct{}
|
||||
)
|
||||
|
||||
|
@ -123,7 +124,8 @@ var _ = Describe("Session", func() {
|
|||
)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
sess = pSess.(*session)
|
||||
Expect(sess.streamsMap.streams).To(BeEmpty())
|
||||
streamManager = NewMockStreamManager(mockCtrl)
|
||||
sess.streamsMap = streamManager
|
||||
})
|
||||
|
||||
AfterEach(func() {
|
||||
|
@ -192,70 +194,35 @@ var _ = Describe("Session", func() {
|
|||
})
|
||||
|
||||
Context("frame handling", func() {
|
||||
BeforeEach(func() {
|
||||
sess.streamsMap.newStream = func(id protocol.StreamID) streamI {
|
||||
str := NewMockStreamI(mockCtrl)
|
||||
str.EXPECT().StreamID().Return(id).AnyTimes()
|
||||
return str
|
||||
}
|
||||
})
|
||||
|
||||
Context("handling STREAM frames", func() {
|
||||
BeforeEach(func() {
|
||||
sess.streamsMap.UpdateLimits(&handshake.TransportParameters{MaxStreams: 10000})
|
||||
})
|
||||
|
||||
It("makes new streams", func() {
|
||||
It("passes STREAM frames to the stream", func() {
|
||||
f := &wire.StreamFrame{
|
||||
StreamID: 5,
|
||||
Data: []byte{0xde, 0xca, 0xfb, 0xad},
|
||||
}
|
||||
newStreamLambda := sess.streamsMap.newStream
|
||||
sess.streamsMap.newStream = func(id protocol.StreamID) streamI {
|
||||
str := newStreamLambda(id)
|
||||
if id == 5 {
|
||||
str.(*MockStreamI).EXPECT().handleStreamFrame(f)
|
||||
}
|
||||
return str
|
||||
}
|
||||
str := NewMockReceiveStreamI(mockCtrl)
|
||||
str.EXPECT().handleStreamFrame(f)
|
||||
streamManager.EXPECT().GetOrOpenReceiveStream(protocol.StreamID(5)).Return(str, nil)
|
||||
err := sess.handleStreamFrame(f)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
str, err := sess.streamsMap.GetOrOpenStream(5)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(str).ToNot(BeNil())
|
||||
})
|
||||
|
||||
It("handles existing streams", func() {
|
||||
f1 := &wire.StreamFrame{
|
||||
It("returns errors", func() {
|
||||
testErr := errors.New("test err")
|
||||
f := &wire.StreamFrame{
|
||||
StreamID: 5,
|
||||
Data: []byte{0xde, 0xca},
|
||||
Data: []byte{0xde, 0xca, 0xfb, 0xad},
|
||||
}
|
||||
f2 := &wire.StreamFrame{
|
||||
StreamID: 5,
|
||||
Offset: 2,
|
||||
Data: []byte{0xfb, 0xad},
|
||||
}
|
||||
newStreamLambda := sess.streamsMap.newStream
|
||||
sess.streamsMap.newStream = func(id protocol.StreamID) streamI {
|
||||
str := newStreamLambda(id)
|
||||
if id == 5 {
|
||||
str.(*MockStreamI).EXPECT().handleStreamFrame(f1)
|
||||
str.(*MockStreamI).EXPECT().handleStreamFrame(f2)
|
||||
}
|
||||
return str
|
||||
}
|
||||
sess.handleStreamFrame(f1)
|
||||
numOpenStreams := len(sess.streamsMap.streams)
|
||||
sess.handleStreamFrame(f2)
|
||||
Expect(sess.streamsMap.streams).To(HaveLen(numOpenStreams))
|
||||
str := NewMockReceiveStreamI(mockCtrl)
|
||||
str.EXPECT().handleStreamFrame(f).Return(testErr)
|
||||
streamManager.EXPECT().GetOrOpenReceiveStream(protocol.StreamID(5)).Return(str, nil)
|
||||
err := sess.handleStreamFrame(f)
|
||||
Expect(err).To(MatchError(testErr))
|
||||
})
|
||||
|
||||
It("ignores STREAM frames for closed streams", func() {
|
||||
sess.streamsMap.streams[5] = nil
|
||||
str, err := sess.GetOrOpenStream(5)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(str).To(BeNil()) // make sure the stream is gone
|
||||
err = sess.handleStreamFrame(&wire.StreamFrame{
|
||||
streamManager.EXPECT().GetOrOpenReceiveStream(protocol.StreamID(5)).Return(nil, nil) // for closed streams, the streamManager returns nil
|
||||
err := sess.handleStreamFrame(&wire.StreamFrame{
|
||||
StreamID: 5,
|
||||
Data: []byte("foobar"),
|
||||
})
|
||||
|
@ -300,38 +267,33 @@ var _ = Describe("Session", func() {
|
|||
Context("handling RST_STREAM frames", func() {
|
||||
It("closes the streams for writing", func() {
|
||||
f := &wire.RstStreamFrame{
|
||||
StreamID: 5,
|
||||
StreamID: 555,
|
||||
ErrorCode: 42,
|
||||
ByteOffset: 0x1337,
|
||||
}
|
||||
str, err := sess.GetOrOpenStream(5)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
str.(*MockStreamI).EXPECT().handleRstStreamFrame(f)
|
||||
err = sess.handleRstStreamFrame(f)
|
||||
str := NewMockReceiveStreamI(mockCtrl)
|
||||
streamManager.EXPECT().GetOrOpenReceiveStream(protocol.StreamID(555)).Return(str, nil)
|
||||
str.EXPECT().handleRstStreamFrame(f)
|
||||
err := sess.handleRstStreamFrame(f)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
})
|
||||
|
||||
It("returns errors", func() {
|
||||
f := &wire.RstStreamFrame{
|
||||
StreamID: 5,
|
||||
StreamID: 7,
|
||||
ByteOffset: 0x1337,
|
||||
}
|
||||
testErr := errors.New("flow control violation")
|
||||
str, err := sess.GetOrOpenStream(5)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
str.(*MockStreamI).EXPECT().handleRstStreamFrame(f).Return(testErr)
|
||||
err = sess.handleRstStreamFrame(f)
|
||||
str := NewMockReceiveStreamI(mockCtrl)
|
||||
streamManager.EXPECT().GetOrOpenReceiveStream(protocol.StreamID(7)).Return(str, nil)
|
||||
str.EXPECT().handleRstStreamFrame(f).Return(testErr)
|
||||
err := sess.handleRstStreamFrame(f)
|
||||
Expect(err).To(MatchError(testErr))
|
||||
})
|
||||
|
||||
It("ignores the error when the stream is not known", func() {
|
||||
str, err := sess.GetOrOpenStream(3)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
sess.onStreamCompleted(3)
|
||||
str, err = sess.GetOrOpenStream(3)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(str).To(BeNil())
|
||||
err = sess.handleFrames([]wire.Frame{&wire.RstStreamFrame{
|
||||
It("ignores RST_STREAM frames for closed streams", func() {
|
||||
streamManager.EXPECT().GetOrOpenReceiveStream(protocol.StreamID(3)).Return(nil, nil)
|
||||
err := sess.handleFrames([]wire.Frame{&wire.RstStreamFrame{
|
||||
StreamID: 3,
|
||||
ErrorCode: 42,
|
||||
}}, protocol.EncryptionUnspecified)
|
||||
|
@ -369,13 +331,13 @@ var _ = Describe("Session", func() {
|
|||
|
||||
It("updates the flow control window of a stream", func() {
|
||||
f := &wire.MaxStreamDataFrame{
|
||||
StreamID: 5,
|
||||
ByteOffset: 0x1234,
|
||||
StreamID: 12345,
|
||||
ByteOffset: 0x1337,
|
||||
}
|
||||
str, err := sess.GetOrOpenStream(5)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
str.(*MockStreamI).EXPECT().handleMaxStreamDataFrame(f)
|
||||
err = sess.handleMaxStreamDataFrame(f)
|
||||
str := NewMockSendStreamI(mockCtrl)
|
||||
streamManager.EXPECT().GetOrOpenSendStream(protocol.StreamID(12345)).Return(str, nil)
|
||||
str.EXPECT().handleMaxStreamDataFrame(f)
|
||||
err := sess.handleMaxStreamDataFrame(f)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
})
|
||||
|
||||
|
@ -385,35 +347,10 @@ var _ = Describe("Session", func() {
|
|||
sess.handleMaxDataFrame(&wire.MaxDataFrame{ByteOffset: offset})
|
||||
})
|
||||
|
||||
It("opens a new stream when receiving a MAX_STREAM_DATA frame for an unknown stream", func() {
|
||||
f := &wire.MaxStreamDataFrame{
|
||||
StreamID: 5,
|
||||
ByteOffset: 0x1337,
|
||||
}
|
||||
newStreamLambda := sess.streamsMap.newStream
|
||||
sess.streamsMap.newStream = func(id protocol.StreamID) streamI {
|
||||
str := newStreamLambda(id)
|
||||
if id == 5 {
|
||||
str.(*MockStreamI).EXPECT().handleMaxStreamDataFrame(f)
|
||||
}
|
||||
return str
|
||||
}
|
||||
err := sess.handleMaxStreamDataFrame(f)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
str, err := sess.streamsMap.GetOrOpenStream(5)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(str).ToNot(BeNil())
|
||||
})
|
||||
|
||||
It("ignores MAX_STREAM_DATA frames for a closed stream", func() {
|
||||
str, err := sess.GetOrOpenStream(3)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
sess.onStreamCompleted(3)
|
||||
str, err = sess.GetOrOpenStream(3)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(str).To(BeNil())
|
||||
err = sess.handleFrames([]wire.Frame{&wire.MaxStreamDataFrame{
|
||||
StreamID: 3,
|
||||
streamManager.EXPECT().GetOrOpenSendStream(protocol.StreamID(10)).Return(nil, nil)
|
||||
err := sess.handleFrames([]wire.Frame{&wire.MaxStreamDataFrame{
|
||||
StreamID: 10,
|
||||
ByteOffset: 1337,
|
||||
}}, protocol.EncryptionUnspecified)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
@ -421,24 +358,16 @@ var _ = Describe("Session", func() {
|
|||
})
|
||||
|
||||
Context("handling STOP_SENDING frames", func() {
|
||||
It("opens a new stream when receiving a STOP_SENDING frame for an unknown stream", func() {
|
||||
It("passes the frame to the stream", func() {
|
||||
f := &wire.StopSendingFrame{
|
||||
StreamID: 5,
|
||||
ErrorCode: 10,
|
||||
}
|
||||
newStreamLambda := sess.streamsMap.newStream
|
||||
sess.streamsMap.newStream = func(id protocol.StreamID) streamI {
|
||||
str := newStreamLambda(id)
|
||||
if id == 5 {
|
||||
str.(*MockStreamI).EXPECT().handleStopSendingFrame(f)
|
||||
}
|
||||
return str
|
||||
}
|
||||
str := NewMockSendStreamI(mockCtrl)
|
||||
streamManager.EXPECT().GetOrOpenSendStream(protocol.StreamID(5)).Return(str, nil)
|
||||
str.EXPECT().handleStopSendingFrame(f)
|
||||
err := sess.handleStopSendingFrame(f)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
str, err := sess.streamsMap.GetOrOpenStream(5)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(str).ToNot(BeNil())
|
||||
})
|
||||
|
||||
It("errors when receiving a STOP_SENDING for the crypto stream", func() {
|
||||
|
@ -450,14 +379,8 @@ var _ = Describe("Session", func() {
|
|||
})
|
||||
|
||||
It("ignores STOP_SENDING frames for a closed stream", func() {
|
||||
str, err := sess.GetOrOpenStream(3)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
sess.onStreamCompleted(3)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
str, err = sess.GetOrOpenStream(3)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(str).To(BeNil())
|
||||
err = sess.handleFrames([]wire.Frame{&wire.StopSendingFrame{
|
||||
streamManager.EXPECT().GetOrOpenSendStream(protocol.StreamID(3)).Return(nil, nil)
|
||||
err := sess.handleFrames([]wire.Frame{&wire.StopSendingFrame{
|
||||
StreamID: 3,
|
||||
ErrorCode: 1337,
|
||||
}}, protocol.EncryptionUnspecified)
|
||||
|
@ -486,19 +409,16 @@ var _ = Describe("Session", func() {
|
|||
})
|
||||
|
||||
It("handles CONNECTION_CLOSE frames", func() {
|
||||
testErr := qerr.Error(qerr.ProofInvalid, "foobar")
|
||||
streamManager.EXPECT().CloseWithError(testErr)
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
err := sess.run()
|
||||
Expect(err).To(MatchError("ProofInvalid: foobar"))
|
||||
Expect(err).To(MatchError(testErr))
|
||||
close(done)
|
||||
}()
|
||||
_, err := sess.GetOrOpenStream(5)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
for _, s := range sess.streamsMap.streams {
|
||||
s.(*MockStreamI).EXPECT().closeForShutdown(gomock.Any())
|
||||
}
|
||||
err = sess.handleFrames([]wire.Frame{&wire.ConnectionCloseFrame{ErrorCode: qerr.ProofInvalid, ReasonPhrase: "foobar"}}, protocol.EncryptionUnspecified)
|
||||
err := sess.handleFrames([]wire.Frame{&wire.ConnectionCloseFrame{ErrorCode: qerr.ProofInvalid, ReasonPhrase: "foobar"}}, protocol.EncryptionUnspecified)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Eventually(sess.Context().Done()).Should(BeClosed())
|
||||
Eventually(done).Should(BeClosed())
|
||||
|
@ -510,74 +430,26 @@ var _ = Describe("Session", func() {
|
|||
Expect(sess.GetVersion()).To(Equal(protocol.VersionNumber(4242)))
|
||||
})
|
||||
|
||||
Context("accepting streams", func() {
|
||||
BeforeEach(func() {
|
||||
// don't use the mock here
|
||||
sess.streamsMap.newStream = sess.newStream
|
||||
})
|
||||
|
||||
It("waits for new streams", func() {
|
||||
strChan := make(chan Stream)
|
||||
// accept two streams
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
for i := 0; i < 2; i++ {
|
||||
str, err := sess.AcceptStream()
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
strChan <- str
|
||||
}
|
||||
}()
|
||||
Consistently(strChan).ShouldNot(Receive())
|
||||
// this could happen e.g. by receiving a STREAM frame
|
||||
_, err := sess.GetOrOpenStream(5)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
var str Stream
|
||||
Eventually(strChan).Should(Receive(&str))
|
||||
Expect(str.StreamID()).To(Equal(protocol.StreamID(3)))
|
||||
Eventually(strChan).Should(Receive(&str))
|
||||
Expect(str.StreamID()).To(Equal(protocol.StreamID(5)))
|
||||
})
|
||||
|
||||
It("stops accepting when the session is closed", func() {
|
||||
testErr := errors.New("testErr")
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
_, err := sess.AcceptStream()
|
||||
Expect(err).To(MatchError(qerr.ToQuicError(testErr)))
|
||||
close(done)
|
||||
}()
|
||||
go sess.run()
|
||||
Consistently(done).ShouldNot(BeClosed())
|
||||
sess.Close(testErr)
|
||||
Eventually(done).Should(BeClosed())
|
||||
})
|
||||
|
||||
It("stops accepting when the session is closed after version negotiation", func() {
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
_, err := sess.AcceptStream()
|
||||
Expect(err).To(MatchError(qerr.Error(qerr.InternalError, errCloseSessionForNewVersion.Error())))
|
||||
close(done)
|
||||
}()
|
||||
go sess.run()
|
||||
Consistently(done).ShouldNot(BeClosed())
|
||||
Expect(sess.Context().Done()).ToNot(BeClosed())
|
||||
sess.Close(errCloseSessionForNewVersion)
|
||||
Eventually(done).Should(BeClosed())
|
||||
Eventually(sess.Context().Done()).Should(BeClosed())
|
||||
})
|
||||
It("accepts new streams", func() {
|
||||
mstr := NewMockStreamI(mockCtrl)
|
||||
streamManager.EXPECT().AcceptStream().Return(mstr, nil)
|
||||
str, err := sess.AcceptStream()
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(str).To(Equal(mstr))
|
||||
})
|
||||
|
||||
Context("closing", func() {
|
||||
BeforeEach(func() {
|
||||
Eventually(areSessionsRunning).Should(BeFalse())
|
||||
go sess.run()
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
sess.run()
|
||||
}()
|
||||
Eventually(areSessionsRunning).Should(BeTrue())
|
||||
})
|
||||
|
||||
It("shuts down without error", func() {
|
||||
streamManager.EXPECT().CloseWithError(qerr.Error(qerr.PeerGoingAway, ""))
|
||||
sess.Close(nil)
|
||||
Eventually(areSessionsRunning).Should(BeFalse())
|
||||
Expect(mconn.written).To(HaveLen(1))
|
||||
|
@ -589,6 +461,7 @@ var _ = Describe("Session", func() {
|
|||
})
|
||||
|
||||
It("only closes once", func() {
|
||||
streamManager.EXPECT().CloseWithError(qerr.Error(qerr.PeerGoingAway, ""))
|
||||
sess.Close(nil)
|
||||
sess.Close(nil)
|
||||
Eventually(areSessionsRunning).Should(BeFalse())
|
||||
|
@ -598,26 +471,21 @@ var _ = Describe("Session", func() {
|
|||
|
||||
It("closes streams with proper error", func() {
|
||||
testErr := errors.New("test error")
|
||||
s, err := sess.GetOrOpenStream(5)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
streamManager.EXPECT().CloseWithError(qerr.Error(qerr.InternalError, testErr.Error()))
|
||||
sess.Close(testErr)
|
||||
Eventually(areSessionsRunning).Should(BeFalse())
|
||||
n, err := s.Read([]byte{0})
|
||||
Expect(n).To(BeZero())
|
||||
Expect(err.Error()).To(ContainSubstring(testErr.Error()))
|
||||
n, err = s.Write([]byte{0})
|
||||
Expect(n).To(BeZero())
|
||||
Expect(err.Error()).To(ContainSubstring(testErr.Error()))
|
||||
Expect(sess.Context().Done()).To(BeClosed())
|
||||
})
|
||||
|
||||
It("closes the session in order to replace it with another QUIC version", func() {
|
||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||
sess.Close(errCloseSessionForNewVersion)
|
||||
Eventually(areSessionsRunning).Should(BeFalse())
|
||||
Expect(mconn.written).To(BeEmpty()) // no CONNECTION_CLOSE or PUBLIC_RESET sent
|
||||
})
|
||||
|
||||
It("sends a Public Reset if the client is initiating the head-of-line blocking experiment", func() {
|
||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||
sess.Close(handshake.ErrHOLExperiment)
|
||||
Expect(mconn.written).To(HaveLen(1))
|
||||
Expect((<-mconn.written)[0] & 0x02).ToNot(BeZero()) // Public Reset
|
||||
|
@ -625,6 +493,7 @@ var _ = Describe("Session", func() {
|
|||
})
|
||||
|
||||
It("sends a Public Reset if the client is initiating the no STOP_WAITING experiment", func() {
|
||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||
sess.Close(handshake.ErrHOLExperiment)
|
||||
Expect(mconn.written).To(HaveLen(1))
|
||||
Expect((<-mconn.written)[0] & 0x02).ToNot(BeZero()) // Public Reset
|
||||
|
@ -632,6 +501,7 @@ var _ = Describe("Session", func() {
|
|||
})
|
||||
|
||||
It("cancels the context when the run loop exists", func() {
|
||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||
returned := make(chan struct{})
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
|
@ -663,10 +533,12 @@ var _ = Describe("Session", func() {
|
|||
})
|
||||
|
||||
It("closes when handling a packet fails", func(done Done) {
|
||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||
testErr := errors.New("unpack error")
|
||||
hdr.PacketNumber = 5
|
||||
var runErr error
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
runErr = sess.run()
|
||||
}()
|
||||
sess.unpacker.(*mockUnpacker).unpackErr = testErr
|
||||
|
@ -845,6 +717,7 @@ var _ = Describe("Session", func() {
|
|||
sess.scheduleSending()
|
||||
Eventually(mconn.written).Should(HaveLen(2))
|
||||
// make the go routine return
|
||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||
sess.Close(nil)
|
||||
Eventually(done).Should(BeClosed())
|
||||
})
|
||||
|
@ -875,8 +748,6 @@ var _ = Describe("Session", func() {
|
|||
sess.packer.cryptoSetup = &mockCryptoSetup{encLevelSeal: protocol.EncryptionForwardSecure}
|
||||
|
||||
sess.streamFramer.AddFrameForRetransmission(f)
|
||||
_, err := sess.GetOrOpenStream(5)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
sent, err := sess.sendPacket()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(sent).To(BeTrue())
|
||||
|
@ -921,6 +792,7 @@ var _ = Describe("Session", func() {
|
|||
sess.scheduleSending()
|
||||
Eventually(mconn.written).Should(HaveLen(1))
|
||||
// make sure that the go routine returns
|
||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||
sess.Close(nil)
|
||||
Eventually(done).Should(BeClosed())
|
||||
})
|
||||
|
@ -947,6 +819,7 @@ var _ = Describe("Session", func() {
|
|||
sess.scheduleSending()
|
||||
Eventually(mconn.written).Should(HaveLen(1))
|
||||
// make sure that the go routine returns
|
||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||
sess.Close(nil)
|
||||
Eventually(done).Should(BeClosed())
|
||||
})
|
||||
|
@ -1113,29 +986,33 @@ var _ = Describe("Session", func() {
|
|||
sess.scheduleSending()
|
||||
Eventually(func() int { return len(mconn.written) }).ShouldNot(BeZero())
|
||||
Expect(mconn.written).To(Receive(ContainSubstring("foobar")))
|
||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||
})
|
||||
|
||||
Context("scheduling sending", func() {
|
||||
BeforeEach(func() {
|
||||
sess.packer.hasSentPacket = true // make sure this is not the first packet the packer sends
|
||||
sess.processTransportParameters(&handshake.TransportParameters{
|
||||
StreamFlowControlWindow: protocol.MaxByteCount,
|
||||
ConnectionFlowControlWindow: protocol.MaxByteCount,
|
||||
MaxStreams: 1000,
|
||||
})
|
||||
sess.packer.cryptoSetup = &mockCryptoSetup{encLevelSeal: protocol.EncryptionForwardSecure}
|
||||
})
|
||||
|
||||
It("sends after writing to a stream", func(done Done) {
|
||||
Expect(sess.sendingScheduled).NotTo(Receive())
|
||||
s, err := sess.GetOrOpenStream(3)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
It("sends when scheduleSending is called", func() {
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
s.Write([]byte("foobar"))
|
||||
defer GinkgoRecover()
|
||||
sess.run()
|
||||
close(done)
|
||||
}()
|
||||
Eventually(sess.sendingScheduled).Should(Receive())
|
||||
s.(*stream).popStreamFrame(1000) // unblock
|
||||
sess.streamFramer.AddFrameForRetransmission(&wire.StreamFrame{
|
||||
StreamID: 5,
|
||||
Data: []byte("foobar"),
|
||||
})
|
||||
Consistently(mconn.written).ShouldNot(Receive())
|
||||
sess.scheduleSending()
|
||||
Eventually(mconn.written).Should(Receive())
|
||||
// make the go routine return
|
||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||
sess.Close(nil)
|
||||
Eventually(done).Should(BeClosed())
|
||||
})
|
||||
|
||||
It("sets the timer to the ack timer", func() {
|
||||
|
@ -1150,10 +1027,9 @@ var _ = Describe("Session", func() {
|
|||
sess.run()
|
||||
close(done)
|
||||
}()
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
Eventually(func() int { return len(mconn.written) }).ShouldNot(BeZero())
|
||||
Expect(mconn.written).To(Receive(ContainSubstring(string([]byte{0x13, 0x37}))))
|
||||
// make sure the go routine returns
|
||||
Eventually(mconn.written).Should(Receive(ContainSubstring(string([]byte{0x13, 0x37}))))
|
||||
// make the go routine return
|
||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||
sess.Close(nil)
|
||||
Eventually(done).Should(BeClosed())
|
||||
})
|
||||
|
@ -1161,13 +1037,16 @@ var _ = Describe("Session", func() {
|
|||
|
||||
It("closes when crypto stream errors", func() {
|
||||
testErr := errors.New("crypto setup error")
|
||||
streamManager.EXPECT().CloseWithError(qerr.Error(qerr.InternalError, testErr.Error()))
|
||||
cryptoSetup.handleErr = testErr
|
||||
var runErr error
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
runErr = sess.run()
|
||||
defer GinkgoRecover()
|
||||
err := sess.run()
|
||||
Expect(err).To(MatchError(testErr))
|
||||
close(done)
|
||||
}()
|
||||
Eventually(func() error { return runErr }).Should(HaveOccurred())
|
||||
Expect(runErr).To(MatchError(testErr))
|
||||
Eventually(done).Should(BeClosed())
|
||||
})
|
||||
|
||||
Context("sending a Public Reset when receiving undecryptable packets during the handshake", func() {
|
||||
|
@ -1189,10 +1068,14 @@ var _ = Describe("Session", func() {
|
|||
BeforeEach(func() {
|
||||
sess.unpacker = &mockUnpacker{unpackErr: qerr.Error(qerr.DecryptionFailure, "")}
|
||||
sess.cryptoSetup = &mockCryptoSetup{}
|
||||
streamManager.EXPECT().CloseWithError(gomock.Any()).MaxTimes(1)
|
||||
})
|
||||
|
||||
It("doesn't immediately send a Public Reset after receiving too many undecryptable packets", func() {
|
||||
go sess.run()
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
sess.run()
|
||||
}()
|
||||
sendUndecryptablePackets()
|
||||
sess.scheduleSending()
|
||||
Consistently(mconn.written).Should(HaveLen(0))
|
||||
|
@ -1201,7 +1084,10 @@ var _ = Describe("Session", func() {
|
|||
})
|
||||
|
||||
It("sets a deadline to send a Public Reset after receiving too many undecryptable packets", func() {
|
||||
go sess.run()
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
sess.run()
|
||||
}()
|
||||
sendUndecryptablePackets()
|
||||
Eventually(func() time.Time { return sess.receivedTooManyUndecrytablePacketsTime }).Should(BeTemporally("~", time.Now(), 20*time.Millisecond))
|
||||
sess.Close(nil)
|
||||
|
@ -1209,7 +1095,10 @@ var _ = Describe("Session", func() {
|
|||
})
|
||||
|
||||
It("drops undecryptable packets when the undecrytable packet queue is full", func() {
|
||||
go sess.run()
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
sess.run()
|
||||
}()
|
||||
sendUndecryptablePackets()
|
||||
Eventually(func() []*receivedPacket { return sess.undecryptablePackets }).Should(HaveLen(protocol.MaxUndecryptablePackets))
|
||||
// check that old packets are kept, and the new packets are dropped
|
||||
|
@ -1220,7 +1109,10 @@ var _ = Describe("Session", func() {
|
|||
|
||||
It("sends a Public Reset after a timeout", func() {
|
||||
Expect(sess.receivedTooManyUndecrytablePacketsTime).To(BeZero())
|
||||
go sess.run()
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
sess.run()
|
||||
}()
|
||||
sendUndecryptablePackets()
|
||||
Eventually(func() time.Time { return sess.receivedTooManyUndecrytablePacketsTime }).Should(BeTemporally("~", time.Now(), time.Second))
|
||||
// speed up this test by manually setting back the time when too many packets were received
|
||||
|
@ -1233,7 +1125,10 @@ var _ = Describe("Session", func() {
|
|||
})
|
||||
|
||||
It("doesn't send a Public Reset if decrypting them suceeded during the timeout", func() {
|
||||
go sess.run()
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
sess.run()
|
||||
}()
|
||||
sess.receivedTooManyUndecrytablePacketsTime = time.Now().Add(-protocol.PublicResetTimeout).Add(-time.Millisecond)
|
||||
sess.scheduleSending() // wake up the run loop
|
||||
// there are no packets in the undecryptable packet queue
|
||||
|
@ -1246,7 +1141,10 @@ var _ = Describe("Session", func() {
|
|||
|
||||
It("ignores undecryptable packets after the handshake is complete", func() {
|
||||
sess.handshakeComplete = true
|
||||
go sess.run()
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
sess.run()
|
||||
}()
|
||||
sendUndecryptablePackets()
|
||||
Consistently(sess.undecryptablePackets).Should(BeEmpty())
|
||||
Expect(sess.Close(nil)).To(Succeed())
|
||||
|
@ -1275,6 +1173,7 @@ var _ = Describe("Session", func() {
|
|||
handshakeChan <- struct{}{}
|
||||
Consistently(sess.handshakeStatus()).ShouldNot(Receive())
|
||||
// make sure the go routine returns
|
||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||
Expect(sess.Close(nil)).To(Succeed())
|
||||
Eventually(done).Should(BeClosed())
|
||||
})
|
||||
|
@ -1290,6 +1189,7 @@ var _ = Describe("Session", func() {
|
|||
close(handshakeChan)
|
||||
Eventually(sess.handshakeStatus()).Should(BeClosed())
|
||||
// make sure the go routine returns
|
||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||
Expect(sess.Close(nil)).To(Succeed())
|
||||
Eventually(done).Should(BeClosed())
|
||||
})
|
||||
|
@ -1303,6 +1203,7 @@ var _ = Describe("Session", func() {
|
|||
Expect(err).To(MatchError(testErr))
|
||||
close(done)
|
||||
}()
|
||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||
sess.Close(testErr)
|
||||
Expect(sess.handshakeStatus()).To(Receive(Equal(testErr)))
|
||||
Eventually(done).Should(BeClosed())
|
||||
|
@ -1311,9 +1212,12 @@ var _ = Describe("Session", func() {
|
|||
It("process transport parameters received from the peer", func() {
|
||||
paramsChan := make(chan handshake.TransportParameters)
|
||||
sess.paramsChan = paramsChan
|
||||
_, err := sess.GetOrOpenStream(5)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
go sess.run()
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
sess.run()
|
||||
close(done)
|
||||
}()
|
||||
params := handshake.TransportParameters{
|
||||
MaxStreams: 123,
|
||||
IdleTimeout: 90 * time.Second,
|
||||
|
@ -1321,12 +1225,14 @@ var _ = Describe("Session", func() {
|
|||
ConnectionFlowControlWindow: 0x5000,
|
||||
OmitConnectionID: true,
|
||||
}
|
||||
streamManager.EXPECT().UpdateLimits(¶ms)
|
||||
paramsChan <- params
|
||||
Eventually(func() *handshake.TransportParameters { return sess.peerParams }).Should(Equal(¶ms))
|
||||
Eventually(func() uint32 { return sess.streamsMap.maxOutgoingStreams }).Should(Equal(uint32(123)))
|
||||
// Eventually(func() (protocol.ByteCount, error) { return sess.flowControlManager.SendWindowSize(5) }).Should(Equal(protocol.ByteCount(0x5000)))
|
||||
Eventually(func() bool { return sess.packer.omitConnectionID }).Should(BeTrue())
|
||||
// make the go routine return
|
||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||
Expect(sess.Close(nil)).To(Succeed())
|
||||
Eventually(done).Should(BeClosed())
|
||||
})
|
||||
|
||||
Context("keep-alives", func() {
|
||||
|
@ -1343,34 +1249,62 @@ var _ = Describe("Session", func() {
|
|||
sess.config.KeepAlive = true
|
||||
sess.lastNetworkActivityTime = time.Now().Add(-remoteIdleTimeout / 2)
|
||||
sess.packer.hasSentPacket = true // make sure this is not the first packet the packer sends
|
||||
go sess.run()
|
||||
defer sess.Close(nil)
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
sess.run()
|
||||
close(done)
|
||||
}()
|
||||
var data []byte
|
||||
Eventually(mconn.written).Should(Receive(&data))
|
||||
// -12 because of the crypto tag. This should be 7 (the frame id for a ping frame).
|
||||
Expect(data[len(data)-12-1 : len(data)-12]).To(Equal([]byte{0x07}))
|
||||
// make the go routine return
|
||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||
sess.Close(nil)
|
||||
Eventually(done).Should(BeClosed())
|
||||
})
|
||||
|
||||
It("doesn't send a PING packet if keep-alive is disabled", func() {
|
||||
sess.handshakeComplete = true
|
||||
sess.config.KeepAlive = false
|
||||
sess.lastNetworkActivityTime = time.Now().Add(-remoteIdleTimeout / 2)
|
||||
go sess.run()
|
||||
defer sess.Close(nil)
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
sess.run()
|
||||
close(done)
|
||||
}()
|
||||
Consistently(mconn.written).ShouldNot(Receive())
|
||||
// make the go routine return
|
||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||
sess.Close(nil)
|
||||
Eventually(done).Should(BeClosed())
|
||||
})
|
||||
|
||||
It("doesn't send a PING if the handshake isn't completed yet", func() {
|
||||
sess.handshakeComplete = false
|
||||
sess.config.KeepAlive = true
|
||||
sess.lastNetworkActivityTime = time.Now().Add(-remoteIdleTimeout / 2)
|
||||
go sess.run()
|
||||
defer sess.Close(nil)
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
sess.run()
|
||||
close(done)
|
||||
}()
|
||||
Consistently(mconn.written).ShouldNot(Receive())
|
||||
// make the go routine return
|
||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||
sess.Close(nil)
|
||||
Eventually(done).Should(BeClosed())
|
||||
})
|
||||
})
|
||||
|
||||
Context("timeouts", func() {
|
||||
BeforeEach(func() {
|
||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||
})
|
||||
|
||||
It("times out due to no network activity", func(done Done) {
|
||||
sess.handshakeComplete = true
|
||||
sess.lastNetworkActivityTime = time.Now().Add(-time.Hour)
|
||||
|
@ -1430,24 +1364,19 @@ var _ = Describe("Session", func() {
|
|||
}, 0.5)
|
||||
|
||||
Context("getting streams", func() {
|
||||
BeforeEach(func() {
|
||||
sess.processTransportParameters(&handshake.TransportParameters{MaxStreams: 1000})
|
||||
})
|
||||
|
||||
It("returns a new stream", func() {
|
||||
mstr := NewMockStreamI(mockCtrl)
|
||||
streamManager.EXPECT().GetOrOpenStream(protocol.StreamID(11)).Return(mstr, nil)
|
||||
str, err := sess.GetOrOpenStream(11)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(str).ToNot(BeNil())
|
||||
Expect(str.StreamID()).To(Equal(protocol.StreamID(11)))
|
||||
Expect(str).To(Equal(mstr))
|
||||
})
|
||||
|
||||
It("returns a nil-value (not an interface with value nil) for closed streams", func() {
|
||||
str, err := sess.GetOrOpenStream(9)
|
||||
strI := Stream(nil)
|
||||
streamManager.EXPECT().GetOrOpenStream(protocol.StreamID(1337)).Return(strI, nil)
|
||||
str, err := sess.GetOrOpenStream(1337)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
sess.onStreamCompleted(9)
|
||||
str, err = sess.GetOrOpenStream(9)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(str).To(BeNil())
|
||||
// make sure that the returned value is a plain nil, not an Stream with value nil
|
||||
_, ok := str.(Stream)
|
||||
Expect(ok).To(BeFalse())
|
||||
|
@ -1455,9 +1384,11 @@ var _ = Describe("Session", func() {
|
|||
|
||||
// all relevant tests for this are in the streamsMap
|
||||
It("opens streams synchronously", func() {
|
||||
mstr := NewMockStreamI(mockCtrl)
|
||||
streamManager.EXPECT().OpenStreamSync().Return(mstr, nil)
|
||||
str, err := sess.OpenStreamSync()
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(str).ToNot(BeNil())
|
||||
Expect(str).To(Equal(mstr))
|
||||
})
|
||||
})
|
||||
|
||||
|
@ -1533,7 +1464,6 @@ var _ = Describe("Client Session", func() {
|
|||
)
|
||||
sess = sessP.(*session)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(sess.streamsMap.streams).To(BeEmpty())
|
||||
})
|
||||
|
||||
AfterEach(func() {
|
||||
|
|
|
@ -12,6 +12,18 @@ import (
|
|||
"github.com/lucas-clemente/quic-go/qerr"
|
||||
)
|
||||
|
||||
type streamManager interface {
|
||||
GetOrOpenStream(protocol.StreamID) (streamI, error)
|
||||
GetOrOpenSendStream(protocol.StreamID) (sendStreamI, error)
|
||||
GetOrOpenReceiveStream(protocol.StreamID) (receiveStreamI, error)
|
||||
OpenStream() (Stream, error)
|
||||
OpenStreamSync() (Stream, error)
|
||||
AcceptStream() (Stream, error)
|
||||
DeleteStream(protocol.StreamID) error
|
||||
UpdateLimits(*handshake.TransportParameters)
|
||||
CloseWithError(error)
|
||||
}
|
||||
|
||||
type streamsMap struct {
|
||||
mutex sync.RWMutex
|
||||
|
||||
|
@ -35,6 +47,8 @@ type streamsMap struct {
|
|||
maxOutgoingStreams uint32
|
||||
}
|
||||
|
||||
var _ streamManager = &streamsMap{}
|
||||
|
||||
type streamLambda func(streamI) (bool, error)
|
||||
type newStreamLambda func(protocol.StreamID) streamI
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue