From 359ef0b6d789ab173c36094a4afdaab3c19ab397 Mon Sep 17 00:00:00 2001 From: appel_c Date: Fri, 22 May 2026 10:10:20 +0200 Subject: [PATCH] refactor(nidaq): migrate nidaq to scans v4 --- debye_bec/devices/nidaq/nidaq.py | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/debye_bec/devices/nidaq/nidaq.py b/debye_bec/devices/nidaq/nidaq.py index b403570..b7882a1 100644 --- a/debye_bec/devices/nidaq/nidaq.py +++ b/debye_bec/devices/nidaq/nidaq.py @@ -3,6 +3,7 @@ from __future__ import annotations from typing import TYPE_CHECKING, Literal from bec_lib.logger import bec_logger +from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo from ophyd import Component as Cpt from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO, Kind, StatusBase from ophyd.status import WaitTimeoutError @@ -18,6 +19,7 @@ from debye_bec.devices.nidaq.nidaq_enums import ( ScanRates, ScanType, ) +from debye_bec.devices.utils.utils import fetch_scan_info if TYPE_CHECKING: # pragma: no cover from bec_lib.devicemanager import ScanInfo @@ -219,7 +221,7 @@ class Nidaq(PSIDeviceBase, NidaqControl): def __init__(self, prefix: str = "", *, name: str, scan_info: ScanInfo = None, **kwargs): super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs) - self.scan_info: ScanInfo + self.scan_parameters: ScanServerScanInfo = None self.timeout_wait_for_signal = 5 # put 5s firsts self._timeout_wait_for_pv = ( 5 # 5s timeout for pv calls. editted due to timeout issues persisting @@ -236,10 +238,9 @@ class Nidaq(PSIDeviceBase, NidaqControl): # Beamline Methods # ######################################## - def _check_if_scan_name_is_valid(self) -> bool: + def _check_if_scan_name_is_valid(self, scan_parameters: ScanServerScanInfo) -> bool: """Check if the scan is within the list of scans for which the backend is working""" - scan_name = self.scan_info.msg.scan_name - if scan_name in self.valid_scan_names: + if scan_parameters.scan_name in self.valid_scan_names: return True return False @@ -396,7 +397,8 @@ class Nidaq(PSIDeviceBase, NidaqControl): Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object. If the upcoming scan is not in the list of valid scans, return immediately. """ - if not self._check_if_scan_name_is_valid(): + self.scan_parameters = fetch_scan_info(self.scan_info) + if not self._check_if_scan_name_is_valid(self.scan_parameters): return None if self.state.get() != NidaqState.STANDBY: @@ -406,17 +408,17 @@ class Nidaq(PSIDeviceBase, NidaqControl): status.wait(timeout=self.timeout_wait_for_signal) # If scan is not part of the valid_scan_names, - if self.scan_info.msg.scan_name != "nidaq_continuous_scan": # what is the new v4 scan + if self.scan_parameters.scan_name != "nidaq_continuous_scan": # what is the new v4 scan self.scan_type.set(ScanType.TRIGGERED).wait(timeout=self._timeout_wait_for_pv) self.scan_duration.set(0).wait(timeout=self._timeout_wait_for_pv) self.enable_compression.set(1).wait(timeout=self._timeout_wait_for_pv) else: self.scan_type.set(ScanType.CONTINUOUS).wait(timeout=self._timeout_wait_for_pv) self.scan_duration.set( - self.scan_info.msg.additional_scan_parameters["scan_duration"] + self.scan_parameters.additional_scan_parameters["scan_duration"] ).wait(timeout=self._timeout_wait_for_pv) self.enable_compression.set( - self.scan_info.msg.additional_scan_parameters["compression"] + self.scan_parameters.additional_scan_parameters["compression"] ).wait(timeout=self._timeout_wait_for_pv) # Stage call to IOC @@ -428,7 +430,7 @@ class Nidaq(PSIDeviceBase, NidaqControl): # self.stage_call.set(1).wait(timeout=self._timeout_wait_for_pv) self.stage_call.put(1) status.wait(timeout=self.timeout_wait_for_signal) - if self.scan_info.msg.scan_name != "nidaq_continuous_scan": + if self.scan_parameters.scan_name != "nidaq_continuous_scan": status = self.on_kickoff() self.cancel_on_stop(status) status.wait(timeout=self._timeout_wait_for_pv) @@ -459,10 +461,10 @@ class Nidaq(PSIDeviceBase, NidaqControl): before the motor starts its oscillation. This is needed for being properly homed. The NIDAQ should go into Acquiring mode. """ - if not self._check_if_scan_name_is_valid(): + if not self._check_if_scan_name_is_valid(self.scan_parameters): return None - if self.scan_info.msg.scan_name == "nidaq_continuous_scan": + if self.scan_parameters.scan_name == "nidaq_continuous_scan": logger.info(f"Device {self.name} ready to be kicked off for nidaq_continuous_scan") return None @@ -483,12 +485,12 @@ class Nidaq(PSIDeviceBase, NidaqControl): For the NIDAQ we use this method to stop the backend since it would not stop by itself in its current implementation since the number of points are not predefined. """ - if not self._check_if_scan_name_is_valid(): + if not self._check_if_scan_name_is_valid(self.scan_parameters): return None status = CompareStatus(self.state, NidaqState.STANDBY) self.cancel_on_stop(status) - if self.scan_info.msg.scan_name != "nidaq_continuous_scan": + if self.scan_parameters.scan_name != "nidaq_continuous_scan": self.on_stop() return status @@ -499,7 +501,9 @@ class Nidaq(PSIDeviceBase, NidaqControl): Args: value (int) : current progress value """ - scan_duration = self.scan_info.msg.additional_scan_parameters.get("scan_duration", None) + if self.scan_parameters is None: + return + scan_duration = self.scan_parameters.additional_scan_parameters.get("scan_duration", None) if not isinstance(scan_duration, (int, float)): return value = scan_duration - value