Added config function for nidaq

This commit is contained in:
gac-x01da
2024-10-31 07:17:59 +01:00
committed by wakonig_k
parent 23dd8c11e0
commit e49fc6af41

View File

@@ -1,5 +1,7 @@
import enum import enum
from typing import Literal
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase, CustomDetectorMixin from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase, CustomDetectorMixin
from ophyd import Device, Kind, DeviceStatus, Component as Cpt from ophyd import Device, Kind, DeviceStatus, Component as Cpt
from ophyd import EpicsSignal, EpicsSignalRO from ophyd import EpicsSignal, EpicsSignalRO
@@ -35,11 +37,11 @@ class ScanRates(int, enum.Enum):
HUNDRED_KHZ = 0 HUNDRED_KHZ = 0
FIVE_HUNDRED_KHZ = 1 FIVE_HUNDRED_KHZ = 1
ONE_MHZ = 2 ONE_MHZ = 2
FIVE_MHZ = 3 TWO_MHZ = 3
TEN_MHZ = 4 FOUR_MHZ = 4
ELEVEN_ONE_MHZ = 5 FIVE_MHZ = 5
TWELVE_FIVE_MHZ = 6 TEN_MHZ = 6
FOURTEEN_THREE_MTHZ = 7 FOURTEEN_THREE_MHZ = 7
class ReadoutRange(int, enum.Enum): class ReadoutRange(int, enum.Enum):
"""ReadoutRange in +-V""" """ReadoutRange in +-V"""
@@ -50,10 +52,9 @@ class ReadoutRange(int, enum.Enum):
class EncoderTypes(int, enum.Enum): class EncoderTypes(int, enum.Enum):
""" Encoder Types""" """ Encoder Types"""
TWO_PULSE_COUNTING = 0 X_1 = 0
X_1 = 1 X_2 = 1
X_2 = 2 X_4 = 2
X_4 = 3
class NIDAQCustomMixin(CustomDetectorMixin): class NIDAQCustomMixin(CustomDetectorMixin):
""" NIDAQ Custom Mixin class to implement the device and beamline-specific actions """ NIDAQ Custom Mixin class to implement the device and beamline-specific actions
@@ -82,13 +83,7 @@ class NIDAQCustomMixin(CustomDetectorMixin):
timeout = self.timeout_wait_for_signal, timeout = self.timeout_wait_for_signal,
check_stopped=True): check_stopped=True):
raise NidaqError(f"Device {self.parent.name} has not been reached in state STANDBY, current state {NidaqState(self.parent.state.get())}") raise NidaqError(f"Device {self.parent.name} has not been reached in state STANDBY, current state {NidaqState(self.parent.state.get())}")
self.parent.encoder_type.set(EncoderTypes.X_1).wait() self.parent.scan_duration.set(0).wait()
self.parent.readout_range.set(ReadoutRange.TEN_V).wait()
self.parent.scan_type.set(ScanType.TRIGGERED).wait()
# TODO, not working with scan_duration 0 for the moment to be fixed once solved
self.parent.scan_duration.set(999999).wait()
# To be checked as default
self.parent.scan_rate.set(ScanRates.TEN_MHZ).wait()
def on_stop(self): def on_stop(self):
""" Stop the NIDAQ backend""" """ Stop the NIDAQ backend"""
@@ -119,8 +114,7 @@ class NIDAQCustomMixin(CustomDetectorMixin):
check_stopped=True): check_stopped=True):
raise NidaqError(f"Device {self.parent.name} has not been reached in state STANDBY, current state {NidaqState(self.parent.state.get())}") raise NidaqError(f"Device {self.parent.name} has not been reached in state STANDBY, current state {NidaqState(self.parent.state.get())}")
self.parent.scan_type.set(ScanType.TRIGGERED).wait() self.parent.scan_type.set(ScanType.TRIGGERED).wait()
# TODO, not working with scan_duration 0 for the moment to be fixed once solved self.parent.scan_duration.set(0).wait()
self.parent.scan_duration.set(999999).wait()
self.parent.stage_call.set(1).wait() self.parent.stage_call.set(1).wait()
if not self.wait_for_signals(signal_conditions=[(self.parent.state.get, NidaqState.STAGE)], if not self.wait_for_signals(signal_conditions=[(self.parent.state.get, NidaqState.STAGE)],
timeout = self.timeout_wait_for_signal, timeout = self.timeout_wait_for_signal,
@@ -162,6 +156,9 @@ class NIDAQ(PSIDetectorBase):
parent (Device) : Parent clas parent (Device) : Parent clas
device_manager : device manager as forwarded by BEC device_manager : device manager as forwarded by BEC
""" """
USER_ACCESS = ['set_config']
encoder_angle = Cpt(SetableSignal,value=0, kind=Kind.normal) encoder_angle = Cpt(SetableSignal,value=0, kind=Kind.normal)
signal_1 = Cpt(SetableSignal,value=0, kind=Kind.normal) signal_1 = Cpt(SetableSignal,value=0, kind=Kind.normal)
signal_2 = Cpt(SetableSignal,value=0, kind=Kind.normal) signal_2 = Cpt(SetableSignal,value=0, kind=Kind.normal)
@@ -186,11 +183,122 @@ class NIDAQ(PSIDetectorBase):
encoder_type = Cpt(EpicsSignal, suffix="NIDAQ-EncoderType", kind=Kind.config) encoder_type = Cpt(EpicsSignal, suffix="NIDAQ-EncoderType", kind=Kind.config)
stop_call = Cpt(EpicsSignal, suffix="NIDAQ-Stop", kind=Kind.config) stop_call = Cpt(EpicsSignal, suffix="NIDAQ-Stop", kind=Kind.config)
# ai_channels_6396 = # To be added NIDAQ-AIChans ai_chans = Cpt(EpicsSignal, suffix="NIDAQ-AIChans", kind=Kind.config)
# ci_channels_6396 = # NIDAQ-CIChans6396 ci_chans = Cpt(EpicsSignal, suffix="NIDAQ-CIChans6614", kind=Kind.config)
# ci_channels_6614 = # NIDAQ-CIChans6614 di_chans = Cpt(EpicsSignal, suffix="NIDAQ-DIChans", kind=Kind.config)
custom_prepare_cls = NIDAQCustomMixin custom_prepare_cls = NIDAQCustomMixin
def __init__(self, prefix="", *, name, kind=None, parent=None, device_manager=None, **kwargs): def __init__(self, prefix="", *, name, kind=None, parent=None, device_manager=None, **kwargs):
super().__init__(prefix, name=name, kind=kind, parent=parent, device_manager=device_manager, **kwargs) super().__init__(prefix, name=name, kind=kind, parent=parent, device_manager=device_manager, **kwargs)
def set_config(
self,
sampling_rate: Literal[100000,
500000,
1000000,
2000000,
4000000,
5000000,
10000000,
14286000,
],
ai: list,
ci: list,
di: list,
scan_type: Literal['continuous', 'triggered'] = 'triggered',
scan_duration: float = 0,
readout_range: Literal[1, 2, 5, 10] = 10,
encoder_type: Literal['X_1', 'X_2', 'X_4'] = 'X_4',
enable_compression: bool = True,
) -> None:
"""Method to configure the NIDAQ
Args:
sampling_rate(Literal[100000, 500000, 1000000, 2000000, 4000000, 5000000,
10000000, 14286000]): Sampling rate in Hz
ai(list): List of analog input channel numbers to add, i.e. [0, 1, 2] for
input 0, 1 and 2
ci(list): List of counter input channel numbers to add, i.e. [0, 1, 2] for
input 0, 1 and 2
di(list): List of digital input channel numbers to add, i.e. [0, 1, 2] for
input 0, 1 and 2
scan_type(Literal['continuous', 'triggered']): Triggered to use with monochromator,
otherwise continuous, default 'triggered'
scan_duration(float): Scan duration in seconds, use 0 for infinite scan, default 0
readout_range(Literal[1, 2, 5, 10]): Readout range in +- Volts, default +-10V
encoder_type(Literal['X_1', 'X_2', 'X_4']): Encoder readout type, default 'X_4'
enable_compression(bool): Enable or disable compression of data, default True
"""
if sampling_rate == 100000:
self.sampling_rate.put(ScanRates.HUNDRED_KHZ)
elif sampling_rate == 500000:
self.sampling_rate.put(ScanRates.FIVE_HUNDRED_KHZ)
elif sampling_rate == 1000000:
self.sampling_rate.put(ScanRates.ONE_MHZ)
elif sampling_rate == 2000000:
self.sampling_rate.put(ScanRates.TWO_MHZ)
elif sampling_rate == 4000000:
self.sampling_rate.put(ScanRates.FOUR_MHZ)
elif sampling_rate == 5000000:
self.sampling_rate.put(ScanRates.FIVE_MHZ)
elif sampling_rate == 10000000:
self.sampling_rate.put(ScanRates.TEN_MHZ)
elif sampling_rate == 14286000:
self.sampling_rate.put(ScanRates.FOURTEEN_THREE_MHZ)
ai_chans = 0
if isinstance(ai, list):
for ch in ai:
if isinstance(ch, int):
if ch >= 0 and ch <= 7:
ai_chans = ai_chans | (1 << ch)
self.ai_chans.put(ai_chans)
ci_chans = 0
if isinstance(ci, list):
for ch in ci:
if isinstance(ch, int):
if ch >= 0 and ch <= 7:
ci_chans = ci_chans | (1 << ch)
self.ci_chans.put(ci_chans)
di_chans = 0
if isinstance(di, list):
for ch in di:
if isinstance(ch, int):
if ch >= 0 and ch <= 4:
di_chans = di_chans | (1 << ch)
self.di_chans.put(di_chans)
if scan_type in 'continuous':
self.scan_type.put(ScanType.CONTINUOUS)
elif scan_type in 'triggered':
self.scan_type.put(ScanType.TRIGGERED)
if scan_duration >= 0:
self.scan_duration.put(scan_duration)
if readout_range == 1:
self.readout_range.put(ReadoutRange.ONE_V)
elif readout_range == 2:
self.readout_range.put(ReadoutRange.TWO_V)
elif readout_range == 5:
self.readout_range.put(ReadoutRange.FIVE_V)
elif readout_range == 10:
self.readout_range.put(ReadoutRange.TEN_V)
if encoder_type in 'X_1':
self.encoder_type.put(EncoderTypes.X_1)
elif encoder_type in 'X_2':
self.encoder_type.put(EncoderTypes.X_2)
elif encoder_type in 'X_4':
self.encoder_type.put(EncoderTypes.X_4)
if enable_compression is True:
self.enable_compression.put(NIDAQCompression.ON)
elif enable_compression is False:
self.enable_compression.put(NIDAQCompression.OFF)