feat: add nidaq draft for ophyd

This commit is contained in:
gac-x01da
2024-09-18 09:41:23 +02:00
committed by wakonig_k
parent 9b9db93677
commit 6d43a08bd5
2 changed files with 206 additions and 0 deletions

View File

@@ -17,3 +17,13 @@ dummy_pv:
onFailure: retry
enabled: true
softwareTrigger: false
nidaq:
readoutPriority: async
description: NIDAQ backend for data reading for debye scans
deviceClass: debye_bec.devices.nidaq.NIDAQ
deviceConfig:
prefix: "X01DA-PC-SCANSERVER:"
onFailure: retry
enabled: true
softwareTrigger: false

196
debye_bec/devices/nidaq.py Normal file
View File

@@ -0,0 +1,196 @@
import enum
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase, CustomDetectorMixin
from ophyd import Device, Kind, DeviceStatus, Component as Cpt
from ophyd import EpicsSignal, EpicsSignalRO
from ophyd_devices.sim.sim_signals import SetableSignal
from bec_lib.logger import bec_logger
logger = bec_logger.logger
class NidaqError(Exception):
""" Nidaq specific error"""
class NIDAQCompression(str, enum.Enum):
""" Options for Compression"""
OFF = 0
ON = 1
class ScanType(int, enum.Enum):
""" Triggering options of the backend"""
TRIGGERED = 0
CONTINUOUS = 1
class NidaqState(int, enum.Enum):
""" Possible States of the NIDAQ backend"""
DISABLED = 0
STANDBY = 1
PRERUN = 2
ACQUIRING = 3
STOPPED = 4
ERRORSTOP = 5
class ScanRates(int, enum.Enum):
""" Sampling Rate options for the backend, in kHZ and MHz"""
HUNDRED_KHZ = 0
FIVE_HUNDRED_KHZ = 1
ONE_MHZ = 2
FIVE_MHZ = 3
TEN_MHZ = 4
ELEVEN_ONE_MHZ = 5
TWELVE_FIVE_MHZ = 6
FOURTEEN_THREE_MTHZ = 7
class ReadoutRange(int, enum.Enum):
"""ReadoutRange in +-V"""
ONE_V = 0
TWO_V = 1
FIVE_V = 2
TEN_V = 3
class EncoderTypes(int, enum.Enum):
""" Encoder Types"""
TWO_PULSE_COUNTING = 0
X_1 = 1
X_2 = 2
X_4 = 3
class NIDAQCustomMixin(CustomDetectorMixin):
""" NIDAQ Custom Mixin class to implement the device and beamline-specific actions
to the psidetectorbase class via custom_prepare methods"""
def __init__(self, *_args, parent: Device = None, **_kwargs) -> None:
super().__init__(args=_args, parent=parent, kwargs=_kwargs)
self.timeout_wait_for_signal = 5 # put 5s firsts
self.valid_scan_names = ["xas_simple_scan",
"xas_simple_scan_with_xrd",
"xas_advanced_scan",
"xas_advanced_scan_with_xrd"]
def _check_if_scan_name_is_valid(self) -> bool:
""" Check if the scan is within the list of scans for which the backend is working"""
scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "")
if scan_name in self.valid_scan_names:
return True
return False
def on_connection_established(self) -> None:
"""Method called once wait_for_connection is called on the parent class.
This should be used to implement checks that require the device to be connected, i.e. setting standard pvs.
"""
if not self.wait_for_signals(signal_conditions=[(self.parent.state.get, NidaqState.STANDBY)],
timeout = self.timeout_wait_for_signal,
check_stopped=True):
raise NidaqError(f"Device {self.parent.name} has not been reached in state STANDBY, current state {NidaqState(self.parent.state.get())}")
self.parent.encoder_type.set(EncoderTypes.X_1).wait()
self.parent.readout_range.set(ReadoutRange.TEN_V).wait()
self.parent.scan_type.set(ScanType.TRIGGERED).wait()
# TODO, not working with scan_duration 0 for the moment to be fixed once solved
self.parent.scan_duration.set(999999).wait()
# To be checked as default
self.parent.scan_rate.set(ScanRates.TEN_MHZ).wait()
def on_stop(self):
""" Stop the NIDAQ backend"""
self.parent.stop_call.set(1).wait()
def on_complete(self) -> None | DeviceStatus:
""" Complete actions. For the NIDAQ we use this method to stop the backend since it
would not stop by itself in its current implementation since the number of points are not predefined.
"""
if not self._check_if_scan_name_is_valid():
return None
self.on_stop()
#TODO check if this wait can be removed. We are waiting in unstage anyways which will always be called afterwards
# Wait for device to be stopped
status = self.wait_with_status(signal_conditions=[(self.parent.state.get, NidaqState.STANDBY)],
check_stopped= True,
timeout=self.timeout_wait_for_signal,
)
return status
def on_stage(self):
""" Prepare the device for the upcoming acquisition. If the upcoming scan is not in the list
of valid scans, return immediately. """
if not self._check_if_scan_name_is_valid():
return None
if not self.wait_for_signals(signal_conditions=[(self.parent.state.get, NidaqState.STANDBY)],
timeout = self.timeout_wait_for_signal,
check_stopped=True):
raise NidaqError(f"Device {self.parent.name} has not been reached in state STANDBY, current state {NidaqState(self.parent.state.get())}")
self.parent.scan_type.set(ScanType.TRIGGERED).wait()
# TODO, not working with scan_duration 0 for the moment to be fixed once solved
self.parent.scan_duration.set(999999).wait()
self.parent.arm.set(1).wait()
if not self.wait_for_signals(signal_conditions=[(self.parent.state.get, NidaqState.PRERUN)],
timeout = self.timeout_wait_for_signal,
check_stopped=True,
):
raise NidaqError(f"Device {self.parent.name} has not been reached in state PRERUN, current state {NidaqState(self.parent.state.get())}")
self.parent.trigger_call.set(1).wait()
logger.info(f"Device {self.parent.name} was staged: {NidaqState(self.parent.state.get())}")
def on_pre_scan(self) -> None:
""" Execute time critical actions. Here we ensure that the NIDAQ master task is running
before the motor starts its oscillation. This is needed for being properly homed.
The NIDAQ should go into Acquiring mode. """
if not self._check_if_scan_name_is_valid():
return None
if not self.wait_for_signals(signal_conditions=[(self.parent.state.get, NidaqState.ACQUIRING)],
timeout = self.timeout_wait_for_signal,
check_stopped=True,
):
raise NidaqError(f"Device {self.parent.name} failed to reach state ACQUIRING during pre scan, current state {NidaqState(self.parent.state.get())}")
logger.info(f"Device {self.parent.name} ready to take data after pre_scan: {NidaqState(self.parent.state.get())}")
def on_unstage(self) -> None:
""" Unstage actions, the NIDAQ has to be in STANDBY state."""
if not self.wait_for_signals(signal_conditions=[(self.parent.state.get, NidaqState.STANDBY)],
timeout = self.timeout_wait_for_signal,
check_stopped=False):
raise NidaqError(f"Device {self.parent.name} has not been reached in state STANDBY, current state {NidaqState(self.parent.state.get())}")
logger.info(f"Device {self.parent.name} was unstaged: {NidaqState(self.parent.state.get())}")
class NIDAQ(PSIDetectorBase):
""" NIDAQ ophyd wrapper around the NIDAQ backend currently running at x01da-cons-05
Args:
prefix (str) : Prefix to the NIDAQ soft ioc, currently X01DA-PC-SCANSERVER:
name (str) : Name of the device
kind (Kind) : Ophyd Kind of the device
parent (Device) : Parent clas
device_manager : device manager as forwarded by BEC
"""
encoder_angle = Cpt(SetableSignal,value=0, kind=Kind.normal)
signal_1 = Cpt(SetableSignal,value=0, kind=Kind.normal)
signal_2 = Cpt(SetableSignal,value=0, kind=Kind.normal)
signal_3 = Cpt(SetableSignal,value=0, kind=Kind.normal)
signal_4 = Cpt(SetableSignal,value=0, kind=Kind.normal)
signal_5 = Cpt(SetableSignal,value=0, kind=Kind.normal)
signal_6 = Cpt(SetableSignal,value=0, kind=Kind.normal)
signal_7 = Cpt(SetableSignal,value=0, kind=Kind.normal)
signal_8 = Cpt(SetableSignal,value=0, kind=Kind.normal)
enable_compression = Cpt(EpicsSignal, suffix="NIDAQ-EnableRLE", kind=Kind.config)
trigger_call = Cpt(EpicsSignal, suffix="NIDAQ-Trigger", kind=Kind.config)
arm = Cpt(EpicsSignal, suffix="NIDAQ-Arm", kind = Kind.config)
state = Cpt(EpicsSignal, suffix="NIDAQ-FSMState", kind= Kind.config)
server_status = Cpt(EpicsSignalRO, suffix="NIDAQ-ServerStatus", kind=Kind.config)
compression_ratio = Cpt(EpicsSignalRO, suffix="NIDAQ-CompressionRatio", kind=Kind.config)
scan_type = Cpt(EpicsSignal, suffix="NIDAQ-ScanType", kind=Kind.config)
sampling_rate = Cpt(EpicsSignal, suffix="NIDAQ-SamplingRateRequested", kind=Kind.config)
scan_duration = Cpt(EpicsSignal, suffix="NIDAQ-SamplingDuration", kind=Kind.config)
readout_range = Cpt(EpicsSignal, suffix="NIDAQ-ReadoutRange", kind=Kind.config)
encoder_type = Cpt(EpicsSignal, suffix="NIDAQ-EncoderType", kind=Kind.config)
stop_call = Cpt(EpicsSignal, suffix="NIDAQ-Stop", kind=Kind.config)
# ai_channels_6396 = # To be added NIDAQ-AIChans
# ci_channels_6396 = # NIDAQ-CIChans6396
# ci_channels_6614 = # NIDAQ-CIChans6614
custom_prepare_cls = NIDAQCustomMixin
def __init__(self, prefix="", *, name, kind=None, parent=None, device_manager=None, **kwargs):
super().__init__(prefix, name=name, kind=kind, parent=parent, device_manager=device_manager, **kwargs)