first try on GSSAPI-based LDAP access via Kerberos, PSI LDAP specifics and a fake alternative for testing

This commit is contained in:
2026-05-14 18:41:18 +02:00
parent f030247b61
commit 8bd37ec0e3
3 changed files with 113 additions and 0 deletions
+23
View File
@@ -0,0 +1,23 @@
PASSWORDS = {
"a": "a",
"b": "b",
"c": "c"
}
PGROUPS = {
"a": {"p11111", "p22222"},
"b": {"p33333"}
}
def get_data(username, password):
if PASSWORDS.get(username) != password:
raise RuntimeError("authentication failed")
return {
"username": username,
"pgroups": PGROUPS.get(username, set())
}
+26
View File
@@ -0,0 +1,26 @@
import gssapi
from ldap3 import GSSAPI, NONE, ROUND_ROBIN, SASL, Connection, Server, ServerPool
def Konnection(hosts, username, password):
server_pool = make_server_pool(hosts)
creds = make_creds(username, password)
return Connection(
server_pool,
authentication=SASL,
sasl_mechanism=GSSAPI,
sasl_credentials=(None, None, creds)
)
def make_server_pool(hosts):
servers = [Server(host, get_info=NONE) for host in hosts]
return ServerPool(servers, ROUND_ROBIN, active=True, exhaust=True)
def make_creds(username, password):
user = gssapi.Name(base=username, name_type=gssapi.NameType.user)
return gssapi.raw.acquire_cred_with_password(user, password.encode()).creds
+64
View File
@@ -0,0 +1,64 @@
import re
from ldap3 import BASE
from krbldap import Konnection
ATTRIBUTE_MAP = {
"cn": "username",
"displayName": "display name",
"givenName": "first name",
# "mail": "email",
# "sn": "last name"
}
ATTRIBUTES = [*ATTRIBUTE_MAP, "memberOf"]
COMMON_DN = "OU=users,OU=psi,DC=d,DC=psi,DC=ch"
PGROUP_PATTERN = re.compile(r"CN=(p\d{5}),")
SERVERS = [f"dc{i:02}.d.psi.ch" for i in range(3)]
def get_data(username, password):
with Konnection(SERVERS, username, password) as conn:
conn.search(
make_search_base(username),
search_filter="(objectClass=*)",
search_scope=BASE,
attributes=ATTRIBUTES
)
entries = conn.entries
n_entries = len(entries)
if n_entries != 1:
raise RuntimeError(f"received ambiguous result ({n_entries} entries)")
entry = entries[0]
res = repack(entry)
res_username = res["username"]
if res_username != username:
raise RuntimeError(f"username mismatch: {res_username} != {username}")
return res
def make_search_base(cn):
res = [f"CN={cn}"]
if cn.startswith("ext-"):
res.append("OU=external")
elif cn.startswith("gac-"):
res.append("OU=nonpersons")
res.append(COMMON_DN)
return ",".join(res)
def repack(entry):
attrs = entry.entry_attributes_as_dict
res = {ATTRIBUTE_MAP[old]: item[0] for old, item in attrs.items() if old in ATTRIBUTE_MAP if item}
res["pgroups"] = sorted(m.group(1) for i in attrs["memberOf"] if (m := PGROUP_PATTERN.search(i)))
return res