refactor: migrate xas_scans to v4
This commit is contained in:
@@ -12,13 +12,13 @@ from typing import Any, Literal
|
||||
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import DeviceStatus, Signal, StatusBase
|
||||
from ophyd.status import SubscriptionStatus, WaitTimeoutError
|
||||
from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
from ophyd_devices.utils.errors import DeviceStopError
|
||||
from pydantic import BaseModel, Field
|
||||
from typeguard import typechecked
|
||||
|
||||
from superxas_bec.devices.mo1_bragg.mo1_bragg_devices import Mo1BraggPositioner
|
||||
@@ -33,6 +33,7 @@ from superxas_bec.devices.mo1_bragg.mo1_bragg_enums import (
|
||||
TriggerControlSource,
|
||||
)
|
||||
from superxas_bec.devices.mo1_bragg.mo1_bragg_utils import compute_spline
|
||||
from superxas_bec.devices.utils.utils import fetch_scan_info
|
||||
|
||||
# Initialise logger
|
||||
logger = bec_logger.logger
|
||||
@@ -44,34 +45,6 @@ class Mo1BraggError(Exception):
|
||||
"""Exception for the Mo1 Bragg positioner"""
|
||||
|
||||
|
||||
########## Scan Parameter Model ##########
|
||||
|
||||
|
||||
class ScanParameter(BaseModel):
|
||||
"""Dataclass to store the scan parameters for the Mo1 Bragg positioner.
|
||||
This needs to be in sync with the kwargs of the MO1 Bragg scans from SuperXAS, to
|
||||
ensure that the scan parameters are correctly set. Any changes in the scan kwargs,
|
||||
i.e. renaming or adding new parameters, need to be represented here as well."""
|
||||
|
||||
scan_time: float | None = Field(None, description="Scan time for a half oscillation")
|
||||
scan_duration: float | None = Field(None, description="Duration of the scan")
|
||||
xrd_enable_low: bool | None = Field(
|
||||
None, description="XRD enabled for low, should be PV trig_ena_lo_enum"
|
||||
) # trig_enable_low: bool = None
|
||||
xrd_enable_high: bool | None = Field(
|
||||
None, description="XRD enabled for high, should be PV trig_ena_hi_enum"
|
||||
) # trig_enable_high: bool = None
|
||||
exp_time_low: float | None = Field(None, description="Exposure time low energy/angle")
|
||||
exp_time_high: float | None = Field(None, description="Exposure time high energy/angle")
|
||||
cycle_low: int | None = Field(None, description="Cycle for low energy/angle")
|
||||
cycle_high: int | None = Field(None, description="Cycle for high energy/angle")
|
||||
start: float | None = Field(None, description="Start value for energy/angle")
|
||||
stop: float | None = Field(None, description="Stop value for energy/angle")
|
||||
p_kink: float | None = Field(None, description="P Kink")
|
||||
e_kink: float | None = Field(None, description="Energy Kink")
|
||||
model_config: dict = {"validate_assignment": True}
|
||||
|
||||
|
||||
########### Mo1 Bragg Motor Class ###########
|
||||
|
||||
|
||||
@@ -83,7 +56,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
||||
|
||||
progress_signal = Cpt(ProgressSignal, name="progress_signal")
|
||||
|
||||
USER_ACCESS = ["set_advanced_xas_settings", "set_xtal"]
|
||||
USER_ACCESS = ["set_advanced_xas_settings", "set_xtal", "convert_angle_energy"]
|
||||
|
||||
def __init__(self, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs): # type: ignore
|
||||
"""
|
||||
@@ -94,8 +67,15 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
||||
scan_info (ScanInfo): The scan info to use.
|
||||
"""
|
||||
super().__init__(name=name, scan_info=scan_info, prefix=prefix, **kwargs)
|
||||
self.scan_parameter = ScanParameter()
|
||||
self.scan_parameters: ScanServerScanInfo | None = None
|
||||
self.timeout_for_pvwait = 7.5
|
||||
self.valid_scan_names = [
|
||||
"xas_simple_scan",
|
||||
"xas_simple_scan_with_xrd",
|
||||
"xas_advanced_scan",
|
||||
"xas_advanced_scan_with_xrd",
|
||||
"nidaq_continuous_scan",
|
||||
]
|
||||
|
||||
########################################
|
||||
# Beamline Specific Implementations #
|
||||
@@ -122,6 +102,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
||||
|
||||
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
|
||||
"""
|
||||
self.scan_parameters = fetch_scan_info(self.scan_info)
|
||||
|
||||
if self.scan_control.scan_msg.get() != ScanControlLoadMessage.PENDING:
|
||||
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
|
||||
@@ -129,14 +110,22 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
||||
self.scan_control.scan_val_reset.put(1)
|
||||
status.wait(timeout=self.timeout_for_pvwait)
|
||||
|
||||
scan_name = self.scan_info.msg.scan_name
|
||||
self._update_scan_parameter()
|
||||
scan_name = self.scan_parameters.scan_name
|
||||
if not self._check_if_scan_name_is_valid(self.scan_parameters):
|
||||
return None
|
||||
|
||||
start, stop = self._get_start_stop()
|
||||
scan_time = self._scan_parameter("scan_time")
|
||||
scan_duration = self._scan_parameter("scan_duration")
|
||||
if scan_name == "xas_simple_scan":
|
||||
self._raise_for_missing(
|
||||
scan_name, ["start", "stop", "scan_time", "scan_duration"],
|
||||
[start, stop, scan_time, scan_duration],
|
||||
)
|
||||
self.set_xas_settings(
|
||||
low=self.scan_parameter.start,
|
||||
high=self.scan_parameter.stop,
|
||||
scan_time=self.scan_parameter.scan_time,
|
||||
low=start,
|
||||
high=stop,
|
||||
scan_time=scan_time,
|
||||
)
|
||||
self.set_trig_settings(
|
||||
enable_low=False,
|
||||
@@ -147,32 +136,72 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
||||
cycle_high=0,
|
||||
)
|
||||
self.set_scan_control_settings(
|
||||
mode=ScanControlMode.SIMPLE, scan_duration=self.scan_parameter.scan_duration
|
||||
mode=ScanControlMode.SIMPLE, scan_duration=scan_duration
|
||||
)
|
||||
elif scan_name == "xas_simple_scan_with_xrd":
|
||||
xrd_enable_low = self._scan_parameter("xrd_enable_low", "break_enable_low")
|
||||
xrd_enable_high = self._scan_parameter("xrd_enable_high", "break_enable_high")
|
||||
exp_time_low = self._scan_parameter("exp_time_low", "break_time_low")
|
||||
exp_time_high = self._scan_parameter("exp_time_high", "break_time_high")
|
||||
cycle_low = self._scan_parameter("cycle_low")
|
||||
cycle_high = self._scan_parameter("cycle_high")
|
||||
self._raise_for_missing(
|
||||
scan_name,
|
||||
[
|
||||
"start",
|
||||
"stop",
|
||||
"scan_time",
|
||||
"scan_duration",
|
||||
"xrd_enable_low",
|
||||
"xrd_enable_high",
|
||||
"exp_time_low",
|
||||
"exp_time_high",
|
||||
"cycle_low",
|
||||
"cycle_high",
|
||||
],
|
||||
[
|
||||
start,
|
||||
stop,
|
||||
scan_time,
|
||||
scan_duration,
|
||||
xrd_enable_low,
|
||||
xrd_enable_high,
|
||||
exp_time_low,
|
||||
exp_time_high,
|
||||
cycle_low,
|
||||
cycle_high,
|
||||
],
|
||||
)
|
||||
self.set_xas_settings(
|
||||
low=self.scan_parameter.start,
|
||||
high=self.scan_parameter.stop,
|
||||
scan_time=self.scan_parameter.scan_time,
|
||||
low=start,
|
||||
high=stop,
|
||||
scan_time=scan_time,
|
||||
)
|
||||
self.set_trig_settings(
|
||||
enable_low=self.scan_parameter.xrd_enable_low, # enable_low=self.scan_parameter.trig_enable_low,
|
||||
enable_high=self.scan_parameter.xrd_enable_high, # enable_high=self.scan_parameter.trig_enable_high,
|
||||
exp_time_low=self.scan_parameter.exp_time_low,
|
||||
exp_time_high=self.scan_parameter.exp_time_high,
|
||||
cycle_low=self.scan_parameter.cycle_low,
|
||||
cycle_high=self.scan_parameter.cycle_high,
|
||||
enable_low=xrd_enable_low,
|
||||
enable_high=xrd_enable_high,
|
||||
exp_time_low=exp_time_low,
|
||||
exp_time_high=exp_time_high,
|
||||
cycle_low=cycle_low,
|
||||
cycle_high=cycle_high,
|
||||
)
|
||||
self.set_scan_control_settings(
|
||||
mode=ScanControlMode.SIMPLE, scan_duration=self.scan_parameter.scan_duration
|
||||
mode=ScanControlMode.SIMPLE, scan_duration=scan_duration
|
||||
)
|
||||
elif scan_name == "xas_advanced_scan":
|
||||
p_kink = self._scan_parameter("p_kink")
|
||||
e_kink = self._scan_parameter("e_kink")
|
||||
self._raise_for_missing(
|
||||
scan_name,
|
||||
["start", "stop", "scan_time", "scan_duration", "p_kink", "e_kink"],
|
||||
[start, stop, scan_time, scan_duration, p_kink, e_kink],
|
||||
)
|
||||
self.set_advanced_xas_settings(
|
||||
low=self.scan_parameter.start,
|
||||
high=self.scan_parameter.stop,
|
||||
scan_time=self.scan_parameter.scan_time,
|
||||
p_kink=self.scan_parameter.p_kink,
|
||||
e_kink=self.scan_parameter.e_kink,
|
||||
low=start,
|
||||
high=stop,
|
||||
scan_time=scan_time,
|
||||
p_kink=p_kink,
|
||||
e_kink=e_kink,
|
||||
)
|
||||
self.set_trig_settings(
|
||||
enable_low=False,
|
||||
@@ -183,26 +212,65 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
||||
cycle_high=0,
|
||||
)
|
||||
self.set_scan_control_settings(
|
||||
mode=ScanControlMode.ADVANCED, scan_duration=self.scan_parameter.scan_duration
|
||||
mode=ScanControlMode.ADVANCED, scan_duration=scan_duration
|
||||
)
|
||||
elif scan_name == "xas_advanced_scan_with_xrd":
|
||||
p_kink = self._scan_parameter("p_kink")
|
||||
e_kink = self._scan_parameter("e_kink")
|
||||
xrd_enable_low = self._scan_parameter("xrd_enable_low", "break_enable_low")
|
||||
xrd_enable_high = self._scan_parameter("xrd_enable_high", "break_enable_high")
|
||||
exp_time_low = self._scan_parameter("exp_time_low", "break_time_low")
|
||||
exp_time_high = self._scan_parameter("exp_time_high", "break_time_high")
|
||||
cycle_low = self._scan_parameter("cycle_low")
|
||||
cycle_high = self._scan_parameter("cycle_high")
|
||||
self._raise_for_missing(
|
||||
scan_name,
|
||||
[
|
||||
"start",
|
||||
"stop",
|
||||
"scan_time",
|
||||
"scan_duration",
|
||||
"p_kink",
|
||||
"e_kink",
|
||||
"xrd_enable_low",
|
||||
"xrd_enable_high",
|
||||
"exp_time_low",
|
||||
"exp_time_high",
|
||||
"cycle_low",
|
||||
"cycle_high",
|
||||
],
|
||||
[
|
||||
start,
|
||||
stop,
|
||||
scan_time,
|
||||
scan_duration,
|
||||
p_kink,
|
||||
e_kink,
|
||||
xrd_enable_low,
|
||||
xrd_enable_high,
|
||||
exp_time_low,
|
||||
exp_time_high,
|
||||
cycle_low,
|
||||
cycle_high,
|
||||
],
|
||||
)
|
||||
self.set_advanced_xas_settings(
|
||||
low=self.scan_parameter.start,
|
||||
high=self.scan_parameter.stop,
|
||||
scan_time=self.scan_parameter.scan_time,
|
||||
p_kink=self.scan_parameter.p_kink,
|
||||
e_kink=self.scan_parameter.e_kink,
|
||||
low=start,
|
||||
high=stop,
|
||||
scan_time=scan_time,
|
||||
p_kink=p_kink,
|
||||
e_kink=e_kink,
|
||||
)
|
||||
self.set_trig_settings(
|
||||
enable_low=self.scan_parameter.xrd_enable_low, # enable_low=self.scan_parameter.trig_enable_low,
|
||||
enable_high=self.scan_parameter.xrd_enable_high, # enable_high=self.scan_parameter.trig_enable_high,
|
||||
exp_time_low=self.scan_parameter.exp_time_low,
|
||||
exp_time_high=self.scan_parameter.exp_time_high,
|
||||
cycle_low=self.scan_parameter.cycle_low,
|
||||
cycle_high=self.scan_parameter.cycle_high,
|
||||
enable_low=xrd_enable_low,
|
||||
enable_high=xrd_enable_high,
|
||||
exp_time_low=exp_time_low,
|
||||
exp_time_high=exp_time_high,
|
||||
cycle_low=cycle_low,
|
||||
cycle_high=cycle_high,
|
||||
)
|
||||
self.set_scan_control_settings(
|
||||
mode=ScanControlMode.ADVANCED, scan_duration=self.scan_parameter.scan_duration
|
||||
mode=ScanControlMode.ADVANCED, scan_duration=scan_duration
|
||||
)
|
||||
else:
|
||||
return
|
||||
@@ -284,6 +352,46 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
||||
self.stopped = True # Needs to be set to stop motion
|
||||
|
||||
######### Utility Methods #########
|
||||
def _check_if_scan_name_is_valid(self, scan_parameters: ScanServerScanInfo) -> bool:
|
||||
"""Check if the scan is within the list of scans supported by the backend."""
|
||||
return scan_parameters.scan_name in self.valid_scan_names
|
||||
|
||||
def _scan_parameter(self, *names: str):
|
||||
"""Fetch a scan parameter from v4 metadata, with legacy fallbacks."""
|
||||
if self.scan_parameters is None:
|
||||
return None
|
||||
sources = [
|
||||
self.scan_parameters.additional_scan_parameters,
|
||||
getattr(self.scan_parameters, "metadata", {}),
|
||||
getattr(self.scan_info.msg, "scan_parameters", {}),
|
||||
]
|
||||
request_inputs = self.scan_parameters.request_inputs or {}
|
||||
sources.extend(
|
||||
[
|
||||
request_inputs.get("inputs", {}),
|
||||
request_inputs.get("kwargs", {}),
|
||||
]
|
||||
)
|
||||
for source in sources:
|
||||
for name in names:
|
||||
if isinstance(source, dict) and name in source:
|
||||
return source[name]
|
||||
return None
|
||||
|
||||
def _get_start_stop(self):
|
||||
"""Return scan start/stop from v4 positions or legacy request inputs."""
|
||||
if self.scan_parameters is not None and self.scan_parameters.positions is not None:
|
||||
if len(self.scan_parameters.positions) == 2:
|
||||
return self.scan_parameters.positions
|
||||
return self._scan_parameter("start"), self._scan_parameter("stop")
|
||||
|
||||
def _raise_for_missing(self, scan_name: str, names: list[str], values: list) -> None:
|
||||
if any(value is None for value in values):
|
||||
raise Mo1BraggError(
|
||||
f"Missing scan parameters for {scan_name}. Required parameters: "
|
||||
f"{', '.join(names)} in scan info {self.scan_parameters}"
|
||||
)
|
||||
|
||||
def _progress_update(self, value, **kwargs) -> None:
|
||||
"""Callback method to update the scan progress, runs a callback
|
||||
to SUB_PROGRESS subscribers, i.e. BEC.
|
||||
@@ -454,13 +562,3 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
||||
|
||||
for s in status_list:
|
||||
s.wait(timeout=self.timeout_for_pvwait)
|
||||
|
||||
|
||||
def _update_scan_parameter(self):
|
||||
"""Get the scan_info parameters for the scan."""
|
||||
for key, value in self.scan_info.msg.request_inputs["inputs"].items():
|
||||
if hasattr(self.scan_parameter, key):
|
||||
setattr(self.scan_parameter, key, value)
|
||||
for key, value in self.scan_info.msg.request_inputs["kwargs"].items():
|
||||
if hasattr(self.scan_parameter, key):
|
||||
setattr(self.scan_parameter, key, value)
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from typing import TYPE_CHECKING, Literal, cast
|
||||
from typing import TYPE_CHECKING, Literal
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO, Kind, StatusBase
|
||||
from ophyd.status import SubscriptionStatus, WaitTimeoutError
|
||||
@@ -19,6 +20,7 @@ from superxas_bec.devices.nidaq.nidaq_enums import (
|
||||
ScanRates,
|
||||
ScanType,
|
||||
)
|
||||
from superxas_bec.devices.utils.utils import fetch_scan_info
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
@@ -425,7 +427,7 @@ class Nidaq(PSIDeviceBase, NidaqControl):
|
||||
|
||||
def __init__(self, prefix: str = "", *, name: str, scan_info: ScanInfo = None, **kwargs):
|
||||
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
|
||||
self.scan_info: ScanInfo
|
||||
self.scan_parameters: ScanServerScanInfo | None = None
|
||||
self.timeout_wait_for_signal = 5 # put 5s firsts
|
||||
self._timeout_wait_for_pv = 5 # 5s timeout for pv calls. editted due to timeout issues persisting
|
||||
self.valid_scan_names = [
|
||||
@@ -440,9 +442,11 @@ class Nidaq(PSIDeviceBase, NidaqControl):
|
||||
# Beamline Methods #
|
||||
########################################
|
||||
|
||||
def _check_if_scan_name_is_valid(self) -> bool:
|
||||
def _check_if_scan_name_is_valid(self, scan_parameters: ScanServerScanInfo | None) -> bool:
|
||||
"""Check if the scan is within the list of scans for which the backend is working"""
|
||||
scan_name = self.scan_info.msg.scan_name
|
||||
if scan_parameters is None:
|
||||
return False
|
||||
scan_name = scan_parameters.scan_name
|
||||
if scan_name in self.valid_scan_names:
|
||||
return True
|
||||
return False
|
||||
@@ -600,7 +604,8 @@ class Nidaq(PSIDeviceBase, NidaqControl):
|
||||
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
|
||||
If the upcoming scan is not in the list of valid scans, return immediately.
|
||||
"""
|
||||
if not self._check_if_scan_name_is_valid():
|
||||
self.scan_parameters = fetch_scan_info(self.scan_info)
|
||||
if not self._check_if_scan_name_is_valid(self.scan_parameters):
|
||||
return None
|
||||
|
||||
if self.state.get() != NidaqState.STANDBY:
|
||||
@@ -610,16 +615,16 @@ class Nidaq(PSIDeviceBase, NidaqControl):
|
||||
status.wait(timeout=self.timeout_wait_for_signal)
|
||||
|
||||
# If scan is not part of the valid_scan_names,
|
||||
if self.scan_info.msg.scan_name != "nidaq_continuous_scan":
|
||||
if self.scan_parameters.scan_name != "nidaq_continuous_scan":
|
||||
self.scan_type.set(ScanType.TRIGGERED).wait(timeout=self._timeout_wait_for_pv)
|
||||
self.scan_duration.set(0).wait(timeout=self._timeout_wait_for_pv)
|
||||
self.enable_compression.set(1).wait(timeout=self._timeout_wait_for_pv)
|
||||
else:
|
||||
self.scan_type.set(ScanType.CONTINUOUS).wait(timeout=self._timeout_wait_for_pv)
|
||||
self.scan_duration.set(self.scan_info.msg.scan_parameters["scan_duration"]).wait(
|
||||
self.scan_duration.set(self._scan_parameter("scan_duration")).wait(
|
||||
timeout=self._timeout_wait_for_pv
|
||||
)
|
||||
self.enable_compression.set(self.scan_info.msg.scan_parameters["compression"]).wait(
|
||||
self.enable_compression.set(self._scan_parameter("compression")).wait(
|
||||
timeout=self._timeout_wait_for_pv
|
||||
)
|
||||
|
||||
@@ -632,7 +637,7 @@ class Nidaq(PSIDeviceBase, NidaqControl):
|
||||
# self.stage_call.set(1).wait(timeout=self._timeout_wait_for_pv)
|
||||
self.stage_call.put(1)
|
||||
status.wait(timeout=self.timeout_wait_for_signal)
|
||||
if self.scan_info.msg.scan_name != "nidaq_continuous_scan":
|
||||
if self.scan_parameters.scan_name != "nidaq_continuous_scan":
|
||||
status = self.on_kickoff()
|
||||
self.cancel_on_stop(status)
|
||||
status.wait(timeout=self._timeout_wait_for_pv)
|
||||
@@ -663,10 +668,10 @@ class Nidaq(PSIDeviceBase, NidaqControl):
|
||||
before the motor starts its oscillation. This is needed for being properly homed.
|
||||
The NIDAQ should go into Acquiring mode.
|
||||
"""
|
||||
if not self._check_if_scan_name_is_valid():
|
||||
if not self._check_if_scan_name_is_valid(self.scan_parameters):
|
||||
return None
|
||||
|
||||
if self.scan_info.msg.scan_name == "nidaq_continuous_scan":
|
||||
if self.scan_parameters.scan_name == "nidaq_continuous_scan":
|
||||
logger.info(f"Device {self.name} ready to be kicked off for nidaq_continuous_scan")
|
||||
return None
|
||||
|
||||
@@ -687,15 +692,35 @@ class Nidaq(PSIDeviceBase, NidaqControl):
|
||||
For the NIDAQ we use this method to stop the backend since it
|
||||
would not stop by itself in its current implementation since the number of points are not predefined.
|
||||
"""
|
||||
if not self._check_if_scan_name_is_valid():
|
||||
if not self._check_if_scan_name_is_valid(self.scan_parameters):
|
||||
return None
|
||||
|
||||
status = CompareStatus(self.state, NidaqState.STANDBY)
|
||||
self.cancel_on_stop(status)
|
||||
if self.scan_info.msg.scan_name != "nidaq_continuous_scan":
|
||||
if self.scan_parameters.scan_name != "nidaq_continuous_scan":
|
||||
self.on_stop()
|
||||
return status
|
||||
|
||||
def _scan_parameter(self, name: str):
|
||||
"""Fetch a scan parameter from v4 metadata, with legacy fallbacks."""
|
||||
if self.scan_parameters is None:
|
||||
return None
|
||||
sources = [
|
||||
self.scan_parameters.additional_scan_parameters,
|
||||
getattr(self.scan_info.msg, "scan_parameters", {}),
|
||||
]
|
||||
request_inputs = self.scan_parameters.request_inputs or {}
|
||||
sources.extend(
|
||||
[
|
||||
request_inputs.get("inputs", {}),
|
||||
request_inputs.get("kwargs", {}),
|
||||
]
|
||||
)
|
||||
for source in sources:
|
||||
if isinstance(source, dict) and name in source:
|
||||
return source[name]
|
||||
return None
|
||||
|
||||
def _progress_update(self, value, **kwargs) -> None:
|
||||
"""Callback method to update the scan progress, runs a callback
|
||||
to SUB_PROGRESS subscribers, i.e. BEC.
|
||||
@@ -703,7 +728,9 @@ class Nidaq(PSIDeviceBase, NidaqControl):
|
||||
Args:
|
||||
value (int) : current progress value
|
||||
"""
|
||||
scan_duration = self.scan_info.msg.scan_parameters.get("scan_duration", None)
|
||||
if self.scan_parameters is None:
|
||||
return
|
||||
scan_duration = self._scan_parameter("scan_duration")
|
||||
if not isinstance(scan_duration, (int, float)):
|
||||
return
|
||||
value = scan_duration - value
|
||||
|
||||
@@ -0,0 +1,2 @@
|
||||
"""Utility helpers for SuperXAS devices."""
|
||||
|
||||
@@ -0,0 +1,26 @@
|
||||
"""Utility functions for SuperXAS devices."""
|
||||
|
||||
from copy import deepcopy
|
||||
|
||||
import numpy as np
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
|
||||
from pydantic import ValidationError
|
||||
|
||||
|
||||
def fetch_scan_info(scan_info: ScanInfo) -> ScanServerScanInfo:
|
||||
"""Normalize BEC scan info into the v4 scan info model."""
|
||||
info = deepcopy(scan_info.msg.info)
|
||||
if isinstance(info.get("positions"), list):
|
||||
info["positions"] = np.array(info["positions"])
|
||||
try:
|
||||
msg = ScanServerScanInfo.model_validate(info)
|
||||
except ValidationError:
|
||||
if info.get("scan_type") == "fly":
|
||||
info["scan_type"] = "hardware_triggered"
|
||||
else:
|
||||
info["scan_type"] = "software_triggered"
|
||||
msg = ScanServerScanInfo.model_validate(info)
|
||||
|
||||
return msg
|
||||
|
||||
@@ -1,338 +1,202 @@
|
||||
"""This module contains the scan classes for the mono bragg motor of the SuperXAS beamline."""
|
||||
"""
|
||||
V4 implementation of the SuperXAS mono bragg XAS scans.
|
||||
|
||||
Scan procedure:
|
||||
- prepare_scan
|
||||
- open_scan
|
||||
- stage
|
||||
- pre_scan
|
||||
- scan_core
|
||||
- at_each_point (optionally called by scan_core)
|
||||
- post_scan
|
||||
- unstage
|
||||
- close_scan
|
||||
- on_exception (called if any exception is raised during the scan)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from typing import Literal
|
||||
from typing import Annotated
|
||||
|
||||
import numpy as np
|
||||
from bec_lib.device import DeviceBase
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_server.scan_server.scans import AsyncFlyScanBase
|
||||
|
||||
logger = bec_logger.logger
|
||||
from bec_lib.scan_args import ScanArgument, Units
|
||||
from bec_server.scan_server.scans.scan_base import ScanBase, ScanType
|
||||
from bec_server.scan_server.scans.scan_modifier import scan_hook
|
||||
|
||||
|
||||
class XASSimpleScan(AsyncFlyScanBase):
|
||||
"""Class for the XAS simple scan"""
|
||||
class XASSimpleScan(ScanBase):
|
||||
"""Simple oscillating XAS scan on the SuperXAS mono bragg motor."""
|
||||
|
||||
scan_type = ScanType.HARDWARE_TRIGGERED
|
||||
scan_name = "xas_simple_scan"
|
||||
scan_type = "fly"
|
||||
scan_report_hint = "device_progress"
|
||||
required_kwargs = []
|
||||
use_scan_progress_report = False
|
||||
pre_move = False
|
||||
|
||||
gui_config = {
|
||||
"Movement Parameters": ["start", "stop"],
|
||||
"Scan Parameters": ["scan_time", "scan_duration"],
|
||||
"Scan Parameters": ["scan_time", "scan_duration", "monitored_readout_cycle"],
|
||||
}
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
start: float,
|
||||
stop: float,
|
||||
scan_time: float,
|
||||
scan_duration: float,
|
||||
motor: DeviceBase = "mo1_bragg",
|
||||
# fmt: off
|
||||
start: Annotated[float, ScanArgument(display_name="Start Energy", description="Start energy.", units=Units.eV, ge=4500, le=64000)],
|
||||
stop: Annotated[float, ScanArgument(display_name="Stop Energy", description="Stop energy.", units=Units.eV, ge=4500, le=64000)],
|
||||
scan_time: Annotated[float, ScanArgument(display_name="Scan Time", description="Time for one scan cycle.", units=Units.s, ge=0.05)],
|
||||
scan_duration: Annotated[float, ScanArgument(display_name="Scan Duration", description="Total scan duration.", units=Units.s, ge=0.05)],
|
||||
motor: Annotated[DeviceBase | None, ScanArgument(display_name="Motor", description="Bragg motor device.")] = None,
|
||||
daq: Annotated[DeviceBase | None, ScanArgument(display_name="DAQ", description="NIDAQ device.")] = None,
|
||||
gonio: Annotated[DeviceBase | None, ScanArgument(display_name="Gonio", description="Goniometer device.")] = None,
|
||||
monitored_readout_cycle: Annotated[float, ScanArgument(display_name="Monitored Readout Cycle", description="Delay between monitored readouts.", units=Units.s, gt=0)] = 1,
|
||||
# fmt: on
|
||||
**kwargs,
|
||||
):
|
||||
"""The xas_simple_scan is used to start a simple oscillating scan on the mono bragg motor.
|
||||
Start and Stop define the energy range for the scan, scan_time is the time for one scan
|
||||
cycle and scan_duration is the duration of the scan. If scan duration is set to 0, the
|
||||
scan will run infinitely.
|
||||
"""
|
||||
Start a simple oscillating scan on the mono bragg motor.
|
||||
|
||||
Args:
|
||||
start (float): Start energy for the scan.
|
||||
stop (float): Stop energy for the scan.
|
||||
start (float): Start energy.
|
||||
stop (float): Stop energy.
|
||||
scan_time (float): Time for one scan cycle.
|
||||
scan_duration (float): Duration of the scan.
|
||||
motor (DeviceBase, optional): Motor device to be used for the scan.
|
||||
Defaults to "mo1_bragg".
|
||||
Examples:
|
||||
>>> scans.xas_simple_scan(start=8000, stop=9000, scan_time=1, scan_duration=10)
|
||||
scan_duration (float): Total scan duration.
|
||||
motor (DeviceBase | None): Bragg motor device.
|
||||
daq (DeviceBase | None): NIDAQ device.
|
||||
gonio (DeviceBase | None): Goniometer device.
|
||||
monitored_readout_cycle (float): Delay between monitored readouts.
|
||||
|
||||
Returns:
|
||||
ScanReport
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
self.motor = motor
|
||||
self._baseline_readout_status = None
|
||||
self.start = start
|
||||
self.stop = stop
|
||||
self.scan_time = scan_time
|
||||
self.scan_duration = scan_duration
|
||||
self.primary_readout_cycle = 1
|
||||
|
||||
def stage(self):
|
||||
"""call the stage procedure"""
|
||||
# Compute position for mo1_gonio pre move
|
||||
# Since energy is not linear to angle, we have to calculate the angles first.
|
||||
pos_start = yield from self.stubs.send_rpc_and_wait(
|
||||
"mo1_bragg",
|
||||
"convert_angle_energy",
|
||||
mode = "EnergyToAngle",
|
||||
inp = self.start,
|
||||
)
|
||||
pos_end = yield from self.stubs.send_rpc_and_wait(
|
||||
"mo1_bragg",
|
||||
"convert_angle_energy",
|
||||
mode = "EnergyToAngle",
|
||||
inp = self.stop,
|
||||
)
|
||||
# Goniometer position is in the middle of the start and stop angle of the scan
|
||||
pos = (pos_start + pos_end) / 2
|
||||
|
||||
# Premove with mo1_gonio
|
||||
yield from self.stubs.send_rpc_and_wait(
|
||||
"mo1_gonio",
|
||||
"move",
|
||||
position = pos,
|
||||
wait = True,
|
||||
timeout = 30, # 30 seconds timeout
|
||||
)
|
||||
# Continue with staging the devices
|
||||
yield from self.stubs.stage()
|
||||
|
||||
def update_readout_priority(self):
|
||||
"""Ensure that NIDAQ is not monitored for any quick EXAFS."""
|
||||
super().update_readout_priority()
|
||||
self.readout_priority["async"].append("nidaq")
|
||||
|
||||
def prepare_positions(self):
|
||||
"""Prepare the positions for the scan.
|
||||
|
||||
Use here only start and end energy defining the range for the scan.
|
||||
"""
|
||||
self.mo1_bragg = self._resolve_device(motor, "mo1_bragg")
|
||||
self.motor = self.mo1_bragg
|
||||
self.daq = self._resolve_device(daq, "nidaq")
|
||||
self.mo1_gonio = self._resolve_device(gonio, "mo1_gonio")
|
||||
self.monitored_readout_cycle = monitored_readout_cycle
|
||||
self.positions = np.array([self.start, self.stop], dtype=float)
|
||||
self.num_pos = None
|
||||
yield None
|
||||
|
||||
self.update_scan_info(
|
||||
positions=self.positions,
|
||||
scan_time=scan_time,
|
||||
scan_duration=scan_duration,
|
||||
monitored_readout_cycle=monitored_readout_cycle,
|
||||
)
|
||||
self.actions.set_device_readout_priority([self.daq], priority="async")
|
||||
|
||||
def _resolve_device(self, device: DeviceBase | str | None, default_name: str) -> DeviceBase:
|
||||
"""Resolve optional device arguments against the v4 device container."""
|
||||
if device is None:
|
||||
return self.dev[default_name]
|
||||
if isinstance(device, str):
|
||||
return self.dev[device]
|
||||
return device
|
||||
|
||||
@scan_hook
|
||||
def prepare_scan(self):
|
||||
"""Prepare scan metadata and baseline readout."""
|
||||
self.actions.add_scan_report_instruction_device_progress(self.motor)
|
||||
self._baseline_readout_status = self.actions.read_baseline_devices(wait=False)
|
||||
|
||||
@scan_hook
|
||||
def open_scan(self):
|
||||
"""Open the scan."""
|
||||
self.actions.open_scan()
|
||||
|
||||
@scan_hook
|
||||
def stage(self):
|
||||
"""Center the goniometer for the requested energy range and stage devices."""
|
||||
pos_start = self.mo1_bragg.convert_angle_energy(mode="EnergyToAngle", inp=self.start)
|
||||
pos_end = self.mo1_bragg.convert_angle_energy(mode="EnergyToAngle", inp=self.stop)
|
||||
gonio_position = (pos_start + pos_end) / 2
|
||||
|
||||
self.actions.set(self.mo1_gonio, gonio_position, wait=False).wait(timeout=30)
|
||||
self.actions.stage_all_devices()
|
||||
|
||||
@scan_hook
|
||||
def pre_scan(self):
|
||||
"""Pre Scan action."""
|
||||
|
||||
self._check_limits()
|
||||
# Ensure parent class pre_scan actions to be called.
|
||||
yield from super().pre_scan()
|
||||
|
||||
def scan_report_instructions(self):
|
||||
"""
|
||||
Return the instructions for the scan report.
|
||||
"""
|
||||
yield from self.stubs.scan_report_instruction({"device_progress": [self.motor]})
|
||||
"""Execute pre-scan hooks on all devices."""
|
||||
self.actions.pre_scan_all_devices()
|
||||
|
||||
@scan_hook
|
||||
def scan_core(self):
|
||||
"""Run the scan core.
|
||||
Kickoff the oscillation on the Bragg motor and wait for the completion of the motion.
|
||||
"""
|
||||
# Start the oscillation on the Bragg motor.
|
||||
yield from self.stubs.kickoff(device=self.motor)
|
||||
complete_status = yield from self.stubs.complete(device=self.motor, wait=False)
|
||||
"""Kick off the mono bragg oscillation and read monitored devices."""
|
||||
self.actions.kickoff(self.motor)
|
||||
completion_status = self.actions.complete(self.motor, wait=False)
|
||||
|
||||
while not complete_status.done:
|
||||
# Readout monitored devices
|
||||
yield from self.stubs.read(group="monitored", point_id=self.point_id)
|
||||
time.sleep(self.primary_readout_cycle)
|
||||
self.point_id += 1
|
||||
while not completion_status.done:
|
||||
self.at_each_point()
|
||||
|
||||
self.num_pos = self.point_id
|
||||
@scan_hook
|
||||
def at_each_point(self):
|
||||
"""Read monitored devices during the hardware-triggered scan."""
|
||||
self.actions.read_monitored_devices()
|
||||
time.sleep(self.monitored_readout_cycle)
|
||||
|
||||
@scan_hook
|
||||
def post_scan(self):
|
||||
"""Complete all devices after the scan core."""
|
||||
self.actions.complete_all_devices()
|
||||
|
||||
# class XASSimpleScanWithXRD(XASSimpleScan):
|
||||
# """Class for the XAS simple scan with XRD"""
|
||||
@scan_hook
|
||||
def unstage(self):
|
||||
"""Unstage all devices."""
|
||||
self.actions.unstage_all_devices()
|
||||
|
||||
# scan_name = "xas_simple_scan_with_xrd"
|
||||
# gui_config = {
|
||||
# "Movement Parameters": ["start", "stop"],
|
||||
# "Scan Parameters": ["scan_time", "scan_duration"],
|
||||
# "Low Energy Range": ["xrd_enable_low", "num_trigger_low", "exp_time_low", "cycle_low"],
|
||||
# "High Energy Range": ["xrd_enable_high", "num_trigger_high", "exp_time_high", "cycle_high"],
|
||||
# }
|
||||
@scan_hook
|
||||
def close_scan(self):
|
||||
"""Close the scan after the baseline readout has completed."""
|
||||
if self._baseline_readout_status is not None:
|
||||
self._baseline_readout_status.wait()
|
||||
self.actions.close_scan()
|
||||
self.actions.check_for_unchecked_statuses()
|
||||
|
||||
# def __init__(
|
||||
# self,
|
||||
# start: float,
|
||||
# stop: float,
|
||||
# scan_time: float,
|
||||
# scan_duration: float,
|
||||
# xrd_enable_low: bool,
|
||||
# num_trigger_low: int,
|
||||
# exp_time_low: float,
|
||||
# cycle_low: int,
|
||||
# xrd_enable_high: bool,
|
||||
# num_trigger_high: int,
|
||||
# exp_time_high: float,
|
||||
# cycle_high: float,
|
||||
# motor: DeviceBase = "mo1_bragg",
|
||||
# **kwargs,
|
||||
# ):
|
||||
# """The xas_simple_scan_with_xrd is an oscillation motion on the mono motor
|
||||
# with XRD triggering at low and high energy ranges.
|
||||
# If scan duration is set to 0, the scan will run infinitely.
|
||||
|
||||
# Args:
|
||||
# start (float): Start energy for the scan.
|
||||
# stop (float): Stop energy for the scan.
|
||||
# scan_time (float): Time for one oscillation .
|
||||
# scan_duration (float): Total duration of the scan.
|
||||
# xrd_enable_low (bool): Enable XRD triggering for the low energy range.
|
||||
# num_trigger_low (int): Number of triggers for the low energy range.
|
||||
# exp_time_low (float): Exposure time for the low energy range.
|
||||
# cycle_low (int): Specify how often the triggers should be considered,
|
||||
# every nth cycle for low
|
||||
# xrd_enable_high (bool): Enable XRD triggering for the high energy range.
|
||||
# num_trigger_high (int): Number of triggers for the high energy range.
|
||||
# exp_time_high (float): Exposure time for the high energy range.
|
||||
# cycle_high (int): Specify how often the triggers should be considered,
|
||||
# every nth cycle for high
|
||||
# motor (DeviceBase, optional): Motor device to be used for the scan.
|
||||
# Defaults to "mo1_bragg".
|
||||
|
||||
# Examples:
|
||||
# >>> scans.xas_simple_scan_with_xrd(start=8000, stop=9000, scan_time=1, scan_duration=10, xrd_enable_low=True, num_trigger_low=5, cycle_low=2, exp_time_low=100, xrd_enable_high=False, num_trigger_high=3, cycle_high=1, exp_time_high=1000)
|
||||
# """
|
||||
# super().__init__(
|
||||
# start=start,
|
||||
# stop=stop,
|
||||
# scan_time=scan_time,
|
||||
# scan_duration=scan_duration,
|
||||
# motor=motor,
|
||||
# **kwargs,
|
||||
# )
|
||||
# self.xrd_enable_low = xrd_enable_low
|
||||
# self.num_trigger_low = num_trigger_low
|
||||
# self.exp_time_low = exp_time_low
|
||||
# self.cycle_low = cycle_low
|
||||
# self.xrd_enable_high = xrd_enable_high
|
||||
# self.num_trigger_high = num_trigger_high
|
||||
# self.exp_time_high = exp_time_high
|
||||
# self.cycle_high = cycle_high
|
||||
@scan_hook
|
||||
def on_exception(self, exception: Exception):
|
||||
"""Try to stop outstanding device work if the scan fails."""
|
||||
self.actions.complete_all_devices(wait=False)
|
||||
|
||||
|
||||
class XASAdvancedScan(XASSimpleScan):
|
||||
"""Class for the XAS advanced scan"""
|
||||
"""Advanced oscillating XAS scan with spline parameters."""
|
||||
|
||||
scan_name = "xas_advanced_scan"
|
||||
gui_config = {
|
||||
"Movement Parameters": ["start", "stop"],
|
||||
"Scan Parameters": ["scan_time", "scan_duration"],
|
||||
"Scan Parameters": ["scan_time", "scan_duration", "monitored_readout_cycle"],
|
||||
"Spline Parameters": ["p_kink", "e_kink"],
|
||||
}
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
start: float,
|
||||
stop: float,
|
||||
scan_time: float,
|
||||
scan_duration: float,
|
||||
p_kink: float,
|
||||
e_kink: float,
|
||||
motor: DeviceBase = "mo1_bragg",
|
||||
# fmt: off
|
||||
start: Annotated[float, ScanArgument(display_name="Start Energy", description="Start energy.", units=Units.eV)],
|
||||
stop: Annotated[float, ScanArgument(display_name="Stop Energy", description="Stop energy.", units=Units.eV)],
|
||||
scan_time: Annotated[float, ScanArgument(display_name="Scan Time", description="Time for one scan cycle.", units=Units.s, ge=0)],
|
||||
scan_duration: Annotated[float, ScanArgument(display_name="Scan Duration", description="Total scan duration.", units=Units.s, ge=0)],
|
||||
p_kink: Annotated[float, ScanArgument(display_name="P Kink", description="Position of the kink.", ge=0)],
|
||||
e_kink: Annotated[float, ScanArgument(display_name="E Kink", description="Energy of the kink.", units=Units.eV)],
|
||||
motor: Annotated[DeviceBase | None, ScanArgument(display_name="Motor", description="Bragg motor device.")] = None,
|
||||
daq: Annotated[DeviceBase | None, ScanArgument(display_name="DAQ", description="NIDAQ device.")] = None,
|
||||
gonio: Annotated[DeviceBase | None, ScanArgument(display_name="Gonio", description="Goniometer device.")] = None,
|
||||
monitored_readout_cycle: Annotated[float, ScanArgument(display_name="Monitored Readout Cycle", description="Delay between monitored readouts.", units=Units.s, gt=0)] = 1,
|
||||
**kwargs,
|
||||
# fmt: on
|
||||
):
|
||||
"""The xas_advanced_scan is an oscillation motion on the mono motor.
|
||||
Start and Stop define the energy range for the scan, scan_time is the
|
||||
time for one scan cycle and scan_duration is the duration of the scan.
|
||||
If scan duration is set to 0, the scan will run infinitely.
|
||||
p_kink and e_kink add a kink to the motion profile to slow down in the
|
||||
exafs region of the scan.
|
||||
|
||||
Args:
|
||||
start (float): Start angle for the scan.
|
||||
stop (float): Stop angle for the scan.
|
||||
scan_time (float): Time for one oscillation .
|
||||
scan_duration (float): Total duration of the scan.
|
||||
p_kink (float): Position of the kink.
|
||||
e_kink (float): Energy of the kink.
|
||||
motor (DeviceBase, optional): Motor device to be used for the scan.
|
||||
Defaults to "mo1_bragg".
|
||||
|
||||
Examples:
|
||||
>>> scans.xas_advanced_scan(start=10000, stop=12000, scan_time=0.5, scan_duration=10, p_kink=50, e_kink=10500)
|
||||
"""
|
||||
super().__init__(
|
||||
start=start,
|
||||
stop=stop,
|
||||
scan_time=scan_time,
|
||||
scan_duration=scan_duration,
|
||||
motor=motor,
|
||||
daq=daq,
|
||||
gonio=gonio,
|
||||
monitored_readout_cycle=monitored_readout_cycle,
|
||||
**kwargs,
|
||||
)
|
||||
self.p_kink = p_kink
|
||||
self.e_kink = e_kink
|
||||
self.update_scan_info(p_kink=p_kink, e_kink=e_kink)
|
||||
|
||||
|
||||
# class XASAdvancedScanWithXRD(XASAdvancedScan):
|
||||
# """Class for the XAS advanced scan with XRD"""
|
||||
|
||||
# scan_name = "xas_advanced_scan_with_xrd"
|
||||
# gui_config = {
|
||||
# "Movement Parameters": ["start", "stop"],
|
||||
# "Scan Parameters": ["scan_time", "scan_duration"],
|
||||
# "Spline Parameters": ["p_kink", "e_kink"],
|
||||
# "Low Energy Range": ["xrd_enable_low", "num_trigger_low", "exp_time_low", "cycle_low"],
|
||||
# "High Energy Range": ["xrd_enable_high", "num_trigger_high", "exp_time_high", "cycle_high"],
|
||||
# }
|
||||
|
||||
# def __init__(
|
||||
# self,
|
||||
# start: float,
|
||||
# stop: float,
|
||||
# scan_time: float,
|
||||
# scan_duration: float,
|
||||
# p_kink: float,
|
||||
# e_kink: float,
|
||||
# xrd_enable_low: bool,
|
||||
# num_trigger_low: int,
|
||||
# exp_time_low: float,
|
||||
# cycle_low: int,
|
||||
# xrd_enable_high: bool,
|
||||
# num_trigger_high: int,
|
||||
# exp_time_high: float,
|
||||
# cycle_high: float,
|
||||
# motor: DeviceBase = "mo1_bragg",
|
||||
# **kwargs,
|
||||
# ):
|
||||
# """The xas_advanced_scan is an oscillation motion on the mono motor
|
||||
# with XRD triggering at low and high energy ranges.
|
||||
# Start and Stop define the energy range for the scan, scan_time is the time for
|
||||
# one scan cycle and scan_duration is the duration of the scan. If scan duration
|
||||
# is set to 0, the scan will run infinitely. p_kink and e_kink add a kink to the
|
||||
# motion profile to slow down in the exafs region of the scan.
|
||||
|
||||
# Args:
|
||||
# start (float): Start angle for the scan.
|
||||
# stop (float): Stop angle for the scan.
|
||||
# scan_time (float): Time for one oscillation .
|
||||
# scan_duration (float): Total duration of the scan.
|
||||
# p_kink (float): Position of kink.
|
||||
# e_kink (float): Energy of the kink.
|
||||
# xrd_enable_low (bool): Enable XRD triggering for the low energy range.
|
||||
# num_trigger_low (int): Number of triggers for the low energy range.
|
||||
# exp_time_low (float): Exposure time for the low energy range.
|
||||
# cycle_low (int): Specify how often the triggers should be considered,
|
||||
# every nth cycle for low
|
||||
# xrd_enable_high (bool): Enable XRD triggering for the high energy range.
|
||||
# num_trigger_high (int): Number of triggers for the high energy range.
|
||||
# exp_time_high (float): Exposure time for the high energy range.
|
||||
# cycle_high (int): Specify how often the triggers should be considered,
|
||||
# every nth cycle for high
|
||||
# motor (DeviceBase, optional): Motor device to be used for the scan.
|
||||
# Defaults to "mo1_bragg".
|
||||
|
||||
# Examples:
|
||||
# >>> scans.xas_advanced_scan_with_xrd(start=10000, stop=12000, scan_time=0.5, scan_duration=10, p_kink=50, e_kink=10500, xrd_enable_low=True, num_trigger_low=5, cycle_low=2, exp_time_low=100, xrd_enable_high=False, num_trigger_high=3, cycle_high=1, exp_time_high=1000)
|
||||
# """
|
||||
# super().__init__(
|
||||
# start=start,
|
||||
# stop=stop,
|
||||
# scan_time=scan_time,
|
||||
# scan_duration=scan_duration,
|
||||
# p_kink=p_kink,
|
||||
# e_kink=e_kink,
|
||||
# motor=motor,
|
||||
# **kwargs,
|
||||
# )
|
||||
# self.p_kink = p_kink
|
||||
# self.e_kink = e_kink
|
||||
# self.xrd_enable_low = xrd_enable_low
|
||||
# self.num_trigger_low = num_trigger_low
|
||||
# self.exp_time_low = exp_time_low
|
||||
# self.cycle_low = cycle_low
|
||||
# self.xrd_enable_high = xrd_enable_high
|
||||
# self.num_trigger_high = num_trigger_high
|
||||
# self.exp_time_high = exp_time_high
|
||||
# self.cycle_high = cycle_high
|
||||
|
||||
@@ -1,84 +1,126 @@
|
||||
"""This module contains the scan class for the nidaq of the SuperXAS beamline for use in continuous mode."""
|
||||
"""
|
||||
V4 implementation of the SuperXAS NIDAQ continuous scan.
|
||||
|
||||
The NIDAQ continuous scan measures with the NIDAQ without moving the
|
||||
monochromator or any other motor.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from typing import Literal
|
||||
from typing import Annotated
|
||||
|
||||
import numpy as np
|
||||
from bec_lib.device import DeviceBase
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_server.scan_server.scans import AsyncFlyScanBase
|
||||
|
||||
logger = bec_logger.logger
|
||||
from bec_lib.scan_args import ScanArgument, Units
|
||||
from bec_server.scan_server.scans.scan_base import ScanBase, ScanType
|
||||
from bec_server.scan_server.scans.scan_modifier import scan_hook
|
||||
|
||||
|
||||
class NIDAQContinuousScan(AsyncFlyScanBase):
|
||||
"""Class for the nidaq continuous scan (without mono)"""
|
||||
class NIDAQContinuousScan(ScanBase):
|
||||
"""Continuous NIDAQ acquisition without mono motion."""
|
||||
|
||||
scan_type = ScanType.HARDWARE_TRIGGERED
|
||||
scan_name = "nidaq_continuous_scan"
|
||||
scan_type = "fly"
|
||||
scan_report_hint = "device_progress"
|
||||
required_kwargs = []
|
||||
use_scan_progress_report = False
|
||||
pre_move = False
|
||||
gui_config = {"Scan Parameters": ["scan_duration"], "Data Compression": ["compression"]}
|
||||
|
||||
gui_config = {"Scan Parameters": ["scan_duration", "daq", "compression"]}
|
||||
|
||||
def __init__(
|
||||
self, scan_duration: float, daq: DeviceBase = "nidaq", compression: bool = False, **kwargs
|
||||
self,
|
||||
# fmt: off
|
||||
scan_duration: Annotated[float, ScanArgument(display_name="Scan Duration", description="Duration of the scan", units=Units.s)],
|
||||
daq: Annotated[DeviceBase | None, ScanArgument(display_name="DAQ", description="DAQ device to be used for the scan")] = None,
|
||||
compression: Annotated[bool, ScanArgument(display_name="Compression", description="Whether to compress the data")] = False,
|
||||
# fmt: on
|
||||
**kwargs,
|
||||
):
|
||||
"""The NIDAQ continuous scan is used to measure with the NIDAQ without moving the
|
||||
monochromator or any other motor. The NIDAQ thus runs in continuous mode, with a
|
||||
set scan_duration.
|
||||
"""
|
||||
Start a continuous NIDAQ acquisition.
|
||||
|
||||
Args:
|
||||
scan_duration (float): Duration of the scan.
|
||||
daq (DeviceBase, optional): DAQ device to be used for the scan.
|
||||
Defaults to "nidaq".
|
||||
Examples:
|
||||
>>> scans.nidaq_continuous_scan(scan_duration=10)
|
||||
daq (DeviceBase | None): DAQ device to be used for the scan.
|
||||
compression (bool): Whether to compress the data.
|
||||
|
||||
Returns:
|
||||
ScanReport
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
self._baseline_readout_status = None
|
||||
self.scan_duration = scan_duration
|
||||
self.daq = daq
|
||||
self.start_time = 0
|
||||
self.primary_readout_cycle = 1
|
||||
self.scan_parameters["scan_duration"] = scan_duration
|
||||
self.scan_parameters["compression"] = compression
|
||||
self.daq = self._resolve_device(daq, "nidaq")
|
||||
self.compression = compression
|
||||
self.monitored_readout_cycle = 1.0
|
||||
self.motors = [self.daq]
|
||||
|
||||
def update_readout_priority(self):
|
||||
"""Ensure that NIDAQ is not monitored for any quick EXAFS."""
|
||||
super().update_readout_priority()
|
||||
self.readout_priority["async"].append("nidaq")
|
||||
self.update_scan_info(scan_duration=scan_duration, compression=compression)
|
||||
self.actions.set_device_readout_priority([self.daq], priority="async")
|
||||
|
||||
def prepare_positions(self):
|
||||
"""Prepare the positions for the scan."""
|
||||
yield None
|
||||
def _resolve_device(self, device: DeviceBase | str | None, default_name: str) -> DeviceBase:
|
||||
"""Resolve optional device arguments against the v4 device container."""
|
||||
if device is None:
|
||||
return self.dev[default_name]
|
||||
if isinstance(device, str):
|
||||
return self.dev[device]
|
||||
return device
|
||||
|
||||
@scan_hook
|
||||
def prepare_scan(self):
|
||||
"""Prepare scan metadata and baseline readout."""
|
||||
self.actions.add_scan_report_instruction_device_progress(self.daq)
|
||||
self._baseline_readout_status = self.actions.read_baseline_devices(wait=False)
|
||||
|
||||
@scan_hook
|
||||
def open_scan(self):
|
||||
"""Open the scan."""
|
||||
self.actions.open_scan()
|
||||
|
||||
@scan_hook
|
||||
def stage(self):
|
||||
"""Stage all devices."""
|
||||
self.actions.stage_all_devices()
|
||||
|
||||
@scan_hook
|
||||
def pre_scan(self):
|
||||
"""Pre Scan action."""
|
||||
|
||||
self.start_time = time.time()
|
||||
# Ensure parent class pre_scan actions to be called.
|
||||
yield from super().pre_scan()
|
||||
|
||||
def scan_report_instructions(self):
|
||||
"""
|
||||
Return the instructions for the scan report.
|
||||
"""
|
||||
yield from self.stubs.scan_report_instruction({"device_progress": [self.daq]})
|
||||
"""Execute pre-scan hooks on all devices."""
|
||||
self.actions.pre_scan_all_devices()
|
||||
|
||||
@scan_hook
|
||||
def scan_core(self):
|
||||
"""Run the scan core.
|
||||
Kickoff the acquisition of the NIDAQ wait for the completion of the scan.
|
||||
"""
|
||||
kickoff_status = yield from self.stubs.kickoff(device=self.daq)
|
||||
kickoff_status.wait(timeout=5) # wait for proper kickoff of device
|
||||
"""Kick off the NIDAQ acquisition and read monitored devices."""
|
||||
kickoff_status = self.actions.kickoff(device=self.daq, wait=False)
|
||||
kickoff_status.wait(timeout=5)
|
||||
|
||||
complete_status = yield from self.stubs.complete(device=self.daq, wait=False)
|
||||
complete_status = self.actions.complete(device=self.daq, wait=False)
|
||||
|
||||
while not complete_status.done:
|
||||
# Readout monitored devices
|
||||
yield from self.stubs.read(group="monitored", point_id=self.point_id)
|
||||
time.sleep(self.primary_readout_cycle)
|
||||
self.point_id += 1
|
||||
self.at_each_point()
|
||||
time.sleep(self.monitored_readout_cycle)
|
||||
|
||||
@scan_hook
|
||||
def at_each_point(self):
|
||||
"""Read monitored devices during the hardware-triggered scan."""
|
||||
self.actions.read_monitored_devices()
|
||||
|
||||
@scan_hook
|
||||
def post_scan(self):
|
||||
"""Complete all devices after the scan core."""
|
||||
self.actions.complete_all_devices()
|
||||
|
||||
@scan_hook
|
||||
def unstage(self):
|
||||
"""Unstage all devices."""
|
||||
self.actions.unstage_all_devices()
|
||||
|
||||
@scan_hook
|
||||
def close_scan(self):
|
||||
"""Close the scan after the baseline readout has completed."""
|
||||
if self._baseline_readout_status is not None:
|
||||
self._baseline_readout_status.wait()
|
||||
self.actions.close_scan()
|
||||
self.actions.check_for_unchecked_statuses()
|
||||
|
||||
@scan_hook
|
||||
def on_exception(self, exception: Exception):
|
||||
"""Try to stop outstanding device work if the scan fails."""
|
||||
self.actions.complete_all_devices(wait=False)
|
||||
|
||||
self.num_pos = self.point_id
|
||||
|
||||
@@ -0,0 +1,53 @@
|
||||
# pylint: skip-file
|
||||
from types import SimpleNamespace
|
||||
|
||||
import numpy as np
|
||||
|
||||
from superxas_bec.devices.utils.utils import fetch_scan_info
|
||||
|
||||
|
||||
def test_fetch_scan_info_accepts_v4_scan_info_with_positions_list():
|
||||
scan_info = SimpleNamespace(
|
||||
msg=SimpleNamespace(
|
||||
info={
|
||||
"scan_name": "xas_simple_scan",
|
||||
"scan_id": "scan-id-test",
|
||||
"scan_type": "hardware_triggered",
|
||||
"positions": [8000.0, 9000.0],
|
||||
"additional_scan_parameters": {
|
||||
"scan_time": 1.0,
|
||||
"scan_duration": 10.0,
|
||||
},
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
msg = fetch_scan_info(scan_info)
|
||||
|
||||
assert msg.scan_name == "xas_simple_scan"
|
||||
assert msg.scan_type == "hardware_triggered"
|
||||
np.testing.assert_array_equal(msg.positions, np.array([8000.0, 9000.0]))
|
||||
assert msg.additional_scan_parameters["scan_duration"] == 10.0
|
||||
|
||||
|
||||
def test_fetch_scan_info_converts_legacy_fly_scan_type():
|
||||
scan_info = SimpleNamespace(
|
||||
msg=SimpleNamespace(
|
||||
info={
|
||||
"scan_name": "xas_simple_scan",
|
||||
"scan_id": "scan-id-test",
|
||||
"scan_type": "fly",
|
||||
"positions": [8000.0, 9000.0],
|
||||
"request_inputs": {
|
||||
"inputs": {},
|
||||
"kwargs": {"scan_time": 1.0, "scan_duration": 10.0},
|
||||
},
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
msg = fetch_scan_info(scan_info)
|
||||
|
||||
assert msg.scan_type == "hardware_triggered"
|
||||
assert msg.request_inputs["kwargs"]["scan_time"] == 1.0
|
||||
|
||||
@@ -0,0 +1,100 @@
|
||||
# pylint: skip-file
|
||||
from unittest import mock
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
|
||||
from ophyd_devices.tests.utils import patched_device
|
||||
|
||||
from superxas_bec.devices.mo1_bragg.mo1_bragg import Mo1Bragg, ScanControlLoadMessage
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def mock_bragg():
|
||||
with patched_device(
|
||||
Mo1Bragg, name="mo1_bragg", prefix="X10DA-OP-MO1:BRAGG:"
|
||||
) as dev:
|
||||
yield dev
|
||||
|
||||
|
||||
def _set_scan_info(dev, scan_info):
|
||||
dev.scan_info.msg.info.update(scan_info.model_dump())
|
||||
|
||||
|
||||
def _mock_status():
|
||||
status = mock.MagicMock()
|
||||
status.wait = mock.MagicMock()
|
||||
return status
|
||||
|
||||
|
||||
def test_mo1_bragg_stage_uses_v4_simple_scan_info(mock_bragg):
|
||||
scan_info = ScanServerScanInfo(
|
||||
scan_name="xas_simple_scan",
|
||||
scan_id="scan-id-test",
|
||||
scan_type="hardware_triggered",
|
||||
positions=np.array([8000.0, 9000.0]),
|
||||
additional_scan_parameters={"scan_time": 1.0, "scan_duration": 10.0},
|
||||
)
|
||||
_set_scan_info(mock_bragg, scan_info)
|
||||
mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.PENDING
|
||||
|
||||
with (
|
||||
mock.patch.object(mock_bragg, "set_xas_settings") as set_xas_settings,
|
||||
mock.patch.object(mock_bragg, "set_trig_settings") as set_trig_settings,
|
||||
mock.patch.object(mock_bragg, "set_scan_control_settings") as set_scan_control_settings,
|
||||
mock.patch.object(mock_bragg, "cancel_on_stop"),
|
||||
mock.patch(
|
||||
"superxas_bec.devices.mo1_bragg.mo1_bragg.CompareStatus",
|
||||
return_value=_mock_status(),
|
||||
),
|
||||
):
|
||||
mock_bragg.on_stage()
|
||||
|
||||
set_xas_settings.assert_called_once_with(low=8000.0, high=9000.0, scan_time=1.0)
|
||||
set_trig_settings.assert_called_once_with(
|
||||
enable_low=False,
|
||||
enable_high=False,
|
||||
exp_time_low=0,
|
||||
exp_time_high=0,
|
||||
cycle_low=0,
|
||||
cycle_high=0,
|
||||
)
|
||||
assert set_scan_control_settings.call_args.kwargs["scan_duration"] == 10.0
|
||||
|
||||
|
||||
def test_mo1_bragg_stage_uses_v4_advanced_scan_info(mock_bragg):
|
||||
scan_info = ScanServerScanInfo(
|
||||
scan_name="xas_advanced_scan",
|
||||
scan_id="scan-id-test",
|
||||
scan_type="hardware_triggered",
|
||||
positions=np.array([8000.0, 9000.0]),
|
||||
additional_scan_parameters={
|
||||
"scan_time": 1.0,
|
||||
"scan_duration": 10.0,
|
||||
"p_kink": 50.0,
|
||||
"e_kink": 8500.0,
|
||||
},
|
||||
)
|
||||
_set_scan_info(mock_bragg, scan_info)
|
||||
mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.PENDING
|
||||
|
||||
with (
|
||||
mock.patch.object(mock_bragg, "set_advanced_xas_settings") as set_advanced_xas_settings,
|
||||
mock.patch.object(mock_bragg, "set_trig_settings"),
|
||||
mock.patch.object(mock_bragg, "set_scan_control_settings") as set_scan_control_settings,
|
||||
mock.patch.object(mock_bragg, "cancel_on_stop"),
|
||||
mock.patch(
|
||||
"superxas_bec.devices.mo1_bragg.mo1_bragg.CompareStatus",
|
||||
return_value=_mock_status(),
|
||||
),
|
||||
):
|
||||
mock_bragg.on_stage()
|
||||
|
||||
set_advanced_xas_settings.assert_called_once_with(
|
||||
low=8000.0,
|
||||
high=9000.0,
|
||||
scan_time=1.0,
|
||||
p_kink=50.0,
|
||||
e_kink=8500.0,
|
||||
)
|
||||
assert set_scan_control_settings.call_args.kwargs["scan_duration"] == 10.0
|
||||
@@ -0,0 +1,67 @@
|
||||
# pylint: skip-file
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
|
||||
from ophyd_devices.tests.utils import patched_device
|
||||
|
||||
from superxas_bec.devices.nidaq.nidaq import Nidaq, NidaqState, ScanType
|
||||
from superxas_bec.devices.utils.utils import fetch_scan_info
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def mock_nidaq():
|
||||
with patched_device(
|
||||
Nidaq, name="nidaq", prefix="X10DA-CPCL-SCANSERVER:"
|
||||
) as dev:
|
||||
yield dev
|
||||
|
||||
|
||||
def _set_scan_info(dev, scan_info):
|
||||
dev.scan_info.msg.info.update(scan_info.model_dump())
|
||||
dev.scan_parameters = fetch_scan_info(dev.scan_info)
|
||||
|
||||
|
||||
def test_nidaq_check_scan_name_uses_normalized_scan_info(mock_nidaq):
|
||||
valid = ScanServerScanInfo(scan_name="xas_simple_scan", scan_id="scan-id-test")
|
||||
invalid = ScanServerScanInfo(scan_name="line_scan", scan_id="scan-id-test")
|
||||
|
||||
assert mock_nidaq._check_if_scan_name_is_valid(valid)
|
||||
assert not mock_nidaq._check_if_scan_name_is_valid(invalid)
|
||||
|
||||
|
||||
def test_nidaq_progress_update_uses_v4_additional_parameters(mock_nidaq):
|
||||
scan_info = ScanServerScanInfo(
|
||||
scan_name="nidaq_continuous_scan",
|
||||
scan_id="scan-id-test",
|
||||
additional_scan_parameters={"scan_duration": 10.0, "compression": False},
|
||||
)
|
||||
_set_scan_info(mock_nidaq, scan_info)
|
||||
mock_nidaq.progress_signal.put = mock.MagicMock()
|
||||
|
||||
mock_nidaq._progress_update(4.0)
|
||||
|
||||
mock_nidaq.progress_signal.put.assert_called_once_with(value=6.0, max_value=10.0, done=False)
|
||||
|
||||
|
||||
def test_nidaq_stage_uses_v4_continuous_scan_info(mock_nidaq):
|
||||
scan_info = ScanServerScanInfo(
|
||||
scan_name="nidaq_continuous_scan",
|
||||
scan_id="scan-id-test",
|
||||
additional_scan_parameters={"scan_duration": 10.0, "compression": False},
|
||||
)
|
||||
mock_nidaq.scan_info.msg.info.update(scan_info.model_dump())
|
||||
mock_nidaq.state.put(NidaqState.STANDBY)
|
||||
|
||||
with (
|
||||
mock.patch("superxas_bec.devices.nidaq.nidaq.CompareStatus") as status_cls,
|
||||
mock.patch.object(mock_nidaq, "cancel_on_stop"),
|
||||
mock.patch.object(mock_nidaq, "on_kickoff") as on_kickoff,
|
||||
):
|
||||
status_cls.return_value.wait = mock.MagicMock()
|
||||
mock_nidaq.on_stage()
|
||||
|
||||
assert mock_nidaq.scan_type.get() == ScanType.CONTINUOUS
|
||||
assert mock_nidaq.scan_duration.get() == 10.0
|
||||
assert mock_nidaq.enable_compression.get() is False
|
||||
on_kickoff.assert_not_called()
|
||||
@@ -0,0 +1,7 @@
|
||||
# pylint: skip-file
|
||||
from bec_server.scan_server.tests.scan_fixtures import (
|
||||
nth_done_status_mock,
|
||||
readout_priority,
|
||||
v4_scan_assembler,
|
||||
)
|
||||
|
||||
@@ -0,0 +1,121 @@
|
||||
# pylint: skip-file
|
||||
from unittest import mock
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
from bec_server.scan_server.tests.scan_fixtures import *
|
||||
from bec_server.scan_server.tests.scan_hook_tests import *
|
||||
|
||||
XAS_SIMPLE_SCAN_DEFAULT_HOOK_TESTS = [
|
||||
("prepare_scan", [assert_prepare_scan_reads_baseline_devices]),
|
||||
("open_scan", [assert_scan_open_called]),
|
||||
("pre_scan", [assert_pre_scan_called]),
|
||||
("unstage", [assert_unstage_all_devices_called]),
|
||||
("close_scan", [assert_close_scan_waits_for_baseline_and_closes]),
|
||||
]
|
||||
|
||||
|
||||
def _assemble_xas_simple_scan(v4_scan_assembler, **overrides):
|
||||
params = {
|
||||
"start": 8000.0,
|
||||
"stop": 9000.0,
|
||||
"scan_time": 1.0,
|
||||
"scan_duration": 10.0,
|
||||
"motor": "mo1_bragg",
|
||||
"daq": "nidaq",
|
||||
"gonio": "mo1_gonio",
|
||||
"monitored_readout_cycle": 1.0,
|
||||
}
|
||||
params.update(overrides)
|
||||
return v4_scan_assembler("xas_simple_scan", **params)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(("hook_name", "hook_tests"), XAS_SIMPLE_SCAN_DEFAULT_HOOK_TESTS)
|
||||
def test_xas_simple_scan_v4_default_hooks(
|
||||
v4_scan_assembler, nth_done_status_mock, hook_name, hook_tests
|
||||
):
|
||||
scan = _assemble_xas_simple_scan(v4_scan_assembler)
|
||||
|
||||
run_scan_tests(scan, [(hook_name, hook_tests)], nth_done_status_mock=nth_done_status_mock)
|
||||
|
||||
|
||||
def test_xas_simple_scan_v4_prepare_scan_updates_metadata(v4_scan_assembler):
|
||||
scan = _assemble_xas_simple_scan(v4_scan_assembler)
|
||||
scan.actions.add_scan_report_instruction_device_progress = mock.MagicMock()
|
||||
baseline_status = mock.MagicMock()
|
||||
scan.actions.read_baseline_devices = mock.MagicMock(return_value=baseline_status)
|
||||
|
||||
scan.prepare_scan()
|
||||
|
||||
scan.actions._build_scan_status_message("open")
|
||||
|
||||
np.testing.assert_array_equal(scan.scan_info.positions, np.array([8000.0, 9000.0]))
|
||||
assert scan.scan_info.additional_scan_parameters["scan_time"] == 1.0
|
||||
assert scan.scan_info.additional_scan_parameters["scan_duration"] == 10.0
|
||||
assert scan.scan_info.additional_scan_parameters["monitored_readout_cycle"] == 1.0
|
||||
assert scan.scan_info.readout_priority_modification["async"] == ["nidaq"]
|
||||
scan.actions.add_scan_report_instruction_device_progress.assert_called_once_with(scan.motor)
|
||||
scan.actions.read_baseline_devices.assert_called_once_with(wait=False)
|
||||
assert scan._baseline_readout_status is baseline_status
|
||||
|
||||
|
||||
def test_xas_simple_scan_v4_stage_centers_gonio_and_stages_devices(v4_scan_assembler):
|
||||
scan = _assemble_xas_simple_scan(v4_scan_assembler)
|
||||
scan.mo1_bragg.convert_angle_energy = mock.MagicMock(side_effect=[10.0, 14.0])
|
||||
set_status = mock.MagicMock()
|
||||
scan.actions.set = mock.MagicMock(return_value=set_status)
|
||||
scan.actions.stage_all_devices = mock.MagicMock()
|
||||
|
||||
scan.stage()
|
||||
|
||||
assert scan.mo1_bragg.convert_angle_energy.call_args_list == [
|
||||
mock.call(mode="EnergyToAngle", inp=8000.0),
|
||||
mock.call(mode="EnergyToAngle", inp=9000.0),
|
||||
]
|
||||
scan.actions.set.assert_called_once_with(scan.mo1_gonio, 12.0, wait=False)
|
||||
set_status.wait.assert_called_once_with(timeout=30)
|
||||
scan.actions.stage_all_devices.assert_called_once_with()
|
||||
|
||||
|
||||
def test_xas_simple_scan_v4_scan_core_reads_until_complete(v4_scan_assembler, nth_done_status_mock):
|
||||
scan = _assemble_xas_simple_scan(v4_scan_assembler)
|
||||
completion_status = nth_done_status_mock(resolve_after=3)
|
||||
scan.actions.kickoff = mock.MagicMock()
|
||||
scan.actions.complete = mock.MagicMock(return_value=completion_status)
|
||||
scan.actions.read_monitored_devices = mock.MagicMock()
|
||||
|
||||
with mock.patch("superxas_bec.scans.mono_bragg_scans.time.sleep"):
|
||||
scan.scan_core()
|
||||
|
||||
scan.actions.kickoff.assert_called_once_with(scan.motor)
|
||||
scan.actions.complete.assert_called_once_with(scan.motor, wait=False)
|
||||
assert scan.actions.read_monitored_devices.call_count == 2
|
||||
|
||||
|
||||
def test_xas_simple_scan_v4_post_scan_completes_all_devices(v4_scan_assembler):
|
||||
scan = _assemble_xas_simple_scan(v4_scan_assembler)
|
||||
scan.actions.complete_all_devices = mock.MagicMock()
|
||||
|
||||
scan.post_scan()
|
||||
|
||||
scan.actions.complete_all_devices.assert_called_once_with()
|
||||
|
||||
|
||||
def test_xas_advanced_scan_v4_updates_spline_metadata(v4_scan_assembler):
|
||||
scan = v4_scan_assembler(
|
||||
"xas_advanced_scan",
|
||||
start=8000.0,
|
||||
stop=9000.0,
|
||||
scan_time=1.0,
|
||||
scan_duration=10.0,
|
||||
p_kink=50.0,
|
||||
e_kink=8500.0,
|
||||
motor="mo1_bragg",
|
||||
daq="nidaq",
|
||||
gonio="mo1_gonio",
|
||||
)
|
||||
|
||||
assert scan.scan_name == "xas_advanced_scan"
|
||||
assert scan.scan_info.additional_scan_parameters["p_kink"] == 50.0
|
||||
assert scan.scan_info.additional_scan_parameters["e_kink"] == 8500.0
|
||||
|
||||
@@ -0,0 +1,76 @@
|
||||
# pylint: skip-file
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
from bec_server.scan_server.tests.scan_fixtures import *
|
||||
from bec_server.scan_server.tests.scan_hook_tests import *
|
||||
|
||||
NIDAQ_CONTINUOUS_SCAN_DEFAULT_HOOK_TESTS = [
|
||||
("prepare_scan", [assert_prepare_scan_reads_baseline_devices]),
|
||||
("open_scan", [assert_scan_open_called]),
|
||||
("stage", [assert_stage_all_devices_called]),
|
||||
("pre_scan", [assert_pre_scan_called]),
|
||||
("unstage", [assert_unstage_all_devices_called]),
|
||||
("close_scan", [assert_close_scan_waits_for_baseline_and_closes]),
|
||||
]
|
||||
|
||||
|
||||
def _assemble_nidaq_continuous_scan(v4_scan_assembler, **overrides):
|
||||
params = {"scan_duration": 10.0, "daq": "nidaq", "compression": False}
|
||||
params.update(overrides)
|
||||
return v4_scan_assembler("nidaq_continuous_scan", **params)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(("hook_name", "hook_tests"), NIDAQ_CONTINUOUS_SCAN_DEFAULT_HOOK_TESTS)
|
||||
def test_nidaq_continuous_scan_v4_default_hooks(
|
||||
v4_scan_assembler, nth_done_status_mock, hook_name, hook_tests
|
||||
):
|
||||
scan = _assemble_nidaq_continuous_scan(v4_scan_assembler)
|
||||
|
||||
run_scan_tests(scan, [(hook_name, hook_tests)], nth_done_status_mock=nth_done_status_mock)
|
||||
|
||||
|
||||
def test_nidaq_continuous_scan_v4_prepare_scan_updates_metadata(v4_scan_assembler):
|
||||
scan = _assemble_nidaq_continuous_scan(v4_scan_assembler)
|
||||
scan.actions.add_scan_report_instruction_device_progress = mock.MagicMock()
|
||||
baseline_status = mock.MagicMock()
|
||||
scan.actions.read_baseline_devices = mock.MagicMock(return_value=baseline_status)
|
||||
|
||||
scan.prepare_scan()
|
||||
scan.actions._build_scan_status_message("open")
|
||||
|
||||
assert scan.scan_info.additional_scan_parameters["scan_duration"] == 10.0
|
||||
assert scan.scan_info.additional_scan_parameters["compression"] is False
|
||||
assert scan.scan_info.readout_priority_modification["async"] == ["nidaq"]
|
||||
scan.actions.add_scan_report_instruction_device_progress.assert_called_once_with(scan.daq)
|
||||
scan.actions.read_baseline_devices.assert_called_once_with(wait=False)
|
||||
assert scan._baseline_readout_status is baseline_status
|
||||
|
||||
|
||||
def test_nidaq_continuous_scan_v4_scan_core_reads_until_complete(
|
||||
v4_scan_assembler, nth_done_status_mock
|
||||
):
|
||||
scan = _assemble_nidaq_continuous_scan(v4_scan_assembler)
|
||||
kickoff_status = mock.MagicMock()
|
||||
completion_status = nth_done_status_mock(resolve_after=3)
|
||||
scan.actions.kickoff = mock.MagicMock(return_value=kickoff_status)
|
||||
scan.actions.complete = mock.MagicMock(return_value=completion_status)
|
||||
scan.actions.read_monitored_devices = mock.MagicMock()
|
||||
|
||||
with mock.patch("superxas_bec.scans.nidaq_cont_scan.time.sleep"):
|
||||
scan.scan_core()
|
||||
|
||||
scan.actions.kickoff.assert_called_once_with(device=scan.daq, wait=False)
|
||||
kickoff_status.wait.assert_called_once_with(timeout=5)
|
||||
scan.actions.complete.assert_called_once_with(device=scan.daq, wait=False)
|
||||
assert scan.actions.read_monitored_devices.call_count == 2
|
||||
|
||||
|
||||
def test_nidaq_continuous_scan_v4_post_scan_completes_all_devices(v4_scan_assembler):
|
||||
scan = _assemble_nidaq_continuous_scan(v4_scan_assembler)
|
||||
scan.actions.complete_all_devices = mock.MagicMock()
|
||||
|
||||
scan.post_scan()
|
||||
|
||||
scan.actions.complete_all_devices.assert_called_once_with()
|
||||
|
||||
Reference in New Issue
Block a user