From f40f295081cf9c9824ead4094a24ca97c3f25fa2 Mon Sep 17 00:00:00 2001 From: appel_c Date: Wed, 11 Feb 2026 19:03:16 +0100 Subject: [PATCH] test(eps): Add tests for EPS device --- csaxs_bec/devices/epics/eps.py | 222 +++----------------------------- tests/tests_devices/test_eps.py | 84 ++++++++++++ 2 files changed, 104 insertions(+), 202 deletions(-) create mode 100644 tests/tests_devices/test_eps.py diff --git a/csaxs_bec/devices/epics/eps.py b/csaxs_bec/devices/epics/eps.py index 499c98d..d7678f5 100644 --- a/csaxs_bec/devices/epics/eps.py +++ b/csaxs_bec/devices/epics/eps.py @@ -577,8 +577,26 @@ class EPS(PSIDeviceBase): cooling_water = Cpt(CoolingWater, name="cooling_water") # Acknowledgment signals for PLC communication (if needed for future use) - ackerr = EpicsSignal("X12SA-EPS-PLC:ACKERR-REQUEST") - request = EpicsSignal("X12SA-OP-CS-ECVW:PLC_REQUEST") + ackerr = Cpt( + EpicsSignal, + read_pv="X12SA-EPS-PLC:ACKERR-REQUEST", + add_prefix=("",), + name="ackerr", + kind=Kind.omitted, + doc="ACKERR request - OP-CS-ECVW-0020", + auto_monitor=True, + labels={"request"}, + ) + request = Cpt( + EpicsSignal, + read_pv="X12SA-OP-CS-ECVW:PLC_REQUEST", + add_prefix=("",), + name="op_cs_ecvw_request", + kind=Kind.omitted, + doc="PLC request - OP-CS-ECVW-PLC_REQUEST", + auto_monitor=True, + labels={"request"}, + ) def _notify(self, msg: str, show_as_client_msg: bool = True): """Utility method to print a message, and optionally send it to the client UI if it should be shown also as a client message.""" @@ -882,203 +900,3 @@ class EPS(PSIDeviceBase): # print("\nNo duplicate PVs.") # return {"missing_attrs": missing, "duplicate_pvs": dupes} - - -# ============================================================================== -# DYNAMIC CLASS CREATION — Ophyd-safe -# ============================================================================== -def create_dynamic_eps_class(): - - class_attrs = dict( - USER_ACCESS=["show_all", "water_cooling_op"], SUB_VALUE="value", _default_sub="value" - ) - - # Dynamically define Components - for section, items in CHANNELS.items(): - for it in items: - class_attrs[it["attr"]] = Cpt(EpicsSignal, name=it["label"], read_pv=it["pv"]) - - # ---------------------------------------------------------- - # Methods - # ---------------------------------------------------------- - class DynamicMethods: - - # ------------------------ - # Client messaging helper - # ------------------------ - def _notify(self, msg: str, show_asap=True): - """ - Send a message to the client UI through the device manager. - Falls back to print() if device_manager/connector is missing. - """ - try: - conn = getattr(getattr(self, "device_manager", None), "connector", None) - if conn and hasattr(conn, "send_client_info"): - conn.send_client_info(msg, scope="", show_asap=show_asap) - else: - print(msg) - except Exception: - print(msg) - - # ---------------------------------------------------------- - # Water cooling operation - # ---------------------------------------------------------- - def water_cooling_op(self): - """ - Open ECVW valves, reset EPS alarms, monitor for 20s, - then ensure stability (valves remain open) for 10s. - All messages sent to client. - """ - from ophyd import EpicsSignal - - POLL_PERIOD = 2 - TIMEOUT = 20 - STABILITY = 15 - - self._notify("=== Water Cooling Operation ===") - - # --- Signals --- - eps_alarm_sig = getattr(self, "EPSAlarmCnt", None) - ackerr = EpicsSignal("X12SA-EPS-PLC:ACKERR-REQUEST") - request = EpicsSignal("X12SA-OP-CS-ECVW:PLC_REQUEST") - - valve_attrs = ["OPCSECVW0010", "OPCSECVW0020"] - valves = [getattr(self, a, None) for a in valve_attrs] - - # Flow channels list extracted from CHANNELS - flow_items = [ - (it["attr"], it["label"]) - for it in CHANNELS["Cooling Water"] - if it["kind"] == "flow" - ] - - def safe_get(sig, default=None): - try: - return sig.get() - except Exception: - return default - - # --- Step 1: EPS alarm reset --- - alarm_value = safe_get(eps_alarm_sig, 0) - if alarm_value and alarm_value > 0: - self._notify(f"[WaterCooling] EPS alarms present ({alarm_value}) → resetting…") - try: - ackerr.put(1) - except Exception as ex: - self._notify(f"[WaterCooling] WARNING: ACKERR write failed: {ex}") - time.sleep(0.3) - else: - self._notify("[WaterCooling] No EPS alarms detected.") - - # --- Step 2: Issue open request --- - self._notify("[WaterCooling] Sending cooling‑valve OPEN request…") - try: - request.put(1) - except Exception as ex: - self._notify(f"[WaterCooling] ERROR: Failed to send OPEN request: {ex}") - return False - - # --- Step 3: Monitoring loop (clean client table output) --- - start = time.time() - end = start + TIMEOUT - stable_until = None - - # Print (server-side) header once - print("Monitoring valves and flow sensors...") - print(f" Valves: {valve_attrs[0][-4:]}, {valve_attrs[1][-4:]}") - print(f" Note: stability requires valves to remain OPEN for {STABILITY} seconds.") - - # One table header to the client (via device manager) - # Fixed-width columns for alignment in monospaced UI - table_header = f"{'Time':>6} | {'Valves':<21} | {'Flows (OK/FAIL/N/A)':<20}" - self._notify(table_header) - - def snapshot(): - # Valve snapshot - v_states = [safe_get(v, None) for v in valves] - v1 = f"{valve_attrs[0][-4:]}=" + ( - "OPEN " - if v_states[0] is True or v_states[0] == 1 - else "CLOSED" if v_states[0] is False or v_states[0] == 0 else "N/A " - ) - v2 = f"{valve_attrs[1][-4:]}=" + ( - "OPEN " - if v_states[1] is True or v_states[1] == 1 - else "CLOSED" if v_states[1] is False or v_states[1] == 0 else "N/A " - ) - # 2 valves with a single space between => width ~ 21 - valve_str = f"{v1} {v2}" - - # Flow summary: OK/FAIL/N/A counts (compact) - flow_states = [] - for f_attr, _lbl in flow_items: - fsig = getattr(self, f_attr, None) - fval = safe_get(fsig, None) - flow_states.append( - True if fval in (1, True) else False if fval in (0, False) else None - ) - - ok = sum(1 for f in flow_states if f is True) - fail = sum(1 for f in flow_states if f is False) - na = sum(1 for f in flow_states if f is None) - flow_summary = f"{ok:>2} / {fail:>2} / {na:>2}" - - return v_states, valve_str, flow_summary - - while True: - now = time.time() - elapsed = int(now - start) - - if now > end: - # One last line to client - v_states, valves_s, flows_s = snapshot() - self._notify(f"{elapsed:>6}s | {valves_s:<21} | {flows_s:<20}") - print("→ TIMEOUT: Cooling valves failed to remain OPEN.") - return False - - # Live snapshot - v_states, valves_s, flows_s = snapshot() - # Exactly one concise line to client per cycle - self._notify(f"{elapsed:>6}s | {valves_s:<21} | {flows_s:<20}") - - both_open = all(s is not None and bool(s) for s in v_states) - - if both_open: - if stable_until is None: - stable_until = now + STABILITY - print( - f"[WaterCooling] Both valves OPEN → starting {STABILITY}s stability window…" - ) - else: - if now >= stable_until: - print("→ SUCCESS: Valves remained OPEN during stability window.") - return True - else: - if stable_until is not None: - print("[WaterCooling] Valve closed again → restarting stability window.") - stable_until = None - - time.sleep(POLL_PERIOD) - - # Attach methods to class attributes - class_attrs["show_all"] = DynamicMethods.show_all - class_attrs["consistency_report"] = DynamicMethods.consistency_report - class_attrs["water_cooling_op"] = DynamicMethods.water_cooling_op - class_attrs["_notify"] = DynamicMethods._notify - - # ---------------------------------------------------------- - # Custom __init__ to accept device_manager (BEC) - # ---------------------------------------------------------- - def __init__(self, *args, device_manager=None, **kwargs): - super(cls, self).__init__(*args, **kwargs) - self.device_manager = device_manager - - # Create class - cls = type("cSAXSEps", (Device,), class_attrs) - setattr(cls, "__init__", __init__) - - return cls - - -# Create final class for BEC import -cSAXSEps = create_dynamic_eps_class() diff --git a/tests/tests_devices/test_eps.py b/tests/tests_devices/test_eps.py new file mode 100644 index 0000000..86a590e --- /dev/null +++ b/tests/tests_devices/test_eps.py @@ -0,0 +1,84 @@ +from __future__ import annotations + +import pytest +from ophyd_devices.tests.utils import patched_device + +from csaxs_bec.devices.epics.eps import EPS + +ALL_PVS = [ + # ALARMS + "X12SA-EPS-PLC:AlarmCnt_EPS", + "ARS00-MIS-PLC-01:AlarmCnt_Frontends", + # FRONTEND VALVES + "X12SA-FE-VVPG-0000:PLC_OPEN", + "X12SA-FE-VVPG-1010:PLC_OPEN", + "X12SA-FE-VVFV-2010:PLC_OPEN", + "X12SA-FE-VVPG-2010:PLC_OPEN", + # Optics VALVES + "X12SA-OP-VVPG-1010:PLC_OPEN", + "X12SA-OP-VVPG-2010:PLC_OPEN", + "X12SA-OP-VVPG-3010:PLC_OPEN", + "X12SA-OP-VVPG-3020:PLC_OPEN", + "X12SA-OP-VVPG-4010:PLC_OPEN", + "X12SA-OP-VVPG-5010:PLC_OPEN", + "X12SA-OP-VVPG-6010:PLC_OPEN", + "X12SA-OP-VVPG-7010:PLC_OPEN", + # Endstation VALVES + "X12SA-ES-VVPG-1010:PLC_OPEN", + # Frontend SHUTTERS + "X12SA-FE-PSH1-EMLS-0010:OPEN", + "X12SA-FE-STO1-EMLS-0010:OPEN", + # Optics SHUTTERS + "X12SA-OP-PSH1-EMLS-7010:OPEN", + # DMM Monochromator + "X12SA-OP-DMM-ETTC-3010:TEMP", + "X12SA-OP-DMM-ETTC-3020:TEMP", + "X12SA-OP-DMM-ETTC-3030:TEMP", + "X12SA-OP-DMM-ETTC-3040:TEMP", + "X12SA-OP-DMM-EMLS-3010:THRU", + "X12SA-OP-DMM-EMLS-3020:IN", + "X12SA-OP-DMM-EMLS-3030:THRU", + "X12SA-OP-DMM-EMLS-3040:IN", + "X12SA-OP-DMM-EMSW-3050:SWITCH", + "X12SA-OP-DMM-EMSW-3060:SWITCH", + "X12SA-OP-DMM-EMSW-3070:SWITCH", + "X12SA-OP-DMM1:ENERGY-GET", + "X12SA-OP-DMM1:POSITION", + "X12SA-OP-DMM1:STRIPE", + # CCM Monochromator + "X12SA-OP-CCM-ETTC-4010:TEMP", + "X12SA-OP-CCM-ETTC-4020:TEMP", + "X12SA-OP-CCM-EMSW-4010:SWITCH", + "X12SA-OP-CCM-EMSW-4020:SWITCH", + "X12SA-OP-CCM-EMSW-4030:SWITCH", + "X12SA-OP-CCM1:ENERGY-GET", + "X12SA-OP-CCM1:POSITION", + # Water Cooling + "X12SA-OP-SL1-EFSW-2010:FLOW", + "X12SA-OP-SL2-EFSW-2010:FLOW", + "X12SA-OP-EB1-EFSW-5010:FLOW", + "X12SA-OP-EB1-EFSW-5020:FLOW", + "X12SA-OP-SL3-EFSW-5010:FLOW", + "X12SA-OP-KB-EFSW-6010:FLOW", + "X12SA-OP-PSH1-EFSW-7010:FLOW", + "X12SA-ES-EB2-EFSW-1010:FLOW", + "X12SA-OP-CS-ECVW-0010:PLC_OPEN", + "X12SA-OP-CS-ECVW-0020:PLC_OPEN", + # Request PVs + "X12SA-EPS-PLC:ACKERR-REQUEST", + "X12SA-OP-CS-ECVW:PLC_REQUEST", +] + + +@pytest.fixture +def eps(): + dev_name = "EPS" + with patched_device(EPS, name=dev_name) as eps: + yield eps + + +def test_eps_has_signals(eps): + found_pvs = [walk.item._read_pv.pvname for walk in eps.walk_signals()] + assert set(found_pvs) == set( + ALL_PVS + ), f"Expected PVs {ALL_PVS} but found {set(ALL_PVS) - set(found_pvs)}"