""" Dieses Skript reichert eine Liste von JSON-Objekten mit Department-Informationen aus dem Active Directory (AD) an. Ablauf: 1. Eine JSON-Datei (definiert durch 'GROUPINFO_JSON_LNK_NAME_WITH_PATH') wird als Datenbasis eingelesen. 2. Eine optionale, manuell gepflegte CSV-Datei (definiert durch 'NON_SPECIFIED_GROUPS') wird geladen, um spezielle Gruppen direkt zuzuweisen und die AD-Abfrage zu überspringen. Die CSV-Datei hat die Spalten: group,paymentunit,info. 3. Für jeden Eintrag aus der JSON-Datei wird das Department ermittelt: a) Manuelle Zuweisung: Hat Priorität, falls die Gruppe in der CSV-Datei definiert ist. b) a-Gruppen: Das Department wird direkt aus dem AD-Attribut der Gruppe gelesen. c) p-Gruppen: Das Skript ermittelt in einem zweistufigen Prozess die zugehörige Beamline-Gruppe und fragt von dieser das Department ab. Die Zuordnung 'Beamline → Department' wird gecached, um die Anzahl der AD-Abfragen zu minimieren. 4. Das finale Ergebnis (die ursprüngliche Liste, ergänzt um 'department' und 'beamline') wird als JSON-String auf stdout ausgegeben. Alle Log-Meldungen gehen auf stderr. Resultat: Jede Archivierungsgruppe (entspricht einem Dictionary in der Liste) besitzt am Schluss einen zusaetzlichen Eintrag 'department', welcher den zugeordneten Bereich angibt. Falls das Namensschema nicht stimmt, steht dort "unknown_scheme", falls im AD nichts gefunden wird "not_found". """ import csv import json import os import sys from ldap3 import ALL, Connection, Server def lookup_ad(): try: # --- 1. Konfiguration und Daten prüfen und einlesen --- # Umgebungsvariablen laden und prüfen config = { 'AD_SERVER': os.environ.get('AD_SERVER'), 'AD_USER': os.environ.get('AD_USER'), 'AD_PASSWORD': os.environ.get('AD_PASSWORD'), 'AD_SEARCH_BASE': os.environ.get('AD_SEARCH_BASE'), 'FILE_PATH': os.environ.get('GROUPINFO_JSON_LNK_NAME_WITH_PATH') } file_path_non_specified_groups = os.environ.get('NON_SPECIFIED_GROUPS') for key, value in config.items(): if value is None: print(f"ERROR: Environment variable {key} is not set.", file=sys.stderr, flush=True) return # Sicheres Casten zu Strings für den Type-Checker ad_server = str(config['AD_SERVER']) ad_user = str(config['AD_USER']) ad_password = str(config['AD_PASSWORD']) ad_search_base = str(config['AD_SEARCH_BASE']) file_path = str(config['FILE_PATH']) # JSON-Datei als Haupt-Datenquelle laden if not os.path.exists(file_path): print(f"ERROR: File {file_path} not found!", file=sys.stderr, flush=True) return with open(file_path, 'r') as f: items = json.load(f) print(f"INFO: {len(items)} entries loaded from JSON.", file=sys.stderr, flush=True) # Optionale CSV-Datei mit manuellen Zuweisungen laden manual_departments = {} if file_path_non_specified_groups and os.path.exists(file_path_non_specified_groups): with open(file_path_non_specified_groups, 'r', newline='') as f: reader = csv.DictReader(f) for row in reader: group = row.get('group', '').strip() department = row.get('paymentunit', '').strip() info = row.get('info', '').strip() # Nur Gruppen hinzufügen, die nicht explizit als "nicht im AD" markiert sind if group and department and info != 'NOT_IN_AD': manual_departments[group] = department print(f"INFO: Loaded {len(manual_departments)} manual group definitions from CSV.", file=sys.stderr, flush=True) else: print("INFO: No valid manual groups CSV file provided or found.", file=sys.stderr, flush=True) # --- 2. AD-Verarbeitung --- # AD-Verbindung via SSL (Port 636) initialisieren server = Server(ad_server, port=636, use_ssl=True, get_info=ALL, connect_timeout=10) with Connection(server, user=ad_user, password=ad_password, auto_bind=True) as conn: # Cache für die Zuordnung von Beamline zu Department (Performance-Optimierung) beamline_department_cache = {} count = 0 for item in items: count += 1 group_name = item.get('ownerGroup', '') # Priorität 1: Manuelle Zuweisung aus CSV if group_name in manual_departments: item['department'] = manual_departments[group_name] continue # Priorität 2: Direkte a-Gruppen if group_name.startswith('a-'): search_filter = f'(&(objectClass=group)(cn={group_name}))' conn.search( search_base=ad_search_base, search_filter=search_filter, attributes=['department'] ) if conn.entries and 'department' in conn.entries[0]: item['department'] = str(conn.entries[0].department) else: item['department'] = "not_found" # Priorität 3: Zweistufige p-Gruppen elif group_name.startswith('p') and group_name[1:2].isdigit(): beamline = "not_found" # Stufe 1: Zugehörige Beamline-Gruppe via 'memberOf' ermitteln p_filter = f'(cn={group_name})' conn.search( search_base=ad_search_base, search_filter=p_filter, attributes=['memberOf'] ) if conn.entries and 'memberOf' in conn.entries[0] and conn.entries[0].memberOf: dn_generator = (str(dn) for dn in conn.entries[0].memberOf if 'OU=Beamlines,OU=Experiment,OU=IT' in str(dn)) selected_dn = next(dn_generator, None) if selected_dn is None: selected_dn = str(conn.entries[0].memberOf[0]) try: beamline = selected_dn.split(',')[0].split('=')[1] except IndexError: msg = f"WARN: Format von DN '{selected_dn}' unerwartet." print(msg, file=sys.stderr, flush=True) beamline = "format_error" item['beamline'] = beamline # Stufe 2: Department für die Beamline ermitteln (mit Caching) if beamline in beamline_department_cache: item['department'] = beamline_department_cache[beamline] else: search_filter = f'(&(objectClass=group)(cn={beamline}))' conn.search( search_base=ad_search_base, search_filter=search_filter, attributes=['department'] ) if conn.entries and 'department' in conn.entries[0]: res = str(conn.entries[0].department) else: res = "not_found" beamline_department_cache[beamline] = res item['department'] = res # Fallback: Unbekannte Gruppenschemata else: msg = f"WARNING: Unbekanntes Gruppen-Schema: {group_name}" print(msg, file=sys.stderr, flush=True) item['department'] = "unknown_schema" if count % 500 == 0: print(f"INFO: progress {count}/{len(items)}...", file=sys.stderr, flush=True) # --- 3. Ausgabe --- # Finales Ergebnis als JSON auf stdout ausgeben print(json.dumps(items)) # Zusätzliche Debug-Ausgabe auf stderr für Gruppen ohne valides Department for item in items: if item.get('department') in ("[]", "not_found", "no-dept"): msg = f"WARNING: Kein Department für {item['ownerGroup']}" if 'beamline' in item: msg += f" (Beamline: {item['beamline']})" msg += " gefunden." print(msg, file=sys.stderr, flush=True) except Exception as e: print(f"ERROR: {str(e)}", file=sys.stderr, flush=True) if __name__ == "__main__": lookup_ad()