feat: migrate XAS simple scan to V4 implementation and remove mono bragg scans
CI for debye_bec / test (push) Failing after 35s

This commit is contained in:
2026-05-10 14:20:08 +02:00
parent bee4562ab1
commit 99d0380977
4 changed files with 491 additions and 316 deletions
+6 -6
View File
@@ -1,7 +1,7 @@
from .mono_bragg_scans import (
XASAdvancedScan,
XASAdvancedScanWithXRD,
XASSimpleScan,
XASSimpleScanWithXRD,
)
from .nidaq_cont_scan import NIDAQContinuousScan
from .xas_simple_scan import (
XasAdvancedScan,
XasAdvancedScanWithXrd,
XasSimpleScan,
XasSimpleScanWithXrd,
)
-310
View File
@@ -1,310 +0,0 @@
"""This module contains the scan classes for the mono bragg motor of the Debye beamline."""
import time
from typing import Literal
import numpy as np
from bec_lib.device import DeviceBase
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans import AsyncFlyScanBase
logger = bec_logger.logger
class XASSimpleScan(AsyncFlyScanBase):
"""Class for the XAS simple scan"""
scan_name = "xas_simple_scan"
scan_type = "fly"
scan_report_hint = "device_progress"
required_kwargs = []
use_scan_progress_report = False
pre_move = False
gui_config = {
"Movement Parameters": ["start", "stop"],
"Scan Parameters": ["scan_time", "scan_duration"],
}
def __init__(
self,
start: float,
stop: float,
scan_time: float,
scan_duration: float,
motor: DeviceBase = "mo1_bragg",
**kwargs,
):
"""The xas_simple_scan is used to start a simple oscillating scan on the mono bragg motor.
Start and Stop define the energy range for the scan, scan_time is the time for one scan
cycle and scan_duration is the duration of the scan. If scan duration is set to 0, the
scan will run infinitely.
Args:
start (float): Start energy for the scan.
stop (float): Stop energy for the scan.
scan_time (float): Time for one scan cycle.
scan_duration (float): Duration of the scan.
motor (DeviceBase, optional): Motor device to be used for the scan.
Defaults to "mo1_bragg".
Examples:
>>> scans.xas_simple_scan(start=8000, stop=9000, scan_time=1, scan_duration=10)
"""
super().__init__(**kwargs)
self.motor = motor
self.start = start
self.stop = stop
self.scan_time = scan_time
self.scan_duration = scan_duration
self.primary_readout_cycle = 1
def update_readout_priority(self):
"""Ensure that NIDAQ is not monitored for any quick EXAFS."""
super().update_readout_priority()
self.readout_priority["async"].append("nidaq")
def prepare_positions(self):
"""Prepare the positions for the scan.
Use here only start and end energy defining the range for the scan.
"""
self.positions = np.array([self.start, self.stop], dtype=float)
self.num_pos = None
yield None
def pre_scan(self):
"""Pre Scan action."""
self._check_limits()
# Ensure parent class pre_scan actions to be called.
yield from super().pre_scan()
def scan_report_instructions(self):
"""
Return the instructions for the scan report.
"""
yield from self.stubs.scan_report_instruction({"device_progress": [self.motor]})
def scan_core(self):
"""Run the scan core.
Kickoff the oscillation on the Bragg motor and wait for the completion of the motion.
"""
# Start the oscillation on the Bragg motor.
yield from self.stubs.kickoff(device=self.motor)
complete_status = yield from self.stubs.complete(device=self.motor, wait=False)
while not complete_status.done:
# Readout monitored devices
yield from self.stubs.read(group="monitored", point_id=self.point_id)
time.sleep(self.primary_readout_cycle)
self.point_id += 1
self.num_pos = self.point_id
class XASSimpleScanWithXRD(XASSimpleScan):
"""Class for the XAS simple scan with XRD"""
scan_name = "xas_simple_scan_with_xrd"
gui_config = {
"Movement Parameters": ["start", "stop"],
"Scan Parameters": ["scan_time", "scan_duration"],
"Low Energy Break": ["break_enable_low", "break_time_low", "cycle_low"],
"High Energy Break": ["break_enable_high", "break_time_high", "cycle_high"],
"XRD Triggers": ["exp_time", "n_of_trigger"],
}
def __init__(
self,
start: float,
stop: float,
scan_time: float,
scan_duration: float,
break_enable_low: bool,
break_time_low: float,
cycle_low: int,
break_enable_high: bool,
break_time_high: float,
cycle_high: float,
exp_time: float,
n_of_trigger: int,
motor: DeviceBase = "mo1_bragg",
**kwargs,
):
"""The xas_simple_scan_with_xrd is an oscillation motion on the mono motor
with XRD triggering at low and high energy ranges.
If scan duration is set to 0, the scan will run infinitely.
Args:
start (float): Start energy for the scan.
stop (float): Stop energy for the scan.
scan_time (float): Time for one oscillation .
scan_duration (float): Total duration of the scan.
break_enable_low (bool): Enable breaks for the low energy range.
break_time_low (float): Break time for the low energy range.
cycle_low (int): Specify how often the triggers should be considered,
every nth cycle for low
break_enable_high (bool): Enable breaks for the high energy range.
break_time_high (float): Break time for the high energy range.
cycle_high (int): Specify how often the triggers should be considered,
every nth cycle for high
exp_time (float): Length of 1 trigger period in seconds
n_of_trigger (int): Amount of triggers to be fired during break
motor (DeviceBase, optional): Motor device to be used for the scan.
Defaults to "mo1_bragg".
Examples:
>>> scans.xas_simple_scan_with_xrd(start=8000, stop=9000, scan_time=1, scan_duration=10, xrd_enable_low=True, num_trigger_low=5, cycle_low=2, exp_time_low=100, xrd_enable_high=False, num_trigger_high=3, cycle_high=1, exp_time_high=1000)
"""
super().__init__(
start=start,
stop=stop,
scan_time=scan_time,
scan_duration=scan_duration,
motor=motor,
**kwargs,
)
self.break_enable_low = break_enable_low
self.break_time_low = break_time_low
self.cycle_low = cycle_low
self.break_enable_high = break_enable_high
self.break_time_high = break_time_high
self.cycle_high = cycle_high
self.exp_time = exp_time
self.n_of_trigger = n_of_trigger
class XASAdvancedScan(XASSimpleScan):
"""Class for the XAS advanced scan"""
scan_name = "xas_advanced_scan"
gui_config = {
"Movement Parameters": ["start", "stop"],
"Scan Parameters": ["scan_time", "scan_duration"],
"Spline Parameters": ["p_kink", "e_kink"],
}
def __init__(
self,
start: float,
stop: float,
scan_time: float,
scan_duration: float,
p_kink: float,
e_kink: float,
motor: DeviceBase = "mo1_bragg",
**kwargs,
):
"""The xas_advanced_scan is an oscillation motion on the mono motor.
Start and Stop define the energy range for the scan, scan_time is the
time for one scan cycle and scan_duration is the duration of the scan.
If scan duration is set to 0, the scan will run infinitely.
p_kink and e_kink add a kink to the motion profile to slow down in the
exafs region of the scan.
Args:
start (float): Start angle for the scan.
stop (float): Stop angle for the scan.
scan_time (float): Time for one oscillation .
scan_duration (float): Total duration of the scan.
p_kink (float): Position of the kink.
e_kink (float): Energy of the kink.
motor (DeviceBase, optional): Motor device to be used for the scan.
Defaults to "mo1_bragg".
Examples:
>>> scans.xas_advanced_scan(start=10000, stop=12000, scan_time=0.5, scan_duration=10, p_kink=50, e_kink=10500)
"""
super().__init__(
start=start,
stop=stop,
scan_time=scan_time,
scan_duration=scan_duration,
motor=motor,
**kwargs,
)
self.p_kink = p_kink
self.e_kink = e_kink
class XASAdvancedScanWithXRD(XASAdvancedScan):
"""Class for the XAS advanced scan with XRD"""
scan_name = "xas_advanced_scan_with_xrd"
gui_config = {
"Movement Parameters": ["start", "stop"],
"Scan Parameters": ["scan_time", "scan_duration"],
"Spline Parameters": ["p_kink", "e_kink"],
"Low Energy Break": ["break_enable_low", "break_time_low", "cycle_low"],
"High Energy Break": ["break_enable_high", "break_time_high", "cycle_high"],
"XRD Triggers": ["exp_time", "n_of_trigger"],
}
def __init__(
self,
start: float,
stop: float,
scan_time: float,
scan_duration: float,
p_kink: float,
e_kink: float,
break_enable_low: bool,
break_time_low: float,
cycle_low: int,
break_enable_high: bool,
break_time_high: float,
cycle_high: float,
exp_time: float,
n_of_trigger: int,
motor: DeviceBase = "mo1_bragg",
**kwargs,
):
"""The xas_advanced_scan is an oscillation motion on the mono motor
with XRD triggering at low and high energy ranges.
Start and Stop define the energy range for the scan, scan_time is the time for
one scan cycle and scan_duration is the duration of the scan. If scan duration
is set to 0, the scan will run infinitely. p_kink and e_kink add a kink to the
motion profile to slow down in the exafs region of the scan.
Args:
start (float): Start angle for the scan.
stop (float): Stop angle for the scan.
scan_time (float): Time for one oscillation .
scan_duration (float): Total duration of the scan.
p_kink (float): Position of kink.
e_kink (float): Energy of the kink.
break_enable_low (bool): Enable breaks for the low energy range.
break_time_low (float): Break time for the low energy range.
cycle_low (int): Specify how often the triggers should be considered,
every nth cycle for low
break_enable_high (bool): Enable breaks for the high energy range.
break_time_high (float): Break time for the high energy range.
cycle_high (int): Specify how often the triggers should be considered,
every nth cycle for high
exp_time (float): Length of 1 trigger period in seconds
n_of_trigger (int): Amount of triggers to be fired during break
motor (DeviceBase, optional): Motor device to be used for the scan.
Defaults to "mo1_bragg".
Examples:
>>> scans.xas_advanced_scan_with_xrd(start=10000, stop=12000, scan_time=0.5, scan_duration=10, p_kink=50, e_kink=10500, xrd_enable_low=True, num_trigger_low=5, cycle_low=2, exp_time_low=100, xrd_enable_high=False, num_trigger_high=3, cycle_high=1, exp_time_high=1000)
"""
super().__init__(
start=start,
stop=stop,
scan_time=scan_time,
scan_duration=scan_duration,
p_kink=p_kink,
e_kink=e_kink,
motor=motor,
**kwargs,
)
self.p_kink = p_kink
self.e_kink = e_kink
self.break_enable_low = break_enable_low
self.break_time_low = break_time_low
self.cycle_low = cycle_low
self.break_enable_high = break_enable_high
self.break_time_high = break_time_high
self.cycle_high = cycle_high
self.exp_time = exp_time
self.n_of_trigger = n_of_trigger
+326
View File
@@ -0,0 +1,326 @@
"""
V4 implementation of the Debye XAS simple scan.
Scan procedure:
- prepare_scan
- open_scan
- stage
- pre_scan
- scan_core
- at_each_point (optionally called by scan_core)
- post_scan
- unstage
- close_scan
- on_exception (called if any exception is raised during the scan)
"""
from __future__ import annotations
import time
from typing import Annotated
import numpy as np
from bec_lib.device import DeviceBase
from bec_lib.scan_args import ScanArgument, Units
from bec_server.scan_server.scans.scan_base import ScanBase, ScanType
from bec_server.scan_server.scans.scan_modifier import scan_hook
class XasSimpleScan(ScanBase):
scan_type = ScanType.HARDWARE_TRIGGERED
scan_name = "xas_simple_scan"
gui_config = {
"Movement Parameters": ["start", "stop"],
"Scan Parameters": ["scan_time", "scan_duration", "primary_readout_cycle"],
}
def __init__(
self,
#fmt: off
start: Annotated[float, ScanArgument(display_name="Start Energy", description="Start energy.", units=Units.eV)],
stop: Annotated[float, ScanArgument(display_name="Stop Energy", description="Stop energy.", units=Units.eV)],
scan_time: Annotated[float, ScanArgument(display_name="Scan Time", description="Time for one scan cycle.", units=Units.s, ge=0)],
scan_duration: Annotated[float, ScanArgument(display_name="Scan Duration", description="Total scan duration.", units=Units.s, ge=0)],
motor: Annotated[DeviceBase | None, ScanArgument(display_name="Motor", description="Bragg motor device.")] = None,
daq: Annotated[DeviceBase | None, ScanArgument(display_name="DAQ", description="NIDAQ device.")] = None,
primary_readout_cycle: Annotated[float, ScanArgument(display_name="Primary Readout Cycle", description="Delay between monitored readouts.",units=Units.s, gt=0,)] = 1,
#fmt: on
**kwargs,
):
"""
Start a simple oscillating scan on the mono bragg motor.
Args:
start (float): Start energy.
stop (float): Stop energy.
scan_time (float): Time for one scan cycle.
scan_duration (float): Total scan duration.
motor (DeviceBase | None): Bragg motor device.
daq (DeviceBase | None): NIDAQ device.
primary_readout_cycle (float): Delay between monitored readouts.
Returns:
ScanReport
"""
super().__init__(**kwargs)
self.start = start
self.stop = stop
self.scan_time = scan_time
self.scan_duration = scan_duration
self.motor = motor if motor is not None else self.dev["mo1_bragg"]
self.daq = daq if daq is not None else self.dev["nidaq"]
self.primary_readout_cycle = primary_readout_cycle
self.positions = np.array([self.start, self.stop], dtype=float)
# We pass on the arguments as "additional_scan_parameters" in the scan info
self.update_scan_info(
positions=self.positions,
scan_time=scan_time,
scan_duration=scan_duration,
primary_readout_cycle=primary_readout_cycle,
)
self.actions.set_device_readout_priority([self.daq], priority="async")
@scan_hook
def prepare_scan(self):
"""
Prepare the scan. This can include any steps that need to be executed
before the scan is opened, such as preparing the positions (if not done already)
or setting up the devices.
"""
self.actions.add_scan_report_instruction_device_progress(self.motor)
self._baseline_readout_status = self.actions.read_baseline_devices(wait=False)
@scan_hook
def open_scan(self):
"""
Open the scan.
This step must call self.actions.open_scan() to ensure that a new scan is
opened. Make sure to prepare the scan metadata before, either in
prepare_scan() or in open_scan() itself and call self.update_scan_info(...)
to update the scan metadata if needed.
"""
self.actions.open_scan()
@scan_hook
def stage(self):
"""
Stage the devices for the upcoming scan. The stage logic is typically
implemented on the device itself (i.e. by the device's stage method).
However, if there are any additional steps that need to be executed before
staging the devices, they can be implemented here.
"""
self.actions.stage_all_devices()
@scan_hook
def pre_scan(self):
"""
Pre-scan steps to be executed before the main scan logic.
This is typically the last chance to prepare the devices before the core scan
logic is executed. For example, this is a good place to initialize time-criticial
devices, e.g. devices that have a short timeout.
The pre-scan logic is typically implemented on the device itself.
"""
self.actions.pre_scan_all_devices()
@scan_hook
def scan_core(self):
"""
Core scan logic to be executed during the scan.
This is where the main scan logic should be implemented.
"""
self.actions.kickoff(self.motor)
completion_status = self.actions.complete(self.motor, wait=False)
while not completion_status.done:
self.at_each_point()
@scan_hook
def at_each_point(self):
"""
Logic to be executed at each acquisition point during the scan.
"""
self.actions.read_monitored_devices()
time.sleep(self.primary_readout_cycle)
@scan_hook
def post_scan(self):
"""
Post-scan steps to be executed after the main scan logic.
"""
self.actions.complete_all_devices()
@scan_hook
def unstage(self):
"""Unstage the scan by executing post-scan steps."""
self.actions.unstage_all_devices()
@scan_hook
def close_scan(self):
"""Close the scan."""
if self._baseline_readout_status is not None:
self._baseline_readout_status.wait()
self.actions.close_scan()
self.actions.check_for_unchecked_statuses()
@scan_hook
def on_exception(self, exception: Exception):
"""
Handle exceptions that occur during the scan.
This is a good place to implement any cleanup logic that needs to be executed in case of an exception,
such as returning the devices to a safe state or moving the motors back to their starting position.
"""
self.actions.complete_all_devices(wait=False)
class XasSimpleScanWithXrd(XasSimpleScan):
scan_name = "xas_simple_scan_with_xrd"
gui_config = {
"Movement Parameters": ["start", "stop"],
"Scan Parameters": ["scan_time", "scan_duration", "primary_readout_cycle"],
"Low Energy Break": ["break_enable_low", "break_time_low", "cycle_low"],
"High Energy Break": ["break_enable_high", "break_time_high", "cycle_high"],
"XRD Triggers": ["exp_time", "n_of_trigger"],
}
def __init__(
self,
#fmt: off
start: Annotated[float, ScanArgument(display_name="Start Energy", description="Start energy.", units=Units.eV)],
stop: Annotated[float, ScanArgument(display_name="Stop Energy", description="Stop energy.", units=Units.eV)],
scan_time: Annotated[float, ScanArgument(display_name="Scan Time", description="Time for one scan cycle.", units=Units.s, ge=0)],
scan_duration: Annotated[float, ScanArgument(display_name="Scan Duration", description="Total scan duration.", units=Units.s, ge=0)],
break_enable_low: Annotated[bool, ScanArgument(display_name="Break Enable Low", description="Enable breaks for the low energy range.")],
break_time_low: Annotated[float, ScanArgument(display_name="Break Time Low", description="Break time for the low energy range.", units=Units.s, ge=0)],
cycle_low: Annotated[int, ScanArgument(display_name="Cycle Low", description="Use triggers every nth low-energy cycle.", ge=0)],
break_enable_high: Annotated[bool, ScanArgument(display_name="Break Enable High", description="Enable breaks for the high energy range.")],
break_time_high: Annotated[float, ScanArgument(display_name="Break Time High", description="Break time for the high energy range.", units=Units.s, ge=0)],
cycle_high: Annotated[int, ScanArgument(display_name="Cycle High", description="Use triggers every nth high-energy cycle.", ge=0)],
exp_time: Annotated[float, ScanArgument(display_name="Exposure Time", description="Length of one trigger period.", units=Units.s, ge=0)],
n_of_trigger: Annotated[int, ScanArgument(display_name="Number Of Trigger", description="Amount of triggers fired during a break.", ge=0)],
motor: Annotated[DeviceBase | None, ScanArgument(display_name="Motor", description="Bragg motor device.")] = None,
daq: Annotated[DeviceBase | None, ScanArgument(display_name="DAQ", description="NIDAQ device.")] = None,
primary_readout_cycle: Annotated[float, ScanArgument(display_name="Primary Readout Cycle", description="Delay between monitored readouts.", units=Units.s, gt=0)] = 1,
**kwargs,
#fmt: on
):
super().__init__(
start=start,
stop=stop,
scan_time=scan_time,
scan_duration=scan_duration,
motor=motor,
daq=daq,
primary_readout_cycle=primary_readout_cycle,
**kwargs,
)
# We pass on the arguments as "additional_scan_parameters" in the scan info
self.update_scan_info(
break_enable_low=break_enable_low,
break_time_low=break_time_low,
cycle_low=cycle_low,
break_enable_high=break_enable_high,
break_time_high=break_time_high,
cycle_high=cycle_high,
exp_time=exp_time,
n_of_trigger=n_of_trigger,
)
class XasAdvancedScan(XasSimpleScan):
scan_name = "xas_advanced_scan"
gui_config = {
"Movement Parameters": ["start", "stop"],
"Scan Parameters": ["scan_time", "scan_duration", "primary_readout_cycle"],
"Spline Parameters": ["p_kink", "e_kink"],
}
def __init__(
self,
#fmt: off
start: Annotated[float, ScanArgument(display_name="Start Energy", description="Start energy.", units=Units.eV)],
stop: Annotated[float, ScanArgument(display_name="Stop Energy", description="Stop energy.", units=Units.eV)],
scan_time: Annotated[float, ScanArgument(display_name="Scan Time", description="Time for one scan cycle.", units=Units.s, ge=0)],
scan_duration: Annotated[float, ScanArgument(display_name="Scan Duration", description="Total scan duration.", units=Units.s, ge=0)],
p_kink: Annotated[float, ScanArgument(display_name="P Kink", description="Position of the kink.", ge=0)],
e_kink: Annotated[float, ScanArgument(display_name="E Kink", description="Energy of the kink.", units=Units.eV)],
motor: Annotated[DeviceBase | None, ScanArgument(display_name="Motor", description="Bragg motor device.")] = None,
daq: Annotated[DeviceBase | None, ScanArgument(display_name="DAQ", description="NIDAQ device.")] = None,
primary_readout_cycle: Annotated[float, ScanArgument(display_name="Primary Readout Cycle", description="Delay between monitored readouts.", units=Units.s, gt=0)] = 1,
**kwargs,
#fmt: on
):
super().__init__(
start=start,
stop=stop,
scan_time=scan_time,
scan_duration=scan_duration,
motor=motor,
daq=daq,
primary_readout_cycle=primary_readout_cycle,
**kwargs,
)
# We pass on the arguments as "additional_scan_parameters" in the scan info
self.update_scan_info(p_kink=p_kink, e_kink=e_kink)
class XasAdvancedScanWithXrd(XasAdvancedScan):
scan_name = "xas_advanced_scan_with_xrd"
gui_config = {
"Movement Parameters": ["start", "stop"],
"Scan Parameters": ["scan_time", "scan_duration", "primary_readout_cycle"],
"Spline Parameters": ["p_kink", "e_kink"],
"Low Energy Break": ["break_enable_low", "break_time_low", "cycle_low"],
"High Energy Break": ["break_enable_high", "break_time_high", "cycle_high"],
"XRD Triggers": ["exp_time", "n_of_trigger"],
}
def __init__(
self,
#fmt: off
start: Annotated[float, ScanArgument(display_name="Start Energy", description="Start energy.", units=Units.eV)],
stop: Annotated[float, ScanArgument(display_name="Stop Energy", description="Stop energy.", units=Units.eV)],
scan_time: Annotated[float, ScanArgument(display_name="Scan Time", description="Time for one scan cycle.", units=Units.s, ge=0)],
scan_duration: Annotated[float, ScanArgument(display_name="Scan Duration", description="Total scan duration.", units=Units.s, ge=0)],
p_kink: Annotated[float, ScanArgument(display_name="P Kink", description="Position of the kink.", ge=0)],
e_kink: Annotated[float, ScanArgument(display_name="E Kink", description="Energy of the kink.", units=Units.eV)],
break_enable_low: Annotated[bool, ScanArgument(display_name="Break Enable Low", description="Enable breaks for the low energy range.")],
break_time_low: Annotated[float, ScanArgument(display_name="Break Time Low", description="Break time for the low energy range.", units=Units.s, ge=0)],
cycle_low: Annotated[int, ScanArgument(display_name="Cycle Low", description="Use triggers every nth low-energy cycle.", ge=0)],
break_enable_high: Annotated[bool, ScanArgument(display_name="Break Enable High", description="Enable breaks for the high energy range.")],
break_time_high: Annotated[float, ScanArgument(display_name="Break Time High", description="Break time for the high energy range.", units=Units.s, ge=0)],
cycle_high: Annotated[int, ScanArgument(display_name="Cycle High", description="Use triggers every nth high-energy cycle.", ge=0)],
exp_time: Annotated[float, ScanArgument(display_name="Exposure Time", description="Length of one trigger period.", units=Units.s, ge=0)],
n_of_trigger: Annotated[int, ScanArgument(display_name="Number Of Trigger", description="Amount of triggers fired during a break.", ge=0)],
motor: Annotated[DeviceBase | None, ScanArgument(display_name="Motor", description="Bragg motor device.")] = None,
daq: Annotated[DeviceBase | None, ScanArgument(display_name="DAQ", description="NIDAQ device.")] = None,
primary_readout_cycle: Annotated[float, ScanArgument(display_name="Primary Readout Cycle", description="Delay between monitored readouts.", units=Units.s, gt=0)] = 1,
**kwargs,
#fmt: on
):
super().__init__(
start=start,
stop=stop,
scan_time=scan_time,
scan_duration=scan_duration,
p_kink=p_kink,
e_kink=e_kink,
motor=motor,
daq=daq,
primary_readout_cycle=primary_readout_cycle,
**kwargs,
)
# We pass on the arguments as "additional_scan_parameters" in the scan info
self.update_scan_info(
break_enable_low=break_enable_low,
break_time_low=break_time_low,
cycle_low=cycle_low,
break_enable_high=break_enable_high,
break_time_high=break_time_high,
cycle_high=cycle_high,
exp_time=exp_time,
n_of_trigger=n_of_trigger,
)
@@ -0,0 +1,159 @@
# pylint: skip-file
from unittest import mock
import numpy as np
import pytest
from bec_server.scan_server.tests.scan_hook_tests import (
assert_close_scan_waits_for_baseline_and_closes,
assert_pre_scan_called,
assert_prepare_scan_reads_baseline_devices,
assert_scan_open_called,
assert_stage_all_devices_called,
assert_unstage_all_devices_called,
run_scan_tests,
)
XAS_SIMPLE_SCAN_DEFAULT_HOOK_TESTS = [
("prepare_scan", [assert_prepare_scan_reads_baseline_devices]),
("open_scan", [assert_scan_open_called]),
("stage", [assert_stage_all_devices_called]),
("pre_scan", [assert_pre_scan_called]),
("unstage", [assert_unstage_all_devices_called]),
("close_scan", [assert_close_scan_waits_for_baseline_and_closes]),
]
def _assemble_xas_simple_scan(v4_scan_assembler, **overrides):
params = {
"start": 8000.0,
"stop": 9000.0,
"scan_time": 1.0,
"scan_duration": 10.0,
"motor": "mo1_bragg",
"daq": "nidaq",
"primary_readout_cycle": 1.0,
}
params.update(overrides)
return v4_scan_assembler("xas_simple_scan", **params)
@pytest.mark.parametrize(("hook_name", "hook_tests"), XAS_SIMPLE_SCAN_DEFAULT_HOOK_TESTS)
def test_xas_simple_scan_v4_default_hooks(
v4_scan_assembler, nth_done_status_mock, hook_name, hook_tests
):
scan = _assemble_xas_simple_scan(v4_scan_assembler)
run_scan_tests(scan, [(hook_name, hook_tests)], nth_done_status_mock=nth_done_status_mock)
def test_xas_simple_scan_v4_prepare_scan_updates_metadata(v4_scan_assembler):
scan = _assemble_xas_simple_scan(v4_scan_assembler)
scan.actions.add_scan_report_instruction_device_progress = mock.MagicMock()
baseline_status = mock.MagicMock()
scan.actions.read_baseline_devices = mock.MagicMock(return_value=baseline_status)
scan.prepare_scan()
scan.actions._build_scan_status_message("open")
np.testing.assert_array_equal(scan.scan_info.positions, np.array([8000.0, 9000.0]))
assert scan.scan_info.additional_scan_parameters["scan_time"] == 1.0
assert scan.scan_info.additional_scan_parameters["scan_duration"] == 10.0
assert scan.scan_info.readout_priority_modification["async"] == ["nidaq"]
scan.actions.add_scan_report_instruction_device_progress.assert_called_once_with(scan.motor)
scan.actions.read_baseline_devices.assert_called_once_with(wait=False)
assert scan._baseline_readout_status is baseline_status
def test_xas_simple_scan_v4_scan_core_reads_until_complete(v4_scan_assembler, nth_done_status_mock):
scan = _assemble_xas_simple_scan(v4_scan_assembler)
completion_status = nth_done_status_mock(resolve_after=3)
scan.actions.kickoff = mock.MagicMock()
scan.actions.complete = mock.MagicMock(return_value=completion_status)
scan.actions.read_monitored_devices = mock.MagicMock()
with mock.patch("debye_bec.scans.xas_simple_scan.time.sleep"):
scan.scan_core()
scan.actions.kickoff.assert_called_once_with(scan.motor)
scan.actions.complete.assert_called_once_with(scan.motor, wait=False)
assert scan.actions.read_monitored_devices.call_count == 2
def test_xas_simple_scan_v4_post_scan_completes_all_devices(v4_scan_assembler):
scan = _assemble_xas_simple_scan(v4_scan_assembler)
scan.actions.complete_all_devices = mock.MagicMock()
scan.post_scan()
scan.actions.complete_all_devices.assert_called_once_with()
def test_xas_simple_scan_with_xrd_v4_updates_xrd_metadata(v4_scan_assembler):
scan = v4_scan_assembler(
"xas_simple_scan_with_xrd",
start=8000.0,
stop=9000.0,
scan_time=1.0,
scan_duration=10.0,
break_enable_low=True,
break_time_low=1.0,
cycle_low=2,
break_enable_high=False,
break_time_high=3.0,
cycle_high=4,
exp_time=0.5,
n_of_trigger=6,
motor="mo1_bragg",
daq="nidaq",
)
assert scan.scan_name == "xas_simple_scan_with_xrd"
assert scan.scan_info.additional_scan_parameters["break_enable_low"] is True
assert scan.scan_info.additional_scan_parameters["cycle_high"] == 4
assert scan.scan_info.additional_scan_parameters["n_of_trigger"] == 6
def test_xas_advanced_scan_v4_updates_spline_metadata(v4_scan_assembler):
scan = v4_scan_assembler(
"xas_advanced_scan",
start=8000.0,
stop=9000.0,
scan_time=1.0,
scan_duration=10.0,
p_kink=50.0,
e_kink=8500.0,
motor="mo1_bragg",
daq="nidaq",
)
assert scan.scan_name == "xas_advanced_scan"
assert scan.scan_info.additional_scan_parameters["p_kink"] == 50.0
assert scan.scan_info.additional_scan_parameters["e_kink"] == 8500.0
def test_xas_advanced_scan_with_xrd_v4_updates_all_metadata(v4_scan_assembler):
scan = v4_scan_assembler(
"xas_advanced_scan_with_xrd",
start=8000.0,
stop=9000.0,
scan_time=1.0,
scan_duration=10.0,
p_kink=55.0,
e_kink=8450.0,
break_enable_low=True,
break_time_low=1.5,
cycle_low=2,
break_enable_high=True,
break_time_high=2.5,
cycle_high=3,
exp_time=0.25,
n_of_trigger=8,
motor="mo1_bragg",
daq="nidaq",
)
assert scan.scan_name == "xas_advanced_scan_with_xrd"
assert scan.scan_info.additional_scan_parameters["p_kink"] == 55.0
assert scan.scan_info.additional_scan_parameters["break_enable_high"] is True
assert scan.scan_info.exp_time == 0.25