Compare commits

..

34 Commits

Author SHA1 Message Date
wyzula_j 775695589b wip formatter 2026-05-12 21:05:33 +02:00
wyzula_j 88f5bdfbea wip line edit like optional completer for Editable Comboboxes 2026-05-12 20:34:18 +02:00
wyzula_j b47ae8f917 wip scan group box correct filter 2026-05-12 20:16:24 +02:00
wyzula_j 6b7a094407 wip scan group box enfore 2026-05-12 20:10:25 +02:00
wyzula_j 3055a7e090 wip fix group scan box 2026-05-12 20:05:45 +02:00
wyzula_j 31e15675f7 fix: remove device/signal line edit and abstraction layer for combobox/lineEdit 2026-05-12 20:05:45 +02:00
semantic-release 1057db9d76 3.9.1
Automatically generated by python-semantic-release
2026-05-12 17:46:15 +00:00
wyzula_j be35e249f9 wip further opt 2026-05-12 19:45:22 +02:00
wyzula_j cdd833dfc2 tests(scan_control): tests extended and optimized 2026-05-12 19:45:22 +02:00
wyzula_j 3c7834b492 fix: logpanel fixture overwriting xread 2026-05-12 19:45:22 +02:00
wyzula_j acd35a2786 fix(scan_control): restore scan parameters from history are fetched on demand with button 2026-05-12 19:45:22 +02:00
semantic-release 108b249f1d 3.9.0
Automatically generated by python-semantic-release
2026-05-12 11:49:32 +00:00
wyzula_j 085f9fa271 fix: test bw-generate-cli 2026-05-12 13:48:43 +02:00
wyzula_j 79931faf55 fix(dock_area): icon fetching for toolbar import optimised 2026-05-12 13:48:43 +02:00
wyzula_j 6b3cebe9cb fix(jupyter_console_widget): widget_handler API fix 2026-05-12 13:48:43 +02:00
wakonig_k 5cc82425f0 feat: move to lazy widget import 2026-05-12 13:48:43 +02:00
wakonig_k bb1544ecb7 test: fix available scans endpoint operation 2026-05-11 13:08:04 +02:00
semantic-release 8ad0e46d98 3.8.1
Automatically generated by python-semantic-release
2026-05-11 09:36:33 +00:00
wakonig_k 9d92f8b53a fix(web_links): update documentation links in BECWebLinksMixin 2026-05-11 11:35:46 +02:00
semantic-release c1d5069a48 3.8.0
Automatically generated by python-semantic-release
2026-05-01 15:16:03 +00:00
wyzula_j 0b1f0b4c26 fix(dock_area): change to show_dialo=False for CLI profile baseline restore 2026-05-01 17:15:03 +02:00
wyzula_j cc825972c2 fix(dock_area): cli call load_profile has restore_baseline kwarg 2026-05-01 17:15:03 +02:00
wyzula_j 17865a2c33 feat(dock_area): add CLI restore current profile from baseline with optional confirmation dialog 2026-05-01 17:15:03 +02:00
semantic-release 0728811238 3.7.3
Automatically generated by python-semantic-release
2026-05-01 11:33:30 +00:00
wyzula_j 717d74b19e test(dock_area): remove low-value tests 2026-05-01 13:32:46 +02:00
wyzula_j dd32caf6e8 fix(dock_area): profile names changed, default->baseline, user->runtime 2026-05-01 13:32:46 +02:00
semantic-release 603edede9c 3.7.2
Automatically generated by python-semantic-release
2026-04-29 13:13:24 +00:00
copilot-swe-agent[bot] 30ef25533a fix(workspace-actions): use try/finally and restore previous blocked state in refresh_profiles
Agent-Logs-Url: https://github.com/bec-project/bec_widgets/sessions/004cb4bc-5015-485e-a803-1e63876b7024

