From f8ee82e5b3615235787f7efb8745c70402e61045 Mon Sep 17 00:00:00 2001 From: wakonig_k Date: Sun, 10 May 2026 14:43:27 +0200 Subject: [PATCH] feat: add NIDAQ continuous scan v4 implementation and update related tests --- debye_bec/scans/__init__.py | 1 + debye_bec/scans/nidaq_continuous_scan.py | 174 +++++++ tests/tests_scans/conftest.py | 8 +- tests/tests_scans/test_mono_bragg_scans.py | 429 ------------------ tests/tests_scans/test_mono_bragg_scans_v4.py | 11 +- .../tests_scans/test_nidaq_continous_scan.py | 183 +++----- 6 files changed, 247 insertions(+), 559 deletions(-) create mode 100644 debye_bec/scans/nidaq_continuous_scan.py delete mode 100644 tests/tests_scans/test_mono_bragg_scans.py diff --git a/debye_bec/scans/__init__.py b/debye_bec/scans/__init__.py index 08e8f07..1339603 100644 --- a/debye_bec/scans/__init__.py +++ b/debye_bec/scans/__init__.py @@ -1,4 +1,5 @@ from .nidaq_cont_scan import NIDAQContinuousScan +from .nidaq_continuous_scan import NidaqContinuousScan from .xas_simple_scan import ( XasAdvancedScan, XasAdvancedScanWithXrd, diff --git a/debye_bec/scans/nidaq_continuous_scan.py b/debye_bec/scans/nidaq_continuous_scan.py new file mode 100644 index 0000000..ca67744 --- /dev/null +++ b/debye_bec/scans/nidaq_continuous_scan.py @@ -0,0 +1,174 @@ +""" +The NIDAQ continuous scan is used to measure with the NIDAQ without moving the monochromator or any other motor. + +Scan procedure: + - prepare_scan + - open_scan + - stage + - pre_scan + - scan_core + - at_each_point (optionally called by scan_core) + - post_scan + - unstage + - close_scan + - on_exception (called if any exception is raised during the scan) +""" + +from __future__ import annotations + +import time +from typing import Annotated + +from bec_lib.device import DeviceBase +from bec_lib.scan_args import ScanArgument, Units +from bec_server.scan_server.scans.scan_base import ScanBase, ScanType +from bec_server.scan_server.scans.scan_modifier import scan_hook + + +class NidaqContinuousScan(ScanBase): + # Scan Type: Hardware triggered or software triggered? + # If the main trigger and readout logic is done within the at_each_point method in scan_core, choose SOFTWARE_TRIGGERED. + # If the main trigger and readout logic is implemented on a device that is simply kicked off in this scan, choose HARDWARE_TRIGGERED. + # This primarily serves as information for devices: The device may need to react differently if a software trigger is expected + # for every point. + scan_type = ScanType.HARDWARE_TRIGGERED + + # Scan name: This is the name of the scan, e.g. "line_scan". This is used for display purposes and to identify the scan type in user interfaces. + # Choose a descriptive name that does not conflict with existing scan names. + # It must be a valid Python identifier, that is, it can only contain letters, numbers, and underscores, and must not start with a number. + scan_name = "nidaq_continuous_scan" + + gui_config = {"Scan Parameters": ["scan_duration", "daq", "compression"]} + + def __init__( + self, + #fmt: off + scan_duration: Annotated[float, ScanArgument(display_name="Scan Duration", description="Duration of the scan", units=Units.s)], + daq: Annotated[DeviceBase | None, ScanArgument(display_name="Daq", description="DAQ device to be used for the scan")], + compression: Annotated[bool, ScanArgument(display_name="Compression", description="Whether to compress the data")], + #fmt: on + **kwargs, + ): + """ + The NIDAQ continuous scan is used to measure with the NIDAQ without moving the + monochromator or any other motor. The NIDAQ thus runs in continuous mode, with a + set scan_duration. + + Args: + scan_duration (float): Duration of the scan + daq (DeviceBase): DAQ device to be used for the scan + compression (bool): Whether to compress the data + + Returns: + ScanReport + """ + super().__init__(**kwargs) + self._baseline_readout_status = None + self.scan_duration = scan_duration + self.daq = daq or self.dev["nidaq"] + self.compression = compression + self.primary_readout_cycle = 1.0 # seconds + self.motors = [self.daq] + + self.update_scan_info(scan_duration=scan_duration, compression=compression) + self.actions.set_device_readout_priority([self.daq], priority="async") + + @scan_hook + def prepare_scan(self): + """ + Prepare the scan. This can include any steps that need to be executed + before the scan is opened, such as preparing the positions (if not done already) + or setting up the devices. + """ + + self.actions.add_scan_report_instruction_device_progress(self.daq) + self._baseline_readout_status = self.actions.read_baseline_devices(wait=False) + + @scan_hook + def open_scan(self): + """ + Open the scan. + This step must call self.actions.open_scan() to ensure that a new scan is + opened. Make sure to prepare the scan metadata before, either in + prepare_scan() or in open_scan() itself and call self.update_scan_info(...) + to update the scan metadata if needed. + """ + self.actions.open_scan() + + @scan_hook + def stage(self): + """ + Stage the devices for the upcoming scan. The stage logic is typically + implemented on the device itself (i.e. by the device's stage method). + However, if there are any additional steps that need to be executed before + staging the devices, they can be implemented here. + """ + self.actions.stage_all_devices() + + @scan_hook + def pre_scan(self): + """ + Pre-scan steps to be executed before the main scan logic. + This is typically the last chance to prepare the devices before the core scan + logic is executed. For example, this is a good place to initialize time-criticial + devices, e.g. devices that have a short timeout. + The pre-scan logic is typically implemented on the device itself. + """ + self.actions.pre_scan_all_devices() + + @scan_hook + def scan_core(self): + """ + Core scan logic to be executed during the scan. + This is where the main scan logic should be implemented. + """ + + kickoff_status = self.actions.kickoff(device=self.daq, wait=False) + kickoff_status.wait(timeout=5) # wait for proper kickoff of device + + complete_status = self.actions.complete(device=self.daq, wait=False) + + while not complete_status.done: + self.at_each_point() + time.sleep(self.primary_readout_cycle) + + @scan_hook + def at_each_point(self): + """ + Logic to be executed at each acquisition point during the scan. + """ + self.actions.read_monitored_devices() + + @scan_hook + def post_scan(self): + """ + Post-scan steps to be executed after the main scan logic. + """ + self.actions.complete_all_devices() + + @scan_hook + def unstage(self): + """Unstage the scan by executing post-scan steps.""" + self.actions.unstage_all_devices() + + @scan_hook + def close_scan(self): + """Close the scan.""" + if self._baseline_readout_status is not None: + self._baseline_readout_status.wait() + self.actions.close_scan() + self.actions.check_for_unchecked_statuses() + + @scan_hook + def on_exception(self, exception: Exception): + """ + Handle exceptions that occur during the scan. + This is a good place to implement any cleanup logic that needs to be executed in case of an exception, + such as returning the devices to a safe state or moving the motors back to their starting position. + """ + + ####################################################### + ######### Helper methods for the scan logic ########### + ####################################################### + + # Implement scan-specific helper methods below. diff --git a/tests/tests_scans/conftest.py b/tests/tests_scans/conftest.py index af89857..690abd4 100644 --- a/tests/tests_scans/conftest.py +++ b/tests/tests_scans/conftest.py @@ -3,10 +3,10 @@ from functools import partial import pytest from bec_server.device_server.tests.utils import DeviceMockType, DMMock -from bec_server.scan_server.tests.fixtures import ( - ScanStubStatusMock, - connector_mock, - instruction_handler_mock, +from bec_server.scan_server.tests.scan_fixtures import ( + nth_done_status_mock, + readout_priority, + v4_scan_assembler, ) diff --git a/tests/tests_scans/test_mono_bragg_scans.py b/tests/tests_scans/test_mono_bragg_scans.py deleted file mode 100644 index 789b24b..0000000 --- a/tests/tests_scans/test_mono_bragg_scans.py +++ /dev/null @@ -1,429 +0,0 @@ -# pylint: skip-file -from unittest import mock - -from bec_lib.messages import DeviceInstructionMessage -from bec_server.device_server.tests.utils import DMMock - -from debye_bec.scans import ( - XASAdvancedScan, - XASAdvancedScanWithXRD, - XASSimpleScan, - XASSimpleScanWithXRD, -) - - -def get_instructions(request, ScanStubStatusMock): - request.metadata["RID"] = "my_test_request_id" - - def fake_done(): - """ - Fake done function for ScanStubStatusMock. Upon each call, it returns the next value from the generator. - This is used to simulate the completion of the scan. - """ - yield False - yield False - yield True - - def fake_complete(*args, **kwargs): - yield "fake_complete" - return ScanStubStatusMock(done_func=fake_done) - - with ( - mock.patch.object(request.stubs, "complete", side_effect=fake_complete), - mock.patch.object(request.stubs, "_get_result_from_status", return_value=None), - ): - reference_commands = list(request.run()) - - for cmd in reference_commands: - if not cmd or isinstance(cmd, str): - continue - if "RID" in cmd.metadata: - cmd.metadata["RID"] = "my_test_request_id" - if "rpc_id" in cmd.parameter: - cmd.parameter["rpc_id"] = "my_test_rpc_id" - cmd.metadata.pop("device_instr_id", None) - - return reference_commands - - -def test_xas_simple_scan(scan_assembler, ScanStubStatusMock): - - request = scan_assembler(XASSimpleScan, start=0, stop=5, scan_time=1, scan_duration=10) - request.device_manager.add_device("nidaq") - reference_commands = get_instructions(request, ScanStubStatusMock) - - assert reference_commands == [ - None, - None, - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, - device=None, - action="scan_report_instruction", - parameter={"device_progress": ["mo1_bragg"]}, - ), - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, - device=None, - action="open_scan", - parameter={ - "readout_priority": { - "monitored": [], - "baseline": [], - "on_request": [], - "async": ["nidaq"], - }, - "num_points": None, - "positions": [0.0, 5.0], - "scan_name": "xas_simple_scan", - "scan_type": "fly", - }, - ), - DeviceInstructionMessage(metadata={}, device="nidaq", action="stage", parameter={}), - DeviceInstructionMessage( - metadata={}, - device=["bpm4i", "eiger", "mo1_bragg", "samx"], - action="stage", - parameter={}, - ), - DeviceInstructionMessage( - metadata={"readout_priority": "baseline", "RID": "my_test_request_id"}, - device=["samx"], - action="read", - parameter={}, - ), - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, - device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"], - action="pre_scan", - parameter={}, - ), - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, - device="mo1_bragg", - action="kickoff", - parameter={"configure": {}}, - ), - "fake_complete", - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0}, - device=["bpm4i", "eiger", "mo1_bragg"], - action="read", - parameter={"group": "monitored"}, - ), - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 1}, - device=["bpm4i", "eiger", "mo1_bragg"], - action="read", - parameter={"group": "monitored"}, - ), - "fake_complete", - DeviceInstructionMessage( - metadata={}, - device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"], - action="unstage", - parameter={}, - ), - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, - device=None, - action="close_scan", - parameter={}, - ), - ] - - -def test_xas_simple_scan_with_xrd(scan_assembler, ScanStubStatusMock): - - request = scan_assembler( - XASSimpleScanWithXRD, - start=0, - stop=5, - scan_time=1, - scan_duration=10, - break_enable_low=True, - break_time_low=1, - cycle_low=1, - break_enable_high=True, - break_time_high=2, - exp_time=1, - n_of_trigger=1, - cycle_high=4, - ) - request.device_manager.add_device("nidaq") - reference_commands = get_instructions(request, ScanStubStatusMock) - # TODO #64 based on creating this ScanStatusMessage, we should test the logic of stage/kickoff/complete/unstage in Pilatus and mo1Bragg - - assert reference_commands == [ - None, - None, - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, - device=None, - action="scan_report_instruction", - parameter={"device_progress": ["mo1_bragg"]}, - ), - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, - device=None, - action="open_scan", - parameter={ - "readout_priority": { - "monitored": [], - "baseline": [], - "on_request": [], - "async": ["nidaq"], - }, - "num_points": None, - "positions": [0.0, 5.0], - "scan_name": "xas_simple_scan_with_xrd", - "scan_type": "fly", - }, - ), - DeviceInstructionMessage(metadata={}, device="nidaq", action="stage", parameter={}), - DeviceInstructionMessage( - metadata={}, - device=["bpm4i", "eiger", "mo1_bragg", "samx"], - action="stage", - parameter={}, - ), - DeviceInstructionMessage( - metadata={"readout_priority": "baseline", "RID": "my_test_request_id"}, - device=["samx"], - action="read", - parameter={}, - ), - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, - device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"], - action="pre_scan", - parameter={}, - ), - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, - device="mo1_bragg", - action="kickoff", - parameter={"configure": {}}, - ), - "fake_complete", - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0}, - device=["bpm4i", "eiger", "mo1_bragg"], - action="read", - parameter={"group": "monitored"}, - ), - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 1}, - device=["bpm4i", "eiger", "mo1_bragg"], - action="read", - parameter={"group": "monitored"}, - ), - "fake_complete", - DeviceInstructionMessage( - metadata={}, - device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"], - action="unstage", - parameter={}, - ), - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, - device=None, - action="close_scan", - parameter={}, - ), - ] - - -def test_xas_advanced_scan(scan_assembler, ScanStubStatusMock): - - request = scan_assembler( - XASAdvancedScan, - start=8000, - stop=9000, - scan_time=1, - scan_duration=10, - p_kink=50, - e_kink=8500, - ) - request.device_manager.add_device("nidaq") - reference_commands = get_instructions(request, ScanStubStatusMock) - - assert reference_commands == [ - None, - None, - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, - device=None, - action="scan_report_instruction", - parameter={"device_progress": ["mo1_bragg"]}, - ), - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, - device=None, - action="open_scan", - parameter={ - "readout_priority": { - "monitored": [], - "baseline": [], - "on_request": [], - "async": ["nidaq"], - }, - "num_points": None, - "positions": [8000.0, 9000.0], - "scan_name": "xas_advanced_scan", - "scan_type": "fly", - }, - ), - DeviceInstructionMessage(metadata={}, device="nidaq", action="stage", parameter={}), - DeviceInstructionMessage( - metadata={}, - device=["bpm4i", "eiger", "mo1_bragg", "samx"], - action="stage", - parameter={}, - ), - DeviceInstructionMessage( - metadata={"readout_priority": "baseline", "RID": "my_test_request_id"}, - device=["samx"], - action="read", - parameter={}, - ), - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, - device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"], - action="pre_scan", - parameter={}, - ), - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, - device="mo1_bragg", - action="kickoff", - parameter={"configure": {}}, - ), - "fake_complete", - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0}, - device=["bpm4i", "eiger", "mo1_bragg"], - action="read", - parameter={"group": "monitored"}, - ), - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 1}, - device=["bpm4i", "eiger", "mo1_bragg"], - action="read", - parameter={"group": "monitored"}, - ), - "fake_complete", - DeviceInstructionMessage( - metadata={}, - device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"], - action="unstage", - parameter={}, - ), - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, - device=None, - action="close_scan", - parameter={}, - ), - ] - - -def test_xas_advanced_scan_with_xrd(scan_assembler, ScanStubStatusMock): - - request = scan_assembler( - XASAdvancedScanWithXRD, - start=8000, - stop=9000, - scan_time=1, - scan_duration=10, - p_kink=50, - e_kink=8500, - break_enable_low=True, - break_time_low=1, - cycle_low=1, - break_enable_high=True, - break_time_high=2, - exp_time=1, - n_of_trigger=1, - cycle_high=4, - ) - request.device_manager.add_device("nidaq") - reference_commands = get_instructions(request, ScanStubStatusMock) - - assert reference_commands == [ - None, - None, - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, - device=None, - action="scan_report_instruction", - parameter={"device_progress": ["mo1_bragg"]}, - ), - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, - device=None, - action="open_scan", - parameter={ - "readout_priority": { - "monitored": [], - "baseline": [], - "on_request": [], - "async": ["nidaq"], - }, - "num_points": None, - "positions": [8000.0, 9000.0], - "scan_name": "xas_advanced_scan_with_xrd", - "scan_type": "fly", - }, - ), - DeviceInstructionMessage(metadata={}, device="nidaq", action="stage", parameter={}), - DeviceInstructionMessage( - metadata={}, - device=["bpm4i", "eiger", "mo1_bragg", "samx"], - action="stage", - parameter={}, - ), - DeviceInstructionMessage( - metadata={"readout_priority": "baseline", "RID": "my_test_request_id"}, - device=["samx"], - action="read", - parameter={}, - ), - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, - device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"], - action="pre_scan", - parameter={}, - ), - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, - device="mo1_bragg", - action="kickoff", - parameter={"configure": {}}, - ), - "fake_complete", - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0}, - device=["bpm4i", "eiger", "mo1_bragg"], - action="read", - parameter={"group": "monitored"}, - ), - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 1}, - device=["bpm4i", "eiger", "mo1_bragg"], - action="read", - parameter={"group": "monitored"}, - ), - "fake_complete", - DeviceInstructionMessage( - metadata={}, - device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"], - action="unstage", - parameter={}, - ), - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, - device=None, - action="close_scan", - parameter={}, - ), - ] diff --git a/tests/tests_scans/test_mono_bragg_scans_v4.py b/tests/tests_scans/test_mono_bragg_scans_v4.py index bf791a1..078c53a 100644 --- a/tests/tests_scans/test_mono_bragg_scans_v4.py +++ b/tests/tests_scans/test_mono_bragg_scans_v4.py @@ -3,15 +3,8 @@ from unittest import mock import numpy as np import pytest -from bec_server.scan_server.tests.scan_hook_tests import ( - assert_close_scan_waits_for_baseline_and_closes, - assert_pre_scan_called, - assert_prepare_scan_reads_baseline_devices, - assert_scan_open_called, - assert_stage_all_devices_called, - assert_unstage_all_devices_called, - run_scan_tests, -) +from bec_server.scan_server.tests.scan_fixtures import * +from bec_server.scan_server.tests.scan_hook_tests import * XAS_SIMPLE_SCAN_DEFAULT_HOOK_TESTS = [ ("prepare_scan", [assert_prepare_scan_reads_baseline_devices]), diff --git a/tests/tests_scans/test_nidaq_continous_scan.py b/tests/tests_scans/test_nidaq_continous_scan.py index 4596d84..f2c10ec 100644 --- a/tests/tests_scans/test_nidaq_continous_scan.py +++ b/tests/tests_scans/test_nidaq_continous_scan.py @@ -1,126 +1,75 @@ # pylint: skip-file from unittest import mock -from bec_lib.messages import DeviceInstructionMessage -from bec_server.device_server.tests.utils import DMMock +import pytest +from bec_server.scan_server.tests.scan_fixtures import * +from bec_server.scan_server.tests.scan_hook_tests import * -from debye_bec.scans import NIDAQContinuousScan +NIDAQ_CONTINUOUS_SCAN_DEFAULT_HOOK_TESTS = [ + ("prepare_scan", [assert_prepare_scan_reads_baseline_devices]), + ("open_scan", [assert_scan_open_called]), + ("stage", [assert_stage_all_devices_called]), + ("pre_scan", [assert_pre_scan_called]), + ("unstage", [assert_unstage_all_devices_called]), + ("close_scan", [assert_close_scan_waits_for_baseline_and_closes]), +] -def get_instructions(request, ScanStubStatusMock): - request.metadata["RID"] = "my_test_request_id" - - def fake_done(): - """ - Fake done function for ScanStubStatusMock. Upon each call, it returns the next value from the generator. - This is used to simulate the completion of the scan. - """ - yield False - yield False - yield True - - def fake_complete(*args, **kwargs): - yield "fake_complete" - return ScanStubStatusMock(done_func=fake_done) - - with ( - mock.patch.object(request.stubs, "complete", side_effect=fake_complete), - mock.patch.object(request.stubs, "_get_result_from_status", return_value=None), - ): - reference_commands = list(request.run()) - - for cmd in reference_commands: - if not cmd or isinstance(cmd, str): - continue - if "RID" in cmd.metadata: - cmd.metadata["RID"] = "my_test_request_id" - if "rpc_id" in cmd.parameter: - cmd.parameter["rpc_id"] = "my_test_rpc_id" - cmd.metadata.pop("device_instr_id", None) - - return reference_commands +def _assemble_nidaq_continuous_scan(v4_scan_assembler, **overrides): + params = {"scan_duration": 10.0, "daq": "nidaq", "compression": False} + params.update(overrides) + return v4_scan_assembler("nidaq_continuous_scan", **params) -def test_xas_simple_scan(scan_assembler, ScanStubStatusMock): +@pytest.mark.parametrize(("hook_name", "hook_tests"), NIDAQ_CONTINUOUS_SCAN_DEFAULT_HOOK_TESTS) +def test_nidaq_continuous_scan_v4_default_hooks( + v4_scan_assembler, nth_done_status_mock, hook_name, hook_tests +): + scan = _assemble_nidaq_continuous_scan(v4_scan_assembler) - request = scan_assembler(NIDAQContinuousScan, scan_duration=10) - request.device_manager.add_device("nidaq") - reference_commands = get_instructions(request, ScanStubStatusMock) - assert reference_commands == [ - None, - None, - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, - device=None, - action="scan_report_instruction", - parameter={"device_progress": ["nidaq"]}, - ), - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, - device=None, - action="open_scan", - parameter={ - "readout_priority": { - "monitored": [], - "baseline": [], - "on_request": [], - "async": ["nidaq"], - }, - "num_points": 0, - "positions": [], - "scan_name": "nidaq_continuous_scan", - "scan_type": "fly", - }, - ), - DeviceInstructionMessage(metadata={}, device="nidaq", action="stage", parameter={}), - DeviceInstructionMessage( - metadata={}, - device=["bpm4i", "eiger", "mo1_bragg", "samx"], - action="stage", - parameter={}, - ), - DeviceInstructionMessage( - metadata={"readout_priority": "baseline", "RID": "my_test_request_id"}, - device=["samx"], - action="read", - parameter={}, - ), - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, - device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"], - action="pre_scan", - parameter={}, - ), - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, - device="nidaq", - action="kickoff", - parameter={"configure": {}}, - ), - "fake_complete", - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0}, - device=["bpm4i", "eiger", "mo1_bragg"], - action="read", - parameter={"group": "monitored"}, - ), - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 1}, - device=["bpm4i", "eiger", "mo1_bragg"], - action="read", - parameter={"group": "monitored"}, - ), - "fake_complete", - DeviceInstructionMessage( - metadata={}, - device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"], - action="unstage", - parameter={}, - ), - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, - device=None, - action="close_scan", - parameter={}, - ), - ] + run_scan_tests(scan, [(hook_name, hook_tests)], nth_done_status_mock=nth_done_status_mock) + + +def test_nidaq_continuous_scan_v4_prepare_scan_updates_metadata(v4_scan_assembler): + scan = _assemble_nidaq_continuous_scan(v4_scan_assembler) + scan.actions.add_scan_report_instruction_device_progress = mock.MagicMock() + baseline_status = mock.MagicMock() + scan.actions.read_baseline_devices = mock.MagicMock(return_value=baseline_status) + + scan.prepare_scan() + scan.actions._build_scan_status_message("open") + + assert scan.scan_info.additional_scan_parameters["scan_duration"] == 10.0 + assert scan.scan_info.additional_scan_parameters["compression"] is False + assert scan.scan_info.readout_priority_modification["async"] == ["nidaq"] + scan.actions.add_scan_report_instruction_device_progress.assert_called_once_with(scan.daq) + scan.actions.read_baseline_devices.assert_called_once_with(wait=False) + assert scan._baseline_readout_status is baseline_status + + +def test_nidaq_continuous_scan_v4_scan_core_reads_until_complete( + v4_scan_assembler, nth_done_status_mock +): + scan = _assemble_nidaq_continuous_scan(v4_scan_assembler) + kickoff_status = mock.MagicMock() + completion_status = nth_done_status_mock(resolve_after=3) + scan.actions.kickoff = mock.MagicMock(return_value=kickoff_status) + scan.actions.complete = mock.MagicMock(return_value=completion_status) + scan.actions.read_monitored_devices = mock.MagicMock() + + with mock.patch("debye_bec.scans.nidaq_continuous_scan.time.sleep"): + scan.scan_core() + + scan.actions.kickoff.assert_called_once_with(device=scan.daq, wait=False) + kickoff_status.wait.assert_called_once_with(timeout=5) + scan.actions.complete.assert_called_once_with(device=scan.daq, wait=False) + assert scan.actions.read_monitored_devices.call_count == 2 + + +def test_nidaq_continuous_scan_v4_post_scan_completes_all_devices(v4_scan_assembler): + scan = _assemble_nidaq_continuous_scan(v4_scan_assembler) + scan.actions.complete_all_devices = mock.MagicMock() + + scan.post_scan() + + scan.actions.complete_all_devices.assert_called_once_with()