Compare commits

...

22 Commits

Author SHA1 Message Date
9db56f5273 refactor: Eiger refactoring, fix test and add docs.
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m34s
CI for csaxs_bec / test (push) Successful in 1m33s
2026-02-16 14:13:55 +01:00
705df4b253 fix: cleanup of eiger integration 2026-02-16 14:13:55 +01:00
181b57494b fix: Cleanup of jungfrau_joch_client 2026-02-16 14:13:55 +01:00
efd51462fc refactor(jungfraujoch-preview): Improve Jungfraujoch Preview module 2026-02-16 14:13:55 +01:00
8a69c7aa36 fix(config): add eiger 9m to bl_detectors 2026-02-16 14:13:55 +01:00
b19bfb7ca4 fix: improve integration with feedback from the beamline 2026-02-16 14:13:55 +01:00
b818181da2 fix(mcs): wrap _progress_update callback in try except. 2026-02-16 14:13:55 +01:00
303929f8e6 fix: Cleanup after tests at the beamline 2026-02-16 14:13:55 +01:00
9f5799385c fix(eiger): add dectris-decompression and dependencies. 2026-02-16 14:13:55 +01:00
x12sa
cb968abe73 minor changes from the beamline 2026-02-16 14:13:55 +01:00
x12sa
5ab763ad38 test at beamline
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m28s
CI for csaxs_bec / test (push) Successful in 1m33s
2026-02-16 12:33:15 +01:00
fa35ddf1a9 refactor(eps): deactivate black for eps.py to keep signal component formatted in single line. 2026-02-16 12:33:15 +01:00
501bc52867 fix(eps): Fix tests and eps integration 2026-02-16 12:33:15 +01:00
f35c51efa7 test(eps): Add tests for EPS device 2026-02-16 12:33:15 +01:00
0c81c718d8 refactor(eps): Refactoring of EPS device from cSAXS 2026-02-16 12:33:15 +01:00
x12sa
ce88310125 fixed enable of water cooling 2026-02-16 12:33:15 +01:00
x12sa
e860571a64 water on method 2026-02-16 12:33:15 +01:00
x12sa
24bd5a71bc added water cooling channels 2026-02-16 12:33:15 +01:00
x12sa
541eb97096 added "kind" to channel list 2026-02-16 12:33:15 +01:00
x12sa
7911717142 create class dynamically, monos added 2026-02-16 12:33:15 +01:00
x12sa
bdf94533a5 first version of csaxs eps 2026-02-16 12:33:15 +01:00
5784331073 test(conftest): Add monkey patch for SocketSignal Readback_Timeout
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m35s
2026-02-16 11:10:47 +01:00
13 changed files with 1160 additions and 165 deletions

View File

@@ -9,6 +9,17 @@ eiger_1_5:
readoutPriority: async
softwareTrigger: False
eiger_9:
description: Eiger 9M detector
deviceClass: csaxs_bec.devices.jungfraujoch.eiger_9m.Eiger9M
deviceConfig:
detector_distance: 100
beam_center: [0, 0]
onFailure: raise
enabled: true
readoutPriority: async
softwareTrigger: False
ids_cam:
description: IDS camera for live image acquisition
deviceClass: csaxs_bec.devices.ids_cameras.IDSCamera

View File

@@ -1 +1,15 @@
############################################################
############################################################
##################### EPS ##################################
############################################################
x12saEPS:
description: X12SA EPS info and control
deviceClass: csaxs_bec.devices.epics.eps.EPS
deviceConfig: {}
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline

View File

@@ -0,0 +1,435 @@
"""EPS module for cSAXS beamline: defines the EPS device with its components and methods."""
# fmt: off
# Disable Black formatting for this file to preserve an easier readable structure for the component definitions.
# pylint: disable=line-too-long
from __future__ import annotations
import time
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, EpicsSignal, EpicsSignalRO, Kind
from ophyd_devices import PSIDeviceBase
logger = bec_logger.logger
# ---------------------------
# Registry: sections/channels
# ---------------------------
class EPSSubDevices(Device):
    """Base class for EPS sub-devices (alarms, valves, shutters, ...).

    Extends ``Device.describe`` so that every signal reachable via
    ``walk_signals`` is present in the description, keyed by its attribute
    name; entries already produced by the parent class are left untouched.
    """

    def describe(self) -> dict:
        description = super().describe()
        for walked in self.walk_signals():
            signal = walked.item
            key = signal.attr_name
            if key in description:
                continue
            description[key] = signal.describe()
        return description
class EPSAlarms(EPSSubDevices):
    """EPS alarms at the cSAXS beamline.

    Read-only alarm counters; ``kind=Kind.omitted`` keeps them out of the
    default read/describe set of the parent device.
    """
    eps_alarm_cnt = Cpt(EpicsSignalRO, read_pv="X12SA-EPS-PLC:AlarmCnt_EPS", add_prefix=("",), name="eps_alarm_cnt", kind=Kind.omitted, doc="X12SA EPS Alarm count", auto_monitor=True, labels={"alarm"})
    mis_alarm_cnt = Cpt(EpicsSignalRO, read_pv="ARS00-MIS-PLC-01:AlarmCnt_Frontends", add_prefix=("",), name="mis_alarm_cnt", kind=Kind.omitted, doc="FrontEnd MIS Alarm count", auto_monitor=True, labels={"alarm"})
class ValvesFrontend(EPSSubDevices):
    """Frontend vacuum valves at the cSAXS beamline.

    Each signal reads the PLC ``OPEN`` status bit of one valve.
    """
    fe_vvpg_0000 = Cpt(EpicsSignalRO, read_pv="X12SA-FE-VVPG-0000:PLC_OPEN", add_prefix=("",), name="fevvpg0000", kind=Kind.omitted, doc="FE-VVPG-0000", auto_monitor=True, labels={"valve"})
    fe_vvpg_1010 = Cpt(EpicsSignalRO, read_pv="X12SA-FE-VVPG-1010:PLC_OPEN", add_prefix=("",), name="fevvpg1010", kind=Kind.omitted, doc="FE-VVPG-1010", auto_monitor=True, labels={"valve"})
    fe_vvfv_2010 = Cpt(EpicsSignalRO, read_pv="X12SA-FE-VVFV-2010:PLC_OPEN", add_prefix=("",), name="fevvfv2010", kind=Kind.omitted, doc="FE-VVFV-2010", auto_monitor=True, labels={"valve"})
    fe_vvpg_2010 = Cpt(EpicsSignalRO, read_pv="X12SA-FE-VVPG-2010:PLC_OPEN", add_prefix=("",), name="fevvpg2010", kind=Kind.omitted, doc="FE-VVPG-2010", auto_monitor=True, labels={"valve"})
class ValvesOptics(EPSSubDevices):
    """Vacuum valves in the optics hutch.

    Each signal reads the PLC ``OPEN`` status bit of one valve.
    """
    op_vvpg_1010 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-VVPG-1010:PLC_OPEN", add_prefix=("",), name="opvvpg1010", kind=Kind.omitted, doc="OP-VVPG-1010", auto_monitor=True, labels={"valve"})
    op_vvpg_2010 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-VVPG-2010:PLC_OPEN", add_prefix=("",), name="opvvpg2010", kind=Kind.omitted, doc="OP-VVPG-2010", auto_monitor=True, labels={"valve"})
    op_vvpg_3010 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-VVPG-3010:PLC_OPEN", add_prefix=("",), name="opvvpg3010", kind=Kind.omitted, doc="OP-VVPG-3010", auto_monitor=True, labels={"valve"})
    op_vvpg_3020 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-VVPG-3020:PLC_OPEN", add_prefix=("",), name="opvvpg3020", kind=Kind.omitted, doc="OP-VVPG-3020", auto_monitor=True, labels={"valve"})
    op_vvpg_4010 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-VVPG-4010:PLC_OPEN", add_prefix=("",), name="opvvpg4010", kind=Kind.omitted, doc="OP-VVPG-4010", auto_monitor=True, labels={"valve"})
    op_vvpg_5010 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-VVPG-5010:PLC_OPEN", add_prefix=("",), name="opvvpg5010", kind=Kind.omitted, doc="OP-VVPG-5010", auto_monitor=True, labels={"valve"})
    op_vvpg_6010 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-VVPG-6010:PLC_OPEN", add_prefix=("",), name="opvvpg6010", kind=Kind.omitted, doc="OP-VVPG-6010", auto_monitor=True, labels={"valve"})
    op_vvpg_7010 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-VVPG-7010:PLC_OPEN", add_prefix=("",), name="opvvpg7010", kind=Kind.omitted, doc="OP-VVPG-7010", auto_monitor=True, labels={"valve"})
class ValvesEndstation(EPSSubDevices):
    """Endstation vacuum valves at the cSAXS beamline (PLC ``OPEN`` status)."""
    es_vvpg_1010 = Cpt(EpicsSignalRO, read_pv="X12SA-ES-VVPG-1010:PLC_OPEN", add_prefix=("",), name="esvvpg1010", kind=Kind.omitted, doc="ES-VVPG-1010", auto_monitor=True, labels={"valve"})
class ShuttersFrontend(EPSSubDevices):
    """Frontend shutters (photon shutter and stopper) ``OPEN`` status."""
    fe_psh1 = Cpt(EpicsSignalRO, read_pv="X12SA-FE-PSH1-EMLS-0010:OPEN", add_prefix=("",), name="fepsh1", kind=Kind.omitted, doc="FE-PSH1-EMLS-0010", auto_monitor=True, labels={"shutter"})
    fe_sto1 = Cpt(EpicsSignalRO, read_pv="X12SA-FE-STO1-EMLS-0010:OPEN", add_prefix=("",), name="festo1", kind=Kind.omitted, doc="FE-STO1-EMLS-0010", auto_monitor=True, labels={"shutter"})
class ShuttersEndstation(EPSSubDevices):
    """Endstation shutter ``OPEN`` status."""
    es_psh17010 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-PSH1-EMLS-7010:OPEN", add_prefix=("",), name="espsh17010", kind=Kind.omitted, doc="OP-PSH1-EMLS-7010", auto_monitor=True, labels={"shutter"})
