Feat/introduce nidaq #48
@@ -17,3 +17,13 @@ dummy_pv:
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
nidaq:
|
||||
readoutPriority: async
|
||||
description: NIDAQ backend for data reading for debye scans
|
||||
deviceClass: debye_bec.devices.nidaq.NIDAQ
|
||||
deviceConfig:
|
||||
prefix: "X01DA-PC-SCANSERVER:"
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
softwareTrigger: false
|
||||
|
||||
|
||||
304
debye_bec/devices/nidaq.py
Normal file
304
debye_bec/devices/nidaq.py
Normal file
@@ -0,0 +1,304 @@
|
||||
import enum
|
||||
|
||||
from typing import Literal
|
||||
|
||||
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase, CustomDetectorMixin
|
||||
from ophyd import Device, Kind, DeviceStatus, Component as Cpt
|
||||
from ophyd import EpicsSignal, EpicsSignalRO
|
||||
from ophyd_devices.sim.sim_signals import SetableSignal
|
||||
from bec_lib.logger import bec_logger
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
class NidaqError(Exception):
|
||||
""" Nidaq specific error"""
|
||||
|
||||
class NIDAQCompression(str, enum.Enum):
|
||||
""" Options for Compression"""
|
||||
OFF = 0
|
||||
ON = 1
|
||||
|
||||
class ScanType(int, enum.Enum):
|
||||
""" Triggering options of the backend"""
|
||||
TRIGGERED = 0
|
||||
CONTINUOUS = 1
|
||||
|
||||
class NidaqState(int, enum.Enum):
|
||||
""" Possible States of the NIDAQ backend"""
|
||||
DISABLED = 0
|
||||
STANDBY = 1
|
||||
STAGE = 2
|
||||
KICKOFF = 3
|
||||
ACQUIRE = 4
|
||||
UNSTAGE = 5
|
||||
|
||||
class ScanRates(int, enum.Enum):
|
||||
""" Sampling Rate options for the backend, in kHZ and MHz"""
|
||||
HUNDRED_KHZ = 0
|
||||
FIVE_HUNDRED_KHZ = 1
|
||||
ONE_MHZ = 2
|
||||
TWO_MHZ = 3
|
||||
FOUR_MHZ = 4
|
||||
FIVE_MHZ = 5
|
||||
TEN_MHZ = 6
|
||||
FOURTEEN_THREE_MHZ = 7
|
||||
|
||||
class ReadoutRange(int, enum.Enum):
|
||||
"""ReadoutRange in +-V"""
|
||||
ONE_V = 0
|
||||
TWO_V = 1
|
||||
FIVE_V = 2
|
||||
TEN_V = 3
|
||||
|
||||
class EncoderTypes(int, enum.Enum):
|
||||
""" Encoder Types"""
|
||||
X_1 = 0
|
||||
X_2 = 1
|
||||
X_4 = 2
|
||||
|
||||
class NIDAQCustomMixin(CustomDetectorMixin):
|
||||
""" NIDAQ Custom Mixin class to implement the device and beamline-specific actions
|
||||
to the psidetectorbase class via custom_prepare methods"""
|
||||
|
||||
def __init__(self, *_args, parent: Device = None, **_kwargs) -> None:
|
||||
super().__init__(args=_args, parent=parent, kwargs=_kwargs)
|
||||
self.timeout_wait_for_signal = 5 # put 5s firsts
|
||||
self.valid_scan_names = ["xas_simple_scan",
|
||||
"xas_simple_scan_with_xrd",
|
||||
"xas_advanced_scan",
|
||||
"xas_advanced_scan_with_xrd"]
|
||||
|
||||
def _check_if_scan_name_is_valid(self) -> bool:
|
||||
""" Check if the scan is within the list of scans for which the backend is working"""
|
||||
scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "")
|
||||
if scan_name in self.valid_scan_names:
|
||||
return True
|
||||
return False
|
||||
|
||||
def on_connection_established(self) -> None:
|
||||
"""Method called once wait_for_connection is called on the parent class.
|
||||
This should be used to implement checks that require the device to be connected, i.e. setting standard pvs.
|
||||
"""
|
||||
if not self.wait_for_signals(signal_conditions=[(self.parent.state.get, NidaqState.STANDBY)],
|
||||
timeout = self.timeout_wait_for_signal,
|
||||
check_stopped=True):
|
||||
raise NidaqError(f"Device {self.parent.name} has not been reached in state STANDBY, current state {NidaqState(self.parent.state.get())}")
|
||||
self.parent.scan_duration.set(0).wait()
|
||||
|
||||
def on_stop(self):
|
||||
""" Stop the NIDAQ backend"""
|
||||
self.parent.stop_call.set(1).wait()
|
||||
|
||||
def on_complete(self) -> None | DeviceStatus:
|
||||
""" Complete actions. For the NIDAQ we use this method to stop the backend since it
|
||||
would not stop by itself in its current implementation since the number of points are not predefined.
|
||||
"""
|
||||
if not self._check_if_scan_name_is_valid():
|
||||
return None
|
||||
self.on_stop()
|
||||
#TODO check if this wait can be removed. We are waiting in unstage anyways which will always be called afterwards
|
||||
# Wait for device to be stopped
|
||||
status = self.wait_with_status(signal_conditions=[(self.parent.state.get, NidaqState.STANDBY)],
|
||||
check_stopped= True,
|
||||
timeout=self.timeout_wait_for_signal,
|
||||
)
|
||||
return status
|
||||
|
||||
def on_stage(self):
|
||||
""" Prepare the device for the upcoming acquisition. If the upcoming scan is not in the list
|
||||
of valid scans, return immediately. """
|
||||
if not self._check_if_scan_name_is_valid():
|
||||
return None
|
||||
if not self.wait_for_signals(signal_conditions=[(self.parent.state.get, NidaqState.STANDBY)],
|
||||
timeout = self.timeout_wait_for_signal,
|
||||
check_stopped=True):
|
||||
raise NidaqError(f"Device {self.parent.name} has not been reached in state STANDBY, current state {NidaqState(self.parent.state.get())}")
|
||||
self.parent.scan_type.set(ScanType.TRIGGERED).wait()
|
||||
self.parent.scan_duration.set(0).wait()
|
||||
self.parent.stage_call.set(1).wait()
|
||||
if not self.wait_for_signals(signal_conditions=[(self.parent.state.get, NidaqState.STAGE)],
|
||||
timeout = self.timeout_wait_for_signal,
|
||||
check_stopped=True,
|
||||
):
|
||||
raise NidaqError(f"Device {self.parent.name} has not been reached in state STAGE, current state {NidaqState(self.parent.state.get())}")
|
||||
self.parent.kickoff_call.set(1).wait()
|
||||
logger.info(f"Device {self.parent.name} was staged: {NidaqState(self.parent.state.get())}")
|
||||
|
||||
def on_pre_scan(self) -> None:
|
||||
""" Execute time critical actions. Here we ensure that the NIDAQ master task is running
|
||||
before the motor starts its oscillation. This is needed for being properly homed.
|
||||
The NIDAQ should go into Acquiring mode. """
|
||||
if not self._check_if_scan_name_is_valid():
|
||||
return None
|
||||
if not self.wait_for_signals(signal_conditions=[(self.parent.state.get, NidaqState.KICKOFF)],
|
||||
timeout = self.timeout_wait_for_signal,
|
||||
check_stopped=True,
|
||||
):
|
||||
raise NidaqError(f"Device {self.parent.name} failed to reach state KICKOFF during pre scan, current state {NidaqState(self.parent.state.get())}")
|
||||
logger.info(f"Device {self.parent.name} ready to take data after pre_scan: {NidaqState(self.parent.state.get())}")
|
||||
|
||||
def on_unstage(self) -> None:
|
||||
""" Unstage actions, the NIDAQ has to be in STANDBY state."""
|
||||
if not self.wait_for_signals(signal_conditions=[(self.parent.state.get, NidaqState.STANDBY)],
|
||||
timeout = self.timeout_wait_for_signal,
|
||||
check_stopped=False):
|
||||
raise NidaqError(f"Device {self.parent.name} has not been reached in state STANDBY, current state {NidaqState(self.parent.state.get())}")
|
||||
logger.info(f"Device {self.parent.name} was unstaged: {NidaqState(self.parent.state.get())}")
|
||||
|
||||
|
||||
class NIDAQ(PSIDetectorBase):
|
||||
""" NIDAQ ophyd wrapper around the NIDAQ backend currently running at x01da-cons-05
|
||||
|
||||
Args:
|
||||
prefix (str) : Prefix to the NIDAQ soft ioc, currently X01DA-PC-SCANSERVER:
|
||||
name (str) : Name of the device
|
||||
kind (Kind) : Ophyd Kind of the device
|
||||
parent (Device) : Parent clas
|
||||
device_manager : device manager as forwarded by BEC
|
||||
"""
|
||||
|
||||
USER_ACCESS = ['set_config']
|
||||
|
||||
encoder_angle = Cpt(SetableSignal,value=0, kind=Kind.normal)
|
||||
signal_1 = Cpt(SetableSignal,value=0, kind=Kind.normal)
|
||||
signal_2 = Cpt(SetableSignal,value=0, kind=Kind.normal)
|
||||
signal_3 = Cpt(SetableSignal,value=0, kind=Kind.normal)
|
||||
signal_4 = Cpt(SetableSignal,value=0, kind=Kind.normal)
|
||||
signal_5 = Cpt(SetableSignal,value=0, kind=Kind.normal)
|
||||
signal_6 = Cpt(SetableSignal,value=0, kind=Kind.normal)
|
||||
signal_7 = Cpt(SetableSignal,value=0, kind=Kind.normal)
|
||||
signal_8 = Cpt(SetableSignal,value=0, kind=Kind.normal)
|
||||
|
||||
|
||||
enable_compression = Cpt(EpicsSignal, suffix="NIDAQ-EnableRLE", kind=Kind.config)
|
||||
kickoff_call = Cpt(EpicsSignal, suffix="NIDAQ-Kickoff", kind=Kind.config)
|
||||
stage_call = Cpt(EpicsSignal, suffix="NIDAQ-Stage", kind = Kind.config)
|
||||
state = Cpt(EpicsSignal, suffix="NIDAQ-FSMState", kind= Kind.config)
|
||||
server_status = Cpt(EpicsSignalRO, suffix="NIDAQ-ServerStatus", kind=Kind.config)
|
||||
compression_ratio = Cpt(EpicsSignalRO, suffix="NIDAQ-CompressionRatio", kind=Kind.config)
|
||||
scan_type = Cpt(EpicsSignal, suffix="NIDAQ-ScanType", kind=Kind.config)
|
||||
sampling_rate = Cpt(EpicsSignal, suffix="NIDAQ-SamplingRateRequested", kind=Kind.config)
|
||||
scan_duration = Cpt(EpicsSignal, suffix="NIDAQ-SamplingDuration", kind=Kind.config)
|
||||
readout_range = Cpt(EpicsSignal, suffix="NIDAQ-ReadoutRange", kind=Kind.config)
|
||||
encoder_type = Cpt(EpicsSignal, suffix="NIDAQ-EncoderType", kind=Kind.config)
|
||||
stop_call = Cpt(EpicsSignal, suffix="NIDAQ-Stop", kind=Kind.config)
|
||||
|
||||
ai_chans = Cpt(EpicsSignal, suffix="NIDAQ-AIChans", kind=Kind.config)
|
||||
ci_chans = Cpt(EpicsSignal, suffix="NIDAQ-CIChans6614", kind=Kind.config)
|
||||
di_chans = Cpt(EpicsSignal, suffix="NIDAQ-DIChans", kind=Kind.config)
|
||||
|
||||
custom_prepare_cls = NIDAQCustomMixin
|
||||
|
||||
def __init__(self, prefix="", *, name, kind=None, parent=None, device_manager=None, **kwargs):
|
||||
super().__init__(prefix, name=name, kind=kind, parent=parent, device_manager=device_manager, **kwargs)
|
||||
|
||||
|
||||
def set_config(
|
||||
self,
|
||||
sampling_rate: Literal[100000,
|
||||
500000,
|
||||
1000000,
|
||||
2000000,
|
||||
4000000,
|
||||
5000000,
|
||||
10000000,
|
||||
14286000,
|
||||
],
|
||||
ai: list,
|
||||
ci: list,
|
||||
di: list,
|
||||
scan_type: Literal['continuous', 'triggered'] = 'triggered',
|
||||
scan_duration: float = 0,
|
||||
readout_range: Literal[1, 2, 5, 10] = 10,
|
||||
encoder_type: Literal['X_1', 'X_2', 'X_4'] = 'X_4',
|
||||
enable_compression: bool = True,
|
||||
|
||||
) -> None:
|
||||
"""Method to configure the NIDAQ
|
||||
|
||||
Args:
|
||||
sampling_rate(Literal[100000, 500000, 1000000, 2000000, 4000000, 5000000,
|
||||
10000000, 14286000]): Sampling rate in Hz
|
||||
ai(list): List of analog input channel numbers to add, i.e. [0, 1, 2] for
|
||||
input 0, 1 and 2
|
||||
ci(list): List of counter input channel numbers to add, i.e. [0, 1, 2] for
|
||||
input 0, 1 and 2
|
||||
di(list): List of digital input channel numbers to add, i.e. [0, 1, 2] for
|
||||
input 0, 1 and 2
|
||||
scan_type(Literal['continuous', 'triggered']): Triggered to use with monochromator,
|
||||
otherwise continuous, default 'triggered'
|
||||
scan_duration(float): Scan duration in seconds, use 0 for infinite scan, default 0
|
||||
readout_range(Literal[1, 2, 5, 10]): Readout range in +- Volts, default +-10V
|
||||
encoder_type(Literal['X_1', 'X_2', 'X_4']): Encoder readout type, default 'X_4'
|
||||
enable_compression(bool): Enable or disable compression of data, default True
|
||||
|
||||
"""
|
||||
if sampling_rate == 100000:
|
||||
self.sampling_rate.put(ScanRates.HUNDRED_KHZ)
|
||||
elif sampling_rate == 500000:
|
||||
self.sampling_rate.put(ScanRates.FIVE_HUNDRED_KHZ)
|
||||
elif sampling_rate == 1000000:
|
||||
self.sampling_rate.put(ScanRates.ONE_MHZ)
|
||||
elif sampling_rate == 2000000:
|
||||
self.sampling_rate.put(ScanRates.TWO_MHZ)
|
||||
elif sampling_rate == 4000000:
|
||||
self.sampling_rate.put(ScanRates.FOUR_MHZ)
|
||||
elif sampling_rate == 5000000:
|
||||
self.sampling_rate.put(ScanRates.FIVE_MHZ)
|
||||
elif sampling_rate == 10000000:
|
||||
self.sampling_rate.put(ScanRates.TEN_MHZ)
|
||||
elif sampling_rate == 14286000:
|
||||
self.sampling_rate.put(ScanRates.FOURTEEN_THREE_MHZ)
|
||||
|
||||
ai_chans = 0
|
||||
if isinstance(ai, list):
|
||||
for ch in ai:
|
||||
if isinstance(ch, int):
|
||||
if ch >= 0 and ch <= 7:
|
||||
ai_chans = ai_chans | (1 << ch)
|
||||
self.ai_chans.put(ai_chans)
|
||||
|
||||
ci_chans = 0
|
||||
if isinstance(ci, list):
|
||||
for ch in ci:
|
||||
if isinstance(ch, int):
|
||||
if ch >= 0 and ch <= 7:
|
||||
ci_chans = ci_chans | (1 << ch)
|
||||
self.ci_chans.put(ci_chans)
|
||||
|
||||
di_chans = 0
|
||||
if isinstance(di, list):
|
||||
for ch in di:
|
||||
if isinstance(ch, int):
|
||||
if ch >= 0 and ch <= 4:
|
||||
di_chans = di_chans | (1 << ch)
|
||||
self.di_chans.put(di_chans)
|
||||
|
||||
if scan_type in 'continuous':
|
||||
self.scan_type.put(ScanType.CONTINUOUS)
|
||||
elif scan_type in 'triggered':
|
||||
self.scan_type.put(ScanType.TRIGGERED)
|
||||
|
||||
if scan_duration >= 0:
|
||||
self.scan_duration.put(scan_duration)
|
||||
|
||||
if readout_range == 1:
|
||||
self.readout_range.put(ReadoutRange.ONE_V)
|
||||
elif readout_range == 2:
|
||||
self.readout_range.put(ReadoutRange.TWO_V)
|
||||
elif readout_range == 5:
|
||||
self.readout_range.put(ReadoutRange.FIVE_V)
|
||||
elif readout_range == 10:
|
||||
self.readout_range.put(ReadoutRange.TEN_V)
|
||||
|
||||
if encoder_type in 'X_1':
|
||||
self.encoder_type.put(EncoderTypes.X_1)
|
||||
elif encoder_type in 'X_2':
|
||||
self.encoder_type.put(EncoderTypes.X_2)
|
||||
elif encoder_type in 'X_4':
|
||||
self.encoder_type.put(EncoderTypes.X_4)
|
||||
|
||||
if enable_compression is True:
|
||||
self.enable_compression.put(NIDAQCompression.ON)
|
||||
elif enable_compression is False:
|
||||
self.enable_compression.put(NIDAQCompression.OFF)
|
||||
@@ -1,11 +1,14 @@
|
||||
""" This module contains the scan classes for the mono bragg motor of the Debye beamline."""
|
||||
"""This module contains the scan classes for the mono bragg motor of the Debye beamline."""
|
||||
|
||||
import time
|
||||
|
||||
import numpy as np
|
||||
from bec_lib.device import DeviceBase
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_server.scan_server.scans import AsyncFlyScanBase
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class XASSimpleScan(AsyncFlyScanBase):
|
||||
"""Class for the XAS simple scan"""
|
||||
@@ -71,7 +74,7 @@ class XASSimpleScan(AsyncFlyScanBase):
|
||||
|
||||
self._check_limits()
|
||||
# Ensure parent class pre_scan actions to be called.
|
||||
super().pre_scan()
|
||||
yield from super().pre_scan()
|
||||
|
||||
def scan_report_instructions(self):
|
||||
"""
|
||||
|
||||
@@ -12,12 +12,12 @@ classifiers = [
|
||||
"Programming Language :: Python :: 3",
|
||||
"Topic :: Scientific/Engineering",
|
||||
]
|
||||
dependencies = ["numpy", "scipy", "bec_lib", "h5py", "ophyd_devices"]
|
||||
dependencies = ["numpy ~= 1.24", "scipy", "bec_lib", "h5py", "ophyd_devices"]
|
||||
|
||||
[project.optional-dependencies]
|
||||
dev = [
|
||||
"bec_server",
|
||||
"black",
|
||||
"black ~= 24.0",
|
||||
"isort",
|
||||
"coverage",
|
||||
"pylint",
|
||||
|
||||
@@ -102,6 +102,12 @@ def test_xas_simple_scan(scan_assembler, ScanStubStatusMock):
|
||||
"kwargs": {},
|
||||
},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
|
||||
action="pre_scan",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device="mo1_bragg",
|
||||
@@ -207,6 +213,12 @@ def test_xas_simple_scan_with_xrd(scan_assembler, ScanStubStatusMock):
|
||||
"kwargs": {},
|
||||
},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
|
||||
action="pre_scan",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device="mo1_bragg",
|
||||
@@ -306,6 +318,12 @@ def test_xas_advanced_scan(scan_assembler, ScanStubStatusMock):
|
||||
"kwargs": {},
|
||||
},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
|
||||
action="pre_scan",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device="mo1_bragg",
|
||||
@@ -413,6 +431,12 @@ def test_xas_advanced_scan_with_xrd(scan_assembler, ScanStubStatusMock):
|
||||
"kwargs": {},
|
||||
},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
|
||||
action="pre_scan",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device="mo1_bragg",
|
||||
|
||||
Reference in New Issue
Block a user