Co-authored-by: wyzula-jan <133381102+wyzula-jan@users.noreply.github.com>
2026-04-29 15:12:35 +02:00
wyzula_j 73b44cffb2 fix(dock-area): avoid switching profile when saving new profile 2026-04-29 15:12:35 +02:00
wakonig_k a614d662d6 test: fix assertions after updating ophyd devices templates
Co-authored-by: Copilot <copilot@github.com>
2026-04-28 14:45:42 +02:00
wakonig_k 3f1aa80756 chore: update header comments in script files to indicate AI generation 2026-04-24 16:17:44 +02:00
wakonig_k 409c9e5bfa ci: increase threshold to 20 percent 2026-04-22 13:10:15 +02:00
wakonig_k 19b5c8f724 ci: fix benchmark upload 2026-04-22 13:10:15 +02:00
wyzula_j 5056ef8946 test: remove references to "scan_motors" in tests 2026-04-22 08:12:39 +02:00
69 changed files with 2598 additions and 3121 deletions
+4 -1
View File
@@ -1,4 +1,7 @@
#!/usr/bin/env python3
##########################
### AI-generated file. ###
##########################
"""Aggregate and merge benchmark JSON files.
The workflow runs the same benchmark suite on multiple independent runners.
+4 -1
View File
@@ -1,4 +1,7 @@
#!/usr/bin/env python3
##########################
### AI-generated file. ###
##########################
"""Compare benchmark JSON files and write a GitHub Actions summary.
The script supports JSON emitted by hyperfine, JSON emitted by pytest-benchmark,
+5
View File
@@ -1,4 +1,9 @@
#!/usr/bin/env bash
##########################
### AI-generated file. ###
##########################
set -euo pipefail
mkdir -p benchmark-results
+4 -1
View File
@@ -1,4 +1,7 @@
#!/usr/bin/env python3
##########################
### AI-generated file. ###
##########################
"""Run a command with BEC e2e services available."""
from __future__ import annotations
+9 -6
View File
@@ -1,6 +1,6 @@
name: BW Benchmarks
on: [workflow_call]
on: [ workflow_call ]
permissions:
contents: read
@@ -10,7 +10,7 @@ env:
BENCHMARK_BASELINE_JSON: gh-pages-benchmark-data/benchmarks/latest.json
BENCHMARK_SUMMARY: benchmark-results/summary.md
BENCHMARK_COMMAND: "bash .github/scripts/run_benchmarks.sh"
BENCHMARK_THRESHOLD_PERCENT: 10
BENCHMARK_THRESHOLD_PERCENT: 20
BENCHMARK_HIGHER_IS_BETTER: false
jobs:
@@ -25,7 +25,7 @@ jobs:
strategy:
fail-fast: false
matrix:
attempt: [1, 2, 3]
attempt: [ 1, 2, 3 ]
env:
BENCHMARK_JSON: benchmark-results/current-${{ matrix.attempt }}.json
@@ -84,7 +84,7 @@ jobs:
path: ${{ env.BENCHMARK_JSON }}
benchmark:
needs: [benchmark_attempt]
needs: [ benchmark_attempt ]
runs-on: ubuntu-latest
permissions:
contents: read
@@ -191,7 +191,7 @@ jobs:
run: exit 1
publish:
needs: [benchmark]
needs: [ benchmark ]
if: github.event_name == 'push' && github.ref == 'refs/heads/main'
runs-on: ubuntu-latest
permissions:
@@ -208,7 +208,10 @@ jobs:
uses: actions/download-artifact@v4
with:
name: bw-benchmark-json
path: .
path: benchmark-results
- name: Verify aggregate benchmark artifact
run: test -s "$BENCHMARK_JSON"
- name: Prepare gh-pages for publishing
run: |
+119
View File
@@ -1,6 +1,125 @@
# CHANGELOG
## v3.9.1 (2026-05-12)
### Bug Fixes
- Logpanel fixture overwriting xread
([`3c7834b`](https://github.com/bec-project/bec_widgets/commit/3c7834b492a5d2da13689f58b20caf38dda9ac1d))
- **scan_control**: Restore scan parameters from history are fetched on demand with button
([`acd35a2`](https://github.com/bec-project/bec_widgets/commit/acd35a278660ce4962167af6237b5d12007f0774))
## v3.9.0 (2026-05-12)
### Bug Fixes
- Test bw-generate-cli
([`085f9fa`](https://github.com/bec-project/bec_widgets/commit/085f9fa271a0a8e339bff83f235011ac4a9d29ea))
- **dock_area**: Icon fetching for toolbar import optimised
([`79931fa`](https://github.com/bec-project/bec_widgets/commit/79931faf554fd0978c54d6562aa1b5fc4ab823b2))
- **jupyter_console_widget**: Widget_handler API fix
([`6b3cebe`](https://github.com/bec-project/bec_widgets/commit/6b3cebe9cbdb5c02ae2aa14b0f624a51c9c2ca4c))
### Features
- Move to lazy widget import
([`5cc8242`](https://github.com/bec-project/bec_widgets/commit/5cc82425f07d76e881ae59a121a3af77f227bfee))
### Testing
- Fix available scans endpoint operation
([`bb1544e`](https://github.com/bec-project/bec_widgets/commit/bb1544ecb70612267e2b03ba041c6f656789d63c))
## v3.8.1 (2026-05-11)
### Bug Fixes
- **web_links**: Update documentation links in BECWebLinksMixin
([`9d92f8b`](https://github.com/bec-project/bec_widgets/commit/9d92f8b53a6ffe57a9dffad797580228023bf6e1))
## v3.8.0 (2026-05-01)
### Bug Fixes
- **dock_area**: Change to show_dialo=False for CLI profile baseline restore
([`0b1f0b4`](https://github.com/bec-project/bec_widgets/commit/0b1f0b4c262ff31469b7114b9f00bf0a7b85e8f2))
- **dock_area**: Cli call load_profile has restore_baseline kwarg
([`cc82597`](https://github.com/bec-project/bec_widgets/commit/cc825972c202cd9ded32f8b2d1ce5f822c2ebdba))
### Features
- **dock_area**: Add CLI restore current profile from baseline with optional confirmation dialog
([`17865a2`](https://github.com/bec-project/bec_widgets/commit/17865a2c338a4a1f944659dde4ec05c25a8dd963))
## v3.7.3 (2026-05-01)
### Bug Fixes
- **dock_area**: Profile names changed, default->baseline, user->runtime
([`dd32caf`](https://github.com/bec-project/bec_widgets/commit/dd32caf6e815fd1922b6aae84d00decad9dbf869))
### Testing
- **dock_area**: Remove low-value tests
([`717d74b`](https://github.com/bec-project/bec_widgets/commit/717d74b19e8c6960209190c47ba32732ffaa0094))
## v3.7.2 (2026-04-29)
### Bug Fixes
- **dock-area**: Avoid switching profile when saving new profile
([`73b44cf`](https://github.com/bec-project/bec_widgets/commit/73b44cffb219347cacb609f3b93068eda6701b42))
- **workspace-actions**: Use try/finally and restore previous blocked state in refresh_profiles
([`30ef255`](https://github.com/bec-project/bec_widgets/commit/30ef25533af9df5a9bd9e69808dc49fdf22f4318))
Agent-Logs-Url:
https://github.com/bec-project/bec_widgets/sessions/004cb4bc-5015-485e-a803-1e63876b7024
Co-authored-by: wyzula-jan <133381102+wyzula-jan@users.noreply.github.com>
### Build System
- Add pytest-benchmark dependency
([`551d38d`](https://github.com/bec-project/bec_widgets/commit/551d38d90111361e64bfcf10a1409be71dd298bc))
### Chores
- Update header comments in script files to indicate AI generation
([`3f1aa80`](https://github.com/bec-project/bec_widgets/commit/3f1aa80756368454c3631cb6cf9d29db28af177a))
### Continuous Integration
- Add benchmark workflow
([`999b7a2`](https://github.com/bec-project/bec_widgets/commit/999b7a2321f2f222c04b056a2db4280f66de9c48))
- Fix benchmark upload
([`19b5c8f`](https://github.com/bec-project/bec_widgets/commit/19b5c8f724dbdb7421957c290f9213bf072392df))
- Increase threshold to 20 percent
([`409c9e5`](https://github.com/bec-project/bec_widgets/commit/409c9e5bfacdfc003f1bc8c9944f01798bd3818e))
### Testing
- Fix assertions after updating ophyd devices templates
([`a614d66`](https://github.com/bec-project/bec_widgets/commit/a614d662d6da716a49afb4ed3a3903f108210386))
Co-authored-by: Copilot <copilot@github.com>
- Remove references to "scan_motors" in tests
([`5056ef8`](https://github.com/bec-project/bec_widgets/commit/5056ef8946d03a20e802709d3fe81c84c195fe41))
## v3.7.1 (2026-04-21)
### Bug Fixes
+46 -38
View File
@@ -340,10 +340,10 @@ class BECDockArea(RPCBase):
Save the current workspace profile.
On first save of a given name:
- writes a default copy to states/default/<name>.ini with tag=default and created_at
- writes a user copy to states/user/<name>.ini with tag=user and created_at
On subsequent saves of user-owned profiles:
- updates both the default and user copies so restore uses the latest snapshot.
- writes a baseline copy to profiles/baseline/<name>.ini with created_at
- writes a runtime copy to profiles/runtime/<name>.ini with created_at
On subsequent saves:
- updates both the baseline and runtime copies so restore uses the latest snapshot.
Read-only bundled profiles cannot be overwritten.
Args:
@@ -358,15 +358,31 @@ class BECDockArea(RPCBase):
@rpc_timeout(None)
@rpc_call
def load_profile(self, name: "str | None" = None):
def load_profile(self, name: "str | None" = None, restore_baseline: "bool" = False):
"""
Load a workspace profile.
Before switching, persist the current profile to the user copy.
Prefer loading the user copy; fall back to the default copy.
Before switching, persist the current profile to the runtime copy.
Prefer loading the runtime copy; fall back to the baseline copy. When
``restore_baseline`` is True, first overwrite the runtime copy with the
baseline profile and then load it.
Args:
name (str | None): The name of the profile to load. If None, prompts the user.
restore_baseline (bool): If True, restore the runtime copy from the
baseline before loading. Defaults to False.
"""
@rpc_timeout(None)
@rpc_call
def restore_baseline_profile(self, name: "str | None" = None, show_dialog: "bool" = False):
"""
Overwrite the runtime copy of *name* with the baseline.
If *name* is None, target the currently active profile.
Args:
name (str | None): The name of the profile to restore. If None, uses the current profile.
show_dialog (bool): If True, ask for confirmation before restoring.
"""
@rpc_call
@@ -1115,30 +1131,6 @@ class DeviceInitializationProgressBar(RPCBase):
"""
class DeviceInputBase(RPCBase):
"""Mixin base class for device input widgets."""
_IMPORT_MODULE = "bec_widgets.widgets.control.device_input.base_classes.device_input_base"
@rpc_call
def remove(self):
"""
Cleanup the BECConnector
"""
@rpc_call
def attach(self):
"""
None
"""
@rpc_call
def detach(self):
"""
Detach the widget from its parent dock widget (if widget is in the dock), making it a floating widget.
"""
class DeviceManagerView(RPCBase):
"""A view for users to manage devices within the application."""
@@ -1348,10 +1340,10 @@ class DockAreaView(RPCBase):
Save the current workspace profile.
On first save of a given name:
- writes a default copy to states/default/<name>.ini with tag=default and created_at
- writes a user copy to states/user/<name>.ini with tag=user and created_at
On subsequent saves of user-owned profiles:
- updates both the default and user copies so restore uses the latest snapshot.
- writes a baseline copy to profiles/baseline/<name>.ini with created_at
- writes a runtime copy to profiles/runtime/<name>.ini with created_at
On subsequent saves:
- updates both the baseline and runtime copies so restore uses the latest snapshot.
Read-only bundled profiles cannot be overwritten.
Args:
@@ -1366,15 +1358,31 @@ class DockAreaView(RPCBase):
@rpc_timeout(None)
@rpc_call
def load_profile(self, name: "str | None" = None):
def load_profile(self, name: "str | None" = None, restore_baseline: "bool" = False):
"""
Load a workspace profile.
Before switching, persist the current profile to the user copy.
Prefer loading the user copy; fall back to the default copy.
Before switching, persist the current profile to the runtime copy.
Prefer loading the runtime copy; fall back to the baseline copy. When
``restore_baseline`` is True, first overwrite the runtime copy with the
baseline profile and then load it.
Args:
name (str | None): The name of the profile to load. If None, prompts the user.
restore_baseline (bool): If True, restore the runtime copy from the
baseline before loading. Defaults to False.
"""
@rpc_timeout(None)
@rpc_call
def restore_baseline_profile(self, name: "str | None" = None, show_dialog: "bool" = False):
"""
Overwrite the runtime copy of *name* with the baseline.
If *name* is None, target the currently active profile.
Args:
name (str | None): The name of the profile to restore. If None, uses the current profile.
show_dialog (bool): If True, ask for confirmation before restoring.
"""
@rpc_call
+11 -30
View File
@@ -26,9 +26,7 @@ if TYPE_CHECKING: # pragma: no cover
import bec_widgets.cli.client as client
else:
GUIRegistryStateMessage = lazy_import_from(
"bec_lib.messages", "GUIRegistryStateMessage"
)
GUIRegistryStateMessage = lazy_import_from("bec_lib.messages", "GUIRegistryStateMessage")
client = lazy_import("bec_widgets.cli.client")
@@ -201,9 +199,7 @@ class AvailableWidgetsNamespace:
for attr_name, _ in self.__dict__.items():
docs = getattr(client, attr_name).__doc__
docs = docs if docs else "No description available"
table.add_row(
attr_name, docs if len(docs.strip()) > 0 else "No description available"
)
table.add_row(attr_name, docs if len(docs.strip()) > 0 else "No description available")
console.print(table)
return ""
@@ -234,9 +230,7 @@ class BECGuiClient(RPCBase):
@property
def launcher(self) -> RPCBase:
"""The launcher object."""
return RPCBase(
gui_id=f"{self._gui_id}:launcher", parent=self, object_name="launcher"
)
return RPCBase(gui_id=f"{self._gui_id}:launcher", parent=self, object_name="launcher")
def _safe_register_stream(self, endpoint: EndpointInfo, cb: Callable, **kwargs):
"""Check if already registered for registration in idempotent functions."""
@@ -247,8 +241,7 @@ class BECGuiClient(RPCBase):
"""Connect to a GUI server"""
# Unregister the old callback
self._client.connector.unregister(
MessageEndpoints.gui_registry_state(self._gui_id),
cb=self._handle_registry_update,
MessageEndpoints.gui_registry_state(self._gui_id), cb=self._handle_registry_update
)
self._gui_id = gui_id
@@ -408,9 +401,7 @@ class BECGuiClient(RPCBase):
and "has no attribute 'system.launch_dock_area'" not in error
):
raise
logger.debug(
"Server does not support system.launch_dock_area; using launcher RPC"
)
logger.debug("Server does not support system.launch_dock_area; using launcher RPC")
return self.launcher._run_rpc(
"launch",
@@ -471,8 +462,7 @@ class BECGuiClient(RPCBase):
# Unregister the registry state
self._client.connector.unregister(
MessageEndpoints.gui_registry_state(self._gui_id),
cb=self._handle_registry_update,
MessageEndpoints.gui_registry_state(self._gui_id), cb=self._handle_registry_update
)
# Remove all reference from top level
self._top_level.clear()
@@ -535,15 +525,9 @@ class BECGuiClient(RPCBase):
finally:
threading.current_thread().cancel() # type: ignore
def check_gui_started_and_continue():
if self._gui_is_alive():
gui_started_callback(self._gui_post_startup)
elif self._process.poll():
logger.error(
f"GUI process failed to start with: {self._process.communicate()}"
)
self._gui_started_timer = RepeatTimer(0.5, check_gui_started_and_continue)
self._gui_started_timer = RepeatTimer(
0.5, lambda: self._gui_is_alive() and gui_started_callback(self._gui_post_startup)
)
self._gui_started_timer.start()
if wait:
@@ -552,8 +536,7 @@ class BECGuiClient(RPCBase):
def _start(self, wait: bool = False) -> None:
self._killed = False
self._safe_register_stream(
MessageEndpoints.gui_registry_state(self._gui_id),
cb=self._handle_registry_update,
MessageEndpoints.gui_registry_state(self._gui_id), cb=self._handle_registry_update
)
return self._start_server(wait=wait)
@@ -620,9 +603,7 @@ class BECGuiClient(RPCBase):
self._ipython_registry.pop(gui_id)
removed_widgets = [
widget.object_name
for widget in self._top_level.values()
if widget._is_deleted()
widget.object_name for widget in self._top_level.values() if widget._is_deleted()
]
for widget_name in removed_widgets:
+161
View File
@@ -0,0 +1,161 @@
# This file was automatically generated by generate_cli.py
# type: ignore
from __future__ import annotations
# pylint: skip-file
designer_plugins = {
"AbortButton": ("bec_widgets.widgets.control.buttons.button_abort.button_abort", "AbortButton"),
"BECColorMapWidget": (
"bec_widgets.widgets.utility.visual.colormap_widget.colormap_widget",
"BECColorMapWidget",
),
"BECMainWindow": ("bec_widgets.widgets.containers.main_window.main_window", "BECMainWindow"),
"BECProgressBar": (
"bec_widgets.widgets.progress.bec_progressbar.bec_progressbar",
"BECProgressBar",
),
"BECQueue": ("bec_widgets.widgets.services.bec_queue.bec_queue", "BECQueue"),
"BECShell": ("bec_widgets.widgets.editors.bec_console.bec_console", "BECShell"),
"BECSpinBox": ("bec_widgets.widgets.utility.spinbox.decimal_spinbox", "BECSpinBox"),
"BECStatusBox": ("bec_widgets.widgets.services.bec_status_box.bec_status_box", "BECStatusBox"),
"BecConsole": ("bec_widgets.widgets.editors.bec_console.bec_console", "BecConsole"),
"ColorButton": ("bec_widgets.widgets.utility.visual.color_button.color_button", "ColorButton"),
"ColorButtonNative": (
"bec_widgets.widgets.utility.visual.color_button_native.color_button_native",
"ColorButtonNative",
),
"ColormapSelector": (
"bec_widgets.widgets.utility.visual.colormap_selector.colormap_selector",
"ColormapSelector",
),
"DapComboBox": ("bec_widgets.widgets.dap.dap_combo_box.dap_combo_box", "DapComboBox"),
"DarkModeButton": (
"bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button",
"DarkModeButton",
),
"DeviceBrowser": (
"bec_widgets.widgets.services.device_browser.device_browser",
"DeviceBrowser",
),
"DeviceComboBox": (
"bec_widgets.widgets.control.device_input.device_combobox.device_combobox",
"DeviceComboBox",
),
"Heatmap": ("bec_widgets.widgets.plots.heatmap.heatmap", "Heatmap"),
"IDEExplorer": ("bec_widgets.widgets.utility.ide_explorer.ide_explorer", "IDEExplorer"),
"Image": ("bec_widgets.widgets.plots.image.image", "Image"),
"LMFitDialog": ("bec_widgets.widgets.dap.lmfit_dialog.lmfit_dialog", "LMFitDialog"),
"LogPanel": ("bec_widgets.widgets.utility.logpanel.logpanel", "LogPanel"),
"Minesweeper": ("bec_widgets.widgets.games.minesweeper", "Minesweeper"),
"MonacoWidget": ("bec_widgets.widgets.editors.monaco.monaco_widget", "MonacoWidget"),
"MotorMap": ("bec_widgets.widgets.plots.motor_map.motor_map", "MotorMap"),
"MultiWaveform": ("bec_widgets.widgets.plots.multi_waveform.multi_waveform", "MultiWaveform"),
"PdfViewerWidget": ("bec_widgets.widgets.utility.pdf_viewer.pdf_viewer", "PdfViewerWidget"),
"PositionIndicator": (
"bec_widgets.widgets.control.device_control.position_indicator.position_indicator",
"PositionIndicator",
),
"PositionerBox": (
"bec_widgets.widgets.control.device_control.positioner_box.positioner_box.positioner_box",
"PositionerBox",
),
"PositionerBox2D": (
"bec_widgets.widgets.control.device_control.positioner_box.positioner_box_2d.positioner_box_2d",
"PositionerBox2D",
),
"PositionerControlLine": (
"bec_widgets.widgets.control.device_control.positioner_box.positioner_control_line.positioner_control_line",
"PositionerControlLine",
),
"PositionerGroup": (
"bec_widgets.widgets.control.device_control.positioner_group.positioner_group",
"PositionerGroup",
),
"ResetButton": ("bec_widgets.widgets.control.buttons.button_reset.button_reset", "ResetButton"),
"ResumeButton": (
"bec_widgets.widgets.control.buttons.button_resume.button_resume",
"ResumeButton",
),
"RingProgressBar": (
"bec_widgets.widgets.progress.ring_progress_bar.ring_progress_bar",
"RingProgressBar",
),
"SBBMonitor": ("bec_widgets.widgets.editors.sbb_monitor.sbb_monitor", "SBBMonitor"),
"ScanControl": ("bec_widgets.widgets.control.scan_control.scan_control", "ScanControl"),
"ScanMetadata": ("bec_widgets.widgets.editors.scan_metadata.scan_metadata", "ScanMetadata"),
"ScanProgressBar": (
"bec_widgets.widgets.progress.scan_progressbar.scan_progressbar",
"ScanProgressBar",
),
"ScatterWaveform": (
"bec_widgets.widgets.plots.scatter_waveform.scatter_waveform",
"ScatterWaveform",
),
"SignalComboBox": (
"bec_widgets.widgets.control.device_input.signal_combobox.signal_combobox",
"SignalComboBox",
),
"SignalLabel": ("bec_widgets.widgets.utility.signal_label.signal_label", "SignalLabel"),
"SpinnerWidget": ("bec_widgets.widgets.utility.spinner.spinner", "SpinnerWidget"),
"StopButton": ("bec_widgets.widgets.control.buttons.stop_button.stop_button", "StopButton"),
"TextBox": ("bec_widgets.widgets.editors.text_box.text_box", "TextBox"),
"ToggleSwitch": ("bec_widgets.widgets.utility.toggle.toggle", "ToggleSwitch"),
"Waveform": ("bec_widgets.widgets.plots.waveform.waveform", "Waveform"),
"WebsiteWidget": ("bec_widgets.widgets.editors.website.website", "WebsiteWidget"),
"WidgetFinderComboBox": (
"bec_widgets.widgets.utility.widget_finder.widget_finder",
"WidgetFinderComboBox",
),
}
widget_icons = {
"AbortButton": "cancel",
"BECColorMapWidget": "palette",
"BECMainWindow": "widgets",
"BECProgressBar": "page_control",
"BECQueue": "edit_note",
"BECShell": "hub",
"BECSpinBox": "123",
"BECStatusBox": "widgets",
"BecConsole": "terminal",
"ColorButton": "colors",
"ColorButtonNative": "colors",
"ColormapSelector": "palette",
"DapComboBox": "data_exploration",
"DarkModeButton": "dark_mode",
"DeviceBrowser": "lists",
"DeviceComboBox": "list_alt",
"Heatmap": "dataset",
"IDEExplorer": "widgets",
"Image": "image",
"LMFitDialog": "monitoring",
"LogPanel": "browse_activity",
"Minesweeper": "videogame_asset",
"MonacoWidget": "code",
"MotorMap": "my_location",
"MultiWaveform": "ssid_chart",
"PdfViewerWidget": "picture_as_pdf",
"PositionIndicator": "horizontal_distribute",
"PositionerBox": "switch_right",
"PositionerBox2D": "switch_right",
"PositionerControlLine": "switch_left",
"PositionerGroup": "grid_view",
"ResetButton": "restart_alt",
"ResumeButton": "resume",
"RingProgressBar": "track_changes",
"SBBMonitor": "train",
"ScanControl": "tune",
"ScanMetadata": "list_alt",
"ScanProgressBar": "timelapse",
"ScatterWaveform": "scatter_plot",
"SignalComboBox": "list_alt",
"SignalLabel": "scoreboard",
"SpinnerWidget": "progress_activity",
"StopButton": "dangerous",
"TextBox": "chat",
"ToggleSwitch": "toggle_on",
"Waveform": "show_chart",
"WebsiteWidget": "travel_explore",
"WidgetFinderComboBox": "frame_inspect",
}
@@ -206,7 +206,6 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
def _populate_registry_widgets(self):
try:
widget_handler.update_available_widgets()
items = sorted(widget_handler.widget_classes.keys())
except Exception as exc:
print(f"Failed to load registered widgets: {exc}")
@@ -335,20 +334,13 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
If kwargs does not contain `object_name`, it will default to the provided shortcut.
"""
# Ensure registry is loaded
widget_handler.update_available_widgets()
cls = widget_handler.widget_classes.get(widget_type)
if cls is None:
raise ValueError(f"Unknown registered widget type: {widget_type}")
if kwargs is None:
kwargs = {"object_name": shortcut}
else:
kwargs = dict(kwargs)
kwargs.setdefault("object_name", shortcut)
# Instantiate and add
widget = cls(**kwargs)
widget = widget_handler.create_widget(widget_type, **kwargs)
if not isinstance(widget, QWidget):
raise TypeError(
f"Instantiated object for type '{widget_type}' is not a QWidget: {type(widget)}"
+56 -4
View File
@@ -4,6 +4,7 @@ import importlib.metadata
import inspect
import pkgutil
import traceback
from functools import lru_cache
from importlib import util as importlib_util
from importlib.machinery import FileFinder, ModuleSpec, SourceFileLoader
from types import ModuleType
@@ -11,7 +12,11 @@ from typing import Generator
from bec_lib.logger import bec_logger
from bec_widgets.utils.plugin_utils import BECClassContainer, BECClassInfo
from bec_widgets.utils.plugin_utils import (
BECClassContainer,
BECClassInfo,
rpc_widget_registry_from_source,
)
logger = bec_logger.logger
@@ -53,6 +58,14 @@ def _submodule_by_name(module: ModuleType, name: str):
return None
def _submodule_spec_by_name(module: ModuleType, name: str) -> ModuleSpec | None:
for module_info in pkgutil.iter_modules(module.__path__):
if module_info.name != name or not isinstance(module_info.module_finder, FileFinder):
continue
return module_info.module_finder.find_spec(module_info.name)
return None
def _get_widgets_from_module(module: ModuleType) -> BECClassContainer:
"""Find any BECWidget subclasses in the given module and return them with their info."""
from bec_widgets.utils.bec_widget import BECWidget # avoid circular import
@@ -90,16 +103,55 @@ def get_plugin_client_module() -> ModuleType | None:
return _submodule_by_name(plugin, "client") if (plugin := user_widget_plugin()) else None
def get_plugin_designer_module() -> ModuleType | None:
"""If there is a plugin repository installed, return the designer module."""
return (
_submodule_by_name(plugin, "designer_plugins") if (plugin := user_widget_plugin()) else None
)
@lru_cache
def get_plugin_rpc_widget_registry() -> dict[str, tuple[str, str]]:
"""If there is a plugin repository installed, return the RPC widget registry."""
plugin = user_widget_plugin()
if plugin is None:
return {}
client_spec = _submodule_spec_by_name(plugin, "client")
if client_spec is not None and client_spec.origin:
try:
return rpc_widget_registry_from_source(client_spec.origin)
except (OSError, SyntaxError) as exc:
logger.warning(f"Could not parse plugin RPC widget registry: {exc}")
client_module = get_plugin_client_module()
if client_module is None:
return {}
registry = {}
for plugin_name, plugin_class in inspect.getmembers(client_module, inspect.isclass):
if hasattr(plugin_class, "_IMPORT_MODULE"):
registry[plugin_name] = (plugin_class._IMPORT_MODULE, plugin_class.__name__)
return registry
@lru_cache
def get_plugin_designer_registry() -> dict[str, tuple[str, str]]:
"""If there is a plugin repository installed, return the designer plugin registry."""
designer_module = get_plugin_designer_module()
if designer_module and hasattr(designer_module, "designer_plugins"):
return designer_module.designer_plugins
return {}
def get_all_plugin_widgets() -> BECClassContainer:
"""If there is a plugin repository installed, load all widgets from it."""
if plugin := user_widget_plugin():
return _all_widgets_from_all_submods(plugin)
else:
return BECClassContainer()
return BECClassContainer()
if __name__ == "__main__": # pragma: no cover
widgets = get_plugin_rpc_widget_registry()
client = get_plugin_client_module()
print(get_all_plugin_widgets())
...
+57 -321
View File
@@ -1,12 +1,9 @@
"""Module for handling filter I/O operations in BEC Widgets for input fields.
These operations include filtering device/signal names and/or device types.
"""
"""Small helpers for populating editable combo boxes used by device inputs."""
from abc import ABC, abstractmethod
from __future__ import annotations
from bec_lib.logger import bec_logger
from qtpy.QtCore import QStringListModel
from qtpy.QtWidgets import QComboBox, QCompleter, QLineEdit
from qtpy.QtWidgets import QComboBox
from typeguard import TypeCheckError
from bec_widgets.utils.ophyd_kind_util import Kind
@@ -14,329 +11,68 @@ from bec_widgets.utils.ophyd_kind_util import Kind
logger = bec_logger.logger
class WidgetFilterHandler(ABC):
"""Abstract base class for widget filter handlers"""
@abstractmethod
def set_selection(self, widget, selection: list[str | tuple]) -> None:
"""Set the filtered_selection for the widget
Args:
widget: Widget instance
selection (list[str | tuple]): Filtered selection of items.
If tuple, it contains (text, data) pairs.
"""
@abstractmethod
def check_input(self, widget, text: str) -> bool:
"""Check if the input text is in the filtered selection
Args:
widget: Widget instance
text (str): Input text
Returns:
bool: True if the input text is in the filtered selection
"""
@abstractmethod
def update_with_kind(
self, kind: Kind, signal_filter: set, device_info: dict, device_name: str
) -> list[str | tuple]:
"""Update the selection based on the kind of signal.
Args:
kind (Kind): The kind of signal to filter.
signal_filter (set): Set of signal kinds to filter.
device_info (dict): Dictionary containing device information.
device_name (str): Name of the device.
Returns:
list[str | tuple]: A list of filtered signals based on the kind.
"""
# This method should be implemented in subclasses or extended as needed
def update_with_bec_signal_class(
self,
signal_class_filter: str | list[str],
client,
ndim_filter: int | list[int] | None = None,
) -> list[tuple[str, str, dict]]:
"""Update the selection based on signal classes using device_manager.get_bec_signals.
Args:
signal_class_filter (str|list[str]): List of signal class names to filter.
client: BEC client instance.
ndim_filter (int | list[int] | None): Filter signals by dimensionality.
If provided, only signals with matching ndim will be included.
Returns:
list[tuple[str, str, dict]]: A list of (device_name, signal_name, signal_config) tuples.
"""
if not client or not hasattr(client, "device_manager"):
return []
try:
signals = client.device_manager.get_bec_signals(signal_class_filter)
except TypeCheckError as e:
logger.warning(f"Error retrieving signals: {e}")
return []
if ndim_filter is None:
return signals
if isinstance(ndim_filter, int):
ndim_filter = [ndim_filter]
filtered_signals = []
for device_name, signal_name, signal_config in signals:
ndim = None
if isinstance(signal_config, dict):
ndim = signal_config.get("describe", {}).get("signal_info", {}).get("ndim")
if ndim in ndim_filter:
filtered_signals.append((device_name, signal_name, signal_config))
return filtered_signals
def replace_combobox_items(combo_box: QComboBox, items: list[str | tuple]) -> None:
"""Replace all combobox entries with strings or ``(text, data)`` tuples."""
combo_box.clear()
for item in items:
if isinstance(item, str):
combo_box.addItem(item)
else:
combo_box.addItem(*item)
class LineEditFilterHandler(WidgetFilterHandler):
"""Handler for QLineEdit widget"""
def set_selection(self, widget: QLineEdit, selection: list[str | tuple]) -> None:
"""Set the selection for the widget to the completer model
Args:
widget (QLineEdit): The QLineEdit widget
selection (list[str | tuple]): Filtered selection of items. If tuple, it contains (text, data) pairs.
"""
if isinstance(selection, tuple):
# If selection is a tuple, it contains (text, data) pairs
selection = [text for text, _ in selection]
if not isinstance(widget.completer, QCompleter):
completer = QCompleter(widget)
widget.setCompleter(completer)
widget.completer.setModel(QStringListModel(selection, widget))
def check_input(self, widget: QLineEdit, text: str) -> bool:
"""Check if the input text is in the filtered selection
Args:
widget (QLineEdit): The QLineEdit widget
text (str): Input text
Returns:
bool: True if the input text is in the filtered selection
"""
model = widget.completer.model()
model_data = [model.data(model.index(i)) for i in range(model.rowCount())]
return text in model_data
def update_with_kind(
self, kind: Kind, signal_filter: set, device_info: dict, device_name: str
) -> list[str | tuple]:
"""Update the selection based on the kind of signal.
Args:
kind (Kind): The kind of signal to filter.
signal_filter (set): Set of signal kinds to filter.
device_info (dict): Dictionary containing device information.
device_name (str): Name of the device.
Returns:
list[str | tuple]: A list of filtered signals based on the kind.
"""
return [
signal
for signal, signal_info in device_info.items()
if kind in signal_filter and (signal_info.get("kind_str", None) == str(kind.name))
]
def combobox_contains_text(combo_box: QComboBox, text: str) -> bool:
"""Return whether *text* is present as visible combobox text."""
return any(combo_box.itemText(i) == text for i in range(combo_box.count()))
class ComboBoxFilterHandler(WidgetFilterHandler):
"""Handler for QComboBox widget"""
def signal_items_for_kind(
*, kind: Kind, signal_filter: set[Kind], device_info: dict, device_name: str
) -> list[tuple[str, dict]]:
"""Build display entries for signals matching a BEC signal kind."""
items: list[tuple[str, dict]] = []
for signal_name, signal_info in device_info.items():
if kind not in signal_filter or signal_info.get("kind_str") != kind.name:
continue
def set_selection(self, widget: QComboBox, selection: list[str | tuple]) -> None:
"""Set the selection for the widget to the completer model
obj_name = signal_info.get("obj_name", "")
component_name = signal_info.get("component_name", "")
signal_without_device = obj_name.removeprefix(f"{device_name}_")
if not signal_without_device:
signal_without_device = obj_name
Args:
widget (QComboBox): The QComboBox widget
selection (list[str | tuple]): Filtered selection of items. If tuple, it contains (text, data) pairs.
"""
widget.clear()
if len(selection) == 0:
return
for element in selection:
if isinstance(element, str):
widget.addItem(element)
elif isinstance(element, tuple):
# If element is a tuple, it contains (text, data) pairs
widget.addItem(*element)
def check_input(self, widget: QComboBox, text: str) -> bool:
"""Check if the input text is in the filtered selection
Args:
widget (QComboBox): The QComboBox widget
text (str): Input text
Returns:
bool: True if the input text is in the filtered selection
"""
return text in [widget.itemText(i) for i in range(widget.count())]
def update_with_kind(
self, kind: Kind, signal_filter: set, device_info: dict, device_name: str
) -> list[str | tuple]:
"""Update the selection based on the kind of signal.
Args:
kind (Kind): The kind of signal to filter.
signal_filter (set): Set of signal kinds to filter.
device_info (dict): Dictionary containing device information.
device_name (str): Name of the device.
Returns:
list[str | tuple]: A list of filtered signals based on the kind.
"""
out = []
for signal, signal_info in device_info.items():
if kind not in signal_filter or (signal_info.get("kind_str", None) != str(kind.name)):
continue
obj_name = signal_info.get("obj_name", "")
component_name = signal_info.get("component_name", "")
signal_wo_device = obj_name.removeprefix(f"{device_name}_")
if not signal_wo_device:
signal_wo_device = obj_name
if signal_wo_device != signal and component_name.replace(".", "_") != signal_wo_device:
# If the object name is not the same as the signal name, we use the object name
# to display in the combobox.
out.append((f"{signal_wo_device} ({signal})", signal_info))
else:
# If the object name is the same as the signal name, we do not change it.
out.append((signal, signal_info))
return out
if (
signal_without_device != signal_name
and component_name.replace(".", "_") != signal_without_device
):
items.append((f"{signal_without_device} ({signal_name})", signal_info))
else:
items.append((signal_name, signal_info))
return items
class FilterIO:
"""Public interface to set filters for input widgets.
It supports the list of widgets stored in class attribute _handlers.
"""
def get_bec_signals_for_classes(
*, client, signal_class_filter: str | list[str], ndim_filter: int | list[int] | None = None
) -> list[tuple[str, str, dict]]:
"""Return BEC signals filtered by signal class and optional dimensionality."""
if not client or not hasattr(client, "device_manager"):
return []
_handlers = {QLineEdit: LineEditFilterHandler, QComboBox: ComboBoxFilterHandler}
try:
signals = client.device_manager.get_bec_signals(signal_class_filter)
except TypeCheckError as exc:
logger.warning(f"Error retrieving signals: {exc}")
return []
@staticmethod
def set_selection(widget, selection: list[str | tuple], ignore_errors=True):
"""
Retrieve value from the widget instance.
if ndim_filter is None:
return signals
Args:
widget: Widget instance.
selection (list[str | tuple]): Filtered selection of items.
If tuple, it contains (text, data) pairs.
ignore_errors(bool, optional): Whether to ignore if no handler is found.
"""
handler_class = FilterIO._find_handler(widget)
if handler_class:
return handler_class().set_selection(widget=widget, selection=selection)
if not ignore_errors:
raise ValueError(
f"No matching handler for widget type: {type(widget)} in handler list {FilterIO._handlers}"
)
return None
@staticmethod
def check_input(widget, text: str, ignore_errors=True):
"""
Check if the input text is in the filtered selection.
Args:
widget: Widget instance.
text(str): Input text.
ignore_errors(bool, optional): Whether to ignore if no handler is found.
Returns:
bool: True if the input text is in the filtered selection.
"""
handler_class = FilterIO._find_handler(widget)
if handler_class:
return handler_class().check_input(widget=widget, text=text)
if not ignore_errors:
raise ValueError(
f"No matching handler for widget type: {type(widget)} in handler list {FilterIO._handlers}"
)
return None
@staticmethod
def update_with_kind(
widget, kind: Kind, signal_filter: set, device_info: dict, device_name: str
) -> list[str | tuple]:
"""
Update the selection based on the kind of signal.
Args:
widget: Widget instance.
kind (Kind): The kind of signal to filter.
signal_filter (set): Set of signal kinds to filter.
device_info (dict): Dictionary containing device information.
device_name (str): Name of the device.
Returns:
list[str | tuple]: A list of filtered signals based on the kind.
"""
handler_class = FilterIO._find_handler(widget)
if handler_class:
return handler_class().update_with_kind(
kind=kind,
signal_filter=signal_filter,
device_info=device_info,
device_name=device_name,
)
raise ValueError(
f"No matching handler for widget type: {type(widget)} in handler list {FilterIO._handlers}"
)
@staticmethod
def update_with_signal_class(
widget, signal_class_filter: list[str], client, ndim_filter: int | list[int] | None = None
) -> list[tuple[str, str, dict]]:
"""
Update the selection based on signal classes using device_manager.get_bec_signals.
Args:
widget: Widget instance.
signal_class_filter (list[str]): List of signal class names to filter.
client: BEC client instance.
ndim_filter (int | list[int] | None): Filter signals by dimensionality.
If provided, only signals with matching ndim will be included.
Returns:
list[tuple[str, str, dict]]: A list of (device_name, signal_name, signal_config) tuples.
"""
handler_class = FilterIO._find_handler(widget)
if handler_class:
return handler_class().update_with_bec_signal_class(
signal_class_filter=signal_class_filter, client=client, ndim_filter=ndim_filter
)
raise ValueError(
f"No matching handler for widget type: {type(widget)} in handler list {FilterIO._handlers}"
)
@staticmethod
def _find_handler(widget):
"""
Find the appropriate handler for the widget by checking its base classes.
Args:
widget: Widget instance.
Returns:
handler_class: The handler class if found, otherwise None.
"""
for base in type(widget).__mro__:
if base in FilterIO._handlers:
return FilterIO._handlers[base]
return None
accepted_ndim = [ndim_filter] if isinstance(ndim_filter, int) else ndim_filter
filtered_signals: list[tuple[str, str, dict]] = []
for device_name, signal_name, signal_config in signals:
ndim = None
if isinstance(signal_config, dict):
ndim = signal_config.get("describe", {}).get("signal_info", {}).get("ndim")
if ndim in accepted_ndim:
filtered_signals.append((device_name, signal_name, signal_config))
return filtered_signals
+69 -2
View File
@@ -14,7 +14,11 @@ import isort
from bec_lib.logger import bec_logger
from qtpy.QtCore import Property as QtProperty
from bec_widgets.utils.generate_designer_plugin import DesignerPluginGenerator, plugin_filenames
from bec_widgets.utils.generate_designer_plugin import (
DesignerPluginGenerator,
DesignerPluginInfo,
plugin_filenames,
)
from bec_widgets.utils.plugin_utils import BECClassContainer, get_custom_classes
logger = bec_logger.logger
@@ -250,6 +254,58 @@ class {class_name}(RPCBase):\n"""
file.write(formatted_content)
def write_designer_plugins(plugin_infos: list[DesignerPluginInfo], file_name: str):
"""
Write a registry of Qt widget classes with designer plugins.
Args:
plugin_infos(list[DesignerPluginInfo]): The designer plugin metadata to write.
file_name(str): The name of the file to write to.
"""
plugin_infos = sorted(plugin_infos, key=lambda info: info.plugin_name_pascal)
content = """# This file was automatically generated by generate_cli.py
# type: ignore
from __future__ import annotations
# pylint: skip-file
designer_plugins = {
"""
for info in plugin_infos:
widget_module = info.plugin_class.__module__
widget_class = info.plugin_name_pascal
content += f' "{info.plugin_name_pascal}": ("{widget_module}", "{widget_class}"),\n'
content += """
}
widget_icons = {
"""
for info in plugin_infos:
content += f' "{info.plugin_name_pascal}": "{info.icon_name}",\n'
content += """
}
"""
try:
formatted_content = black.format_str(content, mode=black.Mode(line_length=100))
except black.NothingChanged:
formatted_content = content
config = isort.Config(
profile="black",
line_length=100,
multi_line_output=3,
include_trailing_comma=False,
known_first_party=["bec_widgets"],
)
formatted_content = isort.code(formatted_content, config=config)
with open(file_name, "w", encoding="utf-8") as file:
file.write(formatted_content)
def main():
"""
Main entry point for the script, controlled by command line arguments.
@@ -303,6 +359,8 @@ def main():
else:
non_overwrite_classes = []
designer_plugin_infos = []
for cls in rpc_classes.plugins:
logger.info(f"Writing bec-designer plugin files for {cls.__name__}...")
@@ -310,21 +368,30 @@ def main():
logger.error(
f"Not writing plugin files for {cls.__name__} because a built-in widget with that name exists"
)
continue
plugin = DesignerPluginGenerator(cls)
if not hasattr(plugin, "info"):
if not hasattr(plugin, "info") or plugin.excluded:
continue
def _exists(file: str):
return os.path.exists(os.path.join(plugin.info.base_path, file))
if any(_exists(file) for file in plugin_filenames(plugin.info.plugin_name_snake)):
if _exists(plugin.filenames.plugin):
designer_plugin_infos.append(plugin.info)
logger.debug(
f"Skipping generation of extra plugin files for {plugin.info.plugin_name_snake} - at least one file out of 'plugin.py', 'pyproject', and 'register_{plugin.info.plugin_name_snake}.py' already exists."
)
continue
plugin.run()
designer_plugin_infos.append(plugin.info)
# Write designer_plugins.py with plugin import metadata for all widgets with designer plugins.
designer_plugins_path = module_dir / client_subdir / "designer_plugins.py"
logger.info(f"Generating designer plugin registry at {designer_plugins_path}")
write_designer_plugins(designer_plugin_infos, str(designer_plugins_path))
if __name__ == "__main__": # pragma: no cover
@@ -29,6 +29,7 @@ class DesignerPluginInfo:
self.plugin_name_pascal = plugin_class.__name__
self.plugin_name_snake = pascal_to_snake(self.plugin_name_pascal)
self.widget_import = f"from {plugin_class.__module__} import {self.plugin_name_pascal}"
self.icon_name = getattr(plugin_class, "ICON_NAME", "")
plugin_module = (
".".join(plugin_class.__module__.split(".")[:-1]) + f".{self.plugin_name_snake}_plugin"
)
@@ -63,6 +64,10 @@ class DesignerPluginGenerator:
def filenames(self):
return plugin_filenames(self.info.plugin_name_snake)
@property
def excluded(self):
return self._excluded
def run(self, validate=True):
if self._excluded:
print(f"Plugin {self.widget.__name__} is excluded from generation.")
+110 -50
View File
@@ -1,56 +1,22 @@
from __future__ import annotations
import ast
import importlib
import inspect
import os
from dataclasses import dataclass
from functools import lru_cache
from pathlib import Path
from typing import TYPE_CHECKING, Iterable
from bec_lib.plugin_helper import _get_available_plugins
from qtpy.QtWidgets import QWidget
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils.bec_widget import BECWidget
if TYPE_CHECKING: # pragma: no cover
from qtpy.QtWidgets import QWidget
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.widgets.containers.auto_update.auto_updates import AutoUpdates
def get_plugin_widgets() -> dict[str, BECConnector]:
"""
Get all available widgets from the plugin directory. Widgets are classes that inherit from BECConnector.
The plugins are provided through python plugins and specified in the respective pyproject.toml file using
the following key:
[project.entry-points."bec.widgets.user_widgets"]
plugin_widgets = "path.to.plugin.module"
e.g.
[project.entry-points."bec.widgets.user_widgets"]
plugin_widgets = "pxiii_bec.bec_widgets.widgets"
assuming that the widgets module for the package pxiii_bec is located at pxiii_bec/bec_widgets/widgets and
contains the widgets to be loaded within the pxiii_bec/bec_widgets/widgets/__init__.py file.
Returns:
dict[str, BECConnector]: A dictionary of widget names and their respective classes.
"""
modules = _get_available_plugins("bec.widgets.user_widgets")
loaded_plugins = {}
print(modules)
for module in modules:
mods = inspect.getmembers(module, predicate=_filter_plugins)
for name, mod_cls in mods:
if name in loaded_plugins:
print(f"Duplicated widgets plugin {name}.")
loaded_plugins[name] = mod_cls
return loaded_plugins
def _filter_plugins(obj):
return inspect.isclass(obj) and issubclass(obj, BECConnector)
def get_plugin_auto_updates() -> dict[str, type[AutoUpdates]]:
"""
Get all available auto update classes from the plugin directory. AutoUpdates must inherit from AutoUpdate and be
@@ -66,6 +32,8 @@ def get_plugin_auto_updates() -> dict[str, type[AutoUpdates]]:
Returns:
dict[str, AutoUpdates]: A dictionary of widget names and their respective classes.
"""
from bec_lib.plugin_helper import _get_available_plugins
modules = _get_available_plugins("bec.widgets.auto_updates")
loaded_plugins = {}
for module in modules:
@@ -168,6 +136,11 @@ class BECClassContainer:
def _collect_classes_from_package(repo_name: str, package: str) -> BECClassContainer:
"""Collect classes from a package subtree (for example ``widgets`` or ``applications``)."""
from qtpy.QtWidgets import QWidget
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils.bec_widget import BECWidget
collection = BECClassContainer()
try:
anchor_module = importlib.import_module(f"{repo_name}.{package}")
@@ -194,17 +167,18 @@ def _collect_classes_from_package(repo_name: str, package: str) -> BECClassConta
for name in dir(module):
obj = getattr(module, name)
if not isinstance(obj, type):
continue
if not hasattr(obj, "__module__") or obj.__module__ != module.__name__:
continue
if isinstance(obj, type):
class_info = BECClassInfo(name=name, module=module.__name__, file=path, obj=obj)
if issubclass(obj, BECConnector):
class_info.is_connector = True
if issubclass(obj, QWidget) or issubclass(obj, BECWidget):
class_info.is_widget = True
if hasattr(obj, "PLUGIN") and obj.PLUGIN:
class_info.is_plugin = True
collection.add_class(class_info)
class_info = BECClassInfo(name=name, module=module.__name__, file=path, obj=obj)
if issubclass(obj, BECConnector):
class_info.is_connector = True
if issubclass(obj, QWidget) or issubclass(obj, BECWidget):
class_info.is_widget = True
if hasattr(obj, "PLUGIN") and obj.PLUGIN:
class_info.is_plugin = True
collection.add_class(class_info)
return collection
@@ -229,3 +203,89 @@ def get_custom_classes(
for package in selected_packages:
collection += _collect_classes_from_package(repo_name, package)
return collection
def _get_designer_registry() -> dict[str, tuple[str, str]]:
from bec_widgets.cli.designer_plugins import designer_plugins
return designer_plugins
def _resolve_widget_from_registry(import_path: str, widget_name: str) -> type[QWidget]:
widget = importlib.import_module(import_path)
return getattr(widget, widget_name)
def designer_plugin_exists(name: str) -> bool:
from bec_widgets.utils.bec_plugin_helper import get_plugin_designer_registry
internal_registry = _get_designer_registry()
external_registry = get_plugin_designer_registry()
return name in internal_registry or name in external_registry
def get_designer_plugin(name: str, raise_on_missing: bool = True) -> type[QWidget] | None:
from bec_widgets.utils.bec_plugin_helper import get_plugin_designer_registry
internal_registry = _get_designer_registry()
external_registry = get_plugin_designer_registry()
if name in external_registry:
import_path, widget_name = external_registry[name]
return _resolve_widget_from_registry(import_path, widget_name)
if name in internal_registry:
import_path, widget_name = internal_registry[name]
return _resolve_widget_from_registry(import_path, widget_name)
if raise_on_missing:
raise ValueError(
f"Designer plugin {name} not found in either internal or external registry."
)
return None
def rpc_widget_registry_from_source(path: str | Path) -> dict[str, tuple[str, str]]:
"""Parse a generated RPC client module and return its widget registry."""
source_path = Path(path)
module_node = ast.parse(source_path.read_text(encoding="utf-8"), filename=str(source_path))
registry = {}
for node in module_node.body:
if not isinstance(node, ast.ClassDef):
continue
for item in node.body:
if not isinstance(item, ast.Assign):
continue
if not any(
isinstance(target, ast.Name) and target.id == "_IMPORT_MODULE"
for target in item.targets
):
continue
if isinstance(item.value, ast.Constant) and isinstance(item.value.value, str):
registry[node.name] = (item.value.value, node.name)
break
return registry
@lru_cache
def get_rpc_widget_registry() -> dict[str, tuple[str, str]]:
client_path = Path(__file__).resolve().parents[1] / "cli" / "client.py"
return rpc_widget_registry_from_source(client_path)
@lru_cache
def rpc_widget_registry() -> dict[str, tuple[str, str]]:
from bec_widgets.utils.bec_plugin_helper import get_plugin_rpc_widget_registry
internal_registry = get_rpc_widget_registry()
external_registry = get_plugin_rpc_widget_registry()
return {**external_registry, **internal_registry}
def get_rpc_widget(name: str, raise_on_missing: bool = True) -> type[QWidget] | None:
registry = rpc_widget_registry()
if name in registry:
import_path, widget_name = registry[name]
return _resolve_widget_from_registry(import_path, widget_name)
if raise_on_missing:
raise ValueError(f"RPC widget {name} not found in registry.")
return None
+17 -25
View File
@@ -1,42 +1,34 @@
from __future__ import annotations
from bec_widgets.cli.client_utils import IGNORE_WIDGETS
from bec_widgets.utils.bec_plugin_helper import get_all_plugin_widgets
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.plugin_utils import get_custom_classes
from typing import TYPE_CHECKING
from bec_widgets.utils.plugin_utils import get_rpc_widget, rpc_widget_registry
if TYPE_CHECKING: # pragma: no cover
from bec_widgets.utils.bec_widget import BECWidget
class RPCWidgetHandler:
"""Handler class for creating widgets from RPC messages."""
def __init__(self):
self._widget_classes = None
self._widget_registry = None
@property
def widget_classes(self) -> dict[str, type[BECWidget]]:
def widget_classes(self) -> dict[str, tuple[str, str]]:
"""
Get the available widget classes.
Returns:
dict: The available widget classes.
"""
if self._widget_classes is None:
self.update_available_widgets()
return self._widget_classes # type: ignore
registry = rpc_widget_registry()
if not registry:
return {}
return registry
def update_available_widgets(self):
"""
Update the available widgets.
Returns:
None
"""
self._widget_classes = (
get_custom_classes("bec_widgets", packages=("widgets", "applications"))
+ get_all_plugin_widgets()
).as_dict(IGNORE_WIDGETS)
def create_widget(self, widget_type, **kwargs) -> BECWidget:
@staticmethod
def create_widget(widget_type, **kwargs) -> BECWidget:
"""
Create a widget from an RPC message.
@@ -48,9 +40,9 @@ class RPCWidgetHandler:
Returns:
widget(BECWidget): The created widget.
"""
widget_class = self.widget_classes.get(widget_type) # type: ignore
if widget_class:
return widget_class(**kwargs)
widget = get_rpc_widget(widget_type, raise_on_missing=False)
if widget:
return widget(**kwargs)
raise ValueError(f"Unknown widget type: {widget_type}")
+4 -2
View File
@@ -27,8 +27,10 @@ from qtpy.QtWidgets import (
import bec_widgets
from bec_widgets.utils.toolbars.splitter import ResizableSpacer
from bec_widgets.widgets.control.device_input.base_classes.device_input_base import BECDeviceFilter
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import (
BECDeviceFilter,
DeviceComboBox,
)
logger = bec_logger.logger
+9 -85
View File
@@ -1,10 +1,8 @@
from bec_lib.logger import bec_logger
from qtpy import PYQT6, PYSIDE6
from qtpy import PYSIDE6
from qtpy.QtCore import QFile, QIODevice
from bec_widgets.utils.bec_plugin_helper import get_all_plugin_widgets
from bec_widgets.utils.generate_designer_plugin import DesignerPluginInfo
from bec_widgets.utils.plugin_utils import get_custom_classes
from bec_widgets.utils.plugin_utils import get_designer_plugin
logger = bec_logger.logger
@@ -12,16 +10,14 @@ if PYSIDE6:
from qtpy.QtUiTools import QUiLoader
class CustomUiLoader(QUiLoader):
def __init__(self, baseinstance, custom_widgets: dict | None = None):
def __init__(self, baseinstance):
super().__init__(baseinstance)
self.custom_widgets = custom_widgets or {}
self.baseinstance = baseinstance
def createWidget(self, class_name, parent=None, name=""):
if class_name in self.custom_widgets:
widget = self.custom_widgets[class_name](self.baseinstance)
return widget
widget = get_designer_plugin(class_name, raise_on_missing=False)
if widget is not None:
return widget(self.baseinstance)
return super().createWidget(class_name, self.baseinstance, name)
@@ -31,16 +27,9 @@ class UILoader:
def __init__(self, parent=None):
self.parent = parent
self.custom_widgets = (
get_custom_classes("bec_widgets") + get_all_plugin_widgets()
).as_dict()
if PYSIDE6:
self.loader = self.load_ui_pyside6
elif PYQT6:
self.loader = self.load_ui_pyqt6
else:
if not PYSIDE6:
raise ImportError("No compatible Qt bindings found.")
self.loader = self.load_ui_pyside6
def load_ui_pyside6(self, ui_file, parent=None):
"""
@@ -53,7 +42,7 @@ class UILoader:
QWidget: The loaded widget.
"""
parent = parent or self.parent
loader = CustomUiLoader(parent, self.custom_widgets)
loader = CustomUiLoader(parent)
file = QFile(ui_file)
if not file.open(QIODevice.ReadOnly):
raise IOError(f"Cannot open file: {ui_file}")
@@ -61,71 +50,6 @@ class UILoader:
file.close()
return widget
def load_ui_pyqt6(self, ui_file, parent=None):
"""
Specific loader for PyQt6 using loadUi.
Args:
ui_file(str): Path to the .ui file.
parent(QWidget): Parent widget.
Returns:
QWidget: The loaded widget.
"""
from PyQt6.uic.Loader.loader import DynamicUILoader
class CustomDynamicUILoader(DynamicUILoader):
def __init__(self, package, custom_widgets: dict = None):
super().__init__(package)
self.custom_widgets = custom_widgets or {}
def _handle_custom_widgets(self, el):
"""Handle the <customwidgets> element."""
def header2module(header):
"""header2module(header) -> string
Convert paths to C++ header files to according Python modules
>>> header2module("foo/bar/baz.h")
'foo.bar.baz'
"""
if header.endswith(".h"):
header = header[:-2]
mpath = []
for part in header.split("/"):
# Ignore any empty parts or those that refer to the current
# directory.
if part not in ("", "."):
if part == "..":
# We should allow this for Python3.
raise SyntaxError(
"custom widget header file name may not contain '..'."
)
mpath.append(part)
return ".".join(mpath)
for custom_widget in el:
classname = custom_widget.findtext("class")
header = custom_widget.findtext("header")
if header:
header = self._translate_bec_widgets_header(header)
self.factory.addCustomWidget(
classname,
custom_widget.findtext("extends") or "QWidget",
header2module(header),
)
def _translate_bec_widgets_header(self, header):
for name, value in self.custom_widgets.items():
if header == DesignerPluginInfo.pascal_to_snake(name):
return value.__module__
return header
return CustomDynamicUILoader("", self.custom_widgets).loadUi(ui_file, parent)
def load_ui(self, ui_file, parent=None):
"""
Universal UI loader method.
+5 -1
View File
@@ -85,7 +85,11 @@ class ComboBoxHandler(WidgetHandler):
def set_value(self, widget: QComboBox, value: int | str) -> None:
if isinstance(value, str):
value = widget.findText(value)
index = widget.findText(value)
if index < 0 and widget.isEditable():
widget.setCurrentText(value)
return
value = index
if isinstance(value, int):
widget.setCurrentIndex(value)
@@ -19,8 +19,7 @@ from qtpy.QtWidgets import (
import bec_widgets.widgets.containers.qt_ads as QtAds
from bec_widgets import BECWidget, SafeProperty, SafeSlot
from bec_widgets.applications.views.view import ViewTourSteps
from bec_widgets.utils.bec_dispatcher import BECDispatcher
from bec_widgets.cli.designer_plugins import widget_icons
from bec_widgets.utils.colors import apply_theme
from bec_widgets.utils.rpc_decorator import rpc_timeout
from bec_widgets.utils.rpc_widget_handler import widget_handler
@@ -35,25 +34,25 @@ from bec_widgets.utils.widget_state_manager import WidgetStateManager
from bec_widgets.widgets.containers.dock_area.basic_dock_area import DockAreaWidget
from bec_widgets.widgets.containers.dock_area.profile_utils import (
SETTINGS_KEYS,
default_profile_candidates,
baseline_profile_candidates,
delete_profile_files,
get_last_profile,
is_profile_read_only,
is_quick_select,
list_profiles,
list_quick_profiles,
load_default_profile_screenshot,
load_user_profile_screenshot,
load_baseline_profile_screenshot,
load_runtime_profile_screenshot,
now_iso_utc,
open_default_settings,
open_user_settings,
open_baseline_settings,
open_runtime_settings,
profile_origin,
profile_origin_display,
read_manifest,
restore_user_from_default,
restore_runtime_from_baseline,
runtime_profile_candidates,
set_last_profile,
set_quick_select,
user_profile_candidates,
write_manifest,
)
from bec_widgets.widgets.containers.dock_area.settings.dialogs import (
@@ -65,22 +64,7 @@ from bec_widgets.widgets.containers.dock_area.toolbar_components.workspace_actio
WorkspaceConnection,
workspace_bundle,
)
from bec_widgets.widgets.containers.main_window.main_window import BECMainWindowNoRPC
from bec_widgets.widgets.containers.qt_ads import CDockWidget
from bec_widgets.widgets.control.device_control.positioner_box import PositionerBox, PositionerBox2D
from bec_widgets.widgets.control.scan_control import ScanControl
from bec_widgets.widgets.editors.bec_console.bec_console import BecConsole, BECShell
from bec_widgets.widgets.plots.heatmap.heatmap import Heatmap
from bec_widgets.widgets.plots.image.image import Image
from bec_widgets.widgets.plots.motor_map.motor_map import MotorMap
from bec_widgets.widgets.plots.multi_waveform.multi_waveform import MultiWaveform
from bec_widgets.widgets.plots.scatter_waveform.scatter_waveform import ScatterWaveform
from bec_widgets.widgets.plots.waveform.waveform import Waveform
from bec_widgets.widgets.progress.ring_progress_bar.ring_progress_bar import RingProgressBar
from bec_widgets.widgets.services.bec_queue.bec_queue import BECQueue
from bec_widgets.widgets.services.bec_status_box.bec_status_box import BECStatusBox
from bec_widgets.widgets.utility.logpanel.logpanel import LogPanel
from bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button import DarkModeButton
logger = bec_logger.logger
@@ -108,6 +92,7 @@ class BECDockArea(DockAreaWidget):
"list_profiles",
"save_profile",
"load_profile",
"restore_baseline_profile",
"delete_profile",
]
@@ -143,6 +128,10 @@ class BECDockArea(DockAreaWidget):
self._mode = mode
# Toolbar
from bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button import (
DarkModeButton,
)
self.dark_mode_button = DarkModeButton(parent=self, toolbar=True)
self.dark_mode_button.setVisible(enable_profile_management)
self._setup_toolbar()
@@ -235,11 +224,8 @@ class BECDockArea(DockAreaWidget):
def _load_initial_profile(self, name: str) -> None:
"""Load the initial profile."""
self.load_profile(name)
combo = self.toolbar.components.get_action("workspace_combo").widget
combo.blockSignals(True)
if not self._empty_profile_active:
combo.setCurrentText(name)
combo.blockSignals(False)
self._set_workspace_combo_text_silent(name)
def _start_empty_workspace(self) -> None:
"""
@@ -344,39 +330,42 @@ class BECDockArea(DockAreaWidget):
self.toolbar = ModularToolBar(parent=self)
plot_actions = {
"waveform": (Waveform.ICON_NAME, "Add Waveform", "Waveform"),
"waveform": (widget_icons["Waveform"], "Add Waveform", "Waveform"),
"scatter_waveform": (
ScatterWaveform.ICON_NAME,
widget_icons["ScatterWaveform"],
"Add Scatter Waveform",
"ScatterWaveform",
),
"multi_waveform": (MultiWaveform.ICON_NAME, "Add Multi Waveform", "MultiWaveform"),
"image": (Image.ICON_NAME, "Add Image", "Image"),
"motor_map": (MotorMap.ICON_NAME, "Add Motor Map", "MotorMap"),
"heatmap": (Heatmap.ICON_NAME, "Add Heatmap", "Heatmap"),
"multi_waveform": (
widget_icons["MultiWaveform"],
"Add Multi Waveform",
"MultiWaveform",
),
"image": (widget_icons["Image"], "Add Image", "Image"),
"motor_map": (widget_icons["MotorMap"], "Add Motor Map", "MotorMap"),
"heatmap": (widget_icons["Heatmap"], "Add Heatmap", "Heatmap"),
}
device_actions = {
"scan_control": (ScanControl.ICON_NAME, "Add Scan Control", "ScanControl"),
"positioner_box": (PositionerBox.ICON_NAME, "Add Device Box", "PositionerBox"),
"scan_control": (widget_icons["ScanControl"], "Add Scan Control", "ScanControl"),
"positioner_box": (widget_icons["PositionerBox"], "Add Device Box", "PositionerBox"),
"positioner_box_2D": (
PositionerBox2D.ICON_NAME,
widget_icons["PositionerBox2D"],
"Add Device 2D Box",
"PositionerBox2D",
),
}
util_actions = {
"queue": (BECQueue.ICON_NAME, "Add Scan Queue", "BECQueue"),
"status": (BECStatusBox.ICON_NAME, "Add BEC Status Box", "BECStatusBox"),
"queue": (widget_icons["BECQueue"], "Add Scan Queue", "BECQueue"),
"status": (widget_icons["BECStatusBox"], "Add BEC Status Box", "BECStatusBox"),
"progress_bar": (
RingProgressBar.ICON_NAME,
widget_icons["RingProgressBar"],
"Add Circular ProgressBar",
"RingProgressBar",
),
"terminal": (BecConsole.ICON_NAME, "Add Terminal", "BecConsole"),
"bec_shell": (BECShell.ICON_NAME, "Add BEC Shell", "BECShell"),
"log_panel": (LogPanel.ICON_NAME, "Add LogPanel - Disabled", "LogPanel"),
"sbb_monitor": ("train", "Add SBB Monitor", "SBBMonitor"),
"log_panel": (LogPanel.ICON_NAME, "Add LogPanel", "LogPanel"),
"terminal": (widget_icons["BecConsole"], "Add Terminal", "BecConsole"),
"bec_shell": (widget_icons["BECShell"], "Add BEC Shell", "BECShell"),
"sbb_monitor": (widget_icons["SBBMonitor"], "Add SBB Monitor", "SBBMonitor"),
"log_panel": (widget_icons["LogPanel"], "Add LogPanel", "LogPanel"),
}
# Create expandable menu actions (original behavior)
@@ -591,13 +580,13 @@ class BECDockArea(DockAreaWidget):
@property
def profile_namespace(self) -> str | None:
"""Namespace used to scope user/default profile files for this dock area."""
"""Namespace used to scope runtime/baseline profile files for this dock area."""
return self._resolve_profile_namespace()
def _profile_exists(self, name: str, namespace: str | None) -> bool:
return any(
os.path.exists(path) for path in user_profile_candidates(name, namespace)
) or any(os.path.exists(path) for path in default_profile_candidates(name, namespace))
os.path.exists(path) for path in runtime_profile_candidates(name, namespace)
) or any(os.path.exists(path) for path in baseline_profile_candidates(name, namespace))
def _write_snapshot_to_settings(self, settings, save_preview: bool = True) -> None:
"""
@@ -623,35 +612,34 @@ class BECDockArea(DockAreaWidget):
name: str,
namespace: str | None,
*,
write_default: bool = True,
write_user: bool = True,
write_baseline: bool = True,
write_runtime: bool = True,
save_preview: bool = True,
) -> None:
"""
Write profile settings to default and/or user settings files.
Write profile settings to baseline and/or runtime settings files.
Args:
name: The profile name.
namespace: The profile namespace.
write_default: Whether to write to the default settings file.
write_user: Whether to write to the user settings file.
write_baseline: Whether to write to the baseline settings file.
write_runtime: Whether to write to the runtime settings file.
save_preview: Whether to save a screenshot preview.
"""
if write_default:
ds = open_default_settings(name, namespace=namespace)
self._write_snapshot_to_settings(ds, save_preview=save_preview)
if not ds.value(SETTINGS_KEYS["created_at"], ""):
ds.setValue(SETTINGS_KEYS["created_at"], now_iso_utc())
if not ds.value(SETTINGS_KEYS["is_quick_select"], None):
ds.setValue(SETTINGS_KEYS["is_quick_select"], True)
if write_user:
us = open_user_settings(name, namespace=namespace)
self._write_snapshot_to_settings(us, save_preview=save_preview)
if not us.value(SETTINGS_KEYS["created_at"], ""):
us.setValue(SETTINGS_KEYS["created_at"], now_iso_utc())
if not us.value(SETTINGS_KEYS["is_quick_select"], None):
us.setValue(SETTINGS_KEYS["is_quick_select"], True)
def _write_settings(open_settings) -> None:
settings = open_settings(name, namespace=namespace)
self._write_snapshot_to_settings(settings, save_preview=save_preview)
if not settings.value(SETTINGS_KEYS["created_at"], ""):
settings.setValue(SETTINGS_KEYS["created_at"], now_iso_utc())
if not settings.value(SETTINGS_KEYS["is_quick_select"], None):
settings.setValue(SETTINGS_KEYS["is_quick_select"], True)
if write_baseline:
_write_settings(open_baseline_settings)
if write_runtime:
_write_settings(open_runtime_settings)
def _finalize_profile_change(self, name: str, namespace: str | None) -> None:
"""
@@ -669,6 +657,14 @@ class BECDockArea(DockAreaWidget):
combo = self.toolbar.components.get_action("workspace_combo").widget
combo.refresh_profiles(active_profile=name)
def _set_workspace_combo_text_silent(self, text: str) -> None:
combo = self.toolbar.components.get_action("workspace_combo").widget
was_blocked = combo.blockSignals(True)
try:
combo.setCurrentText(text)
finally:
combo.blockSignals(was_blocked)
def _enter_empty_profile_state(self) -> None:
"""
Switch to the transient empty workspace state.
@@ -705,10 +701,10 @@ class BECDockArea(DockAreaWidget):
Save the current workspace profile.
On first save of a given name:
- writes a default copy to states/default/<name>.ini with tag=default and created_at
- writes a user copy to states/user/<name>.ini with tag=user and created_at
On subsequent saves of user-owned profiles:
- updates both the default and user copies so restore uses the latest snapshot.
- writes a baseline copy to profiles/baseline/<name>.ini with created_at
- writes a runtime copy to profiles/runtime/<name>.ini with created_at
On subsequent saves:
- updates both the baseline and runtime copies so restore uses the latest snapshot.
Read-only bundled profiles cannot be overwritten.
Args:
@@ -772,7 +768,7 @@ class BECDockArea(DockAreaWidget):
overwrite_existing = origin == "settings"
origin_before_save = profile_origin(name, namespace=namespace)
overwrite_default = overwrite_existing and origin_before_save == "settings"
overwrite_baseline = overwrite_existing and origin_before_save == "settings"
# Display saving placeholder in toolbar
workspace_combo = self.toolbar.components.get_action("workspace_combo").widget
@@ -781,12 +777,12 @@ class BECDockArea(DockAreaWidget):
workspace_combo.setCurrentIndex(0)
workspace_combo.blockSignals(False)
# Write to default and/or user settings
should_write_default = overwrite_default or not any(
os.path.exists(path) for path in default_profile_candidates(name, namespace)
# Write to baseline and/or runtime settings
should_write_baseline = overwrite_baseline or not any(
os.path.exists(path) for path in baseline_profile_candidates(name, namespace)
)
self._write_profile_settings(
name, namespace, write_default=should_write_default, write_user=True
name, namespace, write_baseline=should_write_baseline, write_runtime=True
)
set_quick_select(name, quickselect, namespace=namespace)
@@ -796,7 +792,6 @@ class BECDockArea(DockAreaWidget):
self._pending_autosave_skip = (current_profile, name)
else:
self._pending_autosave_skip = None
workspace_combo.setCurrentText(name)
self._finalize_profile_change(name, namespace)
@SafeSlot()
@@ -816,16 +811,21 @@ class BECDockArea(DockAreaWidget):
@SafeSlot()
@SafeSlot(str)
@SafeSlot(str, bool)
@rpc_timeout(None)
def load_profile(self, name: str | None = None):
def load_profile(self, name: str | None = None, restore_baseline: bool = False):
"""
Load a workspace profile.
Before switching, persist the current profile to the user copy.
Prefer loading the user copy; fall back to the default copy.
Before switching, persist the current profile to the runtime copy.
Prefer loading the runtime copy; fall back to the baseline copy. When
``restore_baseline`` is True, first overwrite the runtime copy with the
baseline profile and then load it.
Args:
name (str | None): The name of the profile to load. If None, prompts the user.
restore_baseline (bool): If True, restore the runtime copy from the
baseline before loading. Defaults to False.
"""
if name == "":
return
@@ -844,14 +844,17 @@ class BECDockArea(DockAreaWidget):
if skip_pair and skip_pair == (prev_name, name):
self._pending_autosave_skip = None
else:
us_prev = open_user_settings(prev_name, namespace=namespace)
us_prev = open_runtime_settings(prev_name, namespace=namespace)
self._write_snapshot_to_settings(us_prev, save_preview=True)
if restore_baseline:
restore_runtime_from_baseline(name, namespace=namespace)
settings = None
if any(os.path.exists(path) for path in user_profile_candidates(name, namespace)):
settings = open_user_settings(name, namespace=namespace)
elif any(os.path.exists(path) for path in default_profile_candidates(name, namespace)):
settings = open_default_settings(name, namespace=namespace)
if any(os.path.exists(path) for path in runtime_profile_candidates(name, namespace)):
settings = open_runtime_settings(name, namespace=namespace)
elif any(os.path.exists(path) for path in baseline_profile_candidates(name, namespace)):
settings = open_baseline_settings(name, namespace=namespace)
if settings is None:
logger.warning(f"Profile '{name}' not found in namespace '{namespace}'. Creating new.")
self.delete_all()
@@ -893,32 +896,36 @@ class BECDockArea(DockAreaWidget):
@SafeSlot()
@SafeSlot(str)
def restore_user_profile_from_default(self, name: str | None = None):
@SafeSlot(str, bool)
@rpc_timeout(None)
def restore_baseline_profile(self, name: str | None = None, show_dialog: bool = False):
"""
Overwrite the user copy of *name* with the default baseline.
Overwrite the runtime copy of *name* with the baseline.
If *name* is None, target the currently active profile.
Args:
name (str | None): The name of the profile to restore. If None, uses the current profile.
show_dialog (bool): If True, ask for confirmation before restoring.
"""
target = name or getattr(self, "_current_profile_name", None)
if not target:
return
namespace = self.profile_namespace
current_pixmap = None
if self.isVisible():
current_pixmap = QPixmap()
ba = bytes(self.screenshot_bytes())
current_pixmap.loadFromData(ba)
if current_pixmap is None or current_pixmap.isNull():
current_pixmap = load_user_profile_screenshot(target, namespace=namespace)
default_pixmap = load_default_profile_screenshot(target, namespace=namespace)
if show_dialog:
current_pixmap = None
if self.isVisible():
current_pixmap = QPixmap()
ba = bytes(self.screenshot_bytes())
current_pixmap.loadFromData(ba)
if current_pixmap is None or current_pixmap.isNull():
current_pixmap = load_runtime_profile_screenshot(target, namespace=namespace)
baseline_pixmap = load_baseline_profile_screenshot(target, namespace=namespace)
if not RestoreProfileDialog.confirm(self, current_pixmap, default_pixmap):
return
if not RestoreProfileDialog.confirm(self, current_pixmap, baseline_pixmap):
return
restore_user_from_default(target, namespace=namespace)
restore_runtime_from_baseline(target, namespace=namespace)
self.delete_all()
self.load_profile(target)
@@ -1053,7 +1060,7 @@ class BECDockArea(DockAreaWidget):
manage_action = self.toolbar.components.get_action("manage_workspaces").action
if self.manage_dialog is None or not self.manage_dialog.isVisible():
self.manage_widget = WorkSpaceManager(
self, target_widget=self, default_profile=self._current_profile_name
self, target_widget=self, active_profile=self._current_profile_name
)
self.manage_dialog = QDialog(modal=False)
@@ -1152,7 +1159,7 @@ class BECDockArea(DockAreaWidget):
return
namespace = self.profile_namespace
settings = open_user_settings(name, namespace=namespace)
settings = open_runtime_settings(name, namespace=namespace)
self._write_snapshot_to_settings(settings)
set_last_profile(name, namespace=namespace, instance=self._last_profile_instance_id())
self._exit_snapshot_written = True
@@ -1182,6 +1189,8 @@ class BECDockArea(DockAreaWidget):
)
step_ids.append(step_id)
from bec_widgets.applications.views.view import ViewTourSteps
return ViewTourSteps(view_title="Dock Area Workspace", step_ids=step_ids)
def cleanup(self):
@@ -1202,6 +1211,9 @@ class BECDockArea(DockAreaWidget):
if __name__ == "__main__": # pragma: no cover
import sys
from bec_widgets.utils.bec_dispatcher import BECDispatcher
from bec_widgets.widgets.containers.main_window.main_window import BECMainWindowNoRPC
app = QApplication(sys.argv)
apply_theme("dark")
dispatcher = BECDispatcher(gui_id="ads")
@@ -2,9 +2,13 @@
Utilities for managing BECDockArea profiles stored in INI files.
Policy:
- All created/modified profiles are stored under the BEC settings root: <base_path>/profiles/{default,user}
- Bundled read-only defaults are discovered in BW core states/default and plugin bec_widgets/profiles but never written to.
- Lookup order when reading: user settings default app or plugin bundled default.
- All created/modified profiles are stored under the BEC settings root:
<base_path>/profiles/{baseline,runtime}
- Bundled read-only baselines are discovered in BW core profiles and plugin
bec_widgets/profiles but never written to.
- Lookup order when reading: runtime settings baseline app or plugin bundled baseline.
- Legacy settings paths profiles/{default,user} are read through a thin segment
alias layer and copied to the canonical location on first access.
"""
from __future__ import annotations
@@ -32,6 +36,12 @@ logger = bec_logger.logger
MODULE_PATH = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
ProfileOrigin = Literal["module", "plugin", "settings", "unknown"]
ProfileSegment = Literal["baseline", "runtime"]
_PROFILE_SEGMENT_ALIASES: dict[ProfileSegment, tuple[str, str]] = {
"baseline": ("baseline", "default"),
"runtime": ("runtime", "user"),
}
def module_profiles_dir() -> str:
@@ -130,7 +140,7 @@ def _profiles_dir(segment: str, namespace: str | None) -> str:
Build (and ensure) the directory that holds profiles for a namespace segment.
Args:
segment (str): Either ``"user"`` or ``"default"``.
segment (str): Profile segment directory name.
namespace (str | None): Optional namespace label to scope profiles.
Returns:
@@ -143,157 +153,175 @@ def _profiles_dir(segment: str, namespace: str | None) -> str:
return path
def _user_path_candidates(name: str, namespace: str | None) -> list[str]:
"""
Generate candidate user-profile paths honoring namespace fallbacks.
Args:
name (str): Profile name without extension.
namespace (str | None): Optional namespace label.
Returns:
list[str]: Ordered list of candidate user profile paths (.ini files).
"""
def _candidate_namespaces(namespace: str | None) -> list[str | None]:
ns = slugify.slugify(namespace, separator="_") if namespace else None
primary = os.path.join(_profiles_dir("user", ns), f"{name}.ini")
if not ns:
return [primary]
legacy = os.path.join(_profiles_dir("user", None), f"{name}.ini")
return [primary, legacy] if legacy != primary else [primary]
return [None]
return [ns, None]
def _default_path_candidates(name: str, namespace: str | None) -> list[str]:
def _segment_profile_path(segment_name: str, name: str, namespace: str | None) -> str:
return os.path.join(_profiles_dir(segment_name, namespace), f"{name}.ini")
def _canonical_profile_path(segment: ProfileSegment, name: str, namespace: str | None) -> str:
return _segment_profile_path(_PROFILE_SEGMENT_ALIASES[segment][0], name, namespace)
def _segment_path_candidates(
segment: ProfileSegment,
name: str,
namespace: str | None,
*,
include_legacy: bool = True,
migrate_legacy: bool = True,
) -> list[str]:
"""
Generate candidate default-profile paths honoring namespace fallbacks.
Generate profile candidates for a canonical segment.
Args:
name (str): Profile name without extension.
namespace (str | None): Optional namespace label.
Returns:
list[str]: Ordered list of candidate default profile paths (.ini files).
Canonical baseline/runtime files are always preferred. Namespace fallback
files and legacy default/user files are copied to the primary canonical path
when the primary file does not exist.
"""
ns = slugify.slugify(namespace, separator="_") if namespace else None
primary = os.path.join(_profiles_dir("default", ns), f"{name}.ini")
if not ns:
return [primary]
legacy = os.path.join(_profiles_dir("default", None), f"{name}.ini")
return [primary, legacy] if legacy != primary else [primary]
canonical = [
_segment_profile_path(_PROFILE_SEGMENT_ALIASES[segment][0], name, ns)
for ns in _candidate_namespaces(namespace)
]
legacy = []
if include_legacy:
legacy = [
_segment_profile_path(_PROFILE_SEGMENT_ALIASES[segment][1], name, ns)
for ns in _candidate_namespaces(namespace)
]
primary_canonical = canonical[0]
if migrate_legacy and not os.path.exists(primary_canonical):
canonical_src = next((path for path in canonical[1:] if os.path.exists(path)), None)
if canonical_src:
os.makedirs(os.path.dirname(primary_canonical), exist_ok=True)
shutil.copy2(canonical_src, primary_canonical)
elif include_legacy:
legacy_src = next((path for path in legacy if os.path.exists(path)), None)
if legacy_src:
os.makedirs(os.path.dirname(primary_canonical), exist_ok=True)
shutil.copy2(legacy_src, primary_canonical)
return list(dict.fromkeys(canonical + legacy))
def default_profiles_dir(namespace: str | None = None) -> str:
def baseline_profiles_dir(namespace: str | None = None) -> str:
"""
Return the directory that stores default profiles for the namespace.
Return the directory that stores baseline profiles for the namespace.
Args:
namespace (str | None, optional): Namespace label. Defaults to ``None``.
Returns:
str: Absolute path to the default profile directory.
str: Absolute path to the baseline profile directory.
"""
return _profiles_dir("default", namespace)
return _profiles_dir("baseline", namespace)
def user_profiles_dir(namespace: str | None = None) -> str:
def runtime_profiles_dir(namespace: str | None = None) -> str:
"""
Return the directory that stores user profiles for the namespace.
Return the directory that stores runtime profiles for the namespace.
Args:
namespace (str | None, optional): Namespace label. Defaults to ``None``.
Returns:
str: Absolute path to the user profile directory.
str: Absolute path to the runtime profile directory.
"""
return _profiles_dir("user", namespace)
return _profiles_dir("runtime", namespace)
def default_profile_path(name: str, namespace: str | None = None) -> str:
def baseline_profile_path(name: str, namespace: str | None = None) -> str:
"""
Compute the canonical default profile path for a profile name.
Compute the canonical baseline profile path for a profile name.
Args:
name (str): Profile name without extension.
namespace (str | None, optional): Namespace label. Defaults to ``None``.
Returns:
str: Absolute path to the default profile file (.ini).
str: Absolute path to the baseline profile file (.ini).
"""
return _default_path_candidates(name, namespace)[0]
return _canonical_profile_path("baseline", name, namespace)
def user_profile_path(name: str, namespace: str | None = None) -> str:
def runtime_profile_path(name: str, namespace: str | None = None) -> str:
"""
Compute the canonical user profile path for a profile name.
Compute the canonical runtime profile path for a profile name.
Args:
name (str): Profile name without extension.
namespace (str | None, optional): Namespace label. Defaults to ``None``.
Returns:
str: Absolute path to the user profile file (.ini).
str: Absolute path to the runtime profile file (.ini).
"""
return _user_path_candidates(name, namespace)[0]
return _canonical_profile_path("runtime", name, namespace)
def user_profile_candidates(name: str, namespace: str | None = None) -> list[str]:
def runtime_profile_candidates(name: str, namespace: str | None = None) -> list[str]:
"""
List all user profile path candidates for a profile name.
List all runtime profile path candidates for a profile name.
Args:
name (str): Profile name without extension.
namespace (str | None, optional): Namespace label. Defaults to ``None``.
Returns:
list[str]: De-duplicated list of candidate user profile paths.
list[str]: De-duplicated list of candidate runtime profile paths.
"""
return list(dict.fromkeys(_user_path_candidates(name, namespace)))
return _segment_path_candidates("runtime", name, namespace)
def default_profile_candidates(name: str, namespace: str | None = None) -> list[str]:
def baseline_profile_candidates(name: str, namespace: str | None = None) -> list[str]:
"""
List all default profile path candidates for a profile name.
List all baseline profile path candidates for a profile name.
Args:
name (str): Profile name without extension.
namespace (str | None, optional): Namespace label. Defaults to ``None``.
Returns:
list[str]: De-duplicated list of candidate default profile paths.
list[str]: De-duplicated list of candidate baseline profile paths.
"""
return list(dict.fromkeys(_default_path_candidates(name, namespace)))
return _segment_path_candidates("baseline", name, namespace)
def _existing_user_settings(name: str, namespace: str | None = None) -> QSettings | None:
def _existing_runtime_settings(name: str, namespace: str | None = None) -> QSettings | None:
"""
Resolve the first existing user profile settings object.
Resolve the first existing runtime profile settings object.
Args:
name (str): Profile name without extension.
namespace (str | None, optional): Namespace label to search. Defaults to ``None``.
Returns:
QSettings | None: Config for the first existing user profile candidate, or ``None``
QSettings | None: Config for the first existing runtime profile candidate, or ``None``
when no files are present.
"""
for path in user_profile_candidates(name, namespace):
for path in runtime_profile_candidates(name, namespace):
if os.path.exists(path):
return QSettings(path, QSettings.IniFormat)
return None
def _existing_default_settings(name: str, namespace: str | None = None) -> QSettings | None:
def _existing_baseline_settings(name: str, namespace: str | None = None) -> QSettings | None:
"""
Resolve the first existing default profile settings object.
Resolve the first existing baseline profile settings object.
Args:
name (str): Profile name without extension.
namespace (str | None, optional): Namespace label to search. Defaults to ``None``.
Returns:
QSettings | None: Config for the first existing default profile candidate, or ``None``
QSettings | None: Config for the first existing baseline profile candidate, or ``None``
when no files are present.
"""
for path in default_profile_candidates(name, namespace):
for path in baseline_profile_candidates(name, namespace):
if os.path.exists(path):
return QSettings(path, QSettings.IniFormat)
return None
@@ -347,7 +375,7 @@ def profile_origin(name: str, namespace: str | None = None) -> ProfileOrigin:
plugin_path = plugin_profile_path(name)
if plugin_path and os.path.exists(plugin_path):
return "plugin"
for path in user_profile_candidates(name, namespace) + default_profile_candidates(
for path in runtime_profile_candidates(name, namespace) + baseline_profile_candidates(
name, namespace
):
if os.path.exists(path):
@@ -406,8 +434,8 @@ def delete_profile_files(name: str, namespace: str | None = None) -> bool:
read_only = is_profile_read_only(name, namespace)
removed = False
# Always allow removing user copies; keep default copies for read-only origins.
for path in set(user_profile_candidates(name, namespace)):
# Always allow removing runtime copies; keep baseline copies for read-only origins.
for path in set(runtime_profile_candidates(name, namespace)):
try:
os.remove(path)
removed = True
@@ -415,7 +443,7 @@ def delete_profile_files(name: str, namespace: str | None = None) -> bool:
continue
if not read_only:
for path in set(default_profile_candidates(name, namespace)):
for path in set(baseline_profile_candidates(name, namespace)):
try:
os.remove(path)
removed = True
@@ -443,7 +471,7 @@ SETTINGS_KEYS = {
def list_profiles(namespace: str | None = None) -> list[str]:
"""
Enumerate all known profile names, syncing bundled defaults when missing locally.
Enumerate all known profile names, syncing bundled baselines when missing locally.
Args:
namespace (str | None, optional): Namespace label scoped to the profile set.
@@ -459,16 +487,27 @@ def list_profiles(namespace: str | None = None) -> list[str]:
return set()
return {os.path.splitext(f)[0] for f in os.listdir(directory) if f.endswith(".ini")}
settings_dirs = {default_profiles_dir(namespace), user_profiles_dir(namespace)}
settings_dirs = {baseline_profiles_dir(namespace), runtime_profiles_dir(namespace)}
if ns:
settings_dirs.add(default_profiles_dir(None))
settings_dirs.add(user_profiles_dir(None))
settings_dirs.add(baseline_profiles_dir(None))
settings_dirs.add(runtime_profiles_dir(None))
for segment in ("baseline", "runtime"):
for legacy_dir in [
_profiles_dir(_PROFILE_SEGMENT_ALIASES[segment][1], item)
for item in _candidate_namespaces(namespace)
]:
settings_dirs.add(legacy_dir)
settings_names: set[str] = set()
for directory in settings_dirs:
settings_names |= _collect_from(directory)
# Also consider read-only defaults from core module and beamline plugin repositories
for name in sorted(settings_names):
runtime_profile_candidates(name, namespace)
baseline_profile_candidates(name, namespace)
# Also consider read-only baselines from core module and beamline plugin repositories
read_only_sources: dict[str, tuple[str, str]] = {}
sources: list[tuple[str, str | None]] = [
("module", module_profiles_dir()),
@@ -484,17 +523,17 @@ def list_profiles(namespace: str | None = None) -> list[str]:
read_only_sources.setdefault(name, (origin, os.path.join(directory, filename)))
for name, (_origin, src) in sorted(read_only_sources.items()):
# Ensure a copy in the namespace-specific settings default directory
dst_default = default_profile_path(name, namespace)
if not os.path.exists(dst_default):
os.makedirs(os.path.dirname(dst_default), exist_ok=True)
shutil.copyfile(src, dst_default)
# Ensure a user copy exists to allow edits in the writable settings area
dst_user = user_profile_path(name, namespace)
if not os.path.exists(dst_user):
os.makedirs(os.path.dirname(dst_user), exist_ok=True)
shutil.copyfile(src, dst_user)
s = open_user_settings(name, namespace)
# Ensure a copy in the namespace-specific settings baseline directory.
dst_baseline = baseline_profile_path(name, namespace)
if not os.path.exists(dst_baseline):
os.makedirs(os.path.dirname(dst_baseline), exist_ok=True)
shutil.copy2(src, dst_baseline)
# Ensure a runtime copy exists to allow edits in the writable settings area.
dst_runtime = runtime_profile_path(name, namespace)
if not os.path.exists(dst_runtime):
os.makedirs(os.path.dirname(dst_runtime), exist_ok=True)
shutil.copy2(src, dst_runtime)
s = open_runtime_settings(name, namespace)
if s.value(SETTINGS_KEYS["created_at"], "") == "":
s.setValue(SETTINGS_KEYS["created_at"], now_iso_utc())
@@ -504,32 +543,34 @@ def list_profiles(namespace: str | None = None) -> list[str]:
return sorted(settings_names)
def open_default_settings(name: str, namespace: str | None = None) -> QSettings:
def open_baseline_settings(name: str, namespace: str | None = None) -> QSettings:
"""
Open (and create if necessary) the default profile settings file.
Open (and create if necessary) the baseline profile settings file.
Args:
name (str): Profile name without extension.
namespace (str | None, optional): Namespace label. Defaults to ``None``.
Returns:
QSettings: Settings instance targeting the default profile file.
QSettings: Settings instance targeting the baseline profile file.
"""
return QSettings(default_profile_path(name, namespace), QSettings.IniFormat)
baseline_profile_candidates(name, namespace)
return QSettings(baseline_profile_path(name, namespace), QSettings.IniFormat)
def open_user_settings(name: str, namespace: str | None = None) -> QSettings:
def open_runtime_settings(name: str, namespace: str | None = None) -> QSettings:
"""
Open (and create if necessary) the user profile settings file.
Open (and create if necessary) the runtime profile settings file.
Args:
name (str): Profile name without extension.
namespace (str | None, optional): Namespace label. Defaults to ``None``.
Returns:
QSettings: Settings instance targeting the user profile file.
QSettings: Settings instance targeting the runtime profile file.
"""
return QSettings(user_profile_path(name, namespace), QSettings.IniFormat)
runtime_profile_candidates(name, namespace)
return QSettings(runtime_profile_path(name, namespace), QSettings.IniFormat)
def _app_settings() -> QSettings:
@@ -759,26 +800,26 @@ def read_manifest(settings: QSettings) -> list[dict]:
return items
def restore_user_from_default(name: str, namespace: str | None = None) -> None:
def restore_runtime_from_baseline(name: str, namespace: str | None = None) -> None:
"""
Copy the default profile to the user profile, preserving quick-select flag.
Copy the baseline profile to the runtime profile, preserving quick-select flag.
Args:
name(str): Profile name without extension.
namespace(str | None, optional): Namespace label. Defaults to ``None``.
"""
src = None
for candidate in default_profile_candidates(name, namespace):
for candidate in baseline_profile_candidates(name, namespace):
if os.path.exists(candidate):
src = candidate
break
if not src:
return
dst = user_profile_path(name, namespace)
dst = runtime_profile_path(name, namespace)
preserve_quick_select = is_quick_select(name, namespace)
os.makedirs(os.path.dirname(dst), exist_ok=True)
shutil.copyfile(src, dst)
s = open_user_settings(name, namespace)
s = open_runtime_settings(name, namespace)
if not s.value(SETTINGS_KEYS["created_at"], ""):
s.setValue(SETTINGS_KEYS["created_at"], now_iso_utc())
if preserve_quick_select:
@@ -796,9 +837,9 @@ def is_quick_select(name: str, namespace: str | None = None) -> bool:
Returns:
bool: True if quick-select is enabled for the profile.
"""
s = _existing_user_settings(name, namespace)
s = _existing_runtime_settings(name, namespace)
if s is None:
s = _existing_default_settings(name, namespace)
s = _existing_baseline_settings(name, namespace)
if s is None:
return False
return s.value(SETTINGS_KEYS["is_quick_select"], False, type=bool)
@@ -813,13 +854,13 @@ def set_quick_select(name: str, enabled: bool, namespace: str | None = None) ->
enabled(bool): True to enable quick-select, False to disable.
namespace(str | None, optional): Namespace label. Defaults to ``None``.
"""
s = open_user_settings(name, namespace)
s = open_runtime_settings(name, namespace)
s.setValue(SETTINGS_KEYS["is_quick_select"], bool(enabled))
def list_quick_profiles(namespace: str | None = None) -> list[str]:
"""
List only profiles that have quick-select enabled (user wins over default).
List only profiles that have quick-select enabled (runtime wins over baseline).
Args:
namespace(str | None, optional): Namespace label. Defaults to ``None``.
@@ -909,8 +950,8 @@ class ProfileInfo(BaseModel):
is_quick_select: bool = False
widget_count: int = 0
size_kb: int = 0
user_path: str = ""
default_path: str = ""
runtime_path: str = ""
baseline_path: str = ""
origin: ProfileOrigin = "unknown"
is_read_only: bool = False
@@ -924,19 +965,19 @@ def get_profile_info(name: str, namespace: str | None = None) -> ProfileInfo:
namespace (str | None, optional): Namespace label. Defaults to ``None``.
Returns:
ProfileInfo: Structured profile metadata, preferring the user copy when present.
ProfileInfo: Structured profile metadata, preferring the runtime copy when present.
"""
user_paths = user_profile_candidates(name, namespace)
default_paths = default_profile_candidates(name, namespace)
u_path = next((p for p in user_paths if os.path.exists(p)), user_paths[0])
d_path = next((p for p in default_paths if os.path.exists(p)), default_paths[0])
runtime_paths = runtime_profile_candidates(name, namespace)
baseline_paths = baseline_profile_candidates(name, namespace)
r_path = next((p for p in runtime_paths if os.path.exists(p)), runtime_paths[0])
b_path = next((p for p in baseline_paths if os.path.exists(p)), baseline_paths[0])
origin = profile_origin(name, namespace)
read_only = origin in {"module", "plugin"}
prefer_user = os.path.exists(u_path)
if prefer_user:
s = QSettings(u_path, QSettings.IniFormat)
elif os.path.exists(d_path):
s = QSettings(d_path, QSettings.IniFormat)
prefer_runtime = os.path.exists(r_path)
if prefer_runtime:
s = QSettings(r_path, QSettings.IniFormat)
elif os.path.exists(b_path):
s = QSettings(b_path, QSettings.IniFormat)
else:
s = None
if s is None:
@@ -957,14 +998,14 @@ def get_profile_info(name: str, namespace: str | None = None) -> ProfileInfo:
is_quick_select=False,
widget_count=0,
size_kb=0,
user_path=u_path,
default_path=d_path,
runtime_path=r_path,
baseline_path=b_path,
origin=origin,
is_read_only=read_only,
)
created = s.value(SETTINGS_KEYS["created_at"], "", type=str) or now_iso_utc()
src_path = u_path if prefer_user else d_path
src_path = r_path if prefer_runtime else b_path
modified = _file_modified_iso(src_path)
count = _manifest_count(s)
try:
@@ -990,8 +1031,8 @@ def get_profile_info(name: str, namespace: str | None = None) -> ProfileInfo:
is_quick_select=is_quick_select(name, namespace),
widget_count=count,
size_kb=size_kb,
user_path=u_path,
default_path=d_path,
runtime_path=r_path,
baseline_path=b_path,
origin=origin,
is_read_only=read_only,
)
@@ -999,7 +1040,7 @@ def get_profile_info(name: str, namespace: str | None = None) -> ProfileInfo:
def load_profile_screenshot(name: str, namespace: str | None = None) -> QPixmap | None:
"""
Load the stored screenshot pixmap for a profile from settings (user preferred).
Load the stored screenshot pixmap for a profile from settings (runtime preferred).
Args:
name (str): Profile name without extension.
@@ -1008,17 +1049,17 @@ def load_profile_screenshot(name: str, namespace: str | None = None) -> QPixmap
Returns:
QPixmap | None: Screenshot pixmap or ``None`` if unavailable.
"""
s = _existing_user_settings(name, namespace)
s = _existing_runtime_settings(name, namespace)
if s is None:
s = _existing_default_settings(name, namespace)
s = _existing_baseline_settings(name, namespace)
if s is None:
return None
return _load_screenshot_from_settings(s)
def load_default_profile_screenshot(name: str, namespace: str | None = None) -> QPixmap | None:
def load_baseline_profile_screenshot(name: str, namespace: str | None = None) -> QPixmap | None:
"""
Load the screenshot from the default profile copy, if available.
Load the screenshot from the baseline profile copy, if available.
Args:
name (str): Profile name without extension.
@@ -1027,15 +1068,15 @@ def load_default_profile_screenshot(name: str, namespace: str | None = None) ->
Returns:
QPixmap | None: Screenshot pixmap or ``None`` if unavailable.
"""
s = _existing_default_settings(name, namespace)
s = _existing_baseline_settings(name, namespace)
if s is None:
return None
return _load_screenshot_from_settings(s)
def load_user_profile_screenshot(name: str, namespace: str | None = None) -> QPixmap | None:
def load_runtime_profile_screenshot(name: str, namespace: str | None = None) -> QPixmap | None:
"""
Load the screenshot from the user profile copy, if available.
Load the screenshot from the runtime profile copy, if available.
Args:
name (str): Profile name without extension.
@@ -1044,7 +1085,7 @@ def load_user_profile_screenshot(name: str, namespace: str | None = None) -> QPi
Returns:
QPixmap | None: Screenshot pixmap or ``None`` if unavailable.
"""
s = _existing_user_settings(name, namespace)
s = _existing_runtime_settings(name, namespace)
if s is None:
return None
return _load_screenshot_from_settings(s)
@@ -160,7 +160,7 @@ class SaveProfileDialog(QDialog):
self,
"Read-only profile",
(
f"'{name}' is a default profile provided by {provider} and cannot be overwritten.\n"
f"'{name}' is a baseline profile provided by {provider} and cannot be overwritten.\n"
"Please choose a different name."
),
)
@@ -179,7 +179,7 @@ class SaveProfileDialog(QDialog):
"Overwrite profile",
(
f"A profile named '{name}' already exists.\n\n"
"Overwriting will update both the saved profile and its restore default.\n"
"Overwriting will update both the runtime profile and its restore baseline.\n"
"Do you want to continue?"
),
QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No,
@@ -257,21 +257,24 @@ class PreviewPanel(QGroupBox):
class RestoreProfileDialog(QDialog):
"""
Confirmation dialog that previews the current profile screenshot against the default baseline.
Confirmation dialog that previews the current runtime screenshot against the baseline.
"""
def __init__(
self, parent: QWidget | None, current_pixmap: QPixmap | None, default_pixmap: QPixmap | None
self,
parent: QWidget | None,
current_pixmap: QPixmap | None,
baseline_pixmap: QPixmap | None,
):
super().__init__(parent)
self.setWindowTitle("Restore Profile to Default")
self.setWindowTitle("Restore Profile to Baseline")
self.setModal(True)
self.resize(880, 480)
layout = QVBoxLayout(self)
info_label = QLabel(
"Restoring will discard your custom layout and replace it with the default profile."
"Restoring will discard your runtime layout and replace it with the baseline profile."
)
info_label.setWordWrap(True)
layout.addWidget(info_label)
@@ -280,7 +283,7 @@ class RestoreProfileDialog(QDialog):
layout.addLayout(preview_row)
current_preview = PreviewPanel("Current", current_pixmap, self)
default_preview = PreviewPanel("Default", default_pixmap, self)
baseline_preview = PreviewPanel("Baseline", baseline_pixmap, self)
# Equal expansion left/right
preview_row.addWidget(current_preview, 1)
@@ -292,7 +295,7 @@ class RestoreProfileDialog(QDialog):
arrow_label.setSizePolicy(QSizePolicy.Fixed, QSizePolicy.Expanding)
preview_row.addWidget(arrow_label)
preview_row.addWidget(default_preview, 1)
preview_row.addWidget(baseline_preview, 1)
# Enforce equal stretch for both previews
preview_row.setStretch(0, 1)
@@ -300,7 +303,7 @@ class RestoreProfileDialog(QDialog):
preview_row.setStretch(2, 1)
warn_label = QLabel(
"This action cannot be undone. Do you want to restore the default layout now?"
"This action cannot be undone. Do you want to restore the baseline layout now?"
)
warn_label.setWordWrap(True)
layout.addWidget(warn_label)
@@ -324,7 +327,7 @@ class RestoreProfileDialog(QDialog):
@staticmethod
def confirm(
parent: QWidget | None, current_pixmap: QPixmap | None, default_pixmap: QPixmap | None
parent: QWidget | None, current_pixmap: QPixmap | None, baseline_pixmap: QPixmap | None
) -> bool:
dialog = RestoreProfileDialog(parent, current_pixmap, default_pixmap)
dialog = RestoreProfileDialog(parent, current_pixmap, baseline_pixmap)
return dialog.exec() == QDialog.Accepted
@@ -48,7 +48,7 @@ class WorkSpaceManager(BECWidget, QWidget):
HEADERS = ["Actions", "Profile", "Author"]
def __init__(
self, parent=None, target_widget=None, default_profile: str | None = None, **kwargs
self, parent=None, target_widget=None, active_profile: str | None = None, **kwargs
):
super().__init__(parent=parent, **kwargs)
self.target_widget = target_widget
@@ -59,13 +59,13 @@ class WorkSpaceManager(BECWidget, QWidget):
self._init_ui()
if self.target_widget is not None and hasattr(self.target_widget, "profile_changed"):
self.target_widget.profile_changed.connect(self.on_profile_changed)
if default_profile is not None:
self._select_by_name(default_profile)
self._show_profile_details(default_profile)
if active_profile is not None:
self._select_by_name(active_profile)
self._show_profile_details(active_profile)
def _init_ui(self):
self.root_layout = QHBoxLayout(self)
self.splitter = QSplitter(Qt.Horizontal, self)
self.splitter = QSplitter(Qt.Orientation.Horizontal, self)
self.root_layout.addWidget(self.splitter)
# Init components
@@ -89,7 +89,9 @@ class WorkSpaceManager(BECWidget, QWidget):
left_panel.setMinimumWidth(220)
# Make the screenshot preview expand to fill remaining space
self.screenshot_label.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
self.screenshot_label.setSizePolicy(
QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding
)
self.right_box = QGroupBox("Profile Screenshot Preview", self)
right_col = QVBoxLayout(self.right_box)
@@ -250,8 +252,8 @@ class WorkSpaceManager(BECWidget, QWidget):
("Quick select", "Yes" if info.is_quick_select else "No"),
("Widgets", str(info.widget_count)),
("Size (KB)", str(info.size_kb)),
("User path", info.user_path or ""),
("Default path", info.default_path or ""),
("Runtime path", info.runtime_path or ""),
("Baseline path", info.baseline_path or ""),
]
for k, v in entries:
self.profile_details_tree.addTopLevelItem(QTreeWidgetItem([k, v]))
@@ -24,19 +24,9 @@ class ProfileComboBox(QComboBox):
def set_quick_profile_provider(self, provider: Callable[[], list[str]]) -> None:
self._quick_provider = provider
def refresh_profiles(
self, active_profile: str | None = None, show_empty_profile: bool = False
def _refresh_profiles(
self, current_text: str, active_profile: str | None = None, show_empty_profile: bool = False
) -> None:
"""
Refresh the profile list and ensure the active profile is visible.
Args:
active_profile(str | None): The currently active profile name.
show_empty_profile(bool): If True, show an explicit empty unsaved workspace entry.
"""
current_text = active_profile or self.currentText()
self.blockSignals(True)
self.clear()
quick_profiles = self._quick_provider()
@@ -103,7 +93,6 @@ class ProfileComboBox(QComboBox):
if index >= 0:
self.setCurrentIndex(index)
self.blockSignals(False)
if active_profile and self.currentText() != active_profile:
idx = self.findText(active_profile)
if idx >= 0:
@@ -115,6 +104,24 @@ class ProfileComboBox(QComboBox):
else:
self.setToolTip("")
def refresh_profiles(
self, active_profile: str | None = None, show_empty_profile: bool = False
) -> None:
"""
Refresh the profile list and ensure the active profile is visible.
Args:
active_profile(str | None): The currently active profile name.
show_empty_profile(bool): If True, show an explicit empty unsaved workspace entry.
"""
current_text = active_profile or self.currentText()
was_blocked = self.blockSignals(True)
try:
self._refresh_profiles(current_text, active_profile, show_empty_profile)
finally:
self.blockSignals(was_blocked)
def workspace_bundle(components: ToolbarComponents, enable_tools: bool = True) -> ToolbarBundle:
"""
@@ -122,6 +129,7 @@ def workspace_bundle(components: ToolbarComponents, enable_tools: bool = True) -
Args:
components (ToolbarComponents): The components to be added to the bundle.
enable_tools(bool): If True, show the workspace management tools; otherwise, only show the profile combo.
Returns:
ToolbarBundle: The workspace toolbar bundle.
@@ -143,15 +151,15 @@ def workspace_bundle(components: ToolbarComponents, enable_tools: bool = True) -
components.get_action("save_workspace").action.setVisible(enable_tools)
components.add_safe(
"reset_default_workspace",
"reset_baseline_workspace",
MaterialIconAction(
icon_name="undo",
tooltip="Refresh Current Workspace",
tooltip="Restore Baseline Profile",
checkable=False,
parent=components.toolbar,
),
)
components.get_action("reset_default_workspace").action.setVisible(enable_tools)
components.get_action("reset_baseline_workspace").action.setVisible(enable_tools)
components.add_safe(
"manage_workspaces",
@@ -164,7 +172,7 @@ def workspace_bundle(components: ToolbarComponents, enable_tools: bool = True) -
bundle = ToolbarBundle("workspace", components)
bundle.add_action("workspace_combo")
bundle.add_action("save_workspace")
bundle.add_action("reset_default_workspace")
bundle.add_action("reset_baseline_workspace")
bundle.add_action("manage_workspaces")
return bundle
@@ -194,9 +202,9 @@ class WorkspaceConnection(BundleConnection):
self.target_widget.load_profile
)
reset_action = self.components.get_action("reset_default_workspace").action
reset_action = self.components.get_action("reset_baseline_workspace").action
if reset_action.isVisible():
reset_action.triggered.connect(self._reset_workspace_to_default)
reset_action.triggered.connect(self._reset_workspace_to_baseline)
manage_action = self.components.get_action("manage_workspaces").action
if manage_action.isVisible():
@@ -213,9 +221,9 @@ class WorkspaceConnection(BundleConnection):
self.target_widget.load_profile
)
reset_action = self.components.get_action("reset_default_workspace").action
reset_action = self.components.get_action("reset_baseline_workspace").action
if reset_action.isVisible():
reset_action.triggered.disconnect(self._reset_workspace_to_default)
reset_action.triggered.disconnect(self._reset_workspace_to_baseline)
manage_action = self.components.get_action("manage_workspaces").action
if manage_action.isVisible():
@@ -223,8 +231,8 @@ class WorkspaceConnection(BundleConnection):
self._connected = False
@SafeSlot()
def _reset_workspace_to_default(self):
def _reset_workspace_to_baseline(self):
"""
Refreshes the current workspace.
"""
self.target_widget.restore_user_profile_from_default()
self.target_widget.restore_baseline_profile(show_dialog=True)
@@ -4,11 +4,7 @@ import webbrowser
class BECWebLinksMixin:
@staticmethod
def open_bec_docs():
webbrowser.open("https://beamline-experiment-control.readthedocs.io/en/latest/")
@staticmethod
def open_bec_widgets_docs():
webbrowser.open("https://bec.readthedocs.io/projects/bec-widgets/en/latest/")
webbrowser.open("https://bec-project.github.io/bec_docs/")
@staticmethod
def open_bec_bug_report():
@@ -355,17 +355,13 @@ class BECMainWindow(BECWidget, QMainWindow):
bec_docs = QAction("BEC Docs", self)
bec_docs.setIcon(help_icon)
widgets_docs = QAction("BEC Widgets Docs", self)
widgets_docs.setIcon(help_icon)
bug_report = QAction("Bug Report", self)
bug_report.setIcon(bug_icon)
bec_docs.triggered.connect(BECWebLinksMixin.open_bec_docs)
widgets_docs.triggered.connect(BECWebLinksMixin.open_bec_widgets_docs)
bug_report.triggered.connect(BECWebLinksMixin.open_bec_bug_report)
help_menu.addAction(bec_docs)
help_menu.addAction(widgets_docs)
help_menu.addAction(bug_report)
################################################################################
@@ -21,9 +21,9 @@ from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.widgets.control.device_control.position_indicator.position_indicator import (
PositionIndicator,
)
from bec_widgets.widgets.control.device_input.base_classes.device_input_base import BECDeviceFilter
from bec_widgets.widgets.control.device_input.device_line_edit.device_line_edit import (
DeviceLineEdit,
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import (
BECDeviceFilter,
DeviceComboBox,
)
from bec_widgets.widgets.utility.spinner.spinner import SpinnerWidget
@@ -257,10 +257,10 @@ class PositionerBoxBase(BECWidget, QWidget):
self._dialog = QDialog(self)
self._dialog.setWindowTitle("Positioner Selection")
layout = QVBoxLayout()
line_edit = DeviceLineEdit(
line_edit = DeviceComboBox(
self, client=self.client, device_filter=[BECDeviceFilter.POSITIONER]
)
line_edit.textChanged.connect(set_positioner)
line_edit.currentTextChanged.connect(set_positioner)
layout.addWidget(line_edit)
close_button = QPushButton("Close")
close_button.clicked.connect(self._dialog.accept)
@@ -1,458 +0,0 @@
from __future__ import annotations
import enum
from bec_lib.device import ComputedSignal, Device, Positioner, ReadoutPriority
from bec_lib.device import Signal as BECSignal
from bec_lib.logger import bec_logger
from pydantic import field_validator
from bec_widgets.utils.bec_connector import ConnectionConfig
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
from bec_widgets.utils.filter_io import FilterIO
from bec_widgets.utils.widget_io import WidgetIO
logger = bec_logger.logger
class BECDeviceFilter(enum.Enum):
"""Filter for the device classes."""
DEVICE = "Device"
POSITIONER = "Positioner"
SIGNAL = "Signal"
COMPUTED_SIGNAL = "ComputedSignal"
class DeviceInputConfig(ConnectionConfig):
device_filter: list[str] = []
readout_filter: list[str] = []
devices: list[str] = []
default: str | None = None
arg_name: str | None = None
apply_filter: bool = True
signal_class_filter: list[str] = []
@field_validator("device_filter")
@classmethod
def check_device_filter(cls, v, values):
valid_device_filters = [entry.value for entry in BECDeviceFilter]
for filt in v:
if filt not in valid_device_filters:
raise ValueError(
f"Device filter {filt} is not a valid device filter {valid_device_filters}."
)
return v
@field_validator("readout_filter")
@classmethod
def check_readout_filter(cls, v, values):
valid_device_filters = [entry.value for entry in ReadoutPriority]
for filt in v:
if filt not in valid_device_filters:
raise ValueError(
f"Device filter {filt} is not a valid device filter {valid_device_filters}."
)
return v
class DeviceInputBase(BECWidget):
"""
Mixin base class for device input widgets.
It allows to filter devices from BEC based on
device class and readout priority.
"""
_device_handler = {
BECDeviceFilter.DEVICE: Device,
BECDeviceFilter.POSITIONER: Positioner,
BECDeviceFilter.SIGNAL: BECSignal,
BECDeviceFilter.COMPUTED_SIGNAL: ComputedSignal,
}
_filter_handler = {
BECDeviceFilter.DEVICE: "filter_to_device",
BECDeviceFilter.POSITIONER: "filter_to_positioner",
BECDeviceFilter.SIGNAL: "filter_to_signal",
BECDeviceFilter.COMPUTED_SIGNAL: "filter_to_computed_signal",
ReadoutPriority.MONITORED: "readout_monitored",
ReadoutPriority.BASELINE: "readout_baseline",
ReadoutPriority.ASYNC: "readout_async",
ReadoutPriority.CONTINUOUS: "readout_continuous",
ReadoutPriority.ON_REQUEST: "readout_on_request",
}
def __init__(self, parent=None, client=None, config=None, gui_id: str | None = None, **kwargs):
if config is None:
config = DeviceInputConfig(widget_class=self.__class__.__name__)
else:
if isinstance(config, dict):
config = DeviceInputConfig(**config)
self.config = config
super().__init__(
parent=parent, client=client, config=config, gui_id=gui_id, theme_update=True, **kwargs
)
self.get_bec_shortcuts()
self._device_filter = []
self._readout_filter = []
self._devices = []
### QtSlots ###
@SafeSlot(str)
def set_device(self, device: str):
"""
Set the device.
Args:
device (str): Default name.
"""
if self.validate_device(device) is True:
WidgetIO.set_value(widget=self, value=device)
self.config.default = device
else:
logger.warning(
f"Device {device} is not in the filtered selection of {self}: {self.devices}."
)
@SafeSlot()
def update_devices_from_filters(self):
"""Update the devices based on the current filter selection
in self.device_filter and self.readout_filter. If apply_filter is False,
it will not apply the filters, store the filter settings and return.
"""
current_device = WidgetIO.get_value(widget=self, as_string=True)
self.config.device_filter = self.device_filter
self.config.readout_filter = self.readout_filter
self.config.signal_class_filter = self.signal_class_filter
if self.apply_filter is False:
return
all_dev = self.dev.enabled_devices
devs = self._filter_devices_by_signal_class(all_dev)
# Filter based on device class
devs = [dev for dev in devs if self._check_device_filter(dev)]
# Filter based on readout priority
devs = [dev for dev in devs if self._check_readout_filter(dev)]
self.devices = [device.name for device in devs]
if current_device != "":
self.set_device(current_device)
@SafeSlot(list)
def set_available_devices(self, devices: list[str]):
"""
Set the devices. If a device in the list is not valid, it will not be considered.
Args:
devices (list[str]): List of devices.
"""
self.apply_filter = False
self.devices = devices
### QtProperties ###
@SafeProperty(
"QStringList",
doc="List of devices. If updated, it will disable the apply filters property.",
)
def devices(self) -> list[str]:
"""
Get the list of devices for the applied filters.
Returns:
list[str]: List of devices.
"""
return self._devices
@devices.setter
def devices(self, value: list):
self._devices = value
self.config.devices = value
FilterIO.set_selection(widget=self, selection=value)
@SafeProperty(str)
def default(self):
"""Get the default device name. If set through this property, it will update only if the device is within the filtered selection."""
return self.config.default
@default.setter
def default(self, value: str):
if self.validate_device(value) is False:
return
self.config.default = value
WidgetIO.set_value(widget=self, value=value)
@SafeProperty(bool)
def apply_filter(self):
"""Apply the filters on the devices."""
return self.config.apply_filter
@apply_filter.setter
def apply_filter(self, value: bool):
self.config.apply_filter = value
self.update_devices_from_filters()
@SafeProperty("QStringList")
def signal_class_filter(self) -> list[str]:
"""
Get the signal class filter for devices.
Returns:
list[str]: List of signal class names used for filtering devices.
"""
return self.config.signal_class_filter
@signal_class_filter.setter
def signal_class_filter(self, value: list[str] | None):
"""
Set the signal class filter and update the device list.
Args:
value (list[str] | None): List of signal class names to filter by.
"""
self.config.signal_class_filter = value or []
self.update_devices_from_filters()
@SafeProperty(bool)
def filter_to_device(self):
"""Include devices in filters."""
return BECDeviceFilter.DEVICE in self.device_filter
@filter_to_device.setter
def filter_to_device(self, value: bool):
if value is True and BECDeviceFilter.DEVICE not in self.device_filter:
self._device_filter.append(BECDeviceFilter.DEVICE)
if value is False and BECDeviceFilter.DEVICE in self.device_filter:
self._device_filter.remove(BECDeviceFilter.DEVICE)
self.update_devices_from_filters()
@SafeProperty(bool)
def filter_to_positioner(self):
"""Include devices of type Positioner in filters."""
return BECDeviceFilter.POSITIONER in self.device_filter
@filter_to_positioner.setter
def filter_to_positioner(self, value: bool):
if value is True and BECDeviceFilter.POSITIONER not in self.device_filter:
self._device_filter.append(BECDeviceFilter.POSITIONER)
if value is False and BECDeviceFilter.POSITIONER in self.device_filter:
self._device_filter.remove(BECDeviceFilter.POSITIONER)
self.update_devices_from_filters()
@SafeProperty(bool)
def filter_to_signal(self):
"""Include devices of type Signal in filters."""
return BECDeviceFilter.SIGNAL in self.device_filter
@filter_to_signal.setter
def filter_to_signal(self, value: bool):
if value is True and BECDeviceFilter.SIGNAL not in self.device_filter:
self._device_filter.append(BECDeviceFilter.SIGNAL)
if value is False and BECDeviceFilter.SIGNAL in self.device_filter:
self._device_filter.remove(BECDeviceFilter.SIGNAL)
self.update_devices_from_filters()
@SafeProperty(bool)
def filter_to_computed_signal(self):
"""Include devices of type ComputedSignal in filters."""
return BECDeviceFilter.COMPUTED_SIGNAL in self.device_filter
@filter_to_computed_signal.setter
def filter_to_computed_signal(self, value: bool):
if value is True and BECDeviceFilter.COMPUTED_SIGNAL not in self.device_filter:
self._device_filter.append(BECDeviceFilter.COMPUTED_SIGNAL)
if value is False and BECDeviceFilter.COMPUTED_SIGNAL in self.device_filter:
self._device_filter.remove(BECDeviceFilter.COMPUTED_SIGNAL)
self.update_devices_from_filters()
@SafeProperty(bool)
def readout_monitored(self):
"""Include devices with readout priority Monitored in filters."""
return ReadoutPriority.MONITORED in self.readout_filter
@readout_monitored.setter
def readout_monitored(self, value: bool):
if value is True and ReadoutPriority.MONITORED not in self.readout_filter:
self._readout_filter.append(ReadoutPriority.MONITORED)
if value is False and ReadoutPriority.MONITORED in self.readout_filter:
self._readout_filter.remove(ReadoutPriority.MONITORED)
self.update_devices_from_filters()
@SafeProperty(bool)
def readout_baseline(self):
"""Include devices with readout priority Baseline in filters."""
return ReadoutPriority.BASELINE in self.readout_filter
@readout_baseline.setter
def readout_baseline(self, value: bool):
if value is True and ReadoutPriority.BASELINE not in self.readout_filter:
self._readout_filter.append(ReadoutPriority.BASELINE)
if value is False and ReadoutPriority.BASELINE in self.readout_filter:
self._readout_filter.remove(ReadoutPriority.BASELINE)
self.update_devices_from_filters()
@SafeProperty(bool)
def readout_async(self):
"""Include devices with readout priority Async in filters."""
return ReadoutPriority.ASYNC in self.readout_filter
@readout_async.setter
def readout_async(self, value: bool):
if value is True and ReadoutPriority.ASYNC not in self.readout_filter:
self._readout_filter.append(ReadoutPriority.ASYNC)
if value is False and ReadoutPriority.ASYNC in self.readout_filter:
self._readout_filter.remove(ReadoutPriority.ASYNC)
self.update_devices_from_filters()
@SafeProperty(bool)
def readout_continuous(self):
"""Include devices with readout priority continuous in filters."""
return ReadoutPriority.CONTINUOUS in self.readout_filter
@readout_continuous.setter
def readout_continuous(self, value: bool):
if value is True and ReadoutPriority.CONTINUOUS not in self.readout_filter:
self._readout_filter.append(ReadoutPriority.CONTINUOUS)
if value is False and ReadoutPriority.CONTINUOUS in self.readout_filter:
self._readout_filter.remove(ReadoutPriority.CONTINUOUS)
self.update_devices_from_filters()
@SafeProperty(bool)
def readout_on_request(self):
"""Include devices with readout priority OnRequest in filters."""
return ReadoutPriority.ON_REQUEST in self.readout_filter
@readout_on_request.setter
def readout_on_request(self, value: bool):
if value is True and ReadoutPriority.ON_REQUEST not in self.readout_filter:
self._readout_filter.append(ReadoutPriority.ON_REQUEST)
if value is False and ReadoutPriority.ON_REQUEST in self.readout_filter:
self._readout_filter.remove(ReadoutPriority.ON_REQUEST)
self.update_devices_from_filters()
### Python Methods and Properties ###
@property
def device_filter(self) -> list[object]:
"""Get the list of filters to apply on the devices."""
return self._device_filter
@property
def readout_filter(self) -> list[str]:
"""Get the list of filters to apply on the devices"""
return self._readout_filter
def get_available_filters(self) -> list:
"""Get the available filters."""
return [entry for entry in BECDeviceFilter]
def get_readout_priority_filters(self) -> list:
"""Get the available readout priority filters."""
return [entry for entry in ReadoutPriority]
def set_device_filter(
self, filter_selection: str | BECDeviceFilter | list[str] | list[BECDeviceFilter]
):
"""
Set the device filter. If None is passed, no filters are applied and all devices included.
Args:
filter_selection (str | list[str]): Device filters. It is recommended to make an enum for the filters.
"""
filters = None
if isinstance(filter_selection, list):
filters = [self._filter_handler.get(entry) for entry in filter_selection]
if isinstance(filter_selection, str) or isinstance(filter_selection, BECDeviceFilter):
filters = [self._filter_handler.get(filter_selection)]
if filters is None or any([entry is None for entry in filters]):
logger.warning(f"Device filter {filter_selection} is not in the device filter list.")
return
for entry in filters:
setattr(self, entry, True)
def set_readout_priority_filter(
self, filter_selection: str | ReadoutPriority | list[str] | list[ReadoutPriority]
):
"""
Set the readout priority filter. If None is passed, all filters are included.
Args:
filter_selection (str | list[str]): Readout priority filters.
"""
filters = None
if isinstance(filter_selection, list):
filters = [self._filter_handler.get(entry) for entry in filter_selection]
if isinstance(filter_selection, str) or isinstance(filter_selection, ReadoutPriority):
filters = [self._filter_handler.get(filter_selection)]
if filters is None or any([entry is None for entry in filters]):
logger.warning(
f"Readout priority filter {filter_selection} is not in the readout priority list."
)
return
for entry in filters:
setattr(self, entry, True)
def _check_device_filter(
self, device: Device | BECSignal | ComputedSignal | Positioner
) -> bool:
"""Check if filter for device type is applied or not.
Args:
device(Device | Signal | ComputedSignal | Positioner): Device object.
"""
return all(isinstance(device, self._device_handler[entry]) for entry in self.device_filter)
def _filter_devices_by_signal_class(
self, devices: list[Device | BECSignal | ComputedSignal | Positioner]
) -> list[Device | BECSignal | ComputedSignal | Positioner]:
"""Filter devices by signal class, if a signal class filter is set."""
if not self.config.signal_class_filter:
return devices
if not self.client or not hasattr(self.client, "device_manager"):
return []
signals = FilterIO.update_with_signal_class(
widget=self, signal_class_filter=self.config.signal_class_filter, client=self.client
)
allowed_devices = {device_name for device_name, _, _ in signals}
return [dev for dev in devices if dev.name in allowed_devices]
def _check_readout_filter(
self, device: Device | BECSignal | ComputedSignal | Positioner
) -> bool:
"""Check if filter for readout priority is applied or not.
Args:
device(Device | Signal | ComputedSignal | Positioner): Device object.
"""
return device.readout_priority in self.readout_filter
def get_device_object(self, device: str) -> object:
"""
Get the device object based on the device name.
Args:
device(str): Device name.
Returns:
object: Device object, can be device of type Device, Positioner, Signal or ComputedSignal.
"""
self.validate_device(device)
dev = getattr(self.dev, device, None)
if dev is None:
raise ValueError(
f"Device {device} is not found in the device manager {self.dev} as enabled device."
)
return dev
def validate_device(self, device: str) -> bool:
"""
Validate the device if it is present in the filtered device selection.
Args:
device(str): Device to validate.
"""
all_devs = [dev.name for dev in self.dev.enabled_devices]
if device in self.devices and device in all_devs:
return True
return False
@@ -1,301 +0,0 @@
from bec_lib.callback_handler import EventType
from bec_lib.device import Signal
from bec_lib.logger import bec_logger
from qtpy.QtCore import Property
from bec_widgets.utils.bec_connector import ConnectionConfig
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.filter_io import FilterIO
from bec_widgets.utils.ophyd_kind_util import Kind
from bec_widgets.utils.widget_io import WidgetIO
logger = bec_logger.logger
class DeviceSignalInputBaseConfig(ConnectionConfig):
"""Configuration class for DeviceSignalInputBase."""
signal_filter: str | list[str] | None = None
signal_class_filter: list[str] | None = None
ndim_filter: int | list[int] | None = None
default: str | None = None
arg_name: str | None = None
device: str | None = None
signals: list[str] | None = None
class DeviceSignalInputBase(BECWidget):
"""
Mixin base class for device signal input widgets.
Mixin class for device signal input widgets. This class provides methods to get the device signal list and device
signal object based on the current text of the widget.
"""
RPC = False
_filter_handler = {
Kind.hinted: "include_hinted_signals",
Kind.normal: "include_normal_signals",
Kind.config: "include_config_signals",
}
def __init__(
self,
client=None,
config: DeviceSignalInputBaseConfig | dict | None = None,
gui_id: str = None,
**kwargs,
):
self.config = self._process_config_input(config)
super().__init__(client=client, config=self.config, gui_id=gui_id, **kwargs)
self._device = None
self.get_bec_shortcuts()
self._signal_filter = set()
self._signals = []
self._hinted_signals = []
self._normal_signals = []
self._config_signals = []
self._device_update_register = self.bec_dispatcher.client.callbacks.register(
EventType.DEVICE_UPDATE, self.update_signals_from_filters
)
### Qt Slots ###
@SafeSlot(str)
def set_signal(self, signal: str):
"""
Set the signal.
Args:
signal (str): signal name.
"""
if self.validate_signal(signal):
WidgetIO.set_value(widget=self, value=signal)
self.config.default = signal
else:
logger.warning(
f"Signal {signal} not found for device {self.device} and filtered selection {self.signal_filter}."
)
@SafeSlot(str)
def set_device(self, device: str | None):
"""
Set the device. If device is not valid, device will be set to None which happens
Args:
device(str): device name.
"""
if self.validate_device(device) is False:
self._device = None
else:
self._device = device
self.update_signals_from_filters()
@SafeSlot(dict, dict)
@SafeSlot()
def update_signals_from_filters(
self, content: dict | None = None, metadata: dict | None = None
):
"""Update the filters for the device signals based on list in self.signal_filter.
In addition, store the hinted, normal and config signals in separate lists to allow
customisation within QLineEdit.
Note:
Signal and ComputedSignals have no signals. The naming convention follows the device name.
"""
self.config.signal_filter = self.signal_filter
# pylint: disable=protected-access
if not self.validate_device(self._device):
self._device = None
self.config.device = self._device
self._signals = []
self._hinted_signals = []
self._normal_signals = []
self._config_signals = []
FilterIO.set_selection(widget=self, selection=self._signals)
return
device = self.get_device_object(self._device)
device_info = device._info.get("signals", {})
# See above convention for Signals and ComputedSignals
if isinstance(device, Signal):
self._signals = [(self._device, {})]
self._hinted_signals = [(self._device, {})]
self._normal_signals = []
self._config_signals = []
FilterIO.set_selection(widget=self, selection=self._signals)
return
def _update(kind: Kind):
return FilterIO.update_with_kind(
widget=self,
kind=kind,
signal_filter=self.signal_filter,
device_info=device_info,
device_name=self._device,
)
self._hinted_signals = _update(Kind.hinted)
self._normal_signals = _update(Kind.normal)
self._config_signals = _update(Kind.config)
self._signals = self._hinted_signals + self._normal_signals + self._config_signals
FilterIO.set_selection(widget=self, selection=self.signals)
### Qt Properties ###
@Property(str)
def device(self) -> str:
"""Get the selected device."""
if self._device is None:
return ""
return self._device
@device.setter
def device(self, value: str):
"""Set the device and update the filters, only allow devices present in the devicemanager."""
self._device = value
self.config.device = value
self.update_signals_from_filters()
@Property(bool)
def include_hinted_signals(self):
"""Include hinted signals in filters."""
return Kind.hinted in self.signal_filter
@include_hinted_signals.setter
def include_hinted_signals(self, value: bool):
if value:
self._signal_filter.add(Kind.hinted)
else:
self._signal_filter.discard(Kind.hinted)
self.update_signals_from_filters()
@Property(bool)
def include_normal_signals(self):
"""Include normal signals in filters."""
return Kind.normal in self.signal_filter
@include_normal_signals.setter
def include_normal_signals(self, value: bool):
if value:
self._signal_filter.add(Kind.normal)
else:
self._signal_filter.discard(Kind.normal)
self.update_signals_from_filters()
@Property(bool)
def include_config_signals(self):
"""Include config signals in filters."""
return Kind.config in self.signal_filter
@include_config_signals.setter
def include_config_signals(self, value: bool):
if value:
self._signal_filter.add(Kind.config)
else:
self._signal_filter.discard(Kind.config)
self.update_signals_from_filters()
### Properties and Methods ###
@property
def signals(self) -> list[str]:
"""
Get the list of device signals for the applied filters.
Returns:
list[str]: List of device signals.
"""
return self._signals
@signals.setter
def signals(self, value: list[str]):
self._signals = value
self.config.signals = value
FilterIO.set_selection(widget=self, selection=value)
@property
def signal_filter(self) -> list[str]:
"""Get the list of filters to apply on the device signals."""
return self._signal_filter
def get_available_filters(self) -> list[str]:
"""Get the available filters."""
return [entry for entry in self._filter_handler]
def set_filter(self, filter_selection: str | list[str]):
"""
Set the device filter. If None, all devices are included.
Args:
filter_selection (str | list[str]): Device filters from BECDeviceFilter and BECReadoutPriority.
"""
filters = None
if isinstance(filter_selection, list):
filters = [self._filter_handler.get(entry) for entry in filter_selection]
if isinstance(filter_selection, str):
filters = [self._filter_handler.get(filter_selection)]
if filters is None:
return
for entry in filters:
setattr(self, entry, True)
def get_device_object(self, device: str) -> object | None:
"""
Get the device object based on the device name.
Args:
device(str): Device name.
Returns:
object: Device object, can be device of type Device, Positioner, Signal or ComputedSignal.
"""
self.validate_device(device)
dev = getattr(self.dev, device, None)
if dev is None:
logger.warning(f"Device {device} not found in devicemanager.")
return None
return dev
def validate_device(self, device: str | None, raise_on_false: bool = False) -> bool:
"""
Validate the device if it is present in current BEC instance.
Args:
device(str): Device to validate.
raise_on_false(bool): Raise ValueError if device is not found.
"""
if device in self.dev:
return True
if raise_on_false is True:
raise ValueError(f"Device {device} not found in devicemanager.")
return False
def validate_signal(self, signal: str) -> bool:
"""
Validate the signal if it is present in the device signals.
Args:
signal(str): Signal to validate.
"""
for entry in self.signals:
if isinstance(entry, tuple):
entry = entry[0]
if entry == signal:
return True
return False
def _process_config_input(self, config: DeviceSignalInputBaseConfig | dict | None):
if config is None:
return DeviceSignalInputBaseConfig(widget_class=self.__class__.__name__)
return DeviceSignalInputBaseConfig.model_validate(config)
def cleanup(self):
"""
Cleanup the widget.
"""
self.bec_dispatcher.client.callbacks.remove(self._device_update_register)
super().cleanup()
@@ -1,32 +1,81 @@
from __future__ import annotations
import enum
from bec_lib.callback_handler import EventType
from bec_lib.device import ReadoutPriority
from qtpy.QtCore import QSize, Signal, Slot
from qtpy.QtWidgets import QComboBox, QSizePolicy
from bec_lib.device import ComputedSignal, Device, Positioner, ReadoutPriority
from bec_lib.device import Signal as BECSignal
from bec_lib.logger import bec_logger
from pydantic import Field, field_validator
from qtpy.QtCore import QSize, QStringListModel, Signal, Slot
from qtpy.QtWidgets import QComboBox, QCompleter, QSizePolicy
from bec_widgets.utils.bec_connector import ConnectionConfig
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.colors import get_accent_colors
from bec_widgets.utils.error_popups import SafeProperty
from bec_widgets.widgets.control.device_input.base_classes.device_input_base import (
BECDeviceFilter,
DeviceInputBase,
DeviceInputConfig,
)
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
from bec_widgets.utils.filter_io import get_bec_signals_for_classes, replace_combobox_items
logger = bec_logger.logger
class DeviceComboBox(DeviceInputBase, QComboBox):
class BECDeviceFilter(enum.Enum):
"""Filter for BEC device classes."""
DEVICE = "Device"
POSITIONER = "Positioner"
SIGNAL = "Signal"
COMPUTED_SIGNAL = "ComputedSignal"
class DeviceInputConfig(ConnectionConfig):
device_filter: list[str] = Field(default_factory=list)
readout_filter: list[str] = Field(default_factory=list)
devices: list[str] = Field(default_factory=list)
default: str | None = None
arg_name: str | None = None
apply_filter: bool = True
signal_class_filter: list[str] = Field(default_factory=list)
autocomplete: bool = False
@field_validator("device_filter")
@classmethod
def check_device_filter(cls, value):
valid_filters = [entry.value for entry in BECDeviceFilter]
for device_filter in value:
if device_filter not in valid_filters:
raise ValueError(
f"Device filter {device_filter} is not a valid device filter {valid_filters}."
)
return value
@field_validator("readout_filter")
@classmethod
def check_readout_filter(cls, value):
valid_filters = [entry.value for entry in ReadoutPriority]
for readout_filter in value:
if readout_filter not in valid_filters:
raise ValueError(
f"Readout filter {readout_filter} is not a valid readout filter {valid_filters}."
)
return value
class DeviceComboBox(BECWidget, QComboBox):
"""
Combobox widget for device input with autocomplete for device names.
Editable combobox for BEC device input.
Args:
parent: Parent widget.
client: BEC client object.
config: Device input configuration.
gui_id: GUI ID.
device_filter: Device filter, name of the device class from BECDeviceFilter and BECReadoutPriority. Check DeviceInputBase for more details.
readout_priority_filter: Readout priority filter, name of the readout priority class from BECDeviceFilter and BECReadoutPriority. Check DeviceInputBase for more details.
available_devices: List of available devices, if passed, it sets apply filters to false and device/readout priority filters will not be applied.
device_filter: Device class filter from BECDeviceFilter.
readout_priority_filter: Readout priority filter from ReadoutPriority.
available_devices: Explicit list of devices. Passing this disables automatic filtering.
default: Default device name.
arg_name: Argument name, can be used for the other widgets which has to call some other function in bec using correct argument names.
signal_class_filter: List of signal classes to filter the devices by. Only devices with signals of these classes will be shown.
arg_name: Argument name used by scan/input widgets.
signal_class_filter: Only show devices with signals of these classes.
"""
ICON_NAME = "list_alt"
@@ -37,62 +86,97 @@ class DeviceComboBox(DeviceInputBase, QComboBox):
device_reset = Signal()
device_config_update = Signal()
_device_handler = {
BECDeviceFilter.DEVICE: Device,
BECDeviceFilter.POSITIONER: Positioner,
BECDeviceFilter.SIGNAL: BECSignal,
BECDeviceFilter.COMPUTED_SIGNAL: ComputedSignal,
}
def __init__(
self,
parent=None,
client=None,
config: DeviceInputConfig = None,
config: DeviceInputConfig | dict | None = None,
gui_id: str | None = None,
device_filter: BECDeviceFilter | list[BECDeviceFilter] | None = None,
readout_priority_filter: (
str | ReadoutPriority | list[str] | list[ReadoutPriority] | None
) = None,
device_filter: BECDeviceFilter | str | list[BECDeviceFilter | str] | None = None,
readout_priority_filter: str | ReadoutPriority | list[str | ReadoutPriority] | None = None,
available_devices: list[str] | None = None,
default: str | None = None,
arg_name: str | None = None,
signal_class_filter: list[str] | None = None,
autocomplete: bool | None = None,
**kwargs,
):
super().__init__(parent=parent, client=client, gui_id=gui_id, config=config, **kwargs)
if arg_name is not None:
self.config.arg_name = arg_name
self.arg_name = arg_name
self.setSizePolicy(QSizePolicy.Preferred, QSizePolicy.Fixed)
self.setMinimumSize(QSize(100, 0))
self.config = self._process_config(config)
super().__init__(
parent=parent,
client=client,
config=self.config,
gui_id=gui_id,
theme_update=True,
**kwargs,
)
self.get_bec_shortcuts()
self._device_filter: list[BECDeviceFilter] = []
self._readout_filter: list[ReadoutPriority] = []
self._devices: list[str] = []
self._callback_id = None
self._is_valid_input = False
self._accent_colors = get_accent_colors()
self._set_first_element_as_empty = False
self._completer_model = QStringListModel(self)
self.setEditable(True)
self.setInsertPolicy(QComboBox.NoInsert)
self.setSizePolicy(QSizePolicy.Preferred, QSizePolicy.Fixed)
self.setMinimumSize(QSize(100, 0))
if arg_name is not None:
self.config.arg_name = arg_name
self.arg_name = arg_name
if available_devices is None and self.config.devices:
available_devices = self.config.devices
if device_filter is None and self.config.device_filter:
device_filter = self.config.device_filter
if readout_priority_filter is None and self.config.readout_filter:
readout_priority_filter = self.config.readout_filter
if signal_class_filter is None and self.config.signal_class_filter:
signal_class_filter = self.config.signal_class_filter
if default is None and self.config.default:
default = self.config.default
if autocomplete is not None:
self.config.autocomplete = autocomplete
if self.config.autocomplete:
self.autocomplete = True
# We do not consider the config that is passed here, this produced problems
# with QtDesigner, since config and input arguments may differ and resolve properly
# Implementing this logic and config recoverage is postponed.
# Set available devices if passed
if available_devices is not None:
self.set_available_devices(available_devices)
# Set readout priority filter default is all
if readout_priority_filter is not None:
self.set_readout_priority_filter(readout_priority_filter)
else:
self.set_readout_priority_filter(
[
ReadoutPriority.MONITORED,
ReadoutPriority.BASELINE,
ReadoutPriority.ASYNC,
ReadoutPriority.CONTINUOUS,
ReadoutPriority.ON_REQUEST,
]
)
# Device filter default is None
self.set_readout_priority_filter(
readout_priority_filter
or [
ReadoutPriority.MONITORED,
ReadoutPriority.BASELINE,
ReadoutPriority.ASYNC,
ReadoutPriority.CONTINUOUS,
ReadoutPriority.ON_REQUEST,
]
)
if device_filter is not None:
self.set_device_filter(device_filter)
if signal_class_filter is not None:
self.signal_class_filter = signal_class_filter
# Set default device if passed
if default is not None:
self.set_device(default)
else:
self.setCurrentText("")
self._callback_id = self.bec_dispatcher.client.callbacks.register(
EventType.DEVICE_UPDATE, self.on_device_update
)
@@ -100,39 +184,250 @@ class DeviceComboBox(DeviceInputBase, QComboBox):
self.currentTextChanged.connect(self.check_validity)
self.check_validity(self.currentText())
@staticmethod
def _process_config(config: DeviceInputConfig | dict | None) -> DeviceInputConfig:
if config is None:
return DeviceInputConfig(widget_class="DeviceComboBox")
return DeviceInputConfig.model_validate(config)
@SafeSlot(str)
def set_device(self, device: str):
"""Set the current device if it is valid for the current filters."""
if self.validate_device(device):
self.setCurrentText(device)
self.config.default = device
else:
logger.warning(
f"Device {device} is not in the filtered selection of {self}: {self.devices}."
)
@SafeSlot()
def update_devices_from_filters(self):
"""Refresh the available device list from current device/readout/signal filters."""
current_device = self.currentText()
self.config.device_filter = [entry.value for entry in self.device_filter]
self.config.readout_filter = [entry.value for entry in self.readout_filter]
self.config.signal_class_filter = self.signal_class_filter
if not self.apply_filter:
return
devices = self._filter_devices_by_signal_class(self.dev.enabled_devices)
devices = [device for device in devices if self._check_device_filter(device)]
devices = [device for device in devices if self._check_readout_filter(device)]
self.devices = [device.name for device in devices]
if current_device:
self.setCurrentText(current_device)
self.check_validity(current_device)
@SafeSlot(list)
def set_available_devices(self, devices: list[str]):
"""Use an explicit device list and disable automatic BEC filtering."""
self.apply_filter = False
self.devices = devices
@SafeProperty("QStringList")
def devices(self) -> list[str]:
"""Devices available after filtering."""
return self._devices
@devices.setter
def devices(self, value: list[str]):
self._devices = value
self.config.devices = value
self._replace_items(value)
@SafeProperty(str)
def default(self):
"""Default selected device."""
return self.config.default
@default.setter
def default(self, value: str):
self.set_device(value)
@SafeProperty(bool)
def apply_filter(self):
"""Whether BEC filters are applied to the device list."""
return self.config.apply_filter
@apply_filter.setter
def apply_filter(self, value: bool):
self.config.apply_filter = value
if value:
self.update_devices_from_filters()
@SafeProperty("QStringList")
def signal_class_filter(self) -> list[str]:
"""Signal class names used to restrict devices."""
return self.config.signal_class_filter
@signal_class_filter.setter
def signal_class_filter(self, value: list[str] | None):
self.config.signal_class_filter = value or []
self.update_devices_from_filters()
@SafeProperty(bool)
def filter_to_device(self):
"""Include generic Device objects."""
return BECDeviceFilter.DEVICE in self.device_filter
@filter_to_device.setter
def filter_to_device(self, value: bool):
self._set_device_filter_enabled(BECDeviceFilter.DEVICE, value)
@SafeProperty(bool)
def filter_to_positioner(self):
"""Include Positioner devices."""
return BECDeviceFilter.POSITIONER in self.device_filter
@filter_to_positioner.setter
def filter_to_positioner(self, value: bool):
self._set_device_filter_enabled(BECDeviceFilter.POSITIONER, value)
@SafeProperty(bool)
def filter_to_signal(self):
"""Include Signal devices."""
return BECDeviceFilter.SIGNAL in self.device_filter
@filter_to_signal.setter
def filter_to_signal(self, value: bool):
self._set_device_filter_enabled(BECDeviceFilter.SIGNAL, value)
@SafeProperty(bool)
def filter_to_computed_signal(self):
"""Include ComputedSignal devices."""
return BECDeviceFilter.COMPUTED_SIGNAL in self.device_filter
@filter_to_computed_signal.setter
def filter_to_computed_signal(self, value: bool):
self._set_device_filter_enabled(BECDeviceFilter.COMPUTED_SIGNAL, value)
@SafeProperty(bool)
def readout_monitored(self):
"""Include monitored devices."""
return ReadoutPriority.MONITORED in self.readout_filter
@readout_monitored.setter
def readout_monitored(self, value: bool):
self._set_readout_filter_enabled(ReadoutPriority.MONITORED, value)
@SafeProperty(bool)
def readout_baseline(self):
"""Include baseline devices."""
return ReadoutPriority.BASELINE in self.readout_filter
@readout_baseline.setter
def readout_baseline(self, value: bool):
self._set_readout_filter_enabled(ReadoutPriority.BASELINE, value)
@SafeProperty(bool)
def readout_async(self):
"""Include async devices."""
return ReadoutPriority.ASYNC in self.readout_filter
@readout_async.setter
def readout_async(self, value: bool):
self._set_readout_filter_enabled(ReadoutPriority.ASYNC, value)
@SafeProperty(bool)
def readout_continuous(self):
"""Include continuous devices."""
return ReadoutPriority.CONTINUOUS in self.readout_filter
@readout_continuous.setter
def readout_continuous(self, value: bool):
self._set_readout_filter_enabled(ReadoutPriority.CONTINUOUS, value)
@SafeProperty(bool)
def readout_on_request(self):
"""Include on-request devices."""
return ReadoutPriority.ON_REQUEST in self.readout_filter
@readout_on_request.setter
def readout_on_request(self, value: bool):
self._set_readout_filter_enabled(ReadoutPriority.ON_REQUEST, value)
@SafeProperty(bool)
def set_first_element_as_empty(self) -> bool:
"""
Whether the first element in the combobox should be empty.
This is useful to allow the user to select a device from the list.
"""
"""Whether an empty choice is inserted as the first item."""
return self._set_first_element_as_empty
@set_first_element_as_empty.setter
def set_first_element_as_empty(self, value: bool) -> None:
"""
Set whether the first element in the combobox should be empty.
This is useful to allow the user to select a device from the list.
Args:
value (bool): True if the first element should be empty, False otherwise.
"""
self._set_first_element_as_empty = value
if self._set_first_element_as_empty:
self.insertItem(0, "")
current_text = self.currentText()
if value:
if self.count() == 0 or self.itemText(0) != "":
self.insertItem(0, "")
self.setCurrentIndex(0)
elif self.count() > 0 and self.itemText(0) == "":
self.removeItem(0)
if not current_text:
self.setCurrentText("")
@SafeProperty(bool)
def autocomplete(self) -> bool:
"""Whether autocomplete suggestions are enabled while editing."""
return self.config.autocomplete
@autocomplete.setter
def autocomplete(self, value: bool) -> None:
self.config.autocomplete = value
if value:
completer = QCompleter(self._completer_model, self)
self.setCompleter(completer)
else:
if self.count() > 0 and self.itemText(0) == "":
self.removeItem(0)
self._restore_default_completer()
@property
def device_filter(self) -> list[BECDeviceFilter]:
"""Device class filters."""
return self._device_filter
@property
def readout_filter(self) -> list[ReadoutPriority]:
"""Readout priority filters."""
return self._readout_filter
@property
def is_valid_input(self) -> bool:
"""Whether the current text represents a valid device selection."""
return self._is_valid_input
def get_available_filters(self) -> list[BECDeviceFilter]:
"""Return available device class filters."""
return list(BECDeviceFilter)
def get_readout_priority_filters(self) -> list[ReadoutPriority]:
"""Return available readout priority filters."""
return list(ReadoutPriority)
def set_device_filter(
self, filter_selection: BECDeviceFilter | str | list[BECDeviceFilter | str]
):
"""Enable one or more device class filters."""
for device_filter in self._as_list(filter_selection):
normalized = self._normalize_device_filter(device_filter)
if normalized is None:
logger.warning(f"Device filter {device_filter} is not in the device filter list.")
continue
self._set_device_filter_enabled(normalized, True)
def set_readout_priority_filter(
self, filter_selection: ReadoutPriority | str | list[ReadoutPriority | str]
):
"""Enable one or more readout priority filters."""
for readout_filter in self._as_list(filter_selection):
normalized = self._normalize_readout_filter(readout_filter)
if normalized is None:
logger.warning(
f"Readout priority filter {readout_filter} is not in the readout priority list."
)
continue
self._set_readout_filter_enabled(normalized, True)
def on_device_update(self, action: str, content: dict) -> None:
"""
Callback for device update events. Triggers the device_update signal.
Args:
action (str): The action that triggered the event.
content (dict): The content of the config update.
"""
"""Refresh filters when the BEC device configuration changes."""
if action in ["add", "remove", "reload"]:
self.device_config_update.emit()
@@ -143,21 +438,13 @@ class DeviceComboBox(DeviceInputBase, QComboBox):
super().cleanup()
def get_current_device(self) -> object:
"""
Get the current device object based on the current value.
Returns:
object: Device object, can be device of type Device, Positioner, Signal or ComputedSignal.
"""
dev_name = self.currentText()
return self.get_device_object(dev_name)
"""Return the current BEC device object."""
return self.get_device_object(self._device_name_from_text(self.currentText()))
@Slot(str)
def check_validity(self, input_text: str) -> None:
"""
Check if the current value is a valid device name.
"""
if self.validate_device(input_text) is True:
"""Validate current text and update visual state."""
if self.validate_device(input_text):
self._is_valid_input = True
self.device_selected.emit(input_text)
self.setStyleSheet("border: 1px solid transparent;")
@@ -167,33 +454,105 @@ class DeviceComboBox(DeviceInputBase, QComboBox):
if self.isEnabled():
self.setStyleSheet("border: 1px solid red;")
def validate_device(self, device: str) -> bool: # type: ignore[override]
"""
Extend validation so that previewsignal pseudodevices (labels like
``"eiger_preview"``) are accepted as valid choices.
def validate_device(self, device: str | None) -> bool:
"""Validate a device against the current filtered device selection."""
if not device:
return False
device_name = self._device_name_from_text(device)
all_devices = [dev.name for dev in self.dev.enabled_devices]
return device_name in self.devices and device_name in all_devices
The validation run only on device not on the previewsignal.
def get_device_object(self, device: str) -> object:
"""Return a device object by name."""
dev = getattr(self.dev, device, None)
if dev is None:
raise ValueError(
f"Device {device} is not found in the device manager {self.dev} as enabled device."
)
return dev
Args:
device: The text currently entered/selected.
@staticmethod
def _as_list(value):
return value if isinstance(value, list) else [value]
Returns:
True if the device is a genuine BEC device *or* one of the
whitelisted previewsignal entries.
"""
idx = self.findText(device)
if idx >= 0 and isinstance(self.itemData(idx), tuple):
device = self.itemData(idx)[0] # type: ignore[assignment]
return super().validate_device(device)
@staticmethod
def _normalize_device_filter(value: BECDeviceFilter | str) -> BECDeviceFilter | None:
if isinstance(value, BECDeviceFilter):
return value
return BECDeviceFilter._value2member_map_.get(value)
@property
def is_valid_input(self) -> bool:
"""Whether the current text represents a valid device selection."""
return self._is_valid_input
@staticmethod
def _normalize_readout_filter(value: ReadoutPriority | str) -> ReadoutPriority | None:
if isinstance(value, ReadoutPriority):
return value
return ReadoutPriority._value2member_map_.get(value)
def _set_device_filter_enabled(self, device_filter: BECDeviceFilter, enabled: bool):
if enabled and device_filter not in self._device_filter:
self._device_filter.append(device_filter)
elif not enabled and device_filter in self._device_filter:
self._device_filter.remove(device_filter)
self.update_devices_from_filters()
def _set_readout_filter_enabled(self, readout_filter: ReadoutPriority, enabled: bool):
if enabled and readout_filter not in self._readout_filter:
self._readout_filter.append(readout_filter)
elif not enabled and readout_filter in self._readout_filter:
self._readout_filter.remove(readout_filter)
self.update_devices_from_filters()
def _check_device_filter(
self, device: Device | BECSignal | ComputedSignal | Positioner
) -> bool:
if not self.device_filter:
return True
return any(isinstance(device, self._device_handler[entry]) for entry in self.device_filter)
def _check_readout_filter(
self, device: Device | BECSignal | ComputedSignal | Positioner
) -> bool:
if not self.readout_filter:
return True
return device.readout_priority in self.readout_filter
def _filter_devices_by_signal_class(
self, devices: list[Device | BECSignal | ComputedSignal | Positioner]
) -> list[Device | BECSignal | ComputedSignal | Positioner]:
if not self.config.signal_class_filter:
return devices
signals = get_bec_signals_for_classes(
client=self.client, signal_class_filter=self.config.signal_class_filter
)
allowed_devices = {device_name for device_name, _, _ in signals}
return [device for device in devices if device.name in allowed_devices]
def _replace_items(self, devices: list[str]):
current_text = self.currentText()
replace_combobox_items(self, devices)
self._update_completer_model(devices)
if self._set_first_element_as_empty:
self.insertItem(0, "")
self.setCurrentText(current_text)
def _update_completer_model(self, items: list[str]) -> None:
self._completer_model.setStringList(items)
def _restore_default_completer(self) -> None:
if self.completer() is not None and self.completer().model() == self.model():
return
current_text = self.currentText()
self.setEditable(False)
self.setEditable(True)
self.setCurrentText(current_text)
def _device_name_from_text(self, text: str) -> str:
index = self.findText(text)
if index >= 0 and isinstance(self.itemData(index), tuple):
return self.itemData(index)[0]
return text
if __name__ == "__main__": # pragma: no cover
# pylint: disable=import-outside-toplevel
from qtpy.QtWidgets import (
QApplication,
QCheckBox,
@@ -235,10 +594,7 @@ if __name__ == "__main__": # pragma: no cover
def _apply_filters():
raw = class_input.text().strip()
if raw:
combo.signal_class_filter = [entry.strip() for entry in raw.split(",") if entry.strip()]
else:
combo.signal_class_filter = []
combo.signal_class_filter = [entry.strip() for entry in raw.split(",") if entry.strip()]
combo.filter_to_device = filter_device.isChecked()
combo.filter_to_positioner = filter_positioner.isChecked()
combo.filter_to_signal = filter_signal.isChecked()
@@ -1,197 +0,0 @@
from bec_lib.callback_handler import EventType
from bec_lib.device import ReadoutPriority
from bec_lib.logger import bec_logger
from qtpy.QtCore import QSize, Signal, Slot
from qtpy.QtGui import QPainter, QPaintEvent, QPen
from qtpy.QtWidgets import QApplication, QCompleter, QLineEdit, QSizePolicy
from bec_widgets.utils.colors import get_accent_colors
from bec_widgets.widgets.control.device_input.base_classes.device_input_base import (
BECDeviceFilter,
DeviceInputBase,
DeviceInputConfig,
)
logger = bec_logger.logger
class DeviceLineEdit(DeviceInputBase, QLineEdit):
"""
Line edit widget for device input with autocomplete for device names.
Args:
parent: Parent widget.
client: BEC client object.
config: Device input configuration.
gui_id: GUI ID.
device_filter: Device filter, name of the device class from BECDeviceFilter and BECReadoutPriority. Check DeviceInputBase for more details.
readout_priority_filter: Readout priority filter, name of the readout priority class from BECDeviceFilter and BECReadoutPriority. Check DeviceInputBase for more details.
available_devices: List of available devices, if passed, it sets apply filters to false and device/readout priority filters will not be applied.
default: Default device name.
arg_name: Argument name, can be used for the other widgets which has to call some other function in bec using correct argument names.
"""
device_selected = Signal(str)
device_config_update = Signal()
PLUGIN = True
RPC = False
ICON_NAME = "edit_note"
def __init__(
self,
parent=None,
client=None,
config: DeviceInputConfig = None,
gui_id: str | None = None,
device_filter: BECDeviceFilter | list[BECDeviceFilter] | None = None,
readout_priority_filter: (
str | ReadoutPriority | list[str] | list[ReadoutPriority] | None
) = None,
available_devices: list[str] | None = None,
default: str | None = None,
arg_name: str | None = None,
**kwargs,
):
self._callback_id = None
self.__is_valid_input = False
self._accent_colors = get_accent_colors()
super().__init__(parent=parent, client=client, gui_id=gui_id, config=config, **kwargs)
self.completer = QCompleter(self)
self.setCompleter(self.completer)
if arg_name is not None:
self.config.arg_name = arg_name
self.arg_name = arg_name
self.setSizePolicy(QSizePolicy.Preferred, QSizePolicy.Fixed)
self.setMinimumSize(QSize(100, 0))
# We do not consider the config that is passed here, this produced problems
# with QtDesigner, since config and input arguments may differ and resolve properly
# Implementing this logic and config recoverage is postponed.
# Set available devices if passed
if available_devices is not None:
self.set_available_devices(available_devices)
# Set readout priority filter default is all
if readout_priority_filter is not None:
self.set_readout_priority_filter(readout_priority_filter)
else:
self.set_readout_priority_filter(
[
ReadoutPriority.MONITORED,
ReadoutPriority.BASELINE,
ReadoutPriority.ASYNC,
ReadoutPriority.CONTINUOUS,
ReadoutPriority.ON_REQUEST,
]
)
# Device filter default is None
if device_filter is not None:
self.set_device_filter(device_filter)
# Set default device if passed
if default is not None:
self.set_device(default)
self._callback_id = self.bec_dispatcher.client.callbacks.register(
EventType.DEVICE_UPDATE, self.on_device_update
)
self.device_config_update.connect(self.update_devices_from_filters)
self.textChanged.connect(self.check_validity)
self.check_validity(self.text())
@property
def _is_valid_input(self) -> bool:
"""
Check if the current value is a valid device name.
Returns:
bool: True if the current value is a valid device name, False otherwise.
"""
return self.__is_valid_input
@_is_valid_input.setter
def _is_valid_input(self, value: bool) -> None:
self.__is_valid_input = value
def on_device_update(self, action: str, content: dict) -> None:
"""
Callback for device update events. Triggers the device_update signal.
Args:
action (str): The action that triggered the event.
content (dict): The content of the config update.
"""
if action in ["add", "remove", "reload"]:
self.device_config_update.emit()
def cleanup(self):
"""Cleanup the widget."""
if self._callback_id is not None:
self.bec_dispatcher.client.callbacks.remove(self._callback_id)
super().cleanup()
def get_current_device(self) -> object:
"""
Get the current device object based on the current value.
Returns:
object: Device object, can be device of type Device, Positioner, Signal or ComputedSignal.
"""
dev_name = self.text()
return self.get_device_object(dev_name)
def paintEvent(self, event: QPaintEvent) -> None:
"""Extend the paint event to set the border color based on the validity of the input.
Args:
event (PySide6.QtGui.QPaintEvent) : Paint event.
"""
# logger.info(f"Received paint event: {event} in {self.__class__}")
super().paintEvent(event)
if self._is_valid_input is False and self.isEnabled() is True:
painter = QPainter(self)
pen = QPen()
pen.setWidth(2)
pen.setColor(self._accent_colors.emergency)
painter.setPen(pen)
painter.drawRect(self.rect().adjusted(1, 1, -1, -1))
painter.end()
@Slot(str)
def check_validity(self, input_text: str) -> None:
"""
Check if the current value is a valid device name.
"""
if self.validate_device(input_text) is True:
self._is_valid_input = True
self.device_selected.emit(input_text)
else:
self._is_valid_input = False
self.update()
if __name__ == "__main__": # pragma: no cover
# pylint: disable=import-outside-toplevel
from qtpy.QtWidgets import QVBoxLayout, QWidget
from bec_widgets.utils.colors import apply_theme
from bec_widgets.widgets.control.device_input.signal_combobox.signal_combobox import (
SignalComboBox,
)
app = QApplication([])
apply_theme("dark")
widget = QWidget()
widget.setFixedSize(200, 200)
layout = QVBoxLayout()
widget.setLayout(layout)
line_edit = DeviceLineEdit()
line_edit.filter_to_positioner = True
signal_line_edit = SignalComboBox()
line_edit.textChanged.connect(signal_line_edit.set_device)
line_edit.set_available_devices(["samx", "samy", "samz"])
line_edit.set_device("samx")
layout.addWidget(line_edit)
layout.addWidget(signal_line_edit)
widget.show()
app.exec_()
@@ -1 +0,0 @@
{'files': ['device_line_edit.py']}
@@ -1,59 +0,0 @@
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from qtpy.QtWidgets import QWidget
from bec_widgets.utils.bec_designer import designer_material_icon
from bec_widgets.widgets.control.device_input.device_line_edit.device_line_edit import (
DeviceLineEdit,
)
DOM_XML = """
<ui language='c++'>
<widget class='DeviceLineEdit' name='device_line_edit'>
</widget>
</ui>
"""
class DeviceLineEditPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
def __init__(self):
super().__init__()
self._form_editor = None
def createWidget(self, parent):
if parent is None:
return QWidget()
t = DeviceLineEdit(parent)
return t
def domXml(self):
return DOM_XML
def group(self):
return "BEC Input Widgets"
def icon(self):
return designer_material_icon(DeviceLineEdit.ICON_NAME)
def includeFile(self):
return "device_line_edit"
def initialize(self, form_editor):
self._form_editor = form_editor
def isContainer(self):
return False
def isInitialized(self):
return self._form_editor is not None
def name(self):
return "DeviceLineEdit"
def toolTip(self):
return ""
def whatsThis(self):
return self.toolTip()
@@ -1,17 +0,0 @@
def main(): # pragma: no cover
from qtpy import PYSIDE6
if not PYSIDE6:
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from bec_widgets.widgets.control.device_input.device_line_edit.device_line_edit_plugin import (
DeviceLineEditPlugin,
)
QPyDesignerCustomWidgetCollection.addCustomWidget(DeviceLineEditPlugin())
if __name__ == "__main__": # pragma: no cover
main()
@@ -1,37 +1,54 @@
from __future__ import annotations
from qtpy.QtCore import QSize, Qt, Signal
from qtpy.QtWidgets import QComboBox, QSizePolicy
from bec_lib.callback_handler import EventType
from bec_lib.device import Signal as BECSignal
from bec_lib.logger import bec_logger
from qtpy.QtCore import Property, QSize, QStringListModel, Qt, Signal, Slot
from qtpy.QtWidgets import QComboBox, QCompleter, QSizePolicy
from bec_widgets.utils.bec_connector import ConnectionConfig
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
from bec_widgets.utils.filter_io import ComboBoxFilterHandler, FilterIO
from bec_widgets.utils.ophyd_kind_util import Kind
from bec_widgets.widgets.control.device_input.base_classes.device_signal_input_base import (
DeviceSignalInputBase,
DeviceSignalInputBaseConfig,
from bec_widgets.utils.filter_io import (
get_bec_signals_for_classes,
replace_combobox_items,
signal_items_for_kind,
)
from bec_widgets.utils.ophyd_kind_util import Kind
logger = bec_logger.logger
class SignalComboBox(DeviceSignalInputBase, QComboBox):
class SignalComboBoxConfig(ConnectionConfig):
"""Configuration for SignalComboBox."""
signal_filter: list[str] | None = None
signal_class_filter: list[str] | None = None
ndim_filter: int | list[int] | None = None
default: str | None = None
arg_name: str | None = None
device: str | None = None
signals: list[str] | None = None
autocomplete: bool = False
class SignalComboBox(BECWidget, QComboBox):
"""
Line edit widget for device input with autocomplete for device names.
Editable combobox for selecting BEC device signals.
Args:
parent: Parent widget.
client: BEC client object.
config: Device input configuration.
config: Signal combobox configuration.
gui_id: GUI ID.
device: Device name to filter signals from.
signal_filter: Signal filter, list of signal kinds from ophyd Kind enum. Check DeviceSignalInputBase for more details.
signal_class_filter: List of signal classes to filter the signals by. Only signals of these classes will be shown.
ndim_filter: Dimensionality filter, int or list of ints to filter signals by their number of dimensions. If signal do not support ndim, it will be included in the selection anyway.
default: Default device name.
arg_name: Argument name, can be used for the other widgets which has to call some other function in bec using correct argument names.
store_signal_config: Whether to store the full signal config in the combobox item data.
signal_filter: Signal kind filters from Kind.
signal_class_filter: Signal classes to show.
ndim_filter: Dimensionality filter for signal-class based lists.
default: Default signal name.
arg_name: Argument name used by scan/input widgets.
store_signal_config: Whether to store signal config in item data.
require_device: If True, signals are only shown/validated when a device is set.
Signals:
device_signal_changed: Emitted when the current text represents a valid signal selection.
signal_reset: Emitted when validation fails and the selection should be treated as cleared.
"""
USER_ACCESS = ["set_signal", "set_device", "signals", "get_signal_name"]
@@ -47,289 +64,368 @@ class SignalComboBox(DeviceSignalInputBase, QComboBox):
self,
parent=None,
client=None,
config: DeviceSignalInputBaseConfig | None = None,
config: SignalComboBoxConfig | dict | None = None,
gui_id: str | None = None,
device: str | None = None,
signal_filter: list[Kind] | None = None,
signal_filter: list[Kind | str] | Kind | str | None = None,
signal_class_filter: list[str] | None = None,
ndim_filter: int | list[int] | None = None,
default: str | None = None,
arg_name: str | None = None,
store_signal_config: bool = True,
require_device: bool = False,
autocomplete: bool | None = None,
**kwargs,
):
super().__init__(parent=parent, client=client, gui_id=gui_id, config=config, **kwargs)
self.config = self._process_config(config)
super().__init__(parent=parent, client=client, config=self.config, gui_id=gui_id, **kwargs)
self.get_bec_shortcuts()
self._device: str | None = None
self._signal_filter: set[Kind] = set()
self._signals: list[str | tuple[str, dict]] = []
self._hinted_signals: list[tuple[str, dict]] = []
self._normal_signals: list[tuple[str, dict]] = []
self._config_signals: list[tuple[str, dict]] = []
self._set_first_element_as_empty = False
self._signal_class_filter = signal_class_filter or []
self._store_signal_config = store_signal_config
self._require_device = require_device
self._is_valid_input = False
self._completer_model = QStringListModel(self)
if arg_name is not None:
self.config.arg_name = arg_name
self.arg_name = arg_name
if default is not None:
self.set_device(default)
if signal_filter is None and self.config.signal_filter:
signal_filter = self.config.signal_filter
if signal_class_filter is None and self.config.signal_class_filter:
self._signal_class_filter = self.config.signal_class_filter
if ndim_filter is None and self.config.ndim_filter is not None:
ndim_filter = self.config.ndim_filter
if device is None and self.config.device:
device = self.config.device
if default is None and self.config.default:
default = self.config.default
if autocomplete is not None:
self.config.autocomplete = autocomplete
self.config.ndim_filter = ndim_filter
self.setEditable(True)
self.setInsertPolicy(QComboBox.NoInsert)
self.setSizePolicy(QSizePolicy.Preferred, QSizePolicy.Fixed)
self.setMinimumSize(QSize(100, 0))
self._set_first_element_as_empty = True
self._signal_class_filter = signal_class_filter or []
self._store_signal_config = store_signal_config
self.config.ndim_filter = ndim_filter or None
self._require_device = require_device
self._is_valid_input = False
if self.config.autocomplete:
self.autocomplete = True
# Note: Runtime arguments (e.g. device, default, arg_name) intentionally take
# precedence over values from the passed-in config. Full reconciliation and
# restoration of state between designer-provided config and runtime arguments
# is not yet implemented, as earlier attempts caused issues with QtDesigner.
self._device_update_register = self.bec_dispatcher.client.callbacks.register(
EventType.DEVICE_UPDATE, self.update_signals_from_filters
)
self.currentTextChanged.connect(self.on_text_changed)
# Kind filtering is always applied; class filtering is additive. If signal_filter is None,
# we default to hinted+normal, even when signal_class_filter is empty or None. To disable
# kinds, pass an explicit signal_filter or toggle include_* after init.
if signal_filter is not None:
self.set_filter(signal_filter)
else:
self.set_filter([Kind.hinted, Kind.normal, Kind.config])
self.set_filter(signal_filter or [Kind.hinted, Kind.normal, Kind.config])
if device is not None:
self.set_device(device)
if default is not None:
self.set_signal(default)
self.check_validity(self.currentText())
@staticmethod
def _process_config(config: SignalComboBoxConfig | dict | None) -> SignalComboBoxConfig:
if config is None:
return SignalComboBoxConfig(widget_class="SignalComboBox")
return SignalComboBoxConfig.model_validate(config)
@SafeSlot(str)
def set_signal(self, signal: str):
"""Set the current signal if it is available in the combobox."""
display_text = self._display_text_for_signal(signal)
if display_text is None:
logger.warning(
f"Signal {signal} not found for device {self.device} and filtered selection {self.signal_filter}."
)
return
self.setCurrentText(display_text)
self.config.default = signal
@SafeSlot(str)
def set_device(self, device: str | None):
"""
Set the device. When signal_class_filter is active, ensures base-class
logic runs and then refreshes the signal list to show only signals from
that device matching the signal class filter.
Args:
device(str): device name.
"""
super().set_device(device)
if self._signal_class_filter:
# Refresh the signal list to show only this device's signals
self.update_signals_from_signal_classes()
"""Set the device that scopes kind-based signal filtering."""
if not self.validate_device(device):
self._device = None
else:
self._device = device
self.config.device = self._device
self.update_signals_from_filters()
@SafeSlot()
@SafeSlot(dict, dict)
def update_signals_from_filters(
self, content: dict | None = None, metadata: dict | None = None
):
"""Update the filters for the combobox.
When signal_class_filter is active, skip the normal Kind-based filtering.
Args:
content (dict | None): Content dictionary from BEC event.
metadata (dict | None): Metadata dictionary from BEC event.
"""
super().update_signals_from_filters(content, metadata)
"""Refresh available signals from the current device and filters."""
self.config.signal_filter = [kind.name for kind in self.signal_filter]
if self._signal_class_filter:
self.update_signals_from_signal_classes()
return
# pylint: disable=protected-access
if FilterIO._find_handler(self) is ComboBoxFilterHandler:
if len(self._config_signals) > 0:
self.insertItem(
len(self._hinted_signals) + len(self._normal_signals), "Config Signals"
)
self.model().item(len(self._hinted_signals) + len(self._normal_signals)).setEnabled(
False
)
if len(self._normal_signals) > 0:
self.insertItem(len(self._hinted_signals), "Normal Signals")
self.model().item(len(self._hinted_signals)).setEnabled(False)
if len(self._hinted_signals) > 0:
self.insertItem(0, "Hinted Signals")
self.model().item(0).setEnabled(False)
if not self.validate_device(self._device):
self._device = None
self.config.device = None
self._set_signal_groups([], [], [])
return
device = self.get_device_object(self._device)
device_info = device._info.get("signals", {})
if isinstance(device, BECSignal):
self._set_signal_groups([(self._device, {})], [], [])
return
self._set_signal_groups(
signal_items_for_kind(
kind=Kind.hinted,
signal_filter=self.signal_filter,
device_info=device_info,
device_name=self._device,
),
signal_items_for_kind(
kind=Kind.normal,
signal_filter=self.signal_filter,
device_info=device_info,
device_name=self._device,
),
signal_items_for_kind(
kind=Kind.config,
signal_filter=self.signal_filter,
device_info=device_info,
device_name=self._device,
),
)
@Property(str)
def device(self) -> str:
"""Selected device."""
return self._device or ""
@device.setter
def device(self, value: str):
self.set_device(value)
@Property(bool)
def include_hinted_signals(self):
"""Include hinted signals."""
return Kind.hinted in self.signal_filter
@include_hinted_signals.setter
def include_hinted_signals(self, value: bool):
self._set_kind_filter_enabled(Kind.hinted, value)
@Property(bool)
def include_normal_signals(self):
"""Include normal signals."""
return Kind.normal in self.signal_filter
@include_normal_signals.setter
def include_normal_signals(self, value: bool):
self._set_kind_filter_enabled(Kind.normal, value)
@Property(bool)
def include_config_signals(self):
"""Include config signals."""
return Kind.config in self.signal_filter
@include_config_signals.setter
def include_config_signals(self, value: bool):
self._set_kind_filter_enabled(Kind.config, value)
@SafeProperty(bool)
def set_first_element_as_empty(self) -> bool:
"""
Whether the first element in the combobox should be empty.
This is useful to allow the user to select a device from the list.
"""
"""Whether an empty choice is inserted as the first item."""
return self._set_first_element_as_empty
@set_first_element_as_empty.setter
def set_first_element_as_empty(self, value: bool) -> None:
"""
Set whether the first element in the combobox should be empty.
This is useful to allow the user to select a device from the list.
Args:
value (bool): True if the first element should be empty, False otherwise.
"""
self._set_first_element_as_empty = value
if self._set_first_element_as_empty:
self.insertItem(0, "")
if value:
if self.count() == 0 or self.itemText(0) != "":
self.insertItem(0, "")
self.setCurrentIndex(0)
else:
if self.count() > 0 and self.itemText(0) == "":
self.removeItem(0)
elif self.count() > 0 and self.itemText(0) == "":
self.removeItem(0)
@SafeProperty("QStringList")
def signal_class_filter(self) -> list[str]:
"""
Get the list of signal classes to filter.
Returns:
list[str]: List of signal class names to filter.
"""
"""Signal class names used to build the signal list."""
return self._signal_class_filter
@signal_class_filter.setter
def signal_class_filter(self, value: list[str] | None):
"""
Set the signal class filter.
Args:
value (list[str] | None): List of signal class names to filter, or None/empty
to disable class-based filtering and revert to the default behavior.
"""
normalized_value = value or []
self._signal_class_filter = normalized_value
self.config.signal_class_filter = normalized_value
if self._signal_class_filter:
self.update_signals_from_signal_classes()
else:
self.update_signals_from_filters()
self._signal_class_filter = value or []
self.config.signal_class_filter = self._signal_class_filter
self.update_signals_from_filters()
@SafeProperty(int)
def ndim_filter(self) -> int:
"""Dimensionality filter for signals."""
"""Dimensionality filter for signal-class based lists."""
return self.config.ndim_filter if isinstance(self.config.ndim_filter, int) else -1
@ndim_filter.setter
def ndim_filter(self, value: int):
self.config.ndim_filter = None if value < 0 else value
if self._signal_class_filter:
self.update_signals_from_signal_classes(ndim_filter=self.config.ndim_filter)
self.update_signals_from_filters()
@SafeProperty(bool)
def require_device(self) -> bool:
"""
If True, signals are only shown/validated when a device is set.
Note:
This property affects list rebuilding only when a signal_class_filter
is active. Without a signal class filter, the available signals are
managed by the standard Kind-based filtering.
"""
"""Whether validation/listing requires a selected device."""
return self._require_device
@require_device.setter
def require_device(self, value: bool):
self._require_device = value
# Rebuild list when toggled, but only when using signal_class_filter
if self._signal_class_filter:
self.update_signals_from_signal_classes()
self.update_signals_from_filters()
def set_to_obj_name(self, obj_name: str) -> bool:
"""
Set the combobox to the object name of the signal.
@SafeProperty(bool)
def autocomplete(self) -> bool:
"""Whether autocomplete suggestions are enabled while editing."""
return self.config.autocomplete
Args:
obj_name (str): Object name of the signal.
@autocomplete.setter
def autocomplete(self, value: bool) -> None:
self.config.autocomplete = value
if value:
completer = QCompleter(self._completer_model, self)
self.setCompleter(completer)
else:
self._restore_default_completer()
Returns:
bool: True if the object name was found and set, False otherwise.
"""
for i in range(self.count()):
signal_data = self.itemData(i)
if signal_data and signal_data.get("obj_name") == obj_name:
self.setCurrentIndex(i)
return True
@property
def signals(self) -> list[str | tuple[str, dict]]:
"""Available signals after filtering."""
return self._signals
@signals.setter
def signals(self, value: list[str | tuple[str, dict]]):
self._signals = value
self.config.signals = [entry[0] if isinstance(entry, tuple) else entry for entry in value]
self._replace_signal_items()
@property
def signal_filter(self) -> set[Kind]:
"""Signal kind filters."""
return self._signal_filter
@property
def is_valid_input(self) -> bool:
"""Whether the current text represents a valid signal selection."""
return self._is_valid_input
@property
def selected_signal_comp_name(self) -> str:
"""Component name for the current signal, falling back to object name."""
index = self._find_signal_index(self.currentText())
if index < 0:
return self.get_signal_name()
signal_info = self.itemData(index)
if isinstance(signal_info, dict):
return signal_info.get("component_name") or self.get_signal_name()
return self.get_signal_name()
def set_filter(self, filter_selection: Kind | str | list[Kind | str] | None):
"""Enable one or more signal kind filters."""
if filter_selection is None:
return
filters = filter_selection if isinstance(filter_selection, list) else [filter_selection]
for signal_filter in filters:
kind = self._normalize_kind(signal_filter)
if kind is not None:
self._signal_filter.add(kind)
self.update_signals_from_filters()
def get_available_filters(self) -> list[Kind]:
"""Return available signal kind filters."""
return [Kind.hinted, Kind.normal, Kind.config]
def get_device_object(self, device: str) -> object | None:
"""Return a BEC device object by name."""
dev = getattr(self.dev, device, None)
if dev is None:
logger.warning(f"Device {device} not found in devicemanager.")
return None
return dev
def validate_device(self, device: str | None, raise_on_false: bool = False) -> bool:
"""Validate that a device exists in the current device manager."""
if device in self.dev:
return True
if raise_on_false:
raise ValueError(f"Device {device} not found in devicemanager.")
return False
def set_to_first_enabled(self) -> bool:
"""
Set the combobox to the first enabled item.
def validate_signal(self, signal: str) -> bool:
"""Validate a signal by display text, object name, or component name."""
return self._display_text_for_signal(signal) is not None
Returns:
bool: True if an enabled item was found and set, False otherwise.
"""
for i in range(self.count()):
if self.model().item(i).isEnabled():
self.setCurrentIndex(i)
def set_to_obj_name(self, obj_name: str) -> bool:
"""Select the item whose signal config has the given object name."""
index = self._find_signal_index(obj_name)
if index < 0:
return False
self.setCurrentIndex(index)
return True
def set_to_first_enabled(self) -> bool:
"""Select the first enabled item."""
for index in range(self.count()):
item = self.model().item(index)
if item is not None and item.isEnabled():
self.setCurrentIndex(index)
return True
return False
def get_signal_name(self) -> str:
"""
Get the signal name from the combobox.
Returns:
str: The signal name.
"""
signal_name = self.currentText()
index = self.findText(signal_name)
if index == -1:
return signal_name
"""Return the selected signal object name when available."""
current_text = self.currentText()
index = self._find_signal_index(current_text)
if index < 0:
return current_text
signal_info = self.itemData(index)
if signal_info:
signal_name = signal_info.get("obj_name", signal_name)
return signal_name if signal_name else ""
if isinstance(signal_info, dict):
return signal_info.get("obj_name") or current_text
return current_text
def get_signal_config(self) -> dict | None:
"""
Get the signal config from the combobox for the currently selected signal.
Returns:
dict | None: The signal configuration dictionary or None if not available.
"""
"""Return the selected signal config if item-data storage is enabled."""
if not self._store_signal_config:
return None
index = self.currentIndex()
if index == -1:
return None
signal_info = self.itemData(index)
return signal_info if signal_info else None
signal_info = self.itemData(self.currentIndex())
return signal_info if isinstance(signal_info, dict) else None
def update_signals_from_signal_classes(self, ndim_filter: int | list[int] | None = None):
"""
Update the combobox with signals filtered by signal classes and optionally by ndim.
Uses device_manager.get_bec_signals() to retrieve signals.
If a device is set, only shows signals from that device.
Args:
ndim_filter (int | list[int] | None): Filter signals by dimensionality.
If provided, only signals with matching ndim will be included.
Can be a single int or a list of ints. Use None to include all dimensions.
If not provided, uses the previously set ndim_filter.
"""
"""Refresh signals from device_manager.get_bec_signals for class-based filtering."""
if not self._signal_class_filter:
return
if self._require_device and not self._device:
self.clear()
self._signals = []
FilterIO.set_selection(widget=self, selection=self._signals)
self.signals = []
return
# Update stored ndim_filter if a new one is provided
if ndim_filter is not None:
self.config.ndim_filter = ndim_filter
self.clear()
# Get signals with ndim filtering applied at the FilterIO level
signals = FilterIO.update_with_signal_class(
widget=self,
signal_class_filter=self._signal_class_filter,
signals = get_bec_signals_for_classes(
client=self.client,
ndim_filter=self.config.ndim_filter, # Pass ndim_filter to FilterIO
signal_class_filter=self._signal_class_filter,
ndim_filter=self.config.ndim_filter,
)
# Track signals for validation and FilterIO selection
self.clear()
self._signals = []
for device_name, signal_name, signal_config in signals:
# Filter by device if one is set
if self._device and device_name != self._device:
continue
if self._signal_filter:
@@ -339,53 +435,44 @@ class SignalComboBox(DeviceSignalInputBase, QComboBox):
}:
continue
# Get storage_name for tooltip
storage_name = signal_config.get("storage_name", "")
# Store the full signal config as item data if requested
if self._store_signal_config:
self.addItem(signal_name, signal_config)
else:
self.addItem(signal_name)
# Track for validation
self._signals.append(signal_name)
# Set tooltip to storage_name (Qt.ToolTipRole = 3)
storage_name = signal_config.get("storage_name", "")
if storage_name:
self.setItemData(self.count() - 1, storage_name, Qt.ItemDataRole.ToolTipRole)
# Keep FilterIO selection in sync for validate_signal
FilterIO.set_selection(widget=self, selection=self._signals)
self.config.signals = [
entry if isinstance(entry, str) else entry[0] for entry in self._signals
]
self._update_completer_model(self.config.signals)
if self._set_first_element_as_empty and self.count() > 0 and self.itemText(0) != "":
self.insertItem(0, "")
@SafeSlot()
def reset_selection(self):
"""Reset the selection of the combobox."""
self.clear()
self.setItemText(0, "Select a device")
"""Reset the current selection and refresh available signals."""
self.setCurrentText("")
self.update_signals_from_filters()
self.device_signal_changed.emit("")
@SafeSlot(str)
def on_text_changed(self, text: str):
"""Validate and emit only when the signal is valid.
For a positioner, the readback value has to be renamed to the device name.
When using signal_class_filter, device validation is skipped.
"""
"""Validate the current text when edited or selected."""
self.check_validity(text)
@Slot(str)
def check_validity(self, input_text: str) -> None:
"""Check if the current value is a valid signal and emit only when valid."""
"""Validate current text and update visual state."""
if self._signal_class_filter:
if self._require_device and (not self._device or not input_text):
is_valid = False
else:
is_valid = self.validate_signal(input_text)
is_valid = not (self._require_device and not self._device) and self.validate_signal(
input_text
)
else:
if self._require_device and not self.validate_device(self._device):
is_valid = False
else:
is_valid = self.validate_device(self._device) and self.validate_signal(input_text)
is_valid = self.validate_device(self._device) and self.validate_signal(input_text)
if is_valid:
self._is_valid_input = True
@@ -397,18 +484,105 @@ class SignalComboBox(DeviceSignalInputBase, QComboBox):
if self.isEnabled():
self.setStyleSheet("border: 1px solid red;")
@property
def selected_signal_comp_name(self) -> str:
return dict(self.signals).get(self.currentText(), {}).get("component_name", "")
def cleanup(self):
"""Cleanup the widget."""
self.bec_dispatcher.client.callbacks.remove(self._device_update_register)
super().cleanup()
@property
def is_valid_input(self) -> bool:
"""Whether the current text represents a valid signal selection."""
return self._is_valid_input
@staticmethod
def _normalize_kind(value: Kind | str) -> Kind | None:
if isinstance(value, Kind):
return value
return Kind.__members__.get(value) or Kind.__members__.get(value.lower())
def _set_kind_filter_enabled(self, kind: Kind, enabled: bool):
if enabled:
self._signal_filter.add(kind)
else:
self._signal_filter.discard(kind)
self.update_signals_from_filters()
def _set_signal_groups(
self,
hinted: list[tuple[str, dict]],
normal: list[tuple[str, dict]],
config: list[tuple[str, dict]],
) -> None:
self._hinted_signals = hinted
self._normal_signals = normal
self._config_signals = config
self.signals = self._hinted_signals + self._normal_signals + self._config_signals
self._insert_group_headers()
def _replace_signal_items(self):
replace_combobox_items(self, self._signals)
self._update_completer_model(self._signal_display_texts(self._signals))
if self._set_first_element_as_empty and self.count() > 0 and self.itemText(0) != "":
self.insertItem(0, "")
def _insert_group_headers(self):
offset = (
1
if self._set_first_element_as_empty and self.count() > 0 and self.itemText(0) == ""
else 0
)
if self._config_signals:
index = offset + len(self._hinted_signals) + len(self._normal_signals)
self.insertItem(index, "Config Signals")
self.model().item(index).setEnabled(False)
if self._normal_signals:
index = offset + len(self._hinted_signals)
self.insertItem(index, "Normal Signals")
self.model().item(index).setEnabled(False)
if self._hinted_signals:
index = offset
self.insertItem(index, "Hinted Signals")
self.model().item(index).setEnabled(False)
def _display_text_for_signal(self, signal: str) -> str | None:
for entry in self._signals:
display_text = entry[0] if isinstance(entry, tuple) else entry
if display_text == signal:
return display_text
if isinstance(entry, tuple) and self._signal_info_matches(entry[1], signal):
return display_text
return None
@staticmethod
def _signal_info_matches(signal_info: dict, signal: str) -> bool:
return signal in {
signal_info.get("obj_name"),
signal_info.get("component_name"),
signal_info.get("component_name", "").replace(".", "_"),
}
def _find_signal_index(self, signal: str) -> int:
index = self.findText(signal)
if index >= 0:
return index
for item_index in range(self.count()):
signal_info = self.itemData(item_index)
if isinstance(signal_info, dict) and self._signal_info_matches(signal_info, signal):
return item_index
return -1
@staticmethod
def _signal_display_texts(signals: list[str | tuple[str, dict]]) -> list[str]:
return [entry[0] if isinstance(entry, tuple) else entry for entry in signals]
def _update_completer_model(self, items: list[str]) -> None:
self._completer_model.setStringList(items)
def _restore_default_completer(self) -> None:
if self.completer() is not None and self.completer().model() == self.model():
return
current_text = self.currentText()
self.setEditable(False)
self.setEditable(True)
self.setCurrentText(current_text)
if __name__ == "__main__": # pragma: no cover
# pylint: disable=import-outside-toplevel
from qtpy.QtWidgets import QApplication, QVBoxLayout, QWidget
from bec_widgets.utils.colors import apply_theme
@@ -417,16 +591,14 @@ if __name__ == "__main__": # pragma: no cover
apply_theme("dark")
widget = QWidget()
widget.setFixedSize(200, 200)
layout = QVBoxLayout()
widget.setLayout(layout)
layout = QVBoxLayout(widget)
box = SignalComboBox(
device="waveform",
signal_class_filter=["AsyncSignal", "AsyncMultiSignal"],
ndim_filter=[1, 2],
store_signal_config=True,
signal_filter=[Kind.hinted, Kind.normal, Kind.config],
) # change signal filter class to test
box.setEditable(True)
)
layout.addWidget(box)
widget.show()
app.exec_()
@@ -1,17 +0,0 @@
def main(): # pragma: no cover
from qtpy import PYSIDE6
if not PYSIDE6:
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from bec_widgets.widgets.control.device_input.signal_line_edit.signal_line_edit_plugin import (
SignalLineEditPlugin,
)
QPyDesignerCustomWidgetCollection.addCustomWidget(SignalLineEditPlugin())
if __name__ == "__main__": # pragma: no cover
main()
@@ -1,169 +0,0 @@
from bec_lib.device import Positioner
from qtpy.QtCore import QSize, Signal, Slot
from qtpy.QtGui import QPainter, QPaintEvent, QPen
from qtpy.QtWidgets import QCompleter, QLineEdit, QSizePolicy
from bec_widgets.utils.colors import get_accent_colors
from bec_widgets.utils.ophyd_kind_util import Kind
from bec_widgets.widgets.control.device_input.base_classes.device_signal_input_base import (
DeviceSignalInputBase,
)
class SignalLineEdit(DeviceSignalInputBase, QLineEdit):
"""
Line edit widget for device input with autocomplete for device names.
Args:
parent: Parent widget.
client: BEC client object.
config: Device input configuration.
gui_id: GUI ID.
device_filter: Device filter, name of the device class from BECDeviceFilter and BECReadoutPriority. Check DeviceInputBase for more details.
default: Default device name.
arg_name: Argument name, can be used for the other widgets which has to call some other function in bec using correct argument names.
"""
USER_ACCESS = ["_is_valid_input", "set_signal", "set_device", "signals"]
device_signal_changed = Signal(str)
PLUGIN = True
RPC = False
ICON_NAME = "vital_signs"
def __init__(
self,
parent=None,
client=None,
config: DeviceSignalInputBase = None,
gui_id: str | None = None,
device: str | None = None,
signal_filter: str | list[str] | None = None,
default: str | None = None,
arg_name: str | None = None,
**kwargs,
):
self.__is_valid_input = False
super().__init__(parent=parent, client=client, gui_id=gui_id, config=config, **kwargs)
self._accent_colors = get_accent_colors()
self.completer = QCompleter(self)
self.setCompleter(self.completer)
if arg_name is not None:
self.config.arg_name = arg_name
self.arg_name = arg_name
if default is not None:
self.set_device(default)
self.setSizePolicy(QSizePolicy.Preferred, QSizePolicy.Fixed)
self.setMinimumSize(QSize(100, 0))
# We do not consider the config that is passed here, this produced problems
# with QtDesigner, since config and input arguments may differ and resolve properly
# Implementing this logic and config recoverage is postponed.
if signal_filter is not None:
self.set_filter(signal_filter)
else:
self.set_filter([Kind.hinted, Kind.normal, Kind.config])
if device is not None:
self.set_device(device)
if default is not None:
self.set_signal(default)
self.textChanged.connect(self.check_validity)
self.check_validity(self.text())
@property
def _is_valid_input(self) -> bool:
"""
Check if the current value is a valid device name.
Returns:
bool: True if the current value is a valid device name, False otherwise.
"""
return self.__is_valid_input
@_is_valid_input.setter
def _is_valid_input(self, value: bool) -> None:
self.__is_valid_input = value
def get_current_device(self) -> object:
"""
Get the current device object based on the current value.
Returns:
object: Device object, can be device of type Device, Positioner, Signal or ComputedSignal.
"""
dev_name = self.text()
return self.get_device_object(dev_name)
def paintEvent(self, event: QPaintEvent) -> None:
"""Extend the paint event to set the border color based on the validity of the input.
Args:
event (PySide6.QtGui.QPaintEvent) : Paint event.
"""
super().paintEvent(event)
painter = QPainter(self)
pen = QPen()
pen.setWidth(2)
if self._is_valid_input is False and self.isEnabled() is True:
pen.setColor(self._accent_colors.emergency)
painter.setPen(pen)
painter.drawRect(self.rect().adjusted(1, 1, -1, -1))
@Slot(str)
def check_validity(self, input_text: str) -> None:
"""
Check if the current value is a valid device name.
"""
if self.validate_signal(input_text) is True:
self._is_valid_input = True
self.on_text_changed(input_text)
else:
self._is_valid_input = False
self.update()
@Slot(str)
def on_text_changed(self, text: str):
"""Slot for text changed. If a device is selected and the signal is changed and valid it emits a signal.
For a positioner, the readback value has to be renamed to the device name.
Args:
text (str): Text in the combobox.
"""
print("test")
if self.validate_device(self.device) is False:
return
if self.validate_signal(text) is False:
return
if text == "readback" and isinstance(self.get_device_object(self.device), Positioner):
device_signal = self.device
else:
device_signal = f"{self.device}_{text}"
self.device_signal_changed.emit(device_signal)
if __name__ == "__main__": # pragma: no cover
# pylint: disable=import-outside-toplevel
from qtpy.QtWidgets import QApplication, QVBoxLayout, QWidget
from bec_widgets.utils.colors import apply_theme
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import (
DeviceComboBox,
)
app = QApplication([])
apply_theme("dark")
widget = QWidget()
widget.setFixedSize(200, 200)
layout = QVBoxLayout()
widget.setLayout(layout)
device_line_edit = DeviceComboBox()
device_line_edit.filter_to_positioner = True
signal_line_edit = SignalLineEdit()
device_line_edit.device_selected.connect(signal_line_edit.set_device)
layout.addWidget(device_line_edit)
layout.addWidget(signal_line_edit)
widget.show()
app.exec_()
@@ -1 +0,0 @@
{'files': ['signal_line_edit.py']}
@@ -1,59 +0,0 @@
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from qtpy.QtWidgets import QWidget
from bec_widgets.utils.bec_designer import designer_material_icon
from bec_widgets.widgets.control.device_input.signal_line_edit.signal_line_edit import (
SignalLineEdit,
)
DOM_XML = """
<ui language='c++'>
<widget class='SignalLineEdit' name='signal_line_edit'>
</widget>
</ui>
"""
class SignalLineEditPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
def __init__(self):
super().__init__()
self._form_editor = None
def createWidget(self, parent):
if parent is None:
return QWidget()
t = SignalLineEdit(parent)
return t
def domXml(self):
return DOM_XML
def group(self):
return "BEC Input Widgets"
def icon(self):
return designer_material_icon(SignalLineEdit.ICON_NAME)
def includeFile(self):
return "signal_line_edit"
def initialize(self, form_editor):
self._form_editor = form_editor
def isContainer(self):
return False
def isInitialized(self):
return self._form_editor is not None
def name(self):
return "SignalLineEdit"
def toolTip(self):
return ""
def whatsThis(self):
return self.toolTip()
@@ -26,7 +26,6 @@ from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
from bec_widgets.widgets.control.buttons.stop_button.stop_button import StopButton
from bec_widgets.widgets.control.scan_control.scan_group_box import ScanGroupBox
from bec_widgets.widgets.editors.scan_metadata.scan_metadata import ScanMetadata
from bec_widgets.widgets.utility.toggle.toggle import ToggleSwitch
class ScanParameterConfig(BaseModel):
@@ -84,7 +83,6 @@ class ScanControl(BECWidget, QWidget):
self.kwarg_boxes = []
self.expert_mode = False # TODO implement in the future versions
self.previous_scan = None
self.last_scan_found = None
# Widget Default Parameters
self.config.default_scan = default_scan
@@ -123,17 +121,12 @@ class ScanControl(BECWidget, QWidget):
scan_selection_layout.addWidget(self.comboBox_scan_selection, 1)
self.scan_selection_group.layout().addLayout(scan_selection_layout)
# Label to reload the last scan parameters within scan selection group box
self.toggle_layout = QHBoxLayout()
self.toggle_layout.addSpacerItem(
QSpacerItem(0, 0, QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Fixed)
# Button to reload the last scan parameters on demand.
self.last_scan_button = QPushButton(
"Restore last scan parameters", self.scan_selection_group
)
self.last_scan_label = QLabel("Restore last scan parameters", self.scan_selection_group)
self.toggle = ToggleSwitch(parent=self.scan_selection_group, checked=False)
self.toggle.enabled.connect(self.request_last_executed_scan_parameters)
self.toggle_layout.addWidget(self.last_scan_label)
self.toggle_layout.addWidget(self.toggle)
self.scan_selection_group.layout().addLayout(self.toggle_layout)
self.last_scan_button.clicked.connect(self.request_last_executed_scan_parameters)
self.scan_selection_group.layout().addWidget(self.last_scan_button)
self.scan_selection_group.setSizePolicy(
QSizePolicy.Policy.Preferred, QSizePolicy.Policy.Fixed
)
@@ -206,7 +199,6 @@ class ScanControl(BECWidget, QWidget):
"""Callback for scan selection combo box"""
selected_scan_name = self.comboBox_scan_selection.currentText()
self.scan_selected.emit(selected_scan_name)
self.request_last_executed_scan_parameters()
self.restore_scan_parameters(selected_scan_name)
@SafeSlot()
@@ -215,10 +207,6 @@ class ScanControl(BECWidget, QWidget):
"""
Requests the last executed scan parameters from BEC and restores them to the scan control widget.
"""
self.last_scan_found = False
if not self.toggle.checked:
return
current_scan = self.comboBox_scan_selection.currentText()
history = (
self.client.connector.xread(
@@ -246,8 +234,6 @@ class ScanControl(BECWidget, QWidget):
if merged and self.kwarg_boxes:
for box in self.kwarg_boxes:
box.set_parameters(merged)
self.last_scan_found = True
break
@SafeProperty(str)
@@ -496,8 +482,6 @@ class ScanControl(BECWidget, QWidget):
Args:
scan_name(str): Name of the scan to restore the parameters for.
"""
if self.last_scan_found is True:
return
scan_params = self.config.scans.get(scan_name, None)
if scan_params is None and self.previous_scan is None:
return
@@ -21,9 +21,9 @@ from qtpy.QtWidgets import (
)
from bec_widgets.utils.widget_io import WidgetIO
from bec_widgets.widgets.control.device_input.base_classes.device_input_base import BECDeviceFilter
from bec_widgets.widgets.control.device_input.device_line_edit.device_line_edit import (
DeviceLineEdit,
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import (
BECDeviceFilter,
DeviceComboBox,
)
logger = bec_logger.logger
@@ -164,8 +164,8 @@ class ScanCheckBox(QCheckBox):
class ScanGroupBox(QGroupBox):
WIDGET_HANDLER = {
ScanArgType.DEVICE: DeviceLineEdit,
ScanArgType.DEVICEBASE: DeviceLineEdit,
ScanArgType.DEVICE: DeviceComboBox,
ScanArgType.DEVICEBASE: DeviceComboBox,
ScanArgType.FLOAT: ScanDoubleSpinBox,
ScanArgType.INT: ScanSpinBox,
ScanArgType.BOOL: ScanCheckBox,
@@ -271,9 +271,17 @@ class ScanGroupBox(QGroupBox):
continue
if default == "_empty":
default = None
widget = widget_class(parent=self.parent(), arg_name=arg_name, default=default)
if isinstance(widget, DeviceLineEdit):
widget.set_device_filter(BECDeviceFilter.DEVICE)
if widget_class is DeviceComboBox:
widget = widget_class(
parent=self.parent(),
arg_name=arg_name,
default=default,
device_filter=BECDeviceFilter.DEVICE,
autocomplete=True,
)
else:
widget = widget_class(parent=self.parent(), arg_name=arg_name, default=default)
if isinstance(widget, DeviceComboBox):
self.selected_devices[widget] = ""
widget.device_selected.connect(self.emit_device_selected)
if isinstance(widget, ScanLiteralsComboBox):
@@ -311,7 +319,7 @@ class ScanGroupBox(QGroupBox):
return
for widget in self.widgets[-len(self.inputs) :]:
if isinstance(widget, DeviceLineEdit):
if isinstance(widget, DeviceComboBox):
self.selected_devices[widget] = ""
widget.close()
widget.deleteLater()
@@ -323,7 +331,7 @@ class ScanGroupBox(QGroupBox):
def remove_all_widget_bundles(self):
"""Remove every widget bundle from the scan control layout."""
for widget in list(self.widgets):
if isinstance(widget, DeviceLineEdit):
if isinstance(widget, DeviceComboBox):
self.selected_devices.pop(widget, None)
widget.close()
widget.deleteLater()
@@ -360,8 +368,10 @@ class ScanGroupBox(QGroupBox):
for j in range(self.layout.columnCount()):
try: # In case that the bundle size changes
widget = self.layout.itemAtPosition(i, j).widget()
if isinstance(widget, DeviceLineEdit) and device_object:
if isinstance(widget, DeviceComboBox) and device_object:
value = widget.get_current_device()
elif isinstance(widget, DeviceComboBox):
value = widget.currentText()
else:
value = WidgetIO.get_value(widget)
args.append(value)
@@ -373,8 +383,10 @@ class ScanGroupBox(QGroupBox):
kwargs = {}
for i in range(self.layout.columnCount()):
widget = self.layout.itemAtPosition(1, i).widget()
if isinstance(widget, DeviceLineEdit) and device_object:
if isinstance(widget, DeviceComboBox) and device_object:
value = widget.get_current_device().name
elif isinstance(widget, DeviceComboBox):
value = widget.currentText()
elif isinstance(widget, ScanLiteralsComboBox):
value = widget.get_value()
else:
@@ -390,7 +402,7 @@ class ScanGroupBox(QGroupBox):
if item is not None:
widget = item.widget()
if widget is not None:
if isinstance(widget, DeviceLineEdit):
if isinstance(widget, DeviceComboBox):
widget_rows += 1
return widget_rows
+18 -7
View File
@@ -2,21 +2,16 @@ from __future__ import annotations
import json
from dataclasses import dataclass
from typing import Literal
from typing import TYPE_CHECKING, Literal
import numpy as np
import pyqtgraph as pg
from bec_lib import bec_logger, messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib.utils.import_utils import lazy_import, lazy_import_from
from pydantic import BaseModel, Field, field_validator
from qtpy.QtCore import QObject, Qt, QThread, QTimer, Signal
from qtpy.QtGui import QTransform
from scipy.interpolate import (
CloughTocher2DInterpolator,
LinearNDInterpolator,
NearestNDInterpolator,
)
from scipy.spatial import cKDTree
from toolz import partition
from bec_widgets.utils.bec_connector import ConnectionConfig
@@ -32,6 +27,22 @@ from bec_widgets.widgets.plots.plot_base import PlotBase
logger = bec_logger.logger
if TYPE_CHECKING:
from scipy.interpolate import (
CloughTocher2DInterpolator,
LinearNDInterpolator,
NearestNDInterpolator,
)
from scipy.spatial import cKDTree
else:
CloughTocher2DInterpolator, LinearNDInterpolator, NearestNDInterpolator = lazy_import_from(
"scipy.interpolate",
["CloughTocher2DInterpolator", "LinearNDInterpolator", "NearestNDInterpolator"],
)
cKDTree = lazy_import_from("scipy.spatial", ["cKDTree"])
class HeatmapDeviceSignal(BaseModel):
"""The configuration of a signal in the scatter waveform widget."""
@@ -3,8 +3,10 @@ from qtpy.QtWidgets import QHBoxLayout, QSizePolicy, QWidget
from bec_widgets.utils.toolbars.actions import NoCheckDelegate, WidgetAction
from bec_widgets.utils.toolbars.bundles import ToolbarBundle, ToolbarComponents
from bec_widgets.utils.toolbars.connections import BundleConnection
from bec_widgets.widgets.control.device_input.base_classes.device_input_base import BECDeviceFilter
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import (
BECDeviceFilter,
DeviceComboBox,
)
class MotorSelection(QWidget):
@@ -6,8 +6,10 @@ from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.toolbars.actions import DeviceComboBoxAction, WidgetAction
from bec_widgets.utils.toolbars.bundles import ToolbarComponents
from bec_widgets.utils.toolbars.toolbar import ToolbarBundle
from bec_widgets.widgets.control.device_input.base_classes.device_input_base import BECDeviceFilter
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import (
BECDeviceFilter,
DeviceComboBox,
)
from bec_widgets.widgets.utility.visual.colormap_widget.colormap_widget import BECColorMapWidget
@@ -58,7 +58,7 @@
</widget>
</item>
<item row="0" column="1">
<widget class="DeviceLineEdit" name="device_x"/>
<widget class="DeviceComboBox" name="device_x"/>
</item>
<item row="1" column="0">
<widget class="QLabel" name="label_2">
@@ -87,7 +87,7 @@
</widget>
</item>
<item row="0" column="1">
<widget class="DeviceLineEdit" name="device_y"/>
<widget class="DeviceComboBox" name="device_y"/>
</item>
<item row="1" column="0">
<widget class="QLabel" name="label_4">
@@ -116,7 +116,7 @@
</widget>
</item>
<item row="0" column="1">
<widget class="DeviceLineEdit" name="device_z"/>
<widget class="DeviceComboBox" name="device_z"/>
</item>
<item row="1" column="0">
<widget class="QLabel" name="label_6">
@@ -135,9 +135,9 @@
</widget>
<customwidgets>
<customwidget>
<class>DeviceLineEdit</class>
<extends>QLineEdit</extends>
<header>device_line_edit</header>
<class>DeviceComboBox</class>
<extends>QComboBox</extends>
<header>device_combo_box</header>
</customwidget>
<customwidget>
<class>ToggleSwitch</class>
@@ -79,7 +79,7 @@ class CurveRow(QTreeWidgetItem):
Columns:
0: Actions (delete or "Add DAP" if source=device)
1..2: DeviceLineEdit and QLineEdit if source=device, or "Model" label and DapComboBox if source=dap
1..2: DeviceComboBox and QLineEdit if source=device, or "Model" label and DapComboBox if source=dap
3: ColorButton
4: Style QComboBox
5: Pen width QSpinBox
@@ -10,6 +10,7 @@ from bec_lib.device import Positioner
from bec_lib.endpoints import MessageEndpoints
from bec_lib.lmfit_serializer import serialize_lmfit_params, serialize_param_object
from bec_lib.scan_data_container import ScanDataContainer
from bec_lib.utils.import_utils import lazy_import
from pydantic import Field, ValidationError, field_validator
from qtpy.QtCore import Qt, QTimer, Signal
from qtpy.QtWidgets import (
@@ -54,13 +55,7 @@ _DAP_PARAM = object()
if TYPE_CHECKING: # pragma: no cover
import lmfit # type: ignore
else:
try:
import lmfit # type: ignore
except Exception as e: # pragma: no cover
logger.warning(
f"lmfit could not be imported: {e}. Custom DAP functionality will be unavailable."
)
lmfit = None
lmfit = lazy_import("lmfit")
# noinspection PyDataclass
@@ -25,9 +25,7 @@ from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.colors import get_accent_colors
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
from bec_widgets.utils.ophyd_kind_util import Kind
from bec_widgets.widgets.control.device_input.device_line_edit.device_line_edit import (
DeviceLineEdit,
)
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
from bec_widgets.widgets.control.device_input.signal_combobox.signal_combobox import SignalComboBox
if TYPE_CHECKING:
@@ -58,7 +56,7 @@ class ChoiceDialog(QDialog):
layout = QHBoxLayout()
self._device_field = DeviceLineEdit(parent=parent, client=client)
self._device_field = DeviceComboBox(parent=parent, client=client)
self._signal_field = SignalComboBox(parent=parent, client=client)
layout.addWidget(self._device_field)
layout.addWidget(self._signal_field)
@@ -73,10 +71,13 @@ class ChoiceDialog(QDialog):
self._signal_field.include_config_signals = show_config
self.setLayout(layout)
self._device_field.textChanged.connect(self._update_device)
self._device_field.currentTextChanged.connect(self._update_device)
if device:
self._device_field.set_device(device)
if signal and signal in set(s[0] for s in self._signal_field.signals):
available_signals = {
entry[0] if isinstance(entry, tuple) else entry for entry in self._signal_field.signals
}
if signal and signal in available_signals:
self._signal_field.set_signal(signal)
def _display_error(self):
@@ -97,19 +98,19 @@ class ChoiceDialog(QDialog):
self._device_field.set_device(device)
self._signal_field.set_device(device)
self._device_field.setStyleSheet(
f"QLineEdit {{ border-style: solid; border-width: 2px; border-color: {self._accent_colors.success.name() if self._accent_colors else 'green'}}}"
f"QComboBox {{ border-style: solid; border-width: 2px; border-color: {self._accent_colors.success.name() if self._accent_colors else 'green'}}}"
)
self.button_box.button(QDialogButtonBox.Ok).setEnabled(True)
else:
self._device_field.setStyleSheet(
f"QLineEdit {{ border-style: solid; border-width: 2px; border-color: {self._accent_colors.emergency.name() if self._accent_colors else 'red'}}}"
f"QComboBox {{ border-style: solid; border-width: 2px; border-color: {self._accent_colors.emergency.name() if self._accent_colors else 'red'}}}"
)
self.button_box.button(QDialogButtonBox.Ok).setEnabled(False)
self._signal_field.clear()
def accept(self):
self.accepted_output.emit(
self._device_field.text(), self._signal_field.selected_signal_comp_name
self._device_field.currentText(), self._signal_field.selected_signal_comp_name
)
self.cleanup()
return super().accept()
@@ -170,7 +171,7 @@ class SignalLabel(BECWidget, QWidget):
client (BECClient, optional): The BEC client. Defaults to None.
device (str, optional): The device name. Defaults to None.
signal (str, optional): The signal name. Defaults to None.
selection_dialog_config (DeviceSignalInputBaseConfig | dict, optional): Configuration for the signal selection dialog.
selection_dialog_config: Configuration for the signal selection dialog.
show_select_button (bool, optional): Whether to show the select button. Defaults to True.
show_default_units (bool, optional): Whether to show default units. Defaults to False.
custom_label (str, optional): Custom label for the widget. Defaults to "".
+4 -1
View File
@@ -1,6 +1,6 @@
[project]
name = "bec_widgets"
version = "3.7.1"
version = "3.9.1"
description = "BEC Widgets"
requires-python = ">=3.11"
classifiers = [
@@ -58,6 +58,9 @@ dev = [
]
qtermwidget = ["pyside6_qtermwidget"]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
-1
View File
@@ -59,5 +59,4 @@ def test_run_line_scan_with_parameters_e2e(scan_control, bec_client_lib, qtbot):
last_scan = queue.scan_storage.storage[-1]
assert last_scan.status_message.info["scan_name"] == scan_name
assert last_scan.status_message.info["exp_time"] == kwargs["exp_time"]
assert last_scan.status_message.info["scan_motors"] == [args["device"]]
assert last_scan.status_message.info["num_points"] == kwargs["steps"]
-1
View File
@@ -84,7 +84,6 @@ def test_scan_metadata_for_custom_scan(
last_scan = queue.scan_storage.storage[-1]
assert last_scan.status_message.info["scan_name"] == scan_name
assert last_scan.status_message.info["exp_time"] == kwargs["exp_time"]
assert last_scan.status_message.info["scan_motors"] == [args["device"]]
assert last_scan.status_message.info["num_points"] == kwargs["steps"]
if valid:
-1
View File
@@ -71,7 +71,6 @@ def bec_queue_msg_full():
},
"report_instructions": [{"scan_progress": 20}],
"scan_id": "2d704cc3-c172-404c-866d-608ce09fce40",
"scan_motors": ["samx"],
"scan_number": 1289,
}
],
+54 -114
View File
@@ -1,152 +1,92 @@
from unittest import mock
import pytest
from bec_lib.device import ReadoutPriority
from qtpy.QtWidgets import QWidget
from bec_widgets.widgets.control.device_input.base_classes.device_input_base import (
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import (
BECDeviceFilter,
DeviceInputBase,
DeviceComboBox,
DeviceInputConfig,
)
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
from .client_mocks import mocked_client
from .conftest import create_widget
# DeviceInputBase is meant to be mixed in a QWidget
class DeviceInputWidget(DeviceInputBase, QWidget):
"""Thin wrapper around DeviceInputBase to make it a QWidget"""
def __init__(self, parent=None, client=None, config=None, gui_id=None, **kwargs):
super().__init__(parent=parent, client=client, gui_id=gui_id, config=config, **kwargs)
@pytest.fixture
def device_input_base(qtbot, mocked_client):
"""Fixture with mocked FilterIO and WidgetIO"""
with mock.patch("bec_widgets.utils.filter_io.FilterIO.set_selection"):
with mock.patch("bec_widgets.utils.widget_io.WidgetIO.set_value"):
with mock.patch("bec_widgets.utils.widget_io.WidgetIO.get_value"):
widget = create_widget(qtbot=qtbot, widget=DeviceInputWidget, client=mocked_client)
yield widget
def test_device_input_base_init(device_input_base):
"""Test init"""
assert device_input_base is not None
assert device_input_base.client is not None
assert isinstance(device_input_base, DeviceInputBase)
assert device_input_base.config.widget_class == "DeviceInputWidget"
assert device_input_base.config.device_filter == []
assert device_input_base.config.default is None
assert device_input_base.devices == []
def test_device_input_base_init_with_config(qtbot, mocked_client):
"""Test init with Config"""
def test_device_combobox_init_with_config(qtbot, mocked_client):
config = {
"widget_class": "DeviceInputWidget",
"widget_class": "DeviceComboBox",
"gui_id": "test_gui_id",
"device_filter": [BECDeviceFilter.POSITIONER],
"device_filter": [BECDeviceFilter.POSITIONER.value],
"default": "samx",
}
widget = DeviceInputWidget(client=mocked_client, config=config)
widget2 = DeviceInputWidget(
client=mocked_client, config=DeviceInputConfig.model_validate(config)
widget = create_widget(
qtbot=qtbot,
widget=DeviceComboBox,
client=mocked_client,
config=DeviceInputConfig.model_validate(config),
)
qtbot.addWidget(widget)
qtbot.addWidget(widget2)
qtbot.waitExposed(widget)
qtbot.waitExposed(widget2)
for w in [widget, widget2]:
assert w.config.gui_id == "test_gui_id"
assert w.config.device_filter == ["Positioner"]
assert w.config.default == "samx"
assert widget.config.gui_id == "test_gui_id"
assert widget.config.device_filter == ["Positioner"]
assert widget.config.default == "samx"
def test_device_input_base_set_device_filter(device_input_base):
"""Test device filter setter."""
device_input_base.set_device_filter(BECDeviceFilter.POSITIONER)
assert device_input_base.config.device_filter == ["Positioner"]
def test_device_combobox_set_device_filter(qtbot, mocked_client):
widget = create_widget(qtbot=qtbot, widget=DeviceComboBox, client=mocked_client)
widget.set_device_filter(BECDeviceFilter.POSITIONER)
assert widget.config.device_filter == ["Positioner"]
def test_device_input_base_set_device_filter_error(device_input_base):
"""Test set_device_filter with Noneexisting class. This should not raise. It writes a log message entry."""
device_input_base.set_device_filter("NonExistingClass")
assert device_input_base.device_filter == []
def test_device_combobox_set_device_filter_error(qtbot, mocked_client):
widget = create_widget(qtbot=qtbot, widget=DeviceComboBox, client=mocked_client)
widget.set_device_filter("NonExistingClass")
assert widget.device_filter == []
def test_device_input_base_set_default_device(device_input_base):
"""Test setting the default device. Also tests the update_devices method."""
device_input_base.set_device("samx")
assert device_input_base.config.default == None
device_input_base.set_device_filter(BECDeviceFilter.POSITIONER)
device_input_base.set_readout_priority_filter(ReadoutPriority.MONITORED)
device_input_base.set_device("samx")
assert device_input_base.config.default == "samx"
def test_device_combobox_set_default_device(qtbot, mocked_client):
widget = create_widget(qtbot=qtbot, widget=DeviceComboBox, client=mocked_client)
widget.set_device("samx")
assert widget.config.default == "samx"
def test_device_input_base_get_filters(device_input_base):
"""Test getting the available filters."""
filters = device_input_base.get_available_filters()
selection = [
BECDeviceFilter.POSITIONER,
BECDeviceFilter.DEVICE,
BECDeviceFilter.COMPUTED_SIGNAL,
BECDeviceFilter.SIGNAL,
] + [
ReadoutPriority.MONITORED,
ReadoutPriority.BASELINE,
ReadoutPriority.ASYNC,
ReadoutPriority.ON_REQUEST,
]
assert [entry for entry in filters if entry in selection]
def test_device_combobox_get_filters(qtbot, mocked_client):
widget = create_widget(qtbot=qtbot, widget=DeviceComboBox, client=mocked_client)
assert BECDeviceFilter.POSITIONER in widget.get_available_filters()
assert ReadoutPriority.MONITORED in widget.get_readout_priority_filters()
def test_device_input_base_properties(device_input_base):
"""Test setting the properties of the device input base."""
assert device_input_base.device_filter == []
device_input_base.filter_to_device = True
assert device_input_base.device_filter == [BECDeviceFilter.DEVICE]
device_input_base.filter_to_positioner = True
assert device_input_base.device_filter == [BECDeviceFilter.DEVICE, BECDeviceFilter.POSITIONER]
device_input_base.filter_to_computed_signal = True
assert device_input_base.device_filter == [
BECDeviceFilter.DEVICE,
BECDeviceFilter.POSITIONER,
BECDeviceFilter.COMPUTED_SIGNAL,
]
device_input_base.filter_to_signal = True
assert device_input_base.device_filter == [
def test_device_combobox_properties(qtbot, mocked_client):
widget = create_widget(qtbot=qtbot, widget=DeviceComboBox, client=mocked_client)
widget.filter_to_device = True
widget.filter_to_positioner = True
widget.filter_to_computed_signal = True
widget.filter_to_signal = True
assert widget.device_filter == [
BECDeviceFilter.DEVICE,
BECDeviceFilter.POSITIONER,
BECDeviceFilter.COMPUTED_SIGNAL,
BECDeviceFilter.SIGNAL,
]
assert device_input_base.readout_filter == []
device_input_base.readout_async = True
assert device_input_base.readout_filter == [ReadoutPriority.ASYNC]
device_input_base.readout_baseline = True
assert device_input_base.readout_filter == [ReadoutPriority.ASYNC, ReadoutPriority.BASELINE]
device_input_base.readout_monitored = True
assert device_input_base.readout_filter == [
ReadoutPriority.ASYNC,
ReadoutPriority.BASELINE,
ReadoutPriority.MONITORED,
]
device_input_base.readout_on_request = True
assert device_input_base.readout_filter == [
ReadoutPriority.ASYNC,
ReadoutPriority.BASELINE,
ReadoutPriority.MONITORED,
ReadoutPriority.ON_REQUEST,
]
widget.readout_async = True
widget.readout_baseline = True
widget.readout_monitored = True
widget.readout_on_request = True
assert ReadoutPriority.ASYNC in widget.readout_filter
assert ReadoutPriority.BASELINE in widget.readout_filter
assert ReadoutPriority.MONITORED in widget.readout_filter
assert ReadoutPriority.ON_REQUEST in widget.readout_filter
def test_device_combobox_signal_class_filter(qtbot, mocked_client):
"""Test device filtering via signal_class_filter on combobox."""
mocked_client.device_manager.get_bec_signals = mock.MagicMock(
return_value=[
("samx", "async_signal", {"signal_class": "AsyncSignal"}),
+34 -76
View File
@@ -1,10 +1,9 @@
import pytest
from bec_lib.device import ReadoutPriority
from bec_widgets.widgets.control.device_input.base_classes.device_input_base import BECDeviceFilter
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
from bec_widgets.widgets.control.device_input.device_line_edit.device_line_edit import (
DeviceLineEdit,
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import (
BECDeviceFilter,
DeviceComboBox,
)
from .client_mocks import mocked_client
@@ -37,6 +36,21 @@ def test_device_input_combobox_init(device_input_combobox):
assert device_input_combobox.client is not None
assert isinstance(device_input_combobox, DeviceComboBox)
assert device_input_combobox.config.widget_class == "DeviceComboBox"
assert device_input_combobox.isEditable() is True
assert device_input_combobox.currentText() == ""
assert device_input_combobox.is_valid_input is False
assert device_input_combobox.config.device_filter == []
assert device_input_combobox.config.readout_filter == [
ReadoutPriority.MONITORED.value,
ReadoutPriority.BASELINE.value,
ReadoutPriority.ASYNC.value,
ReadoutPriority.CONTINUOUS.value,
ReadoutPriority.ON_REQUEST.value,
]
assert device_input_combobox.config.default is None
assert device_input_combobox.autocomplete is False
assert device_input_combobox.completer() is not None
assert device_input_combobox.completer().model() == device_input_combobox.model()
assert device_input_combobox.devices == [
"samx",
"samy",
@@ -65,81 +79,25 @@ def test_device_input_combobox_init_with_kwargs(device_input_combobox_with_kwarg
assert device_input_combobox_with_kwargs.config.arg_name == "test_arg_name"
def test_device_input_combobox_autocomplete(qtbot, mocked_client):
widget = DeviceComboBox(client=mocked_client, autocomplete=True)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
assert widget.autocomplete is True
assert widget.completer() is not None
assert widget.completer().model().stringList() == widget.devices
assert widget.completer().model() != widget.model()
widget.autocomplete = False
assert widget.completer() is not None
assert widget.completer().model() == widget.model()
def test_get_device_from_input_combobox_init(device_input_combobox):
device_input_combobox.setCurrentIndex(0)
device_text = device_input_combobox.currentText()
current_device = device_input_combobox.get_current_device()
assert current_device.name == device_text
@pytest.fixture
def device_input_line_edit(qtbot, mocked_client):
widget = DeviceLineEdit(client=mocked_client)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
@pytest.fixture
def device_input_line_edit_with_kwargs(qtbot, mocked_client):
widget = DeviceLineEdit(
client=mocked_client,
gui_id="test_gui_id",
device_filter=[BECDeviceFilter.POSITIONER],
default="samx",
arg_name="test_arg_name",
)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
def test_device_input_line_edit_init(device_input_line_edit):
assert device_input_line_edit is not None
assert device_input_line_edit.client is not None
assert isinstance(device_input_line_edit, DeviceLineEdit)
assert device_input_line_edit.config.widget_class == "DeviceLineEdit"
assert device_input_line_edit.config.device_filter == []
assert device_input_line_edit.config.readout_filter == [
ReadoutPriority.MONITORED,
ReadoutPriority.BASELINE,
ReadoutPriority.ASYNC,
ReadoutPriority.CONTINUOUS,
ReadoutPriority.ON_REQUEST,
]
assert device_input_line_edit.config.default is None
assert device_input_line_edit.devices == [
"samx",
"samy",
"samz",
"aptrx",
"aptry",
"gauss_bpm",
"gauss_adc1",
"gauss_adc2",
"gauss_adc3",
"bpm4i",
"bpm3a",
"bpm3i",
"eiger",
"waveform1d",
"async_device",
"test",
"test_device",
]
def test_device_input_line_edit_init_with_kwargs(device_input_line_edit_with_kwargs):
assert device_input_line_edit_with_kwargs.config.gui_id == "test_gui_id"
assert device_input_line_edit_with_kwargs.config.device_filter == ["Positioner"]
assert device_input_line_edit_with_kwargs.config.default == "samx"
assert device_input_line_edit_with_kwargs.config.arg_name == "test_arg_name"
def test_get_device_from_input_line_edit_init(device_input_line_edit):
device_input_line_edit.setText("samx")
device_text = device_input_line_edit.text()
current_device = device_input_line_edit.get_current_device()
assert current_device.name == device_text
+2 -5
View File
@@ -146,10 +146,7 @@ class TestDeviceManagerViewDialogs:
group_combo: QtWidgets.QComboBox = dialog._control_widgets["group_combo"]
assert group_combo.count() == len(OPHYD_DEVICE_TEMPLATES)
# Test select a group from available templates
variant_combo = dialog._control_widgets["variant_combo"]
assert variant_combo.isEnabled() is False
with qtbot.waitSignal(group_combo.currentTextChanged):
epics_signal_index = group_combo.findText("EpicsSignal")
group_combo.setCurrentIndex(epics_signal_index) # Select "EpicsSignal" group
@@ -235,7 +232,7 @@ class TestDeviceManagerViewDialogs:
sample_config = {
"name": "TestDevice",
"enabled": True,
"deviceClass": "ophyd.EpicsSignal",
"deviceClass": "ophyd_devices.EpicsSignal",
"readoutPriority": "baseline",
"deviceConfig": {"read_pv": "X25DA-ES1-MOT:GET"},
}
@@ -248,7 +245,7 @@ class TestDeviceManagerViewDialogs:
assert variant_combo.currentText() == "EpicsSignal"
config = dialog._device_config_template.get_config_fields()
assert config["name"] == "TestDevice"
assert config["deviceClass"] == "ophyd.EpicsSignal"
assert config["deviceClass"] == "ophyd_devices.EpicsSignal"
assert config["deviceConfig"]["read_pv"] == "X25DA-ES1-MOT:GET"
# Test now to add the device config with different validation results
+46 -74
View File
@@ -2,54 +2,28 @@ from unittest import mock
import pytest
from bec_lib.device import Signal
from qtpy.QtWidgets import QWidget
from bec_widgets.utils.ophyd_kind_util import Kind
from bec_widgets.widgets.control.device_input.base_classes.device_input_base import BECDeviceFilter
from bec_widgets.widgets.control.device_input.base_classes.device_signal_input_base import (
DeviceSignalInputBase,
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import (
BECDeviceFilter,
DeviceComboBox,
)
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
from bec_widgets.widgets.control.device_input.signal_combobox.signal_combobox import SignalComboBox
from bec_widgets.widgets.control.device_input.signal_line_edit.signal_line_edit import (
SignalLineEdit,
)
from .client_mocks import mocked_client
from .conftest import create_widget
class FakeSignal(Signal):
"""Fake signal to test the DeviceSignalInputBase."""
class DeviceInputWidget(DeviceSignalInputBase, QWidget):
"""Thin wrapper around DeviceInputBase to make it a QWidget"""
@pytest.fixture
def device_signal_base(qtbot, mocked_client):
"""Fixture with mocked FilterIO and WidgetIO"""
with mock.patch("bec_widgets.utils.filter_io.FilterIO.set_selection"):
with mock.patch("bec_widgets.utils.widget_io.WidgetIO.set_value"):
widget = create_widget(qtbot=qtbot, widget=DeviceInputWidget, client=mocked_client)
yield widget
"""Fake signal used by SignalComboBox tests."""
@pytest.fixture
def device_signal_combobox(qtbot, mocked_client):
"""Fixture with mocked FilterIO and WidgetIO"""
widget = create_widget(qtbot=qtbot, widget=SignalComboBox, client=mocked_client)
yield widget
@pytest.fixture
def device_signal_line_edit(qtbot, mocked_client):
"""Fixture with mocked FilterIO and WidgetIO"""
widget = create_widget(qtbot=qtbot, widget=SignalLineEdit, client=mocked_client)
yield widget
@pytest.fixture
def test_device_signal_combo(qtbot, mocked_client):
"""Fixture to create a SignalComboBox widget and a DeviceInputWidget widget"""
@@ -63,34 +37,49 @@ def test_device_signal_combo(qtbot, mocked_client):
yield input, signal
def test_device_signal_base_init(device_signal_base):
"""Test if the DeviceSignalInputBase is initialized correctly"""
assert device_signal_base._device is None
assert device_signal_base._signal_filter == set()
assert device_signal_base._signals == []
assert device_signal_base._hinted_signals == []
assert device_signal_base._normal_signals == []
assert device_signal_base._config_signals == []
def test_signal_combobox_init(device_signal_combobox):
assert device_signal_combobox._device is None
assert device_signal_combobox._signal_filter == {Kind.config, Kind.normal, Kind.hinted}
assert device_signal_combobox._signals == []
assert device_signal_combobox._hinted_signals == []
assert device_signal_combobox._normal_signals == []
assert device_signal_combobox._config_signals == []
assert device_signal_combobox.autocomplete is False
assert device_signal_combobox.completer() is not None
assert device_signal_combobox.completer().model() == device_signal_combobox.model()
def test_device_signal_qproperties(device_signal_base):
"""Test if the DeviceSignalInputBase has the correct QProperties"""
assert device_signal_base._signal_filter == set()
device_signal_base.include_config_signals = False
device_signal_base.include_normal_signals = False
assert device_signal_base._signal_filter == set()
device_signal_base.include_config_signals = True
assert device_signal_base._signal_filter == {Kind.config}
device_signal_base.include_normal_signals = True
assert device_signal_base._signal_filter == {Kind.config, Kind.normal}
device_signal_base.include_hinted_signals = True
assert device_signal_base._signal_filter == {Kind.config, Kind.normal, Kind.hinted}
device_signal_base.include_hinted_signals = True
assert device_signal_base._signal_filter == {Kind.config, Kind.normal, Kind.hinted}
device_signal_base.include_hinted_signals = True
assert device_signal_base._signal_filter == {Kind.config, Kind.normal, Kind.hinted}
device_signal_base.include_hinted_signals = False
assert device_signal_base._signal_filter == {Kind.config, Kind.normal}
def test_signal_combobox_autocomplete(qtbot, mocked_client):
widget = create_widget(
qtbot=qtbot, widget=SignalComboBox, client=mocked_client, autocomplete=True
)
widget.set_device("samx")
assert widget.autocomplete is True
assert widget.completer() is not None
assert widget.completer().model().stringList() == ["samx (readback)", "setpoint", "velocity"]
assert widget.completer().model() != widget.model()
widget.autocomplete = False
assert widget.completer() is not None
assert widget.completer().model() == widget.model()
def test_signal_combobox_qproperties(device_signal_combobox):
device_signal_combobox.include_config_signals = False
device_signal_combobox.include_normal_signals = False
device_signal_combobox.include_hinted_signals = False
assert device_signal_combobox._signal_filter == set()
device_signal_combobox.include_config_signals = True
assert device_signal_combobox._signal_filter == {Kind.config}
device_signal_combobox.include_normal_signals = True
assert device_signal_combobox._signal_filter == {Kind.config, Kind.normal}
device_signal_combobox.include_hinted_signals = True
assert device_signal_combobox._signal_filter == {Kind.config, Kind.normal, Kind.hinted}
device_signal_combobox.include_hinted_signals = False
assert device_signal_combobox._signal_filter == {Kind.config, Kind.normal}
def test_signal_combobox(qtbot, device_signal_combobox):
@@ -128,26 +117,9 @@ def test_signal_combobox(qtbot, device_signal_combobox):
assert device_signal_combobox._hinted_signals == [("fake_signal", {})]
def test_signal_lineedit(device_signal_line_edit):
"""Test the signal_combobox"""
assert device_signal_line_edit._signals == []
device_signal_line_edit.include_normal_signals = True
device_signal_line_edit.include_hinted_signals = True
device_signal_line_edit.include_config_signals = True
assert device_signal_line_edit.signals == []
device_signal_line_edit.set_device("samx")
assert device_signal_line_edit.signals == ["readback", "setpoint", "velocity"]
device_signal_line_edit.set_signal("readback")
assert device_signal_line_edit.text() == "readback"
assert device_signal_line_edit._is_valid_input is True
device_signal_line_edit.setText("invalid")
assert device_signal_line_edit._is_valid_input is False
def test_device_signal_input_base_cleanup(qtbot, mocked_client):
with mock.patch.object(mocked_client.callbacks, "remove"):
widget = DeviceInputWidget(client=mocked_client)
widget = SignalComboBox(client=mocked_client)
widget.close()
widget.deleteLater()
+333 -201
View File
@@ -19,19 +19,19 @@ from bec_widgets.widgets.containers.dock_area.basic_dock_area import (
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea, SaveProfileDialog
from bec_widgets.widgets.containers.dock_area.profile_utils import (
SETTINGS_KEYS,
default_profile_path,
baseline_profile_path,
get_profile_info,
is_profile_read_only,
is_quick_select,
list_profiles,
load_default_profile_screenshot,
load_user_profile_screenshot,
open_default_settings,
open_user_settings,
load_baseline_profile_screenshot,
load_runtime_profile_screenshot,
open_baseline_settings,
open_runtime_settings,
read_manifest,
restore_user_from_default,
restore_runtime_from_baseline,
runtime_profile_path,
set_quick_select,
user_profile_path,
write_manifest,
)
from bec_widgets.widgets.containers.dock_area.settings.dialogs import (
@@ -188,17 +188,17 @@ class _NamespaceProfiles:
def __init__(self, widget: BECDockArea):
self.namespace = widget.profile_namespace
def open_user(self, name: str):
return open_user_settings(name, namespace=self.namespace)
def open_runtime(self, name: str):
return open_runtime_settings(name, namespace=self.namespace)
def open_default(self, name: str):
return open_default_settings(name, namespace=self.namespace)
def open_baseline(self, name: str):
return open_baseline_settings(name, namespace=self.namespace)
def user_path(self, name: str) -> str:
return user_profile_path(name, namespace=self.namespace)
def runtime_path(self, name: str) -> str:
return runtime_profile_path(name, namespace=self.namespace)
def default_path(self, name: str) -> str:
return default_profile_path(name, namespace=self.namespace)
def baseline_path(self, name: str) -> str:
return baseline_profile_path(name, namespace=self.namespace)
def list_profiles(self) -> list[str]:
return list_profiles(namespace=self.namespace)
@@ -239,7 +239,7 @@ class TestBasicDockArea:
assert basic_dock_area.widget_map(bec_widgets_only=False)["panel_bec"] is panel_bec
def test_new_widget_string_creates_widget(self, basic_dock_area, qtbot):
basic_dock_area.new("DarkModeButton")
basic_dock_area.new("RingProgressBar")
qtbot.waitUntil(lambda: len(basic_dock_area.dock_list()) > 0, timeout=1000)
assert basic_dock_area.widget_list()
@@ -615,35 +615,6 @@ class TestBasicDockArea:
]
class TestAdvancedDockAreaInit:
"""Test initialization and basic properties."""
def test_init(self, advanced_dock_area):
assert advanced_dock_area is not None
assert isinstance(advanced_dock_area, BECDockArea)
assert advanced_dock_area.mode == "creator"
assert hasattr(advanced_dock_area, "dock_manager")
assert hasattr(advanced_dock_area, "toolbar")
assert hasattr(advanced_dock_area, "dark_mode_button")
assert hasattr(advanced_dock_area, "state_manager")
def test_rpc_and_plugin_flags(self):
assert BECDockArea.RPC is True
assert BECDockArea.PLUGIN is False
def test_user_access_list(self):
expected_methods = [
"new",
"widget_map",
"widget_list",
"workspace_is_locked",
"attach_all",
"delete_all",
]
for method in expected_methods:
assert method in BECDockArea.USER_ACCESS
class TestDockManagement:
"""Test dock creation, management, and manipulation."""
@@ -652,7 +623,7 @@ class TestDockManagement:
initial_count = len(advanced_dock_area.dock_list())
# Create a widget by string name
widget = advanced_dock_area.new("DarkModeButton")
widget = advanced_dock_area.new("RingProgressBar")
# Wait for the dock to be created (since it's async)
qtbot.wait(200)
@@ -720,7 +691,7 @@ class TestDockManagement:
initial_count = len(widget_map)
# Create a widget
advanced_dock_area.new("DarkModeButton")
advanced_dock_area.new("RingProgressBar")
qtbot.wait(200)
# Check widget map updated
@@ -734,7 +705,7 @@ class TestDockManagement:
initial_count = len(widget_list)
# Create a widget
advanced_dock_area.new("DarkModeButton")
advanced_dock_area.new("RingProgressBar")
qtbot.wait(200)
# Check widget list updated
@@ -744,8 +715,8 @@ class TestDockManagement:
def test_delete_all(self, advanced_dock_area, qtbot):
"""Test delete_all functionality."""
# Create multiple widgets
advanced_dock_area.new("DarkModeButton")
advanced_dock_area.new("DarkModeButton")
advanced_dock_area.new("RingProgressBar")
advanced_dock_area.new("RingProgressBar")
# Wait for docks to be created
qtbot.wait(200)
@@ -801,7 +772,7 @@ class TestWorkspaceLocking:
def test_lock_workspace_property_setter(self, advanced_dock_area, qtbot):
"""Test workspace_is_locked property setter."""
# Create a dock first
advanced_dock_area.new("DarkModeButton")
advanced_dock_area.new("RingProgressBar")
qtbot.wait(200)
# Initially unlocked
@@ -851,16 +822,6 @@ class TestDeveloperMode:
class TestToolbarFunctionality:
"""Test toolbar setup and functionality."""
def test_toolbar_setup(self, advanced_dock_area):
"""Test toolbar is properly set up."""
assert hasattr(advanced_dock_area, "toolbar")
assert hasattr(advanced_dock_area, "_ACTION_MAPPINGS")
# Check that action mappings are properly set
assert "menu_plots" in advanced_dock_area._ACTION_MAPPINGS
assert "menu_devices" in advanced_dock_area._ACTION_MAPPINGS
assert "menu_utils" in advanced_dock_area._ACTION_MAPPINGS
def test_toolbar_plot_actions(self, advanced_dock_area):
"""Test plot toolbar actions trigger widget creation."""
plot_actions = [
@@ -926,8 +887,8 @@ class TestToolbarFunctionality:
def test_attach_all_action(self, advanced_dock_area, qtbot):
"""Test attach_all toolbar action."""
# Create floating docks
advanced_dock_area.new("DarkModeButton", start_floating=True)
advanced_dock_area.new("DarkModeButton", start_floating=True)
advanced_dock_area.new("RingProgressBar", start_floating=True)
advanced_dock_area.new("RingProgressBar", start_floating=True)
qtbot.wait(200)
@@ -946,7 +907,7 @@ class TestToolbarFunctionality:
def test_load_profile_restores_floating_dock(self, advanced_dock_area, qtbot):
helper = profile_helper(advanced_dock_area)
settings = helper.open_user("floating_profile")
settings = helper.open_runtime("floating_profile")
settings.clear()
settings.setValue("profile/created_at", "2025-11-23T00:00:00Z")
@@ -955,7 +916,7 @@ class TestToolbarFunctionality:
# Floating entry
settings.setArrayIndex(0)
settings.setValue("object_name", "FloatingWaveform")
settings.setValue("widget_class", "DarkModeButton")
settings.setValue("widget_class", "RingProgressBar")
settings.setValue("closable", True)
settings.setValue("floatable", True)
settings.setValue("movable", True)
@@ -973,7 +934,7 @@ class TestToolbarFunctionality:
# Anchored entry
settings.setArrayIndex(1)
settings.setValue("object_name", "EmbeddedWaveform")
settings.setValue("widget_class", "DarkModeButton")
settings.setValue("widget_class", "RingProgressBar")
settings.setValue("closable", True)
settings.setValue("floatable", True)
settings.setValue("movable", True)
@@ -1215,18 +1176,6 @@ class TestPreviewPanel:
assert "No preview available" in panel.image_label.text()
class TestRestoreProfileDialog:
"""Test restore dialog confirmation flow."""
def test_confirm_accepts(self, monkeypatch):
monkeypatch.setattr(RestoreProfileDialog, "exec", lambda self: QDialog.Accepted)
assert RestoreProfileDialog.confirm(None, QPixmap(), QPixmap()) is True
def test_confirm_rejects(self, monkeypatch):
monkeypatch.setattr(RestoreProfileDialog, "exec", lambda self: QDialog.Rejected)
assert RestoreProfileDialog.confirm(None, QPixmap(), QPixmap()) is False
class TestProfileInfoAndScreenshots:
"""Tests for profile utilities metadata and screenshot helpers."""
@@ -1246,9 +1195,9 @@ class TestProfileInfoAndScreenshots:
settings.endArray()
settings.sync()
def test_get_profile_info_user_origin(self, temp_profile_dir):
name = "info_user"
settings = open_user_settings(name)
def test_get_profile_info_runtime_origin(self, temp_profile_dir):
name = "info_runtime"
settings = open_runtime_settings(name)
settings.setValue(profile_utils.SETTINGS_KEYS["created_at"], "2023-01-01T00:00:00Z")
settings.setValue("profile/author", "Custom")
set_quick_select(name, True)
@@ -1262,22 +1211,22 @@ class TestProfileInfoAndScreenshots:
assert info.is_quick_select is True
assert info.widget_count == 3
assert info.author == "User"
assert info.user_path.endswith(f"{name}.ini")
assert info.runtime_path.endswith(f"{name}.ini")
assert info.size_kb >= 0
def test_get_profile_info_default_only(self, temp_profile_dir):
name = "info_default"
settings = open_default_settings(name)
def test_get_profile_info_baseline_only(self, temp_profile_dir):
name = "info_baseline"
settings = open_baseline_settings(name)
self._write_manifest(settings, count=1)
user_path = user_profile_path(name)
if os.path.exists(user_path):
os.remove(user_path)
runtime_path = runtime_profile_path(name)
if os.path.exists(runtime_path):
os.remove(runtime_path)
info = get_profile_info(name)
assert info.origin == "settings"
assert info.user_path.endswith(f"{name}.ini")
assert info.baseline_path.endswith(f"{name}.ini")
assert info.widget_count == 1
def test_get_profile_info_module_readonly(self, module_profile_factory):
@@ -1289,10 +1238,10 @@ class TestProfileInfoAndScreenshots:
def test_get_profile_info_unknown_profile(self):
name = "nonexistent_profile"
if os.path.exists(user_profile_path(name)):
os.remove(user_profile_path(name))
if os.path.exists(default_profile_path(name)):
os.remove(default_profile_path(name))
if os.path.exists(runtime_profile_path(name)):
os.remove(runtime_profile_path(name))
if os.path.exists(baseline_profile_path(name)):
os.remove(baseline_profile_path(name))
info = get_profile_info(name)
@@ -1300,29 +1249,29 @@ class TestProfileInfoAndScreenshots:
assert info.is_read_only is False
assert info.widget_count == 0
def test_load_user_profile_screenshot(self, temp_profile_dir):
name = "user_screenshot"
settings = open_user_settings(name)
def test_load_runtime_profile_screenshot(self, temp_profile_dir):
name = "runtime_screenshot"
settings = open_runtime_settings(name)
settings.setValue(profile_utils.SETTINGS_KEYS["screenshot"], self.PNG_BYTES)
settings.sync()
pix = load_user_profile_screenshot(name)
pix = load_runtime_profile_screenshot(name)
assert pix is not None and not pix.isNull()
def test_load_default_profile_screenshot(self, temp_profile_dir):
name = "default_screenshot"
settings = open_default_settings(name)
def test_load_baseline_profile_screenshot(self, temp_profile_dir):
name = "baseline_screenshot"
settings = open_baseline_settings(name)
settings.setValue(profile_utils.SETTINGS_KEYS["screenshot"], self.PNG_BYTES)
settings.sync()
pix = load_default_profile_screenshot(name)
pix = load_baseline_profile_screenshot(name)
assert pix is not None and not pix.isNull()
def test_load_screenshot_from_settings_invalid(self, temp_profile_dir):
name = "invalid_screenshot"
settings = open_user_settings(name)
settings = open_runtime_settings(name)
settings.setValue(profile_utils.SETTINGS_KEYS["screenshot"], "not-an-image")
settings.sync()
@@ -1332,7 +1281,7 @@ class TestProfileInfoAndScreenshots:
def test_load_screenshot_from_settings_bytes(self, temp_profile_dir):
name = "bytes_screenshot"
settings = open_user_settings(name)
settings = open_runtime_settings(name)
settings.setValue(profile_utils.SETTINGS_KEYS["screenshot"], self.PNG_BYTES)
settings.sync()
@@ -1347,7 +1296,7 @@ class TestWorkSpaceManager:
@staticmethod
def _create_profiles(names):
for name in names:
settings = open_user_settings(name)
settings = open_runtime_settings(name)
settings.setValue("meta", "value")
settings.sync()
@@ -1411,7 +1360,7 @@ class TestWorkSpaceManager:
manager.delete_profile(name)
assert not os.path.exists(user_profile_path(name))
assert not os.path.exists(runtime_profile_path(name))
assert target.refresh_calls >= 1
def test_delete_readonly_profile_shows_message(
@@ -1441,21 +1390,23 @@ class TestWorkSpaceManager:
class TestAdvancedDockAreaRestoreAndDialogs:
"""Additional coverage for restore flows and workspace dialogs."""
def test_restore_user_profile_from_default_confirm_true(self, advanced_dock_area, monkeypatch):
def test_restore_runtime_profile_from_baseline_confirm_true(
self, advanced_dock_area, monkeypatch
):
profile_name = "profile_restore_true"
helper = profile_helper(advanced_dock_area)
helper.open_default(profile_name).sync()
helper.open_user(profile_name).sync()
helper.open_baseline(profile_name).sync()
helper.open_runtime(profile_name).sync()
advanced_dock_area._current_profile_name = profile_name
advanced_dock_area.isVisible = lambda: False
pix = QPixmap(8, 8)
pix.fill(Qt.red)
monkeypatch.setattr(
"bec_widgets.widgets.containers.dock_area.dock_area.load_user_profile_screenshot",
"bec_widgets.widgets.containers.dock_area.dock_area.load_runtime_profile_screenshot",
lambda name, namespace=None: pix,
)
monkeypatch.setattr(
"bec_widgets.widgets.containers.dock_area.dock_area.load_default_profile_screenshot",
"bec_widgets.widgets.containers.dock_area.dock_area.load_baseline_profile_screenshot",
lambda name, namespace=None: pix,
)
monkeypatch.setattr(
@@ -1465,12 +1416,12 @@ class TestAdvancedDockAreaRestoreAndDialogs:
with (
patch(
"bec_widgets.widgets.containers.dock_area.dock_area.restore_user_from_default"
"bec_widgets.widgets.containers.dock_area.dock_area.restore_runtime_from_baseline"
) as mock_restore,
patch.object(advanced_dock_area, "delete_all") as mock_delete_all,
patch.object(advanced_dock_area, "load_profile") as mock_load_profile,
):
advanced_dock_area.restore_user_profile_from_default()
advanced_dock_area.restore_baseline_profile(show_dialog=True)
assert mock_restore.call_count == 1
args, kwargs = mock_restore.call_args
@@ -1479,20 +1430,22 @@ class TestAdvancedDockAreaRestoreAndDialogs:
mock_delete_all.assert_called_once()
mock_load_profile.assert_called_once_with(profile_name)
def test_restore_user_profile_from_default_confirm_false(self, advanced_dock_area, monkeypatch):
def test_restore_runtime_profile_from_baseline_confirm_false(
self, advanced_dock_area, monkeypatch
):
profile_name = "profile_restore_false"
helper = profile_helper(advanced_dock_area)
helper.open_default(profile_name).sync()
helper.open_user(profile_name).sync()
helper.open_baseline(profile_name).sync()
helper.open_runtime(profile_name).sync()
advanced_dock_area._current_profile_name = profile_name
advanced_dock_area.isVisible = lambda: False
monkeypatch.setattr(
"bec_widgets.widgets.containers.dock_area.dock_area.load_user_profile_screenshot",
lambda name: QPixmap(),
"bec_widgets.widgets.containers.dock_area.dock_area.load_runtime_profile_screenshot",
lambda name, namespace=None: QPixmap(),
)
monkeypatch.setattr(
"bec_widgets.widgets.containers.dock_area.dock_area.load_default_profile_screenshot",
lambda name: QPixmap(),
"bec_widgets.widgets.containers.dock_area.dock_area.load_baseline_profile_screenshot",
lambda name, namespace=None: QPixmap(),
)
monkeypatch.setattr(
"bec_widgets.widgets.containers.dock_area.dock_area.RestoreProfileDialog.confirm",
@@ -1500,24 +1453,49 @@ class TestAdvancedDockAreaRestoreAndDialogs:
)
with patch(
"bec_widgets.widgets.containers.dock_area.dock_area.restore_user_from_default"
"bec_widgets.widgets.containers.dock_area.dock_area.restore_runtime_from_baseline"
) as mock_restore:
advanced_dock_area.restore_user_profile_from_default()
advanced_dock_area.restore_baseline_profile(show_dialog=True)
mock_restore.assert_not_called()
def test_restore_user_profile_from_default_no_target(self, advanced_dock_area, monkeypatch):
def test_restore_runtime_profile_from_baseline_without_dialog(self, advanced_dock_area):
profile_name = "alignment_scan"
helper = profile_helper(advanced_dock_area)
helper.open_baseline(profile_name).sync()
helper.open_runtime(profile_name).sync()
with (
patch(
"bec_widgets.widgets.containers.dock_area.dock_area.RestoreProfileDialog.confirm"
) as mock_confirm,
patch(
"bec_widgets.widgets.containers.dock_area.dock_area.restore_runtime_from_baseline"
) as mock_restore,
patch.object(advanced_dock_area, "delete_all") as mock_delete_all,
patch.object(advanced_dock_area, "load_profile") as mock_load_profile,
):
advanced_dock_area.restore_baseline_profile(profile_name, show_dialog=False)
mock_confirm.assert_not_called()
mock_restore.assert_called_once_with(
profile_name, namespace=advanced_dock_area.profile_namespace
)
mock_delete_all.assert_called_once()
mock_load_profile.assert_called_once_with(profile_name)
def test_restore_runtime_profile_from_baseline_no_target(self, advanced_dock_area, monkeypatch):
advanced_dock_area._current_profile_name = None
with patch(
"bec_widgets.widgets.containers.dock_area.dock_area.RestoreProfileDialog.confirm"
) as mock_confirm:
advanced_dock_area.restore_user_profile_from_default()
advanced_dock_area.restore_baseline_profile(show_dialog=True)
mock_confirm.assert_not_called()
def test_refresh_workspace_list_with_refresh_profiles(self, advanced_dock_area):
profile_name = "refresh_profile"
helper = profile_helper(advanced_dock_area)
helper.open_user(profile_name).sync()
helper.open_runtime(profile_name).sync()
# Simulate a normal named-profile state (not transient empty startup mode).
advanced_dock_area._empty_profile_active = False
advanced_dock_area._current_profile_name = profile_name
@@ -1572,8 +1550,8 @@ class TestAdvancedDockAreaRestoreAndDialogs:
active = "active_profile"
quick = "quick_profile"
helper = profile_helper(advanced_dock_area)
helper.open_user(active).sync()
helper.open_user(quick).sync()
helper.open_runtime(active).sync()
helper.open_runtime(quick).sync()
helper.set_quick_select(quick, True)
combo_stub = ComboStub()
@@ -1600,7 +1578,7 @@ class TestAdvancedDockAreaRestoreAndDialogs:
advanced_dock_area._current_profile_name = "manager_profile"
helper = profile_helper(advanced_dock_area)
helper.open_user("manager_profile").sync()
helper.open_runtime("manager_profile").sync()
advanced_dock_area.show_workspace_manager()
@@ -1635,18 +1613,104 @@ class TestProfileManagement:
def test_profile_path(self, temp_profile_dir):
"""Test profile path generation."""
path = user_profile_path("test_profile")
expected = os.path.join(temp_profile_dir, "user", "test_profile.ini")
path = runtime_profile_path("test_profile")
expected = os.path.join(temp_profile_dir, "runtime", "test_profile.ini")
assert path == expected
default_path = default_profile_path("test_profile")
expected_default = os.path.join(temp_profile_dir, "default", "test_profile.ini")
assert default_path == expected_default
baseline_path = baseline_profile_path("test_profile")
expected_baseline = os.path.join(temp_profile_dir, "baseline", "test_profile.ini")
assert baseline_path == expected_baseline
def test_open_settings(self, temp_profile_dir):
"""Test opening settings for a profile."""
settings = open_user_settings("test_profile")
assert isinstance(settings, QSettings)
def test_legacy_user_profile_is_mapped_to_runtime(self, temp_profile_dir):
"""Legacy user profiles are copied into the canonical runtime segment."""
name = "legacy_runtime"
legacy_dir = os.path.join(temp_profile_dir, "user")
os.makedirs(legacy_dir, exist_ok=True)
legacy_path = os.path.join(legacy_dir, f"{name}.ini")
legacy_settings = QSettings(legacy_path, QSettings.IniFormat)
legacy_settings.setValue("test/value", "legacy")
legacy_settings.sync()
canonical_path = runtime_profile_path(name)
assert not os.path.exists(canonical_path)
assert name in list_profiles()
assert os.path.exists(canonical_path)
assert open_runtime_settings(name).value("test/value", "", type=str) == "legacy"
def test_legacy_default_profile_is_mapped_to_baseline(self, temp_profile_dir):
"""Legacy default profiles are copied into the canonical baseline segment."""
name = "legacy_baseline"
legacy_dir = os.path.join(temp_profile_dir, "default")
os.makedirs(legacy_dir, exist_ok=True)
legacy_path = os.path.join(legacy_dir, f"{name}.ini")
legacy_settings = QSettings(legacy_path, QSettings.IniFormat)
legacy_settings.setValue("test/value", "legacy")
legacy_settings.sync()
canonical_path = baseline_profile_path(name)
assert not os.path.exists(canonical_path)
assert name in list_profiles()
assert os.path.exists(canonical_path)
assert open_baseline_settings(name).value("test/value", "", type=str) == "legacy"
def test_runtime_namespace_fallback_is_materialized(self, temp_profile_dir):
"""Canonical runtime namespace fallback is copied before opening primary settings."""
name = "runtime_namespace_fallback"
fallback_settings = open_runtime_settings(name)
fallback_settings.setValue("test/value", "fallback")
fallback_settings.sync()
namespaced_path = runtime_profile_path(name, namespace="beamline")
assert not os.path.exists(namespaced_path)
settings = open_runtime_settings(name, namespace="beamline")
assert os.path.exists(namespaced_path)
assert settings.value("test/value", "", type=str) == "fallback"
def test_baseline_namespace_fallback_is_materialized(self, temp_profile_dir):
"""Canonical baseline namespace fallback is copied before opening primary settings."""
name = "baseline_namespace_fallback"
fallback_settings = open_baseline_settings(name)
fallback_settings.setValue("test/value", "fallback")
fallback_settings.sync()
namespaced_path = baseline_profile_path(name, namespace="beamline")
assert not os.path.exists(namespaced_path)
settings = open_baseline_settings(name, namespace="beamline")
assert os.path.exists(namespaced_path)
assert settings.value("test/value", "", type=str) == "fallback"
def test_canonical_profile_wins_over_legacy_profile(self, temp_profile_dir):
"""Canonical runtime/baseline files are not overwritten by legacy fallback files."""
name = "canonical_wins"
runtime_settings = open_runtime_settings(name)
runtime_settings.setValue("test/value", "canonical-runtime")
runtime_settings.sync()
baseline_settings = open_baseline_settings(name)
baseline_settings.setValue("test/value", "canonical-baseline")
baseline_settings.sync()
for segment, value in (("user", "legacy-runtime"), ("default", "legacy-baseline")):
legacy_dir = os.path.join(temp_profile_dir, segment)
os.makedirs(legacy_dir, exist_ok=True)
legacy_settings = QSettings(
os.path.join(legacy_dir, f"{name}.ini"), QSettings.IniFormat
)
legacy_settings.setValue("test/value", value)
legacy_settings.sync()
assert name in list_profiles()
assert open_runtime_settings(name).value("test/value", "", type=str) == "canonical-runtime"
assert (
open_baseline_settings(name).value("test/value", "", type=str) == "canonical-baseline"
)
def test_list_profiles_empty(self, temp_profile_dir):
"""Test listing profiles when directory is empty."""
@@ -1666,7 +1730,7 @@ class TestProfileManagement:
# Create some test profile files
profile_names = ["profile1", "profile2", "profile3"]
for name in profile_names:
settings = open_user_settings(name)
settings = open_runtime_settings(name)
settings.setValue("test", "value")
settings.sync()
@@ -1676,29 +1740,29 @@ class TestProfileManagement:
def test_readonly_profile_operations(self, temp_profile_dir, module_profile_factory):
"""Test read-only profile functionality."""
profile_name = "user_profile"
profile_name = "runtime_profile"
# Initially should not be read-only
assert not is_profile_read_only(profile_name)
# Create a user profile and ensure it's writable
settings = open_user_settings(profile_name)
# Create a runtime profile and ensure it's writable
settings = open_runtime_settings(profile_name)
settings.setValue("test", "value")
settings.sync()
assert not is_profile_read_only(profile_name)
# Verify a bundled module profile is detected as read-only
readonly_name = module_profile_factory("module_default")
readonly_name = module_profile_factory("module_baseline")
assert is_profile_read_only(readonly_name)
def test_write_and_read_manifest(self, temp_profile_dir, advanced_dock_area, qtbot):
"""Test writing and reading dock manifest."""
settings = open_user_settings("test_manifest")
settings = open_runtime_settings("test_manifest")
# Create real docks
advanced_dock_area.new("DarkModeButton")
advanced_dock_area.new("DarkModeButton")
advanced_dock_area.new("DarkModeButton")
advanced_dock_area.new("RingProgressBar")
advanced_dock_area.new("RingProgressBar")
advanced_dock_area.new("RingProgressBar")
# Wait for docks to be created
qtbot.wait(1000)
@@ -1723,18 +1787,18 @@ class TestProfileManagement:
def test_restore_preserves_quick_select(self, temp_profile_dir):
"""Ensure restoring keeps the quick select flag when it was enabled."""
profile_name = "restorable_profile"
default_settings = open_default_settings(profile_name)
default_settings.setValue("test", "default")
default_settings.sync()
baseline_settings = open_baseline_settings(profile_name)
baseline_settings.setValue("test", "baseline")
baseline_settings.sync()
user_settings = open_user_settings(profile_name)
user_settings.setValue("test", "user")
user_settings.sync()
runtime_settings = open_runtime_settings(profile_name)
runtime_settings.setValue("test", "runtime")
runtime_settings.sync()
set_quick_select(profile_name, True)
assert is_quick_select(profile_name)
restore_user_from_default(profile_name)
restore_runtime_from_baseline(profile_name)
assert is_quick_select(profile_name)
@@ -1758,7 +1822,7 @@ class TestWorkspaceProfileOperations:
widget.prepare_for_shutdown()
mock_write.assert_not_called()
helper.open_user("real_profile").sync()
helper.open_runtime("real_profile").sync()
widget.load_profile("real_profile")
assert widget._empty_profile_active is False
assert widget._empty_profile_consumed is True
@@ -1772,7 +1836,7 @@ class TestWorkspaceProfileOperations:
profile_name = module_profile_factory("readonly_profile")
new_profile = f"{profile_name}_custom"
helper = profile_helper(advanced_dock_area)
target_path = helper.user_path(new_profile)
target_path = helper.runtime_path(new_profile)
if os.path.exists(target_path):
os.remove(target_path)
@@ -1802,11 +1866,11 @@ class TestWorkspaceProfileOperations:
helper = profile_helper(advanced_dock_area)
# Create a profile with manifest
settings = helper.open_user(profile_name)
settings = helper.open_runtime(profile_name)
settings.beginWriteArray("manifest/widgets", 1)
settings.setArrayIndex(0)
settings.setValue("object_name", "test_widget")
settings.setValue("widget_class", "DarkModeButton")
settings.setValue("widget_class", "RingProgressBar")
settings.setValue("closable", True)
settings.setValue("floatable", True)
settings.setValue("movable", True)
@@ -1823,6 +1887,83 @@ class TestWorkspaceProfileOperations:
widget_map = advanced_dock_area.widget_map()
assert "test_widget" in widget_map
def test_load_profile_default_does_not_restore_baseline(self, advanced_dock_area):
"""Regular profile loading should not restore the runtime copy."""
profile_name = "load_without_baseline_restore"
helper = profile_helper(advanced_dock_area)
helper.open_runtime(profile_name).sync()
with patch(
"bec_widgets.widgets.containers.dock_area.dock_area.restore_runtime_from_baseline"
) as mock_restore:
advanced_dock_area.load_profile(profile_name)
mock_restore.assert_not_called()
assert advanced_dock_area._current_profile_name == profile_name
def test_load_profile_restores_baseline_without_dialog(self, advanced_dock_area):
"""CLI loading can restore the runtime copy from baseline without confirmation."""
profile_name = "alignment_scan"
helper = profile_helper(advanced_dock_area)
helper.open_baseline(profile_name).sync()
helper.open_runtime(profile_name).sync()
with (
patch(
"bec_widgets.widgets.containers.dock_area.dock_area.RestoreProfileDialog.confirm"
) as mock_confirm,
patch(
"bec_widgets.widgets.containers.dock_area.dock_area.restore_runtime_from_baseline"
) as mock_restore,
):
advanced_dock_area.load_profile(profile_name, restore_baseline=True)
mock_confirm.assert_not_called()
mock_restore.assert_called_once_with(
profile_name, namespace=advanced_dock_area.profile_namespace
)
assert advanced_dock_area._current_profile_name == profile_name
def test_load_profile_materializes_runtime_namespace_fallback(self, advanced_dock_area):
"""Loading a runtime fallback copies it into the active namespace before opening."""
profile_name = "load_runtime_namespace_fallback"
helper = profile_helper(advanced_dock_area)
fallback_settings = open_runtime_settings(profile_name)
fallback_settings.setValue("test/value", "fallback")
fallback_settings.sync()
namespaced_path = helper.runtime_path(profile_name)
assert not os.path.exists(namespaced_path)
advanced_dock_area.load_profile(profile_name)
assert os.path.exists(namespaced_path)
assert (
QSettings(namespaced_path, QSettings.IniFormat).value("test/value", "", type=str)
== "fallback"
)
assert advanced_dock_area._current_profile_name == profile_name
def test_load_profile_materializes_baseline_namespace_fallback(self, advanced_dock_area):
"""Loading a baseline fallback copies it into the active namespace before opening."""
profile_name = "load_baseline_namespace_fallback"
helper = profile_helper(advanced_dock_area)
fallback_settings = open_baseline_settings(profile_name)
fallback_settings.setValue("test/value", "fallback")
fallback_settings.sync()
namespaced_path = helper.baseline_path(profile_name)
assert not os.path.exists(namespaced_path)
advanced_dock_area.load_profile(profile_name)
assert os.path.exists(namespaced_path)
assert (
QSettings(namespaced_path, QSettings.IniFormat).value("test/value", "", type=str)
== "fallback"
)
assert advanced_dock_area._current_profile_name == profile_name
def test_save_as_skips_autosave_source_profile(
self, advanced_dock_area, temp_profile_dir, qtbot
):
@@ -1831,11 +1972,11 @@ class TestWorkspaceProfileOperations:
new_profile = "autosave_new"
helper = profile_helper(advanced_dock_area)
settings = helper.open_user(source_profile)
settings = helper.open_runtime(source_profile)
settings.beginWriteArray("manifest/widgets", 1)
settings.setArrayIndex(0)
settings.setValue("object_name", "source_widget")
settings.setValue("widget_class", "DarkModeButton")
settings.setValue("widget_class", "RingProgressBar")
settings.setValue("closable", True)
settings.setValue("floatable", True)
settings.setValue("movable", True)
@@ -1844,7 +1985,7 @@ class TestWorkspaceProfileOperations:
advanced_dock_area.load_profile(source_profile)
qtbot.wait(500)
advanced_dock_area.new("DarkModeButton")
advanced_dock_area.new("RingProgressBar")
qtbot.wait(500)
class StubDialog:
@@ -1863,11 +2004,16 @@ class TestWorkspaceProfileOperations:
with patch(
"bec_widgets.widgets.containers.dock_area.dock_area.SaveProfileDialog", StubDialog
):
advanced_dock_area.save_profile(show_dialog=True)
widgets_before_save = list(advanced_dock_area.widget_list())
with patch.object(advanced_dock_area, "load_profile") as mock_load_profile:
advanced_dock_area.save_profile(show_dialog=True)
qtbot.wait(100)
mock_load_profile.assert_not_called()
qtbot.wait(500)
source_manifest = read_manifest(helper.open_user(source_profile))
new_manifest = read_manifest(helper.open_user(new_profile))
assert list(advanced_dock_area.widget_list()) == widgets_before_save
source_manifest = read_manifest(helper.open_runtime(source_profile))
new_manifest = read_manifest(helper.open_runtime(new_profile))
assert len(source_manifest) == 1
assert len(new_manifest) == 2
@@ -1879,11 +2025,11 @@ class TestWorkspaceProfileOperations:
helper = profile_helper(advanced_dock_area)
for profile in (profile_a, profile_b):
settings = helper.open_user(profile)
settings = helper.open_runtime(profile)
settings.beginWriteArray("manifest/widgets", 1)
settings.setArrayIndex(0)
settings.setValue("object_name", f"{profile}_widget")
settings.setValue("widget_class", "DarkModeButton")
settings.setValue("widget_class", "RingProgressBar")
settings.setValue("closable", True)
settings.setValue("floatable", True)
settings.setValue("movable", True)
@@ -1892,13 +2038,13 @@ class TestWorkspaceProfileOperations:
advanced_dock_area.load_profile(profile_a)
qtbot.wait(500)
advanced_dock_area.new("DarkModeButton")
advanced_dock_area.new("RingProgressBar")
qtbot.wait(500)
advanced_dock_area.load_profile(profile_b)
qtbot.wait(500)
manifest_a = read_manifest(helper.open_user(profile_a))
manifest_a = read_manifest(helper.open_runtime(profile_a))
assert len(manifest_a) == 2
def test_delete_profile_readonly(
@@ -1907,15 +2053,15 @@ class TestWorkspaceProfileOperations:
"""Test deleting bundled profile removes only the writable copy."""
profile_name = module_profile_factory("readonly_profile")
helper = profile_helper(advanced_dock_area)
helper.list_profiles() # ensure default and user copies are materialized
helper.open_default(profile_name).sync()
settings = helper.open_user(profile_name)
helper.list_profiles() # ensure baseline and runtime copies are materialized
helper.open_baseline(profile_name).sync()
settings = helper.open_runtime(profile_name)
settings.setValue("test", "value")
settings.sync()
user_path = helper.user_path(profile_name)
default_path = helper.default_path(profile_name)
assert os.path.exists(user_path)
assert os.path.exists(default_path)
runtime_path = helper.runtime_path(profile_name)
baseline_path = helper.baseline_path(profile_name)
assert os.path.exists(runtime_path)
assert os.path.exists(baseline_path)
with patch.object(advanced_dock_area.toolbar.components, "get_action") as mock_get_action:
mock_combo = MagicMock()
@@ -1936,9 +2082,9 @@ class TestWorkspaceProfileOperations:
mock_question.assert_not_called()
mock_info.assert_called_once()
# Read-only profile should remain intact (user + default copies)
assert os.path.exists(user_path)
assert os.path.exists(default_path)
# Read-only profile should remain intact (runtime + baseline copies)
assert os.path.exists(runtime_path)
assert os.path.exists(baseline_path)
def test_delete_profile_success(self, advanced_dock_area, temp_profile_dir):
"""Test successful profile deletion."""
@@ -1946,11 +2092,11 @@ class TestWorkspaceProfileOperations:
helper = profile_helper(advanced_dock_area)
# Create regular profile
settings = helper.open_user(profile_name)
settings = helper.open_runtime(profile_name)
settings.setValue("test", "value")
settings.sync()
user_path = helper.user_path(profile_name)
assert os.path.exists(user_path)
runtime_path = helper.runtime_path(profile_name)
assert os.path.exists(runtime_path)
with patch.object(advanced_dock_area.toolbar.components, "get_action") as mock_get_action:
mock_combo = MagicMock()
@@ -1968,7 +2114,7 @@ class TestWorkspaceProfileOperations:
mock_question.assert_called_once()
mock_refresh.assert_called_once()
# Profile should be deleted
assert not os.path.exists(user_path)
assert not os.path.exists(runtime_path)
def test_delete_profile_cli_usage(self, advanced_dock_area, temp_profile_dir):
"""Test delete_profile with explicit name (CLI usage - no dialog by default)."""
@@ -1976,24 +2122,24 @@ class TestWorkspaceProfileOperations:
helper = profile_helper(advanced_dock_area)
# Create regular profile
settings = helper.open_user(profile_name)
settings = helper.open_runtime(profile_name)
settings.setValue("test", "value")
settings.sync()
user_path = helper.user_path(profile_name)
assert os.path.exists(user_path)
runtime_path = helper.runtime_path(profile_name)
assert os.path.exists(runtime_path)
# Delete without dialog (CLI usage - default behavior)
result = advanced_dock_area.delete_profile(profile_name)
assert result is True
assert not os.path.exists(user_path)
assert not os.path.exists(runtime_path)
def test_refresh_workspace_list(self, advanced_dock_area, temp_profile_dir):
"""Test refreshing workspace list."""
# Create some profiles
helper = profile_helper(advanced_dock_area)
for name in ["profile1", "profile2"]:
settings = helper.open_user(name)
settings = helper.open_runtime(name)
settings.setValue("test", "value")
settings.sync()
@@ -2033,20 +2179,6 @@ class TestCleanupAndMisc:
# Verify dock was removed
assert len(advanced_dock_area.dock_list()) == initial_count - 1
def test_apply_dock_lock(self, advanced_dock_area, qtbot):
"""Test _apply_dock_lock functionality."""
# Create a dock first
advanced_dock_area.new("DarkModeButton")
qtbot.wait(200)
# Test locking
advanced_dock_area._apply_dock_lock(True)
# No assertion needed - just verify it doesn't crash
# Test unlocking
advanced_dock_area._apply_dock_lock(False)
# No assertion needed - just verify it doesn't crash
def test_make_dock(self, advanced_dock_area):
"""Test _make_dock functionality."""
from bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button import (
@@ -2336,8 +2468,8 @@ class TestModeTransitions:
def test_mode_switching_preserves_existing_docks(self, advanced_dock_area, qtbot):
"""Test that mode switching doesn't affect existing docked widgets."""
# Create some widgets
advanced_dock_area.new("DarkModeButton")
advanced_dock_area.new("DarkModeButton")
advanced_dock_area.new("RingProgressBar")
advanced_dock_area.new("RingProgressBar")
qtbot.wait(200)
initial_dock_count = len(advanced_dock_area.dock_list())
+28 -54
View File
@@ -1,8 +1,9 @@
import pytest
from qtpy.QtWidgets import QComboBox
from bec_widgets.utils.filter_io import FilterIO
from bec_widgets.widgets.control.device_input.device_line_edit.device_line_edit import (
DeviceLineEdit,
from bec_widgets.utils.filter_io import (
combobox_contains_text,
get_bec_signals_for_classes,
replace_combobox_items,
)
from bec_widgets.widgets.dap.dap_combo_box.dap_combo_box import DapComboBox
@@ -10,65 +11,38 @@ from .client_mocks import mocked_client
from .conftest import create_widget
@pytest.fixture(scope="function")
def dap_mock(qtbot, mocked_client):
"""Fixture for QLineEdit widget"""
models = ["GaussianModel", "LorentzModel", "SineModel"]
mocked_client.dap._available_dap_plugins.keys.return_value = models
def test_replace_combobox_items(qtbot, mocked_client):
widget = create_widget(qtbot, DapComboBox, client=mocked_client)
return widget
replace_combobox_items(widget, ["testA", ("testB", {"payload": True})])
assert widget.count() == 2
assert widget.itemText(0) == "testA"
assert widget.itemText(1) == "testB"
assert widget.itemData(1) == {"payload": True}
assert combobox_contains_text(widget, "testA") is True
assert combobox_contains_text(widget, "missing") is False
@pytest.fixture(scope="function")
def line_edit_mock(qtbot, mocked_client):
"""Fixture for QLineEdit widget"""
widget = create_widget(qtbot, DeviceLineEdit, client=mocked_client)
return widget
def test_set_selection_combo_box(dap_mock):
"""Test set selection for QComboBox using DapComboBox"""
assert dap_mock.fit_model_combobox.count() == 3
FilterIO.set_selection(dap_mock.fit_model_combobox, selection=["testA", "testB"])
assert dap_mock.fit_model_combobox.count() == 2
assert FilterIO.check_input(widget=dap_mock.fit_model_combobox, text="testA") is True
def test_set_selection_line_edit(line_edit_mock):
"""Test set selection for QComboBox using DapComboBox"""
FilterIO.set_selection(line_edit_mock, selection=["testA", "testB"])
assert line_edit_mock.completer.model().rowCount() == 2
model = line_edit_mock.completer.model()
model_data = [model.data(model.index(i)) for i in range(model.rowCount())]
assert model_data == ["testA", "testB"]
assert FilterIO.check_input(widget=line_edit_mock, text="testA") is True
FilterIO.set_selection(line_edit_mock, selection=["testC"])
assert FilterIO.check_input(widget=line_edit_mock, text="testA") is False
assert FilterIO.check_input(widget=line_edit_mock, text="testC") is True
def test_update_with_signal_class_combo_box_ndim_filter(dap_mock, mocked_client):
def test_get_bec_signals_for_classes_ndim_filter(mocked_client):
signals = [
("dev1", "sig1", {"describe": {"signal_info": {"ndim": 1}}}),
("dev1", "sig2", {"describe": {"signal_info": {"ndim": 2}}}),
]
mocked_client.device_manager.get_bec_signals = lambda _filters: signals
out = FilterIO.update_with_signal_class(
widget=dap_mock.fit_model_combobox,
signal_class_filter=["AsyncSignal"],
client=mocked_client,
ndim_filter=1,
out = get_bec_signals_for_classes(
client=mocked_client, signal_class_filter=["AsyncSignal"], ndim_filter=1
)
assert out == [("dev1", "sig1", {"describe": {"signal_info": {"ndim": 1}}})]
def test_update_with_signal_class_line_edit_passthrough(line_edit_mock, mocked_client):
signals = [("dev1", "sig1", {"describe": {"signal_info": {"ndim": 1}}})]
mocked_client.device_manager.get_bec_signals = lambda _filters: signals
out = FilterIO.update_with_signal_class(
widget=line_edit_mock,
signal_class_filter=["AsyncSignal"],
client=mocked_client,
ndim_filter=1,
)
assert out == signals
def test_replace_combobox_items_empty(qtbot):
widget = QComboBox()
qtbot.addWidget(widget)
widget.addItem("old")
replace_combobox_items(widget, [])
assert widget.count() == 0
+25 -1
View File
@@ -5,7 +5,8 @@ import black
import isort
import pytest
from bec_widgets.utils.generate_cli import ClientGenerator
from bec_widgets.utils.generate_cli import ClientGenerator, write_designer_plugins
from bec_widgets.utils.generate_designer_plugin import DesignerPluginInfo
from bec_widgets.utils.plugin_utils import BECClassContainer, BECClassInfo
# pylint: disable=missing-function-docstring
@@ -59,6 +60,14 @@ class MockViewWithContent:
"""Activate view."""
class MockDesignerWidgetBase:
ICON_NAME = "mock_icon"
class MockDesignerWidget(MockDesignerWidgetBase):
pass
def test_client_generator_with_black_formatting():
generator = ClientGenerator(base=True)
container = BECClassContainer()
@@ -285,3 +294,18 @@ c = a + b"""
content = file.read()
assert corrected in content
def test_write_designer_plugins(tmp_path):
file_name = tmp_path / "designer_plugins.py"
write_designer_plugins([DesignerPluginInfo(MockDesignerWidget)], str(file_name))
with open(file_name, "r", encoding="utf-8") as file:
content = file.read()
assert '"MockDesignerWidget":' in content
assert '"tests.unit_tests.test_generate_cli_client"' in content
assert '"MockDesignerWidget"' in content
assert '"MockDesignerWidget": "mock_icon"' in content
assert "MockDesignerWidgetPlugin" not in content
+2 -2
View File
@@ -62,8 +62,8 @@ TEST_LOG_MESSAGES = [
@pytest.fixture
def log_panel(qtbot, mocked_client):
mocked_client.connector.xread = lambda *_, **__: TEST_LOG_MESSAGES
def log_panel(qtbot, mocked_client, monkeypatch):
monkeypatch.setattr(mocked_client.connector, "xread", lambda *_, **__: TEST_LOG_MESSAGES)
widget = LogPanel()
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
+1 -3
View File
@@ -247,12 +247,10 @@ def test_bec_weblinks(monkeypatch):
monkeypatch.setattr(webbrowser, "open", fake_open)
BECWebLinksMixin.open_bec_docs()
BECWebLinksMixin.open_bec_widgets_docs()
BECWebLinksMixin.open_bec_bug_report()
assert opened_urls == [
"https://beamline-experiment-control.readthedocs.io/en/latest/",
"https://bec.readthedocs.io/projects/bec-widgets/en/latest/",
"https://bec-project.github.io/bec_docs/",
"https://github.com/bec-project/bec_widgets/issues",
]
+6 -2
View File
@@ -14,7 +14,9 @@ from .test_scan_control import available_scans_message
@pytest.fixture
def monaco_widget(qtbot, mocked_client):
widget = MonacoWidget(client=mocked_client)
mocked_client.connector.set(MessageEndpoints.available_scans(), available_scans_message)
mocked_client.connector.set_and_publish(
MessageEndpoints.available_scans(), available_scans_message
)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
@@ -62,7 +64,9 @@ def test_monaco_widget_get_scan_control_code(monaco_widget: MonacoWidget, qtbot,
"""
Test that the MonacoWidget can get scan control code from the dialog.
"""
mocked_client.connector.set(MessageEndpoints.available_scans(), available_scans_message)
mocked_client.connector.set_and_publish(
MessageEndpoints.available_scans(), available_scans_message
)
scan_control_dialog = ScanControlDialog(client=mocked_client)
qtbot.addWidget(scan_control_dialog)
+3 -5
View File
@@ -12,9 +12,7 @@ from bec_widgets.widgets.control.device_control.positioner_box import (
PositionerBox,
PositionerControlLine,
)
from bec_widgets.widgets.control.device_input.device_line_edit.device_line_edit import (
DeviceLineEdit,
)
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
from .client_mocks import mocked_client
from .conftest import create_widget
@@ -164,8 +162,8 @@ def test_positioner_box_open_dialog_selection(qtbot, positioner_box):
# pylint: disable=protected-access
assert positioner_box._dialog is not None
qtbot.waitUntil(lambda: positioner_box._dialog.isVisible() is True, timeout=1000)
line_edit = positioner_box._dialog.findChild(DeviceLineEdit)
line_edit.setText("samy")
line_edit = positioner_box._dialog.findChild(DeviceComboBox)
line_edit.setCurrentText("samy")
close_button = positioner_box._dialog.findChild(QPushButton)
assert close_button.text() == "Close"
qtbot.mouseClick(close_button, Qt.LeftButton)
+15 -15
View File
@@ -1,7 +1,6 @@
from unittest.mock import patch
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.plugin_utils import BECClassContainer, BECClassInfo
from bec_widgets.utils import plugin_utils
from bec_widgets.utils.rpc_widget_handler import RPCWidgetHandler
@@ -10,21 +9,22 @@ def test_rpc_widget_handler():
assert "Image" in handler.widget_classes
assert "RingProgressBar" in handler.widget_classes
assert "BECDockArea" in handler.widget_classes
class _TestPluginWidget(BECWidget): ...
assert isinstance(handler.widget_classes["Image"], tuple)
@patch(
"bec_widgets.utils.rpc_widget_handler.get_all_plugin_widgets",
return_value=BECClassContainer(
[
BECClassInfo(name="DeviceComboBox", obj=_TestPluginWidget, module="", file=""),
BECClassInfo(name="NewPluginWidget", obj=_TestPluginWidget, module="", file=""),
]
),
"bec_widgets.utils.bec_plugin_helper.get_plugin_rpc_widget_registry",
return_value={
"Image": ("plugin.module", "PluginImage"),
"NewPluginWidget": ("plugin.module", "NewPluginWidget"),
},
)
def test_duplicate_plugins_not_allowed(_):
handler = RPCWidgetHandler()
assert handler.widget_classes["DeviceComboBox"] is not _TestPluginWidget
assert handler.widget_classes["NewPluginWidget"] is _TestPluginWidget
plugin_utils.rpc_widget_registry.cache_clear()
try:
handler = RPCWidgetHandler()
assert handler.widget_classes["Image"] != ("plugin.module", "PluginImage")
assert handler.widget_classes["NewPluginWidget"] == ("plugin.module", "NewPluginWidget")
finally:
plugin_utils.rpc_widget_registry.cache_clear()
+51 -8
View File
@@ -9,6 +9,7 @@ from qtpy.QtCore import QModelIndex, Qt
from bec_widgets.utils.forms_from_types.items import StrFormItem
from bec_widgets.utils.widget_io import WidgetIO
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
from bec_widgets.widgets.control.scan_control import ScanControl
from .client_mocks import mocked_client
@@ -256,7 +257,9 @@ scan_history = ScanHistoryMessage(
@pytest.fixture(scope="function")
def scan_control(qtbot, mocked_client): # , mock_dev):
mocked_client.connector.set(MessageEndpoints.available_scans(), available_scans_message)
mocked_client.connector.set_and_publish(
MessageEndpoints.available_scans(), available_scans_message
)
mocked_client.connector.xadd(
topic=MessageEndpoints.scan_history(), msg_dict={"data": scan_history}
)
@@ -302,6 +305,13 @@ def test_on_scan_selected(scan_control, scan_name):
assert widget is not None # Confirm that a widget exists
expected_widget_type = scan_control.arg_box.WIDGET_HANDLER.get(arg_value, None)
assert isinstance(widget, expected_widget_type) # Confirm the widget type matches
if isinstance(widget, DeviceComboBox):
assert widget.currentText() == ""
assert widget.autocomplete is True
assert "samx" in widget.devices
assert (
"async_device" in widget.devices
) # async device should also be present in the device list
# Check kwargs boxes
kwargs_group = [param for param in expected_scan_info["gui_config"]["kwarg_groups"]]
@@ -501,12 +511,47 @@ def test_changing_scans_remember_parameters(scan_control, mocked_client):
assert grid_kwargs["burst_at_each_point"] == kwargs["burst_at_each_point"]
@pytest.mark.skip(reason="Unreliable - GH issue #1134")
def test_get_scan_parameters_from_redis(scan_control, mocked_client):
def test_scan_selection_does_not_fetch_last_scan_parameters(
scan_control, mocked_client, monkeypatch
):
xread = MagicMock(wraps=mocked_client.connector.xread)
monkeypatch.setattr(mocked_client.connector, "xread", xread)
scan_control.comboBox_scan_selection.setCurrentText("line_scan")
assert scan_control.comboBox_scan_selection.currentText() == "line_scan"
scan_control.comboBox_scan_selection.setCurrentText("grid_scan")
xread.assert_not_called()
def test_restore_last_scan_parameters_button_fetches_on_demand(
scan_control, mocked_client, monkeypatch
):
xread = MagicMock(wraps=mocked_client.connector.xread)
monkeypatch.setattr(mocked_client.connector, "xread", xread)
scan_control.comboBox_scan_selection.setCurrentText("grid_scan")
scan_control.comboBox_scan_selection.setCurrentText("line_scan")
xread.assert_not_called()
scan_control.last_scan_button.click()
xread.assert_called_once_with(
MessageEndpoints.scan_history(), from_start=True, user_id=scan_control.object_name
)
args, kwargs = scan_control.get_scan_parameters(bec_object=False)
assert args == ["samx", 0.0, 2.0]
assert kwargs["steps"] == 10
assert kwargs["relative"] is False
assert kwargs["exp_time"] == 2
def test_get_scan_parameters_from_redis(scan_control):
scan_name = "line_scan"
scan_control.comboBox_scan_selection.setCurrentText(scan_name)
scan_control.toggle.checked = True
scan_control.last_scan_button.click()
args, kwargs = scan_control.get_scan_parameters(bec_object=False)
@@ -586,8 +631,7 @@ def test_scan_metadata_is_passed_to_scan_function(scan_control: ScanControl):
scans.grid_scan.assert_called_once_with(metadata=TEST_MD)
@pytest.mark.skip(reason="Unreliable - GH issue #1134")
def test_restore_parameters_with_fewer_arg_bundles(scan_control, qtbot):
def test_restore_parameters_with_fewer_arg_bundles(scan_control):
"""
Ensure that when more argument bundles are present than exist in the
stored history, restoring parameters regenerates the arg box to the
@@ -603,8 +647,7 @@ def test_restore_parameters_with_fewer_arg_bundles(scan_control, qtbot):
assert scan_control.arg_box.count_arg_rows() == 3
# Trigger restore of parameters from history
scan_control.toggle.checked = True
qtbot.wait(200)
scan_control.last_scan_button.click()
# After restore, arg_box should have only one bundle (the history size)
assert scan_control.arg_box.count_arg_rows() == 1
+10 -7
View File
@@ -6,8 +6,8 @@ import pytest
from qtpy import QtCore
from qtpy.QtWidgets import QDialogButtonBox, QLabel
from bec_widgets.widgets.control.device_input.base_classes.device_signal_input_base import (
DeviceSignalInputBaseConfig,
from bec_widgets.widgets.control.device_input.signal_combobox.signal_combobox import (
SignalComboBoxConfig,
)
from bec_widgets.widgets.utility.signal_label.signal_label import ChoiceDialog, SignalLabel
@@ -61,7 +61,7 @@ SAMX_INFO_DICT = {
@pytest.fixture
def signal_label(qtbot, mocked_client: MagicMock):
with patch.object(mocked_client.device_manager.devices.samx, "_info", SAMX_INFO_DICT):
config = DeviceSignalInputBaseConfig(device="samx", default="samx")
config = SignalComboBoxConfig(device="samx", default="samx")
widget = SignalLabel(
config=config, custom_label="Test Label", custom_units="m/s", client=mocked_client
)
@@ -149,7 +149,8 @@ def test_choose_signal_dialog_sends_choices(signal_label: SignalLabel, qtbot):
dialog = signal_label.show_choice_dialog()
qtbot.waitUntil(dialog.button_box.button(QDialogButtonBox.Ok).isVisible, timeout=500)
dialog._device_field.dev["test device"] = MagicMock()
dialog._device_field.setText("test device")
dialog._device_field.devices = ["test device"]
dialog._device_field.setCurrentText("test device")
dialog._signal_field._signals = [("test signal", {"component_name": "test signal"})]
dialog._signal_field.addItem("test signal")
dialog._signal_field.setCurrentIndex(0)
@@ -162,7 +163,8 @@ def test_dialog_handler_updates_devices(signal_label: SignalLabel, qtbot):
dialog = signal_label.show_choice_dialog()
qtbot.waitUntil(dialog.button_box.button(QDialogButtonBox.Ok).isVisible, timeout=500)
dialog._device_field.dev["flux_capacitor"] = MagicMock()
dialog._device_field.setText("flux_capacitor")
dialog._device_field.devices = ["flux_capacitor"]
dialog._device_field.setCurrentText("flux_capacitor")
dialog._signal_field._signals = [("spin_speed", {"component_name": "spin_speed"})]
dialog._signal_field.addItem("spin_speed")
dialog._signal_field.setCurrentIndex(0)
@@ -176,7 +178,7 @@ def test_choose_signal_dialog_invalid_device(signal_label: SignalLabel, qtbot):
signal_label._process_dialog = MagicMock()
dialog = signal_label.show_choice_dialog()
qtbot.waitUntil(dialog.button_box.button(QDialogButtonBox.Ok).isVisible, timeout=500)
dialog._device_field.setText("invalid device")
dialog._device_field.setCurrentText("invalid device")
dialog._signal_field.addItem("test signal")
dialog._signal_field.setCurrentIndex(0)
qtbot.mouseClick(dialog.button_box.button(QDialogButtonBox.Ok), QtCore.Qt.LeftButton)
@@ -206,7 +208,8 @@ def test_dialog_has_signals(signal_label: SignalLabel, qtbot):
"signals": {"signal 1": {"kind_str": "hinted"}, "signal 2": {"kind_str": "normal"}}
}
dialog._device_field.setText("test device")
dialog._device_field.devices = ["test device"]
dialog._device_field.setCurrentText("test device")
assert dialog._signal_field.count() == 2 # the actual signal and the category label
assert dialog._signal_field.currentText() == "signal 1"