@@ -0,0 +1,435 @@
""" EPS module for cSAXS beamline: defines the EPS device with its components and methods. """
# fmt: off
# Disable Black formatting for this file to preserve an easier readable structure for the component definitions.
# pylint: disable=line-too-long
from __future__ import annotations
import time
from bec_lib . logger import bec_logger
from ophyd import Component as Cpt
from ophyd import Device , EpicsSignal , EpicsSignalRO , Kind
from ophyd_devices import PSIDeviceBase
logger = bec_logger . logger
# ---------------------------
# Registry: sections/channels
# ---------------------------
class EPSSubDevices ( Device ) :
""" Base class for EPS sub-device components (e.g. alarms, valves, shutters). with common methods if needed. """
def describe ( self ) - > dict :
desc = super ( ) . describe ( )
for walk in self . walk_signals ( ) :
if walk . item . attr_name not in desc :
desc [ walk . item . attr_name ] = walk . item . describe ( )
return desc
class EPSAlarms ( EPSSubDevices ) :
""" EPS alarms at the cSAXS beamline. """
eps_alarm_cnt = Cpt ( EpicsSignalRO , read_pv = " X12SA-EPS-PLC:AlarmCnt_EPS " , add_prefix = ( " " , ) , name = " eps_alarm_cnt " , kind = Kind . omitted , doc = " X12SA EPS Alarm count " , auto_monitor = True , labels = { " alarm " } )
mis_alarm_cnt = Cpt ( EpicsSignalRO , read_pv = " ARS00-MIS-PLC-01:AlarmCnt_Frontends " , add_prefix = ( " " , ) , name = " mis_alarm_cnt " , kind = Kind . omitted , doc = " FrontEnd MIS Alarm count " , auto_monitor = True , labels = { " alarm " } )
class ValvesFrontend ( EPSSubDevices ) :
""" Valves frontend at the cSAXS beamline. """
fe_vvpg_0000 = Cpt ( EpicsSignalRO , read_pv = " X12SA-FE-VVPG-0000:PLC_OPEN " , add_prefix = ( " " , ) , name = " fevvpg0000 " , kind = Kind . omitted , doc = " FE-VVPG-0000 " , auto_monitor = True , labels = { " valve " } )
fe_vvpg_1010 = Cpt ( EpicsSignalRO , read_pv = " X12SA-FE-VVPG-1010:PLC_OPEN " , add_prefix = ( " " , ) , name = " fevvpg1010 " , kind = Kind . omitted , doc = " FE-VVPG-1010 " , auto_monitor = True , labels = { " valve " } )
fe_vvfv_2010 = Cpt ( EpicsSignalRO , read_pv = " X12SA-FE-VVFV-2010:PLC_OPEN " , add_prefix = ( " " , ) , name = " fevvfv2010 " , kind = Kind . omitted , doc = " FE-VVFV-2010 " , auto_monitor = True , labels = { " valve " } )
fe_vvpg_2010 = Cpt ( EpicsSignalRO , read_pv = " X12SA-FE-VVPG-2010:PLC_OPEN " , add_prefix = ( " " , ) , name = " fevvpg2010 " , kind = Kind . omitted , doc = " FE-VVPG-2010 " , auto_monitor = True , labels = { " valve " } )
class ValvesOptics ( EPSSubDevices ) :
""" Valves at the optics hutch. """
op_vvpg_1010 = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-VVPG-1010:PLC_OPEN " , add_prefix = ( " " , ) , name = " opvvpg1010 " , kind = Kind . omitted , doc = " OP-VVPG-1010 " , auto_monitor = True , labels = { " valve " } )
op_vvpg_2010 = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-VVPG-2010:PLC_OPEN " , add_prefix = ( " " , ) , name = " opvvpg2010 " , kind = Kind . omitted , doc = " OP-VVPG-2010 " , auto_monitor = True , labels = { " valve " } )
op_vvpg_3010 = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-VVPG-3010:PLC_OPEN " , add_prefix = ( " " , ) , name = " opvvpg3010 " , kind = Kind . omitted , doc = " OP-VVPG-3010 " , auto_monitor = True , labels = { " valve " } )
op_vvpg_3020 = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-VVPG-3020:PLC_OPEN " , add_prefix = ( " " , ) , name = " opvvpg3020 " , kind = Kind . omitted , doc = " OP-VVPG-3020 " , auto_monitor = True , labels = { " valve " } )
op_vvpg_4010 = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-VVPG-4010:PLC_OPEN " , add_prefix = ( " " , ) , name = " opvvpg4010 " , kind = Kind . omitted , doc = " OP-VVPG-4010 " , auto_monitor = True , labels = { " valve " } )
op_vvpg_5010 = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-VVPG-5010:PLC_OPEN " , add_prefix = ( " " , ) , name = " opvvpg5010 " , kind = Kind . omitted , doc = " OP-VVPG-5010 " , auto_monitor = True , labels = { " valve " } )
op_vvpg_6010 = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-VVPG-6010:PLC_OPEN " , add_prefix = ( " " , ) , name = " opvvpg6010 " , kind = Kind . omitted , doc = " OP-VVPG-6010 " , auto_monitor = True , labels = { " valve " } )
op_vvpg_7010 = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-VVPG-7010:PLC_OPEN " , add_prefix = ( " " , ) , name = " opvvpg7010 " , kind = Kind . omitted , doc = " OP-VVPG-7010 " , auto_monitor = True , labels = { " valve " } )
class ValvesEndstation ( EPSSubDevices ) :
""" Endstation valves at the cSAXS beamline. """
es_vvpg_1010 = Cpt ( EpicsSignalRO , read_pv = " X12SA-ES-VVPG-1010:PLC_OPEN " , add_prefix = ( " " , ) , name = " esvvpg1010 " , kind = Kind . omitted , doc = " ES-VVPG-1010 " , auto_monitor = True , labels = { " valve " } )
class ShuttersFrontend ( EPSSubDevices ) :
""" Shutters frontend. """
fe_psh1 = Cpt ( EpicsSignalRO , read_pv = " X12SA-FE-PSH1-EMLS-0010:OPEN " , add_prefix = ( " " , ) , name = " fepsh1 " , kind = Kind . omitted , doc = " FE-PSH1-EMLS-0010 " , auto_monitor = True , labels = { " shutter " } )
fe_sto1 = Cpt ( EpicsSignalRO , read_pv = " X12SA-FE-STO1-EMLS-0010:OPEN " , add_prefix = ( " " , ) , name = " festo1 " , kind = Kind . omitted , doc = " FE-STO1-EMLS-0010 " , auto_monitor = True , labels = { " shutter " } )
class ShuttersEndstation ( EPSSubDevices ) :
""" Shutters at the endstation. """
es_psh17010 = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-PSH1-EMLS-7010:OPEN " , add_prefix = ( " " , ) , name = " espsh17010 " , kind = Kind . omitted , doc = " OP-PSH1-EMLS-7010 " , auto_monitor = True , labels = { " shutter " } )
class DMMMonochromator ( EPSSubDevices ) :
""" DMM monochromator signals at the cSAXS beamline. """
dmm_temp_surface_1 = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-DMM-ETTC-3010:TEMP " , add_prefix = ( " " , ) , name = " dmm_temp_surface_1 " , kind = Kind . omitted , doc = " DMM Temp Surface 1 " , auto_monitor = True , labels = { " temp " } )
dmm_temp_surface_2 = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-DMM-ETTC-3020:TEMP " , add_prefix = ( " " , ) , name = " dmm_temp_surface_2 " , kind = Kind . omitted , doc = " DMM Temp Surface 2 " , auto_monitor = True , labels = { " temp " } )
dmm_temp_shield_1_disaster = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-DMM-ETTC-3030:TEMP " , add_prefix = ( " " , ) , name = " dmm_temp_shield_1_disaster " , kind = Kind . omitted , doc = " DMM Temp Shield 1 (disaster) " , auto_monitor = True , labels = { " temp " } )
dmm_temp_shield_2_disaster = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-DMM-ETTC-3040:TEMP " , add_prefix = ( " " , ) , name = " dmm_temp_shield_2_disaster " , kind = Kind . omitted , doc = " DMM Temp Shield 2 (disaster) " , auto_monitor = True , labels = { " temp " } )
dmm_translation_thru = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-DMM-EMLS-3010:THRU " , add_prefix = ( " " , ) , name = " dmm_translation_thru " , kind = Kind . omitted , doc = " DMM Translation ThruPos " , auto_monitor = True , labels = { " switch " } )
dmm_translation_in = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-DMM-EMLS-3020:IN " , add_prefix = ( " " , ) , name = " dmm_translation_in " , kind = Kind . omitted , doc = " DMM Translation InPos " , auto_monitor = True , labels = { " switch " } )
dmm_bragg_thru = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-DMM-EMLS-3030:THRU " , add_prefix = ( " " , ) , name = " dmm_bragg_thru " , kind = Kind . omitted , doc = " DMM Bragg ThruPos " , auto_monitor = True , labels = { " switch " } )
dmm_bragg_in = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-DMM-EMLS-3040:IN " , add_prefix = ( " " , ) , name = " dmm_bragg_in " , kind = Kind . omitted , doc = " DMM Bragg InPos " , auto_monitor = True , labels = { " switch " } )
dmm_heater_fault_xtal_1 = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-DMM-EMSW-3050:SWITCH " , add_prefix = ( " " , ) , name = " dmm_heater_fault_xtal_1 " , kind = Kind . omitted , doc = " DMM Heater Fault XTAL 1 " , auto_monitor = True , labels = { " fault " } )
dmm_heater_fault_xtal_2 = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-DMM-EMSW-3060:SWITCH " , add_prefix = ( " " , ) , name = " dmm_heater_fault_xtal_2 " , kind = Kind . omitted , doc = " DMM Heater Fault XTAL 2 " , auto_monitor = True , labels = { " fault " } )
dmm_heater_fault_support_1 = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-DMM-EMSW-3070:SWITCH " , add_prefix = ( " " , ) , name = " dmm_heater_fault_support_1 " , kind = Kind . omitted , doc = " DMM Heater Fault Support 1 " , auto_monitor = True , labels = { " fault " } )
dmm_energy = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-DMM1:ENERGY-GET " , add_prefix = ( " " , ) , name = " dmm_energy " , kind = Kind . omitted , doc = " DMM Energy " , auto_monitor = True , labels = { " energy " } )
dmm_position = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-DMM1:POSITION " , add_prefix = ( " " , ) , name = " dmm_position " , kind = Kind . omitted , doc = " DMM Position " , auto_monitor = True , labels = { " string " } )
dmm_stripe = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-DMM1:STRIPE " , add_prefix = ( " " , ) , name = " dmm_stripe " , kind = Kind . omitted , doc = " DMM Stripe " , auto_monitor = True , labels = { " string " } )
class CCMMonochromator ( EPSSubDevices ) :
""" CCM monochromator signals at the cSAXS beamline. """
ccm_temp_crystal = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-CCM-ETTC-4010:TEMP " , add_prefix = ( " " , ) , name = " ccm_temp_crystal " , kind = Kind . omitted , doc = " CCM Temp Crystal " , auto_monitor = True , labels = { " temp " } )
ccm_temp_shield_disaster = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-CCM-ETTC-4020:TEMP " , add_prefix = ( " " , ) , name = " ccm_temp_shield_disaster " , kind = Kind . omitted , doc = " CCM Temp Shield (disaster) " , auto_monitor = True , labels = { " temp " } )
ccm_heater_fault_1 = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-CCM-EMSW-4010:SWITCH " , add_prefix = ( " " , ) , name = " ccm_heater_fault_1 " , kind = Kind . omitted , doc = " CCM Heater Fault 1 " , auto_monitor = True , labels = { " fault " } )
ccm_heater_fault_2 = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-CCM-EMSW-4020:SWITCH " , add_prefix = ( " " , ) , name = " ccm_heater_fault_2 " , kind = Kind . omitted , doc = " CCM Heater Fault 2 " , auto_monitor = True , labels = { " fault " } )
ccm_heater_fault_3 = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-CCM-EMSW-4030:SWITCH " , add_prefix = ( " " , ) , name = " ccm_heater_fault_3 " , kind = Kind . omitted , doc = " CCM Heater Fault 3 " , auto_monitor = True , labels = { " fault " } )
ccm_energy = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-CCM1:ENERGY-GET " , add_prefix = ( " " , ) , name = " ccm_energy " , kind = Kind . omitted , doc = " CCM Energy " , auto_monitor = True , labels = { " energy " } )
ccm_position = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-CCM1:POSITION " , add_prefix = ( " " , ) , name = " ccm_position " , kind = Kind . omitted , doc = " CCM Position " , auto_monitor = True , labels = { " string " } )
class CoolingWater ( EPSSubDevices ) :
""" Cooling water signals at the cSAXS beamline. """
op_sl1_efsw_2010_flow = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-SL1-EFSW-2010:FLOW " , add_prefix = ( " " , ) , name = " op_sl1_efsw_2010_flow " , kind = Kind . omitted , doc = " OP-SL1-EFSW-2010 " , auto_monitor = True , labels = { " flow " } )
op_sl2_efsw_2010_flow = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-SL2-EFSW-2010:FLOW " , add_prefix = ( " " , ) , name = " op_sl2_efsw_2010_flow " , kind = Kind . omitted , doc = " OP-SL2-EFSW-2010 " , auto_monitor = True , labels = { " flow " } )
op_eb1_efsw_5010_flow = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-EB1-EFSW-5010:FLOW " , add_prefix = ( " " , ) , name = " op_eb1_efsw_5010_flow " , kind = Kind . omitted , doc = " OP-EB1-EFSW-5010 " , auto_monitor = True , labels = { " flow " } )
op_eb1_efsw_5020_flow = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-EB1-EFSW-5020:FLOW " , add_prefix = ( " " , ) , name = " op_eb1_efsw_5020_flow " , kind = Kind . omitted , doc = " OP-EB1-EFSW-5020 " , auto_monitor = True , labels = { " flow " } )
op_sl3_efsw_5010_flow = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-SL3-EFSW-5010:FLOW " , add_prefix = ( " " , ) , name = " op_sl3_efsw_5010_flow " , kind = Kind . omitted , doc = " OP-SL3-EFSW-5010 " , auto_monitor = True , labels = { " flow " } )
op_kb_efsw_6010_flow = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-KB-EFSW-6010:FLOW " , add_prefix = ( " " , ) , name = " op_kb_efsw_6010_flow " , kind = Kind . omitted , doc = " OP-KB-EFSW-6010 " , auto_monitor = True , labels = { " flow " } )
op_psh1_efsw_7010_flow = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-PSH1-EFSW-7010:FLOW " , add_prefix = ( " " , ) , name = " op_psh1_efsw_7010_flow " , kind = Kind . omitted , doc = " OP-PSH1-EFSW-7010 " , auto_monitor = True , labels = { " flow " } )
es_eb2_efsw_1010_flow = Cpt ( EpicsSignalRO , read_pv = " X12SA-ES-EB2-EFSW-1010:FLOW " , add_prefix = ( " " , ) , name = " es_eb2_efsw_1010_flow " , kind = Kind . omitted , doc = " ES-EB2-EFSW-1010 " , auto_monitor = True , labels = { " flow " } )
op_cs_ecvw_0010 = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-CS-ECVW-0010:PLC_OPEN " , add_prefix = ( " " , ) , name = " op_cs_ecvw_0010 " , kind = Kind . omitted , doc = " OP-CS-ECVW-0010 " , auto_monitor = True , labels = { " valve " } )
op_cs_ecvw_0020 = Cpt ( EpicsSignalRO , read_pv = " X12SA-OP-CS-ECVW-0020:PLC_OPEN " , add_prefix = ( " " , ) , name = " op_cs_ecvw_0020 " , kind = Kind . omitted , doc = " OP-CS-ECVW-0020 " , auto_monitor = True , labels = { " valve " } )
class EPS ( PSIDeviceBase ) :
""" EPS device for the cSAXS beamline. """
USER_ACCESS = [
" show_all " ,
" water_cooling_op " ,
]
alarms = Cpt ( EPSAlarms , name = " alarms " , doc = " EPS Alarms " )
valves_frontend = Cpt ( ValvesFrontend , name = " valves_frontend " , doc = " Valves Frontend " )
valves_optics = Cpt ( ValvesOptics , name = " valves_optics " , doc = " Valves Optics Hutch " )
valves_es = Cpt ( ValvesEndstation , name = " valves_es " , doc = " Valves ES Hutch " )
shutters_frontend = Cpt ( ShuttersFrontend , name = " shutters_frontend " , doc = " Shutters Frontend " )
shutters_es = Cpt ( ShuttersEndstation , name = " shutters_es " , doc = " Shutters Endstation " )
dmm_monochromator = Cpt ( DMMMonochromator , name = " dmm_monochromator " , doc = " DMM Monochromator " )
ccm_monochromator = Cpt ( CCMMonochromator , name = " ccm_monochromator " , doc = " CCM Monochromator " )
cooling_water = Cpt ( CoolingWater , name = " cooling_water " , doc = " Cooling Water " )
# Acknowledgment signals for PLC communication (if needed for future use)
ackerr = Cpt ( EpicsSignal , read_pv = " X12SA-EPS-PLC:ACKERR-REQUEST " , add_prefix = ( " " , ) , name = " ackerr " , kind = Kind . omitted , doc = " ACKERR request - OP-CS-ECVW-0020 " , auto_monitor = True , labels = { " request " } )
request = Cpt ( EpicsSignal , read_pv = " X12SA-OP-CS-ECVW:PLC_REQUEST " , add_prefix = ( " " , ) , name = " op_cs_ecvw_request " , kind = Kind . omitted , doc = " PLC request - OP-CS-ECVW-PLC_REQUEST " , auto_monitor = True , labels = { " request " } )
def _notify ( self , msg : str , show_as_client_msg : bool = True ) :
""" Utility method to print a message, and optionally send it to the client UI if it should be shown also as a client message. """
try :
if show_as_client_msg :
self . device_manager . connector . send_client_info ( msg , scope = " " , show_asap = True )
else :
print ( msg )
except Exception :
logger . error ( f " Failed to send client message, falling back to print: { msg } " )
print ( str ( msg ) )
# ----------------------------------------------------------
# Water cooling operation
# ----------------------------------------------------------
def safe_get ( self , sig , default = None ) :
""" Helper method to safely get a signal value, returning a default if there ' s an error. """
try :
return sig . get ( )
except Exception as ex :
logger . warning ( f " Failed to get signal { sig . pvname } : { ex } " )
return default
def water_cooling_op ( self ) :
"""
Open ECVW valves, reset EPS alarms, monitor for 20s,
then ensure stability (valves remain open) for 10s.
All messages sent to client.
"""
POLL_PERIOD = 2
TIMEOUT = 20
STABILITY = 15
self . _notify ( " === Water Cooling Operation === " )
# --- Signals ---
eps_alarm_sig = self . alarms . eps_alarm_cnt
ackerr = self . ackerr
request = self . request
valves = [ self . cooling_water . op_cs_ecvw_0010 , self . cooling_water . op_cs_ecvw_0020 ]
# Flow channels list extracted from CHANNELS
flow_items = [ walk . item for walk in self . cooling_water . walk_signals ( ) if " flow " in walk . item . _ophyd_labels_ ]
# --- Step 1: EPS alarm reset ---
alarm_value = self . safe_get ( eps_alarm_sig , 0 )
if alarm_value and alarm_value > 0 :
self . _notify ( f " [WaterCooling] EPS alarms present ( { alarm_value } ) → resetting… " )
try :
ackerr . put ( 1 )
except Exception as ex :
self . _notify ( f " [WaterCooling] WARNING: ACKERR write failed: { ex } " )
time . sleep ( 0.3 )
else :
self . _notify ( " [WaterCooling] No EPS alarms detected. " )
# --- Step 2: Issue open request ---
self . _notify ( " [WaterCooling] Sending cooling-valve OPEN request… " )
try :
request . put ( 1 )
except Exception as ex :
self . _notify ( f " [WaterCooling] ERROR: Failed to send OPEN request: { ex } " )
return False
# --- Step 3: Monitoring loop (clean client table output) ---
start = time . time ( )
end = start + TIMEOUT
stable_until = None
# Print (server-side) header once
print ( " Monitoring valves and flow sensors... " )
print ( f " Valves: { valves [ 0 ] . attr_name [ - 4 : ] } , { valves [ 1 ] . attr_name [ - 4 : ] } " )
print ( f " Note: stability requires valves to remain OPEN for { STABILITY } seconds. " )
# One table header to the client (via device manager)
# Fixed-width columns for alignment in monospaced UI
table_header = f " { ' Time ' : >6 } | { ' Valves ' : <21 } | { ' Flows (OK/FAIL/N/A) ' : <20 } "
self . _notify ( table_header )
def snapshot ( ) :
# Valve snapshot
v_states = [ self . safe_get ( v , None ) for v in valves ]
v1 = f " { valves [ 0 ] . attr_name [ - 4 : ] } = " + ( " OPEN " if v_states [ 0 ] is True or v_states [ 0 ] == 1 else " CLOSED " if v_states [ 0 ] is False or v_states [ 0 ] == 0 else " N/A " )
v2 = f " { valves [ 1 ] . attr_name [ - 4 : ] } = " + ( " OPEN " if v_states [ 1 ] is True or v_states [ 1 ] == 1 else " CLOSED " if v_states [ 1 ] is False or v_states [ 1 ] == 0 else " N/A " )
# 2 valves with a single space between => width ~ 21
valve_str = f " { v1 } { v2 } "
# Flow summary: OK/FAIL/N/A counts (compact)
flow_states = [ ]
for fsig in flow_items :
fval = self . safe_get ( fsig , None )
flow_states . append ( True if fval in ( 1 , True ) else False if fval in ( 0 , False ) else None )
ok = sum ( 1 for f in flow_states if f is True )
fail = sum ( 1 for f in flow_states if f is False )
na = sum ( 1 for f in flow_states if f is None )
flow_summary = f " { ok : >2 } / { fail : >2 } / { na : >2 } "
return v_states , valve_str , flow_summary
while True :
# TODO Consider adding a timeout to avoid infinite loop.
now = time . time ( )
elapsed = int ( now - start )
if now > end :
# One last line to client
v_states , valves_s , flows_s = snapshot ( )
self . _notify ( f " { elapsed : >6 } s | { valves_s : <21 } | { flows_s : <20 } " )
print ( " → TIMEOUT: Cooling valves failed to remain OPEN. " )
return False
# Live snapshot
v_states , valves_s , flows_s = snapshot ( )
# Exactly one concise line to client per cycle
self . _notify ( f " { elapsed : >6 } s | { valves_s : <21 } | { flows_s : <20 } " )
both_open = all ( s is not None and bool ( s ) for s in v_states )
if both_open :
if stable_until is None :
stable_until = now + STABILITY
print ( f " [WaterCooling] Both valves OPEN → starting { STABILITY } s stability window… " )
else :
if now > = stable_until :
print ( " → SUCCESS: Valves remained OPEN during stability window. " )
return True
else :
if stable_until is not None :
print ( " [WaterCooling] Valve closed again → restarting stability window. " )
stable_until = None
time . sleep ( POLL_PERIOD )
def show_all ( self ) :
red = " \x1b [91m "
green = " \x1b [92m "
white = " \x1b [0m "
bold = " \x1b [1m "
cyan = " \x1b [96m "
# ---- New: enum maps for numeric -> string rendering ----
POSITION_ENUM = { 0 : " out of beam " , 1 : " in beam " }
STRIPE_ENUM = { 0 : " Stripe 1 W/B4C " , 1 : " Stripe 2 NiV/B4C " }
POSITION_ATTRS = { self . dmm_monochromator . dmm_position . attr_name , self . ccm_monochromator . ccm_position . attr_name }
STRIPE_ATTRS = { self . dmm_monochromator . dmm_stripe . attr_name }
def is_bool_like ( v ) :
return isinstance ( v , ( bool , int ) ) and v in ( 0 , 1 , True , False )
# ---- Changed: accept attr in formatter so we can apply enum mapping ----
def fmt_value ( value : any , signal : EpicsSignalRO ) :
if value is None :
return f " { red } MISSING { white } "
attr = signal . attr_name
# ---------- Explicit enum mappings by attribute ----------
if attr in POSITION_ATTRS :
# Position comes as numeric 0/1
try :
iv = int ( value )
return POSITION_ENUM . get ( iv , f " { iv } " )
except Exception :
# Fallback if it’ s already a string or unexpected
return f " { value } "
if attr in STRIPE_ATTRS :
# Stripe comes as numeric 0/1
try :
iv = int ( value )
return STRIPE_ENUM . get ( iv , f " { iv } " )
except Exception :
return f " { value } "
# ------------------- TEMPERATURE -------------------
if " temp " in signal . _ophyd_labels_ and isinstance ( value , ( int , float ) ) :
return f " { value : .1f } "
# ------------------- ENERGY ------------------------
if " energy " in signal . _ophyd_labels_ and isinstance ( value , ( int , float ) ) :
return f " { value : .4f } "
# ------------------- STRINGS -----------------------
if " string " in signal . _ophyd_labels_ or " position " in signal . _ophyd_labels_ :
# For other strings, just echo the value
return f " { value } "
# ------------------- SWITCH (ACTIVE/INACTIVE) ------
if " switch " in signal . _ophyd_labels_ and is_bool_like ( value ) :
return f " { green + ' ACTIVE ' + white if value else red + ' INACTIVE ' + white } "
# ------------------- FAULT (OK/FAULT) --------------
if " fault " in signal . _ophyd_labels_ and is_bool_like ( value ) :
return f " { green + ' OK ' + white if not value else red + ' FAULT ' + white } "
# ------------------- VALVE/SHUTTER -----------------
if ( " valve " in signal . _ophyd_labels_ or " shutter " in signal . _ophyd_labels_ ) and is_bool_like ( value ) :
return f " { green + ' OPEN ' + white if value else red + ' CLOSED ' + white } "
# ------------------- FLOW (OK/FAIL) ----------------
if " flow " in signal . _ophyd_labels_ and is_bool_like ( value ) :
return f " { green } OK { white } " if bool ( value ) else f " { red } FAIL { white } "
# ------------------- FALLBACK -----------------------
return f " { value } "
# ------------------- PRINT START ---------------------
print ( f " { bold } X12SA EPS status { white } " )
for name , component in self . _sig_attrs . items ( ) :
sub_device = getattr ( self , name )
rows = [ ]
# Only print sub-devices, not individual request signals
if not isinstance ( sub_device , Device ) :
continue
print ( f " \n { bold } { component . doc } { white } " )
for sub_walk in sub_device . walk_components ( ) :
cpt : Cpt = sub_walk . item
it : EpicsSignalRO = getattr ( sub_device , cpt . attr )
val = self . safe_get ( it )
rows . append ( ( cpt . doc , val , it ) )
label_width = max ( 32 , * ( len ( label ) for ( label , _ , _ ) in rows ) )
for label , value , it in rows :
fv = fmt_value ( value , it ) # <-- pass attr to formatter
print ( f " - { label : < { label_width } } { fv } " )
if sub_device . attr_name == " cooling_water " :
v1 = self . safe_get ( self . cooling_water . op_cs_ecvw_0010 )
v2 = self . safe_get ( self . cooling_water . op_cs_ecvw_0020 )
def closed ( v ) :
return is_bool_like ( v ) and not bool ( v )
if closed ( v1 ) and closed ( v2 ) :
print ( f " \n { cyan } Hint: { white } Both water cooling valves are CLOSED. \n " f " You can open them using: { bold } dev.x12saEPS.water_cooling_op() { white } " )
# fmt: on
# ----------------------------------------------------------
# Consistency report
# ----------------------------------------------------------
# def consistency_report(self, *, verbose=True):
# missing = []
# dupes = []
# seen = {}
# for sub_device in self.walk_components():
# section = sub_device.name
# for walk in sub_device.walk_components():
# cpt: Cpt = walk.ancestors[-1]
# it: EpicsSignalRO = walk.item
# if not hasattr(self, it["attr"]):
# missing.append((section, it["attr"], it["label"], it["pv"]))
# pv = it["pv"]
# if pv in seen:
# dupes.append((pv, seen[pv], (section, it["attr"], it["label"])))
# else:
# seen[pv] = (section, it["attr"], it["label"])
# if verbose:
# print("=== Consistency Report ===")
# if missing:
# print("\nMissing attributes:")
# for sec, a, lbl, pv in missing:
# print(f" - [{sec}] {a} {lbl} pv={pv}")
# else:
# print("\nNo missing attributes.")
# if dupes:
# print("\nDuplicate PVs:")
# for pv, f1, f2 in dupes:
# print(f" {pv} → {f1} AND {f2}")
# else:
# print("\nNo duplicate PVs.")
# return {"missing_attrs": missing, "duplicate_pvs": dupes}