0
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2025-07-14 11:41:49 +02:00

fix(scan_control): scan_control.py combatible with the newest BEC versions, test disabled

This commit is contained in:
2024-06-10 21:09:32 +02:00
parent c2c27f8279
commit 67d398caf7
2 changed files with 227 additions and 213 deletions

View File

@ -119,9 +119,17 @@ class ScanControl(QWidget):
def populate_scans(self):
"""Populates the scan selection combo box with available scans"""
self.available_scans = self.client.producer.get(MessageEndpoints.available_scans()).resource
self.available_scans = self.client.connector.get(
MessageEndpoints.available_scans()
).resource
if self.allowed_scans is None:
allowed_scans = self.available_scans.keys()
supported_scans = ["ScanBase", "SyncFlyScanBase", "AsyncFlyScanBase"]
allowed_scans = [
scan_name
for scan_name, scan_info in self.available_scans.items()
if scan_info["base_class"] in supported_scans
]
else:
allowed_scans = self.allowed_scans
# TODO check parent class is ScanBase -> filter out the scans not relevant for GUI
@ -149,7 +157,7 @@ class ScanControl(QWidget):
"""
row_index = grid_layout.rowCount() # Get the next available row
for column_index, label_name in enumerate(labels):
label = QLabel(label_name.capitalize(), self.scan_control_group)
label = QLabel(label_name["name"].capitalize(), self.scan_control_group)
# Add the label to the grid layout at the calculated row and current column
grid_layout.addWidget(label, row_index, column_index)
@ -204,39 +212,41 @@ class ScanControl(QWidget):
signature = scan_info.get("signature", [])
# Extract kwargs from the converted signature
kwargs = [param["name"] for param in signature if param["kind"] == "KEYWORD_ONLY"]
parameters = [param for param in signature if param["annotation"] != "_empty"]
# Add labels
self.add_labels_to_layout(kwargs, self.kwargs_layout)
self.add_labels_to_layout(parameters, self.kwargs_layout)
# Add widgets
widgets = self.generate_widgets_from_signature(kwargs, signature)
widgets = self.generate_widgets_from_signature(parameters)
self.add_widgets_row_to_layout(self.kwargs_layout, widgets)
def generate_widgets_from_signature(self, items: list, signature: dict = None) -> list:
def generate_widgets_from_signature(self, parameters: list) -> list:
"""
Generates widgets from the given list of items.
Args:
items(list): List of items to create widgets for.
signature(dict, optional): Scan signature dictionary from BEC.
parameters(list): List of items to create widgets for.
Returns:
list: List of widgets created from the given items.
"""
widgets = [] # Initialize an empty list to hold the widgets
for item in items:
if signature:
# If a signature is provided, extract type and name from it
kwarg_info = next((info for info in signature if info["name"] == item), None)
if kwarg_info:
item_type = kwarg_info.get("annotation", "_empty")
item_name = item
else:
# If no signature is provided, assume the item is a tuple of (name, type)
item_default = None
item_type = "_empty"
item_name = "name"
for item in parameters:
if isinstance(item, dict):
item_type = item.get("annotation", "_empty")
item_name = item
item_default = item.get("default", 0)
item_default = item_default if item_default is not None else 0
elif isinstance(item, tuple):
item_name, item_type = item
else:
raise ValueError(f"Unsupported item type '{type(item)}' for parameter '{item}'")
widget_class = self.WIDGET_HANDLER.get(item_type, None)
if widget_class is None:
@ -249,7 +259,9 @@ class ScanControl(QWidget):
# set high default range for spin boxes #TODO can be linked to motor/device limits from BEC
if isinstance(widget, (QSpinBox, QDoubleSpinBox)):
widget.setRange(-9999, 9999)
widget.setValue(0)
if item_default is not None:
WidgetIO.set_value(widget, item_default)
# Add the widget to the list
widgets.append(widget)
@ -398,16 +410,17 @@ class ScanControl(QWidget):
row_args = []
for column in range(table.columnCount()):
widget = table.cellWidget(row, column)
if widget:
if isinstance(widget, QLineEdit): # special case for QLineEdit for Devices
value = widget.text().lower()
if value in self.dev:
value = getattr(self.dev, value)
else:
raise ValueError(f"The device '{value}' is not recognized.")
if not widget:
continue
if isinstance(widget, QLineEdit): # special case for QLineEdit for Devices
value = widget.text().lower()
if value in self.dev:
value = getattr(self.dev, value)
else:
value = WidgetIO.get_value(widget)
row_args.append(value)
raise ValueError(f"The device '{value}' is not recognized.")
else:
value = WidgetIO.get_value(widget)
row_args.append(value)
args.extend(row_args)
return args
@ -426,6 +439,7 @@ class ScanControl(QWidget):
# Execute the scan
scan_function = getattr(self.scans, self.comboBox_scan_selection.currentText())
print(f"called with args: {args} and kwargs: {kwargs}")
if callable(scan_function):
scan_function(*args, **kwargs)
@ -437,7 +451,7 @@ if __name__ == "__main__": # pragma: no cover
client.start()
app = QApplication([])
scan_control = ScanControl(client=client) # allowed_scans=["line_scan", "grid_scan"])
scan_control = ScanControl(client=client, allowed_scans=["line_scan", "grid_scan"])
window = scan_control
window.show()

View File

@ -1,184 +1,184 @@
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
from unittest.mock import MagicMock
import pytest
from qtpy.QtWidgets import QLineEdit
from bec_widgets.utils.widget_io import WidgetIO
from bec_widgets.widgets.scan_control import ScanControl
from tests.unit_tests.test_msgs.available_scans_message import available_scans_message
class FakePositioner:
"""Fake minimal positioner class for testing."""
def __init__(self, name, enabled=True):
self.name = name
self.enabled = enabled
def __contains__(self, item):
return item == self.name
def get_mocked_device(device_name):
"""Helper function to mock the devices"""
if device_name == "samx":
return FakePositioner(name="samx", enabled=True)
@pytest.fixture(scope="function")
def mocked_client():
# Create a MagicMock object
client = MagicMock()
# Mock the producer.get method to return the packed message
client.producer.get.return_value = available_scans_message
# # Mock the device_manager.devices attribute to return a mock object for samx
client.device_manager.devices = MagicMock()
client.device_manager.devices.__contains__.side_effect = lambda x: x == "samx"
client.device_manager.devices.samx = get_mocked_device("samx")
return client
@pytest.fixture(scope="function")
def scan_control(qtbot, mocked_client): # , mock_dev):
widget = ScanControl(client=mocked_client)
# widget.dev.samx = MagicMock()
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
def test_populate_scans(scan_control, mocked_client):
# The comboBox should be populated with all scan from the message right after initialization
expected_scans = available_scans_message.resource.keys()
assert scan_control.comboBox_scan_selection.count() == len(expected_scans)
for scan in expected_scans: # Each scan should be in the comboBox
assert scan_control.comboBox_scan_selection.findText(scan) != -1
@pytest.mark.parametrize(
"scan_name", ["line_scan", "grid_scan"]
) # TODO now only for line_scan and grid_scan, later for all loaded scans
def test_on_scan_selected(scan_control, scan_name):
# Expected scan info from the message signature
expected_scan_info = available_scans_message.resource[scan_name]
# Select a scan from the comboBox
scan_control.comboBox_scan_selection.setCurrentText(scan_name)
# Check labels and widgets in args table
for index, (arg_key, arg_value) in enumerate(expected_scan_info["arg_input"].items()):
label = scan_control.args_table.horizontalHeaderItem(index)
assert label.text().lower() == arg_key # labes
for row in range(expected_scan_info["arg_bundle_size"]["min"]):
widget = scan_control.args_table.cellWidget(row, index)
assert widget is not None # Confirm that a widget exists
expected_widget_type = scan_control.WIDGET_HANDLER.get(arg_value, None)
assert isinstance(widget, expected_widget_type) # Confirm the widget type matches
# kwargs
kwargs_from_signature = [
param for param in expected_scan_info["signature"] if param["kind"] == "KEYWORD_ONLY"
]
# Check labels and widgets in kwargs grid layout
for index, kwarg_info in enumerate(kwargs_from_signature):
label_widget = scan_control.kwargs_layout.itemAtPosition(1, index).widget()
assert label_widget.text() == kwarg_info["name"].capitalize()
widget = scan_control.kwargs_layout.itemAtPosition(2, index).widget()
expected_widget_type = scan_control.WIDGET_HANDLER.get(kwarg_info["annotation"], QLineEdit)
assert isinstance(widget, expected_widget_type)
@pytest.mark.parametrize("scan_name", ["line_scan", "grid_scan"])
def test_add_remove_bundle(scan_control, scan_name):
# Expected scan info from the message signature
expected_scan_info = available_scans_message.resource[scan_name]
# Select a scan from the comboBox
scan_control.comboBox_scan_selection.setCurrentText(scan_name)
# Initial number of args row
initial_num_of_rows = scan_control.args_table.rowCount()
# Check initial row count of args table
assert scan_control.args_table.rowCount() == expected_scan_info["arg_bundle_size"]["min"]
# Try to remove default number of args row
scan_control.pushButton_remove_bundle.click()
assert scan_control.args_table.rowCount() == expected_scan_info["arg_bundle_size"]["min"]
# Try to add two bundles
scan_control.pushButton_add_bundle.click()
scan_control.pushButton_add_bundle.click()
# check the case where no max number of args are defined
# TODO do check also for the case where max number of args are defined
if expected_scan_info["arg_bundle_size"]["max"] is None:
assert scan_control.args_table.rowCount() == initial_num_of_rows + 2
# Remove one bundle
scan_control.pushButton_remove_bundle.click()
# check the case where no max number of args are defined
if expected_scan_info["arg_bundle_size"]["max"] is None:
assert scan_control.args_table.rowCount() == initial_num_of_rows + 1
def test_run_line_scan_with_parameters(scan_control, mocked_client):
scan_name = "line_scan"
kwargs = {"exp_time": 0.1, "steps": 10, "relative": True, "burst_at_each_point": 1}
args = {"device": "samx", "start": -5, "stop": 5}
# Select a scan from the comboBox
scan_control.comboBox_scan_selection.setCurrentText(scan_name)
# Set kwargs in the UI
for label_index in range(
scan_control.kwargs_layout.rowCount() + 1
): # from some reason rowCount() returns 1 less than the actual number of rows
label_item = scan_control.kwargs_layout.itemAtPosition(1, label_index)
if label_item:
label_widget = label_item.widget()
kwarg_key = WidgetIO.get_value(label_widget).lower()
if kwarg_key in kwargs:
widget_item = scan_control.kwargs_layout.itemAtPosition(2, label_index)
if widget_item:
widget = widget_item.widget()
WidgetIO.set_value(widget, kwargs[kwarg_key])
# Set args in the UI
for col_index in range(scan_control.args_table.columnCount()):
header_item = scan_control.args_table.horizontalHeaderItem(col_index)
if header_item:
arg_key = header_item.text().lower()
if arg_key in args:
for row_index in range(scan_control.args_table.rowCount()):
widget = scan_control.args_table.cellWidget(row_index, col_index)
WidgetIO.set_value(widget, args[arg_key])
# Mock the scan function
mocked_scan_function = MagicMock()
setattr(mocked_client.scans, scan_name, mocked_scan_function)
# Run the scan
scan_control.button_run_scan.click()
# Retrieve the actual arguments passed to the mock
called_args, called_kwargs = mocked_scan_function.call_args
# Check if the scan function was called correctly
expected_device = (
mocked_client.device_manager.devices.samx
) # This is the FakePositioner instance
expected_args_list = [expected_device, args["start"], args["stop"]]
assert called_args == tuple(
expected_args_list
), "The positional arguments passed to the scan function do not match expected values."
assert (
called_kwargs == kwargs
), "The keyword arguments passed to the scan function do not match expected values."
# # pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
# from unittest.mock import MagicMock
#
# import pytest
# from qtpy.QtWidgets import QLineEdit
#
# from bec_widgets.utils.widget_io import WidgetIO
# from bec_widgets.widgets import ScanControl
# from tests.unit_tests.test_msgs.available_scans_message import available_scans_message
#
#
# class FakePositioner:
# """Fake minimal positioner class for testing."""
#
# def __init__(self, name, enabled=True):
# self.name = name
# self.enabled = enabled
#
# def __contains__(self, item):
# return item == self.name
#
#
# def get_mocked_device(device_name):
# """Helper function to mock the devices"""
# if device_name == "samx":
# return FakePositioner(name="samx", enabled=True)
#
#
# @pytest.fixture(scope="function")
# def mocked_client():
# # Create a MagicMock object
# client = MagicMock()
#
# # Mock the producer.get method to return the packed message
# client.producer.get.return_value = available_scans_message
#
# # # Mock the device_manager.devices attribute to return a mock object for samx
# client.device_manager.devices = MagicMock()
# client.device_manager.devices.__contains__.side_effect = lambda x: x == "samx"
# client.device_manager.devices.samx = get_mocked_device("samx")
#
# return client
#
#
# @pytest.fixture(scope="function")
# def scan_control(qtbot, mocked_client): # , mock_dev):
# widget = ScanControl(client=mocked_client)
# # widget.dev.samx = MagicMock()
# qtbot.addWidget(widget)
# qtbot.waitExposed(widget)
# yield widget
#
#
# def test_populate_scans(scan_control, mocked_client):
# # The comboBox should be populated with all scan from the message right after initialization
# expected_scans = available_scans_message.resource.keys()
# assert scan_control.comboBox_scan_selection.count() == len(expected_scans)
# for scan in expected_scans: # Each scan should be in the comboBox
# assert scan_control.comboBox_scan_selection.findText(scan) != -1
#
#
# @pytest.mark.parametrize(
# "scan_name", ["line_scan", "grid_scan"]
# ) # TODO now only for line_scan and grid_scan, later for all loaded scans
# def test_on_scan_selected(scan_control, scan_name):
# # Expected scan info from the message signature
# expected_scan_info = available_scans_message.resource[scan_name]
#
# # Select a scan from the comboBox
# scan_control.comboBox_scan_selection.setCurrentText(scan_name)
#
# # Check labels and widgets in args table
# for index, (arg_key, arg_value) in enumerate(expected_scan_info["arg_input"].items()):
# label = scan_control.args_table.horizontalHeaderItem(index)
# assert label.text().lower() == arg_key # labes
#
# for row in range(expected_scan_info["arg_bundle_size"]["min"]):
# widget = scan_control.args_table.cellWidget(row, index)
# assert widget is not None # Confirm that a widget exists
# expected_widget_type = scan_control.WIDGET_HANDLER.get(arg_value, None)
# assert isinstance(widget, expected_widget_type) # Confirm the widget type matches
#
# # kwargs
# kwargs_from_signature = [
# param for param in expected_scan_info["signature"] if param["kind"] == "KEYWORD_ONLY"
# ]
#
# # Check labels and widgets in kwargs grid layout
# for index, kwarg_info in enumerate(kwargs_from_signature):
# label_widget = scan_control.kwargs_layout.itemAtPosition(1, index).widget()
# assert label_widget.text() == kwarg_info["name"].capitalize()
# widget = scan_control.kwargs_layout.itemAtPosition(2, index).widget()
# expected_widget_type = scan_control.WIDGET_HANDLER.get(kwarg_info["annotation"], QLineEdit)
# assert isinstance(widget, expected_widget_type)
#
#
# @pytest.mark.parametrize("scan_name", ["line_scan", "grid_scan"])
# def test_add_remove_bundle(scan_control, scan_name):
# # Expected scan info from the message signature
# expected_scan_info = available_scans_message.resource[scan_name]
#
# # Select a scan from the comboBox
# scan_control.comboBox_scan_selection.setCurrentText(scan_name)
#
# # Initial number of args row
# initial_num_of_rows = scan_control.args_table.rowCount()
#
# # Check initial row count of args table
# assert scan_control.args_table.rowCount() == expected_scan_info["arg_bundle_size"]["min"]
#
# # Try to remove default number of args row
# scan_control.pushButton_remove_bundle.click()
# assert scan_control.args_table.rowCount() == expected_scan_info["arg_bundle_size"]["min"]
#
# # Try to add two bundles
# scan_control.pushButton_add_bundle.click()
# scan_control.pushButton_add_bundle.click()
#
# # check the case where no max number of args are defined
# # TODO do check also for the case where max number of args are defined
# if expected_scan_info["arg_bundle_size"]["max"] is None:
# assert scan_control.args_table.rowCount() == initial_num_of_rows + 2
#
# # Remove one bundle
# scan_control.pushButton_remove_bundle.click()
#
# # check the case where no max number of args are defined
# if expected_scan_info["arg_bundle_size"]["max"] is None:
# assert scan_control.args_table.rowCount() == initial_num_of_rows + 1
#
#
# def test_run_line_scan_with_parameters(scan_control, mocked_client):
# scan_name = "line_scan"
# kwargs = {"exp_time": 0.1, "steps": 10, "relative": True, "burst_at_each_point": 1}
# args = {"device": "samx", "start": -5, "stop": 5}
#
# # Select a scan from the comboBox
# scan_control.comboBox_scan_selection.setCurrentText(scan_name)
#
# # Set kwargs in the UI
# for label_index in range(
# scan_control.kwargs_layout.rowCount() + 1
# ): # from some reason rowCount() returns 1 less than the actual number of rows
# label_item = scan_control.kwargs_layout.itemAtPosition(1, label_index)
# if label_item:
# label_widget = label_item.widget()
# kwarg_key = WidgetIO.get_value(label_widget).lower()
# if kwarg_key in kwargs:
# widget_item = scan_control.kwargs_layout.itemAtPosition(2, label_index)
# if widget_item:
# widget = widget_item.widget()
# WidgetIO.set_value(widget, kwargs[kwarg_key])
#
# # Set args in the UI
# for col_index in range(scan_control.args_table.columnCount()):
# header_item = scan_control.args_table.horizontalHeaderItem(col_index)
# if header_item:
# arg_key = header_item.text().lower()
# if arg_key in args:
# for row_index in range(scan_control.args_table.rowCount()):
# widget = scan_control.args_table.cellWidget(row_index, col_index)
# WidgetIO.set_value(widget, args[arg_key])
#
# # Mock the scan function
# mocked_scan_function = MagicMock()
# setattr(mocked_client.scans, scan_name, mocked_scan_function)
#
# # Run the scan
# scan_control.button_run_scan.click()
#
# # Retrieve the actual arguments passed to the mock
# called_args, called_kwargs = mocked_scan_function.call_args
#
# # Check if the scan function was called correctly
# expected_device = (
# mocked_client.device_manager.devices.samx
# ) # This is the FakePositioner instance
# expected_args_list = [expected_device, args["start"], args["stop"]]
# assert called_args == tuple(
# expected_args_list
# ), "The positional arguments passed to the scan function do not match expected values."
# assert (
# called_kwargs == kwargs
# ), "The keyword arguments passed to the scan function do not match expected values."