0
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2025-07-13 19:21:50 +02:00
Files
bec_widgets/tests/unit_tests/test_scan_control.py

580 lines
23 KiB
Python

# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
import pytest
from bec_lib.endpoints import MessageEndpoints
from bec_lib.messages import AvailableResourceMessage, ScanHistoryMessage
from qtpy.QtCore import QModelIndex, Qt
from bec_widgets.utils.forms_from_types.items import StrFormItem
from bec_widgets.utils.widget_io import WidgetIO
from bec_widgets.widgets.control.scan_control import ScanControl
from .client_mocks import mocked_client
# pylint: disable=no-member
# pylint: disable=missing-function-docstring
# pylint: disable=redefined-outer-name
# pylint: disable=protected-access
available_scans_message = AvailableResourceMessage(
resource={
"line_scan": {
"class": "LineScan",
"base_class": "ScanBase",
"arg_input": {"device": "device", "start": "float", "stop": "float"},
"gui_config": {
"scan_class_name": "LineScan",
"arg_group": {
"name": "Scan Arguments",
"bundle": 3,
"arg_inputs": {"device": "device", "start": "float", "stop": "float"},
"inputs": [
{
"arg": True,
"name": "device",
"type": "device",
"display_name": "Device",
"tooltip": None,
"default": None,
"expert": False,
},
{
"arg": True,
"name": "start",
"type": "float",
"display_name": "Start",
"tooltip": None,
"default": None,
"expert": False,
},
{
"arg": True,
"name": "stop",
"type": "float",
"display_name": "Stop",
"tooltip": None,
"default": None,
"expert": False,
},
],
"min": 1,
"max": None,
},
"kwarg_groups": [
{
"name": "Movement Parameters",
"inputs": [
{
"arg": False,
"name": "steps",
"type": "int",
"display_name": "Steps",
"tooltip": "Number of steps",
"default": None,
"expert": False,
},
{
"arg": False,
"name": "relative",
"type": "bool",
"display_name": "Relative",
"tooltip": "If True, the start and end positions are relative to the current position",
"default": False,
"expert": False,
},
],
},
{
"name": "Acquisition Parameters",
"inputs": [
{
"arg": False,
"name": "exp_time",
"type": "float",
"display_name": "Exp Time",
"tooltip": "Exposure time in s",
"default": 0,
"expert": False,
},
{
"arg": False,
"name": "burst_at_each_point",
"type": "int",
"display_name": "Burst At Each Point",
"tooltip": "Number of acquisition per point",
"default": 1,
"expert": False,
},
],
},
],
},
"required_kwargs": ["steps", "relative"],
"arg_bundle_size": {"bundle": 3, "min": 1, "max": None},
},
"grid_scan": {
"class": "Scan",
"base_class": "ScanBase",
"arg_input": {"device": "device", "start": "float", "stop": "float", "steps": "int"},
"gui_config": {
"scan_class_name": "Scan",
"arg_group": {
"name": "Scan Arguments",
"bundle": 4,
"arg_inputs": {
"device": "device",
"start": "float",
"stop": "float",
"steps": "int",
},
"inputs": [
{
"arg": True,
"name": "device",
"type": "device",
"display_name": "Device",
"tooltip": None,
"default": None,
"expert": False,
},
{
"arg": True,
"name": "start",
"type": "float",
"display_name": "Start",
"tooltip": None,
"default": None,
"expert": False,
},
{
"arg": True,
"name": "stop",
"type": "float",
"display_name": "Stop",
"tooltip": None,
"default": None,
"expert": False,
},
{
"arg": True,
"name": "steps",
"type": "int",
"display_name": "Steps",
"tooltip": None,
"default": None,
"expert": False,
},
],
"min": 2,
"max": None,
},
"kwarg_groups": [
{
"name": "Scan Parameters",
"inputs": [
{
"arg": False,
"name": "exp_time",
"type": "float",
"display_name": "Exp Time",
"tooltip": "Exposure time in seconds",
"default": 0,
"expert": False,
},
{
"arg": False,
"name": "settling_time",
"type": "float",
"display_name": "Settling Time",
"tooltip": "Settling time in seconds",
"default": 0,
"expert": False,
},
{
"arg": False,
"name": "burst_at_each_point",
"type": "int",
"display_name": "Burst At Each Point",
"tooltip": "Number of exposures at each point",
"default": 1,
"expert": False,
},
{
"arg": False,
"name": "relative",
"type": "bool",
"display_name": "Relative",
"tooltip": "If True, the motors will be moved relative to their current position",
"default": False,
"expert": False,
},
],
}
],
},
"required_kwargs": ["relative"],
"arg_bundle_size": {"bundle": 4, "min": 2, "max": None},
},
"not_supported_scan_class": {"base_class": "NotSupportedScanClass"},
}
)
scan_history = ScanHistoryMessage(
metadata={},
scan_id="79cbef20-9ebe-45bb-a44c-f518be27a25c",
scan_number=1,
dataset_number=1,
file_path="/somepath/scan_1.h5",
exit_status="closed",
start_time=1750618470.936856,
end_time=1750618473.668227,
scan_name="line_scan",
num_points=100,
request_inputs={
"arg_bundle": ["samx", 0.0, 2.0],
"inputs": {},
"kwargs": {
"steps": 10,
"exp_time": 2,
"relative": False,
"system_config": {"file_suffix": None, "file_directory": None},
},
},
)
@pytest.fixture(scope="function")
def scan_control(qtbot, mocked_client): # , mock_dev):
mocked_client.connector.set(MessageEndpoints.available_scans(), available_scans_message)
mocked_client.connector.xadd(
topic=MessageEndpoints.scan_history(), msg_dict={"data": scan_history}
)
widget = ScanControl(client=mocked_client)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
def test_populate_scans(scan_control, mocked_client):
expected_scans = ["line_scan", "grid_scan"]
items = [
scan_control.comboBox_scan_selection.itemText(i)
for i in range(scan_control.comboBox_scan_selection.count())
]
assert scan_control.comboBox_scan_selection.count() == 2
assert sorted(items) == sorted(expected_scans)
def test_current_scan(scan_control, mocked_client):
current_scan = scan_control.current_scan
wrong_scan = "error_scan"
scan_control.current_scan = wrong_scan
assert scan_control.current_scan == current_scan
new_scan = "grid_scan" if current_scan == "line_scan" else "line_scan"
scan_control.current_scan = new_scan
assert scan_control.current_scan == new_scan
@pytest.mark.parametrize("scan_name", ["line_scan", "grid_scan"])
def test_on_scan_selected(scan_control, scan_name):
expected_scan_info = available_scans_message.resource[scan_name]
scan_control.comboBox_scan_selection.setCurrentText(scan_name)
# Check arg_box labels and widgets
for index, (arg_key, arg_value) in enumerate(expected_scan_info["arg_input"].items()):
label = scan_control.arg_box.layout.itemAtPosition(0, index).widget()
assert label.text().lower() == arg_key
for row in range(1, expected_scan_info["arg_bundle_size"]["min"] + 1):
widget = scan_control.arg_box.layout.itemAtPosition(row, index).widget()
assert widget is not None # Confirm that a widget exists
expected_widget_type = scan_control.arg_box.WIDGET_HANDLER.get(arg_value, None)
assert isinstance(widget, expected_widget_type) # Confirm the widget type matches
# Check kwargs boxes
kwargs_group = [param for param in expected_scan_info["gui_config"]["kwarg_groups"]]
print(kwargs_group)
for kwarg_box, kwarg_group in zip(scan_control.kwarg_boxes, kwargs_group):
assert kwarg_box.title() == kwarg_group["name"]
for index, kwarg_info in enumerate(kwarg_group["inputs"]):
label = kwarg_box.layout.itemAtPosition(0, index).widget()
assert label.text() == kwarg_info["display_name"]
widget = kwarg_box.layout.itemAtPosition(1, index).widget()
expected_widget_type = kwarg_box.WIDGET_HANDLER.get(kwarg_info["type"], None)
assert isinstance(widget, expected_widget_type)
@pytest.mark.parametrize("scan_name", ["line_scan", "grid_scan"])
def test_add_remove_bundle(scan_control, scan_name, qtbot):
expected_scan_info = available_scans_message.resource[scan_name]
scan_control.comboBox_scan_selection.setCurrentText(scan_name)
# Initial number of args row
initial_num_of_rows = scan_control.arg_box.count_arg_rows()
assert initial_num_of_rows == expected_scan_info["arg_bundle_size"]["min"]
scan_control.arg_box.button_add_bundle.click()
scan_control.arg_box.button_add_bundle.click()
if expected_scan_info["arg_bundle_size"]["max"] is None:
assert scan_control.arg_box.count_arg_rows() == initial_num_of_rows + 2
# Remove one bundle
scan_control.arg_box.button_remove_bundle.click()
qtbot.wait(200)
assert scan_control.arg_box.count_arg_rows() == initial_num_of_rows + 1
def test_run_line_scan_with_parameters(scan_control, mocked_client):
scan_name = "line_scan"
kwargs = {"exp_time": 0.1, "steps": 10, "relative": True, "burst_at_each_point": 1}
args = {"device": "samx", "start": -5, "stop": 5}
mock_slot = MagicMock()
scan_control.scan_args.connect(mock_slot)
scan_control.comboBox_scan_selection.setCurrentText(scan_name)
# Set kwargs in the UI
for kwarg_box in scan_control.kwarg_boxes:
for widget in kwarg_box.widgets:
if widget.arg_name in kwargs:
WidgetIO.set_value(widget, kwargs[widget.arg_name])
# Set args in the UI
for widget in scan_control.arg_box.widgets:
if widget.arg_name in args:
WidgetIO.set_value(widget, args[widget.arg_name])
# Mock the scan function
mocked_scan_function = MagicMock()
setattr(mocked_client.scans, scan_name, mocked_scan_function)
# Run the scan
scan_control.button_run_scan.click()
# Retrieve the actual arguments passed to the mock
called_args, called_kwargs = mocked_scan_function.call_args
# Check if the scan function was called correctly
expected_device = mocked_client.device_manager.devices.samx
expected_args_list = [expected_device, args["start"], args["stop"]]
assert called_args == tuple(expected_args_list)
assert called_kwargs == kwargs | {"metadata": {"sample_name": ""}}
# Check the emitted signal
mock_slot.assert_called_once()
emitted_args_list = mock_slot.call_args[0][0]
assert len(emitted_args_list) == 3 # Expected 3 arguments for line_scan
assert emitted_args_list == [expected_device, -5.0, 5.0]
def test_run_grid_scan_with_parameters(scan_control, mocked_client):
scan_name = "grid_scan"
kwargs = {"exp_time": 0.2, "settling_time": 0.1, "relative": False, "burst_at_each_point": 2}
args_row1 = {"device": "samx", "start": -10, "stop": 10, "steps": 20}
args_row2 = {"device": "samy", "start": -5, "stop": 5, "steps": 10}
mock_slot = MagicMock()
scan_control.scan_args.connect(mock_slot)
scan_control.comboBox_scan_selection.setCurrentText(scan_name)
# Ensure there are two rows in the arg_box
current_rows = scan_control.arg_box.count_arg_rows()
required_rows = 2
while current_rows < required_rows:
scan_control.arg_box.add_widget_bundle()
current_rows += 1
# Set kwargs in the UI
for kwarg_box in scan_control.kwarg_boxes:
for widget in kwarg_box.widgets:
if widget.arg_name in kwargs:
WidgetIO.set_value(widget, kwargs[widget.arg_name])
# Set args in the UI for both rows
arg_widgets = scan_control.arg_box.widgets # This is a flat list of widgets
num_columns = len(scan_control.arg_box.inputs)
num_rows = int(len(arg_widgets) / num_columns)
assert num_rows == required_rows # We expect 2 rows for grid_scan
# Set values for first row
for i in range(num_columns):
widget = arg_widgets[i]
arg_name = widget.arg_name
if arg_name in args_row1:
WidgetIO.set_value(widget, args_row1[arg_name])
# Set values for second row
for i in range(num_columns):
widget = arg_widgets[num_columns + i] # Next row
arg_name = widget.arg_name
if arg_name in args_row2:
WidgetIO.set_value(widget, args_row2[arg_name])
# Mock the scan function
mocked_scan_function = MagicMock()
setattr(mocked_client.scans, scan_name, mocked_scan_function)
# Run the scan
scan_control.button_run_scan.click()
# Retrieve the actual arguments passed to the mock
called_args, called_kwargs = mocked_scan_function.call_args
# Check if the scan function was called correctly
expected_device1 = mocked_client.device_manager.devices.samx
expected_device2 = mocked_client.device_manager.devices.samy
expected_args_list = [
expected_device1,
args_row1["start"],
args_row1["stop"],
args_row1["steps"],
expected_device2,
args_row2["start"],
args_row2["stop"],
args_row2["steps"],
]
assert called_args == tuple(expected_args_list)
assert called_kwargs == kwargs | {"metadata": {"sample_name": ""}}
# Check the emitted signal
mock_slot.assert_called_once()
emitted_args_list = mock_slot.call_args[0][0]
assert len(emitted_args_list) == 8 # Expected 8 arguments for grid_scan
assert emitted_args_list == expected_args_list
def test_changing_scans_remember_parameters(scan_control, mocked_client):
scan_name = "line_scan"
kwargs = {"exp_time": 0.1, "steps": 10, "relative": True, "burst_at_each_point": 1}
args = {"device": "samx", "start": -5, "stop": 5}
scan_control.comboBox_scan_selection.setCurrentText(scan_name)
# Set kwargs in the UI
for kwarg_box in scan_control.kwarg_boxes:
for widget in kwarg_box.widgets:
for key, value in kwargs.items():
if widget.arg_name == key:
WidgetIO.set_value(widget, value)
break
# Set args in the UI
for widget in scan_control.arg_box.widgets:
for key, value in args.items():
if widget.arg_name == key:
WidgetIO.set_value(widget, value)
break
scan_control.save_current_scan_parameters()
# Change the scan
new_scan_name = "grid_scan"
scan_control.comboBox_scan_selection.setCurrentText(new_scan_name)
# Check if kwargs are same as in the line_scan
grid_args, grid_kwargs = scan_control.get_scan_parameters(bec_object=False)
assert grid_kwargs["exp_time"] == kwargs["exp_time"]
assert grid_kwargs["relative"] == kwargs["relative"]
assert grid_kwargs["burst_at_each_point"] == kwargs["burst_at_each_point"]
def test_get_scan_parameters_from_redis(scan_control, mocked_client):
scan_name = "line_scan"
scan_control.comboBox_scan_selection.setCurrentText(scan_name)
scan_control.toggle.checked = True
args, kwargs = scan_control.get_scan_parameters(bec_object=False)
assert args == ["samx", 0.0, 2.0]
assert kwargs == {"steps": 10, "relative": False, "exp_time": 2.0, "burst_at_each_point": 1}
TEST_MD = {"sample_name": "Test Sample", "test key 1": "test value 1", "test key 2": "test value 2"}
TEST_TABLE_ENTRY = [["test key 1", "test value 1"], ["test key 2", "test value 2"]]
def test_scan_metadata_is_updated_even_without_default_form_changes(
scan_control: ScanControl, qtbot
):
assert scan_control._metadata_form._scan_name == "line_scan"
scan_control.comboBox_scan_selection.setCurrentText("grid_scan")
assert scan_control._metadata_form._scan_name == "grid_scan"
scan_control._metadata_form._additional_metadata._add_button.click()
qtbot.wait(100)
table_model = scan_control._metadata_form._additional_metadata._table_model
model_key = table_model.index(0, 0, QModelIndex())
table_model.setData(model_key, "test key 1", Qt.EditRole)
model_value = model_key.siblingAtColumn(1)
table_model.setData(model_value, "test value 1", Qt.EditRole)
assert scan_control._metadata_form._additional_metadata.dump_dict() == {
"test key 1": "test value 1"
}
assert scan_control._scan_metadata == {"sample_name": "", "test key 1": "test value 1"}
def test_scan_metadata_is_connected(scan_control):
assert scan_control._metadata_form._scan_name == "line_scan"
scan_control.comboBox_scan_selection.setCurrentText("grid_scan")
assert scan_control._metadata_form._scan_name == "grid_scan"
sample_name = scan_control._metadata_form._form_grid.layout().itemAtPosition(0, 1).widget()
assert isinstance(sample_name, StrFormItem)
sample_name._main_widget.setText("Test Sample")
scan_control._metadata_form._additional_metadata._table_model._data = TEST_TABLE_ENTRY
scan_control._metadata_form.validate_form()
assert scan_control._scan_metadata == TEST_MD
def test_scan_metadata_is_passed_to_scan_function(scan_control: ScanControl):
scan_control.comboBox_scan_selection.setCurrentText("grid_scan")
sample_name = scan_control._metadata_form._form_grid.layout().itemAtPosition(0, 1).widget()
sample_name._main_widget.setText("Test Sample")
scan_control._metadata_form._additional_metadata._table_model._data = TEST_TABLE_ENTRY
scan_control._metadata_form.validate_form()
assert scan_control._scan_metadata == TEST_MD
scans = SimpleNamespace(grid_scan=MagicMock())
with (
patch.object(scan_control, "scans", scans),
patch.object(scan_control, "get_scan_parameters", lambda: ((), {})),
):
scan_control.run_scan()
scans.grid_scan.assert_called_once_with(metadata=TEST_MD)
def test_restore_parameters_with_fewer_arg_bundles(scan_control, qtbot):
"""
Ensure that when more argument bundles are present than exist in the
stored history, restoring parameters regenerates the arg box to the
correct (smaller) size and sets the values properly.
This is a check for the previous infinite loop bug.
"""
# Select the scan type that has history with only one arg bundle
scan_control.comboBox_scan_selection.setCurrentText("line_scan")
# Manually add bundles so we end up with three rows
while scan_control.arg_box.count_arg_rows() < 3:
scan_control.arg_box.add_widget_bundle()
assert scan_control.arg_box.count_arg_rows() == 3
# Trigger restore of parameters from history
scan_control.toggle.checked = True
qtbot.wait(200)
# After restore, arg_box should have only one bundle (the history size)
assert scan_control.arg_box.count_arg_rows() == 1
# Verify that the restored parameter values match the history
args, kwargs = scan_control.get_scan_parameters(bec_object=False)
assert args == ["samx", 0.0, 2.0]
assert kwargs["steps"] == 10