mirror of
https://github.com/bec-project/bec_widgets.git
synced 2026-04-08 01:37:53 +02:00
Compare commits
4 Commits
fix/themin
...
feat/statu
| Author | SHA1 | Date | |
|---|---|---|---|
| d6f5c0e4f9 | |||
| 55146665d2 | |||
| 6836036507 | |||
| c60cb31eaa |
@@ -549,7 +549,7 @@ class LaunchWindow(BECMainWindow):
|
||||
remaining_connections = [
|
||||
connection for connection in connections.values() if connection.parent_id != self.gui_id
|
||||
]
|
||||
return len(remaining_connections) <= 4
|
||||
return len(remaining_connections) <= 5
|
||||
|
||||
def _turn_off_the_lights(self, connections: dict):
|
||||
"""
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from qtpy.QtWidgets import QWidget
|
||||
|
||||
from bec_widgets.applications.views.developer_view.developer_widget import DeveloperWidget
|
||||
|
||||
@@ -18,6 +18,7 @@ from qtpy.QtWidgets import (
|
||||
)
|
||||
|
||||
from bec_widgets.utils.error_popups import SafeSlot
|
||||
from bec_widgets.utils.toolbars.status_bar import StatusToolBar
|
||||
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
|
||||
from bec_widgets.widgets.control.device_input.signal_combobox.signal_combobox import SignalComboBox
|
||||
from bec_widgets.widgets.plots.waveform.waveform import Waveform
|
||||
@@ -69,6 +70,7 @@ class ViewBase(QWidget):
|
||||
parent (QWidget | None): Parent widget.
|
||||
id (str | None): Optional view id, useful for debugging or introspection.
|
||||
title (str | None): Optional human-readable title.
|
||||
show_status (bool): Whether to show a status toolbar at the top of the view.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
@@ -78,6 +80,8 @@ class ViewBase(QWidget):
|
||||
*,
|
||||
id: str | None = None,
|
||||
title: str | None = None,
|
||||
show_status: bool = False,
|
||||
status_names: list[str] | None = None,
|
||||
):
|
||||
super().__init__(parent=parent)
|
||||
self.content: QWidget | None = None
|
||||
@@ -88,15 +92,48 @@ class ViewBase(QWidget):
|
||||
lay.setContentsMargins(0, 0, 0, 0)
|
||||
lay.setSpacing(0)
|
||||
|
||||
self.status_bar: StatusToolBar | None = None
|
||||
if show_status:
|
||||
# If explicit status names are provided, default to showing only those.
|
||||
show_all = status_names is None
|
||||
self.setup_status_bar(show_all_status=show_all, status_names=status_names)
|
||||
|
||||
if content is not None:
|
||||
self.set_content(content)
|
||||
|
||||
def set_content(self, content: QWidget) -> None:
|
||||
"""Replace the current content widget with a new one."""
|
||||
if self.content is not None:
|
||||
self.layout().removeWidget(self.content)
|
||||
self.content.setParent(None)
|
||||
self.content.close()
|
||||
self.content.deleteLater()
|
||||
self.content = content
|
||||
self.layout().addWidget(content)
|
||||
if self.status_bar is not None:
|
||||
insert_at = self.layout().indexOf(self.status_bar) + 1
|
||||
self.layout().insertWidget(insert_at, content)
|
||||
else:
|
||||
self.layout().addWidget(content)
|
||||
|
||||
def setup_status_bar(
|
||||
self, *, show_all_status: bool = True, status_names: list[str] | None = None
|
||||
) -> None:
|
||||
"""Create and attach a status toolbar managed by the status broker."""
|
||||
if self.status_bar is not None:
|
||||
return
|
||||
names_arg = None if show_all_status else status_names
|
||||
self.status_bar = StatusToolBar(parent=self, names=names_arg)
|
||||
self.layout().addWidget(self.status_bar)
|
||||
|
||||
def set_status(
|
||||
self, name: str = "main", *, state=None, text: str | None = None, tooltip: str | None = None
|
||||
) -> None:
|
||||
"""Manually set a status item on the status bar."""
|
||||
if self.status_bar is None:
|
||||
self.setup_status_bar(show_all_status=True)
|
||||
if self.status_bar is None:
|
||||
return
|
||||
self.status_bar.set_status(name=name, state=state, text=text, tooltip=tooltip)
|
||||
|
||||
@SafeSlot()
|
||||
def on_enter(self) -> None:
|
||||
|
||||
@@ -25,7 +25,6 @@ from qtpy.QtWidgets import (
|
||||
QWidget,
|
||||
)
|
||||
|
||||
from bec_widgets import BECWidget
|
||||
from bec_widgets.cli.rpc.rpc_widget_handler import widget_handler
|
||||
from bec_widgets.utils.colors import apply_theme
|
||||
from bec_widgets.utils.widget_io import WidgetHierarchy as wh
|
||||
@@ -48,6 +47,7 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
|
||||
|
||||
self._widgets_by_name: Dict[str, QWidget] = {}
|
||||
self._init_ui()
|
||||
self.app = QApplication.instance()
|
||||
|
||||
# expose helper API and basics in the inprocess console
|
||||
if self.console.inprocess is True:
|
||||
@@ -94,7 +94,7 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
|
||||
return list(widget_handler.widget_classes.keys())
|
||||
|
||||
self.jc = _ConsoleAPI(self)
|
||||
self._push_to_console({"jc": self.jc, "np": np, "pg": pg, "wh": wh})
|
||||
self._push_to_console({"jc": self.jc, "np": np, "pg": pg, "wh": wh, "app": self.app})
|
||||
|
||||
def _init_ui(self):
|
||||
self.layout = QHBoxLayout(self)
|
||||
|
||||
@@ -5,7 +5,8 @@ import os
|
||||
import weakref
|
||||
from abc import ABC, abstractmethod
|
||||
from contextlib import contextmanager
|
||||
from typing import Dict, Literal
|
||||
from enum import Enum
|
||||
from typing import Dict, Literal, Union
|
||||
|
||||
from bec_lib.device import ReadoutPriority
|
||||
from bec_lib.logger import bec_logger
|
||||
@@ -15,6 +16,7 @@ from qtpy.QtGui import QAction, QColor, QIcon # type: ignore
|
||||
from qtpy.QtWidgets import (
|
||||
QApplication,
|
||||
QComboBox,
|
||||
QGraphicsDropShadowEffect,
|
||||
QHBoxLayout,
|
||||
QLabel,
|
||||
QMenu,
|
||||
@@ -26,6 +28,7 @@ from qtpy.QtWidgets import (
|
||||
)
|
||||
|
||||
import bec_widgets
|
||||
from bec_widgets.utils.colors import AccentColors, get_accent_colors
|
||||
from bec_widgets.widgets.control.device_input.base_classes.device_input_base import BECDeviceFilter
|
||||
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
|
||||
|
||||
@@ -101,6 +104,205 @@ class LongPressToolButton(QToolButton):
|
||||
self.showMenu()
|
||||
|
||||
|
||||
class StatusState(str, Enum):
|
||||
DEFAULT = "default"
|
||||
HIGHLIGHT = "highlight"
|
||||
WARNING = "warning"
|
||||
EMERGENCY = "emergency"
|
||||
SUCCESS = "success"
|
||||
|
||||
|
||||
class StatusIndicatorWidget(QWidget):
|
||||
"""Pill-shaped status indicator with icon + label using accent colors."""
|
||||
|
||||
def __init__(
|
||||
self, parent=None, text: str = "Ready", state: StatusState | str = StatusState.DEFAULT
|
||||
):
|
||||
super().__init__(parent)
|
||||
self.setObjectName("StatusIndicatorWidget")
|
||||
self._text = text
|
||||
self._state = self._normalize_state(state)
|
||||
self._theme_connected = False
|
||||
|
||||
layout = QHBoxLayout(self)
|
||||
layout.setContentsMargins(6, 2, 8, 2)
|
||||
layout.setSpacing(6)
|
||||
|
||||
self._icon_label = QLabel(self)
|
||||
self._icon_label.setFixedSize(18, 18)
|
||||
|
||||
self._text_label = QLabel(self)
|
||||
self._text_label.setText(self._text)
|
||||
|
||||
layout.addWidget(self._icon_label)
|
||||
layout.addWidget(self._text_label)
|
||||
|
||||
# Give it a consistent pill height
|
||||
self.setMinimumHeight(24)
|
||||
self.setSizePolicy(QSizePolicy.Policy.Minimum, QSizePolicy.Policy.Fixed)
|
||||
|
||||
# Soft shadow similar to notification banners
|
||||
self._shadow = QGraphicsDropShadowEffect(self)
|
||||
self._shadow.setBlurRadius(18)
|
||||
self._shadow.setOffset(0, 2)
|
||||
self.setGraphicsEffect(self._shadow)
|
||||
|
||||
self._apply_state(self._state)
|
||||
self._connect_theme_change()
|
||||
|
||||
def set_state(self, state: Union[StatusState, str]):
|
||||
"""Update state and refresh visuals."""
|
||||
self._state = self._normalize_state(state)
|
||||
self._apply_state(self._state)
|
||||
|
||||
def set_text(self, text: str):
|
||||
"""Update the displayed text."""
|
||||
self._text = text
|
||||
self._text_label.setText(text)
|
||||
|
||||
def _apply_state(self, state: StatusState):
|
||||
palette = self._resolve_accent_colors()
|
||||
color_attr = {
|
||||
StatusState.DEFAULT: "default",
|
||||
StatusState.HIGHLIGHT: "highlight",
|
||||
StatusState.WARNING: "warning",
|
||||
StatusState.EMERGENCY: "emergency",
|
||||
StatusState.SUCCESS: "success",
|
||||
}.get(state, "default")
|
||||
base_color = getattr(palette, color_attr, None) or getattr(
|
||||
palette, "default", QColor("gray")
|
||||
)
|
||||
|
||||
# Apply style first (returns text color for label)
|
||||
text_color = self._update_style(base_color, self._theme_fg_color())
|
||||
theme_name = self._theme_name()
|
||||
|
||||
# Choose icon per state
|
||||
icon_name_map = {
|
||||
StatusState.DEFAULT: "check_circle",
|
||||
StatusState.HIGHLIGHT: "check_circle",
|
||||
StatusState.SUCCESS: "check_circle",
|
||||
StatusState.WARNING: "warning",
|
||||
StatusState.EMERGENCY: "dangerous",
|
||||
}
|
||||
icon_name = icon_name_map.get(state, "check_circle")
|
||||
|
||||
# Icon color:
|
||||
# - Dark mode: follow text color (usually white) for high contrast.
|
||||
# - Light mode: use a stronger version of the accent color for a colored glyph
|
||||
# that stands out on the pastel pill background.
|
||||
if theme_name == "light":
|
||||
icon_q = QColor(base_color)
|
||||
icon_color = icon_q.name(QColor.HexRgb)
|
||||
else:
|
||||
icon_color = text_color
|
||||
|
||||
icon = material_icon(
|
||||
icon_name, size=(18, 18), convert_to_pixmap=False, filled=True, color=icon_color
|
||||
)
|
||||
if not icon.isNull():
|
||||
self._icon_label.setPixmap(icon.pixmap(18, 18))
|
||||
|
||||
def _update_style(self, color: QColor, fg_color: QColor) -> str:
|
||||
# Ensure the widget actually paints its own background
|
||||
self.setAttribute(Qt.WidgetAttribute.WA_StyledBackground, True)
|
||||
|
||||
fg = QColor(fg_color)
|
||||
text_color = fg.name(QColor.HexRgb)
|
||||
|
||||
theme_name = self._theme_name()
|
||||
|
||||
base = QColor(color)
|
||||
|
||||
start = QColor(base)
|
||||
end = QColor(base)
|
||||
border = QColor(base)
|
||||
|
||||
if theme_name == "light":
|
||||
start.setAlphaF(0.20)
|
||||
end.setAlphaF(0.06)
|
||||
else:
|
||||
start.setAlphaF(0.35)
|
||||
end.setAlphaF(0.12)
|
||||
border = border.darker(120)
|
||||
|
||||
# shadow color tuned per theme to match notification banners
|
||||
if hasattr(self, "_shadow"):
|
||||
if theme_name == "light":
|
||||
shadow_color = QColor(15, 23, 42, 60) # softer shadow on light bg
|
||||
else:
|
||||
shadow_color = QColor(0, 0, 0, 160)
|
||||
self._shadow.setColor(shadow_color)
|
||||
|
||||
# Use a fixed radius for a stable pill look inside toolbars
|
||||
radius = 10
|
||||
|
||||
self.setStyleSheet(
|
||||
f"""
|
||||
#StatusIndicatorWidget {{
|
||||
background-color: qlineargradient(spread:pad, x1:0, y1:0, x2:1, y2:1,
|
||||
stop:0 {start.name(QColor.HexArgb)}, stop:1 {end.name(QColor.HexArgb)});
|
||||
border: 1px solid {border.name(QColor.HexRgb)};
|
||||
border-radius: {radius}px;
|
||||
padding: 2px 8px;
|
||||
}}
|
||||
#StatusIndicatorWidget QLabel {{
|
||||
color: {text_color};
|
||||
background: transparent;
|
||||
}}
|
||||
"""
|
||||
)
|
||||
return text_color
|
||||
|
||||
def _theme_fg_color(self) -> QColor:
|
||||
app = QApplication.instance()
|
||||
theme = getattr(app, "theme", None)
|
||||
if theme is not None and hasattr(theme, "color"):
|
||||
try:
|
||||
fg = theme.color("FG")
|
||||
if isinstance(fg, QColor):
|
||||
return fg
|
||||
except Exception:
|
||||
pass
|
||||
palette = self._resolve_accent_colors()
|
||||
base = getattr(palette, "default", QColor("white"))
|
||||
luminance = (0.299 * base.red() + 0.587 * base.green() + 0.114 * base.blue()) / 255
|
||||
return QColor("#000000") if luminance > 0.65 else QColor("#ffffff")
|
||||
|
||||
def _theme_name(self) -> str:
|
||||
app = QApplication.instance()
|
||||
theme = getattr(app, "theme", None)
|
||||
name = getattr(theme, "theme", None)
|
||||
if isinstance(name, str):
|
||||
return name.lower()
|
||||
return "dark"
|
||||
|
||||
def _connect_theme_change(self):
|
||||
if self._theme_connected:
|
||||
return
|
||||
app = QApplication.instance()
|
||||
theme = getattr(app, "theme", None)
|
||||
if theme is not None and hasattr(theme, "theme_changed"):
|
||||
try:
|
||||
theme.theme_changed.connect(lambda _: self._apply_state(self._state))
|
||||
self._theme_connected = True
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def _normalize_state(state: Union[StatusState, str]) -> StatusState:
|
||||
if isinstance(state, StatusState):
|
||||
return state
|
||||
try:
|
||||
return StatusState(state)
|
||||
except ValueError:
|
||||
return StatusState.DEFAULT
|
||||
|
||||
@staticmethod
|
||||
def _resolve_accent_colors() -> AccentColors:
|
||||
return get_accent_colors()
|
||||
|
||||
|
||||
class ToolBarAction(ABC):
|
||||
"""
|
||||
Abstract base class for toolbar actions.
|
||||
@@ -147,6 +349,54 @@ class SeparatorAction(ToolBarAction):
|
||||
toolbar.addSeparator()
|
||||
|
||||
|
||||
class StatusIndicatorAction(ToolBarAction):
|
||||
"""Toolbar action hosting a LED indicator and status text."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
text: str = "Ready",
|
||||
state: Union[StatusState, str] = StatusState.DEFAULT,
|
||||
tooltip: str | None = None,
|
||||
):
|
||||
super().__init__(icon_path=None, tooltip=tooltip or "View status", checkable=False)
|
||||
self._text = text
|
||||
self._state: StatusState = StatusIndicatorWidget._normalize_state(state)
|
||||
self.widget: StatusIndicatorWidget | None = None
|
||||
self.tooltip = tooltip or ""
|
||||
|
||||
def add_to_toolbar(self, toolbar: QToolBar, target: QWidget):
|
||||
if (
|
||||
self.widget is None
|
||||
or self.widget.parent() is None
|
||||
or self.widget.parent() is not toolbar
|
||||
):
|
||||
self.widget = StatusIndicatorWidget(parent=toolbar, text=self._text, state=self._state)
|
||||
self.action = toolbar.addWidget(self.widget)
|
||||
self.action.setText(self._text)
|
||||
self.set_tooltip(self.tooltip)
|
||||
|
||||
def set_state(self, state: Union[StatusState, str]):
|
||||
self._state = StatusIndicatorWidget._normalize_state(state)
|
||||
if self.widget is not None:
|
||||
self.widget.set_state(self._state)
|
||||
|
||||
def set_text(self, text: str):
|
||||
self._text = text
|
||||
if self.widget is not None:
|
||||
self.widget.set_text(text)
|
||||
if hasattr(self, "action") and self.action is not None:
|
||||
self.action.setText(text)
|
||||
|
||||
def set_tooltip(self, tooltip: str | None):
|
||||
"""Set tooltip on both the underlying widget and the QWidgetAction."""
|
||||
self.tooltip = tooltip or ""
|
||||
if self.widget is not None:
|
||||
self.widget.setToolTip(self.tooltip)
|
||||
if hasattr(self, "action") and self.action is not None:
|
||||
self.action.setToolTip(self.tooltip)
|
||||
|
||||
|
||||
class QtIconAction(IconAction):
|
||||
def __init__(
|
||||
self,
|
||||
|
||||
283
bec_widgets/utils/toolbars/status_bar.py
Normal file
283
bec_widgets/utils/toolbars/status_bar.py
Normal file
@@ -0,0 +1,283 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_lib.messages import BeamlineConditionUpdateEntry
|
||||
from qtpy.QtCore import QObject, QTimer, Signal
|
||||
|
||||
from bec_widgets.utils.bec_connector import BECConnector
|
||||
from bec_widgets.utils.error_popups import SafeSlot
|
||||
from bec_widgets.utils.toolbars.actions import StatusIndicatorAction, StatusState
|
||||
from bec_widgets.utils.toolbars.toolbar import ModularToolBar
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class BECStatusBroker(BECConnector, QObject):
|
||||
"""Listen to BEC beamline condition endpoints and emit structured signals."""
|
||||
|
||||
_instance: "BECStatusBroker | None" = None
|
||||
_initialized: bool = False
|
||||
|
||||
available_updated = Signal(list) # list of conditions available
|
||||
status_updated = Signal(str, dict) # name, status update
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
if cls._instance is None:
|
||||
cls._instance = super().__new__(cls)
|
||||
return cls._instance
|
||||
|
||||
def __init__(self, parent=None, gui_id: str | None = None, client=None, **kwargs):
|
||||
if self._initialized:
|
||||
return
|
||||
super().__init__(parent=parent, gui_id=gui_id, client=client, **kwargs)
|
||||
self._watched: set[str] = set()
|
||||
self.bec_dispatcher.connect_slot(
|
||||
self.on_available, MessageEndpoints.available_beamline_conditions()
|
||||
)
|
||||
|
||||
self._initialized = True
|
||||
self.refresh_available()
|
||||
|
||||
def refresh_available(self):
|
||||
"""Fetch the current set of beamline conditions once."""
|
||||
try:
|
||||
msg = self.client.connector.get_last(MessageEndpoints.available_beamline_conditions())
|
||||
logger.info(f"StatusBroker: fetched available conditions payload: {msg}")
|
||||
if msg:
|
||||
self.on_available(msg.get("data").content, None)
|
||||
except Exception as exc: # pragma: no cover - runtime env
|
||||
logger.debug(f"Could not fetch available conditions: {exc}")
|
||||
|
||||
@SafeSlot(dict, dict)
|
||||
def on_available(self, data: dict, meta: dict | None = None):
|
||||
condition_list = data.get("conditions") # latest one from the stream
|
||||
self.available_updated.emit(condition_list)
|
||||
for condition in condition_list:
|
||||
name = condition.name
|
||||
if name:
|
||||
self.watch_condition(name)
|
||||
|
||||
def watch_condition(self, name: str):
|
||||
"""Subscribe to updates for a single beamline condition."""
|
||||
if name in self._watched:
|
||||
return
|
||||
self._watched.add(name)
|
||||
endpoint = MessageEndpoints.beamline_condition(name)
|
||||
logger.info(f"StatusBroker: watching condition '{name}' on {endpoint.endpoint}")
|
||||
self.bec_dispatcher.connect_slot(self.on_condition, endpoint)
|
||||
self.fetch_condition(name)
|
||||
|
||||
def fetch_condition(self, name: str):
|
||||
"""Fetch the current value of a beamline condition once."""
|
||||
endpoint = MessageEndpoints.beamline_condition(name)
|
||||
try:
|
||||
msg = self.client.connector.get_last(endpoint)
|
||||
logger.info(f"StatusBroker: fetched condition '{name}' payload: {msg}")
|
||||
if msg:
|
||||
self.on_condition(msg.get("data").content, None)
|
||||
except Exception as exc: # pragma: no cover - runtime env
|
||||
logger.debug(f"Could not fetch condition {name}: {exc}")
|
||||
|
||||
@SafeSlot(dict, dict)
|
||||
def on_condition(self, data: dict, meta: dict | None = None):
|
||||
name = data.get("name")
|
||||
if not name:
|
||||
return
|
||||
logger.info(f"StatusBroker: condition update for '{name}' -> {data}")
|
||||
self.status_updated.emit(str(name), data)
|
||||
|
||||
@classmethod
|
||||
def reset_singleton(cls):
|
||||
"""
|
||||
Reset the singleton instance of the BECStatusBroker.
|
||||
"""
|
||||
cls._instance = None
|
||||
cls._initialized = False
|
||||
|
||||
|
||||
class StatusToolBar(ModularToolBar):
|
||||
"""Status toolbar that auto-manages beamline condition indicators."""
|
||||
|
||||
STATUS_MAP: dict[str, StatusState] = {
|
||||
"normal": StatusState.SUCCESS,
|
||||
"warning": StatusState.WARNING,
|
||||
"alarm": StatusState.EMERGENCY,
|
||||
}
|
||||
|
||||
def __init__(self, parent=None, names: list[str] | None = None, **kwargs):
|
||||
super().__init__(parent=parent, orientation="horizontal", **kwargs)
|
||||
self.setObjectName("StatusToolbar")
|
||||
self._status_bundle = self.new_bundle("status")
|
||||
self.show_bundles(["status"])
|
||||
self._apply_status_toolbar_style()
|
||||
|
||||
self.allowed_names: set[str] | None = set(names) if names is not None else None
|
||||
logger.info(f"StatusToolbar init allowed_names={self.allowed_names}")
|
||||
|
||||
self.broker = BECStatusBroker()
|
||||
self.broker.available_updated.connect(self.on_available_updated)
|
||||
self.broker.status_updated.connect(self.on_status_updated)
|
||||
|
||||
QTimer.singleShot(0, self.refresh_from_broker)
|
||||
|
||||
def refresh_from_broker(self) -> None:
|
||||
|
||||
if self.allowed_names is None:
|
||||
self.broker.refresh_available()
|
||||
else:
|
||||
for name in self.allowed_names:
|
||||
if not self.components.exists(name):
|
||||
# Pre-create a placeholder pill so it is visible even before data arrives.
|
||||
self.add_status_item(
|
||||
name=name, text=name, state=StatusState.DEFAULT, tooltip=None
|
||||
)
|
||||
self.broker.watch_condition(name)
|
||||
|
||||
def _apply_status_toolbar_style(self) -> None:
|
||||
self.setStyleSheet(
|
||||
"QToolBar#StatusToolbar {"
|
||||
f" background-color: {self.background_color};"
|
||||
" border: none;"
|
||||
" border-bottom: 1px solid palette(mid);"
|
||||
"}"
|
||||
)
|
||||
|
||||
# -------- Slots for updates --------
|
||||
@SafeSlot(list)
|
||||
def on_available_updated(self, available_conditions: list):
|
||||
"""Process the available conditions stream and start watching them."""
|
||||
# Keep track of current names from the broker to remove stale ones.
|
||||
current_names: set[str] = set()
|
||||
for condition in available_conditions:
|
||||
if not isinstance(condition, BeamlineConditionUpdateEntry):
|
||||
continue
|
||||
name = condition.name
|
||||
title = condition.title or name
|
||||
if not name:
|
||||
continue
|
||||
current_names.add(name)
|
||||
logger.info(f"StatusToolbar: discovered condition '{name}' title='{title}'")
|
||||
# auto-add unless filtered out
|
||||
if self.allowed_names is None or name in self.allowed_names:
|
||||
self.add_status_item(name=name, text=title, state=StatusState.DEFAULT, tooltip=None)
|
||||
else:
|
||||
# keep hidden but present for context menu toggling
|
||||
self.add_status_item(name=name, text=title, state=StatusState.DEFAULT, tooltip=None)
|
||||
act = self.components.get_action(name)
|
||||
if act and act.action:
|
||||
act.action.setVisible(False)
|
||||
|
||||
# Remove actions that are no longer present in available_conditions.
|
||||
known_actions = [
|
||||
n for n in self.components._components.keys() if n not in ("separator",)
|
||||
] # direct access used for clean-up
|
||||
for name in known_actions:
|
||||
if name not in current_names:
|
||||
logger.info(f"StatusToolbar: removing stale condition '{name}'")
|
||||
try:
|
||||
self.components.remove_action(name)
|
||||
except Exception as exc:
|
||||
logger.warning(f"Failed to remove stale condition '{name}': {exc}")
|
||||
self.refresh()
|
||||
|
||||
@SafeSlot(str, dict)
|
||||
def on_status_updated(self, name: str, payload: dict): # TODO finish update logic
|
||||
"""Update a status pill when a condition update arrives."""
|
||||
state = self.STATUS_MAP.get(str(payload.get("status", "")).lower(), StatusState.DEFAULT)
|
||||
action = self.components.get_action(name) if self.components.exists(name) else None
|
||||
|
||||
# Only update the label when a title is explicitly provided; otherwise keep current text.
|
||||
title = payload.get("title") or None
|
||||
text = title
|
||||
if text is None and action is None:
|
||||
text = payload.get("name") or name
|
||||
|
||||
if "message" in payload:
|
||||
tooltip = payload.get("message") or ""
|
||||
else:
|
||||
tooltip = None
|
||||
logger.info(
|
||||
f"StatusToolbar: update condition '{name}' -> state={state} text='{text}' tooltip='{tooltip}'"
|
||||
)
|
||||
self.set_status(name=name, text=text, state=state, tooltip=tooltip)
|
||||
|
||||
# -------- Items Management --------
|
||||
def add_status_item(
|
||||
self,
|
||||
name: str,
|
||||
*,
|
||||
text: str = "Ready",
|
||||
state: StatusState | str = StatusState.DEFAULT,
|
||||
tooltip: str | None = None,
|
||||
) -> StatusIndicatorAction | None:
|
||||
"""
|
||||
Add or update a named status item in the toolbar.
|
||||
After you added all actions, call `toolbar.refresh()` to update the display.
|
||||
|
||||
Args:
|
||||
name(str): Unique name for the status item.
|
||||
text(str): Text to display in the status item.
|
||||
state(StatusState | str): State of the status item.
|
||||
tooltip(str | None): Optional tooltip for the status item.
|
||||
|
||||
Returns:
|
||||
StatusIndicatorAction | None: The created or updated status action, or None if toolbar is not initialized.
|
||||
"""
|
||||
if self._status_bundle is None:
|
||||
return
|
||||
if self.components.exists(name):
|
||||
return
|
||||
|
||||
action = StatusIndicatorAction(text=text, state=state, tooltip=tooltip)
|
||||
return self.add_status_action(name, action)
|
||||
|
||||
def add_status_action(
|
||||
self, name: str, action: StatusIndicatorAction
|
||||
) -> StatusIndicatorAction | None:
|
||||
"""
|
||||
Attach an existing StatusIndicatorAction to the status toolbar.
|
||||
After you added all actions, call `toolbar.refresh()` to update the display.
|
||||
|
||||
Args:
|
||||
name(str): Unique name for the status item.
|
||||
action(StatusIndicatorAction): The status action to add.
|
||||
|
||||
Returns:
|
||||
StatusIndicatorAction | None: The added status action, or None if toolbar is not initialized.
|
||||
"""
|
||||
self.components.add_safe(name, action)
|
||||
self.get_bundle("status").add_action(name)
|
||||
self.refresh()
|
||||
self.broker.fetch_condition(name)
|
||||
return action
|
||||
|
||||
def set_status(
|
||||
self,
|
||||
name: str = "main",
|
||||
*,
|
||||
state: StatusState | str | None = None,
|
||||
text: str | None = None,
|
||||
tooltip: str | None = None,
|
||||
) -> None:
|
||||
"""
|
||||
Update the status item with the given name, creating it if necessary.
|
||||
|
||||
Args:
|
||||
name(str): Unique name for the status item.
|
||||
state(StatusState | str | None): New state for the status item.
|
||||
text(str | None): New text for the status item.
|
||||
"""
|
||||
action = self.components.get_action(name) if self.components.exists(name) else None
|
||||
if action is None:
|
||||
action = self.add_status_item(
|
||||
name, text=text or "Ready", state=state or "default", tooltip=tooltip
|
||||
)
|
||||
if action is None:
|
||||
return
|
||||
if state is not None:
|
||||
action.set_state(state)
|
||||
if text is not None:
|
||||
action.set_text(text)
|
||||
if tooltip is not None and hasattr(action, "set_tooltip"):
|
||||
action.set_tooltip(tooltip)
|
||||
@@ -6,7 +6,6 @@ from qtpy.QtGui import QIcon
|
||||
from qtpy.QtWidgets import QWidget
|
||||
|
||||
from bec_widgets.utils.bec_designer import designer_material_icon
|
||||
from bec_widgets.utils.colors import apply_theme
|
||||
from bec_widgets.widgets.containers.main_window.main_window import BECMainWindow
|
||||
|
||||
DOM_XML = """
|
||||
@@ -45,8 +44,6 @@ class BECMainWindowPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
|
||||
from qtpy.QtCore import Qt
|
||||
from qtpy.QtWidgets import QApplication
|
||||
|
||||
apply_theme("dark")
|
||||
|
||||
import bec_widgets
|
||||
|
||||
MODULE_PATH = os.path.dirname(bec_widgets.__file__)
|
||||
|
||||
@@ -22,6 +22,7 @@ from bec_widgets.utils import UILoader
|
||||
from bec_widgets.utils.bec_widget import BECWidget
|
||||
from bec_widgets.utils.colors import apply_theme
|
||||
from bec_widgets.utils.error_popups import SafeSlot
|
||||
from bec_widgets.utils.toolbars.status_bar import StatusToolBar
|
||||
from bec_widgets.widgets.containers.main_window.addons.hover_widget import HoverWidget
|
||||
from bec_widgets.widgets.containers.main_window.addons.notification_center.notification_banner import (
|
||||
BECNotificationBroker,
|
||||
@@ -114,14 +115,11 @@ class BECMainWindow(BECWidget, QMainWindow):
|
||||
Prepare the BEC specific widgets in the status bar.
|
||||
"""
|
||||
|
||||
# Left: App‑ID label
|
||||
self._app_id_label = QLabel()
|
||||
self._app_id_label.setAlignment(
|
||||
Qt.AlignmentFlag.AlignHCenter | Qt.AlignmentFlag.AlignVCenter
|
||||
)
|
||||
self.status_bar.addWidget(self._app_id_label)
|
||||
# Left: Beamline condition status toolbar (auto-fetches all conditions)
|
||||
self._status_toolbar = StatusToolBar(parent=self, names=None)
|
||||
self.status_bar.addWidget(self._status_toolbar)
|
||||
|
||||
# Add a separator after the app ID label
|
||||
# Add a separator after the status toolbar
|
||||
self._add_separator()
|
||||
|
||||
# Centre: Client‑info label (stretch=1 so it expands)
|
||||
@@ -394,13 +392,17 @@ class BECMainWindow(BECWidget, QMainWindow):
|
||||
help_menu.addAction(bec_docs)
|
||||
help_menu.addAction(widgets_docs)
|
||||
help_menu.addAction(bug_report)
|
||||
help_menu.addSeparator()
|
||||
self._app_id_action = QAction(self)
|
||||
self._app_id_action.setEnabled(False)
|
||||
help_menu.addAction(self._app_id_action)
|
||||
|
||||
################################################################################
|
||||
# Status Bar Addons
|
||||
################################################################################
|
||||
def display_app_id(self):
|
||||
"""
|
||||
Display the app ID in the status bar.
|
||||
Display the app ID in the Help menu.
|
||||
"""
|
||||
if self.bec_dispatcher.cli_server is None:
|
||||
status_message = "Not connected"
|
||||
@@ -408,7 +410,8 @@ class BECMainWindow(BECWidget, QMainWindow):
|
||||
# Get the server ID from the dispatcher
|
||||
server_id = self.bec_dispatcher.cli_server.gui_id
|
||||
status_message = f"App ID: {server_id}"
|
||||
self._app_id_label.setText(status_message)
|
||||
if hasattr(self, "_app_id_action"):
|
||||
self._app_id_action.setText(status_message)
|
||||
|
||||
@SafeSlot(dict, dict)
|
||||
def display_client_message(self, msg: dict, meta: dict):
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from dataclasses import dataclass
|
||||
from typing import Literal
|
||||
|
||||
import numpy as np
|
||||
@@ -9,7 +8,7 @@ import pyqtgraph as pg
|
||||
from bec_lib import bec_logger, messages
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from pydantic import BaseModel, Field, field_validator
|
||||
from qtpy.QtCore import QObject, Qt, QThread, QTimer, Signal
|
||||
from qtpy.QtCore import QTimer, Signal
|
||||
from qtpy.QtGui import QTransform
|
||||
from scipy.interpolate import (
|
||||
CloughTocher2DInterpolator,
|
||||
@@ -78,90 +77,6 @@ class HeatmapConfig(ConnectionConfig):
|
||||
_validate_color_palette = field_validator("color_map")(Colors.validate_color_map)
|
||||
|
||||
|
||||
@dataclass
|
||||
class _InterpolationRequest:
|
||||
"""Immutable payload describing an interpolation request for the worker thread.
|
||||
|
||||
Args:
|
||||
x_data: X coordinates collected so far.
|
||||
y_data: Y coordinates collected so far.
|
||||
z_data: Z values associated with x/y.
|
||||
data_version: Number of points at request time (len(z_data)); used to reject stale results.
|
||||
scan_id: Identifier for the scan that produced the data.
|
||||
interpolation: Interpolation method to apply.
|
||||
oversampling_factor: Oversampling factor for the interpolation grid.
|
||||
"""
|
||||
|
||||
x_data: list[float]
|
||||
y_data: list[float]
|
||||
z_data: list[float]
|
||||
data_version: int
|
||||
scan_id: str
|
||||
interpolation: str
|
||||
oversampling_factor: float
|
||||
|
||||
|
||||
class _StepInterpolationWorker(QObject):
|
||||
"""Worker for performing step-scan interpolation in a background thread.
|
||||
|
||||
This worker computes the interpolated heatmap image using the provided data
|
||||
and settings, then emits the result or a failure signal.
|
||||
|
||||
Signals:
|
||||
finished(image, transform, data_version, scan_id):
|
||||
Emitted when interpolation is successful.
|
||||
- image: The resulting image (numpy array or similar).
|
||||
- transform: The QTransform for the image.
|
||||
- data_version: The data version for the request.
|
||||
- scan_id: The scan identifier.
|
||||
failed(error_message, data_version, scan_id):
|
||||
Emitted when interpolation fails.
|
||||
- error_message: The error message string.
|
||||
- data_version: The data version for the request.
|
||||
- scan_id: The scan identifier.
|
||||
"""
|
||||
|
||||
finished = Signal(object, object, int, str)
|
||||
failed = Signal(str, int, str)
|
||||
|
||||
def __init__(self, parent: QObject | None = None):
|
||||
super().__init__(parent=parent)
|
||||
self._active_request: _InterpolationRequest | None = None
|
||||
self._processing = False
|
||||
|
||||
@property
|
||||
def is_processing(self) -> bool:
|
||||
"""Return whether the worker is currently processing a request."""
|
||||
return self._processing
|
||||
|
||||
@SafeSlot(object, int)
|
||||
def process(self, request: _InterpolationRequest, data_version: int):
|
||||
"""
|
||||
Process an interpolation request in the worker thread.
|
||||
|
||||
Args:
|
||||
request(_InterpolationRequest): The interpolation request payload.
|
||||
data_version(int): The data version for the request.
|
||||
"""
|
||||
self._active_request = request
|
||||
self._processing = True
|
||||
try:
|
||||
image, transform = Heatmap.compute_step_scan_image(
|
||||
x_data=np.asarray(request.x_data, dtype=float),
|
||||
y_data=np.asarray(request.y_data, dtype=float),
|
||||
z_data=np.asarray(request.z_data, dtype=float),
|
||||
oversampling_factor=request.oversampling_factor,
|
||||
interpolation_method=request.interpolation,
|
||||
)
|
||||
except Exception as exc: # pragma: no cover - defensive
|
||||
logger.warning(f"Step-scan interpolation failed with: {exc}")
|
||||
self.failed.emit(str(exc), data_version, request.scan_id)
|
||||
self._processing = False
|
||||
return
|
||||
self._processing = False
|
||||
self.finished.emit(image, transform, data_version, request.scan_id)
|
||||
|
||||
|
||||
class Heatmap(ImageBase):
|
||||
"""
|
||||
Heatmap widget for visualizing 2d grid data with color mapping for the z-axis.
|
||||
@@ -248,7 +163,6 @@ class Heatmap(ImageBase):
|
||||
new_scan_id = Signal(str)
|
||||
sync_signal_update = Signal()
|
||||
heatmap_property_changed = Signal()
|
||||
interpolation_requested = Signal(object, int)
|
||||
|
||||
def __init__(self, parent=None, config: HeatmapConfig | None = None, **kwargs):
|
||||
if config is None:
|
||||
@@ -271,12 +185,6 @@ class Heatmap(ImageBase):
|
||||
self.scan_item = None
|
||||
self.status_message = None
|
||||
self._grid_index = None
|
||||
# Highest data_version we have dispatched for the current scan; used to drop stale results.
|
||||
# Initialized to -1 so the first real request (len(z_data) >= 0) always supersedes it.
|
||||
self._latest_interpolation_version = -1
|
||||
self._interpolation_thread: QThread | None = None
|
||||
self._interpolation_worker: _StepInterpolationWorker | None = None
|
||||
self._pending_interpolation_request: _InterpolationRequest | None = None
|
||||
self.heatmap_dialog = None
|
||||
bg_color = pg.mkColor((240, 240, 240, 150))
|
||||
self.config_label = pg.LegendItem(
|
||||
@@ -536,7 +444,6 @@ class Heatmap(ImageBase):
|
||||
if current_scan_id is None:
|
||||
return
|
||||
if current_scan_id != self.scan_id:
|
||||
self._invalidate_interpolation_generation() # Invalidate any pending interpolation work when a new scan starts
|
||||
self.reset()
|
||||
self.new_scan.emit()
|
||||
self.new_scan_id.emit(current_scan_id)
|
||||
@@ -642,38 +549,13 @@ class Heatmap(ImageBase):
|
||||
if self._image_config.show_config_label:
|
||||
self.redraw_config_label()
|
||||
|
||||
if self._is_grid_scan_supported(scan_msg):
|
||||
img, transform = self.get_grid_scan_image(z_data, scan_msg)
|
||||
self._apply_image_update(img, transform)
|
||||
return
|
||||
|
||||
if len(z_data) < 4:
|
||||
# LinearNDInterpolator requires at least 4 points to interpolate
|
||||
logger.warning("Not enough data points to interpolate; skipping update.")
|
||||
return
|
||||
|
||||
self._request_step_scan_interpolation(x_data, y_data, z_data, scan_msg)
|
||||
|
||||
def _apply_image_update(self, img: np.ndarray | None, transform: QTransform | None):
|
||||
"""Apply interpolated image and transform to the heatmap display.
|
||||
|
||||
This method updates the main image with the computed data and emits
|
||||
the image_updated signal. Color bar signals are temporarily blocked
|
||||
during the update to prevent cascading updates.
|
||||
|
||||
Args:
|
||||
img(np.ndarray): The interpolated image data, or None if unavailable
|
||||
transform(QTransform): QTransform mapping pixel to world coordinates, or None if unavailable
|
||||
"""
|
||||
if img is None or transform is None:
|
||||
img, transform = self.get_image_data(x_data=x_data, y_data=y_data, z_data=z_data)
|
||||
if img is None:
|
||||
logger.warning("Image data is None; skipping update.")
|
||||
return
|
||||
|
||||
if self._color_bar is not None:
|
||||
self._color_bar.blockSignals(True)
|
||||
if self.main_image is None:
|
||||
logger.warning("Main image item is None; cannot update image.")
|
||||
return
|
||||
self.main_image.set_data(img, transform=transform)
|
||||
if self._color_bar is not None:
|
||||
self._color_bar.blockSignals(False)
|
||||
@@ -681,128 +563,6 @@ class Heatmap(ImageBase):
|
||||
if self.crosshair is not None:
|
||||
self.crosshair.update_markers_on_image_change()
|
||||
|
||||
def _request_step_scan_interpolation(
|
||||
self,
|
||||
x_data: list[float],
|
||||
y_data: list[float],
|
||||
z_data: list[float],
|
||||
msg: messages.ScanStatusMessage,
|
||||
):
|
||||
"""Request step-scan interpolation in a background thread.
|
||||
|
||||
If a thread is already running, the request is queued as a pending request
|
||||
and will be processed when the current interpolation completes.
|
||||
|
||||
Args:
|
||||
x_data(list[float]): X coordinates of data points
|
||||
y_data(list[float]): Y coordinates of data points
|
||||
z_data(list[float]): Z values at each point
|
||||
msg(messages.ScanStatusMessage): Scan status message containing scan metadata
|
||||
"""
|
||||
request = _InterpolationRequest(
|
||||
x_data=list(x_data),
|
||||
y_data=list(y_data),
|
||||
z_data=list(z_data),
|
||||
data_version=len(z_data),
|
||||
scan_id=msg.scan_id,
|
||||
interpolation=self._image_config.interpolation,
|
||||
oversampling_factor=self._image_config.oversampling_factor,
|
||||
)
|
||||
|
||||
if self._interpolation_worker is not None and self._interpolation_worker.is_processing:
|
||||
self._pending_interpolation_request = request
|
||||
return
|
||||
|
||||
self._start_step_scan_interpolation(request)
|
||||
|
||||
def _ensure_interpolation_thread(self):
|
||||
if self._interpolation_thread is None:
|
||||
self._interpolation_thread = QThread()
|
||||
self._interpolation_worker = _StepInterpolationWorker()
|
||||
self._interpolation_worker.moveToThread(self._interpolation_thread)
|
||||
self.interpolation_requested.connect(
|
||||
self._interpolation_worker.process, Qt.ConnectionType.QueuedConnection
|
||||
)
|
||||
self._interpolation_worker.finished.connect(
|
||||
self._on_interpolation_finished, Qt.ConnectionType.QueuedConnection
|
||||
)
|
||||
self._interpolation_worker.failed.connect(
|
||||
self._on_interpolation_failed, Qt.ConnectionType.QueuedConnection
|
||||
)
|
||||
if self._interpolation_thread is not None and not self._interpolation_thread.isRunning():
|
||||
self._interpolation_thread.start()
|
||||
|
||||
def _start_step_scan_interpolation(self, request: _InterpolationRequest):
|
||||
# data_version = len(z_data) at the time of the request; keep the latest to gate results.
|
||||
self._ensure_interpolation_thread()
|
||||
if self._interpolation_thread is not None and not self._interpolation_thread.isRunning():
|
||||
self._interpolation_thread.start()
|
||||
self._latest_interpolation_version = request.data_version
|
||||
self.interpolation_requested.emit(request, request.data_version)
|
||||
|
||||
def _on_interpolation_finished(
|
||||
self, img: np.ndarray, transform: QTransform, data_version: int, scan_id: str
|
||||
):
|
||||
# Only accept results that match the latest dispatched version for the active scan.
|
||||
if data_version == self._latest_interpolation_version and scan_id == self.scan_id:
|
||||
self._apply_image_update(img, transform)
|
||||
else:
|
||||
logger.info("Discarding outdated interpolation result.")
|
||||
self._maybe_start_pending_interpolation()
|
||||
|
||||
def _on_interpolation_failed(self, error: str, data_version: int, scan_id: str):
|
||||
logger.warning(f"Interpolation failed for scan {scan_id} (version {data_version}): {error}")
|
||||
self._maybe_start_pending_interpolation()
|
||||
|
||||
def _finish_interpolation_thread(self):
|
||||
self._pending_interpolation_request = None
|
||||
if self._interpolation_worker is not None:
|
||||
try:
|
||||
self.interpolation_requested.disconnect(self._interpolation_worker.process)
|
||||
except (TypeError, RuntimeError) as ext:
|
||||
logger.warning(f"Processing thread already disconnected: {ext}")
|
||||
pass
|
||||
self._interpolation_worker.deleteLater()
|
||||
self._interpolation_worker = None
|
||||
if self._interpolation_thread is not None:
|
||||
if self._interpolation_thread.isRunning():
|
||||
self._interpolation_thread.quit()
|
||||
if not self._interpolation_thread.wait(3000): # 3s timeout
|
||||
logger.error(
|
||||
f"Interpolation thread of widget {self.gui_id} did not stop within timeout 3s; leaving it dangling."
|
||||
)
|
||||
self._interpolation_thread.deleteLater()
|
||||
self._interpolation_thread = None
|
||||
logger.info(f"Interpolation thread finished of widget {self.gui_id}")
|
||||
|
||||
def _maybe_start_pending_interpolation(self):
|
||||
if self._pending_interpolation_request is None:
|
||||
return
|
||||
if self._pending_interpolation_request.scan_id != self.scan_id:
|
||||
self._pending_interpolation_request = None
|
||||
return
|
||||
if self._interpolation_worker is not None and self._interpolation_worker.is_processing:
|
||||
return
|
||||
|
||||
pending = self._pending_interpolation_request
|
||||
self._pending_interpolation_request = None
|
||||
self._start_step_scan_interpolation(pending)
|
||||
|
||||
def _cancel_interpolation(self):
|
||||
"""Cancel any pending interpolation request without invalidating in-flight work.
|
||||
|
||||
This clears the pending request queue but does not invalidate in-flight work,
|
||||
allowing any currently running interpolation to complete and update the display
|
||||
if it matches the current scan.
|
||||
"""
|
||||
self._pending_interpolation_request = None
|
||||
# Do not change the active data version so an in-flight worker can still deliver.
|
||||
|
||||
def _invalidate_interpolation_generation(self):
|
||||
"""Invalidate all pending interpolation results and ignore in-flight updates."""
|
||||
self._pending_interpolation_request = None
|
||||
self._latest_interpolation_version = -1
|
||||
|
||||
def redraw_config_label(self):
|
||||
scan_msg = self.status_message
|
||||
if scan_msg is None:
|
||||
@@ -848,35 +608,21 @@ class Heatmap(ImageBase):
|
||||
logger.warning("x, y, or z data is None; skipping update.")
|
||||
return None, None
|
||||
|
||||
if self._is_grid_scan_supported(msg):
|
||||
return self.get_grid_scan_image(z_data, msg)
|
||||
if msg.scan_name == "grid_scan" and not self._image_config.enforce_interpolation:
|
||||
# We only support the grid scan mode if both scanning motors
|
||||
# are configured in the heatmap config.
|
||||
device_x = self._image_config.x_device.entry
|
||||
device_y = self._image_config.y_device.entry
|
||||
if (
|
||||
device_x in msg.request_inputs["arg_bundle"]
|
||||
and device_y in msg.request_inputs["arg_bundle"]
|
||||
):
|
||||
return self.get_grid_scan_image(z_data, msg)
|
||||
if len(z_data) < 4:
|
||||
# LinearNDInterpolator requires at least 4 points to interpolate
|
||||
return None, None
|
||||
return self.get_step_scan_image(x_data, y_data, z_data, msg)
|
||||
|
||||
def _is_grid_scan_supported(self, msg: messages.ScanStatusMessage) -> bool:
|
||||
"""Check if the scan can use optimized grid_scan rendering.
|
||||
|
||||
Grid scans can avoid interpolation if both X and Y devices match
|
||||
the configured devices and interpolation is not enforced.
|
||||
|
||||
Args:
|
||||
msg(messages.ScanStatusMessage): Scan status message containing scan metadata
|
||||
|
||||
Returns:
|
||||
True if grid_scan optimization is applicable, False otherwise
|
||||
"""
|
||||
if msg.scan_name != "grid_scan" or self._image_config.enforce_interpolation:
|
||||
return False
|
||||
|
||||
device_x = self._image_config.x_device.entry
|
||||
device_y = self._image_config.y_device.entry
|
||||
return (
|
||||
device_x in msg.request_inputs["arg_bundle"]
|
||||
and device_y in msg.request_inputs["arg_bundle"]
|
||||
)
|
||||
|
||||
def get_grid_scan_image(
|
||||
self, z_data: list[float], msg: messages.ScanStatusMessage
|
||||
) -> tuple[np.ndarray, QTransform]:
|
||||
@@ -971,49 +717,17 @@ class Heatmap(ImageBase):
|
||||
Returns:
|
||||
tuple[np.ndarray, QTransform]: The image data and the QTransform.
|
||||
"""
|
||||
return self.compute_step_scan_image(
|
||||
x_data=x_data,
|
||||
y_data=y_data,
|
||||
z_data=z_data,
|
||||
oversampling_factor=self._image_config.oversampling_factor,
|
||||
interpolation_method=self._image_config.interpolation,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def compute_step_scan_image(
|
||||
x_data: list[float] | np.ndarray,
|
||||
y_data: list[float] | np.ndarray,
|
||||
z_data: list[float] | np.ndarray,
|
||||
oversampling_factor: float,
|
||||
interpolation_method: str,
|
||||
) -> tuple[np.ndarray, QTransform]:
|
||||
"""Compute interpolated heatmap image from step-scan data.
|
||||
|
||||
This static method is suitable for execution in a background thread
|
||||
as it doesn't access any instance state.
|
||||
|
||||
Args:
|
||||
x_data(list[float]): X coordinates of data points
|
||||
y_data(list[float]): Y coordinates of data points
|
||||
z_data(list[float]): Z values at each point
|
||||
oversampling_factor(float): Grid resolution multiplier (>1.0 for higher resolution)
|
||||
interpolation_method(str): One of 'linear', 'nearest', or 'clough'
|
||||
|
||||
Returns:
|
||||
(tuple[np.ndarray, QTransform]):Tuple of (interpolated_grid, transform) where transform maps pixel to world coordinates
|
||||
"""
|
||||
xy_data = np.column_stack((x_data, y_data))
|
||||
grid_x, grid_y, transform = Heatmap.build_image_grid(
|
||||
positions=xy_data, oversampling_factor=oversampling_factor
|
||||
)
|
||||
grid_x, grid_y, transform = self.get_image_grid(xy_data)
|
||||
|
||||
if interpolation_method == "linear":
|
||||
# Interpolate the z data onto the grid
|
||||
if self._image_config.interpolation == "linear":
|
||||
interp = LinearNDInterpolator(xy_data, z_data)
|
||||
elif interpolation_method == "nearest":
|
||||
elif self._image_config.interpolation == "nearest":
|
||||
interp = NearestNDInterpolator(xy_data, z_data)
|
||||
elif interpolation_method == "clough":
|
||||
elif self._image_config.interpolation == "clough":
|
||||
interp = CloughTocher2DInterpolator(xy_data, z_data)
|
||||
else: # pragma: no cover - guarded by validation
|
||||
else:
|
||||
raise ValueError(
|
||||
"Interpolation method must be either 'linear', 'nearest', or 'clough'."
|
||||
)
|
||||
@@ -1032,33 +746,22 @@ class Heatmap(ImageBase):
|
||||
Returns:
|
||||
tuple[np.ndarray, np.ndarray, QTransform]: The grid x and y coordinates and the QTransform.
|
||||
"""
|
||||
return self.build_image_grid(
|
||||
positions=positions, oversampling_factor=self._image_config.oversampling_factor
|
||||
)
|
||||
base_width, base_height = self.estimate_image_resolution(positions)
|
||||
|
||||
@staticmethod
|
||||
def build_image_grid(
|
||||
positions: np.ndarray, oversampling_factor: float
|
||||
) -> tuple[np.ndarray, np.ndarray, QTransform]:
|
||||
"""Build an interpolation grid covering the data positions.
|
||||
# Apply oversampling factor
|
||||
factor = self._image_config.oversampling_factor
|
||||
|
||||
Args:
|
||||
positions: (N, 2) array of (x, y) coordinates
|
||||
oversampling_factor: Grid resolution multiplier (>1.0 for higher resolution)
|
||||
|
||||
Returns:
|
||||
Tuple of (grid_x, grid_y, transform) where grid_x/grid_y are meshgrids
|
||||
for interpolation and transform maps pixel to world coordinates
|
||||
"""
|
||||
base_width, base_height = Heatmap.estimate_image_resolution(positions)
|
||||
width = max(1, int(base_width * oversampling_factor))
|
||||
height = max(1, int(base_height * oversampling_factor))
|
||||
# Apply oversampling
|
||||
width = int(base_width * factor)
|
||||
height = int(base_height * factor)
|
||||
|
||||
# Create grid
|
||||
grid_x, grid_y = np.mgrid[
|
||||
min(positions[:, 0]) : max(positions[:, 0]) : width * 1j,
|
||||
min(positions[:, 1]) : max(positions[:, 1]) : height * 1j,
|
||||
]
|
||||
|
||||
# Calculate transform
|
||||
x_min, x_max = min(positions[:, 0]), max(positions[:, 0])
|
||||
y_min, y_max = min(positions[:, 1]), max(positions[:, 1])
|
||||
x_range = x_max - x_min
|
||||
@@ -1142,7 +845,6 @@ class Heatmap(ImageBase):
|
||||
return scan_devices, "value"
|
||||
|
||||
def reset(self):
|
||||
self._cancel_interpolation()
|
||||
self._grid_index = None
|
||||
self.main_image.clear()
|
||||
if self.crosshair is not None:
|
||||
@@ -1277,10 +979,6 @@ class Heatmap(ImageBase):
|
||||
"""
|
||||
self.main_image.transpose = enable
|
||||
|
||||
def cleanup(self):
|
||||
self._finish_interpolation_thread()
|
||||
super().cleanup()
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
import sys
|
||||
|
||||
@@ -47,9 +47,7 @@ class DeviceBrowser(BECWidget, QWidget):
|
||||
) -> None:
|
||||
super().__init__(parent=parent, client=client, gui_id=gui_id, config=config, **kwargs)
|
||||
self.get_bec_shortcuts()
|
||||
self._config_helper = ConfigHelper(
|
||||
self.client.connector, self.client._service_name, self.client.device_manager
|
||||
)
|
||||
self._config_helper = ConfigHelper(self.client.connector, self.client._service_name)
|
||||
self._q_threadpool = QThreadPool()
|
||||
self.ui = None
|
||||
self.init_ui()
|
||||
|
||||
@@ -229,7 +229,7 @@ class DirectUpdateDeviceConfigDialog(BECWidget, DeviceConfigDialog):
|
||||
self._device = device
|
||||
self._q_threadpool = threadpool or QThreadPool()
|
||||
self._config_helper = config_helper or ConfigHelper(
|
||||
self.client.connector, self.client._service_name, self.client.device_manager
|
||||
self.client.connector, self.client._service_name
|
||||
)
|
||||
super().__init__(parent=parent, **kwargs)
|
||||
self.get_bec_shortcuts()
|
||||
|
||||
@@ -78,15 +78,15 @@ def test_available_widgets(qtbot, connected_client_gui_obj):
|
||||
"""This test checks that all widgets that are available via gui.available_widgets can be created and removed."""
|
||||
gui = connected_client_gui_obj
|
||||
dock_area = gui.bec
|
||||
# Number of top level widgets, should be 4
|
||||
top_level_widgets_count = 12
|
||||
# Number of top level widgets, should be 5
|
||||
top_level_widgets_count = 13
|
||||
assert len(gui._server_registry) == top_level_widgets_count
|
||||
names = set(list(gui._server_registry.keys()))
|
||||
# Number of widgets with parent_id == None, should be 2
|
||||
# Number of widgets with parent_id == None, should be 3
|
||||
widgets = [
|
||||
widget for widget in gui._server_registry.values() if widget["config"]["parent_id"] is None
|
||||
]
|
||||
assert len(widgets) == 2
|
||||
assert len(widgets) == 3
|
||||
|
||||
# Test all relevant widgets
|
||||
for object_name in gui.available_widgets.__dict__:
|
||||
@@ -122,7 +122,7 @@ def test_available_widgets(qtbot, connected_client_gui_obj):
|
||||
for widget in gui._server_registry.values()
|
||||
if widget["config"]["parent_id"] is None
|
||||
]
|
||||
assert len(widgets) == 2
|
||||
assert len(widgets) == 3
|
||||
|
||||
#############################
|
||||
####### Remove widget #######
|
||||
@@ -154,10 +154,10 @@ def test_available_widgets(qtbot, connected_client_gui_obj):
|
||||
f"is {len(gui._server_registry)} instead of {top_level_widgets_count}. The following "
|
||||
f"widgets are not cleaned up: {set(gui._server_registry.keys()) - names}"
|
||||
) from exc
|
||||
# Number of widgets with parent_id == None, should be 2
|
||||
# Number of widgets with parent_id == None, should be 3
|
||||
widgets = [
|
||||
widget
|
||||
for widget in gui._server_registry.values()
|
||||
if widget["config"]["parent_id"] is None
|
||||
]
|
||||
assert len(widgets) == 2
|
||||
assert len(widgets) == 3
|
||||
|
||||
@@ -134,7 +134,7 @@ def test_update_cycle(update_dialog, qtbot):
|
||||
"deviceClass": "TestDevice",
|
||||
"deviceConfig": {"param1": "val1"},
|
||||
"readoutPriority": "monitored",
|
||||
"description": "",
|
||||
"description": None,
|
||||
"readOnly": False,
|
||||
"softwareTrigger": False,
|
||||
"onFailure": "retry",
|
||||
|
||||
@@ -4,15 +4,8 @@ import numpy as np
|
||||
import pytest
|
||||
from bec_lib import messages
|
||||
from bec_lib.scan_history import ScanHistory
|
||||
from qtpy.QtGui import QTransform
|
||||
|
||||
from bec_widgets.widgets.plots.heatmap.heatmap import (
|
||||
Heatmap,
|
||||
HeatmapConfig,
|
||||
HeatmapDeviceSignal,
|
||||
_InterpolationRequest,
|
||||
_StepInterpolationWorker,
|
||||
)
|
||||
from bec_widgets.widgets.plots.heatmap.heatmap import Heatmap, HeatmapConfig, HeatmapDeviceSignal
|
||||
|
||||
# pytest: disable=unused-import
|
||||
from tests.unit_tests.client_mocks import mocked_client
|
||||
@@ -341,16 +334,12 @@ def test_heatmap_widget_reset(heatmap_widget):
|
||||
"""
|
||||
Test that the reset method clears the plot.
|
||||
"""
|
||||
heatmap_widget._pending_interpolation_request = object()
|
||||
heatmap_widget._latest_interpolation_version = 5
|
||||
heatmap_widget.scan_item = create_dummy_scan_item()
|
||||
heatmap_widget.plot(x_name="samx", y_name="samy", z_name="bpm4i")
|
||||
|
||||
heatmap_widget.reset()
|
||||
assert heatmap_widget._grid_index is None
|
||||
assert heatmap_widget.main_image.raw_data is None
|
||||
assert heatmap_widget._pending_interpolation_request is None
|
||||
assert heatmap_widget._latest_interpolation_version == 5
|
||||
|
||||
|
||||
def test_heatmap_widget_update_plot_with_scan_history(heatmap_widget, grid_scan_history_msg, qtbot):
|
||||
@@ -375,111 +364,3 @@ def test_heatmap_widget_update_plot_with_scan_history(heatmap_widget, grid_scan_
|
||||
heatmap_widget.enforce_interpolation = True
|
||||
heatmap_widget.oversampling_factor = 2.0
|
||||
qtbot.waitUntil(lambda: heatmap_widget.main_image.raw_data.shape == (20, 20))
|
||||
|
||||
|
||||
def test_step_interpolation_worker_emits_finished(qtbot):
|
||||
worker = _StepInterpolationWorker()
|
||||
request = _InterpolationRequest(
|
||||
x_data=[0.0, 1.0, 0.5, 0.2],
|
||||
y_data=[0.0, 0.0, 1.0, 1.0],
|
||||
z_data=[1.0, 2.0, 3.0, 4.0],
|
||||
data_version=4,
|
||||
scan_id="scan-1",
|
||||
interpolation="linear",
|
||||
oversampling_factor=1.0,
|
||||
)
|
||||
with qtbot.waitSignal(worker.finished, timeout=1000) as blocker:
|
||||
worker.process(request, request.data_version)
|
||||
img, transform, data_version, scan_id = blocker.args
|
||||
assert img.shape[0] > 0
|
||||
assert isinstance(transform, QTransform)
|
||||
assert data_version == request.data_version
|
||||
assert scan_id == request.scan_id
|
||||
|
||||
|
||||
def test_step_interpolation_worker_emits_failed(qtbot, monkeypatch):
|
||||
def _scan_goes_boom(**kwargs):
|
||||
raise RuntimeError("crash")
|
||||
|
||||
monkeypatch.setattr(
|
||||
"bec_widgets.widgets.plots.heatmap.heatmap.Heatmap.compute_step_scan_image", _scan_goes_boom
|
||||
)
|
||||
worker = _StepInterpolationWorker()
|
||||
request = _InterpolationRequest(
|
||||
x_data=[0.0, 1.0, 0.5, 0.2],
|
||||
y_data=[0.0, 0.0, 1.0, 1.0],
|
||||
z_data=[1.0, 2.0, 3.0, 4.0],
|
||||
data_version=99,
|
||||
scan_id="scan-err",
|
||||
interpolation="linear",
|
||||
oversampling_factor=1.0,
|
||||
)
|
||||
with qtbot.waitSignal(worker.failed, timeout=1000) as blocker:
|
||||
worker.process(request, request.data_version)
|
||||
error, data_version, scan_id = blocker.args
|
||||
assert "crash" in error
|
||||
assert data_version == request.data_version
|
||||
assert scan_id == request.scan_id
|
||||
|
||||
|
||||
def test_interpolation_generation_invalidation(heatmap_widget):
|
||||
heatmap_widget.scan_id = "scan-1"
|
||||
heatmap_widget._latest_interpolation_version = 2
|
||||
with (
|
||||
mock.patch.object(heatmap_widget, "_apply_image_update") as apply_mock,
|
||||
mock.patch.object(heatmap_widget, "_maybe_start_pending_interpolation") as maybe_mock,
|
||||
):
|
||||
heatmap_widget._on_interpolation_finished(
|
||||
np.zeros((2, 2)), QTransform(), data_version=1, scan_id="scan-1"
|
||||
)
|
||||
apply_mock.assert_not_called()
|
||||
maybe_mock.assert_called_once()
|
||||
|
||||
|
||||
def test_pending_request_queueing_and_start(heatmap_widget):
|
||||
heatmap_widget.scan_id = "scan-queue"
|
||||
heatmap_widget.status_message = messages.ScanStatusMessage(
|
||||
scan_id="scan-queue",
|
||||
status="open",
|
||||
scan_name="step_scan",
|
||||
scan_type="step",
|
||||
metadata={},
|
||||
info={"positions": [[0, 0], [1, 1], [2, 2], [3, 3]]},
|
||||
)
|
||||
# Simulate an active worker processing a job so new requests are queued.
|
||||
heatmap_widget._interpolation_worker = mock.MagicMock()
|
||||
heatmap_widget._interpolation_worker.is_processing = True
|
||||
|
||||
with mock.patch.object(heatmap_widget, "_start_step_scan_interpolation") as start_mock:
|
||||
heatmap_widget._request_step_scan_interpolation(
|
||||
x_data=[0, 1, 2, 3],
|
||||
y_data=[0, 1, 2, 3],
|
||||
z_data=[0, 1, 2, 3],
|
||||
msg=heatmap_widget.status_message,
|
||||
)
|
||||
assert heatmap_widget._pending_interpolation_request is not None
|
||||
|
||||
# Now simulate worker finished and thread cleaned up
|
||||
heatmap_widget._interpolation_worker.is_processing = False
|
||||
pending = heatmap_widget._pending_interpolation_request
|
||||
heatmap_widget._pending_interpolation_request = pending
|
||||
heatmap_widget._maybe_start_pending_interpolation()
|
||||
|
||||
start_mock.assert_called_once()
|
||||
|
||||
|
||||
def test_finish_interpolation_thread_cleans_references(heatmap_widget):
|
||||
worker_mock = mock.Mock()
|
||||
thread_mock = mock.Mock()
|
||||
thread_mock.isRunning.return_value = True
|
||||
heatmap_widget._interpolation_worker = worker_mock
|
||||
heatmap_widget._interpolation_thread = thread_mock
|
||||
|
||||
heatmap_widget._finish_interpolation_thread()
|
||||
|
||||
worker_mock.deleteLater.assert_called_once()
|
||||
thread_mock.quit.assert_called_once()
|
||||
thread_mock.wait.assert_called_once()
|
||||
thread_mock.deleteLater.assert_called_once()
|
||||
assert heatmap_widget._interpolation_worker is None
|
||||
assert heatmap_widget._interpolation_thread is None
|
||||
|
||||
97
tests/unit_tests/test_status_bar.py
Normal file
97
tests/unit_tests/test_status_bar.py
Normal file
@@ -0,0 +1,97 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
from bec_lib.messages import BeamlineConditionUpdateEntry
|
||||
from qtpy.QtWidgets import QToolBar
|
||||
|
||||
from bec_widgets.utils.toolbars.actions import StatusIndicatorAction, StatusIndicatorWidget
|
||||
from bec_widgets.utils.toolbars.status_bar import BECStatusBroker, StatusToolBar
|
||||
|
||||
from .client_mocks import mocked_client
|
||||
from .conftest import create_widget
|
||||
|
||||
|
||||
class TestStatusIndicators:
|
||||
"""Widget/action level tests independent of broker wiring."""
|
||||
|
||||
def test_indicator_widget_state_and_text(self, qtbot):
|
||||
widget = StatusIndicatorWidget(text="Ready", state="success")
|
||||
qtbot.addWidget(widget)
|
||||
qtbot.waitExposed(widget)
|
||||
widget.set_state("warning")
|
||||
widget.set_text("Alert")
|
||||
assert widget._state.value == "warning"
|
||||
assert widget._text_label.text() == "Alert"
|
||||
|
||||
def test_indicator_action_updates_widget_and_action(self, qtbot):
|
||||
qt_toolbar = QToolBar()
|
||||
qtbot.addWidget(qt_toolbar)
|
||||
|
||||
action = StatusIndicatorAction(text="Ready", tooltip="Initial")
|
||||
action.add_to_toolbar(qt_toolbar, qt_toolbar)
|
||||
|
||||
action.set_tooltip("Updated tooltip")
|
||||
action.set_text("Running")
|
||||
|
||||
assert action.action.toolTip() == "Updated tooltip"
|
||||
assert action.widget.toolTip() == "Updated tooltip" # type: ignore[union-attr]
|
||||
assert action.widget._text_label.text() == "Running" # type: ignore[union-attr]
|
||||
|
||||
|
||||
class TestStatusBar:
|
||||
"""Status bar + broker integration using fake redis client (mocked_client)."""
|
||||
|
||||
@pytest.fixture(params=[{}, {"names": ["alpha"]}])
|
||||
def status_toolbar(self, qtbot, mocked_client, request):
|
||||
broker = BECStatusBroker(client=mocked_client)
|
||||
toolbar = create_widget(qtbot, StatusToolBar, **request.param)
|
||||
yield toolbar
|
||||
broker.reset_singleton()
|
||||
|
||||
def test_allowed_names_precreates_placeholder(self, status_toolbar):
|
||||
status_toolbar.broker.refresh_available = lambda: None
|
||||
status_toolbar.refresh_from_broker()
|
||||
|
||||
# We parametrize the fixture so one invocation has allowed_names set.
|
||||
if status_toolbar.allowed_names:
|
||||
name = next(iter(status_toolbar.allowed_names))
|
||||
assert status_toolbar.components.exists(name)
|
||||
act = status_toolbar.components.get_action(name)
|
||||
assert isinstance(act, StatusIndicatorAction)
|
||||
assert act.widget._text_label.text() == name # type: ignore[union-attr]
|
||||
|
||||
def test_on_available_adds_and_removes(self, status_toolbar):
|
||||
conditions = [
|
||||
BeamlineConditionUpdateEntry(name="c1", title="Cond 1", condition_type="test"),
|
||||
BeamlineConditionUpdateEntry(name="c2", title="Cond 2", condition_type="test"),
|
||||
]
|
||||
status_toolbar.on_available_updated(conditions)
|
||||
assert status_toolbar.components.exists("c1")
|
||||
assert status_toolbar.components.exists("c2")
|
||||
|
||||
conditions2 = [
|
||||
BeamlineConditionUpdateEntry(name="c1", title="Cond 1", condition_type="test")
|
||||
]
|
||||
status_toolbar.on_available_updated(conditions2)
|
||||
assert status_toolbar.components.exists("c1")
|
||||
assert not status_toolbar.components.exists("c2")
|
||||
|
||||
def test_on_status_updated_sets_title_and_message(self, status_toolbar):
|
||||
status_toolbar.add_status_item("beam", text="Initial", state="default", tooltip=None)
|
||||
payload = {"name": "beam", "status": "warning", "title": "New Title", "message": "Detail"}
|
||||
status_toolbar.on_status_updated("beam", payload)
|
||||
|
||||
action = status_toolbar.components.get_action("beam")
|
||||
assert isinstance(action, StatusIndicatorAction)
|
||||
assert action.widget._text_label.text() == "New Title" # type: ignore[union-attr]
|
||||
assert action.action.toolTip() == "Detail"
|
||||
|
||||
def test_on_status_updated_keeps_existing_text_when_no_title(self, status_toolbar):
|
||||
status_toolbar.add_status_item("beam", text="Keep Me", state="default", tooltip=None)
|
||||
payload = {"name": "beam", "status": "normal", "message": "Note"}
|
||||
status_toolbar.on_status_updated("beam", payload)
|
||||
|
||||
action = status_toolbar.components.get_action("beam")
|
||||
assert isinstance(action, StatusIndicatorAction)
|
||||
assert action.widget._text_label.text() == "Keep Me" # type: ignore[union-attr]
|
||||
assert action.action.toolTip() == "Note"
|
||||
Reference in New Issue
Block a user