diff --git a/radicale/auth/__init__.py b/radicale/auth/__init__.py index 812649c5..9cb70bb8 100644 --- a/radicale/auth/__init__.py +++ b/radicale/auth/__init__.py @@ -29,6 +29,8 @@ Take a look at the class ``BaseAuth`` if you want to implement your own. """ +import hashlib +import time from typing import Sequence, Set, Tuple, Union, final from radicale import config, types, utils @@ -57,6 +59,10 @@ class BaseAuth: _lc_username: bool _uc_username: bool _strip_domain: bool + _cache: dict + _cache_logins: bool + _cache_logins_expiry: int + _cache_logins_expiry_ns: int def __init__(self, configuration: "config.Configuration") -> None: """Initialize BaseAuth. @@ -70,11 +76,27 @@ class BaseAuth: self._lc_username = configuration.get("auth", "lc_username") self._uc_username = configuration.get("auth", "uc_username") self._strip_domain = configuration.get("auth", "strip_domain") + self._cache_logins = configuration.get("auth", "cache_logins") + self._cache_logins_expiry = configuration.get("auth", "cache_logins_expiry") + if self._cache_logins_expiry < 0: + raise RuntimeError("self._cache_logins_expiry cannot be < 0") logger.info("auth.strip_domain: %s", self._strip_domain) logger.info("auth.lc_username: %s", self._lc_username) logger.info("auth.uc_username: %s", self._uc_username) if self._lc_username is True and self._uc_username is True: raise RuntimeError("auth.lc_username and auth.uc_username cannot be enabled together") + logger.info("auth.cache_logins: %s", self._cache_logins) + if self._cache_logins is True: + logger.info("auth.cache_logins_expiry: %s seconds", self._cache_logins_expiry) + self._cache_logins_expiry_ns = self._cache_logins_expiry * 1000 * 1000 * 1000 + self._cache = dict() + + def _cache_digest(self, login: str, password: str, salt: str) -> str: + h = hashlib.sha3_512() + h.update(salt.encode()) + h.update(login.encode()) + h.update(password.encode()) + return h.digest() def get_external_login(self, environ: types.WSGIEnviron) -> Union[ Tuple[()], Tuple[str, str]]: @@ -110,4 +132,36 @@ class BaseAuth: login = login.upper() if self._strip_domain: login = login.split('@')[0] - return self._login(login, password) + if self._cache_logins is True: + # time_ns is also used as salt + result = "" + digest = "" + time_ns = time.time_ns() + if self._cache.get(login): + # entry found in cache + (digest_cache, time_ns_cache) = self._cache[login] + digest = self._cache_digest(login, password, str(time_ns_cache)) + if digest == digest_cache: + if (time_ns - time_ns_cache) > self._cache_logins_expiry_ns: + logger.debug("Login cache entry for user found but expired: '%s'", login) + digest = "" + else: + logger.debug("Login cache entry for user found: '%s'", login) + result = login + else: + logger.debug("Login cache entry for user not matching: '%s'", login) + else: + # entry not found in cache, caculate always to avoid timing attacks + digest = self._cache_digest(login, password, str(time_ns)) + if result == "": + result = self._login(login, password) + if result is not "": + if digest is "": + # successful login, but expired, digest must be recalculated + digest = self._cache_digest(login, password, str(time_ns)) + # store successful login in cache + self._cache[login] = (digest, time_ns) + logger.debug("Login cache for user set: '%s'", login) + return result + else: + return self._login(login, password)