1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-04-08 01:37:53 +02:00

Compare commits

..

11 Commits

25 changed files with 3057 additions and 472 deletions

View File

@@ -9,10 +9,10 @@ jobs:
shell: bash -el {0}
env:
CHILD_PIPELINE_BRANCH: main # Set the branch you want for ophyd_devices
BEC_CORE_BRANCH: main # Set the branch you want for bec
OPHYD_DEVICES_BRANCH: main # Set the branch you want for ophyd_devices
PLUGIN_REPO_BRANCH: main # Set the branch you want for the plugin repo
CHILD_PIPELINE_BRANCH: main # Set the branch you want for ophyd_devices
BEC_CORE_BRANCH: main # Set the branch you want for bec
OPHYD_DEVICES_BRANCH: main # Set the branch you want for ophyd_devices
PLUGIN_REPO_BRANCH: main # Set the branch you want for the plugin repo
PROJECT_PATH: ${{ github.repository }}
QTWEBENGINE_DISABLE_SANDBOX: 1
QT_QPA_PLATFORM: "offscreen"
@@ -23,15 +23,16 @@ jobs:
- name: Set up Conda
uses: conda-incubator/setup-miniconda@v3
with:
auto-update-conda: true
auto-activate-base: true
python-version: '3.11'
auto-update-conda: true
auto-activate-base: true
python-version: "3.11"
- name: Install dependencies
run: |
sudo apt-get update
sudo apt-get install -y libgl1 libegl1 x11-utils libxkbcommon-x11-0 libdbus-1-3 xvfb
sudo apt-get -y install libnss3 libxdamage1 libasound2t64 libatomic1 libxcursor1
sudo apt-get -y install ttyd
- name: Conda install and run pytest
run: |
@@ -55,4 +56,4 @@ jobs:
with:
name: pytest-logs
path: ./logs/*.log
retention-days: 7
retention-days: 7

View File

@@ -18,7 +18,7 @@ from bec_widgets.widgets.containers.advanced_dock_area.basic_dock_area import Do
from bec_widgets.widgets.containers.qt_ads import CDockWidget
from bec_widgets.widgets.editors.monaco.monaco_dock import MonacoDock
from bec_widgets.widgets.editors.monaco.monaco_widget import MonacoWidget
from bec_widgets.widgets.editors.web_console.web_console import WebConsole
from bec_widgets.widgets.editors.web_console.web_console import BECShell, WebConsole
from bec_widgets.widgets.utility.ide_explorer.ide_explorer import IDEExplorer
@@ -91,9 +91,10 @@ class DeveloperWidget(DockAreaWidget):
# Initialize the widgets
self.explorer = IDEExplorer(self)
self.explorer.setObjectName("Explorer")
self.console = WebConsole(self)
self.console.setObjectName("Console")
self.terminal = WebConsole(self, startup_cmd="")
self.console = BECShell(self)
self.console.setObjectName("BEC Shell")
self.terminal = WebConsole(self)
self.terminal.setObjectName("Terminal")
self.monaco = MonacoDock(self)
self.monaco.setObjectName("MonacoEditor")

View File

@@ -30,6 +30,7 @@ _Widgets = {
"BECMainWindow": "BECMainWindow",
"BECProgressBar": "BECProgressBar",
"BECQueue": "BECQueue",
"BECShell": "BECShell",
"BECStatusBox": "BECStatusBox",
"DapComboBox": "DapComboBox",
"DarkModeButton": "DarkModeButton",
@@ -495,6 +496,28 @@ class BECQueue(RPCBase):
"""
class BECShell(RPCBase):
"""A WebConsole pre-configured to run the BEC shell."""
@rpc_call
def remove(self):
"""
Cleanup the BECConnector
"""
@rpc_call
def attach(self):
"""
None
"""
@rpc_call
def detach(self):
"""
Detach the widget from its parent dock widget (if widget is in the dock), making it a floating widget.
"""
class BECStatusBox(RPCBase):
"""An autonomous widget to display the status of BEC services."""
@@ -2016,6 +2039,90 @@ class Heatmap(RPCBase):
reload (bool): Whether to reload the heatmap with new data.
"""
@property
@rpc_call
def x_device_name(self) -> "str":
"""
Device name for the X axis.
"""
@x_device_name.setter
@rpc_call
def x_device_name(self) -> "str":
"""
Device name for the X axis.
"""
@property
@rpc_call
def x_device_entry(self) -> "str":
"""
Signal entry for the X axis device.
"""
@x_device_entry.setter
@rpc_call
def x_device_entry(self) -> "str":
"""
Signal entry for the X axis device.
"""
@property
@rpc_call
def y_device_name(self) -> "str":
"""
Device name for the Y axis.
"""
@y_device_name.setter
@rpc_call
def y_device_name(self) -> "str":
"""
Device name for the Y axis.
"""
@property
@rpc_call
def y_device_entry(self) -> "str":
"""
Signal entry for the Y axis device.
"""
@y_device_entry.setter
@rpc_call
def y_device_entry(self) -> "str":
"""
Signal entry for the Y axis device.
"""
@property
@rpc_call
def z_device_name(self) -> "str":
"""
Device name for the Z (color) axis.
"""
@z_device_name.setter
@rpc_call
def z_device_name(self) -> "str":
"""
Device name for the Z (color) axis.
"""
@property
@rpc_call
def z_device_entry(self) -> "str":
"""
Signal entry for the Z (color) axis device.
"""
@z_device_entry.setter
@rpc_call
def z_device_entry(self) -> "str":
"""
Signal entry for the Z (color) axis device.
"""
class Image(RPCBase):
"""Image widget for displaying 2D data."""
@@ -5265,13 +5372,6 @@ class ScatterWaveform(RPCBase):
Take a screenshot of the dock area and save it to a file.
"""
@property
@rpc_call
def main_curve(self) -> "ScatterCurve":
"""
The main scatter curve item.
"""
@property
@rpc_call
def color_map(self) -> "str":
@@ -5334,6 +5434,90 @@ class ScatterWaveform(RPCBase):
Clear all the curves from the plot.
"""
@property
@rpc_call
def x_device_name(self) -> "str":
"""
Device name for the X axis.
"""
@x_device_name.setter
@rpc_call
def x_device_name(self) -> "str":
"""
Device name for the X axis.
"""
@property
@rpc_call
def x_device_entry(self) -> "str":
"""
Signal entry for the X axis device.
"""
@x_device_entry.setter
@rpc_call
def x_device_entry(self) -> "str":
"""
Signal entry for the X axis device.
"""
@property
@rpc_call
def y_device_name(self) -> "str":
"""
Device name for the Y axis.
"""
@y_device_name.setter
@rpc_call
def y_device_name(self) -> "str":
"""
Device name for the Y axis.
"""
@property
@rpc_call
def y_device_entry(self) -> "str":
"""
Signal entry for the Y axis device.
"""
@y_device_entry.setter
@rpc_call
def y_device_entry(self) -> "str":
"""
Signal entry for the Y axis device.
"""
@property
@rpc_call
def z_device_name(self) -> "str":
"""
Device name for the Z (color) axis.
"""
@z_device_name.setter
@rpc_call
def z_device_name(self) -> "str":
"""
Device name for the Z (color) axis.
"""
@property
@rpc_call
def z_device_entry(self) -> "str":
"""
Signal entry for the Z (color) axis device.
"""
@z_device_entry.setter
@rpc_call
def z_device_entry(self) -> "str":
"""
Signal entry for the Z (color) axis device.
"""
class SignalComboBox(RPCBase):
"""Line edit widget for device input with autocomplete for device names."""
@@ -5366,6 +5550,15 @@ class SignalComboBox(RPCBase):
list[str]: List of device signals.
"""
@rpc_call
def get_signal_name(self) -> "str":
"""
Get the signal name from the combobox.
Returns:
str: The signal name.
"""
class SignalLabel(RPCBase):
@property

View File

@@ -1,20 +1,16 @@
# pylint: disable = no-name-in-module,missing-module-docstring
from __future__ import annotations
import inspect
import os
import time
import traceback
import uuid
import weakref
from datetime import datetime
from typing import TYPE_CHECKING, Callable, Final, Optional
from weakref import WeakMethod, WeakValueDictionary
from typing import TYPE_CHECKING, Optional
from bec_lib.logger import bec_logger
from bec_lib.utils.import_utils import lazy_import_from
from louie import saferef
from pydantic import BaseModel, ConfigDict, Field, field_validator
from pydantic import BaseModel, Field, field_validator
from qtpy.QtCore import QObject, QRunnable, QThreadPool, QTimer, Signal
from qtpy.QtWidgets import QApplication
@@ -33,12 +29,6 @@ else:
logger = bec_logger.logger
class _NextSentinel(object): ...
NEXT_SENTINEL: Final[_NextSentinel] = _NextSentinel()
class ConnectionConfig(BaseModel):
"""Configuration for BECConnector mixin class"""
@@ -46,7 +36,7 @@ class ConnectionConfig(BaseModel):
gui_id: Optional[str] = Field(
default=None, validate_default=True, description="The GUI ID of the widget."
)
model_config: ConfigDict = {"validate_assignment": True}
model_config: dict = {"validate_assignment": True}
@field_validator("gui_id")
@classmethod
@@ -87,9 +77,7 @@ class BECConnector:
"""Connection mixin class to handle BEC client and device manager"""
USER_ACCESS = ["_config_dict", "_get_all_rpc", "_rpc_id"]
EXIT_HANDLERS: WeakValueDictionary[int, Callable[[],]] = WeakValueDictionary()
_exit_handler: Callable[[],] | None = None
_method_handlers: set[Callable[[],]] = set()
EXIT_HANDLERS = {}
widget_removed = Signal()
name_established = Signal(str)
@@ -134,7 +122,7 @@ class BECConnector:
self.client = self.bec_dispatcher.client if client is None else client
self.rpc_register = RPCRegister()
if not 0 in BECConnector.EXIT_HANDLERS.keys():
if not self.client in BECConnector.EXIT_HANDLERS:
# register function to clean connections at exit;
# the function depends on BECClient, and BECDispatcher
@SafeSlot()
@@ -155,9 +143,8 @@ class BECConnector:
logger.info("Shutting down BEC Client", repr(client))
client.shutdown()
BECConnector._exit_handler = terminate # type: ignore # keep a strong reference to the final cleanup
BECConnector._add_exit_handler(terminate, 0)
QApplication.instance().aboutToQuit.connect(self._run_exit_handlers)
BECConnector.EXIT_HANDLERS[self.client] = terminate
QApplication.instance().aboutToQuit.connect(terminate)
if config:
self.config = config
@@ -200,51 +187,6 @@ class BECConnector:
QTimer.singleShot(0, self._update_object_name)
@classmethod
def _add_exit_handler(cls, handler: Callable, priority: int):
"""Private to allow use of priority 0"""
if inspect.ismethod(handler):
_h = saferef.safe_ref(handler)
def handler():
if h := _h():
h()
# cls._method_handlers.add(handler) # hold any instance methods in safe refs
cls.EXIT_HANDLERS[priority] = handler
@classmethod
def add_exit_handler(cls, handler: Callable, priority: int | _NextSentinel = NEXT_SENTINEL):
"""Add a handler to be called on the cleanup of the BEC Connector. Handlers are called in reverse order of their
priority - i.e. a higher number is higher priority. The BEC Connector's own cleanup will always be run last.
"""
existing_priorities = set(cls.EXIT_HANDLERS.keys())
priority_modified = False
if isinstance(priority, _NextSentinel):
priority = max(existing_priorities) + 1
if priority < 1:
raise ValueError(
"Please use a priority greater than 1! Priority 0 is reserved for system cleanup."
)
if priority in cls.EXIT_HANDLERS.keys():
priority_modified = True
logger.warning(f"Priority {priority} already in use - using the next available:")
while priority in cls.EXIT_HANDLERS.keys():
priority += 1
if priority_modified:
logger.warning(f"Assigned priority {priority} for {handler}.")
cls._add_exit_handler(handler, priority)
@SafeSlot()
def _run_exit_handlers(self):
"""Run all exit handlers from highest to lowest priority. Should be connected to AboutToQuit once and only once."""
handlers = list(
reversed(list(handler for _, handler in sorted(BECConnector.EXIT_HANDLERS.items())))
)
for handler in handlers:
handler()
@property
def parent_id(self) -> str | None:
try:

View File

@@ -70,7 +70,7 @@ from bec_widgets.widgets.containers.main_window.main_window import BECMainWindow
from bec_widgets.widgets.containers.qt_ads import CDockWidget
from bec_widgets.widgets.control.device_control.positioner_box import PositionerBox, PositionerBox2D
from bec_widgets.widgets.control.scan_control import ScanControl
from bec_widgets.widgets.editors.web_console.web_console import WebConsole
from bec_widgets.widgets.editors.web_console.web_console import BECShell, WebConsole
from bec_widgets.widgets.plots.heatmap.heatmap import Heatmap
from bec_widgets.widgets.plots.image.image import Image
from bec_widgets.widgets.plots.motor_map.motor_map import MotorMap
@@ -378,7 +378,7 @@ class AdvancedDockArea(DockAreaWidget):
"RingProgressBar",
),
"terminal": (WebConsole.ICON_NAME, "Add Terminal", "WebConsole"),
"bec_shell": (WebConsole.ICON_NAME, "Add BEC Shell", "WebConsole"),
"bec_shell": (BECShell.ICON_NAME, "Add BEC Shell", "BECShell"),
"log_panel": (LogPanel.ICON_NAME, "Add LogPanel - Disabled", "LogPanel"),
"sbb_monitor": ("train", "Add SBB Monitor", "SBBMonitor"),
}
@@ -501,10 +501,7 @@ class AdvancedDockArea(DockAreaWidget):
elif key == "bec_shell":
act.triggered.connect(
lambda _, t=widget_type: self.new(
widget=t,
closable=True,
startup_cmd=f"bec --gui-id {self.bec_dispatcher.cli_server.gui_id}",
show_settings_action=True,
widget=t, closable=True, show_settings_action=False
)
)
else:

View File

@@ -27,7 +27,7 @@ class SignalComboBox(DeviceSignalInputBase, QComboBox):
arg_name: Argument name, can be used for the other widgets which has to call some other function in bec using correct argument names.
"""
USER_ACCESS = ["set_signal", "set_device", "signals"]
USER_ACCESS = ["set_signal", "set_device", "signals", "get_signal_name"]
ICON_NAME = "list_alt"
PLUGIN = True
@@ -148,6 +148,24 @@ class SignalComboBox(DeviceSignalInputBase, QComboBox):
return True
return False
def get_signal_name(self) -> str:
"""
Get the signal name from the combobox.
Returns:
str: The signal name.
"""
signal_name = self.currentText()
index = self.findText(signal_name)
if index == -1:
return signal_name
signal_info = self.itemData(index)
if signal_info:
signal_name = signal_info.get("obj_name", signal_name)
return signal_name if signal_name else ""
@SafeSlot()
def reset_selection(self):
"""Reset the selection of the combobox."""

View File

@@ -0,0 +1 @@
{'files': ['web_console.py']}

View File

@@ -0,0 +1,57 @@
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from qtpy.QtWidgets import QWidget
from bec_widgets.utils.bec_designer import designer_material_icon
from bec_widgets.widgets.editors.web_console.web_console import BECShell
DOM_XML = """
<ui language='c++'>
<widget class='BECShell' name='bec_shell'>
</widget>
</ui>
"""
class BECShellPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
def __init__(self):
super().__init__()
self._form_editor = None
def createWidget(self, parent):
if parent is None:
return QWidget()
t = BECShell(parent)
return t
def domXml(self):
return DOM_XML
def group(self):
return ""
def icon(self):
return designer_material_icon(BECShell.ICON_NAME)
def includeFile(self):
return "bec_shell"
def initialize(self, form_editor):
self._form_editor = form_editor
def isContainer(self):
return False
def isInitialized(self):
return self._form_editor is not None
def name(self):
return "BECShell"
def toolTip(self):
return ""
def whatsThis(self):
return self.toolTip()

View File

@@ -0,0 +1,15 @@
def main(): # pragma: no cover
from qtpy import PYSIDE6
if not PYSIDE6:
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from bec_widgets.widgets.editors.web_console.bec_shell_plugin import BECShellPlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(BECShellPlugin())
if __name__ == "__main__": # pragma: no cover
main()

View File

@@ -1,21 +1,39 @@
from __future__ import annotations
import enum
import json
import secrets
import subprocess
import time
from bec_lib.logger import bec_logger
from louie.saferef import safe_ref
from qtpy.QtCore import QTimer, QUrl, Signal, qInstallMessageHandler
from pydantic import BaseModel
from qtpy.QtCore import Qt, QTimer, QUrl, Signal, qInstallMessageHandler
from qtpy.QtGui import QMouseEvent, QResizeEvent
from qtpy.QtWebEngineWidgets import QWebEnginePage, QWebEngineView
from qtpy.QtWidgets import QApplication, QVBoxLayout, QWidget
from qtpy.QtWidgets import QApplication, QLabel, QTabWidget, QVBoxLayout, QWidget
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.error_popups import SafeProperty
logger = bec_logger.logger
class ConsoleMode(str, enum.Enum):
ACTIVE = "active"
INACTIVE = "inactive"
HIDDEN = "hidden"
class PageOwnerInfo(BaseModel):
owner_gui_id: str | None = None
widget_ids: list[str] = []
page: QWebEnginePage | None = None
initialized: bool = False
model_config = {"arbitrary_types_allowed": True}
class WebConsoleRegistry:
"""
A registry for the WebConsole class to manage its instances.
@@ -29,14 +47,21 @@ class WebConsoleRegistry:
self._server_process = None
self._server_port = None
self._token = secrets.token_hex(16)
self._page_registry: dict[str, PageOwnerInfo] = {}
def register(self, instance: WebConsole):
"""
Register an instance of WebConsole.
Args:
instance (WebConsole): The instance to register.
"""
self._instances[instance.gui_id] = safe_ref(instance)
self.cleanup()
if instance._unique_id:
self._register_page(instance)
if self._server_process is None:
# Start the ttyd server if not already running
self.start_ttyd()
@@ -141,8 +166,158 @@ class WebConsoleRegistry:
if instance.gui_id in self._instances:
del self._instances[instance.gui_id]
if instance._unique_id:
self._unregister_page(instance._unique_id, instance.gui_id)
self.cleanup()
def _register_page(self, instance: WebConsole):
"""
Register a page in the registry. Please note that this does not transfer ownership
for already existing pages; it simply records which widget currently owns the page.
Use transfer_page_ownership to change ownership.
Args:
instance (WebConsole): The instance to register.
"""
unique_id = instance._unique_id
gui_id = instance.gui_id
if unique_id is None:
return
if unique_id not in self._page_registry:
page = BECWebEnginePage()
page.authenticationRequired.connect(instance._authenticate)
self._page_registry[unique_id] = PageOwnerInfo(
owner_gui_id=gui_id, widget_ids=[gui_id], page=page
)
logger.info(f"Registered new page {unique_id} for {gui_id}")
return
if gui_id not in self._page_registry[unique_id].widget_ids:
self._page_registry[unique_id].widget_ids.append(gui_id)
def _unregister_page(self, unique_id: str, gui_id: str):
"""
Unregister a page from the registry.
Args:
unique_id (str): The unique identifier for the page.
gui_id (str): The GUI ID of the widget.
"""
if unique_id not in self._page_registry:
return
page_info = self._page_registry[unique_id]
if gui_id in page_info.widget_ids:
page_info.widget_ids.remove(gui_id)
if page_info.owner_gui_id == gui_id:
page_info.owner_gui_id = None
if not page_info.widget_ids:
if page_info.page:
page_info.page.deleteLater()
del self._page_registry[unique_id]
logger.info(f"Unregistered page {unique_id} for {gui_id}")
def get_page_info(self, unique_id: str) -> PageOwnerInfo | None:
"""
Get a page from the registry.
Args:
unique_id (str): The unique identifier for the page.
Returns:
PageOwnerInfo | None: The page info if found, None otherwise.
"""
if unique_id not in self._page_registry:
return None
return self._page_registry[unique_id]
def take_page_ownership(self, unique_id: str, new_owner_gui_id: str) -> QWebEnginePage | None:
"""
Transfer ownership of a page to a new owner.
Args:
unique_id (str): The unique identifier for the page.
new_owner_gui_id (str): The GUI ID of the new owner.
Returns:
QWebEnginePage | None: The page if ownership transfer was successful, None otherwise.
"""
if unique_id not in self._page_registry:
logger.warning(f"Page {unique_id} not found in registry")
return None
page_info = self._page_registry[unique_id]
old_owner_gui_id = page_info.owner_gui_id
if old_owner_gui_id:
old_owner_ref = self._instances.get(old_owner_gui_id)
if old_owner_ref:
old_owner_instance = old_owner_ref()
if old_owner_instance:
old_owner_instance.yield_ownership()
page_info.owner_gui_id = new_owner_gui_id
logger.info(f"Transferred ownership of page {unique_id} to {new_owner_gui_id}")
return page_info.page
def yield_ownership(self, gui_id: str) -> bool:
"""
Yield ownership of a page without destroying it. The page remains in the
registry with no owner, available for another widget to claim.
Args:
gui_id (str): The GUI ID of the widget yielding ownership.
Returns:
bool: True if ownership was yielded, False otherwise.
"""
if gui_id not in self._instances:
return False
instance = self._instances[gui_id]()
if instance is None:
return False
unique_id = instance._unique_id
if unique_id is None:
return False
if unique_id not in self._page_registry:
return False
page_owner_info = self._page_registry[unique_id]
if page_owner_info.owner_gui_id != gui_id:
return False
page_owner_info.owner_gui_id = None
return True
def owner_is_visible(self, unique_id: str) -> bool:
"""
Check if the owner of a page is currently visible.
Args:
unique_id (str): The unique identifier for the page.
Returns:
bool: True if the owner is visible, False otherwise.
"""
page_info = self.get_page_info(unique_id)
if page_info is None or page_info.owner_gui_id is None:
return False
owner_ref = self._instances.get(page_info.owner_gui_id)
if owner_ref is None:
return False
owner_instance = owner_ref()
if owner_instance is None:
return False
return owner_instance.isVisible()
_web_console_registry = WebConsoleRegistry()
@@ -178,34 +353,103 @@ class WebConsole(BECWidget, QWidget):
config=None,
client=None,
gui_id=None,
startup_cmd: str | None = "bec --nogui",
startup_cmd: str | None = None,
is_bec_shell: bool = False,
unique_id: str | None = None,
**kwargs,
):
super().__init__(parent=parent, client=client, gui_id=gui_id, config=config, **kwargs)
self._mode = ConsoleMode.INACTIVE
self._is_bec_shell = is_bec_shell
self._startup_cmd = startup_cmd
self._is_initialized = False
_web_console_registry.register(self)
self._token = _web_console_registry._token
layout = QVBoxLayout()
layout.setContentsMargins(0, 0, 0, 0)
self.browser = QWebEngineView(self)
self.page = BECWebEnginePage(self)
self.page.authenticationRequired.connect(self._authenticate)
self.browser.setPage(self.page)
layout.addWidget(self.browser)
self.setLayout(layout)
self.page.setUrl(QUrl(f"http://localhost:{_web_console_registry._server_port}"))
self._unique_id = unique_id
self.page = None # Will be set in _set_up_page
self._startup_timer = QTimer()
self._startup_timer.setInterval(500)
self._startup_timer.timeout.connect(self._check_page_ready)
self._startup_timer.start()
self._js_callback.connect(self._on_js_callback)
self._set_up_page()
def _set_up_page(self):
"""
Set up the web page and UI elements.
"""
layout = QVBoxLayout()
layout.setContentsMargins(0, 0, 0, 0)
self.browser = QWebEngineView(self)
layout.addWidget(self.browser)
self.setLayout(layout)
# prepare overlay
self.overlay = QWidget(self)
layout = QVBoxLayout(self.overlay)
layout.setAlignment(Qt.AlignmentFlag.AlignCenter)
label = QLabel("Click to activate terminal", self.overlay)
layout.addWidget(label)
self.overlay.hide()
_web_console_registry.register(self)
self._token = _web_console_registry._token
# If no unique_id is provided, create a new page
if not self._unique_id:
self.page = BECWebEnginePage(self)
self.page.authenticationRequired.connect(self._authenticate)
self.page.setUrl(QUrl(f"http://localhost:{_web_console_registry._server_port}"))
self.browser.setPage(self.page)
self._set_mode(ConsoleMode.ACTIVE)
return
# Try to get the page from the registry
page_info = _web_console_registry.get_page_info(self._unique_id)
if page_info and page_info.page:
self.page = page_info.page
if not page_info.owner_gui_id or page_info.owner_gui_id == self.gui_id:
self.browser.setPage(self.page)
# Only set URL if this is a newly created page (no URL set yet)
if self.page.url().isEmpty():
self.page.setUrl(QUrl(f"http://localhost:{_web_console_registry._server_port}"))
else:
# We have an existing page, so we don't need the startup timer
self._startup_timer.stop()
if page_info.owner_gui_id != self.gui_id:
self._set_mode(ConsoleMode.INACTIVE)
else:
self._set_mode(ConsoleMode.ACTIVE)
def _set_mode(self, mode: ConsoleMode):
"""
Set the mode of the web console.
Args:
mode (ConsoleMode): The mode to set.
"""
if not self._unique_id:
# For non-unique_id consoles, always active
mode = ConsoleMode.ACTIVE
self._mode = mode
match mode:
case ConsoleMode.ACTIVE:
self.browser.setVisible(True)
self.overlay.hide()
case ConsoleMode.INACTIVE:
self.browser.setVisible(False)
self.overlay.show()
case ConsoleMode.HIDDEN:
self.browser.setVisible(False)
self.overlay.hide()
def _check_page_ready(self):
"""
Check if the page is ready and stop the timer if it is.
"""
if self.page.isLoading():
if not self.page or self.page.isLoading():
return
self.page.runJavaScript("window.term !== undefined", self._js_callback.emit)
@@ -218,15 +462,27 @@ class WebConsole(BECWidget, QWidget):
return
self._is_initialized = True
self._startup_timer.stop()
if self._startup_cmd:
self.write(self._startup_cmd)
if self.startup_cmd:
if self._unique_id:
page_info = _web_console_registry.get_page_info(self._unique_id)
if page_info is None:
return
if not page_info.initialized:
page_info.initialized = True
self.write(self.startup_cmd)
else:
self.write(self.startup_cmd)
self.initialized.emit()
@SafeProperty(str)
@property
def startup_cmd(self):
"""
Get the startup command for the web console.
"""
if self._is_bec_shell:
if self.bec_dispatcher.cli_server is None:
return "bec --nogui"
return f"bec --gui-id {self.bec_dispatcher.cli_server.gui_id}"
return self._startup_cmd
@startup_cmd.setter
@@ -241,11 +497,123 @@ class WebConsole(BECWidget, QWidget):
def write(self, data: str, send_return: bool = True):
"""
Send data to the web page
Args:
data (str): The data to send.
send_return (bool): Whether to send a return after the data.
"""
self.page.runJavaScript(f"window.term.paste('{data}');")
cmd = f"window.term.paste({json.dumps(data)});"
if self.page is None:
logger.warning("Cannot write to web console: page is not initialized.")
return
self.page.runJavaScript(cmd)
if send_return:
self.send_return()
def take_page_ownership(self, unique_id: str | None = None):
"""
Take ownership of a web page from the registry. This will transfer the page
from its current owner (if any) to this widget.
Args:
unique_id (str): The unique identifier of the page to take ownership of.
If None, uses this widget's unique_id.
"""
if unique_id is None:
unique_id = self._unique_id
if not unique_id:
logger.warning("Cannot take page ownership without a unique_id")
return
# Get the page from registry
page = _web_console_registry.take_page_ownership(unique_id, self.gui_id)
if not page:
logger.warning(f"Page {unique_id} not found in registry")
return
self.page = page
self.browser.setPage(page)
self._set_mode(ConsoleMode.ACTIVE)
logger.info(f"Widget {self.gui_id} took ownership of page {unique_id}")
def _on_ownership_lost(self):
"""
Called when this widget loses ownership of its page.
Displays the overlay and hides the browser.
"""
self._set_mode(ConsoleMode.INACTIVE)
logger.info(f"Widget {self.gui_id} lost ownership of page {self._unique_id}")
def yield_ownership(self):
"""
Yield ownership of the page. The page remains in the registry with no owner,
available for another widget to claim. This is automatically called when the
widget becomes hidden.
"""
if not self._unique_id:
return
success = _web_console_registry.yield_ownership(self.gui_id)
if success:
self._on_ownership_lost()
logger.info(f"Widget {self.gui_id} yielded ownership of page {self._unique_id}")
def has_ownership(self) -> bool:
"""
Check if this widget currently has ownership of a page.
Returns:
bool: True if this widget owns a page, False otherwise.
"""
if not self._unique_id:
return False
page_info = _web_console_registry.get_page_info(self._unique_id)
if page_info is None:
return False
return page_info.owner_gui_id == self.gui_id
def hideEvent(self, event):
"""
Called when the widget is hidden. Automatically yields ownership.
"""
if self.has_ownership():
self.yield_ownership()
self._set_mode(ConsoleMode.HIDDEN)
super().hideEvent(event)
def showEvent(self, event):
"""
Called when the widget is shown. Updates UI state based on ownership.
"""
super().showEvent(event)
if self._unique_id and not self.has_ownership():
# Take ownership if the page does not have an owner or
# the owner is not visible
page_info = _web_console_registry.get_page_info(self._unique_id)
if page_info is None:
self._set_mode(ConsoleMode.INACTIVE)
return
if page_info.owner_gui_id is None or not _web_console_registry.owner_is_visible(
self._unique_id
):
self.take_page_ownership(self._unique_id)
return
if page_info.owner_gui_id != self.gui_id:
self._set_mode(ConsoleMode.INACTIVE)
return
def resizeEvent(self, event: QResizeEvent) -> None:
super().resizeEvent(event)
self.overlay.resize(event.size())
def mousePressEvent(self, event: QMouseEvent) -> None:
if event.button() == Qt.MouseButton.LeftButton and not self.has_ownership():
self.take_page_ownership(self._unique_id)
event.accept()
return
return super().mousePressEvent(event)
def _authenticate(self, _, auth):
"""
Authenticate the request with the provided username and password.
@@ -286,10 +654,52 @@ class WebConsole(BECWidget, QWidget):
super().cleanup()
class BECShell(WebConsole):
"""
A WebConsole pre-configured to run the BEC shell.
We cannot simply expose the web console properties to Qt as we need to have a deterministic
startup behavior for sharing the same shell instance across multiple widgets.
"""
ICON_NAME = "hub"
def __init__(self, parent=None, config=None, client=None, gui_id=None, **kwargs):
super().__init__(
parent=parent,
config=config,
client=client,
gui_id=gui_id,
is_bec_shell=True,
unique_id="bec_shell",
**kwargs,
)
if __name__ == "__main__": # pragma: no cover
import sys
app = QApplication(sys.argv)
widget = WebConsole()
widget = QTabWidget()
# Create two consoles with different unique_ids
web_console1 = WebConsole(startup_cmd="bec --nogui", unique_id="console1")
web_console2 = WebConsole(startup_cmd="htop")
web_console3 = WebConsole(startup_cmd="bec --nogui", unique_id="console1")
widget.addTab(web_console1, "Console 1")
widget.addTab(web_console2, "Console 2")
widget.addTab(web_console3, "Console 3 -- mirror of Console 1")
widget.show()
# Demonstrate page sharing:
# After initialization, web_console2 can take ownership of console1's page:
# web_console2.take_page_ownership("console1")
widget.resize(800, 600)
def _close_cons1():
web_console2.close()
web_console2.deleteLater()
# QTimer.singleShot(3000, _close_cons1)
sys.exit(app.exec_())

View File

@@ -203,6 +203,19 @@ class Heatmap(ImageBase):
"remove_roi",
"rois",
"plot",
# Device properties
"x_device_name",
"x_device_name.setter",
"x_device_entry",
"x_device_entry.setter",
"y_device_name",
"y_device_name.setter",
"y_device_entry",
"y_device_entry.setter",
"z_device_name",
"z_device_name.setter",
"z_device_entry",
"z_device_entry.setter",
]
PLUGIN = True
@@ -413,9 +426,15 @@ class Heatmap(ImageBase):
"""
if self._image_config is None:
return
x_name = self._image_config.x_device.name
y_name = self._image_config.y_device.name
z_name = self._image_config.z_device.name
# Safely get device names (might be None if not yet configured)
x_device = self._image_config.x_device
y_device = self._image_config.y_device
z_device = self._image_config.z_device
x_name = x_device.name if x_device else None
y_name = y_device.name if y_device else None
z_name = z_device.name if z_device else None
if x_name is not None:
self.x_label = x_name # type: ignore
@@ -1136,6 +1155,244 @@ class Heatmap(ImageBase):
self.crosshair.reset()
super().reset()
################################################################################
# Widget Specific Properties
################################################################################
@SafeProperty(str)
def x_device_name(self) -> str:
"""Device name for the X axis."""
if self._image_config.x_device is None:
return ""
return self._image_config.x_device.name or ""
@x_device_name.setter
def x_device_name(self, device_name: str) -> None:
"""
Set the X device name.
Args:
device_name(str): Device name for the X axis
"""
device_name = device_name or ""
# Get current entry or validate
if device_name:
try:
entry = self.entry_validator.validate_signal(device_name, None)
self._image_config.x_device = HeatmapDeviceSignal(name=device_name, entry=entry)
self.property_changed.emit("x_device_name", device_name)
self.update_labels() # Update axis labels
self._try_auto_plot()
except Exception:
pass # Silently fail if device is not available yet
else:
self._image_config.x_device = None
self.property_changed.emit("x_device_name", "")
self.update_labels() # Clear axis labels
@SafeProperty(str)
def x_device_entry(self) -> str:
"""Signal entry for the X axis device."""
if self._image_config.x_device is None:
return ""
return self._image_config.x_device.entry or ""
@x_device_entry.setter
def x_device_entry(self, entry: str) -> None:
"""
Set the X device entry.
Args:
entry(str): Signal entry for the X axis device
"""
if not entry:
return
if self._image_config.x_device is None:
logger.warning("Cannot set x_device_entry without x_device_name set first.")
return
device_name = self._image_config.x_device.name
try:
# Validate the entry for this device
validated_entry = self.entry_validator.validate_signal(device_name, entry)
self._image_config.x_device = HeatmapDeviceSignal(
name=device_name, entry=validated_entry
)
self.property_changed.emit("x_device_entry", validated_entry)
self.update_labels() # Update axis labels
self._try_auto_plot()
except Exception:
pass # Silently fail if validation fails
@SafeProperty(str)
def y_device_name(self) -> str:
"""Device name for the Y axis."""
if self._image_config.y_device is None:
return ""
return self._image_config.y_device.name or ""
@y_device_name.setter
def y_device_name(self, device_name: str) -> None:
"""
Set the Y device name.
Args:
device_name(str): Device name for the Y axis
"""
device_name = device_name or ""
# Get current entry or validate
if device_name:
try:
entry = self.entry_validator.validate_signal(device_name, None)
self._image_config.y_device = HeatmapDeviceSignal(name=device_name, entry=entry)
self.property_changed.emit("y_device_name", device_name)
self.update_labels() # Update axis labels
self._try_auto_plot()
except Exception:
pass # Silently fail if device is not available yet
else:
self._image_config.y_device = None
self.property_changed.emit("y_device_name", "")
self.update_labels() # Clear axis labels
@SafeProperty(str)
def y_device_entry(self) -> str:
"""Signal entry for the Y axis device."""
if self._image_config.y_device is None:
return ""
return self._image_config.y_device.entry or ""
@y_device_entry.setter
def y_device_entry(self, entry: str) -> None:
"""
Set the Y device entry.
Args:
entry(str): Signal entry for the Y axis device
"""
if not entry:
return
if self._image_config.y_device is None:
logger.warning("Cannot set y_device_entry without y_device_name set first.")
return
device_name = self._image_config.y_device.name
try:
# Validate the entry for this device
validated_entry = self.entry_validator.validate_signal(device_name, entry)
self._image_config.y_device = HeatmapDeviceSignal(
name=device_name, entry=validated_entry
)
self.property_changed.emit("y_device_entry", validated_entry)
self.update_labels() # Update axis labels
self._try_auto_plot()
except Exception as e:
logger.debug(f"Y device entry validation failed: {e}")
pass # Silently fail if validation fails
@SafeProperty(str)
def z_device_name(self) -> str:
"""Device name for the Z (color) axis."""
if self._image_config.z_device is None:
return ""
return self._image_config.z_device.name or ""
@z_device_name.setter
def z_device_name(self, device_name: str) -> None:
"""
Set the Z device name.
Args:
device_name(str): Device name for the Z axis
"""
device_name = device_name or ""
# Get current entry or validate
if device_name:
try:
entry = self.entry_validator.validate_signal(device_name, None)
self._image_config.z_device = HeatmapDeviceSignal(name=device_name, entry=entry)
self.property_changed.emit("z_device_name", device_name)
self.update_labels() # Update axis labels (title)
self._try_auto_plot()
except Exception as e:
logger.debug(f"Z device name validation failed: {e}")
pass # Silently fail if device is not available yet
else:
self._image_config.z_device = None
self.property_changed.emit("z_device_name", "")
self.update_labels() # Clear axis labels
@SafeProperty(str)
def z_device_entry(self) -> str:
"""Signal entry for the Z (color) axis device."""
if self._image_config.z_device is None:
return ""
return self._image_config.z_device.entry or ""
@z_device_entry.setter
def z_device_entry(self, entry: str) -> None:
"""
Set the Z device entry.
Args:
entry(str): Signal entry for the Z axis device
"""
if not entry:
return
if self._image_config.z_device is None:
logger.warning("Cannot set z_device_entry without z_device_name set first.")
return
device_name = self._image_config.z_device.name
try:
# Validate the entry for this device
validated_entry = self.entry_validator.validate_signal(device_name, entry)
self._image_config.z_device = HeatmapDeviceSignal(
name=device_name, entry=validated_entry
)
self.property_changed.emit("z_device_entry", validated_entry)
self.update_labels() # Update axis labels (title)
self._try_auto_plot()
except Exception as e:
logger.debug(f"Z device entry validation failed: {e}")
pass # Silently fail if validation fails
def _try_auto_plot(self) -> None:
"""
Attempt to automatically call plot() if all three devices are set.
Similar to waveform's approach but requires all three devices.
"""
has_x = self._image_config.x_device is not None
has_y = self._image_config.y_device is not None
has_z = self._image_config.z_device is not None
if has_x and has_y and has_z:
x_name = self._image_config.x_device.name
x_entry = self._image_config.x_device.entry
y_name = self._image_config.y_device.name
y_entry = self._image_config.y_device.entry
z_name = self._image_config.z_device.name
z_entry = self._image_config.z_device.entry
try:
self.plot(
x_name=x_name,
y_name=y_name,
z_name=z_name,
x_entry=x_entry,
y_entry=y_entry,
z_entry=z_entry,
validate_bec=False, # Don't validate - entries already validated
)
except Exception as e:
logger.debug(f"Auto-plot failed: {e}")
pass # Silently fail if plot cannot be called yet
@SafeProperty(str)
def interpolation_method(self) -> str:
"""

View File

@@ -1,7 +1,6 @@
from __future__ import annotations
import os
from typing import TYPE_CHECKING
from qtpy.QtWidgets import QFrame, QScrollArea, QVBoxLayout
@@ -9,11 +8,6 @@ from bec_widgets.utils import UILoader
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.settings_dialog import SettingWidget
if TYPE_CHECKING:
from bec_widgets.widgets.control.device_input.signal_combobox.signal_combobox import (
SignalComboBox,
)
class HeatmapSettings(SettingWidget):
def __init__(self, parent=None, target_widget=None, popup=False, *args, **kwargs):
@@ -120,36 +114,17 @@ class HeatmapSettings(SettingWidget):
getattr(self.target_widget._image_config, "enforce_interpolation", False)
)
def _get_signal_name(self, signal: SignalComboBox) -> str:
"""
Get the signal name from the signal combobox.
Args:
signal (SignalComboBox): The signal combobox to get the name from.
Returns:
str: The signal name.
"""
device_entry = signal.currentText()
index = signal.findText(device_entry)
if index == -1:
return device_entry
device_entry_info = signal.itemData(index)
if device_entry_info:
device_entry = device_entry_info.get("obj_name", device_entry)
return device_entry if device_entry else ""
@SafeSlot()
def accept_changes(self):
"""
Apply all properties from the settings widget to the target widget.
"""
x_name = self.ui.x_name.currentText()
x_entry = self._get_signal_name(self.ui.x_entry)
x_entry = self.ui.x_entry.get_signal_name()
y_name = self.ui.y_name.currentText()
y_entry = self._get_signal_name(self.ui.y_entry)
y_entry = self.ui.y_entry.get_signal_name()
z_name = self.ui.z_name.currentText()
z_entry = self._get_signal_name(self.ui.z_entry)
z_entry = self.ui.z_entry.get_signal_name()
validate_bec = self.ui.validate_bec.checked
color_map = self.ui.color_map.colormap
interpolation = self.ui.interpolation.currentText()

View File

@@ -7,8 +7,8 @@ import numpy as np
from bec_lib import bec_logger
from bec_lib.endpoints import MessageEndpoints
from pydantic import BaseModel, Field, field_validator
from qtpy.QtCore import Qt, QTimer
from qtpy.QtWidgets import QComboBox, QStyledItemDelegate, QWidget
from qtpy.QtCore import QTimer
from qtpy.QtWidgets import QComboBox, QWidget
from bec_widgets.utils import ConnectionConfig
from bec_widgets.utils.colors import Colors
@@ -49,6 +49,9 @@ class ImageLayerConfig(BaseModel):
source: Literal["device_monitor_1d", "device_monitor_2d", "auto"] = Field(
"auto", description="The source of the image data."
)
async_signal_name: str | None = Field(
None, description="Async signal name (obj_name) used for async endpoints."
)
class Image(ImageBase):
@@ -95,6 +98,7 @@ class Image(ImageBase):
"remove_roi",
"rois",
]
SUPPORTED_SIGNALS = ["AsyncSignal", "AsyncMultiSignal", "DynamicSignal"]
def __init__(
self,
@@ -116,7 +120,14 @@ class Image(ImageBase):
)
self._init_toolbar_image()
self.layer_removed.connect(self._on_layer_removed)
self.old_scan_id = None
self.scan_id = None
self.async_update = False
self.bec_dispatcher.connect_slot(self.on_scan_status, MessageEndpoints.scan_status())
self.bec_dispatcher.connect_slot(self.on_scan_progress, MessageEndpoints.scan_progress())
self.bec_dispatcher.connect_slot(
self._populate_signals, MessageEndpoints.device_config_update()
)
##################################
### Toolbar Initialization
@@ -181,20 +192,35 @@ class Image(ImageBase):
Adjust the size of the device combo box and populate it with preview signals.
Has to be done with QTimer.singleShot to ensure the UI is fully initialized, needed for testing.
"""
self._populate_preview_signals()
self._reverse_device_items()
self.device_combo_box.setCurrentText("") # set again default to empty string
self._populate_signals()
def _populate_preview_signals(self) -> None:
@SafeSlot(dict, dict)
def _populate_signals(self, data: dict | None = None, meta: dict | None = None) -> None:
"""
Populate the device combo box with preview-signal devices in the
format '<device>_<signal>' and store the tuple(device, signal) in
the item's userData for later use.
(Re)populate the device combo box with preview/async signals,
matching the initial setup logic.
"""
self.device_combo_box.blockSignals(True)
self.device_combo_box.clear()
# Rebuild base device list via the combobox' own filtering logic
self.device_combo_box.update_devices_from_filters()
base_count = self.device_combo_box.count()
# Place an empty default entry between base devices and signal entries
self.device_combo_box.insertItem(base_count, "", None)
preview_signals = self.client.device_manager.get_bec_signals("PreviewSignal")
for device, signal, signal_config in preview_signals:
label = signal_config.get("obj_name", f"{device}_{signal}")
async_signals = self.client.device_manager.get_bec_signals(self.SUPPORTED_SIGNALS)
all_signals = preview_signals + async_signals
for device, signal, signal_config in all_signals:
describe = signal_config.get("describe") or {}
signal_info = describe.get("signal_info") or {}
ndim = signal_info.get("ndim", 0)
if ndim == 0:
continue
label = signal_config.get("storage_name", f"{device}_{signal}")
self.device_combo_box.addItem(label, (device, signal, signal_config))
self.device_combo_box.setCurrentText("")
self.device_combo_box.blockSignals(False)
def _reverse_device_items(self) -> None:
"""
@@ -422,51 +448,93 @@ class Image(ImageBase):
"""
# TODO consider moving connecting and disconnecting logic to Image itself if multiple images
self.async_update = False
config = self.subscriptions["main"]
needs_async_setup = False
config.async_signal_name = None
if isinstance(monitor, (list, tuple)):
device = self.dev[monitor[0]]
signal = monitor[1]
try:
device = self.dev[monitor[0]]
except KeyError:
logger.warning(f"Device '{monitor[0]}' not found; cannot connect monitor.")
return
# signal = monitor[1]
signal = self._check_async_signal_found(monitor[0], monitor[1])
if len(monitor) == 3:
signal_config = monitor[2]
else:
signal_config = device._info["signals"][signal]
try:
signal_config = device._info["signals"][signal]
except KeyError:
logger.warning(f"Signal '{signal}' not found on device '{device.name}'.")
return
signal_class = signal_config.get("signal_class", None)
if signal_class != "PreviewSignal":
logger.warning(f"Signal '{monitor}' is not a PreviewSignal.")
allowed_signal_classes = ["PreviewSignal"] + self.SUPPORTED_SIGNALS
if signal_class not in allowed_signal_classes:
logger.warning(
f"Signal `{monitor}` is not a PreviewSignal or a supported async signal."
)
return
ndim = signal_config.get("describe", None).get("signal_info", None).get("ndim", None)
describe = signal_config.get("describe") or {}
signal_info = describe.get("signal_info") or {}
ndim = signal_info.get("ndim", None)
if ndim is None:
logger.warning(
f"Signal '{monitor}' does not have a valid 'ndim' in its signal_info."
)
return
if ndim == 1:
self.bec_dispatcher.connect_slot(
self.on_image_update_1d, MessageEndpoints.device_preview(device.name, signal)
)
self.subscriptions["main"].source = "device_monitor_1d"
self.subscriptions["main"].monitor_type = "1d"
config.source = "device_monitor_1d"
config.monitor_type = "1d"
if signal_class == "PreviewSignal":
self.bec_dispatcher.connect_slot(
self.on_image_update_1d,
MessageEndpoints.device_preview(device.name, signal),
)
elif signal_class in self.SUPPORTED_SIGNALS:
self.async_update = True
needs_async_setup = True
config.async_signal_name = signal_config.get(
"obj_name", f"{device.name}_{signal}"
)
else:
logger.warning(f"Unsupported signal class '{signal_class}' for 1D monitor.")
return
elif ndim == 2:
self.bec_dispatcher.connect_slot(
self.on_image_update_2d, MessageEndpoints.device_preview(device.name, signal)
)
self.subscriptions["main"].source = "device_monitor_2d"
self.subscriptions["main"].monitor_type = "2d"
config.source = "device_monitor_2d"
config.monitor_type = "2d"
if signal_class == "PreviewSignal":
self.bec_dispatcher.connect_slot(
self.on_image_update_2d,
MessageEndpoints.device_preview(device.name, signal),
)
elif signal_class in self.SUPPORTED_SIGNALS:
self.async_update = True
needs_async_setup = True
config.async_signal_name = signal_config.get(
"obj_name", f"{device.name}_{signal}"
)
else:
logger.warning(f"Unsupported signal class '{signal_class}' for 2D monitor.")
return
else:
logger.warning(f"Unsupported ndim '{ndim}' for monitor '{monitor}'.")
return
else: # FIXME old monitor 1d/2d endpoint handling, present for backwards compatibility, will be removed in future versions
if type == "1d":
self.bec_dispatcher.connect_slot(
self.on_image_update_1d, MessageEndpoints.device_monitor_1d(monitor)
)
self.subscriptions["main"].source = "device_monitor_1d"
self.subscriptions["main"].monitor_type = "1d"
config.source = "device_monitor_1d"
config.monitor_type = "1d"
elif type == "2d":
self.bec_dispatcher.connect_slot(
self.on_image_update_2d, MessageEndpoints.device_monitor_2d(monitor)
)
self.subscriptions["main"].source = "device_monitor_2d"
self.subscriptions["main"].monitor_type = "2d"
config.source = "device_monitor_2d"
config.monitor_type = "2d"
elif type == "auto":
self.bec_dispatcher.connect_slot(
self.on_image_update_1d, MessageEndpoints.device_monitor_1d(monitor)
@@ -474,14 +542,141 @@ class Image(ImageBase):
self.bec_dispatcher.connect_slot(
self.on_image_update_2d, MessageEndpoints.device_monitor_2d(monitor)
)
self.subscriptions["main"].source = "auto"
config.source = "auto"
logger.warning(
f"Updates for '{monitor}' will be fetch from both 1D and 2D monitor endpoints."
)
self.subscriptions["main"].monitor_type = "auto"
config.monitor_type = "auto"
config.monitor = monitor
if needs_async_setup:
self._setup_async_image(self.scan_id)
logger.info(f"Connected to {monitor} with type {type}")
self.subscriptions["main"].monitor = monitor
@SafeSlot(dict, dict)
def on_scan_status(self, msg: dict, meta: dict):
"""
Initial scan status message handler, which is triggered at the begging and end of scan.
Needed for setup of AsyncSignal connections.
Args:
msg(dict): The message content.
meta(dict): The message metadata.
"""
current_scan_id = msg.get("scan_id", None)
if current_scan_id is None:
return
self._handle_scan_change(current_scan_id)
@SafeSlot(dict, dict)
def on_scan_progress(self, msg: dict, meta: dict):
"""
For setting async image readback during scan progress updates if widget is started later than scan.
Args:
msg(dict): The message content.
meta(dict): The message metadata.
"""
current_scan_id = meta.get("scan_id", None)
if current_scan_id is None:
return
self._handle_scan_change(current_scan_id)
def _handle_scan_change(self, current_scan_id: str):
"""
Update internal scan ids and refresh async connections if needed.
Args:
current_scan_id (str): The current scan identifier.
"""
if current_scan_id == self.scan_id:
return
self.old_scan_id = self.scan_id
self.scan_id = current_scan_id
if self.async_update:
self._setup_async_image(scan_id=self.scan_id)
def _get_async_signal_name(self) -> tuple[str, str] | None:
"""
Returns device name and async signal name used for endpoints/messages.
Returns:
tuple[str, str] | None: (device_name, async_signal_name) or None if not available.
"""
config = self.subscriptions["main"]
monitor = config.monitor
if monitor is None or not isinstance(monitor, (list, tuple)) or len(monitor) < 2:
return None
device_name = monitor[0]
async_signal = self._check_async_signal_found(
name=device_name, signal=config.async_signal_name or monitor[1]
)
return device_name, async_signal
def _check_async_signal_found(self, name: str, signal: str) -> str:
"""
Check if the async signal is found in the BEC device manager.
Args:
name(str): The name of the async signal.
signal(str): The entry of the async signal.
Returns:
tuple[bool, str]: A tuple where the first element is True if the async signal is found (False otherwise),
and the second element is the signal name (either the original signal or the storage_name for AsyncMultiSignal).
"""
bec_async_signals = self.client.device_manager.get_bec_signals(self.SUPPORTED_SIGNALS)
for entry_name, _, entry_data in bec_async_signals:
if entry_name == name and entry_data.get("obj_name") == signal:
return entry_data.get("storage_name")
return signal
def _setup_async_image(self, scan_id: str | None):
"""
(Re)connect async image readback for the current scan.
Args:
scan_id (str | None): The scan identifier to subscribe to.
"""
if not self.async_update:
return
config = self.subscriptions["main"]
async_names = self._get_async_signal_name()
if async_names is None:
logger.info("Async image setup skipped because monitor information is incomplete.")
return
device_name, async_signal = async_names
if config.monitor_type == "1d":
slot = self.on_image_update_1d
elif config.monitor_type == "2d":
slot = self.on_image_update_2d
else:
logger.warning(
f"Async image setup skipped due to unsupported monitor type '{config.monitor_type}'."
)
return
# Disconnect any previous scan subscriptions to avoid stale updates.
for prev_scan_id in (self.old_scan_id, self.scan_id):
if prev_scan_id is None:
continue
self.bec_dispatcher.disconnect_slot(
slot, MessageEndpoints.device_async_signal(prev_scan_id, device_name, async_signal)
)
if scan_id is None:
logger.info("Scan ID not available yet; delaying async image subscription.")
return
self.bec_dispatcher.connect_slot(
slot,
MessageEndpoints.device_async_signal(scan_id, device_name, async_signal),
from_start=True,
cb_info={"scan_id": scan_id},
)
logger.info(f"Setup async image for {device_name}.{async_signal} and scan {scan_id}.")
def disconnect_monitor(self, monitor: str | tuple):
"""
@@ -490,20 +685,47 @@ class Image(ImageBase):
Args:
monitor(str|tuple): The name of the monitor to disconnect, or a tuple of (device, signal) for preview signals.
"""
config = self.subscriptions["main"]
if isinstance(monitor, (list, tuple)):
if self.subscriptions["main"].source == "device_monitor_1d":
self.bec_dispatcher.disconnect_slot(
self.on_image_update_1d, MessageEndpoints.device_preview(monitor[0], monitor[1])
)
elif self.subscriptions["main"].source == "device_monitor_2d":
self.bec_dispatcher.disconnect_slot(
self.on_image_update_2d, MessageEndpoints.device_preview(monitor[0], monitor[1])
)
if self.async_update:
async_names = self._get_async_signal_name()
ids_to_check = [self.scan_id, self.old_scan_id]
if config.source == "device_monitor_1d":
for scan_id in ids_to_check:
if scan_id is None or async_names is None:
continue
self.bec_dispatcher.disconnect_slot(
self.on_image_update_1d,
MessageEndpoints.device_async_signal(
scan_id, async_names[0], async_names[1]
),
)
elif config.source == "device_monitor_2d":
for scan_id in ids_to_check:
if scan_id is None or async_names is None:
continue
self.bec_dispatcher.disconnect_slot(
self.on_image_update_2d,
MessageEndpoints.device_async_signal(
scan_id, async_names[0], async_names[1]
),
)
else:
logger.warning(
f"Cannot disconnect monitor {monitor} with source {self.subscriptions['main'].source}"
)
return
if config.source == "device_monitor_1d":
self.bec_dispatcher.disconnect_slot(
self.on_image_update_1d,
MessageEndpoints.device_preview(monitor[0], monitor[1]),
)
elif config.source == "device_monitor_2d":
self.bec_dispatcher.disconnect_slot(
self.on_image_update_2d,
MessageEndpoints.device_preview(monitor[0], monitor[1]),
)
else:
logger.warning(
f"Cannot disconnect monitor {monitor} with source {self.subscriptions['main'].source}"
)
return
else: # FIXME old monitor 1d/2d endpoint handling, present for backwards compatibility, will be removed in future versions
self.bec_dispatcher.disconnect_slot(
self.on_image_update_1d, MessageEndpoints.device_monitor_1d(monitor)
@@ -512,6 +734,8 @@ class Image(ImageBase):
self.on_image_update_2d, MessageEndpoints.device_monitor_2d(monitor)
)
self.subscriptions["main"].monitor = None
self.subscriptions["main"].async_signal_name = None
self.async_update = False
self._sync_device_selection()
########################################
@@ -526,7 +750,7 @@ class Image(ImageBase):
msg(dict): The message containing the data.
metadata(dict): The metadata associated with the message.
"""
data = msg["data"]
data = self._get_payload_data(msg)
current_scan_id = metadata.get("scan_id", None)
if current_scan_id is None:
@@ -538,6 +762,9 @@ class Image(ImageBase):
self.main_image.max_len = 0
if self.crosshair is not None:
self.crosshair.reset()
if data is None:
logger.warning("No data received for image update.")
return
image_buffer = self.adjust_image_buffer(self.main_image, data)
if self._color_bar is not None:
self._color_bar.blockSignals(True)
@@ -590,7 +817,10 @@ class Image(ImageBase):
msg(dict): The message containing the data.
metadata(dict): The metadata associated with the message.
"""
data = msg["data"]
data = self._get_payload_data(msg)
if data is None:
logger.warning("No data received for image update.")
return
if self._color_bar is not None:
self._color_bar.blockSignals(True)
self.main_image.set_data(data)
@@ -598,6 +828,22 @@ class Image(ImageBase):
self._color_bar.blockSignals(False)
self.image_updated.emit()
def _get_payload_data(self, msg: dict) -> np.ndarray | None:
"""
Extract payload from async/preview/monitor1D/2D message structures due to inconsistent formats in backend.
Args:
msg (dict): The incoming message containing data.
"""
if not self.async_update:
return msg.get("data")
async_names = self._get_async_signal_name()
if async_names is None:
logger.warning("Async payload extraction failed; monitor info incomplete.")
return None
_, async_signal = async_names
return msg.get("signals", {}).get(async_signal, {}).get("value", None)
################################################################################
# Clean up
################################################################################
@@ -634,6 +880,8 @@ class Image(ImageBase):
self.device_combo_box.deleteLater()
self.dim_combo_box.close()
self.dim_combo_box.deleteLater()
self.bec_dispatcher.disconnect_slot(self.on_scan_status, MessageEndpoints.scan_status())
self.bec_dispatcher.disconnect_slot(self.on_scan_progress, MessageEndpoints.scan_progress())
super().cleanup()

View File

@@ -1,7 +1,5 @@
from __future__ import annotations
import json
import pyqtgraph as pg
from bec_lib import bec_logger
from bec_lib.endpoints import MessageEndpoints
@@ -45,12 +43,24 @@ class ScatterWaveform(PlotBase):
USER_ACCESS = [
*PlotBase.USER_ACCESS,
# Scatter Waveform Specific RPC Access
"main_curve",
"color_map",
"color_map.setter",
"plot",
"update_with_scan_history",
"clear_all",
# Device properties
"x_device_name",
"x_device_name.setter",
"x_device_entry",
"x_device_entry.setter",
"y_device_name",
"y_device_name.setter",
"y_device_entry",
"y_device_entry.setter",
"z_device_name",
"z_device_name.setter",
"z_device_entry",
"z_device_entry.setter",
]
sync_signal_update = Signal()
@@ -93,6 +103,13 @@ class ScatterWaveform(PlotBase):
)
self._init_scatter_curve_settings()
# Show toolbar bundles - only include scatter_waveform_settings if not in SIDE mode
shown_bundles = ["plot_export", "mouse_interaction", "roi", "axis_popup"]
if self.ui_mode != UIMode.SIDE:
shown_bundles.insert(0, "scatter_waveform_settings")
self.toolbar.show_bundles(shown_bundles)
self.update_with_scan_history(-1)
################################################################################
@@ -121,15 +138,9 @@ class ScatterWaveform(PlotBase):
checkable=True,
parent=self,
)
self.toolbar.components.add_safe("scatter_waveform_settings", scatter_curve_action)
self.toolbar.get_bundle("axis_popup").add_action("scatter_waveform_settings")
self.toolbar.add_action("scatter_waveform_settings", scatter_curve_action)
scatter_curve_action.action.triggered.connect(self.show_scatter_curve_settings)
shown_bundles = self.toolbar.shown_bundles
if "performance" in shown_bundles:
shown_bundles.remove("performance")
self.toolbar.show_bundles(shown_bundles)
def show_scatter_curve_settings(self):
"""
Show the scatter curve settings dialog.
@@ -145,7 +156,7 @@ class ScatterWaveform(PlotBase):
window_title="Scatter Curve Settings",
modal=False,
)
self.scatter_dialog.resize(620, 200)
self.scatter_dialog.resize(700, 240)
# When the dialog is closed, update the toolbar icon and clear the reference
self.scatter_dialog.finished.connect(self._scatter_dialog_closed)
self.scatter_dialog.show()
@@ -191,27 +202,6 @@ class ScatterWaveform(PlotBase):
except ValidationError:
return
@SafeProperty(str, designable=False, popup_error=True)
def curve_json(self) -> str:
"""
Get the curve configuration as a JSON string.
"""
return json.dumps(self.main_curve.config.model_dump(), indent=2)
@curve_json.setter
def curve_json(self, value: str):
"""
Set the curve configuration from a JSON string.
Args:
value(str): The JSON string to set the curve configuration from.
"""
try:
config = ScatterCurveConfig(**json.loads(value))
self._add_main_scatter_curve(config)
except json.JSONDecodeError as e:
logger.error(f"Failed to decode JSON: {e}")
################################################################################
# High Level methods for API
################################################################################
@@ -285,10 +275,6 @@ class ScatterWaveform(PlotBase):
Args:
config(ScatterCurveConfig): The configuration of the scatter curve.
"""
# Apply suffix for axes
self.set_x_label_suffix(f"[{config.x_device.name}-{config.x_device.name}]")
self.set_y_label_suffix(f"[{config.y_device.name}-{config.y_device.name}]")
# To have only one main curve
if self._main_curve is not None:
self.rpc_register.remove_rpc(self._main_curve)
@@ -298,6 +284,9 @@ class ScatterWaveform(PlotBase):
self._main_curve = None
self._main_curve = ScatterCurve(parent_item=self, config=config, name=config.label)
# Update axis labels (matching Heatmap's label policy)
self.update_labels()
self.plot_item.addItem(self._main_curve)
self.sync_signal_update.emit()
@@ -405,6 +394,284 @@ class ScatterWaveform(PlotBase):
scan_devices = self.scan_item.devices
return scan_devices, "value"
################################################################################
# Widget Specific Properties
################################################################################
@SafeProperty(str)
def x_device_name(self) -> str:
"""Device name for the X axis."""
if self._main_curve is None or self._main_curve.config.x_device is None:
return ""
return self._main_curve.config.x_device.name or ""
@x_device_name.setter
def x_device_name(self, device_name: str) -> None:
"""
Set the X device name.
Args:
device_name(str): Device name for the X axis
"""
device_name = device_name or ""
if device_name:
try:
entry = self.entry_validator.validate_signal(device_name, None)
# Update or create config
if self._main_curve.config.x_device is None:
self._main_curve.config.x_device = ScatterDeviceSignal(
name=device_name, entry=entry
)
else:
self._main_curve.config.x_device.name = device_name
self._main_curve.config.x_device.entry = entry
self.property_changed.emit("x_device_name", device_name)
self.update_labels()
self._try_auto_plot()
except Exception:
pass # Silently fail if device is not available yet
else:
if self._main_curve.config.x_device is not None:
self._main_curve.config.x_device = None
self.property_changed.emit("x_device_name", "")
self.update_labels()
@SafeProperty(str)
def x_device_entry(self) -> str:
"""Signal entry for the X axis device."""
if self._main_curve is None or self._main_curve.config.x_device is None:
return ""
return self._main_curve.config.x_device.entry or ""
@x_device_entry.setter
def x_device_entry(self, entry: str) -> None:
"""
Set the X device entry.
Args:
entry(str): Signal entry for the X axis device
"""
if not entry:
return
if self._main_curve.config.x_device is None:
logger.warning("Cannot set x_device_entry without x_device_name set first.")
return
device_name = self._main_curve.config.x_device.name
try:
validated_entry = self.entry_validator.validate_signal(device_name, entry)
self._main_curve.config.x_device.entry = validated_entry
self.property_changed.emit("x_device_entry", validated_entry)
self.update_labels()
self._try_auto_plot()
except Exception:
pass # Silently fail if validation fails
@SafeProperty(str)
def y_device_name(self) -> str:
"""Device name for the Y axis."""
if self._main_curve is None or self._main_curve.config.y_device is None:
return ""
return self._main_curve.config.y_device.name or ""
@y_device_name.setter
def y_device_name(self, device_name: str) -> None:
"""
Set the Y device name.
Args:
device_name(str): Device name for the Y axis
"""
device_name = device_name or ""
if device_name:
try:
entry = self.entry_validator.validate_signal(device_name, None)
# Update or create config
if self._main_curve.config.y_device is None:
self._main_curve.config.y_device = ScatterDeviceSignal(
name=device_name, entry=entry
)
else:
self._main_curve.config.y_device.name = device_name
self._main_curve.config.y_device.entry = entry
self.property_changed.emit("y_device_name", device_name)
self.update_labels()
self._try_auto_plot()
except Exception:
pass # Silently fail if device is not available yet
else:
if self._main_curve.config.y_device is not None:
self._main_curve.config.y_device = None
self.property_changed.emit("y_device_name", "")
self.update_labels()
@SafeProperty(str)
def y_device_entry(self) -> str:
"""Signal entry for the Y axis device."""
if self._main_curve is None or self._main_curve.config.y_device is None:
return ""
return self._main_curve.config.y_device.entry or ""
@y_device_entry.setter
def y_device_entry(self, entry: str) -> None:
"""
Set the Y device entry.
Args:
entry(str): Signal entry for the Y axis device
"""
if not entry:
return
if self._main_curve.config.y_device is None:
logger.warning("Cannot set y_device_entry without y_device_name set first.")
return
device_name = self._main_curve.config.y_device.name
try:
validated_entry = self.entry_validator.validate_signal(device_name, entry)
self._main_curve.config.y_device.entry = validated_entry
self.property_changed.emit("y_device_entry", validated_entry)
self.update_labels()
self._try_auto_plot()
except Exception:
pass # Silently fail if validation fails
@SafeProperty(str)
def z_device_name(self) -> str:
"""Device name for the Z (color) axis."""
if self._main_curve is None or self._main_curve.config.z_device is None:
return ""
return self._main_curve.config.z_device.name or ""
@z_device_name.setter
def z_device_name(self, device_name: str) -> None:
"""
Set the Z device name.
Args:
device_name(str): Device name for the Z axis
"""
device_name = device_name or ""
if device_name:
try:
entry = self.entry_validator.validate_signal(device_name, None)
# Update or create config
if self._main_curve.config.z_device is None:
self._main_curve.config.z_device = ScatterDeviceSignal(
name=device_name, entry=entry
)
else:
self._main_curve.config.z_device.name = device_name
self._main_curve.config.z_device.entry = entry
self.property_changed.emit("z_device_name", device_name)
self.update_labels()
self._try_auto_plot()
except Exception:
pass # Silently fail if device is not available yet
else:
if self._main_curve.config.z_device is not None:
self._main_curve.config.z_device = None
self.property_changed.emit("z_device_name", "")
self.update_labels()
@SafeProperty(str)
def z_device_entry(self) -> str:
"""Signal entry for the Z (color) axis device."""
if self._main_curve is None or self._main_curve.config.z_device is None:
return ""
return self._main_curve.config.z_device.entry or ""
@z_device_entry.setter
def z_device_entry(self, entry: str) -> None:
"""
Set the Z device entry.
Args:
entry(str): Signal entry for the Z axis device
"""
if not entry:
return
if self._main_curve.config.z_device is None:
logger.warning("Cannot set z_device_entry without z_device_name set first.")
return
device_name = self._main_curve.config.z_device.name
try:
validated_entry = self.entry_validator.validate_signal(device_name, entry)
self._main_curve.config.z_device.entry = validated_entry
self.property_changed.emit("z_device_entry", validated_entry)
self.update_labels()
self._try_auto_plot()
except Exception:
pass # Silently fail if validation fails
def _try_auto_plot(self) -> None:
"""
Attempt to automatically call plot() if all three devices are set.
"""
has_x = self._main_curve.config.x_device is not None
has_y = self._main_curve.config.y_device is not None
has_z = self._main_curve.config.z_device is not None
if has_x and has_y and has_z:
x_name = self._main_curve.config.x_device.name
x_entry = self._main_curve.config.x_device.entry
y_name = self._main_curve.config.y_device.name
y_entry = self._main_curve.config.y_device.entry
z_name = self._main_curve.config.z_device.name
z_entry = self._main_curve.config.z_device.entry
try:
self.plot(
x_name=x_name,
y_name=y_name,
z_name=z_name,
x_entry=x_entry,
y_entry=y_entry,
z_entry=z_entry,
validate_bec=False, # Don't validate - entries already validated
)
except Exception as e:
logger.debug(f"Auto-plot failed: {e}")
pass # Silently fail if plot cannot be called yet
def update_labels(self):
"""
Update the labels of the x and y axes based on current device configuration.
"""
if self._main_curve is None:
return
config = self._main_curve.config
# Safely get device names
x_device = config.x_device
y_device = config.y_device
x_name = x_device.name if x_device else None
y_name = y_device.name if y_device else None
if x_name is not None:
self.x_label = x_name # type: ignore
x_dev = self.dev.get(x_name)
if x_dev and hasattr(x_dev, "egu"):
self.x_label_units = x_dev.egu()
if y_name is not None:
self.y_label = y_name # type: ignore
y_dev = self.dev.get(y_name)
if y_dev and hasattr(y_dev, "egu"):
self.y_label_units = y_dev.egu()
################################################################################
# Scan History
################################################################################
@SafeSlot(int)
@SafeSlot(str)
@SafeSlot()

View File

@@ -86,29 +86,29 @@ class ScatterCurveSettings(SettingWidget):
if hasattr(self.ui, "x_name"):
self.ui.x_name.set_device(x_name)
if hasattr(self.ui, "x_entry") and x_entry is not None:
self.ui.x_entry.setText(x_entry)
self.ui.x_entry.set_to_obj_name(x_entry)
if hasattr(self.ui, "y_name"):
self.ui.y_name.set_device(y_name)
if hasattr(self.ui, "y_entry") and y_entry is not None:
self.ui.y_entry.setText(y_entry)
self.ui.y_entry.set_to_obj_name(y_entry)
if hasattr(self.ui, "z_name"):
self.ui.z_name.set_device(z_name)
if hasattr(self.ui, "z_entry") and z_entry is not None:
self.ui.z_entry.setText(z_entry)
self.ui.z_entry.set_to_obj_name(z_entry)
@SafeSlot()
def accept_changes(self):
"""
Apply all properties from the settings widget to the target widget.
"""
x_name = self.ui.x_name.text()
x_entry = self.ui.x_entry.text()
y_name = self.ui.y_name.text()
y_entry = self.ui.y_entry.text()
z_name = self.ui.z_name.text()
z_entry = self.ui.z_entry.text()
x_name = self.ui.x_name.currentText()
x_entry = self.ui.x_entry.get_signal_name()
y_name = self.ui.y_name.currentText()
y_entry = self.ui.y_entry.get_signal_name()
z_name = self.ui.z_name.currentText()
z_entry = self.ui.z_entry.get_signal_name()
validate_bec = self.ui.validate_bec.checked
color_map = self.ui.color_map.colormap

View File

@@ -6,8 +6,8 @@
<rect>
<x>0</x>
<y>0</y>
<width>604</width>
<height>166</height>
<width>826</width>
<height>204</height>
</rect>
</property>
<property name="windowTitle">
@@ -31,6 +31,13 @@
</item>
</layout>
</item>
<item>
<widget class="Line" name="line">
<property name="orientation">
<enum>Qt::Orientation::Horizontal</enum>
</property>
</widget>
</item>
<item>
<layout class="QHBoxLayout" name="horizontalLayout">
<item>
@@ -46,9 +53,6 @@
</property>
</widget>
</item>
<item row="0" column="1">
<widget class="DeviceLineEdit" name="x_name"/>
</item>
<item row="1" column="0">
<widget class="QLabel" name="label_2">
<property name="text">
@@ -56,8 +60,22 @@
</property>
</widget>
</item>
<item row="0" column="1">
<widget class="DeviceComboBox" name="x_name">
<property name="editable">
<bool>true</bool>
</property>
<property name="set_first_element_as_empty" stdset="0">
<bool>true</bool>
</property>
</widget>
</item>
<item row="1" column="1">
<widget class="QLineEdit" name="x_entry"/>
<widget class="SignalComboBox" name="x_entry">
<property name="editable">
<bool>true</bool>
</property>
</widget>
</item>
</layout>
</widget>
@@ -75,9 +93,6 @@
</property>
</widget>
</item>
<item row="0" column="1">
<widget class="DeviceLineEdit" name="y_name"/>
</item>
<item row="1" column="0">
<widget class="QLabel" name="label_4">
<property name="text">
@@ -85,8 +100,22 @@
</property>
</widget>
</item>
<item row="0" column="1">
<widget class="DeviceComboBox" name="y_name">
<property name="editable">
<bool>true</bool>
</property>
<property name="set_first_element_as_empty" stdset="0">
<bool>true</bool>
</property>
</widget>
</item>
<item row="1" column="1">
<widget class="QLineEdit" name="y_entry"/>
<widget class="SignalComboBox" name="y_entry">
<property name="editable">
<bool>true</bool>
</property>
</widget>
</item>
</layout>
</widget>
@@ -111,11 +140,22 @@
</property>
</widget>
</item>
<item row="1" column="1">
<widget class="QLineEdit" name="z_entry"/>
</item>
<item row="0" column="1">
<widget class="DeviceLineEdit" name="z_name"/>
<widget class="DeviceComboBox" name="z_name">
<property name="editable">
<bool>true</bool>
</property>
<property name="set_first_element_as_empty" stdset="0">
<bool>true</bool>
</property>
</widget>
</item>
<item row="1" column="1">
<widget class="SignalComboBox" name="z_entry">
<property name="editable">
<bool>true</bool>
</property>
</widget>
</item>
</layout>
</widget>
@@ -125,77 +165,130 @@
</layout>
</widget>
<customwidgets>
<customwidget>
<class>DeviceLineEdit</class>
<extends>QLineEdit</extends>
<header>device_line_edit</header>
</customwidget>
<customwidget>
<class>ToggleSwitch</class>
<extends>QWidget</extends>
<extends></extends>
<header>toggle_switch</header>
</customwidget>
<customwidget>
<class>BECColorMapWidget</class>
<extends>QWidget</extends>
<extends></extends>
<header>bec_color_map_widget</header>
</customwidget>
<customwidget>
<class>DeviceComboBox</class>
<extends></extends>
<header>device_combo_box</header>
</customwidget>
<customwidget>
<class>SignalComboBox</class>
<extends></extends>
<header>signal_combo_box</header>
</customwidget>
</customwidgets>
<tabstops>
<tabstop>x_name</tabstop>
<tabstop>x_entry</tabstop>
<tabstop>y_name</tabstop>
<tabstop>y_entry</tabstop>
<tabstop>z_name</tabstop>
<tabstop>x_entry</tabstop>
<tabstop>y_entry</tabstop>
<tabstop>z_entry</tabstop>
</tabstops>
<resources/>
<connections>
<connection>
<sender>x_name</sender>
<signal>textChanged(QString)</signal>
<signal>device_reset()</signal>
<receiver>x_entry</receiver>
<slot>clear()</slot>
<slot>reset_selection()</slot>
<hints>
<hint type="sourcelabel">
<x>134</x>
<y>95</y>
<x>136</x>
<y>122</y>
</hint>
<hint type="destinationlabel">
<x>138</x>
<y>128</y>
<x>133</x>
<y>151</y>
</hint>
</hints>
</connection>
<connection>
<sender>y_name</sender>
<signal>textChanged(QString)</signal>
<signal>device_reset()</signal>
<receiver>y_entry</receiver>
<slot>clear()</slot>
<slot>reset_selection()</slot>
<hints>
<hint type="sourcelabel">
<x>351</x>
<y>91</y>
<x>412</x>
<y>122</y>
</hint>
<hint type="destinationlabel">
<x>349</x>
<y>121</y>
<x>409</x>
<y>151</y>
</hint>
</hints>
</connection>
<connection>
<sender>z_name</sender>
<signal>textChanged(QString)</signal>
<signal>device_reset()</signal>
<receiver>z_entry</receiver>
<slot>clear()</slot>
<slot>reset_selection()</slot>
<hints>
<hint type="sourcelabel">
<x>520</x>
<y>98</y>
<x>687</x>
<y>121</y>
</hint>
<hint type="destinationlabel">
<x>522</x>
<y>127</y>
<x>684</x>
<y>149</y>
</hint>
</hints>
</connection>
<connection>
<sender>x_name</sender>
<signal>currentTextChanged(QString)</signal>
<receiver>x_entry</receiver>
<slot>set_device(QString)</slot>
<hints>
<hint type="sourcelabel">
<x>152</x>
<y>123</y>
</hint>
<hint type="destinationlabel">
<x>151</x>
<y>151</y>
</hint>
</hints>
</connection>
<connection>
<sender>y_name</sender>
<signal>currentTextChanged(QString)</signal>
<receiver>y_entry</receiver>
<slot>set_device(QString)</slot>
<hints>
<hint type="sourcelabel">
<x>412</x>
<y>121</y>
</hint>
<hint type="destinationlabel">
<x>409</x>
<y>149</y>
</hint>
</hints>
</connection>
<connection>
<sender>z_name</sender>
<signal>currentTextChanged(QString)</signal>
<receiver>z_entry</receiver>
<slot>set_device(QString)</slot>
<hints>
<hint type="sourcelabel">
<x>687</x>
<y>121</y>
</hint>
<hint type="destinationlabel">
<x>684</x>
<y>149</y>
</hint>
</hints>
</connection>

View File

@@ -44,11 +44,7 @@ def test_rpc_plotting_shortcuts_init_configs(qtbot, connected_client_gui_obj):
im.image(monitor="eiger")
mm.map(x_name="samx", y_name="samy")
sw.plot(x_name="samx", y_name="samy", z_name="bpm4i")
assert sw.main_curve.object_name == "bpm4i_bpm4i"
# Create a new curve on the scatter waveform should replace the old one
sw.plot(x_name="samx", y_name="samy", z_name="bpm4a")
assert sw.main_curve.object_name == "bpm4a_bpm4a"
mw.plot(monitor="waveform")
# Adding multiple custom curves sho

View File

@@ -2208,7 +2208,7 @@ class TestFlatToolbarActions:
"flat_status": "BECStatusBox",
"flat_progress_bar": "RingProgressBar",
"flat_terminal": "WebConsole",
"flat_bec_shell": "WebConsole",
"flat_bec_shell": "BECShell",
"flat_sbb_monitor": "SBBMonitor",
}

View File

@@ -1,18 +1,11 @@
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
import gc
import time
from functools import partial
from multiprocessing import Process
from unittest.mock import MagicMock, call, patch
import pytest
from PySide6.QtWidgets import QWidget
from qtpy.QtCore import QObject
from qtpy.QtWidgets import QApplication
from bec_widgets.utils import BECConnector
from bec_widgets.utils.bec_connector import ConnectionConfig
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.error_popups import SafeSlot as Slot
from .client_mocks import mocked_client
@@ -138,127 +131,3 @@ def test_bec_connector_change_object_name(bec_connector):
# Verify that the object with the previous name is no longer registered
all_objects = bec_connector.rpc_register.list_all_connections().values()
assert not any(obj.objectName() == previous_name for obj in all_objects)
def test_bec_connector_terminate_run_on_about_to_quit(qtbot, bec_connector):
assert BECConnector.EXIT_HANDLERS.get(0) is not None
terminate_mock = MagicMock()
bec_connector.__class__.EXIT_HANDLERS[0] = terminate_mock
bec_connector._run_exit_handlers()
qtbot.waitUntil(lambda: terminate_mock.call_count == 1)
def test_bec_connector_terminate_run_once_and_only_once(qtbot, bec_connector):
terminate_mock = MagicMock()
bec_connector.__class__.EXIT_HANDLERS[0] = terminate_mock
_conn_2 = BECConnectorQObject(client=mocked_client)
_conn_3 = BECConnectorQObject(client=mocked_client)
bec_connector._run_exit_handlers()
qtbot.waitUntil(lambda: terminate_mock.call_count == 1)
def test_bec_connector_exit_handlers_run_in_order(qtbot, bec_connector):
handler = MagicMock()
bec_connector.__class__.EXIT_HANDLERS[0] = handler
def h1():
handler(prio=1)
def h2():
handler(prio=2)
def h3():
handler(prio=3)
bec_connector._add_exit_handler(h3, 5)
bec_connector._add_exit_handler(h2, 10)
bec_connector._add_exit_handler(h1, 15)
bec_connector._run_exit_handlers()
qtbot.waitUntil(lambda: handler.call_count == 4)
handler.assert_has_calls([call(prio=1), call(prio=2), call(prio=3), call()])
@pytest.fixture
def mock_widget_with_exit_handlers(bec_connector, mocked_client):
with patch.object(mocked_client, "connector", bec_connector):
handler = MagicMock()
bec_connector.__class__.EXIT_HANDLERS[0] = handler
class DropWeakrefWidget(BECWidget, QWidget):
def __init__(
self,
client=None,
config: ConnectionConfig = None,
gui_id: str | None = None,
theme_update: bool = False,
start_busy: bool = False,
busy_text: str = "Loading…",
**kwargs,
):
super().__init__(
client, config, gui_id, theme_update, start_busy, busy_text, **kwargs
)
self.setup_on_exit()
self.client.connector.add_exit_handler(self._on_exit_stored_ref, 5)
self.client.connector.add_exit_handler(self.instance_on_exit, 7)
def setup_on_exit(self):
def _on_exit():
self.backgroundRole() # access some Qt thing just to fail test if c++ object is deleted
handler("called by DropWeakrefWidget in stored reference to function")
self._on_exit_stored_ref = _on_exit
def instance_on_exit(self):
self.backgroundRole() # access some Qt thing just to fail test if c++ object is deleted
handler("called by DropWeakrefWidget in instance method")
widget = DropWeakrefWidget(client=mocked_client)
return widget, handler
def test_connector_exit_handlers_doesnt_drop_when_widget_lives(
qtbot, bec_connector, mock_widget_with_exit_handlers
):
widget, handler = mock_widget_with_exit_handlers
qtbot.addWidget(widget)
def h1():
handler(prio=1)
bec_connector._add_exit_handler(h1, 15)
bec_connector._run_exit_handlers()
qtbot.waitUntil(lambda: handler.call_count == 4)
handler.assert_has_calls(
[
call(prio=1),
call("called by DropWeakrefWidget in instance method"),
call("called by DropWeakrefWidget in stored reference to function"),
call(), # from root cleanup
]
)
def test_connector_exit_handlers_drops_when_widget_dies(
qtbot, bec_connector, mock_widget_with_exit_handlers
):
widget, handler = mock_widget_with_exit_handlers
qtbot.addWidget(widget)
def h1():
handler(prio=1)
bec_connector._add_exit_handler(h1, 15)
widget.deleteLater()
qtbot.wait(100)
QApplication.processEvents()
del widget
qtbot.wait(100)
gc.collect()
qtbot.wait(100)
bec_connector._run_exit_handlers()
qtbot.waitUntil(lambda: handler.call_count == 2)
handler.assert_has_calls([call(prio=1), call()])

View File

@@ -4,7 +4,6 @@ import pytest
from bec_lib.device import Signal
from qtpy.QtWidgets import QWidget
from bec_widgets.tests.utils import FakeDevice
from bec_widgets.utils.ophyd_kind_util import Kind
from bec_widgets.widgets.control.device_input.base_classes.device_input_base import BECDeviceFilter
from bec_widgets.widgets.control.device_input.base_classes.device_signal_input_base import (
@@ -153,3 +152,61 @@ def test_device_signal_input_base_cleanup(qtbot, mocked_client):
widget.deleteLater()
mocked_client.callbacks.remove.assert_called_once_with(widget._device_update_register)
def test_signal_combobox_get_signal_name_with_item_data(qtbot, device_signal_combobox):
"""Test get_signal_name returns obj_name from item data when available."""
device_signal_combobox.include_normal_signals = True
device_signal_combobox.include_hinted_signals = True
device_signal_combobox.set_device("samx")
# Select a signal that has item data with obj_name
device_signal_combobox.setCurrentText("samx (readback)")
# get_signal_name should return the obj_name from item data
signal_name = device_signal_combobox.get_signal_name()
assert signal_name == "samx"
def test_signal_combobox_get_signal_name_without_item_data(qtbot, device_signal_combobox):
"""Test get_signal_name returns currentText when no item data available."""
# Add a custom item without item data
device_signal_combobox.addItem("custom_signal")
device_signal_combobox.setCurrentText("custom_signal")
signal_name = device_signal_combobox.get_signal_name()
assert signal_name == "custom_signal"
def test_signal_combobox_get_signal_name_not_found(qtbot, device_signal_combobox):
"""Test get_signal_name when text is not found in combobox (index == -1)."""
# Set editable to allow text that's not in items
device_signal_combobox.setEditable(True)
device_signal_combobox.setCurrentText("nonexistent_signal")
signal_name = device_signal_combobox.get_signal_name()
assert signal_name == "nonexistent_signal"
def test_signal_combobox_get_signal_name_empty(qtbot, device_signal_combobox):
"""Test get_signal_name when combobox is empty."""
device_signal_combobox.clear()
device_signal_combobox.setEditable(True)
device_signal_combobox.setCurrentText("")
signal_name = device_signal_combobox.get_signal_name()
assert signal_name == ""
def test_signal_combobox_get_signal_name_with_velocity(qtbot, device_signal_combobox):
"""Test get_signal_name with velocity signal."""
device_signal_combobox.include_normal_signals = True
device_signal_combobox.include_hinted_signals = True
device_signal_combobox.include_config_signals = True
device_signal_combobox.set_device("samx")
# Select velocity signal
device_signal_combobox.setCurrentText("velocity")
signal_name = device_signal_combobox.get_signal_name()
assert signal_name == "samx_velocity"

View File

@@ -597,3 +597,277 @@ def test_finish_interpolation_thread_cleans_references(heatmap_widget):
thread_mock.deleteLater.assert_called_once()
assert heatmap_widget._interpolation_worker is None
assert heatmap_widget._interpolation_thread is None
def test_device_safe_properties_get(heatmap_widget):
"""Test that device SafeProperty getters work correctly."""
# Initially devices should be empty
assert heatmap_widget.x_device_name == ""
assert heatmap_widget.x_device_entry == ""
assert heatmap_widget.y_device_name == ""
assert heatmap_widget.y_device_entry == ""
assert heatmap_widget.z_device_name == ""
assert heatmap_widget.z_device_entry == ""
# Set devices via plot
heatmap_widget.plot(x_name="samx", y_name="samy", z_name="bpm4i")
# Check properties return device names and entries separately
assert heatmap_widget.x_device_name == "samx"
assert heatmap_widget.x_device_entry # Should have some entry
assert heatmap_widget.y_device_name == "samy"
assert heatmap_widget.y_device_entry # Should have some entry
assert heatmap_widget.z_device_name == "bpm4i"
assert heatmap_widget.z_device_entry # Should have some entry
def test_device_safe_properties_set_name(heatmap_widget):
"""Test that device SafeProperty setters work for device names."""
# Set x_device_name - should auto-validate entry
heatmap_widget.x_device_name = "samx"
assert heatmap_widget._image_config.x_device is not None
assert heatmap_widget._image_config.x_device.name == "samx"
assert heatmap_widget._image_config.x_device.entry is not None # Entry should be validated
assert heatmap_widget.x_device_name == "samx"
# Set y_device_name
heatmap_widget.y_device_name = "samy"
assert heatmap_widget._image_config.y_device is not None
assert heatmap_widget._image_config.y_device.name == "samy"
assert heatmap_widget._image_config.y_device.entry is not None
assert heatmap_widget.y_device_name == "samy"
# Set z_device_name
heatmap_widget.z_device_name = "bpm4i"
assert heatmap_widget._image_config.z_device is not None
assert heatmap_widget._image_config.z_device.name == "bpm4i"
assert heatmap_widget._image_config.z_device.entry is not None
assert heatmap_widget.z_device_name == "bpm4i"
def test_device_safe_properties_set_entry(heatmap_widget):
"""Test that device entry properties can override default entries."""
# Set device name first - this auto-validates entry
heatmap_widget.x_device_name = "samx"
initial_entry = heatmap_widget.x_device_entry
assert initial_entry # Should have auto-validated entry
# Override with specific entry
heatmap_widget.x_device_entry = "samx"
assert heatmap_widget._image_config.x_device.entry == "samx"
assert heatmap_widget.x_device_entry == "samx"
# Same for y device
heatmap_widget.y_device_name = "samy"
heatmap_widget.y_device_entry = "samy_setpoint"
assert heatmap_widget._image_config.y_device.entry == "samy_setpoint"
# Same for z device
heatmap_widget.z_device_name = "bpm4i"
heatmap_widget.z_device_entry = "bpm4i"
assert heatmap_widget._image_config.z_device.entry == "bpm4i"
def test_device_entry_cannot_be_set_without_name(heatmap_widget):
"""Test that setting entry without device name logs warning and does nothing."""
# Try to set entry without device name
heatmap_widget.x_device_entry = "some_entry"
# Should not crash, entry should remain empty
assert heatmap_widget.x_device_entry == ""
assert heatmap_widget._image_config.x_device is None
def test_device_safe_properties_set_empty(heatmap_widget):
"""Test that device SafeProperty setters handle empty strings."""
# Set device first
heatmap_widget.x_device_name = "samx"
assert heatmap_widget._image_config.x_device is not None
# Set to empty string - should clear the device
heatmap_widget.x_device_name = ""
assert heatmap_widget.x_device_name == ""
assert heatmap_widget._image_config.x_device is None
def test_device_safe_properties_auto_plot(heatmap_widget):
"""Test that setting all three devices triggers auto-plot."""
# Set all three devices
heatmap_widget.x_device_name = "samx"
heatmap_widget.y_device_name = "samy"
heatmap_widget.z_device_name = "bpm4i"
# Check that plot was called (image_config should be updated)
assert heatmap_widget._image_config.x_device is not None
assert heatmap_widget._image_config.y_device is not None
assert heatmap_widget._image_config.z_device is not None
def test_device_properties_update_labels(heatmap_widget):
"""Test that setting device properties updates axis labels."""
# Set x device - should update x label
heatmap_widget.x_device_name = "samx"
assert heatmap_widget.x_label == "samx"
# Set y device - should update y label
heatmap_widget.y_device_name = "samy"
assert heatmap_widget.y_label == "samy"
# Set z device - should update title
heatmap_widget.z_device_name = "bpm4i"
assert heatmap_widget.title == "bpm4i"
def test_device_properties_partial_configuration(heatmap_widget):
"""Test that widget handles partial device configuration gracefully."""
# Set only x device
heatmap_widget.x_device_name = "samx"
assert heatmap_widget.x_device_name == "samx"
assert heatmap_widget.y_device_name == ""
assert heatmap_widget.z_device_name == ""
# Set only y device (x already set)
heatmap_widget.y_device_name = "samy"
assert heatmap_widget.x_device_name == "samx"
assert heatmap_widget.y_device_name == "samy"
assert heatmap_widget.z_device_name == ""
# Auto-plot should not trigger yet (z missing)
# But devices should be configured
assert heatmap_widget._image_config.x_device is not None
assert heatmap_widget._image_config.y_device is not None
def test_device_properties_in_user_access(heatmap_widget):
"""Test that device properties are exposed in USER_ACCESS for RPC."""
from bec_widgets.widgets.plots.heatmap.heatmap import Heatmap
assert "x_device_name" in Heatmap.USER_ACCESS
assert "x_device_name.setter" in Heatmap.USER_ACCESS
assert "x_device_entry" in Heatmap.USER_ACCESS
assert "x_device_entry.setter" in Heatmap.USER_ACCESS
assert "y_device_name" in Heatmap.USER_ACCESS
assert "y_device_name.setter" in Heatmap.USER_ACCESS
assert "y_device_entry" in Heatmap.USER_ACCESS
assert "y_device_entry.setter" in Heatmap.USER_ACCESS
assert "z_device_name" in Heatmap.USER_ACCESS
assert "z_device_name.setter" in Heatmap.USER_ACCESS
assert "z_device_entry" in Heatmap.USER_ACCESS
assert "z_device_entry.setter" in Heatmap.USER_ACCESS
def test_device_properties_validation(heatmap_widget):
"""Test that device entries are validated through entry_validator."""
# Set device name - entry should be auto-validated
heatmap_widget.x_device_name = "samx"
initial_entry = heatmap_widget.x_device_entry
# The entry should be validated (will be "samx" in the mock)
assert initial_entry == "samx"
# Set a different entry - should also be validated
heatmap_widget.x_device_entry = "samx" # Use same name as validated entry
assert heatmap_widget.x_device_entry == "samx"
def test_device_properties_with_plot_method(heatmap_widget):
"""Test that device properties reflect values set via plot() method."""
# Use plot method
heatmap_widget.plot(x_name="samx", y_name="samy", z_name="bpm4i")
# Properties should reflect the plotted devices
assert heatmap_widget.x_device_name == "samx"
assert heatmap_widget.y_device_name == "samy"
assert heatmap_widget.z_device_name == "bpm4i"
# Entries should be validated
assert heatmap_widget.x_device_entry == "samx"
assert heatmap_widget.y_device_entry == "samy"
assert heatmap_widget.z_device_entry == "bpm4i"
def test_device_properties_overwrite_via_properties(heatmap_widget):
"""Test that device properties can overwrite values set via plot()."""
# First set via plot
heatmap_widget.plot(x_name="samx", y_name="samy", z_name="bpm4i")
# Overwrite x device via properties
heatmap_widget.x_device_name = "samz"
assert heatmap_widget.x_device_name == "samz"
assert heatmap_widget._image_config.x_device.name == "samz"
# Overwrite y device entry
heatmap_widget.y_device_entry = "samy"
assert heatmap_widget.y_device_entry == "samy"
def test_device_properties_clearing_devices(heatmap_widget):
"""Test clearing devices by setting to empty string."""
# Set all devices
heatmap_widget.x_device_name = "samx"
heatmap_widget.y_device_name = "samy"
heatmap_widget.z_device_name = "bpm4i"
# Clear x device
heatmap_widget.x_device_name = ""
assert heatmap_widget.x_device_name == ""
assert heatmap_widget._image_config.x_device is None
# Y and Z should still be set
assert heatmap_widget.y_device_name == "samy"
assert heatmap_widget.z_device_name == "bpm4i"
def test_device_properties_property_changed_signal(heatmap_widget):
"""Test that property_changed signal is emitted when devices are set."""
from unittest.mock import Mock
# Connect mock to property_changed signal
mock_handler = Mock()
heatmap_widget.property_changed.connect(mock_handler)
# Set device name
heatmap_widget.x_device_name = "samx"
# Signal should have been emitted
assert mock_handler.called
# Check it was called with correct arguments
mock_handler.assert_any_call("x_device_name", "samx")
def test_device_entry_validation_with_invalid_device(heatmap_widget):
"""Test that invalid device names are handled gracefully."""
# Try to set invalid device name
heatmap_widget.x_device_name = "nonexistent_device"
# Should not crash, but device might not be set if validation fails
# The implementation silently fails, so we just check it doesn't crash
def test_device_properties_sequential_entry_changes(heatmap_widget):
"""Test changing device entry multiple times."""
# Set device
heatmap_widget.x_device_name = "samx"
# Change entry multiple times
heatmap_widget.x_device_entry = "samx_velocity"
assert heatmap_widget.x_device_entry == "samx_velocity"
heatmap_widget.x_device_entry = "samx_setpoint"
assert heatmap_widget.x_device_entry == "samx_setpoint"
heatmap_widget.x_device_entry = "samx"
assert heatmap_widget.x_device_entry == "samx"
def test_device_properties_with_none_values(heatmap_widget):
"""Test that None values are handled as empty strings."""
# Device name None should be treated as empty
heatmap_widget.x_device_name = None
assert heatmap_widget.x_device_name == ""
# Set a device first
heatmap_widget.y_device_name = "samy"
# Entry None should not change anything
heatmap_widget.y_device_entry = None
assert heatmap_widget.y_device_entry # Should still have validated entry

View File

@@ -1,6 +1,7 @@
import numpy as np
import pyqtgraph as pg
import pytest
from bec_lib.endpoints import MessageEndpoints
from qtpy.QtCore import QPointF
from bec_widgets.widgets.plots.image.image import Image
@@ -178,6 +179,114 @@ def test_image_setup_preview_signal_2d(qtbot, mocked_client, monkeypatch):
np.testing.assert_array_equal(view.main_image.image, test_data)
def test_preview_signals_skip_0d_entries(qtbot, mocked_client, monkeypatch):
"""
Preview/async combobox should omit 0D signals.
"""
view = create_widget(qtbot, Image, client=mocked_client)
def fake_get(sign_cls):
if sign_cls == "PreviewSignal":
return [
(
"dev",
"sig0d",
{
"obj_name": "sig0d",
"signal_class": "PreviewSignal",
"describe": {"signal_info": {"ndim": 0}},
},
),
(
"dev",
"sig2d",
{
"obj_name": "sig2d",
"signal_class": "PreviewSignal",
"describe": {"signal_info": {"ndim": 2}},
},
),
]
return []
monkeypatch.setattr(view.client.device_manager, "get_bec_signals", fake_get)
view.device_combo_box.clear()
view.device_combo_box.addItem("", None)
view._populate_signals()
texts = [view.device_combo_box.itemText(i) for i in range(view.device_combo_box.count())]
assert "sig0d" not in texts
assert "sig2d" in texts
def test_image_async_signal_uses_obj_name(qtbot, mocked_client, monkeypatch):
"""
Verify async signals use obj_name for endpoints/payloads and reconnect with scan_id.
"""
view = create_widget(qtbot, Image, client=mocked_client)
signal_config = {
"obj_name": "async_obj",
"signal_class": "AsyncSignal",
"describe": {"signal_info": {"ndim": 1}},
}
view.image(monitor=("eiger", "img", signal_config))
assert view.subscriptions["main"].async_signal_name == "async_obj"
# Prepare scan ids and capture dispatcher calls
view.old_scan_id = "old_scan"
view.scan_id = "new_scan"
connected = []
disconnected = []
monkeypatch.setattr(
view.bec_dispatcher,
"connect_slot",
lambda slot, endpoint, from_start=False, cb_info=None: connected.append(
(slot, endpoint, from_start, cb_info)
),
)
monkeypatch.setattr(
view.bec_dispatcher,
"disconnect_slot",
lambda slot, endpoint: disconnected.append((slot, endpoint)),
)
view._setup_async_image(view.scan_id)
expected_new = MessageEndpoints.device_async_signal("new_scan", "eiger", "async_obj")
expected_old = MessageEndpoints.device_async_signal("old_scan", "eiger", "async_obj")
assert any(ep == expected_new for _, ep, _, _ in connected)
assert any(ep == expected_old for _, ep in disconnected)
# Payload extraction should use obj_name
payload = np.array([1, 2, 3])
msg = {"signals": {"async_obj": {"value": payload}}}
assert view._get_payload_data(msg) is payload
def test_disconnect_monitor_clears_async_state(qtbot, mocked_client, monkeypatch):
view = create_widget(qtbot, Image, client=mocked_client)
signal_config = {
"obj_name": "async_obj",
"signal_class": "AsyncSignal",
"describe": {"signal_info": {"ndim": 2}},
}
view.image(monitor=("eiger", "img", signal_config))
view.scan_id = "scan_x"
view.old_scan_id = "scan_y"
view.subscriptions["main"].async_signal_name = "async_obj"
# Avoid touching real dispatcher
monkeypatch.setattr(view.bec_dispatcher, "disconnect_slot", lambda *args, **kwargs: None)
view.disconnect_monitor(("eiger", "img", signal_config))
assert view.subscriptions["main"].monitor is None
assert view.subscriptions["main"].async_signal_name is None
assert view.async_update is False
##############################################
# Device monitor endpoint update mechanism
@@ -600,33 +709,99 @@ def test_monitor_selection_reverse_device_items(qtbot, mocked_client):
assert combo.currentText() == "samy"
def test_monitor_selection_populate_preview_signals(qtbot, mocked_client, monkeypatch):
def test_monitor_selection_populate_signals(qtbot, mocked_client, monkeypatch):
"""
Verify that _populate_preview_signals adds previewsignal devices to the combobox
Verify that _populate_signals adds previewsignal and async-signal devices to the combobox
with the correct userData.
"""
view = create_widget(qtbot, Image, client=mocked_client)
signal_configs = {
"PreviewSignal": [
("eiger", "img", {"obj_name": "eiger_img", "describe": {"signal_info": {"ndim": 2}}}),
(
"eiger2",
"img2",
{"obj_name": "eiger_img2", "describe": {"signal_info": {"ndim": 2}}},
),
],
"AsyncSignal": [
(
"async_device",
"img_async",
{"obj_name": "async_device_img_async", "describe": {"signal_info": {"ndim": 2}}},
)
],
"AsyncMultiSignal": [
(
"multi_device",
"img_multi",
{"obj_name": "multi_device_img_multi", "describe": {"signal_info": {"ndim": 2}}},
)
],
"DynamicSignal": [
(
"dynamic_device",
"img_dyn",
{"obj_name": "dynamic_device_img_dyn", "describe": {"signal_info": {"ndim": 2}}},
)
],
}
# Provide a deterministic fake device_manager with get_bec_signals
class _FakeDM:
def get_bec_signals(self, _filter):
return [
("eiger", "img", {"obj_name": "eiger_img"}),
("async_device", "img2", {"obj_name": "async_device_img2"}),
]
if isinstance(_filter, str):
filters = [_filter]
else:
filters = list(_filter)
signals = []
for filt in filters:
signals.extend(signal_configs.get(filt, []))
return signals
monkeypatch.setattr(view.client, "device_manager", _FakeDM())
initial_count = view.device_combo_box.count()
view._populate_signals()
view._populate_preview_signals()
# Base devices first, then empty separator, then signal entries
signal_texts = []
separator_seen = False
for i in range(view.device_combo_box.count()):
data = view.device_combo_box.itemData(i)
text = view.device_combo_box.itemText(i)
if data is None and text == "":
separator_seen = True
continue
if separator_seen is False:
# base device entries
continue
# After separator we expect signal tuples
assert isinstance(data, tuple)
signal_texts.append(text)
# Two new entries should have been added
assert view.device_combo_box.count() == initial_count + 2
# The first newly added item should carry tuple userData describing the device/signal
data = view.device_combo_box.itemData(initial_count)
assert isinstance(data, tuple) and data[0] == "eiger"
expected_labels = {
"eiger_img",
"eiger_img2",
"async_device_img_async",
"multi_device_img_multi",
"dynamic_device_img_dyn",
}
assert expected_labels.issubset(set(signal_texts))
first_signal_idx = next(
i
for i in range(view.device_combo_box.count())
if isinstance(view.device_combo_box.itemData(i), tuple)
)
data = view.device_combo_box.itemData(first_signal_idx)
assert isinstance(data, tuple) and data[0] in [
"eiger",
"eiger2",
"async_device",
"multi_device",
"dynamic_device",
]
def test_monitor_selection_adjust_and_connect(qtbot, mocked_client, monkeypatch):
@@ -641,7 +816,26 @@ def test_monitor_selection_adjust_and_connect(qtbot, mocked_client, monkeypatch)
# Deterministic fake device_manager
class _FakeDM:
def get_bec_signals(self, _filter):
return [("eiger", "img", {"obj_name": "eiger_img"})]
if isinstance(_filter, str):
filters = [_filter]
else:
filters = list(_filter)
signals = []
for filt in filters:
if filt == "PreviewSignal":
signals.extend(
[
(
"eiger",
"img",
{"obj_name": "eiger_img", "describe": {"signal_info": {"ndim": 2}}},
)
]
)
else:
signals.extend([])
return signals
monkeypatch.setattr(view.client, "device_manager", _FakeDM())
@@ -654,9 +848,13 @@ def test_monitor_selection_adjust_and_connect(qtbot, mocked_client, monkeypatch)
# Execute the method under test
view._adjust_and_connect()
# Expect exactly two items: preview label followed by the empty default
assert combo.count() == 2
# Because of the reversal, the preview label comes first
assert combo.itemText(0) == "eiger_img"
# Base devices should appear first, then empty separator, then signals
sep_idx = next(
i for i in range(combo.count()) if combo.itemData(i) is None and combo.itemText(i) == ""
)
first_signal_idx = sep_idx + 1
assert isinstance(combo.itemData(first_signal_idx), tuple)
assert combo.itemText(first_signal_idx) == "eiger_img"
assert combo.itemText(sep_idx) == ""
# Current selection remains empty
assert combo.currentText() == ""

View File

@@ -1,4 +1,4 @@
import json
from unittest.mock import patch
import numpy as np
@@ -7,6 +7,9 @@ from bec_widgets.widgets.plots.scatter_waveform.scatter_curve import (
ScatterDeviceSignal,
)
from bec_widgets.widgets.plots.scatter_waveform.scatter_waveform import ScatterWaveform
from bec_widgets.widgets.plots.scatter_waveform.settings.scatter_curve_setting import (
ScatterCurveSettings,
)
from tests.unit_tests.client_mocks import create_dummy_scan_item, mocked_client
from .conftest import create_widget
@@ -46,28 +49,6 @@ def test_scatter_waveform_color_map(qtbot, mocked_client):
assert swf.color_map == "plasma"
def test_scatter_waveform_curve_json(qtbot, mocked_client):
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
# Add a device-based scatter curve
swf.plot(x_name="samx", y_name="samy", z_name="bpm4i", label="test_curve")
json_str = swf.curve_json
data = json.loads(json_str)
assert isinstance(data, dict)
assert data["label"] == "test_curve"
assert data["x_device"]["name"] == "samx"
assert data["y_device"]["name"] == "samy"
assert data["z_device"]["name"] == "bpm4i"
# Clear and reload from JSON
swf.clear_all()
assert swf.main_curve.getData() == (None, None)
swf.curve_json = json_str
assert swf.main_curve.config.label == "test_curve"
def test_scatter_waveform_update_with_scan_history(qtbot, mocked_client, monkeypatch):
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
@@ -151,3 +132,413 @@ def test_scatter_waveform_scan_progress(qtbot, mocked_client, monkeypatch):
# swf.scatter_dialog.close()
# assert swf.scatter_dialog is None
# assert not scatter_popup_action.isChecked(), "Should be unchecked after closing dialog"
################################################################################
# Device Property Tests
################################################################################
def test_device_safe_properties_get(qtbot, mocked_client):
"""Test that device SafeProperty getters work correctly."""
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
# Initially devices should be empty
assert swf.x_device_name == ""
assert swf.x_device_entry == ""
assert swf.y_device_name == ""
assert swf.y_device_entry == ""
assert swf.z_device_name == ""
assert swf.z_device_entry == ""
# Set devices via plot
swf.plot(x_name="samx", y_name="samy", z_name="bpm4i")
# Check properties return device names and entries separately
assert swf.x_device_name == "samx"
assert swf.x_device_entry # Should have some entry
assert swf.y_device_name == "samy"
assert swf.y_device_entry # Should have some entry
assert swf.z_device_name == "bpm4i"
assert swf.z_device_entry # Should have some entry
def test_device_safe_properties_set_name(qtbot, mocked_client):
"""Test that device SafeProperty setters work for device names."""
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
# Set x_device_name - should auto-validate entry
swf.x_device_name = "samx"
assert swf._main_curve.config.x_device is not None
assert swf._main_curve.config.x_device.name == "samx"
assert swf._main_curve.config.x_device.entry is not None # Entry should be validated
assert swf.x_device_name == "samx"
# Set y_device_name
swf.y_device_name = "samy"
assert swf._main_curve.config.y_device is not None
assert swf._main_curve.config.y_device.name == "samy"
assert swf._main_curve.config.y_device.entry is not None
assert swf.y_device_name == "samy"
# Set z_device_name
swf.z_device_name = "bpm4i"
assert swf._main_curve.config.z_device is not None
assert swf._main_curve.config.z_device.name == "bpm4i"
assert swf._main_curve.config.z_device.entry is not None
assert swf.z_device_name == "bpm4i"
def test_device_safe_properties_set_entry(qtbot, mocked_client):
"""Test that device entry properties can override default entries."""
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
# Set device name first - this auto-validates entry
swf.x_device_name = "samx"
initial_entry = swf.x_device_entry
assert initial_entry # Should have auto-validated entry
# Override with specific entry
swf.x_device_entry = "samx"
assert swf._main_curve.config.x_device.entry == "samx"
assert swf.x_device_entry == "samx"
# Same for y device
swf.y_device_name = "samy"
swf.y_device_entry = "samy_setpoint"
assert swf._main_curve.config.y_device.entry == "samy_setpoint"
# Same for z device
swf.z_device_name = "bpm4i"
swf.z_device_entry = "bpm4i"
assert swf._main_curve.config.z_device.entry == "bpm4i"
def test_device_entry_cannot_be_set_without_name(qtbot, mocked_client):
"""Test that setting entry without device name logs warning and does nothing."""
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
# Try to set entry without device name
swf.x_device_entry = "some_entry"
# Should not crash, entry should remain empty
assert swf.x_device_entry == ""
assert swf._main_curve.config.x_device is None
def test_device_safe_properties_set_empty(qtbot, mocked_client):
"""Test that device SafeProperty setters handle empty strings."""
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
# Set device first
swf.x_device_name = "samx"
assert swf._main_curve.config.x_device is not None
# Set to empty string - should clear the device
swf.x_device_name = ""
assert swf.x_device_name == ""
assert swf._main_curve.config.x_device is None
def test_device_safe_properties_auto_plot(qtbot, mocked_client):
"""Test that setting all three devices triggers auto-plot."""
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
# Set all three devices
swf.x_device_name = "samx"
swf.y_device_name = "samy"
swf.z_device_name = "bpm4i"
# Check that plot was called (config should be updated)
assert swf._main_curve.config.x_device is not None
assert swf._main_curve.config.y_device is not None
assert swf._main_curve.config.z_device is not None
def test_device_properties_update_labels(qtbot, mocked_client):
"""Test that setting device properties updates axis labels."""
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
# Set x device - should update x label
swf.x_device_name = "samx"
assert swf.x_label == "samx"
# Set y device - should update y label
swf.y_device_name = "samy"
assert swf.y_label == "samy"
# Note: ScatterWaveform doesn't have a title like Heatmap does for z_device
def test_device_properties_partial_configuration(qtbot, mocked_client):
"""Test that widget handles partial device configuration gracefully."""
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
# Set only x device
swf.x_device_name = "samx"
assert swf.x_device_name == "samx"
assert swf.y_device_name == ""
assert swf.z_device_name == ""
# Set only y device (x already set)
swf.y_device_name = "samy"
assert swf.x_device_name == "samx"
assert swf.y_device_name == "samy"
assert swf.z_device_name == ""
# Auto-plot should not trigger yet (z missing)
# But devices should be configured
assert swf._main_curve.config.x_device is not None
assert swf._main_curve.config.y_device is not None
def test_device_properties_in_user_access(qtbot, mocked_client):
"""Test that device properties are exposed in USER_ACCESS for RPC."""
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
assert "x_device_name" in ScatterWaveform.USER_ACCESS
assert "x_device_name.setter" in ScatterWaveform.USER_ACCESS
assert "x_device_entry" in ScatterWaveform.USER_ACCESS
assert "x_device_entry.setter" in ScatterWaveform.USER_ACCESS
assert "y_device_name" in ScatterWaveform.USER_ACCESS
assert "y_device_name.setter" in ScatterWaveform.USER_ACCESS
assert "y_device_entry" in ScatterWaveform.USER_ACCESS
assert "y_device_entry.setter" in ScatterWaveform.USER_ACCESS
assert "z_device_name" in ScatterWaveform.USER_ACCESS
assert "z_device_name.setter" in ScatterWaveform.USER_ACCESS
assert "z_device_entry" in ScatterWaveform.USER_ACCESS
assert "z_device_entry.setter" in ScatterWaveform.USER_ACCESS
def test_device_properties_validation(qtbot, mocked_client):
"""Test that device entries are validated through entry_validator."""
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
# Set device name - entry should be auto-validated
swf.x_device_name = "samx"
initial_entry = swf.x_device_entry
# The entry should be validated (will be "samx" in the mock)
assert initial_entry == "samx"
# Set a different entry - should also be validated
swf.x_device_entry = "samx" # Use same name as validated entry
assert swf.x_device_entry == "samx"
def test_device_properties_with_plot_method(qtbot, mocked_client):
"""Test that device properties reflect values set via plot() method."""
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
# Use plot method
swf.plot(x_name="samx", y_name="samy", z_name="bpm4i")
# Properties should reflect the plotted devices
assert swf.x_device_name == "samx"
assert swf.y_device_name == "samy"
assert swf.z_device_name == "bpm4i"
# Entries should be validated
assert swf.x_device_entry == "samx"
assert swf.y_device_entry == "samy"
assert swf.z_device_entry == "bpm4i"
def test_device_properties_overwrite_via_properties(qtbot, mocked_client):
"""Test that device properties can overwrite values set via plot()."""
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
# First set via plot
swf.plot(x_name="samx", y_name="samy", z_name="bpm4i")
# Overwrite x device via properties
swf.x_device_name = "samz"
assert swf.x_device_name == "samz"
assert swf._main_curve.config.x_device.name == "samz"
# Overwrite y device entry
swf.y_device_entry = "samy"
assert swf.y_device_entry == "samy"
def test_device_properties_clearing_devices(qtbot, mocked_client):
"""Test clearing devices by setting to empty string."""
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
# Set all devices
swf.x_device_name = "samx"
swf.y_device_name = "samy"
swf.z_device_name = "bpm4i"
# Clear x device
swf.x_device_name = ""
assert swf.x_device_name == ""
assert swf._main_curve.config.x_device is None
# Y and Z should still be set
assert swf.y_device_name == "samy"
assert swf.z_device_name == "bpm4i"
def test_device_properties_property_changed_signal(qtbot, mocked_client):
"""Test that property_changed signal is emitted when devices are set."""
from unittest.mock import Mock
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
# Connect mock to property_changed signal
mock_handler = Mock()
swf.property_changed.connect(mock_handler)
# Set device name
swf.x_device_name = "samx"
# Signal should have been emitted
assert mock_handler.called
# Check it was called with correct arguments
mock_handler.assert_any_call("x_device_name", "samx")
def test_device_entry_validation_with_invalid_device(qtbot, mocked_client):
"""Test that invalid device names are handled gracefully."""
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
# Try to set invalid device name
swf.x_device_name = "nonexistent_device"
# Should not crash, but device might not be set if validation fails
# The implementation silently fails, so we just check it doesn't crash
def test_device_properties_sequential_entry_changes(qtbot, mocked_client):
"""Test changing device entry multiple times."""
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
# Set device
swf.x_device_name = "samx"
# Change entry multiple times
swf.x_device_entry = "samx_velocity"
assert swf.x_device_entry == "samx_velocity"
swf.x_device_entry = "samx_setpoint"
assert swf.x_device_entry == "samx_setpoint"
swf.x_device_entry = "samx"
assert swf.x_device_entry == "samx"
def test_device_properties_with_none_values(qtbot, mocked_client):
"""Test that None values are handled as empty strings."""
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
# Device name None should be treated as empty
swf.x_device_name = None
assert swf.x_device_name == ""
# Set a device first
swf.y_device_name = "samy"
# Entry None should not change anything
swf.y_device_entry = None
assert swf.y_device_entry # Should still have validated entry
################################################################################
# ScatterCurveSettings Tests
################################################################################
def test_scatter_curve_settings_accept_changes(qtbot, mocked_client):
"""Test that accept_changes correctly extracts data from widgets and calls plot()."""
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
# Create the settings widget
settings = ScatterCurveSettings(parent=None, target_widget=swf, popup=True)
qtbot.addWidget(settings)
# Set up the widgets with test values
settings.ui.x_name.set_device("samx")
settings.ui.y_name.set_device("samy")
settings.ui.z_name.set_device("bpm4i")
# Mock the plot method to verify it gets called with correct arguments
with patch.object(swf, "plot") as mock_plot:
settings.accept_changes()
# Verify plot was called
mock_plot.assert_called_once()
# Get the call arguments
call_kwargs = mock_plot.call_args[1]
# Verify device names were extracted correctly
assert call_kwargs["x_name"] == "samx"
assert call_kwargs["y_name"] == "samy"
assert call_kwargs["z_name"] == "bpm4i"
def test_scatter_curve_settings_accept_changes_with_entries(qtbot, mocked_client):
"""Test that accept_changes correctly extracts signal entries from SignalComboBox."""
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
# Create the settings widget
settings = ScatterCurveSettings(parent=None, target_widget=swf, popup=True)
qtbot.addWidget(settings)
# Set devices first to populate signal comboboxes
settings.ui.x_name.set_device("samx")
settings.ui.y_name.set_device("samy")
settings.ui.z_name.set_device("bpm4i")
qtbot.wait(100) # Allow time for signals to populate
# Mock the plot method
with patch.object(swf, "plot") as mock_plot:
settings.accept_changes()
mock_plot.assert_called_once()
call_kwargs = mock_plot.call_args[1]
# Verify entries are extracted (will use get_signal_name())
assert "x_entry" in call_kwargs
assert "y_entry" in call_kwargs
assert "z_entry" in call_kwargs
def test_scatter_curve_settings_accept_changes_color_map(qtbot, mocked_client):
"""Test that accept_changes correctly extracts color_map from widget."""
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
# Create the settings widget
settings = ScatterCurveSettings(parent=None, target_widget=swf, popup=True)
qtbot.addWidget(settings)
# Set devices
settings.ui.x_name.set_device("samx")
settings.ui.y_name.set_device("samy")
settings.ui.z_name.set_device("bpm4i")
# Get the current colormap
color_map = settings.ui.color_map.colormap
with patch.object(swf, "plot") as mock_plot:
settings.accept_changes()
call_kwargs = mock_plot.call_args[1]
assert call_kwargs["color_map"] == color_map
def test_scatter_curve_settings_fetch_all_properties(qtbot, mocked_client):
"""Test that fetch_all_properties correctly populates the settings from target widget."""
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
# First set up the scatter waveform with some data
swf.plot(x_name="samx", y_name="samy", z_name="bpm4i")
# Create the settings widget - it should fetch properties automatically
settings = ScatterCurveSettings(parent=None, target_widget=swf, popup=True)
qtbot.addWidget(settings)
# Verify the settings widget has fetched the values
assert settings.ui.x_name.currentText() == "samx"
assert settings.ui.y_name.currentText() == "samy"
assert settings.ui.z_name.currentText() == "bpm4i"

View File

@@ -55,20 +55,16 @@ def test_script_tree_hover_events(script_tree, qtbot):
# Send the event to the viewport (the event filter is installed on the viewport)
script_tree.eventFilter(viewport, mouse_event)
qtbot.wait(100) # Allow time for the hover to be processed
# Now, the hover index should be set to the first item
assert script_tree.delegate.hovered_index.isValid() == True
qtbot.waitUntil(lambda: script_tree.delegate.hovered_index.isValid(), timeout=5000)
assert script_tree.delegate.hovered_index.row() == index.row()
# Simulate mouse leaving the viewport
leave_event = QEvent(QEvent.Type.Leave)
script_tree.eventFilter(viewport, leave_event)
qtbot.wait(100) # Allow time for the leave event to be processed
# After leaving, no item should be hovered
assert script_tree.delegate.hovered_index.isValid() == False
qtbot.waitUntil(lambda: not script_tree.delegate.hovered_index.isValid(), timeout=5000)
@pytest.mark.timeout(10)

View File

@@ -1,25 +1,69 @@
from unittest import mock
import pytest
from qtpy.QtCore import Qt
from qtpy.QtGui import QHideEvent
from qtpy.QtNetwork import QAuthenticator
from bec_widgets.widgets.editors.web_console.web_console import WebConsole, _web_console_registry
from bec_widgets.widgets.editors.web_console.web_console import (
BECShell,
ConsoleMode,
WebConsole,
_web_console_registry,
)
from .client_mocks import mocked_client
@pytest.fixture
def console_widget(qtbot, mocked_client):
def mocked_server_startup():
"""Mock the web console server startup process."""
with mock.patch(
"bec_widgets.widgets.editors.web_console.web_console.subprocess"
) as mock_subprocess:
with mock.patch.object(_web_console_registry, "_wait_for_server_port"):
_web_console_registry._server_port = 12345
# Create the WebConsole widget
widget = WebConsole(client=mocked_client)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
yield mock_subprocess
def static_console(qtbot, client, unique_id: str | None = None):
"""Fixture to provide a static unique_id for WebConsole tests."""
if unique_id is None:
widget = WebConsole(client=client)
else:
widget = WebConsole(client=client, unique_id=unique_id)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
return widget
@pytest.fixture
def console_widget(qtbot, mocked_client, mocked_server_startup):
"""Create a WebConsole widget with mocked server startup."""
yield static_console(qtbot, mocked_client)
@pytest.fixture
def bec_shell_widget(qtbot, mocked_client, mocked_server_startup):
"""Create a BECShell widget with mocked server startup."""
widget = BECShell(client=mocked_client)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
@pytest.fixture
def console_widget_with_static_id(qtbot, mocked_client, mocked_server_startup):
"""Create a WebConsole widget with a static unique ID."""
yield static_console(qtbot, mocked_client, unique_id="test_console")
@pytest.fixture
def two_console_widgets_same_id(qtbot, mocked_client, mocked_server_startup):
"""Create two WebConsole widgets sharing the same unique ID."""
widget1 = static_console(qtbot, mocked_client, unique_id="shared_console")
widget2 = static_console(qtbot, mocked_client, unique_id="shared_console")
yield widget1, widget2
def test_web_console_widget_initialization(console_widget):
@@ -34,7 +78,7 @@ def test_web_console_write(console_widget):
with mock.patch.object(console_widget.page, "runJavaScript") as mock_run_js:
console_widget.write("Hello, World!")
assert mock.call("window.term.paste('Hello, World!');") in mock_run_js.mock_calls
assert mock.call('window.term.paste("Hello, World!");') in mock_run_js.mock_calls
def test_web_console_write_no_return(console_widget):
@@ -42,7 +86,7 @@ def test_web_console_write_no_return(console_widget):
with mock.patch.object(console_widget.page, "runJavaScript") as mock_run_js:
console_widget.write("Hello, World!", send_return=False)
assert mock.call("window.term.paste('Hello, World!');") in mock_run_js.mock_calls
assert mock.call('window.term.paste("Hello, World!");') in mock_run_js.mock_calls
assert mock_run_js.call_count == 1
@@ -138,6 +182,20 @@ def test_web_console_startup_command_execution(console_widget, qtbot):
assert not console_widget._startup_timer.isActive()
def test_bec_shell_startup_contains_gui_id(bec_shell_widget):
"""Test that the BEC shell startup command includes the GUI ID."""
bec_shell = bec_shell_widget
assert bec_shell._is_bec_shell
assert bec_shell._unique_id == "bec_shell"
assert bec_shell.startup_cmd == "bec --nogui"
with mock.patch.object(bec_shell.bec_dispatcher, "cli_server") as mock_cli_server:
mock_cli_server.gui_id = "test_gui_id"
assert bec_shell.startup_cmd == "bec --gui-id test_gui_id"
def test_web_console_set_readonly(console_widget):
# Test the set_readonly method
console_widget.set_readonly(True)
@@ -145,3 +203,274 @@ def test_web_console_set_readonly(console_widget):
console_widget.set_readonly(False)
assert console_widget.isEnabled()
def test_web_console_with_unique_id(console_widget_with_static_id):
"""Test creating a WebConsole with a unique_id."""
widget = console_widget_with_static_id
assert widget._unique_id == "test_console"
assert widget._unique_id in _web_console_registry._page_registry
page_info = _web_console_registry.get_page_info("test_console")
assert page_info is not None
assert page_info.owner_gui_id == widget.gui_id
assert widget.gui_id in page_info.widget_ids
def test_web_console_page_sharing(two_console_widgets_same_id):
"""Test that two widgets can share the same page using unique_id."""
widget1, widget2 = two_console_widgets_same_id
# Both should reference the same page in the registry
page_info = _web_console_registry.get_page_info("shared_console")
assert page_info is not None
assert widget1.gui_id in page_info.widget_ids
assert widget2.gui_id in page_info.widget_ids
assert widget1.page == widget2.page
def test_web_console_has_ownership(console_widget_with_static_id):
"""Test the has_ownership method."""
widget = console_widget_with_static_id
# Widget should have ownership by default
assert widget.has_ownership()
def test_web_console_yield_ownership(console_widget_with_static_id):
"""Test yielding ownership of a page."""
widget = console_widget_with_static_id
assert widget.has_ownership()
# Yield ownership
widget.yield_ownership()
# Widget should no longer have ownership
assert not widget.has_ownership()
page_info = _web_console_registry.get_page_info("test_console")
assert page_info.owner_gui_id is None
# Overlay should be shown
assert widget._mode == ConsoleMode.INACTIVE
def test_web_console_take_page_ownership(two_console_widgets_same_id):
"""Test taking ownership of a page."""
widget1, widget2 = two_console_widgets_same_id
# Widget1 should have ownership initially
assert widget1.has_ownership()
assert not widget2.has_ownership()
# Widget2 takes ownership
widget2.take_page_ownership()
# Now widget2 should have ownership
assert not widget1.has_ownership()
assert widget2.has_ownership()
assert widget2._mode == ConsoleMode.ACTIVE
assert widget1._mode == ConsoleMode.INACTIVE
def test_web_console_hide_event_yields_ownership(qtbot, console_widget_with_static_id):
"""Test that hideEvent yields ownership."""
widget = console_widget_with_static_id
assert widget.has_ownership()
# Hide the widget. Note that we cannot call widget.hide() directly
# because it doesn't trigger the hideEvent in tests as widgets are
# not visible in the test environment.
widget.hideEvent(QHideEvent())
qtbot.wait(100) # Allow event processing
# Widget should have yielded ownership
assert not widget.has_ownership()
page_info = _web_console_registry.get_page_info("test_console")
assert page_info.owner_gui_id is None
def test_web_console_show_event_takes_ownership(console_widget_with_static_id):
"""Test that showEvent takes ownership when page has no owner."""
widget = console_widget_with_static_id
# Yield ownership
widget.yield_ownership()
assert not widget.has_ownership()
# Show the widget again
widget.show()
# Widget should have reclaimed ownership
assert widget.has_ownership()
assert widget.browser.isVisible()
assert not widget.overlay.isVisible()
def test_web_console_mouse_press_takes_ownership(qtbot, two_console_widgets_same_id):
"""Test that clicking on overlay takes ownership."""
widget1, widget2 = two_console_widgets_same_id
widget1.show()
widget2.show()
# Widget1 has ownership, widget2 doesn't
assert widget1.has_ownership()
assert not widget2.has_ownership()
assert widget1.isVisible()
assert widget1._mode == ConsoleMode.ACTIVE
assert widget2._mode == ConsoleMode.INACTIVE
qtbot.mouseClick(widget2, Qt.MouseButton.LeftButton)
# Widget2 should now have ownership
assert widget2.has_ownership()
assert not widget1.has_ownership()
def test_web_console_registry_cleanup_removes_page(console_widget_with_static_id):
"""Test that the registry cleans up pages when all widgets are removed."""
widget = console_widget_with_static_id
assert widget._unique_id in _web_console_registry._page_registry
# Cleanup the widget
widget.cleanup()
# Page should be removed from registry
assert widget._unique_id not in _web_console_registry._page_registry
def test_web_console_without_unique_id_no_page_sharing(console_widget):
"""Test that widgets without unique_id don't participate in page sharing."""
widget = console_widget
# Widget should not be in the page registry
assert widget._unique_id is None
assert not widget.has_ownership() # Should return False for non-unique widgets
def test_web_console_registry_get_page_info_nonexistent(qtbot, mocked_client):
"""Test getting page info for a non-existent page."""
page_info = _web_console_registry.get_page_info("nonexistent")
assert page_info is None
def test_web_console_take_ownership_without_unique_id(console_widget):
"""Test that take_page_ownership fails gracefully without unique_id."""
widget = console_widget
# Should not crash when taking ownership without unique_id
widget.take_page_ownership()
def test_web_console_yield_ownership_without_unique_id(console_widget):
"""Test that yield_ownership fails gracefully without unique_id."""
widget = console_widget
# Should not crash when yielding ownership without unique_id
widget.yield_ownership()
def test_registry_yield_ownership_gui_id_not_in_instances():
"""Test registry yield_ownership returns False when gui_id not in instances."""
result = _web_console_registry.yield_ownership("nonexistent_gui_id")
assert result is False
def test_registry_yield_ownership_instance_is_none(console_widget_with_static_id):
"""Test registry yield_ownership returns False when instance weakref is dead."""
widget = console_widget_with_static_id
gui_id = widget.gui_id
# Store the gui_id and simulate the weakref being dead
_web_console_registry._instances[gui_id] = lambda: None
result = _web_console_registry.yield_ownership(gui_id)
assert result is False
def test_registry_yield_ownership_unique_id_none(console_widget_with_static_id):
"""Test registry yield_ownership returns False when page info's unique_id is None."""
widget = console_widget_with_static_id
gui_id = widget.gui_id
unique_id = widget._unique_id
widget._unique_id = None
result = _web_console_registry.yield_ownership(gui_id)
assert result is False
widget._unique_id = unique_id # Restore for cleanup
def test_registry_yield_ownership_unique_id_not_in_page_registry(console_widget_with_static_id):
"""Test registry yield_ownership returns False when unique_id not in page registry."""
widget = console_widget_with_static_id
gui_id = widget.gui_id
unique_id = widget._unique_id
widget._unique_id = "nonexistent_unique_id"
result = _web_console_registry.yield_ownership(gui_id)
assert result is False
widget._unique_id = unique_id # Restore for cleanup
def test_registry_owner_is_visible_page_info_none():
"""Test owner_is_visible returns False when page info doesn't exist."""
result = _web_console_registry.owner_is_visible("nonexistent_page")
assert result is False
def test_registry_owner_is_visible_no_owner(console_widget_with_static_id):
"""Test owner_is_visible returns False when page has no owner."""
widget = console_widget_with_static_id
# Yield ownership so there's no owner
widget.yield_ownership()
page_info = _web_console_registry.get_page_info(widget._unique_id)
assert page_info.owner_gui_id is None
result = _web_console_registry.owner_is_visible(widget._unique_id)
assert result is False
def test_registry_owner_is_visible_owner_ref_none(console_widget_with_static_id):
"""Test owner_is_visible returns False when owner ref doesn't exist in instances."""
widget = console_widget_with_static_id
unique_id = widget._unique_id
# Remove owner from instances dict
del _web_console_registry._instances[widget.gui_id]
result = _web_console_registry.owner_is_visible(unique_id)
assert result is False
def test_registry_owner_is_visible_owner_instance_none(console_widget_with_static_id):
"""Test owner_is_visible returns False when owner instance weakref is dead."""
widget = console_widget_with_static_id
unique_id = widget._unique_id
gui_id = widget.gui_id
# Simulate dead weakref
_web_console_registry._instances[gui_id] = lambda: None
result = _web_console_registry.owner_is_visible(unique_id)
assert result is False
def test_registry_owner_is_visible_owner_visible(console_widget_with_static_id):
"""Test owner_is_visible returns True when owner is visible."""
widget = console_widget_with_static_id
widget.show()
result = _web_console_registry.owner_is_visible(widget._unique_id)
assert result is True
def test_registry_owner_is_visible_owner_not_visible(console_widget_with_static_id):
"""Test owner_is_visible returns False when owner is not visible."""
widget = console_widget_with_static_id
widget.hide()
result = _web_console_registry.owner_is_visible(widget._unique_id)
assert result is False