diff --git a/bec_widgets/widgets/control/scan_control/scan_control.py b/bec_widgets/widgets/control/scan_control/scan_control.py index c0bc1e24..2c6e57be 100644 --- a/bec_widgets/widgets/control/scan_control/scan_control.py +++ b/bec_widgets/widgets/control/scan_control/scan_control.py @@ -25,6 +25,7 @@ from bec_widgets.utils.colors import apply_theme, get_accent_colors from bec_widgets.utils.error_popups import SafeProperty, SafeSlot from bec_widgets.widgets.control.buttons.stop_button.stop_button import StopButton from bec_widgets.widgets.control.scan_control.scan_group_box import ScanGroupBox +from bec_widgets.widgets.control.scan_control.scan_info_adapter import ScanInfoAdapter from bec_widgets.widgets.editors.scan_metadata.scan_metadata import ScanMetadata from bec_widgets.widgets.utility.toggle.toggle import ToggleSwitch @@ -97,6 +98,7 @@ class ScanControl(BECWidget, QWidget): self._hide_scan_control_buttons = False self._hide_metadata = False self._hide_scan_selection_combobox = False + self._scan_info_adapter = ScanInfoAdapter() # Create and set main layout self._init_UI() @@ -195,7 +197,8 @@ class ScanControl(BECWidget, QWidget): allowed_scans = [ scan_name for scan_name, scan_info in self.available_scans.items() - if scan_info["base_class"] in supported_scans and len(scan_info["gui_config"]) > 0 + if scan_info["base_class"] in supported_scans + and self._scan_info_adapter.has_scan_ui_config(scan_info) ] else: @@ -390,14 +393,14 @@ class ScanControl(BECWidget, QWidget): self.reset_layout() selected_scan_info = self.available_scans.get(scan_name, {}) - gui_config = selected_scan_info.get("gui_config", {}) - self.arg_group = gui_config.get("arg_group", None) - self.kwarg_groups = gui_config.get("kwarg_groups", None) + gui_config = self._scan_info_adapter.build_scan_ui_config(selected_scan_info) + arg_group = gui_config.get("arg_group", None) + kwarg_groups = gui_config.get("kwarg_groups", []) - if bool(self.arg_group["arg_inputs"]): - self.add_arg_group(self.arg_group) - if len(self.kwarg_groups) > 0: - self.add_kwargs_boxes(self.kwarg_groups) + if arg_group and bool(arg_group.get("arg_inputs")): + self.add_arg_group(arg_group) + if kwarg_groups: + self.add_kwargs_boxes(kwarg_groups) self.update() self.adjustSize() diff --git a/bec_widgets/widgets/control/scan_control/scan_group_box.py b/bec_widgets/widgets/control/scan_control/scan_group_box.py index 46f07f8d..738c6b0a 100644 --- a/bec_widgets/widgets/control/scan_control/scan_group_box.py +++ b/bec_widgets/widgets/control/scan_control/scan_group_box.py @@ -209,6 +209,8 @@ class ScanGroupBox(QGroupBox): self.labels = [] self.widgets = [] + self._widget_configs = {} + self._column_labels = {} self.selected_devices = {} self.init_box(self.config) @@ -247,6 +249,7 @@ class ScanGroupBox(QGroupBox): label = QLabel(text=display_name) self.layout.addWidget(label, row, column_index) self.labels.append(label) + self._column_labels[column_index] = label def add_input_widgets(self, group_inputs: dict, row) -> None: """ @@ -272,21 +275,30 @@ class ScanGroupBox(QGroupBox): if default == "_empty": default = None widget = widget_class(parent=self.parent(), arg_name=arg_name, default=default) + self._apply_numeric_precision(widget, item) + self._apply_numeric_limits(widget, item) if isinstance(widget, DeviceLineEdit): widget.set_device_filter(BECDeviceFilter.DEVICE) self.selected_devices[widget] = "" widget.device_selected.connect(self.emit_device_selected) + widget.textChanged.connect( + lambda text, device_widget=widget: self._handle_device_text_changed( + device_widget, text + ) + ) if isinstance(widget, ScanLiteralsComboBox): widget.set_literals(item["type"].get("Literal", [])) - tooltip = item.get("tooltip", None) - if tooltip is not None: - widget.setToolTip(item["tooltip"]) + self._widget_configs[widget] = item + self._apply_unit_metadata(widget, item) self.layout.addWidget(widget, row, column_index) self.widgets.append(widget) @Slot(str) def emit_device_selected(self, device_name): - self.selected_devices[self.sender()] = device_name.strip() + sender = self.sender() + self.selected_devices[sender] = device_name.strip() + if isinstance(sender, DeviceLineEdit): + self._update_reference_units(sender, self._device_units(sender.get_current_device())) selected_devices_str = " ".join(self.selected_devices.values()) self.device_selected.emit(selected_devices_str) @@ -313,6 +325,7 @@ class ScanGroupBox(QGroupBox): for widget in self.widgets[-len(self.inputs) :]: if isinstance(widget, DeviceLineEdit): self.selected_devices[widget] = "" + self._widget_configs.pop(widget, None) widget.close() widget.deleteLater() self.widgets = self.widgets[: -len(self.inputs)] @@ -325,6 +338,7 @@ class ScanGroupBox(QGroupBox): for widget in list(self.widgets): if isinstance(widget, DeviceLineEdit): self.selected_devices.pop(widget, None) + self._widget_configs.pop(widget, None) widget.close() widget.deleteLater() self.layout.removeWidget(widget) @@ -423,3 +437,119 @@ class ScanGroupBox(QGroupBox): if widget.arg_name == key: WidgetIO.set_value(widget, value) break + + @staticmethod + def _unit_tooltip(item: dict, units: str | None = None) -> str | None: + tooltip = item.get("tooltip", None) + reference_units = item.get("reference_units", None) + units = units or item.get("units", None) + tooltip_parts = [tooltip] if tooltip else [] + if units: + tooltip_parts.append(f"Units: {units}") + elif reference_units: + tooltip_parts.append(f"Units from: {reference_units}") + if tooltip_parts: + return "\n".join(tooltip_parts) + return None + + def _apply_unit_metadata(self, widget, item: dict, units: str | None = None) -> None: + units = units or item.get("units", None) + tooltip = self._unit_tooltip(item, units) + widget.setToolTip(tooltip or "") + if hasattr(widget, "setSuffix"): + widget.setSuffix(f" {units}" if units else "") + + def _refresh_column_label(self, column: int, item: dict) -> None: + if column not in self._column_labels: + return + self._column_labels[column].setText(item.get("display_name", item.get("name", None))) + + @staticmethod + def _device_units(device) -> str | None: + egu = getattr(device, "egu", None) + if not callable(egu): + return None + try: + return egu() + except Exception: + logger.exception("Failed to fetch engineering units from device %s", device) + return None + + def _widget_position(self, widget) -> tuple[int, int] | None: + for row in range(self.layout.rowCount()): + for column in range(self.layout.columnCount()): + item = self.layout.itemAtPosition(row, column) + if item is not None and item.widget() is widget: + return row, column + return None + + def _update_reference_units(self, device_widget: DeviceLineEdit, units: str | None) -> None: + position = self._widget_position(device_widget) + if position is None: + return + source_row, _ = position + source_name = device_widget.arg_name + + for widget in self.widgets: + item = self._widget_configs.get(widget, {}) + if item.get("reference_units") != source_name: + continue + widget_position = self._widget_position(widget) + if widget_position is None: + continue + row, column = widget_position + if self.box_type == "args" and row != source_row: + continue + self._apply_unit_metadata(widget, item, units) + self._refresh_column_label(column, item) + + def _handle_device_text_changed(self, device_widget: DeviceLineEdit, device_name: str) -> None: + if not device_widget.validate_device(device_name): + self.selected_devices[device_widget] = "" + self._update_reference_units(device_widget, None) + + @staticmethod + def _apply_numeric_precision(widget: ScanDoubleSpinBox, item: dict) -> None: + if not isinstance(widget, ScanDoubleSpinBox): + return + + precision = item.get("precision") + if precision is None: + return + + try: + widget.setDecimals(max(0, int(precision))) + except (TypeError, ValueError): + logger.warning( + "Ignoring invalid precision %r for parameter %s", precision, item.get("name") + ) + + @staticmethod + def _apply_numeric_limits(widget: ScanDoubleSpinBox | ScanSpinBox, item: dict) -> None: + if isinstance(widget, ScanSpinBox): + minimum = -2147483647 # largest int which qt allows + maximum = 2147483647 + if item.get("ge") is not None: + minimum = int(item["ge"]) + if item.get("gt") is not None: + minimum = int(item["gt"]) + 1 + if item.get("le") is not None: + maximum = int(item["le"]) + if item.get("lt") is not None: + maximum = int(item["lt"]) - 1 + widget.setRange(minimum, maximum) + return + + if isinstance(widget, ScanDoubleSpinBox): + minimum = -float("inf") + maximum = float("inf") + step = 10 ** (-widget.decimals()) + if item.get("ge") is not None: + minimum = float(item["ge"]) + if item.get("gt") is not None: + minimum = float(item["gt"]) + step + if item.get("le") is not None: + maximum = float(item["le"]) + if item.get("lt") is not None: + maximum = float(item["lt"]) - step + widget.setRange(minimum, maximum) diff --git a/bec_widgets/widgets/control/scan_control/scan_info_adapter.py b/bec_widgets/widgets/control/scan_control/scan_info_adapter.py new file mode 100644 index 00000000..5e082a48 --- /dev/null +++ b/bec_widgets/widgets/control/scan_control/scan_info_adapter.py @@ -0,0 +1,243 @@ +"""Helpers for translating BEC scan metadata into ScanControl UI configuration.""" + +from __future__ import annotations + +import re +from typing import Any + +AnnotationValue = str | dict[str, Any] | list[Any] | None +ScanArgumentMetadata = dict[str, Any] +SignatureEntry = dict[str, Any] +ScanInputConfig = dict[str, Any] +ScanInfo = dict[str, Any] +ScanUIConfig = dict[str, Any] + + +class ScanInfoAdapter: + """Normalize available-scan payloads into the structure consumed by ``ScanControl``.""" + + @staticmethod + def has_scan_ui_config(scan_info: ScanInfo) -> bool: + """Check whether a scan exposes enough metadata to build a UI. + + Args: + scan_info (ScanInfo): Available-scan payload for one scan. + + Returns: + bool: ``True`` when a supported GUI metadata field is present. + """ + return bool(scan_info.get("gui_visibility") or scan_info.get("gui_config")) + + @staticmethod + def format_display_name(name: str) -> str: + """Convert a parameter name into a user-facing label. + + Args: + name (str): Raw parameter name. + + Returns: + str: Formatted display label such as ``Exp Time``. + """ + parts = re.split(r"(_|\d+)", name) + return " ".join(part.capitalize() for part in parts if part.isalnum()).strip() + + @staticmethod + def resolve_tooltip(scan_argument: ScanArgumentMetadata) -> str | None: + """Resolve the tooltip text from parsed ``ScanArgument`` metadata. + + Args: + scan_argument (ScanArgumentMetadata): Parsed ``ScanArgument`` metadata. + + Returns: + str | None: Explicit tooltip text if provided, otherwise the description fallback. + """ + return scan_argument.get("tooltip") or scan_argument.get("description") + + @staticmethod + def parse_annotation( + annotation: AnnotationValue, + ) -> tuple[AnnotationValue, ScanArgumentMetadata]: + """Extract the serialized base annotation and ``ScanArgument`` metadata. + + Args: + annotation (AnnotationValue): Serialized annotation payload from BEC. + + Returns: + tuple[AnnotationValue, ScanArgumentMetadata]: The unwrapped annotation and parsed + ``ScanArgument`` metadata. + """ + scan_argument: ScanArgumentMetadata = {} + if isinstance(annotation, list): + annotation = next( + (entry for entry in annotation if entry != "NoneType"), + annotation[0] if annotation else "_empty", + ) + if isinstance(annotation, dict) and "Annotated" in annotation: + annotated = annotation["Annotated"] + annotation = annotated.get("type", "_empty") + scan_argument = annotated.get("metadata", {}).get("ScanArgument", {}) or {} + return annotation, scan_argument + + @staticmethod + def scan_arg_type_from_annotation(annotation: AnnotationValue) -> AnnotationValue: + """Normalize an annotation value to the widget type expected by ``ScanControl``. + + Args: + annotation (AnnotationValue): Serialized or parsed annotation value. + + Returns: + AnnotationValue: The normalized type identifier used by the widget layer. + """ + if isinstance(annotation, dict): + return annotation + if annotation in ("_empty", None): + return "str" + return annotation + + def scan_input_from_signature( + self, param: SignatureEntry, arg: bool = False + ) -> ScanInputConfig: + """Build one ScanControl input description from a signature entry. + + Args: + param (SignatureEntry): Serialized signature entry. + arg (bool): Whether the parameter belongs to the positional arg bundle. + + Returns: + ScanInputConfig: Normalized input configuration for ``ScanControl``. + """ + annotation, scan_argument = self.parse_annotation(param.get("annotation")) + return self._build_scan_input( + name=param["name"], + annotation=annotation, + scan_argument=scan_argument, + arg=arg, + default=None if arg else param.get("default", None), + ) + + def scan_input_from_arg_input( + self, name: str, item_type: AnnotationValue, signature_by_name: dict[str, SignatureEntry] + ) -> ScanInputConfig: + """Build one arg-bundle input description from ``arg_input`` metadata. + + Args: + name (str): Argument name from ``arg_input``. + item_type (AnnotationValue): Serialized argument type from ``arg_input``. + signature_by_name (dict[str, SignatureEntry]): Signature entries indexed by + parameter name. + + Returns: + ScanInputConfig: Normalized input configuration for one arg-bundle field. + """ + if name in signature_by_name: + scan_input = self.scan_input_from_signature(signature_by_name[name], arg=True) + scan_input["type"] = self.scan_arg_type_from_annotation( + self.parse_annotation(signature_by_name[name].get("annotation"))[0] + ) + else: + annotation, scan_argument = self.parse_annotation(item_type) + scan_input = self._build_scan_input( + name=name, + annotation=annotation, + scan_argument=scan_argument, + arg=True, + default=None, + ) + if scan_input["type"] in ("_empty", None): + scan_input["type"] = item_type + return scan_input + + def _build_scan_input( + self, + name: str, + annotation: AnnotationValue, + scan_argument: ScanArgumentMetadata, + *, + arg: bool, + default: Any, + ) -> ScanInputConfig: + """Build one normalized ScanControl input configuration. + + Args: + name (str): Parameter name. + annotation (AnnotationValue): Parsed annotation value. + scan_argument (ScanArgumentMetadata): Parsed ``ScanArgument`` metadata. + arg (bool): Whether the parameter belongs to the positional arg bundle. + default (Any): Default value for the parameter. + + Returns: + ScanInputConfig: Normalized input configuration. + """ + return { + "arg": arg, + "name": name, + "type": self.scan_arg_type_from_annotation(annotation), + "display_name": scan_argument.get("display_name") or self.format_display_name(name), + "tooltip": self.resolve_tooltip(scan_argument), + "default": default, + "expert": scan_argument.get("expert", False), + "precision": scan_argument.get("precision"), + "units": scan_argument.get("units"), + "reference_units": scan_argument.get("reference_units"), + "gt": scan_argument.get("gt"), + "ge": scan_argument.get("ge"), + "lt": scan_argument.get("lt"), + "le": scan_argument.get("le"), + "alternative_group": scan_argument.get("alternative_group"), + } + + def build_scan_ui_config(self, scan_info: ScanInfo) -> ScanUIConfig: + """Normalize one available-scan entry into the widget UI configuration. + + Args: + scan_info (ScanInfo): Available-scan payload for one scan. + + Returns: + ScanUIConfig: Legacy group structure consumed by ``ScanControl`` and + ``ScanGroupBox``. + """ + gui_visualization = ( + scan_info.get("gui_visualization") or scan_info.get("gui_visibility") or {} + ) + if not gui_visualization and scan_info.get("gui_config"): + return scan_info["gui_config"] + + signature = scan_info.get("signature", []) + signature_by_name = {entry["name"]: entry for entry in signature} + + arg_group = None + arg_input = scan_info.get("arg_input", {}) + if isinstance(arg_input, dict) and arg_input: + bundle_size = scan_info.get("arg_bundle_size", {}) + inputs = [ + self.scan_input_from_arg_input(name, item_type, signature_by_name) + for name, item_type in arg_input.items() + ] + arg_group = { + "name": "Scan Arguments", + "bundle": bundle_size.get("bundle"), + "arg_inputs": arg_input, + "inputs": inputs, + "min": bundle_size.get("min"), + "max": bundle_size.get("max"), + } + + kwarg_groups = [] + arg_names = set(arg_input) if isinstance(arg_input, dict) else set() + for group_name, input_names in gui_visualization.items(): + inputs = [] + for input_name in input_names: + if input_name in arg_names or input_name not in signature_by_name: + continue + param = signature_by_name[input_name] + if param.get("kind") in ("VAR_POSITIONAL", "VAR_KEYWORD"): + continue + inputs.append(self.scan_input_from_signature(param)) + if inputs: + kwarg_groups.append({"name": group_name, "inputs": inputs}) + + return { + "scan_class_name": scan_info.get("class"), + "arg_group": arg_group, + "kwarg_groups": kwarg_groups, + } diff --git a/tests/unit_tests/test_scan_control.py b/tests/unit_tests/test_scan_control.py index d070e5e2..bd43abac 100644 --- a/tests/unit_tests/test_scan_control.py +++ b/tests/unit_tests/test_scan_control.py @@ -277,6 +277,133 @@ def test_populate_scans(scan_control, mocked_client): assert sorted(items) == sorted(expected_scans) +def test_scan_control_uses_gui_visualization_and_signature(qtbot, mocked_client): + scan_info = { + "class": "AnnotatedScan", + "base_class": "ScanBase", + "arg_input": { + "device": "DeviceBase", + "start": { + "Annotated": { + "type": "float", + "metadata": { + "ScanArgument": { + "display_name": "Start Position", + "description": "Start position", + "tooltip": "Custom start tooltip", + "expert": False, + "alternative_group": None, + "units": None, + "reference_units": "device", + } + }, + } + }, + "stop": { + "Annotated": { + "type": "float", + "metadata": { + "ScanArgument": { + "display_name": None, + "description": "Stop position", + "tooltip": None, + "expert": False, + "alternative_group": None, + "units": None, + "reference_units": "device", + } + }, + } + }, + }, + "arg_bundle_size": {"bundle": 3, "min": 1, "max": None}, + "gui_visualization": { + "Movement Parameters": ["steps", "step_size"], + "Acquisition Parameters": ["exp_time", "relative"], + }, + "required_kwargs": [], + "signature": [ + {"name": "args", "kind": "VAR_POSITIONAL", "default": "_empty", "annotation": "_empty"}, + {"name": "steps", "kind": "KEYWORD_ONLY", "default": 10, "annotation": "int"}, + { + "name": "step_size", + "kind": "KEYWORD_ONLY", + "default": None, + "annotation": { + "Annotated": { + "type": "float", + "metadata": { + "ScanArgument": { + "display_name": "Step Size Custom", + "description": "Step size", + "tooltip": "Custom step tooltip", + "expert": False, + "alternative_group": "scan_resolution", + "units": "mm", + "reference_units": None, + } + }, + } + }, + }, + { + "name": "exp_time", + "kind": "KEYWORD_ONLY", + "default": 0, + "annotation": { + "Annotated": { + "type": "float", + "metadata": { + "ScanArgument": { + "display_name": None, + "description": None, + "tooltip": "Exposure time", + "expert": False, + "alternative_group": None, + "units": "s", + "reference_units": None, + } + }, + } + }, + }, + {"name": "relative", "kind": "KEYWORD_ONLY", "default": False, "annotation": "bool"}, + {"name": "kwargs", "kind": "VAR_KEYWORD", "default": "_empty", "annotation": "_empty"}, + ], + } + mocked_client.connector.set( + MessageEndpoints.available_scans(), + AvailableResourceMessage(resource={"annotated_scan": scan_info}), + ) + + widget = ScanControl(client=mocked_client) + qtbot.addWidget(widget) + qtbot.waitExposed(widget) + widget.comboBox_scan_selection.setCurrentText("annotated_scan") + + assert widget.comboBox_scan_selection.count() == 1 + assert widget.arg_box.layout.itemAtPosition(0, 1).widget().text() == "Start Position" + assert widget.arg_box.widgets[1].toolTip() == "Custom start tooltip\nUnits from: device" + with patch.object(mocked_client.device_manager.devices.samx, "egu", return_value="mm"): + WidgetIO.set_value(widget.arg_box.widgets[0], "samx") + assert widget.arg_box.layout.itemAtPosition(0, 1).widget().text() == "Start Position" + assert widget.arg_box.widgets[1].suffix() == " mm" + assert widget.arg_box.widgets[1].toolTip() == "Custom start tooltip\nUnits: mm" + widget.arg_box.widgets[0].setText("not_a_device") + assert widget.arg_box.layout.itemAtPosition(0, 1).widget().text() == "Start" + assert widget.arg_box.widgets[1].suffix() == "" + assert widget.arg_box.widgets[1].toolTip() == "Custom start tooltip\nUnits from: device" + assert [box.title() for box in widget.kwarg_boxes] == [ + "Movement Parameters", + "Acquisition Parameters", + ] + assert widget.kwarg_boxes[0].layout.itemAtPosition(0, 1).widget().text() == "Step Size Custom" + assert widget.kwarg_boxes[0].widgets[1].suffix() == " mm" + assert widget.kwarg_boxes[0].widgets[1].toolTip() == "Custom step tooltip\nUnits: mm" + assert widget.kwarg_boxes[1].layout.itemAtPosition(0, 0).widget().text() == "Exp Time" + assert widget.kwarg_boxes[1].widgets[0].toolTip() == "Exposure time\nUnits: s" + + def test_current_scan(scan_control, mocked_client): current_scan = scan_control.current_scan wrong_scan = "error_scan" diff --git a/tests/unit_tests/test_scan_control_group_box.py b/tests/unit_tests/test_scan_control_group_box.py index 15096c4e..15264ef3 100644 --- a/tests/unit_tests/test_scan_control_group_box.py +++ b/tests/unit_tests/test_scan_control_group_box.py @@ -157,3 +157,81 @@ def test_arg_box(qtbot): # Widget 2 assert arg_box.widgets[2].__class__.__name__ == "ScanSpinBox" assert arg_box.widgets[2].arg_name + + +def test_spinbox_limits_from_scan_info(qtbot): + group_input = { + "name": "Kwarg Test", + "inputs": [ + { + "arg": False, + "name": "exp_time", + "type": "float", + "display_name": "Exp Time", + "tooltip": "Exposure time in seconds", + "default": 2.0, + "expert": False, + "precision": 3, + "gt": 1.5, + "ge": None, + "lt": 5.0, + "le": None, + }, + { + "arg": False, + "name": "num_points", + "type": "int", + "display_name": "Num Points", + "tooltip": "Number of points", + "default": 4, + "expert": False, + "gt": None, + "ge": 3, + "lt": 9, + "le": None, + }, + { + "arg": False, + "name": "settling_time", + "type": "float", + "display_name": "Settling Time", + "tooltip": "Settling time in seconds", + "default": 0.5, + "expert": False, + "gt": None, + "ge": 0.2, + "lt": None, + "le": 3.5, + }, + { + "arg": False, + "name": "steps", + "type": "int", + "display_name": "Steps", + "tooltip": "Number of steps", + "default": 4, + "expert": False, + "gt": 0, + "ge": None, + "lt": None, + "le": 10, + }, + ], + } + + kwarg_box = ScanGroupBox(box_type="kwargs", config=group_input) + + exp_time = kwarg_box.widgets[0] + num_points = kwarg_box.widgets[1] + settling_time = kwarg_box.widgets[2] + steps = kwarg_box.widgets[3] + + assert exp_time.decimals() == 3 + assert exp_time.minimum() == 1.501 + assert exp_time.maximum() == 4.999 + assert num_points.minimum() == 3 + assert num_points.maximum() == 8 + assert settling_time.minimum() == 0.2 + assert settling_time.maximum() == 3.5 + assert steps.minimum() == 1 + assert steps.maximum() == 10