expose methods for opening and accepting unidirectional streams

This commit is contained in:
Marten Seemann 2018-02-03 22:24:49 +08:00
parent 8c594e1084
commit da49295b52
8 changed files with 140 additions and 17 deletions

View file

@ -1,5 +1,9 @@
# Changelog
## v0.8.0 (unreleased)
- Add support for unidirectional streams (for IETF QUIC).
## v0.7.0 (2018-02-03)
- The lower boundary for packets included in ACKs is now derived, and the value sent in STOP_WAITING frames is ignored.

View file

@ -78,7 +78,10 @@ func (s *mockSession) RemoteAddr() net.Addr {
func (s *mockSession) Context() context.Context {
return s.ctx
}
func (s *mockSession) ConnectionState() quic.ConnectionState { panic("not implemented") }
func (s *mockSession) ConnectionState() quic.ConnectionState { panic("not implemented") }
func (s *mockSession) AcceptUniStream() (quic.ReceiveStream, error) { panic("not implemented") }
func (s *mockSession) OpenUniStream() (quic.SendStream, error) { panic("not implemented") }
func (s *mockSession) OpenUniStreamSync() (quic.SendStream, error) { panic("not implemented") }
var _ = Describe("H2 server", func() {
var (

View file

@ -113,15 +113,23 @@ type StreamError interface {
// A Session is a QUIC connection between two peers.
type Session interface {
// AcceptStream returns the next stream opened by the peer, blocking until one is available.
// Since stream 1 is reserved for the crypto stream, the first stream is either 2 (for a client) or 3 (for a server).
AcceptStream() (Stream, error)
// OpenStream opens a new QUIC stream, returning a special error when the peer's concurrent stream limit is reached.
// New streams always have the smallest possible stream ID.
// TODO: Enable testing for the special error
// AcceptUniStream returns the next unidirectional stream opened by the peer, blocking until one is available.
AcceptUniStream() (ReceiveStream, error)
// OpenStream opens a new bidirectional QUIC stream.
// It returns a special error when the peer's concurrent stream limit is reached.
// TODO(#1152): Enable testing for the special error
OpenStream() (Stream, error)
// OpenStreamSync opens a new QUIC stream, blocking until the peer's concurrent stream limit allows a new stream to be opened.
// It always picks the smallest possible stream ID.
// OpenStreamSync opens a new bidirectional QUIC stream.
// It blocks until the peer's concurrent stream limit allows a new stream to be opened.
OpenStreamSync() (Stream, error)
// OpenUniStream opens a new outgoing unidirectional QUIC stream.
// It returns a special error when the peer's concurrent stream limit is reached.
// TODO(#1152): Enable testing for the special error
OpenUniStream() (SendStream, error)
// OpenUniStreamSync opens a new outgoing unidirectional QUIC stream.
// It blocks until the peer's concurrent stream limit allows a new stream to be opened.
OpenUniStreamSync() (SendStream, error)
// LocalAddr returns the local address.
LocalAddr() net.Addr
// RemoteAddr returns the address of the peer.

View file

@ -49,6 +49,19 @@ func (mr *MockStreamManagerMockRecorder) AcceptStream() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AcceptStream", reflect.TypeOf((*MockStreamManager)(nil).AcceptStream))
}
// AcceptUniStream mocks base method
func (m *MockStreamManager) AcceptUniStream() (ReceiveStream, error) {
ret := m.ctrl.Call(m, "AcceptUniStream")
ret0, _ := ret[0].(ReceiveStream)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// AcceptUniStream indicates an expected call of AcceptUniStream
func (mr *MockStreamManagerMockRecorder) AcceptUniStream() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AcceptUniStream", reflect.TypeOf((*MockStreamManager)(nil).AcceptUniStream))
}
// CloseWithError mocks base method
func (m *MockStreamManager) CloseWithError(arg0 error) {
m.ctrl.Call(m, "CloseWithError", arg0)
@ -135,6 +148,32 @@ func (mr *MockStreamManagerMockRecorder) OpenStreamSync() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OpenStreamSync", reflect.TypeOf((*MockStreamManager)(nil).OpenStreamSync))
}
// OpenUniStream mocks base method
func (m *MockStreamManager) OpenUniStream() (SendStream, error) {
ret := m.ctrl.Call(m, "OpenUniStream")
ret0, _ := ret[0].(SendStream)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// OpenUniStream indicates an expected call of OpenUniStream
func (mr *MockStreamManagerMockRecorder) OpenUniStream() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OpenUniStream", reflect.TypeOf((*MockStreamManager)(nil).OpenUniStream))
}
// OpenUniStreamSync mocks base method
func (m *MockStreamManager) OpenUniStreamSync() (SendStream, error) {
ret := m.ctrl.Call(m, "OpenUniStreamSync")
ret0, _ := ret[0].(SendStream)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// OpenUniStreamSync indicates an expected call of OpenUniStreamSync
func (mr *MockStreamManagerMockRecorder) OpenUniStreamSync() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OpenUniStreamSync", reflect.TypeOf((*MockStreamManager)(nil).OpenUniStreamSync))
}
// UpdateLimits mocks base method
func (m *MockStreamManager) UpdateLimits(arg0 *handshake.TransportParameters) {
m.ctrl.Call(m, "UpdateLimits", arg0)

View file

@ -57,15 +57,18 @@ func (s *mockSession) closeRemote(e error) {
func (s *mockSession) OpenStream() (Stream, error) {
return &stream{}, nil
}
func (s *mockSession) AcceptStream() (Stream, error) { panic("not implemented") }
func (s *mockSession) OpenStreamSync() (Stream, error) { panic("not implemented") }
func (s *mockSession) LocalAddr() net.Addr { panic("not implemented") }
func (s *mockSession) RemoteAddr() net.Addr { panic("not implemented") }
func (*mockSession) Context() context.Context { panic("not implemented") }
func (*mockSession) ConnectionState() ConnectionState { panic("not implemented") }
func (*mockSession) GetVersion() protocol.VersionNumber { return protocol.VersionWhatever }
func (s *mockSession) handshakeStatus() <-chan error { return s.handshakeChan }
func (*mockSession) getCryptoStream() cryptoStreamI { panic("not implemented") }
func (s *mockSession) AcceptStream() (Stream, error) { panic("not implemented") }
func (s *mockSession) AcceptUniStream() (ReceiveStream, error) { panic("not implemented") }
func (s *mockSession) OpenStreamSync() (Stream, error) { panic("not implemented") }
func (s *mockSession) OpenUniStream() (SendStream, error) { panic("not implemented") }
func (s *mockSession) OpenUniStreamSync() (SendStream, error) { panic("not implemented") }
func (s *mockSession) LocalAddr() net.Addr { panic("not implemented") }
func (s *mockSession) RemoteAddr() net.Addr { panic("not implemented") }
func (*mockSession) Context() context.Context { panic("not implemented") }
func (*mockSession) ConnectionState() ConnectionState { panic("not implemented") }
func (*mockSession) GetVersion() protocol.VersionNumber { return protocol.VersionWhatever }
func (s *mockSession) handshakeStatus() <-chan error { return s.handshakeChan }
func (*mockSession) getCryptoStream() cryptoStreamI { panic("not implemented") }
var _ Session = &mockSession{}

View file

@ -33,8 +33,11 @@ type streamManager interface {
GetOrOpenSendStream(protocol.StreamID) (sendStreamI, error)
GetOrOpenReceiveStream(protocol.StreamID) (receiveStreamI, error)
OpenStream() (Stream, error)
OpenUniStream() (SendStream, error)
OpenStreamSync() (Stream, error)
OpenUniStreamSync() (SendStream, error)
AcceptStream() (Stream, error)
AcceptUniStream() (ReceiveStream, error)
DeleteStream(protocol.StreamID) error
UpdateLimits(*handshake.TransportParameters)
HandleMaxStreamIDFrame(*wire.MaxStreamIDFrame) error
@ -923,6 +926,10 @@ func (s *session) AcceptStream() (Stream, error) {
return s.streamsMap.AcceptStream()
}
func (s *session) AcceptUniStream() (ReceiveStream, error) {
return s.streamsMap.AcceptUniStream()
}
// OpenStream opens a stream
func (s *session) OpenStream() (Stream, error) {
return s.streamsMap.OpenStream()
@ -932,6 +939,14 @@ func (s *session) OpenStreamSync() (Stream, error) {
return s.streamsMap.OpenStreamSync()
}
func (s *session) OpenUniStream() (SendStream, error) {
return s.streamsMap.OpenUniStream()
}
func (s *session) OpenUniStreamSync() (SendStream, error) {
return s.streamsMap.OpenUniStreamSync()
}
func (s *session) newStream(id protocol.StreamID) streamI {
flowController := s.newFlowController(id)
return newStream(id, s, flowController, s.version)

View file

@ -1507,7 +1507,14 @@ var _ = Describe("Session", func() {
Expect(err).To(MatchError("Stream 100 is not a bidirectional stream"))
})
// all relevant tests for this are in the streamsMap
It("opens streams", func() {
mstr := NewMockStreamI(mockCtrl)
streamManager.EXPECT().OpenStream().Return(mstr, nil)
str, err := sess.OpenStream()
Expect(err).ToNot(HaveOccurred())
Expect(str).To(Equal(mstr))
})
It("opens streams synchronously", func() {
mstr := NewMockStreamI(mockCtrl)
streamManager.EXPECT().OpenStreamSync().Return(mstr, nil)
@ -1515,6 +1522,38 @@ var _ = Describe("Session", func() {
Expect(err).ToNot(HaveOccurred())
Expect(str).To(Equal(mstr))
})
It("opens unidirectional streams", func() {
mstr := NewMockSendStreamI(mockCtrl)
streamManager.EXPECT().OpenUniStream().Return(mstr, nil)
str, err := sess.OpenUniStream()
Expect(err).ToNot(HaveOccurred())
Expect(str).To(Equal(mstr))
})
It("opens unidirectional streams synchronously", func() {
mstr := NewMockSendStreamI(mockCtrl)
streamManager.EXPECT().OpenUniStreamSync().Return(mstr, nil)
str, err := sess.OpenUniStreamSync()
Expect(err).ToNot(HaveOccurred())
Expect(str).To(Equal(mstr))
})
It("accepts streams", func() {
mstr := NewMockStreamI(mockCtrl)
streamManager.EXPECT().AcceptStream().Return(mstr, nil)
str, err := sess.AcceptStream()
Expect(err).ToNot(HaveOccurred())
Expect(str).To(Equal(mstr))
})
It("accepts unidirectional streams", func() {
mstr := NewMockReceiveStreamI(mockCtrl)
streamManager.EXPECT().AcceptUniStream().Return(mstr, nil)
str, err := sess.AcceptUniStream()
Expect(err).ToNot(HaveOccurred())
Expect(str).To(Equal(mstr))
})
})
Context("ignoring errors", func() {

View file

@ -185,6 +185,14 @@ func (m *streamsMapLegacy) OpenStreamSync() (Stream, error) {
}
}
func (m *streamsMapLegacy) OpenUniStream() (SendStream, error) {
return nil, errors.New("gQUIC doesn't support unidirectional streams")
}
func (m *streamsMapLegacy) OpenUniStreamSync() (SendStream, error) {
return nil, errors.New("gQUIC doesn't support unidirectional streams")
}
// AcceptStream returns the next stream opened by the peer
// it blocks until a new stream is opened
func (m *streamsMapLegacy) AcceptStream() (Stream, error) {
@ -206,6 +214,10 @@ func (m *streamsMapLegacy) AcceptStream() (Stream, error) {
return str, nil
}
func (m *streamsMapLegacy) AcceptUniStream() (ReceiveStream, error) {
return nil, errors.New("gQUIC doesn't support unidirectional streams")
}
func (m *streamsMapLegacy) DeleteStream(id protocol.StreamID) error {
m.mutex.Lock()
defer m.mutex.Unlock()