From 84b54eb7023c7f2734a85e9d9f295c4f8af9b0bd Mon Sep 17 00:00:00 2001 From: Toby Date: Fri, 26 Jan 2024 11:49:19 -0800 Subject: [PATCH 1/2] fix: incorrect reconnect logic that causes blocking when dialing connections --- core/client/reconnect.go | 44 ++++++++++++++++++++++++---------------- 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/core/client/reconnect.go b/core/client/reconnect.go index 137285f..46f72bc 100644 --- a/core/client/reconnect.go +++ b/core/client/reconnect.go @@ -58,52 +58,60 @@ func (rc *reconnectableClientImpl) reconnect() error { func (rc *reconnectableClientImpl) TCP(addr string) (net.Conn, error) { rc.m.Lock() - defer rc.m.Unlock() if rc.closed { + rc.m.Unlock() return nil, coreErrs.ClosedError{} } if rc.client == nil { // No active connection, connect first if err := rc.reconnect(); err != nil { + rc.m.Unlock() return nil, err } } - conn, err := rc.client.TCP(addr) + client := rc.client + rc.m.Unlock() + + conn, err := client.TCP(addr) if _, ok := err.(coreErrs.ClosedError); ok { - // Connection closed, reconnect - if err := rc.reconnect(); err != nil { - return nil, err + // Connection closed, set client to nil for reconnect next time + rc.m.Lock() + // In case the client has already been reconnected by another goroutine + if rc.client == client { + rc.client = nil } - return rc.client.TCP(addr) - } else { - // OK or some other temporary error - return conn, err + rc.m.Unlock() } + return conn, err } func (rc *reconnectableClientImpl) UDP() (HyUDPConn, error) { rc.m.Lock() - defer rc.m.Unlock() if rc.closed { + rc.m.Unlock() return nil, coreErrs.ClosedError{} } if rc.client == nil { // No active connection, connect first if err := rc.reconnect(); err != nil { + rc.m.Unlock() return nil, err } } - conn, err := rc.client.UDP() + client := rc.client + rc.m.Unlock() + + conn, err := client.UDP() if _, ok := err.(coreErrs.ClosedError); ok { - // Connection closed, reconnect - if err := rc.reconnect(); err != nil { - return nil, err + // Connection closed, set client to nil for reconnect next time + rc.m.Lock() + // In case the client has already been reconnected by another goroutine + if rc.client == client { + rc.client = nil } - return rc.client.UDP() - } else { - // OK or some other temporary error - return conn, err + rc.m.Unlock() } + return conn, err } func (rc *reconnectableClientImpl) Close() error { From ae402d9d919df6aec334f1f1490e036c735fed07 Mon Sep 17 00:00:00 2001 From: Toby Date: Fri, 26 Jan 2024 13:19:02 -0800 Subject: [PATCH 2/2] chore: code improvements --- core/client/reconnect.go | 53 ++++++++++++++++++---------------------- 1 file changed, 24 insertions(+), 29 deletions(-) diff --git a/core/client/reconnect.go b/core/client/reconnect.go index 46f72bc..05d60b3 100644 --- a/core/client/reconnect.go +++ b/core/client/reconnect.go @@ -56,7 +56,11 @@ func (rc *reconnectableClientImpl) reconnect() error { } } -func (rc *reconnectableClientImpl) TCP(addr string) (net.Conn, error) { +// clientDo calls f with the current client. +// If the client is nil, it will first reconnect. +// It will also detect if the client is closed, and if so, +// set it to nil for reconnect next time. +func (rc *reconnectableClientImpl) clientDo(f func(Client) (interface{}, error)) (interface{}, error) { rc.m.Lock() if rc.closed { rc.m.Unlock() @@ -72,46 +76,37 @@ func (rc *reconnectableClientImpl) TCP(addr string) (net.Conn, error) { client := rc.client rc.m.Unlock() - conn, err := client.TCP(addr) + ret, err := f(client) if _, ok := err.(coreErrs.ClosedError); ok { // Connection closed, set client to nil for reconnect next time rc.m.Lock() - // In case the client has already been reconnected by another goroutine if rc.client == client { + // This check is in case the client is already changed by another goroutine rc.client = nil } rc.m.Unlock() } - return conn, err + return ret, err +} + +func (rc *reconnectableClientImpl) TCP(addr string) (net.Conn, error) { + if c, err := rc.clientDo(func(client Client) (interface{}, error) { + return client.TCP(addr) + }); err != nil { + return nil, err + } else { + return c.(net.Conn), nil + } } func (rc *reconnectableClientImpl) UDP() (HyUDPConn, error) { - rc.m.Lock() - if rc.closed { - rc.m.Unlock() - return nil, coreErrs.ClosedError{} + if c, err := rc.clientDo(func(client Client) (interface{}, error) { + return client.UDP() + }); err != nil { + return nil, err + } else { + return c.(HyUDPConn), nil } - if rc.client == nil { - // No active connection, connect first - if err := rc.reconnect(); err != nil { - rc.m.Unlock() - return nil, err - } - } - client := rc.client - rc.m.Unlock() - - conn, err := client.UDP() - if _, ok := err.(coreErrs.ClosedError); ok { - // Connection closed, set client to nil for reconnect next time - rc.m.Lock() - // In case the client has already been reconnected by another goroutine - if rc.client == client { - rc.client = nil - } - rc.m.Unlock() - } - return conn, err } func (rc *reconnectableClientImpl) Close() error {