diff --git a/csaxs_bec/devices/epics/eps.py b/csaxs_bec/devices/epics/eps.py index c7eb989..5777a97 100644 --- a/csaxs_bec/devices/epics/eps.py +++ b/csaxs_bec/devices/epics/eps.py @@ -1,8 +1,6 @@ from ophyd import Device, Component as Cpt, EpicsSignal - - # --------------------------- # Registry: sections/channels # --------------------------- @@ -37,168 +35,222 @@ CHANNELS = { "Shutters Endstation": [ {"attr": "ESPSH17010", "label": "OP-PSH1-EMLS-7010", "pv": "X12SA-OP-PSH1-EMLS-7010:OPEN"}, ], + + # ------------------------------- + # 🔵 DMM MONOCHROMATOR + # ------------------------------- + "DMM Monochromator": [ + # Temperature sensors (:TEMP) + {"attr": "DMM_ETTC_3010", "label": "DMM Temp Surface 1", "pv": "X12SA-OP-DMM-ETTC-3010:TEMP"}, + {"attr": "DMM_ETTC_3020", "label": "DMM Temp Surface 2", "pv": "X12SA-OP-DMM-ETTC-3020:TEMP"}, + {"attr": "DMM_ETTC_3030", "label": "DMM Temp Shield 1 (disaster)", "pv": "X12SA-OP-DMM-ETTC-3030:TEMP"}, + {"attr": "DMM_ETTC_3040", "label": "DMM Temp Shield 2 (disaster)", "pv": "X12SA-OP-DMM-ETTC-3040:TEMP"}, + + # Translation and Bragg switches (:THRU / :IN) + {"attr": "DMM_EMLS_3010", "label": "DMM Translation ThruPos", "pv": "X12SA-OP-DMM-EMLS-3010:THRU"}, + {"attr": "DMM_EMLS_3020", "label": "DMM Translation InPos", "pv": "X12SA-OP-DMM-EMLS-3020:IN"}, + {"attr": "DMM_EMLS_3030", "label": "DMM Bragg ThruPos", "pv": "X12SA-OP-DMM-EMLS-3030:THRU"}, + {"attr": "DMM_EMLS_3040", "label": "DMM Bragg InPos", "pv": "X12SA-OP-DMM-EMLS-3040:IN"}, + + # Heater faults (ALL use :SWITCH) + {"attr": "DMM_EMSW_3050_SWITCH", "label": "DMM Heater Fault XTAL 1 (switch)", "pv": "X12SA-OP-DMM-EMSW-3050:SWITCH"}, + {"attr": "DMM_EMSW_3060_SWITCH", "label": "DMM Heater Fault XTAL 2 (switch)", "pv": "X12SA-OP-DMM-EMSW-3060:SWITCH"}, + {"attr": "DMM_EMSW_3070_SWITCH", "label": "DMM Heater Fault Support 1 (switch)", "pv": "X12SA-OP-DMM-EMSW-3070:SWITCH"}, + + # Readbacks + {"attr": "DMM1_ENERGY_GET", "label": "DMM Energy", "pv": "X12SA-OP-DMM1:ENERGY-GET"}, + {"attr": "DMM1_POSITION", "label": "DMM Position", "pv": "X12SA-OP-DMM1:POSITION"}, + {"attr": "DMM1_STRIPE", "label": "DMM Stripe", "pv": "X12SA-OP-DMM1:STRIPE"}, + ], + + # ------------------------------- + # 🔵 CCM MONOCHROMATOR + # ------------------------------- + "CCM Monochromator": [ + # Temperature sensors (:TEMP) + {"attr": "CCM_ETTC_4010", "label": "CCM Temp Crystal", "pv": "X12SA-OP-CCM-ETTC-4010:TEMP"}, + {"attr": "CCM_ETTC_4020", "label": "CCM Temp Shield (disaster)", "pv": "X12SA-OP-CCM-ETTC-4020:TEMP"}, + + # Heater faults (ALL use :SWITCH) + {"attr": "CCM_EMSW_4010_SWITCH", "label": "CCM Heater Fault 1 (switch)", "pv": "X12SA-OP-CCM-EMSW-4010:SWITCH"}, + {"attr": "CCM_EMSW_4020_SWITCH", "label": "CCM Heater Fault 2 (switch)", "pv": "X12SA-OP-CCM-EMSW-4020:SWITCH"}, + {"attr": "CCM_EMSW_4030_SWITCH", "label": "CCM Heater Fault 3 (switch)", "pv": "X12SA-OP-CCM-EMSW-4030:SWITCH"}, + + # Readbacks + {"attr": "CCM1_ENERGY_GET", "label": "CCM Energy", "pv": "X12SA-OP-CCM1:ENERGY-GET"}, + {"attr": "CCM1_POSITION", "label": "CCM Position", "pv": "X12SA-OP-CCM1:POSITION"}, + ], } -class cSAXSEps(Device): - # ------------------------------------------------------------------------- - # Cpt definitions (AUTO-GENERATED FROM CHANNELS: one per registry entry) - # ------------------------------------------------------------------------- - USER_ACCESS = [ - "show_all", - ] - SUB_VALUE = "value" - _default_sub = SUB_VALUE - # Valves Frontend - FEVVPG0000 = Cpt(EpicsSignal, name="FE-VVPG-0000", read_pv="X12SA-FE-VVPG-0000:PLC_OPEN") - FEVVPG1010 = Cpt(EpicsSignal, name="FE-VVPG-1010", read_pv="X12SA-FE-VVPG-1010:PLC_OPEN") - FEVVFV2010 = Cpt(EpicsSignal, name="FE-VVFV-2010", read_pv="X12SA-FE-VVFV-2010:PLC_OPEN") - FEVVPG2010 = Cpt(EpicsSignal, name="FE-VVPG-2010", read_pv="X12SA-FE-VVPG-2010:PLC_OPEN") +# ============================================================================== +# DYNAMIC CLASS CREATION (Ophyd-safe) — updated show_all formatting +# ============================================================================== +def create_dynamic_eps_class(): + """ + Create an Ophyd Device subclass with all Components generated + from CHANNELS at class-creation time (safe for Ophyd/BEC). + """ + class_attrs = dict( + USER_ACCESS=["show_all"], + SUB_VALUE="value", + _default_sub="value", + ) - # Valves Optics Hutch - OPVVPG1010 = Cpt(EpicsSignal, name="OP-VVPG-1010", read_pv="X12SA-OP-VVPG-1010:PLC_OPEN") - OPVVPG2010 = Cpt(EpicsSignal, name="OP-VVPG-2010", read_pv="X12SA-OP-VVPG-2010:PLC_OPEN") - OPVVPG3010 = Cpt(EpicsSignal, name="OP-VVPG-3010", read_pv="X12SA-OP-VVPG-3010:PLC_OPEN") - OPVVPG3020 = Cpt(EpicsSignal, name="OP-VVPG-3020", read_pv="X12SA-OP-VVPG-3020:PLC_OPEN") - OPVVPG4010 = Cpt(EpicsSignal, name="OP-VVPG-4010", read_pv="X12SA-OP-VVPG-4010:PLC_OPEN") - OPVVPG5010 = Cpt(EpicsSignal, name="OP-VVPG-5010", read_pv="X12SA-OP-VVPG-5010:PLC_OPEN") - OPVVPG6010 = Cpt(EpicsSignal, name="OP-VVPG-6010", read_pv="X12SA-OP-VVPG-6010:PLC_OPEN") - OPVVPG7010 = Cpt(EpicsSignal, name="OP-VVPG-7010", read_pv="X12SA-OP-VVPG-7010:PLC_OPEN") - - # Valves ES Hutch - ESVVPG1010 = Cpt(EpicsSignal, name="ES-VVPG-1010", read_pv="X12SA-ES-VVPG-1010:PLC_OPEN") - - # Shutters Frontend - FEPSH1 = Cpt(EpicsSignal, name="FE-PSH1-EMLS-0010", read_pv="X12SA-FE-PSH1-EMLS-0010:OPEN") - FESTO1 = Cpt(EpicsSignal, name="FE-STO1-EMLS-0010", read_pv="X12SA-FE-STO1-EMLS-0010:OPEN") - - # Shutters Endstation - ESPSH17010 = Cpt(EpicsSignal, name="OP-PSH1-EMLS-7010", read_pv="X12SA-OP-PSH1-EMLS-7010:OPEN") - - # ------------------------ - # Status / consistency API - # ------------------------ - def show_all(self): - - """ - Print the status of all valves and shutters, grouped by section, driven by CHANNELS. - - Green = OPEN (True), Red = CLOSED (False) - - Missing attributes are shown as MISSING - """ - # ANSI colors - red = "\x1b[91m" - green = "\x1b[92m" - white = "\x1b[0m" - bold = "\x1b[1m" - - def safe_get_attr_value(attr_name): - obj = getattr(self, attr_name, None) - if obj is None: - return None - try: - return bool(obj.get()) - except Exception: - return None - - def render_line(label, is_open): - if is_open is None: - return f" - {label:<24} {red}MISSING{white}" - color = green if is_open else red - status = "OPEN" if is_open else "CLOSED" - return f" - {label:<24} {color}{status}{white}" - - def section_header(title): - print(f"\n{bold}{title}{white}") - - def show_section(title, items): - section_header(title) - - values = [(item["label"], safe_get_attr_value(item["attr"])) for item in items] - - # Summary - vals_present = [v for (_, v) in values if v is not None] - if not vals_present: - print(" (no channels found on this device)") - else: - total = len(vals_present) - open_cnt = sum(1 for v in vals_present if v) - all_open = (total > 0 and open_cnt == total) - if all_open: - print(f"{green}All channels in this section are open.{white}") - else: - print(f"{red}Warning: Not all channels in this section are open. ({open_cnt}/{total} open){white}") - - # Per-channel lines - for label, val in values: - print(render_line(label, val)) - - print("X12SA valve/shutter status") - for section, items in CHANNELS.items(): - show_section(section, items) - - def consistency_report(self, *, verbose=True): - """ - Checks for: - - attributes listed in CHANNELS but missing on the device - - duplicate PVs in CHANNELS (should be none in a real system) - """ - # Missing attributes - missing_attrs = [] - # Duplicate PVs - seen_pvs = {} - duplicates = [] - - for section, items in CHANNELS.items(): - for it in items: - # attr present? - if getattr(self, it["attr"], None) is None: - missing_attrs.append((section, it["attr"], it["label"], it["pv"])) - - # PV uniqueness - pv = it["pv"] - if pv in seen_pvs: - duplicates.append((pv, seen_pvs[pv], (section, it["attr"], it["label"]))) - else: - seen_pvs[pv] = (section, it["attr"], it["label"]) - - if verbose: - print("=== Consistency Report ===") - if missing_attrs: - print("\nMissing attributes on device (define matching Cpt):") - for section, attr, label, pv in missing_attrs: - print(f" - [{section}] {attr} ({label}) pv={pv}") - else: - print("\nNo missing attributes.") - - if duplicates: - print("\nDuplicate PVs in CHANNELS (fix registry; real system should have none):") - for pv, first, dup in duplicates: - fsec, fattr, flabel = first - dsec, dattr, dlabel = dup - print(f" - {pv}\n first: [{fsec}] {fattr} ({flabel})\n second: [{dsec}] {dattr} ({dlabel})") - else: - print("\nNo duplicate PVs.") - - return { - "missing_attrs": missing_attrs, - "duplicate_pvs": duplicates, - } - - -# ------------------------------- -# Developer helper (optional): re-generate Cpt definitions from CHANNELS -# ------------------------------- -def codegen_cpt_definitions(): - """Print Cpt(...) definitions for all channels based on CHANNELS.""" - lines = [] - lines.append("# === AUTO-GENERATED FROM CHANNELS ===") + # Define all Components before the class is created for section, items in CHANNELS.items(): - lines.append(f"\n# {section}") for it in items: - attr = it["attr"] - label = it["label"] - pv = it["pv"] - lines.append( - f'{attr} = Cpt(EpicsSignal, name="{label}", read_pv="{pv}")' + class_attrs[it["attr"]] = Cpt( + EpicsSignal, + name=it["label"], + read_pv=it["pv"] ) - print("\n".join(lines)) + + class DynamicMethods: + def show_all(self): + red = "\x1b[91m" + green = "\x1b[92m" + white = "\x1b[0m" + bold = "\x1b[1m" + + # Render helpers + def safe_get(attr): + obj = getattr(self, attr, None) + if obj is None: + return None + try: + return obj.get() + except Exception: + return None + + def is_bool_like(v): + # handles 0/1/True/False specifically + return isinstance(v, (bool, int)) and v in (0, 1, True, False) + + + def format_value(value, pv): + """ + - Temperature PVs (…:TEMP): one decimal (e.g., 23.7) + - Energy PVs (…:ENERGY-GET): four decimals (e.g., 12.3456) + - Bool-like: handled separately + - Others (strings / floats / ints): printed as-is + """ + if value is None: + return f"{red}MISSING{white}" + + # Temperature formatting → 1 decimal + if pv.endswith(":TEMP") and isinstance(value, (int, float)): + return f"{value:.1f}" + + # Energy formatting → 4 decimals + if pv.endswith(":ENERGY-GET") and isinstance(value, (int, float)): + return f"{value:.4f}" + + # Non-bool values (e.g., strings like POSITION/STRIPE) + if not is_bool_like(value): + return f"{value}" + + # Return raw value for bool handling elsewhere + return value + + + def render_line(label, value, pv, label_width): + # Booleans as OPEN/CLOSED with color + if is_bool_like(value): + is_open = bool(value) + color = green if is_open else red + status = "OPEN" if is_open else "CLOSED" + return f" – {label:<{label_width}} {color}{status}{white}" + + # Everything else formatted via format_value + fv = format_value(value, pv) + # If format_value returned raw bool-like, it means not temp but bool-like; still handle color + if fv in (0, 1, True, False): + is_open = bool(fv) + color = green if is_open else red + status = "OPEN" if is_open else "CLOSED" + return f" – {label:<{label_width}} {color}{status}{white}" + + return f" – {label:<{label_width}} {fv}" + + print("X12SA valve/shutter/mono status") + + for section, items in CHANNELS.items(): + print(f"\n{bold}{section}{white}") + + # Fetch values once + rows = [] + for it in items: + val = safe_get(it["attr"]) + rows.append((it["label"], val, it["pv"])) + + # Compute a nice label width per section (min 32) + label_width = max(32, *(len(label) for (label, _, _) in rows) or [32]) + + # Section summary when all present values are pure booleans + present_vals = [v for (_, v, _) in rows if v is not None] + bool_like_vals = [v for v in present_vals if is_bool_like(v)] + if present_vals and len(bool_like_vals) == len(present_vals): + total = len(bool_like_vals) + open_cnt = sum(1 for v in bool_like_vals if bool(v)) + if open_cnt == total: + print(f"{green}All channels in this section are open.{white}") + else: + print(f"{red}Warning: {open_cnt}/{total} open.{white}") + elif not present_vals: + print(" (no channels found on this device)") + + # Lines + for label, value, pv in rows: + print(render_line(label, value, pv, label_width)) + + def consistency_report(self, *, verbose=True): + """ + Checks for: + - attributes listed in CHANNELS but missing on the device + - duplicate PVs in CHANNELS (should be none in a real system) + """ + missing = [] + dupes = [] + seen = {} + + for section, items in CHANNELS.items(): + for it in items: + if not hasattr(self, it["attr"]): + missing.append((section, it["attr"], it["label"], it["pv"])) + + pv = it["pv"] + if pv in seen: + dupes.append((pv, seen[pv], (section, it["attr"], it["label"]))) + else: + seen[pv] = (section, it["attr"], it["label"]) + + if verbose: + print("=== Consistency Report ===") + if missing: + print("\nMissing attributes on device:") + for section, attr, label, pv in missing: + print(f" - [{section}] {attr} ({label}) pv={pv}") + else: + print("\nNo missing attributes.") + + if dupes: + print("\nDuplicate PVs detected:") + for pv, first, second in dupes: + print(f" {pv}") + print(f" First: {first}") + print(f" Second: {second}") + else: + print("\nNo duplicate PVs.") + + return {"missing_attrs": missing, "duplicate_pvs": dupes} + + # Bind methods + class_attrs["show_all"] = DynamicMethods.show_all + class_attrs["consistency_report"] = DynamicMethods.consistency_report + + # Create the class + return type("cSAXSEps", (Device,), class_attrs) + +# Create the final class type for use in BEC +cSAXSEps = create_dynamic_eps_class()