diff --git a/debye_bec/scans/__init__.py b/debye_bec/scans/__init__.py index 9a3e710..08e8f07 100644 --- a/debye_bec/scans/__init__.py +++ b/debye_bec/scans/__init__.py @@ -1,7 +1,7 @@ -from .mono_bragg_scans import ( - XASAdvancedScan, - XASAdvancedScanWithXRD, - XASSimpleScan, - XASSimpleScanWithXRD, -) from .nidaq_cont_scan import NIDAQContinuousScan +from .xas_simple_scan import ( + XasAdvancedScan, + XasAdvancedScanWithXrd, + XasSimpleScan, + XasSimpleScanWithXrd, +) diff --git a/debye_bec/scans/mono_bragg_scans.py b/debye_bec/scans/mono_bragg_scans.py deleted file mode 100644 index 03c234b..0000000 --- a/debye_bec/scans/mono_bragg_scans.py +++ /dev/null @@ -1,310 +0,0 @@ -"""This module contains the scan classes for the mono bragg motor of the Debye beamline.""" - -import time -from typing import Literal - -import numpy as np -from bec_lib.device import DeviceBase -from bec_lib.logger import bec_logger -from bec_server.scan_server.scans import AsyncFlyScanBase - -logger = bec_logger.logger - - -class XASSimpleScan(AsyncFlyScanBase): - """Class for the XAS simple scan""" - - scan_name = "xas_simple_scan" - scan_type = "fly" - scan_report_hint = "device_progress" - required_kwargs = [] - use_scan_progress_report = False - pre_move = False - gui_config = { - "Movement Parameters": ["start", "stop"], - "Scan Parameters": ["scan_time", "scan_duration"], - } - - def __init__( - self, - start: float, - stop: float, - scan_time: float, - scan_duration: float, - motor: DeviceBase = "mo1_bragg", - **kwargs, - ): - """The xas_simple_scan is used to start a simple oscillating scan on the mono bragg motor. - Start and Stop define the energy range for the scan, scan_time is the time for one scan - cycle and scan_duration is the duration of the scan. If scan duration is set to 0, the - scan will run infinitely. - - Args: - start (float): Start energy for the scan. - stop (float): Stop energy for the scan. - scan_time (float): Time for one scan cycle. - scan_duration (float): Duration of the scan. - motor (DeviceBase, optional): Motor device to be used for the scan. - Defaults to "mo1_bragg". - Examples: - >>> scans.xas_simple_scan(start=8000, stop=9000, scan_time=1, scan_duration=10) - """ - super().__init__(**kwargs) - self.motor = motor - self.start = start - self.stop = stop - self.scan_time = scan_time - self.scan_duration = scan_duration - self.primary_readout_cycle = 1 - - def update_readout_priority(self): - """Ensure that NIDAQ is not monitored for any quick EXAFS.""" - super().update_readout_priority() - self.readout_priority["async"].append("nidaq") - - def prepare_positions(self): - """Prepare the positions for the scan. - - Use here only start and end energy defining the range for the scan. - """ - self.positions = np.array([self.start, self.stop], dtype=float) - self.num_pos = None - yield None - - def pre_scan(self): - """Pre Scan action.""" - - self._check_limits() - # Ensure parent class pre_scan actions to be called. - yield from super().pre_scan() - - def scan_report_instructions(self): - """ - Return the instructions for the scan report. - """ - yield from self.stubs.scan_report_instruction({"device_progress": [self.motor]}) - - def scan_core(self): - """Run the scan core. - Kickoff the oscillation on the Bragg motor and wait for the completion of the motion. - """ - # Start the oscillation on the Bragg motor. - yield from self.stubs.kickoff(device=self.motor) - complete_status = yield from self.stubs.complete(device=self.motor, wait=False) - - while not complete_status.done: - # Readout monitored devices - yield from self.stubs.read(group="monitored", point_id=self.point_id) - time.sleep(self.primary_readout_cycle) - self.point_id += 1 - - self.num_pos = self.point_id - - -class XASSimpleScanWithXRD(XASSimpleScan): - """Class for the XAS simple scan with XRD""" - - scan_name = "xas_simple_scan_with_xrd" - gui_config = { - "Movement Parameters": ["start", "stop"], - "Scan Parameters": ["scan_time", "scan_duration"], - "Low Energy Break": ["break_enable_low", "break_time_low", "cycle_low"], - "High Energy Break": ["break_enable_high", "break_time_high", "cycle_high"], - "XRD Triggers": ["exp_time", "n_of_trigger"], - } - - def __init__( - self, - start: float, - stop: float, - scan_time: float, - scan_duration: float, - break_enable_low: bool, - break_time_low: float, - cycle_low: int, - break_enable_high: bool, - break_time_high: float, - cycle_high: float, - exp_time: float, - n_of_trigger: int, - motor: DeviceBase = "mo1_bragg", - **kwargs, - ): - """The xas_simple_scan_with_xrd is an oscillation motion on the mono motor - with XRD triggering at low and high energy ranges. - If scan duration is set to 0, the scan will run infinitely. - - Args: - start (float): Start energy for the scan. - stop (float): Stop energy for the scan. - scan_time (float): Time for one oscillation . - scan_duration (float): Total duration of the scan. - break_enable_low (bool): Enable breaks for the low energy range. - break_time_low (float): Break time for the low energy range. - cycle_low (int): Specify how often the triggers should be considered, - every nth cycle for low - break_enable_high (bool): Enable breaks for the high energy range. - break_time_high (float): Break time for the high energy range. - cycle_high (int): Specify how often the triggers should be considered, - every nth cycle for high - exp_time (float): Length of 1 trigger period in seconds - n_of_trigger (int): Amount of triggers to be fired during break - motor (DeviceBase, optional): Motor device to be used for the scan. - Defaults to "mo1_bragg". - - Examples: - >>> scans.xas_simple_scan_with_xrd(start=8000, stop=9000, scan_time=1, scan_duration=10, xrd_enable_low=True, num_trigger_low=5, cycle_low=2, exp_time_low=100, xrd_enable_high=False, num_trigger_high=3, cycle_high=1, exp_time_high=1000) - """ - super().__init__( - start=start, - stop=stop, - scan_time=scan_time, - scan_duration=scan_duration, - motor=motor, - **kwargs, - ) - self.break_enable_low = break_enable_low - self.break_time_low = break_time_low - self.cycle_low = cycle_low - self.break_enable_high = break_enable_high - self.break_time_high = break_time_high - self.cycle_high = cycle_high - self.exp_time = exp_time - self.n_of_trigger = n_of_trigger - - -class XASAdvancedScan(XASSimpleScan): - """Class for the XAS advanced scan""" - - scan_name = "xas_advanced_scan" - gui_config = { - "Movement Parameters": ["start", "stop"], - "Scan Parameters": ["scan_time", "scan_duration"], - "Spline Parameters": ["p_kink", "e_kink"], - } - - def __init__( - self, - start: float, - stop: float, - scan_time: float, - scan_duration: float, - p_kink: float, - e_kink: float, - motor: DeviceBase = "mo1_bragg", - **kwargs, - ): - """The xas_advanced_scan is an oscillation motion on the mono motor. - Start and Stop define the energy range for the scan, scan_time is the - time for one scan cycle and scan_duration is the duration of the scan. - If scan duration is set to 0, the scan will run infinitely. - p_kink and e_kink add a kink to the motion profile to slow down in the - exafs region of the scan. - - Args: - start (float): Start angle for the scan. - stop (float): Stop angle for the scan. - scan_time (float): Time for one oscillation . - scan_duration (float): Total duration of the scan. - p_kink (float): Position of the kink. - e_kink (float): Energy of the kink. - motor (DeviceBase, optional): Motor device to be used for the scan. - Defaults to "mo1_bragg". - - Examples: - >>> scans.xas_advanced_scan(start=10000, stop=12000, scan_time=0.5, scan_duration=10, p_kink=50, e_kink=10500) - """ - super().__init__( - start=start, - stop=stop, - scan_time=scan_time, - scan_duration=scan_duration, - motor=motor, - **kwargs, - ) - self.p_kink = p_kink - self.e_kink = e_kink - - -class XASAdvancedScanWithXRD(XASAdvancedScan): - """Class for the XAS advanced scan with XRD""" - - scan_name = "xas_advanced_scan_with_xrd" - gui_config = { - "Movement Parameters": ["start", "stop"], - "Scan Parameters": ["scan_time", "scan_duration"], - "Spline Parameters": ["p_kink", "e_kink"], - "Low Energy Break": ["break_enable_low", "break_time_low", "cycle_low"], - "High Energy Break": ["break_enable_high", "break_time_high", "cycle_high"], - "XRD Triggers": ["exp_time", "n_of_trigger"], - } - - def __init__( - self, - start: float, - stop: float, - scan_time: float, - scan_duration: float, - p_kink: float, - e_kink: float, - break_enable_low: bool, - break_time_low: float, - cycle_low: int, - break_enable_high: bool, - break_time_high: float, - cycle_high: float, - exp_time: float, - n_of_trigger: int, - motor: DeviceBase = "mo1_bragg", - **kwargs, - ): - """The xas_advanced_scan is an oscillation motion on the mono motor - with XRD triggering at low and high energy ranges. - Start and Stop define the energy range for the scan, scan_time is the time for - one scan cycle and scan_duration is the duration of the scan. If scan duration - is set to 0, the scan will run infinitely. p_kink and e_kink add a kink to the - motion profile to slow down in the exafs region of the scan. - - Args: - start (float): Start angle for the scan. - stop (float): Stop angle for the scan. - scan_time (float): Time for one oscillation . - scan_duration (float): Total duration of the scan. - p_kink (float): Position of kink. - e_kink (float): Energy of the kink. - break_enable_low (bool): Enable breaks for the low energy range. - break_time_low (float): Break time for the low energy range. - cycle_low (int): Specify how often the triggers should be considered, - every nth cycle for low - break_enable_high (bool): Enable breaks for the high energy range. - break_time_high (float): Break time for the high energy range. - cycle_high (int): Specify how often the triggers should be considered, - every nth cycle for high - exp_time (float): Length of 1 trigger period in seconds - n_of_trigger (int): Amount of triggers to be fired during break - motor (DeviceBase, optional): Motor device to be used for the scan. - Defaults to "mo1_bragg". - - Examples: - >>> scans.xas_advanced_scan_with_xrd(start=10000, stop=12000, scan_time=0.5, scan_duration=10, p_kink=50, e_kink=10500, xrd_enable_low=True, num_trigger_low=5, cycle_low=2, exp_time_low=100, xrd_enable_high=False, num_trigger_high=3, cycle_high=1, exp_time_high=1000) - """ - super().__init__( - start=start, - stop=stop, - scan_time=scan_time, - scan_duration=scan_duration, - p_kink=p_kink, - e_kink=e_kink, - motor=motor, - **kwargs, - ) - self.p_kink = p_kink - self.e_kink = e_kink - self.break_enable_low = break_enable_low - self.break_time_low = break_time_low - self.cycle_low = cycle_low - self.break_enable_high = break_enable_high - self.break_time_high = break_time_high - self.cycle_high = cycle_high - self.exp_time = exp_time - self.n_of_trigger = n_of_trigger diff --git a/debye_bec/scans/xas_simple_scan.py b/debye_bec/scans/xas_simple_scan.py new file mode 100644 index 0000000..581afd2 --- /dev/null +++ b/debye_bec/scans/xas_simple_scan.py @@ -0,0 +1,326 @@ +""" +V4 implementation of the Debye XAS simple scan. + +Scan procedure: + - prepare_scan + - open_scan + - stage + - pre_scan + - scan_core + - at_each_point (optionally called by scan_core) + - post_scan + - unstage + - close_scan + - on_exception (called if any exception is raised during the scan) +""" + +from __future__ import annotations + +import time +from typing import Annotated + +import numpy as np +from bec_lib.device import DeviceBase +from bec_lib.scan_args import ScanArgument, Units +from bec_server.scan_server.scans.scan_base import ScanBase, ScanType +from bec_server.scan_server.scans.scan_modifier import scan_hook + + +class XasSimpleScan(ScanBase): + scan_type = ScanType.HARDWARE_TRIGGERED + scan_name = "xas_simple_scan" + + gui_config = { + "Movement Parameters": ["start", "stop"], + "Scan Parameters": ["scan_time", "scan_duration", "primary_readout_cycle"], + } + + def __init__( + self, + #fmt: off + start: Annotated[float, ScanArgument(display_name="Start Energy", description="Start energy.", units=Units.eV)], + stop: Annotated[float, ScanArgument(display_name="Stop Energy", description="Stop energy.", units=Units.eV)], + scan_time: Annotated[float, ScanArgument(display_name="Scan Time", description="Time for one scan cycle.", units=Units.s, ge=0)], + scan_duration: Annotated[float, ScanArgument(display_name="Scan Duration", description="Total scan duration.", units=Units.s, ge=0)], + motor: Annotated[DeviceBase | None, ScanArgument(display_name="Motor", description="Bragg motor device.")] = None, + daq: Annotated[DeviceBase | None, ScanArgument(display_name="DAQ", description="NIDAQ device.")] = None, + primary_readout_cycle: Annotated[float, ScanArgument(display_name="Primary Readout Cycle", description="Delay between monitored readouts.",units=Units.s, gt=0,)] = 1, + #fmt: on + **kwargs, + ): + """ + Start a simple oscillating scan on the mono bragg motor. + + Args: + start (float): Start energy. + stop (float): Stop energy. + scan_time (float): Time for one scan cycle. + scan_duration (float): Total scan duration. + motor (DeviceBase | None): Bragg motor device. + daq (DeviceBase | None): NIDAQ device. + primary_readout_cycle (float): Delay between monitored readouts. + + Returns: + ScanReport + """ + super().__init__(**kwargs) + self.start = start + self.stop = stop + self.scan_time = scan_time + self.scan_duration = scan_duration + self.motor = motor if motor is not None else self.dev["mo1_bragg"] + self.daq = daq if daq is not None else self.dev["nidaq"] + self.primary_readout_cycle = primary_readout_cycle + self.positions = np.array([self.start, self.stop], dtype=float) + + # We pass on the arguments as "additional_scan_parameters" in the scan info + self.update_scan_info( + positions=self.positions, + scan_time=scan_time, + scan_duration=scan_duration, + primary_readout_cycle=primary_readout_cycle, + ) + self.actions.set_device_readout_priority([self.daq], priority="async") + + @scan_hook + def prepare_scan(self): + """ + Prepare the scan. This can include any steps that need to be executed + before the scan is opened, such as preparing the positions (if not done already) + or setting up the devices. + """ + self.actions.add_scan_report_instruction_device_progress(self.motor) + self._baseline_readout_status = self.actions.read_baseline_devices(wait=False) + + @scan_hook + def open_scan(self): + """ + Open the scan. + This step must call self.actions.open_scan() to ensure that a new scan is + opened. Make sure to prepare the scan metadata before, either in + prepare_scan() or in open_scan() itself and call self.update_scan_info(...) + to update the scan metadata if needed. + """ + self.actions.open_scan() + + @scan_hook + def stage(self): + """ + Stage the devices for the upcoming scan. The stage logic is typically + implemented on the device itself (i.e. by the device's stage method). + However, if there are any additional steps that need to be executed before + staging the devices, they can be implemented here. + """ + self.actions.stage_all_devices() + + @scan_hook + def pre_scan(self): + """ + Pre-scan steps to be executed before the main scan logic. + This is typically the last chance to prepare the devices before the core scan + logic is executed. For example, this is a good place to initialize time-criticial + devices, e.g. devices that have a short timeout. + The pre-scan logic is typically implemented on the device itself. + """ + self.actions.pre_scan_all_devices() + + @scan_hook + def scan_core(self): + """ + Core scan logic to be executed during the scan. + This is where the main scan logic should be implemented. + """ + self.actions.kickoff(self.motor) + completion_status = self.actions.complete(self.motor, wait=False) + + while not completion_status.done: + self.at_each_point() + + @scan_hook + def at_each_point(self): + """ + Logic to be executed at each acquisition point during the scan. + """ + self.actions.read_monitored_devices() + time.sleep(self.primary_readout_cycle) + + @scan_hook + def post_scan(self): + """ + Post-scan steps to be executed after the main scan logic. + """ + self.actions.complete_all_devices() + + @scan_hook + def unstage(self): + """Unstage the scan by executing post-scan steps.""" + self.actions.unstage_all_devices() + + @scan_hook + def close_scan(self): + """Close the scan.""" + if self._baseline_readout_status is not None: + self._baseline_readout_status.wait() + self.actions.close_scan() + self.actions.check_for_unchecked_statuses() + + @scan_hook + def on_exception(self, exception: Exception): + """ + Handle exceptions that occur during the scan. + This is a good place to implement any cleanup logic that needs to be executed in case of an exception, + such as returning the devices to a safe state or moving the motors back to their starting position. + """ + self.actions.complete_all_devices(wait=False) + + +class XasSimpleScanWithXrd(XasSimpleScan): + scan_name = "xas_simple_scan_with_xrd" + gui_config = { + "Movement Parameters": ["start", "stop"], + "Scan Parameters": ["scan_time", "scan_duration", "primary_readout_cycle"], + "Low Energy Break": ["break_enable_low", "break_time_low", "cycle_low"], + "High Energy Break": ["break_enable_high", "break_time_high", "cycle_high"], + "XRD Triggers": ["exp_time", "n_of_trigger"], + } + + def __init__( + self, + #fmt: off + start: Annotated[float, ScanArgument(display_name="Start Energy", description="Start energy.", units=Units.eV)], + stop: Annotated[float, ScanArgument(display_name="Stop Energy", description="Stop energy.", units=Units.eV)], + scan_time: Annotated[float, ScanArgument(display_name="Scan Time", description="Time for one scan cycle.", units=Units.s, ge=0)], + scan_duration: Annotated[float, ScanArgument(display_name="Scan Duration", description="Total scan duration.", units=Units.s, ge=0)], + break_enable_low: Annotated[bool, ScanArgument(display_name="Break Enable Low", description="Enable breaks for the low energy range.")], + break_time_low: Annotated[float, ScanArgument(display_name="Break Time Low", description="Break time for the low energy range.", units=Units.s, ge=0)], + cycle_low: Annotated[int, ScanArgument(display_name="Cycle Low", description="Use triggers every nth low-energy cycle.", ge=0)], + break_enable_high: Annotated[bool, ScanArgument(display_name="Break Enable High", description="Enable breaks for the high energy range.")], + break_time_high: Annotated[float, ScanArgument(display_name="Break Time High", description="Break time for the high energy range.", units=Units.s, ge=0)], + cycle_high: Annotated[int, ScanArgument(display_name="Cycle High", description="Use triggers every nth high-energy cycle.", ge=0)], + exp_time: Annotated[float, ScanArgument(display_name="Exposure Time", description="Length of one trigger period.", units=Units.s, ge=0)], + n_of_trigger: Annotated[int, ScanArgument(display_name="Number Of Trigger", description="Amount of triggers fired during a break.", ge=0)], + motor: Annotated[DeviceBase | None, ScanArgument(display_name="Motor", description="Bragg motor device.")] = None, + daq: Annotated[DeviceBase | None, ScanArgument(display_name="DAQ", description="NIDAQ device.")] = None, + primary_readout_cycle: Annotated[float, ScanArgument(display_name="Primary Readout Cycle", description="Delay between monitored readouts.", units=Units.s, gt=0)] = 1, + **kwargs, + #fmt: on + ): + super().__init__( + start=start, + stop=stop, + scan_time=scan_time, + scan_duration=scan_duration, + motor=motor, + daq=daq, + primary_readout_cycle=primary_readout_cycle, + **kwargs, + ) + + # We pass on the arguments as "additional_scan_parameters" in the scan info + self.update_scan_info( + break_enable_low=break_enable_low, + break_time_low=break_time_low, + cycle_low=cycle_low, + break_enable_high=break_enable_high, + break_time_high=break_time_high, + cycle_high=cycle_high, + exp_time=exp_time, + n_of_trigger=n_of_trigger, + ) + + +class XasAdvancedScan(XasSimpleScan): + scan_name = "xas_advanced_scan" + gui_config = { + "Movement Parameters": ["start", "stop"], + "Scan Parameters": ["scan_time", "scan_duration", "primary_readout_cycle"], + "Spline Parameters": ["p_kink", "e_kink"], + } + + def __init__( + self, + #fmt: off + start: Annotated[float, ScanArgument(display_name="Start Energy", description="Start energy.", units=Units.eV)], + stop: Annotated[float, ScanArgument(display_name="Stop Energy", description="Stop energy.", units=Units.eV)], + scan_time: Annotated[float, ScanArgument(display_name="Scan Time", description="Time for one scan cycle.", units=Units.s, ge=0)], + scan_duration: Annotated[float, ScanArgument(display_name="Scan Duration", description="Total scan duration.", units=Units.s, ge=0)], + p_kink: Annotated[float, ScanArgument(display_name="P Kink", description="Position of the kink.", ge=0)], + e_kink: Annotated[float, ScanArgument(display_name="E Kink", description="Energy of the kink.", units=Units.eV)], + motor: Annotated[DeviceBase | None, ScanArgument(display_name="Motor", description="Bragg motor device.")] = None, + daq: Annotated[DeviceBase | None, ScanArgument(display_name="DAQ", description="NIDAQ device.")] = None, + primary_readout_cycle: Annotated[float, ScanArgument(display_name="Primary Readout Cycle", description="Delay between monitored readouts.", units=Units.s, gt=0)] = 1, + **kwargs, + #fmt: on + ): + super().__init__( + start=start, + stop=stop, + scan_time=scan_time, + scan_duration=scan_duration, + motor=motor, + daq=daq, + primary_readout_cycle=primary_readout_cycle, + **kwargs, + ) + # We pass on the arguments as "additional_scan_parameters" in the scan info + self.update_scan_info(p_kink=p_kink, e_kink=e_kink) + + +class XasAdvancedScanWithXrd(XasAdvancedScan): + scan_name = "xas_advanced_scan_with_xrd" + gui_config = { + "Movement Parameters": ["start", "stop"], + "Scan Parameters": ["scan_time", "scan_duration", "primary_readout_cycle"], + "Spline Parameters": ["p_kink", "e_kink"], + "Low Energy Break": ["break_enable_low", "break_time_low", "cycle_low"], + "High Energy Break": ["break_enable_high", "break_time_high", "cycle_high"], + "XRD Triggers": ["exp_time", "n_of_trigger"], + } + + def __init__( + self, + #fmt: off + start: Annotated[float, ScanArgument(display_name="Start Energy", description="Start energy.", units=Units.eV)], + stop: Annotated[float, ScanArgument(display_name="Stop Energy", description="Stop energy.", units=Units.eV)], + scan_time: Annotated[float, ScanArgument(display_name="Scan Time", description="Time for one scan cycle.", units=Units.s, ge=0)], + scan_duration: Annotated[float, ScanArgument(display_name="Scan Duration", description="Total scan duration.", units=Units.s, ge=0)], + p_kink: Annotated[float, ScanArgument(display_name="P Kink", description="Position of the kink.", ge=0)], + e_kink: Annotated[float, ScanArgument(display_name="E Kink", description="Energy of the kink.", units=Units.eV)], + break_enable_low: Annotated[bool, ScanArgument(display_name="Break Enable Low", description="Enable breaks for the low energy range.")], + break_time_low: Annotated[float, ScanArgument(display_name="Break Time Low", description="Break time for the low energy range.", units=Units.s, ge=0)], + cycle_low: Annotated[int, ScanArgument(display_name="Cycle Low", description="Use triggers every nth low-energy cycle.", ge=0)], + break_enable_high: Annotated[bool, ScanArgument(display_name="Break Enable High", description="Enable breaks for the high energy range.")], + break_time_high: Annotated[float, ScanArgument(display_name="Break Time High", description="Break time for the high energy range.", units=Units.s, ge=0)], + cycle_high: Annotated[int, ScanArgument(display_name="Cycle High", description="Use triggers every nth high-energy cycle.", ge=0)], + exp_time: Annotated[float, ScanArgument(display_name="Exposure Time", description="Length of one trigger period.", units=Units.s, ge=0)], + n_of_trigger: Annotated[int, ScanArgument(display_name="Number Of Trigger", description="Amount of triggers fired during a break.", ge=0)], + motor: Annotated[DeviceBase | None, ScanArgument(display_name="Motor", description="Bragg motor device.")] = None, + daq: Annotated[DeviceBase | None, ScanArgument(display_name="DAQ", description="NIDAQ device.")] = None, + primary_readout_cycle: Annotated[float, ScanArgument(display_name="Primary Readout Cycle", description="Delay between monitored readouts.", units=Units.s, gt=0)] = 1, + **kwargs, + #fmt: on + ): + super().__init__( + start=start, + stop=stop, + scan_time=scan_time, + scan_duration=scan_duration, + p_kink=p_kink, + e_kink=e_kink, + motor=motor, + daq=daq, + primary_readout_cycle=primary_readout_cycle, + **kwargs, + ) + + # We pass on the arguments as "additional_scan_parameters" in the scan info + self.update_scan_info( + break_enable_low=break_enable_low, + break_time_low=break_time_low, + cycle_low=cycle_low, + break_enable_high=break_enable_high, + break_time_high=break_time_high, + cycle_high=cycle_high, + exp_time=exp_time, + n_of_trigger=n_of_trigger, + ) diff --git a/tests/tests_scans/test_mono_bragg_scans_v4.py b/tests/tests_scans/test_mono_bragg_scans_v4.py new file mode 100644 index 0000000..bf791a1 --- /dev/null +++ b/tests/tests_scans/test_mono_bragg_scans_v4.py @@ -0,0 +1,159 @@ +# pylint: skip-file +from unittest import mock + +import numpy as np +import pytest +from bec_server.scan_server.tests.scan_hook_tests import ( + assert_close_scan_waits_for_baseline_and_closes, + assert_pre_scan_called, + assert_prepare_scan_reads_baseline_devices, + assert_scan_open_called, + assert_stage_all_devices_called, + assert_unstage_all_devices_called, + run_scan_tests, +) + +XAS_SIMPLE_SCAN_DEFAULT_HOOK_TESTS = [ + ("prepare_scan", [assert_prepare_scan_reads_baseline_devices]), + ("open_scan", [assert_scan_open_called]), + ("stage", [assert_stage_all_devices_called]), + ("pre_scan", [assert_pre_scan_called]), + ("unstage", [assert_unstage_all_devices_called]), + ("close_scan", [assert_close_scan_waits_for_baseline_and_closes]), +] + + +def _assemble_xas_simple_scan(v4_scan_assembler, **overrides): + params = { + "start": 8000.0, + "stop": 9000.0, + "scan_time": 1.0, + "scan_duration": 10.0, + "motor": "mo1_bragg", + "daq": "nidaq", + "primary_readout_cycle": 1.0, + } + params.update(overrides) + return v4_scan_assembler("xas_simple_scan", **params) + + +@pytest.mark.parametrize(("hook_name", "hook_tests"), XAS_SIMPLE_SCAN_DEFAULT_HOOK_TESTS) +def test_xas_simple_scan_v4_default_hooks( + v4_scan_assembler, nth_done_status_mock, hook_name, hook_tests +): + scan = _assemble_xas_simple_scan(v4_scan_assembler) + + run_scan_tests(scan, [(hook_name, hook_tests)], nth_done_status_mock=nth_done_status_mock) + + +def test_xas_simple_scan_v4_prepare_scan_updates_metadata(v4_scan_assembler): + scan = _assemble_xas_simple_scan(v4_scan_assembler) + scan.actions.add_scan_report_instruction_device_progress = mock.MagicMock() + baseline_status = mock.MagicMock() + scan.actions.read_baseline_devices = mock.MagicMock(return_value=baseline_status) + + scan.prepare_scan() + + scan.actions._build_scan_status_message("open") + + np.testing.assert_array_equal(scan.scan_info.positions, np.array([8000.0, 9000.0])) + assert scan.scan_info.additional_scan_parameters["scan_time"] == 1.0 + assert scan.scan_info.additional_scan_parameters["scan_duration"] == 10.0 + assert scan.scan_info.readout_priority_modification["async"] == ["nidaq"] + scan.actions.add_scan_report_instruction_device_progress.assert_called_once_with(scan.motor) + scan.actions.read_baseline_devices.assert_called_once_with(wait=False) + assert scan._baseline_readout_status is baseline_status + + +def test_xas_simple_scan_v4_scan_core_reads_until_complete(v4_scan_assembler, nth_done_status_mock): + scan = _assemble_xas_simple_scan(v4_scan_assembler) + completion_status = nth_done_status_mock(resolve_after=3) + scan.actions.kickoff = mock.MagicMock() + scan.actions.complete = mock.MagicMock(return_value=completion_status) + scan.actions.read_monitored_devices = mock.MagicMock() + + with mock.patch("debye_bec.scans.xas_simple_scan.time.sleep"): + scan.scan_core() + + scan.actions.kickoff.assert_called_once_with(scan.motor) + scan.actions.complete.assert_called_once_with(scan.motor, wait=False) + assert scan.actions.read_monitored_devices.call_count == 2 + + +def test_xas_simple_scan_v4_post_scan_completes_all_devices(v4_scan_assembler): + scan = _assemble_xas_simple_scan(v4_scan_assembler) + scan.actions.complete_all_devices = mock.MagicMock() + + scan.post_scan() + + scan.actions.complete_all_devices.assert_called_once_with() + + +def test_xas_simple_scan_with_xrd_v4_updates_xrd_metadata(v4_scan_assembler): + scan = v4_scan_assembler( + "xas_simple_scan_with_xrd", + start=8000.0, + stop=9000.0, + scan_time=1.0, + scan_duration=10.0, + break_enable_low=True, + break_time_low=1.0, + cycle_low=2, + break_enable_high=False, + break_time_high=3.0, + cycle_high=4, + exp_time=0.5, + n_of_trigger=6, + motor="mo1_bragg", + daq="nidaq", + ) + + assert scan.scan_name == "xas_simple_scan_with_xrd" + assert scan.scan_info.additional_scan_parameters["break_enable_low"] is True + assert scan.scan_info.additional_scan_parameters["cycle_high"] == 4 + assert scan.scan_info.additional_scan_parameters["n_of_trigger"] == 6 + + +def test_xas_advanced_scan_v4_updates_spline_metadata(v4_scan_assembler): + scan = v4_scan_assembler( + "xas_advanced_scan", + start=8000.0, + stop=9000.0, + scan_time=1.0, + scan_duration=10.0, + p_kink=50.0, + e_kink=8500.0, + motor="mo1_bragg", + daq="nidaq", + ) + + assert scan.scan_name == "xas_advanced_scan" + assert scan.scan_info.additional_scan_parameters["p_kink"] == 50.0 + assert scan.scan_info.additional_scan_parameters["e_kink"] == 8500.0 + + +def test_xas_advanced_scan_with_xrd_v4_updates_all_metadata(v4_scan_assembler): + scan = v4_scan_assembler( + "xas_advanced_scan_with_xrd", + start=8000.0, + stop=9000.0, + scan_time=1.0, + scan_duration=10.0, + p_kink=55.0, + e_kink=8450.0, + break_enable_low=True, + break_time_low=1.5, + cycle_low=2, + break_enable_high=True, + break_time_high=2.5, + cycle_high=3, + exp_time=0.25, + n_of_trigger=8, + motor="mo1_bragg", + daq="nidaq", + ) + + assert scan.scan_name == "xas_advanced_scan_with_xrd" + assert scan.scan_info.additional_scan_parameters["p_kink"] == 55.0 + assert scan.scan_info.additional_scan_parameters["break_enable_high"] is True + assert scan.scan_info.exp_time == 0.25