1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-04-10 18:50:55 +02:00

Compare commits

...

3 Commits

Author SHA1 Message Date
8d2663c09b f 2026-04-08 17:23:08 +02:00
bf5fc6460b f 2026-04-08 16:36:49 +02:00
50d5c0460f feat: add support for new scan signatures including units 2026-04-08 11:07:50 +02:00
5 changed files with 593 additions and 12 deletions

View File

@@ -25,6 +25,7 @@ from bec_widgets.utils.colors import apply_theme, get_accent_colors
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
from bec_widgets.widgets.control.buttons.stop_button.stop_button import StopButton
from bec_widgets.widgets.control.scan_control.scan_group_box import ScanGroupBox
from bec_widgets.widgets.control.scan_control.scan_info_adapter import ScanInfoAdapter
from bec_widgets.widgets.editors.scan_metadata.scan_metadata import ScanMetadata
from bec_widgets.widgets.utility.toggle.toggle import ToggleSwitch
@@ -97,6 +98,7 @@ class ScanControl(BECWidget, QWidget):
self._hide_scan_control_buttons = False
self._hide_metadata = False
self._hide_scan_selection_combobox = False
self._scan_info_adapter = ScanInfoAdapter()
# Create and set main layout
self._init_UI()
@@ -195,7 +197,8 @@ class ScanControl(BECWidget, QWidget):
allowed_scans = [
scan_name
for scan_name, scan_info in self.available_scans.items()
if scan_info["base_class"] in supported_scans and len(scan_info["gui_config"]) > 0
if scan_info["base_class"] in supported_scans
and self._scan_info_adapter.has_scan_ui_config(scan_info)
]
else:
@@ -390,14 +393,14 @@ class ScanControl(BECWidget, QWidget):
self.reset_layout()
selected_scan_info = self.available_scans.get(scan_name, {})
gui_config = selected_scan_info.get("gui_config", {})
self.arg_group = gui_config.get("arg_group", None)
self.kwarg_groups = gui_config.get("kwarg_groups", None)
gui_config = self._scan_info_adapter.build_scan_ui_config(selected_scan_info)
arg_group = gui_config.get("arg_group", None)
kwarg_groups = gui_config.get("kwarg_groups", [])
if bool(self.arg_group["arg_inputs"]):
self.add_arg_group(self.arg_group)
if len(self.kwarg_groups) > 0:
self.add_kwargs_boxes(self.kwarg_groups)
if arg_group and bool(arg_group.get("arg_inputs")):
self.add_arg_group(arg_group)
if kwarg_groups:
self.add_kwargs_boxes(kwarg_groups)
self.update()
self.adjustSize()

View File

