From e52c4d1f85ecd4bc33667580f6bf490b50361ca4 Mon Sep 17 00:00:00 2001 From: appel_c Date: Thu, 28 May 2026 08:20:33 +0200 Subject: [PATCH 1/4] refactor: migrate xas_scans to v4 --- superxas_bec/devices/mo1_bragg/mo1_bragg.py | 248 +++++++---- superxas_bec/devices/nidaq/nidaq.py | 55 ++- superxas_bec/devices/utils/__init__.py | 2 + superxas_bec/devices/utils/utils.py | 26 ++ superxas_bec/scans/mono_bragg_scans.py | 420 ++++++------------ superxas_bec/scans/nidaq_cont_scan.py | 156 ++++--- .../test_device_scan_info_utils.py | 53 +++ .../test_mo1_bragg_v4_scan_info.py | 100 +++++ .../tests_devices/test_nidaq_v4_scan_info.py | 67 +++ tests/tests_scans/conftest.py | 7 + tests/tests_scans/test_mono_bragg_scans_v4.py | 121 +++++ .../test_nidaq_continuous_scan_v4.py | 76 ++++ 12 files changed, 907 insertions(+), 424 deletions(-) create mode 100644 superxas_bec/devices/utils/__init__.py create mode 100644 superxas_bec/devices/utils/utils.py create mode 100644 tests/tests_devices/test_device_scan_info_utils.py create mode 100644 tests/tests_devices/test_mo1_bragg_v4_scan_info.py create mode 100644 tests/tests_devices/test_nidaq_v4_scan_info.py create mode 100644 tests/tests_scans/conftest.py create mode 100644 tests/tests_scans/test_mono_bragg_scans_v4.py create mode 100644 tests/tests_scans/test_nidaq_continuous_scan_v4.py diff --git a/superxas_bec/devices/mo1_bragg/mo1_bragg.py b/superxas_bec/devices/mo1_bragg/mo1_bragg.py index e6311fc..89b1abf 100644 --- a/superxas_bec/devices/mo1_bragg/mo1_bragg.py +++ b/superxas_bec/devices/mo1_bragg/mo1_bragg.py @@ -12,13 +12,13 @@ from typing import Any, Literal from bec_lib.devicemanager import ScanInfo from bec_lib.logger import bec_logger +from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo from ophyd import Component as Cpt from ophyd import DeviceStatus, Signal, StatusBase from ophyd.status import SubscriptionStatus, WaitTimeoutError from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase from ophyd_devices.utils.errors import DeviceStopError -from pydantic import BaseModel, Field from typeguard import typechecked from superxas_bec.devices.mo1_bragg.mo1_bragg_devices import Mo1BraggPositioner @@ -33,6 +33,7 @@ from superxas_bec.devices.mo1_bragg.mo1_bragg_enums import ( TriggerControlSource, ) from superxas_bec.devices.mo1_bragg.mo1_bragg_utils import compute_spline +from superxas_bec.devices.utils.utils import fetch_scan_info # Initialise logger logger = bec_logger.logger @@ -44,34 +45,6 @@ class Mo1BraggError(Exception): """Exception for the Mo1 Bragg positioner""" -########## Scan Parameter Model ########## - - -class ScanParameter(BaseModel): - """Dataclass to store the scan parameters for the Mo1 Bragg positioner. - This needs to be in sync with the kwargs of the MO1 Bragg scans from SuperXAS, to - ensure that the scan parameters are correctly set. Any changes in the scan kwargs, - i.e. renaming or adding new parameters, need to be represented here as well.""" - - scan_time: float | None = Field(None, description="Scan time for a half oscillation") - scan_duration: float | None = Field(None, description="Duration of the scan") - xrd_enable_low: bool | None = Field( - None, description="XRD enabled for low, should be PV trig_ena_lo_enum" - ) # trig_enable_low: bool = None - xrd_enable_high: bool | None = Field( - None, description="XRD enabled for high, should be PV trig_ena_hi_enum" - ) # trig_enable_high: bool = None - exp_time_low: float | None = Field(None, description="Exposure time low energy/angle") - exp_time_high: float | None = Field(None, description="Exposure time high energy/angle") - cycle_low: int | None = Field(None, description="Cycle for low energy/angle") - cycle_high: int | None = Field(None, description="Cycle for high energy/angle") - start: float | None = Field(None, description="Start value for energy/angle") - stop: float | None = Field(None, description="Stop value for energy/angle") - p_kink: float | None = Field(None, description="P Kink") - e_kink: float | None = Field(None, description="Energy Kink") - model_config: dict = {"validate_assignment": True} - - ########### Mo1 Bragg Motor Class ########### @@ -83,7 +56,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): progress_signal = Cpt(ProgressSignal, name="progress_signal") - USER_ACCESS = ["set_advanced_xas_settings", "set_xtal"] + USER_ACCESS = ["set_advanced_xas_settings", "set_xtal", "convert_angle_energy"] def __init__(self, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs): # type: ignore """ @@ -94,8 +67,15 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): scan_info (ScanInfo): The scan info to use. """ super().__init__(name=name, scan_info=scan_info, prefix=prefix, **kwargs) - self.scan_parameter = ScanParameter() + self.scan_parameters: ScanServerScanInfo | None = None self.timeout_for_pvwait = 7.5 + self.valid_scan_names = [ + "xas_simple_scan", + "xas_simple_scan_with_xrd", + "xas_advanced_scan", + "xas_advanced_scan_with_xrd", + "nidaq_continuous_scan", + ] ######################################## # Beamline Specific Implementations # @@ -122,6 +102,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object. """ + self.scan_parameters = fetch_scan_info(self.scan_info) if self.scan_control.scan_msg.get() != ScanControlLoadMessage.PENDING: status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING) @@ -129,14 +110,22 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): self.scan_control.scan_val_reset.put(1) status.wait(timeout=self.timeout_for_pvwait) - scan_name = self.scan_info.msg.scan_name - self._update_scan_parameter() + scan_name = self.scan_parameters.scan_name + if not self._check_if_scan_name_is_valid(self.scan_parameters): + return None + start, stop = self._get_start_stop() + scan_time = self._scan_parameter("scan_time") + scan_duration = self._scan_parameter("scan_duration") if scan_name == "xas_simple_scan": + self._raise_for_missing( + scan_name, ["start", "stop", "scan_time", "scan_duration"], + [start, stop, scan_time, scan_duration], + ) self.set_xas_settings( - low=self.scan_parameter.start, - high=self.scan_parameter.stop, - scan_time=self.scan_parameter.scan_time, + low=start, + high=stop, + scan_time=scan_time, ) self.set_trig_settings( enable_low=False, @@ -147,32 +136,72 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): cycle_high=0, ) self.set_scan_control_settings( - mode=ScanControlMode.SIMPLE, scan_duration=self.scan_parameter.scan_duration + mode=ScanControlMode.SIMPLE, scan_duration=scan_duration ) elif scan_name == "xas_simple_scan_with_xrd": + xrd_enable_low = self._scan_parameter("xrd_enable_low", "break_enable_low") + xrd_enable_high = self._scan_parameter("xrd_enable_high", "break_enable_high") + exp_time_low = self._scan_parameter("exp_time_low", "break_time_low") + exp_time_high = self._scan_parameter("exp_time_high", "break_time_high") + cycle_low = self._scan_parameter("cycle_low") + cycle_high = self._scan_parameter("cycle_high") + self._raise_for_missing( + scan_name, + [ + "start", + "stop", + "scan_time", + "scan_duration", + "xrd_enable_low", + "xrd_enable_high", + "exp_time_low", + "exp_time_high", + "cycle_low", + "cycle_high", + ], + [ + start, + stop, + scan_time, + scan_duration, + xrd_enable_low, + xrd_enable_high, + exp_time_low, + exp_time_high, + cycle_low, + cycle_high, + ], + ) self.set_xas_settings( - low=self.scan_parameter.start, - high=self.scan_parameter.stop, - scan_time=self.scan_parameter.scan_time, + low=start, + high=stop, + scan_time=scan_time, ) self.set_trig_settings( - enable_low=self.scan_parameter.xrd_enable_low, # enable_low=self.scan_parameter.trig_enable_low, - enable_high=self.scan_parameter.xrd_enable_high, # enable_high=self.scan_parameter.trig_enable_high, - exp_time_low=self.scan_parameter.exp_time_low, - exp_time_high=self.scan_parameter.exp_time_high, - cycle_low=self.scan_parameter.cycle_low, - cycle_high=self.scan_parameter.cycle_high, + enable_low=xrd_enable_low, + enable_high=xrd_enable_high, + exp_time_low=exp_time_low, + exp_time_high=exp_time_high, + cycle_low=cycle_low, + cycle_high=cycle_high, ) self.set_scan_control_settings( - mode=ScanControlMode.SIMPLE, scan_duration=self.scan_parameter.scan_duration + mode=ScanControlMode.SIMPLE, scan_duration=scan_duration ) elif scan_name == "xas_advanced_scan": + p_kink = self._scan_parameter("p_kink") + e_kink = self._scan_parameter("e_kink") + self._raise_for_missing( + scan_name, + ["start", "stop", "scan_time", "scan_duration", "p_kink", "e_kink"], + [start, stop, scan_time, scan_duration, p_kink, e_kink], + ) self.set_advanced_xas_settings( - low=self.scan_parameter.start, - high=self.scan_parameter.stop, - scan_time=self.scan_parameter.scan_time, - p_kink=self.scan_parameter.p_kink, - e_kink=self.scan_parameter.e_kink, + low=start, + high=stop, + scan_time=scan_time, + p_kink=p_kink, + e_kink=e_kink, ) self.set_trig_settings( enable_low=False, @@ -183,26 +212,65 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): cycle_high=0, ) self.set_scan_control_settings( - mode=ScanControlMode.ADVANCED, scan_duration=self.scan_parameter.scan_duration + mode=ScanControlMode.ADVANCED, scan_duration=scan_duration ) elif scan_name == "xas_advanced_scan_with_xrd": + p_kink = self._scan_parameter("p_kink") + e_kink = self._scan_parameter("e_kink") + xrd_enable_low = self._scan_parameter("xrd_enable_low", "break_enable_low") + xrd_enable_high = self._scan_parameter("xrd_enable_high", "break_enable_high") + exp_time_low = self._scan_parameter("exp_time_low", "break_time_low") + exp_time_high = self._scan_parameter("exp_time_high", "break_time_high") + cycle_low = self._scan_parameter("cycle_low") + cycle_high = self._scan_parameter("cycle_high") + self._raise_for_missing( + scan_name, + [ + "start", + "stop", + "scan_time", + "scan_duration", + "p_kink", + "e_kink", + "xrd_enable_low", + "xrd_enable_high", + "exp_time_low", + "exp_time_high", + "cycle_low", + "cycle_high", + ], + [ + start, + stop, + scan_time, + scan_duration, + p_kink, + e_kink, + xrd_enable_low, + xrd_enable_high, + exp_time_low, + exp_time_high, + cycle_low, + cycle_high, + ], + ) self.set_advanced_xas_settings( - low=self.scan_parameter.start, - high=self.scan_parameter.stop, - scan_time=self.scan_parameter.scan_time, - p_kink=self.scan_parameter.p_kink, - e_kink=self.scan_parameter.e_kink, + low=start, + high=stop, + scan_time=scan_time, + p_kink=p_kink, + e_kink=e_kink, ) self.set_trig_settings( - enable_low=self.scan_parameter.xrd_enable_low, # enable_low=self.scan_parameter.trig_enable_low, - enable_high=self.scan_parameter.xrd_enable_high, # enable_high=self.scan_parameter.trig_enable_high, - exp_time_low=self.scan_parameter.exp_time_low, - exp_time_high=self.scan_parameter.exp_time_high, - cycle_low=self.scan_parameter.cycle_low, - cycle_high=self.scan_parameter.cycle_high, + enable_low=xrd_enable_low, + enable_high=xrd_enable_high, + exp_time_low=exp_time_low, + exp_time_high=exp_time_high, + cycle_low=cycle_low, + cycle_high=cycle_high, ) self.set_scan_control_settings( - mode=ScanControlMode.ADVANCED, scan_duration=self.scan_parameter.scan_duration + mode=ScanControlMode.ADVANCED, scan_duration=scan_duration ) else: return @@ -284,6 +352,46 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): self.stopped = True # Needs to be set to stop motion ######### Utility Methods ######### + def _check_if_scan_name_is_valid(self, scan_parameters: ScanServerScanInfo) -> bool: + """Check if the scan is within the list of scans supported by the backend.""" + return scan_parameters.scan_name in self.valid_scan_names + + def _scan_parameter(self, *names: str): + """Fetch a scan parameter from v4 metadata, with legacy fallbacks.""" + if self.scan_parameters is None: + return None + sources = [ + self.scan_parameters.additional_scan_parameters, + getattr(self.scan_parameters, "metadata", {}), + getattr(self.scan_info.msg, "scan_parameters", {}), + ] + request_inputs = self.scan_parameters.request_inputs or {} + sources.extend( + [ + request_inputs.get("inputs", {}), + request_inputs.get("kwargs", {}), + ] + ) + for source in sources: + for name in names: + if isinstance(source, dict) and name in source: + return source[name] + return None + + def _get_start_stop(self): + """Return scan start/stop from v4 positions or legacy request inputs.""" + if self.scan_parameters is not None and self.scan_parameters.positions is not None: + if len(self.scan_parameters.positions) == 2: + return self.scan_parameters.positions + return self._scan_parameter("start"), self._scan_parameter("stop") + + def _raise_for_missing(self, scan_name: str, names: list[str], values: list) -> None: + if any(value is None for value in values): + raise Mo1BraggError( + f"Missing scan parameters for {scan_name}. Required parameters: " + f"{', '.join(names)} in scan info {self.scan_parameters}" + ) + def _progress_update(self, value, **kwargs) -> None: """Callback method to update the scan progress, runs a callback to SUB_PROGRESS subscribers, i.e. BEC. @@ -454,13 +562,3 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): for s in status_list: s.wait(timeout=self.timeout_for_pvwait) - - - def _update_scan_parameter(self): - """Get the scan_info parameters for the scan.""" - for key, value in self.scan_info.msg.request_inputs["inputs"].items(): - if hasattr(self.scan_parameter, key): - setattr(self.scan_parameter, key, value) - for key, value in self.scan_info.msg.request_inputs["kwargs"].items(): - if hasattr(self.scan_parameter, key): - setattr(self.scan_parameter, key, value) diff --git a/superxas_bec/devices/nidaq/nidaq.py b/superxas_bec/devices/nidaq/nidaq.py index 17fcbc4..3ac1664 100644 --- a/superxas_bec/devices/nidaq/nidaq.py +++ b/superxas_bec/devices/nidaq/nidaq.py @@ -1,9 +1,10 @@ from __future__ import annotations import time -from typing import TYPE_CHECKING, Literal, cast +from typing import TYPE_CHECKING, Literal from bec_lib.logger import bec_logger +from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo from ophyd import Component as Cpt from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO, Kind, StatusBase from ophyd.status import SubscriptionStatus, WaitTimeoutError @@ -19,6 +20,7 @@ from superxas_bec.devices.nidaq.nidaq_enums import ( ScanRates, ScanType, ) +from superxas_bec.devices.utils.utils import fetch_scan_info if TYPE_CHECKING: # pragma: no cover from bec_lib.devicemanager import ScanInfo @@ -425,7 +427,7 @@ class Nidaq(PSIDeviceBase, NidaqControl): def __init__(self, prefix: str = "", *, name: str, scan_info: ScanInfo = None, **kwargs): super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs) - self.scan_info: ScanInfo + self.scan_parameters: ScanServerScanInfo | None = None self.timeout_wait_for_signal = 5 # put 5s firsts self._timeout_wait_for_pv = 5 # 5s timeout for pv calls. editted due to timeout issues persisting self.valid_scan_names = [ @@ -440,9 +442,11 @@ class Nidaq(PSIDeviceBase, NidaqControl): # Beamline Methods # ######################################## - def _check_if_scan_name_is_valid(self) -> bool: + def _check_if_scan_name_is_valid(self, scan_parameters: ScanServerScanInfo | None) -> bool: """Check if the scan is within the list of scans for which the backend is working""" - scan_name = self.scan_info.msg.scan_name + if scan_parameters is None: + return False + scan_name = scan_parameters.scan_name if scan_name in self.valid_scan_names: return True return False @@ -600,7 +604,8 @@ class Nidaq(PSIDeviceBase, NidaqControl): Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object. If the upcoming scan is not in the list of valid scans, return immediately. """ - if not self._check_if_scan_name_is_valid(): + self.scan_parameters = fetch_scan_info(self.scan_info) + if not self._check_if_scan_name_is_valid(self.scan_parameters): return None if self.state.get() != NidaqState.STANDBY: @@ -610,16 +615,16 @@ class Nidaq(PSIDeviceBase, NidaqControl): status.wait(timeout=self.timeout_wait_for_signal) # If scan is not part of the valid_scan_names, - if self.scan_info.msg.scan_name != "nidaq_continuous_scan": + if self.scan_parameters.scan_name != "nidaq_continuous_scan": self.scan_type.set(ScanType.TRIGGERED).wait(timeout=self._timeout_wait_for_pv) self.scan_duration.set(0).wait(timeout=self._timeout_wait_for_pv) self.enable_compression.set(1).wait(timeout=self._timeout_wait_for_pv) else: self.scan_type.set(ScanType.CONTINUOUS).wait(timeout=self._timeout_wait_for_pv) - self.scan_duration.set(self.scan_info.msg.scan_parameters["scan_duration"]).wait( + self.scan_duration.set(self._scan_parameter("scan_duration")).wait( timeout=self._timeout_wait_for_pv ) - self.enable_compression.set(self.scan_info.msg.scan_parameters["compression"]).wait( + self.enable_compression.set(self._scan_parameter("compression")).wait( timeout=self._timeout_wait_for_pv ) @@ -632,7 +637,7 @@ class Nidaq(PSIDeviceBase, NidaqControl): # self.stage_call.set(1).wait(timeout=self._timeout_wait_for_pv) self.stage_call.put(1) status.wait(timeout=self.timeout_wait_for_signal) - if self.scan_info.msg.scan_name != "nidaq_continuous_scan": + if self.scan_parameters.scan_name != "nidaq_continuous_scan": status = self.on_kickoff() self.cancel_on_stop(status) status.wait(timeout=self._timeout_wait_for_pv) @@ -663,10 +668,10 @@ class Nidaq(PSIDeviceBase, NidaqControl): before the motor starts its oscillation. This is needed for being properly homed. The NIDAQ should go into Acquiring mode. """ - if not self._check_if_scan_name_is_valid(): + if not self._check_if_scan_name_is_valid(self.scan_parameters): return None - if self.scan_info.msg.scan_name == "nidaq_continuous_scan": + if self.scan_parameters.scan_name == "nidaq_continuous_scan": logger.info(f"Device {self.name} ready to be kicked off for nidaq_continuous_scan") return None @@ -687,15 +692,35 @@ class Nidaq(PSIDeviceBase, NidaqControl): For the NIDAQ we use this method to stop the backend since it would not stop by itself in its current implementation since the number of points are not predefined. """ - if not self._check_if_scan_name_is_valid(): + if not self._check_if_scan_name_is_valid(self.scan_parameters): return None status = CompareStatus(self.state, NidaqState.STANDBY) self.cancel_on_stop(status) - if self.scan_info.msg.scan_name != "nidaq_continuous_scan": + if self.scan_parameters.scan_name != "nidaq_continuous_scan": self.on_stop() return status + def _scan_parameter(self, name: str): + """Fetch a scan parameter from v4 metadata, with legacy fallbacks.""" + if self.scan_parameters is None: + return None + sources = [ + self.scan_parameters.additional_scan_parameters, + getattr(self.scan_info.msg, "scan_parameters", {}), + ] + request_inputs = self.scan_parameters.request_inputs or {} + sources.extend( + [ + request_inputs.get("inputs", {}), + request_inputs.get("kwargs", {}), + ] + ) + for source in sources: + if isinstance(source, dict) and name in source: + return source[name] + return None + def _progress_update(self, value, **kwargs) -> None: """Callback method to update the scan progress, runs a callback to SUB_PROGRESS subscribers, i.e. BEC. @@ -703,7 +728,9 @@ class Nidaq(PSIDeviceBase, NidaqControl): Args: value (int) : current progress value """ - scan_duration = self.scan_info.msg.scan_parameters.get("scan_duration", None) + if self.scan_parameters is None: + return + scan_duration = self._scan_parameter("scan_duration") if not isinstance(scan_duration, (int, float)): return value = scan_duration - value diff --git a/superxas_bec/devices/utils/__init__.py b/superxas_bec/devices/utils/__init__.py new file mode 100644 index 0000000..eb07fbf --- /dev/null +++ b/superxas_bec/devices/utils/__init__.py @@ -0,0 +1,2 @@ +"""Utility helpers for SuperXAS devices.""" + diff --git a/superxas_bec/devices/utils/utils.py b/superxas_bec/devices/utils/utils.py new file mode 100644 index 0000000..6cff1eb --- /dev/null +++ b/superxas_bec/devices/utils/utils.py @@ -0,0 +1,26 @@ +"""Utility functions for SuperXAS devices.""" + +from copy import deepcopy + +import numpy as np +from bec_lib.devicemanager import ScanInfo +from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo +from pydantic import ValidationError + + +def fetch_scan_info(scan_info: ScanInfo) -> ScanServerScanInfo: + """Normalize BEC scan info into the v4 scan info model.""" + info = deepcopy(scan_info.msg.info) + if isinstance(info.get("positions"), list): + info["positions"] = np.array(info["positions"]) + try: + msg = ScanServerScanInfo.model_validate(info) + except ValidationError: + if info.get("scan_type") == "fly": + info["scan_type"] = "hardware_triggered" + else: + info["scan_type"] = "software_triggered" + msg = ScanServerScanInfo.model_validate(info) + + return msg + diff --git a/superxas_bec/scans/mono_bragg_scans.py b/superxas_bec/scans/mono_bragg_scans.py index b142b4a..458b066 100644 --- a/superxas_bec/scans/mono_bragg_scans.py +++ b/superxas_bec/scans/mono_bragg_scans.py @@ -1,338 +1,202 @@ -"""This module contains the scan classes for the mono bragg motor of the SuperXAS beamline.""" +""" +V4 implementation of the SuperXAS mono bragg XAS scans. + +Scan procedure: + - prepare_scan + - open_scan + - stage + - pre_scan + - scan_core + - at_each_point (optionally called by scan_core) + - post_scan + - unstage + - close_scan + - on_exception (called if any exception is raised during the scan) +""" + +from __future__ import annotations import time -from typing import Literal +from typing import Annotated import numpy as np from bec_lib.device import DeviceBase -from bec_lib.logger import bec_logger -from bec_server.scan_server.scans import AsyncFlyScanBase - -logger = bec_logger.logger +from bec_lib.scan_args import ScanArgument, Units +from bec_server.scan_server.scans.scan_base import ScanBase, ScanType +from bec_server.scan_server.scans.scan_modifier import scan_hook -class XASSimpleScan(AsyncFlyScanBase): - """Class for the XAS simple scan""" +class XASSimpleScan(ScanBase): + """Simple oscillating XAS scan on the SuperXAS mono bragg motor.""" + scan_type = ScanType.HARDWARE_TRIGGERED scan_name = "xas_simple_scan" - scan_type = "fly" - scan_report_hint = "device_progress" - required_kwargs = [] - use_scan_progress_report = False - pre_move = False + gui_config = { "Movement Parameters": ["start", "stop"], - "Scan Parameters": ["scan_time", "scan_duration"], + "Scan Parameters": ["scan_time", "scan_duration", "monitored_readout_cycle"], } def __init__( self, - start: float, - stop: float, - scan_time: float, - scan_duration: float, - motor: DeviceBase = "mo1_bragg", + # fmt: off + start: Annotated[float, ScanArgument(display_name="Start Energy", description="Start energy.", units=Units.eV, ge=4500, le=64000)], + stop: Annotated[float, ScanArgument(display_name="Stop Energy", description="Stop energy.", units=Units.eV, ge=4500, le=64000)], + scan_time: Annotated[float, ScanArgument(display_name="Scan Time", description="Time for one scan cycle.", units=Units.s, ge=0.05)], + scan_duration: Annotated[float, ScanArgument(display_name="Scan Duration", description="Total scan duration.", units=Units.s, ge=0.05)], + motor: Annotated[DeviceBase | None, ScanArgument(display_name="Motor", description="Bragg motor device.")] = None, + daq: Annotated[DeviceBase | None, ScanArgument(display_name="DAQ", description="NIDAQ device.")] = None, + gonio: Annotated[DeviceBase | None, ScanArgument(display_name="Gonio", description="Goniometer device.")] = None, + monitored_readout_cycle: Annotated[float, ScanArgument(display_name="Monitored Readout Cycle", description="Delay between monitored readouts.", units=Units.s, gt=0)] = 1, + # fmt: on **kwargs, ): - """The xas_simple_scan is used to start a simple oscillating scan on the mono bragg motor. - Start and Stop define the energy range for the scan, scan_time is the time for one scan - cycle and scan_duration is the duration of the scan. If scan duration is set to 0, the - scan will run infinitely. + """ + Start a simple oscillating scan on the mono bragg motor. Args: - start (float): Start energy for the scan. - stop (float): Stop energy for the scan. + start (float): Start energy. + stop (float): Stop energy. scan_time (float): Time for one scan cycle. - scan_duration (float): Duration of the scan. - motor (DeviceBase, optional): Motor device to be used for the scan. - Defaults to "mo1_bragg". - Examples: - >>> scans.xas_simple_scan(start=8000, stop=9000, scan_time=1, scan_duration=10) + scan_duration (float): Total scan duration. + motor (DeviceBase | None): Bragg motor device. + daq (DeviceBase | None): NIDAQ device. + gonio (DeviceBase | None): Goniometer device. + monitored_readout_cycle (float): Delay between monitored readouts. + + Returns: + ScanReport """ super().__init__(**kwargs) - self.motor = motor + self._baseline_readout_status = None self.start = start self.stop = stop self.scan_time = scan_time self.scan_duration = scan_duration - self.primary_readout_cycle = 1 - - def stage(self): - """call the stage procedure""" - # Compute position for mo1_gonio pre move - # Since energy is not linear to angle, we have to calculate the angles first. - pos_start = yield from self.stubs.send_rpc_and_wait( - "mo1_bragg", - "convert_angle_energy", - mode = "EnergyToAngle", - inp = self.start, - ) - pos_end = yield from self.stubs.send_rpc_and_wait( - "mo1_bragg", - "convert_angle_energy", - mode = "EnergyToAngle", - inp = self.stop, - ) - # Goniometer position is in the middle of the start and stop angle of the scan - pos = (pos_start + pos_end) / 2 - - # Premove with mo1_gonio - yield from self.stubs.send_rpc_and_wait( - "mo1_gonio", - "move", - position = pos, - wait = True, - timeout = 30, # 30 seconds timeout - ) - # Continue with staging the devices - yield from self.stubs.stage() - - def update_readout_priority(self): - """Ensure that NIDAQ is not monitored for any quick EXAFS.""" - super().update_readout_priority() - self.readout_priority["async"].append("nidaq") - - def prepare_positions(self): - """Prepare the positions for the scan. - - Use here only start and end energy defining the range for the scan. - """ + self.mo1_bragg = self._resolve_device(motor, "mo1_bragg") + self.motor = self.mo1_bragg + self.daq = self._resolve_device(daq, "nidaq") + self.mo1_gonio = self._resolve_device(gonio, "mo1_gonio") + self.monitored_readout_cycle = monitored_readout_cycle self.positions = np.array([self.start, self.stop], dtype=float) - self.num_pos = None - yield None + self.update_scan_info( + positions=self.positions, + scan_time=scan_time, + scan_duration=scan_duration, + monitored_readout_cycle=monitored_readout_cycle, + ) + self.actions.set_device_readout_priority([self.daq], priority="async") + + def _resolve_device(self, device: DeviceBase | str | None, default_name: str) -> DeviceBase: + """Resolve optional device arguments against the v4 device container.""" + if device is None: + return self.dev[default_name] + if isinstance(device, str): + return self.dev[device] + return device + + @scan_hook + def prepare_scan(self): + """Prepare scan metadata and baseline readout.""" + self.actions.add_scan_report_instruction_device_progress(self.motor) + self._baseline_readout_status = self.actions.read_baseline_devices(wait=False) + + @scan_hook + def open_scan(self): + """Open the scan.""" + self.actions.open_scan() + + @scan_hook + def stage(self): + """Center the goniometer for the requested energy range and stage devices.""" + pos_start = self.mo1_bragg.convert_angle_energy(mode="EnergyToAngle", inp=self.start) + pos_end = self.mo1_bragg.convert_angle_energy(mode="EnergyToAngle", inp=self.stop) + gonio_position = (pos_start + pos_end) / 2 + + self.actions.set(self.mo1_gonio, gonio_position, wait=False).wait(timeout=30) + self.actions.stage_all_devices() + + @scan_hook def pre_scan(self): - """Pre Scan action.""" - - self._check_limits() - # Ensure parent class pre_scan actions to be called. - yield from super().pre_scan() - - def scan_report_instructions(self): - """ - Return the instructions for the scan report. - """ - yield from self.stubs.scan_report_instruction({"device_progress": [self.motor]}) + """Execute pre-scan hooks on all devices.""" + self.actions.pre_scan_all_devices() + @scan_hook def scan_core(self): - """Run the scan core. - Kickoff the oscillation on the Bragg motor and wait for the completion of the motion. - """ - # Start the oscillation on the Bragg motor. - yield from self.stubs.kickoff(device=self.motor) - complete_status = yield from self.stubs.complete(device=self.motor, wait=False) + """Kick off the mono bragg oscillation and read monitored devices.""" + self.actions.kickoff(self.motor) + completion_status = self.actions.complete(self.motor, wait=False) - while not complete_status.done: - # Readout monitored devices - yield from self.stubs.read(group="monitored", point_id=self.point_id) - time.sleep(self.primary_readout_cycle) - self.point_id += 1 + while not completion_status.done: + self.at_each_point() - self.num_pos = self.point_id + @scan_hook + def at_each_point(self): + """Read monitored devices during the hardware-triggered scan.""" + self.actions.read_monitored_devices() + time.sleep(self.monitored_readout_cycle) + @scan_hook + def post_scan(self): + """Complete all devices after the scan core.""" + self.actions.complete_all_devices() -# class XASSimpleScanWithXRD(XASSimpleScan): -# """Class for the XAS simple scan with XRD""" + @scan_hook + def unstage(self): + """Unstage all devices.""" + self.actions.unstage_all_devices() -# scan_name = "xas_simple_scan_with_xrd" -# gui_config = { -# "Movement Parameters": ["start", "stop"], -# "Scan Parameters": ["scan_time", "scan_duration"], -# "Low Energy Range": ["xrd_enable_low", "num_trigger_low", "exp_time_low", "cycle_low"], -# "High Energy Range": ["xrd_enable_high", "num_trigger_high", "exp_time_high", "cycle_high"], -# } + @scan_hook + def close_scan(self): + """Close the scan after the baseline readout has completed.""" + if self._baseline_readout_status is not None: + self._baseline_readout_status.wait() + self.actions.close_scan() + self.actions.check_for_unchecked_statuses() -# def __init__( -# self, -# start: float, -# stop: float, -# scan_time: float, -# scan_duration: float, -# xrd_enable_low: bool, -# num_trigger_low: int, -# exp_time_low: float, -# cycle_low: int, -# xrd_enable_high: bool, -# num_trigger_high: int, -# exp_time_high: float, -# cycle_high: float, -# motor: DeviceBase = "mo1_bragg", -# **kwargs, -# ): -# """The xas_simple_scan_with_xrd is an oscillation motion on the mono motor -# with XRD triggering at low and high energy ranges. -# If scan duration is set to 0, the scan will run infinitely. - -# Args: -# start (float): Start energy for the scan. -# stop (float): Stop energy for the scan. -# scan_time (float): Time for one oscillation . -# scan_duration (float): Total duration of the scan. -# xrd_enable_low (bool): Enable XRD triggering for the low energy range. -# num_trigger_low (int): Number of triggers for the low energy range. -# exp_time_low (float): Exposure time for the low energy range. -# cycle_low (int): Specify how often the triggers should be considered, -# every nth cycle for low -# xrd_enable_high (bool): Enable XRD triggering for the high energy range. -# num_trigger_high (int): Number of triggers for the high energy range. -# exp_time_high (float): Exposure time for the high energy range. -# cycle_high (int): Specify how often the triggers should be considered, -# every nth cycle for high -# motor (DeviceBase, optional): Motor device to be used for the scan. -# Defaults to "mo1_bragg". - -# Examples: -# >>> scans.xas_simple_scan_with_xrd(start=8000, stop=9000, scan_time=1, scan_duration=10, xrd_enable_low=True, num_trigger_low=5, cycle_low=2, exp_time_low=100, xrd_enable_high=False, num_trigger_high=3, cycle_high=1, exp_time_high=1000) -# """ -# super().__init__( -# start=start, -# stop=stop, -# scan_time=scan_time, -# scan_duration=scan_duration, -# motor=motor, -# **kwargs, -# ) -# self.xrd_enable_low = xrd_enable_low -# self.num_trigger_low = num_trigger_low -# self.exp_time_low = exp_time_low -# self.cycle_low = cycle_low -# self.xrd_enable_high = xrd_enable_high -# self.num_trigger_high = num_trigger_high -# self.exp_time_high = exp_time_high -# self.cycle_high = cycle_high + @scan_hook + def on_exception(self, exception: Exception): + """Try to stop outstanding device work if the scan fails.""" + self.actions.complete_all_devices(wait=False) class XASAdvancedScan(XASSimpleScan): - """Class for the XAS advanced scan""" + """Advanced oscillating XAS scan with spline parameters.""" scan_name = "xas_advanced_scan" gui_config = { "Movement Parameters": ["start", "stop"], - "Scan Parameters": ["scan_time", "scan_duration"], + "Scan Parameters": ["scan_time", "scan_duration", "monitored_readout_cycle"], "Spline Parameters": ["p_kink", "e_kink"], } def __init__( self, - start: float, - stop: float, - scan_time: float, - scan_duration: float, - p_kink: float, - e_kink: float, - motor: DeviceBase = "mo1_bragg", + # fmt: off + start: Annotated[float, ScanArgument(display_name="Start Energy", description="Start energy.", units=Units.eV)], + stop: Annotated[float, ScanArgument(display_name="Stop Energy", description="Stop energy.", units=Units.eV)], + scan_time: Annotated[float, ScanArgument(display_name="Scan Time", description="Time for one scan cycle.", units=Units.s, ge=0)], + scan_duration: Annotated[float, ScanArgument(display_name="Scan Duration", description="Total scan duration.", units=Units.s, ge=0)], + p_kink: Annotated[float, ScanArgument(display_name="P Kink", description="Position of the kink.", ge=0)], + e_kink: Annotated[float, ScanArgument(display_name="E Kink", description="Energy of the kink.", units=Units.eV)], + motor: Annotated[DeviceBase | None, ScanArgument(display_name="Motor", description="Bragg motor device.")] = None, + daq: Annotated[DeviceBase | None, ScanArgument(display_name="DAQ", description="NIDAQ device.")] = None, + gonio: Annotated[DeviceBase | None, ScanArgument(display_name="Gonio", description="Goniometer device.")] = None, + monitored_readout_cycle: Annotated[float, ScanArgument(display_name="Monitored Readout Cycle", description="Delay between monitored readouts.", units=Units.s, gt=0)] = 1, **kwargs, + # fmt: on ): - """The xas_advanced_scan is an oscillation motion on the mono motor. - Start and Stop define the energy range for the scan, scan_time is the - time for one scan cycle and scan_duration is the duration of the scan. - If scan duration is set to 0, the scan will run infinitely. - p_kink and e_kink add a kink to the motion profile to slow down in the - exafs region of the scan. - - Args: - start (float): Start angle for the scan. - stop (float): Stop angle for the scan. - scan_time (float): Time for one oscillation . - scan_duration (float): Total duration of the scan. - p_kink (float): Position of the kink. - e_kink (float): Energy of the kink. - motor (DeviceBase, optional): Motor device to be used for the scan. - Defaults to "mo1_bragg". - - Examples: - >>> scans.xas_advanced_scan(start=10000, stop=12000, scan_time=0.5, scan_duration=10, p_kink=50, e_kink=10500) - """ super().__init__( start=start, stop=stop, scan_time=scan_time, scan_duration=scan_duration, motor=motor, + daq=daq, + gonio=gonio, + monitored_readout_cycle=monitored_readout_cycle, **kwargs, ) - self.p_kink = p_kink - self.e_kink = e_kink + self.update_scan_info(p_kink=p_kink, e_kink=e_kink) - -# class XASAdvancedScanWithXRD(XASAdvancedScan): -# """Class for the XAS advanced scan with XRD""" - -# scan_name = "xas_advanced_scan_with_xrd" -# gui_config = { -# "Movement Parameters": ["start", "stop"], -# "Scan Parameters": ["scan_time", "scan_duration"], -# "Spline Parameters": ["p_kink", "e_kink"], -# "Low Energy Range": ["xrd_enable_low", "num_trigger_low", "exp_time_low", "cycle_low"], -# "High Energy Range": ["xrd_enable_high", "num_trigger_high", "exp_time_high", "cycle_high"], -# } - -# def __init__( -# self, -# start: float, -# stop: float, -# scan_time: float, -# scan_duration: float, -# p_kink: float, -# e_kink: float, -# xrd_enable_low: bool, -# num_trigger_low: int, -# exp_time_low: float, -# cycle_low: int, -# xrd_enable_high: bool, -# num_trigger_high: int, -# exp_time_high: float, -# cycle_high: float, -# motor: DeviceBase = "mo1_bragg", -# **kwargs, -# ): -# """The xas_advanced_scan is an oscillation motion on the mono motor -# with XRD triggering at low and high energy ranges. -# Start and Stop define the energy range for the scan, scan_time is the time for -# one scan cycle and scan_duration is the duration of the scan. If scan duration -# is set to 0, the scan will run infinitely. p_kink and e_kink add a kink to the -# motion profile to slow down in the exafs region of the scan. - -# Args: -# start (float): Start angle for the scan. -# stop (float): Stop angle for the scan. -# scan_time (float): Time for one oscillation . -# scan_duration (float): Total duration of the scan. -# p_kink (float): Position of kink. -# e_kink (float): Energy of the kink. -# xrd_enable_low (bool): Enable XRD triggering for the low energy range. -# num_trigger_low (int): Number of triggers for the low energy range. -# exp_time_low (float): Exposure time for the low energy range. -# cycle_low (int): Specify how often the triggers should be considered, -# every nth cycle for low -# xrd_enable_high (bool): Enable XRD triggering for the high energy range. -# num_trigger_high (int): Number of triggers for the high energy range. -# exp_time_high (float): Exposure time for the high energy range. -# cycle_high (int): Specify how often the triggers should be considered, -# every nth cycle for high -# motor (DeviceBase, optional): Motor device to be used for the scan. -# Defaults to "mo1_bragg". - -# Examples: -# >>> scans.xas_advanced_scan_with_xrd(start=10000, stop=12000, scan_time=0.5, scan_duration=10, p_kink=50, e_kink=10500, xrd_enable_low=True, num_trigger_low=5, cycle_low=2, exp_time_low=100, xrd_enable_high=False, num_trigger_high=3, cycle_high=1, exp_time_high=1000) -# """ -# super().__init__( -# start=start, -# stop=stop, -# scan_time=scan_time, -# scan_duration=scan_duration, -# p_kink=p_kink, -# e_kink=e_kink, -# motor=motor, -# **kwargs, -# ) -# self.p_kink = p_kink -# self.e_kink = e_kink -# self.xrd_enable_low = xrd_enable_low -# self.num_trigger_low = num_trigger_low -# self.exp_time_low = exp_time_low -# self.cycle_low = cycle_low -# self.xrd_enable_high = xrd_enable_high -# self.num_trigger_high = num_trigger_high -# self.exp_time_high = exp_time_high -# self.cycle_high = cycle_high diff --git a/superxas_bec/scans/nidaq_cont_scan.py b/superxas_bec/scans/nidaq_cont_scan.py index 2530f2c..2e4be5b 100644 --- a/superxas_bec/scans/nidaq_cont_scan.py +++ b/superxas_bec/scans/nidaq_cont_scan.py @@ -1,84 +1,126 @@ -"""This module contains the scan class for the nidaq of the SuperXAS beamline for use in continuous mode.""" +""" +V4 implementation of the SuperXAS NIDAQ continuous scan. + +The NIDAQ continuous scan measures with the NIDAQ without moving the +monochromator or any other motor. +""" + +from __future__ import annotations import time -from typing import Literal +from typing import Annotated -import numpy as np from bec_lib.device import DeviceBase -from bec_lib.logger import bec_logger -from bec_server.scan_server.scans import AsyncFlyScanBase - -logger = bec_logger.logger +from bec_lib.scan_args import ScanArgument, Units +from bec_server.scan_server.scans.scan_base import ScanBase, ScanType +from bec_server.scan_server.scans.scan_modifier import scan_hook -class NIDAQContinuousScan(AsyncFlyScanBase): - """Class for the nidaq continuous scan (without mono)""" +class NIDAQContinuousScan(ScanBase): + """Continuous NIDAQ acquisition without mono motion.""" + scan_type = ScanType.HARDWARE_TRIGGERED scan_name = "nidaq_continuous_scan" - scan_type = "fly" - scan_report_hint = "device_progress" - required_kwargs = [] - use_scan_progress_report = False - pre_move = False - gui_config = {"Scan Parameters": ["scan_duration"], "Data Compression": ["compression"]} + + gui_config = {"Scan Parameters": ["scan_duration", "daq", "compression"]} def __init__( - self, scan_duration: float, daq: DeviceBase = "nidaq", compression: bool = False, **kwargs + self, + # fmt: off + scan_duration: Annotated[float, ScanArgument(display_name="Scan Duration", description="Duration of the scan", units=Units.s)], + daq: Annotated[DeviceBase | None, ScanArgument(display_name="DAQ", description="DAQ device to be used for the scan")] = None, + compression: Annotated[bool, ScanArgument(display_name="Compression", description="Whether to compress the data")] = False, + # fmt: on + **kwargs, ): - """The NIDAQ continuous scan is used to measure with the NIDAQ without moving the - monochromator or any other motor. The NIDAQ thus runs in continuous mode, with a - set scan_duration. + """ + Start a continuous NIDAQ acquisition. Args: scan_duration (float): Duration of the scan. - daq (DeviceBase, optional): DAQ device to be used for the scan. - Defaults to "nidaq". - Examples: - >>> scans.nidaq_continuous_scan(scan_duration=10) + daq (DeviceBase | None): DAQ device to be used for the scan. + compression (bool): Whether to compress the data. + + Returns: + ScanReport """ super().__init__(**kwargs) + self._baseline_readout_status = None self.scan_duration = scan_duration - self.daq = daq - self.start_time = 0 - self.primary_readout_cycle = 1 - self.scan_parameters["scan_duration"] = scan_duration - self.scan_parameters["compression"] = compression + self.daq = self._resolve_device(daq, "nidaq") + self.compression = compression + self.monitored_readout_cycle = 1.0 + self.motors = [self.daq] - def update_readout_priority(self): - """Ensure that NIDAQ is not monitored for any quick EXAFS.""" - super().update_readout_priority() - self.readout_priority["async"].append("nidaq") + self.update_scan_info(scan_duration=scan_duration, compression=compression) + self.actions.set_device_readout_priority([self.daq], priority="async") - def prepare_positions(self): - """Prepare the positions for the scan.""" - yield None + def _resolve_device(self, device: DeviceBase | str | None, default_name: str) -> DeviceBase: + """Resolve optional device arguments against the v4 device container.""" + if device is None: + return self.dev[default_name] + if isinstance(device, str): + return self.dev[device] + return device + @scan_hook + def prepare_scan(self): + """Prepare scan metadata and baseline readout.""" + self.actions.add_scan_report_instruction_device_progress(self.daq) + self._baseline_readout_status = self.actions.read_baseline_devices(wait=False) + + @scan_hook + def open_scan(self): + """Open the scan.""" + self.actions.open_scan() + + @scan_hook + def stage(self): + """Stage all devices.""" + self.actions.stage_all_devices() + + @scan_hook def pre_scan(self): - """Pre Scan action.""" - - self.start_time = time.time() - # Ensure parent class pre_scan actions to be called. - yield from super().pre_scan() - - def scan_report_instructions(self): - """ - Return the instructions for the scan report. - """ - yield from self.stubs.scan_report_instruction({"device_progress": [self.daq]}) + """Execute pre-scan hooks on all devices.""" + self.actions.pre_scan_all_devices() + @scan_hook def scan_core(self): - """Run the scan core. - Kickoff the acquisition of the NIDAQ wait for the completion of the scan. - """ - kickoff_status = yield from self.stubs.kickoff(device=self.daq) - kickoff_status.wait(timeout=5) # wait for proper kickoff of device + """Kick off the NIDAQ acquisition and read monitored devices.""" + kickoff_status = self.actions.kickoff(device=self.daq, wait=False) + kickoff_status.wait(timeout=5) - complete_status = yield from self.stubs.complete(device=self.daq, wait=False) + complete_status = self.actions.complete(device=self.daq, wait=False) while not complete_status.done: - # Readout monitored devices - yield from self.stubs.read(group="monitored", point_id=self.point_id) - time.sleep(self.primary_readout_cycle) - self.point_id += 1 + self.at_each_point() + time.sleep(self.monitored_readout_cycle) + + @scan_hook + def at_each_point(self): + """Read monitored devices during the hardware-triggered scan.""" + self.actions.read_monitored_devices() + + @scan_hook + def post_scan(self): + """Complete all devices after the scan core.""" + self.actions.complete_all_devices() + + @scan_hook + def unstage(self): + """Unstage all devices.""" + self.actions.unstage_all_devices() + + @scan_hook + def close_scan(self): + """Close the scan after the baseline readout has completed.""" + if self._baseline_readout_status is not None: + self._baseline_readout_status.wait() + self.actions.close_scan() + self.actions.check_for_unchecked_statuses() + + @scan_hook + def on_exception(self, exception: Exception): + """Try to stop outstanding device work if the scan fails.""" + self.actions.complete_all_devices(wait=False) - self.num_pos = self.point_id diff --git a/tests/tests_devices/test_device_scan_info_utils.py b/tests/tests_devices/test_device_scan_info_utils.py new file mode 100644 index 0000000..5fea25e --- /dev/null +++ b/tests/tests_devices/test_device_scan_info_utils.py @@ -0,0 +1,53 @@ +# pylint: skip-file +from types import SimpleNamespace + +import numpy as np + +from superxas_bec.devices.utils.utils import fetch_scan_info + + +def test_fetch_scan_info_accepts_v4_scan_info_with_positions_list(): + scan_info = SimpleNamespace( + msg=SimpleNamespace( + info={ + "scan_name": "xas_simple_scan", + "scan_id": "scan-id-test", + "scan_type": "hardware_triggered", + "positions": [8000.0, 9000.0], + "additional_scan_parameters": { + "scan_time": 1.0, + "scan_duration": 10.0, + }, + } + ) + ) + + msg = fetch_scan_info(scan_info) + + assert msg.scan_name == "xas_simple_scan" + assert msg.scan_type == "hardware_triggered" + np.testing.assert_array_equal(msg.positions, np.array([8000.0, 9000.0])) + assert msg.additional_scan_parameters["scan_duration"] == 10.0 + + +def test_fetch_scan_info_converts_legacy_fly_scan_type(): + scan_info = SimpleNamespace( + msg=SimpleNamespace( + info={ + "scan_name": "xas_simple_scan", + "scan_id": "scan-id-test", + "scan_type": "fly", + "positions": [8000.0, 9000.0], + "request_inputs": { + "inputs": {}, + "kwargs": {"scan_time": 1.0, "scan_duration": 10.0}, + }, + } + ) + ) + + msg = fetch_scan_info(scan_info) + + assert msg.scan_type == "hardware_triggered" + assert msg.request_inputs["kwargs"]["scan_time"] == 1.0 + diff --git a/tests/tests_devices/test_mo1_bragg_v4_scan_info.py b/tests/tests_devices/test_mo1_bragg_v4_scan_info.py new file mode 100644 index 0000000..2cb6fa7 --- /dev/null +++ b/tests/tests_devices/test_mo1_bragg_v4_scan_info.py @@ -0,0 +1,100 @@ +# pylint: skip-file +from unittest import mock + +import numpy as np +import pytest +from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo +from ophyd_devices.tests.utils import patched_device + +from superxas_bec.devices.mo1_bragg.mo1_bragg import Mo1Bragg, ScanControlLoadMessage + + +@pytest.fixture(scope="function") +def mock_bragg(): + with patched_device( + Mo1Bragg, name="mo1_bragg", prefix="X10DA-OP-MO1:BRAGG:" + ) as dev: + yield dev + + +def _set_scan_info(dev, scan_info): + dev.scan_info.msg.info.update(scan_info.model_dump()) + + +def _mock_status(): + status = mock.MagicMock() + status.wait = mock.MagicMock() + return status + + +def test_mo1_bragg_stage_uses_v4_simple_scan_info(mock_bragg): + scan_info = ScanServerScanInfo( + scan_name="xas_simple_scan", + scan_id="scan-id-test", + scan_type="hardware_triggered", + positions=np.array([8000.0, 9000.0]), + additional_scan_parameters={"scan_time": 1.0, "scan_duration": 10.0}, + ) + _set_scan_info(mock_bragg, scan_info) + mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.PENDING + + with ( + mock.patch.object(mock_bragg, "set_xas_settings") as set_xas_settings, + mock.patch.object(mock_bragg, "set_trig_settings") as set_trig_settings, + mock.patch.object(mock_bragg, "set_scan_control_settings") as set_scan_control_settings, + mock.patch.object(mock_bragg, "cancel_on_stop"), + mock.patch( + "superxas_bec.devices.mo1_bragg.mo1_bragg.CompareStatus", + return_value=_mock_status(), + ), + ): + mock_bragg.on_stage() + + set_xas_settings.assert_called_once_with(low=8000.0, high=9000.0, scan_time=1.0) + set_trig_settings.assert_called_once_with( + enable_low=False, + enable_high=False, + exp_time_low=0, + exp_time_high=0, + cycle_low=0, + cycle_high=0, + ) + assert set_scan_control_settings.call_args.kwargs["scan_duration"] == 10.0 + + +def test_mo1_bragg_stage_uses_v4_advanced_scan_info(mock_bragg): + scan_info = ScanServerScanInfo( + scan_name="xas_advanced_scan", + scan_id="scan-id-test", + scan_type="hardware_triggered", + positions=np.array([8000.0, 9000.0]), + additional_scan_parameters={ + "scan_time": 1.0, + "scan_duration": 10.0, + "p_kink": 50.0, + "e_kink": 8500.0, + }, + ) + _set_scan_info(mock_bragg, scan_info) + mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.PENDING + + with ( + mock.patch.object(mock_bragg, "set_advanced_xas_settings") as set_advanced_xas_settings, + mock.patch.object(mock_bragg, "set_trig_settings"), + mock.patch.object(mock_bragg, "set_scan_control_settings") as set_scan_control_settings, + mock.patch.object(mock_bragg, "cancel_on_stop"), + mock.patch( + "superxas_bec.devices.mo1_bragg.mo1_bragg.CompareStatus", + return_value=_mock_status(), + ), + ): + mock_bragg.on_stage() + + set_advanced_xas_settings.assert_called_once_with( + low=8000.0, + high=9000.0, + scan_time=1.0, + p_kink=50.0, + e_kink=8500.0, + ) + assert set_scan_control_settings.call_args.kwargs["scan_duration"] == 10.0 diff --git a/tests/tests_devices/test_nidaq_v4_scan_info.py b/tests/tests_devices/test_nidaq_v4_scan_info.py new file mode 100644 index 0000000..53a5f7b --- /dev/null +++ b/tests/tests_devices/test_nidaq_v4_scan_info.py @@ -0,0 +1,67 @@ +# pylint: skip-file +from unittest import mock + +import pytest +from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo +from ophyd_devices.tests.utils import patched_device + +from superxas_bec.devices.nidaq.nidaq import Nidaq, NidaqState, ScanType +from superxas_bec.devices.utils.utils import fetch_scan_info + + +@pytest.fixture(scope="function") +def mock_nidaq(): + with patched_device( + Nidaq, name="nidaq", prefix="X10DA-CPCL-SCANSERVER:" + ) as dev: + yield dev + + +def _set_scan_info(dev, scan_info): + dev.scan_info.msg.info.update(scan_info.model_dump()) + dev.scan_parameters = fetch_scan_info(dev.scan_info) + + +def test_nidaq_check_scan_name_uses_normalized_scan_info(mock_nidaq): + valid = ScanServerScanInfo(scan_name="xas_simple_scan", scan_id="scan-id-test") + invalid = ScanServerScanInfo(scan_name="line_scan", scan_id="scan-id-test") + + assert mock_nidaq._check_if_scan_name_is_valid(valid) + assert not mock_nidaq._check_if_scan_name_is_valid(invalid) + + +def test_nidaq_progress_update_uses_v4_additional_parameters(mock_nidaq): + scan_info = ScanServerScanInfo( + scan_name="nidaq_continuous_scan", + scan_id="scan-id-test", + additional_scan_parameters={"scan_duration": 10.0, "compression": False}, + ) + _set_scan_info(mock_nidaq, scan_info) + mock_nidaq.progress_signal.put = mock.MagicMock() + + mock_nidaq._progress_update(4.0) + + mock_nidaq.progress_signal.put.assert_called_once_with(value=6.0, max_value=10.0, done=False) + + +def test_nidaq_stage_uses_v4_continuous_scan_info(mock_nidaq): + scan_info = ScanServerScanInfo( + scan_name="nidaq_continuous_scan", + scan_id="scan-id-test", + additional_scan_parameters={"scan_duration": 10.0, "compression": False}, + ) + mock_nidaq.scan_info.msg.info.update(scan_info.model_dump()) + mock_nidaq.state.put(NidaqState.STANDBY) + + with ( + mock.patch("superxas_bec.devices.nidaq.nidaq.CompareStatus") as status_cls, + mock.patch.object(mock_nidaq, "cancel_on_stop"), + mock.patch.object(mock_nidaq, "on_kickoff") as on_kickoff, + ): + status_cls.return_value.wait = mock.MagicMock() + mock_nidaq.on_stage() + + assert mock_nidaq.scan_type.get() == ScanType.CONTINUOUS + assert mock_nidaq.scan_duration.get() == 10.0 + assert mock_nidaq.enable_compression.get() is False + on_kickoff.assert_not_called() diff --git a/tests/tests_scans/conftest.py b/tests/tests_scans/conftest.py new file mode 100644 index 0000000..304b37a --- /dev/null +++ b/tests/tests_scans/conftest.py @@ -0,0 +1,7 @@ +# pylint: skip-file +from bec_server.scan_server.tests.scan_fixtures import ( + nth_done_status_mock, + readout_priority, + v4_scan_assembler, +) + diff --git a/tests/tests_scans/test_mono_bragg_scans_v4.py b/tests/tests_scans/test_mono_bragg_scans_v4.py new file mode 100644 index 0000000..bf5e1f3 --- /dev/null +++ b/tests/tests_scans/test_mono_bragg_scans_v4.py @@ -0,0 +1,121 @@ +# pylint: skip-file +from unittest import mock + +import numpy as np +import pytest +from bec_server.scan_server.tests.scan_fixtures import * +from bec_server.scan_server.tests.scan_hook_tests import * + +XAS_SIMPLE_SCAN_DEFAULT_HOOK_TESTS = [ + ("prepare_scan", [assert_prepare_scan_reads_baseline_devices]), + ("open_scan", [assert_scan_open_called]), + ("pre_scan", [assert_pre_scan_called]), + ("unstage", [assert_unstage_all_devices_called]), + ("close_scan", [assert_close_scan_waits_for_baseline_and_closes]), +] + + +def _assemble_xas_simple_scan(v4_scan_assembler, **overrides): + params = { + "start": 8000.0, + "stop": 9000.0, + "scan_time": 1.0, + "scan_duration": 10.0, + "motor": "mo1_bragg", + "daq": "nidaq", + "gonio": "mo1_gonio", + "monitored_readout_cycle": 1.0, + } + params.update(overrides) + return v4_scan_assembler("xas_simple_scan", **params) + + +@pytest.mark.parametrize(("hook_name", "hook_tests"), XAS_SIMPLE_SCAN_DEFAULT_HOOK_TESTS) +def test_xas_simple_scan_v4_default_hooks( + v4_scan_assembler, nth_done_status_mock, hook_name, hook_tests +): + scan = _assemble_xas_simple_scan(v4_scan_assembler) + + run_scan_tests(scan, [(hook_name, hook_tests)], nth_done_status_mock=nth_done_status_mock) + + +def test_xas_simple_scan_v4_prepare_scan_updates_metadata(v4_scan_assembler): + scan = _assemble_xas_simple_scan(v4_scan_assembler) + scan.actions.add_scan_report_instruction_device_progress = mock.MagicMock() + baseline_status = mock.MagicMock() + scan.actions.read_baseline_devices = mock.MagicMock(return_value=baseline_status) + + scan.prepare_scan() + + scan.actions._build_scan_status_message("open") + + np.testing.assert_array_equal(scan.scan_info.positions, np.array([8000.0, 9000.0])) + assert scan.scan_info.additional_scan_parameters["scan_time"] == 1.0 + assert scan.scan_info.additional_scan_parameters["scan_duration"] == 10.0 + assert scan.scan_info.additional_scan_parameters["monitored_readout_cycle"] == 1.0 + assert scan.scan_info.readout_priority_modification["async"] == ["nidaq"] + scan.actions.add_scan_report_instruction_device_progress.assert_called_once_with(scan.motor) + scan.actions.read_baseline_devices.assert_called_once_with(wait=False) + assert scan._baseline_readout_status is baseline_status + + +def test_xas_simple_scan_v4_stage_centers_gonio_and_stages_devices(v4_scan_assembler): + scan = _assemble_xas_simple_scan(v4_scan_assembler) + scan.mo1_bragg.convert_angle_energy = mock.MagicMock(side_effect=[10.0, 14.0]) + set_status = mock.MagicMock() + scan.actions.set = mock.MagicMock(return_value=set_status) + scan.actions.stage_all_devices = mock.MagicMock() + + scan.stage() + + assert scan.mo1_bragg.convert_angle_energy.call_args_list == [ + mock.call(mode="EnergyToAngle", inp=8000.0), + mock.call(mode="EnergyToAngle", inp=9000.0), + ] + scan.actions.set.assert_called_once_with(scan.mo1_gonio, 12.0, wait=False) + set_status.wait.assert_called_once_with(timeout=30) + scan.actions.stage_all_devices.assert_called_once_with() + + +def test_xas_simple_scan_v4_scan_core_reads_until_complete(v4_scan_assembler, nth_done_status_mock): + scan = _assemble_xas_simple_scan(v4_scan_assembler) + completion_status = nth_done_status_mock(resolve_after=3) + scan.actions.kickoff = mock.MagicMock() + scan.actions.complete = mock.MagicMock(return_value=completion_status) + scan.actions.read_monitored_devices = mock.MagicMock() + + with mock.patch("superxas_bec.scans.mono_bragg_scans.time.sleep"): + scan.scan_core() + + scan.actions.kickoff.assert_called_once_with(scan.motor) + scan.actions.complete.assert_called_once_with(scan.motor, wait=False) + assert scan.actions.read_monitored_devices.call_count == 2 + + +def test_xas_simple_scan_v4_post_scan_completes_all_devices(v4_scan_assembler): + scan = _assemble_xas_simple_scan(v4_scan_assembler) + scan.actions.complete_all_devices = mock.MagicMock() + + scan.post_scan() + + scan.actions.complete_all_devices.assert_called_once_with() + + +def test_xas_advanced_scan_v4_updates_spline_metadata(v4_scan_assembler): + scan = v4_scan_assembler( + "xas_advanced_scan", + start=8000.0, + stop=9000.0, + scan_time=1.0, + scan_duration=10.0, + p_kink=50.0, + e_kink=8500.0, + motor="mo1_bragg", + daq="nidaq", + gonio="mo1_gonio", + ) + + assert scan.scan_name == "xas_advanced_scan" + assert scan.scan_info.additional_scan_parameters["p_kink"] == 50.0 + assert scan.scan_info.additional_scan_parameters["e_kink"] == 8500.0 + diff --git a/tests/tests_scans/test_nidaq_continuous_scan_v4.py b/tests/tests_scans/test_nidaq_continuous_scan_v4.py new file mode 100644 index 0000000..39deb02 --- /dev/null +++ b/tests/tests_scans/test_nidaq_continuous_scan_v4.py @@ -0,0 +1,76 @@ +# pylint: skip-file +from unittest import mock + +import pytest +from bec_server.scan_server.tests.scan_fixtures import * +from bec_server.scan_server.tests.scan_hook_tests import * + +NIDAQ_CONTINUOUS_SCAN_DEFAULT_HOOK_TESTS = [ + ("prepare_scan", [assert_prepare_scan_reads_baseline_devices]), + ("open_scan", [assert_scan_open_called]), + ("stage", [assert_stage_all_devices_called]), + ("pre_scan", [assert_pre_scan_called]), + ("unstage", [assert_unstage_all_devices_called]), + ("close_scan", [assert_close_scan_waits_for_baseline_and_closes]), +] + + +def _assemble_nidaq_continuous_scan(v4_scan_assembler, **overrides): + params = {"scan_duration": 10.0, "daq": "nidaq", "compression": False} + params.update(overrides) + return v4_scan_assembler("nidaq_continuous_scan", **params) + + +@pytest.mark.parametrize(("hook_name", "hook_tests"), NIDAQ_CONTINUOUS_SCAN_DEFAULT_HOOK_TESTS) +def test_nidaq_continuous_scan_v4_default_hooks( + v4_scan_assembler, nth_done_status_mock, hook_name, hook_tests +): + scan = _assemble_nidaq_continuous_scan(v4_scan_assembler) + + run_scan_tests(scan, [(hook_name, hook_tests)], nth_done_status_mock=nth_done_status_mock) + + +def test_nidaq_continuous_scan_v4_prepare_scan_updates_metadata(v4_scan_assembler): + scan = _assemble_nidaq_continuous_scan(v4_scan_assembler) + scan.actions.add_scan_report_instruction_device_progress = mock.MagicMock() + baseline_status = mock.MagicMock() + scan.actions.read_baseline_devices = mock.MagicMock(return_value=baseline_status) + + scan.prepare_scan() + scan.actions._build_scan_status_message("open") + + assert scan.scan_info.additional_scan_parameters["scan_duration"] == 10.0 + assert scan.scan_info.additional_scan_parameters["compression"] is False + assert scan.scan_info.readout_priority_modification["async"] == ["nidaq"] + scan.actions.add_scan_report_instruction_device_progress.assert_called_once_with(scan.daq) + scan.actions.read_baseline_devices.assert_called_once_with(wait=False) + assert scan._baseline_readout_status is baseline_status + + +def test_nidaq_continuous_scan_v4_scan_core_reads_until_complete( + v4_scan_assembler, nth_done_status_mock +): + scan = _assemble_nidaq_continuous_scan(v4_scan_assembler) + kickoff_status = mock.MagicMock() + completion_status = nth_done_status_mock(resolve_after=3) + scan.actions.kickoff = mock.MagicMock(return_value=kickoff_status) + scan.actions.complete = mock.MagicMock(return_value=completion_status) + scan.actions.read_monitored_devices = mock.MagicMock() + + with mock.patch("superxas_bec.scans.nidaq_cont_scan.time.sleep"): + scan.scan_core() + + scan.actions.kickoff.assert_called_once_with(device=scan.daq, wait=False) + kickoff_status.wait.assert_called_once_with(timeout=5) + scan.actions.complete.assert_called_once_with(device=scan.daq, wait=False) + assert scan.actions.read_monitored_devices.call_count == 2 + + +def test_nidaq_continuous_scan_v4_post_scan_completes_all_devices(v4_scan_assembler): + scan = _assemble_nidaq_continuous_scan(v4_scan_assembler) + scan.actions.complete_all_devices = mock.MagicMock() + + scan.post_scan() + + scan.actions.complete_all_devices.assert_called_once_with() + -- 2.52.0 From d443f228bc599d43b3b9cabf0005ef6a2a54e7cd Mon Sep 17 00:00:00 2001 From: x10da Date: Thu, 28 May 2026 14:44:54 +0200 Subject: [PATCH 2/4] fix: minor fixes after test at the beamline --- superxas_bec/devices/mo1_bragg/mo1_bragg.py | 50 +++++-------------- .../devices/mo1_bragg/mo1_bragg_devices.py | 2 +- superxas_bec/scans/mono_bragg_scans.py | 3 +- 3 files changed, 15 insertions(+), 40 deletions(-) diff --git a/superxas_bec/devices/mo1_bragg/mo1_bragg.py b/superxas_bec/devices/mo1_bragg/mo1_bragg.py index 89b1abf..d50ef91 100644 --- a/superxas_bec/devices/mo1_bragg/mo1_bragg.py +++ b/superxas_bec/devices/mo1_bragg/mo1_bragg.py @@ -119,14 +119,11 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): scan_duration = self._scan_parameter("scan_duration") if scan_name == "xas_simple_scan": self._raise_for_missing( - scan_name, ["start", "stop", "scan_time", "scan_duration"], + scan_name, + ["start", "stop", "scan_time", "scan_duration"], [start, stop, scan_time, scan_duration], ) - self.set_xas_settings( - low=start, - high=stop, - scan_time=scan_time, - ) + self.set_xas_settings(low=start, high=stop, scan_time=scan_time) self.set_trig_settings( enable_low=False, enable_high=False, @@ -135,9 +132,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): cycle_low=0, cycle_high=0, ) - self.set_scan_control_settings( - mode=ScanControlMode.SIMPLE, scan_duration=scan_duration - ) + self.set_scan_control_settings(mode=ScanControlMode.SIMPLE, scan_duration=scan_duration) elif scan_name == "xas_simple_scan_with_xrd": xrd_enable_low = self._scan_parameter("xrd_enable_low", "break_enable_low") xrd_enable_high = self._scan_parameter("xrd_enable_high", "break_enable_high") @@ -172,11 +167,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): cycle_high, ], ) - self.set_xas_settings( - low=start, - high=stop, - scan_time=scan_time, - ) + self.set_xas_settings(low=start, high=stop, scan_time=scan_time) self.set_trig_settings( enable_low=xrd_enable_low, enable_high=xrd_enable_high, @@ -185,9 +176,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): cycle_low=cycle_low, cycle_high=cycle_high, ) - self.set_scan_control_settings( - mode=ScanControlMode.SIMPLE, scan_duration=scan_duration - ) + self.set_scan_control_settings(mode=ScanControlMode.SIMPLE, scan_duration=scan_duration) elif scan_name == "xas_advanced_scan": p_kink = self._scan_parameter("p_kink") e_kink = self._scan_parameter("e_kink") @@ -197,11 +186,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): [start, stop, scan_time, scan_duration, p_kink, e_kink], ) self.set_advanced_xas_settings( - low=start, - high=stop, - scan_time=scan_time, - p_kink=p_kink, - e_kink=e_kink, + low=start, high=stop, scan_time=scan_time, p_kink=p_kink, e_kink=e_kink ) self.set_trig_settings( enable_low=False, @@ -255,11 +240,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): ], ) self.set_advanced_xas_settings( - low=start, - high=stop, - scan_time=scan_time, - p_kink=p_kink, - e_kink=e_kink, + low=start, high=stop, scan_time=scan_time, p_kink=p_kink, e_kink=e_kink ) self.set_trig_settings( enable_low=xrd_enable_low, @@ -283,7 +264,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.SUCCESS) self.cancel_on_stop(status) self.scan_control.scan_load.put(1) - # Wait for params to be checked from controller + # Wait for params to be checked from controller status.wait(self.timeout_for_pvwait) return None @@ -366,12 +347,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): getattr(self.scan_info.msg, "scan_parameters", {}), ] request_inputs = self.scan_parameters.request_inputs or {} - sources.extend( - [ - request_inputs.get("inputs", {}), - request_inputs.get("kwargs", {}), - ] - ) + sources.extend([request_inputs.get("inputs", {}), request_inputs.get("kwargs", {})]) for source in sources: for name in names: if isinstance(source, dict) and name in source: @@ -421,7 +397,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): status_list.append(self.scan_settings.s_scan_scantime.set(scan_time)) self.cancel_on_stop(status_list[-1]) - + for s in status_list: s.wait(timeout=self.timeout_for_pvwait) @@ -443,6 +419,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): status = CompareStatus(self.calculator.calc_done, 0) self.cancel_on_stop(status) status.wait(self.timeout_for_pvwait) + time.sleep(0.25) if mode == "AngleToEnergy": self.calculator.calc_angle.put(inp) @@ -452,7 +429,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): status = CompareStatus(self.calculator.calc_done, 1) self.cancel_on_stop(status) status.wait(self.timeout_for_pvwait) - time.sleep(0.25) #TODO needed still? Needed due to update frequency of softIOC + time.sleep(0.25) # TODO needed still? Needed due to update frequency of softIOC if mode == "AngleToEnergy": return self.calculator.calc_energy.get() elif mode == "EnergyToAngle": @@ -542,7 +519,6 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): for s in status_list: s.wait(timeout=self.timeout_for_pvwait) - def set_scan_control_settings(self, mode: ScanControlMode, scan_duration: float) -> None: """Set the scan control settings for the upcoming scan. diff --git a/superxas_bec/devices/mo1_bragg/mo1_bragg_devices.py b/superxas_bec/devices/mo1_bragg/mo1_bragg_devices.py index 7fafbe0..fed7c1c 100644 --- a/superxas_bec/devices/mo1_bragg/mo1_bragg_devices.py +++ b/superxas_bec/devices/mo1_bragg/mo1_bragg_devices.py @@ -165,7 +165,7 @@ class Mo1TriggerSettings(Device): class Mo1BraggCalculator(Device): """Mo1 Bragg PVs to convert angle to energy or vice-versa.""" - calc_reset = Cpt(EpicsSignal, suffix="calc_reset", kind="config", put_complete=True) + calc_reset = Cpt(EpicsSignalWithRBV, suffix="calc_reset", kind="config", put_complete=True) calc_done = Cpt(EpicsSignalRO, suffix="calc_done_RBV", kind="config") calc_energy = Cpt(EpicsSignalWithRBV, suffix="calc_energy", kind="config") calc_angle = Cpt(EpicsSignalWithRBV, suffix="calc_angle", kind="config") diff --git a/superxas_bec/scans/mono_bragg_scans.py b/superxas_bec/scans/mono_bragg_scans.py index 458b066..8f691f5 100644 --- a/superxas_bec/scans/mono_bragg_scans.py +++ b/superxas_bec/scans/mono_bragg_scans.py @@ -76,7 +76,7 @@ class XASSimpleScan(ScanBase): self.mo1_bragg = self._resolve_device(motor, "mo1_bragg") self.motor = self.mo1_bragg self.daq = self._resolve_device(daq, "nidaq") - self.mo1_gonio = self._resolve_device(gonio, "mo1_gonio") + self.mo1_gonio = self._resolve_device(gonio, "mo1_rotx") self.monitored_readout_cycle = monitored_readout_cycle self.positions = np.array([self.start, self.stop], dtype=float) @@ -199,4 +199,3 @@ class XASAdvancedScan(XASSimpleScan): **kwargs, ) self.update_scan_info(p_kink=p_kink, e_kink=e_kink) - -- 2.52.0 From c1ba279b985cb1918bafddffbc990bee6386fa4f Mon Sep 17 00:00:00 2001 From: appel_c Date: Thu, 28 May 2026 17:06:56 +0200 Subject: [PATCH 3/4] refactor: cleanup, and remove readability --- superxas_bec/devices/mo1_bragg/mo1_bragg.py | 30 ++---- superxas_bec/devices/nidaq/nidaq.py | 100 ++++++++++++-------- 2 files changed, 68 insertions(+), 62 deletions(-) diff --git a/superxas_bec/devices/mo1_bragg/mo1_bragg.py b/superxas_bec/devices/mo1_bragg/mo1_bragg.py index d50ef91..ee5233c 100644 --- a/superxas_bec/devices/mo1_bragg/mo1_bragg.py +++ b/superxas_bec/devices/mo1_bragg/mo1_bragg.py @@ -8,29 +8,25 @@ used to ensure that the action is executed completely. This is believed to allow for a more stable execution of the action.""" import time -from typing import Any, Literal +from typing import Literal from bec_lib.devicemanager import ScanInfo from bec_lib.logger import bec_logger from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo from ophyd import Component as Cpt -from ophyd import DeviceStatus, Signal, StatusBase -from ophyd.status import SubscriptionStatus, WaitTimeoutError +from ophyd import DeviceStatus, StatusBase +from ophyd.status import WaitTimeoutError from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase -from ophyd_devices.utils.errors import DeviceStopError from typeguard import typechecked from superxas_bec.devices.mo1_bragg.mo1_bragg_devices import Mo1BraggPositioner # pylint: disable=unused-import from superxas_bec.devices.mo1_bragg.mo1_bragg_enums import ( - MoveType, ScanControlLoadMessage, ScanControlMode, ScanControlScanStatus, - TriggerControlMode, - TriggerControlSource, ) from superxas_bec.devices.mo1_bragg.mo1_bragg_utils import compute_spline from superxas_bec.devices.utils.utils import fetch_scan_info @@ -71,9 +67,9 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): self.timeout_for_pvwait = 7.5 self.valid_scan_names = [ "xas_simple_scan", - "xas_simple_scan_with_xrd", + "xas_simple_scan_with_xrd", # Prepared for future, currently not yet available "xas_advanced_scan", - "xas_advanced_scan_with_xrd", + "xas_advanced_scan_with_xrd", # Prepared for future, currently not yet available "nidaq_continuous_scan", ] @@ -341,17 +337,9 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): """Fetch a scan parameter from v4 metadata, with legacy fallbacks.""" if self.scan_parameters is None: return None - sources = [ - self.scan_parameters.additional_scan_parameters, - getattr(self.scan_parameters, "metadata", {}), - getattr(self.scan_info.msg, "scan_parameters", {}), - ] - request_inputs = self.scan_parameters.request_inputs or {} - sources.extend([request_inputs.get("inputs", {}), request_inputs.get("kwargs", {})]) - for source in sources: - for name in names: - if isinstance(source, dict) and name in source: - return source[name] + for name in names: + if name in self.scan_parameters.additional_scan_parameters: + return self.scan_parameters.additional_scan_parameters[name] return None def _get_start_stop(self): @@ -419,7 +407,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): status = CompareStatus(self.calculator.calc_done, 0) self.cancel_on_stop(status) status.wait(self.timeout_for_pvwait) - time.sleep(0.25) + time.sleep(0.25) # needed, otherwise there can be a timing issue with reset/calc_angle. if mode == "AngleToEnergy": self.calculator.calc_angle.put(inp) diff --git a/superxas_bec/devices/nidaq/nidaq.py b/superxas_bec/devices/nidaq/nidaq.py index 3ac1664..9216f7f 100644 --- a/superxas_bec/devices/nidaq/nidaq.py +++ b/superxas_bec/devices/nidaq/nidaq.py @@ -37,18 +37,12 @@ class NidaqControl(Device): energy = Cpt(SetableSignal, value=0, kind=Kind.normal) - smpl_abs = Cpt( - SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream sample absorption" - ) + smpl_abs = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream sample absorption") smpl_fluo = Cpt( SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream sample fluorescence" ) - ref_abs = Cpt( - SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream reference absorption" - ) - cisum = Cpt( - SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter sum" - ) + ref_abs = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream reference absorption") + cisum = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter sum") ai0_mean = Cpt( SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 0, MEAN" @@ -364,7 +358,9 @@ class NidaqControl(Device): ### Control PVs ### - enable_compression = Cpt(EpicsSignal, suffix="NIDAQ-EnableRLE", kind=Kind.config, auto_monitor=True) + enable_compression = Cpt( + EpicsSignal, suffix="NIDAQ-EnableRLE", kind=Kind.config, auto_monitor=True + ) # enable_dead_time_correction = Cpt(EpicsSignal, suffix="NIDAQ-EnableDTC", kind=Kind.config, auto_monitor=True) kickoff_call = Cpt(EpicsSignal, suffix="NIDAQ-Kickoff", kind=Kind.config) stage_call = Cpt(EpicsSignal, suffix="NIDAQ-Stage", kind=Kind.config) @@ -373,13 +369,29 @@ class NidaqControl(Device): compression_ratio = Cpt(EpicsSignalRO, suffix="NIDAQ-CompressionRatio", kind=Kind.config) scan_type = Cpt(EpicsSignal, suffix="NIDAQ-ScanType", kind=Kind.config) scan_type_string = Cpt(EpicsSignal, suffix="NIDAQ-ScanType", kind=Kind.config, string=True) - sampling_rate = Cpt(EpicsSignal, suffix="NIDAQ-SamplingRateRequested", kind=Kind.config, auto_monitor=True) - sampling_rate_string = Cpt(EpicsSignal, suffix="NIDAQ-SamplingRateRequested", kind=Kind.config, string=True, auto_monitor=True) + sampling_rate = Cpt( + EpicsSignal, suffix="NIDAQ-SamplingRateRequested", kind=Kind.config, auto_monitor=True + ) + sampling_rate_string = Cpt( + EpicsSignal, + suffix="NIDAQ-SamplingRateRequested", + kind=Kind.config, + string=True, + auto_monitor=True, + ) scan_duration = Cpt(EpicsSignal, suffix="NIDAQ-SamplingDuration", kind=Kind.config) - readout_range = Cpt(EpicsSignal, suffix="NIDAQ-ReadoutRange", kind=Kind.config, auto_monitor=True) - readout_range_string = Cpt(EpicsSignal, suffix="NIDAQ-ReadoutRange", kind=Kind.config, string=True, auto_monitor=True) - encoder_factor = Cpt(EpicsSignal, suffix="NIDAQ-EncoderFactor", kind=Kind.config, auto_monitor=True) - encoder_factor_string = Cpt(EpicsSignal, suffix="NIDAQ-EncoderFactor", kind=Kind.config, string=True, auto_monitor=True) + readout_range = Cpt( + EpicsSignal, suffix="NIDAQ-ReadoutRange", kind=Kind.config, auto_monitor=True + ) + readout_range_string = Cpt( + EpicsSignal, suffix="NIDAQ-ReadoutRange", kind=Kind.config, string=True, auto_monitor=True + ) + encoder_factor = Cpt( + EpicsSignal, suffix="NIDAQ-EncoderFactor", kind=Kind.config, auto_monitor=True + ) + encoder_factor_string = Cpt( + EpicsSignal, suffix="NIDAQ-EncoderFactor", kind=Kind.config, string=True, auto_monitor=True + ) stop_call = Cpt(EpicsSignal, suffix="NIDAQ-Stop", kind=Kind.config) power = Cpt(EpicsSignal, suffix="NIDAQ-Power", kind=Kind.config) heartbeat = Cpt(EpicsSignal, suffix="NIDAQ-Heartbeat", kind=Kind.config, auto_monitor=True) @@ -394,22 +406,38 @@ class NidaqControl(Device): ref_abs_ln = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_ln", kind=Kind.config, auto_monitor=True) smpl_abs_no = Cpt(EpicsSignal, suffix="NIDAQ-smpl_abs_no", kind=Kind.config, auto_monitor=True) - smpl_abs_no_string = Cpt(EpicsSignal, suffix="NIDAQ-smpl_abs_no", kind=Kind.config, string=True, auto_monitor=True) + smpl_abs_no_string = Cpt( + EpicsSignal, suffix="NIDAQ-smpl_abs_no", kind=Kind.config, string=True, auto_monitor=True + ) smpl_abs_de = Cpt(EpicsSignal, suffix="NIDAQ-smpl_abs_de", kind=Kind.config, auto_monitor=True) - smpl_abs_de_string = Cpt(EpicsSignal, suffix="NIDAQ-smpl_abs_de", kind=Kind.config, string=True, auto_monitor=True) + smpl_abs_de_string = Cpt( + EpicsSignal, suffix="NIDAQ-smpl_abs_de", kind=Kind.config, string=True, auto_monitor=True + ) - smpl_fluo_no = Cpt(EpicsSignal, suffix="NIDAQ-smpl_fluo_no", kind=Kind.config, auto_monitor=True) - smpl_fluo_no_string = Cpt(EpicsSignal, suffix="NIDAQ-smpl_fluo_no", kind=Kind.config, string=True, auto_monitor=True) + smpl_fluo_no = Cpt( + EpicsSignal, suffix="NIDAQ-smpl_fluo_no", kind=Kind.config, auto_monitor=True + ) + smpl_fluo_no_string = Cpt( + EpicsSignal, suffix="NIDAQ-smpl_fluo_no", kind=Kind.config, string=True, auto_monitor=True + ) - smpl_fluo_de = Cpt(EpicsSignal, suffix="NIDAQ-smpl_fluo_de", kind=Kind.config, auto_monitor=True) - smpl_fluo_de_string = Cpt(EpicsSignal, suffix="NIDAQ-smpl_fluo_de", kind=Kind.config, string=True, auto_monitor=True) + smpl_fluo_de = Cpt( + EpicsSignal, suffix="NIDAQ-smpl_fluo_de", kind=Kind.config, auto_monitor=True + ) + smpl_fluo_de_string = Cpt( + EpicsSignal, suffix="NIDAQ-smpl_fluo_de", kind=Kind.config, string=True, auto_monitor=True + ) ref_abs_no = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_no", kind=Kind.config, auto_monitor=True) - ref_abs_no_string = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_no", kind=Kind.config, string=True, auto_monitor=True) + ref_abs_no_string = Cpt( + EpicsSignal, suffix="NIDAQ-ref_abs_no", kind=Kind.config, string=True, auto_monitor=True + ) ref_abs_de = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_de", kind=Kind.config, auto_monitor=True) - ref_abs_de_string = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_de", kind=Kind.config, string=True, auto_monitor=True) + ref_abs_de_string = Cpt( + EpicsSignal, suffix="NIDAQ-ref_abs_de", kind=Kind.config, string=True, auto_monitor=True + ) class Nidaq(PSIDeviceBase, NidaqControl): @@ -429,12 +457,14 @@ class Nidaq(PSIDeviceBase, NidaqControl): super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs) self.scan_parameters: ScanServerScanInfo | None = None self.timeout_wait_for_signal = 5 # put 5s firsts - self._timeout_wait_for_pv = 5 # 5s timeout for pv calls. editted due to timeout issues persisting + self._timeout_wait_for_pv = ( + 5 # 5s timeout for pv calls. editted due to timeout issues persisting + ) self.valid_scan_names = [ "xas_simple_scan", - "xas_simple_scan_with_xrd", + "xas_simple_scan_with_xrd", # prepared for future, currently not yet available "xas_advanced_scan", - "xas_advanced_scan_with_xrd", + "xas_advanced_scan_with_xrd", # prepared for future, currently not yet available "nidaq_continuous_scan", ] @@ -705,20 +735,8 @@ class Nidaq(PSIDeviceBase, NidaqControl): """Fetch a scan parameter from v4 metadata, with legacy fallbacks.""" if self.scan_parameters is None: return None - sources = [ - self.scan_parameters.additional_scan_parameters, - getattr(self.scan_info.msg, "scan_parameters", {}), - ] - request_inputs = self.scan_parameters.request_inputs or {} - sources.extend( - [ - request_inputs.get("inputs", {}), - request_inputs.get("kwargs", {}), - ] - ) - for source in sources: - if isinstance(source, dict) and name in source: - return source[name] + if name in self.scan_parameters.additional_scan_parameters: + return self.scan_parameters.additional_scan_parameters[name] return None def _progress_update(self, value, **kwargs) -> None: -- 2.52.0 From 3fb2fd9703a38814fd3c152d6bbd2a7741afaa59 Mon Sep 17 00:00:00 2001 From: x10da Date: Tue, 2 Jun 2026 10:44:30 +0200 Subject: [PATCH 4/4] fix: mo1_bragg calculator --- superxas_bec/devices/mo1_bragg/mo1_bragg.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/superxas_bec/devices/mo1_bragg/mo1_bragg.py b/superxas_bec/devices/mo1_bragg/mo1_bragg.py index ee5233c..90d821d 100644 --- a/superxas_bec/devices/mo1_bragg/mo1_bragg.py +++ b/superxas_bec/devices/mo1_bragg/mo1_bragg.py @@ -402,26 +402,31 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): Returns: output (float): Converted angle or energy """ + self.calculator.calc_reset.put(0) self.calculator.calc_reset.put(1) status = CompareStatus(self.calculator.calc_done, 0) self.cancel_on_stop(status) status.wait(self.timeout_for_pvwait) - time.sleep(0.25) # needed, otherwise there can be a timing issue with reset/calc_angle. + self.calculator.calc_reset.put(0) if mode == "AngleToEnergy": - self.calculator.calc_angle.put(inp) + in_signal = self.calculator.calc_angle + out_signal = self.calculator.calc_energy elif mode == "EnergyToAngle": - self.calculator.calc_energy.put(inp) + in_signal = self.calculator.calc_energy + out_signal = self.calculator.calc_angle + else: + raise Mo1BraggError(f'Unknown mode {mode}') + in_signal.put(inp) status = CompareStatus(self.calculator.calc_done, 1) self.cancel_on_stop(status) status.wait(self.timeout_for_pvwait) - time.sleep(0.25) # TODO needed still? Needed due to update frequency of softIOC - if mode == "AngleToEnergy": - return self.calculator.calc_energy.get() - elif mode == "EnergyToAngle": - return self.calculator.calc_angle.get() + status = CompareStatus(out_signal, 0, operation_success='>') + self.cancel_on_stop(status) + status.wait(self.timeout_for_pvwait) + return out_signal.get() def set_advanced_xas_settings( self, low: float, high: float, scan_time: float, p_kink: float, e_kink: float -- 2.52.0