Merge pull request #1731 from pbiering/add-remote-auth-warn-if-not-loopback

Add remote auth warn if not loopback
This commit is contained in:
Peter Bieringer 2025-03-14 21:55:42 +01:00 committed by GitHub
commit dc56d67c33
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 60 additions and 27 deletions

View file

@ -17,6 +17,7 @@
* Improve: log client IP on SSL error and SSL protocol+cipher if successful * Improve: log client IP on SSL error and SSL protocol+cipher if successful
* Improve: catch htpasswd hash verification errors * Improve: catch htpasswd hash verification errors
* Improve: add support for more bcrypt algos on autodetection, extend logging for autodetection fallback to PLAIN in case of hash length is not matching * Improve: add support for more bcrypt algos on autodetection, extend logging for autodetection fallback to PLAIN in case of hash length is not matching
* Add: warning in case of started standalone and not listen on loopback interface but trusting external authentication
## 3.4.1 ## 3.4.1
* Add: option [auth] dovecot_connection_type / dovecot_host / dovecot_port * Add: option [auth] dovecot_connection_type / dovecot_host / dovecot_port

View file

@ -506,7 +506,9 @@ RequestHeader set X-Remote-User expr=%{REMOTE_USER}
``` ```
> **Security:** Untrusted clients should not be able to access the Radicale > **Security:** Untrusted clients should not be able to access the Radicale
> server directly. Otherwise, they can authenticate as any user. > server directly. Otherwise, they can authenticate as any user by simply
> setting related HTTP header. This can be prevented by restrict listen to
> loopback interface only or at least a local firewall rule.
#### Secure connection between Radicale and the reverse proxy #### Secure connection between Radicale and the reverse proxy

View file

@ -3,4 +3,8 @@ Radicale WSGI file (mod_wsgi and uWSGI compliant).
""" """
import os
from radicale import application from radicale import application
# set an environment variable
os.environ.setdefault('SERVER_GATEWAY_INTERFACE', 'Web')

View file

@ -30,9 +30,10 @@ Take a look at the class ``BaseAuth`` if you want to implement your own.
""" """
import hashlib import hashlib
import os
import threading import threading
import time import time
from typing import Sequence, Set, Tuple, Union, final from typing import List, Sequence, Set, Tuple, Union, final
from radicale import config, types, utils from radicale import config, types, utils
from radicale.log import logger from radicale.log import logger
@ -55,15 +56,36 @@ CACHE_LOGIN_TYPES: Sequence[str] = (
"pam", "pam",
) )
INSECURE_IF_NO_LOOPBACK_TYPES: Sequence[str] = (
"remote_user",
"http_x_remote_user",
)
AUTH_SOCKET_FAMILY: Sequence[str] = ("AF_UNIX", "AF_INET", "AF_INET6") AUTH_SOCKET_FAMILY: Sequence[str] = ("AF_UNIX", "AF_INET", "AF_INET6")
def load(configuration: "config.Configuration") -> "BaseAuth": def load(configuration: "config.Configuration") -> "BaseAuth":
"""Load the authentication module chosen in configuration.""" """Load the authentication module chosen in configuration."""
if configuration.get("auth", "type") == "none": _type = configuration.get("auth", "type")
logger.warning("No user authentication is selected: '[auth] type=none' (insecure)") if _type == "none":
if configuration.get("auth", "type") == "denyall": logger.warning("No user authentication is selected: '[auth] type=none' (INSECURE)")
logger.warning("All access is blocked by: '[auth] type=denyall'") elif _type == "denyall":
logger.warning("All user authentication is blocked by: '[auth] type=denyall'")
elif _type in INSECURE_IF_NO_LOOPBACK_TYPES:
sgi = os.environ.get('SERVER_GATEWAY_INTERFACE') or None
if not sgi:
hosts: List[Tuple[str, int]] = configuration.get("server", "hosts")
localhost_only = True
address_lo = []
address = []
for address_port in hosts:
if address_port[0] in ["localhost", "localhost6", "127.0.0.1", "::1"]:
address_lo.append(utils.format_address(address_port))
else:
address.append(utils.format_address(address_port))
localhost_only = False
if localhost_only is False:
logger.warning("User authentication '[auth] type=%s' is selected but server is not only listen on loopback address (potentially INSECURE): %s", _type, " ".join(address))
return utils.load_plugin(INTERNAL_TYPES, "auth", "Auth", BaseAuth, return utils.load_plugin(INTERNAL_TYPES, "auth", "Auth", BaseAuth,
configuration) configuration)

View file

@ -2,7 +2,7 @@
# Copyright © 2008 Nicolas Kandel # Copyright © 2008 Nicolas Kandel
# Copyright © 2008 Pascal Halter # Copyright © 2008 Pascal Halter
# Copyright © 2008-2017 Guillaume Ayoub # Copyright © 2008-2017 Guillaume Ayoub
# Copyright © 2017-2018 Unrud <unrud@outlook.com> # Copyright © 2017-2021 Unrud <unrud@outlook.com>
# #
# This library is free software: you can redistribute it and/or modify # This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by # it under the terms of the GNU General Public License as published by

View file

