128 lines
4.4 KiB
Python
128 lines
4.4 KiB
Python
# pylint: skip-file
|
|
from unittest import mock
|
|
|
|
from bec_lib.messages import DeviceInstructionMessage
|
|
from bec_server.device_server.tests.utils import DMMock
|
|
|
|
from debye_bec.scans import NIDAQContinuousScan
|
|
|
|
|
|
def get_instructions(request, ScanStubStatusMock):
|
|
request.metadata["RID"] = "my_test_request_id"
|
|
|
|
def fake_done():
|
|
"""
|
|
Fake done function for ScanStubStatusMock. Upon each call, it returns the next value from the generator.
|
|
This is used to simulate the completion of the scan.
|
|
"""
|
|
yield False
|
|
yield False
|
|
yield True
|
|
|
|
def fake_complete(*args, **kwargs):
|
|
yield "fake_complete"
|
|
return ScanStubStatusMock(done_func=fake_done)
|
|
|
|
with (
|
|
mock.patch.object(request.stubs, "complete", side_effect=fake_complete),
|
|
mock.patch.object(request.stubs, "_get_result_from_status", return_value=None),
|
|
):
|
|
reference_commands = list(request.run())
|
|
|
|
for cmd in reference_commands:
|
|
if not cmd or isinstance(cmd, str):
|
|
continue
|
|
if "RID" in cmd.metadata:
|
|
cmd.metadata["RID"] = "my_test_request_id"
|
|
if "rpc_id" in cmd.parameter:
|
|
cmd.parameter["rpc_id"] = "my_test_rpc_id"
|
|
cmd.metadata.pop("device_instr_id", None)
|
|
|
|
return reference_commands
|
|
|
|
|
|
def test_xas_simple_scan(scan_assembler, ScanStubStatusMock):
|
|
|
|
request = scan_assembler(NIDAQContinuousScan, scan_duration=10)
|
|
request.device_manager.add_device("nidaq")
|
|
reference_commands = get_instructions(request, ScanStubStatusMock)
|
|
assert reference_commands == [
|
|
None,
|
|
None,
|
|
DeviceInstructionMessage(
|
|
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
|
device=None,
|
|
action="scan_report_instruction",
|
|
parameter={"device_progress": ["nidaq"]},
|
|
),
|
|
DeviceInstructionMessage(
|
|
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
|
device=None,
|
|
action="open_scan",
|
|
parameter={
|
|
"scan_motors": [],
|
|
"readout_priority": {
|
|
"monitored": [],
|
|
"baseline": [],
|
|
"on_request": [],
|
|
"async": ["nidaq"],
|
|
},
|
|
"num_points": 0,
|
|
"positions": [],
|
|
"scan_name": "nidaq_continuous_scan",
|
|
"scan_type": "fly",
|
|
},
|
|
),
|
|
DeviceInstructionMessage(metadata={}, device="nidaq", action="stage", parameter={}),
|
|
DeviceInstructionMessage(
|
|
metadata={},
|
|
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
|
|
action="stage",
|
|
parameter={},
|
|
),
|
|
DeviceInstructionMessage(
|
|
metadata={"readout_priority": "baseline", "RID": "my_test_request_id"},
|
|
device=["samx"],
|
|
action="read",
|
|
parameter={},
|
|
),
|
|
DeviceInstructionMessage(
|
|
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
|
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
|
|
action="pre_scan",
|
|
parameter={},
|
|
),
|
|
DeviceInstructionMessage(
|
|
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
|
device="nidaq",
|
|
action="kickoff",
|
|
parameter={"configure": {}},
|
|
),
|
|
"fake_complete",
|
|
DeviceInstructionMessage(
|
|
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0},
|
|
device=["bpm4i", "eiger", "mo1_bragg"],
|
|
action="read",
|
|
parameter={"group": "monitored"},
|
|
),
|
|
DeviceInstructionMessage(
|
|
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 1},
|
|
device=["bpm4i", "eiger", "mo1_bragg"],
|
|
action="read",
|
|
parameter={"group": "monitored"},
|
|
),
|
|
"fake_complete",
|
|
DeviceInstructionMessage(
|
|
metadata={},
|
|
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
|
|
action="unstage",
|
|
parameter={},
|
|
),
|
|
DeviceInstructionMessage(
|
|
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
|
|
device=None,
|
|
action="close_scan",
|
|
parameter={},
|
|
),
|
|
]
|