test(eps): Add tests for EPS device

This commit is contained in:
2026-02-11 19:03:16 +01:00
parent 1b6f8356c2
commit f40f295081
2 changed files with 104 additions and 202 deletions

View File

@@ -577,8 +577,26 @@ class EPS(PSIDeviceBase):
cooling_water = Cpt(CoolingWater, name="cooling_water")
# Acknowledgment signals for PLC communication (if needed for future use)
ackerr = EpicsSignal("X12SA-EPS-PLC:ACKERR-REQUEST")
request = EpicsSignal("X12SA-OP-CS-ECVW:PLC_REQUEST")
ackerr = Cpt(
EpicsSignal,
read_pv="X12SA-EPS-PLC:ACKERR-REQUEST",
add_prefix=("",),
name="ackerr",
kind=Kind.omitted,
doc="ACKERR request - OP-CS-ECVW-0020",
auto_monitor=True,
labels={"request"},
)
request = Cpt(
EpicsSignal,
read_pv="X12SA-OP-CS-ECVW:PLC_REQUEST",
add_prefix=("",),
name="op_cs_ecvw_request",
kind=Kind.omitted,
doc="PLC request - OP-CS-ECVW-PLC_REQUEST",
auto_monitor=True,
labels={"request"},
)
def _notify(self, msg: str, show_as_client_msg: bool = True):
"""Utility method to print a message, and optionally send it to the client UI if it should be shown also as a client message."""
@@ -882,203 +900,3 @@ class EPS(PSIDeviceBase):
# print("\nNo duplicate PVs.")
# return {"missing_attrs": missing, "duplicate_pvs": dupes}
# ==============================================================================
# DYNAMIC CLASS CREATION — Ophyd-safe
# ==============================================================================
def create_dynamic_eps_class():
class_attrs = dict(
USER_ACCESS=["show_all", "water_cooling_op"], SUB_VALUE="value", _default_sub="value"
)
# Dynamically define Components
for section, items in CHANNELS.items():
for it in items:
class_attrs[it["attr"]] = Cpt(EpicsSignal, name=it["label"], read_pv=it["pv"])
# ----------------------------------------------------------
# Methods
# ----------------------------------------------------------
class DynamicMethods:
# ------------------------
# Client messaging helper
# ------------------------
def _notify(self, msg: str, show_asap=True):
"""
Send a message to the client UI through the device manager.
Falls back to print() if device_manager/connector is missing.
"""
try:
conn = getattr(getattr(self, "device_manager", None), "connector", None)
if conn and hasattr(conn, "send_client_info"):
conn.send_client_info(msg, scope="", show_asap=show_asap)
else:
print(msg)
except Exception:
print(msg)
# ----------------------------------------------------------
# Water cooling operation
# ----------------------------------------------------------
def water_cooling_op(self):
"""
Open ECVW valves, reset EPS alarms, monitor for 20s,
then ensure stability (valves remain open) for 10s.
All messages sent to client.
"""
from ophyd import EpicsSignal
POLL_PERIOD = 2
TIMEOUT = 20
STABILITY = 15
self._notify("=== Water Cooling Operation ===")
# --- Signals ---
eps_alarm_sig = getattr(self, "EPSAlarmCnt", None)
ackerr = EpicsSignal("X12SA-EPS-PLC:ACKERR-REQUEST")
request = EpicsSignal("X12SA-OP-CS-ECVW:PLC_REQUEST")
valve_attrs = ["OPCSECVW0010", "OPCSECVW0020"]
valves = [getattr(self, a, None) for a in valve_attrs]
# Flow channels list extracted from CHANNELS
flow_items = [
(it["attr"], it["label"])
for it in CHANNELS["Cooling Water"]
if it["kind"] == "flow"
]
def safe_get(sig, default=None):
try:
return sig.get()
except Exception:
return default
# --- Step 1: EPS alarm reset ---
alarm_value = safe_get(eps_alarm_sig, 0)
if alarm_value and alarm_value > 0:
self._notify(f"[WaterCooling] EPS alarms present ({alarm_value}) → resetting…")
try:
ackerr.put(1)
except Exception as ex:
self._notify(f"[WaterCooling] WARNING: ACKERR write failed: {ex}")
time.sleep(0.3)
else:
self._notify("[WaterCooling] No EPS alarms detected.")
# --- Step 2: Issue open request ---
self._notify("[WaterCooling] Sending coolingvalve OPEN request…")
try:
request.put(1)
except Exception as ex:
self._notify(f"[WaterCooling] ERROR: Failed to send OPEN request: {ex}")
return False
# --- Step 3: Monitoring loop (clean client table output) ---
start = time.time()
end = start + TIMEOUT
stable_until = None
# Print (server-side) header once
print("Monitoring valves and flow sensors...")
print(f" Valves: {valve_attrs[0][-4:]}, {valve_attrs[1][-4:]}")
print(f" Note: stability requires valves to remain OPEN for {STABILITY} seconds.")
# One table header to the client (via device manager)
# Fixed-width columns for alignment in monospaced UI
table_header = f"{'Time':>6} | {'Valves':<21} | {'Flows (OK/FAIL/N/A)':<20}"
self._notify(table_header)
def snapshot():
# Valve snapshot
v_states = [safe_get(v, None) for v in valves]
v1 = f"{valve_attrs[0][-4:]}=" + (
"OPEN "
if v_states[0] is True or v_states[0] == 1
else "CLOSED" if v_states[0] is False or v_states[0] == 0 else "N/A "
)
v2 = f"{valve_attrs[1][-4:]}=" + (
"OPEN "
if v_states[1] is True or v_states[1] == 1
else "CLOSED" if v_states[1] is False or v_states[1] == 0 else "N/A "
)
# 2 valves with a single space between => width ~ 21
valve_str = f"{v1} {v2}"
# Flow summary: OK/FAIL/N/A counts (compact)
flow_states = []
for f_attr, _lbl in flow_items:
fsig = getattr(self, f_attr, None)
fval = safe_get(fsig, None)
flow_states.append(
True if fval in (1, True) else False if fval in (0, False) else None
)
ok = sum(1 for f in flow_states if f is True)
fail = sum(1 for f in flow_states if f is False)
na = sum(1 for f in flow_states if f is None)
flow_summary = f"{ok:>2} / {fail:>2} / {na:>2}"
return v_states, valve_str, flow_summary
while True:
now = time.time()
elapsed = int(now - start)
if now > end:
# One last line to client
v_states, valves_s, flows_s = snapshot()
self._notify(f"{elapsed:>6}s | {valves_s:<21} | {flows_s:<20}")
print("→ TIMEOUT: Cooling valves failed to remain OPEN.")
return False
# Live snapshot
v_states, valves_s, flows_s = snapshot()
# Exactly one concise line to client per cycle
self._notify(f"{elapsed:>6}s | {valves_s:<21} | {flows_s:<20}")
both_open = all(s is not None and bool(s) for s in v_states)
if both_open:
if stable_until is None:
stable_until = now + STABILITY
print(
f"[WaterCooling] Both valves OPEN → starting {STABILITY}s stability window…"
)
else:
if now >= stable_until:
print("→ SUCCESS: Valves remained OPEN during stability window.")
return True
else:
if stable_until is not None:
print("[WaterCooling] Valve closed again → restarting stability window.")
stable_until = None
time.sleep(POLL_PERIOD)
# Attach methods to class attributes
class_attrs["show_all"] = DynamicMethods.show_all
class_attrs["consistency_report"] = DynamicMethods.consistency_report
class_attrs["water_cooling_op"] = DynamicMethods.water_cooling_op
class_attrs["_notify"] = DynamicMethods._notify
# ----------------------------------------------------------
# Custom __init__ to accept device_manager (BEC)
# ----------------------------------------------------------
def __init__(self, *args, device_manager=None, **kwargs):
super(cls, self).__init__(*args, **kwargs)
self.device_manager = device_manager
# Create class
cls = type("cSAXSEps", (Device,), class_attrs)
setattr(cls, "__init__", __init__)
return cls
# Create final class for BEC import
cSAXSEps = create_dynamic_eps_class()

