improve code/adjustments

This commit is contained in:
Peter Bieringer 2025-01-03 07:14:32 +01:00
parent 5a00baab3f
commit a9f2e6fe7b

View file

@ -90,7 +90,7 @@ class BaseAuth:
raise RuntimeError("auth.lc_username and auth.uc_username cannot be enabled together")
self._auth_delay = configuration.get("auth", "delay")
logger.info("auth.delay: %f", self._auth_delay)
self._failed_auth_delay = self._auth_delay
self._failed_auth_delay = 0
# cache_successful_logins
self._cache_logins = configuration.get("auth", "cache_logins")
self._type = configuration.get("auth", "type")
@ -147,18 +147,33 @@ class BaseAuth:
raise NotImplementedError
def _sleep(self, time_ns_begin):
"""Sleep some time to reach a constant execution time finally
def _sleep_for_constant_exec_time(self, time_ns_begin):
"""Sleep some time to reach a constant execution time for failed logins
Independent of time required by external backend or used digest methods
Increase final execution time in case initial limit exceeded
See also issue 591
"""
time_delta = (time.time_ns() - time_ns_begin) / 1000 / 1000 / 1000
if time_delta > self._failed_auth_delay:
logger.debug("Increase failed auth_delay %.3f -> %.3f seconds", self._failed_auth_delay, time_delta)
with self._lock:
self._failed_auth_delay = time_delta
sleep = self._failed_auth_delay - time_delta
logger.debug("Sleeping %.3f seconds", sleep)
time.sleep(sleep)
with self._lock:
# avoid that another thread is changing global value at the same time
failed_auth_delay = self._failed_auth_delay
failed_auth_delay_old = failed_auth_delay
if time_delta > failed_auth_delay:
# set new
failed_auth_delay = time_delta
# store globally
self._failed_auth_delay = failed_auth_delay
if (failed_auth_delay_old != failed_auth_delay):
logger.debug("Failed login constant execution time need increase of failed_auth_delay: %.9f -> %.9f sec", failed_auth_delay_old, failed_auth_delay)
# sleep == 0
else:
sleep = failed_auth_delay - time_delta
logger.debug("Failed login constant exection time alignment, sleeping: %.9f sec", sleep)
time.sleep(sleep)
@final
def login(self, login: str, password: str) -> Tuple[str, str]:
@ -202,7 +217,7 @@ class BaseAuth:
(time_ns_cache, login_cache) = self._cache_failed[digest]
age_failed = int((time_ns - time_ns_cache) / 1000 / 1000 / 1000)
logger.debug("Login failed cache entry for user+password found: '%s' (age: %d sec)", login_cache, age_failed)
self._sleep(time_ns_begin)
self._sleep_for_constant_exec_time(time_ns_begin)
return ("", self._type + " / cached")
if self._cache_successful.get(login):
# login found in cache "successful"
@ -249,15 +264,15 @@ class BaseAuth:
logger.debug("Login failed cache for user set: '%s'", login)
if result_from_cache is True:
if result == "":
self._sleep(time_ns_begin)
self._sleep_for_constant_exec_time(time_ns_begin)
return (result, self._type + " / cached")
else:
if result == "":
self._sleep(time_ns_begin)
self._sleep_for_constant_exec_time(time_ns_begin)
return (result, self._type)
else:
# self._cache_logins is False
result = self._login(login, password)
if result == "":
self._sleep(time_ns_begin)
self._sleep_for_constant_exec_time(time_ns_begin)
return (result, self._type)