Added description and changed some variable names for better understanding

This commit is contained in:
2026-06-12 11:50:46 +02:00
parent 32dbe09d7a
commit ef17a370fc
+73 -45
View File
@@ -1,21 +1,40 @@
"""
Dieses Skript liest eine JSON-Datei mit Gruppeninformationen ein und fragt für jede Gruppe
das zugehörige Department im Active Directory (AD) ab. Department ist in Form der entsprechenden
Zahl (z.B. 6000 fuer CPS) angegeben.
Es unterscheidet dabei zwischen zwei Hauptfällen:
1. a-Gruppen (z.B. a-groupname): Diese Gruppen enthalten direkt das Attribut 'department'.
2. p-Gruppen (z.B. p12345): Diese Gruppen sind jeweils einer Beamline zugeordnet. Jede
Beamline besitzt ein 'department' Attribut. Das Skripte ermittelt zuerst die Beamline
und danach das Department. Die Zuordnung Beamline -> Department wird gecached, was
AD Anfragen spart.
Die Ergebnisse (oder Fehlermeldungen für nicht zuordenbare Gruppen) werden ausgegeben.
"""
import json
import os
import sys
import json
from ldap3 import Server, Connection, ALL
from ldap3 import ALL, Connection, Server
# --- Konfiguration ---
AD_SERVER = 'd.psi.ch'
AD_USER = os.environ.get('AD_USER')
AD_PASSWORD = os.environ.get('AD_PASSWORD')
AD_SEARCH_BASE = 'DC=d,DC=psi,DC=ch'
#FILE_PATH = "/data/archivegroup_information.json"
FILE_PATH = os.environ.get('GROUPINFO_JSON_LNK_NAME_WITH_PATH')
#FILE_PATH = "/data/" + os.environ.get('GROUPINFO_JSON_LNK_NAME')
#FILE_PATH = "../../" + os.environ.get('GROUPINFO_JSON_LNK_NAME')
# FILE_PATH = "/data/" + os.environ.get('GROUPINFO_JSON_LNK_NAME')
def lookup_ad():
try:
# 1. Datenbasis aus lokaler JSON-Datei laden
if FILE_PATH is None:
print("ERROR: Environment variable GROUPINFO_JSON_LNK_NAME_WITH_PATH is not set.", file=sys.stderr, flush=True)
return
if not os.path.exists(FILE_PATH):
print(f"ERROR: Datei {FILE_PATH} nicht gefunden!", file=sys.stderr, flush=True)
return
@@ -29,10 +48,10 @@ def lookup_ad():
server = Server(AD_SERVER, port=636, use_ssl=True, get_info=ALL, connect_timeout=10)
with Connection(server, user=AD_USER, password=AD_PASSWORD, auto_bind=True) as conn:
# Interner Cache zur Speicherung der p-Gruppen-Auflösungen
p_group_resolution_cache = {}
# Interner Cache zur Speicherung der Zuordnung von p-Gruppe zu Beamline
beamline2department_cache = {}
count = 0
for item in items:
count += 1
group_name = item.get('ownerGroup', '')
@@ -40,9 +59,11 @@ def lookup_ad():
# --- Fall A: Direkte a-Gruppen ---
if group_name.startswith('a-'):
search_filter = f'(&(objectClass=group)(cn={group_name}))'
conn.search(search_base=AD_SEARCH_BASE,
search_filter=search_filter,
attributes=['department'])
conn.search(
search_base=AD_SEARCH_BASE,
search_filter=search_filter,
attributes=['department']
)
if conn.entries:
res = str(conn.entries[0].department) if 'department' in conn.entries[0] else "no-dept"
@@ -52,45 +73,48 @@ def lookup_ad():
# --- Fall B: Zweistufige p-Gruppen ---
elif group_name.startswith('p') and group_name[1:2].isdigit():
if group_name in p_group_resolution_cache:
target_group = p_group_resolution_cache[group_name]
if group_name in beamline2department_cache:
beamline = beamline2department_cache[group_name]
else:
# Stufe 1: memberOf-Attribut des p-Objekts abfragen
# Stufe 1: memberOf-Attribut des p-Objekts abfragen, um übergeordnete Gruppe zu finden
p_filter = f'(cn={group_name})'
conn.search(search_base=AD_SEARCH_BASE,
search_filter=p_filter,
attributes=['memberOf'])
target_group = "not_found"
conn.search(
search_base=AD_SEARCH_BASE,
search_filter=p_filter,
attributes=['memberOf']
)
beamline = "not_found"
if conn.entries and 'memberOf' in conn.entries[0] and conn.entries[0].memberOf:
# Iterate over all memberOf entries
# Suche spezifisch nach dem Beamline-Serviceordner
selected_dn = None
for dn in conn.entries[0].memberOf:
dn_str = str(dn)
# Look for the specific service folder in the DN
if 'OU=Beamlines,OU=Experiment,OU=IT' in dn_str:
selected_dn = dn_str
break
# Fallback, falls der spezifische Ordner nicht in memberOf gefunden wurde
if selected_dn is None:
# Fallback to the first one if the specific folder wasn't found
selected_dn = str(conn.entries[0].memberOf[0])
# Zielgruppe (CN) aus dem Distinguished Name extrahieren
try:
target_group = selected_dn.split(',')[0].split('=')[1]
p_group_resolution_cache[group_name] = target_group
beamline = selected_dn.split(',')[0].split('=')[1]
beamline2department_cache[group_name] = beamline
except IndexError:
print(f"WARN: Format von DN '{selected_dn}' unerwartet.", file=sys.stderr, flush=True)
target_group = "format_error"
beamline = "format_error"
item['target_group'] = target_group
item['beamline'] = beamline
# Stufe 2: Department anhand der ermittelten target_group bestimmen
search_filter = f'(&(objectClass=group)(cn={target_group}))'
conn.search(search_base=AD_SEARCH_BASE,
search_filter=search_filter,
attributes=['department'])
# Stufe 2: Department anhand der ermittelten beamline (aus memberOf) bestimmen
search_filter = f'(&(objectClass=group)(cn={beamline}))'
conn.search(
search_base=AD_SEARCH_BASE,
search_filter=search_filter,
attributes=['department']
)
if conn.entries:
res = str(conn.entries[0].department) if 'department' in conn.entries[0] else "no-dept"
@@ -108,22 +132,26 @@ def lookup_ad():
if count % 500 == 0:
print(f"Fortschritt: {count}/{len(items)}...", file=sys.stderr, flush=True)
# 3. Finales Ergebnis als JSON-String an stdout ausgeben
# print(json.dumps(items))
# Print all not known groups
for item in items:
# print(f"group: {item['ownerGroup']} -> {item['department']} -> type: {type(item['department'])}")
if item['department'] == "[]":
if 'target_group' in item.keys():
print(f"LEER -> {item['ownerGroup']} -> {item['target_group']}")
else:
print(f"LEER -> {item['ownerGroup']} has no target_group")
# 3. Finales Ergebnis auswerten
# Kompletten JSON-Output drucken (momentan auskommentiert):
# print(json.dumps(items))
# Test-Output: Alle nicht gefundenen bzw. leeren Zuweisungen drucken
for item in items:
# Debug (auskommentiert):
# print(f"group: {item['ownerGroup']} -> {item['department']} -> type: {type(item['department'])}")
if item['department'] == "[]":
if 'beamline' in item:
print(f"LEER -> {item['ownerGroup']} -> {item['beamline']}")
else:
print(f"LEER -> {item['ownerGroup']} has no beamline")
except Exception as e:
print(f"ERROR: {str(e)}", file=sys.stderr, flush=True)
# if 'items' in locals():
# print(json.dumps(items))
# Bei Fehler optional Items als JSON ausgeben (auskommentiert)
# if 'items' in locals():
# print(json.dumps(items))
if __name__ == "__main__":
lookup_ad()