Compare commits

..

15 Commits

18 changed files with 1364 additions and 62 deletions
+22
View File
@@ -1,6 +1,28 @@
# CHANGELOG
## v3.12.0 (2026-05-20)
### Bug Fixes
- **scan-control**: Filter out private scans from allowed scans
([`2dc0227`](https://github.com/bec-project/bec_widgets/commit/2dc0227d38f0e217e252a5e5751bafd60363a5a4))
- **scan-control**: Hide hidden scan arguments
([`2d8e1ee`](https://github.com/bec-project/bec_widgets/commit/2d8e1eed4d6503c42a38c8de910ddaa54132405d))
- **scan-control**: Reject unsupported scan input types
([`3b579e7`](https://github.com/bec-project/bec_widgets/commit/3b579e740f36c60c3635681a9b2c35b518498f58))
- **scan-control**: Skip duplicate visible scan kwargs
([`b8740c9`](https://github.com/bec-project/bec_widgets/commit/b8740c95941d36102f07a51d74a50e6f262a6646))
### Features
- Add support for new scan signatures including units
([`d5bf10e`](https://github.com/bec-project/bec_widgets/commit/d5bf10e21682ae8270078c7858a036bafbabf10e))
## v3.11.1 (2026-05-18)
### Bug Fixes
+3 -3
View File
@@ -665,9 +665,9 @@ class LaunchWindow(BECMainWindow):
try:
parent = connection.parent()
if parent is None and connection.objectName() != self.objectName():
logger.info(
f"Found non-launcher connection without parent: {connection.objectName()}"
)
# logger.info(
# f"Found non-launcher connection without parent: {connection.objectName()}"
# ) #TODO disabled due to high count
return False
except Exception as e:
logger.error(f"Error getting parent of connection: {e}")
+18
View File
@@ -22,6 +22,7 @@ logger = bec_logger.logger
if TYPE_CHECKING: # pragma: no cover
from bec_lib.endpoints import EndpointInfo
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.rpc_server import RPCServer
@@ -42,6 +43,7 @@ class QtThreadSafeCallback(QObject):
self.cb_info = cb_info
self.cb = cb
self.cb_owner = louie.saferef.safe_ref(cb.__self__) if hasattr(cb, "__self__") else None
self.cb_ref = louie.saferef.safe_ref(cb)
self.cb_signal.connect(self.cb)
self.topics = set()
@@ -240,6 +242,22 @@ class BECDispatcher:
# pylint: disable=protected-access
self.disconnect_topics(self.client.connector._topics_cb)
def disconnect_owner(self, owner: BECWidget):
"""
Disconnect all slots owned by a particular widget.
Args:
owner(BECWidget): The owner widget whose slots should be disconnected
"""
slots_to_disconnect = []
for connected_slot in self._registered_slots.values():
if connected_slot.cb_owner is not None and connected_slot.cb_owner() == owner:
slots_to_disconnect.append(connected_slot)
for slot in slots_to_disconnect:
topics = slot.topics.copy()
for topic in topics:
self.disconnect_slot(slot.cb, topic)
def start_cli_server(self, gui_id: str | None = None):
"""
Start the CLI server.
+103
View File
@@ -1,5 +1,6 @@
from __future__ import annotations
import threading
from datetime import datetime
from typing import TYPE_CHECKING
@@ -325,19 +326,72 @@ class BECWidget(BECConnector):
return
dock.setFloating()
def _debug_bec_parent_chain(self) -> list[str]:
"""Return BECWidget ancestors for warning-level lifecycle diagnostics."""
chain: list[str] = []
parent = self.parent()
while parent is not None:
if not shiboken6.isValid(parent):
chain.append(f"<invalid parent py_id={id(parent)}>")
break
if isinstance(parent, BECWidget):
chain.append(
f"{parent.__class__.__name__}"
f"(object={parent.objectName()}, py_id={id(parent)}, "
f"destroyed={getattr(parent, '_destroyed', None)})"
)
parent = parent.parent() if hasattr(parent, "parent") else None
return chain
def cleanup(self):
"""Cleanup the widget."""
logger.warning(
"BEC WIDGET LIFECYCLE TRACE | "
f"event=cleanup:start | class={self.__class__.__name__} | "
f"object={self.objectName()} | py_id={id(self)} | "
f"thread={threading.current_thread().name}:{threading.get_ident()} | "
f"destroyed={getattr(self, '_destroyed', None)} | "
f"bec_parent_chain={self._debug_bec_parent_chain()}"
)
with RPCRegister.delayed_broadcast():
# All widgets need to call super().cleanup() in their cleanup method
logger.info(f"Registry cleanup for widget {self.__class__.__name__}")
self.rpc_register.remove_rpc(self)
children = self.findChildren(BECWidget)
logger.warning(
"BEC WIDGET LIFECYCLE TRACE | "
f"event=cleanup:children-found | class={self.__class__.__name__} | "
f"object={self.objectName()} | py_id={id(self)} | child_count={len(children)} | "
f"bec_parent_chain={self._debug_bec_parent_chain()}"
)
for child in children:
if not shiboken6.isValid(child):
# If the child is not valid, it means it has already been deleted
logger.warning(
"BEC WIDGET LIFECYCLE TRACE | "
f"event=cleanup:skip-invalid-child | parent={self.objectName()} | "
f"parent_py_id={id(self)} | child_py_id={id(child)} | "
f"parent_bec_parent_chain={self._debug_bec_parent_chain()}"
)
continue
logger.warning(
"BEC WIDGET LIFECYCLE TRACE | "
f"event=cleanup:closing-child | parent={self.objectName()} | "
f"parent_py_id={id(self)} | child_class={child.__class__.__name__} | "
f"child_object={child.objectName()} | child_py_id={id(child)} | "
f"child_destroyed={getattr(child, '_destroyed', None)} | "
f"parent_bec_parent_chain={self._debug_bec_parent_chain()} | "
f"child_bec_parent_chain={child._debug_bec_parent_chain()}"
)
child.close()
child.deleteLater()
logger.warning(
"BEC WIDGET LIFECYCLE TRACE | "
f"event=cleanup:child-deleteLater-called | parent={self.objectName()} | "
f"parent_py_id={id(self)} | child_class={child.__class__.__name__} | "
f"child_object={child.objectName()} | child_py_id={id(child)} | "
f"child_bec_parent_chain={child._debug_bec_parent_chain()}"
)
# Tear down busy overlay explicitly to stop spinner and remove filters
overlay = getattr(self, "_busy_overlay", None)
@@ -357,12 +411,61 @@ class BECWidget(BECConnector):
overlay.deleteLater()
except Exception as exc:
logger.warning(f"Failed to delete busy overlay: {exc}")
logger.warning(
"BEC WIDGET LIFECYCLE TRACE | "
f"event=cleanup:end | class={self.__class__.__name__} | "
f"object={self.objectName()} | py_id={id(self)} | "
f"destroyed={getattr(self, '_destroyed', None)} | "
f"bec_parent_chain={self._debug_bec_parent_chain()}"
)
def closeEvent(self, event):
"""Wrap the close even to ensure the rpc_register is cleaned up."""
logger.warning(
"BEC WIDGET LIFECYCLE TRACE | "
f"event=closeEvent:enter | class={self.__class__.__name__} | "
f"object={self.objectName()} | py_id={id(self)} | "
f"thread={threading.current_thread().name}:{threading.get_ident()} | "
f"destroyed={getattr(self, '_destroyed', None)} | "
f"bec_parent_chain={self._debug_bec_parent_chain()}"
)
try:
if not self._destroyed:
self.bec_dispatcher.disconnect_owner(self)
logger.warning(
"BEC WIDGET LIFECYCLE TRACE | "
f"event=closeEvent:before-cleanup | class={self.__class__.__name__} | "
f"object={self.objectName()} | py_id={id(self)} | "
f"bec_parent_chain={self._debug_bec_parent_chain()}"
)
self.cleanup()
self._destroyed = True
logger.warning(
"BEC WIDGET LIFECYCLE TRACE | "
f"event=closeEvent:after-cleanup-set-destroyed | "
f"class={self.__class__.__name__} | object={self.objectName()} | "
f"py_id={id(self)} | destroyed={self._destroyed} | "
f"bec_parent_chain={self._debug_bec_parent_chain()}"
)
else:
logger.warning(
"BEC WIDGET LIFECYCLE TRACE | "
f"event=closeEvent:already-destroyed | class={self.__class__.__name__} | "
f"object={self.objectName()} | py_id={id(self)} | "
f"bec_parent_chain={self._debug_bec_parent_chain()}"
)
finally:
logger.warning(
"BEC WIDGET LIFECYCLE TRACE | "
f"event=closeEvent:calling-super | class={self.__class__.__name__} | "
f"object={self.objectName()} | py_id={id(self)} | "
f"bec_parent_chain={self._debug_bec_parent_chain()}"
)
super().closeEvent(event) # pylint: disable=no-member
logger.warning(
"BEC WIDGET LIFECYCLE TRACE | "
f"event=closeEvent:exit | class={self.__class__.__name__} | "
f"object={self.objectName()} | py_id={id(self)} | "
f"destroyed={getattr(self, '_destroyed', None)} | "
f"bec_parent_chain={self._debug_bec_parent_chain()}"
)
+43 -3
View File
@@ -1,6 +1,6 @@
from bec_lib.logger import bec_logger
from qtpy import PYSIDE6
from qtpy.QtCore import QFile, QIODevice
from qtpy.QtCore import QEvent, QFile, QIODevice, QObject
from bec_widgets.utils.plugin_utils import get_designer_plugin
@@ -9,16 +9,56 @@ logger = bec_logger.logger
if PYSIDE6:
from qtpy.QtUiTools import QUiLoader
class _LoadedUiCloser(QObject):
"""Forward root close events to widgets instantiated by ``QUiLoader``.
Destroying a parent widget does not guarantee ``closeEvent`` is delivered to
every child widget. Some of our designer plugins rely on ``closeEvent`` /
``cleanup`` to unregister callbacks, so explicitly close loaded descendants
when the loaded form itself is closed.
"""
def __init__(self, root_widget):
super().__init__(root_widget)
self._root_widget = root_widget
self._widgets = []
root_widget.installEventFilter(self)
def register_widget(self, widget):
if widget is None or widget is self._root_widget:
return
self._widgets.append(widget)
def eventFilter(self, watched, event):
if watched is self._root_widget and event.type() == QEvent.Close:
for widget in reversed(self._widgets):
try:
widget.close()
except RuntimeError:
continue
return super().eventFilter(watched, event)
class CustomUiLoader(QUiLoader):
def __init__(self, baseinstance):
super().__init__(baseinstance)
self.baseinstance = baseinstance
self._closer = _LoadedUiCloser(baseinstance) if baseinstance is not None else None
def createWidget(self, class_name, parent=None, name=""):
if parent is None and self.baseinstance is not None:
return self.baseinstance
widget_parent = parent if parent is not None else self.baseinstance
widget = get_designer_plugin(class_name, raise_on_missing=False)
if widget is not None:
return widget(self.baseinstance)
return super().createWidget(class_name, self.baseinstance, name)
created_widget = widget(widget_parent)
created_widget.setObjectName(name)
else:
created_widget = super().createWidget(class_name, widget_parent, name)
if self._closer is not None:
self._closer.register_widget(created_widget)
return created_widget
class UILoader:
@@ -1,6 +1,7 @@
from __future__ import annotations
import inspect
import threading
from dataclasses import dataclass
from typing import Any, Callable, Literal, Mapping, Sequence, cast
@@ -166,9 +167,31 @@ class DockAreaWidget(BECWidget, QWidget):
def _default_close_handler(self, dock: CDockWidget, widget: QWidget) -> None:
"""Default dock close routine used when no custom handler is provided."""
logger.warning(
"DOCK AREA LIFECYCLE TRACE | "
f"event=default-close-handler:start | dock={dock.objectName()} | "
f"dock_py_id={id(dock)} | widget={widget.objectName()} | "
f"widget_class={widget.__class__.__name__} | widget_py_id={id(widget)} | "
f"thread={threading.current_thread().name}:{threading.get_ident()}"
)
widget.close()
logger.warning(
"DOCK AREA LIFECYCLE TRACE | "
f"event=default-close-handler:after-widget-close | dock={dock.objectName()} | "
f"dock_valid={isValid(dock)} | widget={widget.objectName()} | "
f"widget_valid={isValid(widget)}"
)
dock.closeDockWidget()
logger.warning(
"DOCK AREA LIFECYCLE TRACE | "
f"event=default-close-handler:after-closeDockWidget | dock={dock.objectName()} | "
f"dock_valid={isValid(dock)}"
)
dock.deleteDockWidget()
logger.warning(
"DOCK AREA LIFECYCLE TRACE | "
f"event=default-close-handler:end | dock_py_id={id(dock)} | dock_valid={isValid(dock)}"
)
def close_dock(self, dock: CDockWidget, widget: QWidget | None = None) -> None:
"""
@@ -372,12 +395,45 @@ class DockAreaWidget(BECWidget, QWidget):
close_handler = self._resolve_close_handler(widget, on_close)
def on_widget_destroyed():
logger.warning(
"DOCK AREA LIFECYCLE TRACE | "
f"event=widget_removed-signal:start | dock_py_id={id(dock)} | "
f"dock_valid={isValid(dock)} | widget={widget.objectName()} | "
f"widget_py_id={id(widget)} | thread={threading.current_thread().name}:{threading.get_ident()}"
)
if not isValid(dock):
logger.warning(
"DOCK AREA LIFECYCLE TRACE | "
f"event=widget_removed-signal:dock-invalid-return | dock_py_id={id(dock)}"
)
return
dock.closeDockWidget()
logger.warning(
"DOCK AREA LIFECYCLE TRACE | "
f"event=widget_removed-signal:after-closeDockWidget | dock={dock.objectName()} | "
f"dock_valid={isValid(dock)}"
)
dock.deleteDockWidget()
logger.warning(
"DOCK AREA LIFECYCLE TRACE | "
f"event=widget_removed-signal:end | dock_py_id={id(dock)} | dock_valid={isValid(dock)}"
)
dock.closeRequested.connect(lambda: close_handler(dock))
def on_close_requested():
logger.warning(
"DOCK AREA LIFECYCLE TRACE | "
f"event=closeRequested:start | dock={dock.objectName()} | dock_py_id={id(dock)} | "
f"widget={widget.objectName()} | widget_class={widget.__class__.__name__} | "
f"widget_py_id={id(widget)} | thread={threading.current_thread().name}:{threading.get_ident()}"
)
close_handler(dock)
logger.warning(
"DOCK AREA LIFECYCLE TRACE | "
f"event=closeRequested:end | dock_py_id={id(dock)} | dock_valid={isValid(dock)} | "
f"widget_py_id={id(widget)} | widget_valid={isValid(widget)}"
)
dock.closeRequested.connect(on_close_requested)
if hasattr(widget, "widget_removed"):
widget.widget_removed.connect(on_widget_destroyed)
@@ -410,13 +466,50 @@ class DockAreaWidget(BECWidget, QWidget):
return dock
def _delete_dock(self, dock: CDockWidget) -> None:
logger.warning(
"DOCK AREA LIFECYCLE TRACE | "
f"event=delete-dock:start | dock={dock.objectName()} | dock_py_id={id(dock)} | "
f"dock_valid={isValid(dock)} | thread={threading.current_thread().name}:{threading.get_ident()}"
)
widget = dock.widget()
logger.warning(
"DOCK AREA LIFECYCLE TRACE | "
f"event=delete-dock:widget-resolved | dock_py_id={id(dock)} | "
f"widget={widget.objectName() if widget else None} | "
f"widget_class={widget.__class__.__name__ if widget else None} | "
f"widget_py_id={id(widget) if widget else None} | "
f"widget_valid={isValid(widget) if widget else None}"
)
if widget and isValid(widget):
widget.close()
logger.warning(
"DOCK AREA LIFECYCLE TRACE | "
f"event=delete-dock:after-widget-close | dock_py_id={id(dock)} | "
f"widget={widget.objectName()} | widget_valid={isValid(widget)}"
)
widget.deleteLater()
logger.warning(
"DOCK AREA LIFECYCLE TRACE | "
f"event=delete-dock:after-widget-deleteLater | dock_py_id={id(dock)} | "
f"widget_py_id={id(widget)} | widget_valid={isValid(widget)}"
)
if isValid(dock):
dock.closeDockWidget()
logger.warning(
"DOCK AREA LIFECYCLE TRACE | "
f"event=delete-dock:after-closeDockWidget | dock={dock.objectName()} | "
f"dock_valid={isValid(dock)}"
)
dock.deleteDockWidget()
logger.warning(
"DOCK AREA LIFECYCLE TRACE | "
f"event=delete-dock:end | dock_py_id={id(dock)} | dock_valid={isValid(dock)}"
)
else:
logger.warning(
"DOCK AREA LIFECYCLE TRACE | "
f"event=delete-dock:dock-invalid-skip | dock_py_id={id(dock)}"
)
def _resolve_dock_reference(
self, ref: CDockWidget | QWidget | str | None, *, allow_none: bool = True
@@ -3,13 +3,14 @@
from __future__ import annotations
import enum
import threading
from bec_lib.callback_handler import EventType
from bec_lib.device import ComputedSignal, Device, Positioner, ReadoutPriority
from bec_lib.device import Signal as BECSignal
from bec_lib.logger import bec_logger
from pydantic import Field, field_validator
from qtpy.QtCore import QSize, QStringListModel, Qt, Signal, Slot
from qtpy.QtCore import QSize, QStringListModel, Signal, Slot
from qtpy.QtWidgets import QComboBox, QCompleter, QSizePolicy
from bec_widgets.utils.bec_connector import ConnectionConfig
@@ -216,15 +217,28 @@ class DeviceComboBox(BECWidget, QComboBox):
else:
self.setCurrentText("")
self._log_callback_state("init: before DEVICE_UPDATE register")
self._callback_id = self.bec_dispatcher.client.callbacks.register(
EventType.DEVICE_UPDATE, self.on_device_update
)
# self.device_config_update.connect(
# self.update_devices_from_filters, Qt.ConnectionType.QueuedConnection
# )
self._log_callback_state("init: after DEVICE_UPDATE register")
self.device_config_update.connect(self.update_devices_from_filters)
self._log_callback_state("init: device_config_update connected")
self.currentTextChanged.connect(self.check_validity)
self.check_validity(self.currentText())
self._log_callback_state("init: finished")
def _log_callback_state(self, event: str, **details) -> None:
logger.warning(
"DEVICE COMBOBOX CALLBACK TRACE | "
f"event={event} | object={self.objectName()} | py_id={id(self)} | "
f"thread={threading.current_thread().name}:{threading.get_ident()} | "
f"callback_id={getattr(self, '_callback_id', None)} | "
f"destroyed={getattr(self, '_destroyed', None)} | "
f"current={self.currentText()} | devices_count={len(getattr(self, '_devices', []))} | "
f"bec_parent_chain={self._debug_bec_parent_chain()} | "
f"details={details}"
)
@staticmethod
def _process_config(config: DeviceInputConfig | dict | None) -> DeviceInputConfig:
@@ -258,19 +272,25 @@ class DeviceComboBox(BECWidget, QComboBox):
@SafeSlot()
def update_devices_from_filters(self):
"""Refresh the available device list from current device/readout/signal filters."""
# if getattr(self, "_destroyed", False):
# return
self._log_callback_state(
"update_devices_from_filters: enter",
apply_filter=self.apply_filter,
device_filter=[entry.value for entry in self.device_filter],
readout_filter=[entry.value for entry in self.readout_filter],
signal_class_filter=self.signal_class_filter,
)
self.config.device_filter = [entry.value for entry in self.device_filter]
self.config.readout_filter = [entry.value for entry in self.readout_filter]
self.config.signal_class_filter = self.signal_class_filter
if not self.apply_filter:
self._log_callback_state("update_devices_from_filters: apply-filter false return")
return
devices = self._filter_devices_by_signal_class(self.dev.enabled_devices)
devices = [device for device in devices if self._check_device_filter(device)]
devices = [device for device in devices if self._check_readout_filter(device)]
self.devices = [device.name for device in devices]
self._log_callback_state("update_devices_from_filters: finished")
@SafeSlot(list)
def set_available_devices(self, devices: list[str]):
@@ -495,17 +515,26 @@ class DeviceComboBox(BECWidget, QComboBox):
action: Device update action emitted by BEC.
content: Device update payload. Currently unused.
"""
# if getattr(self, "_destroyed", False):
# return
self._log_callback_state("on_device_update: enter", action=action, content=content)
if action in ["add", "remove", "reload"]:
self._log_callback_state("on_device_update: emitting device_config_update")
self.device_config_update.emit()
self._log_callback_state("on_device_update: emitted device_config_update")
else:
self._log_callback_state("on_device_update: ignored action", action=action)
def cleanup(self):
"""Cleanup the widget."""
self._log_callback_state("cleanup: start")
if self._callback_id is not None:
self._log_callback_state("cleanup: removing callback")
self.bec_dispatcher.client.callbacks.remove(self._callback_id)
self._callback_id = None
self._log_callback_state("cleanup: callback removed")
else:
self._log_callback_state("cleanup: callback already None")
super().cleanup()
self._log_callback_state("cleanup: after super")
def get_current_device(self) -> object:
"""Return the current BEC device object.
@@ -2,6 +2,8 @@
from __future__ import annotations
import threading
from bec_lib.callback_handler import EventType
from bec_lib.device import Signal as BECSignal
from bec_lib.logger import bec_logger
@@ -77,7 +79,6 @@ class SignalComboBox(BECWidget, QComboBox):
device_signal_changed = Signal(str)
signal_reset = Signal()
device_config_update = Signal()
def __init__(
self,
@@ -138,14 +139,13 @@ class SignalComboBox(BECWidget, QComboBox):
if self.config.autocomplete:
self.autocomplete = True
self._log_callback_state("init: before DEVICE_UPDATE register")
self._device_update_register = self.bec_dispatcher.client.callbacks.register(
EventType.DEVICE_UPDATE, self.on_device_update
)
# self.device_config_update.connect(
# self.update_signals_from_filters, Qt.ConnectionType.QueuedConnection
# )
self.device_config_update.connect(self.update_signals_from_filters)
self._log_callback_state("init: after DEVICE_UPDATE register")
self.currentTextChanged.connect(self.on_text_changed)
self._log_callback_state("init: currentTextChanged connected")
self.set_filter(signal_filter or [Kind.hinted, Kind.normal, Kind.config])
@@ -154,6 +154,19 @@ class SignalComboBox(BECWidget, QComboBox):
if default is not None:
self.set_signal(default)
self.check_validity(self.currentText())
self._log_callback_state("init: finished")
def _log_callback_state(self, event: str, **details) -> None:
logger.warning(
"SIGNAL COMBOBOX CALLBACK TRACE | "
f"event={event} | object={self.objectName()} | py_id={id(self)} | "
f"thread={threading.current_thread().name}:{threading.get_ident()} | "
f"callback_id={getattr(self, '_device_update_register', None)} | "
f"destroyed={getattr(self, '_destroyed', None)} | "
f"device={getattr(self, '_device', None)} | current={self.currentText()} | "
f"bec_parent_chain={self._debug_bec_parent_chain()} | "
f"details={details}"
)
@staticmethod
def _process_config(config: SignalComboBoxConfig | dict | None) -> SignalComboBoxConfig:
@@ -195,33 +208,50 @@ class SignalComboBox(BECWidget, QComboBox):
"""
previous_device = self._device
valid_device = device if self.validate_device(device) else None
self._log_callback_state(
"set_device: before update_signals_from_filters",
requested_device=device,
previous_device=previous_device,
valid_device=valid_device,
)
self._device = valid_device
self.config.device = self._device
if valid_device is None or valid_device != previous_device:
self.setCurrentText("")
self.update_signals_from_filters()
self._log_callback_state("set_device: after update_signals_from_filters")
@SafeSlot()
@SafeSlot(dict, dict)
def update_signals_from_filters(
self, content: dict | None = None, metadata: dict | None = None
):
@SafeSlot(str, dict)
def update_signals_from_filters(self, action: str | None = None, content: dict | None = None):
"""Refresh available signals from the current device and filters.
Args:
action: Optional BEC device update action. If provided, only device list changing
actions trigger a refresh.
content: Optional callback payload from BEC device updates. Currently unused.
metadata: Optional callback metadata from BEC device updates. Currently unused.
"""
# if getattr(self, "_destroyed", False):
# return
if action is not None and action not in ["add", "remove", "reload"]:
self._log_callback_state("update_signals_from_filters: ignored action", action=action)
return
self._log_callback_state(
"update_signals_from_filters: enter",
action=action,
content=content,
signal_class_filter=self._signal_class_filter,
require_device=self._require_device,
)
self.config.signal_filter = [kind.name for kind in self.signal_filter]
if self._signal_class_filter:
self._log_callback_state("update_signals_from_filters: class-filter path")
self.update_signals_from_signal_classes()
self._log_callback_state("update_signals_from_filters: class-filter return")
return
if not self.validate_device(self._device):
self._log_callback_state("update_signals_from_filters: invalid-device return")
self._device = None
self.config.device = None
self._set_signal_groups([], [], [])
@@ -231,6 +261,7 @@ class SignalComboBox(BECWidget, QComboBox):
device_info = device._info.get("signals", {})
if isinstance(device, BECSignal):
self._log_callback_state("update_signals_from_filters: bec-signal return")
self._set_signal_groups([(self._device, {})], [], [])
return
@@ -254,13 +285,20 @@ class SignalComboBox(BECWidget, QComboBox):
device_name=self._device,
),
)
self._log_callback_state(
"update_signals_from_filters: finished",
signal_count=len(self._signals),
hinted_count=len(self._hinted_signals),
normal_count=len(self._normal_signals),
config_count=len(self._config_signals),
)
def on_device_update(self, action: str, content: dict) -> None:
"""Refresh filters when BEC reports device configuration changes."""
# if getattr(self, "_destroyed", False):
# return
if action in ["add", "remove", "reload"]:
self.device_config_update.emit()
"""Log BEC device-update callback entry before refreshing filters."""
self._log_callback_state("on_device_update: enter", action=action, content=content)
self._log_callback_state("on_device_update: before update_signals_from_filters")
self.update_signals_from_filters(action, content)
self._log_callback_state("on_device_update: after update_signals_from_filters")
@Property(str)
def device(self) -> str:
@@ -603,10 +641,16 @@ class SignalComboBox(BECWidget, QComboBox):
def cleanup(self):
"""Cleanup the widget."""
self._log_callback_state("cleanup: start")
if self._device_update_register is not None:
self._log_callback_state("cleanup: removing callback")
self.bec_dispatcher.client.callbacks.remove(self._device_update_register)
self._device_update_register = None
self._log_callback_state("cleanup: callback removed")
else:
self._log_callback_state("cleanup: callback already None")
super().cleanup()
self._log_callback_state("cleanup: after super")
@staticmethod
def _normalize_kind(value: Kind | str) -> Kind | None:
@@ -14,7 +14,6 @@ from qtpy.QtWidgets import (
QLabel,
QPushButton,
QSizePolicy,
QSpacerItem,
QVBoxLayout,
QWidget,
)
@@ -25,6 +24,7 @@ from bec_widgets.utils.colors import apply_theme, get_accent_colors
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
from bec_widgets.widgets.control.buttons.stop_button.stop_button import StopButton
from bec_widgets.widgets.control.scan_control.scan_group_box import ScanGroupBox
from bec_widgets.widgets.control.scan_control.scan_info_adapter import ScanInfoAdapter
from bec_widgets.widgets.editors.scan_metadata.scan_metadata import ScanMetadata
@@ -95,6 +95,7 @@ class ScanControl(BECWidget, QWidget):
self._hide_scan_control_buttons = False
self._hide_metadata = False
self._hide_scan_selection_combobox = False
self._scan_info_adapter = ScanInfoAdapter()
# Create and set main layout
self._init_UI()
@@ -184,12 +185,17 @@ class ScanControl(BECWidget, QWidget):
MessageEndpoints.available_scans()
).resource
if self.config.allowed_scans is None:
supported_scans = ["ScanBase", "SyncFlyScanBase", "AsyncFlyScanBase"]
allowed_scans = [
scan_name
for scan_name, scan_info in self.available_scans.items()
if scan_info["base_class"] in supported_scans and len(scan_info["gui_config"]) > 0
]
supported_scans = ["ScanBase", "SyncFlyScanBase", "AsyncFlyScanBase", "ScanBaseV4"]
def _is_scan_supported(scan_name):
scan_info = self.available_scans[scan_name]
return (
scan_info.get("base_class") in supported_scans
and self._scan_info_adapter.has_scan_ui_config(scan_info)
and not scan_name.startswith("_")
)
allowed_scans = filter(_is_scan_supported, self.available_scans.keys())
else:
allowed_scans = self.config.allowed_scans
@@ -376,14 +382,14 @@ class ScanControl(BECWidget, QWidget):
self.reset_layout()
selected_scan_info = self.available_scans.get(scan_name, {})
gui_config = selected_scan_info.get("gui_config", {})
self.arg_group = gui_config.get("arg_group", None)
self.kwarg_groups = gui_config.get("kwarg_groups", None)
gui_config = self._scan_info_adapter.build_scan_ui_config(selected_scan_info)
arg_group = gui_config.get("arg_group", None)
kwarg_groups = gui_config.get("kwarg_groups", [])
if bool(self.arg_group["arg_inputs"]):
self.add_arg_group(self.arg_group)
if len(self.kwarg_groups) > 0:
self.add_kwargs_boxes(self.kwarg_groups)
if arg_group and bool(arg_group.get("arg_inputs")):
self.add_arg_group(arg_group)
if kwarg_groups:
self.add_kwargs_boxes(kwarg_groups)
self.update()
self.adjustSize()
@@ -414,6 +420,7 @@ class ScanControl(BECWidget, QWidget):
position = self.ARG_BOX_POSITION + (1 if self.arg_box is not None else 0)
for group in groups:
box = ScanGroupBox(box_type="kwargs", config=group)
box.reference_units_changed.connect(self._apply_reference_units_to_other_boxes)
box.setSizePolicy(QSizePolicy.Policy.Preferred, QSizePolicy.Policy.Fixed)
self.layout.insertWidget(position + len(self.kwarg_boxes), box)
self.kwarg_boxes.append(box)
@@ -427,11 +434,30 @@ class ScanControl(BECWidget, QWidget):
"""
self.arg_box = ScanGroupBox(box_type="args", config=group)
self.arg_box.device_selected.connect(self.emit_device_selected)
self.arg_box.reference_units_changed.connect(self._apply_reference_units_to_other_boxes)
self.arg_box.setSizePolicy(QSizePolicy.Policy.Preferred, QSizePolicy.Policy.Fixed)
self.arg_box.hide_add_remove_buttons = self._hide_add_remove_buttons
self.layout.insertWidget(self.ARG_BOX_POSITION, self.arg_box)
self.arg_box.setVisible(not self._hide_arg_box)
def _scan_group_boxes(self) -> list[ScanGroupBox]:
boxes = []
if self.arg_box is not None:
boxes.append(self.arg_box)
boxes.extend(self.kwarg_boxes)
return boxes
def _apply_reference_units_to_other_boxes(
self, source_box: ScanGroupBox, reference_name: str, units: str | None
) -> None:
"""
Propagate device-derived units to scan fields that reference a device in another group.
"""
for box in self._scan_group_boxes():
if box is source_box:
continue
box.apply_reference_units(reference_name, units)
@SafeSlot(str)
def emit_device_selected(self, dev_names):
"""
@@ -174,6 +174,7 @@ class ScanGroupBox(QGroupBox):
}
device_selected = Signal(str)
reference_units_changed = Signal(object, str, object)
def __init__(
self,
@@ -209,6 +210,8 @@ class ScanGroupBox(QGroupBox):
self.labels = []
self.widgets = []
self._widget_configs = {}
self._column_labels = {}
self.selected_devices = {}
self.init_box(self.config)
@@ -247,6 +250,7 @@ class ScanGroupBox(QGroupBox):
label = QLabel(text=display_name)
self.layout.addWidget(label, row, column_index)
self.labels.append(label)
self._column_labels[column_index] = label
def add_input_widgets(self, group_inputs: dict, row) -> None:
"""
@@ -281,20 +285,31 @@ class ScanGroupBox(QGroupBox):
)
else:
widget = widget_class(parent=self.parent(), arg_name=arg_name, default=default)
self._apply_numeric_precision(widget, item)
self._apply_numeric_limits(widget, item)
if isinstance(widget, DeviceComboBox):
self.selected_devices[widget] = ""
widget.device_selected.connect(self.emit_device_selected)
widget.currentTextChanged.connect(
lambda text, device_widget=widget: self._handle_device_text_changed(
device_widget, text
)
)
if isinstance(widget, ScanLiteralsComboBox):
widget.set_literals(item["type"].get("Literal", []))
tooltip = item.get("tooltip", None)
if tooltip is not None:
widget.setToolTip(item["tooltip"])
self._widget_configs[widget] = item
self._apply_unit_metadata(widget, item)
self.layout.addWidget(widget, row, column_index)
self.widgets.append(widget)
@Slot(str)
def emit_device_selected(self, device_name):
self.selected_devices[self.sender()] = device_name.strip()
sender = self.sender()
self.selected_devices[sender] = device_name.strip()
if isinstance(sender, DeviceComboBox):
units = self._device_units(sender.get_current_device())
self._update_reference_units(sender, units)
self._emit_reference_units_changed(sender, units)
selected_devices_str = " ".join(self.selected_devices.values())
self.device_selected.emit(selected_devices_str)
@@ -321,6 +336,7 @@ class ScanGroupBox(QGroupBox):
for widget in self.widgets[-len(self.inputs) :]:
if isinstance(widget, DeviceComboBox):
self.selected_devices[widget] = ""
self._widget_configs.pop(widget, None)
widget.close()
widget.deleteLater()
self.widgets = self.widgets[: -len(self.inputs)]
@@ -333,6 +349,7 @@ class ScanGroupBox(QGroupBox):
for widget in list(self.widgets):
if isinstance(widget, DeviceComboBox):
self.selected_devices.pop(widget, None)
self._widget_configs.pop(widget, None)
widget.close()
widget.deleteLater()
self.layout.removeWidget(widget)
@@ -435,3 +452,159 @@ class ScanGroupBox(QGroupBox):
if widget.arg_name == key:
WidgetIO.set_value(widget, value)
break
@staticmethod
def _unit_tooltip(item: dict, units: str | None = None) -> str | None:
tooltip = item.get("tooltip", None)
reference_units = item.get("reference_units", None)
units = units or item.get("units", None)
tooltip_parts = [tooltip] if tooltip else []
if units:
tooltip_parts.append(f"Units: {units}")
elif reference_units:
tooltip_parts.append(f"Units from: {reference_units}")
if tooltip_parts:
return "\n".join(tooltip_parts)
return None
def _apply_unit_metadata(self, widget, item: dict, units: str | None = None) -> None:
units = units or item.get("units", None)
tooltip = self._unit_tooltip(item, units)
existing_tooltip = widget.toolTip()
if existing_tooltip:
# strip the existing unit info from the tooltip if it exists
# to avoid tooltip bloat on multiple updates
existing_tooltip = "\n".join(
line
for line in existing_tooltip.splitlines()
if not (line.startswith("Units:") or line.startswith("Units from:"))
).strip()
if tooltip:
if existing_tooltip:
widget.setToolTip(f"{existing_tooltip}\n{tooltip}")
else:
widget.setToolTip(tooltip)
if hasattr(widget, "setSuffix"):
widget.setSuffix(f" {units}" if units else "")
def _refresh_column_label(self, column: int, item: dict) -> None:
if column not in self._column_labels:
return
self._column_labels[column].setText(item.get("display_name", item.get("name", None)))
@staticmethod
def _device_units(device) -> str | None:
egu = getattr(device, "egu", None)
if not callable(egu):
return None
try:
return egu()
except Exception:
logger.exception("Failed to fetch engineering units from device %s", device)
return None
def _widget_position(self, widget) -> tuple[int, int] | None:
for row in range(self.layout.rowCount()):
for column in range(self.layout.columnCount()):
item = self.layout.itemAtPosition(row, column)
if item is not None and item.widget() is widget:
return row, column
return None
def _update_reference_units(self, device_widget: DeviceComboBox, units: str | None) -> None:
position = self._widget_position(device_widget)
if position is None:
return
source_row, _ = position
source_name = device_widget.arg_name
for widget in self.widgets:
item = self._widget_configs.get(widget, {})
if item.get("reference_units") != source_name:
continue
widget_position = self._widget_position(widget)
if widget_position is None:
continue
row, column = widget_position
if self.box_type == "args" and row != source_row:
continue
self._apply_unit_metadata(widget, item, units)
self._refresh_column_label(column, item)
def apply_reference_units(self, reference_name: str, units: str | None) -> None:
"""
Apply units to widgets that reference an argument owned by another group box.
Cross-box references only have one widget row, so row scoping is intentionally handled by
the source group before this method is called.
"""
for widget in self.widgets:
item = self._widget_configs.get(widget, {})
if item.get("reference_units") != reference_name:
continue
self._apply_unit_metadata(widget, item, units)
position = self._widget_position(widget)
if position is not None:
_, column = position
self._refresh_column_label(column, item)
def _emit_reference_units_changed(
self, device_widget: DeviceComboBox, units: str | None
) -> None:
reference_name = getattr(device_widget, "arg_name", None)
if not reference_name:
return
self.reference_units_changed.emit(self, reference_name, units)
def _handle_device_text_changed(self, device_widget: DeviceComboBox, device_name: str) -> None:
if not device_widget.validate_device(device_name):
self.selected_devices[device_widget] = ""
self._update_reference_units(device_widget, None)
self._emit_reference_units_changed(device_widget, None)
@staticmethod
def _apply_numeric_precision(widget: ScanDoubleSpinBox, item: dict) -> None:
if not isinstance(widget, ScanDoubleSpinBox):
return
precision = item.get("precision")
if precision is None:
return
try:
widget.setDecimals(max(0, int(precision)))
except (TypeError, ValueError):
logger.warning(
"Ignoring invalid precision %r for parameter %s", precision, item.get("name")
)
@staticmethod
def _apply_numeric_limits(widget: ScanDoubleSpinBox | ScanSpinBox, item: dict) -> None:
if isinstance(widget, ScanSpinBox):
minimum = -2147483647 # largest int which qt allows
maximum = 2147483647
if item.get("ge") is not None:
minimum = int(item["ge"])
if item.get("gt") is not None:
minimum = int(item["gt"]) + 1
if item.get("le") is not None:
maximum = int(item["le"])
if item.get("lt") is not None:
maximum = int(item["lt"]) - 1
widget.setRange(minimum, maximum)
return
if isinstance(widget, ScanDoubleSpinBox):
minimum = -float("inf")
maximum = float("inf")
step = 10 ** (-widget.decimals())
if item.get("ge") is not None:
minimum = float(item["ge"])
if item.get("gt") is not None:
minimum = float(item["gt"]) + step
if item.get("le") is not None:
maximum = float(item["le"])
if item.get("lt") is not None:
maximum = float(item["lt"]) - step
widget.setRange(minimum, maximum)
@@ -0,0 +1,287 @@
"""Helpers for translating BEC scan metadata into ScanControl UI configuration."""
from __future__ import annotations
import re
from typing import Any
AnnotationValue = str | dict[str, Any] | list[Any] | None
ScanArgumentMetadata = dict[str, Any]
SignatureEntry = dict[str, Any]
ScanInputConfig = dict[str, Any]
ScanInfo = dict[str, Any]
ScanUIConfig = dict[str, Any]
SUPPORTED_SCAN_INPUT_TYPES = {"device", "DeviceBase", "float", "int", "bool", "str"}
class ScanInfoAdapter:
"""Normalize available-scan payloads into the structure consumed by ``ScanControl``."""
@staticmethod
def has_scan_ui_config(scan_info: ScanInfo) -> bool:
"""Check whether a scan exposes enough metadata to build a UI.
Args:
scan_info (ScanInfo): Available-scan payload for one scan.
Returns:
bool: ``True`` when a supported GUI metadata field is present.
"""
if not (
scan_info.get("gui_visibility")
or scan_info.get("gui_config")
or scan_info.get("gui_visualization")
or scan_info.get("signature")
):
return False
gui_config = ScanInfoAdapter().build_scan_ui_config(scan_info)
return not ScanInfoAdapter.unsupported_inputs(gui_config)
@staticmethod
def is_supported_input_type(input_type: AnnotationValue) -> bool:
"""Return whether ``ScanGroupBox`` has a widget for this serialized type."""
return (
isinstance(input_type, str)
and input_type in SUPPORTED_SCAN_INPUT_TYPES
or isinstance(input_type, dict)
and "Literal" in input_type
)
@staticmethod
def unsupported_inputs(gui_config: ScanUIConfig) -> list[ScanInputConfig]:
"""Return input configs that cannot be rendered by ``ScanGroupBox``."""
inputs = []
arg_group = gui_config.get("arg_group")
if arg_group:
inputs.extend(arg_group.get("inputs", []))
for group in gui_config.get("kwarg_groups", []):
inputs.extend(group.get("inputs", []))
return [
input_config
for input_config in inputs
if not ScanInfoAdapter.is_supported_input_type(input_config.get("type"))
]
@staticmethod
def format_display_name(name: str) -> str:
"""Convert a parameter name into a user-facing label.
Args:
name (str): Raw parameter name.
Returns:
str: Formatted display label such as ``Exp Time``.
"""
parts = re.split(r"(_|\d+)", name)
return " ".join(part.capitalize() for part in parts if part.isalnum()).strip()
@staticmethod
def resolve_tooltip(scan_argument: ScanArgumentMetadata) -> str | None:
"""Resolve the tooltip text from parsed ``ScanArgument`` metadata.
Args:
scan_argument (ScanArgumentMetadata): Parsed ``ScanArgument`` metadata.
Returns:
str | None: Explicit tooltip text if provided, otherwise the description fallback.
"""
return scan_argument.get("tooltip") or scan_argument.get("description")
@staticmethod
def parse_annotation(
annotation: AnnotationValue,
) -> tuple[AnnotationValue, ScanArgumentMetadata]:
"""Extract the serialized base annotation and ``ScanArgument`` metadata.
Args:
annotation (AnnotationValue): Serialized annotation payload from BEC.
Returns:
tuple[AnnotationValue, ScanArgumentMetadata]: The unwrapped annotation and parsed
``ScanArgument`` metadata.
"""
scan_argument: ScanArgumentMetadata = {}
if isinstance(annotation, list):
annotation = next(
(entry for entry in annotation if entry != "NoneType"),
annotation[0] if annotation else "_empty",
)
if isinstance(annotation, dict) and "Annotated" in annotation:
annotated = annotation["Annotated"]
annotation = annotated.get("type", "_empty")
scan_argument = annotated.get("metadata", {}).get("ScanArgument", {}) or {}
return annotation, scan_argument
@staticmethod
def scan_arg_type_from_annotation(annotation: AnnotationValue) -> AnnotationValue:
"""Normalize an annotation value to the widget type expected by ``ScanControl``.
Args:
annotation (AnnotationValue): Serialized or parsed annotation value.
Returns:
AnnotationValue: The normalized type identifier used by the widget layer.
"""
if isinstance(annotation, dict):
return annotation
if annotation in ("_empty", None):
return "str"
return annotation
def scan_input_from_signature(
self, param: SignatureEntry, arg: bool = False
) -> ScanInputConfig:
"""Build one ScanControl input description from a signature entry.
Args:
param (SignatureEntry): Serialized signature entry.
arg (bool): Whether the parameter belongs to the positional arg bundle.
Returns:
ScanInputConfig: Normalized input configuration for ``ScanControl``.
"""
annotation, scan_argument = self.parse_annotation(param.get("annotation"))
return self._build_scan_input(
name=param["name"],
annotation=annotation,
scan_argument=scan_argument,
arg=arg,
default=None if arg else param.get("default", None),
)
def scan_input_from_arg_input(
self, name: str, item_type: AnnotationValue, signature_by_name: dict[str, SignatureEntry]
) -> ScanInputConfig:
"""Build one arg-bundle input description from ``arg_input`` metadata.
Args:
name (str): Argument name from ``arg_input``.
item_type (AnnotationValue): Serialized argument type from ``arg_input``.
signature_by_name (dict[str, SignatureEntry]): Signature entries indexed by
parameter name.
Returns:
ScanInputConfig: Normalized input configuration for one arg-bundle field.
"""
if name in signature_by_name:
scan_input = self.scan_input_from_signature(signature_by_name[name], arg=True)
scan_input["type"] = self.scan_arg_type_from_annotation(
self.parse_annotation(signature_by_name[name].get("annotation"))[0]
)
else:
annotation, scan_argument = self.parse_annotation(item_type)
scan_input = self._build_scan_input(
name=name,
annotation=annotation,
scan_argument=scan_argument,
arg=True,
default=None,
)
if scan_input["type"] in ("_empty", None):
scan_input["type"] = item_type
return scan_input
def _build_scan_input(
self,
name: str,
annotation: AnnotationValue,
scan_argument: ScanArgumentMetadata,
*,
arg: bool,
default: Any,
) -> ScanInputConfig:
"""Build one normalized ScanControl input configuration.
Args:
name (str): Parameter name.
annotation (AnnotationValue): Parsed annotation value.
scan_argument (ScanArgumentMetadata): Parsed ``ScanArgument`` metadata.
arg (bool): Whether the parameter belongs to the positional arg bundle.
default (Any): Default value for the parameter.
Returns:
ScanInputConfig: Normalized input configuration.
"""
return {
"arg": arg,
"name": name,
"type": self.scan_arg_type_from_annotation(annotation),
"display_name": scan_argument.get("display_name") or self.format_display_name(name),
"tooltip": self.resolve_tooltip(scan_argument),
"default": default,
"expert": scan_argument.get("expert", False),
"hidden": scan_argument.get("hidden", False),
"precision": scan_argument.get("precision"),
"units": scan_argument.get("units"),
"reference_units": scan_argument.get("reference_units"),
"gt": scan_argument.get("gt"),
"ge": scan_argument.get("ge"),
"lt": scan_argument.get("lt"),
"le": scan_argument.get("le"),
"alternative_group": scan_argument.get("alternative_group"),
}
def build_scan_ui_config(self, scan_info: ScanInfo) -> ScanUIConfig:
"""Normalize one available-scan entry into the widget UI configuration.
Args:
scan_info (ScanInfo): Available-scan payload for one scan.
Returns:
ScanUIConfig: Legacy group structure consumed by ``ScanControl`` and
``ScanGroupBox``.
"""
gui_visualization = (
scan_info.get("gui_visualization") or scan_info.get("gui_visibility") or {}
)
if not gui_visualization and scan_info.get("gui_config"):
return scan_info["gui_config"]
signature = scan_info.get("signature", [])
signature_by_name = {entry["name"]: entry for entry in signature}
arg_group = None
arg_input = scan_info.get("arg_input", {})
if isinstance(arg_input, dict) and arg_input:
bundle_size = scan_info.get("arg_bundle_size", {})
inputs = [
self.scan_input_from_arg_input(name, item_type, signature_by_name)
for name, item_type in arg_input.items()
]
arg_group = {
"name": "Scan Arguments",
"bundle": bundle_size.get("bundle"),
"arg_inputs": arg_input,
"inputs": inputs,
"min": bundle_size.get("min"),
"max": bundle_size.get("max"),
}
kwarg_groups = []
arg_names = set(arg_input) if isinstance(arg_input, dict) else set()
visible_kwarg_names = set()
for group_name, input_names in gui_visualization.items():
inputs = []
for input_name in input_names:
if input_name in arg_names or input_name not in signature_by_name:
continue
if input_name in visible_kwarg_names:
continue
param = signature_by_name[input_name]
if param.get("kind") in ("VAR_POSITIONAL", "VAR_KEYWORD"):
continue
scan_input = self.scan_input_from_signature(param)
if scan_input.get("hidden"):
continue
inputs.append(scan_input)
visible_kwarg_names.add(input_name)
if inputs:
kwarg_groups.append({"name": group_name, "inputs": inputs})
return {
"scan_class_name": scan_info.get("class"),
"arg_group": arg_group,
"kwarg_groups": kwarg_groups,
}
@@ -5,6 +5,9 @@ from bec_widgets.utils.toolbars.bundles import ToolbarBundle, ToolbarComponents
from bec_widgets.utils.toolbars.connections import BundleConnection
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
from bec_widgets.widgets.control.device_input.signal_combobox.signal_combobox import SignalComboBox
from bec_lib.logger import bec_logger
logger = bec_logger.logger
class DeviceSelection(QWidget):
@@ -14,6 +17,7 @@ class DeviceSelection(QWidget):
super().__init__(parent=parent)
self.client = client
self._cleanup_done = False
self.supported_signals = [
"PreviewSignal",
"AsyncSignal",
@@ -138,10 +142,12 @@ class DeviceSelection(QWidget):
def cleanup(self):
"""Clean up the widget resources."""
logger.error("Cleaning up DeviceSelection")
if self._cleanup_done:
return
self._cleanup_done = True
self.device_combo_box.close()
self.device_combo_box.deleteLater()
self.signal_combo_box.close()
self.signal_combo_box.deleteLater()
def device_selection_bundle(components: ToolbarComponents, client=None) -> ToolbarBundle:
+2 -1
View File
@@ -1,6 +1,6 @@
[project]
name = "bec_widgets"
version = "3.11.1"
version = "3.12.0"
description = "BEC Widgets"
requires-python = ">=3.11"
classifiers = [
@@ -64,6 +64,7 @@ qtermwidget = ["pyside6_qtermwidget"]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
@@ -1,3 +1,5 @@
from unittest import mock
import pytest
from bec_lib.device import ReadoutPriority
@@ -124,6 +126,19 @@ def test_device_input_combobox_disabled_invalid_has_neutral_border(device_input_
assert "red" in device_input_combobox.styleSheet()
def test_device_input_combobox_cleanup_unregisters_callback(qtbot, mocked_client):
with mock.patch.object(mocked_client.callbacks, "remove"):
widget = DeviceComboBox(client=mocked_client)
qtbot.addWidget(widget)
callback_id = widget._callback_id
widget.close()
widget.deleteLater()
mocked_client.callbacks.remove.assert_called_once_with(callback_id)
assert widget._callback_id is None
def test_get_device_from_input_combobox_init(device_input_combobox):
device_input_combobox.setCurrentIndex(0)
device_text = device_input_combobox.currentText()
+3 -1
View File
@@ -188,10 +188,12 @@ def test_linked_device_combobox_updates_signal_combobox_on_each_text_change(
def test_device_signal_input_base_cleanup(qtbot, mocked_client):
with mock.patch.object(mocked_client.callbacks, "remove"):
widget = SignalComboBox(client=mocked_client)
callback_id = widget._device_update_register
widget.close()
widget.deleteLater()
mocked_client.callbacks.remove.assert_called_once_with(widget._device_update_register)
mocked_client.callbacks.remove.assert_called_once_with(callback_id)
assert widget._device_update_register is None
def test_signal_combobox_get_signal_name_with_item_data(qtbot, device_signal_combobox):
@@ -35,6 +35,17 @@ def test_initialization_defaults(qtbot, mocked_client):
assert bec_image_view._color_bar is None
def test_image_cleanup_cleans_toolbar_device_selection_callbacks(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
device_selection = bec_image_view.toolbar.components.get_action("device_selection").widget
bec_image_view.cleanup()
assert device_selection._cleanup_done is True
assert device_selection.device_combo_box._callback_id is None
assert device_selection.signal_combo_box._device_update_register is None
def test_setting_color_map(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
bec_image_view.color_map = "viridis"
+354
View File
@@ -11,6 +11,7 @@ from bec_widgets.utils.forms_from_types.items import StrFormItem
from bec_widgets.utils.widget_io import WidgetIO
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
from bec_widgets.widgets.control.scan_control import ScanControl
from bec_widgets.widgets.control.scan_control.scan_info_adapter import ScanInfoAdapter
from .client_mocks import mocked_client
@@ -280,6 +281,359 @@ def test_populate_scans(scan_control, mocked_client):
assert sorted(items) == sorted(expected_scans)
def test_scan_control_uses_gui_visibility_and_signature(qtbot, mocked_client):
scan_info = {
"class": "AnnotatedScan",
"base_class": "ScanBase",
"arg_input": {
"device": "DeviceBase",
"start": {
"Annotated": {
"type": "float",
"metadata": {
"ScanArgument": {
"display_name": "Start Position",
"description": "Start position",
"tooltip": "Custom start tooltip",
"expert": False,
"alternative_group": None,
"units": None,
"reference_units": "device",
}
},
}
},
"stop": {
"Annotated": {
"type": "float",
"metadata": {
"ScanArgument": {
"display_name": None,
"description": "Stop position",
"tooltip": None,
"expert": False,
"alternative_group": None,
"units": None,
"reference_units": "device",
}
},
}
},
},
"arg_bundle_size": {"bundle": 3, "min": 1, "max": None},
"gui_visibility": {
"Movement Parameters": ["steps", "step_size"],
"Acquisition Parameters": ["exp_time", "relative"],
},
"required_kwargs": [],
"signature": [
{"name": "args", "kind": "VAR_POSITIONAL", "default": "_empty", "annotation": "_empty"},
{"name": "steps", "kind": "KEYWORD_ONLY", "default": 10, "annotation": "int"},
{
"name": "step_size",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": {
"Annotated": {
"type": "float",
"metadata": {
"ScanArgument": {
"display_name": "Step Size Custom",
"description": "Step size",
"tooltip": "Custom step tooltip",
"expert": False,
"alternative_group": "scan_resolution",
"units": "mm",
"reference_units": None,
}
},
}
},
},
{
"name": "exp_time",
"kind": "KEYWORD_ONLY",
"default": 0,
"annotation": {
"Annotated": {
"type": "float",
"metadata": {
"ScanArgument": {
"display_name": None,
"description": None,
"tooltip": "Exposure time",
"expert": False,
"alternative_group": None,
"units": "s",
"reference_units": None,
}
},
}
},
},
{"name": "relative", "kind": "KEYWORD_ONLY", "default": False, "annotation": "bool"},
{"name": "kwargs", "kind": "VAR_KEYWORD", "default": "_empty", "annotation": "_empty"},
],
}
mocked_client.connector.set_and_publish(
MessageEndpoints.available_scans(),
AvailableResourceMessage(resource={"annotated_scan": scan_info}),
)
widget = ScanControl(client=mocked_client)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
widget.comboBox_scan_selection.setCurrentText("annotated_scan")
assert widget.comboBox_scan_selection.count() == 1
assert widget.arg_box.layout.itemAtPosition(0, 1).widget().text() == "Start Position"
assert "Custom start tooltip\nUnits from: device" in widget.arg_box.widgets[1].toolTip()
with patch.object(mocked_client.device_manager.devices.samx, "egu", return_value="mm"):
WidgetIO.set_value(widget.arg_box.widgets[0], "samx")
assert widget.arg_box.layout.itemAtPosition(0, 1).widget().text() == "Start Position"
assert widget.arg_box.widgets[1].suffix() == " mm"
assert "Custom start tooltip\nUnits: mm" in widget.arg_box.widgets[1].toolTip()
widget.arg_box.widgets[0].setCurrentText("not_a_device")
assert widget.arg_box.layout.itemAtPosition(0, 1).widget().text() == "Start Position"
assert widget.arg_box.widgets[1].suffix() == ""
assert "Custom start tooltip\nUnits from: device" in widget.arg_box.widgets[1].toolTip()
assert [box.title() for box in widget.kwarg_boxes] == [
"Movement Parameters",
"Acquisition Parameters",
]
assert widget.kwarg_boxes[0].layout.itemAtPosition(0, 1).widget().text() == "Step Size Custom"
assert widget.kwarg_boxes[0].widgets[1].suffix() == " mm"
assert "Custom step tooltip\nUnits: mm" in widget.kwarg_boxes[0].widgets[1].toolTip()
assert widget.kwarg_boxes[1].layout.itemAtPosition(0, 0).widget().text() == "Exp Time"
assert "Exposure time\nUnits: s" in widget.kwarg_boxes[1].widgets[0].toolTip()
def test_scan_info_adapter_skips_duplicate_visible_kwargs():
scan_info = {
"class": "DuplicateScan",
"base_class": "ScanBaseV4",
"arg_input": {},
"arg_bundle_size": {"bundle": 0, "min": None, "max": None},
"gui_visibility": {
"Scan Parameters": ["relative", "burst_at_each_point"],
"Acquisition Parameters": ["exp_time", "burst_at_each_point"],
},
"signature": [
{"name": "relative", "kind": "KEYWORD_ONLY", "default": False, "annotation": "bool"},
{
"name": "burst_at_each_point",
"kind": "KEYWORD_ONLY",
"default": 1,
"annotation": "int",
},
{"name": "exp_time", "kind": "KEYWORD_ONLY", "default": 0, "annotation": "float"},
],
}
gui_config = ScanInfoAdapter().build_scan_ui_config(scan_info)
groups = {
group["name"]: [input_spec["name"] for input_spec in group["inputs"]]
for group in gui_config["kwarg_groups"]
}
assert groups == {
"Scan Parameters": ["relative", "burst_at_each_point"],
"Acquisition Parameters": ["exp_time"],
}
def test_scan_info_adapter_rejects_unsupported_visible_inputs():
scan_info = {
"class": "UnsupportedScan",
"base_class": "ScanBaseV4",
"arg_input": {},
"arg_bundle_size": {"bundle": 0, "min": None, "max": None},
"gui_visibility": {"Regions": ["regions"]},
"signature": [
{
"name": "regions",
"kind": "KEYWORD_ONLY",
"default": "_empty",
"annotation": {
"Generic": {
"origin": "list",
"args": [
{"Generic": {"origin": "tuple", "args": ["float", "float", "int"]}}
],
}
},
}
],
}
gui_config = ScanInfoAdapter().build_scan_ui_config(scan_info)
unsupported_inputs = ScanInfoAdapter.unsupported_inputs(gui_config)
assert [input_spec["name"] for input_spec in unsupported_inputs] == ["regions"]
assert ScanInfoAdapter.has_scan_ui_config(scan_info) is False
def test_scan_info_adapter_skips_hidden_visible_kwargs():
scan_info = {
"class": "HiddenScan",
"base_class": "ScanBaseV4",
"arg_input": {},
"arg_bundle_size": {"bundle": 0, "min": None, "max": None},
"gui_visibility": {"Acquisition": ["exp_time", "internal_token"]},
"signature": [
{"name": "exp_time", "kind": "KEYWORD_ONLY", "default": 0, "annotation": "float"},
{
"name": "internal_token",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": {
"Annotated": {
"type": "str",
"metadata": {
"ScanArgument": {"display_name": "Internal Token", "hidden": True}
},
}
},
},
],
}
gui_config = ScanInfoAdapter().build_scan_ui_config(scan_info)
assert [input_spec["name"] for input_spec in gui_config["kwarg_groups"][0]["inputs"]] == [
"exp_time"
]
def test_scan_control_propagates_reference_units_across_kwarg_groups(qtbot, mocked_client):
scan_info = {
"class": "RoundScan",
"base_class": "ScanBaseV4",
"arg_input": {},
"arg_bundle_size": {"bundle": 0, "min": None, "max": None},
"gui_visibility": {
"Motors": ["motor_1", "motor_2"],
"Ring Parameters": ["inner_radius", "outer_radius", "center_1", "center_2"],
},
"required_kwargs": [],
"signature": [
{
"name": "motor_1",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": "DeviceBase",
},
{
"name": "motor_2",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": "DeviceBase",
},
{
"name": "inner_radius",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": {
"Annotated": {
"type": "float",
"metadata": {
"ScanArgument": {
"display_name": "Inner Radius",
"units": None,
"reference_units": "motor_1",
"ge": 0,
}
},
}
},
},
{
"name": "outer_radius",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": {
"Annotated": {
"type": "float",
"metadata": {
"ScanArgument": {
"display_name": "Outer Radius",
"units": None,
"reference_units": "motor_1",
"ge": 0,
}
},
}
},
},
{
"name": "center_1",
"kind": "KEYWORD_ONLY",
"default": 0,
"annotation": {
"Annotated": {
"type": "float",
"metadata": {
"ScanArgument": {
"display_name": "Center Motor 1",
"units": None,
"reference_units": "motor_1",
}
},
}
},
},
{
"name": "center_2",
"kind": "KEYWORD_ONLY",
"default": 0,
"annotation": {
"Annotated": {
"type": "float",
"metadata": {
"ScanArgument": {
"display_name": "Center Motor 2",
"units": None,
"reference_units": "motor_2",
}
},
}
},
},
],
}
mocked_client.connector.set_and_publish(
MessageEndpoints.available_scans(),
AvailableResourceMessage(resource={"round_scan": scan_info}),
)
widget = ScanControl(client=mocked_client)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
widget.comboBox_scan_selection.setCurrentText("round_scan")
motor_box = widget.kwarg_boxes[0]
ring_box = widget.kwarg_boxes[1]
assert "Units from: motor_1" in ring_box.widgets[0].toolTip()
assert ring_box.widgets[0].suffix() == ""
with patch.object(mocked_client.device_manager.devices.samx, "egu", return_value="mm"):
WidgetIO.set_value(motor_box.widgets[0], "samx")
assert ring_box.widgets[0].suffix() == " mm"
assert ring_box.widgets[1].suffix() == " mm"
assert ring_box.widgets[2].suffix() == " mm"
assert ring_box.widgets[3].suffix() == ""
assert "Units: mm" in ring_box.widgets[0].toolTip()
motor_box.widgets[0].setCurrentText("not_a_device")
assert ring_box.widgets[0].suffix() == ""
assert ring_box.widgets[1].suffix() == ""
assert ring_box.widgets[2].suffix() == ""
assert "Units from: motor_1" in ring_box.widgets[0].toolTip()
def test_current_scan(scan_control, mocked_client):
current_scan = scan_control.current_scan
wrong_scan = "error_scan"
@@ -67,28 +67,28 @@ def test_kwarg_box(qtbot):
assert kwarg_box.widgets[0].__class__.__name__ == "ScanDoubleSpinBox"
assert kwarg_box.widgets[0].arg_name == "exp_time"
assert WidgetIO.get_value(kwarg_box.widgets[0]) == 0
assert kwarg_box.widgets[0].toolTip() == "Exposure time in seconds"
assert "Exposure time in seconds" in kwarg_box.widgets[0].toolTip()
# Widget 1
assert kwarg_box.widgets[1].__class__.__name__ == "ScanSpinBox"
assert kwarg_box.widgets[1].arg_name == "num_points"
assert WidgetIO.get_value(kwarg_box.widgets[1]) == 1
assert kwarg_box.widgets[1].toolTip() == "Number of points"
assert "Number of points" in kwarg_box.widgets[1].toolTip()
# Widget 2
assert kwarg_box.widgets[2].__class__.__name__ == "ScanCheckBox"
assert kwarg_box.widgets[2].arg_name == "relative"
assert WidgetIO.get_value(kwarg_box.widgets[2]) == False
assert (
kwarg_box.widgets[2].toolTip()
== "If True, the motors will be moved relative to their current position"
"If True, the motors will be moved relative to their current position"
in kwarg_box.widgets[2].toolTip()
)
# Widget 3
assert kwarg_box.widgets[3].__class__.__name__ == "ScanLineEdit"
assert kwarg_box.widgets[3].arg_name == "scan_type"
assert WidgetIO.get_value(kwarg_box.widgets[3]) == "line"
assert kwarg_box.widgets[3].toolTip() == "Type of scan"
assert "Type of scan" in kwarg_box.widgets[3].toolTip()
parameters = kwarg_box.get_parameters()
assert parameters == {"exp_time": 0, "num_points": 1, "relative": False, "scan_type": "line"}
@@ -146,14 +146,92 @@ def test_arg_box(qtbot):
assert arg_box.widgets[0].__class__.__name__ == "ScanLineEdit"
assert arg_box.widgets[0].arg_name == "device"
assert WidgetIO.get_value(arg_box.widgets[0]) == "samx"
assert arg_box.widgets[0].toolTip() == "Device to scan"
assert "Device to scan" in arg_box.widgets[0].toolTip()
# Widget 1
assert arg_box.widgets[1].__class__.__name__ == "ScanDoubleSpinBox"
assert arg_box.widgets[1].arg_name == "start"
assert WidgetIO.get_value(arg_box.widgets[1]) == 0
assert arg_box.widgets[1].toolTip() == "Start position"
assert "Start position" in arg_box.widgets[1].toolTip()
# Widget 2
assert arg_box.widgets[2].__class__.__name__ == "ScanSpinBox"
assert arg_box.widgets[2].arg_name
def test_spinbox_limits_from_scan_info(qtbot):
group_input = {
"name": "Kwarg Test",
"inputs": [
{
"arg": False,
"name": "exp_time",
"type": "float",
"display_name": "Exp Time",
"tooltip": "Exposure time in seconds",
"default": 2.0,
"expert": False,
"precision": 3,
"gt": 1.5,
"ge": None,
"lt": 5.0,
"le": None,
},
{
"arg": False,
"name": "num_points",
"type": "int",
"display_name": "Num Points",
"tooltip": "Number of points",
"default": 4,
"expert": False,
"gt": None,
"ge": 3,
"lt": 9,
"le": None,
},
{
"arg": False,
"name": "settling_time",
"type": "float",
"display_name": "Settling Time",
"tooltip": "Settling time in seconds",
"default": 0.5,
"expert": False,
"gt": None,
"ge": 0.2,
"lt": None,
"le": 3.5,
},
{
"arg": False,
"name": "steps",
"type": "int",
"display_name": "Steps",
"tooltip": "Number of steps",
"default": 4,
"expert": False,
"gt": 0,
"ge": None,
"lt": None,
"le": 10,
},
],
}
kwarg_box = ScanGroupBox(box_type="kwargs", config=group_input)
exp_time = kwarg_box.widgets[0]
num_points = kwarg_box.widgets[1]
settling_time = kwarg_box.widgets[2]
steps = kwarg_box.widgets[3]
assert exp_time.decimals() == 3
assert exp_time.minimum() == 1.501
assert exp_time.maximum() == 4.999
assert num_points.minimum() == 3
assert num_points.maximum() == 8
assert settling_time.minimum() == 0.2
assert settling_time.maximum() == 3.5
assert steps.minimum() == 1
assert steps.maximum() == 10