feat: add NIDAQ continuous scan v4 implementation and update related tests
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
from .nidaq_cont_scan import NIDAQContinuousScan
|
||||
from .nidaq_continuous_scan import NidaqContinuousScan
|
||||
from .xas_simple_scan import (
|
||||
XasAdvancedScan,
|
||||
XasAdvancedScanWithXrd,
|
||||
|
||||
@@ -0,0 +1,174 @@
|
||||
"""
|
||||
The NIDAQ continuous scan is used to measure with the NIDAQ without moving the monochromator or any other motor.
|
||||
|
||||
Scan procedure:
|
||||
- prepare_scan
|
||||
- open_scan
|
||||
- stage
|
||||
- pre_scan
|
||||
- scan_core
|
||||
- at_each_point (optionally called by scan_core)
|
||||
- post_scan
|
||||
- unstage
|
||||
- close_scan
|
||||
- on_exception (called if any exception is raised during the scan)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from typing import Annotated
|
||||
|
||||
from bec_lib.device import DeviceBase
|
||||
from bec_lib.scan_args import ScanArgument, Units
|
||||
from bec_server.scan_server.scans.scan_base import ScanBase, ScanType
|
||||
from bec_server.scan_server.scans.scan_modifier import scan_hook
|
||||
|
||||
|
||||
class NidaqContinuousScan(ScanBase):
|
||||
# Scan Type: Hardware triggered or software triggered?
|
||||
# If the main trigger and readout logic is done within the at_each_point method in scan_core, choose SOFTWARE_TRIGGERED.
|
||||
# If the main trigger and readout logic is implemented on a device that is simply kicked off in this scan, choose HARDWARE_TRIGGERED.
|
||||
# This primarily serves as information for devices: The device may need to react differently if a software trigger is expected
|
||||
# for every point.
|
||||
scan_type = ScanType.HARDWARE_TRIGGERED
|
||||
|
||||
# Scan name: This is the name of the scan, e.g. "line_scan". This is used for display purposes and to identify the scan type in user interfaces.
|
||||
# Choose a descriptive name that does not conflict with existing scan names.
|
||||
# It must be a valid Python identifier, that is, it can only contain letters, numbers, and underscores, and must not start with a number.
|
||||
scan_name = "nidaq_continuous_scan"
|
||||
|
||||
gui_config = {"Scan Parameters": ["scan_duration", "daq", "compression"]}
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
#fmt: off
|
||||
scan_duration: Annotated[float, ScanArgument(display_name="Scan Duration", description="Duration of the scan", units=Units.s)],
|
||||
daq: Annotated[DeviceBase | None, ScanArgument(display_name="Daq", description="DAQ device to be used for the scan")],
|
||||
compression: Annotated[bool, ScanArgument(display_name="Compression", description="Whether to compress the data")],
|
||||
#fmt: on
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
The NIDAQ continuous scan is used to measure with the NIDAQ without moving the
|
||||
monochromator or any other motor. The NIDAQ thus runs in continuous mode, with a
|
||||
set scan_duration.
|
||||
|
||||
Args:
|
||||
scan_duration (float): Duration of the scan
|
||||
daq (DeviceBase): DAQ device to be used for the scan
|
||||
compression (bool): Whether to compress the data
|
||||
|
||||
Returns:
|
||||
ScanReport
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
self._baseline_readout_status = None
|
||||
self.scan_duration = scan_duration
|
||||
self.daq = daq or self.dev["nidaq"]
|
||||
self.compression = compression
|
||||
self.primary_readout_cycle = 1.0 # seconds
|
||||
self.motors = [self.daq]
|
||||
|
||||
self.update_scan_info(scan_duration=scan_duration, compression=compression)
|
||||
self.actions.set_device_readout_priority([self.daq], priority="async")
|
||||
|
||||
@scan_hook
|
||||
def prepare_scan(self):
|
||||
"""
|
||||
Prepare the scan. This can include any steps that need to be executed
|
||||
before the scan is opened, such as preparing the positions (if not done already)
|
||||
or setting up the devices.
|
||||
"""
|
||||
|
||||
self.actions.add_scan_report_instruction_device_progress(self.daq)
|
||||
self._baseline_readout_status = self.actions.read_baseline_devices(wait=False)
|
||||
|
||||
@scan_hook
|
||||
def open_scan(self):
|
||||
"""
|
||||
Open the scan.
|
||||
This step must call self.actions.open_scan() to ensure that a new scan is
|
||||
opened. Make sure to prepare the scan metadata before, either in
|
||||
prepare_scan() or in open_scan() itself and call self.update_scan_info(...)
|
||||
to update the scan metadata if needed.
|
||||
"""
|
||||
self.actions.open_scan()
|
||||
|
||||
@scan_hook
|
||||
def stage(self):
|
||||
"""
|
||||
Stage the devices for the upcoming scan. The stage logic is typically
|
||||
implemented on the device itself (i.e. by the device's stage method).
|
||||
However, if there are any additional steps that need to be executed before
|
||||
staging the devices, they can be implemented here.
|
||||
"""
|
||||
self.actions.stage_all_devices()
|
||||
|
||||
@scan_hook
|
||||
def pre_scan(self):
|
||||
"""
|
||||
Pre-scan steps to be executed before the main scan logic.
|
||||
This is typically the last chance to prepare the devices before the core scan
|
||||
logic is executed. For example, this is a good place to initialize time-criticial
|
||||
devices, e.g. devices that have a short timeout.
|
||||
The pre-scan logic is typically implemented on the device itself.
|
||||
"""
|
||||
self.actions.pre_scan_all_devices()
|
||||
|
||||
@scan_hook
|
||||
def scan_core(self):
|
||||
"""
|
||||
Core scan logic to be executed during the scan.
|
||||
This is where the main scan logic should be implemented.
|
||||
"""
|
||||
|
||||
kickoff_status = self.actions.kickoff(device=self.daq, wait=False)
|
||||
kickoff_status.wait(timeout=5) # wait for proper kickoff of device
|
||||
|
||||
complete_status = self.actions.complete(device=self.daq, wait=False)
|
||||
|
||||
while not complete_status.done:
|
||||
self.at_each_point()
|
||||
time.sleep(self.primary_readout_cycle)
|
||||
|
||||
@scan_hook
|
||||
def at_each_point(self):
|
||||
"""
|
||||
Logic to be executed at each acquisition point during the scan.
|
||||
"""
|
||||
self.actions.read_monitored_devices()
|
||||
|
||||
@scan_hook
|
||||
def post_scan(self):
|
||||
"""
|
||||
Post-scan steps to be executed after the main scan logic.
|
||||
"""
|
||||
self.actions.complete_all_devices()
|
||||
|
||||
@scan_hook
|
||||
def unstage(self):
|
||||
"""Unstage the scan by executing post-scan steps."""
|
||||
self.actions.unstage_all_devices()
|
||||
|
||||
@scan_hook
|
||||
def close_scan(self):
|
||||
"""Close the scan."""
|
||||
if self._baseline_readout_status is not None:
|
||||
self._baseline_readout_status.wait()
|
||||
self.actions.close_scan()
|
||||
self.actions.check_for_unchecked_statuses()
|
||||
|
||||
@scan_hook
|
||||
def on_exception(self, exception: Exception):
|
||||
"""
|
||||
Handle exceptions that occur during the scan.
|
||||
This is a good place to implement any cleanup logic that needs to be executed in case of an exception,
|
||||
such as returning the devices to a safe state or moving the motors back to their starting position.
|
||||
"""
|
||||
|
||||
#######################################################
|
||||
######### Helper methods for the scan logic ###########
|
||||
#######################################################
|
||||
|
||||
# Implement scan-specific helper methods below.
|
||||
@@ -3,10 +3,10 @@ from functools import partial
|
||||
|
||||
import pytest
|
||||
from bec_server.device_server.tests.utils import DeviceMockType, DMMock
|
||||
from bec_server.scan_server.tests.fixtures import (
|
||||
ScanStubStatusMock,
|
||||
connector_mock,
|
||||
instruction_handler_mock,
|
||||
from bec_server.scan_server.tests.scan_fixtures import (
|
||||
nth_done_status_mock,
|
||||
readout_priority,
|
||||
v4_scan_assembler,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -1,429 +0,0 @@
|
||||
# pylint: skip-file
|
||||
from unittest import mock
|
||||
|
||||
from bec_lib.messages import DeviceInstructionMessage
|
||||
from bec_server.device_server.tests.utils import DMMock
|
||||
|
||||
from debye_bec.scans import (
|
||||
XASAdvancedScan,
|
||||
XASAdvancedScanWithXRD,
|
||||
XASSimpleScan,
|
||||
XASSimpleScanWithXRD,
|
||||
)
|
||||
|
||||
|
||||
def get_instructions(request, ScanStubStatusMock):
|
||||
request.metadata["RID"] = "my_test_request_id"
|
||||
|
||||
def fake_done():
|
||||
"""
|
||||
Fake done function for ScanStubStatusMock. Upon each call, it returns the next value from the generator.
|
||||
This is used to simulate the completion of the scan.
|
||||
"""
|
||||
yield False
|
||||
yield False
|
||||
yield True
|
||||
|
||||
def fake_complete(*args, **kwargs):
|
||||
yield "fake_complete"
|
||||
return ScanStubStatusMock(done_func=fake_done)
|
||||
|
||||
with (
|
||||
mock.patch.object(request.stubs, "complete", side_effect=fake_complete),
|
||||
mock.patch.object(request.stubs, "_get_result_from_status", return_value=None),
|
||||
):
|
||||
reference_commands = list(request.run())
|
||||
|
||||
for cmd in reference_commands:
|
||||
if not cmd or isinstance(cmd, str):
|
||||
continue
|
||||
if "RID" in cmd.metadata:
|
||||
cmd.metadata["RID"] = "my_test_request_id"
|
||||
if "rpc_id" in cmd.parameter:
|
||||
cmd.parameter["rpc_id"] = "my_test_rpc_id"
|
||||
cmd.metadata.pop("device_instr_id", None)
|
||||
|
||||
return reference_commands
|
||||
|
||||
|
||||
def test_xas_simple_scan(scan_assembler, ScanStubStatusMock):
|
||||
|
||||
request = scan_assembler(XASSimpleScan, start=0, stop=5, scan_time=1, scan_duration=10)
|
||||
request.device_manager.add_device("nidaq")
|
||||
reference_commands = get_instructions(request, ScanStubStatusMock)
|
||||
|
||||
assert reference_commands == [
|
||||
None,
|
||||
None,
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="scan_report_instruction",
|
||||
parameter={"device_progress": ["mo1_bragg"]},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="open_scan",
|
||||
parameter={
|
||||
"readout_priority": {
|
||||
"monitored": [],
|
||||
"baseline": [],
|
||||
"on_request": [],
|
||||
"async": ["nidaq"],
|
||||
},
|
||||
"num_points": None,
|
||||
"positions": [0.0, 5.0],
|
||||
"scan_name": "xas_simple_scan",
|
||||
"scan_type": "fly",
|
||||
},
|
||||
),
|
||||
DeviceInstructionMessage(metadata={}, device="nidaq", action="stage", parameter={}),
|
||||
DeviceInstructionMessage(
|
||||
metadata={},
|
||||
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
|
||||
action="stage",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "baseline", "RID": "my_test_request_id"},
|
||||
device=["samx"],
|
||||
action="read",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
|
||||
action="pre_scan",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device="mo1_bragg",
|
||||
action="kickoff",
|
||||
parameter={"configure": {}},
|
||||
),
|
||||
"fake_complete",
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0},
|
||||
device=["bpm4i", "eiger", "mo1_bragg"],
|
||||
action="read",
|
||||
parameter={"group": "monitored"},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 1},
|
||||
device=["bpm4i", "eiger", "mo1_bragg"],
|
||||
action="read",
|
||||
parameter={"group": "monitored"},
|
||||
),
|
||||
"fake_complete",
|
||||
DeviceInstructionMessage(
|
||||
metadata={},
|
||||
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
|
||||
action="unstage",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="close_scan",
|
||||
parameter={},
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
def test_xas_simple_scan_with_xrd(scan_assembler, ScanStubStatusMock):
|
||||
|
||||
request = scan_assembler(
|
||||
XASSimpleScanWithXRD,
|
||||
start=0,
|
||||
stop=5,
|
||||
scan_time=1,
|
||||
scan_duration=10,
|
||||
break_enable_low=True,
|
||||
break_time_low=1,
|
||||
cycle_low=1,
|
||||
break_enable_high=True,
|
||||
break_time_high=2,
|
||||
exp_time=1,
|
||||
n_of_trigger=1,
|
||||
cycle_high=4,
|
||||
)
|
||||
request.device_manager.add_device("nidaq")
|
||||
reference_commands = get_instructions(request, ScanStubStatusMock)
|
||||
# TODO #64 based on creating this ScanStatusMessage, we should test the logic of stage/kickoff/complete/unstage in Pilatus and mo1Bragg
|
||||
|
||||
assert reference_commands == [
|
||||
None,
|
||||
None,
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="scan_report_instruction",
|
||||
parameter={"device_progress": ["mo1_bragg"]},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="open_scan",
|
||||
parameter={
|
||||
"readout_priority": {
|
||||
"monitored": [],
|
||||
"baseline": [],
|
||||
"on_request": [],
|
||||
"async": ["nidaq"],
|
||||
},
|
||||
"num_points": None,
|
||||
"positions": [0.0, 5.0],
|
||||
"scan_name": "xas_simple_scan_with_xrd",
|
||||
"scan_type": "fly",
|
||||
},
|
||||
),
|
||||
DeviceInstructionMessage(metadata={}, device="nidaq", action="stage", parameter={}),
|
||||
DeviceInstructionMessage(
|
||||
metadata={},
|
||||
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
|
||||
action="stage",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "baseline", "RID": "my_test_request_id"},
|
||||
device=["samx"],
|
||||
action="read",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
|
||||
action="pre_scan",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device="mo1_bragg",
|
||||
action="kickoff",
|
||||
parameter={"configure": {}},
|
||||
),
|
||||
"fake_complete",
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0},
|
||||
device=["bpm4i", "eiger", "mo1_bragg"],
|
||||
action="read",
|
||||
parameter={"group": "monitored"},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 1},
|
||||
device=["bpm4i", "eiger", "mo1_bragg"],
|
||||
action="read",
|
||||
parameter={"group": "monitored"},
|
||||
),
|
||||
"fake_complete",
|
||||
DeviceInstructionMessage(
|
||||
metadata={},
|
||||
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
|
||||
action="unstage",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="close_scan",
|
||||
parameter={},
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
def test_xas_advanced_scan(scan_assembler, ScanStubStatusMock):
|
||||
|
||||
request = scan_assembler(
|
||||
XASAdvancedScan,
|
||||
start=8000,
|
||||
stop=9000,
|
||||
scan_time=1,
|
||||
scan_duration=10,
|
||||
p_kink=50,
|
||||
e_kink=8500,
|
||||
)
|
||||
request.device_manager.add_device("nidaq")
|
||||
reference_commands = get_instructions(request, ScanStubStatusMock)
|
||||
|
||||
assert reference_commands == [
|
||||
None,
|
||||
None,
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="scan_report_instruction",
|
||||
parameter={"device_progress": ["mo1_bragg"]},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="open_scan",
|
||||
parameter={
|
||||
"readout_priority": {
|
||||
"monitored": [],
|
||||
"baseline": [],
|
||||
"on_request": [],
|
||||
"async": ["nidaq"],
|
||||
},
|
||||
"num_points": None,
|
||||
"positions": [8000.0, 9000.0],
|
||||
"scan_name": "xas_advanced_scan",
|
||||
"scan_type": "fly",
|
||||
},
|
||||
),
|
||||
DeviceInstructionMessage(metadata={}, device="nidaq", action="stage", parameter={}),
|
||||
DeviceInstructionMessage(
|
||||
metadata={},
|
||||
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
|
||||
action="stage",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "baseline", "RID": "my_test_request_id"},
|
||||
device=["samx"],
|
||||
action="read",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
|
||||
action="pre_scan",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device="mo1_bragg",
|
||||
action="kickoff",
|
||||
parameter={"configure": {}},
|
||||
),
|
||||
"fake_complete",
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0},
|
||||
device=["bpm4i", "eiger", "mo1_bragg"],
|
||||
action="read",
|
||||
parameter={"group": "monitored"},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 1},
|
||||
device=["bpm4i", "eiger", "mo1_bragg"],
|
||||
action="read",
|
||||
parameter={"group": "monitored"},
|
||||
),
|
||||
"fake_complete",
|
||||
DeviceInstructionMessage(
|
||||
metadata={},
|
||||
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
|
||||
action="unstage",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="close_scan",
|
||||
parameter={},
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
def test_xas_advanced_scan_with_xrd(scan_assembler, ScanStubStatusMock):
|
||||
|
||||
request = scan_assembler(
|
||||
XASAdvancedScanWithXRD,
|
||||
start=8000,
|
||||
stop=9000,
|
||||
scan_time=1,
|
||||
scan_duration=10,
|
||||
p_kink=50,
|
||||
e_kink=8500,
|
||||
break_enable_low=True,
|
||||
break_time_low=1,
|
||||
cycle_low=1,
|
||||
break_enable_high=True,
|
||||
break_time_high=2,
|
||||
exp_time=1,
|
||||
n_of_trigger=1,
|
||||
cycle_high=4,
|
||||
)
|
||||
request.device_manager.add_device("nidaq")
|
||||
reference_commands = get_instructions(request, ScanStubStatusMock)
|
||||
|
||||
assert reference_commands == [
|
||||
None,
|
||||
None,
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="scan_report_instruction",
|
||||
parameter={"device_progress": ["mo1_bragg"]},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="open_scan",
|
||||
parameter={
|
||||
"readout_priority": {
|
||||
"monitored": [],
|
||||
"baseline": [],
|
||||
"on_request": [],
|
||||
"async": ["nidaq"],
|
||||
},
|
||||
"num_points": None,
|
||||
"positions": [8000.0, 9000.0],
|
||||
"scan_name": "xas_advanced_scan_with_xrd",
|
||||
"scan_type": "fly",
|
||||
},
|
||||
),
|
||||
DeviceInstructionMessage(metadata={}, device="nidaq", action="stage", parameter={}),
|
||||
DeviceInstructionMessage(
|
||||
metadata={},
|
||||
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
|
||||
action="stage",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "baseline", "RID": "my_test_request_id"},
|
||||
device=["samx"],
|
||||
action="read",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
|
||||
action="pre_scan",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device="mo1_bragg",
|
||||
action="kickoff",
|
||||
parameter={"configure": {}},
|
||||
),
|
||||
"fake_complete",
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0},
|
||||
device=["bpm4i", "eiger", "mo1_bragg"],
|
||||
action="read",
|
||||
parameter={"group": "monitored"},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 1},
|
||||
device=["bpm4i", "eiger", "mo1_bragg"],
|
||||
action="read",
|
||||
parameter={"group": "monitored"},
|
||||
),
|
||||
"fake_complete",
|
||||
DeviceInstructionMessage(
|
||||
metadata={},
|
||||
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
|
||||
action="unstage",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="close_scan",
|
||||
parameter={},
|
||||
),
|
||||
]
|
||||
@@ -3,15 +3,8 @@ from unittest import mock
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
from bec_server.scan_server.tests.scan_hook_tests import (
|
||||
assert_close_scan_waits_for_baseline_and_closes,
|
||||
assert_pre_scan_called,
|
||||
assert_prepare_scan_reads_baseline_devices,
|
||||
assert_scan_open_called,
|
||||
assert_stage_all_devices_called,
|
||||
assert_unstage_all_devices_called,
|
||||
run_scan_tests,
|
||||
)
|
||||
from bec_server.scan_server.tests.scan_fixtures import *
|
||||
from bec_server.scan_server.tests.scan_hook_tests import *
|
||||
|
||||
XAS_SIMPLE_SCAN_DEFAULT_HOOK_TESTS = [
|
||||
("prepare_scan", [assert_prepare_scan_reads_baseline_devices]),
|
||||
|
||||
@@ -1,126 +1,75 @@
|
||||
# pylint: skip-file
|
||||
from unittest import mock
|
||||
|
||||
from bec_lib.messages import DeviceInstructionMessage
|
||||
from bec_server.device_server.tests.utils import DMMock
|
||||
import pytest
|
||||
from bec_server.scan_server.tests.scan_fixtures import *
|
||||
from bec_server.scan_server.tests.scan_hook_tests import *
|
||||
|
||||
from debye_bec.scans import NIDAQContinuousScan
|
||||
NIDAQ_CONTINUOUS_SCAN_DEFAULT_HOOK_TESTS = [
|
||||
("prepare_scan", [assert_prepare_scan_reads_baseline_devices]),
|
||||
("open_scan", [assert_scan_open_called]),
|
||||
("stage", [assert_stage_all_devices_called]),
|
||||
("pre_scan", [assert_pre_scan_called]),
|
||||
("unstage", [assert_unstage_all_devices_called]),
|
||||
("close_scan", [assert_close_scan_waits_for_baseline_and_closes]),
|
||||
]
|
||||
|
||||
|
||||
def get_instructions(request, ScanStubStatusMock):
|
||||
request.metadata["RID"] = "my_test_request_id"
|
||||
|
||||
def fake_done():
|
||||
"""
|
||||
Fake done function for ScanStubStatusMock. Upon each call, it returns the next value from the generator.
|
||||
This is used to simulate the completion of the scan.
|
||||
"""
|
||||
yield False
|
||||
yield False
|
||||
yield True
|
||||
|
||||
def fake_complete(*args, **kwargs):
|
||||
yield "fake_complete"
|
||||
return ScanStubStatusMock(done_func=fake_done)
|
||||
|
||||
with (
|
||||
mock.patch.object(request.stubs, "complete", side_effect=fake_complete),
|
||||
mock.patch.object(request.stubs, "_get_result_from_status", return_value=None),
|
||||
):
|
||||
reference_commands = list(request.run())
|
||||
|
||||
for cmd in reference_commands:
|
||||
if not cmd or isinstance(cmd, str):
|
||||
continue
|
||||
if "RID" in cmd.metadata:
|
||||
cmd.metadata["RID"] = "my_test_request_id"
|
||||
if "rpc_id" in cmd.parameter:
|
||||
cmd.parameter["rpc_id"] = "my_test_rpc_id"
|
||||
cmd.metadata.pop("device_instr_id", None)
|
||||
|
||||
return reference_commands
|
||||
def _assemble_nidaq_continuous_scan(v4_scan_assembler, **overrides):
|
||||
params = {"scan_duration": 10.0, "daq": "nidaq", "compression": False}
|
||||
params.update(overrides)
|
||||
return v4_scan_assembler("nidaq_continuous_scan", **params)
|
||||
|
||||
|
||||
def test_xas_simple_scan(scan_assembler, ScanStubStatusMock):
|
||||
@pytest.mark.parametrize(("hook_name", "hook_tests"), NIDAQ_CONTINUOUS_SCAN_DEFAULT_HOOK_TESTS)
|
||||
def test_nidaq_continuous_scan_v4_default_hooks(
|
||||
v4_scan_assembler, nth_done_status_mock, hook_name, hook_tests
|
||||
):
|
||||
scan = _assemble_nidaq_continuous_scan(v4_scan_assembler)
|
||||
|
||||
request = scan_assembler(NIDAQContinuousScan, scan_duration=10)
|
||||
request.device_manager.add_device("nidaq")
|
||||
reference_commands = get_instructions(request, ScanStubStatusMock)
|
||||
assert reference_commands == [
|
||||
None,
|
||||
None,
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="scan_report_instruction",
|
||||
parameter={"device_progress": ["nidaq"]},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="open_scan",
|
||||
parameter={
|
||||
"readout_priority": {
|
||||
"monitored": [],
|
||||
"baseline": [],
|
||||
"on_request": [],
|
||||
"async": ["nidaq"],
|
||||
},
|
||||
"num_points": 0,
|
||||
"positions": [],
|
||||
"scan_name": "nidaq_continuous_scan",
|
||||
"scan_type": "fly",
|
||||
},
|
||||
),
|
||||
DeviceInstructionMessage(metadata={}, device="nidaq", action="stage", parameter={}),
|
||||
DeviceInstructionMessage(
|
||||
metadata={},
|
||||
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
|
||||
action="stage",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "baseline", "RID": "my_test_request_id"},
|
||||
device=["samx"],
|
||||
action="read",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
|
||||
action="pre_scan",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device="nidaq",
|
||||
action="kickoff",
|
||||
parameter={"configure": {}},
|
||||
),
|
||||
"fake_complete",
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0},
|
||||
device=["bpm4i", "eiger", "mo1_bragg"],
|
||||
action="read",
|
||||
parameter={"group": "monitored"},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 1},
|
||||
device=["bpm4i", "eiger", "mo1_bragg"],
|
||||
action="read",
|
||||
parameter={"group": "monitored"},
|
||||
),
|
||||
"fake_complete",
|
||||
DeviceInstructionMessage(
|
||||
metadata={},
|
||||
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
|
||||
action="unstage",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="close_scan",
|
||||
parameter={},
|
||||
),
|
||||
]
|
||||
run_scan_tests(scan, [(hook_name, hook_tests)], nth_done_status_mock=nth_done_status_mock)
|
||||
|
||||
|
||||
def test_nidaq_continuous_scan_v4_prepare_scan_updates_metadata(v4_scan_assembler):
|
||||
scan = _assemble_nidaq_continuous_scan(v4_scan_assembler)
|
||||
scan.actions.add_scan_report_instruction_device_progress = mock.MagicMock()
|
||||
baseline_status = mock.MagicMock()
|
||||
scan.actions.read_baseline_devices = mock.MagicMock(return_value=baseline_status)
|
||||
|
||||
scan.prepare_scan()
|
||||
scan.actions._build_scan_status_message("open")
|
||||
|
||||
assert scan.scan_info.additional_scan_parameters["scan_duration"] == 10.0
|
||||
assert scan.scan_info.additional_scan_parameters["compression"] is False
|
||||
assert scan.scan_info.readout_priority_modification["async"] == ["nidaq"]
|
||||
scan.actions.add_scan_report_instruction_device_progress.assert_called_once_with(scan.daq)
|
||||
scan.actions.read_baseline_devices.assert_called_once_with(wait=False)
|
||||
assert scan._baseline_readout_status is baseline_status
|
||||
|
||||
|
||||
def test_nidaq_continuous_scan_v4_scan_core_reads_until_complete(
|
||||
v4_scan_assembler, nth_done_status_mock
|
||||
):
|
||||
scan = _assemble_nidaq_continuous_scan(v4_scan_assembler)
|
||||
kickoff_status = mock.MagicMock()
|
||||
completion_status = nth_done_status_mock(resolve_after=3)
|
||||
scan.actions.kickoff = mock.MagicMock(return_value=kickoff_status)
|
||||
scan.actions.complete = mock.MagicMock(return_value=completion_status)
|
||||
scan.actions.read_monitored_devices = mock.MagicMock()
|
||||
|
||||
with mock.patch("debye_bec.scans.nidaq_continuous_scan.time.sleep"):
|
||||
scan.scan_core()
|
||||
|
||||
scan.actions.kickoff.assert_called_once_with(device=scan.daq, wait=False)
|
||||
kickoff_status.wait.assert_called_once_with(timeout=5)
|
||||
scan.actions.complete.assert_called_once_with(device=scan.daq, wait=False)
|
||||
assert scan.actions.read_monitored_devices.call_count == 2
|
||||
|
||||
|
||||
def test_nidaq_continuous_scan_v4_post_scan_completes_all_devices(v4_scan_assembler):
|
||||
scan = _assemble_nidaq_continuous_scan(v4_scan_assembler)
|
||||
scan.actions.complete_all_devices = mock.MagicMock()
|
||||
|
||||
scan.post_scan()
|
||||
|
||||
scan.actions.complete_all_devices.assert_called_once_with()
|
||||
|
||||
Reference in New Issue
Block a user