0
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2025-07-14 11:41:49 +02:00

feat(queue): BECQueue controls extended with Resume, Stop, Abort, Reset buttons

This commit is contained in:
2024-08-29 17:01:47 +02:00
committed by wyzula_j
parent df5eff3147
commit 0d7c10e670
9 changed files with 201 additions and 28 deletions

View File

@ -2,10 +2,13 @@
from unittest.mock import MagicMock, patch
import pytest
from bec_lib.endpoints import MessageEndpoints
from bec_lib.messages import ScanQueueStatusMessage
from bec_widgets.widgets.dock import BECDock, BECDockArea
from .client_mocks import mocked_client
from .test_bec_queue import bec_queue_msg_full
@pytest.fixture
@ -135,7 +138,10 @@ def test_toolbar_add_device_positioner_box(bec_dock_area):
)
def test_toolbar_add_utils_queue(bec_dock_area):
def test_toolbar_add_utils_queue(bec_dock_area, bec_queue_msg_full):
bec_dock_area.client.connector.set_and_publish(
MessageEndpoints.scan_queue_status(), bec_queue_msg_full
)
bec_dock_area.toolbar.widgets["menu_utils"].widgets["queue"].trigger()
assert "queue_1" in bec_dock_area.panels
assert bec_dock_area.panels["queue_1"].widgets[0].config.widget_class == "BECQueue"

View File

@ -89,7 +89,7 @@ def bec_queue_msg_full():
@pytest.fixture
def bec_queue(qtbot, mocked_client):
widget = BECQueue(client=mocked_client)
widget = BECQueue(client=mocked_client, refresh_upon_start=False)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
@ -109,3 +109,36 @@ def test_bec_queue_empty(bec_queue):
assert bec_queue.table.item(0, 0).text() == ""
assert bec_queue.table.item(0, 1).text() == ""
assert bec_queue.table.item(0, 2).text() == ""
def test_queue_abort(bec_queue, bec_queue_msg_full):
# test if abort buttons are not leaking, so making 3 times the same abort request and fixture should check the leak of the buttons automatically
bec_queue.update_queue(bec_queue_msg_full.content, {})
assert bec_queue.table.rowCount() == 1
assert bec_queue.table.item(0, 0).text() == "1289"
assert bec_queue.table.item(0, 1).text() == "line_scan"
assert bec_queue.table.item(0, 2).text() == "COMPLETED"
abort_button = bec_queue.table.cellWidget(0, 3)
abort_button.button.click()
bec_queue.update_queue(bec_queue_msg_full.content, {})
assert bec_queue.table.rowCount() == 1
assert bec_queue.table.item(0, 0).text() == "1289"
assert bec_queue.table.item(0, 1).text() == "line_scan"
assert bec_queue.table.item(0, 2).text() == "COMPLETED"
abort_button = bec_queue.table.cellWidget(0, 3)
abort_button.button.click()
bec_queue.update_queue(bec_queue_msg_full.content, {})
assert bec_queue.table.rowCount() == 1
assert bec_queue.table.item(0, 0).text() == "1289"
assert bec_queue.table.item(0, 1).text() == "line_scan"
assert bec_queue.table.item(0, 2).text() == "COMPLETED"
abort_button = bec_queue.table.cellWidget(0, 3)
abort_button.button.click()