@ -2,7 +2,7 @@
# Copyright © 2008 Nicolas Kandel # Copyright © 2008 Nicolas Kandel
# Copyright © 2008 Pascal Halter # Copyright © 2008 Pascal Halter
# Copyright © 2008-2017 Guillaume Ayoub # Copyright © 2008-2017 Guillaume Ayoub
# Copyright © 2017-2018 Unrud <unrud@outlook.com> # Copyright © 2017-2021 Unrud <unrud@outlook.com>
# #
# This library is free software: you can redistribute it and/or modify # This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by # it under the terms of the GNU General Public License as published by

View file

@ -58,19 +58,7 @@ elif sys.platform == "win32":
# IPv4 (host, port) and IPv6 (host, port, flowinfo, scopeid) # IPv4 (host, port) and IPv6 (host, port, flowinfo, scopeid)
ADDRESS_TYPE = Union[Tuple[Union[str, bytes, bytearray], int], ADDRESS_TYPE = utils.ADDRESS_TYPE
Tuple[str, int, int, int]]
def format_address(address: ADDRESS_TYPE) -> str:
host, port, *_ = address
if not isinstance(host, str):
raise NotImplementedError("Unsupported address format: %r" %
(address,))
if host.find(":") == -1:
return "%s:%d" % (host, port)
else:
return "[%s]:%d" % (host, port)
class ParallelHTTPServer(socketserver.ThreadingMixIn, class ParallelHTTPServer(socketserver.ThreadingMixIn,
@ -321,20 +309,20 @@ def serve(configuration: config.Configuration,
try: try:
getaddrinfo = socket.getaddrinfo(address_port[0], address_port[1], 0, socket.SOCK_STREAM, socket.IPPROTO_TCP) getaddrinfo = socket.getaddrinfo(address_port[0], address_port[1], 0, socket.SOCK_STREAM, socket.IPPROTO_TCP)
except OSError as e: except OSError as e:
logger.warning("cannot retrieve IPv4 or IPv6 address of '%s': %s" % (format_address(address_port), e)) logger.warning("cannot retrieve IPv4 or IPv6 address of '%s': %s" % (utils.format_address(address_port), e))
continue continue
logger.debug("getaddrinfo of '%s': %s" % (format_address(address_port), getaddrinfo)) logger.debug("getaddrinfo of '%s': %s" % (utils.format_address(address_port), getaddrinfo))
for (address_family, socket_kind, socket_proto, socket_flags, socket_address) in getaddrinfo: for (address_family, socket_kind, socket_proto, socket_flags, socket_address) in getaddrinfo:
logger.debug("try to create server socket on '%s'" % (format_address(socket_address))) logger.debug("try to create server socket on '%s'" % (utils.format_address(socket_address)))
try: try:
server = server_class(configuration, address_family, (socket_address[0], socket_address[1]), RequestHandler) server = server_class(configuration, address_family, (socket_address[0], socket_address[1]), RequestHandler)
except OSError as e: except OSError as e:
logger.warning("cannot create server socket on '%s': %s" % (format_address(socket_address), e)) logger.warning("cannot create server socket on '%s': %s" % (utils.format_address(socket_address), e))
continue continue
servers[server.socket] = server servers[server.socket] = server
server.set_app(application) server.set_app(application)
logger.info("Listening on %r%s", logger.info("Listening on %r%s",
format_address(server.server_address), utils.format_address(server.server_address),
" with SSL" if use_ssl else "") " with SSL" if use_ssl else "")
if not servers: if not servers:
raise RuntimeError("No servers started") raise RuntimeError("No servers started")

View file

@ -20,7 +20,7 @@
import ssl import ssl
import sys import sys
from importlib import import_module, metadata from importlib import import_module, metadata
from typing import Callable, Sequence, Type, TypeVar, Union from typing import Callable, Sequence, Tuple, Type, TypeVar, Union
from radicale import config from radicale import config
from radicale.log import logger from radicale.log import logger
@ -36,6 +36,11 @@ RADICALE_MODULES: Sequence[str] = ("radicale", "vobject", "passlib", "defusedxml
"pam") "pam")
# IPv4 (host, port) and IPv6 (host, port, flowinfo, scopeid)
ADDRESS_TYPE = Union[Tuple[Union[str, bytes, bytearray], int],
Tuple[str, int, int, int]]
def load_plugin(internal_types: Sequence[str], module_name: str, def load_plugin(internal_types: Sequence[str], module_name: str,
class_name: str, base_class: Type[_T_co], class_name: str, base_class: Type[_T_co],
configuration: "config.Configuration") -> _T_co: configuration: "config.Configuration") -> _T_co:
@ -74,6 +79,17 @@ def packages_version():
return " ".join(versions) return " ".join(versions)
def format_address(address: ADDRESS_TYPE) -> str:
host, port, *_ = address
if not isinstance(host, str):
raise NotImplementedError("Unsupported address format: %r" %
(address,))
if host.find(":") == -1:
return "%s:%d" % (host, port)
else:
return "[%s]:%d" % (host, port)
def ssl_context_options_by_protocol(protocol: str, ssl_context_options): def ssl_context_options_by_protocol(protocol: str, ssl_context_options):
logger.debug("SSL protocol string: '%s' and current SSL context options: '0x%x'", protocol, ssl_context_options) logger.debug("SSL protocol string: '%s' and current SSL context options: '0x%x'", protocol, ssl_context_options)
# disable any protocol by default # disable any protocol by default