Files
bec_widgets/tests/unit_tests/test_scan_progress_bar.py
T

600 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from unittest import mock
import numpy as np
import pytest
from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.widgets.progress.bec_progressbar.bec_progressbar import (
BECProgressBar,
ProgressState,
)
from bec_widgets.widgets.progress.scan_progressbar.scan_progressbar import (
ProgressSource,
ProgressTask,
ScanProgressBar,
)
from .client_mocks import mocked_client
@pytest.fixture
def scan_progressbar(qtbot, mocked_client):
    """Yield a ScanProgressBar built on the mocked client and registered with qtbot."""
    progress_widget = ScanProgressBar(client=mocked_client)
    qtbot.addWidget(progress_widget)
    # NOTE(review): pytest-qt documents waitExposed() as a context manager, so this
    # bare call presumably does not block -- confirm whether
    # `with qtbot.waitExposed(w): w.show()` was the intent.
    qtbot.waitExposed(progress_widget)
    yield progress_widget
@pytest.fixture
def scan_message():
    """Return a ScanQueueMessage describing a ``line_scan`` of ``samx`` on the primary queue."""
    request_metadata = {
        "file_suffix": None,
        "file_directory": None,
        "user_metadata": {"sample_name": ""},
        "RID": "94949c6e-d5f2-4f01-837e-a5d36257dd5d",
    }
    scan_kwargs = {
        "steps": 20,
        "relative": False,
        "exp_time": 0.1,
        "burst_at_each_point": 1,
        "system_config": {"file_suffix": None, "file_directory": None},
    }
    scan_parameter = {"args": {"samx": [-10.0, 10.0]}, "kwargs": scan_kwargs}
    return messages.ScanQueueMessage(
        metadata=request_metadata,
        scan_type="line_scan",
        parameter=scan_parameter,
        queue="primary",
    )
def test_progress_task_basic():
    """Check ProgressTask's percentage, speed, remaining count and formatted times."""
    progress = ProgressTask(parent=None, value=50, max_value=100, done=False)
    # Stop the timer so it cannot tick while the assertions run.
    progress.timer.stop()
    # Pretend 10 s have already elapsed.
    progress._elapsed_time = 10

    # 50 of 100 steps done -> 50 %.
    assert progress.percentage == 50

    # 50 steps in 10 s -> 5 steps per second.
    assert np.isclose(progress.speed, 5)

    # 50 steps are left; the expected remaining-time string is 10 s.
    assert progress.remaining == 50
    assert progress.time_remaining == "00:00:10"

    # Elapsed time uses the same HH:MM:SS formatting.
    assert progress.time_elapsed == "00:00:10"
def test_progress_task_elapsed_time_uses_monotonic_clock(monkeypatch):
    """With ``time.monotonic`` patched to give 100.0 then 102.5, 2.5 s should be reported."""
    fake_clock = iter((100.0, 102.5))
    monkeypatch.setattr(
        "bec_widgets.widgets.progress.scan_progressbar.scan_progressbar.time.monotonic",
        lambda: next(fake_clock),
    )

    progress = ProgressTask(parent=None)
    progress.timer.stop()
    progress.update_elapsed_time()

    assert progress._elapsed_time == 2.5
    # The formatted string is expected to show whole seconds only.
    assert progress.time_elapsed == "00:00:02"
def test_scan_progressbar_initialization(scan_progressbar):
    """The fixture yields a ScanProgressBar that embeds a BECProgressBar."""
    assert isinstance(scan_progressbar, ScanProgressBar)
    embedded_bar = scan_progressbar.progressbar
    assert isinstance(embedded_bar, BECProgressBar)
def test_scan_progressbar_passes_dynamic_stylesheet_setting(qtbot, mocked_client):
    """``enable_dynamic_stylesheet=False`` should be visible on the embedded progress bar."""
    progress_widget = ScanProgressBar(client=mocked_client, enable_dynamic_stylesheet=False)
    qtbot.addWidget(progress_widget)

    embedded_bar = progress_widget.progressbar
    assert embedded_bar.enable_dynamic_stylesheet is False
