diff --git a/debye_bec/devices/nidaq/nidaq.py b/debye_bec/devices/nidaq/nidaq.py index 0e84f3a..f3a0b5b 100644 --- a/debye_bec/devices/nidaq/nidaq.py +++ b/debye_bec/devices/nidaq/nidaq.py @@ -1,137 +1,47 @@ +from __future__ import annotations +from typing import Literal, TYPE_CHECKING, cast - -from typing import Literal - -from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase, CustomDetectorMixin +from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase from ophyd import Device, Kind, DeviceStatus, Component as Cpt -from ophyd import EpicsSignal, EpicsSignalRO +from ophyd import EpicsSignal, EpicsSignalRO, StatusBase from ophyd_devices.sim.sim_signals import SetableSignal from bec_lib.logger import bec_logger -from debye_bec.devices.nidaq.nidaq_enums import NIDAQCompression, ScanType, NidaqState, ScanRates, ReadoutRange, EncoderTypes +from debye_bec.devices.nidaq.nidaq_enums import ( + NIDAQCompression, + ScanType, + NidaqState, + ScanRates, + ReadoutRange, + EncoderTypes, +) + +if TYPE_CHECKING: # pragma: no cover + from bec_lib.devicemanager import ScanInfo logger = bec_logger.logger + class NidaqError(Exception): - """ Nidaq specific error""" + """Nidaq specific error""" -class NIDAQCustomMixin(CustomDetectorMixin): - """ NIDAQ Custom Mixin class to implement the device and beamline-specific actions - to the psidetectorbase class via custom_prepare methods""" - def __init__(self, *_args, parent: Device = None, **_kwargs) -> None: - super().__init__(args=_args, parent=parent, kwargs=_kwargs) - self.timeout_wait_for_signal = 5 # put 5s firsts - self.valid_scan_names = ["xas_simple_scan", - "xas_simple_scan_with_xrd", - "xas_advanced_scan", - "xas_advanced_scan_with_xrd"] - - def _check_if_scan_name_is_valid(self) -> bool: - """ Check if the scan is within the list of scans for which the backend is working""" - scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "") - if scan_name in self.valid_scan_names: - return True - return False - - def on_connection_established(self) -> None: - """Method called once wait_for_connection is called on the parent class. - This should be used to implement checks that require the device to be connected, i.e. setting standard pvs. - """ - if not self.wait_for_signals(signal_conditions=[(self.parent.state.get, NidaqState.STANDBY)], - timeout = self.timeout_wait_for_signal, - check_stopped=True): - raise NidaqError(f"Device {self.parent.name} has not been reached in state STANDBY, current state {NidaqState(self.parent.state.get())}") - self.parent.scan_duration.set(0).wait() - - def on_stop(self): - """ Stop the NIDAQ backend""" - self.parent.stop_call.set(1).wait() - - def on_complete(self) -> None | DeviceStatus: - """ Complete actions. For the NIDAQ we use this method to stop the backend since it - would not stop by itself in its current implementation since the number of points are not predefined. - """ - if not self._check_if_scan_name_is_valid(): - return None - self.on_stop() - #TODO check if this wait can be removed. We are waiting in unstage anyways which will always be called afterwards - # Wait for device to be stopped - status = self.wait_with_status(signal_conditions=[(self.parent.state.get, NidaqState.STANDBY)], - check_stopped= True, - timeout=self.timeout_wait_for_signal, - ) - return status - - def on_stage(self): - """ Prepare the device for the upcoming acquisition. If the upcoming scan is not in the list - of valid scans, return immediately. """ - if not self._check_if_scan_name_is_valid(): - return None - if not self.wait_for_signals(signal_conditions=[(self.parent.state.get, NidaqState.STANDBY)], - timeout = self.timeout_wait_for_signal, - check_stopped=True): - raise NidaqError(f"Device {self.parent.name} has not been reached in state STANDBY, current state {NidaqState(self.parent.state.get())}") - self.parent.scan_type.set(ScanType.TRIGGERED).wait() - self.parent.scan_duration.set(0).wait() - self.parent.stage_call.set(1).wait() - if not self.wait_for_signals(signal_conditions=[(self.parent.state.get, NidaqState.STAGE)], - timeout = self.timeout_wait_for_signal, - check_stopped=True, - ): - raise NidaqError(f"Device {self.parent.name} has not been reached in state STAGE, current state {NidaqState(self.parent.state.get())}") - self.parent.kickoff_call.set(1).wait() - logger.info(f"Device {self.parent.name} was staged: {NidaqState(self.parent.state.get())}") - - def on_pre_scan(self) -> None: - """ Execute time critical actions. Here we ensure that the NIDAQ master task is running - before the motor starts its oscillation. This is needed for being properly homed. - The NIDAQ should go into Acquiring mode. """ - if not self._check_if_scan_name_is_valid(): - return None - if not self.wait_for_signals(signal_conditions=[(self.parent.state.get, NidaqState.KICKOFF)], - timeout = self.timeout_wait_for_signal, - check_stopped=True, - ): - raise NidaqError(f"Device {self.parent.name} failed to reach state KICKOFF during pre scan, current state {NidaqState(self.parent.state.get())}") - logger.info(f"Device {self.parent.name} ready to take data after pre_scan: {NidaqState(self.parent.state.get())}") - - def on_unstage(self) -> None: - """ Unstage actions, the NIDAQ has to be in STANDBY state.""" - if not self.wait_for_signals(signal_conditions=[(self.parent.state.get, NidaqState.STANDBY)], - timeout = self.timeout_wait_for_signal, - check_stopped=False): - raise NidaqError(f"Device {self.parent.name} has not been reached in state STANDBY, current state {NidaqState(self.parent.state.get())}") - logger.info(f"Device {self.parent.name} was unstaged: {NidaqState(self.parent.state.get())}") - - -class Nidaq(PSIDetectorBase): - """ NIDAQ ophyd wrapper around the NIDAQ backend currently running at x01da-cons-05 - - Args: - prefix (str) : Prefix to the NIDAQ soft ioc, currently X01DA-PC-SCANSERVER: - name (str) : Name of the device - kind (Kind) : Ophyd Kind of the device - parent (Device) : Parent clas - device_manager : device manager as forwarded by BEC - """ - - USER_ACCESS = ['set_config'] - - enc = Cpt(SetableSignal,value=0, kind=Kind.normal) - signal_1 = Cpt(SetableSignal,value=0, kind=Kind.normal) - signal_2 = Cpt(SetableSignal,value=0, kind=Kind.normal) - signal_3 = Cpt(SetableSignal,value=0, kind=Kind.normal) - signal_4 = Cpt(SetableSignal,value=0, kind=Kind.normal) - signal_5 = Cpt(SetableSignal,value=0, kind=Kind.normal) - signal_6 = Cpt(SetableSignal,value=0, kind=Kind.normal) - signal_7 = Cpt(SetableSignal,value=0, kind=Kind.normal) - signal_8 = Cpt(SetableSignal,value=0, kind=Kind.normal) +class NidaqControl(Device): + """Nidaq control class with all PVs""" + enc = Cpt(SetableSignal, value=0, kind=Kind.normal) + signal_1 = Cpt(SetableSignal, value=0, kind=Kind.normal) + signal_2 = Cpt(SetableSignal, value=0, kind=Kind.normal) + signal_3 = Cpt(SetableSignal, value=0, kind=Kind.normal) + signal_4 = Cpt(SetableSignal, value=0, kind=Kind.normal) + signal_5 = Cpt(SetableSignal, value=0, kind=Kind.normal) + signal_6 = Cpt(SetableSignal, value=0, kind=Kind.normal) + signal_7 = Cpt(SetableSignal, value=0, kind=Kind.normal) + signal_8 = Cpt(SetableSignal, value=0, kind=Kind.normal) enable_compression = Cpt(EpicsSignal, suffix="NIDAQ-EnableRLE", kind=Kind.config) kickoff_call = Cpt(EpicsSignal, suffix="NIDAQ-Kickoff", kind=Kind.config) - stage_call = Cpt(EpicsSignal, suffix="NIDAQ-Stage", kind = Kind.config) - state = Cpt(EpicsSignal, suffix="NIDAQ-FSMState", kind= Kind.config) + stage_call = Cpt(EpicsSignal, suffix="NIDAQ-Stage", kind=Kind.config) + state = Cpt(EpicsSignal, suffix="NIDAQ-FSMState", kind=Kind.config) server_status = Cpt(EpicsSignalRO, suffix="NIDAQ-ServerStatus", kind=Kind.config) compression_ratio = Cpt(EpicsSignalRO, suffix="NIDAQ-CompressionRatio", kind=Kind.config) scan_type = Cpt(EpicsSignal, suffix="NIDAQ-ScanType", kind=Kind.config) @@ -145,35 +55,55 @@ class Nidaq(PSIDetectorBase): ci_chans = Cpt(EpicsSignal, suffix="NIDAQ-CIChans6614", kind=Kind.config) di_chans = Cpt(EpicsSignal, suffix="NIDAQ-DIChans", kind=Kind.config) - custom_prepare_cls = NIDAQCustomMixin - def __init__(self, prefix="", *, name, kind=None, parent=None, device_manager=None, **kwargs): - super().__init__(name=name, prefix=prefix, kind=kind, parent=parent, device_manager=device_manager, **kwargs) +class Nidaq(PSIDeviceBase, NidaqControl): + """NIDAQ ophyd wrapper around the NIDAQ backend currently running at x01da-cons-05 + Args: + prefix (str) : Prefix to the NIDAQ soft ioc, currently X01DA-PC-SCANSERVER: + name (str) : Name of the device + scan_info (ScanInfo) : ScanInfo object passed by BEC's devicemanager. + """ + + USER_ACCESS = ["set_config"] + + def __init__(self, prefix: str = "", *, name: str, scan_info: ScanInfo = None, **kwargs): + super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs) + self.timeout_wait_for_signal = 5 # put 5s firsts + self.valid_scan_names = [ + "xas_simple_scan", + "xas_simple_scan_with_xrd", + "xas_advanced_scan", + "xas_advanced_scan_with_xrd", + ] + + ######################################## + # Beamline Methods # + ######################################## + + def _check_if_scan_name_is_valid(self) -> bool: + """Check if the scan is within the list of scans for which the backend is working""" + scan_name = self.scan_info.msg.scan_name + if scan_name in self.valid_scan_names: + return True + return False def set_config( - self, - sampling_rate: Literal[100000, - 500000, - 1000000, - 2000000, - 4000000, - 5000000, - 10000000, - 14286000, - ], - ai: list, - ci: list, - di: list, - scan_type: Literal['continuous', 'triggered'] = 'triggered', - scan_duration: float = 0, - readout_range: Literal[1, 2, 5, 10] = 10, - encoder_type: Literal['X_1', 'X_2', 'X_4'] = 'X_4', - enable_compression: bool = True, - + self, + sampling_rate: Literal[ + 100000, 500000, 1000000, 2000000, 4000000, 5000000, 10000000, 14286000 + ], + ai: list, + ci: list, + di: list, + scan_type: Literal["continuous", "triggered"] = "triggered", + scan_duration: float = 0, + readout_range: Literal[1, 2, 5, 10] = 10, + encoder_type: Literal["X_1", "X_2", "X_4"] = "X_4", + enable_compression: bool = True, ) -> None: """Method to configure the NIDAQ - + Args: sampling_rate(Literal[100000, 500000, 1000000, 2000000, 4000000, 5000000, 10000000, 14286000]): Sampling rate in Hz @@ -191,7 +121,7 @@ class Nidaq(PSIDetectorBase): enable_compression(bool): Enable or disable compression of data, default True """ - if sampling_rate == 100000: + if sampling_rate == 100000: self.sampling_rate.put(ScanRates.HUNDRED_KHZ) elif sampling_rate == 500000: self.sampling_rate.put(ScanRates.FIVE_HUNDRED_KHZ) @@ -232,15 +162,15 @@ class Nidaq(PSIDetectorBase): di_chans = di_chans | (1 << ch) self.di_chans.put(di_chans) - if scan_type in 'continuous': + if scan_type in "continuous": self.scan_type.put(ScanType.CONTINUOUS) - elif scan_type in 'triggered': + elif scan_type in "triggered": self.scan_type.put(ScanType.TRIGGERED) if scan_duration >= 0: self.scan_duration.put(scan_duration) - if readout_range == 1: + if readout_range == 1: self.readout_range.put(ReadoutRange.ONE_V) elif readout_range == 2: self.readout_range.put(ReadoutRange.TWO_V) @@ -249,14 +179,134 @@ class Nidaq(PSIDetectorBase): elif readout_range == 10: self.readout_range.put(ReadoutRange.TEN_V) - if encoder_type in 'X_1': + if encoder_type in "X_1": self.encoder_type.put(EncoderTypes.X_1) - elif encoder_type in 'X_2': + elif encoder_type in "X_2": self.encoder_type.put(EncoderTypes.X_2) - elif encoder_type in 'X_4': + elif encoder_type in "X_4": self.encoder_type.put(EncoderTypes.X_4) if enable_compression is True: self.enable_compression.put(NIDAQCompression.ON) elif enable_compression is False: self.enable_compression.put(NIDAQCompression.OFF) + + ######################################## + # Beamline Specific Implementations # + ######################################## + + def on_init(self) -> None: + """ + Called when the device is initialized. + + No signals are connected at this point. If you like to + set default values on signals, please use on_connected instead. + """ + + def on_connected(self) -> None: + """ + Called after the device is connected and its signals are connected. + Default values for signals should be set here. + """ + if not self.wait_for_condition( + condition= lambda: self.state.get() == NidaqState.STANDBY, + timeout = self.timeout_wait_for_signal, + check_stopped=True): + raise NidaqError(f"Device {self.name} has not been reached in state STANDBY, current state {NidaqState(self.state.get())}") + self.scan_duration.set(0).wait() + + def on_stage(self) -> DeviceStatus | StatusBase | None: + """ + Called while staging the device. + + Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object. + If the upcoming scan is not in the list of valid scans, return immediately. + """ + if not self._check_if_scan_name_is_valid(): + return None + + if not self.wait_for_condition( + condition=lambda: self.state.get() == NidaqState.STANDBY, timeout=self.timeout_wait_for_signal, check_stopped=True + ): + raise NidaqError( + f"Device {self.name} has not been reached in state STANDBY, current state {NidaqState(self.state.get())}" + ) + self.scan_type.set(ScanType.TRIGGERED).wait() + self.scan_duration.set(0).wait() + self.stage_call.set(1).wait() + + if not self.wait_for_condition( + condition=lambda: self.state.get() == NidaqState.STAGE, timeout=self.timeout_wait_for_signal, check_stopped=True + ): + raise NidaqError( + f"Device {self.name} has not been reached in state STAGE, current state {NidaqState(self.state.get())}" + ) + self.kickoff_call.set(1).wait() + logger.info(f"Device {self.name} was staged: {NidaqState(self.state.get())}") + + def on_unstage(self) -> DeviceStatus | StatusBase | None: + """Called while unstaging the device. Check that the Nidaq goes into Standby""" + + def _get_state(): + return self.state.get() == NidaqState.STANDBY + + if not self.wait_for_condition( + condition=_get_state, timeout=self.timeout_wait_for_signal, check_stopped=False + ): + raise NidaqError( + f"Device {self.name} has not been reached in state STANDBY, current state {NidaqState(self.state.get())}" + ) + logger.info(f"Device {self.name} was unstaged: {NidaqState(self.state.get())}") + + def on_pre_scan(self) -> DeviceStatus | StatusBase | None: + """ + Called right before the scan starts on all devices automatically. + + Here we ensure that the NIDAQ master task is running + before the motor starts its oscillation. This is needed for being properly homed. + The NIDAQ should go into Acquiring mode. + """ + if not self._check_if_scan_name_is_valid(): + return None + + def _wait_for_state(): + return self.state.get() == NidaqState.KICKOFF + + if not self.wait_for_condition( + _wait_for_state, timeout=self.timeout_wait_for_signal, check_stopped=True + ): + raise NidaqError( + f"Device {self.name} failed to reach state KICKOFF during pre scan, current state {NidaqState(self.state.get())}" + ) + logger.info( + f"Device {self.name} ready to take data after pre_scan: {NidaqState(self.state.get())}" + ) + + def on_trigger(self) -> DeviceStatus | StatusBase | None: + """Called when the device is triggered.""" + + def on_complete(self) -> DeviceStatus | StatusBase | None: + """ + Called to inquire if a device has completed a scans. + + For the NIDAQ we use this method to stop the backend since it + would not stop by itself in its current implementation since the number of points are not predefined. + """ + if not self._check_if_scan_name_is_valid(): + return None + self.on_stop() + #TODO check if this wait can be removed. We are waiting in unstage anyways which will always be called afterwards + # Wait for device to be stopped + status = self.wait_for_condition( + condition = lambda: self.state.get() == NidaqState.STANDBY, + check_stopped=True, + timeout=self.timeout_wait_for_signal, + ) + return status + + def on_kickoff(self) -> DeviceStatus | StatusBase | None: + """Called to kickoff a device for a fly scan. Has to be called explicitly.""" + + def on_stop(self) -> None: + """Called when the device is stopped.""" + self.stop_call.set(1).wait()