Merge pull request #1610 from sysnux/master

Rebase galaxy4public patch on top of bf4f5834
This commit is contained in:
Peter Bieringer 2024-10-31 21:08:12 +01:00 committed by GitHub
commit 56c375fca2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 306 additions and 1 deletions

View file

@ -746,6 +746,9 @@ Available backends:
`ldap` `ldap`
: Use a LDAP or AD server to authenticate users. : Use a LDAP or AD server to authenticate users.
`dovecot`
: Use a local Dovecot server to authenticate users.
Default: `none` Default: `none`
##### htpasswd_filename ##### htpasswd_filename
@ -858,6 +861,12 @@ The path to the CA file in pem format which is used to certificate the server ce
Default: Default:
##### dovecot_socket
The path to the Dovecot client authentication socket (eg. /run/dovecot/auth-client on Fedora). Radicale must have read / write access to the socket.
Default:
##### lc_username ##### lc_username
Сonvert username to lowercase, must be true for case-insensitive auth Сonvert username to lowercase, must be true for case-insensitive auth

View file

@ -37,7 +37,8 @@ from radicale.log import logger
INTERNAL_TYPES: Sequence[str] = ("none", "remote_user", "http_x_remote_user", INTERNAL_TYPES: Sequence[str] = ("none", "remote_user", "http_x_remote_user",
"denyall", "denyall",
"htpasswd", "htpasswd",
"ldap") "ldap",
"dovecot")
def load(configuration: "config.Configuration") -> "BaseAuth": def load(configuration: "config.Configuration") -> "BaseAuth":

178
radicale/auth/dovecot.py Normal file
View file

@ -0,0 +1,178 @@
# This file is part of Radicale Server - Calendar Server
# Copyright © 2014 Giel van Schijndel
# Copyright © 2019 (GalaxyMaster)
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Radicale. If not, see <http://www.gnu.org/licenses/>.
import base64
import itertools
import os
import socket
from contextlib import closing
from radicale import auth
from radicale.log import logger
class Auth(auth.BaseAuth):
def __init__(self, configuration):
super().__init__(configuration)
self.socket = configuration.get("auth", "dovecot_socket")
self.timeout = 5
self.request_id_gen = itertools.count(1)
def login(self, login, password):
"""Validate credentials.
Check if the ``login``/``password`` pair is valid according to Dovecot.
This implementation communicates with a Dovecot server through the
Dovecot Authentication Protocol v1.1.
https://dovecot.org/doc/auth-protocol.txt
"""
logger.info("Authentication request (dovecot): '{}'".format(login))
if not login or not password:
return ""
with closing(socket.socket(
socket.AF_UNIX,
socket.SOCK_STREAM)
) as sock:
try:
sock.settimeout(self.timeout)
sock.connect(self.socket)
buf = bytes()
supported_mechs = []
done = False
seen_part = [0, 0, 0]
# Upon the initial connection we only care about the
# handshake, which is usually just around 100 bytes long,
# e.g.
#
# VERSION 1 2
# MECH PLAIN plaintext
# SPID 22901
# CUID 1
# COOKIE 2dbe4116a30fb4b8a8719f4448420af7
# DONE
#
# Hence, we try to read just once with a buffer big
# enough to hold all of it.
buf = sock.recv(1024)
while b'\n' in buf and not done:
line, buf = buf.split(b'\n', 1)
parts = line.split(b'\t')
first, parts = parts[0], parts[1:]
if first == b'VERSION':
if seen_part[0]:
logger.warning(
"Server presented multiple VERSION "
"tokens, ignoring"
)
continue
version = parts
logger.debug("Dovecot server version: '{}'".format(
(b'.'.join(version)).decode()
))
if int(version[0]) != 1:
logger.fatal(
"Only Dovecot 1.x versions are supported!"
)
return ""
seen_part[0] += 1
elif first == b'MECH':
supported_mechs.append(parts[0])
seen_part[1] += 1
elif first == b'DONE':
seen_part[2] += 1
if not (seen_part[0] and seen_part[1]):
logger.fatal(
"An unexpected end of the server "
"handshake received!"
)
return ""
done = True
if not done:
logger.fatal("Encountered a broken server handshake!")
return ""
logger.debug(
"Supported auth methods: '{}'"
.format((b"', '".join(supported_mechs)).decode())
)
if b'PLAIN' not in supported_mechs:
logger.info(
"Authentication method 'PLAIN' is not supported, "
"but is required!"
)
return ""
# Handshake
logger.debug("Sending auth handshake")
sock.send(b'VERSION\t1\t1\n')
sock.send(b'CPID\t%u\n' % os.getpid())
request_id = next(self.request_id_gen)
logger.debug(
"Authenticating with request id: '{}'"
.format(request_id)
)
sock.send(
b'AUTH\t%u\tPLAIN\tservice=radicale\tresp=%b\n' %
(
request_id, base64.b64encode(
b'\0%b\0%b' %
(login.encode(), password.encode())
)
)
)
logger.debug("Processing auth response")
buf = sock.recv(1024)
line = buf.split(b'\n', 1)[0]
parts = line.split(b'\t')[:2]
resp, reply_id, params = (
parts[0], int(parts[1]),
dict(part.split('=', 1) for part in parts[2:])
)
logger.debug(
"Auth response: result='{}', id='{}', parameters={}"
.format(resp.decode(), reply_id, params)
)
if request_id != reply_id:
logger.fatal(
"Unexpected reply ID {} received (expected {})"
.format(
reply_id, request_id
)
)
return ""
if resp == b'OK':
return login
except socket.error as e:
logger.fatal(
"Failed to communicate with Dovecot socket %r: %s" %
(self.socket, e)
)
return ""

View file

