diff --git a/csaxs_bec/devices/epics/eps.py b/csaxs_bec/devices/epics/eps.py index d0c8629..315959c 100644 --- a/csaxs_bec/devices/epics/eps.py +++ b/csaxs_bec/devices/epics/eps.py @@ -1,11 +1,26 @@ from ophyd import Device, Component as Cpt, EpicsSignal - +import time # --------------------------- # Registry: sections/channels # --------------------------- CHANNELS = { + "EPS Alarms": [ + { + "attr": "EPSAlarmCnt", + "label": "X12SA EPS Alarm count", + "pv": "X12SA-EPS-PLC:AlarmCnt_EPS", + "kind": "alarmcount", + }, + { + "attr": "MISAlarmCnt", + "label": "FrontEnd MIS Alarm count", + "pv": "ARS00-MIS-PLC-01:AlarmCnt_Frontends", + "kind": "alarmcount", + }, + ], + "Valves Frontend": [ {"attr": "FEVVPG0000", "label": "FE-VVPG-0000", "pv": "X12SA-FE-VVPG-0000:PLC_OPEN", "kind": "valve"}, @@ -152,6 +167,7 @@ CHANNELS = { "pv": "X12SA-OP-CS-ECVW-0020:PLC_OPEN", "kind": "valve"}, ], + } @@ -161,7 +177,7 @@ CHANNELS = { def create_dynamic_eps_class(): class_attrs = dict( - USER_ACCESS=["show_all"], + USER_ACCESS=["show_all", "water_cooling_op"], SUB_VALUE="value", _default_sub="value", ) @@ -180,6 +196,59 @@ def create_dynamic_eps_class(): # ---------------------------------------------------------- class DynamicMethods: + def water_cooling_op(self): + """ + Opens all water‑cooling valves (ECVW) with alarm reset if needed. + Polls valves for 20 seconds and reports success/failure. + """ + print("\n=== Water Cooling Operation ===") + + # --------------------------- + # Collect required signals + # --------------------------- + eps_alarm = getattr(self, "EPSAlarmCnt", None) + ackerr = EpicsSignal("X12SA-EPS-PLC:ACKERR-REQUEST") + request = EpicsSignal("X12SA-OP-CS-ECVW:PLC_REQUEST") + + # Cooling valves we must check + valve_attrs = ["OPCSECVW0010", "OPCSECVW0020"] + valves = [getattr(self, a) for a in valve_attrs] + + # --------------------------- + # 1. Reset alarms if needed + # --------------------------- + alarm_value = eps_alarm.get() if eps_alarm else 0 + if alarm_value and alarm_value > 0: + print(f"EPS alarms present ({alarm_value}), resetting…") + ackerr.put(1) + time.sleep(0.3) + + # --------------------------- + # 2. Send valve‑open request + # --------------------------- + print("Sending cooling‑valve OPEN request…") + request.put(1) + + # --------------------------- + # 3. Poll valves for 20 seconds + # --------------------------- + timeout = 20 + end = time.time() + timeout + + while time.time() < end: + states = [v.get() for v in valves] + status = ["OPEN" if s else "CLOSED" for s in states] + print(" Valve status:", status) + + if all(states): + print("\n→ All cooling valves are OPEN. Operation successful.") + return True + + time.sleep(2) + + print("\n→ TIMEOUT: Cooling valves failed to open.") + return False + def show_all(self): red = "\x1b[91m" green = "\x1b[92m" @@ -234,11 +303,10 @@ def create_dynamic_eps_class(): return f"{green}OK{white}" if bool(value) else f"{red}FAIL{white}" # ------------------- FALLBACK ----------------------- - # Non-boolean numeric: return f"{value}" # ------------------- PRINT START --------------------- - print(f"{bold}X12SA valve/shutter/mono status{white}") + print(f"{bold}X12SA EPS status{white}") for section, items in CHANNELS.items(): print(f"\n{bold}{section}{white}") @@ -247,27 +315,33 @@ def create_dynamic_eps_class(): rows = [] for it in items: val = safe_get(it["attr"]) - rows.append((it["label"], val, it["pv"], it["kind"])) + rows.append((it["label"], val, it["pv"], it["kind"], it["attr"])) # Compute label width - label_width = max(32, *(len(label) for (label, _, _, _) in rows)) - - # Detect if summary applies - present = [v for (_, v, _, k) in rows if v is not None and k in ("valve", "shutter", "switch", "fault")] - bools = [v for v in present if is_bool_like(v)] - if present and len(bools) == len(present): - total = len(bools) - active = sum(1 for v in bools if v) - if active == total: - print(f"{green}All channels in this section are active.{white}") - else: - print(f"{red}Warning: {active}/{total} active.{white}") + label_width = max(32, *(len(label) for (label, _, _, _, _) in rows)) # Print lines - for label, value, pv, kind in rows: + for label, value, pv, kind, _attr in rows: fv = fmt_value(value, pv, kind) print(f" – {label:<{label_width}} {fv}") + # ------------------------------------------------- + # Contextual proposal: water_cooling() + # Triggered only in "Cooling Water" if both ECVW are CLOSED + # ------------------------------------------------- + if section == "Cooling Water": + v1 = safe_get("OPCSECVW0010") + v2 = safe_get("OPCSECVW0020") + + def is_closed(v): + return is_bool_like(v) and (v is False or v == 0) + + if is_closed(v1) and is_closed(v2): + print( + f"\n{cyan}Hint:{white} Water cooling valves OP are closed. " + f"You can request them to open them via {bold}dev.x12saEPS.water_cooling_op(){white}." + ) + # ---------------------------------------------------------- # Duplicates / missing PV validation # ---------------------------------------------------------- @@ -308,6 +382,7 @@ def create_dynamic_eps_class(): # Bind methods into class class_attrs["show_all"] = DynamicMethods.show_all class_attrs["consistency_report"] = DynamicMethods.consistency_report + class_attrs["water_cooling_op"] = DynamicMethods.water_cooling_op # Create the Device subclass return type("cSAXSEps", (Device,), class_attrs)