1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-04-14 12:40:54 +02:00

Compare commits

...

13 Commits

51 changed files with 2502 additions and 1012 deletions

View File

@@ -17,6 +17,10 @@ on:
required: false
type: string
concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true
permissions:
pull-requests: write

View File

@@ -2,15 +2,15 @@ from __future__ import annotations
from bec_lib import bec_logger
from bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area import AdvancedDockArea
from bec_widgets.widgets.containers.auto_update.auto_updates import AutoUpdates
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea
logger = bec_logger.logger
def dock_area(
object_name: str | None = None, profile: str | None = None, start_empty: bool = False
) -> AdvancedDockArea:
) -> BECDockArea:
"""
Create an advanced dock area using Qt Advanced Docking System.
@@ -20,7 +20,7 @@ def dock_area(
start_empty(bool): If True, start with an empty dock area when loading specified profile.
Returns:
AdvancedDockArea: The created advanced dock area.
BECDockArea: The created advanced dock area.
Note:
The "general" profile is mandatory and will always exist. If manually deleted,
@@ -29,7 +29,7 @@ def dock_area(
# Default to "general" profile when called from CLI without specifying a profile
effective_profile = profile if profile is not None else "general"
widget = AdvancedDockArea(
widget = BECDockArea(
object_name=object_name,
restore_initial_profile=True,
root_widget=True,
@@ -51,7 +51,7 @@ def auto_update_dock_area(object_name: str | None = None) -> AutoUpdates:
object_name(str): The name of the dock area.
Returns:
AdvancedDockArea: The created dock area.
BECDockArea: The created dock area.
"""
_auto_update = AutoUpdates(object_name=object_name)
return _auto_update

View File

@@ -27,14 +27,12 @@ from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.name_utils import pascal_to_snake
from bec_widgets.utils.plugin_utils import get_plugin_auto_updates
from bec_widgets.utils.round_frame import RoundedFrame
from bec_widgets.utils.screen_utils import apply_window_geometry, centered_geometry_for_app
from bec_widgets.utils.toolbars.toolbar import ModularToolBar
from bec_widgets.utils.ui_loader import UILoader
from bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area import AdvancedDockArea
from bec_widgets.widgets.containers.advanced_dock_area.profile_utils import (
get_last_profile,
list_profiles,
)
from bec_widgets.widgets.containers.auto_update.auto_updates import AutoUpdates
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea
from bec_widgets.widgets.containers.dock_area.profile_utils import get_last_profile, list_profiles
from bec_widgets.widgets.containers.main_window.main_window import BECMainWindow, BECMainWindowNoRPC
from bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button import DarkModeButton
@@ -78,23 +76,28 @@ class LaunchTile(RoundedFrame):
circular_pixmap.fill(Qt.transparent)
painter = QPainter(circular_pixmap)
painter.setRenderHints(QPainter.Antialiasing, True)
painter.setRenderHints(QPainter.RenderHint.Antialiasing, True)
path = QPainterPath()
path.addEllipse(0, 0, size, size)
painter.setClipPath(path)
pixmap = pixmap.scaled(size, size, Qt.KeepAspectRatio, Qt.SmoothTransformation)
pixmap = pixmap.scaled(
size,
size,
Qt.AspectRatioMode.KeepAspectRatio,
Qt.TransformationMode.SmoothTransformation,
)
painter.drawPixmap(0, 0, pixmap)
painter.end()
self.icon_label.setPixmap(circular_pixmap)
self.layout.addWidget(self.icon_label, alignment=Qt.AlignCenter)
self.layout.addWidget(self.icon_label, alignment=Qt.AlignmentFlag.AlignCenter)
# Top label
self.top_label = QLabel(top_label.upper())
font_top = self.top_label.font()
font_top.setPointSize(10)
self.top_label.setFont(font_top)
self.layout.addWidget(self.top_label, alignment=Qt.AlignCenter)
self.layout.addWidget(self.top_label, alignment=Qt.AlignmentFlag.AlignCenter)
# Main label
self.main_label = QLabel(main_label)
@@ -104,7 +107,7 @@ class LaunchTile(RoundedFrame):
font_main.setPointSize(14)
font_main.setBold(True)
self.main_label.setFont(font_main)
self.main_label.setAlignment(Qt.AlignCenter)
self.main_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
# Shrink font if the default would wrap on this platform / DPI
content_width = (
@@ -120,13 +123,13 @@ class LaunchTile(RoundedFrame):
self.layout.addWidget(self.main_label)
self.spacer_top = QSpacerItem(0, 10, QSizePolicy.Fixed, QSizePolicy.Fixed)
self.spacer_top = QSpacerItem(0, 10, QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Fixed)
self.layout.addItem(self.spacer_top)
# Description
self.description_label = QLabel(description)
self.description_label.setWordWrap(True)
self.description_label.setAlignment(Qt.AlignCenter)
self.description_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.layout.addWidget(self.description_label)
# Selector
@@ -136,7 +139,9 @@ class LaunchTile(RoundedFrame):
else:
self.selector = None
self.spacer_bottom = QSpacerItem(0, 0, QSizePolicy.Fixed, QSizePolicy.Expanding)
self.spacer_bottom = QSpacerItem(
0, 0, QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Expanding
)
self.layout.addItem(self.spacer_bottom)
# Action button
@@ -156,7 +161,7 @@ class LaunchTile(RoundedFrame):
}
"""
)
self.layout.addWidget(self.action_button, alignment=Qt.AlignCenter)
self.layout.addWidget(self.action_button, alignment=Qt.AlignmentFlag.AlignCenter)
def _fit_label_to_width(self, label: QLabel, max_width: int, min_pt: int = 10):
"""
@@ -179,12 +184,13 @@ class LaunchTile(RoundedFrame):
metrics = QFontMetrics(font)
label.setFont(font)
label.setWordWrap(False)
label.setText(metrics.elidedText(label.text(), Qt.ElideRight, max_width))
label.setText(metrics.elidedText(label.text(), Qt.TextElideMode.ElideRight, max_width))
class LaunchWindow(BECMainWindow):
RPC = True
TILE_SIZE = (250, 300)
DEFAULT_LAUNCH_SIZE = (800, 600)
USER_ACCESS = ["show_launcher", "hide_launcher"]
def __init__(
@@ -209,7 +215,7 @@ class LaunchWindow(BECMainWindow):
self.toolbar = ModularToolBar(parent=self)
self.addToolBar(Qt.TopToolBarArea, self.toolbar)
self.spacer = QWidget(self)
self.spacer.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
self.spacer.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding)
self.toolbar.addWidget(self.spacer)
self.toolbar.addWidget(self.dark_mode_button)
@@ -318,7 +324,7 @@ class LaunchWindow(BECMainWindow):
)
tile.setFixedWidth(self.TILE_SIZE[0])
tile.setMinimumHeight(self.TILE_SIZE[1])
tile.setSizePolicy(QSizePolicy.Fixed, QSizePolicy.MinimumExpanding)
tile.setSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.MinimumExpanding)
if action_button:
tile.action_button.clicked.connect(action_button)
if show_selector and selector_items:
@@ -428,7 +434,9 @@ class LaunchWindow(BECMainWindow):
from bec_widgets.applications import bw_launch
with RPCRegister.delayed_broadcast() as rpc_register:
existing_dock_areas = rpc_register.get_names_of_rpc_by_class_type(AdvancedDockArea)
if geometry is None and launch_script != "custom_ui_file":
geometry = self._default_launch_geometry()
existing_dock_areas = rpc_register.get_names_of_rpc_by_class_type(BECDockArea)
if name is not None:
WidgetContainerUtils.raise_for_invalid_name(name)
# If name already exists, generate a unique one with counter suffix
@@ -451,13 +459,13 @@ class LaunchWindow(BECMainWindow):
if launch_script == "auto_update":
auto_update = kwargs.pop("auto_update", None)
return self._launch_auto_update(auto_update)
return self._launch_auto_update(auto_update, geometry=geometry)
if launch_script == "widget":
widget = kwargs.pop("widget", None)
if widget is None:
raise ValueError("Widget name must be provided.")
return self._launch_widget(widget)
return self._launch_widget(widget, geometry=geometry)
launch = getattr(bw_launch, launch_script, None)
if launch is None:
@@ -469,13 +477,13 @@ class LaunchWindow(BECMainWindow):
logger.info(f"Created new dock area: {name}")
if isinstance(result_widget, BECMainWindow):
self._apply_window_geometry(result_widget, geometry)
apply_window_geometry(result_widget, geometry)
result_widget.show()
else:
window = BECMainWindowNoRPC()
window.setCentralWidget(result_widget)
window.setWindowTitle(f"BEC - {result_widget.objectName()}")
self._apply_window_geometry(window, geometry)
apply_window_geometry(window, geometry)
window.show()
return result_widget
@@ -511,12 +519,14 @@ class LaunchWindow(BECMainWindow):
window.setCentralWidget(loaded)
window.setWindowTitle(f"BEC - {filename}")
self._apply_window_geometry(window, None)
apply_window_geometry(window, None)
window.show()
logger.info(f"Launched custom UI: {filename}, type: {type(window).__name__}")
return window
def _launch_auto_update(self, auto_update: str) -> AutoUpdates:
def _launch_auto_update(
self, auto_update: str, geometry: tuple[int, int, int, int] | None = None
) -> AutoUpdates:
if auto_update in self.available_auto_updates:
auto_update_cls = self.available_auto_updates[auto_update]
window = auto_update_cls()
@@ -527,11 +537,13 @@ class LaunchWindow(BECMainWindow):
window.resize(window.minimumSizeHint())
window.setWindowTitle(f"BEC - {window.objectName()}")
self._apply_window_geometry(window, None)
apply_window_geometry(window, geometry)
window.show()
return window
def _launch_widget(self, widget: type[BECWidget]) -> QWidget:
def _launch_widget(
self, widget: type[BECWidget], geometry: tuple[int, int, int, int] | None = None
) -> QWidget:
name = pascal_to_snake(widget.__name__)
WidgetContainerUtils.raise_for_invalid_name(name)
@@ -544,7 +556,7 @@ class LaunchWindow(BECMainWindow):
window.setCentralWidget(widget_instance)
window.resize(window.minimumSizeHint())
window.setWindowTitle(f"BEC - {widget_instance.objectName()}")
self._apply_window_geometry(window, None)
apply_window_geometry(window, geometry)
window.show()
return window
@@ -592,30 +604,9 @@ class LaunchWindow(BECMainWindow):
raise ValueError(f"Widget {widget} not found in available widgets.")
return self.launch("widget", widget=self.available_widgets[widget])
def _apply_window_geometry(
self, window: QWidget, geometry: tuple[int, int, int, int] | None
) -> None:
"""Apply a provided geometry or center the window with an 80% layout."""
if geometry is not None:
window.setGeometry(*geometry)
return
default_geometry = self._default_window_geometry(window)
if default_geometry is not None:
window.setGeometry(*default_geometry)
else:
window.resize(window.minimumSizeHint())
@staticmethod
def _default_window_geometry(window: QWidget) -> tuple[int, int, int, int] | None:
screen = window.screen() or QApplication.primaryScreen()
if screen is None:
return None
available = screen.availableGeometry()
width = int(available.width() * 0.8)
height = int(available.height() * 0.8)
x = available.x() + (available.width() - width) // 2
y = available.y() + (available.height() - height) // 2
return x, y, width, height
def _default_launch_geometry(self) -> tuple[int, int, int, int] | None:
width, height = self.DEFAULT_LAUNCH_SIZE
return centered_geometry_for_app(width=width, height=height)
@SafeSlot(popup_error=True)
def _open_custom_ui_file(self):
@@ -706,7 +697,7 @@ class LaunchWindow(BECMainWindow):
self.hide()
if __name__ == "__main__":
if __name__ == "__main__": # pragma: no cover
import sys
from bec_widgets.utils.colors import apply_theme

View File

@@ -7,7 +7,12 @@ from bec_widgets.applications.views.developer_view.developer_view import Develop
from bec_widgets.applications.views.device_manager_view.device_manager_view import DeviceManagerView
from bec_widgets.applications.views.view import ViewBase, WaveformViewInline, WaveformViewPopup
from bec_widgets.utils.colors import apply_theme
from bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area import AdvancedDockArea
from bec_widgets.utils.screen_utils import (
apply_centered_size,
available_screen_geometry,
main_app_size_for_screen,
)
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea
from bec_widgets.widgets.containers.main_window.main_window import BECMainWindow
@@ -45,7 +50,7 @@ class BECMainApp(BECMainWindow):
def _add_views(self):
self.add_section("BEC Applications", "bec_apps")
self.ads = AdvancedDockArea(self, profile_namespace="bec", auto_profile_namespace=False)
self.ads = BECDockArea(self, profile_namespace="bec", auto_profile_namespace=False)
self.ads.setObjectName("MainWorkspace")
self.device_manager = DeviceManagerView(self)
self.developer_view = DeveloperView(self)
@@ -211,25 +216,12 @@ def main(): # pragma: no cover
apply_theme("dark")
w = BECMainApp(show_examples=args.examples)
screen = app.primaryScreen()
screen_geometry = screen.availableGeometry()
screen_width = screen_geometry.width()
screen_height = screen_geometry.height()
# 70% of screen height, keep 16:9 ratio
height = int(screen_height * 0.9)
width = int(height * (16 / 9))
# If width exceeds screen width, scale down
if width > screen_width * 0.9:
width = int(screen_width * 0.9)
height = int(width / (16 / 9))
w.resize(width, height)
# Center the window on the screen
x = screen_geometry.x() + (screen_geometry.width() - width) // 2
y = screen_geometry.y() + (screen_geometry.height() - height) // 2
w.move(x, y)
screen_geometry = available_screen_geometry()
if screen_geometry is not None:
width, height = main_app_size_for_screen(screen_geometry)
apply_centered_size(w, width, height, available=screen_geometry)
else:
w.resize(w.minimumSizeHint())
w.show()

View File

@@ -1,3 +1,5 @@
from __future__ import annotations
from qtpy.QtWidgets import QWidget
from bec_widgets.applications.views.developer_view.developer_widget import DeveloperWidget

View File