View File

@@ -0,0 +1,84 @@
from __future__ import annotations
import pytest
from ophyd_devices.tests.utils import patched_device
from csaxs_bec.devices.epics.eps import EPS
ALL_PVS = [
# ALARMS
"X12SA-EPS-PLC:AlarmCnt_EPS",
"ARS00-MIS-PLC-01:AlarmCnt_Frontends",
# FRONTEND VALVES
"X12SA-FE-VVPG-0000:PLC_OPEN",
"X12SA-FE-VVPG-1010:PLC_OPEN",
"X12SA-FE-VVFV-2010:PLC_OPEN",
"X12SA-FE-VVPG-2010:PLC_OPEN",
# Optics VALVES
"X12SA-OP-VVPG-1010:PLC_OPEN",
"X12SA-OP-VVPG-2010:PLC_OPEN",
"X12SA-OP-VVPG-3010:PLC_OPEN",
"X12SA-OP-VVPG-3020:PLC_OPEN",
"X12SA-OP-VVPG-4010:PLC_OPEN",
"X12SA-OP-VVPG-5010:PLC_OPEN",
"X12SA-OP-VVPG-6010:PLC_OPEN",
"X12SA-OP-VVPG-7010:PLC_OPEN",
# Endstation VALVES
"X12SA-ES-VVPG-1010:PLC_OPEN",
# Frontend SHUTTERS
"X12SA-FE-PSH1-EMLS-0010:OPEN",
"X12SA-FE-STO1-EMLS-0010:OPEN",
# Optics SHUTTERS
"X12SA-OP-PSH1-EMLS-7010:OPEN",
# DMM Monochromator
"X12SA-OP-DMM-ETTC-3010:TEMP",
"X12SA-OP-DMM-ETTC-3020:TEMP",
"X12SA-OP-DMM-ETTC-3030:TEMP",
"X12SA-OP-DMM-ETTC-3040:TEMP",
"X12SA-OP-DMM-EMLS-3010:THRU",
"X12SA-OP-DMM-EMLS-3020:IN",
"X12SA-OP-DMM-EMLS-3030:THRU",
"X12SA-OP-DMM-EMLS-3040:IN",
"X12SA-OP-DMM-EMSW-3050:SWITCH",
"X12SA-OP-DMM-EMSW-3060:SWITCH",
"X12SA-OP-DMM-EMSW-3070:SWITCH",
"X12SA-OP-DMM1:ENERGY-GET",
"X12SA-OP-DMM1:POSITION",
"X12SA-OP-DMM1:STRIPE",
# CCM Monochromator
"X12SA-OP-CCM-ETTC-4010:TEMP",
"X12SA-OP-CCM-ETTC-4020:TEMP",
"X12SA-OP-CCM-EMSW-4010:SWITCH",
"X12SA-OP-CCM-EMSW-4020:SWITCH",
"X12SA-OP-CCM-EMSW-4030:SWITCH",
"X12SA-OP-CCM1:ENERGY-GET",
"X12SA-OP-CCM1:POSITION",
# Water Cooling
"X12SA-OP-SL1-EFSW-2010:FLOW",
"X12SA-OP-SL2-EFSW-2010:FLOW",
"X12SA-OP-EB1-EFSW-5010:FLOW",
"X12SA-OP-EB1-EFSW-5020:FLOW",
"X12SA-OP-SL3-EFSW-5010:FLOW",
"X12SA-OP-KB-EFSW-6010:FLOW",
"X12SA-OP-PSH1-EFSW-7010:FLOW",
"X12SA-ES-EB2-EFSW-1010:FLOW",
"X12SA-OP-CS-ECVW-0010:PLC_OPEN",
"X12SA-OP-CS-ECVW-0020:PLC_OPEN",
# Request PVs
"X12SA-EPS-PLC:ACKERR-REQUEST",
"X12SA-OP-CS-ECVW:PLC_REQUEST",
]
@pytest.fixture
def eps():
dev_name = "EPS"
with patched_device(EPS, name=dev_name) as eps:
yield eps
def test_eps_has_signals(eps):
found_pvs = [walk.item._read_pv.pvname for walk in eps.walk_signals()]
assert set(found_pvs) == set(
ALL_PVS
), f"Expected PVs {ALL_PVS} but found {set(ALL_PVS) - set(found_pvs)}"