@@ -209,6 +209,8 @@ class ScanGroupBox(QGroupBox):
self.labels = []
self.widgets = []
self._widget_configs = {}
self._column_labels = {}
self.selected_devices = {}
self.init_box(self.config)
@@ -247,6 +249,7 @@ class ScanGroupBox(QGroupBox):
label = QLabel(text=display_name)
self.layout.addWidget(label, row, column_index)
self.labels.append(label)
self._column_labels[column_index] = label
def add_input_widgets(self, group_inputs: dict, row) -> None:
"""
@@ -272,21 +275,30 @@ class ScanGroupBox(QGroupBox):
if default == "_empty":
default = None
widget = widget_class(parent=self.parent(), arg_name=arg_name, default=default)
self._apply_numeric_precision(widget, item)
self._apply_numeric_limits(widget, item)
if isinstance(widget, DeviceLineEdit):
widget.set_device_filter(BECDeviceFilter.DEVICE)
self.selected_devices[widget] = ""
widget.device_selected.connect(self.emit_device_selected)
widget.textChanged.connect(
lambda text, device_widget=widget: self._handle_device_text_changed(
device_widget, text
)
)
if isinstance(widget, ScanLiteralsComboBox):
widget.set_literals(item["type"].get("Literal", []))
tooltip = item.get("tooltip", None)
if tooltip is not None:
widget.setToolTip(item["tooltip"])
self._widget_configs[widget] = item
self._apply_unit_metadata(widget, item)
self.layout.addWidget(widget, row, column_index)
self.widgets.append(widget)
@Slot(str)
def emit_device_selected(self, device_name):
self.selected_devices[self.sender()] = device_name.strip()
sender = self.sender()
self.selected_devices[sender] = device_name.strip()
if isinstance(sender, DeviceLineEdit):
self._update_reference_units(sender, self._device_units(sender.get_current_device()))
selected_devices_str = " ".join(self.selected_devices.values())
self.device_selected.emit(selected_devices_str)
@@ -313,6 +325,7 @@ class ScanGroupBox(QGroupBox):
for widget in self.widgets[-len(self.inputs) :]:
if isinstance(widget, DeviceLineEdit):
self.selected_devices[widget] = ""
self._widget_configs.pop(widget, None)
widget.close()
widget.deleteLater()
self.widgets = self.widgets[: -len(self.inputs)]
@@ -325,6 +338,7 @@ class ScanGroupBox(QGroupBox):
for widget in list(self.widgets):
if isinstance(widget, DeviceLineEdit):
self.selected_devices.pop(widget, None)
self._widget_configs.pop(widget, None)
widget.close()
widget.deleteLater()
self.layout.removeWidget(widget)
@@ -423,3 +437,119 @@ class ScanGroupBox(QGroupBox):
if widget.arg_name == key:
WidgetIO.set_value(widget, value)
break
@staticmethod
def _unit_tooltip(item: dict, units: str | None = None) -> str | None:
tooltip = item.get("tooltip", None)
reference_units = item.get("reference_units", None)
units = units or item.get("units", None)
tooltip_parts = [tooltip] if tooltip else []
if units:
tooltip_parts.append(f"Units: {units}")
elif reference_units:
tooltip_parts.append(f"Units from: {reference_units}")
if tooltip_parts:
return "\n".join(tooltip_parts)
return None
def _apply_unit_metadata(self, widget, item: dict, units: str | None = None) -> None:
units = units or item.get("units", None)
tooltip = self._unit_tooltip(item, units)
widget.setToolTip(tooltip or "")
if hasattr(widget, "setSuffix"):
widget.setSuffix(f" {units}" if units else "")
def _refresh_column_label(self, column: int, item: dict) -> None:
if column not in self._column_labels:
return
self._column_labels[column].setText(item.get("display_name", item.get("name", None)))
@staticmethod
def _device_units(device) -> str | None:
egu = getattr(device, "egu", None)
if not callable(egu):
return None
try:
return egu()
except Exception:
logger.exception("Failed to fetch engineering units from device %s", device)
return None
def _widget_position(self, widget) -> tuple[int, int] | None:
for row in range(self.layout.rowCount()):
for column in range(self.layout.columnCount()):
item = self.layout.itemAtPosition(row, column)
if item is not None and item.widget() is widget:
return row, column
return None
def _update_reference_units(self, device_widget: DeviceLineEdit, units: str | None) -> None:
position = self._widget_position(device_widget)
if position is None:
return
source_row, _ = position
source_name = device_widget.arg_name
for widget in self.widgets:
item = self._widget_configs.get(widget, {})
if item.get("reference_units") != source_name:
continue
widget_position = self._widget_position(widget)
if widget_position is None:
continue
row, column = widget_position
if self.box_type == "args" and row != source_row:
continue
self._apply_unit_metadata(widget, item, units)
self._refresh_column_label(column, item)
def _handle_device_text_changed(self, device_widget: DeviceLineEdit, device_name: str) -> None:
if not device_widget.validate_device(device_name):
self.selected_devices[device_widget] = ""
self._update_reference_units(device_widget, None)
@staticmethod
def _apply_numeric_precision(widget: ScanDoubleSpinBox, item: dict) -> None:
if not isinstance(widget, ScanDoubleSpinBox):
return
precision = item.get("precision")
if precision is None:
return
try:
widget.setDecimals(max(0, int(precision)))
except (TypeError, ValueError):
logger.warning(
"Ignoring invalid precision %r for parameter %s", precision, item.get("name")
)
@staticmethod
def _apply_numeric_limits(widget: ScanDoubleSpinBox | ScanSpinBox, item: dict) -> None:
if isinstance(widget, ScanSpinBox):
minimum = -2147483647 # largest int which qt allows
maximum = 2147483647
if item.get("ge") is not None:
minimum = int(item["ge"])
if item.get("gt") is not None:
minimum = int(item["gt"]) + 1
if item.get("le") is not None:
maximum = int(item["le"])
if item.get("lt") is not None:
maximum = int(item["lt"]) - 1
widget.setRange(minimum, maximum)
return
if isinstance(widget, ScanDoubleSpinBox):
minimum = -float("inf")
maximum = float("inf")
step = 10 ** (-widget.decimals())
if item.get("ge") is not None:
minimum = float(item["ge"])
if item.get("gt") is not None:
minimum = float(item["gt"]) + step
if item.get("le") is not None:
maximum = float(item["le"])
if item.get("lt") is not None:
maximum = float(item["lt"]) - step
widget.setRange(minimum, maximum)

View File

@@ -0,0 +1,243 @@
"""Helpers for translating BEC scan metadata into ScanControl UI configuration."""
from __future__ import annotations
import re
from typing import Any
AnnotationValue = str | dict[str, Any] | list[Any] | None
ScanArgumentMetadata = dict[str, Any]
SignatureEntry = dict[str, Any]
ScanInputConfig = dict[str, Any]
ScanInfo = dict[str, Any]
ScanUIConfig = dict[str, Any]
class ScanInfoAdapter:
"""Normalize available-scan payloads into the structure consumed by ``ScanControl``."""
@staticmethod
def has_scan_ui_config(scan_info: ScanInfo) -> bool:
"""Check whether a scan exposes enough metadata to build a UI.
Args:
scan_info (ScanInfo): Available-scan payload for one scan.
Returns:
bool: ``True`` when a supported GUI metadata field is present.
"""
return bool(scan_info.get("gui_visibility") or scan_info.get("gui_config"))
@staticmethod
def format_display_name(name: str) -> str:
"""Convert a parameter name into a user-facing label.
Args:
name (str): Raw parameter name.
Returns:
str: Formatted display label such as ``Exp Time``.
"""
parts = re.split(r"(_|\d+)", name)
return " ".join(part.capitalize() for part in parts if part.isalnum()).strip()
@staticmethod
def resolve_tooltip(scan_argument: ScanArgumentMetadata) -> str | None:
"""Resolve the tooltip text from parsed ``ScanArgument`` metadata.
Args:
scan_argument (ScanArgumentMetadata): Parsed ``ScanArgument`` metadata.
Returns:
str | None: Explicit tooltip text if provided, otherwise the description fallback.
"""
return scan_argument.get("tooltip") or scan_argument.get("description")
@staticmethod
def parse_annotation(
annotation: AnnotationValue,
) -> tuple[AnnotationValue, ScanArgumentMetadata]:
"""Extract the serialized base annotation and ``ScanArgument`` metadata.
Args:
annotation (AnnotationValue): Serialized annotation payload from BEC.
Returns:
tuple[AnnotationValue, ScanArgumentMetadata]: The unwrapped annotation and parsed
``ScanArgument`` metadata.
"""
scan_argument: ScanArgumentMetadata = {}
if isinstance(annotation, list):
annotation = next(
(entry for entry in annotation if entry != "NoneType"),
annotation[0] if annotation else "_empty",
)
if isinstance(annotation, dict) and "Annotated" in annotation:
annotated = annotation["Annotated"]
annotation = annotated.get("type", "_empty")
scan_argument = annotated.get("metadata", {}).get("ScanArgument", {}) or {}
return annotation, scan_argument
@staticmethod
def scan_arg_type_from_annotation(annotation: AnnotationValue) -> AnnotationValue:
"""Normalize an annotation value to the widget type expected by ``ScanControl``.
Args:
annotation (AnnotationValue): Serialized or parsed annotation value.
Returns:
AnnotationValue: The normalized type identifier used by the widget layer.
"""
if isinstance(annotation, dict):
return annotation
if annotation in ("_empty", None):
return "str"
return annotation
def scan_input_from_signature(
self, param: SignatureEntry, arg: bool = False
) -> ScanInputConfig:
"""Build one ScanControl input description from a signature entry.
Args:
param (SignatureEntry): Serialized signature entry.
arg (bool): Whether the parameter belongs to the positional arg bundle.
Returns:
ScanInputConfig: Normalized input configuration for ``ScanControl``.
"""
annotation, scan_argument = self.parse_annotation(param.get("annotation"))
return self._build_scan_input(
name=param["name"],
annotation=annotation,
scan_argument=scan_argument,
arg=arg,
default=None if arg else param.get("default", None),
)
def scan_input_from_arg_input(
self, name: str, item_type: AnnotationValue, signature_by_name: dict[str, SignatureEntry]
) -> ScanInputConfig:
"""Build one arg-bundle input description from ``arg_input`` metadata.
Args:
name (str): Argument name from ``arg_input``.
item_type (AnnotationValue): Serialized argument type from ``arg_input``.
signature_by_name (dict[str, SignatureEntry]): Signature entries indexed by
parameter name.
Returns:
ScanInputConfig: Normalized input configuration for one arg-bundle field.
"""
if name in signature_by_name:
scan_input = self.scan_input_from_signature(signature_by_name[name], arg=True)
scan_input["type"] = self.scan_arg_type_from_annotation(
self.parse_annotation(signature_by_name[name].get("annotation"))[0]
)
else:
annotation, scan_argument = self.parse_annotation(item_type)
scan_input = self._build_scan_input(
name=name,
annotation=annotation,
scan_argument=scan_argument,
arg=True,
default=None,
)
if scan_input["type"] in ("_empty", None):
scan_input["type"] = item_type
return scan_input
def _build_scan_input(
self,
name: str,
annotation: AnnotationValue,
scan_argument: ScanArgumentMetadata,
*,
arg: bool,
default: Any,
) -> ScanInputConfig:
"""Build one normalized ScanControl input configuration.
Args:
name (str): Parameter name.
annotation (AnnotationValue): Parsed annotation value.
scan_argument (ScanArgumentMetadata): Parsed ``ScanArgument`` metadata.
arg (bool): Whether the parameter belongs to the positional arg bundle.
default (Any): Default value for the parameter.
Returns:
ScanInputConfig: Normalized input configuration.
"""
return {
"arg": arg,
"name": name,
"type": self.scan_arg_type_from_annotation(annotation),
"display_name": scan_argument.get("display_name") or self.format_display_name(name),
"tooltip": self.resolve_tooltip(scan_argument),
"default": default,
"expert": scan_argument.get("expert", False),
"precision": scan_argument.get("precision"),
"units": scan_argument.get("units"),
"reference_units": scan_argument.get("reference_units"),
"gt": scan_argument.get("gt"),
"ge": scan_argument.get("ge"),
"lt": scan_argument.get("lt"),
"le": scan_argument.get("le"),
"alternative_group": scan_argument.get("alternative_group"),
}
def build_scan_ui_config(self, scan_info: ScanInfo) -> ScanUIConfig:
"""Normalize one available-scan entry into the widget UI configuration.
Args:
scan_info (ScanInfo): Available-scan payload for one scan.
Returns:
ScanUIConfig: Legacy group structure consumed by ``ScanControl`` and
``ScanGroupBox``.
"""
gui_visualization = (
scan_info.get("gui_visualization") or scan_info.get("gui_visibility") or {}
)
if not gui_visualization and scan_info.get("gui_config"):
return scan_info["gui_config"]
signature = scan_info.get("signature", [])
signature_by_name = {entry["name"]: entry for entry in signature}
arg_group = None
arg_input = scan_info.get("arg_input", {})
if isinstance(arg_input, dict) and arg_input:
bundle_size = scan_info.get("arg_bundle_size", {})
inputs = [
self.scan_input_from_arg_input(name, item_type, signature_by_name)
for name, item_type in arg_input.items()
]
arg_group = {
"name": "Scan Arguments",
"bundle": bundle_size.get("bundle"),
"arg_inputs": arg_input,
"inputs": inputs,
"min": bundle_size.get("min"),
"max": bundle_size.get("max"),
}
kwarg_groups = []
arg_names = set(arg_input) if isinstance(arg_input, dict) else set()
for group_name, input_names in gui_visualization.items():
inputs = []
for input_name in input_names:
if input_name in arg_names or input_name not in signature_by_name:
continue
param = signature_by_name[input_name]
if param.get("kind") in ("VAR_POSITIONAL", "VAR_KEYWORD"):
continue
inputs.append(self.scan_input_from_signature(param))
if inputs:
kwarg_groups.append({"name": group_name, "inputs": inputs})
return {
"scan_class_name": scan_info.get("class"),
"arg_group": arg_group,
"kwarg_groups": kwarg_groups,
}

View File

@@ -277,6 +277,133 @@ def test_populate_scans(scan_control, mocked_client):
assert sorted(items) == sorted(expected_scans)
def test_scan_control_uses_gui_visibility_and_signature(qtbot, mocked_client):
scan_info = {
"class": "AnnotatedScan",
"base_class": "ScanBase",
"arg_input": {
"device": "DeviceBase",
"start": {
"Annotated": {
"type": "float",
"metadata": {
"ScanArgument": {
"display_name": "Start Position",
"description": "Start position",
"tooltip": "Custom start tooltip",
"expert": False,
"alternative_group": None,
"units": None,
"reference_units": "device",
}
},
}
},
"stop": {
"Annotated": {
"type": "float",
"metadata": {
"ScanArgument": {
"display_name": None,
"description": "Stop position",
"tooltip": None,
"expert": False,
"alternative_group": None,
"units": None,
"reference_units": "device",
}
},
}
},
},
"arg_bundle_size": {"bundle": 3, "min": 1, "max": None},
"gui_visibility": {
"Movement Parameters": ["steps", "step_size"],
"Acquisition Parameters": ["exp_time", "relative"],
},
"required_kwargs": [],
"signature": [
{"name": "args", "kind": "VAR_POSITIONAL", "default": "_empty", "annotation": "_empty"},
{"name": "steps", "kind": "KEYWORD_ONLY", "default": 10, "annotation": "int"},
{
"name": "step_size",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": {
"Annotated": {
"type": "float",
"metadata": {
"ScanArgument": {
"display_name": "Step Size Custom",
"description": "Step size",
"tooltip": "Custom step tooltip",
"expert": False,
"alternative_group": "scan_resolution",
"units": "mm",
"reference_units": None,
}
},
}
},
},
{
"name": "exp_time",
"kind": "KEYWORD_ONLY",
"default": 0,
"annotation": {
"Annotated": {
"type": "float",
"metadata": {
"ScanArgument": {
"display_name": None,
"description": None,
"tooltip": "Exposure time",
"expert": False,
"alternative_group": None,
"units": "s",
"reference_units": None,
}
},
}
},
},
{"name": "relative", "kind": "KEYWORD_ONLY", "default": False, "annotation": "bool"},
{"name": "kwargs", "kind": "VAR_KEYWORD", "default": "_empty", "annotation": "_empty"},
],
}
mocked_client.connector.set(
MessageEndpoints.available_scans(),
AvailableResourceMessage(resource={"annotated_scan": scan_info}),
)
widget = ScanControl(client=mocked_client)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
widget.comboBox_scan_selection.setCurrentText("annotated_scan")
assert widget.comboBox_scan_selection.count() == 1
assert widget.arg_box.layout.itemAtPosition(0, 1).widget().text() == "Start Position"
assert widget.arg_box.widgets[1].toolTip() == "Custom start tooltip\nUnits from: device"
with patch.object(mocked_client.device_manager.devices.samx, "egu", return_value="mm"):
WidgetIO.set_value(widget.arg_box.widgets[0], "samx")
assert widget.arg_box.layout.itemAtPosition(0, 1).widget().text() == "Start Position"
assert widget.arg_box.widgets[1].suffix() == " mm"
assert widget.arg_box.widgets[1].toolTip() == "Custom start tooltip\nUnits: mm"
widget.arg_box.widgets[0].setText("not_a_device")
assert widget.arg_box.layout.itemAtPosition(0, 1).widget().text() == "Start Position"
assert widget.arg_box.widgets[1].suffix() == ""
assert widget.arg_box.widgets[1].toolTip() == "Custom start tooltip\nUnits from: device"
assert [box.title() for box in widget.kwarg_boxes] == [
"Movement Parameters",
"Acquisition Parameters",
]
assert widget.kwarg_boxes[0].layout.itemAtPosition(0, 1).widget().text() == "Step Size Custom"
assert widget.kwarg_boxes[0].widgets[1].suffix() == " mm"
assert widget.kwarg_boxes[0].widgets[1].toolTip() == "Custom step tooltip\nUnits: mm"
assert widget.kwarg_boxes[1].layout.itemAtPosition(0, 0).widget().text() == "Exp Time"
assert widget.kwarg_boxes[1].widgets[0].toolTip() == "Exposure time\nUnits: s"
def test_current_scan(scan_control, mocked_client):
current_scan = scan_control.current_scan
wrong_scan = "error_scan"

View File

@@ -157,3 +157,81 @@ def test_arg_box(qtbot):
# Widget 2
assert arg_box.widgets[2].__class__.__name__ == "ScanSpinBox"
assert arg_box.widgets[2].arg_name
def test_spinbox_limits_from_scan_info(qtbot):
group_input = {
"name": "Kwarg Test",
"inputs": [
{
"arg": False,
"name": "exp_time",
"type": "float",
"display_name": "Exp Time",
"tooltip": "Exposure time in seconds",
"default": 2.0,
"expert": False,
"precision": 3,
"gt": 1.5,
"ge": None,
"lt": 5.0,
"le": None,
},
{
"arg": False,
"name": "num_points",
"type": "int",
"display_name": "Num Points",
"tooltip": "Number of points",
"default": 4,
"expert": False,
"gt": None,
"ge": 3,
"lt": 9,
"le": None,
},
{
"arg": False,
"name": "settling_time",
"type": "float",
"display_name": "Settling Time",
"tooltip": "Settling time in seconds",
"default": 0.5,
"expert": False,
"gt": None,
"ge": 0.2,
"lt": None,
"le": 3.5,
},
{
"arg": False,
"name": "steps",
"type": "int",
"display_name": "Steps",
"tooltip": "Number of steps",
"default": 4,
"expert": False,
"gt": 0,
"ge": None,
"lt": None,
"le": 10,
},
],
}
kwarg_box = ScanGroupBox(box_type="kwargs", config=group_input)
exp_time = kwarg_box.widgets[0]
num_points = kwarg_box.widgets[1]
settling_time = kwarg_box.widgets[2]
steps = kwarg_box.widgets[3]
assert exp_time.decimals() == 3
assert exp_time.minimum() == 1.501
assert exp_time.maximum() == 4.999
assert num_points.minimum() == 3
assert num_points.maximum() == 8
assert settling_time.minimum() == 0.2
assert settling_time.maximum() == 3.5
assert steps.minimum() == 1
assert steps.maximum() == 10