Merge pull request #1666 from marschap/LDAPauth-patches

LDAP auth patches - thank you!
This commit is contained in:
Peter Bieringer 2025-01-02 21:11:32 +00:00 committed by GitHub
commit 234be74b87
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 91 additions and 36 deletions

View file

@ -901,6 +901,12 @@ The search filter to find the user DN to authenticate by the username. User '{0}
Default: `(cn={0})` Default: `(cn={0})`
#### ldap_user_attribute
The LDAP attribute whose value shall be used as the user name after successful authentication
Default: not set, i.e. the login name given is used directly.
##### ldap_load_groups ##### ldap_load_groups
Load the ldap groups of the authenticated user. These groups can be used later on to define rights. This also gives you access to the group calendars, if they exist. Load the ldap groups of the authenticated user. These groups can be used later on to define rights. This also gives you access to the group calendars, if they exist.

3
config
View file

@ -83,6 +83,9 @@
# The filter to find the DN of the user. This filter must contain a python-style placeholder for the login # The filter to find the DN of the user. This filter must contain a python-style placeholder for the login
#ldap_filter = (&(objectClass=person)(uid={0})) #ldap_filter = (&(objectClass=person)(uid={0}))
# the attribute holding the value to be used as username after authentication
#ldap_user_attribute = cn
# Use ssl on the ldap connection # Use ssl on the ldap connection
#ldap_use_ssl = False #ldap_use_ssl = False

View file