@ -183,6 +183,10 @@ DEFAULT_CONFIG_SCHEMA: types.CONFIG_SCHEMA = OrderedDict([
"value": "autodetect", "value": "autodetect",
"help": "htpasswd encryption method", "help": "htpasswd encryption method",
"type": str}), "type": str}),
("dovecot_socket", {
"value": "/var/run/dovecot/auth-client",
"help": "dovecot auth socket",
"type": str}),
("realm", { ("realm", {
"value": "Radicale - Password Required", "value": "Radicale - Password Required",
"help": "message displayed when a password is needed", "help": "message displayed when a password is needed",

View file

@ -22,6 +22,7 @@ Radicale tests with simple requests and authentication.
""" """
import base64
import os import os
import sys import sys
from typing import Iterable, Tuple, Union from typing import Iterable, Tuple, Union
@ -159,6 +160,118 @@ class TestBaseAuthRequests(BaseTest):
href_element = prop.find(xmlutils.make_clark("D:href")) href_element = prop.find(xmlutils.make_clark("D:href"))
assert href_element is not None and href_element.text == "/test/" assert href_element is not None and href_element.text == "/test/"
@pytest.mark.skipif(sys.platform == 'win32', reason="Not supported on Windows")
def _test_dovecot(
self, user, password, expected_status,
response=b'FAIL\n1\n', mech=[b'PLAIN'], broken=None):
import socket
from unittest.mock import DEFAULT, patch
self.configure({"auth": {"type": "dovecot",
"dovecot_socket": "./dovecot.sock"}})
if broken is None:
broken = []
handshake = b''
if "version" not in broken:
handshake += b'VERSION\t'
if "incompatible" in broken:
handshake += b'2'
else:
handshake += b'1'
handshake += b'\t2\n'
if "mech" not in broken:
handshake += b'MECH\t%b\n' % b' '.join(mech)
if "duplicate" in broken:
handshake += b'VERSION\t1\t2\n'
if "done" not in broken:
handshake += b'DONE\n'
with patch.multiple(
'socket.socket',
connect=DEFAULT,
send=DEFAULT,
recv=DEFAULT
) as mock_socket:
if "socket" in broken:
mock_socket["connect"].side_effect = socket.error(
"Testing error with the socket"
)
mock_socket["recv"].side_effect = [handshake, response]
status, _, answer = self.request(
"PROPFIND", "/",
HTTP_AUTHORIZATION="Basic %s" % base64.b64encode(
("%s:%s" % (user, password)).encode()).decode())
assert status == expected_status
@pytest.mark.skipif(sys.platform == 'win32', reason="Not supported on Windows")
def test_dovecot_no_user(self):
self._test_dovecot("", "", 401)
@pytest.mark.skipif(sys.platform == 'win32', reason="Not supported on Windows")
def test_dovecot_no_password(self):
self._test_dovecot("user", "", 401)
@pytest.mark.skipif(sys.platform == 'win32', reason="Not supported on Windows")
def test_dovecot_broken_handshake_no_version(self):
self._test_dovecot("user", "password", 401, broken=["version"])
@pytest.mark.skipif(sys.platform == 'win32', reason="Not supported on Windows")
def test_dovecot_broken_handshake_incompatible(self):
self._test_dovecot("user", "password", 401, broken=["incompatible"])
@pytest.mark.skipif(sys.platform == 'win32', reason="Not supported on Windows")
def test_dovecot_broken_handshake_duplicate(self):
self._test_dovecot(
"user", "password", 207, response=b'OK\t1',
broken=["duplicate"]
)
@pytest.mark.skipif(sys.platform == 'win32', reason="Not supported on Windows")
def test_dovecot_broken_handshake_no_mech(self):
self._test_dovecot("user", "password", 401, broken=["mech"])
@pytest.mark.skipif(sys.platform == 'win32', reason="Not supported on Windows")
def test_dovecot_broken_handshake_unsupported_mech(self):
self._test_dovecot("user", "password", 401, mech=[b'ONE', b'TWO'])
@pytest.mark.skipif(sys.platform == 'win32', reason="Not supported on Windows")
def test_dovecot_broken_handshake_no_done(self):
self._test_dovecot("user", "password", 401, broken=["done"])
@pytest.mark.skipif(sys.platform == 'win32', reason="Not supported on Windows")
def test_dovecot_broken_socket(self):
self._test_dovecot("user", "password", 401, broken=["socket"])
@pytest.mark.skipif(sys.platform == 'win32', reason="Not supported on Windows")
def test_dovecot_auth_good1(self):
self._test_dovecot("user", "password", 207, response=b'OK\t1')
@pytest.mark.skipif(sys.platform == 'win32', reason="Not supported on Windows")
def test_dovecot_auth_good2(self):
self._test_dovecot(
"user", "password", 207, response=b'OK\t1',
mech=[b'PLAIN\nEXTRA\tTERM']
)
self._test_dovecot("user", "password", 207, response=b'OK\t1')
@pytest.mark.skipif(sys.platform == 'win32', reason="Not supported on Windows")
def test_dovecot_auth_bad1(self):
self._test_dovecot("user", "password", 401, response=b'FAIL\t1')
@pytest.mark.skipif(sys.platform == 'win32', reason="Not supported on Windows")
def test_dovecot_auth_bad2(self):
self._test_dovecot("user", "password", 401, response=b'CONT\t1')
@pytest.mark.skipif(sys.platform == 'win32', reason="Not supported on Windows")
def test_dovecot_auth_id_mismatch(self):
self._test_dovecot("user", "password", 401, response=b'OK\t2')
def test_custom(self) -> None: def test_custom(self) -> None:
"""Custom authentication.""" """Custom authentication."""
self.configure({"auth": {"type": "radicale.tests.custom.auth"}}) self.configure({"auth": {"type": "radicale.tests.custom.auth"}})