create class dynamically, monos added

This commit is contained in:
x12sa
2026-01-23 13:46:52 +01:00
committed by appel_c
parent 020db0e690
commit da038bf6d9

View File

@@ -1,8 +1,6 @@
from ophyd import Device, Component as Cpt, EpicsSignal
# ---------------------------
# Registry: sections/channels
# ---------------------------
@@ -37,168 +35,222 @@ CHANNELS = {
"Shutters Endstation": [
{"attr": "ESPSH17010", "label": "OP-PSH1-EMLS-7010", "pv": "X12SA-OP-PSH1-EMLS-7010:OPEN"},
],
# -------------------------------
# 🔵 DMM MONOCHROMATOR
# -------------------------------
"DMM Monochromator": [
# Temperature sensors (:TEMP)
{"attr": "DMM_ETTC_3010", "label": "DMM Temp Surface 1", "pv": "X12SA-OP-DMM-ETTC-3010:TEMP"},
{"attr": "DMM_ETTC_3020", "label": "DMM Temp Surface 2", "pv": "X12SA-OP-DMM-ETTC-3020:TEMP"},
{"attr": "DMM_ETTC_3030", "label": "DMM Temp Shield 1 (disaster)", "pv": "X12SA-OP-DMM-ETTC-3030:TEMP"},
{"attr": "DMM_ETTC_3040", "label": "DMM Temp Shield 2 (disaster)", "pv": "X12SA-OP-DMM-ETTC-3040:TEMP"},
# Translation and Bragg switches (:THRU / :IN)
{"attr": "DMM_EMLS_3010", "label": "DMM Translation ThruPos", "pv": "X12SA-OP-DMM-EMLS-3010:THRU"},
{"attr": "DMM_EMLS_3020", "label": "DMM Translation InPos", "pv": "X12SA-OP-DMM-EMLS-3020:IN"},
{"attr": "DMM_EMLS_3030", "label": "DMM Bragg ThruPos", "pv": "X12SA-OP-DMM-EMLS-3030:THRU"},
{"attr": "DMM_EMLS_3040", "label": "DMM Bragg InPos", "pv": "X12SA-OP-DMM-EMLS-3040:IN"},
# Heater faults (ALL use :SWITCH)
{"attr": "DMM_EMSW_3050_SWITCH", "label": "DMM Heater Fault XTAL 1 (switch)", "pv": "X12SA-OP-DMM-EMSW-3050:SWITCH"},
{"attr": "DMM_EMSW_3060_SWITCH", "label": "DMM Heater Fault XTAL 2 (switch)", "pv": "X12SA-OP-DMM-EMSW-3060:SWITCH"},
{"attr": "DMM_EMSW_3070_SWITCH", "label": "DMM Heater Fault Support 1 (switch)", "pv": "X12SA-OP-DMM-EMSW-3070:SWITCH"},
# Readbacks
{"attr": "DMM1_ENERGY_GET", "label": "DMM Energy", "pv": "X12SA-OP-DMM1:ENERGY-GET"},
{"attr": "DMM1_POSITION", "label": "DMM Position", "pv": "X12SA-OP-DMM1:POSITION"},
{"attr": "DMM1_STRIPE", "label": "DMM Stripe", "pv": "X12SA-OP-DMM1:STRIPE"},
],
# -------------------------------
# 🔵 CCM MONOCHROMATOR
# -------------------------------
"CCM Monochromator": [
# Temperature sensors (:TEMP)
{"attr": "CCM_ETTC_4010", "label": "CCM Temp Crystal", "pv": "X12SA-OP-CCM-ETTC-4010:TEMP"},
{"attr": "CCM_ETTC_4020", "label": "CCM Temp Shield (disaster)", "pv": "X12SA-OP-CCM-ETTC-4020:TEMP"},
# Heater faults (ALL use :SWITCH)
{"attr": "CCM_EMSW_4010_SWITCH", "label": "CCM Heater Fault 1 (switch)", "pv": "X12SA-OP-CCM-EMSW-4010:SWITCH"},
{"attr": "CCM_EMSW_4020_SWITCH", "label": "CCM Heater Fault 2 (switch)", "pv": "X12SA-OP-CCM-EMSW-4020:SWITCH"},
{"attr": "CCM_EMSW_4030_SWITCH", "label": "CCM Heater Fault 3 (switch)", "pv": "X12SA-OP-CCM-EMSW-4030:SWITCH"},
# Readbacks
{"attr": "CCM1_ENERGY_GET", "label": "CCM Energy", "pv": "X12SA-OP-CCM1:ENERGY-GET"},
{"attr": "CCM1_POSITION", "label": "CCM Position", "pv": "X12SA-OP-CCM1:POSITION"},
],
}
class cSAXSEps(Device):
# -------------------------------------------------------------------------
# Cpt definitions (AUTO-GENERATED FROM CHANNELS: one per registry entry)
# -------------------------------------------------------------------------
USER_ACCESS = [
"show_all",
]
SUB_VALUE = "value"
_default_sub = SUB_VALUE
# Valves Frontend
FEVVPG0000 = Cpt(EpicsSignal, name="FE-VVPG-0000", read_pv="X12SA-FE-VVPG-0000:PLC_OPEN")
FEVVPG1010 = Cpt(EpicsSignal, name="FE-VVPG-1010", read_pv="X12SA-FE-VVPG-1010:PLC_OPEN")
FEVVFV2010 = Cpt(EpicsSignal, name="FE-VVFV-2010", read_pv="X12SA-FE-VVFV-2010:PLC_OPEN")
FEVVPG2010 = Cpt(EpicsSignal, name="FE-VVPG-2010", read_pv="X12SA-FE-VVPG-2010:PLC_OPEN")
# ==============================================================================
# DYNAMIC CLASS CREATION (Ophyd-safe) — updated show_all formatting
# ==============================================================================
def create_dynamic_eps_class():
"""
Create an Ophyd Device subclass with all Components generated
from CHANNELS at class-creation time (safe for Ophyd/BEC).
"""
class_attrs = dict(
USER_ACCESS=["show_all"],
SUB_VALUE="value",
_default_sub="value",
)
# Valves Optics Hutch
OPVVPG1010 = Cpt(EpicsSignal, name="OP-VVPG-1010", read_pv="X12SA-OP-VVPG-1010:PLC_OPEN")
OPVVPG2010 = Cpt(EpicsSignal, name="OP-VVPG-2010", read_pv="X12SA-OP-VVPG-2010:PLC_OPEN")
OPVVPG3010 = Cpt(EpicsSignal, name="OP-VVPG-3010", read_pv="X12SA-OP-VVPG-3010:PLC_OPEN")
OPVVPG3020 = Cpt(EpicsSignal, name="OP-VVPG-3020", read_pv="X12SA-OP-VVPG-3020:PLC_OPEN")
OPVVPG4010 = Cpt(EpicsSignal, name="OP-VVPG-4010", read_pv="X12SA-OP-VVPG-4010:PLC_OPEN")
OPVVPG5010 = Cpt(EpicsSignal, name="OP-VVPG-5010", read_pv="X12SA-OP-VVPG-5010:PLC_OPEN")
OPVVPG6010 = Cpt(EpicsSignal, name="OP-VVPG-6010", read_pv="X12SA-OP-VVPG-6010:PLC_OPEN")
OPVVPG7010 = Cpt(EpicsSignal, name="OP-VVPG-7010", read_pv="X12SA-OP-VVPG-7010:PLC_OPEN")
# Valves ES Hutch
ESVVPG1010 = Cpt(EpicsSignal, name="ES-VVPG-1010", read_pv="X12SA-ES-VVPG-1010:PLC_OPEN")
# Shutters Frontend
FEPSH1 = Cpt(EpicsSignal, name="FE-PSH1-EMLS-0010", read_pv="X12SA-FE-PSH1-EMLS-0010:OPEN")
FESTO1 = Cpt(EpicsSignal, name="FE-STO1-EMLS-0010", read_pv="X12SA-FE-STO1-EMLS-0010:OPEN")
# Shutters Endstation
ESPSH17010 = Cpt(EpicsSignal, name="OP-PSH1-EMLS-7010", read_pv="X12SA-OP-PSH1-EMLS-7010:OPEN")
# ------------------------
# Status / consistency API
# ------------------------
def show_all(self):
"""
Print the status of all valves and shutters, grouped by section, driven by CHANNELS.
- Green = OPEN (True), Red = CLOSED (False)
- Missing attributes are shown as MISSING
"""
# ANSI colors
red = "\x1b[91m"
green = "\x1b[92m"
white = "\x1b[0m"
bold = "\x1b[1m"
def safe_get_attr_value(attr_name):
obj = getattr(self, attr_name, None)
if obj is None:
return None
try:
return bool(obj.get())
except Exception:
return None
def render_line(label, is_open):
if is_open is None:
return f" - {label:<24} {red}MISSING{white}"
color = green if is_open else red
status = "OPEN" if is_open else "CLOSED"
return f" - {label:<24} {color}{status}{white}"
def section_header(title):
print(f"\n{bold}{title}{white}")
def show_section(title, items):
section_header(title)
values = [(item["label"], safe_get_attr_value(item["attr"])) for item in items]
# Summary
vals_present = [v for (_, v) in values if v is not None]
if not vals_present:
print(" (no channels found on this device)")
else:
total = len(vals_present)
open_cnt = sum(1 for v in vals_present if v)
all_open = (total > 0 and open_cnt == total)
if all_open:
print(f"{green}All channels in this section are open.{white}")
else:
print(f"{red}Warning: Not all channels in this section are open. ({open_cnt}/{total} open){white}")
# Per-channel lines
for label, val in values:
print(render_line(label, val))
print("X12SA valve/shutter status")
for section, items in CHANNELS.items():
show_section(section, items)
def consistency_report(self, *, verbose=True):
"""
Checks for:
- attributes listed in CHANNELS but missing on the device
- duplicate PVs in CHANNELS (should be none in a real system)
"""
# Missing attributes
missing_attrs = []
# Duplicate PVs
seen_pvs = {}
duplicates = []
for section, items in CHANNELS.items():
for it in items:
# attr present?
if getattr(self, it["attr"], None) is None:
missing_attrs.append((section, it["attr"], it["label"], it["pv"]))
# PV uniqueness
pv = it["pv"]
if pv in seen_pvs:
duplicates.append((pv, seen_pvs[pv], (section, it["attr"], it["label"])))
else:
seen_pvs[pv] = (section, it["attr"], it["label"])
if verbose:
print("=== Consistency Report ===")
if missing_attrs:
print("\nMissing attributes on device (define matching Cpt):")
for section, attr, label, pv in missing_attrs:
print(f" - [{section}] {attr} ({label}) pv={pv}")
else:
print("\nNo missing attributes.")
if duplicates:
print("\nDuplicate PVs in CHANNELS (fix registry; real system should have none):")
for pv, first, dup in duplicates:
fsec, fattr, flabel = first
dsec, dattr, dlabel = dup
print(f" - {pv}\n first: [{fsec}] {fattr} ({flabel})\n second: [{dsec}] {dattr} ({dlabel})")
else:
print("\nNo duplicate PVs.")
return {
"missing_attrs": missing_attrs,
"duplicate_pvs": duplicates,
}
# -------------------------------
# Developer helper (optional): re-generate Cpt definitions from CHANNELS
# -------------------------------
def codegen_cpt_definitions():
"""Print Cpt(...) definitions for all channels based on CHANNELS."""
lines = []
lines.append("# === AUTO-GENERATED FROM CHANNELS ===")
# Define all Components before the class is created
for section, items in CHANNELS.items():
lines.append(f"\n# {section}")
for it in items:
attr = it["attr"]
label = it["label"]
pv = it["pv"]
lines.append(
f'{attr} = Cpt(EpicsSignal, name="{label}", read_pv="{pv}")'
class_attrs[it["attr"]] = Cpt(
EpicsSignal,
name=it["label"],
read_pv=it["pv"]
)
print("\n".join(lines))
class DynamicMethods:
def show_all(self):
red = "\x1b[91m"
green = "\x1b[92m"
white = "\x1b[0m"
bold = "\x1b[1m"
# Render helpers
def safe_get(attr):
obj = getattr(self, attr, None)
if obj is None:
return None
try:
return obj.get()
except Exception:
return None
def is_bool_like(v):
# handles 0/1/True/False specifically
return isinstance(v, (bool, int)) and v in (0, 1, True, False)
def format_value(value, pv):
"""
- Temperature PVs (…:TEMP): one decimal (e.g., 23.7)
- Energy PVs (…:ENERGY-GET): four decimals (e.g., 12.3456)
- Bool-like: handled separately
- Others (strings / floats / ints): printed as-is
"""
if value is None:
return f"{red}MISSING{white}"
# Temperature formatting → 1 decimal
if pv.endswith(":TEMP") and isinstance(value, (int, float)):
return f"{value:.1f}"
# Energy formatting → 4 decimals
if pv.endswith(":ENERGY-GET") and isinstance(value, (int, float)):
return f"{value:.4f}"
# Non-bool values (e.g., strings like POSITION/STRIPE)
if not is_bool_like(value):
return f"{value}"
# Return raw value for bool handling elsewhere
return value
def render_line(label, value, pv, label_width):
# Booleans as OPEN/CLOSED with color
if is_bool_like(value):
is_open = bool(value)
color = green if is_open else red
status = "OPEN" if is_open else "CLOSED"
return f" {label:<{label_width}} {color}{status}{white}"
# Everything else formatted via format_value
fv = format_value(value, pv)
# If format_value returned raw bool-like, it means not temp but bool-like; still handle color
if fv in (0, 1, True, False):
is_open = bool(fv)
color = green if is_open else red
status = "OPEN" if is_open else "CLOSED"
return f" {label:<{label_width}} {color}{status}{white}"
return f" {label:<{label_width}} {fv}"
print("X12SA valve/shutter/mono status")
for section, items in CHANNELS.items():
print(f"\n{bold}{section}{white}")
# Fetch values once
rows = []
for it in items:
val = safe_get(it["attr"])
rows.append((it["label"], val, it["pv"]))
# Compute a nice label width per section (min 32)
label_width = max(32, *(len(label) for (label, _, _) in rows) or [32])
# Section summary when all present values are pure booleans
present_vals = [v for (_, v, _) in rows if v is not None]
bool_like_vals = [v for v in present_vals if is_bool_like(v)]
if present_vals and len(bool_like_vals) == len(present_vals):
total = len(bool_like_vals)
open_cnt = sum(1 for v in bool_like_vals if bool(v))
if open_cnt == total:
print(f"{green}All channels in this section are open.{white}")
else:
print(f"{red}Warning: {open_cnt}/{total} open.{white}")
elif not present_vals:
print(" (no channels found on this device)")
# Lines
for label, value, pv in rows:
print(render_line(label, value, pv, label_width))
def consistency_report(self, *, verbose=True):
"""
Checks for:
- attributes listed in CHANNELS but missing on the device
- duplicate PVs in CHANNELS (should be none in a real system)
"""
missing = []
dupes = []
seen = {}
for section, items in CHANNELS.items():
for it in items:
if not hasattr(self, it["attr"]):
missing.append((section, it["attr"], it["label"], it["pv"]))
pv = it["pv"]
if pv in seen:
dupes.append((pv, seen[pv], (section, it["attr"], it["label"])))
else:
seen[pv] = (section, it["attr"], it["label"])
if verbose:
print("=== Consistency Report ===")
if missing:
print("\nMissing attributes on device:")
for section, attr, label, pv in missing:
print(f" - [{section}] {attr} ({label}) pv={pv}")
else:
print("\nNo missing attributes.")
if dupes:
print("\nDuplicate PVs detected:")
for pv, first, second in dupes:
print(f" {pv}")
print(f" First: {first}")
print(f" Second: {second}")
else:
print("\nNo duplicate PVs.")
return {"missing_attrs": missing, "duplicate_pvs": dupes}
# Bind methods
class_attrs["show_all"] = DynamicMethods.show_all
class_attrs["consistency_report"] = DynamicMethods.consistency_report
# Create the class
return type("cSAXSEps", (Device,), class_attrs)
# Create the final class type for use in BEC
cSAXSEps = create_dynamic_eps_class()