Merge pull request #1218 from petervarkoly/master

Initial version of ldap authentication plugin.
This commit is contained in:
Peter Bieringer 2024-09-13 18:13:15 +02:00 committed by GitHub
commit da844f48e6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 283 additions and 5 deletions

View file

@ -744,6 +744,9 @@ Available backends:
authentication. This can be used to provide the username from a reverse
proxy.
`ldap`
: Use a LDAP or AD server to authenticate users.
Default: `none`
##### htpasswd_filename
@ -799,6 +802,42 @@ Message displayed in the client when a password is needed.
Default: `Radicale - Password Required`
##### ldap_uri
The URI to the ldap server
Default: `ldap://localhost`
##### ldap_base
LDAP base DN of the ldap server. This parameter must be provided if auth type is ldap.
Default:
##### ldap_reader_dn
The DN of a ldap user with read access to get the user accounts. This parameter must be provided if auth type is ldap.
Default:
##### ldap_secret
The password of the ldap_reader_dn. This parameter must be provided if auth type is ldap.
Default:
##### ldap_filter
The search filter to find the user DN to authenticate by the username. User '{0}' as placeholder for the user name.
Default: `(cn={0})`
##### ldap_load_groups
Load the ldap groups of the authenticated user. These groups can be used later on to define rights.
Default: False
##### lc_username
Сonvert username to lowercase, must be true for case-insensitive auth

18
config
View file

@ -53,6 +53,24 @@
[auth]
# Authentication method
# Value: none | htpasswd | remote_user | http_x_remote_user | ldap
#type = none
# URI to the LDAP server
#ldap_uri = ldap://localhost
# The base DN of the LDAP server
#ldap_base = ##BASE_DN##
# The reader DN of the LDAP server
#ldap_reader_dn = CN=ldapreader,CN=Users,##BASE_DN##
# Password of the reader DN
#ldap_secret = ldapreader-secret
# If the ldap groups of the user need to be loaded
#ldap_load_groups = True
# Value: none | htpasswd | remote_user | http_x_remote_user | denyall
#type = none

View file

@ -250,6 +250,12 @@ class Application(ApplicationPartDelete, ApplicationPartHead,
authorization.encode("ascii"))).split(":", 1)
user = self._auth.login(login, password) or "" if login else ""
if self.configuration.get("auth", "type") == "ldap":
try:
logger.debug("Groups %r", ",".join(self._auth._ldap_groups))
self._rights._user_groups = self._auth._ldap_groups
except AttributeError:
pass
if user and login == user:
logger.info("Successful login: %r", user)
elif user:

View file