def test_update_labels_content(scan_progressbar):
    """update_labels() should copy the task's elapsed/remaining time strings into the UI."""
    # A task at 30 of 100 steps with 50 s on the clock.
    fabricated_task = ProgressTask(parent=scan_progressbar, value=30, max_value=100, done=False)
    fabricated_task.timer.stop()
    fabricated_task._elapsed_time = 50
    scan_progressbar.task = fabricated_task

    scan_progressbar.update_labels()

    ui = scan_progressbar.ui
    assert ui.elapsed_time_label.text() == "00:00:50"
    assert ui.remaining_time_label.text() == "00:01:57"
def test_on_progress_update(qtbot, scan_progressbar):
    """
    on_progress_update() should push the new value and maximum to the embedded
    BECProgressBar and leave it in the NORMAL state for an "open" status.
    """
    active_task = ProgressTask(parent=scan_progressbar, value=0, max_value=100, done=False)
    active_task.timer.stop()
    scan_progressbar.task = active_task

    payload = {"value": 20, "max_value": 100, "done": False}
    scan_progressbar.on_progress_update(payload, metadata={"status": "open"})
    # Let the Qt event loop run briefly before inspecting the bar.
    qtbot.wait(200)

    embedded_bar = scan_progressbar.progressbar
    assert embedded_bar._user_value == 20
    assert embedded_bar._user_maximum == 100
    # The bar state mirrors the BEC status passed in the metadata.
    assert embedded_bar.state is ProgressState.NORMAL
@pytest.mark.parametrize(
    "status, value, max_val, expected_state",
    [
        ("open", 10, 100, ProgressState.NORMAL),
        ("paused", 25, 100, ProgressState.PAUSED),
        ("aborted", 30, 100, ProgressState.INTERRUPTED),
        ("halted", 40, 100, ProgressState.PAUSED),
        ("closed", 100, 100, ProgressState.COMPLETED),
    ],
)
def test_state_mapping_during_updates(
    qtbot, scan_progressbar, status, value, max_val, expected_state
):
    """Each BEC status string should put the progress bar into the expected ProgressState."""
    active_task = ProgressTask(parent=scan_progressbar, value=0, max_value=max_val, done=False)
    active_task.timer.stop()
    scan_progressbar.task = active_task

    # Only the "closed" status is reported as done.
    is_done = status == "closed"
    payload = {"value": value, "max_value": max_val, "done": is_done}
    scan_progressbar.on_progress_update(payload, metadata={"status": status})

    assert scan_progressbar.progressbar.state is expected_state
def test_source_label_updates(scan_progressbar):
    """update_source_label() should show "Device <name>" or "Scan <number>" as appropriate."""
    source_label = scan_progressbar.ui.source_label

    # Device progress: the label names the device.
    scan_progressbar.update_source_label(ProgressSource.DEVICE_PROGRESS, device="motor")
    assert source_label.text() == "Device motor"

    # Scan progress: fix the scan number first so the text is deterministic.
    scan_progressbar.scan_number = 5
    scan_progressbar.update_source_label(ProgressSource.SCAN_PROGRESS)
    assert source_label.text() == "Scan 5"
def test_source_label_update_logs_only_on_text_change(scan_progressbar):
    """Selecting the same progress source three times should log the change only once."""
    scan_progressbar.scan_number = 5
    logger_info_target = (
        "bec_widgets.widgets.progress.scan_progressbar.scan_progressbar.logger.info"
    )

    with mock.patch(logger_info_target) as info_spy:
        for _ in range(3):
            scan_progressbar.set_progress_source(ProgressSource.SCAN_PROGRESS)

    info_spy.assert_called_once_with("Set progress source to Scan 5")
