1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-04-08 01:37:53 +02:00

Compare commits

..

4 Commits

15 changed files with 724 additions and 478 deletions

View File

@@ -549,7 +549,7 @@ class LaunchWindow(BECMainWindow):
remaining_connections = [
connection for connection in connections.values() if connection.parent_id != self.gui_id
]
return len(remaining_connections) <= 4
return len(remaining_connections) <= 5
def _turn_off_the_lights(self, connections: dict):
"""

View File

@@ -1,3 +1,5 @@
from __future__ import annotations
from qtpy.QtWidgets import QWidget
from bec_widgets.applications.views.developer_view.developer_widget import DeveloperWidget

View File

@@ -18,6 +18,7 @@ from qtpy.QtWidgets import (
)
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.toolbars.status_bar import StatusToolBar
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
from bec_widgets.widgets.control.device_input.signal_combobox.signal_combobox import SignalComboBox
from bec_widgets.widgets.plots.waveform.waveform import Waveform
@@ -69,6 +70,7 @@ class ViewBase(QWidget):
parent (QWidget | None): Parent widget.
id (str | None): Optional view id, useful for debugging or introspection.
title (str | None): Optional human-readable title.
show_status (bool): Whether to show a status toolbar at the top of the view.
"""
def __init__(
@@ -78,6 +80,8 @@ class ViewBase(QWidget):
*,
id: str | None = None,
title: str | None = None,
show_status: bool = False,
status_names: list[str] | None = None,
):
super().__init__(parent=parent)
self.content: QWidget | None = None
@@ -88,15 +92,48 @@ class ViewBase(QWidget):
lay.setContentsMargins(0, 0, 0, 0)
lay.setSpacing(0)
self.status_bar: StatusToolBar | None = None
if show_status:
# If explicit status names are provided, default to showing only those.
show_all = status_names is None
self.setup_status_bar(show_all_status=show_all, status_names=status_names)
if content is not None:
self.set_content(content)
def set_content(self, content: QWidget) -> None:
"""Replace the current content widget with a new one."""
if self.content is not None:
self.layout().removeWidget(self.content)
self.content.setParent(None)
self.content.close()
self.content.deleteLater()
self.content = content
self.layout().addWidget(content)
if self.status_bar is not None:
insert_at = self.layout().indexOf(self.status_bar) + 1
self.layout().insertWidget(insert_at, content)
else:
self.layout().addWidget(content)
def setup_status_bar(
self, *, show_all_status: bool = True, status_names: list[str] | None = None
) -> None:
"""Create and attach a status toolbar managed by the status broker."""
if self.status_bar is not None:
return
names_arg = None if show_all_status else status_names
self.status_bar = StatusToolBar(parent=self, names=names_arg)
self.layout().addWidget(self.status_bar)
def set_status(
self, name: str = "main", *, state=None, text: str | None = None, tooltip: str | None = None
) -> None:
"""Manually set a status item on the status bar."""
if self.status_bar is None:
self.setup_status_bar(show_all_status=True)
if self.status_bar is None:
return
self.status_bar.set_status(name=name, state=state, text=text, tooltip=tooltip)
@SafeSlot()
def on_enter(self) -> None:

View File

@@ -25,7 +25,6 @@ from qtpy.QtWidgets import (
QWidget,
)
from bec_widgets import BECWidget
from bec_widgets.cli.rpc.rpc_widget_handler import widget_handler
from bec_widgets.utils.colors import apply_theme
from bec_widgets.utils.widget_io import WidgetHierarchy as wh
@@ -48,6 +47,7 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
self._widgets_by_name: Dict[str, QWidget] = {}
self._init_ui()
self.app = QApplication.instance()
# expose helper API and basics in the inprocess console
if self.console.inprocess is True:
@@ -94,7 +94,7 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
return list(widget_handler.widget_classes.keys())
self.jc = _ConsoleAPI(self)
self._push_to_console({"jc": self.jc, "np": np, "pg": pg, "wh": wh})
self._push_to_console({"jc": self.jc, "np": np, "pg": pg, "wh": wh, "app": self.app})
def _init_ui(self):
self.layout = QHBoxLayout(self)

View File

