From 9eb51e99c7fa370596acb5b58b6746eb81fb4c54 Mon Sep 17 00:00:00 2001 From: wyzula-jan Date: Tue, 26 May 2026 21:18:34 +0200 Subject: [PATCH] feat(progress): progress is tracked from bec; unified progress backend --- .../bec_progressbar/bec_progressbar.py | 43 ++- .../widgets/progress/progress_backend.py | 315 +++++++++++++++ .../progress/ring_progress_bar/ring.py | 73 ++-- .../scan_progressbar/scan_progressbar.py | 317 ++++----------- tests/unit_tests/test_bec_progressbar.py | 9 +- ...test_device_initialization_progress_bar.py | 6 +- .../unit_tests/test_ring_progress_bar_ring.py | 14 +- tests/unit_tests/test_scan_progress_bar.py | 361 ++---------------- 8 files changed, 514 insertions(+), 624 deletions(-) create mode 100644 bec_widgets/widgets/progress/progress_backend.py diff --git a/bec_widgets/widgets/progress/bec_progressbar/bec_progressbar.py b/bec_widgets/widgets/progress/bec_progressbar/bec_progressbar.py index 51d7e158..03076173 100644 --- a/bec_widgets/widgets/progress/bec_progressbar/bec_progressbar.py +++ b/bec_widgets/widgets/progress/bec_progressbar/bec_progressbar.py @@ -20,7 +20,7 @@ class ProgressState(Enum): @classmethod def from_bec_status(cls, status: str) -> "ProgressState": """ - Map a BEC status string (open, paused, aborted, halted, closed) + Map a BEC status string (open, paused, aborted, halt/halted, closed, user_completed) to the corresponding ProgressState. Any unknown status falls back to NORMAL. """ @@ -28,8 +28,10 @@ class ProgressState(Enum): "open": cls.NORMAL, "paused": cls.PAUSED, "aborted": cls.INTERRUPTED, + "halt": cls.PAUSED, "halted": cls.PAUSED, "closed": cls.COMPLETED, + "user_completed": cls.PAUSED, } return mapping.get(status.lower(), cls.NORMAL) @@ -104,9 +106,6 @@ class BECProgressBar(BECWidget, QWidget): self.progressbar.setMinimumHeight(0) self.progressbar.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Ignored) - # Backwards-compatible alias used by existing tests and downstream code. - self.center_label = self.progressbar - self._layout = QVBoxLayout(self) self._layout.setContentsMargins(self._padding_left_right, 0, self._padding_left_right, 0) self._layout.setSpacing(0) @@ -339,6 +338,7 @@ class BECProgressBar(BECWidget, QWidget): def _setup_style_sheet(self, *, chunk_radius: int) -> None: radius = int(round(self._corner_radius)) + chunk_color = self._state_colors[self._current_visual_state()].name() self.progressbar.setStyleSheet(f""" QProgressBar {{ background-color: palette(mid); @@ -348,7 +348,7 @@ class BECProgressBar(BECWidget, QWidget): text-align: center; }} QProgressBar::chunk {{ - background-color: palette(highlight); + background-color: {chunk_color}; border-radius: {chunk_radius}px; }} """) @@ -385,6 +385,16 @@ class BECProgressBar(BECWidget, QWidget): return min(target_radius, max(1, int(fill_width / 2))) def _apply_state_style(self) -> None: + chunk_radius = self._chunk_radius + if chunk_radius is None: + target_radius = self._target_chunk_radius() + chunk_radius = ( + self._calculate_chunk_radius(target_radius) + if self._enable_dynamic_stylesheet + else target_radius + ) + self._chunk_radius = chunk_radius + self._setup_style_sheet(chunk_radius=chunk_radius) color = self._state_colors[self._current_visual_state()] palette = self.progressbar.palette() palette.setColor(QPalette.ColorRole.Highlight, color) @@ -406,20 +416,23 @@ class BECProgressBar(BECWidget, QWidget): if __name__ == "__main__": # pragma: no cover app = QApplication(sys.argv) - progressBar = BECProgressBar() - progressBar.show() - progressBar.set_minimum(-100) - progressBar.set_maximum(0) + progress_bar = BECProgressBar() + progress_bar.setWindowTitle("BEC Progress Bar") + progress_bar.resize(360, 48) + progress_bar.set_minimum(-100) + progress_bar.set_maximum(0) + progress_bar.set_value(-100) + progress_bar.show() # Example of setting values def update_progress(): - value = progressBar._user_value + 2.5 - if value > progressBar._user_maximum: - value = -100 # progressBar._maximum / progressBar._upsampling_factor - progressBar.set_value(value) + value = progress_bar._user_value + 2.5 + if value > progress_bar._user_maximum: + value = progress_bar._user_minimum + progress_bar.set_value(value) - timer = QTimer() + timer = QTimer(progress_bar) timer.timeout.connect(update_progress) - timer.start(200) # Update every half second + timer.start(200) sys.exit(app.exec()) diff --git a/bec_widgets/widgets/progress/progress_backend.py b/bec_widgets/widgets/progress/progress_backend.py new file mode 100644 index 00000000..c4f84272 --- /dev/null +++ b/bec_widgets/widgets/progress/progress_backend.py @@ -0,0 +1,315 @@ +from __future__ import annotations + +import enum +import time +from dataclasses import dataclass +from typing import Literal + +import numpy as np +from bec_lib.endpoints import MessageEndpoints +from qtpy.QtCore import QObject, QTimer, Signal + + +class ProgressSource(enum.Enum): + """ + Enum to define the source of the progress. + """ + + SCAN_PROGRESS = "scan_progress" + DEVICE_PROGRESS = "device_progress" + + +@dataclass(frozen=True) +class ProgressSnapshot: + source: ProgressSource + value: float + max_value: float + done: bool + status: Literal["open", "paused", "aborted", "halt", "halted", "closed", "user_completed"] + device: str | None = None + scan_id: str | None = None + scan_number: int | None = None + rid: str | None = None + is_new_scan: bool = False + + +class ProgressTask(QObject): + """ + Class to store progress information. + Inspired by https://github.com/Textualize/rich/blob/master/rich/progress.py + """ + + def __init__( + self, parent: QObject | None, value: float = 0, max_value: float = 0, done: bool = False + ): + super().__init__(parent=parent) + self.start_time = time.monotonic() + self.done = done + self.value = value + self.max_value = max_value + self._elapsed_time = 0 + + self.timer = QTimer(self) + self.timer.timeout.connect(self.update_elapsed_time) + self.timer.start(1000) + + def update(self, value: float, max_value: float, done: bool = False): + """ + Update the progress. + """ + self.max_value = max_value + self.done = done + self.value = value + if done: + self.timer.stop() + + def update_elapsed_time(self): + """ + Update the time estimates. This is called every second by a QTimer. + """ + self._elapsed_time = max(0.0, time.monotonic() - self.start_time) + + @property + def percentage(self) -> float: + """float: Get progress of task as a percentage. If a None total was set, returns 0""" + if not self.max_value: + return 0.0 + completed = (self.value / self.max_value) * 100.0 + completed = min(100.0, max(0.0, completed)) + return completed + + @property + def speed(self) -> float: + """Get the estimated speed in steps per second.""" + if self._elapsed_time == 0: + return 0.0 + + return self.value / self._elapsed_time + + @property + def frequency(self) -> float: + """Get the estimated frequency in steps per second.""" + if self.speed == 0: + return 0.0 + return 1 / self.speed + + @property + def time_elapsed(self) -> str: + return self._format_time(int(self._elapsed_time)) + + @property + def remaining(self) -> float: + """Get the estimated remaining steps.""" + if self.done: + return 0.0 + remaining = self.max_value - self.value + return remaining + + @property + def time_remaining(self) -> str: + """ + Get the estimated remaining time in the format HH:MM:SS. + """ + if self.done or not self.speed or not self.remaining: + return self._format_time(0) + estimate = int(np.round(self.remaining / self.speed)) + + return self._format_time(estimate) + + def _format_time(self, seconds: float) -> str: + """ + Format the time in seconds to a string in the format HH:MM:SS. + """ + return f"{seconds // 3600:02}:{(seconds // 60) % 60:02}:{seconds % 60:02}" + + +class BECProgressTracker(QObject): + """ + Shared backend for BEC scan and device progress messages. + """ + + progress_started = Signal(object) + progress_updated = Signal(object) + progress_finished = Signal(object) + progress_cleared = Signal() + source_changed = Signal(object) + + def __init__(self, bec_dispatcher, parent: QObject | None = None): + super().__init__(parent=parent) + self.bec_dispatcher = bec_dispatcher + self._progress_source: ProgressSource | None = None + self._progress_device: str | None = None + self.task: ProgressTask | None = None + self.scan_number: int | None = None + self._active_scan_id: str | None = None + self._active_rid: str | None = None + + @property + def progress_source(self) -> ProgressSource | None: + return self._progress_source + + @property + def progress_device(self) -> str | None: + return self._progress_device + + @property + def active_scan_id(self) -> str | None: + return self._active_scan_id + + @property + def active_rid(self) -> str | None: + return self._active_rid + + def start( + self, + *, + source: ProgressSource | None = ProgressSource.SCAN_PROGRESS, + device: str | None = None, + ) -> None: + if source is not None: + self.set_progress_source(source, device=device) + + def set_progress_source(self, source: ProgressSource, device: str | None = None) -> None: + if source == ProgressSource.DEVICE_PROGRESS and not device: + return + if self._progress_source == source and self._progress_device == device: + self.source_changed.emit(self.current_snapshot(value=0, max_value=100, done=False)) + return + + self._disconnect_progress_source() + self._progress_source = source + self._progress_device = None if source == ProgressSource.SCAN_PROGRESS else device + self.bec_dispatcher.connect_slot(self.on_progress_update, self._progress_endpoint()) + self.source_changed.emit(self.current_snapshot(value=0, max_value=100, done=False)) + + def _disconnect_progress_source(self) -> None: + if self._progress_source is None: + return + self.bec_dispatcher.disconnect_slot(self.on_progress_update, self._progress_endpoint()) + self._progress_source = None + self._progress_device = None + + def _progress_endpoint(self): + if self._progress_source == ProgressSource.SCAN_PROGRESS: + return MessageEndpoints.scan_progress() + return MessageEndpoints.device_progress(device=self._progress_device) + + def current_snapshot( + self, + *, + value: float, + max_value: float, + done: bool, + status: Literal[ + "open", "paused", "aborted", "halt", "halted", "closed", "user_completed" + ] = "open", + is_new_scan: bool = False, + ) -> ProgressSnapshot: + source = self._progress_source or ProgressSource.SCAN_PROGRESS + return ProgressSnapshot( + source=source, + value=value, + max_value=max_value, + done=done, + status=status, + device=self._progress_device, + scan_id=self._active_scan_id, + scan_number=self.scan_number, + rid=self._active_rid, + is_new_scan=is_new_scan, + ) + + def _start_task(self, scan_id: str | None, rid: str | None = None) -> None: + if self.task is not None: + self.task.timer.stop() + self.task.deleteLater() + self.task = ProgressTask(parent=self) + self._active_scan_id = scan_id + self._active_rid = rid + self.progress_started.emit(self.current_snapshot(value=0, max_value=100, done=False)) + + def clear_task(self, *, emit_finished: bool = True) -> None: + if self.task is None: + self._active_scan_id = None + self._active_rid = None + self.progress_cleared.emit() + return + self.task.timer.stop() + self.task.deleteLater() + self.task = None + self._active_scan_id = None + self._active_rid = None + self.progress_cleared.emit() + if emit_finished: + self.progress_finished.emit(self.current_snapshot(value=0, max_value=100, done=True)) + + def on_progress_update(self, msg_content: dict, metadata: dict): + if self._progress_source is None: + return + self.process_progress_message(self._progress_source, msg_content, metadata) + + def process_progress_message( + self, + source: ProgressSource, + msg_content: dict, + metadata: dict, + *, + device: str | None = None, + ) -> ProgressSnapshot | None: + done = msg_content.get("done", False) + value = msg_content.get("value", 0) + max_value = msg_content.get("max_value", 100) + status: Literal[ + "open", "paused", "aborted", "halt", "halted", "closed", "user_completed" + ] = metadata.get("status", "open") + if done and source == ProgressSource.DEVICE_PROGRESS: + value = max_value + scan_id = metadata.get("scan_id") or metadata.get("RID") + rid = metadata.get("RID") + scan_number = metadata.get("scan_number") + if scan_number is not None: + self.scan_number = scan_number + is_new_scan = False + previous_scan_id = self._active_scan_id + previous_rid = self._active_rid + identity_changed = ( + (scan_id is not None and scan_id != previous_scan_id) + or (rid is not None and rid != previous_rid) + or (previous_scan_id is None and previous_rid is None) + ) + + if self.task is None: + self._start_task(scan_id, rid=rid) + is_new_scan = identity_changed + elif scan_id is not None and scan_id != self._active_scan_id: + self._start_task(scan_id, rid=rid) + is_new_scan = True + elif rid is not None and rid != self._active_rid: + self._start_task(scan_id or self._active_scan_id, rid=rid) + is_new_scan = True + + if self.task is None: + return None + + self.task.update(value, max_value, done) + progress_device = device or self._progress_device + snapshot = ProgressSnapshot( + source=source, + value=value, + max_value=max_value, + done=done, + status=status, + device=progress_device if source == ProgressSource.DEVICE_PROGRESS else None, + scan_id=self._active_scan_id, + scan_number=self.scan_number, + rid=self._active_rid, + is_new_scan=is_new_scan, + ) + self.progress_updated.emit(snapshot) + if done: + self.clear_task() + return snapshot + + def cleanup(self) -> None: + self.clear_task(emit_finished=False) + self._disconnect_progress_source() diff --git a/bec_widgets/widgets/progress/ring_progress_bar/ring.py b/bec_widgets/widgets/progress/ring_progress_bar/ring.py index c998c953..48cee10c 100644 --- a/bec_widgets/widgets/progress/ring_progress_bar/ring.py +++ b/bec_widgets/widgets/progress/ring_progress_bar/ring.py @@ -13,6 +13,11 @@ from bec_widgets import BECWidget from bec_widgets.utils.bec_connector import ConnectionConfig from bec_widgets.utils.colors import Colors from bec_widgets.utils.error_popups import SafeProperty, SafeSlot +from bec_widgets.widgets.progress.progress_backend import ( + BECProgressTracker, + ProgressSnapshot, + ProgressSource, +) logger = bec_logger.logger if TYPE_CHECKING: @@ -81,6 +86,8 @@ class Ring(BECWidget, QWidget): self._color: QColor = self.convert_color(self.config.color) self._background_color: QColor = self.convert_color(self.config.background_color) self.registered_slot: tuple[Callable, str | EndpointInfo] | None = None + self.progress_tracker = BECProgressTracker(self.bec_dispatcher, parent=self) + self.progress_tracker.progress_updated.connect(self._on_progress_snapshot) self.RID = None self._gap = 5 self._hovered = False @@ -219,26 +226,18 @@ class Ring(BECWidget, QWidget): case "manual": if self.config.mode == "manual": return - if self.registered_slot is not None: - self.bec_dispatcher.disconnect_slot(*self.registered_slot) + self._disconnect_registered_update() self.config.mode = "manual" - self.registered_slot = None case "scan": if self.config.mode == "scan": return - if self.registered_slot is not None: - self.bec_dispatcher.disconnect_slot(*self.registered_slot) + self._disconnect_registered_update() self.config.mode = "scan" - self.bec_dispatcher.connect_slot( - self.on_scan_progress, MessageEndpoints.scan_progress() - ) - self.registered_slot = (self.on_scan_progress, MessageEndpoints.scan_progress()) + self.progress_tracker.start(source=ProgressSource.SCAN_PROGRESS) case "device": - if self.registered_slot is not None: - self.bec_dispatcher.disconnect_slot(*self.registered_slot) + self._disconnect_registered_update() self.config.mode = "device" if device == "": - self.registered_slot = None return self.config.device = device # self.config.signal = self._get_signal_from_device(device, signal) @@ -248,6 +247,12 @@ class Ring(BECWidget, QWidget): case _: raise ValueError(f"Unsupported mode: {mode}") + def _disconnect_registered_update(self): + if self.registered_slot is not None: + self.bec_dispatcher.disconnect_slot(*self.registered_slot) + self.registered_slot = None + self.progress_tracker.cleanup() + def set_precision(self, precision: int): """ Set the precision for the ring widget. @@ -362,15 +367,14 @@ class Ring(BECWidget, QWidget): return "" if signal in progress_signals: - endpoint = MessageEndpoints.device_progress(device) - self.bec_dispatcher.connect_slot(self.on_device_progress, endpoint) - self.registered_slot = (self.on_device_progress, endpoint) + self.progress_tracker.start(source=ProgressSource.DEVICE_PROGRESS, device=device) return signal if signal in hinted_signals or signal in normal_signals: endpoint = MessageEndpoints.device_readback(device) self.bec_dispatcher.connect_slot(self.on_device_readback, endpoint) self.registered_slot = (self.on_device_readback, endpoint) return signal + return "" @SafeSlot(dict, dict) def on_scan_progress(self, msg, meta): @@ -381,11 +385,10 @@ class Ring(BECWidget, QWidget): msg(dict): Message with the scan progress meta(dict): Metadata for the message """ - current_RID = meta.get("RID", None) - if current_RID != self.RID: - self.set_min_max_values(0, msg.get("max_value", 100)) - self.set_value(msg.get("value", 0)) - self.update() + if self.progress_tracker.active_rid is None and self.RID is not None: + self.progress_tracker._active_rid = self.RID + self.progress_tracker._active_scan_id = self.RID + self.progress_tracker.process_progress_message(ProgressSource.SCAN_PROGRESS, msg, meta) @SafeSlot(dict, dict) def on_device_readback(self, msg, meta): @@ -418,12 +421,20 @@ class Ring(BECWidget, QWidget): device = self.config.device if device is None: return - max_val = msg.get("max_value", 100) - self.set_min_max_values(0, max_val) - value = msg.get("value", 0) - if msg.get("done"): - value = max_val - self.set_value(value) + self.progress_tracker.process_progress_message( + ProgressSource.DEVICE_PROGRESS, msg, meta, device=device + ) + + def _on_progress_snapshot(self, snapshot: ProgressSnapshot): + if snapshot.source == ProgressSource.SCAN_PROGRESS: + if snapshot.is_new_scan: + self.set_min_max_values(0, snapshot.max_value) + self.RID = snapshot.rid + else: + if self.config.device is None: + return + self.set_min_max_values(0, snapshot.max_value) + self.set_value(snapshot.value) self.update() def paintEvent(self, event): @@ -509,15 +520,6 @@ class Ring(BECWidget, QWidget): return QtGui.QColor(*color) raise ValueError(f"Unsupported color format: {color}") - def cleanup(self): - """ - Cleanup the ring widget. - Disconnect any registered slots. - """ - if self.registered_slot is not None: - self.bec_dispatcher.disconnect_slot(*self.registered_slot) - self.registered_slot = None - ############################################### ####### QProperties ########################### ############################################### @@ -666,6 +668,7 @@ class Ring(BECWidget, QWidget): if self.registered_slot is not None: self.bec_dispatcher.disconnect_slot(*self.registered_slot) self.registered_slot = None + self.progress_tracker.cleanup() self._hover_animation.stop() super().cleanup() diff --git a/bec_widgets/widgets/progress/scan_progressbar/scan_progressbar.py b/bec_widgets/widgets/progress/scan_progressbar/scan_progressbar.py index 3b08d33e..f4b2603c 100644 --- a/bec_widgets/widgets/progress/scan_progressbar/scan_progressbar.py +++ b/bec_widgets/widgets/progress/scan_progressbar/scan_progressbar.py @@ -1,123 +1,25 @@ from __future__ import annotations -import enum import os -import time -from typing import Literal -import numpy as np -from bec_lib import messages -from bec_lib.endpoints import MessageEndpoints from bec_lib.logger import bec_logger -from qtpy.QtCore import QObject, QTimer, Signal +from qtpy.QtCore import Signal from qtpy.QtWidgets import QVBoxLayout, QWidget from bec_widgets.utils.bec_widget import BECWidget from bec_widgets.utils.error_popups import SafeProperty, SafeSlot from bec_widgets.utils.ui_loader import UILoader from bec_widgets.widgets.progress.bec_progressbar.bec_progressbar import ProgressState +from bec_widgets.widgets.progress.progress_backend import ( + BECProgressTracker, + ProgressSnapshot, + ProgressSource, + ProgressTask, +) logger = bec_logger.logger -class ProgressSource(enum.Enum): - """ - Enum to define the source of the progress. - """ - - SCAN_PROGRESS = "scan_progress" - DEVICE_PROGRESS = "device_progress" - - -class ProgressTask(QObject): - """ - Class to store progress information. - Inspired by https://github.com/Textualize/rich/blob/master/rich/progress.py - """ - - def __init__(self, parent: QWidget, value: float = 0, max_value: float = 0, done: bool = False): - super().__init__(parent=parent) - self.start_time = time.monotonic() - self.done = done - self.value = value - self.max_value = max_value - self._elapsed_time = 0 - - self.timer = QTimer(self) - self.timer.timeout.connect(self.update_elapsed_time) - self.timer.start(1000) - - def update(self, value: float, max_value: float, done: bool = False): - """ - Update the progress. - """ - self.max_value = max_value - self.done = done - self.value = value - if done: - self.timer.stop() - - def update_elapsed_time(self): - """ - Update the time estimates. This is called every second by a QTimer. - """ - self._elapsed_time = max(0.0, time.monotonic() - self.start_time) - - @property - def percentage(self) -> float: - """float: Get progress of task as a percentage. If a None total was set, returns 0""" - if not self.max_value: - return 0.0 - completed = (self.value / self.max_value) * 100.0 - completed = min(100.0, max(0.0, completed)) - return completed - - @property - def speed(self) -> float: - """Get the estimated speed in steps per second.""" - if self._elapsed_time == 0: - return 0.0 - - return self.value / self._elapsed_time - - @property - def frequency(self) -> float: - """Get the estimated frequency in steps per second.""" - if self.speed == 0: - return 0.0 - return 1 / self.speed - - @property - def time_elapsed(self) -> str: - # format the elapsed time to a string in the format HH:MM:SS - return self._format_time(int(self._elapsed_time)) - - @property - def remaining(self) -> float: - """Get the estimated remaining steps.""" - if self.done: - return 0.0 - remaining = self.max_value - self.value - return remaining - - @property - def time_remaining(self) -> str: - """ - Get the estimated remaining time in the format HH:MM:SS. - """ - if self.done or not self.speed or not self.remaining: - return self._format_time(0) - estimate = int(np.round(self.remaining / self.speed)) - - return self._format_time(estimate) - - def _format_time(self, seconds: float) -> str: - """ - Format the time in seconds to a string in the format HH:MM:SS. - """ - return f"{seconds // 3600:02}:{(seconds // 60) % 60:02}:{seconds % 60:02}" - - class ScanProgressBar(BECWidget, QWidget): """ Widget to display a progress bar that is hooked up to the scan progress of a scan. @@ -158,67 +60,64 @@ class ScanProgressBar(BECWidget, QWidget): self._show_remaining_time = self.ui.remaining_time_label.isVisible() self._show_source_label = self.ui.source_label.isVisible() - self._progress_source = None - self._progress_device = None - self.task = None - self.scan_number = None - self._active_scan_id = None - self.connect_to_queue() + self.progress_tracker = BECProgressTracker(self.bec_dispatcher, parent=self) + self.progress_tracker.progress_started.connect(self._on_progress_started) + self.progress_tracker.progress_updated.connect(self._on_progress_snapshot) + self.progress_tracker.progress_finished.connect(self._on_progress_finished) + self.progress_tracker.source_changed.connect(self._on_progress_source_changed) + self.progress_tracker.start(source=ProgressSource.SCAN_PROGRESS) - def connect_to_queue(self): - """ - Connect to the queue status signal. - """ - self.bec_dispatcher.connect_slot(self.on_queue_update, MessageEndpoints.scan_queue_status()) + @property + def task(self): + return self.progress_tracker.task + + @task.setter + def task(self, value): + self.progress_tracker.task = value + + @property + def scan_number(self): + return self.progress_tracker.scan_number + + @scan_number.setter + def scan_number(self, value): + self.progress_tracker.scan_number = value + + @property + def _active_scan_id(self): + return self.progress_tracker.active_scan_id + + @_active_scan_id.setter + def _active_scan_id(self, value): + self.progress_tracker._active_scan_id = value + + @property + def _progress_source(self): + return self.progress_tracker.progress_source + + @_progress_source.setter + def _progress_source(self, value): + self.progress_tracker._progress_source = value + + @property + def _progress_device(self): + return self.progress_tracker.progress_device + + @_progress_device.setter + def _progress_device(self, value): + self.progress_tracker._progress_device = value def set_progress_source(self, source: ProgressSource, device=None): """ Set the source of the progress. """ - if self._progress_source == source and self._progress_device == device: - self.update_source_label(source, device=device) - return - if self._progress_source is not None: - self.bec_dispatcher.disconnect_slot( - self.on_progress_update, - ( - MessageEndpoints.scan_progress() - if self._progress_source == ProgressSource.SCAN_PROGRESS - else MessageEndpoints.device_progress(device=self._progress_device) - ), - ) - self._progress_source = source - self._progress_device = None if source == ProgressSource.SCAN_PROGRESS else device - self.bec_dispatcher.connect_slot( - self.on_progress_update, - ( - MessageEndpoints.scan_progress() - if source == ProgressSource.SCAN_PROGRESS - else MessageEndpoints.device_progress(device=device) - ), - ) - self.update_source_label(source, device=device) - # self.progress_started.emit() + self.progress_tracker.set_progress_source(source, device=device) def _start_task(self, scan_id: str | None) -> None: - if self.task is not None: - self.task.timer.stop() - self.task.deleteLater() - self.task = ProgressTask(parent=self) - self.task.timer.timeout.connect(self.update_labels) - self._active_scan_id = scan_id - self.progress_started.emit() + self.progress_tracker._start_task(scan_id) def _clear_task(self, *, emit_finished: bool = True) -> None: - if self.task is None: - self._active_scan_id = None - return - self.task.timer.stop() - self.task.deleteLater() - self.task = None - self._active_scan_id = None - if emit_finished: - self.progress_finished.emit() + self.progress_tracker.clear_task(emit_finished=emit_finished) def update_source_label(self, source: ProgressSource, device=None): scan_text = f"Scan {self.scan_number}" if self.scan_number is not None else "Scan" @@ -233,26 +132,26 @@ class ScanProgressBar(BECWidget, QWidget): """ Update the progress bar based on the progress message. """ - value = msg_content["value"] - max_value = msg_content.get("max_value", 100) - done = msg_content.get("done", False) - status: Literal["open", "paused", "aborted", "halted", "closed"] = metadata.get( - "status", "open" - ) + self.progress_tracker.on_progress_update(msg_content, metadata) - if self.task is None: - return - self.task.update(value, max_value, done) + def _on_progress_started(self, _snapshot: ProgressSnapshot): + if self.task is not None: + self.task.timer.timeout.connect(self.update_labels) + self.progress_started.emit() + def _on_progress_finished(self, _snapshot: ProgressSnapshot): + self.progress_finished.emit() + + def _on_progress_source_changed(self, snapshot: ProgressSnapshot): + self.update_source_label(snapshot.source, device=snapshot.device) + + def _on_progress_snapshot(self, snapshot: ProgressSnapshot): self.update_labels() - - self.progressbar.set_maximum(self.task.max_value) - self.progressbar.state = ProgressState.from_bec_status(status) - self.progressbar.set_value(self.task.value) - - if done: - self._clear_task() - return + self.update_source_label(snapshot.source, device=snapshot.device) + self.progressbar.set_maximum(snapshot.max_value) + state = ProgressState.from_bec_status(snapshot.status) + self.progressbar.state = state + self.progressbar.set_value(snapshot.value) @SafeProperty(bool) def show_elapsed_time(self): @@ -295,80 +194,10 @@ class ScanProgressBar(BECWidget, QWidget): self.ui.elapsed_time_label.setText(self.task.time_elapsed) self.ui.remaining_time_label.setText(self.task.time_remaining) - @SafeSlot(dict, dict, verify_sender=True) - def on_queue_update(self, msg_content, metadata): - """ - Update the progress bar based on the queue status. - """ - if not "queue" in msg_content: - self._clear_task() - return - if "primary" not in msg_content["queue"]: - self._clear_task() - return - if (primary_queue := msg_content.get("queue").get("primary")) is None: - self._clear_task() - return - if not isinstance(primary_queue, messages.ScanQueueStatus): - self._clear_task() - return - primary_queue_info = primary_queue.info - if len(primary_queue_info) == 0: - self._clear_task() - return - scan_info = primary_queue_info[0] - if scan_info is None: - self._clear_task() - return - - active_request_block = scan_info.active_request_block - if active_request_block is None: - self._clear_task() - return - - status = scan_info.status.lower() - if status != "running": - self._clear_task() - return - - scan_id = active_request_block.scan_id or str(active_request_block.scan_number) - if self.task is None or self._active_scan_id != scan_id: - self._start_task(scan_id) - - self.scan_number = active_request_block.scan_number - report_instructions = active_request_block.report_instructions - if not report_instructions: - return - - # for now, let's just use the first instruction - instruction = report_instructions[0] - - if "scan_progress" in instruction: - self.set_progress_source(ProgressSource.SCAN_PROGRESS) - elif "device_progress" in instruction: - if not instruction["device_progress"]: - return - device = instruction["device_progress"][0] - self.set_progress_source(ProgressSource.DEVICE_PROGRESS, device=device) - def cleanup(self): - self._clear_task(emit_finished=False) - if self._progress_source is not None: - self.bec_dispatcher.disconnect_slot( - self.on_progress_update, - ( - MessageEndpoints.scan_progress() - if self._progress_source == ProgressSource.SCAN_PROGRESS - else MessageEndpoints.device_progress(device=self._progress_device) - ), - ) - self._progress_source = None - self._progress_device = None + self.progress_tracker.cleanup() self.progressbar.close() self.progressbar.deleteLater() - self.bec_dispatcher.disconnect_slot( - self.on_queue_update, MessageEndpoints.scan_queue_status() - ) super().cleanup() diff --git a/tests/unit_tests/test_bec_progressbar.py b/tests/unit_tests/test_bec_progressbar.py index d0628f8e..9269137e 100644 --- a/tests/unit_tests/test_bec_progressbar.py +++ b/tests/unit_tests/test_bec_progressbar.py @@ -44,7 +44,7 @@ def test_progressbar_label(progressbar): progressbar.label_template = "Test: $value" progressbar.set_value(50) assert progressbar._get_label() == "Test: 50" - assert progressbar.center_label.text() == "Test: 50" + assert progressbar.progressbar.text() == "Test: 50" def test_progressbar_equal_minimum_and_maximum_does_not_raise(progressbar): @@ -63,7 +63,10 @@ def test_progressbar_uses_static_stylesheet_with_palette_state_color(progressbar style_sheet = progressbar.progressbar.styleSheet() assert "QProgressBar::chunk" in style_sheet - assert "background-color: palette(highlight);" in style_sheet + assert ( + f"background-color: {progressbar._state_colors[ProgressState.PAUSED].name()};" + in style_sheet + ) assert "background-color: palette(mid);" in style_sheet assert "border-radius: 7px;" in style_sheet assert ( @@ -171,8 +174,10 @@ def test_progress_state_from_bec_status(): "open": ProgressState.NORMAL, "paused": ProgressState.PAUSED, "aborted": ProgressState.INTERRUPTED, + "halt": ProgressState.PAUSED, "halted": ProgressState.PAUSED, "closed": ProgressState.COMPLETED, + "user_completed": ProgressState.PAUSED, "UNKNOWN": ProgressState.NORMAL, # fallback } for text, expected in mapping.items(): diff --git a/tests/unit_tests/test_device_initialization_progress_bar.py b/tests/unit_tests/test_device_initialization_progress_bar.py index 530b53c1..c08a82b9 100644 --- a/tests/unit_tests/test_device_initialization_progress_bar.py +++ b/tests/unit_tests/test_device_initialization_progress_bar.py @@ -40,7 +40,7 @@ def test_update_device_initialization_progress(progress_bar, qtbot): assert progress_bar.progress_bar._user_value == 1 assert progress_bar.progress_bar._user_maximum == 3 assert progress_bar.progress_label.text() == f"{msg.device} initialization in progress..." - assert "1 / 3 - 33 %" == progress_bar.progress_bar.center_label.text() + assert "1 / 3 - 33 %" == progress_bar.progress_bar.progressbar.text() # II. Update with message of finished DeviceInitializationProgressMessage, finished=True, success=True msg.finished = True @@ -49,7 +49,7 @@ def test_update_device_initialization_progress(progress_bar, qtbot): assert progress_bar.progress_bar._user_value == 1 assert progress_bar.progress_bar._user_maximum == 3 assert progress_bar.progress_label.text() == f"{msg.device} initialization succeeded!" - assert "1 / 3 - 33 %" == progress_bar.progress_bar.center_label.text() + assert "1 / 3 - 33 %" == progress_bar.progress_bar.progressbar.text() # III. Update with message of finished DeviceInitializationProgressMessage, finished=True, success=False msg.finished = True @@ -59,7 +59,7 @@ def test_update_device_initialization_progress(progress_bar, qtbot): with qtbot.waitSignal(progress_bar.failed_devices_changed) as signal_blocker: progress_bar._update_device_initialization_progress(msg.model_dump(), {}) assert progress_bar.progress_label.text() == f"{msg.device} initialization failed!" - assert "2 / 3 - 66 %" == progress_bar.progress_bar.center_label.text() + assert "2 / 3 - 66 %" == progress_bar.progress_bar.progressbar.text() assert progress_bar.progress_bar._user_value == 2 assert progress_bar.progress_bar._user_maximum == 3 diff --git a/tests/unit_tests/test_ring_progress_bar_ring.py b/tests/unit_tests/test_ring_progress_bar_ring.py index 2700eb66..a4b10849 100644 --- a/tests/unit_tests/test_ring_progress_bar_ring.py +++ b/tests/unit_tests/test_ring_progress_bar_ring.py @@ -79,7 +79,7 @@ def test_set_update_to_scan(ring_widget): # Verify that connect_slot was called ring_widget.bec_dispatcher.connect_slot.assert_called_once() call_args = ring_widget.bec_dispatcher.connect_slot.call_args - assert call_args[0][0] == ring_widget.on_scan_progress + assert call_args[0][0] == ring_widget.progress_tracker.on_progress_update assert "scan_progress" in str(call_args[0][1]) @@ -437,7 +437,7 @@ def test_update_device_connection_with_progress_signal(ring_widget_with_device): # Should connect to device_progress endpoint ring_widget.bec_dispatcher.connect_slot.assert_called_once() call_args = ring_widget.bec_dispatcher.connect_slot.call_args - assert call_args[0][0] == ring_widget.on_device_progress + assert call_args[0][0] == ring_widget.progress_tracker.on_progress_update def test_update_device_connection_with_hinted_signal(ring_widget): @@ -630,3 +630,13 @@ def test_on_device_progress_default_values(ring_widget): # Should use defaults: value=0, max_value=100 assert ring_widget.config.value == 0 assert ring_widget.config.max_value == 100 + + +def test_cleanup_disconnects_progress_tracker(ring_widget): + ring_widget.set_update("scan") + ring_widget.bec_dispatcher.disconnect_slot = MagicMock() + + ring_widget.cleanup() + + ring_widget.bec_dispatcher.disconnect_slot.assert_called_once() + assert ring_widget.progress_tracker.progress_source is None diff --git a/tests/unit_tests/test_scan_progress_bar.py b/tests/unit_tests/test_scan_progress_bar.py index a9cf554b..1351c94f 100644 --- a/tests/unit_tests/test_scan_progress_bar.py +++ b/tests/unit_tests/test_scan_progress_bar.py @@ -2,7 +2,6 @@ from unittest import mock import numpy as np import pytest -from bec_lib import messages from bec_lib.endpoints import MessageEndpoints from bec_widgets.utils.bec_widget import BECWidget @@ -27,30 +26,6 @@ def scan_progressbar(qtbot, mocked_client): yield widget -@pytest.fixture -def scan_message(): - return messages.ScanQueueMessage( - metadata={ - "file_suffix": None, - "file_directory": None, - "user_metadata": {"sample_name": ""}, - "RID": "94949c6e-d5f2-4f01-837e-a5d36257dd5d", - }, - scan_type="line_scan", - parameter={ - "args": {"samx": [-10.0, 10.0]}, - "kwargs": { - "steps": 20, - "relative": False, - "exp_time": 0.1, - "burst_at_each_point": 1, - "system_config": {"file_suffix": None, "file_directory": None}, - }, - }, - queue="primary", - ) - - def test_progress_task_basic(): """percentage, remaining, and formatted time helpers behave as expected.""" task = ProgressTask(parent=None, value=50, max_value=100, done=False) @@ -74,8 +49,7 @@ def test_progress_task_basic(): def test_progress_task_elapsed_time_uses_monotonic_clock(monkeypatch): times = iter([100.0, 102.5]) monkeypatch.setattr( - "bec_widgets.widgets.progress.scan_progressbar.scan_progressbar.time.monotonic", - lambda: next(times), + "bec_widgets.widgets.progress.progress_backend.time.monotonic", lambda: next(times) ) task = ProgressTask(parent=None) task.timer.stop() @@ -98,6 +72,19 @@ def test_scan_progressbar_passes_dynamic_stylesheet_setting(qtbot, mocked_client assert widget.progressbar.enable_dynamic_stylesheet is False +def test_scan_progressbar_starts_from_scan_progress_before_queue_update(scan_progressbar): + scan_progressbar._clear_task(emit_finished=False) + + scan_progressbar.on_progress_update( + {"value": 3, "max_value": 10, "done": False}, metadata={"RID": "live-rid"} + ) + + assert scan_progressbar.task is not None + assert scan_progressbar._active_scan_id == "live-rid" + assert scan_progressbar.progressbar._user_value == 3 + assert scan_progressbar.progressbar._user_maximum == 10 + + def test_update_labels_content(scan_progressbar): """update_labels() reflects ProgressTask time strings on the UI.""" # fabricate a task with known timings @@ -138,8 +125,10 @@ def test_on_progress_update(qtbot, scan_progressbar): ("open", 10, 100, ProgressState.NORMAL), ("paused", 25, 100, ProgressState.PAUSED), ("aborted", 30, 100, ProgressState.INTERRUPTED), + ("halt", 40, 100, ProgressState.PAUSED), ("halted", 40, 100, ProgressState.PAUSED), ("closed", 100, 100, ProgressState.COMPLETED), + ("user_completed", 40, 100, ProgressState.PAUSED), ], ) def test_state_mapping_during_updates( @@ -158,6 +147,18 @@ def test_state_mapping_during_updates( assert scan_progressbar.progressbar.state is expected_state +def test_aborted_done_scan_keeps_partial_progress(scan_progressbar): + scan_progressbar.on_progress_update( + {"value": 4, "max_value": 10, "done": True}, + metadata={"scan_id": "scan-1", "RID": "rid-1", "status": "aborted"}, + ) + + assert scan_progressbar.progressbar._user_value == 4 + assert scan_progressbar.progressbar._user_maximum == 10 + assert scan_progressbar.progressbar.state is ProgressState.INTERRUPTED + assert scan_progressbar.task is None + + def test_source_label_updates(scan_progressbar): """update_source_label() renders correct text for both progress sources.""" # device progress @@ -205,7 +206,7 @@ def test_set_progress_source_connections(scan_progressbar, monkeypatch): assert scan_progressbar._progress_source == ProgressSource.SCAN_PROGRESS assert scan_progressbar.ui.source_label.text() == "Scan 7" - assert connect_calls[-1] == MessageEndpoints.scan_progress() + assert connect_calls == [] assert disconnect_calls == [] # switch to DEVICE_PROGRESS @@ -239,7 +240,10 @@ def test_set_progress_source_disconnects_previous_device_subscription( scan_progressbar.set_progress_source(ProgressSource.DEVICE_PROGRESS, device="motor1") scan_progressbar.set_progress_source(ProgressSource.DEVICE_PROGRESS, device="motor2") - assert disconnect_calls == [MessageEndpoints.device_progress(device="motor1")] + assert disconnect_calls == [ + MessageEndpoints.scan_progress(), + MessageEndpoints.device_progress(device="motor1"), + ] def test_set_progress_source_disconnects_device_when_switching_to_scan( @@ -258,7 +262,10 @@ def test_set_progress_source_disconnects_device_when_switching_to_scan( scan_progressbar.set_progress_source(ProgressSource.DEVICE_PROGRESS, device="motor1") scan_progressbar.set_progress_source(ProgressSource.SCAN_PROGRESS) - assert disconnect_calls == [MessageEndpoints.device_progress(device="motor1")] + assert disconnect_calls == [ + MessageEndpoints.scan_progress(), + MessageEndpoints.device_progress(device="motor1"), + ] def test_cleanup_disconnects_active_device_subscription(scan_progressbar, monkeypatch): @@ -285,8 +292,8 @@ def test_cleanup_disconnects_active_device_subscription(scan_progressbar, monkey ScanProgressBar.cleanup(scan_progressbar) assert disconnect_calls == [ + MessageEndpoints.scan_progress(), MessageEndpoints.device_progress(device="motor1"), - MessageEndpoints.scan_queue_status(), ] assert scan_progressbar._progress_source is None assert scan_progressbar._progress_device is None @@ -305,295 +312,3 @@ def test_cleanup_stops_active_task(scan_progressbar, monkeypatch): assert not timer.isActive() assert scan_progressbar.task is None assert scan_progressbar._active_scan_id is None - - -def test_progressbar_queue_update(scan_progressbar): - """ - Test that an empty queue update does not change the progress source. - """ - msg = messages.ScanQueueStatusMessage( - queue={"primary": messages.ScanQueueStatus(info=[], status="RUNNING")} - ) - with mock.patch.object(scan_progressbar, "set_progress_source") as mock_set_source: - scan_progressbar.on_queue_update( - msg.content, msg.metadata, _override_slot_params={"verify_sender": False} - ) - mock_set_source.assert_not_called() - - -def test_progressbar_queue_update_clears_task_when_queue_is_empty(scan_progressbar): - scan_progressbar.task = ProgressTask(parent=scan_progressbar) - scan_progressbar._active_scan_id = "scan-1" - timer = scan_progressbar.task.timer - msg = messages.ScanQueueStatusMessage( - queue={"primary": messages.ScanQueueStatus(info=[], status="RUNNING")} - ) - - scan_progressbar.on_queue_update( - msg.content, msg.metadata, _override_slot_params={"verify_sender": False} - ) - - assert not timer.isActive() - assert scan_progressbar.task is None - assert scan_progressbar._active_scan_id is None - - -def test_progressbar_queue_update_clears_task_when_scan_is_not_running( - scan_progressbar, scan_message -): - scan_progressbar.task = ProgressTask(parent=scan_progressbar) - scan_progressbar._active_scan_id = "scan-1" - timer = scan_progressbar.task.timer - request_block = messages.RequestBlock( - msg=scan_message, - RID="some-rid", - scan_motors=["samx"], - readout_priority={"monitored": ["samx"]}, - is_scan=True, - scan_number=1, - scan_id="scan-1", - report_instructions=[{"scan_progress": 20}], - ) - msg = messages.ScanQueueStatusMessage( - metadata={}, - queue={ - "primary": messages.ScanQueueStatus( - info=[ - messages.QueueInfoEntry( - queue_id="queue-1", - scan_id=["scan-1"], - status="completed", - active_request_block=request_block, - is_scan=[True], - request_blocks=[request_block], - scan_number=[1], - ) - ], - status="RUNNING", - ) - }, - ) - - with mock.patch.object(scan_progressbar, "set_progress_source") as mock_set_source: - scan_progressbar.on_queue_update( - msg.content, msg.metadata, _override_slot_params={"verify_sender": False} - ) - - assert not timer.isActive() - assert scan_progressbar.task is None - assert scan_progressbar._active_scan_id is None - mock_set_source.assert_not_called() - - -def test_progressbar_queue_update_with_scan(scan_progressbar, scan_message): - """ - Test that a queue update with a scan changes the progress source to SCAN_PROGRESS. - """ - request_block = messages.RequestBlock( - msg=scan_message, - RID="some-rid", - scan_motors=["samx"], - readout_priority={"monitored": ["samx"]}, - is_scan=True, - scan_number=1, - scan_id="e3f50794-852c-4bb1-965e-41c585ab0aa9", - report_instructions=[{"scan_progress": 20}], - ) - msg = messages.ScanQueueStatusMessage( - metadata={}, - queue={ - "primary": messages.ScanQueueStatus( - info=[ - messages.QueueInfoEntry( - queue_id="40831e2c-fbd1-4432-8072-ad168a7ad964", - scan_id=["e3f50794-852c-4bb1-965e-41c585ab0aa9"], - status="RUNNING", - active_request_block=request_block, - is_scan=[True], - request_blocks=[request_block], - scan_number=[1], - ) - ], - status="RUNNING", - ) - }, - ) - - with mock.patch.object(scan_progressbar, "set_progress_source") as mock_set_source: - scan_progressbar.on_queue_update( - msg.content, msg.metadata, _override_slot_params={"verify_sender": False} - ) - mock_set_source.assert_called_once_with(ProgressSource.SCAN_PROGRESS) - - -def test_progressbar_queue_update_starts_new_task_for_new_scan(scan_progressbar, scan_message): - started = mock.Mock() - scan_progressbar.progress_started.connect(started) - - def queue_msg(scan_id: str, scan_number: int): - request_block = messages.RequestBlock( - msg=scan_message, - RID=f"rid-{scan_number}", - scan_motors=["samx"], - readout_priority={"monitored": ["samx"]}, - is_scan=True, - scan_number=scan_number, - scan_id=scan_id, - report_instructions=[{"scan_progress": 20}], - ) - return messages.ScanQueueStatusMessage( - metadata={}, - queue={ - "primary": messages.ScanQueueStatus( - info=[ - messages.QueueInfoEntry( - queue_id=f"queue-{scan_number}", - scan_id=[scan_id], - status="RUNNING", - active_request_block=request_block, - is_scan=[True], - request_blocks=[request_block], - scan_number=[scan_number], - ) - ], - status="RUNNING", - ) - }, - ) - - first_msg = queue_msg("scan-1", 1) - scan_progressbar.on_queue_update( - first_msg.content, first_msg.metadata, _override_slot_params={"verify_sender": False} - ) - first_task = scan_progressbar.task - assert first_task is not None - assert first_task.timer.isActive() - - second_msg = queue_msg("scan-2", 2) - scan_progressbar.on_queue_update( - second_msg.content, second_msg.metadata, _override_slot_params={"verify_sender": False} - ) - - assert started.call_count == 2 - assert not first_task.timer.isActive() - assert scan_progressbar.task is not first_task - assert scan_progressbar._active_scan_id == "scan-2" - - -def test_progressbar_queue_update_with_device(scan_progressbar, scan_message): - """ - Test that a queue update with a device changes the progress source to DEVICE_PROGRESS. - """ - request_block = messages.RequestBlock( - msg=scan_message, - RID="some-rid", - scan_motors=["samx"], - readout_priority={"monitored": ["samx"]}, - is_scan=True, - scan_number=1, - scan_id="e3f50794-852c-4bb1-965e-41c585ab0aa9", - report_instructions=[{"device_progress": ["samx"]}], - ) - msg = messages.ScanQueueStatusMessage( - metadata={}, - queue={ - "primary": messages.ScanQueueStatus( - info=[ - messages.QueueInfoEntry( - queue_id="40831e2c-fbd1-4432-8072-ad168a7ad964", - scan_id=["e3f50794-852c-4bb1-965e-41c585ab0aa9"], - status="RUNNING", - active_request_block=request_block, - is_scan=[True], - request_blocks=[request_block], - scan_number=[1], - ) - ], - status="RUNNING", - ) - }, - ) - - with mock.patch.object(scan_progressbar, "set_progress_source") as mock_set_source: - scan_progressbar.on_queue_update( - msg.content, msg.metadata, _override_slot_params={"verify_sender": False} - ) - mock_set_source.assert_called_once_with(ProgressSource.DEVICE_PROGRESS, device="samx") - - -def test_progressbar_queue_update_ignores_empty_device_progress(scan_progressbar, scan_message): - request_block = messages.RequestBlock( - msg=scan_message, - RID="some-rid", - scan_motors=["samx"], - readout_priority={"monitored": ["samx"]}, - is_scan=True, - scan_number=1, - scan_id="e3f50794-852c-4bb1-965e-41c585ab0aa9", - report_instructions=[{"device_progress": []}], - ) - msg = messages.ScanQueueStatusMessage( - metadata={}, - queue={ - "primary": messages.ScanQueueStatus( - info=[ - messages.QueueInfoEntry( - queue_id="40831e2c-fbd1-4432-8072-ad168a7ad964", - scan_id=["e3f50794-852c-4bb1-965e-41c585ab0aa9"], - status="RUNNING", - active_request_block=request_block, - is_scan=[True], - request_blocks=[request_block], - scan_number=[1], - ) - ], - status="RUNNING", - ) - }, - ) - - with mock.patch.object(scan_progressbar, "set_progress_source") as mock_set_source: - scan_progressbar.on_queue_update( - msg.content, msg.metadata, _override_slot_params={"verify_sender": False} - ) - mock_set_source.assert_not_called() - - -def test_progressbar_queue_update_with_no_scan_or_device(scan_progressbar, scan_message): - """ - Test that a queue update with neither scan nor device does not change the progress source. - """ - request_block = messages.RequestBlock( - msg=scan_message, - RID="some-rid", - scan_motors=["samx"], - readout_priority={"monitored": ["samx"]}, - is_scan=True, - scan_number=1, - scan_id="e3f50794-852c-4bb1-965e-41c585ab0aa9", - ) - msg = messages.ScanQueueStatusMessage( - metadata={}, - queue={ - "primary": messages.ScanQueueStatus( - info=[ - messages.QueueInfoEntry( - queue_id="40831e2c-fbd1-4432-8072-ad168a7ad964", - scan_id=["e3f50794-852c-4bb1-965e-41c585ab0aa9"], - status="RUNNING", - active_request_block=request_block, - is_scan=[True], - request_blocks=[request_block], - scan_number=[1], - ) - ], - status="RUNNING", - ) - }, - ) - - with mock.patch.object(scan_progressbar, "set_progress_source") as mock_set_source: - scan_progressbar.on_queue_update( - msg.content, msg.metadata, _override_slot_params={"verify_sender": False} - ) - mock_set_source.assert_not_called()