Compare commits

...

15 Commits

Author SHA1 Message Date
x01da 40309491b0 fix: nexus file structure for nidaq continuous scan
CI for debye_bec / test (pull_request) Successful in 1m1s
CI for debye_bec / test (push) Successful in 57s
2026-05-27 14:23:21 +02:00
appel_c 2f0265fff7 refactor: deprecate old nidaq_cont_scan implementation
CI for debye_bec / test (pull_request) Successful in 53s
CI for debye_bec / test (push) Successful in 56s
2026-05-27 13:23:43 +02:00
x01da 2d21eb90fe fix: fix scan_done PV with RBV
CI for debye_bec / test (pull_request) Successful in 53s
CI for debye_bec / test (push) Successful in 56s
2026-05-27 11:09:30 +02:00
x01da 8ddf67e817 fix: catch positions is None
CI for debye_bec / test (push) Successful in 1m3s
CI for debye_bec / test (pull_request) Successful in 1m3s
2026-05-27 10:14:49 +02:00
x01da 617cca71a5 fix: nexus structure safe guards.
CI for debye_bec / test (push) Successful in 56s
CI for debye_bec / test (pull_request) Successful in 56s
2026-05-27 10:06:21 +02:00
x01da 7e20d46881 fix: fix bug in mo1_bragg stage
CI for debye_bec / test (pull_request) Successful in 56s
CI for debye_bec / test (push) Successful in 57s
2026-05-27 09:09:45 +02:00
appel_c 74ff173f98 refactor: cleanup, fix tests
CI for debye_bec / test (pull_request) Successful in 53s
CI for debye_bec / test (push) Successful in 1m1s
2026-05-27 08:30:05 +02:00
x01da 87758710d9 fix: fixes from tests at the beamline 2026-05-27 08:30:05 +02:00
appel_c 8bc36ed6a2 refactor(mo1-bragg): migrate mo1_bragg to scans v4 2026-05-27 08:30:05 +02:00
appel_c 78d58ad26f refactor(hutch-cam): migrate hutch cam to scans v4 2026-05-27 08:30:05 +02:00
appel_c 359ef0b6d7 refactor(nidaq): migrate nidaq to scans v4 2026-05-22 10:10:20 +02:00
appel_c 262a0b6318 refactor(pilatus): migrate to scans v4 interface 2026-05-22 10:04:49 +02:00
appel_c 98d5c22667 refactor(nidaq): migrate NIDAQ to v4 scan_info 2026-05-21 17:41:49 +02:00
wakonig_k 0e77dd5679 feat: add NIDAQ continuous scan v4 implementation and update related tests
CI for debye_bec / test (pull_request) Successful in 54s
CI for debye_bec / test (push) Successful in 56s
2026-05-21 17:16:50 +02:00
wakonig_k f8e5b5e073 feat: migrate XAS simple scan to V4 implementation and remove mono bragg scans 2026-05-21 17:16:50 +02:00
20 changed files with 1196 additions and 1306 deletions
+19 -12
View File
@@ -2,14 +2,17 @@
from __future__ import annotations
import cv2
import threading
from typing import TYPE_CHECKING
from bec_lib.logger import bec_logger
import cv2
from bec_lib.file_utils import get_full_path
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from ophyd_devices import DeviceStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from debye_bec.devices.utils.utils import fetch_scan_info
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
@@ -21,6 +24,7 @@ CAM_USERNAME = "camera_user"
CAM_PASSWORD = "camera_user1"
CAM_PORT = 554
class HutchCam(PSIDeviceBase):
"""Class for the Hutch Cameras"""
@@ -28,7 +32,7 @@ class HutchCam(PSIDeviceBase):
def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
super().__init__(name=name, scan_info=scan_info, **kwargs)
self.scan_parameters: ScanServerScanInfo = None
self.hostname = prefix
self.status = None
@@ -47,33 +51,36 @@ class HutchCam(PSIDeviceBase):
def on_stage(self) -> DeviceStatus:
"""Called while staging the device."""
scan_msg: ScanStatusMessage = self.scan_info.msg
file_path = get_full_path(scan_msg, name='hutch_cam_' + self.hostname).removesuffix('h5')
self.scan_parameters = fetch_scan_info(self.scan_info)
file_path = get_full_path(self.scan_info, name="hutch_cam_" + self.hostname).removesuffix(
"h5"
)
self.status = DeviceStatus(self)
thread = threading.Thread(target=self._save_picture, args=(file_path, self.status), daemon=True)
thread = threading.Thread(
target=self._save_picture, args=(file_path, self.status), daemon=True
)
thread.start()
return self.status
def _save_picture(self, file_path, status):
try:
logger.info(f'Capture from camera {self.hostname}')
logger.info(f"Capture from camera {self.hostname}")
rtsp_url = f"rtsp://{CAM_USERNAME}:{CAM_PASSWORD}@{self.hostname}.psi.ch:{CAM_PORT}/rtpstream/config1"
cap = cv2.VideoCapture(f"{rtsp_url}?tcp")
if not cap.isOpened():
logger.error("Connection Failed", "Could not connect to the camera stream.")
return
logger.info(f'Connection to camera {self.hostname} established')
logger.info(f"Connection to camera {self.hostname} established")
ret, frame = cap.readAsync()
cap.release()
if not ret:
logger.error("Capture Failed", "Failed to capture image from camera.")
return
cv2.imwrite(file_path + 'png', frame)
cv2.imwrite(file_path + "png", frame)
status.set_finished()
logger.info(f'Capture from camera {self.hostname} done')
logger.info(f"Capture from camera {self.hostname} done")
except Exception as e:
status.set_exception(e)
+192 -122
View File
@@ -13,6 +13,7 @@ from typing import Literal
from bec_lib.devicemanager import ScanInfo
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from ophyd import Component as Cpt
from ophyd import DeviceStatus, StatusBase
from ophyd.status import WaitTimeoutError
@@ -33,6 +34,7 @@ from debye_bec.devices.mo1_bragg.mo1_bragg_enums import (
TriggerControlSource,
)
from debye_bec.devices.mo1_bragg.mo1_bragg_utils import compute_spline
from debye_bec.devices.utils.utils import fetch_scan_info
# Initialise logger
logger = bec_logger.logger
@@ -44,36 +46,6 @@ class Mo1BraggError(Exception):
"""Exception for the Mo1 Bragg positioner"""
########## Scan Parameter Model ##########
class ScanParameter(BaseModel):
"""Dataclass to store the scan parameters for the Mo1 Bragg positioner.
This needs to be in sync with the kwargs of the MO1 Bragg scans from Debye, to
ensure that the scan parameters are correctly set. Any changes in the scan kwargs,
i.e. renaming or adding new parameters, need to be represented here as well."""
scan_time: float | None = Field(None, description="Scan time for a half oscillation")
scan_duration: float | None = Field(None, description="Duration of the scan")
break_enable_low: bool | None = Field(
None, description="Break enabled for low, should be PV trig_ena_lo_enum"
) # trig_enable_low: bool = None
break_enable_high: bool | None = Field(
None, description="Break enabled for high, should be PV trig_ena_hi_enum"
) # trig_enable_high: bool = None
break_time_low: float | None = Field(None, description="Break time low energy/angle")
break_time_high: float | None = Field(None, description="Break time high energy/angle")
cycle_low: int | None = Field(None, description="Cycle for low energy/angle")
cycle_high: int | None = Field(None, description="Cycle for high energy/angle")
exp_time: float | None = Field(None, description="XRD trigger period")
n_of_trigger: int | None = Field(None, description="Amount of XRD triggers")
start: float | None = Field(None, description="Start value for energy/angle")
stop: float | None = Field(None, description="Stop value for energy/angle")
p_kink: float | None = Field(None, description="P Kink")
e_kink: float | None = Field(None, description="Energy Kink")
model_config: dict = {"validate_assignment": True}
########### Mo1 Bragg Motor Class ###########
@@ -85,7 +57,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
progress_signal = Cpt(ProgressSignal, name="progress_signal")
USER_ACCESS = ["set_advanced_xas_settings", "set_xtal"]
USER_ACCESS = ["set_advanced_xas_settings", "set_xtal", "convert_angle_energy"]
def __init__(self, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs): # type: ignore
"""
@@ -96,8 +68,15 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
scan_info (ScanInfo): The scan info to use.
"""
super().__init__(name=name, scan_info=scan_info, prefix=prefix, **kwargs)
self.scan_parameter = ScanParameter()
self.scan_parameters: ScanServerScanInfo = None
self.timeout_for_pvwait = 7.5
self.valid_scan_names = [
"xas_simple_scan",
"xas_simple_scan_with_xrd",
"xas_advanced_scan",
"xas_advanced_scan_with_xrd",
"nidaq_continuous_scan",
]
########################################
# Beamline Specific Implementations #
@@ -124,94 +103,187 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
"""
self.scan_parameters = fetch_scan_info(self.scan_info)
if self.scan_control.scan_msg.get() != ScanControlLoadMessage.PENDING:
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
self.cancel_on_stop(status)
self.scan_control.scan_val_reset.put(1)
status.wait(timeout=self.timeout_for_pvwait)
scan_name = self.scan_info.msg.scan_name
self._update_scan_parameter()
if scan_name == "xas_simple_scan":
self.set_xas_settings(
low=self.scan_parameter.start,
high=self.scan_parameter.stop,
scan_time=self.scan_parameter.scan_time,
)
self.set_trig_settings(
enable_low=False,
enable_high=False,
break_time_low=0,
break_time_high=0,
cycle_low=0,
cycle_high=0,
exp_time=0,
n_of_trigger=0,
)
self.set_scan_control_settings(
mode=ScanControlMode.SIMPLE, scan_duration=self.scan_parameter.scan_duration
)
elif scan_name == "xas_simple_scan_with_xrd":
self.set_xas_settings(
low=self.scan_parameter.start,
high=self.scan_parameter.stop,
scan_time=self.scan_parameter.scan_time,
)
self.set_trig_settings(
enable_low=self.scan_parameter.break_enable_low,
enable_high=self.scan_parameter.break_enable_high,
break_time_low=self.scan_parameter.break_time_low,
break_time_high=self.scan_parameter.break_time_high,
cycle_low=self.scan_parameter.cycle_low,
cycle_high=self.scan_parameter.cycle_high,
exp_time=self.scan_parameter.exp_time,
n_of_trigger=self.scan_parameter.n_of_trigger,
)
self.set_scan_control_settings(
mode=ScanControlMode.SIMPLE, scan_duration=self.scan_parameter.scan_duration
)
elif scan_name == "xas_advanced_scan":
self.set_advanced_xas_settings(
low=self.scan_parameter.start,
high=self.scan_parameter.stop,
scan_time=self.scan_parameter.scan_time,
p_kink=self.scan_parameter.p_kink,
e_kink=self.scan_parameter.e_kink,
)
self.set_trig_settings(
enable_low=False,
enable_high=False,
break_time_low=0,
break_time_high=0,
cycle_low=0,
cycle_high=0,
exp_time=0,
n_of_trigger=0,
)
self.set_scan_control_settings(
mode=ScanControlMode.ADVANCED, scan_duration=self.scan_parameter.scan_duration
)
elif scan_name == "xas_advanced_scan_with_xrd":
self.set_advanced_xas_settings(
low=self.scan_parameter.start,
high=self.scan_parameter.stop,
scan_time=self.scan_parameter.scan_time,
p_kink=self.scan_parameter.p_kink,
e_kink=self.scan_parameter.e_kink,
)
self.set_trig_settings(
enable_low=self.scan_parameter.break_enable_low,
enable_high=self.scan_parameter.break_enable_high,
break_time_low=self.scan_parameter.break_time_low,
break_time_high=self.scan_parameter.break_time_high,
cycle_low=self.scan_parameter.cycle_low,
cycle_high=self.scan_parameter.cycle_high,
exp_time=self.scan_parameter.exp_time,
n_of_trigger=self.scan_parameter.n_of_trigger,
)
self.set_scan_control_settings(
mode=ScanControlMode.ADVANCED, scan_duration=self.scan_parameter.scan_duration
scan_name = self.scan_parameters.scan_name
if self._check_if_scan_name_is_valid(self.scan_parameters):
if self.scan_parameters.positions is not None:
start, stop = (
self.scan_parameters.positions
if len(self.scan_parameters.positions) == 2
else (None, None)
)
else:
start, stop = (None, None)
scan_time = self.scan_parameters.additional_scan_parameters.get("scan_time", None)
scan_duration = self.scan_parameters.additional_scan_parameters.get(
"scan_duration", None
)
if scan_name == "xas_simple_scan":
if any(param is None for param in [start, stop, scan_time, scan_duration]):
raise Mo1BraggError(
f"Missing scan parameters for xas_simple_scan. Required parameters: start, stop, scan_time, scan_duration in additional_scan_parameters dict {self.scan_parameters.additional_scan_parameters}"
)
self.set_xas_settings(low=start, high=stop, scan_time=scan_time)
self.set_trig_settings(
enable_low=False,
enable_high=False,
break_time_low=0,
break_time_high=0,
cycle_low=0,
cycle_high=0,
exp_time=0,
n_of_trigger=0,
)
self.set_scan_control_settings(
mode=ScanControlMode.SIMPLE, scan_duration=scan_duration
)
elif scan_name == "xas_simple_scan_with_xrd":
break_enable_low = self.scan_parameters.additional_scan_parameters.get(
"break_enable_low", None
)
break_enable_high = self.scan_parameters.additional_scan_parameters.get(
"break_enable_high", None
)
break_time_low = self.scan_parameters.additional_scan_parameters.get(
"break_time_low", None
)
break_time_high = self.scan_parameters.additional_scan_parameters.get(
"break_time_high", None
)
cycle_low = self.scan_parameters.additional_scan_parameters.get("cycle_low", None)
cycle_high = self.scan_parameters.additional_scan_parameters.get("cycle_high", None)
exp_time = self.scan_parameters.exp_time
n_of_trigger = self.scan_parameters.additional_scan_parameters.get(
"n_of_trigger", None
)
if any(
param is None
for param in [
start,
stop,
scan_time,
scan_duration,
break_enable_low,
break_enable_high,
break_time_low,
break_time_high,
cycle_low,
cycle_high,
exp_time,
n_of_trigger,
]
):
raise Mo1BraggError(
f"Missing scan parameters for xas_simple_scan_with_xrd. Required parameters: start, stop, scan_time, scan_duration, break_enable_low, break_enable_high, break_time_low, break_time_high, cycle_low, cycle_high, exp_time, n_of_trigger in additional_scan_parameters dict {self.scan_parameters.additional_scan_parameters}"
)
self.set_xas_settings(low=start, high=stop, scan_time=scan_time)
self.set_trig_settings(
enable_low=break_enable_low,
enable_high=break_enable_high,
break_time_low=break_time_low,
break_time_high=break_time_high,
cycle_low=cycle_low,
cycle_high=cycle_high,
exp_time=exp_time,
n_of_trigger=n_of_trigger,
)
self.set_scan_control_settings(
mode=ScanControlMode.SIMPLE, scan_duration=scan_duration
)
elif scan_name == "xas_advanced_scan":
p_kink = self.scan_parameters.additional_scan_parameters.get("p_kink", None)
e_kink = self.scan_parameters.additional_scan_parameters.get("e_kink", None)
if any(
param is None
for param in [start, stop, scan_time, scan_duration, p_kink, e_kink]
):
raise Mo1BraggError(
f"Missing scan parameters for xas_advanced_scan. Required parameters: start, stop, scan_time, scan_duration, p_kink, e_kink in additional_scan_parameters dict {self.scan_parameters.additional_scan_parameters}"
)
self.set_advanced_xas_settings(
low=start, high=stop, scan_time=scan_time, p_kink=p_kink, e_kink=e_kink
)
self.set_trig_settings(
enable_low=False,
enable_high=False,
break_time_low=0,
break_time_high=0,
cycle_low=0,
cycle_high=0,
exp_time=0,
n_of_trigger=0,
)
self.set_scan_control_settings(
mode=ScanControlMode.ADVANCED, scan_duration=scan_duration
)
elif scan_name == "xas_advanced_scan_with_xrd":
p_kink = self.scan_parameters.additional_scan_parameters.get("p_kink", None)
e_kink = self.scan_parameters.additional_scan_parameters.get("e_kink", None)
break_enable_low = self.scan_parameters.additional_scan_parameters.get(
"break_enable_low", None
)
break_enable_high = self.scan_parameters.additional_scan_parameters.get(
"break_enable_high", None
)
break_time_low = self.scan_parameters.additional_scan_parameters.get(
"break_time_low", None
)
break_time_high = self.scan_parameters.additional_scan_parameters.get(
"break_time_high", None
)
cycle_low = self.scan_parameters.additional_scan_parameters.get("cycle_low", None)
cycle_high = self.scan_parameters.additional_scan_parameters.get("cycle_high", None)
exp_time = self.scan_parameters.exp_time
n_of_trigger = self.scan_parameters.additional_scan_parameters.get(
"n_of_trigger", None
)
if any(
param is None
for param in [
start,
stop,
scan_time,
scan_duration,
p_kink,
e_kink,
break_enable_low,
break_enable_high,
break_time_low,
break_time_high,
cycle_low,
cycle_high,
exp_time,
n_of_trigger,
]
):
raise Mo1BraggError(
f"Missing scan parameters for xas_advanced_scan_with_xrd. Required parameters: start, stop, scan_time, scan_duration, p_kink, e_kink, break_enable_low, break_enable_high, break_time_low, break_time_high, cycle_low, cycle_high, exp_time, n_of_trigger in additional_scan_parameters dict {self.scan_parameters.additional_scan_parameters}"
)
self.set_advanced_xas_settings(
low=start, high=stop, scan_time=scan_time, p_kink=p_kink, e_kink=e_kink
)
self.set_trig_settings(
enable_low=break_enable_low,
enable_high=break_enable_high,
break_time_low=break_time_low,
break_time_high=break_time_high,
cycle_low=cycle_low,
cycle_high=cycle_high,
exp_time=exp_time,
n_of_trigger=n_of_trigger,
)
self.set_scan_control_settings(
mode=ScanControlMode.ADVANCED, scan_duration=scan_duration
)
else:
return # Should never happen.
else:
return
# Setting scan duration seems to lag behind slightly in the backend, include small sleep
@@ -291,6 +363,13 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
self.stopped = True # Needs to be set to stop motion
######### Utility Methods #########
def _check_if_scan_name_is_valid(self, scan_parameters: ScanServerScanInfo) -> bool:
"""Check if the scan is within the list of scans for which the backend is working"""
if scan_parameters.scan_name in self.valid_scan_names:
return True
return False
def _progress_update(self, value, **kwargs) -> None:
"""Callback method to update the scan progress, runs a callback
to SUB_PROGRESS subscribers, i.e. BEC.
@@ -468,12 +547,3 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
for s in status_list:
s.wait(timeout=self.timeout_for_pvwait)
def _update_scan_parameter(self):
"""Get the scan_info parameters for the scan."""
for key, value in self.scan_info.msg.request_inputs["inputs"].items():
if hasattr(self.scan_parameter, key):
setattr(self.scan_parameter, key, value)
for key, value in self.scan_info.msg.request_inputs["kwargs"].items():
if hasattr(self.scan_parameter, key):
setattr(self.scan_parameter, key, value)
@@ -182,7 +182,7 @@ class Mo1TriggerSettings(Device):
class Mo1BraggCalculator(Device):
"""Mo1 Bragg PVs to convert angle to energy or vice-versa."""
calc_reset = Cpt(EpicsSignal, suffix="calc_reset", kind="config", put_complete=True)
calc_reset = Cpt(EpicsSignalWithRBV, suffix="calc_reset", kind="config", put_complete=True)
calc_done = Cpt(EpicsSignalRO, suffix="calc_done_RBV", kind="config")
calc_energy = Cpt(EpicsSignalWithRBV, suffix="calc_energy", kind="config")
calc_angle = Cpt(EpicsSignalWithRBV, suffix="calc_angle", kind="config")
+22 -18
View File
@@ -3,6 +3,7 @@ from __future__ import annotations
from typing import TYPE_CHECKING, Literal
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO, Kind, StatusBase
from ophyd.status import WaitTimeoutError
@@ -18,6 +19,7 @@ from debye_bec.devices.nidaq.nidaq_enums import (
ScanRates,
ScanType,
)
from debye_bec.devices.utils.utils import fetch_scan_info
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
@@ -219,7 +221,7 @@ class Nidaq(PSIDeviceBase, NidaqControl):
def __init__(self, prefix: str = "", *, name: str, scan_info: ScanInfo = None, **kwargs):
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self.scan_info: ScanInfo
self.scan_parameters: ScanServerScanInfo = None
self.timeout_wait_for_signal = 5 # put 5s firsts
self._timeout_wait_for_pv = (
5 # 5s timeout for pv calls. editted due to timeout issues persisting
@@ -236,10 +238,9 @@ class Nidaq(PSIDeviceBase, NidaqControl):
# Beamline Methods #
########################################
def _check_if_scan_name_is_valid(self) -> bool:
def _check_if_scan_name_is_valid(self, scan_parameters: ScanServerScanInfo) -> bool:
"""Check if the scan is within the list of scans for which the backend is working"""
scan_name = self.scan_info.msg.scan_name
if scan_name in self.valid_scan_names:
if scan_parameters.scan_name in self.valid_scan_names:
return True
return False
@@ -396,7 +397,8 @@ class Nidaq(PSIDeviceBase, NidaqControl):
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
If the upcoming scan is not in the list of valid scans, return immediately.
"""
if not self._check_if_scan_name_is_valid():
self.scan_parameters = fetch_scan_info(self.scan_info)
if not self._check_if_scan_name_is_valid(self.scan_parameters):
return None
if self.state.get() != NidaqState.STANDBY:
@@ -406,18 +408,18 @@ class Nidaq(PSIDeviceBase, NidaqControl):
status.wait(timeout=self.timeout_wait_for_signal)
# If scan is not part of the valid_scan_names,
if self.scan_info.msg.scan_name != "nidaq_continuous_scan":
if self.scan_parameters.scan_name != "nidaq_continuous_scan": # what is the new v4 scan
self.scan_type.set(ScanType.TRIGGERED).wait(timeout=self._timeout_wait_for_pv)
self.scan_duration.set(0).wait(timeout=self._timeout_wait_for_pv)
self.enable_compression.set(1).wait(timeout=self._timeout_wait_for_pv)
else:
self.scan_type.set(ScanType.CONTINUOUS).wait(timeout=self._timeout_wait_for_pv)
self.scan_duration.set(self.scan_info.msg.scan_parameters["scan_duration"]).wait(
timeout=self._timeout_wait_for_pv
)
self.enable_compression.set(self.scan_info.msg.scan_parameters["compression"]).wait(
timeout=self._timeout_wait_for_pv
)
self.scan_duration.set(
self.scan_parameters.additional_scan_parameters["scan_duration"]
).wait(timeout=self._timeout_wait_for_pv)
self.enable_compression.set(
self.scan_parameters.additional_scan_parameters["compression"]
).wait(timeout=self._timeout_wait_for_pv)
# Stage call to IOC
status = CompareStatus(self.state, NidaqState.STAGE)
@@ -428,7 +430,7 @@ class Nidaq(PSIDeviceBase, NidaqControl):
# self.stage_call.set(1).wait(timeout=self._timeout_wait_for_pv)
self.stage_call.put(1)
status.wait(timeout=self.timeout_wait_for_signal)
if self.scan_info.msg.scan_name != "nidaq_continuous_scan":
if self.scan_parameters.scan_name != "nidaq_continuous_scan":
status = self.on_kickoff()
self.cancel_on_stop(status)
status.wait(timeout=self._timeout_wait_for_pv)
@@ -459,10 +461,10 @@ class Nidaq(PSIDeviceBase, NidaqControl):
before the motor starts its oscillation. This is needed for being properly homed.
The NIDAQ should go into Acquiring mode.
"""
if not self._check_if_scan_name_is_valid():
if not self._check_if_scan_name_is_valid(self.scan_parameters):
return None
if self.scan_info.msg.scan_name == "nidaq_continuous_scan":
if self.scan_parameters.scan_name == "nidaq_continuous_scan":
logger.info(f"Device {self.name} ready to be kicked off for nidaq_continuous_scan")
return None
@@ -483,12 +485,12 @@ class Nidaq(PSIDeviceBase, NidaqControl):
For the NIDAQ we use this method to stop the backend since it
would not stop by itself in its current implementation since the number of points are not predefined.
"""
if not self._check_if_scan_name_is_valid():
if not self._check_if_scan_name_is_valid(self.scan_parameters):
return None
status = CompareStatus(self.state, NidaqState.STANDBY)
self.cancel_on_stop(status)
if self.scan_info.msg.scan_name != "nidaq_continuous_scan":
if self.scan_parameters.scan_name != "nidaq_continuous_scan":
self.on_stop()
return status
@@ -499,7 +501,9 @@ class Nidaq(PSIDeviceBase, NidaqControl):
Args:
value (int) : current progress value
"""
scan_duration = self.scan_info.msg.scan_parameters.get("scan_duration", None)
if self.scan_parameters is None:
return
scan_duration = self.scan_parameters.additional_scan_parameters.get("scan_duration", None)
if not isinstance(scan_duration, (int, float)):
return
value = scan_duration - value
+79 -60
View File
@@ -11,16 +11,26 @@ from typing import TYPE_CHECKING, Tuple
import numpy as np
from bec_lib.file_utils import get_full_path
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from ophyd import Component as Cpt
from ophyd import EpicsSignal, EpicsSignalRO, Kind
from ophyd.areadetector.cam import ADBase, PilatusDetectorCam
from ophyd.areadetector.plugins import HDF5Plugin_V22 as HDF5Plugin
from ophyd.areadetector.plugins import ImagePlugin_V22 as ImagePlugin
from ophyd.status import WaitTimeoutError
from ophyd_devices import AndStatus, CompareStatus, DeviceStatus, FileEventSignal, PreviewSignal
from ophyd_devices import (
AndStatus,
CompareStatus,
DeviceStatus,
ExceptionStatus,
FileEventSignal,
PreviewSignal,
)
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from pydantic import BaseModel, Field
from debye_bec.devices.utils.utils import fetch_scan_info
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
from bec_lib.messages import DevicePreviewMessage, ScanStatusMessage
@@ -145,17 +155,17 @@ class Pilatus(PSIDeviceBase, ADBase):
# USER_ACCESS = ["start_live_mode", "stop_live_mode"]
cam_gain_menu_string = Cpt(EpicsSignalRO, suffix='cam1:GainMenu', string=True)
cam_gain_menu_string = Cpt(EpicsSignalRO, suffix="cam1:GainMenu", string=True)
_default_configuration_attrs = [
'cam.threshold_energy',
'cam.threshold_auto_apply',
'cam.gain_menu',
'cam_gain_menu_string',
'cam.pixel_cut_off',
'cam.acquire_time',
'cam.num_exposures',
'cam.model',
"cam.threshold_energy",
"cam.threshold_auto_apply",
"cam.gain_menu",
"cam_gain_menu_string",
"cam.pixel_cut_off",
"cam.acquire_time",
"cam.num_exposures",
"cam.model",
]
cam = Cpt(PilatusDetectorCam, "cam1:")
@@ -233,7 +243,6 @@ class Pilatus(PSIDeviceBase, ADBase):
super().__init__(
name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs
)
self.scan_parameter = ScanParameter()
self.device_manager = device_manager
self._readout_time = PILATUS_READOUT_TIME
self._full_path = ""
@@ -251,6 +260,7 @@ class Pilatus(PSIDeviceBase, ADBase):
# self._live_mode_run_event = threading.Event()
# self._live_mode_stopped_event = threading.Event()
# self._live_mode_stopped_event.set() # Initial state is stopped
self.scan_parameters: ScanServerScanInfo = None
########################################
# Custom Beamline Methods #
@@ -368,19 +378,22 @@ class Pilatus(PSIDeviceBase, ADBase):
status = status_acquire & status_writing & status_cam_server
return status
def _calculate_trigger(self, scan_msg: ScanStatusMessage) -> Tuple[float, float]:
self._update_scan_parameter()
def _calculate_trigger(self, scan_parameters: ScanServerScanInfo) -> Tuple[float, float]:
total_osc = 0
calc_duration = 0
total_trig_lo = 0
total_trig_hi = 0
# Switching high/low is intended as angle is inverse to energy and settings in BEC are always in energy
loc_break_enable_low = self.scan_parameter.break_enable_high
loc_break_time_low = self.scan_parameter.break_time_high
loc_cycle_low = self.scan_parameter.cycle_high
loc_break_enable_high = self.scan_parameter.break_enable_low
loc_break_time_high = self.scan_parameter.break_time_low
loc_cycle_high = self.scan_parameter.cycle_low
loc_break_enable_low = scan_parameters.additional_scan_parameters.get(
"break_enable_high", False
)
loc_break_time_low = scan_parameters.additional_scan_parameters.get("break_time_high", 0)
loc_cycle_low = scan_parameters.additional_scan_parameters.get("cycle_high", 1)
loc_break_enable_high = scan_parameters.additional_scan_parameters.get(
"break_enable_low", False
)
loc_break_time_high = scan_parameters.additional_scan_parameters.get("break_time_low", 0)
loc_cycle_high = scan_parameters.additional_scan_parameters.get("cycle_low", 1)
if not loc_break_enable_low:
loc_break_time_low = 0
@@ -389,28 +402,36 @@ class Pilatus(PSIDeviceBase, ADBase):
loc_break_time_high = 0
loc_cycle_high = 1
total_osc = self.scan_parameter.scan_duration / (
self.scan_parameter.scan_time +
loc_break_time_low / (2 * loc_cycle_low) +
loc_break_time_high / (2 * loc_cycle_high)
total_osc = scan_parameters.additional_scan_parameters.get("scan_duration", 0) / (
scan_parameters.additional_scan_parameters.get("scan_time", 0)
+ loc_break_time_low / (2 * loc_cycle_low)
+ loc_break_time_high / (2 * loc_cycle_high)
)
total_osc = np.ceil(total_osc)
total_osc = total_osc + total_osc % 2 # round up to the next even number
total_osc = total_osc + total_osc % 2 # round up to the next even number
if loc_break_enable_low:
total_trig_lo = np.floor(total_osc / (2 * loc_cycle_low))
if loc_break_enable_high:
total_trig_hi = np.floor(total_osc / (2 * loc_cycle_high))
calc_duration = total_osc * self.scan_parameter.scan_time + total_trig_lo * loc_break_time_low + total_trig_hi * loc_break_time_high
if calc_duration < self.scan_parameter.scan_duration:
calc_duration = (
total_osc * scan_parameters.additional_scan_parameters.get("scan_time", 0)
+ total_trig_lo * loc_break_time_low
+ total_trig_hi * loc_break_time_high
)
if calc_duration < scan_parameters.additional_scan_parameters.get("scan_duration", 0):
# Due to inaccuracy in formula, this can happen, we then need to manually add two oscillations and recalculate the triggers
total_osc = total_osc + 2
if loc_break_enable_low:
total_trig_lo = np.floor(total_osc / (2 * loc_cycle_low))
if loc_break_enable_high:
total_trig_hi = np.floor(total_osc / (2 * loc_cycle_high))
calc_duration = total_osc * self.scan_parameter.scan_time + total_trig_lo * loc_break_time_low + total_trig_hi * loc_break_time_high
calc_duration = (
total_osc * scan_parameters.additional_scan_parameters.get("scan_time", 0)
+ total_trig_lo * loc_break_time_low
+ total_trig_hi * loc_break_time_high
)
return total_trig_lo, total_trig_hi
@@ -464,6 +485,7 @@ class Pilatus(PSIDeviceBase, ADBase):
(self.scan_info.msg) object.
"""
# self.stop_live_mode() # Make sure that live mode is stopped if scan runs
self.scan_parameters = fetch_scan_info(self.scan_info)
# If user has activated alignment mode on qt panel, switch back to multitrigger and stop acquisition
if self.cam.trigger_mode.get() != TRIGGERMODE.MULT_TRIGGER.value:
@@ -473,25 +495,29 @@ class Pilatus(PSIDeviceBase, ADBase):
status_cam = CompareStatus(self.cam.acquire, ACQUIREMODE.DONE.value)
status_cam.wait(timeout=5)
scan_msg: ScanStatusMessage = self.scan_info.msg
if scan_msg.scan_name in self.xas_xrd_scan_names:
self._update_scan_parameter()
if self.scan_parameters.scan_name in self.xas_xrd_scan_names:
# Compute number of triggers
total_trig_lo, total_trig_hi = self._calculate_trigger(scan_msg)
total_trig_lo, total_trig_hi = self._calculate_trigger(self.scan_parameters)
# Set the number of images, we may also set this to a higher values if preferred and stop the acquisition
# TODO This logic is prone to errors, as we rely on the scans to nicely resolve to n_images. We should
# use here instead a way of settings the n_images independently of the scan parameters to avoid running out of sync
# with the complete method. Ideally we comput them in the scan itself.. This is much safer IMO!
self.n_images = (total_trig_lo + total_trig_hi) * self.scan_parameter.n_of_trigger
exp_time = self.scan_parameter.exp_time
self.n_images = (
total_trig_lo + total_trig_hi
) * self.scan_parameters.additional_scan_parameters.get("n_of_trigger", 1)
exp_time = self.scan_parameters.exp_time
self.trigger_source.set(MONOTRIGGERSOURCE.INPOS).wait(5)
self.trigger_n_of.set(self.scan_parameter.n_of_trigger).wait(5)
elif scan_msg.scan_type == "step":
self.n_images = scan_msg.num_points * scan_msg.scan_parameters.get(
"frames_per_trigger", 1
self.trigger_n_of.set(
self.scan_parameters.additional_scan_parameters.get("n_of_trigger", 1)
).wait(5)
# TODO migrate logic to v4 once old scans are deprecated,
# TODO if num_points=None and no logic from scan_name applies, can't measure with this detector..
elif self.scan_parameters.scan_type == "software_triggered":
self.n_images = (
self.scan_parameters.num_monitored_readouts
* self.scan_parameters.frames_per_trigger
)
exp_time = scan_msg.scan_parameters.get("exp_time")
exp_time = self.scan_parameters.exp_time
self.trigger_source.set(MONOTRIGGERSOURCE.EPICS).wait(5)
self.trigger_n_of.set(1).wait(5) # BEC will trigger each acquisition
else:
@@ -512,7 +538,7 @@ class Pilatus(PSIDeviceBase, ADBase):
)
)
detector_exp_time = exp_time - self._readout_time
self._full_path = get_full_path(scan_msg, name="pilatus")
self._full_path = get_full_path(self.scan_info.msg, name="pilatus")
file_path = "/".join(self._full_path.split("/")[:-1])
file_name = self._full_path.split("/")[-1]
# Prepare detector and backend
@@ -544,9 +570,9 @@ class Pilatus(PSIDeviceBase, ADBase):
def on_pre_scan(self) -> DeviceStatus | None:
"""Called right before the scan starts on all devices automatically."""
scan_msg: ScanStatusMessage = self.scan_info.msg
if (
scan_msg.scan_name in self.xas_xrd_scan_names or scan_msg.scan_type == "step"
self.scan_parameters.scan_name in self.xas_xrd_scan_names
or self.scan_parameters.scan_type == "software_triggered"
): # TODO how to deal with fly scans?
status_hdf = CompareStatus(self.hdf.capture, ACQUIREMODE.ACQUIRING.value)
status_cam = CompareStatus(self.cam.acquire, ACQUIREMODE.ACQUIRING.value)
@@ -561,8 +587,7 @@ class Pilatus(PSIDeviceBase, ADBase):
def on_trigger(self) -> DeviceStatus | None:
"""Called when the device is triggered."""
scan_msg: ScanStatusMessage = self.scan_info.msg
if not scan_msg.scan_type == "step":
if not self.scan_parameters.scan_type == "software_triggered":
return None
start_time = time.time()
img_counter = self.hdf.num_captured.get()
@@ -575,9 +600,9 @@ class Pilatus(PSIDeviceBase, ADBase):
def _complete_callback(self, status: DeviceStatus):
"""Callback for when the device completes a scan."""
scan_msg: ScanStatusMessage = self.scan_info.msg
if (
scan_msg.scan_name in self.xas_xrd_scan_names or scan_msg.scan_type == "step"
self.scan_parameters.scan_name in self.xas_xrd_scan_names
or self.scan_parameters.scan_type == "software_triggered"
): # TODO how to deal with fly scans?
if status.success:
self.file_event.put(
@@ -598,14 +623,15 @@ class Pilatus(PSIDeviceBase, ADBase):
def on_complete(self) -> DeviceStatus | None:
"""Called to inquire if a device has completed a scans."""
scan_msg: ScanStatusMessage = self.scan_info.msg
if (
scan_msg.scan_name in self.xas_xrd_scan_names or scan_msg.scan_type == "step"
self.scan_parameters.scan_name in self.xas_xrd_scan_names
or self.scan_parameters.scan_type == "software_triggered"
): # TODO how to deal with fly scans?
status_hdf = CompareStatus(self.hdf.capture, ACQUIREMODE.DONE.value)
status_cam = CompareStatus(self.cam.acquire, ACQUIREMODE.DONE.value)
status_cam_server = CompareStatus(self.cam.armed, DETECTORSTATE.UNARMED.value)
if self.scan_info.msg.scan_name in self.xas_xrd_scan_names:
# status_write_error = ExceptionStatus(self.hdf.write_status, 0, operation="!=")
if self.scan_parameters.scan_name in self.xas_xrd_scan_names:
# For long scans, it can be that the mono will execute one cycle more,
# meaning a few more XRD triggers will be sent
status_img_written = CompareStatus(
@@ -614,7 +640,9 @@ class Pilatus(PSIDeviceBase, ADBase):
else:
status_img_written = CompareStatus(self.hdf.num_captured, self.n_images)
status_img_written = CompareStatus(self.hdf.num_captured, self.n_images)
status = status_hdf & status_cam & status_img_written & status_cam_server
status = (
status_hdf & status_cam & status_img_written & status_cam_server
) # & status_write_error
status.add_callback(self._complete_callback) # Callback that writing was successful
self.cancel_on_stop(status)
return status
@@ -635,15 +663,6 @@ class Pilatus(PSIDeviceBase, ADBase):
# TODO do we need to clean the poll thread ourselves?
self.on_stop()
def _update_scan_parameter(self):
"""Get the scan_info parameters for the scan."""
for key, value in self.scan_info.msg.request_inputs["inputs"].items():
if hasattr(self.scan_parameter, key):
setattr(self.scan_parameter, key, value)
for key, value in self.scan_info.msg.request_inputs["kwargs"].items():
if hasattr(self.scan_parameter, key):
setattr(self.scan_parameter, key, value)
if __name__ == "__main__":
try:
View File
+26
View File
@@ -0,0 +1,26 @@
"""Utility functions for the devices."""
from copy import deepcopy
import numpy as np
from bec_lib.devicemanager import ScanInfo
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from pydantic import ValidationError
def fetch_scan_info(scan_info: ScanInfo) -> ScanServerScanInfo:
"""Fetch the scan parameters from the scan_info object and return them as a ScanServerScanInfo object."""
info = scan_info.msg.info
if isinstance(info["positions"], list):
info["positions"] = np.array(info["positions"])
try:
msg = ScanServerScanInfo.model_validate(info)
except ValidationError: # This means we have an old scan_info object.
info = deepcopy(info)
# We need to convert a few parameters manually.
info["scan_type"] = (
"hardware_triggered" if info["scan_type"] == "fly" else "software_triggered"
)
msg = ScanServerScanInfo.model_validate(info)
return msg
+59 -51
View File
@@ -236,52 +236,57 @@ class DebyeNexusStructure(DefaultFormat):
self.configuration.get("nidaq", {}).get("nidaq_add_chans", {}).get("value")
)
rle = self.configuration.get("nidaq", {}).get("nidaq_rle", {}).get("value")
measurement_mode = entry.create_group(name="mode")
measurement_mode.attrs["NX_class"] = "NX_CHAR"
if (int(ci_chans_bits) & 0x7F) != 0:
# Create a dataset
rayspec_sdd_active = measurement_mode.create_group(
name="Multi_Element_Partial_Fluorescence_Yield"
)
me_sdd = rayspec_sdd_active.create_dataset(
name="Detector", data="Rayspec 7 element Silicon Drift Detector"
)
me_sdd.attrs["NX_class"] = "NX_CHAR"
if ci_chans_bits is not None:
if (int(ci_chans_bits) & 0x7F) != 0:
# Create a dataset
rayspec_sdd_active = measurement_mode.create_group(
name="Multi_Element_Partial_Fluorescence_Yield"
)
me_sdd = rayspec_sdd_active.create_dataset(
name="Detector", data="Rayspec 7 element Silicon Drift Detector"
)
me_sdd.attrs["NX_class"] = "NX_CHAR"
if (int(ci_chans_bits) & (1 << 8)) != 0:
# Create a dataset
ketek_sdd_active = measurement_mode.create_group(
name="Single_Element_Partial_Fluorescence_Yield"
)
se_sdd = ketek_sdd_active.create_dataset(
name="Detector", data="Ketex mini single element Silicon Drift Detector"
)
se_sdd.attrs["NX_class"] = "NX_CHAR"
if (int(ci_chans_bits) & (1 << 8)) != 0:
# Create a dataset
ketek_sdd_active = measurement_mode.create_group(
name="Single_Element_Partial_Fluorescence_Yield"
)
se_sdd = ketek_sdd_active.create_dataset(
name="Detector", data="Ketex mini single element Silicon Drift Detector"
)
se_sdd.attrs["NX_class"] = "NX_CHAR"
if (int(ai_chans_bits) & (1 << 6)) != 0:
# Create a dataset
pips_active = measurement_mode.create_group(name="Total_Flourescence_Yield")
tfy = pips_active.create_dataset(
name="Detector", data="Mirion Technologies Partially Depeleted PIPS Detector"
)
tfy.attrs["NX_class"] = "NX_CHAR"
if ai_chans_bits is not None:
if (int(ai_chans_bits) & (1 << 6)) != 0:
# Create a dataset
pips_active = measurement_mode.create_group(name="Total_Flourescence_Yield")
tfy = pips_active.create_dataset(
name="Detector",
data="Mirion Technologies Partially Depeleted PIPS Detector",
)
tfy.attrs["NX_class"] = "NX_CHAR"
if ((int(ai_chans_bits) & (1 << 0)) != 0) & ((int(ai_chans_bits) & (1 << 2)) != 0):
# Create a dataset
ai0ai2_active = measurement_mode.create_group(name="Sample_Transmission")
sam_trans = ai0ai2_active.create_dataset(
name="Detector", data="Ionitec 15 cm gas filled Ionisation Chambers"
)
sam_trans.attrs["NX_class"] = "NX_CHAR"
if ((int(ai_chans_bits) & (1 << 0)) != 0) & ((int(ai_chans_bits) & (1 << 2)) != 0):
# Create a dataset
ai0ai2_active = measurement_mode.create_group(name="Sample_Transmission")
sam_trans = ai0ai2_active.create_dataset(
name="Detector", data="Ionitec 15 cm gas filled Ionisation Chambers"
)
sam_trans.attrs["NX_class"] = "NX_CHAR"
if ((int(ai_chans_bits) & (1 << 2)) != 0) & ((int(ai_chans_bits) & (1 << 4)) != 0):
# Create a dataset
ai2ai4_active = measurement_mode.create_group(name="Reference_Transmission")
ref_trans = ai2ai4_active.create_dataset(
name="Detector", data="Ionitec 15 cm gas filled Ionisation Chambers"
)
ref_trans.attrs["NX_class"] = "NX_CHAR"
if ((int(ai_chans_bits) & (1 << 2)) != 0) & ((int(ai_chans_bits) & (1 << 4)) != 0):
# Create a dataset
ai2ai4_active = measurement_mode.create_group(name="Reference_Transmission")
ref_trans = ai2ai4_active.create_dataset(
name="Detector", data="Ionitec 15 cm gas filled Ionisation Chambers"
)
ref_trans.attrs["NX_class"] = "NX_CHAR"
main_data = entry.create_group(name="data")
main_data.attrs["NX_class"] = "NXdata"
@@ -308,10 +313,11 @@ class DebyeNexusStructure(DefaultFormat):
i0.attrs["NX_class"] = "NXdata"
i0.attrs["units"] = "V"
main_data.create_soft_link(
name="i0",
target="/entry/collection/readout_groups/async/nidaq/nidaq_ai0_mean/value",
)
if rle:
target = "/entry/collection/readout_groups/async/nidaq/nidaq_ai0_mean/value"
else:
target = "/entry/collection/readout_groups/async/nidaq/nidaq_ai0/value"
main_data.create_soft_link(name="i0", target=target)
##################
## i1
@@ -322,10 +328,11 @@ class DebyeNexusStructure(DefaultFormat):
i1.attrs["NX_class"] = "NXdata"
i1.attrs["units"] = "V"
main_data.create_soft_link(
name="i1",
target="/entry/collection/readout_groups/async/nidaq/nidaq_ai2_mean/value",
)
if rle:
target = "/entry/collection/readout_groups/async/nidaq/nidaq_ai2_mean/value"
else:
target = "/entry/collection/readout_groups/async/nidaq/nidaq_ai2/value"
main_data.create_soft_link(name="i1", target=target)
##################
## i2
@@ -336,10 +343,11 @@ class DebyeNexusStructure(DefaultFormat):
i2.attrs["NX_class"] = "NXdata"
i2.attrs["units"] = "V"
main_data.create_soft_link(
name="i2",
target="/entry/collection/readout_groups/async/nidaq/nidaq_ai4_mean/value",
)
if rle:
target = "/entry/collection/readout_groups/async/nidaq/nidaq_ai4_mean/value"
else:
target = "/entry/collection/readout_groups/async/nidaq/nidaq_ai4/value"
main_data.create_soft_link(name="i2", target=target)
##################
## ci sum
+6 -6
View File
@@ -1,7 +1,7 @@
from .mono_bragg_scans import (
XASAdvancedScan,
XASAdvancedScanWithXRD,
XASSimpleScan,
XASSimpleScanWithXRD,
from .nidaq_continuous_scan import NidaqContinuousScan
from .xas_simple_scan import (
XasAdvancedScan,
XasAdvancedScanWithXrd,
XasSimpleScan,
XasSimpleScanWithXrd,
)
from .nidaq_cont_scan import NIDAQContinuousScan
-310
View File
@@ -1,310 +0,0 @@
"""This module contains the scan classes for the mono bragg motor of the Debye beamline."""
import time
from typing import Literal
import numpy as np
from bec_lib.device import DeviceBase
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans import AsyncFlyScanBase
logger = bec_logger.logger
class XASSimpleScan(AsyncFlyScanBase):
"""Class for the XAS simple scan"""
scan_name = "xas_simple_scan"
scan_type = "fly"
scan_report_hint = "device_progress"
required_kwargs = []
use_scan_progress_report = False
pre_move = False
gui_config = {
"Movement Parameters": ["start", "stop"],
"Scan Parameters": ["scan_time", "scan_duration"],
}
def __init__(
self,
start: float,
stop: float,
scan_time: float,
scan_duration: float,
motor: DeviceBase = "mo1_bragg",
**kwargs,
):
"""The xas_simple_scan is used to start a simple oscillating scan on the mono bragg motor.
Start and Stop define the energy range for the scan, scan_time is the time for one scan
cycle and scan_duration is the duration of the scan. If scan duration is set to 0, the
scan will run infinitely.
Args:
start (float): Start energy for the scan.
stop (float): Stop energy for the scan.
scan_time (float): Time for one scan cycle.
scan_duration (float): Duration of the scan.
motor (DeviceBase, optional): Motor device to be used for the scan.
Defaults to "mo1_bragg".
Examples:
>>> scans.xas_simple_scan(start=8000, stop=9000, scan_time=1, scan_duration=10)
"""
super().__init__(**kwargs)
self.motor = motor
self.start = start
self.stop = stop
self.scan_time = scan_time
self.scan_duration = scan_duration
self.primary_readout_cycle = 1
def update_readout_priority(self):
"""Ensure that NIDAQ is not monitored for any quick EXAFS."""
super().update_readout_priority()
self.readout_priority["async"].append("nidaq")
def prepare_positions(self):
"""Prepare the positions for the scan.
Use here only start and end energy defining the range for the scan.
"""
self.positions = np.array([self.start, self.stop], dtype=float)
self.num_pos = None
yield None
def pre_scan(self):
"""Pre Scan action."""
self._check_limits()
# Ensure parent class pre_scan actions to be called.
yield from super().pre_scan()
def scan_report_instructions(self):
"""
Return the instructions for the scan report.
"""
yield from self.stubs.scan_report_instruction({"device_progress": [self.motor]})
def scan_core(self):
"""Run the scan core.
Kickoff the oscillation on the Bragg motor and wait for the completion of the motion.
"""
# Start the oscillation on the Bragg motor.
yield from self.stubs.kickoff(device=self.motor)
complete_status = yield from self.stubs.complete(device=self.motor, wait=False)
while not complete_status.done:
# Readout monitored devices
yield from self.stubs.read(group="monitored", point_id=self.point_id)
time.sleep(self.primary_readout_cycle)
self.point_id += 1
self.num_pos = self.point_id
class XASSimpleScanWithXRD(XASSimpleScan):
"""Class for the XAS simple scan with XRD"""
scan_name = "xas_simple_scan_with_xrd"
gui_config = {
"Movement Parameters": ["start", "stop"],
"Scan Parameters": ["scan_time", "scan_duration"],
"Low Energy Break": ["break_enable_low", "break_time_low", "cycle_low"],
"High Energy Break": ["break_enable_high", "break_time_high", "cycle_high"],
"XRD Triggers": ["exp_time", "n_of_trigger"],
}
def __init__(
self,
start: float,
stop: float,
scan_time: float,
scan_duration: float,
break_enable_low: bool,
break_time_low: float,
cycle_low: int,
break_enable_high: bool,
break_time_high: float,
cycle_high: float,
exp_time: float,
n_of_trigger: int,
motor: DeviceBase = "mo1_bragg",
**kwargs,
):
"""The xas_simple_scan_with_xrd is an oscillation motion on the mono motor
with XRD triggering at low and high energy ranges.
If scan duration is set to 0, the scan will run infinitely.
Args:
start (float): Start energy for the scan.
stop (float): Stop energy for the scan.
scan_time (float): Time for one oscillation .
scan_duration (float): Total duration of the scan.
break_enable_low (bool): Enable breaks for the low energy range.
break_time_low (float): Break time for the low energy range.
cycle_low (int): Specify how often the triggers should be considered,
every nth cycle for low
break_enable_high (bool): Enable breaks for the high energy range.
break_time_high (float): Break time for the high energy range.
cycle_high (int): Specify how often the triggers should be considered,
every nth cycle for high
exp_time (float): Length of 1 trigger period in seconds
n_of_trigger (int): Amount of triggers to be fired during break
motor (DeviceBase, optional): Motor device to be used for the scan.
Defaults to "mo1_bragg".
Examples:
>>> scans.xas_simple_scan_with_xrd(start=8000, stop=9000, scan_time=1, scan_duration=10, xrd_enable_low=True, num_trigger_low=5, cycle_low=2, exp_time_low=100, xrd_enable_high=False, num_trigger_high=3, cycle_high=1, exp_time_high=1000)
"""
super().__init__(
start=start,
stop=stop,
scan_time=scan_time,
scan_duration=scan_duration,
motor=motor,
**kwargs,
)
self.break_enable_low = break_enable_low
self.break_time_low = break_time_low
self.cycle_low = cycle_low
self.break_enable_high = break_enable_high
self.break_time_high = break_time_high
self.cycle_high = cycle_high
self.exp_time = exp_time
self.n_of_trigger = n_of_trigger
class XASAdvancedScan(XASSimpleScan):
"""Class for the XAS advanced scan"""
scan_name = "xas_advanced_scan"
gui_config = {
"Movement Parameters": ["start", "stop"],
"Scan Parameters": ["scan_time", "scan_duration"],
"Spline Parameters": ["p_kink", "e_kink"],
}
def __init__(
self,
start: float,
stop: float,
scan_time: float,
scan_duration: float,
p_kink: float,
e_kink: float,
motor: DeviceBase = "mo1_bragg",
**kwargs,
):
"""The xas_advanced_scan is an oscillation motion on the mono motor.
Start and Stop define the energy range for the scan, scan_time is the
time for one scan cycle and scan_duration is the duration of the scan.
If scan duration is set to 0, the scan will run infinitely.
p_kink and e_kink add a kink to the motion profile to slow down in the
exafs region of the scan.
Args:
start (float): Start angle for the scan.
stop (float): Stop angle for the scan.
scan_time (float): Time for one oscillation .
scan_duration (float): Total duration of the scan.
p_kink (float): Position of the kink.
e_kink (float): Energy of the kink.
motor (DeviceBase, optional): Motor device to be used for the scan.
Defaults to "mo1_bragg".
Examples:
>>> scans.xas_advanced_scan(start=10000, stop=12000, scan_time=0.5, scan_duration=10, p_kink=50, e_kink=10500)
"""
super().__init__(
start=start,
stop=stop,
scan_time=scan_time,
scan_duration=scan_duration,
motor=motor,
**kwargs,
)
self.p_kink = p_kink
self.e_kink = e_kink
class XASAdvancedScanWithXRD(XASAdvancedScan):
"""Class for the XAS advanced scan with XRD"""
scan_name = "xas_advanced_scan_with_xrd"
gui_config = {
"Movement Parameters": ["start", "stop"],
"Scan Parameters": ["scan_time", "scan_duration"],
"Spline Parameters": ["p_kink", "e_kink"],
"Low Energy Break": ["break_enable_low", "break_time_low", "cycle_low"],
"High Energy Break": ["break_enable_high", "break_time_high", "cycle_high"],
"XRD Triggers": ["exp_time", "n_of_trigger"],
}
def __init__(
self,
start: float,
stop: float,
scan_time: float,
scan_duration: float,
p_kink: float,
e_kink: float,
break_enable_low: bool,
break_time_low: float,
cycle_low: int,
break_enable_high: bool,
break_time_high: float,
cycle_high: float,
exp_time: float,
n_of_trigger: int,
motor: DeviceBase = "mo1_bragg",
**kwargs,
):
"""The xas_advanced_scan is an oscillation motion on the mono motor
with XRD triggering at low and high energy ranges.
Start and Stop define the energy range for the scan, scan_time is the time for
one scan cycle and scan_duration is the duration of the scan. If scan duration
is set to 0, the scan will run infinitely. p_kink and e_kink add a kink to the
motion profile to slow down in the exafs region of the scan.
Args:
start (float): Start angle for the scan.
stop (float): Stop angle for the scan.
scan_time (float): Time for one oscillation .
scan_duration (float): Total duration of the scan.
p_kink (float): Position of kink.
e_kink (float): Energy of the kink.
break_enable_low (bool): Enable breaks for the low energy range.
break_time_low (float): Break time for the low energy range.
cycle_low (int): Specify how often the triggers should be considered,
every nth cycle for low
break_enable_high (bool): Enable breaks for the high energy range.
break_time_high (float): Break time for the high energy range.
cycle_high (int): Specify how often the triggers should be considered,
every nth cycle for high
exp_time (float): Length of 1 trigger period in seconds
n_of_trigger (int): Amount of triggers to be fired during break
motor (DeviceBase, optional): Motor device to be used for the scan.
Defaults to "mo1_bragg".
Examples:
>>> scans.xas_advanced_scan_with_xrd(start=10000, stop=12000, scan_time=0.5, scan_duration=10, p_kink=50, e_kink=10500, xrd_enable_low=True, num_trigger_low=5, cycle_low=2, exp_time_low=100, xrd_enable_high=False, num_trigger_high=3, cycle_high=1, exp_time_high=1000)
"""
super().__init__(
start=start,
stop=stop,
scan_time=scan_time,
scan_duration=scan_duration,
p_kink=p_kink,
e_kink=e_kink,
motor=motor,
**kwargs,
)
self.p_kink = p_kink
self.e_kink = e_kink
self.break_enable_low = break_enable_low
self.break_time_low = break_time_low
self.cycle_low = cycle_low
self.break_enable_high = break_enable_high
self.break_time_high = break_time_high
self.cycle_high = cycle_high
self.exp_time = exp_time
self.n_of_trigger = n_of_trigger
-84
View File
@@ -1,84 +0,0 @@
"""This module contains the scan class for the nidaq of the Debye beamline for use in continuous mode."""
import time
from typing import Literal
import numpy as np
from bec_lib.device import DeviceBase
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans import AsyncFlyScanBase
logger = bec_logger.logger
class NIDAQContinuousScan(AsyncFlyScanBase):
"""Class for the nidaq continuous scan (without mono)"""
scan_name = "nidaq_continuous_scan"
scan_type = "fly"
scan_report_hint = "device_progress"
required_kwargs = []
use_scan_progress_report = False
pre_move = False
gui_config = {"Scan Parameters": ["scan_duration"], "Data Compression": ["compression"]}
def __init__(
self, scan_duration: float, daq: DeviceBase = "nidaq", compression: bool = False, **kwargs
):
"""The NIDAQ continuous scan is used to measure with the NIDAQ without moving the
monochromator or any other motor. The NIDAQ thus runs in continuous mode, with a
set scan_duration.
Args:
scan_duration (float): Duration of the scan.
daq (DeviceBase, optional): DAQ device to be used for the scan.
Defaults to "nidaq".
Examples:
>>> scans.nidaq_continuous_scan(scan_duration=10)
"""
super().__init__(**kwargs)
self.scan_duration = scan_duration
self.daq = daq
self.start_time = 0
self.primary_readout_cycle = 1
self.scan_parameters["scan_duration"] = scan_duration
self.scan_parameters["compression"] = compression
def update_readout_priority(self):
"""Ensure that NIDAQ is not monitored for any quick EXAFS."""
super().update_readout_priority()
self.readout_priority["async"].append("nidaq")
def prepare_positions(self):
"""Prepare the positions for the scan."""
yield None
def pre_scan(self):
"""Pre Scan action."""
self.start_time = time.time()
# Ensure parent class pre_scan actions to be called.
yield from super().pre_scan()
def scan_report_instructions(self):
"""
Return the instructions for the scan report.
"""
yield from self.stubs.scan_report_instruction({"device_progress": [self.daq]})
def scan_core(self):
"""Run the scan core.
Kickoff the acquisition of the NIDAQ wait for the completion of the scan.
"""
kickoff_status = yield from self.stubs.kickoff(device=self.daq)
kickoff_status.wait(timeout=5) # wait for proper kickoff of device
complete_status = yield from self.stubs.complete(device=self.daq, wait=False)
while not complete_status.done:
# Readout monitored devices
yield from self.stubs.read(group="monitored", point_id=self.point_id)
time.sleep(self.primary_readout_cycle)
self.point_id += 1
self.num_pos = self.point_id
+174
View File
@@ -0,0 +1,174 @@
"""
The NIDAQ continuous scan is used to measure with the NIDAQ without moving the monochromator or any other motor.
Scan procedure:
- prepare_scan
- open_scan
- stage
- pre_scan
- scan_core
- at_each_point (optionally called by scan_core)
- post_scan
- unstage
- close_scan
- on_exception (called if any exception is raised during the scan)
"""
from __future__ import annotations
import time
from typing import Annotated
from bec_lib.device import DeviceBase
from bec_lib.scan_args import ScanArgument, Units
from bec_server.scan_server.scans.scan_base import ScanBase, ScanType
from bec_server.scan_server.scans.scan_modifier import scan_hook
class NidaqContinuousScan(ScanBase):
# Scan Type: Hardware triggered or software triggered?
# If the main trigger and readout logic is done within the at_each_point method in scan_core, choose SOFTWARE_TRIGGERED.
# If the main trigger and readout logic is implemented on a device that is simply kicked off in this scan, choose HARDWARE_TRIGGERED.
# This primarily serves as information for devices: The device may need to react differently if a software trigger is expected
# for every point.
scan_type = ScanType.HARDWARE_TRIGGERED
# Scan name: This is the name of the scan, e.g. "line_scan". This is used for display purposes and to identify the scan type in user interfaces.
# Choose a descriptive name that does not conflict with existing scan names.
# It must be a valid Python identifier, that is, it can only contain letters, numbers, and underscores, and must not start with a number.
scan_name = "nidaq_continuous_scan"
gui_config = {"Scan Parameters": ["scan_duration", "daq", "compression"]}
def __init__(
self,
#fmt: off
scan_duration: Annotated[float, ScanArgument(display_name="Scan Duration", description="Duration of the scan", units=Units.s)],
daq: Annotated[DeviceBase | None, ScanArgument(display_name="Daq", description="DAQ device to be used for the scan")] = None,
compression: Annotated[bool, ScanArgument(display_name="Compression", description="Whether to compress the data")]= False,
#fmt: on
**kwargs,
):
"""
The NIDAQ continuous scan is used to measure with the NIDAQ without moving the
monochromator or any other motor. The NIDAQ thus runs in continuous mode, with a
set scan_duration.
Args:
scan_duration (float): Duration of the scan
daq (DeviceBase): DAQ device to be used for the scan
compression (bool): Whether to compress the data
Returns:
ScanReport
"""
super().__init__(**kwargs)
self._baseline_readout_status = None
self.scan_duration = scan_duration
self.daq = daq or self.dev["nidaq"]
self.compression = compression
self.monitored_readout_cycle = 1.0 # seconds
self.motors = [self.daq]
self.update_scan_info(scan_duration=scan_duration, compression=compression)
self.actions.set_device_readout_priority([self.daq], priority="async")
@scan_hook
def prepare_scan(self):
"""
Prepare the scan. This can include any steps that need to be executed
before the scan is opened, such as preparing the positions (if not done already)
or setting up the devices.
"""
self.actions.add_scan_report_instruction_device_progress(self.daq)
self._baseline_readout_status = self.actions.read_baseline_devices(wait=False)
@scan_hook
def open_scan(self):
"""
Open the scan.
This step must call self.actions.open_scan() to ensure that a new scan is
opened. Make sure to prepare the scan metadata before, either in
prepare_scan() or in open_scan() itself and call self.update_scan_info(...)
to update the scan metadata if needed.
"""
self.actions.open_scan()
@scan_hook
def stage(self):
"""
Stage the devices for the upcoming scan. The stage logic is typically
implemented on the device itself (i.e. by the device's stage method).
However, if there are any additional steps that need to be executed before
staging the devices, they can be implemented here.
"""
self.actions.stage_all_devices()
@scan_hook
def pre_scan(self):
"""
Pre-scan steps to be executed before the main scan logic.
This is typically the last chance to prepare the devices before the core scan
logic is executed. For example, this is a good place to initialize time-criticial
devices, e.g. devices that have a short timeout.
The pre-scan logic is typically implemented on the device itself.
"""
self.actions.pre_scan_all_devices()
@scan_hook
def scan_core(self):
"""
Core scan logic to be executed during the scan.
This is where the main scan logic should be implemented.
"""
kickoff_status = self.actions.kickoff(device=self.daq, wait=False)
kickoff_status.wait(timeout=5) # wait for proper kickoff of device
complete_status = self.actions.complete(device=self.daq, wait=False)
while not complete_status.done:
self.at_each_point()
time.sleep(self.monitored_readout_cycle)
@scan_hook
def at_each_point(self):
"""
Logic to be executed at each acquisition point during the scan.
"""
self.actions.read_monitored_devices()
@scan_hook
def post_scan(self):
"""
Post-scan steps to be executed after the main scan logic.
"""
self.actions.complete_all_devices()
@scan_hook
def unstage(self):
"""Unstage the scan by executing post-scan steps."""
self.actions.unstage_all_devices()
@scan_hook
def close_scan(self):
"""Close the scan."""
if self._baseline_readout_status is not None:
self._baseline_readout_status.wait()
self.actions.close_scan()
self.actions.check_for_unchecked_statuses()
@scan_hook
def on_exception(self, exception: Exception):
"""
Handle exceptions that occur during the scan.
This is a good place to implement any cleanup logic that needs to be executed in case of an exception,
such as returning the devices to a safe state or moving the motors back to their starting position.
"""
#######################################################
######### Helper methods for the scan logic ###########
#######################################################
# Implement scan-specific helper methods below.
+326
View File
@@ -0,0 +1,326 @@
"""
V4 implementation of the Debye XAS simple scan.
Scan procedure:
- prepare_scan
- open_scan
- stage
- pre_scan
- scan_core
- at_each_point (optionally called by scan_core)
- post_scan
- unstage
- close_scan
- on_exception (called if any exception is raised during the scan)
"""
from __future__ import annotations
import time
from typing import Annotated
import numpy as np
from bec_lib.device import DeviceBase
from bec_lib.scan_args import ScanArgument, Units
from bec_server.scan_server.scans.scan_base import ScanBase, ScanType
from bec_server.scan_server.scans.scan_modifier import scan_hook
class XasSimpleScan(ScanBase):
scan_type = ScanType.HARDWARE_TRIGGERED
scan_name = "xas_simple_scan"
gui_config = {
"Movement Parameters": ["start", "stop"],
"Scan Parameters": ["scan_time", "scan_duration", "monitored_readout_cycle"],
}
def __init__(
self,
# fmt: off
start: Annotated[float, ScanArgument(display_name="Start Energy", description="Start energy.", units=Units.eV, ge=4500, le=64000)],
stop: Annotated[float, ScanArgument(display_name="Stop Energy", description="Stop energy.", units=Units.eV, ge=4500, le=64000)],
scan_time: Annotated[float, ScanArgument(display_name="Scan Time", description="Time for one scan cycle.", units=Units.s, ge=0.05)],
scan_duration: Annotated[float, ScanArgument(display_name="Scan Duration", description="Total scan duration.", units=Units.s, ge=0.05)],
motor: Annotated[DeviceBase | None, ScanArgument(display_name="Motor", description="Bragg motor device.")] = None,
daq: Annotated[DeviceBase | None, ScanArgument(display_name="DAQ", description="NIDAQ device.")] = None,
monitored_readout_cycle: Annotated[float, ScanArgument(display_name="Monitored Readout Cycle", description="Delay between monitored readouts.",units=Units.s, gt=0)] = 1,
# fmt: on
**kwargs,
):
"""
Start a simple oscillating scan on the mono bragg motor.
Args:
start (float): Start energy.
stop (float): Stop energy.
scan_time (float): Time for one scan cycle.
scan_duration (float): Total scan duration.
motor (DeviceBase | None): Bragg motor device.
daq (DeviceBase | None): NIDAQ device.
monitored_readout_cycle (float): Delay between monitored readouts.
Returns:
ScanReport
"""
super().__init__(**kwargs)
self.start = start
self.stop = stop
self.scan_time = scan_time
self.scan_duration = scan_duration
self.motor = motor if motor is not None else self.dev["mo1_bragg"]
self.daq = daq if daq is not None else self.dev["nidaq"]
self.monitored_readout_cycle = monitored_readout_cycle
self.positions = np.array([self.start, self.stop], dtype=float)
# We pass on the arguments as "additional_scan_parameters" in the scan info
self.update_scan_info(
positions=self.positions,
scan_time=scan_time,
scan_duration=scan_duration,
monitored_readout_cycle=monitored_readout_cycle,
)
self.actions.set_device_readout_priority([self.daq], priority="async")
@scan_hook
def prepare_scan(self):
"""
Prepare the scan. This can include any steps that need to be executed
before the scan is opened, such as preparing the positions (if not done already)
or setting up the devices.
"""
self.actions.add_scan_report_instruction_device_progress(self.motor)
self._baseline_readout_status = self.actions.read_baseline_devices(wait=False)
@scan_hook
def open_scan(self):
"""
Open the scan.
This step must call self.actions.open_scan() to ensure that a new scan is
opened. Make sure to prepare the scan metadata before, either in
prepare_scan() or in open_scan() itself and call self.update_scan_info(...)
to update the scan metadata if needed.
"""
self.actions.open_scan()
@scan_hook
def stage(self):
"""
Stage the devices for the upcoming scan. The stage logic is typically
implemented on the device itself (i.e. by the device's stage method).
However, if there are any additional steps that need to be executed before
staging the devices, they can be implemented here.
"""
self.actions.stage_all_devices()
@scan_hook
def pre_scan(self):
"""
Pre-scan steps to be executed before the main scan logic.
This is typically the last chance to prepare the devices before the core scan
logic is executed. For example, this is a good place to initialize time-criticial
devices, e.g. devices that have a short timeout.
The pre-scan logic is typically implemented on the device itself.
"""
self.actions.pre_scan_all_devices()
@scan_hook
def scan_core(self):
"""
Core scan logic to be executed during the scan.
This is where the main scan logic should be implemented.
"""
self.actions.kickoff(self.motor)
completion_status = self.actions.complete(self.motor, wait=False)
while not completion_status.done:
self.at_each_point()
@scan_hook
def at_each_point(self):
"""
Logic to be executed at each acquisition point during the scan.
"""
self.actions.read_monitored_devices()
time.sleep(self.monitored_readout_cycle)
@scan_hook
def post_scan(self):
"""
Post-scan steps to be executed after the main scan logic.
"""
self.actions.complete_all_devices()
@scan_hook
def unstage(self):
"""Unstage the scan by executing post-scan steps."""
self.actions.unstage_all_devices()
@scan_hook
def close_scan(self):
"""Close the scan."""
if self._baseline_readout_status is not None:
self._baseline_readout_status.wait()
self.actions.close_scan()
self.actions.check_for_unchecked_statuses()
@scan_hook
def on_exception(self, exception: Exception):
"""
Handle exceptions that occur during the scan.
This is a good place to implement any cleanup logic that needs to be executed in case of an exception,
such as returning the devices to a safe state or moving the motors back to their starting position.
"""
self.actions.complete_all_devices(wait=False)
class XasSimpleScanWithXrd(XasSimpleScan):
scan_name = "xas_simple_scan_with_xrd"
gui_config = {
"Movement Parameters": ["start", "stop"],
"Scan Parameters": ["scan_time", "scan_duration", "monitored_readout_cycle"],
"Low Energy Break": ["break_enable_low", "break_time_low", "cycle_low"],
"High Energy Break": ["break_enable_high", "break_time_high", "cycle_high"],
"XRD Triggers": ["exp_time", "n_of_trigger"],
}
def __init__(
self,
# fmt: off
start: Annotated[float, ScanArgument(display_name="Start Energy", description="Start energy.", units=Units.eV)],
stop: Annotated[float, ScanArgument(display_name="Stop Energy", description="Stop energy.", units=Units.eV)],
scan_time: Annotated[float, ScanArgument(display_name="Scan Time", description="Time for one scan cycle.", units=Units.s, ge=0)],
scan_duration: Annotated[float, ScanArgument(display_name="Scan Duration", description="Total scan duration.", units=Units.s, ge=0)],
break_enable_low: Annotated[bool, ScanArgument(display_name="Break Enable Low", description="Enable breaks for the low energy range.")],
break_time_low: Annotated[float, ScanArgument(display_name="Break Time Low", description="Break time for the low energy range.", units=Units.s, ge=0)],
cycle_low: Annotated[int, ScanArgument(display_name="Cycle Low", description="Use triggers every nth low-energy cycle.", ge=0)],
break_enable_high: Annotated[bool, ScanArgument(display_name="Break Enable High", description="Enable breaks for the high energy range.")],
break_time_high: Annotated[float, ScanArgument(display_name="Break Time High", description="Break time for the high energy range.", units=Units.s, ge=0)],
cycle_high: Annotated[int, ScanArgument(display_name="Cycle High", description="Use triggers every nth high-energy cycle.", ge=0)],
exp_time: Annotated[float, ScanArgument(display_name="Exposure Time", description="Length of one trigger period.", units=Units.s, ge=0)],
n_of_trigger: Annotated[int, ScanArgument(display_name="Number Of Trigger", description="Amount of triggers fired during a break.", ge=0)],
motor: Annotated[DeviceBase | None, ScanArgument(display_name="Motor", description="Bragg motor device.")] = None,
daq: Annotated[DeviceBase | None, ScanArgument(display_name="DAQ", description="NIDAQ device.")] = None,
monitored_readout_cycle: Annotated[float, ScanArgument(display_name="Monitored Readout Cycle", description="Delay between monitored readouts.", units=Units.s, gt=0)] = 1,
**kwargs,
# fmt: on
):
super().__init__(
start=start,
stop=stop,
scan_time=scan_time,
scan_duration=scan_duration,
motor=motor,
daq=daq,
monitored_readout_cycle=monitored_readout_cycle,
**kwargs,
)
# We pass on the arguments as "additional_scan_parameters" in the scan info
self.update_scan_info(
break_enable_low=break_enable_low,
break_time_low=break_time_low,
cycle_low=cycle_low,
break_enable_high=break_enable_high,
break_time_high=break_time_high,
cycle_high=cycle_high,
exp_time=exp_time,
n_of_trigger=n_of_trigger,
)
class XasAdvancedScan(XasSimpleScan):
scan_name = "xas_advanced_scan"
gui_config = {
"Movement Parameters": ["start", "stop"],
"Scan Parameters": ["scan_time", "scan_duration", "monitored_readout_cycle"],
"Spline Parameters": ["p_kink", "e_kink"],
}
def __init__(
self,
# fmt: off
start: Annotated[float, ScanArgument(display_name="Start Energy", description="Start energy.", units=Units.eV)],
stop: Annotated[float, ScanArgument(display_name="Stop Energy", description="Stop energy.", units=Units.eV)],
scan_time: Annotated[float, ScanArgument(display_name="Scan Time", description="Time for one scan cycle.", units=Units.s, ge=0)],
scan_duration: Annotated[float, ScanArgument(display_name="Scan Duration", description="Total scan duration.", units=Units.s, ge=0)],
p_kink: Annotated[float, ScanArgument(display_name="P Kink", description="Position of the kink.", ge=0)],
e_kink: Annotated[float, ScanArgument(display_name="E Kink", description="Energy of the kink.", units=Units.eV)],
motor: Annotated[DeviceBase | None, ScanArgument(display_name="Motor", description="Bragg motor device.")] = None,
daq: Annotated[DeviceBase | None, ScanArgument(display_name="DAQ", description="NIDAQ device.")] = None,
monitored_readout_cycle: Annotated[float, ScanArgument(display_name="Monitored Readout Cycle", description="Delay between monitored readouts.", units=Units.s, gt=0)] = 1,
**kwargs,
# fmt: on
):
super().__init__(
start=start,
stop=stop,
scan_time=scan_time,
scan_duration=scan_duration,
motor=motor,
daq=daq,
monitored_readout_cycle=monitored_readout_cycle,
**kwargs,
)
# We pass on the arguments as "additional_scan_parameters" in the scan info
self.update_scan_info(p_kink=p_kink, e_kink=e_kink)
class XasAdvancedScanWithXrd(XasAdvancedScan):
scan_name = "xas_advanced_scan_with_xrd"
gui_config = {
"Movement Parameters": ["start", "stop"],
"Scan Parameters": ["scan_time", "scan_duration", "monitored_readout_cycle"],
"Spline Parameters": ["p_kink", "e_kink"],
"Low Energy Break": ["break_enable_low", "break_time_low", "cycle_low"],
"High Energy Break": ["break_enable_high", "break_time_high", "cycle_high"],
"XRD Triggers": ["exp_time", "n_of_trigger"],
}
def __init__(
self,
# fmt: off
start: Annotated[float, ScanArgument(display_name="Start Energy", description="Start energy.", units=Units.eV)],
stop: Annotated[float, ScanArgument(display_name="Stop Energy", description="Stop energy.", units=Units.eV)],
scan_time: Annotated[float, ScanArgument(display_name="Scan Time", description="Time for one scan cycle.", units=Units.s, ge=0)],
scan_duration: Annotated[float, ScanArgument(display_name="Scan Duration", description="Total scan duration.", units=Units.s, ge=0)],
p_kink: Annotated[float, ScanArgument(display_name="P Kink", description="Position of the kink.", ge=0)],
e_kink: Annotated[float, ScanArgument(display_name="E Kink", description="Energy of the kink.", units=Units.eV)],
break_enable_low: Annotated[bool, ScanArgument(display_name="Break Enable Low", description="Enable breaks for the low energy range.")],
break_time_low: Annotated[float, ScanArgument(display_name="Break Time Low", description="Break time for the low energy range.", units=Units.s, ge=0)],
cycle_low: Annotated[int, ScanArgument(display_name="Cycle Low", description="Use triggers every nth low-energy cycle.", ge=0)],
break_enable_high: Annotated[bool, ScanArgument(display_name="Break Enable High", description="Enable breaks for the high energy range.")],
break_time_high: Annotated[float, ScanArgument(display_name="Break Time High", description="Break time for the high energy range.", units=Units.s, ge=0)],
cycle_high: Annotated[int, ScanArgument(display_name="Cycle High", description="Use triggers every nth high-energy cycle.", ge=0)],
exp_time: Annotated[float, ScanArgument(display_name="Exposure Time", description="Length of one trigger period.", units=Units.s, ge=0)],
n_of_trigger: Annotated[int, ScanArgument(display_name="Number Of Trigger", description="Amount of triggers fired during a break.", ge=0)],
motor: Annotated[DeviceBase | None, ScanArgument(display_name="Motor", description="Bragg motor device.")] = None,
daq: Annotated[DeviceBase | None, ScanArgument(display_name="DAQ", description="NIDAQ device.")] = None,
monitored_readout_cycle: Annotated[float, ScanArgument(display_name="Monitored Readout Cycle", description="Delay between monitored readouts.", units=Units.s, gt=0)] = 1,
**kwargs,
# fmt: on
):
super().__init__(
start=start,
stop=stop,
scan_time=scan_time,
scan_duration=scan_duration,
p_kink=p_kink,
e_kink=e_kink,
motor=motor,
daq=daq,
monitored_readout_cycle=monitored_readout_cycle,
**kwargs,
)
# We pass on the arguments as "additional_scan_parameters" in the scan info
self.update_scan_info(
break_enable_low=break_enable_low,
break_time_low=break_time_low,
cycle_low=cycle_low,
break_enable_high=break_enable_high,
break_time_high=break_time_high,
cycle_high=cycle_high,
exp_time=exp_time,
n_of_trigger=n_of_trigger,
)
-54
View File
@@ -159,60 +159,6 @@ def test_set_control_settings(mock_bragg):
assert dev.scan_control.scan_duration.get() == 5
def test_update_scan_parameters(mock_bragg):
dev = mock_bragg
msg = ScanStatusMessage(
scan_id="my_scan_id",
status="closed",
request_inputs={
"inputs": {},
"kwargs": {
"start": 0,
"stop": 5,
"scan_time": 1,
"scan_duration": 10,
"xrd_enable_low": True,
"xrd_enable_high": False,
"num_trigger_low": 1,
"num_trigger_high": 7,
"exp_time_low": 1,
"exp_time_high": 3,
"cycle_low": 1,
"cycle_high": 5,
"p_kink": 50,
"e_kink": 8000,
},
},
info={
"kwargs": {
"start": 0,
"stop": 5,
"scan_time": 1,
"scan_duration": 10,
"xrd_enable_low": True,
"xrd_enable_high": False,
"num_trigger_low": 1,
"num_trigger_high": 7,
"exp_time_low": 1,
"exp_time_high": 3,
"cycle_low": 1,
"cycle_high": 5,
"p_kink": 50,
"e_kink": 8000,
}
},
metadata={},
)
mock_bragg.scan_info.msg = msg
scan_param = dev.scan_parameter.model_dump()
for _, v in scan_param.items():
assert v == None
dev._update_scan_parameter()
scan_param = dev.scan_parameter.model_dump()
for k, v in scan_param.items():
assert v == msg.content["request_inputs"]["kwargs"].get(k, None)
def test_kickoff_scan(mock_bragg):
dev = mock_bragg
dev.scan_control.scan_status._read_pv.mock_data = ScanControlScanStatus.READY
+28 -10
View File
@@ -6,6 +6,7 @@ from unittest import mock
import ophyd
import pytest
from bec_server.scan_server.scan_worker import ScanWorker
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from ophyd.status import WaitTimeoutError
from ophyd_devices.interfaces.base_classes.psi_device_base import DeviceStoppedError
from ophyd_devices.tests.utils import MockPV
@@ -15,6 +16,13 @@ from debye_bec.devices.nidaq.nidaq import Nidaq, NidaqError
# TODO move this function to ophyd_devices, it is duplicated in csaxs_bec and needed for other pluging repositories
from debye_bec.devices.test_utils.utils import patch_dual_pvs
from debye_bec.devices.utils.utils import fetch_scan_info
@pytest.fixture(scope="function")
def scan_info_mock():
"""Fixture for the ScanInfo object."""
return ScanServerScanInfo(scan_name="xas_simple_scan", scan_id="test")
@pytest.fixture(scope="function")
@@ -52,13 +60,17 @@ def test_init(mock_nidaq):
]
def test_check_if_scan_name_is_valid(mock_nidaq):
def test_check_if_scan_name_is_valid(mock_nidaq, scan_info_mock):
"""Test the check_if_scan_name_is_valid method."""
dev = mock_nidaq
dev.scan_info.msg.scan_name = "xas_simple_scan"
assert dev._check_if_scan_name_is_valid()
dev.scan_info.msg.scan_name = "invalid_scan_name"
assert not dev._check_if_scan_name_is_valid()
scan_info_mock.scan_name = "xas_simple_scan"
dev.scan_info.msg.info.update(scan_info_mock.model_dump())
scan_parameters = fetch_scan_info(dev.scan_info)
assert dev._check_if_scan_name_is_valid(scan_parameters)
scan_info_mock.scan_name = "invalid_scan_name"
dev.scan_info.msg.info.update(scan_info_mock.model_dump())
scan_parameters = fetch_scan_info(dev.scan_info)
assert not dev._check_if_scan_name_is_valid(scan_parameters)
def test_set_config(mock_nidaq):
@@ -120,11 +132,13 @@ def test_on_unstage(mock_nidaq):
("nidaq_continuous_scan", False, 0),
],
)
def test_on_pre_scan(mock_nidaq, scan_name, raise_error, nidaq_state):
def test_on_pre_scan(mock_nidaq, scan_name, raise_error, nidaq_state, scan_info_mock):
"""Test the on_pre_scan method of the Nidaq device."""
dev = mock_nidaq
dev.state.put(nidaq_state)
dev.scan_info.msg.scan_name = scan_name
scan_info_mock.scan_name = scan_name
dev.scan_info.msg.info.update(scan_info_mock.model_dump())
dev.scan_parameters = fetch_scan_info(dev.scan_info)
dev._timeout_wait_for_pv = 0.1 # Set a short timeout for testing
if not raise_error:
dev.pre_scan()
@@ -133,11 +147,13 @@ def test_on_pre_scan(mock_nidaq, scan_name, raise_error, nidaq_state):
dev.pre_scan()
def test_on_complete(mock_nidaq):
def test_on_complete(mock_nidaq, scan_info_mock):
"""Test the on_complete method of the Nidaq device."""
dev = mock_nidaq
scan_info_mock.scan_name = "nidaq_continuous_scan"
dev.scan_info.msg.info.update(scan_info_mock.model_dump())
dev.scan_parameters = fetch_scan_info(dev.scan_info)
# Check for nidaq_continuous_scan
dev.scan_info.msg.scan_name = "nidaq_continuous_scan"
dev.state.put(0) # Set state to DISABLED
status = dev.complete()
assert status.done is False
@@ -147,7 +163,9 @@ def test_on_complete(mock_nidaq):
assert status.done is True
# Check for XAS simple scan
dev.scan_info.msg.scan_name = "xas_simple_scan"
scan_info_mock.scan_name = "xas_simple_scan"
dev.scan_info.msg.info.update(scan_info_mock.model_dump())
dev.scan_parameters = fetch_scan_info(dev.scan_info)
dev.state.put(0) # Set state to ACQUIRE
dev.stop_call.put(0)
dev._timeout_wait_for_pv = 5
+42 -28
View File
@@ -7,6 +7,9 @@ import ophyd
import pytest
from bec_lib.messages import ScanStatusMessage
from bec_server.scan_server.scan_worker import ScanWorker
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from bec_server.scan_server.tests.scan_fixtures import *
from bec_server.scan_server.tests.scan_fixtures import _MockDevice
from ophyd_devices import CompareStatus, DeviceStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import DeviceStoppedError
from ophyd_devices.tests.utils import MockPV, patch_dual_pvs
@@ -20,6 +23,7 @@ from debye_bec.devices.pilatus.pilatus import (
TRIGGERMODE,
Pilatus,
)
from debye_bec.devices.utils.utils import fetch_scan_info
if TYPE_CHECKING: # pragma no cover
from bec_lib.messages import FileMessage
@@ -34,32 +38,38 @@ if TYPE_CHECKING: # pragma no cover
@pytest.fixture(
scope="function",
params=[
(0.1, 1, 1, "line_scan", "step"),
(0.2, 2, 2, "time_scan", "step"),
(0.5, 5, 5, "xas_advanced_scan", "fly"),
(("samx", 0.1, 1, 5, "samy", 0, 1, 5), {"relative": True}, "_v4_hexagonal_scan"),
((1, 0.2), {}, "_v4_time_scan"),
((9000, 10000, 1, 20, 0.1, 9500), {}, "xas_advanced_scan"),
],
)
def mock_scan_info(request, tmpdir):
exp_time, frames_per_trigger, num_points, scan_name, scan_type = request.param
scan_info = ScanStatusMessage(
scan_id="test_id",
status="open",
scan_type=scan_type,
scan_number=1,
scan_parameters={
"exp_time": exp_time,
"frames_per_trigger": frames_per_trigger,
"system_config": {},
},
info={"file_components": (f"{tmpdir}/data/S00000/S000001", "h5")},
num_points=num_points,
scan_name=scan_name,
)
yield scan_info
def mock_scan_info(request, tmpdir, v4_scan_assembler, device_manager):
args, kwargs, scan_name = request.param
mo1_bragg = _MockDevice(name="mo1_bragg")
nidaq = _MockDevice(name="nidaq")
device_manager.add_device(mo1_bragg)
device_manager.add_device(nidaq)
scan = v4_scan_assembler(scan_name, *args, **kwargs)
yield scan.scan_info
@pytest.fixture(scope="function")
def pilatus(mock_scan_info) -> Generator[Pilatus, None, None]:
def mock_scan_status_message(mock_scan_info, tmpdir) -> ScanStatusMessage:
info = mock_scan_info.model_dump()
info.update({"file_components": (f"{tmpdir}/data/S00000/S000001", "h5")})
return ScanStatusMessage(
scan_id=mock_scan_info.scan_id,
status="open",
scan_number=1,
scan_name=mock_scan_info.scan_name,
scan_type="fly" if mock_scan_info.scan_type == "hardware_triggered" else "step",
num_points=mock_scan_info.num_points,
info=info,
)
@pytest.fixture(scope="function")
def pilatus(mock_scan_status_message) -> Generator[Pilatus, None, None]:
name = "pilatus"
prefix = "X01DA-OP-MO1:PILATUS:"
with mock.patch.object(ophyd, "cl") as mock_cl:
@@ -70,8 +80,9 @@ def pilatus(mock_scan_info) -> Generator[Pilatus, None, None]:
# dev.image1 = mock.MagicMock()
# with mock.patch.object(dev, "image1"):
with mock.patch.object(dev, "task_handler"):
dev.scan_info.msg = mock_scan_info
dev.scan_info.msg = mock_scan_status_message
try:
dev.scan_parameters = fetch_scan_info(dev.scan_info)
yield dev
finally:
try:
@@ -177,7 +188,6 @@ def test_pilatus_on_trigger_cancel_on_stop(pilatus):
def test_pilatus_on_complete(pilatus: Pilatus):
"""Test the on_complete logic of the Pilatus detector."""
if pilatus.scan_info.msg.scan_name.startswith("xas"):
# TODO add test cases for xas scans
# status = pilatus.complete()
@@ -196,8 +206,9 @@ def test_pilatus_on_complete(pilatus: Pilatus):
pilatus.cam.acquire._read_pv.mock_data = ACQUIREMODE.ACQUIRING.value
pilatus.hdf.capture._read_pv.mock_data = ACQUIREMODE.ACQUIRING.value
pilatus.cam.armed._read_pv.mock_data = DETECTORSTATE.ARMED.value
num_images = pilatus.scan_info.msg.num_points * pilatus.scan_info.msg.scan_parameters.get(
"frames_per_trigger", 1
num_images = (
pilatus.scan_parameters.num_points
* pilatus.scan_parameters.additional_scan_parameters.get("frames_per_trigger", 1)
)
pilatus.hdf.num_captured._read_pv.mock_data = num_images - 1
# Call on complete
@@ -275,9 +286,12 @@ def test_pilatus_on_complete(pilatus: Pilatus):
def test_pilatus_on_stage_raises_low_exp_time(pilatus):
"""Test that on_stage raises a ValueError if the exposure time is too low."""
pilatus.scan_info.msg.scan_parameters["exp_time"] = 0.09
scan_msg = pilatus.scan_info.msg
if scan_msg.scan_type != "step" and scan_msg.scan_name not in pilatus.xas_xrd_scan_names:
pilatus.scan_info.msg.info["exp_time"] = 0.09
pilatus.scan_parameters = fetch_scan_info(pilatus.scan_info)
if (
pilatus.scan_parameters.scan_type != "software_triggered"
and pilatus.scan_parameters.scan_name not in pilatus.xas_xrd_scan_names
):
return
with pytest.raises(ValueError):
pilatus.on_stage()
+4 -4
View File
@@ -3,10 +3,10 @@ from functools import partial
import pytest
from bec_server.device_server.tests.utils import DeviceMockType, DMMock
from bec_server.scan_server.tests.fixtures import (
ScanStubStatusMock,
connector_mock,
instruction_handler_mock,
from bec_server.scan_server.tests.scan_fixtures import (
nth_done_status_mock,
readout_priority,
v4_scan_assembler,
)
-429
View File
@@ -1,429 +0,0 @@
# pylint: skip-file
from unittest import mock
from bec_lib.messages import DeviceInstructionMessage
from bec_server.device_server.tests.utils import DMMock
from debye_bec.scans import (
XASAdvancedScan,
XASAdvancedScanWithXRD,
XASSimpleScan,
XASSimpleScanWithXRD,
)
def get_instructions(request, ScanStubStatusMock):
request.metadata["RID"] = "my_test_request_id"
def fake_done():
"""
Fake done function for ScanStubStatusMock. Upon each call, it returns the next value from the generator.
This is used to simulate the completion of the scan.
"""
yield False
yield False
yield True
def fake_complete(*args, **kwargs):
yield "fake_complete"
return ScanStubStatusMock(done_func=fake_done)
with (
mock.patch.object(request.stubs, "complete", side_effect=fake_complete),
mock.patch.object(request.stubs, "_get_result_from_status", return_value=None),
):
reference_commands = list(request.run())
for cmd in reference_commands:
if not cmd or isinstance(cmd, str):
continue
if "RID" in cmd.metadata:
cmd.metadata["RID"] = "my_test_request_id"
if "rpc_id" in cmd.parameter:
cmd.parameter["rpc_id"] = "my_test_rpc_id"
cmd.metadata.pop("device_instr_id", None)
return reference_commands
def test_xas_simple_scan(scan_assembler, ScanStubStatusMock):
request = scan_assembler(XASSimpleScan, start=0, stop=5, scan_time=1, scan_duration=10)
request.device_manager.add_device("nidaq")
reference_commands = get_instructions(request, ScanStubStatusMock)
assert reference_commands == [
None,
None,
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="scan_report_instruction",
parameter={"device_progress": ["mo1_bragg"]},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="open_scan",
parameter={
"readout_priority": {
"monitored": [],
"baseline": [],
"on_request": [],
"async": ["nidaq"],
},
"num_points": None,
"positions": [0.0, 5.0],
"scan_name": "xas_simple_scan",
"scan_type": "fly",
},
),
DeviceInstructionMessage(metadata={}, device="nidaq", action="stage", parameter={}),
DeviceInstructionMessage(
metadata={},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
action="stage",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "baseline", "RID": "my_test_request_id"},
device=["samx"],
action="read",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
action="pre_scan",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device="mo1_bragg",
action="kickoff",
parameter={"configure": {}},
),
"fake_complete",
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0},
device=["bpm4i", "eiger", "mo1_bragg"],
action="read",
parameter={"group": "monitored"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 1},
device=["bpm4i", "eiger", "mo1_bragg"],
action="read",
parameter={"group": "monitored"},
),
"fake_complete",
DeviceInstructionMessage(
metadata={},
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
action="unstage",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="close_scan",
parameter={},
),
]
def test_xas_simple_scan_with_xrd(scan_assembler, ScanStubStatusMock):
request = scan_assembler(
XASSimpleScanWithXRD,
start=0,
stop=5,
scan_time=1,
scan_duration=10,
break_enable_low=True,
break_time_low=1,
cycle_low=1,
break_enable_high=True,
break_time_high=2,
exp_time=1,
n_of_trigger=1,
cycle_high=4,
)
request.device_manager.add_device("nidaq")
reference_commands = get_instructions(request, ScanStubStatusMock)
# TODO #64 based on creating this ScanStatusMessage, we should test the logic of stage/kickoff/complete/unstage in Pilatus and mo1Bragg
assert reference_commands == [
None,
None,
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="scan_report_instruction",
parameter={"device_progress": ["mo1_bragg"]},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="open_scan",
parameter={
"readout_priority": {
"monitored": [],
"baseline": [],
"on_request": [],
"async": ["nidaq"],
},
"num_points": None,
"positions": [0.0, 5.0],
"scan_name": "xas_simple_scan_with_xrd",
"scan_type": "fly",
},
),
DeviceInstructionMessage(metadata={}, device="nidaq", action="stage", parameter={}),
DeviceInstructionMessage(
metadata={},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
action="stage",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "baseline", "RID": "my_test_request_id"},
device=["samx"],
action="read",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
action="pre_scan",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device="mo1_bragg",
action="kickoff",
parameter={"configure": {}},
),
"fake_complete",
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0},
device=["bpm4i", "eiger", "mo1_bragg"],
action="read",
parameter={"group": "monitored"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 1},
device=["bpm4i", "eiger", "mo1_bragg"],
action="read",
parameter={"group": "monitored"},
),
"fake_complete",
DeviceInstructionMessage(
metadata={},
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
action="unstage",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="close_scan",
parameter={},
),
]
def test_xas_advanced_scan(scan_assembler, ScanStubStatusMock):
request = scan_assembler(
XASAdvancedScan,
start=8000,
stop=9000,
scan_time=1,
scan_duration=10,
p_kink=50,
e_kink=8500,
)
request.device_manager.add_device("nidaq")
reference_commands = get_instructions(request, ScanStubStatusMock)
assert reference_commands == [
None,
None,
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="scan_report_instruction",
parameter={"device_progress": ["mo1_bragg"]},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="open_scan",
parameter={
"readout_priority": {
"monitored": [],
"baseline": [],
"on_request": [],
"async": ["nidaq"],
},
"num_points": None,
"positions": [8000.0, 9000.0],
"scan_name": "xas_advanced_scan",
"scan_type": "fly",
},
),
DeviceInstructionMessage(metadata={}, device="nidaq", action="stage", parameter={}),
DeviceInstructionMessage(
metadata={},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
action="stage",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "baseline", "RID": "my_test_request_id"},
device=["samx"],
action="read",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
action="pre_scan",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device="mo1_bragg",
action="kickoff",
parameter={"configure": {}},
),
"fake_complete",
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0},
device=["bpm4i", "eiger", "mo1_bragg"],
action="read",
parameter={"group": "monitored"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 1},
device=["bpm4i", "eiger", "mo1_bragg"],
action="read",
parameter={"group": "monitored"},
),
"fake_complete",
DeviceInstructionMessage(
metadata={},
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
action="unstage",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="close_scan",
parameter={},
),
]
def test_xas_advanced_scan_with_xrd(scan_assembler, ScanStubStatusMock):
request = scan_assembler(
XASAdvancedScanWithXRD,
start=8000,
stop=9000,
scan_time=1,
scan_duration=10,
p_kink=50,
e_kink=8500,
break_enable_low=True,
break_time_low=1,
cycle_low=1,
break_enable_high=True,
break_time_high=2,
exp_time=1,
n_of_trigger=1,
cycle_high=4,
)
request.device_manager.add_device("nidaq")
reference_commands = get_instructions(request, ScanStubStatusMock)
assert reference_commands == [
None,
None,
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="scan_report_instruction",
parameter={"device_progress": ["mo1_bragg"]},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="open_scan",
parameter={
"readout_priority": {
"monitored": [],
"baseline": [],
"on_request": [],
"async": ["nidaq"],
},
"num_points": None,
"positions": [8000.0, 9000.0],
"scan_name": "xas_advanced_scan_with_xrd",
"scan_type": "fly",
},
),
DeviceInstructionMessage(metadata={}, device="nidaq", action="stage", parameter={}),
DeviceInstructionMessage(
metadata={},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
action="stage",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "baseline", "RID": "my_test_request_id"},
device=["samx"],
action="read",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
action="pre_scan",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device="mo1_bragg",
action="kickoff",
parameter={"configure": {}},
),
"fake_complete",
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0},
device=["bpm4i", "eiger", "mo1_bragg"],
action="read",
parameter={"group": "monitored"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 1},
device=["bpm4i", "eiger", "mo1_bragg"],
action="read",
parameter={"group": "monitored"},
),
"fake_complete",
DeviceInstructionMessage(
metadata={},
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
action="unstage",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="close_scan",
parameter={},
),
]
@@ -0,0 +1,152 @@
# pylint: skip-file
from unittest import mock
import numpy as np
import pytest
from bec_server.scan_server.tests.scan_fixtures import *
from bec_server.scan_server.tests.scan_hook_tests import *
XAS_SIMPLE_SCAN_DEFAULT_HOOK_TESTS = [
("prepare_scan", [assert_prepare_scan_reads_baseline_devices]),
("open_scan", [assert_scan_open_called]),
("stage", [assert_stage_all_devices_called]),
("pre_scan", [assert_pre_scan_called]),
("unstage", [assert_unstage_all_devices_called]),
("close_scan", [assert_close_scan_waits_for_baseline_and_closes]),
]
def _assemble_xas_simple_scan(v4_scan_assembler, **overrides):
params = {
"start": 8000.0,
"stop": 9000.0,
"scan_time": 1.0,
"scan_duration": 10.0,
"motor": "mo1_bragg",
"daq": "nidaq",
"monitored_readout_cycle": 1.0,
}
params.update(overrides)
return v4_scan_assembler("xas_simple_scan", **params)
@pytest.mark.parametrize(("hook_name", "hook_tests"), XAS_SIMPLE_SCAN_DEFAULT_HOOK_TESTS)
def test_xas_simple_scan_v4_default_hooks(
v4_scan_assembler, nth_done_status_mock, hook_name, hook_tests
):
scan = _assemble_xas_simple_scan(v4_scan_assembler)
run_scan_tests(scan, [(hook_name, hook_tests)], nth_done_status_mock=nth_done_status_mock)
def test_xas_simple_scan_v4_prepare_scan_updates_metadata(v4_scan_assembler):
scan = _assemble_xas_simple_scan(v4_scan_assembler)
scan.actions.add_scan_report_instruction_device_progress = mock.MagicMock()
baseline_status = mock.MagicMock()
scan.actions.read_baseline_devices = mock.MagicMock(return_value=baseline_status)
scan.prepare_scan()
scan.actions._build_scan_status_message("open")
np.testing.assert_array_equal(scan.scan_info.positions, np.array([8000.0, 9000.0]))
assert scan.scan_info.additional_scan_parameters["scan_time"] == 1.0
assert scan.scan_info.additional_scan_parameters["scan_duration"] == 10.0
assert scan.scan_info.readout_priority_modification["async"] == ["nidaq"]
scan.actions.add_scan_report_instruction_device_progress.assert_called_once_with(scan.motor)
scan.actions.read_baseline_devices.assert_called_once_with(wait=False)
assert scan._baseline_readout_status is baseline_status
def test_xas_simple_scan_v4_scan_core_reads_until_complete(v4_scan_assembler, nth_done_status_mock):
scan = _assemble_xas_simple_scan(v4_scan_assembler)
completion_status = nth_done_status_mock(resolve_after=3)
scan.actions.kickoff = mock.MagicMock()
scan.actions.complete = mock.MagicMock(return_value=completion_status)
scan.actions.read_monitored_devices = mock.MagicMock()
with mock.patch("debye_bec.scans.xas_simple_scan.time.sleep"):
scan.scan_core()
scan.actions.kickoff.assert_called_once_with(scan.motor)
scan.actions.complete.assert_called_once_with(scan.motor, wait=False)
assert scan.actions.read_monitored_devices.call_count == 2
def test_xas_simple_scan_v4_post_scan_completes_all_devices(v4_scan_assembler):
scan = _assemble_xas_simple_scan(v4_scan_assembler)
scan.actions.complete_all_devices = mock.MagicMock()
scan.post_scan()
scan.actions.complete_all_devices.assert_called_once_with()
def test_xas_simple_scan_with_xrd_v4_updates_xrd_metadata(v4_scan_assembler):
scan = v4_scan_assembler(
"xas_simple_scan_with_xrd",
start=8000.0,
stop=9000.0,
scan_time=1.0,
scan_duration=10.0,
break_enable_low=True,
break_time_low=1.0,
cycle_low=2,
break_enable_high=False,
break_time_high=3.0,
cycle_high=4,
exp_time=0.5,
n_of_trigger=6,
motor="mo1_bragg",
daq="nidaq",
)
assert scan.scan_name == "xas_simple_scan_with_xrd"
assert scan.scan_info.additional_scan_parameters["break_enable_low"] is True
assert scan.scan_info.additional_scan_parameters["cycle_high"] == 4
assert scan.scan_info.additional_scan_parameters["n_of_trigger"] == 6
def test_xas_advanced_scan_v4_updates_spline_metadata(v4_scan_assembler):
scan = v4_scan_assembler(
"xas_advanced_scan",
start=8000.0,
stop=9000.0,
scan_time=1.0,
scan_duration=10.0,
p_kink=50.0,
e_kink=8500.0,
motor="mo1_bragg",
daq="nidaq",
)
assert scan.scan_name == "xas_advanced_scan"
assert scan.scan_info.additional_scan_parameters["p_kink"] == 50.0
assert scan.scan_info.additional_scan_parameters["e_kink"] == 8500.0
def test_xas_advanced_scan_with_xrd_v4_updates_all_metadata(v4_scan_assembler):
scan = v4_scan_assembler(
"xas_advanced_scan_with_xrd",
start=8000.0,
stop=9000.0,
scan_time=1.0,
scan_duration=10.0,
p_kink=55.0,
e_kink=8450.0,
break_enable_low=True,
break_time_low=1.5,
cycle_low=2,
break_enable_high=True,
break_time_high=2.5,
cycle_high=3,
exp_time=0.25,
n_of_trigger=8,
motor="mo1_bragg",
daq="nidaq",
)
assert scan.scan_name == "xas_advanced_scan_with_xrd"
assert scan.scan_info.additional_scan_parameters["p_kink"] == 55.0
assert scan.scan_info.additional_scan_parameters["break_enable_high"] is True
assert scan.scan_info.exp_time == 0.25
+66 -117
View File
@@ -1,126 +1,75 @@
# pylint: skip-file
from unittest import mock
from bec_lib.messages import DeviceInstructionMessage
from bec_server.device_server.tests.utils import DMMock
import pytest
from bec_server.scan_server.tests.scan_fixtures import *
from bec_server.scan_server.tests.scan_hook_tests import *
from debye_bec.scans import NIDAQContinuousScan
NIDAQ_CONTINUOUS_SCAN_DEFAULT_HOOK_TESTS = [
("prepare_scan", [assert_prepare_scan_reads_baseline_devices]),
("open_scan", [assert_scan_open_called]),
("stage", [assert_stage_all_devices_called]),
("pre_scan", [assert_pre_scan_called]),
("unstage", [assert_unstage_all_devices_called]),
("close_scan", [assert_close_scan_waits_for_baseline_and_closes]),
]
def get_instructions(request, ScanStubStatusMock):
request.metadata["RID"] = "my_test_request_id"
def fake_done():
"""
Fake done function for ScanStubStatusMock. Upon each call, it returns the next value from the generator.
This is used to simulate the completion of the scan.
"""
yield False
yield False
yield True
def fake_complete(*args, **kwargs):
yield "fake_complete"
return ScanStubStatusMock(done_func=fake_done)
with (
mock.patch.object(request.stubs, "complete", side_effect=fake_complete),
mock.patch.object(request.stubs, "_get_result_from_status", return_value=None),
):
reference_commands = list(request.run())
for cmd in reference_commands:
if not cmd or isinstance(cmd, str):
continue
if "RID" in cmd.metadata:
cmd.metadata["RID"] = "my_test_request_id"
if "rpc_id" in cmd.parameter:
cmd.parameter["rpc_id"] = "my_test_rpc_id"
cmd.metadata.pop("device_instr_id", None)
return reference_commands
def _assemble_nidaq_continuous_scan(v4_scan_assembler, **overrides):
params = {"scan_duration": 10.0, "daq": "nidaq", "compression": False}
params.update(overrides)
return v4_scan_assembler("nidaq_continuous_scan", **params)
def test_xas_simple_scan(scan_assembler, ScanStubStatusMock):
@pytest.mark.parametrize(("hook_name", "hook_tests"), NIDAQ_CONTINUOUS_SCAN_DEFAULT_HOOK_TESTS)
def test_nidaq_continuous_scan_v4_default_hooks(
v4_scan_assembler, nth_done_status_mock, hook_name, hook_tests
):
scan = _assemble_nidaq_continuous_scan(v4_scan_assembler)
request = scan_assembler(NIDAQContinuousScan, scan_duration=10)
request.device_manager.add_device("nidaq")
reference_commands = get_instructions(request, ScanStubStatusMock)
assert reference_commands == [
None,
None,
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="scan_report_instruction",
parameter={"device_progress": ["nidaq"]},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="open_scan",
parameter={
"readout_priority": {
"monitored": [],
"baseline": [],
"on_request": [],
"async": ["nidaq"],
},
"num_points": 0,
"positions": [],
"scan_name": "nidaq_continuous_scan",
"scan_type": "fly",
},
),
DeviceInstructionMessage(metadata={}, device="nidaq", action="stage", parameter={}),
DeviceInstructionMessage(
metadata={},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
action="stage",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "baseline", "RID": "my_test_request_id"},
device=["samx"],
action="read",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
action="pre_scan",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device="nidaq",
action="kickoff",
parameter={"configure": {}},
),
"fake_complete",
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0},
device=["bpm4i", "eiger", "mo1_bragg"],
action="read",
parameter={"group": "monitored"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 1},
device=["bpm4i", "eiger", "mo1_bragg"],
action="read",
parameter={"group": "monitored"},
),
"fake_complete",
DeviceInstructionMessage(
metadata={},
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
action="unstage",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="close_scan",
parameter={},
),
]
run_scan_tests(scan, [(hook_name, hook_tests)], nth_done_status_mock=nth_done_status_mock)
def test_nidaq_continuous_scan_v4_prepare_scan_updates_metadata(v4_scan_assembler):
scan = _assemble_nidaq_continuous_scan(v4_scan_assembler)
scan.actions.add_scan_report_instruction_device_progress = mock.MagicMock()
baseline_status = mock.MagicMock()
scan.actions.read_baseline_devices = mock.MagicMock(return_value=baseline_status)
scan.prepare_scan()
scan.actions._build_scan_status_message("open")
assert scan.scan_info.additional_scan_parameters["scan_duration"] == 10.0
assert scan.scan_info.additional_scan_parameters["compression"] is False
assert scan.scan_info.readout_priority_modification["async"] == ["nidaq"]
scan.actions.add_scan_report_instruction_device_progress.assert_called_once_with(scan.daq)
scan.actions.read_baseline_devices.assert_called_once_with(wait=False)
assert scan._baseline_readout_status is baseline_status
def test_nidaq_continuous_scan_v4_scan_core_reads_until_complete(
v4_scan_assembler, nth_done_status_mock
):
scan = _assemble_nidaq_continuous_scan(v4_scan_assembler)
kickoff_status = mock.MagicMock()
completion_status = nth_done_status_mock(resolve_after=3)
scan.actions.kickoff = mock.MagicMock(return_value=kickoff_status)
scan.actions.complete = mock.MagicMock(return_value=completion_status)
scan.actions.read_monitored_devices = mock.MagicMock()
with mock.patch("debye_bec.scans.nidaq_continuous_scan.time.sleep"):
scan.scan_core()
scan.actions.kickoff.assert_called_once_with(device=scan.daq, wait=False)
kickoff_status.wait.assert_called_once_with(timeout=5)
scan.actions.complete.assert_called_once_with(device=scan.daq, wait=False)
assert scan.actions.read_monitored_devices.call_count == 2
def test_nidaq_continuous_scan_v4_post_scan_completes_all_devices(v4_scan_assembler):
scan = _assemble_nidaq_continuous_scan(v4_scan_assembler)
scan.actions.complete_all_devices = mock.MagicMock()
scan.post_scan()
scan.actions.complete_all_devices.assert_called_once_with()