mirror of
https://github.com/bec-project/bec_widgets.git
synced 2025-07-14 03:31:50 +02:00
145 lines
5.8 KiB
Python
145 lines
5.8 KiB
Python
import pytest
|
|
from bec_lib import messages
|
|
|
|
from bec_widgets.widgets.services.bec_queue.bec_queue import BECQueue
|
|
|
|
from .client_mocks import mocked_client
|
|
|
|
|
|
@pytest.fixture
|
|
def bec_queue_msg_full():
|
|
content = {
|
|
"primary": {
|
|
"info": [
|
|
{
|
|
"active_request_block": None,
|
|
"is_scan": [True],
|
|
"queue_id": "600163fc-5e56-4901-af25-14e9ee76817c",
|
|
"request_blocks": [
|
|
{
|
|
"RID": "89a76021-28c0-4297-828e-74ae40b941e5",
|
|
"content": {
|
|
"parameter": {
|
|
"args": {"samx": [-0.1, 0.1]},
|
|
"kwargs": {
|
|
"exp_time": 0.5,
|
|
"relative": True,
|
|
"steps": 20,
|
|
"system_config": {
|
|
"file_directory": None,
|
|
"file_suffix": None,
|
|
},
|
|
},
|
|
},
|
|
"queue": "primary",
|
|
"scan_type": "line_scan",
|
|
},
|
|
"is_scan": True,
|
|
"metadata": {
|
|
"RID": "89a76021-28c0-4297-828e-74ae40b941e5",
|
|
"file_directory": None,
|
|
"file_suffix": None,
|
|
"user_metadata": {"sample_name": "testA"},
|
|
},
|
|
"msg": messages.ScanQueueMessage(
|
|
metadata={
|
|
"file_suffix": None,
|
|
"file_directory": None,
|
|
"user_metadata": {"sample_name": "testA"},
|
|
"RID": "89a76021-28c0-4297-828e-74ae40b941e5",
|
|
},
|
|
scan_type="line_scan",
|
|
parameter={
|
|
"args": {"samx": [-0.1, 0.1]},
|
|
"kwargs": {
|
|
"steps": 20,
|
|
"exp_time": 0.5,
|
|
"relative": True,
|
|
"system_config": {
|
|
"file_suffix": None,
|
|
"file_directory": None,
|
|
},
|
|
},
|
|
},
|
|
queue="primary",
|
|
),
|
|
"readout_priority": {
|
|
"async": [],
|
|
"baseline": [],
|
|
"monitored": ["samx"],
|
|
"on_request": [],
|
|
},
|
|
"report_instructions": [{"scan_progress": 20}],
|
|
"scan_id": "2d704cc3-c172-404c-866d-608ce09fce40",
|
|
"scan_motors": ["samx"],
|
|
"scan_number": 1289,
|
|
}
|
|
],
|
|
"scan_id": ["2d704cc3-c172-404c-866d-608ce09fce40"],
|
|
"scan_number": [1289],
|
|
"status": "COMPLETED",
|
|
}
|
|
],
|
|
"status": "RUNNING",
|
|
}
|
|
}
|
|
msg = messages.ScanQueueStatusMessage(metadata={}, queue=content)
|
|
return msg
|
|
|
|
|
|
@pytest.fixture
|
|
def bec_queue(qtbot, mocked_client):
|
|
widget = BECQueue(client=mocked_client, refresh_upon_start=False)
|
|
qtbot.addWidget(widget)
|
|
qtbot.waitExposed(widget)
|
|
yield widget
|
|
|
|
|
|
def test_bec_queue(bec_queue, bec_queue_msg_full):
|
|
bec_queue.update_queue(bec_queue_msg_full.content, {})
|
|
assert bec_queue.table.rowCount() == 1
|
|
assert bec_queue.table.item(0, 0).text() == "1289"
|
|
assert bec_queue.table.item(0, 1).text() == "line_scan"
|
|
assert bec_queue.table.item(0, 2).text() == "COMPLETED"
|
|
|
|
|
|
def test_bec_queue_empty(bec_queue):
|
|
bec_queue.update_queue({}, {})
|
|
assert bec_queue.table.rowCount() == 1
|
|
assert bec_queue.table.item(0, 0).text() == ""
|
|
assert bec_queue.table.item(0, 1).text() == ""
|
|
assert bec_queue.table.item(0, 2).text() == ""
|
|
|
|
|
|
def test_queue_abort(bec_queue, bec_queue_msg_full):
|
|
# test if abort buttons are not leaking, so making 3 times the same abort request and fixture should check the leak of the buttons automatically
|
|
|
|
bec_queue.update_queue(bec_queue_msg_full.content, {})
|
|
assert bec_queue.table.rowCount() == 1
|
|
assert bec_queue.table.item(0, 0).text() == "1289"
|
|
assert bec_queue.table.item(0, 1).text() == "line_scan"
|
|
assert bec_queue.table.item(0, 2).text() == "COMPLETED"
|
|
|
|
abort_button = bec_queue.table.cellWidget(0, 3)
|
|
abort_button.button.click()
|
|
|
|
bec_queue.update_queue(bec_queue_msg_full.content, {})
|
|
|
|
assert bec_queue.table.rowCount() == 1
|
|
assert bec_queue.table.item(0, 0).text() == "1289"
|
|
assert bec_queue.table.item(0, 1).text() == "line_scan"
|
|
assert bec_queue.table.item(0, 2).text() == "COMPLETED"
|
|
|
|
abort_button = bec_queue.table.cellWidget(0, 3)
|
|
abort_button.button.click()
|
|
|
|
bec_queue.update_queue(bec_queue_msg_full.content, {})
|
|
|
|
assert bec_queue.table.rowCount() == 1
|
|
assert bec_queue.table.item(0, 0).text() == "1289"
|
|
assert bec_queue.table.item(0, 1).text() == "line_scan"
|
|
assert bec_queue.table.item(0, 2).text() == "COMPLETED"
|
|
|
|
abort_button = bec_queue.table.cellWidget(0, 3)
|
|
abort_button.button.click()
|