From bdf94533a5788424bc8d0decd43f0cd31b4eeed3 Mon Sep 17 00:00:00 2001 From: x12sa Date: Thu, 22 Jan 2026 16:25:37 +0100 Subject: [PATCH] first version of csaxs eps --- csaxs_bec/device_configs/user_setup.yaml | 14 ++ csaxs_bec/devices/epics/eps.py | 204 +++++++++++++++++++++++ 2 files changed, 218 insertions(+) create mode 100644 csaxs_bec/devices/epics/eps.py diff --git a/csaxs_bec/device_configs/user_setup.yaml b/csaxs_bec/device_configs/user_setup.yaml index d8ccfe0..232849c 100644 --- a/csaxs_bec/device_configs/user_setup.yaml +++ b/csaxs_bec/device_configs/user_setup.yaml @@ -1 +1,15 @@ ############################################################ + + + +############################################################ +##################### EPS ################################## +############################################################ +x12saEPS: + description: X12SA EPS info and control + deviceClass: csaxs_bec.devices.epics.eps.cSAXSEps + deviceConfig: {} + enabled: true + onFailure: buffer + readOnly: false + readoutPriority: baseline \ No newline at end of file diff --git a/csaxs_bec/devices/epics/eps.py b/csaxs_bec/devices/epics/eps.py new file mode 100644 index 0000000..c7eb989 --- /dev/null +++ b/csaxs_bec/devices/epics/eps.py @@ -0,0 +1,204 @@ + +from ophyd import Device, Component as Cpt, EpicsSignal + + + +# --------------------------- +# Registry: sections/channels +# --------------------------- +CHANNELS = { + "Valves Frontend": [ + {"attr": "FEVVPG0000", "label": "FE-VVPG-0000", "pv": "X12SA-FE-VVPG-0000:PLC_OPEN"}, + {"attr": "FEVVPG1010", "label": "FE-VVPG-1010", "pv": "X12SA-FE-VVPG-1010:PLC_OPEN"}, + {"attr": "FEVVFV2010", "label": "FE-VVFV-2010", "pv": "X12SA-FE-VVFV-2010:PLC_OPEN"}, + {"attr": "FEVVPG2010", "label": "FE-VVPG-2010", "pv": "X12SA-FE-VVPG-2010:PLC_OPEN"}, + ], + + "Valves Optics Hutch": [ + {"attr": "OPVVPG1010", "label": "OP-VVPG-1010", "pv": "X12SA-OP-VVPG-1010:PLC_OPEN"}, + {"attr": "OPVVPG2010", "label": "OP-VVPG-2010", "pv": "X12SA-OP-VVPG-2010:PLC_OPEN"}, + {"attr": "OPVVPG3010", "label": "OP-VVPG-3010", "pv": "X12SA-OP-VVPG-3010:PLC_OPEN"}, + {"attr": "OPVVPG3020", "label": "OP-VVPG-3020", "pv": "X12SA-OP-VVPG-3020:PLC_OPEN"}, + {"attr": "OPVVPG4010", "label": "OP-VVPG-4010", "pv": "X12SA-OP-VVPG-4010:PLC_OPEN"}, + {"attr": "OPVVPG5010", "label": "OP-VVPG-5010", "pv": "X12SA-OP-VVPG-5010:PLC_OPEN"}, + {"attr": "OPVVPG6010", "label": "OP-VVPG-6010", "pv": "X12SA-OP-VVPG-6010:PLC_OPEN"}, + {"attr": "OPVVPG7010", "label": "OP-VVPG-7010", "pv": "X12SA-OP-VVPG-7010:PLC_OPEN"}, + ], + + "Valves ES Hutch": [ + {"attr": "ESVVPG1010", "label": "ES-VVPG-1010", "pv": "X12SA-ES-VVPG-1010:PLC_OPEN"}, + ], + + "Shutters Frontend": [ + {"attr": "FEPSH1", "label": "FE-PSH1-EMLS-0010", "pv": "X12SA-FE-PSH1-EMLS-0010:OPEN"}, + {"attr": "FESTO1", "label": "FE-STO1-EMLS-0010", "pv": "X12SA-FE-STO1-EMLS-0010:OPEN"}, + ], + + "Shutters Endstation": [ + {"attr": "ESPSH17010", "label": "OP-PSH1-EMLS-7010", "pv": "X12SA-OP-PSH1-EMLS-7010:OPEN"}, + ], +} + + +class cSAXSEps(Device): + # ------------------------------------------------------------------------- + # Cpt definitions (AUTO-GENERATED FROM CHANNELS: one per registry entry) + # ------------------------------------------------------------------------- + USER_ACCESS = [ + "show_all", + ] + SUB_VALUE = "value" + _default_sub = SUB_VALUE + # Valves Frontend + FEVVPG0000 = Cpt(EpicsSignal, name="FE-VVPG-0000", read_pv="X12SA-FE-VVPG-0000:PLC_OPEN") + FEVVPG1010 = Cpt(EpicsSignal, name="FE-VVPG-1010", read_pv="X12SA-FE-VVPG-1010:PLC_OPEN") + FEVVFV2010 = Cpt(EpicsSignal, name="FE-VVFV-2010", read_pv="X12SA-FE-VVFV-2010:PLC_OPEN") + FEVVPG2010 = Cpt(EpicsSignal, name="FE-VVPG-2010", read_pv="X12SA-FE-VVPG-2010:PLC_OPEN") + + # Valves Optics Hutch + OPVVPG1010 = Cpt(EpicsSignal, name="OP-VVPG-1010", read_pv="X12SA-OP-VVPG-1010:PLC_OPEN") + OPVVPG2010 = Cpt(EpicsSignal, name="OP-VVPG-2010", read_pv="X12SA-OP-VVPG-2010:PLC_OPEN") + OPVVPG3010 = Cpt(EpicsSignal, name="OP-VVPG-3010", read_pv="X12SA-OP-VVPG-3010:PLC_OPEN") + OPVVPG3020 = Cpt(EpicsSignal, name="OP-VVPG-3020", read_pv="X12SA-OP-VVPG-3020:PLC_OPEN") + OPVVPG4010 = Cpt(EpicsSignal, name="OP-VVPG-4010", read_pv="X12SA-OP-VVPG-4010:PLC_OPEN") + OPVVPG5010 = Cpt(EpicsSignal, name="OP-VVPG-5010", read_pv="X12SA-OP-VVPG-5010:PLC_OPEN") + OPVVPG6010 = Cpt(EpicsSignal, name="OP-VVPG-6010", read_pv="X12SA-OP-VVPG-6010:PLC_OPEN") + OPVVPG7010 = Cpt(EpicsSignal, name="OP-VVPG-7010", read_pv="X12SA-OP-VVPG-7010:PLC_OPEN") + + # Valves ES Hutch + ESVVPG1010 = Cpt(EpicsSignal, name="ES-VVPG-1010", read_pv="X12SA-ES-VVPG-1010:PLC_OPEN") + + # Shutters Frontend + FEPSH1 = Cpt(EpicsSignal, name="FE-PSH1-EMLS-0010", read_pv="X12SA-FE-PSH1-EMLS-0010:OPEN") + FESTO1 = Cpt(EpicsSignal, name="FE-STO1-EMLS-0010", read_pv="X12SA-FE-STO1-EMLS-0010:OPEN") + + # Shutters Endstation + ESPSH17010 = Cpt(EpicsSignal, name="OP-PSH1-EMLS-7010", read_pv="X12SA-OP-PSH1-EMLS-7010:OPEN") + + # ------------------------ + # Status / consistency API + # ------------------------ + def show_all(self): + + """ + Print the status of all valves and shutters, grouped by section, driven by CHANNELS. + - Green = OPEN (True), Red = CLOSED (False) + - Missing attributes are shown as MISSING + """ + # ANSI colors + red = "\x1b[91m" + green = "\x1b[92m" + white = "\x1b[0m" + bold = "\x1b[1m" + + def safe_get_attr_value(attr_name): + obj = getattr(self, attr_name, None) + if obj is None: + return None + try: + return bool(obj.get()) + except Exception: + return None + + def render_line(label, is_open): + if is_open is None: + return f" - {label:<24} {red}MISSING{white}" + color = green if is_open else red + status = "OPEN" if is_open else "CLOSED" + return f" - {label:<24} {color}{status}{white}" + + def section_header(title): + print(f"\n{bold}{title}{white}") + + def show_section(title, items): + section_header(title) + + values = [(item["label"], safe_get_attr_value(item["attr"])) for item in items] + + # Summary + vals_present = [v for (_, v) in values if v is not None] + if not vals_present: + print(" (no channels found on this device)") + else: + total = len(vals_present) + open_cnt = sum(1 for v in vals_present if v) + all_open = (total > 0 and open_cnt == total) + if all_open: + print(f"{green}All channels in this section are open.{white}") + else: + print(f"{red}Warning: Not all channels in this section are open. ({open_cnt}/{total} open){white}") + + # Per-channel lines + for label, val in values: + print(render_line(label, val)) + + print("X12SA valve/shutter status") + for section, items in CHANNELS.items(): + show_section(section, items) + + def consistency_report(self, *, verbose=True): + """ + Checks for: + - attributes listed in CHANNELS but missing on the device + - duplicate PVs in CHANNELS (should be none in a real system) + """ + # Missing attributes + missing_attrs = [] + # Duplicate PVs + seen_pvs = {} + duplicates = [] + + for section, items in CHANNELS.items(): + for it in items: + # attr present? + if getattr(self, it["attr"], None) is None: + missing_attrs.append((section, it["attr"], it["label"], it["pv"])) + + # PV uniqueness + pv = it["pv"] + if pv in seen_pvs: + duplicates.append((pv, seen_pvs[pv], (section, it["attr"], it["label"]))) + else: + seen_pvs[pv] = (section, it["attr"], it["label"]) + + if verbose: + print("=== Consistency Report ===") + if missing_attrs: + print("\nMissing attributes on device (define matching Cpt):") + for section, attr, label, pv in missing_attrs: + print(f" - [{section}] {attr} ({label}) pv={pv}") + else: + print("\nNo missing attributes.") + + if duplicates: + print("\nDuplicate PVs in CHANNELS (fix registry; real system should have none):") + for pv, first, dup in duplicates: + fsec, fattr, flabel = first + dsec, dattr, dlabel = dup + print(f" - {pv}\n first: [{fsec}] {fattr} ({flabel})\n second: [{dsec}] {dattr} ({dlabel})") + else: + print("\nNo duplicate PVs.") + + return { + "missing_attrs": missing_attrs, + "duplicate_pvs": duplicates, + } + + +# ------------------------------- +# Developer helper (optional): re-generate Cpt definitions from CHANNELS +# ------------------------------- +def codegen_cpt_definitions(): + """Print Cpt(...) definitions for all channels based on CHANNELS.""" + lines = [] + lines.append("# === AUTO-GENERATED FROM CHANNELS ===") + for section, items in CHANNELS.items(): + lines.append(f"\n# {section}") + for it in items: + attr = it["attr"] + label = it["label"] + pv = it["pv"] + lines.append( + f'{attr} = Cpt(EpicsSignal, name="{label}", read_pv="{pv}")' + ) + print("\n".join(lines))