class DMMMonochromator(EPSSubDevices):
    """DMM (double multilayer monochromator) EPS signals at cSAXS.

    Groups temperature sensors, position limit switches, heater fault
    switches and the energy/position/stripe readbacks of the DMM.
    """
    # Temperatures (surface and disaster shields)
    dmm_temp_surface_1 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-ETTC-3010:TEMP", add_prefix=("",), name="dmm_temp_surface_1", kind=Kind.omitted, doc="DMM Temp Surface 1", auto_monitor=True, labels={"temp"})
    dmm_temp_surface_2 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-ETTC-3020:TEMP", add_prefix=("",), name="dmm_temp_surface_2", kind=Kind.omitted, doc="DMM Temp Surface 2", auto_monitor=True, labels={"temp"})
    dmm_temp_shield_1_disaster = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-ETTC-3030:TEMP", add_prefix=("",), name="dmm_temp_shield_1_disaster", kind=Kind.omitted, doc="DMM Temp Shield 1 (disaster)", auto_monitor=True, labels={"temp"})
    dmm_temp_shield_2_disaster = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-ETTC-3040:TEMP", add_prefix=("",), name="dmm_temp_shield_2_disaster", kind=Kind.omitted, doc="DMM Temp Shield 2 (disaster)", auto_monitor=True, labels={"temp"})
    # Translation / Bragg in- and thru-position limit switches
    dmm_translation_thru = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-EMLS-3010:THRU", add_prefix=("",), name="dmm_translation_thru", kind=Kind.omitted, doc="DMM Translation ThruPos", auto_monitor=True, labels={"switch"})
    dmm_translation_in = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-EMLS-3020:IN", add_prefix=("",), name="dmm_translation_in", kind=Kind.omitted, doc="DMM Translation InPos", auto_monitor=True, labels={"switch"})
    dmm_bragg_thru = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-EMLS-3030:THRU", add_prefix=("",), name="dmm_bragg_thru", kind=Kind.omitted, doc="DMM Bragg ThruPos", auto_monitor=True, labels={"switch"})
    dmm_bragg_in = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-EMLS-3040:IN", add_prefix=("",), name="dmm_bragg_in", kind=Kind.omitted, doc="DMM Bragg InPos", auto_monitor=True, labels={"switch"})
    # Heater fault switches
    dmm_heater_fault_xtal_1 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-EMSW-3050:SWITCH", add_prefix=("",), name="dmm_heater_fault_xtal_1", kind=Kind.omitted, doc="DMM Heater Fault XTAL 1", auto_monitor=True, labels={"fault"})
    dmm_heater_fault_xtal_2 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-EMSW-3060:SWITCH", add_prefix=("",), name="dmm_heater_fault_xtal_2", kind=Kind.omitted, doc="DMM Heater Fault XTAL 2", auto_monitor=True, labels={"fault"})
    dmm_heater_fault_support_1 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-EMSW-3070:SWITCH", add_prefix=("",), name="dmm_heater_fault_support_1", kind=Kind.omitted, doc="DMM Heater Fault Support 1", auto_monitor=True, labels={"fault"})
    # Readbacks: energy, position (in/out of beam) and multilayer stripe
    dmm_energy = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM1:ENERGY-GET", add_prefix=("",), name="dmm_energy", kind=Kind.omitted, doc="DMM Energy", auto_monitor=True, labels={"energy"})
    dmm_position = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM1:POSITION", add_prefix=("",), name="dmm_position", kind=Kind.omitted, doc="DMM Position", auto_monitor=True, labels={"string"})
    dmm_stripe = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM1:STRIPE", add_prefix=("",), name="dmm_stripe", kind=Kind.omitted, doc="DMM Stripe", auto_monitor=True, labels={"string"})
class CCMMonochromator(EPSSubDevices):
    """CCM (channel-cut monochromator) EPS signals at cSAXS.

    Groups temperature sensors, heater fault switches and the
    energy/position readbacks of the CCM.
    """
    ccm_temp_crystal = Cpt(EpicsSignalRO, read_pv="X12SA-OP-CCM-ETTC-4010:TEMP", add_prefix=("",), name="ccm_temp_crystal", kind=Kind.omitted, doc="CCM Temp Crystal", auto_monitor=True, labels={"temp"})
    ccm_temp_shield_disaster = Cpt(EpicsSignalRO, read_pv="X12SA-OP-CCM-ETTC-4020:TEMP", add_prefix=("",), name="ccm_temp_shield_disaster", kind=Kind.omitted, doc="CCM Temp Shield (disaster)", auto_monitor=True, labels={"temp"})
    ccm_heater_fault_1 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-CCM-EMSW-4010:SWITCH", add_prefix=("",), name="ccm_heater_fault_1", kind=Kind.omitted, doc="CCM Heater Fault 1", auto_monitor=True, labels={"fault"})
    ccm_heater_fault_2 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-CCM-EMSW-4020:SWITCH", add_prefix=("",), name="ccm_heater_fault_2", kind=Kind.omitted, doc="CCM Heater Fault 2", auto_monitor=True, labels={"fault"})
    ccm_heater_fault_3 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-CCM-EMSW-4030:SWITCH", add_prefix=("",), name="ccm_heater_fault_3", kind=Kind.omitted, doc="CCM Heater Fault 3", auto_monitor=True, labels={"fault"})
    ccm_energy = Cpt(EpicsSignalRO, read_pv="X12SA-OP-CCM1:ENERGY-GET", add_prefix=("",), name="ccm_energy", kind=Kind.omitted, doc="CCM Energy", auto_monitor=True, labels={"energy"})
    ccm_position = Cpt(EpicsSignalRO, read_pv="X12SA-OP-CCM1:POSITION", add_prefix=("",), name="ccm_position", kind=Kind.omitted, doc="CCM Position", auto_monitor=True, labels={"string"})
class CoolingWater(EPSSubDevices):
    """Cooling-water signals at the cSAXS beamline.

    Flow switches (label ``flow``) report per-component cooling flow; the
    two ECVW signals (label ``valve``) report the PLC ``OPEN`` status of the
    cooling-water supply valves used by ``EPS.water_cooling_op``.
    """
    op_sl1_efsw_2010_flow = Cpt(EpicsSignalRO, read_pv="X12SA-OP-SL1-EFSW-2010:FLOW", add_prefix=("",), name="op_sl1_efsw_2010_flow", kind=Kind.omitted, doc="OP-SL1-EFSW-2010", auto_monitor=True, labels={"flow"})
    op_sl2_efsw_2010_flow = Cpt(EpicsSignalRO, read_pv="X12SA-OP-SL2-EFSW-2010:FLOW", add_prefix=("",), name="op_sl2_efsw_2010_flow", kind=Kind.omitted, doc="OP-SL2-EFSW-2010", auto_monitor=True, labels={"flow"})
    op_eb1_efsw_5010_flow = Cpt(EpicsSignalRO, read_pv="X12SA-OP-EB1-EFSW-5010:FLOW", add_prefix=("",), name="op_eb1_efsw_5010_flow", kind=Kind.omitted, doc="OP-EB1-EFSW-5010", auto_monitor=True, labels={"flow"})
    op_eb1_efsw_5020_flow = Cpt(EpicsSignalRO, read_pv="X12SA-OP-EB1-EFSW-5020:FLOW", add_prefix=("",), name="op_eb1_efsw_5020_flow", kind=Kind.omitted, doc="OP-EB1-EFSW-5020", auto_monitor=True, labels={"flow"})
    op_sl3_efsw_5010_flow = Cpt(EpicsSignalRO, read_pv="X12SA-OP-SL3-EFSW-5010:FLOW", add_prefix=("",), name="op_sl3_efsw_5010_flow", kind=Kind.omitted, doc="OP-SL3-EFSW-5010", auto_monitor=True, labels={"flow"})
    op_kb_efsw_6010_flow = Cpt(EpicsSignalRO, read_pv="X12SA-OP-KB-EFSW-6010:FLOW", add_prefix=("",), name="op_kb_efsw_6010_flow", kind=Kind.omitted, doc="OP-KB-EFSW-6010", auto_monitor=True, labels={"flow"})
    op_psh1_efsw_7010_flow = Cpt(EpicsSignalRO, read_pv="X12SA-OP-PSH1-EFSW-7010:FLOW", add_prefix=("",), name="op_psh1_efsw_7010_flow", kind=Kind.omitted, doc="OP-PSH1-EFSW-7010", auto_monitor=True, labels={"flow"})
    es_eb2_efsw_1010_flow = Cpt(EpicsSignalRO, read_pv="X12SA-ES-EB2-EFSW-1010:FLOW", add_prefix=("",), name="es_eb2_efsw_1010_flow", kind=Kind.omitted, doc="ES-EB2-EFSW-1010", auto_monitor=True, labels={"flow"})
    op_cs_ecvw_0010 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-CS-ECVW-0010:PLC_OPEN", add_prefix=("",), name="op_cs_ecvw_0010", kind=Kind.omitted, doc="OP-CS-ECVW-0010", auto_monitor=True, labels={"valve"})
    op_cs_ecvw_0020 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-CS-ECVW-0020:PLC_OPEN", add_prefix=("",), name="op_cs_ecvw_0020", kind=Kind.omitted, doc="OP-CS-ECVW-0020", auto_monitor=True, labels={"valve"})
