add variable sleep to have a constant execution time on failed login

This commit is contained in:
Peter Bieringer 2025-01-02 22:33:54 +01:00
parent 6f0ac545f0
commit 0d43a49ffb

View file

@ -60,6 +60,8 @@ class BaseAuth:
_lc_username: bool
_uc_username: bool
_strip_domain: bool
_auth_delay: float
_failed_auth_delay: float
_type: str
_cache_logins: bool
_cache_successful: dict # login -> (digest, time_ns)
@ -86,6 +88,9 @@ class BaseAuth:
logger.info("auth.uc_username: %s", self._uc_username)
if self._lc_username is True and self._uc_username is True:
raise RuntimeError("auth.lc_username and auth.uc_username cannot be enabled together")
self._auth_delay = configuration.get("auth", "delay")
logger.info("auth.delay: %f", self._auth_delay)
self._failed_auth_delay = self._auth_delay
# cache_successful_logins
self._cache_logins = configuration.get("auth", "cache_logins")
self._type = configuration.get("auth", "type")
@ -142,8 +147,22 @@ class BaseAuth:
raise NotImplementedError
def _sleep(self, time_ns_begin):
"""Sleep some time to reach a constant execution time finally
Increase final execution time in case initial limit exceeded
"""
time_delta = (time.time_ns() - time_ns_begin) / 1000 / 1000 / 1000
if time_delta > self._failed_auth_delay:
logger.debug("Increase failed auth_delay %.3f -> %.3f seconds", self._failed_auth_delay, time_delta)
with self._lock:
self._failed_auth_delay = time_delta
sleep = self._failed_auth_delay - time_delta
logger.debug("Sleeping %.3f seconds", sleep)
time.sleep(sleep)
@final
def login(self, login: str, password: str) -> Tuple[str, str]:
time_ns_begin = time.time_ns()
result_from_cache = False
if self._lc_username:
login = login.lower()
@ -183,6 +202,7 @@ class BaseAuth:
(time_ns_cache, login_cache) = self._cache_failed[digest]
age_failed = int((time_ns - time_ns_cache) / 1000 / 1000 / 1000)
logger.debug("Login failed cache entry for user+password found: '%s' (age: %d sec)", login_cache, age_failed)
self._sleep(time_ns_begin)
return ("", self._type + " / cached")
if self._cache_successful.get(login):
# login found in cache "successful"
@ -228,8 +248,16 @@ class BaseAuth:
self._lock.release()
logger.debug("Login failed cache for user set: '%s'", login)
if result_from_cache is True:
if result == "":
self._sleep(time_ns_begin)
return (result, self._type + " / cached")
else:
if result == "":
self._sleep(time_ns_begin)
return (result, self._type)
else:
return (self._login(login, password), self._type)
# self._cache_logins is False
result = self._login(login, password)
if result == "":
self._sleep(time_ns_begin)
return (result, self._type)