Migrate SuperXAS scans to V4 #21

Merged
hitz_s merged 4 commits from superxas-v4-migration into main 2026-06-02 12:49:40 +02:00
13 changed files with 945 additions and 476 deletions
+165 -98
View File
@@ -8,31 +8,28 @@ used to ensure that the action is executed completely. This is believed
to allow for a more stable execution of the action."""
import time
from typing import Any, Literal
from typing import Literal
from bec_lib.devicemanager import ScanInfo
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from ophyd import Component as Cpt
from ophyd import DeviceStatus, Signal, StatusBase
from ophyd.status import SubscriptionStatus, WaitTimeoutError
from ophyd import DeviceStatus, StatusBase
from ophyd.status import WaitTimeoutError
from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.errors import DeviceStopError
from pydantic import BaseModel, Field
from typeguard import typechecked
from superxas_bec.devices.mo1_bragg.mo1_bragg_devices import Mo1BraggPositioner
# pylint: disable=unused-import
from superxas_bec.devices.mo1_bragg.mo1_bragg_enums import (
MoveType,
ScanControlLoadMessage,
ScanControlMode,
ScanControlScanStatus,
TriggerControlMode,
TriggerControlSource,
)
from superxas_bec.devices.mo1_bragg.mo1_bragg_utils import compute_spline
from superxas_bec.devices.utils.utils import fetch_scan_info
# Initialise logger
logger = bec_logger.logger
@@ -44,34 +41,6 @@ class Mo1BraggError(Exception):
"""Exception for the Mo1 Bragg positioner"""
########## Scan Parameter Model ##########
class ScanParameter(BaseModel):
"""Dataclass to store the scan parameters for the Mo1 Bragg positioner.
This needs to be in sync with the kwargs of the MO1 Bragg scans from SuperXAS, to
ensure that the scan parameters are correctly set. Any changes in the scan kwargs,
i.e. renaming or adding new parameters, need to be represented here as well."""
scan_time: float | None = Field(None, description="Scan time for a half oscillation")
scan_duration: float | None = Field(None, description="Duration of the scan")
xrd_enable_low: bool | None = Field(
None, description="XRD enabled for low, should be PV trig_ena_lo_enum"
) # trig_enable_low: bool = None
xrd_enable_high: bool | None = Field(
None, description="XRD enabled for high, should be PV trig_ena_hi_enum"
) # trig_enable_high: bool = None
exp_time_low: float | None = Field(None, description="Exposure time low energy/angle")
exp_time_high: float | None = Field(None, description="Exposure time high energy/angle")
cycle_low: int | None = Field(None, description="Cycle for low energy/angle")
cycle_high: int | None = Field(None, description="Cycle for high energy/angle")
start: float | None = Field(None, description="Start value for energy/angle")
stop: float | None = Field(None, description="Stop value for energy/angle")
p_kink: float | None = Field(None, description="P Kink")
e_kink: float | None = Field(None, description="Energy Kink")
model_config: dict = {"validate_assignment": True}
########### Mo1 Bragg Motor Class ###########
@@ -83,7 +52,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
progress_signal = Cpt(ProgressSignal, name="progress_signal")
USER_ACCESS = ["set_advanced_xas_settings", "set_xtal"]
USER_ACCESS = ["set_advanced_xas_settings", "set_xtal", "convert_angle_energy"]
def __init__(self, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs): # type: ignore
"""
@@ -94,8 +63,15 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
scan_info (ScanInfo): The scan info to use.
"""
super().__init__(name=name, scan_info=scan_info, prefix=prefix, **kwargs)
self.scan_parameter = ScanParameter()
self.scan_parameters: ScanServerScanInfo | None = None
self.timeout_for_pvwait = 7.5
self.valid_scan_names = [
"xas_simple_scan",
"xas_simple_scan_with_xrd", # Prepared for future, currently not yet available
"xas_advanced_scan",
"xas_advanced_scan_with_xrd", # Prepared for future, currently not yet available
"nidaq_continuous_scan",
]
########################################
# Beamline Specific Implementations #
@@ -122,6 +98,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
"""
self.scan_parameters = fetch_scan_info(self.scan_info)
if self.scan_control.scan_msg.get() != ScanControlLoadMessage.PENDING:
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
@@ -129,15 +106,20 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
self.scan_control.scan_val_reset.put(1)
status.wait(timeout=self.timeout_for_pvwait)
scan_name = self.scan_info.msg.scan_name
self._update_scan_parameter()
scan_name = self.scan_parameters.scan_name
if not self._check_if_scan_name_is_valid(self.scan_parameters):
return None
start, stop = self._get_start_stop()
scan_time = self._scan_parameter("scan_time")
scan_duration = self._scan_parameter("scan_duration")
if scan_name == "xas_simple_scan":
self.set_xas_settings(
low=self.scan_parameter.start,
high=self.scan_parameter.stop,
scan_time=self.scan_parameter.scan_time,
self._raise_for_missing(
scan_name,
["start", "stop", "scan_time", "scan_duration"],
[start, stop, scan_time, scan_duration],
)
self.set_xas_settings(low=start, high=stop, scan_time=scan_time)
self.set_trig_settings(
enable_low=False,
enable_high=False,
@@ -146,33 +128,61 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
cycle_low=0,
cycle_high=0,
)
self.set_scan_control_settings(
mode=ScanControlMode.SIMPLE, scan_duration=self.scan_parameter.scan_duration
)
self.set_scan_control_settings(mode=ScanControlMode.SIMPLE, scan_duration=scan_duration)
elif scan_name == "xas_simple_scan_with_xrd":
self.set_xas_settings(
low=self.scan_parameter.start,
high=self.scan_parameter.stop,
scan_time=self.scan_parameter.scan_time,
xrd_enable_low = self._scan_parameter("xrd_enable_low", "break_enable_low")
xrd_enable_high = self._scan_parameter("xrd_enable_high", "break_enable_high")
exp_time_low = self._scan_parameter("exp_time_low", "break_time_low")
exp_time_high = self._scan_parameter("exp_time_high", "break_time_high")
cycle_low = self._scan_parameter("cycle_low")
cycle_high = self._scan_parameter("cycle_high")
self._raise_for_missing(
scan_name,
[
"start",
"stop",
"scan_time",
"scan_duration",
"xrd_enable_low",
"xrd_enable_high",
"exp_time_low",
"exp_time_high",
"cycle_low",
"cycle_high",
],
[
start,
stop,
scan_time,
scan_duration,
xrd_enable_low,
xrd_enable_high,
exp_time_low,
exp_time_high,
cycle_low,
cycle_high,
],
)
self.set_xas_settings(low=start, high=stop, scan_time=scan_time)
self.set_trig_settings(
enable_low=self.scan_parameter.xrd_enable_low, # enable_low=self.scan_parameter.trig_enable_low,
enable_high=self.scan_parameter.xrd_enable_high, # enable_high=self.scan_parameter.trig_enable_high,
exp_time_low=self.scan_parameter.exp_time_low,
exp_time_high=self.scan_parameter.exp_time_high,
cycle_low=self.scan_parameter.cycle_low,
cycle_high=self.scan_parameter.cycle_high,
)
self.set_scan_control_settings(
mode=ScanControlMode.SIMPLE, scan_duration=self.scan_parameter.scan_duration
enable_low=xrd_enable_low,
enable_high=xrd_enable_high,
exp_time_low=exp_time_low,
exp_time_high=exp_time_high,
cycle_low=cycle_low,
cycle_high=cycle_high,
)
self.set_scan_control_settings(mode=ScanControlMode.SIMPLE, scan_duration=scan_duration)
elif scan_name == "xas_advanced_scan":
p_kink = self._scan_parameter("p_kink")
e_kink = self._scan_parameter("e_kink")
self._raise_for_missing(
scan_name,
["start", "stop", "scan_time", "scan_duration", "p_kink", "e_kink"],
[start, stop, scan_time, scan_duration, p_kink, e_kink],
)
self.set_advanced_xas_settings(
low=self.scan_parameter.start,
high=self.scan_parameter.stop,
scan_time=self.scan_parameter.scan_time,
p_kink=self.scan_parameter.p_kink,
e_kink=self.scan_parameter.e_kink,
low=start, high=stop, scan_time=scan_time, p_kink=p_kink, e_kink=e_kink
)
self.set_trig_settings(
enable_low=False,
@@ -183,26 +193,61 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
cycle_high=0,
)
self.set_scan_control_settings(
mode=ScanControlMode.ADVANCED, scan_duration=self.scan_parameter.scan_duration
mode=ScanControlMode.ADVANCED, scan_duration=scan_duration
)
elif scan_name == "xas_advanced_scan_with_xrd":
p_kink = self._scan_parameter("p_kink")
e_kink = self._scan_parameter("e_kink")
xrd_enable_low = self._scan_parameter("xrd_enable_low", "break_enable_low")
xrd_enable_high = self._scan_parameter("xrd_enable_high", "break_enable_high")
exp_time_low = self._scan_parameter("exp_time_low", "break_time_low")
exp_time_high = self._scan_parameter("exp_time_high", "break_time_high")
cycle_low = self._scan_parameter("cycle_low")
cycle_high = self._scan_parameter("cycle_high")
self._raise_for_missing(
scan_name,
[
"start",
"stop",
"scan_time",
"scan_duration",
"p_kink",
"e_kink",
"xrd_enable_low",
"xrd_enable_high",
"exp_time_low",
"exp_time_high",
"cycle_low",
"cycle_high",
],
[
start,
stop,
scan_time,
scan_duration,
p_kink,
e_kink,
xrd_enable_low,
xrd_enable_high,
exp_time_low,
exp_time_high,
cycle_low,
cycle_high,
],
)
self.set_advanced_xas_settings(
low=self.scan_parameter.start,
high=self.scan_parameter.stop,
scan_time=self.scan_parameter.scan_time,
p_kink=self.scan_parameter.p_kink,
e_kink=self.scan_parameter.e_kink,
low=start, high=stop, scan_time=scan_time, p_kink=p_kink, e_kink=e_kink
)
self.set_trig_settings(
enable_low=self.scan_parameter.xrd_enable_low, # enable_low=self.scan_parameter.trig_enable_low,
enable_high=self.scan_parameter.xrd_enable_high, # enable_high=self.scan_parameter.trig_enable_high,
exp_time_low=self.scan_parameter.exp_time_low,
exp_time_high=self.scan_parameter.exp_time_high,
cycle_low=self.scan_parameter.cycle_low,
cycle_high=self.scan_parameter.cycle_high,
enable_low=xrd_enable_low,
enable_high=xrd_enable_high,
exp_time_low=exp_time_low,
exp_time_high=exp_time_high,
cycle_low=cycle_low,
cycle_high=cycle_high,
)
self.set_scan_control_settings(
mode=ScanControlMode.ADVANCED, scan_duration=self.scan_parameter.scan_duration
mode=ScanControlMode.ADVANCED, scan_duration=scan_duration
)
else:
return
@@ -215,7 +260,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.SUCCESS)
self.cancel_on_stop(status)
self.scan_control.scan_load.put(1)
# Wait for params to be checked from controller
# Wait for params to be checked from controller
status.wait(self.timeout_for_pvwait)
return None
@@ -284,6 +329,33 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
self.stopped = True # Needs to be set to stop motion
######### Utility Methods #########
def _check_if_scan_name_is_valid(self, scan_parameters: ScanServerScanInfo) -> bool:
"""Check if the scan is within the list of scans supported by the backend."""
return scan_parameters.scan_name in self.valid_scan_names
def _scan_parameter(self, *names: str):
"""Fetch a scan parameter from v4 metadata, with legacy fallbacks."""
if self.scan_parameters is None:
return None
for name in names:
if name in self.scan_parameters.additional_scan_parameters:
return self.scan_parameters.additional_scan_parameters[name]
return None
def _get_start_stop(self):
"""Return scan start/stop from v4 positions or legacy request inputs."""
if self.scan_parameters is not None and self.scan_parameters.positions is not None:
if len(self.scan_parameters.positions) == 2:
return self.scan_parameters.positions
return self._scan_parameter("start"), self._scan_parameter("stop")
def _raise_for_missing(self, scan_name: str, names: list[str], values: list) -> None:
if any(value is None for value in values):
raise Mo1BraggError(
f"Missing scan parameters for {scan_name}. Required parameters: "
f"{', '.join(names)} in scan info {self.scan_parameters}"
)
def _progress_update(self, value, **kwargs) -> None:
"""Callback method to update the scan progress, runs a callback
to SUB_PROGRESS subscribers, i.e. BEC.
@@ -313,7 +385,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
status_list.append(self.scan_settings.s_scan_scantime.set(scan_time))
self.cancel_on_stop(status_list[-1])
for s in status_list:
s.wait(timeout=self.timeout_for_pvwait)
@@ -330,25 +402,31 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
Returns:
output (float): Converted angle or energy
"""
self.calculator.calc_reset.put(0)
self.calculator.calc_reset.put(1)
status = CompareStatus(self.calculator.calc_done, 0)
self.cancel_on_stop(status)
status.wait(self.timeout_for_pvwait)
self.calculator.calc_reset.put(0)
if mode == "AngleToEnergy":
self.calculator.calc_angle.put(inp)
in_signal = self.calculator.calc_angle
out_signal = self.calculator.calc_energy
elif mode == "EnergyToAngle":
self.calculator.calc_energy.put(inp)
in_signal = self.calculator.calc_energy
out_signal = self.calculator.calc_angle
else:
raise Mo1BraggError(f'Unknown mode {mode}')
in_signal.put(inp)
status = CompareStatus(self.calculator.calc_done, 1)
self.cancel_on_stop(status)
status.wait(self.timeout_for_pvwait)
time.sleep(0.25) #TODO needed still? Needed due to update frequency of softIOC
if mode == "AngleToEnergy":
return self.calculator.calc_energy.get()
elif mode == "EnergyToAngle":
return self.calculator.calc_angle.get()
status = CompareStatus(out_signal, 0, operation_success='>')
self.cancel_on_stop(status)
status.wait(self.timeout_for_pvwait)
return out_signal.get()
def set_advanced_xas_settings(
self, low: float, high: float, scan_time: float, p_kink: float, e_kink: float
@@ -434,7 +512,6 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
for s in status_list:
s.wait(timeout=self.timeout_for_pvwait)
def set_scan_control_settings(self, mode: ScanControlMode, scan_duration: float) -> None:
"""Set the scan control settings for the upcoming scan.
@@ -454,13 +531,3 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
for s in status_list:
s.wait(timeout=self.timeout_for_pvwait)
def _update_scan_parameter(self):
"""Get the scan_info parameters for the scan."""
for key, value in self.scan_info.msg.request_inputs["inputs"].items():
if hasattr(self.scan_parameter, key):
setattr(self.scan_parameter, key, value)
for key, value in self.scan_info.msg.request_inputs["kwargs"].items():
if hasattr(self.scan_parameter, key):
setattr(self.scan_parameter, key, value)
@@ -165,7 +165,7 @@ class Mo1TriggerSettings(Device):
class Mo1BraggCalculator(Device):
"""Mo1 Bragg PVs to convert angle to energy or vice-versa."""
calc_reset = Cpt(EpicsSignal, suffix="calc_reset", kind="config", put_complete=True)
calc_reset = Cpt(EpicsSignalWithRBV, suffix="calc_reset", kind="config", put_complete=True)
calc_done = Cpt(EpicsSignalRO, suffix="calc_done_RBV", kind="config")
calc_energy = Cpt(EpicsSignalWithRBV, suffix="calc_energy", kind="config")
calc_angle = Cpt(EpicsSignalWithRBV, suffix="calc_angle", kind="config")
+86 -41
View File
@@ -1,9 +1,10 @@
from __future__ import annotations
import time
from typing import TYPE_CHECKING, Literal, cast
from typing import TYPE_CHECKING, Literal
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO, Kind, StatusBase
from ophyd.status import SubscriptionStatus, WaitTimeoutError
@@ -19,6 +20,7 @@ from superxas_bec.devices.nidaq.nidaq_enums import (
ScanRates,
ScanType,
)
from superxas_bec.devices.utils.utils import fetch_scan_info
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
@@ -35,18 +37,12 @@ class NidaqControl(Device):
energy = Cpt(SetableSignal, value=0, kind=Kind.normal)
smpl_abs = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream sample absorption"
)
smpl_abs = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream sample absorption")
smpl_fluo = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream sample fluorescence"
)
ref_abs = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream reference absorption"
)
cisum = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter sum"
)
ref_abs = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream reference absorption")
cisum = Cpt(SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream counter sum")
ai0_mean = Cpt(
SetableSignal, value=0, kind=Kind.normal, doc="NIDAQ stream analog input 0, MEAN"
@@ -362,7 +358,9 @@ class NidaqControl(Device):
### Control PVs ###
enable_compression = Cpt(EpicsSignal, suffix="NIDAQ-EnableRLE", kind=Kind.config, auto_monitor=True)
enable_compression = Cpt(
EpicsSignal, suffix="NIDAQ-EnableRLE", kind=Kind.config, auto_monitor=True
)
# enable_dead_time_correction = Cpt(EpicsSignal, suffix="NIDAQ-EnableDTC", kind=Kind.config, auto_monitor=True)
kickoff_call = Cpt(EpicsSignal, suffix="NIDAQ-Kickoff", kind=Kind.config)
stage_call = Cpt(EpicsSignal, suffix="NIDAQ-Stage", kind=Kind.config)
@@ -371,13 +369,29 @@ class NidaqControl(Device):
compression_ratio = Cpt(EpicsSignalRO, suffix="NIDAQ-CompressionRatio", kind=Kind.config)
scan_type = Cpt(EpicsSignal, suffix="NIDAQ-ScanType", kind=Kind.config)
scan_type_string = Cpt(EpicsSignal, suffix="NIDAQ-ScanType", kind=Kind.config, string=True)
sampling_rate = Cpt(EpicsSignal, suffix="NIDAQ-SamplingRateRequested", kind=Kind.config, auto_monitor=True)
sampling_rate_string = Cpt(EpicsSignal, suffix="NIDAQ-SamplingRateRequested", kind=Kind.config, string=True, auto_monitor=True)
sampling_rate = Cpt(
EpicsSignal, suffix="NIDAQ-SamplingRateRequested", kind=Kind.config, auto_monitor=True
)
sampling_rate_string = Cpt(
EpicsSignal,
suffix="NIDAQ-SamplingRateRequested",
kind=Kind.config,
string=True,
auto_monitor=True,
)
scan_duration = Cpt(EpicsSignal, suffix="NIDAQ-SamplingDuration", kind=Kind.config)
readout_range = Cpt(EpicsSignal, suffix="NIDAQ-ReadoutRange", kind=Kind.config, auto_monitor=True)
readout_range_string = Cpt(EpicsSignal, suffix="NIDAQ-ReadoutRange", kind=Kind.config, string=True, auto_monitor=True)
encoder_factor = Cpt(EpicsSignal, suffix="NIDAQ-EncoderFactor", kind=Kind.config, auto_monitor=True)
encoder_factor_string = Cpt(EpicsSignal, suffix="NIDAQ-EncoderFactor", kind=Kind.config, string=True, auto_monitor=True)
readout_range = Cpt(
EpicsSignal, suffix="NIDAQ-ReadoutRange", kind=Kind.config, auto_monitor=True
)
readout_range_string = Cpt(
EpicsSignal, suffix="NIDAQ-ReadoutRange", kind=Kind.config, string=True, auto_monitor=True
)
encoder_factor = Cpt(
EpicsSignal, suffix="NIDAQ-EncoderFactor", kind=Kind.config, auto_monitor=True
)
encoder_factor_string = Cpt(
EpicsSignal, suffix="NIDAQ-EncoderFactor", kind=Kind.config, string=True, auto_monitor=True
)
stop_call = Cpt(EpicsSignal, suffix="NIDAQ-Stop", kind=Kind.config)
power = Cpt(EpicsSignal, suffix="NIDAQ-Power", kind=Kind.config)
heartbeat = Cpt(EpicsSignal, suffix="NIDAQ-Heartbeat", kind=Kind.config, auto_monitor=True)
@@ -392,22 +406,38 @@ class NidaqControl(Device):
ref_abs_ln = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_ln", kind=Kind.config, auto_monitor=True)
smpl_abs_no = Cpt(EpicsSignal, suffix="NIDAQ-smpl_abs_no", kind=Kind.config, auto_monitor=True)
smpl_abs_no_string = Cpt(EpicsSignal, suffix="NIDAQ-smpl_abs_no", kind=Kind.config, string=True, auto_monitor=True)
smpl_abs_no_string = Cpt(
EpicsSignal, suffix="NIDAQ-smpl_abs_no", kind=Kind.config, string=True, auto_monitor=True
)
smpl_abs_de = Cpt(EpicsSignal, suffix="NIDAQ-smpl_abs_de", kind=Kind.config, auto_monitor=True)
smpl_abs_de_string = Cpt(EpicsSignal, suffix="NIDAQ-smpl_abs_de", kind=Kind.config, string=True, auto_monitor=True)
smpl_abs_de_string = Cpt(
EpicsSignal, suffix="NIDAQ-smpl_abs_de", kind=Kind.config, string=True, auto_monitor=True
)
smpl_fluo_no = Cpt(EpicsSignal, suffix="NIDAQ-smpl_fluo_no", kind=Kind.config, auto_monitor=True)
smpl_fluo_no_string = Cpt(EpicsSignal, suffix="NIDAQ-smpl_fluo_no", kind=Kind.config, string=True, auto_monitor=True)
smpl_fluo_no = Cpt(
EpicsSignal, suffix="NIDAQ-smpl_fluo_no", kind=Kind.config, auto_monitor=True
)
smpl_fluo_no_string = Cpt(
EpicsSignal, suffix="NIDAQ-smpl_fluo_no", kind=Kind.config, string=True, auto_monitor=True
)
smpl_fluo_de = Cpt(EpicsSignal, suffix="NIDAQ-smpl_fluo_de", kind=Kind.config, auto_monitor=True)
smpl_fluo_de_string = Cpt(EpicsSignal, suffix="NIDAQ-smpl_fluo_de", kind=Kind.config, string=True, auto_monitor=True)
smpl_fluo_de = Cpt(
EpicsSignal, suffix="NIDAQ-smpl_fluo_de", kind=Kind.config, auto_monitor=True
)
smpl_fluo_de_string = Cpt(
EpicsSignal, suffix="NIDAQ-smpl_fluo_de", kind=Kind.config, string=True, auto_monitor=True
)
ref_abs_no = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_no", kind=Kind.config, auto_monitor=True)
ref_abs_no_string = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_no", kind=Kind.config, string=True, auto_monitor=True)
ref_abs_no_string = Cpt(
EpicsSignal, suffix="NIDAQ-ref_abs_no", kind=Kind.config, string=True, auto_monitor=True
)
ref_abs_de = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_de", kind=Kind.config, auto_monitor=True)
ref_abs_de_string = Cpt(EpicsSignal, suffix="NIDAQ-ref_abs_de", kind=Kind.config, string=True, auto_monitor=True)
ref_abs_de_string = Cpt(
EpicsSignal, suffix="NIDAQ-ref_abs_de", kind=Kind.config, string=True, auto_monitor=True
)
class Nidaq(PSIDeviceBase, NidaqControl):
@@ -425,14 +455,16 @@ class Nidaq(PSIDeviceBase, NidaqControl):
def __init__(self, prefix: str = "", *, name: str, scan_info: ScanInfo = None, **kwargs):
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self.scan_info: ScanInfo
self.scan_parameters: ScanServerScanInfo | None = None
self.timeout_wait_for_signal = 5 # put 5s firsts
self._timeout_wait_for_pv = 5 # 5s timeout for pv calls. editted due to timeout issues persisting
self._timeout_wait_for_pv = (
5 # 5s timeout for pv calls. editted due to timeout issues persisting
)
self.valid_scan_names = [
"xas_simple_scan",
"xas_simple_scan_with_xrd",
"xas_simple_scan_with_xrd", # prepared for future, currently not yet available
"xas_advanced_scan",
"xas_advanced_scan_with_xrd",
"xas_advanced_scan_with_xrd", # prepared for future, currently not yet available
"nidaq_continuous_scan",
]
@@ -440,9 +472,11 @@ class Nidaq(PSIDeviceBase, NidaqControl):
# Beamline Methods #
########################################
def _check_if_scan_name_is_valid(self) -> bool:
def _check_if_scan_name_is_valid(self, scan_parameters: ScanServerScanInfo | None) -> bool:
"""Check if the scan is within the list of scans for which the backend is working"""
scan_name = self.scan_info.msg.scan_name
if scan_parameters is None:
return False
scan_name = scan_parameters.scan_name
if scan_name in self.valid_scan_names:
return True
return False
@@ -600,7 +634,8 @@ class Nidaq(PSIDeviceBase, NidaqControl):
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
If the upcoming scan is not in the list of valid scans, return immediately.
"""
if not self._check_if_scan_name_is_valid():
self.scan_parameters = fetch_scan_info(self.scan_info)
if not self._check_if_scan_name_is_valid(self.scan_parameters):
return None
if self.state.get() != NidaqState.STANDBY:
@@ -610,16 +645,16 @@ class Nidaq(PSIDeviceBase, NidaqControl):
status.wait(timeout=self.timeout_wait_for_signal)
# If scan is not part of the valid_scan_names,
if self.scan_info.msg.scan_name != "nidaq_continuous_scan":
if self.scan_parameters.scan_name != "nidaq_continuous_scan":
self.scan_type.set(ScanType.TRIGGERED).wait(timeout=self._timeout_wait_for_pv)
self.scan_duration.set(0).wait(timeout=self._timeout_wait_for_pv)
self.enable_compression.set(1).wait(timeout=self._timeout_wait_for_pv)
else:
self.scan_type.set(ScanType.CONTINUOUS).wait(timeout=self._timeout_wait_for_pv)
self.scan_duration.set(self.scan_info.msg.scan_parameters["scan_duration"]).wait(
self.scan_duration.set(self._scan_parameter("scan_duration")).wait(
timeout=self._timeout_wait_for_pv
)
self.enable_compression.set(self.scan_info.msg.scan_parameters["compression"]).wait(
self.enable_compression.set(self._scan_parameter("compression")).wait(
timeout=self._timeout_wait_for_pv
)
@@ -632,7 +667,7 @@ class Nidaq(PSIDeviceBase, NidaqControl):
# self.stage_call.set(1).wait(timeout=self._timeout_wait_for_pv)
self.stage_call.put(1)
status.wait(timeout=self.timeout_wait_for_signal)
if self.scan_info.msg.scan_name != "nidaq_continuous_scan":
if self.scan_parameters.scan_name != "nidaq_continuous_scan":
status = self.on_kickoff()
self.cancel_on_stop(status)
status.wait(timeout=self._timeout_wait_for_pv)
@@ -663,10 +698,10 @@ class Nidaq(PSIDeviceBase, NidaqControl):
before the motor starts its oscillation. This is needed for being properly homed.
The NIDAQ should go into Acquiring mode.
"""
if not self._check_if_scan_name_is_valid():
if not self._check_if_scan_name_is_valid(self.scan_parameters):
return None
if self.scan_info.msg.scan_name == "nidaq_continuous_scan":
if self.scan_parameters.scan_name == "nidaq_continuous_scan":
logger.info(f"Device {self.name} ready to be kicked off for nidaq_continuous_scan")
return None
@@ -687,15 +722,23 @@ class Nidaq(PSIDeviceBase, NidaqControl):
For the NIDAQ we use this method to stop the backend since it
would not stop by itself in its current implementation since the number of points are not predefined.
"""
if not self._check_if_scan_name_is_valid():
if not self._check_if_scan_name_is_valid(self.scan_parameters):
return None
status = CompareStatus(self.state, NidaqState.STANDBY)
self.cancel_on_stop(status)
if self.scan_info.msg.scan_name != "nidaq_continuous_scan":
if self.scan_parameters.scan_name != "nidaq_continuous_scan":
self.on_stop()
return status
def _scan_parameter(self, name: str):
"""Fetch a scan parameter from v4 metadata, with legacy fallbacks."""
if self.scan_parameters is None:
return None
if name in self.scan_parameters.additional_scan_parameters:
return self.scan_parameters.additional_scan_parameters[name]
return None
def _progress_update(self, value, **kwargs) -> None:
"""Callback method to update the scan progress, runs a callback
to SUB_PROGRESS subscribers, i.e. BEC.
@@ -703,7 +746,9 @@ class Nidaq(PSIDeviceBase, NidaqControl):
Args:
value (int) : current progress value
"""
scan_duration = self.scan_info.msg.scan_parameters.get("scan_duration", None)
if self.scan_parameters is None:
return
scan_duration = self._scan_parameter("scan_duration")
if not isinstance(scan_duration, (int, float)):
return
value = scan_duration - value
+2
View File
@@ -0,0 +1,2 @@
"""Utility helpers for SuperXAS devices."""
+26
View File
@@ -0,0 +1,26 @@
"""Utility functions for SuperXAS devices."""
from copy import deepcopy
import numpy as np
from bec_lib.devicemanager import ScanInfo
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from pydantic import ValidationError
def fetch_scan_info(scan_info: ScanInfo) -> ScanServerScanInfo:
"""Normalize BEC scan info into the v4 scan info model."""
info = deepcopy(scan_info.msg.info)
if isinstance(info.get("positions"), list):
info["positions"] = np.array(info["positions"])
try:
msg = ScanServerScanInfo.model_validate(info)
except ValidationError:
if info.get("scan_type") == "fly":
info["scan_type"] = "hardware_triggered"
else:
info["scan_type"] = "software_triggered"
msg = ScanServerScanInfo.model_validate(info)
return msg
+142 -279
View File
@@ -1,338 +1,201 @@
"""This module contains the scan classes for the mono bragg motor of the SuperXAS beamline."""
"""
V4 implementation of the SuperXAS mono bragg XAS scans.
Scan procedure:
- prepare_scan
- open_scan
- stage
- pre_scan
- scan_core
- at_each_point (optionally called by scan_core)
- post_scan
- unstage
- close_scan
- on_exception (called if any exception is raised during the scan)
"""
from __future__ import annotations
import time
from typing import Literal
from typing import Annotated
import numpy as np
from bec_lib.device import DeviceBase
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans import AsyncFlyScanBase
logger = bec_logger.logger
from bec_lib.scan_args import ScanArgument, Units
from bec_server.scan_server.scans.scan_base import ScanBase, ScanType
from bec_server.scan_server.scans.scan_modifier import scan_hook
class XASSimpleScan(AsyncFlyScanBase):
"""Class for the XAS simple scan"""
class XASSimpleScan(ScanBase):
"""Simple oscillating XAS scan on the SuperXAS mono bragg motor."""
scan_type = ScanType.HARDWARE_TRIGGERED
scan_name = "xas_simple_scan"
scan_type = "fly"
scan_report_hint = "device_progress"
required_kwargs = []
use_scan_progress_report = False
pre_move = False
gui_config = {
"Movement Parameters": ["start", "stop"],
"Scan Parameters": ["scan_time", "scan_duration"],
"Scan Parameters": ["scan_time", "scan_duration", "monitored_readout_cycle"],
}
def __init__(
self,
start: float,
stop: float,
scan_time: float,
scan_duration: float,
motor: DeviceBase = "mo1_bragg",
# fmt: off
start: Annotated[float, ScanArgument(display_name="Start Energy", description="Start energy.", units=Units.eV, ge=4500, le=64000)],
stop: Annotated[float, ScanArgument(display_name="Stop Energy", description="Stop energy.", units=Units.eV, ge=4500, le=64000)],
scan_time: Annotated[float, ScanArgument(display_name="Scan Time", description="Time for one scan cycle.", units=Units.s, ge=0.05)],
scan_duration: Annotated[float, ScanArgument(display_name="Scan Duration", description="Total scan duration.", units=Units.s, ge=0.05)],
motor: Annotated[DeviceBase | None, ScanArgument(display_name="Motor", description="Bragg motor device.")] = None,
daq: Annotated[DeviceBase | None, ScanArgument(display_name="DAQ", description="NIDAQ device.")] = None,
gonio: Annotated[DeviceBase | None, ScanArgument(display_name="Gonio", description="Goniometer device.")] = None,
monitored_readout_cycle: Annotated[float, ScanArgument(display_name="Monitored Readout Cycle", description="Delay between monitored readouts.", units=Units.s, gt=0)] = 1,
# fmt: on
**kwargs,
):
"""The xas_simple_scan is used to start a simple oscillating scan on the mono bragg motor.
Start and Stop define the energy range for the scan, scan_time is the time for one scan
cycle and scan_duration is the duration of the scan. If scan duration is set to 0, the
scan will run infinitely.
"""
Start a simple oscillating scan on the mono bragg motor.
Args:
start (float): Start energy for the scan.
stop (float): Stop energy for the scan.
start (float): Start energy.
stop (float): Stop energy.
scan_time (float): Time for one scan cycle.
scan_duration (float): Duration of the scan.
motor (DeviceBase, optional): Motor device to be used for the scan.
Defaults to "mo1_bragg".
Examples:
>>> scans.xas_simple_scan(start=8000, stop=9000, scan_time=1, scan_duration=10)
scan_duration (float): Total scan duration.
motor (DeviceBase | None): Bragg motor device.
daq (DeviceBase | None): NIDAQ device.
gonio (DeviceBase | None): Goniometer device.
monitored_readout_cycle (float): Delay between monitored readouts.
Returns:
ScanReport
"""
super().__init__(**kwargs)
self.motor = motor
self._baseline_readout_status = None
self.start = start
self.stop = stop
self.scan_time = scan_time
self.scan_duration = scan_duration
self.primary_readout_cycle = 1
def stage(self):
"""call the stage procedure"""
# Compute position for mo1_gonio pre move
# Since energy is not linear to angle, we have to calculate the angles first.
pos_start = yield from self.stubs.send_rpc_and_wait(
"mo1_bragg",
"convert_angle_energy",
mode = "EnergyToAngle",
inp = self.start,
)
pos_end = yield from self.stubs.send_rpc_and_wait(
"mo1_bragg",
"convert_angle_energy",
mode = "EnergyToAngle",
inp = self.stop,
)
# Goniometer position is in the middle of the start and stop angle of the scan
pos = (pos_start + pos_end) / 2
# Premove with mo1_gonio
yield from self.stubs.send_rpc_and_wait(
"mo1_gonio",
"move",
position = pos,
wait = True,
timeout = 30, # 30 seconds timeout
)
# Continue with staging the devices
yield from self.stubs.stage()
def update_readout_priority(self):
"""Ensure that NIDAQ is not monitored for any quick EXAFS."""
super().update_readout_priority()
self.readout_priority["async"].append("nidaq")
def prepare_positions(self):
"""Prepare the positions for the scan.
Use here only start and end energy defining the range for the scan.
"""
self.mo1_bragg = self._resolve_device(motor, "mo1_bragg")
self.motor = self.mo1_bragg
self.daq = self._resolve_device(daq, "nidaq")
self.mo1_gonio = self._resolve_device(gonio, "mo1_rotx")
self.monitored_readout_cycle = monitored_readout_cycle
self.positions = np.array([self.start, self.stop], dtype=float)
self.num_pos = None
yield None
self.update_scan_info(
positions=self.positions,
scan_time=scan_time,
scan_duration=scan_duration,
monitored_readout_cycle=monitored_readout_cycle,
)
self.actions.set_device_readout_priority([self.daq], priority="async")
def _resolve_device(self, device: DeviceBase | str | None, default_name: str) -> DeviceBase:
"""Resolve optional device arguments against the v4 device container."""
if device is None:
return self.dev[default_name]
if isinstance(device, str):
return self.dev[device]
return device
@scan_hook
def prepare_scan(self):
"""Prepare scan metadata and baseline readout."""
self.actions.add_scan_report_instruction_device_progress(self.motor)
self._baseline_readout_status = self.actions.read_baseline_devices(wait=False)
@scan_hook
def open_scan(self):
"""Open the scan."""
self.actions.open_scan()
@scan_hook
def stage(self):
"""Center the goniometer for the requested energy range and stage devices."""
pos_start = self.mo1_bragg.convert_angle_energy(mode="EnergyToAngle", inp=self.start)
pos_end = self.mo1_bragg.convert_angle_energy(mode="EnergyToAngle", inp=self.stop)
gonio_position = (pos_start + pos_end) / 2
self.actions.set(self.mo1_gonio, gonio_position, wait=False).wait(timeout=30)
self.actions.stage_all_devices()
@scan_hook
def pre_scan(self):
"""Pre Scan action."""
self._check_limits()
# Ensure parent class pre_scan actions to be called.
yield from super().pre_scan()
def scan_report_instructions(self):
"""
Return the instructions for the scan report.
"""
yield from self.stubs.scan_report_instruction({"device_progress": [self.motor]})
"""Execute pre-scan hooks on all devices."""
self.actions.pre_scan_all_devices()
@scan_hook
def scan_core(self):
"""Run the scan core.
Kickoff the oscillation on the Bragg motor and wait for the completion of the motion.
"""
# Start the oscillation on the Bragg motor.
yield from self.stubs.kickoff(device=self.motor)
complete_status = yield from self.stubs.complete(device=self.motor, wait=False)
"""Kick off the mono bragg oscillation and read monitored devices."""
self.actions.kickoff(self.motor)
completion_status = self.actions.complete(self.motor, wait=False)
while not complete_status.done:
# Readout monitored devices
yield from self.stubs.read(group="monitored", point_id=self.point_id)
time.sleep(self.primary_readout_cycle)
self.point_id += 1
while not completion_status.done:
self.at_each_point()
self.num_pos = self.point_id
@scan_hook
def at_each_point(self):
"""Read monitored devices during the hardware-triggered scan."""
self.actions.read_monitored_devices()
time.sleep(self.monitored_readout_cycle)
@scan_hook
def post_scan(self):
"""Complete all devices after the scan core."""
self.actions.complete_all_devices()
# class XASSimpleScanWithXRD(XASSimpleScan):
# """Class for the XAS simple scan with XRD"""
@scan_hook
def unstage(self):
"""Unstage all devices."""
self.actions.unstage_all_devices()
# scan_name = "xas_simple_scan_with_xrd"
# gui_config = {
# "Movement Parameters": ["start", "stop"],
# "Scan Parameters": ["scan_time", "scan_duration"],
# "Low Energy Range": ["xrd_enable_low", "num_trigger_low", "exp_time_low", "cycle_low"],
# "High Energy Range": ["xrd_enable_high", "num_trigger_high", "exp_time_high", "cycle_high"],
# }
@scan_hook
def close_scan(self):
"""Close the scan after the baseline readout has completed."""
if self._baseline_readout_status is not None:
self._baseline_readout_status.wait()
self.actions.close_scan()
self.actions.check_for_unchecked_statuses()
# def __init__(
# self,
# start: float,
# stop: float,
# scan_time: float,
# scan_duration: float,
# xrd_enable_low: bool,
# num_trigger_low: int,
# exp_time_low: float,
# cycle_low: int,
# xrd_enable_high: bool,
# num_trigger_high: int,
# exp_time_high: float,
# cycle_high: float,
# motor: DeviceBase = "mo1_bragg",
# **kwargs,
# ):
# """The xas_simple_scan_with_xrd is an oscillation motion on the mono motor
# with XRD triggering at low and high energy ranges.
# If scan duration is set to 0, the scan will run infinitely.
# Args:
# start (float): Start energy for the scan.
# stop (float): Stop energy for the scan.
# scan_time (float): Time for one oscillation .
# scan_duration (float): Total duration of the scan.
# xrd_enable_low (bool): Enable XRD triggering for the low energy range.
# num_trigger_low (int): Number of triggers for the low energy range.
# exp_time_low (float): Exposure time for the low energy range.
# cycle_low (int): Specify how often the triggers should be considered,
# every nth cycle for low
# xrd_enable_high (bool): Enable XRD triggering for the high energy range.
# num_trigger_high (int): Number of triggers for the high energy range.
# exp_time_high (float): Exposure time for the high energy range.
# cycle_high (int): Specify how often the triggers should be considered,
# every nth cycle for high
# motor (DeviceBase, optional): Motor device to be used for the scan.
# Defaults to "mo1_bragg".
# Examples:
# >>> scans.xas_simple_scan_with_xrd(start=8000, stop=9000, scan_time=1, scan_duration=10, xrd_enable_low=True, num_trigger_low=5, cycle_low=2, exp_time_low=100, xrd_enable_high=False, num_trigger_high=3, cycle_high=1, exp_time_high=1000)
# """
# super().__init__(
# start=start,
# stop=stop,
# scan_time=scan_time,
# scan_duration=scan_duration,
# motor=motor,
# **kwargs,
# )
# self.xrd_enable_low = xrd_enable_low
# self.num_trigger_low = num_trigger_low
# self.exp_time_low = exp_time_low
# self.cycle_low = cycle_low
# self.xrd_enable_high = xrd_enable_high
# self.num_trigger_high = num_trigger_high
# self.exp_time_high = exp_time_high
# self.cycle_high = cycle_high
@scan_hook
def on_exception(self, exception: Exception):
"""Try to stop outstanding device work if the scan fails."""
self.actions.complete_all_devices(wait=False)
class XASAdvancedScan(XASSimpleScan):
"""Class for the XAS advanced scan"""
"""Advanced oscillating XAS scan with spline parameters."""
scan_name = "xas_advanced_scan"
gui_config = {
"Movement Parameters": ["start", "stop"],
"Scan Parameters": ["scan_time", "scan_duration"],
"Scan Parameters": ["scan_time", "scan_duration", "monitored_readout_cycle"],
"Spline Parameters": ["p_kink", "e_kink"],
}
def __init__(
self,
start: float,
stop: float,
scan_time: float,
scan_duration: float,
p_kink: float,
e_kink: float,
motor: DeviceBase = "mo1_bragg",
# fmt: off
start: Annotated[float, ScanArgument(display_name="Start Energy", description="Start energy.", units=Units.eV)],
stop: Annotated[float, ScanArgument(display_name="Stop Energy", description="Stop energy.", units=Units.eV)],
scan_time: Annotated[float, ScanArgument(display_name="Scan Time", description="Time for one scan cycle.", units=Units.s, ge=0)],
scan_duration: Annotated[float, ScanArgument(display_name="Scan Duration", description="Total scan duration.", units=Units.s, ge=0)],
p_kink: Annotated[float, ScanArgument(display_name="P Kink", description="Position of the kink.", ge=0)],
e_kink: Annotated[float, ScanArgument(display_name="E Kink", description="Energy of the kink.", units=Units.eV)],
motor: Annotated[DeviceBase | None, ScanArgument(display_name="Motor", description="Bragg motor device.")] = None,
daq: Annotated[DeviceBase | None, ScanArgument(display_name="DAQ", description="NIDAQ device.")] = None,
gonio: Annotated[DeviceBase | None, ScanArgument(display_name="Gonio", description="Goniometer device.")] = None,
monitored_readout_cycle: Annotated[float, ScanArgument(display_name="Monitored Readout Cycle", description="Delay between monitored readouts.", units=Units.s, gt=0)] = 1,
**kwargs,
# fmt: on
):
"""The xas_advanced_scan is an oscillation motion on the mono motor.
Start and Stop define the energy range for the scan, scan_time is the
time for one scan cycle and scan_duration is the duration of the scan.
If scan duration is set to 0, the scan will run infinitely.
p_kink and e_kink add a kink to the motion profile to slow down in the
exafs region of the scan.
Args:
start (float): Start angle for the scan.
stop (float): Stop angle for the scan.
scan_time (float): Time for one oscillation .
scan_duration (float): Total duration of the scan.
p_kink (float): Position of the kink.
e_kink (float): Energy of the kink.
motor (DeviceBase, optional): Motor device to be used for the scan.
Defaults to "mo1_bragg".
Examples:
>>> scans.xas_advanced_scan(start=10000, stop=12000, scan_time=0.5, scan_duration=10, p_kink=50, e_kink=10500)
"""
super().__init__(
start=start,
stop=stop,
scan_time=scan_time,
scan_duration=scan_duration,
motor=motor,
daq=daq,
gonio=gonio,
monitored_readout_cycle=monitored_readout_cycle,
**kwargs,
)
self.p_kink = p_kink
self.e_kink = e_kink
# class XASAdvancedScanWithXRD(XASAdvancedScan):
# """Class for the XAS advanced scan with XRD"""
# scan_name = "xas_advanced_scan_with_xrd"
# gui_config = {
# "Movement Parameters": ["start", "stop"],
# "Scan Parameters": ["scan_time", "scan_duration"],
# "Spline Parameters": ["p_kink", "e_kink"],
# "Low Energy Range": ["xrd_enable_low", "num_trigger_low", "exp_time_low", "cycle_low"],
# "High Energy Range": ["xrd_enable_high", "num_trigger_high", "exp_time_high", "cycle_high"],
# }
# def __init__(
# self,
# start: float,
# stop: float,
# scan_time: float,
# scan_duration: float,
# p_kink: float,
# e_kink: float,
# xrd_enable_low: bool,
# num_trigger_low: int,
# exp_time_low: float,
# cycle_low: int,
# xrd_enable_high: bool,
# num_trigger_high: int,
# exp_time_high: float,
# cycle_high: float,
# motor: DeviceBase = "mo1_bragg",
# **kwargs,
# ):
# """The xas_advanced_scan is an oscillation motion on the mono motor
# with XRD triggering at low and high energy ranges.
# Start and Stop define the energy range for the scan, scan_time is the time for
# one scan cycle and scan_duration is the duration of the scan. If scan duration
# is set to 0, the scan will run infinitely. p_kink and e_kink add a kink to the
# motion profile to slow down in the exafs region of the scan.
# Args:
# start (float): Start angle for the scan.
# stop (float): Stop angle for the scan.
# scan_time (float): Time for one oscillation .
# scan_duration (float): Total duration of the scan.
# p_kink (float): Position of kink.
# e_kink (float): Energy of the kink.
# xrd_enable_low (bool): Enable XRD triggering for the low energy range.
# num_trigger_low (int): Number of triggers for the low energy range.
# exp_time_low (float): Exposure time for the low energy range.
# cycle_low (int): Specify how often the triggers should be considered,
# every nth cycle for low
# xrd_enable_high (bool): Enable XRD triggering for the high energy range.
# num_trigger_high (int): Number of triggers for the high energy range.
# exp_time_high (float): Exposure time for the high energy range.
# cycle_high (int): Specify how often the triggers should be considered,
# every nth cycle for high
# motor (DeviceBase, optional): Motor device to be used for the scan.
# Defaults to "mo1_bragg".
# Examples:
# >>> scans.xas_advanced_scan_with_xrd(start=10000, stop=12000, scan_time=0.5, scan_duration=10, p_kink=50, e_kink=10500, xrd_enable_low=True, num_trigger_low=5, cycle_low=2, exp_time_low=100, xrd_enable_high=False, num_trigger_high=3, cycle_high=1, exp_time_high=1000)
# """
# super().__init__(
# start=start,
# stop=stop,
# scan_time=scan_time,
# scan_duration=scan_duration,
# p_kink=p_kink,
# e_kink=e_kink,
# motor=motor,
# **kwargs,
# )
# self.p_kink = p_kink
# self.e_kink = e_kink
# self.xrd_enable_low = xrd_enable_low
# self.num_trigger_low = num_trigger_low
# self.exp_time_low = exp_time_low
# self.cycle_low = cycle_low
# self.xrd_enable_high = xrd_enable_high
# self.num_trigger_high = num_trigger_high
# self.exp_time_high = exp_time_high
# self.cycle_high = cycle_high
self.update_scan_info(p_kink=p_kink, e_kink=e_kink)
+99 -57
View File
@@ -1,84 +1,126 @@
"""This module contains the scan class for the nidaq of the SuperXAS beamline for use in continuous mode."""
"""
V4 implementation of the SuperXAS NIDAQ continuous scan.
The NIDAQ continuous scan measures with the NIDAQ without moving the
monochromator or any other motor.
"""
from __future__ import annotations
import time
from typing import Literal
from typing import Annotated
import numpy as np
from bec_lib.device import DeviceBase
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans import AsyncFlyScanBase
logger = bec_logger.logger
from bec_lib.scan_args import ScanArgument, Units
from bec_server.scan_server.scans.scan_base import ScanBase, ScanType
from bec_server.scan_server.scans.scan_modifier import scan_hook
class NIDAQContinuousScan(AsyncFlyScanBase):
"""Class for the nidaq continuous scan (without mono)"""
class NIDAQContinuousScan(ScanBase):
"""Continuous NIDAQ acquisition without mono motion."""
scan_type = ScanType.HARDWARE_TRIGGERED
scan_name = "nidaq_continuous_scan"
scan_type = "fly"
scan_report_hint = "device_progress"
required_kwargs = []
use_scan_progress_report = False
pre_move = False
gui_config = {"Scan Parameters": ["scan_duration"], "Data Compression": ["compression"]}
gui_config = {"Scan Parameters": ["scan_duration", "daq", "compression"]}
def __init__(
self, scan_duration: float, daq: DeviceBase = "nidaq", compression: bool = False, **kwargs
self,
# fmt: off
scan_duration: Annotated[float, ScanArgument(display_name="Scan Duration", description="Duration of the scan", units=Units.s)],
daq: Annotated[DeviceBase | None, ScanArgument(display_name="DAQ", description="DAQ device to be used for the scan")] = None,
compression: Annotated[bool, ScanArgument(display_name="Compression", description="Whether to compress the data")] = False,
# fmt: on
**kwargs,
):
"""The NIDAQ continuous scan is used to measure with the NIDAQ without moving the
monochromator or any other motor. The NIDAQ thus runs in continuous mode, with a
set scan_duration.
"""
Start a continuous NIDAQ acquisition.
Args:
scan_duration (float): Duration of the scan.
daq (DeviceBase, optional): DAQ device to be used for the scan.
Defaults to "nidaq".
Examples:
>>> scans.nidaq_continuous_scan(scan_duration=10)
daq (DeviceBase | None): DAQ device to be used for the scan.
compression (bool): Whether to compress the data.
Returns:
ScanReport
"""
super().__init__(**kwargs)
self._baseline_readout_status = None
self.scan_duration = scan_duration
self.daq = daq
self.start_time = 0
self.primary_readout_cycle = 1
self.scan_parameters["scan_duration"] = scan_duration
self.scan_parameters["compression"] = compression
self.daq = self._resolve_device(daq, "nidaq")
self.compression = compression
self.monitored_readout_cycle = 1.0
self.motors = [self.daq]
def update_readout_priority(self):
"""Ensure that NIDAQ is not monitored for any quick EXAFS."""
super().update_readout_priority()
self.readout_priority["async"].append("nidaq")
self.update_scan_info(scan_duration=scan_duration, compression=compression)
self.actions.set_device_readout_priority([self.daq], priority="async")
def prepare_positions(self):
"""Prepare the positions for the scan."""
yield None
def _resolve_device(self, device: DeviceBase | str | None, default_name: str) -> DeviceBase:
"""Resolve optional device arguments against the v4 device container."""
if device is None:
return self.dev[default_name]
if isinstance(device, str):
return self.dev[device]
return device
@scan_hook
def prepare_scan(self):
"""Prepare scan metadata and baseline readout."""
self.actions.add_scan_report_instruction_device_progress(self.daq)
self._baseline_readout_status = self.actions.read_baseline_devices(wait=False)
@scan_hook
def open_scan(self):
"""Open the scan."""
self.actions.open_scan()
@scan_hook
def stage(self):
"""Stage all devices."""
self.actions.stage_all_devices()
@scan_hook
def pre_scan(self):
"""Pre Scan action."""
self.start_time = time.time()
# Ensure parent class pre_scan actions to be called.
yield from super().pre_scan()
def scan_report_instructions(self):
"""
Return the instructions for the scan report.
"""
yield from self.stubs.scan_report_instruction({"device_progress": [self.daq]})
"""Execute pre-scan hooks on all devices."""
self.actions.pre_scan_all_devices()
@scan_hook
def scan_core(self):
"""Run the scan core.
Kickoff the acquisition of the NIDAQ wait for the completion of the scan.
"""
kickoff_status = yield from self.stubs.kickoff(device=self.daq)
kickoff_status.wait(timeout=5) # wait for proper kickoff of device
"""Kick off the NIDAQ acquisition and read monitored devices."""
kickoff_status = self.actions.kickoff(device=self.daq, wait=False)
kickoff_status.wait(timeout=5)
complete_status = yield from self.stubs.complete(device=self.daq, wait=False)
complete_status = self.actions.complete(device=self.daq, wait=False)
while not complete_status.done:
# Readout monitored devices
yield from self.stubs.read(group="monitored", point_id=self.point_id)
time.sleep(self.primary_readout_cycle)
self.point_id += 1
self.at_each_point()
time.sleep(self.monitored_readout_cycle)
@scan_hook
def at_each_point(self):
"""Read monitored devices during the hardware-triggered scan."""
self.actions.read_monitored_devices()
@scan_hook
def post_scan(self):
"""Complete all devices after the scan core."""
self.actions.complete_all_devices()
@scan_hook
def unstage(self):
"""Unstage all devices."""
self.actions.unstage_all_devices()
@scan_hook
def close_scan(self):
"""Close the scan after the baseline readout has completed."""
if self._baseline_readout_status is not None:
self._baseline_readout_status.wait()
self.actions.close_scan()
self.actions.check_for_unchecked_statuses()
@scan_hook
def on_exception(self, exception: Exception):
"""Try to stop outstanding device work if the scan fails."""
self.actions.complete_all_devices(wait=False)
self.num_pos = self.point_id
@@ -0,0 +1,53 @@
# pylint: skip-file
from types import SimpleNamespace
import numpy as np
from superxas_bec.devices.utils.utils import fetch_scan_info
def test_fetch_scan_info_accepts_v4_scan_info_with_positions_list():
scan_info = SimpleNamespace(
msg=SimpleNamespace(
info={
"scan_name": "xas_simple_scan",
"scan_id": "scan-id-test",
"scan_type": "hardware_triggered",
"positions": [8000.0, 9000.0],
"additional_scan_parameters": {
"scan_time": 1.0,
"scan_duration": 10.0,
},
}
)
)
msg = fetch_scan_info(scan_info)
assert msg.scan_name == "xas_simple_scan"
assert msg.scan_type == "hardware_triggered"
np.testing.assert_array_equal(msg.positions, np.array([8000.0, 9000.0]))
assert msg.additional_scan_parameters["scan_duration"] == 10.0
def test_fetch_scan_info_converts_legacy_fly_scan_type():
scan_info = SimpleNamespace(
msg=SimpleNamespace(
info={
"scan_name": "xas_simple_scan",
"scan_id": "scan-id-test",
"scan_type": "fly",
"positions": [8000.0, 9000.0],
"request_inputs": {
"inputs": {},
"kwargs": {"scan_time": 1.0, "scan_duration": 10.0},
},
}
)
)
msg = fetch_scan_info(scan_info)
assert msg.scan_type == "hardware_triggered"
assert msg.request_inputs["kwargs"]["scan_time"] == 1.0
@@ -0,0 +1,100 @@
# pylint: skip-file
from unittest import mock
import numpy as np
import pytest
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from ophyd_devices.tests.utils import patched_device
from superxas_bec.devices.mo1_bragg.mo1_bragg import Mo1Bragg, ScanControlLoadMessage
@pytest.fixture(scope="function")
def mock_bragg():
with patched_device(
Mo1Bragg, name="mo1_bragg", prefix="X10DA-OP-MO1:BRAGG:"
) as dev:
yield dev
def _set_scan_info(dev, scan_info):
dev.scan_info.msg.info.update(scan_info.model_dump())
def _mock_status():
status = mock.MagicMock()
status.wait = mock.MagicMock()
return status
def test_mo1_bragg_stage_uses_v4_simple_scan_info(mock_bragg):
scan_info = ScanServerScanInfo(
scan_name="xas_simple_scan",
scan_id="scan-id-test",
scan_type="hardware_triggered",
positions=np.array([8000.0, 9000.0]),
additional_scan_parameters={"scan_time": 1.0, "scan_duration": 10.0},
)
_set_scan_info(mock_bragg, scan_info)
mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.PENDING
with (
mock.patch.object(mock_bragg, "set_xas_settings") as set_xas_settings,
mock.patch.object(mock_bragg, "set_trig_settings") as set_trig_settings,
mock.patch.object(mock_bragg, "set_scan_control_settings") as set_scan_control_settings,
mock.patch.object(mock_bragg, "cancel_on_stop"),
mock.patch(
"superxas_bec.devices.mo1_bragg.mo1_bragg.CompareStatus",
return_value=_mock_status(),
),
):
mock_bragg.on_stage()
set_xas_settings.assert_called_once_with(low=8000.0, high=9000.0, scan_time=1.0)
set_trig_settings.assert_called_once_with(
enable_low=False,
enable_high=False,
exp_time_low=0,
exp_time_high=0,
cycle_low=0,
cycle_high=0,
)
assert set_scan_control_settings.call_args.kwargs["scan_duration"] == 10.0
def test_mo1_bragg_stage_uses_v4_advanced_scan_info(mock_bragg):
scan_info = ScanServerScanInfo(
scan_name="xas_advanced_scan",
scan_id="scan-id-test",
scan_type="hardware_triggered",
positions=np.array([8000.0, 9000.0]),
additional_scan_parameters={
"scan_time": 1.0,
"scan_duration": 10.0,
"p_kink": 50.0,
"e_kink": 8500.0,
},
)
_set_scan_info(mock_bragg, scan_info)
mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.PENDING
with (
mock.patch.object(mock_bragg, "set_advanced_xas_settings") as set_advanced_xas_settings,
mock.patch.object(mock_bragg, "set_trig_settings"),
mock.patch.object(mock_bragg, "set_scan_control_settings") as set_scan_control_settings,
mock.patch.object(mock_bragg, "cancel_on_stop"),
mock.patch(
"superxas_bec.devices.mo1_bragg.mo1_bragg.CompareStatus",
return_value=_mock_status(),
),
):
mock_bragg.on_stage()
set_advanced_xas_settings.assert_called_once_with(
low=8000.0,
high=9000.0,
scan_time=1.0,
p_kink=50.0,
e_kink=8500.0,
)
assert set_scan_control_settings.call_args.kwargs["scan_duration"] == 10.0
@@ -0,0 +1,67 @@
# pylint: skip-file
from unittest import mock
import pytest
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from ophyd_devices.tests.utils import patched_device
from superxas_bec.devices.nidaq.nidaq import Nidaq, NidaqState, ScanType
from superxas_bec.devices.utils.utils import fetch_scan_info
@pytest.fixture(scope="function")
def mock_nidaq():
with patched_device(
Nidaq, name="nidaq", prefix="X10DA-CPCL-SCANSERVER:"
) as dev:
yield dev
def _set_scan_info(dev, scan_info):
dev.scan_info.msg.info.update(scan_info.model_dump())
dev.scan_parameters = fetch_scan_info(dev.scan_info)
def test_nidaq_check_scan_name_uses_normalized_scan_info(mock_nidaq):
valid = ScanServerScanInfo(scan_name="xas_simple_scan", scan_id="scan-id-test")
invalid = ScanServerScanInfo(scan_name="line_scan", scan_id="scan-id-test")
assert mock_nidaq._check_if_scan_name_is_valid(valid)
assert not mock_nidaq._check_if_scan_name_is_valid(invalid)
def test_nidaq_progress_update_uses_v4_additional_parameters(mock_nidaq):
scan_info = ScanServerScanInfo(
scan_name="nidaq_continuous_scan",
scan_id="scan-id-test",
additional_scan_parameters={"scan_duration": 10.0, "compression": False},
)
_set_scan_info(mock_nidaq, scan_info)
mock_nidaq.progress_signal.put = mock.MagicMock()
mock_nidaq._progress_update(4.0)
mock_nidaq.progress_signal.put.assert_called_once_with(value=6.0, max_value=10.0, done=False)
def test_nidaq_stage_uses_v4_continuous_scan_info(mock_nidaq):
scan_info = ScanServerScanInfo(
scan_name="nidaq_continuous_scan",
scan_id="scan-id-test",
additional_scan_parameters={"scan_duration": 10.0, "compression": False},
)
mock_nidaq.scan_info.msg.info.update(scan_info.model_dump())
mock_nidaq.state.put(NidaqState.STANDBY)
with (
mock.patch("superxas_bec.devices.nidaq.nidaq.CompareStatus") as status_cls,
mock.patch.object(mock_nidaq, "cancel_on_stop"),
mock.patch.object(mock_nidaq, "on_kickoff") as on_kickoff,
):
status_cls.return_value.wait = mock.MagicMock()
mock_nidaq.on_stage()
assert mock_nidaq.scan_type.get() == ScanType.CONTINUOUS
assert mock_nidaq.scan_duration.get() == 10.0
assert mock_nidaq.enable_compression.get() is False
on_kickoff.assert_not_called()
+7
View File
@@ -0,0 +1,7 @@
# pylint: skip-file
from bec_server.scan_server.tests.scan_fixtures import (
nth_done_status_mock,
readout_priority,
v4_scan_assembler,
)
@@ -0,0 +1,121 @@
# pylint: skip-file
from unittest import mock
import numpy as np
import pytest
from bec_server.scan_server.tests.scan_fixtures import *
from bec_server.scan_server.tests.scan_hook_tests import *
XAS_SIMPLE_SCAN_DEFAULT_HOOK_TESTS = [
("prepare_scan", [assert_prepare_scan_reads_baseline_devices]),
("open_scan", [assert_scan_open_called]),
("pre_scan", [assert_pre_scan_called]),
("unstage", [assert_unstage_all_devices_called]),
("close_scan", [assert_close_scan_waits_for_baseline_and_closes]),
]
def _assemble_xas_simple_scan(v4_scan_assembler, **overrides):
params = {
"start": 8000.0,
"stop": 9000.0,
"scan_time": 1.0,
"scan_duration": 10.0,
"motor": "mo1_bragg",
"daq": "nidaq",
"gonio": "mo1_gonio",
"monitored_readout_cycle": 1.0,
}
params.update(overrides)
return v4_scan_assembler("xas_simple_scan", **params)
@pytest.mark.parametrize(("hook_name", "hook_tests"), XAS_SIMPLE_SCAN_DEFAULT_HOOK_TESTS)
def test_xas_simple_scan_v4_default_hooks(
v4_scan_assembler, nth_done_status_mock, hook_name, hook_tests
):
scan = _assemble_xas_simple_scan(v4_scan_assembler)
run_scan_tests(scan, [(hook_name, hook_tests)], nth_done_status_mock=nth_done_status_mock)
def test_xas_simple_scan_v4_prepare_scan_updates_metadata(v4_scan_assembler):
scan = _assemble_xas_simple_scan(v4_scan_assembler)
scan.actions.add_scan_report_instruction_device_progress = mock.MagicMock()
baseline_status = mock.MagicMock()
scan.actions.read_baseline_devices = mock.MagicMock(return_value=baseline_status)
scan.prepare_scan()
scan.actions._build_scan_status_message("open")
np.testing.assert_array_equal(scan.scan_info.positions, np.array([8000.0, 9000.0]))
assert scan.scan_info.additional_scan_parameters["scan_time"] == 1.0
assert scan.scan_info.additional_scan_parameters["scan_duration"] == 10.0
assert scan.scan_info.additional_scan_parameters["monitored_readout_cycle"] == 1.0
assert scan.scan_info.readout_priority_modification["async"] == ["nidaq"]
scan.actions.add_scan_report_instruction_device_progress.assert_called_once_with(scan.motor)
scan.actions.read_baseline_devices.assert_called_once_with(wait=False)
assert scan._baseline_readout_status is baseline_status
def test_xas_simple_scan_v4_stage_centers_gonio_and_stages_devices(v4_scan_assembler):
scan = _assemble_xas_simple_scan(v4_scan_assembler)
scan.mo1_bragg.convert_angle_energy = mock.MagicMock(side_effect=[10.0, 14.0])
set_status = mock.MagicMock()
scan.actions.set = mock.MagicMock(return_value=set_status)
scan.actions.stage_all_devices = mock.MagicMock()
scan.stage()
assert scan.mo1_bragg.convert_angle_energy.call_args_list == [
mock.call(mode="EnergyToAngle", inp=8000.0),
mock.call(mode="EnergyToAngle", inp=9000.0),
]
scan.actions.set.assert_called_once_with(scan.mo1_gonio, 12.0, wait=False)
set_status.wait.assert_called_once_with(timeout=30)
scan.actions.stage_all_devices.assert_called_once_with()
def test_xas_simple_scan_v4_scan_core_reads_until_complete(v4_scan_assembler, nth_done_status_mock):
scan = _assemble_xas_simple_scan(v4_scan_assembler)
completion_status = nth_done_status_mock(resolve_after=3)
scan.actions.kickoff = mock.MagicMock()
scan.actions.complete = mock.MagicMock(return_value=completion_status)
scan.actions.read_monitored_devices = mock.MagicMock()
with mock.patch("superxas_bec.scans.mono_bragg_scans.time.sleep"):
scan.scan_core()
scan.actions.kickoff.assert_called_once_with(scan.motor)
scan.actions.complete.assert_called_once_with(scan.motor, wait=False)
assert scan.actions.read_monitored_devices.call_count == 2
def test_xas_simple_scan_v4_post_scan_completes_all_devices(v4_scan_assembler):
scan = _assemble_xas_simple_scan(v4_scan_assembler)
scan.actions.complete_all_devices = mock.MagicMock()
scan.post_scan()
scan.actions.complete_all_devices.assert_called_once_with()
def test_xas_advanced_scan_v4_updates_spline_metadata(v4_scan_assembler):
scan = v4_scan_assembler(
"xas_advanced_scan",
start=8000.0,
stop=9000.0,
scan_time=1.0,
scan_duration=10.0,
p_kink=50.0,
e_kink=8500.0,
motor="mo1_bragg",
daq="nidaq",
gonio="mo1_gonio",
)
assert scan.scan_name == "xas_advanced_scan"
assert scan.scan_info.additional_scan_parameters["p_kink"] == 50.0
assert scan.scan_info.additional_scan_parameters["e_kink"] == 8500.0
@@ -0,0 +1,76 @@
# pylint: skip-file
from unittest import mock
import pytest
from bec_server.scan_server.tests.scan_fixtures import *
from bec_server.scan_server.tests.scan_hook_tests import *
NIDAQ_CONTINUOUS_SCAN_DEFAULT_HOOK_TESTS = [
("prepare_scan", [assert_prepare_scan_reads_baseline_devices]),
("open_scan", [assert_scan_open_called]),
("stage", [assert_stage_all_devices_called]),
("pre_scan", [assert_pre_scan_called]),
("unstage", [assert_unstage_all_devices_called]),
("close_scan", [assert_close_scan_waits_for_baseline_and_closes]),
]
def _assemble_nidaq_continuous_scan(v4_scan_assembler, **overrides):
params = {"scan_duration": 10.0, "daq": "nidaq", "compression": False}
params.update(overrides)
return v4_scan_assembler("nidaq_continuous_scan", **params)
@pytest.mark.parametrize(("hook_name", "hook_tests"), NIDAQ_CONTINUOUS_SCAN_DEFAULT_HOOK_TESTS)
def test_nidaq_continuous_scan_v4_default_hooks(
v4_scan_assembler, nth_done_status_mock, hook_name, hook_tests
):
scan = _assemble_nidaq_continuous_scan(v4_scan_assembler)
run_scan_tests(scan, [(hook_name, hook_tests)], nth_done_status_mock=nth_done_status_mock)
def test_nidaq_continuous_scan_v4_prepare_scan_updates_metadata(v4_scan_assembler):
scan = _assemble_nidaq_continuous_scan(v4_scan_assembler)
scan.actions.add_scan_report_instruction_device_progress = mock.MagicMock()
baseline_status = mock.MagicMock()
scan.actions.read_baseline_devices = mock.MagicMock(return_value=baseline_status)
scan.prepare_scan()
scan.actions._build_scan_status_message("open")
assert scan.scan_info.additional_scan_parameters["scan_duration"] == 10.0
assert scan.scan_info.additional_scan_parameters["compression"] is False
assert scan.scan_info.readout_priority_modification["async"] == ["nidaq"]
scan.actions.add_scan_report_instruction_device_progress.assert_called_once_with(scan.daq)
scan.actions.read_baseline_devices.assert_called_once_with(wait=False)
assert scan._baseline_readout_status is baseline_status
def test_nidaq_continuous_scan_v4_scan_core_reads_until_complete(
v4_scan_assembler, nth_done_status_mock
):
scan = _assemble_nidaq_continuous_scan(v4_scan_assembler)
kickoff_status = mock.MagicMock()
completion_status = nth_done_status_mock(resolve_after=3)
scan.actions.kickoff = mock.MagicMock(return_value=kickoff_status)
scan.actions.complete = mock.MagicMock(return_value=completion_status)
scan.actions.read_monitored_devices = mock.MagicMock()
with mock.patch("superxas_bec.scans.nidaq_cont_scan.time.sleep"):
scan.scan_core()
scan.actions.kickoff.assert_called_once_with(device=scan.daq, wait=False)
kickoff_status.wait.assert_called_once_with(timeout=5)
scan.actions.complete.assert_called_once_with(device=scan.daq, wait=False)
assert scan.actions.read_monitored_devices.call_count == 2
def test_nidaq_continuous_scan_v4_post_scan_completes_all_devices(v4_scan_assembler):
scan = _assemble_nidaq_continuous_scan(v4_scan_assembler)
scan.actions.complete_all_devices = mock.MagicMock()
scan.post_scan()
scan.actions.complete_all_devices.assert_called_once_with()