From e2316217492f5cd84f4b925cd75d6f4614191db4 Mon Sep 17 00:00:00 2001 From: Sven Augustin Date: Fri, 22 May 2026 12:57:38 +0200 Subject: [PATCH] added mapping pgroup to beamline --- stand/auth/psildap.py | 74 +++++++++++++++++++++++++++++++------------ 1 file changed, 53 insertions(+), 21 deletions(-) diff --git a/stand/auth/psildap.py b/stand/auth/psildap.py index 2ebfddf..af5e7f4 100644 --- a/stand/auth/psildap.py +++ b/stand/auth/psildap.py @@ -1,11 +1,11 @@ import re -from ldap3 import BASE +from ldap3 import BASE, SUBTREE from .krbldap import Konnection -ATTRIBUTE_MAP = { +USER_ATTRIBUTE_MAP = { "cn": "username", "displayName": "display name", "givenName": "first name", @@ -13,52 +13,84 @@ ATTRIBUTE_MAP = { # "sn": "last name" } -ATTRIBUTES = [*ATTRIBUTE_MAP, "memberOf"] -COMMON_DN = "OU=users,OU=psi,DC=d,DC=psi,DC=ch" -PGROUP_PATTERN = re.compile(r"CN=(p\d{5}),") +USER_ATTRIBUTES = [*USER_ATTRIBUTE_MAP, "memberOf"] +COMMON_USER_DN = "OU=users,OU=psi,DC=d,DC=psi,DC=ch" +BASE_PGROUP_DN = "OU=Groups,OU=Experiment,OU=IT,DC=d,DC=psi,DC=ch" +PGROUP_PATTERN = re.compile(r"CN=(p\d{5}),OU=Groups,OU=Experiment,OU=IT,DC=d,DC=psi,DC=ch") +BEAMLINE_PATTERN = re.compile(r"CN=([^,]+),OU=Beamlines,OU=Experiment,OU=IT,DC=d,DC=psi,DC=ch") SERVERS = [f"dc{i:02}.d.psi.ch" for i in range(3)] def get_data(username, password): with Konnection(SERVERS, username, password) as conn: conn.search( - make_search_base(username), + make_user_search_base(username), search_filter="(objectClass=*)", search_scope=BASE, - attributes=ATTRIBUTES + attributes=USER_ATTRIBUTES ) - entries = conn.entries + entry = ensure_single(conn.entries, "users") + res = repack_user(entry) - n_entries = len(entries) - if n_entries != 1: - raise RuntimeError(f"received ambiguous result ({n_entries} entries)") + res_username = res["username"] + if res_username != username: + raise RuntimeError(f"username mismatch: {res_username} != {username}") - entry = entries[0] - res = repack(entry) + conn.search( + BASE_PGROUP_DN, + search_filter=make_pgroups_search_filter(res["pgroups"]), + search_scope=SUBTREE, + attributes=["cn", "memberOf"] + ) - res_username = res["username"] - if res_username != username: - raise RuntimeError(f"username mismatch: {res_username} != {username}") + res["pgroups"] = repack_pgroups(conn.entries) return res -def make_search_base(cn): +def make_user_search_base(cn): res = [f"CN={cn}"] if cn.startswith("ext-"): res.append("OU=external") elif cn.startswith("gac-"): res.append("OU=nonpersons") - res.append(COMMON_DN) + res.append(COMMON_USER_DN) return ",".join(res) -def repack(entry): +def repack_user(entry): attrs = entry.entry_attributes_as_dict - res = {ATTRIBUTE_MAP[old]: item[0] for old, item in attrs.items() if old in ATTRIBUTE_MAP if item} - res["pgroups"] = sorted(m.group(1) for i in attrs["memberOf"] if (m := PGROUP_PATTERN.search(i))) + res = {USER_ATTRIBUTE_MAP[old]: item[0] for old, item in attrs.items() if old in USER_ATTRIBUTE_MAP if item} + res["pgroups"] = extract_matches(attrs["memberOf"], PGROUP_PATTERN) return res +def make_pgroups_search_filter(pgroups): + search_filter = "".join(f"(cn={cn})" for cn in pgroups) + return f"(|{search_filter})" + + +def repack_pgroups(entries): + res = {} + for entry in entries: + attrs = entry.entry_attributes_as_dict + beamlines = extract_matches(attrs["memberOf"], BEAMLINE_PATTERN) + beamline = ensure_single(beamlines, "beamlines") + cn = ensure_single(attrs["cn"], "CNs") + res[cn] = beamline + return res + + +def ensure_single(seq, what): + n = len(seq) + if n == 1: + return seq[0] + raise RuntimeError(f"received ambiguous {what} ({n} entries)") + + +def extract_matches(seq, pattern): + return sorted(m.group(1) for i in seq if (m := pattern.search(i))) + +