Compare commits
15 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 40309491b0 | |||
| 2f0265fff7 | |||
| 2d21eb90fe | |||
| 8ddf67e817 | |||
| 617cca71a5 | |||
| 7e20d46881 | |||
| 74ff173f98 | |||
| 87758710d9 | |||
| 8bc36ed6a2 | |||
| 78d58ad26f | |||
| 359ef0b6d7 | |||
| 262a0b6318 | |||
| 98d5c22667 | |||
| 0e77dd5679 | |||
| f8e5b5e073 |
@@ -2,14 +2,17 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import cv2
|
||||
import threading
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
import cv2
|
||||
from bec_lib.file_utils import get_full_path
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
|
||||
from ophyd_devices import DeviceStatus
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
|
||||
from debye_bec.devices.utils.utils import fetch_scan_info
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
@@ -21,6 +24,7 @@ CAM_USERNAME = "camera_user"
|
||||
CAM_PASSWORD = "camera_user1"
|
||||
CAM_PORT = 554
|
||||
|
||||
|
||||
class HutchCam(PSIDeviceBase):
|
||||
"""Class for the Hutch Cameras"""
|
||||
|
||||
@@ -28,7 +32,7 @@ class HutchCam(PSIDeviceBase):
|
||||
|
||||
def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
|
||||
super().__init__(name=name, scan_info=scan_info, **kwargs)
|
||||
|
||||
self.scan_parameters: ScanServerScanInfo = None
|
||||
self.hostname = prefix
|
||||
self.status = None
|
||||
|
||||
@@ -47,33 +51,36 @@ class HutchCam(PSIDeviceBase):
|
||||
|
||||
def on_stage(self) -> DeviceStatus:
|
||||
"""Called while staging the device."""
|
||||
|
||||
scan_msg: ScanStatusMessage = self.scan_info.msg
|
||||
file_path = get_full_path(scan_msg, name='hutch_cam_' + self.hostname).removesuffix('h5')
|
||||
self.scan_parameters = fetch_scan_info(self.scan_info)
|
||||
file_path = get_full_path(self.scan_info, name="hutch_cam_" + self.hostname).removesuffix(
|
||||
"h5"
|
||||
)
|
||||
|
||||
self.status = DeviceStatus(self)
|
||||
|
||||
thread = threading.Thread(target=self._save_picture, args=(file_path, self.status), daemon=True)
|
||||
thread = threading.Thread(
|
||||
target=self._save_picture, args=(file_path, self.status), daemon=True
|
||||
)
|
||||
thread.start()
|
||||
|
||||
return self.status
|
||||
|
||||
def _save_picture(self, file_path, status):
|
||||
try:
|
||||
logger.info(f'Capture from camera {self.hostname}')
|
||||
logger.info(f"Capture from camera {self.hostname}")
|
||||
rtsp_url = f"rtsp://{CAM_USERNAME}:{CAM_PASSWORD}@{self.hostname}.psi.ch:{CAM_PORT}/rtpstream/config1"
|
||||
cap = cv2.VideoCapture(f"{rtsp_url}?tcp")
|
||||
if not cap.isOpened():
|
||||
logger.error("Connection Failed", "Could not connect to the camera stream.")
|
||||
return
|
||||
logger.info(f'Connection to camera {self.hostname} established')
|
||||
logger.info(f"Connection to camera {self.hostname} established")
|
||||
ret, frame = cap.readAsync()
|
||||
cap.release()
|
||||
if not ret:
|
||||
logger.error("Capture Failed", "Failed to capture image from camera.")
|
||||
return
|
||||
cv2.imwrite(file_path + 'png', frame)
|
||||
cv2.imwrite(file_path + "png", frame)
|
||||
status.set_finished()
|
||||
logger.info(f'Capture from camera {self.hostname} done')
|
||||
logger.info(f"Capture from camera {self.hostname} done")
|
||||
except Exception as e:
|
||||
status.set_exception(e)
|
||||
|
||||
@@ -13,6 +13,7 @@ from typing import Literal
|
||||
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import DeviceStatus, StatusBase
|
||||
from ophyd.status import WaitTimeoutError
|
||||
@@ -33,6 +34,7 @@ from debye_bec.devices.mo1_bragg.mo1_bragg_enums import (
|
||||
TriggerControlSource,
|
||||
)
|
||||
from debye_bec.devices.mo1_bragg.mo1_bragg_utils import compute_spline
|
||||
from debye_bec.devices.utils.utils import fetch_scan_info
|
||||
|
||||
# Initialise logger
|
||||
logger = bec_logger.logger
|
||||
@@ -44,36 +46,6 @@ class Mo1BraggError(Exception):
|
||||
"""Exception for the Mo1 Bragg positioner"""
|
||||
|
||||
|
||||
########## Scan Parameter Model ##########
|
||||
|
||||
|
||||
class ScanParameter(BaseModel):
|
||||
"""Dataclass to store the scan parameters for the Mo1 Bragg positioner.
|
||||
This needs to be in sync with the kwargs of the MO1 Bragg scans from Debye, to
|
||||
ensure that the scan parameters are correctly set. Any changes in the scan kwargs,
|
||||
i.e. renaming or adding new parameters, need to be represented here as well."""
|
||||
|
||||
scan_time: float | None = Field(None, description="Scan time for a half oscillation")
|
||||
scan_duration: float | None = Field(None, description="Duration of the scan")
|
||||
break_enable_low: bool | None = Field(
|
||||
None, description="Break enabled for low, should be PV trig_ena_lo_enum"
|
||||
) # trig_enable_low: bool = None
|
||||
break_enable_high: bool | None = Field(
|
||||
None, description="Break enabled for high, should be PV trig_ena_hi_enum"
|
||||
) # trig_enable_high: bool = None
|
||||
break_time_low: float | None = Field(None, description="Break time low energy/angle")
|
||||
break_time_high: float | None = Field(None, description="Break time high energy/angle")
|
||||
cycle_low: int | None = Field(None, description="Cycle for low energy/angle")
|
||||
cycle_high: int | None = Field(None, description="Cycle for high energy/angle")
|
||||
exp_time: float | None = Field(None, description="XRD trigger period")
|
||||
n_of_trigger: int | None = Field(None, description="Amount of XRD triggers")
|
||||
start: float | None = Field(None, description="Start value for energy/angle")
|
||||
stop: float | None = Field(None, description="Stop value for energy/angle")
|
||||
p_kink: float | None = Field(None, description="P Kink")
|
||||
e_kink: float | None = Field(None, description="Energy Kink")
|
||||
model_config: dict = {"validate_assignment": True}
|
||||
|
||||
|
||||
########### Mo1 Bragg Motor Class ###########
|
||||
|
||||
|
||||
@@ -85,7 +57,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
||||
|
||||
progress_signal = Cpt(ProgressSignal, name="progress_signal")
|
||||
|
||||
USER_ACCESS = ["set_advanced_xas_settings", "set_xtal"]
|
||||
USER_ACCESS = ["set_advanced_xas_settings", "set_xtal", "convert_angle_energy"]
|
||||
|
||||
def __init__(self, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs): # type: ignore
|
||||
"""
|
||||
@@ -96,8 +68,15 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
||||
scan_info (ScanInfo): The scan info to use.
|
||||
"""
|
||||
super().__init__(name=name, scan_info=scan_info, prefix=prefix, **kwargs)
|
||||
self.scan_parameter = ScanParameter()
|
||||
self.scan_parameters: ScanServerScanInfo = None
|
||||
self.timeout_for_pvwait = 7.5
|
||||
self.valid_scan_names = [
|
||||
"xas_simple_scan",
|
||||
"xas_simple_scan_with_xrd",
|
||||
"xas_advanced_scan",
|
||||
"xas_advanced_scan_with_xrd",
|
||||
"nidaq_continuous_scan",
|
||||
]
|
||||
|
||||
########################################
|
||||
# Beamline Specific Implementations #
|
||||
@@ -124,94 +103,187 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
||||
|
||||
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
|
||||
"""
|
||||
self.scan_parameters = fetch_scan_info(self.scan_info)
|
||||
if self.scan_control.scan_msg.get() != ScanControlLoadMessage.PENDING:
|
||||
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
|
||||
self.cancel_on_stop(status)
|
||||
self.scan_control.scan_val_reset.put(1)
|
||||
status.wait(timeout=self.timeout_for_pvwait)
|
||||
|
||||
scan_name = self.scan_info.msg.scan_name
|
||||
self._update_scan_parameter()
|
||||
if scan_name == "xas_simple_scan":
|
||||
self.set_xas_settings(
|
||||
low=self.scan_parameter.start,
|
||||
high=self.scan_parameter.stop,
|
||||
scan_time=self.scan_parameter.scan_time,
|
||||
)
|
||||
self.set_trig_settings(
|
||||
enable_low=False,
|
||||
enable_high=False,
|
||||
break_time_low=0,
|
||||
break_time_high=0,
|
||||
cycle_low=0,
|
||||
cycle_high=0,
|
||||
exp_time=0,
|
||||
n_of_trigger=0,
|
||||
)
|
||||
self.set_scan_control_settings(
|
||||
mode=ScanControlMode.SIMPLE, scan_duration=self.scan_parameter.scan_duration
|
||||
)
|
||||
elif scan_name == "xas_simple_scan_with_xrd":
|
||||
self.set_xas_settings(
|
||||
low=self.scan_parameter.start,
|
||||
high=self.scan_parameter.stop,
|
||||
scan_time=self.scan_parameter.scan_time,
|
||||
)
|
||||
self.set_trig_settings(
|
||||
enable_low=self.scan_parameter.break_enable_low,
|
||||
enable_high=self.scan_parameter.break_enable_high,
|
||||
break_time_low=self.scan_parameter.break_time_low,
|
||||
break_time_high=self.scan_parameter.break_time_high,
|
||||
cycle_low=self.scan_parameter.cycle_low,
|
||||
cycle_high=self.scan_parameter.cycle_high,
|
||||
exp_time=self.scan_parameter.exp_time,
|
||||
n_of_trigger=self.scan_parameter.n_of_trigger,
|
||||
)
|
||||
self.set_scan_control_settings(
|
||||
mode=ScanControlMode.SIMPLE, scan_duration=self.scan_parameter.scan_duration
|
||||
)
|
||||
elif scan_name == "xas_advanced_scan":
|
||||
self.set_advanced_xas_settings(
|
||||
low=self.scan_parameter.start,
|
||||
high=self.scan_parameter.stop,
|
||||
scan_time=self.scan_parameter.scan_time,
|
||||
p_kink=self.scan_parameter.p_kink,
|
||||
e_kink=self.scan_parameter.e_kink,
|
||||
)
|
||||
self.set_trig_settings(
|
||||
enable_low=False,
|
||||
enable_high=False,
|
||||
break_time_low=0,
|
||||
break_time_high=0,
|
||||
cycle_low=0,
|
||||
cycle_high=0,
|
||||
exp_time=0,
|
||||
n_of_trigger=0,
|
||||
)
|
||||
self.set_scan_control_settings(
|
||||
mode=ScanControlMode.ADVANCED, scan_duration=self.scan_parameter.scan_duration
|
||||
)
|
||||
elif scan_name == "xas_advanced_scan_with_xrd":
|
||||
self.set_advanced_xas_settings(
|
||||
low=self.scan_parameter.start,
|
||||
high=self.scan_parameter.stop,
|
||||
scan_time=self.scan_parameter.scan_time,
|
||||
p_kink=self.scan_parameter.p_kink,
|
||||
e_kink=self.scan_parameter.e_kink,
|
||||
)
|
||||
self.set_trig_settings(
|
||||
enable_low=self.scan_parameter.break_enable_low,
|
||||
enable_high=self.scan_parameter.break_enable_high,
|
||||
break_time_low=self.scan_parameter.break_time_low,
|
||||
break_time_high=self.scan_parameter.break_time_high,
|
||||
cycle_low=self.scan_parameter.cycle_low,
|
||||
cycle_high=self.scan_parameter.cycle_high,
|
||||
exp_time=self.scan_parameter.exp_time,
|
||||
n_of_trigger=self.scan_parameter.n_of_trigger,
|
||||
)
|
||||
self.set_scan_control_settings(
|
||||
mode=ScanControlMode.ADVANCED, scan_duration=self.scan_parameter.scan_duration
|
||||
scan_name = self.scan_parameters.scan_name
|
||||
if self._check_if_scan_name_is_valid(self.scan_parameters):
|
||||
if self.scan_parameters.positions is not None:
|
||||
start, stop = (
|
||||
self.scan_parameters.positions
|
||||
if len(self.scan_parameters.positions) == 2
|
||||
else (None, None)
|
||||
)
|
||||
else:
|
||||
start, stop = (None, None)
|
||||
scan_time = self.scan_parameters.additional_scan_parameters.get("scan_time", None)
|
||||
scan_duration = self.scan_parameters.additional_scan_parameters.get(
|
||||
"scan_duration", None
|
||||
)
|
||||
if scan_name == "xas_simple_scan":
|
||||
if any(param is None for param in [start, stop, scan_time, scan_duration]):
|
||||
raise Mo1BraggError(
|
||||
f"Missing scan parameters for xas_simple_scan. Required parameters: start, stop, scan_time, scan_duration in additional_scan_parameters dict {self.scan_parameters.additional_scan_parameters}"
|
||||
)
|
||||
self.set_xas_settings(low=start, high=stop, scan_time=scan_time)
|
||||
self.set_trig_settings(
|
||||
enable_low=False,
|
||||
enable_high=False,
|
||||
break_time_low=0,
|
||||
break_time_high=0,
|
||||
cycle_low=0,
|
||||
cycle_high=0,
|
||||
exp_time=0,
|
||||
n_of_trigger=0,
|
||||
)
|
||||
self.set_scan_control_settings(
|
||||
mode=ScanControlMode.SIMPLE, scan_duration=scan_duration
|
||||
)
|
||||
elif scan_name == "xas_simple_scan_with_xrd":
|
||||
break_enable_low = self.scan_parameters.additional_scan_parameters.get(
|
||||
"break_enable_low", None
|
||||
)
|
||||
break_enable_high = self.scan_parameters.additional_scan_parameters.get(
|
||||
"break_enable_high", None
|
||||
)
|
||||
break_time_low = self.scan_parameters.additional_scan_parameters.get(
|
||||
"break_time_low", None
|
||||
)
|
||||
break_time_high = self.scan_parameters.additional_scan_parameters.get(
|
||||
"break_time_high", None
|
||||
)
|
||||
cycle_low = self.scan_parameters.additional_scan_parameters.get("cycle_low", None)
|
||||
cycle_high = self.scan_parameters.additional_scan_parameters.get("cycle_high", None)
|
||||
exp_time = self.scan_parameters.exp_time
|
||||
n_of_trigger = self.scan_parameters.additional_scan_parameters.get(
|
||||
"n_of_trigger", None
|
||||
)
|
||||
if any(
|
||||
param is None
|
||||
for param in [
|
||||
start,
|
||||
stop,
|
||||
scan_time,
|
||||
scan_duration,
|
||||
break_enable_low,
|
||||
break_enable_high,
|
||||
break_time_low,
|
||||
break_time_high,
|
||||
cycle_low,
|
||||
cycle_high,
|
||||
exp_time,
|
||||
n_of_trigger,
|
||||
]
|
||||
):
|
||||
raise Mo1BraggError(
|
||||
f"Missing scan parameters for xas_simple_scan_with_xrd. Required parameters: start, stop, scan_time, scan_duration, break_enable_low, break_enable_high, break_time_low, break_time_high, cycle_low, cycle_high, exp_time, n_of_trigger in additional_scan_parameters dict {self.scan_parameters.additional_scan_parameters}"
|
||||
)
|
||||
self.set_xas_settings(low=start, high=stop, scan_time=scan_time)
|
||||
self.set_trig_settings(
|
||||
enable_low=break_enable_low,
|
||||
enable_high=break_enable_high,
|
||||
break_time_low=break_time_low,
|
||||
break_time_high=break_time_high,
|
||||
cycle_low=cycle_low,
|
||||
cycle_high=cycle_high,
|
||||
exp_time=exp_time,
|
||||
n_of_trigger=n_of_trigger,
|
||||
)
|
||||
self.set_scan_control_settings(
|
||||
mode=ScanControlMode.SIMPLE, scan_duration=scan_duration
|
||||
)
|
||||
elif scan_name == "xas_advanced_scan":
|
||||
p_kink = self.scan_parameters.additional_scan_parameters.get("p_kink", None)
|
||||
e_kink = self.scan_parameters.additional_scan_parameters.get("e_kink", None)
|
||||
if any(
|
||||
param is None
|
||||
for param in [start, stop, scan_time, scan_duration, p_kink, e_kink]
|
||||
):
|
||||
raise Mo1BraggError(
|
||||
f"Missing scan parameters for xas_advanced_scan. Required parameters: start, stop, scan_time, scan_duration, p_kink, e_kink in additional_scan_parameters dict {self.scan_parameters.additional_scan_parameters}"
|
||||
)
|
||||
self.set_advanced_xas_settings(
|
||||
low=start, high=stop, scan_time=scan_time, p_kink=p_kink, e_kink=e_kink
|
||||
)
|
||||
self.set_trig_settings(
|
||||
enable_low=False,
|
||||
enable_high=False,
|
||||
break_time_low=0,
|
||||
break_time_high=0,
|
||||
cycle_low=0,
|
||||
cycle_high=0,
|
||||
exp_time=0,
|
||||
n_of_trigger=0,
|
||||
)
|
||||
self.set_scan_control_settings(
|
||||
mode=ScanControlMode.ADVANCED, scan_duration=scan_duration
|
||||
)
|
||||
elif scan_name == "xas_advanced_scan_with_xrd":
|
||||
p_kink = self.scan_parameters.additional_scan_parameters.get("p_kink", None)
|
||||
e_kink = self.scan_parameters.additional_scan_parameters.get("e_kink", None)
|
||||
break_enable_low = self.scan_parameters.additional_scan_parameters.get(
|
||||
"break_enable_low", None
|
||||
)
|
||||
break_enable_high = self.scan_parameters.additional_scan_parameters.get(
|
||||
"break_enable_high", None
|
||||
)
|
||||
break_time_low = self.scan_parameters.additional_scan_parameters.get(
|
||||
"break_time_low", None
|
||||
)
|
||||
break_time_high = self.scan_parameters.additional_scan_parameters.get(
|
||||
"break_time_high", None
|
||||
)
|
||||
cycle_low = self.scan_parameters.additional_scan_parameters.get("cycle_low", None)
|
||||
cycle_high = self.scan_parameters.additional_scan_parameters.get("cycle_high", None)
|
||||
exp_time = self.scan_parameters.exp_time
|
||||
n_of_trigger = self.scan_parameters.additional_scan_parameters.get(
|
||||
"n_of_trigger", None
|
||||
)
|
||||
if any(
|
||||
param is None
|
||||
for param in [
|
||||
start,
|
||||
stop,
|
||||
scan_time,
|
||||
scan_duration,
|
||||
p_kink,
|
||||
e_kink,
|
||||
break_enable_low,
|
||||
break_enable_high,
|
||||
break_time_low,
|
||||
break_time_high,
|
||||
cycle_low,
|
||||
cycle_high,
|
||||
exp_time,
|
||||
n_of_trigger,
|
||||
]
|
||||
):
|
||||
raise Mo1BraggError(
|
||||
f"Missing scan parameters for xas_advanced_scan_with_xrd. Required parameters: start, stop, scan_time, scan_duration, p_kink, e_kink, break_enable_low, break_enable_high, break_time_low, break_time_high, cycle_low, cycle_high, exp_time, n_of_trigger in additional_scan_parameters dict {self.scan_parameters.additional_scan_parameters}"
|
||||
)
|
||||
|
||||
self.set_advanced_xas_settings(
|
||||
low=start, high=stop, scan_time=scan_time, p_kink=p_kink, e_kink=e_kink
|
||||
)
|
||||
self.set_trig_settings(
|
||||
enable_low=break_enable_low,
|
||||
enable_high=break_enable_high,
|
||||
break_time_low=break_time_low,
|
||||
break_time_high=break_time_high,
|
||||
cycle_low=cycle_low,
|
||||
cycle_high=cycle_high,
|
||||
exp_time=exp_time,
|
||||
n_of_trigger=n_of_trigger,
|
||||
)
|
||||
self.set_scan_control_settings(
|
||||
mode=ScanControlMode.ADVANCED, scan_duration=scan_duration
|
||||
)
|
||||
else:
|
||||
return # Should never happen.
|
||||
else:
|
||||
return
|
||||
# Setting scan duration seems to lag behind slightly in the backend, include small sleep
|
||||
@@ -291,6 +363,13 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
||||
self.stopped = True # Needs to be set to stop motion
|
||||
|
||||
######### Utility Methods #########
|
||||
|
||||
def _check_if_scan_name_is_valid(self, scan_parameters: ScanServerScanInfo) -> bool:
|
||||
"""Check if the scan is within the list of scans for which the backend is working"""
|
||||
if scan_parameters.scan_name in self.valid_scan_names:
|
||||
return True
|
||||
return False
|
||||
|
||||
def _progress_update(self, value, **kwargs) -> None:
|
||||
"""Callback method to update the scan progress, runs a callback
|
||||
to SUB_PROGRESS subscribers, i.e. BEC.
|
||||
@@ -468,12 +547,3 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
||||
|
||||
for s in status_list:
|
||||
s.wait(timeout=self.timeout_for_pvwait)
|
||||
|
||||
def _update_scan_parameter(self):
|
||||
"""Get the scan_info parameters for the scan."""
|
||||
for key, value in self.scan_info.msg.request_inputs["inputs"].items():
|
||||
if hasattr(self.scan_parameter, key):
|
||||
setattr(self.scan_parameter, key, value)
|
||||
for key, value in self.scan_info.msg.request_inputs["kwargs"].items():
|
||||
if hasattr(self.scan_parameter, key):
|
||||
setattr(self.scan_parameter, key, value)
|
||||
|
||||
@@ -182,7 +182,7 @@ class Mo1TriggerSettings(Device):
|
||||
class Mo1BraggCalculator(Device):
|
||||
"""Mo1 Bragg PVs to convert angle to energy or vice-versa."""
|
||||
|
||||
calc_reset = Cpt(EpicsSignal, suffix="calc_reset", kind="config", put_complete=True)
|
||||
calc_reset = Cpt(EpicsSignalWithRBV, suffix="calc_reset", kind="config", put_complete=True)
|
||||
calc_done = Cpt(EpicsSignalRO, suffix="calc_done_RBV", kind="config")
|
||||
calc_energy = Cpt(EpicsSignalWithRBV, suffix="calc_energy", kind="config")
|
||||
calc_angle = Cpt(EpicsSignalWithRBV, suffix="calc_angle", kind="config")
|
||||
|
||||
@@ -3,6 +3,7 @@ from __future__ import annotations
|
||||
from typing import TYPE_CHECKING, Literal
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO, Kind, StatusBase
|
||||
from ophyd.status import WaitTimeoutError
|
||||
@@ -18,6 +19,7 @@ from debye_bec.devices.nidaq.nidaq_enums import (
|
||||
ScanRates,
|
||||
ScanType,
|
||||
)
|
||||
from debye_bec.devices.utils.utils import fetch_scan_info
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
@@ -219,7 +221,7 @@ class Nidaq(PSIDeviceBase, NidaqControl):
|
||||
|
||||
def __init__(self, prefix: str = "", *, name: str, scan_info: ScanInfo = None, **kwargs):
|
||||
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
|
||||
self.scan_info: ScanInfo
|
||||
self.scan_parameters: ScanServerScanInfo = None
|
||||
self.timeout_wait_for_signal = 5 # put 5s firsts
|
||||
self._timeout_wait_for_pv = (
|
||||
5 # 5s timeout for pv calls. editted due to timeout issues persisting
|
||||
@@ -236,10 +238,9 @@ class Nidaq(PSIDeviceBase, NidaqControl):
|
||||
# Beamline Methods #
|
||||
########################################
|
||||
|
||||
def _check_if_scan_name_is_valid(self) -> bool:
|
||||
def _check_if_scan_name_is_valid(self, scan_parameters: ScanServerScanInfo) -> bool:
|
||||
"""Check if the scan is within the list of scans for which the backend is working"""
|
||||
scan_name = self.scan_info.msg.scan_name
|
||||
if scan_name in self.valid_scan_names:
|
||||
if scan_parameters.scan_name in self.valid_scan_names:
|
||||
return True
|
||||
return False
|
||||
|
||||
@@ -396,7 +397,8 @@ class Nidaq(PSIDeviceBase, NidaqControl):
|
||||
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
|
||||
If the upcoming scan is not in the list of valid scans, return immediately.
|
||||
"""
|
||||
if not self._check_if_scan_name_is_valid():
|
||||
self.scan_parameters = fetch_scan_info(self.scan_info)
|
||||
if not self._check_if_scan_name_is_valid(self.scan_parameters):
|
||||
return None
|
||||
|
||||
if self.state.get() != NidaqState.STANDBY:
|
||||
@@ -406,18 +408,18 @@ class Nidaq(PSIDeviceBase, NidaqControl):
|
||||
status.wait(timeout=self.timeout_wait_for_signal)
|
||||
|
||||
# If scan is not part of the valid_scan_names,
|
||||
if self.scan_info.msg.scan_name != "nidaq_continuous_scan":
|
||||
if self.scan_parameters.scan_name != "nidaq_continuous_scan": # what is the new v4 scan
|
||||
self.scan_type.set(ScanType.TRIGGERED).wait(timeout=self._timeout_wait_for_pv)
|
||||
self.scan_duration.set(0).wait(timeout=self._timeout_wait_for_pv)
|
||||
self.enable_compression.set(1).wait(timeout=self._timeout_wait_for_pv)
|
||||
else:
|
||||
self.scan_type.set(ScanType.CONTINUOUS).wait(timeout=self._timeout_wait_for_pv)
|
||||
self.scan_duration.set(self.scan_info.msg.scan_parameters["scan_duration"]).wait(
|
||||
timeout=self._timeout_wait_for_pv
|
||||
)
|
||||
self.enable_compression.set(self.scan_info.msg.scan_parameters["compression"]).wait(
|
||||
timeout=self._timeout_wait_for_pv
|
||||
)
|
||||
self.scan_duration.set(
|
||||
self.scan_parameters.additional_scan_parameters["scan_duration"]
|
||||
).wait(timeout=self._timeout_wait_for_pv)
|
||||
self.enable_compression.set(
|
||||
self.scan_parameters.additional_scan_parameters["compression"]
|
||||
).wait(timeout=self._timeout_wait_for_pv)
|
||||
|
||||
# Stage call to IOC
|
||||
status = CompareStatus(self.state, NidaqState.STAGE)
|
||||
@@ -428,7 +430,7 @@ class Nidaq(PSIDeviceBase, NidaqControl):
|
||||
# self.stage_call.set(1).wait(timeout=self._timeout_wait_for_pv)
|
||||
self.stage_call.put(1)
|
||||
status.wait(timeout=self.timeout_wait_for_signal)
|
||||
if self.scan_info.msg.scan_name != "nidaq_continuous_scan":
|
||||
if self.scan_parameters.scan_name != "nidaq_continuous_scan":
|
||||
status = self.on_kickoff()
|
||||
self.cancel_on_stop(status)
|
||||
status.wait(timeout=self._timeout_wait_for_pv)
|
||||
@@ -459,10 +461,10 @@ class Nidaq(PSIDeviceBase, NidaqControl):
|
||||
before the motor starts its oscillation. This is needed for being properly homed.
|
||||
The NIDAQ should go into Acquiring mode.
|
||||
"""
|
||||
if not self._check_if_scan_name_is_valid():
|
||||
if not self._check_if_scan_name_is_valid(self.scan_parameters):
|
||||
return None
|
||||
|
||||
if self.scan_info.msg.scan_name == "nidaq_continuous_scan":
|
||||
if self.scan_parameters.scan_name == "nidaq_continuous_scan":
|
||||
logger.info(f"Device {self.name} ready to be kicked off for nidaq_continuous_scan")
|
||||
return None
|
||||
|
||||
@@ -483,12 +485,12 @@ class Nidaq(PSIDeviceBase, NidaqControl):
|
||||
For the NIDAQ we use this method to stop the backend since it
|
||||
would not stop by itself in its current implementation since the number of points are not predefined.
|
||||
"""
|
||||
if not self._check_if_scan_name_is_valid():
|
||||
if not self._check_if_scan_name_is_valid(self.scan_parameters):
|
||||
return None
|
||||
|
||||
status = CompareStatus(self.state, NidaqState.STANDBY)
|
||||
self.cancel_on_stop(status)
|
||||
if self.scan_info.msg.scan_name != "nidaq_continuous_scan":
|
||||
if self.scan_parameters.scan_name != "nidaq_continuous_scan":
|
||||
self.on_stop()
|
||||
return status
|
||||
|
||||
@@ -499,7 +501,9 @@ class Nidaq(PSIDeviceBase, NidaqControl):
|
||||
Args:
|
||||
value (int) : current progress value
|
||||
"""
|
||||
scan_duration = self.scan_info.msg.scan_parameters.get("scan_duration", None)
|
||||
if self.scan_parameters is None:
|
||||
return
|
||||
scan_duration = self.scan_parameters.additional_scan_parameters.get("scan_duration", None)
|
||||
if not isinstance(scan_duration, (int, float)):
|
||||
return
|
||||
value = scan_duration - value
|
||||
|
||||
@@ -11,16 +11,26 @@ from typing import TYPE_CHECKING, Tuple
|
||||
import numpy as np
|
||||
from bec_lib.file_utils import get_full_path
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import EpicsSignal, EpicsSignalRO, Kind
|
||||
from ophyd.areadetector.cam import ADBase, PilatusDetectorCam
|
||||
from ophyd.areadetector.plugins import HDF5Plugin_V22 as HDF5Plugin
|
||||
from ophyd.areadetector.plugins import ImagePlugin_V22 as ImagePlugin
|
||||
from ophyd.status import WaitTimeoutError
|
||||
from ophyd_devices import AndStatus, CompareStatus, DeviceStatus, FileEventSignal, PreviewSignal
|
||||
from ophyd_devices import (
|
||||
AndStatus,
|
||||
CompareStatus,
|
||||
DeviceStatus,
|
||||
ExceptionStatus,
|
||||
FileEventSignal,
|
||||
PreviewSignal,
|
||||
)
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from debye_bec.devices.utils.utils import fetch_scan_info
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
from bec_lib.messages import DevicePreviewMessage, ScanStatusMessage
|
||||
@@ -145,17 +155,17 @@ class Pilatus(PSIDeviceBase, ADBase):
|
||||
|
||||
# USER_ACCESS = ["start_live_mode", "stop_live_mode"]
|
||||
|
||||
cam_gain_menu_string = Cpt(EpicsSignalRO, suffix='cam1:GainMenu', string=True)
|
||||
cam_gain_menu_string = Cpt(EpicsSignalRO, suffix="cam1:GainMenu", string=True)
|
||||
|
||||
_default_configuration_attrs = [
|
||||
'cam.threshold_energy',
|
||||
'cam.threshold_auto_apply',
|
||||
'cam.gain_menu',
|
||||
'cam_gain_menu_string',
|
||||
'cam.pixel_cut_off',
|
||||
'cam.acquire_time',
|
||||
'cam.num_exposures',
|
||||
'cam.model',
|
||||
"cam.threshold_energy",
|
||||
"cam.threshold_auto_apply",
|
||||
"cam.gain_menu",
|
||||
"cam_gain_menu_string",
|
||||
"cam.pixel_cut_off",
|
||||
"cam.acquire_time",
|
||||
"cam.num_exposures",
|
||||
"cam.model",
|
||||
]
|
||||
|
||||
cam = Cpt(PilatusDetectorCam, "cam1:")
|
||||
@@ -233,7 +243,6 @@ class Pilatus(PSIDeviceBase, ADBase):
|
||||
super().__init__(
|
||||
name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs
|
||||
)
|
||||
self.scan_parameter = ScanParameter()
|
||||
self.device_manager = device_manager
|
||||
self._readout_time = PILATUS_READOUT_TIME
|
||||
self._full_path = ""
|
||||
@@ -251,6 +260,7 @@ class Pilatus(PSIDeviceBase, ADBase):
|
||||
# self._live_mode_run_event = threading.Event()
|
||||
# self._live_mode_stopped_event = threading.Event()
|
||||
# self._live_mode_stopped_event.set() # Initial state is stopped
|
||||
self.scan_parameters: ScanServerScanInfo = None
|
||||
|
||||
########################################
|
||||
# Custom Beamline Methods #
|
||||
@@ -368,19 +378,22 @@ class Pilatus(PSIDeviceBase, ADBase):
|
||||
status = status_acquire & status_writing & status_cam_server
|
||||
return status
|
||||
|
||||
def _calculate_trigger(self, scan_msg: ScanStatusMessage) -> Tuple[float, float]:
|
||||
self._update_scan_parameter()
|
||||
def _calculate_trigger(self, scan_parameters: ScanServerScanInfo) -> Tuple[float, float]:
|
||||
total_osc = 0
|
||||
calc_duration = 0
|
||||
total_trig_lo = 0
|
||||
total_trig_hi = 0
|
||||
# Switching high/low is intended as angle is inverse to energy and settings in BEC are always in energy
|
||||
loc_break_enable_low = self.scan_parameter.break_enable_high
|
||||
loc_break_time_low = self.scan_parameter.break_time_high
|
||||
loc_cycle_low = self.scan_parameter.cycle_high
|
||||
loc_break_enable_high = self.scan_parameter.break_enable_low
|
||||
loc_break_time_high = self.scan_parameter.break_time_low
|
||||
loc_cycle_high = self.scan_parameter.cycle_low
|
||||
loc_break_enable_low = scan_parameters.additional_scan_parameters.get(
|
||||
"break_enable_high", False
|
||||
)
|
||||
loc_break_time_low = scan_parameters.additional_scan_parameters.get("break_time_high", 0)
|
||||
loc_cycle_low = scan_parameters.additional_scan_parameters.get("cycle_high", 1)
|
||||
loc_break_enable_high = scan_parameters.additional_scan_parameters.get(
|
||||
"break_enable_low", False
|
||||
)
|
||||
loc_break_time_high = scan_parameters.additional_scan_parameters.get("break_time_low", 0)
|
||||
loc_cycle_high = scan_parameters.additional_scan_parameters.get("cycle_low", 1)
|
||||
|
||||
if not loc_break_enable_low:
|
||||
loc_break_time_low = 0
|
||||
@@ -389,28 +402,36 @@ class Pilatus(PSIDeviceBase, ADBase):
|
||||
loc_break_time_high = 0
|
||||
loc_cycle_high = 1
|
||||
|
||||
total_osc = self.scan_parameter.scan_duration / (
|
||||
self.scan_parameter.scan_time +
|
||||
loc_break_time_low / (2 * loc_cycle_low) +
|
||||
loc_break_time_high / (2 * loc_cycle_high)
|
||||
total_osc = scan_parameters.additional_scan_parameters.get("scan_duration", 0) / (
|
||||
scan_parameters.additional_scan_parameters.get("scan_time", 0)
|
||||
+ loc_break_time_low / (2 * loc_cycle_low)
|
||||
+ loc_break_time_high / (2 * loc_cycle_high)
|
||||
)
|
||||
total_osc = np.ceil(total_osc)
|
||||
total_osc = total_osc + total_osc % 2 # round up to the next even number
|
||||
total_osc = total_osc + total_osc % 2 # round up to the next even number
|
||||
|
||||
if loc_break_enable_low:
|
||||
total_trig_lo = np.floor(total_osc / (2 * loc_cycle_low))
|
||||
if loc_break_enable_high:
|
||||
total_trig_hi = np.floor(total_osc / (2 * loc_cycle_high))
|
||||
calc_duration = total_osc * self.scan_parameter.scan_time + total_trig_lo * loc_break_time_low + total_trig_hi * loc_break_time_high
|
||||
|
||||
if calc_duration < self.scan_parameter.scan_duration:
|
||||
calc_duration = (
|
||||
total_osc * scan_parameters.additional_scan_parameters.get("scan_time", 0)
|
||||
+ total_trig_lo * loc_break_time_low
|
||||
+ total_trig_hi * loc_break_time_high
|
||||
)
|
||||
|
||||
if calc_duration < scan_parameters.additional_scan_parameters.get("scan_duration", 0):
|
||||
# Due to inaccuracy in formula, this can happen, we then need to manually add two oscillations and recalculate the triggers
|
||||
total_osc = total_osc + 2
|
||||
if loc_break_enable_low:
|
||||
total_trig_lo = np.floor(total_osc / (2 * loc_cycle_low))
|
||||
if loc_break_enable_high:
|
||||
total_trig_hi = np.floor(total_osc / (2 * loc_cycle_high))
|
||||
calc_duration = total_osc * self.scan_parameter.scan_time + total_trig_lo * loc_break_time_low + total_trig_hi * loc_break_time_high
|
||||
calc_duration = (
|
||||
total_osc * scan_parameters.additional_scan_parameters.get("scan_time", 0)
|
||||
+ total_trig_lo * loc_break_time_low
|
||||
+ total_trig_hi * loc_break_time_high
|
||||
)
|
||||
|
||||
return total_trig_lo, total_trig_hi
|
||||
|
||||
@@ -464,6 +485,7 @@ class Pilatus(PSIDeviceBase, ADBase):
|
||||
(self.scan_info.msg) object.
|
||||
"""
|
||||
# self.stop_live_mode() # Make sure that live mode is stopped if scan runs
|
||||
self.scan_parameters = fetch_scan_info(self.scan_info)
|
||||
|
||||
# If user has activated alignment mode on qt panel, switch back to multitrigger and stop acquisition
|
||||
if self.cam.trigger_mode.get() != TRIGGERMODE.MULT_TRIGGER.value:
|
||||
@@ -473,25 +495,29 @@ class Pilatus(PSIDeviceBase, ADBase):
|
||||
status_cam = CompareStatus(self.cam.acquire, ACQUIREMODE.DONE.value)
|
||||
status_cam.wait(timeout=5)
|
||||
|
||||
scan_msg: ScanStatusMessage = self.scan_info.msg
|
||||
if scan_msg.scan_name in self.xas_xrd_scan_names:
|
||||
self._update_scan_parameter()
|
||||
if self.scan_parameters.scan_name in self.xas_xrd_scan_names:
|
||||
# Compute number of triggers
|
||||
total_trig_lo, total_trig_hi = self._calculate_trigger(scan_msg)
|
||||
total_trig_lo, total_trig_hi = self._calculate_trigger(self.scan_parameters)
|
||||
# Set the number of images, we may also set this to a higher values if preferred and stop the acquisition
|
||||
# TODO This logic is prone to errors, as we rely on the scans to nicely resolve to n_images. We should
|
||||
# use here instead a way of settings the n_images independently of the scan parameters to avoid running out of sync
|
||||
# with the complete method. Ideally we comput them in the scan itself.. This is much safer IMO!
|
||||
self.n_images = (total_trig_lo + total_trig_hi) * self.scan_parameter.n_of_trigger
|
||||
exp_time = self.scan_parameter.exp_time
|
||||
self.n_images = (
|
||||
total_trig_lo + total_trig_hi
|
||||
) * self.scan_parameters.additional_scan_parameters.get("n_of_trigger", 1)
|
||||
exp_time = self.scan_parameters.exp_time
|
||||
self.trigger_source.set(MONOTRIGGERSOURCE.INPOS).wait(5)
|
||||
self.trigger_n_of.set(self.scan_parameter.n_of_trigger).wait(5)
|
||||
|
||||
elif scan_msg.scan_type == "step":
|
||||
self.n_images = scan_msg.num_points * scan_msg.scan_parameters.get(
|
||||
"frames_per_trigger", 1
|
||||
self.trigger_n_of.set(
|
||||
self.scan_parameters.additional_scan_parameters.get("n_of_trigger", 1)
|
||||
).wait(5)
|
||||
# TODO migrate logic to v4 once old scans are deprecated,
|
||||
# TODO if num_points=None and no logic from scan_name applies, can't measure with this detector..
|
||||
elif self.scan_parameters.scan_type == "software_triggered":
|
||||
self.n_images = (
|
||||
self.scan_parameters.num_monitored_readouts
|
||||
* self.scan_parameters.frames_per_trigger
|
||||
)
|
||||
exp_time = scan_msg.scan_parameters.get("exp_time")
|
||||
exp_time = self.scan_parameters.exp_time
|
||||
self.trigger_source.set(MONOTRIGGERSOURCE.EPICS).wait(5)
|
||||
self.trigger_n_of.set(1).wait(5) # BEC will trigger each acquisition
|
||||
else:
|
||||
@@ -512,7 +538,7 @@ class Pilatus(PSIDeviceBase, ADBase):
|
||||
)
|
||||
)
|
||||
detector_exp_time = exp_time - self._readout_time
|
||||
self._full_path = get_full_path(scan_msg, name="pilatus")
|
||||
self._full_path = get_full_path(self.scan_info.msg, name="pilatus")
|
||||
file_path = "/".join(self._full_path.split("/")[:-1])
|
||||
file_name = self._full_path.split("/")[-1]
|
||||
# Prepare detector and backend
|
||||
@@ -544,9 +570,9 @@ class Pilatus(PSIDeviceBase, ADBase):
|
||||
|
||||
def on_pre_scan(self) -> DeviceStatus | None:
|
||||
"""Called right before the scan starts on all devices automatically."""
|
||||
scan_msg: ScanStatusMessage = self.scan_info.msg
|
||||
if (
|
||||
scan_msg.scan_name in self.xas_xrd_scan_names or scan_msg.scan_type == "step"
|
||||
self.scan_parameters.scan_name in self.xas_xrd_scan_names
|
||||
or self.scan_parameters.scan_type == "software_triggered"
|
||||
): # TODO how to deal with fly scans?
|
||||
status_hdf = CompareStatus(self.hdf.capture, ACQUIREMODE.ACQUIRING.value)
|
||||
status_cam = CompareStatus(self.cam.acquire, ACQUIREMODE.ACQUIRING.value)
|
||||
@@ -561,8 +587,7 @@ class Pilatus(PSIDeviceBase, ADBase):
|
||||
|
||||
def on_trigger(self) -> DeviceStatus | None:
|
||||
"""Called when the device is triggered."""
|
||||
scan_msg: ScanStatusMessage = self.scan_info.msg
|
||||
if not scan_msg.scan_type == "step":
|
||||
if not self.scan_parameters.scan_type == "software_triggered":
|
||||
return None
|
||||
start_time = time.time()
|
||||
img_counter = self.hdf.num_captured.get()
|
||||
@@ -575,9 +600,9 @@ class Pilatus(PSIDeviceBase, ADBase):
|
||||
|
||||
def _complete_callback(self, status: DeviceStatus):
|
||||
"""Callback for when the device completes a scan."""
|
||||
scan_msg: ScanStatusMessage = self.scan_info.msg
|
||||
if (
|
||||
scan_msg.scan_name in self.xas_xrd_scan_names or scan_msg.scan_type == "step"
|
||||
self.scan_parameters.scan_name in self.xas_xrd_scan_names
|
||||
or self.scan_parameters.scan_type == "software_triggered"
|
||||
): # TODO how to deal with fly scans?
|
||||
if status.success:
|
||||
self.file_event.put(
|
||||
@@ -598,14 +623,15 @@ class Pilatus(PSIDeviceBase, ADBase):
|
||||
|
||||
def on_complete(self) -> DeviceStatus | None:
|
||||
"""Called to inquire if a device has completed a scans."""
|
||||
scan_msg: ScanStatusMessage = self.scan_info.msg
|
||||
if (
|
||||
scan_msg.scan_name in self.xas_xrd_scan_names or scan_msg.scan_type == "step"
|
||||
self.scan_parameters.scan_name in self.xas_xrd_scan_names
|
||||
or self.scan_parameters.scan_type == "software_triggered"
|
||||
): # TODO how to deal with fly scans?
|
||||
status_hdf = CompareStatus(self.hdf.capture, ACQUIREMODE.DONE.value)
|
||||
status_cam = CompareStatus(self.cam.acquire, ACQUIREMODE.DONE.value)
|
||||
status_cam_server = CompareStatus(self.cam.armed, DETECTORSTATE.UNARMED.value)
|
||||
if self.scan_info.msg.scan_name in self.xas_xrd_scan_names:
|
||||
# status_write_error = ExceptionStatus(self.hdf.write_status, 0, operation="!=")
|
||||
if self.scan_parameters.scan_name in self.xas_xrd_scan_names:
|
||||
# For long scans, it can be that the mono will execute one cycle more,
|
||||
# meaning a few more XRD triggers will be sent
|
||||
status_img_written = CompareStatus(
|
||||
@@ -614,7 +640,9 @@ class Pilatus(PSIDeviceBase, ADBase):
|
||||
else:
|
||||
status_img_written = CompareStatus(self.hdf.num_captured, self.n_images)
|
||||
status_img_written = CompareStatus(self.hdf.num_captured, self.n_images)
|
||||
status = status_hdf & status_cam & status_img_written & status_cam_server
|
||||
status = (
|
||||
status_hdf & status_cam & status_img_written & status_cam_server
|
||||
) # & status_write_error
|
||||
status.add_callback(self._complete_callback) # Callback that writing was successful
|
||||
self.cancel_on_stop(status)
|
||||
return status
|
||||
@@ -635,15 +663,6 @@ class Pilatus(PSIDeviceBase, ADBase):
|
||||
# TODO do we need to clean the poll thread ourselves?
|
||||
self.on_stop()
|
||||
|
||||
def _update_scan_parameter(self):
|
||||
"""Get the scan_info parameters for the scan."""
|
||||
for key, value in self.scan_info.msg.request_inputs["inputs"].items():
|
||||
if hasattr(self.scan_parameter, key):
|
||||
setattr(self.scan_parameter, key, value)
|
||||
for key, value in self.scan_info.msg.request_inputs["kwargs"].items():
|
||||
if hasattr(self.scan_parameter, key):
|
||||
setattr(self.scan_parameter, key, value)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
|
||||
@@ -0,0 +1,26 @@
|
||||
"""Utility functions for the devices."""
|
||||
|
||||
from copy import deepcopy
|
||||
|
||||
import numpy as np
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
|
||||
from pydantic import ValidationError
|
||||
|
||||
|
||||
def fetch_scan_info(scan_info: ScanInfo) -> ScanServerScanInfo:
|
||||
"""Fetch the scan parameters from the scan_info object and return them as a ScanServerScanInfo object."""
|
||||
info = scan_info.msg.info
|
||||
if isinstance(info["positions"], list):
|
||||
info["positions"] = np.array(info["positions"])
|
||||
try:
|
||||
msg = ScanServerScanInfo.model_validate(info)
|
||||
except ValidationError: # This means we have an old scan_info object.
|
||||
info = deepcopy(info)
|
||||
# We need to convert a few parameters manually.
|
||||
info["scan_type"] = (
|
||||
"hardware_triggered" if info["scan_type"] == "fly" else "software_triggered"
|
||||
)
|
||||
msg = ScanServerScanInfo.model_validate(info)
|
||||
|
||||
return msg
|
||||
@@ -236,52 +236,57 @@ class DebyeNexusStructure(DefaultFormat):
|
||||
self.configuration.get("nidaq", {}).get("nidaq_add_chans", {}).get("value")
|
||||
)
|
||||
|
||||
rle = self.configuration.get("nidaq", {}).get("nidaq_rle", {}).get("value")
|
||||
|
||||
measurement_mode = entry.create_group(name="mode")
|
||||
measurement_mode.attrs["NX_class"] = "NX_CHAR"
|
||||
|
||||
if (int(ci_chans_bits) & 0x7F) != 0:
|
||||
# Create a dataset
|
||||
rayspec_sdd_active = measurement_mode.create_group(
|
||||
name="Multi_Element_Partial_Fluorescence_Yield"
|
||||
)
|
||||
me_sdd = rayspec_sdd_active.create_dataset(
|
||||
name="Detector", data="Rayspec 7 element Silicon Drift Detector"
|
||||
)
|
||||
me_sdd.attrs["NX_class"] = "NX_CHAR"
|
||||
if ci_chans_bits is not None:
|
||||
if (int(ci_chans_bits) & 0x7F) != 0:
|
||||
# Create a dataset
|
||||
rayspec_sdd_active = measurement_mode.create_group(
|
||||
name="Multi_Element_Partial_Fluorescence_Yield"
|
||||
)
|
||||
me_sdd = rayspec_sdd_active.create_dataset(
|
||||
name="Detector", data="Rayspec 7 element Silicon Drift Detector"
|
||||
)
|
||||
me_sdd.attrs["NX_class"] = "NX_CHAR"
|
||||
|
||||
if (int(ci_chans_bits) & (1 << 8)) != 0:
|
||||
# Create a dataset
|
||||
ketek_sdd_active = measurement_mode.create_group(
|
||||
name="Single_Element_Partial_Fluorescence_Yield"
|
||||
)
|
||||
se_sdd = ketek_sdd_active.create_dataset(
|
||||
name="Detector", data="Ketex mini single element Silicon Drift Detector"
|
||||
)
|
||||
se_sdd.attrs["NX_class"] = "NX_CHAR"
|
||||
if (int(ci_chans_bits) & (1 << 8)) != 0:
|
||||
# Create a dataset
|
||||
ketek_sdd_active = measurement_mode.create_group(
|
||||
name="Single_Element_Partial_Fluorescence_Yield"
|
||||
)
|
||||
se_sdd = ketek_sdd_active.create_dataset(
|
||||
name="Detector", data="Ketex mini single element Silicon Drift Detector"
|
||||
)
|
||||
se_sdd.attrs["NX_class"] = "NX_CHAR"
|
||||
|
||||
if (int(ai_chans_bits) & (1 << 6)) != 0:
|
||||
# Create a dataset
|
||||
pips_active = measurement_mode.create_group(name="Total_Flourescence_Yield")
|
||||
tfy = pips_active.create_dataset(
|
||||
name="Detector", data="Mirion Technologies Partially Depeleted PIPS Detector"
|
||||
)
|
||||
tfy.attrs["NX_class"] = "NX_CHAR"
|
||||
if ai_chans_bits is not None:
|
||||
if (int(ai_chans_bits) & (1 << 6)) != 0:
|
||||
# Create a dataset
|
||||
pips_active = measurement_mode.create_group(name="Total_Flourescence_Yield")
|
||||
tfy = pips_active.create_dataset(
|
||||
name="Detector",
|
||||
data="Mirion Technologies Partially Depeleted PIPS Detector",
|
||||
)
|
||||
tfy.attrs["NX_class"] = "NX_CHAR"
|
||||
|
||||
if ((int(ai_chans_bits) & (1 << 0)) != 0) & ((int(ai_chans_bits) & (1 << 2)) != 0):
|
||||
# Create a dataset
|
||||
ai0ai2_active = measurement_mode.create_group(name="Sample_Transmission")
|
||||
sam_trans = ai0ai2_active.create_dataset(
|
||||
name="Detector", data="Ionitec 15 cm gas filled Ionisation Chambers"
|
||||
)
|
||||
sam_trans.attrs["NX_class"] = "NX_CHAR"
|
||||
if ((int(ai_chans_bits) & (1 << 0)) != 0) & ((int(ai_chans_bits) & (1 << 2)) != 0):
|
||||
# Create a dataset
|
||||
ai0ai2_active = measurement_mode.create_group(name="Sample_Transmission")
|
||||
sam_trans = ai0ai2_active.create_dataset(
|
||||
name="Detector", data="Ionitec 15 cm gas filled Ionisation Chambers"
|
||||
)
|
||||
sam_trans.attrs["NX_class"] = "NX_CHAR"
|
||||
|
||||
if ((int(ai_chans_bits) & (1 << 2)) != 0) & ((int(ai_chans_bits) & (1 << 4)) != 0):
|
||||
# Create a dataset
|
||||
ai2ai4_active = measurement_mode.create_group(name="Reference_Transmission")
|
||||
ref_trans = ai2ai4_active.create_dataset(
|
||||
name="Detector", data="Ionitec 15 cm gas filled Ionisation Chambers"
|
||||
)
|
||||
ref_trans.attrs["NX_class"] = "NX_CHAR"
|
||||
if ((int(ai_chans_bits) & (1 << 2)) != 0) & ((int(ai_chans_bits) & (1 << 4)) != 0):
|
||||
# Create a dataset
|
||||
ai2ai4_active = measurement_mode.create_group(name="Reference_Transmission")
|
||||
ref_trans = ai2ai4_active.create_dataset(
|
||||
name="Detector", data="Ionitec 15 cm gas filled Ionisation Chambers"
|
||||
)
|
||||
ref_trans.attrs["NX_class"] = "NX_CHAR"
|
||||
|
||||
main_data = entry.create_group(name="data")
|
||||
main_data.attrs["NX_class"] = "NXdata"
|
||||
@@ -308,10 +313,11 @@ class DebyeNexusStructure(DefaultFormat):
|
||||
i0.attrs["NX_class"] = "NXdata"
|
||||
i0.attrs["units"] = "V"
|
||||
|
||||
main_data.create_soft_link(
|
||||
name="i0",
|
||||
target="/entry/collection/readout_groups/async/nidaq/nidaq_ai0_mean/value",
|
||||
)
|
||||
if rle:
|
||||
target = "/entry/collection/readout_groups/async/nidaq/nidaq_ai0_mean/value"
|
||||
else:
|
||||
target = "/entry/collection/readout_groups/async/nidaq/nidaq_ai0/value"
|
||||
main_data.create_soft_link(name="i0", target=target)
|
||||
|
||||
##################
|
||||
## i1
|
||||
@@ -322,10 +328,11 @@ class DebyeNexusStructure(DefaultFormat):
|
||||
i1.attrs["NX_class"] = "NXdata"
|
||||
i1.attrs["units"] = "V"
|
||||
|
||||
main_data.create_soft_link(
|
||||
name="i1",
|
||||
target="/entry/collection/readout_groups/async/nidaq/nidaq_ai2_mean/value",
|
||||
)
|
||||
if rle:
|
||||
target = "/entry/collection/readout_groups/async/nidaq/nidaq_ai2_mean/value"
|
||||
else:
|
||||
target = "/entry/collection/readout_groups/async/nidaq/nidaq_ai2/value"
|
||||
main_data.create_soft_link(name="i1", target=target)
|
||||
|
||||
##################
|
||||
## i2
|
||||
@@ -336,10 +343,11 @@ class DebyeNexusStructure(DefaultFormat):
|
||||
i2.attrs["NX_class"] = "NXdata"
|
||||
i2.attrs["units"] = "V"
|
||||
|
||||
main_data.create_soft_link(
|
||||
name="i2",
|
||||
target="/entry/collection/readout_groups/async/nidaq/nidaq_ai4_mean/value",
|
||||
)
|
||||
if rle:
|
||||
target = "/entry/collection/readout_groups/async/nidaq/nidaq_ai4_mean/value"
|
||||
else:
|
||||
target = "/entry/collection/readout_groups/async/nidaq/nidaq_ai4/value"
|
||||
main_data.create_soft_link(name="i2", target=target)
|
||||
|
||||
##################
|
||||
## ci sum
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
from .mono_bragg_scans import (
|
||||
XASAdvancedScan,
|
||||
XASAdvancedScanWithXRD,
|
||||
XASSimpleScan,
|
||||
XASSimpleScanWithXRD,
|
||||
from .nidaq_continuous_scan import NidaqContinuousScan
|
||||
from .xas_simple_scan import (
|
||||
XasAdvancedScan,
|
||||
XasAdvancedScanWithXrd,
|
||||
XasSimpleScan,
|
||||
XasSimpleScanWithXrd,
|
||||
)
|
||||
from .nidaq_cont_scan import NIDAQContinuousScan
|
||||
|
||||
@@ -1,310 +0,0 @@
|
||||
"""This module contains the scan classes for the mono bragg motor of the Debye beamline."""
|
||||
|
||||
import time
|
||||
from typing import Literal
|
||||
|
||||
import numpy as np
|
||||
from bec_lib.device import DeviceBase
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_server.scan_server.scans import AsyncFlyScanBase
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class XASSimpleScan(AsyncFlyScanBase):
|
||||
"""Class for the XAS simple scan"""
|
||||
|
||||
scan_name = "xas_simple_scan"
|
||||
scan_type = "fly"
|
||||
scan_report_hint = "device_progress"
|
||||
required_kwargs = []
|
||||
use_scan_progress_report = False
|
||||
pre_move = False
|
||||
gui_config = {
|
||||
"Movement Parameters": ["start", "stop"],
|
||||
"Scan Parameters": ["scan_time", "scan_duration"],
|
||||
}
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
start: float,
|
||||
stop: float,
|
||||
scan_time: float,
|
||||
scan_duration: float,
|
||||
motor: DeviceBase = "mo1_bragg",
|
||||
**kwargs,
|
||||
):
|
||||
"""The xas_simple_scan is used to start a simple oscillating scan on the mono bragg motor.
|
||||
Start and Stop define the energy range for the scan, scan_time is the time for one scan
|
||||
cycle and scan_duration is the duration of the scan. If scan duration is set to 0, the
|
||||
scan will run infinitely.
|
||||
|
||||
Args:
|
||||
start (float): Start energy for the scan.
|
||||
stop (float): Stop energy for the scan.
|
||||
scan_time (float): Time for one scan cycle.
|
||||
scan_duration (float): Duration of the scan.
|
||||
motor (DeviceBase, optional): Motor device to be used for the scan.
|
||||
Defaults to "mo1_bragg".
|
||||
Examples:
|
||||
>>> scans.xas_simple_scan(start=8000, stop=9000, scan_time=1, scan_duration=10)
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
self.motor = motor
|
||||
self.start = start
|
||||
self.stop = stop
|
||||
self.scan_time = scan_time
|
||||
self.scan_duration = scan_duration
|
||||
self.primary_readout_cycle = 1
|
||||
|
||||
def update_readout_priority(self):
|
||||
"""Ensure that NIDAQ is not monitored for any quick EXAFS."""
|
||||
super().update_readout_priority()
|
||||
self.readout_priority["async"].append("nidaq")
|
||||
|
||||
def prepare_positions(self):
|
||||
"""Prepare the positions for the scan.
|
||||
|
||||
Use here only start and end energy defining the range for the scan.
|
||||
"""
|
||||
self.positions = np.array([self.start, self.stop], dtype=float)
|
||||
self.num_pos = None
|
||||
yield None
|
||||
|
||||
def pre_scan(self):
|
||||
"""Pre Scan action."""
|
||||
|
||||
self._check_limits()
|
||||
# Ensure parent class pre_scan actions to be called.
|
||||
yield from super().pre_scan()
|
||||
|
||||
def scan_report_instructions(self):
|
||||
"""
|
||||
Return the instructions for the scan report.
|
||||
"""
|
||||
yield from self.stubs.scan_report_instruction({"device_progress": [self.motor]})
|
||||
|
||||
def scan_core(self):
|
||||
"""Run the scan core.
|
||||
Kickoff the oscillation on the Bragg motor and wait for the completion of the motion.
|
||||
"""
|
||||
# Start the oscillation on the Bragg motor.
|
||||
yield from self.stubs.kickoff(device=self.motor)
|
||||
complete_status = yield from self.stubs.complete(device=self.motor, wait=False)
|
||||
|
||||
while not complete_status.done:
|
||||
# Readout monitored devices
|
||||
yield from self.stubs.read(group="monitored", point_id=self.point_id)
|
||||
time.sleep(self.primary_readout_cycle)
|
||||
self.point_id += 1
|
||||
|
||||
self.num_pos = self.point_id
|
||||
|
||||
|
||||
class XASSimpleScanWithXRD(XASSimpleScan):
|
||||
"""Class for the XAS simple scan with XRD"""
|
||||
|
||||
scan_name = "xas_simple_scan_with_xrd"
|
||||
gui_config = {
|
||||
"Movement Parameters": ["start", "stop"],
|
||||
"Scan Parameters": ["scan_time", "scan_duration"],
|
||||
"Low Energy Break": ["break_enable_low", "break_time_low", "cycle_low"],
|
||||
"High Energy Break": ["break_enable_high", "break_time_high", "cycle_high"],
|
||||
"XRD Triggers": ["exp_time", "n_of_trigger"],
|
||||
}
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
start: float,
|
||||
stop: float,
|
||||
scan_time: float,
|
||||
scan_duration: float,
|
||||
break_enable_low: bool,
|
||||
break_time_low: float,
|
||||
cycle_low: int,
|
||||
break_enable_high: bool,
|
||||
break_time_high: float,
|
||||
cycle_high: float,
|
||||
exp_time: float,
|
||||
n_of_trigger: int,
|
||||
motor: DeviceBase = "mo1_bragg",
|
||||
**kwargs,
|
||||
):
|
||||
"""The xas_simple_scan_with_xrd is an oscillation motion on the mono motor
|
||||
with XRD triggering at low and high energy ranges.
|
||||
If scan duration is set to 0, the scan will run infinitely.
|
||||
|
||||
Args:
|
||||
start (float): Start energy for the scan.
|
||||
stop (float): Stop energy for the scan.
|
||||
scan_time (float): Time for one oscillation .
|
||||
scan_duration (float): Total duration of the scan.
|
||||
break_enable_low (bool): Enable breaks for the low energy range.
|
||||
break_time_low (float): Break time for the low energy range.
|
||||
cycle_low (int): Specify how often the triggers should be considered,
|
||||
every nth cycle for low
|
||||
break_enable_high (bool): Enable breaks for the high energy range.
|
||||
break_time_high (float): Break time for the high energy range.
|
||||
cycle_high (int): Specify how often the triggers should be considered,
|
||||
every nth cycle for high
|
||||
exp_time (float): Length of 1 trigger period in seconds
|
||||
n_of_trigger (int): Amount of triggers to be fired during break
|
||||
motor (DeviceBase, optional): Motor device to be used for the scan.
|
||||
Defaults to "mo1_bragg".
|
||||
|
||||
Examples:
|
||||
>>> scans.xas_simple_scan_with_xrd(start=8000, stop=9000, scan_time=1, scan_duration=10, xrd_enable_low=True, num_trigger_low=5, cycle_low=2, exp_time_low=100, xrd_enable_high=False, num_trigger_high=3, cycle_high=1, exp_time_high=1000)
|
||||
"""
|
||||
super().__init__(
|
||||
start=start,
|
||||
stop=stop,
|
||||
scan_time=scan_time,
|
||||
scan_duration=scan_duration,
|
||||
motor=motor,
|
||||
**kwargs,
|
||||
)
|
||||
self.break_enable_low = break_enable_low
|
||||
self.break_time_low = break_time_low
|
||||
self.cycle_low = cycle_low
|
||||
self.break_enable_high = break_enable_high
|
||||
self.break_time_high = break_time_high
|
||||
self.cycle_high = cycle_high
|
||||
self.exp_time = exp_time
|
||||
self.n_of_trigger = n_of_trigger
|
||||
|
||||
|
||||
class XASAdvancedScan(XASSimpleScan):
|
||||
"""Class for the XAS advanced scan"""
|
||||
|
||||
scan_name = "xas_advanced_scan"
|
||||
gui_config = {
|
||||
"Movement Parameters": ["start", "stop"],
|
||||
"Scan Parameters": ["scan_time", "scan_duration"],
|
||||
"Spline Parameters": ["p_kink", "e_kink"],
|
||||
}
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
start: float,
|
||||
stop: float,
|
||||
scan_time: float,
|
||||
scan_duration: float,
|
||||
p_kink: float,
|
||||
e_kink: float,
|
||||
motor: DeviceBase = "mo1_bragg",
|
||||
**kwargs,
|
||||
):
|
||||
"""The xas_advanced_scan is an oscillation motion on the mono motor.
|
||||
Start and Stop define the energy range for the scan, scan_time is the
|
||||
time for one scan cycle and scan_duration is the duration of the scan.
|
||||
If scan duration is set to 0, the scan will run infinitely.
|
||||
p_kink and e_kink add a kink to the motion profile to slow down in the
|
||||
exafs region of the scan.
|
||||
|
||||
Args:
|
||||
start (float): Start angle for the scan.
|
||||
stop (float): Stop angle for the scan.
|
||||
scan_time (float): Time for one oscillation .
|
||||
scan_duration (float): Total duration of the scan.
|
||||
p_kink (float): Position of the kink.
|
||||
e_kink (float): Energy of the kink.
|
||||
motor (DeviceBase, optional): Motor device to be used for the scan.
|
||||
Defaults to "mo1_bragg".
|
||||
|
||||
Examples:
|
||||
>>> scans.xas_advanced_scan(start=10000, stop=12000, scan_time=0.5, scan_duration=10, p_kink=50, e_kink=10500)
|
||||
"""
|
||||
super().__init__(
|
||||
start=start,
|
||||
stop=stop,
|
||||
scan_time=scan_time,
|
||||
scan_duration=scan_duration,
|
||||
motor=motor,
|
||||
**kwargs,
|
||||
)
|
||||
self.p_kink = p_kink
|
||||
self.e_kink = e_kink
|
||||
|
||||
|
||||
class XASAdvancedScanWithXRD(XASAdvancedScan):
|
||||
"""Class for the XAS advanced scan with XRD"""
|
||||
|
||||
scan_name = "xas_advanced_scan_with_xrd"
|
||||
gui_config = {
|
||||
"Movement Parameters": ["start", "stop"],
|
||||
"Scan Parameters": ["scan_time", "scan_duration"],
|
||||
"Spline Parameters": ["p_kink", "e_kink"],
|
||||
"Low Energy Break": ["break_enable_low", "break_time_low", "cycle_low"],
|
||||
"High Energy Break": ["break_enable_high", "break_time_high", "cycle_high"],
|
||||
"XRD Triggers": ["exp_time", "n_of_trigger"],
|
||||
}
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
start: float,
|
||||
stop: float,
|
||||
scan_time: float,
|
||||
scan_duration: float,
|
||||
p_kink: float,
|
||||
e_kink: float,
|
||||
break_enable_low: bool,
|
||||
break_time_low: float,
|
||||
cycle_low: int,
|
||||
break_enable_high: bool,
|
||||
break_time_high: float,
|
||||
cycle_high: float,
|
||||
exp_time: float,
|
||||
n_of_trigger: int,
|
||||
motor: DeviceBase = "mo1_bragg",
|
||||
**kwargs,
|
||||
):
|
||||
"""The xas_advanced_scan is an oscillation motion on the mono motor
|
||||
with XRD triggering at low and high energy ranges.
|
||||
Start and Stop define the energy range for the scan, scan_time is the time for
|
||||
one scan cycle and scan_duration is the duration of the scan. If scan duration
|
||||
is set to 0, the scan will run infinitely. p_kink and e_kink add a kink to the
|
||||
motion profile to slow down in the exafs region of the scan.
|
||||
|
||||
Args:
|
||||
start (float): Start angle for the scan.
|
||||
stop (float): Stop angle for the scan.
|
||||
scan_time (float): Time for one oscillation .
|
||||
scan_duration (float): Total duration of the scan.
|
||||
p_kink (float): Position of kink.
|
||||
e_kink (float): Energy of the kink.
|
||||
break_enable_low (bool): Enable breaks for the low energy range.
|
||||
break_time_low (float): Break time for the low energy range.
|
||||
cycle_low (int): Specify how often the triggers should be considered,
|
||||
every nth cycle for low
|
||||
break_enable_high (bool): Enable breaks for the high energy range.
|
||||
break_time_high (float): Break time for the high energy range.
|
||||
cycle_high (int): Specify how often the triggers should be considered,
|
||||
every nth cycle for high
|
||||
exp_time (float): Length of 1 trigger period in seconds
|
||||
n_of_trigger (int): Amount of triggers to be fired during break
|
||||
motor (DeviceBase, optional): Motor device to be used for the scan.
|
||||
Defaults to "mo1_bragg".
|
||||
|
||||
Examples:
|
||||
>>> scans.xas_advanced_scan_with_xrd(start=10000, stop=12000, scan_time=0.5, scan_duration=10, p_kink=50, e_kink=10500, xrd_enable_low=True, num_trigger_low=5, cycle_low=2, exp_time_low=100, xrd_enable_high=False, num_trigger_high=3, cycle_high=1, exp_time_high=1000)
|
||||
"""
|
||||
super().__init__(
|
||||
start=start,
|
||||
stop=stop,
|
||||
scan_time=scan_time,
|
||||
scan_duration=scan_duration,
|
||||
p_kink=p_kink,
|
||||
e_kink=e_kink,
|
||||
motor=motor,
|
||||
**kwargs,
|
||||
)
|
||||
self.p_kink = p_kink
|
||||
self.e_kink = e_kink
|
||||
self.break_enable_low = break_enable_low
|
||||
self.break_time_low = break_time_low
|
||||
self.cycle_low = cycle_low
|
||||
self.break_enable_high = break_enable_high
|
||||
self.break_time_high = break_time_high
|
||||
self.cycle_high = cycle_high
|
||||
self.exp_time = exp_time
|
||||
self.n_of_trigger = n_of_trigger
|
||||
@@ -1,84 +0,0 @@
|
||||
"""This module contains the scan class for the nidaq of the Debye beamline for use in continuous mode."""
|
||||
|
||||
import time
|
||||
from typing import Literal
|
||||
|
||||
import numpy as np
|
||||
from bec_lib.device import DeviceBase
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_server.scan_server.scans import AsyncFlyScanBase
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class NIDAQContinuousScan(AsyncFlyScanBase):
|
||||
"""Class for the nidaq continuous scan (without mono)"""
|
||||
|
||||
scan_name = "nidaq_continuous_scan"
|
||||
scan_type = "fly"
|
||||
scan_report_hint = "device_progress"
|
||||
required_kwargs = []
|
||||
use_scan_progress_report = False
|
||||
pre_move = False
|
||||
gui_config = {"Scan Parameters": ["scan_duration"], "Data Compression": ["compression"]}
|
||||
|
||||
def __init__(
|
||||
self, scan_duration: float, daq: DeviceBase = "nidaq", compression: bool = False, **kwargs
|
||||
):
|
||||
"""The NIDAQ continuous scan is used to measure with the NIDAQ without moving the
|
||||
monochromator or any other motor. The NIDAQ thus runs in continuous mode, with a
|
||||
set scan_duration.
|
||||
|
||||
Args:
|
||||
scan_duration (float): Duration of the scan.
|
||||
daq (DeviceBase, optional): DAQ device to be used for the scan.
|
||||
Defaults to "nidaq".
|
||||
Examples:
|
||||
>>> scans.nidaq_continuous_scan(scan_duration=10)
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
self.scan_duration = scan_duration
|
||||
self.daq = daq
|
||||
self.start_time = 0
|
||||
self.primary_readout_cycle = 1
|
||||
self.scan_parameters["scan_duration"] = scan_duration
|
||||
self.scan_parameters["compression"] = compression
|
||||
|
||||
def update_readout_priority(self):
|
||||
"""Ensure that NIDAQ is not monitored for any quick EXAFS."""
|
||||
super().update_readout_priority()
|
||||
self.readout_priority["async"].append("nidaq")
|
||||
|
||||
def prepare_positions(self):
|
||||
"""Prepare the positions for the scan."""
|
||||
yield None
|
||||
|
||||
def pre_scan(self):
|
||||
"""Pre Scan action."""
|
||||
|
||||
self.start_time = time.time()
|
||||
# Ensure parent class pre_scan actions to be called.
|
||||
yield from super().pre_scan()
|
||||
|
||||
def scan_report_instructions(self):
|
||||
"""
|
||||
Return the instructions for the scan report.
|
||||
"""
|
||||
yield from self.stubs.scan_report_instruction({"device_progress": [self.daq]})
|
||||
|
||||
def scan_core(self):
|
||||
"""Run the scan core.
|
||||
Kickoff the acquisition of the NIDAQ wait for the completion of the scan.
|
||||
"""
|
||||
kickoff_status = yield from self.stubs.kickoff(device=self.daq)
|
||||
kickoff_status.wait(timeout=5) # wait for proper kickoff of device
|
||||
|
||||
complete_status = yield from self.stubs.complete(device=self.daq, wait=False)
|
||||
|
||||
while not complete_status.done:
|
||||
# Readout monitored devices
|
||||
yield from self.stubs.read(group="monitored", point_id=self.point_id)
|
||||
time.sleep(self.primary_readout_cycle)
|
||||
self.point_id += 1
|
||||
|
||||
self.num_pos = self.point_id
|
||||
@@ -0,0 +1,174 @@
|
||||
"""
|
||||
The NIDAQ continuous scan is used to measure with the NIDAQ without moving the monochromator or any other motor.
|
||||
|
||||
Scan procedure:
|
||||
- prepare_scan
|
||||
- open_scan
|
||||
- stage
|
||||
- pre_scan
|
||||
- scan_core
|
||||
- at_each_point (optionally called by scan_core)
|
||||
- post_scan
|
||||
- unstage
|
||||
- close_scan
|
||||
- on_exception (called if any exception is raised during the scan)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from typing import Annotated
|
||||
|
||||
from bec_lib.device import DeviceBase
|
||||
from bec_lib.scan_args import ScanArgument, Units
|
||||
from bec_server.scan_server.scans.scan_base import ScanBase, ScanType
|
||||
from bec_server.scan_server.scans.scan_modifier import scan_hook
|
||||
|
||||
|
||||
class NidaqContinuousScan(ScanBase):
|
||||
# Scan Type: Hardware triggered or software triggered?
|
||||
# If the main trigger and readout logic is done within the at_each_point method in scan_core, choose SOFTWARE_TRIGGERED.
|
||||
# If the main trigger and readout logic is implemented on a device that is simply kicked off in this scan, choose HARDWARE_TRIGGERED.
|
||||
# This primarily serves as information for devices: The device may need to react differently if a software trigger is expected
|
||||
# for every point.
|
||||
scan_type = ScanType.HARDWARE_TRIGGERED
|
||||
|
||||
# Scan name: This is the name of the scan, e.g. "line_scan". This is used for display purposes and to identify the scan type in user interfaces.
|
||||
# Choose a descriptive name that does not conflict with existing scan names.
|
||||
# It must be a valid Python identifier, that is, it can only contain letters, numbers, and underscores, and must not start with a number.
|
||||
scan_name = "nidaq_continuous_scan"
|
||||
|
||||
gui_config = {"Scan Parameters": ["scan_duration", "daq", "compression"]}
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
#fmt: off
|
||||
scan_duration: Annotated[float, ScanArgument(display_name="Scan Duration", description="Duration of the scan", units=Units.s)],
|
||||
daq: Annotated[DeviceBase | None, ScanArgument(display_name="Daq", description="DAQ device to be used for the scan")] = None,
|
||||
compression: Annotated[bool, ScanArgument(display_name="Compression", description="Whether to compress the data")]= False,
|
||||
#fmt: on
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
The NIDAQ continuous scan is used to measure with the NIDAQ without moving the
|
||||
monochromator or any other motor. The NIDAQ thus runs in continuous mode, with a
|
||||
set scan_duration.
|
||||
|
||||
Args:
|
||||
scan_duration (float): Duration of the scan
|
||||
daq (DeviceBase): DAQ device to be used for the scan
|
||||
compression (bool): Whether to compress the data
|
||||
|
||||
Returns:
|
||||
ScanReport
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
self._baseline_readout_status = None
|
||||
self.scan_duration = scan_duration
|
||||
self.daq = daq or self.dev["nidaq"]
|
||||
self.compression = compression
|
||||
self.monitored_readout_cycle = 1.0 # seconds
|
||||
self.motors = [self.daq]
|
||||
|
||||
self.update_scan_info(scan_duration=scan_duration, compression=compression)
|
||||
self.actions.set_device_readout_priority([self.daq], priority="async")
|
||||
|
||||
@scan_hook
|
||||
def prepare_scan(self):
|
||||
"""
|
||||
Prepare the scan. This can include any steps that need to be executed
|
||||
before the scan is opened, such as preparing the positions (if not done already)
|
||||
or setting up the devices.
|
||||
"""
|
||||
|
||||
self.actions.add_scan_report_instruction_device_progress(self.daq)
|
||||
self._baseline_readout_status = self.actions.read_baseline_devices(wait=False)
|
||||
|
||||
@scan_hook
|
||||
def open_scan(self):
|
||||
"""
|
||||
Open the scan.
|
||||
This step must call self.actions.open_scan() to ensure that a new scan is
|
||||
opened. Make sure to prepare the scan metadata before, either in
|
||||
prepare_scan() or in open_scan() itself and call self.update_scan_info(...)
|
||||
to update the scan metadata if needed.
|
||||
"""
|
||||
self.actions.open_scan()
|
||||
|
||||
@scan_hook
|
||||
def stage(self):
|
||||
"""
|
||||
Stage the devices for the upcoming scan. The stage logic is typically
|
||||
implemented on the device itself (i.e. by the device's stage method).
|
||||
However, if there are any additional steps that need to be executed before
|
||||
staging the devices, they can be implemented here.
|
||||
"""
|
||||
self.actions.stage_all_devices()
|
||||
|
||||
@scan_hook
|
||||
def pre_scan(self):
|
||||
"""
|
||||
Pre-scan steps to be executed before the main scan logic.
|
||||
This is typically the last chance to prepare the devices before the core scan
|
||||
logic is executed. For example, this is a good place to initialize time-criticial
|
||||
devices, e.g. devices that have a short timeout.
|
||||
The pre-scan logic is typically implemented on the device itself.
|
||||
"""
|
||||
self.actions.pre_scan_all_devices()
|
||||
|
||||
@scan_hook
|
||||
def scan_core(self):
|
||||
"""
|
||||
Core scan logic to be executed during the scan.
|
||||
This is where the main scan logic should be implemented.
|
||||
"""
|
||||
|
||||
kickoff_status = self.actions.kickoff(device=self.daq, wait=False)
|
||||
kickoff_status.wait(timeout=5) # wait for proper kickoff of device
|
||||
|
||||
complete_status = self.actions.complete(device=self.daq, wait=False)
|
||||
|
||||
while not complete_status.done:
|
||||
self.at_each_point()
|
||||
time.sleep(self.monitored_readout_cycle)
|
||||
|
||||
@scan_hook
|
||||
def at_each_point(self):
|
||||
"""
|
||||
Logic to be executed at each acquisition point during the scan.
|
||||
"""
|
||||
self.actions.read_monitored_devices()
|
||||
|
||||
@scan_hook
|
||||
def post_scan(self):
|
||||
"""
|
||||
Post-scan steps to be executed after the main scan logic.
|
||||
"""
|
||||
self.actions.complete_all_devices()
|
||||
|
||||
@scan_hook
|
||||
def unstage(self):
|
||||
"""Unstage the scan by executing post-scan steps."""
|
||||
self.actions.unstage_all_devices()
|
||||
|
||||
@scan_hook
|
||||
def close_scan(self):
|
||||
"""Close the scan."""
|
||||
if self._baseline_readout_status is not None:
|
||||
self._baseline_readout_status.wait()
|
||||
self.actions.close_scan()
|
||||
self.actions.check_for_unchecked_statuses()
|
||||
|
||||
@scan_hook
|
||||
def on_exception(self, exception: Exception):
|
||||
"""
|
||||
Handle exceptions that occur during the scan.
|
||||
This is a good place to implement any cleanup logic that needs to be executed in case of an exception,
|
||||
such as returning the devices to a safe state or moving the motors back to their starting position.
|
||||
"""
|
||||
|
||||
#######################################################
|
||||
######### Helper methods for the scan logic ###########
|
||||
#######################################################
|
||||
|
||||
# Implement scan-specific helper methods below.
|
||||
@@ -0,0 +1,326 @@
|
||||
"""
|
||||
V4 implementation of the Debye XAS simple scan.
|
||||
|
||||
Scan procedure:
|
||||
- prepare_scan
|
||||
- open_scan
|
||||
- stage
|
||||
- pre_scan
|
||||
- scan_core
|
||||
- at_each_point (optionally called by scan_core)
|
||||
- post_scan
|
||||
- unstage
|
||||
- close_scan
|
||||
- on_exception (called if any exception is raised during the scan)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from typing import Annotated
|
||||
|
||||
import numpy as np
|
||||
from bec_lib.device import DeviceBase
|
||||
from bec_lib.scan_args import ScanArgument, Units
|
||||
from bec_server.scan_server.scans.scan_base import ScanBase, ScanType
|
||||
from bec_server.scan_server.scans.scan_modifier import scan_hook
|
||||
|
||||
|
||||
class XasSimpleScan(ScanBase):
|
||||
scan_type = ScanType.HARDWARE_TRIGGERED
|
||||
scan_name = "xas_simple_scan"
|
||||
|
||||
gui_config = {
|
||||
"Movement Parameters": ["start", "stop"],
|
||||
"Scan Parameters": ["scan_time", "scan_duration", "monitored_readout_cycle"],
|
||||
}
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
# fmt: off
|
||||
start: Annotated[float, ScanArgument(display_name="Start Energy", description="Start energy.", units=Units.eV, ge=4500, le=64000)],
|
||||
stop: Annotated[float, ScanArgument(display_name="Stop Energy", description="Stop energy.", units=Units.eV, ge=4500, le=64000)],
|
||||
scan_time: Annotated[float, ScanArgument(display_name="Scan Time", description="Time for one scan cycle.", units=Units.s, ge=0.05)],
|
||||
scan_duration: Annotated[float, ScanArgument(display_name="Scan Duration", description="Total scan duration.", units=Units.s, ge=0.05)],
|
||||
motor: Annotated[DeviceBase | None, ScanArgument(display_name="Motor", description="Bragg motor device.")] = None,
|
||||
daq: Annotated[DeviceBase | None, ScanArgument(display_name="DAQ", description="NIDAQ device.")] = None,
|
||||
monitored_readout_cycle: Annotated[float, ScanArgument(display_name="Monitored Readout Cycle", description="Delay between monitored readouts.",units=Units.s, gt=0)] = 1,
|
||||
# fmt: on
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
Start a simple oscillating scan on the mono bragg motor.
|
||||
|
||||
Args:
|
||||
start (float): Start energy.
|
||||
stop (float): Stop energy.
|
||||
scan_time (float): Time for one scan cycle.
|
||||
scan_duration (float): Total scan duration.
|
||||
motor (DeviceBase | None): Bragg motor device.
|
||||
daq (DeviceBase | None): NIDAQ device.
|
||||
monitored_readout_cycle (float): Delay between monitored readouts.
|
||||
|
||||
Returns:
|
||||
ScanReport
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
self.start = start
|
||||
self.stop = stop
|
||||
self.scan_time = scan_time
|
||||
self.scan_duration = scan_duration
|
||||
self.motor = motor if motor is not None else self.dev["mo1_bragg"]
|
||||
self.daq = daq if daq is not None else self.dev["nidaq"]
|
||||
self.monitored_readout_cycle = monitored_readout_cycle
|
||||
self.positions = np.array([self.start, self.stop], dtype=float)
|
||||
|
||||
# We pass on the arguments as "additional_scan_parameters" in the scan info
|
||||
self.update_scan_info(
|
||||
positions=self.positions,
|
||||
scan_time=scan_time,
|
||||
scan_duration=scan_duration,
|
||||
monitored_readout_cycle=monitored_readout_cycle,
|
||||
)
|
||||
self.actions.set_device_readout_priority([self.daq], priority="async")
|
||||
|
||||
@scan_hook
|
||||
def prepare_scan(self):
|
||||
"""
|
||||
Prepare the scan. This can include any steps that need to be executed
|
||||
before the scan is opened, such as preparing the positions (if not done already)
|
||||
or setting up the devices.
|
||||
"""
|
||||
self.actions.add_scan_report_instruction_device_progress(self.motor)
|
||||
self._baseline_readout_status = self.actions.read_baseline_devices(wait=False)
|
||||
|
||||
@scan_hook
|
||||
def open_scan(self):
|
||||
"""
|
||||
Open the scan.
|
||||
This step must call self.actions.open_scan() to ensure that a new scan is
|
||||
opened. Make sure to prepare the scan metadata before, either in
|
||||
prepare_scan() or in open_scan() itself and call self.update_scan_info(...)
|
||||
to update the scan metadata if needed.
|
||||
"""
|
||||
self.actions.open_scan()
|
||||
|
||||
@scan_hook
|
||||
def stage(self):
|
||||
"""
|
||||
Stage the devices for the upcoming scan. The stage logic is typically
|
||||
implemented on the device itself (i.e. by the device's stage method).
|
||||
However, if there are any additional steps that need to be executed before
|
||||
staging the devices, they can be implemented here.
|
||||
"""
|
||||
self.actions.stage_all_devices()
|
||||
|
||||
@scan_hook
|
||||
def pre_scan(self):
|
||||
"""
|
||||
Pre-scan steps to be executed before the main scan logic.
|
||||
This is typically the last chance to prepare the devices before the core scan
|
||||
logic is executed. For example, this is a good place to initialize time-criticial
|
||||
devices, e.g. devices that have a short timeout.
|
||||
The pre-scan logic is typically implemented on the device itself.
|
||||
"""
|
||||
self.actions.pre_scan_all_devices()
|
||||
|
||||
@scan_hook
|
||||
def scan_core(self):
|
||||
"""
|
||||
Core scan logic to be executed during the scan.
|
||||
This is where the main scan logic should be implemented.
|
||||
"""
|
||||
self.actions.kickoff(self.motor)
|
||||
completion_status = self.actions.complete(self.motor, wait=False)
|
||||
|
||||
while not completion_status.done:
|
||||
self.at_each_point()
|
||||
|
||||
@scan_hook
|
||||
def at_each_point(self):
|
||||
"""
|
||||
Logic to be executed at each acquisition point during the scan.
|
||||
"""
|
||||
self.actions.read_monitored_devices()
|
||||
time.sleep(self.monitored_readout_cycle)
|
||||
|
||||
@scan_hook
|
||||
def post_scan(self):
|
||||
"""
|
||||
Post-scan steps to be executed after the main scan logic.
|
||||
"""
|
||||
self.actions.complete_all_devices()
|
||||
|
||||
@scan_hook
|
||||
def unstage(self):
|
||||
"""Unstage the scan by executing post-scan steps."""
|
||||
self.actions.unstage_all_devices()
|
||||
|
||||
@scan_hook
|
||||
def close_scan(self):
|
||||
"""Close the scan."""
|
||||
if self._baseline_readout_status is not None:
|
||||
self._baseline_readout_status.wait()
|
||||
self.actions.close_scan()
|
||||
self.actions.check_for_unchecked_statuses()
|
||||
|
||||
@scan_hook
|
||||
def on_exception(self, exception: Exception):
|
||||
"""
|
||||
Handle exceptions that occur during the scan.
|
||||
This is a good place to implement any cleanup logic that needs to be executed in case of an exception,
|
||||
such as returning the devices to a safe state or moving the motors back to their starting position.
|
||||
"""
|
||||
self.actions.complete_all_devices(wait=False)
|
||||
|
||||
|
||||
class XasSimpleScanWithXrd(XasSimpleScan):
|
||||
scan_name = "xas_simple_scan_with_xrd"
|
||||
gui_config = {
|
||||
"Movement Parameters": ["start", "stop"],
|
||||
"Scan Parameters": ["scan_time", "scan_duration", "monitored_readout_cycle"],
|
||||
"Low Energy Break": ["break_enable_low", "break_time_low", "cycle_low"],
|
||||
"High Energy Break": ["break_enable_high", "break_time_high", "cycle_high"],
|
||||
"XRD Triggers": ["exp_time", "n_of_trigger"],
|
||||
}
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
# fmt: off
|
||||
start: Annotated[float, ScanArgument(display_name="Start Energy", description="Start energy.", units=Units.eV)],
|
||||
stop: Annotated[float, ScanArgument(display_name="Stop Energy", description="Stop energy.", units=Units.eV)],
|
||||
scan_time: Annotated[float, ScanArgument(display_name="Scan Time", description="Time for one scan cycle.", units=Units.s, ge=0)],
|
||||
scan_duration: Annotated[float, ScanArgument(display_name="Scan Duration", description="Total scan duration.", units=Units.s, ge=0)],
|
||||
break_enable_low: Annotated[bool, ScanArgument(display_name="Break Enable Low", description="Enable breaks for the low energy range.")],
|
||||
break_time_low: Annotated[float, ScanArgument(display_name="Break Time Low", description="Break time for the low energy range.", units=Units.s, ge=0)],
|
||||
cycle_low: Annotated[int, ScanArgument(display_name="Cycle Low", description="Use triggers every nth low-energy cycle.", ge=0)],
|
||||
break_enable_high: Annotated[bool, ScanArgument(display_name="Break Enable High", description="Enable breaks for the high energy range.")],
|
||||
break_time_high: Annotated[float, ScanArgument(display_name="Break Time High", description="Break time for the high energy range.", units=Units.s, ge=0)],
|
||||
cycle_high: Annotated[int, ScanArgument(display_name="Cycle High", description="Use triggers every nth high-energy cycle.", ge=0)],
|
||||
exp_time: Annotated[float, ScanArgument(display_name="Exposure Time", description="Length of one trigger period.", units=Units.s, ge=0)],
|
||||
n_of_trigger: Annotated[int, ScanArgument(display_name="Number Of Trigger", description="Amount of triggers fired during a break.", ge=0)],
|
||||
motor: Annotated[DeviceBase | None, ScanArgument(display_name="Motor", description="Bragg motor device.")] = None,
|
||||
daq: Annotated[DeviceBase | None, ScanArgument(display_name="DAQ", description="NIDAQ device.")] = None,
|
||||
monitored_readout_cycle: Annotated[float, ScanArgument(display_name="Monitored Readout Cycle", description="Delay between monitored readouts.", units=Units.s, gt=0)] = 1,
|
||||
**kwargs,
|
||||
# fmt: on
|
||||
):
|
||||
super().__init__(
|
||||
start=start,
|
||||
stop=stop,
|
||||
scan_time=scan_time,
|
||||
scan_duration=scan_duration,
|
||||
motor=motor,
|
||||
daq=daq,
|
||||
monitored_readout_cycle=monitored_readout_cycle,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
# We pass on the arguments as "additional_scan_parameters" in the scan info
|
||||
self.update_scan_info(
|
||||
break_enable_low=break_enable_low,
|
||||
break_time_low=break_time_low,
|
||||
cycle_low=cycle_low,
|
||||
break_enable_high=break_enable_high,
|
||||
break_time_high=break_time_high,
|
||||
cycle_high=cycle_high,
|
||||
exp_time=exp_time,
|
||||
n_of_trigger=n_of_trigger,
|
||||
)
|
||||
|
||||
|
||||
class XasAdvancedScan(XasSimpleScan):
|
||||
scan_name = "xas_advanced_scan"
|
||||
gui_config = {
|
||||
"Movement Parameters": ["start", "stop"],
|
||||
"Scan Parameters": ["scan_time", "scan_duration", "monitored_readout_cycle"],
|
||||
"Spline Parameters": ["p_kink", "e_kink"],
|
||||
}
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
# fmt: off
|
||||
start: Annotated[float, ScanArgument(display_name="Start Energy", description="Start energy.", units=Units.eV)],
|
||||
stop: Annotated[float, ScanArgument(display_name="Stop Energy", description="Stop energy.", units=Units.eV)],
|
||||
scan_time: Annotated[float, ScanArgument(display_name="Scan Time", description="Time for one scan cycle.", units=Units.s, ge=0)],
|
||||
scan_duration: Annotated[float, ScanArgument(display_name="Scan Duration", description="Total scan duration.", units=Units.s, ge=0)],
|
||||
p_kink: Annotated[float, ScanArgument(display_name="P Kink", description="Position of the kink.", ge=0)],
|
||||
e_kink: Annotated[float, ScanArgument(display_name="E Kink", description="Energy of the kink.", units=Units.eV)],
|
||||
motor: Annotated[DeviceBase | None, ScanArgument(display_name="Motor", description="Bragg motor device.")] = None,
|
||||
daq: Annotated[DeviceBase | None, ScanArgument(display_name="DAQ", description="NIDAQ device.")] = None,
|
||||
monitored_readout_cycle: Annotated[float, ScanArgument(display_name="Monitored Readout Cycle", description="Delay between monitored readouts.", units=Units.s, gt=0)] = 1,
|
||||
**kwargs,
|
||||
# fmt: on
|
||||
):
|
||||
super().__init__(
|
||||
start=start,
|
||||
stop=stop,
|
||||
scan_time=scan_time,
|
||||
scan_duration=scan_duration,
|
||||
motor=motor,
|
||||
daq=daq,
|
||||
monitored_readout_cycle=monitored_readout_cycle,
|
||||
**kwargs,
|
||||
)
|
||||
# We pass on the arguments as "additional_scan_parameters" in the scan info
|
||||
self.update_scan_info(p_kink=p_kink, e_kink=e_kink)
|
||||
|
||||
|
||||
class XasAdvancedScanWithXrd(XasAdvancedScan):
|
||||
scan_name = "xas_advanced_scan_with_xrd"
|
||||
gui_config = {
|
||||
"Movement Parameters": ["start", "stop"],
|
||||
"Scan Parameters": ["scan_time", "scan_duration", "monitored_readout_cycle"],
|
||||
"Spline Parameters": ["p_kink", "e_kink"],
|
||||
"Low Energy Break": ["break_enable_low", "break_time_low", "cycle_low"],
|
||||
"High Energy Break": ["break_enable_high", "break_time_high", "cycle_high"],
|
||||
"XRD Triggers": ["exp_time", "n_of_trigger"],
|
||||
}
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
# fmt: off
|
||||
start: Annotated[float, ScanArgument(display_name="Start Energy", description="Start energy.", units=Units.eV)],
|
||||
stop: Annotated[float, ScanArgument(display_name="Stop Energy", description="Stop energy.", units=Units.eV)],
|
||||
scan_time: Annotated[float, ScanArgument(display_name="Scan Time", description="Time for one scan cycle.", units=Units.s, ge=0)],
|
||||
scan_duration: Annotated[float, ScanArgument(display_name="Scan Duration", description="Total scan duration.", units=Units.s, ge=0)],
|
||||
p_kink: Annotated[float, ScanArgument(display_name="P Kink", description="Position of the kink.", ge=0)],
|
||||
e_kink: Annotated[float, ScanArgument(display_name="E Kink", description="Energy of the kink.", units=Units.eV)],
|
||||
break_enable_low: Annotated[bool, ScanArgument(display_name="Break Enable Low", description="Enable breaks for the low energy range.")],
|
||||
break_time_low: Annotated[float, ScanArgument(display_name="Break Time Low", description="Break time for the low energy range.", units=Units.s, ge=0)],
|
||||
cycle_low: Annotated[int, ScanArgument(display_name="Cycle Low", description="Use triggers every nth low-energy cycle.", ge=0)],
|
||||
break_enable_high: Annotated[bool, ScanArgument(display_name="Break Enable High", description="Enable breaks for the high energy range.")],
|
||||
break_time_high: Annotated[float, ScanArgument(display_name="Break Time High", description="Break time for the high energy range.", units=Units.s, ge=0)],
|
||||
cycle_high: Annotated[int, ScanArgument(display_name="Cycle High", description="Use triggers every nth high-energy cycle.", ge=0)],
|
||||
exp_time: Annotated[float, ScanArgument(display_name="Exposure Time", description="Length of one trigger period.", units=Units.s, ge=0)],
|
||||
n_of_trigger: Annotated[int, ScanArgument(display_name="Number Of Trigger", description="Amount of triggers fired during a break.", ge=0)],
|
||||
motor: Annotated[DeviceBase | None, ScanArgument(display_name="Motor", description="Bragg motor device.")] = None,
|
||||
daq: Annotated[DeviceBase | None, ScanArgument(display_name="DAQ", description="NIDAQ device.")] = None,
|
||||
monitored_readout_cycle: Annotated[float, ScanArgument(display_name="Monitored Readout Cycle", description="Delay between monitored readouts.", units=Units.s, gt=0)] = 1,
|
||||
**kwargs,
|
||||
# fmt: on
|
||||
):
|
||||
super().__init__(
|
||||
start=start,
|
||||
stop=stop,
|
||||
scan_time=scan_time,
|
||||
scan_duration=scan_duration,
|
||||
p_kink=p_kink,
|
||||
e_kink=e_kink,
|
||||
motor=motor,
|
||||
daq=daq,
|
||||
monitored_readout_cycle=monitored_readout_cycle,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
# We pass on the arguments as "additional_scan_parameters" in the scan info
|
||||
self.update_scan_info(
|
||||
break_enable_low=break_enable_low,
|
||||
break_time_low=break_time_low,
|
||||
cycle_low=cycle_low,
|
||||
break_enable_high=break_enable_high,
|
||||
break_time_high=break_time_high,
|
||||
cycle_high=cycle_high,
|
||||
exp_time=exp_time,
|
||||
n_of_trigger=n_of_trigger,
|
||||
)
|
||||
@@ -159,60 +159,6 @@ def test_set_control_settings(mock_bragg):
|
||||
assert dev.scan_control.scan_duration.get() == 5
|
||||
|
||||
|
||||
def test_update_scan_parameters(mock_bragg):
|
||||
dev = mock_bragg
|
||||
msg = ScanStatusMessage(
|
||||
scan_id="my_scan_id",
|
||||
status="closed",
|
||||
request_inputs={
|
||||
"inputs": {},
|
||||
"kwargs": {
|
||||
"start": 0,
|
||||
"stop": 5,
|
||||
"scan_time": 1,
|
||||
"scan_duration": 10,
|
||||
"xrd_enable_low": True,
|
||||
"xrd_enable_high": False,
|
||||
"num_trigger_low": 1,
|
||||
"num_trigger_high": 7,
|
||||
"exp_time_low": 1,
|
||||
"exp_time_high": 3,
|
||||
"cycle_low": 1,
|
||||
"cycle_high": 5,
|
||||
"p_kink": 50,
|
||||
"e_kink": 8000,
|
||||
},
|
||||
},
|
||||
info={
|
||||
"kwargs": {
|
||||
"start": 0,
|
||||
"stop": 5,
|
||||
"scan_time": 1,
|
||||
"scan_duration": 10,
|
||||
"xrd_enable_low": True,
|
||||
"xrd_enable_high": False,
|
||||
"num_trigger_low": 1,
|
||||
"num_trigger_high": 7,
|
||||
"exp_time_low": 1,
|
||||
"exp_time_high": 3,
|
||||
"cycle_low": 1,
|
||||
"cycle_high": 5,
|
||||
"p_kink": 50,
|
||||
"e_kink": 8000,
|
||||
}
|
||||
},
|
||||
metadata={},
|
||||
)
|
||||
mock_bragg.scan_info.msg = msg
|
||||
scan_param = dev.scan_parameter.model_dump()
|
||||
for _, v in scan_param.items():
|
||||
assert v == None
|
||||
dev._update_scan_parameter()
|
||||
scan_param = dev.scan_parameter.model_dump()
|
||||
for k, v in scan_param.items():
|
||||
assert v == msg.content["request_inputs"]["kwargs"].get(k, None)
|
||||
|
||||
|
||||
def test_kickoff_scan(mock_bragg):
|
||||
dev = mock_bragg
|
||||
dev.scan_control.scan_status._read_pv.mock_data = ScanControlScanStatus.READY
|
||||
|
||||
@@ -6,6 +6,7 @@ from unittest import mock
|
||||
import ophyd
|
||||
import pytest
|
||||
from bec_server.scan_server.scan_worker import ScanWorker
|
||||
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
|
||||
from ophyd.status import WaitTimeoutError
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import DeviceStoppedError
|
||||
from ophyd_devices.tests.utils import MockPV
|
||||
@@ -15,6 +16,13 @@ from debye_bec.devices.nidaq.nidaq import Nidaq, NidaqError
|
||||
|
||||
# TODO move this function to ophyd_devices, it is duplicated in csaxs_bec and needed for other pluging repositories
|
||||
from debye_bec.devices.test_utils.utils import patch_dual_pvs
|
||||
from debye_bec.devices.utils.utils import fetch_scan_info
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def scan_info_mock():
|
||||
"""Fixture for the ScanInfo object."""
|
||||
return ScanServerScanInfo(scan_name="xas_simple_scan", scan_id="test")
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
@@ -52,13 +60,17 @@ def test_init(mock_nidaq):
|
||||
]
|
||||
|
||||
|
||||
def test_check_if_scan_name_is_valid(mock_nidaq):
|
||||
def test_check_if_scan_name_is_valid(mock_nidaq, scan_info_mock):
|
||||
"""Test the check_if_scan_name_is_valid method."""
|
||||
dev = mock_nidaq
|
||||
dev.scan_info.msg.scan_name = "xas_simple_scan"
|
||||
assert dev._check_if_scan_name_is_valid()
|
||||
dev.scan_info.msg.scan_name = "invalid_scan_name"
|
||||
assert not dev._check_if_scan_name_is_valid()
|
||||
scan_info_mock.scan_name = "xas_simple_scan"
|
||||
dev.scan_info.msg.info.update(scan_info_mock.model_dump())
|
||||
scan_parameters = fetch_scan_info(dev.scan_info)
|
||||
assert dev._check_if_scan_name_is_valid(scan_parameters)
|
||||
scan_info_mock.scan_name = "invalid_scan_name"
|
||||
dev.scan_info.msg.info.update(scan_info_mock.model_dump())
|
||||
scan_parameters = fetch_scan_info(dev.scan_info)
|
||||
assert not dev._check_if_scan_name_is_valid(scan_parameters)
|
||||
|
||||
|
||||
def test_set_config(mock_nidaq):
|
||||
@@ -120,11 +132,13 @@ def test_on_unstage(mock_nidaq):
|
||||
("nidaq_continuous_scan", False, 0),
|
||||
],
|
||||
)
|
||||
def test_on_pre_scan(mock_nidaq, scan_name, raise_error, nidaq_state):
|
||||
def test_on_pre_scan(mock_nidaq, scan_name, raise_error, nidaq_state, scan_info_mock):
|
||||
"""Test the on_pre_scan method of the Nidaq device."""
|
||||
dev = mock_nidaq
|
||||
dev.state.put(nidaq_state)
|
||||
dev.scan_info.msg.scan_name = scan_name
|
||||
scan_info_mock.scan_name = scan_name
|
||||
dev.scan_info.msg.info.update(scan_info_mock.model_dump())
|
||||
dev.scan_parameters = fetch_scan_info(dev.scan_info)
|
||||
dev._timeout_wait_for_pv = 0.1 # Set a short timeout for testing
|
||||
if not raise_error:
|
||||
dev.pre_scan()
|
||||
@@ -133,11 +147,13 @@ def test_on_pre_scan(mock_nidaq, scan_name, raise_error, nidaq_state):
|
||||
dev.pre_scan()
|
||||
|
||||
|
||||
def test_on_complete(mock_nidaq):
|
||||
def test_on_complete(mock_nidaq, scan_info_mock):
|
||||
"""Test the on_complete method of the Nidaq device."""
|
||||
dev = mock_nidaq
|
||||
scan_info_mock.scan_name = "nidaq_continuous_scan"
|
||||
dev.scan_info.msg.info.update(scan_info_mock.model_dump())
|
||||
dev.scan_parameters = fetch_scan_info(dev.scan_info)
|
||||
# Check for nidaq_continuous_scan
|
||||
dev.scan_info.msg.scan_name = "nidaq_continuous_scan"
|
||||
dev.state.put(0) # Set state to DISABLED
|
||||
status = dev.complete()
|
||||
assert status.done is False
|
||||
@@ -147,7 +163,9 @@ def test_on_complete(mock_nidaq):
|
||||
assert status.done is True
|
||||
|
||||
# Check for XAS simple scan
|
||||
dev.scan_info.msg.scan_name = "xas_simple_scan"
|
||||
scan_info_mock.scan_name = "xas_simple_scan"
|
||||
dev.scan_info.msg.info.update(scan_info_mock.model_dump())
|
||||
dev.scan_parameters = fetch_scan_info(dev.scan_info)
|
||||
dev.state.put(0) # Set state to ACQUIRE
|
||||
dev.stop_call.put(0)
|
||||
dev._timeout_wait_for_pv = 5
|
||||
|
||||
@@ -7,6 +7,9 @@ import ophyd
|
||||
import pytest
|
||||
from bec_lib.messages import ScanStatusMessage
|
||||
from bec_server.scan_server.scan_worker import ScanWorker
|
||||
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
|
||||
from bec_server.scan_server.tests.scan_fixtures import *
|
||||
from bec_server.scan_server.tests.scan_fixtures import _MockDevice
|
||||
from ophyd_devices import CompareStatus, DeviceStatus
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import DeviceStoppedError
|
||||
from ophyd_devices.tests.utils import MockPV, patch_dual_pvs
|
||||
@@ -20,6 +23,7 @@ from debye_bec.devices.pilatus.pilatus import (
|
||||
TRIGGERMODE,
|
||||
Pilatus,
|
||||
)
|
||||
from debye_bec.devices.utils.utils import fetch_scan_info
|
||||
|
||||
if TYPE_CHECKING: # pragma no cover
|
||||
from bec_lib.messages import FileMessage
|
||||
@@ -34,32 +38,38 @@ if TYPE_CHECKING: # pragma no cover
|
||||
@pytest.fixture(
|
||||
scope="function",
|
||||
params=[
|
||||
(0.1, 1, 1, "line_scan", "step"),
|
||||
(0.2, 2, 2, "time_scan", "step"),
|
||||
(0.5, 5, 5, "xas_advanced_scan", "fly"),
|
||||
(("samx", 0.1, 1, 5, "samy", 0, 1, 5), {"relative": True}, "_v4_hexagonal_scan"),
|
||||
((1, 0.2), {}, "_v4_time_scan"),
|
||||
((9000, 10000, 1, 20, 0.1, 9500), {}, "xas_advanced_scan"),
|
||||
],
|
||||
)
|
||||
def mock_scan_info(request, tmpdir):
|
||||
exp_time, frames_per_trigger, num_points, scan_name, scan_type = request.param
|
||||
scan_info = ScanStatusMessage(
|
||||
scan_id="test_id",
|
||||
status="open",
|
||||
scan_type=scan_type,
|
||||
scan_number=1,
|
||||
scan_parameters={
|
||||
"exp_time": exp_time,
|
||||
"frames_per_trigger": frames_per_trigger,
|
||||
"system_config": {},
|
||||
},
|
||||
info={"file_components": (f"{tmpdir}/data/S00000/S000001", "h5")},
|
||||
num_points=num_points,
|
||||
scan_name=scan_name,
|
||||
)
|
||||
yield scan_info
|
||||
def mock_scan_info(request, tmpdir, v4_scan_assembler, device_manager):
|
||||
args, kwargs, scan_name = request.param
|
||||
mo1_bragg = _MockDevice(name="mo1_bragg")
|
||||
nidaq = _MockDevice(name="nidaq")
|
||||
device_manager.add_device(mo1_bragg)
|
||||
device_manager.add_device(nidaq)
|
||||
scan = v4_scan_assembler(scan_name, *args, **kwargs)
|
||||
yield scan.scan_info
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def pilatus(mock_scan_info) -> Generator[Pilatus, None, None]:
|
||||
def mock_scan_status_message(mock_scan_info, tmpdir) -> ScanStatusMessage:
|
||||
info = mock_scan_info.model_dump()
|
||||
info.update({"file_components": (f"{tmpdir}/data/S00000/S000001", "h5")})
|
||||
return ScanStatusMessage(
|
||||
scan_id=mock_scan_info.scan_id,
|
||||
status="open",
|
||||
scan_number=1,
|
||||
scan_name=mock_scan_info.scan_name,
|
||||
scan_type="fly" if mock_scan_info.scan_type == "hardware_triggered" else "step",
|
||||
num_points=mock_scan_info.num_points,
|
||||
info=info,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def pilatus(mock_scan_status_message) -> Generator[Pilatus, None, None]:
|
||||
name = "pilatus"
|
||||
prefix = "X01DA-OP-MO1:PILATUS:"
|
||||
with mock.patch.object(ophyd, "cl") as mock_cl:
|
||||
@@ -70,8 +80,9 @@ def pilatus(mock_scan_info) -> Generator[Pilatus, None, None]:
|
||||
# dev.image1 = mock.MagicMock()
|
||||
# with mock.patch.object(dev, "image1"):
|
||||
with mock.patch.object(dev, "task_handler"):
|
||||
dev.scan_info.msg = mock_scan_info
|
||||
dev.scan_info.msg = mock_scan_status_message
|
||||
try:
|
||||
dev.scan_parameters = fetch_scan_info(dev.scan_info)
|
||||
yield dev
|
||||
finally:
|
||||
try:
|
||||
@@ -177,7 +188,6 @@ def test_pilatus_on_trigger_cancel_on_stop(pilatus):
|
||||
|
||||
def test_pilatus_on_complete(pilatus: Pilatus):
|
||||
"""Test the on_complete logic of the Pilatus detector."""
|
||||
|
||||
if pilatus.scan_info.msg.scan_name.startswith("xas"):
|
||||
# TODO add test cases for xas scans
|
||||
# status = pilatus.complete()
|
||||
@@ -196,8 +206,9 @@ def test_pilatus_on_complete(pilatus: Pilatus):
|
||||
pilatus.cam.acquire._read_pv.mock_data = ACQUIREMODE.ACQUIRING.value
|
||||
pilatus.hdf.capture._read_pv.mock_data = ACQUIREMODE.ACQUIRING.value
|
||||
pilatus.cam.armed._read_pv.mock_data = DETECTORSTATE.ARMED.value
|
||||
num_images = pilatus.scan_info.msg.num_points * pilatus.scan_info.msg.scan_parameters.get(
|
||||
"frames_per_trigger", 1
|
||||
num_images = (
|
||||
pilatus.scan_parameters.num_points
|
||||
* pilatus.scan_parameters.additional_scan_parameters.get("frames_per_trigger", 1)
|
||||
)
|
||||
pilatus.hdf.num_captured._read_pv.mock_data = num_images - 1
|
||||
# Call on complete
|
||||
@@ -275,9 +286,12 @@ def test_pilatus_on_complete(pilatus: Pilatus):
|
||||
|
||||
def test_pilatus_on_stage_raises_low_exp_time(pilatus):
|
||||
"""Test that on_stage raises a ValueError if the exposure time is too low."""
|
||||
pilatus.scan_info.msg.scan_parameters["exp_time"] = 0.09
|
||||
scan_msg = pilatus.scan_info.msg
|
||||
if scan_msg.scan_type != "step" and scan_msg.scan_name not in pilatus.xas_xrd_scan_names:
|
||||
pilatus.scan_info.msg.info["exp_time"] = 0.09
|
||||
pilatus.scan_parameters = fetch_scan_info(pilatus.scan_info)
|
||||
if (
|
||||
pilatus.scan_parameters.scan_type != "software_triggered"
|
||||
and pilatus.scan_parameters.scan_name not in pilatus.xas_xrd_scan_names
|
||||
):
|
||||
return
|
||||
with pytest.raises(ValueError):
|
||||
pilatus.on_stage()
|
||||
|
||||
@@ -3,10 +3,10 @@ from functools import partial
|
||||
|
||||
import pytest
|
||||
from bec_server.device_server.tests.utils import DeviceMockType, DMMock
|
||||
from bec_server.scan_server.tests.fixtures import (
|
||||
ScanStubStatusMock,
|
||||
connector_mock,
|
||||
instruction_handler_mock,
|
||||
from bec_server.scan_server.tests.scan_fixtures import (
|
||||
nth_done_status_mock,
|
||||
readout_priority,
|
||||
v4_scan_assembler,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -1,429 +0,0 @@
|
||||
# pylint: skip-file
|
||||
from unittest import mock
|
||||
|
||||
from bec_lib.messages import DeviceInstructionMessage
|
||||
from bec_server.device_server.tests.utils import DMMock
|
||||
|
||||
from debye_bec.scans import (
|
||||
XASAdvancedScan,
|
||||
XASAdvancedScanWithXRD,
|
||||
XASSimpleScan,
|
||||
XASSimpleScanWithXRD,
|
||||
)
|
||||
|
||||
|
||||
def get_instructions(request, ScanStubStatusMock):
|
||||
request.metadata["RID"] = "my_test_request_id"
|
||||
|
||||
def fake_done():
|
||||
"""
|
||||
Fake done function for ScanStubStatusMock. Upon each call, it returns the next value from the generator.
|
||||
This is used to simulate the completion of the scan.
|
||||
"""
|
||||
yield False
|
||||
yield False
|
||||
yield True
|
||||
|
||||
def fake_complete(*args, **kwargs):
|
||||
yield "fake_complete"
|
||||
return ScanStubStatusMock(done_func=fake_done)
|
||||
|
||||
with (
|
||||
mock.patch.object(request.stubs, "complete", side_effect=fake_complete),
|
||||
mock.patch.object(request.stubs, "_get_result_from_status", return_value=None),
|
||||
):
|
||||
reference_commands = list(request.run())
|
||||
|
||||
for cmd in reference_commands:
|
||||
if not cmd or isinstance(cmd, str):
|
||||
continue
|
||||
if "RID" in cmd.metadata:
|
||||
cmd.metadata["RID"] = "my_test_request_id"
|
||||
if "rpc_id" in cmd.parameter:
|
||||
cmd.parameter["rpc_id"] = "my_test_rpc_id"
|
||||
cmd.metadata.pop("device_instr_id", None)
|
||||
|
||||
return reference_commands
|
||||
|
||||
|
||||
def test_xas_simple_scan(scan_assembler, ScanStubStatusMock):
|
||||
|
||||
request = scan_assembler(XASSimpleScan, start=0, stop=5, scan_time=1, scan_duration=10)
|
||||
request.device_manager.add_device("nidaq")
|
||||
reference_commands = get_instructions(request, ScanStubStatusMock)
|
||||
|
||||
assert reference_commands == [
|
||||
None,
|
||||
None,
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="scan_report_instruction",
|
||||
parameter={"device_progress": ["mo1_bragg"]},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="open_scan",
|
||||
parameter={
|
||||
"readout_priority": {
|
||||
"monitored": [],
|
||||
"baseline": [],
|
||||
"on_request": [],
|
||||
"async": ["nidaq"],
|
||||
},
|
||||
"num_points": None,
|
||||
"positions": [0.0, 5.0],
|
||||
"scan_name": "xas_simple_scan",
|
||||
"scan_type": "fly",
|
||||
},
|
||||
),
|
||||
DeviceInstructionMessage(metadata={}, device="nidaq", action="stage", parameter={}),
|
||||
DeviceInstructionMessage(
|
||||
metadata={},
|
||||
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
|
||||
action="stage",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "baseline", "RID": "my_test_request_id"},
|
||||
device=["samx"],
|
||||
action="read",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
|
||||
action="pre_scan",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device="mo1_bragg",
|
||||
action="kickoff",
|
||||
parameter={"configure": {}},
|
||||
),
|
||||
"fake_complete",
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0},
|
||||
device=["bpm4i", "eiger", "mo1_bragg"],
|
||||
action="read",
|
||||
parameter={"group": "monitored"},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 1},
|
||||
device=["bpm4i", "eiger", "mo1_bragg"],
|
||||
action="read",
|
||||
parameter={"group": "monitored"},
|
||||
),
|
||||
"fake_complete",
|
||||
DeviceInstructionMessage(
|
||||
metadata={},
|
||||
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
|
||||
action="unstage",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="close_scan",
|
||||
parameter={},
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
def test_xas_simple_scan_with_xrd(scan_assembler, ScanStubStatusMock):
|
||||
|
||||
request = scan_assembler(
|
||||
XASSimpleScanWithXRD,
|
||||
start=0,
|
||||
stop=5,
|
||||
scan_time=1,
|
||||
scan_duration=10,
|
||||
break_enable_low=True,
|
||||
break_time_low=1,
|
||||
cycle_low=1,
|
||||
break_enable_high=True,
|
||||
break_time_high=2,
|
||||
exp_time=1,
|
||||
n_of_trigger=1,
|
||||
cycle_high=4,
|
||||
)
|
||||
request.device_manager.add_device("nidaq")
|
||||
reference_commands = get_instructions(request, ScanStubStatusMock)
|
||||
# TODO #64 based on creating this ScanStatusMessage, we should test the logic of stage/kickoff/complete/unstage in Pilatus and mo1Bragg
|
||||
|
||||
assert reference_commands == [
|
||||
None,
|
||||
None,
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="scan_report_instruction",
|
||||
parameter={"device_progress": ["mo1_bragg"]},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="open_scan",
|
||||
parameter={
|
||||
"readout_priority": {
|
||||
"monitored": [],
|
||||
"baseline": [],
|
||||
"on_request": [],
|
||||
"async": ["nidaq"],
|
||||
},
|
||||
"num_points": None,
|
||||
"positions": [0.0, 5.0],
|
||||
"scan_name": "xas_simple_scan_with_xrd",
|
||||
"scan_type": "fly",
|
||||
},
|
||||
),
|
||||
DeviceInstructionMessage(metadata={}, device="nidaq", action="stage", parameter={}),
|
||||
DeviceInstructionMessage(
|
||||
metadata={},
|
||||
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
|
||||
action="stage",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "baseline", "RID": "my_test_request_id"},
|
||||
device=["samx"],
|
||||
action="read",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
|
||||
action="pre_scan",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device="mo1_bragg",
|
||||
action="kickoff",
|
||||
parameter={"configure": {}},
|
||||
),
|
||||
"fake_complete",
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0},
|
||||
device=["bpm4i", "eiger", "mo1_bragg"],
|
||||
action="read",
|
||||
parameter={"group": "monitored"},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 1},
|
||||
device=["bpm4i", "eiger", "mo1_bragg"],
|
||||
action="read",
|
||||
parameter={"group": "monitored"},
|
||||
),
|
||||
"fake_complete",
|
||||
DeviceInstructionMessage(
|
||||
metadata={},
|
||||
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
|
||||
action="unstage",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="close_scan",
|
||||
parameter={},
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
def test_xas_advanced_scan(scan_assembler, ScanStubStatusMock):
|
||||
|
||||
request = scan_assembler(
|
||||
XASAdvancedScan,
|
||||
start=8000,
|
||||
stop=9000,
|
||||
scan_time=1,
|
||||
scan_duration=10,
|
||||
p_kink=50,
|
||||
e_kink=8500,
|
||||
)
|
||||
request.device_manager.add_device("nidaq")
|
||||
reference_commands = get_instructions(request, ScanStubStatusMock)
|
||||
|
||||
assert reference_commands == [
|
||||
None,
|
||||
None,
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="scan_report_instruction",
|
||||
parameter={"device_progress": ["mo1_bragg"]},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="open_scan",
|
||||
parameter={
|
||||
"readout_priority": {
|
||||
"monitored": [],
|
||||
"baseline": [],
|
||||
"on_request": [],
|
||||
"async": ["nidaq"],
|
||||
},
|
||||
"num_points": None,
|
||||
"positions": [8000.0, 9000.0],
|
||||
"scan_name": "xas_advanced_scan",
|
||||
"scan_type": "fly",
|
||||
},
|
||||
),
|
||||
DeviceInstructionMessage(metadata={}, device="nidaq", action="stage", parameter={}),
|
||||
DeviceInstructionMessage(
|
||||
metadata={},
|
||||
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
|
||||
action="stage",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "baseline", "RID": "my_test_request_id"},
|
||||
device=["samx"],
|
||||
action="read",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
|
||||
action="pre_scan",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device="mo1_bragg",
|
||||
action="kickoff",
|
||||
parameter={"configure": {}},
|
||||
),
|
||||
"fake_complete",
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0},
|
||||
device=["bpm4i", "eiger", "mo1_bragg"],
|
||||
action="read",
|
||||
parameter={"group": "monitored"},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 1},
|
||||
device=["bpm4i", "eiger", "mo1_bragg"],
|
||||
action="read",
|
||||
parameter={"group": "monitored"},
|
||||
),
|
||||
"fake_complete",
|
||||
DeviceInstructionMessage(
|
||||
metadata={},
|
||||
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
|
||||
action="unstage",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="close_scan",
|
||||
parameter={},
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
def test_xas_advanced_scan_with_xrd(scan_assembler, ScanStubStatusMock):
|
||||
|
||||
request = scan_assembler(
|
||||
XASAdvancedScanWithXRD,
|
||||
start=8000,
|
||||
stop=9000,
|
||||
scan_time=1,
|
||||
scan_duration=10,
|
||||
p_kink=50,
|
||||
e_kink=8500,
|
||||
break_enable_low=True,
|
||||
break_time_low=1,
|
||||
cycle_low=1,
|
||||
break_enable_high=True,
|
||||
break_time_high=2,
|
||||
exp_time=1,
|
||||
n_of_trigger=1,
|
||||
cycle_high=4,
|
||||
)
|
||||
request.device_manager.add_device("nidaq")
|
||||
reference_commands = get_instructions(request, ScanStubStatusMock)
|
||||
|
||||
assert reference_commands == [
|
||||
None,
|
||||
None,
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="scan_report_instruction",
|
||||
parameter={"device_progress": ["mo1_bragg"]},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="open_scan",
|
||||
parameter={
|
||||
"readout_priority": {
|
||||
"monitored": [],
|
||||
"baseline": [],
|
||||
"on_request": [],
|
||||
"async": ["nidaq"],
|
||||
},
|
||||
"num_points": None,
|
||||
"positions": [8000.0, 9000.0],
|
||||
"scan_name": "xas_advanced_scan_with_xrd",
|
||||
"scan_type": "fly",
|
||||
},
|
||||
),
|
||||
DeviceInstructionMessage(metadata={}, device="nidaq", action="stage", parameter={}),
|
||||
DeviceInstructionMessage(
|
||||
metadata={},
|
||||
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
|
||||
action="stage",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "baseline", "RID": "my_test_request_id"},
|
||||
device=["samx"],
|
||||
action="read",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
|
||||
action="pre_scan",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device="mo1_bragg",
|
||||
action="kickoff",
|
||||
parameter={"configure": {}},
|
||||
),
|
||||
"fake_complete",
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0},
|
||||
device=["bpm4i", "eiger", "mo1_bragg"],
|
||||
action="read",
|
||||
parameter={"group": "monitored"},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 1},
|
||||
device=["bpm4i", "eiger", "mo1_bragg"],
|
||||
action="read",
|
||||
parameter={"group": "monitored"},
|
||||
),
|
||||
"fake_complete",
|
||||
DeviceInstructionMessage(
|
||||
metadata={},
|
||||
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
|
||||
action="unstage",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="close_scan",
|
||||
parameter={},
|
||||
),
|
||||
]
|
||||
@@ -0,0 +1,152 @@
|
||||
# pylint: skip-file
|
||||
from unittest import mock
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
from bec_server.scan_server.tests.scan_fixtures import *
|
||||
from bec_server.scan_server.tests.scan_hook_tests import *
|
||||
|
||||
XAS_SIMPLE_SCAN_DEFAULT_HOOK_TESTS = [
|
||||
("prepare_scan", [assert_prepare_scan_reads_baseline_devices]),
|
||||
("open_scan", [assert_scan_open_called]),
|
||||
("stage", [assert_stage_all_devices_called]),
|
||||
("pre_scan", [assert_pre_scan_called]),
|
||||
("unstage", [assert_unstage_all_devices_called]),
|
||||
("close_scan", [assert_close_scan_waits_for_baseline_and_closes]),
|
||||
]
|
||||
|
||||
|
||||
def _assemble_xas_simple_scan(v4_scan_assembler, **overrides):
|
||||
params = {
|
||||
"start": 8000.0,
|
||||
"stop": 9000.0,
|
||||
"scan_time": 1.0,
|
||||
"scan_duration": 10.0,
|
||||
"motor": "mo1_bragg",
|
||||
"daq": "nidaq",
|
||||
"monitored_readout_cycle": 1.0,
|
||||
}
|
||||
params.update(overrides)
|
||||
return v4_scan_assembler("xas_simple_scan", **params)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(("hook_name", "hook_tests"), XAS_SIMPLE_SCAN_DEFAULT_HOOK_TESTS)
|
||||
def test_xas_simple_scan_v4_default_hooks(
|
||||
v4_scan_assembler, nth_done_status_mock, hook_name, hook_tests
|
||||
):
|
||||
scan = _assemble_xas_simple_scan(v4_scan_assembler)
|
||||
|
||||
run_scan_tests(scan, [(hook_name, hook_tests)], nth_done_status_mock=nth_done_status_mock)
|
||||
|
||||
|
||||
def test_xas_simple_scan_v4_prepare_scan_updates_metadata(v4_scan_assembler):
|
||||
scan = _assemble_xas_simple_scan(v4_scan_assembler)
|
||||
scan.actions.add_scan_report_instruction_device_progress = mock.MagicMock()
|
||||
baseline_status = mock.MagicMock()
|
||||
scan.actions.read_baseline_devices = mock.MagicMock(return_value=baseline_status)
|
||||
|
||||
scan.prepare_scan()
|
||||
|
||||
scan.actions._build_scan_status_message("open")
|
||||
|
||||
np.testing.assert_array_equal(scan.scan_info.positions, np.array([8000.0, 9000.0]))
|
||||
assert scan.scan_info.additional_scan_parameters["scan_time"] == 1.0
|
||||
assert scan.scan_info.additional_scan_parameters["scan_duration"] == 10.0
|
||||
assert scan.scan_info.readout_priority_modification["async"] == ["nidaq"]
|
||||
scan.actions.add_scan_report_instruction_device_progress.assert_called_once_with(scan.motor)
|
||||
scan.actions.read_baseline_devices.assert_called_once_with(wait=False)
|
||||
assert scan._baseline_readout_status is baseline_status
|
||||
|
||||
|
||||
def test_xas_simple_scan_v4_scan_core_reads_until_complete(v4_scan_assembler, nth_done_status_mock):
|
||||
scan = _assemble_xas_simple_scan(v4_scan_assembler)
|
||||
completion_status = nth_done_status_mock(resolve_after=3)
|
||||
scan.actions.kickoff = mock.MagicMock()
|
||||
scan.actions.complete = mock.MagicMock(return_value=completion_status)
|
||||
scan.actions.read_monitored_devices = mock.MagicMock()
|
||||
|
||||
with mock.patch("debye_bec.scans.xas_simple_scan.time.sleep"):
|
||||
scan.scan_core()
|
||||
|
||||
scan.actions.kickoff.assert_called_once_with(scan.motor)
|
||||
scan.actions.complete.assert_called_once_with(scan.motor, wait=False)
|
||||
assert scan.actions.read_monitored_devices.call_count == 2
|
||||
|
||||
|
||||
def test_xas_simple_scan_v4_post_scan_completes_all_devices(v4_scan_assembler):
|
||||
scan = _assemble_xas_simple_scan(v4_scan_assembler)
|
||||
scan.actions.complete_all_devices = mock.MagicMock()
|
||||
|
||||
scan.post_scan()
|
||||
|
||||
scan.actions.complete_all_devices.assert_called_once_with()
|
||||
|
||||
|
||||
def test_xas_simple_scan_with_xrd_v4_updates_xrd_metadata(v4_scan_assembler):
|
||||
scan = v4_scan_assembler(
|
||||
"xas_simple_scan_with_xrd",
|
||||
start=8000.0,
|
||||
stop=9000.0,
|
||||
scan_time=1.0,
|
||||
scan_duration=10.0,
|
||||
break_enable_low=True,
|
||||
break_time_low=1.0,
|
||||
cycle_low=2,
|
||||
break_enable_high=False,
|
||||
break_time_high=3.0,
|
||||
cycle_high=4,
|
||||
exp_time=0.5,
|
||||
n_of_trigger=6,
|
||||
motor="mo1_bragg",
|
||||
daq="nidaq",
|
||||
)
|
||||
|
||||
assert scan.scan_name == "xas_simple_scan_with_xrd"
|
||||
assert scan.scan_info.additional_scan_parameters["break_enable_low"] is True
|
||||
assert scan.scan_info.additional_scan_parameters["cycle_high"] == 4
|
||||
assert scan.scan_info.additional_scan_parameters["n_of_trigger"] == 6
|
||||
|
||||
|
||||
def test_xas_advanced_scan_v4_updates_spline_metadata(v4_scan_assembler):
|
||||
scan = v4_scan_assembler(
|
||||
"xas_advanced_scan",
|
||||
start=8000.0,
|
||||
stop=9000.0,
|
||||
scan_time=1.0,
|
||||
scan_duration=10.0,
|
||||
p_kink=50.0,
|
||||
e_kink=8500.0,
|
||||
motor="mo1_bragg",
|
||||
daq="nidaq",
|
||||
)
|
||||
|
||||
assert scan.scan_name == "xas_advanced_scan"
|
||||
assert scan.scan_info.additional_scan_parameters["p_kink"] == 50.0
|
||||
assert scan.scan_info.additional_scan_parameters["e_kink"] == 8500.0
|
||||
|
||||
|
||||
def test_xas_advanced_scan_with_xrd_v4_updates_all_metadata(v4_scan_assembler):
|
||||
scan = v4_scan_assembler(
|
||||
"xas_advanced_scan_with_xrd",
|
||||
start=8000.0,
|
||||
stop=9000.0,
|
||||
scan_time=1.0,
|
||||
scan_duration=10.0,
|
||||
p_kink=55.0,
|
||||
e_kink=8450.0,
|
||||
break_enable_low=True,
|
||||
break_time_low=1.5,
|
||||
cycle_low=2,
|
||||
break_enable_high=True,
|
||||
break_time_high=2.5,
|
||||
cycle_high=3,
|
||||
exp_time=0.25,
|
||||
n_of_trigger=8,
|
||||
motor="mo1_bragg",
|
||||
daq="nidaq",
|
||||
)
|
||||
|
||||
assert scan.scan_name == "xas_advanced_scan_with_xrd"
|
||||
assert scan.scan_info.additional_scan_parameters["p_kink"] == 55.0
|
||||
assert scan.scan_info.additional_scan_parameters["break_enable_high"] is True
|
||||
assert scan.scan_info.exp_time == 0.25
|
||||
@@ -1,126 +1,75 @@
|
||||
# pylint: skip-file
|
||||
from unittest import mock
|
||||
|
||||
from bec_lib.messages import DeviceInstructionMessage
|
||||
from bec_server.device_server.tests.utils import DMMock
|
||||
import pytest
|
||||
from bec_server.scan_server.tests.scan_fixtures import *
|
||||
from bec_server.scan_server.tests.scan_hook_tests import *
|
||||
|
||||
from debye_bec.scans import NIDAQContinuousScan
|
||||
NIDAQ_CONTINUOUS_SCAN_DEFAULT_HOOK_TESTS = [
|
||||
("prepare_scan", [assert_prepare_scan_reads_baseline_devices]),
|
||||
("open_scan", [assert_scan_open_called]),
|
||||
("stage", [assert_stage_all_devices_called]),
|
||||
("pre_scan", [assert_pre_scan_called]),
|
||||
("unstage", [assert_unstage_all_devices_called]),
|
||||
("close_scan", [assert_close_scan_waits_for_baseline_and_closes]),
|
||||
]
|
||||
|
||||
|
||||
def get_instructions(request, ScanStubStatusMock):
|
||||
request.metadata["RID"] = "my_test_request_id"
|
||||
|
||||
def fake_done():
|
||||
"""
|
||||
Fake done function for ScanStubStatusMock. Upon each call, it returns the next value from the generator.
|
||||
This is used to simulate the completion of the scan.
|
||||
"""
|
||||
yield False
|
||||
yield False
|
||||
yield True
|
||||
|
||||
def fake_complete(*args, **kwargs):
|
||||
yield "fake_complete"
|
||||
return ScanStubStatusMock(done_func=fake_done)
|
||||
|
||||
with (
|
||||
mock.patch.object(request.stubs, "complete", side_effect=fake_complete),
|
||||
mock.patch.object(request.stubs, "_get_result_from_status", return_value=None),
|
||||
):
|
||||
reference_commands = list(request.run())
|
||||
|
||||
for cmd in reference_commands:
|
||||
if not cmd or isinstance(cmd, str):
|
||||
continue
|
||||
if "RID" in cmd.metadata:
|
||||
cmd.metadata["RID"] = "my_test_request_id"
|
||||
if "rpc_id" in cmd.parameter:
|
||||
cmd.parameter["rpc_id"] = "my_test_rpc_id"
|
||||
cmd.metadata.pop("device_instr_id", None)
|
||||
|
||||
return reference_commands
|
||||
def _assemble_nidaq_continuous_scan(v4_scan_assembler, **overrides):
|
||||
params = {"scan_duration": 10.0, "daq": "nidaq", "compression": False}
|
||||
params.update(overrides)
|
||||
return v4_scan_assembler("nidaq_continuous_scan", **params)
|
||||
|
||||
|
||||
def test_xas_simple_scan(scan_assembler, ScanStubStatusMock):
|
||||
@pytest.mark.parametrize(("hook_name", "hook_tests"), NIDAQ_CONTINUOUS_SCAN_DEFAULT_HOOK_TESTS)
|
||||
def test_nidaq_continuous_scan_v4_default_hooks(
|
||||
v4_scan_assembler, nth_done_status_mock, hook_name, hook_tests
|
||||
):
|
||||
scan = _assemble_nidaq_continuous_scan(v4_scan_assembler)
|
||||
|
||||
request = scan_assembler(NIDAQContinuousScan, scan_duration=10)
|
||||
request.device_manager.add_device("nidaq")
|
||||
reference_commands = get_instructions(request, ScanStubStatusMock)
|
||||
assert reference_commands == [
|
||||
None,
|
||||
None,
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="scan_report_instruction",
|
||||
parameter={"device_progress": ["nidaq"]},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="open_scan",
|
||||
parameter={
|
||||
"readout_priority": {
|
||||
"monitored": [],
|
||||
"baseline": [],
|
||||
"on_request": [],
|
||||
"async": ["nidaq"],
|
||||
},
|
||||
"num_points": 0,
|
||||
"positions": [],
|
||||
"scan_name": "nidaq_continuous_scan",
|
||||
"scan_type": "fly",
|
||||
},
|
||||
),
|
||||
DeviceInstructionMessage(metadata={}, device="nidaq", action="stage", parameter={}),
|
||||
DeviceInstructionMessage(
|
||||
metadata={},
|
||||
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
|
||||
action="stage",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "baseline", "RID": "my_test_request_id"},
|
||||
device=["samx"],
|
||||
action="read",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
|
||||
action="pre_scan",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device="nidaq",
|
||||
action="kickoff",
|
||||
parameter={"configure": {}},
|
||||
),
|
||||
"fake_complete",
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0},
|
||||
device=["bpm4i", "eiger", "mo1_bragg"],
|
||||
action="read",
|
||||
parameter={"group": "monitored"},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 1},
|
||||
device=["bpm4i", "eiger", "mo1_bragg"],
|
||||
action="read",
|
||||
parameter={"group": "monitored"},
|
||||
),
|
||||
"fake_complete",
|
||||
DeviceInstructionMessage(
|
||||
metadata={},
|
||||
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
|
||||
action="unstage",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="close_scan",
|
||||
parameter={},
|
||||
),
|
||||
]
|
||||
run_scan_tests(scan, [(hook_name, hook_tests)], nth_done_status_mock=nth_done_status_mock)
|
||||
|
||||
|
||||
def test_nidaq_continuous_scan_v4_prepare_scan_updates_metadata(v4_scan_assembler):
|
||||
scan = _assemble_nidaq_continuous_scan(v4_scan_assembler)
|
||||
scan.actions.add_scan_report_instruction_device_progress = mock.MagicMock()
|
||||
baseline_status = mock.MagicMock()
|
||||
scan.actions.read_baseline_devices = mock.MagicMock(return_value=baseline_status)
|
||||
|
||||
scan.prepare_scan()
|
||||
scan.actions._build_scan_status_message("open")
|
||||
|
||||
assert scan.scan_info.additional_scan_parameters["scan_duration"] == 10.0
|
||||
assert scan.scan_info.additional_scan_parameters["compression"] is False
|
||||
assert scan.scan_info.readout_priority_modification["async"] == ["nidaq"]
|
||||
scan.actions.add_scan_report_instruction_device_progress.assert_called_once_with(scan.daq)
|
||||
scan.actions.read_baseline_devices.assert_called_once_with(wait=False)
|
||||
assert scan._baseline_readout_status is baseline_status
|
||||
|
||||
|
||||
def test_nidaq_continuous_scan_v4_scan_core_reads_until_complete(
|
||||
v4_scan_assembler, nth_done_status_mock
|
||||
):
|
||||
scan = _assemble_nidaq_continuous_scan(v4_scan_assembler)
|
||||
kickoff_status = mock.MagicMock()
|
||||
completion_status = nth_done_status_mock(resolve_after=3)
|
||||
scan.actions.kickoff = mock.MagicMock(return_value=kickoff_status)
|
||||
scan.actions.complete = mock.MagicMock(return_value=completion_status)
|
||||
scan.actions.read_monitored_devices = mock.MagicMock()
|
||||
|
||||
with mock.patch("debye_bec.scans.nidaq_continuous_scan.time.sleep"):
|
||||
scan.scan_core()
|
||||
|
||||
scan.actions.kickoff.assert_called_once_with(device=scan.daq, wait=False)
|
||||
kickoff_status.wait.assert_called_once_with(timeout=5)
|
||||
scan.actions.complete.assert_called_once_with(device=scan.daq, wait=False)
|
||||
assert scan.actions.read_monitored_devices.call_count == 2
|
||||
|
||||
|
||||
def test_nidaq_continuous_scan_v4_post_scan_completes_all_devices(v4_scan_assembler):
|
||||
scan = _assemble_nidaq_continuous_scan(v4_scan_assembler)
|
||||
scan.actions.complete_all_devices = mock.MagicMock()
|
||||
|
||||
scan.post_scan()
|
||||
|
||||
scan.actions.complete_all_devices.assert_called_once_with()
|
||||
|
||||
Reference in New Issue
Block a user