mirror of
https://github.com/bec-project/bec_widgets.git
synced 2026-05-04 22:04:21 +02:00
fix(bec_console): persistent bec session
This commit is contained in:
@@ -1,12 +1,12 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import enum
|
||||
from dataclasses import dataclass, field
|
||||
from uuid import uuid4
|
||||
from weakref import WeakValueDictionary
|
||||
|
||||
import shiboken6
|
||||
from bec_lib.logger import bec_logger
|
||||
from pydantic import BaseModel
|
||||
from qtpy.QtCore import Qt, Signal
|
||||
from qtpy.QtGui import QMouseEvent
|
||||
from qtpy.QtWidgets import (
|
||||
@@ -39,17 +39,18 @@ class ConsoleMode(str, enum.Enum):
|
||||
HIDDEN = "hidden"
|
||||
|
||||
|
||||
class _TerminalOwnerInfo(BaseModel):
|
||||
@dataclass
|
||||
class _TerminalOwnerInfo:
|
||||
"""Should be managed only by the BecConsoleRegistry. Consoles should ask the registry for
|
||||
necessary ownership info."""
|
||||
|
||||
owner_console_id: str | None = None
|
||||
registered_console_ids: set[str] = set()
|
||||
instance: BecTerminal
|
||||
terminal_id: str
|
||||
registered_console_ids: set[str] = field(default_factory=set)
|
||||
instance: BecTerminal | None = None
|
||||
terminal_id: str = ""
|
||||
initialized: bool = False
|
||||
|
||||
model_config = {"arbitrary_types_allowed": True}
|
||||
persist_session: bool = False
|
||||
fallback_holder: QWidget | None = None
|
||||
|
||||
|
||||
class BecConsoleRegistry:
|
||||
@@ -64,6 +65,105 @@ class BecConsoleRegistry:
|
||||
self._consoles: WeakValueDictionary[str, BecConsole] = WeakValueDictionary()
|
||||
self._terminal_registry: dict[str, _TerminalOwnerInfo] = {}
|
||||
|
||||
@staticmethod
|
||||
def _is_valid_qobject(obj: object | None) -> bool:
|
||||
return obj is not None and shiboken6.isValid(obj)
|
||||
|
||||
def _connect_app_cleanup(self) -> None:
|
||||
app = QApplication.instance()
|
||||
if app is None:
|
||||
return
|
||||
app.aboutToQuit.connect(self.clear, Qt.ConnectionType.UniqueConnection)
|
||||
|
||||
def _new_terminal_info(self, console: BecConsole) -> _TerminalOwnerInfo:
|
||||
term = _BecTermClass()
|
||||
return _TerminalOwnerInfo(
|
||||
registered_console_ids={console.console_id},
|
||||
owner_console_id=console.console_id,
|
||||
instance=term,
|
||||
terminal_id=console.terminal_id,
|
||||
persist_session=console.persist_terminal_session,
|
||||
)
|
||||
|
||||
def _replace_terminal(self, info: _TerminalOwnerInfo, console: BecConsole) -> None:
|
||||
info.instance = _BecTermClass()
|
||||
info.initialized = False
|
||||
info.owner_console_id = console.console_id
|
||||
info.registered_console_ids.add(console.console_id)
|
||||
info.persist_session = info.persist_session or console.persist_terminal_session
|
||||
|
||||
def _delete_terminal_info(self, info: _TerminalOwnerInfo) -> None:
|
||||
if self._is_valid_qobject(info.instance):
|
||||
info.instance.deleteLater() # type: ignore[union-attr]
|
||||
info.instance = None
|
||||
if self._is_valid_qobject(info.fallback_holder):
|
||||
info.fallback_holder.deleteLater()
|
||||
info.fallback_holder = None
|
||||
|
||||
def _parking_parent(
|
||||
self,
|
||||
info: _TerminalOwnerInfo,
|
||||
console: BecConsole | None = None,
|
||||
*,
|
||||
avoid_console: bool = False,
|
||||
) -> QWidget | None:
|
||||
for console_id in info.registered_console_ids:
|
||||
candidate = self._consoles.get(console_id)
|
||||
if candidate is None or candidate is console:
|
||||
continue
|
||||
if self._is_valid_qobject(candidate):
|
||||
return candidate._term_holder
|
||||
|
||||
if console is None or not self._is_valid_qobject(console):
|
||||
return None
|
||||
|
||||
window = console.window()
|
||||
if window is not None and window is not console and self._is_valid_qobject(window):
|
||||
return window
|
||||
|
||||
if not avoid_console:
|
||||
return console._term_holder
|
||||
return None
|
||||
|
||||
def _fallback_holder(
|
||||
self,
|
||||
info: _TerminalOwnerInfo,
|
||||
console: BecConsole | None = None,
|
||||
*,
|
||||
avoid_console: bool = False,
|
||||
) -> QWidget:
|
||||
if not self._is_valid_qobject(info.fallback_holder):
|
||||
info.fallback_holder = QWidget(
|
||||
parent=self._parking_parent(info, console, avoid_console=avoid_console)
|
||||
)
|
||||
info.fallback_holder.setObjectName(f"_bec_console_terminal_holder_{info.terminal_id}")
|
||||
info.fallback_holder.hide()
|
||||
return info.fallback_holder
|
||||
|
||||
def _park_terminal(
|
||||
self,
|
||||
info: _TerminalOwnerInfo,
|
||||
console: BecConsole | None = None,
|
||||
*,
|
||||
avoid_console: bool = False,
|
||||
) -> None:
|
||||
if not self._is_valid_qobject(info.instance):
|
||||
return
|
||||
|
||||
parent = self._parking_parent(info, console, avoid_console=avoid_console)
|
||||
if parent is None and info.persist_session:
|
||||
parent = self._fallback_holder(info, console, avoid_console=avoid_console)
|
||||
|
||||
info.instance.hide() # type: ignore[union-attr]
|
||||
info.instance.setParent(parent) # type: ignore[union-attr]
|
||||
|
||||
def clear(self) -> None:
|
||||
"""Delete every tracked terminal and holder."""
|
||||
for info in list(self._terminal_registry.values()):
|
||||
self._delete_terminal_info(info)
|
||||
self._terminal_registry.clear()
|
||||
self._consoles.clear()
|
||||
|
||||
def register(self, console: BecConsole):
|
||||
"""
|
||||
Register an instance of BecConsole. If there is already a terminal with the associated
|
||||
@@ -72,22 +172,28 @@ class BecConsoleRegistry:
|
||||
Args:
|
||||
console (BecConsole): The instance to register.
|
||||
"""
|
||||
self._connect_app_cleanup()
|
||||
self._consoles[console.console_id] = console
|
||||
console_id, terminal_id = console.console_id, console.terminal_id
|
||||
if (term_info := self._terminal_registry.get(terminal_id)) is None or not shiboken6.isValid(
|
||||
term_info.instance
|
||||
):
|
||||
term = _BecTermClass()
|
||||
self._terminal_registry[terminal_id] = _TerminalOwnerInfo(
|
||||
registered_console_ids={console_id},
|
||||
owner_console_id=console_id,
|
||||
instance=term,
|
||||
terminal_id=terminal_id,
|
||||
)
|
||||
term_info = self._terminal_registry.get(terminal_id)
|
||||
if term_info is None:
|
||||
self._terminal_registry[terminal_id] = self._new_terminal_info(console)
|
||||
return
|
||||
|
||||
logger.info(f"Registered new console {console_id} for terminal {terminal_id}")
|
||||
term_info.persist_session = term_info.persist_session or console.persist_terminal_session
|
||||
had_registered_consoles = bool(term_info.registered_console_ids)
|
||||
term_info.registered_console_ids.add(console_id)
|
||||
if not self._is_valid_qobject(term_info.instance):
|
||||
self._replace_terminal(term_info, console)
|
||||
return
|
||||
if (
|
||||
term_info.owner_console_id is not None
|
||||
and term_info.owner_console_id not in self._consoles
|
||||
):
|
||||
term_info.owner_console_id = None
|
||||
if term_info.owner_console_id is None and not had_registered_consoles:
|
||||
term_info.owner_console_id = console_id
|
||||
logger.info(f"Registered new console {console_id} for terminal {terminal_id}")
|
||||
|
||||
def unregister(self, console: BecConsole):
|
||||
"""
|
||||
@@ -101,13 +207,21 @@ class BecConsoleRegistry:
|
||||
del self._consoles[console_id]
|
||||
if (term_info := self._terminal_registry.get(terminal_id)) is None:
|
||||
return
|
||||
detached = console._detach_terminal_widget(term_info.instance)
|
||||
if console_id in term_info.registered_console_ids:
|
||||
term_info.registered_console_ids.remove(console_id)
|
||||
if term_info.owner_console_id == console_id:
|
||||
term_info.owner_console_id = None
|
||||
if not term_info.registered_console_ids:
|
||||
term_info.instance.deleteLater()
|
||||
if term_info.persist_session and self._is_valid_qobject(term_info.instance):
|
||||
self._park_terminal(term_info, console, avoid_console=True)
|
||||
logger.info(f"Unregistered console {console_id} for terminal {terminal_id}")
|
||||
return
|
||||
|
||||
self._delete_terminal_info(term_info)
|
||||
del self._terminal_registry[terminal_id]
|
||||
elif detached:
|
||||
self._park_terminal(term_info, console, avoid_console=True)
|
||||
|
||||
logger.info(f"Unregistered console {console_id} for terminal {terminal_id}")
|
||||
|
||||
@@ -118,6 +232,8 @@ class BecConsoleRegistry:
|
||||
if (info := self._terminal_registry.get(console.terminal_id)) is None:
|
||||
logger.warning(f"Console {console.console_id} references an unknown terminal!")
|
||||
return False
|
||||
if not self._is_valid_qobject(info.instance):
|
||||
return False
|
||||
return info.owner_console_id == console.console_id
|
||||
|
||||
def take_ownership(self, console: BecConsole) -> BecTerminal | None:
|
||||
@@ -132,14 +248,19 @@ class BecConsoleRegistry:
|
||||
console_id, terminal_id = console.console_id, console.terminal_id
|
||||
|
||||
if terminal_id not in self._terminal_registry:
|
||||
logger.warning(f"Terminal {terminal_id} not found in registry")
|
||||
return None
|
||||
self.register(console)
|
||||
|
||||
instance_info = self._terminal_registry[terminal_id]
|
||||
if not self._is_valid_qobject(instance_info.instance):
|
||||
self._replace_terminal(instance_info, console)
|
||||
if (old_owner_console_ide := instance_info.owner_console_id) is not None:
|
||||
if (old_owner := self._consoles.get(old_owner_console_ide)) is not None:
|
||||
if (
|
||||
old_owner_console_ide != console_id
|
||||
and (old_owner := self._consoles.get(old_owner_console_ide)) is not None
|
||||
):
|
||||
old_owner.yield_ownership() # call this on the old owner to make sure it is updated
|
||||
instance_info.owner_console_id = console_id
|
||||
instance_info.registered_console_ids.add(console_id)
|
||||
logger.info(f"Transferred ownership of terminal {terminal_id} to {console_id}")
|
||||
return instance_info.instance
|
||||
|
||||
@@ -159,6 +280,11 @@ class BecConsoleRegistry:
|
||||
return None
|
||||
|
||||
instance_info = self._terminal_registry[terminal_id]
|
||||
if not self._is_valid_qobject(instance_info.instance):
|
||||
if instance_info.owner_console_id == console_id:
|
||||
self._replace_terminal(instance_info, console)
|
||||
else:
|
||||
return None
|
||||
if instance_info.owner_console_id == console_id:
|
||||
return instance_info.instance
|
||||
|
||||
@@ -181,7 +307,24 @@ class BecConsoleRegistry:
|
||||
logger.debug(f"But it was not the owner, which was {term_info.owner_console_id}!")
|
||||
return
|
||||
term_info.owner_console_id = None
|
||||
term_info.instance.setParent(None)
|
||||
console._detach_terminal_widget(term_info.instance)
|
||||
self._park_terminal(term_info, console)
|
||||
|
||||
def should_initialize(self, console: BecConsole) -> bool:
|
||||
"""Return true if the console should send its startup command to the terminal."""
|
||||
info = self._terminal_registry.get(console.terminal_id)
|
||||
if info is None:
|
||||
return False
|
||||
return (
|
||||
info.owner_console_id == console.console_id
|
||||
and not info.initialized
|
||||
and self._is_valid_qobject(info.instance)
|
||||
)
|
||||
|
||||
def mark_initialized(self, console: BecConsole) -> None:
|
||||
info = self._terminal_registry.get(console.terminal_id)
|
||||
if info is not None and info.owner_console_id == console.console_id:
|
||||
info.initialized = True
|
||||
|
||||
def owner_is_visible(self, term_id: str) -> bool:
|
||||
"""
|
||||
@@ -193,7 +336,11 @@ class BecConsoleRegistry:
|
||||
bool: True if the owner is visible, False otherwise.
|
||||
"""
|
||||
instance_info = self._terminal_registry.get(term_id)
|
||||
if instance_info is None or instance_info.owner_console_id is None:
|
||||
if (
|
||||
instance_info is None
|
||||
or instance_info.owner_console_id is None
|
||||
or not self._is_valid_qobject(instance_info.instance)
|
||||
):
|
||||
return False
|
||||
|
||||
if (owner := self._consoles.get(instance_info.owner_console_id)) is None:
|
||||
@@ -225,6 +372,7 @@ class BecConsole(BECWidget, QWidget):
|
||||
|
||||
PLUGIN = True
|
||||
ICON_NAME = "terminal"
|
||||
persist_terminal_session = False
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -272,8 +420,7 @@ class BecConsole(BECWidget, QWidget):
|
||||
# will create a new terminal instance if there isn't already one for this ID
|
||||
_bec_console_registry.register(self)
|
||||
self._infer_mode()
|
||||
if self.startup_cmd:
|
||||
self.write(self.startup_cmd, True) # will have no effect if not the owner
|
||||
self._ensure_startup_started()
|
||||
|
||||
def _infer_mode(self):
|
||||
self.term = _bec_console_registry.try_get_term(self)
|
||||
@@ -295,8 +442,9 @@ class BecConsole(BECWidget, QWidget):
|
||||
match mode:
|
||||
case ConsoleMode.ACTIVE:
|
||||
if self.term:
|
||||
if self.term not in (self._term_layout.children()):
|
||||
if self._term_layout.indexOf(self.term) == -1: # type: ignore[arg-type]
|
||||
self._term_layout.addWidget(self.term) # type: ignore # BecTerminal is QWidget
|
||||
self.term.show() # type: ignore[attr-defined]
|
||||
self._stacked_layout.setCurrentIndex(0)
|
||||
self._mode = mode
|
||||
else:
|
||||
@@ -335,6 +483,29 @@ class BecConsole(BECWidget, QWidget):
|
||||
if self.term:
|
||||
self.term.write(data, send_return)
|
||||
|
||||
def _ensure_startup_started(self):
|
||||
if not self.startup_cmd or not _bec_console_registry.should_initialize(self):
|
||||
return
|
||||
self.write(self.startup_cmd, True)
|
||||
_bec_console_registry.mark_initialized(self)
|
||||
|
||||
def _detach_terminal_widget(self, term: BecTerminal | None) -> bool:
|
||||
if term is None or not BecConsoleRegistry._is_valid_qobject(term):
|
||||
if self.term is term:
|
||||
self.term = None
|
||||
return False
|
||||
|
||||
is_child = self.isAncestorOf(term) # type: ignore[arg-type]
|
||||
if self._term_layout.indexOf(term) != -1: # type: ignore[arg-type]
|
||||
self._term_layout.removeWidget(term) # type: ignore[arg-type]
|
||||
is_child = True
|
||||
if is_child:
|
||||
term.hide() # type: ignore[attr-defined]
|
||||
term.setParent(None) # type: ignore[attr-defined]
|
||||
if self.term is term:
|
||||
self.term = None
|
||||
return is_child
|
||||
|
||||
def take_terminal_ownership(self):
|
||||
"""
|
||||
Take ownership of a web instance from the registry. This will transfer the instance
|
||||
@@ -343,6 +514,7 @@ class BecConsole(BECWidget, QWidget):
|
||||
# Get the instance from registry
|
||||
self.term = _bec_console_registry.take_ownership(self)
|
||||
self._infer_mode()
|
||||
self._ensure_startup_started()
|
||||
if self._mode == ConsoleMode.ACTIVE:
|
||||
logger.debug(f"Widget {self.gui_id} took ownership of instance {self.terminal_id}")
|
||||
|
||||
@@ -383,6 +555,7 @@ class BECShell(BecConsole):
|
||||
"""
|
||||
|
||||
ICON_NAME = "hub"
|
||||
persist_terminal_session = True
|
||||
|
||||
def __init__(self, parent=None, config=None, client=None, gui_id=None, **kwargs):
|
||||
super().__init__(
|
||||
|
||||
@@ -1,10 +1,13 @@
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
from qtpy.QtCore import Qt
|
||||
import shiboken6
|
||||
from qtpy.QtCore import QEvent, QEventLoop, Qt
|
||||
from qtpy.QtGui import QHideEvent, QShowEvent
|
||||
from qtpy.QtTest import QTest
|
||||
from qtpy.QtWidgets import QApplication, QWidget
|
||||
|
||||
import bec_widgets.widgets.editors.bec_console.bec_console as bec_console_module
|
||||
from bec_widgets.widgets.editors.bec_console.bec_console import (
|
||||
BecConsole,
|
||||
BECShell,
|
||||
@@ -15,6 +18,20 @@ from bec_widgets.widgets.editors.bec_console.bec_console import (
|
||||
from .client_mocks import mocked_client
|
||||
|
||||
|
||||
def process_deferred_deletes():
|
||||
app = QApplication.instance()
|
||||
app.sendPostedEvents(None, QEvent.DeferredDelete)
|
||||
app.processEvents(QEventLoop.AllEvents)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def clean_bec_console_registry():
|
||||
_bec_console_registry.clear()
|
||||
yield
|
||||
_bec_console_registry.clear()
|
||||
process_deferred_deletes()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def console_widget(qtbot):
|
||||
"""Create a BecConsole widget."""
|
||||
@@ -126,3 +143,110 @@ def test_is_owner(console_widget: BecConsole):
|
||||
assert not _bec_console_registry.is_owner(mock_console)
|
||||
mock_console.terminal_id = console_widget.terminal_id
|
||||
assert not _bec_console_registry.is_owner(mock_console)
|
||||
|
||||
|
||||
def test_closing_active_console_keeps_terminal_valid_for_remaining_console(qtbot):
|
||||
widget1 = BecConsole(client=mocked_client, gui_id="close_owner", terminal_id="shared_close")
|
||||
widget2 = BecConsole(client=mocked_client, gui_id="remaining", terminal_id="shared_close")
|
||||
qtbot.addWidget(widget2)
|
||||
|
||||
widget1.take_terminal_ownership()
|
||||
term = widget1.term
|
||||
assert term is not None
|
||||
|
||||
widget1.close()
|
||||
widget1.deleteLater()
|
||||
process_deferred_deletes()
|
||||
|
||||
assert shiboken6.isValid(term)
|
||||
widget2.take_terminal_ownership()
|
||||
|
||||
assert widget2.term is term
|
||||
assert widget2._mode == ConsoleMode.ACTIVE
|
||||
|
||||
|
||||
def test_active_console_detaches_terminal_before_destruction(qtbot):
|
||||
widget1 = BecConsole(client=mocked_client, gui_id="owner", terminal_id="shared_detach")
|
||||
widget2 = BecConsole(client=mocked_client, gui_id="survivor", terminal_id="shared_detach")
|
||||
qtbot.addWidget(widget1)
|
||||
qtbot.addWidget(widget2)
|
||||
|
||||
widget1.take_terminal_ownership()
|
||||
term = widget1.term
|
||||
assert term is not None
|
||||
assert widget1.isAncestorOf(term)
|
||||
|
||||
widget1.close()
|
||||
|
||||
assert shiboken6.isValid(term)
|
||||
assert not widget1.isAncestorOf(term)
|
||||
assert term.parent() is widget2._term_holder
|
||||
|
||||
|
||||
def test_bec_shell_terminal_persists_after_last_shell_unregisters(qtbot):
|
||||
shell = BECShell(gui_id="bec_shell_persistent")
|
||||
qtbot.addWidget(shell)
|
||||
term = shell.term
|
||||
assert term is not None
|
||||
|
||||
_bec_console_registry.unregister(shell)
|
||||
|
||||
info = _bec_console_registry._terminal_registry["bec_shell"]
|
||||
assert info.registered_console_ids == set()
|
||||
assert info.owner_console_id is None
|
||||
assert info.persist_session is True
|
||||
assert info.instance is term
|
||||
assert shiboken6.isValid(term)
|
||||
|
||||
|
||||
def test_new_bec_shell_claims_preserved_terminal(qtbot):
|
||||
shell1 = BECShell(gui_id="bec_shell_first")
|
||||
term = shell1.term
|
||||
assert term is not None
|
||||
|
||||
shell1.close()
|
||||
shell1.deleteLater()
|
||||
process_deferred_deletes()
|
||||
|
||||
assert "bec_shell" in _bec_console_registry._terminal_registry
|
||||
assert shiboken6.isValid(term)
|
||||
|
||||
shell2 = BECShell(gui_id="bec_shell_second")
|
||||
qtbot.addWidget(shell2)
|
||||
shell2.showEvent(QShowEvent())
|
||||
|
||||
assert shell2.term is term
|
||||
assert shell2._mode == ConsoleMode.ACTIVE
|
||||
|
||||
|
||||
def test_persistent_bec_shell_sends_startup_command_once(qtbot, monkeypatch):
|
||||
class RecordingTerminal(QWidget):
|
||||
writes = []
|
||||
|
||||
def write(self, text: str, add_newline: bool = True):
|
||||
self.writes.append((text, add_newline))
|
||||
|
||||
monkeypatch.setattr(bec_console_module, "_BecTermClass", RecordingTerminal)
|
||||
|
||||
shell1 = BECShell(gui_id="bec_shell_startup_first")
|
||||
shell1.close()
|
||||
shell1.deleteLater()
|
||||
process_deferred_deletes()
|
||||
|
||||
shell2 = BECShell(gui_id="bec_shell_startup_second")
|
||||
qtbot.addWidget(shell2)
|
||||
shell2.showEvent(QShowEvent())
|
||||
|
||||
assert len(RecordingTerminal.writes) == 1
|
||||
assert RecordingTerminal.writes[0][0].startswith("bec ")
|
||||
assert RecordingTerminal.writes[0][1] is True
|
||||
|
||||
|
||||
def test_plain_console_terminal_removed_after_last_unregister(qtbot):
|
||||
widget = BecConsole(client=mocked_client, gui_id="plain_console", terminal_id="plain_terminal")
|
||||
qtbot.addWidget(widget)
|
||||
|
||||
assert "plain_terminal" in _bec_console_registry._terminal_registry
|
||||
_bec_console_registry.unregister(widget)
|
||||
|
||||
assert "plain_terminal" not in _bec_console_registry._terminal_registry
|
||||
|
||||
Reference in New Issue
Block a user