class EPS(PSIDeviceBase):
    """EPS device for the cSAXS beamline.

    Aggregates the EPS status channels as read-only sub-devices (alarms,
    valves, shutters, monochromators, cooling water) and provides two
    user-facing helpers:

    - ``show_all``: pretty-print the current state of all channels.
    - ``water_cooling_op``: open the ECVW cooling-water valves and verify
      that they remain open.
    """
    # Methods exposed to users via the BEC client.
    USER_ACCESS = [
        "show_all",
        "water_cooling_op",
    ]
    # Status sections, grouped by beamline area / subsystem.
    alarms = Cpt(EPSAlarms, name="alarms", doc="EPS Alarms")
    valves_frontend = Cpt(ValvesFrontend, name="valves_frontend", doc="Valves Frontend")
    valves_optics = Cpt(ValvesOptics, name="valves_optics", doc="Valves Optics Hutch")
    valves_es = Cpt(ValvesEndstation, name="valves_es", doc="Valves ES Hutch")
    shutters_frontend = Cpt(ShuttersFrontend, name="shutters_frontend", doc="Shutters Frontend")
    shutters_es = Cpt(ShuttersEndstation, name="shutters_es", doc="Shutters Endstation")
    dmm_monochromator = Cpt(DMMMonochromator, name="dmm_monochromator", doc="DMM Monochromator")
    ccm_monochromator = Cpt(CCMMonochromator, name="ccm_monochromator", doc="CCM Monochromator")
    cooling_water = Cpt(CoolingWater, name="cooling_water", doc="Cooling Water")
    # Writable acknowledgment/request signals for PLC communication, used by
    # water_cooling_op (ACKERR to reset alarms, REQUEST to open the valves).
    ackerr = Cpt(EpicsSignal, read_pv="X12SA-EPS-PLC:ACKERR-REQUEST", add_prefix=("",), name="ackerr", kind=Kind.omitted, doc="ACKERR request - OP-CS-ECVW-0020", auto_monitor=True, labels={"request"})
    request = Cpt(EpicsSignal, read_pv="X12SA-OP-CS-ECVW:PLC_REQUEST", add_prefix=("",), name="op_cs_ecvw_request", kind=Kind.omitted, doc="PLC request - OP-CS-ECVW-PLC_REQUEST", auto_monitor=True, labels={"request"})
    def _notify(self, msg: str, show_as_client_msg: bool = True) -> None:
        """Report *msg* to the user.

        If ``show_as_client_msg`` is True, the message is sent to the BEC
        client UI via the device manager's connector; otherwise it is only
        printed on the server console. Any failure while sending (e.g.
        ``device_manager`` not set) falls back to a plain print so the
        message is never lost.
        """
        try:
            if show_as_client_msg:
                self.device_manager.connector.send_client_info(msg, scope="", show_asap=True)
            else:
                print(msg)
        except Exception:
            logger.error(f"Failed to send client message, falling back to print: {msg}")
            print(str(msg))
    # ----------------------------------------------------------
    # Water cooling operation
    # ----------------------------------------------------------
    def safe_get(self, sig, default=None):
        """Return ``sig.get()``, or *default* if reading the signal fails.

        NOTE(review): the warning message assumes *sig* is an EPICS signal
        exposing ``pvname``.
        """
        try:
            return sig.get()
        except Exception as ex:
            logger.warning(f"Failed to get signal {sig.pvname}: {ex}")
            return default
    def water_cooling_op(self) -> bool:
        """Open the ECVW cooling-water valves and verify they stay open.

        Sequence:
          1. If EPS alarms are present, acknowledge them via the ACKERR signal.
          2. Send the PLC OPEN request for the cooling valves.
          3. Poll valves and flow sensors every POLL_PERIOD seconds; once
             both valves are OPEN they must remain open for STABILITY seconds.

        Progress is reported to the client as one fixed-width table line per
        polling cycle; verbose details go to the server console only.

        Returns:
            bool: True if both valves stayed open for the stability window,
            False on timeout or if the OPEN request could not be sent.
        """
        POLL_PERIOD = 2  # seconds between monitoring polls
        TIMEOUT = 20  # seconds before the operation is declared failed
        STABILITY = 15  # seconds both valves must stay OPEN to succeed
        self._notify("=== Water Cooling Operation ===")
        # --- Signals ---
        eps_alarm_sig = self.alarms.eps_alarm_cnt
        ackerr = self.ackerr
        request = self.request
        valves = [self.cooling_water.op_cs_ecvw_0010, self.cooling_water.op_cs_ecvw_0020]
        # All cooling-water signals labelled as flow sensors.
        flow_items = [walk.item for walk in self.cooling_water.walk_signals() if "flow" in walk.item._ophyd_labels_]
        # --- Step 1: EPS alarm reset ---
        alarm_value = self.safe_get(eps_alarm_sig, 0)
        if alarm_value and alarm_value > 0:
            self._notify(f"[WaterCooling] EPS alarms present ({alarm_value}) → resetting…")
            try:
                ackerr.put(1)
            except Exception as ex:
                self._notify(f"[WaterCooling] WARNING: ACKERR write failed: {ex}")
            # Give the PLC a moment to process the acknowledgment.
            time.sleep(0.3)
        else:
            self._notify("[WaterCooling] No EPS alarms detected.")
        # --- Step 2: Issue open request ---
        self._notify("[WaterCooling] Sending cooling-valve OPEN request…")
        try:
            request.put(1)
        except Exception as ex:
            self._notify(f"[WaterCooling] ERROR: Failed to send OPEN request: {ex}")
            return False
        # --- Step 3: Monitoring loop (clean client table output) ---
        start = time.time()
        end = start + TIMEOUT
        stable_until = None  # wall-clock time at which the stability window ends
        # Print (server-side) header once
        print("Monitoring valves and flow sensors...")
        print(f" Valves: {valves[0].attr_name[-4:]}, {valves[1].attr_name[-4:]}")
        print(f" Note: stability requires valves to remain OPEN for {STABILITY} seconds.")
        # One table header to the client (via device manager)
        # Fixed-width columns for alignment in monospaced UI
        table_header = f"{'Time':>6} | {'Valves':<21} | {'Flows (OK/FAIL/N/A)':<20}"
        self._notify(table_header)
        def snapshot():
            """Return (valve states, valve column string, flow summary string)."""
            # Valve snapshot
            v_states = [self.safe_get(v, None) for v in valves]
            v1 = f"{valves[0].attr_name[-4:]}=" + ("OPEN " if v_states[0] is True or v_states[0] == 1 else "CLOSED" if v_states[0] is False or v_states[0] == 0 else "N/A ")
            v2 = f"{valves[1].attr_name[-4:]}=" + ("OPEN " if v_states[1] is True or v_states[1] == 1 else "CLOSED" if v_states[1] is False or v_states[1] == 0 else "N/A ")
            # 2 valves with a single space between => width ~ 21
            valve_str = f"{v1} {v2}"
            # Flow summary: OK/FAIL/N/A counts (compact)
            flow_states = []
            for fsig in flow_items:
                fval = self.safe_get(fsig, None)
                flow_states.append(True if fval in (1, True) else False if fval in (0, False) else None)
            ok = sum(1 for f in flow_states if f is True)
            fail = sum(1 for f in flow_states if f is False)
            na = sum(1 for f in flow_states if f is None)
            flow_summary = f"{ok:>2} / {fail:>2} / {na:>2}"
            return v_states, valve_str, flow_summary
        while True:
            # Loop terminates via the TIMEOUT check below or on success.
            # NOTE(review): the TIMEOUT check runs first, so a stability
            # window still in progress when TIMEOUT expires is reported as a
            # failure (relevant here since STABILITY is close to TIMEOUT).
            now = time.time()
            elapsed = int(now - start)
            if now > end:
                # One last line to client
                v_states, valves_s, flows_s = snapshot()
                self._notify(f"{elapsed:>6}s | {valves_s:<21} | {flows_s:<20}")
                print("→ TIMEOUT: Cooling valves failed to remain OPEN.")
                return False
            # Live snapshot
            v_states, valves_s, flows_s = snapshot()
            # Exactly one concise line to client per cycle
            self._notify(f"{elapsed:>6}s | {valves_s:<21} | {flows_s:<20}")
            both_open = all(s is not None and bool(s) for s in v_states)
            if both_open:
                if stable_until is None:
                    # First time both valves are open: start the stability window.
                    stable_until = now + STABILITY
                    print(f"[WaterCooling] Both valves OPEN → starting {STABILITY}s stability window…")
                else:
                    if now >= stable_until:
                        print("→ SUCCESS: Valves remained OPEN during stability window.")
                        return True
            else:
                if stable_until is not None:
                    print("[WaterCooling] Valve closed again → restarting stability window.")
                stable_until = None
            time.sleep(POLL_PERIOD)
    def show_all(self) -> None:
        """Print a colored status overview of all EPS sub-device channels.

        Values are rendered according to the signal's labels (temp, energy,
        switch, fault, valve/shutter, flow, string) using ANSI colors; the
        monochromator position/stripe readbacks are mapped through explicit
        enum tables. Output goes to the server console only.
        """
        # ANSI escape sequences for colored console output.
        red = "\x1b[91m"
        green = "\x1b[92m"
        white = "\x1b[0m"
        bold = "\x1b[1m"
        cyan = "\x1b[96m"
        # Enum maps for numeric -> string rendering of selected readbacks.
        POSITION_ENUM = {0: "out of beam", 1: "in beam"}
        STRIPE_ENUM = {0: "Stripe 1 W/B4C", 1: "Stripe 2 NiV/B4C"}
        POSITION_ATTRS = {self.dmm_monochromator.dmm_position.attr_name, self.ccm_monochromator.ccm_position.attr_name}
        STRIPE_ATTRS = {self.dmm_monochromator.dmm_stripe.attr_name}
        def is_bool_like(v):
            # True for 0/1/True/False — the typical PLC status values.
            return isinstance(v, (bool, int)) and v in (0, 1, True, False)
        def fmt_value(value: object, signal: EpicsSignalRO):
            """Format *value* based on the signal's attribute and labels."""
            if value is None:
                return f"{red}MISSING{white}"
            attr = signal.attr_name
            # ---------- Explicit enum mappings by attribute ----------
            if attr in POSITION_ATTRS:
                # Position comes as numeric 0/1
                try:
                    iv = int(value)
                    return POSITION_ENUM.get(iv, f"{iv}")
                except Exception:
                    # Fallback if it's already a string or unexpected
                    return f"{value}"
            if attr in STRIPE_ATTRS:
                # Stripe comes as numeric 0/1
                try:
                    iv = int(value)
                    return STRIPE_ENUM.get(iv, f"{iv}")
                except Exception:
                    return f"{value}"
            # ------------------- TEMPERATURE -------------------
            if "temp" in signal._ophyd_labels_ and isinstance(value, (int, float)):
                return f"{value:.1f}"
            # ------------------- ENERGY ------------------------
            if "energy" in signal._ophyd_labels_ and isinstance(value, (int, float)):
                return f"{value:.4f}"
            # ------------------- STRINGS -----------------------
            if "string" in signal._ophyd_labels_ or "position" in signal._ophyd_labels_:
                # For other strings, just echo the value
                return f"{value}"
            # ------------------- SWITCH (ACTIVE/INACTIVE) ------
            if "switch" in signal._ophyd_labels_ and is_bool_like(value):
                return f"{green+'ACTIVE'+white if value else red+'INACTIVE'+white}"
            # ------------------- FAULT (OK/FAULT) --------------
            if "fault" in signal._ophyd_labels_ and is_bool_like(value):
                return f"{green+'OK'+white if not value else red+'FAULT'+white}"
            # ------------------- VALVE/SHUTTER -----------------
            if ("valve" in signal._ophyd_labels_ or "shutter" in signal._ophyd_labels_) and is_bool_like(value):
                return f"{green+'OPEN'+white if value else red+'CLOSED'+white}"
            # ------------------- FLOW (OK/FAIL) ----------------
            if "flow" in signal._ophyd_labels_ and is_bool_like(value):
                return f"{green}OK{white}" if bool(value) else f"{red}FAIL{white}"
            # ------------------- FALLBACK -----------------------
            return f"{value}"
        # ------------------- PRINT START ---------------------
        print(f"{bold}X12SA EPS status{white}")
        # _sig_attrs maps component attribute names to their Component objects.
        for name, component in self._sig_attrs.items():
            sub_device = getattr(self, name)
            rows = []
            # Only print sub-devices, not individual request signals
            if not isinstance(sub_device, Device):
                continue
            print(f"\n{bold}{component.doc}{white}")
            for sub_walk in sub_device.walk_components():
                cpt: Cpt = sub_walk.item
                it: EpicsSignalRO = getattr(sub_device, cpt.attr)
                val = self.safe_get(it)
                rows.append((cpt.doc, val, it))
            # Align labels; 32 chars minimum (assumes each sub-device has at
            # least one component — all current sections do).
            label_width = max(32, *(len(label) for (label, _, _) in rows))
            for label, value, it in rows:
                fv = fmt_value(value, it)  # render according to labels/enum maps
                print(f" - {label:<{label_width}} {fv}")
            # Suggest the recovery helper when both cooling valves are shut.
            if sub_device.attr_name == "cooling_water":
                v1 = self.safe_get(self.cooling_water.op_cs_ecvw_0010)
                v2 = self.safe_get(self.cooling_water.op_cs_ecvw_0020)
                def closed(v):
                    return is_bool_like(v) and not bool(v)
                if closed(v1) and closed(v2):
                    print(f"\n{cyan}Hint:{white} Both water cooling valves are CLOSED.\n" f"You can open them using: {bold}dev.x12saEPS.water_cooling_op(){white}")
    # fmt: on
    # ----------------------------------------------------------
    # Consistency report
    # ----------------------------------------------------------
    # NOTE(review): dead commented-out code below — consider removing it or
    # reviving it as a tested method.
    # def consistency_report(self, *, verbose=True):
    #     missing = []
    #     dupes = []
    #     seen = {}
    #     for sub_device in self.walk_components():
    #         section = sub_device.name
    #         for walk in sub_device.walk_components():
    #             cpt: Cpt = walk.ancestors[-1]
    #             it: EpicsSignalRO = walk.item
    #             if not hasattr(self, it["attr"]):
    #                 missing.append((section, it["attr"], it["label"], it["pv"]))
    #             pv = it["pv"]
    #             if pv in seen:
    #                 dupes.append((pv, seen[pv], (section, it["attr"], it["label"])))
    #             else:
    #                 seen[pv] = (section, it["attr"], it["label"])
    #     if verbose:
    #         print("=== Consistency Report ===")
    #         if missing:
    #             print("\nMissing attributes:")
    #             for sec, a, lbl, pv in missing:
    #                 print(f" - [{sec}] {a} {lbl} pv={pv}")
    #         else:
    #             print("\nNo missing attributes.")
    #         if dupes:
    #             print("\nDuplicate PVs:")
    #             for pv, f1, f2 in dupes:
    #                 print(f" {pv} → {f1} AND {f2}")
    #         else:
    #             print("\nNo duplicate PVs.")
    #     return {"missing_attrs": missing, "duplicate_pvs": dupes}

View File

