diff --git a/bec_widgets/cli/client.py b/bec_widgets/cli/client.py index 622f8cb5..9bf4a21c 100644 --- a/bec_widgets/cli/client.py +++ b/bec_widgets/cli/client.py @@ -4787,6 +4787,26 @@ class PositionerGroup(RPCBase): """ +class ProcedureControl(RPCBase): + @rpc_call + def remove(self): + """ + Cleanup the BECConnector + """ + + @rpc_call + def attach(self): + """ + None + """ + + @rpc_call + def detach(self): + """ + Detach the widget from its parent dock widget (if widget is in the dock), making it a floating widget. + """ + + class RectangularROI(RPCBase): """Defines a rectangular Region of Interest (ROI) with additional functionality.""" diff --git a/bec_widgets/widgets/control/procedure_control/__init__.py b/bec_widgets/widgets/control/procedure_control/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/bec_widgets/widgets/control/procedure_control/procedure_control.py b/bec_widgets/widgets/control/procedure_control/procedure_control.py new file mode 100644 index 00000000..cb123c9b --- /dev/null +++ b/bec_widgets/widgets/control/procedure_control/procedure_control.py @@ -0,0 +1,273 @@ +import operator +from functools import partial, reduce +from typing import Literal + +from bec_lib.endpoints import MessageEndpoints +from bec_lib.logger import bec_logger +from bec_lib.messages import ( + ProcedureExecutionMessage, + ProcedureQNotifMessage, + ProcedureRequestMessage, +) +from bec_qthemes._icon.material_icons import material_icon +from bec_server.scan_server.procedures.helper import FrontendProcedureHelper +from pydantic import BaseModel, ConfigDict +from qtpy.QtWidgets import ( + QHBoxLayout, + QToolButton, + QTreeWidget, + QTreeWidgetItem, + QVBoxLayout, + QWidget, +) + +from bec_widgets.utils.bec_connector import ConnectionConfig +from bec_widgets.utils.bec_widget import BECWidget +from bec_widgets.utils.error_popups import SafeSlot + +logger = bec_logger.logger + +_icon = partial(material_icon, size=(20, 20), convert_to_pixmap=False, filled=False) + +_ActionTypes = Literal["abort", "delete", "resubmit"] + + +class _BaseConfig(BaseModel): + model_config = ConfigDict(arbitrary_types_allowed=True) + actions: set[_ActionTypes] + child_actions: set[_ActionTypes] + actions_column: int = 3 + params_column: int = 2 + helper: FrontendProcedureHelper + tree: QTreeWidget + active_queue: bool = False + + +class _QueueConfig(BaseModel): + queue: str + base: _BaseConfig + msgs: list[ProcedureExecutionMessage] + + +class _ItemConfig(BaseModel): + base: _BaseConfig + msg: ProcedureExecutionMessage + + +class _ActionItem(QTreeWidgetItem): + ABORT_BUTTON_COLOR = DELETE_BUTTON_COLOR = "#CC181E" + RESUBMIT_BUTTON_COLOR = "#2266BB" + ACTION_TYPE: Literal["parent", "child"] = "child" + + def __init__(self, parent, strings: list[str], config: _BaseConfig): + super().__init__(parent, strings) + self._tree = config.tree + self._config = config + self._init_actions() + + def _init_actions(self): + """Create the actions widget in the given column.""" + self.actions_widget = QWidget() + actions_layout = QHBoxLayout(self.actions_widget) + actions_layout.setContentsMargins(0, 0, 0, 0) + actions_layout.setSpacing(0) + + def button(icon, color, slot, tooltip): + button = QToolButton(self.actions_widget) + setattr(self, icon, button) + icon = _icon(icon, color=color) + button.setIcon(icon) + button.clicked.connect(slot) + actions_layout.addWidget(button) + button.setToolTip(tooltip) + + actions = ( + self._config.actions if self.ACTION_TYPE == "parent" else self._config.child_actions + ) + if "abort" in actions: + button("cancel_presentation", self.ABORT_BUTTON_COLOR, self._abort_self, "abort") + if "delete" in actions: + button("delete", self.DELETE_BUTTON_COLOR, self._delete_self, "delete") + if "resubmit" in actions: + button("autorenew", self.RESUBMIT_BUTTON_COLOR, self._resubmit_self, "resubmit") + + self._tree.setItemWidget(self, self._config.actions_column, self.actions_widget) + + @SafeSlot() + def _abort_self(self): ... + @SafeSlot() + def _delete_self(self): ... + @SafeSlot() + def _resubmit_self(self): ... + + +class JobItem(_ActionItem): + def __init__(self, parent, strings: list[str], config: _ItemConfig): + super().__init__(parent, strings, config.base) + self._msg = config.msg + self._init_params_display() + + def _init_params_display(self): + self.setText(self._config.params_column, self._short_params_text()) + self.setToolTip(self._config.params_column, self._long_params_html()) + + def _short_params_text(self): + a, k = self._msg.args_kwargs + args = f"{a}, " if a else "" + kwargs = f"{k}".strip("{}") if k else "" + return args + kwargs + + def _long_params_html(self): + a, k = self._msg.args_kwargs + args = "Positional arguments:
" + ", ".join(str(arg) for arg in a) if a else "" + kwargs = ( + reduce( + operator.add, + (f" {k}: {v}
" for k, v in k.items()), + "Keyword arguments:
", + ) + if k + else "" + ) + return args + kwargs + + @SafeSlot() + def _abort_self(self): + self._config.helper.request.abort_execution(self._msg.execution_id) + + @SafeSlot() + def _delete_self(self): + self._config.helper.request.clear_unhandled_execution(self._msg.execution_id) + + @SafeSlot() + def _resubmit_self(self): + self._config.helper.request.clear_unhandled_execution(self._msg.execution_id) + self._config.helper.request.procedure( + identifier=self._msg.identifier, + queue=self._msg.queue, + args_kwargs=self._msg.args_kwargs, + ) + + +class QueueItem(_ActionItem): + ACTION_TYPE = "parent" + + def __init__(self, parent, strings: list[str], config: _QueueConfig): + super().__init__(parent, strings, config.base) + self._queue = config.queue + self.update(config.msgs) + + def clear(self): + for i in reversed(range(self.childCount())): + self.removeChild(self.child(i)) + + def update(self, msgs: list[ProcedureExecutionMessage]): + if self._config.active_queue: + active = self._config.helper.get.running_procedures() + for msg in active: + if msg.queue == self._queue: + JobItem( + self, [msg.identifier, "RUNNING"], _ItemConfig(base=self._config, msg=msg) + ) + for msg in msgs: + JobItem( + self, + [msg.identifier, "PENDING" if self._config.active_queue else "ABORTED"], + _ItemConfig(base=self._config, msg=msg), + ) + + @SafeSlot() + def _abort_self(self): + self._config.helper.request.abort_queue(self._queue) + + @SafeSlot() + def _delete_self(self): + self._config.helper.request.clear_unhandled_queue(self._queue) + + +class CategoryItem(QTreeWidgetItem): + def __init__(self, parent, strings: list[str], config: _BaseConfig): + super().__init__(parent, strings) + self._queues: dict[str, QueueItem] = {} + self._tree: QTreeWidget = parent + self._config = config + + def update(self, queue: str, msgs: list[ProcedureExecutionMessage]): + if (queue_item := self._queues.get(queue)) is not None: + queue_item.clear() + queue_item.update(msgs) + if queue_item.childCount() == 0: + self.removeChild(queue_item) + del self._queues[queue] + elif msgs: + self._queues[queue] = QueueItem( + self, [queue], _QueueConfig(base=self._config, queue=queue, msgs=msgs) + ) + + +class ProcedureControl(BECWidget, QWidget): + + def __init__(self, parent=None, client=None, config=None, gui_id: str | None = None, **kwargs): + config = config or ConnectionConfig() + super().__init__(parent=parent, client=client, config=config, gui_id=gui_id, **kwargs) + self._conn = self.bec_dispatcher.client.connector + self._helper = FrontendProcedureHelper(self._conn) + self._setup_ui() + self.bec_dispatcher.connect_slot(self._update, MessageEndpoints.procedure_queue_notif()) + self._init_queues() + + @SafeSlot(ProcedureQNotifMessage, dict) + def _update(self, msg: dict | ProcedureQNotifMessage, _): + msg = ProcedureQNotifMessage.model_validate(msg) + if msg.queue_type == "execution": + cat_to_update = self._active_queues + read_queue = self._helper.get.exec_queue + else: + cat_to_update = self._unhandled_queues + read_queue = self._helper.get.unhandled_queue + cat_to_update.update(msg.queue_name, read_queue(msg.queue_name)) + + def _setup_ui(self): + self._layout = QVBoxLayout() + self.setLayout(self._layout) + + self._content = QTreeWidget() + self._content.setAlternatingRowColors(True) + self._content.setHeaderLabels(["name", "status", "params", "actions"]) + self._layout.addWidget(self._content) + self._content.header().resizeSection(0, 250) + + config = partial(_BaseConfig, helper=self._helper, tree=self._content, actions_column=3) + + self._active_queues = CategoryItem( + self._content, + ["active queues"], + config(actions={"abort"}, child_actions={"abort"}, active_queue=True), + ) + self._content.addTopLevelItem(self._active_queues) + + self._unhandled_queues = CategoryItem( + self._content, + ["unhandled queues"], + config(actions={"delete"}, child_actions={"delete", "resubmit"}), + ) + self._content.addTopLevelItem(self._unhandled_queues) + + def _init_queues(self): + for queue in self._helper.get.active_and_pending_queue_names(): + self._active_queues.update(queue, self._helper.get.exec_queue(queue)) + for queue in self._helper.get.queue_names("unhandled"): + self._unhandled_queues.update(queue, self._helper.get.unhandled_queue(queue)) + + +if __name__ == "__main__": + import sys + + from qtpy.QtWidgets import QApplication + + app = QApplication(sys.argv) + widget = ProcedureControl() + widget.setFixedWidth(800) + widget.setFixedHeight(800) + widget.show() + sys.exit(app.exec()) diff --git a/tests/unit_tests/client_mocks.py b/tests/unit_tests/client_mocks.py index 613119e0..e7c51c34 100644 --- a/tests/unit_tests/client_mocks.py +++ b/tests/unit_tests/client_mocks.py @@ -6,10 +6,10 @@ import fakeredis import pytest from bec_lib.bec_service import messages from bec_lib.endpoints import MessageEndpoints -from bec_lib.redis_connector import RedisConnector from bec_lib.scan_history import ScanHistory from bec_widgets.tests.utils import DEVICES, DMMock, FakePositioner, Positioner +from bec_widgets.utils.bec_dispatcher import QtRedisConnector def fake_redis_server(host, port, **kwargs): @@ -19,7 +19,7 @@ def fake_redis_server(host, port, **kwargs): @pytest.fixture(scope="function") def mocked_client(bec_dispatcher): - connector = RedisConnector("localhost:1", redis_cls=fake_redis_server) + connector = QtRedisConnector("localhost:1", redis_cls=fake_redis_server) # Create a MagicMock object client = MagicMock() # TODO change to real BECClient diff --git a/tests/unit_tests/conftest.py b/tests/unit_tests/conftest.py index 6f81a4cf..8bd061d1 100644 --- a/tests/unit_tests/conftest.py +++ b/tests/unit_tests/conftest.py @@ -1,5 +1,6 @@ import json import time +from typing import Callable import h5py import numpy as np diff --git a/tests/unit_tests/test_procedure_control.py b/tests/unit_tests/test_procedure_control.py new file mode 100644 index 00000000..5e873220 --- /dev/null +++ b/tests/unit_tests/test_procedure_control.py @@ -0,0 +1,109 @@ +from time import sleep +from typing import Callable +from unittest.mock import MagicMock, patch + +import pytest +from bec_lib.messages import ProcedureExecutionMessage, ProcedureRequestMessage +from bec_server.scan_server.procedures.helper import BackendProcedureHelper +from bec_server.scan_server.procedures.manager import ProcedureManager +from bec_server.scan_server.procedures.procedure_registry import register +from bec_server.scan_server.procedures.worker_base import ProcedureWorker + +from bec_widgets.widgets.control.procedure_control.procedure_control import ( + ProcedureControl, + QueueItem, +) + +from .client_mocks import mocked_client + + +class MockWorker(ProcedureWorker): + def _kill_process(self): ... + + def _run_task(self, item): + sleep(0.1) + + def _setup_execution_environment(self): ... + + def abort(self): ... + + def abort_execution(self, execution_id: str): ... + + +@pytest.fixture(scope="module", autouse=True) +def register_test_proc(): + register("test", lambda: None) + + +@pytest.fixture +def proc_ctrl_w_helper(qtbot, mocked_client: MagicMock): + proc_ctrl = ProcedureControl(client=mocked_client) + qtbot.addWidget(proc_ctrl) + with patch( + "bec_server.scan_server.procedures.manager.RedisConnector", + lambda _: proc_ctrl.client.connector, + ): + manager = ProcedureManager(MagicMock(), MockWorker) + yield proc_ctrl, BackendProcedureHelper(proc_ctrl.client.connector) + manager.shutdown() + + +@pytest.fixture +def req_msg(): + return ProcedureRequestMessage(identifier="test") + + +@pytest.fixture +def exec_msg(): + return lambda id: ProcedureExecutionMessage(identifier="test", queue="test", execution_id=id) + + +def test_add_proc(qtbot, proc_ctrl_w_helper: tuple[ProcedureControl, BackendProcedureHelper]): + proc_ctrl, helper = proc_ctrl_w_helper + assert proc_ctrl._active_queues.childCount() == 0 + helper.request.procedure("test") + qtbot.waitUntil(lambda: proc_ctrl._active_queues.childCount() != 0, timeout=500) + + +def test_abort(qtbot, proc_ctrl_w_helper: tuple[ProcedureControl, BackendProcedureHelper]): + proc_ctrl, helper = proc_ctrl_w_helper + assert proc_ctrl._active_queues.childCount() == 0 + helper.request.procedure("test") + qtbot.waitUntil(lambda: proc_ctrl._active_queues.childCount() != 0, timeout=500) + + assert proc_ctrl._unhandled_queues.childCount() == 0 + queue: QueueItem = proc_ctrl._active_queues.child(0) + queue.child(0).actions_widget.layout().itemAt(0).widget().click() + qtbot.waitUntil(lambda: proc_ctrl._active_queues.childCount() == 0, timeout=500) + qtbot.waitUntil(lambda: proc_ctrl._unhandled_queues.childCount() != 0, timeout=500) + + +def test_delete( + qtbot, + proc_ctrl_w_helper: tuple[ProcedureControl, BackendProcedureHelper], + exec_msg: Callable[[str], ProcedureExecutionMessage], +): + proc_ctrl, helper = proc_ctrl_w_helper + [helper.push.unhandled("test", exec_msg("abcd")) for _ in range(3)] + qtbot.waitUntil(lambda: proc_ctrl._unhandled_queues.childCount() != 0, timeout=500) + queue: QueueItem = proc_ctrl._unhandled_queues.child(0) + assert queue.childCount() == 3 + queue.actions_widget.layout().itemAt(0).widget().click() + qtbot.waitUntil(lambda: helper.get.unhandled_queue("test") == [], timeout=500) + qtbot.waitUntil(lambda: proc_ctrl._unhandled_queues.childCount() == 0, timeout=500) + + +def test_resubmit( + qtbot, + proc_ctrl_w_helper: tuple[ProcedureControl, BackendProcedureHelper], + exec_msg: Callable[[str], ProcedureExecutionMessage], +): + proc_ctrl, helper = proc_ctrl_w_helper + [helper.push.unhandled("test", exec_msg(f"abcd{i}")) for i in range(3)] + qtbot.waitUntil(lambda: proc_ctrl._unhandled_queues.childCount() != 0, timeout=500) + queue: QueueItem = proc_ctrl._unhandled_queues.child(0) + assert queue.childCount() == 3 + assert proc_ctrl._active_queues.childCount() == 0 + queue.child(0).actions_widget.layout().itemAt(1).widget().click() + qtbot.waitUntil(lambda: len(helper.get.unhandled_queue("test")) == 2, timeout=500) + qtbot.waitUntil(lambda: proc_ctrl._active_queues.childCount() != 0, timeout=500)