def test_set_progress_source_connections(scan_progressbar, monkeypatch):
    """
    set_progress_source() should connect the endpoint of the new source, disconnect
    the previous one, and make no further connection when the source is unchanged.
    """
    connected = []
    disconnected = []

    # Replace the dispatcher methods with recorders that only keep the endpoint.
    monkeypatch.setattr(
        scan_progressbar.bec_dispatcher,
        "connect_slot",
        lambda slot, endpoint: connected.append(endpoint),
    )
    monkeypatch.setattr(
        scan_progressbar.bec_dispatcher,
        "disconnect_slot",
        lambda slot, endpoint: disconnected.append(endpoint),
    )

    # Switch to SCAN_PROGRESS.
    scan_progressbar.scan_number = 7
    scan_progressbar.set_progress_source(ProgressSource.SCAN_PROGRESS)
    assert scan_progressbar._progress_source == ProgressSource.SCAN_PROGRESS
    assert scan_progressbar.ui.source_label.text() == "Scan 7"
    assert connected[-1] == MessageEndpoints.scan_progress()
    assert disconnected == []

    # Switch to DEVICE_PROGRESS; the scan endpoint must be released.
    device = "motor"
    scan_progressbar.set_progress_source(ProgressSource.DEVICE_PROGRESS, device=device)
    assert scan_progressbar._progress_source == ProgressSource.DEVICE_PROGRESS
    assert scan_progressbar.ui.source_label.text() == f"Device {device}"
    assert connected[-1] == MessageEndpoints.device_progress(device=device)
    assert disconnected == [MessageEndpoints.scan_progress()]

    # Selecting the same source again must not add another connection.
    connects_before = len(connected)
    scan_progressbar.set_progress_source(ProgressSource.DEVICE_PROGRESS, device=device)
    assert len(connected) == connects_before, "No extra connect made for same source"
def test_set_progress_source_disconnects_previous_device_subscription(
    scan_progressbar, monkeypatch
):
    """Switching from one device to another should disconnect the first device's endpoint."""
    disconnected = []

    def record_disconnect(slot, endpoint):
        disconnected.append(endpoint)

    dispatcher = scan_progressbar.bec_dispatcher
    monkeypatch.setattr(dispatcher, "connect_slot", lambda *args: None)
    monkeypatch.setattr(dispatcher, "disconnect_slot", record_disconnect)

    for device_name in ("motor1", "motor2"):
        scan_progressbar.set_progress_source(ProgressSource.DEVICE_PROGRESS, device=device_name)

    assert disconnected == [MessageEndpoints.device_progress(device="motor1")]
def test_set_progress_source_disconnects_device_when_switching_to_scan(
    scan_progressbar, monkeypatch
):
    """Switching from device progress to scan progress should disconnect the device endpoint."""
    disconnected = []

    def record_disconnect(slot, endpoint):
        disconnected.append(endpoint)

    dispatcher = scan_progressbar.bec_dispatcher
    monkeypatch.setattr(dispatcher, "connect_slot", lambda *args: None)
    monkeypatch.setattr(dispatcher, "disconnect_slot", record_disconnect)

    scan_progressbar.set_progress_source(ProgressSource.DEVICE_PROGRESS, device="motor1")
    scan_progressbar.set_progress_source(ProgressSource.SCAN_PROGRESS)

    assert disconnected == [MessageEndpoints.device_progress(device="motor1")]
def test_cleanup_disconnects_active_device_subscription(scan_progressbar, monkeypatch):
    """
    cleanup() should disconnect the active device endpoint and the queue-status
    endpoint, reset the stored source/device, and not call close() or
    deleteLater() on the widget itself.
    """
    disconnected = []

    def record_disconnect(slot, endpoint):
        disconnected.append(endpoint)

    dispatcher = scan_progressbar.bec_dispatcher
    monkeypatch.setattr(dispatcher, "connect_slot", lambda *args: None)
    monkeypatch.setattr(dispatcher, "disconnect_slot", record_disconnect)

    # Stub out the embedded bar's teardown and the base-class cleanup.
    embedded_bar = scan_progressbar.progressbar
    monkeypatch.setattr(embedded_bar, "close", lambda: None)
    monkeypatch.setattr(embedded_bar, "deleteLater", lambda: None)
    monkeypatch.setattr(BECWidget, "cleanup", lambda self: None)

    scan_progressbar.set_progress_source(ProgressSource.DEVICE_PROGRESS, device="motor1")

    # Wrap (rather than replace) the widget's own close/deleteLater to spy on calls.
    with mock.patch.object(
        scan_progressbar, "close", wraps=scan_progressbar.close
    ) as close_spy:
        with mock.patch.object(
            scan_progressbar, "deleteLater", wraps=scan_progressbar.deleteLater
        ) as delete_later_spy:
            ScanProgressBar.cleanup(scan_progressbar)

    expected_disconnects = [
        MessageEndpoints.device_progress(device="motor1"),
        MessageEndpoints.scan_queue_status(),
    ]
    assert disconnected == expected_disconnects
    assert scan_progressbar._progress_source is None
    assert scan_progressbar._progress_device is None
    close_spy.assert_not_called()
    delete_later_spy.assert_not_called()