@ -15,14 +15,15 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with Radicale. If not, see <http://www.gnu.org/licenses/>. # along with Radicale. If not, see <http://www.gnu.org/licenses/>.
""" """
Authentication backend that checks credentials with a ldap server. Authentication backend that checks credentials with a LDAP server.
Following parameters are needed in the configuration: Following parameters are needed in the configuration:
ldap_uri The ldap url to the server like ldap://localhost ldap_uri The LDAP URL to the server like ldap://localhost
ldap_base The baseDN of the ldap server ldap_base The baseDN of the LDAP server
ldap_reader_dn The DN of a ldap user with read access to get the user accounts ldap_reader_dn The DN of a LDAP user with read access to get the user accounts
ldap_secret The password of the ldap_reader_dn ldap_secret The password of the ldap_reader_dn
ldap_secret_file The path of the file containing the password of the ldap_reader_dn ldap_secret_file The path of the file containing the password of the ldap_reader_dn
ldap_filter The search filter to find the user to authenticate by the username ldap_filter The search filter to find the user to authenticate by the username
ldap_user_attribute The attribute to be used as username after authentication
ldap_load_groups If the groups of the authenticated users need to be loaded ldap_load_groups If the groups of the authenticated users need to be loaded
Following parameters controls SSL connections: Following parameters controls SSL connections:
ldap_use_ssl If the connection ldap_use_ssl If the connection
@ -42,8 +43,9 @@ class Auth(auth.BaseAuth):
_ldap_reader_dn: str _ldap_reader_dn: str
_ldap_secret: str _ldap_secret: str
_ldap_filter: str _ldap_filter: str
_ldap_user_attr: str
_ldap_load_groups: bool _ldap_load_groups: bool
_ldap_version: int = 3 _ldap_module_version: int = 3
_ldap_use_ssl: bool = False _ldap_use_ssl: bool = False
_ldap_ssl_verify_mode: int = ssl.CERT_REQUIRED _ldap_ssl_verify_mode: int = ssl.CERT_REQUIRED
_ldap_ssl_ca_file: str = "" _ldap_ssl_ca_file: str = ""
@ -56,7 +58,7 @@ class Auth(auth.BaseAuth):
except ImportError: except ImportError:
try: try:
import ldap import ldap
self._ldap_version = 2 self._ldap_module_version = 2
self.ldap = ldap self.ldap = ldap
except ImportError as e: except ImportError as e:
raise RuntimeError("LDAP authentication requires the ldap3 module") from e raise RuntimeError("LDAP authentication requires the ldap3 module") from e
@ -66,11 +68,12 @@ class Auth(auth.BaseAuth):
self._ldap_load_groups = configuration.get("auth", "ldap_load_groups") self._ldap_load_groups = configuration.get("auth", "ldap_load_groups")
self._ldap_secret = configuration.get("auth", "ldap_secret") self._ldap_secret = configuration.get("auth", "ldap_secret")
self._ldap_filter = configuration.get("auth", "ldap_filter") self._ldap_filter = configuration.get("auth", "ldap_filter")
self._ldap_user_attr = configuration.get("auth", "ldap_user_attribute")
ldap_secret_file_path = configuration.get("auth", "ldap_secret_file") ldap_secret_file_path = configuration.get("auth", "ldap_secret_file")
if ldap_secret_file_path: if ldap_secret_file_path:
with open(ldap_secret_file_path, 'r') as file: with open(ldap_secret_file_path, 'r') as file:
self._ldap_secret = file.read().rstrip('\n') self._ldap_secret = file.read().rstrip('\n')
if self._ldap_version == 3: if self._ldap_module_version == 3:
self._ldap_use_ssl = configuration.get("auth", "ldap_use_ssl") self._ldap_use_ssl = configuration.get("auth", "ldap_use_ssl")
if self._ldap_use_ssl: if self._ldap_use_ssl:
self._ldap_ssl_ca_file = configuration.get("auth", "ldap_ssl_ca_file") self._ldap_ssl_ca_file = configuration.get("auth", "ldap_ssl_ca_file")
@ -84,6 +87,10 @@ class Auth(auth.BaseAuth):
logger.info("auth.ldap_reader_dn : %r" % self._ldap_reader_dn) logger.info("auth.ldap_reader_dn : %r" % self._ldap_reader_dn)
logger.info("auth.ldap_load_groups : %s" % self._ldap_load_groups) logger.info("auth.ldap_load_groups : %s" % self._ldap_load_groups)
logger.info("auth.ldap_filter : %r" % self._ldap_filter) logger.info("auth.ldap_filter : %r" % self._ldap_filter)
if self._ldap_user_attr:
logger.info("auth.ldap_user_attribute : %r" % self._ldap_user_attr)
else:
logger.info("auth.ldap_user_attribute : (not provided)")
if ldap_secret_file_path: if ldap_secret_file_path:
logger.info("auth.ldap_secret_file_path: %r" % ldap_secret_file_path) logger.info("auth.ldap_secret_file_path: %r" % ldap_secret_file_path)
if self._ldap_secret: if self._ldap_secret:
@ -94,7 +101,7 @@ class Auth(auth.BaseAuth):
logger.info("auth.ldap_secret : (from config)") logger.info("auth.ldap_secret : (from config)")
if self._ldap_reader_dn and not self._ldap_secret: if self._ldap_reader_dn and not self._ldap_secret:
logger.error("auth.ldap_secret : (not provided)") logger.error("auth.ldap_secret : (not provided)")
raise RuntimeError("LDAP authentication requires ldap_secret for reader_dn") raise RuntimeError("LDAP authentication requires ldap_secret for ldap_reader_dn")
logger.info("auth.ldap_use_ssl : %s" % self._ldap_use_ssl) logger.info("auth.ldap_use_ssl : %s" % self._ldap_use_ssl)
if self._ldap_use_ssl is True: if self._ldap_use_ssl is True:
logger.info("auth.ldap_ssl_verify_mode : %s" % self._ldap_ssl_verify_mode) logger.info("auth.ldap_ssl_verify_mode : %s" % self._ldap_ssl_verify_mode)
@ -112,16 +119,29 @@ class Auth(auth.BaseAuth):
conn.set_option(self.ldap.OPT_REFERRALS, 0) conn.set_option(self.ldap.OPT_REFERRALS, 0)
conn.simple_bind_s(self._ldap_reader_dn, self._ldap_secret) conn.simple_bind_s(self._ldap_reader_dn, self._ldap_secret)
"""Search for the dn of user to authenticate""" """Search for the dn of user to authenticate"""
res = conn.search_s(self._ldap_base, self.ldap.SCOPE_SUBTREE, filterstr=self._ldap_filter.format(login), attrlist=['memberOf']) escaped_login = self.ldap.filter.escape_filter_chars(login)
if len(res) == 0: logger.debug(f"_login2 login escaped for LDAP filters: {escaped_login}")
"""User could not be find""" attrs = ['memberof']
if self._ldap_user_attr:
attrs = ['memberOf', self._ldap_user_attr]
logger.debug(f"_login2 attrs: {attrs}")
res = conn.search_s(
self._ldap_base,
self.ldap.SCOPE_SUBTREE,
filterstr=self._ldap_filter.format(escaped_login),
attrlist=attrs
)
if len(res) != 1:
"""User could not be found unambiguously"""
logger.debug(f"_login2 no unique DN found for '{login}'")
return "" return ""
user_dn = res[0][0] user_entry = res[0]
logger.debug("LDAP Auth user: %s", user_dn) user_dn = user_entry[0]
"""Close ldap connection""" logger.debug(f"_login2 found LDAP user DN {user_dn}")
"""Close LDAP connection"""
conn.unbind() conn.unbind()
except Exception as e: except Exception as e:
raise RuntimeError(f"Invalid ldap configuration:{e}") raise RuntimeError(f"Invalid LDAP configuration:{e}")
try: try:
"""Bind as user to authenticate""" """Bind as user to authenticate"""
@ -132,11 +152,19 @@ class Auth(auth.BaseAuth):
tmp: list[str] = [] tmp: list[str] = []
if self._ldap_load_groups: if self._ldap_load_groups:
tmp = [] tmp = []
for t in res[0][1]['memberOf']: for g in user_entry[1]['memberOf']:
tmp.append(t.decode('utf-8').split(',')[0][3:]) """Get group g's RDN's attribute value"""
g = g.decode('utf-8').split(',')[0]
tmp.append(g.partition('=')[2])
self._ldap_groups = set(tmp) self._ldap_groups = set(tmp)
logger.debug("LDAP Auth groups of user: %s", ",".join(self._ldap_groups)) logger.debug("_login2 LDAP groups of user: %s", ",".join(self._ldap_groups))
if self._ldap_user_attr:
if user_entry[1][self._ldap_user_attr]:
tmplogin = user_entry[1][self._ldap_user_attr][0]
login = tmplogin.decode('utf-8')
logger.debug(f"_login2 user set to: '{login}'")
conn.unbind() conn.unbind()
logger.debug(f"_login2 {login} successfully authenticated")
return login return login
except self.ldap.INVALID_CREDENTIALS: except self.ldap.INVALID_CREDENTIALS:
return "" return ""
@ -157,45 +185,59 @@ class Auth(auth.BaseAuth):
server = self.ldap3.Server(self._ldap_uri) server = self.ldap3.Server(self._ldap_uri)
conn = self.ldap3.Connection(server, self._ldap_reader_dn, password=self._ldap_secret) conn = self.ldap3.Connection(server, self._ldap_reader_dn, password=self._ldap_secret)
except self.ldap3.core.exceptions.LDAPSocketOpenError: except self.ldap3.core.exceptions.LDAPSocketOpenError:
raise RuntimeError("Unable to reach ldap server") raise RuntimeError("Unable to reach LDAP server")
except Exception as e: except Exception as e:
logger.debug(f"_login3 error 1 {e}") logger.debug(f"_login3 error 1 {e}")
pass pass
if not conn.bind(): if not conn.bind():
logger.debug("_login3 can not bind") logger.debug("_login3 cannot bind")
raise RuntimeError("Unable to read from ldap server") raise RuntimeError("Unable to read from LDAP server")
logger.debug(f"_login3 bind as {self._ldap_reader_dn}") logger.debug(f"_login3 bind as {self._ldap_reader_dn}")
"""Search the user dn""" """Search the user dn"""
escaped_login = self.ldap3.utils.conv.escape_filter_chars(login)
logger.debug(f"_login3 login escaped for LDAP filters: {escaped_login}")
attrs = ['memberof']
if self._ldap_user_attr:
attrs = ['memberOf', self._ldap_user_attr]
logger.debug(f"_login3 attrs: {attrs}")
conn.search( conn.search(
search_base=self._ldap_base, search_base=self._ldap_base,
search_filter=self._ldap_filter.format(login), search_filter=self._ldap_filter.format(escaped_login),
search_scope=self.ldap3.SUBTREE, search_scope=self.ldap3.SUBTREE,
attributes=['memberOf'] attributes=attrs
) )
if len(conn.entries) == 0: if len(conn.entries) != 1:
logger.debug(f"_login3 user '{login}' can not be find") """User could not be found unambiguously"""
"""User could not be find""" logger.debug(f"_login3 no unique DN found for '{login}'")
return "" return ""
user_entry = conn.response[0] user_entry = conn.response[0]
conn.unbind() conn.unbind()
user_dn = user_entry['dn'] user_dn = user_entry['dn']
logger.debug(f"_login3 found user_dn {user_dn}") logger.debug(f"_login3 found LDAP user DN {user_dn}")
try: try:
"""Try to bind as the user itself""" """Try to bind as the user itself"""
conn = self.ldap3.Connection(server, user_dn, password=password) conn = self.ldap3.Connection(server, user_dn, password=password)
if not conn.bind(): if not conn.bind():
logger.debug(f"_login3 user '{login}' can not be find") logger.debug(f"_login3 user '{login}' cannot be found")
return "" return ""
tmp: list[str] = []
if self._ldap_load_groups: if self._ldap_load_groups:
tmp = [] tmp = []
for g in user_entry['attributes']['memberOf']: for g in user_entry['attributes']['memberOf']:
tmp.append(g.split(',')[0][3:]) """Get group g's RDN's attribute value"""
g = g.split(',')[0]
tmp.append(g.partition('=')[2])
self._ldap_groups = set(tmp) self._ldap_groups = set(tmp)
logger.debug("_login3 LDAP groups of user: %s", ",".join(self._ldap_groups))
if self._ldap_user_attr:
if user_entry['attributes'][self._ldap_user_attr]:
login = user_entry['attributes'][self._ldap_user_attr][0]
logger.debug(f"_login3 user set to: '{login}'")
conn.unbind() conn.unbind()
logger.debug(f"_login3 {login} successfully authorized") logger.debug(f"_login3 {login} successfully authenticated")
return login return login
except Exception as e: except Exception as e:
logger.debug(f"_login3 error 2 {e}") logger.debug(f"_login3 error 2 {e}")
@ -204,10 +246,10 @@ class Auth(auth.BaseAuth):
def _login(self, login: str, password: str) -> str: def _login(self, login: str, password: str) -> str:
"""Validate credentials. """Validate credentials.
In first step we make a connection to the ldap server with the ldap_reader_dn credential. In first step we make a connection to the LDAP server with the ldap_reader_dn credential.
In next step the DN of the user to authenticate will be searched. In next step the DN of the user to authenticate will be searched.
In the last step the authentication of the user will be proceeded. In the last step the authentication of the user will be proceeded.
""" """
if self._ldap_version == 2: if self._ldap_module_version == 2:
return self._login2(login, password) return self._login2(login, password)
return self._login3(login, password) return self._login3(login, password)

View file

@ -227,6 +227,10 @@ DEFAULT_CONFIG_SCHEMA: types.CONFIG_SCHEMA = OrderedDict([
"value": "(cn={0})", "value": "(cn={0})",
"help": "the search filter to find the user DN to authenticate by the username", "help": "the search filter to find the user DN to authenticate by the username",
"type": str}), "type": str}),
("ldap_user_attribute", {
"value": "",
"help": "the attribute to be used as username after authentication",
"type": str}),
("ldap_load_groups", { ("ldap_load_groups", {
"value": "False", "value": "False",
"help": "load the ldap groups of the authenticated user", "help": "load the ldap groups of the authenticated user",