Added config function for nidaq

This commit is contained in:
gac-x01da
2024-10-31 07:17:59 +01:00
committed by wakonig_k
parent 23dd8c11e0
commit e49fc6af41

View File

@@ -1,5 +1,7 @@
import enum
from typing import Literal
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase, CustomDetectorMixin
from ophyd import Device, Kind, DeviceStatus, Component as Cpt
from ophyd import EpicsSignal, EpicsSignalRO
@@ -35,11 +37,11 @@ class ScanRates(int, enum.Enum):
HUNDRED_KHZ = 0
FIVE_HUNDRED_KHZ = 1
ONE_MHZ = 2
FIVE_MHZ = 3
TEN_MHZ = 4
ELEVEN_ONE_MHZ = 5
TWELVE_FIVE_MHZ = 6
FOURTEEN_THREE_MTHZ = 7
TWO_MHZ = 3
FOUR_MHZ = 4
FIVE_MHZ = 5
TEN_MHZ = 6
FOURTEEN_THREE_MHZ = 7
class ReadoutRange(int, enum.Enum):
"""ReadoutRange in +-V"""
@@ -50,10 +52,9 @@ class ReadoutRange(int, enum.Enum):
class EncoderTypes(int, enum.Enum):
""" Encoder Types"""
TWO_PULSE_COUNTING = 0
X_1 = 1
X_2 = 2
X_4 = 3
X_1 = 0
X_2 = 1
X_4 = 2
class NIDAQCustomMixin(CustomDetectorMixin):
""" NIDAQ Custom Mixin class to implement the device and beamline-specific actions
@@ -82,13 +83,7 @@ class NIDAQCustomMixin(CustomDetectorMixin):
timeout = self.timeout_wait_for_signal,
check_stopped=True):
raise NidaqError(f"Device {self.parent.name} has not been reached in state STANDBY, current state {NidaqState(self.parent.state.get())}")
self.parent.encoder_type.set(EncoderTypes.X_1).wait()
self.parent.readout_range.set(ReadoutRange.TEN_V).wait()
self.parent.scan_type.set(ScanType.TRIGGERED).wait()
# TODO, not working with scan_duration 0 for the moment to be fixed once solved
self.parent.scan_duration.set(999999).wait()
# To be checked as default
self.parent.scan_rate.set(ScanRates.TEN_MHZ).wait()
self.parent.scan_duration.set(0).wait()
def on_stop(self):
""" Stop the NIDAQ backend"""
@@ -119,8 +114,7 @@ class NIDAQCustomMixin(CustomDetectorMixin):
check_stopped=True):
raise NidaqError(f"Device {self.parent.name} has not been reached in state STANDBY, current state {NidaqState(self.parent.state.get())}")
self.parent.scan_type.set(ScanType.TRIGGERED).wait()
# TODO, not working with scan_duration 0 for the moment to be fixed once solved
self.parent.scan_duration.set(999999).wait()
self.parent.scan_duration.set(0).wait()
self.parent.stage_call.set(1).wait()
if not self.wait_for_signals(signal_conditions=[(self.parent.state.get, NidaqState.STAGE)],
timeout = self.timeout_wait_for_signal,
@@ -162,6 +156,9 @@ class NIDAQ(PSIDetectorBase):
parent (Device) : Parent clas
device_manager : device manager as forwarded by BEC
"""
USER_ACCESS = ['set_config']
encoder_angle = Cpt(SetableSignal,value=0, kind=Kind.normal)
signal_1 = Cpt(SetableSignal,value=0, kind=Kind.normal)
signal_2 = Cpt(SetableSignal,value=0, kind=Kind.normal)
@@ -186,11 +183,122 @@ class NIDAQ(PSIDetectorBase):
encoder_type = Cpt(EpicsSignal, suffix="NIDAQ-EncoderType", kind=Kind.config)
stop_call = Cpt(EpicsSignal, suffix="NIDAQ-Stop", kind=Kind.config)
# ai_channels_6396 = # To be added NIDAQ-AIChans
# ci_channels_6396 = # NIDAQ-CIChans6396
# ci_channels_6614 = # NIDAQ-CIChans6614
ai_chans = Cpt(EpicsSignal, suffix="NIDAQ-AIChans", kind=Kind.config)
ci_chans = Cpt(EpicsSignal, suffix="NIDAQ-CIChans6614", kind=Kind.config)
di_chans = Cpt(EpicsSignal, suffix="NIDAQ-DIChans", kind=Kind.config)
custom_prepare_cls = NIDAQCustomMixin
def __init__(self, prefix="", *, name, kind=None, parent=None, device_manager=None, **kwargs):
super().__init__(prefix, name=name, kind=kind, parent=parent, device_manager=device_manager, **kwargs)
def set_config(
self,
sampling_rate: Literal[100000,
500000,
1000000,
2000000,
4000000,
5000000,
10000000,
14286000,
],
ai: list,
ci: list,
di: list,
scan_type: Literal['continuous', 'triggered'] = 'triggered',
scan_duration: float = 0,
readout_range: Literal[1, 2, 5, 10] = 10,
encoder_type: Literal['X_1', 'X_2', 'X_4'] = 'X_4',
enable_compression: bool = True,
) -> None:
"""Method to configure the NIDAQ
Args:
sampling_rate(Literal[100000, 500000, 1000000, 2000000, 4000000, 5000000,
10000000, 14286000]): Sampling rate in Hz
ai(list): List of analog input channel numbers to add, i.e. [0, 1, 2] for
input 0, 1 and 2
ci(list): List of counter input channel numbers to add, i.e. [0, 1, 2] for
input 0, 1 and 2
di(list): List of digital input channel numbers to add, i.e. [0, 1, 2] for
input 0, 1 and 2
scan_type(Literal['continuous', 'triggered']): Triggered to use with monochromator,
otherwise continuous, default 'triggered'
scan_duration(float): Scan duration in seconds, use 0 for infinite scan, default 0
readout_range(Literal[1, 2, 5, 10]): Readout range in +- Volts, default +-10V
encoder_type(Literal['X_1', 'X_2', 'X_4']): Encoder readout type, default 'X_4'
enable_compression(bool): Enable or disable compression of data, default True
"""
if sampling_rate == 100000:
self.sampling_rate.put(ScanRates.HUNDRED_KHZ)
elif sampling_rate == 500000:
self.sampling_rate.put(ScanRates.FIVE_HUNDRED_KHZ)
elif sampling_rate == 1000000:
self.sampling_rate.put(ScanRates.ONE_MHZ)
elif sampling_rate == 2000000:
self.sampling_rate.put(ScanRates.TWO_MHZ)
elif sampling_rate == 4000000:
self.sampling_rate.put(ScanRates.FOUR_MHZ)
elif sampling_rate == 5000000:
self.sampling_rate.put(ScanRates.FIVE_MHZ)
elif sampling_rate == 10000000:
self.sampling_rate.put(ScanRates.TEN_MHZ)
elif sampling_rate == 14286000:
self.sampling_rate.put(ScanRates.FOURTEEN_THREE_MHZ)
ai_chans = 0
if isinstance(ai, list):
for ch in ai:
if isinstance(ch, int):
if ch >= 0 and ch <= 7:
ai_chans = ai_chans | (1 << ch)
self.ai_chans.put(ai_chans)
ci_chans = 0
if isinstance(ci, list):
for ch in ci:
if isinstance(ch, int):
if ch >= 0 and ch <= 7:
ci_chans = ci_chans | (1 << ch)
self.ci_chans.put(ci_chans)
di_chans = 0
if isinstance(di, list):
for ch in di:
if isinstance(ch, int):
if ch >= 0 and ch <= 4:
di_chans = di_chans | (1 << ch)
self.di_chans.put(di_chans)
if scan_type in 'continuous':
self.scan_type.put(ScanType.CONTINUOUS)
elif scan_type in 'triggered':
self.scan_type.put(ScanType.TRIGGERED)
if scan_duration >= 0:
self.scan_duration.put(scan_duration)
if readout_range == 1:
self.readout_range.put(ReadoutRange.ONE_V)
elif readout_range == 2:
self.readout_range.put(ReadoutRange.TWO_V)
elif readout_range == 5:
self.readout_range.put(ReadoutRange.FIVE_V)
elif readout_range == 10:
self.readout_range.put(ReadoutRange.TEN_V)
if encoder_type in 'X_1':
self.encoder_type.put(EncoderTypes.X_1)
elif encoder_type in 'X_2':
self.encoder_type.put(EncoderTypes.X_2)
elif encoder_type in 'X_4':
self.encoder_type.put(EncoderTypes.X_4)
if enable_compression is True:
self.enable_compression.put(NIDAQCompression.ON)
elif enable_compression is False:
self.enable_compression.put(NIDAQCompression.OFF)