refactor(nidaq): migrate nidaq to scans v4

This commit is contained in:
2026-05-22 10:10:20 +02:00
parent 262a0b6318
commit 359ef0b6d7
+18 -14
View File
@@ -3,6 +3,7 @@ from __future__ import annotations
from typing import TYPE_CHECKING, Literal
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO, Kind, StatusBase
from ophyd.status import WaitTimeoutError
@@ -18,6 +19,7 @@ from debye_bec.devices.nidaq.nidaq_enums import (
ScanRates,
ScanType,
)
from debye_bec.devices.utils.utils import fetch_scan_info
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
@@ -219,7 +221,7 @@ class Nidaq(PSIDeviceBase, NidaqControl):
def __init__(self, prefix: str = "", *, name: str, scan_info: ScanInfo = None, **kwargs):
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self.scan_info: ScanInfo
self.scan_parameters: ScanServerScanInfo = None
self.timeout_wait_for_signal = 5 # put 5s firsts
self._timeout_wait_for_pv = (
5 # 5s timeout for pv calls. editted due to timeout issues persisting
@@ -236,10 +238,9 @@ class Nidaq(PSIDeviceBase, NidaqControl):
# Beamline Methods #
########################################
def _check_if_scan_name_is_valid(self) -> bool:
def _check_if_scan_name_is_valid(self, scan_parameters: ScanServerScanInfo) -> bool:
"""Check if the scan is within the list of scans for which the backend is working"""
scan_name = self.scan_info.msg.scan_name
if scan_name in self.valid_scan_names:
if scan_parameters.scan_name in self.valid_scan_names:
return True
return False
@@ -396,7 +397,8 @@ class Nidaq(PSIDeviceBase, NidaqControl):
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
If the upcoming scan is not in the list of valid scans, return immediately.
"""
if not self._check_if_scan_name_is_valid():
self.scan_parameters = fetch_scan_info(self.scan_info)
if not self._check_if_scan_name_is_valid(self.scan_parameters):
return None
if self.state.get() != NidaqState.STANDBY:
@@ -406,17 +408,17 @@ class Nidaq(PSIDeviceBase, NidaqControl):
status.wait(timeout=self.timeout_wait_for_signal)
# If scan is not part of the valid_scan_names,
if self.scan_info.msg.scan_name != "nidaq_continuous_scan": # what is the new v4 scan
if self.scan_parameters.scan_name != "nidaq_continuous_scan": # what is the new v4 scan
self.scan_type.set(ScanType.TRIGGERED).wait(timeout=self._timeout_wait_for_pv)
self.scan_duration.set(0).wait(timeout=self._timeout_wait_for_pv)
self.enable_compression.set(1).wait(timeout=self._timeout_wait_for_pv)
else:
self.scan_type.set(ScanType.CONTINUOUS).wait(timeout=self._timeout_wait_for_pv)
self.scan_duration.set(
self.scan_info.msg.additional_scan_parameters["scan_duration"]
self.scan_parameters.additional_scan_parameters["scan_duration"]
).wait(timeout=self._timeout_wait_for_pv)
self.enable_compression.set(
self.scan_info.msg.additional_scan_parameters["compression"]
self.scan_parameters.additional_scan_parameters["compression"]
).wait(timeout=self._timeout_wait_for_pv)
# Stage call to IOC
@@ -428,7 +430,7 @@ class Nidaq(PSIDeviceBase, NidaqControl):
# self.stage_call.set(1).wait(timeout=self._timeout_wait_for_pv)
self.stage_call.put(1)
status.wait(timeout=self.timeout_wait_for_signal)
if self.scan_info.msg.scan_name != "nidaq_continuous_scan":
if self.scan_parameters.scan_name != "nidaq_continuous_scan":
status = self.on_kickoff()
self.cancel_on_stop(status)
status.wait(timeout=self._timeout_wait_for_pv)
@@ -459,10 +461,10 @@ class Nidaq(PSIDeviceBase, NidaqControl):
before the motor starts its oscillation. This is needed for being properly homed.
The NIDAQ should go into Acquiring mode.
"""
if not self._check_if_scan_name_is_valid():
if not self._check_if_scan_name_is_valid(self.scan_parameters):
return None
if self.scan_info.msg.scan_name == "nidaq_continuous_scan":
if self.scan_parameters.scan_name == "nidaq_continuous_scan":
logger.info(f"Device {self.name} ready to be kicked off for nidaq_continuous_scan")
return None
@@ -483,12 +485,12 @@ class Nidaq(PSIDeviceBase, NidaqControl):
For the NIDAQ we use this method to stop the backend since it
would not stop by itself in its current implementation since the number of points are not predefined.
"""
if not self._check_if_scan_name_is_valid():
if not self._check_if_scan_name_is_valid(self.scan_parameters):
return None
status = CompareStatus(self.state, NidaqState.STANDBY)
self.cancel_on_stop(status)
if self.scan_info.msg.scan_name != "nidaq_continuous_scan":
if self.scan_parameters.scan_name != "nidaq_continuous_scan":
self.on_stop()
return status
@@ -499,7 +501,9 @@ class Nidaq(PSIDeviceBase, NidaqControl):
Args:
value (int) : current progress value
"""
scan_duration = self.scan_info.msg.additional_scan_parameters.get("scan_duration", None)
if self.scan_parameters is None:
return
scan_duration = self.scan_parameters.additional_scan_parameters.get("scan_duration", None)
if not isinstance(scan_duration, (int, float)):
return
value = scan_duration - value