From 56b6a0b8c27e4f60417df15f87f420848d781e79 Mon Sep 17 00:00:00 2001 From: wakonig_k Date: Wed, 14 May 2025 13:31:07 +0200 Subject: [PATCH] feat: add web console --- bec_widgets/cli/client.py | 11 + .../web_console/register_web_console.py | 15 ++ .../editors/web_console/web_console.py | 230 ++++++++++++++++++ .../editors/web_console/web_console.pyproject | 1 + .../editors/web_console/web_console_plugin.py | 54 ++++ tests/end-2-end/test_rpc_widgets_e2e.py | 4 + tests/unit_tests/test_web_console.py | 90 +++++++ 7 files changed, 405 insertions(+) create mode 100644 bec_widgets/widgets/editors/web_console/register_web_console.py create mode 100644 bec_widgets/widgets/editors/web_console/web_console.py create mode 100644 bec_widgets/widgets/editors/web_console/web_console.pyproject create mode 100644 bec_widgets/widgets/editors/web_console/web_console_plugin.py create mode 100644 tests/unit_tests/test_web_console.py diff --git a/bec_widgets/cli/client.py b/bec_widgets/cli/client.py index ee73b059..57ef207a 100644 --- a/bec_widgets/cli/client.py +++ b/bec_widgets/cli/client.py @@ -55,6 +55,7 @@ _Widgets = { "TextBox": "TextBox", "VSCodeEditor": "VSCodeEditor", "Waveform": "Waveform", + "WebConsole": "WebConsole", "WebsiteWidget": "WebsiteWidget", } @@ -3501,6 +3502,16 @@ class Waveform(RPCBase): """ +class WebConsole(RPCBase): + """A simple widget to display a website""" + + @rpc_call + def remove(self): + """ + Cleanup the BECConnector + """ + + class WebsiteWidget(RPCBase): """A simple widget to display a website""" diff --git a/bec_widgets/widgets/editors/web_console/register_web_console.py b/bec_widgets/widgets/editors/web_console/register_web_console.py new file mode 100644 index 00000000..e814e0ca --- /dev/null +++ b/bec_widgets/widgets/editors/web_console/register_web_console.py @@ -0,0 +1,15 @@ +def main(): # pragma: no cover + from qtpy import PYSIDE6 + + if not PYSIDE6: + print("PYSIDE6 is not available in the environment. Cannot patch designer.") + return + from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection + + from bec_widgets.widgets.editors.web_console.web_console_plugin import WebConsolePlugin + + QPyDesignerCustomWidgetCollection.addCustomWidget(WebConsolePlugin()) + + +if __name__ == "__main__": # pragma: no cover + main() diff --git a/bec_widgets/widgets/editors/web_console/web_console.py b/bec_widgets/widgets/editors/web_console/web_console.py new file mode 100644 index 00000000..e84c35a2 --- /dev/null +++ b/bec_widgets/widgets/editors/web_console/web_console.py @@ -0,0 +1,230 @@ +from __future__ import annotations + +import secrets +import subprocess +import time + +from bec_lib.logger import bec_logger +from louie.saferef import safe_ref +from qtpy.QtCore import QUrl, qInstallMessageHandler +from qtpy.QtWebEngineWidgets import QWebEnginePage, QWebEngineView +from qtpy.QtWidgets import QApplication, QVBoxLayout, QWidget + +from bec_widgets.utils.bec_widget import BECWidget + +logger = bec_logger.logger + + +class WebConsoleRegistry: + """ + A registry for the WebConsole class to manage its instances. + """ + + def __init__(self): + """ + Initialize the registry. + """ + self._instances = {} + self._server_process = None + self._server_port = None + self._token = secrets.token_hex(16) + + def register(self, instance: WebConsole): + """ + Register an instance of WebConsole. + """ + self._instances[instance.gui_id] = safe_ref(instance) + self.cleanup() + + if self._server_process is None: + # Start the ttyd server if not already running + self.start_ttyd() + + def start_ttyd(self, use_zsh: bool | None = None): + """ + Start the ttyd server + ttyd -q -W -t 'theme={"background": "black"}' zsh + + Args: + use_zsh (bool): Whether to use zsh or bash. If None, it will try to detect if zsh is available. + """ + + # First, check if ttyd is installed + try: + subprocess.run(["ttyd", "--version"], check=True, stdout=subprocess.PIPE) + except FileNotFoundError: + # pylint: disable=raise-missing-from + raise RuntimeError("ttyd is not installed. Please install it first.") + + if use_zsh is None: + # Check if we can use zsh + try: + subprocess.run(["zsh", "--version"], check=True, stdout=subprocess.PIPE) + use_zsh = True + except FileNotFoundError: + use_zsh = False + + command = [ + "ttyd", + "-p", + "0", + "-W", + "-t", + 'theme={"background": "black"}', + "-c", + f"user:{self._token}", + ] + if use_zsh: + command.append("zsh") + else: + command.append("bash") + + # Start the ttyd server + self._server_process = subprocess.Popen( + command, stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) + + self._wait_for_server_port() + + self._server_process.stdout.close() + self._server_process.stderr.close() + + def _wait_for_server_port(self, timeout: float = 10): + """ + Wait for the ttyd server to start and get the port number. + + Args: + timeout (float): The timeout in seconds to wait for the server to start. + """ + start_time = time.time() + while True: + output = self._server_process.stderr.readline() + if output == b"" and self._server_process.poll() is not None: + break + if not output: + continue + + output = output.decode("utf-8").strip() + if "Listening on" in output: + # Extract the port number from the output + self._server_port = int(output.split(":")[-1]) + logger.info(f"ttyd server started on port {self._server_port}") + break + if time.time() - start_time > timeout: + raise TimeoutError( + "Timeout waiting for ttyd server to start. Please check if ttyd is installed and available in your PATH." + ) + + def cleanup(self): + """ + Clean up the registry by removing any instances that are no longer valid. + """ + for gui_id, weak_ref in list(self._instances.items()): + if weak_ref() is None: + del self._instances[gui_id] + + if not self._instances and self._server_process: + # If no instances are left, terminate the server process + self._server_process.terminate() + self._server_process = None + self._server_port = None + logger.info("ttyd server terminated") + + def unregister(self, instance: WebConsole): + """ + Unregister an instance of WebConsole. + + Args: + instance (WebConsole): The instance to unregister. + """ + if instance.gui_id in self._instances: + del self._instances[instance.gui_id] + + self.cleanup() + + +_web_console_registry = WebConsoleRegistry() + + +def suppress_qt_messages(type_, context, msg): + if context.category in ["js", "default"]: + return + print(msg) + + +qInstallMessageHandler(suppress_qt_messages) + + +class BECWebEnginePage(QWebEnginePage): + def javaScriptConsoleMessage(self, level, message, lineNumber, sourceID): + logger.info(f"[JS Console] {level.name} at line {lineNumber} in {sourceID}: {message}") + + +class WebConsole(BECWidget, QWidget): + """ + A simple widget to display a website + """ + + PLUGIN = True + ICON_NAME = "terminal" + + def __init__(self, parent=None, config=None, client=None, gui_id=None, **kwargs): + super().__init__(parent=parent, client=client, gui_id=gui_id, config=config, **kwargs) + _web_console_registry.register(self) + self._token = _web_console_registry._token + layout = QVBoxLayout() + layout.setContentsMargins(0, 0, 0, 0) + self.browser = QWebEngineView(self) + self.page = BECWebEnginePage(self) + self.page.authenticationRequired.connect(self._authenticate) + self.browser.setPage(self.page) + layout.addWidget(self.browser) + self.setLayout(layout) + self.page.setUrl(QUrl(f"http://localhost:{_web_console_registry._server_port}")) + + def write(self, data: str, send_return: bool = True): + """ + Send data to the web page + """ + self.page.runJavaScript(f"window.term.paste('{data}');") + if send_return: + self.send_return() + + def _authenticate(self, _, auth): + """ + Authenticate the request with the provided username and password. + """ + auth.setUser("user") + auth.setPassword(self._token) + + def send_return(self): + """ + Send return to the web page + """ + self.page.runJavaScript( + "document.querySelector('textarea.xterm-helper-textarea').dispatchEvent(new KeyboardEvent('keypress', {charCode: 13}))" + ) + + def send_ctrl_c(self): + """ + Send Ctrl+C to the web page + """ + self.page.runJavaScript( + "document.querySelector('textarea.xterm-helper-textarea').dispatchEvent(new KeyboardEvent('keypress', {charCode: 3}))" + ) + + def cleanup(self): + """ + Clean up the registry by removing any instances that are no longer valid. + """ + _web_console_registry.unregister(self) + super().cleanup() + + +if __name__ == "__main__": # pragma: no cover + import sys + + app = QApplication(sys.argv) + widget = WebConsole() + widget.show() + sys.exit(app.exec_()) diff --git a/bec_widgets/widgets/editors/web_console/web_console.pyproject b/bec_widgets/widgets/editors/web_console/web_console.pyproject new file mode 100644 index 00000000..786a751f --- /dev/null +++ b/bec_widgets/widgets/editors/web_console/web_console.pyproject @@ -0,0 +1 @@ +{'files': ['web_console.py']} \ No newline at end of file diff --git a/bec_widgets/widgets/editors/web_console/web_console_plugin.py b/bec_widgets/widgets/editors/web_console/web_console_plugin.py new file mode 100644 index 00000000..572ad87e --- /dev/null +++ b/bec_widgets/widgets/editors/web_console/web_console_plugin.py @@ -0,0 +1,54 @@ +# Copyright (C) 2022 The Qt Company Ltd. +# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause + +from qtpy.QtDesigner import QDesignerCustomWidgetInterface + +from bec_widgets.utils.bec_designer import designer_material_icon +from bec_widgets.widgets.editors.web_console.web_console import WebConsole + +DOM_XML = """ + + + + +""" + + +class WebConsolePlugin(QDesignerCustomWidgetInterface): # pragma: no cover + def __init__(self): + super().__init__() + self._form_editor = None + + def createWidget(self, parent): + t = WebConsole(parent) + return t + + def domXml(self): + return DOM_XML + + def group(self): + return "BEC Console" + + def icon(self): + return designer_material_icon(WebConsole.ICON_NAME) + + def includeFile(self): + return "web_console" + + def initialize(self, form_editor): + self._form_editor = form_editor + + def isContainer(self): + return False + + def isInitialized(self): + return self._form_editor is not None + + def name(self): + return "WebConsole" + + def toolTip(self): + return "" + + def whatsThis(self): + return self.toolTip() diff --git a/tests/end-2-end/test_rpc_widgets_e2e.py b/tests/end-2-end/test_rpc_widgets_e2e.py index 4ed1fcb2..e980c50e 100644 --- a/tests/end-2-end/test_rpc_widgets_e2e.py +++ b/tests/end-2-end/test_rpc_widgets_e2e.py @@ -96,6 +96,10 @@ def test_available_widgets(qtbot, connected_client_gui_obj): if object_name == "VSCodeEditor": continue + # Skip WebConsole as ttyd is not installed + if object_name == "WebConsole": + continue + ############################# ######### Add widget ######## ############################# diff --git a/tests/unit_tests/test_web_console.py b/tests/unit_tests/test_web_console.py new file mode 100644 index 00000000..2e5fee2c --- /dev/null +++ b/tests/unit_tests/test_web_console.py @@ -0,0 +1,90 @@ +from unittest import mock + +import pytest +from qtpy.QtNetwork import QAuthenticator + +from bec_widgets.widgets.editors.web_console.web_console import WebConsole, _web_console_registry + +from .client_mocks import mocked_client + + +@pytest.fixture +def console_widget(qtbot, mocked_client): + with mock.patch( + "bec_widgets.widgets.editors.web_console.web_console.subprocess" + ) as mock_subprocess: + with mock.patch.object(_web_console_registry, "_wait_for_server_port"): + _web_console_registry._server_port = 12345 + # Create the WebConsole widget + widget = WebConsole(client=mocked_client) + qtbot.addWidget(widget) + qtbot.waitExposed(widget) + yield widget + + +def test_web_console_widget_initialization(console_widget): + assert ( + console_widget.page.url().toString() + == f"http://localhost:{_web_console_registry._server_port}" + ) + + +def test_web_console_write(console_widget): + # Test the write method + with mock.patch.object(console_widget.page, "runJavaScript") as mock_run_js: + console_widget.write("Hello, World!") + + assert mock.call("window.term.paste('Hello, World!');") in mock_run_js.mock_calls + + +def test_web_console_write_no_return(console_widget): + # Test the write method with send_return=False + with mock.patch.object(console_widget.page, "runJavaScript") as mock_run_js: + console_widget.write("Hello, World!", send_return=False) + + assert mock.call("window.term.paste('Hello, World!');") in mock_run_js.mock_calls + assert mock_run_js.call_count == 1 + + +def test_web_console_send_return(console_widget): + # Test the send_return method + with mock.patch.object(console_widget.page, "runJavaScript") as mock_run_js: + console_widget.send_return() + + script = mock_run_js.call_args[0][0] + assert "new KeyboardEvent('keypress', {charCode: 13})" in script + assert mock_run_js.call_count == 1 + + +def test_web_console_send_ctrl_c(console_widget): + # Test the send_ctrl_c method + with mock.patch.object(console_widget.page, "runJavaScript") as mock_run_js: + console_widget.send_ctrl_c() + + script = mock_run_js.call_args[0][0] + assert "new KeyboardEvent('keypress', {charCode: 3})" in script + assert mock_run_js.call_count == 1 + + +def test_web_console_authenticate(console_widget): + # Test the _authenticate method + token = _web_console_registry._token + mock_auth = mock.MagicMock(spec=QAuthenticator) + console_widget._authenticate(None, mock_auth) + mock_auth.setUser.assert_called_once_with("user") + mock_auth.setPassword.assert_called_once_with(token) + + +def test_web_console_registry_wait_for_server_port(): + # Test the _wait_for_server_port method + with mock.patch.object(_web_console_registry, "_server_process") as mock_subprocess: + mock_subprocess.stderr.readline.side_effect = [b"Starting", b"Listening on port: 12345"] + _web_console_registry._wait_for_server_port() + assert _web_console_registry._server_port == 12345 + + +def test_web_console_registry_wait_for_server_port_timeout(): + # Test the _wait_for_server_port method with timeout + with mock.patch.object(_web_console_registry, "_server_process") as mock_subprocess: + with pytest.raises(TimeoutError): + _web_console_registry._wait_for_server_port(timeout=0.1)