1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-05-09 08:12:15 +02:00

Compare commits

..

11 Commits

22 changed files with 132 additions and 635 deletions
+4 -1
View File
@@ -1,4 +1,7 @@
#!/usr/bin/env python3
##########################
### AI-generated file. ###
##########################
"""Aggregate and merge benchmark JSON files.
The workflow runs the same benchmark suite on multiple independent runners.
+4 -1
View File
@@ -1,4 +1,7 @@
#!/usr/bin/env python3
##########################
### AI-generated file. ###
##########################
"""Compare benchmark JSON files and write a GitHub Actions summary.
The script supports JSON emitted by hyperfine, JSON emitted by pytest-benchmark,
+5
View File
@@ -1,4 +1,9 @@
#!/usr/bin/env bash
##########################
### AI-generated file. ###
##########################
set -euo pipefail
mkdir -p benchmark-results
+4 -1
View File
@@ -1,4 +1,7 @@
#!/usr/bin/env python3
##########################
### AI-generated file. ###
##########################
"""Run a command with BEC e2e services available."""
from __future__ import annotations
+9 -6
View File
@@ -1,6 +1,6 @@
name: BW Benchmarks
on: [workflow_call]
on: [ workflow_call ]
permissions:
contents: read
@@ -10,7 +10,7 @@ env:
BENCHMARK_BASELINE_JSON: gh-pages-benchmark-data/benchmarks/latest.json
BENCHMARK_SUMMARY: benchmark-results/summary.md
BENCHMARK_COMMAND: "bash .github/scripts/run_benchmarks.sh"
BENCHMARK_THRESHOLD_PERCENT: 10
BENCHMARK_THRESHOLD_PERCENT: 20
BENCHMARK_HIGHER_IS_BETTER: false
jobs:
@@ -25,7 +25,7 @@ jobs:
strategy:
fail-fast: false
matrix:
attempt: [1, 2, 3]
attempt: [ 1, 2, 3 ]
env:
BENCHMARK_JSON: benchmark-results/current-${{ matrix.attempt }}.json
@@ -84,7 +84,7 @@ jobs:
path: ${{ env.BENCHMARK_JSON }}
benchmark:
needs: [benchmark_attempt]
needs: [ benchmark_attempt ]
runs-on: ubuntu-latest
permissions:
contents: read
@@ -191,7 +191,7 @@ jobs:
run: exit 1
publish:
needs: [benchmark]
needs: [ benchmark ]
if: github.event_name == 'push' && github.ref == 'refs/heads/main'
runs-on: ubuntu-latest
permissions:
@@ -208,7 +208,10 @@ jobs:
uses: actions/download-artifact@v4
with:
name: bw-benchmark-json
path: .
path: benchmark-results
- name: Verify aggregate benchmark artifact
run: test -s "$BENCHMARK_JSON"
- name: Prepare gh-pages for publishing
run: |
+1
View File
@@ -16,6 +16,7 @@ jobs:
PROJECT_PATH: ${{ github.repository }}
QTWEBENGINE_DISABLE_SANDBOX: 1
QT_QPA_PLATFORM: "offscreen"
QTWEBENGINE_CHROMIUM_FLAGS: "--disable-gpu"
steps:
- uses: actions/checkout@v4
+1
View File
@@ -36,6 +36,7 @@ jobs:
PROJECT_PATH: ${{ github.repository }}
QTWEBENGINE_DISABLE_SANDBOX: 1
QT_QPA_PLATFORM: "offscreen"
QTWEBENGINE_CHROMIUM_FLAGS: "--disable-gpu"
steps:
- name: Checkout BEC Widgets
+1
View File
@@ -35,6 +35,7 @@ jobs:
env:
QTWEBENGINE_DISABLE_SANDBOX: 1
QT_QPA_PLATFORM: "offscreen"
QTWEBENGINE_CHROMIUM_FLAGS: "--disable-gpu"
steps:
- name: Checkout BEC Widgets
+47
View File
@@ -1,6 +1,53 @@
# CHANGELOG
## v3.7.2 (2026-04-29)
### Bug Fixes
- **dock-area**: Avoid switching profile when saving new profile
([`73b44cf`](https://github.com/bec-project/bec_widgets/commit/73b44cffb219347cacb609f3b93068eda6701b42))
- **workspace-actions**: Use try/finally and restore previous blocked state in refresh_profiles
([`30ef255`](https://github.com/bec-project/bec_widgets/commit/30ef25533af9df5a9bd9e69808dc49fdf22f4318))
Agent-Logs-Url:
https://github.com/bec-project/bec_widgets/sessions/004cb4bc-5015-485e-a803-1e63876b7024
Co-authored-by: wyzula-jan <133381102+wyzula-jan@users.noreply.github.com>
### Build System
- Add pytest-benchmark dependency
([`551d38d`](https://github.com/bec-project/bec_widgets/commit/551d38d90111361e64bfcf10a1409be71dd298bc))
### Chores
- Update header comments in script files to indicate AI generation
([`3f1aa80`](https://github.com/bec-project/bec_widgets/commit/3f1aa80756368454c3631cb6cf9d29db28af177a))
### Continuous Integration
- Add benchmark workflow
([`999b7a2`](https://github.com/bec-project/bec_widgets/commit/999b7a2321f2f222c04b056a2db4280f66de9c48))
- Fix benchmark upload
([`19b5c8f`](https://github.com/bec-project/bec_widgets/commit/19b5c8f724dbdb7421957c290f9213bf072392df))
- Increase threshold to 20 percent
([`409c9e5`](https://github.com/bec-project/bec_widgets/commit/409c9e5bfacdfc003f1bc8c9944f01798bd3818e))
### Testing
- Fix assertions after updating ophyd devices templates
([`a614d66`](https://github.com/bec-project/bec_widgets/commit/a614d662d6da716a49afb4ed3a3903f108210386))
Co-authored-by: Copilot <copilot@github.com>
- Remove references to "scan_motors" in tests
([`5056ef8`](https://github.com/bec-project/bec_widgets/commit/5056ef8946d03a20e802709d3fe81c84c195fe41))
## v3.7.1 (2026-04-21)
### Bug Fixes
@@ -235,11 +235,8 @@ class BECDockArea(DockAreaWidget):
def _load_initial_profile(self, name: str) -> None:
"""Load the initial profile."""
self.load_profile(name)
combo = self.toolbar.components.get_action("workspace_combo").widget
combo.blockSignals(True)
if not self._empty_profile_active:
combo.setCurrentText(name)
combo.blockSignals(False)
self._set_workspace_combo_text_silent(name)
def _start_empty_workspace(self) -> None:
"""
@@ -669,6 +666,14 @@ class BECDockArea(DockAreaWidget):
combo = self.toolbar.components.get_action("workspace_combo").widget
combo.refresh_profiles(active_profile=name)
def _set_workspace_combo_text_silent(self, text: str) -> None:
combo = self.toolbar.components.get_action("workspace_combo").widget
was_blocked = combo.blockSignals(True)
try:
combo.setCurrentText(text)
finally:
combo.blockSignals(was_blocked)
def _enter_empty_profile_state(self) -> None:
"""
Switch to the transient empty workspace state.
@@ -796,7 +801,6 @@ class BECDockArea(DockAreaWidget):
self._pending_autosave_skip = (current_profile, name)
else:
self._pending_autosave_skip = None
workspace_combo.setCurrentText(name)
self._finalize_profile_change(name, namespace)
@SafeSlot()
@@ -24,19 +24,9 @@ class ProfileComboBox(QComboBox):
def set_quick_profile_provider(self, provider: Callable[[], list[str]]) -> None:
self._quick_provider = provider
def refresh_profiles(
self, active_profile: str | None = None, show_empty_profile: bool = False
def _refresh_profiles(
self, current_text: str, active_profile: str | None = None, show_empty_profile: bool = False
) -> None:
"""
Refresh the profile list and ensure the active profile is visible.
Args:
active_profile(str | None): The currently active profile name.
show_empty_profile(bool): If True, show an explicit empty unsaved workspace entry.
"""
current_text = active_profile or self.currentText()
self.blockSignals(True)
self.clear()
quick_profiles = self._quick_provider()
@@ -103,7 +93,6 @@ class ProfileComboBox(QComboBox):
if index >= 0:
self.setCurrentIndex(index)
self.blockSignals(False)
if active_profile and self.currentText() != active_profile:
idx = self.findText(active_profile)
if idx >= 0:
@@ -115,6 +104,24 @@ class ProfileComboBox(QComboBox):
else:
self.setToolTip("")
def refresh_profiles(
self, active_profile: str | None = None, show_empty_profile: bool = False
) -> None:
"""
Refresh the profile list and ensure the active profile is visible.
Args:
active_profile(str | None): The currently active profile name.
show_empty_profile(bool): If True, show an explicit empty unsaved workspace entry.
"""
current_text = active_profile or self.currentText()
was_blocked = self.blockSignals(True)
try:
self._refresh_profiles(current_text, active_profile, show_empty_profile)
finally:
self.blockSignals(was_blocked)
def workspace_bundle(components: ToolbarComponents, enable_tools: bool = True) -> ToolbarBundle:
"""
@@ -122,6 +129,7 @@ def workspace_bundle(components: ToolbarComponents, enable_tools: bool = True) -
Args:
components (ToolbarComponents): The components to be added to the bundle.
enable_tools(bool): If True, show the workspace management tools; otherwise, only show the profile combo.
Returns:
ToolbarBundle: The workspace toolbar bundle.
@@ -25,7 +25,6 @@ from bec_widgets.utils.colors import apply_theme, get_accent_colors
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
from bec_widgets.widgets.control.buttons.stop_button.stop_button import StopButton
from bec_widgets.widgets.control.scan_control.scan_group_box import ScanGroupBox
from bec_widgets.widgets.control.scan_control.scan_info_adapter import ScanInfoAdapter
from bec_widgets.widgets.editors.scan_metadata.scan_metadata import ScanMetadata
from bec_widgets.widgets.utility.toggle.toggle import ToggleSwitch
@@ -98,7 +97,6 @@ class ScanControl(BECWidget, QWidget):
self._hide_scan_control_buttons = False
self._hide_metadata = False
self._hide_scan_selection_combobox = False
self._scan_info_adapter = ScanInfoAdapter()
# Create and set main layout
self._init_UI()
@@ -193,12 +191,11 @@ class ScanControl(BECWidget, QWidget):
MessageEndpoints.available_scans()
).resource
if self.config.allowed_scans is None:
supported_scans = ["ScanBase", "SyncFlyScanBase", "AsyncFlyScanBase", "ScanBaseV4"]
supported_scans = ["ScanBase", "SyncFlyScanBase", "AsyncFlyScanBase"]
allowed_scans = [
scan_name
for scan_name, scan_info in self.available_scans.items()
if scan_info["base_class"] in supported_scans
and self._scan_info_adapter.has_scan_ui_config(scan_info)
if scan_info["base_class"] in supported_scans and len(scan_info["gui_config"]) > 0
]
else:
@@ -393,14 +390,14 @@ class ScanControl(BECWidget, QWidget):
self.reset_layout()
selected_scan_info = self.available_scans.get(scan_name, {})
gui_config = self._scan_info_adapter.build_scan_ui_config(selected_scan_info)
arg_group = gui_config.get("arg_group", None)
kwarg_groups = gui_config.get("kwarg_groups", [])
gui_config = selected_scan_info.get("gui_config", {})
self.arg_group = gui_config.get("arg_group", None)
self.kwarg_groups = gui_config.get("kwarg_groups", None)
if arg_group and bool(arg_group.get("arg_inputs")):
self.add_arg_group(arg_group)
if kwarg_groups:
self.add_kwargs_boxes(kwarg_groups)
if bool(self.arg_group["arg_inputs"]):
self.add_arg_group(self.arg_group)
if len(self.kwarg_groups) > 0:
self.add_kwargs_boxes(self.kwarg_groups)
self.update()
self.adjustSize()
@@ -209,8 +209,6 @@ class ScanGroupBox(QGroupBox):
self.labels = []
self.widgets = []
self._widget_configs = {}
self._column_labels = {}
self.selected_devices = {}
self.init_box(self.config)
@@ -249,7 +247,6 @@ class ScanGroupBox(QGroupBox):
label = QLabel(text=display_name)
self.layout.addWidget(label, row, column_index)
self.labels.append(label)
self._column_labels[column_index] = label
def add_input_widgets(self, group_inputs: dict, row) -> None:
"""
@@ -275,30 +272,21 @@ class ScanGroupBox(QGroupBox):
if default == "_empty":
default = None
widget = widget_class(parent=self.parent(), arg_name=arg_name, default=default)
self._apply_numeric_precision(widget, item)
self._apply_numeric_limits(widget, item)
if isinstance(widget, DeviceLineEdit):
widget.set_device_filter(BECDeviceFilter.DEVICE)
self.selected_devices[widget] = ""
widget.device_selected.connect(self.emit_device_selected)
widget.textChanged.connect(
lambda text, device_widget=widget: self._handle_device_text_changed(
device_widget, text
)
)
if isinstance(widget, ScanLiteralsComboBox):
widget.set_literals(item["type"].get("Literal", []))
self._widget_configs[widget] = item
self._apply_unit_metadata(widget, item)
tooltip = item.get("tooltip", None)
if tooltip is not None:
widget.setToolTip(item["tooltip"])
self.layout.addWidget(widget, row, column_index)
self.widgets.append(widget)
@Slot(str)
def emit_device_selected(self, device_name):
sender = self.sender()
self.selected_devices[sender] = device_name.strip()
if isinstance(sender, DeviceLineEdit):
self._update_reference_units(sender, self._device_units(sender.get_current_device()))
self.selected_devices[self.sender()] = device_name.strip()
selected_devices_str = " ".join(self.selected_devices.values())
self.device_selected.emit(selected_devices_str)
@@ -325,7 +313,6 @@ class ScanGroupBox(QGroupBox):
for widget in self.widgets[-len(self.inputs) :]:
if isinstance(widget, DeviceLineEdit):
self.selected_devices[widget] = ""
self._widget_configs.pop(widget, None)
widget.close()
widget.deleteLater()
self.widgets = self.widgets[: -len(self.inputs)]
@@ -338,7 +325,6 @@ class ScanGroupBox(QGroupBox):
for widget in list(self.widgets):
if isinstance(widget, DeviceLineEdit):
self.selected_devices.pop(widget, None)
self._widget_configs.pop(widget, None)
widget.close()
widget.deleteLater()
self.layout.removeWidget(widget)
@@ -437,119 +423,3 @@ class ScanGroupBox(QGroupBox):
if widget.arg_name == key:
WidgetIO.set_value(widget, value)
break
@staticmethod
def _unit_tooltip(item: dict, units: str | None = None) -> str | None:
tooltip = item.get("tooltip", None)
reference_units = item.get("reference_units", None)
units = units or item.get("units", None)
tooltip_parts = [tooltip] if tooltip else []
if units:
tooltip_parts.append(f"Units: {units}")
elif reference_units:
tooltip_parts.append(f"Units from: {reference_units}")
if tooltip_parts:
return "\n".join(tooltip_parts)
return None
def _apply_unit_metadata(self, widget, item: dict, units: str | None = None) -> None:
units = units or item.get("units", None)
tooltip = self._unit_tooltip(item, units)
widget.setToolTip(tooltip or "")
if hasattr(widget, "setSuffix"):
widget.setSuffix(f" {units}" if units else "")
def _refresh_column_label(self, column: int, item: dict) -> None:
if column not in self._column_labels:
return
self._column_labels[column].setText(item.get("display_name", item.get("name", None)))
@staticmethod
def _device_units(device) -> str | None:
egu = getattr(device, "egu", None)
if not callable(egu):
return None
try:
return egu()
except Exception:
logger.exception("Failed to fetch engineering units from device %s", device)
return None
def _widget_position(self, widget) -> tuple[int, int] | None:
for row in range(self.layout.rowCount()):
for column in range(self.layout.columnCount()):
item = self.layout.itemAtPosition(row, column)
if item is not None and item.widget() is widget:
return row, column
return None
def _update_reference_units(self, device_widget: DeviceLineEdit, units: str | None) -> None:
position = self._widget_position(device_widget)
if position is None:
return
source_row, _ = position
source_name = device_widget.arg_name
for widget in self.widgets:
item = self._widget_configs.get(widget, {})
if item.get("reference_units") != source_name:
continue
widget_position = self._widget_position(widget)
if widget_position is None:
continue
row, column = widget_position
if self.box_type == "args" and row != source_row:
continue
self._apply_unit_metadata(widget, item, units)
self._refresh_column_label(column, item)
def _handle_device_text_changed(self, device_widget: DeviceLineEdit, device_name: str) -> None:
if not device_widget.validate_device(device_name):
self.selected_devices[device_widget] = ""
self._update_reference_units(device_widget, None)
@staticmethod
def _apply_numeric_precision(widget: ScanDoubleSpinBox, item: dict) -> None:
if not isinstance(widget, ScanDoubleSpinBox):
return
precision = item.get("precision")
if precision is None:
return
try:
widget.setDecimals(max(0, int(precision)))
except (TypeError, ValueError):
logger.warning(
"Ignoring invalid precision %r for parameter %s", precision, item.get("name")
)
@staticmethod
def _apply_numeric_limits(widget: ScanDoubleSpinBox | ScanSpinBox, item: dict) -> None:
if isinstance(widget, ScanSpinBox):
minimum = -2147483647 # largest int which qt allows
maximum = 2147483647
if item.get("ge") is not None:
minimum = int(item["ge"])
if item.get("gt") is not None:
minimum = int(item["gt"]) + 1
if item.get("le") is not None:
maximum = int(item["le"])
if item.get("lt") is not None:
maximum = int(item["lt"]) - 1
widget.setRange(minimum, maximum)
return
if isinstance(widget, ScanDoubleSpinBox):
minimum = -float("inf")
maximum = float("inf")
step = 10 ** (-widget.decimals())
if item.get("ge") is not None:
minimum = float(item["ge"])
if item.get("gt") is not None:
minimum = float(item["gt"]) + step
if item.get("le") is not None:
maximum = float(item["le"])
if item.get("lt") is not None:
maximum = float(item["lt"]) - step
widget.setRange(minimum, maximum)
@@ -1,243 +0,0 @@
"""Helpers for translating BEC scan metadata into ScanControl UI configuration."""
from __future__ import annotations
import re
from typing import Any
AnnotationValue = str | dict[str, Any] | list[Any] | None
ScanArgumentMetadata = dict[str, Any]
SignatureEntry = dict[str, Any]
ScanInputConfig = dict[str, Any]
ScanInfo = dict[str, Any]
ScanUIConfig = dict[str, Any]
class ScanInfoAdapter:
"""Normalize available-scan payloads into the structure consumed by ``ScanControl``."""
@staticmethod
def has_scan_ui_config(scan_info: ScanInfo) -> bool:
"""Check whether a scan exposes enough metadata to build a UI.
Args:
scan_info (ScanInfo): Available-scan payload for one scan.
Returns:
bool: ``True`` when a supported GUI metadata field is present.
"""
return bool(scan_info.get("gui_visibility") or scan_info.get("gui_config"))
@staticmethod
def format_display_name(name: str) -> str:
"""Convert a parameter name into a user-facing label.
Args:
name (str): Raw parameter name.
Returns:
str: Formatted display label such as ``Exp Time``.
"""
parts = re.split(r"(_|\d+)", name)
return " ".join(part.capitalize() for part in parts if part.isalnum()).strip()
@staticmethod
def resolve_tooltip(scan_argument: ScanArgumentMetadata) -> str | None:
"""Resolve the tooltip text from parsed ``ScanArgument`` metadata.
Args:
scan_argument (ScanArgumentMetadata): Parsed ``ScanArgument`` metadata.
Returns:
str | None: Explicit tooltip text if provided, otherwise the description fallback.
"""
return scan_argument.get("tooltip") or scan_argument.get("description")
@staticmethod
def parse_annotation(
annotation: AnnotationValue,
) -> tuple[AnnotationValue, ScanArgumentMetadata]:
"""Extract the serialized base annotation and ``ScanArgument`` metadata.
Args:
annotation (AnnotationValue): Serialized annotation payload from BEC.
Returns:
tuple[AnnotationValue, ScanArgumentMetadata]: The unwrapped annotation and parsed
``ScanArgument`` metadata.
"""
scan_argument: ScanArgumentMetadata = {}
if isinstance(annotation, list):
annotation = next(
(entry for entry in annotation if entry != "NoneType"),
annotation[0] if annotation else "_empty",
)
if isinstance(annotation, dict) and "Annotated" in annotation:
annotated = annotation["Annotated"]
annotation = annotated.get("type", "_empty")
scan_argument = annotated.get("metadata", {}).get("ScanArgument", {}) or {}
return annotation, scan_argument
@staticmethod
def scan_arg_type_from_annotation(annotation: AnnotationValue) -> AnnotationValue:
"""Normalize an annotation value to the widget type expected by ``ScanControl``.
Args:
annotation (AnnotationValue): Serialized or parsed annotation value.
Returns:
AnnotationValue: The normalized type identifier used by the widget layer.
"""
if isinstance(annotation, dict):
return annotation
if annotation in ("_empty", None):
return "str"
return annotation
def scan_input_from_signature(
self, param: SignatureEntry, arg: bool = False
) -> ScanInputConfig:
"""Build one ScanControl input description from a signature entry.
Args:
param (SignatureEntry): Serialized signature entry.
arg (bool): Whether the parameter belongs to the positional arg bundle.
Returns:
ScanInputConfig: Normalized input configuration for ``ScanControl``.
"""
annotation, scan_argument = self.parse_annotation(param.get("annotation"))
return self._build_scan_input(
name=param["name"],
annotation=annotation,
scan_argument=scan_argument,
arg=arg,
default=None if arg else param.get("default", None),
)
def scan_input_from_arg_input(
self, name: str, item_type: AnnotationValue, signature_by_name: dict[str, SignatureEntry]
) -> ScanInputConfig:
"""Build one arg-bundle input description from ``arg_input`` metadata.
Args:
name (str): Argument name from ``arg_input``.
item_type (AnnotationValue): Serialized argument type from ``arg_input``.
signature_by_name (dict[str, SignatureEntry]): Signature entries indexed by
parameter name.
Returns:
ScanInputConfig: Normalized input configuration for one arg-bundle field.
"""
if name in signature_by_name:
scan_input = self.scan_input_from_signature(signature_by_name[name], arg=True)
scan_input["type"] = self.scan_arg_type_from_annotation(
self.parse_annotation(signature_by_name[name].get("annotation"))[0]
)
else:
annotation, scan_argument = self.parse_annotation(item_type)
scan_input = self._build_scan_input(
name=name,
annotation=annotation,
scan_argument=scan_argument,
arg=True,
default=None,
)
if scan_input["type"] in ("_empty", None):
scan_input["type"] = item_type
return scan_input
def _build_scan_input(
self,
name: str,
annotation: AnnotationValue,
scan_argument: ScanArgumentMetadata,
*,
arg: bool,
default: Any,
) -> ScanInputConfig:
"""Build one normalized ScanControl input configuration.
Args:
name (str): Parameter name.
annotation (AnnotationValue): Parsed annotation value.
scan_argument (ScanArgumentMetadata): Parsed ``ScanArgument`` metadata.
arg (bool): Whether the parameter belongs to the positional arg bundle.
default (Any): Default value for the parameter.
Returns:
ScanInputConfig: Normalized input configuration.
"""
return {
"arg": arg,
"name": name,
"type": self.scan_arg_type_from_annotation(annotation),
"display_name": scan_argument.get("display_name") or self.format_display_name(name),
"tooltip": self.resolve_tooltip(scan_argument),
"default": default,
"expert": scan_argument.get("expert", False),
"precision": scan_argument.get("precision"),
"units": scan_argument.get("units"),
"reference_units": scan_argument.get("reference_units"),
"gt": scan_argument.get("gt"),
"ge": scan_argument.get("ge"),
"lt": scan_argument.get("lt"),
"le": scan_argument.get("le"),
"alternative_group": scan_argument.get("alternative_group"),
}
def build_scan_ui_config(self, scan_info: ScanInfo) -> ScanUIConfig:
"""Normalize one available-scan entry into the widget UI configuration.
Args:
scan_info (ScanInfo): Available-scan payload for one scan.
Returns:
ScanUIConfig: Legacy group structure consumed by ``ScanControl`` and
``ScanGroupBox``.
"""
gui_visualization = (
scan_info.get("gui_visualization") or scan_info.get("gui_visibility") or {}
)
if not gui_visualization and scan_info.get("gui_config"):
return scan_info["gui_config"]
signature = scan_info.get("signature", [])
signature_by_name = {entry["name"]: entry for entry in signature}
arg_group = None
arg_input = scan_info.get("arg_input", {})
if isinstance(arg_input, dict) and arg_input:
bundle_size = scan_info.get("arg_bundle_size", {})
inputs = [
self.scan_input_from_arg_input(name, item_type, signature_by_name)
for name, item_type in arg_input.items()
]
arg_group = {
"name": "Scan Arguments",
"bundle": bundle_size.get("bundle"),
"arg_inputs": arg_input,
"inputs": inputs,
"min": bundle_size.get("min"),
"max": bundle_size.get("max"),
}
kwarg_groups = []
arg_names = set(arg_input) if isinstance(arg_input, dict) else set()
for group_name, input_names in gui_visualization.items():
inputs = []
for input_name in input_names:
if input_name in arg_names or input_name not in signature_by_name:
continue
param = signature_by_name[input_name]
if param.get("kind") in ("VAR_POSITIONAL", "VAR_KEYWORD"):
continue
inputs.append(self.scan_input_from_signature(param))
if inputs:
kwarg_groups.append({"name": group_name, "inputs": inputs})
return {
"scan_class_name": scan_info.get("class"),
"arg_group": arg_group,
"kwarg_groups": kwarg_groups,
}
+5 -5
View File
@@ -1,6 +1,6 @@
[project]
name = "bec_widgets"
version = "3.7.1"
version = "3.7.2"
description = "BEC Widgets"
requires-python = ">=3.11"
classifiers = [
@@ -10,8 +10,8 @@ classifiers = [
]
dependencies = [
"PyJWT~=2.9",
"PySide6==6.9.0",
"PySide6-QtAds==4.4.0",
"PySide6==6.11.0",
"PySide6-QtAds==4.5.0.3",
"bec_ipython_client~=3.107,>=3.107.2", # needed for jupyter console
"bec_lib~=3.107,>=3.107.2",
"bec_qthemes~=1.0, >=1.3.4",
@@ -23,7 +23,7 @@ dependencies = [
"ophyd_devices~=1.29, >=1.29.1",
"pydantic~=2.0",
"pylsp-bec~=1.2",
"pyqtgraph==0.13.7",
"pyqtgraph==0.14.0",
"qtconsole~=5.5, >=5.5.1", # needed for jupyter console
"qtmonaco~=0.8, >=0.8.1",
"qtpy~=2.4",
@@ -56,7 +56,7 @@ dev = [
"watchdog~=6.0",
"pre_commit~=4.2",
]
qtermwidget = ["pyside6_qtermwidget"]
qtermwidget = []
[build-system]
requires = ["hatchling"]
-1
View File
@@ -59,5 +59,4 @@ def test_run_line_scan_with_parameters_e2e(scan_control, bec_client_lib, qtbot):
last_scan = queue.scan_storage.storage[-1]
assert last_scan.status_message.info["scan_name"] == scan_name
assert last_scan.status_message.info["exp_time"] == kwargs["exp_time"]
assert last_scan.status_message.info["scan_motors"] == [args["device"]]
assert last_scan.status_message.info["num_points"] == kwargs["steps"]
-1
View File
@@ -84,7 +84,6 @@ def test_scan_metadata_for_custom_scan(
last_scan = queue.scan_storage.storage[-1]
assert last_scan.status_message.info["scan_name"] == scan_name
assert last_scan.status_message.info["exp_time"] == kwargs["exp_time"]
assert last_scan.status_message.info["scan_motors"] == [args["device"]]
assert last_scan.status_message.info["num_points"] == kwargs["steps"]
if valid:
-1
View File
@@ -71,7 +71,6 @@ def bec_queue_msg_full():
},
"report_instructions": [{"scan_progress": 20}],
"scan_id": "2d704cc3-c172-404c-866d-608ce09fce40",
"scan_motors": ["samx"],
"scan_number": 1289,
}
],
+2 -5
View File
@@ -146,10 +146,7 @@ class TestDeviceManagerViewDialogs:
group_combo: QtWidgets.QComboBox = dialog._control_widgets["group_combo"]
assert group_combo.count() == len(OPHYD_DEVICE_TEMPLATES)
# Test select a group from available templates
variant_combo = dialog._control_widgets["variant_combo"]
assert variant_combo.isEnabled() is False
with qtbot.waitSignal(group_combo.currentTextChanged):
epics_signal_index = group_combo.findText("EpicsSignal")
group_combo.setCurrentIndex(epics_signal_index) # Select "EpicsSignal" group
@@ -235,7 +232,7 @@ class TestDeviceManagerViewDialogs:
sample_config = {
"name": "TestDevice",
"enabled": True,
"deviceClass": "ophyd.EpicsSignal",
"deviceClass": "ophyd_devices.EpicsSignal",
"readoutPriority": "baseline",
"deviceConfig": {"read_pv": "X25DA-ES1-MOT:GET"},
}
@@ -248,7 +245,7 @@ class TestDeviceManagerViewDialogs:
assert variant_combo.currentText() == "EpicsSignal"
config = dialog._device_config_template.get_config_fields()
assert config["name"] == "TestDevice"
assert config["deviceClass"] == "ophyd.EpicsSignal"
assert config["deviceClass"] == "ophyd_devices.EpicsSignal"
assert config["deviceConfig"]["read_pv"] == "X25DA-ES1-MOT:GET"
# Test now to add the device config with different validation results
+6 -1
View File
@@ -1863,9 +1863,14 @@ class TestWorkspaceProfileOperations:
with patch(
"bec_widgets.widgets.containers.dock_area.dock_area.SaveProfileDialog", StubDialog
):
advanced_dock_area.save_profile(show_dialog=True)
widgets_before_save = list(advanced_dock_area.widget_list())
with patch.object(advanced_dock_area, "load_profile") as mock_load_profile:
advanced_dock_area.save_profile(show_dialog=True)
qtbot.wait(100)
mock_load_profile.assert_not_called()
qtbot.wait(500)
assert list(advanced_dock_area.widget_list()) == widgets_before_save
source_manifest = read_manifest(helper.open_user(source_profile))
new_manifest = read_manifest(helper.open_user(new_profile))
-127
View File
@@ -277,133 +277,6 @@ def test_populate_scans(scan_control, mocked_client):
assert sorted(items) == sorted(expected_scans)
def test_scan_control_uses_gui_visibility_and_signature(qtbot, mocked_client):
scan_info = {
"class": "AnnotatedScan",
"base_class": "ScanBase",
"arg_input": {
"device": "DeviceBase",
"start": {
"Annotated": {
"type": "float",
"metadata": {
"ScanArgument": {
"display_name": "Start Position",
"description": "Start position",
"tooltip": "Custom start tooltip",
"expert": False,
"alternative_group": None,
"units": None,
"reference_units": "device",
}
},
}
},
"stop": {
"Annotated": {
"type": "float",
"metadata": {
"ScanArgument": {
"display_name": None,
"description": "Stop position",
"tooltip": None,
"expert": False,
"alternative_group": None,
"units": None,
"reference_units": "device",
}
},
}
},
},
"arg_bundle_size": {"bundle": 3, "min": 1, "max": None},
"gui_visibility": {
"Movement Parameters": ["steps", "step_size"],
"Acquisition Parameters": ["exp_time", "relative"],
},
"required_kwargs": [],
"signature": [
{"name": "args", "kind": "VAR_POSITIONAL", "default": "_empty", "annotation": "_empty"},
{"name": "steps", "kind": "KEYWORD_ONLY", "default": 10, "annotation": "int"},
{
"name": "step_size",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": {
"Annotated": {
"type": "float",
"metadata": {
"ScanArgument": {
"display_name": "Step Size Custom",
"description": "Step size",
"tooltip": "Custom step tooltip",
"expert": False,
"alternative_group": "scan_resolution",
"units": "mm",
"reference_units": None,
}
},
}
},
},
{
"name": "exp_time",
"kind": "KEYWORD_ONLY",
"default": 0,
"annotation": {
"Annotated": {
"type": "float",
"metadata": {
"ScanArgument": {
"display_name": None,
"description": None,
"tooltip": "Exposure time",
"expert": False,
"alternative_group": None,
"units": "s",
"reference_units": None,
}
},
}
},
},
{"name": "relative", "kind": "KEYWORD_ONLY", "default": False, "annotation": "bool"},
{"name": "kwargs", "kind": "VAR_KEYWORD", "default": "_empty", "annotation": "_empty"},
],
}
mocked_client.connector.set(
MessageEndpoints.available_scans(),
AvailableResourceMessage(resource={"annotated_scan": scan_info}),
)
widget = ScanControl(client=mocked_client)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
widget.comboBox_scan_selection.setCurrentText("annotated_scan")
assert widget.comboBox_scan_selection.count() == 1
assert widget.arg_box.layout.itemAtPosition(0, 1).widget().text() == "Start Position"
assert widget.arg_box.widgets[1].toolTip() == "Custom start tooltip\nUnits from: device"
with patch.object(mocked_client.device_manager.devices.samx, "egu", return_value="mm"):
WidgetIO.set_value(widget.arg_box.widgets[0], "samx")
assert widget.arg_box.layout.itemAtPosition(0, 1).widget().text() == "Start Position"
assert widget.arg_box.widgets[1].suffix() == " mm"
assert widget.arg_box.widgets[1].toolTip() == "Custom start tooltip\nUnits: mm"
widget.arg_box.widgets[0].setText("not_a_device")
assert widget.arg_box.layout.itemAtPosition(0, 1).widget().text() == "Start Position"
assert widget.arg_box.widgets[1].suffix() == ""
assert widget.arg_box.widgets[1].toolTip() == "Custom start tooltip\nUnits from: device"
assert [box.title() for box in widget.kwarg_boxes] == [
"Movement Parameters",
"Acquisition Parameters",
]
assert widget.kwarg_boxes[0].layout.itemAtPosition(0, 1).widget().text() == "Step Size Custom"
assert widget.kwarg_boxes[0].widgets[1].suffix() == " mm"
assert widget.kwarg_boxes[0].widgets[1].toolTip() == "Custom step tooltip\nUnits: mm"
assert widget.kwarg_boxes[1].layout.itemAtPosition(0, 0).widget().text() == "Exp Time"
assert widget.kwarg_boxes[1].widgets[0].toolTip() == "Exposure time\nUnits: s"
def test_current_scan(scan_control, mocked_client):
current_scan = scan_control.current_scan
wrong_scan = "error_scan"
@@ -157,81 +157,3 @@ def test_arg_box(qtbot):
# Widget 2
assert arg_box.widgets[2].__class__.__name__ == "ScanSpinBox"
assert arg_box.widgets[2].arg_name
def test_spinbox_limits_from_scan_info(qtbot):
group_input = {
"name": "Kwarg Test",
"inputs": [
{
"arg": False,
"name": "exp_time",
"type": "float",
"display_name": "Exp Time",
"tooltip": "Exposure time in seconds",
"default": 2.0,
"expert": False,
"precision": 3,
"gt": 1.5,
"ge": None,
"lt": 5.0,
"le": None,
},
{
"arg": False,
"name": "num_points",
"type": "int",
"display_name": "Num Points",
"tooltip": "Number of points",
"default": 4,
"expert": False,
"gt": None,
"ge": 3,
"lt": 9,
"le": None,
},
{
"arg": False,
"name": "settling_time",
"type": "float",
"display_name": "Settling Time",
"tooltip": "Settling time in seconds",
"default": 0.5,
"expert": False,
"gt": None,
"ge": 0.2,
"lt": None,
"le": 3.5,
},
{
"arg": False,
"name": "steps",
"type": "int",
"display_name": "Steps",
"tooltip": "Number of steps",
"default": 4,
"expert": False,
"gt": 0,
"ge": None,
"lt": None,
"le": 10,
},
],
}
kwarg_box = ScanGroupBox(box_type="kwargs", config=group_input)
exp_time = kwarg_box.widgets[0]
num_points = kwarg_box.widgets[1]
settling_time = kwarg_box.widgets[2]
steps = kwarg_box.widgets[3]
assert exp_time.decimals() == 3
assert exp_time.minimum() == 1.501
assert exp_time.maximum() == 4.999
assert num_points.minimum() == 3
assert num_points.maximum() == 8
assert settling_time.minimum() == 0.2
assert settling_time.maximum() == 3.5
assert steps.minimum() == 1
assert steps.maximum() == 10