@@ -13,8 +13,8 @@ from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.toolbars.actions import MaterialIconAction
from bec_widgets.utils.toolbars.bundles import ToolbarBundle
from bec_widgets.utils.toolbars.toolbar import ModularToolBar
from bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area import AdvancedDockArea
from bec_widgets.widgets.containers.advanced_dock_area.basic_dock_area import DockAreaWidget
from bec_widgets.widgets.containers.dock_area.basic_dock_area import DockAreaWidget
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea
from bec_widgets.widgets.containers.qt_ads import CDockWidget
from bec_widgets.widgets.editors.monaco.monaco_dock import MonacoDock
from bec_widgets.widgets.editors.monaco.monaco_widget import MonacoWidget
@@ -99,7 +99,7 @@ class DeveloperWidget(DockAreaWidget):
self.monaco = MonacoDock(self)
self.monaco.setObjectName("MonacoEditor")
self.monaco.save_enabled.connect(self._on_save_enabled_update)
self.plotting_ads = AdvancedDockArea(
self.plotting_ads = BECDockArea(
self,
mode="plot",
default_add_direction="bottom",

View File

@@ -38,7 +38,7 @@ from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.toolbars.actions import MaterialIconAction
from bec_widgets.utils.toolbars.bundles import ToolbarBundle
from bec_widgets.utils.toolbars.toolbar import ModularToolBar
from bec_widgets.widgets.containers.advanced_dock_area.basic_dock_area import DockAreaWidget
from bec_widgets.widgets.containers.dock_area.basic_dock_area import DockAreaWidget
from bec_widgets.widgets.control.device_manager.components import (
DeviceTable,
DMConfigView,

View File

@@ -15,6 +15,7 @@ from qtpy.QtWidgets import (
)
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.toolbars.status_bar import StatusToolBar
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
from bec_widgets.widgets.control.device_input.signal_combobox.signal_combobox import SignalComboBox
from bec_widgets.widgets.plots.waveform.waveform import Waveform
@@ -30,6 +31,7 @@ class ViewBase(QWidget):
parent (QWidget | None): Parent widget.
id (str | None): Optional view id, useful for debugging or introspection.
title (str | None): Optional human-readable title.
show_status (bool): Whether to show a status toolbar at the top of the view.
"""
def __init__(
@@ -39,6 +41,8 @@ class ViewBase(QWidget):
*,
id: str | None = None,
title: str | None = None,
show_status: bool = False,
status_names: list[str] | None = None,
):
super().__init__(parent=parent)
self.content: QWidget | None = None
@@ -49,15 +53,48 @@ class ViewBase(QWidget):
lay.setContentsMargins(0, 0, 0, 0)
lay.setSpacing(0)
self.status_bar: StatusToolBar | None = None
if show_status:
# If explicit status names are provided, default to showing only those.
show_all = status_names is None
self.setup_status_bar(show_all_status=show_all, status_names=status_names)
if content is not None:
self.set_content(content)
def set_content(self, content: QWidget) -> None:
"""Replace the current content widget with a new one."""
if self.content is not None:
self.layout().removeWidget(self.content)
self.content.setParent(None)
self.content.close()
self.content.deleteLater()
self.content = content
self.layout().addWidget(content)
if self.status_bar is not None:
insert_at = self.layout().indexOf(self.status_bar) + 1
self.layout().insertWidget(insert_at, content)
else:
self.layout().addWidget(content)
def setup_status_bar(
self, *, show_all_status: bool = True, status_names: list[str] | None = None
) -> None:
"""Create and attach a status toolbar managed by the status broker."""
if self.status_bar is not None:
return
names_arg = None if show_all_status else status_names
self.status_bar = StatusToolBar(parent=self, names=names_arg)
self.layout().addWidget(self.status_bar)
def set_status(
self, name: str = "main", *, state=None, text: str | None = None, tooltip: str | None = None
) -> None:
"""Manually set a status item on the status bar."""
if self.status_bar is None:
self.setup_status_bar(show_all_status=True)
if self.status_bar is None:
return
self.status_bar.set_status(name=name, state=state, text=text, tooltip=tooltip)
@SafeSlot()
def on_enter(self) -> None:

View File

@@ -56,7 +56,6 @@ _Widgets = {
"ScatterWaveform": "ScatterWaveform",
"SignalLabel": "SignalLabel",
"TextBox": "TextBox",
"VSCodeEditor": "VSCodeEditor",
"Waveform": "Waveform",
"WebConsole": "WebConsole",
"WebsiteWidget": "WebsiteWidget",
@@ -91,7 +90,63 @@ except ImportError as e:
logger.error(f"Failed loading plugins: \n{reduce(add, traceback.format_exception(e))}")
class AdvancedDockArea(RPCBase):
class AutoUpdates(RPCBase):
@property
@rpc_call
def enabled(self) -> "bool":
"""
Get the enabled status of the auto updates.
"""
@enabled.setter
@rpc_call
def enabled(self) -> "bool":
"""
Get the enabled status of the auto updates.
"""
@property
@rpc_call
def selected_device(self) -> "str | None":
"""
Get the selected device from the auto update config.
Returns:
str: The selected device. If no device is selected, None is returned.
"""
@selected_device.setter
@rpc_call
def selected_device(self) -> "str | None":
"""
Get the selected device from the auto update config.
Returns:
str: The selected device. If no device is selected, None is returned.
"""
class AvailableDeviceResources(RPCBase):
@rpc_call
def remove(self):
"""
Cleanup the BECConnector
"""
@rpc_call
def attach(self):
"""
None
"""
@rpc_call
def detach(self):
"""
Detach the widget from its parent dock widget (if widget is in the dock), making it a floating widget.
"""
class BECDockArea(RPCBase):
@rpc_call
def new(
self,
@@ -321,62 +376,6 @@ class AdvancedDockArea(RPCBase):
"""
class AutoUpdates(RPCBase):
@property
@rpc_call
def enabled(self) -> "bool":
"""
Get the enabled status of the auto updates.
"""
@enabled.setter
@rpc_call
def enabled(self) -> "bool":
"""
Get the enabled status of the auto updates.
"""
@property
@rpc_call
def selected_device(self) -> "str | None":
"""
Get the selected device from the auto update config.
Returns:
str: The selected device. If no device is selected, None is returned.
"""
@selected_device.setter
@rpc_call
def selected_device(self) -> "str | None":
"""
Get the selected device from the auto update config.
Returns:
str: The selected device. If no device is selected, None is returned.
"""
class AvailableDeviceResources(RPCBase):
@rpc_call
def remove(self):
"""
Cleanup the BECConnector
"""
@rpc_call
def attach(self):
"""
None
"""
@rpc_call
def detach(self):
"""
Detach the widget from its parent dock widget (if widget is in the dock), making it a floating widget.
"""
class BECMainWindow(RPCBase):
@rpc_call
def remove(self):
@@ -2502,16 +2501,30 @@ class Image(RPCBase):
@property
@rpc_call
def monitor(self) -> "str":
def device_name(self) -> "str":
"""
The name of the monitor to use for the image.
The name of the device to monitor for image data.
"""
@monitor.setter
@device_name.setter
@rpc_call
def monitor(self) -> "str":
def device_name(self) -> "str":
"""
The name of the monitor to use for the image.
The name of the device to monitor for image data.
"""
@property
@rpc_call
def device_entry(self) -> "str":
"""
The signal/entry name to monitor on the device.
"""
@device_entry.setter
@rpc_call
def device_entry(self) -> "str":
"""
The signal/entry name to monitor on the device.
"""
@rpc_call
@@ -2617,8 +2630,8 @@ class Image(RPCBase):
@rpc_call
def image(
self,
monitor: "str | tuple | None" = None,
monitor_type: "Literal['auto', '1d', '2d']" = "auto",
device_name: "str | None" = None,
device_entry: "str | None" = None,
color_map: "str | None" = None,
color_bar: "Literal['simple', 'full'] | None" = None,
vrange: "tuple[int, int] | None" = None,
@@ -2627,14 +2640,14 @@ class Image(RPCBase):
Set the image source and update the image.
Args:
monitor(str|tuple|None): The name of the monitor to use for the image, or a tuple of (device, signal) for preview signals. If None or empty string, the current monitor will be disconnected.
monitor_type(str): The type of monitor to use. Options are "1d", "2d", or "auto".
device_name(str|None): The name of the device to monitor. If None or empty string, the current monitor will be disconnected.
device_entry(str|None): The signal/entry name to monitor on the device.
color_map(str): The color map to use for the image.
color_bar(str): The type of color bar to use. Options are "simple" or "full".
vrange(tuple): The range of values to use for the color map.
Returns:
ImageItem: The image object.
ImageItem: The image object, or None if connection failed.
"""
@property
@@ -5515,12 +5528,6 @@ class TextBox(RPCBase):
"""
class VSCodeEditor(RPCBase):
"""A widget to display the VSCode editor."""
...
class Waveform(RPCBase):
"""Widget for plotting waveforms."""

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import re
from functools import lru_cache
from typing import Literal
import numpy as np
@@ -9,6 +10,7 @@ from bec_lib import bec_logger
from bec_qthemes import apply_theme as apply_theme_global
from bec_qthemes._theme import AccentColors
from pydantic_core import PydanticCustomError
from pyqtgraph.graphicsItems.GradientEditorItem import Gradients
from qtpy.QtCore import QEvent, QEventLoop
from qtpy.QtGui import QColor
from qtpy.QtWidgets import QApplication
@@ -57,6 +59,96 @@ def apply_theme(theme: Literal["dark", "light"]):
class Colors:
@staticmethod
def list_available_colormaps() -> list[str]:
"""
List colormap names available via the pyqtgraph colormap registry.
Note: This does not include `GradientEditorItem` presets (used by HistogramLUT menus).
"""
def _list(source: str | None = None) -> list[str]:
try:
return pg.colormap.listMaps() if source is None else pg.colormap.listMaps(source)
except Exception: # pragma: no cover - backend may be missing
return []
return [*_list(None), *_list("matplotlib"), *_list("colorcet")]
@staticmethod
def list_available_gradient_presets() -> list[str]:
"""
List `GradientEditorItem` preset names (HistogramLUT right-click menu entries).
"""
from pyqtgraph.graphicsItems.GradientEditorItem import Gradients
return list(Gradients.keys())
@staticmethod
def canonical_colormap_name(color_map: str) -> str:
"""
Return an available colormap/preset name if a case-insensitive match exists.
"""
requested = (color_map or "").strip()
if not requested:
return requested
registry = Colors.list_available_colormaps()
presets = Colors.list_available_gradient_presets()
available = set(registry) | set(presets)
if requested in available:
return requested
# Case-insensitive match.
requested_lc = requested.casefold()
for name in available:
if name.casefold() == requested_lc:
return name
return requested
@staticmethod
def get_colormap(color_map: str) -> pg.ColorMap:
"""
Resolve a string into a `pg.ColorMap` using either:
- the `pg.colormap` registry (optionally including matplotlib/colorcet backends), or
- `GradientEditorItem` presets (HistogramLUT right-click menu).
"""
name = Colors.canonical_colormap_name(color_map)
if not name:
raise ValueError("Empty colormap name")
return Colors._get_colormap_cached(name)
@staticmethod
@lru_cache(maxsize=256)
def _get_colormap_cached(name: str) -> pg.ColorMap:
# 1) Registry/backends
try:
cmap = pg.colormap.get(name)
if cmap is not None:
return cmap
except Exception:
pass
for source in ("matplotlib", "colorcet"):
try:
cmap = pg.colormap.get(name, source=source)
if cmap is not None:
return cmap
except Exception:
continue
# 2) Presets -> ColorMap
if name not in Gradients:
raise KeyError(f"Colormap '{name}' not found")
ge = pg.GradientEditorItem()
ge.loadPreset(name)
return ge.colorMap()
@staticmethod
def golden_ratio(num: int) -> list:
@@ -138,7 +230,7 @@ class Colors:
if theme_offset < 0 or theme_offset > 1:
raise ValueError("theme_offset must be between 0 and 1")
cmap = pg.colormap.get(colormap)
cmap = Colors.get_colormap(colormap)
min_pos, max_pos = Colors.set_theme_offset(theme, theme_offset)
# Generate positions that are evenly spaced within the acceptable range
@@ -186,7 +278,7 @@ class Colors:
ValueError: If theme_offset is not between 0 and 1.
"""
cmap = pg.colormap.get(colormap)
cmap = Colors.get_colormap(colormap)
phi = (1 + np.sqrt(5)) / 2 # Golden ratio
golden_angle_conjugate = 1 - (1 / phi) # Approximately 0.38196601125
@@ -452,21 +544,24 @@ class Colors:
Raises:
PydanticCustomError: If colormap is invalid.
"""
available_pg_maps = pg.colormap.listMaps()
available_mpl_maps = pg.colormap.listMaps("matplotlib")
available_mpl_colorcet = pg.colormap.listMaps("colorcet")
available_colormaps = available_pg_maps + available_mpl_maps + available_mpl_colorcet
if color_map not in available_colormaps:
normalized = Colors.canonical_colormap_name(color_map)
try:
Colors.get_colormap(normalized)
except Exception as ext:
logger.warning(f"Colormap validation error: {ext}")
if return_error:
available_colormaps = sorted(
set(Colors.list_available_colormaps())
| set(Colors.list_available_gradient_presets())
)
raise PydanticCustomError(
"unsupported colormap",
f"Colormap '{color_map}' not found in the current installation of pyqtgraph. Choose on the following: {available_colormaps}.",
f"Colormap '{color_map}' not found in the current installation of pyqtgraph. Choose from the following: {available_colormaps}.",
{"wrong_value": color_map},
)
else:
return False
return color_map
return normalized
@staticmethod
def relative_luminance(color: QColor) -> float:

View File

@@ -0,0 +1,100 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from qtpy.QtWidgets import QApplication, QWidget
if TYPE_CHECKING: # pragma: no cover
from qtpy.QtCore import QRect
def available_screen_geometry(*, widget: QWidget | None = None) -> QRect | None:
"""
Get the available geometry of the screen associated with the given widget or application.
Args:
widget(QWidget | None): The widget to get the screen from.
Returns:
QRect | None: The available geometry of the screen, or None if no screen is found.
"""
screen = widget.screen() if widget is not None else None
if screen is None:
app = QApplication.instance()
screen = app.primaryScreen() if app is not None else None
if screen is None:
return None
return screen.availableGeometry()
def centered_geometry(available: "QRect", width: int, height: int) -> tuple[int, int, int, int]:
"""
Calculate centered geometry within the available rectangle.
Args:
available(QRect): The available rectangle to center within.
width(int): The desired width.
height(int): The desired height.
Returns:
tuple[int, int, int, int]: The (x, y, width, height) of the centered geometry.
"""
x = available.x() + (available.width() - width) // 2
y = available.y() + (available.height() - height) // 2
return x, y, width, height
def centered_geometry_for_app(width: int, height: int) -> tuple[int, int, int, int] | None:
available = available_screen_geometry()
if available is None:
return None
return centered_geometry(available, width, height)
def scaled_centered_geometry_for_window(
window: QWidget, *, width_ratio: float = 0.8, height_ratio: float = 0.8
) -> tuple[int, int, int, int] | None:
available = available_screen_geometry(widget=window)
if available is None:
return None
width = int(available.width() * width_ratio)
height = int(available.height() * height_ratio)
return centered_geometry(available, width, height)
def apply_window_geometry(
window: QWidget,
geometry: tuple[int, int, int, int] | None,
*,
width_ratio: float = 0.8,
height_ratio: float = 0.8,
) -> None:
if geometry is not None:
window.setGeometry(*geometry)
return
default_geometry = scaled_centered_geometry_for_window(
window, width_ratio=width_ratio, height_ratio=height_ratio
)
if default_geometry is not None:
window.setGeometry(*default_geometry)
else:
window.resize(window.minimumSizeHint())
def main_app_size_for_screen(available: "QRect") -> tuple[int, int]:
height = int(available.height() * 0.9)
width = int(height * (16 / 9))
if width > available.width() * 0.9:
width = int(available.width() * 0.9)
height = int(width / (16 / 9))
return width, height
def apply_centered_size(
window: QWidget, width: int, height: int, *, available: "QRect" | None = None
) -> None:
if available is None:
available = available_screen_geometry(widget=window)
if available is None:
window.resize(width, height)
return
window.setGeometry(*centered_geometry(available, width, height))

View File

@@ -5,7 +5,8 @@ import os
import weakref
from abc import ABC, abstractmethod
from contextlib import contextmanager
from typing import Dict, Literal
from enum import Enum
from typing import Dict, Literal, Union
from bec_lib.device import ReadoutPriority
from bec_lib.logger import bec_logger
@@ -15,6 +16,7 @@ from qtpy.QtGui import QAction, QColor, QIcon # type: ignore
from qtpy.QtWidgets import (
QApplication,
QComboBox,
QGraphicsDropShadowEffect,
QHBoxLayout,
QLabel,
QMenu,
@@ -26,6 +28,7 @@ from qtpy.QtWidgets import (
)
import bec_widgets
from bec_widgets.utils.colors import AccentColors, get_accent_colors
from bec_widgets.utils.toolbars.splitter import ResizableSpacer
from bec_widgets.widgets.control.device_input.base_classes.device_input_base import BECDeviceFilter
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
@@ -102,6 +105,205 @@ class LongPressToolButton(QToolButton):
self.showMenu()
class StatusState(str, Enum):
DEFAULT = "default"
HIGHLIGHT = "highlight"
WARNING = "warning"
EMERGENCY = "emergency"
SUCCESS = "success"
class StatusIndicatorWidget(QWidget):
"""Pill-shaped status indicator with icon + label using accent colors."""
def __init__(
self, parent=None, text: str = "Ready", state: StatusState | str = StatusState.DEFAULT
):
super().__init__(parent)
self.setObjectName("StatusIndicatorWidget")
self._text = text
self._state = self._normalize_state(state)
self._theme_connected = False
layout = QHBoxLayout(self)
layout.setContentsMargins(6, 2, 8, 2)
layout.setSpacing(6)
self._icon_label = QLabel(self)
self._icon_label.setFixedSize(18, 18)
self._text_label = QLabel(self)
self._text_label.setText(self._text)
layout.addWidget(self._icon_label)
layout.addWidget(self._text_label)
# Give it a consistent pill height
self.setMinimumHeight(24)
self.setSizePolicy(QSizePolicy.Policy.Minimum, QSizePolicy.Policy.Fixed)
# Soft shadow similar to notification banners
self._shadow = QGraphicsDropShadowEffect(self)
self._shadow.setBlurRadius(18)
self._shadow.setOffset(0, 2)
self.setGraphicsEffect(self._shadow)
self._apply_state(self._state)
self._connect_theme_change()
def set_state(self, state: Union[StatusState, str]):
"""Update state and refresh visuals."""
self._state = self._normalize_state(state)
self._apply_state(self._state)
def set_text(self, text: str):
"""Update the displayed text."""
self._text = text
self._text_label.setText(text)
def _apply_state(self, state: StatusState):
palette = self._resolve_accent_colors()
color_attr = {
StatusState.DEFAULT: "default",
StatusState.HIGHLIGHT: "highlight",
StatusState.WARNING: "warning",
StatusState.EMERGENCY: "emergency",
StatusState.SUCCESS: "success",
}.get(state, "default")
base_color = getattr(palette, color_attr, None) or getattr(
palette, "default", QColor("gray")
)
# Apply style first (returns text color for label)
text_color = self._update_style(base_color, self._theme_fg_color())
theme_name = self._theme_name()
# Choose icon per state
icon_name_map = {
StatusState.DEFAULT: "check_circle",
StatusState.HIGHLIGHT: "check_circle",
StatusState.SUCCESS: "check_circle",
StatusState.WARNING: "warning",
StatusState.EMERGENCY: "dangerous",
}
icon_name = icon_name_map.get(state, "check_circle")
# Icon color:
# - Dark mode: follow text color (usually white) for high contrast.
# - Light mode: use a stronger version of the accent color for a colored glyph
# that stands out on the pastel pill background.
if theme_name == "light":
icon_q = QColor(base_color)
icon_color = icon_q.name(QColor.HexRgb)
else:
icon_color = text_color
icon = material_icon(
icon_name, size=(18, 18), convert_to_pixmap=False, filled=True, color=icon_color
)
if not icon.isNull():
self._icon_label.setPixmap(icon.pixmap(18, 18))
def _update_style(self, color: QColor, fg_color: QColor) -> str:
# Ensure the widget actually paints its own background
self.setAttribute(Qt.WidgetAttribute.WA_StyledBackground, True)
fg = QColor(fg_color)
text_color = fg.name(QColor.HexRgb)
theme_name = self._theme_name()
base = QColor(color)
start = QColor(base)
end = QColor(base)
border = QColor(base)
if theme_name == "light":
start.setAlphaF(0.20)
end.setAlphaF(0.06)
else:
start.setAlphaF(0.35)
end.setAlphaF(0.12)
border = border.darker(120)
# shadow color tuned per theme to match notification banners
if hasattr(self, "_shadow"):
if theme_name == "light":
shadow_color = QColor(15, 23, 42, 60) # softer shadow on light bg
else:
shadow_color = QColor(0, 0, 0, 160)
self._shadow.setColor(shadow_color)
# Use a fixed radius for a stable pill look inside toolbars
radius = 10
self.setStyleSheet(
f"""
#StatusIndicatorWidget {{
background-color: qlineargradient(spread:pad, x1:0, y1:0, x2:1, y2:1,
stop:0 {start.name(QColor.HexArgb)}, stop:1 {end.name(QColor.HexArgb)});
border: 1px solid {border.name(QColor.HexRgb)};
border-radius: {radius}px;
padding: 2px 8px;
}}
#StatusIndicatorWidget QLabel {{
color: {text_color};
background: transparent;
}}
"""
)
return text_color
def _theme_fg_color(self) -> QColor:
app = QApplication.instance()
theme = getattr(app, "theme", None)
if theme is not None and hasattr(theme, "color"):
try:
fg = theme.color("FG")
if isinstance(fg, QColor):
return fg
except Exception:
pass
palette = self._resolve_accent_colors()
base = getattr(palette, "default", QColor("white"))
luminance = (0.299 * base.red() + 0.587 * base.green() + 0.114 * base.blue()) / 255
return QColor("#000000") if luminance > 0.65 else QColor("#ffffff")
def _theme_name(self) -> str:
app = QApplication.instance()
theme = getattr(app, "theme", None)
name = getattr(theme, "theme", None)
if isinstance(name, str):
return name.lower()
return "dark"
def _connect_theme_change(self):
if self._theme_connected:
return
app = QApplication.instance()
theme = getattr(app, "theme", None)
if theme is not None and hasattr(theme, "theme_changed"):
try:
theme.theme_changed.connect(lambda _: self._apply_state(self._state))
self._theme_connected = True
except Exception:
pass
@staticmethod
def _normalize_state(state: Union[StatusState, str]) -> StatusState:
if isinstance(state, StatusState):
return state
try:
return StatusState(state)
except ValueError:
return StatusState.DEFAULT
@staticmethod
def _resolve_accent_colors() -> AccentColors:
return get_accent_colors()
class ToolBarAction(ABC):
"""
Abstract base class for toolbar actions.
@@ -148,6 +350,54 @@ class SeparatorAction(ToolBarAction):
toolbar.addSeparator()
class StatusIndicatorAction(ToolBarAction):
"""Toolbar action hosting a LED indicator and status text."""
def __init__(
self,
*,
text: str = "Ready",
state: Union[StatusState, str] = StatusState.DEFAULT,
tooltip: str | None = None,
):
super().__init__(icon_path=None, tooltip=tooltip or "View status", checkable=False)
self._text = text
self._state: StatusState = StatusIndicatorWidget._normalize_state(state)
self.widget: StatusIndicatorWidget | None = None
self.tooltip = tooltip or ""
def add_to_toolbar(self, toolbar: QToolBar, target: QWidget):
if (
self.widget is None
or self.widget.parent() is None
or self.widget.parent() is not toolbar
):
self.widget = StatusIndicatorWidget(parent=toolbar, text=self._text, state=self._state)
self.action = toolbar.addWidget(self.widget)
self.action.setText(self._text)
self.set_tooltip(self.tooltip)
def set_state(self, state: Union[StatusState, str]):
self._state = StatusIndicatorWidget._normalize_state(state)
if self.widget is not None:
self.widget.set_state(self._state)
def set_text(self, text: str):
self._text = text
if self.widget is not None:
self.widget.set_text(text)
if hasattr(self, "action") and self.action is not None:
self.action.setText(text)
def set_tooltip(self, tooltip: str | None):
"""Set tooltip on both the underlying widget and the QWidgetAction."""
self.tooltip = tooltip or ""
if self.widget is not None:
self.widget.setToolTip(self.tooltip)
if hasattr(self, "action") and self.action is not None:
self.action.setToolTip(self.tooltip)
class QtIconAction(IconAction):
def __init__(
self,

View File

@@ -0,0 +1,283 @@
from __future__ import annotations
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.messages import BeamlineStateConfig
from qtpy.QtCore import QObject, QTimer, Signal
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.toolbars.actions import StatusIndicatorAction, StatusState
from bec_widgets.utils.toolbars.toolbar import ModularToolBar
logger = bec_logger.logger
class BECStatusBroker(BECConnector, QObject):
"""Listen to BEC beamline state endpoints and emit structured signals."""
_instance: "BECStatusBroker | None" = None
_initialized: bool = False
available_updated = Signal(list) # list of states available
status_updated = Signal(str, dict) # name, status update
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self, parent=None, gui_id: str | None = None, client=None, **kwargs):
if self._initialized:
return
super().__init__(parent=parent, gui_id=gui_id, client=client, **kwargs)
self._watched: set[str] = set()
self.bec_dispatcher.connect_slot(
self.on_available, MessageEndpoints.available_beamline_states()
)
self._initialized = True
self.refresh_available()
def refresh_available(self):
"""Fetch the current set of beamline conditions once."""
try:
msg = self.client.connector.get_last(MessageEndpoints.available_beamline_states())
logger.info(f"StatusBroker: fetched available conditions payload: {msg}")
if msg:
self.on_available(msg.get("data").content, None)
except Exception as exc: # pragma: no cover - runtime env
logger.debug(f"Could not fetch available conditions: {exc}")
@SafeSlot(dict, dict)
def on_available(self, data: dict, meta: dict | None = None):
state_list = data.get("states") # latest one from the stream
self.available_updated.emit(state_list)
for state in state_list:
name = state.name
if name:
self.watch_state(name)
def watch_state(self, name: str):
"""Subscribe to updates for a single beamline state."""
if name in self._watched:
return
self._watched.add(name)
endpoint = MessageEndpoints.beamline_state(name)
logger.info(f"StatusBroker: watching state '{name}' on {endpoint.endpoint}")
self.bec_dispatcher.connect_slot(self.on_state, endpoint)
self.fetch_state(name)
def fetch_state(self, name: str):
"""Fetch the current value of a beamline state once."""
endpoint = MessageEndpoints.beamline_state(name)
try:
msg = self.client.connector.get_last(endpoint)
logger.info(f"StatusBroker: fetched state '{name}' payload: {msg}")
if msg:
self.on_state(msg.get("data").content, None)
except Exception as exc: # pragma: no cover - runtime env
logger.debug(f"Could not fetch state {name}: {exc}")
@SafeSlot(dict, dict)
def on_state(self, data: dict, meta: dict | None = None):
name = data.get("name")
if not name:
return
logger.info(f"StatusBroker: state update for '{name}' -> {data}")
self.status_updated.emit(str(name), data)
@classmethod
def reset_singleton(cls):
"""
Reset the singleton instance of the BECStatusBroker.
"""
cls._instance = None
cls._initialized = False
class StatusToolBar(ModularToolBar):
"""Status toolbar that auto-manages beamline state indicators."""
STATUS_MAP: dict[str, StatusState] = {
"valid": StatusState.SUCCESS,
"warning": StatusState.WARNING,
"invalid": StatusState.EMERGENCY,
}
def __init__(self, parent=None, names: list[str] | None = None, **kwargs):
super().__init__(parent=parent, orientation="horizontal", **kwargs)
self.setObjectName("StatusToolbar")
self._status_bundle = self.new_bundle("status")
self.show_bundles(["status"])
self._apply_status_toolbar_style()
self.allowed_names: set[str] | None = set(names) if names is not None else None
logger.info(f"StatusToolbar init allowed_names={self.allowed_names}")
self.broker = BECStatusBroker()
self.broker.available_updated.connect(self.on_available_updated)
self.broker.status_updated.connect(self.on_status_updated)
QTimer.singleShot(0, self.refresh_from_broker)
def refresh_from_broker(self) -> None:
if self.allowed_names is None:
self.broker.refresh_available()
else:
for name in self.allowed_names:
if not self.components.exists(name):
# Pre-create a placeholder pill so it is visible even before data arrives.
self.add_status_item(
name=name, text=name, state=StatusState.DEFAULT, tooltip=None
)
self.broker.watch_state(name)
def _apply_status_toolbar_style(self) -> None:
self.setStyleSheet(
"QToolBar#StatusToolbar {"
f" background-color: {self.background_color};"
" border: none;"
" border-bottom: 1px solid palette(mid);"
"}"
)
# -------- Slots for updates --------
@SafeSlot(list)
def on_available_updated(self, available_states: list):
"""Process the available states stream and start watching them."""
# Keep track of current names from the broker to remove stale ones.
current_names: set[str] = set()
for state in available_states:
if not isinstance(state, BeamlineStateConfig):
continue
name = state.name
title = state.title or name
if not name:
continue
current_names.add(name)
logger.info(f"StatusToolbar: discovered state '{name}' title='{title}'")
# auto-add unless filtered out
if self.allowed_names is None or name in self.allowed_names:
self.add_status_item(name=name, text=title, state=StatusState.DEFAULT, tooltip=None)
else:
# keep hidden but present for context menu toggling
self.add_status_item(name=name, text=title, state=StatusState.DEFAULT, tooltip=None)
act = self.components.get_action(name)
if act and act.action:
act.action.setVisible(False)
# Remove actions that are no longer present in available_states.
known_actions = [
n for n in self.components._components.keys() if n not in ("separator",)
] # direct access used for clean-up
for name in known_actions:
if name not in current_names:
logger.info(f"StatusToolbar: removing stale state '{name}'")
try:
self.components.remove_action(name)
except Exception as exc:
logger.warning(f"Failed to remove stale state '{name}': {exc}")
self.refresh()
@SafeSlot(str, dict)
def on_status_updated(self, name: str, payload: dict): # TODO finish update logic
"""Update a status pill when a state update arrives."""
state = self.STATUS_MAP.get(str(payload.get("status", "")).lower(), StatusState.DEFAULT)
action = self.components.get_action(name) if self.components.exists(name) else None
# Only update the label when a title is explicitly provided; otherwise keep current text.
title = payload.get("title") or None
text = title
if text is None and action is None:
text = payload.get("name") or name
if "label" in payload:
tooltip = payload.get("label") or ""
else:
tooltip = None
logger.info(
f"StatusToolbar: update state '{name}' -> state={state} text='{text}' tooltip='{tooltip}'"
)
self.set_status(name=name, text=text, state=state, tooltip=tooltip)
# -------- Items Management --------
def add_status_item(
self,
name: str,
*,
text: str = "Ready",
state: StatusState | str = StatusState.DEFAULT,
tooltip: str | None = None,
) -> StatusIndicatorAction | None:
"""
Add or update a named status item in the toolbar.
After you added all actions, call `toolbar.refresh()` to update the display.
Args:
name(str): Unique name for the status item.
text(str): Text to display in the status item.
state(StatusState | str): State of the status item.
tooltip(str | None): Optional tooltip for the status item.
Returns:
StatusIndicatorAction | None: The created or updated status action, or None if toolbar is not initialized.
"""
if self._status_bundle is None:
return
if self.components.exists(name):
return
action = StatusIndicatorAction(text=text, state=state, tooltip=tooltip)
return self.add_status_action(name, action)
def add_status_action(
self, name: str, action: StatusIndicatorAction
) -> StatusIndicatorAction | None:
"""
Attach an existing StatusIndicatorAction to the status toolbar.
After you added all actions, call `toolbar.refresh()` to update the display.
Args:
name(str): Unique name for the status item.
action(StatusIndicatorAction): The status action to add.
Returns:
StatusIndicatorAction | None: The added status action, or None if toolbar is not initialized.
"""
self.components.add_safe(name, action)
self.get_bundle("status").add_action(name)
self.refresh()
self.broker.fetch_state(name)
return action
def set_status(
self,
name: str = "main",
*,
state: StatusState | str | None = None,
text: str | None = None,
tooltip: str | None = None,
) -> None:
"""
Update the status item with the given name, creating it if necessary.
Args:
name(str): Unique name for the status item.
state(StatusState | str | None): New state for the status item.
text(str | None): New text for the status item.
"""
action = self.components.get_action(name) if self.components.exists(name) else None
if action is None:
action = self.add_status_item(
name, text=text or "Ready", state=state or "default", tooltip=tooltip
)
if action is None:
return
if state is not None:
action.set_state(state)
if text is not None:
action.set_text(text)
if tooltip is not None and hasattr(action, "set_tooltip"):
action.set_tooltip(tooltip)

View File

@@ -7,7 +7,7 @@ from bec_lib.logger import bec_logger
from bec_lib.messages import ScanStatusMessage
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area import AdvancedDockArea
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea
from bec_widgets.widgets.containers.main_window.main_window import BECMainWindow
from bec_widgets.widgets.containers.qt_ads import CDockWidget
@@ -37,7 +37,7 @@ class AutoUpdates(BECMainWindow):
):
super().__init__(parent=parent, gui_id=gui_id, window_title=window_title, **kwargs)
self.dock_area = AdvancedDockArea(
self.dock_area = BECDockArea(
parent=self,
object_name="dock_area",
enable_profile_management=False,

View File

@@ -5,7 +5,7 @@ from typing import Literal, Mapping, Sequence
import slugify
from bec_lib import bec_logger
from qtpy.QtCore import QTimer, Signal
from qtpy.QtCore import Signal
from qtpy.QtGui import QPixmap
from qtpy.QtWidgets import (
QApplication,
@@ -31,8 +31,8 @@ from bec_widgets.utils.toolbars.actions import (
from bec_widgets.utils.toolbars.bundles import ToolbarBundle
from bec_widgets.utils.toolbars.toolbar import ModularToolBar
from bec_widgets.utils.widget_state_manager import WidgetStateManager
from bec_widgets.widgets.containers.advanced_dock_area.basic_dock_area import DockAreaWidget
from bec_widgets.widgets.containers.advanced_dock_area.profile_utils import (
from bec_widgets.widgets.containers.dock_area.basic_dock_area import DockAreaWidget
from bec_widgets.widgets.containers.dock_area.profile_utils import (
SETTINGS_KEYS,
default_profile_candidates,
delete_profile_files,
@@ -55,14 +55,12 @@ from bec_widgets.widgets.containers.advanced_dock_area.profile_utils import (
user_profile_candidates,
write_manifest,
)
from bec_widgets.widgets.containers.advanced_dock_area.settings.dialogs import (
from bec_widgets.widgets.containers.dock_area.settings.dialogs import (
RestoreProfileDialog,
SaveProfileDialog,
)
from bec_widgets.widgets.containers.advanced_dock_area.settings.workspace_manager import (
WorkSpaceManager,
)
from bec_widgets.widgets.containers.advanced_dock_area.toolbar_components.workspace_actions import (
from bec_widgets.widgets.containers.dock_area.settings.workspace_manager import WorkSpaceManager
from bec_widgets.widgets.containers.dock_area.toolbar_components.workspace_actions import (
WorkspaceConnection,
workspace_bundle,
)
@@ -90,7 +88,7 @@ _PROFILE_NAMESPACE_UNSET = object()
PROFILE_STATE_KEYS = {key: SETTINGS_KEYS[key] for key in ("geom", "state", "ads_state")}
class AdvancedDockArea(DockAreaWidget):
class BECDockArea(DockAreaWidget):
RPC = True
PLUGIN = False
USER_ACCESS = [
@@ -1163,7 +1161,7 @@ if __name__ == "__main__": # pragma: no cover
dispatcher = BECDispatcher(gui_id="ads")
window = BECMainWindowNoRPC()
ads = AdvancedDockArea(mode="creator", enable_profile_management=True, root_widget=True)
ads = BECDockArea(mode="creator", enable_profile_management=True, root_widget=True)
window.setCentralWidget(ads)
window.show()

View File

@@ -28,7 +28,7 @@ from qtpy.QtWidgets import (
from bec_widgets import BECWidget, SafeSlot
from bec_widgets.utils.colors import get_accent_colors
from bec_widgets.widgets.containers.advanced_dock_area.profile_utils import (
from bec_widgets.widgets.containers.dock_area.profile_utils import (
get_profile_info,
is_quick_select,
list_profiles,

View File

@@ -10,7 +10,7 @@ from bec_widgets import SafeSlot
from bec_widgets.utils.toolbars.actions import MaterialIconAction, WidgetAction
from bec_widgets.utils.toolbars.bundles import ToolbarBundle, ToolbarComponents
from bec_widgets.utils.toolbars.connections import BundleConnection
from bec_widgets.widgets.containers.advanced_dock_area.profile_utils import list_quick_profiles
from bec_widgets.widgets.containers.dock_area.profile_utils import list_quick_profiles
class ProfileComboBox(QComboBox):

View File

@@ -1,7 +1,6 @@
from __future__ import annotations
import os
from typing import TYPE_CHECKING
from bec_lib.endpoints import MessageEndpoints
from qtpy.QtCore import QEvent, QSize, Qt, QTimer
@@ -22,6 +21,7 @@ from bec_widgets.utils import UILoader
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.colors import apply_theme
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.toolbars.status_bar import StatusToolBar
from bec_widgets.widgets.containers.main_window.addons.hover_widget import HoverWidget
from bec_widgets.widgets.containers.main_window.addons.notification_center.notification_banner import (
BECNotificationBroker,
@@ -115,14 +115,11 @@ class BECMainWindow(BECWidget, QMainWindow):
Prepare the BEC specific widgets in the status bar.
"""
# Left: AppID label
self._app_id_label = QLabel()
self._app_id_label.setAlignment(
Qt.AlignmentFlag.AlignHCenter | Qt.AlignmentFlag.AlignVCenter
)
self.status_bar.addWidget(self._app_id_label)
# Left: Beamline condition status toolbar (auto-fetches all conditions)
self._status_toolbar = StatusToolBar(parent=self, names=None)
self.status_bar.addWidget(self._status_toolbar)
# Add a separator after the app ID label
# Add a separator after the status toolbar
self._add_separator()
# Centre: Clientinfo label (stretch=1 so it expands)
@@ -341,13 +338,27 @@ class BECMainWindow(BECWidget, QMainWindow):
help_menu.addAction(bec_docs)
help_menu.addAction(widgets_docs)
help_menu.addAction(bug_report)
help_menu.addSeparator()
self._app_id_action = QAction(self)
self._app_id_action.triggered.connect(self._copy_app_id_to_clipboard)
help_menu.addAction(self._app_id_action)
def _copy_app_id_to_clipboard(self):
"""
Copy the app ID to the clipboard.
"""
if self.bec_dispatcher.cli_server is not None:
server_id = self.bec_dispatcher.cli_server.gui_id
clipboard = QApplication.clipboard()
clipboard.setText(server_id)
################################################################################
# Status Bar Addons
################################################################################
def display_app_id(self):
"""
Display the app ID in the status bar.
Display the app ID in the Help menu.
"""
if self.bec_dispatcher.cli_server is None:
status_message = "Not connected"
@@ -355,7 +366,8 @@ class BECMainWindow(BECWidget, QMainWindow):
# Get the server ID from the dispatcher
server_id = self.bec_dispatcher.cli_server.gui_id
status_message = f"App ID: {server_id}"
self._app_id_label.setText(status_message)
if hasattr(self, "_app_id_action"):
self._app_id_action.setText(status_message)
@SafeSlot(dict, dict)
def display_client_message(self, msg: dict, meta: dict):

View File

@@ -186,6 +186,11 @@ class DeviceComboBox(DeviceInputBase, QComboBox):
device = self.itemData(idx)[0] # type: ignore[assignment]
return super().validate_device(device)
@property
def is_valid_input(self) -> bool:
"""Whether the current text represents a valid device selection."""
return self._is_valid_input
if __name__ == "__main__": # pragma: no cover
# pylint: disable=import-outside-toplevel

View File

@@ -69,11 +69,12 @@ class DeviceTest(QtCore.QRunnable):
enable_connect: bool,
force_connect: bool,
timeout: float,
device_manager_ds: object | None = None,
):
super().__init__()
self.uuid = device_model.uuid
test_config = {device_model.device_name: device_model.device_config}
self.tester = StaticDeviceTest(config_dict=test_config)
self.tester = StaticDeviceTest(config_dict=test_config, device_manager_ds=device_manager_ds)
self.signals = DeviceTestResult()
self.device_config = device_model.device_config
self.enable_connect = enable_connect
@@ -752,11 +753,15 @@ class OphydValidation(BECWidget, QtWidgets.QWidget):
# Remove widget from list as it's safe to assume it can be loaded.
self._remove_device_config(widget.device_model.device_config)
return
dm_ds = None
if self.client:
dm_ds = getattr(self.client, "device_manager", None)
runnable = DeviceTest(
device_model=widget.device_model,
enable_connect=connect,
force_connect=force_connect,
timeout=timeout,
device_manager_ds=dm_ds,
)
widget.validation_scheduled()
if self.thread_pool_manager:

View File

@@ -9,7 +9,7 @@ from bec_lib.macro_update_handler import has_executable_code
from qtpy.QtCore import QEvent, QTimer, Signal
from qtpy.QtWidgets import QFileDialog, QMessageBox, QToolButton, QWidget
from bec_widgets.widgets.containers.advanced_dock_area.basic_dock_area import DockAreaWidget
from bec_widgets.widgets.containers.dock_area.basic_dock_area import DockAreaWidget
from bec_widgets.widgets.containers.qt_ads import CDockAreaWidget, CDockWidget
from bec_widgets.widgets.editors.monaco.monaco_widget import MonacoWidget

View File

@@ -1,15 +0,0 @@
def main(): # pragma: no cover
from qtpy import PYSIDE6
if not PYSIDE6:
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from bec_widgets.widgets.editors.vscode.vs_code_editor_plugin import VSCodeEditorPlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(VSCodeEditorPlugin())
if __name__ == "__main__": # pragma: no cover
main()

View File

@@ -1 +0,0 @@
{'files': ['vscode.py']}

View File

@@ -1,57 +0,0 @@
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from qtpy.QtWidgets import QWidget
from bec_widgets.utils.bec_designer import designer_material_icon
from bec_widgets.widgets.editors.vscode.vscode import VSCodeEditor
DOM_XML = """
<ui language='c++'>
<widget class='VSCodeEditor' name='vs_code_editor'>
</widget>
</ui>
"""
class VSCodeEditorPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
def __init__(self):
super().__init__()
self._form_editor = None
def createWidget(self, parent):
if parent is None:
return QWidget()
t = VSCodeEditor(parent)
return t
def domXml(self):
return DOM_XML
def group(self):
return "BEC Developer"
def icon(self):
return designer_material_icon(VSCodeEditor.ICON_NAME)
def includeFile(self):
return "vs_code_editor"
def initialize(self, form_editor):
self._form_editor = form_editor
def isContainer(self):
return False
def isInitialized(self):
return self._form_editor is not None
def name(self):
return "VSCodeEditor"
def toolTip(self):
return ""
def whatsThis(self):
return self.toolTip()

View File

@@ -1,203 +0,0 @@
import os
import select
import shlex
import signal
import socket
import subprocess
from typing import Literal
from pydantic import BaseModel
from qtpy.QtCore import Signal, Slot
from bec_widgets.widgets.editors.website.website import WebsiteWidget
class VSCodeInstructionMessage(BaseModel):
command: Literal["open", "write", "close", "zenMode", "save", "new", "setCursor"]
content: str = ""
def get_free_port():
"""
Get a free port on the local machine.
Returns:
int: The free port number
"""
sock = socket.socket()
sock.bind(("", 0))
port = sock.getsockname()[1]
sock.close()
return port
class VSCodeEditor(WebsiteWidget):
"""
A widget to display the VSCode editor.
"""
file_saved = Signal(str)
token = "bec"
host = "127.0.0.1"
PLUGIN = True
USER_ACCESS = []
ICON_NAME = "developer_mode_tv"
def __init__(self, parent=None, config=None, client=None, gui_id=None, **kwargs):
self.process = None
self.port = get_free_port()
self._url = f"http://{self.host}:{self.port}?tkn={self.token}"
super().__init__(parent=parent, config=config, client=client, gui_id=gui_id, **kwargs)
self.start_server()
self.bec_dispatcher.connect_slot(self.on_vscode_event, f"vscode-events/{self.gui_id}")
def start_server(self):
"""
Start the server.
This method starts the server for the VSCode editor in a subprocess.
"""
env = os.environ.copy()
env["BEC_Widgets_GUIID"] = self.gui_id
env["BEC_REDIS_HOST"] = self.client.connector.host
cmd = shlex.split(
f"code serve-web --port {self.port} --connection-token={self.token} --accept-server-license-terms"
)
self.process = subprocess.Popen(
cmd,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
preexec_fn=os.setsid,
env=env,
)
os.set_blocking(self.process.stdout.fileno(), False)
while self.process.poll() is None:
readylist, _, _ = select.select([self.process.stdout], [], [], 1)
if self.process.stdout in readylist:
output = self.process.stdout.read(1024)
if output and f"available at {self._url}" in output:
break
self.set_url(self._url)
self.wait_until_loaded()
@Slot(str)
def open_file(self, file_path: str):
"""
Open a file in the VSCode editor.
Args:
file_path: The file path to open
"""
msg = VSCodeInstructionMessage(command="open", content=f"file://{file_path}")
self.client.connector.raw_send(f"vscode-instructions/{self.gui_id}", msg.model_dump_json())
@Slot(dict, dict)
def on_vscode_event(self, content, _metadata):
"""
Handle the VSCode event. VSCode events are received as RawMessages.
Args:
content: The content of the event
metadata: The metadata of the event
"""
# the message also contains the content but I think is fine for now to just emit the file path
if not isinstance(content["data"], dict):
return
if "uri" not in content["data"]:
return
if not content["data"]["uri"].startswith("file://"):
return
file_path = content["data"]["uri"].split("file://")[1]
self.file_saved.emit(file_path)
@Slot()
def save_file(self):
"""
Save the file in the VSCode editor.
"""
msg = VSCodeInstructionMessage(command="save")
self.client.connector.raw_send(f"vscode-instructions/{self.gui_id}", msg.model_dump_json())
@Slot()
def new_file(self):
"""
Create a new file in the VSCode editor.
"""
msg = VSCodeInstructionMessage(command="new")
self.client.connector.raw_send(f"vscode-instructions/{self.gui_id}", msg.model_dump_json())
@Slot()
def close_file(self):
"""
Close the file in the VSCode editor.
"""
msg = VSCodeInstructionMessage(command="close")
self.client.connector.raw_send(f"vscode-instructions/{self.gui_id}", msg.model_dump_json())
@Slot(str)
def write_file(self, content: str):
"""
Write content to the file in the VSCode editor.
Args:
content: The content to write
"""
msg = VSCodeInstructionMessage(command="write", content=content)
self.client.connector.raw_send(f"vscode-instructions/{self.gui_id}", msg.model_dump_json())
@Slot()
def zen_mode(self):
"""
Toggle the Zen mode in the VSCode editor.
"""
msg = VSCodeInstructionMessage(command="zenMode")
self.client.connector.raw_send(f"vscode-instructions/{self.gui_id}", msg.model_dump_json())
@Slot(int, int)
def set_cursor(self, line: int, column: int):
"""
Set the cursor in the VSCode editor.
Args:
line: The line number
column: The column number
"""
msg = VSCodeInstructionMessage(command="setCursor", content=f"{line},{column}")
self.client.connector.raw_send(f"vscode-instructions/{self.gui_id}", msg.model_dump_json())
def cleanup_vscode(self):
"""
Cleanup the VSCode editor.
"""
if not self.process or self.process.poll() is not None:
return
os.killpg(os.getpgid(self.process.pid), signal.SIGTERM)
self.process.wait()
def cleanup(self):
"""
Cleanup the widget. This method is called from the dock area when the widget is removed.
"""
self.bec_dispatcher.disconnect_slot(self.on_vscode_event, f"vscode-events/{self.gui_id}")
self.cleanup_vscode()
return super().cleanup()
if __name__ == "__main__": # pragma: no cover
import sys
from qtpy.QtWidgets import QApplication
app = QApplication(sys.argv)
widget = VSCodeEditor(gui_id="unknown")
widget.show()
app.exec_()
widget.bec_dispatcher.disconnect_all()
widget.client.shutdown()

File diff suppressed because it is too large Load Diff

View File

@@ -9,6 +9,7 @@ from pydantic import BaseModel, ConfigDict, Field, ValidationError
from qtpy.QtCore import QPointF, Signal, SignalInstance
from qtpy.QtWidgets import QDialog, QVBoxLayout
from bec_widgets.utils import Colors
from bec_widgets.utils.container_utils import WidgetContainerUtils
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
from bec_widgets.utils.side_panel import SidePanel
@@ -131,8 +132,9 @@ class ImageLayerManager:
image.setZValue(z_position)
image.removed.connect(self._remove_destroyed_layer)
# FIXME: For now, we hard-code the default color map here. In the future, this should be configurable.
image.color_map = "plasma"
color_map = getattr(getattr(self.parent, "config", None), "color_map", None)
if color_map:
image.color_map = color_map
self.layers[name] = ImageLayer(name=name, image=image, sync=sync)
self.plot_item.addItem(image)
@@ -249,6 +251,8 @@ class ImageBase(PlotBase):
Base class for the Image widget.
"""
MAX_TICKS_COLORBAR = 10
sync_colorbar_with_autorange = Signal()
image_updated = Signal()
layer_added = Signal(str)
@@ -460,18 +464,20 @@ class ImageBase(PlotBase):
self.setProperty("autorange", False)
if style == "simple":
self._color_bar = pg.ColorBarItem(colorMap=self.config.color_map)
cmap = Colors.get_colormap(self.config.color_map)
self._color_bar = pg.ColorBarItem(colorMap=cmap)
self._color_bar.setImageItem(self.layer_manager["main"].image)
self._color_bar.sigLevelsChangeFinished.connect(disable_autorange)
self.config.color_bar = "simple"
elif style == "full":
self._color_bar = pg.HistogramLUTItem()
self._color_bar.setImageItem(self.layer_manager["main"].image)
self._color_bar.gradient.loadPreset(self.config.color_map)
self.config.color_bar = "full"
self._apply_colormap_to_colorbar(self.config.color_map)
self._color_bar.sigLevelsChanged.connect(disable_autorange)
self.plot_widget.addItem(self._color_bar, row=0, col=1)
self.config.color_bar = style
else:
if self._color_bar:
self.plot_widget.removeItem(self._color_bar)
@@ -484,6 +490,37 @@ class ImageBase(PlotBase):
if vrange: # should be at the end to disable the autorange if defined
self.v_range = vrange
def _apply_colormap_to_colorbar(self, color_map: str) -> None:
if not self._color_bar:
return
cmap = Colors.get_colormap(color_map)
if self.config.color_bar == "simple":
self._color_bar.setColorMap(cmap)
return
if self.config.color_bar != "full":
return
gradient = getattr(self._color_bar, "gradient", None)
if gradient is None:
return
positions = np.linspace(0.0, 1.0, self.MAX_TICKS_COLORBAR)
colors = cmap.map(positions, mode="byte")
colors = np.asarray(colors)
if colors.ndim != 2:
return
if colors.shape[1] == 3: # add alpha
alpha = np.full((colors.shape[0], 1), 255, dtype=colors.dtype)
colors = np.concatenate([colors, alpha], axis=1)
ticks = [(float(p), tuple(int(x) for x in c)) for p, c in zip(positions, colors)]
state = {"mode": "rgb", "ticks": ticks}
gradient.restoreState(state)
################################################################################
# Static rois with roi manager
@@ -754,11 +791,11 @@ class ImageBase(PlotBase):
layer.image.color_map = value
if self._color_bar:
if self.config.color_bar == "simple":
self._color_bar.setColorMap(value)
elif self.config.color_bar == "full":
self._color_bar.gradient.loadPreset(value)
except ValidationError:
self._apply_colormap_to_colorbar(self.config.color_map)
except ValidationError as exc:
logger.warning(
f"Colormap '{value}' is not available; keeping '{self.config.color_map}'. {exc}"
)
return
@SafeProperty("QPointF")

View File

@@ -119,7 +119,8 @@ class ImageItem(BECConnector, pg.ImageItem):
"""Set a new color map."""
try:
self.config.color_map = value
self.setColorMap(value)
cmap = Colors.get_colormap(self.config.color_map)
self.setColorMap(cmap)
except ValidationError:
logger.error(f"Invalid colormap '{value}' provided.")

View File

@@ -0,0 +1,255 @@
from qtpy.QtWidgets import QHBoxLayout, QSizePolicy, QWidget
from bec_widgets.utils.toolbars.actions import WidgetAction
from bec_widgets.utils.toolbars.bundles import ToolbarBundle, ToolbarComponents
from bec_widgets.utils.toolbars.connections import BundleConnection
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
from bec_widgets.widgets.control.device_input.signal_combobox.signal_combobox import SignalComboBox
class DeviceSelection(QWidget):
"""Device and signal selection widget for image toolbar."""
def __init__(self, parent=None, client=None):
super().__init__(parent=parent)
self.client = client
self.supported_signals = [
"PreviewSignal",
"AsyncSignal",
"AsyncMultiSignal",
"DynamicSignal",
]
# Create device combobox with signal class filter
# This will only show devices that have signals matching the supported signal classes
self.device_combo_box = DeviceComboBox(
parent=self, client=self.client, signal_class_filter=self.supported_signals
)
self.device_combo_box.setToolTip("Select Device")
self.device_combo_box.setEditable(True)
# Set expanding size policy so it grows with available space
self.device_combo_box.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Fixed)
self.device_combo_box.lineEdit().setPlaceholderText("Select Device")
# Configure SignalComboBox to filter by PreviewSignal and supported async signals
# Also filter by ndim (1D and 2D only) for Image widget
self.signal_combo_box = SignalComboBox(
parent=self,
client=self.client,
signal_class_filter=[
"PreviewSignal",
"AsyncSignal",
"AsyncMultiSignal",
"DynamicSignal",
],
ndim_filter=[1, 2], # Only show 1D and 2D signals for Image widget
store_signal_config=True,
require_device=True,
)
self.signal_combo_box.setToolTip("Select Signal")
self.signal_combo_box.setEditable(True)
# Set expanding size policy so it grows with available space
self.signal_combo_box.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Fixed)
self.signal_combo_box.lineEdit().setPlaceholderText("Select Signal")
# Connect comboboxes together
self.device_combo_box.currentTextChanged.connect(self.signal_combo_box.set_device)
self.device_combo_box.device_reset.connect(self.signal_combo_box.reset_selection)
# Simple horizontal layout with stretch to fill space
layout = QHBoxLayout(self)
layout.setContentsMargins(0, 0, 0, 0)
layout.setSpacing(2)
layout.addWidget(self.device_combo_box, stretch=1)
layout.addWidget(self.signal_combo_box, stretch=1)
def set_device_and_signal(self, device_name: str | None, device_entry: str | None) -> None:
"""Set the displayed device and signal without emitting selection signals."""
device_name = device_name or ""
device_entry = device_entry or ""
self.device_combo_box.blockSignals(True)
self.signal_combo_box.blockSignals(True)
try:
if device_name:
# Set device in device_combo_box
index = self.device_combo_box.findText(device_name)
if index >= 0:
self.device_combo_box.setCurrentIndex(index)
else:
# Device not found in list, but still set it
self.device_combo_box.setCurrentText(device_name)
# Only update signal combobox device filter if it's actually changing
# This prevents redundant repopulation which can cause duplicates !!!!
current_device = getattr(self.signal_combo_box, "_device", None)
if current_device != device_name:
self.signal_combo_box.set_device(device_name)
# Sync signal combobox selection
if device_entry:
# Try to find the signal by component_name (which is what's displayed)
found = False
for i in range(self.signal_combo_box.count()):
text = self.signal_combo_box.itemText(i)
config_data = self.signal_combo_box.itemData(i)
# Check if this matches our signal
if config_data:
component_name = config_data.get("component_name", "")
if text == component_name or text == device_entry:
self.signal_combo_box.setCurrentIndex(i)
found = True
break
if not found:
# Fallback: try to match the device_entry directly
index = self.signal_combo_box.findText(device_entry)
if index >= 0:
self.signal_combo_box.setCurrentIndex(index)
else:
# No device set, clear selections
self.device_combo_box.setCurrentText("")
self.signal_combo_box.reset_selection()
finally:
# Always unblock signals
self.device_combo_box.blockSignals(False)
self.signal_combo_box.blockSignals(False)
def set_connection_status(self, status: str, message: str | None = None) -> None:
tooltip = f"Connection status: {status}"
if message:
tooltip = f"{tooltip}\n{message}"
self.device_combo_box.setToolTip(tooltip)
self.signal_combo_box.setToolTip(tooltip)
if not self.device_combo_box.is_valid_input or not self.signal_combo_box.is_valid_input:
return
if status == "error":
style = "border: 1px solid orange;"
else:
style = "border: 1px solid transparent;"
self.device_combo_box.setStyleSheet(style)
self.signal_combo_box.setStyleSheet(style)
def cleanup(self):
"""Clean up the widget resources."""
self.device_combo_box.close()
self.device_combo_box.deleteLater()
self.signal_combo_box.close()
self.signal_combo_box.deleteLater()
def device_selection_bundle(components: ToolbarComponents, client=None) -> ToolbarBundle:
"""
Creates a device selection toolbar bundle for Image widget.
Includes a resizable splitter after the device selection. All subsequent bundles'
actions will appear compactly after the splitter with no gaps.
Args:
components (ToolbarComponents): The components to be added to the bundle.
client: The BEC client instance.
Returns:
ToolbarBundle: The device selection toolbar bundle.
"""
device_selection_widget = DeviceSelection(parent=components.toolbar, client=client)
components.add_safe(
"device_selection", WidgetAction(widget=device_selection_widget, adjust_size=False)
)
bundle = ToolbarBundle("device_selection", components)
bundle.add_action("device_selection")
bundle.add_splitter(
name="device_selection_splitter",
target_widget=device_selection_widget,
min_width=210,
max_width=600,
)
return bundle
class DeviceSelectionConnection(BundleConnection):
"""
Connection helper for the device selection bundle.
"""
def __init__(self, components: ToolbarComponents, target_widget=None):
super().__init__(parent=components.toolbar)
self.bundle_name = "device_selection"
self.components = components
self.target_widget = target_widget
self._connected = False
self.register_property_sync("device_name", self._sync_from_device_name)
self.register_property_sync("device_entry", self._sync_from_device_entry)
self.register_property_sync("connection_status", self._sync_connection_status)
self.register_property_sync("connection_error", self._sync_connection_status)
def _widget(self) -> DeviceSelection:
return self.components.get_action("device_selection").widget
def connect(self):
if self._connected:
return
widget = self._widget()
widget.device_combo_box.device_selected.connect(
self.target_widget.on_device_selection_changed
)
widget.signal_combo_box.device_signal_changed.connect(
self.target_widget.on_device_selection_changed
)
self.connect_property_sync(self.target_widget)
self._connected = True
def disconnect(self):
if not self._connected:
return
widget = self._widget()
widget.device_combo_box.device_selected.disconnect(
self.target_widget.on_device_selection_changed
)
widget.signal_combo_box.device_signal_changed.disconnect(
self.target_widget.on_device_selection_changed
)
self.disconnect_property_sync(self.target_widget)
self._connected = False
widget.cleanup()
def _sync_from_device_name(self, _):
try:
widget = self._widget()
except Exception:
return
widget.set_device_and_signal(
self.target_widget.device_name, self.target_widget.device_entry
)
self.target_widget._sync_device_entry_from_toolbar()
def _sync_from_device_entry(self, _):
try:
widget = self._widget()
except Exception:
return
widget.set_device_and_signal(
self.target_widget.device_name, self.target_widget.device_entry
)
def _sync_connection_status(self, _):
try:
widget = self._widget()
except Exception:
return
widget.set_connection_status(
self.target_widget._config.connection_status,
self.target_widget._config.connection_error,
)

View File

@@ -32,7 +32,7 @@ dock_area = gui.new()
img_widget = dock_area.new().new(gui.available_widgets.Image)
# Add an ImageWidget to the BECFigure for a 2D detector
img_widget.image(monitor='eiger', monitor_type='2d')
img_widget.image(device_name='eiger', device_entry='preview')
img_widget.title = "Camera Image - Eiger Detector"
```
@@ -46,7 +46,7 @@ dock_area = gui.new()
img_widget = dock_area.new().new(gui.available_widgets.Image)
# Add an ImageWidget to the BECFigure for a 2D detector
img_widget.image(monitor='waveform', monitor_type='1d')
img_widget.image(device_name='waveform', device_entry='data')
img_widget.title = "Line Detector Data"
# Optional: Set the color map and value range
@@ -84,7 +84,7 @@ The Image Widget can be configured for different detectors by specifying the cor
```python
# For a 2D camera detector
img_widget = fig.image(monitor='eiger', monitor_type='2d')
img_widget = fig.image(device_name='eiger', device_entry='preview')
img_widget.set_title("Eiger Camera Image")
```
@@ -92,7 +92,7 @@ img_widget.set_title("Eiger Camera Image")
```python
# For a 1D line detector
img_widget = fig.image(monitor='waveform', monitor_type='1d')
img_widget = fig.image(device_name='waveform', device_entry='data')
img_widget.set_title("Line Detector Data")
```

View File

@@ -59,7 +59,7 @@ def test_rpc_add_dock_with_plots_e2e(qtbot, bec_client_lib, connected_client_gui
mm.map("samx", "samy")
curve = wf.plot(x_name="samx", y_name="bpm4i")
im_item = im.image("eiger")
im_item = im.image(device_name="eiger", device_entry="preview")
assert curve.__class__.__name__ == "RPCReference"
assert curve.__class__ == RPCReference
@@ -122,12 +122,12 @@ def test_rpc_gui_obj(connected_client_gui_obj, qtbot):
assert gui.windows["bec"] is gui.bec
mw = gui.bec
assert mw.__class__.__name__ == "RPCReference"
assert gui._ipython_registry[mw._gui_id].__class__.__name__ == "AdvancedDockArea"
assert gui._ipython_registry[mw._gui_id].__class__.__name__ == "BECDockArea"
xw = gui.new("X")
xw.delete_all()
assert xw.__class__.__name__ == "RPCReference"
assert gui._ipython_registry[xw._gui_id].__class__.__name__ == "AdvancedDockArea"
assert gui._ipython_registry[xw._gui_id].__class__.__name__ == "BECDockArea"
assert len(gui.windows) == 2
assert gui._gui_is_alive()

View File

@@ -42,7 +42,7 @@ def test_rpc_plotting_shortcuts_init_configs(qtbot, connected_client_gui_obj):
c3 = wf.plot(y=[1, 2, 3], x=[1, 2, 3])
assert c3.object_name == "Curve_0"
im.image(monitor="eiger")
im.image(device_name="eiger", device_entry="preview")
mm.map(x_name="samx", y_name="samy")
sw.plot(x_name="samx", y_name="samy", z_name="bpm4a")
mw.plot(monitor="waveform")
@@ -165,14 +165,14 @@ def test_rpc_image(qtbot, bec_client_lib, connected_client_gui_obj):
scans = client.scans
im = dock_area.new("Image")
im.image(monitor="eiger")
im.image(device_name="eiger", device_entry="preview")
status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False)
status.wait()
last_image_device = client.connector.get_last(MessageEndpoints.device_monitor_2d("eiger"))[
"data"
].data
last_image_device = client.connector.get_last(
MessageEndpoints.device_preview("eiger", "preview")
)["data"].data
last_image_plot = im.main_image.get_data()
# check plotted data

View File

@@ -15,7 +15,7 @@ def test_rpc_reference_objects(connected_client_gui_obj):
plt.plot(x_name="samx", y_name="bpm4i")
im = dock_area.new("Image")
im.image("eiger")
im.image(device_name="eiger", device_entry="preview")
motor_map = dock_area.new("MotorMap")
motor_map.map("samx", "samy")
plt_z = dock_area.new("Waveform")
@@ -23,7 +23,8 @@ def test_rpc_reference_objects(connected_client_gui_obj):
assert len(plt_z.curves) == 1
assert len(plt.curves) == 1
assert im.monitor == "eiger"
assert im.device_name == "eiger"
assert im.device_entry == "preview"
assert isinstance(im.main_image, RPCReference)
image_item = gui._ipython_registry.get(im.main_image._gui_id, None)

View File

@@ -74,15 +74,15 @@ def test_available_widgets(qtbot, connected_client_gui_obj):
"""This test checks that all widgets that are available via gui.available_widgets can be created and removed."""
gui = connected_client_gui_obj
dock_area = gui.bec
# Number of top level widgets, should be 4
top_level_widgets_count = 12
# Number of top level widgets, should be 5
top_level_widgets_count = 13
assert len(gui._server_registry) == top_level_widgets_count
names = set(list(gui._server_registry.keys()))
# Number of widgets with parent_id == None, should be 2
# Number of widgets with parent_id == None, should be 3
widgets = [
widget for widget in gui._server_registry.values() if widget["config"]["parent_id"] is None
]
assert len(widgets) == 2
assert len(widgets) == 3
# Test all relevant widgets
for object_name in gui.available_widgets.__dict__:
@@ -115,7 +115,7 @@ def test_available_widgets(qtbot, connected_client_gui_obj):
for widget in gui._server_registry.values()
if widget["config"]["parent_id"] is None
]
assert len(widgets) == 2
assert len(widgets) == 3
#############################
####### Remove widget #######

View File

@@ -16,6 +16,7 @@ from typing import TYPE_CHECKING
import numpy as np
import pytest
from bec_lib.endpoints import MessageEndpoints
from bec_widgets.cli.rpc.rpc_base import RPCBase, RPCReference
@@ -233,7 +234,7 @@ def test_widgets_e2e_image(qtbot, connected_client_gui_obj, random_generator_fro
scans = bec.scans
dev = bec.device_manager.devices
# Test rpc calls
img = widget.image(dev.eiger)
img = widget.image(device_name=dev.eiger.name, device_entry="preview")
assert img.get_data() is None
# Run a scan and plot the image
s = scans.line_scan(dev.samx, -3, 3, steps=50, exp_time=0.01, relative=False)
@@ -247,13 +248,13 @@ def test_widgets_e2e_image(qtbot, connected_client_gui_obj, random_generator_fro
qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
# Check that last image is equivalent to data in Redis
last_img = bec.device_monitor.get_data(
dev.eiger, count=1
) # Get last image from Redis monitor 2D endpoint
last_img = bec.connector.get_last(MessageEndpoints.device_preview("eiger", "preview"))[
"data"
].data
assert np.allclose(img.get_data(), last_img)
# Now add a device with a preview signal
img = widget.image(["eiger", "preview"])
img = widget.image(device_name="eiger", device_entry="preview")
s = scans.line_scan(dev.samx, -3, 3, steps=50, exp_time=0.01, relative=False)
s.wait()

View File

@@ -3,13 +3,13 @@ from unittest import mock
import pytest
from bec_widgets.cli.client import AdvancedDockArea
from bec_widgets.cli.client import BECDockArea
from bec_widgets.cli.client_utils import BECGuiClient, _start_plot_process
@pytest.fixture
def cli_dock_area():
dock_area = AdvancedDockArea(gui_id="test")
dock_area = BECDockArea(gui_id="test")
with mock.patch.object(dock_area, "_run_rpc") as mock_rpc_call:
with mock.patch.object(dock_area, "_gui_is_alive", return_value=True):
yield dock_area, mock_rpc_call

View File

@@ -82,6 +82,45 @@ def test_rgba_to_hex():
assert Colors.rgba_to_hex(255, 87, 51) == "#FF5733FF"
def test_canonical_colormap_name_case_insensitive():
available = Colors.list_available_colormaps()
presets = Colors.list_available_gradient_presets()
if not available and not presets:
pytest.skip("No colormaps or presets available to test canonical mapping.")
name = (available or presets)[0]
requested = name.swapcase()
assert Colors.canonical_colormap_name(requested) == name
def test_validate_color_map_returns_canonical_name():
available = Colors.list_available_colormaps()
presets = Colors.list_available_gradient_presets()
if not available and not presets:
pytest.skip("No colormaps or presets available to test validation.")
name = (available or presets)[0]
requested = name.swapcase()
assert Colors.validate_color_map(requested) == name
def test_get_colormap_uses_gradient_preset_fallback(monkeypatch):
presets = Colors.list_available_gradient_presets()
if not presets:
pytest.skip("No gradient presets available to test fallback.")
preset = presets[0]
Colors._get_colormap_cached.cache_clear()
def _raise(*args, **kwargs):
raise Exception("registry unavailable")
monkeypatch.setattr(pg.colormap, "get", _raise)
cmap = Colors._get_colormap_cached(preset)
assert isinstance(cmap, pg.ColorMap)
@pytest.mark.parametrize("num", [10, 100, 400])
def test_evenly_spaced_colors(num):
colors_qcolor = Colors.evenly_spaced_colors(colormap="magma", num=num, format="QColor")

View File

@@ -10,17 +10,14 @@ from qtpy.QtCore import QSettings, Qt, QTimer
from qtpy.QtGui import QPixmap
from qtpy.QtWidgets import QDialog, QMessageBox, QWidget
import bec_widgets.widgets.containers.advanced_dock_area.basic_dock_area as basic_dock_module
import bec_widgets.widgets.containers.advanced_dock_area.profile_utils as profile_utils
from bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area import (
AdvancedDockArea,
SaveProfileDialog,
)
from bec_widgets.widgets.containers.advanced_dock_area.basic_dock_area import (
import bec_widgets.widgets.containers.dock_area.basic_dock_area as basic_dock_module
import bec_widgets.widgets.containers.dock_area.profile_utils as profile_utils
from bec_widgets.widgets.containers.dock_area.basic_dock_area import (
DockAreaWidget,
DockSettingsDialog,
)
from bec_widgets.widgets.containers.advanced_dock_area.profile_utils import (
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea, SaveProfileDialog
from bec_widgets.widgets.containers.dock_area.profile_utils import (
SETTINGS_KEYS,
default_profile_path,
get_profile_info,
@@ -31,20 +28,17 @@ from bec_widgets.widgets.containers.advanced_dock_area.profile_utils import (
load_user_profile_screenshot,
open_default_settings,
open_user_settings,
plugin_profiles_dir,
read_manifest,
restore_user_from_default,
set_quick_select,
user_profile_path,
write_manifest,
)
from bec_widgets.widgets.containers.advanced_dock_area.settings.dialogs import (
from bec_widgets.widgets.containers.dock_area.settings.dialogs import (
PreviewPanel,
RestoreProfileDialog,
)
from bec_widgets.widgets.containers.advanced_dock_area.settings.workspace_manager import (
WorkSpaceManager,
)
from bec_widgets.widgets.containers.dock_area.settings.workspace_manager import WorkSpaceManager
from .client_mocks import mocked_client
@@ -52,7 +46,7 @@ from .client_mocks import mocked_client
@pytest.fixture
def advanced_dock_area(qtbot, mocked_client):
"""Create an AdvancedDockArea instance for testing."""
widget = AdvancedDockArea(client=mocked_client)
widget = BECDockArea(client=mocked_client)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
@@ -152,7 +146,7 @@ def workspace_manager_target():
"""Mock delete_profile that performs actual file deletion."""
from qtpy.QtWidgets import QMessageBox
from bec_widgets.widgets.containers.advanced_dock_area.profile_utils import (
from bec_widgets.widgets.containers.dock_area.profile_utils import (
delete_profile_files,
is_profile_read_only,
)
@@ -190,7 +184,7 @@ def basic_dock_area(qtbot, mocked_client):
class _NamespaceProfiles:
"""Helper that routes profile file helpers through a namespace."""
def __init__(self, widget: AdvancedDockArea):
def __init__(self, widget: BECDockArea):
self.namespace = widget.profile_namespace
def open_user(self, name: str):
@@ -215,7 +209,7 @@ class _NamespaceProfiles:
return is_quick_select(name, namespace=self.namespace)
def profile_helper(widget: AdvancedDockArea) -> _NamespaceProfiles:
def profile_helper(widget: BECDockArea) -> _NamespaceProfiles:
"""Return a helper wired to the widget's profile namespace."""
return _NamespaceProfiles(widget)
@@ -590,7 +584,7 @@ class TestAdvancedDockAreaInit:
def test_init(self, advanced_dock_area):
assert advanced_dock_area is not None
assert isinstance(advanced_dock_area, AdvancedDockArea)
assert isinstance(advanced_dock_area, BECDockArea)
assert advanced_dock_area.mode == "creator"
assert hasattr(advanced_dock_area, "dock_manager")
assert hasattr(advanced_dock_area, "toolbar")
@@ -598,8 +592,8 @@ class TestAdvancedDockAreaInit:
assert hasattr(advanced_dock_area, "state_manager")
def test_rpc_and_plugin_flags(self):
assert AdvancedDockArea.RPC is True
assert AdvancedDockArea.PLUGIN is False
assert BECDockArea.RPC is True
assert BECDockArea.PLUGIN is False
def test_user_access_list(self):
expected_methods = [
@@ -611,7 +605,7 @@ class TestAdvancedDockAreaInit:
"delete_all",
]
for method in expected_methods:
assert method in AdvancedDockArea.USER_ACCESS
assert method in BECDockArea.USER_ACCESS
class TestDockManagement:
@@ -1421,21 +1415,21 @@ class TestAdvancedDockAreaRestoreAndDialogs:
pix = QPixmap(8, 8)
pix.fill(Qt.red)
monkeypatch.setattr(
"bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area.load_user_profile_screenshot",
"bec_widgets.widgets.containers.dock_area.dock_area.load_user_profile_screenshot",
lambda name, namespace=None: pix,
)
monkeypatch.setattr(
"bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area.load_default_profile_screenshot",
"bec_widgets.widgets.containers.dock_area.dock_area.load_default_profile_screenshot",
lambda name, namespace=None: pix,
)
monkeypatch.setattr(
"bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area.RestoreProfileDialog.confirm",
"bec_widgets.widgets.containers.dock_area.dock_area.RestoreProfileDialog.confirm",
lambda *args, **kwargs: True,
)
with (
patch(
"bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area.restore_user_from_default"
"bec_widgets.widgets.containers.dock_area.dock_area.restore_user_from_default"
) as mock_restore,
patch.object(advanced_dock_area, "delete_all") as mock_delete_all,
patch.object(advanced_dock_area, "load_profile") as mock_load_profile,
@@ -1457,20 +1451,20 @@ class TestAdvancedDockAreaRestoreAndDialogs:
advanced_dock_area._current_profile_name = profile_name
advanced_dock_area.isVisible = lambda: False
monkeypatch.setattr(
"bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area.load_user_profile_screenshot",
"bec_widgets.widgets.containers.dock_area.dock_area.load_user_profile_screenshot",
lambda name: QPixmap(),
)
monkeypatch.setattr(
"bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area.load_default_profile_screenshot",
"bec_widgets.widgets.containers.dock_area.dock_area.load_default_profile_screenshot",
lambda name: QPixmap(),
)
monkeypatch.setattr(
"bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area.RestoreProfileDialog.confirm",
"bec_widgets.widgets.containers.dock_area.dock_area.RestoreProfileDialog.confirm",
lambda *args, **kwargs: False,
)
with patch(
"bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area.restore_user_from_default"
"bec_widgets.widgets.containers.dock_area.dock_area.restore_user_from_default"
) as mock_restore:
advanced_dock_area.restore_user_profile_from_default()
@@ -1479,7 +1473,7 @@ class TestAdvancedDockAreaRestoreAndDialogs:
def test_restore_user_profile_from_default_no_target(self, advanced_dock_area, monkeypatch):
advanced_dock_area._current_profile_name = None
with patch(
"bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area.RestoreProfileDialog.confirm"
"bec_widgets.widgets.containers.dock_area.dock_area.RestoreProfileDialog.confirm"
) as mock_confirm:
advanced_dock_area.restore_user_profile_from_default()
mock_confirm.assert_not_called()
@@ -1723,8 +1717,7 @@ class TestWorkspaceProfileOperations:
return False
with patch(
"bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area.SaveProfileDialog",
StubDialog,
"bec_widgets.widgets.containers.dock_area.dock_area.SaveProfileDialog", StubDialog
):
advanced_dock_area.save_profile(profile_name, show_dialog=True)
@@ -1795,8 +1788,7 @@ class TestWorkspaceProfileOperations:
return False
with patch(
"bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area.SaveProfileDialog",
StubDialog,
"bec_widgets.widgets.containers.dock_area.dock_area.SaveProfileDialog", StubDialog
):
advanced_dock_area.save_profile(show_dialog=True)
@@ -1859,11 +1851,11 @@ class TestWorkspaceProfileOperations:
with (
patch(
"bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area.QMessageBox.question",
"bec_widgets.widgets.containers.dock_area.dock_area.QMessageBox.question",
return_value=QMessageBox.Yes,
) as mock_question,
patch(
"bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area.QMessageBox.information",
"bec_widgets.widgets.containers.dock_area.dock_area.QMessageBox.information",
return_value=None,
) as mock_info,
):
@@ -1893,7 +1885,7 @@ class TestWorkspaceProfileOperations:
mock_get_action.return_value.widget = mock_combo
with patch(
"bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area.QMessageBox.question"
"bec_widgets.widgets.containers.dock_area.dock_area.QMessageBox.question"
) as mock_question:
mock_question.return_value = QMessageBox.Yes

View File

@@ -4,7 +4,6 @@ import pyqtgraph as pg
import pytest
from bec_widgets.widgets.plots.image.image_base import ImageLayerManager
from bec_widgets.widgets.plots.image.image_item import ImageItem
@pytest.fixture()

View File

@@ -1,6 +1,7 @@
import numpy as np
import pyqtgraph as pg
import pytest
from bec_lib.endpoints import MessageEndpoints
from qtpy.QtCore import QPointF
from bec_widgets.widgets.plots.image.image import Image
@@ -12,6 +13,23 @@ from tests.unit_tests.conftest import create_widget
##################################################
def _set_signal_config(
client,
device_name: str,
signal_name: str,
signal_class: str,
ndim: int,
obj_name: str | None = None,
):
device = client.device_manager.devices[device_name]
device._info["signals"][signal_name] = {
"obj_name": obj_name or signal_name,
"signal_class": signal_class,
"component_name": signal_name,
"describe": {"signal_info": {"ndim": ndim}},
}
def test_initialization_defaults(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
assert bec_image_view.color_map == "plasma"
@@ -114,32 +132,35 @@ def test_enable_colorbar_with_vrange(qtbot, mocked_client, colorbar_type):
##############################################
# Previewsignal update mechanism
# Device/signal update mechanism
def test_image_setup_preview_signal_1d(qtbot, mocked_client, monkeypatch):
def test_image_setup_preview_signal_1d(qtbot, mocked_client):
"""
Ensure that calling .image() with a (device, signal, config) tuple representing
a 1D PreviewSignal connects using the 1D path and updates correctly.
Ensure that calling .image() with a 1D PreviewSignal connects using the 1D path
and updates correctly.
"""
import numpy as np
view = create_widget(qtbot, Image, client=mocked_client)
signal_config = {
"obj_name": "waveform1d_img",
"signal_class": "PreviewSignal",
"describe": {"signal_info": {"ndim": 1}},
}
_set_signal_config(
mocked_client,
"waveform1d",
"img",
signal_class="PreviewSignal",
ndim=1,
obj_name="waveform1d_img",
)
# Set the image monitor to the preview signal
view.image(monitor=("waveform1d", "img", signal_config))
view.image(device_name="waveform1d", device_entry="img")
# Subscriptions should indicate 1D preview connection
sub = view.subscriptions["main"]
assert sub.source == "device_monitor_1d"
assert sub.monitor_type == "1d"
assert sub.monitor == ("waveform1d", "img", signal_config)
assert view.device_name == "waveform1d"
assert view.device_entry == "img"
# Simulate a waveform update from the dispatcher
waveform = np.arange(25, dtype=float)
@@ -148,29 +169,32 @@ def test_image_setup_preview_signal_1d(qtbot, mocked_client, monkeypatch):
np.testing.assert_array_equal(view.main_image.raw_data[0], waveform)
def test_image_setup_preview_signal_2d(qtbot, mocked_client, monkeypatch):
def test_image_setup_preview_signal_2d(qtbot, mocked_client):
"""
Ensure that calling .image() with a (device, signal, config) tuple representing
a 2D PreviewSignal connects using the 2D path and updates correctly.
Ensure that calling .image() with a 2D PreviewSignal connects using the 2D path
and updates correctly.
"""
import numpy as np
view = create_widget(qtbot, Image, client=mocked_client)
signal_config = {
"obj_name": "eiger_img2d",
"signal_class": "PreviewSignal",
"describe": {"signal_info": {"ndim": 2}},
}
_set_signal_config(
mocked_client,
"eiger",
"img2d",
signal_class="PreviewSignal",
ndim=2,
obj_name="eiger_img2d",
)
# Set the image monitor to the preview signal
view.image(monitor=("eiger", "img2d", signal_config))
view.image(device_name="eiger", device_entry="img2d")
# Subscriptions should indicate 2D preview connection
sub = view.subscriptions["main"]
assert sub.source == "device_monitor_2d"
assert sub.monitor_type == "2d"
assert sub.monitor == ("eiger", "img2d", signal_config)
assert view.device_name == "eiger"
assert view.device_entry == "img2d"
# Simulate a 2D image update
test_data = np.arange(16, dtype=float).reshape(4, 4)
@@ -178,38 +202,197 @@ def test_image_setup_preview_signal_2d(qtbot, mocked_client, monkeypatch):
np.testing.assert_array_equal(view.main_image.image, test_data)
def test_preview_signals_skip_0d_entries(qtbot, mocked_client, monkeypatch):
"""
Preview/async combobox should omit 0D signals.
"""
view = create_widget(qtbot, Image, client=mocked_client)
def fake_get(signal_class_filter):
signal_classes = (
signal_class_filter
if isinstance(signal_class_filter, (list, tuple, set))
else [signal_class_filter]
)
if "PreviewSignal" in signal_classes:
return [
(
"eiger",
"sig0d",
{
"obj_name": "sig0d",
"signal_class": "PreviewSignal",
"describe": {"signal_info": {"ndim": 0}},
},
),
(
"eiger",
"sig2d",
{
"obj_name": "sig2d",
"signal_class": "PreviewSignal",
"describe": {"signal_info": {"ndim": 2}},
},
),
]
return []
monkeypatch.setattr(view.client.device_manager, "get_bec_signals", fake_get)
device_selection = view.toolbar.components.get_action("device_selection").widget
device_selection.signal_combo_box.set_device("eiger")
device_selection.signal_combo_box.update_signals_from_signal_classes()
texts = [
device_selection.signal_combo_box.itemText(i)
for i in range(device_selection.signal_combo_box.count())
]
assert "sig0d" not in texts
assert "sig2d" in texts
def test_image_async_signal_uses_obj_name(qtbot, mocked_client, monkeypatch):
"""
Verify async signals use obj_name for endpoints/payloads and reconnect with scan_id.
"""
view = create_widget(qtbot, Image, client=mocked_client)
_set_signal_config(
mocked_client, "eiger", "img", signal_class="AsyncSignal", ndim=1, obj_name="async_obj"
)
view.image(device_name="eiger", device_entry="img")
assert view.subscriptions["main"].async_signal_name == "async_obj"
assert view.async_update is True
# Prepare scan ids and capture dispatcher calls
view.old_scan_id = "old_scan"
view.scan_id = "new_scan"
connected = []
disconnected = []
monkeypatch.setattr(
view.bec_dispatcher,
"connect_slot",
lambda slot, endpoint, from_start=False, cb_info=None: connected.append(
(slot, endpoint, from_start, cb_info)
),
)
monkeypatch.setattr(
view.bec_dispatcher,
"disconnect_slot",
lambda slot, endpoint: disconnected.append((slot, endpoint)),
)
view._setup_async_image(view.scan_id)
expected_new = MessageEndpoints.device_async_signal("new_scan", "eiger", "async_obj")
expected_old = MessageEndpoints.device_async_signal("old_scan", "eiger", "async_obj")
assert any(ep == expected_new for _, ep, _, _ in connected)
assert any(ep == expected_old for _, ep in disconnected)
# Payload extraction should use obj_name
payload = np.array([1, 2, 3])
msg = {"signals": {"async_obj": {"value": payload}}}
assert np.array_equal(view._get_payload_data(msg), payload)
def test_disconnect_clears_async_state(qtbot, mocked_client, monkeypatch):
view = create_widget(qtbot, Image, client=mocked_client)
_set_signal_config(
mocked_client, "eiger", "img", signal_class="AsyncSignal", ndim=2, obj_name="async_obj"
)
view.image(device_name="eiger", device_entry="img")
view.scan_id = "scan_x"
view.old_scan_id = "scan_y"
view.subscriptions["main"].async_signal_name = "async_obj"
# Avoid touching real dispatcher
monkeypatch.setattr(view.bec_dispatcher, "disconnect_slot", lambda *args, **kwargs: None)
view.disconnect_monitor(device_name="eiger", device_entry="img")
assert view.subscriptions["main"].async_signal_name is None
assert view.async_update is False
##############################################
# Device monitor endpoint update mechanism
# Connection guardrails
def test_image_setup_image_2d(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
bec_image_view.image(monitor="eiger", monitor_type="2d")
assert bec_image_view.monitor == "eiger"
assert bec_image_view.subscriptions["main"].source == "device_monitor_2d"
assert bec_image_view.subscriptions["main"].monitor_type == "2d"
assert bec_image_view.main_image.raw_data is None
assert bec_image_view.main_image.image is None
def test_image_setup_rejects_unsupported_signal_class(qtbot, mocked_client):
view = create_widget(qtbot, Image, client=mocked_client)
_set_signal_config(mocked_client, "eiger", "img", signal_class="Signal", ndim=2)
view.image(device_name="eiger", device_entry="img")
assert view.subscriptions["main"].source is None
assert view.subscriptions["main"].monitor_type is None
assert view.async_update is False
def test_image_setup_image_1d(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
bec_image_view.image(monitor="eiger", monitor_type="1d")
assert bec_image_view.monitor == "eiger"
assert bec_image_view.subscriptions["main"].source == "device_monitor_1d"
assert bec_image_view.subscriptions["main"].monitor_type == "1d"
assert bec_image_view.main_image.raw_data is None
assert bec_image_view.main_image.image is None
def test_image_disconnects_with_missing_entry(qtbot, mocked_client):
view = create_widget(qtbot, Image, client=mocked_client)
_set_signal_config(mocked_client, "eiger", "img", signal_class="PreviewSignal", ndim=2)
view.image(device_name="eiger", device_entry="img")
assert view.device_name == "eiger"
assert view.device_entry == "img"
view.image(device_name="eiger", device_entry=None)
assert view.device_name == ""
assert view.device_entry == ""
def test_image_setup_image_auto(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
bec_image_view.image(monitor="eiger", monitor_type="auto")
assert bec_image_view.monitor == "eiger"
assert bec_image_view.subscriptions["main"].source == "auto"
assert bec_image_view.subscriptions["main"].monitor_type == "auto"
assert bec_image_view.main_image.raw_data is None
assert bec_image_view.main_image.image is None
def test_handle_scan_change_clears_buffers_and_resets_crosshair(qtbot, mocked_client, monkeypatch):
view = create_widget(qtbot, Image, client=mocked_client)
view.scan_id = "scan_1"
view.main_image.buffer = [np.array([1.0, 2.0])]
view.main_image.max_len = 2
clear_called = []
monkeypatch.setattr(view.main_image, "clear", lambda: clear_called.append(True))
reset_called = []
if view.crosshair is not None:
monkeypatch.setattr(view.crosshair, "reset", lambda: reset_called.append(True))
view._handle_scan_change("scan_2")
assert view.old_scan_id == "scan_1"
assert view.scan_id == "scan_2"
assert clear_called == [True]
assert view.main_image.buffer == []
assert view.main_image.max_len == 0
if view.crosshair is not None:
assert reset_called == [True]
def test_handle_scan_change_reconnects_async(qtbot, mocked_client, monkeypatch):
view = create_widget(qtbot, Image, client=mocked_client)
view.scan_id = "scan_1"
view.async_update = True
called = []
monkeypatch.setattr(view, "_setup_async_image", lambda scan_id: called.append(scan_id))
view._handle_scan_change("scan_2")
assert called == ["scan_2"]
def test_handle_scan_change_same_scan_noop(qtbot, mocked_client, monkeypatch):
view = create_widget(qtbot, Image, client=mocked_client)
view.scan_id = "scan_1"
view.main_image.buffer = [np.array([1.0])]
view.main_image.max_len = 1
clear_called = []
monkeypatch.setattr(view.main_image, "clear", lambda: clear_called.append(True))
view._handle_scan_change("scan_1")
assert view.scan_id == "scan_1"
assert clear_called == []
assert view.main_image.buffer == [np.array([1.0])]
assert view.main_image.max_len == 1
def test_image_data_update_2d(qtbot, mocked_client):
@@ -245,8 +428,7 @@ def test_toolbar_actions_presence(qtbot, mocked_client):
assert bec_image_view.toolbar.components.exists("image_autorange")
assert bec_image_view.toolbar.components.exists("lock_aspect_ratio")
assert bec_image_view.toolbar.components.exists("image_processing_fft")
assert bec_image_view.toolbar.components.exists("image_device_combo")
assert bec_image_view.toolbar.components.exists("image_dim_combo")
assert bec_image_view.toolbar.components.exists("device_selection")
def test_auto_emit_syncs_image_toolbar_actions(qtbot, mocked_client):
@@ -327,13 +509,40 @@ def test_setting_vrange_with_colorbar(qtbot, mocked_client, colorbar_type):
###################################
def test_setup_image_from_toolbar(qtbot, mocked_client):
def test_setup_image_from_toolbar(qtbot, mocked_client, monkeypatch):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
bec_image_view.device_combo_box.setCurrentText("eiger")
bec_image_view.dim_combo_box.setCurrentText("2d")
_set_signal_config(mocked_client, "eiger", "img", signal_class="PreviewSignal", ndim=2)
monkeypatch.setattr(
mocked_client.device_manager,
"get_bec_signals",
lambda signal_class_filter: (
[
(
"eiger",
"img",
{
"obj_name": "img",
"signal_class": "PreviewSignal",
"describe": {"signal_info": {"ndim": 2}},
},
)
]
if "PreviewSignal" in (signal_class_filter or [])
else []
),
)
assert bec_image_view.monitor == "eiger"
device_selection = bec_image_view.toolbar.components.get_action("device_selection").widget
device_selection.device_combo_box.update_devices_from_filters()
device_selection.device_combo_box.setCurrentText("eiger")
device_selection.signal_combo_box.setCurrentText("img")
bec_image_view.on_device_selection_changed(None)
qtbot.wait(200)
assert bec_image_view.device_name == "eiger"
assert bec_image_view.device_entry == "img"
assert bec_image_view.subscriptions["main"].source == "device_monitor_2d"
assert bec_image_view.subscriptions["main"].monitor_type == "2d"
assert bec_image_view.main_image.raw_data is None
@@ -598,90 +807,59 @@ def test_roi_plot_data_from_image(qtbot, mocked_client):
##############################################
# MonitorSelectionToolbarBundle specific tests
# Device selection toolbar sync
##############################################
def test_monitor_selection_reverse_device_items(qtbot, mocked_client):
"""
Verify that _reverse_device_items correctly reverses the order of items in the
device combobox while preserving the current selection.
"""
def test_device_selection_syncs_from_properties(qtbot, mocked_client, monkeypatch):
view = create_widget(qtbot, Image, client=mocked_client)
combo = view.device_combo_box
# Replace existing items with a deterministic list
combo.clear()
combo.addItem("samx", 1)
combo.addItem("samy", 2)
combo.addItem("samz", 3)
combo.setCurrentText("samy")
# Reverse the items
view._reverse_device_items()
# Order should be reversed and selection preserved
assert [combo.itemText(i) for i in range(combo.count())] == ["samz", "samy", "samx"]
assert combo.currentText() == "samy"
def test_monitor_selection_populate_preview_signals(qtbot, mocked_client, monkeypatch):
"""
Verify that _populate_preview_signals adds previewsignal devices to the combobox
with the correct userData.
"""
view = create_widget(qtbot, Image, client=mocked_client)
# Provide a deterministic fake device_manager with get_bec_signals
class _FakeDM:
def get_bec_signals(self, _filter):
return [
("eiger", "img", {"obj_name": "eiger_img"}),
("async_device", "img2", {"obj_name": "async_device_img2"}),
_set_signal_config(mocked_client, "eiger", "img2d", signal_class="PreviewSignal", ndim=2)
monkeypatch.setattr(
view.client.device_manager,
"get_bec_signals",
lambda signal_class_filter: (
[
(
"eiger",
"img2d",
{
"obj_name": "img2d",
"signal_class": "PreviewSignal",
"describe": {"signal_info": {"ndim": 2}},
},
)
]
if "PreviewSignal" in (signal_class_filter or [])
else []
),
)
monkeypatch.setattr(view.client, "device_manager", _FakeDM())
view.device_name = "eiger"
view.device_entry = "img2d"
initial_count = view.device_combo_box.count()
qtbot.wait(200) # Allow signal processing
view._populate_preview_signals()
# Two new entries should have been added
assert view.device_combo_box.count() == initial_count + 2
# The first newly added item should carry tuple userData describing the device/signal
data = view.device_combo_box.itemData(initial_count)
assert isinstance(data, tuple) and data[0] == "eiger"
device_selection = view.toolbar.components.get_action("device_selection").widget
qtbot.waitUntil(
lambda: device_selection.device_combo_box.currentText() == "eiger"
and device_selection.signal_combo_box.currentText() == "img2d",
timeout=1000,
)
def test_monitor_selection_adjust_and_connect(qtbot, mocked_client, monkeypatch):
"""
Verify that _adjust_and_connect performs the full set-up:
- fills the combobox with preview signals,
- reverses their order,
- and resets the currentText to an empty string.
"""
def test_device_entry_syncs_from_toolbar(qtbot, mocked_client):
view = create_widget(qtbot, Image, client=mocked_client)
_set_signal_config(mocked_client, "eiger", "img_a", signal_class="PreviewSignal", ndim=2)
_set_signal_config(mocked_client, "eiger", "img_b", signal_class="PreviewSignal", ndim=2)
# Deterministic fake device_manager
class _FakeDM:
def get_bec_signals(self, _filter):
return [("eiger", "img", {"obj_name": "eiger_img"})]
view.device_name = "eiger"
view.device_entry = "img_a"
monkeypatch.setattr(view.client, "device_manager", _FakeDM())
device_selection = view.toolbar.components.get_action("device_selection").widget
device_selection.signal_combo_box.blockSignals(True)
device_selection.signal_combo_box.setCurrentText("img_b")
device_selection.signal_combo_box.blockSignals(False)
combo = view.device_combo_box
# Start from a clean state
combo.clear()
combo.addItem("", None)
combo.setCurrentText("")
view._sync_device_entry_from_toolbar()
# Execute the method under test
view._adjust_and_connect()
# Expect exactly two items: preview label followed by the empty default
assert combo.count() == 2
# Because of the reversal, the preview label comes first
assert combo.itemText(0) == "eiger_img"
# Current selection remains empty
assert combo.currentText() == ""
assert view.device_entry == "img_b"

View File

@@ -9,7 +9,7 @@ def test_rpc_widget_handler():
handler = RPCWidgetHandler()
assert "Image" in handler.widget_classes
assert "RingProgressBar" in handler.widget_classes
assert "AdvancedDockArea" in handler.widget_classes
assert "BECDockArea" in handler.widget_classes
class _TestPluginWidget(BECWidget): ...

View File

@@ -0,0 +1,38 @@
from qtpy.QtCore import QRect
from qtpy.QtWidgets import QWidget
from bec_widgets.utils.screen_utils import (
apply_centered_size,
centered_geometry,
main_app_size_for_screen,
)
def test_centered_geometry_returns_expected_tuple():
available = QRect(100, 50, 800, 600)
result = centered_geometry(available, 400, 300)
assert result == (300, 200, 400, 300)
def test_main_app_size_for_screen_respects_16_9_and_screen_caps():
available = QRect(0, 0, 1920, 1080)
width, height = main_app_size_for_screen(available)
assert (width, height) == (1728, 972)
narrow = QRect(0, 0, 1000, 800)
width, height = main_app_size_for_screen(narrow)
assert (width, height) == (900, 506)
def test_apply_centered_size_uses_provided_geometry(qtbot):
widget = QWidget()
qtbot.addWidget(widget)
available = QRect(10, 20, 600, 400)
apply_centered_size(widget, 200, 100, available=available)
geometry = widget.geometry()
assert geometry.x() == 210
assert geometry.y() == 170
assert geometry.width() == 200
assert geometry.height() == 100

View File

@@ -0,0 +1,97 @@
from __future__ import annotations
import pytest
from bec_lib.messages import BeamlineConditionUpdateEntry
from qtpy.QtWidgets import QToolBar
from bec_widgets.utils.toolbars.actions import StatusIndicatorAction, StatusIndicatorWidget
from bec_widgets.utils.toolbars.status_bar import BECStatusBroker, StatusToolBar
from .client_mocks import mocked_client
from .conftest import create_widget
class TestStatusIndicators:
"""Widget/action level tests independent of broker wiring."""
def test_indicator_widget_state_and_text(self, qtbot):
widget = StatusIndicatorWidget(text="Ready", state="success")
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
widget.set_state("warning")
widget.set_text("Alert")
assert widget._state.value == "warning"
assert widget._text_label.text() == "Alert"
def test_indicator_action_updates_widget_and_action(self, qtbot):
qt_toolbar = QToolBar()
qtbot.addWidget(qt_toolbar)
action = StatusIndicatorAction(text="Ready", tooltip="Initial")
action.add_to_toolbar(qt_toolbar, qt_toolbar)
action.set_tooltip("Updated tooltip")
action.set_text("Running")
assert action.action.toolTip() == "Updated tooltip"
assert action.widget.toolTip() == "Updated tooltip" # type: ignore[union-attr]
assert action.widget._text_label.text() == "Running" # type: ignore[union-attr]
class TestStatusBar:
"""Status bar + broker integration using fake redis client (mocked_client)."""
@pytest.fixture(params=[{}, {"names": ["alpha"]}])
def status_toolbar(self, qtbot, mocked_client, request):
broker = BECStatusBroker(client=mocked_client)
toolbar = create_widget(qtbot, StatusToolBar, **request.param)
yield toolbar
broker.reset_singleton()
def test_allowed_names_precreates_placeholder(self, status_toolbar):
status_toolbar.broker.refresh_available = lambda: None
status_toolbar.refresh_from_broker()
# We parametrize the fixture so one invocation has allowed_names set.
if status_toolbar.allowed_names:
name = next(iter(status_toolbar.allowed_names))
assert status_toolbar.components.exists(name)
act = status_toolbar.components.get_action(name)
assert isinstance(act, StatusIndicatorAction)
assert act.widget._text_label.text() == name # type: ignore[union-attr]
def test_on_available_adds_and_removes(self, status_toolbar):
conditions = [
BeamlineConditionUpdateEntry(name="c1", title="Cond 1", condition_type="test"),
BeamlineConditionUpdateEntry(name="c2", title="Cond 2", condition_type="test"),
]
status_toolbar.on_available_updated(conditions)
assert status_toolbar.components.exists("c1")
assert status_toolbar.components.exists("c2")
conditions2 = [
BeamlineConditionUpdateEntry(name="c1", title="Cond 1", condition_type="test")
]
status_toolbar.on_available_updated(conditions2)
assert status_toolbar.components.exists("c1")
assert not status_toolbar.components.exists("c2")
def test_on_status_updated_sets_title_and_message(self, status_toolbar):
status_toolbar.add_status_item("beam", text="Initial", state="default", tooltip=None)
payload = {"name": "beam", "status": "warning", "title": "New Title", "message": "Detail"}
status_toolbar.on_status_updated("beam", payload)
action = status_toolbar.components.get_action("beam")
assert isinstance(action, StatusIndicatorAction)
assert action.widget._text_label.text() == "New Title" # type: ignore[union-attr]
assert action.action.toolTip() == "Detail"
def test_on_status_updated_keeps_existing_text_when_no_title(self, status_toolbar):
status_toolbar.add_status_item("beam", text="Keep Me", state="default", tooltip=None)
payload = {"name": "beam", "status": "normal", "message": "Note"}
status_toolbar.on_status_updated("beam", payload)
action = status_toolbar.components.get_action("beam")
assert isinstance(action, StatusIndicatorAction)
assert action.widget._text_label.text() == "Keep Me" # type: ignore[union-attr]
assert action.action.toolTip() == "Note"

View File

@@ -1,91 +0,0 @@
import os
import shlex
import subprocess
from unittest import mock
import pytest
from bec_widgets.widgets.editors.vscode.vscode import VSCodeEditor
from .client_mocks import mocked_client
@pytest.fixture
def vscode_widget(qtbot, mocked_client):
with mock.patch("bec_widgets.widgets.editors.vscode.vscode.subprocess.Popen") as mock_popen:
widget = VSCodeEditor(client=mocked_client)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
def test_vscode_widget(qtbot, vscode_widget):
assert vscode_widget.process is not None
assert vscode_widget._url == f"http://127.0.0.1:{vscode_widget.port}?tkn=bec"
def test_start_server(qtbot, mocked_client):
with mock.patch("bec_widgets.widgets.editors.vscode.vscode.os.killpg") as mock_killpg:
with mock.patch("bec_widgets.widgets.editors.vscode.vscode.os.getpgid") as mock_getpgid:
with mock.patch(
"bec_widgets.widgets.editors.vscode.vscode.subprocess.Popen"
) as mock_popen:
with mock.patch(
"bec_widgets.widgets.editors.vscode.vscode.select.select"
) as mock_select:
with mock.patch(
"bec_widgets.widgets.editors.vscode.vscode.get_free_port"
) as mock_get_free_port:
mock_get_free_port.return_value = 12345
mock_process = mock.Mock()
mock_process.stdout.fileno.return_value = 1
mock_process.poll.return_value = None
mock_process.stdout.read.return_value = f"available at http://{VSCodeEditor.host}:{12345}?tkn={VSCodeEditor.token}"
mock_popen.return_value = mock_process
mock_select.return_value = [[mock_process.stdout], [], []]
widget = VSCodeEditor(client=mocked_client)
widget.close()
widget.deleteLater()
assert (
mock.call(
shlex.split(
f"code serve-web --port {widget.port} --connection-token={widget.token} --accept-server-license-terms"
),
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
preexec_fn=os.setsid,
env=mock.ANY,
)
in mock_popen.mock_calls
)
@pytest.fixture
def patched_vscode_process(qtbot, vscode_widget):
with mock.patch("bec_widgets.widgets.editors.vscode.vscode.os.killpg") as mock_killpg:
mock_killpg.reset_mock()
with mock.patch("bec_widgets.widgets.editors.vscode.vscode.os.getpgid") as mock_getpgid:
mock_getpgid.return_value = 123
vscode_widget.process = mock.Mock()
yield vscode_widget, mock_killpg
def test_vscode_cleanup(qtbot, patched_vscode_process):
vscode_patched, mock_killpg = patched_vscode_process
vscode_patched.process.pid = 123
vscode_patched.process.poll.return_value = None
vscode_patched.cleanup_vscode()
mock_killpg.assert_called_once_with(123, 15)
vscode_patched.process.wait.assert_called_once()
def test_close_event_on_terminated_code(qtbot, patched_vscode_process):
vscode_patched, mock_killpg = patched_vscode_process
vscode_patched.process.pid = 123
vscode_patched.process.poll.return_value = 0
vscode_patched.cleanup_vscode()
mock_killpg.assert_not_called()
vscode_patched.process.wait.assert_not_called()