From 8bc36ed6a2e8ab778a99f034423ede15fc7e4a03 Mon Sep 17 00:00:00 2001 From: appel_c Date: Fri, 22 May 2026 10:28:55 +0200 Subject: [PATCH] refactor(mo1-bragg): migrate mo1_bragg to scans v4 --- debye_bec/devices/mo1_bragg/mo1_bragg.py | 206 +++++++++++++---------- 1 file changed, 121 insertions(+), 85 deletions(-) diff --git a/debye_bec/devices/mo1_bragg/mo1_bragg.py b/debye_bec/devices/mo1_bragg/mo1_bragg.py index 5189816..3e14afd 100644 --- a/debye_bec/devices/mo1_bragg/mo1_bragg.py +++ b/debye_bec/devices/mo1_bragg/mo1_bragg.py @@ -13,6 +13,7 @@ from typing import Literal from bec_lib.devicemanager import ScanInfo from bec_lib.logger import bec_logger +from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo from ophyd import Component as Cpt from ophyd import DeviceStatus, StatusBase from ophyd.status import WaitTimeoutError @@ -33,6 +34,7 @@ from debye_bec.devices.mo1_bragg.mo1_bragg_enums import ( TriggerControlSource, ) from debye_bec.devices.mo1_bragg.mo1_bragg_utils import compute_spline +from debye_bec.devices.utils.utils import fetch_scan_info # Initialise logger logger = bec_logger.logger @@ -44,36 +46,6 @@ class Mo1BraggError(Exception): """Exception for the Mo1 Bragg positioner""" -########## Scan Parameter Model ########## - - -class ScanParameter(BaseModel): - """Dataclass to store the scan parameters for the Mo1 Bragg positioner. - This needs to be in sync with the kwargs of the MO1 Bragg scans from Debye, to - ensure that the scan parameters are correctly set. Any changes in the scan kwargs, - i.e. renaming or adding new parameters, need to be represented here as well.""" - - scan_time: float | None = Field(None, description="Scan time for a half oscillation") - scan_duration: float | None = Field(None, description="Duration of the scan") - break_enable_low: bool | None = Field( - None, description="Break enabled for low, should be PV trig_ena_lo_enum" - ) # trig_enable_low: bool = None - break_enable_high: bool | None = Field( - None, description="Break enabled for high, should be PV trig_ena_hi_enum" - ) # trig_enable_high: bool = None - break_time_low: float | None = Field(None, description="Break time low energy/angle") - break_time_high: float | None = Field(None, description="Break time high energy/angle") - cycle_low: int | None = Field(None, description="Cycle for low energy/angle") - cycle_high: int | None = Field(None, description="Cycle for high energy/angle") - exp_time: float | None = Field(None, description="XRD trigger period") - n_of_trigger: int | None = Field(None, description="Amount of XRD triggers") - start: float | None = Field(None, description="Start value for energy/angle") - stop: float | None = Field(None, description="Stop value for energy/angle") - p_kink: float | None = Field(None, description="P Kink") - e_kink: float | None = Field(None, description="Energy Kink") - model_config: dict = {"validate_assignment": True} - - ########### Mo1 Bragg Motor Class ########### @@ -96,7 +68,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): scan_info (ScanInfo): The scan info to use. """ super().__init__(name=name, scan_info=scan_info, prefix=prefix, **kwargs) - self.scan_parameter = ScanParameter() + self.scan_parameters: ScanServerScanInfo = None self.timeout_for_pvwait = 7.5 ######################################## @@ -124,20 +96,24 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object. """ + self.scan_parameters = fetch_scan_info(self.scan_info) if self.scan_control.scan_msg.get() != ScanControlLoadMessage.PENDING: status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING) self.cancel_on_stop(status) self.scan_control.scan_val_reset.put(1) status.wait(timeout=self.timeout_for_pvwait) - scan_name = self.scan_info.msg.scan_name - self._update_scan_parameter() + scan_name = self.scan_parameters.msg.scan_name + start = self.scan_parameters.additional_scan_parameters.get("start", None) + stop = self.scan_parameters.additional_scan_parameters.get("stop", None) + scan_time = self.scan_parameters.additional_scan_parameters.get("scan_time", None) + scan_duration = self.scan_parameters.additional_scan_parameters.get("scan_duration", None) if scan_name == "xas_simple_scan": - self.set_xas_settings( - low=self.scan_parameter.start, - high=self.scan_parameter.stop, - scan_time=self.scan_parameter.scan_time, - ) + if any(param is None for param in [start, stop, scan_time, scan_duration]): + raise Mo1BraggError( + f"Missing scan parameters for xas_simple_scan. Required parameters: start, stop, scan_time, scan_duration in additional_scan_parameters dict {self.scan_parameters.additional_scan_parameters}" + ) + self.set_xas_settings(low=start, high=stop, scan_time=scan_time) self.set_trig_settings( enable_low=False, enable_high=False, @@ -148,35 +124,67 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): exp_time=0, n_of_trigger=0, ) - self.set_scan_control_settings( - mode=ScanControlMode.SIMPLE, scan_duration=self.scan_parameter.scan_duration - ) + self.set_scan_control_settings(mode=ScanControlMode.SIMPLE, scan_duration=scan_duration) elif scan_name == "xas_simple_scan_with_xrd": - self.set_xas_settings( - low=self.scan_parameter.start, - high=self.scan_parameter.stop, - scan_time=self.scan_parameter.scan_time, + break_enable_low = self.scan_parameters.additional_scan_parameters.get( + "break_enable_low", None ) + break_enable_high = self.scan_parameters.additional_scan_parameters.get( + "break_enable_high", None + ) + break_time_low = self.scan_parameters.additional_scan_parameters.get( + "break_time_low", None + ) + break_time_high = self.scan_parameters.additional_scan_parameters.get( + "break_time_high", None + ) + cycle_low = self.scan_parameters.additional_scan_parameters.get("cycle_low", None) + cycle_high = self.scan_parameters.additional_scan_parameters.get("cycle_high", None) + exp_time = self.scan_parameters.additional_scan_parameters.get("exp_time", None) + n_of_trigger = self.scan_parameters.additional_scan_parameters.get("n_of_trigger", None) + if any( + param is None + for param in [ + start, + stop, + scan_time, + scan_duration, + break_enable_low, + break_enable_high, + break_time_low, + break_time_high, + cycle_low, + cycle_high, + exp_time, + n_of_trigger, + ] + ): + raise Mo1BraggError( + f"Missing scan parameters for xas_simple_scan_with_xrd. Required parameters: start, stop, scan_time, scan_duration, break_enable_low, break_enable_high, break_time_low, break_time_high, cycle_low, cycle_high, exp_time, n_of_trigger in additional_scan_parameters dict {self.scan_parameters.additional_scan_parameters}" + ) + self.set_xas_settings(low=start, high=stop, scan_time=scan_time) self.set_trig_settings( - enable_low=self.scan_parameter.break_enable_low, - enable_high=self.scan_parameter.break_enable_high, - break_time_low=self.scan_parameter.break_time_low, - break_time_high=self.scan_parameter.break_time_high, - cycle_low=self.scan_parameter.cycle_low, - cycle_high=self.scan_parameter.cycle_high, - exp_time=self.scan_parameter.exp_time, - n_of_trigger=self.scan_parameter.n_of_trigger, - ) - self.set_scan_control_settings( - mode=ScanControlMode.SIMPLE, scan_duration=self.scan_parameter.scan_duration + enable_low=break_enable_low, + enable_high=break_enable_high, + break_time_low=break_time_low, + break_time_high=break_time_high, + cycle_low=cycle_low, + cycle_high=cycle_high, + exp_time=exp_time, + n_of_trigger=n_of_trigger, ) + self.set_scan_control_settings(mode=ScanControlMode.SIMPLE, scan_duration=scan_duration) elif scan_name == "xas_advanced_scan": + p_kink = self.scan_parameters.additional_scan_parameters.get("p_kink", None) + e_kink = self.scan_parameters.additional_scan_parameters.get("e_kink", None) + if any( + param is None for param in [start, stop, scan_time, scan_duration, p_kink, e_kink] + ): + raise Mo1BraggError( + f"Missing scan parameters for xas_advanced_scan. Required parameters: start, stop, scan_time, scan_duration, p_kink, e_kink in additional_scan_parameters dict {self.scan_parameters.additional_scan_parameters}" + ) self.set_advanced_xas_settings( - low=self.scan_parameter.start, - high=self.scan_parameter.stop, - scan_time=self.scan_parameter.scan_time, - p_kink=self.scan_parameter.p_kink, - e_kink=self.scan_parameter.e_kink, + low=start, high=stop, scan_time=scan_time, p_kink=p_kink, e_kink=e_kink ) self.set_trig_settings( enable_low=False, @@ -189,28 +197,65 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): n_of_trigger=0, ) self.set_scan_control_settings( - mode=ScanControlMode.ADVANCED, scan_duration=self.scan_parameter.scan_duration + mode=ScanControlMode.ADVANCED, scan_duration=scan_duration ) elif scan_name == "xas_advanced_scan_with_xrd": + p_kink = self.scan_parameters.additional_scan_parameters.get("p_kink", None) + e_kink = self.scan_parameters.additional_scan_parameters.get("e_kink", None) + break_enable_low = self.scan_parameters.additional_scan_parameters.get( + "break_enable_low", None + ) + break_enable_high = self.scan_parameters.additional_scan_parameters.get( + "break_enable_high", None + ) + break_time_low = self.scan_parameters.additional_scan_parameters.get( + "break_time_low", None + ) + break_time_high = self.scan_parameters.additional_scan_parameters.get( + "break_time_high", None + ) + cycle_low = self.scan_parameters.additional_scan_parameters.get("cycle_low", None) + cycle_high = self.scan_parameters.additional_scan_parameters.get("cycle_high", None) + exp_time = self.scan_parameters.additional_scan_parameters.get("exp_time", None) + n_of_trigger = self.scan_parameters.additional_scan_parameters.get("n_of_trigger", None) + if any( + param is None + for param in [ + start, + stop, + scan_time, + scan_duration, + p_kink, + e_kink, + break_enable_low, + break_enable_high, + break_time_low, + break_time_high, + cycle_low, + cycle_high, + exp_time, + n_of_trigger, + ] + ): + raise Mo1BraggError( + f"Missing scan parameters for xas_advanced_scan_with_xrd. Required parameters: start, stop, scan_time, scan_duration, p_kink, e_kink, break_enable_low, break_enable_high, break_time_low, break_time_high, cycle_low, cycle_high, exp_time, n_of_trigger in additional_scan_parameters dict {self.scan_parameters.additional_scan_parameters}" + ) + self.set_advanced_xas_settings( - low=self.scan_parameter.start, - high=self.scan_parameter.stop, - scan_time=self.scan_parameter.scan_time, - p_kink=self.scan_parameter.p_kink, - e_kink=self.scan_parameter.e_kink, + low=start, high=stop, scan_time=scan_time, p_kink=p_kink, e_kink=e_kink ) self.set_trig_settings( - enable_low=self.scan_parameter.break_enable_low, - enable_high=self.scan_parameter.break_enable_high, - break_time_low=self.scan_parameter.break_time_low, - break_time_high=self.scan_parameter.break_time_high, - cycle_low=self.scan_parameter.cycle_low, - cycle_high=self.scan_parameter.cycle_high, - exp_time=self.scan_parameter.exp_time, - n_of_trigger=self.scan_parameter.n_of_trigger, + enable_low=break_enable_low, + enable_high=break_enable_high, + break_time_low=break_time_low, + break_time_high=break_time_high, + cycle_low=cycle_low, + cycle_high=cycle_high, + exp_time=exp_time, + n_of_trigger=n_of_trigger, ) self.set_scan_control_settings( - mode=ScanControlMode.ADVANCED, scan_duration=self.scan_parameter.scan_duration + mode=ScanControlMode.ADVANCED, scan_duration=scan_duration ) else: return @@ -468,12 +513,3 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): for s in status_list: s.wait(timeout=self.timeout_for_pvwait) - - def _update_scan_parameter(self): - """Get the scan_info parameters for the scan.""" - for key, value in self.scan_info.msg.request_inputs["inputs"].items(): - if hasattr(self.scan_parameter, key): - setattr(self.scan_parameter, key, value) - for key, value in self.scan_info.msg.request_inputs["kwargs"].items(): - if hasattr(self.scan_parameter, key): - setattr(self.scan_parameter, key, value)