@@ -317,10 +317,14 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
old_value: Previous value of the signal.
value: New value of the signal.
"""
scan_done = bool(value == self._num_total_triggers)
self.progress.put(value=value, max_value=self._num_total_triggers, done=scan_done)
if scan_done:
self._scan_done_event.set()
try:
scan_done = bool(value == self._num_total_triggers)
self.progress.put(value=value, max_value=self._num_total_triggers, done=scan_done)
if scan_done:
self._scan_done_event.set()
except Exception:
content = traceback.format_exc()
logger.info(f"Device {self.name} error: {content}")
def on_stage(self) -> None:
"""

View File

@@ -0,0 +1,48 @@
# Overview
Integration module for Eiger detectors at the cSAXS beamline with JungfrauJoch backend.
There are currently two supported Eiger detectors:
- EIGER 1.5M
- EIGER 9M
This module provides a base integration for both detectors. A short list of useful
information is also provided below.
## JungfrauJoch Service
The JungfrauJoch WEB UI is available on http://sls-jfjoch-001:8080. This is an interface
to the broker which runs on sls-jfjoch-001.psi.ch. The writer service runs on
xbl-daq-34.psi.ch. Permission to access these machines and to run systemctl or
journalctl commands can be requested from the Infrastructure and Services group in AWI.
Beamline scientists need to check if they have the necessary permissions to connect
to these machines and run the commands below.
Useful commands for the broker service on sls-jfjoch-001.psi.ch:
- sudo systemctl status jfjoch_broker # Check status
- sudo systemctl start jfjoch_broker # Start service
- sudo systemctl stop jfjoch_broker # Stop service
- sudo systemctl restart jfjoch_broker # Restart service
For the writer service on xbl-daq-34.psi.ch:
- sudo journalctl -u jfjoch_writer -f # streams live logs
- sudo systemctl status jfjoch_writer # Check status
- sudo systemctl start jfjoch_writer # Start service
- sudo systemctl stop jfjoch_writer # Stop service
- sudo systemctl restart jfjoch_writer # Restart service
More information about JungfrauJoch and its API client can be found in the [JungfrauJoch documentation](https://jungfraujoch.readthedocs.io/en/latest/index.html).
### JungfrauJoch API Client
A thin wrapper for the JungfrauJoch API client is provided in the [jungfrau_joch_client](./jungfrau_joch_client.py).
Details about the specific integration are provided in the code.
## Eiger implementation
The Eiger detector integration is provided in the [eiger.py](./eiger.py) module. It provides a base integration for both Eiger 1.5M and Eiger 9M detectors.
Logic specific to each detector is implemented in the respective modules:
- [eiger_1_5m.py](./eiger_1_5m.py)
- [eiger_9m.py](./eiger_9m.py)
With the current implementation, the detector initialization should be done by a beamline scientist through the JungfrauJoch WEB UI by choosing the
appropriate detector (1.5M or 9M) before loading the device config with BEC. BEC will check upon connecting if the selected detector matches the expected one.
A preview stream for images is also provided which is forwarded and accessible through the `preview_image` signal.
For more specific details, please check the code documentation.

View File

@@ -1,34 +1,23 @@
"""
Generic integration of JungfrauJoch backend with Eiger detectors
for the cSAXS beamline at the Swiss Light Source.
The WEB UI is available on http://sls-jfjoch-001:8080
Integration module for Eiger detectors at the cSAXS beamline with JungfrauJoch backend.
NOTE: this may not be the best place to store this information. It should be migrated to
beamline documentation for debugging of Eiger & JungfrauJoch.
A few notes on setup and operation of the Eiger detectors through the JungfrauJoch broker:
The JungfrauJoch server for cSAXS runs on sls-jfjoch-001.psi.ch
User with sufficient rights may use:
- sudo systemctl restart jfjoch_broker
- sudo systemctl status jfjoch_broker
to check and/or restart the broker for the JungfrauJoch server.
Some extra notes for setting up the detector:
- If the energy on JFJ is set via DetectorSettings, the variable in DatasetSettings will be ignored
- Changes in energy may take time, good to implement logic that only resets energy if needed.
- For the Eiger, the frame_time_us in DetectorSettings is ignored, only the frame_time_us in
the DatasetSettings is relevant
- The bit_depth will be adjusted automatically based on the exp_time. Here, we need to ensure
that subsequent triggers properly
consider the readout_time of the boards. For Jungfrau detectors, the difference between
count_time_us and frame_time_us is the readout_time of the boards. For the Eiger, this needs
to be taken into account during the integration.
that subsequent triggers properly consider the readout_time of the boards. For the Eiger detectors
at cSAXS, a readout time of 20us is configured through the JungfrauJoch deployment config. This
setting is sufficiently large for the detectors if they run in parallel mode.
- beam_center and detector settings are required input arguments, thus, they may be set to wrong
values for acquisitions to start. Please keep this in mind.
Hardware related notes:
- If there is an HW issue with the detector, power cycling may help.
- The sls_detector package is available on console on /sls/X12SA/data/gac-x12sa/erik/micromamba
- The sls_detector package is available on console on /sls/x12sa/applications/erik/micromamba
- Run: source setup_9m.sh # Be careful, this connects to the detector, so it should not be
used during operation
- Useful commands:
@@ -39,9 +28,6 @@ Hardware related notes:
- cd power_control_user/
- ./on
- ./off
Further information that may be relevant for debugging:
JungfrauJoch - one needs to connect to the jfj-server (sls-jfjoch-001)
"""
from __future__ import annotations
@@ -84,10 +70,19 @@ class EigerError(Exception):
class Eiger(PSIDeviceBase):
"""
Base integration of the Eiger1.5M and Eiger9M at cSAXS. All relevant
Base integration of the Eiger1.5M and Eiger9M at cSAXS.
Args:
name (str) : Name of the device
detector_name (str): Name of the detector. Supports ["EIGER 9M", "EIGER 8.5M (tmp)", "EIGER 1.5M"]
host (str): Hostname of the Jungfrau Joch server.
port (int): Port of the Jungfrau Joch server.
scan_info (ScanInfo): The scan info to use.
device_manager (DeviceManagerDS): The device manager to use.
**kwargs: Additional keyword arguments.
"""
USER_ACCESS = ["detector_distance", "beam_center"]
USER_ACCESS = ["set_detector_distance", "set_beam_center"]
file_event = Cpt(FileEventSignal, name="file_event")
preview_image = Cpt(PreviewSignal, name="preview_image", ndim=2)
@@ -105,23 +100,12 @@ class Eiger(PSIDeviceBase):
device_manager=None,
**kwargs,
):
"""
Initialize the PSI Device Base class.
Args:
name (str) : Name of the device
detector_name (str): Name of the detector. Supports ["EIGER 9M", "EIGER 8.5M (tmp)", "EIGER 1.5M"]
host (str): Hostname of the Jungfrau Joch server.
port (int): Port of the Jungfrau Joch server.
scan_info (ScanInfo): The scan info to use.
device_manager (DeviceManagerDS): The device manager to use.
**kwargs: Additional keyword arguments.
"""
super().__init__(name=name, scan_info=scan_info, device_manager=device_manager, **kwargs)
self._host = f"{host}:{port}"
self.jfj_client = JungfrauJochClient(host=self._host, parent=self)
# NOTE: fetch this information from JungfrauJochClient during on_connected!
self.jfj_preview_client = JungfrauJochPreview(
url="tcp://129.129.95.114:5400", cb=self.preview_image.put
url="tcp://129.129.95.114:5400", cb=self._preview_callback
) # IP of sls-jfjoch-001.psi.ch on port 5400 for ZMQ stream
self.device_manager = device_manager
self.detector_name = detector_name
@@ -129,53 +113,102 @@ class Eiger(PSIDeviceBase):
self._beam_center = beam_center
self._readout_time = readout_time
self._full_path = ""
self._num_triggers = 0
self._wait_for_on_complete = 20 # seconds
if self.device_manager is not None:
self.device_manager: DeviceManagerDS
def _preview_callback(self, message: dict) -> None:
"""
Callback method for handling preview messages as received from the JungfrauJoch preview stream.
These messages are dictionary dumps as described in the JFJ ZMQ preview stream documentation.
(https://jungfraujoch.readthedocs.io/en/latest/ZEROMQ_STREAM.html#preview-stream).
Args:
message (dict): The message received from the preview stream.
"""
if message.get("type", "") == "image":
data = message.get("data", {}).get("default", None)
if data is None:
logger.error(f"Received image message on device {self.name} without data.")
return
logger.info(f"Received preview image on device {self.name}")
self.preview_image.put(data)
# pylint: disable=missing-function-docstring
@property
def detector_distance(self) -> float:
"""The detector distance in mm."""
return self._detector_distance
@detector_distance.setter
def detector_distance(self, value: float) -> None:
"""Set the detector distance in mm."""
if value <= 0:
raise ValueError("Detector distance must be a positive value.")
self._detector_distance = value
def set_detector_distance(self, distance: float) -> None:
"""
Set the detector distance in mm.
Args:
distance (float): The detector distance in mm.
"""
self.detector_distance = distance
# pylint: disable=missing-function-docstring
@property
def beam_center(self) -> tuple[float, float]:
"""The beam center in pixels. (x,y)"""
return self._beam_center
@beam_center.setter
def beam_center(self, value: tuple[float, float]) -> None:
"""Set the beam center in pixels. (x,y)"""
if any(coord < 0 for coord in value):
raise ValueError("Beam center coordinates must be non-negative.")
self._beam_center = value
def on_init(self) -> None:
def set_beam_center(self, x: float, y: float) -> None:
"""
Called when the device is initialized.
Set the beam center coordinates in pixels.
No signals are connected at this point,
thus should not be set here but in on_connected instead.
Args:
x (float): The x coordinate of the beam center in pixels.
y (float): The y coordinate of the beam center in pixels.
"""
self.beam_center = (x, y)
def on_init(self) -> None:
"""Hook called during device initialization."""
# pylint: disable=arguments-differ
def wait_for_connection(self, timeout: float = 10) -> None:
"""
Wait for the device to be connected to the JungfrauJoch backend.
Args:
timeout (float): Timeout in seconds to wait for the connection.
"""
self.jfj_client.api.status_get(_request_timeout=timeout) # If connected, this responds
def on_connected(self) -> None:
"""
Hook called after the device is connected through the device server.
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
Default values for signals should be set here. Currently, the detector needs to be
initialised manually through the WEB UI of JungfrauJoch. Once agreed upon, the automated
initialisation can be re-enabled here (code commented below).
"""
start_time = time.time()
logger.debug(f"On connected called for {self.name}")
self.jfj_client.stop(request_timeout=3)
# Check which detector is selected
# Get available detectors
available_detectors = self.jfj_client.api.config_select_detector_get(_request_timeout=5)
logger.debug(f"Available detectors {available_detectors}")
# Get current detector
current_detector_name = ""
if available_detectors.current_id:
if available_detectors.current_id is not None:
detector_selection = [
det.description
for det in available_detectors.detectors
@@ -190,8 +223,9 @@ class Eiger(PSIDeviceBase):
raise RuntimeError(
f"Detector {self.detector_name} is not in IDLE state, current state: {self.jfj_client.detector_state}. Please initialize the detector in the WEB UI: {self._host}."
)
# TODO - check again once Eiger should be initialized automatically, currently human initialization is expected
# # Once the automation should be enabled, we may use here
# TODO - Currently the initialisation of the detector is done manually through the WEB UI. Once adjusted
# this can be automated here again.
# detector_selection = [
# det for det in available_detectors.detectors if det.id == self.detector_name
# ]
@@ -207,41 +241,51 @@ class Eiger(PSIDeviceBase):
# Setup Detector settings, here we may also set the energy already as this might be time consuming
settings = DetectorSettings(frame_time_us=int(500), timing=DetectorTiming.TRIGGER)
self.jfj_client.set_detector_settings(settings, timeout=10)
self.jfj_client.set_detector_settings(settings, timeout=5)
# Set the file writer to the appropriate output for the HDF5 file
file_writer_settings = FileWriterSettings(overwrite=True, format=FileWriterFormat.NXMXVDS)
logger.debug(
f"Setting writer_settings: {yaml.dump(file_writer_settings.to_dict(), indent=4)}"
)
# Setup the file writer settings
self.jfj_client.api.config_file_writer_put(
file_writer_settings=file_writer_settings, _request_timeout=10
)
# Start the preview client
self.jfj_preview_client.connect()
self.jfj_preview_client.start()
logger.info(f"Connected to JungfrauJoch preview stream at {self.jfj_preview_client.url}")
logger.info(
f"Device {self.name} initialized after {time.time()-start_time:.2f}s. Preview stream connected on url: {self.jfj_preview_client.url}"
)
def on_stage(self) -> DeviceStatus | None:
"""
Called while staging the device.
Information about the upcoming scan can be accessed from the scan_info object.
Hook called when staging the device. Information about the upcoming scan can be accessed from the scan_info object.
scan_msg = self.scan_info.msg
"""
start_time = time.time()
scan_msg = self.scan_info.msg
# Set acquisition parameter
# TODO add check of mono energy, this can then also be passed to DatasetSettings
# TODO: Check mono energy from device in BEC
# Setting incident energy in keV
incident_energy = 12.0
# Setting up exp_time and num_triggers acquisition parameter
exp_time = scan_msg.scan_parameters.get("exp_time", 0)
if exp_time <= self._readout_time:
if exp_time <= self._readout_time: # Exp_time must be at least the readout time
raise ValueError(
f"Receive scan request for scan {scan_msg.scan_name} with exp_time {exp_time}s, which must be larger than the readout time {self._readout_time}s of the detector {self.detector_name}."
f"Value error on device {self.name}: Exposure time {exp_time}s is less than readout time {self._readout_time}s."
)
frame_time_us = exp_time #
ntrigger = int(scan_msg.num_points * scan_msg.scan_parameters["frames_per_trigger"])
# Fetch file path
self._num_triggers = int(
scan_msg.num_points * scan_msg.scan_parameters["frames_per_trigger"]
)
# Setting up the full path for file writing
self._full_path = get_full_path(scan_msg, name=f"{self.name}_master")
self._full_path = os.path.abspath(os.path.expanduser(self._full_path))
# Inform BEC about upcoming file event
self.file_event.put(
file_path=self._full_path,
@@ -249,11 +293,14 @@ class Eiger(PSIDeviceBase):
successful=False,
hinted_h5_entries={"data": "entry/data/data"},
)
# JFJ adds _master.h5 automatically
path = os.path.relpath(self._full_path, start="/sls/x12sa/data").removesuffix("_master.h5")
# Create dataset settings for API call.
data_settings = DatasetSettings(
image_time_us=int(frame_time_us * 1e6), # This is currently ignored
ntrigger=ntrigger,
image_time_us=int(exp_time * 1e6),
ntrigger=self._num_triggers,
file_prefix=path,
beam_x_pxl=int(self._beam_center[0]),
beam_y_pxl=int(self._beam_center[1]),
@@ -261,11 +308,15 @@ class Eiger(PSIDeviceBase):
incident_energy_ke_v=incident_energy,
)
logger.debug(f"Setting data_settings: {yaml.dump(data_settings.to_dict(), indent=4)}")
prep_time = start_time - time.time()
logger.debug(f"Prepared information for eiger to start acquisition in {prep_time:.2f}s")
self.jfj_client.wait_for_idle(timeout=10, request_timeout=10) # Ensure we are in IDLE state
prep_time = time.time()
self.jfj_client.wait_for_idle(timeout=10) # Ensure we are in IDLE state
self.jfj_client.start(settings=data_settings) # Takes around ~0.6s
logger.debug(f"Wait for IDLE and start call took {time.time()-start_time-prep_time:.2f}s")
# Time the stage process
logger.info(
f"Device {self.name} staged for scan. Time spent {time.time()-start_time:.2f}s,"
f" with {time.time()-prep_time:.2f}s spent with communication to JungfrauJoch."
)
def on_unstage(self) -> DeviceStatus:
"""Called while unstaging the device."""
@@ -278,7 +329,9 @@ class Eiger(PSIDeviceBase):
def _file_event_callback(self, status: DeviceStatus) -> None:
"""Callback to update the file_event signal when the acquisition is done."""
logger.info(f"Acquisition done callback called for {self.name} for status {status.success}")
logger.debug(
f"File event callback on complete status for device {self.name}: done={status.done}, successful={status.success}"
)
self.file_event.put(
file_path=self._full_path,
done=status.done,
@@ -287,19 +340,44 @@ class Eiger(PSIDeviceBase):
)
def on_complete(self) -> DeviceStatus:
"""Called to inquire if a device has completed a scans."""
"""
Called at the end of the scan. The method should implement an asynchronous wait for the
device to complete the acquisition. A callback to update the file_event signal is
attached that resolves the file event when the acquisition is done.
Returns:
DeviceStatus: The status object representing the completion of the acquisition.
"""
def wait_for_complete():
start_time = time.time()
timeout = 10
for _ in range(timeout):
if self.jfj_client.wait_for_idle(timeout=1, request_timeout=10):
# NOTE: This adjusts the time (s) that should be waited for completion of the scan.
timeout = self._wait_for_on_complete
while time.time() - start_time < timeout:
if self.jfj_client.wait_for_idle(timeout=1, raise_on_timeout=False):
# TODO: Once available, add check for
statistics: MeasurementStatistics = (
self.jfj_client.api.statistics_data_collection_get(_request_timeout=5)
)
if statistics.images_collected < self._num_triggers:
raise EigerError(
f"Device {self.name} acquisition incomplete. "
f"Expected {self._num_triggers} triggers, "
f"but only {statistics.images_collected} were collected."
)
return
logger.info(
f"Waiting for device {self.name} to finish complete, time elapsed: "
f"{time.time() - start_time}."
)
statistics: MeasurementStatistics = self.jfj_client.api.statistics_data_collection_get(
_request_timeout=5
)
broker_status = self.jfj_client.jfj_status
raise TimeoutError(
f"Timeout after waiting for detector {self.name} to complete for {time.time()-start_time:.2f}s, measurement statistics: {yaml.dump(statistics.to_dict(), indent=4)}"
f"Timeout after waiting for device {self.name} to complete for {time.time()-start_time:.2f}s \n \n"
f"Broker status: \n{yaml.dump(broker_status.to_dict(), indent=4)} \n \n"
f"Measurement statistics: \n{yaml.dump(statistics.to_dict(), indent=4)}"
)
status = self.task_handler.submit_task(wait_for_complete, run=True)
@@ -312,7 +390,11 @@ class Eiger(PSIDeviceBase):
def on_stop(self) -> None:
"""Called when the device is stopped."""
self.jfj_client.stop(
request_timeout=0.5
) # Call should not block more than 0.5 seconds to stop all devices...
self.jfj_client.stop(request_timeout=0.5)
self.task_handler.shutdown()
def on_destroy(self):
"""Called when the device is destroyed."""
self.jfj_preview_client.stop()
self.on_stop()
return super().on_destroy()

View File

@@ -21,18 +21,18 @@ if TYPE_CHECKING: # pragma no cover
from bec_server.device_server.device_server import DeviceManagerDS
EIGER9M_READOUT_TIME_US = 500e-6 # 500 microseconds in s
DETECTOR_NAME = "EIGER 8.5M (tmp)" # "EIGER 9M""
DETECTOR_NAME = "EIGER 9M" # "EIGER 9M""
# pylint:disable=invalid-name
class Eiger9M(Eiger):
"""
Eiger 1.5M specific integration for the in-vacuum Eiger.
EIGER 9M specific integration for the in-vacuum Eiger.
The logic implemented here is coupled to the DelayGenerator integration,
responsible for the global triggering of all devices through a single Trigger logic.
Please check the eiger.py class for more details about the integration of relevant backend
services. The detector_name must be set to "EIGER 1.5M:
services. The detector_name must be set to "EIGER 9M":
"""
USER_ACCESS = Eiger.USER_ACCESS + [] # Add more user_access methods here.

View File

@@ -1,13 +1,15 @@
"""Module with client interface for the Jungfrau Joch detector API"""
"""Module with a thin client wrapper around the Jungfrau Joch detector API"""
from __future__ import annotations
import enum
import threading
import time
import traceback
from typing import TYPE_CHECKING
import requests
import yaml
from bec_lib.logger import bec_logger
from jfjoch_client.api.default_api import DefaultApi
from jfjoch_client.api_client import ApiClient
@@ -18,7 +20,7 @@ from jfjoch_client.models.detector_settings import DetectorSettings
logger = bec_logger.logger
if TYPE_CHECKING:
if TYPE_CHECKING: # pragma: no cover
from ophyd import Device
@@ -29,7 +31,10 @@ class JungfrauJochClientError(Exception):
class DetectorState(str, enum.Enum):
"""Possible Detector states for Jungfrau Joch detector"""
"""
Enum states of the BrokerStatus state. The pydantic model validates in runtime,
thus we keep the possible states here for a convenient overview and access.
"""
INACTIVE = "Inactive"
IDLE = "Idle"
@@ -40,13 +45,15 @@ class DetectorState(str, enum.Enum):
class JungfrauJochClient:
"""Thin wrapper around the Jungfrau Joch API client.
"""
Jungfrau Joch API client wrapper. It provides thin wrapper methods around the API client
that allow one to connect, initialise, wait for state changes, set settings, and start and
stop acquisitions.
sudo systemctl restart jfjoch_broker
sudo systemctl status jfjoch_broker
It looks as if the detector is not being stopped properly.
One module remains running, how can we restart the detector?
Args:
host (str): Hostname of the Jungfrau Joch broker service.
Default is "http://sls-jfjoch-001:8080"
parent (Device, optional): Parent ophyd device, used for logging purposes.
"""
def __init__(
@@ -59,50 +66,63 @@ class JungfrauJochClient:
self._parent_name = parent.name if parent else self.__class__.__name__
@property
def jjf_state(self) -> BrokerStatus:
"""Get the status of JungfrauJoch"""
def jfj_status(self) -> BrokerStatus:
"""Broker status of JungfrauJoch."""
response = self.api.status_get()
return BrokerStatus(**response.to_dict())
# pylint: disable=missing-function-docstring
@property
def initialised(self) -> bool:
"""Check if jfj is connected and ready to receive commands"""
return self._initialised
@initialised.setter
def initialised(self, value: bool) -> None:
"""Set the connected status"""
self._initialised = value
# TODO this is not correct, as it may be that the state in INACTIVE. Models are not in sync...
# REMOVE all model enums as most of the validation takes place in the Pydantic models, i.e. BrokerStatus here..
# pylint: disable=missing-function-docstring
@property
def detector_state(self) -> DetectorState:
"""Get the status of JungfrauJoch"""
return DetectorState(self.jjf_state.state)
return DetectorState(self.jfj_status.state)
def connect_and_initialise(self, timeout: int = 10, **kwargs) -> None:
"""Check if JungfrauJoch is connected and ready to receive commands"""
def connect_and_initialise(self, timeout: int = 10) -> None:
"""
Connect and initialise the JungfrauJoch detector. The detector must be in
IDLE state to become initialised. This is a blocking call; the timeout parameter
is passed as the HTTP request timeout to the wait_for_idle method.
Args:
timeout (int): Timeout in seconds for the initialisation and waiting for IDLE state.
"""
status = self.detector_state
# TODO: #135 Check if the detector has to be in INACTIVE state before initialisation
if status != DetectorState.IDLE:
self.api.initialize_post() # This is a blocking call....
self.wait_for_idle(timeout, request_timeout=timeout) # Blocking call
self.api.initialize_post()
self.wait_for_idle(timeout)
self.initialised = True
def set_detector_settings(self, settings: dict | DetectorSettings, timeout: int = 10) -> None:
"""Set the detector settings. JungfrauJoch must be in IDLE, Error or Inactive state.
Note, the full settings have to be provided, otherwise the settings will be overwritten with default values.
"""
Set the detector settings. The state of JungfrauJoch must be in IDLE,
Error or Inactive state. Please note: a full set of settings has to be provided,
otherwise the settings will be overwritten with default values.
Args:
settings (dict): dictionary of settings
timeout (int): Timeout in seconds for the HTTP request to set the settings.
"""
state = self.detector_state
if state not in [DetectorState.IDLE, DetectorState.ERROR, DetectorState.INACTIVE]:
logger.info(
f"JungfrauJoch backend fo device {self._parent_name} is not in IDLE state,"
" waiting 1s before retrying..."
)
time.sleep(1) # Give the detector 1s to become IDLE, retry
state = self.detector_state
if state not in [DetectorState.IDLE, DetectorState.ERROR, DetectorState.INACTIVE]:
raise JungfrauJochClientError(
f"Error in {self._parent_name}. Detector must be in IDLE, ERROR or INACTIVE state to set settings. Current state: {state}"
f"Error on {self._parent_name}. Detector must be in IDLE, ERROR or INACTIVE"
" state to set settings. Current state: {state}"
)
if isinstance(settings, dict):
@@ -110,28 +130,36 @@ class JungfrauJochClient:
try:
self.api.config_detector_put(detector_settings=settings, _request_timeout=timeout)
except requests.exceptions.Timeout:
raise TimeoutError(f"Timeout while setting detector settings for {self._parent_name}")
raise TimeoutError(
f"Timeout on device {self._parent_name} while setting detector settings:\n "
f"{yaml.dump(settings, indent=4)}."
)
except Exception:
content = traceback.format_exc()
logger.error(
f"Error on device {self._parent_name} while setting detector settings:\n "
f"{yaml.dump(settings, indent=4)}. Error traceback: {content}"
)
raise JungfrauJochClientError(
f"Error while setting detector settings for {self._parent_name}: {content}"
f"Error on device {self._parent_name} while setting detector settings:\n "
f"{yaml.dump(settings, indent=4)}. Full traceback: {content}."
)
def start(self, settings: dict | DatasetSettings, request_timeout: float = 10) -> None:
"""Start the mesaurement. DatasetSettings must be provided, and JungfrauJoch must be in IDLE state.
The method call is blocking and JungfrauJoch will be ready to measure after the call resolves.
"""
Start the acquisition with the provided dataset settings.
The detector must be in IDLE state. Settings must always provide a full set of
parameters, missing parameters will be set to default values.
Args:
settings (dict): dictionary of settings
Please check the DataSettings class for the available settings. Minimum required settings are
beam_x_pxl, beam_y_pxl, detector_distance_mm, incident_energy_keV.
settings (dict | DatasetSettings): Dataset settings to start the acquisition with.
request_timeout (float): Timeout in sec for the HTTP request to start the acquisition.
"""
state = self.detector_state
if state != DetectorState.IDLE:
raise JungfrauJochClientError(
f"Error in {self._parent_name}. Detector must be in IDLE state to set settings. Current state: {state}"
f"Error on device {self._parent_name}. "
f"Detector must be in IDLE state to start acquisition. Current state: {state}"
)
if isinstance(settings, dict):
@@ -141,46 +169,80 @@ class JungfrauJochClient:
dataset_settings=settings, _request_timeout=request_timeout
)
except requests.exceptions.Timeout:
content = traceback.format_exc()
logger.error(
f"Timeout error after {request_timeout} seconds on device {self._parent_name} "
f"during 'start' call with dataset settings: {yaml.dump(settings, indent=4)}. \n"
f"Traceback: {content}"
)
raise TimeoutError(
f"TimeoutError in JungfrauJochClient for parent device {self._parent_name} for 'start' call"
f"Timeout error after {request_timeout} seconds on device {self._parent_name} "
f"during 'start' call with dataset settings: {yaml.dump(settings, indent=4)}."
)
except Exception:
content = traceback.format_exc()
logger.error(
f"Error on device {self._parent_name} during 'start' post with dataset settings: \n"
f"{yaml.dump(settings, indent=4)}. \nTraceback: {content}"
)
raise JungfrauJochClientError(
f"Error in JungfrauJochClient for parent device {self._parent_name} during 'start' call: {content}"
f"Error on device {self._parent_name} during 'start' post with dataset settings: \n"
f"{yaml.dump(settings, indent=4)}. \nTraceback: {content}."
)
def stop(self, request_timeout: float = 0.5) -> None:
"""Stop the acquisition, this only logs errors and is not raising."""
try:
self.api.cancel_post_with_http_info(_request_timeout=request_timeout)
except requests.exceptions.Timeout:
content = traceback.format_exc()
logger.error(
f"Timeout in JungFrauJochClient for device {self._parent_name} during stop: {content}"
)
except Exception:
content = traceback.format_exc()
logger.error(
f"Error in JungFrauJochClient for device {self._parent_name} during stop: {content}"
)
def wait_for_idle(self, timeout: int = 10, request_timeout: float | None = None) -> bool:
"""Wait for JungfrauJoch to be in Idle state. Blocking call with timeout.
def _stop_call(self):
try:
self.api.cancel_post_with_http_info() # (_request_timeout=request_timeout)
except requests.exceptions.Timeout:
content = traceback.format_exc()
logger.error(
f"Timeout error after {request_timeout} seconds on device {self._parent_name} "
f"during stop: {content}"
)
except Exception:
content = traceback.format_exc()
logger.error(f"Error on device {self._parent_name} during stop: {content}")
thread = threading.Thread(
target=_stop_call, daemon=True, args=(self,), name="stop_jungfraujoch_thread"
)
thread.start()
def wait_for_idle(self, timeout: int = 10, raise_on_timeout: bool = True) -> bool:
"""
Method to wait until the detector is in IDLE state. This is a blocking call with a
timeout that can be specified. The additional parameter raise_on_timeout can be used to
raise an exception on timeout instead of returning boolean True/False.
Args:
timeout (int): timeout in seconds
raise_on_timeout (bool): If True, raises an exception on timeout. Default is True.
Returns:
bool: True if the detector is in IDLE state, False if timeout occurred
"""
if request_timeout is None:
request_timeout = timeout
try:
self.api.wait_till_done_post(timeout=timeout, _request_timeout=request_timeout)
self.api.wait_till_done_post(timeout=timeout, _request_timeout=timeout)
except requests.exceptions.Timeout:
raise TimeoutError(f"HTTP request timeout in wait_for_idle for {self._parent_name}")
except Exception:
content = traceback.format_exc()
logger.debug(f"Waiting for device {self._parent_name} to become IDLE: {content}")
logger.info(
f"Timeout after {timeout} seconds on device {self._parent_name} in wait_for_idle: {content}"
)
if raise_on_timeout:
raise TimeoutError(
f"Timeout after {timeout} seconds on device {self._parent_name} in wait_for_idle."
)
return False
except Exception as exc:
content = traceback.format_exc()
logger.info(
f"Error on device {self._parent_name} in wait_for_idle. Full traceback: {content}"
)
if raise_on_timeout:
raise JungfrauJochClientError(
f"Error on device {self._parent_name} in wait_for_idle: {content}"
) from exc
return False
return True

View File

@@ -1,22 +1,136 @@
"""Module for the Eiger preview ZMQ stream."""
"""
Module for the JungfrauJoch preview ZMQ stream for the Eiger detector at cSAXS.
The Preview client is implemented for the JungfrauJoch ZMQ PUB-SUB interface, and
should be independent of the EIGER detector type.
The client connects to the ZMQ PUB-SUB preview stream and calls a user provided callback
function with the decompressed messages received from the stream. The callback needs to be
able to deal with the different message types sent by the JungfrauJoch server ("start",
"image", "end") as described in the JungfrauJoch ZEROMQ preview stream documentation.
(https://jungfraujoch.readthedocs.io/en/latest/ZEROMQ_STREAM.html#preview-stream).
"""
from __future__ import annotations
import json
import threading
import time
from typing import Callable
import cbor2
import numpy as np
import zmq
from bec_lib.logger import bec_logger
from dectris.compression import decompress
logger = bec_logger.logger
ZMQ_TOPIC_FILTER = b""
###############################
###### CBOR TAG DECODERS ######
###############################
# Dectris specific CBOR tags and decoders for Jungfrau data
# Reference:
# https://github.com/dectris/documentation/blob/main/stream_v2/examples/client.py
def decode_multi_dim_array(tag: cbor2.CBORTag, column_major: bool = False):
"""Decode a multi-dimensional array from a CBOR tag."""
dimensions, contents = tag.value
if isinstance(contents, list):
array = np.empty((len(contents),), dtype=object)
array[:] = contents
elif isinstance(contents, (np.ndarray, np.generic)):
array = contents
else:
raise cbor2.CBORDecodeValueError("expected array or typed array")
return array.reshape(dimensions, order="F" if column_major else "C")
def decode_typed_array(tag: cbor2.CBORTag, dtype: str):
"""Decode a typed array from a CBOR tag."""
if not isinstance(tag.value, bytes):
raise cbor2.CBORDecodeValueError("expected byte string in typed array")
return np.frombuffer(tag.value, dtype=dtype)
def decode_dectris_compression(tag: cbor2.CBORTag):
"""Decode a Dectris compressed array from a CBOR tag."""
algorithm, elem_size, encoded = tag.value
return decompress(encoded, algorithm, elem_size=elem_size)
#########################################
#### Dectris CBOR TAG Extensions ########
#########################################
# Mapping of various additional CBOR tags from Dectris to decoder functions
tag_decoders = {
40: lambda tag: decode_multi_dim_array(tag, column_major=False),
64: lambda tag: decode_typed_array(tag, dtype="u1"),
65: lambda tag: decode_typed_array(tag, dtype=">u2"),
66: lambda tag: decode_typed_array(tag, dtype=">u4"),
67: lambda tag: decode_typed_array(tag, dtype=">u8"),
68: lambda tag: decode_typed_array(tag, dtype="u1"),
69: lambda tag: decode_typed_array(tag, dtype="<u2"),
70: lambda tag: decode_typed_array(tag, dtype="<u4"),
71: lambda tag: decode_typed_array(tag, dtype="<u8"),
72: lambda tag: decode_typed_array(tag, dtype="i1"),
73: lambda tag: decode_typed_array(tag, dtype=">i2"),
74: lambda tag: decode_typed_array(tag, dtype=">i4"),
75: lambda tag: decode_typed_array(tag, dtype=">i8"),
77: lambda tag: decode_typed_array(tag, dtype="<i2"),
78: lambda tag: decode_typed_array(tag, dtype="<i4"),
79: lambda tag: decode_typed_array(tag, dtype="<i8"),
80: lambda tag: decode_typed_array(tag, dtype=">f2"),
81: lambda tag: decode_typed_array(tag, dtype=">f4"),
82: lambda tag: decode_typed_array(tag, dtype=">f8"),
83: lambda tag: decode_typed_array(tag, dtype=">f16"),
84: lambda tag: decode_typed_array(tag, dtype="<f2"),
85: lambda tag: decode_typed_array(tag, dtype="<f4"),
86: lambda tag: decode_typed_array(tag, dtype="<f8"),
87: lambda tag: decode_typed_array(tag, dtype="<f16"),
1040: lambda tag: decode_multi_dim_array(tag, column_major=True),
56500: lambda tag: decode_dectris_compression(tag), # pylint: disable=unnecessary-lambda
}
def tag_hook(decoder, tag: int):
"""
Tag hook for the cbor2.loads method. Both arguments "decoder" and "tag" mus be present.
We use the tag to choose the respective decoder from the tag_decoders registry if available.
"""
tag_decoder = tag_decoders.get(tag.tag)
return tag_decoder(tag) if tag_decoder else tag
######################
#### ZMQ Settings ####
######################
ZMQ_TOPIC_FILTER = b"" # Subscribe to all topics
ZMQ_CONFLATE_SETTING = 1 # Keep only the most recent message
ZMQ_RCVHWM_SETTING = 1 # Set high water mark to 1, this configures the max number of queue messages
#################################
#### Jungfrau Preview Client ####
#################################
class JungfrauJochPreview:
"""
Preview client for the JungfrauJoch ZMQ preview stream. The client is started with
a URL to receive the data from the JungfrauJoch PUB-SUB preview interface, and a
callback function that is called with messages received from the preview stream.
The callback needs to be able to deal with the different message types sent
by the JungfrauJoch server ("start", "image", "end") as described in the
JungfrauJoch ZEROMQ preview stream documentation. Messages are dictionary dumps.
(https://jungfraujoch.readthedocs.io/en/latest/ZEROMQ_STREAM.html#preview-stream).
Args:
url (str): ZMQ PUB-SUB preview stream URL.
cb (Callable): Callback function called with messages received from the stream.
"""
USER_ACCESS = ["start", "stop"]
def __init__(self, url: str, cb: Callable):
@@ -27,16 +141,18 @@ class JungfrauJochPreview:
self._on_update_callback = cb
def connect(self):
"""Connect to the JungfrauJoch PUB-SUB streaming interface
JungfrauJoch may reject connection for a few seconds when it restarts,
so if it fails, wait a bit and try to connect again.
"""
Connect to the JungfrauJoch PUB-SUB streaming interface. If the connection is refused
it will reattempt a second time after a one second delay.
"""
# pylint: disable=no-member
context = zmq.Context()
self._socket = context.socket(zmq.SUB)
self._socket.setsockopt(zmq.CONFLATE, ZMQ_CONFLATE_SETTING)
self._socket.setsockopt(zmq.SUBSCRIBE, ZMQ_TOPIC_FILTER)
self._socket.setsockopt(zmq.RCVHWM, ZMQ_RCVHWM_SETTING)
try:
self._socket.connect(self.url)
except ConnectionRefusedError:
@@ -44,17 +160,26 @@ class JungfrauJochPreview:
self._socket.connect(self.url)
def start(self):
"""Start the ZMQ update loop in a background thread."""
self._zmq_thread = threading.Thread(
target=self._zmq_update_loop, daemon=True, name="JungfrauJoch_live_preview"
)
self._zmq_thread.start()
def stop(self):
"""Stop the ZMQ update loop and wait for the thread to finish."""
self._shutdown_event.set()
if self._zmq_thread:
self._zmq_thread.join()
self._zmq_thread.join(timeout=1.0)
def _zmq_update_loop(self):
def _zmq_update_loop(self, poll_interval: float = 0.2):
"""
ZMQ update loop running in a background thread. The polling is throttled by
the poll_interval parameter.
Args:
poll_interval (float): Time in seconds to wait between polling attempts.
"""
while not self._shutdown_event.is_set():
if self._socket is None:
self.connect()
@@ -64,18 +189,21 @@ class JungfrauJochPreview:
# Happens when ZMQ partially delivers the multipart message
pass
except zmq.error.Again:
# Happens when receive queue is empty
time.sleep(0.1)
logger.debug(
f"ZMQ Again exception, receive queue is empty for JFJ preview at {self.url}."
)
finally:
# We throttle the polling to avoid heavy load on the device server
time.sleep(poll_interval)
def _poll(self):
"""
Poll the ZMQ socket for new data. It will throttle the data update and
only subscribe to the topic for a single update. This is not very nice
but it seems like there is currently no option to set the update rate on
the backend.
Poll the ZMQ socket for new data. We are currently subscribing and unsubscribing
for each poll loop to avoid receiving too many messages. Throttling of the update
loop is handled in the _zmq_update_loop method.
"""
if self._shutdown_event.wait(0.2):
if self._shutdown_event.is_set():
return
try:
@@ -90,7 +218,19 @@ class JungfrauJochPreview:
# Unsubscribe from the topic
self._socket.setsockopt(zmq.UNSUBSCRIBE, ZMQ_TOPIC_FILTER)
def _parse_data(self, data):
# TODO decode and parse the data
# self._on_update_callback(data)
pass
def _parse_data(self, bytes_list: list[bytes]):
"""
Parse the received ZMQ data from the JungfrauJoch preview stream.
We will call the _on_update_callback with the decompressed messages as a dictionary.
The callback needs to be able to deal with the different message types sent
by the JungfrauJoch server ("start", "image", "end") as described in the
JungfrauJoch ZEROMQ preview stream documentation. Messages are dictionary dumps.
(https://jungfraujoch.readthedocs.io/en/latest/ZEROMQ_STREAM.html#preview-stream).
Args:
bytes_list (list[bytes]): List of byte messages received from ZMQ recv_multipart.
"""
for byte_msg in bytes_list:
msg = cbor2.loads(byte_msg, tag_hook=tag_hook)
self._on_update_callback(msg)

View File

@@ -25,6 +25,8 @@ dependencies = [
"bec_widgets",
"zmq",
"opencv-python",
"dectris-compression", # for JFJ preview stream decompression
"cbor2",
]
[project.optional-dependencies]

14
tests/conftest.py Normal file
View File

@@ -0,0 +1,14 @@
"""
Conftest runs for all tests in this directory and subdirectories. Thereby, we know for
certain that the SocketSignal.READBACK_TIMEOUT is set to 0 for all tests, which prevents
hanging tests when a readback is attempted on a non-connected socket.
"""
# conftest.py
import pytest
from ophyd_devices.utils.socket import SocketSignal
@pytest.fixture(autouse=True)
def patch_socket_timeout(monkeypatch):
monkeypatch.setattr(SocketSignal, "READBACK_TIMEOUT", 0.0)

View File

@@ -5,6 +5,7 @@ from time import time
from typing import TYPE_CHECKING, Generator
from unittest import mock
import numpy as np
import pytest
from bec_lib.messages import FileMessage, ScanStatusMessage
from jfjoch_client.models.broker_status import BrokerStatus
@@ -78,7 +79,7 @@ def detector_list(request) -> Generator[DetectorList, None, None]:
),
DetectorListElement(
id=2,
description="EIGER 8.5M (tmp)",
description="EIGER 9M",
serial_number="123456",
base_ipv4_addr="192.168.0.1",
udp_interface_count=1,
@@ -103,7 +104,11 @@ def eiger_1_5m(mock_scan_info) -> Generator[Eiger1_5M, None, None]:
name = "eiger_1_5m"
dev = Eiger1_5M(name=name, beam_center=(256, 256), detector_distance=100.0)
dev.scan_info.msg = mock_scan_info
yield dev
try:
yield dev
finally:
if dev._destroyed is False:
dev.destroy()
@pytest.fixture(scope="function")
@@ -113,7 +118,19 @@ def eiger_9m(mock_scan_info) -> Generator[Eiger9M, None, None]:
name = "eiger_9m"
dev = Eiger9M(name=name)
dev.scan_info.msg = mock_scan_info
yield dev
try:
yield dev
finally:
if dev._destroyed is False:
dev.destroy()
def test_eiger_wait_for_connection(eiger_1_5m, eiger_9m):
"""Test the wait_for_connection metho is calling status_get on the JFJ API client."""
for eiger in (eiger_1_5m, eiger_9m):
with mock.patch.object(eiger.jfj_client.api, "status_get") as mock_status_get:
eiger.wait_for_connection(timeout=1)
mock_status_get.assert_called_once_with(_request_timeout=1)
@pytest.mark.parametrize("detector_state", ["Idle", "Inactive"])
@@ -141,7 +158,7 @@ def test_eiger_1_5m_on_connected(eiger_1_5m, detector_list, detector_state):
else:
eiger.on_connected()
assert mock_set_det.call_args == mock.call(
DetectorSettings(frame_time_us=500, timing=DetectorTiming.TRIGGER), timeout=10
DetectorSettings(frame_time_us=500, timing=DetectorTiming.TRIGGER), timeout=5
)
assert mock_file_writer.call_args == mock.call(
file_writer_settings=FileWriterSettings(
@@ -179,7 +196,7 @@ def test_eiger_9m_on_connected(eiger_9m, detector_list, detector_state):
else:
eiger.on_connected()
assert mock_set_det.call_args == mock.call(
DetectorSettings(frame_time_us=500, timing=DetectorTiming.TRIGGER), timeout=10
DetectorSettings(frame_time_us=500, timing=DetectorTiming.TRIGGER), timeout=5
)
assert mock_file_writer.call_args == mock.call(
file_writer_settings=FileWriterSettings(
@@ -216,11 +233,39 @@ def test_eiger_on_stop(eiger_1_5m):
stop_event.wait(timeout=5) # Thread should be killed from task_handler
def test_eiger_on_destroy(eiger_1_5m):
"""Test the on_destroy logic of the Eiger detector. This is equivalent for 9M and 1_5M."""
eiger = eiger_1_5m
start_event = threading.Event()
stop_event = threading.Event()
def tmp_task():
start_event.set()
try:
while True:
time.sleep(0.1)
finally:
stop_event.set()
eiger.task_handler.submit_task(tmp_task)
start_event.wait(timeout=5)
with (
mock.patch.object(eiger.jfj_preview_client, "stop") as mock_jfj_preview_client_stop,
mock.patch.object(eiger.jfj_client, "stop") as mock_jfj_client_stop,
):
eiger.on_destroy()
mock_jfj_preview_client_stop.assert_called_once()
mock_jfj_client_stop.assert_called_once()
stop_event.wait(timeout=5)
@pytest.mark.timeout(25)
@pytest.mark.parametrize("raise_timeout", [True, False])
def test_eiger_on_complete(eiger_1_5m, raise_timeout):
"""Test the on_complete logic of the Eiger detector. This is equivalent for 9M and 1_5M."""
eiger = eiger_1_5m
eiger._wait_for_on_complete = 1 # reduce wait time for testing
callback_completed_event = threading.Event()
@@ -230,7 +275,7 @@ def test_eiger_on_complete(eiger_1_5m, raise_timeout):
unblock_wait_for_idle = threading.Event()
def mock_wait_for_idle(timeout: int, request_timeout: float):
def mock_wait_for_idle(timeout: float, raise_on_timeout: bool) -> bool:
if unblock_wait_for_idle.wait(timeout):
if raise_timeout:
return False
@@ -238,11 +283,18 @@ def test_eiger_on_complete(eiger_1_5m, raise_timeout):
return False
with (
mock.patch.object(
eiger.jfj_client.api, "status_get", return_value=BrokerStatus(state="Idle")
),
mock.patch.object(eiger.jfj_client, "wait_for_idle", side_effect=mock_wait_for_idle),
mock.patch.object(
eiger.jfj_client.api,
"statistics_data_collection_get",
return_value=MeasurementStatistics(run_number=1),
return_value=MeasurementStatistics(
run_number=1,
images_collected=eiger.scan_info.msg.num_points
* eiger.scan_info.msg.scan_parameters["frames_per_trigger"],
),
),
):
status = eiger.complete()
@@ -284,7 +336,7 @@ def test_eiger_file_event_callback(eiger_1_5m, tmp_path):
assert file_msg.hinted_h5_entries == {"data": "entry/data/data"}
def test_eiger_on_sage(eiger_1_5m):
def test_eiger_on_stage(eiger_1_5m):
"""Test the on_stage and on_unstage logic of the Eiger detector. This is equivalent for 9M and 1_5M."""
eiger = eiger_1_5m
scan_msg = eiger.scan_info.msg
@@ -316,3 +368,35 @@ def test_eiger_on_sage(eiger_1_5m):
)
assert mock_start.call_args == mock.call(settings=data_settings)
assert eiger.staged is Staged.yes
def test_eiger_set_det_distance_test_beam_center(eiger_1_5m):
"""Test the set_detector_distance and set_beam_center methods. Equivalent for 9M and 1_5M."""
eiger = eiger_1_5m
old_distance = eiger.detector_distance
new_distance = old_distance + 100
old_beam_center = eiger.beam_center
new_beam_center = (old_beam_center[0] + 20, old_beam_center[1] + 50)
eiger.set_detector_distance(new_distance)
assert eiger.detector_distance == new_distance
eiger.set_beam_center(x=new_beam_center[0], y=new_beam_center[1])
assert eiger.beam_center == new_beam_center
with pytest.raises(ValueError):
eiger.set_beam_center(x=-10, y=100) # Cannot set negative beam center
with pytest.raises(ValueError):
eiger.detector_distance = -50 # Cannot set negative detector distance
def test_eiger_preview_callback(eiger_1_5m):
"""Preview callback test for the Eiger detector. This is equivalent for 9M and 1_5M."""
eiger = eiger_1_5m
# NOTE: I don't find models for the CBOR messages used by JFJ, currently using a dummay dict.
# Please adjust once the proper model is found.
for msg_type in ["start", "end", "image", "calibration", "metadata"]:
msg = {"type": msg_type, "data": {"default": np.array([[1, 2], [3, 4]])}}
with mock.patch.object(eiger.preview_image, "put") as mock_preview_put:
eiger._preview_callback(msg)
if msg_type == "image":
mock_preview_put.assert_called_once_with(msg["data"]["default"])
else:
mock_preview_put.assert_not_called()

View File

@@ -0,0 +1,99 @@
# pylint: skip-file
from __future__ import annotations
import pytest
from ophyd_devices.tests.utils import patched_device
from csaxs_bec.devices.epics.eps import EPS
ALL_PVS = [
# ALARMS
"X12SA-EPS-PLC:AlarmCnt_EPS",
"ARS00-MIS-PLC-01:AlarmCnt_Frontends",
# FRONTEND VALVES
"X12SA-FE-VVPG-0000:PLC_OPEN",
"X12SA-FE-VVPG-1010:PLC_OPEN",
"X12SA-FE-VVFV-2010:PLC_OPEN",
"X12SA-FE-VVPG-2010:PLC_OPEN",
# Optics VALVES
"X12SA-OP-VVPG-1010:PLC_OPEN",
"X12SA-OP-VVPG-2010:PLC_OPEN",
"X12SA-OP-VVPG-3010:PLC_OPEN",
"X12SA-OP-VVPG-3020:PLC_OPEN",
"X12SA-OP-VVPG-4010:PLC_OPEN",
"X12SA-OP-VVPG-5010:PLC_OPEN",
"X12SA-OP-VVPG-6010:PLC_OPEN",
"X12SA-OP-VVPG-7010:PLC_OPEN",
# Endstation VALVES
"X12SA-ES-VVPG-1010:PLC_OPEN",
# Frontend SHUTTERS
"X12SA-FE-PSH1-EMLS-0010:OPEN",
"X12SA-FE-STO1-EMLS-0010:OPEN",
# Optics SHUTTERS
"X12SA-OP-PSH1-EMLS-7010:OPEN",
# DMM Monochromator
"X12SA-OP-DMM-ETTC-3010:TEMP",
"X12SA-OP-DMM-ETTC-3020:TEMP",
"X12SA-OP-DMM-ETTC-3030:TEMP",
"X12SA-OP-DMM-ETTC-3040:TEMP",
"X12SA-OP-DMM-EMLS-3010:THRU",
"X12SA-OP-DMM-EMLS-3020:IN",
"X12SA-OP-DMM-EMLS-3030:THRU",
"X12SA-OP-DMM-EMLS-3040:IN",
"X12SA-OP-DMM-EMSW-3050:SWITCH",
"X12SA-OP-DMM-EMSW-3060:SWITCH",
"X12SA-OP-DMM-EMSW-3070:SWITCH",
"X12SA-OP-DMM1:ENERGY-GET",
"X12SA-OP-DMM1:POSITION",
"X12SA-OP-DMM1:STRIPE",
# CCM Monochromator
"X12SA-OP-CCM-ETTC-4010:TEMP",
"X12SA-OP-CCM-ETTC-4020:TEMP",
"X12SA-OP-CCM-EMSW-4010:SWITCH",
"X12SA-OP-CCM-EMSW-4020:SWITCH",
"X12SA-OP-CCM-EMSW-4030:SWITCH",
"X12SA-OP-CCM1:ENERGY-GET",
"X12SA-OP-CCM1:POSITION",
# Water Cooling
"X12SA-OP-SL1-EFSW-2010:FLOW",
"X12SA-OP-SL2-EFSW-2010:FLOW",
"X12SA-OP-EB1-EFSW-5010:FLOW",
"X12SA-OP-EB1-EFSW-5020:FLOW",
"X12SA-OP-SL3-EFSW-5010:FLOW",
"X12SA-OP-KB-EFSW-6010:FLOW",
"X12SA-OP-PSH1-EFSW-7010:FLOW",
"X12SA-ES-EB2-EFSW-1010:FLOW",
"X12SA-OP-CS-ECVW-0010:PLC_OPEN",
"X12SA-OP-CS-ECVW-0020:PLC_OPEN",
# Request PVs
"X12SA-EPS-PLC:ACKERR-REQUEST",
"X12SA-OP-CS-ECVW:PLC_REQUEST",
]
@pytest.fixture
def eps():
dev_name = "EPS"
with patched_device(EPS, name=dev_name) as eps:
yield eps
def test_eps_has_signals(eps):
"""Test that all expected PVs are present in the eps device."""
found_pvs = [walk.item._read_pv.pvname for walk in eps.walk_signals()]
assert set(found_pvs) == set(
ALL_PVS
), f"Expected PVs {ALL_PVS} but found {set(ALL_PVS) - set(found_pvs)}"
# pylint: disable=line-too-long
expected_show_all_output = "\x1b[1mX12SA EPS status\x1b[0m\n\n\x1b[1mEPS Alarms\x1b[0m\n - X12SA EPS Alarm count 0\n - FrontEnd MIS Alarm count 0\n\n\x1b[1mValves Frontend\x1b[0m\n - FE-VVPG-0000 \x1b[91mCLOSED\x1b[0m\n - FE-VVPG-1010 \x1b[91mCLOSED\x1b[0m\n - FE-VVFV-2010 \x1b[91mCLOSED\x1b[0m\n - FE-VVPG-2010 \x1b[91mCLOSED\x1b[0m\n\n\x1b[1mValves Optics Hutch\x1b[0m\n - OP-VVPG-1010 \x1b[91mCLOSED\x1b[0m\n - OP-VVPG-2010 \x1b[91mCLOSED\x1b[0m\n - OP-VVPG-3010 \x1b[91mCLOSED\x1b[0m\n - OP-VVPG-3020 \x1b[91mCLOSED\x1b[0m\n - OP-VVPG-4010 \x1b[91mCLOSED\x1b[0m\n - OP-VVPG-5010 \x1b[91mCLOSED\x1b[0m\n - OP-VVPG-6010 \x1b[91mCLOSED\x1b[0m\n - OP-VVPG-7010 \x1b[91mCLOSED\x1b[0m\n\n\x1b[1mValves ES Hutch\x1b[0m\n - ES-VVPG-1010 \x1b[91mCLOSED\x1b[0m\n\n\x1b[1mShutters Frontend\x1b[0m\n - FE-PSH1-EMLS-0010 \x1b[91mCLOSED\x1b[0m\n - FE-STO1-EMLS-0010 \x1b[91mCLOSED\x1b[0m\n\n\x1b[1mShutters Endstation\x1b[0m\n - OP-PSH1-EMLS-7010 \x1b[91mCLOSED\x1b[0m\n\n\x1b[1mDMM Monochromator\x1b[0m\n - DMM Temp Surface 1 0.0\n - DMM Temp Surface 2 0.0\n - DMM Temp Shield 1 (disaster) 0.0\n - DMM Temp Shield 2 (disaster) 0.0\n - DMM Translation ThruPos \x1b[91mINACTIVE\x1b[0m\n - DMM Translation InPos \x1b[91mINACTIVE\x1b[0m\n - DMM Bragg ThruPos \x1b[91mINACTIVE\x1b[0m\n - DMM Bragg InPos \x1b[91mINACTIVE\x1b[0m\n - DMM Heater Fault XTAL 1 \x1b[92mOK\x1b[0m\n - DMM Heater Fault XTAL 2 \x1b[92mOK\x1b[0m\n - DMM Heater Fault Support 1 \x1b[92mOK\x1b[0m\n - DMM Energy 0.0000\n - DMM Position out of beam\n - DMM Stripe Stripe 1 W/B4C\n\n\x1b[1mCCM Monochromator\x1b[0m\n - CCM Temp Crystal 0.0\n - CCM Temp Shield (disaster) 0.0\n - CCM Heater Fault 1 \x1b[92mOK\x1b[0m\n - CCM Heater Fault 2 \x1b[92mOK\x1b[0m\n - CCM Heater Fault 3 \x1b[92mOK\x1b[0m\n - CCM Energy 0.0000\n - CCM Position out of beam\n\n\x1b[1mCooling Water\x1b[0m\n - OP-SL1-EFSW-2010 \x1b[91mFAIL\x1b[0m\n - OP-SL2-EFSW-2010 \x1b[91mFAIL\x1b[0m\n - OP-EB1-EFSW-5010 \x1b[91mFAIL\x1b[0m\n - OP-EB1-EFSW-5020 
\x1b[91mFAIL\x1b[0m\n - OP-SL3-EFSW-5010 \x1b[91mFAIL\x1b[0m\n - OP-KB-EFSW-6010 \x1b[91mFAIL\x1b[0m\n - OP-PSH1-EFSW-7010 \x1b[91mFAIL\x1b[0m\n - ES-EB2-EFSW-1010 \x1b[91mFAIL\x1b[0m\n - OP-CS-ECVW-0010 \x1b[91mCLOSED\x1b[0m\n - OP-CS-ECVW-0020 \x1b[91mCLOSED\x1b[0m\n\n\x1b[96mHint:\x1b[0m Both water cooling valves are CLOSED.\nYou can open them using: \x1b[1mdev.x12saEPS.water_cooling_op()\x1b[0m\n"
def test_eps_show_all(eps, capsys):
"""Test that the show_all method outputs the expected status."""
eps.show_all()
output = capsys.readouterr().out
assert (
output == expected_show_all_output
), f"Expected output does not match actual output.\nExpected:\n{expected_show_all_output}\nActual:\n{output}"