Files
ArchiveCostWebapp/logic/scripts/ad_lookup.py
T
2026-06-16 17:39:34 +02:00

194 lines
8.6 KiB
Python

"""
Dieses Skript reichert eine Liste von JSON-Objekten mit Department-Informationen aus dem
Active Directory (AD) an.
Ablauf:
1. Eine JSON-Datei (definiert durch 'GROUPINFO_JSON_LNK_NAME_WITH_PATH') wird als
Datenbasis eingelesen.
2. Eine optionale, manuell gepflegte CSV-Datei (definiert durch 'NON_SPECIFIED_GROUPS')
wird geladen, um spezielle Gruppen direkt zuzuweisen und die AD-Abfrage zu überspringen.
Die CSV-Datei hat die Spalten: group,paymentunit,info.
3. Für jeden Eintrag aus der JSON-Datei wird das Department ermittelt:
a) Manuelle Zuweisung: Hat Priorität, falls die Gruppe in der CSV-Datei definiert ist.
b) a-Gruppen: Das Department wird direkt aus dem AD-Attribut der Gruppe gelesen.
c) p-Gruppen: Das Skript ermittelt in einem zweistufigen Prozess die zugehörige
Beamline-Gruppe und fragt von dieser das Department ab. Die Zuordnung
'Beamline → Department' wird gecached, um die Anzahl der AD-Abfragen zu minimieren.
4. Das finale Ergebnis (die ursprüngliche Liste, ergänzt um 'department' und 'beamline')
wird als JSON-String auf stdout ausgegeben. Alle Log-Meldungen gehen auf stderr.
Resultat:
Jede Archivierungsgruppe (entspricht einem Dictionary in der Liste) besitzt am Schluss
einen zusaetzlichen Eintrag 'department', welcher den zugeordneten Bereich angibt. Falls
das Namensschema nicht stimmt, steht dort "unknown_scheme", falls im AD nichts gefunden
wird "not_found".
"""
import csv
import json
import os
import sys
from ldap3 import ALL, Connection, Server
def lookup_ad():
try:
# --- 1. Konfiguration und Daten prüfen und einlesen ---
# Umgebungsvariablen laden und prüfen
config = {
'AD_SERVER': os.environ.get('AD_SERVER'),
'AD_USER': os.environ.get('AD_USER'),
'AD_PASSWORD': os.environ.get('AD_PASSWORD'),
'AD_SEARCH_BASE': os.environ.get('AD_SEARCH_BASE'),
'FILE_PATH': os.environ.get('GROUPINFO_JSON_LNK_NAME_WITH_PATH')
}
file_path_non_specified_groups = os.environ.get('NON_SPECIFIED_GROUPS')
for key, value in config.items():
if value is None:
print(f"ERROR: Environment variable {key} is not set.", file=sys.stderr, flush=True)
return
# Sicheres Casten zu Strings für den Type-Checker
ad_server = str(config['AD_SERVER'])
ad_user = str(config['AD_USER'])
ad_password = str(config['AD_PASSWORD'])
ad_search_base = str(config['AD_SEARCH_BASE'])
file_path = str(config['FILE_PATH'])
# JSON-Datei als Haupt-Datenquelle laden
if not os.path.exists(file_path):
print(f"ERROR: File {file_path} not found!", file=sys.stderr, flush=True)
return
with open(file_path, 'r') as f:
items = json.load(f)
print(f"INFO: {len(items)} entries loaded from JSON.", file=sys.stderr, flush=True)
# Optionale CSV-Datei mit manuellen Zuweisungen laden
manual_departments = {}
if file_path_non_specified_groups and os.path.exists(file_path_non_specified_groups):
with open(file_path_non_specified_groups, 'r', newline='') as f:
reader = csv.DictReader(f)
for row in reader:
group = row.get('group', '').strip()
department = row.get('paymentunit', '').strip()
info = row.get('info', '').strip()
# Nur Gruppen hinzufügen, die nicht explizit als "nicht im AD" markiert sind
if group and department and info != 'NOT_IN_AD':
manual_departments[group] = department
print(f"INFO: Loaded {len(manual_departments)} manual group definitions from CSV.",
file=sys.stderr, flush=True)
else:
print("INFO: No valid manual groups CSV file provided or found.", file=sys.stderr, flush=True)
# --- 2. AD-Verarbeitung ---
# AD-Verbindung via SSL (Port 636) initialisieren
server = Server(ad_server, port=636, use_ssl=True, get_info=ALL, connect_timeout=10)
with Connection(server, user=ad_user, password=ad_password, auto_bind=True) as conn:
# Cache für die Zuordnung von Beamline zu Department (Performance-Optimierung)
beamline_department_cache = {}
count = 0
for item in items:
count += 1
group_name = item.get('ownerGroup', '')
# Priorität 1: Manuelle Zuweisung aus CSV
if group_name in manual_departments:
item['department'] = manual_departments[group_name]
continue
# Priorität 2: Direkte a-Gruppen
if group_name.startswith('a-'):
search_filter = f'(&(objectClass=group)(cn={group_name}))'
conn.search(
search_base=ad_search_base,
search_filter=search_filter,
attributes=['department']
)
if conn.entries and 'department' in conn.entries[0]:
item['department'] = str(conn.entries[0].department)
else:
item['department'] = "not_found"
# Priorität 3: Zweistufige p-Gruppen
elif group_name.startswith('p') and group_name[1:2].isdigit():
beamline = "not_found"
# Stufe 1: Zugehörige Beamline-Gruppe via 'memberOf' ermitteln
p_filter = f'(cn={group_name})'
conn.search(
search_base=ad_search_base,
search_filter=p_filter,
attributes=['memberOf']
)
if conn.entries and 'memberOf' in conn.entries[0] and conn.entries[0].memberOf:
dn_generator = (str(dn) for dn in conn.entries[0].memberOf
if 'OU=Beamlines,OU=Experiment,OU=IT' in str(dn))
selected_dn = next(dn_generator, None)
if selected_dn is None:
selected_dn = str(conn.entries[0].memberOf[0])
try:
beamline = selected_dn.split(',')[0].split('=')[1]
except IndexError:
msg = f"WARN: Format von DN '{selected_dn}' unerwartet."
print(msg, file=sys.stderr, flush=True)
beamline = "format_error"
item['beamline'] = beamline
# Stufe 2: Department für die Beamline ermitteln (mit Caching)
if beamline in beamline_department_cache:
item['department'] = beamline_department_cache[beamline]
else:
search_filter = f'(&(objectClass=group)(cn={beamline}))'
conn.search(
search_base=ad_search_base,
search_filter=search_filter,
attributes=['department']
)
if conn.entries and 'department' in conn.entries[0]:
res = str(conn.entries[0].department)
else:
res = "not_found"
beamline_department_cache[beamline] = res
item['department'] = res
# Fallback: Unbekannte Gruppenschemata
else:
msg = f"WARNING: Unbekanntes Gruppen-Schema: {group_name}"
print(msg, file=sys.stderr, flush=True)
item['department'] = "unknown_schema"
if count % 500 == 0:
print(f"INFO: progress {count}/{len(items)}...", file=sys.stderr, flush=True)
# --- 3. Ausgabe ---
# Finales Ergebnis als JSON auf stdout ausgeben
print(json.dumps(items))
# Zusätzliche Debug-Ausgabe auf stderr für Gruppen ohne valides Department
for item in items:
if item.get('department') in ("[]", "not_found", "no-dept"):
msg = f"WARNING: Kein Department für {item['ownerGroup']}"
if 'beamline' in item:
msg += f" (Beamline: {item['beamline']})"
msg += " gefunden."
print(msg, file=sys.stderr, flush=True)
except Exception as e:
print(f"ERROR: {str(e)}", file=sys.stderr, flush=True)
if __name__ == "__main__":
lookup_ad()