fix: adapt nidaq to PSIDeviceBase

This commit is contained in:
gac-x01da
2025-03-18 16:50:07 +01:00
committed by appel_c
parent 81bca16f67
commit edcf00a55c

View File

@@ -1,137 +1,47 @@
from __future__ import annotations
from typing import Literal, TYPE_CHECKING, cast
from typing import Literal
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase, CustomDetectorMixin
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd import Device, Kind, DeviceStatus, Component as Cpt
from ophyd import EpicsSignal, EpicsSignalRO
from ophyd import EpicsSignal, EpicsSignalRO, StatusBase
from ophyd_devices.sim.sim_signals import SetableSignal
from bec_lib.logger import bec_logger
from debye_bec.devices.nidaq.nidaq_enums import NIDAQCompression, ScanType, NidaqState, ScanRates, ReadoutRange, EncoderTypes
from debye_bec.devices.nidaq.nidaq_enums import (
NIDAQCompression,
ScanType,
NidaqState,
ScanRates,
ReadoutRange,
EncoderTypes,
)
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
logger = bec_logger.logger
class NidaqError(Exception):
""" Nidaq specific error"""
"""Nidaq specific error"""
class NIDAQCustomMixin(CustomDetectorMixin):
""" NIDAQ Custom Mixin class to implement the device and beamline-specific actions
to the psidetectorbase class via custom_prepare methods"""
def __init__(self, *_args, parent: Device = None, **_kwargs) -> None:
super().__init__(args=_args, parent=parent, kwargs=_kwargs)
self.timeout_wait_for_signal = 5 # put 5s firsts
self.valid_scan_names = ["xas_simple_scan",
"xas_simple_scan_with_xrd",
"xas_advanced_scan",
"xas_advanced_scan_with_xrd"]
def _check_if_scan_name_is_valid(self) -> bool:
""" Check if the scan is within the list of scans for which the backend is working"""
scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "")
if scan_name in self.valid_scan_names:
return True
return False
def on_connection_established(self) -> None:
"""Method called once wait_for_connection is called on the parent class.
This should be used to implement checks that require the device to be connected, i.e. setting standard pvs.
"""
if not self.wait_for_signals(signal_conditions=[(self.parent.state.get, NidaqState.STANDBY)],
timeout = self.timeout_wait_for_signal,
check_stopped=True):
raise NidaqError(f"Device {self.parent.name} has not been reached in state STANDBY, current state {NidaqState(self.parent.state.get())}")
self.parent.scan_duration.set(0).wait()
def on_stop(self):
""" Stop the NIDAQ backend"""
self.parent.stop_call.set(1).wait()
def on_complete(self) -> None | DeviceStatus:
""" Complete actions. For the NIDAQ we use this method to stop the backend since it
would not stop by itself in its current implementation since the number of points are not predefined.
"""
if not self._check_if_scan_name_is_valid():
return None
self.on_stop()
#TODO check if this wait can be removed. We are waiting in unstage anyways which will always be called afterwards
# Wait for device to be stopped
status = self.wait_with_status(signal_conditions=[(self.parent.state.get, NidaqState.STANDBY)],
check_stopped= True,
timeout=self.timeout_wait_for_signal,
)
return status
def on_stage(self):
""" Prepare the device for the upcoming acquisition. If the upcoming scan is not in the list
of valid scans, return immediately. """
if not self._check_if_scan_name_is_valid():
return None
if not self.wait_for_signals(signal_conditions=[(self.parent.state.get, NidaqState.STANDBY)],
timeout = self.timeout_wait_for_signal,
check_stopped=True):
raise NidaqError(f"Device {self.parent.name} has not been reached in state STANDBY, current state {NidaqState(self.parent.state.get())}")
self.parent.scan_type.set(ScanType.TRIGGERED).wait()
self.parent.scan_duration.set(0).wait()
self.parent.stage_call.set(1).wait()
if not self.wait_for_signals(signal_conditions=[(self.parent.state.get, NidaqState.STAGE)],
timeout = self.timeout_wait_for_signal,
check_stopped=True,
):
raise NidaqError(f"Device {self.parent.name} has not been reached in state STAGE, current state {NidaqState(self.parent.state.get())}")
self.parent.kickoff_call.set(1).wait()
logger.info(f"Device {self.parent.name} was staged: {NidaqState(self.parent.state.get())}")
def on_pre_scan(self) -> None:
""" Execute time critical actions. Here we ensure that the NIDAQ master task is running
before the motor starts its oscillation. This is needed for being properly homed.
The NIDAQ should go into Acquiring mode. """
if not self._check_if_scan_name_is_valid():
return None
if not self.wait_for_signals(signal_conditions=[(self.parent.state.get, NidaqState.KICKOFF)],
timeout = self.timeout_wait_for_signal,
check_stopped=True,
):
raise NidaqError(f"Device {self.parent.name} failed to reach state KICKOFF during pre scan, current state {NidaqState(self.parent.state.get())}")
logger.info(f"Device {self.parent.name} ready to take data after pre_scan: {NidaqState(self.parent.state.get())}")
def on_unstage(self) -> None:
""" Unstage actions, the NIDAQ has to be in STANDBY state."""
if not self.wait_for_signals(signal_conditions=[(self.parent.state.get, NidaqState.STANDBY)],
timeout = self.timeout_wait_for_signal,
check_stopped=False):
raise NidaqError(f"Device {self.parent.name} has not been reached in state STANDBY, current state {NidaqState(self.parent.state.get())}")
logger.info(f"Device {self.parent.name} was unstaged: {NidaqState(self.parent.state.get())}")
class Nidaq(PSIDetectorBase):
""" NIDAQ ophyd wrapper around the NIDAQ backend currently running at x01da-cons-05
Args:
prefix (str) : Prefix to the NIDAQ soft ioc, currently X01DA-PC-SCANSERVER:
name (str) : Name of the device
kind (Kind) : Ophyd Kind of the device
parent (Device) : Parent clas
device_manager : device manager as forwarded by BEC
"""
USER_ACCESS = ['set_config']
enc = Cpt(SetableSignal,value=0, kind=Kind.normal)
signal_1 = Cpt(SetableSignal,value=0, kind=Kind.normal)
signal_2 = Cpt(SetableSignal,value=0, kind=Kind.normal)
signal_3 = Cpt(SetableSignal,value=0, kind=Kind.normal)
signal_4 = Cpt(SetableSignal,value=0, kind=Kind.normal)
signal_5 = Cpt(SetableSignal,value=0, kind=Kind.normal)
signal_6 = Cpt(SetableSignal,value=0, kind=Kind.normal)
signal_7 = Cpt(SetableSignal,value=0, kind=Kind.normal)
signal_8 = Cpt(SetableSignal,value=0, kind=Kind.normal)
class NidaqControl(Device):
"""Nidaq control class with all PVs"""
enc = Cpt(SetableSignal, value=0, kind=Kind.normal)
signal_1 = Cpt(SetableSignal, value=0, kind=Kind.normal)
signal_2 = Cpt(SetableSignal, value=0, kind=Kind.normal)
signal_3 = Cpt(SetableSignal, value=0, kind=Kind.normal)
signal_4 = Cpt(SetableSignal, value=0, kind=Kind.normal)
signal_5 = Cpt(SetableSignal, value=0, kind=Kind.normal)
signal_6 = Cpt(SetableSignal, value=0, kind=Kind.normal)
signal_7 = Cpt(SetableSignal, value=0, kind=Kind.normal)
signal_8 = Cpt(SetableSignal, value=0, kind=Kind.normal)
enable_compression = Cpt(EpicsSignal, suffix="NIDAQ-EnableRLE", kind=Kind.config)
kickoff_call = Cpt(EpicsSignal, suffix="NIDAQ-Kickoff", kind=Kind.config)
stage_call = Cpt(EpicsSignal, suffix="NIDAQ-Stage", kind = Kind.config)
state = Cpt(EpicsSignal, suffix="NIDAQ-FSMState", kind= Kind.config)
stage_call = Cpt(EpicsSignal, suffix="NIDAQ-Stage", kind=Kind.config)
state = Cpt(EpicsSignal, suffix="NIDAQ-FSMState", kind=Kind.config)
server_status = Cpt(EpicsSignalRO, suffix="NIDAQ-ServerStatus", kind=Kind.config)
compression_ratio = Cpt(EpicsSignalRO, suffix="NIDAQ-CompressionRatio", kind=Kind.config)
scan_type = Cpt(EpicsSignal, suffix="NIDAQ-ScanType", kind=Kind.config)
@@ -145,35 +55,55 @@ class Nidaq(PSIDetectorBase):
ci_chans = Cpt(EpicsSignal, suffix="NIDAQ-CIChans6614", kind=Kind.config)
di_chans = Cpt(EpicsSignal, suffix="NIDAQ-DIChans", kind=Kind.config)
custom_prepare_cls = NIDAQCustomMixin
def __init__(self, prefix="", *, name, kind=None, parent=None, device_manager=None, **kwargs):
super().__init__(name=name, prefix=prefix, kind=kind, parent=parent, device_manager=device_manager, **kwargs)
class Nidaq(PSIDeviceBase, NidaqControl):
"""NIDAQ ophyd wrapper around the NIDAQ backend currently running at x01da-cons-05
Args:
prefix (str) : Prefix to the NIDAQ soft ioc, currently X01DA-PC-SCANSERVER:
name (str) : Name of the device
scan_info (ScanInfo) : ScanInfo object passed by BEC's devicemanager.
"""
USER_ACCESS = ["set_config"]
def __init__(self, prefix: str = "", *, name: str, scan_info: ScanInfo = None, **kwargs):
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self.timeout_wait_for_signal = 5 # put 5s firsts
self.valid_scan_names = [
"xas_simple_scan",
"xas_simple_scan_with_xrd",
"xas_advanced_scan",
"xas_advanced_scan_with_xrd",
]
########################################
# Beamline Methods #
########################################
def _check_if_scan_name_is_valid(self) -> bool:
"""Check if the scan is within the list of scans for which the backend is working"""
scan_name = self.scan_info.msg.scan_name
if scan_name in self.valid_scan_names:
return True
return False
def set_config(
self,
sampling_rate: Literal[100000,
500000,
1000000,
2000000,
4000000,
5000000,
10000000,
14286000,
],
ai: list,
ci: list,
di: list,
scan_type: Literal['continuous', 'triggered'] = 'triggered',
scan_duration: float = 0,
readout_range: Literal[1, 2, 5, 10] = 10,
encoder_type: Literal['X_1', 'X_2', 'X_4'] = 'X_4',
enable_compression: bool = True,
self,
sampling_rate: Literal[
100000, 500000, 1000000, 2000000, 4000000, 5000000, 10000000, 14286000
],
ai: list,
ci: list,
di: list,
scan_type: Literal["continuous", "triggered"] = "triggered",
scan_duration: float = 0,
readout_range: Literal[1, 2, 5, 10] = 10,
encoder_type: Literal["X_1", "X_2", "X_4"] = "X_4",
enable_compression: bool = True,
) -> None:
"""Method to configure the NIDAQ
Args:
sampling_rate(Literal[100000, 500000, 1000000, 2000000, 4000000, 5000000,
10000000, 14286000]): Sampling rate in Hz
@@ -191,7 +121,7 @@ class Nidaq(PSIDetectorBase):
enable_compression(bool): Enable or disable compression of data, default True
"""
if sampling_rate == 100000:
if sampling_rate == 100000:
self.sampling_rate.put(ScanRates.HUNDRED_KHZ)
elif sampling_rate == 500000:
self.sampling_rate.put(ScanRates.FIVE_HUNDRED_KHZ)
@@ -232,15 +162,15 @@ class Nidaq(PSIDetectorBase):
di_chans = di_chans | (1 << ch)
self.di_chans.put(di_chans)
if scan_type in 'continuous':
if scan_type in "continuous":
self.scan_type.put(ScanType.CONTINUOUS)
elif scan_type in 'triggered':
elif scan_type in "triggered":
self.scan_type.put(ScanType.TRIGGERED)
if scan_duration >= 0:
self.scan_duration.put(scan_duration)
if readout_range == 1:
if readout_range == 1:
self.readout_range.put(ReadoutRange.ONE_V)
elif readout_range == 2:
self.readout_range.put(ReadoutRange.TWO_V)
@@ -249,14 +179,134 @@ class Nidaq(PSIDetectorBase):
elif readout_range == 10:
self.readout_range.put(ReadoutRange.TEN_V)
if encoder_type in 'X_1':
if encoder_type in "X_1":
self.encoder_type.put(EncoderTypes.X_1)
elif encoder_type in 'X_2':
elif encoder_type in "X_2":
self.encoder_type.put(EncoderTypes.X_2)
elif encoder_type in 'X_4':
elif encoder_type in "X_4":
self.encoder_type.put(EncoderTypes.X_4)
if enable_compression is True:
self.enable_compression.put(NIDAQCompression.ON)
elif enable_compression is False:
self.enable_compression.put(NIDAQCompression.OFF)
########################################
# Beamline Specific Implementations #
########################################
def on_init(self) -> None:
"""
Called when the device is initialized.
No signals are connected at this point. If you like to
set default values on signals, please use on_connected instead.
"""
def on_connected(self) -> None:
"""
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
"""
if not self.wait_for_condition(
condition= lambda: self.state.get() == NidaqState.STANDBY,
timeout = self.timeout_wait_for_signal,
check_stopped=True):
raise NidaqError(f"Device {self.name} has not been reached in state STANDBY, current state {NidaqState(self.state.get())}")
self.scan_duration.set(0).wait()
def on_stage(self) -> DeviceStatus | StatusBase | None:
"""
Called while staging the device.
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
If the upcoming scan is not in the list of valid scans, return immediately.
"""
if not self._check_if_scan_name_is_valid():
return None
if not self.wait_for_condition(
condition=lambda: self.state.get() == NidaqState.STANDBY, timeout=self.timeout_wait_for_signal, check_stopped=True
):
raise NidaqError(
f"Device {self.name} has not been reached in state STANDBY, current state {NidaqState(self.state.get())}"
)
self.scan_type.set(ScanType.TRIGGERED).wait()
self.scan_duration.set(0).wait()
self.stage_call.set(1).wait()
if not self.wait_for_condition(
condition=lambda: self.state.get() == NidaqState.STAGE, timeout=self.timeout_wait_for_signal, check_stopped=True
):
raise NidaqError(
f"Device {self.name} has not been reached in state STAGE, current state {NidaqState(self.state.get())}"
)
self.kickoff_call.set(1).wait()
logger.info(f"Device {self.name} was staged: {NidaqState(self.state.get())}")
def on_unstage(self) -> DeviceStatus | StatusBase | None:
"""Called while unstaging the device. Check that the Nidaq goes into Standby"""
def _get_state():
return self.state.get() == NidaqState.STANDBY
if not self.wait_for_condition(
condition=_get_state, timeout=self.timeout_wait_for_signal, check_stopped=False
):
raise NidaqError(
f"Device {self.name} has not been reached in state STANDBY, current state {NidaqState(self.state.get())}"
)
logger.info(f"Device {self.name} was unstaged: {NidaqState(self.state.get())}")
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
"""
Called right before the scan starts on all devices automatically.
Here we ensure that the NIDAQ master task is running
before the motor starts its oscillation. This is needed for being properly homed.
The NIDAQ should go into Acquiring mode.
"""
if not self._check_if_scan_name_is_valid():
return None
def _wait_for_state():
return self.state.get() == NidaqState.KICKOFF
if not self.wait_for_condition(
_wait_for_state, timeout=self.timeout_wait_for_signal, check_stopped=True
):
raise NidaqError(
f"Device {self.name} failed to reach state KICKOFF during pre scan, current state {NidaqState(self.state.get())}"
)
logger.info(
f"Device {self.name} ready to take data after pre_scan: {NidaqState(self.state.get())}"
)
def on_trigger(self) -> DeviceStatus | StatusBase | None:
"""Called when the device is triggered."""
def on_complete(self) -> DeviceStatus | StatusBase | None:
"""
Called to inquire if a device has completed a scans.
For the NIDAQ we use this method to stop the backend since it
would not stop by itself in its current implementation since the number of points are not predefined.
"""
if not self._check_if_scan_name_is_valid():
return None
self.on_stop()
#TODO check if this wait can be removed. We are waiting in unstage anyways which will always be called afterwards
# Wait for device to be stopped
status = self.wait_for_condition(
condition = lambda: self.state.get() == NidaqState.STANDBY,
check_stopped=True,
timeout=self.timeout_wait_for_signal,
)
return status
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
def on_stop(self) -> None:
"""Called when the device is stopped."""
self.stop_call.set(1).wait()