Added config function for nidaq
This commit is contained in:
@@ -1,5 +1,7 @@
|
|||||||
import enum
|
import enum
|
||||||
|
|
||||||
|
from typing import Literal
|
||||||
|
|
||||||
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase, CustomDetectorMixin
|
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase, CustomDetectorMixin
|
||||||
from ophyd import Device, Kind, DeviceStatus, Component as Cpt
|
from ophyd import Device, Kind, DeviceStatus, Component as Cpt
|
||||||
from ophyd import EpicsSignal, EpicsSignalRO
|
from ophyd import EpicsSignal, EpicsSignalRO
|
||||||
@@ -35,11 +37,11 @@ class ScanRates(int, enum.Enum):
|
|||||||
HUNDRED_KHZ = 0
|
HUNDRED_KHZ = 0
|
||||||
FIVE_HUNDRED_KHZ = 1
|
FIVE_HUNDRED_KHZ = 1
|
||||||
ONE_MHZ = 2
|
ONE_MHZ = 2
|
||||||
FIVE_MHZ = 3
|
TWO_MHZ = 3
|
||||||
TEN_MHZ = 4
|
FOUR_MHZ = 4
|
||||||
ELEVEN_ONE_MHZ = 5
|
FIVE_MHZ = 5
|
||||||
TWELVE_FIVE_MHZ = 6
|
TEN_MHZ = 6
|
||||||
FOURTEEN_THREE_MTHZ = 7
|
FOURTEEN_THREE_MHZ = 7
|
||||||
|
|
||||||
class ReadoutRange(int, enum.Enum):
|
class ReadoutRange(int, enum.Enum):
|
||||||
"""ReadoutRange in +-V"""
|
"""ReadoutRange in +-V"""
|
||||||
@@ -50,10 +52,9 @@ class ReadoutRange(int, enum.Enum):
|
|||||||
|
|
||||||
class EncoderTypes(int, enum.Enum):
|
class EncoderTypes(int, enum.Enum):
|
||||||
""" Encoder Types"""
|
""" Encoder Types"""
|
||||||
TWO_PULSE_COUNTING = 0
|
X_1 = 0
|
||||||
X_1 = 1
|
X_2 = 1
|
||||||
X_2 = 2
|
X_4 = 2
|
||||||
X_4 = 3
|
|
||||||
|
|
||||||
class NIDAQCustomMixin(CustomDetectorMixin):
|
class NIDAQCustomMixin(CustomDetectorMixin):
|
||||||
""" NIDAQ Custom Mixin class to implement the device and beamline-specific actions
|
""" NIDAQ Custom Mixin class to implement the device and beamline-specific actions
|
||||||
@@ -82,13 +83,7 @@ class NIDAQCustomMixin(CustomDetectorMixin):
|
|||||||
timeout = self.timeout_wait_for_signal,
|
timeout = self.timeout_wait_for_signal,
|
||||||
check_stopped=True):
|
check_stopped=True):
|
||||||
raise NidaqError(f"Device {self.parent.name} has not been reached in state STANDBY, current state {NidaqState(self.parent.state.get())}")
|
raise NidaqError(f"Device {self.parent.name} has not been reached in state STANDBY, current state {NidaqState(self.parent.state.get())}")
|
||||||
self.parent.encoder_type.set(EncoderTypes.X_1).wait()
|
self.parent.scan_duration.set(0).wait()
|
||||||
self.parent.readout_range.set(ReadoutRange.TEN_V).wait()
|
|
||||||
self.parent.scan_type.set(ScanType.TRIGGERED).wait()
|
|
||||||
# TODO, not working with scan_duration 0 for the moment to be fixed once solved
|
|
||||||
self.parent.scan_duration.set(999999).wait()
|
|
||||||
# To be checked as default
|
|
||||||
self.parent.scan_rate.set(ScanRates.TEN_MHZ).wait()
|
|
||||||
|
|
||||||
def on_stop(self):
|
def on_stop(self):
|
||||||
""" Stop the NIDAQ backend"""
|
""" Stop the NIDAQ backend"""
|
||||||
@@ -119,8 +114,7 @@ class NIDAQCustomMixin(CustomDetectorMixin):
|
|||||||
check_stopped=True):
|
check_stopped=True):
|
||||||
raise NidaqError(f"Device {self.parent.name} has not been reached in state STANDBY, current state {NidaqState(self.parent.state.get())}")
|
raise NidaqError(f"Device {self.parent.name} has not been reached in state STANDBY, current state {NidaqState(self.parent.state.get())}")
|
||||||
self.parent.scan_type.set(ScanType.TRIGGERED).wait()
|
self.parent.scan_type.set(ScanType.TRIGGERED).wait()
|
||||||
# TODO, not working with scan_duration 0 for the moment to be fixed once solved
|
self.parent.scan_duration.set(0).wait()
|
||||||
self.parent.scan_duration.set(999999).wait()
|
|
||||||
self.parent.stage_call.set(1).wait()
|
self.parent.stage_call.set(1).wait()
|
||||||
if not self.wait_for_signals(signal_conditions=[(self.parent.state.get, NidaqState.STAGE)],
|
if not self.wait_for_signals(signal_conditions=[(self.parent.state.get, NidaqState.STAGE)],
|
||||||
timeout = self.timeout_wait_for_signal,
|
timeout = self.timeout_wait_for_signal,
|
||||||
@@ -162,6 +156,9 @@ class NIDAQ(PSIDetectorBase):
|
|||||||
parent (Device) : Parent clas
|
parent (Device) : Parent clas
|
||||||
device_manager : device manager as forwarded by BEC
|
device_manager : device manager as forwarded by BEC
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
USER_ACCESS = ['set_config']
|
||||||
|
|
||||||
encoder_angle = Cpt(SetableSignal,value=0, kind=Kind.normal)
|
encoder_angle = Cpt(SetableSignal,value=0, kind=Kind.normal)
|
||||||
signal_1 = Cpt(SetableSignal,value=0, kind=Kind.normal)
|
signal_1 = Cpt(SetableSignal,value=0, kind=Kind.normal)
|
||||||
signal_2 = Cpt(SetableSignal,value=0, kind=Kind.normal)
|
signal_2 = Cpt(SetableSignal,value=0, kind=Kind.normal)
|
||||||
@@ -186,11 +183,122 @@ class NIDAQ(PSIDetectorBase):
|
|||||||
encoder_type = Cpt(EpicsSignal, suffix="NIDAQ-EncoderType", kind=Kind.config)
|
encoder_type = Cpt(EpicsSignal, suffix="NIDAQ-EncoderType", kind=Kind.config)
|
||||||
stop_call = Cpt(EpicsSignal, suffix="NIDAQ-Stop", kind=Kind.config)
|
stop_call = Cpt(EpicsSignal, suffix="NIDAQ-Stop", kind=Kind.config)
|
||||||
|
|
||||||
# ai_channels_6396 = # To be added NIDAQ-AIChans
|
ai_chans = Cpt(EpicsSignal, suffix="NIDAQ-AIChans", kind=Kind.config)
|
||||||
# ci_channels_6396 = # NIDAQ-CIChans6396
|
ci_chans = Cpt(EpicsSignal, suffix="NIDAQ-CIChans6614", kind=Kind.config)
|
||||||
# ci_channels_6614 = # NIDAQ-CIChans6614
|
di_chans = Cpt(EpicsSignal, suffix="NIDAQ-DIChans", kind=Kind.config)
|
||||||
|
|
||||||
custom_prepare_cls = NIDAQCustomMixin
|
custom_prepare_cls = NIDAQCustomMixin
|
||||||
|
|
||||||
def __init__(self, prefix="", *, name, kind=None, parent=None, device_manager=None, **kwargs):
|
def __init__(self, prefix="", *, name, kind=None, parent=None, device_manager=None, **kwargs):
|
||||||
super().__init__(prefix, name=name, kind=kind, parent=parent, device_manager=device_manager, **kwargs)
|
super().__init__(prefix, name=name, kind=kind, parent=parent, device_manager=device_manager, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
def set_config(
|
||||||
|
self,
|
||||||
|
sampling_rate: Literal[100000,
|
||||||
|
500000,
|
||||||
|
1000000,
|
||||||
|
2000000,
|
||||||
|
4000000,
|
||||||
|
5000000,
|
||||||
|
10000000,
|
||||||
|
14286000,
|
||||||
|
],
|
||||||
|
ai: list,
|
||||||
|
ci: list,
|
||||||
|
di: list,
|
||||||
|
scan_type: Literal['continuous', 'triggered'] = 'triggered',
|
||||||
|
scan_duration: float = 0,
|
||||||
|
readout_range: Literal[1, 2, 5, 10] = 10,
|
||||||
|
encoder_type: Literal['X_1', 'X_2', 'X_4'] = 'X_4',
|
||||||
|
enable_compression: bool = True,
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
"""Method to configure the NIDAQ
|
||||||
|
|
||||||
|
Args:
|
||||||
|
sampling_rate(Literal[100000, 500000, 1000000, 2000000, 4000000, 5000000,
|
||||||
|
10000000, 14286000]): Sampling rate in Hz
|
||||||
|
ai(list): List of analog input channel numbers to add, i.e. [0, 1, 2] for
|
||||||
|
input 0, 1 and 2
|
||||||
|
ci(list): List of counter input channel numbers to add, i.e. [0, 1, 2] for
|
||||||
|
input 0, 1 and 2
|
||||||
|
di(list): List of digital input channel numbers to add, i.e. [0, 1, 2] for
|
||||||
|
input 0, 1 and 2
|
||||||
|
scan_type(Literal['continuous', 'triggered']): Triggered to use with monochromator,
|
||||||
|
otherwise continuous, default 'triggered'
|
||||||
|
scan_duration(float): Scan duration in seconds, use 0 for infinite scan, default 0
|
||||||
|
readout_range(Literal[1, 2, 5, 10]): Readout range in +- Volts, default +-10V
|
||||||
|
encoder_type(Literal['X_1', 'X_2', 'X_4']): Encoder readout type, default 'X_4'
|
||||||
|
enable_compression(bool): Enable or disable compression of data, default True
|
||||||
|
|
||||||
|
"""
|
||||||
|
if sampling_rate == 100000:
|
||||||
|
self.sampling_rate.put(ScanRates.HUNDRED_KHZ)
|
||||||
|
elif sampling_rate == 500000:
|
||||||
|
self.sampling_rate.put(ScanRates.FIVE_HUNDRED_KHZ)
|
||||||
|
elif sampling_rate == 1000000:
|
||||||
|
self.sampling_rate.put(ScanRates.ONE_MHZ)
|
||||||
|
elif sampling_rate == 2000000:
|
||||||
|
self.sampling_rate.put(ScanRates.TWO_MHZ)
|
||||||
|
elif sampling_rate == 4000000:
|
||||||
|
self.sampling_rate.put(ScanRates.FOUR_MHZ)
|
||||||
|
elif sampling_rate == 5000000:
|
||||||
|
self.sampling_rate.put(ScanRates.FIVE_MHZ)
|
||||||
|
elif sampling_rate == 10000000:
|
||||||
|
self.sampling_rate.put(ScanRates.TEN_MHZ)
|
||||||
|
elif sampling_rate == 14286000:
|
||||||
|
self.sampling_rate.put(ScanRates.FOURTEEN_THREE_MHZ)
|
||||||
|
|
||||||
|
ai_chans = 0
|
||||||
|
if isinstance(ai, list):
|
||||||
|
for ch in ai:
|
||||||
|
if isinstance(ch, int):
|
||||||
|
if ch >= 0 and ch <= 7:
|
||||||
|
ai_chans = ai_chans | (1 << ch)
|
||||||
|
self.ai_chans.put(ai_chans)
|
||||||
|
|
||||||
|
ci_chans = 0
|
||||||
|
if isinstance(ci, list):
|
||||||
|
for ch in ci:
|
||||||
|
if isinstance(ch, int):
|
||||||
|
if ch >= 0 and ch <= 7:
|
||||||
|
ci_chans = ci_chans | (1 << ch)
|
||||||
|
self.ci_chans.put(ci_chans)
|
||||||
|
|
||||||
|
di_chans = 0
|
||||||
|
if isinstance(di, list):
|
||||||
|
for ch in di:
|
||||||
|
if isinstance(ch, int):
|
||||||
|
if ch >= 0 and ch <= 4:
|
||||||
|
di_chans = di_chans | (1 << ch)
|
||||||
|
self.di_chans.put(di_chans)
|
||||||
|
|
||||||
|
if scan_type in 'continuous':
|
||||||
|
self.scan_type.put(ScanType.CONTINUOUS)
|
||||||
|
elif scan_type in 'triggered':
|
||||||
|
self.scan_type.put(ScanType.TRIGGERED)
|
||||||
|
|
||||||
|
if scan_duration >= 0:
|
||||||
|
self.scan_duration.put(scan_duration)
|
||||||
|
|
||||||
|
if readout_range == 1:
|
||||||
|
self.readout_range.put(ReadoutRange.ONE_V)
|
||||||
|
elif readout_range == 2:
|
||||||
|
self.readout_range.put(ReadoutRange.TWO_V)
|
||||||
|
elif readout_range == 5:
|
||||||
|
self.readout_range.put(ReadoutRange.FIVE_V)
|
||||||
|
elif readout_range == 10:
|
||||||
|
self.readout_range.put(ReadoutRange.TEN_V)
|
||||||
|
|
||||||
|
if encoder_type in 'X_1':
|
||||||
|
self.encoder_type.put(EncoderTypes.X_1)
|
||||||
|
elif encoder_type in 'X_2':
|
||||||
|
self.encoder_type.put(EncoderTypes.X_2)
|
||||||
|
elif encoder_type in 'X_4':
|
||||||
|
self.encoder_type.put(EncoderTypes.X_4)
|
||||||
|
|
||||||
|
if enable_compression is True:
|
||||||
|
self.enable_compression.put(NIDAQCompression.ON)
|
||||||
|
elif enable_compression is False:
|
||||||
|
self.enable_compression.put(NIDAQCompression.OFF)
|
||||||
|
|||||||
Reference in New Issue
Block a user