diff --git a/.github/workflows/end2end-conda.yml b/.github/workflows/end2end-conda.yml index 65c644fd..432903f1 100644 --- a/.github/workflows/end2end-conda.yml +++ b/.github/workflows/end2end-conda.yml @@ -9,10 +9,10 @@ jobs: shell: bash -el {0} env: - CHILD_PIPELINE_BRANCH: main # Set the branch you want for ophyd_devices - BEC_CORE_BRANCH: main # Set the branch you want for bec - OPHYD_DEVICES_BRANCH: main # Set the branch you want for ophyd_devices - PLUGIN_REPO_BRANCH: main # Set the branch you want for the plugin repo + CHILD_PIPELINE_BRANCH: main # Set the branch you want for ophyd_devices + BEC_CORE_BRANCH: main # Set the branch you want for bec + OPHYD_DEVICES_BRANCH: main # Set the branch you want for ophyd_devices + PLUGIN_REPO_BRANCH: main # Set the branch you want for the plugin repo PROJECT_PATH: ${{ github.repository }} QTWEBENGINE_DISABLE_SANDBOX: 1 QT_QPA_PLATFORM: "offscreen" @@ -23,15 +23,16 @@ jobs: - name: Set up Conda uses: conda-incubator/setup-miniconda@v3 with: - auto-update-conda: true - auto-activate-base: true - python-version: '3.11' + auto-update-conda: true + auto-activate-base: true + python-version: "3.11" - name: Install dependencies run: | sudo apt-get update sudo apt-get install -y libgl1 libegl1 x11-utils libxkbcommon-x11-0 libdbus-1-3 xvfb sudo apt-get -y install libnss3 libxdamage1 libasound2t64 libatomic1 libxcursor1 + sudo apt-get -y install ttyd - name: Conda install and run pytest run: | @@ -55,4 +56,4 @@ jobs: with: name: pytest-logs path: ./logs/*.log - retention-days: 7 \ No newline at end of file + retention-days: 7 diff --git a/bec_widgets/applications/views/developer_view/developer_widget.py b/bec_widgets/applications/views/developer_view/developer_widget.py index 76f482bc..8e4f42f1 100644 --- a/bec_widgets/applications/views/developer_view/developer_widget.py +++ b/bec_widgets/applications/views/developer_view/developer_widget.py @@ -18,7 +18,7 @@ from bec_widgets.widgets.containers.advanced_dock_area.basic_dock_area import Do from bec_widgets.widgets.containers.qt_ads import CDockWidget from bec_widgets.widgets.editors.monaco.monaco_dock import MonacoDock from bec_widgets.widgets.editors.monaco.monaco_widget import MonacoWidget -from bec_widgets.widgets.editors.web_console.web_console import WebConsole +from bec_widgets.widgets.editors.web_console.web_console import BECShell, WebConsole from bec_widgets.widgets.utility.ide_explorer.ide_explorer import IDEExplorer @@ -91,9 +91,10 @@ class DeveloperWidget(DockAreaWidget): # Initialize the widgets self.explorer = IDEExplorer(self) self.explorer.setObjectName("Explorer") - self.console = WebConsole(self) - self.console.setObjectName("Console") - self.terminal = WebConsole(self, startup_cmd="") + + self.console = BECShell(self) + self.console.setObjectName("BEC Shell") + self.terminal = WebConsole(self) self.terminal.setObjectName("Terminal") self.monaco = MonacoDock(self) self.monaco.setObjectName("MonacoEditor") diff --git a/bec_widgets/cli/client.py b/bec_widgets/cli/client.py index 6fd5dd0c..7ae6767f 100644 --- a/bec_widgets/cli/client.py +++ b/bec_widgets/cli/client.py @@ -30,6 +30,7 @@ _Widgets = { "BECMainWindow": "BECMainWindow", "BECProgressBar": "BECProgressBar", "BECQueue": "BECQueue", + "BECShell": "BECShell", "BECStatusBox": "BECStatusBox", "DapComboBox": "DapComboBox", "DarkModeButton": "DarkModeButton", @@ -495,6 +496,28 @@ class BECQueue(RPCBase): """ +class BECShell(RPCBase): + """A WebConsole pre-configured to run the BEC shell.""" + + @rpc_call + def remove(self): + """ + Cleanup the BECConnector + """ + + @rpc_call + def attach(self): + """ + None + """ + + @rpc_call + def detach(self): + """ + Detach the widget from its parent dock widget (if widget is in the dock), making it a floating widget. + """ + + class BECStatusBox(RPCBase): """An autonomous widget to display the status of BEC services.""" diff --git a/bec_widgets/widgets/containers/advanced_dock_area/advanced_dock_area.py b/bec_widgets/widgets/containers/advanced_dock_area/advanced_dock_area.py index c9a4812e..4955b0da 100644 --- a/bec_widgets/widgets/containers/advanced_dock_area/advanced_dock_area.py +++ b/bec_widgets/widgets/containers/advanced_dock_area/advanced_dock_area.py @@ -70,7 +70,7 @@ from bec_widgets.widgets.containers.main_window.main_window import BECMainWindow from bec_widgets.widgets.containers.qt_ads import CDockWidget from bec_widgets.widgets.control.device_control.positioner_box import PositionerBox, PositionerBox2D from bec_widgets.widgets.control.scan_control import ScanControl -from bec_widgets.widgets.editors.web_console.web_console import WebConsole +from bec_widgets.widgets.editors.web_console.web_console import BECShell, WebConsole from bec_widgets.widgets.plots.heatmap.heatmap import Heatmap from bec_widgets.widgets.plots.image.image import Image from bec_widgets.widgets.plots.motor_map.motor_map import MotorMap @@ -378,7 +378,7 @@ class AdvancedDockArea(DockAreaWidget): "RingProgressBar", ), "terminal": (WebConsole.ICON_NAME, "Add Terminal", "WebConsole"), - "bec_shell": (WebConsole.ICON_NAME, "Add BEC Shell", "WebConsole"), + "bec_shell": (BECShell.ICON_NAME, "Add BEC Shell", "BECShell"), "log_panel": (LogPanel.ICON_NAME, "Add LogPanel - Disabled", "LogPanel"), "sbb_monitor": ("train", "Add SBB Monitor", "SBBMonitor"), } @@ -501,10 +501,7 @@ class AdvancedDockArea(DockAreaWidget): elif key == "bec_shell": act.triggered.connect( lambda _, t=widget_type: self.new( - widget=t, - closable=True, - startup_cmd=f"bec --gui-id {self.bec_dispatcher.cli_server.gui_id}", - show_settings_action=True, + widget=t, closable=True, show_settings_action=False ) ) else: diff --git a/bec_widgets/widgets/editors/web_console/bec_shell.pyproject b/bec_widgets/widgets/editors/web_console/bec_shell.pyproject new file mode 100644 index 00000000..786a751f --- /dev/null +++ b/bec_widgets/widgets/editors/web_console/bec_shell.pyproject @@ -0,0 +1 @@ +{'files': ['web_console.py']} \ No newline at end of file diff --git a/bec_widgets/widgets/editors/web_console/bec_shell_plugin.py b/bec_widgets/widgets/editors/web_console/bec_shell_plugin.py new file mode 100644 index 00000000..92112c39 --- /dev/null +++ b/bec_widgets/widgets/editors/web_console/bec_shell_plugin.py @@ -0,0 +1,57 @@ +# Copyright (C) 2022 The Qt Company Ltd. +# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause + +from qtpy.QtDesigner import QDesignerCustomWidgetInterface +from qtpy.QtWidgets import QWidget + +from bec_widgets.utils.bec_designer import designer_material_icon +from bec_widgets.widgets.editors.web_console.web_console import BECShell + +DOM_XML = """ + + + + +""" + + +class BECShellPlugin(QDesignerCustomWidgetInterface): # pragma: no cover + def __init__(self): + super().__init__() + self._form_editor = None + + def createWidget(self, parent): + if parent is None: + return QWidget() + t = BECShell(parent) + return t + + def domXml(self): + return DOM_XML + + def group(self): + return "" + + def icon(self): + return designer_material_icon(BECShell.ICON_NAME) + + def includeFile(self): + return "bec_shell" + + def initialize(self, form_editor): + self._form_editor = form_editor + + def isContainer(self): + return False + + def isInitialized(self): + return self._form_editor is not None + + def name(self): + return "BECShell" + + def toolTip(self): + return "" + + def whatsThis(self): + return self.toolTip() diff --git a/bec_widgets/widgets/editors/web_console/register_bec_shell.py b/bec_widgets/widgets/editors/web_console/register_bec_shell.py new file mode 100644 index 00000000..3e556298 --- /dev/null +++ b/bec_widgets/widgets/editors/web_console/register_bec_shell.py @@ -0,0 +1,15 @@ +def main(): # pragma: no cover + from qtpy import PYSIDE6 + + if not PYSIDE6: + print("PYSIDE6 is not available in the environment. Cannot patch designer.") + return + from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection + + from bec_widgets.widgets.editors.web_console.bec_shell_plugin import BECShellPlugin + + QPyDesignerCustomWidgetCollection.addCustomWidget(BECShellPlugin()) + + +if __name__ == "__main__": # pragma: no cover + main() diff --git a/bec_widgets/widgets/editors/web_console/web_console.py b/bec_widgets/widgets/editors/web_console/web_console.py index 62eede57..c7ca75da 100644 --- a/bec_widgets/widgets/editors/web_console/web_console.py +++ b/bec_widgets/widgets/editors/web_console/web_console.py @@ -1,21 +1,39 @@ from __future__ import annotations +import enum +import json import secrets import subprocess import time from bec_lib.logger import bec_logger from louie.saferef import safe_ref -from qtpy.QtCore import QTimer, QUrl, Signal, qInstallMessageHandler +from pydantic import BaseModel +from qtpy.QtCore import Qt, QTimer, QUrl, Signal, qInstallMessageHandler +from qtpy.QtGui import QMouseEvent, QResizeEvent from qtpy.QtWebEngineWidgets import QWebEnginePage, QWebEngineView -from qtpy.QtWidgets import QApplication, QVBoxLayout, QWidget +from qtpy.QtWidgets import QApplication, QLabel, QTabWidget, QVBoxLayout, QWidget from bec_widgets.utils.bec_widget import BECWidget -from bec_widgets.utils.error_popups import SafeProperty logger = bec_logger.logger +class ConsoleMode(str, enum.Enum): + ACTIVE = "active" + INACTIVE = "inactive" + HIDDEN = "hidden" + + +class PageOwnerInfo(BaseModel): + owner_gui_id: str | None = None + widget_ids: list[str] = [] + page: QWebEnginePage | None = None + initialized: bool = False + + model_config = {"arbitrary_types_allowed": True} + + class WebConsoleRegistry: """ A registry for the WebConsole class to manage its instances. @@ -29,14 +47,21 @@ class WebConsoleRegistry: self._server_process = None self._server_port = None self._token = secrets.token_hex(16) + self._page_registry: dict[str, PageOwnerInfo] = {} def register(self, instance: WebConsole): """ Register an instance of WebConsole. + + Args: + instance (WebConsole): The instance to register. """ self._instances[instance.gui_id] = safe_ref(instance) self.cleanup() + if instance._unique_id: + self._register_page(instance) + if self._server_process is None: # Start the ttyd server if not already running self.start_ttyd() @@ -141,8 +166,158 @@ class WebConsoleRegistry: if instance.gui_id in self._instances: del self._instances[instance.gui_id] + if instance._unique_id: + self._unregister_page(instance._unique_id, instance.gui_id) + self.cleanup() + def _register_page(self, instance: WebConsole): + """ + Register a page in the registry. Please note that this does not transfer ownership + for already existing pages; it simply records which widget currently owns the page. + Use transfer_page_ownership to change ownership. + + Args: + instance (WebConsole): The instance to register. + """ + + unique_id = instance._unique_id + gui_id = instance.gui_id + + if unique_id is None: + return + + if unique_id not in self._page_registry: + page = BECWebEnginePage() + page.authenticationRequired.connect(instance._authenticate) + self._page_registry[unique_id] = PageOwnerInfo( + owner_gui_id=gui_id, widget_ids=[gui_id], page=page + ) + logger.info(f"Registered new page {unique_id} for {gui_id}") + return + + if gui_id not in self._page_registry[unique_id].widget_ids: + self._page_registry[unique_id].widget_ids.append(gui_id) + + def _unregister_page(self, unique_id: str, gui_id: str): + """ + Unregister a page from the registry. + + Args: + unique_id (str): The unique identifier for the page. + gui_id (str): The GUI ID of the widget. + """ + if unique_id not in self._page_registry: + return + page_info = self._page_registry[unique_id] + if gui_id in page_info.widget_ids: + page_info.widget_ids.remove(gui_id) + if page_info.owner_gui_id == gui_id: + page_info.owner_gui_id = None + if not page_info.widget_ids: + if page_info.page: + page_info.page.deleteLater() + del self._page_registry[unique_id] + + logger.info(f"Unregistered page {unique_id} for {gui_id}") + + def get_page_info(self, unique_id: str) -> PageOwnerInfo | None: + """ + Get a page from the registry. + + Args: + unique_id (str): The unique identifier for the page. + + Returns: + PageOwnerInfo | None: The page info if found, None otherwise. + """ + if unique_id not in self._page_registry: + return None + return self._page_registry[unique_id] + + def take_page_ownership(self, unique_id: str, new_owner_gui_id: str) -> QWebEnginePage | None: + """ + Transfer ownership of a page to a new owner. + + Args: + unique_id (str): The unique identifier for the page. + new_owner_gui_id (str): The GUI ID of the new owner. + + Returns: + QWebEnginePage | None: The page if ownership transfer was successful, None otherwise. + """ + if unique_id not in self._page_registry: + logger.warning(f"Page {unique_id} not found in registry") + return None + + page_info = self._page_registry[unique_id] + old_owner_gui_id = page_info.owner_gui_id + if old_owner_gui_id: + old_owner_ref = self._instances.get(old_owner_gui_id) + if old_owner_ref: + old_owner_instance = old_owner_ref() + if old_owner_instance: + old_owner_instance.yield_ownership() + page_info.owner_gui_id = new_owner_gui_id + + logger.info(f"Transferred ownership of page {unique_id} to {new_owner_gui_id}") + return page_info.page + + def yield_ownership(self, gui_id: str) -> bool: + """ + Yield ownership of a page without destroying it. The page remains in the + registry with no owner, available for another widget to claim. + + Args: + gui_id (str): The GUI ID of the widget yielding ownership. + + Returns: + bool: True if ownership was yielded, False otherwise. + """ + if gui_id not in self._instances: + return False + + instance = self._instances[gui_id]() + if instance is None: + return False + + unique_id = instance._unique_id + if unique_id is None: + return False + + if unique_id not in self._page_registry: + return False + + page_owner_info = self._page_registry[unique_id] + if page_owner_info.owner_gui_id != gui_id: + return False + + page_owner_info.owner_gui_id = None + return True + + def owner_is_visible(self, unique_id: str) -> bool: + """ + Check if the owner of a page is currently visible. + + Args: + unique_id (str): The unique identifier for the page. + Returns: + bool: True if the owner is visible, False otherwise. + """ + page_info = self.get_page_info(unique_id) + if page_info is None or page_info.owner_gui_id is None: + return False + + owner_ref = self._instances.get(page_info.owner_gui_id) + if owner_ref is None: + return False + + owner_instance = owner_ref() + if owner_instance is None: + return False + + return owner_instance.isVisible() + _web_console_registry = WebConsoleRegistry() @@ -178,34 +353,103 @@ class WebConsole(BECWidget, QWidget): config=None, client=None, gui_id=None, - startup_cmd: str | None = "bec --nogui", + startup_cmd: str | None = None, + is_bec_shell: bool = False, + unique_id: str | None = None, **kwargs, ): super().__init__(parent=parent, client=client, gui_id=gui_id, config=config, **kwargs) + self._mode = ConsoleMode.INACTIVE + self._is_bec_shell = is_bec_shell self._startup_cmd = startup_cmd self._is_initialized = False - _web_console_registry.register(self) - self._token = _web_console_registry._token - layout = QVBoxLayout() - layout.setContentsMargins(0, 0, 0, 0) - self.browser = QWebEngineView(self) - self.page = BECWebEnginePage(self) - self.page.authenticationRequired.connect(self._authenticate) - self.browser.setPage(self.page) - layout.addWidget(self.browser) - self.setLayout(layout) - self.page.setUrl(QUrl(f"http://localhost:{_web_console_registry._server_port}")) + self._unique_id = unique_id + self.page = None # Will be set in _set_up_page + self._startup_timer = QTimer() self._startup_timer.setInterval(500) self._startup_timer.timeout.connect(self._check_page_ready) self._startup_timer.start() self._js_callback.connect(self._on_js_callback) + self._set_up_page() + + def _set_up_page(self): + """ + Set up the web page and UI elements. + """ + layout = QVBoxLayout() + layout.setContentsMargins(0, 0, 0, 0) + self.browser = QWebEngineView(self) + + layout.addWidget(self.browser) + self.setLayout(layout) + + # prepare overlay + self.overlay = QWidget(self) + layout = QVBoxLayout(self.overlay) + layout.setAlignment(Qt.AlignmentFlag.AlignCenter) + label = QLabel("Click to activate terminal", self.overlay) + layout.addWidget(label) + self.overlay.hide() + + _web_console_registry.register(self) + self._token = _web_console_registry._token + + # If no unique_id is provided, create a new page + if not self._unique_id: + self.page = BECWebEnginePage(self) + self.page.authenticationRequired.connect(self._authenticate) + self.page.setUrl(QUrl(f"http://localhost:{_web_console_registry._server_port}")) + self.browser.setPage(self.page) + self._set_mode(ConsoleMode.ACTIVE) + return + + # Try to get the page from the registry + page_info = _web_console_registry.get_page_info(self._unique_id) + if page_info and page_info.page: + self.page = page_info.page + if not page_info.owner_gui_id or page_info.owner_gui_id == self.gui_id: + self.browser.setPage(self.page) + # Only set URL if this is a newly created page (no URL set yet) + if self.page.url().isEmpty(): + self.page.setUrl(QUrl(f"http://localhost:{_web_console_registry._server_port}")) + else: + # We have an existing page, so we don't need the startup timer + self._startup_timer.stop() + if page_info.owner_gui_id != self.gui_id: + self._set_mode(ConsoleMode.INACTIVE) + else: + self._set_mode(ConsoleMode.ACTIVE) + + def _set_mode(self, mode: ConsoleMode): + """ + Set the mode of the web console. + + Args: + mode (ConsoleMode): The mode to set. + """ + if not self._unique_id: + # For non-unique_id consoles, always active + mode = ConsoleMode.ACTIVE + + self._mode = mode + match mode: + case ConsoleMode.ACTIVE: + self.browser.setVisible(True) + self.overlay.hide() + case ConsoleMode.INACTIVE: + self.browser.setVisible(False) + self.overlay.show() + case ConsoleMode.HIDDEN: + self.browser.setVisible(False) + self.overlay.hide() + def _check_page_ready(self): """ Check if the page is ready and stop the timer if it is. """ - if self.page.isLoading(): + if not self.page or self.page.isLoading(): return self.page.runJavaScript("window.term !== undefined", self._js_callback.emit) @@ -218,15 +462,27 @@ class WebConsole(BECWidget, QWidget): return self._is_initialized = True self._startup_timer.stop() - if self._startup_cmd: - self.write(self._startup_cmd) + if self.startup_cmd: + if self._unique_id: + page_info = _web_console_registry.get_page_info(self._unique_id) + if page_info is None: + return + if not page_info.initialized: + page_info.initialized = True + self.write(self.startup_cmd) + else: + self.write(self.startup_cmd) self.initialized.emit() - @SafeProperty(str) + @property def startup_cmd(self): """ Get the startup command for the web console. """ + if self._is_bec_shell: + if self.bec_dispatcher.cli_server is None: + return "bec --nogui" + return f"bec --gui-id {self.bec_dispatcher.cli_server.gui_id}" return self._startup_cmd @startup_cmd.setter @@ -241,11 +497,123 @@ class WebConsole(BECWidget, QWidget): def write(self, data: str, send_return: bool = True): """ Send data to the web page + + Args: + data (str): The data to send. + send_return (bool): Whether to send a return after the data. """ - self.page.runJavaScript(f"window.term.paste('{data}');") + cmd = f"window.term.paste({json.dumps(data)});" + if self.page is None: + logger.warning("Cannot write to web console: page is not initialized.") + return + self.page.runJavaScript(cmd) if send_return: self.send_return() + def take_page_ownership(self, unique_id: str | None = None): + """ + Take ownership of a web page from the registry. This will transfer the page + from its current owner (if any) to this widget. + + Args: + unique_id (str): The unique identifier of the page to take ownership of. + If None, uses this widget's unique_id. + """ + if unique_id is None: + unique_id = self._unique_id + + if not unique_id: + logger.warning("Cannot take page ownership without a unique_id") + return + + # Get the page from registry + page = _web_console_registry.take_page_ownership(unique_id, self.gui_id) + + if not page: + logger.warning(f"Page {unique_id} not found in registry") + return + + self.page = page + self.browser.setPage(page) + self._set_mode(ConsoleMode.ACTIVE) + logger.info(f"Widget {self.gui_id} took ownership of page {unique_id}") + + def _on_ownership_lost(self): + """ + Called when this widget loses ownership of its page. + Displays the overlay and hides the browser. + """ + self._set_mode(ConsoleMode.INACTIVE) + logger.info(f"Widget {self.gui_id} lost ownership of page {self._unique_id}") + + def yield_ownership(self): + """ + Yield ownership of the page. The page remains in the registry with no owner, + available for another widget to claim. This is automatically called when the + widget becomes hidden. + """ + if not self._unique_id: + return + success = _web_console_registry.yield_ownership(self.gui_id) + if success: + self._on_ownership_lost() + logger.info(f"Widget {self.gui_id} yielded ownership of page {self._unique_id}") + + def has_ownership(self) -> bool: + """ + Check if this widget currently has ownership of a page. + + Returns: + bool: True if this widget owns a page, False otherwise. + """ + if not self._unique_id: + return False + page_info = _web_console_registry.get_page_info(self._unique_id) + if page_info is None: + return False + return page_info.owner_gui_id == self.gui_id + + def hideEvent(self, event): + """ + Called when the widget is hidden. Automatically yields ownership. + """ + if self.has_ownership(): + self.yield_ownership() + self._set_mode(ConsoleMode.HIDDEN) + super().hideEvent(event) + + def showEvent(self, event): + """ + Called when the widget is shown. Updates UI state based on ownership. + """ + super().showEvent(event) + if self._unique_id and not self.has_ownership(): + # Take ownership if the page does not have an owner or + # the owner is not visible + page_info = _web_console_registry.get_page_info(self._unique_id) + if page_info is None: + self._set_mode(ConsoleMode.INACTIVE) + return + if page_info.owner_gui_id is None or not _web_console_registry.owner_is_visible( + self._unique_id + ): + self.take_page_ownership(self._unique_id) + return + if page_info.owner_gui_id != self.gui_id: + self._set_mode(ConsoleMode.INACTIVE) + return + + def resizeEvent(self, event: QResizeEvent) -> None: + super().resizeEvent(event) + self.overlay.resize(event.size()) + + def mousePressEvent(self, event: QMouseEvent) -> None: + if event.button() == Qt.MouseButton.LeftButton and not self.has_ownership(): + self.take_page_ownership(self._unique_id) + event.accept() + return + return super().mousePressEvent(event) + def _authenticate(self, _, auth): """ Authenticate the request with the provided username and password. @@ -286,10 +654,52 @@ class WebConsole(BECWidget, QWidget): super().cleanup() +class BECShell(WebConsole): + """ + A WebConsole pre-configured to run the BEC shell. + We cannot simply expose the web console properties to Qt as we need to have a deterministic + startup behavior for sharing the same shell instance across multiple widgets. + """ + + ICON_NAME = "hub" + + def __init__(self, parent=None, config=None, client=None, gui_id=None, **kwargs): + super().__init__( + parent=parent, + config=config, + client=client, + gui_id=gui_id, + is_bec_shell=True, + unique_id="bec_shell", + **kwargs, + ) + + if __name__ == "__main__": # pragma: no cover import sys app = QApplication(sys.argv) - widget = WebConsole() + widget = QTabWidget() + + # Create two consoles with different unique_ids + web_console1 = WebConsole(startup_cmd="bec --nogui", unique_id="console1") + web_console2 = WebConsole(startup_cmd="htop") + web_console3 = WebConsole(startup_cmd="bec --nogui", unique_id="console1") + widget.addTab(web_console1, "Console 1") + widget.addTab(web_console2, "Console 2") + widget.addTab(web_console3, "Console 3 -- mirror of Console 1") widget.show() + + # Demonstrate page sharing: + # After initialization, web_console2 can take ownership of console1's page: + # web_console2.take_page_ownership("console1") + + widget.resize(800, 600) + + def _close_cons1(): + web_console2.close() + web_console2.deleteLater() + + # QTimer.singleShot(3000, _close_cons1) + sys.exit(app.exec_()) diff --git a/tests/unit_tests/test_advanced_dock_area.py b/tests/unit_tests/test_advanced_dock_area.py index 0430af7d..c77ac543 100644 --- a/tests/unit_tests/test_advanced_dock_area.py +++ b/tests/unit_tests/test_advanced_dock_area.py @@ -2208,7 +2208,7 @@ class TestFlatToolbarActions: "flat_status": "BECStatusBox", "flat_progress_bar": "RingProgressBar", "flat_terminal": "WebConsole", - "flat_bec_shell": "WebConsole", + "flat_bec_shell": "BECShell", "flat_sbb_monitor": "SBBMonitor", } diff --git a/tests/unit_tests/test_web_console.py b/tests/unit_tests/test_web_console.py index 3da2f9a0..be49cff1 100644 --- a/tests/unit_tests/test_web_console.py +++ b/tests/unit_tests/test_web_console.py @@ -1,25 +1,69 @@ from unittest import mock import pytest +from qtpy.QtCore import Qt +from qtpy.QtGui import QHideEvent from qtpy.QtNetwork import QAuthenticator -from bec_widgets.widgets.editors.web_console.web_console import WebConsole, _web_console_registry +from bec_widgets.widgets.editors.web_console.web_console import ( + BECShell, + ConsoleMode, + WebConsole, + _web_console_registry, +) from .client_mocks import mocked_client @pytest.fixture -def console_widget(qtbot, mocked_client): +def mocked_server_startup(): + """Mock the web console server startup process.""" with mock.patch( "bec_widgets.widgets.editors.web_console.web_console.subprocess" ) as mock_subprocess: with mock.patch.object(_web_console_registry, "_wait_for_server_port"): _web_console_registry._server_port = 12345 - # Create the WebConsole widget - widget = WebConsole(client=mocked_client) - qtbot.addWidget(widget) - qtbot.waitExposed(widget) - yield widget + yield mock_subprocess + + +def static_console(qtbot, client, unique_id: str | None = None): + """Fixture to provide a static unique_id for WebConsole tests.""" + if unique_id is None: + widget = WebConsole(client=client) + else: + widget = WebConsole(client=client, unique_id=unique_id) + qtbot.addWidget(widget) + qtbot.waitExposed(widget) + return widget + + +@pytest.fixture +def console_widget(qtbot, mocked_client, mocked_server_startup): + """Create a WebConsole widget with mocked server startup.""" + yield static_console(qtbot, mocked_client) + + +@pytest.fixture +def bec_shell_widget(qtbot, mocked_client, mocked_server_startup): + """Create a BECShell widget with mocked server startup.""" + widget = BECShell(client=mocked_client) + qtbot.addWidget(widget) + qtbot.waitExposed(widget) + yield widget + + +@pytest.fixture +def console_widget_with_static_id(qtbot, mocked_client, mocked_server_startup): + """Create a WebConsole widget with a static unique ID.""" + yield static_console(qtbot, mocked_client, unique_id="test_console") + + +@pytest.fixture +def two_console_widgets_same_id(qtbot, mocked_client, mocked_server_startup): + """Create two WebConsole widgets sharing the same unique ID.""" + widget1 = static_console(qtbot, mocked_client, unique_id="shared_console") + widget2 = static_console(qtbot, mocked_client, unique_id="shared_console") + yield widget1, widget2 def test_web_console_widget_initialization(console_widget): @@ -34,7 +78,7 @@ def test_web_console_write(console_widget): with mock.patch.object(console_widget.page, "runJavaScript") as mock_run_js: console_widget.write("Hello, World!") - assert mock.call("window.term.paste('Hello, World!');") in mock_run_js.mock_calls + assert mock.call('window.term.paste("Hello, World!");') in mock_run_js.mock_calls def test_web_console_write_no_return(console_widget): @@ -42,7 +86,7 @@ def test_web_console_write_no_return(console_widget): with mock.patch.object(console_widget.page, "runJavaScript") as mock_run_js: console_widget.write("Hello, World!", send_return=False) - assert mock.call("window.term.paste('Hello, World!');") in mock_run_js.mock_calls + assert mock.call('window.term.paste("Hello, World!");') in mock_run_js.mock_calls assert mock_run_js.call_count == 1 @@ -138,6 +182,20 @@ def test_web_console_startup_command_execution(console_widget, qtbot): assert not console_widget._startup_timer.isActive() +def test_bec_shell_startup_contains_gui_id(bec_shell_widget): + """Test that the BEC shell startup command includes the GUI ID.""" + bec_shell = bec_shell_widget + + assert bec_shell._is_bec_shell + assert bec_shell._unique_id == "bec_shell" + + assert bec_shell.startup_cmd == "bec --nogui" + + with mock.patch.object(bec_shell.bec_dispatcher, "cli_server") as mock_cli_server: + mock_cli_server.gui_id = "test_gui_id" + assert bec_shell.startup_cmd == "bec --gui-id test_gui_id" + + def test_web_console_set_readonly(console_widget): # Test the set_readonly method console_widget.set_readonly(True) @@ -145,3 +203,274 @@ def test_web_console_set_readonly(console_widget): console_widget.set_readonly(False) assert console_widget.isEnabled() + + +def test_web_console_with_unique_id(console_widget_with_static_id): + """Test creating a WebConsole with a unique_id.""" + widget = console_widget_with_static_id + + assert widget._unique_id == "test_console" + assert widget._unique_id in _web_console_registry._page_registry + page_info = _web_console_registry.get_page_info("test_console") + assert page_info is not None + assert page_info.owner_gui_id == widget.gui_id + assert widget.gui_id in page_info.widget_ids + + +def test_web_console_page_sharing(two_console_widgets_same_id): + """Test that two widgets can share the same page using unique_id.""" + widget1, widget2 = two_console_widgets_same_id + + # Both should reference the same page in the registry + page_info = _web_console_registry.get_page_info("shared_console") + assert page_info is not None + assert widget1.gui_id in page_info.widget_ids + assert widget2.gui_id in page_info.widget_ids + assert widget1.page == widget2.page + + +def test_web_console_has_ownership(console_widget_with_static_id): + """Test the has_ownership method.""" + widget = console_widget_with_static_id + + # Widget should have ownership by default + assert widget.has_ownership() + + +def test_web_console_yield_ownership(console_widget_with_static_id): + """Test yielding ownership of a page.""" + widget = console_widget_with_static_id + + assert widget.has_ownership() + + # Yield ownership + widget.yield_ownership() + + # Widget should no longer have ownership + assert not widget.has_ownership() + page_info = _web_console_registry.get_page_info("test_console") + assert page_info.owner_gui_id is None + # Overlay should be shown + assert widget._mode == ConsoleMode.INACTIVE + + +def test_web_console_take_page_ownership(two_console_widgets_same_id): + """Test taking ownership of a page.""" + widget1, widget2 = two_console_widgets_same_id + + # Widget1 should have ownership initially + assert widget1.has_ownership() + assert not widget2.has_ownership() + + # Widget2 takes ownership + widget2.take_page_ownership() + + # Now widget2 should have ownership + assert not widget1.has_ownership() + assert widget2.has_ownership() + + assert widget2._mode == ConsoleMode.ACTIVE + assert widget1._mode == ConsoleMode.INACTIVE + + +def test_web_console_hide_event_yields_ownership(qtbot, console_widget_with_static_id): + """Test that hideEvent yields ownership.""" + widget = console_widget_with_static_id + + assert widget.has_ownership() + + # Hide the widget. Note that we cannot call widget.hide() directly + # because it doesn't trigger the hideEvent in tests as widgets are + # not visible in the test environment. + widget.hideEvent(QHideEvent()) + qtbot.wait(100) # Allow event processing + + # Widget should have yielded ownership + assert not widget.has_ownership() + page_info = _web_console_registry.get_page_info("test_console") + assert page_info.owner_gui_id is None + + +def test_web_console_show_event_takes_ownership(console_widget_with_static_id): + """Test that showEvent takes ownership when page has no owner.""" + widget = console_widget_with_static_id + + # Yield ownership + widget.yield_ownership() + assert not widget.has_ownership() + + # Show the widget again + widget.show() + + # Widget should have reclaimed ownership + assert widget.has_ownership() + assert widget.browser.isVisible() + assert not widget.overlay.isVisible() + + +def test_web_console_mouse_press_takes_ownership(qtbot, two_console_widgets_same_id): + """Test that clicking on overlay takes ownership.""" + widget1, widget2 = two_console_widgets_same_id + widget1.show() + widget2.show() + + # Widget1 has ownership, widget2 doesn't + assert widget1.has_ownership() + assert not widget2.has_ownership() + assert widget1.isVisible() + assert widget1._mode == ConsoleMode.ACTIVE + assert widget2._mode == ConsoleMode.INACTIVE + + qtbot.mouseClick(widget2, Qt.MouseButton.LeftButton) + + # Widget2 should now have ownership + assert widget2.has_ownership() + assert not widget1.has_ownership() + + +def test_web_console_registry_cleanup_removes_page(console_widget_with_static_id): + """Test that the registry cleans up pages when all widgets are removed.""" + widget = console_widget_with_static_id + + assert widget._unique_id in _web_console_registry._page_registry + + # Cleanup the widget + widget.cleanup() + + # Page should be removed from registry + assert widget._unique_id not in _web_console_registry._page_registry + + +def test_web_console_without_unique_id_no_page_sharing(console_widget): + """Test that widgets without unique_id don't participate in page sharing.""" + widget = console_widget + + # Widget should not be in the page registry + assert widget._unique_id is None + assert not widget.has_ownership() # Should return False for non-unique widgets + + +def test_web_console_registry_get_page_info_nonexistent(qtbot, mocked_client): + """Test getting page info for a non-existent page.""" + page_info = _web_console_registry.get_page_info("nonexistent") + assert page_info is None + + +def test_web_console_take_ownership_without_unique_id(console_widget): + """Test that take_page_ownership fails gracefully without unique_id.""" + widget = console_widget + # Should not crash when taking ownership without unique_id + widget.take_page_ownership() + + +def test_web_console_yield_ownership_without_unique_id(console_widget): + """Test that yield_ownership fails gracefully without unique_id.""" + widget = console_widget + # Should not crash when yielding ownership without unique_id + widget.yield_ownership() + + +def test_registry_yield_ownership_gui_id_not_in_instances(): + """Test registry yield_ownership returns False when gui_id not in instances.""" + result = _web_console_registry.yield_ownership("nonexistent_gui_id") + assert result is False + + +def test_registry_yield_ownership_instance_is_none(console_widget_with_static_id): + """Test registry yield_ownership returns False when instance weakref is dead.""" + widget = console_widget_with_static_id + gui_id = widget.gui_id + + # Store the gui_id and simulate the weakref being dead + _web_console_registry._instances[gui_id] = lambda: None + + result = _web_console_registry.yield_ownership(gui_id) + assert result is False + + +def test_registry_yield_ownership_unique_id_none(console_widget_with_static_id): + """Test registry yield_ownership returns False when page info's unique_id is None.""" + widget = console_widget_with_static_id + gui_id = widget.gui_id + unique_id = widget._unique_id + widget._unique_id = None + + result = _web_console_registry.yield_ownership(gui_id) + assert result is False + + widget._unique_id = unique_id # Restore for cleanup + + +def test_registry_yield_ownership_unique_id_not_in_page_registry(console_widget_with_static_id): + """Test registry yield_ownership returns False when unique_id not in page registry.""" + widget = console_widget_with_static_id + gui_id = widget.gui_id + unique_id = widget._unique_id + widget._unique_id = "nonexistent_unique_id" + + result = _web_console_registry.yield_ownership(gui_id) + assert result is False + + widget._unique_id = unique_id # Restore for cleanup + + +def test_registry_owner_is_visible_page_info_none(): + """Test owner_is_visible returns False when page info doesn't exist.""" + result = _web_console_registry.owner_is_visible("nonexistent_page") + assert result is False + + +def test_registry_owner_is_visible_no_owner(console_widget_with_static_id): + """Test owner_is_visible returns False when page has no owner.""" + widget = console_widget_with_static_id + + # Yield ownership so there's no owner + widget.yield_ownership() + page_info = _web_console_registry.get_page_info(widget._unique_id) + assert page_info.owner_gui_id is None + + result = _web_console_registry.owner_is_visible(widget._unique_id) + assert result is False + + +def test_registry_owner_is_visible_owner_ref_none(console_widget_with_static_id): + """Test owner_is_visible returns False when owner ref doesn't exist in instances.""" + widget = console_widget_with_static_id + unique_id = widget._unique_id + + # Remove owner from instances dict + del _web_console_registry._instances[widget.gui_id] + + result = _web_console_registry.owner_is_visible(unique_id) + assert result is False + + +def test_registry_owner_is_visible_owner_instance_none(console_widget_with_static_id): + """Test owner_is_visible returns False when owner instance weakref is dead.""" + widget = console_widget_with_static_id + unique_id = widget._unique_id + gui_id = widget.gui_id + + # Simulate dead weakref + _web_console_registry._instances[gui_id] = lambda: None + + result = _web_console_registry.owner_is_visible(unique_id) + assert result is False + + +def test_registry_owner_is_visible_owner_visible(console_widget_with_static_id): + """Test owner_is_visible returns True when owner is visible.""" + widget = console_widget_with_static_id + widget.show() + + result = _web_console_registry.owner_is_visible(widget._unique_id) + assert result is True + + +def test_registry_owner_is_visible_owner_not_visible(console_widget_with_static_id): + """Test owner_is_visible returns False when owner is not visible.""" + widget = console_widget_with_static_id + widget.hide() + + result = _web_console_registry.owner_is_visible(widget._unique_id) + assert result is False