@ -29,14 +29,15 @@ Take a look at the class ``BaseAuth`` if you want to implement your own.
"""
from typing import Sequence, Tuple, Union
from typing import Sequence, Set, Tuple, Union
from radicale import config, types, utils
from radicale.log import logger
INTERNAL_TYPES: Sequence[str] = ("none", "remote_user", "http_x_remote_user",
"denyall",
"htpasswd")
"htpasswd",
"ldap")
def load(configuration: "config.Configuration") -> "BaseAuth":
@ -51,6 +52,7 @@ def load(configuration: "config.Configuration") -> "BaseAuth":
class BaseAuth:
_ldap_groups: Set[str] = set([])
_lc_username: bool
_strip_domain: bool

147
radicale/auth/ldap.py Normal file
View file

@ -0,0 +1,147 @@
# This file is part of Radicale - CalDAV and CardDAV server
# Copyright 2022 Peter Varkoly
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Radicale. If not, see <http://www.gnu.org/licenses/>.
"""
Authentication backend that checks credentials with a ldap server.
Following parameters are needed in the configuration
ldap_uri The ldap url to the server like ldap://localhost
ldap_base The baseDN of the ldap server
ldap_reader_dn The DN of a ldap user with read access to get the user accounts
ldap_secret The password of the ldap_reader_dn
ldap_filter The search filter to find the user to authenticate by the username
ldap_load_groups If the groups of the authenticated users need to be loaded
"""
from radicale import auth, config
from radicale.log import logger
class Auth(auth.BaseAuth):
_ldap_uri: str
_ldap_base: str
_ldap_reader_dn: str
_ldap_secret: str
_ldap_filter: str
_ldap_load_groups: bool
_ldap_version: int = 3
def __init__(self, configuration: config.Configuration) -> None:
super().__init__(configuration)
try:
import ldap3
self.ldap3 = ldap3
except ImportError:
try:
import ldap
self._ldap_version = 2
self.ldap = ldap
except ImportError as e:
raise RuntimeError("LDAP authentication requires the ldap3 module") from e
self._ldap_uri = configuration.get("auth", "ldap_uri")
self._ldap_base = configuration.get("auth", "ldap_base")
self._ldap_reader_dn = configuration.get("auth", "ldap_reader_dn")
self._ldap_load_groups = configuration.get("auth", "ldap_load_groups")
self._ldap_secret = configuration.get("auth", "ldap_secret")
self._ldap_filter = configuration.get("auth", "ldap_filter")
def _login2(self, login: str, password: str) -> str:
try:
"""Bind as reader dn"""
conn = self.ldap.initialize(self._ldap_uri)
conn.protocol_version = 3
conn.set_option(self.ldap.OPT_REFERRALS, 0)
conn.simple_bind_s(self._ldap_reader_dn, self._ldap_secret)
"""Search for the dn of user to authenticate"""
res = conn.search_s(self._ldap_base, self.ldap.SCOPE_SUBTREE, filterstr=self._ldap_filter.format(login), attrlist=['memberOf'])
if len(res) == 0:
"""User could not be find"""
return ""
user_dn = res[0][0]
logger.debug("LDAP Auth user: %s", user_dn)
"""Close ldap connection"""
conn.unbind()
except Exception:
raise RuntimeError("Invalide ldap configuration")
try:
"""Bind as user to authenticate"""
conn = self.ldap.initialize(self._ldap_uri)
conn.protocol_version = 3
conn.set_option(self.ldap.OPT_REFERRALS, 0)
conn.simple_bind_s(user_dn, password)
tmp: list[str] = []
if self._ldap_load_groups:
tmp = []
for t in res[0][1]['memberOf']:
tmp.append(t.decode('utf-8').split(',')[0][3:])
self._ldap_groups = set(tmp)
logger.debug("LDAP Auth groups of user: %s", ",".join(self._ldap_groups))
conn.unbind()
return login
except self.ldap.INVALID_CREDENTIALS:
return ""
def _login3(self, login: str, password: str) -> str:
"""Connect the server"""
try:
server = self.ldap3.Server(self._ldap_uri)
conn = self.ldap3.Connection(server, self._ldap_reader_dn, password=self._ldap_secret)
except self.ldap3.core.exceptions.LDAPSocketOpenError:
raise RuntimeError("Unable to reach ldap server")
except Exception:
pass
if not conn.bind():
raise RuntimeError("Unable to read from ldap server")
"""Search the user dn"""
conn.search(
search_base=self._ldap_base,
search_filter=self._ldap_filter.format(login),
search_scope='SUBTREE',
attributes=['memberOf']
)
if len(conn.entries) == 0:
"""User could not be find"""
return ""
user_entry = conn.entries[0].entry_to_json()
conn.unbind()
user_dn = user_entry['dn']
try:
"""Try to bind as the user itself"""
conn = self.ldap3.Connection(server, user_dn, password=password)
if not conn.bind():
return ""
if self._ldap_load_groups:
tmp = []
for g in user_entry['attributes']['memberOf']:
tmp.append(g)
self._ldap_groups = set(tmp)
conn.unbind()
return login
except Exception:
pass
return ""
def login(self, login: str, password: str) -> str:
"""Validate credentials.
In first step we make a connection to the ldap server with the ldap_reader_dn credential.
In next step the DN of the user to authenticate will be searched.
In the last step the authentication of the user will be proceeded.
"""
if self._ldap_version == 2:
return self._login2(login, password)
return self._login3(login, password)

View file

@ -191,6 +191,30 @@ DEFAULT_CONFIG_SCHEMA: types.CONFIG_SCHEMA = OrderedDict([
"value": "1",
"help": "incorrect authentication delay",
"type": positive_float}),
("ldap_uri", {
"value": "ldap://localhost",
"help": "URI to the ldap server",
"type": str}),
("ldap_base", {
"value": "none",
"help": "LDAP base DN of the ldap server",
"type": str}),
("ldap_reader_dn", {
"value": "none",
"help": "the DN of a ldap user with read access to get the user accounts",
"type": str}),
("ldap_secret", {
"value": "none",
"help": "the password of the ldap_reader_dn",
"type": str}),
("ldap_filter", {
"value": "(cn={0})",
"help": "the search filter to find the user DN to authenticate by the username",
"type": str}),
("ldap_load_groups", {
"value": "False",
"help": "load the ldap groups of the authenticated user",
"type": bool}),
("strip_domain", {
"value": "False",
"help": "strip domain from username",

View file

@ -32,7 +32,7 @@ Take a look at the class ``BaseRights`` if you want to implement your own.
"""
from typing import Sequence
from typing import Sequence, Set
from radicale import config, utils
@ -57,6 +57,8 @@ def intersect(a: str, b: str) -> str:
class BaseRights:
_user_groups: Set[str] = set([])
def __init__(self, configuration: "config.Configuration") -> None:
"""Initialize BaseRights.

View file

@ -65,24 +65,40 @@ class Rights(rights.BaseRights):
if not self._log_rights_rule_doesnt_match_on_debug:
logger.debug("logging of rules which doesn't match suppressed by config/option [logging] rights_rule_doesnt_match_on_debug")
for section in rights_config.sections():
group_match = False
try:
user_pattern = rights_config.get(section, "user")
collection_pattern = rights_config.get(section, "collection")
allowed_groups = rights_config.get(section, "groups", fallback="").split(",")
try:
group_match = len(self._user_groups.intersection(allowed_groups)) > 0
except Exception:
pass
# Use empty format() for harmonized handling of curly braces
user_match = re.fullmatch(user_pattern.format(), user)
collection_match = user_match and re.fullmatch(
user_collection_match = user_match and re.fullmatch(
collection_pattern.format(
*(re.escape(s) for s in user_match.groups()),
user=escaped_user), sane_path)
group_collection_match = re.fullmatch(collection_pattern.format(user=escaped_user), sane_path)
except Exception as e:
raise RuntimeError("Error in section %r of rights file %r: "
"%s" % (section, self._filename, e)) from e
if user_match and collection_match:
if user_match and user_collection_match:
permission = rights_config.get(section, "permissions")
logger.debug("Rule %r:%r matches %r:%r from section %r permission %r",
user, sane_path, user_pattern,
collection_pattern, section, permission)
return permission
if group_match and group_collection_match:
permission = rights_config.get(section, "permissions")
logger.debug("Rule %r:%r matches %r:%r from section %r permission %r by group membership",
user, sane_path, user_pattern,
collection_pattern, section, permission)
return permission
logger.debug("Rule %r:%r doesn't match %r:%r from section %r",
user, sane_path, user_pattern, collection_pattern,
section)
if self._log_rights_rule_doesnt_match_on_debug:
logger.debug("Rule %r:%r doesn't match %r:%r from section %r",
user, sane_path, user_pattern, collection_pattern,

24
rights
View file

@ -1,5 +1,29 @@
# -*- mode: conf -*-
# vim:ft=cfg
# Allow all rights for the Administrator
#[root]
#user: Administrator
#collection: .*
#permissions: RW
# Allow reading principal collection (same as username)
#[principal]
#user: .+
#collection: {user}
#permissions: R
# Allow reading and writing private collection (same as username)
#[private]
#user: .+
#collection: {user}/private/
#permissions: RW
# Allow reading calendars and address books that are direct
# children of the principal collection for other users
#[calendarsReader]
#user: .+
#collection: {user}/[^/]+
#permissions: r
# Rights management file for Radicale - A simple calendar server
#