rename the session to connection

This commit is contained in:
Marten Seemann 2022-03-27 11:25:42 +01:00
parent d7ad1b6b9b
commit 4b17343631
16 changed files with 574 additions and 574 deletions

View file

@ -280,7 +280,7 @@ func newClient(
func (c *client) dial(ctx context.Context) error {
c.logger.Infof("Starting new connection to %s (%s -> %s), source connection ID %s, destination connection ID %s, version %s", c.tlsConf.ServerName, c.sconn.LocalAddr(), c.sconn.RemoteAddr(), c.srcConnID, c.destConnID, c.version)
c.conn = newClientSession(
c.conn = newClientConnection(
c.sconn,
c.packetHandlers,
c.destConnID,

View file

@ -31,7 +31,7 @@ var _ = Describe("Client", func() {
tracer *mocklogging.MockConnectionTracer
config *Config
originalClientSessConstructor func(
originalClientConnConstructor func(
conn sendConn,
runner connRunner,
destConnID protocol.ConnectionID,
@ -51,7 +51,7 @@ var _ = Describe("Client", func() {
BeforeEach(func() {
tlsConf = &tls.Config{NextProtos: []string{"proto1"}}
connID = protocol.ConnectionID{0, 0, 0, 0, 0, 0, 0x13, 0x37}
originalClientSessConstructor = newClientSession
originalClientConnConstructor = newClientConnection
tracer = mocklogging.NewMockConnectionTracer(mockCtrl)
tr := mocklogging.NewMockTracer(mockCtrl)
tr.EXPECT().TracerForConnection(gomock.Any(), protocol.PerspectiveClient, gomock.Any()).Return(tracer).MaxTimes(1)
@ -77,11 +77,11 @@ var _ = Describe("Client", func() {
AfterEach(func() {
connMuxer = origMultiplexer
newClientSession = originalClientSessConstructor
newClientConnection = originalClientConnConstructor
})
AfterEach(func() {
if s, ok := cl.conn.(*session); ok {
if s, ok := cl.conn.(*connection); ok {
s.shutdown()
}
Eventually(areConnsRunning).Should(BeFalse())
@ -118,7 +118,7 @@ var _ = Describe("Client", func() {
mockMultiplexer.EXPECT().AddConn(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(manager, nil)
remoteAddrChan := make(chan string, 1)
newClientSession = func(
newClientConnection = func(
sconn sendConn,
_ connRunner,
_ protocol.ConnectionID,
@ -151,7 +151,7 @@ var _ = Describe("Client", func() {
mockMultiplexer.EXPECT().AddConn(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(manager, nil)
hostnameChan := make(chan string, 1)
newClientSession = func(
newClientConnection = func(
_ sendConn,
_ connRunner,
_ protocol.ConnectionID,
@ -184,7 +184,7 @@ var _ = Describe("Client", func() {
mockMultiplexer.EXPECT().AddConn(packetConn, gomock.Any(), gomock.Any(), gomock.Any()).Return(manager, nil)
hostnameChan := make(chan string, 1)
newClientSession = func(
newClientConnection = func(
_ sendConn,
_ connRunner,
_ protocol.ConnectionID,
@ -223,7 +223,7 @@ var _ = Describe("Client", func() {
mockMultiplexer.EXPECT().AddConn(packetConn, gomock.Any(), gomock.Any(), gomock.Any()).Return(manager, nil)
run := make(chan struct{})
newClientSession = func(
newClientConnection = func(
_ sendConn,
runner connRunner,
_ protocol.ConnectionID,
@ -266,7 +266,7 @@ var _ = Describe("Client", func() {
readyChan := make(chan struct{})
done := make(chan struct{})
newClientSession = func(
newClientConnection = func(
_ sendConn,
runner connRunner,
_ protocol.ConnectionID,
@ -314,7 +314,7 @@ var _ = Describe("Client", func() {
mockMultiplexer.EXPECT().AddConn(packetConn, gomock.Any(), gomock.Any(), gomock.Any()).Return(manager, nil)
testErr := errors.New("early handshake error")
newClientSession = func(
newClientConnection = func(
_ sendConn,
_ connRunner,
_ protocol.ConnectionID,
@ -357,7 +357,7 @@ var _ = Describe("Client", func() {
<-connRunning
})
conn.EXPECT().HandshakeComplete().Return(context.Background())
newClientSession = func(
newClientConnection = func(
_ sendConn,
_ connRunner,
_ protocol.ConnectionID,
@ -409,7 +409,7 @@ var _ = Describe("Client", func() {
run := make(chan struct{})
connCreated := make(chan struct{})
conn := NewMockQuicConn(mockCtrl)
newClientSession = func(
newClientConnection = func(
connP sendConn,
_ connRunner,
_ protocol.ConnectionID,
@ -529,7 +529,7 @@ var _ = Describe("Client", func() {
var cconn sendConn
var version protocol.VersionNumber
var conf *Config
newClientSession = func(
newClientConnection = func(
connP sendConn,
_ connRunner,
_ protocol.ConnectionID,
@ -569,7 +569,7 @@ var _ = Describe("Client", func() {
mockMultiplexer.EXPECT().AddConn(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(manager, nil)
var counter int
newClientSession = func(
newClientConnection = func(
_ sendConn,
_ connRunner,
_ protocol.ConnectionID,

View file

@ -171,7 +171,7 @@ var _ = Describe("Connection ID Generator", func() {
}
})
It("replaces with a closed session for all connection IDs", func() {
It("replaces with a closed connection for all connection IDs", func() {
Expect(g.SetMaxActiveConnIDs(5)).To(Succeed())
Expect(queuedFrames).To(HaveLen(4))
sess := NewMockPacketHandler(mockCtrl)

View file

@ -124,14 +124,14 @@ type errCloseForRecreating struct {
}
func (e *errCloseForRecreating) Error() string {
return "closing session in order to recreate it"
return "closing connection in order to recreate it"
}
var connTracingID uint64 // to be accessed atomically
func nextConnTracingID() uint64 { return atomic.AddUint64(&connTracingID, 1) }
// A Connection is a QUIC session
type session struct {
// A Connection is a QUIC connection
type connection struct {
// Destination connection ID used during the handshake.
// Used to check source connection ID on incoming packets.
handshakeDestConnID protocol.ConnectionID
@ -197,8 +197,8 @@ type session struct {
versionNegotiated bool
receivedFirstPacket bool
idleTimeout time.Duration
sessionCreationTime time.Time
idleTimeout time.Duration
creationTime time.Time
// The idle timeout is set based on the max of the time we received the last packet...
lastPacketReceivedTime time.Time
// ... and the time we sent a new ack-eliciting packet after receiving a packet.
@ -222,13 +222,13 @@ type session struct {
}
var (
_ Connection = &session{}
_ EarlyConnection = &session{}
_ streamSender = &session{}
_ Connection = &connection{}
_ EarlyConnection = &connection{}
_ streamSender = &connection{}
deadlineSendImmediately = time.Time{}.Add(42 * time.Millisecond) // any value > time.Time{} and before time.Now() is fine
)
var newSession = func(
var newConnection = func(
conn sendConn,
runner connRunner,
origDestConnID protocol.ConnectionID,
@ -246,7 +246,7 @@ var newSession = func(
logger utils.Logger,
v protocol.VersionNumber,
) quicConn {
s := &session{
s := &connection{
conn: conn,
config: conf,
handshakeDestConnID: destConnID,
@ -362,7 +362,7 @@ var newSession = func(
}
// declare this as a variable, such that we can it mock it in the tests
var newClientSession = func(
var newClientConnection = func(
conn sendConn,
runner connRunner,
destConnID protocol.ConnectionID,
@ -377,7 +377,7 @@ var newClientSession = func(
logger utils.Logger,
v protocol.VersionNumber,
) quicConn {
s := &session{
s := &connection{
conn: conn,
config: conf,
origDestConnID: destConnID,
@ -493,7 +493,7 @@ var newClientSession = func(
return s
}
func (s *session) preSetup() {
func (s *connection) preSetup() {
s.sendQueue = newSendQueue(s.conn)
s.retransmissionQueue = newRetransmissionQueue(s.version)
s.frameParser = wire.NewFrameParser(s.config.EnableDatagrams, s.version)
@ -521,14 +521,14 @@ func (s *session) preSetup() {
s.version,
)
s.framer = newFramer(s.streamsMap, s.version)
s.receivedPackets = make(chan *receivedPacket, protocol.MaxSessionUnprocessedPackets)
s.receivedPackets = make(chan *receivedPacket, protocol.MaxConnUnprocessedPackets)
s.closeChan = make(chan closeError, 1)
s.sendingScheduled = make(chan struct{}, 1)
s.handshakeCtx, s.handshakeCtxCancel = context.WithCancel(context.Background())
now := time.Now()
s.lastPacketReceivedTime = now
s.sessionCreationTime = now
s.creationTime = now
s.windowUpdateQueue = newWindowUpdateQueue(s.streamsMap, s.connFlowController, s.framer.QueueControlFrame)
if s.config.EnableDatagrams {
@ -536,8 +536,8 @@ func (s *session) preSetup() {
}
}
// run the session main loop
func (s *session) run() error {
// run the connection main loop
func (s *connection) run() error {
defer s.ctxCancel()
s.timer = utils.NewTimer()
@ -589,7 +589,7 @@ runLoop:
if processed := s.handlePacketImpl(p); processed {
processedUndecryptablePacket = true
}
// Don't set timers and send packets if the packet made us close the session.
// Don't set timers and send packets if the packet made us close the connection.
select {
case closeErr = <-s.closeChan:
break runLoop
@ -612,7 +612,7 @@ runLoop:
case <-sendQueueAvailable:
case firstPacket := <-s.receivedPackets:
wasProcessed := s.handlePacketImpl(firstPacket)
// Don't set timers and send packets if the packet made us close the session.
// Don't set timers and send packets if the packet made us close the connection.
select {
case closeErr = <-s.closeChan:
break runLoop
@ -661,11 +661,11 @@ runLoop:
}
if keepAliveTime := s.nextKeepAliveTime(); !keepAliveTime.IsZero() && !now.Before(keepAliveTime) {
// send a PING frame since there is no activity in the session
// send a PING frame since there is no activity in the connection
s.logger.Debugf("Sending a keep-alive PING to keep the connection alive.")
s.framer.QueueControlFrame(&wire.PingFrame{})
s.keepAlivePingSent = true
} else if !s.handshakeComplete && now.Sub(s.sessionCreationTime) >= s.config.handshakeTimeout() {
} else if !s.handshakeComplete && now.Sub(s.creationTime) >= s.config.handshakeTimeout() {
s.destroyImpl(qerr.ErrHandshakeTimeout)
continue
} else {
@ -704,24 +704,24 @@ runLoop:
return closeErr.err
}
// blocks until the early session can be used
func (s *session) earlyConnReady() <-chan struct{} {
// blocks until the early connection can be used
func (s *connection) earlyConnReady() <-chan struct{} {
return s.earlyConnReadyChan
}
func (s *session) HandshakeComplete() context.Context {
func (s *connection) HandshakeComplete() context.Context {
return s.handshakeCtx
}
func (s *session) Context() context.Context {
func (s *connection) Context() context.Context {
return s.ctx
}
func (s *session) supportsDatagrams() bool {
func (s *connection) supportsDatagrams() bool {
return s.peerParams.MaxDatagramFrameSize != protocol.InvalidByteCount
}
func (s *session) ConnectionState() ConnectionState {
func (s *connection) ConnectionState() ConnectionState {
return ConnectionState{
TLS: s.cryptoStreamHandler.ConnectionState(),
SupportsDatagrams: s.supportsDatagrams(),
@ -730,18 +730,18 @@ func (s *session) ConnectionState() ConnectionState {
// Time when the next keep-alive packet should be sent.
// It returns a zero time if no keep-alive should be sent.
func (s *session) nextKeepAliveTime() time.Time {
func (s *connection) nextKeepAliveTime() time.Time {
if !s.config.KeepAlive || s.keepAlivePingSent || !s.firstAckElicitingPacketAfterIdleSentTime.IsZero() {
return time.Time{}
}
return s.lastPacketReceivedTime.Add(s.keepAliveInterval)
}
func (s *session) maybeResetTimer() {
func (s *connection) maybeResetTimer() {
var deadline time.Time
if !s.handshakeComplete {
deadline = utils.MinTime(
s.sessionCreationTime.Add(s.config.handshakeTimeout()),
s.creationTime.Add(s.config.handshakeTimeout()),
s.idleTimeoutStartTime().Add(s.config.HandshakeIdleTimeout),
)
} else {
@ -770,11 +770,11 @@ func (s *session) maybeResetTimer() {
s.timer.Reset(deadline)
}
func (s *session) idleTimeoutStartTime() time.Time {
func (s *connection) idleTimeoutStartTime() time.Time {
return utils.MaxTime(s.lastPacketReceivedTime, s.firstAckElicitingPacketAfterIdleSentTime)
}
func (s *session) handleHandshakeComplete() {
func (s *connection) handleHandshakeComplete() {
s.handshakeComplete = true
s.handshakeCompleteChan = nil // prevent this case from ever being selected again
defer s.handshakeCtxCancel()
@ -810,7 +810,7 @@ func (s *session) handleHandshakeComplete() {
s.queueControlFrame(&wire.HandshakeDoneFrame{})
}
func (s *session) handleHandshakeConfirmed() {
func (s *connection) handleHandshakeConfirmed() {
s.handshakeConfirmed = true
s.sentPacketHandler.SetHandshakeConfirmed()
s.cryptoStreamHandler.SetHandshakeConfirmed()
@ -833,7 +833,7 @@ func (s *session) handleHandshakeConfirmed() {
}
}
func (s *session) handlePacketImpl(rp *receivedPacket) bool {
func (s *connection) handlePacketImpl(rp *receivedPacket) bool {
s.sentPacketHandler.ReceivedBytes(rp.Size())
if wire.IsVersionNegotiationPacket(rp.data) {
@ -901,7 +901,7 @@ func (s *session) handlePacketImpl(rp *receivedPacket) bool {
return processed
}
func (s *session) handleSinglePacket(p *receivedPacket, hdr *wire.Header) bool /* was the packet successfully processed */ {
func (s *connection) handleSinglePacket(p *receivedPacket, hdr *wire.Header) bool /* was the packet successfully processed */ {
var wasQueued bool
defer func() {
@ -993,7 +993,7 @@ func (s *session) handleSinglePacket(p *receivedPacket, hdr *wire.Header) bool /
return true
}
func (s *session) handleRetryPacket(hdr *wire.Header, data []byte) bool /* was this a valid Retry */ {
func (s *connection) handleRetryPacket(hdr *wire.Header, data []byte) bool /* was this a valid Retry */ {
if s.perspective == protocol.PerspectiveServer {
if s.tracer != nil {
s.tracer.DroppedPacket(logging.PacketTypeRetry, protocol.ByteCount(len(data)), logging.PacketDropUnexpectedPacket)
@ -1055,7 +1055,7 @@ func (s *session) handleRetryPacket(hdr *wire.Header, data []byte) bool /* was t
return true
}
func (s *session) handleVersionNegotiationPacket(p *receivedPacket) {
func (s *connection) handleVersionNegotiationPacket(p *receivedPacket) {
if s.perspective == protocol.PerspectiveServer || // servers never receive version negotiation packets
s.receivedFirstPacket || s.versionNegotiated { // ignore delayed / duplicated version negotiation packets
if s.tracer != nil {
@ -1109,7 +1109,7 @@ func (s *session) handleVersionNegotiationPacket(p *receivedPacket) {
})
}
func (s *session) handleUnpackedPacket(
func (s *connection) handleUnpackedPacket(
packet *unpackedPacket,
ecn protocol.ECN,
rcvTime time.Time,
@ -1141,10 +1141,10 @@ func (s *session) handleUnpackedPacket(
s.handshakeDestConnID = cid
s.connIDManager.ChangeInitialConnID(cid)
}
// We create the session as soon as we receive the first packet from the client.
// We create the connection as soon as we receive the first packet from the client.
// We do that before authenticating the packet.
// That means that if the source connection ID was corrupted,
// we might have create a session with an incorrect source connection ID.
// we might have create a connection with an incorrect source connection ID.
// Once we authenticate the first packet, we need to update it.
if s.perspective == protocol.PerspectiveServer {
if !packet.hdr.SrcConnectionID.Equal(s.handshakeDestConnID) {
@ -1209,7 +1209,7 @@ func (s *session) handleUnpackedPacket(
return s.receivedPacketHandler.ReceivedPacket(packet.packetNumber, ecn, packet.encryptionLevel, rcvTime, isAckEliciting)
}
func (s *session) handleFrame(f wire.Frame, encLevel protocol.EncryptionLevel, destConnID protocol.ConnectionID) error {
func (s *connection) handleFrame(f wire.Frame, encLevel protocol.EncryptionLevel, destConnID protocol.ConnectionID) error {
var err error
wire.LogFrame(s.logger, f, false)
switch frame := f.(type) {
@ -1257,9 +1257,9 @@ func (s *session) handleFrame(f wire.Frame, encLevel protocol.EncryptionLevel, d
}
// handlePacket is called by the server with a new packet
func (s *session) handlePacket(p *receivedPacket) {
func (s *connection) handlePacket(p *receivedPacket) {
// Discard packets once the amount of queued packets is larger than
// the channel size, protocol.MaxSessionUnprocessedPackets
// the channel size, protocol.MaxConnUnprocessedPackets
select {
case s.receivedPackets <- p:
default:
@ -1269,7 +1269,7 @@ func (s *session) handlePacket(p *receivedPacket) {
}
}
func (s *session) handleConnectionCloseFrame(frame *wire.ConnectionCloseFrame) {
func (s *connection) handleConnectionCloseFrame(frame *wire.ConnectionCloseFrame) {
if frame.IsApplicationError {
s.closeRemote(&qerr.ApplicationError{
Remote: true,
@ -1286,7 +1286,7 @@ func (s *session) handleConnectionCloseFrame(frame *wire.ConnectionCloseFrame) {
})
}
func (s *session) handleCryptoFrame(frame *wire.CryptoFrame, encLevel protocol.EncryptionLevel) error {
func (s *connection) handleCryptoFrame(frame *wire.CryptoFrame, encLevel protocol.EncryptionLevel) error {
encLevelChanged, err := s.cryptoStreamManager.HandleCryptoFrame(frame, encLevel)
if err != nil {
return err
@ -1299,7 +1299,7 @@ func (s *session) handleCryptoFrame(frame *wire.CryptoFrame, encLevel protocol.E
return nil
}
func (s *session) handleStreamFrame(frame *wire.StreamFrame) error {
func (s *connection) handleStreamFrame(frame *wire.StreamFrame) error {
str, err := s.streamsMap.GetOrOpenReceiveStream(frame.StreamID)
if err != nil {
return err
@ -1312,11 +1312,11 @@ func (s *session) handleStreamFrame(frame *wire.StreamFrame) error {
return str.handleStreamFrame(frame)
}
func (s *session) handleMaxDataFrame(frame *wire.MaxDataFrame) {
func (s *connection) handleMaxDataFrame(frame *wire.MaxDataFrame) {
s.connFlowController.UpdateSendWindow(frame.MaximumData)
}
func (s *session) handleMaxStreamDataFrame(frame *wire.MaxStreamDataFrame) error {
func (s *connection) handleMaxStreamDataFrame(frame *wire.MaxStreamDataFrame) error {
str, err := s.streamsMap.GetOrOpenSendStream(frame.StreamID)
if err != nil {
return err
@ -1329,11 +1329,11 @@ func (s *session) handleMaxStreamDataFrame(frame *wire.MaxStreamDataFrame) error
return nil
}
func (s *session) handleMaxStreamsFrame(frame *wire.MaxStreamsFrame) {
func (s *connection) handleMaxStreamsFrame(frame *wire.MaxStreamsFrame) {
s.streamsMap.HandleMaxStreamsFrame(frame)
}
func (s *session) handleResetStreamFrame(frame *wire.ResetStreamFrame) error {
func (s *connection) handleResetStreamFrame(frame *wire.ResetStreamFrame) error {
str, err := s.streamsMap.GetOrOpenReceiveStream(frame.StreamID)
if err != nil {
return err
@ -1345,7 +1345,7 @@ func (s *session) handleResetStreamFrame(frame *wire.ResetStreamFrame) error {
return str.handleResetStreamFrame(frame)
}
func (s *session) handleStopSendingFrame(frame *wire.StopSendingFrame) error {
func (s *connection) handleStopSendingFrame(frame *wire.StopSendingFrame) error {
str, err := s.streamsMap.GetOrOpenSendStream(frame.StreamID)
if err != nil {
return err
@ -1358,11 +1358,11 @@ func (s *session) handleStopSendingFrame(frame *wire.StopSendingFrame) error {
return nil
}
func (s *session) handlePathChallengeFrame(frame *wire.PathChallengeFrame) {
func (s *connection) handlePathChallengeFrame(frame *wire.PathChallengeFrame) {
s.queueControlFrame(&wire.PathResponseFrame{Data: frame.Data})
}
func (s *session) handleNewTokenFrame(frame *wire.NewTokenFrame) error {
func (s *connection) handleNewTokenFrame(frame *wire.NewTokenFrame) error {
if s.perspective == protocol.PerspectiveServer {
return &qerr.TransportError{
ErrorCode: qerr.ProtocolViolation,
@ -1375,15 +1375,15 @@ func (s *session) handleNewTokenFrame(frame *wire.NewTokenFrame) error {
return nil
}
func (s *session) handleNewConnectionIDFrame(f *wire.NewConnectionIDFrame) error {
func (s *connection) handleNewConnectionIDFrame(f *wire.NewConnectionIDFrame) error {
return s.connIDManager.Add(f)
}
func (s *session) handleRetireConnectionIDFrame(f *wire.RetireConnectionIDFrame, destConnID protocol.ConnectionID) error {
func (s *connection) handleRetireConnectionIDFrame(f *wire.RetireConnectionIDFrame, destConnID protocol.ConnectionID) error {
return s.connIDGenerator.Retire(f.SequenceNumber, destConnID)
}
func (s *session) handleHandshakeDoneFrame() error {
func (s *connection) handleHandshakeDoneFrame() error {
if s.perspective == protocol.PerspectiveServer {
return &qerr.TransportError{
ErrorCode: qerr.ProtocolViolation,
@ -1396,7 +1396,7 @@ func (s *session) handleHandshakeDoneFrame() error {
return nil
}
func (s *session) handleAckFrame(frame *wire.AckFrame, encLevel protocol.EncryptionLevel) error {
func (s *connection) handleAckFrame(frame *wire.AckFrame, encLevel protocol.EncryptionLevel) error {
acked1RTTPacket, err := s.sentPacketHandler.ReceivedAck(frame, encLevel, s.lastPacketReceivedTime)
if err != nil {
return err
@ -1410,7 +1410,7 @@ func (s *session) handleAckFrame(frame *wire.AckFrame, encLevel protocol.Encrypt
return s.cryptoStreamHandler.SetLargest1RTTAcked(frame.LargestAcked())
}
func (s *session) handleDatagramFrame(f *wire.DatagramFrame) error {
func (s *connection) handleDatagramFrame(f *wire.DatagramFrame) error {
if f.Length(s.version) > protocol.MaxDatagramFrameSize {
return &qerr.TransportError{
ErrorCode: qerr.ProtocolViolation,
@ -1421,50 +1421,50 @@ func (s *session) handleDatagramFrame(f *wire.DatagramFrame) error {
return nil
}
// closeLocal closes the session and send a CONNECTION_CLOSE containing the error
func (s *session) closeLocal(e error) {
// closeLocal closes the connection and send a CONNECTION_CLOSE containing the error
func (s *connection) closeLocal(e error) {
s.closeOnce.Do(func() {
if e == nil {
s.logger.Infof("Closing session.")
s.logger.Infof("Closing connection.")
} else {
s.logger.Errorf("Closing session with error: %s", e)
s.logger.Errorf("Closing connection with error: %s", e)
}
s.closeChan <- closeError{err: e, immediate: false, remote: false}
})
}
// destroy closes the session without sending the error on the wire
func (s *session) destroy(e error) {
// destroy closes the connection without sending the error on the wire
func (s *connection) destroy(e error) {
s.destroyImpl(e)
<-s.ctx.Done()
}
func (s *session) destroyImpl(e error) {
func (s *connection) destroyImpl(e error) {
s.closeOnce.Do(func() {
if nerr, ok := e.(net.Error); ok && nerr.Timeout() {
s.logger.Errorf("Destroying session: %s", e)
s.logger.Errorf("Destroying connection: %s", e)
} else {
s.logger.Errorf("Destroying session with error: %s", e)
s.logger.Errorf("Destroying connection with error: %s", e)
}
s.closeChan <- closeError{err: e, immediate: true, remote: false}
})
}
func (s *session) closeRemote(e error) {
func (s *connection) closeRemote(e error) {
s.closeOnce.Do(func() {
s.logger.Errorf("Peer closed session with error: %s", e)
s.logger.Errorf("Peer closed connection with error: %s", e)
s.closeChan <- closeError{err: e, immediate: true, remote: true}
})
}
// Close the connection. It sends a NO_ERROR application error.
// It waits until the run loop has stopped before returning
func (s *session) shutdown() {
func (s *connection) shutdown() {
s.closeLocal(nil)
<-s.ctx.Done()
}
func (s *session) CloseWithError(code ApplicationErrorCode, desc string) error {
func (s *connection) CloseWithError(code ApplicationErrorCode, desc string) error {
s.closeLocal(&qerr.ApplicationError{
ErrorCode: code,
ErrorMessage: desc,
@ -1473,7 +1473,7 @@ func (s *session) CloseWithError(code ApplicationErrorCode, desc string) error {
return nil
}
func (s *session) handleCloseError(closeErr *closeError) {
func (s *connection) handleCloseError(closeErr *closeError) {
e := closeErr.err
if e == nil {
e = &qerr.ApplicationError{}
@ -1532,7 +1532,7 @@ func (s *session) handleCloseError(closeErr *closeError) {
s.connIDGenerator.ReplaceWithClosed(cs)
}
func (s *session) dropEncryptionLevel(encLevel protocol.EncryptionLevel) {
func (s *connection) dropEncryptionLevel(encLevel protocol.EncryptionLevel) {
s.sentPacketHandler.DropPackets(encLevel)
s.receivedPacketHandler.DropPackets(encLevel)
if s.tracer != nil {
@ -1550,7 +1550,7 @@ func (s *session) dropEncryptionLevel(encLevel protocol.EncryptionLevel) {
}
// is called for the client, when restoring transport parameters saved for 0-RTT
func (s *session) restoreTransportParameters(params *wire.TransportParameters) {
func (s *connection) restoreTransportParameters(params *wire.TransportParameters) {
if s.logger.Debug() {
s.logger.Debugf("Restoring Transport Parameters: %s", params)
}
@ -1561,7 +1561,7 @@ func (s *session) restoreTransportParameters(params *wire.TransportParameters) {
s.streamsMap.UpdateLimits(params)
}
func (s *session) handleTransportParameters(params *wire.TransportParameters) {
func (s *connection) handleTransportParameters(params *wire.TransportParameters) {
if err := s.checkTransportParameters(params); err != nil {
s.closeLocal(&qerr.TransportError{
ErrorCode: qerr.TransportParameterError,
@ -1573,13 +1573,13 @@ func (s *session) handleTransportParameters(params *wire.TransportParameters) {
// During a 0-RTT connection, we are only allowed to use the new transport parameters for 1-RTT packets.
if s.perspective == protocol.PerspectiveServer {
s.applyTransportParameters()
// On the server side, the early session is ready as soon as we processed
// On the server side, the early connection is ready as soon as we processed
// the client's transport parameters.
close(s.earlyConnReadyChan)
}
}
func (s *session) checkTransportParameters(params *wire.TransportParameters) error {
func (s *connection) checkTransportParameters(params *wire.TransportParameters) error {
if s.logger.Debug() {
s.logger.Debugf("Processed Transport Parameters: %s", params)
}
@ -1612,7 +1612,7 @@ func (s *session) checkTransportParameters(params *wire.TransportParameters) err
return nil
}
func (s *session) applyTransportParameters() {
func (s *connection) applyTransportParameters() {
params := s.peerParams
// Our local idle timeout will always be > 0.
s.idleTimeout = utils.MinNonZeroDuration(s.config.MaxIdleTimeout, params.MaxIdleTimeout)
@ -1633,7 +1633,7 @@ func (s *session) applyTransportParameters() {
}
}
func (s *session) sendPackets() error {
func (s *connection) sendPackets() error {
s.pacingDeadline = time.Time{}
var sentPacket bool // only used in for packets sent in send mode SendAny
@ -1699,7 +1699,7 @@ func (s *session) sendPackets() error {
}
}
func (s *session) maybeSendAckOnlyPacket() error {
func (s *connection) maybeSendAckOnlyPacket() error {
packet, err := s.packer.MaybePackAckPacket(s.handshakeConfirmed)
if err != nil {
return err
@ -1711,7 +1711,7 @@ func (s *session) maybeSendAckOnlyPacket() error {
return nil
}
func (s *session) sendProbePacket(encLevel protocol.EncryptionLevel) error {
func (s *connection) sendProbePacket(encLevel protocol.EncryptionLevel) error {
// Queue probe packets until we actually send out a packet,
// or until there are no more packets to queue.
var packet *packedPacket
@ -1747,13 +1747,13 @@ func (s *session) sendProbePacket(encLevel protocol.EncryptionLevel) error {
}
}
if packet == nil || packet.packetContents == nil {
return fmt.Errorf("session BUG: couldn't pack %s probe packet", encLevel)
return fmt.Errorf("connection BUG: couldn't pack %s probe packet", encLevel)
}
s.sendPackedPacket(packet, time.Now())
return nil
}
func (s *session) sendPacket() (bool, error) {
func (s *connection) sendPacket() (bool, error) {
if isBlocked, offset := s.connFlowController.IsNewlyBlocked(); isBlocked {
s.framer.QueueControlFrame(&wire.DataBlockedFrame{MaximumData: offset})
}
@ -1792,7 +1792,7 @@ func (s *session) sendPacket() (bool, error) {
return true, nil
}
func (s *session) sendPackedPacket(packet *packedPacket, now time.Time) {
func (s *connection) sendPackedPacket(packet *packedPacket, now time.Time) {
if s.firstAckElicitingPacketAfterIdleSentTime.IsZero() && packet.IsAckEliciting() {
s.firstAckElicitingPacketAfterIdleSentTime = now
}
@ -1802,7 +1802,7 @@ func (s *session) sendPackedPacket(packet *packedPacket, now time.Time) {
s.sendQueue.Send(packet.buffer)
}
func (s *session) sendConnectionClose(e error) ([]byte, error) {
func (s *connection) sendConnectionClose(e error) ([]byte, error) {
var packet *coalescedPacket
var err error
var transportErr *qerr.TransportError
@ -1814,7 +1814,7 @@ func (s *session) sendConnectionClose(e error) ([]byte, error) {
} else {
packet, err = s.packer.PackConnectionClose(&qerr.TransportError{
ErrorCode: qerr.InternalError,
ErrorMessage: fmt.Sprintf("session BUG: unspecified error type (msg: %s)", e.Error()),
ErrorMessage: fmt.Sprintf("connection BUG: unspecified error type (msg: %s)", e.Error()),
})
}
if err != nil {
@ -1824,7 +1824,7 @@ func (s *session) sendConnectionClose(e error) ([]byte, error) {
return packet.buffer.Data, s.conn.Write(packet.buffer.Data)
}
func (s *session) logPacketContents(p *packetContents) {
func (s *connection) logPacketContents(p *packetContents) {
// tracing
if s.tracer != nil {
frames := make([]logging.Frame, 0, len(p.frames))
@ -1847,7 +1847,7 @@ func (s *session) logPacketContents(p *packetContents) {
}
}
func (s *session) logCoalescedPacket(packet *coalescedPacket) {
func (s *connection) logCoalescedPacket(packet *coalescedPacket) {
if s.logger.Debug() {
if len(packet.packets) > 1 {
s.logger.Debugf("-> Sending coalesced packet (%d parts, %d bytes) for connection %s", len(packet.packets), packet.buffer.Len(), s.logID)
@ -1860,7 +1860,7 @@ func (s *session) logCoalescedPacket(packet *coalescedPacket) {
}
}
func (s *session) logPacket(packet *packedPacket) {
func (s *connection) logPacket(packet *packedPacket) {
if s.logger.Debug() {
s.logger.Debugf("-> Sending packet %d (%d bytes) for connection %s, %s", packet.header.PacketNumber, packet.buffer.Len(), s.logID, packet.EncryptionLevel())
}
@ -1868,32 +1868,32 @@ func (s *session) logPacket(packet *packedPacket) {
}
// AcceptStream returns the next stream openend by the peer
func (s *session) AcceptStream(ctx context.Context) (Stream, error) {
func (s *connection) AcceptStream(ctx context.Context) (Stream, error) {
return s.streamsMap.AcceptStream(ctx)
}
func (s *session) AcceptUniStream(ctx context.Context) (ReceiveStream, error) {
func (s *connection) AcceptUniStream(ctx context.Context) (ReceiveStream, error) {
return s.streamsMap.AcceptUniStream(ctx)
}
// OpenStream opens a stream
func (s *session) OpenStream() (Stream, error) {
func (s *connection) OpenStream() (Stream, error) {
return s.streamsMap.OpenStream()
}
func (s *session) OpenStreamSync(ctx context.Context) (Stream, error) {
func (s *connection) OpenStreamSync(ctx context.Context) (Stream, error) {
return s.streamsMap.OpenStreamSync(ctx)
}
func (s *session) OpenUniStream() (SendStream, error) {
func (s *connection) OpenUniStream() (SendStream, error) {
return s.streamsMap.OpenUniStream()
}
func (s *session) OpenUniStreamSync(ctx context.Context) (SendStream, error) {
func (s *connection) OpenUniStreamSync(ctx context.Context) (SendStream, error) {
return s.streamsMap.OpenUniStreamSync(ctx)
}
func (s *session) newFlowController(id protocol.StreamID) flowcontrol.StreamFlowController {
func (s *connection) newFlowController(id protocol.StreamID) flowcontrol.StreamFlowController {
initialSendWindow := s.peerParams.InitialMaxStreamDataUni
if id.Type() == protocol.StreamTypeBidi {
if id.InitiatedBy() == s.perspective {
@ -1915,14 +1915,14 @@ func (s *session) newFlowController(id protocol.StreamID) flowcontrol.StreamFlow
}
// scheduleSending signals that we have data for sending
func (s *session) scheduleSending() {
func (s *connection) scheduleSending() {
select {
case s.sendingScheduled <- struct{}{}:
default:
}
}
func (s *session) tryQueueingUndecryptablePacket(p *receivedPacket, hdr *wire.Header) {
func (s *connection) tryQueueingUndecryptablePacket(p *receivedPacket, hdr *wire.Header) {
if s.handshakeComplete {
panic("shouldn't queue undecryptable packets after handshake completion")
}
@ -1940,33 +1940,33 @@ func (s *session) tryQueueingUndecryptablePacket(p *receivedPacket, hdr *wire.He
s.undecryptablePackets = append(s.undecryptablePackets, p)
}
func (s *session) queueControlFrame(f wire.Frame) {
func (s *connection) queueControlFrame(f wire.Frame) {
s.framer.QueueControlFrame(f)
s.scheduleSending()
}
func (s *session) onHasStreamWindowUpdate(id protocol.StreamID) {
func (s *connection) onHasStreamWindowUpdate(id protocol.StreamID) {
s.windowUpdateQueue.AddStream(id)
s.scheduleSending()
}
func (s *session) onHasConnectionWindowUpdate() {
func (s *connection) onHasConnectionWindowUpdate() {
s.windowUpdateQueue.AddConnection()
s.scheduleSending()
}
func (s *session) onHasStreamData(id protocol.StreamID) {
func (s *connection) onHasStreamData(id protocol.StreamID) {
s.framer.AddActiveStream(id)
s.scheduleSending()
}
func (s *session) onStreamCompleted(id protocol.StreamID) {
func (s *connection) onStreamCompleted(id protocol.StreamID) {
if err := s.streamsMap.DeleteStream(id); err != nil {
s.closeLocal(err)
}
}
func (s *session) SendMessage(p []byte) error {
func (s *connection) SendMessage(p []byte) error {
f := &wire.DatagramFrame{DataLenPresent: true}
if protocol.ByteCount(len(p)) > f.MaxDataLen(s.peerParams.MaxDatagramFrameSize, s.version) {
return errors.New("message too large")
@ -1976,27 +1976,27 @@ func (s *session) SendMessage(p []byte) error {
return s.datagramQueue.AddAndWait(f)
}
func (s *session) ReceiveMessage() ([]byte, error) {
func (s *connection) ReceiveMessage() ([]byte, error) {
return s.datagramQueue.Receive()
}
func (s *session) LocalAddr() net.Addr {
func (s *connection) LocalAddr() net.Addr {
return s.conn.LocalAddr()
}
func (s *session) RemoteAddr() net.Addr {
func (s *connection) RemoteAddr() net.Addr {
return s.conn.RemoteAddr()
}
func (s *session) getPerspective() protocol.Perspective {
func (s *connection) getPerspective() protocol.Perspective {
return s.perspective
}
func (s *session) GetVersion() protocol.VersionNumber {
func (s *connection) GetVersion() protocol.VersionNumber {
return s.version
}
func (s *session) NextConnection() Connection {
func (s *connection) NextConnection() Connection {
<-s.HandshakeComplete().Done()
s.streamsMap.UseResetMaps()
return s

File diff suppressed because it is too large Load diff

View file

@ -12,7 +12,7 @@ import (
"log"
"math/big"
quic "github.com/lucas-clemente/quic-go"
"github.com/lucas-clemente/quic-go"
)
const addr = "localhost:4242"
@ -36,11 +36,11 @@ func echoServer() error {
if err != nil {
return err
}
sess, err := listener.Accept(context.Background())
conn, err := listener.Accept(context.Background())
if err != nil {
return err
}
stream, err := sess.AcceptStream(context.Background())
stream, err := conn.AcceptStream(context.Background())
if err != nil {
panic(err)
}
@ -54,12 +54,12 @@ func clientMain() error {
InsecureSkipVerify: true,
NextProtos: []string{"quic-echo-example"},
}
session, err := quic.DialAddr(addr, tlsConf, nil)
conn, err := quic.DialAddr(addr, tlsConf, nil)
if err != nil {
return err
}
stream, err := session.OpenStreamSync(context.Background())
stream, err := conn.OpenStreamSync(context.Background())
if err != nil {
return err
}

View file

@ -14,7 +14,7 @@ const InitialPacketSizeIPv6 = 1232
// MaxCongestionWindowPackets is the maximum congestion window in packet.
const MaxCongestionWindowPackets = 10000
// MaxUndecryptablePackets limits the number of undecryptable packets that are queued in the session.
// MaxUndecryptablePackets limits the number of undecryptable packets that are queued in the connection.
const MaxUndecryptablePackets = 32
// ConnectionFlowControlMultiplier determines how much larger the connection flow control windows needs to be relative to any stream's flow control window
@ -45,8 +45,8 @@ const DefaultMaxIncomingUniStreams = 100
// MaxServerUnprocessedPackets is the max number of packets stored in the server that are not yet processed.
const MaxServerUnprocessedPackets = 1024
// MaxSessionUnprocessedPackets is the max number of packets stored in each session that are not yet processed.
const MaxSessionUnprocessedPackets = 256
// MaxConnUnprocessedPackets is the max number of packets stored in each connection that are not yet processed.
const MaxConnUnprocessedPackets = 256
// SkipPacketInitialPeriod is the initial period length used for packet number skipping to prevent an Optimistic ACK attack.
// Every time a packet number is skipped, the period is doubled, up to SkipPacketMaxPeriod.
@ -190,6 +190,6 @@ const Max0RTTQueues = 32
// Max0RTTQueueLen is the maximum number of 0-RTT packets that we buffer for each connection.
// When a new connection is created, all buffered packets are passed to the connection immediately.
// To avoid blocking, this value has to be smaller than MaxSessionUnprocessedPackets.
// To avoid blocking, this value has to be smaller than MaxConnUnprocessedPackets.
// To avoid packets being dropped as undecryptable by the connection, this value has to be smaller than MaxUndecryptablePackets.
const Max0RTTQueueLen = 31

View file

@ -7,7 +7,7 @@ import (
var _ = Describe("Parameters", func() {
It("can queue more packets in the session than in the 0-RTT queue", func() {
Expect(MaxSessionUnprocessedPackets).To(BeNumerically(">", Max0RTTQueueLen))
Expect(MaxConnUnprocessedPackets).To(BeNumerically(">", Max0RTTQueueLen))
Expect(MaxUndecryptablePackets).To(BeNumerically(">", Max0RTTQueueLen))
})
})

View file

@ -1,5 +1,5 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: session.go
// Source: connection.go
// Package quic is a generated GoMock package.
package quic

View file

@ -1,5 +1,5 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: session.go
// Source: connection.go
// Package quic is a generated GoMock package.
package quic

View file

@ -1,5 +1,5 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: session.go
// Source: connection.go
// Package quic is a generated GoMock package.
package quic

View file

@ -1,5 +1,5 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: session.go
// Source: connection.go
// Package quic is a generated GoMock package.
package quic

View file

@ -32,7 +32,7 @@ type connManager struct {
}
// The connMultiplexer listens on multiple net.PacketConns and dispatches
// incoming packets to the session handler.
// incoming packets to the connection handler.
type connMultiplexer struct {
mutex sync.Mutex

View file

@ -64,7 +64,7 @@ type packetHandlerMapEntry struct {
// The packetHandlerMap stores packetHandlers, identified by connection ID.
// It is used:
// * by the server to store sessions
// * by the server to store connections
// * when multiplexing outgoing connections to store clients
type packetHandlerMap struct {
mutex sync.Mutex

View file

@ -73,7 +73,7 @@ type baseServer struct {
receivedPackets chan *receivedPacket
// set as a member, so they can be set in the tests
newSession func(
newConn func(
sendConn,
connRunner,
protocol.ConnectionID, /* original dest connection ID */
@ -209,7 +209,7 @@ func listen(conn net.PacketConn, tlsConf *tls.Config, config *Config, acceptEarl
errorChan: make(chan struct{}),
running: make(chan struct{}),
receivedPackets: make(chan *receivedPacket, protocol.MaxServerUnprocessedPackets),
newSession: newSession,
newConn: newConnection,
logger: utils.DefaultLogger.WithPrefix("server"),
acceptEarlyConns: acceptEarly,
}
@ -268,9 +268,9 @@ func (s *baseServer) accept(ctx context.Context) (quicConn, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case sess := <-s.connQueue:
case conn := <-s.connQueue:
atomic.AddInt32(&s.connQueueLen, -1)
return sess, nil
return conn, nil
case <-s.errorChan:
return nil, s.serverError
}
@ -468,7 +468,7 @@ func (s *baseServer) handleInitialImpl(p *receivedPacket, hdr *wire.Header) erro
connID,
)
}
conn = s.newSession(
conn = s.newConn(
newSendConn(s.conn, p.remoteAddr, p.info),
s.connHandler,
origDestConnID,
@ -501,19 +501,19 @@ func (s *baseServer) handleInitialImpl(p *receivedPacket, hdr *wire.Header) erro
}
func (s *baseServer) handleNewConn(conn quicConn) {
sessCtx := conn.Context()
connCtx := conn.Context()
if s.acceptEarlyConns {
// wait until the early connection is ready (or the handshake fails)
select {
case <-conn.earlyConnReady():
case <-sessCtx.Done():
case <-connCtx.Done():
return
}
} else {
// wait until the handshake is complete (or fails)
select {
case <-conn.HandshakeComplete().Done():
case <-sessCtx.Done():
case <-connCtx.Done():
return
}
}
@ -522,7 +522,7 @@ func (s *baseServer) handleNewConn(conn quicConn) {
select {
case s.connQueue <- conn:
// blocks until the connection is accepted
case <-sessCtx.Done():
case <-connCtx.Done():
atomic.AddInt32(&s.connQueueLen, -1)
// don't pass connections that were already closed to Accept()
}

View file

@ -324,7 +324,7 @@ var _ = Describe("Server", func() {
})
tracer.EXPECT().TracerForConnection(gomock.Any(), protocol.PerspectiveServer, protocol.ConnectionID{0xde, 0xad, 0xc0, 0xde})
conn := NewMockQuicConn(mockCtrl)
serv.newSession = func(
serv.newConn = func(
_ sendConn,
_ connRunner,
origDestConnID protocol.ConnectionID,
@ -603,7 +603,7 @@ var _ = Describe("Server", func() {
tracer.EXPECT().TracerForConnection(gomock.Any(), protocol.PerspectiveServer, protocol.ConnectionID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
conn := NewMockQuicConn(mockCtrl)
serv.newSession = func(
serv.newConn = func(
_ sendConn,
_ connRunner,
origDestConnID protocol.ConnectionID,
@ -663,7 +663,7 @@ var _ = Describe("Server", func() {
serv.config.AcceptToken = func(net.Addr, *Token) bool { return true }
acceptConn := make(chan struct{})
var counter uint32 // to be used as an atomic, so we query it in Eventually
serv.newSession = func(
serv.newConn = func(
_ sendConn,
runner connRunner,
_ protocol.ConnectionID,
@ -717,7 +717,7 @@ var _ = Describe("Server", func() {
serv.config.AcceptToken = func(_ net.Addr, _ *Token) bool { return true }
var createdConn bool
conn := NewMockQuicConn(mockCtrl)
serv.newSession = func(
serv.newConn = func(
_ sendConn,
runner connRunner,
_ protocol.ConnectionID,
@ -748,7 +748,7 @@ var _ = Describe("Server", func() {
It("rejects new connection attempts if the accept queue is full", func() {
serv.config.AcceptToken = func(_ net.Addr, _ *Token) bool { return true }
serv.newSession = func(
serv.newConn = func(
_ sendConn,
runner connRunner,
_ protocol.ConnectionID,
@ -820,7 +820,7 @@ var _ = Describe("Server", func() {
ctx, cancel := context.WithCancel(context.Background())
connCreated := make(chan struct{})
conn := NewMockQuicConn(mockCtrl)
serv.newSession = func(
serv.newConn = func(
_ sendConn,
runner connRunner,
_ protocol.ConnectionID,
@ -932,7 +932,7 @@ var _ = Describe("Server", func() {
ctx, cancel := context.WithCancel(context.Background()) // handshake context
serv.config.AcceptToken = func(_ net.Addr, _ *Token) bool { return true }
serv.newSession = func(
serv.newConn = func(
_ sendConn,
runner connRunner,
_ protocol.ConnectionID,
@ -1006,7 +1006,7 @@ var _ = Describe("Server", func() {
ready := make(chan struct{})
serv.config.AcceptToken = func(_ net.Addr, _ *Token) bool { return true }
serv.newSession = func(
serv.newConn = func(
_ sendConn,
runner connRunner,
_ protocol.ConnectionID,
@ -1049,7 +1049,7 @@ var _ = Describe("Server", func() {
serv.config.AcceptToken = func(_ net.Addr, _ *Token) bool { return true }
senderAddr := &net.UDPAddr{IP: net.IPv4(1, 2, 3, 4), Port: 42}
serv.newSession = func(
serv.newConn = func(
_ sendConn,
runner connRunner,
_ protocol.ConnectionID,
@ -1113,7 +1113,7 @@ var _ = Describe("Server", func() {
ctx, cancel := context.WithCancel(context.Background())
connCreated := make(chan struct{})
conn := NewMockQuicConn(mockCtrl)
serv.newSession = func(
serv.newConn = func(
_ sendConn,
runner connRunner,
_ protocol.ConnectionID,