From 8b8d7729a24ae72783ce7f61f1e9cdb7d22ca83c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dipl=2E=20Ing=2E=20P=C3=A9ter=20Varkoly?= Date: Mon, 26 Aug 2024 14:16:40 +0200 Subject: [PATCH] Now ldap auth can use ldap and ldap3 also. --- radicale/auth/ldap.py | 72 ++++++++++++++++++++++++++++++++++++++----- 1 file changed, 64 insertions(+), 8 deletions(-) diff --git a/radicale/auth/ldap.py b/radicale/auth/ldap.py index a55ceb65..79feccd6 100644 --- a/radicale/auth/ldap.py +++ b/radicale/auth/ldap.py @@ -24,7 +24,6 @@ Following parameters are needed in the configuration ldap_load_groups If the groups of the authenticated users need to be loaded """ -import ldap from radicale import auth, config from radicale.log import logger @@ -35,9 +34,18 @@ class Auth(auth.BaseAuth): _ldap_secret: str _ldap_filter: str _ldap_load_groups: bool + _ldap_version: 3 def __init__(self, configuration: config.Configuration) -> None: super().__init__(configuration) + try: + import ldap3 + except ImportError as e: + try: + import ldap + self._ldap_version = 2 + except ImportError as e: + raise RuntimeError("LDAP authentication requires the ldap3 module") from e self._ldap_uri = configuration.get("auth", "ldap_uri") self._ldap_base = configuration.get("auth", "ldap_base") self._ldap_reader_dn = configuration.get("auth", "ldap_reader_dn") @@ -45,13 +53,7 @@ class Auth(auth.BaseAuth): self._ldap_secret = configuration.get("auth", "ldap_secret") self._ldap_filter = configuration.get("auth", "ldap_filter") - def login(self, login: str, password: str) -> str: - """Validate credentials. - In first step we make a connection to the ldap server with the ldap_reader_dn credential. - In next step the DN of the user to authenticate will be searched. - In the last step the authentication of the user will be proceeded. - - """ + def _login2(self, login: str, password: str) -> str: try: """Bind as reader dn""" conn = ldap.initialize(self._ldap_uri) @@ -87,3 +89,57 @@ class Auth(auth.BaseAuth): return login except ldap.INVALID_CREDENTIALS: return "" + + def _login3(self, login: str, password: str) -> str: + """Connect the server""" + try: + server = ldap3.Server(self._ldap_uri) + conn = ldap3.Connection(server, self._ldap_reader_dn, password=self._ldap_secret) + except self.ldap3.core.exceptions.LDAPSocketOpenError: + raise RuntimeError("Unable to reach ldap server") + except Exception: + pass + + if not conn.bind(): + raise RuntimeError("Unable to read from ldap server") + + """Search the user dn""" + conn.search( + search_base = self._ldap_base, + search_filter = self._ldap_filter.format(login) + search_scope = 'SUBTREE', + attributes = ['memberOf'] + ) + if len(conn.entries) == 0: + """User could not be find""" + return "" + + user_entry = conn.entries[0].entry_to_json() + conn.unbind() + user_dn = user_entry['dn'] + try: + """Try to bind as the user itself""" + conn = ldap3.Connection(server, user_dn, password=password) + if not conn.bind(): + return "" + if self._ldap_load_groups: + tmp = [] + for g in user_entry['attributes']['memberOf']: + tmp.append(g) + self._ldap_groups = set(tmp) + conn.unbind() + return login + except Exception: + pass + return "" + + def login(self, login: str, password: str) -> str: + """Validate credentials. + In first step we make a connection to the ldap server with the ldap_reader_dn credential. + In next step the DN of the user to authenticate will be searched. + In the last step the authentication of the user will be proceeded. + """ + if self._ldap_version == 2: + return _login2(self, login, password) + return _login3(self, login, password) +