mirror of
https://github.com/bec-project/bec_widgets.git
synced 2026-01-01 11:31:19 +01:00
119 lines
4.1 KiB
Python
119 lines
4.1 KiB
Python
from time import sleep
|
|
from typing import Callable
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
from bec_lib.messages import ProcedureExecutionMessage, ProcedureRequestMessage
|
|
from bec_server.scan_server.procedures.helper import BackendProcedureHelper
|
|
from bec_server.scan_server.procedures.manager import ProcedureManager
|
|
from bec_server.scan_server.procedures.procedure_registry import register
|
|
from bec_server.scan_server.procedures.worker_base import ProcedureWorker
|
|
|
|
from bec_widgets.widgets.control.procedure_control.procedure_control import (
|
|
JobItem,
|
|
ProcedureControl,
|
|
QueueItem,
|
|
)
|
|
|
|
from .client_mocks import mocked_client
|
|
|
|
|
|
class MockWorker(ProcedureWorker):
|
|
def _kill_process(self): ...
|
|
|
|
def _run_task(self, item):
|
|
sleep(0.1)
|
|
|
|
def _setup_execution_environment(self): ...
|
|
|
|
def abort(self): ...
|
|
|
|
def abort_execution(self, execution_id: str): ...
|
|
|
|
|
|
@pytest.fixture(scope="module", autouse=True)
|
|
def register_test_proc():
|
|
register("test", lambda: None)
|
|
|
|
|
|
@pytest.fixture
|
|
def proc_ctrl_w_helper(qtbot, mocked_client: MagicMock):
|
|
proc_ctrl = ProcedureControl(client=mocked_client)
|
|
qtbot.addWidget(proc_ctrl)
|
|
with patch(
|
|
"bec_server.scan_server.procedures.manager.RedisConnector",
|
|
lambda _: proc_ctrl.client.connector,
|
|
):
|
|
manager = ProcedureManager(MagicMock(), MockWorker)
|
|
yield proc_ctrl, BackendProcedureHelper(proc_ctrl.client.connector)
|
|
manager.shutdown()
|
|
|
|
|
|
@pytest.fixture
|
|
def req_msg():
|
|
return ProcedureRequestMessage(identifier="test")
|
|
|
|
|
|
@pytest.fixture
|
|
def exec_msg():
|
|
return lambda: ProcedureExecutionMessage(identifier="test", queue="test")
|
|
|
|
|
|
def test_add_proc(
|
|
qtbot,
|
|
proc_ctrl_w_helper: tuple[ProcedureControl, BackendProcedureHelper],
|
|
req_msg: ProcedureRequestMessage,
|
|
):
|
|
proc_ctrl, helper = proc_ctrl_w_helper
|
|
assert proc_ctrl._active_queues.childCount() == 0
|
|
helper.request.procedure(req_msg)
|
|
qtbot.waitUntil(lambda: proc_ctrl._active_queues.childCount() != 0, timeout=500)
|
|
|
|
|
|
def test_abort(
|
|
qtbot,
|
|
proc_ctrl_w_helper: tuple[ProcedureControl, BackendProcedureHelper],
|
|
req_msg: ProcedureRequestMessage,
|
|
):
|
|
proc_ctrl, helper = proc_ctrl_w_helper
|
|
assert proc_ctrl._active_queues.childCount() == 0
|
|
helper.request.procedure(req_msg)
|
|
qtbot.waitUntil(lambda: proc_ctrl._active_queues.childCount() != 0, timeout=500)
|
|
|
|
assert proc_ctrl._unhandled_queues.childCount() == 0
|
|
queue: QueueItem = proc_ctrl._active_queues.child(0)
|
|
queue.child(0).actions_widget.layout().itemAt(0).widget().click()
|
|
qtbot.waitUntil(lambda: proc_ctrl._active_queues.childCount() == 0, timeout=500)
|
|
qtbot.waitUntil(lambda: proc_ctrl._unhandled_queues.childCount() != 0, timeout=500)
|
|
|
|
|
|
def test_delete(
|
|
qtbot,
|
|
proc_ctrl_w_helper: tuple[ProcedureControl, BackendProcedureHelper],
|
|
exec_msg: Callable[[], ProcedureExecutionMessage],
|
|
):
|
|
proc_ctrl, helper = proc_ctrl_w_helper
|
|
[helper.push.unhandled("test", exec_msg()) for _ in range(3)]
|
|
qtbot.waitUntil(lambda: proc_ctrl._unhandled_queues.childCount() != 0, timeout=500)
|
|
queue: QueueItem = proc_ctrl._unhandled_queues.child(0)
|
|
assert queue.childCount() == 3
|
|
queue.actions_widget.layout().itemAt(0).widget().click()
|
|
qtbot.waitUntil(lambda: helper.get.unhandled_queue("test") == [], timeout=500)
|
|
qtbot.waitUntil(lambda: proc_ctrl._unhandled_queues.childCount() == 0, timeout=500)
|
|
|
|
|
|
def test_resubmit(
|
|
qtbot,
|
|
proc_ctrl_w_helper: tuple[ProcedureControl, BackendProcedureHelper],
|
|
exec_msg: Callable[[], ProcedureExecutionMessage],
|
|
):
|
|
proc_ctrl, helper = proc_ctrl_w_helper
|
|
[helper.push.unhandled("test", exec_msg()) for _ in range(3)]
|
|
qtbot.waitUntil(lambda: proc_ctrl._unhandled_queues.childCount() != 0, timeout=500)
|
|
queue: QueueItem = proc_ctrl._unhandled_queues.child(0)
|
|
assert queue.childCount() == 3
|
|
assert proc_ctrl._active_queues.childCount() == 0
|
|
queue.child(0).actions_widget.layout().itemAt(1).widget().click()
|
|
qtbot.waitUntil(lambda: len(helper.get.unhandled_queue("test")) == 2, timeout=500)
|
|
qtbot.waitUntil(lambda: proc_ctrl._active_queues.childCount() != 0, timeout=500)
|