1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-01-01 11:31:19 +01:00
Files
bec_widgets/tests/unit_tests/test_procedure_control.py

119 lines
4.1 KiB
Python

from time import sleep
from typing import Callable
from unittest.mock import MagicMock, patch
import pytest
from bec_lib.messages import ProcedureExecutionMessage, ProcedureRequestMessage
from bec_server.scan_server.procedures.helper import BackendProcedureHelper
from bec_server.scan_server.procedures.manager import ProcedureManager
from bec_server.scan_server.procedures.procedure_registry import register
from bec_server.scan_server.procedures.worker_base import ProcedureWorker
from bec_widgets.widgets.control.procedure_control.procedure_control import (
JobItem,
ProcedureControl,
QueueItem,
)
from .client_mocks import mocked_client
class MockWorker(ProcedureWorker):
def _kill_process(self): ...
def _run_task(self, item):
sleep(0.1)
def _setup_execution_environment(self): ...
def abort(self): ...
def abort_execution(self, execution_id: str): ...
@pytest.fixture(scope="module", autouse=True)
def register_test_proc():
register("test", lambda: None)
@pytest.fixture
def proc_ctrl_w_helper(qtbot, mocked_client: MagicMock):
proc_ctrl = ProcedureControl(client=mocked_client)
qtbot.addWidget(proc_ctrl)
with patch(
"bec_server.scan_server.procedures.manager.RedisConnector",
lambda _: proc_ctrl.client.connector,
):
manager = ProcedureManager(MagicMock(), MockWorker)
yield proc_ctrl, BackendProcedureHelper(proc_ctrl.client.connector)
manager.shutdown()
@pytest.fixture
def req_msg():
return ProcedureRequestMessage(identifier="test")
@pytest.fixture
def exec_msg():
return lambda: ProcedureExecutionMessage(identifier="test", queue="test")
def test_add_proc(
qtbot,
proc_ctrl_w_helper: tuple[ProcedureControl, BackendProcedureHelper],
req_msg: ProcedureRequestMessage,
):
proc_ctrl, helper = proc_ctrl_w_helper
assert proc_ctrl._active_queues.childCount() == 0
helper.request.procedure(req_msg)
qtbot.waitUntil(lambda: proc_ctrl._active_queues.childCount() != 0, timeout=500)
def test_abort(
qtbot,
proc_ctrl_w_helper: tuple[ProcedureControl, BackendProcedureHelper],
req_msg: ProcedureRequestMessage,
):
proc_ctrl, helper = proc_ctrl_w_helper
assert proc_ctrl._active_queues.childCount() == 0
helper.request.procedure(req_msg)
qtbot.waitUntil(lambda: proc_ctrl._active_queues.childCount() != 0, timeout=500)
assert proc_ctrl._unhandled_queues.childCount() == 0
queue: QueueItem = proc_ctrl._active_queues.child(0)
queue.child(0).actions_widget.layout().itemAt(0).widget().click()
qtbot.waitUntil(lambda: proc_ctrl._active_queues.childCount() == 0, timeout=500)
qtbot.waitUntil(lambda: proc_ctrl._unhandled_queues.childCount() != 0, timeout=500)
def test_delete(
qtbot,
proc_ctrl_w_helper: tuple[ProcedureControl, BackendProcedureHelper],
exec_msg: Callable[[], ProcedureExecutionMessage],
):
proc_ctrl, helper = proc_ctrl_w_helper
[helper.push.unhandled("test", exec_msg()) for _ in range(3)]
qtbot.waitUntil(lambda: proc_ctrl._unhandled_queues.childCount() != 0, timeout=500)
queue: QueueItem = proc_ctrl._unhandled_queues.child(0)
assert queue.childCount() == 3
queue.actions_widget.layout().itemAt(0).widget().click()
qtbot.waitUntil(lambda: helper.get.unhandled_queue("test") == [], timeout=500)
qtbot.waitUntil(lambda: proc_ctrl._unhandled_queues.childCount() == 0, timeout=500)
def test_resubmit(
qtbot,
proc_ctrl_w_helper: tuple[ProcedureControl, BackendProcedureHelper],
exec_msg: Callable[[], ProcedureExecutionMessage],
):
proc_ctrl, helper = proc_ctrl_w_helper
[helper.push.unhandled("test", exec_msg()) for _ in range(3)]
qtbot.waitUntil(lambda: proc_ctrl._unhandled_queues.childCount() != 0, timeout=500)
queue: QueueItem = proc_ctrl._unhandled_queues.child(0)
assert queue.childCount() == 3
assert proc_ctrl._active_queues.childCount() == 0
queue.child(0).actions_widget.layout().itemAt(1).widget().click()
qtbot.waitUntil(lambda: len(helper.get.unhandled_queue("test")) == 2, timeout=500)
qtbot.waitUntil(lambda: proc_ctrl._active_queues.childCount() != 0, timeout=500)