diff --git a/csaxs_bec/scans/scans_v4/cont_grid.py b/csaxs_bec/scans/scans_v4/cont_grid.py index 2863fbb..9ec1c38 100644 --- a/csaxs_bec/scans/scans_v4/cont_grid.py +++ b/csaxs_bec/scans/scans_v4/cont_grid.py @@ -75,58 +75,21 @@ class ContGrid(ScanBase): } def __init__( + # fmt: off self, - fast_axis: Annotated[ - DeviceBase, - ScanArgument(display_name="Fast Axis", description="Axis with continuous motion."), - ], - fast_start: Annotated[ - float, - ScanArgument( - display_name="Fast Start", - description="Start position for measurement points of the fast axis.", - ), - ], - fast_end: Annotated[ - float, - ScanArgument( - display_name="Fast End", - description="End position for measurement points of the fast axis.", - ), - ], - fast_step_size: Annotated[ - float, - ScanArgument( - display_name="Fast Step Size", - description="Step size for points of the continuous motion axis.", - ), - ], - stepper_axis: Annotated[ - DeviceBase, - ScanArgument( - display_name="Step Axis", - description="Step axis of the grid scan, stepping through the lines.", - ), - ], - stepper_start: Annotated[ - float, - ScanArgument(display_name="Step Start", description="Start position of the step axis."), - ], - stepper_stop: Annotated[ - float, - ScanArgument(display_name="Step Stop", description="End position of the step axis."), - ], - stepper_step_size: Annotated[ - float, - ScanArgument( - display_name="Step Step Size", - description="Step size of the step axis in units of the motor.", - ), - ], + fast_axis: Annotated[DeviceBase, ScanArgument(display_name="Fast Axis", description="Axis with continuous motion.")], + fast_start: Annotated[float, ScanArgument(display_name="Fast Start", description="Start position for measurement points of the fast axis.")], + fast_end: Annotated[float, ScanArgument(display_name="Fast End", description="End position for measurement points of the fast axis.")], + fast_step_size: Annotated[float, ScanArgument(display_name="Fast Step Size", description="Step size for points of the continuous motion axis.")], + stepper_axis: Annotated[DeviceBase, ScanArgument(display_name="Step Axis", description="Step axis of the grid scan, stepping through the lines.")], + stepper_start: Annotated[float, ScanArgument(display_name="Step Start", description="Start position of the step axis.")], + stepper_stop: Annotated[float, ScanArgument(display_name="Step Stop", description="End position of the step axis.")], + stepper_step_size: Annotated[float, ScanArgument(display_name="Step Step Size", description="Step size of the step axis in units of the motor.")], exp_time: DefaultArgType.ExposureTime, relative: DefaultArgType.Relative = False, fast_axis_always_in_pos_dir: bool = True, **kwargs, + # fmt: on ): """ Continuous grid scan with 2-axis. The scan requires the fast axis to properly implement base velocity as well as high velocity and high acceleration time diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..5868e25 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1 @@ +from bec_server.scan_server.tests.scan_fixtures import * diff --git a/tests/tests_scans/test_cont_grid_scan.py b/tests/tests_scans/test_cont_grid_scan.py new file mode 100644 index 0000000..90ece7d --- /dev/null +++ b/tests/tests_scans/test_cont_grid_scan.py @@ -0,0 +1,125 @@ +from unittest import mock + +import numpy as np +import pytest +from bec_server.scan_server.tests.scan_fixtures import MockCustomDevice +from bec_server.scan_server.tests.scan_hook_tests import ( + DEFAULT_HOOK_TESTS, + PREMOVE_HOOK_TESTS, + run_scan_tests, +) + +CONT_GRID_HOOK_TESTS = [*DEFAULT_HOOK_TESTS, *PREMOVE_HOOK_TESTS] + + +def _assemble_cont_grid_scan(v4_scan_assembler, device_manager): + custom_samx = MockCustomDevice( + "samx", + device_info={ + "signals": { + "readback": { + "obj_name": "samx", + "kind_str": "hinted", + "describe": {"precision": 3}, + }, + "velocity": { + "obj_name": "samx_velocity", + "kind_str": "config", + "describe": {"precision": 3}, + }, + "acceleration": { + "obj_name": "samx_acceleration", + "kind_str": "config", + "describe": {"precision": 3}, + }, + "base_velocity": { + "obj_name": "samx_base_velocity", + "kind_str": "config", + "describe": {"precision": 3}, + }, + } + }, + signal_read_values={ + "samx": 0.0, + "samx_velocity": 10.0, + "samx_acceleration": 2.0, + "samx_base_velocity": 0.0, + }, + ) + custom_samy = MockCustomDevice( + "samy", + device_info={ + "signals": { + "readback": {"obj_name": "samy", "kind_str": "hinted", "describe": {"precision": 3}} + } + }, + signal_read_values={"samy": 0.0}, + ) + custom_ddg1 = MockCustomDevice( + "ddg1", + device_info={ + "signals": { + "readback": {"obj_name": "ddg1", "kind_str": "hinted", "describe": {"precision": 3}} + } + }, + signal_read_values={"ddg1": 0.0}, + ) + custom_ddg1.get_shutter_to_open_delay = mock.MagicMock(return_value=2e-3) + custom_mcs = MockCustomDevice( + "mcs", + device_info={ + "signals": { + "readback": {"obj_name": "mcs", "kind_str": "hinted", "describe": {"precision": 3}} + } + }, + signal_read_values={"mcs": 0.0}, + ) + device_manager.add_device(custom_samx, replace=True) + device_manager.add_device(custom_samy, replace=True) + device_manager.add_device(custom_mcs, replace=True) + device_manager.add_device(custom_ddg1, replace=True) + return v4_scan_assembler("cont_grid", "samx", -1.0, 1.0, 1, "samy", -2.0, 2.0, 2, exp_time=0.1) + + +@pytest.mark.parametrize(("hook_name", "hook_tests"), CONT_GRID_HOOK_TESTS) +def test_cont_grid_default_hooks( + v4_scan_assembler, device_manager, nth_done_status_mock, hook_name, hook_tests +): + scan = _assemble_cont_grid_scan(v4_scan_assembler, device_manager) + + run_scan_tests(scan, [(hook_name, hook_tests)], nth_done_status_mock=nth_done_status_mock) + + +def test_cont_grid_post_scan_waits_for_completion_and_moves_back_when_relative( + v4_scan_assembler, device_manager, nth_done_status_mock +): + scan = _assemble_cont_grid_scan(v4_scan_assembler, device_manager) + + completion_status = nth_done_status_mock(resolve_after=3) + scan.relative = True + scan.start_positions = [1.2, -0.7] + scan.actions.complete_all_devices = mock.MagicMock(return_value=completion_status) + scan.components.move_and_wait = mock.MagicMock() + scan._restore_motor_properties = mock.MagicMock() + + scan.post_scan() + + scan._restore_motor_properties.assert_called_once_with() + scan.actions.complete_all_devices.assert_called_once_with(wait=False) + scan.components.move_and_wait.assert_called_once_with(scan.motors, scan.start_positions) + assert completion_status.wait_calls == 1 + + +def test_cont_grid_prepare_scan_keeps_generated_positions_stable(v4_scan_assembler, device_manager): + scan = _assemble_cont_grid_scan(v4_scan_assembler, device_manager) + + scan.prepare_scan() + + assert np.array_equal(scan.positions, np.array([[1.0, -2.0], [1.0, 2.0]])) + assert scan.scan_info.frames_per_trigger == 2 + assert scan._cont_motor_params["num_lines"] == 2 + assert scan.scan_info.additional_scan_parameters["num_lines"] == 2 + assert np.array_equal( + scan.scan_info.additional_scan_parameters["computed_positions"], + np.array([[-1.0, -2.0], [1.0, -2.0], [-1.0, 2.0], [1.0, 2.0]]), + )