From 8bd37ec0e346378ba7d89a6b7ae332d45afe888b Mon Sep 17 00:00:00 2001 From: Sven Augustin Date: Thu, 14 May 2026 18:41:18 +0200 Subject: [PATCH] first try on GSSAPI-based LDAP access via Kerberos, PSI LDAP specifics and a fake alternative for testing --- auth/fakeldap.py | 23 +++++++++++++++++ auth/krbldap.py | 26 ++++++++++++++++++++ auth/psildap.py | 64 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 113 insertions(+) create mode 100644 auth/fakeldap.py create mode 100644 auth/krbldap.py create mode 100644 auth/psildap.py diff --git a/auth/fakeldap.py b/auth/fakeldap.py new file mode 100644 index 0000000..6bf5c6f --- /dev/null +++ b/auth/fakeldap.py @@ -0,0 +1,23 @@ +PASSWORDS = { + "a": "a", + "b": "b", + "c": "c" +} + +PGROUPS = { + "a": {"p11111", "p22222"}, + "b": {"p33333"} +} + + +def get_data(username, password): + if PASSWORDS.get(username) != password: + raise RuntimeError("authentication failed") + + return { + "username": username, + "pgroups": PGROUPS.get(username, set()) + } + + + diff --git a/auth/krbldap.py b/auth/krbldap.py new file mode 100644 index 0000000..2310694 --- /dev/null +++ b/auth/krbldap.py @@ -0,0 +1,26 @@ +import gssapi +from ldap3 import GSSAPI, NONE, ROUND_ROBIN, SASL, Connection, Server, ServerPool + + +def Konnection(hosts, username, password): + server_pool = make_server_pool(hosts) + creds = make_creds(username, password) + return Connection( + server_pool, + authentication=SASL, + sasl_mechanism=GSSAPI, + sasl_credentials=(None, None, creds) + ) + + +def make_server_pool(hosts): + servers = [Server(host, get_info=NONE) for host in hosts] + return ServerPool(servers, ROUND_ROBIN, active=True, exhaust=True) + + +def make_creds(username, password): + user = gssapi.Name(base=username, name_type=gssapi.NameType.user) + return gssapi.raw.acquire_cred_with_password(user, password.encode()).creds + + + diff --git a/auth/psildap.py b/auth/psildap.py new file mode 100644 index 0000000..c5b7e34 --- /dev/null +++ b/auth/psildap.py @@ -0,0 +1,64 @@ +import re + +from ldap3 import BASE + +from krbldap import Konnection + + +ATTRIBUTE_MAP = { + "cn": "username", + "displayName": "display name", + "givenName": "first name", +# "mail": "email", +# "sn": "last name" +} + +ATTRIBUTES = [*ATTRIBUTE_MAP, "memberOf"] +COMMON_DN = "OU=users,OU=psi,DC=d,DC=psi,DC=ch" +PGROUP_PATTERN = re.compile(r"CN=(p\d{5}),") +SERVERS = [f"dc{i:02}.d.psi.ch" for i in range(3)] + + +def get_data(username, password): + with Konnection(SERVERS, username, password) as conn: + conn.search( + make_search_base(username), + search_filter="(objectClass=*)", + search_scope=BASE, + attributes=ATTRIBUTES + ) + + entries = conn.entries + + n_entries = len(entries) + if n_entries != 1: + raise RuntimeError(f"received ambiguous result ({n_entries} entries)") + + entry = entries[0] + res = repack(entry) + + res_username = res["username"] + if res_username != username: + raise RuntimeError(f"username mismatch: {res_username} != {username}") + + return res + + +def make_search_base(cn): + res = [f"CN={cn}"] + if cn.startswith("ext-"): + res.append("OU=external") + elif cn.startswith("gac-"): + res.append("OU=nonpersons") + res.append(COMMON_DN) + return ",".join(res) + + +def repack(entry): + attrs = entry.entry_attributes_as_dict + res = {ATTRIBUTE_MAP[old]: item[0] for old, item in attrs.items() if old in ATTRIBUTE_MAP if item} + res["pgroups"] = sorted(m.group(1) for i in attrs["memberOf"] if (m := PGROUP_PATTERN.search(i))) + return res + + +