diff --git a/debye_bec/devices/nidaq.py b/debye_bec/devices/nidaq.py index 618277d..b9fc79e 100644 --- a/debye_bec/devices/nidaq.py +++ b/debye_bec/devices/nidaq.py @@ -1,5 +1,7 @@ import enum +from typing import Literal + from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase, CustomDetectorMixin from ophyd import Device, Kind, DeviceStatus, Component as Cpt from ophyd import EpicsSignal, EpicsSignalRO @@ -35,11 +37,11 @@ class ScanRates(int, enum.Enum): HUNDRED_KHZ = 0 FIVE_HUNDRED_KHZ = 1 ONE_MHZ = 2 - FIVE_MHZ = 3 - TEN_MHZ = 4 - ELEVEN_ONE_MHZ = 5 - TWELVE_FIVE_MHZ = 6 - FOURTEEN_THREE_MTHZ = 7 + TWO_MHZ = 3 + FOUR_MHZ = 4 + FIVE_MHZ = 5 + TEN_MHZ = 6 + FOURTEEN_THREE_MHZ = 7 class ReadoutRange(int, enum.Enum): """ReadoutRange in +-V""" @@ -50,10 +52,9 @@ class ReadoutRange(int, enum.Enum): class EncoderTypes(int, enum.Enum): """ Encoder Types""" - TWO_PULSE_COUNTING = 0 - X_1 = 1 - X_2 = 2 - X_4 = 3 + X_1 = 0 + X_2 = 1 + X_4 = 2 class NIDAQCustomMixin(CustomDetectorMixin): """ NIDAQ Custom Mixin class to implement the device and beamline-specific actions @@ -82,13 +83,7 @@ class NIDAQCustomMixin(CustomDetectorMixin): timeout = self.timeout_wait_for_signal, check_stopped=True): raise NidaqError(f"Device {self.parent.name} has not been reached in state STANDBY, current state {NidaqState(self.parent.state.get())}") - self.parent.encoder_type.set(EncoderTypes.X_1).wait() - self.parent.readout_range.set(ReadoutRange.TEN_V).wait() - self.parent.scan_type.set(ScanType.TRIGGERED).wait() - # TODO, not working with scan_duration 0 for the moment to be fixed once solved - self.parent.scan_duration.set(999999).wait() - # To be checked as default - self.parent.scan_rate.set(ScanRates.TEN_MHZ).wait() + self.parent.scan_duration.set(0).wait() def on_stop(self): """ Stop the NIDAQ backend""" @@ -119,8 +114,7 @@ class NIDAQCustomMixin(CustomDetectorMixin): check_stopped=True): raise NidaqError(f"Device {self.parent.name} has not been reached in state STANDBY, current state {NidaqState(self.parent.state.get())}") self.parent.scan_type.set(ScanType.TRIGGERED).wait() - # TODO, not working with scan_duration 0 for the moment to be fixed once solved - self.parent.scan_duration.set(999999).wait() + self.parent.scan_duration.set(0).wait() self.parent.stage_call.set(1).wait() if not self.wait_for_signals(signal_conditions=[(self.parent.state.get, NidaqState.STAGE)], timeout = self.timeout_wait_for_signal, @@ -162,6 +156,9 @@ class NIDAQ(PSIDetectorBase): parent (Device) : Parent clas device_manager : device manager as forwarded by BEC """ + + USER_ACCESS = ['set_config'] + encoder_angle = Cpt(SetableSignal,value=0, kind=Kind.normal) signal_1 = Cpt(SetableSignal,value=0, kind=Kind.normal) signal_2 = Cpt(SetableSignal,value=0, kind=Kind.normal) @@ -186,11 +183,122 @@ class NIDAQ(PSIDetectorBase): encoder_type = Cpt(EpicsSignal, suffix="NIDAQ-EncoderType", kind=Kind.config) stop_call = Cpt(EpicsSignal, suffix="NIDAQ-Stop", kind=Kind.config) - # ai_channels_6396 = # To be added NIDAQ-AIChans - # ci_channels_6396 = # NIDAQ-CIChans6396 - # ci_channels_6614 = # NIDAQ-CIChans6614 + ai_chans = Cpt(EpicsSignal, suffix="NIDAQ-AIChans", kind=Kind.config) + ci_chans = Cpt(EpicsSignal, suffix="NIDAQ-CIChans6614", kind=Kind.config) + di_chans = Cpt(EpicsSignal, suffix="NIDAQ-DIChans", kind=Kind.config) custom_prepare_cls = NIDAQCustomMixin def __init__(self, prefix="", *, name, kind=None, parent=None, device_manager=None, **kwargs): super().__init__(prefix, name=name, kind=kind, parent=parent, device_manager=device_manager, **kwargs) + + + def set_config( + self, + sampling_rate: Literal[100000, + 500000, + 1000000, + 2000000, + 4000000, + 5000000, + 10000000, + 14286000, + ], + ai: list, + ci: list, + di: list, + scan_type: Literal['continuous', 'triggered'] = 'triggered', + scan_duration: float = 0, + readout_range: Literal[1, 2, 5, 10] = 10, + encoder_type: Literal['X_1', 'X_2', 'X_4'] = 'X_4', + enable_compression: bool = True, + + ) -> None: + """Method to configure the NIDAQ + + Args: + sampling_rate(Literal[100000, 500000, 1000000, 2000000, 4000000, 5000000, + 10000000, 14286000]): Sampling rate in Hz + ai(list): List of analog input channel numbers to add, i.e. [0, 1, 2] for + input 0, 1 and 2 + ci(list): List of counter input channel numbers to add, i.e. [0, 1, 2] for + input 0, 1 and 2 + di(list): List of digital input channel numbers to add, i.e. [0, 1, 2] for + input 0, 1 and 2 + scan_type(Literal['continuous', 'triggered']): Triggered to use with monochromator, + otherwise continuous, default 'triggered' + scan_duration(float): Scan duration in seconds, use 0 for infinite scan, default 0 + readout_range(Literal[1, 2, 5, 10]): Readout range in +- Volts, default +-10V + encoder_type(Literal['X_1', 'X_2', 'X_4']): Encoder readout type, default 'X_4' + enable_compression(bool): Enable or disable compression of data, default True + + """ + if sampling_rate == 100000: + self.sampling_rate.put(ScanRates.HUNDRED_KHZ) + elif sampling_rate == 500000: + self.sampling_rate.put(ScanRates.FIVE_HUNDRED_KHZ) + elif sampling_rate == 1000000: + self.sampling_rate.put(ScanRates.ONE_MHZ) + elif sampling_rate == 2000000: + self.sampling_rate.put(ScanRates.TWO_MHZ) + elif sampling_rate == 4000000: + self.sampling_rate.put(ScanRates.FOUR_MHZ) + elif sampling_rate == 5000000: + self.sampling_rate.put(ScanRates.FIVE_MHZ) + elif sampling_rate == 10000000: + self.sampling_rate.put(ScanRates.TEN_MHZ) + elif sampling_rate == 14286000: + self.sampling_rate.put(ScanRates.FOURTEEN_THREE_MHZ) + + ai_chans = 0 + if isinstance(ai, list): + for ch in ai: + if isinstance(ch, int): + if ch >= 0 and ch <= 7: + ai_chans = ai_chans | (1 << ch) + self.ai_chans.put(ai_chans) + + ci_chans = 0 + if isinstance(ci, list): + for ch in ci: + if isinstance(ch, int): + if ch >= 0 and ch <= 7: + ci_chans = ci_chans | (1 << ch) + self.ci_chans.put(ci_chans) + + di_chans = 0 + if isinstance(di, list): + for ch in di: + if isinstance(ch, int): + if ch >= 0 and ch <= 4: + di_chans = di_chans | (1 << ch) + self.di_chans.put(di_chans) + + if scan_type in 'continuous': + self.scan_type.put(ScanType.CONTINUOUS) + elif scan_type in 'triggered': + self.scan_type.put(ScanType.TRIGGERED) + + if scan_duration >= 0: + self.scan_duration.put(scan_duration) + + if readout_range == 1: + self.readout_range.put(ReadoutRange.ONE_V) + elif readout_range == 2: + self.readout_range.put(ReadoutRange.TWO_V) + elif readout_range == 5: + self.readout_range.put(ReadoutRange.FIVE_V) + elif readout_range == 10: + self.readout_range.put(ReadoutRange.TEN_V) + + if encoder_type in 'X_1': + self.encoder_type.put(EncoderTypes.X_1) + elif encoder_type in 'X_2': + self.encoder_type.put(EncoderTypes.X_2) + elif encoder_type in 'X_4': + self.encoder_type.put(EncoderTypes.X_4) + + if enable_compression is True: + self.enable_compression.put(NIDAQCompression.ON) + elif enable_compression is False: + self.enable_compression.put(NIDAQCompression.OFF)