@@ -5,7 +5,8 @@ import os
import weakref
from abc import ABC, abstractmethod
from contextlib import contextmanager
from typing import Dict, Literal
from enum import Enum
from typing import Dict, Literal, Union
from bec_lib.device import ReadoutPriority
from bec_lib.logger import bec_logger
@@ -15,6 +16,7 @@ from qtpy.QtGui import QAction, QColor, QIcon # type: ignore
from qtpy.QtWidgets import (
QApplication,
QComboBox,
QGraphicsDropShadowEffect,
QHBoxLayout,
QLabel,
QMenu,
@@ -26,6 +28,7 @@ from qtpy.QtWidgets import (
)
import bec_widgets
from bec_widgets.utils.colors import AccentColors, get_accent_colors
from bec_widgets.widgets.control.device_input.base_classes.device_input_base import BECDeviceFilter
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
@@ -101,6 +104,205 @@ class LongPressToolButton(QToolButton):
self.showMenu()
class StatusState(str, Enum):
DEFAULT = "default"
HIGHLIGHT = "highlight"
WARNING = "warning"
EMERGENCY = "emergency"
SUCCESS = "success"
class StatusIndicatorWidget(QWidget):
"""Pill-shaped status indicator with icon + label using accent colors."""
def __init__(
self, parent=None, text: str = "Ready", state: StatusState | str = StatusState.DEFAULT
):
super().__init__(parent)
self.setObjectName("StatusIndicatorWidget")
self._text = text
self._state = self._normalize_state(state)
self._theme_connected = False
layout = QHBoxLayout(self)
layout.setContentsMargins(6, 2, 8, 2)
layout.setSpacing(6)
self._icon_label = QLabel(self)
self._icon_label.setFixedSize(18, 18)
self._text_label = QLabel(self)
self._text_label.setText(self._text)
layout.addWidget(self._icon_label)
layout.addWidget(self._text_label)
# Give it a consistent pill height
self.setMinimumHeight(24)
self.setSizePolicy(QSizePolicy.Policy.Minimum, QSizePolicy.Policy.Fixed)
# Soft shadow similar to notification banners
self._shadow = QGraphicsDropShadowEffect(self)
self._shadow.setBlurRadius(18)
self._shadow.setOffset(0, 2)
self.setGraphicsEffect(self._shadow)
self._apply_state(self._state)
self._connect_theme_change()
def set_state(self, state: Union[StatusState, str]):
"""Update state and refresh visuals."""
self._state = self._normalize_state(state)
self._apply_state(self._state)
def set_text(self, text: str):
"""Update the displayed text."""
self._text = text
self._text_label.setText(text)
def _apply_state(self, state: StatusState):
palette = self._resolve_accent_colors()
color_attr = {
StatusState.DEFAULT: "default",
StatusState.HIGHLIGHT: "highlight",
StatusState.WARNING: "warning",
StatusState.EMERGENCY: "emergency",
StatusState.SUCCESS: "success",
}.get(state, "default")
base_color = getattr(palette, color_attr, None) or getattr(
palette, "default", QColor("gray")
)
# Apply style first (returns text color for label)
text_color = self._update_style(base_color, self._theme_fg_color())
theme_name = self._theme_name()
# Choose icon per state
icon_name_map = {
StatusState.DEFAULT: "check_circle",
StatusState.HIGHLIGHT: "check_circle",
StatusState.SUCCESS: "check_circle",
StatusState.WARNING: "warning",
StatusState.EMERGENCY: "dangerous",
}
icon_name = icon_name_map.get(state, "check_circle")
# Icon color:
# - Dark mode: follow text color (usually white) for high contrast.
# - Light mode: use a stronger version of the accent color for a colored glyph
# that stands out on the pastel pill background.
if theme_name == "light":
icon_q = QColor(base_color)
icon_color = icon_q.name(QColor.HexRgb)
else:
icon_color = text_color
icon = material_icon(
icon_name, size=(18, 18), convert_to_pixmap=False, filled=True, color=icon_color
)
if not icon.isNull():
self._icon_label.setPixmap(icon.pixmap(18, 18))
def _update_style(self, color: QColor, fg_color: QColor) -> str:
# Ensure the widget actually paints its own background
self.setAttribute(Qt.WidgetAttribute.WA_StyledBackground, True)
fg = QColor(fg_color)
text_color = fg.name(QColor.HexRgb)
theme_name = self._theme_name()
base = QColor(color)
start = QColor(base)
end = QColor(base)
border = QColor(base)
if theme_name == "light":
start.setAlphaF(0.20)
end.setAlphaF(0.06)
else:
start.setAlphaF(0.35)
end.setAlphaF(0.12)
border = border.darker(120)
# shadow color tuned per theme to match notification banners
if hasattr(self, "_shadow"):
if theme_name == "light":
shadow_color = QColor(15, 23, 42, 60) # softer shadow on light bg
else:
shadow_color = QColor(0, 0, 0, 160)
self._shadow.setColor(shadow_color)
# Use a fixed radius for a stable pill look inside toolbars
radius = 10
self.setStyleSheet(
f"""
#StatusIndicatorWidget {{
background-color: qlineargradient(spread:pad, x1:0, y1:0, x2:1, y2:1,
stop:0 {start.name(QColor.HexArgb)}, stop:1 {end.name(QColor.HexArgb)});
border: 1px solid {border.name(QColor.HexRgb)};
border-radius: {radius}px;
padding: 2px 8px;
}}
#StatusIndicatorWidget QLabel {{
color: {text_color};
background: transparent;
}}
"""
)
return text_color
def _theme_fg_color(self) -> QColor:
app = QApplication.instance()
theme = getattr(app, "theme", None)
if theme is not None and hasattr(theme, "color"):
try:
fg = theme.color("FG")
if isinstance(fg, QColor):
return fg
except Exception:
pass
palette = self._resolve_accent_colors()
base = getattr(palette, "default", QColor("white"))
luminance = (0.299 * base.red() + 0.587 * base.green() + 0.114 * base.blue()) / 255
return QColor("#000000") if luminance > 0.65 else QColor("#ffffff")
def _theme_name(self) -> str:
app = QApplication.instance()
theme = getattr(app, "theme", None)
name = getattr(theme, "theme", None)
if isinstance(name, str):
return name.lower()
return "dark"
def _connect_theme_change(self):
if self._theme_connected:
return
app = QApplication.instance()
theme = getattr(app, "theme", None)
if theme is not None and hasattr(theme, "theme_changed"):
try:
theme.theme_changed.connect(lambda _: self._apply_state(self._state))
self._theme_connected = True
except Exception:
pass
@staticmethod
def _normalize_state(state: Union[StatusState, str]) -> StatusState:
if isinstance(state, StatusState):
return state
try:
return StatusState(state)
except ValueError:
return StatusState.DEFAULT
@staticmethod
def _resolve_accent_colors() -> AccentColors:
return get_accent_colors()
class ToolBarAction(ABC):
"""
Abstract base class for toolbar actions.
@@ -147,6 +349,54 @@ class SeparatorAction(ToolBarAction):
toolbar.addSeparator()
class StatusIndicatorAction(ToolBarAction):
"""Toolbar action hosting a LED indicator and status text."""
def __init__(
self,
*,
text: str = "Ready",
state: Union[StatusState, str] = StatusState.DEFAULT,
tooltip: str | None = None,
):
super().__init__(icon_path=None, tooltip=tooltip or "View status", checkable=False)
self._text = text
self._state: StatusState = StatusIndicatorWidget._normalize_state(state)
self.widget: StatusIndicatorWidget | None = None
self.tooltip = tooltip or ""
def add_to_toolbar(self, toolbar: QToolBar, target: QWidget):
if (
self.widget is None
or self.widget.parent() is None
or self.widget.parent() is not toolbar
):
self.widget = StatusIndicatorWidget(parent=toolbar, text=self._text, state=self._state)
self.action = toolbar.addWidget(self.widget)
self.action.setText(self._text)
self.set_tooltip(self.tooltip)
def set_state(self, state: Union[StatusState, str]):
self._state = StatusIndicatorWidget._normalize_state(state)
if self.widget is not None:
self.widget.set_state(self._state)
def set_text(self, text: str):
self._text = text
if self.widget is not None:
self.widget.set_text(text)
if hasattr(self, "action") and self.action is not None:
self.action.setText(text)
def set_tooltip(self, tooltip: str | None):
"""Set tooltip on both the underlying widget and the QWidgetAction."""
self.tooltip = tooltip or ""
if self.widget is not None:
self.widget.setToolTip(self.tooltip)
if hasattr(self, "action") and self.action is not None:
self.action.setToolTip(self.tooltip)
class QtIconAction(IconAction):
def __init__(
self,

View File

@@ -0,0 +1,283 @@
from __future__ import annotations
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.messages import BeamlineConditionUpdateEntry
from qtpy.QtCore import QObject, QTimer, Signal
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.toolbars.actions import StatusIndicatorAction, StatusState
from bec_widgets.utils.toolbars.toolbar import ModularToolBar
logger = bec_logger.logger
class BECStatusBroker(BECConnector, QObject):
"""Listen to BEC beamline condition endpoints and emit structured signals."""
_instance: "BECStatusBroker | None" = None
_initialized: bool = False
available_updated = Signal(list) # list of conditions available
status_updated = Signal(str, dict) # name, status update
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self, parent=None, gui_id: str | None = None, client=None, **kwargs):
if self._initialized:
return
super().__init__(parent=parent, gui_id=gui_id, client=client, **kwargs)
self._watched: set[str] = set()
self.bec_dispatcher.connect_slot(
self.on_available, MessageEndpoints.available_beamline_conditions()
)
self._initialized = True
self.refresh_available()
def refresh_available(self):
"""Fetch the current set of beamline conditions once."""
try:
msg = self.client.connector.get_last(MessageEndpoints.available_beamline_conditions())
logger.info(f"StatusBroker: fetched available conditions payload: {msg}")
if msg:
self.on_available(msg.get("data").content, None)
except Exception as exc: # pragma: no cover - runtime env
logger.debug(f"Could not fetch available conditions: {exc}")
@SafeSlot(dict, dict)
def on_available(self, data: dict, meta: dict | None = None):
condition_list = data.get("conditions") # latest one from the stream
self.available_updated.emit(condition_list)
for condition in condition_list:
name = condition.name
if name:
self.watch_condition(name)
def watch_condition(self, name: str):
"""Subscribe to updates for a single beamline condition."""
if name in self._watched:
return
self._watched.add(name)
endpoint = MessageEndpoints.beamline_condition(name)
logger.info(f"StatusBroker: watching condition '{name}' on {endpoint.endpoint}")
self.bec_dispatcher.connect_slot(self.on_condition, endpoint)
self.fetch_condition(name)
def fetch_condition(self, name: str):
"""Fetch the current value of a beamline condition once."""
endpoint = MessageEndpoints.beamline_condition(name)
try:
msg = self.client.connector.get_last(endpoint)
logger.info(f"StatusBroker: fetched condition '{name}' payload: {msg}")
if msg:
self.on_condition(msg.get("data").content, None)
except Exception as exc: # pragma: no cover - runtime env
logger.debug(f"Could not fetch condition {name}: {exc}")
@SafeSlot(dict, dict)
def on_condition(self, data: dict, meta: dict | None = None):
name = data.get("name")
if not name:
return
logger.info(f"StatusBroker: condition update for '{name}' -> {data}")
self.status_updated.emit(str(name), data)
@classmethod
def reset_singleton(cls):
"""
Reset the singleton instance of the BECStatusBroker.
"""
cls._instance = None
cls._initialized = False
class StatusToolBar(ModularToolBar):
"""Status toolbar that auto-manages beamline condition indicators."""
STATUS_MAP: dict[str, StatusState] = {
"normal": StatusState.SUCCESS,
"warning": StatusState.WARNING,
"alarm": StatusState.EMERGENCY,
}
def __init__(self, parent=None, names: list[str] | None = None, **kwargs):
super().__init__(parent=parent, orientation="horizontal", **kwargs)
self.setObjectName("StatusToolbar")
self._status_bundle = self.new_bundle("status")
self.show_bundles(["status"])
self._apply_status_toolbar_style()
self.allowed_names: set[str] | None = set(names) if names is not None else None
logger.info(f"StatusToolbar init allowed_names={self.allowed_names}")
self.broker = BECStatusBroker()
self.broker.available_updated.connect(self.on_available_updated)
self.broker.status_updated.connect(self.on_status_updated)
QTimer.singleShot(0, self.refresh_from_broker)
def refresh_from_broker(self) -> None:
if self.allowed_names is None:
self.broker.refresh_available()
else:
for name in self.allowed_names:
if not self.components.exists(name):
# Pre-create a placeholder pill so it is visible even before data arrives.
self.add_status_item(
name=name, text=name, state=StatusState.DEFAULT, tooltip=None
)
self.broker.watch_condition(name)
def _apply_status_toolbar_style(self) -> None:
self.setStyleSheet(
"QToolBar#StatusToolbar {"
f" background-color: {self.background_color};"
" border: none;"
" border-bottom: 1px solid palette(mid);"
"}"
)
# -------- Slots for updates --------
@SafeSlot(list)
def on_available_updated(self, available_conditions: list):
"""Process the available conditions stream and start watching them."""
# Keep track of current names from the broker to remove stale ones.
current_names: set[str] = set()
for condition in available_conditions:
if not isinstance(condition, BeamlineConditionUpdateEntry):
continue
name = condition.name
title = condition.title or name
if not name:
continue
current_names.add(name)
logger.info(f"StatusToolbar: discovered condition '{name}' title='{title}'")
# auto-add unless filtered out
if self.allowed_names is None or name in self.allowed_names:
self.add_status_item(name=name, text=title, state=StatusState.DEFAULT, tooltip=None)
else:
# keep hidden but present for context menu toggling
self.add_status_item(name=name, text=title, state=StatusState.DEFAULT, tooltip=None)
act = self.components.get_action(name)
if act and act.action:
act.action.setVisible(False)
# Remove actions that are no longer present in available_conditions.
known_actions = [
n for n in self.components._components.keys() if n not in ("separator",)
] # direct access used for clean-up
for name in known_actions:
if name not in current_names:
logger.info(f"StatusToolbar: removing stale condition '{name}'")
try:
self.components.remove_action(name)
except Exception as exc:
logger.warning(f"Failed to remove stale condition '{name}': {exc}")
self.refresh()
@SafeSlot(str, dict)
def on_status_updated(self, name: str, payload: dict): # TODO finish update logic
"""Update a status pill when a condition update arrives."""
state = self.STATUS_MAP.get(str(payload.get("status", "")).lower(), StatusState.DEFAULT)
action = self.components.get_action(name) if self.components.exists(name) else None
# Only update the label when a title is explicitly provided; otherwise keep current text.
title = payload.get("title") or None
text = title
if text is None and action is None:
text = payload.get("name") or name
if "message" in payload:
tooltip = payload.get("message") or ""
else:
tooltip = None
logger.info(
f"StatusToolbar: update condition '{name}' -> state={state} text='{text}' tooltip='{tooltip}'"
)
self.set_status(name=name, text=text, state=state, tooltip=tooltip)
# -------- Items Management --------
def add_status_item(
self,
name: str,
*,
text: str = "Ready",
state: StatusState | str = StatusState.DEFAULT,
tooltip: str | None = None,
) -> StatusIndicatorAction | None:
"""
Add or update a named status item in the toolbar.
After you added all actions, call `toolbar.refresh()` to update the display.
Args:
name(str): Unique name for the status item.
text(str): Text to display in the status item.
state(StatusState | str): State of the status item.
tooltip(str | None): Optional tooltip for the status item.
Returns:
StatusIndicatorAction | None: The created or updated status action, or None if toolbar is not initialized.
"""
if self._status_bundle is None:
return
if self.components.exists(name):
return
action = StatusIndicatorAction(text=text, state=state, tooltip=tooltip)
return self.add_status_action(name, action)
def add_status_action(
self, name: str, action: StatusIndicatorAction
) -> StatusIndicatorAction | None:
"""
Attach an existing StatusIndicatorAction to the status toolbar.
After you added all actions, call `toolbar.refresh()` to update the display.
Args:
name(str): Unique name for the status item.
action(StatusIndicatorAction): The status action to add.
Returns:
StatusIndicatorAction | None: The added status action, or None if toolbar is not initialized.
"""
self.components.add_safe(name, action)
self.get_bundle("status").add_action(name)
self.refresh()
self.broker.fetch_condition(name)
return action
def set_status(
self,
name: str = "main",
*,
state: StatusState | str | None = None,
text: str | None = None,
tooltip: str | None = None,
) -> None:
"""
Update the status item with the given name, creating it if necessary.
Args:
name(str): Unique name for the status item.
state(StatusState | str | None): New state for the status item.
text(str | None): New text for the status item.
"""
action = self.components.get_action(name) if self.components.exists(name) else None
if action is None:
action = self.add_status_item(
name, text=text or "Ready", state=state or "default", tooltip=tooltip
)
if action is None:
return
if state is not None:
action.set_state(state)
if text is not None:
action.set_text(text)
if tooltip is not None and hasattr(action, "set_tooltip"):
action.set_tooltip(tooltip)

View File

@@ -6,7 +6,6 @@ from qtpy.QtGui import QIcon
from qtpy.QtWidgets import QWidget
from bec_widgets.utils.bec_designer import designer_material_icon
from bec_widgets.utils.colors import apply_theme
from bec_widgets.widgets.containers.main_window.main_window import BECMainWindow
DOM_XML = """
@@ -45,8 +44,6 @@ class BECMainWindowPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
from qtpy.QtCore import Qt
from qtpy.QtWidgets import QApplication
apply_theme("dark")
import bec_widgets
MODULE_PATH = os.path.dirname(bec_widgets.__file__)

View File

@@ -22,6 +22,7 @@ from bec_widgets.utils import UILoader
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.colors import apply_theme
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.toolbars.status_bar import StatusToolBar
from bec_widgets.widgets.containers.main_window.addons.hover_widget import HoverWidget
from bec_widgets.widgets.containers.main_window.addons.notification_center.notification_banner import (
BECNotificationBroker,
@@ -114,14 +115,11 @@ class BECMainWindow(BECWidget, QMainWindow):
Prepare the BEC specific widgets in the status bar.
"""
# Left: AppID label
self._app_id_label = QLabel()
self._app_id_label.setAlignment(
Qt.AlignmentFlag.AlignHCenter | Qt.AlignmentFlag.AlignVCenter
)
self.status_bar.addWidget(self._app_id_label)
# Left: Beamline condition status toolbar (auto-fetches all conditions)
self._status_toolbar = StatusToolBar(parent=self, names=None)
self.status_bar.addWidget(self._status_toolbar)
# Add a separator after the app ID label
# Add a separator after the status toolbar
self._add_separator()
# Centre: Clientinfo label (stretch=1 so it expands)
@@ -394,13 +392,17 @@ class BECMainWindow(BECWidget, QMainWindow):
help_menu.addAction(bec_docs)
help_menu.addAction(widgets_docs)
help_menu.addAction(bug_report)
help_menu.addSeparator()
self._app_id_action = QAction(self)
self._app_id_action.setEnabled(False)
help_menu.addAction(self._app_id_action)
################################################################################
# Status Bar Addons
################################################################################
def display_app_id(self):
"""
Display the app ID in the status bar.
Display the app ID in the Help menu.
"""
if self.bec_dispatcher.cli_server is None:
status_message = "Not connected"
@@ -408,7 +410,8 @@ class BECMainWindow(BECWidget, QMainWindow):
# Get the server ID from the dispatcher
server_id = self.bec_dispatcher.cli_server.gui_id
status_message = f"App ID: {server_id}"
self._app_id_label.setText(status_message)
if hasattr(self, "_app_id_action"):
self._app_id_action.setText(status_message)
@SafeSlot(dict, dict)
def display_client_message(self, msg: dict, meta: dict):

View File

@@ -1,7 +1,6 @@
from __future__ import annotations
import json
from dataclasses import dataclass
from typing import Literal
import numpy as np
@@ -9,7 +8,7 @@ import pyqtgraph as pg
from bec_lib import bec_logger, messages
from bec_lib.endpoints import MessageEndpoints
from pydantic import BaseModel, Field, field_validator
from qtpy.QtCore import QObject, Qt, QThread, QTimer, Signal
from qtpy.QtCore import QTimer, Signal
from qtpy.QtGui import QTransform
from scipy.interpolate import (
CloughTocher2DInterpolator,
@@ -78,90 +77,6 @@ class HeatmapConfig(ConnectionConfig):
_validate_color_palette = field_validator("color_map")(Colors.validate_color_map)
@dataclass
class _InterpolationRequest:
"""Immutable payload describing an interpolation request for the worker thread.
Args:
x_data: X coordinates collected so far.
y_data: Y coordinates collected so far.
z_data: Z values associated with x/y.
data_version: Number of points at request time (len(z_data)); used to reject stale results.
scan_id: Identifier for the scan that produced the data.
interpolation: Interpolation method to apply.
oversampling_factor: Oversampling factor for the interpolation grid.
"""
x_data: list[float]
y_data: list[float]
z_data: list[float]
data_version: int
scan_id: str
interpolation: str
oversampling_factor: float
class _StepInterpolationWorker(QObject):
"""Worker for performing step-scan interpolation in a background thread.
This worker computes the interpolated heatmap image using the provided data
and settings, then emits the result or a failure signal.
Signals:
finished(image, transform, data_version, scan_id):
Emitted when interpolation is successful.
- image: The resulting image (numpy array or similar).
- transform: The QTransform for the image.
- data_version: The data version for the request.
- scan_id: The scan identifier.
failed(error_message, data_version, scan_id):
Emitted when interpolation fails.
- error_message: The error message string.
- data_version: The data version for the request.
- scan_id: The scan identifier.
"""
finished = Signal(object, object, int, str)
failed = Signal(str, int, str)
def __init__(self, parent: QObject | None = None):
super().__init__(parent=parent)
self._active_request: _InterpolationRequest | None = None
self._processing = False
@property
def is_processing(self) -> bool:
"""Return whether the worker is currently processing a request."""
return self._processing
@SafeSlot(object, int)
def process(self, request: _InterpolationRequest, data_version: int):
"""
Process an interpolation request in the worker thread.
Args:
request(_InterpolationRequest): The interpolation request payload.
data_version(int): The data version for the request.
"""
self._active_request = request
self._processing = True
try:
image, transform = Heatmap.compute_step_scan_image(
x_data=np.asarray(request.x_data, dtype=float),
y_data=np.asarray(request.y_data, dtype=float),
z_data=np.asarray(request.z_data, dtype=float),
oversampling_factor=request.oversampling_factor,
interpolation_method=request.interpolation,
)
except Exception as exc: # pragma: no cover - defensive
logger.warning(f"Step-scan interpolation failed with: {exc}")
self.failed.emit(str(exc), data_version, request.scan_id)
self._processing = False
return
self._processing = False
self.finished.emit(image, transform, data_version, request.scan_id)
class Heatmap(ImageBase):
"""
Heatmap widget for visualizing 2d grid data with color mapping for the z-axis.
@@ -248,7 +163,6 @@ class Heatmap(ImageBase):
new_scan_id = Signal(str)
sync_signal_update = Signal()
heatmap_property_changed = Signal()
interpolation_requested = Signal(object, int)
def __init__(self, parent=None, config: HeatmapConfig | None = None, **kwargs):
if config is None:
@@ -271,12 +185,6 @@ class Heatmap(ImageBase):
self.scan_item = None
self.status_message = None
self._grid_index = None
# Highest data_version we have dispatched for the current scan; used to drop stale results.
# Initialized to -1 so the first real request (len(z_data) >= 0) always supersedes it.
self._latest_interpolation_version = -1
self._interpolation_thread: QThread | None = None
self._interpolation_worker: _StepInterpolationWorker | None = None
self._pending_interpolation_request: _InterpolationRequest | None = None
self.heatmap_dialog = None
bg_color = pg.mkColor((240, 240, 240, 150))
self.config_label = pg.LegendItem(
@@ -536,7 +444,6 @@ class Heatmap(ImageBase):
if current_scan_id is None:
return
if current_scan_id != self.scan_id:
self._invalidate_interpolation_generation() # Invalidate any pending interpolation work when a new scan starts
self.reset()
self.new_scan.emit()
self.new_scan_id.emit(current_scan_id)
@@ -642,38 +549,13 @@ class Heatmap(ImageBase):
if self._image_config.show_config_label:
self.redraw_config_label()
if self._is_grid_scan_supported(scan_msg):
img, transform = self.get_grid_scan_image(z_data, scan_msg)
self._apply_image_update(img, transform)
return
if len(z_data) < 4:
# LinearNDInterpolator requires at least 4 points to interpolate
logger.warning("Not enough data points to interpolate; skipping update.")
return
self._request_step_scan_interpolation(x_data, y_data, z_data, scan_msg)
def _apply_image_update(self, img: np.ndarray | None, transform: QTransform | None):
"""Apply interpolated image and transform to the heatmap display.
This method updates the main image with the computed data and emits
the image_updated signal. Color bar signals are temporarily blocked
during the update to prevent cascading updates.
Args:
img(np.ndarray): The interpolated image data, or None if unavailable
transform(QTransform): QTransform mapping pixel to world coordinates, or None if unavailable
"""
if img is None or transform is None:
img, transform = self.get_image_data(x_data=x_data, y_data=y_data, z_data=z_data)
if img is None:
logger.warning("Image data is None; skipping update.")
return
if self._color_bar is not None:
self._color_bar.blockSignals(True)
if self.main_image is None:
logger.warning("Main image item is None; cannot update image.")
return
self.main_image.set_data(img, transform=transform)
if self._color_bar is not None:
self._color_bar.blockSignals(False)
@@ -681,128 +563,6 @@ class Heatmap(ImageBase):
if self.crosshair is not None:
self.crosshair.update_markers_on_image_change()
def _request_step_scan_interpolation(
self,
x_data: list[float],
y_data: list[float],
z_data: list[float],
msg: messages.ScanStatusMessage,
):
"""Request step-scan interpolation in a background thread.
If a thread is already running, the request is queued as a pending request
and will be processed when the current interpolation completes.
Args:
x_data(list[float]): X coordinates of data points
y_data(list[float]): Y coordinates of data points
z_data(list[float]): Z values at each point
msg(messages.ScanStatusMessage): Scan status message containing scan metadata
"""
request = _InterpolationRequest(
x_data=list(x_data),
y_data=list(y_data),
z_data=list(z_data),
data_version=len(z_data),
scan_id=msg.scan_id,
interpolation=self._image_config.interpolation,
oversampling_factor=self._image_config.oversampling_factor,
)
if self._interpolation_worker is not None and self._interpolation_worker.is_processing:
self._pending_interpolation_request = request
return
self._start_step_scan_interpolation(request)
def _ensure_interpolation_thread(self):
if self._interpolation_thread is None:
self._interpolation_thread = QThread()
self._interpolation_worker = _StepInterpolationWorker()
self._interpolation_worker.moveToThread(self._interpolation_thread)
self.interpolation_requested.connect(
self._interpolation_worker.process, Qt.ConnectionType.QueuedConnection
)
self._interpolation_worker.finished.connect(
self._on_interpolation_finished, Qt.ConnectionType.QueuedConnection
)
self._interpolation_worker.failed.connect(
self._on_interpolation_failed, Qt.ConnectionType.QueuedConnection
)
if self._interpolation_thread is not None and not self._interpolation_thread.isRunning():
self._interpolation_thread.start()
def _start_step_scan_interpolation(self, request: _InterpolationRequest):
# data_version = len(z_data) at the time of the request; keep the latest to gate results.
self._ensure_interpolation_thread()
if self._interpolation_thread is not None and not self._interpolation_thread.isRunning():
self._interpolation_thread.start()
self._latest_interpolation_version = request.data_version
self.interpolation_requested.emit(request, request.data_version)
def _on_interpolation_finished(
self, img: np.ndarray, transform: QTransform, data_version: int, scan_id: str
):
# Only accept results that match the latest dispatched version for the active scan.
if data_version == self._latest_interpolation_version and scan_id == self.scan_id:
self._apply_image_update(img, transform)
else:
logger.info("Discarding outdated interpolation result.")
self._maybe_start_pending_interpolation()
def _on_interpolation_failed(self, error: str, data_version: int, scan_id: str):
logger.warning(f"Interpolation failed for scan {scan_id} (version {data_version}): {error}")
self._maybe_start_pending_interpolation()
def _finish_interpolation_thread(self):
self._pending_interpolation_request = None
if self._interpolation_worker is not None:
try:
self.interpolation_requested.disconnect(self._interpolation_worker.process)
except (TypeError, RuntimeError) as ext:
logger.warning(f"Processing thread already disconnected: {ext}")
pass
self._interpolation_worker.deleteLater()
self._interpolation_worker = None
if self._interpolation_thread is not None:
if self._interpolation_thread.isRunning():
self._interpolation_thread.quit()
if not self._interpolation_thread.wait(3000): # 3s timeout
logger.error(
f"Interpolation thread of widget {self.gui_id} did not stop within timeout 3s; leaving it dangling."
)
self._interpolation_thread.deleteLater()
self._interpolation_thread = None
logger.info(f"Interpolation thread finished of widget {self.gui_id}")
def _maybe_start_pending_interpolation(self):
if self._pending_interpolation_request is None:
return
if self._pending_interpolation_request.scan_id != self.scan_id:
self._pending_interpolation_request = None
return
if self._interpolation_worker is not None and self._interpolation_worker.is_processing:
return
pending = self._pending_interpolation_request
self._pending_interpolation_request = None
self._start_step_scan_interpolation(pending)
def _cancel_interpolation(self):
"""Cancel any pending interpolation request without invalidating in-flight work.
This clears the pending request queue but does not invalidate in-flight work,
allowing any currently running interpolation to complete and update the display
if it matches the current scan.
"""
self._pending_interpolation_request = None
# Do not change the active data version so an in-flight worker can still deliver.
def _invalidate_interpolation_generation(self):
"""Invalidate all pending interpolation results and ignore in-flight updates."""
self._pending_interpolation_request = None
self._latest_interpolation_version = -1
def redraw_config_label(self):
scan_msg = self.status_message
if scan_msg is None:
@@ -848,35 +608,21 @@ class Heatmap(ImageBase):
logger.warning("x, y, or z data is None; skipping update.")
return None, None
if self._is_grid_scan_supported(msg):
return self.get_grid_scan_image(z_data, msg)
if msg.scan_name == "grid_scan" and not self._image_config.enforce_interpolation:
# We only support the grid scan mode if both scanning motors
# are configured in the heatmap config.
device_x = self._image_config.x_device.entry
device_y = self._image_config.y_device.entry
if (
device_x in msg.request_inputs["arg_bundle"]
and device_y in msg.request_inputs["arg_bundle"]
):
return self.get_grid_scan_image(z_data, msg)
if len(z_data) < 4:
# LinearNDInterpolator requires at least 4 points to interpolate
return None, None
return self.get_step_scan_image(x_data, y_data, z_data, msg)
def _is_grid_scan_supported(self, msg: messages.ScanStatusMessage) -> bool:
"""Check if the scan can use optimized grid_scan rendering.
Grid scans can avoid interpolation if both X and Y devices match
the configured devices and interpolation is not enforced.
Args:
msg(messages.ScanStatusMessage): Scan status message containing scan metadata
Returns:
True if grid_scan optimization is applicable, False otherwise
"""
if msg.scan_name != "grid_scan" or self._image_config.enforce_interpolation:
return False
device_x = self._image_config.x_device.entry
device_y = self._image_config.y_device.entry
return (
device_x in msg.request_inputs["arg_bundle"]
and device_y in msg.request_inputs["arg_bundle"]
)
def get_grid_scan_image(
self, z_data: list[float], msg: messages.ScanStatusMessage
) -> tuple[np.ndarray, QTransform]:
@@ -971,49 +717,17 @@ class Heatmap(ImageBase):
Returns:
tuple[np.ndarray, QTransform]: The image data and the QTransform.
"""
return self.compute_step_scan_image(
x_data=x_data,
y_data=y_data,
z_data=z_data,
oversampling_factor=self._image_config.oversampling_factor,
interpolation_method=self._image_config.interpolation,
)
@staticmethod
def compute_step_scan_image(
x_data: list[float] | np.ndarray,
y_data: list[float] | np.ndarray,
z_data: list[float] | np.ndarray,
oversampling_factor: float,
interpolation_method: str,
) -> tuple[np.ndarray, QTransform]:
"""Compute interpolated heatmap image from step-scan data.
This static method is suitable for execution in a background thread
as it doesn't access any instance state.
Args:
x_data(list[float]): X coordinates of data points
y_data(list[float]): Y coordinates of data points
z_data(list[float]): Z values at each point
oversampling_factor(float): Grid resolution multiplier (>1.0 for higher resolution)
interpolation_method(str): One of 'linear', 'nearest', or 'clough'
Returns:
(tuple[np.ndarray, QTransform]):Tuple of (interpolated_grid, transform) where transform maps pixel to world coordinates
"""
xy_data = np.column_stack((x_data, y_data))
grid_x, grid_y, transform = Heatmap.build_image_grid(
positions=xy_data, oversampling_factor=oversampling_factor
)
grid_x, grid_y, transform = self.get_image_grid(xy_data)
if interpolation_method == "linear":
# Interpolate the z data onto the grid
if self._image_config.interpolation == "linear":
interp = LinearNDInterpolator(xy_data, z_data)
elif interpolation_method == "nearest":
elif self._image_config.interpolation == "nearest":
interp = NearestNDInterpolator(xy_data, z_data)
elif interpolation_method == "clough":
elif self._image_config.interpolation == "clough":
interp = CloughTocher2DInterpolator(xy_data, z_data)
else: # pragma: no cover - guarded by validation
else:
raise ValueError(
"Interpolation method must be either 'linear', 'nearest', or 'clough'."
)
@@ -1032,33 +746,22 @@ class Heatmap(ImageBase):
Returns:
tuple[np.ndarray, np.ndarray, QTransform]: The grid x and y coordinates and the QTransform.
"""
return self.build_image_grid(
positions=positions, oversampling_factor=self._image_config.oversampling_factor
)
base_width, base_height = self.estimate_image_resolution(positions)
@staticmethod
def build_image_grid(
positions: np.ndarray, oversampling_factor: float
) -> tuple[np.ndarray, np.ndarray, QTransform]:
"""Build an interpolation grid covering the data positions.
# Apply oversampling factor
factor = self._image_config.oversampling_factor
Args:
positions: (N, 2) array of (x, y) coordinates
oversampling_factor: Grid resolution multiplier (>1.0 for higher resolution)
Returns:
Tuple of (grid_x, grid_y, transform) where grid_x/grid_y are meshgrids
for interpolation and transform maps pixel to world coordinates
"""
base_width, base_height = Heatmap.estimate_image_resolution(positions)
width = max(1, int(base_width * oversampling_factor))
height = max(1, int(base_height * oversampling_factor))
# Apply oversampling
width = int(base_width * factor)
height = int(base_height * factor)
# Create grid
grid_x, grid_y = np.mgrid[
min(positions[:, 0]) : max(positions[:, 0]) : width * 1j,
min(positions[:, 1]) : max(positions[:, 1]) : height * 1j,
]
# Calculate transform
x_min, x_max = min(positions[:, 0]), max(positions[:, 0])
y_min, y_max = min(positions[:, 1]), max(positions[:, 1])
x_range = x_max - x_min
@@ -1142,7 +845,6 @@ class Heatmap(ImageBase):
return scan_devices, "value"
def reset(self):
self._cancel_interpolation()
self._grid_index = None
self.main_image.clear()
if self.crosshair is not None:
@@ -1277,10 +979,6 @@ class Heatmap(ImageBase):
"""
self.main_image.transpose = enable
def cleanup(self):
self._finish_interpolation_thread()
super().cleanup()
if __name__ == "__main__": # pragma: no cover
import sys

View File

@@ -47,9 +47,7 @@ class DeviceBrowser(BECWidget, QWidget):
) -> None:
super().__init__(parent=parent, client=client, gui_id=gui_id, config=config, **kwargs)
self.get_bec_shortcuts()
self._config_helper = ConfigHelper(
self.client.connector, self.client._service_name, self.client.device_manager
)
self._config_helper = ConfigHelper(self.client.connector, self.client._service_name)
self._q_threadpool = QThreadPool()
self.ui = None
self.init_ui()

View File

@@ -229,7 +229,7 @@ class DirectUpdateDeviceConfigDialog(BECWidget, DeviceConfigDialog):
self._device = device
self._q_threadpool = threadpool or QThreadPool()
self._config_helper = config_helper or ConfigHelper(
self.client.connector, self.client._service_name, self.client.device_manager
self.client.connector, self.client._service_name
)
super().__init__(parent=parent, **kwargs)
self.get_bec_shortcuts()

View File

@@ -78,15 +78,15 @@ def test_available_widgets(qtbot, connected_client_gui_obj):
"""This test checks that all widgets that are available via gui.available_widgets can be created and removed."""
gui = connected_client_gui_obj
dock_area = gui.bec
# Number of top level widgets, should be 4
top_level_widgets_count = 12
# Number of top level widgets, should be 5
top_level_widgets_count = 13
assert len(gui._server_registry) == top_level_widgets_count
names = set(list(gui._server_registry.keys()))
# Number of widgets with parent_id == None, should be 2
# Number of widgets with parent_id == None, should be 3
widgets = [
widget for widget in gui._server_registry.values() if widget["config"]["parent_id"] is None
]
assert len(widgets) == 2
assert len(widgets) == 3
# Test all relevant widgets
for object_name in gui.available_widgets.__dict__:
@@ -122,7 +122,7 @@ def test_available_widgets(qtbot, connected_client_gui_obj):
for widget in gui._server_registry.values()
if widget["config"]["parent_id"] is None
]
assert len(widgets) == 2
assert len(widgets) == 3
#############################
####### Remove widget #######
@@ -154,10 +154,10 @@ def test_available_widgets(qtbot, connected_client_gui_obj):
f"is {len(gui._server_registry)} instead of {top_level_widgets_count}. The following "
f"widgets are not cleaned up: {set(gui._server_registry.keys()) - names}"
) from exc
# Number of widgets with parent_id == None, should be 2
# Number of widgets with parent_id == None, should be 3
widgets = [
widget
for widget in gui._server_registry.values()
if widget["config"]["parent_id"] is None
]
assert len(widgets) == 2
assert len(widgets) == 3

View File

@@ -134,7 +134,7 @@ def test_update_cycle(update_dialog, qtbot):
"deviceClass": "TestDevice",
"deviceConfig": {"param1": "val1"},
"readoutPriority": "monitored",
"description": "",
"description": None,
"readOnly": False,
"softwareTrigger": False,
"onFailure": "retry",

View File

@@ -4,15 +4,8 @@ import numpy as np
import pytest
from bec_lib import messages
from bec_lib.scan_history import ScanHistory
from qtpy.QtGui import QTransform
from bec_widgets.widgets.plots.heatmap.heatmap import (
Heatmap,
HeatmapConfig,
HeatmapDeviceSignal,
_InterpolationRequest,
_StepInterpolationWorker,
)
from bec_widgets.widgets.plots.heatmap.heatmap import Heatmap, HeatmapConfig, HeatmapDeviceSignal
# pytest: disable=unused-import
from tests.unit_tests.client_mocks import mocked_client
@@ -341,16 +334,12 @@ def test_heatmap_widget_reset(heatmap_widget):
"""
Test that the reset method clears the plot.
"""
heatmap_widget._pending_interpolation_request = object()
heatmap_widget._latest_interpolation_version = 5
heatmap_widget.scan_item = create_dummy_scan_item()
heatmap_widget.plot(x_name="samx", y_name="samy", z_name="bpm4i")
heatmap_widget.reset()
assert heatmap_widget._grid_index is None
assert heatmap_widget.main_image.raw_data is None
assert heatmap_widget._pending_interpolation_request is None
assert heatmap_widget._latest_interpolation_version == 5
def test_heatmap_widget_update_plot_with_scan_history(heatmap_widget, grid_scan_history_msg, qtbot):
@@ -375,111 +364,3 @@ def test_heatmap_widget_update_plot_with_scan_history(heatmap_widget, grid_scan_
heatmap_widget.enforce_interpolation = True
heatmap_widget.oversampling_factor = 2.0
qtbot.waitUntil(lambda: heatmap_widget.main_image.raw_data.shape == (20, 20))
def test_step_interpolation_worker_emits_finished(qtbot):
worker = _StepInterpolationWorker()
request = _InterpolationRequest(
x_data=[0.0, 1.0, 0.5, 0.2],
y_data=[0.0, 0.0, 1.0, 1.0],
z_data=[1.0, 2.0, 3.0, 4.0],
data_version=4,
scan_id="scan-1",
interpolation="linear",
oversampling_factor=1.0,
)
with qtbot.waitSignal(worker.finished, timeout=1000) as blocker:
worker.process(request, request.data_version)
img, transform, data_version, scan_id = blocker.args
assert img.shape[0] > 0
assert isinstance(transform, QTransform)
assert data_version == request.data_version
assert scan_id == request.scan_id
def test_step_interpolation_worker_emits_failed(qtbot, monkeypatch):
def _scan_goes_boom(**kwargs):
raise RuntimeError("crash")
monkeypatch.setattr(
"bec_widgets.widgets.plots.heatmap.heatmap.Heatmap.compute_step_scan_image", _scan_goes_boom
)
worker = _StepInterpolationWorker()
request = _InterpolationRequest(
x_data=[0.0, 1.0, 0.5, 0.2],
y_data=[0.0, 0.0, 1.0, 1.0],
z_data=[1.0, 2.0, 3.0, 4.0],
data_version=99,
scan_id="scan-err",
interpolation="linear",
oversampling_factor=1.0,
)
with qtbot.waitSignal(worker.failed, timeout=1000) as blocker:
worker.process(request, request.data_version)
error, data_version, scan_id = blocker.args
assert "crash" in error
assert data_version == request.data_version
assert scan_id == request.scan_id
def test_interpolation_generation_invalidation(heatmap_widget):
heatmap_widget.scan_id = "scan-1"
heatmap_widget._latest_interpolation_version = 2
with (
mock.patch.object(heatmap_widget, "_apply_image_update") as apply_mock,
mock.patch.object(heatmap_widget, "_maybe_start_pending_interpolation") as maybe_mock,
):
heatmap_widget._on_interpolation_finished(
np.zeros((2, 2)), QTransform(), data_version=1, scan_id="scan-1"
)
apply_mock.assert_not_called()
maybe_mock.assert_called_once()
def test_pending_request_queueing_and_start(heatmap_widget):
heatmap_widget.scan_id = "scan-queue"
heatmap_widget.status_message = messages.ScanStatusMessage(
scan_id="scan-queue",
status="open",
scan_name="step_scan",
scan_type="step",
metadata={},
info={"positions": [[0, 0], [1, 1], [2, 2], [3, 3]]},
)
# Simulate an active worker processing a job so new requests are queued.
heatmap_widget._interpolation_worker = mock.MagicMock()
heatmap_widget._interpolation_worker.is_processing = True
with mock.patch.object(heatmap_widget, "_start_step_scan_interpolation") as start_mock:
heatmap_widget._request_step_scan_interpolation(
x_data=[0, 1, 2, 3],
y_data=[0, 1, 2, 3],
z_data=[0, 1, 2, 3],
msg=heatmap_widget.status_message,
)
assert heatmap_widget._pending_interpolation_request is not None
# Now simulate worker finished and thread cleaned up
heatmap_widget._interpolation_worker.is_processing = False
pending = heatmap_widget._pending_interpolation_request
heatmap_widget._pending_interpolation_request = pending
heatmap_widget._maybe_start_pending_interpolation()
start_mock.assert_called_once()
def test_finish_interpolation_thread_cleans_references(heatmap_widget):
worker_mock = mock.Mock()
thread_mock = mock.Mock()
thread_mock.isRunning.return_value = True
heatmap_widget._interpolation_worker = worker_mock
heatmap_widget._interpolation_thread = thread_mock
heatmap_widget._finish_interpolation_thread()
worker_mock.deleteLater.assert_called_once()
thread_mock.quit.assert_called_once()
thread_mock.wait.assert_called_once()
thread_mock.deleteLater.assert_called_once()
assert heatmap_widget._interpolation_worker is None
assert heatmap_widget._interpolation_thread is None

View File

@@ -0,0 +1,97 @@
from __future__ import annotations
import pytest
from bec_lib.messages import BeamlineConditionUpdateEntry
from qtpy.QtWidgets import QToolBar
from bec_widgets.utils.toolbars.actions import StatusIndicatorAction, StatusIndicatorWidget
from bec_widgets.utils.toolbars.status_bar import BECStatusBroker, StatusToolBar
from .client_mocks import mocked_client
from .conftest import create_widget
class TestStatusIndicators:
"""Widget/action level tests independent of broker wiring."""
def test_indicator_widget_state_and_text(self, qtbot):
widget = StatusIndicatorWidget(text="Ready", state="success")
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
widget.set_state("warning")
widget.set_text("Alert")
assert widget._state.value == "warning"
assert widget._text_label.text() == "Alert"
def test_indicator_action_updates_widget_and_action(self, qtbot):
qt_toolbar = QToolBar()
qtbot.addWidget(qt_toolbar)
action = StatusIndicatorAction(text="Ready", tooltip="Initial")
action.add_to_toolbar(qt_toolbar, qt_toolbar)
action.set_tooltip("Updated tooltip")
action.set_text("Running")
assert action.action.toolTip() == "Updated tooltip"
assert action.widget.toolTip() == "Updated tooltip" # type: ignore[union-attr]
assert action.widget._text_label.text() == "Running" # type: ignore[union-attr]
class TestStatusBar:
"""Status bar + broker integration using fake redis client (mocked_client)."""
@pytest.fixture(params=[{}, {"names": ["alpha"]}])
def status_toolbar(self, qtbot, mocked_client, request):
broker = BECStatusBroker(client=mocked_client)
toolbar = create_widget(qtbot, StatusToolBar, **request.param)
yield toolbar
broker.reset_singleton()
def test_allowed_names_precreates_placeholder(self, status_toolbar):
status_toolbar.broker.refresh_available = lambda: None
status_toolbar.refresh_from_broker()
# We parametrize the fixture so one invocation has allowed_names set.
if status_toolbar.allowed_names:
name = next(iter(status_toolbar.allowed_names))
assert status_toolbar.components.exists(name)
act = status_toolbar.components.get_action(name)
assert isinstance(act, StatusIndicatorAction)
assert act.widget._text_label.text() == name # type: ignore[union-attr]
def test_on_available_adds_and_removes(self, status_toolbar):
conditions = [
BeamlineConditionUpdateEntry(name="c1", title="Cond 1", condition_type="test"),
BeamlineConditionUpdateEntry(name="c2", title="Cond 2", condition_type="test"),
]
status_toolbar.on_available_updated(conditions)
assert status_toolbar.components.exists("c1")
assert status_toolbar.components.exists("c2")
conditions2 = [
BeamlineConditionUpdateEntry(name="c1", title="Cond 1", condition_type="test")
]
status_toolbar.on_available_updated(conditions2)
assert status_toolbar.components.exists("c1")
assert not status_toolbar.components.exists("c2")
def test_on_status_updated_sets_title_and_message(self, status_toolbar):
status_toolbar.add_status_item("beam", text="Initial", state="default", tooltip=None)
payload = {"name": "beam", "status": "warning", "title": "New Title", "message": "Detail"}
status_toolbar.on_status_updated("beam", payload)
action = status_toolbar.components.get_action("beam")
assert isinstance(action, StatusIndicatorAction)
assert action.widget._text_label.text() == "New Title" # type: ignore[union-attr]
assert action.action.toolTip() == "Detail"
def test_on_status_updated_keeps_existing_text_when_no_title(self, status_toolbar):
status_toolbar.add_status_item("beam", text="Keep Me", state="default", tooltip=None)
payload = {"name": "beam", "status": "normal", "message": "Note"}
status_toolbar.on_status_updated("beam", payload)
action = status_toolbar.components.get_action("beam")
assert isinstance(action, StatusIndicatorAction)
assert action.widget._text_label.text() == "Keep Me" # type: ignore[union-attr]
assert action.action.toolTip() == "Note"