mirror of
https://github.com/bec-project/bec_widgets.git
synced 2026-04-08 01:37:53 +02:00
Compare commits
11 Commits
feat/manag
...
fix/image-
| Author | SHA1 | Date | |
|---|---|---|---|
| 952be8f6fd | |||
| 3bd33b93cd | |||
| 3e4c5e9ab1 | |||
| 48e2a97ece | |||
| 953760c828 | |||
| dc3129357b | |||
| 12746ae4aa | |||
| 7e9cc20e59 | |||
| 5209f4c210 | |||
| 5f30ab5aa2 | |||
| 3926c5c947 |
17
.github/workflows/end2end-conda.yml
vendored
17
.github/workflows/end2end-conda.yml
vendored
@@ -9,10 +9,10 @@ jobs:
|
||||
shell: bash -el {0}
|
||||
|
||||
env:
|
||||
CHILD_PIPELINE_BRANCH: main # Set the branch you want for ophyd_devices
|
||||
BEC_CORE_BRANCH: main # Set the branch you want for bec
|
||||
OPHYD_DEVICES_BRANCH: main # Set the branch you want for ophyd_devices
|
||||
PLUGIN_REPO_BRANCH: main # Set the branch you want for the plugin repo
|
||||
CHILD_PIPELINE_BRANCH: main # Set the branch you want for ophyd_devices
|
||||
BEC_CORE_BRANCH: main # Set the branch you want for bec
|
||||
OPHYD_DEVICES_BRANCH: main # Set the branch you want for ophyd_devices
|
||||
PLUGIN_REPO_BRANCH: main # Set the branch you want for the plugin repo
|
||||
PROJECT_PATH: ${{ github.repository }}
|
||||
QTWEBENGINE_DISABLE_SANDBOX: 1
|
||||
QT_QPA_PLATFORM: "offscreen"
|
||||
@@ -23,15 +23,16 @@ jobs:
|
||||
- name: Set up Conda
|
||||
uses: conda-incubator/setup-miniconda@v3
|
||||
with:
|
||||
auto-update-conda: true
|
||||
auto-activate-base: true
|
||||
python-version: '3.11'
|
||||
auto-update-conda: true
|
||||
auto-activate-base: true
|
||||
python-version: "3.11"
|
||||
|
||||
- name: Install dependencies
|
||||
run: |
|
||||
sudo apt-get update
|
||||
sudo apt-get install -y libgl1 libegl1 x11-utils libxkbcommon-x11-0 libdbus-1-3 xvfb
|
||||
sudo apt-get -y install libnss3 libxdamage1 libasound2t64 libatomic1 libxcursor1
|
||||
sudo apt-get -y install ttyd
|
||||
|
||||
- name: Conda install and run pytest
|
||||
run: |
|
||||
@@ -55,4 +56,4 @@ jobs:
|
||||
with:
|
||||
name: pytest-logs
|
||||
path: ./logs/*.log
|
||||
retention-days: 7
|
||||
retention-days: 7
|
||||
|
||||
@@ -18,7 +18,7 @@ from bec_widgets.widgets.containers.advanced_dock_area.basic_dock_area import Do
|
||||
from bec_widgets.widgets.containers.qt_ads import CDockWidget
|
||||
from bec_widgets.widgets.editors.monaco.monaco_dock import MonacoDock
|
||||
from bec_widgets.widgets.editors.monaco.monaco_widget import MonacoWidget
|
||||
from bec_widgets.widgets.editors.web_console.web_console import WebConsole
|
||||
from bec_widgets.widgets.editors.web_console.web_console import BECShell, WebConsole
|
||||
from bec_widgets.widgets.utility.ide_explorer.ide_explorer import IDEExplorer
|
||||
|
||||
|
||||
@@ -91,9 +91,10 @@ class DeveloperWidget(DockAreaWidget):
|
||||
# Initialize the widgets
|
||||
self.explorer = IDEExplorer(self)
|
||||
self.explorer.setObjectName("Explorer")
|
||||
self.console = WebConsole(self)
|
||||
self.console.setObjectName("Console")
|
||||
self.terminal = WebConsole(self, startup_cmd="")
|
||||
|
||||
self.console = BECShell(self)
|
||||
self.console.setObjectName("BEC Shell")
|
||||
self.terminal = WebConsole(self)
|
||||
self.terminal.setObjectName("Terminal")
|
||||
self.monaco = MonacoDock(self)
|
||||
self.monaco.setObjectName("MonacoEditor")
|
||||
|
||||
@@ -30,6 +30,7 @@ _Widgets = {
|
||||
"BECMainWindow": "BECMainWindow",
|
||||
"BECProgressBar": "BECProgressBar",
|
||||
"BECQueue": "BECQueue",
|
||||
"BECShell": "BECShell",
|
||||
"BECStatusBox": "BECStatusBox",
|
||||
"DapComboBox": "DapComboBox",
|
||||
"DarkModeButton": "DarkModeButton",
|
||||
@@ -495,6 +496,28 @@ class BECQueue(RPCBase):
|
||||
"""
|
||||
|
||||
|
||||
class BECShell(RPCBase):
|
||||
"""A WebConsole pre-configured to run the BEC shell."""
|
||||
|
||||
@rpc_call
|
||||
def remove(self):
|
||||
"""
|
||||
Cleanup the BECConnector
|
||||
"""
|
||||
|
||||
@rpc_call
|
||||
def attach(self):
|
||||
"""
|
||||
None
|
||||
"""
|
||||
|
||||
@rpc_call
|
||||
def detach(self):
|
||||
"""
|
||||
Detach the widget from its parent dock widget (if widget is in the dock), making it a floating widget.
|
||||
"""
|
||||
|
||||
|
||||
class BECStatusBox(RPCBase):
|
||||
"""An autonomous widget to display the status of BEC services."""
|
||||
|
||||
@@ -2016,6 +2039,90 @@ class Heatmap(RPCBase):
|
||||
reload (bool): Whether to reload the heatmap with new data.
|
||||
"""
|
||||
|
||||
@property
|
||||
@rpc_call
|
||||
def x_device_name(self) -> "str":
|
||||
"""
|
||||
Device name for the X axis.
|
||||
"""
|
||||
|
||||
@x_device_name.setter
|
||||
@rpc_call
|
||||
def x_device_name(self) -> "str":
|
||||
"""
|
||||
Device name for the X axis.
|
||||
"""
|
||||
|
||||
@property
|
||||
@rpc_call
|
||||
def x_device_entry(self) -> "str":
|
||||
"""
|
||||
Signal entry for the X axis device.
|
||||
"""
|
||||
|
||||
@x_device_entry.setter
|
||||
@rpc_call
|
||||
def x_device_entry(self) -> "str":
|
||||
"""
|
||||
Signal entry for the X axis device.
|
||||
"""
|
||||
|
||||
@property
|
||||
@rpc_call
|
||||
def y_device_name(self) -> "str":
|
||||
"""
|
||||
Device name for the Y axis.
|
||||
"""
|
||||
|
||||
@y_device_name.setter
|
||||
@rpc_call
|
||||
def y_device_name(self) -> "str":
|
||||
"""
|
||||
Device name for the Y axis.
|
||||
"""
|
||||
|
||||
@property
|
||||
@rpc_call
|
||||
def y_device_entry(self) -> "str":
|
||||
"""
|
||||
Signal entry for the Y axis device.
|
||||
"""
|
||||
|
||||
@y_device_entry.setter
|
||||
@rpc_call
|
||||
def y_device_entry(self) -> "str":
|
||||
"""
|
||||
Signal entry for the Y axis device.
|
||||
"""
|
||||
|
||||
@property
|
||||
@rpc_call
|
||||
def z_device_name(self) -> "str":
|
||||
"""
|
||||
Device name for the Z (color) axis.
|
||||
"""
|
||||
|
||||
@z_device_name.setter
|
||||
@rpc_call
|
||||
def z_device_name(self) -> "str":
|
||||
"""
|
||||
Device name for the Z (color) axis.
|
||||
"""
|
||||
|
||||
@property
|
||||
@rpc_call
|
||||
def z_device_entry(self) -> "str":
|
||||
"""
|
||||
Signal entry for the Z (color) axis device.
|
||||
"""
|
||||
|
||||
@z_device_entry.setter
|
||||
@rpc_call
|
||||
def z_device_entry(self) -> "str":
|
||||
"""
|
||||
Signal entry for the Z (color) axis device.
|
||||
"""
|
||||
|
||||
|
||||
class Image(RPCBase):
|
||||
"""Image widget for displaying 2D data."""
|
||||
@@ -5265,13 +5372,6 @@ class ScatterWaveform(RPCBase):
|
||||
Take a screenshot of the dock area and save it to a file.
|
||||
"""
|
||||
|
||||
@property
|
||||
@rpc_call
|
||||
def main_curve(self) -> "ScatterCurve":
|
||||
"""
|
||||
The main scatter curve item.
|
||||
"""
|
||||
|
||||
@property
|
||||
@rpc_call
|
||||
def color_map(self) -> "str":
|
||||
@@ -5334,6 +5434,90 @@ class ScatterWaveform(RPCBase):
|
||||
Clear all the curves from the plot.
|
||||
"""
|
||||
|
||||
@property
|
||||
@rpc_call
|
||||
def x_device_name(self) -> "str":
|
||||
"""
|
||||
Device name for the X axis.
|
||||
"""
|
||||
|
||||
@x_device_name.setter
|
||||
@rpc_call
|
||||
def x_device_name(self) -> "str":
|
||||
"""
|
||||
Device name for the X axis.
|
||||
"""
|
||||
|
||||
@property
|
||||
@rpc_call
|
||||
def x_device_entry(self) -> "str":
|
||||
"""
|
||||
Signal entry for the X axis device.
|
||||
"""
|
||||
|
||||
@x_device_entry.setter
|
||||
@rpc_call
|
||||
def x_device_entry(self) -> "str":
|
||||
"""
|
||||
Signal entry for the X axis device.
|
||||
"""
|
||||
|
||||
@property
|
||||
@rpc_call
|
||||
def y_device_name(self) -> "str":
|
||||
"""
|
||||
Device name for the Y axis.
|
||||
"""
|
||||
|
||||
@y_device_name.setter
|
||||
@rpc_call
|
||||
def y_device_name(self) -> "str":
|
||||
"""
|
||||
Device name for the Y axis.
|
||||
"""
|
||||
|
||||
@property
|
||||
@rpc_call
|
||||
def y_device_entry(self) -> "str":
|
||||
"""
|
||||
Signal entry for the Y axis device.
|
||||
"""
|
||||
|
||||
@y_device_entry.setter
|
||||
@rpc_call
|
||||
def y_device_entry(self) -> "str":
|
||||
"""
|
||||
Signal entry for the Y axis device.
|
||||
"""
|
||||
|
||||
@property
|
||||
@rpc_call
|
||||
def z_device_name(self) -> "str":
|
||||
"""
|
||||
Device name for the Z (color) axis.
|
||||
"""
|
||||
|
||||
@z_device_name.setter
|
||||
@rpc_call
|
||||
def z_device_name(self) -> "str":
|
||||
"""
|
||||
Device name for the Z (color) axis.
|
||||
"""
|
||||
|
||||
@property
|
||||
@rpc_call
|
||||
def z_device_entry(self) -> "str":
|
||||
"""
|
||||
Signal entry for the Z (color) axis device.
|
||||
"""
|
||||
|
||||
@z_device_entry.setter
|
||||
@rpc_call
|
||||
def z_device_entry(self) -> "str":
|
||||
"""
|
||||
Signal entry for the Z (color) axis device.
|
||||
"""
|
||||
|
||||
|
||||
class SignalComboBox(RPCBase):
|
||||
"""Line edit widget for device input with autocomplete for device names."""
|
||||
@@ -5366,6 +5550,15 @@ class SignalComboBox(RPCBase):
|
||||
list[str]: List of device signals.
|
||||
"""
|
||||
|
||||
@rpc_call
|
||||
def get_signal_name(self) -> "str":
|
||||
"""
|
||||
Get the signal name from the combobox.
|
||||
|
||||
Returns:
|
||||
str: The signal name.
|
||||
"""
|
||||
|
||||
|
||||
class SignalLabel(RPCBase):
|
||||
@property
|
||||
|
||||
@@ -1,20 +1,16 @@
|
||||
# pylint: disable = no-name-in-module,missing-module-docstring
|
||||
from __future__ import annotations
|
||||
|
||||
import inspect
|
||||
import os
|
||||
import time
|
||||
import traceback
|
||||
import uuid
|
||||
import weakref
|
||||
from datetime import datetime
|
||||
from typing import TYPE_CHECKING, Callable, Final, Optional
|
||||
from weakref import WeakMethod, WeakValueDictionary
|
||||
from typing import TYPE_CHECKING, Optional
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_lib.utils.import_utils import lazy_import_from
|
||||
from louie import saferef
|
||||
from pydantic import BaseModel, ConfigDict, Field, field_validator
|
||||
from pydantic import BaseModel, Field, field_validator
|
||||
from qtpy.QtCore import QObject, QRunnable, QThreadPool, QTimer, Signal
|
||||
from qtpy.QtWidgets import QApplication
|
||||
|
||||
@@ -33,12 +29,6 @@ else:
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class _NextSentinel(object): ...
|
||||
|
||||
|
||||
NEXT_SENTINEL: Final[_NextSentinel] = _NextSentinel()
|
||||
|
||||
|
||||
class ConnectionConfig(BaseModel):
|
||||
"""Configuration for BECConnector mixin class"""
|
||||
|
||||
@@ -46,7 +36,7 @@ class ConnectionConfig(BaseModel):
|
||||
gui_id: Optional[str] = Field(
|
||||
default=None, validate_default=True, description="The GUI ID of the widget."
|
||||
)
|
||||
model_config: ConfigDict = {"validate_assignment": True}
|
||||
model_config: dict = {"validate_assignment": True}
|
||||
|
||||
@field_validator("gui_id")
|
||||
@classmethod
|
||||
@@ -87,9 +77,7 @@ class BECConnector:
|
||||
"""Connection mixin class to handle BEC client and device manager"""
|
||||
|
||||
USER_ACCESS = ["_config_dict", "_get_all_rpc", "_rpc_id"]
|
||||
EXIT_HANDLERS: WeakValueDictionary[int, Callable[[],]] = WeakValueDictionary()
|
||||
_exit_handler: Callable[[],] | None = None
|
||||
_method_handlers: set[Callable[[],]] = set()
|
||||
EXIT_HANDLERS = {}
|
||||
widget_removed = Signal()
|
||||
name_established = Signal(str)
|
||||
|
||||
@@ -134,7 +122,7 @@ class BECConnector:
|
||||
self.client = self.bec_dispatcher.client if client is None else client
|
||||
self.rpc_register = RPCRegister()
|
||||
|
||||
if not 0 in BECConnector.EXIT_HANDLERS.keys():
|
||||
if not self.client in BECConnector.EXIT_HANDLERS:
|
||||
# register function to clean connections at exit;
|
||||
# the function depends on BECClient, and BECDispatcher
|
||||
@SafeSlot()
|
||||
@@ -155,9 +143,8 @@ class BECConnector:
|
||||
logger.info("Shutting down BEC Client", repr(client))
|
||||
client.shutdown()
|
||||
|
||||
BECConnector._exit_handler = terminate # type: ignore # keep a strong reference to the final cleanup
|
||||
BECConnector._add_exit_handler(terminate, 0)
|
||||
QApplication.instance().aboutToQuit.connect(self._run_exit_handlers)
|
||||
BECConnector.EXIT_HANDLERS[self.client] = terminate
|
||||
QApplication.instance().aboutToQuit.connect(terminate)
|
||||
|
||||
if config:
|
||||
self.config = config
|
||||
@@ -200,51 +187,6 @@ class BECConnector:
|
||||
|
||||
QTimer.singleShot(0, self._update_object_name)
|
||||
|
||||
@classmethod
|
||||
def _add_exit_handler(cls, handler: Callable, priority: int):
|
||||
"""Private to allow use of priority 0"""
|
||||
if inspect.ismethod(handler):
|
||||
_h = saferef.safe_ref(handler)
|
||||
|
||||
def handler():
|
||||
if h := _h():
|
||||
h()
|
||||
|
||||
# cls._method_handlers.add(handler) # hold any instance methods in safe refs
|
||||
|
||||
cls.EXIT_HANDLERS[priority] = handler
|
||||
|
||||
@classmethod
|
||||
def add_exit_handler(cls, handler: Callable, priority: int | _NextSentinel = NEXT_SENTINEL):
|
||||
"""Add a handler to be called on the cleanup of the BEC Connector. Handlers are called in reverse order of their
|
||||
priority - i.e. a higher number is higher priority. The BEC Connector's own cleanup will always be run last.
|
||||
"""
|
||||
existing_priorities = set(cls.EXIT_HANDLERS.keys())
|
||||
priority_modified = False
|
||||
if isinstance(priority, _NextSentinel):
|
||||
priority = max(existing_priorities) + 1
|
||||
if priority < 1:
|
||||
raise ValueError(
|
||||
"Please use a priority greater than 1! Priority 0 is reserved for system cleanup."
|
||||
)
|
||||
if priority in cls.EXIT_HANDLERS.keys():
|
||||
priority_modified = True
|
||||
logger.warning(f"Priority {priority} already in use - using the next available:")
|
||||
while priority in cls.EXIT_HANDLERS.keys():
|
||||
priority += 1
|
||||
if priority_modified:
|
||||
logger.warning(f"Assigned priority {priority} for {handler}.")
|
||||
cls._add_exit_handler(handler, priority)
|
||||
|
||||
@SafeSlot()
|
||||
def _run_exit_handlers(self):
|
||||
"""Run all exit handlers from highest to lowest priority. Should be connected to AboutToQuit once and only once."""
|
||||
handlers = list(
|
||||
reversed(list(handler for _, handler in sorted(BECConnector.EXIT_HANDLERS.items())))
|
||||
)
|
||||
for handler in handlers:
|
||||
handler()
|
||||
|
||||
@property
|
||||
def parent_id(self) -> str | None:
|
||||
try:
|
||||
|
||||
@@ -70,7 +70,7 @@ from bec_widgets.widgets.containers.main_window.main_window import BECMainWindow
|
||||
from bec_widgets.widgets.containers.qt_ads import CDockWidget
|
||||
from bec_widgets.widgets.control.device_control.positioner_box import PositionerBox, PositionerBox2D
|
||||
from bec_widgets.widgets.control.scan_control import ScanControl
|
||||
from bec_widgets.widgets.editors.web_console.web_console import WebConsole
|
||||
from bec_widgets.widgets.editors.web_console.web_console import BECShell, WebConsole
|
||||
from bec_widgets.widgets.plots.heatmap.heatmap import Heatmap
|
||||
from bec_widgets.widgets.plots.image.image import Image
|
||||
from bec_widgets.widgets.plots.motor_map.motor_map import MotorMap
|
||||
@@ -378,7 +378,7 @@ class AdvancedDockArea(DockAreaWidget):
|
||||
"RingProgressBar",
|
||||
),
|
||||
"terminal": (WebConsole.ICON_NAME, "Add Terminal", "WebConsole"),
|
||||
"bec_shell": (WebConsole.ICON_NAME, "Add BEC Shell", "WebConsole"),
|
||||
"bec_shell": (BECShell.ICON_NAME, "Add BEC Shell", "BECShell"),
|
||||
"log_panel": (LogPanel.ICON_NAME, "Add LogPanel - Disabled", "LogPanel"),
|
||||
"sbb_monitor": ("train", "Add SBB Monitor", "SBBMonitor"),
|
||||
}
|
||||
@@ -501,10 +501,7 @@ class AdvancedDockArea(DockAreaWidget):
|
||||
elif key == "bec_shell":
|
||||
act.triggered.connect(
|
||||
lambda _, t=widget_type: self.new(
|
||||
widget=t,
|
||||
closable=True,
|
||||
startup_cmd=f"bec --gui-id {self.bec_dispatcher.cli_server.gui_id}",
|
||||
show_settings_action=True,
|
||||
widget=t, closable=True, show_settings_action=False
|
||||
)
|
||||
)
|
||||
else:
|
||||
|
||||
@@ -27,7 +27,7 @@ class SignalComboBox(DeviceSignalInputBase, QComboBox):
|
||||
arg_name: Argument name, can be used for the other widgets which has to call some other function in bec using correct argument names.
|
||||
"""
|
||||
|
||||
USER_ACCESS = ["set_signal", "set_device", "signals"]
|
||||
USER_ACCESS = ["set_signal", "set_device", "signals", "get_signal_name"]
|
||||
|
||||
ICON_NAME = "list_alt"
|
||||
PLUGIN = True
|
||||
@@ -148,6 +148,24 @@ class SignalComboBox(DeviceSignalInputBase, QComboBox):
|
||||
return True
|
||||
return False
|
||||
|
||||
def get_signal_name(self) -> str:
|
||||
"""
|
||||
Get the signal name from the combobox.
|
||||
|
||||
Returns:
|
||||
str: The signal name.
|
||||
"""
|
||||
signal_name = self.currentText()
|
||||
index = self.findText(signal_name)
|
||||
if index == -1:
|
||||
return signal_name
|
||||
|
||||
signal_info = self.itemData(index)
|
||||
if signal_info:
|
||||
signal_name = signal_info.get("obj_name", signal_name)
|
||||
|
||||
return signal_name if signal_name else ""
|
||||
|
||||
@SafeSlot()
|
||||
def reset_selection(self):
|
||||
"""Reset the selection of the combobox."""
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
{'files': ['web_console.py']}
|
||||
57
bec_widgets/widgets/editors/web_console/bec_shell_plugin.py
Normal file
57
bec_widgets/widgets/editors/web_console/bec_shell_plugin.py
Normal file
@@ -0,0 +1,57 @@
|
||||
# Copyright (C) 2022 The Qt Company Ltd.
|
||||
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
|
||||
|
||||
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
|
||||
from qtpy.QtWidgets import QWidget
|
||||
|
||||
from bec_widgets.utils.bec_designer import designer_material_icon
|
||||
from bec_widgets.widgets.editors.web_console.web_console import BECShell
|
||||
|
||||
DOM_XML = """
|
||||
<ui language='c++'>
|
||||
<widget class='BECShell' name='bec_shell'>
|
||||
</widget>
|
||||
</ui>
|
||||
"""
|
||||
|
||||
|
||||
class BECShellPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._form_editor = None
|
||||
|
||||
def createWidget(self, parent):
|
||||
if parent is None:
|
||||
return QWidget()
|
||||
t = BECShell(parent)
|
||||
return t
|
||||
|
||||
def domXml(self):
|
||||
return DOM_XML
|
||||
|
||||
def group(self):
|
||||
return ""
|
||||
|
||||
def icon(self):
|
||||
return designer_material_icon(BECShell.ICON_NAME)
|
||||
|
||||
def includeFile(self):
|
||||
return "bec_shell"
|
||||
|
||||
def initialize(self, form_editor):
|
||||
self._form_editor = form_editor
|
||||
|
||||
def isContainer(self):
|
||||
return False
|
||||
|
||||
def isInitialized(self):
|
||||
return self._form_editor is not None
|
||||
|
||||
def name(self):
|
||||
return "BECShell"
|
||||
|
||||
def toolTip(self):
|
||||
return ""
|
||||
|
||||
def whatsThis(self):
|
||||
return self.toolTip()
|
||||
@@ -0,0 +1,15 @@
|
||||
def main(): # pragma: no cover
|
||||
from qtpy import PYSIDE6
|
||||
|
||||
if not PYSIDE6:
|
||||
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
|
||||
return
|
||||
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
|
||||
|
||||
from bec_widgets.widgets.editors.web_console.bec_shell_plugin import BECShellPlugin
|
||||
|
||||
QPyDesignerCustomWidgetCollection.addCustomWidget(BECShellPlugin())
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
main()
|
||||
@@ -1,21 +1,39 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import enum
|
||||
import json
|
||||
import secrets
|
||||
import subprocess
|
||||
import time
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from louie.saferef import safe_ref
|
||||
from qtpy.QtCore import QTimer, QUrl, Signal, qInstallMessageHandler
|
||||
from pydantic import BaseModel
|
||||
from qtpy.QtCore import Qt, QTimer, QUrl, Signal, qInstallMessageHandler
|
||||
from qtpy.QtGui import QMouseEvent, QResizeEvent
|
||||
from qtpy.QtWebEngineWidgets import QWebEnginePage, QWebEngineView
|
||||
from qtpy.QtWidgets import QApplication, QVBoxLayout, QWidget
|
||||
from qtpy.QtWidgets import QApplication, QLabel, QTabWidget, QVBoxLayout, QWidget
|
||||
|
||||
from bec_widgets.utils.bec_widget import BECWidget
|
||||
from bec_widgets.utils.error_popups import SafeProperty
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class ConsoleMode(str, enum.Enum):
|
||||
ACTIVE = "active"
|
||||
INACTIVE = "inactive"
|
||||
HIDDEN = "hidden"
|
||||
|
||||
|
||||
class PageOwnerInfo(BaseModel):
|
||||
owner_gui_id: str | None = None
|
||||
widget_ids: list[str] = []
|
||||
page: QWebEnginePage | None = None
|
||||
initialized: bool = False
|
||||
|
||||
model_config = {"arbitrary_types_allowed": True}
|
||||
|
||||
|
||||
class WebConsoleRegistry:
|
||||
"""
|
||||
A registry for the WebConsole class to manage its instances.
|
||||
@@ -29,14 +47,21 @@ class WebConsoleRegistry:
|
||||
self._server_process = None
|
||||
self._server_port = None
|
||||
self._token = secrets.token_hex(16)
|
||||
self._page_registry: dict[str, PageOwnerInfo] = {}
|
||||
|
||||
def register(self, instance: WebConsole):
|
||||
"""
|
||||
Register an instance of WebConsole.
|
||||
|
||||
Args:
|
||||
instance (WebConsole): The instance to register.
|
||||
"""
|
||||
self._instances[instance.gui_id] = safe_ref(instance)
|
||||
self.cleanup()
|
||||
|
||||
if instance._unique_id:
|
||||
self._register_page(instance)
|
||||
|
||||
if self._server_process is None:
|
||||
# Start the ttyd server if not already running
|
||||
self.start_ttyd()
|
||||
@@ -141,8 +166,158 @@ class WebConsoleRegistry:
|
||||
if instance.gui_id in self._instances:
|
||||
del self._instances[instance.gui_id]
|
||||
|
||||
if instance._unique_id:
|
||||
self._unregister_page(instance._unique_id, instance.gui_id)
|
||||
|
||||
self.cleanup()
|
||||
|
||||
def _register_page(self, instance: WebConsole):
|
||||
"""
|
||||
Register a page in the registry. Please note that this does not transfer ownership
|
||||
for already existing pages; it simply records which widget currently owns the page.
|
||||
Use transfer_page_ownership to change ownership.
|
||||
|
||||
Args:
|
||||
instance (WebConsole): The instance to register.
|
||||
"""
|
||||
|
||||
unique_id = instance._unique_id
|
||||
gui_id = instance.gui_id
|
||||
|
||||
if unique_id is None:
|
||||
return
|
||||
|
||||
if unique_id not in self._page_registry:
|
||||
page = BECWebEnginePage()
|
||||
page.authenticationRequired.connect(instance._authenticate)
|
||||
self._page_registry[unique_id] = PageOwnerInfo(
|
||||
owner_gui_id=gui_id, widget_ids=[gui_id], page=page
|
||||
)
|
||||
logger.info(f"Registered new page {unique_id} for {gui_id}")
|
||||
return
|
||||
|
||||
if gui_id not in self._page_registry[unique_id].widget_ids:
|
||||
self._page_registry[unique_id].widget_ids.append(gui_id)
|
||||
|
||||
def _unregister_page(self, unique_id: str, gui_id: str):
|
||||
"""
|
||||
Unregister a page from the registry.
|
||||
|
||||
Args:
|
||||
unique_id (str): The unique identifier for the page.
|
||||
gui_id (str): The GUI ID of the widget.
|
||||
"""
|
||||
if unique_id not in self._page_registry:
|
||||
return
|
||||
page_info = self._page_registry[unique_id]
|
||||
if gui_id in page_info.widget_ids:
|
||||
page_info.widget_ids.remove(gui_id)
|
||||
if page_info.owner_gui_id == gui_id:
|
||||
page_info.owner_gui_id = None
|
||||
if not page_info.widget_ids:
|
||||
if page_info.page:
|
||||
page_info.page.deleteLater()
|
||||
del self._page_registry[unique_id]
|
||||
|
||||
logger.info(f"Unregistered page {unique_id} for {gui_id}")
|
||||
|
||||
def get_page_info(self, unique_id: str) -> PageOwnerInfo | None:
|
||||
"""
|
||||
Get a page from the registry.
|
||||
|
||||
Args:
|
||||
unique_id (str): The unique identifier for the page.
|
||||
|
||||
Returns:
|
||||
PageOwnerInfo | None: The page info if found, None otherwise.
|
||||
"""
|
||||
if unique_id not in self._page_registry:
|
||||
return None
|
||||
return self._page_registry[unique_id]
|
||||
|
||||
def take_page_ownership(self, unique_id: str, new_owner_gui_id: str) -> QWebEnginePage | None:
|
||||
"""
|
||||
Transfer ownership of a page to a new owner.
|
||||
|
||||
Args:
|
||||
unique_id (str): The unique identifier for the page.
|
||||
new_owner_gui_id (str): The GUI ID of the new owner.
|
||||
|
||||
Returns:
|
||||
QWebEnginePage | None: The page if ownership transfer was successful, None otherwise.
|
||||
"""
|
||||
if unique_id not in self._page_registry:
|
||||
logger.warning(f"Page {unique_id} not found in registry")
|
||||
return None
|
||||
|
||||
page_info = self._page_registry[unique_id]
|
||||
old_owner_gui_id = page_info.owner_gui_id
|
||||
if old_owner_gui_id:
|
||||
old_owner_ref = self._instances.get(old_owner_gui_id)
|
||||
if old_owner_ref:
|
||||
old_owner_instance = old_owner_ref()
|
||||
if old_owner_instance:
|
||||
old_owner_instance.yield_ownership()
|
||||
page_info.owner_gui_id = new_owner_gui_id
|
||||
|
||||
logger.info(f"Transferred ownership of page {unique_id} to {new_owner_gui_id}")
|
||||
return page_info.page
|
||||
|
||||
def yield_ownership(self, gui_id: str) -> bool:
|
||||
"""
|
||||
Yield ownership of a page without destroying it. The page remains in the
|
||||
registry with no owner, available for another widget to claim.
|
||||
|
||||
Args:
|
||||
gui_id (str): The GUI ID of the widget yielding ownership.
|
||||
|
||||
Returns:
|
||||
bool: True if ownership was yielded, False otherwise.
|
||||
"""
|
||||
if gui_id not in self._instances:
|
||||
return False
|
||||
|
||||
instance = self._instances[gui_id]()
|
||||
if instance is None:
|
||||
return False
|
||||
|
||||
unique_id = instance._unique_id
|
||||
if unique_id is None:
|
||||
return False
|
||||
|
||||
if unique_id not in self._page_registry:
|
||||
return False
|
||||
|
||||
page_owner_info = self._page_registry[unique_id]
|
||||
if page_owner_info.owner_gui_id != gui_id:
|
||||
return False
|
||||
|
||||
page_owner_info.owner_gui_id = None
|
||||
return True
|
||||
|
||||
def owner_is_visible(self, unique_id: str) -> bool:
|
||||
"""
|
||||
Check if the owner of a page is currently visible.
|
||||
|
||||
Args:
|
||||
unique_id (str): The unique identifier for the page.
|
||||
Returns:
|
||||
bool: True if the owner is visible, False otherwise.
|
||||
"""
|
||||
page_info = self.get_page_info(unique_id)
|
||||
if page_info is None or page_info.owner_gui_id is None:
|
||||
return False
|
||||
|
||||
owner_ref = self._instances.get(page_info.owner_gui_id)
|
||||
if owner_ref is None:
|
||||
return False
|
||||
|
||||
owner_instance = owner_ref()
|
||||
if owner_instance is None:
|
||||
return False
|
||||
|
||||
return owner_instance.isVisible()
|
||||
|
||||
|
||||
_web_console_registry = WebConsoleRegistry()
|
||||
|
||||
@@ -178,34 +353,103 @@ class WebConsole(BECWidget, QWidget):
|
||||
config=None,
|
||||
client=None,
|
||||
gui_id=None,
|
||||
startup_cmd: str | None = "bec --nogui",
|
||||
startup_cmd: str | None = None,
|
||||
is_bec_shell: bool = False,
|
||||
unique_id: str | None = None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(parent=parent, client=client, gui_id=gui_id, config=config, **kwargs)
|
||||
self._mode = ConsoleMode.INACTIVE
|
||||
self._is_bec_shell = is_bec_shell
|
||||
self._startup_cmd = startup_cmd
|
||||
self._is_initialized = False
|
||||
_web_console_registry.register(self)
|
||||
self._token = _web_console_registry._token
|
||||
layout = QVBoxLayout()
|
||||
layout.setContentsMargins(0, 0, 0, 0)
|
||||
self.browser = QWebEngineView(self)
|
||||
self.page = BECWebEnginePage(self)
|
||||
self.page.authenticationRequired.connect(self._authenticate)
|
||||
self.browser.setPage(self.page)
|
||||
layout.addWidget(self.browser)
|
||||
self.setLayout(layout)
|
||||
self.page.setUrl(QUrl(f"http://localhost:{_web_console_registry._server_port}"))
|
||||
self._unique_id = unique_id
|
||||
self.page = None # Will be set in _set_up_page
|
||||
|
||||
self._startup_timer = QTimer()
|
||||
self._startup_timer.setInterval(500)
|
||||
self._startup_timer.timeout.connect(self._check_page_ready)
|
||||
self._startup_timer.start()
|
||||
self._js_callback.connect(self._on_js_callback)
|
||||
|
||||
self._set_up_page()
|
||||
|
||||
def _set_up_page(self):
|
||||
"""
|
||||
Set up the web page and UI elements.
|
||||
"""
|
||||
layout = QVBoxLayout()
|
||||
layout.setContentsMargins(0, 0, 0, 0)
|
||||
self.browser = QWebEngineView(self)
|
||||
|
||||
layout.addWidget(self.browser)
|
||||
self.setLayout(layout)
|
||||
|
||||
# prepare overlay
|
||||
self.overlay = QWidget(self)
|
||||
layout = QVBoxLayout(self.overlay)
|
||||
layout.setAlignment(Qt.AlignmentFlag.AlignCenter)
|
||||
label = QLabel("Click to activate terminal", self.overlay)
|
||||
layout.addWidget(label)
|
||||
self.overlay.hide()
|
||||
|
||||
_web_console_registry.register(self)
|
||||
self._token = _web_console_registry._token
|
||||
|
||||
# If no unique_id is provided, create a new page
|
||||
if not self._unique_id:
|
||||
self.page = BECWebEnginePage(self)
|
||||
self.page.authenticationRequired.connect(self._authenticate)
|
||||
self.page.setUrl(QUrl(f"http://localhost:{_web_console_registry._server_port}"))
|
||||
self.browser.setPage(self.page)
|
||||
self._set_mode(ConsoleMode.ACTIVE)
|
||||
return
|
||||
|
||||
# Try to get the page from the registry
|
||||
page_info = _web_console_registry.get_page_info(self._unique_id)
|
||||
if page_info and page_info.page:
|
||||
self.page = page_info.page
|
||||
if not page_info.owner_gui_id or page_info.owner_gui_id == self.gui_id:
|
||||
self.browser.setPage(self.page)
|
||||
# Only set URL if this is a newly created page (no URL set yet)
|
||||
if self.page.url().isEmpty():
|
||||
self.page.setUrl(QUrl(f"http://localhost:{_web_console_registry._server_port}"))
|
||||
else:
|
||||
# We have an existing page, so we don't need the startup timer
|
||||
self._startup_timer.stop()
|
||||
if page_info.owner_gui_id != self.gui_id:
|
||||
self._set_mode(ConsoleMode.INACTIVE)
|
||||
else:
|
||||
self._set_mode(ConsoleMode.ACTIVE)
|
||||
|
||||
def _set_mode(self, mode: ConsoleMode):
|
||||
"""
|
||||
Set the mode of the web console.
|
||||
|
||||
Args:
|
||||
mode (ConsoleMode): The mode to set.
|
||||
"""
|
||||
if not self._unique_id:
|
||||
# For non-unique_id consoles, always active
|
||||
mode = ConsoleMode.ACTIVE
|
||||
|
||||
self._mode = mode
|
||||
match mode:
|
||||
case ConsoleMode.ACTIVE:
|
||||
self.browser.setVisible(True)
|
||||
self.overlay.hide()
|
||||
case ConsoleMode.INACTIVE:
|
||||
self.browser.setVisible(False)
|
||||
self.overlay.show()
|
||||
case ConsoleMode.HIDDEN:
|
||||
self.browser.setVisible(False)
|
||||
self.overlay.hide()
|
||||
|
||||
def _check_page_ready(self):
|
||||
"""
|
||||
Check if the page is ready and stop the timer if it is.
|
||||
"""
|
||||
if self.page.isLoading():
|
||||
if not self.page or self.page.isLoading():
|
||||
return
|
||||
|
||||
self.page.runJavaScript("window.term !== undefined", self._js_callback.emit)
|
||||
@@ -218,15 +462,27 @@ class WebConsole(BECWidget, QWidget):
|
||||
return
|
||||
self._is_initialized = True
|
||||
self._startup_timer.stop()
|
||||
if self._startup_cmd:
|
||||
self.write(self._startup_cmd)
|
||||
if self.startup_cmd:
|
||||
if self._unique_id:
|
||||
page_info = _web_console_registry.get_page_info(self._unique_id)
|
||||
if page_info is None:
|
||||
return
|
||||
if not page_info.initialized:
|
||||
page_info.initialized = True
|
||||
self.write(self.startup_cmd)
|
||||
else:
|
||||
self.write(self.startup_cmd)
|
||||
self.initialized.emit()
|
||||
|
||||
@SafeProperty(str)
|
||||
@property
|
||||
def startup_cmd(self):
|
||||
"""
|
||||
Get the startup command for the web console.
|
||||
"""
|
||||
if self._is_bec_shell:
|
||||
if self.bec_dispatcher.cli_server is None:
|
||||
return "bec --nogui"
|
||||
return f"bec --gui-id {self.bec_dispatcher.cli_server.gui_id}"
|
||||
return self._startup_cmd
|
||||
|
||||
@startup_cmd.setter
|
||||
@@ -241,11 +497,123 @@ class WebConsole(BECWidget, QWidget):
|
||||
def write(self, data: str, send_return: bool = True):
|
||||
"""
|
||||
Send data to the web page
|
||||
|
||||
Args:
|
||||
data (str): The data to send.
|
||||
send_return (bool): Whether to send a return after the data.
|
||||
"""
|
||||
self.page.runJavaScript(f"window.term.paste('{data}');")
|
||||
cmd = f"window.term.paste({json.dumps(data)});"
|
||||
if self.page is None:
|
||||
logger.warning("Cannot write to web console: page is not initialized.")
|
||||
return
|
||||
self.page.runJavaScript(cmd)
|
||||
if send_return:
|
||||
self.send_return()
|
||||
|
||||
def take_page_ownership(self, unique_id: str | None = None):
|
||||
"""
|
||||
Take ownership of a web page from the registry. This will transfer the page
|
||||
from its current owner (if any) to this widget.
|
||||
|
||||
Args:
|
||||
unique_id (str): The unique identifier of the page to take ownership of.
|
||||
If None, uses this widget's unique_id.
|
||||
"""
|
||||
if unique_id is None:
|
||||
unique_id = self._unique_id
|
||||
|
||||
if not unique_id:
|
||||
logger.warning("Cannot take page ownership without a unique_id")
|
||||
return
|
||||
|
||||
# Get the page from registry
|
||||
page = _web_console_registry.take_page_ownership(unique_id, self.gui_id)
|
||||
|
||||
if not page:
|
||||
logger.warning(f"Page {unique_id} not found in registry")
|
||||
return
|
||||
|
||||
self.page = page
|
||||
self.browser.setPage(page)
|
||||
self._set_mode(ConsoleMode.ACTIVE)
|
||||
logger.info(f"Widget {self.gui_id} took ownership of page {unique_id}")
|
||||
|
||||
def _on_ownership_lost(self):
|
||||
"""
|
||||
Called when this widget loses ownership of its page.
|
||||
Displays the overlay and hides the browser.
|
||||
"""
|
||||
self._set_mode(ConsoleMode.INACTIVE)
|
||||
logger.info(f"Widget {self.gui_id} lost ownership of page {self._unique_id}")
|
||||
|
||||
def yield_ownership(self):
|
||||
"""
|
||||
Yield ownership of the page. The page remains in the registry with no owner,
|
||||
available for another widget to claim. This is automatically called when the
|
||||
widget becomes hidden.
|
||||
"""
|
||||
if not self._unique_id:
|
||||
return
|
||||
success = _web_console_registry.yield_ownership(self.gui_id)
|
||||
if success:
|
||||
self._on_ownership_lost()
|
||||
logger.info(f"Widget {self.gui_id} yielded ownership of page {self._unique_id}")
|
||||
|
||||
def has_ownership(self) -> bool:
|
||||
"""
|
||||
Check if this widget currently has ownership of a page.
|
||||
|
||||
Returns:
|
||||
bool: True if this widget owns a page, False otherwise.
|
||||
"""
|
||||
if not self._unique_id:
|
||||
return False
|
||||
page_info = _web_console_registry.get_page_info(self._unique_id)
|
||||
if page_info is None:
|
||||
return False
|
||||
return page_info.owner_gui_id == self.gui_id
|
||||
|
||||
def hideEvent(self, event):
|
||||
"""
|
||||
Called when the widget is hidden. Automatically yields ownership.
|
||||
"""
|
||||
if self.has_ownership():
|
||||
self.yield_ownership()
|
||||
self._set_mode(ConsoleMode.HIDDEN)
|
||||
super().hideEvent(event)
|
||||
|
||||
def showEvent(self, event):
|
||||
"""
|
||||
Called when the widget is shown. Updates UI state based on ownership.
|
||||
"""
|
||||
super().showEvent(event)
|
||||
if self._unique_id and not self.has_ownership():
|
||||
# Take ownership if the page does not have an owner or
|
||||
# the owner is not visible
|
||||
page_info = _web_console_registry.get_page_info(self._unique_id)
|
||||
if page_info is None:
|
||||
self._set_mode(ConsoleMode.INACTIVE)
|
||||
return
|
||||
if page_info.owner_gui_id is None or not _web_console_registry.owner_is_visible(
|
||||
self._unique_id
|
||||
):
|
||||
self.take_page_ownership(self._unique_id)
|
||||
return
|
||||
if page_info.owner_gui_id != self.gui_id:
|
||||
self._set_mode(ConsoleMode.INACTIVE)
|
||||
return
|
||||
|
||||
def resizeEvent(self, event: QResizeEvent) -> None:
|
||||
super().resizeEvent(event)
|
||||
self.overlay.resize(event.size())
|
||||
|
||||
def mousePressEvent(self, event: QMouseEvent) -> None:
|
||||
if event.button() == Qt.MouseButton.LeftButton and not self.has_ownership():
|
||||
self.take_page_ownership(self._unique_id)
|
||||
event.accept()
|
||||
return
|
||||
return super().mousePressEvent(event)
|
||||
|
||||
def _authenticate(self, _, auth):
|
||||
"""
|
||||
Authenticate the request with the provided username and password.
|
||||
@@ -286,10 +654,52 @@ class WebConsole(BECWidget, QWidget):
|
||||
super().cleanup()
|
||||
|
||||
|
||||
class BECShell(WebConsole):
|
||||
"""
|
||||
A WebConsole pre-configured to run the BEC shell.
|
||||
We cannot simply expose the web console properties to Qt as we need to have a deterministic
|
||||
startup behavior for sharing the same shell instance across multiple widgets.
|
||||
"""
|
||||
|
||||
ICON_NAME = "hub"
|
||||
|
||||
def __init__(self, parent=None, config=None, client=None, gui_id=None, **kwargs):
|
||||
super().__init__(
|
||||
parent=parent,
|
||||
config=config,
|
||||
client=client,
|
||||
gui_id=gui_id,
|
||||
is_bec_shell=True,
|
||||
unique_id="bec_shell",
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
import sys
|
||||
|
||||
app = QApplication(sys.argv)
|
||||
widget = WebConsole()
|
||||
widget = QTabWidget()
|
||||
|
||||
# Create two consoles with different unique_ids
|
||||
web_console1 = WebConsole(startup_cmd="bec --nogui", unique_id="console1")
|
||||
web_console2 = WebConsole(startup_cmd="htop")
|
||||
web_console3 = WebConsole(startup_cmd="bec --nogui", unique_id="console1")
|
||||
widget.addTab(web_console1, "Console 1")
|
||||
widget.addTab(web_console2, "Console 2")
|
||||
widget.addTab(web_console3, "Console 3 -- mirror of Console 1")
|
||||
widget.show()
|
||||
|
||||
# Demonstrate page sharing:
|
||||
# After initialization, web_console2 can take ownership of console1's page:
|
||||
# web_console2.take_page_ownership("console1")
|
||||
|
||||
widget.resize(800, 600)
|
||||
|
||||
def _close_cons1():
|
||||
web_console2.close()
|
||||
web_console2.deleteLater()
|
||||
|
||||
# QTimer.singleShot(3000, _close_cons1)
|
||||
|
||||
sys.exit(app.exec_())
|
||||
|
||||
@@ -203,6 +203,19 @@ class Heatmap(ImageBase):
|
||||
"remove_roi",
|
||||
"rois",
|
||||
"plot",
|
||||
# Device properties
|
||||
"x_device_name",
|
||||
"x_device_name.setter",
|
||||
"x_device_entry",
|
||||
"x_device_entry.setter",
|
||||
"y_device_name",
|
||||
"y_device_name.setter",
|
||||
"y_device_entry",
|
||||
"y_device_entry.setter",
|
||||
"z_device_name",
|
||||
"z_device_name.setter",
|
||||
"z_device_entry",
|
||||
"z_device_entry.setter",
|
||||
]
|
||||
|
||||
PLUGIN = True
|
||||
@@ -413,9 +426,15 @@ class Heatmap(ImageBase):
|
||||
"""
|
||||
if self._image_config is None:
|
||||
return
|
||||
x_name = self._image_config.x_device.name
|
||||
y_name = self._image_config.y_device.name
|
||||
z_name = self._image_config.z_device.name
|
||||
|
||||
# Safely get device names (might be None if not yet configured)
|
||||
x_device = self._image_config.x_device
|
||||
y_device = self._image_config.y_device
|
||||
z_device = self._image_config.z_device
|
||||
|
||||
x_name = x_device.name if x_device else None
|
||||
y_name = y_device.name if y_device else None
|
||||
z_name = z_device.name if z_device else None
|
||||
|
||||
if x_name is not None:
|
||||
self.x_label = x_name # type: ignore
|
||||
@@ -1136,6 +1155,244 @@ class Heatmap(ImageBase):
|
||||
self.crosshair.reset()
|
||||
super().reset()
|
||||
|
||||
################################################################################
|
||||
# Widget Specific Properties
|
||||
################################################################################
|
||||
|
||||
@SafeProperty(str)
|
||||
def x_device_name(self) -> str:
|
||||
"""Device name for the X axis."""
|
||||
if self._image_config.x_device is None:
|
||||
return ""
|
||||
return self._image_config.x_device.name or ""
|
||||
|
||||
@x_device_name.setter
|
||||
def x_device_name(self, device_name: str) -> None:
|
||||
"""
|
||||
Set the X device name.
|
||||
|
||||
Args:
|
||||
device_name(str): Device name for the X axis
|
||||
"""
|
||||
device_name = device_name or ""
|
||||
|
||||
# Get current entry or validate
|
||||
if device_name:
|
||||
try:
|
||||
entry = self.entry_validator.validate_signal(device_name, None)
|
||||
self._image_config.x_device = HeatmapDeviceSignal(name=device_name, entry=entry)
|
||||
self.property_changed.emit("x_device_name", device_name)
|
||||
self.update_labels() # Update axis labels
|
||||
self._try_auto_plot()
|
||||
except Exception:
|
||||
pass # Silently fail if device is not available yet
|
||||
else:
|
||||
self._image_config.x_device = None
|
||||
self.property_changed.emit("x_device_name", "")
|
||||
self.update_labels() # Clear axis labels
|
||||
|
||||
@SafeProperty(str)
|
||||
def x_device_entry(self) -> str:
|
||||
"""Signal entry for the X axis device."""
|
||||
if self._image_config.x_device is None:
|
||||
return ""
|
||||
return self._image_config.x_device.entry or ""
|
||||
|
||||
@x_device_entry.setter
|
||||
def x_device_entry(self, entry: str) -> None:
|
||||
"""
|
||||
Set the X device entry.
|
||||
|
||||
Args:
|
||||
entry(str): Signal entry for the X axis device
|
||||
"""
|
||||
if not entry:
|
||||
return
|
||||
|
||||
if self._image_config.x_device is None:
|
||||
logger.warning("Cannot set x_device_entry without x_device_name set first.")
|
||||
return
|
||||
|
||||
device_name = self._image_config.x_device.name
|
||||
try:
|
||||
# Validate the entry for this device
|
||||
validated_entry = self.entry_validator.validate_signal(device_name, entry)
|
||||
self._image_config.x_device = HeatmapDeviceSignal(
|
||||
name=device_name, entry=validated_entry
|
||||
)
|
||||
self.property_changed.emit("x_device_entry", validated_entry)
|
||||
self.update_labels() # Update axis labels
|
||||
self._try_auto_plot()
|
||||
except Exception:
|
||||
pass # Silently fail if validation fails
|
||||
|
||||
@SafeProperty(str)
|
||||
def y_device_name(self) -> str:
|
||||
"""Device name for the Y axis."""
|
||||
if self._image_config.y_device is None:
|
||||
return ""
|
||||
return self._image_config.y_device.name or ""
|
||||
|
||||
@y_device_name.setter
|
||||
def y_device_name(self, device_name: str) -> None:
|
||||
"""
|
||||
Set the Y device name.
|
||||
|
||||
Args:
|
||||
device_name(str): Device name for the Y axis
|
||||
"""
|
||||
device_name = device_name or ""
|
||||
|
||||
# Get current entry or validate
|
||||
if device_name:
|
||||
try:
|
||||
entry = self.entry_validator.validate_signal(device_name, None)
|
||||
self._image_config.y_device = HeatmapDeviceSignal(name=device_name, entry=entry)
|
||||
self.property_changed.emit("y_device_name", device_name)
|
||||
self.update_labels() # Update axis labels
|
||||
self._try_auto_plot()
|
||||
except Exception:
|
||||
pass # Silently fail if device is not available yet
|
||||
else:
|
||||
self._image_config.y_device = None
|
||||
self.property_changed.emit("y_device_name", "")
|
||||
self.update_labels() # Clear axis labels
|
||||
|
||||
@SafeProperty(str)
|
||||
def y_device_entry(self) -> str:
|
||||
"""Signal entry for the Y axis device."""
|
||||
if self._image_config.y_device is None:
|
||||
return ""
|
||||
return self._image_config.y_device.entry or ""
|
||||
|
||||
@y_device_entry.setter
|
||||
def y_device_entry(self, entry: str) -> None:
|
||||
"""
|
||||
Set the Y device entry.
|
||||
|
||||
Args:
|
||||
entry(str): Signal entry for the Y axis device
|
||||
"""
|
||||
if not entry:
|
||||
return
|
||||
|
||||
if self._image_config.y_device is None:
|
||||
logger.warning("Cannot set y_device_entry without y_device_name set first.")
|
||||
return
|
||||
|
||||
device_name = self._image_config.y_device.name
|
||||
try:
|
||||
# Validate the entry for this device
|
||||
validated_entry = self.entry_validator.validate_signal(device_name, entry)
|
||||
self._image_config.y_device = HeatmapDeviceSignal(
|
||||
name=device_name, entry=validated_entry
|
||||
)
|
||||
self.property_changed.emit("y_device_entry", validated_entry)
|
||||
self.update_labels() # Update axis labels
|
||||
self._try_auto_plot()
|
||||
except Exception as e:
|
||||
logger.debug(f"Y device entry validation failed: {e}")
|
||||
pass # Silently fail if validation fails
|
||||
|
||||
@SafeProperty(str)
|
||||
def z_device_name(self) -> str:
|
||||
"""Device name for the Z (color) axis."""
|
||||
if self._image_config.z_device is None:
|
||||
return ""
|
||||
return self._image_config.z_device.name or ""
|
||||
|
||||
@z_device_name.setter
|
||||
def z_device_name(self, device_name: str) -> None:
|
||||
"""
|
||||
Set the Z device name.
|
||||
|
||||
Args:
|
||||
device_name(str): Device name for the Z axis
|
||||
"""
|
||||
device_name = device_name or ""
|
||||
|
||||
# Get current entry or validate
|
||||
if device_name:
|
||||
try:
|
||||
entry = self.entry_validator.validate_signal(device_name, None)
|
||||
self._image_config.z_device = HeatmapDeviceSignal(name=device_name, entry=entry)
|
||||
self.property_changed.emit("z_device_name", device_name)
|
||||
self.update_labels() # Update axis labels (title)
|
||||
self._try_auto_plot()
|
||||
except Exception as e:
|
||||
logger.debug(f"Z device name validation failed: {e}")
|
||||
pass # Silently fail if device is not available yet
|
||||
else:
|
||||
self._image_config.z_device = None
|
||||
self.property_changed.emit("z_device_name", "")
|
||||
self.update_labels() # Clear axis labels
|
||||
|
||||
@SafeProperty(str)
|
||||
def z_device_entry(self) -> str:
|
||||
"""Signal entry for the Z (color) axis device."""
|
||||
if self._image_config.z_device is None:
|
||||
return ""
|
||||
return self._image_config.z_device.entry or ""
|
||||
|
||||
@z_device_entry.setter
|
||||
def z_device_entry(self, entry: str) -> None:
|
||||
"""
|
||||
Set the Z device entry.
|
||||
|
||||
Args:
|
||||
entry(str): Signal entry for the Z axis device
|
||||
"""
|
||||
if not entry:
|
||||
return
|
||||
|
||||
if self._image_config.z_device is None:
|
||||
logger.warning("Cannot set z_device_entry without z_device_name set first.")
|
||||
return
|
||||
|
||||
device_name = self._image_config.z_device.name
|
||||
try:
|
||||
# Validate the entry for this device
|
||||
validated_entry = self.entry_validator.validate_signal(device_name, entry)
|
||||
self._image_config.z_device = HeatmapDeviceSignal(
|
||||
name=device_name, entry=validated_entry
|
||||
)
|
||||
self.property_changed.emit("z_device_entry", validated_entry)
|
||||
self.update_labels() # Update axis labels (title)
|
||||
self._try_auto_plot()
|
||||
except Exception as e:
|
||||
logger.debug(f"Z device entry validation failed: {e}")
|
||||
pass # Silently fail if validation fails
|
||||
|
||||
def _try_auto_plot(self) -> None:
|
||||
"""
|
||||
Attempt to automatically call plot() if all three devices are set.
|
||||
Similar to waveform's approach but requires all three devices.
|
||||
"""
|
||||
has_x = self._image_config.x_device is not None
|
||||
has_y = self._image_config.y_device is not None
|
||||
has_z = self._image_config.z_device is not None
|
||||
|
||||
if has_x and has_y and has_z:
|
||||
x_name = self._image_config.x_device.name
|
||||
x_entry = self._image_config.x_device.entry
|
||||
y_name = self._image_config.y_device.name
|
||||
y_entry = self._image_config.y_device.entry
|
||||
z_name = self._image_config.z_device.name
|
||||
z_entry = self._image_config.z_device.entry
|
||||
try:
|
||||
self.plot(
|
||||
x_name=x_name,
|
||||
y_name=y_name,
|
||||
z_name=z_name,
|
||||
x_entry=x_entry,
|
||||
y_entry=y_entry,
|
||||
z_entry=z_entry,
|
||||
validate_bec=False, # Don't validate - entries already validated
|
||||
)
|
||||
except Exception as e:
|
||||
logger.debug(f"Auto-plot failed: {e}")
|
||||
pass # Silently fail if plot cannot be called yet
|
||||
|
||||
@SafeProperty(str)
|
||||
def interpolation_method(self) -> str:
|
||||
"""
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from qtpy.QtWidgets import QFrame, QScrollArea, QVBoxLayout
|
||||
|
||||
@@ -9,11 +8,6 @@ from bec_widgets.utils import UILoader
|
||||
from bec_widgets.utils.error_popups import SafeSlot
|
||||
from bec_widgets.utils.settings_dialog import SettingWidget
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from bec_widgets.widgets.control.device_input.signal_combobox.signal_combobox import (
|
||||
SignalComboBox,
|
||||
)
|
||||
|
||||
|
||||
class HeatmapSettings(SettingWidget):
|
||||
def __init__(self, parent=None, target_widget=None, popup=False, *args, **kwargs):
|
||||
@@ -120,36 +114,17 @@ class HeatmapSettings(SettingWidget):
|
||||
getattr(self.target_widget._image_config, "enforce_interpolation", False)
|
||||
)
|
||||
|
||||
def _get_signal_name(self, signal: SignalComboBox) -> str:
|
||||
"""
|
||||
Get the signal name from the signal combobox.
|
||||
Args:
|
||||
signal (SignalComboBox): The signal combobox to get the name from.
|
||||
Returns:
|
||||
str: The signal name.
|
||||
"""
|
||||
device_entry = signal.currentText()
|
||||
index = signal.findText(device_entry)
|
||||
if index == -1:
|
||||
return device_entry
|
||||
|
||||
device_entry_info = signal.itemData(index)
|
||||
if device_entry_info:
|
||||
device_entry = device_entry_info.get("obj_name", device_entry)
|
||||
|
||||
return device_entry if device_entry else ""
|
||||
|
||||
@SafeSlot()
|
||||
def accept_changes(self):
|
||||
"""
|
||||
Apply all properties from the settings widget to the target widget.
|
||||
"""
|
||||
x_name = self.ui.x_name.currentText()
|
||||
x_entry = self._get_signal_name(self.ui.x_entry)
|
||||
x_entry = self.ui.x_entry.get_signal_name()
|
||||
y_name = self.ui.y_name.currentText()
|
||||
y_entry = self._get_signal_name(self.ui.y_entry)
|
||||
y_entry = self.ui.y_entry.get_signal_name()
|
||||
z_name = self.ui.z_name.currentText()
|
||||
z_entry = self._get_signal_name(self.ui.z_entry)
|
||||
z_entry = self.ui.z_entry.get_signal_name()
|
||||
validate_bec = self.ui.validate_bec.checked
|
||||
color_map = self.ui.color_map.colormap
|
||||
interpolation = self.ui.interpolation.currentText()
|
||||
|
||||
@@ -7,8 +7,8 @@ import numpy as np
|
||||
from bec_lib import bec_logger
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from pydantic import BaseModel, Field, field_validator
|
||||
from qtpy.QtCore import Qt, QTimer
|
||||
from qtpy.QtWidgets import QComboBox, QStyledItemDelegate, QWidget
|
||||
from qtpy.QtCore import QTimer
|
||||
from qtpy.QtWidgets import QComboBox, QWidget
|
||||
|
||||
from bec_widgets.utils import ConnectionConfig
|
||||
from bec_widgets.utils.colors import Colors
|
||||
@@ -49,6 +49,9 @@ class ImageLayerConfig(BaseModel):
|
||||
source: Literal["device_monitor_1d", "device_monitor_2d", "auto"] = Field(
|
||||
"auto", description="The source of the image data."
|
||||
)
|
||||
async_signal_name: str | None = Field(
|
||||
None, description="Async signal name (obj_name) used for async endpoints."
|
||||
)
|
||||
|
||||
|
||||
class Image(ImageBase):
|
||||
@@ -95,6 +98,7 @@ class Image(ImageBase):
|
||||
"remove_roi",
|
||||
"rois",
|
||||
]
|
||||
SUPPORTED_SIGNALS = ["AsyncSignal", "AsyncMultiSignal", "DynamicSignal"]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -116,7 +120,14 @@ class Image(ImageBase):
|
||||
)
|
||||
self._init_toolbar_image()
|
||||
self.layer_removed.connect(self._on_layer_removed)
|
||||
self.old_scan_id = None
|
||||
self.scan_id = None
|
||||
self.async_update = False
|
||||
self.bec_dispatcher.connect_slot(self.on_scan_status, MessageEndpoints.scan_status())
|
||||
self.bec_dispatcher.connect_slot(self.on_scan_progress, MessageEndpoints.scan_progress())
|
||||
self.bec_dispatcher.connect_slot(
|
||||
self._populate_signals, MessageEndpoints.device_config_update()
|
||||
)
|
||||
|
||||
##################################
|
||||
### Toolbar Initialization
|
||||
@@ -181,20 +192,35 @@ class Image(ImageBase):
|
||||
Adjust the size of the device combo box and populate it with preview signals.
|
||||
Has to be done with QTimer.singleShot to ensure the UI is fully initialized, needed for testing.
|
||||
"""
|
||||
self._populate_preview_signals()
|
||||
self._reverse_device_items()
|
||||
self.device_combo_box.setCurrentText("") # set again default to empty string
|
||||
self._populate_signals()
|
||||
|
||||
def _populate_preview_signals(self) -> None:
|
||||
@SafeSlot(dict, dict)
|
||||
def _populate_signals(self, data: dict | None = None, meta: dict | None = None) -> None:
|
||||
"""
|
||||
Populate the device combo box with preview-signal devices in the
|
||||
format '<device>_<signal>' and store the tuple(device, signal) in
|
||||
the item's userData for later use.
|
||||
(Re)populate the device combo box with preview/async signals,
|
||||
matching the initial setup logic.
|
||||
"""
|
||||
self.device_combo_box.blockSignals(True)
|
||||
self.device_combo_box.clear()
|
||||
# Rebuild base device list via the combobox' own filtering logic
|
||||
self.device_combo_box.update_devices_from_filters()
|
||||
base_count = self.device_combo_box.count()
|
||||
# Place an empty default entry between base devices and signal entries
|
||||
self.device_combo_box.insertItem(base_count, "", None)
|
||||
|
||||
preview_signals = self.client.device_manager.get_bec_signals("PreviewSignal")
|
||||
for device, signal, signal_config in preview_signals:
|
||||
label = signal_config.get("obj_name", f"{device}_{signal}")
|
||||
async_signals = self.client.device_manager.get_bec_signals(self.SUPPORTED_SIGNALS)
|
||||
all_signals = preview_signals + async_signals
|
||||
for device, signal, signal_config in all_signals:
|
||||
describe = signal_config.get("describe") or {}
|
||||
signal_info = describe.get("signal_info") or {}
|
||||
ndim = signal_info.get("ndim", 0)
|
||||
if ndim == 0:
|
||||
continue
|
||||
label = signal_config.get("storage_name", f"{device}_{signal}")
|
||||
self.device_combo_box.addItem(label, (device, signal, signal_config))
|
||||
self.device_combo_box.setCurrentText("")
|
||||
self.device_combo_box.blockSignals(False)
|
||||
|
||||
def _reverse_device_items(self) -> None:
|
||||
"""
|
||||
@@ -422,51 +448,93 @@ class Image(ImageBase):
|
||||
"""
|
||||
|
||||
# TODO consider moving connecting and disconnecting logic to Image itself if multiple images
|
||||
self.async_update = False
|
||||
config = self.subscriptions["main"]
|
||||
needs_async_setup = False
|
||||
config.async_signal_name = None
|
||||
if isinstance(monitor, (list, tuple)):
|
||||
device = self.dev[monitor[0]]
|
||||
signal = monitor[1]
|
||||
try:
|
||||
device = self.dev[monitor[0]]
|
||||
except KeyError:
|
||||
logger.warning(f"Device '{monitor[0]}' not found; cannot connect monitor.")
|
||||
return
|
||||
# signal = monitor[1]
|
||||
signal = self._check_async_signal_found(monitor[0], monitor[1])
|
||||
if len(monitor) == 3:
|
||||
signal_config = monitor[2]
|
||||
else:
|
||||
signal_config = device._info["signals"][signal]
|
||||
try:
|
||||
signal_config = device._info["signals"][signal]
|
||||
except KeyError:
|
||||
logger.warning(f"Signal '{signal}' not found on device '{device.name}'.")
|
||||
return
|
||||
signal_class = signal_config.get("signal_class", None)
|
||||
if signal_class != "PreviewSignal":
|
||||
logger.warning(f"Signal '{monitor}' is not a PreviewSignal.")
|
||||
allowed_signal_classes = ["PreviewSignal"] + self.SUPPORTED_SIGNALS
|
||||
if signal_class not in allowed_signal_classes:
|
||||
logger.warning(
|
||||
f"Signal `{monitor}` is not a PreviewSignal or a supported async signal."
|
||||
)
|
||||
return
|
||||
|
||||
ndim = signal_config.get("describe", None).get("signal_info", None).get("ndim", None)
|
||||
describe = signal_config.get("describe") or {}
|
||||
signal_info = describe.get("signal_info") or {}
|
||||
ndim = signal_info.get("ndim", None)
|
||||
if ndim is None:
|
||||
logger.warning(
|
||||
f"Signal '{monitor}' does not have a valid 'ndim' in its signal_info."
|
||||
)
|
||||
return
|
||||
|
||||
if ndim == 1:
|
||||
self.bec_dispatcher.connect_slot(
|
||||
self.on_image_update_1d, MessageEndpoints.device_preview(device.name, signal)
|
||||
)
|
||||
self.subscriptions["main"].source = "device_monitor_1d"
|
||||
self.subscriptions["main"].monitor_type = "1d"
|
||||
config.source = "device_monitor_1d"
|
||||
config.monitor_type = "1d"
|
||||
if signal_class == "PreviewSignal":
|
||||
self.bec_dispatcher.connect_slot(
|
||||
self.on_image_update_1d,
|
||||
MessageEndpoints.device_preview(device.name, signal),
|
||||
)
|
||||
elif signal_class in self.SUPPORTED_SIGNALS:
|
||||
self.async_update = True
|
||||
needs_async_setup = True
|
||||
config.async_signal_name = signal_config.get(
|
||||
"obj_name", f"{device.name}_{signal}"
|
||||
)
|
||||
else:
|
||||
logger.warning(f"Unsupported signal class '{signal_class}' for 1D monitor.")
|
||||
return
|
||||
elif ndim == 2:
|
||||
self.bec_dispatcher.connect_slot(
|
||||
self.on_image_update_2d, MessageEndpoints.device_preview(device.name, signal)
|
||||
)
|
||||
self.subscriptions["main"].source = "device_monitor_2d"
|
||||
self.subscriptions["main"].monitor_type = "2d"
|
||||
config.source = "device_monitor_2d"
|
||||
config.monitor_type = "2d"
|
||||
if signal_class == "PreviewSignal":
|
||||
self.bec_dispatcher.connect_slot(
|
||||
self.on_image_update_2d,
|
||||
MessageEndpoints.device_preview(device.name, signal),
|
||||
)
|
||||
elif signal_class in self.SUPPORTED_SIGNALS:
|
||||
self.async_update = True
|
||||
needs_async_setup = True
|
||||
config.async_signal_name = signal_config.get(
|
||||
"obj_name", f"{device.name}_{signal}"
|
||||
)
|
||||
else:
|
||||
logger.warning(f"Unsupported signal class '{signal_class}' for 2D monitor.")
|
||||
return
|
||||
else:
|
||||
logger.warning(f"Unsupported ndim '{ndim}' for monitor '{monitor}'.")
|
||||
return
|
||||
|
||||
else: # FIXME old monitor 1d/2d endpoint handling, present for backwards compatibility, will be removed in future versions
|
||||
if type == "1d":
|
||||
self.bec_dispatcher.connect_slot(
|
||||
self.on_image_update_1d, MessageEndpoints.device_monitor_1d(monitor)
|
||||
)
|
||||
self.subscriptions["main"].source = "device_monitor_1d"
|
||||
self.subscriptions["main"].monitor_type = "1d"
|
||||
config.source = "device_monitor_1d"
|
||||
config.monitor_type = "1d"
|
||||
elif type == "2d":
|
||||
self.bec_dispatcher.connect_slot(
|
||||
self.on_image_update_2d, MessageEndpoints.device_monitor_2d(monitor)
|
||||
)
|
||||
self.subscriptions["main"].source = "device_monitor_2d"
|
||||
self.subscriptions["main"].monitor_type = "2d"
|
||||
config.source = "device_monitor_2d"
|
||||
config.monitor_type = "2d"
|
||||
elif type == "auto":
|
||||
self.bec_dispatcher.connect_slot(
|
||||
self.on_image_update_1d, MessageEndpoints.device_monitor_1d(monitor)
|
||||
@@ -474,14 +542,141 @@ class Image(ImageBase):
|
||||
self.bec_dispatcher.connect_slot(
|
||||
self.on_image_update_2d, MessageEndpoints.device_monitor_2d(monitor)
|
||||
)
|
||||
self.subscriptions["main"].source = "auto"
|
||||
config.source = "auto"
|
||||
logger.warning(
|
||||
f"Updates for '{monitor}' will be fetch from both 1D and 2D monitor endpoints."
|
||||
)
|
||||
self.subscriptions["main"].monitor_type = "auto"
|
||||
config.monitor_type = "auto"
|
||||
|
||||
config.monitor = monitor
|
||||
if needs_async_setup:
|
||||
self._setup_async_image(self.scan_id)
|
||||
logger.info(f"Connected to {monitor} with type {type}")
|
||||
self.subscriptions["main"].monitor = monitor
|
||||
|
||||
@SafeSlot(dict, dict)
|
||||
def on_scan_status(self, msg: dict, meta: dict):
|
||||
"""
|
||||
Initial scan status message handler, which is triggered at the begging and end of scan.
|
||||
Needed for setup of AsyncSignal connections.
|
||||
|
||||
Args:
|
||||
msg(dict): The message content.
|
||||
meta(dict): The message metadata.
|
||||
"""
|
||||
current_scan_id = msg.get("scan_id", None)
|
||||
if current_scan_id is None:
|
||||
return
|
||||
self._handle_scan_change(current_scan_id)
|
||||
|
||||
@SafeSlot(dict, dict)
|
||||
def on_scan_progress(self, msg: dict, meta: dict):
|
||||
"""
|
||||
For setting async image readback during scan progress updates if widget is started later than scan.
|
||||
|
||||
Args:
|
||||
msg(dict): The message content.
|
||||
meta(dict): The message metadata.
|
||||
"""
|
||||
current_scan_id = meta.get("scan_id", None)
|
||||
if current_scan_id is None:
|
||||
return
|
||||
self._handle_scan_change(current_scan_id)
|
||||
|
||||
def _handle_scan_change(self, current_scan_id: str):
|
||||
"""
|
||||
Update internal scan ids and refresh async connections if needed.
|
||||
|
||||
Args:
|
||||
current_scan_id (str): The current scan identifier.
|
||||
"""
|
||||
if current_scan_id == self.scan_id:
|
||||
return
|
||||
self.old_scan_id = self.scan_id
|
||||
self.scan_id = current_scan_id
|
||||
if self.async_update:
|
||||
self._setup_async_image(scan_id=self.scan_id)
|
||||
|
||||
def _get_async_signal_name(self) -> tuple[str, str] | None:
|
||||
"""
|
||||
Returns device name and async signal name used for endpoints/messages.
|
||||
|
||||
Returns:
|
||||
tuple[str, str] | None: (device_name, async_signal_name) or None if not available.
|
||||
"""
|
||||
config = self.subscriptions["main"]
|
||||
monitor = config.monitor
|
||||
if monitor is None or not isinstance(monitor, (list, tuple)) or len(monitor) < 2:
|
||||
return None
|
||||
device_name = monitor[0]
|
||||
async_signal = self._check_async_signal_found(
|
||||
name=device_name, signal=config.async_signal_name or monitor[1]
|
||||
)
|
||||
return device_name, async_signal
|
||||
|
||||
def _check_async_signal_found(self, name: str, signal: str) -> str:
|
||||
"""
|
||||
Check if the async signal is found in the BEC device manager.
|
||||
|
||||
Args:
|
||||
name(str): The name of the async signal.
|
||||
signal(str): The entry of the async signal.
|
||||
|
||||
Returns:
|
||||
tuple[bool, str]: A tuple where the first element is True if the async signal is found (False otherwise),
|
||||
and the second element is the signal name (either the original signal or the storage_name for AsyncMultiSignal).
|
||||
"""
|
||||
bec_async_signals = self.client.device_manager.get_bec_signals(self.SUPPORTED_SIGNALS)
|
||||
for entry_name, _, entry_data in bec_async_signals:
|
||||
if entry_name == name and entry_data.get("obj_name") == signal:
|
||||
return entry_data.get("storage_name")
|
||||
return signal
|
||||
|
||||
def _setup_async_image(self, scan_id: str | None):
|
||||
"""
|
||||
(Re)connect async image readback for the current scan.
|
||||
|
||||
Args:
|
||||
scan_id (str | None): The scan identifier to subscribe to.
|
||||
"""
|
||||
if not self.async_update:
|
||||
return
|
||||
|
||||
config = self.subscriptions["main"]
|
||||
async_names = self._get_async_signal_name()
|
||||
if async_names is None:
|
||||
logger.info("Async image setup skipped because monitor information is incomplete.")
|
||||
return
|
||||
|
||||
device_name, async_signal = async_names
|
||||
if config.monitor_type == "1d":
|
||||
slot = self.on_image_update_1d
|
||||
elif config.monitor_type == "2d":
|
||||
slot = self.on_image_update_2d
|
||||
else:
|
||||
logger.warning(
|
||||
f"Async image setup skipped due to unsupported monitor type '{config.monitor_type}'."
|
||||
)
|
||||
return
|
||||
|
||||
# Disconnect any previous scan subscriptions to avoid stale updates.
|
||||
for prev_scan_id in (self.old_scan_id, self.scan_id):
|
||||
if prev_scan_id is None:
|
||||
continue
|
||||
self.bec_dispatcher.disconnect_slot(
|
||||
slot, MessageEndpoints.device_async_signal(prev_scan_id, device_name, async_signal)
|
||||
)
|
||||
|
||||
if scan_id is None:
|
||||
logger.info("Scan ID not available yet; delaying async image subscription.")
|
||||
return
|
||||
|
||||
self.bec_dispatcher.connect_slot(
|
||||
slot,
|
||||
MessageEndpoints.device_async_signal(scan_id, device_name, async_signal),
|
||||
from_start=True,
|
||||
cb_info={"scan_id": scan_id},
|
||||
)
|
||||
logger.info(f"Setup async image for {device_name}.{async_signal} and scan {scan_id}.")
|
||||
|
||||
def disconnect_monitor(self, monitor: str | tuple):
|
||||
"""
|
||||
@@ -490,20 +685,47 @@ class Image(ImageBase):
|
||||
Args:
|
||||
monitor(str|tuple): The name of the monitor to disconnect, or a tuple of (device, signal) for preview signals.
|
||||
"""
|
||||
config = self.subscriptions["main"]
|
||||
if isinstance(monitor, (list, tuple)):
|
||||
if self.subscriptions["main"].source == "device_monitor_1d":
|
||||
self.bec_dispatcher.disconnect_slot(
|
||||
self.on_image_update_1d, MessageEndpoints.device_preview(monitor[0], monitor[1])
|
||||
)
|
||||
elif self.subscriptions["main"].source == "device_monitor_2d":
|
||||
self.bec_dispatcher.disconnect_slot(
|
||||
self.on_image_update_2d, MessageEndpoints.device_preview(monitor[0], monitor[1])
|
||||
)
|
||||
if self.async_update:
|
||||
async_names = self._get_async_signal_name()
|
||||
ids_to_check = [self.scan_id, self.old_scan_id]
|
||||
if config.source == "device_monitor_1d":
|
||||
for scan_id in ids_to_check:
|
||||
if scan_id is None or async_names is None:
|
||||
continue
|
||||
self.bec_dispatcher.disconnect_slot(
|
||||
self.on_image_update_1d,
|
||||
MessageEndpoints.device_async_signal(
|
||||
scan_id, async_names[0], async_names[1]
|
||||
),
|
||||
)
|
||||
elif config.source == "device_monitor_2d":
|
||||
for scan_id in ids_to_check:
|
||||
if scan_id is None or async_names is None:
|
||||
continue
|
||||
self.bec_dispatcher.disconnect_slot(
|
||||
self.on_image_update_2d,
|
||||
MessageEndpoints.device_async_signal(
|
||||
scan_id, async_names[0], async_names[1]
|
||||
),
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
f"Cannot disconnect monitor {monitor} with source {self.subscriptions['main'].source}"
|
||||
)
|
||||
return
|
||||
if config.source == "device_monitor_1d":
|
||||
self.bec_dispatcher.disconnect_slot(
|
||||
self.on_image_update_1d,
|
||||
MessageEndpoints.device_preview(monitor[0], monitor[1]),
|
||||
)
|
||||
elif config.source == "device_monitor_2d":
|
||||
self.bec_dispatcher.disconnect_slot(
|
||||
self.on_image_update_2d,
|
||||
MessageEndpoints.device_preview(monitor[0], monitor[1]),
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
f"Cannot disconnect monitor {monitor} with source {self.subscriptions['main'].source}"
|
||||
)
|
||||
return
|
||||
else: # FIXME old monitor 1d/2d endpoint handling, present for backwards compatibility, will be removed in future versions
|
||||
self.bec_dispatcher.disconnect_slot(
|
||||
self.on_image_update_1d, MessageEndpoints.device_monitor_1d(monitor)
|
||||
@@ -512,6 +734,8 @@ class Image(ImageBase):
|
||||
self.on_image_update_2d, MessageEndpoints.device_monitor_2d(monitor)
|
||||
)
|
||||
self.subscriptions["main"].monitor = None
|
||||
self.subscriptions["main"].async_signal_name = None
|
||||
self.async_update = False
|
||||
self._sync_device_selection()
|
||||
|
||||
########################################
|
||||
@@ -526,7 +750,7 @@ class Image(ImageBase):
|
||||
msg(dict): The message containing the data.
|
||||
metadata(dict): The metadata associated with the message.
|
||||
"""
|
||||
data = msg["data"]
|
||||
data = self._get_payload_data(msg)
|
||||
current_scan_id = metadata.get("scan_id", None)
|
||||
|
||||
if current_scan_id is None:
|
||||
@@ -538,6 +762,9 @@ class Image(ImageBase):
|
||||
self.main_image.max_len = 0
|
||||
if self.crosshair is not None:
|
||||
self.crosshair.reset()
|
||||
if data is None:
|
||||
logger.warning("No data received for image update.")
|
||||
return
|
||||
image_buffer = self.adjust_image_buffer(self.main_image, data)
|
||||
if self._color_bar is not None:
|
||||
self._color_bar.blockSignals(True)
|
||||
@@ -590,7 +817,10 @@ class Image(ImageBase):
|
||||
msg(dict): The message containing the data.
|
||||
metadata(dict): The metadata associated with the message.
|
||||
"""
|
||||
data = msg["data"]
|
||||
data = self._get_payload_data(msg)
|
||||
if data is None:
|
||||
logger.warning("No data received for image update.")
|
||||
return
|
||||
if self._color_bar is not None:
|
||||
self._color_bar.blockSignals(True)
|
||||
self.main_image.set_data(data)
|
||||
@@ -598,6 +828,22 @@ class Image(ImageBase):
|
||||
self._color_bar.blockSignals(False)
|
||||
self.image_updated.emit()
|
||||
|
||||
def _get_payload_data(self, msg: dict) -> np.ndarray | None:
|
||||
"""
|
||||
Extract payload from async/preview/monitor1D/2D message structures due to inconsistent formats in backend.
|
||||
|
||||
Args:
|
||||
msg (dict): The incoming message containing data.
|
||||
"""
|
||||
if not self.async_update:
|
||||
return msg.get("data")
|
||||
async_names = self._get_async_signal_name()
|
||||
if async_names is None:
|
||||
logger.warning("Async payload extraction failed; monitor info incomplete.")
|
||||
return None
|
||||
_, async_signal = async_names
|
||||
return msg.get("signals", {}).get(async_signal, {}).get("value", None)
|
||||
|
||||
################################################################################
|
||||
# Clean up
|
||||
################################################################################
|
||||
@@ -634,6 +880,8 @@ class Image(ImageBase):
|
||||
self.device_combo_box.deleteLater()
|
||||
self.dim_combo_box.close()
|
||||
self.dim_combo_box.deleteLater()
|
||||
self.bec_dispatcher.disconnect_slot(self.on_scan_status, MessageEndpoints.scan_status())
|
||||
self.bec_dispatcher.disconnect_slot(self.on_scan_progress, MessageEndpoints.scan_progress())
|
||||
super().cleanup()
|
||||
|
||||
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
|
||||
import pyqtgraph as pg
|
||||
from bec_lib import bec_logger
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
@@ -45,12 +43,24 @@ class ScatterWaveform(PlotBase):
|
||||
USER_ACCESS = [
|
||||
*PlotBase.USER_ACCESS,
|
||||
# Scatter Waveform Specific RPC Access
|
||||
"main_curve",
|
||||
"color_map",
|
||||
"color_map.setter",
|
||||
"plot",
|
||||
"update_with_scan_history",
|
||||
"clear_all",
|
||||
# Device properties
|
||||
"x_device_name",
|
||||
"x_device_name.setter",
|
||||
"x_device_entry",
|
||||
"x_device_entry.setter",
|
||||
"y_device_name",
|
||||
"y_device_name.setter",
|
||||
"y_device_entry",
|
||||
"y_device_entry.setter",
|
||||
"z_device_name",
|
||||
"z_device_name.setter",
|
||||
"z_device_entry",
|
||||
"z_device_entry.setter",
|
||||
]
|
||||
|
||||
sync_signal_update = Signal()
|
||||
@@ -93,6 +103,13 @@ class ScatterWaveform(PlotBase):
|
||||
)
|
||||
|
||||
self._init_scatter_curve_settings()
|
||||
|
||||
# Show toolbar bundles - only include scatter_waveform_settings if not in SIDE mode
|
||||
shown_bundles = ["plot_export", "mouse_interaction", "roi", "axis_popup"]
|
||||
if self.ui_mode != UIMode.SIDE:
|
||||
shown_bundles.insert(0, "scatter_waveform_settings")
|
||||
self.toolbar.show_bundles(shown_bundles)
|
||||
|
||||
self.update_with_scan_history(-1)
|
||||
|
||||
################################################################################
|
||||
@@ -121,15 +138,9 @@ class ScatterWaveform(PlotBase):
|
||||
checkable=True,
|
||||
parent=self,
|
||||
)
|
||||
self.toolbar.components.add_safe("scatter_waveform_settings", scatter_curve_action)
|
||||
self.toolbar.get_bundle("axis_popup").add_action("scatter_waveform_settings")
|
||||
self.toolbar.add_action("scatter_waveform_settings", scatter_curve_action)
|
||||
scatter_curve_action.action.triggered.connect(self.show_scatter_curve_settings)
|
||||
|
||||
shown_bundles = self.toolbar.shown_bundles
|
||||
if "performance" in shown_bundles:
|
||||
shown_bundles.remove("performance")
|
||||
self.toolbar.show_bundles(shown_bundles)
|
||||
|
||||
def show_scatter_curve_settings(self):
|
||||
"""
|
||||
Show the scatter curve settings dialog.
|
||||
@@ -145,7 +156,7 @@ class ScatterWaveform(PlotBase):
|
||||
window_title="Scatter Curve Settings",
|
||||
modal=False,
|
||||
)
|
||||
self.scatter_dialog.resize(620, 200)
|
||||
self.scatter_dialog.resize(700, 240)
|
||||
# When the dialog is closed, update the toolbar icon and clear the reference
|
||||
self.scatter_dialog.finished.connect(self._scatter_dialog_closed)
|
||||
self.scatter_dialog.show()
|
||||
@@ -191,27 +202,6 @@ class ScatterWaveform(PlotBase):
|
||||
except ValidationError:
|
||||
return
|
||||
|
||||
@SafeProperty(str, designable=False, popup_error=True)
|
||||
def curve_json(self) -> str:
|
||||
"""
|
||||
Get the curve configuration as a JSON string.
|
||||
"""
|
||||
return json.dumps(self.main_curve.config.model_dump(), indent=2)
|
||||
|
||||
@curve_json.setter
|
||||
def curve_json(self, value: str):
|
||||
"""
|
||||
Set the curve configuration from a JSON string.
|
||||
|
||||
Args:
|
||||
value(str): The JSON string to set the curve configuration from.
|
||||
"""
|
||||
try:
|
||||
config = ScatterCurveConfig(**json.loads(value))
|
||||
self._add_main_scatter_curve(config)
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f"Failed to decode JSON: {e}")
|
||||
|
||||
################################################################################
|
||||
# High Level methods for API
|
||||
################################################################################
|
||||
@@ -285,10 +275,6 @@ class ScatterWaveform(PlotBase):
|
||||
Args:
|
||||
config(ScatterCurveConfig): The configuration of the scatter curve.
|
||||
"""
|
||||
# Apply suffix for axes
|
||||
self.set_x_label_suffix(f"[{config.x_device.name}-{config.x_device.name}]")
|
||||
self.set_y_label_suffix(f"[{config.y_device.name}-{config.y_device.name}]")
|
||||
|
||||
# To have only one main curve
|
||||
if self._main_curve is not None:
|
||||
self.rpc_register.remove_rpc(self._main_curve)
|
||||
@@ -298,6 +284,9 @@ class ScatterWaveform(PlotBase):
|
||||
self._main_curve = None
|
||||
|
||||
self._main_curve = ScatterCurve(parent_item=self, config=config, name=config.label)
|
||||
|
||||
# Update axis labels (matching Heatmap's label policy)
|
||||
self.update_labels()
|
||||
self.plot_item.addItem(self._main_curve)
|
||||
|
||||
self.sync_signal_update.emit()
|
||||
@@ -405,6 +394,284 @@ class ScatterWaveform(PlotBase):
|
||||
scan_devices = self.scan_item.devices
|
||||
return scan_devices, "value"
|
||||
|
||||
################################################################################
|
||||
# Widget Specific Properties
|
||||
################################################################################
|
||||
|
||||
@SafeProperty(str)
|
||||
def x_device_name(self) -> str:
|
||||
"""Device name for the X axis."""
|
||||
if self._main_curve is None or self._main_curve.config.x_device is None:
|
||||
return ""
|
||||
return self._main_curve.config.x_device.name or ""
|
||||
|
||||
@x_device_name.setter
|
||||
def x_device_name(self, device_name: str) -> None:
|
||||
"""
|
||||
Set the X device name.
|
||||
|
||||
Args:
|
||||
device_name(str): Device name for the X axis
|
||||
"""
|
||||
device_name = device_name or ""
|
||||
|
||||
if device_name:
|
||||
try:
|
||||
entry = self.entry_validator.validate_signal(device_name, None)
|
||||
# Update or create config
|
||||
if self._main_curve.config.x_device is None:
|
||||
self._main_curve.config.x_device = ScatterDeviceSignal(
|
||||
name=device_name, entry=entry
|
||||
)
|
||||
else:
|
||||
self._main_curve.config.x_device.name = device_name
|
||||
self._main_curve.config.x_device.entry = entry
|
||||
self.property_changed.emit("x_device_name", device_name)
|
||||
self.update_labels()
|
||||
self._try_auto_plot()
|
||||
except Exception:
|
||||
pass # Silently fail if device is not available yet
|
||||
else:
|
||||
if self._main_curve.config.x_device is not None:
|
||||
self._main_curve.config.x_device = None
|
||||
self.property_changed.emit("x_device_name", "")
|
||||
self.update_labels()
|
||||
|
||||
@SafeProperty(str)
|
||||
def x_device_entry(self) -> str:
|
||||
"""Signal entry for the X axis device."""
|
||||
if self._main_curve is None or self._main_curve.config.x_device is None:
|
||||
return ""
|
||||
return self._main_curve.config.x_device.entry or ""
|
||||
|
||||
@x_device_entry.setter
|
||||
def x_device_entry(self, entry: str) -> None:
|
||||
"""
|
||||
Set the X device entry.
|
||||
|
||||
Args:
|
||||
entry(str): Signal entry for the X axis device
|
||||
"""
|
||||
if not entry:
|
||||
return
|
||||
|
||||
if self._main_curve.config.x_device is None:
|
||||
logger.warning("Cannot set x_device_entry without x_device_name set first.")
|
||||
return
|
||||
|
||||
device_name = self._main_curve.config.x_device.name
|
||||
try:
|
||||
validated_entry = self.entry_validator.validate_signal(device_name, entry)
|
||||
self._main_curve.config.x_device.entry = validated_entry
|
||||
self.property_changed.emit("x_device_entry", validated_entry)
|
||||
self.update_labels()
|
||||
self._try_auto_plot()
|
||||
except Exception:
|
||||
pass # Silently fail if validation fails
|
||||
|
||||
@SafeProperty(str)
|
||||
def y_device_name(self) -> str:
|
||||
"""Device name for the Y axis."""
|
||||
if self._main_curve is None or self._main_curve.config.y_device is None:
|
||||
return ""
|
||||
return self._main_curve.config.y_device.name or ""
|
||||
|
||||
@y_device_name.setter
|
||||
def y_device_name(self, device_name: str) -> None:
|
||||
"""
|
||||
Set the Y device name.
|
||||
|
||||
Args:
|
||||
device_name(str): Device name for the Y axis
|
||||
"""
|
||||
device_name = device_name or ""
|
||||
|
||||
if device_name:
|
||||
try:
|
||||
entry = self.entry_validator.validate_signal(device_name, None)
|
||||
# Update or create config
|
||||
if self._main_curve.config.y_device is None:
|
||||
self._main_curve.config.y_device = ScatterDeviceSignal(
|
||||
name=device_name, entry=entry
|
||||
)
|
||||
else:
|
||||
self._main_curve.config.y_device.name = device_name
|
||||
self._main_curve.config.y_device.entry = entry
|
||||
self.property_changed.emit("y_device_name", device_name)
|
||||
self.update_labels()
|
||||
self._try_auto_plot()
|
||||
except Exception:
|
||||
pass # Silently fail if device is not available yet
|
||||
else:
|
||||
if self._main_curve.config.y_device is not None:
|
||||
self._main_curve.config.y_device = None
|
||||
self.property_changed.emit("y_device_name", "")
|
||||
self.update_labels()
|
||||
|
||||
@SafeProperty(str)
|
||||
def y_device_entry(self) -> str:
|
||||
"""Signal entry for the Y axis device."""
|
||||
if self._main_curve is None or self._main_curve.config.y_device is None:
|
||||
return ""
|
||||
return self._main_curve.config.y_device.entry or ""
|
||||
|
||||
@y_device_entry.setter
|
||||
def y_device_entry(self, entry: str) -> None:
|
||||
"""
|
||||
Set the Y device entry.
|
||||
|
||||
Args:
|
||||
entry(str): Signal entry for the Y axis device
|
||||
"""
|
||||
if not entry:
|
||||
return
|
||||
|
||||
if self._main_curve.config.y_device is None:
|
||||
logger.warning("Cannot set y_device_entry without y_device_name set first.")
|
||||
return
|
||||
|
||||
device_name = self._main_curve.config.y_device.name
|
||||
try:
|
||||
validated_entry = self.entry_validator.validate_signal(device_name, entry)
|
||||
self._main_curve.config.y_device.entry = validated_entry
|
||||
self.property_changed.emit("y_device_entry", validated_entry)
|
||||
self.update_labels()
|
||||
self._try_auto_plot()
|
||||
except Exception:
|
||||
pass # Silently fail if validation fails
|
||||
|
||||
@SafeProperty(str)
|
||||
def z_device_name(self) -> str:
|
||||
"""Device name for the Z (color) axis."""
|
||||
if self._main_curve is None or self._main_curve.config.z_device is None:
|
||||
return ""
|
||||
return self._main_curve.config.z_device.name or ""
|
||||
|
||||
@z_device_name.setter
|
||||
def z_device_name(self, device_name: str) -> None:
|
||||
"""
|
||||
Set the Z device name.
|
||||
|
||||
Args:
|
||||
device_name(str): Device name for the Z axis
|
||||
"""
|
||||
device_name = device_name or ""
|
||||
|
||||
if device_name:
|
||||
try:
|
||||
entry = self.entry_validator.validate_signal(device_name, None)
|
||||
# Update or create config
|
||||
if self._main_curve.config.z_device is None:
|
||||
self._main_curve.config.z_device = ScatterDeviceSignal(
|
||||
name=device_name, entry=entry
|
||||
)
|
||||
else:
|
||||
self._main_curve.config.z_device.name = device_name
|
||||
self._main_curve.config.z_device.entry = entry
|
||||
self.property_changed.emit("z_device_name", device_name)
|
||||
self.update_labels()
|
||||
self._try_auto_plot()
|
||||
except Exception:
|
||||
pass # Silently fail if device is not available yet
|
||||
else:
|
||||
if self._main_curve.config.z_device is not None:
|
||||
self._main_curve.config.z_device = None
|
||||
self.property_changed.emit("z_device_name", "")
|
||||
self.update_labels()
|
||||
|
||||
@SafeProperty(str)
|
||||
def z_device_entry(self) -> str:
|
||||
"""Signal entry for the Z (color) axis device."""
|
||||
if self._main_curve is None or self._main_curve.config.z_device is None:
|
||||
return ""
|
||||
return self._main_curve.config.z_device.entry or ""
|
||||
|
||||
@z_device_entry.setter
|
||||
def z_device_entry(self, entry: str) -> None:
|
||||
"""
|
||||
Set the Z device entry.
|
||||
|
||||
Args:
|
||||
entry(str): Signal entry for the Z axis device
|
||||
"""
|
||||
if not entry:
|
||||
return
|
||||
|
||||
if self._main_curve.config.z_device is None:
|
||||
logger.warning("Cannot set z_device_entry without z_device_name set first.")
|
||||
return
|
||||
|
||||
device_name = self._main_curve.config.z_device.name
|
||||
try:
|
||||
validated_entry = self.entry_validator.validate_signal(device_name, entry)
|
||||
self._main_curve.config.z_device.entry = validated_entry
|
||||
self.property_changed.emit("z_device_entry", validated_entry)
|
||||
self.update_labels()
|
||||
self._try_auto_plot()
|
||||
except Exception:
|
||||
pass # Silently fail if validation fails
|
||||
|
||||
def _try_auto_plot(self) -> None:
|
||||
"""
|
||||
Attempt to automatically call plot() if all three devices are set.
|
||||
"""
|
||||
has_x = self._main_curve.config.x_device is not None
|
||||
has_y = self._main_curve.config.y_device is not None
|
||||
has_z = self._main_curve.config.z_device is not None
|
||||
|
||||
if has_x and has_y and has_z:
|
||||
x_name = self._main_curve.config.x_device.name
|
||||
x_entry = self._main_curve.config.x_device.entry
|
||||
y_name = self._main_curve.config.y_device.name
|
||||
y_entry = self._main_curve.config.y_device.entry
|
||||
z_name = self._main_curve.config.z_device.name
|
||||
z_entry = self._main_curve.config.z_device.entry
|
||||
try:
|
||||
self.plot(
|
||||
x_name=x_name,
|
||||
y_name=y_name,
|
||||
z_name=z_name,
|
||||
x_entry=x_entry,
|
||||
y_entry=y_entry,
|
||||
z_entry=z_entry,
|
||||
validate_bec=False, # Don't validate - entries already validated
|
||||
)
|
||||
except Exception as e:
|
||||
logger.debug(f"Auto-plot failed: {e}")
|
||||
pass # Silently fail if plot cannot be called yet
|
||||
|
||||
def update_labels(self):
|
||||
"""
|
||||
Update the labels of the x and y axes based on current device configuration.
|
||||
"""
|
||||
if self._main_curve is None:
|
||||
return
|
||||
|
||||
config = self._main_curve.config
|
||||
|
||||
# Safely get device names
|
||||
x_device = config.x_device
|
||||
y_device = config.y_device
|
||||
|
||||
x_name = x_device.name if x_device else None
|
||||
y_name = y_device.name if y_device else None
|
||||
|
||||
if x_name is not None:
|
||||
self.x_label = x_name # type: ignore
|
||||
x_dev = self.dev.get(x_name)
|
||||
if x_dev and hasattr(x_dev, "egu"):
|
||||
self.x_label_units = x_dev.egu()
|
||||
|
||||
if y_name is not None:
|
||||
self.y_label = y_name # type: ignore
|
||||
y_dev = self.dev.get(y_name)
|
||||
if y_dev and hasattr(y_dev, "egu"):
|
||||
self.y_label_units = y_dev.egu()
|
||||
|
||||
################################################################################
|
||||
# Scan History
|
||||
################################################################################
|
||||
|
||||
@SafeSlot(int)
|
||||
@SafeSlot(str)
|
||||
@SafeSlot()
|
||||
|
||||
@@ -86,29 +86,29 @@ class ScatterCurveSettings(SettingWidget):
|
||||
if hasattr(self.ui, "x_name"):
|
||||
self.ui.x_name.set_device(x_name)
|
||||
if hasattr(self.ui, "x_entry") and x_entry is not None:
|
||||
self.ui.x_entry.setText(x_entry)
|
||||
self.ui.x_entry.set_to_obj_name(x_entry)
|
||||
|
||||
if hasattr(self.ui, "y_name"):
|
||||
self.ui.y_name.set_device(y_name)
|
||||
if hasattr(self.ui, "y_entry") and y_entry is not None:
|
||||
self.ui.y_entry.setText(y_entry)
|
||||
self.ui.y_entry.set_to_obj_name(y_entry)
|
||||
|
||||
if hasattr(self.ui, "z_name"):
|
||||
self.ui.z_name.set_device(z_name)
|
||||
if hasattr(self.ui, "z_entry") and z_entry is not None:
|
||||
self.ui.z_entry.setText(z_entry)
|
||||
self.ui.z_entry.set_to_obj_name(z_entry)
|
||||
|
||||
@SafeSlot()
|
||||
def accept_changes(self):
|
||||
"""
|
||||
Apply all properties from the settings widget to the target widget.
|
||||
"""
|
||||
x_name = self.ui.x_name.text()
|
||||
x_entry = self.ui.x_entry.text()
|
||||
y_name = self.ui.y_name.text()
|
||||
y_entry = self.ui.y_entry.text()
|
||||
z_name = self.ui.z_name.text()
|
||||
z_entry = self.ui.z_entry.text()
|
||||
x_name = self.ui.x_name.currentText()
|
||||
x_entry = self.ui.x_entry.get_signal_name()
|
||||
y_name = self.ui.y_name.currentText()
|
||||
y_entry = self.ui.y_entry.get_signal_name()
|
||||
z_name = self.ui.z_name.currentText()
|
||||
z_entry = self.ui.z_entry.get_signal_name()
|
||||
validate_bec = self.ui.validate_bec.checked
|
||||
color_map = self.ui.color_map.colormap
|
||||
|
||||
|
||||
@@ -6,8 +6,8 @@
|
||||
<rect>
|
||||
<x>0</x>
|
||||
<y>0</y>
|
||||
<width>604</width>
|
||||
<height>166</height>
|
||||
<width>826</width>
|
||||
<height>204</height>
|
||||
</rect>
|
||||
</property>
|
||||
<property name="windowTitle">
|
||||
@@ -31,6 +31,13 @@
|
||||
</item>
|
||||
</layout>
|
||||
</item>
|
||||
<item>
|
||||
<widget class="Line" name="line">
|
||||
<property name="orientation">
|
||||
<enum>Qt::Orientation::Horizontal</enum>
|
||||
</property>
|
||||
</widget>
|
||||
</item>
|
||||
<item>
|
||||
<layout class="QHBoxLayout" name="horizontalLayout">
|
||||
<item>
|
||||
@@ -46,9 +53,6 @@
|
||||
</property>
|
||||
</widget>
|
||||
</item>
|
||||
<item row="0" column="1">
|
||||
<widget class="DeviceLineEdit" name="x_name"/>
|
||||
</item>
|
||||
<item row="1" column="0">
|
||||
<widget class="QLabel" name="label_2">
|
||||
<property name="text">
|
||||
@@ -56,8 +60,22 @@
|
||||
</property>
|
||||
</widget>
|
||||
</item>
|
||||
<item row="0" column="1">
|
||||
<widget class="DeviceComboBox" name="x_name">
|
||||
<property name="editable">
|
||||
<bool>true</bool>
|
||||
</property>
|
||||
<property name="set_first_element_as_empty" stdset="0">
|
||||
<bool>true</bool>
|
||||
</property>
|
||||
</widget>
|
||||
</item>
|
||||
<item row="1" column="1">
|
||||
<widget class="QLineEdit" name="x_entry"/>
|
||||
<widget class="SignalComboBox" name="x_entry">
|
||||
<property name="editable">
|
||||
<bool>true</bool>
|
||||
</property>
|
||||
</widget>
|
||||
</item>
|
||||
</layout>
|
||||
</widget>
|
||||
@@ -75,9 +93,6 @@
|
||||
</property>
|
||||
</widget>
|
||||
</item>
|
||||
<item row="0" column="1">
|
||||
<widget class="DeviceLineEdit" name="y_name"/>
|
||||
</item>
|
||||
<item row="1" column="0">
|
||||
<widget class="QLabel" name="label_4">
|
||||
<property name="text">
|
||||
@@ -85,8 +100,22 @@
|
||||
</property>
|
||||
</widget>
|
||||
</item>
|
||||
<item row="0" column="1">
|
||||
<widget class="DeviceComboBox" name="y_name">
|
||||
<property name="editable">
|
||||
<bool>true</bool>
|
||||
</property>
|
||||
<property name="set_first_element_as_empty" stdset="0">
|
||||
<bool>true</bool>
|
||||
</property>
|
||||
</widget>
|
||||
</item>
|
||||
<item row="1" column="1">
|
||||
<widget class="QLineEdit" name="y_entry"/>
|
||||
<widget class="SignalComboBox" name="y_entry">
|
||||
<property name="editable">
|
||||
<bool>true</bool>
|
||||
</property>
|
||||
</widget>
|
||||
</item>
|
||||
</layout>
|
||||
</widget>
|
||||
@@ -111,11 +140,22 @@
|
||||
</property>
|
||||
</widget>
|
||||
</item>
|
||||
<item row="1" column="1">
|
||||
<widget class="QLineEdit" name="z_entry"/>
|
||||
</item>
|
||||
<item row="0" column="1">
|
||||
<widget class="DeviceLineEdit" name="z_name"/>
|
||||
<widget class="DeviceComboBox" name="z_name">
|
||||
<property name="editable">
|
||||
<bool>true</bool>
|
||||
</property>
|
||||
<property name="set_first_element_as_empty" stdset="0">
|
||||
<bool>true</bool>
|
||||
</property>
|
||||
</widget>
|
||||
</item>
|
||||
<item row="1" column="1">
|
||||
<widget class="SignalComboBox" name="z_entry">
|
||||
<property name="editable">
|
||||
<bool>true</bool>
|
||||
</property>
|
||||
</widget>
|
||||
</item>
|
||||
</layout>
|
||||
</widget>
|
||||
@@ -125,77 +165,130 @@
|
||||
</layout>
|
||||
</widget>
|
||||
<customwidgets>
|
||||
<customwidget>
|
||||
<class>DeviceLineEdit</class>
|
||||
<extends>QLineEdit</extends>
|
||||
<header>device_line_edit</header>
|
||||
</customwidget>
|
||||
<customwidget>
|
||||
<class>ToggleSwitch</class>
|
||||
<extends>QWidget</extends>
|
||||
<extends></extends>
|
||||
<header>toggle_switch</header>
|
||||
</customwidget>
|
||||
<customwidget>
|
||||
<class>BECColorMapWidget</class>
|
||||
<extends>QWidget</extends>
|
||||
<extends></extends>
|
||||
<header>bec_color_map_widget</header>
|
||||
</customwidget>
|
||||
<customwidget>
|
||||
<class>DeviceComboBox</class>
|
||||
<extends></extends>
|
||||
<header>device_combo_box</header>
|
||||
</customwidget>
|
||||
<customwidget>
|
||||
<class>SignalComboBox</class>
|
||||
<extends></extends>
|
||||
<header>signal_combo_box</header>
|
||||
</customwidget>
|
||||
</customwidgets>
|
||||
<tabstops>
|
||||
<tabstop>x_name</tabstop>
|
||||
<tabstop>x_entry</tabstop>
|
||||
<tabstop>y_name</tabstop>
|
||||
<tabstop>y_entry</tabstop>
|
||||
<tabstop>z_name</tabstop>
|
||||
<tabstop>x_entry</tabstop>
|
||||
<tabstop>y_entry</tabstop>
|
||||
<tabstop>z_entry</tabstop>
|
||||
</tabstops>
|
||||
<resources/>
|
||||
<connections>
|
||||
<connection>
|
||||
<sender>x_name</sender>
|
||||
<signal>textChanged(QString)</signal>
|
||||
<signal>device_reset()</signal>
|
||||
<receiver>x_entry</receiver>
|
||||
<slot>clear()</slot>
|
||||
<slot>reset_selection()</slot>
|
||||
<hints>
|
||||
<hint type="sourcelabel">
|
||||
<x>134</x>
|
||||
<y>95</y>
|
||||
<x>136</x>
|
||||
<y>122</y>
|
||||
</hint>
|
||||
<hint type="destinationlabel">
|
||||
<x>138</x>
|
||||
<y>128</y>
|
||||
<x>133</x>
|
||||
<y>151</y>
|
||||
</hint>
|
||||
</hints>
|
||||
</connection>
|
||||
<connection>
|
||||
<sender>y_name</sender>
|
||||
<signal>textChanged(QString)</signal>
|
||||
<signal>device_reset()</signal>
|
||||
<receiver>y_entry</receiver>
|
||||
<slot>clear()</slot>
|
||||
<slot>reset_selection()</slot>
|
||||
<hints>
|
||||
<hint type="sourcelabel">
|
||||
<x>351</x>
|
||||
<y>91</y>
|
||||
<x>412</x>
|
||||
<y>122</y>
|
||||
</hint>
|
||||
<hint type="destinationlabel">
|
||||
<x>349</x>
|
||||
<y>121</y>
|
||||
<x>409</x>
|
||||
<y>151</y>
|
||||
</hint>
|
||||
</hints>
|
||||
</connection>
|
||||
<connection>
|
||||
<sender>z_name</sender>
|
||||
<signal>textChanged(QString)</signal>
|
||||
<signal>device_reset()</signal>
|
||||
<receiver>z_entry</receiver>
|
||||
<slot>clear()</slot>
|
||||
<slot>reset_selection()</slot>
|
||||
<hints>
|
||||
<hint type="sourcelabel">
|
||||
<x>520</x>
|
||||
<y>98</y>
|
||||
<x>687</x>
|
||||
<y>121</y>
|
||||
</hint>
|
||||
<hint type="destinationlabel">
|
||||
<x>522</x>
|
||||
<y>127</y>
|
||||
<x>684</x>
|
||||
<y>149</y>
|
||||
</hint>
|
||||
</hints>
|
||||
</connection>
|
||||
<connection>
|
||||
<sender>x_name</sender>
|
||||
<signal>currentTextChanged(QString)</signal>
|
||||
<receiver>x_entry</receiver>
|
||||
<slot>set_device(QString)</slot>
|
||||
<hints>
|
||||
<hint type="sourcelabel">
|
||||
<x>152</x>
|
||||
<y>123</y>
|
||||
</hint>
|
||||
<hint type="destinationlabel">
|
||||
<x>151</x>
|
||||
<y>151</y>
|
||||
</hint>
|
||||
</hints>
|
||||
</connection>
|
||||
<connection>
|
||||
<sender>y_name</sender>
|
||||
<signal>currentTextChanged(QString)</signal>
|
||||
<receiver>y_entry</receiver>
|
||||
<slot>set_device(QString)</slot>
|
||||
<hints>
|
||||
<hint type="sourcelabel">
|
||||
<x>412</x>
|
||||
<y>121</y>
|
||||
</hint>
|
||||
<hint type="destinationlabel">
|
||||
<x>409</x>
|
||||
<y>149</y>
|
||||
</hint>
|
||||
</hints>
|
||||
</connection>
|
||||
<connection>
|
||||
<sender>z_name</sender>
|
||||
<signal>currentTextChanged(QString)</signal>
|
||||
<receiver>z_entry</receiver>
|
||||
<slot>set_device(QString)</slot>
|
||||
<hints>
|
||||
<hint type="sourcelabel">
|
||||
<x>687</x>
|
||||
<y>121</y>
|
||||
</hint>
|
||||
<hint type="destinationlabel">
|
||||
<x>684</x>
|
||||
<y>149</y>
|
||||
</hint>
|
||||
</hints>
|
||||
</connection>
|
||||
|
||||
@@ -44,11 +44,7 @@ def test_rpc_plotting_shortcuts_init_configs(qtbot, connected_client_gui_obj):
|
||||
|
||||
im.image(monitor="eiger")
|
||||
mm.map(x_name="samx", y_name="samy")
|
||||
sw.plot(x_name="samx", y_name="samy", z_name="bpm4i")
|
||||
assert sw.main_curve.object_name == "bpm4i_bpm4i"
|
||||
# Create a new curve on the scatter waveform should replace the old one
|
||||
sw.plot(x_name="samx", y_name="samy", z_name="bpm4a")
|
||||
assert sw.main_curve.object_name == "bpm4a_bpm4a"
|
||||
mw.plot(monitor="waveform")
|
||||
# Adding multiple custom curves sho
|
||||
|
||||
|
||||
@@ -2208,7 +2208,7 @@ class TestFlatToolbarActions:
|
||||
"flat_status": "BECStatusBox",
|
||||
"flat_progress_bar": "RingProgressBar",
|
||||
"flat_terminal": "WebConsole",
|
||||
"flat_bec_shell": "WebConsole",
|
||||
"flat_bec_shell": "BECShell",
|
||||
"flat_sbb_monitor": "SBBMonitor",
|
||||
}
|
||||
|
||||
|
||||
@@ -1,18 +1,11 @@
|
||||
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
|
||||
import gc
|
||||
import time
|
||||
from functools import partial
|
||||
from multiprocessing import Process
|
||||
from unittest.mock import MagicMock, call, patch
|
||||
|
||||
import pytest
|
||||
from PySide6.QtWidgets import QWidget
|
||||
from qtpy.QtCore import QObject
|
||||
from qtpy.QtWidgets import QApplication
|
||||
|
||||
from bec_widgets.utils import BECConnector
|
||||
from bec_widgets.utils.bec_connector import ConnectionConfig
|
||||
from bec_widgets.utils.bec_widget import BECWidget
|
||||
from bec_widgets.utils.error_popups import SafeSlot as Slot
|
||||
|
||||
from .client_mocks import mocked_client
|
||||
@@ -138,127 +131,3 @@ def test_bec_connector_change_object_name(bec_connector):
|
||||
# Verify that the object with the previous name is no longer registered
|
||||
all_objects = bec_connector.rpc_register.list_all_connections().values()
|
||||
assert not any(obj.objectName() == previous_name for obj in all_objects)
|
||||
|
||||
|
||||
def test_bec_connector_terminate_run_on_about_to_quit(qtbot, bec_connector):
|
||||
assert BECConnector.EXIT_HANDLERS.get(0) is not None
|
||||
terminate_mock = MagicMock()
|
||||
bec_connector.__class__.EXIT_HANDLERS[0] = terminate_mock
|
||||
bec_connector._run_exit_handlers()
|
||||
qtbot.waitUntil(lambda: terminate_mock.call_count == 1)
|
||||
|
||||
|
||||
def test_bec_connector_terminate_run_once_and_only_once(qtbot, bec_connector):
|
||||
terminate_mock = MagicMock()
|
||||
bec_connector.__class__.EXIT_HANDLERS[0] = terminate_mock
|
||||
_conn_2 = BECConnectorQObject(client=mocked_client)
|
||||
_conn_3 = BECConnectorQObject(client=mocked_client)
|
||||
bec_connector._run_exit_handlers()
|
||||
qtbot.waitUntil(lambda: terminate_mock.call_count == 1)
|
||||
|
||||
|
||||
def test_bec_connector_exit_handlers_run_in_order(qtbot, bec_connector):
|
||||
handler = MagicMock()
|
||||
bec_connector.__class__.EXIT_HANDLERS[0] = handler
|
||||
|
||||
def h1():
|
||||
handler(prio=1)
|
||||
|
||||
def h2():
|
||||
handler(prio=2)
|
||||
|
||||
def h3():
|
||||
handler(prio=3)
|
||||
|
||||
bec_connector._add_exit_handler(h3, 5)
|
||||
bec_connector._add_exit_handler(h2, 10)
|
||||
bec_connector._add_exit_handler(h1, 15)
|
||||
bec_connector._run_exit_handlers()
|
||||
qtbot.waitUntil(lambda: handler.call_count == 4)
|
||||
handler.assert_has_calls([call(prio=1), call(prio=2), call(prio=3), call()])
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_widget_with_exit_handlers(bec_connector, mocked_client):
|
||||
with patch.object(mocked_client, "connector", bec_connector):
|
||||
handler = MagicMock()
|
||||
bec_connector.__class__.EXIT_HANDLERS[0] = handler
|
||||
|
||||
class DropWeakrefWidget(BECWidget, QWidget):
|
||||
def __init__(
|
||||
self,
|
||||
client=None,
|
||||
config: ConnectionConfig = None,
|
||||
gui_id: str | None = None,
|
||||
theme_update: bool = False,
|
||||
start_busy: bool = False,
|
||||
busy_text: str = "Loading…",
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(
|
||||
client, config, gui_id, theme_update, start_busy, busy_text, **kwargs
|
||||
)
|
||||
self.setup_on_exit()
|
||||
self.client.connector.add_exit_handler(self._on_exit_stored_ref, 5)
|
||||
self.client.connector.add_exit_handler(self.instance_on_exit, 7)
|
||||
|
||||
def setup_on_exit(self):
|
||||
def _on_exit():
|
||||
self.backgroundRole() # access some Qt thing just to fail test if c++ object is deleted
|
||||
handler("called by DropWeakrefWidget in stored reference to function")
|
||||
|
||||
self._on_exit_stored_ref = _on_exit
|
||||
|
||||
def instance_on_exit(self):
|
||||
self.backgroundRole() # access some Qt thing just to fail test if c++ object is deleted
|
||||
handler("called by DropWeakrefWidget in instance method")
|
||||
|
||||
widget = DropWeakrefWidget(client=mocked_client)
|
||||
return widget, handler
|
||||
|
||||
|
||||
def test_connector_exit_handlers_doesnt_drop_when_widget_lives(
|
||||
qtbot, bec_connector, mock_widget_with_exit_handlers
|
||||
):
|
||||
widget, handler = mock_widget_with_exit_handlers
|
||||
qtbot.addWidget(widget)
|
||||
|
||||
def h1():
|
||||
handler(prio=1)
|
||||
|
||||
bec_connector._add_exit_handler(h1, 15)
|
||||
|
||||
bec_connector._run_exit_handlers()
|
||||
qtbot.waitUntil(lambda: handler.call_count == 4)
|
||||
handler.assert_has_calls(
|
||||
[
|
||||
call(prio=1),
|
||||
call("called by DropWeakrefWidget in instance method"),
|
||||
call("called by DropWeakrefWidget in stored reference to function"),
|
||||
call(), # from root cleanup
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def test_connector_exit_handlers_drops_when_widget_dies(
|
||||
qtbot, bec_connector, mock_widget_with_exit_handlers
|
||||
):
|
||||
widget, handler = mock_widget_with_exit_handlers
|
||||
qtbot.addWidget(widget)
|
||||
|
||||
def h1():
|
||||
handler(prio=1)
|
||||
|
||||
bec_connector._add_exit_handler(h1, 15)
|
||||
|
||||
widget.deleteLater()
|
||||
qtbot.wait(100)
|
||||
QApplication.processEvents()
|
||||
del widget
|
||||
qtbot.wait(100)
|
||||
gc.collect()
|
||||
qtbot.wait(100)
|
||||
|
||||
bec_connector._run_exit_handlers()
|
||||
qtbot.waitUntil(lambda: handler.call_count == 2)
|
||||
handler.assert_has_calls([call(prio=1), call()])
|
||||
|
||||
@@ -4,7 +4,6 @@ import pytest
|
||||
from bec_lib.device import Signal
|
||||
from qtpy.QtWidgets import QWidget
|
||||
|
||||
from bec_widgets.tests.utils import FakeDevice
|
||||
from bec_widgets.utils.ophyd_kind_util import Kind
|
||||
from bec_widgets.widgets.control.device_input.base_classes.device_input_base import BECDeviceFilter
|
||||
from bec_widgets.widgets.control.device_input.base_classes.device_signal_input_base import (
|
||||
@@ -153,3 +152,61 @@ def test_device_signal_input_base_cleanup(qtbot, mocked_client):
|
||||
widget.deleteLater()
|
||||
|
||||
mocked_client.callbacks.remove.assert_called_once_with(widget._device_update_register)
|
||||
|
||||
|
||||
def test_signal_combobox_get_signal_name_with_item_data(qtbot, device_signal_combobox):
|
||||
"""Test get_signal_name returns obj_name from item data when available."""
|
||||
device_signal_combobox.include_normal_signals = True
|
||||
device_signal_combobox.include_hinted_signals = True
|
||||
device_signal_combobox.set_device("samx")
|
||||
|
||||
# Select a signal that has item data with obj_name
|
||||
device_signal_combobox.setCurrentText("samx (readback)")
|
||||
|
||||
# get_signal_name should return the obj_name from item data
|
||||
signal_name = device_signal_combobox.get_signal_name()
|
||||
assert signal_name == "samx"
|
||||
|
||||
|
||||
def test_signal_combobox_get_signal_name_without_item_data(qtbot, device_signal_combobox):
|
||||
"""Test get_signal_name returns currentText when no item data available."""
|
||||
# Add a custom item without item data
|
||||
device_signal_combobox.addItem("custom_signal")
|
||||
device_signal_combobox.setCurrentText("custom_signal")
|
||||
|
||||
signal_name = device_signal_combobox.get_signal_name()
|
||||
assert signal_name == "custom_signal"
|
||||
|
||||
|
||||
def test_signal_combobox_get_signal_name_not_found(qtbot, device_signal_combobox):
|
||||
"""Test get_signal_name when text is not found in combobox (index == -1)."""
|
||||
# Set editable to allow text that's not in items
|
||||
device_signal_combobox.setEditable(True)
|
||||
device_signal_combobox.setCurrentText("nonexistent_signal")
|
||||
|
||||
signal_name = device_signal_combobox.get_signal_name()
|
||||
assert signal_name == "nonexistent_signal"
|
||||
|
||||
|
||||
def test_signal_combobox_get_signal_name_empty(qtbot, device_signal_combobox):
|
||||
"""Test get_signal_name when combobox is empty."""
|
||||
device_signal_combobox.clear()
|
||||
device_signal_combobox.setEditable(True)
|
||||
device_signal_combobox.setCurrentText("")
|
||||
|
||||
signal_name = device_signal_combobox.get_signal_name()
|
||||
assert signal_name == ""
|
||||
|
||||
|
||||
def test_signal_combobox_get_signal_name_with_velocity(qtbot, device_signal_combobox):
|
||||
"""Test get_signal_name with velocity signal."""
|
||||
device_signal_combobox.include_normal_signals = True
|
||||
device_signal_combobox.include_hinted_signals = True
|
||||
device_signal_combobox.include_config_signals = True
|
||||
device_signal_combobox.set_device("samx")
|
||||
|
||||
# Select velocity signal
|
||||
device_signal_combobox.setCurrentText("velocity")
|
||||
|
||||
signal_name = device_signal_combobox.get_signal_name()
|
||||
assert signal_name == "samx_velocity"
|
||||
|
||||
@@ -597,3 +597,277 @@ def test_finish_interpolation_thread_cleans_references(heatmap_widget):
|
||||
thread_mock.deleteLater.assert_called_once()
|
||||
assert heatmap_widget._interpolation_worker is None
|
||||
assert heatmap_widget._interpolation_thread is None
|
||||
|
||||
|
||||
def test_device_safe_properties_get(heatmap_widget):
|
||||
"""Test that device SafeProperty getters work correctly."""
|
||||
# Initially devices should be empty
|
||||
assert heatmap_widget.x_device_name == ""
|
||||
assert heatmap_widget.x_device_entry == ""
|
||||
assert heatmap_widget.y_device_name == ""
|
||||
assert heatmap_widget.y_device_entry == ""
|
||||
assert heatmap_widget.z_device_name == ""
|
||||
assert heatmap_widget.z_device_entry == ""
|
||||
|
||||
# Set devices via plot
|
||||
heatmap_widget.plot(x_name="samx", y_name="samy", z_name="bpm4i")
|
||||
|
||||
# Check properties return device names and entries separately
|
||||
assert heatmap_widget.x_device_name == "samx"
|
||||
assert heatmap_widget.x_device_entry # Should have some entry
|
||||
assert heatmap_widget.y_device_name == "samy"
|
||||
assert heatmap_widget.y_device_entry # Should have some entry
|
||||
assert heatmap_widget.z_device_name == "bpm4i"
|
||||
assert heatmap_widget.z_device_entry # Should have some entry
|
||||
|
||||
|
||||
def test_device_safe_properties_set_name(heatmap_widget):
|
||||
"""Test that device SafeProperty setters work for device names."""
|
||||
# Set x_device_name - should auto-validate entry
|
||||
heatmap_widget.x_device_name = "samx"
|
||||
assert heatmap_widget._image_config.x_device is not None
|
||||
assert heatmap_widget._image_config.x_device.name == "samx"
|
||||
assert heatmap_widget._image_config.x_device.entry is not None # Entry should be validated
|
||||
assert heatmap_widget.x_device_name == "samx"
|
||||
|
||||
# Set y_device_name
|
||||
heatmap_widget.y_device_name = "samy"
|
||||
assert heatmap_widget._image_config.y_device is not None
|
||||
assert heatmap_widget._image_config.y_device.name == "samy"
|
||||
assert heatmap_widget._image_config.y_device.entry is not None
|
||||
assert heatmap_widget.y_device_name == "samy"
|
||||
|
||||
# Set z_device_name
|
||||
heatmap_widget.z_device_name = "bpm4i"
|
||||
assert heatmap_widget._image_config.z_device is not None
|
||||
assert heatmap_widget._image_config.z_device.name == "bpm4i"
|
||||
assert heatmap_widget._image_config.z_device.entry is not None
|
||||
assert heatmap_widget.z_device_name == "bpm4i"
|
||||
|
||||
|
||||
def test_device_safe_properties_set_entry(heatmap_widget):
|
||||
"""Test that device entry properties can override default entries."""
|
||||
# Set device name first - this auto-validates entry
|
||||
heatmap_widget.x_device_name = "samx"
|
||||
initial_entry = heatmap_widget.x_device_entry
|
||||
assert initial_entry # Should have auto-validated entry
|
||||
|
||||
# Override with specific entry
|
||||
heatmap_widget.x_device_entry = "samx"
|
||||
assert heatmap_widget._image_config.x_device.entry == "samx"
|
||||
assert heatmap_widget.x_device_entry == "samx"
|
||||
|
||||
# Same for y device
|
||||
heatmap_widget.y_device_name = "samy"
|
||||
heatmap_widget.y_device_entry = "samy_setpoint"
|
||||
assert heatmap_widget._image_config.y_device.entry == "samy_setpoint"
|
||||
|
||||
# Same for z device
|
||||
heatmap_widget.z_device_name = "bpm4i"
|
||||
heatmap_widget.z_device_entry = "bpm4i"
|
||||
assert heatmap_widget._image_config.z_device.entry == "bpm4i"
|
||||
|
||||
|
||||
def test_device_entry_cannot_be_set_without_name(heatmap_widget):
|
||||
"""Test that setting entry without device name logs warning and does nothing."""
|
||||
# Try to set entry without device name
|
||||
heatmap_widget.x_device_entry = "some_entry"
|
||||
# Should not crash, entry should remain empty
|
||||
assert heatmap_widget.x_device_entry == ""
|
||||
assert heatmap_widget._image_config.x_device is None
|
||||
|
||||
|
||||
def test_device_safe_properties_set_empty(heatmap_widget):
|
||||
"""Test that device SafeProperty setters handle empty strings."""
|
||||
# Set device first
|
||||
heatmap_widget.x_device_name = "samx"
|
||||
assert heatmap_widget._image_config.x_device is not None
|
||||
|
||||
# Set to empty string - should clear the device
|
||||
heatmap_widget.x_device_name = ""
|
||||
assert heatmap_widget.x_device_name == ""
|
||||
assert heatmap_widget._image_config.x_device is None
|
||||
|
||||
|
||||
def test_device_safe_properties_auto_plot(heatmap_widget):
|
||||
"""Test that setting all three devices triggers auto-plot."""
|
||||
# Set all three devices
|
||||
heatmap_widget.x_device_name = "samx"
|
||||
heatmap_widget.y_device_name = "samy"
|
||||
heatmap_widget.z_device_name = "bpm4i"
|
||||
|
||||
# Check that plot was called (image_config should be updated)
|
||||
assert heatmap_widget._image_config.x_device is not None
|
||||
assert heatmap_widget._image_config.y_device is not None
|
||||
assert heatmap_widget._image_config.z_device is not None
|
||||
|
||||
|
||||
def test_device_properties_update_labels(heatmap_widget):
|
||||
"""Test that setting device properties updates axis labels."""
|
||||
# Set x device - should update x label
|
||||
heatmap_widget.x_device_name = "samx"
|
||||
assert heatmap_widget.x_label == "samx"
|
||||
|
||||
# Set y device - should update y label
|
||||
heatmap_widget.y_device_name = "samy"
|
||||
assert heatmap_widget.y_label == "samy"
|
||||
|
||||
# Set z device - should update title
|
||||
heatmap_widget.z_device_name = "bpm4i"
|
||||
assert heatmap_widget.title == "bpm4i"
|
||||
|
||||
|
||||
def test_device_properties_partial_configuration(heatmap_widget):
|
||||
"""Test that widget handles partial device configuration gracefully."""
|
||||
# Set only x device
|
||||
heatmap_widget.x_device_name = "samx"
|
||||
assert heatmap_widget.x_device_name == "samx"
|
||||
assert heatmap_widget.y_device_name == ""
|
||||
assert heatmap_widget.z_device_name == ""
|
||||
|
||||
# Set only y device (x already set)
|
||||
heatmap_widget.y_device_name = "samy"
|
||||
assert heatmap_widget.x_device_name == "samx"
|
||||
assert heatmap_widget.y_device_name == "samy"
|
||||
assert heatmap_widget.z_device_name == ""
|
||||
|
||||
# Auto-plot should not trigger yet (z missing)
|
||||
# But devices should be configured
|
||||
assert heatmap_widget._image_config.x_device is not None
|
||||
assert heatmap_widget._image_config.y_device is not None
|
||||
|
||||
|
||||
def test_device_properties_in_user_access(heatmap_widget):
|
||||
"""Test that device properties are exposed in USER_ACCESS for RPC."""
|
||||
from bec_widgets.widgets.plots.heatmap.heatmap import Heatmap
|
||||
|
||||
assert "x_device_name" in Heatmap.USER_ACCESS
|
||||
assert "x_device_name.setter" in Heatmap.USER_ACCESS
|
||||
assert "x_device_entry" in Heatmap.USER_ACCESS
|
||||
assert "x_device_entry.setter" in Heatmap.USER_ACCESS
|
||||
assert "y_device_name" in Heatmap.USER_ACCESS
|
||||
assert "y_device_name.setter" in Heatmap.USER_ACCESS
|
||||
assert "y_device_entry" in Heatmap.USER_ACCESS
|
||||
assert "y_device_entry.setter" in Heatmap.USER_ACCESS
|
||||
assert "z_device_name" in Heatmap.USER_ACCESS
|
||||
assert "z_device_name.setter" in Heatmap.USER_ACCESS
|
||||
assert "z_device_entry" in Heatmap.USER_ACCESS
|
||||
assert "z_device_entry.setter" in Heatmap.USER_ACCESS
|
||||
|
||||
|
||||
def test_device_properties_validation(heatmap_widget):
|
||||
"""Test that device entries are validated through entry_validator."""
|
||||
# Set device name - entry should be auto-validated
|
||||
heatmap_widget.x_device_name = "samx"
|
||||
initial_entry = heatmap_widget.x_device_entry
|
||||
|
||||
# The entry should be validated (will be "samx" in the mock)
|
||||
assert initial_entry == "samx"
|
||||
|
||||
# Set a different entry - should also be validated
|
||||
heatmap_widget.x_device_entry = "samx" # Use same name as validated entry
|
||||
assert heatmap_widget.x_device_entry == "samx"
|
||||
|
||||
|
||||
def test_device_properties_with_plot_method(heatmap_widget):
|
||||
"""Test that device properties reflect values set via plot() method."""
|
||||
# Use plot method
|
||||
heatmap_widget.plot(x_name="samx", y_name="samy", z_name="bpm4i")
|
||||
|
||||
# Properties should reflect the plotted devices
|
||||
assert heatmap_widget.x_device_name == "samx"
|
||||
assert heatmap_widget.y_device_name == "samy"
|
||||
assert heatmap_widget.z_device_name == "bpm4i"
|
||||
|
||||
# Entries should be validated
|
||||
assert heatmap_widget.x_device_entry == "samx"
|
||||
assert heatmap_widget.y_device_entry == "samy"
|
||||
assert heatmap_widget.z_device_entry == "bpm4i"
|
||||
|
||||
|
||||
def test_device_properties_overwrite_via_properties(heatmap_widget):
|
||||
"""Test that device properties can overwrite values set via plot()."""
|
||||
# First set via plot
|
||||
heatmap_widget.plot(x_name="samx", y_name="samy", z_name="bpm4i")
|
||||
|
||||
# Overwrite x device via properties
|
||||
heatmap_widget.x_device_name = "samz"
|
||||
assert heatmap_widget.x_device_name == "samz"
|
||||
assert heatmap_widget._image_config.x_device.name == "samz"
|
||||
|
||||
# Overwrite y device entry
|
||||
heatmap_widget.y_device_entry = "samy"
|
||||
assert heatmap_widget.y_device_entry == "samy"
|
||||
|
||||
|
||||
def test_device_properties_clearing_devices(heatmap_widget):
|
||||
"""Test clearing devices by setting to empty string."""
|
||||
# Set all devices
|
||||
heatmap_widget.x_device_name = "samx"
|
||||
heatmap_widget.y_device_name = "samy"
|
||||
heatmap_widget.z_device_name = "bpm4i"
|
||||
|
||||
# Clear x device
|
||||
heatmap_widget.x_device_name = ""
|
||||
assert heatmap_widget.x_device_name == ""
|
||||
assert heatmap_widget._image_config.x_device is None
|
||||
|
||||
# Y and Z should still be set
|
||||
assert heatmap_widget.y_device_name == "samy"
|
||||
assert heatmap_widget.z_device_name == "bpm4i"
|
||||
|
||||
|
||||
def test_device_properties_property_changed_signal(heatmap_widget):
|
||||
"""Test that property_changed signal is emitted when devices are set."""
|
||||
from unittest.mock import Mock
|
||||
|
||||
# Connect mock to property_changed signal
|
||||
mock_handler = Mock()
|
||||
heatmap_widget.property_changed.connect(mock_handler)
|
||||
|
||||
# Set device name
|
||||
heatmap_widget.x_device_name = "samx"
|
||||
|
||||
# Signal should have been emitted
|
||||
assert mock_handler.called
|
||||
# Check it was called with correct arguments
|
||||
mock_handler.assert_any_call("x_device_name", "samx")
|
||||
|
||||
|
||||
def test_device_entry_validation_with_invalid_device(heatmap_widget):
|
||||
"""Test that invalid device names are handled gracefully."""
|
||||
# Try to set invalid device name
|
||||
heatmap_widget.x_device_name = "nonexistent_device"
|
||||
|
||||
# Should not crash, but device might not be set if validation fails
|
||||
# The implementation silently fails, so we just check it doesn't crash
|
||||
|
||||
|
||||
def test_device_properties_sequential_entry_changes(heatmap_widget):
|
||||
"""Test changing device entry multiple times."""
|
||||
# Set device
|
||||
heatmap_widget.x_device_name = "samx"
|
||||
|
||||
# Change entry multiple times
|
||||
heatmap_widget.x_device_entry = "samx_velocity"
|
||||
assert heatmap_widget.x_device_entry == "samx_velocity"
|
||||
|
||||
heatmap_widget.x_device_entry = "samx_setpoint"
|
||||
assert heatmap_widget.x_device_entry == "samx_setpoint"
|
||||
|
||||
heatmap_widget.x_device_entry = "samx"
|
||||
assert heatmap_widget.x_device_entry == "samx"
|
||||
|
||||
|
||||
def test_device_properties_with_none_values(heatmap_widget):
|
||||
"""Test that None values are handled as empty strings."""
|
||||
# Device name None should be treated as empty
|
||||
heatmap_widget.x_device_name = None
|
||||
assert heatmap_widget.x_device_name == ""
|
||||
|
||||
# Set a device first
|
||||
heatmap_widget.y_device_name = "samy"
|
||||
|
||||
# Entry None should not change anything
|
||||
heatmap_widget.y_device_entry = None
|
||||
assert heatmap_widget.y_device_entry # Should still have validated entry
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import numpy as np
|
||||
import pyqtgraph as pg
|
||||
import pytest
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from qtpy.QtCore import QPointF
|
||||
|
||||
from bec_widgets.widgets.plots.image.image import Image
|
||||
@@ -178,6 +179,114 @@ def test_image_setup_preview_signal_2d(qtbot, mocked_client, monkeypatch):
|
||||
np.testing.assert_array_equal(view.main_image.image, test_data)
|
||||
|
||||
|
||||
def test_preview_signals_skip_0d_entries(qtbot, mocked_client, monkeypatch):
|
||||
"""
|
||||
Preview/async combobox should omit 0‑D signals.
|
||||
"""
|
||||
view = create_widget(qtbot, Image, client=mocked_client)
|
||||
|
||||
def fake_get(sign_cls):
|
||||
if sign_cls == "PreviewSignal":
|
||||
return [
|
||||
(
|
||||
"dev",
|
||||
"sig0d",
|
||||
{
|
||||
"obj_name": "sig0d",
|
||||
"signal_class": "PreviewSignal",
|
||||
"describe": {"signal_info": {"ndim": 0}},
|
||||
},
|
||||
),
|
||||
(
|
||||
"dev",
|
||||
"sig2d",
|
||||
{
|
||||
"obj_name": "sig2d",
|
||||
"signal_class": "PreviewSignal",
|
||||
"describe": {"signal_info": {"ndim": 2}},
|
||||
},
|
||||
),
|
||||
]
|
||||
return []
|
||||
|
||||
monkeypatch.setattr(view.client.device_manager, "get_bec_signals", fake_get)
|
||||
view.device_combo_box.clear()
|
||||
view.device_combo_box.addItem("", None)
|
||||
view._populate_signals()
|
||||
|
||||
texts = [view.device_combo_box.itemText(i) for i in range(view.device_combo_box.count())]
|
||||
assert "sig0d" not in texts
|
||||
assert "sig2d" in texts
|
||||
|
||||
|
||||
def test_image_async_signal_uses_obj_name(qtbot, mocked_client, monkeypatch):
|
||||
"""
|
||||
Verify async signals use obj_name for endpoints/payloads and reconnect with scan_id.
|
||||
"""
|
||||
view = create_widget(qtbot, Image, client=mocked_client)
|
||||
signal_config = {
|
||||
"obj_name": "async_obj",
|
||||
"signal_class": "AsyncSignal",
|
||||
"describe": {"signal_info": {"ndim": 1}},
|
||||
}
|
||||
|
||||
view.image(monitor=("eiger", "img", signal_config))
|
||||
assert view.subscriptions["main"].async_signal_name == "async_obj"
|
||||
|
||||
# Prepare scan ids and capture dispatcher calls
|
||||
view.old_scan_id = "old_scan"
|
||||
view.scan_id = "new_scan"
|
||||
connected = []
|
||||
disconnected = []
|
||||
monkeypatch.setattr(
|
||||
view.bec_dispatcher,
|
||||
"connect_slot",
|
||||
lambda slot, endpoint, from_start=False, cb_info=None: connected.append(
|
||||
(slot, endpoint, from_start, cb_info)
|
||||
),
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
view.bec_dispatcher,
|
||||
"disconnect_slot",
|
||||
lambda slot, endpoint: disconnected.append((slot, endpoint)),
|
||||
)
|
||||
|
||||
view._setup_async_image(view.scan_id)
|
||||
|
||||
expected_new = MessageEndpoints.device_async_signal("new_scan", "eiger", "async_obj")
|
||||
expected_old = MessageEndpoints.device_async_signal("old_scan", "eiger", "async_obj")
|
||||
assert any(ep == expected_new for _, ep, _, _ in connected)
|
||||
assert any(ep == expected_old for _, ep in disconnected)
|
||||
|
||||
# Payload extraction should use obj_name
|
||||
payload = np.array([1, 2, 3])
|
||||
msg = {"signals": {"async_obj": {"value": payload}}}
|
||||
assert view._get_payload_data(msg) is payload
|
||||
|
||||
|
||||
def test_disconnect_monitor_clears_async_state(qtbot, mocked_client, monkeypatch):
|
||||
view = create_widget(qtbot, Image, client=mocked_client)
|
||||
signal_config = {
|
||||
"obj_name": "async_obj",
|
||||
"signal_class": "AsyncSignal",
|
||||
"describe": {"signal_info": {"ndim": 2}},
|
||||
}
|
||||
|
||||
view.image(monitor=("eiger", "img", signal_config))
|
||||
view.scan_id = "scan_x"
|
||||
view.old_scan_id = "scan_y"
|
||||
view.subscriptions["main"].async_signal_name = "async_obj"
|
||||
|
||||
# Avoid touching real dispatcher
|
||||
monkeypatch.setattr(view.bec_dispatcher, "disconnect_slot", lambda *args, **kwargs: None)
|
||||
|
||||
view.disconnect_monitor(("eiger", "img", signal_config))
|
||||
|
||||
assert view.subscriptions["main"].monitor is None
|
||||
assert view.subscriptions["main"].async_signal_name is None
|
||||
assert view.async_update is False
|
||||
|
||||
|
||||
##############################################
|
||||
# Device monitor endpoint update mechanism
|
||||
|
||||
@@ -600,33 +709,99 @@ def test_monitor_selection_reverse_device_items(qtbot, mocked_client):
|
||||
assert combo.currentText() == "samy"
|
||||
|
||||
|
||||
def test_monitor_selection_populate_preview_signals(qtbot, mocked_client, monkeypatch):
|
||||
def test_monitor_selection_populate_signals(qtbot, mocked_client, monkeypatch):
|
||||
"""
|
||||
Verify that _populate_preview_signals adds preview‑signal devices to the combo‑box
|
||||
Verify that _populate_signals adds preview‑signal and async-signal devices to the combo‑box
|
||||
with the correct userData.
|
||||
"""
|
||||
view = create_widget(qtbot, Image, client=mocked_client)
|
||||
|
||||
signal_configs = {
|
||||
"PreviewSignal": [
|
||||
("eiger", "img", {"obj_name": "eiger_img", "describe": {"signal_info": {"ndim": 2}}}),
|
||||
(
|
||||
"eiger2",
|
||||
"img2",
|
||||
{"obj_name": "eiger_img2", "describe": {"signal_info": {"ndim": 2}}},
|
||||
),
|
||||
],
|
||||
"AsyncSignal": [
|
||||
(
|
||||
"async_device",
|
||||
"img_async",
|
||||
{"obj_name": "async_device_img_async", "describe": {"signal_info": {"ndim": 2}}},
|
||||
)
|
||||
],
|
||||
"AsyncMultiSignal": [
|
||||
(
|
||||
"multi_device",
|
||||
"img_multi",
|
||||
{"obj_name": "multi_device_img_multi", "describe": {"signal_info": {"ndim": 2}}},
|
||||
)
|
||||
],
|
||||
"DynamicSignal": [
|
||||
(
|
||||
"dynamic_device",
|
||||
"img_dyn",
|
||||
{"obj_name": "dynamic_device_img_dyn", "describe": {"signal_info": {"ndim": 2}}},
|
||||
)
|
||||
],
|
||||
}
|
||||
|
||||
# Provide a deterministic fake device_manager with get_bec_signals
|
||||
class _FakeDM:
|
||||
def get_bec_signals(self, _filter):
|
||||
return [
|
||||
("eiger", "img", {"obj_name": "eiger_img"}),
|
||||
("async_device", "img2", {"obj_name": "async_device_img2"}),
|
||||
]
|
||||
if isinstance(_filter, str):
|
||||
filters = [_filter]
|
||||
else:
|
||||
filters = list(_filter)
|
||||
|
||||
signals = []
|
||||
for filt in filters:
|
||||
signals.extend(signal_configs.get(filt, []))
|
||||
return signals
|
||||
|
||||
monkeypatch.setattr(view.client, "device_manager", _FakeDM())
|
||||
|
||||
initial_count = view.device_combo_box.count()
|
||||
view._populate_signals()
|
||||
|
||||
view._populate_preview_signals()
|
||||
# Base devices first, then empty separator, then signal entries
|
||||
signal_texts = []
|
||||
separator_seen = False
|
||||
for i in range(view.device_combo_box.count()):
|
||||
data = view.device_combo_box.itemData(i)
|
||||
text = view.device_combo_box.itemText(i)
|
||||
if data is None and text == "":
|
||||
separator_seen = True
|
||||
continue
|
||||
if separator_seen is False:
|
||||
# base device entries
|
||||
continue
|
||||
# After separator we expect signal tuples
|
||||
assert isinstance(data, tuple)
|
||||
signal_texts.append(text)
|
||||
|
||||
# Two new entries should have been added
|
||||
assert view.device_combo_box.count() == initial_count + 2
|
||||
|
||||
# The first newly added item should carry tuple userData describing the device/signal
|
||||
data = view.device_combo_box.itemData(initial_count)
|
||||
assert isinstance(data, tuple) and data[0] == "eiger"
|
||||
expected_labels = {
|
||||
"eiger_img",
|
||||
"eiger_img2",
|
||||
"async_device_img_async",
|
||||
"multi_device_img_multi",
|
||||
"dynamic_device_img_dyn",
|
||||
}
|
||||
assert expected_labels.issubset(set(signal_texts))
|
||||
first_signal_idx = next(
|
||||
i
|
||||
for i in range(view.device_combo_box.count())
|
||||
if isinstance(view.device_combo_box.itemData(i), tuple)
|
||||
)
|
||||
data = view.device_combo_box.itemData(first_signal_idx)
|
||||
assert isinstance(data, tuple) and data[0] in [
|
||||
"eiger",
|
||||
"eiger2",
|
||||
"async_device",
|
||||
"multi_device",
|
||||
"dynamic_device",
|
||||
]
|
||||
|
||||
|
||||
def test_monitor_selection_adjust_and_connect(qtbot, mocked_client, monkeypatch):
|
||||
@@ -641,7 +816,26 @@ def test_monitor_selection_adjust_and_connect(qtbot, mocked_client, monkeypatch)
|
||||
# Deterministic fake device_manager
|
||||
class _FakeDM:
|
||||
def get_bec_signals(self, _filter):
|
||||
return [("eiger", "img", {"obj_name": "eiger_img"})]
|
||||
if isinstance(_filter, str):
|
||||
filters = [_filter]
|
||||
else:
|
||||
filters = list(_filter)
|
||||
|
||||
signals = []
|
||||
for filt in filters:
|
||||
if filt == "PreviewSignal":
|
||||
signals.extend(
|
||||
[
|
||||
(
|
||||
"eiger",
|
||||
"img",
|
||||
{"obj_name": "eiger_img", "describe": {"signal_info": {"ndim": 2}}},
|
||||
)
|
||||
]
|
||||
)
|
||||
else:
|
||||
signals.extend([])
|
||||
return signals
|
||||
|
||||
monkeypatch.setattr(view.client, "device_manager", _FakeDM())
|
||||
|
||||
@@ -654,9 +848,13 @@ def test_monitor_selection_adjust_and_connect(qtbot, mocked_client, monkeypatch)
|
||||
# Execute the method under test
|
||||
view._adjust_and_connect()
|
||||
|
||||
# Expect exactly two items: preview label followed by the empty default
|
||||
assert combo.count() == 2
|
||||
# Because of the reversal, the preview label comes first
|
||||
assert combo.itemText(0) == "eiger_img"
|
||||
# Base devices should appear first, then empty separator, then signals
|
||||
sep_idx = next(
|
||||
i for i in range(combo.count()) if combo.itemData(i) is None and combo.itemText(i) == ""
|
||||
)
|
||||
first_signal_idx = sep_idx + 1
|
||||
assert isinstance(combo.itemData(first_signal_idx), tuple)
|
||||
assert combo.itemText(first_signal_idx) == "eiger_img"
|
||||
assert combo.itemText(sep_idx) == ""
|
||||
# Current selection remains empty
|
||||
assert combo.currentText() == ""
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import json
|
||||
from unittest.mock import patch
|
||||
|
||||
import numpy as np
|
||||
|
||||
@@ -7,6 +7,9 @@ from bec_widgets.widgets.plots.scatter_waveform.scatter_curve import (
|
||||
ScatterDeviceSignal,
|
||||
)
|
||||
from bec_widgets.widgets.plots.scatter_waveform.scatter_waveform import ScatterWaveform
|
||||
from bec_widgets.widgets.plots.scatter_waveform.settings.scatter_curve_setting import (
|
||||
ScatterCurveSettings,
|
||||
)
|
||||
from tests.unit_tests.client_mocks import create_dummy_scan_item, mocked_client
|
||||
|
||||
from .conftest import create_widget
|
||||
@@ -46,28 +49,6 @@ def test_scatter_waveform_color_map(qtbot, mocked_client):
|
||||
assert swf.color_map == "plasma"
|
||||
|
||||
|
||||
def test_scatter_waveform_curve_json(qtbot, mocked_client):
|
||||
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
|
||||
|
||||
# Add a device-based scatter curve
|
||||
swf.plot(x_name="samx", y_name="samy", z_name="bpm4i", label="test_curve")
|
||||
|
||||
json_str = swf.curve_json
|
||||
data = json.loads(json_str)
|
||||
assert isinstance(data, dict)
|
||||
assert data["label"] == "test_curve"
|
||||
assert data["x_device"]["name"] == "samx"
|
||||
assert data["y_device"]["name"] == "samy"
|
||||
assert data["z_device"]["name"] == "bpm4i"
|
||||
|
||||
# Clear and reload from JSON
|
||||
swf.clear_all()
|
||||
assert swf.main_curve.getData() == (None, None)
|
||||
|
||||
swf.curve_json = json_str
|
||||
assert swf.main_curve.config.label == "test_curve"
|
||||
|
||||
|
||||
def test_scatter_waveform_update_with_scan_history(qtbot, mocked_client, monkeypatch):
|
||||
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
|
||||
|
||||
@@ -151,3 +132,413 @@ def test_scatter_waveform_scan_progress(qtbot, mocked_client, monkeypatch):
|
||||
# swf.scatter_dialog.close()
|
||||
# assert swf.scatter_dialog is None
|
||||
# assert not scatter_popup_action.isChecked(), "Should be unchecked after closing dialog"
|
||||
|
||||
|
||||
################################################################################
|
||||
# Device Property Tests
|
||||
################################################################################
|
||||
|
||||
|
||||
def test_device_safe_properties_get(qtbot, mocked_client):
|
||||
"""Test that device SafeProperty getters work correctly."""
|
||||
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
|
||||
|
||||
# Initially devices should be empty
|
||||
assert swf.x_device_name == ""
|
||||
assert swf.x_device_entry == ""
|
||||
assert swf.y_device_name == ""
|
||||
assert swf.y_device_entry == ""
|
||||
assert swf.z_device_name == ""
|
||||
assert swf.z_device_entry == ""
|
||||
|
||||
# Set devices via plot
|
||||
swf.plot(x_name="samx", y_name="samy", z_name="bpm4i")
|
||||
|
||||
# Check properties return device names and entries separately
|
||||
assert swf.x_device_name == "samx"
|
||||
assert swf.x_device_entry # Should have some entry
|
||||
assert swf.y_device_name == "samy"
|
||||
assert swf.y_device_entry # Should have some entry
|
||||
assert swf.z_device_name == "bpm4i"
|
||||
assert swf.z_device_entry # Should have some entry
|
||||
|
||||
|
||||
def test_device_safe_properties_set_name(qtbot, mocked_client):
|
||||
"""Test that device SafeProperty setters work for device names."""
|
||||
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
|
||||
|
||||
# Set x_device_name - should auto-validate entry
|
||||
swf.x_device_name = "samx"
|
||||
assert swf._main_curve.config.x_device is not None
|
||||
assert swf._main_curve.config.x_device.name == "samx"
|
||||
assert swf._main_curve.config.x_device.entry is not None # Entry should be validated
|
||||
assert swf.x_device_name == "samx"
|
||||
|
||||
# Set y_device_name
|
||||
swf.y_device_name = "samy"
|
||||
assert swf._main_curve.config.y_device is not None
|
||||
assert swf._main_curve.config.y_device.name == "samy"
|
||||
assert swf._main_curve.config.y_device.entry is not None
|
||||
assert swf.y_device_name == "samy"
|
||||
|
||||
# Set z_device_name
|
||||
swf.z_device_name = "bpm4i"
|
||||
assert swf._main_curve.config.z_device is not None
|
||||
assert swf._main_curve.config.z_device.name == "bpm4i"
|
||||
assert swf._main_curve.config.z_device.entry is not None
|
||||
assert swf.z_device_name == "bpm4i"
|
||||
|
||||
|
||||
def test_device_safe_properties_set_entry(qtbot, mocked_client):
|
||||
"""Test that device entry properties can override default entries."""
|
||||
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
|
||||
|
||||
# Set device name first - this auto-validates entry
|
||||
swf.x_device_name = "samx"
|
||||
initial_entry = swf.x_device_entry
|
||||
assert initial_entry # Should have auto-validated entry
|
||||
|
||||
# Override with specific entry
|
||||
swf.x_device_entry = "samx"
|
||||
assert swf._main_curve.config.x_device.entry == "samx"
|
||||
assert swf.x_device_entry == "samx"
|
||||
|
||||
# Same for y device
|
||||
swf.y_device_name = "samy"
|
||||
swf.y_device_entry = "samy_setpoint"
|
||||
assert swf._main_curve.config.y_device.entry == "samy_setpoint"
|
||||
|
||||
# Same for z device
|
||||
swf.z_device_name = "bpm4i"
|
||||
swf.z_device_entry = "bpm4i"
|
||||
assert swf._main_curve.config.z_device.entry == "bpm4i"
|
||||
|
||||
|
||||
def test_device_entry_cannot_be_set_without_name(qtbot, mocked_client):
|
||||
"""Test that setting entry without device name logs warning and does nothing."""
|
||||
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
|
||||
|
||||
# Try to set entry without device name
|
||||
swf.x_device_entry = "some_entry"
|
||||
# Should not crash, entry should remain empty
|
||||
assert swf.x_device_entry == ""
|
||||
assert swf._main_curve.config.x_device is None
|
||||
|
||||
|
||||
def test_device_safe_properties_set_empty(qtbot, mocked_client):
|
||||
"""Test that device SafeProperty setters handle empty strings."""
|
||||
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
|
||||
|
||||
# Set device first
|
||||
swf.x_device_name = "samx"
|
||||
assert swf._main_curve.config.x_device is not None
|
||||
|
||||
# Set to empty string - should clear the device
|
||||
swf.x_device_name = ""
|
||||
assert swf.x_device_name == ""
|
||||
assert swf._main_curve.config.x_device is None
|
||||
|
||||
|
||||
def test_device_safe_properties_auto_plot(qtbot, mocked_client):
|
||||
"""Test that setting all three devices triggers auto-plot."""
|
||||
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
|
||||
|
||||
# Set all three devices
|
||||
swf.x_device_name = "samx"
|
||||
swf.y_device_name = "samy"
|
||||
swf.z_device_name = "bpm4i"
|
||||
|
||||
# Check that plot was called (config should be updated)
|
||||
assert swf._main_curve.config.x_device is not None
|
||||
assert swf._main_curve.config.y_device is not None
|
||||
assert swf._main_curve.config.z_device is not None
|
||||
|
||||
|
||||
def test_device_properties_update_labels(qtbot, mocked_client):
|
||||
"""Test that setting device properties updates axis labels."""
|
||||
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
|
||||
|
||||
# Set x device - should update x label
|
||||
swf.x_device_name = "samx"
|
||||
assert swf.x_label == "samx"
|
||||
|
||||
# Set y device - should update y label
|
||||
swf.y_device_name = "samy"
|
||||
assert swf.y_label == "samy"
|
||||
|
||||
# Note: ScatterWaveform doesn't have a title like Heatmap does for z_device
|
||||
|
||||
|
||||
def test_device_properties_partial_configuration(qtbot, mocked_client):
|
||||
"""Test that widget handles partial device configuration gracefully."""
|
||||
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
|
||||
|
||||
# Set only x device
|
||||
swf.x_device_name = "samx"
|
||||
assert swf.x_device_name == "samx"
|
||||
assert swf.y_device_name == ""
|
||||
assert swf.z_device_name == ""
|
||||
|
||||
# Set only y device (x already set)
|
||||
swf.y_device_name = "samy"
|
||||
assert swf.x_device_name == "samx"
|
||||
assert swf.y_device_name == "samy"
|
||||
assert swf.z_device_name == ""
|
||||
|
||||
# Auto-plot should not trigger yet (z missing)
|
||||
# But devices should be configured
|
||||
assert swf._main_curve.config.x_device is not None
|
||||
assert swf._main_curve.config.y_device is not None
|
||||
|
||||
|
||||
def test_device_properties_in_user_access(qtbot, mocked_client):
|
||||
"""Test that device properties are exposed in USER_ACCESS for RPC."""
|
||||
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
|
||||
|
||||
assert "x_device_name" in ScatterWaveform.USER_ACCESS
|
||||
assert "x_device_name.setter" in ScatterWaveform.USER_ACCESS
|
||||
assert "x_device_entry" in ScatterWaveform.USER_ACCESS
|
||||
assert "x_device_entry.setter" in ScatterWaveform.USER_ACCESS
|
||||
assert "y_device_name" in ScatterWaveform.USER_ACCESS
|
||||
assert "y_device_name.setter" in ScatterWaveform.USER_ACCESS
|
||||
assert "y_device_entry" in ScatterWaveform.USER_ACCESS
|
||||
assert "y_device_entry.setter" in ScatterWaveform.USER_ACCESS
|
||||
assert "z_device_name" in ScatterWaveform.USER_ACCESS
|
||||
assert "z_device_name.setter" in ScatterWaveform.USER_ACCESS
|
||||
assert "z_device_entry" in ScatterWaveform.USER_ACCESS
|
||||
assert "z_device_entry.setter" in ScatterWaveform.USER_ACCESS
|
||||
|
||||
|
||||
def test_device_properties_validation(qtbot, mocked_client):
|
||||
"""Test that device entries are validated through entry_validator."""
|
||||
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
|
||||
|
||||
# Set device name - entry should be auto-validated
|
||||
swf.x_device_name = "samx"
|
||||
initial_entry = swf.x_device_entry
|
||||
|
||||
# The entry should be validated (will be "samx" in the mock)
|
||||
assert initial_entry == "samx"
|
||||
|
||||
# Set a different entry - should also be validated
|
||||
swf.x_device_entry = "samx" # Use same name as validated entry
|
||||
assert swf.x_device_entry == "samx"
|
||||
|
||||
|
||||
def test_device_properties_with_plot_method(qtbot, mocked_client):
|
||||
"""Test that device properties reflect values set via plot() method."""
|
||||
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
|
||||
|
||||
# Use plot method
|
||||
swf.plot(x_name="samx", y_name="samy", z_name="bpm4i")
|
||||
|
||||
# Properties should reflect the plotted devices
|
||||
assert swf.x_device_name == "samx"
|
||||
assert swf.y_device_name == "samy"
|
||||
assert swf.z_device_name == "bpm4i"
|
||||
|
||||
# Entries should be validated
|
||||
assert swf.x_device_entry == "samx"
|
||||
assert swf.y_device_entry == "samy"
|
||||
assert swf.z_device_entry == "bpm4i"
|
||||
|
||||
|
||||
def test_device_properties_overwrite_via_properties(qtbot, mocked_client):
|
||||
"""Test that device properties can overwrite values set via plot()."""
|
||||
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
|
||||
|
||||
# First set via plot
|
||||
swf.plot(x_name="samx", y_name="samy", z_name="bpm4i")
|
||||
|
||||
# Overwrite x device via properties
|
||||
swf.x_device_name = "samz"
|
||||
assert swf.x_device_name == "samz"
|
||||
assert swf._main_curve.config.x_device.name == "samz"
|
||||
|
||||
# Overwrite y device entry
|
||||
swf.y_device_entry = "samy"
|
||||
assert swf.y_device_entry == "samy"
|
||||
|
||||
|
||||
def test_device_properties_clearing_devices(qtbot, mocked_client):
|
||||
"""Test clearing devices by setting to empty string."""
|
||||
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
|
||||
|
||||
# Set all devices
|
||||
swf.x_device_name = "samx"
|
||||
swf.y_device_name = "samy"
|
||||
swf.z_device_name = "bpm4i"
|
||||
|
||||
# Clear x device
|
||||
swf.x_device_name = ""
|
||||
assert swf.x_device_name == ""
|
||||
assert swf._main_curve.config.x_device is None
|
||||
|
||||
# Y and Z should still be set
|
||||
assert swf.y_device_name == "samy"
|
||||
assert swf.z_device_name == "bpm4i"
|
||||
|
||||
|
||||
def test_device_properties_property_changed_signal(qtbot, mocked_client):
|
||||
"""Test that property_changed signal is emitted when devices are set."""
|
||||
from unittest.mock import Mock
|
||||
|
||||
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
|
||||
|
||||
# Connect mock to property_changed signal
|
||||
mock_handler = Mock()
|
||||
swf.property_changed.connect(mock_handler)
|
||||
|
||||
# Set device name
|
||||
swf.x_device_name = "samx"
|
||||
|
||||
# Signal should have been emitted
|
||||
assert mock_handler.called
|
||||
# Check it was called with correct arguments
|
||||
mock_handler.assert_any_call("x_device_name", "samx")
|
||||
|
||||
|
||||
def test_device_entry_validation_with_invalid_device(qtbot, mocked_client):
|
||||
"""Test that invalid device names are handled gracefully."""
|
||||
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
|
||||
|
||||
# Try to set invalid device name
|
||||
swf.x_device_name = "nonexistent_device"
|
||||
|
||||
# Should not crash, but device might not be set if validation fails
|
||||
# The implementation silently fails, so we just check it doesn't crash
|
||||
|
||||
|
||||
def test_device_properties_sequential_entry_changes(qtbot, mocked_client):
|
||||
"""Test changing device entry multiple times."""
|
||||
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
|
||||
|
||||
# Set device
|
||||
swf.x_device_name = "samx"
|
||||
|
||||
# Change entry multiple times
|
||||
swf.x_device_entry = "samx_velocity"
|
||||
assert swf.x_device_entry == "samx_velocity"
|
||||
|
||||
swf.x_device_entry = "samx_setpoint"
|
||||
assert swf.x_device_entry == "samx_setpoint"
|
||||
|
||||
swf.x_device_entry = "samx"
|
||||
assert swf.x_device_entry == "samx"
|
||||
|
||||
|
||||
def test_device_properties_with_none_values(qtbot, mocked_client):
|
||||
"""Test that None values are handled as empty strings."""
|
||||
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
|
||||
|
||||
# Device name None should be treated as empty
|
||||
swf.x_device_name = None
|
||||
assert swf.x_device_name == ""
|
||||
|
||||
# Set a device first
|
||||
swf.y_device_name = "samy"
|
||||
|
||||
# Entry None should not change anything
|
||||
swf.y_device_entry = None
|
||||
assert swf.y_device_entry # Should still have validated entry
|
||||
|
||||
|
||||
################################################################################
|
||||
# ScatterCurveSettings Tests
|
||||
################################################################################
|
||||
|
||||
|
||||
def test_scatter_curve_settings_accept_changes(qtbot, mocked_client):
|
||||
"""Test that accept_changes correctly extracts data from widgets and calls plot()."""
|
||||
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
|
||||
|
||||
# Create the settings widget
|
||||
settings = ScatterCurveSettings(parent=None, target_widget=swf, popup=True)
|
||||
qtbot.addWidget(settings)
|
||||
|
||||
# Set up the widgets with test values
|
||||
settings.ui.x_name.set_device("samx")
|
||||
settings.ui.y_name.set_device("samy")
|
||||
settings.ui.z_name.set_device("bpm4i")
|
||||
|
||||
# Mock the plot method to verify it gets called with correct arguments
|
||||
with patch.object(swf, "plot") as mock_plot:
|
||||
settings.accept_changes()
|
||||
|
||||
# Verify plot was called
|
||||
mock_plot.assert_called_once()
|
||||
|
||||
# Get the call arguments
|
||||
call_kwargs = mock_plot.call_args[1]
|
||||
|
||||
# Verify device names were extracted correctly
|
||||
assert call_kwargs["x_name"] == "samx"
|
||||
assert call_kwargs["y_name"] == "samy"
|
||||
assert call_kwargs["z_name"] == "bpm4i"
|
||||
|
||||
|
||||
def test_scatter_curve_settings_accept_changes_with_entries(qtbot, mocked_client):
|
||||
"""Test that accept_changes correctly extracts signal entries from SignalComboBox."""
|
||||
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
|
||||
|
||||
# Create the settings widget
|
||||
settings = ScatterCurveSettings(parent=None, target_widget=swf, popup=True)
|
||||
qtbot.addWidget(settings)
|
||||
|
||||
# Set devices first to populate signal comboboxes
|
||||
settings.ui.x_name.set_device("samx")
|
||||
settings.ui.y_name.set_device("samy")
|
||||
settings.ui.z_name.set_device("bpm4i")
|
||||
qtbot.wait(100) # Allow time for signals to populate
|
||||
|
||||
# Mock the plot method
|
||||
with patch.object(swf, "plot") as mock_plot:
|
||||
settings.accept_changes()
|
||||
|
||||
mock_plot.assert_called_once()
|
||||
call_kwargs = mock_plot.call_args[1]
|
||||
|
||||
# Verify entries are extracted (will use get_signal_name())
|
||||
assert "x_entry" in call_kwargs
|
||||
assert "y_entry" in call_kwargs
|
||||
assert "z_entry" in call_kwargs
|
||||
|
||||
|
||||
def test_scatter_curve_settings_accept_changes_color_map(qtbot, mocked_client):
|
||||
"""Test that accept_changes correctly extracts color_map from widget."""
|
||||
|
||||
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
|
||||
|
||||
# Create the settings widget
|
||||
settings = ScatterCurveSettings(parent=None, target_widget=swf, popup=True)
|
||||
qtbot.addWidget(settings)
|
||||
|
||||
# Set devices
|
||||
settings.ui.x_name.set_device("samx")
|
||||
settings.ui.y_name.set_device("samy")
|
||||
settings.ui.z_name.set_device("bpm4i")
|
||||
|
||||
# Get the current colormap
|
||||
color_map = settings.ui.color_map.colormap
|
||||
|
||||
with patch.object(swf, "plot") as mock_plot:
|
||||
settings.accept_changes()
|
||||
call_kwargs = mock_plot.call_args[1]
|
||||
assert call_kwargs["color_map"] == color_map
|
||||
|
||||
|
||||
def test_scatter_curve_settings_fetch_all_properties(qtbot, mocked_client):
|
||||
"""Test that fetch_all_properties correctly populates the settings from target widget."""
|
||||
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
|
||||
|
||||
# First set up the scatter waveform with some data
|
||||
swf.plot(x_name="samx", y_name="samy", z_name="bpm4i")
|
||||
|
||||
# Create the settings widget - it should fetch properties automatically
|
||||
settings = ScatterCurveSettings(parent=None, target_widget=swf, popup=True)
|
||||
qtbot.addWidget(settings)
|
||||
|
||||
# Verify the settings widget has fetched the values
|
||||
assert settings.ui.x_name.currentText() == "samx"
|
||||
assert settings.ui.y_name.currentText() == "samy"
|
||||
assert settings.ui.z_name.currentText() == "bpm4i"
|
||||
|
||||
@@ -55,20 +55,16 @@ def test_script_tree_hover_events(script_tree, qtbot):
|
||||
# Send the event to the viewport (the event filter is installed on the viewport)
|
||||
script_tree.eventFilter(viewport, mouse_event)
|
||||
|
||||
qtbot.wait(100) # Allow time for the hover to be processed
|
||||
|
||||
# Now, the hover index should be set to the first item
|
||||
assert script_tree.delegate.hovered_index.isValid() == True
|
||||
qtbot.waitUntil(lambda: script_tree.delegate.hovered_index.isValid(), timeout=5000)
|
||||
assert script_tree.delegate.hovered_index.row() == index.row()
|
||||
|
||||
# Simulate mouse leaving the viewport
|
||||
leave_event = QEvent(QEvent.Type.Leave)
|
||||
script_tree.eventFilter(viewport, leave_event)
|
||||
|
||||
qtbot.wait(100) # Allow time for the leave event to be processed
|
||||
|
||||
# After leaving, no item should be hovered
|
||||
assert script_tree.delegate.hovered_index.isValid() == False
|
||||
qtbot.waitUntil(lambda: not script_tree.delegate.hovered_index.isValid(), timeout=5000)
|
||||
|
||||
|
||||
@pytest.mark.timeout(10)
|
||||
|
||||
@@ -1,25 +1,69 @@
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
from qtpy.QtCore import Qt
|
||||
from qtpy.QtGui import QHideEvent
|
||||
from qtpy.QtNetwork import QAuthenticator
|
||||
|
||||
from bec_widgets.widgets.editors.web_console.web_console import WebConsole, _web_console_registry
|
||||
from bec_widgets.widgets.editors.web_console.web_console import (
|
||||
BECShell,
|
||||
ConsoleMode,
|
||||
WebConsole,
|
||||
_web_console_registry,
|
||||
)
|
||||
|
||||
from .client_mocks import mocked_client
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def console_widget(qtbot, mocked_client):
|
||||
def mocked_server_startup():
|
||||
"""Mock the web console server startup process."""
|
||||
with mock.patch(
|
||||
"bec_widgets.widgets.editors.web_console.web_console.subprocess"
|
||||
) as mock_subprocess:
|
||||
with mock.patch.object(_web_console_registry, "_wait_for_server_port"):
|
||||
_web_console_registry._server_port = 12345
|
||||
# Create the WebConsole widget
|
||||
widget = WebConsole(client=mocked_client)
|
||||
qtbot.addWidget(widget)
|
||||
qtbot.waitExposed(widget)
|
||||
yield widget
|
||||
yield mock_subprocess
|
||||
|
||||
|
||||
def static_console(qtbot, client, unique_id: str | None = None):
|
||||
"""Fixture to provide a static unique_id for WebConsole tests."""
|
||||
if unique_id is None:
|
||||
widget = WebConsole(client=client)
|
||||
else:
|
||||
widget = WebConsole(client=client, unique_id=unique_id)
|
||||
qtbot.addWidget(widget)
|
||||
qtbot.waitExposed(widget)
|
||||
return widget
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def console_widget(qtbot, mocked_client, mocked_server_startup):
|
||||
"""Create a WebConsole widget with mocked server startup."""
|
||||
yield static_console(qtbot, mocked_client)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def bec_shell_widget(qtbot, mocked_client, mocked_server_startup):
|
||||
"""Create a BECShell widget with mocked server startup."""
|
||||
widget = BECShell(client=mocked_client)
|
||||
qtbot.addWidget(widget)
|
||||
qtbot.waitExposed(widget)
|
||||
yield widget
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def console_widget_with_static_id(qtbot, mocked_client, mocked_server_startup):
|
||||
"""Create a WebConsole widget with a static unique ID."""
|
||||
yield static_console(qtbot, mocked_client, unique_id="test_console")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def two_console_widgets_same_id(qtbot, mocked_client, mocked_server_startup):
|
||||
"""Create two WebConsole widgets sharing the same unique ID."""
|
||||
widget1 = static_console(qtbot, mocked_client, unique_id="shared_console")
|
||||
widget2 = static_console(qtbot, mocked_client, unique_id="shared_console")
|
||||
yield widget1, widget2
|
||||
|
||||
|
||||
def test_web_console_widget_initialization(console_widget):
|
||||
@@ -34,7 +78,7 @@ def test_web_console_write(console_widget):
|
||||
with mock.patch.object(console_widget.page, "runJavaScript") as mock_run_js:
|
||||
console_widget.write("Hello, World!")
|
||||
|
||||
assert mock.call("window.term.paste('Hello, World!');") in mock_run_js.mock_calls
|
||||
assert mock.call('window.term.paste("Hello, World!");') in mock_run_js.mock_calls
|
||||
|
||||
|
||||
def test_web_console_write_no_return(console_widget):
|
||||
@@ -42,7 +86,7 @@ def test_web_console_write_no_return(console_widget):
|
||||
with mock.patch.object(console_widget.page, "runJavaScript") as mock_run_js:
|
||||
console_widget.write("Hello, World!", send_return=False)
|
||||
|
||||
assert mock.call("window.term.paste('Hello, World!');") in mock_run_js.mock_calls
|
||||
assert mock.call('window.term.paste("Hello, World!");') in mock_run_js.mock_calls
|
||||
assert mock_run_js.call_count == 1
|
||||
|
||||
|
||||
@@ -138,6 +182,20 @@ def test_web_console_startup_command_execution(console_widget, qtbot):
|
||||
assert not console_widget._startup_timer.isActive()
|
||||
|
||||
|
||||
def test_bec_shell_startup_contains_gui_id(bec_shell_widget):
|
||||
"""Test that the BEC shell startup command includes the GUI ID."""
|
||||
bec_shell = bec_shell_widget
|
||||
|
||||
assert bec_shell._is_bec_shell
|
||||
assert bec_shell._unique_id == "bec_shell"
|
||||
|
||||
assert bec_shell.startup_cmd == "bec --nogui"
|
||||
|
||||
with mock.patch.object(bec_shell.bec_dispatcher, "cli_server") as mock_cli_server:
|
||||
mock_cli_server.gui_id = "test_gui_id"
|
||||
assert bec_shell.startup_cmd == "bec --gui-id test_gui_id"
|
||||
|
||||
|
||||
def test_web_console_set_readonly(console_widget):
|
||||
# Test the set_readonly method
|
||||
console_widget.set_readonly(True)
|
||||
@@ -145,3 +203,274 @@ def test_web_console_set_readonly(console_widget):
|
||||
|
||||
console_widget.set_readonly(False)
|
||||
assert console_widget.isEnabled()
|
||||
|
||||
|
||||
def test_web_console_with_unique_id(console_widget_with_static_id):
|
||||
"""Test creating a WebConsole with a unique_id."""
|
||||
widget = console_widget_with_static_id
|
||||
|
||||
assert widget._unique_id == "test_console"
|
||||
assert widget._unique_id in _web_console_registry._page_registry
|
||||
page_info = _web_console_registry.get_page_info("test_console")
|
||||
assert page_info is not None
|
||||
assert page_info.owner_gui_id == widget.gui_id
|
||||
assert widget.gui_id in page_info.widget_ids
|
||||
|
||||
|
||||
def test_web_console_page_sharing(two_console_widgets_same_id):
|
||||
"""Test that two widgets can share the same page using unique_id."""
|
||||
widget1, widget2 = two_console_widgets_same_id
|
||||
|
||||
# Both should reference the same page in the registry
|
||||
page_info = _web_console_registry.get_page_info("shared_console")
|
||||
assert page_info is not None
|
||||
assert widget1.gui_id in page_info.widget_ids
|
||||
assert widget2.gui_id in page_info.widget_ids
|
||||
assert widget1.page == widget2.page
|
||||
|
||||
|
||||
def test_web_console_has_ownership(console_widget_with_static_id):
|
||||
"""Test the has_ownership method."""
|
||||
widget = console_widget_with_static_id
|
||||
|
||||
# Widget should have ownership by default
|
||||
assert widget.has_ownership()
|
||||
|
||||
|
||||
def test_web_console_yield_ownership(console_widget_with_static_id):
|
||||
"""Test yielding ownership of a page."""
|
||||
widget = console_widget_with_static_id
|
||||
|
||||
assert widget.has_ownership()
|
||||
|
||||
# Yield ownership
|
||||
widget.yield_ownership()
|
||||
|
||||
# Widget should no longer have ownership
|
||||
assert not widget.has_ownership()
|
||||
page_info = _web_console_registry.get_page_info("test_console")
|
||||
assert page_info.owner_gui_id is None
|
||||
# Overlay should be shown
|
||||
assert widget._mode == ConsoleMode.INACTIVE
|
||||
|
||||
|
||||
def test_web_console_take_page_ownership(two_console_widgets_same_id):
|
||||
"""Test taking ownership of a page."""
|
||||
widget1, widget2 = two_console_widgets_same_id
|
||||
|
||||
# Widget1 should have ownership initially
|
||||
assert widget1.has_ownership()
|
||||
assert not widget2.has_ownership()
|
||||
|
||||
# Widget2 takes ownership
|
||||
widget2.take_page_ownership()
|
||||
|
||||
# Now widget2 should have ownership
|
||||
assert not widget1.has_ownership()
|
||||
assert widget2.has_ownership()
|
||||
|
||||
assert widget2._mode == ConsoleMode.ACTIVE
|
||||
assert widget1._mode == ConsoleMode.INACTIVE
|
||||
|
||||
|
||||
def test_web_console_hide_event_yields_ownership(qtbot, console_widget_with_static_id):
|
||||
"""Test that hideEvent yields ownership."""
|
||||
widget = console_widget_with_static_id
|
||||
|
||||
assert widget.has_ownership()
|
||||
|
||||
# Hide the widget. Note that we cannot call widget.hide() directly
|
||||
# because it doesn't trigger the hideEvent in tests as widgets are
|
||||
# not visible in the test environment.
|
||||
widget.hideEvent(QHideEvent())
|
||||
qtbot.wait(100) # Allow event processing
|
||||
|
||||
# Widget should have yielded ownership
|
||||
assert not widget.has_ownership()
|
||||
page_info = _web_console_registry.get_page_info("test_console")
|
||||
assert page_info.owner_gui_id is None
|
||||
|
||||
|
||||
def test_web_console_show_event_takes_ownership(console_widget_with_static_id):
|
||||
"""Test that showEvent takes ownership when page has no owner."""
|
||||
widget = console_widget_with_static_id
|
||||
|
||||
# Yield ownership
|
||||
widget.yield_ownership()
|
||||
assert not widget.has_ownership()
|
||||
|
||||
# Show the widget again
|
||||
widget.show()
|
||||
|
||||
# Widget should have reclaimed ownership
|
||||
assert widget.has_ownership()
|
||||
assert widget.browser.isVisible()
|
||||
assert not widget.overlay.isVisible()
|
||||
|
||||
|
||||
def test_web_console_mouse_press_takes_ownership(qtbot, two_console_widgets_same_id):
|
||||
"""Test that clicking on overlay takes ownership."""
|
||||
widget1, widget2 = two_console_widgets_same_id
|
||||
widget1.show()
|
||||
widget2.show()
|
||||
|
||||
# Widget1 has ownership, widget2 doesn't
|
||||
assert widget1.has_ownership()
|
||||
assert not widget2.has_ownership()
|
||||
assert widget1.isVisible()
|
||||
assert widget1._mode == ConsoleMode.ACTIVE
|
||||
assert widget2._mode == ConsoleMode.INACTIVE
|
||||
|
||||
qtbot.mouseClick(widget2, Qt.MouseButton.LeftButton)
|
||||
|
||||
# Widget2 should now have ownership
|
||||
assert widget2.has_ownership()
|
||||
assert not widget1.has_ownership()
|
||||
|
||||
|
||||
def test_web_console_registry_cleanup_removes_page(console_widget_with_static_id):
|
||||
"""Test that the registry cleans up pages when all widgets are removed."""
|
||||
widget = console_widget_with_static_id
|
||||
|
||||
assert widget._unique_id in _web_console_registry._page_registry
|
||||
|
||||
# Cleanup the widget
|
||||
widget.cleanup()
|
||||
|
||||
# Page should be removed from registry
|
||||
assert widget._unique_id not in _web_console_registry._page_registry
|
||||
|
||||
|
||||
def test_web_console_without_unique_id_no_page_sharing(console_widget):
|
||||
"""Test that widgets without unique_id don't participate in page sharing."""
|
||||
widget = console_widget
|
||||
|
||||
# Widget should not be in the page registry
|
||||
assert widget._unique_id is None
|
||||
assert not widget.has_ownership() # Should return False for non-unique widgets
|
||||
|
||||
|
||||
def test_web_console_registry_get_page_info_nonexistent(qtbot, mocked_client):
|
||||
"""Test getting page info for a non-existent page."""
|
||||
page_info = _web_console_registry.get_page_info("nonexistent")
|
||||
assert page_info is None
|
||||
|
||||
|
||||
def test_web_console_take_ownership_without_unique_id(console_widget):
|
||||
"""Test that take_page_ownership fails gracefully without unique_id."""
|
||||
widget = console_widget
|
||||
# Should not crash when taking ownership without unique_id
|
||||
widget.take_page_ownership()
|
||||
|
||||
|
||||
def test_web_console_yield_ownership_without_unique_id(console_widget):
|
||||
"""Test that yield_ownership fails gracefully without unique_id."""
|
||||
widget = console_widget
|
||||
# Should not crash when yielding ownership without unique_id
|
||||
widget.yield_ownership()
|
||||
|
||||
|
||||
def test_registry_yield_ownership_gui_id_not_in_instances():
|
||||
"""Test registry yield_ownership returns False when gui_id not in instances."""
|
||||
result = _web_console_registry.yield_ownership("nonexistent_gui_id")
|
||||
assert result is False
|
||||
|
||||
|
||||
def test_registry_yield_ownership_instance_is_none(console_widget_with_static_id):
|
||||
"""Test registry yield_ownership returns False when instance weakref is dead."""
|
||||
widget = console_widget_with_static_id
|
||||
gui_id = widget.gui_id
|
||||
|
||||
# Store the gui_id and simulate the weakref being dead
|
||||
_web_console_registry._instances[gui_id] = lambda: None
|
||||
|
||||
result = _web_console_registry.yield_ownership(gui_id)
|
||||
assert result is False
|
||||
|
||||
|
||||
def test_registry_yield_ownership_unique_id_none(console_widget_with_static_id):
|
||||
"""Test registry yield_ownership returns False when page info's unique_id is None."""
|
||||
widget = console_widget_with_static_id
|
||||
gui_id = widget.gui_id
|
||||
unique_id = widget._unique_id
|
||||
widget._unique_id = None
|
||||
|
||||
result = _web_console_registry.yield_ownership(gui_id)
|
||||
assert result is False
|
||||
|
||||
widget._unique_id = unique_id # Restore for cleanup
|
||||
|
||||
|
||||
def test_registry_yield_ownership_unique_id_not_in_page_registry(console_widget_with_static_id):
|
||||
"""Test registry yield_ownership returns False when unique_id not in page registry."""
|
||||
widget = console_widget_with_static_id
|
||||
gui_id = widget.gui_id
|
||||
unique_id = widget._unique_id
|
||||
widget._unique_id = "nonexistent_unique_id"
|
||||
|
||||
result = _web_console_registry.yield_ownership(gui_id)
|
||||
assert result is False
|
||||
|
||||
widget._unique_id = unique_id # Restore for cleanup
|
||||
|
||||
|
||||
def test_registry_owner_is_visible_page_info_none():
|
||||
"""Test owner_is_visible returns False when page info doesn't exist."""
|
||||
result = _web_console_registry.owner_is_visible("nonexistent_page")
|
||||
assert result is False
|
||||
|
||||
|
||||
def test_registry_owner_is_visible_no_owner(console_widget_with_static_id):
|
||||
"""Test owner_is_visible returns False when page has no owner."""
|
||||
widget = console_widget_with_static_id
|
||||
|
||||
# Yield ownership so there's no owner
|
||||
widget.yield_ownership()
|
||||
page_info = _web_console_registry.get_page_info(widget._unique_id)
|
||||
assert page_info.owner_gui_id is None
|
||||
|
||||
result = _web_console_registry.owner_is_visible(widget._unique_id)
|
||||
assert result is False
|
||||
|
||||
|
||||
def test_registry_owner_is_visible_owner_ref_none(console_widget_with_static_id):
|
||||
"""Test owner_is_visible returns False when owner ref doesn't exist in instances."""
|
||||
widget = console_widget_with_static_id
|
||||
unique_id = widget._unique_id
|
||||
|
||||
# Remove owner from instances dict
|
||||
del _web_console_registry._instances[widget.gui_id]
|
||||
|
||||
result = _web_console_registry.owner_is_visible(unique_id)
|
||||
assert result is False
|
||||
|
||||
|
||||
def test_registry_owner_is_visible_owner_instance_none(console_widget_with_static_id):
|
||||
"""Test owner_is_visible returns False when owner instance weakref is dead."""
|
||||
widget = console_widget_with_static_id
|
||||
unique_id = widget._unique_id
|
||||
gui_id = widget.gui_id
|
||||
|
||||
# Simulate dead weakref
|
||||
_web_console_registry._instances[gui_id] = lambda: None
|
||||
|
||||
result = _web_console_registry.owner_is_visible(unique_id)
|
||||
assert result is False
|
||||
|
||||
|
||||
def test_registry_owner_is_visible_owner_visible(console_widget_with_static_id):
|
||||
"""Test owner_is_visible returns True when owner is visible."""
|
||||
widget = console_widget_with_static_id
|
||||
widget.show()
|
||||
|
||||
result = _web_console_registry.owner_is_visible(widget._unique_id)
|
||||
assert result is True
|
||||
|
||||
|
||||
def test_registry_owner_is_visible_owner_not_visible(console_widget_with_static_id):
|
||||
"""Test owner_is_visible returns False when owner is not visible."""
|
||||
widget = console_widget_with_static_id
|
||||
widget.hide()
|
||||
|
||||
result = _web_console_registry.owner_is_visible(widget._unique_id)
|
||||
assert result is False
|
||||
|
||||
Reference in New Issue
Block a user