mirror of
https://github.com/bec-project/bec_widgets.git
synced 2026-06-13 16:40:56 +02:00
Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 79a4cda6c1 | |||
| 6f3ee6316b | |||
| 3d93cf2f01 | |||
| e547ec71ae | |||
| e8bd80377e | |||
| e8e67f68a2 | |||
| 51f7652b1f | |||
| 007f9306a6 | |||
| acfc1b4b88 | |||
| af125e2222 |
@@ -45,6 +45,18 @@ jobs:
|
||||
cd ./bec
|
||||
pip install pytest pytest-random-order
|
||||
pytest -v --maxfail=2 --junitxml=report.xml --random-order ./bec_server/tests ./bec_ipython_client/tests/client_tests ./bec_lib/tests
|
||||
|
||||
- name: Upload BEC unit test artifacts if job fails
|
||||
if: failure()
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: bec-unit-test-artifacts
|
||||
path: |
|
||||
./bec/report.xml
|
||||
./bec/logs/*.log
|
||||
if-no-files-found: ignore
|
||||
retention-days: 7
|
||||
|
||||
bec-e2e-test:
|
||||
name: BEC End2End Tests
|
||||
runs-on: ubuntu-latest
|
||||
@@ -62,3 +74,12 @@ jobs:
|
||||
OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH }}
|
||||
BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH }}
|
||||
PYTHON_VERSION: '3.11'
|
||||
|
||||
- name: Upload BEC e2e logs if job fails
|
||||
if: failure()
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: bec-e2e-test-logs
|
||||
path: ./_e2e_test_checkout_/bec/logs/*.log
|
||||
if-no-files-found: ignore
|
||||
retention-days: 7
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
name: 'Close stale issues and PRs'
|
||||
name: "Close stale issues and PRs"
|
||||
on:
|
||||
schedule:
|
||||
- cron: '00 10 * * *'
|
||||
- cron: "00 10 * * *"
|
||||
workflow_dispatch:
|
||||
|
||||
jobs:
|
||||
@@ -13,7 +13,7 @@ jobs:
|
||||
steps:
|
||||
- uses: actions/stale@v9
|
||||
with:
|
||||
stale-issue-message: 'This issue is stale because it has been open 120 days with no activity. Remove stale label or comment or this will be closed in 14 days.'
|
||||
stale-pr-message: 'This PR is stale because it has been open 120 days with no activity. Remove stale label or comment or this will be closed in 14 days.'
|
||||
stale-issue-message: "This issue is stale because it has been open 120 days with no activity."
|
||||
stale-pr-message: "This PR is stale because it has been open 120 days with no activity."
|
||||
days-before-stale: 120
|
||||
days-before-close: 14
|
||||
days-before-close: -1
|
||||
|
||||
@@ -1,6 +1,44 @@
|
||||
# CHANGELOG
|
||||
|
||||
|
||||
## v3.14.0 (2026-06-11)
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
- **bec_progress_bar**: Replace the custom paint event progressbar with native QProgressBar
|
||||
([`007f930`](https://github.com/bec-project/bec_widgets/commit/007f9306a62f60cf66a268f608984b6a954b8653))
|
||||
|
||||
- **progress**: Scan progress reset on_scan_status in unified backend
|
||||
([`3d93cf2`](https://github.com/bec-project/bec_widgets/commit/3d93cf2f01778a9825a94adcba44a26fbeaf4be4))
|
||||
|
||||
- **ring**: Progresssignal fetch logic back
|
||||
([`e8bd803`](https://github.com/bec-project/bec_widgets/commit/e8bd80377e0bee40142c5cd0d4a9c35d35f2d950))
|
||||
|
||||
- **scan_control**: Remove parent from layout to prevent `QLayout: Attempting to add QLayout "" to
|
||||
ScanGroupBox "", which already has a layout`
|
||||
([`e8e67f6`](https://github.com/bec-project/bec_widgets/commit/e8e67f68a2912c69a7df3d82daaa67fab3ae1139))
|
||||
|
||||
### Continuous Integration
|
||||
|
||||
- **child_repos**: Artifact logs upload if child pipelines fail
|
||||
([`acfc1b4`](https://github.com/bec-project/bec_widgets/commit/acfc1b4b883b2a3cf0596c881489cb2c953dd219))
|
||||
|
||||
### Features
|
||||
|
||||
- **progress**: Progress is tracked from bec; unified progress backend
|
||||
([`51f7652`](https://github.com/bec-project/bec_widgets/commit/51f7652b1fe59db6bf94a8183ae0e3a715601aa6))
|
||||
|
||||
### Refactoring
|
||||
|
||||
- **bec_progress**: Simplification of chunk radius calculation
|
||||
([`e547ec7`](https://github.com/bec-project/bec_widgets/commit/e547ec71ae1f45db72d9b8cde0b5fe564466333c))
|
||||
|
||||
### Testing
|
||||
|
||||
- **e2e**: Increase rpc test_available_widgets timout back to 100
|
||||
([`af125e2`](https://github.com/bec-project/bec_widgets/commit/af125e2222ff11a73f28dadb3a5d93e409ad010e))
|
||||
|
||||
|
||||
## v3.13.5 (2026-06-02)
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
@@ -427,7 +427,7 @@ class BECMainWindow(RPCBase):
|
||||
|
||||
|
||||
class BECProgressBar(RPCBase):
|
||||
"""A custom progress bar with smooth transitions. The displayed text can be customized using a template."""
|
||||
"""A BEC progress bar backed by Qt's native QProgressBar."""
|
||||
|
||||
_IMPORT_MODULE = "bec_widgets.widgets.progress.bec_progressbar.bec_progressbar"
|
||||
|
||||
|
||||
@@ -46,8 +46,8 @@ logger = bec_logger.logger
|
||||
class BECMainWindow(BECWidget, QMainWindow):
|
||||
RPC = True
|
||||
PLUGIN = True
|
||||
SCAN_PROGRESS_WIDTH = 100 # px
|
||||
SCAN_PROGRESS_HEIGHT = 12 # px
|
||||
SCAN_PROGRESS_WIDTH = 120 # px
|
||||
SCAN_PROGRESS_HEIGHT = 20 # px
|
||||
|
||||
def __init__(self, parent=None, window_title: str = "BEC", **kwargs):
|
||||
super().__init__(parent=parent, **kwargs)
|
||||
@@ -197,7 +197,11 @@ class BECMainWindow(BECWidget, QMainWindow):
|
||||
|
||||
# Setting HoverWidget for the scan progress bar - minimal and full version
|
||||
self._scan_progress_bar_simple = ScanProgressBar(
|
||||
self, one_line_design=True, rpc_exposed=False, rpc_passthrough_children=False
|
||||
self,
|
||||
one_line_design=True,
|
||||
rpc_exposed=False,
|
||||
rpc_passthrough_children=False,
|
||||
enable_dynamic_stylesheet=True,
|
||||
)
|
||||
self._scan_progress_bar_simple.show_elapsed_time = False
|
||||
self._scan_progress_bar_simple.show_remaining_time = False
|
||||
@@ -205,8 +209,9 @@ class BECMainWindow(BECWidget, QMainWindow):
|
||||
self._scan_progress_bar_simple.progressbar.label_template = ""
|
||||
self._scan_progress_bar_simple.progressbar.setFixedHeight(self.SCAN_PROGRESS_HEIGHT)
|
||||
self._scan_progress_bar_simple.progressbar.setFixedWidth(self.SCAN_PROGRESS_WIDTH)
|
||||
# This one do not need dynamic styling on hover ScanProgressBar since user will hover on it probably later, when progress bar is big enough
|
||||
self._scan_progress_bar_full = ScanProgressBar(
|
||||
self, rpc_exposed=False, rpc_passthrough_children=False
|
||||
self, rpc_exposed=False, rpc_passthrough_children=False, enable_dynamic_stylesheet=False
|
||||
)
|
||||
self._scan_progress_hover = HoverWidget(
|
||||
self, simple=self._scan_progress_bar_simple, full=self._scan_progress_bar_full
|
||||
@@ -233,8 +238,8 @@ class BECMainWindow(BECWidget, QMainWindow):
|
||||
|
||||
# The actual line
|
||||
line = QFrame()
|
||||
line.setFrameShape(QFrame.VLine)
|
||||
line.setFrameShadow(QFrame.Sunken)
|
||||
line.setFrameShape(QFrame.Shape.VLine)
|
||||
line.setFrameShadow(QFrame.Shadow.Sunken)
|
||||
line.setFixedHeight(status_bar.sizeHint().height() - 2)
|
||||
|
||||
# Wrapper to center the line vertically -> work around for QFrame not being able to center itself
|
||||
@@ -242,7 +247,7 @@ class BECMainWindow(BECWidget, QMainWindow):
|
||||
vbox = QVBoxLayout(wrapper)
|
||||
vbox.setContentsMargins(0, 0, 0, 0)
|
||||
vbox.addStretch()
|
||||
vbox.addWidget(line, alignment=Qt.AlignHCenter)
|
||||
vbox.addWidget(line, alignment=Qt.AlignmentFlag.AlignHCenter)
|
||||
vbox.addStretch()
|
||||
wrapper.setFixedWidth(line.sizeHint().width())
|
||||
|
||||
|
||||
@@ -162,47 +162,6 @@ class ScanCheckBox(QCheckBox):
|
||||
self.setChecked(default)
|
||||
|
||||
|
||||
class ScanOptionalWidget(QGroupBox):
|
||||
def __init__(self, widget, parent=None):
|
||||
super().__init__(parent=parent)
|
||||
self.inner_widget = widget
|
||||
self.arg_name = getattr(widget, "arg_name", None)
|
||||
self.setFlat(True)
|
||||
|
||||
layout = QHBoxLayout(self)
|
||||
layout.setContentsMargins(0, 0, 0, 0)
|
||||
layout.addWidget(widget)
|
||||
|
||||
self.none_checkbox = QCheckBox(self)
|
||||
self.none_checkbox.setToolTip("Set this value to None.")
|
||||
self.none_checkbox.toggled.connect(self._on_none_toggled)
|
||||
layout.addWidget(self.none_checkbox)
|
||||
|
||||
def _on_none_toggled(self, checked: bool) -> None:
|
||||
self.inner_widget.setEnabled(not checked)
|
||||
|
||||
def set_none(self, checked: bool) -> None:
|
||||
self.none_checkbox.setChecked(checked)
|
||||
|
||||
def is_none(self) -> bool:
|
||||
return self.none_checkbox.isChecked()
|
||||
|
||||
def setToolTip(self, text: str) -> None: # noqa: N802
|
||||
super().setToolTip(text)
|
||||
self.inner_widget.setToolTip(text)
|
||||
checkbox_tooltip = "Set this value to None."
|
||||
if text:
|
||||
checkbox_tooltip = f"{text}\n{checkbox_tooltip}"
|
||||
self.none_checkbox.setToolTip(checkbox_tooltip)
|
||||
|
||||
def toolTip(self) -> str: # noqa: N802
|
||||
return self.inner_widget.toolTip()
|
||||
|
||||
def setSuffix(self, suffix: str) -> None: # noqa: N802
|
||||
if hasattr(self.inner_widget, "setSuffix"):
|
||||
self.inner_widget.setSuffix(suffix)
|
||||
|
||||
|
||||
class ScanGroupBox(QGroupBox):
|
||||
WIDGET_HANDLER = {
|
||||
ScanArgType.DEVICE: DeviceComboBox,
|
||||
@@ -233,7 +192,7 @@ class ScanGroupBox(QGroupBox):
|
||||
vbox_layout = QVBoxLayout(self)
|
||||
hbox_layout = QHBoxLayout()
|
||||
vbox_layout.addLayout(hbox_layout)
|
||||
self.layout = QGridLayout(self)
|
||||
self.layout = QGridLayout()
|
||||
vbox_layout.addLayout(self.layout)
|
||||
|
||||
# Add bundle button
|
||||
@@ -252,7 +211,6 @@ class ScanGroupBox(QGroupBox):
|
||||
self.labels = []
|
||||
self.widgets = []
|
||||
self._widget_configs = {}
|
||||
self._wrapped_widgets = {}
|
||||
self._column_labels = {}
|
||||
self.selected_devices = {}
|
||||
|
||||
@@ -339,11 +297,10 @@ class ScanGroupBox(QGroupBox):
|
||||
)
|
||||
if isinstance(widget, ScanLiteralsComboBox):
|
||||
widget.set_literals(item["type"].get("Literal", []))
|
||||
display_widget = self._wrap_optional_widget(widget, item, default)
|
||||
self._widget_configs[display_widget] = item
|
||||
self._apply_unit_metadata(display_widget, item)
|
||||
self.layout.addWidget(display_widget, row, column_index)
|
||||
self.widgets.append(display_widget)
|
||||
self._widget_configs[widget] = item
|
||||
self._apply_unit_metadata(widget, item)
|
||||
self.layout.addWidget(widget, row, column_index)
|
||||
self.widgets.append(widget)
|
||||
|
||||
@Slot(str)
|
||||
def emit_device_selected(self, device_name):
|
||||
@@ -377,10 +334,8 @@ class ScanGroupBox(QGroupBox):
|
||||
return
|
||||
|
||||
for widget in self.widgets[-len(self.inputs) :]:
|
||||
inner_widget = self._inner_widget(widget)
|
||||
if isinstance(inner_widget, DeviceComboBox):
|
||||
self.selected_devices[inner_widget] = ""
|
||||
self._wrapped_widgets.pop(inner_widget, None)
|
||||
if isinstance(widget, DeviceComboBox):
|
||||
self.selected_devices[widget] = ""
|
||||
self._widget_configs.pop(widget, None)
|
||||
widget.close()
|
||||
widget.deleteLater()
|
||||
@@ -392,10 +347,8 @@ class ScanGroupBox(QGroupBox):
|
||||
def remove_all_widget_bundles(self):
|
||||
"""Remove every widget bundle from the scan control layout."""
|
||||
for widget in list(self.widgets):
|
||||
inner_widget = self._inner_widget(widget)
|
||||
if isinstance(inner_widget, DeviceComboBox):
|
||||
self.selected_devices.pop(inner_widget, None)
|
||||
self._wrapped_widgets.pop(inner_widget, None)
|
||||
if isinstance(widget, DeviceComboBox):
|
||||
self.selected_devices.pop(widget, None)
|
||||
self._widget_configs.pop(widget, None)
|
||||
widget.close()
|
||||
widget.deleteLater()
|
||||
@@ -432,7 +385,12 @@ class ScanGroupBox(QGroupBox):
|
||||
for j in range(self.layout.columnCount()):
|
||||
try: # In case that the bundle size changes
|
||||
widget = self.layout.itemAtPosition(i, j).widget()
|
||||
value = self._widget_value(widget, device_object=device_object)
|
||||
if isinstance(widget, DeviceComboBox) and device_object:
|
||||
value = widget.get_current_device()
|
||||
elif isinstance(widget, DeviceComboBox):
|
||||
value = widget.currentText()
|
||||
else:
|
||||
value = WidgetIO.get_value(widget)
|
||||
args.append(value)
|
||||
except AttributeError:
|
||||
continue
|
||||
@@ -442,23 +400,27 @@ class ScanGroupBox(QGroupBox):
|
||||
kwargs = {}
|
||||
for i in range(self.layout.columnCount()):
|
||||
widget = self.layout.itemAtPosition(1, i).widget()
|
||||
value = self._widget_value(widget, device_object=device_object)
|
||||
inner_widget = self._inner_widget(widget)
|
||||
if isinstance(inner_widget, DeviceComboBox) and value is not None and device_object:
|
||||
value = value.name
|
||||
if isinstance(widget, DeviceComboBox) and device_object:
|
||||
value = widget.get_current_device().name
|
||||
elif isinstance(widget, DeviceComboBox):
|
||||
value = widget.currentText()
|
||||
elif isinstance(widget, ScanLiteralsComboBox):
|
||||
value = widget.get_value()
|
||||
else:
|
||||
value = WidgetIO.get_value(widget)
|
||||
kwargs[widget.arg_name] = value
|
||||
return kwargs
|
||||
|
||||
def count_arg_rows(self):
|
||||
widget_rows = 0
|
||||
for row in range(self.layout.rowCount()):
|
||||
if row == 0:
|
||||
continue
|
||||
for col in range(self.layout.columnCount()):
|
||||
item = self.layout.itemAtPosition(row, col)
|
||||
if item is not None and item.widget() is not None:
|
||||
widget_rows += 1
|
||||
break
|
||||
if item is not None:
|
||||
widget = item.widget()
|
||||
if widget is not None:
|
||||
if isinstance(widget, DeviceComboBox):
|
||||
widget_rows += 1
|
||||
return widget_rows
|
||||
|
||||
def set_parameters(self, parameters: list | dict):
|
||||
@@ -482,13 +444,13 @@ class ScanGroupBox(QGroupBox):
|
||||
self.add_input_widgets(self.inputs, row)
|
||||
|
||||
for i, value in enumerate(parameters):
|
||||
self._set_widget_value(self.widgets[i], value)
|
||||
WidgetIO.set_value(self.widgets[i], value)
|
||||
|
||||
def _set_kwarg_parameters(self, parameters: dict):
|
||||
for widget in self.widgets:
|
||||
for key, value in parameters.items():
|
||||
if widget.arg_name == key:
|
||||
self._set_widget_value(widget, value)
|
||||
WidgetIO.set_value(widget, value)
|
||||
break
|
||||
|
||||
@staticmethod
|
||||
@@ -543,7 +505,6 @@ class ScanGroupBox(QGroupBox):
|
||||
return None
|
||||
|
||||
def _widget_position(self, widget) -> tuple[int, int] | None:
|
||||
widget = self._display_widget(widget)
|
||||
for row in range(self.layout.rowCount()):
|
||||
for column in range(self.layout.columnCount()):
|
||||
item = self.layout.itemAtPosition(row, column)
|
||||
@@ -647,43 +608,3 @@ class ScanGroupBox(QGroupBox):
|
||||
if item.get("lt") is not None:
|
||||
maximum = float(item["lt"]) - step
|
||||
widget.setRange(minimum, maximum)
|
||||
|
||||
def _wrap_optional_widget(self, widget, item: dict, default):
|
||||
if not item.get("optional", False):
|
||||
return widget
|
||||
|
||||
wrapped_widget = ScanOptionalWidget(widget, parent=self)
|
||||
wrapped_widget.set_none(default is None)
|
||||
self._wrapped_widgets[widget] = wrapped_widget
|
||||
return wrapped_widget
|
||||
|
||||
@staticmethod
|
||||
def _inner_widget(widget):
|
||||
if isinstance(widget, ScanOptionalWidget):
|
||||
return widget.inner_widget
|
||||
return widget
|
||||
|
||||
def _display_widget(self, widget):
|
||||
return self._wrapped_widgets.get(widget, widget)
|
||||
|
||||
def _widget_value(self, widget, *, device_object: bool = True):
|
||||
if isinstance(widget, ScanOptionalWidget) and widget.is_none():
|
||||
return None
|
||||
|
||||
inner_widget = self._inner_widget(widget)
|
||||
if isinstance(inner_widget, DeviceComboBox) and device_object:
|
||||
return inner_widget.get_current_device()
|
||||
if isinstance(inner_widget, DeviceComboBox):
|
||||
return inner_widget.currentText()
|
||||
if isinstance(inner_widget, ScanLiteralsComboBox):
|
||||
return inner_widget.get_value()
|
||||
return WidgetIO.get_value(inner_widget)
|
||||
|
||||
def _set_widget_value(self, widget, value) -> None:
|
||||
if isinstance(widget, ScanOptionalWidget):
|
||||
widget.set_none(value is None)
|
||||
if value is None:
|
||||
return
|
||||
WidgetIO.set_value(widget.inner_widget, value)
|
||||
return
|
||||
WidgetIO.set_value(widget, value)
|
||||
|
||||
@@ -92,34 +92,27 @@ class ScanInfoAdapter:
|
||||
@staticmethod
|
||||
def parse_annotation(
|
||||
annotation: AnnotationValue,
|
||||
) -> tuple[AnnotationValue, ScanArgumentMetadata, bool]:
|
||||
) -> tuple[AnnotationValue, ScanArgumentMetadata]:
|
||||
"""Extract the serialized base annotation and ``ScanArgument`` metadata.
|
||||
|
||||
Args:
|
||||
annotation (AnnotationValue): Serialized annotation payload from BEC.
|
||||
|
||||
Returns:
|
||||
tuple[AnnotationValue, ScanArgumentMetadata, bool]: The unwrapped annotation,
|
||||
parsed ``ScanArgument`` metadata, and whether ``None`` is an allowed value.
|
||||
tuple[AnnotationValue, ScanArgumentMetadata]: The unwrapped annotation and parsed
|
||||
``ScanArgument`` metadata.
|
||||
"""
|
||||
scan_argument: ScanArgumentMetadata = {}
|
||||
if isinstance(annotation, dict) and "Annotated" in annotation:
|
||||
annotated = annotation["Annotated"]
|
||||
annotation = annotated.get("type", "_empty")
|
||||
scan_argument = annotated.get("metadata", {}).get("ScanArgument", {}) or {}
|
||||
|
||||
allows_none = False
|
||||
if isinstance(annotation, list):
|
||||
allows_none = "NoneType" in annotation
|
||||
annotation = next(
|
||||
(entry for entry in annotation if entry != "NoneType"),
|
||||
annotation[0] if annotation else "_empty",
|
||||
)
|
||||
elif annotation == "NoneType":
|
||||
allows_none = True
|
||||
annotation = "_empty"
|
||||
|
||||
return annotation, scan_argument, allows_none
|
||||
if isinstance(annotation, dict) and "Annotated" in annotation:
|
||||
annotated = annotation["Annotated"]
|
||||
annotation = annotated.get("type", "_empty")
|
||||
scan_argument = annotated.get("metadata", {}).get("ScanArgument", {}) or {}
|
||||
return annotation, scan_argument
|
||||
|
||||
@staticmethod
|
||||
def scan_arg_type_from_annotation(annotation: AnnotationValue) -> AnnotationValue:
|
||||
@@ -149,14 +142,13 @@ class ScanInfoAdapter:
|
||||
Returns:
|
||||
ScanInputConfig: Normalized input configuration for ``ScanControl``.
|
||||
"""
|
||||
annotation, scan_argument, allows_none = self.parse_annotation(param.get("annotation"))
|
||||
annotation, scan_argument = self.parse_annotation(param.get("annotation"))
|
||||
return self._build_scan_input(
|
||||
name=param["name"],
|
||||
annotation=annotation,
|
||||
scan_argument=scan_argument,
|
||||
arg=arg,
|
||||
default=None if arg else param.get("default", None),
|
||||
optional=allows_none,
|
||||
)
|
||||
|
||||
def scan_input_from_arg_input(
|
||||
@@ -179,14 +171,13 @@ class ScanInfoAdapter:
|
||||
self.parse_annotation(signature_by_name[name].get("annotation"))[0]
|
||||
)
|
||||
else:
|
||||
annotation, scan_argument, allows_none = self.parse_annotation(item_type)
|
||||
annotation, scan_argument = self.parse_annotation(item_type)
|
||||
scan_input = self._build_scan_input(
|
||||
name=name,
|
||||
annotation=annotation,
|
||||
scan_argument=scan_argument,
|
||||
arg=True,
|
||||
default=None,
|
||||
optional=allows_none,
|
||||
)
|
||||
if scan_input["type"] in ("_empty", None):
|
||||
scan_input["type"] = item_type
|
||||
@@ -200,7 +191,6 @@ class ScanInfoAdapter:
|
||||
*,
|
||||
arg: bool,
|
||||
default: Any,
|
||||
optional: bool,
|
||||
) -> ScanInputConfig:
|
||||
"""Build one normalized ScanControl input configuration.
|
||||
|
||||
@@ -221,7 +211,6 @@ class ScanInfoAdapter:
|
||||
"display_name": scan_argument.get("display_name") or self.format_display_name(name),
|
||||
"tooltip": self.resolve_tooltip(scan_argument),
|
||||
"default": default,
|
||||
"optional": optional,
|
||||
"expert": scan_argument.get("expert", False),
|
||||
"hidden": scan_argument.get("hidden", False),
|
||||
"precision": scan_argument.get("precision"),
|
||||
|
||||
@@ -2,50 +2,41 @@ import sys
|
||||
from enum import Enum
|
||||
from string import Template
|
||||
|
||||
from qtpy.QtCore import QEasingCurve, QPropertyAnimation, QRectF, Qt, QTimer
|
||||
from qtpy.QtGui import QColor, QPainter, QPainterPath
|
||||
|
||||
|
||||
class ProgressState(Enum):
|
||||
NORMAL = "normal"
|
||||
PAUSED = "paused"
|
||||
INTERRUPTED = "interrupted"
|
||||
COMPLETED = "completed"
|
||||
|
||||
@classmethod
|
||||
def from_bec_status(cls, status: str) -> "ProgressState":
|
||||
"""
|
||||
Map a BEC status string (open, paused, aborted, halted, closed)
|
||||
to the corresponding ProgressState.
|
||||
Any unknown status falls back to NORMAL.
|
||||
"""
|
||||
mapping = {
|
||||
"open": cls.NORMAL,
|
||||
"paused": cls.PAUSED,
|
||||
"aborted": cls.INTERRUPTED,
|
||||
"halted": cls.PAUSED,
|
||||
"closed": cls.COMPLETED,
|
||||
}
|
||||
return mapping.get(status.lower(), cls.NORMAL)
|
||||
|
||||
|
||||
PROGRESS_STATE_COLORS = {
|
||||
ProgressState.NORMAL: QColor("#2979ff"), # blue – normal progress
|
||||
ProgressState.PAUSED: QColor("#ffca28"), # orange/amber – paused
|
||||
ProgressState.INTERRUPTED: QColor("#ff5252"), # red – interrupted
|
||||
ProgressState.COMPLETED: QColor("#00e676"), # green – finished
|
||||
}
|
||||
|
||||
from qtpy.QtWidgets import QApplication, QLabel, QVBoxLayout, QWidget
|
||||
from qtpy.QtCore import QTimer
|
||||
from qtpy.QtGui import QPalette
|
||||
from qtpy.QtWidgets import QApplication, QProgressBar, QSizePolicy, QVBoxLayout, QWidget
|
||||
|
||||
from bec_widgets.utils.bec_widget import BECWidget
|
||||
from bec_widgets.utils.colors import get_accent_colors
|
||||
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
|
||||
|
||||
|
||||
class ProgressState(Enum):
|
||||
NORMAL = "normal"
|
||||
PAUSED = "paused"
|
||||
WARNING = "warning"
|
||||
INTERRUPTED = "interrupted"
|
||||
COMPLETED = "completed"
|
||||
|
||||
|
||||
class BECProgressBar(BECWidget, QWidget):
|
||||
"""
|
||||
A custom progress bar with smooth transitions. The displayed text can be customized using a template.
|
||||
A BEC progress bar backed by Qt's native QProgressBar.
|
||||
|
||||
The displayed text can be customized using a template with $value, $maximum,
|
||||
and $percentage placeholders.
|
||||
|
||||
Args:
|
||||
parent: Parent Qt widget.
|
||||
client: Optional BEC client instance.
|
||||
config: Optional widget configuration.
|
||||
gui_id: Optional GUI identifier used by the BEC widget infrastructure.
|
||||
enable_dynamic_stylesheet: If True, adjust the chunk border radius while the
|
||||
filled chunk is still too narrow for the target radius. This avoids Qt
|
||||
stylesheet over-rounding artifacts on small progress values. Once the
|
||||
target radius is usable, normal value updates no longer rebuild the
|
||||
stylesheet.
|
||||
**kwargs: Additional keyword arguments forwarded to BECWidget.
|
||||
"""
|
||||
|
||||
PLUGIN = True
|
||||
@@ -61,7 +52,15 @@ class BECProgressBar(BECWidget, QWidget):
|
||||
]
|
||||
ICON_NAME = "page_control"
|
||||
|
||||
def __init__(self, parent=None, client=None, config=None, gui_id=None, **kwargs):
|
||||
def __init__(
|
||||
self,
|
||||
parent=None,
|
||||
client=None,
|
||||
config=None,
|
||||
gui_id=None,
|
||||
enable_dynamic_stylesheet: bool = True,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(
|
||||
parent=parent, client=client, gui_id=gui_id, config=config, theme_update=True, **kwargs
|
||||
)
|
||||
@@ -71,7 +70,6 @@ class BECProgressBar(BECWidget, QWidget):
|
||||
# internal values
|
||||
self._oversampling_factor = 50
|
||||
self._value = 0
|
||||
self._target_value = 0
|
||||
self._maximum = 100 * self._oversampling_factor
|
||||
|
||||
# User values
|
||||
@@ -80,46 +78,38 @@ class BECProgressBar(BECWidget, QWidget):
|
||||
self._user_maximum = 100
|
||||
self._label_template = "$value / $maximum - $percentage %"
|
||||
|
||||
# Color settings
|
||||
self._background_color = QColor(30, 30, 30)
|
||||
self._progress_color = accent_colors.highlight
|
||||
|
||||
self._completed_color = accent_colors.success
|
||||
self._border_color = QColor(50, 50, 50)
|
||||
# Corner‑rounding: base radius in pixels (auto‑reduced if bar is small)
|
||||
self._corner_radius = 10
|
||||
self._corner_radius = 8
|
||||
|
||||
# Progress‑bar state handling
|
||||
self._state = ProgressState.NORMAL
|
||||
|
||||
self._state_colors = {
|
||||
ProgressState.NORMAL: accent_colors.default,
|
||||
ProgressState.PAUSED: accent_colors.warning,
|
||||
ProgressState.PAUSED: accent_colors.highlight,
|
||||
ProgressState.WARNING: accent_colors.warning,
|
||||
ProgressState.INTERRUPTED: accent_colors.emergency,
|
||||
ProgressState.COMPLETED: accent_colors.success,
|
||||
}
|
||||
|
||||
# layout settings
|
||||
self._padding_left_right = 10
|
||||
self._value_animation = QPropertyAnimation(self, b"_progressbar_value")
|
||||
self._value_animation.setDuration(200)
|
||||
self._value_animation.setEasingCurve(QEasingCurve.Type.OutCubic)
|
||||
self._chunk_radius = None
|
||||
self._enable_dynamic_stylesheet = enable_dynamic_stylesheet
|
||||
|
||||
# label on top of the progress bar
|
||||
self.center_label = QLabel(self)
|
||||
self.center_label.setAlignment(Qt.AlignHCenter)
|
||||
self.center_label.setMinimumSize(0, 0)
|
||||
self.center_label.setStyleSheet("background: transparent; color: white;")
|
||||
self.progressbar = QProgressBar(self)
|
||||
self.progressbar.setTextVisible(True)
|
||||
self.progressbar.setRange(0, self._maximum)
|
||||
self.progressbar.setMinimumHeight(0)
|
||||
self.progressbar.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Ignored)
|
||||
|
||||
layout = QVBoxLayout(self)
|
||||
layout.setContentsMargins(10, 0, 10, 0)
|
||||
layout.setSpacing(0)
|
||||
layout.addWidget(self.center_label)
|
||||
layout.setAlignment(self.center_label, Qt.AlignCenter)
|
||||
self.setLayout(layout)
|
||||
self._layout = QVBoxLayout(self)
|
||||
self._layout.setContentsMargins(self._padding_left_right, 0, self._padding_left_right, 0)
|
||||
self._layout.setSpacing(0)
|
||||
self._layout.addWidget(self.progressbar)
|
||||
self.setLayout(self._layout)
|
||||
|
||||
self.update()
|
||||
self._adjust_label_width()
|
||||
self._sync_progressbar()
|
||||
self._apply_state_style()
|
||||
|
||||
@SafeProperty(
|
||||
str, doc="The template for the center label. Use $value, $maximum, and $percentage."
|
||||
@@ -140,17 +130,18 @@ class BECProgressBar(BECWidget, QWidget):
|
||||
accent_colors = get_accent_colors()
|
||||
self._state_colors = {
|
||||
ProgressState.NORMAL: accent_colors.default,
|
||||
ProgressState.PAUSED: accent_colors.warning,
|
||||
ProgressState.PAUSED: accent_colors.highlight,
|
||||
ProgressState.WARNING: accent_colors.warning,
|
||||
ProgressState.INTERRUPTED: accent_colors.emergency,
|
||||
ProgressState.COMPLETED: accent_colors.success,
|
||||
}
|
||||
self._chunk_radius = None
|
||||
self._apply_state_style()
|
||||
|
||||
@label_template.setter
|
||||
def label_template(self, template):
|
||||
self._label_template = template
|
||||
self._adjust_label_width()
|
||||
self.set_value(self._user_value)
|
||||
self.update()
|
||||
self._sync_progressbar()
|
||||
|
||||
@SafeProperty(float, designable=False)
|
||||
def _progressbar_value(self):
|
||||
@@ -162,28 +153,16 @@ class BECProgressBar(BECWidget, QWidget):
|
||||
@_progressbar_value.setter
|
||||
def _progressbar_value(self, val):
|
||||
self._value = val
|
||||
self.update()
|
||||
self.progressbar.setValue(int(round(val)))
|
||||
|
||||
def _update_template(self):
|
||||
template = Template(self._label_template)
|
||||
return template.safe_substitute(
|
||||
value=self._user_value,
|
||||
maximum=self._user_maximum,
|
||||
percentage=int((self.map_value(self._user_value) / self._maximum) * 100),
|
||||
percentage=int(self._percentage(self._user_value)),
|
||||
)
|
||||
|
||||
def _adjust_label_width(self):
|
||||
"""
|
||||
Reserve enough horizontal space for the center label so the widget
|
||||
doesn't resize as the text grows during progress.
|
||||
"""
|
||||
template = Template(self._label_template)
|
||||
sample_text = template.safe_substitute(
|
||||
value=self._user_maximum, maximum=self._user_maximum, percentage=100
|
||||
)
|
||||
width = self.center_label.fontMetrics().horizontalAdvance(sample_text)
|
||||
self.center_label.setFixedWidth(width)
|
||||
|
||||
@SafeSlot(float)
|
||||
@SafeSlot(int)
|
||||
def set_value(self, value):
|
||||
@@ -193,21 +172,35 @@ class BECProgressBar(BECWidget, QWidget):
|
||||
Args:
|
||||
value (float): The value to set.
|
||||
"""
|
||||
if value > self._user_maximum:
|
||||
value = self._user_maximum
|
||||
elif value < self._user_minimum:
|
||||
value = self._user_minimum
|
||||
self._target_value = self.map_value(value)
|
||||
self._user_value = value
|
||||
self.center_label.setText(self._update_template())
|
||||
previous_visual_state = self._current_visual_state()
|
||||
previous_value = self._value
|
||||
self._user_value = self._clamp_value(value)
|
||||
self._value = self.map_value(self._user_value)
|
||||
if self._enable_dynamic_stylesheet and self._value < previous_value:
|
||||
self._chunk_radius = None
|
||||
# Update state automatically unless paused or interrupted
|
||||
if self._state not in (ProgressState.PAUSED, ProgressState.INTERRUPTED):
|
||||
if self._state not in (
|
||||
ProgressState.PAUSED,
|
||||
ProgressState.WARNING,
|
||||
ProgressState.INTERRUPTED,
|
||||
):
|
||||
self._state = (
|
||||
ProgressState.COMPLETED
|
||||
if self._user_value >= self._user_maximum
|
||||
else ProgressState.NORMAL
|
||||
)
|
||||
self.animate_progress()
|
||||
self._sync_progressbar()
|
||||
visual_state_changed = self._current_visual_state() is not previous_visual_state
|
||||
if visual_state_changed:
|
||||
self._chunk_radius = None
|
||||
if (
|
||||
self._enable_dynamic_stylesheet
|
||||
and not visual_state_changed
|
||||
and (self._chunk_radius is None or self._chunk_radius != self._target_chunk_radius())
|
||||
):
|
||||
self._update_chunk_radius()
|
||||
if visual_state_changed:
|
||||
self._apply_state_style()
|
||||
|
||||
@SafeProperty(object, doc="Current visual state of the progress bar.")
|
||||
def state(self):
|
||||
@@ -226,7 +219,8 @@ class BECProgressBar(BECWidget, QWidget):
|
||||
if not isinstance(state, ProgressState):
|
||||
raise ValueError("state must be a ProgressState or its value")
|
||||
self._state = state
|
||||
self.update()
|
||||
self._chunk_radius = None
|
||||
self._apply_state_style()
|
||||
|
||||
@SafeProperty(float, doc="Base corner radius in pixels (auto‑scaled down on small bars).")
|
||||
def corner_radius(self) -> float:
|
||||
@@ -235,7 +229,18 @@ class BECProgressBar(BECWidget, QWidget):
|
||||
@corner_radius.setter
|
||||
def corner_radius(self, radius: float):
|
||||
self._corner_radius = max(0.0, radius)
|
||||
self.update()
|
||||
self._chunk_radius = None
|
||||
self._apply_state_style()
|
||||
|
||||
@SafeProperty(bool)
|
||||
def enable_dynamic_stylesheet(self) -> bool:
|
||||
return self._enable_dynamic_stylesheet
|
||||
|
||||
@enable_dynamic_stylesheet.setter
|
||||
def enable_dynamic_stylesheet(self, enabled: bool):
|
||||
self._enable_dynamic_stylesheet = bool(enabled)
|
||||
self._chunk_radius = None
|
||||
self._apply_state_style()
|
||||
|
||||
@SafeProperty(float)
|
||||
def padding_left_right(self) -> float:
|
||||
@@ -244,60 +249,12 @@ class BECProgressBar(BECWidget, QWidget):
|
||||
@padding_left_right.setter
|
||||
def padding_left_right(self, padding: float):
|
||||
self._padding_left_right = padding
|
||||
self.update()
|
||||
self._layout.setContentsMargins(int(round(padding)), 0, int(round(padding)), 0)
|
||||
|
||||
def paintEvent(self, event):
|
||||
painter = QPainter(self)
|
||||
painter.setRenderHint(QPainter.Antialiasing)
|
||||
rect = self.rect().adjusted(self._padding_left_right, 0, -self._padding_left_right, -1)
|
||||
|
||||
# Corner radius adapts to widget height so it never exceeds half the bar’s thickness
|
||||
radius = min(self._corner_radius, rect.height() / 2)
|
||||
|
||||
# Draw background
|
||||
painter.setBrush(self._background_color)
|
||||
painter.setPen(Qt.NoPen)
|
||||
painter.drawRoundedRect(rect, radius, radius) # Rounded corners
|
||||
|
||||
# Draw border
|
||||
painter.setBrush(Qt.NoBrush)
|
||||
painter.setPen(self._border_color)
|
||||
painter.drawRoundedRect(rect, radius, radius)
|
||||
|
||||
# Determine progress colour based on current state
|
||||
if self._state == ProgressState.PAUSED:
|
||||
current_color = self._state_colors[ProgressState.PAUSED]
|
||||
elif self._state == ProgressState.INTERRUPTED:
|
||||
current_color = self._state_colors[ProgressState.INTERRUPTED]
|
||||
elif self._state == ProgressState.COMPLETED or self._value >= self._maximum:
|
||||
current_color = self._state_colors[ProgressState.COMPLETED]
|
||||
else:
|
||||
current_color = self._state_colors[ProgressState.NORMAL]
|
||||
|
||||
# Set clipping region to preserve the background's rounded corners
|
||||
progress_rect = rect.adjusted(
|
||||
0, 0, int(-rect.width() + (self._value / self._maximum) * rect.width()), 0
|
||||
)
|
||||
clip_path = QPainterPath()
|
||||
clip_path.addRoundedRect(
|
||||
QRectF(rect), radius, radius
|
||||
) # Clip to the background's rounded corners
|
||||
painter.setClipPath(clip_path)
|
||||
|
||||
# Draw progress bar
|
||||
painter.setBrush(current_color)
|
||||
painter.drawRect(progress_rect) # Less rounded, no additional rounding
|
||||
|
||||
painter.end()
|
||||
|
||||
def animate_progress(self):
|
||||
"""
|
||||
Animate the progress bar from the current value to the target value.
|
||||
"""
|
||||
self._value_animation.stop()
|
||||
self._value_animation.setStartValue(self._value)
|
||||
self._value_animation.setEndValue(self._target_value)
|
||||
self._value_animation.start()
|
||||
def resizeEvent(self, event):
|
||||
super().resizeEvent(event)
|
||||
self._chunk_radius = None
|
||||
self._update_chunk_radius()
|
||||
|
||||
@SafeProperty(float)
|
||||
def maximum(self):
|
||||
@@ -343,10 +300,11 @@ class BECProgressBar(BECWidget, QWidget):
|
||||
Args:
|
||||
maximum (float): The maximum value.
|
||||
"""
|
||||
previous_maximum = self._user_maximum
|
||||
self._user_maximum = maximum
|
||||
self._adjust_label_width()
|
||||
if self._enable_dynamic_stylesheet and maximum != previous_maximum:
|
||||
self._chunk_radius = None
|
||||
self.set_value(self._user_value) # Update the value to fit the new range
|
||||
self.update()
|
||||
|
||||
@SafeSlot(float)
|
||||
def set_minimum(self, minimum: float):
|
||||
@@ -356,40 +314,126 @@ class BECProgressBar(BECWidget, QWidget):
|
||||
Args:
|
||||
minimum (float): The minimum value.
|
||||
"""
|
||||
previous_minimum = self._user_minimum
|
||||
self._user_minimum = minimum
|
||||
if self._enable_dynamic_stylesheet and minimum != previous_minimum:
|
||||
self._chunk_radius = None
|
||||
self.set_value(self._user_value) # Update the value to fit the new range
|
||||
self.update()
|
||||
|
||||
def map_value(self, value: float):
|
||||
"""
|
||||
Map the user value to the range [0, 100*self._oversampling_factor] for the progress
|
||||
"""
|
||||
return (
|
||||
(value - self._user_minimum) / (self._user_maximum - self._user_minimum) * self._maximum
|
||||
)
|
||||
span = self._user_maximum - self._user_minimum
|
||||
if span <= 0:
|
||||
return float(self._maximum if value >= self._user_maximum else 0)
|
||||
mapped_value = (value - self._user_minimum) / span * self._maximum
|
||||
return min(float(self._maximum), max(0.0, mapped_value))
|
||||
|
||||
def _percentage(self, value: float) -> float:
|
||||
return (self.map_value(value) / self._maximum) * 100 if self._maximum else 0.0
|
||||
|
||||
def _clamp_value(self, value: float) -> float:
|
||||
if self._user_maximum <= self._user_minimum:
|
||||
return self._user_maximum
|
||||
return min(self._user_maximum, max(self._user_minimum, value))
|
||||
|
||||
def _sync_progressbar(self) -> None:
|
||||
self.progressbar.setRange(0, int(self._maximum))
|
||||
self.progressbar.setValue(int(round(self._value)))
|
||||
self.progressbar.setFormat(self._update_template())
|
||||
|
||||
def _setup_style_sheet(self, *, chunk_radius: int) -> None:
|
||||
radius = int(round(self._corner_radius))
|
||||
chunk_color = self._state_colors[self._current_visual_state()].name()
|
||||
self.progressbar.setStyleSheet(f"""
|
||||
QProgressBar {{
|
||||
background-color: palette(mid);
|
||||
border: none;
|
||||
border-radius: {radius}px;
|
||||
color: palette(text);
|
||||
text-align: center;
|
||||
}}
|
||||
QProgressBar::chunk {{
|
||||
background-color: {chunk_color};
|
||||
border-radius: {chunk_radius}px;
|
||||
}}
|
||||
""")
|
||||
|
||||
def _update_chunk_radius(self) -> None:
|
||||
chunk_radius = self._current_chunk_radius()
|
||||
if chunk_radius != self._chunk_radius:
|
||||
self._chunk_radius = chunk_radius
|
||||
self._setup_style_sheet(chunk_radius=chunk_radius)
|
||||
self._apply_state_palette()
|
||||
|
||||
def _apply_state_style(self) -> None:
|
||||
if self._chunk_radius is None:
|
||||
self._chunk_radius = self._current_chunk_radius()
|
||||
self._setup_style_sheet(chunk_radius=self._chunk_radius)
|
||||
self._apply_state_palette()
|
||||
|
||||
def _apply_state_palette(self) -> None:
|
||||
color = self._state_colors[self._current_visual_state()]
|
||||
palette = self.progressbar.palette()
|
||||
palette.setColor(QPalette.ColorRole.Highlight, color)
|
||||
palette.setColor(QPalette.ColorRole.HighlightedText, palette.color(QPalette.ColorRole.Text))
|
||||
self.progressbar.setPalette(palette)
|
||||
|
||||
def _current_chunk_radius(self) -> int:
|
||||
target_radius = self._target_chunk_radius()
|
||||
if not self._enable_dynamic_stylesheet:
|
||||
return target_radius
|
||||
return self._calculate_chunk_radius(target_radius)
|
||||
|
||||
def _target_chunk_radius(self) -> int:
|
||||
radius = int(round(self._corner_radius))
|
||||
return max(0, radius - 1)
|
||||
|
||||
def _calculate_chunk_radius(self, target_radius: int) -> int:
|
||||
"""
|
||||
Scale the chunk radius down while the filled part is narrower than the target radius.
|
||||
Qt stylesheets otherwise over-round very small chunks.
|
||||
"""
|
||||
if target_radius <= 0 or self._maximum <= 0:
|
||||
return 0
|
||||
fill_width = self.progressbar.width() * min(1.0, max(0.0, self._value / self._maximum))
|
||||
if fill_width <= 0:
|
||||
return 0
|
||||
return min(target_radius, max(1, int(fill_width / 2)))
|
||||
|
||||
def _current_visual_state(self) -> ProgressState:
|
||||
if self._state in (ProgressState.PAUSED, ProgressState.WARNING, ProgressState.INTERRUPTED):
|
||||
return self._state
|
||||
if self._state == ProgressState.COMPLETED or self._value >= self._maximum:
|
||||
return ProgressState.COMPLETED
|
||||
return ProgressState.NORMAL
|
||||
|
||||
def _get_label(self) -> str:
|
||||
"""Return the label text. mostly used for testing rpc."""
|
||||
return self.center_label.text()
|
||||
return self.progressbar.text()
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
app = QApplication(sys.argv)
|
||||
|
||||
progressBar = BECProgressBar()
|
||||
progressBar.show()
|
||||
progressBar.set_minimum(-100)
|
||||
progressBar.set_maximum(0)
|
||||
progress_bar = BECProgressBar()
|
||||
progress_bar.setWindowTitle("BEC Progress Bar")
|
||||
progress_bar.resize(360, 48)
|
||||
progress_bar.set_minimum(-100)
|
||||
progress_bar.set_maximum(0)
|
||||
progress_bar.set_value(-100)
|
||||
progress_bar.show()
|
||||
|
||||
# Example of setting values
|
||||
def update_progress():
|
||||
value = progressBar._user_value + 2.5
|
||||
if value > progressBar._user_maximum:
|
||||
value = -100 # progressBar._maximum / progressBar._upsampling_factor
|
||||
progressBar.set_value(value)
|
||||
value = progress_bar._user_value + 2.5
|
||||
if value > progress_bar._user_maximum:
|
||||
value = progress_bar._user_minimum
|
||||
progress_bar.set_value(value)
|
||||
|
||||
timer = QTimer()
|
||||
timer = QTimer(progress_bar)
|
||||
timer.timeout.connect(update_progress)
|
||||
timer.start(200) # Update every half second
|
||||
timer.start(200)
|
||||
|
||||
sys.exit(app.exec())
|
||||
|
||||
@@ -0,0 +1,288 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from typing import Literal
|
||||
|
||||
import numpy as np
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from qtpy.QtCore import QObject, QTimer, Signal
|
||||
|
||||
from bec_widgets.utils.error_popups import SafeSlot
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ProgressSnapshot:
|
||||
value: float
|
||||
max_value: float
|
||||
done: bool
|
||||
status: Literal["open", "paused", "aborted", "halted", "closed", "user_completed"]
|
||||
scan_id: str | None = None
|
||||
scan_number: int | None = None
|
||||
rid: str | None = None
|
||||
is_new_scan: bool = False
|
||||
|
||||
|
||||
class ProgressTask(QObject):
|
||||
"""
|
||||
Class to store progress information.
|
||||
Inspired by https://github.com/Textualize/rich/blob/master/rich/progress.py
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self, parent: QObject | None, value: float = 0, max_value: float = 0, done: bool = False
|
||||
):
|
||||
super().__init__(parent=parent)
|
||||
self.start_time = time.monotonic()
|
||||
self.done = done
|
||||
self.value = value
|
||||
self.max_value = max_value
|
||||
self._elapsed_time = 0
|
||||
|
||||
self.timer = QTimer(self)
|
||||
self.timer.timeout.connect(self.update_elapsed_time)
|
||||
self.timer.start(1000)
|
||||
|
||||
def update(self, value: float, max_value: float, done: bool = False):
|
||||
"""
|
||||
Update the progress.
|
||||
"""
|
||||
self.max_value = max_value
|
||||
self.done = done
|
||||
self.value = value
|
||||
if done:
|
||||
self.timer.stop()
|
||||
|
||||
def update_elapsed_time(self):
|
||||
"""
|
||||
Update the time estimates. This is called every second by a QTimer.
|
||||
"""
|
||||
self._elapsed_time = max(0.0, time.monotonic() - self.start_time)
|
||||
|
||||
@property
|
||||
def percentage(self) -> float:
|
||||
"""float: Get progress of task as a percentage. If a None total was set, returns 0"""
|
||||
if not self.max_value:
|
||||
return 0.0
|
||||
completed = (self.value / self.max_value) * 100.0
|
||||
completed = min(100.0, max(0.0, completed))
|
||||
return completed
|
||||
|
||||
@property
|
||||
def speed(self) -> float:
|
||||
"""Get the estimated speed in steps per second."""
|
||||
if self._elapsed_time == 0:
|
||||
return 0.0
|
||||
|
||||
return self.value / self._elapsed_time
|
||||
|
||||
@property
|
||||
def frequency(self) -> float:
|
||||
"""Get the estimated frequency in steps per second."""
|
||||
if self.speed == 0:
|
||||
return 0.0
|
||||
return 1 / self.speed
|
||||
|
||||
@property
|
||||
def time_elapsed(self) -> str:
|
||||
return self._format_time(int(self._elapsed_time))
|
||||
|
||||
@property
|
||||
def remaining(self) -> float:
|
||||
"""Get the estimated remaining steps."""
|
||||
if self.done:
|
||||
return 0.0
|
||||
remaining = self.max_value - self.value
|
||||
return remaining
|
||||
|
||||
@property
|
||||
def time_remaining(self) -> str:
|
||||
"""
|
||||
Get the estimated remaining time in the format HH:MM:SS.
|
||||
"""
|
||||
if self.done or not self.speed or not self.remaining:
|
||||
return self._format_time(0)
|
||||
estimate = int(np.round(self.remaining / self.speed))
|
||||
|
||||
return self._format_time(estimate)
|
||||
|
||||
@staticmethod
|
||||
def _format_time(seconds: float) -> str:
|
||||
"""
|
||||
Format the time in seconds to a string in the format HH:MM:SS.
|
||||
"""
|
||||
return f"{seconds // 3600:02}:{(seconds // 60) % 60:02}:{seconds % 60:02}"
|
||||
|
||||
|
||||
class BECProgressTracker(QObject):
|
||||
"""
|
||||
Shared backend for BEC scan progress messages.
|
||||
"""
|
||||
|
||||
progress_started = Signal(object)
|
||||
progress_updated = Signal(object)
|
||||
progress_finished = Signal(object)
|
||||
progress_cleared = Signal()
|
||||
|
||||
def __init__(self, bec_dispatcher, parent: QObject | None = None):
|
||||
super().__init__(parent=parent)
|
||||
self.bec_dispatcher = bec_dispatcher
|
||||
self._connected = False
|
||||
self.task: ProgressTask | None = None
|
||||
self.scan_number: int | None = None
|
||||
self._active_scan_id: str | None = None
|
||||
self._active_rid: str | None = None
|
||||
self._last_reset_scan_id: str | None = None
|
||||
self._last_progress_scan_id: str | None = None
|
||||
|
||||
def start(self) -> None:
|
||||
if self._connected:
|
||||
return
|
||||
self.bec_dispatcher.connect_slot(
|
||||
self.process_progress_message, MessageEndpoints.scan_progress()
|
||||
)
|
||||
self.bec_dispatcher.connect_slot(
|
||||
self.process_scan_status_message, MessageEndpoints.scan_status()
|
||||
)
|
||||
self._connected = True
|
||||
|
||||
def _start_task(self, scan_id: str | None, rid: str | None = None) -> None:
|
||||
if self.task is not None:
|
||||
self.task.timer.stop()
|
||||
self.task.deleteLater()
|
||||
self.task = ProgressTask(parent=self)
|
||||
self._active_scan_id = scan_id
|
||||
self._active_rid = rid
|
||||
self.progress_started.emit(
|
||||
ProgressSnapshot(
|
||||
value=0,
|
||||
max_value=100,
|
||||
done=False,
|
||||
status="open",
|
||||
scan_id=self._active_scan_id,
|
||||
scan_number=self.scan_number,
|
||||
rid=self._active_rid,
|
||||
)
|
||||
)
|
||||
|
||||
def clear_task(self, *, emit_finished: bool = True) -> None:
|
||||
if self.task is None:
|
||||
self._active_scan_id = None
|
||||
self._active_rid = None
|
||||
self.progress_cleared.emit()
|
||||
return
|
||||
self.task.timer.stop()
|
||||
self.task.deleteLater()
|
||||
self.task = None
|
||||
self._active_scan_id = None
|
||||
self._active_rid = None
|
||||
self.progress_cleared.emit()
|
||||
if emit_finished:
|
||||
self.progress_finished.emit(
|
||||
ProgressSnapshot(
|
||||
value=0,
|
||||
max_value=100,
|
||||
done=True,
|
||||
status="open",
|
||||
scan_id=self._active_scan_id,
|
||||
scan_number=self.scan_number,
|
||||
rid=self._active_rid,
|
||||
)
|
||||
)
|
||||
|
||||
@SafeSlot(dict, dict)
|
||||
def process_progress_message(
|
||||
self, msg_content: dict, metadata: dict
|
||||
) -> ProgressSnapshot | None:
|
||||
done = msg_content.get("done", False)
|
||||
value = msg_content.get("value", 0)
|
||||
max_value = msg_content.get("max_value", 100)
|
||||
status: Literal["open", "paused", "aborted", "halted", "closed", "user_completed"] = (
|
||||
metadata.get("status", "open")
|
||||
)
|
||||
scan_id = metadata.get("scan_id") or metadata.get("RID")
|
||||
rid = metadata.get("RID")
|
||||
scan_number = metadata.get("scan_number")
|
||||
if scan_number is not None:
|
||||
self.scan_number = scan_number
|
||||
if scan_id is not None:
|
||||
self._last_progress_scan_id = scan_id
|
||||
is_new_scan = False
|
||||
previous_scan_id = self._active_scan_id
|
||||
previous_rid = self._active_rid
|
||||
identity_changed = (
|
||||
(scan_id is not None and scan_id != previous_scan_id)
|
||||
or (rid is not None and rid != previous_rid)
|
||||
or (previous_scan_id is None and previous_rid is None)
|
||||
)
|
||||
|
||||
if self.task is None:
|
||||
self._start_task(scan_id, rid=rid)
|
||||
is_new_scan = identity_changed
|
||||
elif scan_id is not None and scan_id != self._active_scan_id:
|
||||
self._start_task(scan_id, rid=rid)
|
||||
is_new_scan = True
|
||||
elif rid is not None and rid != self._active_rid:
|
||||
self._start_task(scan_id or self._active_scan_id, rid=rid)
|
||||
is_new_scan = True
|
||||
|
||||
if self.task is None:
|
||||
return None
|
||||
|
||||
self.task.update(value, max_value, done)
|
||||
snapshot = ProgressSnapshot(
|
||||
value=value,
|
||||
max_value=max_value,
|
||||
done=done,
|
||||
status=status,
|
||||
scan_id=self._active_scan_id,
|
||||
scan_number=self.scan_number,
|
||||
rid=self._active_rid,
|
||||
is_new_scan=is_new_scan,
|
||||
)
|
||||
self.progress_updated.emit(snapshot)
|
||||
if done:
|
||||
self.clear_task()
|
||||
return snapshot
|
||||
|
||||
@SafeSlot(dict, dict)
|
||||
def process_scan_status_message(
|
||||
self, msg_content: dict, metadata: dict
|
||||
) -> ProgressSnapshot | None:
|
||||
if msg_content.get("status") != "open":
|
||||
return None
|
||||
scan_id = msg_content.get("scan_id") or metadata.get("scan_id") or metadata.get("RID")
|
||||
if scan_id is None or scan_id == self._last_reset_scan_id:
|
||||
return None
|
||||
if scan_id == self._last_progress_scan_id:
|
||||
self._last_reset_scan_id = scan_id
|
||||
return None
|
||||
|
||||
self.clear_task(emit_finished=False)
|
||||
self._last_reset_scan_id = scan_id
|
||||
self.scan_number = msg_content.get("scan_number")
|
||||
snapshot = ProgressSnapshot(
|
||||
value=0,
|
||||
max_value=100,
|
||||
done=False,
|
||||
status="open",
|
||||
scan_id=scan_id,
|
||||
scan_number=self.scan_number,
|
||||
rid=metadata.get("RID"),
|
||||
is_new_scan=True,
|
||||
)
|
||||
self.progress_updated.emit(snapshot)
|
||||
return snapshot
|
||||
|
||||
def cleanup(self) -> None:
|
||||
self.clear_task(emit_finished=False)
|
||||
if self._connected:
|
||||
self.bec_dispatcher.disconnect_slot(
|
||||
self.process_progress_message, MessageEndpoints.scan_progress()
|
||||
)
|
||||
self.bec_dispatcher.disconnect_slot(
|
||||
self.process_scan_status_message, MessageEndpoints.scan_status()
|
||||
)
|
||||
self._connected = False
|
||||
self._last_reset_scan_id = None
|
||||
self._last_progress_scan_id = None
|
||||
@@ -13,6 +13,7 @@ from bec_widgets import BECWidget
|
||||
from bec_widgets.utils.bec_connector import ConnectionConfig
|
||||
from bec_widgets.utils.colors import Colors
|
||||
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
|
||||
from bec_widgets.widgets.progress.progress_backend import BECProgressTracker, ProgressSnapshot
|
||||
|
||||
logger = bec_logger.logger
|
||||
if TYPE_CHECKING:
|
||||
@@ -81,6 +82,8 @@ class Ring(BECWidget, QWidget):
|
||||
self._color: QColor = self.convert_color(self.config.color)
|
||||
self._background_color: QColor = self.convert_color(self.config.background_color)
|
||||
self.registered_slot: tuple[Callable, str | EndpointInfo] | None = None
|
||||
self.progress_tracker = BECProgressTracker(self.bec_dispatcher, parent=self)
|
||||
self.progress_tracker.progress_updated.connect(self._on_progress_snapshot)
|
||||
self.RID = None
|
||||
self._gap = 5
|
||||
self._hovered = False
|
||||
@@ -219,35 +222,32 @@ class Ring(BECWidget, QWidget):
|
||||
case "manual":
|
||||
if self.config.mode == "manual":
|
||||
return
|
||||
if self.registered_slot is not None:
|
||||
self.bec_dispatcher.disconnect_slot(*self.registered_slot)
|
||||
self._disconnect_registered_update()
|
||||
self.config.mode = "manual"
|
||||
self.registered_slot = None
|
||||
case "scan":
|
||||
if self.config.mode == "scan":
|
||||
return
|
||||
if self.registered_slot is not None:
|
||||
self.bec_dispatcher.disconnect_slot(*self.registered_slot)
|
||||
self._disconnect_registered_update()
|
||||
self.config.mode = "scan"
|
||||
self.bec_dispatcher.connect_slot(
|
||||
self.on_scan_progress, MessageEndpoints.scan_progress()
|
||||
)
|
||||
self.registered_slot = (self.on_scan_progress, MessageEndpoints.scan_progress())
|
||||
self.progress_tracker.start()
|
||||
case "device":
|
||||
if self.registered_slot is not None:
|
||||
self.bec_dispatcher.disconnect_slot(*self.registered_slot)
|
||||
self._disconnect_registered_update()
|
||||
self.config.mode = "device"
|
||||
if device == "":
|
||||
self.registered_slot = None
|
||||
return
|
||||
self.config.device = device
|
||||
# self.config.signal = self._get_signal_from_device(device, signal)
|
||||
signal = self._update_device_connection(device, signal)
|
||||
self.config.signal = signal
|
||||
|
||||
case _:
|
||||
raise ValueError(f"Unsupported mode: {mode}")
|
||||
|
||||
def _disconnect_registered_update(self):
|
||||
if self.registered_slot is not None:
|
||||
self.bec_dispatcher.disconnect_slot(*self.registered_slot)
|
||||
self.registered_slot = None
|
||||
self.progress_tracker.cleanup()
|
||||
|
||||
def set_precision(self, precision: int):
|
||||
"""
|
||||
Set the precision for the ring widget.
|
||||
@@ -270,13 +270,13 @@ class Ring(BECWidget, QWidget):
|
||||
|
||||
def _get_signals_for_device(self, device: str) -> dict[str, list[str]]:
|
||||
"""
|
||||
Get the signals for the device.
|
||||
Get the appropriate signals for the device to be used in the ring widget, based on the signal infos from the device manager.
|
||||
|
||||
Args:
|
||||
device(str): Device name for the device
|
||||
device(str): Device name for the device readback mode
|
||||
|
||||
Returns:
|
||||
dict[str, list[str]]: Dictionary with the signals for the device
|
||||
dict[str, list[str]]: Signal infos for the device to be used in the ring widget
|
||||
"""
|
||||
dm = self.bec_dispatcher.client.device_manager
|
||||
if not dm:
|
||||
@@ -285,24 +285,25 @@ class Ring(BECWidget, QWidget):
|
||||
if dev_obj is None:
|
||||
raise ValueError(f"Device '{device}' not found in device manager.")
|
||||
|
||||
signal_infos = getattr(dev_obj, "_info", {}).get("signals", {})
|
||||
progress_signals = [
|
||||
obj["component_name"]
|
||||
for obj in dev_obj._info["signals"].values()
|
||||
if obj["signal_class"] == "ProgressSignal"
|
||||
for obj in signal_infos.values()
|
||||
if obj.get("signal_class") == "ProgressSignal"
|
||||
]
|
||||
hinted_signals = [
|
||||
obj["obj_name"]
|
||||
for obj in dev_obj._info["signals"].values()
|
||||
if obj["kind_str"] == "hinted"
|
||||
and obj["signal_class"]
|
||||
for obj in signal_infos.values()
|
||||
if obj.get("kind_str") == "hinted"
|
||||
and obj.get("signal_class")
|
||||
not in ["ProgressSignal", "AsyncSignal", "AsyncMultiSignal", "DynamicSignal"]
|
||||
]
|
||||
|
||||
normal_signals = [
|
||||
obj["component_name"]
|
||||
for obj in dev_obj._info["signals"].values()
|
||||
if obj["kind_str"] == "normal"
|
||||
for obj in signal_infos.values()
|
||||
if obj.get("kind_str") == "normal"
|
||||
]
|
||||
|
||||
return {
|
||||
"progress_signals": progress_signals,
|
||||
"hinted_signals": hinted_signals,
|
||||
@@ -311,21 +312,15 @@ class Ring(BECWidget, QWidget):
|
||||
|
||||
def _update_device_connection(self, device: str, signal: str | None) -> str:
|
||||
"""
|
||||
Update the device connection for the ring widget.
|
||||
Subscribe device mode to the endpoint matching the selected signal.
|
||||
|
||||
In general, we support two modes here:
|
||||
- If signal is provided, we use that directly.
|
||||
- If signal is not provided, we try to get the signal from the device manager.
|
||||
We first check for progress signals, then for hinted signals, and finally for normal signals.
|
||||
|
||||
Depending on what type of signal we get (progress or hinted/normal), we subscribe to different endpoints.
|
||||
|
||||
Args:
|
||||
device(str): Device name for the device mode
|
||||
signal(str): Signal name for the device mode
|
||||
When no signal is provided, the ring selects the first available progress
|
||||
signal, then the first hinted readback signal, then the first normal
|
||||
readback signal. Progress signals use the device_progress endpoint;
|
||||
readback signals use the device_readback endpoint.
|
||||
|
||||
Returns:
|
||||
str: The selected signal name for the device mode
|
||||
The selected signal name, or an empty string if the device is not known.
|
||||
"""
|
||||
logger.info(f"Updating device connection for device '{device}' and signal '{signal}'")
|
||||
dm = self.bec_dispatcher.client.device_manager
|
||||
@@ -341,18 +336,17 @@ class Ring(BECWidget, QWidget):
|
||||
normal_signals = signals["normal_signals"]
|
||||
|
||||
if not signal:
|
||||
# If signal is not provided, we try to get it from the device manager
|
||||
if len(progress_signals) > 0:
|
||||
if progress_signals:
|
||||
signal = progress_signals[0]
|
||||
logger.info(
|
||||
f"Using progress signal '{signal}' for device '{device}' in ring progress bar."
|
||||
)
|
||||
elif len(hinted_signals) > 0:
|
||||
elif hinted_signals:
|
||||
signal = hinted_signals[0]
|
||||
logger.info(
|
||||
f"Using hinted signal '{signal}' for device '{device}' in ring progress bar."
|
||||
)
|
||||
elif len(normal_signals) > 0:
|
||||
elif normal_signals:
|
||||
signal = normal_signals[0]
|
||||
logger.info(
|
||||
f"Using normal signal '{signal}' for device '{device}' in ring progress bar."
|
||||
@@ -366,26 +360,18 @@ class Ring(BECWidget, QWidget):
|
||||
self.bec_dispatcher.connect_slot(self.on_device_progress, endpoint)
|
||||
self.registered_slot = (self.on_device_progress, endpoint)
|
||||
return signal
|
||||
|
||||
if signal in hinted_signals or signal in normal_signals:
|
||||
endpoint = MessageEndpoints.device_readback(device)
|
||||
self.bec_dispatcher.connect_slot(self.on_device_readback, endpoint)
|
||||
self.registered_slot = (self.on_device_readback, endpoint)
|
||||
return signal
|
||||
|
||||
@SafeSlot(dict, dict)
|
||||
def on_scan_progress(self, msg, meta):
|
||||
"""
|
||||
Update the ring widget with the scan progress.
|
||||
|
||||
Args:
|
||||
msg(dict): Message with the scan progress
|
||||
meta(dict): Metadata for the message
|
||||
"""
|
||||
current_RID = meta.get("RID", None)
|
||||
if current_RID != self.RID:
|
||||
self.set_min_max_values(0, msg.get("max_value", 100))
|
||||
self.set_value(msg.get("value", 0))
|
||||
self.update()
|
||||
raise ValueError(
|
||||
f"Signal '{signal}' is not usable for ring progress device mode. "
|
||||
f"Available progress signals: {progress_signals}; "
|
||||
f"available readback signals: {hinted_signals + normal_signals}."
|
||||
)
|
||||
|
||||
@SafeSlot(dict, dict)
|
||||
def on_device_readback(self, msg, meta):
|
||||
@@ -408,30 +394,31 @@ class Ring(BECWidget, QWidget):
|
||||
|
||||
@SafeSlot(dict, dict)
|
||||
def on_device_progress(self, msg, meta):
|
||||
"""
|
||||
Update the ring widget with the device progress.
|
||||
|
||||
Args:
|
||||
msg(dict): Message with the device progress
|
||||
meta(dict): Metadata for the message
|
||||
"""
|
||||
device = self.config.device
|
||||
if device is None:
|
||||
return
|
||||
max_val = msg.get("max_value", 100)
|
||||
self.set_min_max_values(0, max_val)
|
||||
value = msg.get("value", 0)
|
||||
if msg.get("done"):
|
||||
value = max_val
|
||||
self.set_value(value)
|
||||
self.set_value(max_val if msg.get("done") else msg.get("value", 0))
|
||||
self.update()
|
||||
|
||||
def _on_progress_snapshot(self, snapshot: ProgressSnapshot):
|
||||
if snapshot.is_new_scan:
|
||||
self.set_min_max_values(0, snapshot.max_value)
|
||||
self.RID = snapshot.rid
|
||||
self.set_value(snapshot.value)
|
||||
self.update()
|
||||
|
||||
def paintEvent(self, event):
|
||||
if not self.progress_container:
|
||||
return
|
||||
painter = QtGui.QPainter(self)
|
||||
painter.setRenderHint(QtGui.QPainter.RenderHint.Antialiasing)
|
||||
size = min(self.width(), self.height())
|
||||
if size <= 0 or not self.isVisible():
|
||||
return
|
||||
painter = QtGui.QPainter(self)
|
||||
if not painter.isActive():
|
||||
return
|
||||
painter.setRenderHint(QtGui.QPainter.RenderHint.Antialiasing)
|
||||
|
||||
# Center the ring
|
||||
x_offset = (self.width() - size) // 2
|
||||
@@ -509,15 +496,6 @@ class Ring(BECWidget, QWidget):
|
||||
return QtGui.QColor(*color)
|
||||
raise ValueError(f"Unsupported color format: {color}")
|
||||
|
||||
def cleanup(self):
|
||||
"""
|
||||
Cleanup the ring widget.
|
||||
Disconnect any registered slots.
|
||||
"""
|
||||
if self.registered_slot is not None:
|
||||
self.bec_dispatcher.disconnect_slot(*self.registered_slot)
|
||||
self.registered_slot = None
|
||||
|
||||
###############################################
|
||||
####### QProperties ###########################
|
||||
###############################################
|
||||
@@ -666,6 +644,7 @@ class Ring(BECWidget, QWidget):
|
||||
if self.registered_slot is not None:
|
||||
self.bec_dispatcher.disconnect_slot(*self.registered_slot)
|
||||
self.registered_slot = None
|
||||
self.progress_tracker.cleanup()
|
||||
self._hover_animation.stop()
|
||||
super().cleanup()
|
||||
|
||||
|
||||
@@ -1,121 +1,27 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import enum
|
||||
import os
|
||||
import time
|
||||
from typing import Literal
|
||||
|
||||
import numpy as np
|
||||
from bec_lib import messages
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_lib.logger import bec_logger
|
||||
from qtpy.QtCore import QObject, QTimer, Signal
|
||||
from qtpy.QtCore import Signal
|
||||
from qtpy.QtWidgets import QVBoxLayout, QWidget
|
||||
|
||||
from bec_widgets.utils.bec_widget import BECWidget
|
||||
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
|
||||
from bec_widgets.utils.error_popups import SafeProperty
|
||||
from bec_widgets.utils.ui_loader import UILoader
|
||||
from bec_widgets.widgets.progress.bec_progressbar.bec_progressbar import ProgressState
|
||||
from bec_widgets.widgets.progress.progress_backend import BECProgressTracker, ProgressSnapshot
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class ProgressSource(enum.Enum):
|
||||
"""
|
||||
Enum to define the source of the progress.
|
||||
"""
|
||||
|
||||
SCAN_PROGRESS = "scan_progress"
|
||||
DEVICE_PROGRESS = "device_progress"
|
||||
|
||||
|
||||
class ProgressTask(QObject):
|
||||
"""
|
||||
Class to store progress information.
|
||||
Inspired by https://github.com/Textualize/rich/blob/master/rich/progress.py
|
||||
"""
|
||||
|
||||
def __init__(self, parent: QWidget, value: float = 0, max_value: float = 0, done: bool = False):
|
||||
super().__init__(parent=parent)
|
||||
self.start_time = time.time()
|
||||
self.done = done
|
||||
self.value = value
|
||||
self.max_value = max_value
|
||||
self._elapsed_time = 0
|
||||
|
||||
self.timer = QTimer(self)
|
||||
self.timer.timeout.connect(self.update_elapsed_time)
|
||||
self.timer.start(100) # update the elapsed time every 100 ms
|
||||
|
||||
def update(self, value: float, max_value: float, done: bool = False):
|
||||
"""
|
||||
Update the progress.
|
||||
"""
|
||||
self.max_value = max_value
|
||||
self.done = done
|
||||
self.value = value
|
||||
if done:
|
||||
self.timer.stop()
|
||||
|
||||
def update_elapsed_time(self):
|
||||
"""
|
||||
Update the time estimates. This is called every 100 ms by a QTimer.
|
||||
"""
|
||||
self._elapsed_time += 0.1
|
||||
|
||||
@property
|
||||
def percentage(self) -> float:
|
||||
"""float: Get progress of task as a percentage. If a None total was set, returns 0"""
|
||||
if not self.max_value:
|
||||
return 0.0
|
||||
completed = (self.value / self.max_value) * 100.0
|
||||
completed = min(100.0, max(0.0, completed))
|
||||
return completed
|
||||
|
||||
@property
|
||||
def speed(self) -> float:
|
||||
"""Get the estimated speed in steps per second."""
|
||||
if self._elapsed_time == 0:
|
||||
return 0.0
|
||||
|
||||
return self.value / self._elapsed_time
|
||||
|
||||
@property
|
||||
def frequency(self) -> float:
|
||||
"""Get the estimated frequency in steps per second."""
|
||||
if self.speed == 0:
|
||||
return 0.0
|
||||
return 1 / self.speed
|
||||
|
||||
@property
|
||||
def time_elapsed(self) -> str:
|
||||
# format the elapsed time to a string in the format HH:MM:SS
|
||||
return self._format_time(int(self._elapsed_time))
|
||||
|
||||
@property
|
||||
def remaining(self) -> float:
|
||||
"""Get the estimated remaining steps."""
|
||||
if self.done:
|
||||
return 0.0
|
||||
remaining = self.max_value - self.value
|
||||
return remaining
|
||||
|
||||
@property
|
||||
def time_remaining(self) -> str:
|
||||
"""
|
||||
Get the estimated remaining time in the format HH:MM:SS.
|
||||
"""
|
||||
if self.done or not self.speed or not self.remaining:
|
||||
return self._format_time(0)
|
||||
estimate = int(np.round(self.remaining / self.speed))
|
||||
|
||||
return self._format_time(estimate)
|
||||
|
||||
def _format_time(self, seconds: float) -> str:
|
||||
"""
|
||||
Format the time in seconds to a string in the format HH:MM:SS.
|
||||
"""
|
||||
return f"{seconds // 3600:02}:{(seconds // 60) % 60:02}:{seconds % 60:02}"
|
||||
BEC_STATUS_TO_PROGRESS_STATE = {
|
||||
"open": ProgressState.NORMAL,
|
||||
"paused": ProgressState.PAUSED,
|
||||
"aborted": ProgressState.WARNING,
|
||||
"halted": ProgressState.INTERRUPTED,
|
||||
"closed": ProgressState.COMPLETED,
|
||||
"user_completed": ProgressState.COMPLETED,
|
||||
}
|
||||
|
||||
|
||||
class ScanProgressBar(BECWidget, QWidget):
|
||||
@@ -130,7 +36,14 @@ class ScanProgressBar(BECWidget, QWidget):
|
||||
progress_finished = Signal()
|
||||
|
||||
def __init__(
|
||||
self, parent=None, client=None, config=None, gui_id=None, one_line_design=False, **kwargs
|
||||
self,
|
||||
parent=None,
|
||||
client=None,
|
||||
config=None,
|
||||
gui_id=None,
|
||||
one_line_design=False,
|
||||
enable_dynamic_stylesheet: bool = True,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(parent=parent, client=client, config=config, gui_id=gui_id, **kwargs)
|
||||
|
||||
@@ -146,83 +59,43 @@ class ScanProgressBar(BECWidget, QWidget):
|
||||
self.layout.addWidget(self.ui)
|
||||
self.setLayout(self.layout)
|
||||
self.progressbar = self.ui.progressbar
|
||||
self.progressbar.enable_dynamic_stylesheet = enable_dynamic_stylesheet
|
||||
self._show_elapsed_time = self.ui.elapsed_time_label.isVisible()
|
||||
self._show_remaining_time = self.ui.remaining_time_label.isVisible()
|
||||
self._show_source_label = self.ui.source_label.isVisible()
|
||||
|
||||
self.connect_to_queue()
|
||||
self._progress_source = None
|
||||
self._progress_device = None
|
||||
self.task = None
|
||||
self.scan_number = None
|
||||
|
||||
def connect_to_queue(self):
|
||||
"""
|
||||
Connect to the queue status signal.
|
||||
"""
|
||||
self.bec_dispatcher.connect_slot(self.on_queue_update, MessageEndpoints.scan_queue_status())
|
||||
|
||||
def set_progress_source(self, source: ProgressSource, device=None):
|
||||
"""
|
||||
Set the source of the progress.
|
||||
"""
|
||||
if self._progress_source == source and self._progress_device == device:
|
||||
self.update_source_label(source, device=device)
|
||||
return
|
||||
if self._progress_source is not None:
|
||||
self.bec_dispatcher.disconnect_slot(
|
||||
self.on_progress_update,
|
||||
(
|
||||
MessageEndpoints.scan_progress()
|
||||
if self._progress_source == ProgressSource.SCAN_PROGRESS
|
||||
else MessageEndpoints.device_progress(device=self._progress_device)
|
||||
),
|
||||
)
|
||||
self._progress_source = source
|
||||
self._progress_device = None if source == ProgressSource.SCAN_PROGRESS else device
|
||||
self.bec_dispatcher.connect_slot(
|
||||
self.on_progress_update,
|
||||
(
|
||||
MessageEndpoints.scan_progress()
|
||||
if source == ProgressSource.SCAN_PROGRESS
|
||||
else MessageEndpoints.device_progress(device=device)
|
||||
),
|
||||
self.progress_tracker = BECProgressTracker(self.bec_dispatcher, parent=self)
|
||||
self.progress_tracker.progress_started.connect(self._on_progress_started)
|
||||
self.progress_tracker.progress_updated.connect(self._on_progress_snapshot)
|
||||
self.progress_tracker.progress_finished.connect(
|
||||
lambda _snapshot: self.progress_finished.emit()
|
||||
)
|
||||
self.update_source_label(source, device=device)
|
||||
# self.progress_started.emit()
|
||||
self.progress_tracker.start()
|
||||
|
||||
def update_source_label(self, source: ProgressSource, device=None):
|
||||
scan_text = f"Scan {self.scan_number}" if self.scan_number is not None else "Scan"
|
||||
text = scan_text if source == ProgressSource.SCAN_PROGRESS else f"Device {device}"
|
||||
logger.info(f"Set progress source to {text}")
|
||||
self.ui.source_label.setText(text)
|
||||
|
||||
@SafeSlot(dict, dict)
|
||||
def on_progress_update(self, msg_content: dict, metadata: dict):
|
||||
"""
|
||||
Update the progress bar based on the progress message.
|
||||
"""
|
||||
value = msg_content["value"]
|
||||
max_value = msg_content.get("max_value", 100)
|
||||
done = msg_content.get("done", False)
|
||||
status: Literal["open", "paused", "aborted", "halted", "closed"] = metadata.get(
|
||||
"status", "open"
|
||||
)
|
||||
|
||||
if self.task is None:
|
||||
def update_source_label(self):
|
||||
scan_number = self.progress_tracker.scan_number
|
||||
scan_text = f"Scan {scan_number}" if scan_number is not None else "Scan"
|
||||
if self.ui.source_label.text() == scan_text:
|
||||
return
|
||||
self.task.update(value, max_value, done)
|
||||
logger.info(f"Set progress source to {scan_text}")
|
||||
self.ui.source_label.setText(scan_text)
|
||||
|
||||
def _on_progress_started(self, _snapshot: ProgressSnapshot):
|
||||
if self.progress_tracker.task is not None:
|
||||
self.progress_tracker.task.timer.timeout.connect(self.update_labels)
|
||||
self.progress_started.emit()
|
||||
|
||||
def _on_progress_snapshot(self, snapshot: ProgressSnapshot):
|
||||
self.update_labels()
|
||||
|
||||
self.progressbar.set_maximum(self.task.max_value)
|
||||
self.progressbar.state = ProgressState.from_bec_status(status)
|
||||
self.progressbar.set_value(self.task.value)
|
||||
|
||||
if done:
|
||||
self.task = None
|
||||
self.progress_finished.emit()
|
||||
return
|
||||
if snapshot.is_new_scan and self.progress_tracker.task is None:
|
||||
self.ui.elapsed_time_label.setText("00:00:00")
|
||||
self.ui.remaining_time_label.setText("00:00:00")
|
||||
self.update_source_label()
|
||||
self.progressbar.set_maximum(snapshot.max_value)
|
||||
self.progressbar.set_value(snapshot.value)
|
||||
self.progressbar.state = BEC_STATUS_TO_PROGRESS_STATE.get(
|
||||
snapshot.status.lower(), ProgressState.NORMAL
|
||||
)
|
||||
|
||||
@SafeProperty(bool)
|
||||
def show_elapsed_time(self):
|
||||
@@ -259,74 +132,17 @@ class ScanProgressBar(BECWidget, QWidget):
|
||||
"""
|
||||
Update the labels based on the progress task.
|
||||
"""
|
||||
if self.task is None:
|
||||
task = self.progress_tracker.task
|
||||
if task is None:
|
||||
return
|
||||
|
||||
self.ui.elapsed_time_label.setText(self.task.time_elapsed)
|
||||
self.ui.remaining_time_label.setText(self.task.time_remaining)
|
||||
|
||||
@SafeSlot(dict, dict, verify_sender=True)
|
||||
def on_queue_update(self, msg_content, metadata):
|
||||
"""
|
||||
Update the progress bar based on the queue status.
|
||||
"""
|
||||
if not "queue" in msg_content:
|
||||
return
|
||||
if "primary" not in msg_content["queue"]:
|
||||
return
|
||||
if (primary_queue := msg_content.get("queue").get("primary")) is None:
|
||||
return
|
||||
if not isinstance(primary_queue, messages.ScanQueueStatus):
|
||||
return
|
||||
primary_queue_info = primary_queue.info
|
||||
if len(primary_queue_info) == 0:
|
||||
return
|
||||
scan_info = primary_queue_info[0]
|
||||
if scan_info is None:
|
||||
return
|
||||
if scan_info.status.lower() == "running" and self.task is None:
|
||||
self.task = ProgressTask(parent=self)
|
||||
self.progress_started.emit()
|
||||
|
||||
active_request_block = scan_info.active_request_block
|
||||
if active_request_block is None:
|
||||
return
|
||||
|
||||
self.scan_number = active_request_block.scan_number
|
||||
report_instructions = active_request_block.report_instructions
|
||||
if not report_instructions:
|
||||
return
|
||||
|
||||
# for now, let's just use the first instruction
|
||||
instruction = report_instructions[0]
|
||||
|
||||
if "scan_progress" in instruction:
|
||||
self.set_progress_source(ProgressSource.SCAN_PROGRESS)
|
||||
elif "device_progress" in instruction:
|
||||
device = instruction["device_progress"][0]
|
||||
self.set_progress_source(ProgressSource.DEVICE_PROGRESS, device=device)
|
||||
self.ui.elapsed_time_label.setText(task.time_elapsed)
|
||||
self.ui.remaining_time_label.setText(task.time_remaining)
|
||||
|
||||
def cleanup(self):
|
||||
if self.task is not None:
|
||||
self.task.timer.stop()
|
||||
self.close()
|
||||
self.deleteLater()
|
||||
if self._progress_source is not None:
|
||||
self.bec_dispatcher.disconnect_slot(
|
||||
self.on_progress_update,
|
||||
(
|
||||
MessageEndpoints.scan_progress()
|
||||
if self._progress_source == ProgressSource.SCAN_PROGRESS
|
||||
else MessageEndpoints.device_progress(device=self._progress_device)
|
||||
),
|
||||
)
|
||||
self._progress_source = None
|
||||
self._progress_device = None
|
||||
self.progress_tracker.cleanup()
|
||||
self.progressbar.close()
|
||||
self.progressbar.deleteLater()
|
||||
self.bec_dispatcher.disconnect_slot(
|
||||
self.on_queue_update, MessageEndpoints.scan_queue_status()
|
||||
)
|
||||
super().cleanup()
|
||||
|
||||
|
||||
|
||||
+2
-1
@@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "bec_widgets"
|
||||
version = "3.13.5"
|
||||
version = "3.14.0"
|
||||
description = "BEC Widgets"
|
||||
requires-python = ">=3.11"
|
||||
classifiers = [
|
||||
@@ -73,6 +73,7 @@ qtermwidget = ["pyside6_qtermwidget"]
|
||||
|
||||
|
||||
|
||||
|
||||
[build-system]
|
||||
requires = ["hatchling"]
|
||||
build-backend = "hatchling.build"
|
||||
|
||||
@@ -69,7 +69,7 @@ def create_widget(
|
||||
return widget
|
||||
|
||||
|
||||
@pytest.mark.timeout(20)
|
||||
@pytest.mark.timeout(100)
|
||||
def test_available_widgets(qtbot, connected_client_gui_obj):
|
||||
"""This test checks that all widgets that are available via gui.available_widgets can be created and removed."""
|
||||
gui = connected_client_gui_obj
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
import numpy as np
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
from qtpy.QtGui import QPalette
|
||||
from qtpy.QtWidgets import QProgressBar
|
||||
|
||||
from bec_widgets.widgets.progress.bec_progressbar.bec_progressbar import (
|
||||
BECProgressBar,
|
||||
@@ -15,6 +18,14 @@ def progressbar(qtbot):
|
||||
yield widget
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def static_progressbar(qtbot):
|
||||
widget = BECProgressBar(enable_dynamic_stylesheet=False)
|
||||
qtbot.addWidget(widget)
|
||||
qtbot.waitExposed(widget)
|
||||
yield widget
|
||||
|
||||
|
||||
def test_progressbar(progressbar):
|
||||
progressbar.update()
|
||||
|
||||
@@ -23,36 +34,181 @@ def test_progressbar_set_value(qtbot, progressbar):
|
||||
progressbar.set_minimum(0)
|
||||
progressbar.set_maximum(100)
|
||||
progressbar.set_value(50)
|
||||
progressbar.paintEvent(None)
|
||||
|
||||
qtbot.waitUntil(
|
||||
lambda: np.isclose(
|
||||
progressbar._value, progressbar._user_value * progressbar._oversampling_factor
|
||||
)
|
||||
)
|
||||
assert isinstance(progressbar.progressbar, QProgressBar)
|
||||
assert progressbar._value == progressbar._user_value * progressbar._oversampling_factor
|
||||
assert progressbar.progressbar.value() == 50 * progressbar._oversampling_factor
|
||||
|
||||
|
||||
def test_progressbar_label(progressbar):
|
||||
progressbar.label_template = "Test: $value"
|
||||
progressbar.set_value(50)
|
||||
assert progressbar.center_label.text() == "Test: 50"
|
||||
assert progressbar._get_label() == "Test: 50"
|
||||
assert progressbar.progressbar.text() == "Test: 50"
|
||||
|
||||
|
||||
def test_progress_state_from_bec_status():
|
||||
"""ProgressState.from_bec_status() maps BEC literals correctly."""
|
||||
mapping = {
|
||||
"open": ProgressState.NORMAL,
|
||||
"paused": ProgressState.PAUSED,
|
||||
"aborted": ProgressState.INTERRUPTED,
|
||||
"halted": ProgressState.PAUSED,
|
||||
"closed": ProgressState.COMPLETED,
|
||||
"UNKNOWN": ProgressState.NORMAL, # fallback
|
||||
}
|
||||
for text, expected in mapping.items():
|
||||
assert ProgressState.from_bec_status(text) is expected
|
||||
def test_progressbar_equal_minimum_and_maximum_does_not_raise(progressbar):
|
||||
progressbar.set_minimum(0)
|
||||
progressbar.set_maximum(0)
|
||||
progressbar.set_value(0)
|
||||
|
||||
assert progressbar._get_label() == "0 / 0 - 100 %"
|
||||
assert progressbar.progressbar.value() == progressbar.progressbar.maximum()
|
||||
|
||||
|
||||
def test_progressbar_uses_static_stylesheet_with_palette_state_color(progressbar):
|
||||
progressbar.progressbar.resize(100, 20)
|
||||
progressbar.set_value(50)
|
||||
progressbar.state = ProgressState.PAUSED
|
||||
|
||||
style_sheet = progressbar.progressbar.styleSheet()
|
||||
assert "QProgressBar::chunk" in style_sheet
|
||||
assert (
|
||||
f"background-color: {progressbar._state_colors[ProgressState.PAUSED].name()};"
|
||||
in style_sheet
|
||||
)
|
||||
assert "background-color: palette(mid);" in style_sheet
|
||||
assert "border-radius: 7px;" in style_sheet
|
||||
assert (
|
||||
progressbar.progressbar.palette().color(QPalette.ColorRole.Highlight)
|
||||
== progressbar._state_colors[ProgressState.PAUSED]
|
||||
)
|
||||
|
||||
|
||||
def test_progressbar_value_updates_do_not_rebuild_stylesheet_within_same_chunk_mode(progressbar):
|
||||
progressbar.progressbar.resize(100, 20)
|
||||
progressbar.set_value(30)
|
||||
|
||||
with mock.patch.object(
|
||||
progressbar, "_setup_style_sheet", wraps=progressbar._setup_style_sheet
|
||||
) as setup_style_sheet:
|
||||
progressbar.set_value(35)
|
||||
progressbar.set_value(42)
|
||||
progressbar.set_value(50)
|
||||
|
||||
setup_style_sheet.assert_not_called()
|
||||
|
||||
|
||||
def test_progressbar_value_updates_skip_chunk_radius_after_target_reached(progressbar):
|
||||
progressbar.progressbar.resize(100, 20)
|
||||
progressbar.set_value(30)
|
||||
assert progressbar._chunk_radius == progressbar._target_chunk_radius()
|
||||
|
||||
with mock.patch.object(
|
||||
progressbar, "_update_chunk_radius", wraps=progressbar._update_chunk_radius
|
||||
) as update_chunk_radius:
|
||||
progressbar.set_value(35)
|
||||
progressbar.set_value(42)
|
||||
progressbar.set_value(50)
|
||||
|
||||
update_chunk_radius.assert_not_called()
|
||||
|
||||
|
||||
def test_progressbar_repeated_same_maximum_does_not_reset_chunk_radius(progressbar):
|
||||
progressbar.progressbar.resize(100, 20)
|
||||
progressbar.set_maximum(100)
|
||||
progressbar.set_value(30)
|
||||
assert progressbar._chunk_radius == progressbar._target_chunk_radius()
|
||||
|
||||
with mock.patch.object(
|
||||
progressbar, "_update_chunk_radius", wraps=progressbar._update_chunk_radius
|
||||
) as update_chunk_radius:
|
||||
progressbar.set_maximum(100)
|
||||
progressbar.set_value(40)
|
||||
|
||||
update_chunk_radius.assert_not_called()
|
||||
|
||||
|
||||
def test_progressbar_can_disable_dynamic_stylesheet(static_progressbar):
|
||||
static_progressbar.progressbar.resize(100, 20)
|
||||
assert static_progressbar.enable_dynamic_stylesheet is False
|
||||
assert static_progressbar._chunk_radius == static_progressbar._target_chunk_radius()
|
||||
|
||||
with mock.patch.object(
|
||||
static_progressbar, "_setup_style_sheet", wraps=static_progressbar._setup_style_sheet
|
||||
) as setup_style_sheet:
|
||||
static_progressbar.set_value(1)
|
||||
static_progressbar.set_value(2)
|
||||
static_progressbar.set_value(3)
|
||||
|
||||
setup_style_sheet.assert_not_called()
|
||||
assert "border-radius: 7px;" in static_progressbar.progressbar.styleSheet()
|
||||
|
||||
|
||||
def test_progressbar_dynamic_stylesheet_can_be_toggled(progressbar):
|
||||
progressbar.enable_dynamic_stylesheet = False
|
||||
|
||||
assert progressbar.enable_dynamic_stylesheet is False
|
||||
assert progressbar._chunk_radius == progressbar._target_chunk_radius()
|
||||
assert "border-radius: 7px;" in progressbar.progressbar.styleSheet()
|
||||
|
||||
|
||||
def test_progressbar_rebuilds_stylesheet_until_chunk_radius_reaches_target(progressbar):
|
||||
progressbar.progressbar.resize(100, 20)
|
||||
progressbar.set_value(9)
|
||||
|
||||
with mock.patch.object(
|
||||
progressbar, "_setup_style_sheet", wraps=progressbar._setup_style_sheet
|
||||
) as setup_style_sheet:
|
||||
progressbar.set_value(12)
|
||||
progressbar.set_value(25)
|
||||
progressbar.set_value(30)
|
||||
|
||||
assert setup_style_sheet.call_count == 2
|
||||
assert "border-radius: 7px;" in progressbar.progressbar.styleSheet()
|
||||
|
||||
|
||||
def test_progressbar_resets_chunk_radius_when_value_goes_backwards(progressbar):
|
||||
progressbar.progressbar.resize(100, 20)
|
||||
progressbar.set_value(30)
|
||||
assert "border-radius: 7px;" in progressbar.progressbar.styleSheet()
|
||||
|
||||
progressbar.set_value(4)
|
||||
|
||||
assert "border-radius: 2px;" in progressbar.progressbar.styleSheet()
|
||||
|
||||
|
||||
def test_progressbar_state_setter(progressbar):
|
||||
"""Setting .state reflects internally."""
|
||||
progressbar.state = ProgressState.PAUSED
|
||||
assert progressbar.state is ProgressState.PAUSED
|
||||
|
||||
|
||||
def test_progressbar_warning_state_has_own_color_and_persists_on_value_update(progressbar):
|
||||
assert (
|
||||
progressbar._state_colors[ProgressState.PAUSED]
|
||||
!= progressbar._state_colors[ProgressState.WARNING]
|
||||
)
|
||||
assert (
|
||||
progressbar._state_colors[ProgressState.WARNING]
|
||||
!= progressbar._state_colors[ProgressState.INTERRUPTED]
|
||||
)
|
||||
|
||||
progressbar.state = ProgressState.WARNING
|
||||
progressbar.set_value(50)
|
||||
|
||||
assert progressbar.state is ProgressState.WARNING
|
||||
assert (
|
||||
progressbar.progressbar.palette().color(QPalette.ColorRole.Highlight)
|
||||
== progressbar._state_colors[ProgressState.WARNING]
|
||||
)
|
||||
|
||||
|
||||
def test_progressbar_warning_state_has_own_color_and_persists_on_value_update(progressbar):
|
||||
assert (
|
||||
progressbar._state_colors[ProgressState.PAUSED]
|
||||
!= progressbar._state_colors[ProgressState.WARNING]
|
||||
)
|
||||
assert (
|
||||
progressbar._state_colors[ProgressState.WARNING]
|
||||
!= progressbar._state_colors[ProgressState.INTERRUPTED]
|
||||
)
|
||||
|
||||
progressbar.state = ProgressState.WARNING
|
||||
progressbar.set_value(50)
|
||||
|
||||
assert progressbar.state is ProgressState.WARNING
|
||||
assert (
|
||||
progressbar.progressbar.palette().color(QPalette.ColorRole.Highlight)
|
||||
== progressbar._state_colors[ProgressState.WARNING]
|
||||
)
|
||||
|
||||
@@ -40,7 +40,7 @@ def test_update_device_initialization_progress(progress_bar, qtbot):
|
||||
assert progress_bar.progress_bar._user_value == 1
|
||||
assert progress_bar.progress_bar._user_maximum == 3
|
||||
assert progress_bar.progress_label.text() == f"{msg.device} initialization in progress..."
|
||||
assert "1 / 3 - 33 %" == progress_bar.progress_bar.center_label.text()
|
||||
assert "1 / 3 - 33 %" == progress_bar.progress_bar.progressbar.text()
|
||||
|
||||
# II. Update with message of finished DeviceInitializationProgressMessage, finished=True, success=True
|
||||
msg.finished = True
|
||||
@@ -49,7 +49,7 @@ def test_update_device_initialization_progress(progress_bar, qtbot):
|
||||
assert progress_bar.progress_bar._user_value == 1
|
||||
assert progress_bar.progress_bar._user_maximum == 3
|
||||
assert progress_bar.progress_label.text() == f"{msg.device} initialization succeeded!"
|
||||
assert "1 / 3 - 33 %" == progress_bar.progress_bar.center_label.text()
|
||||
assert "1 / 3 - 33 %" == progress_bar.progress_bar.progressbar.text()
|
||||
|
||||
# III. Update with message of finished DeviceInitializationProgressMessage, finished=True, success=False
|
||||
msg.finished = True
|
||||
@@ -59,7 +59,7 @@ def test_update_device_initialization_progress(progress_bar, qtbot):
|
||||
with qtbot.waitSignal(progress_bar.failed_devices_changed) as signal_blocker:
|
||||
progress_bar._update_device_initialization_progress(msg.model_dump(), {})
|
||||
assert progress_bar.progress_label.text() == f"{msg.device} initialization failed!"
|
||||
assert "2 / 3 - 66 %" == progress_bar.progress_bar.center_label.text()
|
||||
assert "2 / 3 - 66 %" == progress_bar.progress_bar.progressbar.text()
|
||||
assert progress_bar.progress_bar._user_value == 2
|
||||
assert progress_bar.progress_bar._user_maximum == 3
|
||||
|
||||
|
||||
@@ -0,0 +1,179 @@
|
||||
from unittest import mock
|
||||
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
|
||||
from bec_widgets.widgets.progress.progress_backend import BECProgressTracker
|
||||
|
||||
|
||||
def _dispatcher():
|
||||
dispatcher = mock.MagicMock()
|
||||
return dispatcher
|
||||
|
||||
|
||||
def test_tracker_subscribes_to_scan_progress_immediately():
|
||||
dispatcher = _dispatcher()
|
||||
tracker = BECProgressTracker(dispatcher)
|
||||
|
||||
tracker.start()
|
||||
|
||||
assert dispatcher.connect_slot.call_args_list == [
|
||||
mock.call(tracker.process_progress_message, MessageEndpoints.scan_progress()),
|
||||
mock.call(tracker.process_scan_status_message, MessageEndpoints.scan_status()),
|
||||
]
|
||||
tracker.cleanup()
|
||||
|
||||
|
||||
def test_tracker_starts_scan_from_scan_progress_metadata():
|
||||
dispatcher = _dispatcher()
|
||||
tracker = BECProgressTracker(dispatcher)
|
||||
snapshots = []
|
||||
tracker.progress_updated.connect(snapshots.append)
|
||||
|
||||
tracker.start()
|
||||
tracker.process_progress_message(
|
||||
{"value": 3, "max_value": 10},
|
||||
{"scan_id": "scan-2", "RID": "rid-2", "scan_number": 2, "status": "open"},
|
||||
)
|
||||
|
||||
assert tracker.task is not None
|
||||
assert tracker._active_scan_id == "scan-2"
|
||||
assert tracker._active_rid == "rid-2"
|
||||
assert tracker.scan_number == 2
|
||||
assert snapshots[-1].scan_number == 2
|
||||
|
||||
tracker.cleanup()
|
||||
|
||||
|
||||
def test_tracker_switches_sources_idempotently():
|
||||
dispatcher = _dispatcher()
|
||||
tracker = BECProgressTracker(dispatcher)
|
||||
|
||||
tracker.start()
|
||||
tracker.start()
|
||||
assert dispatcher.connect_slot.call_count == 2
|
||||
assert dispatcher.disconnect_slot.call_count == 0
|
||||
|
||||
tracker.cleanup()
|
||||
|
||||
|
||||
def test_tracker_resets_progress_on_new_open_scan_status():
|
||||
dispatcher = _dispatcher()
|
||||
tracker = BECProgressTracker(dispatcher)
|
||||
snapshots = []
|
||||
tracker.progress_updated.connect(snapshots.append)
|
||||
tracker.start()
|
||||
|
||||
snapshot = tracker.process_scan_status_message(
|
||||
{"scan_id": "scan-1", "scan_number": 7, "status": "open"}, {}
|
||||
)
|
||||
|
||||
assert snapshot is not None
|
||||
assert snapshot.value == 0
|
||||
assert snapshot.max_value == 100
|
||||
assert snapshot.status == "open"
|
||||
assert snapshot.scan_id == "scan-1"
|
||||
assert snapshot.scan_number == 7
|
||||
assert snapshot.is_new_scan is True
|
||||
assert tracker.task is None
|
||||
assert tracker.scan_number == 7
|
||||
assert snapshots[-1] == snapshot
|
||||
|
||||
tracker.cleanup()
|
||||
|
||||
|
||||
def test_tracker_ignores_duplicate_open_scan_status():
|
||||
dispatcher = _dispatcher()
|
||||
tracker = BECProgressTracker(dispatcher)
|
||||
snapshots = []
|
||||
tracker.progress_updated.connect(snapshots.append)
|
||||
tracker.start()
|
||||
|
||||
tracker.process_scan_status_message({"scan_id": "scan-1", "status": "open"}, {})
|
||||
tracker.process_scan_status_message({"scan_id": "scan-1", "status": "open"}, {})
|
||||
|
||||
assert len(snapshots) == 1
|
||||
|
||||
tracker.cleanup()
|
||||
|
||||
|
||||
def test_tracker_ignores_open_scan_status_after_progress_for_same_scan():
|
||||
dispatcher = _dispatcher()
|
||||
tracker = BECProgressTracker(dispatcher)
|
||||
snapshots = []
|
||||
tracker.progress_updated.connect(snapshots.append)
|
||||
tracker.start()
|
||||
|
||||
tracker.process_progress_message(
|
||||
{"value": 3, "max_value": 10}, {"scan_id": "scan-1", "RID": "rid-1", "status": "open"}
|
||||
)
|
||||
snapshot = tracker.process_scan_status_message(
|
||||
{"scan_id": "scan-1", "scan_number": 7, "status": "open"}, {}
|
||||
)
|
||||
|
||||
assert snapshot is None
|
||||
assert len(snapshots) == 1
|
||||
assert snapshots[-1].value == 3
|
||||
assert tracker.task is not None
|
||||
assert tracker.task.value == 3
|
||||
|
||||
tracker.cleanup()
|
||||
|
||||
|
||||
def test_tracker_ignores_open_scan_status_after_done_progress_for_same_scan():
|
||||
dispatcher = _dispatcher()
|
||||
tracker = BECProgressTracker(dispatcher)
|
||||
snapshots = []
|
||||
tracker.progress_updated.connect(snapshots.append)
|
||||
tracker.start()
|
||||
|
||||
tracker.process_progress_message(
|
||||
{"value": 10, "max_value": 10, "done": True},
|
||||
{"scan_id": "scan-1", "RID": "rid-1", "status": "closed"},
|
||||
)
|
||||
snapshot = tracker.process_scan_status_message(
|
||||
{"scan_id": "scan-1", "scan_number": 7, "status": "open"}, {}
|
||||
)
|
||||
|
||||
assert snapshot is None
|
||||
assert len(snapshots) == 1
|
||||
assert snapshots[-1].value == 10
|
||||
assert tracker.task is None
|
||||
|
||||
tracker.cleanup()
|
||||
|
||||
|
||||
def test_tracker_marks_new_scan_only_when_rid_changes():
|
||||
dispatcher = _dispatcher()
|
||||
tracker = BECProgressTracker(dispatcher)
|
||||
snapshots = []
|
||||
tracker.progress_updated.connect(snapshots.append)
|
||||
tracker.start()
|
||||
|
||||
tracker.process_progress_message({"value": 10, "max_value": 100}, {"RID": "rid-1"})
|
||||
tracker.process_progress_message({"value": 20, "max_value": 200}, {"RID": "rid-1"})
|
||||
tracker.process_progress_message({"value": 5, "max_value": 50}, {"RID": "rid-2"})
|
||||
|
||||
assert [snapshot.is_new_scan for snapshot in snapshots] == [True, False, True]
|
||||
assert tracker._active_rid == "rid-2"
|
||||
|
||||
tracker.cleanup()
|
||||
|
||||
|
||||
def test_tracker_keeps_partial_value_for_done_scan_progress():
|
||||
dispatcher = _dispatcher()
|
||||
tracker = BECProgressTracker(dispatcher)
|
||||
snapshots = []
|
||||
tracker.progress_updated.connect(snapshots.append)
|
||||
tracker.start()
|
||||
|
||||
tracker.process_progress_message(
|
||||
{"value": 4, "max_value": 10, "done": True},
|
||||
{"scan_id": "scan-1", "RID": "rid-1", "status": "aborted"},
|
||||
)
|
||||
|
||||
assert snapshots[-1].value == 4
|
||||
assert snapshots[-1].max_value == 10
|
||||
assert snapshots[-1].done is True
|
||||
assert tracker.task is None
|
||||
|
||||
tracker.cleanup()
|
||||
@@ -1,8 +1,9 @@
|
||||
# pylint: disable=missing-function-docstring, missing-module-docstring
|
||||
|
||||
from unittest.mock import MagicMock
|
||||
from unittest.mock import MagicMock, call
|
||||
|
||||
import pytest
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from qtpy.QtGui import QColor
|
||||
|
||||
from bec_widgets.tests.utils import FakeDevice
|
||||
@@ -76,11 +77,14 @@ def test_set_update_to_scan(ring_widget):
|
||||
ring_widget.set_update("scan")
|
||||
|
||||
assert ring_widget.config.mode == "scan"
|
||||
# Verify that connect_slot was called
|
||||
ring_widget.bec_dispatcher.connect_slot.assert_called_once()
|
||||
call_args = ring_widget.bec_dispatcher.connect_slot.call_args
|
||||
assert call_args[0][0] == ring_widget.on_scan_progress
|
||||
assert "scan_progress" in str(call_args[0][1])
|
||||
assert ring_widget.bec_dispatcher.connect_slot.call_args_list == [
|
||||
call(
|
||||
ring_widget.progress_tracker.process_progress_message, MessageEndpoints.scan_progress()
|
||||
),
|
||||
call(
|
||||
ring_widget.progress_tracker.process_scan_status_message, MessageEndpoints.scan_status()
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
def test_set_update_from_scan_to_manual(ring_widget):
|
||||
@@ -97,6 +101,14 @@ def test_set_update_from_scan_to_manual(ring_widget):
|
||||
|
||||
assert ring_widget.config.mode == "manual"
|
||||
assert ring_widget.registered_slot is None
|
||||
assert ring_widget.bec_dispatcher.disconnect_slot.call_args_list == [
|
||||
call(
|
||||
ring_widget.progress_tracker.process_progress_message, MessageEndpoints.scan_progress()
|
||||
),
|
||||
call(
|
||||
ring_widget.progress_tracker.process_scan_status_message, MessageEndpoints.scan_status()
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
def test_set_update_to_device(ring_widget_with_device):
|
||||
@@ -420,7 +432,7 @@ def test_set_direction_counter_clockwise(ring_widget):
|
||||
###################################
|
||||
|
||||
|
||||
def test_update_device_connection_with_progress_signal(ring_widget_with_device):
|
||||
def test_update_device_connection_prefers_progress_signal(ring_widget_with_device):
|
||||
ring_widget = ring_widget_with_device
|
||||
samx = ring_widget.bec_dispatcher.client.device_manager.devices.samx
|
||||
samx._info["signals"]["progress"] = {
|
||||
@@ -432,15 +444,114 @@ def test_update_device_connection_with_progress_signal(ring_widget_with_device):
|
||||
|
||||
ring_widget.bec_dispatcher.connect_slot = MagicMock()
|
||||
|
||||
ring_widget._update_device_connection("samx", "progress")
|
||||
signal = ring_widget._update_device_connection("samx", "")
|
||||
|
||||
# Should connect to device_progress endpoint
|
||||
assert signal == "progress"
|
||||
ring_widget.bec_dispatcher.connect_slot.assert_called_once()
|
||||
call_args = ring_widget.bec_dispatcher.connect_slot.call_args
|
||||
assert call_args[0][0] == ring_widget.on_device_progress
|
||||
assert call_args[0][1] == MessageEndpoints.device_progress("samx")
|
||||
|
||||
|
||||
def test_update_device_connection_with_hinted_signal(ring_widget):
|
||||
def test_update_device_connection_accepts_explicit_progress_signal(ring_widget_with_device):
|
||||
ring_widget = ring_widget_with_device
|
||||
samx = ring_widget.bec_dispatcher.client.device_manager.devices.samx
|
||||
samx._info["signals"]["progress"] = {
|
||||
"obj_name": "samx_progress",
|
||||
"component_name": "progress",
|
||||
"signal_class": "ProgressSignal",
|
||||
"kind_str": "hinted",
|
||||
}
|
||||
|
||||
ring_widget.bec_dispatcher.connect_slot = MagicMock()
|
||||
|
||||
signal = ring_widget._update_device_connection("samx", "progress")
|
||||
|
||||
assert signal == "progress"
|
||||
ring_widget.bec_dispatcher.connect_slot.assert_called_once()
|
||||
call_args = ring_widget.bec_dispatcher.connect_slot.call_args
|
||||
assert call_args[0][0] == ring_widget.on_device_progress
|
||||
assert call_args[0][1] == MessageEndpoints.device_progress("samx")
|
||||
|
||||
|
||||
def test_update_device_connection_resolves_component_name_to_readback_signal(
|
||||
ring_widget_with_device,
|
||||
):
|
||||
ring_widget = ring_widget_with_device
|
||||
samx = ring_widget.bec_dispatcher.client.device_manager.devices.samx
|
||||
samx._info["signals"]["setpoint"] = {
|
||||
"obj_name": "samx_setpoint",
|
||||
"component_name": "setpoint",
|
||||
"signal_class": "Signal",
|
||||
"kind_str": "normal",
|
||||
}
|
||||
|
||||
ring_widget.bec_dispatcher.connect_slot = MagicMock()
|
||||
|
||||
signal = ring_widget._update_device_connection("samx", "setpoint")
|
||||
|
||||
assert signal == "setpoint"
|
||||
ring_widget.bec_dispatcher.connect_slot.assert_called_once()
|
||||
call_args = ring_widget.bec_dispatcher.connect_slot.call_args
|
||||
assert call_args[0][0] == ring_widget.on_device_readback
|
||||
assert call_args[0][1] == MessageEndpoints.device_readback("samx")
|
||||
|
||||
|
||||
def test_update_device_connection_falls_back_to_hinted_signal(ring_widget_with_device):
|
||||
ring_widget = ring_widget_with_device
|
||||
ring_widget.bec_dispatcher.connect_slot = MagicMock()
|
||||
|
||||
signal = ring_widget._update_device_connection("samx", "")
|
||||
|
||||
assert signal == "samx"
|
||||
ring_widget.bec_dispatcher.connect_slot.assert_called_once()
|
||||
call_args = ring_widget.bec_dispatcher.connect_slot.call_args
|
||||
assert call_args[0][0] == ring_widget.on_device_readback
|
||||
assert call_args[0][1] == MessageEndpoints.device_readback("samx")
|
||||
|
||||
|
||||
def test_update_device_connection_falls_back_to_normal_signal(ring_widget_with_device):
|
||||
ring_widget = ring_widget_with_device
|
||||
samx = ring_widget.bec_dispatcher.client.device_manager.devices.samx
|
||||
samx._info["signals"] = {
|
||||
"setpoint": {
|
||||
"obj_name": "samx_setpoint",
|
||||
"component_name": "setpoint",
|
||||
"signal_class": "Signal",
|
||||
"kind_str": "normal",
|
||||
}
|
||||
}
|
||||
|
||||
ring_widget.bec_dispatcher.connect_slot = MagicMock()
|
||||
|
||||
signal = ring_widget._update_device_connection("samx", "")
|
||||
|
||||
assert signal == "setpoint"
|
||||
ring_widget.bec_dispatcher.connect_slot.assert_called_once()
|
||||
call_args = ring_widget.bec_dispatcher.connect_slot.call_args
|
||||
assert call_args[0][0] == ring_widget.on_device_readback
|
||||
assert call_args[0][1] == MessageEndpoints.device_readback("samx")
|
||||
|
||||
|
||||
def test_update_device_connection_rejects_unusable_signal(ring_widget_with_device):
|
||||
ring_widget = ring_widget_with_device
|
||||
samx = ring_widget.bec_dispatcher.client.device_manager.devices.samx
|
||||
samx._info["signals"]["async_signal"] = {
|
||||
"obj_name": "samx_async",
|
||||
"component_name": "async_signal",
|
||||
"signal_class": "AsyncSignal",
|
||||
"kind_str": "hinted",
|
||||
}
|
||||
|
||||
ring_widget.bec_dispatcher.connect_slot = MagicMock()
|
||||
|
||||
with pytest.raises(ValueError, match="not usable for ring progress device mode"):
|
||||
ring_widget._update_device_connection("samx", "samx_async")
|
||||
|
||||
ring_widget.bec_dispatcher.connect_slot.assert_not_called()
|
||||
|
||||
|
||||
def test_update_device_connection_accepts_explicit_hinted_signal(ring_widget):
|
||||
mock_device = FakeDevice(name="samx")
|
||||
mock_device._info = {
|
||||
"signals": {
|
||||
@@ -452,12 +563,13 @@ def test_update_device_connection_with_hinted_signal(ring_widget):
|
||||
|
||||
ring_widget.bec_dispatcher.connect_slot = MagicMock()
|
||||
|
||||
ring_widget._update_device_connection("samx", "samx")
|
||||
signal = ring_widget._update_device_connection("samx", "samx")
|
||||
|
||||
# Should connect to device_readback endpoint
|
||||
assert signal == "samx"
|
||||
ring_widget.bec_dispatcher.connect_slot.assert_called_once()
|
||||
call_args = ring_widget.bec_dispatcher.connect_slot.call_args
|
||||
assert call_args[0][0] == ring_widget.on_device_readback
|
||||
assert call_args[0][1] == MessageEndpoints.device_readback("samx")
|
||||
|
||||
|
||||
def test_update_device_connection_no_device_manager(ring_widget):
|
||||
@@ -472,44 +584,52 @@ def test_update_device_connection_device_not_found(ring_widget):
|
||||
mock_device = FakeDevice(name="samx")
|
||||
ring_widget.bec_dispatcher.client.device_manager.devices["samx"] = mock_device
|
||||
|
||||
# Should return without raising an error
|
||||
ring_widget._update_device_connection("nonexistent", "signal")
|
||||
assert ring_widget._update_device_connection("nonexistent", "signal") == ""
|
||||
|
||||
|
||||
###################################
|
||||
# on_scan_progress tests
|
||||
# scan progress tests
|
||||
###################################
|
||||
|
||||
|
||||
def test_on_scan_progress_updates_value(ring_widget):
|
||||
def test_scan_progress_updates_value(ring_widget):
|
||||
msg = {"value": 42, "max_value": 100}
|
||||
meta = {"RID": "test_rid_123"}
|
||||
|
||||
ring_widget.on_scan_progress(msg, meta)
|
||||
ring_widget.progress_tracker.process_progress_message(msg, meta)
|
||||
|
||||
assert ring_widget.config.value == 42
|
||||
|
||||
|
||||
def test_on_scan_progress_updates_min_max_on_new_rid(ring_widget):
|
||||
def test_scan_status_open_resets_scan_progress_value(ring_widget):
|
||||
ring_widget.set_min_max_values(0, 200)
|
||||
ring_widget.set_value(80)
|
||||
|
||||
ring_widget.progress_tracker.process_scan_status_message(
|
||||
{"scan_id": "scan-1", "scan_number": 7, "status": "open"}, {}
|
||||
)
|
||||
|
||||
assert ring_widget.config.min_value == 0
|
||||
assert ring_widget.config.max_value == 100
|
||||
assert ring_widget.config.value == 0
|
||||
|
||||
|
||||
def test_scan_progress_updates_min_max_on_new_rid(ring_widget):
|
||||
msg = {"value": 50, "max_value": 200}
|
||||
meta = {"RID": "new_rid"}
|
||||
|
||||
ring_widget.RID = "old_rid"
|
||||
ring_widget.on_scan_progress(msg, meta)
|
||||
ring_widget.progress_tracker.process_progress_message(msg, meta)
|
||||
|
||||
assert ring_widget.config.min_value == 0
|
||||
assert ring_widget.config.max_value == 200
|
||||
assert ring_widget.config.value == 50
|
||||
|
||||
|
||||
def test_on_scan_progress_same_rid_no_min_max_update(ring_widget):
|
||||
msg = {"value": 75, "max_value": 300}
|
||||
def test_scan_progress_same_rid_no_min_max_update(ring_widget):
|
||||
meta = {"RID": "same_rid"}
|
||||
|
||||
ring_widget.RID = "same_rid"
|
||||
ring_widget.set_min_max_values(0, 100)
|
||||
|
||||
ring_widget.on_scan_progress(msg, meta)
|
||||
ring_widget.progress_tracker.process_progress_message({"value": 10, "max_value": 100}, meta)
|
||||
ring_widget.progress_tracker.process_progress_message({"value": 75, "max_value": 300}, meta)
|
||||
|
||||
# Max value should not be updated when RID is the same
|
||||
assert ring_widget.config.max_value == 100
|
||||
|
||||
@@ -442,44 +442,6 @@ def test_scan_info_adapter_skips_duplicate_visible_kwargs():
|
||||
}
|
||||
|
||||
|
||||
def test_scan_info_adapter_supports_optional_annotated_types():
|
||||
scan_info = {
|
||||
"class": "OptionalScan",
|
||||
"base_class": "ScanBaseV4",
|
||||
"arg_input": {},
|
||||
"arg_bundle_size": {"bundle": 0, "min": None, "max": None},
|
||||
"gui_visibility": {"Matching": ["atol"]},
|
||||
"signature": [
|
||||
{
|
||||
"arg": False,
|
||||
"name": "atol",
|
||||
"annotation": {
|
||||
"Annotated": {
|
||||
"type": ["float", "NoneType"],
|
||||
"metadata": {
|
||||
"ScanArgument": {
|
||||
"display_name": "Tolerance",
|
||||
"tooltip": "Optional tolerance used for position matching",
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
"default": None,
|
||||
"kind": "KEYWORD_ONLY",
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
gui_config = ScanInfoAdapter().build_scan_ui_config(scan_info)
|
||||
input_spec = gui_config["kwarg_groups"][0]["inputs"][0]
|
||||
|
||||
assert input_spec["name"] == "atol"
|
||||
assert input_spec["type"] == "float"
|
||||
assert input_spec["optional"] is True
|
||||
assert input_spec["default"] is None
|
||||
assert input_spec["display_name"] == "Tolerance"
|
||||
|
||||
|
||||
def test_scan_info_adapter_rejects_unsupported_visible_inputs():
|
||||
scan_info = {
|
||||
"class": "UnsupportedScan",
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
|
||||
|
||||
from bec_widgets.utils.widget_io import WidgetIO
|
||||
from bec_widgets.widgets.control.scan_control.scan_group_box import ScanGroupBox, ScanOptionalWidget
|
||||
from bec_widgets.widgets.control.scan_control.scan_group_box import ScanGroupBox
|
||||
|
||||
|
||||
def test_kwarg_box(qtbot):
|
||||
@@ -235,41 +235,3 @@ def test_spinbox_limits_from_scan_info(qtbot):
|
||||
assert settling_time.maximum() == 3.5
|
||||
assert steps.minimum() == 1
|
||||
assert steps.maximum() == 10
|
||||
|
||||
|
||||
def test_optional_kwarg_widget_round_trips_none(qtbot):
|
||||
group_input = {
|
||||
"name": "Kwarg Test",
|
||||
"inputs": [
|
||||
{
|
||||
"arg": False,
|
||||
"name": "atol",
|
||||
"type": "float",
|
||||
"display_name": "Tolerance",
|
||||
"tooltip": "Optional tolerance used for position matching",
|
||||
"default": None,
|
||||
"optional": True,
|
||||
"expert": False,
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
kwarg_box = ScanGroupBox(box_type="kwargs", config=group_input)
|
||||
|
||||
assert isinstance(kwarg_box.widgets[0], ScanOptionalWidget)
|
||||
assert kwarg_box.widgets[0].none_checkbox.text() == ""
|
||||
assert kwarg_box.widgets[0].is_none() is True
|
||||
assert kwarg_box.widgets[0].inner_widget.isEnabled() is False
|
||||
assert kwarg_box.get_parameters() == {"atol": None}
|
||||
|
||||
kwarg_box.set_parameters({"atol": 1.25})
|
||||
|
||||
assert kwarg_box.widgets[0].is_none() is False
|
||||
assert kwarg_box.widgets[0].inner_widget.isEnabled() is True
|
||||
assert WidgetIO.get_value(kwarg_box.widgets[0].inner_widget) == 1.25
|
||||
assert kwarg_box.get_parameters() == {"atol": 1.25}
|
||||
|
||||
kwarg_box.set_parameters({"atol": None})
|
||||
|
||||
assert kwarg_box.widgets[0].is_none() is True
|
||||
assert kwarg_box.get_parameters() == {"atol": None}
|
||||
|
||||
@@ -2,7 +2,6 @@ from unittest import mock
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
from bec_lib import messages
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
|
||||
from bec_widgets.utils.bec_widget import BECWidget
|
||||
@@ -10,11 +9,8 @@ from bec_widgets.widgets.progress.bec_progressbar.bec_progressbar import (
|
||||
BECProgressBar,
|
||||
ProgressState,
|
||||
)
|
||||
from bec_widgets.widgets.progress.scan_progressbar.scan_progressbar import (
|
||||
ProgressSource,
|
||||
ProgressTask,
|
||||
ScanProgressBar,
|
||||
)
|
||||
from bec_widgets.widgets.progress.progress_backend import ProgressTask
|
||||
from bec_widgets.widgets.progress.scan_progressbar.scan_progressbar import ScanProgressBar
|
||||
|
||||
from .client_mocks import mocked_client
|
||||
|
||||
@@ -27,30 +23,6 @@ def scan_progressbar(qtbot, mocked_client):
|
||||
yield widget
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def scan_message():
|
||||
return messages.ScanQueueMessage(
|
||||
metadata={
|
||||
"file_suffix": None,
|
||||
"file_directory": None,
|
||||
"user_metadata": {"sample_name": ""},
|
||||
"RID": "94949c6e-d5f2-4f01-837e-a5d36257dd5d",
|
||||
},
|
||||
scan_type="line_scan",
|
||||
parameter={
|
||||
"args": {"samx": [-10.0, 10.0]},
|
||||
"kwargs": {
|
||||
"steps": 20,
|
||||
"relative": False,
|
||||
"exp_time": 0.1,
|
||||
"burst_at_each_point": 1,
|
||||
"system_config": {"file_suffix": None, "file_directory": None},
|
||||
},
|
||||
},
|
||||
queue="primary",
|
||||
)
|
||||
|
||||
|
||||
def test_progress_task_basic():
|
||||
"""percentage, remaining, and formatted time helpers behave as expected."""
|
||||
task = ProgressTask(parent=None, value=50, max_value=100, done=False)
|
||||
@@ -71,18 +43,52 @@ def test_progress_task_basic():
|
||||
assert task.time_elapsed == "00:00:10"
|
||||
|
||||
|
||||
def test_progress_task_elapsed_time_uses_monotonic_clock(monkeypatch):
|
||||
times = iter([100.0, 102.5])
|
||||
monkeypatch.setattr(
|
||||
"bec_widgets.widgets.progress.progress_backend.time.monotonic", lambda: next(times)
|
||||
)
|
||||
task = ProgressTask(parent=None)
|
||||
task.timer.stop()
|
||||
|
||||
task.update_elapsed_time()
|
||||
|
||||
assert task._elapsed_time == 2.5
|
||||
assert task.time_elapsed == "00:00:02"
|
||||
|
||||
|
||||
def test_scan_progressbar_initialization(scan_progressbar):
|
||||
assert isinstance(scan_progressbar, ScanProgressBar)
|
||||
assert isinstance(scan_progressbar.progressbar, BECProgressBar)
|
||||
|
||||
|
||||
def test_scan_progressbar_passes_dynamic_stylesheet_setting(qtbot, mocked_client):
|
||||
widget = ScanProgressBar(client=mocked_client, enable_dynamic_stylesheet=False)
|
||||
qtbot.addWidget(widget)
|
||||
|
||||
assert widget.progressbar.enable_dynamic_stylesheet is False
|
||||
|
||||
|
||||
def test_scan_progressbar_starts_from_scan_progress_before_queue_update(scan_progressbar):
|
||||
scan_progressbar.progress_tracker.clear_task(emit_finished=False)
|
||||
|
||||
scan_progressbar.progress_tracker.process_progress_message(
|
||||
{"value": 3, "max_value": 10, "done": False}, metadata={"RID": "live-rid"}
|
||||
)
|
||||
|
||||
assert scan_progressbar.progress_tracker.task is not None
|
||||
assert scan_progressbar.progress_tracker._active_scan_id == "live-rid"
|
||||
assert scan_progressbar.progressbar._user_value == 3
|
||||
assert scan_progressbar.progressbar._user_maximum == 10
|
||||
|
||||
|
||||
def test_update_labels_content(scan_progressbar):
|
||||
"""update_labels() reflects ProgressTask time strings on the UI."""
|
||||
# fabricate a task with known timings
|
||||
task = ProgressTask(parent=scan_progressbar, value=30, max_value=100, done=False)
|
||||
task.timer.stop()
|
||||
task._elapsed_time = 50
|
||||
scan_progressbar.task = task
|
||||
scan_progressbar.progress_tracker.task = task
|
||||
|
||||
scan_progressbar.update_labels()
|
||||
|
||||
@@ -90,17 +96,17 @@ def test_update_labels_content(scan_progressbar):
|
||||
assert scan_progressbar.ui.remaining_time_label.text() == "00:01:57"
|
||||
|
||||
|
||||
def test_on_progress_update(qtbot, scan_progressbar):
|
||||
def test_progress_update(qtbot, scan_progressbar):
|
||||
"""
|
||||
on_progress_update() should forward new values to the embedded
|
||||
BECProgressBar and keep ProgressTask in sync.
|
||||
Scan progress updates should update the embedded BECProgressBar
|
||||
and keep ProgressTask in sync.
|
||||
"""
|
||||
task = ProgressTask(parent=scan_progressbar, value=0, max_value=100, done=False)
|
||||
task.timer.stop()
|
||||
scan_progressbar.task = task
|
||||
scan_progressbar.progress_tracker.task = task
|
||||
|
||||
msg = {"value": 20, "max_value": 100, "done": False}
|
||||
scan_progressbar.on_progress_update(msg, metadata={"status": "open"})
|
||||
scan_progressbar.progress_tracker.process_progress_message(msg, metadata={"status": "open"})
|
||||
|
||||
qtbot.wait(200)
|
||||
bar = scan_progressbar.progressbar
|
||||
@@ -110,14 +116,58 @@ def test_on_progress_update(qtbot, scan_progressbar):
|
||||
assert bar.state is ProgressState.NORMAL
|
||||
|
||||
|
||||
def test_scan_status_open_resets_progress_before_first_progress_update(scan_progressbar):
|
||||
scan_progressbar.progressbar.set_maximum(50)
|
||||
scan_progressbar.progressbar.set_value(25)
|
||||
scan_progressbar.progressbar.state = ProgressState.INTERRUPTED
|
||||
scan_progressbar.ui.elapsed_time_label.setText("00:00:12")
|
||||
scan_progressbar.ui.remaining_time_label.setText("00:00:34")
|
||||
|
||||
scan_progressbar.progress_tracker.process_scan_status_message(
|
||||
{"scan_id": "scan-1", "scan_number": 7, "status": "open"}, {"RID": "rid-1"}
|
||||
)
|
||||
|
||||
assert scan_progressbar.progressbar._user_value == 0
|
||||
assert scan_progressbar.progressbar._user_maximum == 100
|
||||
assert scan_progressbar.progressbar.state is ProgressState.NORMAL
|
||||
assert scan_progressbar.ui.elapsed_time_label.text() == "00:00:00"
|
||||
assert scan_progressbar.ui.remaining_time_label.text() == "00:00:00"
|
||||
assert scan_progressbar.ui.source_label.text() == "Scan 7"
|
||||
|
||||
|
||||
def test_scan_status_open_reset_ignores_same_scan(scan_progressbar):
|
||||
scan_progressbar.progress_tracker.process_scan_status_message(
|
||||
{"scan_id": "scan-1", "status": "open"}, {}
|
||||
)
|
||||
scan_progressbar.progressbar.set_value(20)
|
||||
|
||||
scan_progressbar.progress_tracker.process_scan_status_message(
|
||||
{"scan_id": "scan-1", "status": "open"}, {}
|
||||
)
|
||||
|
||||
assert scan_progressbar.progressbar._user_value == 20
|
||||
|
||||
|
||||
def test_scan_status_non_open_does_not_reset_progress(scan_progressbar):
|
||||
scan_progressbar.progressbar.set_value(25)
|
||||
|
||||
scan_progressbar.progress_tracker.process_scan_status_message(
|
||||
{"scan_id": "scan-1", "status": "closed"}, {}
|
||||
)
|
||||
|
||||
assert scan_progressbar.progressbar._user_value == 25
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"status, value, max_val, expected_state",
|
||||
[
|
||||
("open", 10, 100, ProgressState.NORMAL),
|
||||
("paused", 25, 100, ProgressState.PAUSED),
|
||||
("aborted", 30, 100, ProgressState.INTERRUPTED),
|
||||
("halted", 40, 100, ProgressState.PAUSED),
|
||||
("aborted", 30, 100, ProgressState.WARNING),
|
||||
("halted", 40, 100, ProgressState.INTERRUPTED),
|
||||
("closed", 100, 100, ProgressState.COMPLETED),
|
||||
("user_completed", 40, 100, ProgressState.COMPLETED),
|
||||
("UNKNOWN", 10, 100, ProgressState.NORMAL),
|
||||
],
|
||||
)
|
||||
def test_state_mapping_during_updates(
|
||||
@@ -126,9 +176,9 @@ def test_state_mapping_during_updates(
|
||||
"""ScanProgressBar should translate BEC status → ProgressState consistently."""
|
||||
task = ProgressTask(parent=scan_progressbar, value=0, max_value=max_val, done=False)
|
||||
task.timer.stop()
|
||||
scan_progressbar.task = task
|
||||
scan_progressbar.progress_tracker.task = task
|
||||
|
||||
scan_progressbar.on_progress_update(
|
||||
scan_progressbar.progress_tracker.process_progress_message(
|
||||
{"value": value, "max_value": max_val, "done": status == "closed"},
|
||||
metadata={"status": status},
|
||||
)
|
||||
@@ -136,97 +186,39 @@ def test_state_mapping_during_updates(
|
||||
assert scan_progressbar.progressbar.state is expected_state
|
||||
|
||||
|
||||
def test_source_label_updates(scan_progressbar):
|
||||
"""update_source_label() renders correct text for both progress sources."""
|
||||
# device progress
|
||||
scan_progressbar.update_source_label(ProgressSource.DEVICE_PROGRESS, device="motor")
|
||||
assert scan_progressbar.ui.source_label.text() == "Device motor"
|
||||
def test_aborted_done_scan_keeps_partial_progress(scan_progressbar):
|
||||
scan_progressbar.progress_tracker.process_progress_message(
|
||||
{"value": 4, "max_value": 10, "done": True},
|
||||
metadata={"scan_id": "scan-1", "RID": "rid-1", "status": "aborted"},
|
||||
)
|
||||
|
||||
# scan progress (needs a scan_number for deterministic text)
|
||||
scan_progressbar.scan_number = 5
|
||||
scan_progressbar.update_source_label(ProgressSource.SCAN_PROGRESS)
|
||||
assert scan_progressbar.progressbar._user_value == 4
|
||||
assert scan_progressbar.progressbar._user_maximum == 10
|
||||
assert scan_progressbar.progressbar.state is ProgressState.WARNING
|
||||
assert scan_progressbar.progress_tracker.task is None
|
||||
|
||||
|
||||
def test_source_label_updates(scan_progressbar):
|
||||
"""update_source_label() renders the current scan label."""
|
||||
scan_progressbar.progress_tracker.scan_number = 5
|
||||
scan_progressbar.update_source_label()
|
||||
assert scan_progressbar.ui.source_label.text() == "Scan 5"
|
||||
|
||||
|
||||
def test_set_progress_source_connections(scan_progressbar, monkeypatch):
|
||||
""" """
|
||||
def test_source_label_update_logs_only_on_text_change(scan_progressbar):
|
||||
scan_progressbar.progress_tracker.scan_number = 5
|
||||
|
||||
connect_calls = []
|
||||
disconnect_calls = []
|
||||
with mock.patch(
|
||||
"bec_widgets.widgets.progress.scan_progressbar.scan_progressbar.logger.info"
|
||||
) as mock_info:
|
||||
scan_progressbar.update_source_label()
|
||||
scan_progressbar.update_source_label()
|
||||
scan_progressbar.update_source_label()
|
||||
|
||||
def fake_connect(slot, endpoint):
|
||||
connect_calls.append(endpoint)
|
||||
|
||||
def fake_disconnect(slot, endpoint):
|
||||
disconnect_calls.append(endpoint)
|
||||
|
||||
# Patch dispatcher methods
|
||||
monkeypatch.setattr(scan_progressbar.bec_dispatcher, "connect_slot", fake_connect)
|
||||
monkeypatch.setattr(scan_progressbar.bec_dispatcher, "disconnect_slot", fake_disconnect)
|
||||
|
||||
# switch to SCAN_PROGRESS
|
||||
scan_progressbar.scan_number = 7
|
||||
scan_progressbar.set_progress_source(ProgressSource.SCAN_PROGRESS)
|
||||
|
||||
assert scan_progressbar._progress_source == ProgressSource.SCAN_PROGRESS
|
||||
assert scan_progressbar.ui.source_label.text() == "Scan 7"
|
||||
assert connect_calls[-1] == MessageEndpoints.scan_progress()
|
||||
assert disconnect_calls == []
|
||||
|
||||
# switch to DEVICE_PROGRESS
|
||||
device = "motor"
|
||||
scan_progressbar.set_progress_source(ProgressSource.DEVICE_PROGRESS, device=device)
|
||||
|
||||
assert scan_progressbar._progress_source == ProgressSource.DEVICE_PROGRESS
|
||||
assert scan_progressbar.ui.source_label.text() == f"Device {device}"
|
||||
assert connect_calls[-1] == MessageEndpoints.device_progress(device=device)
|
||||
assert disconnect_calls == [MessageEndpoints.scan_progress()]
|
||||
|
||||
# calling again with the SAME source should not add new connect calls
|
||||
prev_connect_count = len(connect_calls)
|
||||
scan_progressbar.set_progress_source(ProgressSource.DEVICE_PROGRESS, device=device)
|
||||
assert len(connect_calls) == prev_connect_count, "No extra connect made for same source"
|
||||
mock_info.assert_called_once_with("Set progress source to Scan 5")
|
||||
|
||||
|
||||
def test_set_progress_source_disconnects_previous_device_subscription(
|
||||
scan_progressbar, monkeypatch
|
||||
):
|
||||
|
||||
disconnect_calls = []
|
||||
|
||||
monkeypatch.setattr(scan_progressbar.bec_dispatcher, "connect_slot", lambda *args: None)
|
||||
monkeypatch.setattr(
|
||||
scan_progressbar.bec_dispatcher,
|
||||
"disconnect_slot",
|
||||
lambda slot, endpoint: disconnect_calls.append(endpoint),
|
||||
)
|
||||
|
||||
scan_progressbar.set_progress_source(ProgressSource.DEVICE_PROGRESS, device="motor1")
|
||||
scan_progressbar.set_progress_source(ProgressSource.DEVICE_PROGRESS, device="motor2")
|
||||
|
||||
assert disconnect_calls == [MessageEndpoints.device_progress(device="motor1")]
|
||||
|
||||
|
||||
def test_set_progress_source_disconnects_device_when_switching_to_scan(
|
||||
scan_progressbar, monkeypatch
|
||||
):
|
||||
|
||||
disconnect_calls = []
|
||||
|
||||
monkeypatch.setattr(scan_progressbar.bec_dispatcher, "connect_slot", lambda *args: None)
|
||||
monkeypatch.setattr(
|
||||
scan_progressbar.bec_dispatcher,
|
||||
"disconnect_slot",
|
||||
lambda slot, endpoint: disconnect_calls.append(endpoint),
|
||||
)
|
||||
|
||||
scan_progressbar.set_progress_source(ProgressSource.DEVICE_PROGRESS, device="motor1")
|
||||
scan_progressbar.set_progress_source(ProgressSource.SCAN_PROGRESS)
|
||||
|
||||
assert disconnect_calls == [MessageEndpoints.device_progress(device="motor1")]
|
||||
|
||||
|
||||
def test_cleanup_disconnects_active_device_subscription(scan_progressbar, monkeypatch):
|
||||
def test_cleanup_disconnects_active_scan_subscription(scan_progressbar, monkeypatch):
|
||||
|
||||
disconnect_calls = []
|
||||
|
||||
@@ -240,148 +232,28 @@ def test_cleanup_disconnects_active_device_subscription(scan_progressbar, monkey
|
||||
monkeypatch.setattr(scan_progressbar.progressbar, "deleteLater", lambda: None)
|
||||
monkeypatch.setattr(BECWidget, "cleanup", lambda self: None)
|
||||
|
||||
scan_progressbar.set_progress_source(ProgressSource.DEVICE_PROGRESS, device="motor1")
|
||||
with (
|
||||
mock.patch.object(scan_progressbar, "close", wraps=scan_progressbar.close) as close_mock,
|
||||
mock.patch.object(
|
||||
scan_progressbar, "deleteLater", wraps=scan_progressbar.deleteLater
|
||||
) as delete_later_mock,
|
||||
):
|
||||
ScanProgressBar.cleanup(scan_progressbar)
|
||||
|
||||
assert disconnect_calls == [MessageEndpoints.scan_progress(), MessageEndpoints.scan_status()]
|
||||
assert scan_progressbar.progress_tracker._connected is False
|
||||
close_mock.assert_not_called()
|
||||
delete_later_mock.assert_not_called()
|
||||
|
||||
|
||||
def test_cleanup_stops_active_task(scan_progressbar, monkeypatch):
|
||||
monkeypatch.setattr(BECWidget, "cleanup", lambda self: None)
|
||||
scan_progressbar.progress_tracker.task = ProgressTask(parent=scan_progressbar)
|
||||
scan_progressbar.progress_tracker._active_scan_id = "scan-1"
|
||||
timer = scan_progressbar.progress_tracker.task.timer
|
||||
|
||||
ScanProgressBar.cleanup(scan_progressbar)
|
||||
|
||||
assert disconnect_calls == [
|
||||
MessageEndpoints.device_progress(device="motor1"),
|
||||
MessageEndpoints.scan_queue_status(),
|
||||
]
|
||||
assert scan_progressbar._progress_source is None
|
||||
assert scan_progressbar._progress_device is None
|
||||
|
||||
|
||||
def test_progressbar_queue_update(scan_progressbar):
|
||||
"""
|
||||
Test that an empty queue update does not change the progress source.
|
||||
"""
|
||||
msg = messages.ScanQueueStatusMessage(
|
||||
queue={"primary": messages.ScanQueueStatus(info=[], status="RUNNING")}
|
||||
)
|
||||
with mock.patch.object(scan_progressbar, "set_progress_source") as mock_set_source:
|
||||
scan_progressbar.on_queue_update(
|
||||
msg.content, msg.metadata, _override_slot_params={"verify_sender": False}
|
||||
)
|
||||
mock_set_source.assert_not_called()
|
||||
|
||||
|
||||
def test_progressbar_queue_update_with_scan(scan_progressbar, scan_message):
|
||||
"""
|
||||
Test that a queue update with a scan changes the progress source to SCAN_PROGRESS.
|
||||
"""
|
||||
request_block = messages.RequestBlock(
|
||||
msg=scan_message,
|
||||
RID="some-rid",
|
||||
scan_motors=["samx"],
|
||||
readout_priority={"monitored": ["samx"]},
|
||||
is_scan=True,
|
||||
scan_number=1,
|
||||
scan_id="e3f50794-852c-4bb1-965e-41c585ab0aa9",
|
||||
report_instructions=[{"scan_progress": 20}],
|
||||
)
|
||||
msg = messages.ScanQueueStatusMessage(
|
||||
metadata={},
|
||||
queue={
|
||||
"primary": messages.ScanQueueStatus(
|
||||
info=[
|
||||
messages.QueueInfoEntry(
|
||||
queue_id="40831e2c-fbd1-4432-8072-ad168a7ad964",
|
||||
scan_id=["e3f50794-852c-4bb1-965e-41c585ab0aa9"],
|
||||
status="RUNNING",
|
||||
active_request_block=request_block,
|
||||
is_scan=[True],
|
||||
request_blocks=[request_block],
|
||||
scan_number=[1],
|
||||
)
|
||||
],
|
||||
status="RUNNING",
|
||||
)
|
||||
},
|
||||
)
|
||||
|
||||
with mock.patch.object(scan_progressbar, "set_progress_source") as mock_set_source:
|
||||
scan_progressbar.on_queue_update(
|
||||
msg.content, msg.metadata, _override_slot_params={"verify_sender": False}
|
||||
)
|
||||
mock_set_source.assert_called_once_with(ProgressSource.SCAN_PROGRESS)
|
||||
|
||||
|
||||
def test_progressbar_queue_update_with_device(scan_progressbar, scan_message):
|
||||
"""
|
||||
Test that a queue update with a device changes the progress source to DEVICE_PROGRESS.
|
||||
"""
|
||||
request_block = messages.RequestBlock(
|
||||
msg=scan_message,
|
||||
RID="some-rid",
|
||||
scan_motors=["samx"],
|
||||
readout_priority={"monitored": ["samx"]},
|
||||
is_scan=True,
|
||||
scan_number=1,
|
||||
scan_id="e3f50794-852c-4bb1-965e-41c585ab0aa9",
|
||||
report_instructions=[{"device_progress": ["samx"]}],
|
||||
)
|
||||
msg = messages.ScanQueueStatusMessage(
|
||||
metadata={},
|
||||
queue={
|
||||
"primary": messages.ScanQueueStatus(
|
||||
info=[
|
||||
messages.QueueInfoEntry(
|
||||
queue_id="40831e2c-fbd1-4432-8072-ad168a7ad964",
|
||||
scan_id=["e3f50794-852c-4bb1-965e-41c585ab0aa9"],
|
||||
status="RUNNING",
|
||||
active_request_block=request_block,
|
||||
is_scan=[True],
|
||||
request_blocks=[request_block],
|
||||
scan_number=[1],
|
||||
)
|
||||
],
|
||||
status="RUNNING",
|
||||
)
|
||||
},
|
||||
)
|
||||
|
||||
with mock.patch.object(scan_progressbar, "set_progress_source") as mock_set_source:
|
||||
scan_progressbar.on_queue_update(
|
||||
msg.content, msg.metadata, _override_slot_params={"verify_sender": False}
|
||||
)
|
||||
mock_set_source.assert_called_once_with(ProgressSource.DEVICE_PROGRESS, device="samx")
|
||||
|
||||
|
||||
def test_progressbar_queue_update_with_no_scan_or_device(scan_progressbar, scan_message):
|
||||
"""
|
||||
Test that a queue update with neither scan nor device does not change the progress source.
|
||||
"""
|
||||
request_block = messages.RequestBlock(
|
||||
msg=scan_message,
|
||||
RID="some-rid",
|
||||
scan_motors=["samx"],
|
||||
readout_priority={"monitored": ["samx"]},
|
||||
is_scan=True,
|
||||
scan_number=1,
|
||||
scan_id="e3f50794-852c-4bb1-965e-41c585ab0aa9",
|
||||
)
|
||||
msg = messages.ScanQueueStatusMessage(
|
||||
metadata={},
|
||||
queue={
|
||||
"primary": messages.ScanQueueStatus(
|
||||
info=[
|
||||
messages.QueueInfoEntry(
|
||||
queue_id="40831e2c-fbd1-4432-8072-ad168a7ad964",
|
||||
scan_id=["e3f50794-852c-4bb1-965e-41c585ab0aa9"],
|
||||
status="RUNNING",
|
||||
active_request_block=request_block,
|
||||
is_scan=[True],
|
||||
request_blocks=[request_block],
|
||||
scan_number=[1],
|
||||
)
|
||||
],
|
||||
status="RUNNING",
|
||||
)
|
||||
},
|
||||
)
|
||||
|
||||
with mock.patch.object(scan_progressbar, "set_progress_source") as mock_set_source:
|
||||
scan_progressbar.on_queue_update(
|
||||
msg.content, msg.metadata, _override_slot_params={"verify_sender": False}
|
||||
)
|
||||
mock_set_source.assert_not_called()
|
||||
assert not timer.isActive()
|
||||
assert scan_progressbar.progress_tracker.task is None
|
||||
assert scan_progressbar.progress_tracker._active_scan_id is None
|
||||
|
||||
Reference in New Issue
Block a user