diff --git a/csaxs_bec/device_configs/user_setup.yaml b/csaxs_bec/device_configs/user_setup.yaml index d8ccfe0..1e6681d 100644 --- a/csaxs_bec/device_configs/user_setup.yaml +++ b/csaxs_bec/device_configs/user_setup.yaml @@ -1 +1,15 @@ ############################################################ + + + +############################################################ +##################### EPS ################################## +############################################################ +x12saEPS: + description: X12SA EPS info and control + deviceClass: csaxs_bec.devices.epics.eps.EPS + deviceConfig: {} + enabled: true + onFailure: buffer + readOnly: false + readoutPriority: baseline \ No newline at end of file diff --git a/csaxs_bec/devices/epics/eps.py b/csaxs_bec/devices/epics/eps.py new file mode 100644 index 0000000..e8eb8cc --- /dev/null +++ b/csaxs_bec/devices/epics/eps.py @@ -0,0 +1,435 @@ +"""EPS module for cSAXS beamline: defines the EPS device with its components and methods.""" + +# fmt: off +# Disable Black formatting for this file to preserve an easier readable structure for the component definitions. + +# pylint: disable=line-too-long +from __future__ import annotations + +import time + +from bec_lib.logger import bec_logger +from ophyd import Component as Cpt +from ophyd import Device, EpicsSignal, EpicsSignalRO, Kind +from ophyd_devices import PSIDeviceBase + +logger = bec_logger.logger + +# --------------------------- +# Registry: sections/channels +# --------------------------- + + +class EPSSubDevices(Device): + """Base class for EPS sub-device components (e.g. alarms, valves, shutters). with common methods if needed.""" + + def describe(self) -> dict: + desc = super().describe() + for walk in self.walk_signals(): + if walk.item.attr_name not in desc: + desc[walk.item.attr_name] = walk.item.describe() + return desc + + +class EPSAlarms(EPSSubDevices): + """EPS alarms at the cSAXS beamline.""" + + eps_alarm_cnt = Cpt(EpicsSignalRO, read_pv="X12SA-EPS-PLC:AlarmCnt_EPS", add_prefix=("",), name="eps_alarm_cnt", kind=Kind.omitted, doc="X12SA EPS Alarm count", auto_monitor=True, labels={"alarm"}) + mis_alarm_cnt = Cpt(EpicsSignalRO, read_pv="ARS00-MIS-PLC-01:AlarmCnt_Frontends", add_prefix=("",), name="mis_alarm_cnt", kind=Kind.omitted, doc="FrontEnd MIS Alarm count", auto_monitor=True, labels={"alarm"}) + + +class ValvesFrontend(EPSSubDevices): + """Valves frontend at the cSAXS beamline.""" + + + fe_vvpg_0000 = Cpt(EpicsSignalRO, read_pv="X12SA-FE-VVPG-0000:PLC_OPEN", add_prefix=("",), name="fevvpg0000", kind=Kind.omitted, doc="FE-VVPG-0000", auto_monitor=True, labels={"valve"}) + fe_vvpg_1010 = Cpt(EpicsSignalRO, read_pv="X12SA-FE-VVPG-1010:PLC_OPEN", add_prefix=("",), name="fevvpg1010", kind=Kind.omitted, doc="FE-VVPG-1010", auto_monitor=True, labels={"valve"}) + fe_vvfv_2010 = Cpt(EpicsSignalRO, read_pv="X12SA-FE-VVFV-2010:PLC_OPEN", add_prefix=("",), name="fevvfv2010", kind=Kind.omitted, doc="FE-VVFV-2010", auto_monitor=True, labels={"valve"}) + fe_vvpg_2010 = Cpt(EpicsSignalRO, read_pv="X12SA-FE-VVPG-2010:PLC_OPEN", add_prefix=("",), name="fevvpg2010", kind=Kind.omitted, doc="FE-VVPG-2010", auto_monitor=True, labels={"valve"}) + +class ValvesOptics(EPSSubDevices): + """Valves at the optics hutch.""" + + op_vvpg_1010 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-VVPG-1010:PLC_OPEN", add_prefix=("",), name="opvvpg1010", kind=Kind.omitted, doc="OP-VVPG-1010", auto_monitor=True, labels={"valve"}) + op_vvpg_2010 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-VVPG-2010:PLC_OPEN", add_prefix=("",), name="opvvpg2010", kind=Kind.omitted, doc="OP-VVPG-2010", auto_monitor=True, labels={"valve"}) + op_vvpg_3010 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-VVPG-3010:PLC_OPEN", add_prefix=("",), name="opvvpg3010", kind=Kind.omitted, doc="OP-VVPG-3010", auto_monitor=True, labels={"valve"}) + op_vvpg_3020 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-VVPG-3020:PLC_OPEN", add_prefix=("",), name="opvvpg3020", kind=Kind.omitted, doc="OP-VVPG-3020", auto_monitor=True, labels={"valve"}) + op_vvpg_4010 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-VVPG-4010:PLC_OPEN", add_prefix=("",), name="opvvpg4010", kind=Kind.omitted, doc="OP-VVPG-4010", auto_monitor=True, labels={"valve"}) + op_vvpg_5010 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-VVPG-5010:PLC_OPEN", add_prefix=("",), name="opvvpg5010", kind=Kind.omitted, doc="OP-VVPG-5010", auto_monitor=True, labels={"valve"}) + op_vvpg_6010 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-VVPG-6010:PLC_OPEN", add_prefix=("",), name="opvvpg6010", kind=Kind.omitted, doc="OP-VVPG-6010", auto_monitor=True, labels={"valve"}) + op_vvpg_7010 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-VVPG-7010:PLC_OPEN", add_prefix=("",), name="opvvpg7010", kind=Kind.omitted, doc="OP-VVPG-7010", auto_monitor=True, labels={"valve"}) + + +class ValvesEndstation(EPSSubDevices): + """Endstation valves at the cSAXS beamline.""" + + es_vvpg_1010 = Cpt(EpicsSignalRO, read_pv="X12SA-ES-VVPG-1010:PLC_OPEN", add_prefix=("",), name="esvvpg1010", kind=Kind.omitted, doc="ES-VVPG-1010", auto_monitor=True, labels={"valve"}) + + +class ShuttersFrontend(EPSSubDevices): + """Shutters frontend.""" + + fe_psh1 = Cpt(EpicsSignalRO, read_pv="X12SA-FE-PSH1-EMLS-0010:OPEN", add_prefix=("",), name="fepsh1", kind=Kind.omitted, doc="FE-PSH1-EMLS-0010", auto_monitor=True, labels={"shutter"}) + fe_sto1 = Cpt(EpicsSignalRO, read_pv="X12SA-FE-STO1-EMLS-0010:OPEN", add_prefix=("",), name="festo1", kind=Kind.omitted, doc="FE-STO1-EMLS-0010", auto_monitor=True, labels={"shutter"}) + + +class ShuttersEndstation(EPSSubDevices): + """Shutters at the endstation.""" + + es_psh17010 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-PSH1-EMLS-7010:OPEN", add_prefix=("",), name="espsh17010", kind=Kind.omitted, doc="OP-PSH1-EMLS-7010", auto_monitor=True, labels={"shutter"}) + + +class DMMMonochromator(EPSSubDevices): + """DMM monochromator signals at the cSAXS beamline.""" + + dmm_temp_surface_1 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-ETTC-3010:TEMP", add_prefix=("",), name="dmm_temp_surface_1", kind=Kind.omitted, doc="DMM Temp Surface 1", auto_monitor=True, labels={"temp"}) + dmm_temp_surface_2 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-ETTC-3020:TEMP", add_prefix=("",), name="dmm_temp_surface_2", kind=Kind.omitted, doc="DMM Temp Surface 2", auto_monitor=True, labels={"temp"}) + dmm_temp_shield_1_disaster = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-ETTC-3030:TEMP", add_prefix=("",), name="dmm_temp_shield_1_disaster", kind=Kind.omitted, doc="DMM Temp Shield 1 (disaster)", auto_monitor=True, labels={"temp"}) + dmm_temp_shield_2_disaster = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-ETTC-3040:TEMP", add_prefix=("",), name="dmm_temp_shield_2_disaster", kind=Kind.omitted, doc="DMM Temp Shield 2 (disaster)", auto_monitor=True, labels={"temp"}) + + dmm_translation_thru = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-EMLS-3010:THRU", add_prefix=("",), name="dmm_translation_thru", kind=Kind.omitted, doc="DMM Translation ThruPos", auto_monitor=True, labels={"switch"}) + dmm_translation_in = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-EMLS-3020:IN", add_prefix=("",), name="dmm_translation_in", kind=Kind.omitted, doc="DMM Translation InPos", auto_monitor=True, labels={"switch"}) + dmm_bragg_thru = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-EMLS-3030:THRU", add_prefix=("",), name="dmm_bragg_thru", kind=Kind.omitted, doc="DMM Bragg ThruPos", auto_monitor=True, labels={"switch"}) + dmm_bragg_in = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-EMLS-3040:IN", add_prefix=("",), name="dmm_bragg_in", kind=Kind.omitted, doc="DMM Bragg InPos", auto_monitor=True, labels={"switch"}) + + dmm_heater_fault_xtal_1 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-EMSW-3050:SWITCH", add_prefix=("",), name="dmm_heater_fault_xtal_1", kind=Kind.omitted, doc="DMM Heater Fault XTAL 1", auto_monitor=True, labels={"fault"}) + dmm_heater_fault_xtal_2 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-EMSW-3060:SWITCH", add_prefix=("",), name="dmm_heater_fault_xtal_2", kind=Kind.omitted, doc="DMM Heater Fault XTAL 2", auto_monitor=True, labels={"fault"}) + dmm_heater_fault_support_1 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-EMSW-3070:SWITCH", add_prefix=("",), name="dmm_heater_fault_support_1", kind=Kind.omitted, doc="DMM Heater Fault Support 1", auto_monitor=True, labels={"fault"}) + + dmm_energy = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM1:ENERGY-GET", add_prefix=("",), name="dmm_energy", kind=Kind.omitted, doc="DMM Energy", auto_monitor=True, labels={"energy"}) + dmm_position = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM1:POSITION", add_prefix=("",), name="dmm_position", kind=Kind.omitted, doc="DMM Position", auto_monitor=True, labels={"string"}) + dmm_stripe = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM1:STRIPE", add_prefix=("",), name="dmm_stripe", kind=Kind.omitted, doc="DMM Stripe", auto_monitor=True, labels={"string"}) + + +class CCMMonochromator(EPSSubDevices): + """CCM monochromator signals at the cSAXS beamline.""" + + ccm_temp_crystal = Cpt(EpicsSignalRO, read_pv="X12SA-OP-CCM-ETTC-4010:TEMP", add_prefix=("",), name="ccm_temp_crystal", kind=Kind.omitted, doc="CCM Temp Crystal", auto_monitor=True, labels={"temp"}) + ccm_temp_shield_disaster = Cpt(EpicsSignalRO, read_pv="X12SA-OP-CCM-ETTC-4020:TEMP", add_prefix=("",), name="ccm_temp_shield_disaster", kind=Kind.omitted, doc="CCM Temp Shield (disaster)", auto_monitor=True, labels={"temp"}) + + ccm_heater_fault_1 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-CCM-EMSW-4010:SWITCH", add_prefix=("",), name="ccm_heater_fault_1", kind=Kind.omitted, doc="CCM Heater Fault 1", auto_monitor=True, labels={"fault"}) + ccm_heater_fault_2 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-CCM-EMSW-4020:SWITCH", add_prefix=("",), name="ccm_heater_fault_2", kind=Kind.omitted, doc="CCM Heater Fault 2", auto_monitor=True, labels={"fault"}) + ccm_heater_fault_3 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-CCM-EMSW-4030:SWITCH", add_prefix=("",), name="ccm_heater_fault_3", kind=Kind.omitted, doc="CCM Heater Fault 3", auto_monitor=True, labels={"fault"}) + + ccm_energy = Cpt(EpicsSignalRO, read_pv="X12SA-OP-CCM1:ENERGY-GET", add_prefix=("",), name="ccm_energy", kind=Kind.omitted, doc="CCM Energy", auto_monitor=True, labels={"energy"}) + ccm_position = Cpt(EpicsSignalRO, read_pv="X12SA-OP-CCM1:POSITION", add_prefix=("",), name="ccm_position", kind=Kind.omitted, doc="CCM Position", auto_monitor=True, labels={"string"}) + + +class CoolingWater(EPSSubDevices): + """Cooling water signals at the cSAXS beamline.""" + + op_sl1_efsw_2010_flow = Cpt(EpicsSignalRO, read_pv="X12SA-OP-SL1-EFSW-2010:FLOW", add_prefix=("",), name="op_sl1_efsw_2010_flow", kind=Kind.omitted, doc="OP-SL1-EFSW-2010", auto_monitor=True, labels={"flow"}) + op_sl2_efsw_2010_flow = Cpt(EpicsSignalRO, read_pv="X12SA-OP-SL2-EFSW-2010:FLOW", add_prefix=("",), name="op_sl2_efsw_2010_flow", kind=Kind.omitted, doc="OP-SL2-EFSW-2010", auto_monitor=True, labels={"flow"}) + op_eb1_efsw_5010_flow = Cpt(EpicsSignalRO, read_pv="X12SA-OP-EB1-EFSW-5010:FLOW", add_prefix=("",), name="op_eb1_efsw_5010_flow", kind=Kind.omitted, doc="OP-EB1-EFSW-5010", auto_monitor=True, labels={"flow"}) + op_eb1_efsw_5020_flow = Cpt(EpicsSignalRO, read_pv="X12SA-OP-EB1-EFSW-5020:FLOW", add_prefix=("",), name="op_eb1_efsw_5020_flow", kind=Kind.omitted, doc="OP-EB1-EFSW-5020", auto_monitor=True, labels={"flow"}) + op_sl3_efsw_5010_flow = Cpt(EpicsSignalRO, read_pv="X12SA-OP-SL3-EFSW-5010:FLOW", add_prefix=("",), name="op_sl3_efsw_5010_flow", kind=Kind.omitted, doc="OP-SL3-EFSW-5010", auto_monitor=True, labels={"flow"}) + op_kb_efsw_6010_flow = Cpt(EpicsSignalRO, read_pv="X12SA-OP-KB-EFSW-6010:FLOW", add_prefix=("",), name="op_kb_efsw_6010_flow", kind=Kind.omitted, doc="OP-KB-EFSW-6010", auto_monitor=True, labels={"flow"}) + op_psh1_efsw_7010_flow = Cpt(EpicsSignalRO, read_pv="X12SA-OP-PSH1-EFSW-7010:FLOW", add_prefix=("",), name="op_psh1_efsw_7010_flow", kind=Kind.omitted, doc="OP-PSH1-EFSW-7010", auto_monitor=True, labels={"flow"}) + es_eb2_efsw_1010_flow = Cpt(EpicsSignalRO, read_pv="X12SA-ES-EB2-EFSW-1010:FLOW", add_prefix=("",), name="es_eb2_efsw_1010_flow", kind=Kind.omitted, doc="ES-EB2-EFSW-1010", auto_monitor=True, labels={"flow"}) + + op_cs_ecvw_0010 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-CS-ECVW-0010:PLC_OPEN", add_prefix=("",), name="op_cs_ecvw_0010", kind=Kind.omitted, doc="OP-CS-ECVW-0010", auto_monitor=True, labels={"valve"}) + op_cs_ecvw_0020 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-CS-ECVW-0020:PLC_OPEN", add_prefix=("",), name="op_cs_ecvw_0020", kind=Kind.omitted, doc="OP-CS-ECVW-0020", auto_monitor=True, labels={"valve"}) + + +class EPS(PSIDeviceBase): + """EPS device for the cSAXS beamline.""" + USER_ACCESS = [ + "show_all", + "water_cooling_op", + ] + alarms = Cpt(EPSAlarms, name="alarms", doc="EPS Alarms") + valves_frontend = Cpt(ValvesFrontend, name="valves_frontend", doc="Valves Frontend") + valves_optics = Cpt(ValvesOptics, name="valves_optics", doc="Valves Optics Hutch") + valves_es = Cpt(ValvesEndstation, name="valves_es", doc="Valves ES Hutch") + shutters_frontend = Cpt(ShuttersFrontend, name="shutters_frontend", doc="Shutters Frontend") + shutters_es = Cpt(ShuttersEndstation, name="shutters_es", doc="Shutters Endstation") + dmm_monochromator = Cpt(DMMMonochromator, name="dmm_monochromator", doc="DMM Monochromator") + ccm_monochromator = Cpt(CCMMonochromator, name="ccm_monochromator", doc="CCM Monochromator") + cooling_water = Cpt(CoolingWater, name="cooling_water", doc="Cooling Water") + + # Acknowledgment signals for PLC communication (if needed for future use) + ackerr = Cpt(EpicsSignal, read_pv="X12SA-EPS-PLC:ACKERR-REQUEST", add_prefix=("",), name="ackerr", kind=Kind.omitted, doc="ACKERR request - OP-CS-ECVW-0020", auto_monitor=True, labels={"request"}) + request = Cpt(EpicsSignal, read_pv="X12SA-OP-CS-ECVW:PLC_REQUEST", add_prefix=("",), name="op_cs_ecvw_request", kind=Kind.omitted, doc="PLC request - OP-CS-ECVW-PLC_REQUEST", auto_monitor=True, labels={"request"}) + + def _notify(self, msg: str, show_as_client_msg: bool = True): + """Utility method to print a message, and optionally send it to the client UI if it should be shown also as a client message.""" + try: + if show_as_client_msg: + self.device_manager.connector.send_client_info(msg, scope="", show_asap=True) + else: + print(msg) + except Exception: + logger.error(f"Failed to send client message, falling back to print: {msg}") + print(str(msg)) + + # ---------------------------------------------------------- + # Water cooling operation + # ---------------------------------------------------------- + + def safe_get(self, sig, default=None): + """Helper method to safely get a signal value, returning a default if there's an error.""" + try: + return sig.get() + except Exception as ex: + logger.warning(f"Failed to get signal {sig.pvname}: {ex}") + return default + + def water_cooling_op(self): + """ + Open ECVW valves, reset EPS alarms, monitor for 20s, + then ensure stability (valves remain open) for 10s. + All messages sent to client. + """ + + POLL_PERIOD = 2 + TIMEOUT = 20 + STABILITY = 15 + + self._notify("=== Water Cooling Operation ===") + + # --- Signals --- + eps_alarm_sig = self.alarms.eps_alarm_cnt + ackerr = self.ackerr + request = self.request + + valves = [self.cooling_water.op_cs_ecvw_0010, self.cooling_water.op_cs_ecvw_0020] + + # Flow channels list extracted from CHANNELS + flow_items = [walk.item for walk in self.cooling_water.walk_signals() if "flow" in walk.item._ophyd_labels_] + + # --- Step 1: EPS alarm reset --- + alarm_value = self.safe_get(eps_alarm_sig, 0) + if alarm_value and alarm_value > 0: + self._notify(f"[WaterCooling] EPS alarms present ({alarm_value}) → resetting…") + try: + ackerr.put(1) + except Exception as ex: + self._notify(f"[WaterCooling] WARNING: ACKERR write failed: {ex}") + time.sleep(0.3) + else: + self._notify("[WaterCooling] No EPS alarms detected.") + + # --- Step 2: Issue open request --- + self._notify("[WaterCooling] Sending cooling-valve OPEN request…") + try: + request.put(1) + except Exception as ex: + self._notify(f"[WaterCooling] ERROR: Failed to send OPEN request: {ex}") + return False + + # --- Step 3: Monitoring loop (clean client table output) --- + start = time.time() + end = start + TIMEOUT + stable_until = None + + # Print (server-side) header once + print("Monitoring valves and flow sensors...") + print(f" Valves: {valves[0].attr_name[-4:]}, {valves[1].attr_name[-4:]}") + print(f" Note: stability requires valves to remain OPEN for {STABILITY} seconds.") + + # One table header to the client (via device manager) + # Fixed-width columns for alignment in monospaced UI + table_header = f"{'Time':>6} | {'Valves':<21} | {'Flows (OK/FAIL/N/A)':<20}" + self._notify(table_header) + + def snapshot(): + # Valve snapshot + v_states = [self.safe_get(v, None) for v in valves] + v1 = f"{valves[0].attr_name[-4:]}=" + ("OPEN " if v_states[0] is True or v_states[0] == 1 else "CLOSED" if v_states[0] is False or v_states[0] == 0 else "N/A ") + v2 = f"{valves[1].attr_name[-4:]}=" + ("OPEN " if v_states[1] is True or v_states[1] == 1 else "CLOSED" if v_states[1] is False or v_states[1] == 0 else "N/A ") + # 2 valves with a single space between => width ~ 21 + valve_str = f"{v1} {v2}" + + # Flow summary: OK/FAIL/N/A counts (compact) + flow_states = [] + for fsig in flow_items: + fval = self.safe_get(fsig, None) + flow_states.append(True if fval in (1, True) else False if fval in (0, False) else None) + + ok = sum(1 for f in flow_states if f is True) + fail = sum(1 for f in flow_states if f is False) + na = sum(1 for f in flow_states if f is None) + flow_summary = f"{ok:>2} / {fail:>2} / {na:>2}" + + return v_states, valve_str, flow_summary + + while True: + # TODO Consider adding a timeout to avoid infinite loop. + now = time.time() + elapsed = int(now - start) + + if now > end: + # One last line to client + v_states, valves_s, flows_s = snapshot() + self._notify(f"{elapsed:>6}s | {valves_s:<21} | {flows_s:<20}") + print("→ TIMEOUT: Cooling valves failed to remain OPEN.") + return False + + # Live snapshot + v_states, valves_s, flows_s = snapshot() + # Exactly one concise line to client per cycle + self._notify(f"{elapsed:>6}s | {valves_s:<21} | {flows_s:<20}") + + both_open = all(s is not None and bool(s) for s in v_states) + + if both_open: + if stable_until is None: + stable_until = now + STABILITY + print(f"[WaterCooling] Both valves OPEN → starting {STABILITY}s stability window…") + else: + if now >= stable_until: + print("→ SUCCESS: Valves remained OPEN during stability window.") + return True + else: + if stable_until is not None: + print("[WaterCooling] Valve closed again → restarting stability window.") + stable_until = None + + time.sleep(POLL_PERIOD) + + def show_all(self): + red = "\x1b[91m" + green = "\x1b[92m" + white = "\x1b[0m" + bold = "\x1b[1m" + cyan = "\x1b[96m" + + # ---- New: enum maps for numeric -> string rendering ---- + POSITION_ENUM = {0: "out of beam", 1: "in beam"} + STRIPE_ENUM = {0: "Stripe 1 W/B4C", 1: "Stripe 2 NiV/B4C"} + POSITION_ATTRS = {self.dmm_monochromator.dmm_position.attr_name, self.ccm_monochromator.ccm_position.attr_name} + STRIPE_ATTRS = {self.dmm_monochromator.dmm_stripe.attr_name} + + def is_bool_like(v): + return isinstance(v, (bool, int)) and v in (0, 1, True, False) + + # ---- Changed: accept attr in formatter so we can apply enum mapping ---- + def fmt_value(value: any, signal: EpicsSignalRO): + if value is None: + return f"{red}MISSING{white}" + + attr = signal.attr_name + + # ---------- Explicit enum mappings by attribute ---------- + if attr in POSITION_ATTRS: + # Position comes as numeric 0/1 + try: + iv = int(value) + return POSITION_ENUM.get(iv, f"{iv}") + except Exception: + # Fallback if it’s already a string or unexpected + return f"{value}" + + if attr in STRIPE_ATTRS: + # Stripe comes as numeric 0/1 + try: + iv = int(value) + return STRIPE_ENUM.get(iv, f"{iv}") + except Exception: + return f"{value}" + + # ------------------- TEMPERATURE ------------------- + if "temp" in signal._ophyd_labels_ and isinstance(value, (int, float)): + return f"{value:.1f}" + + # ------------------- ENERGY ------------------------ + if "energy" in signal._ophyd_labels_ and isinstance(value, (int, float)): + return f"{value:.4f}" + + # ------------------- STRINGS ----------------------- + if "string" in signal._ophyd_labels_ or "position" in signal._ophyd_labels_: + # For other strings, just echo the value + return f"{value}" + + # ------------------- SWITCH (ACTIVE/INACTIVE) ------ + if "switch" in signal._ophyd_labels_ and is_bool_like(value): + return f"{green+'ACTIVE'+white if value else red+'INACTIVE'+white}" + + # ------------------- FAULT (OK/FAULT) -------------- + if "fault" in signal._ophyd_labels_ and is_bool_like(value): + return f"{green+'OK'+white if not value else red+'FAULT'+white}" + + # ------------------- VALVE/SHUTTER ----------------- + if ("valve" in signal._ophyd_labels_ or "shutter" in signal._ophyd_labels_) and is_bool_like(value): + return f"{green+'OPEN'+white if value else red+'CLOSED'+white}" + + # ------------------- FLOW (OK/FAIL) ---------------- + if "flow" in signal._ophyd_labels_ and is_bool_like(value): + return f"{green}OK{white}" if bool(value) else f"{red}FAIL{white}" + + # ------------------- FALLBACK ----------------------- + return f"{value}" + + # ------------------- PRINT START --------------------- + print(f"{bold}X12SA EPS status{white}") + + for name, component in self._sig_attrs.items(): + sub_device = getattr(self, name) + rows = [] + # Only print sub-devices, not individual request signals + if not isinstance(sub_device, Device): + continue + print(f"\n{bold}{component.doc}{white}") + for sub_walk in sub_device.walk_components(): + cpt: Cpt = sub_walk.item + it: EpicsSignalRO = getattr(sub_device, cpt.attr) + val = self.safe_get(it) + rows.append((cpt.doc, val, it)) + + label_width = max(32, *(len(label) for (label, _, _) in rows)) + + for label, value, it in rows: + fv = fmt_value(value, it) # <-- pass attr to formatter + print(f" - {label:<{label_width}} {fv}") + + if sub_device.attr_name == "cooling_water": + v1 = self.safe_get(self.cooling_water.op_cs_ecvw_0010) + v2 = self.safe_get(self.cooling_water.op_cs_ecvw_0020) + + def closed(v): + return is_bool_like(v) and not bool(v) + + if closed(v1) and closed(v2): + print(f"\n{cyan}Hint:{white} Both water cooling valves are CLOSED.\n" f"You can open them using: {bold}dev.x12saEPS.water_cooling_op(){white}") + +# fmt: on +# ---------------------------------------------------------- +# Consistency report +# ---------------------------------------------------------- +# def consistency_report(self, *, verbose=True): +# missing = [] +# dupes = [] +# seen = {} + +# for sub_device in self.walk_components(): +# section = sub_device.name +# for walk in sub_device.walk_components(): +# cpt: Cpt = walk.ancestors[-1] +# it: EpicsSignalRO = walk.item +# if not hasattr(self, it["attr"]): +# missing.append((section, it["attr"], it["label"], it["pv"])) + +# pv = it["pv"] +# if pv in seen: +# dupes.append((pv, seen[pv], (section, it["attr"], it["label"]))) +# else: +# seen[pv] = (section, it["attr"], it["label"]) + +# if verbose: +# print("=== Consistency Report ===") + +# if missing: +# print("\nMissing attributes:") +# for sec, a, lbl, pv in missing: +# print(f" - [{sec}] {a} {lbl} pv={pv}") +# else: +# print("\nNo missing attributes.") + +# if dupes: +# print("\nDuplicate PVs:") +# for pv, f1, f2 in dupes: +# print(f" {pv} → {f1} AND {f2}") +# else: +# print("\nNo duplicate PVs.") + +# return {"missing_attrs": missing, "duplicate_pvs": dupes} diff --git a/tests/tests_devices/test_eps.py b/tests/tests_devices/test_eps.py new file mode 100644 index 0000000..61c914f --- /dev/null +++ b/tests/tests_devices/test_eps.py @@ -0,0 +1,99 @@ +# pylint: skip-file +from __future__ import annotations + +import pytest +from ophyd_devices.tests.utils import patched_device + +from csaxs_bec.devices.epics.eps import EPS + +ALL_PVS = [ + # ALARMS + "X12SA-EPS-PLC:AlarmCnt_EPS", + "ARS00-MIS-PLC-01:AlarmCnt_Frontends", + # FRONTEND VALVES + "X12SA-FE-VVPG-0000:PLC_OPEN", + "X12SA-FE-VVPG-1010:PLC_OPEN", + "X12SA-FE-VVFV-2010:PLC_OPEN", + "X12SA-FE-VVPG-2010:PLC_OPEN", + # Optics VALVES + "X12SA-OP-VVPG-1010:PLC_OPEN", + "X12SA-OP-VVPG-2010:PLC_OPEN", + "X12SA-OP-VVPG-3010:PLC_OPEN", + "X12SA-OP-VVPG-3020:PLC_OPEN", + "X12SA-OP-VVPG-4010:PLC_OPEN", + "X12SA-OP-VVPG-5010:PLC_OPEN", + "X12SA-OP-VVPG-6010:PLC_OPEN", + "X12SA-OP-VVPG-7010:PLC_OPEN", + # Endstation VALVES + "X12SA-ES-VVPG-1010:PLC_OPEN", + # Frontend SHUTTERS + "X12SA-FE-PSH1-EMLS-0010:OPEN", + "X12SA-FE-STO1-EMLS-0010:OPEN", + # Optics SHUTTERS + "X12SA-OP-PSH1-EMLS-7010:OPEN", + # DMM Monochromator + "X12SA-OP-DMM-ETTC-3010:TEMP", + "X12SA-OP-DMM-ETTC-3020:TEMP", + "X12SA-OP-DMM-ETTC-3030:TEMP", + "X12SA-OP-DMM-ETTC-3040:TEMP", + "X12SA-OP-DMM-EMLS-3010:THRU", + "X12SA-OP-DMM-EMLS-3020:IN", + "X12SA-OP-DMM-EMLS-3030:THRU", + "X12SA-OP-DMM-EMLS-3040:IN", + "X12SA-OP-DMM-EMSW-3050:SWITCH", + "X12SA-OP-DMM-EMSW-3060:SWITCH", + "X12SA-OP-DMM-EMSW-3070:SWITCH", + "X12SA-OP-DMM1:ENERGY-GET", + "X12SA-OP-DMM1:POSITION", + "X12SA-OP-DMM1:STRIPE", + # CCM Monochromator + "X12SA-OP-CCM-ETTC-4010:TEMP", + "X12SA-OP-CCM-ETTC-4020:TEMP", + "X12SA-OP-CCM-EMSW-4010:SWITCH", + "X12SA-OP-CCM-EMSW-4020:SWITCH", + "X12SA-OP-CCM-EMSW-4030:SWITCH", + "X12SA-OP-CCM1:ENERGY-GET", + "X12SA-OP-CCM1:POSITION", + # Water Cooling + "X12SA-OP-SL1-EFSW-2010:FLOW", + "X12SA-OP-SL2-EFSW-2010:FLOW", + "X12SA-OP-EB1-EFSW-5010:FLOW", + "X12SA-OP-EB1-EFSW-5020:FLOW", + "X12SA-OP-SL3-EFSW-5010:FLOW", + "X12SA-OP-KB-EFSW-6010:FLOW", + "X12SA-OP-PSH1-EFSW-7010:FLOW", + "X12SA-ES-EB2-EFSW-1010:FLOW", + "X12SA-OP-CS-ECVW-0010:PLC_OPEN", + "X12SA-OP-CS-ECVW-0020:PLC_OPEN", + # Request PVs + "X12SA-EPS-PLC:ACKERR-REQUEST", + "X12SA-OP-CS-ECVW:PLC_REQUEST", +] + + +@pytest.fixture +def eps(): + dev_name = "EPS" + with patched_device(EPS, name=dev_name) as eps: + yield eps + + +def test_eps_has_signals(eps): + """Test that all expected PVs are present in the eps device.""" + found_pvs = [walk.item._read_pv.pvname for walk in eps.walk_signals()] + assert set(found_pvs) == set( + ALL_PVS + ), f"Expected PVs {ALL_PVS} but found {set(ALL_PVS) - set(found_pvs)}" + + +# pylint: disable=line-too-long +expected_show_all_output = "\x1b[1mX12SA EPS status\x1b[0m\n\n\x1b[1mEPS Alarms\x1b[0m\n - X12SA EPS Alarm count 0\n - FrontEnd MIS Alarm count 0\n\n\x1b[1mValves Frontend\x1b[0m\n - FE-VVPG-0000 \x1b[91mCLOSED\x1b[0m\n - FE-VVPG-1010 \x1b[91mCLOSED\x1b[0m\n - FE-VVFV-2010 \x1b[91mCLOSED\x1b[0m\n - FE-VVPG-2010 \x1b[91mCLOSED\x1b[0m\n\n\x1b[1mValves Optics Hutch\x1b[0m\n - OP-VVPG-1010 \x1b[91mCLOSED\x1b[0m\n - OP-VVPG-2010 \x1b[91mCLOSED\x1b[0m\n - OP-VVPG-3010 \x1b[91mCLOSED\x1b[0m\n - OP-VVPG-3020 \x1b[91mCLOSED\x1b[0m\n - OP-VVPG-4010 \x1b[91mCLOSED\x1b[0m\n - OP-VVPG-5010 \x1b[91mCLOSED\x1b[0m\n - OP-VVPG-6010 \x1b[91mCLOSED\x1b[0m\n - OP-VVPG-7010 \x1b[91mCLOSED\x1b[0m\n\n\x1b[1mValves ES Hutch\x1b[0m\n - ES-VVPG-1010 \x1b[91mCLOSED\x1b[0m\n\n\x1b[1mShutters Frontend\x1b[0m\n - FE-PSH1-EMLS-0010 \x1b[91mCLOSED\x1b[0m\n - FE-STO1-EMLS-0010 \x1b[91mCLOSED\x1b[0m\n\n\x1b[1mShutters Endstation\x1b[0m\n - OP-PSH1-EMLS-7010 \x1b[91mCLOSED\x1b[0m\n\n\x1b[1mDMM Monochromator\x1b[0m\n - DMM Temp Surface 1 0.0\n - DMM Temp Surface 2 0.0\n - DMM Temp Shield 1 (disaster) 0.0\n - DMM Temp Shield 2 (disaster) 0.0\n - DMM Translation ThruPos \x1b[91mINACTIVE\x1b[0m\n - DMM Translation InPos \x1b[91mINACTIVE\x1b[0m\n - DMM Bragg ThruPos \x1b[91mINACTIVE\x1b[0m\n - DMM Bragg InPos \x1b[91mINACTIVE\x1b[0m\n - DMM Heater Fault XTAL 1 \x1b[92mOK\x1b[0m\n - DMM Heater Fault XTAL 2 \x1b[92mOK\x1b[0m\n - DMM Heater Fault Support 1 \x1b[92mOK\x1b[0m\n - DMM Energy 0.0000\n - DMM Position out of beam\n - DMM Stripe Stripe 1 W/B4C\n\n\x1b[1mCCM Monochromator\x1b[0m\n - CCM Temp Crystal 0.0\n - CCM Temp Shield (disaster) 0.0\n - CCM Heater Fault 1 \x1b[92mOK\x1b[0m\n - CCM Heater Fault 2 \x1b[92mOK\x1b[0m\n - CCM Heater Fault 3 \x1b[92mOK\x1b[0m\n - CCM Energy 0.0000\n - CCM Position out of beam\n\n\x1b[1mCooling Water\x1b[0m\n - OP-SL1-EFSW-2010 \x1b[91mFAIL\x1b[0m\n - OP-SL2-EFSW-2010 \x1b[91mFAIL\x1b[0m\n - OP-EB1-EFSW-5010 \x1b[91mFAIL\x1b[0m\n - OP-EB1-EFSW-5020 \x1b[91mFAIL\x1b[0m\n - OP-SL3-EFSW-5010 \x1b[91mFAIL\x1b[0m\n - OP-KB-EFSW-6010 \x1b[91mFAIL\x1b[0m\n - OP-PSH1-EFSW-7010 \x1b[91mFAIL\x1b[0m\n - ES-EB2-EFSW-1010 \x1b[91mFAIL\x1b[0m\n - OP-CS-ECVW-0010 \x1b[91mCLOSED\x1b[0m\n - OP-CS-ECVW-0020 \x1b[91mCLOSED\x1b[0m\n\n\x1b[96mHint:\x1b[0m Both water cooling valves are CLOSED.\nYou can open them using: \x1b[1mdev.x12saEPS.water_cooling_op()\x1b[0m\n" + + +def test_eps_show_all(eps, capsys): + """Test that the show_all method outputs the expected status.""" + eps.show_all() + output = capsys.readouterr().out + assert ( + output == expected_show_all_output + ), f"Expected output does not match actual output.\nExpected:\n{expected_show_all_output}\nActual:\n{output}"