test: add cont grid scan tests #233

Merged
wakonig_k merged 1 commits from test/cont_grid into main 2026-06-24 15:33:29 +02:00
3 changed files with 136 additions and 47 deletions
+10 -47
View File
@@ -75,58 +75,21 @@ class ContGrid(ScanBase):
}
def __init__(
# fmt: off
self,
fast_axis: Annotated[
DeviceBase,
ScanArgument(display_name="Fast Axis", description="Axis with continuous motion."),
],
fast_start: Annotated[
float,
ScanArgument(
display_name="Fast Start",
description="Start position for measurement points of the fast axis.",
),
],
fast_end: Annotated[
float,
ScanArgument(
display_name="Fast End",
description="End position for measurement points of the fast axis.",
),
],
fast_step_size: Annotated[
float,
ScanArgument(
display_name="Fast Step Size",
description="Step size for points of the continuous motion axis.",
),
],
stepper_axis: Annotated[
DeviceBase,
ScanArgument(
display_name="Step Axis",
description="Step axis of the grid scan, stepping through the lines.",
),
],
stepper_start: Annotated[
float,
ScanArgument(display_name="Step Start", description="Start position of the step axis."),
],
stepper_stop: Annotated[
float,
ScanArgument(display_name="Step Stop", description="End position of the step axis."),
],
stepper_step_size: Annotated[
float,
ScanArgument(
display_name="Step Step Size",
description="Step size of the step axis in units of the motor.",
),
],
fast_axis: Annotated[DeviceBase, ScanArgument(display_name="Fast Axis", description="Axis with continuous motion.")],
fast_start: Annotated[float, ScanArgument(display_name="Fast Start", description="Start position for measurement points of the fast axis.")],
fast_end: Annotated[float, ScanArgument(display_name="Fast End", description="End position for measurement points of the fast axis.")],
fast_step_size: Annotated[float, ScanArgument(display_name="Fast Step Size", description="Step size for points of the continuous motion axis.")],
stepper_axis: Annotated[DeviceBase, ScanArgument(display_name="Step Axis", description="Step axis of the grid scan, stepping through the lines.")],
stepper_start: Annotated[float, ScanArgument(display_name="Step Start", description="Start position of the step axis.")],
stepper_stop: Annotated[float, ScanArgument(display_name="Step Stop", description="End position of the step axis.")],
stepper_step_size: Annotated[float, ScanArgument(display_name="Step Step Size", description="Step size of the step axis in units of the motor.")],
exp_time: DefaultArgType.ExposureTime,
relative: DefaultArgType.Relative = False,
fast_axis_always_in_pos_dir: bool = True,
**kwargs,
# fmt: on
):
"""
Continuous grid scan with 2-axis. The scan requires the fast axis to properly implement base velocity as well as high velocity and high acceleration time
+1
View File
@@ -0,0 +1 @@
from bec_server.scan_server.tests.scan_fixtures import *
+125
View File
@@ -0,0 +1,125 @@
from unittest import mock
import numpy as np
import pytest
from bec_server.scan_server.tests.scan_fixtures import MockCustomDevice
from bec_server.scan_server.tests.scan_hook_tests import (
DEFAULT_HOOK_TESTS,
PREMOVE_HOOK_TESTS,
run_scan_tests,
)
CONT_GRID_HOOK_TESTS = [*DEFAULT_HOOK_TESTS, *PREMOVE_HOOK_TESTS]
def _assemble_cont_grid_scan(v4_scan_assembler, device_manager):
custom_samx = MockCustomDevice(
"samx",
device_info={
"signals": {
"readback": {
"obj_name": "samx",
"kind_str": "hinted",
"describe": {"precision": 3},
},
"velocity": {
"obj_name": "samx_velocity",
"kind_str": "config",
"describe": {"precision": 3},
},
"acceleration": {
"obj_name": "samx_acceleration",
"kind_str": "config",
"describe": {"precision": 3},
},
"base_velocity": {
"obj_name": "samx_base_velocity",
"kind_str": "config",
"describe": {"precision": 3},
},
}
},
signal_read_values={
"samx": 0.0,
"samx_velocity": 10.0,
"samx_acceleration": 2.0,
"samx_base_velocity": 0.0,
},
)
custom_samy = MockCustomDevice(
"samy",
device_info={
"signals": {
"readback": {"obj_name": "samy", "kind_str": "hinted", "describe": {"precision": 3}}
}
},
signal_read_values={"samy": 0.0},
)
custom_ddg1 = MockCustomDevice(
"ddg1",
device_info={
"signals": {
"readback": {"obj_name": "ddg1", "kind_str": "hinted", "describe": {"precision": 3}}
}
},
signal_read_values={"ddg1": 0.0},
)
custom_ddg1.get_shutter_to_open_delay = mock.MagicMock(return_value=2e-3)
custom_mcs = MockCustomDevice(
"mcs",
device_info={
"signals": {
"readback": {"obj_name": "mcs", "kind_str": "hinted", "describe": {"precision": 3}}
}
},
signal_read_values={"mcs": 0.0},
)
device_manager.add_device(custom_samx, replace=True)
device_manager.add_device(custom_samy, replace=True)
device_manager.add_device(custom_mcs, replace=True)
device_manager.add_device(custom_ddg1, replace=True)
return v4_scan_assembler("cont_grid", "samx", -1.0, 1.0, 1, "samy", -2.0, 2.0, 2, exp_time=0.1)
@pytest.mark.parametrize(("hook_name", "hook_tests"), CONT_GRID_HOOK_TESTS)
def test_cont_grid_default_hooks(
v4_scan_assembler, device_manager, nth_done_status_mock, hook_name, hook_tests
):
scan = _assemble_cont_grid_scan(v4_scan_assembler, device_manager)
run_scan_tests(scan, [(hook_name, hook_tests)], nth_done_status_mock=nth_done_status_mock)
def test_cont_grid_post_scan_waits_for_completion_and_moves_back_when_relative(
v4_scan_assembler, device_manager, nth_done_status_mock
):
scan = _assemble_cont_grid_scan(v4_scan_assembler, device_manager)
completion_status = nth_done_status_mock(resolve_after=3)
scan.relative = True
scan.start_positions = [1.2, -0.7]
scan.actions.complete_all_devices = mock.MagicMock(return_value=completion_status)
scan.components.move_and_wait = mock.MagicMock()
scan._restore_motor_properties = mock.MagicMock()
scan.post_scan()
scan._restore_motor_properties.assert_called_once_with()
scan.actions.complete_all_devices.assert_called_once_with(wait=False)
scan.components.move_and_wait.assert_called_once_with(scan.motors, scan.start_positions)
assert completion_status.wait_calls == 1
def test_cont_grid_prepare_scan_keeps_generated_positions_stable(v4_scan_assembler, device_manager):
scan = _assemble_cont_grid_scan(v4_scan_assembler, device_manager)
scan.prepare_scan()
assert np.array_equal(scan.positions, np.array([[1.0, -2.0], [1.0, 2.0]]))
assert scan.scan_info.frames_per_trigger == 2
assert scan._cont_motor_params["num_lines"] == 2
assert scan.scan_info.additional_scan_parameters["num_lines"] == 2
assert np.array_equal(
scan.scan_info.additional_scan_parameters["computed_positions"],
np.array([[-1.0, -2.0], [1.0, -2.0], [-1.0, 2.0], [1.0, 2.0]]),
)