diff --git a/tests/tests_devices/test_mo1_bragg.py b/tests/tests_devices/test_mo1_bragg.py index 4aef71f..f3d8a1e 100644 --- a/tests/tests_devices/test_mo1_bragg.py +++ b/tests/tests_devices/test_mo1_bragg.py @@ -2,10 +2,12 @@ import os import threading import time +from dataclasses import fields from unittest import mock import ophyd import pytest +from bec_lib.messages import ScanStatusMessage from ophyd.utils import LimitError from ophyd_devices.tests.utils import MockPV @@ -123,33 +125,103 @@ def test_set_xtal(mock_bragg): assert dev.crystal.xtal_enum.get() == 1 -def test_setup_simple_xas_scan(mock_bragg): +def test_set_xas_settings(mock_bragg): dev = mock_bragg - dev.scan_control.scan_status._read_pv.mock_data = 1 - dev.setup_simple_xas_scan(low=0.5, high=1, scan_time=0.1, mode=0, scan_duration=10) - - assert dev.scan_settings.xrd_enable_hi_enum.get() == False - assert dev.scan_settings.xrd_enable_lo_enum.get() == False - - assert dev.scan_settings.s_scan_energy_hi.get() == 1 + dev.move_type.set(MoveType.ENERGY) + dev.set_xas_settings(low=0.5, high=1, scan_time=0.1) assert dev.scan_settings.s_scan_energy_lo.get() == 0.5 + assert dev.scan_settings.s_scan_energy_hi.get() == 1 assert dev.scan_settings.s_scan_scantime.get() == 0.1 + dev.move_type.set(MoveType.ANGLE) + dev.set_xas_settings(low=10, high=20, scan_time=1) + assert dev.scan_settings.s_scan_angle_lo.get() == 10 + assert dev.scan_settings.s_scan_angle_hi.get() == 20 + assert dev.scan_settings.s_scan_scantime.get() == 1 + +def test_set_xrd_settings(mock_bragg): + dev = mock_bragg + dev.set_xrd_settings( + enable_low=True, + enable_high=False, + num_trigger_low=1, + num_trigger_high=7, + exp_time_low=1, + exp_time_high=3, + cycle_low=1, + cycle_high=5, + ) + assert dev.scan_settings.xrd_enable_lo_enum.get() == True + assert dev.scan_settings.xrd_enable_hi_enum.get() == False + assert dev.scan_settings.xrd_n_trigger_lo.get() == 1 + assert dev.scan_settings.xrd_n_trigger_hi.get() == 7 + assert dev.scan_settings.xrd_time_lo.get() == 1 + assert dev.scan_settings.xrd_time_hi.get() == 3 + assert dev.scan_settings.xrd_every_n_lo.get() == 1 + assert dev.scan_settings.xrd_every_n_hi.get() == 5 + + +def test_set_control_settings(mock_bragg): + dev = mock_bragg + dev.set_scan_control_settings(mode=ScanControlMode.SIMPLE, scan_duration=10) assert dev.scan_control.scan_mode_enum.get() == ScanControlMode.SIMPLE assert dev.scan_control.scan_duration.get() == 10 + dev.set_scan_control_settings(mode=ScanControlMode.ADVANCED, scan_duration=5) + assert dev.scan_control.scan_mode_enum.get() == ScanControlMode.ADVANCED + assert dev.scan_control.scan_duration.get() == 5 + + +def test_update_scan_parameters(mock_bragg): + dev = mock_bragg + msg = ScanStatusMessage( + scan_id="my_scan_id", + status="closed", + info={ + "args": { + "start": 0, + "end": 5, + "scan_time": 1, + "scan_duration": 10, + "xrd_enable_low": True, + "xrd_enable_high": False, + "num_trigger_low": 1, + "num_trigger_high": 7, + "exp_time_low": 1, + "exp_time_high": 3, + "cycle_low": 1, + "cycle_high": 5, + } + }, + metadata={}, + ) + mock_bragg.scaninfo.scan_msg = msg + for field in fields(dev.scan_parameter): + assert getattr(dev.scan_parameter, field.name) == None + dev._update_scan_parameter() + for field in fields(dev.scan_parameter): + assert getattr(dev.scan_parameter, field.name) == msg.content["info"]["args"].get( + field.name, None + ) def test_kickoff_scan(mock_bragg): dev = mock_bragg + dev.scan_control.scan_status._read_pv.mock_data = ScanControlScanStatus.READY + dev.scan_control.scan_duration._read_pv.mock_data = 1 dev.scan_control.scan_start_timer._read_pv.mock_data = 0 - dev.scan_control.scan_status._read_pv.mock_data = 1 - dev.setup_simple_xas_scan(low=0.5, high=1, scan_time=0.1, mode=0, scan_duration=10) - # Start time mode - dev.kickoff() - assert dev.scan_control.scan_start_timer.get() == 1 dev.scan_control.scan_start_infinite._read_pv.mock_data = 0 - # Start infinite mode - dev.setup_simple_xas_scan(low=0.5, high=1, scan_time=0.1, mode=0, scan_duration=0) + status = dev.kickoff() + assert status.done is False + dev.scan_control.scan_status._read_pv.mock_data = ScanControlScanStatus.RUNNING + time.sleep(0.2) + assert status.done is True + assert dev.scan_control.scan_start_timer.get() == 1 + + dev.scan_control.scan_duration._read_pv.mock_data = 0 + dev.scan_control.scan_start_timer._read_pv.mock_data = 0 + dev.scan_control.scan_start_infinite._read_pv.mock_data = 0 + dev.scan_control.scan_start_infinite._read_pv.mock_data = 0 + status = dev.kickoff() dev.kickoff() assert dev.scan_control.scan_start_infinite.get() == 1 @@ -179,64 +251,107 @@ def test_complete(mock_bragg): def test_unstage(mock_bragg): mock_bragg.scan_control.scan_val_reset._read_pv.mock_data = 0 + mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.SUCCESS mock_bragg.unstage() assert mock_bragg.scan_control.scan_val_reset.get() == 1 - # Ensure PV is also called when stopped - mock_bragg._stopped = True mock_bragg.scan_control.scan_val_reset._read_pv.mock_data = 0 + mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.PENDING mock_bragg.unstage() - assert mock_bragg.scan_control.scan_val_reset.get() == 1 - assert mock_bragg._stopped == False + assert mock_bragg.scan_control.scan_val_reset.get() == 0 def test_stage(mock_bragg): # Test unknown scan type first - mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.PENDING + mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.SUCCESS + msg = ScanStatusMessage( + scan_id="my_scan_id", + status="closed", + info={ + "args": { + "start": 0, + "end": 5, + "scan_time": 1, + "scan_duration": 10, + "xrd_enable_low": True, + "xrd_enable_high": False, + "num_trigger_low": 1, + "num_trigger_high": 7, + "exp_time_low": 1, + "exp_time_high": 3, + "cycle_low": 1, + "cycle_high": 5, + } + }, + metadata={}, + ) + mock_bragg.scaninfo.scan_msg = msg mock_bragg.scaninfo.scan_msg.content["info"].update({"scan_name": "unknown_fly_scan"}) - with pytest.raises(Mo1BraggError): - mock_bragg.stage() - - # Test simple XAS scan - mock_bragg.scaninfo.scan_msg.content["info"].update({"scan_name": "energy_oscillation_scan"}) - start = 0 - end = 5 - scan_time = 1 - scan_duration = 10 - - # Ensure side effect sets parameters - def _get_scaninfo(): - mock_bragg.start = 0 - mock_bragg.end = 5 - mock_bragg.scan_time = 1 - mock_bragg.scan_duration = 10 - with ( - mock.patch.object(mock_bragg, "_get_scaninfo_parameters", side_effect=_get_scaninfo), - mock.patch.object(mock_bragg, "setup_simple_xas_scan") as mock_setup_simple_xas, mock.patch.object(mock_bragg.scaninfo, "load_scan_metadata") as mock_load_scan_metadata, - mock.patch.object( - mock_bragg.scan_control.scan_msg, - "get", - side_effect=[ - ScanControlLoadMessage.PENDING, - ScanControlLoadMessage.PENDING, - ScanControlLoadMessage.SUCCESS, - ], - ), + mock.patch.object(mock_bragg, "_check_scan_msg") as mock_check_scan_msg, ): - mock_bragg.stage() - assert mock_load_scan_metadata.call_count == 1 - assert mock_setup_simple_xas.call_args == mock.call( - low=start, - high=end, - scan_time=scan_time, - mode=ScanControlMode.SIMPLE, - scan_duration=scan_duration, - ) - - with mock.patch.object(mock_bragg, "on_stage") as mock_unstage: - mock_bragg._staged = ophyd.Staged.yes - with pytest.raises(ophyd.utils.errors.RedundantStaging): + with pytest.raises(Mo1BraggError): mock_bragg.stage() - assert mock_unstage.call_count == 0 + assert mock_check_scan_msg.call_count == 1 + assert mock_load_scan_metadata.call_count == 1 + # Test simple XAS scan + mock_bragg.scaninfo.scan_msg.content["info"].update({"scan_name": "xas_simple_scan"}) + + with ( + mock.patch.object(mock_bragg, "set_xas_settings") as mock_xas_settings, + mock.patch.object(mock_bragg, "set_xrd_settings") as mock_xrd_settings, + mock.patch.object( + mock_bragg, "set_scan_control_settings" + ) as mock_set_scan_control_settings, + ): + mock_bragg.stage() + assert mock_xas_settings.call_args == mock.call( + low=msg.content["info"]["args"]["start"], + high=msg.content["info"]["args"]["end"], + scan_time=msg.content["info"]["args"]["scan_time"], + ) + assert mock_xrd_settings.call_args == mock.call( + enable_low=False, + enable_high=False, + num_trigger_low=0, + num_trigger_high=0, + exp_time_low=0, + exp_time_high=0, + cycle_low=0, + cycle_high=0, + ) + assert mock_set_scan_control_settings.call_args == mock.call( + mode=ScanControlMode.SIMPLE, + scan_duration=msg.content["info"]["args"]["scan_duration"], + ) + mock_bragg.scaninfo.scan_msg.content["info"].update( + {"scan_name": "xas_simple_scan_with_xrd"} + ) + # Unstage mock_bragg to reset _staged + mock_bragg.unstage() + time.sleep(0.1) + # Test simple XAS scan with XRD + mock_bragg.stage() + assert mock_xas_settings.call_args == mock.call( + low=msg.content["info"]["args"]["start"], + high=msg.content["info"]["args"]["end"], + scan_time=msg.content["info"]["args"]["scan_time"], + ) + assert mock_xrd_settings.call_args == mock.call( + enable_low=msg.content["info"]["args"]["xrd_enable_low"], + enable_high=msg.content["info"]["args"]["xrd_enable_high"], + num_trigger_low=msg.content["info"]["args"]["num_trigger_low"], + num_trigger_high=msg.content["info"]["args"]["num_trigger_high"], + exp_time_low=msg.content["info"]["args"]["exp_time_low"], + exp_time_high=msg.content["info"]["args"]["exp_time_high"], + cycle_low=msg.content["info"]["args"]["cycle_low"], + cycle_high=msg.content["info"]["args"]["cycle_high"], + ) + assert mock_set_scan_control_settings.call_args == mock.call( + mode=ScanControlMode.SIMPLE, + scan_duration=msg.content["info"]["args"]["scan_duration"], + ) + # Test redundant staging + with pytest.raises(ophyd.utils.errors.RedundantStaging): + mock_bragg.stage() diff --git a/tests/tests_scans/test_mono_bragg_scans.py b/tests/tests_scans/test_mono_bragg_scans.py index 108e22d..235f3dc 100644 --- a/tests/tests_scans/test_mono_bragg_scans.py +++ b/tests/tests_scans/test_mono_bragg_scans.py @@ -3,15 +3,15 @@ from unittest import mock from bec_lib.messages import DeviceInstructionMessage from bec_server.device_server.tests.utils import DMMock -from debye_bec.scans import MonoBraggOscillationScan +from debye_bec.scans import XASSimpleScan, XASSimpleScanWithXRD -def test_mono_bragg_oscillation_scan(): +def test_xas_simple_scan(): # create a fake device manager that we can use to add devices device_manager = DMMock() device_manager.add_device("mo1_bragg") - request = MonoBraggOscillationScan( + request = XASSimpleScan( start=0, stop=5, scan_time=1, scan_duration=10, device_manager=device_manager ) request.metadata["RID"] = "my_test_request_id" @@ -52,7 +52,7 @@ def test_mono_bragg_oscillation_scan(): }, "num_points": None, "positions": [0.0, 5.0], - "scan_name": "energy_oscillation_scan", + "scan_name": "xas_simple_scan", "scan_type": "fly", }, ), @@ -123,19 +123,179 @@ def test_mono_bragg_oscillation_scan(): ), DeviceInstructionMessage( metadata={"readout_priority": "monitored", "DIID": 11, "RID": "my_test_request_id"}, - device=[], - action="complete", - parameter={}, + device=None, + action="wait", + parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"}, ), DeviceInstructionMessage( metadata={"readout_priority": "monitored", "DIID": 12, "RID": "my_test_request_id"}, device=None, - action="unstage", + action="complete", parameter={}, ), DeviceInstructionMessage( metadata={"readout_priority": "monitored", "DIID": 13, "RID": "my_test_request_id"}, device=None, + action="unstage", + parameter={}, + ), + DeviceInstructionMessage( + metadata={"readout_priority": "monitored", "DIID": 14, "RID": "my_test_request_id"}, + device=None, + action="close_scan", + parameter={}, + ), + ] + + +def test_xas_simple_scan_with_xrd(): + # create a fake device manager that we can use to add devices + device_manager = DMMock() + device_manager.add_device("mo1_bragg") + + request = XASSimpleScanWithXRD( + start=0, + stop=5, + scan_time=1, + scan_duration=10, + xrd_enable_low=True, + num_trigger_low=1, + exp_time_low=1, + cycle_low=1, + xrd_enable_high=True, + num_trigger_high=2, + exp_time_high=3, + cycle_high=4, + device_manager=device_manager, + ) + request.metadata["RID"] = "my_test_request_id" + with ( + mock.patch.object(request.stubs, "get_req_status", side_effect=[False, True]), + mock.patch.object(request.stubs, "_get_from_rpc", return_value=True), + ): + reference_commands = list(request.run()) + + for cmd in reference_commands: + if not cmd: + continue + if "RID" in cmd.metadata: + cmd.metadata["RID"] = "my_test_request_id" + if "rpc_id" in cmd.parameter: + cmd.parameter["rpc_id"] = "my_test_rpc_id" + + assert reference_commands == [ + None, + None, + DeviceInstructionMessage( + metadata={"readout_priority": "monitored", "DIID": 0, "RID": "my_test_request_id"}, + device=None, + action="scan_report_instruction", + parameter={"device_progress": ["mo1_bragg"]}, + ), + DeviceInstructionMessage( + metadata={"readout_priority": "monitored", "DIID": 1, "RID": "my_test_request_id"}, + device=None, + action="open_scan", + parameter={ + "scan_motors": [], + "readout_priority": { + "monitored": [], + "baseline": [], + "on_request": [], + "async": [], + }, + "num_points": None, + "positions": [0.0, 5.0], + "scan_name": "xas_simple_scan_with_xrd", + "scan_type": "fly", + }, + ), + DeviceInstructionMessage( + metadata={"readout_priority": "monitored", "DIID": 2, "RID": "my_test_request_id"}, + device=None, + action="stage", + parameter={}, + ), + DeviceInstructionMessage( + metadata={"readout_priority": "baseline", "DIID": 3, "RID": "my_test_request_id"}, + device=None, + action="baseline_reading", + parameter={}, + ), + DeviceInstructionMessage( + metadata={ + "readout_priority": "monitored", + "DIID": 4, + "RID": "my_test_request_id", + "response": True, + }, + device="mo1_bragg", + action="rpc", + parameter={ + "device": "mo1_bragg", + "func": "move_type.set", + "rpc_id": "my_test_rpc_id", + "args": ("energy",), + "kwargs": {}, + }, + ), + DeviceInstructionMessage( + metadata={"readout_priority": "monitored", "DIID": 5, "RID": "my_test_request_id"}, + device="mo1_bragg", + action="kickoff", + parameter={"configure": {}, "wait_group": "kickoff"}, + ), + DeviceInstructionMessage( + metadata={"readout_priority": "monitored", "DIID": 6, "RID": "my_test_request_id"}, + device="mo1_bragg", + action="complete", + parameter={}, + ), + DeviceInstructionMessage( + metadata={"readout_priority": "monitored", "DIID": 7, "RID": "my_test_request_id"}, + device=None, + action="read", + parameter={"group": "primary", "wait_group": "readout_primary"}, + ), + DeviceInstructionMessage( + metadata={"readout_priority": "monitored", "DIID": 8, "RID": "my_test_request_id"}, + device=None, + action="wait", + parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"}, + ), + DeviceInstructionMessage( + metadata={"readout_priority": "monitored", "DIID": 9, "RID": "my_test_request_id"}, + device=None, + action="read", + parameter={"group": "primary", "wait_group": "readout_primary"}, + ), + DeviceInstructionMessage( + metadata={"readout_priority": "monitored", "DIID": 10, "RID": "my_test_request_id"}, + device=None, + action="wait", + parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"}, + ), + DeviceInstructionMessage( + metadata={"readout_priority": "monitored", "DIID": 11, "RID": "my_test_request_id"}, + device=None, + action="wait", + parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"}, + ), + DeviceInstructionMessage( + metadata={"readout_priority": "monitored", "DIID": 12, "RID": "my_test_request_id"}, + device=None, + action="complete", + parameter={}, + ), + DeviceInstructionMessage( + metadata={"readout_priority": "monitored", "DIID": 13, "RID": "my_test_request_id"}, + device=None, + action="unstage", + parameter={}, + ), + DeviceInstructionMessage( + metadata={"readout_priority": "monitored", "DIID": 14, "RID": "my_test_request_id"}, + device=None, action="close_scan", parameter={}, ),