Compare commits

...

6 Commits

12 changed files with 1203 additions and 798 deletions
+1 -1
View File
@@ -427,7 +427,7 @@ class BECMainWindow(RPCBase):
class BECProgressBar(RPCBase):
"""A custom progress bar with smooth transitions. The displayed text can be customized using a template."""
"""A BEC progress bar backed by Qt's native QProgressBar."""
_IMPORT_MODULE = "bec_widgets.widgets.progress.bec_progressbar.bec_progressbar"
@@ -46,8 +46,8 @@ logger = bec_logger.logger
class BECMainWindow(BECWidget, QMainWindow):
RPC = True
PLUGIN = True
SCAN_PROGRESS_WIDTH = 100 # px
SCAN_PROGRESS_HEIGHT = 12 # px
SCAN_PROGRESS_WIDTH = 120 # px
SCAN_PROGRESS_HEIGHT = 20 # px
def __init__(self, parent=None, window_title: str = "BEC", **kwargs):
super().__init__(parent=parent, **kwargs)
@@ -197,7 +197,11 @@ class BECMainWindow(BECWidget, QMainWindow):
# Setting HoverWidget for the scan progress bar - minimal and full version
self._scan_progress_bar_simple = ScanProgressBar(
self, one_line_design=True, rpc_exposed=False, rpc_passthrough_children=False
self,
one_line_design=True,
rpc_exposed=False,
rpc_passthrough_children=False,
enable_dynamic_stylesheet=True,
)
self._scan_progress_bar_simple.show_elapsed_time = False
self._scan_progress_bar_simple.show_remaining_time = False
@@ -205,8 +209,9 @@ class BECMainWindow(BECWidget, QMainWindow):
self._scan_progress_bar_simple.progressbar.label_template = ""
self._scan_progress_bar_simple.progressbar.setFixedHeight(self.SCAN_PROGRESS_HEIGHT)
self._scan_progress_bar_simple.progressbar.setFixedWidth(self.SCAN_PROGRESS_WIDTH)
# This one do not need dynamic styling on hover ScanProgressBar since user will hover on it probably later, when progress bar is big enough
self._scan_progress_bar_full = ScanProgressBar(
self, rpc_exposed=False, rpc_passthrough_children=False
self, rpc_exposed=False, rpc_passthrough_children=False, enable_dynamic_stylesheet=False
)
self._scan_progress_hover = HoverWidget(
self, simple=self._scan_progress_bar_simple, full=self._scan_progress_bar_full
@@ -233,8 +238,8 @@ class BECMainWindow(BECWidget, QMainWindow):
# The actual line
line = QFrame()
line.setFrameShape(QFrame.VLine)
line.setFrameShadow(QFrame.Sunken)
line.setFrameShape(QFrame.Shape.VLine)
line.setFrameShadow(QFrame.Shadow.Sunken)
line.setFixedHeight(status_bar.sizeHint().height() - 2)
# Wrapper to center the line vertically -> work around for QFrame not being able to center itself
@@ -242,7 +247,7 @@ class BECMainWindow(BECWidget, QMainWindow):
vbox = QVBoxLayout(wrapper)
vbox.setContentsMargins(0, 0, 0, 0)
vbox.addStretch()
vbox.addWidget(line, alignment=Qt.AlignHCenter)
vbox.addWidget(line, alignment=Qt.AlignmentFlag.AlignHCenter)
vbox.addStretch()
wrapper.setFixedWidth(line.sizeHint().width())
@@ -192,7 +192,7 @@ class ScanGroupBox(QGroupBox):
vbox_layout = QVBoxLayout(self)
hbox_layout = QHBoxLayout()
vbox_layout.addLayout(hbox_layout)
self.layout = QGridLayout(self)
self.layout = QGridLayout()
vbox_layout.addLayout(self.layout)
# Add bundle button
@@ -2,50 +2,41 @@ import sys
from enum import Enum
from string import Template
from qtpy.QtCore import QEasingCurve, QPropertyAnimation, QRectF, Qt, QTimer
from qtpy.QtGui import QColor, QPainter, QPainterPath
class ProgressState(Enum):
NORMAL = "normal"
PAUSED = "paused"
INTERRUPTED = "interrupted"
COMPLETED = "completed"
@classmethod
def from_bec_status(cls, status: str) -> "ProgressState":
"""
Map a BEC status string (open, paused, aborted, halted, closed)
to the corresponding ProgressState.
Any unknown status falls back to NORMAL.
"""
mapping = {
"open": cls.NORMAL,
"paused": cls.PAUSED,
"aborted": cls.INTERRUPTED,
"halted": cls.PAUSED,
"closed": cls.COMPLETED,
}
return mapping.get(status.lower(), cls.NORMAL)
PROGRESS_STATE_COLORS = {
ProgressState.NORMAL: QColor("#2979ff"), # blue normal progress
ProgressState.PAUSED: QColor("#ffca28"), # orange/amber paused
ProgressState.INTERRUPTED: QColor("#ff5252"), # red interrupted
ProgressState.COMPLETED: QColor("#00e676"), # green finished
}
from qtpy.QtWidgets import QApplication, QLabel, QVBoxLayout, QWidget
from qtpy.QtCore import QTimer
from qtpy.QtGui import QPalette
from qtpy.QtWidgets import QApplication, QProgressBar, QSizePolicy, QVBoxLayout, QWidget
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.colors import get_accent_colors
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
class ProgressState(Enum):
NORMAL = "normal"
PAUSED = "paused"
WARNING = "warning"
INTERRUPTED = "interrupted"
COMPLETED = "completed"
class BECProgressBar(BECWidget, QWidget):
"""
A custom progress bar with smooth transitions. The displayed text can be customized using a template.
A BEC progress bar backed by Qt's native QProgressBar.
The displayed text can be customized using a template with $value, $maximum,
and $percentage placeholders.
Args:
parent: Parent Qt widget.
client: Optional BEC client instance.
config: Optional widget configuration.
gui_id: Optional GUI identifier used by the BEC widget infrastructure.
enable_dynamic_stylesheet: If True, adjust the chunk border radius while the
filled chunk is still too narrow for the target radius. This avoids Qt
stylesheet over-rounding artifacts on small progress values. Once the
target radius is usable, normal value updates no longer rebuild the
stylesheet.
**kwargs: Additional keyword arguments forwarded to BECWidget.
"""
PLUGIN = True
@@ -61,7 +52,15 @@ class BECProgressBar(BECWidget, QWidget):
]
ICON_NAME = "page_control"
def __init__(self, parent=None, client=None, config=None, gui_id=None, **kwargs):
def __init__(
self,
parent=None,
client=None,
config=None,
gui_id=None,
enable_dynamic_stylesheet: bool = True,
**kwargs,
):
super().__init__(
parent=parent, client=client, gui_id=gui_id, config=config, theme_update=True, **kwargs
)
@@ -71,7 +70,6 @@ class BECProgressBar(BECWidget, QWidget):
# internal values
self._oversampling_factor = 50
self._value = 0
self._target_value = 0
self._maximum = 100 * self._oversampling_factor
# User values
@@ -80,46 +78,38 @@ class BECProgressBar(BECWidget, QWidget):
self._user_maximum = 100
self._label_template = "$value / $maximum - $percentage %"
# Color settings
self._background_color = QColor(30, 30, 30)
self._progress_color = accent_colors.highlight
self._completed_color = accent_colors.success
self._border_color = QColor(50, 50, 50)
# Cornerrounding: base radius in pixels (autoreduced if bar is small)
self._corner_radius = 10
self._corner_radius = 8
# Progressbar state handling
self._state = ProgressState.NORMAL
self._state_colors = {
ProgressState.NORMAL: accent_colors.default,
ProgressState.PAUSED: accent_colors.warning,
ProgressState.PAUSED: accent_colors.highlight,
ProgressState.WARNING: accent_colors.warning,
ProgressState.INTERRUPTED: accent_colors.emergency,
ProgressState.COMPLETED: accent_colors.success,
}
# layout settings
self._padding_left_right = 10
self._value_animation = QPropertyAnimation(self, b"_progressbar_value")
self._value_animation.setDuration(200)
self._value_animation.setEasingCurve(QEasingCurve.Type.OutCubic)
self._chunk_radius = None
self._enable_dynamic_stylesheet = enable_dynamic_stylesheet
# label on top of the progress bar
self.center_label = QLabel(self)
self.center_label.setAlignment(Qt.AlignHCenter)
self.center_label.setMinimumSize(0, 0)
self.center_label.setStyleSheet("background: transparent; color: white;")
self.progressbar = QProgressBar(self)
self.progressbar.setTextVisible(True)
self.progressbar.setRange(0, self._maximum)
self.progressbar.setMinimumHeight(0)
self.progressbar.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Ignored)
layout = QVBoxLayout(self)
layout.setContentsMargins(10, 0, 10, 0)
layout.setSpacing(0)
layout.addWidget(self.center_label)
layout.setAlignment(self.center_label, Qt.AlignCenter)
self.setLayout(layout)
self._layout = QVBoxLayout(self)
self._layout.setContentsMargins(self._padding_left_right, 0, self._padding_left_right, 0)
self._layout.setSpacing(0)
self._layout.addWidget(self.progressbar)
self.setLayout(self._layout)
self.update()
self._adjust_label_width()
self._sync_progressbar()
self._apply_state_style()
@SafeProperty(
str, doc="The template for the center label. Use $value, $maximum, and $percentage."
@@ -140,17 +130,18 @@ class BECProgressBar(BECWidget, QWidget):
accent_colors = get_accent_colors()
self._state_colors = {
ProgressState.NORMAL: accent_colors.default,
ProgressState.PAUSED: accent_colors.warning,
ProgressState.PAUSED: accent_colors.highlight,
ProgressState.WARNING: accent_colors.warning,
ProgressState.INTERRUPTED: accent_colors.emergency,
ProgressState.COMPLETED: accent_colors.success,
}
self._chunk_radius = None
self._apply_state_style()
@label_template.setter
def label_template(self, template):
self._label_template = template
self._adjust_label_width()
self.set_value(self._user_value)
self.update()
self._sync_progressbar()
@SafeProperty(float, designable=False)
def _progressbar_value(self):
@@ -162,28 +153,16 @@ class BECProgressBar(BECWidget, QWidget):
@_progressbar_value.setter
def _progressbar_value(self, val):
self._value = val
self.update()
self.progressbar.setValue(int(round(val)))
def _update_template(self):
template = Template(self._label_template)
return template.safe_substitute(
value=self._user_value,
maximum=self._user_maximum,
percentage=int((self.map_value(self._user_value) / self._maximum) * 100),
percentage=int(self._percentage(self._user_value)),
)
def _adjust_label_width(self):
"""
Reserve enough horizontal space for the center label so the widget
doesn't resize as the text grows during progress.
"""
template = Template(self._label_template)
sample_text = template.safe_substitute(
value=self._user_maximum, maximum=self._user_maximum, percentage=100
)
width = self.center_label.fontMetrics().horizontalAdvance(sample_text)
self.center_label.setFixedWidth(width)
@SafeSlot(float)
@SafeSlot(int)
def set_value(self, value):
@@ -193,21 +172,35 @@ class BECProgressBar(BECWidget, QWidget):
Args:
value (float): The value to set.
"""
if value > self._user_maximum:
value = self._user_maximum
elif value < self._user_minimum:
value = self._user_minimum
self._target_value = self.map_value(value)
self._user_value = value
self.center_label.setText(self._update_template())
previous_visual_state = self._current_visual_state()
previous_value = self._value
self._user_value = self._clamp_value(value)
self._value = self.map_value(self._user_value)
if self._enable_dynamic_stylesheet and self._value < previous_value:
self._chunk_radius = None
# Update state automatically unless paused or interrupted
if self._state not in (ProgressState.PAUSED, ProgressState.INTERRUPTED):
if self._state not in (
ProgressState.PAUSED,
ProgressState.WARNING,
ProgressState.INTERRUPTED,
):
self._state = (
ProgressState.COMPLETED
if self._user_value >= self._user_maximum
else ProgressState.NORMAL
)
self.animate_progress()
self._sync_progressbar()
visual_state_changed = self._current_visual_state() is not previous_visual_state
if visual_state_changed:
self._chunk_radius = None
if (
self._enable_dynamic_stylesheet
and not visual_state_changed
and (self._chunk_radius is None or self._chunk_radius != self._target_chunk_radius())
):
self._update_chunk_radius()
if visual_state_changed:
self._apply_state_style()
@SafeProperty(object, doc="Current visual state of the progress bar.")
def state(self):
@@ -226,7 +219,8 @@ class BECProgressBar(BECWidget, QWidget):
if not isinstance(state, ProgressState):
raise ValueError("state must be a ProgressState or its value")
self._state = state
self.update()
self._chunk_radius = None
self._apply_state_style()
@SafeProperty(float, doc="Base corner radius in pixels (autoscaled down on small bars).")
def corner_radius(self) -> float:
@@ -235,7 +229,18 @@ class BECProgressBar(BECWidget, QWidget):
@corner_radius.setter
def corner_radius(self, radius: float):
self._corner_radius = max(0.0, radius)
self.update()
self._chunk_radius = None
self._apply_state_style()
@SafeProperty(bool)
def enable_dynamic_stylesheet(self) -> bool:
return self._enable_dynamic_stylesheet
@enable_dynamic_stylesheet.setter
def enable_dynamic_stylesheet(self, enabled: bool):
self._enable_dynamic_stylesheet = bool(enabled)
self._chunk_radius = None
self._apply_state_style()
@SafeProperty(float)
def padding_left_right(self) -> float:
@@ -244,60 +249,12 @@ class BECProgressBar(BECWidget, QWidget):
@padding_left_right.setter
def padding_left_right(self, padding: float):
self._padding_left_right = padding
self.update()
self._layout.setContentsMargins(int(round(padding)), 0, int(round(padding)), 0)
def paintEvent(self, event):
painter = QPainter(self)
painter.setRenderHint(QPainter.Antialiasing)
rect = self.rect().adjusted(self._padding_left_right, 0, -self._padding_left_right, -1)
# Corner radius adapts to widget height so it never exceeds half the bars thickness
radius = min(self._corner_radius, rect.height() / 2)
# Draw background
painter.setBrush(self._background_color)
painter.setPen(Qt.NoPen)
painter.drawRoundedRect(rect, radius, radius) # Rounded corners
# Draw border
painter.setBrush(Qt.NoBrush)
painter.setPen(self._border_color)
painter.drawRoundedRect(rect, radius, radius)
# Determine progress colour based on current state
if self._state == ProgressState.PAUSED:
current_color = self._state_colors[ProgressState.PAUSED]
elif self._state == ProgressState.INTERRUPTED:
current_color = self._state_colors[ProgressState.INTERRUPTED]
elif self._state == ProgressState.COMPLETED or self._value >= self._maximum:
current_color = self._state_colors[ProgressState.COMPLETED]
else:
current_color = self._state_colors[ProgressState.NORMAL]
# Set clipping region to preserve the background's rounded corners
progress_rect = rect.adjusted(
0, 0, int(-rect.width() + (self._value / self._maximum) * rect.width()), 0
)
clip_path = QPainterPath()
clip_path.addRoundedRect(
QRectF(rect), radius, radius
) # Clip to the background's rounded corners
painter.setClipPath(clip_path)
# Draw progress bar
painter.setBrush(current_color)
painter.drawRect(progress_rect) # Less rounded, no additional rounding
painter.end()
def animate_progress(self):
"""
Animate the progress bar from the current value to the target value.
"""
self._value_animation.stop()
self._value_animation.setStartValue(self._value)
self._value_animation.setEndValue(self._target_value)
self._value_animation.start()
def resizeEvent(self, event):
super().resizeEvent(event)
self._chunk_radius = None
self._update_chunk_radius()
@SafeProperty(float)
def maximum(self):
@@ -343,10 +300,11 @@ class BECProgressBar(BECWidget, QWidget):
Args:
maximum (float): The maximum value.
"""
previous_maximum = self._user_maximum
self._user_maximum = maximum
self._adjust_label_width()
if self._enable_dynamic_stylesheet and maximum != previous_maximum:
self._chunk_radius = None
self.set_value(self._user_value) # Update the value to fit the new range
self.update()
@SafeSlot(float)
def set_minimum(self, minimum: float):
@@ -356,40 +314,126 @@ class BECProgressBar(BECWidget, QWidget):
Args:
minimum (float): The minimum value.
"""
previous_minimum = self._user_minimum
self._user_minimum = minimum
if self._enable_dynamic_stylesheet and minimum != previous_minimum:
self._chunk_radius = None
self.set_value(self._user_value) # Update the value to fit the new range
self.update()
def map_value(self, value: float):
"""
Map the user value to the range [0, 100*self._oversampling_factor] for the progress
"""
return (
(value - self._user_minimum) / (self._user_maximum - self._user_minimum) * self._maximum
)
span = self._user_maximum - self._user_minimum
if span <= 0:
return float(self._maximum if value >= self._user_maximum else 0)
mapped_value = (value - self._user_minimum) / span * self._maximum
return min(float(self._maximum), max(0.0, mapped_value))
def _percentage(self, value: float) -> float:
return (self.map_value(value) / self._maximum) * 100 if self._maximum else 0.0
def _clamp_value(self, value: float) -> float:
if self._user_maximum <= self._user_minimum:
return self._user_maximum
return min(self._user_maximum, max(self._user_minimum, value))
def _sync_progressbar(self) -> None:
self.progressbar.setRange(0, int(self._maximum))
self.progressbar.setValue(int(round(self._value)))
self.progressbar.setFormat(self._update_template())
def _setup_style_sheet(self, *, chunk_radius: int) -> None:
radius = int(round(self._corner_radius))
chunk_color = self._state_colors[self._current_visual_state()].name()
self.progressbar.setStyleSheet(f"""
QProgressBar {{
background-color: palette(mid);
border: none;
border-radius: {radius}px;
color: palette(text);
text-align: center;
}}
QProgressBar::chunk {{
background-color: {chunk_color};
border-radius: {chunk_radius}px;
}}
""")
def _update_chunk_radius(self) -> None:
chunk_radius = self._current_chunk_radius()
if chunk_radius != self._chunk_radius:
self._chunk_radius = chunk_radius
self._setup_style_sheet(chunk_radius=chunk_radius)
self._apply_state_palette()
def _apply_state_style(self) -> None:
if self._chunk_radius is None:
self._chunk_radius = self._current_chunk_radius()
self._setup_style_sheet(chunk_radius=self._chunk_radius)
self._apply_state_palette()
def _apply_state_palette(self) -> None:
color = self._state_colors[self._current_visual_state()]
palette = self.progressbar.palette()
palette.setColor(QPalette.ColorRole.Highlight, color)
palette.setColor(QPalette.ColorRole.HighlightedText, palette.color(QPalette.ColorRole.Text))
self.progressbar.setPalette(palette)
def _current_chunk_radius(self) -> int:
target_radius = self._target_chunk_radius()
if not self._enable_dynamic_stylesheet:
return target_radius
return self._calculate_chunk_radius(target_radius)
def _target_chunk_radius(self) -> int:
radius = int(round(self._corner_radius))
return max(0, radius - 1)
def _calculate_chunk_radius(self, target_radius: int) -> int:
"""
Scale the chunk radius down while the filled part is narrower than the target radius.
Qt stylesheets otherwise over-round very small chunks.
"""
if target_radius <= 0 or self._maximum <= 0:
return 0
fill_width = self.progressbar.width() * min(1.0, max(0.0, self._value / self._maximum))
if fill_width <= 0:
return 0
return min(target_radius, max(1, int(fill_width / 2)))
def _current_visual_state(self) -> ProgressState:
if self._state in (ProgressState.PAUSED, ProgressState.WARNING, ProgressState.INTERRUPTED):
return self._state
if self._state == ProgressState.COMPLETED or self._value >= self._maximum:
return ProgressState.COMPLETED
return ProgressState.NORMAL
def _get_label(self) -> str:
"""Return the label text. mostly used for testing rpc."""
return self.center_label.text()
return self.progressbar.text()
if __name__ == "__main__": # pragma: no cover
app = QApplication(sys.argv)
progressBar = BECProgressBar()
progressBar.show()
progressBar.set_minimum(-100)
progressBar.set_maximum(0)
progress_bar = BECProgressBar()
progress_bar.setWindowTitle("BEC Progress Bar")
progress_bar.resize(360, 48)
progress_bar.set_minimum(-100)
progress_bar.set_maximum(0)
progress_bar.set_value(-100)
progress_bar.show()
# Example of setting values
def update_progress():
value = progressBar._user_value + 2.5
if value > progressBar._user_maximum:
value = -100 # progressBar._maximum / progressBar._upsampling_factor
progressBar.set_value(value)
value = progress_bar._user_value + 2.5
if value > progress_bar._user_maximum:
value = progress_bar._user_minimum
progress_bar.set_value(value)
timer = QTimer()
timer = QTimer(progress_bar)
timer.timeout.connect(update_progress)
timer.start(200) # Update every half second
timer.start(200)
sys.exit(app.exec())
@@ -0,0 +1,280 @@
from __future__ import annotations
import time
from dataclasses import dataclass
from typing import Literal
import numpy as np
from bec_lib.endpoints import MessageEndpoints
from qtpy.QtCore import QObject, QTimer, Signal
from bec_widgets.utils.error_popups import SafeSlot
@dataclass(frozen=True)
class ProgressSnapshot:
value: float
max_value: float
done: bool
status: Literal["open", "paused", "aborted", "halted", "closed", "user_completed"]
scan_id: str | None = None
scan_number: int | None = None
rid: str | None = None
is_new_scan: bool = False
class ProgressTask(QObject):
"""
Class to store progress information.
Inspired by https://github.com/Textualize/rich/blob/master/rich/progress.py
"""
def __init__(
self, parent: QObject | None, value: float = 0, max_value: float = 0, done: bool = False
):
super().__init__(parent=parent)
self.start_time = time.monotonic()
self.done = done
self.value = value
self.max_value = max_value
self._elapsed_time = 0
self.timer = QTimer(self)
self.timer.timeout.connect(self.update_elapsed_time)
self.timer.start(1000)
def update(self, value: float, max_value: float, done: bool = False):
"""
Update the progress.
"""
self.max_value = max_value
self.done = done
self.value = value
if done:
self.timer.stop()
def update_elapsed_time(self):
"""
Update the time estimates. This is called every second by a QTimer.
"""
self._elapsed_time = max(0.0, time.monotonic() - self.start_time)
@property
def percentage(self) -> float:
"""float: Get progress of task as a percentage. If a None total was set, returns 0"""
if not self.max_value:
return 0.0
completed = (self.value / self.max_value) * 100.0
completed = min(100.0, max(0.0, completed))
return completed
@property
def speed(self) -> float:
"""Get the estimated speed in steps per second."""
if self._elapsed_time == 0:
return 0.0
return self.value / self._elapsed_time
@property
def frequency(self) -> float:
"""Get the estimated frequency in steps per second."""
if self.speed == 0:
return 0.0
return 1 / self.speed
@property
def time_elapsed(self) -> str:
return self._format_time(int(self._elapsed_time))
@property
def remaining(self) -> float:
"""Get the estimated remaining steps."""
if self.done:
return 0.0
remaining = self.max_value - self.value
return remaining
@property
def time_remaining(self) -> str:
"""
Get the estimated remaining time in the format HH:MM:SS.
"""
if self.done or not self.speed or not self.remaining:
return self._format_time(0)
estimate = int(np.round(self.remaining / self.speed))
return self._format_time(estimate)
@staticmethod
def _format_time(seconds: float) -> str:
"""
Format the time in seconds to a string in the format HH:MM:SS.
"""
return f"{seconds // 3600:02}:{(seconds // 60) % 60:02}:{seconds % 60:02}"
class BECProgressTracker(QObject):
"""
Shared backend for BEC scan progress messages.
"""
progress_started = Signal(object)
progress_updated = Signal(object)
progress_finished = Signal(object)
progress_cleared = Signal()
def __init__(self, bec_dispatcher, parent: QObject | None = None):
super().__init__(parent=parent)
self.bec_dispatcher = bec_dispatcher
self._connected = False
self.task: ProgressTask | None = None
self.scan_number: int | None = None
self._active_scan_id: str | None = None
self._active_rid: str | None = None
self._last_reset_scan_id: str | None = None
def start(self) -> None:
if self._connected:
return
self.bec_dispatcher.connect_slot(
self.process_progress_message, MessageEndpoints.scan_progress()
)
self.bec_dispatcher.connect_slot(
self.process_scan_status_message, MessageEndpoints.scan_status()
)
self._connected = True
def _start_task(self, scan_id: str | None, rid: str | None = None) -> None:
if self.task is not None:
self.task.timer.stop()
self.task.deleteLater()
self.task = ProgressTask(parent=self)
self._active_scan_id = scan_id
self._active_rid = rid
self.progress_started.emit(
ProgressSnapshot(
value=0,
max_value=100,
done=False,
status="open",
scan_id=self._active_scan_id,
scan_number=self.scan_number,
rid=self._active_rid,
)
)
def clear_task(self, *, emit_finished: bool = True) -> None:
if self.task is None:
self._active_scan_id = None
self._active_rid = None
self.progress_cleared.emit()
return
self.task.timer.stop()
self.task.deleteLater()
self.task = None
self._active_scan_id = None
self._active_rid = None
self.progress_cleared.emit()
if emit_finished:
self.progress_finished.emit(
ProgressSnapshot(
value=0,
max_value=100,
done=True,
status="open",
scan_id=self._active_scan_id,
scan_number=self.scan_number,
rid=self._active_rid,
)
)
@SafeSlot(dict, dict)
def process_progress_message(
self, msg_content: dict, metadata: dict
) -> ProgressSnapshot | None:
done = msg_content.get("done", False)
value = msg_content.get("value", 0)
max_value = msg_content.get("max_value", 100)
status: Literal["open", "paused", "aborted", "halted", "closed", "user_completed"] = (
metadata.get("status", "open")
)
scan_id = metadata.get("scan_id") or metadata.get("RID")
rid = metadata.get("RID")
scan_number = metadata.get("scan_number")
if scan_number is not None:
self.scan_number = scan_number
is_new_scan = False
previous_scan_id = self._active_scan_id
previous_rid = self._active_rid
identity_changed = (
(scan_id is not None and scan_id != previous_scan_id)
or (rid is not None and rid != previous_rid)
or (previous_scan_id is None and previous_rid is None)
)
if self.task is None:
self._start_task(scan_id, rid=rid)
is_new_scan = identity_changed
elif scan_id is not None and scan_id != self._active_scan_id:
self._start_task(scan_id, rid=rid)
is_new_scan = True
elif rid is not None and rid != self._active_rid:
self._start_task(scan_id or self._active_scan_id, rid=rid)
is_new_scan = True
if self.task is None:
return None
self.task.update(value, max_value, done)
snapshot = ProgressSnapshot(
value=value,
max_value=max_value,
done=done,
status=status,
scan_id=self._active_scan_id,
scan_number=self.scan_number,
rid=self._active_rid,
is_new_scan=is_new_scan,
)
self.progress_updated.emit(snapshot)
if done:
self.clear_task()
return snapshot
@SafeSlot(dict, dict)
def process_scan_status_message(
self, msg_content: dict, metadata: dict
) -> ProgressSnapshot | None:
if msg_content.get("status") != "open":
return None
scan_id = msg_content.get("scan_id") or metadata.get("scan_id") or metadata.get("RID")
if scan_id is None or scan_id == self._last_reset_scan_id:
return None
self.clear_task(emit_finished=False)
self._last_reset_scan_id = scan_id
self.scan_number = msg_content.get("scan_number")
snapshot = ProgressSnapshot(
value=0,
max_value=100,
done=False,
status="open",
scan_id=scan_id,
scan_number=self.scan_number,
rid=metadata.get("RID"),
is_new_scan=True,
)
self.progress_updated.emit(snapshot)
return snapshot
def cleanup(self) -> None:
self.clear_task(emit_finished=False)
if self._connected:
self.bec_dispatcher.disconnect_slot(
self.process_progress_message, MessageEndpoints.scan_progress()
)
self.bec_dispatcher.disconnect_slot(
self.process_scan_status_message, MessageEndpoints.scan_status()
)
self._connected = False
@@ -13,6 +13,7 @@ from bec_widgets import BECWidget
from bec_widgets.utils.bec_connector import ConnectionConfig
from bec_widgets.utils.colors import Colors
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
from bec_widgets.widgets.progress.progress_backend import BECProgressTracker, ProgressSnapshot
logger = bec_logger.logger
if TYPE_CHECKING:
@@ -81,6 +82,8 @@ class Ring(BECWidget, QWidget):
self._color: QColor = self.convert_color(self.config.color)
self._background_color: QColor = self.convert_color(self.config.background_color)
self.registered_slot: tuple[Callable, str | EndpointInfo] | None = None
self.progress_tracker = BECProgressTracker(self.bec_dispatcher, parent=self)
self.progress_tracker.progress_updated.connect(self._on_progress_snapshot)
self.RID = None
self._gap = 5
self._hovered = False
@@ -219,35 +222,32 @@ class Ring(BECWidget, QWidget):
case "manual":
if self.config.mode == "manual":
return
if self.registered_slot is not None:
self.bec_dispatcher.disconnect_slot(*self.registered_slot)
self._disconnect_registered_update()
self.config.mode = "manual"
self.registered_slot = None
case "scan":
if self.config.mode == "scan":
return
if self.registered_slot is not None:
self.bec_dispatcher.disconnect_slot(*self.registered_slot)
self._disconnect_registered_update()
self.config.mode = "scan"
self.bec_dispatcher.connect_slot(
self.on_scan_progress, MessageEndpoints.scan_progress()
)
self.registered_slot = (self.on_scan_progress, MessageEndpoints.scan_progress())
self.progress_tracker.start()
case "device":
if self.registered_slot is not None:
self.bec_dispatcher.disconnect_slot(*self.registered_slot)
self._disconnect_registered_update()
self.config.mode = "device"
if device == "":
self.registered_slot = None
return
self.config.device = device
# self.config.signal = self._get_signal_from_device(device, signal)
signal = self._update_device_connection(device, signal)
self.config.signal = signal
case _:
raise ValueError(f"Unsupported mode: {mode}")
def _disconnect_registered_update(self):
if self.registered_slot is not None:
self.bec_dispatcher.disconnect_slot(*self.registered_slot)
self.registered_slot = None
self.progress_tracker.cleanup()
def set_precision(self, precision: int):
"""
Set the precision for the ring widget.
@@ -270,13 +270,13 @@ class Ring(BECWidget, QWidget):
def _get_signals_for_device(self, device: str) -> dict[str, list[str]]:
"""
Get the signals for the device.
Get the appropriate signals for the device to be used in the ring widget, based on the signal infos from the device manager.
Args:
device(str): Device name for the device
device(str): Device name for the device readback mode
Returns:
dict[str, list[str]]: Dictionary with the signals for the device
dict[str, list[str]]: Signal infos for the device to be used in the ring widget
"""
dm = self.bec_dispatcher.client.device_manager
if not dm:
@@ -285,24 +285,25 @@ class Ring(BECWidget, QWidget):
if dev_obj is None:
raise ValueError(f"Device '{device}' not found in device manager.")
signal_infos = getattr(dev_obj, "_info", {}).get("signals", {})
progress_signals = [
obj["component_name"]
for obj in dev_obj._info["signals"].values()
if obj["signal_class"] == "ProgressSignal"
for obj in signal_infos.values()
if obj.get("signal_class") == "ProgressSignal"
]
hinted_signals = [
obj["obj_name"]
for obj in dev_obj._info["signals"].values()
if obj["kind_str"] == "hinted"
and obj["signal_class"]
for obj in signal_infos.values()
if obj.get("kind_str") == "hinted"
and obj.get("signal_class")
not in ["ProgressSignal", "AsyncSignal", "AsyncMultiSignal", "DynamicSignal"]
]
normal_signals = [
obj["component_name"]
for obj in dev_obj._info["signals"].values()
if obj["kind_str"] == "normal"
for obj in signal_infos.values()
if obj.get("kind_str") == "normal"
]
return {
"progress_signals": progress_signals,
"hinted_signals": hinted_signals,
@@ -311,21 +312,15 @@ class Ring(BECWidget, QWidget):
def _update_device_connection(self, device: str, signal: str | None) -> str:
"""
Update the device connection for the ring widget.
Subscribe device mode to the endpoint matching the selected signal.
In general, we support two modes here:
- If signal is provided, we use that directly.
- If signal is not provided, we try to get the signal from the device manager.
We first check for progress signals, then for hinted signals, and finally for normal signals.
Depending on what type of signal we get (progress or hinted/normal), we subscribe to different endpoints.
Args:
device(str): Device name for the device mode
signal(str): Signal name for the device mode
When no signal is provided, the ring selects the first available progress
signal, then the first hinted readback signal, then the first normal
readback signal. Progress signals use the device_progress endpoint;
readback signals use the device_readback endpoint.
Returns:
str: The selected signal name for the device mode
The selected signal name, or an empty string if the device is not known.
"""
logger.info(f"Updating device connection for device '{device}' and signal '{signal}'")
dm = self.bec_dispatcher.client.device_manager
@@ -341,18 +336,17 @@ class Ring(BECWidget, QWidget):
normal_signals = signals["normal_signals"]
if not signal:
# If signal is not provided, we try to get it from the device manager
if len(progress_signals) > 0:
if progress_signals:
signal = progress_signals[0]
logger.info(
f"Using progress signal '{signal}' for device '{device}' in ring progress bar."
)
elif len(hinted_signals) > 0:
elif hinted_signals:
signal = hinted_signals[0]
logger.info(
f"Using hinted signal '{signal}' for device '{device}' in ring progress bar."
)
elif len(normal_signals) > 0:
elif normal_signals:
signal = normal_signals[0]
logger.info(
f"Using normal signal '{signal}' for device '{device}' in ring progress bar."
@@ -366,26 +360,18 @@ class Ring(BECWidget, QWidget):
self.bec_dispatcher.connect_slot(self.on_device_progress, endpoint)
self.registered_slot = (self.on_device_progress, endpoint)
return signal
if signal in hinted_signals or signal in normal_signals:
endpoint = MessageEndpoints.device_readback(device)
self.bec_dispatcher.connect_slot(self.on_device_readback, endpoint)
self.registered_slot = (self.on_device_readback, endpoint)
return signal
@SafeSlot(dict, dict)
def on_scan_progress(self, msg, meta):
"""
Update the ring widget with the scan progress.
Args:
msg(dict): Message with the scan progress
meta(dict): Metadata for the message
"""
current_RID = meta.get("RID", None)
if current_RID != self.RID:
self.set_min_max_values(0, msg.get("max_value", 100))
self.set_value(msg.get("value", 0))
self.update()
raise ValueError(
f"Signal '{signal}' is not usable for ring progress device mode. "
f"Available progress signals: {progress_signals}; "
f"available readback signals: {hinted_signals + normal_signals}."
)
@SafeSlot(dict, dict)
def on_device_readback(self, msg, meta):
@@ -408,30 +394,31 @@ class Ring(BECWidget, QWidget):
@SafeSlot(dict, dict)
def on_device_progress(self, msg, meta):
"""
Update the ring widget with the device progress.
Args:
msg(dict): Message with the device progress
meta(dict): Metadata for the message
"""
device = self.config.device
if device is None:
return
max_val = msg.get("max_value", 100)
self.set_min_max_values(0, max_val)
value = msg.get("value", 0)
if msg.get("done"):
value = max_val
self.set_value(value)
self.set_value(max_val if msg.get("done") else msg.get("value", 0))
self.update()
def _on_progress_snapshot(self, snapshot: ProgressSnapshot):
if snapshot.is_new_scan:
self.set_min_max_values(0, snapshot.max_value)
self.RID = snapshot.rid
self.set_value(snapshot.value)
self.update()
def paintEvent(self, event):
if not self.progress_container:
return
painter = QtGui.QPainter(self)
painter.setRenderHint(QtGui.QPainter.RenderHint.Antialiasing)
size = min(self.width(), self.height())
if size <= 0 or not self.isVisible():
return
painter = QtGui.QPainter(self)
if not painter.isActive():
return
painter.setRenderHint(QtGui.QPainter.RenderHint.Antialiasing)
# Center the ring
x_offset = (self.width() - size) // 2
@@ -509,15 +496,6 @@ class Ring(BECWidget, QWidget):
return QtGui.QColor(*color)
raise ValueError(f"Unsupported color format: {color}")
def cleanup(self):
"""
Cleanup the ring widget.
Disconnect any registered slots.
"""
if self.registered_slot is not None:
self.bec_dispatcher.disconnect_slot(*self.registered_slot)
self.registered_slot = None
###############################################
####### QProperties ###########################
###############################################
@@ -666,6 +644,7 @@ class Ring(BECWidget, QWidget):
if self.registered_slot is not None:
self.bec_dispatcher.disconnect_slot(*self.registered_slot)
self.registered_slot = None
self.progress_tracker.cleanup()
self._hover_animation.stop()
super().cleanup()
@@ -1,121 +1,27 @@
from __future__ import annotations
import enum
import os
import time
from typing import Literal
import numpy as np
from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from qtpy.QtCore import QObject, QTimer, Signal
from qtpy.QtCore import Signal
from qtpy.QtWidgets import QVBoxLayout, QWidget
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
from bec_widgets.utils.error_popups import SafeProperty
from bec_widgets.utils.ui_loader import UILoader
from bec_widgets.widgets.progress.bec_progressbar.bec_progressbar import ProgressState
from bec_widgets.widgets.progress.progress_backend import BECProgressTracker, ProgressSnapshot
logger = bec_logger.logger
class ProgressSource(enum.Enum):
"""
Enum to define the source of the progress.
"""
SCAN_PROGRESS = "scan_progress"
DEVICE_PROGRESS = "device_progress"
class ProgressTask(QObject):
"""
Class to store progress information.
Inspired by https://github.com/Textualize/rich/blob/master/rich/progress.py
"""
def __init__(self, parent: QWidget, value: float = 0, max_value: float = 0, done: bool = False):
super().__init__(parent=parent)
self.start_time = time.time()
self.done = done
self.value = value
self.max_value = max_value
self._elapsed_time = 0
self.timer = QTimer(self)
self.timer.timeout.connect(self.update_elapsed_time)
self.timer.start(100) # update the elapsed time every 100 ms
def update(self, value: float, max_value: float, done: bool = False):
"""
Update the progress.
"""
self.max_value = max_value
self.done = done
self.value = value
if done:
self.timer.stop()
def update_elapsed_time(self):
"""
Update the time estimates. This is called every 100 ms by a QTimer.
"""
self._elapsed_time += 0.1
@property
def percentage(self) -> float:
"""float: Get progress of task as a percentage. If a None total was set, returns 0"""
if not self.max_value:
return 0.0
completed = (self.value / self.max_value) * 100.0
completed = min(100.0, max(0.0, completed))
return completed
@property
def speed(self) -> float:
"""Get the estimated speed in steps per second."""
if self._elapsed_time == 0:
return 0.0
return self.value / self._elapsed_time
@property
def frequency(self) -> float:
"""Get the estimated frequency in steps per second."""
if self.speed == 0:
return 0.0
return 1 / self.speed
@property
def time_elapsed(self) -> str:
# format the elapsed time to a string in the format HH:MM:SS
return self._format_time(int(self._elapsed_time))
@property
def remaining(self) -> float:
"""Get the estimated remaining steps."""
if self.done:
return 0.0
remaining = self.max_value - self.value
return remaining
@property
def time_remaining(self) -> str:
"""
Get the estimated remaining time in the format HH:MM:SS.
"""
if self.done or not self.speed or not self.remaining:
return self._format_time(0)
estimate = int(np.round(self.remaining / self.speed))
return self._format_time(estimate)
def _format_time(self, seconds: float) -> str:
"""
Format the time in seconds to a string in the format HH:MM:SS.
"""
return f"{seconds // 3600:02}:{(seconds // 60) % 60:02}:{seconds % 60:02}"
BEC_STATUS_TO_PROGRESS_STATE = {
"open": ProgressState.NORMAL,
"paused": ProgressState.PAUSED,
"aborted": ProgressState.WARNING,
"halted": ProgressState.INTERRUPTED,
"closed": ProgressState.COMPLETED,
"user_completed": ProgressState.COMPLETED,
}
class ScanProgressBar(BECWidget, QWidget):
@@ -130,7 +36,14 @@ class ScanProgressBar(BECWidget, QWidget):
progress_finished = Signal()
def __init__(
self, parent=None, client=None, config=None, gui_id=None, one_line_design=False, **kwargs
self,
parent=None,
client=None,
config=None,
gui_id=None,
one_line_design=False,
enable_dynamic_stylesheet: bool = True,
**kwargs,
):
super().__init__(parent=parent, client=client, config=config, gui_id=gui_id, **kwargs)
@@ -146,83 +59,43 @@ class ScanProgressBar(BECWidget, QWidget):
self.layout.addWidget(self.ui)
self.setLayout(self.layout)
self.progressbar = self.ui.progressbar
self.progressbar.enable_dynamic_stylesheet = enable_dynamic_stylesheet
self._show_elapsed_time = self.ui.elapsed_time_label.isVisible()
self._show_remaining_time = self.ui.remaining_time_label.isVisible()
self._show_source_label = self.ui.source_label.isVisible()
self.connect_to_queue()
self._progress_source = None
self._progress_device = None
self.task = None
self.scan_number = None
def connect_to_queue(self):
"""
Connect to the queue status signal.
"""
self.bec_dispatcher.connect_slot(self.on_queue_update, MessageEndpoints.scan_queue_status())
def set_progress_source(self, source: ProgressSource, device=None):
"""
Set the source of the progress.
"""
if self._progress_source == source and self._progress_device == device:
self.update_source_label(source, device=device)
return
if self._progress_source is not None:
self.bec_dispatcher.disconnect_slot(
self.on_progress_update,
(
MessageEndpoints.scan_progress()
if self._progress_source == ProgressSource.SCAN_PROGRESS
else MessageEndpoints.device_progress(device=self._progress_device)
),
)
self._progress_source = source
self._progress_device = None if source == ProgressSource.SCAN_PROGRESS else device
self.bec_dispatcher.connect_slot(
self.on_progress_update,
(
MessageEndpoints.scan_progress()
if source == ProgressSource.SCAN_PROGRESS
else MessageEndpoints.device_progress(device=device)
),
self.progress_tracker = BECProgressTracker(self.bec_dispatcher, parent=self)
self.progress_tracker.progress_started.connect(self._on_progress_started)
self.progress_tracker.progress_updated.connect(self._on_progress_snapshot)
self.progress_tracker.progress_finished.connect(
lambda _snapshot: self.progress_finished.emit()
)
self.update_source_label(source, device=device)
# self.progress_started.emit()
self.progress_tracker.start()
def update_source_label(self, source: ProgressSource, device=None):
scan_text = f"Scan {self.scan_number}" if self.scan_number is not None else "Scan"
text = scan_text if source == ProgressSource.SCAN_PROGRESS else f"Device {device}"
logger.info(f"Set progress source to {text}")
self.ui.source_label.setText(text)
@SafeSlot(dict, dict)
def on_progress_update(self, msg_content: dict, metadata: dict):
"""
Update the progress bar based on the progress message.
"""
value = msg_content["value"]
max_value = msg_content.get("max_value", 100)
done = msg_content.get("done", False)
status: Literal["open", "paused", "aborted", "halted", "closed"] = metadata.get(
"status", "open"
)
if self.task is None:
def update_source_label(self):
scan_number = self.progress_tracker.scan_number
scan_text = f"Scan {scan_number}" if scan_number is not None else "Scan"
if self.ui.source_label.text() == scan_text:
return
self.task.update(value, max_value, done)
logger.info(f"Set progress source to {scan_text}")
self.ui.source_label.setText(scan_text)
def _on_progress_started(self, _snapshot: ProgressSnapshot):
if self.progress_tracker.task is not None:
self.progress_tracker.task.timer.timeout.connect(self.update_labels)
self.progress_started.emit()
def _on_progress_snapshot(self, snapshot: ProgressSnapshot):
self.update_labels()
self.progressbar.set_maximum(self.task.max_value)
self.progressbar.state = ProgressState.from_bec_status(status)
self.progressbar.set_value(self.task.value)
if done:
self.task = None
self.progress_finished.emit()
return
if snapshot.is_new_scan and self.progress_tracker.task is None:
self.ui.elapsed_time_label.setText("00:00:00")
self.ui.remaining_time_label.setText("00:00:00")
self.update_source_label()
self.progressbar.set_maximum(snapshot.max_value)
self.progressbar.set_value(snapshot.value)
self.progressbar.state = BEC_STATUS_TO_PROGRESS_STATE.get(
snapshot.status.lower(), ProgressState.NORMAL
)
@SafeProperty(bool)
def show_elapsed_time(self):
@@ -259,74 +132,17 @@ class ScanProgressBar(BECWidget, QWidget):
"""
Update the labels based on the progress task.
"""
if self.task is None:
task = self.progress_tracker.task
if task is None:
return
self.ui.elapsed_time_label.setText(self.task.time_elapsed)
self.ui.remaining_time_label.setText(self.task.time_remaining)
@SafeSlot(dict, dict, verify_sender=True)
def on_queue_update(self, msg_content, metadata):
"""
Update the progress bar based on the queue status.
"""
if not "queue" in msg_content:
return
if "primary" not in msg_content["queue"]:
return
if (primary_queue := msg_content.get("queue").get("primary")) is None:
return
if not isinstance(primary_queue, messages.ScanQueueStatus):
return
primary_queue_info = primary_queue.info
if len(primary_queue_info) == 0:
return
scan_info = primary_queue_info[0]
if scan_info is None:
return
if scan_info.status.lower() == "running" and self.task is None:
self.task = ProgressTask(parent=self)
self.progress_started.emit()
active_request_block = scan_info.active_request_block
if active_request_block is None:
return
self.scan_number = active_request_block.scan_number
report_instructions = active_request_block.report_instructions
if not report_instructions:
return
# for now, let's just use the first instruction
instruction = report_instructions[0]
if "scan_progress" in instruction:
self.set_progress_source(ProgressSource.SCAN_PROGRESS)
elif "device_progress" in instruction:
device = instruction["device_progress"][0]
self.set_progress_source(ProgressSource.DEVICE_PROGRESS, device=device)
self.ui.elapsed_time_label.setText(task.time_elapsed)
self.ui.remaining_time_label.setText(task.time_remaining)
def cleanup(self):
if self.task is not None:
self.task.timer.stop()
self.close()
self.deleteLater()
if self._progress_source is not None:
self.bec_dispatcher.disconnect_slot(
self.on_progress_update,
(
MessageEndpoints.scan_progress()
if self._progress_source == ProgressSource.SCAN_PROGRESS
else MessageEndpoints.device_progress(device=self._progress_device)
),
)
self._progress_source = None
self._progress_device = None
self.progress_tracker.cleanup()
self.progressbar.close()
self.progressbar.deleteLater()
self.bec_dispatcher.disconnect_slot(
self.on_queue_update, MessageEndpoints.scan_queue_status()
)
super().cleanup()
+176 -20
View File
@@ -1,5 +1,8 @@
import numpy as np
from unittest import mock
import pytest
from qtpy.QtGui import QPalette
from qtpy.QtWidgets import QProgressBar
from bec_widgets.widgets.progress.bec_progressbar.bec_progressbar import (
BECProgressBar,
@@ -15,6 +18,14 @@ def progressbar(qtbot):
yield widget
@pytest.fixture
def static_progressbar(qtbot):
widget = BECProgressBar(enable_dynamic_stylesheet=False)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
def test_progressbar(progressbar):
progressbar.update()
@@ -23,36 +34,181 @@ def test_progressbar_set_value(qtbot, progressbar):
progressbar.set_minimum(0)
progressbar.set_maximum(100)
progressbar.set_value(50)
progressbar.paintEvent(None)
qtbot.waitUntil(
lambda: np.isclose(
progressbar._value, progressbar._user_value * progressbar._oversampling_factor
)
)
assert isinstance(progressbar.progressbar, QProgressBar)
assert progressbar._value == progressbar._user_value * progressbar._oversampling_factor
assert progressbar.progressbar.value() == 50 * progressbar._oversampling_factor
def test_progressbar_label(progressbar):
progressbar.label_template = "Test: $value"
progressbar.set_value(50)
assert progressbar.center_label.text() == "Test: 50"
assert progressbar._get_label() == "Test: 50"
assert progressbar.progressbar.text() == "Test: 50"
def test_progress_state_from_bec_status():
"""ProgressState.from_bec_status() maps BEC literals correctly."""
mapping = {
"open": ProgressState.NORMAL,
"paused": ProgressState.PAUSED,
"aborted": ProgressState.INTERRUPTED,
"halted": ProgressState.PAUSED,
"closed": ProgressState.COMPLETED,
"UNKNOWN": ProgressState.NORMAL, # fallback
}
for text, expected in mapping.items():
assert ProgressState.from_bec_status(text) is expected
def test_progressbar_equal_minimum_and_maximum_does_not_raise(progressbar):
progressbar.set_minimum(0)
progressbar.set_maximum(0)
progressbar.set_value(0)
assert progressbar._get_label() == "0 / 0 - 100 %"
assert progressbar.progressbar.value() == progressbar.progressbar.maximum()
def test_progressbar_uses_static_stylesheet_with_palette_state_color(progressbar):
progressbar.progressbar.resize(100, 20)
progressbar.set_value(50)
progressbar.state = ProgressState.PAUSED
style_sheet = progressbar.progressbar.styleSheet()
assert "QProgressBar::chunk" in style_sheet
assert (
f"background-color: {progressbar._state_colors[ProgressState.PAUSED].name()};"
in style_sheet
)
assert "background-color: palette(mid);" in style_sheet
assert "border-radius: 7px;" in style_sheet
assert (
progressbar.progressbar.palette().color(QPalette.ColorRole.Highlight)
== progressbar._state_colors[ProgressState.PAUSED]
)
def test_progressbar_value_updates_do_not_rebuild_stylesheet_within_same_chunk_mode(progressbar):
progressbar.progressbar.resize(100, 20)
progressbar.set_value(30)
with mock.patch.object(
progressbar, "_setup_style_sheet", wraps=progressbar._setup_style_sheet
) as setup_style_sheet:
progressbar.set_value(35)
progressbar.set_value(42)
progressbar.set_value(50)
setup_style_sheet.assert_not_called()
def test_progressbar_value_updates_skip_chunk_radius_after_target_reached(progressbar):
progressbar.progressbar.resize(100, 20)
progressbar.set_value(30)
assert progressbar._chunk_radius == progressbar._target_chunk_radius()
with mock.patch.object(
progressbar, "_update_chunk_radius", wraps=progressbar._update_chunk_radius
) as update_chunk_radius:
progressbar.set_value(35)
progressbar.set_value(42)
progressbar.set_value(50)
update_chunk_radius.assert_not_called()
def test_progressbar_repeated_same_maximum_does_not_reset_chunk_radius(progressbar):
progressbar.progressbar.resize(100, 20)
progressbar.set_maximum(100)
progressbar.set_value(30)
assert progressbar._chunk_radius == progressbar._target_chunk_radius()
with mock.patch.object(
progressbar, "_update_chunk_radius", wraps=progressbar._update_chunk_radius
) as update_chunk_radius:
progressbar.set_maximum(100)
progressbar.set_value(40)
update_chunk_radius.assert_not_called()
def test_progressbar_can_disable_dynamic_stylesheet(static_progressbar):
static_progressbar.progressbar.resize(100, 20)
assert static_progressbar.enable_dynamic_stylesheet is False
assert static_progressbar._chunk_radius == static_progressbar._target_chunk_radius()
with mock.patch.object(
static_progressbar, "_setup_style_sheet", wraps=static_progressbar._setup_style_sheet
) as setup_style_sheet:
static_progressbar.set_value(1)
static_progressbar.set_value(2)
static_progressbar.set_value(3)
setup_style_sheet.assert_not_called()
assert "border-radius: 7px;" in static_progressbar.progressbar.styleSheet()
def test_progressbar_dynamic_stylesheet_can_be_toggled(progressbar):
progressbar.enable_dynamic_stylesheet = False
assert progressbar.enable_dynamic_stylesheet is False
assert progressbar._chunk_radius == progressbar._target_chunk_radius()
assert "border-radius: 7px;" in progressbar.progressbar.styleSheet()
def test_progressbar_rebuilds_stylesheet_until_chunk_radius_reaches_target(progressbar):
progressbar.progressbar.resize(100, 20)
progressbar.set_value(9)
with mock.patch.object(
progressbar, "_setup_style_sheet", wraps=progressbar._setup_style_sheet
) as setup_style_sheet:
progressbar.set_value(12)
progressbar.set_value(25)
progressbar.set_value(30)
assert setup_style_sheet.call_count == 2
assert "border-radius: 7px;" in progressbar.progressbar.styleSheet()
def test_progressbar_resets_chunk_radius_when_value_goes_backwards(progressbar):
progressbar.progressbar.resize(100, 20)
progressbar.set_value(30)
assert "border-radius: 7px;" in progressbar.progressbar.styleSheet()
progressbar.set_value(4)
assert "border-radius: 2px;" in progressbar.progressbar.styleSheet()
def test_progressbar_state_setter(progressbar):
"""Setting .state reflects internally."""
progressbar.state = ProgressState.PAUSED
assert progressbar.state is ProgressState.PAUSED
def test_progressbar_warning_state_has_own_color_and_persists_on_value_update(progressbar):
assert (
progressbar._state_colors[ProgressState.PAUSED]
!= progressbar._state_colors[ProgressState.WARNING]
)
assert (
progressbar._state_colors[ProgressState.WARNING]
!= progressbar._state_colors[ProgressState.INTERRUPTED]
)
progressbar.state = ProgressState.WARNING
progressbar.set_value(50)
assert progressbar.state is ProgressState.WARNING
assert (
progressbar.progressbar.palette().color(QPalette.ColorRole.Highlight)
== progressbar._state_colors[ProgressState.WARNING]
)
def test_progressbar_warning_state_has_own_color_and_persists_on_value_update(progressbar):
assert (
progressbar._state_colors[ProgressState.PAUSED]
!= progressbar._state_colors[ProgressState.WARNING]
)
assert (
progressbar._state_colors[ProgressState.WARNING]
!= progressbar._state_colors[ProgressState.INTERRUPTED]
)
progressbar.state = ProgressState.WARNING
progressbar.set_value(50)
assert progressbar.state is ProgressState.WARNING
assert (
progressbar.progressbar.palette().color(QPalette.ColorRole.Highlight)
== progressbar._state_colors[ProgressState.WARNING]
)
@@ -40,7 +40,7 @@ def test_update_device_initialization_progress(progress_bar, qtbot):
assert progress_bar.progress_bar._user_value == 1
assert progress_bar.progress_bar._user_maximum == 3
assert progress_bar.progress_label.text() == f"{msg.device} initialization in progress..."
assert "1 / 3 - 33 %" == progress_bar.progress_bar.center_label.text()
assert "1 / 3 - 33 %" == progress_bar.progress_bar.progressbar.text()
# II. Update with message of finished DeviceInitializationProgressMessage, finished=True, success=True
msg.finished = True
@@ -49,7 +49,7 @@ def test_update_device_initialization_progress(progress_bar, qtbot):
assert progress_bar.progress_bar._user_value == 1
assert progress_bar.progress_bar._user_maximum == 3
assert progress_bar.progress_label.text() == f"{msg.device} initialization succeeded!"
assert "1 / 3 - 33 %" == progress_bar.progress_bar.center_label.text()
assert "1 / 3 - 33 %" == progress_bar.progress_bar.progressbar.text()
# III. Update with message of finished DeviceInitializationProgressMessage, finished=True, success=False
msg.finished = True
@@ -59,7 +59,7 @@ def test_update_device_initialization_progress(progress_bar, qtbot):
with qtbot.waitSignal(progress_bar.failed_devices_changed) as signal_blocker:
progress_bar._update_device_initialization_progress(msg.model_dump(), {})
assert progress_bar.progress_label.text() == f"{msg.device} initialization failed!"
assert "2 / 3 - 66 %" == progress_bar.progress_bar.center_label.text()
assert "2 / 3 - 66 %" == progress_bar.progress_bar.progressbar.text()
assert progress_bar.progress_bar._user_value == 2
assert progress_bar.progress_bar._user_maximum == 3
+133
View File
@@ -0,0 +1,133 @@
from unittest import mock
from bec_lib.endpoints import MessageEndpoints
from bec_widgets.widgets.progress.progress_backend import BECProgressTracker
def _dispatcher():
dispatcher = mock.MagicMock()
return dispatcher
def test_tracker_subscribes_to_scan_progress_immediately():
dispatcher = _dispatcher()
tracker = BECProgressTracker(dispatcher)
tracker.start()
assert dispatcher.connect_slot.call_args_list == [
mock.call(tracker.process_progress_message, MessageEndpoints.scan_progress()),
mock.call(tracker.process_scan_status_message, MessageEndpoints.scan_status()),
]
tracker.cleanup()
def test_tracker_starts_scan_from_scan_progress_metadata():
dispatcher = _dispatcher()
tracker = BECProgressTracker(dispatcher)
snapshots = []
tracker.progress_updated.connect(snapshots.append)
tracker.start()
tracker.process_progress_message(
{"value": 3, "max_value": 10},
{"scan_id": "scan-2", "RID": "rid-2", "scan_number": 2, "status": "open"},
)
assert tracker.task is not None
assert tracker._active_scan_id == "scan-2"
assert tracker._active_rid == "rid-2"
assert tracker.scan_number == 2
assert snapshots[-1].scan_number == 2
tracker.cleanup()
def test_tracker_switches_sources_idempotently():
dispatcher = _dispatcher()
tracker = BECProgressTracker(dispatcher)
tracker.start()
tracker.start()
assert dispatcher.connect_slot.call_count == 2
assert dispatcher.disconnect_slot.call_count == 0
tracker.cleanup()
def test_tracker_resets_progress_on_new_open_scan_status():
dispatcher = _dispatcher()
tracker = BECProgressTracker(dispatcher)
snapshots = []
tracker.progress_updated.connect(snapshots.append)
tracker.start()
snapshot = tracker.process_scan_status_message(
{"scan_id": "scan-1", "scan_number": 7, "status": "open"}, {}
)
assert snapshot is not None
assert snapshot.value == 0
assert snapshot.max_value == 100
assert snapshot.status == "open"
assert snapshot.scan_id == "scan-1"
assert snapshot.scan_number == 7
assert snapshot.is_new_scan is True
assert tracker.task is None
assert tracker.scan_number == 7
assert snapshots[-1] == snapshot
tracker.cleanup()
def test_tracker_ignores_duplicate_open_scan_status():
dispatcher = _dispatcher()
tracker = BECProgressTracker(dispatcher)
snapshots = []
tracker.progress_updated.connect(snapshots.append)
tracker.start()
tracker.process_scan_status_message({"scan_id": "scan-1", "status": "open"}, {})
tracker.process_scan_status_message({"scan_id": "scan-1", "status": "open"}, {})
assert len(snapshots) == 1
tracker.cleanup()
def test_tracker_marks_new_scan_only_when_rid_changes():
dispatcher = _dispatcher()
tracker = BECProgressTracker(dispatcher)
snapshots = []
tracker.progress_updated.connect(snapshots.append)
tracker.start()
tracker.process_progress_message({"value": 10, "max_value": 100}, {"RID": "rid-1"})
tracker.process_progress_message({"value": 20, "max_value": 200}, {"RID": "rid-1"})
tracker.process_progress_message({"value": 5, "max_value": 50}, {"RID": "rid-2"})
assert [snapshot.is_new_scan for snapshot in snapshots] == [True, False, True]
assert tracker._active_rid == "rid-2"
tracker.cleanup()
def test_tracker_keeps_partial_value_for_done_scan_progress():
dispatcher = _dispatcher()
tracker = BECProgressTracker(dispatcher)
snapshots = []
tracker.progress_updated.connect(snapshots.append)
tracker.start()
tracker.process_progress_message(
{"value": 4, "max_value": 10, "done": True},
{"scan_id": "scan-1", "RID": "rid-1", "status": "aborted"},
)
assert snapshots[-1].value == 4
assert snapshots[-1].max_value == 10
assert snapshots[-1].done is True
assert tracker.task is None
tracker.cleanup()
+146 -26
View File
@@ -1,8 +1,9 @@
# pylint: disable=missing-function-docstring, missing-module-docstring
from unittest.mock import MagicMock
from unittest.mock import MagicMock, call
import pytest
from bec_lib.endpoints import MessageEndpoints
from qtpy.QtGui import QColor
from bec_widgets.tests.utils import FakeDevice
@@ -76,11 +77,14 @@ def test_set_update_to_scan(ring_widget):
ring_widget.set_update("scan")
assert ring_widget.config.mode == "scan"
# Verify that connect_slot was called
ring_widget.bec_dispatcher.connect_slot.assert_called_once()
call_args = ring_widget.bec_dispatcher.connect_slot.call_args
assert call_args[0][0] == ring_widget.on_scan_progress
assert "scan_progress" in str(call_args[0][1])
assert ring_widget.bec_dispatcher.connect_slot.call_args_list == [
call(
ring_widget.progress_tracker.process_progress_message, MessageEndpoints.scan_progress()
),
call(
ring_widget.progress_tracker.process_scan_status_message, MessageEndpoints.scan_status()
),
]
def test_set_update_from_scan_to_manual(ring_widget):
@@ -97,6 +101,14 @@ def test_set_update_from_scan_to_manual(ring_widget):
assert ring_widget.config.mode == "manual"
assert ring_widget.registered_slot is None
assert ring_widget.bec_dispatcher.disconnect_slot.call_args_list == [
call(
ring_widget.progress_tracker.process_progress_message, MessageEndpoints.scan_progress()
),
call(
ring_widget.progress_tracker.process_scan_status_message, MessageEndpoints.scan_status()
),
]
def test_set_update_to_device(ring_widget_with_device):
@@ -420,7 +432,7 @@ def test_set_direction_counter_clockwise(ring_widget):
###################################
def test_update_device_connection_with_progress_signal(ring_widget_with_device):
def test_update_device_connection_prefers_progress_signal(ring_widget_with_device):
ring_widget = ring_widget_with_device
samx = ring_widget.bec_dispatcher.client.device_manager.devices.samx
samx._info["signals"]["progress"] = {
@@ -432,15 +444,114 @@ def test_update_device_connection_with_progress_signal(ring_widget_with_device):
ring_widget.bec_dispatcher.connect_slot = MagicMock()
ring_widget._update_device_connection("samx", "progress")
signal = ring_widget._update_device_connection("samx", "")
# Should connect to device_progress endpoint
assert signal == "progress"
ring_widget.bec_dispatcher.connect_slot.assert_called_once()
call_args = ring_widget.bec_dispatcher.connect_slot.call_args
assert call_args[0][0] == ring_widget.on_device_progress
assert call_args[0][1] == MessageEndpoints.device_progress("samx")
def test_update_device_connection_with_hinted_signal(ring_widget):
def test_update_device_connection_accepts_explicit_progress_signal(ring_widget_with_device):
ring_widget = ring_widget_with_device
samx = ring_widget.bec_dispatcher.client.device_manager.devices.samx
samx._info["signals"]["progress"] = {
"obj_name": "samx_progress",
"component_name": "progress",
"signal_class": "ProgressSignal",
"kind_str": "hinted",
}
ring_widget.bec_dispatcher.connect_slot = MagicMock()
signal = ring_widget._update_device_connection("samx", "progress")
assert signal == "progress"
ring_widget.bec_dispatcher.connect_slot.assert_called_once()
call_args = ring_widget.bec_dispatcher.connect_slot.call_args
assert call_args[0][0] == ring_widget.on_device_progress
assert call_args[0][1] == MessageEndpoints.device_progress("samx")
def test_update_device_connection_resolves_component_name_to_readback_signal(
ring_widget_with_device,
):
ring_widget = ring_widget_with_device
samx = ring_widget.bec_dispatcher.client.device_manager.devices.samx
samx._info["signals"]["setpoint"] = {
"obj_name": "samx_setpoint",
"component_name": "setpoint",
"signal_class": "Signal",
"kind_str": "normal",
}
ring_widget.bec_dispatcher.connect_slot = MagicMock()
signal = ring_widget._update_device_connection("samx", "setpoint")
assert signal == "setpoint"
ring_widget.bec_dispatcher.connect_slot.assert_called_once()
call_args = ring_widget.bec_dispatcher.connect_slot.call_args
assert call_args[0][0] == ring_widget.on_device_readback
assert call_args[0][1] == MessageEndpoints.device_readback("samx")
def test_update_device_connection_falls_back_to_hinted_signal(ring_widget_with_device):
ring_widget = ring_widget_with_device
ring_widget.bec_dispatcher.connect_slot = MagicMock()
signal = ring_widget._update_device_connection("samx", "")
assert signal == "samx"
ring_widget.bec_dispatcher.connect_slot.assert_called_once()
call_args = ring_widget.bec_dispatcher.connect_slot.call_args
assert call_args[0][0] == ring_widget.on_device_readback
assert call_args[0][1] == MessageEndpoints.device_readback("samx")
def test_update_device_connection_falls_back_to_normal_signal(ring_widget_with_device):
ring_widget = ring_widget_with_device
samx = ring_widget.bec_dispatcher.client.device_manager.devices.samx
samx._info["signals"] = {
"setpoint": {
"obj_name": "samx_setpoint",
"component_name": "setpoint",
"signal_class": "Signal",
"kind_str": "normal",
}
}
ring_widget.bec_dispatcher.connect_slot = MagicMock()
signal = ring_widget._update_device_connection("samx", "")
assert signal == "setpoint"
ring_widget.bec_dispatcher.connect_slot.assert_called_once()
call_args = ring_widget.bec_dispatcher.connect_slot.call_args
assert call_args[0][0] == ring_widget.on_device_readback
assert call_args[0][1] == MessageEndpoints.device_readback("samx")
def test_update_device_connection_rejects_unusable_signal(ring_widget_with_device):
ring_widget = ring_widget_with_device
samx = ring_widget.bec_dispatcher.client.device_manager.devices.samx
samx._info["signals"]["async_signal"] = {
"obj_name": "samx_async",
"component_name": "async_signal",
"signal_class": "AsyncSignal",
"kind_str": "hinted",
}
ring_widget.bec_dispatcher.connect_slot = MagicMock()
with pytest.raises(ValueError, match="not usable for ring progress device mode"):
ring_widget._update_device_connection("samx", "samx_async")
ring_widget.bec_dispatcher.connect_slot.assert_not_called()
def test_update_device_connection_accepts_explicit_hinted_signal(ring_widget):
mock_device = FakeDevice(name="samx")
mock_device._info = {
"signals": {
@@ -452,12 +563,13 @@ def test_update_device_connection_with_hinted_signal(ring_widget):
ring_widget.bec_dispatcher.connect_slot = MagicMock()
ring_widget._update_device_connection("samx", "samx")
signal = ring_widget._update_device_connection("samx", "samx")
# Should connect to device_readback endpoint
assert signal == "samx"
ring_widget.bec_dispatcher.connect_slot.assert_called_once()
call_args = ring_widget.bec_dispatcher.connect_slot.call_args
assert call_args[0][0] == ring_widget.on_device_readback
assert call_args[0][1] == MessageEndpoints.device_readback("samx")
def test_update_device_connection_no_device_manager(ring_widget):
@@ -472,44 +584,52 @@ def test_update_device_connection_device_not_found(ring_widget):
mock_device = FakeDevice(name="samx")
ring_widget.bec_dispatcher.client.device_manager.devices["samx"] = mock_device
# Should return without raising an error
ring_widget._update_device_connection("nonexistent", "signal")
assert ring_widget._update_device_connection("nonexistent", "signal") == ""
###################################
# on_scan_progress tests
# scan progress tests
###################################
def test_on_scan_progress_updates_value(ring_widget):
def test_scan_progress_updates_value(ring_widget):
msg = {"value": 42, "max_value": 100}
meta = {"RID": "test_rid_123"}
ring_widget.on_scan_progress(msg, meta)
ring_widget.progress_tracker.process_progress_message(msg, meta)
assert ring_widget.config.value == 42
def test_on_scan_progress_updates_min_max_on_new_rid(ring_widget):
def test_scan_status_open_resets_scan_progress_value(ring_widget):
ring_widget.set_min_max_values(0, 200)
ring_widget.set_value(80)
ring_widget.progress_tracker.process_scan_status_message(
{"scan_id": "scan-1", "scan_number": 7, "status": "open"}, {}
)
assert ring_widget.config.min_value == 0
assert ring_widget.config.max_value == 100
assert ring_widget.config.value == 0
def test_scan_progress_updates_min_max_on_new_rid(ring_widget):
msg = {"value": 50, "max_value": 200}
meta = {"RID": "new_rid"}
ring_widget.RID = "old_rid"
ring_widget.on_scan_progress(msg, meta)
ring_widget.progress_tracker.process_progress_message(msg, meta)
assert ring_widget.config.min_value == 0
assert ring_widget.config.max_value == 200
assert ring_widget.config.value == 50
def test_on_scan_progress_same_rid_no_min_max_update(ring_widget):
msg = {"value": 75, "max_value": 300}
def test_scan_progress_same_rid_no_min_max_update(ring_widget):
meta = {"RID": "same_rid"}
ring_widget.RID = "same_rid"
ring_widget.set_min_max_values(0, 100)
ring_widget.on_scan_progress(msg, meta)
ring_widget.progress_tracker.process_progress_message({"value": 10, "max_value": 100}, meta)
ring_widget.progress_tracker.process_progress_message({"value": 75, "max_value": 300}, meta)
# Max value should not be updated when RID is the same
assert ring_widget.config.max_value == 100
+138 -266
View File
@@ -2,7 +2,6 @@ from unittest import mock
import numpy as np
import pytest
from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints
from bec_widgets.utils.bec_widget import BECWidget
@@ -10,11 +9,8 @@ from bec_widgets.widgets.progress.bec_progressbar.bec_progressbar import (
BECProgressBar,
ProgressState,
)
from bec_widgets.widgets.progress.scan_progressbar.scan_progressbar import (
ProgressSource,
ProgressTask,
ScanProgressBar,
)
from bec_widgets.widgets.progress.progress_backend import ProgressTask
from bec_widgets.widgets.progress.scan_progressbar.scan_progressbar import ScanProgressBar
from .client_mocks import mocked_client
@@ -27,30 +23,6 @@ def scan_progressbar(qtbot, mocked_client):
yield widget
@pytest.fixture
def scan_message():
return messages.ScanQueueMessage(
metadata={
"file_suffix": None,
"file_directory": None,
"user_metadata": {"sample_name": ""},
"RID": "94949c6e-d5f2-4f01-837e-a5d36257dd5d",
},
scan_type="line_scan",
parameter={
"args": {"samx": [-10.0, 10.0]},
"kwargs": {
"steps": 20,
"relative": False,
"exp_time": 0.1,
"burst_at_each_point": 1,
"system_config": {"file_suffix": None, "file_directory": None},
},
},
queue="primary",
)
def test_progress_task_basic():
"""percentage, remaining, and formatted time helpers behave as expected."""
task = ProgressTask(parent=None, value=50, max_value=100, done=False)
@@ -71,18 +43,52 @@ def test_progress_task_basic():
assert task.time_elapsed == "00:00:10"
def test_progress_task_elapsed_time_uses_monotonic_clock(monkeypatch):
times = iter([100.0, 102.5])
monkeypatch.setattr(
"bec_widgets.widgets.progress.progress_backend.time.monotonic", lambda: next(times)
)
task = ProgressTask(parent=None)
task.timer.stop()
task.update_elapsed_time()
assert task._elapsed_time == 2.5
assert task.time_elapsed == "00:00:02"
def test_scan_progressbar_initialization(scan_progressbar):
assert isinstance(scan_progressbar, ScanProgressBar)
assert isinstance(scan_progressbar.progressbar, BECProgressBar)
def test_scan_progressbar_passes_dynamic_stylesheet_setting(qtbot, mocked_client):
widget = ScanProgressBar(client=mocked_client, enable_dynamic_stylesheet=False)
qtbot.addWidget(widget)
assert widget.progressbar.enable_dynamic_stylesheet is False
def test_scan_progressbar_starts_from_scan_progress_before_queue_update(scan_progressbar):
scan_progressbar.progress_tracker.clear_task(emit_finished=False)
scan_progressbar.progress_tracker.process_progress_message(
{"value": 3, "max_value": 10, "done": False}, metadata={"RID": "live-rid"}
)
assert scan_progressbar.progress_tracker.task is not None
assert scan_progressbar.progress_tracker._active_scan_id == "live-rid"
assert scan_progressbar.progressbar._user_value == 3
assert scan_progressbar.progressbar._user_maximum == 10
def test_update_labels_content(scan_progressbar):
"""update_labels() reflects ProgressTask time strings on the UI."""
# fabricate a task with known timings
task = ProgressTask(parent=scan_progressbar, value=30, max_value=100, done=False)
task.timer.stop()
task._elapsed_time = 50
scan_progressbar.task = task
scan_progressbar.progress_tracker.task = task
scan_progressbar.update_labels()
@@ -90,17 +96,17 @@ def test_update_labels_content(scan_progressbar):
assert scan_progressbar.ui.remaining_time_label.text() == "00:01:57"
def test_on_progress_update(qtbot, scan_progressbar):
def test_progress_update(qtbot, scan_progressbar):
"""
on_progress_update() should forward new values to the embedded
BECProgressBar and keep ProgressTask in sync.
Scan progress updates should update the embedded BECProgressBar
and keep ProgressTask in sync.
"""
task = ProgressTask(parent=scan_progressbar, value=0, max_value=100, done=False)
task.timer.stop()
scan_progressbar.task = task
scan_progressbar.progress_tracker.task = task
msg = {"value": 20, "max_value": 100, "done": False}
scan_progressbar.on_progress_update(msg, metadata={"status": "open"})
scan_progressbar.progress_tracker.process_progress_message(msg, metadata={"status": "open"})
qtbot.wait(200)
bar = scan_progressbar.progressbar
@@ -110,14 +116,58 @@ def test_on_progress_update(qtbot, scan_progressbar):
assert bar.state is ProgressState.NORMAL
def test_scan_status_open_resets_progress_before_first_progress_update(scan_progressbar):
scan_progressbar.progressbar.set_maximum(50)
scan_progressbar.progressbar.set_value(25)
scan_progressbar.progressbar.state = ProgressState.INTERRUPTED
scan_progressbar.ui.elapsed_time_label.setText("00:00:12")
scan_progressbar.ui.remaining_time_label.setText("00:00:34")
scan_progressbar.progress_tracker.process_scan_status_message(
{"scan_id": "scan-1", "scan_number": 7, "status": "open"}, {"RID": "rid-1"}
)
assert scan_progressbar.progressbar._user_value == 0
assert scan_progressbar.progressbar._user_maximum == 100
assert scan_progressbar.progressbar.state is ProgressState.NORMAL
assert scan_progressbar.ui.elapsed_time_label.text() == "00:00:00"
assert scan_progressbar.ui.remaining_time_label.text() == "00:00:00"
assert scan_progressbar.ui.source_label.text() == "Scan 7"
def test_scan_status_open_reset_ignores_same_scan(scan_progressbar):
scan_progressbar.progress_tracker.process_scan_status_message(
{"scan_id": "scan-1", "status": "open"}, {}
)
scan_progressbar.progressbar.set_value(20)
scan_progressbar.progress_tracker.process_scan_status_message(
{"scan_id": "scan-1", "status": "open"}, {}
)
assert scan_progressbar.progressbar._user_value == 20
def test_scan_status_non_open_does_not_reset_progress(scan_progressbar):
scan_progressbar.progressbar.set_value(25)
scan_progressbar.progress_tracker.process_scan_status_message(
{"scan_id": "scan-1", "status": "closed"}, {}
)
assert scan_progressbar.progressbar._user_value == 25
@pytest.mark.parametrize(
"status, value, max_val, expected_state",
[
("open", 10, 100, ProgressState.NORMAL),
("paused", 25, 100, ProgressState.PAUSED),
("aborted", 30, 100, ProgressState.INTERRUPTED),
("halted", 40, 100, ProgressState.PAUSED),
("aborted", 30, 100, ProgressState.WARNING),
("halted", 40, 100, ProgressState.INTERRUPTED),
("closed", 100, 100, ProgressState.COMPLETED),
("user_completed", 40, 100, ProgressState.COMPLETED),
("UNKNOWN", 10, 100, ProgressState.NORMAL),
],
)
def test_state_mapping_during_updates(
@@ -126,9 +176,9 @@ def test_state_mapping_during_updates(
"""ScanProgressBar should translate BEC status → ProgressState consistently."""
task = ProgressTask(parent=scan_progressbar, value=0, max_value=max_val, done=False)
task.timer.stop()
scan_progressbar.task = task
scan_progressbar.progress_tracker.task = task
scan_progressbar.on_progress_update(
scan_progressbar.progress_tracker.process_progress_message(
{"value": value, "max_value": max_val, "done": status == "closed"},
metadata={"status": status},
)
@@ -136,97 +186,39 @@ def test_state_mapping_during_updates(
assert scan_progressbar.progressbar.state is expected_state
def test_source_label_updates(scan_progressbar):
"""update_source_label() renders correct text for both progress sources."""
# device progress
scan_progressbar.update_source_label(ProgressSource.DEVICE_PROGRESS, device="motor")
assert scan_progressbar.ui.source_label.text() == "Device motor"
def test_aborted_done_scan_keeps_partial_progress(scan_progressbar):
scan_progressbar.progress_tracker.process_progress_message(
{"value": 4, "max_value": 10, "done": True},
metadata={"scan_id": "scan-1", "RID": "rid-1", "status": "aborted"},
)
# scan progress (needs a scan_number for deterministic text)
scan_progressbar.scan_number = 5
scan_progressbar.update_source_label(ProgressSource.SCAN_PROGRESS)
assert scan_progressbar.progressbar._user_value == 4
assert scan_progressbar.progressbar._user_maximum == 10
assert scan_progressbar.progressbar.state is ProgressState.WARNING
assert scan_progressbar.progress_tracker.task is None
def test_source_label_updates(scan_progressbar):
"""update_source_label() renders the current scan label."""
scan_progressbar.progress_tracker.scan_number = 5
scan_progressbar.update_source_label()
assert scan_progressbar.ui.source_label.text() == "Scan 5"
def test_set_progress_source_connections(scan_progressbar, monkeypatch):
""" """
def test_source_label_update_logs_only_on_text_change(scan_progressbar):
scan_progressbar.progress_tracker.scan_number = 5
connect_calls = []
disconnect_calls = []
with mock.patch(
"bec_widgets.widgets.progress.scan_progressbar.scan_progressbar.logger.info"
) as mock_info:
scan_progressbar.update_source_label()
scan_progressbar.update_source_label()
scan_progressbar.update_source_label()
def fake_connect(slot, endpoint):
connect_calls.append(endpoint)
def fake_disconnect(slot, endpoint):
disconnect_calls.append(endpoint)
# Patch dispatcher methods
monkeypatch.setattr(scan_progressbar.bec_dispatcher, "connect_slot", fake_connect)
monkeypatch.setattr(scan_progressbar.bec_dispatcher, "disconnect_slot", fake_disconnect)
# switch to SCAN_PROGRESS
scan_progressbar.scan_number = 7
scan_progressbar.set_progress_source(ProgressSource.SCAN_PROGRESS)
assert scan_progressbar._progress_source == ProgressSource.SCAN_PROGRESS
assert scan_progressbar.ui.source_label.text() == "Scan 7"
assert connect_calls[-1] == MessageEndpoints.scan_progress()
assert disconnect_calls == []
# switch to DEVICE_PROGRESS
device = "motor"
scan_progressbar.set_progress_source(ProgressSource.DEVICE_PROGRESS, device=device)
assert scan_progressbar._progress_source == ProgressSource.DEVICE_PROGRESS
assert scan_progressbar.ui.source_label.text() == f"Device {device}"
assert connect_calls[-1] == MessageEndpoints.device_progress(device=device)
assert disconnect_calls == [MessageEndpoints.scan_progress()]
# calling again with the SAME source should not add new connect calls
prev_connect_count = len(connect_calls)
scan_progressbar.set_progress_source(ProgressSource.DEVICE_PROGRESS, device=device)
assert len(connect_calls) == prev_connect_count, "No extra connect made for same source"
mock_info.assert_called_once_with("Set progress source to Scan 5")
def test_set_progress_source_disconnects_previous_device_subscription(
scan_progressbar, monkeypatch
):
disconnect_calls = []
monkeypatch.setattr(scan_progressbar.bec_dispatcher, "connect_slot", lambda *args: None)
monkeypatch.setattr(
scan_progressbar.bec_dispatcher,
"disconnect_slot",
lambda slot, endpoint: disconnect_calls.append(endpoint),
)
scan_progressbar.set_progress_source(ProgressSource.DEVICE_PROGRESS, device="motor1")
scan_progressbar.set_progress_source(ProgressSource.DEVICE_PROGRESS, device="motor2")
assert disconnect_calls == [MessageEndpoints.device_progress(device="motor1")]
def test_set_progress_source_disconnects_device_when_switching_to_scan(
scan_progressbar, monkeypatch
):
disconnect_calls = []
monkeypatch.setattr(scan_progressbar.bec_dispatcher, "connect_slot", lambda *args: None)
monkeypatch.setattr(
scan_progressbar.bec_dispatcher,
"disconnect_slot",
lambda slot, endpoint: disconnect_calls.append(endpoint),
)
scan_progressbar.set_progress_source(ProgressSource.DEVICE_PROGRESS, device="motor1")
scan_progressbar.set_progress_source(ProgressSource.SCAN_PROGRESS)
assert disconnect_calls == [MessageEndpoints.device_progress(device="motor1")]
def test_cleanup_disconnects_active_device_subscription(scan_progressbar, monkeypatch):
def test_cleanup_disconnects_active_scan_subscription(scan_progressbar, monkeypatch):
disconnect_calls = []
@@ -240,148 +232,28 @@ def test_cleanup_disconnects_active_device_subscription(scan_progressbar, monkey
monkeypatch.setattr(scan_progressbar.progressbar, "deleteLater", lambda: None)
monkeypatch.setattr(BECWidget, "cleanup", lambda self: None)
scan_progressbar.set_progress_source(ProgressSource.DEVICE_PROGRESS, device="motor1")
with (
mock.patch.object(scan_progressbar, "close", wraps=scan_progressbar.close) as close_mock,
mock.patch.object(
scan_progressbar, "deleteLater", wraps=scan_progressbar.deleteLater
) as delete_later_mock,
):
ScanProgressBar.cleanup(scan_progressbar)
assert disconnect_calls == [MessageEndpoints.scan_progress(), MessageEndpoints.scan_status()]
assert scan_progressbar.progress_tracker._connected is False
close_mock.assert_not_called()
delete_later_mock.assert_not_called()
def test_cleanup_stops_active_task(scan_progressbar, monkeypatch):
monkeypatch.setattr(BECWidget, "cleanup", lambda self: None)
scan_progressbar.progress_tracker.task = ProgressTask(parent=scan_progressbar)
scan_progressbar.progress_tracker._active_scan_id = "scan-1"
timer = scan_progressbar.progress_tracker.task.timer
ScanProgressBar.cleanup(scan_progressbar)
assert disconnect_calls == [
MessageEndpoints.device_progress(device="motor1"),
MessageEndpoints.scan_queue_status(),
]
assert scan_progressbar._progress_source is None
assert scan_progressbar._progress_device is None
def test_progressbar_queue_update(scan_progressbar):
"""
Test that an empty queue update does not change the progress source.
"""
msg = messages.ScanQueueStatusMessage(
queue={"primary": messages.ScanQueueStatus(info=[], status="RUNNING")}
)
with mock.patch.object(scan_progressbar, "set_progress_source") as mock_set_source:
scan_progressbar.on_queue_update(
msg.content, msg.metadata, _override_slot_params={"verify_sender": False}
)
mock_set_source.assert_not_called()
def test_progressbar_queue_update_with_scan(scan_progressbar, scan_message):
"""
Test that a queue update with a scan changes the progress source to SCAN_PROGRESS.
"""
request_block = messages.RequestBlock(
msg=scan_message,
RID="some-rid",
scan_motors=["samx"],
readout_priority={"monitored": ["samx"]},
is_scan=True,
scan_number=1,
scan_id="e3f50794-852c-4bb1-965e-41c585ab0aa9",
report_instructions=[{"scan_progress": 20}],
)
msg = messages.ScanQueueStatusMessage(
metadata={},
queue={
"primary": messages.ScanQueueStatus(
info=[
messages.QueueInfoEntry(
queue_id="40831e2c-fbd1-4432-8072-ad168a7ad964",
scan_id=["e3f50794-852c-4bb1-965e-41c585ab0aa9"],
status="RUNNING",
active_request_block=request_block,
is_scan=[True],
request_blocks=[request_block],
scan_number=[1],
)
],
status="RUNNING",
)
},
)
with mock.patch.object(scan_progressbar, "set_progress_source") as mock_set_source:
scan_progressbar.on_queue_update(
msg.content, msg.metadata, _override_slot_params={"verify_sender": False}
)
mock_set_source.assert_called_once_with(ProgressSource.SCAN_PROGRESS)
def test_progressbar_queue_update_with_device(scan_progressbar, scan_message):
"""
Test that a queue update with a device changes the progress source to DEVICE_PROGRESS.
"""
request_block = messages.RequestBlock(
msg=scan_message,
RID="some-rid",
scan_motors=["samx"],
readout_priority={"monitored": ["samx"]},
is_scan=True,
scan_number=1,
scan_id="e3f50794-852c-4bb1-965e-41c585ab0aa9",
report_instructions=[{"device_progress": ["samx"]}],
)
msg = messages.ScanQueueStatusMessage(
metadata={},
queue={
"primary": messages.ScanQueueStatus(
info=[
messages.QueueInfoEntry(
queue_id="40831e2c-fbd1-4432-8072-ad168a7ad964",
scan_id=["e3f50794-852c-4bb1-965e-41c585ab0aa9"],
status="RUNNING",
active_request_block=request_block,
is_scan=[True],
request_blocks=[request_block],
scan_number=[1],
)
],
status="RUNNING",
)
},
)
with mock.patch.object(scan_progressbar, "set_progress_source") as mock_set_source:
scan_progressbar.on_queue_update(
msg.content, msg.metadata, _override_slot_params={"verify_sender": False}
)
mock_set_source.assert_called_once_with(ProgressSource.DEVICE_PROGRESS, device="samx")
def test_progressbar_queue_update_with_no_scan_or_device(scan_progressbar, scan_message):
"""
Test that a queue update with neither scan nor device does not change the progress source.
"""
request_block = messages.RequestBlock(
msg=scan_message,
RID="some-rid",
scan_motors=["samx"],
readout_priority={"monitored": ["samx"]},
is_scan=True,
scan_number=1,
scan_id="e3f50794-852c-4bb1-965e-41c585ab0aa9",
)
msg = messages.ScanQueueStatusMessage(
metadata={},
queue={
"primary": messages.ScanQueueStatus(
info=[
messages.QueueInfoEntry(
queue_id="40831e2c-fbd1-4432-8072-ad168a7ad964",
scan_id=["e3f50794-852c-4bb1-965e-41c585ab0aa9"],
status="RUNNING",
active_request_block=request_block,
is_scan=[True],
request_blocks=[request_block],
scan_number=[1],
)
],
status="RUNNING",
)
},
)
with mock.patch.object(scan_progressbar, "set_progress_source") as mock_set_source:
scan_progressbar.on_queue_update(
msg.content, msg.metadata, _override_slot_params={"verify_sender": False}
)
mock_set_source.assert_not_called()
assert not timer.isActive()
assert scan_progressbar.progress_tracker.task is None
assert scan_progressbar.progress_tracker._active_scan_id is None