water on method

This commit is contained in:
x12sa
2026-01-26 13:53:51 +01:00
committed by appel_c
parent 6de70d07bb
commit 2a46536c39

View File

@@ -1,11 +1,26 @@
from ophyd import Device, Component as Cpt, EpicsSignal
import time
# ---------------------------
# Registry: sections/channels
# ---------------------------
CHANNELS = {
"EPS Alarms": [
{
"attr": "EPSAlarmCnt",
"label": "X12SA EPS Alarm count",
"pv": "X12SA-EPS-PLC:AlarmCnt_EPS",
"kind": "alarmcount",
},
{
"attr": "MISAlarmCnt",
"label": "FrontEnd MIS Alarm count",
"pv": "ARS00-MIS-PLC-01:AlarmCnt_Frontends",
"kind": "alarmcount",
},
],
"Valves Frontend": [
{"attr": "FEVVPG0000", "label": "FE-VVPG-0000",
"pv": "X12SA-FE-VVPG-0000:PLC_OPEN", "kind": "valve"},
@@ -152,6 +167,7 @@ CHANNELS = {
"pv": "X12SA-OP-CS-ECVW-0020:PLC_OPEN", "kind": "valve"},
],
}
@@ -161,7 +177,7 @@ CHANNELS = {
def create_dynamic_eps_class():
class_attrs = dict(
USER_ACCESS=["show_all"],
USER_ACCESS=["show_all", "water_cooling_op"],
SUB_VALUE="value",
_default_sub="value",
)
@@ -180,6 +196,59 @@ def create_dynamic_eps_class():
# ----------------------------------------------------------
class DynamicMethods:
def water_cooling_op(self):
"""
Opens all watercooling valves (ECVW) with alarm reset if needed.
Polls valves for 20 seconds and reports success/failure.
"""
print("\n=== Water Cooling Operation ===")
# ---------------------------
# Collect required signals
# ---------------------------
eps_alarm = getattr(self, "EPSAlarmCnt", None)
ackerr = EpicsSignal("X12SA-EPS-PLC:ACKERR-REQUEST")
request = EpicsSignal("X12SA-OP-CS-ECVW:PLC_REQUEST")
# Cooling valves we must check
valve_attrs = ["OPCSECVW0010", "OPCSECVW0020"]
valves = [getattr(self, a) for a in valve_attrs]
# ---------------------------
# 1. Reset alarms if needed
# ---------------------------
alarm_value = eps_alarm.get() if eps_alarm else 0
if alarm_value and alarm_value > 0:
print(f"EPS alarms present ({alarm_value}), resetting…")
ackerr.put(1)
time.sleep(0.3)
# ---------------------------
# 2. Send valveopen request
# ---------------------------
print("Sending coolingvalve OPEN request…")
request.put(1)
# ---------------------------
# 3. Poll valves for 20 seconds
# ---------------------------
timeout = 20
end = time.time() + timeout
while time.time() < end:
states = [v.get() for v in valves]
status = ["OPEN" if s else "CLOSED" for s in states]
print(" Valve status:", status)
if all(states):
print("\n→ All cooling valves are OPEN. Operation successful.")
return True
time.sleep(2)
print("\n→ TIMEOUT: Cooling valves failed to open.")
return False
def show_all(self):
red = "\x1b[91m"
green = "\x1b[92m"
@@ -234,11 +303,10 @@ def create_dynamic_eps_class():
return f"{green}OK{white}" if bool(value) else f"{red}FAIL{white}"
# ------------------- FALLBACK -----------------------
# Non-boolean numeric:
return f"{value}"
# ------------------- PRINT START ---------------------
print(f"{bold}X12SA valve/shutter/mono status{white}")
print(f"{bold}X12SA EPS status{white}")
for section, items in CHANNELS.items():
print(f"\n{bold}{section}{white}")
@@ -247,27 +315,33 @@ def create_dynamic_eps_class():
rows = []
for it in items:
val = safe_get(it["attr"])
rows.append((it["label"], val, it["pv"], it["kind"]))
rows.append((it["label"], val, it["pv"], it["kind"], it["attr"]))
# Compute label width
label_width = max(32, *(len(label) for (label, _, _, _) in rows))
# Detect if summary applies
present = [v for (_, v, _, k) in rows if v is not None and k in ("valve", "shutter", "switch", "fault")]
bools = [v for v in present if is_bool_like(v)]
if present and len(bools) == len(present):
total = len(bools)
active = sum(1 for v in bools if v)
if active == total:
print(f"{green}All channels in this section are active.{white}")
else:
print(f"{red}Warning: {active}/{total} active.{white}")
label_width = max(32, *(len(label) for (label, _, _, _, _) in rows))
# Print lines
for label, value, pv, kind in rows:
for label, value, pv, kind, _attr in rows:
fv = fmt_value(value, pv, kind)
print(f" {label:<{label_width}} {fv}")
# -------------------------------------------------
# Contextual proposal: water_cooling()
# Triggered only in "Cooling Water" if both ECVW are CLOSED
# -------------------------------------------------
if section == "Cooling Water":
v1 = safe_get("OPCSECVW0010")
v2 = safe_get("OPCSECVW0020")
def is_closed(v):
return is_bool_like(v) and (v is False or v == 0)
if is_closed(v1) and is_closed(v2):
print(
f"\n{cyan}Hint:{white} Water cooling valves OP are closed. "
f"You can request them to open them via {bold}dev.x12saEPS.water_cooling_op(){white}."
)
# ----------------------------------------------------------
# Duplicates / missing PV validation
# ----------------------------------------------------------
@@ -308,6 +382,7 @@ def create_dynamic_eps_class():
# Bind methods into class
class_attrs["show_all"] = DynamicMethods.show_all
class_attrs["consistency_report"] = DynamicMethods.consistency_report
class_attrs["water_cooling_op"] = DynamicMethods.water_cooling_op
# Create the Device subclass
return type("cSAXSEps", (Device,), class_attrs)