Compare commits

..

14 Commits

37 changed files with 3953 additions and 439 deletions
+51
View File
@@ -32,6 +32,7 @@ _Widgets = {
"BECQueue": "BECQueue",
"BECShell": "BECShell",
"BECStatusBox": "BECStatusBox",
"BeamlineStateManager": "BeamlineStateManager",
"BecConsole": "BecConsole",
"DapComboBox": "DapComboBox",
"DeviceBrowser": "DeviceBrowser",
@@ -717,6 +718,56 @@ class BaseROI(RPCBase):
"""
class BeamlineStateManager(RPCBase):
"""Widget displaying and managing all BEC beamline states."""
_IMPORT_MODULE = "bec_widgets.widgets.services.beamline_states.beamline_state_pill"
@property
@rpc_call
def idle_card_background(self) -> "bool":
"""
Whether idle collapsed pills keep the status-tinted card background.
"""
@rpc_call
def set_idle_card_background(self, enabled: "bool") -> "None":
"""
Set whether idle collapsed pills keep the status-tinted card background.
"""
@rpc_call
def clear_filters(self) -> "None":
"""
None
"""
@rpc_call
def remove(self):
"""
Cleanup the BECConnector
"""
@rpc_call
def attach(self):
"""
None
"""
@rpc_call
def detach(self):
"""
Detach the widget from its parent dock widget (if widget is in the dock), making it a floating widget.
"""
@rpc_timeout(None)
@rpc_call
def screenshot(self, file_name: "str | None" = None):
"""
Take a screenshot of the dock area and save it to a file.
"""
class BecConsole(RPCBase):
"""A console widget with access to a shared registry of terminals, such that instances can be moved around."""
+5
View File
@@ -19,6 +19,10 @@ designer_plugins = {
"BECShell": ("bec_widgets.widgets.editors.bec_console.bec_console", "BECShell"),
"BECSpinBox": ("bec_widgets.widgets.utility.spinbox.decimal_spinbox", "BECSpinBox"),
"BECStatusBox": ("bec_widgets.widgets.services.bec_status_box.bec_status_box", "BECStatusBox"),
"BeamlineStateManager": (
"bec_widgets.widgets.services.beamline_states.beamline_state_pill",
"BeamlineStateManager",
),
"BecConsole": ("bec_widgets.widgets.editors.bec_console.bec_console", "BecConsole"),
"ColorButton": ("bec_widgets.widgets.utility.visual.color_button.color_button", "ColorButton"),
"ColorButtonNative": (
@@ -118,6 +122,7 @@ widget_icons = {
"BECShell": "hub",
"BECSpinBox": "123",
"BECStatusBox": "widgets",
"BeamlineStateManager": "format_list_bulleted",
"BecConsole": "terminal",
"ColorButton": "colors",
"ColorButtonNative": "colors",
-1
View File
@@ -20,7 +20,6 @@ from bec_widgets.widgets.utility.spinner.spinner import SpinnerWidget
if TYPE_CHECKING: # pragma: no cover
from bec_widgets.utils.busy_loader import BusyLoaderOverlay
from bec_widgets.widgets.containers.dock import BECDock
logger = bec_logger.logger
+35 -65
View File
@@ -2,7 +2,7 @@ from __future__ import annotations
import re
from functools import lru_cache
from typing import Literal
from typing import Any, Literal
import numpy as np
import pyqtgraph as pg
@@ -21,8 +21,7 @@ logger = bec_logger.logger
def get_theme_name():
if QApplication.instance() is None or not hasattr(QApplication.instance(), "theme"):
return "dark"
else:
return QApplication.instance().theme.theme
return QApplication.instance().theme.theme
def get_theme_palette():
@@ -58,6 +57,26 @@ def apply_theme(theme: Literal["dark", "light"]):
process_all_deferred_deletes(QApplication.instance())
def theme_color(theme: Any | None, key: str, fallback: QColor | str) -> QColor:
"""
Return a QColor from a BEC theme with a robust fallback.
"""
fallback_color = fallback if isinstance(fallback, QColor) else QColor(str(fallback))
if theme is None or not hasattr(theme, "color"):
return fallback_color
color = theme.color(key, fallback_color.name())
return color if isinstance(color, QColor) else QColor(str(color))
def rgba(color: QColor | str, alpha: int) -> str:
"""
Return a QSS-compatible rgba string with alpha clamped to the 0-255 range.
"""
qcolor = color if isinstance(color, QColor) else QColor(str(color))
alpha = max(0, min(255, alpha))
return f"rgba({qcolor.red()}, {qcolor.green()}, {qcolor.blue()}, {alpha})"
class Colors:
@staticmethod
def list_available_colormaps() -> list[str]:
@@ -150,25 +169,6 @@ class Colors:
return ge.colorMap()
@staticmethod
def golden_ratio(num: int) -> list:
"""Calculate the golden ratio for a given number of angles.
Args:
num (int): Number of angles
Returns:
list: List of angles calculated using the golden ratio.
"""
phi = 2 * np.pi * ((1 + np.sqrt(5)) / 2)
angles = []
for ii in range(num):
x = np.cos(ii * phi)
y = np.sin(ii * phi)
angle = np.arctan2(y, x)
angles.append(angle)
return angles
@staticmethod
def set_theme_offset(theme: Literal["light", "dark"] | None = None, offset=0.2) -> tuple:
"""
@@ -239,20 +239,7 @@ class Colors:
else:
positions = np.linspace(min_pos, max_pos, num)
# Sample colors from the colormap at the calculated positions
colors = cmap.map(positions, mode="float")
color_list = []
for color in colors:
if format.upper() == "HEX":
color_list.append(QColor.fromRgbF(*color).name())
elif format.upper() == "RGB":
color_list.append(tuple((np.array(color) * 255).astype(int)))
elif format.upper() == "QCOLOR":
color_list.append(QColor.fromRgbF(*color))
else:
raise ValueError("Unsupported format. Please choose 'RGB', 'HEX', or 'QColor'.")
return color_list
return Colors._format_mapped_colors(cmap.map(positions, mode="float"), format)
@staticmethod
def golden_angle_color(
@@ -288,20 +275,19 @@ class Colors:
positions = np.mod(np.arange(num) * golden_angle_conjugate, 1)
positions = min_pos + positions * (max_pos - min_pos)
# Sample colors from the colormap at the calculated positions
colors = cmap.map(positions, mode="float")
color_list = []
return Colors._format_mapped_colors(cmap.map(positions, mode="float"), format)
for color in colors:
if format.upper() == "HEX":
color_list.append(QColor.fromRgbF(*color).name())
elif format.upper() == "RGB":
color_list.append(tuple((np.array(color) * 255).astype(int)))
elif format.upper() == "QCOLOR":
color_list.append(QColor.fromRgbF(*color))
else:
raise ValueError("Unsupported format. Please choose 'RGB', 'HEX', or 'QColor'.")
return color_list
@staticmethod
def _format_mapped_colors(colors: np.ndarray, format: Literal["QColor", "HEX", "RGB"]) -> list:
color_format = format.upper()
if color_format not in {"QCOLOR", "HEX", "RGB"}:
raise ValueError("Unsupported format. Please choose 'RGB', 'HEX', or 'QColor'.")
if color_format == "QCOLOR":
return [QColor.fromRgbF(*color) for color in colors]
if color_format == "HEX":
return [QColor.fromRgbF(*color).name() for color in colors]
return [tuple((np.array(color) * 255).astype(int)) for color in colors]
@staticmethod
def hex_to_rgba(hex_color: str, alpha=255) -> tuple:
@@ -325,22 +311,6 @@ class Colors:
raise ValueError("HEX color must be 6 or 8 characters long.")
return (r, g, b, alpha)
@staticmethod
def rgba_to_hex(r: int, g: int, b: int, a: int = 255) -> str:
"""
Convert RGBA color to HEX.
Args:
r(int): Red value (0-255).
g(int): Green value (0-255).
b(int): Blue value (0-255).
a(int): Alpha value (0-255). Default is 255 (opaque).
Returns:
hec_color(str): HEX color string.
"""
return "#{:02X}{:02X}{:02X}{:02X}".format(r, g, b, a)
@staticmethod
def validate_color(color: tuple | str) -> tuple | str:
"""
@@ -0,0 +1,49 @@
from __future__ import annotations
from collections.abc import Mapping
from typing import Any
from pydantic import BaseModel
from pydantic_core import PydanticUndefined
from bec_widgets.utils.scan_arg_metadata import ui_config_from_metadata
NUMERIC_BOUND_KEYS = {"gt", "ge", "lt", "le"}
def pydantic_model_input_configs(model: type[BaseModel]) -> list[dict[str, Any]]:
"""Return scan-control-style field items for a Pydantic model."""
configs = []
for name, info in model.model_fields.items():
metadata: dict[str, Any] = {}
for entry in info.metadata:
for key in NUMERIC_BOUND_KEYS:
value = getattr(entry, key, None)
if value is not None:
metadata.setdefault(key, value)
if isinstance(info.json_schema_extra, Mapping):
metadata.update(dict(info.json_schema_extra))
if info.description and metadata.get("description") is None:
metadata["description"] = info.description
default: Any
if info.default is not PydanticUndefined:
default = info.default
elif info.default_factory is not None:
default = info.get_default(call_default_factory=True)
else:
default = None
display_name = metadata.get("display_name") or info.title
if display_name is None:
display_name = name.replace("_", " ").capitalize()
item = ui_config_from_metadata(
name=name, metadata=metadata, default=default, display_name=display_name
)
item.update({key: value for key, value in metadata.items() if key not in item})
configs.append(item)
return configs
@@ -0,0 +1,653 @@
from __future__ import annotations
from types import NoneType
from typing import Any, Literal, get_args, get_origin
from bec_lib.device import DeviceBase, Signal
from pydantic import BaseModel, ValidationError
from pydantic.fields import FieldInfo
from qtpy.QtCore import Qt
from qtpy.QtCore import Signal as QtSignal
from qtpy.QtWidgets import (
QCheckBox,
QComboBox,
QDoubleSpinBox,
QFormLayout,
QHBoxLayout,
QLineEdit,
QSpinBox,
QWidget,
)
from bec_widgets.utils.forms_from_types.pydantic_model_info_adapter import (
NUMERIC_BOUND_KEYS,
pydantic_model_input_configs,
)
from bec_widgets.utils.scan_arg_metadata import (
apply_numeric_limits,
apply_numeric_precision,
apply_unit_metadata,
device_units,
)
from bec_widgets.utils.widget_io import WidgetIO
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
from bec_widgets.widgets.control.device_input.signal_combobox.signal_combobox import SignalComboBox
from bec_widgets.widgets.utility.spinbox.decimal_spinbox import BECSpinBox
class OptionalValueWidget(QWidget):
"""Generic optional-value wrapper preserving ``None`` for editor widgets."""
value_changed = QtSignal(object)
def __init__(self, value_widget: QWidget, parent: QWidget | None = None) -> None:
super().__init__(parent=parent)
self._value_widget = value_widget
self._checkbox = QCheckBox(self)
self._checkbox.setToolTip("Enable value")
self._value_widget.setParent(self)
layout = QHBoxLayout(self)
layout.setContentsMargins(0, 0, 0, 0)
layout.setSpacing(8)
layout.addWidget(self._checkbox)
layout.addWidget(self._value_widget, 1)
self._checkbox.toggled.connect(self._on_enabled_changed)
WidgetIO.connect_widget_change_signal(self._value_widget, self._emit_current_value)
self._on_enabled_changed(False)
@property
def value_widget(self) -> QWidget:
return self._value_widget
@property
def checkbox(self) -> QCheckBox:
return self._checkbox
def value(self) -> Any:
if not self._checkbox.isChecked():
return None
return WidgetIO.get_value(self._value_widget)
def set_value(self, value: Any) -> None:
enabled = value is not None
self._checkbox.setChecked(enabled)
self._value_widget.setEnabled(enabled)
if enabled:
WidgetIO.set_value(self._value_widget, value)
def _on_enabled_changed(self, enabled: bool) -> None:
self._value_widget.setEnabled(enabled)
self.value_changed.emit(self.value())
def _emit_current_value(self, *_args) -> None:
self.value_changed.emit(self.value())
class PydanticWidgetForm(QWidget):
"""Qt form generated from a Pydantic model using type-based widget selection."""
changed = QtSignal()
validity_changed = QtSignal(bool)
def __init__(
self,
model: type[BaseModel],
parent: QWidget | None = None,
*,
data: BaseModel | dict[str, Any] | None = None,
read_only_fields: set[str] | None = None,
client=None,
) -> None:
super().__init__(parent=parent)
self._model = model
self._client = client
self._read_only_fields = set(read_only_fields or set())
self._widgets: dict[str, QWidget] = {}
self._field_configs: dict[str, dict[str, Any]] = {}
self._baseline: dict[str, Any] = {}
self._layout = QFormLayout()
self._layout.setContentsMargins(0, 0, 0, 0)
self._layout.setHorizontalSpacing(10)
self._layout.setVerticalSpacing(8)
self._layout.setFieldGrowthPolicy(QFormLayout.FieldGrowthPolicy.AllNonFixedFieldsGrow)
self._layout.setLabelAlignment(Qt.AlignmentFlag.AlignRight)
self.setLayout(self._layout)
self._populate()
if data is not None:
self.set_data(data)
self.mark_clean()
@property
def model(self) -> type[BaseModel]:
return self._model
@property
def widgets(self) -> dict[str, QWidget]:
return dict(self._widgets)
def field_widget(self, name: str) -> QWidget:
return self._widgets[name]
def input_widget(self, name: str) -> QWidget:
widget = self._widgets[name]
if isinstance(widget, OptionalValueWidget):
return widget.value_widget
return widget
def input_widgets(self) -> dict[str, QWidget]:
return {name: self.input_widget(name) for name in self._widgets}
def input_widgets_by_type(self, widget_type: type[QWidget]) -> list[QWidget]:
return [
widget for widget in self.input_widgets().values() if isinstance(widget, widget_type)
]
def set_model(self, model: type[BaseModel], data: dict[str, Any] | None = None) -> None:
old_data = self.raw_data()
self.cleanup()
self._model = model
self._populate()
if data is None:
data = {key: value for key, value in old_data.items() if key in model.model_fields}
self.set_partial_data(data)
self.mark_clean()
def set_data(self, data: BaseModel | dict[str, Any]) -> None:
values = data.model_dump() if isinstance(data, BaseModel) else dict(data)
self.set_partial_data(values)
def set_partial_data(self, data: dict[str, Any]) -> None:
for name, value in data.items():
if name not in self._widgets:
continue
self._set_widget_value(name, value)
self._refresh_reference_units()
self.changed.emit()
def raw_data(self) -> dict[str, Any]:
return {name: self._read_widget_value(name) for name in self._widgets}
def get_data(self) -> dict[str, Any]:
return self.model_instance().model_dump()
def model_instance(self) -> BaseModel:
self._validate_domain_widgets()
return self._model.model_validate(self.raw_data())
def validate(self) -> bool:
try:
self.get_data()
except (ValidationError, ValueError):
self.validity_changed.emit(False)
return False
self.validity_changed.emit(True)
return True
def dirty_fields(self) -> set[str]:
current = self.raw_data()
fields = set(current) | set(self._baseline)
dirty = set()
for field in fields:
current_value = current.get(field)
baseline_value = self._baseline.get(field)
if current_value is None or baseline_value is None:
changed = current_value is not None or baseline_value is not None
elif isinstance(current_value, float) or isinstance(baseline_value, float):
changed = abs(float(current_value) - float(baseline_value)) >= 1e-9
else:
changed = current_value != baseline_value
if changed:
dirty.add(field)
return dirty
def mark_clean(self) -> None:
self._baseline = self.raw_data()
def reset_to_baseline(self) -> None:
self.set_partial_data(self._baseline)
def editable_data(self) -> dict[str, Any]:
return {
key: value
for key, value in self.get_data().items()
if key not in self._read_only_fields
}
def raw_editable_data(self) -> dict[str, Any]:
return {
key: value
for key, value in self.raw_data().items()
if key not in self._read_only_fields
}
def cleanup(self) -> None:
while self._layout.count():
item = self._layout.takeAt(0)
widget = item.widget()
if widget is not None:
widget.close()
widget.deleteLater()
self._widgets.clear()
self._field_configs.clear()
def closeEvent(self, event) -> None: # noqa: N802
self.cleanup()
super().closeEvent(event)
def _populate(self) -> None:
for config in pydantic_model_input_configs(self._model):
name = config["name"]
info = self._model.model_fields[name]
widget = self._create_widget(name, info)
label_text = config["display_name"]
self._layout.addRow(label_text, widget)
label = self._layout.labelForField(widget)
if label is not None:
label.setProperty("_model_field_name", name)
if config.get("tooltip") and label is not None:
label.setToolTip(config["tooltip"])
widget.setEnabled(name not in self._read_only_fields)
self._widgets[name] = widget
self._field_configs[name] = config
self._set_widget_value(name, config["default"])
self._apply_field_metadata(name)
self._connect_widget(widget)
self._connect_device_signal_widgets()
self._connect_reference_unit_widgets()
self._refresh_reference_units()
def _create_widget(self, name: str, info: FieldInfo) -> QWidget:
annotation = info.annotation
args = get_args(annotation)
optional = NoneType in args
non_none_args = tuple(arg for arg in args if arg is not NoneType)
value_annotation = non_none_args[0] if len(non_none_args) == 1 else annotation
widget = self._create_value_widget(name, value_annotation)
numeric = value_annotation in (int, float) or any(
arg in (int, float) for arg in get_args(value_annotation)
)
if optional and (numeric or value_annotation is bool):
return OptionalValueWidget(widget, parent=self)
return widget
def _create_value_widget(self, name: str, annotation: Any) -> QWidget:
args = get_args(annotation)
if (
isinstance(annotation, type)
and issubclass(annotation, Signal)
or any(isinstance(arg, type) and issubclass(arg, Signal) for arg in args)
):
return SignalComboBox(
parent=self,
client=self._client,
require_device=self._model_has_device_field(),
arg_name=name,
)
if (
isinstance(annotation, type)
and issubclass(annotation, DeviceBase)
or any(isinstance(arg, type) and issubclass(arg, DeviceBase) for arg in args)
):
return DeviceComboBox(parent=self, client=self._client, arg_name=name)
if get_origin(annotation) is Literal:
widget = QComboBox(self)
widget.addItems([str(value) for value in get_args(annotation)])
return widget
if annotation is bool:
return QCheckBox(self)
if annotation is int:
spin_box = QSpinBox(self)
spin_box.setRange(-2147483647, 2147483647)
return spin_box
if annotation is float:
spin_box = BECSpinBox(self)
spin_box.setRange(-1_000_000_000, 1_000_000_000)
return spin_box
return QLineEdit(self)
def _apply_field_metadata(self, name: str) -> None:
config = self._field_configs[name]
field_widget = self._widgets[name]
input_widget = self.input_widget(name)
if config.get("precision") is not None:
apply_numeric_precision(input_widget, config)
if any(config.get(key) is not None for key in NUMERIC_BOUND_KEYS):
apply_numeric_limits(input_widget, config)
apply_unit_metadata(field_widget, config)
if input_widget is not field_widget:
apply_unit_metadata(input_widget, config)
def _connect_widget(self, widget: QWidget) -> None:
if isinstance(widget, OptionalValueWidget):
widget.value_changed.connect(lambda _value: self.changed.emit())
return
WidgetIO.connect_widget_change_signal(widget, lambda *_args: self.changed.emit())
def _connect_device_signal_widgets(self) -> None:
devices = [
widget for widget in self._widgets.values() if isinstance(widget, DeviceComboBox)
]
signals = [
widget for widget in self._widgets.values() if isinstance(widget, SignalComboBox)
]
if not devices or not signals:
return
device_widget = devices[0]
for signal_widget in signals:
device_widget.device_selected.connect(signal_widget.set_device)
device_widget.device_reset.connect(lambda w=signal_widget: w.set_device(None))
if device_widget.currentText().strip():
signal_widget.set_device(device_widget.currentText().strip())
def _connect_reference_unit_widgets(self) -> None:
for name, widget in self.input_widgets().items():
if not isinstance(widget, DeviceComboBox):
continue
widget.device_selected.connect(
lambda _device_name, field_name=name: self._update_reference_units(field_name)
)
widget.device_reset.connect(
lambda field_name=name: self._apply_reference_units(field_name, None)
)
widget.currentTextChanged.connect(
lambda text, field_name=name: self._handle_reference_device_text(field_name, text)
)
def _refresh_reference_units(self) -> None:
for name, widget in self.input_widgets().items():
if isinstance(widget, DeviceComboBox):
self._update_reference_units(name)
def _update_reference_units(self, source_name: str) -> None:
widget = self.input_widget(source_name)
if not isinstance(widget, DeviceComboBox) or not widget.is_valid_input:
self._apply_reference_units(source_name, None)
return
self._apply_reference_units(source_name, device_units(widget.get_current_device()))
def _apply_reference_units(self, source_name: str, units: str | None) -> None:
for field_name, config in self._field_configs.items():
if config.get("reference_units") != source_name:
continue
field_widget = self.field_widget(field_name)
input_widget = self.input_widget(field_name)
apply_unit_metadata(field_widget, config, units)
if input_widget is not field_widget:
apply_unit_metadata(input_widget, config, units)
def _handle_reference_device_text(self, source_name: str, device_name: str) -> None:
widget = self.input_widget(source_name)
if isinstance(widget, DeviceComboBox) and not widget.validate_device(device_name):
self._apply_reference_units(source_name, None)
def _validate_domain_widgets(self) -> None:
for widget in self._widgets.values():
if isinstance(widget, DeviceComboBox):
device = widget.currentText().strip()
if not device:
raise ValueError("Device is required.")
if not widget.is_valid_input:
raise ValueError(f"Device '{device}' is not available.")
if isinstance(widget, SignalComboBox):
signal = widget.get_signal_name().strip()
if signal and not widget.is_valid_input:
raise ValueError(f"Signal '{signal}' is not available.")
def _read_widget_value(self, name: str) -> Any:
widget = self._widgets[name]
info = self._model.model_fields[name]
if isinstance(widget, OptionalValueWidget):
return widget.value()
if isinstance(widget, QLineEdit):
value = WidgetIO.get_value(widget)
return None if NoneType in get_args(info.annotation) and value == "" else value
if isinstance(widget, QComboBox) and get_origin(info.annotation) is Literal:
return WidgetIO.get_value(widget, as_string=True)
return WidgetIO.get_value(widget)
def _set_widget_value(self, name: str, value: Any) -> None:
widget = self._widgets[name]
if isinstance(widget, OptionalValueWidget):
widget.set_value(value)
return
if value is None:
if isinstance(widget, QLineEdit):
value = ""
elif isinstance(widget, QCheckBox):
value = False
elif isinstance(widget, (QSpinBox, QDoubleSpinBox)):
value = 0
WidgetIO.set_value(widget, value)
def _model_has_device_field(self) -> bool:
for field in self._model.model_fields.values():
annotation = field.annotation
args = get_args(annotation)
has_device = (
isinstance(annotation, type)
and issubclass(annotation, DeviceBase)
or any(isinstance(arg, type) and issubclass(arg, DeviceBase) for arg in args)
)
has_signal = (
isinstance(annotation, type)
and issubclass(annotation, Signal)
or any(isinstance(arg, type) and issubclass(arg, Signal) for arg in args)
)
if has_device and not has_signal:
return True
return False
if __name__ == "__main__": # pragma: no cover
import json
import sys
from bec_lib.scan_args import ScanArgument
from pydantic import Field
from qtpy.QtWidgets import QApplication, QLabel, QPushButton, QTabWidget, QTextEdit, QVBoxLayout
from bec_widgets.utils.colors import apply_theme
class BasicScanConfig(BaseModel):
"""Plain Pydantic fields without GUI metadata."""
sample_name: str
enabled: bool = True
repeats: int = 3
class LimitConfig(BaseModel):
"""Normal Pydantic Field metadata."""
mode: Literal["monitor", "scan", "calibration"] = "scan"
low_limit: (
float | None
) # example of the field without additional metadata, still works in form
high_limit: float | None = Field(
default=10.0,
title="High limit",
description="Optional upper allowed value.",
json_schema_extra={"precision": 4},
)
tolerance: float = Field(
default=0.1,
title="Tolerance",
description="Warning tolerance around configured limits.",
json_schema_extra={"precision": 4},
)
class ScanArgumentConfig(BaseModel):
"""ScanArgument metadata applied through Field extras."""
settling_time: float = Field(
default=0.0,
**ScanArgument(
display_name="Settling time",
description="Time to wait after moving.",
units="s",
precision=3,
ge=0,
).model_dump(),
)
frames: int = Field(
default=1,
**ScanArgument(
display_name="Frames", description="Number of frames per trigger.", ge=1
).model_dump(),
)
class DeviceSignalLimitsConfig(BaseModel):
"""Device, signal, and numeric fields whose units follow the selected device."""
model_config = {"arbitrary_types_allowed": True}
device: DeviceBase | str = Field(
default="",
**ScanArgument(display_name="Device", description="Positioner device.").model_dump(),
)
signal: Signal | str | None = Field(
default=None,
**ScanArgument(display_name="Signal", description="Device signal.").model_dump(),
)
low_limit: float | None = Field(
default=None,
**ScanArgument(
display_name="Low limit",
description="Optional lower limit.",
reference_units="device",
precision=4,
).model_dump(),
)
high_limit: float | None = Field(
default=None,
**ScanArgument(
display_name="High limit",
description="Optional upper limit.",
reference_units="device",
precision=4,
).model_dump(),
)
class DisplayConfig(BaseModel):
title: str | None = Field(
default=None, title="Title", description="Optional display title."
)
show_grid: bool = Field(default=True, title="Show grid")
refresh_interval: int = Field(
default=1000, title="Refresh interval", description="Refresh interval in milliseconds."
)
class DeviceAndSignalConfig(BaseModel):
model_config = {"arbitrary_types_allowed": True}
title: str | None = Field(
default=None, title="Title", description="Optional display title."
)
device: DeviceBase | str = Field(
default="", title="Device", description="BEC device selection."
)
signal: Signal | str | None = Field(
default=None,
title="Signal",
description="Signal selection scoped to the selected device.",
)
refresh_interval: int = Field(
default=1000, title="Refresh interval", description="Refresh interval in milliseconds."
)
class DeviceOnlyConfig(BaseModel):
model_config = {"arbitrary_types_allowed": True}
title: str | None = Field(
default=None, title="Title", description="Optional display title."
)
device: DeviceBase | str = Field(
default="", title="Device", description="BEC device selection."
)
refresh_interval: int = Field(
default=1000, title="Refresh interval", description="Refresh interval in milliseconds."
)
class SignalOnlyConfig(BaseModel):
model_config = {"arbitrary_types_allowed": True}
title: str | None = Field(
default=None, title="Title", description="Optional display title."
)
signal: Signal | str | None = Field(
default=None,
title="Signal",
description="Global BEC signal selection without a device field.",
)
refresh_interval: int = Field(
default=1000, title="Refresh interval", description="Refresh interval in milliseconds."
)
class ExampleWindow(QWidget):
def __init__(self) -> None:
super().__init__()
self.setWindowTitle("PydanticWidgetForm example")
self._tabs = QTabWidget(self)
self._output = QTextEdit(self)
self._output.setReadOnly(True)
self._output.setPlaceholderText("Validated form data appears here.")
self._forms: list[PydanticWidgetForm] = []
self._add_form("Basic", PydanticWidgetForm(BasicScanConfig))
self._add_form("Limits", PydanticWidgetForm(LimitConfig))
self._add_form("ScanArgument", PydanticWidgetForm(ScanArgumentConfig))
self._add_form("Display", PydanticWidgetForm(DisplayConfig))
self._add_form("Device + signal", PydanticWidgetForm(DeviceAndSignalConfig))
self._add_form("Device limits", PydanticWidgetForm(DeviceSignalLimitsConfig))
self._add_form("Device only", PydanticWidgetForm(DeviceOnlyConfig))
self._add_form("Signal only", PydanticWidgetForm(SignalOnlyConfig))
show_data = QPushButton("Show current tab data", self)
show_data.clicked.connect(self._show_current_data)
layout = QVBoxLayout(self)
layout.addWidget(QLabel("Generated forms from Pydantic models", self))
layout.addWidget(self._tabs)
layout.addWidget(show_data)
layout.addWidget(self._output)
def _add_form(self, title: str, form: PydanticWidgetForm) -> None:
form.changed.connect(lambda _form=form: self._on_form_changed(_form))
self._forms.append(form)
self._tabs.addTab(form, title)
def _show_current_data(self, _checked: bool = False, *, validate: bool = True) -> None:
form = self._forms[self._tabs.currentIndex()]
if validate:
try:
data = form.get_data()
except (ValidationError, ValueError) as exc:
self._output.setPlainText(str(exc))
return
key = "data"
else:
data = form.raw_data()
key = "raw_data"
self._output.setPlainText(
json.dumps(
{key: data, "dirty_fields": sorted(form.dirty_fields())}, indent=2, default=str
)
)
def _on_form_changed(self, form: PydanticWidgetForm) -> None:
if form is self._forms[self._tabs.currentIndex()]:
self._show_current_data(validate=False)
app = QApplication(sys.argv)
apply_theme("dark")
window = ExampleWindow()
window.show()
sys.exit(app.exec())
+155
View File
@@ -0,0 +1,155 @@
from __future__ import annotations
import re
from collections.abc import Mapping
from typing import Any
from bec_lib import bec_logger
from qtpy.QtWidgets import QDoubleSpinBox, QSpinBox, QWidget
logger = bec_logger.logger
UNIT_TOOLTIP_PREFIXES = ("Units:", "Units from:")
def format_display_name(name: str) -> str:
"""Convert a raw argument name into a user-facing label."""
parts = re.split(r"(_|\d+)", name)
return " ".join(part.capitalize() for part in parts if part.isalnum()).strip()
def resolve_tooltip(scan_argument: Mapping[str, Any]) -> str | None:
"""Resolve explicit tooltip text, falling back to the description."""
return scan_argument.get("tooltip") or scan_argument.get("description")
def ui_config_from_metadata(
name: str,
metadata: Mapping[str, Any],
*,
default: Any = None,
input_type: Any = None,
arg: bool = False,
display_name: str | None = None,
) -> dict[str, Any]:
"""Build the normalized scan-input item consumed by form widgets."""
return {
"arg": arg,
"name": name,
"type": input_type,
"display_name": display_name or metadata.get("display_name") or format_display_name(name),
"tooltip": resolve_tooltip(metadata),
"default": default,
"expert": metadata.get("expert", False),
"hidden": metadata.get("hidden", False),
"precision": metadata.get("precision"),
"units": metadata.get("units"),
"reference_units": metadata.get("reference_units"),
"reference_limits": metadata.get("reference_limits"),
"gt": metadata.get("gt"),
"ge": metadata.get("ge"),
"lt": metadata.get("lt"),
"le": metadata.get("le"),
"alternative_group": metadata.get("alternative_group"),
}
def unit_tooltip(item: Mapping[str, Any], units: str | None = None) -> str | None:
"""Build tooltip text from scan argument unit metadata."""
tooltip = item.get("tooltip")
reference_units = item.get("reference_units")
units = units or item.get("units")
tooltip_parts = [tooltip] if tooltip else []
if units:
tooltip_parts.append(f"Units: {units}")
elif reference_units:
tooltip_parts.append(f"Units from: {reference_units}")
if tooltip_parts:
return "\n".join(str(part) for part in tooltip_parts)
return None
def strip_unit_tooltip(tooltip: str) -> str:
"""Remove unit lines added by :func:`apply_unit_metadata`."""
return "\n".join(
line for line in tooltip.splitlines() if not line.startswith(UNIT_TOOLTIP_PREFIXES)
).strip()
def apply_unit_metadata(widget: QWidget, item: Mapping[str, Any], units: str | None = None) -> None:
"""Apply unit tooltip text and numeric suffix metadata to a widget."""
units = units or item.get("units")
tooltip = unit_tooltip(item, units)
existing_tooltip = strip_unit_tooltip(widget.toolTip())
base_tooltip = item.get("tooltip")
if base_tooltip and existing_tooltip == base_tooltip:
existing_tooltip = ""
if tooltip:
widget.setToolTip(f"{existing_tooltip}\n{tooltip}" if existing_tooltip else tooltip)
else:
widget.setToolTip(existing_tooltip)
if hasattr(widget, "setSuffix"):
widget.setSuffix(f" {units}" if units else "")
def device_units(device: object) -> str | None:
"""Return engineering units from a BEC device object when available."""
egu = getattr(device, "egu", None)
if not callable(egu):
return None
try:
return egu()
except Exception:
logger.exception("Failed to fetch engineering units from device %s", device)
return None
def apply_numeric_precision(widget: QWidget, item: Mapping[str, Any]) -> None:
"""Apply decimal precision metadata to spinboxes supporting ``setDecimals``."""
if not hasattr(widget, "setDecimals"):
return
precision = item.get("precision")
if precision is None:
return
try:
widget.setDecimals(max(0, int(precision)))
except (TypeError, ValueError):
logger.warning(
"Ignoring invalid precision %r for parameter %s", precision, item.get("name")
)
def apply_numeric_limits(widget: QWidget, item: Mapping[str, Any]) -> None:
"""Apply ``gt/ge/lt/le`` numeric bounds to Qt spinboxes."""
if isinstance(widget, QSpinBox) and not isinstance(widget, QDoubleSpinBox):
minimum = -2147483647
maximum = 2147483647
if item.get("ge") is not None:
minimum = int(item["ge"])
if item.get("gt") is not None:
minimum = int(item["gt"]) + 1
if item.get("le") is not None:
maximum = int(item["le"])
if item.get("lt") is not None:
maximum = int(item["lt"]) - 1
widget.setRange(minimum, maximum)
return
if isinstance(widget, QDoubleSpinBox):
minimum = -float("inf")
maximum = float("inf")
step = 10 ** (-widget.decimals())
if item.get("ge") is not None:
minimum = float(item["ge"])
if item.get("gt") is not None:
minimum = float(item["gt"]) + step
if item.get("le") is not None:
maximum = float(item["le"])
if item.get("lt") is not None:
maximum = float(item["lt"]) - step
widget.setRange(minimum, maximum)
+51
View File
@@ -99,6 +99,45 @@ class ComboBoxHandler(WidgetHandler):
widget.currentIndexChanged.connect(lambda idx, w=widget: slot(w, self.get_value(w)))
class DeviceComboBoxHandler(ComboBoxHandler):
"""Handler for BEC device comboboxes."""
def get_value(self, widget, **kwargs) -> str:
return widget.currentText().strip()
def set_value(self, widget, value: str | None) -> None:
device = "" if value is None else str(value)
if not device:
widget.setCurrentText("")
return
widget.set_device(device)
if widget.currentText() != device:
widget.setCurrentText(device)
def connect_change_signal(self, widget, slot):
widget.currentTextChanged.connect(lambda text, w=widget: slot(w, text.strip()))
class SignalComboBoxHandler(ComboBoxHandler):
"""Handler for BEC signal comboboxes."""
def get_value(self, widget, **kwargs) -> str | None:
signal = widget.get_signal_name().strip()
return signal or None
def set_value(self, widget, value: str | None) -> None:
signal = "" if value is None else str(value)
if not signal:
widget.setCurrentText("")
return
widget.set_signal(signal)
if widget.currentText() != signal and widget.get_signal_name() != signal:
widget.setCurrentText(signal)
def connect_change_signal(self, widget, slot):
widget.currentTextChanged.connect(lambda _text, w=widget: slot(w, self.get_value(w)))
class TableWidgetHandler(WidgetHandler):
"""Handler for QTableWidget widgets."""
@@ -282,6 +321,18 @@ class WidgetIO:
Returns:
handler_class: The handler class if found, otherwise None.
"""
if (
isinstance(widget, QComboBox)
and hasattr(widget, "set_signal")
and hasattr(widget, "get_signal_name")
):
return SignalComboBoxHandler
if (
isinstance(widget, QComboBox)
and hasattr(widget, "set_device")
and hasattr(widget, "device_selected")
):
return DeviceComboBoxHandler
for base in type(widget).__mro__:
if base in WidgetIO._handlers:
return WidgetIO._handlers[base]
@@ -385,6 +385,11 @@ class BECDockArea(DockAreaWidget):
"bec_shell": (widget_icons["BECShell"], "Add BEC Shell", "BECShell"),
"sbb_monitor": (widget_icons["SBBMonitor"], "Add SBB Monitor", "SBBMonitor"),
"log_panel": (widget_icons["LogPanel"], "Add LogPanel", "LogPanel"),
"beamline_state_manager": (
widget_icons["BeamlineStateManager"],
"Add Beamline State Manager",
"BeamlineStateManager",
),
}
# Create expandable menu actions (original behavior)
@@ -11,7 +11,6 @@ Intended for use in desktop applications to provide user feedback, warnings, and
from __future__ import annotations
import json
import sys
from datetime import datetime
from enum import Enum
@@ -29,7 +28,7 @@ from qtpy.QtWidgets import QApplication, QFrame, QMainWindow, QScrollArea, QWidg
from bec_widgets import SafeProperty, SafeSlot
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils.colors import apply_theme
from bec_widgets.utils.colors import apply_theme, get_theme_name
from bec_widgets.utils.widget_io import WidgetIO
@@ -258,8 +257,8 @@ class NotificationToast(QFrame):
def _connect_to_theme_change(self):
"""Connect this toast to the global themeupdated signal."""
qapp = QApplication.instance()
if hasattr(qapp, "theme_signal"):
qapp.theme_signal.theme_updated.connect(self.apply_theme)
if hasattr(qapp, "theme"):
qapp.theme.theme_changed.connect(self.apply_theme)
# helper methods -----------------------------------------------------
def _current_inner_width(self) -> int:
@@ -354,11 +353,9 @@ class NotificationToast(QFrame):
Args:
theme(str | None): "light" or "dark". If None, auto-detects from QApplication.
"""
# determine effective theme
if theme is None:
app = QApplication.instance()
theme = getattr(getattr(app, "theme", None), "theme", "dark")
theme = theme.lower()
theme = str(theme or get_theme_name()).lower()
if theme not in {"light", "dark"}:
theme = "dark"
self._theme = theme
palette = DARK_PALETTE if theme == "dark" else LIGHT_PALETTE
@@ -403,11 +400,18 @@ class NotificationToast(QFrame):
#NotificationToast QPushButton:hover {{ color: {btn_hover}; }}
""")
# traceback panel colours
trace_bg = "#1e1e1e" if theme == "dark" else "#f0f0f0"
if theme == "dark":
trace_bg = "#1e1e1e"
trace_fg = palette["body"]
trace_border = "rgba(255,255,255,48)"
else:
trace_bg = "#ffffff"
trace_fg = palette["body"]
trace_border = "rgba(15,23,42,54)"
self.trace_view.setStyleSheet(f"""
background:{trace_bg};
color:{palette['body']};
border:none;
color:{trace_fg};
border: 1px solid {trace_border};
border-radius:8px;
""")
@@ -438,8 +442,8 @@ class NotificationToast(QFrame):
}}
""")
# stronger accent wash in light mode, slightly stronger in dark too
self._accent_alpha = 110 if theme == "light" else 60
self._accent_alpha = 6 if theme == "light" else 60
self._gradient_width_factor = 1.0 if theme == "light" else 0.70
self.update()
########################################
@@ -471,6 +475,8 @@ class NotificationToast(QFrame):
# Event Filters
########################################
def eventFilter(self, watched, event):
if not isinstance(event, QtCore.QEvent):
return False
# timestamp label → toggle absolute time
if watched is self.time_lbl:
if event.type() == QtCore.QEvent.Enter and not self._showing_absolute:
@@ -519,7 +525,9 @@ class NotificationToast(QFrame):
painter.fillPath(path, self._base_color)
# accent gradient, fades to transparent
grad = QtGui.QLinearGradient(0, 0, self.width() * 0.7, 0)
grad = QtGui.QLinearGradient(
0, 0, self.width() * getattr(self, "_gradient_width_factor", 0.70), 0
)
accent = QtGui.QColor(self._accent_color)
if getattr(self, "_theme", "dark") == "light":
accent = accent.darker(115)
@@ -543,7 +551,7 @@ class NotificationToast(QFrame):
def close(self) -> None:
self.closed.emit()
QtWidgets.QApplication.instance().removeEventFilter(self)
self.time_lbl.removeEventFilter(self)
super().close() # this will remove the widget from its parent
@@ -673,8 +681,8 @@ class NotificationCentre(QScrollArea):
def _connect_to_theme_change(self):
"""Connect to the theme change signal."""
qapp = QApplication.instance()
if hasattr(qapp, "theme_signal"):
qapp.theme_signal.theme_updated.connect(self.apply_theme)
if hasattr(qapp, "theme"):
qapp.theme.theme_changed.connect(self.apply_theme)
# public API
def add_notification(
@@ -888,6 +896,8 @@ class NotificationCentre(QScrollArea):
self.setFixedHeight(min(content_h, avail))
def eventFilter(self, watched, event):
if not isinstance(event, QtCore.QEvent):
return False
if watched is self.parent() and event.type() == QtCore.QEvent.Resize:
self._adjust_height()
return super().eventFilter(watched, event)
@@ -4,7 +4,7 @@ import os
from bec_lib import bec_logger
from bec_lib.endpoints import MessageEndpoints
from qtpy.QtCore import QEvent, QSize, Qt, QTimer
from qtpy.QtCore import QSize, Qt, QTimer
from qtpy.QtGui import QAction, QActionGroup, QIcon
from qtpy.QtWidgets import (
QApplication,
@@ -412,11 +412,6 @@ class BECMainWindow(BECWidget, QMainWindow):
"""
apply_theme(theme) # emits theme_updated and applies palette globally
def event(self, event):
if event.type() == QEvent.Type.StatusTip:
return True
return super().event(event)
def _show_widget_hierarchy_dialog(self):
if self._widget_hierarchy_dialog is None:
dialog = WidgetHierarchyDialog(root_widget=None, parent=self)
@@ -603,8 +603,10 @@ class DeviceComboBox(BECWidget, QComboBox):
return device.readout_priority in self.readout_filter
def _update_validity_style(self, is_valid: bool) -> None:
border_color = "transparent" if is_valid or not self.isEnabled() else "red"
self.setStyleSheet(f"border: 1px solid {border_color};")
if is_valid or not self.isEnabled():
self.setStyleSheet("")
return
self.setStyleSheet("QComboBox { border: 1px solid red; }")
def _filter_devices_by_signal_class(
self, devices: list[Device | BECSignal | ComputedSignal | Positioner]
@@ -468,8 +468,7 @@ class SignalComboBox(BECWidget, QComboBox):
True if an enabled item was found and selected.
"""
for index in range(self.count()):
item = self.model().item(index)
if item is not None and item.isEnabled():
if self._item_is_enabled(index):
self.setCurrentIndex(index)
return True
return False
@@ -626,8 +625,10 @@ class SignalComboBox(BECWidget, QComboBox):
self.check_validity(self.currentText())
def _update_validity_style(self, is_valid: bool) -> None:
border_color = "transparent" if is_valid or not self.isEnabled() else "red"
self.setStyleSheet(f"border: 1px solid {border_color};")
if is_valid or not self.isEnabled():
self.setStyleSheet("")
return
self.setStyleSheet("QComboBox { border: 1px solid red; }")
def _replace_signal_items(self, items: list[str | tuple[str, dict]] | None = None):
combo_items = self._signals if items is None else items
@@ -648,15 +649,37 @@ class SignalComboBox(BECWidget, QComboBox):
if self._config_signals:
index = offset + len(self._hinted_signals) + len(self._normal_signals)
self.insertItem(index, "Config Signals")
self.model().item(index).setEnabled(False)
self._set_item_enabled(index, False)
if self._normal_signals:
index = offset + len(self._hinted_signals)
self.insertItem(index, "Normal Signals")
self.model().item(index).setEnabled(False)
self._set_item_enabled(index, False)
if self._hinted_signals:
index = offset
self.insertItem(index, "Hinted Signals")
self.model().item(index).setEnabled(False)
self._set_item_enabled(index, False)
def _standard_item(self, index: int):
model = self.model()
item_getter = getattr(model, "item", None)
if callable(item_getter):
return item_getter(index)
return None
def _item_is_enabled(self, index: int) -> bool:
item = self._standard_item(index)
if item is not None:
return item.isEnabled()
model_index = self.model().index(index, self.modelColumn())
if not model_index.isValid():
return True
return bool(self.model().flags(model_index) & Qt.ItemFlag.ItemIsEnabled)
def _set_item_enabled(self, index: int, enabled: bool) -> None:
item = self._standard_item(index)
if item is not None:
item.setEnabled(enabled)
def _display_text_for_signal(self, signal: str) -> str | None:
for entry in self._signals:
@@ -20,6 +20,12 @@ from qtpy.QtWidgets import (
QVBoxLayout,
)
from bec_widgets.utils.scan_arg_metadata import (
apply_numeric_limits,
apply_numeric_precision,
apply_unit_metadata,
device_units,
)
from bec_widgets.utils.widget_io import WidgetIO
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import (
BECDeviceFilter,
@@ -162,47 +168,6 @@ class ScanCheckBox(QCheckBox):
self.setChecked(default)
class ScanOptionalWidget(QGroupBox):
def __init__(self, widget, parent=None):
super().__init__(parent=parent)
self.inner_widget = widget
self.arg_name = getattr(widget, "arg_name", None)
self.setFlat(True)
layout = QHBoxLayout(self)
layout.setContentsMargins(0, 0, 0, 0)
layout.addWidget(widget)
self.none_checkbox = QCheckBox(self)
self.none_checkbox.setToolTip("Set this value to None.")
self.none_checkbox.toggled.connect(self._on_none_toggled)
layout.addWidget(self.none_checkbox)
def _on_none_toggled(self, checked: bool) -> None:
self.inner_widget.setEnabled(not checked)
def set_none(self, checked: bool) -> None:
self.none_checkbox.setChecked(checked)
def is_none(self) -> bool:
return self.none_checkbox.isChecked()
def setToolTip(self, text: str) -> None: # noqa: N802
super().setToolTip(text)
self.inner_widget.setToolTip(text)
checkbox_tooltip = "Set this value to None."
if text:
checkbox_tooltip = f"{text}\n{checkbox_tooltip}"
self.none_checkbox.setToolTip(checkbox_tooltip)
def toolTip(self) -> str: # noqa: N802
return self.inner_widget.toolTip()
def setSuffix(self, suffix: str) -> None: # noqa: N802
if hasattr(self.inner_widget, "setSuffix"):
self.inner_widget.setSuffix(suffix)
class ScanGroupBox(QGroupBox):
WIDGET_HANDLER = {
ScanArgType.DEVICE: DeviceComboBox,
@@ -252,7 +217,6 @@ class ScanGroupBox(QGroupBox):
self.labels = []
self.widgets = []
self._widget_configs = {}
self._wrapped_widgets = {}
self._column_labels = {}
self.selected_devices = {}
@@ -327,8 +291,8 @@ class ScanGroupBox(QGroupBox):
)
else:
widget = widget_class(parent=self.parent(), arg_name=arg_name, default=default)
self._apply_numeric_precision(widget, item)
self._apply_numeric_limits(widget, item)
apply_numeric_precision(widget, item)
apply_numeric_limits(widget, item)
if isinstance(widget, DeviceComboBox):
self.selected_devices[widget] = ""
widget.device_selected.connect(self.emit_device_selected)
@@ -339,18 +303,17 @@ class ScanGroupBox(QGroupBox):
)
if isinstance(widget, ScanLiteralsComboBox):
widget.set_literals(item["type"].get("Literal", []))
display_widget = self._wrap_optional_widget(widget, item, default)
self._widget_configs[display_widget] = item
self._apply_unit_metadata(display_widget, item)
self.layout.addWidget(display_widget, row, column_index)
self.widgets.append(display_widget)
self._widget_configs[widget] = item
apply_unit_metadata(widget, item)
self.layout.addWidget(widget, row, column_index)
self.widgets.append(widget)
@Slot(str)
def emit_device_selected(self, device_name):
sender = self.sender()
self.selected_devices[sender] = device_name.strip()
if isinstance(sender, DeviceComboBox):
units = self._device_units(sender.get_current_device())
units = device_units(sender.get_current_device())
self._update_reference_units(sender, units)
self._emit_reference_units_changed(sender, units)
selected_devices_str = " ".join(self.selected_devices.values())
@@ -377,10 +340,8 @@ class ScanGroupBox(QGroupBox):
return
for widget in self.widgets[-len(self.inputs) :]:
inner_widget = self._inner_widget(widget)
if isinstance(inner_widget, DeviceComboBox):
self.selected_devices[inner_widget] = ""
self._wrapped_widgets.pop(inner_widget, None)
if isinstance(widget, DeviceComboBox):
self.selected_devices[widget] = ""
self._widget_configs.pop(widget, None)
widget.close()
widget.deleteLater()
@@ -392,10 +353,8 @@ class ScanGroupBox(QGroupBox):
def remove_all_widget_bundles(self):
"""Remove every widget bundle from the scan control layout."""
for widget in list(self.widgets):
inner_widget = self._inner_widget(widget)
if isinstance(inner_widget, DeviceComboBox):
self.selected_devices.pop(inner_widget, None)
self._wrapped_widgets.pop(inner_widget, None)
if isinstance(widget, DeviceComboBox):
self.selected_devices.pop(widget, None)
self._widget_configs.pop(widget, None)
widget.close()
widget.deleteLater()
@@ -432,7 +391,12 @@ class ScanGroupBox(QGroupBox):
for j in range(self.layout.columnCount()):
try: # In case that the bundle size changes
widget = self.layout.itemAtPosition(i, j).widget()
value = self._widget_value(widget, device_object=device_object)
if isinstance(widget, DeviceComboBox) and device_object:
value = widget.get_current_device()
elif isinstance(widget, DeviceComboBox):
value = widget.currentText()
else:
value = WidgetIO.get_value(widget)
args.append(value)
except AttributeError:
continue
@@ -442,23 +406,27 @@ class ScanGroupBox(QGroupBox):
kwargs = {}
for i in range(self.layout.columnCount()):
widget = self.layout.itemAtPosition(1, i).widget()
value = self._widget_value(widget, device_object=device_object)
inner_widget = self._inner_widget(widget)
if isinstance(inner_widget, DeviceComboBox) and value is not None and device_object:
value = value.name
if isinstance(widget, DeviceComboBox) and device_object:
value = widget.get_current_device().name
elif isinstance(widget, DeviceComboBox):
value = widget.currentText()
elif isinstance(widget, ScanLiteralsComboBox):
value = widget.get_value()
else:
value = WidgetIO.get_value(widget)
kwargs[widget.arg_name] = value
return kwargs
def count_arg_rows(self):
widget_rows = 0
for row in range(self.layout.rowCount()):
if row == 0:
continue
for col in range(self.layout.columnCount()):
item = self.layout.itemAtPosition(row, col)
if item is not None and item.widget() is not None:
widget_rows += 1
break
if item is not None:
widget = item.widget()
if widget is not None:
if isinstance(widget, DeviceComboBox):
widget_rows += 1
return widget_rows
def set_parameters(self, parameters: list | dict):
@@ -482,68 +450,21 @@ class ScanGroupBox(QGroupBox):
self.add_input_widgets(self.inputs, row)
for i, value in enumerate(parameters):
self._set_widget_value(self.widgets[i], value)
WidgetIO.set_value(self.widgets[i], value)
def _set_kwarg_parameters(self, parameters: dict):
for widget in self.widgets:
for key, value in parameters.items():
if widget.arg_name == key:
self._set_widget_value(widget, value)
WidgetIO.set_value(widget, value)
break
@staticmethod
def _unit_tooltip(item: dict, units: str | None = None) -> str | None:
tooltip = item.get("tooltip", None)
reference_units = item.get("reference_units", None)
units = units or item.get("units", None)
tooltip_parts = [tooltip] if tooltip else []
if units:
tooltip_parts.append(f"Units: {units}")
elif reference_units:
tooltip_parts.append(f"Units from: {reference_units}")
if tooltip_parts:
return "\n".join(tooltip_parts)
return None
def _apply_unit_metadata(self, widget, item: dict, units: str | None = None) -> None:
units = units or item.get("units", None)
tooltip = self._unit_tooltip(item, units)
existing_tooltip = widget.toolTip()
if existing_tooltip:
# strip the existing unit info from the tooltip if it exists
# to avoid tooltip bloat on multiple updates
existing_tooltip = "\n".join(
line
for line in existing_tooltip.splitlines()
if not (line.startswith("Units:") or line.startswith("Units from:"))
).strip()
if tooltip:
if existing_tooltip:
widget.setToolTip(f"{existing_tooltip}\n{tooltip}")
else:
widget.setToolTip(tooltip)
if hasattr(widget, "setSuffix"):
widget.setSuffix(f" {units}" if units else "")
def _refresh_column_label(self, column: int, item: dict) -> None:
if column not in self._column_labels:
return
self._column_labels[column].setText(item.get("display_name", item.get("name", None)))
@staticmethod
def _device_units(device) -> str | None:
egu = getattr(device, "egu", None)
if not callable(egu):
return None
try:
return egu()
except Exception:
logger.exception("Failed to fetch engineering units from device %s", device)
return None
def _widget_position(self, widget) -> tuple[int, int] | None:
widget = self._display_widget(widget)
for row in range(self.layout.rowCount()):
for column in range(self.layout.columnCount()):
item = self.layout.itemAtPosition(row, column)
@@ -568,7 +489,7 @@ class ScanGroupBox(QGroupBox):
row, column = widget_position
if self.box_type == "args" and row != source_row:
continue
self._apply_unit_metadata(widget, item, units)
apply_unit_metadata(widget, item, units)
self._refresh_column_label(column, item)
def apply_reference_units(self, reference_name: str, units: str | None) -> None:
@@ -582,7 +503,7 @@ class ScanGroupBox(QGroupBox):
item = self._widget_configs.get(widget, {})
if item.get("reference_units") != reference_name:
continue
self._apply_unit_metadata(widget, item, units)
apply_unit_metadata(widget, item, units)
position = self._widget_position(widget)
if position is not None:
_, column = position
@@ -601,89 +522,3 @@ class ScanGroupBox(QGroupBox):
self.selected_devices[device_widget] = ""
self._update_reference_units(device_widget, None)
self._emit_reference_units_changed(device_widget, None)
@staticmethod
def _apply_numeric_precision(widget: ScanDoubleSpinBox, item: dict) -> None:
if not isinstance(widget, ScanDoubleSpinBox):
return
precision = item.get("precision")
if precision is None:
return
try:
widget.setDecimals(max(0, int(precision)))
except (TypeError, ValueError):
logger.warning(
"Ignoring invalid precision %r for parameter %s", precision, item.get("name")
)
@staticmethod
def _apply_numeric_limits(widget: ScanDoubleSpinBox | ScanSpinBox, item: dict) -> None:
if isinstance(widget, ScanSpinBox):
minimum = -2147483647 # largest int which qt allows
maximum = 2147483647
if item.get("ge") is not None:
minimum = int(item["ge"])
if item.get("gt") is not None:
minimum = int(item["gt"]) + 1
if item.get("le") is not None:
maximum = int(item["le"])
if item.get("lt") is not None:
maximum = int(item["lt"]) - 1
widget.setRange(minimum, maximum)
return
if isinstance(widget, ScanDoubleSpinBox):
minimum = -float("inf")
maximum = float("inf")
step = 10 ** (-widget.decimals())
if item.get("ge") is not None:
minimum = float(item["ge"])
if item.get("gt") is not None:
minimum = float(item["gt"]) + step
if item.get("le") is not None:
maximum = float(item["le"])
if item.get("lt") is not None:
maximum = float(item["lt"]) - step
widget.setRange(minimum, maximum)
def _wrap_optional_widget(self, widget, item: dict, default):
if not item.get("optional", False):
return widget
wrapped_widget = ScanOptionalWidget(widget, parent=self)
wrapped_widget.set_none(default is None)
self._wrapped_widgets[widget] = wrapped_widget
return wrapped_widget
@staticmethod
def _inner_widget(widget):
if isinstance(widget, ScanOptionalWidget):
return widget.inner_widget
return widget
def _display_widget(self, widget):
return self._wrapped_widgets.get(widget, widget)
def _widget_value(self, widget, *, device_object: bool = True):
if isinstance(widget, ScanOptionalWidget) and widget.is_none():
return None
inner_widget = self._inner_widget(widget)
if isinstance(inner_widget, DeviceComboBox) and device_object:
return inner_widget.get_current_device()
if isinstance(inner_widget, DeviceComboBox):
return inner_widget.currentText()
if isinstance(inner_widget, ScanLiteralsComboBox):
return inner_widget.get_value()
return WidgetIO.get_value(inner_widget)
def _set_widget_value(self, widget, value) -> None:
if isinstance(widget, ScanOptionalWidget):
widget.set_none(value is None)
if value is None:
return
WidgetIO.set_value(widget.inner_widget, value)
return
WidgetIO.set_value(widget, value)
@@ -2,9 +2,12 @@
from __future__ import annotations
import re
from typing import Any
from bec_widgets.utils.scan_arg_metadata import format_display_name as format_scan_display_name
from bec_widgets.utils.scan_arg_metadata import resolve_tooltip as resolve_scan_tooltip
from bec_widgets.utils.scan_arg_metadata import ui_config_from_metadata
AnnotationValue = str | dict[str, Any] | list[Any] | None
ScanArgumentMetadata = dict[str, Any]
SignatureEntry = dict[str, Any]
@@ -74,8 +77,7 @@ class ScanInfoAdapter:
Returns:
str: Formatted display label such as ``Exp Time``.
"""
parts = re.split(r"(_|\d+)", name)
return " ".join(part.capitalize() for part in parts if part.isalnum()).strip()
return format_scan_display_name(name)
@staticmethod
def resolve_tooltip(scan_argument: ScanArgumentMetadata) -> str | None:
@@ -87,39 +89,32 @@ class ScanInfoAdapter:
Returns:
str | None: Explicit tooltip text if provided, otherwise the description fallback.
"""
return scan_argument.get("tooltip") or scan_argument.get("description")
return resolve_scan_tooltip(scan_argument)
@staticmethod
def parse_annotation(
annotation: AnnotationValue,
) -> tuple[AnnotationValue, ScanArgumentMetadata, bool]:
) -> tuple[AnnotationValue, ScanArgumentMetadata]:
"""Extract the serialized base annotation and ``ScanArgument`` metadata.
Args:
annotation (AnnotationValue): Serialized annotation payload from BEC.
Returns:
tuple[AnnotationValue, ScanArgumentMetadata, bool]: The unwrapped annotation,
parsed ``ScanArgument`` metadata, and whether ``None`` is an allowed value.
tuple[AnnotationValue, ScanArgumentMetadata]: The unwrapped annotation and parsed
``ScanArgument`` metadata.
"""
scan_argument: ScanArgumentMetadata = {}
if isinstance(annotation, dict) and "Annotated" in annotation:
annotated = annotation["Annotated"]
annotation = annotated.get("type", "_empty")
scan_argument = annotated.get("metadata", {}).get("ScanArgument", {}) or {}
allows_none = False
if isinstance(annotation, list):
allows_none = "NoneType" in annotation
annotation = next(
(entry for entry in annotation if entry != "NoneType"),
annotation[0] if annotation else "_empty",
)
elif annotation == "NoneType":
allows_none = True
annotation = "_empty"
return annotation, scan_argument, allows_none
if isinstance(annotation, dict) and "Annotated" in annotation:
annotated = annotation["Annotated"]
annotation = annotated.get("type", "_empty")
scan_argument = annotated.get("metadata", {}).get("ScanArgument", {}) or {}
return annotation, scan_argument
@staticmethod
def scan_arg_type_from_annotation(annotation: AnnotationValue) -> AnnotationValue:
@@ -149,14 +144,13 @@ class ScanInfoAdapter:
Returns:
ScanInputConfig: Normalized input configuration for ``ScanControl``.
"""
annotation, scan_argument, allows_none = self.parse_annotation(param.get("annotation"))
annotation, scan_argument = self.parse_annotation(param.get("annotation"))
return self._build_scan_input(
name=param["name"],
annotation=annotation,
scan_argument=scan_argument,
arg=arg,
default=None if arg else param.get("default", None),
optional=allows_none,
)
def scan_input_from_arg_input(
@@ -179,14 +173,13 @@ class ScanInfoAdapter:
self.parse_annotation(signature_by_name[name].get("annotation"))[0]
)
else:
annotation, scan_argument, allows_none = self.parse_annotation(item_type)
annotation, scan_argument = self.parse_annotation(item_type)
scan_input = self._build_scan_input(
name=name,
annotation=annotation,
scan_argument=scan_argument,
arg=True,
default=None,
optional=allows_none,
)
if scan_input["type"] in ("_empty", None):
scan_input["type"] = item_type
@@ -200,7 +193,6 @@ class ScanInfoAdapter:
*,
arg: bool,
default: Any,
optional: bool,
) -> ScanInputConfig:
"""Build one normalized ScanControl input configuration.
@@ -214,25 +206,13 @@ class ScanInfoAdapter:
Returns:
ScanInputConfig: Normalized input configuration.
"""
return {
"arg": arg,
"name": name,
"type": self.scan_arg_type_from_annotation(annotation),
"display_name": scan_argument.get("display_name") or self.format_display_name(name),
"tooltip": self.resolve_tooltip(scan_argument),
"default": default,
"optional": optional,
"expert": scan_argument.get("expert", False),
"hidden": scan_argument.get("hidden", False),
"precision": scan_argument.get("precision"),
"units": scan_argument.get("units"),
"reference_units": scan_argument.get("reference_units"),
"gt": scan_argument.get("gt"),
"ge": scan_argument.get("ge"),
"lt": scan_argument.get("lt"),
"le": scan_argument.get("le"),
"alternative_group": scan_argument.get("alternative_group"),
}
return ui_config_from_metadata(
name=name,
metadata=scan_argument,
input_type=self.scan_arg_type_from_annotation(annotation),
default=default,
arg=arg,
)
def build_scan_ui_config(self, scan_info: ScanInfo) -> ScanUIConfig:
"""Normalize one available-scan entry into the widget UI configuration.
@@ -0,0 +1 @@
{'files': ['beamline_state_pill.py']}
@@ -0,0 +1,57 @@
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from qtpy.QtWidgets import QWidget
from bec_widgets.utils.bec_designer import designer_material_icon
from bec_widgets.widgets.services.beamline_states.beamline_state_pill import BeamlineStateManager
DOM_XML = """
<ui language='c++'>
<widget class='BeamlineStateManager' name='beamline_state_manager'>
</widget>
</ui>
"""
class BeamlineStateManagerPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
def __init__(self):
super().__init__()
self._form_editor = None
def createWidget(self, parent):
if parent is None:
return QWidget()
t = BeamlineStateManager(parent)
return t
def domXml(self):
return DOM_XML
def group(self):
return "BEC Services"
def icon(self):
return designer_material_icon(BeamlineStateManager.ICON_NAME)
def includeFile(self):
return "beamline_state_manager"
def initialize(self, form_editor):
self._form_editor = form_editor
def isContainer(self):
return False
def isInitialized(self):
return self._form_editor is not None
def name(self):
return "BeamlineStateManager"
def toolTip(self):
return ""
def whatsThis(self):
return self.toolTip()
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,267 @@
from __future__ import annotations
import slugify
from bec_lib import bl_states
from qtpy.QtWidgets import (
QCheckBox,
QComboBox,
QDialog,
QDialogButtonBox,
QFormLayout,
QGroupBox,
QHBoxLayout,
QLineEdit,
QMessageBox,
QPushButton,
QVBoxLayout,
QWidget,
)
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.forms_from_types.pydantic_widget_form import PydanticWidgetForm
from bec_widgets.utils.name_utils import pascal_to_snake
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
BEAMLINE_STATE_STATUS_LABELS = {
"valid": "VALID",
"invalid": "INVALID",
"warning": "WARNING",
"unknown": "UNKNOWN",
}
SUPPORTED_BEAMLINE_STATES: tuple[type[bl_states.BeamlineState], ...] = (
bl_states.DeviceWithinLimitsState,
bl_states.ShutterState,
)
class AddBeamlineStateDialog(QDialog):
"""Dialog for creating supported beamline state configurations."""
def __init__(self, parent: QWidget | None = None, client=None) -> None:
super().__init__(parent=parent)
self.setWindowTitle("Add Beamline State")
self._cleaned_up = False
self._client = client
self._config: bl_states.BeamlineStateConfig | None = None
self._auto_generated_name: str | None = None
self._type_combo = QComboBox(self)
for state_class in SUPPORTED_BEAMLINE_STATES:
self._type_combo.addItem(state_class.__name__, state_class)
self._type_combo.currentIndexChanged.connect(self._update_config_form)
self._form = QFormLayout()
self._form.addRow("State type", self._type_combo)
self._config_form_host = QVBoxLayout()
self._config_form: PydanticWidgetForm | None = None
self._buttons = QDialogButtonBox(
QDialogButtonBox.StandardButton.Ok | QDialogButtonBox.StandardButton.Cancel, parent=self
)
self._buttons.accepted.connect(self.accept)
self._buttons.rejected.connect(self.reject)
layout = QVBoxLayout(self)
layout.addLayout(self._form)
layout.addLayout(self._config_form_host)
layout.addWidget(self._buttons)
self.setLayout(layout)
self.setMinimumWidth(280)
self._update_config_form()
self._fit_height_to_contents()
def config(self) -> bl_states.BeamlineStateConfig:
state_class = self._selected_state_class()
config_class = state_class.CONFIG_CLASS
name = self._state_name()
data = self._config_form.get_data()
data["name"] = name
return config_class.model_validate(data)
def accept(self) -> None:
try:
self._config = self.config()
except Exception as exc:
QMessageBox.warning(self, "Invalid Beamline State", str(exc))
return
super().accept()
@property
def config_result(self) -> bl_states.BeamlineStateConfig:
if self._config is None:
raise RuntimeError("Beamline state dialog was not accepted with a valid config.")
return self._config
def cleanup(self) -> None:
if self._cleaned_up:
return
self._cleaned_up = True
if self._config_form is not None:
self._config_form.cleanup()
self._config_form.close()
self._config_form.deleteLater()
def closeEvent(self, event) -> None: # noqa: N802
self.cleanup()
super().closeEvent(event)
@SafeSlot(str)
def _on_valid_device_selected(self, device: str) -> None:
if self._cleaned_up:
return
name_widget = self._config_form.input_widget("name")
current_name = name_widget.text().strip()
if current_name and current_name != self._auto_generated_name:
return
suffix = slugify.slugify(
pascal_to_snake(self._selected_state_class().__name__), separator="_"
)
generated_name = f"{slugify.slugify(device, separator='_')}_{suffix}"
self._auto_generated_name = generated_name
name_widget.setText(generated_name)
@SafeSlot(int)
def _update_config_form(self, _index: int = 0) -> None:
previous_data = self._config_form.raw_data() if self._config_form is not None else {}
if self._config_form is not None:
self._config_form_host.removeWidget(self._config_form)
self._config_form.cleanup()
self._config_form.setParent(None)
self._config_form.deleteLater()
config_class = self._selected_state_class().CONFIG_CLASS
data = {
key: value
for key, value in previous_data.items()
if key in config_class.model_fields and value is not None
}
self._config_form = PydanticWidgetForm(config_class, parent=self, client=self._client)
self._config_form.set_partial_data(data)
self._config_form_host.addWidget(self._config_form)
for device_widget in self._config_form.input_widgets_by_type(DeviceComboBox):
device_widget.device_selected.connect(self._on_valid_device_selected)
self._fit_height_to_contents()
def _fit_height_to_contents(self) -> None:
self.setMinimumHeight(0)
self.setMaximumHeight(16777215)
self.layout().activate()
self.adjustSize()
height = self.sizeHint().expandedTo(self.minimumSizeHint()).height()
self.setMinimumHeight(height)
self.setMaximumHeight(height)
def _selected_state_class(self) -> type[bl_states.BeamlineState]:
state_class = self._type_combo.currentData()
if state_class is None:
raise RuntimeError("No beamline state class selected.")
return state_class
def _state_name(self) -> str:
name_widget = self._config_form.input_widget("name")
raw_name = name_widget.text().strip()
if not raw_name:
raise ValueError("Name is required.")
name = slugify.slugify(raw_name, separator="_")
name_widget.setText(name)
return name
class StatusFilterDialog(QDialog):
"""Dialog for selecting visible beamline state statuses."""
def __init__(self, selected_statuses: set[str] | None, parent: QWidget | None = None) -> None:
super().__init__(parent=parent)
self.setWindowTitle("Filter Beamline State Status")
self._checkboxes: dict[str, QCheckBox] = {}
controls = QHBoxLayout()
select_all = QPushButton("Select all", self)
clear = QPushButton("Clear", self)
select_all.clicked.connect(lambda: self._set_all(True))
clear.clicked.connect(lambda: self._set_all(False))
controls.addWidget(select_all)
controls.addWidget(clear)
controls.addStretch(1)
list_layout = QVBoxLayout()
for status, label in BEAMLINE_STATE_STATUS_LABELS.items():
checkbox = QCheckBox(label, self)
checkbox.setChecked(selected_statuses is None or status in selected_statuses)
self._checkboxes[status] = checkbox
list_layout.addWidget(checkbox)
list_layout.addStretch(1)
box = QGroupBox("Displayed status", self)
box.setLayout(list_layout)
buttons = QDialogButtonBox(
QDialogButtonBox.StandardButton.Ok | QDialogButtonBox.StandardButton.Cancel, parent=self
)
buttons.accepted.connect(self.accept)
buttons.rejected.connect(self.reject)
layout = QVBoxLayout(self)
layout.addLayout(controls)
layout.addWidget(box)
layout.addWidget(buttons)
self.setLayout(layout)
def selected_statuses(self) -> set[str] | None:
selected = {status for status, checkbox in self._checkboxes.items() if checkbox.isChecked()}
if selected == set(self._checkboxes):
return None
return selected
def _set_all(self, checked: bool) -> None:
for checkbox in self._checkboxes.values():
checkbox.setChecked(checked)
class DeviceFilterDialog(QDialog):
"""Dialog for filtering beamline states by configured device."""
def __init__(
self,
devices: list[str],
selected_devices: set[str] | None,
device_filter_text: str,
parent: QWidget | None = None,
) -> None:
super().__init__(parent=parent)
self.setWindowTitle("Filter Beamline State Devices")
self._checkboxes: dict[str, QCheckBox] = {}
self._device_text = QLineEdit(self)
self._device_text.setPlaceholderText("Device name or comma-separated names")
self._device_text.setText(device_filter_text)
list_layout = QVBoxLayout()
for device in devices:
checkbox = QCheckBox(device, self)
checkbox.setChecked(selected_devices is not None and device in selected_devices)
self._checkboxes[device] = checkbox
list_layout.addWidget(checkbox)
list_layout.addStretch(1)
box = QGroupBox("Known devices", self)
box.setLayout(list_layout)
buttons = QDialogButtonBox(
QDialogButtonBox.StandardButton.Ok | QDialogButtonBox.StandardButton.Cancel, parent=self
)
buttons.accepted.connect(self.accept)
buttons.rejected.connect(self.reject)
layout = QVBoxLayout(self)
layout.addWidget(self._device_text)
layout.addWidget(box)
layout.addWidget(buttons)
self.setLayout(layout)
def selected_devices(self) -> set[str] | None:
selected = {device for device, checkbox in self._checkboxes.items() if checkbox.isChecked()}
return selected or None
def filter_text(self) -> str:
return self._device_text.text().strip()
@@ -0,0 +1,17 @@
def main(): # pragma: no cover
from qtpy import PYSIDE6
if not PYSIDE6:
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from bec_widgets.widgets.services.beamline_states.beamline_state_manager_plugin import (
BeamlineStateManagerPlugin,
)
QPyDesignerCustomWidgetCollection.addCustomWidget(BeamlineStateManagerPlugin())
if __name__ == "__main__": # pragma: no cover
main()
+1
View File
@@ -24,6 +24,7 @@ dependencies = [
"pydantic~=2.0",
"pylsp-bec~=1.2",
"pyqtgraph==0.13.7",
"python-slugify~=8.0",
"qtconsole~=5.5, >=5.5.1", # needed for jupyter console
"qtmonaco~=0.8, >=0.8.1",
"qtpy~=2.4",
@@ -0,0 +1,73 @@
from __future__ import annotations
import uuid
import pytest
from bec_lib.bl_states import DeviceWithinLimitsStateConfig
from bec_widgets.widgets.services.beamline_states.beamline_state_pill import BeamlineStateManager
# pylint: disable=protected-access
def _delete_state_if_present(bec, state_name: str) -> None:
if hasattr(bec.beamline_states, state_name):
bec.beamline_states.delete(state_name)
@pytest.mark.timeout(100)
def test_beamline_state_manager_adds_updates_and_deletes_state_e2e(qtbot, bec_client_lib):
"""
Verify the real BEC beamline-state flow is reflected by BeamlineStateManager.
This test requires the e2e BEC servers and is intended to be run with
``--start-servers``.
"""
bec = bec_client_lib
dev = bec.device_manager.devices
scans = bec.scans
state_name = f"samx_widget_limits_{uuid.uuid4().hex[:8]}"
config = DeviceWithinLimitsStateConfig(
name=state_name, device="samx", signal="samx", low_limit=0.0, high_limit=10.0, tolerance=1.0
)
manager = BeamlineStateManager(client=bec)
qtbot.addWidget(manager)
manager.show()
qtbot.waitExposed(manager)
_delete_state_if_present(bec, state_name)
try:
bec.beamline_states.add(config)
qtbot.waitUntil(lambda: hasattr(bec.beamline_states, state_name), timeout=10000)
qtbot.waitUntil(lambda: state_name in manager._state_pills, timeout=10000)
pill = manager._state_pills[state_name]
assert pill.state_name == state_name
scans.umv(dev.samx, 5, relative=False).wait()
qtbot.waitUntil(
lambda: getattr(bec.beamline_states, state_name).get()["status"] == "valid",
timeout=10000,
)
qtbot.waitUntil(lambda: pill._status == "valid", timeout=10000)
assert pill._status_label.text() == "VALID"
assert pill._detail_label.text() == "Device samx within limits"
scans.umv(dev.samx, 20, relative=False).wait()
qtbot.waitUntil(
lambda: getattr(bec.beamline_states, state_name).get()["status"] == "invalid",
timeout=10000,
)
qtbot.waitUntil(lambda: pill._status == "invalid", timeout=10000)
assert pill._status_label.text() == "INVALID"
assert pill._detail_label.text() == "Device samx out of limits"
bec.beamline_states.delete(state_name)
qtbot.waitUntil(lambda: not hasattr(bec.beamline_states, state_name), timeout=10000)
qtbot.waitUntil(lambda: state_name not in manager._state_pills, timeout=10000)
finally:
_delete_state_if_present(bec, state_name)
scans.umv(dev.samx, 0, relative=False).wait()
+1 -1
View File
@@ -69,7 +69,7 @@ def create_widget(
return widget
@pytest.mark.timeout(20)
@pytest.mark.timeout(100)
def test_available_widgets(qtbot, connected_client_gui_obj):
"""This test checks that all widgets that are available via gui.available_widgets can be created and removed."""
gui = connected_client_gui_obj
@@ -0,0 +1,702 @@
import shiboken6
from bec_lib import bl_states
from qtpy.QtCore import QCoreApplication, QEvent, Qt
from qtpy.QtWidgets import QMessageBox, QStyleOptionViewItem
from bec_widgets.utils.toolbars.toolbar import ModularToolBar
from bec_widgets.utils.widget_io import WidgetIO
from bec_widgets.widgets.services.beamline_states import beamline_state_pill as pill_module
from bec_widgets.widgets.services.beamline_states.beamline_state_pill import (
BeamlineStateManager,
BeamlineStatePill,
)
from bec_widgets.widgets.services.beamline_states.dialogs import AddBeamlineStateDialog
from .client_mocks import mocked_client
from .conftest import create_widget
def test_beamline_state_pill_updates_from_message(qtbot, mocked_client):
pill = create_widget(
qtbot, BeamlineStatePill, state_name="shutter_open", title="Shutter", client=mocked_client
)
pill.update_state({"name": "shutter_open", "status": "valid", "label": "Shutter is open."}, {})
assert pill._state_name == "shutter_open"
assert pill._name_label.text() == "Shutter"
assert pill._status_label.text() == "VALID"
assert pill._detail_label.text() == "Shutter is open."
assert not pill._icon_label.pixmap().isNull()
assert pill.toolTip() == "Shutter is open."
def test_beamline_state_pill_ignores_other_states(qtbot, mocked_client):
pill = create_widget(
qtbot, BeamlineStatePill, state_name="shutter_open", title="Shutter", client=mocked_client
)
pill.update_state(
{"name": "other_state", "status": "invalid", "label": "Should be ignored."}, {}
)
assert pill._status_label.text() == "UNKNOWN"
assert pill.toolTip() == "No state information available."
def test_beamline_state_pill_expands_and_emits_updated_limits(qtbot, mocked_client):
limits_pill = create_widget(
qtbot, BeamlineStatePill, state_name="limits", title="Limits", client=mocked_client
)
limits_pill.set_state_config(
{
"name": "limits",
"title": "Limits",
"state_type": "DeviceWithinLimitsState",
"parameters": {
"name": "limits",
"title": "Limits",
"device": "samx",
"signal": "samx",
"low_limit": 0.0,
"high_limit": 10.0,
"tolerance": 0.1,
},
}
)
assert limits_pill._settings.isHidden()
assert limits_pill._config_form is None
assert not limits_pill._update_button.isEnabled()
assert not limits_pill._revert_button.isEnabled()
qtbot.mouseClick(limits_pill._header, Qt.MouseButton.LeftButton)
assert limits_pill._config_form is not None
high_limit = limits_pill._config_form.input_widget("high_limit")
high_limit.setValue(20.0)
assert not limits_pill._settings.isHidden()
assert limits_pill._update_button.isEnabled()
assert limits_pill._revert_button.isEnabled()
assert (
limits_pill._config_form.field_widget("high_limit").property("beamlineStateDirty") is True
)
assert limits_pill._config_form.get_data()["device"] == "samx"
assert limits_pill.edited_config().high_limit == 20.0
with qtbot.waitSignal(limits_pill.update_requested) as signal:
limits_pill._update_button.click()
assert signal.args[0] == "limits"
assert isinstance(signal.args[1], bl_states.DeviceWithinLimitsState.CONFIG_CLASS)
assert signal.args[1].device == "samx"
assert signal.args[1].signal == "samx"
assert signal.args[1].low_limit == 0.0
assert signal.args[1].high_limit == 20.0
assert signal.args[1].tolerance == 0.1
assert not limits_pill._settings.isHidden()
def test_beamline_state_pill_first_expand_uses_config_class_without_rebuild(
qtbot, mocked_client, monkeypatch
):
limits_pill = create_widget(
qtbot, BeamlineStatePill, state_name="limits", title="Limits", client=mocked_client
)
set_model_calls = []
original_set_model = pill_module.PydanticWidgetForm.set_model
def set_model_spy(self, model, data=None):
set_model_calls.append(model)
return original_set_model(self, model, data=data)
monkeypatch.setattr(pill_module.PydanticWidgetForm, "set_model", set_model_spy)
limits_pill.set_state_config(
{
"name": "limits",
"title": "Limits",
"state_type": "DeviceWithinLimitsState",
"parameters": {
"device": "samx",
"signal": "samx",
"low_limit": 0.0,
"high_limit": 10.0,
"tolerance": 0.1,
},
}
)
limits_pill.set_expanded(True)
assert limits_pill._config_form is not None
assert set_model_calls == []
def test_beamline_state_pill_reverts_changed_settings(qtbot, mocked_client):
limits_pill = create_widget(
qtbot, BeamlineStatePill, state_name="limits", title="Limits", client=mocked_client
)
limits_pill.set_state_config(
{
"name": "limits",
"title": "Limits",
"state_type": "DeviceWithinLimitsState",
"parameters": {
"device": "samx",
"signal": "samx",
"low_limit": 0.0,
"high_limit": 10.0,
"tolerance": 0.1,
},
}
)
limits_pill.set_expanded(True)
assert limits_pill._config_form is not None
low_limit = limits_pill._config_form.input_widget("low_limit")
low_limit.setValue(-5.0)
assert limits_pill._update_button.isEnabled()
assert limits_pill._config_form.field_widget("low_limit").property("beamlineStateDirty") is True
limits_pill._revert_button.click()
assert low_limit.value() == 0.0
assert not limits_pill._update_button.isEnabled()
assert not limits_pill._revert_button.isEnabled()
assert (
limits_pill._config_form.field_widget("low_limit").property("beamlineStateDirty") is False
)
def test_beamline_state_pill_does_not_override_themed_input_controls(qtbot, mocked_client):
limits_pill = create_widget(
qtbot, BeamlineStatePill, state_name="limits", title="Limits", client=mocked_client
)
limits_pill.set_state_config(
{
"name": "limits",
"title": "Limits",
"state_type": "DeviceWithinLimitsState",
"parameters": {
"device": "samx",
"signal": "samx",
"low_limit": 0.0,
"high_limit": 10.0,
"tolerance": 0.1,
},
}
)
limits_pill.set_expanded(True)
stylesheet = limits_pill.styleSheet()
assert "QAbstractSpinBox" not in stylesheet
assert "QComboBox" not in stylesheet
assert "QCheckBox::indicator" not in stylesheet
def test_beamline_state_manager_adds_and_removes_pills(qtbot, mocked_client):
beamline_state_manager = create_widget(qtbot, BeamlineStateManager, client=mocked_client)
beamline_state_manager.update_available_states(
{
"states": [
{
"name": "shutter_open",
"title": "Shutter",
"state_type": "ShutterState",
"parameters": {},
},
{
"name": "limits",
"title": "Limits",
"state_type": "DeviceWithinLimitsState",
"parameters": {},
},
]
},
{},
)
assert sorted(beamline_state_manager._state_pills) == ["limits", "shutter_open"]
assert beamline_state_manager._model.rowCount() == 2
assert beamline_state_manager._state_pills["shutter_open"]._name_label.text() == "Shutter"
assert not beamline_state_manager._empty_label.isVisible()
beamline_state_manager.update_available_states(
{
"states": [
{
"name": "limits",
"title": "Limits",
"state_type": "DeviceWithinLimitsState",
"parameters": {},
}
]
},
{},
)
assert sorted(beamline_state_manager._state_pills) == ["limits"]
assert beamline_state_manager._model.rowCount() == 1
def test_beamline_state_manager_ignores_unchanged_available_states(qtbot, mocked_client):
beamline_state_manager = create_widget(qtbot, BeamlineStateManager, client=mocked_client)
content = {
"states": [
{
"name": "limits",
"title": "Limits",
"state_type": "DeviceWithinLimitsState",
"parameters": {
"device": "samx",
"signal": "samx",
"low_limit": 0.0,
"high_limit": 10.0,
"tolerance": 0.1,
},
}
]
}
beamline_state_manager.update_available_states(content, {})
pill = beamline_state_manager._state_pills["limits"]
beamline_state_manager.update_available_states(content, {})
assert beamline_state_manager._state_pills["limits"] is pill
assert pill._config_form is None
def test_beamline_state_manager_adds_state_without_recreating_existing_pills(qtbot, mocked_client):
beamline_state_manager = create_widget(qtbot, BeamlineStateManager, client=mocked_client)
limits_state = {
"name": "limits",
"title": "Limits",
"state_type": "DeviceWithinLimitsState",
"parameters": {
"device": "samx",
"signal": "samx",
"low_limit": 0.0,
"high_limit": 10.0,
"tolerance": 0.1,
},
}
shutter_state = {
"name": "shutter_open",
"title": "Shutter",
"state_type": "ShutterState",
"parameters": {},
}
beamline_state_manager.update_available_states({"states": [limits_state]}, {})
pill = beamline_state_manager._state_pills["limits"]
pill.set_expanded(True)
config_form = pill._config_form
beamline_state_manager.update_available_states({"states": [limits_state, shutter_state]}, {})
assert beamline_state_manager._state_pills["limits"] is pill
assert pill._config_form is config_form
assert pill.is_expanded()
assert sorted(beamline_state_manager._state_pills) == ["limits", "shutter_open"]
def test_beamline_state_manager_uses_valid_horizontal_size_hints(qtbot, mocked_client):
beamline_state_manager = create_widget(qtbot, BeamlineStateManager, client=mocked_client)
beamline_state_manager.update_available_states(
{
"states": [
{
"name": "limits",
"title": "Limits",
"state_type": "DeviceWithinLimitsState",
"parameters": {
"device": "samx",
"signal": "samx",
"low_limit": 0.0,
"high_limit": 10.0,
"tolerance": 0.1,
},
}
]
},
{},
)
index = beamline_state_manager._model.index_for_name("limits")
hint = beamline_state_manager._delegate.sizeHint(QStyleOptionViewItem(), index)
assert beamline_state_manager.minimumWidth() == 0
assert beamline_state_manager._view.minimumWidth() == 0
assert beamline_state_manager._view.horizontalScrollBarPolicy() == Qt.ScrollBarAlwaysOff
assert hint.width() > 0
assert hint.height() >= 58
def test_beamline_state_manager_header_click_expands_pill_once(qtbot, mocked_client):
beamline_state_manager = create_widget(qtbot, BeamlineStateManager, client=mocked_client)
beamline_state_manager.update_available_states(
{
"states": [
{
"name": "limits",
"title": "Limits",
"state_type": "DeviceWithinLimitsState",
"parameters": {"device": "samx"},
}
]
},
{},
)
pill = beamline_state_manager._state_pills["limits"]
assert pill._settings.isHidden()
qtbot.mouseClick(pill._header, Qt.MouseButton.LeftButton)
assert not pill._settings.isHidden()
def test_beamline_state_manager_preserves_expanded_pill_on_refresh(qtbot, mocked_client):
beamline_state_manager = create_widget(qtbot, BeamlineStateManager, client=mocked_client)
state = {
"name": "limits",
"title": "Limits",
"state_type": "DeviceWithinLimitsState",
"parameters": {"device": "samx", "high_limit": 10.0},
}
beamline_state_manager.update_available_states({"states": [state]}, {})
beamline_state_manager._state_pills["limits"].set_expanded(True)
beamline_state_manager.update_available_states({"states": [state]}, {})
assert beamline_state_manager._state_pills["limits"].is_expanded()
assert not beamline_state_manager._state_pills["limits"]._settings.isHidden()
def test_beamline_state_manager_propagates_idle_card_background(qtbot, mocked_client):
idle_card_manager = create_widget(
qtbot, BeamlineStateManager, client=mocked_client, idle_card_background=True
)
idle_card_manager.update_available_states(
{
"states": [
{
"name": "limits",
"title": "Limits",
"state_type": "DeviceWithinLimitsState",
"parameters": {"device": "samx"},
}
]
},
{},
)
assert idle_card_manager._state_pills["limits"]._idle_card_background is True
idle_card_manager.idle_card_background = False
assert idle_card_manager._state_pills["limits"]._idle_card_background is False
def test_beamline_state_manager_filters_status(qtbot, mocked_client):
beamline_state_manager = create_widget(qtbot, BeamlineStateManager, client=mocked_client)
beamline_state_manager.update_available_states(
{
"states": [
{
"name": "shutter_open",
"title": "Shutter",
"state_type": "ShutterState",
"parameters": {"device": "samy"},
},
{
"name": "limits",
"title": "Limits",
"state_type": "DeviceWithinLimitsState",
"parameters": {"device": "samx"},
},
]
},
{},
)
assert isinstance(beamline_state_manager._toolbar, ModularToolBar)
assert not beamline_state_manager._toolbar.components.exists("refresh")
beamline_state_manager._state_pills["limits"].update_state(
{"name": "limits", "status": "valid", "label": "Within limits."}, {}
)
beamline_state_manager._state_pills["shutter_open"].update_state(
{"name": "shutter_open", "status": "invalid", "label": "Closed."}, {}
)
beamline_state_manager._selected_statuses = {"valid"}
beamline_state_manager._apply_filters()
assert not beamline_state_manager._hidden_summary.isHidden()
assert "1 state is hidden" in beamline_state_manager._hidden_summary.text()
assert not beamline_state_manager._view.isRowHidden(
beamline_state_manager._model.index_for_name("limits").row()
)
assert beamline_state_manager._view.isRowHidden(
beamline_state_manager._model.index_for_name("shutter_open").row()
)
beamline_state_manager._hidden_summary.click()
assert not beamline_state_manager._view.isRowHidden(
beamline_state_manager._model.index_for_name("shutter_open").row()
)
assert shiboken6.isValid(beamline_state_manager._state_pills["shutter_open"])
beamline_state_manager._hidden_summary.click()
assert beamline_state_manager._view.isRowHidden(
beamline_state_manager._model.index_for_name("shutter_open").row()
)
assert shiboken6.isValid(beamline_state_manager._state_pills["shutter_open"])
def test_beamline_state_manager_status_filter_reacts_to_state_changes(qtbot, mocked_client):
beamline_state_manager = create_widget(qtbot, BeamlineStateManager, client=mocked_client)
beamline_state_manager.update_available_states(
{
"states": [
{
"name": "limits",
"title": "Limits",
"state_type": "DeviceWithinLimitsState",
"parameters": {"device": "samx"},
}
]
},
{},
)
beamline_state_manager._selected_statuses = {"valid"}
beamline_state_manager._state_pills["limits"].update_state(
{"name": "limits", "status": "valid", "label": "Within limits."}, {}
)
assert beamline_state_manager._hidden_summary.isHidden()
beamline_state_manager._state_pills["limits"].update_state(
{"name": "limits", "status": "invalid", "label": "Out of limits."}, {}
)
assert not beamline_state_manager._hidden_summary.isHidden()
assert beamline_state_manager._view.isRowHidden(
beamline_state_manager._model.index_for_name("limits").row()
)
def test_beamline_state_manager_filters_devices(qtbot, mocked_client, monkeypatch):
beamline_state_manager = create_widget(qtbot, BeamlineStateManager, client=mocked_client)
beamline_state_manager.update_available_states(
{
"states": [
{
"name": "samx_limits",
"title": "samx",
"state_type": "DeviceWithinLimitsState",
"parameters": {"device": "samx"},
},
{
"name": "samy_limits",
"title": "samy",
"state_type": "DeviceWithinLimitsState",
"parameters": {"device": "samy"},
},
]
},
{},
)
beamline_state_manager._device_filter_text = "samx"
beamline_state_manager._apply_filters()
assert not beamline_state_manager._hidden_summary.isHidden()
assert "1 state is hidden" in beamline_state_manager._hidden_summary.text()
captured = {}
class FakeDeviceFilterDialog:
def __init__(self, devices, selected_devices, device_filter_text, parent):
captured["devices"] = devices
captured["selected_devices"] = selected_devices
captured["device_filter_text"] = device_filter_text
captured["parent"] = parent
def exec(self):
return 0
monkeypatch.setattr(pill_module, "DeviceFilterDialog", FakeDeviceFilterDialog)
beamline_state_manager.open_device_filter_dialog()
assert captured["devices"] == ["samx", "samy"]
assert captured["device_filter_text"] == "samx"
assert captured["parent"] is beamline_state_manager
def test_beamline_state_manager_updates_state_parameters(qtbot, mocked_client):
beamline_state_manager = create_widget(qtbot, BeamlineStateManager, client=mocked_client)
beamline_state_manager.update_available_states(
{
"states": [
{
"name": "limits",
"title": "Limits",
"state_type": "DeviceWithinLimitsState",
"parameters": {
"device": "samx",
"signal": "samx",
"low_limit": 0.0,
"high_limit": 10.0,
"tolerance": 0.1,
},
}
]
},
{},
)
class StateClient:
def __init__(self):
self.parameters = None
def update_parameters(self, **kwargs):
self.parameters = kwargs
class StateManager:
def __init__(self):
self.limits = StateClient()
mocked_client.beamline_states = StateManager()
pill = beamline_state_manager._state_pills["limits"]
pill.set_expanded(True)
high_limit = pill._config_form.input_widget("high_limit")
high_limit.setValue(20.0)
assert pill._update_button.isEnabled()
beamline_state_manager._update_state_parameters("limits", pill.edited_config())
assert mocked_client.beamline_states.limits.parameters == {
"device": "samx",
"signal": "samx",
"low_limit": 0.0,
"high_limit": 20.0,
"tolerance": 0.1,
}
assert not pill._update_button.isEnabled()
assert pill._config_form.field_widget("high_limit").property("beamlineStateDirty") is False
def test_beamline_state_manager_removes_state(qtbot, mocked_client, monkeypatch):
beamline_state_manager = create_widget(qtbot, BeamlineStateManager, client=mocked_client)
class StateManager:
def __init__(self):
self.deleted = None
def delete(self, state_name):
self.deleted = state_name
mocked_client.beamline_states = StateManager()
monkeypatch.setattr(
QMessageBox, "question", lambda *args, **kwargs: QMessageBox.StandardButton.Yes
)
beamline_state_manager._remove_state_requested("limits")
assert mocked_client.beamline_states.deleted == "limits"
def test_add_beamline_state_dialog_uses_generated_widgets_and_normalizes_name(qtbot, mocked_client):
add_state_dialog = create_widget(qtbot, AddBeamlineStateDialog, client=mocked_client)
limits_index = add_state_dialog._type_combo.findText(bl_states.DeviceWithinLimitsState.__name__)
assert limits_index >= 0
add_state_dialog._type_combo.setCurrentIndex(limits_index)
assert add_state_dialog._config_form.model is bl_states.DeviceWithinLimitsState.CONFIG_CLASS
name = add_state_dialog._config_form.input_widget("name")
device = add_state_dialog._config_form.input_widget("device")
signal = add_state_dialog._config_form.input_widget("signal")
low_limit = add_state_dialog._config_form.field_widget("low_limit")
high_limit = add_state_dialog._config_form.field_widget("high_limit")
name.setText("samx-limits")
WidgetIO.set_value(device, "samx")
WidgetIO.set_value(signal, "samx")
low_limit.checkbox.setChecked(True)
high_limit.checkbox.setChecked(True)
high_limit.value_widget.setValue(15.0)
config = add_state_dialog.config()
assert config.name == "samx_limits"
assert config.device == "samx"
assert config.signal == "samx"
assert config.low_limit == 0.0
assert config.high_limit == 15.0
def test_add_beamline_state_dialog_generates_name_only_after_valid_device_selection(
qtbot, mocked_client
):
add_state_dialog = create_widget(qtbot, AddBeamlineStateDialog, client=mocked_client)
name = add_state_dialog._config_form.input_widget("name")
device = add_state_dialog._config_form.input_widget("device")
device.setCurrentText("s")
assert name.text() == ""
device.set_device("samx")
assert name.text() == "samx_device_within_limits_state"
def test_add_beamline_state_dialog_switches_state_type_without_collapsing(qtbot, mocked_client):
add_state_dialog = create_widget(qtbot, AddBeamlineStateDialog, client=mocked_client)
initial_height = add_state_dialog.height()
limits_index = add_state_dialog._type_combo.findText("DeviceWithinLimitsState")
assert limits_index >= 0
shutter_index = add_state_dialog._type_combo.findText("ShutterState")
assert shutter_index >= 0
add_state_dialog._type_combo.setCurrentIndex(shutter_index)
qtbot.wait(0)
assert add_state_dialog._config_form.model is bl_states.DeviceStateConfig
assert add_state_dialog._config_form_host.count() == 1
assert not add_state_dialog._config_form.isHidden()
assert not add_state_dialog._buttons.isHidden()
assert add_state_dialog.sizeHint().height() > add_state_dialog._buttons.sizeHint().height()
assert add_state_dialog.minimumWidth() == 280
assert add_state_dialog.maximumWidth() > add_state_dialog.minimumWidth()
assert add_state_dialog.minimumHeight() == add_state_dialog.maximumHeight()
add_state_dialog._type_combo.setCurrentIndex(limits_index)
qtbot.wait(0)
assert add_state_dialog._config_form.model is bl_states.DeviceWithinLimitsState.CONFIG_CLASS
assert add_state_dialog.height() >= initial_height
assert add_state_dialog.minimumHeight() == add_state_dialog.maximumHeight()
def test_add_beamline_state_dialog_cleanup_deletes_device_widgets(qtbot, mocked_client):
add_state_dialog = create_widget(qtbot, AddBeamlineStateDialog, client=mocked_client)
device = add_state_dialog._config_form.input_widget("device")
signal = add_state_dialog._config_form.input_widget("signal")
add_state_dialog.reject()
assert shiboken6.isValid(device)
assert shiboken6.isValid(signal)
add_state_dialog.cleanup()
QCoreApplication.sendPostedEvents(None, QEvent.Type.DeferredDelete)
assert not shiboken6.isValid(device)
assert not shiboken6.isValid(signal)
+23 -6
View File
@@ -2,11 +2,11 @@ import pyqtgraph as pg
import pytest
from pydantic import ValidationError
from qtpy.QtGui import QColor
from qtpy.QtWidgets import QVBoxLayout, QWidget
from qtpy.QtWidgets import QApplication, QVBoxLayout, QWidget
from bec_widgets.utils.bec_connector import ConnectionConfig
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.colors import Colors, apply_theme
from bec_widgets.utils.colors import Colors, apply_theme, get_theme_name, rgba, theme_color
from bec_widgets.widgets.plots.waveform.curve import CurveConfig
from tests.unit_tests.client_mocks import mocked_client
from tests.unit_tests.conftest import create_widget
@@ -76,10 +76,27 @@ def test_hex_to_rgba():
Colors.hex_to_rgba("#FF573")
def test_rgba_to_hex():
assert Colors.rgba_to_hex(255, 87, 51, 255) == "#FF5733FF"
assert Colors.rgba_to_hex(255, 87, 51, 128) == "#FF573380"
assert Colors.rgba_to_hex(255, 87, 51) == "#FF5733FF"
def test_get_theme_name_uses_application_theme():
app = QApplication.instance()
assert app.theme.theme == "light"
assert get_theme_name() == "light"
def test_theme_color_uses_theme_color_method():
app = QApplication.instance()
fallback = QColor("#ffffff")
expected = app.theme.color("FG", fallback.name())
expected = expected if isinstance(expected, QColor) else QColor(str(expected))
assert theme_color(app.theme, "FG", fallback).name() == expected.name()
def test_theme_color_returns_fallback_without_theme_color_method():
assert theme_color("light", "FG", QColor("#ffffff")).name() == "#ffffff"
def test_qss_rgba_and_blend_helpers():
assert rgba(QColor("#010203"), 300) == "rgba(1, 2, 3, 255)"
def test_canonical_colormap_name_case_insensitive():
@@ -120,7 +120,7 @@ def test_device_input_combobox_disabled_invalid_has_neutral_border(device_input_
assert "red" in device_input_combobox.styleSheet()
device_input_combobox.setEnabled(False)
assert "transparent" in device_input_combobox.styleSheet()
assert device_input_combobox.styleSheet() == ""
device_input_combobox.setEnabled(True)
assert "red" in device_input_combobox.styleSheet()
+17 -1
View File
@@ -96,6 +96,22 @@ def test_signal_combobox_autocomplete(qtbot, mocked_client):
assert text_changes[-1] == "manual_signal"
def test_signal_combobox_group_headers_are_disabled(qtbot, mocked_client):
widget = create_widget(qtbot=qtbot, widget=SignalComboBox, client=mocked_client)
widget.set_device("samx")
assert widget.itemText(0) == "Hinted Signals"
assert widget.itemText(2) == "Normal Signals"
assert widget.itemText(4) == "Config Signals"
assert widget.model().item(0).isEnabled() is False
assert widget.model().item(2).isEnabled() is False
assert widget.model().item(4).isEnabled() is False
assert widget.set_to_first_enabled() is True
assert widget.currentText() == "samx (readback)"
def test_signal_combobox_qproperties(device_signal_combobox):
device_signal_combobox.include_config_signals = False
device_signal_combobox.include_normal_signals = False
@@ -116,7 +132,7 @@ def test_signal_combobox_disabled_invalid_has_neutral_border(device_signal_combo
assert "red" in device_signal_combobox.styleSheet()
device_signal_combobox.setEnabled(False)
assert "transparent" in device_signal_combobox.styleSheet()
assert device_signal_combobox.styleSheet() == ""
device_signal_combobox.setEnabled(True)
assert "red" in device_signal_combobox.styleSheet()
+10 -1
View File
@@ -869,7 +869,14 @@ class TestToolbarFunctionality:
def test_toolbar_utils_actions(self, advanced_dock_area):
"""Test utils toolbar actions trigger widget creation."""
utils_actions = ["queue", "terminal", "status", "progress_bar", "sbb_monitor"]
utils_actions = [
"queue",
"terminal",
"status",
"progress_bar",
"sbb_monitor",
"beamline_state_manager",
]
for action_name in utils_actions:
with patch.object(advanced_dock_area, "new") as mock_new:
@@ -2428,6 +2435,7 @@ class TestFlatToolbarActions:
"flat_terminal",
"flat_bec_shell",
"flat_sbb_monitor",
"flat_beamline_state_manager",
]
for action_name in utils_actions:
@@ -2472,6 +2480,7 @@ class TestFlatToolbarActions:
"flat_terminal": "BecConsole",
"flat_bec_shell": "BECShell",
"flat_sbb_monitor": "SBBMonitor",
"flat_beamline_state_manager": "BeamlineStateManager",
}
for action_name, widget_type in utils_action_mapping.items():
@@ -1,10 +1,24 @@
from decimal import Decimal
from unittest.mock import patch
import pytest
from bec_lib.device import Device, Signal
from bec_lib.scan_args import ScanArgument
from pydantic import BaseModel, Field
from qtpy.QtWidgets import QCheckBox, QLabel, QLineEdit
from bec_widgets.utils.forms_from_types.forms import PydanticModelForm, TypedForm
from bec_widgets.utils.forms_from_types.items import FloatDecimalFormItem, IntFormItem, StrFormItem
from bec_widgets.utils.forms_from_types.pydantic_widget_form import (
OptionalValueWidget,
PydanticWidgetForm,
)
from bec_widgets.utils.widget_io import WidgetIO
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
from bec_widgets.widgets.control.device_input.signal_combobox.signal_combobox import SignalComboBox
from bec_widgets.widgets.utility.spinbox.decimal_spinbox import BECSpinBox
from .client_mocks import mocked_client
# pylint: disable=no-member
# pylint: disable=missing-function-docstring
@@ -26,6 +40,80 @@ class ExampleSchema(BaseModel):
decimal_dp_limits_nodefault: Decimal = Field(decimal_places=2, gt=1, le=34.5)
class GeneratedBeamlineSchema(BaseModel):
name: str = Field(title="State name", description="Unique state identifier.")
title: str | None = Field(default=None, title="Display title", description="Visible title.")
device: Device | str = Field(title="Device", description="BEC device.")
signal: Signal | str | None = Field(
default=None, title="Signal", description="Optional device signal."
)
limit: float | None = Field(
default=None,
title="Limit",
description="Optional numeric limit.",
json_schema_extra={"precision": 6},
)
tolerance: float = Field(
default=0.1,
title="Tolerance",
description="Warning tolerance.",
json_schema_extra={"precision": 6},
)
model_config = {"arbitrary_types_allowed": True}
class GeneratedPlainSchema(BaseModel):
sample_name: str
class GeneratedDeviceOnlySchema(BaseModel):
device: Device | str = Field(default="", title="Device")
model_config = {"arbitrary_types_allowed": True}
class GeneratedSignalOnlySchema(BaseModel):
signal: Signal | str | None = Field(default=None, title="Signal")
model_config = {"arbitrary_types_allowed": True}
class GeneratedScanArgumentSchema(BaseModel):
device: Device | str = Field(
default="", **ScanArgument(display_name="Device", description="Device source.").model_dump()
)
signal: Signal | str | None = Field(
default=None,
**ScanArgument(display_name="Signal", description="Signal source.").model_dump(),
)
low_limit: float | None = Field(
default=None,
**ScanArgument(
display_name="Low limit",
description="Optional lower bound.",
reference_units="device",
precision=4,
ge=-5,
le=5,
).model_dump(),
)
exposure: float = Field(
default=0.1,
**ScanArgument(
display_name="Exposure", tooltip="Camera exposure.", units="s", precision=3, gt=0
).model_dump(),
)
model_config = {"arbitrary_types_allowed": True}
class GeneratedRequiredNumericAndOptionalBoolSchema(BaseModel):
enabled: bool | None = None
retry_count: int
scale: float
TEST_DICT = {
"sample_name": "test name",
"str_optional": "None",
@@ -74,3 +162,146 @@ def test_widget_set_data(model_widget: PydanticModelForm):
"decimal_dp_limits_nodefault",
]:
assert model_widget.widget_dict[key].getValue() == TEST_DICT[key]
def test_pydantic_widget_form_uses_field_metadata_and_type_widgets(qtbot, mocked_client):
form = PydanticWidgetForm(GeneratedBeamlineSchema, client=mocked_client)
qtbot.addWidget(form)
assert isinstance(form.input_widget("name"), QLineEdit)
assert isinstance(form.input_widget("device"), DeviceComboBox)
assert isinstance(form.input_widget("signal"), SignalComboBox)
assert isinstance(form.field_widget("limit"), OptionalValueWidget)
assert isinstance(form.input_widget("limit"), BECSpinBox)
assert form.input_widgets_by_type(DeviceComboBox) == [form.input_widget("device")]
assert form.input_widgets_by_type(SignalComboBox) == [form.input_widget("signal")]
label = form.layout().labelForField(form.field_widget("device"))
assert isinstance(label, QLabel)
assert label.text() == "Device"
assert label.toolTip() == "BEC device."
assert form.field_widget("limit").toolTip() == "Optional numeric limit."
def test_pydantic_widget_form_device_signal_variants(qtbot, mocked_client):
device_signal_form = PydanticWidgetForm(GeneratedBeamlineSchema, client=mocked_client)
device_only_form = PydanticWidgetForm(GeneratedDeviceOnlySchema, client=mocked_client)
signal_only_form = PydanticWidgetForm(GeneratedSignalOnlySchema, client=mocked_client)
qtbot.addWidget(device_signal_form)
qtbot.addWidget(device_only_form)
qtbot.addWidget(signal_only_form)
assert isinstance(device_signal_form.input_widget("device"), DeviceComboBox)
assert isinstance(device_signal_form.input_widget("signal"), SignalComboBox)
assert device_signal_form.input_widget("signal").require_device is True
assert isinstance(device_only_form.input_widget("device"), DeviceComboBox)
assert device_only_form.input_widgets_by_type(SignalComboBox) == []
assert isinstance(signal_only_form.input_widget("signal"), SignalComboBox)
assert signal_only_form.input_widget("signal").require_device is False
assert signal_only_form.input_widgets_by_type(DeviceComboBox) == []
def test_pydantic_widget_form_plain_field_has_generated_label_and_no_tooltip(qtbot):
form = PydanticWidgetForm(GeneratedPlainSchema)
qtbot.addWidget(form)
label = form.layout().labelForField(form.field_widget("sample_name"))
assert isinstance(label, QLabel)
assert label.text() == "Sample name"
assert label.toolTip() == ""
assert form.field_widget("sample_name").toolTip() == ""
def test_pydantic_widget_form_uses_scan_argument_metadata(qtbot, mocked_client):
form = PydanticWidgetForm(GeneratedScanArgumentSchema, client=mocked_client)
qtbot.addWidget(form)
low_limit = form.field_widget("low_limit")
low_limit_input = form.input_widget("low_limit")
exposure = form.input_widget("exposure")
low_limit_label = form.layout().labelForField(low_limit)
assert isinstance(low_limit_label, QLabel)
assert low_limit_label.text() == "Low limit"
assert low_limit.toolTip() == "Optional lower bound.\nUnits from: device"
assert low_limit_input.toolTip() == "Optional lower bound.\nUnits from: device"
assert low_limit_input.decimals() == 4
assert low_limit_input.minimum() == pytest.approx(-5)
assert low_limit_input.maximum() == pytest.approx(5)
assert form.field_widget("exposure").toolTip() == "Camera exposure.\nUnits: s"
assert exposure.toolTip() == "Camera exposure.\nUnits: s"
assert exposure.suffix() == " s"
assert exposure.decimals() == 3
assert exposure.minimum() == pytest.approx(0.001)
with patch.object(mocked_client.device_manager.devices.samx, "egu", return_value="mm"):
WidgetIO.set_value(form.input_widget("device"), "samx")
assert low_limit.toolTip() == "Optional lower bound.\nUnits: mm"
assert low_limit_input.toolTip() == "Optional lower bound.\nUnits: mm"
assert low_limit_input.suffix() == " mm"
def test_pydantic_widget_form_cleans_up_on_close(qtbot):
form = PydanticWidgetForm(GeneratedPlainSchema)
qtbot.addWidget(form)
form.close()
assert form.widgets == {}
assert form.layout().count() == 0
def test_pydantic_widget_form_round_trips_optional_numeric_and_dirty_state(qtbot, mocked_client):
form = PydanticWidgetForm(
GeneratedBeamlineSchema,
client=mocked_client,
data={"name": "state_1", "title": "State", "device": "samx", "signal": "samx"},
)
qtbot.addWidget(form)
assert form.get_data()["limit"] is None
limit = form.field_widget("limit")
limit.checkbox.setChecked(True)
form.input_widget("limit").setValue(5.0)
assert form.get_data()["limit"] == 5.0
assert form.model_instance().limit == 5.0
assert "limit" in form.dirty_fields()
form.reset_to_baseline()
assert form.get_data()["limit"] is None
assert form.dirty_fields() == set()
def test_pydantic_widget_form_initializes_required_numeric_fields(qtbot):
form = PydanticWidgetForm(GeneratedRequiredNumericAndOptionalBoolSchema)
qtbot.addWidget(form)
assert form.raw_data()["retry_count"] == 0
assert form.raw_data()["scale"] == 0.0
assert form.model_instance().retry_count == 0
assert form.model_instance().scale == 0.0
def test_pydantic_widget_form_preserves_optional_bool_none(qtbot):
form = PydanticWidgetForm(GeneratedRequiredNumericAndOptionalBoolSchema)
qtbot.addWidget(form)
enabled = form.field_widget("enabled")
assert isinstance(enabled, OptionalValueWidget)
assert isinstance(form.input_widget("enabled"), QCheckBox)
assert form.raw_data()["enabled"] is None
assert form.model_instance().enabled is None
enabled.checkbox.setChecked(True)
form.input_widget("enabled").setChecked(True)
assert form.raw_data()["enabled"] is True
assert form.model_instance().enabled is True
-5
View File
@@ -69,11 +69,6 @@ def test_display_app_id_connected(bec_main_window):
assert bec_main_window._app_id_label.text() == "App ID: gui_123"
def test_event_consumes_status_tip(bec_main_window):
status_tip_event = QEvent(QEvent.Type.StatusTip)
assert bec_main_window.event(status_tip_event) is True
def test_get_launcher_from_qapp_returns_none_when_absent(bec_main_window):
with patch.object(
QApplication, "instance", return_value=SimpleNamespace(topLevelWidgets=lambda: [])
+13
View File
@@ -0,0 +1,13 @@
from bec_widgets.utils.name_utils import pascal_to_snake, sanitize_namespace
def test_pascal_to_snake():
assert pascal_to_snake("DeviceWithinLimitsState") == "device_within_limits_state"
assert pascal_to_snake("BECStatusWidget") == "bec_status_widget"
def test_sanitize_namespace():
assert sanitize_namespace("scan 1 / user") == "scan_1_user"
assert sanitize_namespace(" beamline.state-1 ") == "beamline.state-1"
assert sanitize_namespace(" ") is None
assert sanitize_namespace(None) is None
+26
View File
@@ -40,11 +40,23 @@ def test_apply_theme_updates_colours(qtbot, toast):
"""apply_theme("light") should inject LIGHT palette colours into stylesheets."""
toast.apply_theme("light")
assert LIGHT_PALETTE["title"] in toast._title_lbl.styleSheet()
assert "border: 1px solid" in toast.trace_view.styleSheet()
assert "border:none" not in toast.trace_view.styleSheet()
toast.apply_theme("dark")
assert DARK_PALETTE["title"] in toast._title_lbl.styleSheet()
def test_toast_updates_from_qapp_theme_changed_signal(qtbot, toast):
app = QtWidgets.QApplication.instance()
assert hasattr(app, "theme")
app.theme.theme_changed.emit("light")
qtbot.wait(10)
assert LIGHT_PALETTE["title"] in toast._title_lbl.styleSheet()
def test_expired_signal(qtbot, toast):
"""Toast must emit expired once its lifetime finishes."""
with qtbot.waitSignal(toast.expired, timeout=1000):
@@ -251,6 +263,20 @@ def test_theme_propagation(qtbot, centre):
assert LIGHT_PALETTE["title"] in toast._title_lbl.styleSheet()
def test_centre_updates_from_qapp_theme_changed_signal(qtbot, centre):
toast = _post(centre, SeverityKind.INFO)
centre.apply_theme("dark")
app = QtWidgets.QApplication.instance()
assert hasattr(app, "theme")
app.theme.theme_changed.emit("light")
qtbot.wait(10)
assert centre._theme == "light"
assert LIGHT_PALETTE["title"] in toast._title_lbl.styleSheet()
# ------------------------------------------------------------------------
# NotificationIndicator tests
# ------------------------------------------------------------------------
@@ -0,0 +1,85 @@
import pytest
from qtpy.QtWidgets import QDoubleSpinBox, QSpinBox
from bec_widgets.utils.scan_arg_metadata import (
apply_numeric_limits,
apply_numeric_precision,
apply_unit_metadata,
device_units,
ui_config_from_metadata,
unit_tooltip,
)
from .conftest import create_widget
def test_unit_tooltip_and_cleanup(qtbot):
widget = create_widget(qtbot, QDoubleSpinBox)
item = {"tooltip": "Move start", "reference_units": "device"}
assert unit_tooltip(item) == "Move start\nUnits from: device"
apply_unit_metadata(widget, item)
assert widget.toolTip() == "Move start\nUnits from: device"
assert widget.suffix() == ""
apply_unit_metadata(widget, item, "mm")
assert widget.toolTip() == "Move start\nUnits: mm"
assert widget.suffix() == " mm"
apply_unit_metadata(widget, item, "deg")
assert widget.toolTip() == "Move start\nUnits: deg"
assert widget.suffix() == " deg"
def test_numeric_precision_and_limits(qtbot):
float_widget = create_widget(qtbot, QDoubleSpinBox)
int_widget = create_widget(qtbot, QSpinBox)
apply_numeric_precision(float_widget, {"name": "position", "precision": 3})
apply_numeric_limits(float_widget, {"ge": -1.5, "lt": 2.0})
apply_numeric_limits(int_widget, {"gt": 2, "le": 8})
assert float_widget.decimals() == 3
assert float_widget.minimum() == pytest.approx(-1.5)
assert float_widget.maximum() == pytest.approx(1.999)
assert int_widget.minimum() == 3
assert int_widget.maximum() == 8
def test_device_units_uses_egu():
class Device:
def egu(self):
return "mm"
assert device_units(Device()) == "mm"
assert device_units(object()) is None
def test_ui_config_from_metadata_matches_scan_control_item_shape():
item = ui_config_from_metadata(
name="exp_time",
input_type="float",
default=0.1,
metadata={"tooltip": "Exposure", "units": "s", "precision": 3, "ge": 0},
)
assert item == {
"arg": False,
"name": "exp_time",
"type": "float",
"display_name": "Exp Time",
"tooltip": "Exposure",
"default": 0.1,
"expert": False,
"hidden": False,
"precision": 3,
"units": "s",
"reference_units": None,
"reference_limits": None,
"gt": None,
"ge": 0,
"lt": None,
"le": None,
"alternative_group": None,
}
-38
View File
@@ -442,44 +442,6 @@ def test_scan_info_adapter_skips_duplicate_visible_kwargs():
}
def test_scan_info_adapter_supports_optional_annotated_types():
scan_info = {
"class": "OptionalScan",
"base_class": "ScanBaseV4",
"arg_input": {},
"arg_bundle_size": {"bundle": 0, "min": None, "max": None},
"gui_visibility": {"Matching": ["atol"]},
"signature": [
{
"arg": False,
"name": "atol",
"annotation": {
"Annotated": {
"type": ["float", "NoneType"],
"metadata": {
"ScanArgument": {
"display_name": "Tolerance",
"tooltip": "Optional tolerance used for position matching",
}
},
}
},
"default": None,
"kind": "KEYWORD_ONLY",
}
],
}
gui_config = ScanInfoAdapter().build_scan_ui_config(scan_info)
input_spec = gui_config["kwarg_groups"][0]["inputs"][0]
assert input_spec["name"] == "atol"
assert input_spec["type"] == "float"
assert input_spec["optional"] is True
assert input_spec["default"] is None
assert input_spec["display_name"] == "Tolerance"
def test_scan_info_adapter_rejects_unsupported_visible_inputs():
scan_info = {
"class": "UnsupportedScan",
@@ -1,7 +1,7 @@
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
from bec_widgets.utils.widget_io import WidgetIO
from bec_widgets.widgets.control.scan_control.scan_group_box import ScanGroupBox, ScanOptionalWidget
from bec_widgets.widgets.control.scan_control.scan_group_box import ScanGroupBox
def test_kwarg_box(qtbot):
@@ -235,41 +235,3 @@ def test_spinbox_limits_from_scan_info(qtbot):
assert settling_time.maximum() == 3.5
assert steps.minimum() == 1
assert steps.maximum() == 10
def test_optional_kwarg_widget_round_trips_none(qtbot):
group_input = {
"name": "Kwarg Test",
"inputs": [
{
"arg": False,
"name": "atol",
"type": "float",
"display_name": "Tolerance",
"tooltip": "Optional tolerance used for position matching",
"default": None,
"optional": True,
"expert": False,
}
],
}
kwarg_box = ScanGroupBox(box_type="kwargs", config=group_input)
assert isinstance(kwarg_box.widgets[0], ScanOptionalWidget)
assert kwarg_box.widgets[0].none_checkbox.text() == ""
assert kwarg_box.widgets[0].is_none() is True
assert kwarg_box.widgets[0].inner_widget.isEnabled() is False
assert kwarg_box.get_parameters() == {"atol": None}
kwarg_box.set_parameters({"atol": 1.25})
assert kwarg_box.widgets[0].is_none() is False
assert kwarg_box.widgets[0].inner_widget.isEnabled() is True
assert WidgetIO.get_value(kwarg_box.widgets[0].inner_widget) == 1.25
assert kwarg_box.get_parameters() == {"atol": 1.25}
kwarg_box.set_parameters({"atol": None})
assert kwarg_box.widgets[0].is_none() is True
assert kwarg_box.get_parameters() == {"atol": None}
+56
View File
@@ -16,8 +16,12 @@ from qtpy.QtWidgets import (
)
from bec_widgets.utils.widget_io import WidgetHierarchy, WidgetIO, WidgetTreeNode
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
from bec_widgets.widgets.control.device_input.signal_combobox.signal_combobox import SignalComboBox
from bec_widgets.widgets.utility.toggle.toggle import ToggleSwitch
from .client_mocks import mocked_client
@pytest.fixture(scope="function")
def example_widget(qtbot):
@@ -196,6 +200,58 @@ def test_widget_io_signal(qtbot, example_widget):
assert changes[-1][1] == False
def test_widget_io_device_combobox_handler(qtbot, mocked_client):
widget = DeviceComboBox(client=mocked_client)
qtbot.addWidget(widget)
changes = []
WidgetIO.connect_widget_change_signal(widget, lambda _widget, value: changes.append(value))
WidgetIO.set_value(widget, "samx")
assert WidgetIO.get_value(widget) == "samx"
assert changes[-1] == "samx"
def test_widget_io_device_combobox_handler_accepts_subclasses(qtbot, mocked_client):
class PromotedDeviceComboBox(DeviceComboBox):
pass
widget = PromotedDeviceComboBox(client=mocked_client)
qtbot.addWidget(widget)
WidgetIO.set_value(widget, "samx")
assert WidgetIO.get_value(widget) == "samx"
def test_widget_io_signal_combobox_handler(qtbot, mocked_client):
widget = SignalComboBox(client=mocked_client, require_device=True)
qtbot.addWidget(widget)
changes = []
widget.set_device("samx")
WidgetIO.connect_widget_change_signal(widget, lambda _widget, value: changes.append(value))
WidgetIO.set_value(widget, "samx")
assert WidgetIO.get_value(widget) == "samx"
widget.setCurrentText("")
widget.setCurrentText("samx")
assert changes[-1] == "samx"
def test_widget_io_signal_combobox_handler_accepts_subclasses(qtbot, mocked_client):
class PromotedSignalComboBox(SignalComboBox):
pass
widget = PromotedSignalComboBox(client=mocked_client, require_device=True)
qtbot.addWidget(widget)
widget.set_device("samx")
WidgetIO.set_value(widget, "samx")
assert WidgetIO.get_value(widget) == "samx"
def test_find_widgets(example_widget):
# Test find_widgets by class type
line_edits = WidgetIO.find_widgets(QLineEdit)