def test_cleanup_stops_active_task(scan_progressbar, monkeypatch):
    """cleanup() should stop the task timer and clear both the task and the active scan id."""
    monkeypatch.setattr(BECWidget, "cleanup", lambda self: None)

    running_task = ProgressTask(parent=scan_progressbar)
    scan_progressbar.task = running_task
    scan_progressbar._active_scan_id = "scan-1"
    # Keep a handle on the timer: the task reference is expected to be cleared.
    task_timer = scan_progressbar.task.timer

    ScanProgressBar.cleanup(scan_progressbar)

    assert not task_timer.isActive()
    assert scan_progressbar.task is None
    assert scan_progressbar._active_scan_id is None
def test_progressbar_queue_update(scan_progressbar):
    """
    A queue update whose primary queue has no entries must not select a progress source.
    """
    empty_primary = messages.ScanQueueStatus(info=[], status="RUNNING")
    status_msg = messages.ScanQueueStatusMessage(queue={"primary": empty_primary})

    with mock.patch.object(scan_progressbar, "set_progress_source") as set_source_spy:
        scan_progressbar.on_queue_update(
            status_msg.content,
            status_msg.metadata,
            _override_slot_params={"verify_sender": False},
        )

    set_source_spy.assert_not_called()
def test_progressbar_queue_update_clears_task_when_queue_is_empty(scan_progressbar):
    """An empty primary queue should stop the running task and clear the active scan id."""
    scan_progressbar.task = ProgressTask(parent=scan_progressbar)
    scan_progressbar._active_scan_id = "scan-1"
    # Keep a handle on the timer: the task reference is expected to be cleared.
    task_timer = scan_progressbar.task.timer

    empty_primary = messages.ScanQueueStatus(info=[], status="RUNNING")
    status_msg = messages.ScanQueueStatusMessage(queue={"primary": empty_primary})
    scan_progressbar.on_queue_update(
        status_msg.content,
        status_msg.metadata,
        _override_slot_params={"verify_sender": False},
    )

    assert not task_timer.isActive()
    assert scan_progressbar.task is None
    assert scan_progressbar._active_scan_id is None
def test_progressbar_queue_update_clears_task_when_scan_is_not_running(
    scan_progressbar, scan_message
):
    """
    A queue entry whose status is "completed" should stop the running task, clear
    the active scan id, and not select a progress source.
    """
    scan_progressbar.task = ProgressTask(parent=scan_progressbar)
    scan_progressbar._active_scan_id = "scan-1"
    # Keep a handle on the timer: the task reference is expected to be cleared.
    task_timer = scan_progressbar.task.timer

    block = messages.RequestBlock(
        msg=scan_message,
        RID="some-rid",
        scan_motors=["samx"],
        readout_priority={"monitored": ["samx"]},
        is_scan=True,
        scan_number=1,
        scan_id="scan-1",
        report_instructions=[{"scan_progress": 20}],
    )
    finished_entry = messages.QueueInfoEntry(
        queue_id="queue-1",
        scan_id=["scan-1"],
        status="completed",
        active_request_block=block,
        is_scan=[True],
        request_blocks=[block],
        scan_number=[1],
    )
    primary_queue = messages.ScanQueueStatus(info=[finished_entry], status="RUNNING")
    status_msg = messages.ScanQueueStatusMessage(metadata={}, queue={"primary": primary_queue})

    with mock.patch.object(scan_progressbar, "set_progress_source") as set_source_spy:
        scan_progressbar.on_queue_update(
            status_msg.content,
            status_msg.metadata,
            _override_slot_params={"verify_sender": False},
        )

    assert not task_timer.isActive()
    assert scan_progressbar.task is None
    assert scan_progressbar._active_scan_id is None
    set_source_spy.assert_not_called()
