mirror of
https://github.com/bec-project/bec_widgets.git
synced 2026-05-03 05:14:25 +02:00
wip
This commit is contained in:
@@ -16,9 +16,9 @@ from bec_widgets.utils.toolbars.toolbar import ModularToolBar
|
||||
from bec_widgets.widgets.containers.dock_area.basic_dock_area import DockAreaWidget
|
||||
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea
|
||||
from bec_widgets.widgets.containers.qt_ads import CDockWidget
|
||||
from bec_widgets.widgets.editors.bec_console.bec_console import BecConsole, BECShell
|
||||
from bec_widgets.widgets.editors.monaco.monaco_dock import MonacoDock
|
||||
from bec_widgets.widgets.editors.monaco.monaco_widget import MonacoWidget
|
||||
from bec_widgets.widgets.editors.web_console.web_console import BECShell, WebConsole
|
||||
from bec_widgets.widgets.utility.ide_explorer.ide_explorer import IDEExplorer
|
||||
|
||||
|
||||
@@ -96,7 +96,7 @@ class DeveloperWidget(DockAreaWidget):
|
||||
|
||||
self.console = BECShell(self, rpc_exposed=False)
|
||||
self.console.setObjectName("BEC Shell")
|
||||
self.terminal = WebConsole(self, rpc_exposed=False)
|
||||
self.terminal = BecConsole(self, rpc_exposed=False)
|
||||
self.terminal.setObjectName("Terminal")
|
||||
self.monaco = MonacoDock(self, rpc_exposed=False, rpc_passthrough_children=False)
|
||||
self.monaco.setObjectName("MonacoEditor")
|
||||
@@ -410,23 +410,3 @@ class DeveloperWidget(DockAreaWidget):
|
||||
"""Clean up resources used by the developer widget."""
|
||||
self.delete_all()
|
||||
return super().cleanup()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
|
||||
from bec_qthemes import apply_theme
|
||||
from qtpy.QtWidgets import QApplication
|
||||
|
||||
from bec_widgets.applications.main_app import BECMainApp
|
||||
|
||||
app = QApplication(sys.argv)
|
||||
apply_theme("dark")
|
||||
|
||||
_app = BECMainApp()
|
||||
_app.show()
|
||||
# developer_view.show()
|
||||
# developer_view.setWindowTitle("Developer View")
|
||||
# developer_view.resize(1920, 1080)
|
||||
# developer_view.set_stretch(horizontal=[1, 3, 2], vertical=[5, 5]) #can be set during runtime
|
||||
sys.exit(app.exec_())
|
||||
|
||||
@@ -56,7 +56,7 @@ _Widgets = {
|
||||
"SignalLabel": "SignalLabel",
|
||||
"TextBox": "TextBox",
|
||||
"Waveform": "Waveform",
|
||||
"WebConsole": "WebConsole",
|
||||
"BecConsole": "BecConsole",
|
||||
"WebsiteWidget": "WebsiteWidget",
|
||||
}
|
||||
|
||||
@@ -506,7 +506,7 @@ class BECQueue(RPCBase):
|
||||
|
||||
|
||||
class BECShell(RPCBase):
|
||||
"""A WebConsole pre-configured to run the BEC shell."""
|
||||
"""A BecConsole pre-configured to run the BEC shell."""
|
||||
|
||||
@rpc_call
|
||||
def remove(self):
|
||||
@@ -6417,7 +6417,7 @@ class WaveformViewPopup(RPCBase):
|
||||
"""
|
||||
|
||||
|
||||
class WebConsole(RPCBase):
|
||||
class BecConsole(RPCBase):
|
||||
"""A simple widget to display a website"""
|
||||
|
||||
@rpc_call
|
||||
|
||||
@@ -69,7 +69,7 @@ from bec_widgets.widgets.containers.main_window.main_window import BECMainWindow
|
||||
from bec_widgets.widgets.containers.qt_ads import CDockWidget
|
||||
from bec_widgets.widgets.control.device_control.positioner_box import PositionerBox, PositionerBox2D
|
||||
from bec_widgets.widgets.control.scan_control import ScanControl
|
||||
from bec_widgets.widgets.editors.web_console.web_console import BECShell, WebConsole
|
||||
from bec_widgets.widgets.editors.bec_console.bec_console import BecConsole, BECShell
|
||||
from bec_widgets.widgets.plots.heatmap.heatmap import Heatmap
|
||||
from bec_widgets.widgets.plots.image.image import Image
|
||||
from bec_widgets.widgets.plots.motor_map.motor_map import MotorMap
|
||||
@@ -372,7 +372,7 @@ class BECDockArea(DockAreaWidget):
|
||||
"Add Circular ProgressBar",
|
||||
"RingProgressBar",
|
||||
),
|
||||
"terminal": (WebConsole.ICON_NAME, "Add Terminal", "WebConsole"),
|
||||
"terminal": (BecConsole.ICON_NAME, "Add Terminal", "BecConsole"),
|
||||
"bec_shell": (BECShell.ICON_NAME, "Add BEC Shell", "BECShell"),
|
||||
"log_panel": (LogPanel.ICON_NAME, "Add LogPanel - Disabled", "LogPanel"),
|
||||
"sbb_monitor": ("train", "Add SBB Monitor", "SBBMonitor"),
|
||||
|
||||
@@ -0,0 +1,537 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import enum
|
||||
import json
|
||||
import secrets
|
||||
import subprocess
|
||||
import time
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from louie.saferef import safe_ref
|
||||
from pydantic import BaseModel
|
||||
from qtpy.QtCore import Qt, QTimer, Signal
|
||||
from qtpy.QtGui import QMouseEvent, QResizeEvent
|
||||
from qtpy.QtWidgets import QApplication, QLabel, QTabWidget, QVBoxLayout, QWidget
|
||||
|
||||
from bec_widgets.utils.bec_widget import BECWidget
|
||||
from bec_widgets.widgets.utility.bec_term.wtermwidget_wrapper import BecTerminal
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class ConsoleMode(str, enum.Enum):
|
||||
ACTIVE = "active"
|
||||
INACTIVE = "inactive"
|
||||
HIDDEN = "hidden"
|
||||
|
||||
|
||||
class InstanceOwnerInfo(BaseModel):
|
||||
owner_gui_id: str | None = None
|
||||
widget_ids: list[str] = []
|
||||
instance: BecConsole | None = None
|
||||
initialized: bool = False
|
||||
|
||||
model_config = {"arbitrary_types_allowed": True}
|
||||
|
||||
|
||||
class BecConsoleRegistry:
|
||||
"""
|
||||
A registry for the BecConsole class to manage its instances.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
"""
|
||||
Initialize the registry.
|
||||
"""
|
||||
self._instances = {}
|
||||
self._server_process = None
|
||||
self._server_port = None
|
||||
self._token = secrets.token_hex(16)
|
||||
self._instance_registry: dict[str, InstanceOwnerInfo] = {}
|
||||
|
||||
def register(self, instance: BecConsole):
|
||||
"""
|
||||
Register an instance of BecConsole.
|
||||
|
||||
Args:
|
||||
instance (BecConsole): The instance to register.
|
||||
"""
|
||||
self._instances[instance.gui_id] = safe_ref(instance)
|
||||
self.cleanup()
|
||||
|
||||
if instance._unique_id:
|
||||
self._register_instance(instance)
|
||||
|
||||
def cleanup(self):
|
||||
"""
|
||||
Clean up the registry by removing any instances that are no longer valid.
|
||||
"""
|
||||
for gui_id, weak_ref in list(self._instances.items()):
|
||||
if weak_ref() is None:
|
||||
del self._instances[gui_id]
|
||||
|
||||
def unregister(self, instance: BecConsole):
|
||||
"""
|
||||
Unregister an instance of BecConsole.
|
||||
|
||||
Args:
|
||||
instance (BecConsole): The instance to unregister.
|
||||
"""
|
||||
if instance.gui_id in self._instances:
|
||||
del self._instances[instance.gui_id]
|
||||
|
||||
if instance._unique_id:
|
||||
self._unregister_instance(instance._unique_id, instance.gui_id)
|
||||
|
||||
self.cleanup()
|
||||
|
||||
def _register_instance(self, instance: BecConsole):
|
||||
"""
|
||||
Register a instance in the registry. Please note that this does not transfer ownership
|
||||
for already existing instances; it simply records which widget currently owns the instance.
|
||||
Use transfer_instance_ownership to change ownership.
|
||||
|
||||
Args:
|
||||
instance (BecConsole): The instance to register.
|
||||
"""
|
||||
|
||||
unique_id = instance._unique_id
|
||||
gui_id = instance.gui_id
|
||||
|
||||
if unique_id is None:
|
||||
return
|
||||
|
||||
if unique_id not in self._instance_registry:
|
||||
instance = BecConsole()
|
||||
self._instance_registry[unique_id] = InstanceOwnerInfo(
|
||||
owner_gui_id=gui_id, widget_ids=[gui_id], instance=instance
|
||||
)
|
||||
logger.info(f"Registered new instance {unique_id} for {gui_id}")
|
||||
return
|
||||
|
||||
if gui_id not in self._instance_registry[unique_id].widget_ids:
|
||||
self._instance_registry[unique_id].widget_ids.append(gui_id)
|
||||
|
||||
def _unregister_instance(self, unique_id: str, gui_id: str):
|
||||
"""
|
||||
Unregister a instance from the registry.
|
||||
|
||||
Args:
|
||||
unique_id (str): The unique identifier for the instance.
|
||||
gui_id (str): The GUI ID of the widget.
|
||||
"""
|
||||
if unique_id not in self._instance_registry:
|
||||
return
|
||||
instance_info = self._instance_registry[unique_id]
|
||||
if gui_id in instance_info.widget_ids:
|
||||
instance_info.widget_ids.remove(gui_id)
|
||||
if instance_info.owner_gui_id == gui_id:
|
||||
instance_info.owner_gui_id = None
|
||||
if not instance_info.widget_ids:
|
||||
if instance_info.instance:
|
||||
instance_info.instance.deleteLater()
|
||||
del self._instance_registry[unique_id]
|
||||
|
||||
logger.info(f"Unregistered instance {unique_id} for {gui_id}")
|
||||
|
||||
def get_instance_info(self, unique_id: str) -> InstanceOwnerInfo | None:
|
||||
"""
|
||||
Get a instance from the registry.
|
||||
|
||||
Args:
|
||||
unique_id (str): The unique identifier for the instance.
|
||||
|
||||
Returns:
|
||||
InstanceOwnerInfo | None: The instance info if found, None otherwise.
|
||||
"""
|
||||
if unique_id not in self._instance_registry:
|
||||
return None
|
||||
return self._instance_registry[unique_id]
|
||||
|
||||
def take_instance_ownership(self, unique_id: str, new_owner_gui_id: str) -> BecConsole | None:
|
||||
"""
|
||||
Transfer ownership of a instance to a new owner.
|
||||
|
||||
Args:
|
||||
unique_id (str): The unique identifier for the instance.
|
||||
new_owner_gui_id (str): The GUI ID of the new owner.
|
||||
|
||||
Returns:
|
||||
BecTerminal | None: The instance if ownership transfer was successful, None otherwise.
|
||||
"""
|
||||
if unique_id not in self._instance_registry:
|
||||
logger.warning(f"Instance {unique_id} not found in registry")
|
||||
return None
|
||||
|
||||
instance_info = self._instance_registry[unique_id]
|
||||
old_owner_gui_id = instance_info.owner_gui_id
|
||||
if old_owner_gui_id:
|
||||
old_owner_ref = self._instances.get(old_owner_gui_id)
|
||||
if old_owner_ref:
|
||||
old_owner_instance = old_owner_ref()
|
||||
if old_owner_instance:
|
||||
old_owner_instance.yield_ownership()
|
||||
instance_info.owner_gui_id = new_owner_gui_id
|
||||
|
||||
logger.info(f"Transferred ownership of instance {unique_id} to {new_owner_gui_id}")
|
||||
return instance_info.instance
|
||||
|
||||
def yield_ownership(self, gui_id: str) -> bool:
|
||||
"""
|
||||
Yield ownership of a instance without destroying it. The instance remains in the
|
||||
registry with no owner, available for another widget to claim.
|
||||
|
||||
Args:
|
||||
gui_id (str): The GUI ID of the widget yielding ownership.
|
||||
|
||||
Returns:
|
||||
bool: True if ownership was yielded, False otherwise.
|
||||
"""
|
||||
if gui_id not in self._instances:
|
||||
return False
|
||||
|
||||
instance = self._instances[gui_id]()
|
||||
if instance is None:
|
||||
return False
|
||||
|
||||
unique_id = instance._unique_id
|
||||
if unique_id is None:
|
||||
return False
|
||||
|
||||
if unique_id not in self._instance_registry:
|
||||
return False
|
||||
|
||||
instance_owner_info = self._instance_registry[unique_id]
|
||||
if instance_owner_info.owner_gui_id != gui_id:
|
||||
return False
|
||||
|
||||
instance_owner_info.owner_gui_id = None
|
||||
return True
|
||||
|
||||
def owner_is_visible(self, unique_id: str) -> bool:
|
||||
"""
|
||||
Check if the owner of a instance is currently visible.
|
||||
|
||||
Args:
|
||||
unique_id (str): The unique identifier for the instance.
|
||||
Returns:
|
||||
bool: True if the owner is visible, False otherwise.
|
||||
"""
|
||||
instance_info = self.get_instance_info(unique_id)
|
||||
if instance_info is None or instance_info.owner_gui_id is None:
|
||||
return False
|
||||
|
||||
owner_ref = self._instances.get(instance_info.owner_gui_id)
|
||||
if owner_ref is None:
|
||||
return False
|
||||
|
||||
owner_instance = owner_ref()
|
||||
if owner_instance is None:
|
||||
return False
|
||||
|
||||
return owner_instance.isVisible()
|
||||
|
||||
|
||||
_bec_console_registry = BecConsoleRegistry()
|
||||
|
||||
|
||||
class BecConsole(BECWidget, QWidget):
|
||||
"""
|
||||
A simple widget to display a website
|
||||
"""
|
||||
|
||||
_js_callback = Signal(bool)
|
||||
initialized = Signal()
|
||||
|
||||
PLUGIN = True
|
||||
ICON_NAME = "terminal"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
parent=None,
|
||||
config=None,
|
||||
client=None,
|
||||
gui_id=None,
|
||||
startup_cmd: str | None = None,
|
||||
is_bec_shell: bool = False,
|
||||
unique_id: str | None = None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(parent=parent, client=client, gui_id=gui_id, config=config, **kwargs)
|
||||
self._mode = ConsoleMode.INACTIVE
|
||||
self._is_bec_shell = is_bec_shell
|
||||
self._startup_cmd = startup_cmd
|
||||
self._is_initialized = False
|
||||
self._unique_id = unique_id
|
||||
self.instance = None # Will be set in _set_up_instance
|
||||
|
||||
self._set_up_instance()
|
||||
|
||||
def _set_up_instance(self):
|
||||
"""
|
||||
Set up the web instance and UI elements.
|
||||
"""
|
||||
layout = QVBoxLayout()
|
||||
layout.setContentsMargins(0, 0, 0, 0)
|
||||
self.term = BecTerminal(self)
|
||||
|
||||
layout.addWidget(self.term)
|
||||
self.setLayout(layout)
|
||||
|
||||
# prepare overlay
|
||||
self.overlay = QWidget(self)
|
||||
layout = QVBoxLayout(self.overlay)
|
||||
layout.setAlignment(Qt.AlignmentFlag.AlignCenter)
|
||||
label = QLabel("Click to activate terminal", self.overlay)
|
||||
layout.addWidget(label)
|
||||
self.overlay.hide()
|
||||
|
||||
_bec_console_registry.register(self)
|
||||
self._token = _bec_console_registry._token
|
||||
|
||||
# If no unique_id is provided, create a new instance
|
||||
if not self._unique_id:
|
||||
self._set_mode(ConsoleMode.ACTIVE)
|
||||
return
|
||||
|
||||
# Try to get the instance from the registry
|
||||
if instance := _bec_console_registry.get_instance_info(self._unique_id):
|
||||
if instance.owner_gui_id != self.gui_id:
|
||||
self._set_mode(ConsoleMode.INACTIVE)
|
||||
else:
|
||||
self._set_mode(ConsoleMode.ACTIVE)
|
||||
|
||||
def _set_mode(self, mode: ConsoleMode):
|
||||
"""
|
||||
Set the mode of the web console.
|
||||
|
||||
Args:
|
||||
mode (ConsoleMode): The mode to set.
|
||||
"""
|
||||
if not self._unique_id:
|
||||
# For non-unique_id consoles, always active
|
||||
mode = ConsoleMode.ACTIVE
|
||||
|
||||
self._mode = mode
|
||||
match mode:
|
||||
case ConsoleMode.ACTIVE:
|
||||
self.term.setVisible(True)
|
||||
self.overlay.hide()
|
||||
case ConsoleMode.INACTIVE:
|
||||
self.term.setVisible(False)
|
||||
self.overlay.show()
|
||||
case ConsoleMode.HIDDEN:
|
||||
self.term.setVisible(False)
|
||||
self.overlay.hide()
|
||||
|
||||
@property
|
||||
def startup_cmd(self):
|
||||
"""
|
||||
Get the startup command for the web console.
|
||||
"""
|
||||
if self._is_bec_shell:
|
||||
if self.bec_dispatcher.cli_server is None:
|
||||
return "bec --nogui"
|
||||
return f"bec --gui-id {self.bec_dispatcher.cli_server.gui_id}"
|
||||
return self._startup_cmd
|
||||
|
||||
@startup_cmd.setter
|
||||
def startup_cmd(self, cmd: str):
|
||||
"""
|
||||
Set the startup command for the console.
|
||||
"""
|
||||
if not isinstance(cmd, str):
|
||||
raise ValueError("Startup command must be a string.")
|
||||
self._startup_cmd = cmd
|
||||
|
||||
# def write(self, data: str, send_return: bool = True):
|
||||
# """
|
||||
# Send data to the console
|
||||
|
||||
# Args:
|
||||
# data (str): The data to send.
|
||||
# send_return (bool): Whether to send a return after the data.
|
||||
# """
|
||||
# cmd = f"window.term.paste({json.dumps(data)});"
|
||||
# if self.instance is None:
|
||||
# logger.warning("Cannot write to console: instance is not initialized.")
|
||||
# return
|
||||
# self.instance.runJavaScript(cmd)
|
||||
# if send_return:
|
||||
# self.send_return()
|
||||
|
||||
def take_instance_ownership(self, unique_id: str | None = None):
|
||||
"""
|
||||
Take ownership of a web instance from the registry. This will transfer the instance
|
||||
from its current owner (if any) to this widget.
|
||||
|
||||
Args:
|
||||
unique_id (str): The unique identifier of the instance to take ownership of.
|
||||
If None, uses this widget's unique_id.
|
||||
"""
|
||||
if unique_id is None:
|
||||
unique_id = self._unique_id
|
||||
|
||||
if not unique_id:
|
||||
logger.warning("Cannot take instance ownership without a unique_id")
|
||||
return
|
||||
|
||||
# Get the instance from registry
|
||||
instance = _bec_console_registry.take_instance_ownership(unique_id, self.gui_id)
|
||||
|
||||
if not instance:
|
||||
logger.warning(f"Instance {unique_id} not found in registry")
|
||||
return
|
||||
|
||||
self.instance = instance
|
||||
self._set_mode(ConsoleMode.ACTIVE)
|
||||
logger.info(f"Widget {self.gui_id} took ownership of instance {unique_id}")
|
||||
|
||||
def _on_ownership_lost(self):
|
||||
"""
|
||||
Called when this widget loses ownership of its instance.
|
||||
Displays the overlay and hides the browser.
|
||||
"""
|
||||
self._set_mode(ConsoleMode.INACTIVE)
|
||||
logger.info(f"Widget {self.gui_id} lost ownership of instance {self._unique_id}")
|
||||
|
||||
def yield_ownership(self):
|
||||
"""
|
||||
Yield ownership of the instance. The instance remains in the registry with no owner,
|
||||
available for another widget to claim. This is automatically called when the
|
||||
widget becomes hidden.
|
||||
"""
|
||||
if not self._unique_id:
|
||||
return
|
||||
success = _bec_console_registry.yield_ownership(self.gui_id)
|
||||
if success:
|
||||
self._on_ownership_lost()
|
||||
logger.info(f"Widget {self.gui_id} yielded ownership of instance {self._unique_id}")
|
||||
|
||||
def has_ownership(self) -> bool:
|
||||
"""
|
||||
Check if this widget currently has ownership of a instance.
|
||||
|
||||
Returns:
|
||||
bool: True if this widget owns a instance, False otherwise.
|
||||
"""
|
||||
if not self._unique_id:
|
||||
return False
|
||||
instance_info = _bec_console_registry.get_instance_info(self._unique_id)
|
||||
if instance_info is None:
|
||||
return False
|
||||
return instance_info.owner_gui_id == self.gui_id
|
||||
|
||||
def hideEvent(self, event):
|
||||
"""
|
||||
Called when the widget is hidden. Automatically yields ownership.
|
||||
"""
|
||||
if self.has_ownership():
|
||||
self.yield_ownership()
|
||||
self._set_mode(ConsoleMode.HIDDEN)
|
||||
super().hideEvent(event)
|
||||
|
||||
def showEvent(self, event):
|
||||
"""
|
||||
Called when the widget is shown. Updates UI state based on ownership.
|
||||
"""
|
||||
super().showEvent(event)
|
||||
if self._unique_id and not self.has_ownership():
|
||||
# Take ownership if the instance does not have an owner or
|
||||
# the owner is not visible
|
||||
instance_info = _bec_console_registry.get_instance_info(self._unique_id)
|
||||
if instance_info is None:
|
||||
self._set_mode(ConsoleMode.INACTIVE)
|
||||
return
|
||||
if instance_info.owner_gui_id is None or not _bec_console_registry.owner_is_visible(
|
||||
self._unique_id
|
||||
):
|
||||
self.take_instance_ownership(self._unique_id)
|
||||
return
|
||||
if instance_info.owner_gui_id != self.gui_id:
|
||||
self._set_mode(ConsoleMode.INACTIVE)
|
||||
return
|
||||
|
||||
def resizeEvent(self, event: QResizeEvent) -> None:
|
||||
super().resizeEvent(event)
|
||||
self.overlay.resize(event.size())
|
||||
|
||||
def mousePressEvent(self, event: QMouseEvent) -> None:
|
||||
if event.button() == Qt.MouseButton.LeftButton and not self.has_ownership():
|
||||
self.take_instance_ownership(self._unique_id)
|
||||
event.accept()
|
||||
return
|
||||
return super().mousePressEvent(event)
|
||||
|
||||
def _authenticate(self, _, auth):
|
||||
"""
|
||||
Authenticate the request with the provided username and password.
|
||||
"""
|
||||
auth.setUser("user")
|
||||
auth.setPassword(self._token)
|
||||
|
||||
def set_readonly(self, readonly: bool):
|
||||
"""
|
||||
Set the web console to read-only mode.
|
||||
"""
|
||||
if not isinstance(readonly, bool):
|
||||
raise ValueError("Readonly must be a boolean.")
|
||||
self.setEnabled(not readonly)
|
||||
|
||||
def cleanup(self):
|
||||
"""
|
||||
Clean up the registry by removing any instances that are no longer valid.
|
||||
"""
|
||||
_bec_console_registry.unregister(self)
|
||||
super().cleanup()
|
||||
|
||||
|
||||
class BECShell(BecConsole):
|
||||
"""
|
||||
A BecConsole pre-configured to run the BEC shell.
|
||||
We cannot simply expose the web console properties to Qt as we need to have a deterministic
|
||||
startup behavior for sharing the same shell instance across multiple widgets.
|
||||
"""
|
||||
|
||||
ICON_NAME = "hub"
|
||||
|
||||
def __init__(self, parent=None, config=None, client=None, gui_id=None, **kwargs):
|
||||
super().__init__(
|
||||
parent=parent,
|
||||
config=config,
|
||||
client=client,
|
||||
gui_id=gui_id,
|
||||
is_bec_shell=True,
|
||||
unique_id="bec_shell",
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
import sys
|
||||
|
||||
app = QApplication(sys.argv)
|
||||
widget = QTabWidget()
|
||||
|
||||
# Create two consoles with different unique_ids
|
||||
bec_console1 = BecConsole(startup_cmd="bec --nogui", unique_id="console1")
|
||||
bec_console2 = BecConsole(startup_cmd="htop")
|
||||
bec_console3 = BecConsole(startup_cmd="bec --nogui", unique_id="console1")
|
||||
widget.addTab(bec_console1, "Console 1")
|
||||
widget.addTab(bec_console2, "Console 2")
|
||||
widget.addTab(bec_console3, "Console 3 -- mirror of Console 1")
|
||||
widget.show()
|
||||
|
||||
# Demonstrate instance sharing:
|
||||
# After initialization, bec_console2 can take ownership of console1's instance:
|
||||
# bec_console2.take_instance_ownership("console1")
|
||||
|
||||
widget.resize(800, 600)
|
||||
|
||||
def _close_cons1():
|
||||
bec_console2.close()
|
||||
bec_console2.deleteLater()
|
||||
|
||||
# QTimer.singleShot(3000, _close_cons1)
|
||||
|
||||
sys.exit(app.exec_())
|
||||
@@ -1 +0,0 @@
|
||||
{'files': ['web_console.py']}
|
||||
@@ -1,57 +0,0 @@
|
||||
# Copyright (C) 2022 The Qt Company Ltd.
|
||||
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
|
||||
|
||||
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
|
||||
from qtpy.QtWidgets import QWidget
|
||||
|
||||
from bec_widgets.utils.bec_designer import designer_material_icon
|
||||
from bec_widgets.widgets.editors.web_console.web_console import BECShell
|
||||
|
||||
DOM_XML = """
|
||||
<ui language='c++'>
|
||||
<widget class='BECShell' name='bec_shell'>
|
||||
</widget>
|
||||
</ui>
|
||||
"""
|
||||
|
||||
|
||||
class BECShellPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._form_editor = None
|
||||
|
||||
def createWidget(self, parent):
|
||||
if parent is None:
|
||||
return QWidget()
|
||||
t = BECShell(parent)
|
||||
return t
|
||||
|
||||
def domXml(self):
|
||||
return DOM_XML
|
||||
|
||||
def group(self):
|
||||
return ""
|
||||
|
||||
def icon(self):
|
||||
return designer_material_icon(BECShell.ICON_NAME)
|
||||
|
||||
def includeFile(self):
|
||||
return "bec_shell"
|
||||
|
||||
def initialize(self, form_editor):
|
||||
self._form_editor = form_editor
|
||||
|
||||
def isContainer(self):
|
||||
return False
|
||||
|
||||
def isInitialized(self):
|
||||
return self._form_editor is not None
|
||||
|
||||
def name(self):
|
||||
return "BECShell"
|
||||
|
||||
def toolTip(self):
|
||||
return ""
|
||||
|
||||
def whatsThis(self):
|
||||
return self.toolTip()
|
||||
@@ -1,15 +0,0 @@
|
||||
def main(): # pragma: no cover
|
||||
from qtpy import PYSIDE6
|
||||
|
||||
if not PYSIDE6:
|
||||
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
|
||||
return
|
||||
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
|
||||
|
||||
from bec_widgets.widgets.editors.web_console.bec_shell_plugin import BECShellPlugin
|
||||
|
||||
QPyDesignerCustomWidgetCollection.addCustomWidget(BECShellPlugin())
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
main()
|
||||
@@ -1,15 +0,0 @@
|
||||
def main(): # pragma: no cover
|
||||
from qtpy import PYSIDE6
|
||||
|
||||
if not PYSIDE6:
|
||||
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
|
||||
return
|
||||
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
|
||||
|
||||
from bec_widgets.widgets.editors.web_console.web_console_plugin import WebConsolePlugin
|
||||
|
||||
QPyDesignerCustomWidgetCollection.addCustomWidget(WebConsolePlugin())
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
main()
|
||||
@@ -1,705 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import enum
|
||||
import json
|
||||
import secrets
|
||||
import subprocess
|
||||
import time
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from louie.saferef import safe_ref
|
||||
from pydantic import BaseModel
|
||||
from qtpy.QtCore import Qt, QTimer, QUrl, Signal, qInstallMessageHandler
|
||||
from qtpy.QtGui import QMouseEvent, QResizeEvent
|
||||
from qtpy.QtWebEngineWidgets import QWebEnginePage, QWebEngineView
|
||||
from qtpy.QtWidgets import QApplication, QLabel, QTabWidget, QVBoxLayout, QWidget
|
||||
|
||||
from bec_widgets.utils.bec_widget import BECWidget
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class ConsoleMode(str, enum.Enum):
|
||||
ACTIVE = "active"
|
||||
INACTIVE = "inactive"
|
||||
HIDDEN = "hidden"
|
||||
|
||||
|
||||
class PageOwnerInfo(BaseModel):
|
||||
owner_gui_id: str | None = None
|
||||
widget_ids: list[str] = []
|
||||
page: QWebEnginePage | None = None
|
||||
initialized: bool = False
|
||||
|
||||
model_config = {"arbitrary_types_allowed": True}
|
||||
|
||||
|
||||
class WebConsoleRegistry:
|
||||
"""
|
||||
A registry for the WebConsole class to manage its instances.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
"""
|
||||
Initialize the registry.
|
||||
"""
|
||||
self._instances = {}
|
||||
self._server_process = None
|
||||
self._server_port = None
|
||||
self._token = secrets.token_hex(16)
|
||||
self._page_registry: dict[str, PageOwnerInfo] = {}
|
||||
|
||||
def register(self, instance: WebConsole):
|
||||
"""
|
||||
Register an instance of WebConsole.
|
||||
|
||||
Args:
|
||||
instance (WebConsole): The instance to register.
|
||||
"""
|
||||
self._instances[instance.gui_id] = safe_ref(instance)
|
||||
self.cleanup()
|
||||
|
||||
if instance._unique_id:
|
||||
self._register_page(instance)
|
||||
|
||||
if self._server_process is None:
|
||||
# Start the ttyd server if not already running
|
||||
self.start_ttyd()
|
||||
|
||||
def start_ttyd(self, use_zsh: bool | None = None):
|
||||
"""
|
||||
Start the ttyd server
|
||||
ttyd -q -W -t 'theme={"background": "black"}' zsh
|
||||
|
||||
Args:
|
||||
use_zsh (bool): Whether to use zsh or bash. If None, it will try to detect if zsh is available.
|
||||
"""
|
||||
|
||||
# First, check if ttyd is installed
|
||||
try:
|
||||
subprocess.run(["ttyd", "--version"], check=True, stdout=subprocess.PIPE)
|
||||
except FileNotFoundError:
|
||||
# pylint: disable=raise-missing-from
|
||||
raise RuntimeError("ttyd is not installed. Please install it first.")
|
||||
|
||||
if use_zsh is None:
|
||||
# Check if we can use zsh
|
||||
try:
|
||||
subprocess.run(["zsh", "--version"], check=True, stdout=subprocess.PIPE)
|
||||
use_zsh = True
|
||||
except FileNotFoundError:
|
||||
use_zsh = False
|
||||
|
||||
command = [
|
||||
"ttyd",
|
||||
"-p",
|
||||
"0",
|
||||
"-W",
|
||||
"-t",
|
||||
'theme={"background": "black"}',
|
||||
"-c",
|
||||
f"user:{self._token}",
|
||||
]
|
||||
if use_zsh:
|
||||
command.append("zsh")
|
||||
else:
|
||||
command.append("bash")
|
||||
|
||||
# Start the ttyd server
|
||||
self._server_process = subprocess.Popen(
|
||||
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
|
||||
)
|
||||
|
||||
self._wait_for_server_port()
|
||||
|
||||
self._server_process.stdout.close()
|
||||
self._server_process.stderr.close()
|
||||
|
||||
def _wait_for_server_port(self, timeout: float = 10):
|
||||
"""
|
||||
Wait for the ttyd server to start and get the port number.
|
||||
|
||||
Args:
|
||||
timeout (float): The timeout in seconds to wait for the server to start.
|
||||
"""
|
||||
start_time = time.time()
|
||||
while True:
|
||||
output = self._server_process.stderr.readline()
|
||||
if output == b"" and self._server_process.poll() is not None:
|
||||
break
|
||||
if not output:
|
||||
continue
|
||||
|
||||
output = output.decode("utf-8").strip()
|
||||
if "Listening on" in output:
|
||||
# Extract the port number from the output
|
||||
self._server_port = int(output.split(":")[-1])
|
||||
logger.info(f"ttyd server started on port {self._server_port}")
|
||||
break
|
||||
if time.time() - start_time > timeout:
|
||||
raise TimeoutError(
|
||||
"Timeout waiting for ttyd server to start. Please check if ttyd is installed and available in your PATH."
|
||||
)
|
||||
|
||||
def cleanup(self):
|
||||
"""
|
||||
Clean up the registry by removing any instances that are no longer valid.
|
||||
"""
|
||||
for gui_id, weak_ref in list(self._instances.items()):
|
||||
if weak_ref() is None:
|
||||
del self._instances[gui_id]
|
||||
|
||||
if not self._instances and self._server_process:
|
||||
# If no instances are left, terminate the server process
|
||||
self._server_process.terminate()
|
||||
self._server_process = None
|
||||
self._server_port = None
|
||||
logger.info("ttyd server terminated")
|
||||
|
||||
def unregister(self, instance: WebConsole):
|
||||
"""
|
||||
Unregister an instance of WebConsole.
|
||||
|
||||
Args:
|
||||
instance (WebConsole): The instance to unregister.
|
||||
"""
|
||||
if instance.gui_id in self._instances:
|
||||
del self._instances[instance.gui_id]
|
||||
|
||||
if instance._unique_id:
|
||||
self._unregister_page(instance._unique_id, instance.gui_id)
|
||||
|
||||
self.cleanup()
|
||||
|
||||
def _register_page(self, instance: WebConsole):
|
||||
"""
|
||||
Register a page in the registry. Please note that this does not transfer ownership
|
||||
for already existing pages; it simply records which widget currently owns the page.
|
||||
Use transfer_page_ownership to change ownership.
|
||||
|
||||
Args:
|
||||
instance (WebConsole): The instance to register.
|
||||
"""
|
||||
|
||||
unique_id = instance._unique_id
|
||||
gui_id = instance.gui_id
|
||||
|
||||
if unique_id is None:
|
||||
return
|
||||
|
||||
if unique_id not in self._page_registry:
|
||||
page = BECWebEnginePage()
|
||||
page.authenticationRequired.connect(instance._authenticate)
|
||||
self._page_registry[unique_id] = PageOwnerInfo(
|
||||
owner_gui_id=gui_id, widget_ids=[gui_id], page=page
|
||||
)
|
||||
logger.info(f"Registered new page {unique_id} for {gui_id}")
|
||||
return
|
||||
|
||||
if gui_id not in self._page_registry[unique_id].widget_ids:
|
||||
self._page_registry[unique_id].widget_ids.append(gui_id)
|
||||
|
||||
def _unregister_page(self, unique_id: str, gui_id: str):
|
||||
"""
|
||||
Unregister a page from the registry.
|
||||
|
||||
Args:
|
||||
unique_id (str): The unique identifier for the page.
|
||||
gui_id (str): The GUI ID of the widget.
|
||||
"""
|
||||
if unique_id not in self._page_registry:
|
||||
return
|
||||
page_info = self._page_registry[unique_id]
|
||||
if gui_id in page_info.widget_ids:
|
||||
page_info.widget_ids.remove(gui_id)
|
||||
if page_info.owner_gui_id == gui_id:
|
||||
page_info.owner_gui_id = None
|
||||
if not page_info.widget_ids:
|
||||
if page_info.page:
|
||||
page_info.page.deleteLater()
|
||||
del self._page_registry[unique_id]
|
||||
|
||||
logger.info(f"Unregistered page {unique_id} for {gui_id}")
|
||||
|
||||
def get_page_info(self, unique_id: str) -> PageOwnerInfo | None:
|
||||
"""
|
||||
Get a page from the registry.
|
||||
|
||||
Args:
|
||||
unique_id (str): The unique identifier for the page.
|
||||
|
||||
Returns:
|
||||
PageOwnerInfo | None: The page info if found, None otherwise.
|
||||
"""
|
||||
if unique_id not in self._page_registry:
|
||||
return None
|
||||
return self._page_registry[unique_id]
|
||||
|
||||
def take_page_ownership(self, unique_id: str, new_owner_gui_id: str) -> QWebEnginePage | None:
|
||||
"""
|
||||
Transfer ownership of a page to a new owner.
|
||||
|
||||
Args:
|
||||
unique_id (str): The unique identifier for the page.
|
||||
new_owner_gui_id (str): The GUI ID of the new owner.
|
||||
|
||||
Returns:
|
||||
QWebEnginePage | None: The page if ownership transfer was successful, None otherwise.
|
||||
"""
|
||||
if unique_id not in self._page_registry:
|
||||
logger.warning(f"Page {unique_id} not found in registry")
|
||||
return None
|
||||
|
||||
page_info = self._page_registry[unique_id]
|
||||
old_owner_gui_id = page_info.owner_gui_id
|
||||
if old_owner_gui_id:
|
||||
old_owner_ref = self._instances.get(old_owner_gui_id)
|
||||
if old_owner_ref:
|
||||
old_owner_instance = old_owner_ref()
|
||||
if old_owner_instance:
|
||||
old_owner_instance.yield_ownership()
|
||||
page_info.owner_gui_id = new_owner_gui_id
|
||||
|
||||
logger.info(f"Transferred ownership of page {unique_id} to {new_owner_gui_id}")
|
||||
return page_info.page
|
||||
|
||||
def yield_ownership(self, gui_id: str) -> bool:
|
||||
"""
|
||||
Yield ownership of a page without destroying it. The page remains in the
|
||||
registry with no owner, available for another widget to claim.
|
||||
|
||||
Args:
|
||||
gui_id (str): The GUI ID of the widget yielding ownership.
|
||||
|
||||
Returns:
|
||||
bool: True if ownership was yielded, False otherwise.
|
||||
"""
|
||||
if gui_id not in self._instances:
|
||||
return False
|
||||
|
||||
instance = self._instances[gui_id]()
|
||||
if instance is None:
|
||||
return False
|
||||
|
||||
unique_id = instance._unique_id
|
||||
if unique_id is None:
|
||||
return False
|
||||
|
||||
if unique_id not in self._page_registry:
|
||||
return False
|
||||
|
||||
page_owner_info = self._page_registry[unique_id]
|
||||
if page_owner_info.owner_gui_id != gui_id:
|
||||
return False
|
||||
|
||||
page_owner_info.owner_gui_id = None
|
||||
return True
|
||||
|
||||
def owner_is_visible(self, unique_id: str) -> bool:
|
||||
"""
|
||||
Check if the owner of a page is currently visible.
|
||||
|
||||
Args:
|
||||
unique_id (str): The unique identifier for the page.
|
||||
Returns:
|
||||
bool: True if the owner is visible, False otherwise.
|
||||
"""
|
||||
page_info = self.get_page_info(unique_id)
|
||||
if page_info is None or page_info.owner_gui_id is None:
|
||||
return False
|
||||
|
||||
owner_ref = self._instances.get(page_info.owner_gui_id)
|
||||
if owner_ref is None:
|
||||
return False
|
||||
|
||||
owner_instance = owner_ref()
|
||||
if owner_instance is None:
|
||||
return False
|
||||
|
||||
return owner_instance.isVisible()
|
||||
|
||||
|
||||
_web_console_registry = WebConsoleRegistry()
|
||||
|
||||
|
||||
def suppress_qt_messages(type_, context, msg):
|
||||
if context.category in ["js", "default"]:
|
||||
return
|
||||
print(msg)
|
||||
|
||||
|
||||
qInstallMessageHandler(suppress_qt_messages)
|
||||
|
||||
|
||||
class BECWebEnginePage(QWebEnginePage):
|
||||
def javaScriptConsoleMessage(self, level, message, lineNumber, sourceID):
|
||||
logger.info(f"[JS Console] {level.name} at line {lineNumber} in {sourceID}: {message}")
|
||||
|
||||
|
||||
class WebConsole(BECWidget, QWidget):
|
||||
"""
|
||||
A simple widget to display a website
|
||||
"""
|
||||
|
||||
_js_callback = Signal(bool)
|
||||
initialized = Signal()
|
||||
|
||||
PLUGIN = True
|
||||
ICON_NAME = "terminal"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
parent=None,
|
||||
config=None,
|
||||
client=None,
|
||||
gui_id=None,
|
||||
startup_cmd: str | None = None,
|
||||
is_bec_shell: bool = False,
|
||||
unique_id: str | None = None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(parent=parent, client=client, gui_id=gui_id, config=config, **kwargs)
|
||||
self._mode = ConsoleMode.INACTIVE
|
||||
self._is_bec_shell = is_bec_shell
|
||||
self._startup_cmd = startup_cmd
|
||||
self._is_initialized = False
|
||||
self._unique_id = unique_id
|
||||
self.page = None # Will be set in _set_up_page
|
||||
|
||||
self._startup_timer = QTimer()
|
||||
self._startup_timer.setInterval(500)
|
||||
self._startup_timer.timeout.connect(self._check_page_ready)
|
||||
self._startup_timer.start()
|
||||
self._js_callback.connect(self._on_js_callback)
|
||||
|
||||
self._set_up_page()
|
||||
|
||||
def _set_up_page(self):
|
||||
"""
|
||||
Set up the web page and UI elements.
|
||||
"""
|
||||
layout = QVBoxLayout()
|
||||
layout.setContentsMargins(0, 0, 0, 0)
|
||||
self.browser = QWebEngineView(self)
|
||||
|
||||
layout.addWidget(self.browser)
|
||||
self.setLayout(layout)
|
||||
|
||||
# prepare overlay
|
||||
self.overlay = QWidget(self)
|
||||
layout = QVBoxLayout(self.overlay)
|
||||
layout.setAlignment(Qt.AlignmentFlag.AlignCenter)
|
||||
label = QLabel("Click to activate terminal", self.overlay)
|
||||
layout.addWidget(label)
|
||||
self.overlay.hide()
|
||||
|
||||
_web_console_registry.register(self)
|
||||
self._token = _web_console_registry._token
|
||||
|
||||
# If no unique_id is provided, create a new page
|
||||
if not self._unique_id:
|
||||
self.page = BECWebEnginePage(self)
|
||||
self.page.authenticationRequired.connect(self._authenticate)
|
||||
self.page.setUrl(QUrl(f"http://localhost:{_web_console_registry._server_port}"))
|
||||
self.browser.setPage(self.page)
|
||||
self._set_mode(ConsoleMode.ACTIVE)
|
||||
return
|
||||
|
||||
# Try to get the page from the registry
|
||||
page_info = _web_console_registry.get_page_info(self._unique_id)
|
||||
if page_info and page_info.page:
|
||||
self.page = page_info.page
|
||||
if not page_info.owner_gui_id or page_info.owner_gui_id == self.gui_id:
|
||||
self.browser.setPage(self.page)
|
||||
# Only set URL if this is a newly created page (no URL set yet)
|
||||
if self.page.url().isEmpty():
|
||||
self.page.setUrl(QUrl(f"http://localhost:{_web_console_registry._server_port}"))
|
||||
else:
|
||||
# We have an existing page, so we don't need the startup timer
|
||||
self._startup_timer.stop()
|
||||
if page_info.owner_gui_id != self.gui_id:
|
||||
self._set_mode(ConsoleMode.INACTIVE)
|
||||
else:
|
||||
self._set_mode(ConsoleMode.ACTIVE)
|
||||
|
||||
def _set_mode(self, mode: ConsoleMode):
|
||||
"""
|
||||
Set the mode of the web console.
|
||||
|
||||
Args:
|
||||
mode (ConsoleMode): The mode to set.
|
||||
"""
|
||||
if not self._unique_id:
|
||||
# For non-unique_id consoles, always active
|
||||
mode = ConsoleMode.ACTIVE
|
||||
|
||||
self._mode = mode
|
||||
match mode:
|
||||
case ConsoleMode.ACTIVE:
|
||||
self.browser.setVisible(True)
|
||||
self.overlay.hide()
|
||||
case ConsoleMode.INACTIVE:
|
||||
self.browser.setVisible(False)
|
||||
self.overlay.show()
|
||||
case ConsoleMode.HIDDEN:
|
||||
self.browser.setVisible(False)
|
||||
self.overlay.hide()
|
||||
|
||||
def _check_page_ready(self):
|
||||
"""
|
||||
Check if the page is ready and stop the timer if it is.
|
||||
"""
|
||||
if not self.page or self.page.isLoading():
|
||||
return
|
||||
|
||||
self.page.runJavaScript("window.term !== undefined", self._js_callback.emit)
|
||||
|
||||
def _on_js_callback(self, ready: bool):
|
||||
"""
|
||||
Callback for when the JavaScript is ready.
|
||||
"""
|
||||
if not ready:
|
||||
return
|
||||
self._is_initialized = True
|
||||
self._startup_timer.stop()
|
||||
if self.startup_cmd:
|
||||
if self._unique_id:
|
||||
page_info = _web_console_registry.get_page_info(self._unique_id)
|
||||
if page_info is None:
|
||||
return
|
||||
if not page_info.initialized:
|
||||
page_info.initialized = True
|
||||
self.write(self.startup_cmd)
|
||||
else:
|
||||
self.write(self.startup_cmd)
|
||||
self.initialized.emit()
|
||||
|
||||
@property
|
||||
def startup_cmd(self):
|
||||
"""
|
||||
Get the startup command for the web console.
|
||||
"""
|
||||
if self._is_bec_shell:
|
||||
if self.bec_dispatcher.cli_server is None:
|
||||
return "bec --nogui"
|
||||
return f"bec --gui-id {self.bec_dispatcher.cli_server.gui_id}"
|
||||
return self._startup_cmd
|
||||
|
||||
@startup_cmd.setter
|
||||
def startup_cmd(self, cmd: str):
|
||||
"""
|
||||
Set the startup command for the web console.
|
||||
"""
|
||||
if not isinstance(cmd, str):
|
||||
raise ValueError("Startup command must be a string.")
|
||||
self._startup_cmd = cmd
|
||||
|
||||
def write(self, data: str, send_return: bool = True):
|
||||
"""
|
||||
Send data to the web page
|
||||
|
||||
Args:
|
||||
data (str): The data to send.
|
||||
send_return (bool): Whether to send a return after the data.
|
||||
"""
|
||||
cmd = f"window.term.paste({json.dumps(data)});"
|
||||
if self.page is None:
|
||||
logger.warning("Cannot write to web console: page is not initialized.")
|
||||
return
|
||||
self.page.runJavaScript(cmd)
|
||||
if send_return:
|
||||
self.send_return()
|
||||
|
||||
def take_page_ownership(self, unique_id: str | None = None):
|
||||
"""
|
||||
Take ownership of a web page from the registry. This will transfer the page
|
||||
from its current owner (if any) to this widget.
|
||||
|
||||
Args:
|
||||
unique_id (str): The unique identifier of the page to take ownership of.
|
||||
If None, uses this widget's unique_id.
|
||||
"""
|
||||
if unique_id is None:
|
||||
unique_id = self._unique_id
|
||||
|
||||
if not unique_id:
|
||||
logger.warning("Cannot take page ownership without a unique_id")
|
||||
return
|
||||
|
||||
# Get the page from registry
|
||||
page = _web_console_registry.take_page_ownership(unique_id, self.gui_id)
|
||||
|
||||
if not page:
|
||||
logger.warning(f"Page {unique_id} not found in registry")
|
||||
return
|
||||
|
||||
self.page = page
|
||||
self.browser.setPage(page)
|
||||
self._set_mode(ConsoleMode.ACTIVE)
|
||||
logger.info(f"Widget {self.gui_id} took ownership of page {unique_id}")
|
||||
|
||||
def _on_ownership_lost(self):
|
||||
"""
|
||||
Called when this widget loses ownership of its page.
|
||||
Displays the overlay and hides the browser.
|
||||
"""
|
||||
self._set_mode(ConsoleMode.INACTIVE)
|
||||
logger.info(f"Widget {self.gui_id} lost ownership of page {self._unique_id}")
|
||||
|
||||
def yield_ownership(self):
|
||||
"""
|
||||
Yield ownership of the page. The page remains in the registry with no owner,
|
||||
available for another widget to claim. This is automatically called when the
|
||||
widget becomes hidden.
|
||||
"""
|
||||
if not self._unique_id:
|
||||
return
|
||||
success = _web_console_registry.yield_ownership(self.gui_id)
|
||||
if success:
|
||||
self._on_ownership_lost()
|
||||
logger.info(f"Widget {self.gui_id} yielded ownership of page {self._unique_id}")
|
||||
|
||||
def has_ownership(self) -> bool:
|
||||
"""
|
||||
Check if this widget currently has ownership of a page.
|
||||
|
||||
Returns:
|
||||
bool: True if this widget owns a page, False otherwise.
|
||||
"""
|
||||
if not self._unique_id:
|
||||
return False
|
||||
page_info = _web_console_registry.get_page_info(self._unique_id)
|
||||
if page_info is None:
|
||||
return False
|
||||
return page_info.owner_gui_id == self.gui_id
|
||||
|
||||
def hideEvent(self, event):
|
||||
"""
|
||||
Called when the widget is hidden. Automatically yields ownership.
|
||||
"""
|
||||
if self.has_ownership():
|
||||
self.yield_ownership()
|
||||
self._set_mode(ConsoleMode.HIDDEN)
|
||||
super().hideEvent(event)
|
||||
|
||||
def showEvent(self, event):
|
||||
"""
|
||||
Called when the widget is shown. Updates UI state based on ownership.
|
||||
"""
|
||||
super().showEvent(event)
|
||||
if self._unique_id and not self.has_ownership():
|
||||
# Take ownership if the page does not have an owner or
|
||||
# the owner is not visible
|
||||
page_info = _web_console_registry.get_page_info(self._unique_id)
|
||||
if page_info is None:
|
||||
self._set_mode(ConsoleMode.INACTIVE)
|
||||
return
|
||||
if page_info.owner_gui_id is None or not _web_console_registry.owner_is_visible(
|
||||
self._unique_id
|
||||
):
|
||||
self.take_page_ownership(self._unique_id)
|
||||
return
|
||||
if page_info.owner_gui_id != self.gui_id:
|
||||
self._set_mode(ConsoleMode.INACTIVE)
|
||||
return
|
||||
|
||||
def resizeEvent(self, event: QResizeEvent) -> None:
|
||||
super().resizeEvent(event)
|
||||
self.overlay.resize(event.size())
|
||||
|
||||
def mousePressEvent(self, event: QMouseEvent) -> None:
|
||||
if event.button() == Qt.MouseButton.LeftButton and not self.has_ownership():
|
||||
self.take_page_ownership(self._unique_id)
|
||||
event.accept()
|
||||
return
|
||||
return super().mousePressEvent(event)
|
||||
|
||||
def _authenticate(self, _, auth):
|
||||
"""
|
||||
Authenticate the request with the provided username and password.
|
||||
"""
|
||||
auth.setUser("user")
|
||||
auth.setPassword(self._token)
|
||||
|
||||
def send_return(self):
|
||||
"""
|
||||
Send return to the web page
|
||||
"""
|
||||
self.page.runJavaScript(
|
||||
"document.querySelector('textarea.xterm-helper-textarea').dispatchEvent(new KeyboardEvent('keypress', {charCode: 13}))"
|
||||
)
|
||||
|
||||
def send_ctrl_c(self):
|
||||
"""
|
||||
Send Ctrl+C to the web page
|
||||
"""
|
||||
self.page.runJavaScript(
|
||||
"document.querySelector('textarea.xterm-helper-textarea').dispatchEvent(new KeyboardEvent('keypress', {charCode: 3}))"
|
||||
)
|
||||
|
||||
def set_readonly(self, readonly: bool):
|
||||
"""
|
||||
Set the web console to read-only mode.
|
||||
"""
|
||||
if not isinstance(readonly, bool):
|
||||
raise ValueError("Readonly must be a boolean.")
|
||||
self.setEnabled(not readonly)
|
||||
|
||||
def cleanup(self):
|
||||
"""
|
||||
Clean up the registry by removing any instances that are no longer valid.
|
||||
"""
|
||||
self._startup_timer.stop()
|
||||
_web_console_registry.unregister(self)
|
||||
super().cleanup()
|
||||
|
||||
|
||||
class BECShell(WebConsole):
|
||||
"""
|
||||
A WebConsole pre-configured to run the BEC shell.
|
||||
We cannot simply expose the web console properties to Qt as we need to have a deterministic
|
||||
startup behavior for sharing the same shell instance across multiple widgets.
|
||||
"""
|
||||
|
||||
ICON_NAME = "hub"
|
||||
|
||||
def __init__(self, parent=None, config=None, client=None, gui_id=None, **kwargs):
|
||||
super().__init__(
|
||||
parent=parent,
|
||||
config=config,
|
||||
client=client,
|
||||
gui_id=gui_id,
|
||||
is_bec_shell=True,
|
||||
unique_id="bec_shell",
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
import sys
|
||||
|
||||
app = QApplication(sys.argv)
|
||||
widget = QTabWidget()
|
||||
|
||||
# Create two consoles with different unique_ids
|
||||
web_console1 = WebConsole(startup_cmd="bec --nogui", unique_id="console1")
|
||||
web_console2 = WebConsole(startup_cmd="htop")
|
||||
web_console3 = WebConsole(startup_cmd="bec --nogui", unique_id="console1")
|
||||
widget.addTab(web_console1, "Console 1")
|
||||
widget.addTab(web_console2, "Console 2")
|
||||
widget.addTab(web_console3, "Console 3 -- mirror of Console 1")
|
||||
widget.show()
|
||||
|
||||
# Demonstrate page sharing:
|
||||
# After initialization, web_console2 can take ownership of console1's page:
|
||||
# web_console2.take_page_ownership("console1")
|
||||
|
||||
widget.resize(800, 600)
|
||||
|
||||
def _close_cons1():
|
||||
web_console2.close()
|
||||
web_console2.deleteLater()
|
||||
|
||||
# QTimer.singleShot(3000, _close_cons1)
|
||||
|
||||
sys.exit(app.exec_())
|
||||
@@ -1 +0,0 @@
|
||||
{'files': ['web_console.py']}
|
||||
@@ -1,57 +0,0 @@
|
||||
# Copyright (C) 2022 The Qt Company Ltd.
|
||||
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
|
||||
|
||||
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
|
||||
from qtpy.QtWidgets import QWidget
|
||||
|
||||
from bec_widgets.utils.bec_designer import designer_material_icon
|
||||
from bec_widgets.widgets.editors.web_console.web_console import WebConsole
|
||||
|
||||
DOM_XML = """
|
||||
<ui language='c++'>
|
||||
<widget class='WebConsole' name='web_console'>
|
||||
</widget>
|
||||
</ui>
|
||||
"""
|
||||
|
||||
|
||||
class WebConsolePlugin(QDesignerCustomWidgetInterface): # pragma: no cover
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._form_editor = None
|
||||
|
||||
def createWidget(self, parent):
|
||||
if parent is None:
|
||||
return QWidget()
|
||||
t = WebConsole(parent)
|
||||
return t
|
||||
|
||||
def domXml(self):
|
||||
return DOM_XML
|
||||
|
||||
def group(self):
|
||||
return "BEC Developer"
|
||||
|
||||
def icon(self):
|
||||
return designer_material_icon(WebConsole.ICON_NAME)
|
||||
|
||||
def includeFile(self):
|
||||
return "web_console"
|
||||
|
||||
def initialize(self, form_editor):
|
||||
self._form_editor = form_editor
|
||||
|
||||
def isContainer(self):
|
||||
return False
|
||||
|
||||
def isInitialized(self):
|
||||
return self._form_editor is not None
|
||||
|
||||
def name(self):
|
||||
return "WebConsole"
|
||||
|
||||
def toolTip(self):
|
||||
return ""
|
||||
|
||||
def whatsThis(self):
|
||||
return self.toolTip()
|
||||
@@ -0,0 +1,11 @@
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
import sys
|
||||
|
||||
from pyside6_qtermwidget import QTermWidget # pylint: disable=ungrouped-imports
|
||||
from qtpy.QtWidgets import QApplication # pylint: disable=ungrouped-imports
|
||||
|
||||
app = QApplication(sys.argv)
|
||||
widget = QTermWidget()
|
||||
|
||||
widget.show()
|
||||
sys.exit(app.exec())
|
||||
@@ -0,0 +1,227 @@
|
||||
from functools import wraps
|
||||
from typing import Sequence
|
||||
|
||||
from PySide6.QtGui import QAction, QFont, QResizeEvent
|
||||
from qtpy.QtCore import QIODevice, QPoint, QSize, QUrl, Signal # type: ignore
|
||||
from qtpy.QtGui import QKeyEvent
|
||||
from qtpy.QtWidgets import QLabel, QVBoxLayout, QWidget
|
||||
|
||||
try:
|
||||
from pyside6_qtermwidget import QTermWidget
|
||||
except ImportError:
|
||||
QTermWidget = None
|
||||
|
||||
|
||||
def _forward(func):
|
||||
@wraps(func)
|
||||
def wrapper(self, *args, **kwargs):
|
||||
target = getattr(self, "_main_widget")
|
||||
method = getattr(target, func.__name__)
|
||||
if QTermWidget:
|
||||
return method(*args, **kwargs)
|
||||
else:
|
||||
...
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
class BecTerminal(QWidget):
|
||||
|
||||
activity = Signal()
|
||||
bell = Signal(str)
|
||||
copyAvailable = Signal(bool)
|
||||
currentDirectoryChanged = Signal(str)
|
||||
finished = Signal()
|
||||
profileChanged = Signal(str)
|
||||
receivedData = Signal(str)
|
||||
silence = Signal()
|
||||
termGetFocus = Signal()
|
||||
termKeyPressed = Signal(QKeyEvent)
|
||||
termLostFocus = Signal()
|
||||
titleChanged = Signal()
|
||||
urlActivated = Signal(QUrl, bool)
|
||||
|
||||
def __init__(self, /, parent: QWidget | None = None, **kwargs) -> None:
|
||||
super().__init__(parent)
|
||||
self._layout = QVBoxLayout()
|
||||
self.setLayout(self._layout)
|
||||
if QTermWidget:
|
||||
self._main_widget = QTermWidget(parent=self)
|
||||
self.activity.connect(self._main_widget.activity)
|
||||
self.bell.connect(self._main_widget.bell)
|
||||
self.copyAvailable.connect(self._main_widget.copyAvailable)
|
||||
self.currentDirectoryChanged.connect(self._main_widget.currentDirectoryChanged)
|
||||
self.finished.connect(self._main_widget.finished)
|
||||
self.profileChanged.connect(self._main_widget.profileChanged)
|
||||
self.receivedData.connect(self._main_widget.receivedData)
|
||||
self.silence.connect(self._main_widget.silence)
|
||||
self.termGetFocus.connect(self._main_widget.termGetFocus)
|
||||
self.termKeyPressed.connect(self._main_widget.termKeyPressed)
|
||||
self.termLostFocus.connect(self._main_widget.termLostFocus)
|
||||
self.titleChanged.connect(self._main_widget.titleChanged)
|
||||
self.urlActivated.connect(self._main_widget.urlActivated)
|
||||
else:
|
||||
self._main_widget = QLabel("pyside6_qterminal is not installed!")
|
||||
|
||||
self._layout.addWidget(self._main_widget)
|
||||
|
||||
@_forward
|
||||
def addCustomColorSchemeDir(self, custom_dir: str, /) -> None: ...
|
||||
@_forward
|
||||
def autoHideMouseAfter(self, delay: int, /) -> None: ...
|
||||
@_forward
|
||||
def availableColorSchemes(self) -> list[str]: ...
|
||||
@_forward
|
||||
def availableKeyBindings(self) -> list[str]: ...
|
||||
@_forward
|
||||
def bracketText(self, text: str, /) -> None: ...
|
||||
@_forward
|
||||
def bracketedPasteModeIsDisabled(self, /) -> bool: ...
|
||||
@_forward
|
||||
def changeDir(self, dir: str, /) -> None: ...
|
||||
@_forward
|
||||
def clear(self, /) -> None: ...
|
||||
@_forward
|
||||
def clearCustomKeyBindingsDir(self, /) -> None: ...
|
||||
@_forward
|
||||
def copyClipboard(self, /) -> None: ...
|
||||
@_forward
|
||||
def disableBracketedPasteMode(self, disable: bool, /) -> None: ...
|
||||
@_forward
|
||||
def filterActions(self, position: QPoint, /) -> list[QAction]: ...
|
||||
@_forward
|
||||
def flowControlEnabled(self, /) -> bool: ...
|
||||
@_forward
|
||||
def getAvailableColorSchemes(self, /) -> list[str]: ...
|
||||
@_forward
|
||||
def getForegroundProcessId(self, /) -> int: ...
|
||||
@_forward
|
||||
def getMargin(self, /) -> int: ...
|
||||
@_forward
|
||||
def getPtySlaveFd(self, /) -> int: ...
|
||||
@_forward
|
||||
def getSelectionEnd(self, row: int, column: int, /) -> None: ...
|
||||
@_forward
|
||||
def getSelectionStart(self, row: int, column: int, /) -> None: ...
|
||||
@_forward
|
||||
def getShellPID(self, /) -> int: ...
|
||||
@_forward
|
||||
def getTerminalFont(self, /) -> QFont: ...
|
||||
@_forward
|
||||
def historyLinesCount(self, /) -> int: ...
|
||||
@_forward
|
||||
def historySize(self, /) -> int: ...
|
||||
@_forward
|
||||
def icon(self, /) -> str: ...
|
||||
@_forward
|
||||
def isBidiEnabled(self, /) -> bool: ...
|
||||
@_forward
|
||||
def isTitleChanged(self, /) -> bool: ...
|
||||
@_forward
|
||||
def keyBindings(self, /) -> str: ...
|
||||
@_forward
|
||||
def pasteClipboard(self, /) -> None: ...
|
||||
@_forward
|
||||
def pasteSelection(self, /) -> None: ...
|
||||
@_forward
|
||||
def resizeEvent(self, arg__1: QResizeEvent, /) -> None: ...
|
||||
@_forward
|
||||
def saveHistory(self, device: QIODevice, /) -> None: ...
|
||||
@_forward
|
||||
def screenColumnsCount(self, /) -> int: ...
|
||||
@_forward
|
||||
def screenLinesCount(self, /) -> int: ...
|
||||
@_forward
|
||||
def scrollToEnd(self, /) -> None: ...
|
||||
@_forward
|
||||
def selectedText(self, /, preserveLineBreaks: bool = ...) -> str: ...
|
||||
@_forward
|
||||
def selectionChanged(self, textSelected: bool, /) -> None: ...
|
||||
@_forward
|
||||
def sendKeyEvent(self, e: QKeyEvent, /) -> None: ...
|
||||
@_forward
|
||||
def sendText(self, text: str, /) -> None: ...
|
||||
@_forward
|
||||
def sessionFinished(self, /) -> None: ...
|
||||
@_forward
|
||||
def setArgs(self, args: Sequence[str], /) -> None: ...
|
||||
@_forward
|
||||
def setAutoClose(self, arg__1: bool, /) -> None: ...
|
||||
@_forward
|
||||
def setBidiEnabled(self, enabled: bool, /) -> None: ...
|
||||
@_forward
|
||||
def setBlinkingCursor(self, blink: bool, /) -> None: ...
|
||||
@_forward
|
||||
def setBoldIntense(self, boldIntense: bool, /) -> None: ...
|
||||
@_forward
|
||||
def setColorScheme(self, name: str, /) -> None: ...
|
||||
@_forward
|
||||
def setConfirmMultilinePaste(self, confirmMultilinePaste: bool, /) -> None: ...
|
||||
@_forward
|
||||
def setCustomKeyBindingsDir(self, custom_dir: str, /) -> None: ...
|
||||
@_forward
|
||||
def setDrawLineChars(self, drawLineChars: bool, /) -> None: ...
|
||||
@_forward
|
||||
def setEnvironment(self, environment: Sequence[str], /) -> None: ...
|
||||
@_forward
|
||||
def setFlowControlEnabled(self, enabled: bool, /) -> None: ...
|
||||
@_forward
|
||||
def setFlowControlWarningEnabled(self, enabled: bool, /) -> None: ...
|
||||
@_forward
|
||||
def setHistorySize(self, lines: int, /) -> None: ...
|
||||
@_forward
|
||||
def setKeyBindings(self, kb: str, /) -> None: ...
|
||||
@_forward
|
||||
def setMargin(self, arg__1: int, /) -> None: ...
|
||||
@_forward
|
||||
def setMonitorActivity(self, arg__1: bool, /) -> None: ...
|
||||
@_forward
|
||||
def setMonitorSilence(self, arg__1: bool, /) -> None: ...
|
||||
@_forward
|
||||
def setMotionAfterPasting(self, arg__1: int, /) -> None: ...
|
||||
@_forward
|
||||
def setSelectionEnd(self, row: int, column: int, /) -> None: ...
|
||||
@_forward
|
||||
def setSelectionStart(self, row: int, column: int, /) -> None: ...
|
||||
@_forward
|
||||
def setShellProgram(self, program: str, /) -> None: ...
|
||||
@_forward
|
||||
def setSilenceTimeout(self, seconds: int, /) -> None: ...
|
||||
@_forward
|
||||
def setSize(self, arg__1: QSize, /) -> None: ...
|
||||
@_forward
|
||||
def setTerminalBackgroundImage(self, backgroundImage: str, /) -> None: ...
|
||||
@_forward
|
||||
def setTerminalBackgroundMode(self, mode: int, /) -> None: ...
|
||||
@_forward
|
||||
def setTerminalFont(self, font: QFont | str | Sequence[str], /) -> None: ...
|
||||
@_forward
|
||||
def setTerminalOpacity(self, level: float, /) -> None: ...
|
||||
@_forward
|
||||
def setTerminalSizeHint(self, enabled: bool, /) -> None: ...
|
||||
@_forward
|
||||
def setTrimPastedTrailingNewlines(self, trimPastedTrailingNewlines: bool, /) -> None: ...
|
||||
@_forward
|
||||
def setWordCharacters(self, chars: str, /) -> None: ...
|
||||
@_forward
|
||||
def setWorkingDirectory(self, dir: str, /) -> None: ...
|
||||
@_forward
|
||||
def sizeHint(self, /) -> QSize: ...
|
||||
@_forward
|
||||
def startShellProgram(self, /) -> None: ...
|
||||
@_forward
|
||||
def startTerminalTeletype(self, /) -> None: ...
|
||||
@_forward
|
||||
def terminalSizeHint(self, /) -> bool: ...
|
||||
@_forward
|
||||
def title(self, /) -> str: ...
|
||||
@_forward
|
||||
def toggleShowSearchBar(self, /) -> None: ...
|
||||
@_forward
|
||||
def wordCharacters(self, /) -> str: ...
|
||||
@_forward
|
||||
def workingDirectory(self, /) -> str: ...
|
||||
@_forward
|
||||
def zoomIn(self, /) -> None: ...
|
||||
@_forward
|
||||
def zoomOut(self, /) -> None: ...
|
||||
@@ -93,8 +93,8 @@ def test_available_widgets(qtbot, connected_client_gui_obj):
|
||||
if object_name == "BECShell":
|
||||
continue
|
||||
|
||||
# Skip WebConsole as ttyd is not installed
|
||||
if object_name == "WebConsole":
|
||||
# Skip BecConsole as ttyd is not installed
|
||||
if object_name == "BecConsole":
|
||||
continue
|
||||
|
||||
#############################
|
||||
|
||||
@@ -918,7 +918,7 @@ class TestToolbarFunctionality:
|
||||
action.trigger()
|
||||
if action_name == "terminal":
|
||||
mock_new.assert_called_once_with(
|
||||
widget="WebConsole", closable=True, startup_cmd=None
|
||||
widget="BecConsole", closable=True, startup_cmd=None
|
||||
)
|
||||
else:
|
||||
mock_new.assert_called_once_with(widget=widget_type)
|
||||
@@ -2272,7 +2272,7 @@ class TestFlatToolbarActions:
|
||||
"flat_queue": "BECQueue",
|
||||
"flat_status": "BECStatusBox",
|
||||
"flat_progress_bar": "RingProgressBar",
|
||||
"flat_terminal": "WebConsole",
|
||||
"flat_terminal": "BecConsole",
|
||||
"flat_bec_shell": "BECShell",
|
||||
"flat_sbb_monitor": "SBBMonitor",
|
||||
}
|
||||
|
||||
@@ -5,11 +5,11 @@ from qtpy.QtCore import Qt
|
||||
from qtpy.QtGui import QHideEvent
|
||||
from qtpy.QtNetwork import QAuthenticator
|
||||
|
||||
from bec_widgets.widgets.editors.web_console.web_console import (
|
||||
from bec_widgets.widgets.editors.bec_console.bec_console import (
|
||||
BecConsole,
|
||||
BECShell,
|
||||
ConsoleMode,
|
||||
WebConsole,
|
||||
_web_console_registry,
|
||||
_bec_console_registry,
|
||||
)
|
||||
|
||||
from .client_mocks import mocked_client
|
||||
@@ -19,19 +19,19 @@ from .client_mocks import mocked_client
|
||||
def mocked_server_startup():
|
||||
"""Mock the web console server startup process."""
|
||||
with mock.patch(
|
||||
"bec_widgets.widgets.editors.web_console.web_console.subprocess"
|
||||
"bec_widgets.widgets.editors.bec_console.bec_console.subprocess"
|
||||
) as mock_subprocess:
|
||||
with mock.patch.object(_web_console_registry, "_wait_for_server_port"):
|
||||
_web_console_registry._server_port = 12345
|
||||
with mock.patch.object(_bec_console_registry, "_wait_for_server_port"):
|
||||
_bec_console_registry._server_port = 12345
|
||||
yield mock_subprocess
|
||||
|
||||
|
||||
def static_console(qtbot, client, unique_id: str | None = None):
|
||||
"""Fixture to provide a static unique_id for WebConsole tests."""
|
||||
"""Fixture to provide a static unique_id for BecConsole tests."""
|
||||
if unique_id is None:
|
||||
widget = WebConsole(client=client)
|
||||
widget = BecConsole(client=client)
|
||||
else:
|
||||
widget = WebConsole(client=client, unique_id=unique_id)
|
||||
widget = BecConsole(client=client, unique_id=unique_id)
|
||||
qtbot.addWidget(widget)
|
||||
qtbot.waitExposed(widget)
|
||||
return widget
|
||||
@@ -39,7 +39,7 @@ def static_console(qtbot, client, unique_id: str | None = None):
|
||||
|
||||
@pytest.fixture
|
||||
def console_widget(qtbot, mocked_client, mocked_server_startup):
|
||||
"""Create a WebConsole widget with mocked server startup."""
|
||||
"""Create a BecConsole widget with mocked server startup."""
|
||||
yield static_console(qtbot, mocked_client)
|
||||
|
||||
|
||||
@@ -54,26 +54,26 @@ def bec_shell_widget(qtbot, mocked_client, mocked_server_startup):
|
||||
|
||||
@pytest.fixture
|
||||
def console_widget_with_static_id(qtbot, mocked_client, mocked_server_startup):
|
||||
"""Create a WebConsole widget with a static unique ID."""
|
||||
"""Create a BecConsole widget with a static unique ID."""
|
||||
yield static_console(qtbot, mocked_client, unique_id="test_console")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def two_console_widgets_same_id(qtbot, mocked_client, mocked_server_startup):
|
||||
"""Create two WebConsole widgets sharing the same unique ID."""
|
||||
"""Create two BecConsole widgets sharing the same unique ID."""
|
||||
widget1 = static_console(qtbot, mocked_client, unique_id="shared_console")
|
||||
widget2 = static_console(qtbot, mocked_client, unique_id="shared_console")
|
||||
yield widget1, widget2
|
||||
|
||||
|
||||
def test_web_console_widget_initialization(console_widget):
|
||||
def test_bec_console_widget_initialization(console_widget):
|
||||
assert (
|
||||
console_widget.page.url().toString()
|
||||
== f"http://localhost:{_web_console_registry._server_port}"
|
||||
== f"http://localhost:{_bec_console_registry._server_port}"
|
||||
)
|
||||
|
||||
|
||||
def test_web_console_write(console_widget):
|
||||
def test_bec_console_write(console_widget):
|
||||
# Test the write method
|
||||
with mock.patch.object(console_widget.page, "runJavaScript") as mock_run_js:
|
||||
console_widget.write("Hello, World!")
|
||||
@@ -81,7 +81,7 @@ def test_web_console_write(console_widget):
|
||||
assert mock.call('window.term.paste("Hello, World!");') in mock_run_js.mock_calls
|
||||
|
||||
|
||||
def test_web_console_write_no_return(console_widget):
|
||||
def test_bec_console_write_no_return(console_widget):
|
||||
# Test the write method with send_return=False
|
||||
with mock.patch.object(console_widget.page, "runJavaScript") as mock_run_js:
|
||||
console_widget.write("Hello, World!", send_return=False)
|
||||
@@ -90,7 +90,7 @@ def test_web_console_write_no_return(console_widget):
|
||||
assert mock_run_js.call_count == 1
|
||||
|
||||
|
||||
def test_web_console_send_return(console_widget):
|
||||
def test_bec_console_send_return(console_widget):
|
||||
# Test the send_return method
|
||||
with mock.patch.object(console_widget.page, "runJavaScript") as mock_run_js:
|
||||
console_widget.send_return()
|
||||
@@ -100,7 +100,7 @@ def test_web_console_send_return(console_widget):
|
||||
assert mock_run_js.call_count == 1
|
||||
|
||||
|
||||
def test_web_console_send_ctrl_c(console_widget):
|
||||
def test_bec_console_send_ctrl_c(console_widget):
|
||||
# Test the send_ctrl_c method
|
||||
with mock.patch.object(console_widget.page, "runJavaScript") as mock_run_js:
|
||||
console_widget.send_ctrl_c()
|
||||
@@ -110,31 +110,31 @@ def test_web_console_send_ctrl_c(console_widget):
|
||||
assert mock_run_js.call_count == 1
|
||||
|
||||
|
||||
def test_web_console_authenticate(console_widget):
|
||||
def test_bec_console_authenticate(console_widget):
|
||||
# Test the _authenticate method
|
||||
token = _web_console_registry._token
|
||||
token = _bec_console_registry._token
|
||||
mock_auth = mock.MagicMock(spec=QAuthenticator)
|
||||
console_widget._authenticate(None, mock_auth)
|
||||
mock_auth.setUser.assert_called_once_with("user")
|
||||
mock_auth.setPassword.assert_called_once_with(token)
|
||||
|
||||
|
||||
def test_web_console_registry_wait_for_server_port():
|
||||
def test_bec_console_registry_wait_for_server_port():
|
||||
# Test the _wait_for_server_port method
|
||||
with mock.patch.object(_web_console_registry, "_server_process") as mock_subprocess:
|
||||
with mock.patch.object(_bec_console_registry, "_server_process") as mock_subprocess:
|
||||
mock_subprocess.stderr.readline.side_effect = [b"Starting", b"Listening on port: 12345"]
|
||||
_web_console_registry._wait_for_server_port()
|
||||
assert _web_console_registry._server_port == 12345
|
||||
_bec_console_registry._wait_for_server_port()
|
||||
assert _bec_console_registry._server_port == 12345
|
||||
|
||||
|
||||
def test_web_console_registry_wait_for_server_port_timeout():
|
||||
def test_bec_console_registry_wait_for_server_port_timeout():
|
||||
# Test the _wait_for_server_port method with timeout
|
||||
with mock.patch.object(_web_console_registry, "_server_process") as mock_subprocess:
|
||||
with mock.patch.object(_bec_console_registry, "_server_process") as mock_subprocess:
|
||||
with pytest.raises(TimeoutError):
|
||||
_web_console_registry._wait_for_server_port(timeout=0.1)
|
||||
_bec_console_registry._wait_for_server_port(timeout=0.1)
|
||||
|
||||
|
||||
def test_web_console_startup_command_execution(console_widget, qtbot):
|
||||
def test_bec_console_startup_command_execution(console_widget, qtbot):
|
||||
"""Test that the startup command is triggered after successful initialization."""
|
||||
# Set a custom startup command
|
||||
console_widget.startup_cmd = "test startup command"
|
||||
@@ -196,7 +196,7 @@ def test_bec_shell_startup_contains_gui_id(bec_shell_widget):
|
||||
assert bec_shell.startup_cmd == "bec --gui-id test_gui_id"
|
||||
|
||||
|
||||
def test_web_console_set_readonly(console_widget):
|
||||
def test_bec_console_set_readonly(console_widget):
|
||||
# Test the set_readonly method
|
||||
console_widget.set_readonly(True)
|
||||
assert not console_widget.isEnabled()
|
||||
@@ -205,31 +205,31 @@ def test_web_console_set_readonly(console_widget):
|
||||
assert console_widget.isEnabled()
|
||||
|
||||
|
||||
def test_web_console_with_unique_id(console_widget_with_static_id):
|
||||
"""Test creating a WebConsole with a unique_id."""
|
||||
def test_bec_console_with_unique_id(console_widget_with_static_id):
|
||||
"""Test creating a BecConsole with a unique_id."""
|
||||
widget = console_widget_with_static_id
|
||||
|
||||
assert widget._unique_id == "test_console"
|
||||
assert widget._unique_id in _web_console_registry._page_registry
|
||||
page_info = _web_console_registry.get_page_info("test_console")
|
||||
assert widget._unique_id in _bec_console_registry._page_registry
|
||||
page_info = _bec_console_registry.get_page_info("test_console")
|
||||
assert page_info is not None
|
||||
assert page_info.owner_gui_id == widget.gui_id
|
||||
assert widget.gui_id in page_info.widget_ids
|
||||
|
||||
|
||||
def test_web_console_page_sharing(two_console_widgets_same_id):
|
||||
def test_bec_console_page_sharing(two_console_widgets_same_id):
|
||||
"""Test that two widgets can share the same page using unique_id."""
|
||||
widget1, widget2 = two_console_widgets_same_id
|
||||
|
||||
# Both should reference the same page in the registry
|
||||
page_info = _web_console_registry.get_page_info("shared_console")
|
||||
page_info = _bec_console_registry.get_page_info("shared_console")
|
||||
assert page_info is not None
|
||||
assert widget1.gui_id in page_info.widget_ids
|
||||
assert widget2.gui_id in page_info.widget_ids
|
||||
assert widget1.page == widget2.page
|
||||
|
||||
|
||||
def test_web_console_has_ownership(console_widget_with_static_id):
|
||||
def test_bec_console_has_ownership(console_widget_with_static_id):
|
||||
"""Test the has_ownership method."""
|
||||
widget = console_widget_with_static_id
|
||||
|
||||
@@ -237,7 +237,7 @@ def test_web_console_has_ownership(console_widget_with_static_id):
|
||||
assert widget.has_ownership()
|
||||
|
||||
|
||||
def test_web_console_yield_ownership(console_widget_with_static_id):
|
||||
def test_bec_console_yield_ownership(console_widget_with_static_id):
|
||||
"""Test yielding ownership of a page."""
|
||||
widget = console_widget_with_static_id
|
||||
|
||||
@@ -248,13 +248,13 @@ def test_web_console_yield_ownership(console_widget_with_static_id):
|
||||
|
||||
# Widget should no longer have ownership
|
||||
assert not widget.has_ownership()
|
||||
page_info = _web_console_registry.get_page_info("test_console")
|
||||
page_info = _bec_console_registry.get_page_info("test_console")
|
||||
assert page_info.owner_gui_id is None
|
||||
# Overlay should be shown
|
||||
assert widget._mode == ConsoleMode.INACTIVE
|
||||
|
||||
|
||||
def test_web_console_take_page_ownership(two_console_widgets_same_id):
|
||||
def test_bec_console_take_page_ownership(two_console_widgets_same_id):
|
||||
"""Test taking ownership of a page."""
|
||||
widget1, widget2 = two_console_widgets_same_id
|
||||
|
||||
@@ -273,7 +273,7 @@ def test_web_console_take_page_ownership(two_console_widgets_same_id):
|
||||
assert widget1._mode == ConsoleMode.INACTIVE
|
||||
|
||||
|
||||
def test_web_console_hide_event_yields_ownership(qtbot, console_widget_with_static_id):
|
||||
def test_bec_console_hide_event_yields_ownership(qtbot, console_widget_with_static_id):
|
||||
"""Test that hideEvent yields ownership."""
|
||||
widget = console_widget_with_static_id
|
||||
|
||||
@@ -287,11 +287,11 @@ def test_web_console_hide_event_yields_ownership(qtbot, console_widget_with_stat
|
||||
|
||||
# Widget should have yielded ownership
|
||||
assert not widget.has_ownership()
|
||||
page_info = _web_console_registry.get_page_info("test_console")
|
||||
page_info = _bec_console_registry.get_page_info("test_console")
|
||||
assert page_info.owner_gui_id is None
|
||||
|
||||
|
||||
def test_web_console_show_event_takes_ownership(console_widget_with_static_id):
|
||||
def test_bec_console_show_event_takes_ownership(console_widget_with_static_id):
|
||||
"""Test that showEvent takes ownership when page has no owner."""
|
||||
widget = console_widget_with_static_id
|
||||
|
||||
@@ -308,7 +308,7 @@ def test_web_console_show_event_takes_ownership(console_widget_with_static_id):
|
||||
assert not widget.overlay.isVisible()
|
||||
|
||||
|
||||
def test_web_console_mouse_press_takes_ownership(qtbot, two_console_widgets_same_id):
|
||||
def test_bec_console_mouse_press_takes_ownership(qtbot, two_console_widgets_same_id):
|
||||
"""Test that clicking on overlay takes ownership."""
|
||||
widget1, widget2 = two_console_widgets_same_id
|
||||
widget1.show()
|
||||
@@ -328,20 +328,20 @@ def test_web_console_mouse_press_takes_ownership(qtbot, two_console_widgets_same
|
||||
assert not widget1.has_ownership()
|
||||
|
||||
|
||||
def test_web_console_registry_cleanup_removes_page(console_widget_with_static_id):
|
||||
def test_bec_console_registry_cleanup_removes_page(console_widget_with_static_id):
|
||||
"""Test that the registry cleans up pages when all widgets are removed."""
|
||||
widget = console_widget_with_static_id
|
||||
|
||||
assert widget._unique_id in _web_console_registry._page_registry
|
||||
assert widget._unique_id in _bec_console_registry._page_registry
|
||||
|
||||
# Cleanup the widget
|
||||
widget.cleanup()
|
||||
|
||||
# Page should be removed from registry
|
||||
assert widget._unique_id not in _web_console_registry._page_registry
|
||||
assert widget._unique_id not in _bec_console_registry._page_registry
|
||||
|
||||
|
||||
def test_web_console_without_unique_id_no_page_sharing(console_widget):
|
||||
def test_bec_console_without_unique_id_no_page_sharing(console_widget):
|
||||
"""Test that widgets without unique_id don't participate in page sharing."""
|
||||
widget = console_widget
|
||||
|
||||
@@ -350,20 +350,20 @@ def test_web_console_without_unique_id_no_page_sharing(console_widget):
|
||||
assert not widget.has_ownership() # Should return False for non-unique widgets
|
||||
|
||||
|
||||
def test_web_console_registry_get_page_info_nonexistent(qtbot, mocked_client):
|
||||
def test_bec_console_registry_get_page_info_nonexistent(qtbot, mocked_client):
|
||||
"""Test getting page info for a non-existent page."""
|
||||
page_info = _web_console_registry.get_page_info("nonexistent")
|
||||
page_info = _bec_console_registry.get_page_info("nonexistent")
|
||||
assert page_info is None
|
||||
|
||||
|
||||
def test_web_console_take_ownership_without_unique_id(console_widget):
|
||||
def test_bec_console_take_ownership_without_unique_id(console_widget):
|
||||
"""Test that take_page_ownership fails gracefully without unique_id."""
|
||||
widget = console_widget
|
||||
# Should not crash when taking ownership without unique_id
|
||||
widget.take_page_ownership()
|
||||
|
||||
|
||||
def test_web_console_yield_ownership_without_unique_id(console_widget):
|
||||
def test_bec_console_yield_ownership_without_unique_id(console_widget):
|
||||
"""Test that yield_ownership fails gracefully without unique_id."""
|
||||
widget = console_widget
|
||||
# Should not crash when yielding ownership without unique_id
|
||||
@@ -372,7 +372,7 @@ def test_web_console_yield_ownership_without_unique_id(console_widget):
|
||||
|
||||
def test_registry_yield_ownership_gui_id_not_in_instances():
|
||||
"""Test registry yield_ownership returns False when gui_id not in instances."""
|
||||
result = _web_console_registry.yield_ownership("nonexistent_gui_id")
|
||||
result = _bec_console_registry.yield_ownership("nonexistent_gui_id")
|
||||
assert result is False
|
||||
|
||||
|
||||
@@ -382,9 +382,9 @@ def test_registry_yield_ownership_instance_is_none(console_widget_with_static_id
|
||||
gui_id = widget.gui_id
|
||||
|
||||
# Store the gui_id and simulate the weakref being dead
|
||||
_web_console_registry._instances[gui_id] = lambda: None
|
||||
_bec_console_registry._instances[gui_id] = lambda: None
|
||||
|
||||
result = _web_console_registry.yield_ownership(gui_id)
|
||||
result = _bec_console_registry.yield_ownership(gui_id)
|
||||
assert result is False
|
||||
|
||||
|
||||
@@ -395,7 +395,7 @@ def test_registry_yield_ownership_unique_id_none(console_widget_with_static_id):
|
||||
unique_id = widget._unique_id
|
||||
widget._unique_id = None
|
||||
|
||||
result = _web_console_registry.yield_ownership(gui_id)
|
||||
result = _bec_console_registry.yield_ownership(gui_id)
|
||||
assert result is False
|
||||
|
||||
widget._unique_id = unique_id # Restore for cleanup
|
||||
@@ -408,7 +408,7 @@ def test_registry_yield_ownership_unique_id_not_in_page_registry(console_widget_
|
||||
unique_id = widget._unique_id
|
||||
widget._unique_id = "nonexistent_unique_id"
|
||||
|
||||
result = _web_console_registry.yield_ownership(gui_id)
|
||||
result = _bec_console_registry.yield_ownership(gui_id)
|
||||
assert result is False
|
||||
|
||||
widget._unique_id = unique_id # Restore for cleanup
|
||||
@@ -416,7 +416,7 @@ def test_registry_yield_ownership_unique_id_not_in_page_registry(console_widget_
|
||||
|
||||
def test_registry_owner_is_visible_page_info_none():
|
||||
"""Test owner_is_visible returns False when page info doesn't exist."""
|
||||
result = _web_console_registry.owner_is_visible("nonexistent_page")
|
||||
result = _bec_console_registry.owner_is_visible("nonexistent_page")
|
||||
assert result is False
|
||||
|
||||
|
||||
@@ -426,10 +426,10 @@ def test_registry_owner_is_visible_no_owner(console_widget_with_static_id):
|
||||
|
||||
# Yield ownership so there's no owner
|
||||
widget.yield_ownership()
|
||||
page_info = _web_console_registry.get_page_info(widget._unique_id)
|
||||
page_info = _bec_console_registry.get_page_info(widget._unique_id)
|
||||
assert page_info.owner_gui_id is None
|
||||
|
||||
result = _web_console_registry.owner_is_visible(widget._unique_id)
|
||||
result = _bec_console_registry.owner_is_visible(widget._unique_id)
|
||||
assert result is False
|
||||
|
||||
|
||||
@@ -439,9 +439,9 @@ def test_registry_owner_is_visible_owner_ref_none(console_widget_with_static_id)
|
||||
unique_id = widget._unique_id
|
||||
|
||||
# Remove owner from instances dict
|
||||
del _web_console_registry._instances[widget.gui_id]
|
||||
del _bec_console_registry._instances[widget.gui_id]
|
||||
|
||||
result = _web_console_registry.owner_is_visible(unique_id)
|
||||
result = _bec_console_registry.owner_is_visible(unique_id)
|
||||
assert result is False
|
||||
|
||||
|
||||
@@ -452,9 +452,9 @@ def test_registry_owner_is_visible_owner_instance_none(console_widget_with_stati
|
||||
gui_id = widget.gui_id
|
||||
|
||||
# Simulate dead weakref
|
||||
_web_console_registry._instances[gui_id] = lambda: None
|
||||
_bec_console_registry._instances[gui_id] = lambda: None
|
||||
|
||||
result = _web_console_registry.owner_is_visible(unique_id)
|
||||
result = _bec_console_registry.owner_is_visible(unique_id)
|
||||
assert result is False
|
||||
|
||||
|
||||
@@ -463,7 +463,7 @@ def test_registry_owner_is_visible_owner_visible(console_widget_with_static_id):
|
||||
widget = console_widget_with_static_id
|
||||
widget.show()
|
||||
|
||||
result = _web_console_registry.owner_is_visible(widget._unique_id)
|
||||
result = _bec_console_registry.owner_is_visible(widget._unique_id)
|
||||
assert result is True
|
||||
|
||||
|
||||
@@ -472,5 +472,5 @@ def test_registry_owner_is_visible_owner_not_visible(console_widget_with_static_
|
||||
widget = console_widget_with_static_id
|
||||
widget.hide()
|
||||
|
||||
result = _web_console_registry.owner_is_visible(widget._unique_id)
|
||||
result = _bec_console_registry.owner_is_visible(widget._unique_id)
|
||||
assert result is False
|
||||
|
||||
Reference in New Issue
Block a user