tests: fix DeviceMessages for tests

This commit is contained in:
2025-05-05 15:14:11 +02:00
parent a8e7325f0f
commit b8a050c424
2 changed files with 22 additions and 146 deletions

View File

@@ -181,6 +181,7 @@ def test_update_scan_parameters(mock_bragg):
scan_id="my_scan_id",
status="closed",
request_inputs={
"inputs": {},
"kwargs": {
"start": 0,
"stop": 5,
@@ -196,7 +197,7 @@ def test_update_scan_parameters(mock_bragg):
"cycle_high": 5,
"p_kink": 50,
"e_kink": 8000,
}
},
},
info={
"kwargs": {
@@ -441,138 +442,6 @@ def test_unstage(mock_bragg):
# )
# mock_bragg.scan_info.msg = scan_status_msg
<<<<<<< Updated upstream
# Ensure that ScanControlLoadMessage is set to SUCCESS
mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.SUCCESS
with (
mock.patch.object(mock_bragg.scaninfo, "load_scan_metadata") as mock_load_scan_metadata,
mock.patch.object(mock_bragg, "_check_scan_msg") as mock_check_scan_msg,
mock.patch.object(mock_bragg, "on_unstage"),
):
scan_name = scan_status_msg.content["info"].get("scan_name", "")
# Chek the not implemented fly scan first, should raise Mo1BraggError
if scan_name not in [
"xas_simple_scan",
"xas_simple_scan_with_xrd",
"xas_advanced_scan",
"xas_advanced_scan_with_xrd",
]:
with pytest.raises(Mo1BraggError):
mock_bragg.stage()
assert mock_check_scan_msg.call_count == 1
assert mock_load_scan_metadata.call_count == 1
else:
with (
mock.patch.object(mock_bragg, "set_xas_settings") as mock_xas_settings,
mock.patch.object(
mock_bragg, "set_advanced_xas_settings"
) as mock_advanced_xas_settings,
mock.patch.object(mock_bragg, "set_trig_settings") as mock_trig_settings,
mock.patch.object(
mock_bragg, "set_scan_control_settings"
) as mock_set_scan_control_settings,
):
# Check xas_simple_scan
if scan_name == "xas_simple_scan":
mock_bragg.stage()
assert mock_xas_settings.call_args == mock.call(
low=scan_status_msg.content["info"]["kwargs"]["start"],
high=scan_status_msg.content["info"]["kwargs"]["stop"],
scan_time=scan_status_msg.content["info"]["kwargs"]["scan_time"],
)
assert mock_trig_settings.call_args == mock.call(
enable_low=False,
enable_high=False,
exp_time_low=0,
exp_time_high=0,
cycle_low=0,
cycle_high=0,
)
assert mock_set_scan_control_settings.call_args == mock.call(
mode=ScanControlMode.SIMPLE,
scan_duration=scan_status_msg.content["info"]["kwargs"][
"scan_duration"
],
)
# Check xas_simple_scan_with_xrd
elif scan_name == "xas_simple_scan_with_xrd":
mock_bragg.stage()
assert mock_xas_settings.call_args == mock.call(
low=scan_status_msg.content["info"]["kwargs"]["start"],
high=scan_status_msg.content["info"]["kwargs"]["stop"],
scan_time=scan_status_msg.content["info"]["kwargs"]["scan_time"],
)
assert mock_trig_settings.call_args == mock.call(
enable_low=scan_status_msg.content["info"]["kwargs"]["xrd_enable_low"],
enable_high=scan_status_msg.content["info"]["kwargs"][
"xrd_enable_high"
],
exp_time_low=scan_status_msg.content["info"]["kwargs"]["exp_time_low"],
exp_time_high=scan_status_msg.content["info"]["kwargs"][
"exp_time_high"
],
cycle_low=scan_status_msg.content["info"]["kwargs"]["cycle_low"],
cycle_high=scan_status_msg.content["info"]["kwargs"]["cycle_high"],
)
assert mock_set_scan_control_settings.call_args == mock.call(
mode=ScanControlMode.SIMPLE,
scan_duration=scan_status_msg.content["info"]["kwargs"][
"scan_duration"
],
)
# Check xas_advanced_scan
elif scan_name == "xas_advanced_scan":
mock_bragg.stage()
assert mock_advanced_xas_settings.call_args == mock.call(
low=scan_status_msg.content["info"]["kwargs"]["start"],
high=scan_status_msg.content["info"]["kwargs"]["stop"],
scan_time=scan_status_msg.content["info"]["kwargs"]["scan_time"],
p_kink=scan_status_msg.content["info"]["kwargs"]["p_kink"],
e_kink=scan_status_msg.content["info"]["kwargs"]["e_kink"],
)
assert mock_trig_settings.call_args == mock.call(
enable_low=False,
enable_high=False,
exp_time_low=0,
exp_time_high=0,
cycle_low=0,
cycle_high=0,
)
assert mock_set_scan_control_settings.call_args == mock.call(
mode=ScanControlMode.ADVANCED,
scan_duration=scan_status_msg.content["info"]["kwargs"][
"scan_duration"
],
)
# Check xas_advanced_scan_with_xrd
elif scan_name == "xas_advanced_scan_with_xrd":
mock_bragg.stage()
assert mock_advanced_xas_settings.call_args == mock.call(
low=scan_status_msg.content["info"]["kwargs"]["start"],
high=scan_status_msg.content["info"]["kwargs"]["stop"],
scan_time=scan_status_msg.content["info"]["kwargs"]["scan_time"],
p_kink=scan_status_msg.content["info"]["kwargs"]["p_kink"],
e_kink=scan_status_msg.content["info"]["kwargs"]["e_kink"],
)
assert mock_trig_settings.call_args == mock.call(
enable_low=scan_status_msg.content["info"]["kwargs"]["xrd_enable_low"],
enable_high=scan_status_msg.content["info"]["kwargs"][
"xrd_enable_high"
],
exp_time_low=scan_status_msg.content["info"]["kwargs"]["exp_time_low"],
exp_time_high=scan_status_msg.content["info"]["kwargs"][
"exp_time_high"
],
cycle_low=scan_status_msg.content["info"]["kwargs"]["cycle_low"],
cycle_high=scan_status_msg.content["info"]["kwargs"]["cycle_high"],
)
assert mock_set_scan_control_settings.call_args == mock.call(
mode=ScanControlMode.ADVANCED,
scan_duration=scan_status_msg.content["info"]["kwargs"][
"scan_duration"
],
)
=======
# # Ensure that ScanControlLoadMessage is set to SUCCESS
# mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.SUCCESS
# with (
@@ -701,4 +570,3 @@ def test_unstage(mock_bragg):
# "scan_duration"
# ],
# )
>>>>>>> Stashed changes

View File

@@ -49,6 +49,7 @@ def get_instructions(request, ScanStubStatusMock):
def test_xas_simple_scan(scan_assembler, ScanStubStatusMock):
request = scan_assembler(XASSimpleScan, start=0, stop=5, scan_time=1, scan_duration=10)
request.device_manager.add_device("nidaq")
reference_commands = get_instructions(request, ScanStubStatusMock)
assert reference_commands == [
@@ -70,7 +71,7 @@ def test_xas_simple_scan(scan_assembler, ScanStubStatusMock):
"monitored": [],
"baseline": [],
"on_request": [],
"async": [],
"async": ["nidaq"],
},
"num_points": None,
"positions": [0.0, 5.0],
@@ -78,6 +79,7 @@ def test_xas_simple_scan(scan_assembler, ScanStubStatusMock):
"scan_type": "fly",
},
),
DeviceInstructionMessage(metadata={}, device="nidaq", action="stage", parameter={}),
DeviceInstructionMessage(
metadata={},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
@@ -104,7 +106,7 @@ def test_xas_simple_scan(scan_assembler, ScanStubStatusMock):
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
action="pre_scan",
parameter={},
),
@@ -130,7 +132,7 @@ def test_xas_simple_scan(scan_assembler, ScanStubStatusMock):
"fake_complete",
DeviceInstructionMessage(
metadata={},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
action="unstage",
parameter={},
),
@@ -160,6 +162,7 @@ def test_xas_simple_scan_with_xrd(scan_assembler, ScanStubStatusMock):
exp_time_high=3,
cycle_high=4,
)
request.device_manager.add_device("nidaq")
reference_commands = get_instructions(request, ScanStubStatusMock)
assert reference_commands == [
@@ -181,7 +184,7 @@ def test_xas_simple_scan_with_xrd(scan_assembler, ScanStubStatusMock):
"monitored": [],
"baseline": [],
"on_request": [],
"async": [],
"async": ["nidaq"],
},
"num_points": None,
"positions": [0.0, 5.0],
@@ -189,6 +192,7 @@ def test_xas_simple_scan_with_xrd(scan_assembler, ScanStubStatusMock):
"scan_type": "fly",
},
),
DeviceInstructionMessage(metadata={}, device="nidaq", action="stage", parameter={}),
DeviceInstructionMessage(
metadata={},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
@@ -215,7 +219,7 @@ def test_xas_simple_scan_with_xrd(scan_assembler, ScanStubStatusMock):
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
action="pre_scan",
parameter={},
),
@@ -241,7 +245,7 @@ def test_xas_simple_scan_with_xrd(scan_assembler, ScanStubStatusMock):
"fake_complete",
DeviceInstructionMessage(
metadata={},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
action="unstage",
parameter={},
),
@@ -265,6 +269,7 @@ def test_xas_advanced_scan(scan_assembler, ScanStubStatusMock):
p_kink=50,
e_kink=8500,
)
request.device_manager.add_device("nidaq")
reference_commands = get_instructions(request, ScanStubStatusMock)
assert reference_commands == [
@@ -286,7 +291,7 @@ def test_xas_advanced_scan(scan_assembler, ScanStubStatusMock):
"monitored": [],
"baseline": [],
"on_request": [],
"async": [],
"async": ["nidaq"],
},
"num_points": None,
"positions": [8000.0, 9000.0],
@@ -294,6 +299,7 @@ def test_xas_advanced_scan(scan_assembler, ScanStubStatusMock):
"scan_type": "fly",
},
),
DeviceInstructionMessage(metadata={}, device="nidaq", action="stage", parameter={}),
DeviceInstructionMessage(
metadata={},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
@@ -320,7 +326,7 @@ def test_xas_advanced_scan(scan_assembler, ScanStubStatusMock):
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
action="pre_scan",
parameter={},
),
@@ -346,7 +352,7 @@ def test_xas_advanced_scan(scan_assembler, ScanStubStatusMock):
"fake_complete",
DeviceInstructionMessage(
metadata={},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
action="unstage",
parameter={},
),
@@ -378,6 +384,7 @@ def test_xas_advanced_scan_with_xrd(scan_assembler, ScanStubStatusMock):
exp_time_high=3,
cycle_high=4,
)
request.device_manager.add_device("nidaq")
reference_commands = get_instructions(request, ScanStubStatusMock)
assert reference_commands == [
@@ -399,7 +406,7 @@ def test_xas_advanced_scan_with_xrd(scan_assembler, ScanStubStatusMock):
"monitored": [],
"baseline": [],
"on_request": [],
"async": [],
"async": ["nidaq"],
},
"num_points": None,
"positions": [8000.0, 9000.0],
@@ -407,6 +414,7 @@ def test_xas_advanced_scan_with_xrd(scan_assembler, ScanStubStatusMock):
"scan_type": "fly",
},
),
DeviceInstructionMessage(metadata={}, device="nidaq", action="stage", parameter={}),
DeviceInstructionMessage(
metadata={},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
@@ -433,7 +441,7 @@ def test_xas_advanced_scan_with_xrd(scan_assembler, ScanStubStatusMock):
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
action="pre_scan",
parameter={},
),
@@ -459,7 +467,7 @@ def test_xas_advanced_scan_with_xrd(scan_assembler, ScanStubStatusMock):
"fake_complete",
DeviceInstructionMessage(
metadata={},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
action="unstage",
parameter={},
),