def test_progressbar_queue_update_with_scan(scan_progressbar, scan_message):
    """
    A running queue entry with a ``scan_progress`` report instruction should select
    SCAN_PROGRESS as the progress source.
    """
    scan_uid = "e3f50794-852c-4bb1-965e-41c585ab0aa9"
    block = messages.RequestBlock(
        msg=scan_message,
        RID="some-rid",
        scan_motors=["samx"],
        readout_priority={"monitored": ["samx"]},
        is_scan=True,
        scan_number=1,
        scan_id=scan_uid,
        report_instructions=[{"scan_progress": 20}],
    )
    running_entry = messages.QueueInfoEntry(
        queue_id="40831e2c-fbd1-4432-8072-ad168a7ad964",
        scan_id=[scan_uid],
        status="RUNNING",
        active_request_block=block,
        is_scan=[True],
        request_blocks=[block],
        scan_number=[1],
    )
    primary_queue = messages.ScanQueueStatus(info=[running_entry], status="RUNNING")
    status_msg = messages.ScanQueueStatusMessage(metadata={}, queue={"primary": primary_queue})

    with mock.patch.object(scan_progressbar, "set_progress_source") as set_source_spy:
        scan_progressbar.on_queue_update(
            status_msg.content,
            status_msg.metadata,
            _override_slot_params={"verify_sender": False},
        )

    set_source_spy.assert_called_once_with(ProgressSource.SCAN_PROGRESS)
def test_progressbar_queue_update_starts_new_task_for_new_scan(scan_progressbar, scan_message):
    """
    Two queue updates carrying different scan ids should each emit progress_started,
    and the second should stop the first task's timer and install a new task.
    """
    started_spy = mock.Mock()
    scan_progressbar.progress_started.connect(started_spy)

    def build_queue_msg(scan_id: str, scan_number: int):
        """Build a status message with a single running entry for the given scan."""
        block = messages.RequestBlock(
            msg=scan_message,
            RID=f"rid-{scan_number}",
            scan_motors=["samx"],
            readout_priority={"monitored": ["samx"]},
            is_scan=True,
            scan_number=scan_number,
            scan_id=scan_id,
            report_instructions=[{"scan_progress": 20}],
        )
        running_entry = messages.QueueInfoEntry(
            queue_id=f"queue-{scan_number}",
            scan_id=[scan_id],
            status="RUNNING",
            active_request_block=block,
            is_scan=[True],
            request_blocks=[block],
            scan_number=[scan_number],
        )
        primary_queue = messages.ScanQueueStatus(info=[running_entry], status="RUNNING")
        return messages.ScanQueueStatusMessage(metadata={}, queue={"primary": primary_queue})

    # First scan: a task is created and its timer is running.
    first_update = build_queue_msg("scan-1", 1)
    scan_progressbar.on_queue_update(
        first_update.content,
        first_update.metadata,
        _override_slot_params={"verify_sender": False},
    )
    initial_task = scan_progressbar.task
    assert initial_task is not None
    assert initial_task.timer.isActive()

    # Second scan with a different id: the old task is retired and replaced.
    second_update = build_queue_msg("scan-2", 2)
    scan_progressbar.on_queue_update(
        second_update.content,
        second_update.metadata,
        _override_slot_params={"verify_sender": False},
    )

    assert started_spy.call_count == 2
    assert not initial_task.timer.isActive()
    assert scan_progressbar.task is not initial_task
    assert scan_progressbar._active_scan_id == "scan-2"
