fixed enable of water cooling
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m23s
CI for csaxs_bec / test (pull_request) Successful in 1m19s

This commit is contained in:
x12sa
2026-01-26 14:20:46 +01:00
parent af8b408fa8
commit 262d8d7ac9

View File

@@ -1,6 +1,7 @@
from ophyd import Device, Component as Cpt, EpicsSignal
import time
# ---------------------------
# Registry: sections/channels
# ---------------------------
@@ -68,13 +69,7 @@ CHANNELS = {
"pv": "X12SA-OP-PSH1-EMLS-7010:OPEN", "kind": "shutter"},
],
# -------------------------------------------------
# DMM MONOCHROMATOR — with temperature, switches,
# heater faults, energy, string readbacks
# -------------------------------------------------
"DMM Monochromator": [
# Temperature sensors (1 decimal)
{"attr": "DMM_ETTC_3010", "label": "DMM Temp Surface 1",
"pv": "X12SA-OP-DMM-ETTC-3010:TEMP", "kind": "temp"},
{"attr": "DMM_ETTC_3020", "label": "DMM Temp Surface 2",
@@ -84,7 +79,6 @@ CHANNELS = {
{"attr": "DMM_ETTC_3040", "label": "DMM Temp Shield 2 (disaster)",
"pv": "X12SA-OP-DMM-ETTC-3040:TEMP", "kind": "temp"},
# Switches (ACTIVE/INACTIVE)
{"attr": "DMM_EMLS_3010", "label": "DMM Translation ThruPos",
"pv": "X12SA-OP-DMM-EMLS-3010:THRU", "kind": "switch"},
{"attr": "DMM_EMLS_3020", "label": "DMM Translation InPos",
@@ -94,7 +88,6 @@ CHANNELS = {
{"attr": "DMM_EMLS_3040", "label": "DMM Bragg InPos",
"pv": "X12SA-OP-DMM-EMLS-3040:IN", "kind": "switch"},
# Heater faults (OK / FAULT)
{"attr": "DMM_EMSW_3050_SWITCH", "label": "DMM Heater Fault XTAL 1",
"pv": "X12SA-OP-DMM-EMSW-3050:SWITCH", "kind": "fault"},
{"attr": "DMM_EMSW_3060_SWITCH", "label": "DMM Heater Fault XTAL 2",
@@ -102,7 +95,6 @@ CHANNELS = {
{"attr": "DMM_EMSW_3070_SWITCH", "label": "DMM Heater Fault Support 1",
"pv": "X12SA-OP-DMM-EMSW-3070:SWITCH", "kind": "fault"},
# Readbacks
{"attr": "DMM1_ENERGY_GET", "label": "DMM Energy",
"pv": "X12SA-OP-DMM1:ENERGY-GET", "kind": "energy"},
{"attr": "DMM1_POSITION", "label": "DMM Position",
@@ -111,18 +103,12 @@ CHANNELS = {
"pv": "X12SA-OP-DMM1:STRIPE", "kind": "string"},
],
# -------------------------------------------------
# CCM MONOCHROMATOR
# -------------------------------------------------
"CCM Monochromator": [
# Temperatures
{"attr": "CCM_ETTC_4010", "label": "CCM Temp Crystal",
"pv": "X12SA-OP-CCM-ETTC-4010:TEMP", "kind": "temp"},
{"attr": "CCM_ETTC_4020", "label": "CCM Temp Shield (disaster)",
"pv": "X12SA-OP-CCM-ETTC-4020:TEMP", "kind": "temp"},
# Heater faults
{"attr": "CCM_EMSW_4010_SWITCH", "label": "CCM Heater Fault 1",
"pv": "X12SA-OP-CCM-EMSW-4010:SWITCH", "kind": "fault"},
{"attr": "CCM_EMSW_4020_SWITCH", "label": "CCM Heater Fault 2",
@@ -130,44 +116,35 @@ CHANNELS = {
{"attr": "CCM_EMSW_4030_SWITCH", "label": "CCM Heater Fault 3",
"pv": "X12SA-OP-CCM-EMSW-4030:SWITCH", "kind": "fault"},
# Readbacks
{"attr": "CCM1_ENERGY_GET", "label": "CCM Energy",
"pv": "X12SA-OP-CCM1:ENERGY-GET", "kind": "energy"},
{"attr": "CCM1_POSITION", "label": "CCM Position",
"pv": "X12SA-OP-CCM1:POSITION", "kind": "string"},
],
# -------------------------------------------------
# COOLING WATER (flows + supply/return valves)
# -------------------------------------------------
"Cooling Water": [
# Flows (1=OK, 0=FAIL). Labels mirror the tag style.
{"attr": "OPSL1EFSW2010", "label": "OP-SL1-EFSW-2010",
{"attr": "OPSL1EFSW2010", "label": "OP-SL1-EFSW-2010",
"pv": "X12SA-OP-SL1-EFSW-2010:FLOW", "kind": "flow"},
{"attr": "OPSL2EFSW2010", "label": "OP-SL2-EFSW-2010",
{"attr": "OPSL2EFSW2010", "label": "OP-SL2-EFSW-2010",
"pv": "X12SA-OP-SL2-EFSW-2010:FLOW", "kind": "flow"},
{"attr": "OPEB1EFSW5010", "label": "OP-EB1-EFSW-5010",
{"attr": "OPEB1EFSW5010", "label": "OP-EB1-EFSW-5010",
"pv": "X12SA-OP-EB1-EFSW-5010:FLOW", "kind": "flow"},
{"attr": "OPEB1EFSW5020", "label": "OP-EB1-EFSW-5020",
{"attr": "OPEB1EFSW5020", "label": "OP-EB1-EFSW-5020",
"pv": "X12SA-OP-EB1-EFSW-5020:FLOW", "kind": "flow"},
{"attr": "OPSL3EFSW5010", "label": "OP-SL3-EFSW-5010",
{"attr": "OPSL3EFSW5010", "label": "OP-SL3-EFSW-5010",
"pv": "X12SA-OP-SL3-EFSW-5010:FLOW", "kind": "flow"},
{"attr": "OPKBEFSW6010", "label": "OP-KB-EFSW-6010",
{"attr": "OPKBEFSW6010", "label": "OP-KB-EFSW-6010",
"pv": "X12SA-OP-KB-EFSW-6010:FLOW", "kind": "flow"},
{"attr": "OPPSH1EFSW7010", "label": "OP-PSH1-EFSW-7010",
"pv": "X12SA-OP-PSH1-EFSW-7010:FLOW", "kind": "flow"},
{"attr": "ESEB2EFSW1010", "label": "ES-EB2-EFSW-1010",
{"attr": "ESEB2EFSW1010", "label": "ES-EB2-EFSW-1010",
"pv": "X12SA-ES-EB2-EFSW-1010:FLOW", "kind": "flow"},
# Cooling supply/return valves (OPEN/CLOSED)
{"attr": "OPCSECVW0010", "label": "OP-CS-ECVW-0010",
"pv": "X12SA-OP-CS-ECVW-0010:PLC_OPEN", "kind": "valve"},
{"attr": "OPCSECVW0020", "label": "OP-CS-ECVW-0020",
"pv": "X12SA-OP-CS-ECVW-0020:PLC_OPEN", "kind": "valve"},
],
}
@@ -182,7 +159,7 @@ def create_dynamic_eps_class():
_default_sub="value",
)
# Dynamically define Components FIRST
# Dynamically define Components
for section, items in CHANNELS.items():
for it in items:
class_attrs[it["attr"]] = Cpt(
@@ -196,58 +173,162 @@ def create_dynamic_eps_class():
# ----------------------------------------------------------
class DynamicMethods:
# ------------------------
# Client messaging helper
# ------------------------
def _notify(self, msg: str, show_asap=True):
"""
Send a message to the client UI through the device manager.
Falls back to print() if device_manager/connector is missing.
"""
try:
conn = getattr(getattr(self, "device_manager", None), "connector", None)
if conn and hasattr(conn, "send_client_info"):
conn.send_client_info(msg, scope="", show_asap=show_asap)
else:
print(msg)
except Exception:
print(msg)
# ----------------------------------------------------------
# Water cooling operation
# ----------------------------------------------------------
def water_cooling_op(self):
"""
Opens all watercooling valves (ECVW) with alarm reset if needed.
Polls valves for 20 seconds and reports success/failure.
Open ECVW valves, reset EPS alarms, monitor for 20s,
then ensure stability (valves remain open) for 10s.
All messages sent to client.
"""
print("\n=== Water Cooling Operation ===")
from ophyd import EpicsSignal
# ---------------------------
# Collect required signals
# ---------------------------
eps_alarm = getattr(self, "EPSAlarmCnt", None)
ackerr = EpicsSignal("X12SA-EPS-PLC:ACKERR-REQUEST")
request = EpicsSignal("X12SA-OP-CS-ECVW:PLC_REQUEST")
POLL_PERIOD = 2
TIMEOUT = 20
STABILITY = 15
self._notify("=== Water Cooling Operation ===")
# --- Signals ---
eps_alarm_sig = getattr(self, "EPSAlarmCnt", None)
ackerr = EpicsSignal("X12SA-EPS-PLC:ACKERR-REQUEST")
request = EpicsSignal("X12SA-OP-CS-ECVW:PLC_REQUEST")
# Cooling valves we must check
valve_attrs = ["OPCSECVW0010", "OPCSECVW0020"]
valves = [getattr(self, a) for a in valve_attrs]
valves = [getattr(self, a, None) for a in valve_attrs]
# ---------------------------
# 1. Reset alarms if needed
# ---------------------------
alarm_value = eps_alarm.get() if eps_alarm else 0
# Flow channels list extracted from CHANNELS
flow_items = [
(it["attr"], it["label"])
for it in CHANNELS["Cooling Water"]
if it["kind"] == "flow"
]
def safe_get(sig, default=None):
try:
return sig.get()
except Exception:
return default
# --- Step 1: EPS alarm reset ---
alarm_value = safe_get(eps_alarm_sig, 0)
if alarm_value and alarm_value > 0:
print(f"EPS alarms present ({alarm_value}), resetting…")
ackerr.put(1)
self._notify(f"[WaterCooling] EPS alarms present ({alarm_value}) resetting…")
try:
ackerr.put(1)
except Exception as ex:
self._notify(f"[WaterCooling] WARNING: ACKERR write failed: {ex}")
time.sleep(0.3)
else:
self._notify("[WaterCooling] No EPS alarms detected.")
# ---------------------------
# 2. Send valveopen request
# ---------------------------
print("Sending coolingvalve OPEN request…")
request.put(1)
# --- Step 2: Issue open request ---
self._notify("[WaterCooling] Sending coolingvalve OPEN request")
try:
request.put(1)
except Exception as ex:
self._notify(f"[WaterCooling] ERROR: Failed to send OPEN request: {ex}")
return False
# ---------------------------
# 3. Poll valves for 20 seconds
# ---------------------------
timeout = 20
end = time.time() + timeout
# --- Step 3: Monitoring loop (clean client table output) ---
start = time.time()
end = start + TIMEOUT
stable_until = None
while time.time() < end:
states = [v.get() for v in valves]
status = ["OPEN" if s else "CLOSED" for s in states]
print(" Valve status:", status)
# Print (server-side) header once
print("Monitoring valves and flow sensors...")
print(f" Valves: {valve_attrs[0][-4:]}, {valve_attrs[1][-4:]}")
print(f" Note: stability requires valves to remain OPEN for {STABILITY} seconds.")
if all(states):
print("\n→ All cooling valves are OPEN. Operation successful.")
return True
# One table header to the client (via device manager)
# Fixed-width columns for alignment in monospaced UI
table_header = (
f"{'Time':>6} | {'Valves':<21} | {'Flows (OK/FAIL/N/A)':<20}"
)
self._notify(table_header)
time.sleep(2)
def snapshot():
# Valve snapshot
v_states = [safe_get(v, None) for v in valves]
v1 = (
f"{valve_attrs[0][-4:]}="
+ ("OPEN " if v_states[0] is True or v_states[0] == 1 else
"CLOSED" if v_states[0] is False or v_states[0] == 0 else
"N/A ")
)
v2 = (
f"{valve_attrs[1][-4:]}="
+ ("OPEN " if v_states[1] is True or v_states[1] == 1 else
"CLOSED" if v_states[1] is False or v_states[1] == 0 else
"N/A ")
)
# 2 valves with a single space between => width ~ 21
valve_str = f"{v1} {v2}"
print("\n→ TIMEOUT: Cooling valves failed to open.")
return False
# Flow summary: OK/FAIL/N/A counts (compact)
flow_states = []
for f_attr, _lbl in flow_items:
fsig = getattr(self, f_attr, None)
fval = safe_get(fsig, None)
flow_states.append(True if fval in (1, True) else False if fval in (0, False) else None)
ok = sum(1 for f in flow_states if f is True)
fail = sum(1 for f in flow_states if f is False)
na = sum(1 for f in flow_states if f is None)
flow_summary = f"{ok:>2} / {fail:>2} / {na:>2}"
return v_states, valve_str, flow_summary
while True:
now = time.time()
elapsed = int(now - start)
if now > end:
# One last line to client
v_states, valves_s, flows_s = snapshot()
self._notify(f"{elapsed:>6}s | {valves_s:<21} | {flows_s:<20}")
print("→ TIMEOUT: Cooling valves failed to remain OPEN.")
return False
# Live snapshot
v_states, valves_s, flows_s = snapshot()
# Exactly one concise line to client per cycle
self._notify(f"{elapsed:>6}s | {valves_s:<21} | {flows_s:<20}")
both_open = all(s is not None and bool(s) for s in v_states)
if both_open:
if stable_until is None:
stable_until = now + STABILITY
print(f"[WaterCooling] Both valves OPEN → starting {STABILITY}s stability window…")
else:
if now >= stable_until:
print("→ SUCCESS: Valves remained OPEN during stability window.")
return True
else:
if stable_until is not None:
print("[WaterCooling] Valve closed again → restarting stability window.")
stable_until = None
time.sleep(POLL_PERIOD)
def show_all(self):
red = "\x1b[91m"
@@ -256,11 +337,20 @@ def create_dynamic_eps_class():
bold = "\x1b[1m"
cyan = "\x1b[96m"
# ---- New: enum maps for numeric -> string rendering ----
POSITION_ENUM = {
0: "out of beam",
1: "in beam",
}
STRIPE_ENUM = {
0: "Stripe 1 W/B4C",
1: "Stripe 2 NiV/B4C",
}
POSITION_ATTRS = {"DMM1_POSITION", "CCM1_POSITION"}
STRIPE_ATTRS = {"DMM1_STRIPE"}
def safe_get(attr):
"""Get value or None."""
obj = getattr(self, attr, None)
if obj is None:
return None
try:
return obj.get()
except Exception:
@@ -269,11 +359,29 @@ def create_dynamic_eps_class():
def is_bool_like(v):
return isinstance(v, (bool, int)) and v in (0, 1, True, False)
def fmt_value(value, pv, kind):
"""Format value based on semantic kind."""
# ---- Changed: accept attr in formatter so we can apply enum mapping ----
def fmt_value(value, pv, kind, attr):
if value is None:
return f"{red}MISSING{white}"
# ---------- Explicit enum mappings by attribute ----------
if attr in POSITION_ATTRS:
# Position comes as numeric 0/1
try:
iv = int(value)
return POSITION_ENUM.get(iv, f"{iv}")
except Exception:
# Fallback if its already a string or unexpected
return f"{value}"
if attr in STRIPE_ATTRS:
# Stripe comes as numeric 0/1
try:
iv = int(value)
return STRIPE_ENUM.get(iv, f"{iv}")
except Exception:
return f"{value}"
# ------------------- TEMPERATURE -------------------
if kind == "temp" and isinstance(value, (int, float)):
return f"{value:.1f}"
@@ -284,6 +392,7 @@ def create_dynamic_eps_class():
# ------------------- STRINGS -----------------------
if kind in ("string", "position"):
# For other strings, just echo the value
return f"{value}"
# ------------------- SWITCH (ACTIVE/INACTIVE) ------
@@ -298,7 +407,7 @@ def create_dynamic_eps_class():
if kind in ("valve", "shutter") and is_bool_like(value):
return f"{green+'OPEN'+white if value else red+'CLOSED'+white}"
# Flow → OK / FAIL (1=OK, 0=FAIL)
# ------------------- FLOW (OK/FAIL) ----------------
if kind == "flow" and is_bool_like(value):
return f"{green}OK{white}" if bool(value) else f"{red}FAIL{white}"
@@ -311,39 +420,31 @@ def create_dynamic_eps_class():
for section, items in CHANNELS.items():
print(f"\n{bold}{section}{white}")
# Gather row values
rows = []
for it in items:
val = safe_get(it["attr"])
rows.append((it["label"], val, it["pv"], it["kind"], it["attr"]))
# Compute label width
label_width = max(32, *(len(label) for (label, _, _, _, _) in rows))
# Print lines
for label, value, pv, kind, _attr in rows:
fv = fmt_value(value, pv, kind)
print(f" {label:<{label_width}} {fv}")
fv = fmt_value(value, pv, kind, _attr) # <-- pass attr to formatter
print(f" - {label:<{label_width}} {fv}")
# -------------------------------------------------
# Contextual proposal: water_cooling()
# Triggered only in "Cooling Water" if both ECVW are CLOSED
# -------------------------------------------------
if section == "Cooling Water":
v1 = safe_get("OPCSECVW0010")
v2 = safe_get("OPCSECVW0020")
def is_closed(v):
return is_bool_like(v) and (v is False or v == 0)
def closed(v): return is_bool_like(v) and not bool(v)
if is_closed(v1) and is_closed(v2):
if closed(v1) and closed(v2):
print(
f"\n{cyan}Hint:{white} Water cooling valves OP are closed. "
f"You can request them to open them via {bold}dev.x12saEPS.water_cooling_op(){white}."
f"\n{cyan}Hint:{white} Both water cooling valves are CLOSED.\n"
f"You can open them using: {bold}dev.x12saEPS.water_cooling_op(){white}"
)
# ----------------------------------------------------------
# Duplicates / missing PV validation
# Consistency report
# ----------------------------------------------------------
def consistency_report(self, *, verbose=True):
missing = []
@@ -363,6 +464,7 @@ def create_dynamic_eps_class():
if verbose:
print("=== Consistency Report ===")
if missing:
print("\nMissing attributes:")
for sec, a, lbl, pv in missing:
@@ -379,14 +481,26 @@ def create_dynamic_eps_class():
return {"missing_attrs": missing, "duplicate_pvs": dupes}
# Bind methods into class
# Attach methods to class attributes
class_attrs["show_all"] = DynamicMethods.show_all
class_attrs["consistency_report"] = DynamicMethods.consistency_report
class_attrs["water_cooling_op"] = DynamicMethods.water_cooling_op
class_attrs["_notify"] = DynamicMethods._notify
# Create the Device subclass
return type("cSAXSEps", (Device,), class_attrs)
# ----------------------------------------------------------
# Custom __init__ to accept device_manager (BEC)
# ----------------------------------------------------------
def __init__(self, *args, device_manager=None, **kwargs):
super(cls, self).__init__(*args, **kwargs)
self.device_manager = device_manager
# Create class
cls = type("cSAXSEps", (Device,), class_attrs)
setattr(cls, "__init__", __init__)
return cls
# Create final class for BEC to import
# Create final class for BEC import
cSAXSEps = create_dynamic_eps_class()