implement cache_logins* option

This commit is contained in:
Peter Bieringer 2024-12-30 08:17:15 +01:00
parent 74311560c9
commit 8e97b709bf

View file

@ -29,6 +29,8 @@ Take a look at the class ``BaseAuth`` if you want to implement your own.
""" """
import hashlib
import time
from typing import Sequence, Set, Tuple, Union, final from typing import Sequence, Set, Tuple, Union, final
from radicale import config, types, utils from radicale import config, types, utils
@ -57,6 +59,10 @@ class BaseAuth:
_lc_username: bool _lc_username: bool
_uc_username: bool _uc_username: bool
_strip_domain: bool _strip_domain: bool
_cache: dict
_cache_logins: bool
_cache_logins_expiry: int
_cache_logins_expiry_ns: int
def __init__(self, configuration: "config.Configuration") -> None: def __init__(self, configuration: "config.Configuration") -> None:
"""Initialize BaseAuth. """Initialize BaseAuth.
@ -70,11 +76,27 @@ class BaseAuth:
self._lc_username = configuration.get("auth", "lc_username") self._lc_username = configuration.get("auth", "lc_username")
self._uc_username = configuration.get("auth", "uc_username") self._uc_username = configuration.get("auth", "uc_username")
self._strip_domain = configuration.get("auth", "strip_domain") self._strip_domain = configuration.get("auth", "strip_domain")
self._cache_logins = configuration.get("auth", "cache_logins")
self._cache_logins_expiry = configuration.get("auth", "cache_logins_expiry")
if self._cache_logins_expiry < 0:
raise RuntimeError("self._cache_logins_expiry cannot be < 0")
logger.info("auth.strip_domain: %s", self._strip_domain) logger.info("auth.strip_domain: %s", self._strip_domain)
logger.info("auth.lc_username: %s", self._lc_username) logger.info("auth.lc_username: %s", self._lc_username)
logger.info("auth.uc_username: %s", self._uc_username) logger.info("auth.uc_username: %s", self._uc_username)
if self._lc_username is True and self._uc_username is True: if self._lc_username is True and self._uc_username is True:
raise RuntimeError("auth.lc_username and auth.uc_username cannot be enabled together") raise RuntimeError("auth.lc_username and auth.uc_username cannot be enabled together")
logger.info("auth.cache_logins: %s", self._cache_logins)
if self._cache_logins is True:
logger.info("auth.cache_logins_expiry: %s seconds", self._cache_logins_expiry)
self._cache_logins_expiry_ns = self._cache_logins_expiry * 1000 * 1000 * 1000
self._cache = dict()
def _cache_digest(self, login: str, password: str, salt: str) -> str:
h = hashlib.sha3_512()
h.update(salt.encode())
h.update(login.encode())
h.update(password.encode())
return h.digest()
def get_external_login(self, environ: types.WSGIEnviron) -> Union[ def get_external_login(self, environ: types.WSGIEnviron) -> Union[
Tuple[()], Tuple[str, str]]: Tuple[()], Tuple[str, str]]:
@ -110,4 +132,36 @@ class BaseAuth:
login = login.upper() login = login.upper()
if self._strip_domain: if self._strip_domain:
login = login.split('@')[0] login = login.split('@')[0]
if self._cache_logins is True:
# time_ns is also used as salt
result = ""
digest = ""
time_ns = time.time_ns()
if self._cache.get(login):
# entry found in cache
(digest_cache, time_ns_cache) = self._cache[login]
digest = self._cache_digest(login, password, str(time_ns_cache))
if digest == digest_cache:
if (time_ns - time_ns_cache) > self._cache_logins_expiry_ns:
logger.debug("Login cache entry for user found but expired: '%s'", login)
digest = ""
else:
logger.debug("Login cache entry for user found: '%s'", login)
result = login
else:
logger.debug("Login cache entry for user not matching: '%s'", login)
else:
# entry not found in cache, caculate always to avoid timing attacks
digest = self._cache_digest(login, password, str(time_ns))
if result == "":
result = self._login(login, password)
if result is not "":
if digest is "":
# successful login, but expired, digest must be recalculated
digest = self._cache_digest(login, password, str(time_ns))
# store successful login in cache
self._cache[login] = (digest, time_ns)
logger.debug("Login cache for user set: '%s'", login)
return result
else:
return self._login(login, password) return self._login(login, password)