refactor(mo1-bragg): migrate mo1_bragg to scans v4

This commit is contained in:
2026-05-22 10:28:55 +02:00
parent 78d58ad26f
commit 8bc36ed6a2
+121 -85
View File
@@ -13,6 +13,7 @@ from typing import Literal
from bec_lib.devicemanager import ScanInfo
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from ophyd import Component as Cpt
from ophyd import DeviceStatus, StatusBase
from ophyd.status import WaitTimeoutError
@@ -33,6 +34,7 @@ from debye_bec.devices.mo1_bragg.mo1_bragg_enums import (
TriggerControlSource,
)
from debye_bec.devices.mo1_bragg.mo1_bragg_utils import compute_spline
from debye_bec.devices.utils.utils import fetch_scan_info
# Initialise logger
logger = bec_logger.logger
@@ -44,36 +46,6 @@ class Mo1BraggError(Exception):
"""Exception for the Mo1 Bragg positioner"""
########## Scan Parameter Model ##########
class ScanParameter(BaseModel):
"""Dataclass to store the scan parameters for the Mo1 Bragg positioner.
This needs to be in sync with the kwargs of the MO1 Bragg scans from Debye, to
ensure that the scan parameters are correctly set. Any changes in the scan kwargs,
i.e. renaming or adding new parameters, need to be represented here as well."""
scan_time: float | None = Field(None, description="Scan time for a half oscillation")
scan_duration: float | None = Field(None, description="Duration of the scan")
break_enable_low: bool | None = Field(
None, description="Break enabled for low, should be PV trig_ena_lo_enum"
) # trig_enable_low: bool = None
break_enable_high: bool | None = Field(
None, description="Break enabled for high, should be PV trig_ena_hi_enum"
) # trig_enable_high: bool = None
break_time_low: float | None = Field(None, description="Break time low energy/angle")
break_time_high: float | None = Field(None, description="Break time high energy/angle")
cycle_low: int | None = Field(None, description="Cycle for low energy/angle")
cycle_high: int | None = Field(None, description="Cycle for high energy/angle")
exp_time: float | None = Field(None, description="XRD trigger period")
n_of_trigger: int | None = Field(None, description="Amount of XRD triggers")
start: float | None = Field(None, description="Start value for energy/angle")
stop: float | None = Field(None, description="Stop value for energy/angle")
p_kink: float | None = Field(None, description="P Kink")
e_kink: float | None = Field(None, description="Energy Kink")
model_config: dict = {"validate_assignment": True}
########### Mo1 Bragg Motor Class ###########
@@ -96,7 +68,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
scan_info (ScanInfo): The scan info to use.
"""
super().__init__(name=name, scan_info=scan_info, prefix=prefix, **kwargs)
self.scan_parameter = ScanParameter()
self.scan_parameters: ScanServerScanInfo = None
self.timeout_for_pvwait = 7.5
########################################
@@ -124,20 +96,24 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
"""
self.scan_parameters = fetch_scan_info(self.scan_info)
if self.scan_control.scan_msg.get() != ScanControlLoadMessage.PENDING:
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
self.cancel_on_stop(status)
self.scan_control.scan_val_reset.put(1)
status.wait(timeout=self.timeout_for_pvwait)
scan_name = self.scan_info.msg.scan_name
self._update_scan_parameter()
scan_name = self.scan_parameters.msg.scan_name
start = self.scan_parameters.additional_scan_parameters.get("start", None)
stop = self.scan_parameters.additional_scan_parameters.get("stop", None)
scan_time = self.scan_parameters.additional_scan_parameters.get("scan_time", None)
scan_duration = self.scan_parameters.additional_scan_parameters.get("scan_duration", None)
if scan_name == "xas_simple_scan":
self.set_xas_settings(
low=self.scan_parameter.start,
high=self.scan_parameter.stop,
scan_time=self.scan_parameter.scan_time,
)
if any(param is None for param in [start, stop, scan_time, scan_duration]):
raise Mo1BraggError(
f"Missing scan parameters for xas_simple_scan. Required parameters: start, stop, scan_time, scan_duration in additional_scan_parameters dict {self.scan_parameters.additional_scan_parameters}"
)
self.set_xas_settings(low=start, high=stop, scan_time=scan_time)
self.set_trig_settings(
enable_low=False,
enable_high=False,
@@ -148,35 +124,67 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
exp_time=0,
n_of_trigger=0,
)
self.set_scan_control_settings(
mode=ScanControlMode.SIMPLE, scan_duration=self.scan_parameter.scan_duration
)
self.set_scan_control_settings(mode=ScanControlMode.SIMPLE, scan_duration=scan_duration)
elif scan_name == "xas_simple_scan_with_xrd":
self.set_xas_settings(
low=self.scan_parameter.start,
high=self.scan_parameter.stop,
scan_time=self.scan_parameter.scan_time,
break_enable_low = self.scan_parameters.additional_scan_parameters.get(
"break_enable_low", None
)
break_enable_high = self.scan_parameters.additional_scan_parameters.get(
"break_enable_high", None
)
break_time_low = self.scan_parameters.additional_scan_parameters.get(
"break_time_low", None
)
break_time_high = self.scan_parameters.additional_scan_parameters.get(
"break_time_high", None
)
cycle_low = self.scan_parameters.additional_scan_parameters.get("cycle_low", None)
cycle_high = self.scan_parameters.additional_scan_parameters.get("cycle_high", None)
exp_time = self.scan_parameters.additional_scan_parameters.get("exp_time", None)
n_of_trigger = self.scan_parameters.additional_scan_parameters.get("n_of_trigger", None)
if any(
param is None
for param in [
start,
stop,
scan_time,
scan_duration,
break_enable_low,
break_enable_high,
break_time_low,
break_time_high,
cycle_low,
cycle_high,
exp_time,
n_of_trigger,
]
):
raise Mo1BraggError(
f"Missing scan parameters for xas_simple_scan_with_xrd. Required parameters: start, stop, scan_time, scan_duration, break_enable_low, break_enable_high, break_time_low, break_time_high, cycle_low, cycle_high, exp_time, n_of_trigger in additional_scan_parameters dict {self.scan_parameters.additional_scan_parameters}"
)
self.set_xas_settings(low=start, high=stop, scan_time=scan_time)
self.set_trig_settings(
enable_low=self.scan_parameter.break_enable_low,
enable_high=self.scan_parameter.break_enable_high,
break_time_low=self.scan_parameter.break_time_low,
break_time_high=self.scan_parameter.break_time_high,
cycle_low=self.scan_parameter.cycle_low,
cycle_high=self.scan_parameter.cycle_high,
exp_time=self.scan_parameter.exp_time,
n_of_trigger=self.scan_parameter.n_of_trigger,
)
self.set_scan_control_settings(
mode=ScanControlMode.SIMPLE, scan_duration=self.scan_parameter.scan_duration
enable_low=break_enable_low,
enable_high=break_enable_high,
break_time_low=break_time_low,
break_time_high=break_time_high,
cycle_low=cycle_low,
cycle_high=cycle_high,
exp_time=exp_time,
n_of_trigger=n_of_trigger,
)
self.set_scan_control_settings(mode=ScanControlMode.SIMPLE, scan_duration=scan_duration)
elif scan_name == "xas_advanced_scan":
p_kink = self.scan_parameters.additional_scan_parameters.get("p_kink", None)
e_kink = self.scan_parameters.additional_scan_parameters.get("e_kink", None)
if any(
param is None for param in [start, stop, scan_time, scan_duration, p_kink, e_kink]
):
raise Mo1BraggError(
f"Missing scan parameters for xas_advanced_scan. Required parameters: start, stop, scan_time, scan_duration, p_kink, e_kink in additional_scan_parameters dict {self.scan_parameters.additional_scan_parameters}"
)
self.set_advanced_xas_settings(
low=self.scan_parameter.start,
high=self.scan_parameter.stop,
scan_time=self.scan_parameter.scan_time,
p_kink=self.scan_parameter.p_kink,
e_kink=self.scan_parameter.e_kink,
low=start, high=stop, scan_time=scan_time, p_kink=p_kink, e_kink=e_kink
)
self.set_trig_settings(
enable_low=False,
@@ -189,28 +197,65 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
n_of_trigger=0,
)
self.set_scan_control_settings(
mode=ScanControlMode.ADVANCED, scan_duration=self.scan_parameter.scan_duration
mode=ScanControlMode.ADVANCED, scan_duration=scan_duration
)
elif scan_name == "xas_advanced_scan_with_xrd":
p_kink = self.scan_parameters.additional_scan_parameters.get("p_kink", None)
e_kink = self.scan_parameters.additional_scan_parameters.get("e_kink", None)
break_enable_low = self.scan_parameters.additional_scan_parameters.get(
"break_enable_low", None
)
break_enable_high = self.scan_parameters.additional_scan_parameters.get(
"break_enable_high", None
)
break_time_low = self.scan_parameters.additional_scan_parameters.get(
"break_time_low", None
)
break_time_high = self.scan_parameters.additional_scan_parameters.get(
"break_time_high", None
)
cycle_low = self.scan_parameters.additional_scan_parameters.get("cycle_low", None)
cycle_high = self.scan_parameters.additional_scan_parameters.get("cycle_high", None)
exp_time = self.scan_parameters.additional_scan_parameters.get("exp_time", None)
n_of_trigger = self.scan_parameters.additional_scan_parameters.get("n_of_trigger", None)
if any(
param is None
for param in [
start,
stop,
scan_time,
scan_duration,
p_kink,
e_kink,
break_enable_low,
break_enable_high,
break_time_low,
break_time_high,
cycle_low,
cycle_high,
exp_time,
n_of_trigger,
]
):
raise Mo1BraggError(
f"Missing scan parameters for xas_advanced_scan_with_xrd. Required parameters: start, stop, scan_time, scan_duration, p_kink, e_kink, break_enable_low, break_enable_high, break_time_low, break_time_high, cycle_low, cycle_high, exp_time, n_of_trigger in additional_scan_parameters dict {self.scan_parameters.additional_scan_parameters}"
)
self.set_advanced_xas_settings(
low=self.scan_parameter.start,
high=self.scan_parameter.stop,
scan_time=self.scan_parameter.scan_time,
p_kink=self.scan_parameter.p_kink,
e_kink=self.scan_parameter.e_kink,
low=start, high=stop, scan_time=scan_time, p_kink=p_kink, e_kink=e_kink
)
self.set_trig_settings(
enable_low=self.scan_parameter.break_enable_low,
enable_high=self.scan_parameter.break_enable_high,
break_time_low=self.scan_parameter.break_time_low,
break_time_high=self.scan_parameter.break_time_high,
cycle_low=self.scan_parameter.cycle_low,
cycle_high=self.scan_parameter.cycle_high,
exp_time=self.scan_parameter.exp_time,
n_of_trigger=self.scan_parameter.n_of_trigger,
enable_low=break_enable_low,
enable_high=break_enable_high,
break_time_low=break_time_low,
break_time_high=break_time_high,
cycle_low=cycle_low,
cycle_high=cycle_high,
exp_time=exp_time,
n_of_trigger=n_of_trigger,
)
self.set_scan_control_settings(
mode=ScanControlMode.ADVANCED, scan_duration=self.scan_parameter.scan_duration
mode=ScanControlMode.ADVANCED, scan_duration=scan_duration
)
else:
return
@@ -468,12 +513,3 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
for s in status_list:
s.wait(timeout=self.timeout_for_pvwait)
def _update_scan_parameter(self):
"""Get the scan_info parameters for the scan."""
for key, value in self.scan_info.msg.request_inputs["inputs"].items():
if hasattr(self.scan_parameter, key):
setattr(self.scan_parameter, key, value)
for key, value in self.scan_info.msg.request_inputs["kwargs"].items():
if hasattr(self.scan_parameter, key):
setattr(self.scan_parameter, key, value)