diff --git a/logic/scripts/ad_lookup.py b/logic/scripts/ad_lookup.py index c58b915..7da2690 100644 --- a/logic/scripts/ad_lookup.py +++ b/logic/scripts/ad_lookup.py @@ -1,21 +1,40 @@ +""" +Dieses Skript liest eine JSON-Datei mit Gruppeninformationen ein und fragt für jede Gruppe +das zugehörige Department im Active Directory (AD) ab. Department ist in Form der entsprechenden +Zahl (z.B. 6000 fuer CPS) angegeben. + +Es unterscheidet dabei zwischen zwei Hauptfällen: +1. a-Gruppen (z.B. a-groupname): Diese Gruppen enthalten direkt das Attribut 'department'. +2. p-Gruppen (z.B. p12345): Diese Gruppen sind jeweils einer Beamline zugeordnet. Jede + Beamline besitzt ein 'department' Attribut. Das Skripte ermittelt zuerst die Beamline + und danach das Department. Die Zuordnung Beamline -> Department wird gecached, was + AD Anfragen spart. + +Die Ergebnisse (oder Fehlermeldungen für nicht zuordenbare Gruppen) werden ausgegeben. +""" +import json import os import sys -import json -from ldap3 import Server, Connection, ALL + +from ldap3 import ALL, Connection, Server + # --- Konfiguration --- AD_SERVER = 'd.psi.ch' AD_USER = os.environ.get('AD_USER') AD_PASSWORD = os.environ.get('AD_PASSWORD') AD_SEARCH_BASE = 'DC=d,DC=psi,DC=ch' -#FILE_PATH = "/data/archivegroup_information.json" FILE_PATH = os.environ.get('GROUPINFO_JSON_LNK_NAME_WITH_PATH') -#FILE_PATH = "/data/" + os.environ.get('GROUPINFO_JSON_LNK_NAME') -#FILE_PATH = "../../" + os.environ.get('GROUPINFO_JSON_LNK_NAME') +# FILE_PATH = "/data/" + os.environ.get('GROUPINFO_JSON_LNK_NAME') + def lookup_ad(): try: # 1. Datenbasis aus lokaler JSON-Datei laden + if FILE_PATH is None: + print("ERROR: Environment variable GROUPINFO_JSON_LNK_NAME_WITH_PATH is not set.", file=sys.stderr, flush=True) + return + if not os.path.exists(FILE_PATH): print(f"ERROR: Datei {FILE_PATH} nicht gefunden!", file=sys.stderr, flush=True) return @@ -29,10 +48,10 @@ def lookup_ad(): server = Server(AD_SERVER, port=636, use_ssl=True, get_info=ALL, connect_timeout=10) with Connection(server, user=AD_USER, password=AD_PASSWORD, auto_bind=True) as conn: - # Interner Cache zur Speicherung der p-Gruppen-Auflösungen - p_group_resolution_cache = {} - + # Interner Cache zur Speicherung der Zuordnung von p-Gruppe zu Beamline + beamline2department_cache = {} count = 0 + for item in items: count += 1 group_name = item.get('ownerGroup', '') @@ -40,9 +59,11 @@ def lookup_ad(): # --- Fall A: Direkte a-Gruppen --- if group_name.startswith('a-'): search_filter = f'(&(objectClass=group)(cn={group_name}))' - conn.search(search_base=AD_SEARCH_BASE, - search_filter=search_filter, - attributes=['department']) + conn.search( + search_base=AD_SEARCH_BASE, + search_filter=search_filter, + attributes=['department'] + ) if conn.entries: res = str(conn.entries[0].department) if 'department' in conn.entries[0] else "no-dept" @@ -52,45 +73,48 @@ def lookup_ad(): # --- Fall B: Zweistufige p-Gruppen --- elif group_name.startswith('p') and group_name[1:2].isdigit(): - if group_name in p_group_resolution_cache: - target_group = p_group_resolution_cache[group_name] + if group_name in beamline2department_cache: + beamline = beamline2department_cache[group_name] else: - # Stufe 1: memberOf-Attribut des p-Objekts abfragen + # Stufe 1: memberOf-Attribut des p-Objekts abfragen, um übergeordnete Gruppe zu finden p_filter = f'(cn={group_name})' - conn.search(search_base=AD_SEARCH_BASE, - search_filter=p_filter, - attributes=['memberOf']) - - target_group = "not_found" + conn.search( + search_base=AD_SEARCH_BASE, + search_filter=p_filter, + attributes=['memberOf'] + ) + + beamline = "not_found" if conn.entries and 'memberOf' in conn.entries[0] and conn.entries[0].memberOf: - # Iterate over all memberOf entries + # Suche spezifisch nach dem Beamline-Serviceordner selected_dn = None for dn in conn.entries[0].memberOf: dn_str = str(dn) - # Look for the specific service folder in the DN if 'OU=Beamlines,OU=Experiment,OU=IT' in dn_str: selected_dn = dn_str break - + + # Fallback, falls der spezifische Ordner nicht in memberOf gefunden wurde if selected_dn is None: - # Fallback to the first one if the specific folder wasn't found selected_dn = str(conn.entries[0].memberOf[0]) - + # Zielgruppe (CN) aus dem Distinguished Name extrahieren try: - target_group = selected_dn.split(',')[0].split('=')[1] - p_group_resolution_cache[group_name] = target_group + beamline = selected_dn.split(',')[0].split('=')[1] + beamline2department_cache[group_name] = beamline except IndexError: print(f"WARN: Format von DN '{selected_dn}' unerwartet.", file=sys.stderr, flush=True) - target_group = "format_error" + beamline = "format_error" - item['target_group'] = target_group + item['beamline'] = beamline - # Stufe 2: Department anhand der ermittelten target_group bestimmen - search_filter = f'(&(objectClass=group)(cn={target_group}))' - conn.search(search_base=AD_SEARCH_BASE, - search_filter=search_filter, - attributes=['department']) + # Stufe 2: Department anhand der ermittelten beamline (aus memberOf) bestimmen + search_filter = f'(&(objectClass=group)(cn={beamline}))' + conn.search( + search_base=AD_SEARCH_BASE, + search_filter=search_filter, + attributes=['department'] + ) if conn.entries: res = str(conn.entries[0].department) if 'department' in conn.entries[0] else "no-dept" @@ -108,22 +132,26 @@ def lookup_ad(): if count % 500 == 0: print(f"Fortschritt: {count}/{len(items)}...", file=sys.stderr, flush=True) - # 3. Finales Ergebnis als JSON-String an stdout ausgeben - # print(json.dumps(items)) - # Print all not known groups - for item in items: - # print(f"group: {item['ownerGroup']} -> {item['department']} -> type: {type(item['department'])}") - if item['department'] == "[]": - if 'target_group' in item.keys(): - print(f"LEER -> {item['ownerGroup']} -> {item['target_group']}") - else: - print(f"LEER -> {item['ownerGroup']} has no target_group") + # 3. Finales Ergebnis auswerten + # Kompletten JSON-Output drucken (momentan auskommentiert): + # print(json.dumps(items)) + # Test-Output: Alle nicht gefundenen bzw. leeren Zuweisungen drucken + for item in items: + # Debug (auskommentiert): + # print(f"group: {item['ownerGroup']} -> {item['department']} -> type: {type(item['department'])}") + if item['department'] == "[]": + if 'beamline' in item: + print(f"LEER -> {item['ownerGroup']} -> {item['beamline']}") + else: + print(f"LEER -> {item['ownerGroup']} has no beamline") except Exception as e: print(f"ERROR: {str(e)}", file=sys.stderr, flush=True) -# if 'items' in locals(): -# print(json.dumps(items)) + # Bei Fehler optional Items als JSON ausgeben (auskommentiert) + # if 'items' in locals(): + # print(json.dumps(items)) + if __name__ == "__main__": lookup_ad() \ No newline at end of file