def test_progressbar_queue_update_with_device(scan_progressbar, scan_message):
    """
    A running queue entry with a ``device_progress`` report instruction should select
    DEVICE_PROGRESS for the listed device.
    """
    scan_uid = "e3f50794-852c-4bb1-965e-41c585ab0aa9"
    block = messages.RequestBlock(
        msg=scan_message,
        RID="some-rid",
        scan_motors=["samx"],
        readout_priority={"monitored": ["samx"]},
        is_scan=True,
        scan_number=1,
        scan_id=scan_uid,
        report_instructions=[{"device_progress": ["samx"]}],
    )
    running_entry = messages.QueueInfoEntry(
        queue_id="40831e2c-fbd1-4432-8072-ad168a7ad964",
        scan_id=[scan_uid],
        status="RUNNING",
        active_request_block=block,
        is_scan=[True],
        request_blocks=[block],
        scan_number=[1],
    )
    primary_queue = messages.ScanQueueStatus(info=[running_entry], status="RUNNING")
    status_msg = messages.ScanQueueStatusMessage(metadata={}, queue={"primary": primary_queue})

    with mock.patch.object(scan_progressbar, "set_progress_source") as set_source_spy:
        scan_progressbar.on_queue_update(
            status_msg.content,
            status_msg.metadata,
            _override_slot_params={"verify_sender": False},
        )

    set_source_spy.assert_called_once_with(ProgressSource.DEVICE_PROGRESS, device="samx")
def test_progressbar_queue_update_ignores_empty_device_progress(scan_progressbar, scan_message):
    """A ``device_progress`` instruction with an empty device list must not select a source."""
    scan_uid = "e3f50794-852c-4bb1-965e-41c585ab0aa9"
    block = messages.RequestBlock(
        msg=scan_message,
        RID="some-rid",
        scan_motors=["samx"],
        readout_priority={"monitored": ["samx"]},
        is_scan=True,
        scan_number=1,
        scan_id=scan_uid,
        report_instructions=[{"device_progress": []}],
    )
    running_entry = messages.QueueInfoEntry(
        queue_id="40831e2c-fbd1-4432-8072-ad168a7ad964",
        scan_id=[scan_uid],
        status="RUNNING",
        active_request_block=block,
        is_scan=[True],
        request_blocks=[block],
        scan_number=[1],
    )
    primary_queue = messages.ScanQueueStatus(info=[running_entry], status="RUNNING")
    status_msg = messages.ScanQueueStatusMessage(metadata={}, queue={"primary": primary_queue})

    with mock.patch.object(scan_progressbar, "set_progress_source") as set_source_spy:
        scan_progressbar.on_queue_update(
            status_msg.content,
            status_msg.metadata,
            _override_slot_params={"verify_sender": False},
        )

    set_source_spy.assert_not_called()
def test_progressbar_queue_update_with_no_scan_or_device(scan_progressbar, scan_message):
    """
    A running queue entry whose request block carries no report instructions must
    not select a progress source.
    """
    scan_uid = "e3f50794-852c-4bb1-965e-41c585ab0aa9"
    # No report_instructions argument: neither scan nor device progress is requested.
    block = messages.RequestBlock(
        msg=scan_message,
        RID="some-rid",
        scan_motors=["samx"],
        readout_priority={"monitored": ["samx"]},
        is_scan=True,
        scan_number=1,
        scan_id=scan_uid,
    )
    running_entry = messages.QueueInfoEntry(
        queue_id="40831e2c-fbd1-4432-8072-ad168a7ad964",
        scan_id=[scan_uid],
        status="RUNNING",
        active_request_block=block,
        is_scan=[True],
        request_blocks=[block],
        scan_number=[1],
    )
    primary_queue = messages.ScanQueueStatus(info=[running_entry], status="RUNNING")
    status_msg = messages.ScanQueueStatusMessage(metadata={}, queue={"primary": primary_queue})

    with mock.patch.object(scan_progressbar, "set_progress_source") as set_source_spy:
        scan_progressbar.on_queue_update(
            status_msg.content,
            status_msg.metadata,
            _override_slot_params={"verify_sender": False},
        )

    set_source_spy.assert_not_called()