mirror of
https://github.com/refraction-networking/uquic.git
synced 2025-04-04 20:57:36 +03:00
immediately remove sessions that were closed remotely
We only need to keep sessions that were closed locally, in order to retransmit the CONNECTION_CLOSE packet. For sessions that were closed by the peer, we don't need to wait for any more packets, and there's also no CONNECTION_CLOSE to retransmit. The same applies for sessions that were destroyed when receiving a Version Negotiation or a Retry packet.
This commit is contained in:
parent
1dd0b9a635
commit
65cca7b726
8 changed files with 46 additions and 4 deletions
|
@ -407,6 +407,7 @@ func (c *client) createNewTLSSession(version protocol.VersionNumber) error {
|
||||||
runner := &runner{
|
runner := &runner{
|
||||||
onHandshakeCompleteImpl: func(_ Session) { close(c.handshakeChan) },
|
onHandshakeCompleteImpl: func(_ Session) { close(c.handshakeChan) },
|
||||||
retireConnectionIDImpl: c.packetHandlers.Retire,
|
retireConnectionIDImpl: c.packetHandlers.Retire,
|
||||||
|
removeConnectionIDImpl: c.packetHandlers.Remove,
|
||||||
}
|
}
|
||||||
sess, err := newClientSession(
|
sess, err := newClientSession(
|
||||||
c.conn,
|
c.conn,
|
||||||
|
|
|
@ -54,6 +54,16 @@ func (mr *MockPacketHandlerManagerMockRecorder) CloseServer() *gomock.Call {
|
||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseServer", reflect.TypeOf((*MockPacketHandlerManager)(nil).CloseServer))
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseServer", reflect.TypeOf((*MockPacketHandlerManager)(nil).CloseServer))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Remove mocks base method
|
||||||
|
func (m *MockPacketHandlerManager) Remove(arg0 protocol.ConnectionID) {
|
||||||
|
m.ctrl.Call(m, "Remove", arg0)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove indicates an expected call of Remove
|
||||||
|
func (mr *MockPacketHandlerManagerMockRecorder) Remove(arg0 interface{}) *gomock.Call {
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Remove", reflect.TypeOf((*MockPacketHandlerManager)(nil).Remove), arg0)
|
||||||
|
}
|
||||||
|
|
||||||
// Retire mocks base method
|
// Retire mocks base method
|
||||||
func (m *MockPacketHandlerManager) Retire(arg0 protocol.ConnectionID) {
|
func (m *MockPacketHandlerManager) Retire(arg0 protocol.ConnectionID) {
|
||||||
m.ctrl.Call(m, "Retire", arg0)
|
m.ctrl.Call(m, "Retire", arg0)
|
||||||
|
|
|
@ -44,6 +44,16 @@ func (mr *MockSessionRunnerMockRecorder) onHandshakeComplete(arg0 interface{}) *
|
||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "onHandshakeComplete", reflect.TypeOf((*MockSessionRunner)(nil).onHandshakeComplete), arg0)
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "onHandshakeComplete", reflect.TypeOf((*MockSessionRunner)(nil).onHandshakeComplete), arg0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// removeConnectionID mocks base method
|
||||||
|
func (m *MockSessionRunner) removeConnectionID(arg0 protocol.ConnectionID) {
|
||||||
|
m.ctrl.Call(m, "removeConnectionID", arg0)
|
||||||
|
}
|
||||||
|
|
||||||
|
// removeConnectionID indicates an expected call of removeConnectionID
|
||||||
|
func (mr *MockSessionRunnerMockRecorder) removeConnectionID(arg0 interface{}) *gomock.Call {
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "removeConnectionID", reflect.TypeOf((*MockSessionRunner)(nil).removeConnectionID), arg0)
|
||||||
|
}
|
||||||
|
|
||||||
// retireConnectionID mocks base method
|
// retireConnectionID mocks base method
|
||||||
func (m *MockSessionRunner) retireConnectionID(arg0 protocol.ConnectionID) {
|
func (m *MockSessionRunner) retireConnectionID(arg0 protocol.ConnectionID) {
|
||||||
m.ctrl.Call(m, "retireConnectionID", arg0)
|
m.ctrl.Call(m, "retireConnectionID", arg0)
|
||||||
|
|
|
@ -51,6 +51,12 @@ func (h *packetHandlerMap) Add(id protocol.ConnectionID, handler packetHandler)
|
||||||
h.mutex.Unlock()
|
h.mutex.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *packetHandlerMap) Remove(id protocol.ConnectionID) {
|
||||||
|
h.mutex.Lock()
|
||||||
|
delete(h.handlers, string(id))
|
||||||
|
h.mutex.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
func (h *packetHandlerMap) Retire(id protocol.ConnectionID) {
|
func (h *packetHandlerMap) Retire(id protocol.ConnectionID) {
|
||||||
h.retireByConnectionIDAsString(string(id))
|
h.retireByConnectionIDAsString(string(id))
|
||||||
}
|
}
|
||||||
|
|
|
@ -88,7 +88,15 @@ var _ = Describe("Packet Handler Map", func() {
|
||||||
Expect(err.Error()).To(ContainSubstring("error parsing invariant header:"))
|
Expect(err.Error()).To(ContainSubstring("error parsing invariant header:"))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("deletes closed session entries after a wait time", func() {
|
It("deletes removed session immediately", func() {
|
||||||
|
handler.deleteRetiredSessionsAfter = time.Hour
|
||||||
|
connID := protocol.ConnectionID{1, 2, 3, 4, 5, 6, 7, 8}
|
||||||
|
handler.Add(connID, NewMockPacketHandler(mockCtrl))
|
||||||
|
handler.Remove(connID)
|
||||||
|
Expect(handler.handlePacket(nil, getPacket(connID))).To(MatchError("received a packet with an unexpected connection ID 0x0102030405060708"))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("deletes retired session entries after a wait time", func() {
|
||||||
handler.deleteRetiredSessionsAfter = 10 * time.Millisecond
|
handler.deleteRetiredSessionsAfter = 10 * time.Millisecond
|
||||||
connID := protocol.ConnectionID{1, 2, 3, 4, 5, 6, 7, 8}
|
connID := protocol.ConnectionID{1, 2, 3, 4, 5, 6, 7, 8}
|
||||||
handler.Add(connID, NewMockPacketHandler(mockCtrl))
|
handler.Add(connID, NewMockPacketHandler(mockCtrl))
|
||||||
|
|
|
@ -33,6 +33,7 @@ type unknownPacketHandler interface {
|
||||||
type packetHandlerManager interface {
|
type packetHandlerManager interface {
|
||||||
Add(protocol.ConnectionID, packetHandler)
|
Add(protocol.ConnectionID, packetHandler)
|
||||||
Retire(protocol.ConnectionID)
|
Retire(protocol.ConnectionID)
|
||||||
|
Remove(protocol.ConnectionID)
|
||||||
SetServer(unknownPacketHandler)
|
SetServer(unknownPacketHandler)
|
||||||
CloseServer()
|
CloseServer()
|
||||||
}
|
}
|
||||||
|
@ -49,15 +50,18 @@ type quicSession interface {
|
||||||
type sessionRunner interface {
|
type sessionRunner interface {
|
||||||
onHandshakeComplete(Session)
|
onHandshakeComplete(Session)
|
||||||
retireConnectionID(protocol.ConnectionID)
|
retireConnectionID(protocol.ConnectionID)
|
||||||
|
removeConnectionID(protocol.ConnectionID)
|
||||||
}
|
}
|
||||||
|
|
||||||
type runner struct {
|
type runner struct {
|
||||||
onHandshakeCompleteImpl func(Session)
|
onHandshakeCompleteImpl func(Session)
|
||||||
retireConnectionIDImpl func(protocol.ConnectionID)
|
retireConnectionIDImpl func(protocol.ConnectionID)
|
||||||
|
removeConnectionIDImpl func(protocol.ConnectionID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *runner) onHandshakeComplete(s Session) { r.onHandshakeCompleteImpl(s) }
|
func (r *runner) onHandshakeComplete(s Session) { r.onHandshakeCompleteImpl(s) }
|
||||||
func (r *runner) retireConnectionID(c protocol.ConnectionID) { r.retireConnectionIDImpl(c) }
|
func (r *runner) retireConnectionID(c protocol.ConnectionID) { r.retireConnectionIDImpl(c) }
|
||||||
|
func (r *runner) removeConnectionID(c protocol.ConnectionID) { r.removeConnectionIDImpl(c) }
|
||||||
|
|
||||||
var _ sessionRunner = &runner{}
|
var _ sessionRunner = &runner{}
|
||||||
|
|
||||||
|
@ -153,6 +157,7 @@ func (s *server) setup() error {
|
||||||
s.sessionRunner = &runner{
|
s.sessionRunner = &runner{
|
||||||
onHandshakeCompleteImpl: func(sess Session) { s.sessionQueue <- sess },
|
onHandshakeCompleteImpl: func(sess Session) { s.sessionQueue <- sess },
|
||||||
retireConnectionIDImpl: s.sessionHandler.Retire,
|
retireConnectionIDImpl: s.sessionHandler.Retire,
|
||||||
|
removeConnectionIDImpl: s.sessionHandler.Remove,
|
||||||
}
|
}
|
||||||
cookieGenerator, err := handshake.NewCookieGenerator()
|
cookieGenerator, err := handshake.NewCookieGenerator()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -424,7 +424,6 @@ runLoop:
|
||||||
}
|
}
|
||||||
s.closed.Set(true)
|
s.closed.Set(true)
|
||||||
s.logger.Infof("Connection %s closed.", s.srcConnID)
|
s.logger.Infof("Connection %s closed.", s.srcConnID)
|
||||||
s.sessionRunner.retireConnectionID(s.srcConnID)
|
|
||||||
s.cryptoStreamHandler.Close()
|
s.cryptoStreamHandler.Close()
|
||||||
return closeErr.err
|
return closeErr.err
|
||||||
}
|
}
|
||||||
|
@ -718,6 +717,7 @@ func (s *session) handleAckFrame(frame *wire.AckFrame, encLevel protocol.Encrypt
|
||||||
// closeLocal closes the session and send a CONNECTION_CLOSE containing the error
|
// closeLocal closes the session and send a CONNECTION_CLOSE containing the error
|
||||||
func (s *session) closeLocal(e error) {
|
func (s *session) closeLocal(e error) {
|
||||||
s.closeOnce.Do(func() {
|
s.closeOnce.Do(func() {
|
||||||
|
s.sessionRunner.retireConnectionID(s.srcConnID)
|
||||||
s.closeChan <- closeError{err: e, sendClose: true, remote: false}
|
s.closeChan <- closeError{err: e, sendClose: true, remote: false}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -725,12 +725,14 @@ func (s *session) closeLocal(e error) {
|
||||||
// destroy closes the session without sending the error on the wire
|
// destroy closes the session without sending the error on the wire
|
||||||
func (s *session) destroy(e error) {
|
func (s *session) destroy(e error) {
|
||||||
s.closeOnce.Do(func() {
|
s.closeOnce.Do(func() {
|
||||||
|
s.sessionRunner.removeConnectionID(s.srcConnID)
|
||||||
s.closeChan <- closeError{err: e, sendClose: false, remote: false}
|
s.closeChan <- closeError{err: e, sendClose: false, remote: false}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *session) closeRemote(e error) {
|
func (s *session) closeRemote(e error) {
|
||||||
s.closeOnce.Do(func() {
|
s.closeOnce.Do(func() {
|
||||||
|
s.sessionRunner.removeConnectionID(s.srcConnID)
|
||||||
s.closeChan <- closeError{err: e, remote: true}
|
s.closeChan <- closeError{err: e, remote: true}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -326,7 +326,7 @@ var _ = Describe("Session", func() {
|
||||||
It("handles CONNECTION_CLOSE frames", func() {
|
It("handles CONNECTION_CLOSE frames", func() {
|
||||||
testErr := qerr.Error(qerr.ProofInvalid, "foobar")
|
testErr := qerr.Error(qerr.ProofInvalid, "foobar")
|
||||||
streamManager.EXPECT().CloseWithError(testErr)
|
streamManager.EXPECT().CloseWithError(testErr)
|
||||||
sessionRunner.EXPECT().retireConnectionID(gomock.Any())
|
sessionRunner.EXPECT().removeConnectionID(gomock.Any())
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -402,7 +402,7 @@ var _ = Describe("Session", func() {
|
||||||
|
|
||||||
It("closes the session in order to replace it with another QUIC version", func() {
|
It("closes the session in order to replace it with another QUIC version", func() {
|
||||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||||
sessionRunner.EXPECT().retireConnectionID(gomock.Any())
|
sessionRunner.EXPECT().removeConnectionID(gomock.Any())
|
||||||
cryptoSetup.EXPECT().Close()
|
cryptoSetup.EXPECT().Close()
|
||||||
sess.destroy(errCloseSessionForNewVersion)
|
sess.destroy(errCloseSessionForNewVersion)
|
||||||
Eventually(areSessionsRunning).Should(BeFalse())
|
Eventually(areSessionsRunning).Should(BeFalse())
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue