feat(forms): unified pydantic and scan control adapter for pydantic models

This commit is contained in:
2026-06-12 15:52:32 +02:00
committed by Jan Wyzula
co-authored by Jan Wyzula
parent d07d03c1be
commit 563603b80e
15 changed files with 1090 additions and 703 deletions
+4 -50
View File
@@ -33,7 +33,6 @@ _Widgets = {
"BECShell": "BECShell",
"BECStatusBox": "BECStatusBox",
"BeamlineStateManager": "BeamlineStateManager",
"BeamlineStatePill": "BeamlineStatePill",
"BecConsole": "BecConsole",
"DapComboBox": "DapComboBox",
"DeviceBrowser": "DeviceBrowser",
@@ -737,12 +736,6 @@ class BeamlineStateManager(RPCBase):
Set whether idle collapsed pills keep the status-tinted card background.
"""
@rpc_call
def refresh_states(self) -> "None":
"""
Fetch the latest cached available beamline states and update the list immediately.
"""
@rpc_call
def clear_filters(self) -> "None":
"""
@@ -750,51 +743,12 @@ class BeamlineStateManager(RPCBase):
"""
@rpc_call
def remove(self):
"""
Cleanup the BECConnector
def state_summary(self) -> "dict[str, dict[str, str]]":
"""
Return the displayed beamline states with their current status and label.
@rpc_call
def attach(self):
"""
None
"""
@rpc_call
def detach(self):
"""
Detach the widget from its parent dock widget (if widget is in the dock), making it a floating widget.
"""
@rpc_timeout(None)
@rpc_call
def screenshot(self, file_name: "str | None" = None):
"""
Take a screenshot of the dock area and save it to a file.
"""
class BeamlineStatePill(RPCBase):
"""Compact widget showing one BEC beamline state."""
_IMPORT_MODULE = "bec_widgets.widgets.services.beamline_states.beamline_state_pill"
@property
@rpc_call
def state_name(self) -> "str | None":
"""
Name of the BEC beamline state displayed by this pill.
"""
@rpc_call
def set_state_name(self, state_name: "str | None", title: "str | None" = None) -> "None":
"""
Set the BEC beamline state this pill displays.
Args:
state_name: State name as published by ``AvailableBeamlineStatesMessage``.
title: Optional human-readable title for the state.
Returns:
dict: Mapping of state name to a dictionary with ``status`` and ``label`` keys.
"""
@rpc_call
-5
View File
@@ -23,10 +23,6 @@ designer_plugins = {
"bec_widgets.widgets.services.beamline_states.beamline_state_pill",
"BeamlineStateManager",
),
"BeamlineStatePill": (
"bec_widgets.widgets.services.beamline_states.beamline_state_pill",
"BeamlineStatePill",
),
"BecConsole": ("bec_widgets.widgets.editors.bec_console.bec_console", "BecConsole"),
"ColorButton": ("bec_widgets.widgets.utility.visual.color_button.color_button", "ColorButton"),
"ColorButtonNative": (
@@ -127,7 +123,6 @@ widget_icons = {
"BECSpinBox": "123",
"BECStatusBox": "widgets",
"BeamlineStateManager": "format_list_bulleted",
"BeamlineStatePill": "info",
"BecConsole": "terminal",
"ColorButton": "colors",
"ColorButtonNative": "colors",
@@ -0,0 +1,49 @@
from __future__ import annotations
from collections.abc import Mapping
from typing import Any
from pydantic import BaseModel
from pydantic_core import PydanticUndefined
from bec_widgets.utils.scan_arg_metadata import ui_config_from_metadata
NUMERIC_BOUND_KEYS = {"gt", "ge", "lt", "le"}
def pydantic_model_input_configs(model: type[BaseModel]) -> list[dict[str, Any]]:
"""Return scan-control-style field items for a Pydantic model."""
configs = []
for name, info in model.model_fields.items():
metadata: dict[str, Any] = {}
for entry in info.metadata:
for key in NUMERIC_BOUND_KEYS:
value = getattr(entry, key, None)
if value is not None:
metadata.setdefault(key, value)
if isinstance(info.json_schema_extra, Mapping):
metadata.update(dict(info.json_schema_extra))
if info.description and metadata.get("description") is None:
metadata["description"] = info.description
default: Any
if info.default is not PydanticUndefined:
default = info.default
elif info.default_factory is not None:
default = info.get_default(call_default_factory=True)
else:
default = None
display_name = metadata.get("display_name") or info.title
if display_name is None:
display_name = name.replace("_", " ").capitalize()
item = ui_config_from_metadata(
name=name, metadata=metadata, default=default, display_name=display_name
)
item.update({key: value for key, value in metadata.items() if key not in item})
configs.append(item)
return configs
@@ -1,12 +1,11 @@
from __future__ import annotations
from types import NoneType, UnionType
from typing import Any, Literal, Union, get_args, get_origin
from types import NoneType
from typing import Any, Literal, get_args, get_origin
from bec_lib.device import DeviceBase, Signal
from pydantic import BaseModel, ValidationError
from pydantic.fields import FieldInfo
from pydantic_core import PydanticUndefined
from qtpy.QtCore import Qt
from qtpy.QtCore import Signal as QtSignal
from qtpy.QtWidgets import (
@@ -20,6 +19,16 @@ from qtpy.QtWidgets import (
QWidget,
)
from bec_widgets.utils.forms_from_types.pydantic_model_info_adapter import (
NUMERIC_BOUND_KEYS,
pydantic_model_input_configs,
)
from bec_widgets.utils.scan_arg_metadata import (
apply_numeric_limits,
apply_numeric_precision,
apply_unit_metadata,
device_units,
)
from bec_widgets.utils.widget_io import WidgetIO
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
from bec_widgets.widgets.control.device_input.signal_combobox.signal_combobox import SignalComboBox
@@ -27,11 +36,22 @@ from bec_widgets.widgets.utility.spinbox.decimal_spinbox import BECSpinBox
class OptionalValueWidget(QWidget):
"""Generic optional-value wrapper preserving ``None`` for editor widgets."""
"""Wrap a value widget with an enable checkbox for optional Pydantic fields.
Attributes:
value_changed: Signal emitted with the current value whenever the checkbox
state or wrapped widget value changes.
"""
value_changed = QtSignal(object)
def __init__(self, value_widget: QWidget, parent: QWidget | None = None) -> None:
"""Create an optional-value wrapper.
Args:
value_widget: Input widget used when the optional value is enabled.
parent: Optional parent widget.
"""
super().__init__(parent=parent)
self._value_widget = value_widget
self._checkbox = QCheckBox(self)
@@ -50,18 +70,38 @@ class OptionalValueWidget(QWidget):
@property
def value_widget(self) -> QWidget:
"""Return the wrapped input widget.
Returns:
The widget that edits the non-``None`` value.
"""
return self._value_widget
@property
def checkbox(self) -> QCheckBox:
"""Return the checkbox controlling whether the value is enabled.
Returns:
The enable checkbox.
"""
return self._checkbox
def value(self) -> Any:
"""Return the current optional value.
Returns:
``None`` when the checkbox is unchecked; otherwise the wrapped widget value.
"""
if not self._checkbox.isChecked():
return None
return WidgetIO.get_value(self._value_widget)
def set_value(self, value: Any) -> None:
"""Set the optional value.
Args:
value: Value to set on the wrapped widget. ``None`` disables the value.
"""
enabled = value is not None
self._checkbox.setChecked(enabled)
self._value_widget.setEnabled(enabled)
@@ -77,7 +117,17 @@ class OptionalValueWidget(QWidget):
class PydanticWidgetForm(QWidget):
"""Qt form generated from a Pydantic model using type-based widget selection."""
"""Generate a Qt form from a Pydantic model.
The form maps Pydantic field annotations to Qt widgets, applies supported
field metadata, and exposes typed and raw data accessors for the generated
fields.
Attributes:
changed: Signal emitted whenever a generated input widget changes.
validity_changed: Signal emitted by :meth:`validate` with the current
validation result.
"""
changed = QtSignal()
validity_changed = QtSignal(bool)
@@ -91,11 +141,22 @@ class PydanticWidgetForm(QWidget):
read_only_fields: set[str] | None = None,
client=None,
) -> None:
"""Create a generated form for a Pydantic model.
Args:
model: Pydantic model class used to generate fields and validate data.
parent: Optional parent widget.
data: Optional initial model instance or raw field-value mapping.
read_only_fields: Field names that should be displayed but not editable.
client: Optional BEC client passed to domain-specific widgets such as
device and signal combo boxes.
"""
super().__init__(parent=parent)
self._model = model
self._client = client
self._read_only_fields = set(read_only_fields or set())
self._widgets: dict[str, QWidget] = {}
self._field_configs: dict[str, dict[str, Any]] = {}
self._baseline: dict[str, Any] = {}
self._layout = QFormLayout()
@@ -113,32 +174,87 @@ class PydanticWidgetForm(QWidget):
@property
def model(self) -> type[BaseModel]:
"""Return the active Pydantic model class.
Returns:
The model class currently used by this form.
"""
return self._model
@property
def widgets(self) -> dict[str, QWidget]:
"""Return generated field widgets keyed by model field name.
Returns:
A shallow copy of the field-widget mapping. Optional fields return
their outer :class:`OptionalValueWidget`.
"""
return dict(self._widgets)
def field_widget(self, name: str) -> QWidget:
"""Return the generated widget for a field.
Args:
name: Model field name.
Returns:
The generated field widget. Optional fields return their outer
:class:`OptionalValueWidget`.
Raises:
KeyError: If no widget exists for ``name``.
"""
return self._widgets[name]
def input_widget(self, name: str) -> QWidget:
"""Return the direct input widget for a field.
Args:
name: Model field name.
Returns:
The editable input widget. Optional fields return the wrapped value
widget instead of the outer optional wrapper.
Raises:
KeyError: If no widget exists for ``name``.
"""
widget = self._widgets[name]
if isinstance(widget, OptionalValueWidget):
return widget.value_widget
return widget
def input_widgets(self) -> dict[str, QWidget]:
"""Return direct input widgets keyed by model field name.
Returns:
Mapping of field names to editable input widgets.
"""
return {name: self.input_widget(name) for name in self._widgets}
def input_widgets_by_type(self, widget_type: type[QWidget]) -> list[QWidget]:
"""Return direct input widgets matching a widget type.
Args:
widget_type: Qt widget class to match with ``isinstance``.
Returns:
List of input widgets matching ``widget_type``.
"""
return [
widget for widget in self.input_widgets().values() if isinstance(widget, widget_type)
]
def set_model(self, model: type[BaseModel], data: dict[str, Any] | None = None) -> None:
"""Replace the active model and rebuild the form.
Args:
model: New Pydantic model class.
data: Optional initial data for the new model. When omitted, values
from fields shared with the previous model are preserved.
"""
old_data = self.raw_data()
self._clear()
self.cleanup()
self._model = model
self._populate()
if data is None:
@@ -147,27 +263,69 @@ class PydanticWidgetForm(QWidget):
self.mark_clean()
def set_data(self, data: BaseModel | dict[str, Any]) -> None:
"""Set form values from a model instance or mapping.
Args:
data: Pydantic model instance or raw field-value mapping.
"""
values = data.model_dump() if isinstance(data, BaseModel) else dict(data)
self.set_partial_data(values)
def set_partial_data(self, data: dict[str, Any]) -> None:
"""Set values for fields present in the form.
Unknown keys are ignored, which allows callers to pass larger model
dumps or backend payloads safely.
Args:
data: Field-value mapping to apply.
"""
for name, value in data.items():
if name not in self._widgets:
continue
self._set_widget_value(name, value)
self._refresh_reference_units()
self.changed.emit()
def raw_data(self) -> dict[str, Any]:
"""Return current widget values without Pydantic validation.
Returns:
Mapping of model field names to raw widget values.
"""
return {name: self._read_widget_value(name) for name in self._widgets}
def get_data(self) -> dict[str, Any]:
"""Return current data after Pydantic validation.
Returns:
Validated model data as a dictionary.
Raises:
ValidationError: If Pydantic validation fails.
ValueError: If domain widget validation fails.
"""
return self.model_instance().model_dump()
def model_instance(self) -> BaseModel:
"""Return the current values as a Pydantic model instance.
Returns:
Validated instance of the active model class.
Raises:
ValidationError: If Pydantic validation fails.
ValueError: If domain widget validation fails.
"""
self._validate_domain_widgets()
return self._model.model_validate(self.raw_data())
def validate(self) -> bool:
"""Validate the current form values.
Returns:
``True`` when current values validate successfully, otherwise ``False``.
"""
try:
self.get_data()
except (ValidationError, ValueError):
@@ -177,21 +335,33 @@ class PydanticWidgetForm(QWidget):
return True
def dirty_fields(self) -> set[str]:
"""Return fields whose raw values differ from the clean baseline.
Returns:
Set of dirty field names.
"""
current = self.raw_data()
fields = set(current) | set(self._baseline)
dirty = set()
for field in fields:
if self._values_differ(current.get(field), self._baseline.get(field)):
dirty.add(field)
return dirty
return {field for field in fields if current.get(field) != self._baseline.get(field)}
def mark_clean(self) -> None:
"""Store the current raw values as the clean baseline."""
self._baseline = self.raw_data()
def reset_to_baseline(self) -> None:
"""Restore the form values to the current clean baseline."""
self.set_partial_data(self._baseline)
def editable_data(self) -> dict[str, Any]:
"""Return validated data excluding read-only fields.
Returns:
Validated editable field values.
Raises:
ValidationError: If Pydantic validation fails.
ValueError: If domain widget validation fails.
"""
return {
key: value
for key, value in self.get_data().items()
@@ -199,6 +369,11 @@ class PydanticWidgetForm(QWidget):
}
def raw_editable_data(self) -> dict[str, Any]:
"""Return raw widget data excluding read-only fields.
Returns:
Raw editable field values.
"""
return {
key: value
for key, value in self.raw_data().items()
@@ -206,62 +381,83 @@ class PydanticWidgetForm(QWidget):
}
def cleanup(self) -> None:
self._clear(delete_later=False)
"""Close and schedule deletion of all generated field widgets."""
while self._layout.rowCount():
row = self._layout.takeRow(0)
for item in (row.labelItem, row.fieldItem):
widget = item.widget() if item is not None else None
if widget is not None:
widget.close()
# Detach before deleteLater: a child pending deletion that still has a
# signal connection into this form crashes if the form is garbage
# collected before the deferred delete is processed.
widget.setParent(None)
widget.deleteLater()
self._widgets.clear()
self._field_configs.clear()
def closeEvent(self, event) -> None: # noqa: N802
self.cleanup()
super().closeEvent(event)
def _populate(self) -> None:
for name, info in self._model.model_fields.items():
for config in pydantic_model_input_configs(self._model):
name = config["name"]
info = self._model.model_fields[name]
widget = self._create_widget(name, info)
label_text = info.title or self._format_label(name)
label_text = config["display_name"]
self._layout.addRow(label_text, widget)
label = self._layout.labelForField(widget)
if label is not None:
label.setProperty("_model_field_name", name)
if info.description:
widget.setToolTip(info.description)
if label is not None:
label.setToolTip(info.description)
if config.get("tooltip") and label is not None:
label.setToolTip(config["tooltip"])
widget.setEnabled(name not in self._read_only_fields)
self._widgets[name] = widget
self._set_widget_value(name, self._field_default(info))
self._connect_widget(name, widget)
self._field_configs[name] = config
self._set_widget_value(name, config["default"])
self._apply_field_metadata(name)
self._connect_widget(widget)
self._connect_device_signal_widgets()
def _clear(self, *, delete_later: bool = True) -> None:
while self._layout.count():
item = self._layout.takeAt(0)
widget = item.widget()
if widget is not None:
widget.close()
if delete_later:
widget.deleteLater()
self._widgets.clear()
self._connect_reference_unit_widgets()
self._refresh_reference_units()
def _create_widget(self, name: str, info: FieldInfo) -> QWidget:
annotation = info.annotation
optional = self._is_optional(annotation)
value_annotation = self._without_none(annotation)
args = get_args(annotation)
optional = NoneType in args
non_none_args = tuple(arg for arg in args if arg is not NoneType)
value_annotation = non_none_args[0] if len(non_none_args) == 1 else annotation
widget = self._create_value_widget(name, value_annotation, info)
if optional and (self._is_numeric_annotation(value_annotation) or value_annotation is bool):
widget = self._create_value_widget(name, value_annotation)
numeric = value_annotation in (int, float) or any(
arg in (int, float) for arg in get_args(value_annotation)
)
if optional and (numeric or value_annotation is bool):
return OptionalValueWidget(widget, parent=self)
return widget
def _create_value_widget(self, name: str, annotation: Any, info: FieldInfo) -> QWidget:
if self._contains_type(annotation, Signal):
def _create_value_widget(self, name: str, annotation: Any) -> QWidget:
args = get_args(annotation)
if (
isinstance(annotation, type)
and issubclass(annotation, Signal)
or any(isinstance(arg, type) and issubclass(arg, Signal) for arg in args)
):
return SignalComboBox(
parent=self,
client=self._client,
require_device=self._model_has_device_field(),
arg_name=name,
)
if self._contains_type(annotation, DeviceBase):
if (
isinstance(annotation, type)
and issubclass(annotation, DeviceBase)
or any(isinstance(arg, type) and issubclass(arg, DeviceBase) for arg in args)
):
return DeviceComboBox(parent=self, client=self._client, arg_name=name)
if self._is_literal(annotation):
if get_origin(annotation) is Literal:
widget = QComboBox(self)
widget.addItems([str(value) for value in get_args(annotation)])
return widget
@@ -274,11 +470,24 @@ class PydanticWidgetForm(QWidget):
if annotation is float:
spin_box = BECSpinBox(self)
spin_box.setRange(-1_000_000_000, 1_000_000_000)
spin_box.setDecimals(int((info.json_schema_extra or {}).get("precision", 6)))
return spin_box
return QLineEdit(self)
def _connect_widget(self, _name: str, widget: QWidget) -> None:
def _apply_field_metadata(self, name: str) -> None:
config = self._field_configs[name]
field_widget = self._widgets[name]
input_widget = self.input_widget(name)
if config.get("precision") is not None:
apply_numeric_precision(input_widget, config)
if any(config.get(key) is not None for key in NUMERIC_BOUND_KEYS):
apply_numeric_limits(input_widget, config)
apply_unit_metadata(field_widget, config)
if input_widget is not field_widget:
apply_unit_metadata(input_widget, config)
def _connect_widget(self, widget: QWidget) -> None:
if isinstance(widget, OptionalValueWidget):
widget.value_changed.connect(lambda _value: self.changed.emit())
return
@@ -300,6 +509,47 @@ class PydanticWidgetForm(QWidget):
if device_widget.currentText().strip():
signal_widget.set_device(device_widget.currentText().strip())
def _connect_reference_unit_widgets(self) -> None:
for name, widget in self.input_widgets().items():
if not isinstance(widget, DeviceComboBox):
continue
widget.device_selected.connect(
lambda _device_name, field_name=name: self._update_reference_units(field_name)
)
widget.device_reset.connect(
lambda field_name=name: self._apply_reference_units(field_name, None)
)
widget.currentTextChanged.connect(
lambda text, field_name=name: self._handle_reference_device_text(field_name, text)
)
def _refresh_reference_units(self) -> None:
for name, widget in self.input_widgets().items():
if isinstance(widget, DeviceComboBox):
self._update_reference_units(name)
def _update_reference_units(self, source_name: str) -> None:
widget = self.input_widget(source_name)
if not isinstance(widget, DeviceComboBox) or not widget.is_valid_input:
self._apply_reference_units(source_name, None)
return
self._apply_reference_units(source_name, device_units(widget.get_current_device()))
def _apply_reference_units(self, source_name: str, units: str | None) -> None:
for field_name, config in self._field_configs.items():
if config.get("reference_units") != source_name:
continue
field_widget = self.field_widget(field_name)
input_widget = self.input_widget(field_name)
apply_unit_metadata(field_widget, config, units)
if input_widget is not field_widget:
apply_unit_metadata(input_widget, config, units)
def _handle_reference_device_text(self, source_name: str, device_name: str) -> None:
widget = self.input_widget(source_name)
if isinstance(widget, DeviceComboBox) and not widget.validate_device(device_name):
self._apply_reference_units(source_name, None)
def _validate_domain_widgets(self) -> None:
for widget in self._widgets.values():
if isinstance(widget, DeviceComboBox):
@@ -320,8 +570,8 @@ class PydanticWidgetForm(QWidget):
return widget.value()
if isinstance(widget, QLineEdit):
value = WidgetIO.get_value(widget)
return None if self._is_optional(info.annotation) and value == "" else value
if isinstance(widget, QComboBox) and self._is_literal(self._without_none(info.annotation)):
return None if NoneType in get_args(info.annotation) and value == "" else value
if isinstance(widget, QComboBox) and get_origin(info.annotation) is Literal:
return WidgetIO.get_value(widget, as_string=True)
return WidgetIO.get_value(widget)
@@ -339,91 +589,45 @@ class PydanticWidgetForm(QWidget):
value = 0
WidgetIO.set_value(widget, value)
@staticmethod
def _values_differ(current: Any, baseline: Any) -> bool:
if current is None or baseline is None:
return current is not None or baseline is not None
if isinstance(current, float) or isinstance(baseline, float):
return abs(float(current) - float(baseline)) >= 1e-9
return current != baseline
@staticmethod
def _field_default(info: FieldInfo) -> Any:
if info.default is not PydanticUndefined:
return info.default
if info.default_factory is not None:
return info.get_default(call_default_factory=True)
return None
@staticmethod
def _format_label(name: str) -> str:
return name.replace("_", " ").capitalize()
@staticmethod
def _is_literal(annotation: Any) -> bool:
return get_origin(annotation) is Literal
@classmethod
def _is_optional(cls, annotation: Any) -> bool:
return NoneType in cls._annotation_args(annotation)
@classmethod
def _without_none(cls, annotation: Any) -> Any:
args = [arg for arg in cls._annotation_args(annotation) if arg is not NoneType]
if not args:
return annotation
if len(args) == 1:
return args[0]
return annotation
@classmethod
def _annotation_args(cls, annotation: Any) -> tuple[Any, ...]:
origin = get_origin(annotation)
if origin in (Union, UnionType) or isinstance(annotation, UnionType):
return get_args(annotation)
return ()
@classmethod
def _contains_type(cls, annotation: Any, expected: type) -> bool:
if isinstance(annotation, type):
return issubclass(annotation, expected)
return any(
isinstance(arg, type) and issubclass(arg, expected)
for arg in cls._annotation_args(annotation)
)
def _model_has_device_field(self) -> bool:
return any(
self._is_device_annotation(field.annotation)
for field in self._model.model_fields.values()
)
@classmethod
def _is_device_annotation(cls, annotation: Any) -> bool:
return cls._contains_type(annotation, DeviceBase) and not cls._contains_type(
annotation, Signal
)
@classmethod
def _is_numeric_annotation(cls, annotation: Any) -> bool:
if annotation in (int, float):
return True
return any(arg in (int, float) for arg in cls._annotation_args(annotation))
for field in self._model.model_fields.values():
annotation = field.annotation
args = get_args(annotation)
has_device = (
isinstance(annotation, type)
and issubclass(annotation, DeviceBase)
or any(isinstance(arg, type) and issubclass(arg, DeviceBase) for arg in args)
)
has_signal = (
isinstance(annotation, type)
and issubclass(annotation, Signal)
or any(isinstance(arg, type) and issubclass(arg, Signal) for arg in args)
)
if has_device and not has_signal:
return True
return False
if __name__ == "__main__": # pragma: no cover
import json
import sys
from bec_lib.scan_args import ScanArgument
from pydantic import Field
from qtpy.QtWidgets import QApplication, QLabel, QPushButton, QTabWidget, QTextEdit, QVBoxLayout
from bec_widgets.utils.colors import apply_theme
class BasicScanConfig(BaseModel):
"""Plain Pydantic fields without GUI metadata."""
sample_name: str
enabled: bool = True
repeats: int = 3
class LimitConfig(BaseModel):
"""Normal Pydantic Field metadata."""
mode: Literal["monitor", "scan", "calibration"] = "scan"
low_limit: (
float | None
@@ -441,6 +645,58 @@ if __name__ == "__main__": # pragma: no cover
json_schema_extra={"precision": 4},
)
class ScanArgumentConfig(BaseModel):
"""ScanArgument metadata applied through Field extras."""
settling_time: float = Field(
default=0.0,
**ScanArgument(
display_name="Settling time",
description="Time to wait after moving.",
units="s",
precision=3,
ge=0,
).model_dump(),
)
frames: int = Field(
default=1,
**ScanArgument(
display_name="Frames", description="Number of frames per trigger.", ge=1
).model_dump(),
)
class DeviceSignalLimitsConfig(BaseModel):
"""Device, signal, and numeric fields whose units follow the selected device."""
model_config = {"arbitrary_types_allowed": True}
device: DeviceBase | str = Field(
default="",
**ScanArgument(display_name="Device", description="Positioner device.").model_dump(),
)
signal: Signal | str | None = Field(
default=None,
**ScanArgument(display_name="Signal", description="Device signal.").model_dump(),
)
low_limit: float | None = Field(
default=None,
**ScanArgument(
display_name="Low limit",
description="Optional lower limit.",
reference_units="device",
precision=4,
).model_dump(),
)
high_limit: float | None = Field(
default=None,
**ScanArgument(
display_name="High limit",
description="Optional upper limit.",
reference_units="device",
precision=4,
).model_dump(),
)
class DisplayConfig(BaseModel):
title: str | None = Field(
default=None, title="Title", description="Optional display title."
@@ -500,7 +756,6 @@ if __name__ == "__main__": # pragma: no cover
def __init__(self) -> None:
super().__init__()
self.setWindowTitle("PydanticWidgetForm example")
self.resize(720, 520)
self._tabs = QTabWidget(self)
self._output = QTextEdit(self)
@@ -510,8 +765,10 @@ if __name__ == "__main__": # pragma: no cover
self._add_form("Basic", PydanticWidgetForm(BasicScanConfig))
self._add_form("Limits", PydanticWidgetForm(LimitConfig))
self._add_form("ScanArgument", PydanticWidgetForm(ScanArgumentConfig))
self._add_form("Display", PydanticWidgetForm(DisplayConfig))
self._add_form("Device + signal", PydanticWidgetForm(DeviceAndSignalConfig))
self._add_form("Device limits", PydanticWidgetForm(DeviceSignalLimitsConfig))
self._add_form("Device only", PydanticWidgetForm(DeviceOnlyConfig))
self._add_form("Signal only", PydanticWidgetForm(SignalOnlyConfig))
@@ -552,6 +809,7 @@ if __name__ == "__main__": # pragma: no cover
self._show_current_data(validate=False)
app = QApplication(sys.argv)
apply_theme("dark")
window = ExampleWindow()
window.show()
sys.exit(app.exec())
+155
View File
@@ -0,0 +1,155 @@
from __future__ import annotations
import re
from collections.abc import Mapping
from typing import Any
from bec_lib import bec_logger
from qtpy.QtWidgets import QDoubleSpinBox, QSpinBox, QWidget
logger = bec_logger.logger
UNIT_TOOLTIP_PREFIXES = ("Units:", "Units from:")
def format_display_name(name: str) -> str:
"""Convert a raw argument name into a user-facing label."""
parts = re.split(r"(_|\d+)", name)
return " ".join(part.capitalize() for part in parts if part.isalnum()).strip()
def resolve_tooltip(scan_argument: Mapping[str, Any]) -> str | None:
"""Resolve explicit tooltip text, falling back to the description."""
return scan_argument.get("tooltip") or scan_argument.get("description")
def ui_config_from_metadata(
name: str,
metadata: Mapping[str, Any],
*,
default: Any = None,
input_type: Any = None,
arg: bool = False,
display_name: str | None = None,
) -> dict[str, Any]:
"""Build the normalized scan-input item consumed by form widgets."""
return {
"arg": arg,
"name": name,
"type": input_type,
"display_name": display_name or metadata.get("display_name") or format_display_name(name),
"tooltip": resolve_tooltip(metadata),
"default": default,
"expert": metadata.get("expert", False),
"hidden": metadata.get("hidden", False),
"precision": metadata.get("precision"),
"units": metadata.get("units"),
"reference_units": metadata.get("reference_units"),
"reference_limits": metadata.get("reference_limits"),
"gt": metadata.get("gt"),
"ge": metadata.get("ge"),
"lt": metadata.get("lt"),
"le": metadata.get("le"),
"alternative_group": metadata.get("alternative_group"),
}
def unit_tooltip(item: Mapping[str, Any], units: str | None = None) -> str | None:
"""Build tooltip text from scan argument unit metadata."""
tooltip = item.get("tooltip")
reference_units = item.get("reference_units")
units = units or item.get("units")
tooltip_parts = [tooltip] if tooltip else []
if units:
tooltip_parts.append(f"Units: {units}")
elif reference_units:
tooltip_parts.append(f"Units from: {reference_units}")
if tooltip_parts:
return "\n".join(str(part) for part in tooltip_parts)
return None
def strip_unit_tooltip(tooltip: str) -> str:
"""Remove unit lines added by :func:`apply_unit_metadata`."""
return "\n".join(
line for line in tooltip.splitlines() if not line.startswith(UNIT_TOOLTIP_PREFIXES)
).strip()
def apply_unit_metadata(widget: QWidget, item: Mapping[str, Any], units: str | None = None) -> None:
"""Apply unit tooltip text and numeric suffix metadata to a widget."""
units = units or item.get("units")
tooltip = unit_tooltip(item, units)
existing_tooltip = strip_unit_tooltip(widget.toolTip())
base_tooltip = item.get("tooltip")
if base_tooltip and existing_tooltip == base_tooltip:
existing_tooltip = ""
if tooltip:
widget.setToolTip(f"{existing_tooltip}\n{tooltip}" if existing_tooltip else tooltip)
else:
widget.setToolTip(existing_tooltip)
if hasattr(widget, "setSuffix"):
widget.setSuffix(f" {units}" if units else "")
def device_units(device: object) -> str | None:
"""Return engineering units from a BEC device object when available."""
egu = getattr(device, "egu", None)
if not callable(egu):
return None
try:
return egu()
except Exception:
logger.exception("Failed to fetch engineering units from device %s", device)
return None
def apply_numeric_precision(widget: QWidget, item: Mapping[str, Any]) -> None:
"""Apply decimal precision metadata to spinboxes supporting ``setDecimals``."""
if not hasattr(widget, "setDecimals"):
return
precision = item.get("precision")
if precision is None:
return
try:
widget.setDecimals(max(0, int(precision)))
except (TypeError, ValueError):
logger.warning(
"Ignoring invalid precision %r for parameter %s", precision, item.get("name")
)
def apply_numeric_limits(widget: QWidget, item: Mapping[str, Any]) -> None:
"""Apply ``gt/ge/lt/le`` numeric bounds to Qt spinboxes."""
if isinstance(widget, QSpinBox) and not isinstance(widget, QDoubleSpinBox):
minimum = -2147483647
maximum = 2147483647
if item.get("ge") is not None:
minimum = int(item["ge"])
if item.get("gt") is not None:
minimum = int(item["gt"]) + 1
if item.get("le") is not None:
maximum = int(item["le"])
if item.get("lt") is not None:
maximum = int(item["lt"]) - 1
widget.setRange(minimum, maximum)
return
if isinstance(widget, QDoubleSpinBox):
minimum = -float("inf")
maximum = float("inf")
step = 10 ** (-widget.decimals())
if item.get("ge") is not None:
minimum = float(item["ge"])
if item.get("gt") is not None:
minimum = float(item["gt"]) + step
if item.get("le") is not None:
maximum = float(item["le"])
if item.get("lt") is not None:
maximum = float(item["lt"]) - step
widget.setRange(minimum, maximum)
@@ -20,6 +20,12 @@ from qtpy.QtWidgets import (
QVBoxLayout,
)
from bec_widgets.utils.scan_arg_metadata import (
apply_numeric_limits,
apply_numeric_precision,
apply_unit_metadata,
device_units,
)
from bec_widgets.utils.widget_io import WidgetIO
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import (
BECDeviceFilter,
@@ -285,8 +291,8 @@ class ScanGroupBox(QGroupBox):
)
else:
widget = widget_class(parent=self.parent(), arg_name=arg_name, default=default)
self._apply_numeric_precision(widget, item)
self._apply_numeric_limits(widget, item)
apply_numeric_precision(widget, item)
apply_numeric_limits(widget, item)
if isinstance(widget, DeviceComboBox):
self.selected_devices[widget] = ""
widget.device_selected.connect(self.emit_device_selected)
@@ -298,7 +304,7 @@ class ScanGroupBox(QGroupBox):
if isinstance(widget, ScanLiteralsComboBox):
widget.set_literals(item["type"].get("Literal", []))
self._widget_configs[widget] = item
self._apply_unit_metadata(widget, item)
apply_unit_metadata(widget, item)
self.layout.addWidget(widget, row, column_index)
self.widgets.append(widget)
@@ -307,7 +313,7 @@ class ScanGroupBox(QGroupBox):
sender = self.sender()
self.selected_devices[sender] = device_name.strip()
if isinstance(sender, DeviceComboBox):
units = self._device_units(sender.get_current_device())
units = device_units(sender.get_current_device())
self._update_reference_units(sender, units)
self._emit_reference_units_changed(sender, units)
selected_devices_str = " ".join(self.selected_devices.values())
@@ -453,57 +459,11 @@ class ScanGroupBox(QGroupBox):
WidgetIO.set_value(widget, value)
break
@staticmethod
def _unit_tooltip(item: dict, units: str | None = None) -> str | None:
tooltip = item.get("tooltip", None)
reference_units = item.get("reference_units", None)
units = units or item.get("units", None)
tooltip_parts = [tooltip] if tooltip else []
if units:
tooltip_parts.append(f"Units: {units}")
elif reference_units:
tooltip_parts.append(f"Units from: {reference_units}")
if tooltip_parts:
return "\n".join(tooltip_parts)
return None
def _apply_unit_metadata(self, widget, item: dict, units: str | None = None) -> None:
units = units or item.get("units", None)
tooltip = self._unit_tooltip(item, units)
existing_tooltip = widget.toolTip()
if existing_tooltip:
# strip the existing unit info from the tooltip if it exists
# to avoid tooltip bloat on multiple updates
existing_tooltip = "\n".join(
line
for line in existing_tooltip.splitlines()
if not (line.startswith("Units:") or line.startswith("Units from:"))
).strip()
if tooltip:
if existing_tooltip:
widget.setToolTip(f"{existing_tooltip}\n{tooltip}")
else:
widget.setToolTip(tooltip)
if hasattr(widget, "setSuffix"):
widget.setSuffix(f" {units}" if units else "")
def _refresh_column_label(self, column: int, item: dict) -> None:
if column not in self._column_labels:
return
self._column_labels[column].setText(item.get("display_name", item.get("name", None)))
@staticmethod
def _device_units(device) -> str | None:
egu = getattr(device, "egu", None)
if not callable(egu):
return None
try:
return egu()
except Exception:
logger.exception("Failed to fetch engineering units from device %s", device)
return None
def _widget_position(self, widget) -> tuple[int, int] | None:
for row in range(self.layout.rowCount()):
for column in range(self.layout.columnCount()):
@@ -529,7 +489,7 @@ class ScanGroupBox(QGroupBox):
row, column = widget_position
if self.box_type == "args" and row != source_row:
continue
self._apply_unit_metadata(widget, item, units)
apply_unit_metadata(widget, item, units)
self._refresh_column_label(column, item)
def apply_reference_units(self, reference_name: str, units: str | None) -> None:
@@ -543,7 +503,7 @@ class ScanGroupBox(QGroupBox):
item = self._widget_configs.get(widget, {})
if item.get("reference_units") != reference_name:
continue
self._apply_unit_metadata(widget, item, units)
apply_unit_metadata(widget, item, units)
position = self._widget_position(widget)
if position is not None:
_, column = position
@@ -562,49 +522,3 @@ class ScanGroupBox(QGroupBox):
self.selected_devices[device_widget] = ""
self._update_reference_units(device_widget, None)
self._emit_reference_units_changed(device_widget, None)
@staticmethod
def _apply_numeric_precision(widget: ScanDoubleSpinBox, item: dict) -> None:
if not isinstance(widget, ScanDoubleSpinBox):
return
precision = item.get("precision")
if precision is None:
return
try:
widget.setDecimals(max(0, int(precision)))
except (TypeError, ValueError):
logger.warning(
"Ignoring invalid precision %r for parameter %s", precision, item.get("name")
)
@staticmethod
def _apply_numeric_limits(widget: ScanDoubleSpinBox | ScanSpinBox, item: dict) -> None:
if isinstance(widget, ScanSpinBox):
minimum = -2147483647 # largest int which qt allows
maximum = 2147483647
if item.get("ge") is not None:
minimum = int(item["ge"])
if item.get("gt") is not None:
minimum = int(item["gt"]) + 1
if item.get("le") is not None:
maximum = int(item["le"])
if item.get("lt") is not None:
maximum = int(item["lt"]) - 1
widget.setRange(minimum, maximum)
return
if isinstance(widget, ScanDoubleSpinBox):
minimum = -float("inf")
maximum = float("inf")
step = 10 ** (-widget.decimals())
if item.get("ge") is not None:
minimum = float(item["ge"])
if item.get("gt") is not None:
minimum = float(item["gt"]) + step
if item.get("le") is not None:
maximum = float(item["le"])
if item.get("lt") is not None:
maximum = float(item["lt"]) - step
widget.setRange(minimum, maximum)
@@ -2,9 +2,12 @@
from __future__ import annotations
import re
from typing import Any
from bec_widgets.utils.scan_arg_metadata import format_display_name as format_scan_display_name
from bec_widgets.utils.scan_arg_metadata import resolve_tooltip as resolve_scan_tooltip
from bec_widgets.utils.scan_arg_metadata import ui_config_from_metadata
AnnotationValue = str | dict[str, Any] | list[Any] | None
ScanArgumentMetadata = dict[str, Any]
SignatureEntry = dict[str, Any]
@@ -74,8 +77,7 @@ class ScanInfoAdapter:
Returns:
str: Formatted display label such as ``Exp Time``.
"""
parts = re.split(r"(_|\d+)", name)
return " ".join(part.capitalize() for part in parts if part.isalnum()).strip()
return format_scan_display_name(name)
@staticmethod
def resolve_tooltip(scan_argument: ScanArgumentMetadata) -> str | None:
@@ -87,7 +89,7 @@ class ScanInfoAdapter:
Returns:
str | None: Explicit tooltip text if provided, otherwise the description fallback.
"""
return scan_argument.get("tooltip") or scan_argument.get("description")
return resolve_scan_tooltip(scan_argument)
@staticmethod
def parse_annotation(
@@ -204,24 +206,13 @@ class ScanInfoAdapter:
Returns:
ScanInputConfig: Normalized input configuration.
"""
return {
"arg": arg,
"name": name,
"type": self.scan_arg_type_from_annotation(annotation),
"display_name": scan_argument.get("display_name") or self.format_display_name(name),
"tooltip": self.resolve_tooltip(scan_argument),
"default": default,
"expert": scan_argument.get("expert", False),
"hidden": scan_argument.get("hidden", False),
"precision": scan_argument.get("precision"),
"units": scan_argument.get("units"),
"reference_units": scan_argument.get("reference_units"),
"gt": scan_argument.get("gt"),
"ge": scan_argument.get("ge"),
"lt": scan_argument.get("lt"),
"le": scan_argument.get("le"),
"alternative_group": scan_argument.get("alternative_group"),
}
return ui_config_from_metadata(
name=name,
metadata=scan_argument,
input_type=self.scan_arg_type_from_annotation(annotation),
default=default,
arg=arg,
)
def build_scan_ui_config(self, scan_info: ScanInfo) -> ScanUIConfig:
"""Normalize one available-scan entry into the widget UI configuration.
@@ -6,6 +6,7 @@ from typing import Any
from bec_lib import bl_states
from bec_lib.endpoints import MessageEndpoints
from bec_qthemes import material_icon
from pydantic import BaseModel
from qtpy.QtCore import QAbstractListModel, QModelIndex, QSize, Qt, Signal
from qtpy.QtGui import QColor, QMouseEvent, QPalette
from qtpy.QtWidgets import (
@@ -48,30 +49,6 @@ from bec_widgets.widgets.services.beamline_states.dialogs import (
)
def _coerce_bool(value: Any) -> bool:
if isinstance(value, str):
return value.strip().lower() in {"1", "true", "yes", "on"}
return bool(value)
def _config_class_for_state_type(state_type: str) -> type[bl_states.BeamlineStateConfig]:
for state_class in SUPPORTED_BEAMLINE_STATES:
if state_type in {state_class.__name__, state_class.CONFIG_CLASS.state_type}:
return state_class.CONFIG_CLASS
return bl_states.DeviceStateConfig
def _update_parameters_from_config(config: object) -> dict[str, Any]:
if isinstance(config, bl_states.BeamlineStateConfig):
return config.model_dump(exclude={"name"})
if isinstance(config, dict):
return {key: value for key, value in config.items() if key != "name"}
if hasattr(config, "model_dump"):
data = config.model_dump()
return {key: value for key, value in data.items() if key != "name"}
raise TypeError(f"Unsupported beamline state config type: {type(config)!r}")
class _BeamlineStatePillHeader(QWidget):
"""Header surface responsible for pill click gestures."""
@@ -93,14 +70,13 @@ class BeamlineStatePill(BECWidget, QWidget):
a ``BeamlineStateMessage`` is published for that state.
"""
PLUGIN = True
ICON_NAME = "info"
USER_ACCESS = ["state_name", "set_state_name", "remove", "attach", "detach", "screenshot"]
PLUGIN = False
RPC = False
state_changed = Signal(str, str, str)
update_requested = Signal(str, object)
remove_requested = Signal(str)
size_hint_changed = Signal()
row_height_changed = Signal()
_STATUS_LABELS = BEAMLINE_STATE_STATUS_LABELS
_STATUS_ICONS = {
@@ -249,19 +225,11 @@ class BeamlineStatePill(BECWidget, QWidget):
layout.addWidget(self._settings)
self.setLayout(layout)
self.setMinimumWidth(0)
self._header.setMinimumWidth(0)
self._settings.setMinimumWidth(0)
self._settings.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Minimum)
self.setMinimumHeight(58)
self.set_state_name(state_name, title=title)
self._update_button.setEnabled(False)
self._revert_button.setEnabled(False)
def minimumSizeHint(self) -> QSize: # noqa: N802
hint = super().minimumSizeHint()
return QSize(0, hint.height())
@SafeProperty(str, default=None)
def state_name(self) -> str | None:
"""Name of the BEC beamline state displayed by this pill."""
@@ -318,7 +286,7 @@ class BeamlineStatePill(BECWidget, QWidget):
@idle_card_background.setter
def idle_card_background(self, enabled: bool) -> None:
self._idle_card_background = _coerce_bool(enabled)
self._idle_card_background = enabled
self._apply_visual_state()
def set_idle_card_background(self, enabled: bool) -> None:
@@ -328,15 +296,11 @@ class BeamlineStatePill(BECWidget, QWidget):
def _refresh_latest_state(self) -> None:
if self._state_name is None:
return
msg_container = self.client.connector.get_last(
MessageEndpoints.beamline_state(self._state_name)
msg = self.client.connector.get_last(
MessageEndpoints.beamline_state(self._state_name), key="data"
)
if not msg_container:
return
data = msg_container.get("data") if isinstance(msg_container, dict) else None
content = getattr(data, "content", data)
if isinstance(content, dict):
self.update_state(content, getattr(data, "metadata", {}))
if msg is not None:
self.update_state(msg.content, msg.metadata)
@SafeSlot(dict, dict)
def update_state(
@@ -473,7 +437,7 @@ class BeamlineStatePill(BECWidget, QWidget):
self._expanded = expanded
self._settings.setVisible(expanded)
self._apply_visual_state()
self.size_hint_changed.emit()
self.row_height_changed.emit()
def _ensure_config_form(
self, config_class: type[bl_states.BeamlineStateConfig] = bl_states.DeviceStateConfig
@@ -483,7 +447,6 @@ class BeamlineStatePill(BECWidget, QWidget):
config_class, parent=self._settings, client=self.client, read_only_fields={"name"}
)
self._config_form.changed.connect(self._update_settings_dirty_state)
self._config_form.setMinimumWidth(0)
self._config_form_host.addWidget(self._config_form)
return self._config_form
@@ -497,7 +460,13 @@ class BeamlineStatePill(BECWidget, QWidget):
self._populating_settings = True
try:
state_type = str(self._state_config.get("state_type") or "")
config_class = _config_class_for_state_type(state_type)
config_class = None
for state_class in SUPPORTED_BEAMLINE_STATES:
if state_type in {state_class.__name__, state_class.CONFIG_CLASS.state_type}:
config_class = state_class.CONFIG_CLASS
break
if config_class is None:
raise ValueError(f"Unsupported beamline state type '{state_type}'.")
config_form = self._ensure_config_form(config_class)
if config_form.model is not config_class:
config_form.set_model(config_class)
@@ -513,12 +482,14 @@ class BeamlineStatePill(BECWidget, QWidget):
config = self._ensure_settings_form_current().model_instance()
return config # type: ignore[return-value]
def mark_current_settings_clean(self, config: object | None = None) -> None:
def mark_current_settings_clean(
self, config: bl_states.BeamlineStateConfig | None = None
) -> None:
"""Mark the current editor values as saved."""
if config is None:
parameters = self._ensure_config_form().raw_editable_data()
else:
parameters = _update_parameters_from_config(config)
parameters = config.model_dump(exclude={"name"})
if self._state_config:
state_parameters = self._state_config.get("parameters")
if isinstance(state_parameters, dict):
@@ -694,8 +665,6 @@ class _BeamlineStateListModel(QAbstractListModel):
return name
if role == self.ConfigRole:
return self._state_configs.get(name, {})
if role == Qt.ItemDataRole.SizeHintRole:
return QSize(0, 58)
return None
def flags(self, index: QModelIndex) -> Qt.ItemFlag:
@@ -774,7 +743,7 @@ class _BeamlineStatePillDelegate(QStyledItemDelegate):
pill.state_changed.connect(self._manager._on_pill_state_changed)
pill.update_requested.connect(self._manager._update_state_parameters)
pill.remove_requested.connect(self._manager._remove_state_requested)
pill.size_hint_changed.connect(lambda name=name: self._manager._sync_pill_item_size(name))
pill.row_height_changed.connect(lambda name=name: self._manager._sync_pill_item_size(name))
self._manager._state_pills[str(name)] = pill
return pill
@@ -797,8 +766,8 @@ class _BeamlineStatePillDelegate(QStyledItemDelegate):
name = index.data(_BeamlineStateListModel.NameRole)
pill = self._manager._state_pills.get(str(name))
if pill is not None:
return QSize(0, pill.sizeHint().height())
return QSize(0, 58)
return pill.sizeHint()
return QSize(120, 58)
def destroyEditor(self, editor: QWidget, index: QModelIndex) -> None: # noqa: N802
if isinstance(editor, BeamlineStatePill):
@@ -820,8 +789,6 @@ class _BeamlineStateListView(QListView):
self.setHorizontalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAlwaysOff)
self.setFrameShape(QListView.Shape.NoFrame)
self.setSpacing(6)
self.setMinimumWidth(0)
self.viewport().setMinimumWidth(0)
self.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding)
self.setStyleSheet(
"QListView#beamline_state_pill_view {"
@@ -839,14 +806,6 @@ class _BeamlineStateListView(QListView):
"}"
)
def minimumSizeHint(self) -> QSize: # noqa: N802
hint = super().minimumSizeHint()
return QSize(0, hint.height())
def sizeHint(self) -> QSize: # noqa: N802
hint = super().sizeHint()
return QSize(0, hint.height())
class BeamlineStateManager(BECWidget, QWidget):
"""
@@ -861,8 +820,8 @@ class BeamlineStateManager(BECWidget, QWidget):
USER_ACCESS = [
"idle_card_background",
"set_idle_card_background",
"refresh_states",
"clear_filters",
"state_summary",
"remove",
"attach",
"detach",
@@ -881,7 +840,6 @@ class BeamlineStateManager(BECWidget, QWidget):
super().__init__(
parent=parent, client=client, config=config, gui_id=gui_id, theme_update=True, **kwargs
)
self.setMinimumWidth(0)
self.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding)
self._state_pills: dict[str, BeamlineStatePill] = {}
self._state_configs: dict[str, dict[str, Any]] = {}
@@ -914,7 +872,7 @@ class BeamlineStateManager(BECWidget, QWidget):
self._hidden_summary = QToolButton(self)
self._hidden_summary.setToolButtonStyle(Qt.ToolButtonStyle.ToolButtonTextBesideIcon)
self._hidden_summary.setCheckable(True)
self._hidden_summary.clicked.connect(self._toggle_hidden_states)
self._hidden_summary.toggled.connect(self._toggle_hidden_states)
layout.addWidget(self._hidden_summary)
self.setLayout(layout)
@@ -924,10 +882,6 @@ class BeamlineStateManager(BECWidget, QWidget):
self.refresh_states()
self._refresh_hidden_summary()
def minimumSizeHint(self) -> QSize: # noqa: N802
hint = super().minimumSizeHint()
return QSize(0, hint.height())
@SafeProperty(bool, default=False)
def idle_card_background(self) -> bool:
"""
@@ -937,7 +891,7 @@ class BeamlineStateManager(BECWidget, QWidget):
@idle_card_background.setter
def idle_card_background(self, enabled: bool) -> None:
self._idle_card_background = _coerce_bool(enabled)
self._idle_card_background = enabled
for pill in self._state_pills.values():
pill.idle_card_background = self._idle_card_background
@@ -958,28 +912,22 @@ class BeamlineStateManager(BECWidget, QWidget):
clear_filters = MaterialIconAction(
"filter_alt_off", "Clear beamline state filters", filled=True, parent=self
)
refresh = MaterialIconAction(
"restart_alt", "Refresh beamline states", filled=True, parent=self
)
add_state.action.triggered.connect(self.open_add_state_dialog)
filter_states.action.triggered.connect(self.open_status_filter_dialog)
filter_devices.action.triggered.connect(self.open_device_filter_dialog)
clear_filters.action.triggered.connect(self.clear_filters)
refresh.action.triggered.connect(self.refresh_states)
toolbar.components.add_safe("add_state", add_state)
toolbar.components.add_safe("filter_states", filter_states)
toolbar.components.add_safe("filter_devices", filter_devices)
toolbar.components.add_safe("clear_filters", clear_filters)
toolbar.components.add_safe("refresh", refresh)
bundle = ToolbarBundle("beamline_state_manager", toolbar.components)
bundle.add_action("add_state")
bundle.add_action("filter_states")
bundle.add_action("filter_devices")
bundle.add_action("clear_filters")
bundle.add_action("refresh")
toolbar.add_bundle(bundle)
toolbar.show_bundles(["beamline_state_manager"])
return toolbar
@@ -1015,14 +963,8 @@ class BeamlineStateManager(BECWidget, QWidget):
if config is None:
return
beamline_states = getattr(self.client, "beamline_states", None)
if beamline_states is None:
QMessageBox.warning(
self, "Cannot Add State", "BEC client has no beamline state manager."
)
return
try:
beamline_states.add(config)
self.client.beamline_states.add(config)
except Exception as exc:
QMessageBox.warning(self, "Cannot Add State", str(exc))
@@ -1058,16 +1000,26 @@ class BeamlineStateManager(BECWidget, QWidget):
self._hidden_expanded = False
self._apply_filters()
def state_summary(self) -> dict[str, dict[str, str]]:
"""
Return the displayed beamline states with their current status and label.
Returns:
dict: Mapping of state name to a dictionary with ``status`` and ``label`` keys.
"""
return {
name: {"status": pill._status, "label": pill._label}
for name, pill in self._state_pills.items()
}
@SafeSlot()
def refresh_states(self) -> None:
"""Fetch the latest cached available beamline states and update the list immediately."""
msg_container = self.client.connector.get_last(MessageEndpoints.available_beamline_states())
if not msg_container:
return
data = msg_container.get("data") if isinstance(msg_container, dict) else None
content = getattr(data, "content", data)
if isinstance(content, dict):
self.update_available_states(content, getattr(data, "metadata", {}))
msg = self.client.connector.get_last(
MessageEndpoints.available_beamline_states(), key="data"
)
if msg is not None:
self.update_available_states(msg.content, msg.metadata)
@SafeSlot(dict, dict)
def update_available_states(
@@ -1078,7 +1030,7 @@ class BeamlineStateManager(BECWidget, QWidget):
states = content.get("states", [])
state_configs = [self._state_config_to_dict(state) for state in states]
state_configs = [state for state in state_configs if state.get("name")]
if state_configs == [self._state_configs.get(name) for name in self._state_order]:
if state_configs == list(self._state_configs.values()):
self._apply_filters()
return
self._state_configs = {str(state["name"]): state for state in state_configs}
@@ -1156,16 +1108,17 @@ class BeamlineStateManager(BECWidget, QWidget):
self._apply_filters()
@SafeSlot(str, object)
def _update_state_parameters(self, state_name: str, config: object) -> None:
beamline_states = getattr(self.client, "beamline_states", None)
state_client = getattr(beamline_states, state_name, None) if beamline_states else None
if state_client is None or not hasattr(state_client, "update_parameters"):
def _update_state_parameters(
self, state_name: str, config: bl_states.BeamlineStateConfig
) -> None:
state_client = getattr(self.client.beamline_states, state_name, None)
if state_client is None:
QMessageBox.warning(
self, "Cannot Update State", f"Beamline state '{state_name}' is not available."
)
return
try:
parameters = _update_parameters_from_config(config)
parameters = config.model_dump(exclude={"name"})
state_client.update_parameters(**parameters)
except Exception as exc:
QMessageBox.warning(self, "Cannot Update State", str(exc))
@@ -1186,21 +1139,13 @@ class BeamlineStateManager(BECWidget, QWidget):
if reply != QMessageBox.StandardButton.Yes:
return
beamline_states = getattr(self.client, "beamline_states", None)
if beamline_states is None or not hasattr(beamline_states, "delete"):
QMessageBox.warning(
self, "Cannot Remove State", "BEC client has no beamline state manager."
)
return
try:
beamline_states.delete(state_name)
self.client.beamline_states.delete(state_name)
except Exception as exc:
QMessageBox.warning(self, "Cannot Remove State", str(exc))
@SafeSlot(bool)
def _toggle_hidden_states(self, checked: bool | None = None) -> None:
if checked is None:
checked = self._hidden_summary.isChecked()
def _toggle_hidden_states(self, checked: bool) -> None:
self._hidden_expanded = bool(checked)
self._apply_filters()
@@ -1228,16 +1173,17 @@ class BeamlineStateManager(BECWidget, QWidget):
return str(device) if device else None
@staticmethod
def _state_config_to_dict(state: Any) -> dict[str, Any]:
def _state_config_to_dict(state: dict[str, Any] | BaseModel) -> dict[str, Any]:
if isinstance(state, dict):
return state
if hasattr(state, "model_dump"):
state_dict = state.model_dump()
state_type = getattr(state, "state_type", None)
if state_type is not None:
state_dict.setdefault("state_type", state_type)
return state_dict
return {"name": getattr(state, "name", None), "title": getattr(state, "title", None)}
state_dict = state.model_dump()
state_type = getattr(state, "state_type", None)
if state_type is not None:
state_dict.setdefault("state_type", state_type)
title = getattr(state, "title", None)
if title is not None and not state_dict.get("title"):
state_dict["title"] = title
return state_dict
def cleanup(self) -> None:
self.bec_dispatcher.disconnect_slot(
@@ -1 +0,0 @@
{'files': ['beamline_state_pill.py']}
@@ -1,57 +0,0 @@
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from qtpy.QtWidgets import QWidget
from bec_widgets.utils.bec_designer import designer_material_icon
from bec_widgets.widgets.services.beamline_states.beamline_state_pill import BeamlineStatePill
DOM_XML = """
<ui language='c++'>
<widget class='BeamlineStatePill' name='beamline_state_pill'>
</widget>
</ui>
"""
class BeamlineStatePillPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
def __init__(self):
super().__init__()
self._form_editor = None
def createWidget(self, parent):
if parent is None:
return QWidget()
t = BeamlineStatePill(parent)
return t
def domXml(self):
return DOM_XML
def group(self):
return ""
def icon(self):
return designer_material_icon(BeamlineStatePill.ICON_NAME)
def includeFile(self):
return "beamline_state_pill"
def initialize(self, form_editor):
self._form_editor = form_editor
def isContainer(self):
return False
def isInitialized(self):
return self._form_editor is not None
def name(self):
return "BeamlineStatePill"
def toolTip(self):
return ""
def whatsThis(self):
return self.toolTip()
@@ -1,17 +0,0 @@
def main(): # pragma: no cover
from qtpy import PYSIDE6
if not PYSIDE6:
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from bec_widgets.widgets.services.beamline_states.beamline_state_pill_plugin import (
BeamlineStatePillPlugin,
)
QPyDesignerCustomWidgetCollection.addCustomWidget(BeamlineStatePillPlugin())
if __name__ == "__main__": # pragma: no cover
main()
@@ -0,0 +1,73 @@
from __future__ import annotations
import uuid
import pytest
from bec_lib.bl_states import DeviceWithinLimitsStateConfig
# pylint: disable=unused-argument
# pylint: disable=redefined-outer-name
# pylint: disable=protected-access
def _delete_state_if_present(bec, state_name: str) -> None:
if hasattr(bec.beamline_states, state_name):
bec.beamline_states.delete(state_name)
@pytest.mark.timeout(100)
def test_beamline_state_manager_adds_updates_and_deletes_state_e2e(
qtbot, bec_client_lib, connected_client_gui_obj
):
"""
Verify the real BEC beamline-state flow is reflected by a BeamlineStateManager
running in the GUI server, accessed through the dock area RPC interface.
"""
gui = connected_client_gui_obj
dock_area = gui.bec
bec = bec_client_lib
dev = bec.device_manager.devices
scans = bec.scans
state_name = f"samx_widget_limits_{uuid.uuid4().hex[:8]}"
config = DeviceWithinLimitsStateConfig(
name=state_name, device="samx", signal="samx", low_limit=0.0, high_limit=10.0, tolerance=1.0
)
manager = dock_area.new("BeamlineStateManager")
qtbot.waitUntil(lambda: manager._gui_id in gui._server_registry, timeout=5000)
def state_entry() -> dict[str, str]:
return manager.state_summary().get(state_name, {})
_delete_state_if_present(bec, state_name)
try:
bec.beamline_states.add(config)
qtbot.waitUntil(lambda: hasattr(bec.beamline_states, state_name), timeout=10000)
qtbot.waitUntil(lambda: state_name in manager.state_summary(), timeout=10000)
scans.umv(dev.samx, 5, relative=False).wait()
qtbot.waitUntil(
lambda: getattr(bec.beamline_states, state_name).get()["status"] == "valid",
timeout=10000,
)
qtbot.waitUntil(lambda: state_entry().get("status") == "valid", timeout=10000)
assert state_entry()["label"] == "Device samx within limits"
scans.umv(dev.samx, 20, relative=False).wait()
qtbot.waitUntil(
lambda: getattr(bec.beamline_states, state_name).get()["status"] == "invalid",
timeout=10000,
)
qtbot.waitUntil(lambda: state_entry().get("status") == "invalid", timeout=10000)
assert state_entry()["label"] == "Device samx out of limits"
bec.beamline_states.delete(state_name)
qtbot.waitUntil(lambda: not hasattr(bec.beamline_states, state_name), timeout=10000)
qtbot.waitUntil(lambda: state_name not in manager.state_summary(), timeout=10000)
finally:
_delete_state_if_present(bec, state_name)
scans.umv(dev.samx, 0, relative=False).wait()
+203 -224
View File
@@ -1,6 +1,3 @@
from typing import Any, Generator
import pytest
import shiboken6
from bec_lib import bl_states
from qtpy.QtCore import QCoreApplication, QEvent, Qt
@@ -16,17 +13,13 @@ from bec_widgets.widgets.services.beamline_states.beamline_state_pill import (
from bec_widgets.widgets.services.beamline_states.dialogs import AddBeamlineStateDialog
from .client_mocks import mocked_client
from .conftest import create_widget
@pytest.fixture
def pill(qtbot, mocked_client) -> Generator[BeamlineStatePill, Any, None]:
widget = BeamlineStatePill(state_name="shutter_open", title="Shutter", client=mocked_client)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
def test_beamline_state_pill_updates_from_message(pill):
def test_beamline_state_pill_updates_from_message(qtbot, mocked_client):
pill = create_widget(
qtbot, BeamlineStatePill, state_name="shutter_open", title="Shutter", client=mocked_client
)
pill.update_state({"name": "shutter_open", "status": "valid", "label": "Shutter is open."}, {})
assert pill._state_name == "shutter_open"
@@ -37,7 +30,10 @@ def test_beamline_state_pill_updates_from_message(pill):
assert pill.toolTip() == "Shutter is open."
def test_beamline_state_pill_ignores_other_states(pill):
def test_beamline_state_pill_ignores_other_states(qtbot, mocked_client):
pill = create_widget(
qtbot, BeamlineStatePill, state_name="shutter_open", title="Shutter", client=mocked_client
)
pill.update_state(
{"name": "other_state", "status": "invalid", "label": "Should be ignored."}, {}
)
@@ -47,10 +43,10 @@ def test_beamline_state_pill_ignores_other_states(pill):
def test_beamline_state_pill_expands_and_emits_updated_limits(qtbot, mocked_client):
widget = BeamlineStatePill(state_name="limits", title="Limits", client=mocked_client)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
widget.set_state_config(
limits_pill = create_widget(
qtbot, BeamlineStatePill, state_name="limits", title="Limits", client=mocked_client
)
limits_pill.set_state_config(
{
"name": "limits",
"title": "Limits",
@@ -67,25 +63,27 @@ def test_beamline_state_pill_expands_and_emits_updated_limits(qtbot, mocked_clie
}
)
assert widget._settings.isHidden()
assert widget._config_form is None
assert not widget._update_button.isEnabled()
assert not widget._revert_button.isEnabled()
assert limits_pill._settings.isHidden()
assert limits_pill._config_form is None
assert not limits_pill._update_button.isEnabled()
assert not limits_pill._revert_button.isEnabled()
qtbot.mouseClick(widget._header, Qt.MouseButton.LeftButton)
assert widget._config_form is not None
high_limit = widget._config_form.input_widget("high_limit")
qtbot.mouseClick(limits_pill._header, Qt.MouseButton.LeftButton)
assert limits_pill._config_form is not None
high_limit = limits_pill._config_form.input_widget("high_limit")
high_limit.setValue(20.0)
assert not widget._settings.isHidden()
assert widget._update_button.isEnabled()
assert widget._revert_button.isEnabled()
assert widget._config_form.field_widget("high_limit").property("beamlineStateDirty") is True
assert widget._config_form.get_data()["device"] == "samx"
assert widget.edited_config().high_limit == 20.0
assert not limits_pill._settings.isHidden()
assert limits_pill._update_button.isEnabled()
assert limits_pill._revert_button.isEnabled()
assert (
limits_pill._config_form.field_widget("high_limit").property("beamlineStateDirty") is True
)
assert limits_pill._config_form.get_data()["device"] == "samx"
assert limits_pill.edited_config().high_limit == 20.0
with qtbot.waitSignal(widget.update_requested) as signal:
widget._update_button.click()
with qtbot.waitSignal(limits_pill.update_requested) as signal:
limits_pill._update_button.click()
assert signal.args[0] == "limits"
assert isinstance(signal.args[1], bl_states.DeviceWithinLimitsState.CONFIG_CLASS)
@@ -94,12 +92,15 @@ def test_beamline_state_pill_expands_and_emits_updated_limits(qtbot, mocked_clie
assert signal.args[1].low_limit == 0.0
assert signal.args[1].high_limit == 20.0
assert signal.args[1].tolerance == 0.1
assert not widget._settings.isHidden()
assert not limits_pill._settings.isHidden()
def test_beamline_state_pill_first_expand_uses_config_class_without_rebuild(
qtbot, mocked_client, monkeypatch
):
limits_pill = create_widget(
qtbot, BeamlineStatePill, state_name="limits", title="Limits", client=mocked_client
)
set_model_calls = []
original_set_model = pill_module.PydanticWidgetForm.set_model
@@ -108,9 +109,7 @@ def test_beamline_state_pill_first_expand_uses_config_class_without_rebuild(
return original_set_model(self, model, data=data)
monkeypatch.setattr(pill_module.PydanticWidgetForm, "set_model", set_model_spy)
widget = BeamlineStatePill(state_name="limits", title="Limits", client=mocked_client)
qtbot.addWidget(widget)
widget.set_state_config(
limits_pill.set_state_config(
{
"name": "limits",
"title": "Limits",
@@ -125,15 +124,16 @@ def test_beamline_state_pill_first_expand_uses_config_class_without_rebuild(
}
)
widget.set_expanded(True)
assert widget._config_form is not None
limits_pill.set_expanded(True)
assert limits_pill._config_form is not None
assert set_model_calls == []
def test_beamline_state_pill_reverts_changed_settings(qtbot, mocked_client):
widget = BeamlineStatePill(state_name="limits", title="Limits", client=mocked_client)
qtbot.addWidget(widget)
widget.set_state_config(
limits_pill = create_widget(
qtbot, BeamlineStatePill, state_name="limits", title="Limits", client=mocked_client
)
limits_pill.set_state_config(
{
"name": "limits",
"title": "Limits",
@@ -148,28 +148,45 @@ def test_beamline_state_pill_reverts_changed_settings(qtbot, mocked_client):
}
)
widget.set_expanded(True)
assert widget._config_form is not None
low_limit = widget._config_form.input_widget("low_limit")
limits_pill.set_expanded(True)
assert limits_pill._config_form is not None
low_limit = limits_pill._config_form.input_widget("low_limit")
low_limit.setValue(-5.0)
assert widget._update_button.isEnabled()
assert widget._config_form.field_widget("low_limit").property("beamlineStateDirty") is True
assert limits_pill._update_button.isEnabled()
assert limits_pill._config_form.field_widget("low_limit").property("beamlineStateDirty") is True
widget._revert_button.click()
limits_pill._revert_button.click()
assert low_limit.value() == 0.0
assert not widget._update_button.isEnabled()
assert not widget._revert_button.isEnabled()
assert widget._config_form.field_widget("low_limit").property("beamlineStateDirty") is False
assert not limits_pill._update_button.isEnabled()
assert not limits_pill._revert_button.isEnabled()
assert (
limits_pill._config_form.field_widget("low_limit").property("beamlineStateDirty") is False
)
def test_beamline_state_pill_does_not_override_themed_input_controls(qtbot, mocked_client):
widget = BeamlineStatePill(state_name="limits", title="Limits", client=mocked_client)
qtbot.addWidget(widget)
widget.set_expanded(True)
limits_pill = create_widget(
qtbot, BeamlineStatePill, state_name="limits", title="Limits", client=mocked_client
)
limits_pill.set_state_config(
{
"name": "limits",
"title": "Limits",
"state_type": "DeviceWithinLimitsState",
"parameters": {
"device": "samx",
"signal": "samx",
"low_limit": 0.0,
"high_limit": 10.0,
"tolerance": 0.1,
},
}
)
limits_pill.set_expanded(True)
stylesheet = widget.styleSheet()
stylesheet = limits_pill.styleSheet()
assert "QAbstractSpinBox" not in stylesheet
assert "QComboBox" not in stylesheet
@@ -177,15 +194,16 @@ def test_beamline_state_pill_does_not_override_themed_input_controls(qtbot, mock
def test_beamline_state_manager_adds_and_removes_pills(qtbot, mocked_client):
widget = BeamlineStateManager(client=mocked_client)
qtbot.addWidget(widget)
widget.update_available_states(
beamline_state_manager = create_widget(qtbot, BeamlineStateManager, client=mocked_client)
beamline_state_manager.update_available_states(
{
"states": [
messages.BeamlineStateConfig(
name="shutter_open", title="Shutter", state_type="ShutterState", parameters={}
),
{
"name": "shutter_open",
"title": "Shutter",
"state_type": "ShutterState",
"parameters": {},
},
{
"name": "limits",
"title": "Limits",
@@ -197,12 +215,19 @@ def test_beamline_state_manager_adds_and_removes_pills(qtbot, mocked_client):
{},
)
assert sorted(widget._state_pills) == ["limits", "shutter_open"]
assert widget._model.rowCount() == 2
assert widget._state_pills["shutter_open"]._name_label.text() == "Shutter"
assert not widget._empty_label.isVisible()
assert sorted(beamline_state_manager._state_pills) == ["limits", "shutter_open"]
assert beamline_state_manager._model.rowCount() == 2
assert beamline_state_manager._state_pills["shutter_open"]._name_label.text() == "Shutter"
assert not beamline_state_manager._empty_label.isVisible()
widget.update_available_states(
beamline_state_manager._state_pills["limits"].update_state(
{"name": "limits", "status": "valid", "label": "Within limits."}, {}
)
summary = beamline_state_manager.state_summary()
assert summary["limits"] == {"status": "valid", "label": "Within limits."}
assert summary["shutter_open"]["status"] == "unknown"
beamline_state_manager.update_available_states(
{
"states": [
{
@@ -216,13 +241,12 @@ def test_beamline_state_manager_adds_and_removes_pills(qtbot, mocked_client):
{},
)
assert sorted(widget._state_pills) == ["limits"]
assert widget._model.rowCount() == 1
assert sorted(beamline_state_manager._state_pills) == ["limits"]
assert beamline_state_manager._model.rowCount() == 1
def test_beamline_state_manager_ignores_unchanged_available_states(qtbot, mocked_client):
widget = BeamlineStateManager(client=mocked_client)
qtbot.addWidget(widget)
beamline_state_manager = create_widget(qtbot, BeamlineStateManager, client=mocked_client)
content = {
"states": [
{
@@ -240,18 +264,17 @@ def test_beamline_state_manager_ignores_unchanged_available_states(qtbot, mocked
]
}
widget.update_available_states(content, {})
pill = widget._state_pills["limits"]
beamline_state_manager.update_available_states(content, {})
pill = beamline_state_manager._state_pills["limits"]
widget.update_available_states(content, {})
beamline_state_manager.update_available_states(content, {})
assert widget._state_pills["limits"] is pill
assert beamline_state_manager._state_pills["limits"] is pill
assert pill._config_form is None
def test_beamline_state_manager_adds_state_without_recreating_existing_pills(qtbot, mocked_client):
widget = BeamlineStateManager(client=mocked_client)
qtbot.addWidget(widget)
beamline_state_manager = create_widget(qtbot, BeamlineStateManager, client=mocked_client)
limits_state = {
"name": "limits",
"title": "Limits",
@@ -271,57 +294,22 @@ def test_beamline_state_manager_adds_state_without_recreating_existing_pills(qtb
"parameters": {},
}
widget.update_available_states({"states": [limits_state]}, {})
pill = widget._state_pills["limits"]
beamline_state_manager.update_available_states({"states": [limits_state]}, {})
pill = beamline_state_manager._state_pills["limits"]
pill.set_expanded(True)
config_form = pill._config_form
widget.update_available_states({"states": [limits_state, shutter_state]}, {})
beamline_state_manager.update_available_states({"states": [limits_state, shutter_state]}, {})
assert widget._state_pills["limits"] is pill
assert beamline_state_manager._state_pills["limits"] is pill
assert pill._config_form is config_form
assert pill.is_expanded()
assert sorted(widget._state_pills) == ["limits", "shutter_open"]
def test_beamline_state_manager_does_not_force_horizontal_minimum(qtbot, mocked_client):
widget = BeamlineStateManager(client=mocked_client)
qtbot.addWidget(widget)
widget.update_available_states(
{
"states": [
{
"name": "limits",
"title": "Limits",
"state_type": "DeviceWithinLimitsState",
"parameters": {
"device": "samx",
"signal": "samx",
"low_limit": 0.0,
"high_limit": 10.0,
"tolerance": 0.1,
},
}
]
},
{},
)
index = widget._model.index_for_name("limits")
hint = widget._delegate.sizeHint(QStyleOptionViewItem(), index)
assert widget.minimumWidth() == 0
assert widget.minimumSizeHint().width() == 0
assert widget._view.minimumWidth() == 0
assert widget._view.minimumSizeHint().width() == 0
assert hint.width() == 0
assert sorted(beamline_state_manager._state_pills) == ["limits", "shutter_open"]
def test_beamline_state_manager_header_click_expands_pill_once(qtbot, mocked_client):
widget = BeamlineStateManager(client=mocked_client)
qtbot.addWidget(widget)
widget.update_available_states(
beamline_state_manager = create_widget(qtbot, BeamlineStateManager, client=mocked_client)
beamline_state_manager.update_available_states(
{
"states": [
{
@@ -335,7 +323,7 @@ def test_beamline_state_manager_header_click_expands_pill_once(qtbot, mocked_cli
{},
)
pill = widget._state_pills["limits"]
pill = beamline_state_manager._state_pills["limits"]
assert pill._settings.isHidden()
qtbot.mouseClick(pill._header, Qt.MouseButton.LeftButton)
@@ -344,28 +332,27 @@ def test_beamline_state_manager_header_click_expands_pill_once(qtbot, mocked_cli
def test_beamline_state_manager_preserves_expanded_pill_on_refresh(qtbot, mocked_client):
widget = BeamlineStateManager(client=mocked_client)
qtbot.addWidget(widget)
beamline_state_manager = create_widget(qtbot, BeamlineStateManager, client=mocked_client)
state = {
"name": "limits",
"title": "Limits",
"state_type": "DeviceWithinLimitsState",
"parameters": {"device": "samx", "high_limit": 10.0},
}
widget.update_available_states({"states": [state]}, {})
beamline_state_manager.update_available_states({"states": [state]}, {})
widget._state_pills["limits"].set_expanded(True)
widget.update_available_states({"states": [state]}, {})
beamline_state_manager._state_pills["limits"].set_expanded(True)
beamline_state_manager.update_available_states({"states": [state]}, {})
assert widget._state_pills["limits"].is_expanded()
assert not widget._state_pills["limits"]._settings.isHidden()
assert beamline_state_manager._state_pills["limits"].is_expanded()
assert not beamline_state_manager._state_pills["limits"]._settings.isHidden()
def test_beamline_state_manager_propagates_idle_card_background(qtbot, mocked_client):
widget = BeamlineStateManager(client=mocked_client, idle_card_background=True)
qtbot.addWidget(widget)
widget.update_available_states(
idle_card_manager = create_widget(
qtbot, BeamlineStateManager, client=mocked_client, idle_card_background=True
)
idle_card_manager.update_available_states(
{
"states": [
{
@@ -379,18 +366,16 @@ def test_beamline_state_manager_propagates_idle_card_background(qtbot, mocked_cl
{},
)
assert widget._state_pills["limits"]._idle_card_background is True
assert idle_card_manager._state_pills["limits"]._idle_card_background is True
widget.idle_card_background = False
idle_card_manager.idle_card_background = False
assert widget._state_pills["limits"]._idle_card_background is False
assert idle_card_manager._state_pills["limits"]._idle_card_background is False
def test_beamline_state_manager_filters_status(qtbot, mocked_client):
widget = BeamlineStateManager(client=mocked_client)
qtbot.addWidget(widget)
widget.update_available_states(
beamline_state_manager = create_widget(qtbot, BeamlineStateManager, client=mocked_client)
beamline_state_manager.update_available_states(
{
"states": [
{
@@ -410,38 +395,45 @@ def test_beamline_state_manager_filters_status(qtbot, mocked_client):
{},
)
assert isinstance(widget._toolbar, ModularToolBar)
assert isinstance(beamline_state_manager._toolbar, ModularToolBar)
assert not beamline_state_manager._toolbar.components.exists("refresh")
widget._state_pills["limits"].update_state(
beamline_state_manager._state_pills["limits"].update_state(
{"name": "limits", "status": "valid", "label": "Within limits."}, {}
)
widget._state_pills["shutter_open"].update_state(
beamline_state_manager._state_pills["shutter_open"].update_state(
{"name": "shutter_open", "status": "invalid", "label": "Closed."}, {}
)
widget._selected_statuses = {"valid"}
widget._apply_filters()
beamline_state_manager._selected_statuses = {"valid"}
beamline_state_manager._apply_filters()
assert not widget._hidden_summary.isHidden()
assert "1 state is hidden" in widget._hidden_summary.text()
assert not widget._view.isRowHidden(widget._model.index_for_name("limits").row())
assert widget._view.isRowHidden(widget._model.index_for_name("shutter_open").row())
assert not beamline_state_manager._hidden_summary.isHidden()
assert "1 state is hidden" in beamline_state_manager._hidden_summary.text()
assert not beamline_state_manager._view.isRowHidden(
beamline_state_manager._model.index_for_name("limits").row()
)
assert beamline_state_manager._view.isRowHidden(
beamline_state_manager._model.index_for_name("shutter_open").row()
)
widget._hidden_summary.click()
beamline_state_manager._hidden_summary.click()
assert not widget._view.isRowHidden(widget._model.index_for_name("shutter_open").row())
assert shiboken6.isValid(widget._state_pills["shutter_open"])
assert not beamline_state_manager._view.isRowHidden(
beamline_state_manager._model.index_for_name("shutter_open").row()
)
assert shiboken6.isValid(beamline_state_manager._state_pills["shutter_open"])
widget._hidden_summary.click()
beamline_state_manager._hidden_summary.click()
assert widget._view.isRowHidden(widget._model.index_for_name("shutter_open").row())
assert shiboken6.isValid(widget._state_pills["shutter_open"])
assert beamline_state_manager._view.isRowHidden(
beamline_state_manager._model.index_for_name("shutter_open").row()
)
assert shiboken6.isValid(beamline_state_manager._state_pills["shutter_open"])
def test_beamline_state_manager_status_filter_reacts_to_state_changes(qtbot, mocked_client):
widget = BeamlineStateManager(client=mocked_client)
qtbot.addWidget(widget)
widget.update_available_states(
beamline_state_manager = create_widget(qtbot, BeamlineStateManager, client=mocked_client)
beamline_state_manager.update_available_states(
{
"states": [
{
@@ -455,26 +447,26 @@ def test_beamline_state_manager_status_filter_reacts_to_state_changes(qtbot, moc
{},
)
widget._selected_statuses = {"valid"}
widget._state_pills["limits"].update_state(
beamline_state_manager._selected_statuses = {"valid"}
beamline_state_manager._state_pills["limits"].update_state(
{"name": "limits", "status": "valid", "label": "Within limits."}, {}
)
assert widget._hidden_summary.isHidden()
assert beamline_state_manager._hidden_summary.isHidden()
widget._state_pills["limits"].update_state(
beamline_state_manager._state_pills["limits"].update_state(
{"name": "limits", "status": "invalid", "label": "Out of limits."}, {}
)
assert not widget._hidden_summary.isHidden()
assert widget._view.isRowHidden(widget._model.index_for_name("limits").row())
assert not beamline_state_manager._hidden_summary.isHidden()
assert beamline_state_manager._view.isRowHidden(
beamline_state_manager._model.index_for_name("limits").row()
)
def test_beamline_state_manager_filters_devices(qtbot, mocked_client, monkeypatch):
widget = BeamlineStateManager(client=mocked_client)
qtbot.addWidget(widget)
widget.update_available_states(
beamline_state_manager = create_widget(qtbot, BeamlineStateManager, client=mocked_client)
beamline_state_manager.update_available_states(
{
"states": [
{
@@ -494,11 +486,11 @@ def test_beamline_state_manager_filters_devices(qtbot, mocked_client, monkeypatc
{},
)
widget._device_filter_text = "samx"
widget._apply_filters()
beamline_state_manager._device_filter_text = "samx"
beamline_state_manager._apply_filters()
assert not widget._hidden_summary.isHidden()
assert "1 state is hidden" in widget._hidden_summary.text()
assert not beamline_state_manager._hidden_summary.isHidden()
assert "1 state is hidden" in beamline_state_manager._hidden_summary.text()
captured = {}
@@ -514,17 +506,16 @@ def test_beamline_state_manager_filters_devices(qtbot, mocked_client, monkeypatc
monkeypatch.setattr(pill_module, "DeviceFilterDialog", FakeDeviceFilterDialog)
widget.open_device_filter_dialog()
beamline_state_manager.open_device_filter_dialog()
assert captured["devices"] == ["samx", "samy"]
assert captured["device_filter_text"] == "samx"
assert captured["parent"] is widget
assert captured["parent"] is beamline_state_manager
def test_beamline_state_manager_updates_state_parameters(qtbot, mocked_client):
widget = BeamlineStateManager(client=mocked_client)
qtbot.addWidget(widget)
widget.update_available_states(
beamline_state_manager = create_widget(qtbot, BeamlineStateManager, client=mocked_client)
beamline_state_manager.update_available_states(
{
"states": [
{
@@ -556,17 +547,16 @@ def test_beamline_state_manager_updates_state_parameters(qtbot, mocked_client):
self.limits = StateClient()
mocked_client.beamline_states = StateManager()
pill = widget._state_pills["limits"]
pill = beamline_state_manager._state_pills["limits"]
pill.set_expanded(True)
high_limit = pill._config_form.input_widget("high_limit")
high_limit.setValue(20.0)
assert pill._update_button.isEnabled()
widget._update_state_parameters("limits", pill.edited_config())
beamline_state_manager._update_state_parameters("limits", pill.edited_config())
assert mocked_client.beamline_states.limits.parameters == {
"title": "Limits",
"device": "samx",
"signal": "samx",
"low_limit": 0.0,
@@ -578,8 +568,7 @@ def test_beamline_state_manager_updates_state_parameters(qtbot, mocked_client):
def test_beamline_state_manager_removes_state(qtbot, mocked_client, monkeypatch):
widget = BeamlineStateManager(client=mocked_client)
qtbot.addWidget(widget)
beamline_state_manager = create_widget(qtbot, BeamlineStateManager, client=mocked_client)
class StateManager:
def __init__(self):
@@ -593,39 +582,35 @@ def test_beamline_state_manager_removes_state(qtbot, mocked_client, monkeypatch)
QMessageBox, "question", lambda *args, **kwargs: QMessageBox.StandardButton.Yes
)
widget._remove_state_requested("limits")
beamline_state_manager._remove_state_requested("limits")
assert mocked_client.beamline_states.deleted == "limits"
def test_add_beamline_state_dialog_uses_generated_widgets_and_normalizes_name(qtbot, mocked_client):
dialog = AddBeamlineStateDialog(client=mocked_client)
qtbot.addWidget(dialog)
limits_index = dialog._type_combo.findText(bl_states.DeviceWithinLimitsState.__name__)
add_state_dialog = create_widget(qtbot, AddBeamlineStateDialog, client=mocked_client)
limits_index = add_state_dialog._type_combo.findText(bl_states.DeviceWithinLimitsState.__name__)
assert limits_index >= 0
dialog._type_combo.setCurrentIndex(limits_index)
add_state_dialog._type_combo.setCurrentIndex(limits_index)
assert dialog._config_form.model is bl_states.DeviceWithinLimitsState.CONFIG_CLASS
assert add_state_dialog._config_form.model is bl_states.DeviceWithinLimitsState.CONFIG_CLASS
name = dialog._config_form.input_widget("name")
title = dialog._config_form.input_widget("title")
device = dialog._config_form.input_widget("device")
signal = dialog._config_form.input_widget("signal")
low_limit = dialog._config_form.field_widget("low_limit")
high_limit = dialog._config_form.field_widget("high_limit")
name = add_state_dialog._config_form.input_widget("name")
device = add_state_dialog._config_form.input_widget("device")
signal = add_state_dialog._config_form.input_widget("signal")
low_limit = add_state_dialog._config_form.field_widget("low_limit")
high_limit = add_state_dialog._config_form.field_widget("high_limit")
name.setText("samx-limits")
title.setText("samx-limits-15")
WidgetIO.set_value(device, "samx")
WidgetIO.set_value(signal, "samx")
low_limit.checkbox.setChecked(True)
high_limit.checkbox.setChecked(True)
high_limit.value_widget.setValue(15.0)
config = dialog.config()
config = add_state_dialog.config()
assert config.name == "samx_limits"
assert config.title == "samx-limits-15"
assert config.device == "samx"
assert config.signal == "samx"
assert config.low_limit == 0.0
@@ -635,10 +620,9 @@ def test_add_beamline_state_dialog_uses_generated_widgets_and_normalizes_name(qt
def test_add_beamline_state_dialog_generates_name_only_after_valid_device_selection(
qtbot, mocked_client
):
dialog = AddBeamlineStateDialog(client=mocked_client)
qtbot.addWidget(dialog)
name = dialog._config_form.input_widget("name")
device = dialog._config_form.input_widget("device")
add_state_dialog = create_widget(qtbot, AddBeamlineStateDialog, client=mocked_client)
name = add_state_dialog._config_form.input_widget("name")
device = add_state_dialog._config_form.input_widget("device")
device.setCurrentText("s")
@@ -650,46 +634,41 @@ def test_add_beamline_state_dialog_generates_name_only_after_valid_device_select
def test_add_beamline_state_dialog_switches_state_type_without_collapsing(qtbot, mocked_client):
dialog = AddBeamlineStateDialog(client=mocked_client)
qtbot.addWidget(dialog)
initial_height = dialog.height()
limits_index = dialog._type_combo.findText("DeviceWithinLimitsState")
add_state_dialog = create_widget(qtbot, AddBeamlineStateDialog, client=mocked_client)
initial_height = add_state_dialog.height()
limits_index = add_state_dialog._type_combo.findText("DeviceWithinLimitsState")
assert limits_index >= 0
shutter_index = dialog._type_combo.findText("ShutterState")
shutter_index = add_state_dialog._type_combo.findText("ShutterState")
assert shutter_index >= 0
dialog._type_combo.setCurrentIndex(shutter_index)
add_state_dialog._type_combo.setCurrentIndex(shutter_index)
qtbot.wait(0)
assert dialog._config_form.model is bl_states.DeviceStateConfig
assert dialog._config_form_host.count() == 1
assert not dialog._config_form.isHidden()
assert not dialog._buttons.isHidden()
assert dialog.sizeHint().height() > dialog._buttons.sizeHint().height()
assert dialog.minimumWidth() == 280
assert dialog.maximumWidth() > dialog.minimumWidth()
assert dialog.minimumHeight() == dialog.maximumHeight()
assert add_state_dialog._config_form.model is bl_states.DeviceStateConfig
assert add_state_dialog._config_form_host.count() == 1
assert not add_state_dialog._config_form.isHidden()
assert not add_state_dialog._buttons.isHidden()
assert add_state_dialog.sizeHint().height() > add_state_dialog._buttons.sizeHint().height()
assert add_state_dialog.minimumHeight() == add_state_dialog.maximumHeight()
dialog._type_combo.setCurrentIndex(limits_index)
add_state_dialog._type_combo.setCurrentIndex(limits_index)
qtbot.wait(0)
assert dialog._config_form.model is bl_states.DeviceWithinLimitsState.CONFIG_CLASS
assert dialog.height() >= initial_height
assert dialog.minimumHeight() == dialog.maximumHeight()
assert add_state_dialog._config_form.model is bl_states.DeviceWithinLimitsState.CONFIG_CLASS
assert add_state_dialog.height() >= initial_height
assert add_state_dialog.minimumHeight() == add_state_dialog.maximumHeight()
def test_add_beamline_state_dialog_cleanup_deletes_device_widgets(qtbot, mocked_client):
dialog = AddBeamlineStateDialog(client=mocked_client)
qtbot.addWidget(dialog)
device = dialog._config_form.input_widget("device")
signal = dialog._config_form.input_widget("signal")
add_state_dialog = create_widget(qtbot, AddBeamlineStateDialog, client=mocked_client)
device = add_state_dialog._config_form.input_widget("device")
signal = add_state_dialog._config_form.input_widget("signal")
dialog.reject()
add_state_dialog.reject()
assert shiboken6.isValid(device)
assert shiboken6.isValid(signal)
dialog.cleanup()
add_state_dialog.cleanup()
QCoreApplication.sendPostedEvents(None, QEvent.Type.DeferredDelete)
assert not shiboken6.isValid(device)
@@ -1,7 +1,9 @@
from decimal import Decimal
from unittest.mock import patch
import pytest
from bec_lib.device import Device, Signal
from bec_lib.scan_args import ScanArgument
from pydantic import BaseModel, Field
from qtpy.QtWidgets import QCheckBox, QLabel, QLineEdit
@@ -11,6 +13,7 @@ from bec_widgets.utils.forms_from_types.pydantic_widget_form import (
OptionalValueWidget,
PydanticWidgetForm,
)
from bec_widgets.utils.widget_io import WidgetIO
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
from bec_widgets.widgets.control.device_input.signal_combobox.signal_combobox import SignalComboBox
from bec_widgets.widgets.utility.spinbox.decimal_spinbox import BECSpinBox
@@ -76,6 +79,35 @@ class GeneratedSignalOnlySchema(BaseModel):
model_config = {"arbitrary_types_allowed": True}
class GeneratedScanArgumentSchema(BaseModel):
device: Device | str = Field(
default="", **ScanArgument(display_name="Device", description="Device source.").model_dump()
)
signal: Signal | str | None = Field(
default=None,
**ScanArgument(display_name="Signal", description="Signal source.").model_dump(),
)
low_limit: float | None = Field(
default=None,
**ScanArgument(
display_name="Low limit",
description="Optional lower bound.",
reference_units="device",
precision=4,
ge=-5,
le=5,
).model_dump(),
)
exposure: float = Field(
default=0.1,
**ScanArgument(
display_name="Exposure", tooltip="Camera exposure.", units="s", precision=3, gt=0
).model_dump(),
)
model_config = {"arbitrary_types_allowed": True}
class GeneratedRequiredNumericAndOptionalBoolSchema(BaseModel):
enabled: bool | None = None
retry_count: int
@@ -182,6 +214,37 @@ def test_pydantic_widget_form_plain_field_has_generated_label_and_no_tooltip(qtb
assert form.field_widget("sample_name").toolTip() == ""
def test_pydantic_widget_form_uses_scan_argument_metadata(qtbot, mocked_client):
form = PydanticWidgetForm(GeneratedScanArgumentSchema, client=mocked_client)
qtbot.addWidget(form)
low_limit = form.field_widget("low_limit")
low_limit_input = form.input_widget("low_limit")
exposure = form.input_widget("exposure")
low_limit_label = form.layout().labelForField(low_limit)
assert isinstance(low_limit_label, QLabel)
assert low_limit_label.text() == "Low limit"
assert low_limit.toolTip() == "Optional lower bound.\nUnits from: device"
assert low_limit_input.toolTip() == "Optional lower bound.\nUnits from: device"
assert low_limit_input.decimals() == 4
assert low_limit_input.minimum() == pytest.approx(-5)
assert low_limit_input.maximum() == pytest.approx(5)
assert form.field_widget("exposure").toolTip() == "Camera exposure.\nUnits: s"
assert exposure.toolTip() == "Camera exposure.\nUnits: s"
assert exposure.suffix() == " s"
assert exposure.decimals() == 3
assert exposure.minimum() == pytest.approx(0.001)
with patch.object(mocked_client.device_manager.devices.samx, "egu", return_value="mm"):
WidgetIO.set_value(form.input_widget("device"), "samx")
assert low_limit.toolTip() == "Optional lower bound.\nUnits: mm"
assert low_limit_input.toolTip() == "Optional lower bound.\nUnits: mm"
assert low_limit_input.suffix() == " mm"
def test_pydantic_widget_form_cleans_up_on_close(qtbot):
form = PydanticWidgetForm(GeneratedPlainSchema)
qtbot.addWidget(form)
@@ -0,0 +1,85 @@
import pytest
from qtpy.QtWidgets import QDoubleSpinBox, QSpinBox
from bec_widgets.utils.scan_arg_metadata import (
apply_numeric_limits,
apply_numeric_precision,
apply_unit_metadata,
device_units,
ui_config_from_metadata,
unit_tooltip,
)
from .conftest import create_widget
def test_unit_tooltip_and_cleanup(qtbot):
widget = create_widget(qtbot, QDoubleSpinBox)
item = {"tooltip": "Move start", "reference_units": "device"}
assert unit_tooltip(item) == "Move start\nUnits from: device"
apply_unit_metadata(widget, item)
assert widget.toolTip() == "Move start\nUnits from: device"
assert widget.suffix() == ""
apply_unit_metadata(widget, item, "mm")
assert widget.toolTip() == "Move start\nUnits: mm"
assert widget.suffix() == " mm"
apply_unit_metadata(widget, item, "deg")
assert widget.toolTip() == "Move start\nUnits: deg"
assert widget.suffix() == " deg"
def test_numeric_precision_and_limits(qtbot):
float_widget = create_widget(qtbot, QDoubleSpinBox)
int_widget = create_widget(qtbot, QSpinBox)
apply_numeric_precision(float_widget, {"name": "position", "precision": 3})
apply_numeric_limits(float_widget, {"ge": -1.5, "lt": 2.0})
apply_numeric_limits(int_widget, {"gt": 2, "le": 8})
assert float_widget.decimals() == 3
assert float_widget.minimum() == pytest.approx(-1.5)
assert float_widget.maximum() == pytest.approx(1.999)
assert int_widget.minimum() == 3
assert int_widget.maximum() == 8
def test_device_units_uses_egu():
class Device:
def egu(self):
return "mm"
assert device_units(Device()) == "mm"
assert device_units(object()) is None
def test_ui_config_from_metadata_matches_scan_control_item_shape():
item = ui_config_from_metadata(
name="exp_time",
input_type="float",
default=0.1,
metadata={"tooltip": "Exposure", "units": "s", "precision": 3, "ge": 0},
)
assert item == {
"arg": False,
"name": "exp_time",
"type": "float",
"display_name": "Exp Time",
"tooltip": "Exposure",
"default": 0.1,
"expert": False,
"hidden": False,
"precision": 3,
"units": "s",
"reference_units": None,
"reference_limits": None,
"gt": None,
"ge": 0,
"lt": None,
"le": None,
"alternative_group": None,
}