1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-04-29 03:22:37 +02:00

Compare commits

..

153 Commits

Author SHA1 Message Date
wakonig_k 2cbe2a4542 refactor: rename to beamline states 2026-02-05 09:45:19 +01:00
wakonig_k d0d14ae78f feat: copy app id to clipboard 2026-02-05 09:45:05 +01:00
wyzula_j f850da49cd fix(launch_window): added one more connection for BECStatusBroker to show launcher 2026-02-05 09:27:09 +01:00
wyzula_j f0d33c5cd7 feat(main_window): status bar added to main window bottom left 2026-02-05 09:25:50 +01:00
wyzula_j 48b333fd73 feat(status_bar): added StatusToolBar with status actions and Broker 2026-02-05 09:25:50 +01:00
wyzula_j 1534118f21 fix(colors): more benevolent fetching of colormap names, avoid hardcoded wrong colormap mapping from GradientWidget from pg 2026-02-02 14:11:59 +01:00
wyzula_j 572797626c fix(screen_utils): screen utilities added and fixed sizing for widgets from launch window and main app 2026-01-31 20:05:04 +01:00
wyzula_j 40a666aa18 refactor(dock_area): change name to BECDockArea 2026-01-30 16:26:35 +01:00
appel_c 577ca4301a fix(ophyd-validation): add device_manager_ds argument if available for ophyd validation 2026-01-30 14:32:35 +01:00
wyzula_j df4082b31b fix(editors): VSCode widget removed 2026-01-30 12:54:05 +01:00
perl_d aadb3e129a ci: cancel previous CI run for PR or branch 2026-01-30 10:11:24 +01:00
wyzula_j 0580b539fa feat(image): modernization of image widget 2026-01-29 14:37:09 +01:00
wyzula_j b79c4862c5 fix(device_combobox): public flag for valid input 2026-01-29 14:37:09 +01:00
wyzula_j 148b41e238 fix(device_input_widgets): removed RPC access 2026-01-28 12:47:19 +01:00
wyzula_j 6e398e8077 feat(device_combobox): device filter added based on its signal classes 2026-01-28 12:47:19 +01:00
wyzula_j 8d75c2af1c feat(signal_combobox): extended that can filter by signal class and dimension of the signal 2026-01-28 12:47:19 +01:00
wyzula_j 24dbb885f6 feat(plot_base): plot_base, image and heatmap widget adopted to property-toolbar sync 2026-01-28 11:16:48 +01:00
wyzula_j 3b7bad85d3 feat(toolbar): toolbar can be synced with the property_changed for toggle actions 2026-01-28 11:16:48 +01:00
wyzula_j de09cc660a feat(SafeProperty): SafeProperty emits property_changed signal 2026-01-28 11:16:48 +01:00
wakonig_k 4bb8e86509 test(e2e): raise with widget name 2026-01-28 10:50:52 +01:00
wakonig_k e5b76bc855 fix(rpc_server): use single shot instead of processEvents to avoid dead locks 2026-01-28 10:50:52 +01:00
wakonig_k 99176198ee fix: adjust ring progress bar to ads 2026-01-28 10:50:52 +01:00
wakonig_k dcfc573052 fix(FakeDevice): add _info dict 2026-01-28 10:50:52 +01:00
wakonig_k 9290a9a23b feat(color): add relative luminance calculation 2026-01-28 10:50:52 +01:00
wakonig_k d48b9d224f feat: add export and load settings methods to BECConnector; add SafeProperty safe getter flag 2026-01-28 10:50:52 +01:00
wyzula_j 43c311782d fix(rpc_register): listing only valid connections 2026-01-27 21:53:10 +01:00
wyzula_j 44f7acaeda fix(launch_window): logic for showing launcher 2026-01-27 21:53:10 +01:00
wyzula_j 0b212c3100 fix(main_window): parent fixed for notification broker 2026-01-27 21:53:10 +01:00
appel_c d8ebae49ad fix(device-progress-bar): remove stretch in content layout 2026-01-26 22:07:14 +01:00
perl_d 153fb62a04 style: wrap progress bar in widget to fix background 2026-01-26 22:07:14 +01:00
perl_d d67227d20c test: fix test 2026-01-26 22:07:14 +01:00
perl_d dc1072c247 fix: tooltip logic and disable button on running scan 2026-01-26 22:07:14 +01:00
perl_d beb337201c feat: attach config cancellation to closeEvent 2026-01-26 22:07:14 +01:00
perl_d 75162ef8a8 fix: remove manual stylesheet deletion/override 2026-01-26 22:07:14 +01:00
perl_d cc89252fb3 fix: 'Any' type annotations 2026-01-26 22:07:14 +01:00
appel_c 36fa0e649c fix(_OverlayEventFilter): fix typo 2026-01-26 22:07:14 +01:00
appel_c 8e173cb17e refactor(device-form-dialog): Use native QDialogButtonBox instead of GroupBox layout 2026-01-26 22:07:14 +01:00
appel_c 322655fc5e refactor(busy-loager): Improve eventFilter to avoid crashs if target or overlay is None. 2026-01-26 22:07:14 +01:00
appel_c 2b5b7360ae test(device-manager-view): improve test coverage for device-manager-view 2026-01-26 22:07:14 +01:00
appel_c b325d1bb4f fix(signal-label): Fix signal label cleanup, missing parent in constructors 2026-01-26 22:07:14 +01:00
appel_c ee6fd5fb9e test cleanup add mocked client 2026-01-26 22:07:14 +01:00
appel_c 53fe1ac63d fix(device-manager-display-widget): fix error message popup on cancelling upload 2026-01-26 22:07:14 +01:00
appel_c 58e57169e8 test(config-communicator): add test for cancel action 2026-01-26 22:07:14 +01:00
appel_c 2b27faf779 fix(device-init-progress-bar): fix ui format for device init progressbar 2026-01-26 22:07:14 +01:00
appel_c b1a3403cd3 fix(busy-loader): adjust busy loader and tests 2026-01-26 22:07:14 +01:00
appel_c b38d6dc549 refactor(busy-loader): refactor busy loader to use custom widget 2026-01-26 22:07:14 +01:00
appel_c cc45fed387 feat(device-initialization-progress-bar): add progress bar for device initialization 2026-01-26 22:07:14 +01:00
wyzula_j 5a594925f0 fix(colors): added logger to the apply theme 2026-01-26 20:53:24 +01:00
wyzula_j e76dea6f69 fix(launch_window): processEvents removed 2026-01-26 20:53:24 +01:00
wyzula_j f4c14d66db fix(advanced_dock_area): removed the singleShot for load_initial_profile 2026-01-26 20:53:24 +01:00
wyzula_j 4ef1344fec fix(view):removed splitter logic 2026-01-26 20:53:24 +01:00
wyzula_j 5e63814afe fix(basic_dock_area): removed the singleShot usage 2026-01-26 20:53:24 +01:00
wyzula_j 6be6dafd7d fix(widgets): processEvent removed from widgets using it 2026-01-26 20:53:24 +01:00
wyzula_j fd1edf8177 fix: remove singleShots from BECConnector and adjustments of dock area logic 2026-01-26 20:53:24 +01:00
wyzula_j 8102f31956 fix(positioner_box): layout HV centered and size taken from the ui file 2026-01-26 20:53:24 +01:00
wyzula_j f9b92dacc3 fix(bec_connector): use RPC register to fetch all connections 2026-01-26 20:53:24 +01:00
wyzula_j a219de11c1 feat(motor_map): motor selection adopted to splitter action 2026-01-23 15:10:12 +01:00
wyzula_j 45e9f03093 feat(toolbar): splitter action added 2026-01-23 15:10:12 +01:00
wakonig_k 48e2a97ece fix(scatter waveform): fix tab order for settings panel 2026-01-18 18:36:33 +01:00
wyzula_j 953760c828 fix(scatter_waveform): remove curve_json from the properties 2026-01-18 18:36:33 +01:00
wyzula_j dc3129357b fix(signal_combo_box): get_signal_name added; remove duplicates from heatmap and scatter waveform settings; 2026-01-18 18:36:33 +01:00
wyzula_j 12746ae4aa fix(scatter_waveform): modernization of scatter waveform settings dialog 2026-01-18 18:36:33 +01:00
wyzula_j 7e9cc20e59 fix(scatter_waveform): devices and entries saved as properties 2026-01-18 18:36:33 +01:00
wyzula_j 5209f4c210 fix(heatmap): devices are saved as SafeProperties 2026-01-16 17:29:19 +01:00
wakonig_k 5f30ab5aa2 test(script_tree): improve hover event handling with waitUntil 2026-01-16 17:07:48 +01:00
wakonig_k 3926c5c947 feat(web console): add support for shared web console sessions 2026-01-16 17:07:48 +01:00
appel_c f71c8c882f test(device-manager): use mocked client for tests 2026-01-16 11:05:18 +01:00
appel_c 04a30ea04c refactor(ophyd-validation): Allow option to keep device visible after successful validation 2026-01-16 11:05:18 +01:00
appel_c cbdeae15a1 fix(device-manager): fix minor icon synchronization bugs 2026-01-16 11:05:18 +01:00
appel_c 6aa33cacfa fix(device-manager-display-widget): Remove devices from ophyd validation after upload to BEC 2026-01-16 11:05:18 +01:00
appel_c 73cfe8da4c test(device-form-dialog): adapt tests 2026-01-16 11:05:18 +01:00
appel_c 0467d88010 fix(device-form-dialog): Adapt device-form-dialog ophyd validation test 2026-01-16 11:05:18 +01:00
appel_c c41ef4401d fix(device-form-dialog): Adapt DeviceFormDialog to run validation of config upon editing/adding a config, and forward validation results 2026-01-16 11:05:18 +01:00
wyzula_j 4f2a840c21 fix(CLI): change the default behavior of launching the profiles in CLI 2026-01-16 10:56:22 +01:00
wyzula_j 91050e88ae refactor(advanced_dock_area): change remove_widget to delete 2026-01-16 10:56:22 +01:00
wyzula_j 028efed5bc fix(advanced_dock_area): empty profile is always empty 2026-01-16 10:56:22 +01:00
wyzula_j 80f2ca40cb fix(advanced_dock_area): CLI API adjustments docs + names 2026-01-16 10:56:22 +01:00
wyzula_j 7c32d47f52 fix(advanced_dock_area): replace sanitize_namespace with slugify 2026-01-16 10:56:22 +01:00
wyzula_j bf7299c31e fix(client_utils): delete is deleting window and its content 2026-01-16 10:56:22 +01:00
wyzula_j f3470b409d fix(CLI): dock_area can be created from CLI with specific profile or empty 2026-01-16 10:56:22 +01:00
wyzula_j 3486dd4e44 fix(advanced_dock_area): remove widget from dock area by object name 2026-01-16 10:56:22 +01:00
wyzula_j 46fe5498b5 fix(advanced_dock_area): profile behaviour adjusted, cleanup of the codebase 2026-01-16 10:56:22 +01:00
wyzula_j e94ce73950 fix: sanitize name space util for bec connector and ads 2026-01-16 10:56:22 +01:00
wyzula_j 3cc469a3d1 fix(main_app): dock area from main app shares the workspace name with the CLI one to reuse the profiles created in the cli companion window 2026-01-16 10:56:22 +01:00
wyzula_j b4e1a7927d fix(launch_window): launch geometry for widgets launched from launcher to 80% of the primary screen as default 2026-01-16 10:56:22 +01:00
wyzula_j 84950cc651 fix(launch_window): argument to start with the gui class 2026-01-16 10:56:22 +01:00
wyzula_j 24cc8c7b98 fix(dock_area): the old BECDockArea(pg) removed and replaces by AdvancedDockArea(ADS) 2026-01-16 10:56:22 +01:00
wyzula_j 2132ace01b fix(advanced_dock_area): removed non-functional dock_list and dock_map from RPC 2026-01-16 10:56:22 +01:00
wyzula_j 67650b96a2 fix(advanced_dock_area): new profiles are saved with quickselect as default 2026-01-16 10:56:22 +01:00
wyzula_j 6b1d2958c3 fix(advanced_dock_area): ensure the general profile exists when launched first time 2026-01-16 10:56:22 +01:00
wyzula_j dab1defc76 fix(advanced_dock_area): remove all widgets when loading new profiles 2026-01-16 10:56:22 +01:00
wyzula_j c02f509867 fix(basic_dock_area): delete_all will also delete floating docks 2026-01-16 10:56:22 +01:00
wyzula_j b585a608c7 fix(main_window): delete on close 2026-01-16 10:56:22 +01:00
wakonig_k 21862e8021 fix(main_app): center the application window on the screen 2026-01-14 23:13:09 +01:00
wakonig_k 15ac1c0182 fix(main_app): refactor main function and update script entry point in pyproject.toml 2026-01-14 23:13:09 +01:00
wakonig_k da23a47213 ci: use shared issue sync action instead of local version 2026-01-09 10:36:19 +01:00
wakonig_k 1bb0f1a855 fix(developer widget): save before executing a scripts 2026-01-08 12:55:31 +01:00
wakonig_k f121d09baa fix(monaco widget): reset current_file 2026-01-08 12:55:31 +01:00
wakonig_k dd7a5e11df fix(monaco dock): update last focused editor when closing 2026-01-08 12:55:31 +01:00
wakonig_k 2d4eabead0 fix(monaco_dock): update editor metadata handling and improve open_file method 2026-01-08 12:55:31 +01:00
wakonig_k e607d34337 refactor(developer_widget): enhance documentation and add missing imports 2026-01-08 12:55:31 +01:00
wakonig_k 4a2bc9fcd9 feat(developer_widget): add signal connection for focused editor changes to disable run button for macro files 2026-01-08 12:55:31 +01:00
wyzula_j 2ffe269727 fix(client): client API regenerated 2026-01-05 11:25:55 +01:00
wyzula_j de5773662a feat(device-manager): Add DeviceManager Widget for BEC Widget main applications 2026-01-05 11:25:55 +01:00
wyzula_j 53b50e3420 fix(general_app): old general app example removed 2026-01-05 11:25:55 +01:00
wyzula_j b16f88b217 fix(heatmap): interpolation thread is killed only on exit, logger for dandling thread 2026-01-05 11:25:55 +01:00
wyzula_j 063e5d064c perf(heatmap): thread worker optimization 2026-01-05 11:25:55 +01:00
wyzula_j c354a9b249 fix(heatmap): interpolation of the image moved to separate thread 2026-01-05 11:25:55 +01:00
wyzula_j caa4e449e4 fix(motor_map): x/y motor are saved in properties 2026-01-05 11:25:55 +01:00
perl_d afc8c4733e fix: don't wait forever 2026-01-05 11:25:55 +01:00
wyzula_j a00024c66f fix(widget_state_manager): PROPERTIES_TO_SKIP are not restored even if in ini file 2026-01-05 11:25:55 +01:00
wyzula_j 5c18b291b5 feat(advanced_dock_area): floating docks restore with relative geometry 2026-01-05 11:25:55 +01:00
wakonig_k 08dde431a6 refactor: improvements to enum access 2026-01-05 11:25:55 +01:00
wyzula_j 7daa25d7c1 feat(advanced_dock_area): instance lock for multiple ads in same session 2026-01-05 11:25:55 +01:00
wyzula_j 8842eb617a fix(widgets): removed isVisible from all SafeProperties 2026-01-05 11:25:55 +01:00
wyzula_j 1d0634e142 fix(bec_widget): improved qt enums; grab safeguard 2026-01-05 11:25:55 +01:00
wyzula_j dc6946c924 fix(qt_ads): pythons stubs match structure of PySide6QtAds 2026-01-05 11:25:55 +01:00
wyzula_j 377bad4854 fix(widget_state_manager): filtering of not wanted properties 2026-01-05 11:25:55 +01:00
wyzula_j 6cdd813734 refactor(main_app): adapted for DockAreaWidget changes 2026-01-05 11:25:55 +01:00
wyzula_j 3f46f7eb7e refactor(developer_view): changed to use DockAreaWidget 2026-01-05 11:25:55 +01:00
wyzula_j 73f474c7e7 refactor(monaco_dock): changed to use DockAreaWidget 2026-01-05 11:25:55 +01:00
wyzula_j 2dfae4d38f feat(advanced_dock_area): created DockAreaWidget base class; profile management through namespaces; dock area variants 2026-01-05 11:25:54 +01:00
wyzula_j f7061baf7b fix(main_window): removed general forced cleanup 2026-01-05 11:25:54 +01:00
wyzula_j 5865d0f97d feat(advanced_dock_area): UI/UX for profile management improved, saving directories logic adjusted 2026-01-05 11:25:54 +01:00
wyzula_j c204815c42 fix(main_window): cleanup adjusted with shiboken6 2026-01-05 11:25:54 +01:00
wyzula_j af8f3911aa fix(dark_mode_button): skip settings added 2026-01-05 11:25:54 +01:00
wyzula_j 73afb5a472 fix(widget_state_manager): added shiboken check 2026-01-05 11:25:54 +01:00
wyzula_j 5836f286de feat(bec_widget): save screenshot to bytes 2026-01-05 11:25:54 +01:00
wyzula_j 5567274f2d fix(becconnector): ophyd thread killer on exit + in conftest 2026-01-05 11:25:54 +01:00
wakonig_k 7983a4527a feat(guided_tour): add guided tour 2026-01-05 11:25:54 +01:00
wakonig_k 0f63543326 fix: add metadata to scan control export 2026-01-05 11:25:54 +01:00
wyzula_j 01755aba07 feat(developer_view): add developer view 2026-01-05 11:25:54 +01:00
wyzula_j b4987fe759 feat(jupyter_console_window): adjustment for general usage 2026-01-05 11:25:54 +01:00
wakonig_k b0cb048c81 feat(ads): add pyi stub file to provide type hints for ads 2026-01-05 11:25:54 +01:00
appel_c e8c062a48f feat(dm-view): initial device manager view added 2026-01-05 11:25:54 +01:00
appel_c dfe914bb7e feat(help-inspector): add help inspector widget 2026-01-05 11:25:54 +01:00
wyzula_j b66353bf6e fix(signal_label): dispatcher unsubscribed in the cleanup 2026-01-05 11:25:54 +01:00
wyzula_j ead1d38b49 fix(client): abort, reset, stop button removed from RPC access 2026-01-05 11:25:54 +01:00
wyzula_j b2505c6a56 feat(main_app): main app with interactive app switcher 2026-01-05 11:25:54 +01:00
wyzula_j 663c00f1a4 feat(actions): actions can be created with label text with beside or under alignment 2026-01-05 11:25:54 +01:00
wyzula_j 3dd688540e feat(busy_loader): busy loader added to bec widget base class 2026-01-05 11:25:54 +01:00
wakonig_k 092ac915a8 feat: add SafeConnect 2026-01-05 11:25:54 +01:00
wyzula_j 03015a72a6 fix(bec_widgets): adapt to bec_qthemes 1.0; themes can be only applied on living Qt objects 2026-01-05 11:25:54 +01:00
wyzula_j 7dcaf8fe4c feat(advanced_dock_area): added ads based dock area with profiles 2026-01-05 11:25:54 +01:00
wyzula_j 02db6307e4 fix(web_console): added startup kwarg 2026-01-05 11:25:54 +01:00
wyzula_j 3a10cac7c8 refactor(bec_main_window): main app theme renamed to View 2026-01-05 11:25:54 +01:00
wyzula_j 64fecd16dd fix(widget_state_manager): state manager can save all properties recursively to already existing settings 2026-01-05 11:25:54 +01:00
wyzula_j 76639b3e04 feat(bec_widget): attach/detach method for all widgets + client regenerated 2026-01-05 11:25:54 +01:00
wyzula_j a767ee8331 feat(widget_io): widget hierarchy can grap all bec connectors from the widget recursively 2026-01-05 11:25:54 +01:00
wyzula_j 5c33f1a6d4 fix(bec_connector): widget_removed and name_established signals added 2026-01-05 11:25:54 +01:00
wakonig_k af320d812b ci: install ttyd 2026-01-05 11:25:54 +01:00
wakonig_k 5bfb50fdc6 ci: add artifact upload 2026-01-05 11:25:54 +01:00
wyzula_j 5393a84494 build: PySide6-QtAds; bec_qtheme V1; dependencies updated and adjusted 2026-01-05 11:25:54 +01:00
249 changed files with 5419 additions and 14972 deletions
+1 -1
View File
@@ -62,4 +62,4 @@ runs:
uv pip install --system -e ./ophyd_devices
uv pip install --system -e ./bec/bec_lib[dev]
uv pip install --system -e ./bec/bec_ipython_client
uv pip install --system -e ./bec_widgets[dev,qtermwidget]
uv pip install --system -e ./bec_widgets[dev,pyside6]
-169
View File
@@ -1,169 +0,0 @@
##########################
### AI-generated file. ###
##########################
"""Aggregate and merge benchmark JSON files.
The workflow runs the same benchmark suite on multiple independent runners.
This script reads every JSON file produced by those attempts, normalizes the
contained benchmark values, and writes a compact mapping JSON where each value is
the median across attempts. It can also merge independent hyperfine JSON files
from one runner into a single hyperfine-style JSON file.
"""
from __future__ import annotations
import argparse
import json
import statistics
from pathlib import Path
from typing import Any
from compare_benchmarks import Benchmark, extract_benchmarks
def collect_benchmarks(paths: list[Path]) -> dict[str, list[Benchmark]]:
"""Collect benchmarks from multiple JSON files.
Args:
paths (list[Path]): Paths to hyperfine, pytest-benchmark, or compact
mapping JSON files.
Returns:
dict[str, list[Benchmark]]: Benchmarks grouped by benchmark name.
"""
collected: dict[str, list[Benchmark]] = {}
for path in paths:
for name, benchmark in extract_benchmarks(path).items():
collected.setdefault(name, []).append(benchmark)
return collected
def aggregate(collected: dict[str, list[Benchmark]]) -> dict[str, dict[str, object]]:
"""Aggregate grouped benchmarks using the median value.
Args:
collected (dict[str, list[Benchmark]]): Benchmarks grouped by benchmark
name.
Returns:
dict[str, dict[str, object]]: Compact mapping JSON data. Each benchmark
contains ``value``, ``unit``, ``metric``, ``attempts``, and
``attempt_values``.
"""
aggregated: dict[str, dict[str, object]] = {}
for name, benchmarks in sorted(collected.items()):
values = [benchmark.value for benchmark in benchmarks]
unit = next((benchmark.unit for benchmark in benchmarks if benchmark.unit), "")
metric = next((benchmark.metric for benchmark in benchmarks if benchmark.metric), "value")
aggregated[name] = {
"value": statistics.median(values),
"unit": unit,
"metric": f"median-of-attempt-{metric}",
"attempts": len(values),
"attempt_values": values,
}
return aggregated
def merge_hyperfine_results(paths: list[Path]) -> dict[str, Any]:
"""Merge hyperfine result files.
Args:
paths (list[Path]): Hyperfine JSON files to merge.
Returns:
dict[str, Any]: Hyperfine-style JSON object containing all result rows.
Raises:
ValueError: If any file has no hyperfine ``results`` list.
"""
merged: dict[str, Any] = {"results": []}
for path in paths:
data = json.loads(path.read_text(encoding="utf-8"))
results = data.get("results", []) if isinstance(data, dict) else None
if not isinstance(results, list):
raise ValueError(f"{path} has no hyperfine results list")
merged["results"].extend(results)
return merged
def main_from_paths(input_dir: Path, output: Path) -> int:
"""Aggregate all JSON files in a directory and write the result.
Args:
input_dir (Path): Directory containing benchmark JSON files.
output (Path): Path where the aggregate JSON should be written.
Returns:
int: Always ``0`` on success.
Raises:
ValueError: If no JSON files are found in ``input_dir``.
"""
paths = sorted(input_dir.rglob("*.json"))
if not paths:
raise ValueError(f"No benchmark JSON files found in {input_dir}")
output.parent.mkdir(parents=True, exist_ok=True)
output.write_text(
json.dumps(aggregate(collect_benchmarks(paths)), indent=2, sort_keys=True) + "\n",
encoding="utf-8",
)
return 0
def merge_from_paths(input_dir: Path, output: Path) -> int:
"""Merge all hyperfine JSON files in a directory and write the result.
Args:
input_dir (Path): Directory containing hyperfine JSON files.
output (Path): Path where the merged JSON should be written.
Returns:
int: Always ``0`` on success.
Raises:
ValueError: If no JSON files are found in ``input_dir``.
"""
paths = sorted(input_dir.glob("*.json"))
if not paths:
raise ValueError(f"No hyperfine JSON files found in {input_dir}")
output.parent.mkdir(parents=True, exist_ok=True)
output.write_text(
json.dumps(merge_hyperfine_results(paths), indent=2, sort_keys=True) + "\n",
encoding="utf-8",
)
return 0
def main() -> int:
"""Run the benchmark aggregation command line interface.
Returns:
int: Always ``0`` on success.
"""
parser = argparse.ArgumentParser()
parser.add_argument(
"--mode",
choices=("aggregate", "merge-hyperfine"),
default="aggregate",
help="Operation to perform.",
)
parser.add_argument("--input-dir", required=True, type=Path)
parser.add_argument("--output", required=True, type=Path)
args = parser.parse_args()
if args.mode == "merge-hyperfine":
return merge_from_paths(input_dir=args.input_dir, output=args.output)
return main_from_paths(input_dir=args.input_dir, output=args.output)
if __name__ == "__main__":
raise SystemExit(main())
-454
View File
@@ -1,454 +0,0 @@
##########################
### AI-generated file. ###
##########################
"""Compare benchmark JSON files and write a GitHub Actions summary.
The script supports JSON emitted by hyperfine, JSON emitted by pytest-benchmark,
and a compact mapping format generated by ``aggregate_benchmarks.py``. Timing
formats prefer median values and fall back to mean values when median values are
not present.
"""
from __future__ import annotations
import argparse
import json
import math
from dataclasses import dataclass
from pathlib import Path
from typing import Any
@dataclass(frozen=True)
class Benchmark:
"""Normalized benchmark result.
Attributes:
name (str): Stable benchmark name used to match baseline and current results.
value (float): Numeric benchmark value used for comparison.
unit (str): Display unit for the value, for example ``"s"``.
metric (str): Source metric name, for example ``"median"`` or ``"mean"``.
"""
name: str
value: float
unit: str
metric: str = "value"
@dataclass(frozen=True)
class Comparison:
"""Comparison between one baseline benchmark and one current benchmark.
Attributes:
name (str): Benchmark name.
baseline (float): Baseline benchmark value.
current (float): Current benchmark value.
delta_percent (float): Percent change from baseline to current.
unit (str): Display unit for both values.
metric (str): Current result metric used for comparison.
regressed (bool): Whether the change exceeds the configured threshold in
the worse direction.
improved (bool): Whether the change exceeds the configured threshold in
the better direction.
"""
name: str
baseline: float
current: float
delta_percent: float
unit: str
metric: str
regressed: bool
improved: bool
def _read_json(path: Path) -> Any:
"""Read JSON data from a file.
Args:
path (Path): Path to the JSON file.
Returns:
Any: Parsed JSON value.
"""
with path.open("r", encoding="utf-8") as stream:
return json.load(stream)
def _as_float(value: Any) -> float | None:
"""Convert a value to a finite float.
Args:
value (Any): Value to convert.
Returns:
float | None: Converted finite float, or ``None`` if conversion fails.
"""
try:
result = float(value)
except (TypeError, ValueError):
return None
if math.isfinite(result):
return result
return None
def _extract_hyperfine(data: dict[str, Any]) -> dict[str, Benchmark]:
"""Extract normalized benchmarks from hyperfine JSON.
Args:
data (dict[str, Any]): Parsed hyperfine JSON object.
Returns:
dict[str, Benchmark]: Benchmarks keyed by command name.
"""
benchmarks: dict[str, Benchmark] = {}
for result in data.get("results", []):
if not isinstance(result, dict):
continue
name = str(result.get("command") or result.get("name") or "").strip()
metric = "median"
value = _as_float(result.get(metric))
if value is None:
metric = "mean"
value = _as_float(result.get(metric))
if name and value is not None:
benchmarks[name] = Benchmark(name=name, value=value, unit="s", metric=metric)
return benchmarks
def _extract_pytest_benchmark(data: dict[str, Any]) -> dict[str, Benchmark]:
"""Extract normalized benchmarks from pytest-benchmark JSON.
Args:
data (dict[str, Any]): Parsed pytest-benchmark JSON object.
Returns:
dict[str, Benchmark]: Benchmarks keyed by full benchmark name.
"""
benchmarks: dict[str, Benchmark] = {}
for benchmark in data.get("benchmarks", []):
if not isinstance(benchmark, dict):
continue
name = str(benchmark.get("fullname") or benchmark.get("name") or "").strip()
stats = benchmark.get("stats", {})
value = None
metric = "median"
if isinstance(stats, dict):
value = _as_float(stats.get(metric))
if value is None:
metric = "mean"
value = _as_float(stats.get(metric))
if name and value is not None:
benchmarks[name] = Benchmark(name=name, value=value, unit="s", metric=metric)
return benchmarks
def _extract_simple_mapping(data: dict[str, Any]) -> dict[str, Benchmark]:
"""Extract normalized benchmarks from a compact mapping JSON object.
Args:
data (dict[str, Any]): Parsed mapping where each benchmark is either a
raw number or an object containing ``value``, ``unit``, and ``metric``.
Returns:
dict[str, Benchmark]: Benchmarks keyed by mapping key.
"""
benchmarks: dict[str, Benchmark] = {}
for name, raw_value in data.items():
if name in {"version", "context", "commit", "timestamp"}:
continue
value = _as_float(raw_value)
unit = ""
metric = "value"
if value is None and isinstance(raw_value, dict):
value = _as_float(raw_value.get("value"))
unit = str(raw_value.get("unit") or "")
metric = str(raw_value.get("metric") or "value")
if value is not None:
benchmarks[str(name)] = Benchmark(name=str(name), value=value, unit=unit, metric=metric)
return benchmarks
def extract_benchmarks(path: Path) -> dict[str, Benchmark]:
"""Extract normalized benchmarks from a supported JSON file.
Args:
path (Path): Path to a hyperfine, pytest-benchmark, or compact mapping
JSON file.
Returns:
dict[str, Benchmark]: Normalized benchmarks keyed by name.
Raises:
ValueError: If the JSON root is not an object or no supported benchmark
entries can be extracted.
"""
data = _read_json(path)
if not isinstance(data, dict):
raise ValueError(f"{path} must contain a JSON object")
extractors = (_extract_hyperfine, _extract_pytest_benchmark, _extract_simple_mapping)
for extractor in extractors:
benchmarks = extractor(data)
if benchmarks:
return benchmarks
raise ValueError(f"No supported benchmark entries found in {path}")
def compare_benchmarks(
baseline: dict[str, Benchmark],
current: dict[str, Benchmark],
threshold_percent: float,
higher_is_better: bool,
) -> tuple[list[Comparison], list[str], list[str]]:
"""Compare baseline benchmarks with current benchmarks.
Args:
baseline (dict[str, Benchmark]): Baseline benchmarks keyed by name.
current (dict[str, Benchmark]): Current benchmarks keyed by name.
threshold_percent (float): Regression threshold in percent.
higher_is_better (bool): If ``True``, lower current values are treated as
regressions. If ``False``, higher current values are treated as
regressions.
Returns:
tuple[list[Comparison], list[str], list[str]]: Comparisons for common
benchmark names, names missing from current results, and names newly
present in current results.
"""
comparisons: list[Comparison] = []
missing_in_current: list[str] = []
new_in_current: list[str] = []
for name, baseline_benchmark in sorted(baseline.items()):
current_benchmark = current.get(name)
if current_benchmark is None:
missing_in_current.append(name)
continue
if baseline_benchmark.value == 0:
delta_percent = 0.0
else:
delta_percent = (
(current_benchmark.value - baseline_benchmark.value)
/ abs(baseline_benchmark.value)
* 100
)
if higher_is_better:
regressed = delta_percent <= -threshold_percent
improved = delta_percent >= threshold_percent
else:
regressed = delta_percent >= threshold_percent
improved = delta_percent <= -threshold_percent
comparisons.append(
Comparison(
name=name,
baseline=baseline_benchmark.value,
current=current_benchmark.value,
delta_percent=delta_percent,
unit=current_benchmark.unit or baseline_benchmark.unit,
metric=current_benchmark.metric,
regressed=regressed,
improved=improved,
)
)
for name in sorted(set(current) - set(baseline)):
new_in_current.append(name)
return comparisons, missing_in_current, new_in_current
def _format_value(value: float, unit: str) -> str:
"""Format a benchmark value for Markdown output.
Args:
value (float): Numeric benchmark value.
unit (str): Display unit.
Returns:
str: Formatted value with optional unit suffix.
"""
suffix = f" {unit}" if unit else ""
return f"{value:.6g}{suffix}"
def _format_status(comparison: Comparison) -> str:
"""Format a comparison status for Markdown output."""
if comparison.regressed:
return ":red_circle: regressed"
if comparison.improved:
return ":green_circle: improved"
return "ok"
def write_summary(
path: Path,
comparisons: list[Comparison],
missing_in_current: list[str],
new_in_current: list[str],
threshold_percent: float,
higher_is_better: bool,
) -> None:
"""Write a Markdown benchmark comparison summary.
Args:
path (Path): Path where the summary should be written.
comparisons (list[Comparison]): Comparison rows for matching benchmarks.
missing_in_current (list[str]): Baseline benchmark names missing from the
current result.
new_in_current (list[str]): Current benchmark names not present in the
baseline result.
threshold_percent (float): Regression threshold in percent.
higher_is_better (bool): Whether higher benchmark values are considered
better.
"""
regressions = [comparison for comparison in comparisons if comparison.regressed]
improvements = [comparison for comparison in comparisons if comparison.improved]
direction = "higher is better" if higher_is_better else "lower is better"
sorted_comparisons = sorted(comparisons, key=lambda comparison: comparison.name)
lines = [
"<!-- bw-benchmark-comment -->",
"## Benchmark comparison",
"",
f"Threshold: {threshold_percent:g}% ({direction}).",
f"Result: {len(regressions)} regression(s), {len(improvements)} improvement(s) beyond threshold.",
]
lines.append("")
if regressions:
lines.extend(
[
f"{len(regressions)} benchmark(s) regressed beyond the configured threshold.",
"",
"| Benchmark | Baseline | Current | Change |",
"| --- | ---: | ---: | ---: |",
]
)
for comparison in regressions:
lines.append(
"| "
f"{comparison.name} | "
f"{_format_value(comparison.baseline, comparison.unit)} | "
f"{_format_value(comparison.current, comparison.unit)} | "
f"{comparison.delta_percent:+.2f}% |"
)
else:
lines.append("No benchmark regression exceeded the configured threshold.")
lines.append("")
if improvements:
lines.extend(
[
f"{len(improvements)} benchmark(s) improved beyond the configured threshold.",
"",
"| Benchmark | Baseline | Current | Change |",
"| --- | ---: | ---: | ---: |",
]
)
for comparison in improvements:
lines.append(
"| "
f"{comparison.name} | "
f"{_format_value(comparison.baseline, comparison.unit)} | "
f"{_format_value(comparison.current, comparison.unit)} | "
f"{comparison.delta_percent:+.2f}% |"
)
else:
lines.append("No benchmark improvement exceeded the configured threshold.")
if sorted_comparisons:
lines.extend(
[
"",
"<details>",
"<summary>All benchmark results</summary>",
"",
"| Benchmark | Baseline | Current | Change | Status |",
"| --- | ---: | ---: | ---: | --- |",
]
)
for comparison in sorted_comparisons:
lines.append(
"| "
f"{comparison.name} | "
f"{_format_value(comparison.baseline, comparison.unit)} | "
f"{_format_value(comparison.current, comparison.unit)} | "
f"{comparison.delta_percent:+.2f}% | "
f"{_format_status(comparison)} |"
)
lines.extend(["", "</details>"])
if missing_in_current:
lines.extend(["", "Missing benchmarks in the current run:"])
lines.extend(f"- `{name}`" for name in missing_in_current)
if new_in_current:
lines.extend(["", "New benchmarks in the current run:"])
lines.extend(f"- `{name}`" for name in new_in_current)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text("\n".join(lines) + "\n", encoding="utf-8")
def main() -> int:
"""Run the benchmark comparison command line interface.
Returns:
int: ``1`` when a regression exceeds the threshold, otherwise ``0``.
"""
parser = argparse.ArgumentParser()
parser.add_argument("--baseline", required=True, type=Path)
parser.add_argument("--current", required=True, type=Path)
parser.add_argument("--summary", required=True, type=Path)
parser.add_argument("--threshold-percent", required=True, type=float)
parser.add_argument("--higher-is-better", action="store_true")
args = parser.parse_args()
baseline = extract_benchmarks(args.baseline)
current = extract_benchmarks(args.current)
comparisons, missing_in_current, new_in_current = compare_benchmarks(
baseline=baseline,
current=current,
threshold_percent=args.threshold_percent,
higher_is_better=args.higher_is_better,
)
write_summary(
path=args.summary,
comparisons=comparisons,
missing_in_current=missing_in_current,
new_in_current=new_in_current,
threshold_percent=args.threshold_percent,
higher_is_better=args.higher_is_better,
)
return 1 if any(comparison.regressed for comparison in comparisons) else 0
if __name__ == "__main__":
raise SystemExit(main())
-74
View File
@@ -1,74 +0,0 @@
#!/usr/bin/env bash
##########################
### AI-generated file. ###
##########################
set -euo pipefail
mkdir -p benchmark-results
benchmark_json="${BENCHMARK_JSON:-benchmark-results/current.json}"
benchmark_root="$(dirname "$benchmark_json")"
hyperfine_benchmark_dir="${BENCHMARK_HYPERFINE_DIR:-tests/benchmarks/hyperfine}"
pytest_benchmark_dirs="${BENCHMARK_PYTEST_DIRS:-${BENCHMARK_PYTEST_DIR:-}}"
benchmark_work_dir="$benchmark_root/raw-results"
hyperfine_json_dir="$benchmark_work_dir/hyperfine"
pytest_json="$benchmark_work_dir/pytest.json"
shopt -s nullglob
benchmark_scripts=()
benchmark_scripts=("$hyperfine_benchmark_dir"/benchmark_*.sh)
shopt -u nullglob
pytest_dirs=()
for pytest_benchmark_dir in $pytest_benchmark_dirs; do
if [ -d "$pytest_benchmark_dir" ]; then
pytest_dirs+=("$pytest_benchmark_dir")
else
echo "Pytest benchmark directory not found: $pytest_benchmark_dir" >&2
exit 1
fi
done
if [ "${#benchmark_scripts[@]}" -eq 0 ] && [ "${#pytest_dirs[@]}" -eq 0 ]; then
echo "No benchmark scripts or pytest benchmarks found" >&2
exit 1
fi
echo "Benchmark Python: $(command -v python)"
python -c 'import sys; print(sys.version)'
rm -rf "$benchmark_work_dir"
mkdir -p "$hyperfine_json_dir"
if [ "${#benchmark_scripts[@]}" -gt 0 ]; then
for benchmark_script in "${benchmark_scripts[@]}"; do
title="$(sed -n 's/^# BENCHMARK_TITLE:[[:space:]]*//p' "$benchmark_script" | head -n 1)"
if [ -z "$title" ]; then
title="$(basename "$benchmark_script" .sh)"
fi
benchmark_name="$(basename "$benchmark_script" .sh)"
benchmark_result_json="$hyperfine_json_dir/$benchmark_name.json"
echo "Preflight benchmark script: $benchmark_script"
bash "$benchmark_script"
hyperfine \
--show-output \
--warmup 1 \
--runs 5 \
--command-name "$title" \
--export-json "$benchmark_result_json" \
"bash $(printf "%q" "$benchmark_script")"
done
fi
if [ "${#pytest_dirs[@]}" -gt 0 ]; then
pytest \
-q "${pytest_dirs[@]}" \
--benchmark-only \
--benchmark-json "$pytest_json"
fi
python .github/scripts/aggregate_benchmarks.py \
--input-dir "$benchmark_work_dir" \
--output "$benchmark_json"
-125
View File
@@ -1,125 +0,0 @@
##########################
### AI-generated file. ###
##########################
"""Run a command with BEC e2e services available."""
from __future__ import annotations
import argparse
import os
import shutil
import subprocess
import tempfile
import time
from pathlib import Path
import bec_lib
from bec_ipython_client import BECIPythonClient
from bec_lib.redis_connector import RedisConnector
from bec_lib.service_config import ServiceConfig, ServiceConfigModel
from redis import Redis
def _wait_for_redis(host: str, port: int) -> None:
client = Redis(host=host, port=port)
deadline = time.monotonic() + 10
while time.monotonic() < deadline:
try:
if client.ping():
return
except Exception:
time.sleep(0.1)
raise RuntimeError(f"Redis did not start on {host}:{port}")
def _start_redis(files_path: Path, host: str, port: int) -> subprocess.Popen:
redis_server = shutil.which("redis-server")
if redis_server is None:
raise RuntimeError("redis-server executable not found")
return subprocess.Popen(
[
redis_server,
"--bind",
host,
"--port",
str(port),
"--save",
"",
"--appendonly",
"no",
"--dir",
str(files_path),
]
)
def _write_configs(files_path: Path, host: str, port: int) -> Path:
test_config = files_path / "test_config.yaml"
services_config = files_path / "services_config.yaml"
bec_lib_path = Path(bec_lib.__file__).resolve().parent
shutil.copyfile(bec_lib_path / "tests" / "test_config.yaml", test_config)
service_config = ServiceConfigModel(
redis={"host": host, "port": port}, file_writer={"base_path": str(files_path)}
)
services_config.write_text(service_config.model_dump_json(indent=4), encoding="utf-8")
return services_config
def _load_demo_config(services_config: Path) -> None:
bec = BECIPythonClient(ServiceConfig(services_config), RedisConnector, forced=True)
bec.start()
try:
bec.config.load_demo_config()
finally:
bec.shutdown()
bec._client._reset_singleton()
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("command", nargs=argparse.REMAINDER)
args = parser.parse_args()
if args.command[:1] == ["--"]:
args.command = args.command[1:]
if not args.command:
raise ValueError("No command provided")
host = "127.0.0.1"
port = 6379
with tempfile.TemporaryDirectory(prefix="bec-benchmark-") as tmp:
files_path = Path(tmp)
services_config = _write_configs(files_path, host, port)
redis_process = _start_redis(files_path, host, port)
processes = None
service_handler = None
try:
_wait_for_redis(host, port)
from bec_server.bec_server_utils.service_handler import ServiceHandler
service_handler = ServiceHandler(
bec_path=files_path, config_path=services_config, interface="subprocess"
)
processes = service_handler.start()
_load_demo_config(services_config)
env = os.environ.copy()
return subprocess.run(args.command, env=env, check=False).returncode
finally:
if service_handler is not None and processes is not None:
service_handler.stop(processes)
redis_process.terminate()
try:
redis_process.wait(timeout=10)
except subprocess.TimeoutExpired:
redis_process.kill()
if __name__ == "__main__":
raise SystemExit(main())
-242
View File
@@ -1,242 +0,0 @@
name: BW Benchmarks
on: [ workflow_call ]
permissions:
contents: read
env:
BENCHMARK_JSON: benchmark-results/current.json
BENCHMARK_BASELINE_JSON: gh-pages-benchmark-data/benchmarks/latest.json
BENCHMARK_SUMMARY: benchmark-results/summary.md
BENCHMARK_COMMAND: "bash .github/scripts/run_benchmarks.sh"
BENCHMARK_THRESHOLD_PERCENT: 20
BENCHMARK_HIGHER_IS_BETTER: false
jobs:
benchmark_attempt:
runs-on: ubuntu-latest
continue-on-error: true
permissions:
contents: read
defaults:
run:
shell: bash -el {0}
strategy:
fail-fast: false
matrix:
attempt: [ 1, 2, 3 ]
env:
BENCHMARK_JSON: benchmark-results/current-${{ matrix.attempt }}.json
BEC_CORE_BRANCH: main
OPHYD_DEVICES_BRANCH: main
PLUGIN_REPO_BRANCH: main
BENCHMARK_PYTEST_DIRS: tests/unit_tests/benchmarks
QTWEBENGINE_DISABLE_SANDBOX: 1
QT_QPA_PLATFORM: "offscreen"
steps:
- name: Checkout BEC Widgets
uses: actions/checkout@v4
with:
repository: bec-project/bec_widgets
ref: ${{ github.event.pull_request.head.sha || github.sha }}
- name: Set up Conda
uses: conda-incubator/setup-miniconda@v3
with:
auto-update-conda: true
auto-activate-base: true
python-version: "3.11"
- name: Install system dependencies
run: |
sudo apt-get update
sudo apt-get install -y libgl1 libegl1 x11-utils libxkbcommon-x11-0 libdbus-1-3 xvfb
sudo apt-get -y install libnss3 libxdamage1 libasound2t64 libatomic1 libxcursor1
sudo apt-get -y install ttyd hyperfine redis-server
- name: Install full e2e environment
run: |
echo -e "\033[35;1m Using branch $BEC_CORE_BRANCH of BEC CORE \033[0;m";
git clone --branch "$BEC_CORE_BRANCH" https://github.com/bec-project/bec.git
echo -e "\033[35;1m Using branch $OPHYD_DEVICES_BRANCH of OPHYD_DEVICES \033[0;m";
git clone --branch "$OPHYD_DEVICES_BRANCH" https://github.com/bec-project/ophyd_devices.git
export OHPYD_DEVICES_PATH=$PWD/ophyd_devices
echo -e "\033[35;1m Using branch $PLUGIN_REPO_BRANCH of bec_testing_plugin \033[0;m";
git clone --branch "$PLUGIN_REPO_BRANCH" https://github.com/bec-project/bec_testing_plugin.git
cd ./bec
conda create -q -n test-environment python=3.11
conda activate test-environment
source ./bin/install_bec_dev.sh -t
cd ../
python -m pip install -e ./ophyd_devices -e .[dev,pyside6] -e ./bec_testing_plugin pytest-benchmark
mkdir -p "$(dirname "$BENCHMARK_JSON")"
python .github/scripts/run_with_bec_servers.py -- bash -lc "$BENCHMARK_COMMAND"
test -s "$BENCHMARK_JSON"
- name: Upload benchmark artifact
uses: actions/upload-artifact@v4
with:
name: bw-benchmark-json-${{ matrix.attempt }}
path: ${{ env.BENCHMARK_JSON }}
benchmark:
needs: [ benchmark_attempt ]
runs-on: ubuntu-latest
permissions:
contents: read
issues: write
pull-requests: write
steps:
- name: Checkout BEC Widgets
uses: actions/checkout@v4
with:
repository: bec-project/bec_widgets
ref: ${{ github.event.pull_request.head.sha || github.sha }}
- name: Download benchmark attempts
uses: actions/download-artifact@v4
with:
pattern: bw-benchmark-json-*
path: benchmark-results/attempts
merge-multiple: true
- name: Aggregate benchmark attempts
run: |
python .github/scripts/aggregate_benchmarks.py \
--input-dir benchmark-results/attempts \
--output "$BENCHMARK_JSON"
- name: Upload aggregate benchmark artifact
uses: actions/upload-artifact@v4
with:
name: bw-benchmark-json
path: ${{ env.BENCHMARK_JSON }}
- name: Fetch gh-pages benchmark data
run: |
if git ls-remote --exit-code --heads origin gh-pages; then
git clone --depth=1 --branch gh-pages "$GITHUB_SERVER_URL/$GITHUB_REPOSITORY.git" gh-pages-benchmark-data
else
mkdir -p gh-pages-benchmark-data
fi
- name: Compare with latest gh-pages benchmark
id: compare
continue-on-error: true
run: |
if [ ! -s "$BENCHMARK_BASELINE_JSON" ]; then
mkdir -p "$(dirname "$BENCHMARK_SUMMARY")"
{
echo "<!-- bw-benchmark-comment -->"
echo "## Benchmark comparison"
echo
echo "No benchmark baseline was found on gh-pages."
} > "$BENCHMARK_SUMMARY"
exit 0
fi
args=(
--baseline "$BENCHMARK_BASELINE_JSON"
--current "$BENCHMARK_JSON"
--summary "$BENCHMARK_SUMMARY"
--threshold-percent "$BENCHMARK_THRESHOLD_PERCENT"
)
if [ "$BENCHMARK_HIGHER_IS_BETTER" = "true" ]; then
args+=(--higher-is-better)
fi
set +e
python .github/scripts/compare_benchmarks.py "${args[@]}"
status=$?
set -e
if [ ! -s "$BENCHMARK_SUMMARY" ]; then
mkdir -p "$(dirname "$BENCHMARK_SUMMARY")"
{
echo "<!-- bw-benchmark-comment -->"
echo "## Benchmark comparison"
echo
echo "Benchmark comparison failed before writing a summary."
} > "$BENCHMARK_SUMMARY"
fi
exit "$status"
- name: Find existing benchmark PR comment
if: github.event_name == 'pull_request'
id: fc
uses: peter-evans/find-comment@v3
with:
issue-number: ${{ github.event.pull_request.number }}
comment-author: github-actions[bot]
body-includes: "<!-- bw-benchmark-comment -->"
- name: Create or update benchmark PR comment
if: github.event_name == 'pull_request'
uses: peter-evans/create-or-update-comment@v5
with:
issue-number: ${{ github.event.pull_request.number }}
comment-id: ${{ steps.fc.outputs.comment-id }}
body-path: ${{ env.BENCHMARK_SUMMARY }}
edit-mode: replace
- name: Fail on benchmark regression
if: github.event_name == 'pull_request' && steps.compare.outcome == 'failure'
run: exit 1
publish:
needs: [ benchmark ]
if: github.event_name == 'push' && github.ref == 'refs/heads/main'
runs-on: ubuntu-latest
permissions:
contents: write
steps:
- name: Checkout BEC Widgets
uses: actions/checkout@v4
with:
repository: bec-project/bec_widgets
ref: ${{ github.sha }}
- name: Download aggregate benchmark artifact
uses: actions/download-artifact@v4
with:
name: bw-benchmark-json
path: benchmark-results
- name: Verify aggregate benchmark artifact
run: test -s "$BENCHMARK_JSON"
- name: Prepare gh-pages for publishing
run: |
# Clean up any existing worktree/directory
if [ -d gh-pages-benchmark-data ]; then
git worktree remove gh-pages-benchmark-data --force || rm -rf gh-pages-benchmark-data
fi
if git ls-remote --exit-code --heads origin gh-pages; then
git fetch --depth=1 origin gh-pages
git worktree add gh-pages-benchmark-data FETCH_HEAD
else
git worktree add --detach gh-pages-benchmark-data
git -C gh-pages-benchmark-data checkout --orphan gh-pages
git -C gh-pages-benchmark-data rm -rf .
fi
- name: Publish benchmark data to gh-pages
working-directory: gh-pages-benchmark-data
run: |
mkdir -p benchmarks/history
cp "../$BENCHMARK_JSON" benchmarks/latest.json
cp "../$BENCHMARK_JSON" "benchmarks/history/${GITHUB_SHA}.json"
git config user.name "github-actions[bot]"
git config user.email "41898282+github-actions[bot]@users.noreply.github.com"
git add benchmarks/latest.json "benchmarks/history/${GITHUB_SHA}.json"
git commit -m "Update BW benchmark data for ${GITHUB_SHA}" || exit 0
git push origin HEAD:gh-pages
+7 -17
View File
@@ -1,19 +1,19 @@
name: Full CI
on:
on:
push:
pull_request:
workflow_dispatch:
inputs:
BEC_WIDGETS_BRANCH:
description: "Branch of BEC Widgets to install"
description: 'Branch of BEC Widgets to install'
required: false
type: string
BEC_CORE_BRANCH:
description: "Branch of BEC Core to install"
description: 'Branch of BEC Core to install'
required: false
type: string
OPHYD_DEVICES_BRANCH:
description: "Branch of Ophyd Devices to install"
description: 'Branch of Ophyd Devices to install'
required: false
type: string
@@ -23,7 +23,6 @@ concurrency:
permissions:
pull-requests: write
contents: read
jobs:
check_pr_status:
@@ -34,15 +33,6 @@ jobs:
if: needs.check_pr_status.outputs.branch-pr == ''
uses: ./.github/workflows/formatter.yml
benchmark:
needs: [check_pr_status]
if: needs.check_pr_status.outputs.branch-pr == ''
permissions:
contents: write
issues: write
pull-requests: write
uses: ./.github/workflows/benchmark.yml
unit-test:
needs: [check_pr_status, formatter]
if: needs.check_pr_status.outputs.branch-pr == ''
@@ -79,9 +69,9 @@ jobs:
uses: ./.github/workflows/child_repos.yml
with:
BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main'}}
OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main'}}
BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha }}
plugin_repos:
needs: [check_pr_status, formatter]
if: needs.check_pr_status.outputs.branch-pr == ''
@@ -91,4 +81,4 @@ jobs:
BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha }}
secrets:
GH_READ_TOKEN: ${{ secrets.GH_READ_TOKEN }}
GH_READ_TOKEN: ${{ secrets.GH_READ_TOKEN }}
+1 -1
View File
@@ -55,5 +55,5 @@ jobs:
uses: actions/upload-artifact@v4
with:
name: pytest-logs
path: ./bec/logs/*.log
path: ./logs/*.log
retention-days: 7
+13 -12
View File
@@ -1,25 +1,25 @@
name: Run Pytest with different Python versions
on:
on:
workflow_call:
inputs:
pr_number:
description: "Pull request number"
description: 'Pull request number'
required: false
type: number
BEC_CORE_BRANCH:
description: "Branch of BEC Core to install"
description: 'Branch of BEC Core to install'
required: false
default: "main"
default: 'main'
type: string
OPHYD_DEVICES_BRANCH:
description: "Branch of Ophyd Devices to install"
description: 'Branch of Ophyd Devices to install'
required: false
default: "main"
default: 'main'
type: string
BEC_WIDGETS_BRANCH:
description: "Branch of BEC Widgets to install"
description: 'Branch of BEC Widgets to install'
required: false
default: "main"
default: 'main'
type: string
jobs:
@@ -30,14 +30,15 @@ jobs:
python-version: ["3.11", "3.12", "3.13"]
env:
BEC_WIDGETS_BRANCH: main # Set the branch you want for bec_widgets
BEC_CORE_BRANCH: main # Set the branch you want for bec
OPHYD_DEVICES_BRANCH: main # Set the branch you want for ophyd_devices
BEC_WIDGETS_BRANCH: main # Set the branch you want for bec_widgets
BEC_CORE_BRANCH: main # Set the branch you want for bec
OPHYD_DEVICES_BRANCH: main # Set the branch you want for ophyd_devices
PROJECT_PATH: ${{ github.repository }}
QTWEBENGINE_DISABLE_SANDBOX: 1
QT_QPA_PLATFORM: "offscreen"
steps:
- name: Checkout BEC Widgets
uses: actions/checkout@v4
with:
@@ -55,4 +56,4 @@ jobs:
- name: Run Pytest
run: |
pip install pytest pytest-random-order
pytest -v --junitxml=report.xml --random-order --ignore=tests/unit_tests/benchmarks ./tests/unit_tests
pytest -v --junitxml=report.xml --random-order ./tests/unit_tests
+12 -10
View File
@@ -1,30 +1,32 @@
name: Run Pytest with Coverage
on:
on:
workflow_call:
inputs:
pr_number:
description: "Pull request number"
description: 'Pull request number'
required: false
type: number
BEC_CORE_BRANCH:
description: "Branch of BEC Core to install"
description: 'Branch of BEC Core to install'
required: false
default: "main"
default: 'main'
type: string
OPHYD_DEVICES_BRANCH:
description: "Branch of Ophyd Devices to install"
description: 'Branch of Ophyd Devices to install'
required: false
default: "main"
default: 'main'
type: string
BEC_WIDGETS_BRANCH:
description: "Branch of BEC Widgets to install"
description: 'Branch of BEC Widgets to install'
required: false
default: "main"
default: 'main'
type: string
secrets:
CODECOV_TOKEN:
required: true
permissions:
pull-requests: write
@@ -53,7 +55,7 @@ jobs:
- name: Run Pytest with Coverage
id: coverage
run: pytest --random-order --cov=bec_widgets --cov-config=pyproject.toml --cov-branch --cov-report=xml --no-cov-on-fail --ignore=tests/unit_tests/benchmarks tests/unit_tests/
run: pytest --random-order --cov=bec_widgets --cov-config=pyproject.toml --cov-branch --cov-report=xml --no-cov-on-fail tests/unit_tests/
- name: Upload test artifacts
uses: actions/upload-artifact@v4
@@ -67,4 +69,4 @@ jobs:
uses: codecov/codecov-action@v5
with:
token: ${{ secrets.CODECOV_TOKEN }}
slug: bec-project/bec_widgets
slug: bec-project/bec_widgets
+1 -3
View File
@@ -177,6 +177,4 @@ cython_debug/
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
#
tombi.toml
#.idea/
-1063
View File
File diff suppressed because it is too large Load Diff
+18 -12
View File
@@ -1,13 +1,19 @@
import os
import sys
import bec_widgets.widgets.containers.qt_ads as QtAds
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
if sys.platform.startswith("linux"):
qt_platform = os.environ.get("QT_QPA_PLATFORM", "")
if qt_platform != "offscreen":
os.environ["QT_QPA_PLATFORM"] = "xcb"
# Default QtAds configuration
QtAds.CDockManager.setConfigFlag(QtAds.CDockManager.eConfigFlag.FocusHighlighting, True)
QtAds.CDockManager.setConfigFlag(
QtAds.CDockManager.eConfigFlag.RetainTabSizeWhenCloseButtonHidden, True
)
__all__ = ["BECWidget", "SafeSlot", "SafeProperty"]
def __getattr__(name):
if name == "BECWidget":
from bec_widgets.utils.bec_widget import BECWidget
return BECWidget
if name in {"SafeSlot", "SafeProperty"}:
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
return {"SafeSlot": SafeSlot, "SafeProperty": SafeProperty}[name]
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
-15
View File
@@ -1,15 +0,0 @@
import os
import sys
import bec_widgets.widgets.containers.qt_ads as QtAds
if sys.platform.startswith("linux"):
qt_platform = os.environ.get("QT_QPA_PLATFORM", "")
if qt_platform != "offscreen":
os.environ["QT_QPA_PLATFORM"] = "xcb"
# Default QtAds configuration
QtAds.CDockManager.setConfigFlag(QtAds.CDockManager.eConfigFlag.FocusHighlighting, True)
QtAds.CDockManager.setConfigFlag(
QtAds.CDockManager.eConfigFlag.RetainTabSizeWhenCloseButtonHidden, True
)
+14 -11
View File
@@ -1,7 +1,5 @@
from __future__ import annotations
from typing import Literal
from bec_lib import bec_logger
from bec_widgets.widgets.containers.auto_update.auto_updates import AutoUpdates
@@ -11,32 +9,37 @@ logger = bec_logger.logger
def dock_area(
object_name: str | None = None, startup_profile: str | Literal["restore", "skip"] | None = None
object_name: str | None = None, profile: str | None = None, start_empty: bool = False
) -> BECDockArea:
"""
Create an advanced dock area using Qt Advanced Docking System.
Args:
object_name(str): The name of the advanced dock area.
startup_profile(str | Literal["restore", "skip"] | None): Startup mode for
the workspace:
- None: start empty
- "restore": restore last used profile
- "skip": do not initialize profile state
- "<name>": load specific profile
profile(str|None): Optional profile to load; if None the "general" profile is used.
start_empty(bool): If True, start with an empty dock area when loading specified profile.
Returns:
BECDockArea: The created advanced dock area.
Note:
The "general" profile is mandatory and will always exist. If manually deleted,
it will be automatically recreated.
"""
# Default to "general" profile when called from CLI without specifying a profile
effective_profile = profile if profile is not None else "general"
widget = BECDockArea(
object_name=object_name,
restore_initial_profile=True,
root_widget=True,
profile_namespace="bec",
startup_profile=startup_profile,
init_profile=effective_profile,
start_empty=start_empty,
)
logger.info(
f"Created advanced dock area with profile: {effective_profile}, start_empty: {start_empty}"
)
logger.info(f"Created advanced dock area with startup_profile: {startup_profile}")
return widget
+20 -27
View File
@@ -20,13 +20,13 @@ from qtpy.QtWidgets import (
)
import bec_widgets
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.utils.bec_plugin_helper import get_all_plugin_widgets
from bec_widgets.utils.container_utils import WidgetContainerUtils
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.name_utils import pascal_to_snake
from bec_widgets.utils.plugin_utils import get_plugin_auto_updates
from bec_widgets.utils.round_frame import RoundedFrame
from bec_widgets.utils.rpc_register import RPCRegister
from bec_widgets.utils.screen_utils import apply_window_geometry, centered_geometry_for_app
from bec_widgets.utils.toolbars.toolbar import ModularToolBar
from bec_widgets.utils.ui_loader import UILoader
@@ -43,7 +43,6 @@ if TYPE_CHECKING: # pragma: no cover
logger = bec_logger.logger
MODULE_PATH = os.path.dirname(bec_widgets.__file__)
START_EMPTY_PROFILE_OPTION = "Start Empty (No Profile)"
class LaunchTile(RoundedFrame):
@@ -147,7 +146,8 @@ class LaunchTile(RoundedFrame):
# Action button
self.action_button = QPushButton("Open")
self.action_button.setStyleSheet("""
self.action_button.setStyleSheet(
"""
QPushButton {
background-color: #007AFF;
border: none;
@@ -159,7 +159,8 @@ class LaunchTile(RoundedFrame):
QPushButton:hover {
background-color: #005BB5;
}
""")
"""
)
self.layout.addWidget(self.action_button, alignment=Qt.AlignmentFlag.AlignCenter)
def _fit_label_to_width(self, label: QLabel, max_width: int, min_pt: int = 10):
@@ -188,7 +189,6 @@ class LaunchTile(RoundedFrame):
class LaunchWindow(BECMainWindow):
RPC = True
PLUGIN = False
TILE_SIZE = (250, 300)
DEFAULT_LAUNCH_SIZE = (800, 600)
USER_ACCESS = ["show_launcher", "hide_launcher"]
@@ -353,7 +353,7 @@ class LaunchWindow(BECMainWindow):
def _refresh_dock_area_profiles(self, preserve_selection: bool = True) -> None:
"""
Refresh the dock-area profile selector, optionally preserving the selection.
Defaults to Start Empty when no valid selection can be preserved.
Sets the combobox to the last used profile or "general" if no selection preserved.
Args:
preserve_selection(bool): Whether to preserve the current selection or not.
@@ -368,10 +368,9 @@ class LaunchWindow(BECMainWindow):
)
profiles = list_profiles("bec")
selector_items = [START_EMPTY_PROFILE_OPTION, *profiles]
selector.blockSignals(True)
selector.clear()
for profile in selector_items:
for profile in profiles:
selector.addItem(profile)
if selected_text:
@@ -380,31 +379,21 @@ class LaunchWindow(BECMainWindow):
if idx >= 0:
selector.setCurrentIndex(idx)
else:
# Selection no longer exists, fall back to default startup selection.
# Selection no longer exists, fall back to last profile or "general"
self._set_selector_to_default_profile(selector, profiles)
else:
# No selection to preserve, use default startup selection.
# No selection to preserve, use last profile or "general"
self._set_selector_to_default_profile(selector, profiles)
selector.blockSignals(False)
def _set_selector_to_default_profile(self, selector: QComboBox, profiles: list[str]) -> None:
"""
Set the selector default.
Preference order:
1) Start Empty option (if available)
2) Last used profile
3) First available profile
Set the selector to the last used profile or "general" as fallback.
Args:
selector(QComboBox): The combobox to set.
profiles(list[str]): List of available profiles.
"""
start_empty_idx = selector.findText(START_EMPTY_PROFILE_OPTION, Qt.MatchFlag.MatchExactly)
if start_empty_idx >= 0:
selector.setCurrentIndex(start_empty_idx)
return
# Try to get last used profile
last_profile = get_last_profile(namespace="bec")
if last_profile and last_profile in profiles:
@@ -413,6 +402,13 @@ class LaunchWindow(BECMainWindow):
selector.setCurrentIndex(idx)
return
# Fall back to "general" profile
if "general" in profiles:
idx = selector.findText("general", Qt.MatchFlag.MatchExactly)
if idx >= 0:
selector.setCurrentIndex(idx)
return
# If nothing else, select first item
if selector.count() > 0:
selector.setCurrentIndex(0)
@@ -591,14 +587,11 @@ class LaunchWindow(BECMainWindow):
"""
tile = self.tiles.get("dock_area")
if tile is None or tile.selector is None:
startup_profile = None
profile = None
else:
selection = tile.selector.currentText().strip()
if selection == START_EMPTY_PROFILE_OPTION:
startup_profile = None
else:
startup_profile = selection if selection else None
return self.launch("dock_area", startup_profile=startup_profile)
profile = selection if selection else None
return self.launch("dock_area", profile=profile)
def _open_widget(self):
"""
+23 -205
View File
@@ -1,29 +1,22 @@
from bec_qthemes import material_icon
from qtpy.QtGui import QAction # type: ignore
from qtpy.QtWidgets import QApplication, QHBoxLayout, QStackedWidget, QWidget
from bec_widgets.applications.navigation_centre.reveal_animator import ANIMATION_DURATION
from bec_widgets.applications.navigation_centre.side_bar import SideBar
from bec_widgets.applications.navigation_centre.side_bar_components import NavigationItem
from bec_widgets.applications.views.admin_view.admin_view import AdminView
from bec_widgets.applications.views.developer_view.developer_view import DeveloperView
from bec_widgets.applications.views.device_manager_view.device_manager_view import DeviceManagerView
from bec_widgets.applications.views.dock_area_view.dock_area_view import DockAreaView
from bec_widgets.applications.views.view import ViewBase, WaveformViewInline, WaveformViewPopup
from bec_widgets.utils.colors import apply_theme
from bec_widgets.utils.guided_tour import GuidedTour
from bec_widgets.utils.name_utils import sanitize_namespace
from bec_widgets.utils.screen_utils import (
apply_centered_size,
available_screen_geometry,
main_app_size_for_screen,
)
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea
from bec_widgets.widgets.containers.main_window.main_window import BECMainWindow
class BECMainApp(BECMainWindow):
RPC = False
PLUGIN = False
def __init__(
self,
@@ -55,58 +48,51 @@ class BECMainApp(BECMainWindow):
self._add_views()
# Initialize guided tour
self.guided_tour = GuidedTour(self)
self._setup_guided_tour()
def _add_views(self):
self.add_section("BEC Applications", "bec_apps")
self.dock_area = DockAreaView(self)
self.ads = BECDockArea(self, profile_namespace="bec", auto_profile_namespace=False)
self.ads.setObjectName("MainWorkspace")
self.device_manager = DeviceManagerView(self)
# self.developer_view = DeveloperView(self) #TODO temporary disable until the bugs with BECShell are resolved
self.admin_view = AdminView(self)
self.developer_view = DeveloperView(self)
self.add_view(icon="widgets", title="Dock Area", widget=self.dock_area, mini_text="Docks")
self.add_view(
icon="widgets", title="Dock Area", id="dock_area", widget=self.ads, mini_text="Docks"
)
self.add_view(
icon="display_settings",
title="Device Manager",
id="device_manager",
widget=self.device_manager,
mini_text="DM",
)
# TODO temporary disable until the bugs with BECShell are resolved
# self.add_view(
# icon="code_blocks",
# title="IDE",
# widget=self.developer_view,
# mini_text="IDE",
# exclusive=True,
# )
self.add_view(
icon="admin_panel_settings",
title="Admin View",
widget=self.admin_view,
mini_text="Admin",
from_top=False,
icon="code_blocks",
title="IDE",
widget=self.developer_view,
id="developer_view",
exclusive=True,
)
if self._show_examples:
self.add_section("Examples", "examples")
waveform_view_popup = WaveformViewPopup(
parent=self, view_id="waveform_view_popup", title="Waveform Plot"
parent=self, id="waveform_view_popup", title="Waveform Plot"
)
waveform_view_stack = WaveformViewInline(
parent=self, view_id="waveform_view_stack", title="Waveform Plot"
parent=self, id="waveform_view_stack", title="Waveform Plot"
)
self.add_view(
icon="show_chart",
title="Waveform With Popup",
id="waveform_popup",
widget=waveform_view_popup,
mini_text="Popup",
)
self.add_view(
icon="show_chart",
title="Waveform InLine Stack",
id="waveform_stack",
widget=waveform_view_stack,
mini_text="Stack",
)
@@ -114,9 +100,6 @@ class BECMainApp(BECMainWindow):
self.set_current("dock_area")
self.sidebar.add_dark_mode_item()
# Add guided tour to Help menu
self._add_guided_tour_to_menu()
# --- Public API ------------------------------------------------------
def add_section(self, title: str, id: str, position: int | None = None):
return self.sidebar.add_section(title, id, position)
@@ -132,7 +115,7 @@ class BECMainApp(BECMainWindow):
*,
icon: str,
title: str,
view_id: str | None = None,
id: str,
widget: QWidget,
mini_text: str | None = None,
position: int | None = None,
@@ -146,8 +129,7 @@ class BECMainApp(BECMainWindow):
Args:
icon(str): Icon name for the nav item.
title(str): Title for the nav item.
view_id(str, optional): Unique ID for the view/item. If omitted, uses mini_text;
if mini_text is also omitted, uses title.
id(str): Unique ID for the view/item.
widget(QWidget): The widget to add to the stack.
mini_text(str, optional): Short text for the nav item when sidebar is collapsed.
position(int, optional): Position to insert the nav item.
@@ -160,11 +142,10 @@ class BECMainApp(BECMainWindow):
"""
resolved_id = sanitize_namespace(view_id or mini_text or title)
item = self.sidebar.add_item(
icon=icon,
title=title,
id=resolved_id,
id=id,
mini_text=mini_text,
position=position,
from_top=from_top,
@@ -174,15 +155,13 @@ class BECMainApp(BECMainWindow):
# Wrap plain widgets into a ViewBase so enter/exit hooks are available
if isinstance(widget, ViewBase):
view_widget = widget
view_widget.view_id = resolved_id
view_widget.view_id = id
view_widget.view_title = title
else:
view_widget = ViewBase(content=widget, parent=self, view_id=resolved_id, title=title)
view_widget.change_object_name(resolved_id)
view_widget = ViewBase(content=widget, parent=self, id=id, title=title)
idx = self.stack.addWidget(view_widget)
self._view_index[resolved_id] = idx
self._view_index[id] = idx
return item
def set_current(self, id: str) -> None:
@@ -191,12 +170,6 @@ class BECMainApp(BECMainWindow):
# Internal: route sidebar selection to the stack
def _on_view_selected(self, vid: str) -> None:
# Special handling for views that can not be switched to (e.g. dark mode toggle)
# Not registered as proper view with a stack index, so we ignore any logic below
# as it will anyways not result in a stack switch.
idx = self._view_index.get(vid)
if idx is None or not (0 <= idx < self.stack.count()):
return
# Determine current view
current_index = self.stack.currentIndex()
current_view = (
@@ -222,160 +195,6 @@ class BECMainApp(BECMainWindow):
if hasattr(new_view, "on_enter"):
new_view.on_enter()
def _setup_guided_tour(self):
"""
Setup the guided tour for the main application.
Registers key UI components and delegates to views for their internal components.
"""
tour_steps = []
# --- General Layout Components ---
# Register the sidebar toggle button
toggle_step = self.guided_tour.register_widget(
widget=self.sidebar.toggle,
title="Sidebar Toggle",
text="Click this button to expand or collapse the sidebar. When expanded, you can see full navigation item titles and section names.",
)
tour_steps.append(toggle_step)
# Register the sidebar icons
sidebar_dock_area = self.sidebar.components.get("dock_area")
if sidebar_dock_area:
dock_step = self.guided_tour.register_widget(
widget=sidebar_dock_area,
title="Dock Area View",
text="Click here to access the Dock Area view, where you can manage and arrange your dockable panels.",
)
tour_steps.append(dock_step)
sidebar_device_manager = self.sidebar.components.get("device_manager")
if sidebar_device_manager:
device_manager_step = self.guided_tour.register_widget(
widget=sidebar_device_manager,
title="Device Manager View",
text="Click here to open the Device Manager view, where you can view and manage device configs.",
)
tour_steps.append(device_manager_step)
sidebar_developer_view = self.sidebar.components.get("developer_view")
if sidebar_developer_view:
developer_view_step = self.guided_tour.register_widget(
widget=sidebar_developer_view,
title="Developer View",
text="Click here to access the Developer view to write scripts and macros.",
)
tour_steps.append(developer_view_step)
# Register the dark mode toggle
dark_mode_item = self.sidebar.components.get("dark_mode")
if dark_mode_item:
dark_mode_step = self.guided_tour.register_widget(
widget=dark_mode_item,
title="Theme Toggle",
text="Switch between light and dark themes. The theme preference is saved and will be applied when you restart the application.",
)
tour_steps.append(dark_mode_step)
# Register the client info label
if hasattr(self, "_client_info_hover"):
client_info_step = self.guided_tour.register_widget(
widget=self._client_info_hover,
title="Client Status",
text="Displays status messages and information from the BEC Server.",
)
tour_steps.append(client_info_step)
# Register the scan progress bar if available
if hasattr(self, "_scan_progress_hover"):
progress_step = self.guided_tour.register_widget(
widget=self._scan_progress_hover,
title="Scan Progress",
text="Monitor the progress of ongoing scans. Hover over the progress bar to see detailed information including elapsed time and estimated completion.",
)
tour_steps.append(progress_step)
# Register the notification indicator in the status bar
if hasattr(self, "notification_indicator"):
notif_step = self.guided_tour.register_widget(
widget=self.notification_indicator,
title="Notification Center",
text="View system notifications, errors, and status updates. Click to filter notifications by type or expand to see all details.",
)
tour_steps.append(notif_step)
# --- View-Specific Components ---
# Register all views that can extend the tour
for view_id, view_index in self._view_index.items():
view_widget = self.stack.widget(view_index)
if not view_widget or not hasattr(view_widget, "register_tour_steps"):
continue
# Get the view's tour steps
view_tour = view_widget.register_tour_steps(self.guided_tour, self)
if view_tour is None:
if hasattr(view_widget.content, "register_tour_steps"):
view_tour = view_widget.content.register_tour_steps(self.guided_tour, self)
if view_tour is None:
continue
# Get the corresponding sidebar navigation item
nav_item = self.sidebar.components.get(view_id)
if not nav_item:
continue
# Use the view's title for the navigation button
nav_step = self.guided_tour.register_widget(
widget=nav_item,
title=view_tour.view_title,
text=f"Let's explore the features of the {view_tour.view_title}.",
)
tour_steps.append(nav_step)
tour_steps.extend(view_tour.step_ids)
# Create the tour with all registered steps
if tour_steps:
self.guided_tour.create_tour(tour_steps)
def start_guided_tour(self):
"""
Public method to start the guided tour.
This can be called programmatically or connected to a menu/button action.
"""
self.guided_tour.start_tour()
def _add_guided_tour_to_menu(self):
"""
Add a 'Guided Tour' action to the Help menu.
"""
# Find the Help menu
menu_bar = self.menuBar()
help_menu = None
for action in menu_bar.actions():
if action.text() == "Help":
help_menu = action.menu()
break
if help_menu:
# Add separator before the tour action
help_menu.addSeparator()
# Create and add the guided tour action
tour_action = QAction("Start Guided Tour", self)
tour_action.setIcon(material_icon("help"))
tour_action.triggered.connect(self.start_guided_tour)
tour_action.setShortcut("F1") # Add keyboard shortcut
help_menu.addAction(tour_action)
def cleanup(self):
for view_id, idx in self._view_index.items():
view = self.stack.widget(idx)
view.close()
view.deleteLater()
super().cleanup()
def main(): # pragma: no cover
"""
@@ -394,7 +213,6 @@ def main(): # pragma: no cover
args, qt_args = parser.parse_known_args(sys.argv[1:])
app = QApplication([sys.argv[0], *qt_args])
app.setApplicationName("BEC")
apply_theme("dark")
w = BECMainApp(show_examples=args.examples)
@@ -127,10 +127,12 @@ class NavigationItem(QWidget):
self._icon_size_expanded = QtCore.QSize(26, 26)
self.icon_btn.setIconSize(self._icon_size_collapsed)
# Remove QToolButton hover/pressed background/outline
self.icon_btn.setStyleSheet("""
self.icon_btn.setStyleSheet(
"""
QToolButton:hover { background: transparent; border: none; }
QToolButton:pressed { background: transparent; border: none; }
""")
"""
)
# Mini label below icon
self.mini_lbl = QLabel(self._mini_text, self)
@@ -1,35 +0,0 @@
"""Module for Admin View."""
from qtpy.QtWidgets import QWidget
from bec_widgets.applications.views.view import ViewBase
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.widgets.services.bec_atlas_admin_view.bec_atlas_admin_view import BECAtlasAdminView
class AdminView(ViewBase):
"""
A view for administrators to change the current active experiment, manage messaging
services, and more tasks reserved for users with admin privileges.
"""
def __init__(
self,
parent: QWidget | None = None,
content: QWidget | None = None,
*,
view_id: str | None = None,
title: str | None = None,
**kwargs,
):
super().__init__(parent=parent, content=content, view_id=view_id, title=title)
self.admin_widget = BECAtlasAdminView(parent=self)
self.set_content(self.admin_widget)
@SafeSlot()
def on_exit(self) -> None:
"""Called before the view is hidden.
Default implementation does nothing. Override in subclasses.
"""
self.admin_widget.logout()
@@ -1,7 +1,9 @@
from __future__ import annotations
from qtpy.QtWidgets import QWidget
from bec_widgets.applications.views.developer_view.developer_widget import DeveloperWidget
from bec_widgets.applications.views.view import ViewBase, ViewTourSteps
from bec_widgets.applications.views.view import ViewBase
class DeveloperView(ViewBase):
@@ -14,89 +16,13 @@ class DeveloperView(ViewBase):
parent: QWidget | None = None,
content: QWidget | None = None,
*,
view_id: str | None = None,
id: str | None = None,
title: str | None = None,
**kwargs,
):
super().__init__(parent=parent, content=content, view_id=view_id, title=title, **kwargs)
super().__init__(parent=parent, content=content, id=id, title=title)
self.developer_widget = DeveloperWidget(parent=self)
self.set_content(self.developer_widget)
def register_tour_steps(self, guided_tour, main_app) -> ViewTourSteps | None:
"""Register Developer View components with the guided tour.
Args:
guided_tour: The GuidedTour instance to register with.
main_app: The main application instance (for accessing set_current).
Returns:
ViewTourSteps | None: Model containing view title and step IDs.
"""
step_ids = []
dev_widget = self.developer_widget
# IDE Toolbar
def get_ide_toolbar():
main_app.set_current("developer_view")
return (dev_widget.toolbar, None)
step_id = guided_tour.register_widget(
widget=get_ide_toolbar,
title="IDE Toolbar",
text="Quick access to save files, execute scripts, and configure IDE settings. Use the toolbar to manage your code and execution.",
)
step_ids.append(step_id)
# IDE Explorer
def get_ide_explorer():
main_app.set_current("developer_view")
return (dev_widget.explorer_dock.widget(), None)
step_id = guided_tour.register_widget(
widget=get_ide_explorer,
title="File Explorer",
text="Browse and manage your macro files. Create new files, open existing ones, and organize your scripts.",
)
step_ids.append(step_id)
# IDE Editor
def get_ide_editor():
main_app.set_current("developer_view")
return (dev_widget.monaco_dock.widget(), None)
step_id = guided_tour.register_widget(
widget=get_ide_editor,
title="Code Editor",
text="Write and edit Python code with syntax highlighting, auto-completion, and signature help. Monaco editor provides a modern coding experience.",
)
step_ids.append(step_id)
# IDE Console
def get_ide_console():
main_app.set_current("developer_view")
return (dev_widget.console_dock.widget(), None)
step_id = guided_tour.register_widget(
widget=get_ide_console,
title="BEC Shell Console",
text="Interactive Python console with BEC integration. Execute commands, test code snippets, and interact with the BEC system in real-time.",
)
step_ids.append(step_id)
# IDE Plotting Area
def get_ide_plotting():
main_app.set_current("developer_view")
return (dev_widget.plotting_ads, None)
step_id = guided_tour.register_widget(
widget=get_ide_plotting,
title="Plotting Area",
text="View plots and visualizations generated by your scripts. Arrange multiple plots in a flexible layout.",
)
step_ids.append(step_id)
return ViewTourSteps(view_title="Developer View", step_ids=step_ids)
if __name__ == "__main__":
import sys
@@ -126,11 +52,7 @@ if __name__ == "__main__":
_app.resize(width, height)
developer_view = DeveloperView()
_app.add_view(
icon="code_blocks",
title="IDE",
widget=developer_view,
view_id="developer_view",
exclusive=True,
icon="code_blocks", title="IDE", widget=developer_view, id="developer_view", exclusive=True
)
_app.show()
# developer_view.show()
@@ -16,9 +16,9 @@ from bec_widgets.utils.toolbars.toolbar import ModularToolBar
from bec_widgets.widgets.containers.dock_area.basic_dock_area import DockAreaWidget
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea
from bec_widgets.widgets.containers.qt_ads import CDockWidget
from bec_widgets.widgets.editors.bec_console.bec_console import BecConsole, BECShell
from bec_widgets.widgets.editors.monaco.monaco_dock import MonacoDock
from bec_widgets.widgets.editors.monaco.monaco_widget import MonacoWidget
from bec_widgets.widgets.editors.web_console.web_console import BECShell, WebConsole
from bec_widgets.widgets.utility.ide_explorer.ide_explorer import IDEExplorer
@@ -79,8 +79,6 @@ def markdown_to_html(md_text: str) -> str:
class DeveloperWidget(DockAreaWidget):
RPC = False
PLUGIN = False
def __init__(self, parent=None, **kwargs):
super().__init__(parent=parent, variant="compact", **kwargs)
@@ -94,11 +92,11 @@ class DeveloperWidget(DockAreaWidget):
self.explorer = IDEExplorer(self)
self.explorer.setObjectName("Explorer")
self.console = BECShell(self, rpc_exposed=False)
self.console = BECShell(self)
self.console.setObjectName("BEC Shell")
self.terminal = BecConsole(self, rpc_exposed=False)
self.terminal = WebConsole(self)
self.terminal.setObjectName("Terminal")
self.monaco = MonacoDock(self, rpc_exposed=False, rpc_passthrough_children=False)
self.monaco = MonacoDock(self)
self.monaco.setObjectName("MonacoEditor")
self.monaco.save_enabled.connect(self._on_save_enabled_update)
self.plotting_ads = BECDockArea(
@@ -410,3 +408,23 @@ class DeveloperWidget(DockAreaWidget):
"""Clean up resources used by the developer widget."""
self.delete_all()
return super().cleanup()
if __name__ == "__main__":
import sys
from bec_qthemes import apply_theme
from qtpy.QtWidgets import QApplication
from bec_widgets.applications.main_app import BECMainApp
app = QApplication(sys.argv)
apply_theme("dark")
_app = BECMainApp()
_app.show()
# developer_view.show()
# developer_view.setWindowTitle("Developer View")
# developer_view.resize(1920, 1080)
# developer_view.set_stretch(horizontal=[1, 3, 2], vertical=[5, 5]) #can be set during runtime
sys.exit(app.exec_())
@@ -31,7 +31,7 @@ logger = bec_logger.logger
class DeviceManagerOphydValidationDialog(QtWidgets.QDialog):
"""Popup dialog to test Ophyd device configurations interactively."""
def __init__(self, parent=None, config: dict | None = None): # type: ignore
def __init__(self, parent=None, config: dict | None = None): # type:ignore
super().__init__(parent)
self.setWindowTitle("Device Manager Ophyd Test")
self._config_status = ConfigStatus.UNKNOWN.value
@@ -133,7 +133,7 @@ class DeviceFormDialog(QtWidgets.QDialog):
# validated: config_status, connection_status
accepted_data = QtCore.Signal(dict, int, int, str, str)
def __init__(self, parent=None, add_btn_text: str = "Add Device"): # type: ignore
def __init__(self, parent=None, add_btn_text: str = "Add Device"): # type:ignore
super().__init__(parent)
# Track old device name if config is edited
self._old_device_name: str = ""
@@ -4,7 +4,7 @@ from __future__ import annotations
from enum import IntEnum
from functools import partial
from typing import TYPE_CHECKING, Any, List, Tuple
from typing import TYPE_CHECKING, List, Tuple
from bec_lib.logger import bec_logger
from bec_qthemes import apply_theme, material_icon
@@ -103,14 +103,16 @@ class CustomBusyWidget(QWidget):
button_width = int(button_height * aspect_ratio)
self.cancel_button.setFixedSize(button_width, button_height)
color = get_accent_colors()
self.cancel_button.setStyleSheet(f"""
self.cancel_button.setStyleSheet(
f"""
QPushButton {{
background-color: {color.emergency.name()};
color: white;
font-weight: 600;
border-radius: 6px;
}}
""")
"""
)
# Layout
content_layout = QVBoxLayout(self)
@@ -126,10 +128,12 @@ class CustomBusyWidget(QWidget):
bg_color = color._colors.get("BG", None)
if bg_color is None: # Fallback if missing
bg_color = QColor(50, 50, 50, 255)
self.setStyleSheet(f"""
self.setStyleSheet(
f"""
background-color: {bg_color.name()};
border-radius: 12px;
""")
"""
)
def _ui_scale(self) -> int:
parent = self.parent()
@@ -169,7 +173,7 @@ class DeviceManagerDisplayWidget(DockAreaWidget):
self._upload_redis_dialog: UploadRedisDialog | None = None
self._dialog_validation_connection: QMetaObject.Connection | None = None
# NOTE: We need here a separate config helper instance to avoid conflicts with
# NOTE: We need here a seperate config helper instance to avoid conflicts with
# other communications to REDIS as uploading a config through a CommunicationConfigAction
# will block if we use the config_helper from self.client.config._config_helper
self._config_helper = config_helper.ConfigHelper(self.client.connector)
@@ -607,8 +611,8 @@ class DeviceManagerDisplayWidget(DockAreaWidget):
self.device_table_view._is_config_in_sync_with_redis()
)
validation_results = self.device_table_view.get_validation_results()
for config, config_status, connection_status in validation_results.values():
if connection_status == ConnectionStatus.CONNECTED.value:
for config, config_status, connnection_status in validation_results.values():
if connnection_status == ConnectionStatus.CONNECTED.value:
self.device_table_view.update_device_validation(
config, config_status, ConnectionStatus.CAN_CONNECT, ""
)
@@ -1,12 +1,11 @@
"""Module for Device Manager View."""
from qtpy.QtCore import QRect
from qtpy.QtWidgets import QWidget
from bec_widgets.applications.views.device_manager_view.device_manager_widget import (
DeviceManagerWidget,
)
from bec_widgets.applications.views.view import ViewBase, ViewTourSteps
from bec_widgets.applications.views.view import ViewBase
from bec_widgets.utils.error_popups import SafeSlot
@@ -20,21 +19,11 @@ class DeviceManagerView(ViewBase):
parent: QWidget | None = None,
content: QWidget | None = None,
*,
view_id: str | None = None,
id: str | None = None,
title: str | None = None,
**kwargs,
):
super().__init__(
parent=parent,
content=content,
view_id=view_id,
title=title,
rpc_passthrough_children=False,
**kwargs,
)
self.device_manager_widget = DeviceManagerWidget(
parent=self, rpc_exposed=False, rpc_passthrough_children=False
)
super().__init__(parent=parent, content=content, id=id, title=title)
self.device_manager_widget = DeviceManagerWidget(parent=self)
self.set_content(self.device_manager_widget)
@SafeSlot()
@@ -45,110 +34,6 @@ class DeviceManagerView(ViewBase):
"""
self.device_manager_widget.on_enter()
def register_tour_steps(self, guided_tour, main_app) -> ViewTourSteps | None:
"""Register Device Manager components with the guided tour.
Args:
guided_tour: The GuidedTour instance to register with.
main_app: The main application instance (for accessing set_current).
Returns:
ViewTourSteps | None: Model containing view title and step IDs.
"""
step_ids = []
dm_widget = self.device_manager_widget
# The device_manager_widget is not yet initialized, so we will register
# tour steps for its uninitialized state.
# Register Load Current Config button
def get_load_current():
main_app.set_current("device_manager")
if dm_widget._initialized is True:
return (None, None)
return (dm_widget.button_load_current_config, None)
step_id = guided_tour.register_widget(
widget=get_load_current,
title="Load Current Config",
text="Load the current device configuration from the BEC server.",
)
step_ids.append(step_id)
# Register Load Config From File button
def get_load_file():
main_app.set_current("device_manager")
if dm_widget._initialized is True:
return (None, None)
return (dm_widget.button_load_config_from_file, None)
step_id = guided_tour.register_widget(
widget=get_load_file,
title="Load Config From File",
text="Load a device configuration from a YAML file on disk.",
)
step_ids.append(step_id)
## Register steps for the initialized state
# Register main device table
def get_device_table():
main_app.set_current("device_manager")
if dm_widget._initialized is False:
return (None, None)
return (dm_widget.device_manager_display.device_table_view, None)
step_id = guided_tour.register_widget(
widget=get_device_table,
title="Device Table",
text="This table displays the config that is prepared to be uploaded to the BEC server. It allows users to review and modify device config settings, and also validate them before uploading to the BEC server.",
)
step_ids.append(step_id)
col_text_mapping = {
0: "Shows if a device configuration is valid. Automatically validated when adding a new device.",
1: "Shows if a device is connectable. Validated on demand.",
2: "Device name, unique across all devices within a config.",
3: "Device class used to initialize the device on the BEC server.",
4: "Defines how BEC treats readings of the device during scans. The options are 'monitored', 'baseline', 'async', 'continuous' or 'on_demand'.",
5: "Defines how BEC reacts if a device readback fails. Options are 'raise', 'retry', or 'buffer'.",
6: "User-defined tags associated with the device.",
7: "A brief description of the device.",
8: "Device is enabled when the configuration is loaded.",
9: "Device is set to read-only.",
10: "This flag allows to configure if the 'trigger' method of the device is called during scans.",
}
# We have at least one device registered
def get_device_table_row(column: int):
main_app.set_current("device_manager")
if dm_widget._initialized is False:
return (None, None)
table = dm_widget.device_manager_display.device_table_view.table
header = table.horizontalHeader()
x = header.sectionViewportPosition(column)
table.horizontalScrollBar().setValue(x)
# Recompute after scrolling
x = header.sectionViewportPosition(column)
w = header.sectionSize(column)
h = header.height()
rect = QRect(x, 0, w, h)
top_left = header.viewport().mapTo(main_app, rect.topLeft())
return (QRect(top_left, rect.size()), col_text_mapping.get(column, ""))
for col, text in col_text_mapping.items():
step_id = guided_tour.register_widget(
widget=lambda col=col: get_device_table_row(col),
title=f"{dm_widget.device_manager_display.device_table_view.table.horizontalHeaderItem(col).text()}",
text=text,
)
step_ids.append(step_id)
if not step_ids:
return None
return ViewTourSteps(view_title="Device Manager", step_ids=step_ids)
if __name__ == "__main__": # pragma: no cover
import sys
@@ -180,7 +65,7 @@ if __name__ == "__main__": # pragma: no cover
_app.add_view(
icon="display_settings",
title="Device Manager",
view_id="device_manager",
id="device_manager",
widget=device_manager_view.device_manager_widget,
mini_text="DM",
)
@@ -22,8 +22,8 @@ class DeviceManagerWidget(BECWidget, QtWidgets.QWidget):
RPC = False
def __init__(self, parent=None, client=None, **kwargs):
super().__init__(parent=parent, client=client, **kwargs)
def __init__(self, parent=None, client=None):
super().__init__(parent=parent, client=client)
self.stacked_layout = QtWidgets.QStackedLayout()
self.stacked_layout.setContentsMargins(0, 0, 0, 0)
self.stacked_layout.setSpacing(0)
@@ -1,31 +0,0 @@
from qtpy.QtWidgets import QWidget
from bec_widgets.applications.views.view import ViewBase
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea
class DockAreaView(ViewBase):
"""
Modular dock area view for arranging and managing multiple dockable widgets.
"""
RPC_CONTENT_CLASS = BECDockArea
def __init__(
self,
parent: QWidget | None = None,
content: QWidget | None = None,
*,
view_id: str | None = None,
title: str | None = None,
**kwargs,
):
super().__init__(parent=parent, content=content, view_id=view_id, title=title, **kwargs)
self.dock_area = BECDockArea(
self,
profile_namespace="bec",
auto_profile_namespace=False,
object_name="DockArea",
rpc_exposed=False,
)
self.set_content(self.dock_area)
+52 -73
View File
@@ -1,8 +1,5 @@
from __future__ import annotations
from typing import List
from pydantic import BaseModel
from qtpy.QtCore import QEventLoop
from qtpy.QtWidgets import (
QDialog,
@@ -17,26 +14,14 @@ from qtpy.QtWidgets import (
QWidget,
)
from bec_widgets import BECWidget
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.toolbars.status_bar import StatusToolBar
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
from bec_widgets.widgets.control.device_input.signal_combobox.signal_combobox import SignalComboBox
from bec_widgets.widgets.plots.waveform.waveform import Waveform
class ViewTourSteps(BaseModel):
"""Model representing tour steps for a view.
Attributes:
view_title: The human-readable title of the view.
step_ids: List of registered step IDs in the order they should appear.
"""
view_title: str
step_ids: List[str]
class ViewBase(BECWidget, QWidget):
class ViewBase(QWidget):
"""Wrapper for a content widget used inside the main app's stacked view.
Subclasses can implement `on_enter` and `on_exit` to run custom logic when the view becomes visible or is about to be hidden.
@@ -44,43 +29,72 @@ class ViewBase(BECWidget, QWidget):
Args:
content (QWidget): The actual view widget to display.
parent (QWidget | None): Parent widget.
view_id (str | None): Optional view view_id, useful for debugging or introspection.
id (str | None): Optional view id, useful for debugging or introspection.
title (str | None): Optional human-readable title.
show_status (bool): Whether to show a status toolbar at the top of the view.
"""
RPC = True
PLUGIN = False
USER_ACCESS = ["activate"]
RPC_CONTENT_CLASS: type[QWidget] | None = None
RPC_CONTENT_ATTR = "content"
def __init__(
self,
parent: QWidget | None = None,
content: QWidget | None = None,
*,
view_id: str | None = None,
id: str | None = None,
title: str | None = None,
**kwargs,
show_status: bool = False,
status_names: list[str] | None = None,
):
super().__init__(parent=parent, **kwargs)
super().__init__(parent=parent)
self.content: QWidget | None = None
self.view_id = view_id
self.view_id = id
self.view_title = title
lay = QVBoxLayout(self)
lay.setContentsMargins(0, 0, 0, 0)
lay.setSpacing(0)
self.status_bar: StatusToolBar | None = None
if show_status:
# If explicit status names are provided, default to showing only those.
show_all = status_names is None
self.setup_status_bar(show_all_status=show_all, status_names=status_names)
if content is not None:
self.set_content(content)
def set_content(self, content: QWidget) -> None:
"""Replace the current content widget with a new one."""
if self.content is not None:
self.layout().removeWidget(self.content)
self.content.setParent(None)
self.content.close()
self.content.deleteLater()
self.content = content
self.layout().addWidget(content)
if self.status_bar is not None:
insert_at = self.layout().indexOf(self.status_bar) + 1
self.layout().insertWidget(insert_at, content)
else:
self.layout().addWidget(content)
def setup_status_bar(
self, *, show_all_status: bool = True, status_names: list[str] | None = None
) -> None:
"""Create and attach a status toolbar managed by the status broker."""
if self.status_bar is not None:
return
names_arg = None if show_all_status else status_names
self.status_bar = StatusToolBar(parent=self, names=names_arg)
self.layout().addWidget(self.status_bar)
def set_status(
self, name: str = "main", *, state=None, text: str | None = None, tooltip: str | None = None
) -> None:
"""Manually set a status item on the status bar."""
if self.status_bar is None:
self.setup_status_bar(show_all_status=True)
if self.status_bar is None:
return
self.status_bar.set_status(name=name, state=state, text=text, tooltip=tooltip)
@SafeSlot()
def on_enter(self) -> None:
@@ -99,41 +113,6 @@ class ViewBase(BECWidget, QWidget):
"""
return True
@SafeSlot()
def activate(self) -> None:
"""Switch the parent application to this view."""
if not self.view_id:
raise ValueError("Cannot switch view without a view_id.")
parent = self.parent()
while parent is not None:
if hasattr(parent, "set_current"):
parent.set_current(self.view_id)
return
parent = parent.parent()
raise RuntimeError("Could not find a parent application with set_current().")
def cleanup(self):
if self.content is not None:
self.content.close()
self.content.deleteLater()
super().cleanup()
def register_tour_steps(self, guided_tour, main_app) -> ViewTourSteps | None:
"""Register this view's components with the guided tour.
Args:
guided_tour: The GuidedTour instance to register with.
main_app: The main application instance (for accessing set_current).
Returns:
ViewTourSteps | None: A model containing the view title and step IDs,
or None if this view has no tour steps.
Override this method in subclasses to register view-specific components.
"""
return None
####################################################################################################
# Example views for demonstration/testing purposes
@@ -160,17 +139,17 @@ class WaveformViewPopup(ViewBase): # pragma: no cover
self.device_edit.insertItem(0, "")
self.device_edit.setEditable(True)
self.device_edit.setCurrentIndex(0)
self.signal_edit = SignalComboBox(parent=self)
self.signal_edit.include_config_signals = False
self.signal_edit.insertItem(0, "")
self.signal_edit.setEditable(True)
self.device_edit.currentTextChanged.connect(self.signal_edit.set_device)
self.device_edit.device_reset.connect(self.signal_edit.reset_selection)
self.entry_edit = SignalComboBox(parent=self)
self.entry_edit.include_config_signals = False
self.entry_edit.insertItem(0, "")
self.entry_edit.setEditable(True)
self.device_edit.currentTextChanged.connect(self.entry_edit.set_device)
self.device_edit.device_reset.connect(self.entry_edit.reset_selection)
form = QFormLayout()
form.addRow(label)
form.addRow("Device", self.device_edit)
form.addRow("Signal", self.signal_edit)
form.addRow("Signal", self.entry_edit)
buttons = QDialogButtonBox(QDialogButtonBox.Ok | QDialogButtonBox.Cancel, parent=dialog)
buttons.accepted.connect(dialog.accept)
@@ -182,7 +161,7 @@ class WaveformViewPopup(ViewBase): # pragma: no cover
if dialog.exec_() == QDialog.Accepted:
self.waveform.plot(
device_y=self.device_edit.currentText(), signal_y=self.signal_edit.currentText()
y_name=self.device_edit.currentText(), y_entry=self.entry_edit.currentText()
)
@SafeSlot()
@@ -307,7 +286,7 @@ class WaveformViewInline(ViewBase): # pragma: no cover
dev = self.device_edit.currentText()
sig = self.entry_edit.currentText()
if dev and sig:
self.waveform.plot(device_y=dev, signal_y=sig)
self.waveform.plot(y_name=dev, y_entry=sig)
self.stack.setCurrentIndex(1)
def _show_waveform_without_changes(self):
-1
View File
@@ -1 +0,0 @@
from bec_widgets.cli.rpc import rpc_base
+170 -587
View File
File diff suppressed because it is too large Load Diff
+37 -86
View File
@@ -10,9 +10,9 @@ import threading
import time
from contextlib import contextmanager
from threading import Lock
from typing import TYPE_CHECKING, Callable, Literal, TypeAlias, cast
from typing import TYPE_CHECKING, Literal, TypeAlias, cast
from bec_lib.endpoints import EndpointInfo, MessageEndpoints
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.utils.import_utils import lazy_import, lazy_import_from
from rich.console import Console
@@ -232,11 +232,6 @@ class BECGuiClient(RPCBase):
"""The launcher object."""
return RPCBase(gui_id=f"{self._gui_id}:launcher", parent=self, object_name="launcher")
def _safe_register_stream(self, endpoint: EndpointInfo, cb: Callable, **kwargs):
"""Check if already registered for registration in idempotent functions."""
if not self._client.connector.any_stream_is_registered(endpoint, cb=cb):
self._client.connector.register(endpoint, cb=cb, **kwargs)
def connect_to_gui_server(self, gui_id: str) -> None:
"""Connect to a GUI server"""
# Unregister the old callback
@@ -252,9 +247,10 @@ class BECGuiClient(RPCBase):
self._ipython_registry = {}
# Register the new callback
self._safe_register_stream(
self._client.connector.register(
MessageEndpoints.gui_registry_state(self._gui_id),
cb=self._handle_registry_update,
parent=self,
from_start=True,
)
@@ -301,32 +297,14 @@ class BECGuiClient(RPCBase):
return self._raise_all()
return self._start(wait=wait)
def change_theme(self, theme: Literal["light", "dark"] | None = None) -> None:
"""
Apply a GUI theme or toggle between dark and light.
Args:
theme(Literal["light", "dark"] | None): Theme to apply. If None, the current
theme is fetched from the GUI and toggled.
"""
if not self._check_if_server_is_alive():
self._start(wait=True)
with wait_for_server(self):
if theme is None:
current_theme = self.launcher._run_rpc("fetch_theme")
next_theme = "light" if current_theme == "dark" else "dark"
else:
next_theme = theme
self.launcher._run_rpc("change_theme", theme=next_theme)
def new(
self,
name: str | None = None,
wait: bool = True,
geometry: tuple[int, int, int, int] | None = None,
launch_script: str = "dock_area",
startup_profile: str | Literal["restore", "skip"] | None = None,
profile: str | None = None,
start_empty: bool = False,
**kwargs,
) -> client.AdvancedDockArea:
"""Create a new top-level dock area.
@@ -336,81 +314,48 @@ class BECGuiClient(RPCBase):
wait(bool, optional): Whether to wait for the server to start. Defaults to True.
geometry(tuple[int, int, int, int] | None): The geometry of the dock area (pos_x, pos_y, w, h).
launch_script(str): The launch script to use. Defaults to "dock_area".
startup_profile(str | Literal["restore", "skip"] | None): Startup mode for
the dock area:
- None: start in transient empty workspace
- "restore": restore last-used profile
- "skip": skip profile initialization
- "<name>": load the named profile
profile(str | None): The profile name to load. If None, loads the "general" profile.
Use a profile name to load a specific saved profile.
start_empty(bool): If True, start with an empty dock area when loading specified profile.
**kwargs: Additional keyword arguments passed to the dock area.
Returns:
client.AdvancedDockArea: The new dock area.
Examples:
>>> gui.new() # Start with an empty unsaved workspace
>>> gui.new(startup_profile="restore") # Restore last profile
>>> gui.new(startup_profile="my_profile") # Load explicit profile
"""
if "profile" in kwargs or "start_empty" in kwargs:
raise TypeError(
"gui.new() no longer accepts 'profile' or 'start_empty'. Use 'startup_profile' instead."
)
Note:
The "general" profile is mandatory and will always exist. If manually deleted,
it will be automatically recreated.
Examples:
>>> gui.new() # Start with the "general" profile
>>> gui.new(profile="my_profile") # Load specific profile, if profile does not exist, the new profile is created empty with specified name
>>> gui.new(start_empty=True) # Start with "general" profile but empty dock area
>>> gui.new(profile="my_profile", start_empty=True) # Start with "my_profile" profile but empty dock area
"""
if not self._check_if_server_is_alive():
self.start(wait=True)
if wait:
with wait_for_server(self):
return self._new_impl(
name=name,
geometry=geometry,
widget = self.launcher._run_rpc(
"launch",
launch_script=launch_script,
startup_profile=startup_profile,
**kwargs,
)
return self._new_impl(
name=name,
geometry=geometry,
launch_script=launch_script,
startup_profile=startup_profile,
**kwargs,
)
def _new_impl(
self,
*,
name: str | None,
geometry: tuple[int, int, int, int] | None,
launch_script: str,
startup_profile: str | Literal["restore", "skip"] | None,
**kwargs,
):
if launch_script == "dock_area":
try:
return self.launcher._run_rpc(
"system.launch_dock_area",
name=name,
geometry=geometry,
startup_profile=startup_profile,
profile=profile,
start_empty=start_empty,
**kwargs,
)
except ValueError as exc:
error = str(exc)
if (
"Unknown system RPC method: system.launch_dock_area" not in error
and "has no attribute 'system.launch_dock_area'" not in error
):
raise
logger.debug("Server does not support system.launch_dock_area; using launcher RPC")
return self.launcher._run_rpc(
) # pylint: disable=protected-access
return widget
widget = self.launcher._run_rpc(
"launch",
launch_script=launch_script,
name=name,
geometry=geometry,
startup_profile=startup_profile,
profile=profile,
start_empty=start_empty,
**kwargs,
) # pylint: disable=protected-access
return widget
def delete(self, name: str) -> None:
"""Delete a dock area and its parent window.
@@ -535,14 +480,20 @@ class BECGuiClient(RPCBase):
def _start(self, wait: bool = False) -> None:
self._killed = False
self._safe_register_stream(
MessageEndpoints.gui_registry_state(self._gui_id), cb=self._handle_registry_update
self._client.connector.register(
MessageEndpoints.gui_registry_state(self._gui_id),
cb=self._handle_registry_update,
parent=self,
)
return self._start_server(wait=wait)
def _handle_registry_update(self, msg: dict[str, GUIRegistryStateMessage]) -> None:
@staticmethod
def _handle_registry_update(
msg: dict[str, GUIRegistryStateMessage], parent: BECGuiClient
) -> None:
# This was causing a deadlock during shutdown, not sure why.
# with self._lock:
self = parent
self._server_registry = cast(dict[str, RegistryState], msg["data"].state)
self._update_dynamic_namespace(self._server_registry)
@@ -7,7 +7,6 @@ import inspect
import os
import sys
from pathlib import Path
from typing import get_overloads
import black
import isort
@@ -19,6 +18,20 @@ from bec_widgets.utils.plugin_utils import BECClassContainer, get_custom_classes
logger = bec_logger.logger
if sys.version_info >= (3, 11):
from typing import get_overloads
else:
print(
"Python version is less than 3.11, using dummy function for get_overloads. "
"If you want to use the real function 'typing.get_overloads()', please use Python 3.11 or later."
)
def get_overloads(_obj):
"""
Dummy function for Python versions before 3.11.
"""
return []
class ClientGenerator:
def __init__(self, base=False):
@@ -41,7 +54,7 @@ from __future__ import annotations
from bec_lib.logger import bec_logger
from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call, rpc_timeout
{"from bec_widgets.utils.bec_plugin_helper import get_plugin_client_module" if self._base else ""}
{"from bec_widgets.utils.bec_plugin_helper import get_all_plugin_widgets, get_plugin_client_module" if self._base else ""}
logger = bec_logger.logger
@@ -81,7 +94,7 @@ logger = bec_logger.logger
if self._base:
self.content += """
class _WidgetsEnumType(str, enum.Enum):
\"\"\" Enum for the available widgets, to be generated programmatically \"\"\"
\"\"\" Enum for the available widgets, to be generated programatically \"\"\"
...
"""
@@ -98,19 +111,27 @@ _Widgets = {
self.content += """
try:
_plugin_widgets = get_all_plugin_widgets().as_dict()
plugin_client = get_plugin_client_module()
Widgets = _WidgetsEnumType("Widgets", {name: name for name in _plugin_widgets} | _Widgets)
if (_overlap := _Widgets.keys() & _plugin_widgets.keys()) != set():
for _widget in _overlap:
logger.warning(f"Detected duplicate widget {_widget} in plugin repo file: {inspect.getfile(_plugin_widgets[_widget])} !")
for plugin_name, plugin_class in inspect.getmembers(plugin_client, inspect.isclass):
if issubclass(plugin_class, RPCBase) and plugin_class is not RPCBase:
if plugin_name not in _Widgets:
_Widgets[plugin_name] = plugin_name
if plugin_name in globals():
conflicting_file = (
inspect.getfile(_plugin_widgets[plugin_name])
if plugin_name in _plugin_widgets
else f"{plugin_client}"
)
logger.warning(
f"Plugin widget {plugin_name} in {plugin_class._IMPORT_MODULE} conflicts with a built-in class!"
f"Plugin widget {plugin_name} from {conflicting_file} conflicts with a built-in class!"
)
continue
else:
if plugin_name not in _overlap:
globals()[plugin_name] = plugin_class
Widgets = _WidgetsEnumType("Widgets", _Widgets)
except ImportError as e:
logger.error(f"Failed loading plugins: \\n{reduce(add, traceback.format_exception(e))}")
"""
@@ -125,8 +146,12 @@ except ImportError as e:
class_name = cls.__name__
self.content += f"""
class {class_name}(RPCBase):\n"""
if class_name == "BECDockArea":
self.content += f"""
class {class_name}(RPCBase):"""
else:
self.content += f"""
class {class_name}(RPCBase):"""
if cls.__doc__:
# We only want the first line of the docstring
@@ -137,11 +162,19 @@ class {class_name}(RPCBase):\n"""
else:
class_docs = cls.__doc__.split("\n")[1]
self.content += f"""
\"\"\"{class_docs}\"\"\"\n"""
user_access_entries = self._get_user_access_entries(cls)
self.content += f' _IMPORT_MODULE="{cls.__module__}"\n'
for method_entry in user_access_entries:
method, obj, is_property_setter = self._resolve_method_object(cls, method_entry)
\"\"\"{class_docs}\"\"\"
"""
if not cls.USER_ACCESS:
self.content += """...
"""
for method in cls.USER_ACCESS:
is_property_setter = False
obj = getattr(cls, method, None)
if obj is None:
obj = getattr(cls, method.split(".setter")[0], None)
is_property_setter = True
method = method.split(".setter")[0]
if obj is None:
raise AttributeError(
f"Method {method} not found in class {cls.__name__}. "
@@ -183,34 +216,6 @@ class {class_name}(RPCBase):\n"""
{doc}
\"\"\""""
@staticmethod
def _get_user_access_entries(cls) -> list[str]:
entries = list(getattr(cls, "USER_ACCESS", []))
content_cls = getattr(cls, "RPC_CONTENT_CLASS", None)
if content_cls is not None:
entries.extend(getattr(content_cls, "USER_ACCESS", []))
return list(dict.fromkeys(entries))
@staticmethod
def _resolve_method_object(cls, method_entry: str):
method_name = method_entry
is_property_setter = False
if method_entry.endswith(".setter"):
is_property_setter = True
method_name = method_entry.split(".setter")[0]
candidate_classes = [cls]
content_cls = getattr(cls, "RPC_CONTENT_CLASS", None)
if content_cls is not None:
candidate_classes.append(content_cls)
for candidate_cls in candidate_classes:
obj = getattr(candidate_cls, method_name, None)
if obj is not None:
return method_name, obj, is_property_setter
return method_name, None, is_property_setter
def _rpc_call(self, timeout_info: dict[str, float | None]):
"""
Decorator to mark a method as an RPC call.
@@ -286,8 +291,7 @@ def main():
client_path = module_dir / client_subdir / "client.py"
packages = ("widgets", "applications") if module_name == "bec_widgets" else ("widgets",)
rpc_classes = get_custom_classes(module_name, packages=packages)
rpc_classes = get_custom_classes(module_name)
logger.info(f"Obtained classes with RPC objects: {rpc_classes!r}")
generator = ClientGenerator(base=module_name == "bec_widgets")
+7 -9
View File
@@ -248,7 +248,9 @@ class RPCBase:
self._rpc_response = None
self._msg_wait_event.clear()
self._client.connector.register(
MessageEndpoints.gui_instruction_response(request_id), cb=self._on_rpc_response
MessageEndpoints.gui_instruction_response(request_id),
cb=self._on_rpc_response,
parent=self,
)
self._client.connector.set_and_publish(MessageEndpoints.gui_instructions(receiver), rpc_msg)
@@ -274,10 +276,11 @@ class RPCBase:
self._rpc_response = None
return self._create_widget_from_msg_result(msg_result)
def _on_rpc_response(self, msg_obj: MessageObject) -> None:
@staticmethod
def _on_rpc_response(msg_obj: MessageObject, parent: RPCBase) -> None:
msg = cast(messages.RequestResponseMessage, msg_obj.value)
self._rpc_response = msg
self._msg_wait_event.set()
parent._rpc_response = msg
parent._msg_wait_event.set()
def _create_widget_from_msg_result(self, msg_result):
if msg_result is None:
@@ -289,11 +292,6 @@ class RPCBase:
return {
key: self._create_widget_from_msg_result(val) for key, val in msg_result.items()
}
rpc_enabled = msg_result.get("__rpc__", True)
if rpc_enabled is False:
return None
msg_result = dict(msg_result)
cls = msg_result.pop("widget_class", None)
msg_result.pop("__rpc__", None)
@@ -32,8 +32,7 @@ class RPCWidgetHandler:
None
"""
self._widget_classes = (
get_custom_classes("bec_widgets", packages=("widgets", "applications"))
+ get_all_plugin_widgets()
get_custom_classes("bec_widgets") + get_all_plugin_widgets()
).as_dict(IGNORE_WIDGETS)
def create_widget(self, widget_type, **kwargs) -> BECWidget:
@@ -8,7 +8,6 @@ import sys
from contextlib import redirect_stderr, redirect_stdout
import darkdetect
import shiboken6
from bec_lib.logger import bec_logger
from bec_lib.service_config import ServiceConfig
from bec_qthemes import apply_theme
@@ -19,8 +18,8 @@ from qtpy.QtWidgets import QApplication
import bec_widgets
from bec_widgets.applications.launch_window import LaunchWindow
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.utils.bec_dispatcher import BECDispatcher
from bec_widgets.utils.rpc_register import RPCRegister
logger = bec_logger.logger
@@ -94,7 +93,6 @@ class GUIServer:
"""
Run the GUI server.
"""
logger.info("Starting GUIServer", repr(self))
self.app = QApplication(sys.argv)
if darkdetect.isDark():
apply_theme("dark")
@@ -103,11 +101,11 @@ class GUIServer:
self.app.setApplicationName("BEC")
self.app.gui_id = self.gui_id # type: ignore
self.app.gui_server = self # type: ignore # make server accessible from QApplication for getattr in widgets
self.setup_bec_icon()
service_config = self._get_service_config()
self.dispatcher = BECDispatcher(config=service_config, gui_id=self.gui_id)
# self.dispatcher.start_cli_server(gui_id=self.gui_id)
if self.gui_class:
self.launcher_window = LaunchWindow(
@@ -120,7 +118,7 @@ class GUIServer:
self.launcher_window.setAttribute(Qt.WA_ShowWithoutActivating) # type: ignore
self.app.aboutToQuit.connect(self.shutdown)
self.app.setQuitOnLastWindowClosed(True)
self.app.setQuitOnLastWindowClosed(False)
def sigint_handler(*args):
# display message, for people to let it terminate gracefully
@@ -129,7 +127,8 @@ class GUIServer:
with RPCRegister.delayed_broadcast():
for widget in QApplication.instance().topLevelWidgets(): # type: ignore
widget.close()
self.shutdown()
if self.app:
self.app.quit()
signal.signal(signal.SIGINT, sigint_handler)
signal.signal(signal.SIGTERM, sigint_handler)
@@ -150,10 +149,9 @@ class GUIServer:
self.app.setWindowIcon(icon)
def shutdown(self):
logger.info("Shutdown GUIServer", repr(self))
if self.launcher_window and shiboken6.isValid(self.launcher_window):
self.launcher_window.close()
self.launcher_window.deleteLater()
"""
Shutdown the GUI server.
"""
if pylsp_server.is_running():
pylsp_server.stop()
if self.dispatcher:
@@ -25,8 +25,8 @@ from qtpy.QtWidgets import (
QWidget,
)
from bec_widgets.cli.rpc.rpc_widget_handler import widget_handler
from bec_widgets.utils.colors import apply_theme
from bec_widgets.utils.rpc_widget_handler import widget_handler
from bec_widgets.utils.widget_io import WidgetHierarchy as wh
from bec_widgets.widgets.editors.jupyter_console.jupyter_console import BECJupyterConsole
+1 -8
View File
@@ -1,7 +1,6 @@
# pylint: skip-file
from unittest.mock import MagicMock
from bec_lib.config_helper import ConfigHelper
from bec_lib.device import Device as BECDevice
from bec_lib.device import Positioner as BECPositioner
from bec_lib.device import ReadoutPriority
@@ -220,9 +219,7 @@ class Device(FakeDevice):
class DMMock:
def __init__(self, *args, **kwargs):
self._service = args[0]
self.config_helper = ConfigHelper(self._service.connector, self._service._service_name)
def __init__(self):
self.devices = DeviceContainer()
self.enabled_devices = [device for device in self.devices if device.enabled]
@@ -276,10 +273,6 @@ class DMMock:
configs.append(device._config)
return configs
def initialize(*_): ...
def shutdown(self): ...
DEVICES = [
FakePositioner("samx", limits=[-10, 10], read_value=2.0),
+12
View File
@@ -1 +1,13 @@
from qtpy.QtWebEngineWidgets import QWebEngineView
from .bec_connector import BECConnector, ConnectionConfig
from .bec_dispatcher import BECDispatcher
from .bec_table import BECTable
from .colors import Colors
from .container_utils import WidgetContainerUtils
from .crosshair import Crosshair
from .entry_validator import EntryValidator
from .layout_manager import GridLayoutManager
from .rpc_decorator import register_rpc_methods, rpc_public
from .ui_loader import UILoader
from .validator_delegate import DoubleValidationDelegate
+11 -59
View File
@@ -12,17 +12,18 @@ import shiboken6 as shb
from bec_lib.logger import bec_logger
from bec_lib.utils.import_utils import lazy_import_from
from pydantic import BaseModel, Field, field_validator
from qtpy.QtCore import Property, QObject, QRunnable, QThreadPool, Signal
from qtpy.QtCore import Property, QObject, QRunnable, QThreadPool, QTimer, Signal
from qtpy.QtWidgets import QApplication
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.utils.error_popups import ErrorPopupUtility, SafeSlot
from bec_widgets.utils.name_utils import sanitize_namespace
from bec_widgets.utils.rpc_register import RPCRegister
from bec_widgets.utils.widget_io import WidgetHierarchy
from bec_widgets.utils.yaml_dialog import load_yaml, load_yaml_gui, save_yaml, save_yaml_gui
if TYPE_CHECKING: # pragma: no cover
from bec_widgets.utils.bec_dispatcher import BECDispatcher
from bec_widgets.widgets.containers.dock import BECDock
else:
BECDispatcher = lazy_import_from("bec_widgets.utils.bec_dispatcher", ("BECDispatcher",))
@@ -88,8 +89,6 @@ class BECConnector:
gui_id: str | None = None,
object_name: str | None = None,
root_widget: bool = False,
rpc_exposed: bool = True,
rpc_passthrough_children: bool = True,
**kwargs,
):
"""
@@ -101,10 +100,6 @@ class BECConnector:
gui_id(str, optional): The GUI ID.
object_name(str, optional): The object name.
root_widget(bool, optional): If set to True, the parent_id will be always set to None, thus enforcing that the widget is accessible as a root widget of the BECGuiClient object.
rpc_exposed(bool, optional): If set to False, this instance is excluded from RPC registry broadcast and CLI namespace discovery.
rpc_passthrough_children(bool, optional): Only relevant when ``rpc_exposed=False``.
If True, RPC-visible children rebind to the next visible ancestor.
If False (default), children stay hidden behind this widget.
**kwargs:
"""
# Extract object_name from kwargs to not pass it to Qt class
@@ -133,13 +128,8 @@ class BECConnector:
# the function depends on BECClient, and BECDispatcher
@SafeSlot()
def terminate(client=self.client, dispatcher=self.bec_dispatcher):
app = QApplication.instance()
gui_server = getattr(app, "gui_server", None)
if gui_server and hasattr(gui_server, "shutdown"):
gui_server.shutdown()
logger.info("Disconnecting", repr(dispatcher))
dispatcher.disconnect_all()
dispatcher.stop_cli_server()
try: # shutdown ophyd threads if any
from ophyd._pyepics_shim import _dispatcher
@@ -167,7 +157,7 @@ class BECConnector:
)
self.config = ConnectionConfig(widget_class=self.__class__.__name__)
# If the gui_id is passed, it should be respected. However, this should be revisited since
# If the gui_id is passed, it should be respected. However, this should be revisted since
# the gui_id has to be unique, and may no longer be.
if gui_id:
self.config.gui_id = gui_id
@@ -195,11 +185,6 @@ class BECConnector:
# If set to True, the parent_id will be always set to None, thus enforcing that the widget is accessible as a root widget of the BECGuiClient object.
self.root_widget = root_widget
# If set to False, this instance is not exposed through RPC at all.
self.rpc_exposed = bool(rpc_exposed)
# If True on a hidden parent (rpc_exposed=False), children can bubble up to
# the next visible RPC ancestor.
self.rpc_passthrough_children = bool(rpc_passthrough_children)
self._update_object_name()
@@ -208,41 +193,11 @@ class BECConnector:
try:
if self.root_widget:
return None
connector_parent = self._get_rpc_parent_ancestor()
connector_parent = WidgetHierarchy._get_becwidget_ancestor(self)
return connector_parent.gui_id if connector_parent else None
except:
logger.error(f"Error getting parent_id for {self.__class__.__name__}")
def _get_rpc_parent_ancestor(self) -> BECConnector | None:
"""
Find the nearest ancestor that is RPC-addressable.
Rules:
- If an ancestor has ``rpc_exposed=False``, it is an explicit visibility
boundary unless ``rpc_passthrough_children=True``.
- If an ancestor has ``RPC=False`` (but remains rpc_exposed), it is treated
as structural and children continue to the next ancestor.
- Lookup always happens through ``WidgetHierarchy.get_becwidget_ancestor``
so plain ``QWidget`` nodes between connectors are ignored.
"""
current = self
while True:
parent = WidgetHierarchy.get_becwidget_ancestor(current)
if parent is None:
return None
if not getattr(parent, "rpc_exposed", True):
if getattr(parent, "rpc_passthrough_children", False):
current = parent
continue
return parent
if getattr(parent, "RPC", True):
return parent
current = parent
return None
def change_object_name(self, name: str) -> None:
"""
Change the object name of the widget. Unregister old name and register the new one.
@@ -261,9 +216,8 @@ class BECConnector:
"""
# 1) Enforce unique objectName among siblings with the same BECConnector parent
self._enforce_unique_sibling_name()
# 2) Register the object for RPC unless instance-level exposure is disabled.
if getattr(self, "rpc_exposed", True):
self.rpc_register.add_rpc(self)
# 2) Register the object for RPC
self.rpc_register.add_rpc(self)
try:
self.name_established.emit(self.object_name)
except RuntimeError as e:
@@ -281,7 +235,7 @@ class BECConnector:
if not shb.isValid(self):
return
parent_bec = WidgetHierarchy.get_becwidget_ancestor(self)
parent_bec = WidgetHierarchy._get_becwidget_ancestor(self)
if parent_bec:
# We have a parent => only compare with siblings under that parent
@@ -291,7 +245,7 @@ class BECConnector:
# Use RPCRegister to avoid QApplication.allWidgets() during event processing.
connections = self.rpc_register.list_all_connections().values()
all_bec = [w for w in connections if isinstance(w, BECConnector) and shb.isValid(w)]
siblings = [w for w in all_bec if WidgetHierarchy.get_becwidget_ancestor(w) is None]
siblings = [w for w in all_bec if WidgetHierarchy._get_becwidget_ancestor(w) is None]
# Collect used names among siblings
used_names = {sib.objectName() for sib in siblings if sib is not self}
@@ -319,8 +273,6 @@ class BECConnector:
Args:
name (str): The new object name.
"""
# sanitize before setting to avoid issues with Qt object names and RPC namespaces
name = sanitize_namespace(name)
super().setObjectName(name)
self.object_name = name
if self.rpc_register.object_is_registered(self):
@@ -399,7 +351,7 @@ class BECConnector:
"""
self.config = config
# FIXME some thoughts are required to decide how this should work with rpc registry
# FIXME some thoughts are required to decide how thhis should work with rpc registry
def apply_config(self, config: dict, generate_new_id: bool = True) -> None:
"""
Apply the configuration to the widget.
@@ -417,7 +369,7 @@ class BECConnector:
else:
self.gui_id = self.config.gui_id
# FIXME some thoughts are required to decide how this should work with rpc registry
# FIXME some thoughts are required to decide how thhis should work with rpc registry
def load_config(self, path: str | None = None, gui: bool = False):
"""
Load the configuration of the widget from YAML.
+12 -14
View File
@@ -123,16 +123,17 @@ class BECDispatcher:
self._registered_slots: DefaultDict[Hashable, QtThreadSafeCallback] = (
collections.defaultdict()
)
self.client = client
if client is None:
if config is not None and not isinstance(config, ServiceConfig):
# config is supposed to be a path
config = ServiceConfig(config)
if self.client is None:
if config is not None:
if not isinstance(config, ServiceConfig):
# config is supposed to be a path
config = ServiceConfig(config)
self.client = BECClient(
config=config, connector_cls=QtRedisConnector, name="BECWidgets"
)
else:
self.client = client
if self.client.started:
# have to reinitialize client to use proper connector
logger.info("Shutting down BECClient to switch to QtRedisConnector")
@@ -175,15 +176,12 @@ class BECDispatcher:
cb_info (dict | None): A dictionary containing information about the callback. Defaults to None.
"""
qt_slot = QtThreadSafeCallback(cb=slot, cb_info=cb_info)
if not self.client.connector.any_stream_is_registered(topics, qt_slot):
if qt_slot not in self._registered_slots:
self._registered_slots[qt_slot] = qt_slot
qt_slot = self._registered_slots[qt_slot]
self.client.connector.register(topics, cb=qt_slot, **kwargs)
topics_str, _ = self.client.connector._convert_endpointinfo(topics)
qt_slot.topics.update(set(topics_str))
else:
logger.warning(f"Attempted to create duplicate stream subscription for {topics=}")
if qt_slot not in self._registered_slots:
self._registered_slots[qt_slot] = qt_slot
qt_slot = self._registered_slots[qt_slot]
self.client.connector.register(topics, cb=qt_slot, **kwargs)
topics_str, _ = self.client.connector._convert_endpointinfo(topics)
qt_slot.topics.update(set(topics_str))
def disconnect_slot(
self, slot: Callable, topics: EndpointInfo | str | list[EndpointInfo] | list[str]
-87
View File
@@ -1,87 +0,0 @@
"""
Login dialog for user authentication.
The Login Widget is styled in a Material Design style and emits
the entered credentials through a signal for further processing.
"""
from qtpy.QtCore import Qt, Signal
from qtpy.QtWidgets import QLabel, QLineEdit, QPushButton, QVBoxLayout, QWidget
class BECLogin(QWidget):
"""Login dialog for user authentication in Material Design style."""
credentials_entered = Signal(str, str)
def __init__(self, parent=None):
super().__init__(parent=parent)
# Only displayed if this widget as standalone widget, and not embedded in another widget
self.setWindowTitle("Login")
title = QLabel("Sign in", parent=self)
title.setAlignment(Qt.AlignmentFlag.AlignCenter)
title.setStyleSheet("""
#QLabel
{
font-size: 18px;
font-weight: 600;
}
""")
self.username = QLineEdit(parent=self)
self.username.setPlaceholderText("Username")
self.password = QLineEdit(parent=self)
self.password.setPlaceholderText("Password")
self.password.setEchoMode(QLineEdit.EchoMode.Password)
self.ok_btn = QPushButton("Sign in", parent=self)
self.ok_btn.setDefault(True)
self.ok_btn.clicked.connect(self._emit_credentials)
# If the user presses Enter in the password field, trigger the OK button click
self.password.returnPressed.connect(self.ok_btn.click)
# Build Layout
layout = QVBoxLayout(self)
layout.setContentsMargins(32, 32, 32, 32)
layout.setSpacing(16)
layout.addWidget(title)
layout.addSpacing(8)
layout.addWidget(self.username)
layout.addWidget(self.password)
layout.addSpacing(12)
layout.addWidget(self.ok_btn)
self.username.setFocus()
self.setStyleSheet("""
QLineEdit {
padding: 8px;
}
""")
def _clear_password(self):
"""Clear the password field."""
self.password.clear()
def _emit_credentials(self):
"""Emit credentials and clear the password field."""
self.credentials_entered.emit(self.username.text().strip(), self.password.text())
self._clear_password()
if __name__ == "__main__": # pragma: no cover
import sys
from bec_qthemes import apply_theme
from qtpy.QtWidgets import QApplication
app = QApplication(sys.argv)
apply_theme("light")
dialog = BECLogin()
dialog.credentials_entered.connect(lambda u, p: print(f"Username: {u}, Password: {p}"))
dialog.show()
sys.exit(app.exec_())
+1 -1
View File
@@ -10,11 +10,11 @@ from qtpy.QtGui import QFont, QPixmap
from qtpy.QtWidgets import QApplication, QFileDialog, QLabel, QVBoxLayout, QWidget
import bec_widgets.widgets.containers.qt_ads as QtAds
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.utils.bec_connector import BECConnector, ConnectionConfig
from bec_widgets.utils.busy_loader import install_busy_loader
from bec_widgets.utils.error_popups import SafeConnect, SafeSlot
from bec_widgets.utils.rpc_decorator import rpc_timeout
from bec_widgets.utils.rpc_register import RPCRegister
from bec_widgets.utils.widget_io import WidgetHierarchy
from bec_widgets.widgets.utility.spinner.spinner import SpinnerWidget
+1 -1
View File
@@ -43,7 +43,7 @@ class WidgetContainerUtils:
if list_of_names is None:
list_of_names = []
ii = 0
while ii < 1000: # 1000 is arbitrary!
while ii < 1000: # 1000 is arbritrary!
name_candidate = f"{name}_{ii}"
if name_candidate not in list_of_names:
return name_candidate
+1 -3
View File
@@ -106,9 +106,7 @@ class TypedForm(BECWidget, QWidget):
def _add_griditem(self, item: FormItemSpec, row: int):
grid = self._form_grid.layout()
# Use title from FieldInfo if available, otherwise use the property name
label_text = item.info.title if item.info.title else item.name
label = QLabel(parent=self._form_grid, text=label_text)
label = QLabel(parent=self._form_grid, text=item.name)
label.setProperty("_model_field_name", item.name)
label.setToolTip(item.info.description or item.name)
grid.addWidget(label, row, 0)
+2 -4
View File
@@ -71,7 +71,7 @@ class FormItemSpec(BaseModel):
"""
The specification for an item in a dynamically generated form. Uses a pydantic FieldInfo
to store most annotation info, since one of the main purposes is to store data for
forms generated from pydantic models, but can also be composed from other sources or by hand.
forms genrated from pydantic models, but can also be composed from other sources or by hand.
"""
model_config = ConfigDict(arbitrary_types_allowed=True)
@@ -192,7 +192,7 @@ class DynamicFormItem(QWidget):
@abstractmethod
def _add_main_widget(self) -> None:
self._main_widget: QWidget
"""Add the main data entry widget to self._main_widget and apply any
"""Add the main data entry widget to self._main_widget and appply any
constraints from the field info"""
@SafeSlot()
@@ -231,8 +231,6 @@ class StrFormItem(DynamicFormItem):
def __init__(self, parent: QWidget | None = None, *, spec: FormItemSpec) -> None:
super().__init__(parent=parent, spec=spec)
self._main_widget.textChanged.connect(self._value_changed)
if spec.info.description:
self._main_widget.setPlaceholderText(spec.info.description)
def _add_main_widget(self) -> None:
self._main_widget = QLineEdit()
-35
View File
@@ -1,35 +0,0 @@
"""Module providing fuzzy search utilities for the BEC widgets."""
from __future__ import annotations
from typing import Any
from thefuzz import fuzz
FUZZY_SEARCH_THRESHOLD = 80
def is_match(
text: str, row_data: dict[str, Any], relevant_keys: list[str], enable_fuzzy: bool
) -> bool:
"""
Check if the text matches any of the relevant keys in the row data.
Args:
text (str): The text to search for.
row_data (dict[str, Any]): The row data to search in.
relevant_keys (list[str]): The keys to consider for searching.
enable_fuzzy (bool): Whether to use fuzzy matching.
Returns:
bool: True if a match is found, False otherwise.
"""
for key in relevant_keys:
data = str(row_data.get(key, "") or "")
if enable_fuzzy:
match_ratio = fuzz.partial_ratio(text.lower(), data.lower())
if match_ratio >= FUZZY_SEARCH_THRESHOLD:
return True
else:
if text.lower() in data.lower():
return True
return False
+31 -119
View File
@@ -4,7 +4,7 @@ from __future__ import annotations
import sys
import weakref
from typing import Callable, Dict, List, Literal, TypedDict
from typing import Callable, Dict, List, TypedDict
from uuid import uuid4
import louie
@@ -12,18 +12,15 @@ from bec_lib.logger import bec_logger
from bec_qthemes import material_icon
from louie import saferef
from qtpy.QtCore import QEvent, QObject, QRect, Qt, Signal
from qtpy.QtGui import QAction, QColor, QKeySequence, QPainter, QPen, QShortcut
from qtpy.QtGui import QAction, QColor, QPainter, QPen
from qtpy.QtWidgets import (
QAbstractItemView,
QApplication,
QFrame,
QHBoxLayout,
QLabel,
QMainWindow,
QMenu,
QMenuBar,
QPushButton,
QTableWidgetItem,
QToolBar,
QVBoxLayout,
QWidget,
@@ -43,9 +40,9 @@ class TourStep(TypedDict):
widget_ref: (
louie.saferef.BoundMethodWeakref
| weakref.ReferenceType[
QWidget | QAction | QRect | Callable[[], tuple[QWidget | QAction | QRect, str | None]]
QWidget | QAction | Callable[[], tuple[QWidget | QAction, str | None]]
]
| Callable[[], tuple[QWidget | QAction | QRect, str | None]]
| Callable[[], tuple[QWidget | QAction, str | None]]
| None
)
text: str
@@ -67,13 +64,15 @@ class TutorialOverlay(QWidget):
box = QFrame(self)
app = QApplication.instance()
bg_color = app.palette().window().color()
box.setStyleSheet(f"""
box.setStyleSheet(
f"""
QFrame {{
background-color: {bg_color.name()};
border-radius: 8px;
padding: 8px;
}}
""")
"""
)
layout = QVBoxLayout(box)
# Top layout with close button (left) and step indicator (right)
@@ -104,12 +103,10 @@ class TutorialOverlay(QWidget):
# Back button with material icon
self.back_btn = QPushButton("Back")
self.back_btn.setIcon(material_icon("arrow_back"))
self.back_btn.setToolTip("Press Backspace to go back")
# Next button with material icon
self.next_btn = QPushButton("Next")
self.next_btn.setIcon(material_icon("arrow_forward"))
self.next_btn.setToolTip("Press Enter to continue")
btn_layout.addStretch()
btn_layout.addWidget(self.back_btn)
@@ -118,15 +115,6 @@ class TutorialOverlay(QWidget):
layout.addLayout(top_layout)
layout.addWidget(self.label)
layout.addLayout(btn_layout)
# Escape closes the tour
QShortcut(QKeySequence(Qt.Key.Key_Escape), self, activated=self.close_btn.click)
# Enter and Return activates the next button
QShortcut(QKeySequence(Qt.Key.Key_Return), self, activated=self.next_btn.click)
QShortcut(QKeySequence(Qt.Key.Key_Enter), self, activated=self.next_btn.click)
# Map Backspace to the back button
QShortcut(QKeySequence(Qt.Key.Key_Backspace), self, activated=self.back_btn.click)
return box
def paintEvent(self, event): # pylint: disable=unused-argument
@@ -235,9 +223,6 @@ class TutorialOverlay(QWidget):
self.message_box.show()
self.update()
# Update the focus policy of the buttons
self.back_btn.setEnabled(current_step > 1)
def eventFilter(self, obj, event):
if event.type() == QEvent.Type.Resize:
self.setGeometry(obj.rect())
@@ -277,9 +262,7 @@ class GuidedTour(QObject):
def register_widget(
self,
*,
widget: (
QWidget | QAction | QRect | Callable[[], tuple[QWidget | QAction | QRect, str | None]]
),
widget: QWidget | QAction | Callable[[], tuple[QWidget | QAction, str | None]],
text: str = "",
title: str = "",
) -> str:
@@ -287,7 +270,7 @@ class GuidedTour(QObject):
Register a widget with help text for tours.
Args:
widget (QWidget | QAction | QRect | Callable[[], tuple[QWidget | QAction | QRect, str | None]]): The target widget or a callable that returns the widget and its help text.
widget (QWidget | QAction | Callable[[], tuple[QWidget | QAction, str | None]]): The target widget or a callable that returns the widget and its help text.
text (str): The help text for the widget. This will be shown during the tour.
title (str, optional): A title for the widget (defaults to its class name or action text).
@@ -310,9 +293,6 @@ class GuidedTour(QObject):
widget_ref = _resolve_toolbar_button
default_title = getattr(widget, "tooltip", "Toolbar Menu")
elif isinstance(widget, QRect):
widget_ref = widget
default_title = "Area"
else:
widget_ref = saferef.safe_ref(widget)
default_title = widget.__class__.__name__ if hasattr(widget, "__class__") else "Widget"
@@ -347,14 +327,11 @@ class GuidedTour(QObject):
if mb and mb not in menubars:
menubars.append(mb)
menubars += [mb for mb in mw.findChildren(QMenuBar) if mb not in menubars]
menubars += [mb for mb in mw.findChildren(QMenu) if mb not in menubars]
for mb in menubars:
if action in mb.actions():
ar = mb.actionGeometry(action)
top_left = mb.mapTo(mw, ar.topLeft())
return QRect(top_left, ar.size())
return None
def unregister_widget(self, step_id: str) -> bool:
@@ -475,9 +452,9 @@ class GuidedTour(QObject):
if self._current_index > 0:
self._current_index -= 1
self._show_current_step(direction="prev")
self._show_current_step()
def _show_current_step(self, direction: Literal["next"] | Literal["prev"] = "next"):
def _show_current_step(self):
"""Display the current step."""
if not self._active or not self.overlay:
return
@@ -487,9 +464,7 @@ class GuidedTour(QObject):
target, step_text = self._resolve_step_target(step)
if target is None:
self._advance_past_invalid_step(
step_title, reason="Step target no longer exists.", direction=direction
)
self._advance_past_invalid_step(step_title, reason="Step target no longer exists.")
return
main_window = self.main_window
@@ -498,9 +473,7 @@ class GuidedTour(QObject):
self.stop_tour()
return
highlight_rect = self._get_highlight_rect(
main_window, target, step_title, direction=direction
)
highlight_rect = self._get_highlight_rect(main_window, target, step_title)
if highlight_rect is None:
return
@@ -510,6 +483,9 @@ class GuidedTour(QObject):
self.overlay.show_step(highlight_rect, step_title, step_text, current_step, total_steps)
# Update button states
self.overlay.back_btn.setEnabled(self._current_index > 0)
# Update next button text and state
is_last_step = self._current_index >= len(self._tour_steps) - 1
if is_last_step:
@@ -523,7 +499,7 @@ class GuidedTour(QObject):
self.step_changed.emit(self._current_index + 1, len(self._tour_steps))
def _resolve_step_target(self, step: TourStep) -> tuple[QWidget | QAction | QRect | None, str]:
def _resolve_step_target(self, step: TourStep) -> tuple[QWidget | QAction | None, str]:
"""
Resolve the target widget/action for the given step.
@@ -531,7 +507,7 @@ class GuidedTour(QObject):
step(TourStep): The tour step to resolve.
Returns:
tuple[QWidget | QAction | QRect | None, str]: The resolved target, optional QRect, and the step text.
tuple[QWidget | QAction | None, str]: The resolved target and the step text.
"""
widget_ref = step.get("widget_ref")
step_text = step.get("text", "")
@@ -544,7 +520,7 @@ class GuidedTour(QObject):
if target is None:
return None, step_text
if callable(target) and not isinstance(target, (QWidget, QAction, QRect)):
if callable(target) and not isinstance(target, (QWidget, QAction)):
result = target()
if isinstance(result, tuple):
target, alt_text = result
@@ -556,11 +532,7 @@ class GuidedTour(QObject):
return target, step_text
def _get_highlight_rect(
self,
main_window: QWidget,
target: QWidget | QAction | QRect,
step_title: str,
direction: Literal["next"] | Literal["prev"] = "next",
self, main_window: QWidget, target: QWidget | QAction, step_title: str
) -> QRect | None:
"""
Get the QRect in main_window coordinates to highlight for the given target.
@@ -573,15 +545,12 @@ class GuidedTour(QObject):
Returns:
QRect | None: The rectangle to highlight, or None if not found/visible.
"""
if isinstance(target, QRect):
return target
if isinstance(target, QAction):
rect = self._action_highlight_rect(target)
if rect is None:
self._advance_past_invalid_step(
step_title,
reason=f"Could not find visible widget or menu for QAction {target.text()!r}.",
direction=direction,
)
return None
return rect
@@ -590,60 +559,28 @@ class GuidedTour(QObject):
if self._visible_check:
if not target.isVisible():
self._advance_past_invalid_step(
step_title, reason=f"Widget {target!r} is not visible.", direction=direction
step_title, reason=f"Widget {target!r} is not visible."
)
return None
rect = target.rect()
top_left = target.mapTo(main_window, rect.topLeft())
return QRect(top_left, rect.size())
if isinstance(target, QTableWidgetItem):
# NOTE: On header items (which are also QTableWidgetItems), this does not work,
# Header items are just used as data containers by Qt, thus, we have to directly
# pass the QRect through the method (+ make sure the appropriate header section
# is visible). This can be handled in the callable method.)
table = target.tableWidget()
if self._visible_check:
if not table.isVisible():
self._advance_past_invalid_step(
step_title,
reason=f"Table widget {table!r} is not visible.",
direction=direction,
)
return None
# Table item
if table.item(target.row(), target.column()) == target:
table.scrollToItem(target, QAbstractItemView.ScrollHint.PositionAtCenter)
rect = table.visualItemRect(target)
top_left = table.viewport().mapTo(main_window, rect.topLeft())
return QRect(top_left, rect.size())
self._advance_past_invalid_step(
step_title, reason=f"Unsupported step target type: {type(target)}", direction=direction
step_title, reason=f"Unsupported step target type: {type(target)}"
)
return None
def _advance_past_invalid_step(
self, step_title: str, *, reason: str, direction: Literal["next"] | Literal["prev"] = "next"
):
def _advance_past_invalid_step(self, step_title: str, *, reason: str):
"""
Skip the current step (or stop the tour) when the target cannot be visualised.
"""
logger.warning(f"{reason} Skipping step {step_title!r}.")
if direction == "next":
if self._current_index < len(self._tour_steps) - 1:
self._current_index += 1
self._show_current_step()
else:
self.stop_tour()
elif direction == "prev":
if self._current_index > 0:
self._current_index -= 1
self._show_current_step(direction="prev")
else:
self.stop_tour()
logger.warning("%s Skipping step %r.", reason, step_title)
if self._current_index < len(self._tour_steps) - 1:
self._current_index += 1
self._show_current_step()
else:
self.stop_tour()
def get_registered_widgets(self) -> Dict[str, TourStep]:
"""Get all registered widgets."""
@@ -726,33 +663,8 @@ class MainWindow(QMainWindow): # pragma: no cover
title="Tools Menu",
)
sub_menu_action = self.tools_menu_actions["notes"].action
def get_sub_menu_action():
# open the tools menu
menu_button = self.tools_menu_action._button_ref()
if menu_button:
menu_button.showMenu()
return (
self.tools_menu_action.actions["notes"].action,
"This action allows you to add notes.",
)
sub_menu = self.guided_help.register_widget(
widget=get_sub_menu_action,
text="This is a sub-action within the tools menu.",
title="Add Note Action",
)
# Create tour from registered widgets
self.tour_step_ids = [
sub_menu,
primary_step,
secondary_step,
toolbar_action_step,
tools_menu_step,
]
self.tour_step_ids = [primary_step, secondary_step, toolbar_action_step, tools_menu_step]
widget_ids = self.tour_step_ids
self.guided_help.create_tour(widget_ids)
@@ -129,7 +129,7 @@ class HelpInspector(BECWidget, QtWidgets.QWidget):
# TODO check what happens if the HELP Inspector itself is embedded in another BECWidget
# I suppose we would like to get the first ancestor that is a BECWidget, not the topmost one
if not isinstance(widget, BECWidget):
widget = WidgetHierarchy.get_becwidget_ancestor(widget)
widget = WidgetHierarchy._get_becwidget_ancestor(widget)
if widget:
if widget is self:
self._toggle_mode(False)
+1 -1
View File
@@ -15,7 +15,7 @@ class Kind(IFBase):
"""
This is used in the .kind attribute of all OphydObj (Signals, Devices).
A Device examines its components' .kind attribute to decide whether to
A Device examines its components' .kind atttribute to decide whether to
traverse it in read(), read_configuration(), or neither. Additionally, if
decides whether to include its name in `hints['fields']`.
"""
@@ -22,7 +22,7 @@ class {plugin_name_pascal}Plugin(QDesignerCustomWidgetInterface): # pragma: no
def createWidget(self, parent):
if parent is None:
return QWidget()
return QWidget()
t = {plugin_name_pascal}(parent)
return t
+22 -39
View File
@@ -7,9 +7,9 @@ from dataclasses import dataclass
from typing import TYPE_CHECKING, Iterable
from bec_lib.plugin_helper import _get_available_plugins
from qtpy.QtWidgets import QWidget
from qtpy.QtWidgets import QGraphicsWidget, QWidget
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils import BECConnector
from bec_widgets.utils.bec_widget import BECWidget
if TYPE_CHECKING: # pragma: no cover
@@ -166,17 +166,18 @@ class BECClassContainer:
return [info.obj for info in self.collection]
def _collect_classes_from_package(repo_name: str, package: str) -> BECClassContainer:
"""Collect classes from a package subtree (for example ``widgets`` or ``applications``)."""
collection = BECClassContainer()
try:
anchor_module = importlib.import_module(f"{repo_name}.{package}")
except ModuleNotFoundError as exc:
# Some plugin repositories expose only one subtree. Skip gracefully if it does not exist.
if exc.name == f"{repo_name}.{package}":
return collection
raise
def get_custom_classes(repo_name: str) -> BECClassContainer:
"""
Get all RPC-enabled classes in the specified repository.
Args:
repo_name(str): The name of the repository.
Returns:
dict: A dictionary with keys "connector_classes" and "top_level_classes" and values as lists of classes.
"""
collection = BECClassContainer()
anchor_module = importlib.import_module(f"{repo_name}.widgets")
directory = os.path.dirname(anchor_module.__file__)
for root, _, files in sorted(os.walk(directory)):
for file in files:
@@ -184,13 +185,13 @@ def _collect_classes_from_package(repo_name: str, package: str) -> BECClassConta
continue
path = os.path.join(root, file)
rel_dir = os.path.dirname(os.path.relpath(path, directory))
if rel_dir in ("", "."):
subs = os.path.dirname(os.path.relpath(path, directory)).split("/")
if len(subs) == 1 and not subs[0]:
module_name = file.split(".")[0]
else:
module_name = ".".join(rel_dir.split(os.sep) + [file.split(".")[0]])
module_name = ".".join(subs + [file.split(".")[0]])
module = importlib.import_module(f"{repo_name}.{package}.{module_name}")
module = importlib.import_module(f"{repo_name}.widgets.{module_name}")
for name in dir(module):
obj = getattr(module, name)
@@ -202,30 +203,12 @@ def _collect_classes_from_package(repo_name: str, package: str) -> BECClassConta
class_info.is_connector = True
if issubclass(obj, QWidget) or issubclass(obj, BECWidget):
class_info.is_widget = True
if len(subs) == 1 and (
issubclass(obj, QWidget) or issubclass(obj, QGraphicsWidget)
):
class_info.is_top_level = True
if hasattr(obj, "PLUGIN") and obj.PLUGIN:
class_info.is_plugin = True
collection.add_class(class_info)
return collection
def get_custom_classes(
repo_name: str, packages: tuple[str, ...] | None = None
) -> BECClassContainer:
"""
Get all relevant classes for RPC/CLI in the specified repository.
By default, discovery is limited to ``<repo>.widgets`` for backward compatibility.
Additional package subtrees (for example ``applications``) can be included explicitly.
Args:
repo_name(str): The name of the repository.
packages(tuple[str, ...] | None): Optional tuple of package names to scan. Defaults to ("widgets",) for backward compatibility.
Returns:
BECClassContainer: Container with collected class information.
"""
selected_packages = packages or ("widgets",)
collection = BECClassContainer()
for package in selected_packages:
collection += _collect_classes_from_package(repo_name, package)
return collection
+4 -2
View File
@@ -69,11 +69,13 @@ class RoundedFrame(QFrame):
"""
Update the style of the frame based on the background color.
"""
self.setStyleSheet(f"""
self.setStyleSheet(
f"""
QFrame#roundedFrame {{
border-radius: {self._radius}px;
}}
""")
"""
)
self.apply_plot_widget_style()
def apply_plot_widget_style(self, border: str = "none"):
+25 -101
View File
@@ -4,24 +4,20 @@ import functools
import traceback
import types
from contextlib import contextmanager
from typing import TYPE_CHECKING, Callable, Literal, TypeVar
from typing import TYPE_CHECKING, Callable, TypeVar
from bec_lib.client import BECClient
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.utils.import_utils import lazy_import
from qtpy.QtCore import Qt, QTimer
from qtpy.QtWidgets import QWidget
from redis.exceptions import RedisError
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.utils import BECDispatcher
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils.bec_dispatcher import BECDispatcher
from bec_widgets.utils.container_utils import WidgetContainerUtils
from bec_widgets.utils.error_popups import ErrorPopupUtility
from bec_widgets.utils.rpc_register import RPCRegister
from bec_widgets.utils.screen_utils import apply_window_geometry
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea
from bec_widgets.widgets.containers.main_window.main_window import BECMainWindow, BECMainWindowNoRPC
from bec_widgets.widgets.containers.main_window.main_window import BECMainWindow
if TYPE_CHECKING: # pragma: no cover
from bec_lib import messages
@@ -118,14 +114,11 @@ class RPCServer:
logger.debug(f"Received RPC instruction: {msg}, metadata: {metadata}")
with rpc_exception_hook(functools.partial(self.send_response, request_id, False)):
try:
obj = self.get_object_from_config(msg["parameter"])
method = msg["action"]
args = msg["parameter"].get("args", [])
kwargs = msg["parameter"].get("kwargs", {})
if method.startswith("system."):
res = self.run_system_rpc(method, args, kwargs)
else:
obj = self.get_object_from_config(msg["parameter"])
res = self.run_rpc(obj, method, args, kwargs)
res = self.run_rpc(obj, method, args, kwargs)
except Exception:
content = traceback.format_exc()
logger.error(f"Error while executing RPC instruction: {content}")
@@ -156,7 +149,7 @@ class RPCServer:
if method == "raise" and hasattr(
obj, "setWindowState"
): # special case for raising windows, should work even if minimized
# this is a special case for raising windows for gnome on Red Hat (RHEL) 9 systems where changing focus is suppressed by default
# this is a special case for raising windows for gnome on rethat 9 systems where changing focus is supressed by default
# The procedure is as follows:
# 1. Get the current window state to check if the window is minimized and remove minimized flag
# 2. Then in order to force gnome to raise the window, we set the window to stay on top temporarily
@@ -181,96 +174,18 @@ class RPCServer:
obj.show()
res = None
else:
target_obj, method_obj = self._resolve_rpc_target(obj, method)
method_obj = getattr(obj, method)
# check if the method accepts args and kwargs
if not callable(method_obj):
if not args:
res = method_obj
else:
setattr(target_obj, method, args[0])
setattr(obj, method, args[0])
res = None
else:
res = method_obj(*args, **kwargs)
return res
def _resolve_rpc_target(self, obj, method: str) -> tuple[object, object]:
"""
Resolve a method/property access target for RPC execution.
Primary target is the object itself. If not found there and the class defines
``RPC_CONTENT_CLASS``, unresolved method names can be delegated to the content
widget referenced by ``RPC_CONTENT_ATTR`` (default ``content``), but only when
the method is explicitly listed in the content class ``USER_ACCESS``.
"""
if hasattr(obj, method):
return obj, getattr(obj, method)
content_cls = getattr(type(obj), "RPC_CONTENT_CLASS", None)
if content_cls is None:
raise AttributeError(f"{type(obj).__name__} has no attribute '{method}'")
content_user_access = set()
for entry in getattr(content_cls, "USER_ACCESS", []):
if entry.endswith(".setter"):
content_user_access.add(entry.split(".setter")[0])
else:
content_user_access.add(entry)
if method not in content_user_access:
raise AttributeError(f"{type(obj).__name__} has no attribute '{method}'")
content_attr = getattr(type(obj), "RPC_CONTENT_ATTR", "content")
target_obj = getattr(obj, content_attr, None)
if target_obj is None:
raise AttributeError(
f"{type(obj).__name__} has no content target '{content_attr}' for RPC delegation"
)
if not isinstance(target_obj, content_cls):
raise AttributeError(
f"{type(obj).__name__}.{content_attr} is not instance of {content_cls.__name__}"
)
if not hasattr(target_obj, method):
raise AttributeError(f"{content_cls.__name__} has no attribute '{method}'")
return target_obj, getattr(target_obj, method)
def run_system_rpc(self, method: str, args: list, kwargs: dict):
if method == "system.launch_dock_area":
return self._launch_dock_area(*args, **kwargs)
if method == "system.list_capabilities":
return {"system.launch_dock_area": True}
raise ValueError(f"Unknown system RPC method: {method}")
@staticmethod
def _launch_dock_area(
name: str | None = None,
geometry: tuple[int, int, int, int] | None = None,
startup_profile: str | Literal["restore", "skip"] | None = None,
) -> QWidget | None:
from bec_widgets.applications import bw_launch
with RPCRegister.delayed_broadcast() as rpc_register:
existing_dock_areas = rpc_register.get_names_of_rpc_by_class_type(BECDockArea)
if name is not None:
WidgetContainerUtils.raise_for_invalid_name(name)
if name in existing_dock_areas:
name = WidgetContainerUtils.generate_unique_name(name, existing_dock_areas)
else:
name = WidgetContainerUtils.generate_unique_name("dock_area", existing_dock_areas)
result_widget = bw_launch.dock_area(object_name=name, startup_profile=startup_profile)
result_widget.window().setWindowTitle(f"BEC - {name}")
if isinstance(result_widget, BECMainWindow):
apply_window_geometry(result_widget, geometry)
result_widget.show()
else:
window = BECMainWindowNoRPC()
window.setCentralWidget(result_widget)
window.setWindowTitle(f"BEC - {result_widget.objectName()}")
apply_window_geometry(window, geometry)
window.show()
return result_widget
def serialize_result_and_send(self, request_id: str, res: object):
"""
Serialize the result of an RPC call and send it back to the client.
@@ -340,9 +255,6 @@ class RPCServer:
# Respect RPC = False
if getattr(obj, "RPC", True) is False:
return None
# Respect rpc_exposed = False
if getattr(obj, "rpc_exposed", True) is False:
return None
return self._serialize_bec_connector(obj, wait=True)
def emit_heartbeat(self) -> None:
@@ -371,8 +283,6 @@ class RPCServer:
continue
if not getattr(val, "RPC", True):
continue
if not getattr(val, "rpc_exposed", True):
continue
data[key] = self._serialize_bec_connector(val)
if self._broadcasted_data == data:
return
@@ -423,9 +333,23 @@ class RPCServer:
"widget_class": widget_class,
"config": config_dict,
"container_proxy": container_proxy,
"__rpc__": getattr(connector, "rpc_exposed", True),
"__rpc__": True,
}
@staticmethod
def _get_becwidget_ancestor(widget: QObject) -> BECConnector | None:
"""
Traverse up the parent chain to find the nearest BECConnector.
Returns None if none is found.
"""
parent = widget.parent()
while parent is not None:
if isinstance(parent, BECConnector):
return parent
parent = parent.parent()
return None
# Suppose clients register callbacks to receive updates
def add_registry_update_callback(self, cb: Callable) -> None:
"""
@@ -442,5 +366,5 @@ class RPCServer:
self.status = messages.BECStatus.IDLE
self._heartbeat_timer.stop()
self.emit_heartbeat()
logger.info("Succeeded in shutting down CLI server")
logger.info("Succeded in shutting down CLI server")
self.client.shutdown()
+256 -7
View File
@@ -5,7 +5,8 @@ import os
import weakref
from abc import ABC, abstractmethod
from contextlib import contextmanager
from typing import Dict, Literal
from enum import Enum
from typing import Dict, Literal, Union
from bec_lib.device import ReadoutPriority
from bec_lib.logger import bec_logger
@@ -15,6 +16,7 @@ from qtpy.QtGui import QAction, QColor, QIcon # type: ignore
from qtpy.QtWidgets import (
QApplication,
QComboBox,
QGraphicsDropShadowEffect,
QHBoxLayout,
QLabel,
QMenu,
@@ -26,6 +28,7 @@ from qtpy.QtWidgets import (
)
import bec_widgets
from bec_widgets.utils.colors import AccentColors, get_accent_colors
from bec_widgets.utils.toolbars.splitter import ResizableSpacer
from bec_widgets.widgets.control.device_input.base_classes.device_input_base import BECDeviceFilter
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
@@ -35,19 +38,16 @@ logger = bec_logger.logger
MODULE_PATH = os.path.dirname(bec_widgets.__file__)
def create_action_with_text(toolbar_action, toolbar: QToolBar, min_size: QSize | None = None):
def create_action_with_text(toolbar_action, toolbar: QToolBar):
"""
Helper function to create a toolbar button with text beside or under the icon.
Args:
toolbar_action(ToolBarAction): The toolbar action to create the button for.
toolbar(ModularToolBar): The toolbar to add the button to.
min_size(QSize, optional): The minimum size for the button. Defaults to None.
"""
btn = QToolButton(parent=toolbar)
if min_size is not None:
btn.setMinimumSize(min_size)
if getattr(toolbar_action, "label_text", None):
toolbar_action.action.setText(toolbar_action.label_text)
if getattr(toolbar_action, "tooltip", None):
@@ -105,6 +105,205 @@ class LongPressToolButton(QToolButton):
self.showMenu()
class StatusState(str, Enum):
DEFAULT = "default"
HIGHLIGHT = "highlight"
WARNING = "warning"
EMERGENCY = "emergency"
SUCCESS = "success"
class StatusIndicatorWidget(QWidget):
"""Pill-shaped status indicator with icon + label using accent colors."""
def __init__(
self, parent=None, text: str = "Ready", state: StatusState | str = StatusState.DEFAULT
):
super().__init__(parent)
self.setObjectName("StatusIndicatorWidget")
self._text = text
self._state = self._normalize_state(state)
self._theme_connected = False
layout = QHBoxLayout(self)
layout.setContentsMargins(6, 2, 8, 2)
layout.setSpacing(6)
self._icon_label = QLabel(self)
self._icon_label.setFixedSize(18, 18)
self._text_label = QLabel(self)
self._text_label.setText(self._text)
layout.addWidget(self._icon_label)
layout.addWidget(self._text_label)
# Give it a consistent pill height
self.setMinimumHeight(24)
self.setSizePolicy(QSizePolicy.Policy.Minimum, QSizePolicy.Policy.Fixed)
# Soft shadow similar to notification banners
self._shadow = QGraphicsDropShadowEffect(self)
self._shadow.setBlurRadius(18)
self._shadow.setOffset(0, 2)
self.setGraphicsEffect(self._shadow)
self._apply_state(self._state)
self._connect_theme_change()
def set_state(self, state: Union[StatusState, str]):
"""Update state and refresh visuals."""
self._state = self._normalize_state(state)
self._apply_state(self._state)
def set_text(self, text: str):
"""Update the displayed text."""
self._text = text
self._text_label.setText(text)
def _apply_state(self, state: StatusState):
palette = self._resolve_accent_colors()
color_attr = {
StatusState.DEFAULT: "default",
StatusState.HIGHLIGHT: "highlight",
StatusState.WARNING: "warning",
StatusState.EMERGENCY: "emergency",
StatusState.SUCCESS: "success",
}.get(state, "default")
base_color = getattr(palette, color_attr, None) or getattr(
palette, "default", QColor("gray")
)
# Apply style first (returns text color for label)
text_color = self._update_style(base_color, self._theme_fg_color())
theme_name = self._theme_name()
# Choose icon per state
icon_name_map = {
StatusState.DEFAULT: "check_circle",
StatusState.HIGHLIGHT: "check_circle",
StatusState.SUCCESS: "check_circle",
StatusState.WARNING: "warning",
StatusState.EMERGENCY: "dangerous",
}
icon_name = icon_name_map.get(state, "check_circle")
# Icon color:
# - Dark mode: follow text color (usually white) for high contrast.
# - Light mode: use a stronger version of the accent color for a colored glyph
# that stands out on the pastel pill background.
if theme_name == "light":
icon_q = QColor(base_color)
icon_color = icon_q.name(QColor.HexRgb)
else:
icon_color = text_color
icon = material_icon(
icon_name, size=(18, 18), convert_to_pixmap=False, filled=True, color=icon_color
)
if not icon.isNull():
self._icon_label.setPixmap(icon.pixmap(18, 18))
def _update_style(self, color: QColor, fg_color: QColor) -> str:
# Ensure the widget actually paints its own background
self.setAttribute(Qt.WidgetAttribute.WA_StyledBackground, True)
fg = QColor(fg_color)
text_color = fg.name(QColor.HexRgb)
theme_name = self._theme_name()
base = QColor(color)
start = QColor(base)
end = QColor(base)
border = QColor(base)
if theme_name == "light":
start.setAlphaF(0.20)
end.setAlphaF(0.06)
else:
start.setAlphaF(0.35)
end.setAlphaF(0.12)
border = border.darker(120)
# shadow color tuned per theme to match notification banners
if hasattr(self, "_shadow"):
if theme_name == "light":
shadow_color = QColor(15, 23, 42, 60) # softer shadow on light bg
else:
shadow_color = QColor(0, 0, 0, 160)
self._shadow.setColor(shadow_color)
# Use a fixed radius for a stable pill look inside toolbars
radius = 10
self.setStyleSheet(
f"""
#StatusIndicatorWidget {{
background-color: qlineargradient(spread:pad, x1:0, y1:0, x2:1, y2:1,
stop:0 {start.name(QColor.HexArgb)}, stop:1 {end.name(QColor.HexArgb)});
border: 1px solid {border.name(QColor.HexRgb)};
border-radius: {radius}px;
padding: 2px 8px;
}}
#StatusIndicatorWidget QLabel {{
color: {text_color};
background: transparent;
}}
"""
)
return text_color
def _theme_fg_color(self) -> QColor:
app = QApplication.instance()
theme = getattr(app, "theme", None)
if theme is not None and hasattr(theme, "color"):
try:
fg = theme.color("FG")
if isinstance(fg, QColor):
return fg
except Exception:
pass
palette = self._resolve_accent_colors()
base = getattr(palette, "default", QColor("white"))
luminance = (0.299 * base.red() + 0.587 * base.green() + 0.114 * base.blue()) / 255
return QColor("#000000") if luminance > 0.65 else QColor("#ffffff")
def _theme_name(self) -> str:
app = QApplication.instance()
theme = getattr(app, "theme", None)
name = getattr(theme, "theme", None)
if isinstance(name, str):
return name.lower()
return "dark"
def _connect_theme_change(self):
if self._theme_connected:
return
app = QApplication.instance()
theme = getattr(app, "theme", None)
if theme is not None and hasattr(theme, "theme_changed"):
try:
theme.theme_changed.connect(lambda _: self._apply_state(self._state))
self._theme_connected = True
except Exception:
pass
@staticmethod
def _normalize_state(state: Union[StatusState, str]) -> StatusState:
if isinstance(state, StatusState):
return state
try:
return StatusState(state)
except ValueError:
return StatusState.DEFAULT
@staticmethod
def _resolve_accent_colors() -> AccentColors:
return get_accent_colors()
class ToolBarAction(ABC):
"""
Abstract base class for toolbar actions.
@@ -151,6 +350,54 @@ class SeparatorAction(ToolBarAction):
toolbar.addSeparator()
class StatusIndicatorAction(ToolBarAction):
"""Toolbar action hosting a LED indicator and status text."""
def __init__(
self,
*,
text: str = "Ready",
state: Union[StatusState, str] = StatusState.DEFAULT,
tooltip: str | None = None,
):
super().__init__(icon_path=None, tooltip=tooltip or "View status", checkable=False)
self._text = text
self._state: StatusState = StatusIndicatorWidget._normalize_state(state)
self.widget: StatusIndicatorWidget | None = None
self.tooltip = tooltip or ""
def add_to_toolbar(self, toolbar: QToolBar, target: QWidget):
if (
self.widget is None
or self.widget.parent() is None
or self.widget.parent() is not toolbar
):
self.widget = StatusIndicatorWidget(parent=toolbar, text=self._text, state=self._state)
self.action = toolbar.addWidget(self.widget)
self.action.setText(self._text)
self.set_tooltip(self.tooltip)
def set_state(self, state: Union[StatusState, str]):
self._state = StatusIndicatorWidget._normalize_state(state)
if self.widget is not None:
self.widget.set_state(self._state)
def set_text(self, text: str):
self._text = text
if self.widget is not None:
self.widget.set_text(text)
if hasattr(self, "action") and self.action is not None:
self.action.setText(text)
def set_tooltip(self, tooltip: str | None):
"""Set tooltip on both the underlying widget and the QWidgetAction."""
self.tooltip = tooltip or ""
if self.widget is not None:
self.widget.setToolTip(self.tooltip)
if hasattr(self, "action") and self.action is not None:
self.action.setToolTip(self.tooltip)
class QtIconAction(IconAction):
def __init__(
self,
@@ -602,14 +849,16 @@ class ExpandableMenuAction(ToolBarAction):
button.setIcon(QIcon(self.icon_path))
button.setText(self.tooltip)
button.setPopupMode(QToolButton.ToolButtonPopupMode.InstantPopup)
button.setStyleSheet("""
button.setStyleSheet(
"""
QToolButton {
font-size: 14px;
}
QMenu {
font-size: 14px;
}
""")
"""
)
menu = QMenu(button)
for action_container in self.actions.values():
action: QAction = action_container.action
+4 -2
View File
@@ -106,7 +106,8 @@ class ResizableSpacer(QWidget):
self.setSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Fixed)
self.setStyleSheet("""
self.setStyleSheet(
"""
ResizableSpacer {
background-color: transparent;
margin: 0px;
@@ -116,7 +117,8 @@ class ResizableSpacer(QWidget):
ResizableSpacer:hover {
background-color: rgba(100, 100, 200, 80);
}
""")
"""
)
self.setContentsMargins(0, 0, 0, 0)
+283
View File
@@ -0,0 +1,283 @@
from __future__ import annotations
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.messages import BeamlineStateConfig
from qtpy.QtCore import QObject, QTimer, Signal
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.toolbars.actions import StatusIndicatorAction, StatusState
from bec_widgets.utils.toolbars.toolbar import ModularToolBar
logger = bec_logger.logger
class BECStatusBroker(BECConnector, QObject):
"""Listen to BEC beamline state endpoints and emit structured signals."""
_instance: "BECStatusBroker | None" = None
_initialized: bool = False
available_updated = Signal(list) # list of states available
status_updated = Signal(str, dict) # name, status update
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self, parent=None, gui_id: str | None = None, client=None, **kwargs):
if self._initialized:
return
super().__init__(parent=parent, gui_id=gui_id, client=client, **kwargs)
self._watched: set[str] = set()
self.bec_dispatcher.connect_slot(
self.on_available, MessageEndpoints.available_beamline_states()
)
self._initialized = True
self.refresh_available()
def refresh_available(self):
"""Fetch the current set of beamline conditions once."""
try:
msg = self.client.connector.get_last(MessageEndpoints.available_beamline_states())
logger.info(f"StatusBroker: fetched available conditions payload: {msg}")
if msg:
self.on_available(msg.get("data").content, None)
except Exception as exc: # pragma: no cover - runtime env
logger.debug(f"Could not fetch available conditions: {exc}")
@SafeSlot(dict, dict)
def on_available(self, data: dict, meta: dict | None = None):
state_list = data.get("states") # latest one from the stream
self.available_updated.emit(state_list)
for state in state_list:
name = state.name
if name:
self.watch_state(name)
def watch_state(self, name: str):
"""Subscribe to updates for a single beamline state."""
if name in self._watched:
return
self._watched.add(name)
endpoint = MessageEndpoints.beamline_state(name)
logger.info(f"StatusBroker: watching state '{name}' on {endpoint.endpoint}")
self.bec_dispatcher.connect_slot(self.on_state, endpoint)
self.fetch_state(name)
def fetch_state(self, name: str):
"""Fetch the current value of a beamline state once."""
endpoint = MessageEndpoints.beamline_state(name)
try:
msg = self.client.connector.get_last(endpoint)
logger.info(f"StatusBroker: fetched state '{name}' payload: {msg}")
if msg:
self.on_state(msg.get("data").content, None)
except Exception as exc: # pragma: no cover - runtime env
logger.debug(f"Could not fetch state {name}: {exc}")
@SafeSlot(dict, dict)
def on_state(self, data: dict, meta: dict | None = None):
name = data.get("name")
if not name:
return
logger.info(f"StatusBroker: state update for '{name}' -> {data}")
self.status_updated.emit(str(name), data)
@classmethod
def reset_singleton(cls):
"""
Reset the singleton instance of the BECStatusBroker.
"""
cls._instance = None
cls._initialized = False
class StatusToolBar(ModularToolBar):
"""Status toolbar that auto-manages beamline state indicators."""
STATUS_MAP: dict[str, StatusState] = {
"valid": StatusState.SUCCESS,
"warning": StatusState.WARNING,
"invalid": StatusState.EMERGENCY,
}
def __init__(self, parent=None, names: list[str] | None = None, **kwargs):
super().__init__(parent=parent, orientation="horizontal", **kwargs)
self.setObjectName("StatusToolbar")
self._status_bundle = self.new_bundle("status")
self.show_bundles(["status"])
self._apply_status_toolbar_style()
self.allowed_names: set[str] | None = set(names) if names is not None else None
logger.info(f"StatusToolbar init allowed_names={self.allowed_names}")
self.broker = BECStatusBroker()
self.broker.available_updated.connect(self.on_available_updated)
self.broker.status_updated.connect(self.on_status_updated)
QTimer.singleShot(0, self.refresh_from_broker)
def refresh_from_broker(self) -> None:
if self.allowed_names is None:
self.broker.refresh_available()
else:
for name in self.allowed_names:
if not self.components.exists(name):
# Pre-create a placeholder pill so it is visible even before data arrives.
self.add_status_item(
name=name, text=name, state=StatusState.DEFAULT, tooltip=None
)
self.broker.watch_state(name)
def _apply_status_toolbar_style(self) -> None:
self.setStyleSheet(
"QToolBar#StatusToolbar {"
f" background-color: {self.background_color};"
" border: none;"
" border-bottom: 1px solid palette(mid);"
"}"
)
# -------- Slots for updates --------
@SafeSlot(list)
def on_available_updated(self, available_states: list):
"""Process the available states stream and start watching them."""
# Keep track of current names from the broker to remove stale ones.
current_names: set[str] = set()
for state in available_states:
if not isinstance(state, BeamlineStateConfig):
continue
name = state.name
title = state.title or name
if not name:
continue
current_names.add(name)
logger.info(f"StatusToolbar: discovered state '{name}' title='{title}'")
# auto-add unless filtered out
if self.allowed_names is None or name in self.allowed_names:
self.add_status_item(name=name, text=title, state=StatusState.DEFAULT, tooltip=None)
else:
# keep hidden but present for context menu toggling
self.add_status_item(name=name, text=title, state=StatusState.DEFAULT, tooltip=None)
act = self.components.get_action(name)
if act and act.action:
act.action.setVisible(False)
# Remove actions that are no longer present in available_states.
known_actions = [
n for n in self.components._components.keys() if n not in ("separator",)
] # direct access used for clean-up
for name in known_actions:
if name not in current_names:
logger.info(f"StatusToolbar: removing stale state '{name}'")
try:
self.components.remove_action(name)
except Exception as exc:
logger.warning(f"Failed to remove stale state '{name}': {exc}")
self.refresh()
@SafeSlot(str, dict)
def on_status_updated(self, name: str, payload: dict): # TODO finish update logic
"""Update a status pill when a state update arrives."""
state = self.STATUS_MAP.get(str(payload.get("status", "")).lower(), StatusState.DEFAULT)
action = self.components.get_action(name) if self.components.exists(name) else None
# Only update the label when a title is explicitly provided; otherwise keep current text.
title = payload.get("title") or None
text = title
if text is None and action is None:
text = payload.get("name") or name
if "label" in payload:
tooltip = payload.get("label") or ""
else:
tooltip = None
logger.info(
f"StatusToolbar: update state '{name}' -> state={state} text='{text}' tooltip='{tooltip}'"
)
self.set_status(name=name, text=text, state=state, tooltip=tooltip)
# -------- Items Management --------
def add_status_item(
self,
name: str,
*,
text: str = "Ready",
state: StatusState | str = StatusState.DEFAULT,
tooltip: str | None = None,
) -> StatusIndicatorAction | None:
"""
Add or update a named status item in the toolbar.
After you added all actions, call `toolbar.refresh()` to update the display.
Args:
name(str): Unique name for the status item.
text(str): Text to display in the status item.
state(StatusState | str): State of the status item.
tooltip(str | None): Optional tooltip for the status item.
Returns:
StatusIndicatorAction | None: The created or updated status action, or None if toolbar is not initialized.
"""
if self._status_bundle is None:
return
if self.components.exists(name):
return
action = StatusIndicatorAction(text=text, state=state, tooltip=tooltip)
return self.add_status_action(name, action)
def add_status_action(
self, name: str, action: StatusIndicatorAction
) -> StatusIndicatorAction | None:
"""
Attach an existing StatusIndicatorAction to the status toolbar.
After you added all actions, call `toolbar.refresh()` to update the display.
Args:
name(str): Unique name for the status item.
action(StatusIndicatorAction): The status action to add.
Returns:
StatusIndicatorAction | None: The added status action, or None if toolbar is not initialized.
"""
self.components.add_safe(name, action)
self.get_bundle("status").add_action(name)
self.refresh()
self.broker.fetch_state(name)
return action
def set_status(
self,
name: str = "main",
*,
state: StatusState | str | None = None,
text: str | None = None,
tooltip: str | None = None,
) -> None:
"""
Update the status item with the given name, creating it if necessary.
Args:
name(str): Unique name for the status item.
state(StatusState | str | None): New state for the status item.
text(str | None): New text for the status item.
"""
action = self.components.get_action(name) if self.components.exists(name) else None
if action is None:
action = self.add_status_item(
name, text=text or "Ready", state=state or "default", tooltip=tooltip
)
if action is None:
return
if state is not None:
action.set_state(state)
if text is not None:
action.set_text(text)
if tooltip is not None and hasattr(action, "set_tooltip"):
action.set_tooltip(tooltip)
+8 -4
View File
@@ -291,7 +291,8 @@ class ModularToolBar(QToolBar):
menu = QMenu(self)
theme = get_theme_name()
if theme == "dark":
menu.setStyleSheet("""
menu.setStyleSheet(
"""
QMenu {
background-color: rgba(50, 50, 50, 0.9);
border: 1px solid rgba(255, 255, 255, 0.2);
@@ -299,10 +300,12 @@ class ModularToolBar(QToolBar):
QMenu::item:selected {
background-color: rgba(0, 0, 255, 0.2);
}
""")
"""
)
else:
# Light theme styling
menu.setStyleSheet("""
menu.setStyleSheet(
"""
QMenu {
background-color: rgba(255, 255, 255, 0.9);
border: 1px solid rgba(0, 0, 0, 0.2);
@@ -310,7 +313,8 @@ class ModularToolBar(QToolBar):
QMenu::item:selected {
background-color: rgba(0, 0, 255, 0.2);
}
""")
"""
)
for ii, bundle in enumerate(self.shown_bundles):
self.handle_bundle_context_menu(menu, bundle)
if ii < len(self.shown_bundles) - 1:
-95
View File
@@ -1,95 +0,0 @@
from __future__ import annotations
import shiboken6
from qtpy.QtCore import QPropertyAnimation, QRect, QSequentialAnimationGroup, Qt
from qtpy.QtWidgets import QFrame, QWidget
class WidgetHighlighter:
"""
Utility that highlights widgets by drawing a temporary frame around them.
"""
def __init__(
self,
*,
frame_parent: QWidget | None = None,
window_flags: Qt.WindowType | Qt.WindowFlags = Qt.WindowType.Tool
| Qt.WindowType.FramelessWindowHint
| Qt.WindowType.WindowStaysOnTopHint,
style_sheet: str = "border: 2px solid #FF00FF; border-radius: 6px; background: transparent;",
) -> None:
self._frame_parent = frame_parent
self._window_flags = window_flags
self._style_sheet = style_sheet
self._frame: QFrame | None = None
self._animation_group: QSequentialAnimationGroup | None = None
def highlight(self, widget: QWidget | None) -> None:
"""
Highlight the given widget with a pulsing frame.
"""
if widget is None or not shiboken6.isValid(widget):
return
frame = self._ensure_frame()
frame.hide()
geom = widget.frameGeometry()
top_left = widget.mapToGlobal(widget.rect().topLeft())
frame.setGeometry(top_left.x(), top_left.y(), geom.width(), geom.height())
frame.setWindowOpacity(1.0)
frame.show()
start_rect = QRect(
top_left.x() - 5, top_left.y() - 5, geom.width() + 10, geom.height() + 10
)
pulse = QPropertyAnimation(frame, b"geometry", frame)
pulse.setDuration(300)
pulse.setStartValue(start_rect)
pulse.setEndValue(QRect(top_left.x(), top_left.y(), geom.width(), geom.height()))
fade = QPropertyAnimation(frame, b"windowOpacity", frame)
fade.setDuration(2000)
fade.setStartValue(1.0)
fade.setEndValue(0.0)
fade.finished.connect(frame.hide)
if self._animation_group is not None:
old_group = self._animation_group
self._animation_group = None
old_group.stop()
old_group.deleteLater()
animation = QSequentialAnimationGroup(frame)
animation.addAnimation(pulse)
animation.addAnimation(fade)
animation.start()
self._animation_group = animation
def cleanup(self) -> None:
"""
Delete the highlight frame and cancel pending animations.
"""
if self._animation_group is not None:
self._animation_group.stop()
self._animation_group.deleteLater()
self._animation_group = None
if self._frame is not None:
self._frame.hide()
self._frame.deleteLater()
self._frame = None
@property
def frame(self) -> QFrame | None:
"""Return the currently allocated highlight frame (if any)."""
return self._frame
def _ensure_frame(self) -> QFrame:
if self._frame is None:
self._frame = QFrame(self._frame_parent, self._window_flags)
self._frame.setAttribute(Qt.WidgetAttribute.WA_TransparentForMouseEvents)
self._frame.setStyleSheet(self._style_sheet)
return self._frame
+55 -117
View File
@@ -2,12 +2,10 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import TYPE_CHECKING, Type, TypeVar, cast
import shiboken6 as shb
from bec_lib import bec_logger
from qtpy.QtCore import Qt
from qtpy.QtWidgets import (
QApplication,
QCheckBox,
@@ -26,21 +24,13 @@ from qtpy.QtWidgets import (
from bec_widgets.widgets.utility.toggle.toggle import ToggleSwitch
if TYPE_CHECKING: # pragma: no cover
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils import BECConnector
logger = bec_logger.logger
TAncestor = TypeVar("TAncestor", bound=QWidget)
@dataclass(frozen=True)
class WidgetTreeNode:
widget: QWidget
parent: QWidget | None
depth: int
prefix: str
class WidgetHandler(ABC):
"""Abstract base class for all widget handlers."""
@@ -330,72 +320,6 @@ class WidgetIO:
class WidgetHierarchy:
@staticmethod
def iter_widget_tree(widget: QWidget, *, exclude_internal_widgets: bool = True):
"""
Yield WidgetTreeNode entries for the widget hierarchy.
"""
visited: set[int] = set()
yield from WidgetHierarchy._iter_widget_tree_nodes(
widget, None, exclude_internal_widgets, visited, [], 0
)
@staticmethod
def _iter_widget_tree_nodes(
widget: QWidget,
parent: QWidget | None,
exclude_internal_widgets: bool,
visited: set[int],
branch_flags: list[bool],
depth: int,
):
if widget is None or not shb.isValid(widget):
return
widget_id = id(widget)
if widget_id in visited:
return
visited.add(widget_id)
prefix = WidgetHierarchy._build_prefix(branch_flags)
yield WidgetTreeNode(widget=widget, parent=parent, depth=depth, prefix=prefix)
children = WidgetHierarchy._filtered_children(widget, exclude_internal_widgets)
for idx, child in enumerate(children):
is_last = idx == len(children) - 1
yield from WidgetHierarchy._iter_widget_tree_nodes(
child,
widget,
exclude_internal_widgets,
visited,
branch_flags + [is_last],
depth + 1,
)
@staticmethod
def _build_prefix(branch_flags: list[bool]) -> str:
if not branch_flags:
return ""
parts: list[str] = []
for flag in branch_flags[:-1]:
parts.append(" " if flag else "")
parts.append("└─ " if branch_flags[-1] else "├─ ")
return "".join(parts)
@staticmethod
def _filtered_children(widget: QWidget, exclude_internal_widgets: bool) -> list[QWidget]:
children: list[QWidget] = []
for child in widget.findChildren(QWidget, options=Qt.FindDirectChildrenOnly):
if not shb.isValid(child):
continue
if (
exclude_internal_widgets
and isinstance(widget, QComboBox)
and child.__class__.__name__ in ["QFrame", "QBoxLayout", "QListView"]
):
continue
children.append(child)
return children
@staticmethod
def print_widget_hierarchy(
widget,
@@ -418,36 +342,55 @@ class WidgetHierarchy:
only_bec_widgets(bool, optional): Whether to print only widgets that are instances of BECWidget.
show_parent(bool, optional): Whether to display which BECWidget is the parent of each discovered BECWidget.
"""
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils import BECConnector
from bec_widgets.widgets.plots.waveform.waveform import Waveform
for node in WidgetHierarchy.iter_widget_tree(
widget, exclude_internal_widgets=exclude_internal_widgets
):
current = node.widget
is_bec = isinstance(current, BECConnector)
if only_bec_widgets and not is_bec:
# 1) Filter out widgets that are not BECConnectors (if 'only_bec_widgets' is True)
is_bec = isinstance(widget, BECConnector)
if only_bec_widgets and not is_bec:
return
# 2) Determine and print the parent's info (closest BECConnector)
parent_info = ""
if show_parent and is_bec:
ancestor = WidgetHierarchy._get_becwidget_ancestor(widget)
if ancestor:
parent_label = ancestor.objectName() or ancestor.__class__.__name__
parent_info = f" parent={parent_label}"
else:
parent_info = " parent=None"
widget_info = f"{widget.__class__.__name__} ({widget.objectName()}){parent_info}"
print(prefix + widget_info)
# 3) If it's a Waveform, explicitly print the curves
if isinstance(widget, Waveform):
for curve in widget.curves:
curve_prefix = prefix + " └─ "
print(
f"{curve_prefix}{curve.__class__.__name__} ({curve.objectName()}) "
f"parent={widget.objectName()}"
)
# 4) Recursively handle each child if:
# - It's a QWidget
# - It is a BECConnector (or we don't care about filtering)
# - Its closest BECConnector parent is the current widget
for child in widget.findChildren(QWidget):
if only_bec_widgets and not isinstance(child, BECConnector):
continue
parent_info = ""
if show_parent and is_bec:
ancestor = WidgetHierarchy.get_becwidget_ancestor(current)
if ancestor:
parent_label = ancestor.objectName() or ancestor.__class__.__name__
parent_info = f" parent={parent_label}"
else:
parent_info = " parent=None"
widget_info = f"{current.__class__.__name__} ({current.objectName()}){parent_info}"
print(node.prefix + widget_info)
if isinstance(current, Waveform):
for curve in current.curves:
curve_prefix = node.prefix + " "
print(
f"{curve_prefix}└─ {curve.__class__.__name__} ({curve.objectName()}) "
f"parent={current.objectName()}"
)
# if WidgetHierarchy._get_becwidget_ancestor(child) == widget:
child_prefix = prefix + " └─ "
WidgetHierarchy.print_widget_hierarchy(
child,
indent=indent + 1,
grab_values=grab_values,
prefix=child_prefix,
exclude_internal_widgets=exclude_internal_widgets,
only_bec_widgets=only_bec_widgets,
show_parent=show_parent,
)
@staticmethod
def print_becconnector_hierarchy_from_app():
@@ -468,7 +411,7 @@ class WidgetHierarchy:
from qtpy.QtWidgets import QApplication
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils import BECConnector
from bec_widgets.widgets.plots.plot_base import PlotBase
# 1) Gather ALL QWidget-based BECConnector objects
@@ -487,7 +430,7 @@ class WidgetHierarchy:
# 3) Build a map of (closest BECConnector parent) -> list of children
parent_map = defaultdict(list)
for w in bec_widgets:
parent_bec = WidgetHierarchy.get_becwidget_ancestor(w)
parent_bec = WidgetHierarchy._get_becwidget_ancestor(w)
parent_map[parent_bec].append(w)
# 4) Define a recursive printer to show each object's children
@@ -524,17 +467,12 @@ class WidgetHierarchy:
print_tree(root, prefix=" ")
@staticmethod
def get_becwidget_ancestor(widget):
def _get_becwidget_ancestor(widget):
"""
Traverse up the parent chain to find the nearest BECConnector.
Args:
widget: Starting widget to find the ancestor for.
Returns:
The nearest ancestor that is a BECConnector, or None if not found.
Returns None if none is found.
"""
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils import BECConnector
# Guard against deleted/invalid Qt wrappers
if not shb.isValid(widget):
@@ -636,13 +574,13 @@ class WidgetHierarchy:
Return all BECConnector instances whose closest BECConnector ancestor is the given widget,
including the widget itself if it is a BECConnector.
"""
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils import BECConnector
connectors: list[BECConnector] = []
if isinstance(widget, BECConnector):
connectors.append(widget)
for child in widget.findChildren(BECConnector):
if WidgetHierarchy.get_becwidget_ancestor(child) is widget:
if WidgetHierarchy._get_becwidget_ancestor(child) is widget:
connectors.append(child)
return connectors
@@ -664,7 +602,7 @@ class WidgetHierarchy:
return None
try:
from bec_widgets.utils.bec_connector import BECConnector # local import to avoid cycles
from bec_widgets.utils import BECConnector # local import to avoid cycles
is_bec_target = False
if isinstance(ancestor_class, str):
@@ -673,7 +611,7 @@ class WidgetHierarchy:
is_bec_target = issubclass(ancestor_class, BECConnector)
if is_bec_target:
ancestor = WidgetHierarchy.get_becwidget_ancestor(widget)
ancestor = WidgetHierarchy._get_becwidget_ancestor(widget)
return cast(TAncestor, ancestor)
except Exception as e:
logger.error(f"Error importing BECConnector: {e}")
@@ -41,12 +41,12 @@ class AutoUpdates(BECMainWindow):
parent=self,
object_name="dock_area",
enable_profile_management=False,
startup_profile="skip",
restore_initial_profile=False,
)
self.setCentralWidget(self.dock_area)
self._auto_update_selected_device: str | None = None
self._default_dock = None # type: ignore
self._default_dock = None # type:ignore
self.current_widget: BECWidget | None = None
self.dock_name = None
self._enabled = True
@@ -63,7 +63,7 @@ class AutoUpdates(BECMainWindow):
Disconnect all connections for the auto updates.
"""
self.bec_dispatcher.disconnect_slot(
self._on_scan_status, MessageEndpoints.scan_status() # type: ignore
self._on_scan_status, MessageEndpoints.scan_status() # type:ignore
)
@property
@@ -244,10 +244,10 @@ class AutoUpdates(BECMainWindow):
wf = self.set_dock_to_widget("Waveform")
# Get the scan report devices reported by the scan
dev_x = info.scan_report_devices[0] # type: ignore
dev_x = info.scan_report_devices[0] # type:ignore
# For the y axis, get the selected device
dev_y = self.get_selected_device(info.readout_priority["monitored"]) # type: ignore
dev_y = self.get_selected_device(info.readout_priority["monitored"]) # type:ignore
if not dev_y:
return
@@ -256,8 +256,8 @@ class AutoUpdates(BECMainWindow):
# as the label and title
wf.clear_all()
wf.plot(
device_x=dev_x,
device_y=dev_y,
x_name=dev_x,
y_name=dev_y,
label=f"Scan {info.scan_number} - {dev_y}",
title=f"Scan {info.scan_number}",
x_label=dev_x,
@@ -265,7 +265,7 @@ class AutoUpdates(BECMainWindow):
)
logger.info(
f"Auto Update [simple_line_scan]: Started plot with: device_x={dev_x}, device_y={dev_y}"
f"Auto Update [simple_line_scan]: Started plot with: x_name={dev_x}, y_name={dev_y}"
)
def simple_grid_scan(self, info: ScanStatusMessage) -> None:
@@ -279,8 +279,8 @@ class AutoUpdates(BECMainWindow):
scatter = self.set_dock_to_widget("ScatterWaveform")
# Get the scan report devices reported by the scan
dev_x, dev_y = info.scan_report_devices[0], info.scan_report_devices[1] # type: ignore
dev_z = self.get_selected_device(info.readout_priority["monitored"]) # type: ignore
dev_x, dev_y = info.scan_report_devices[0], info.scan_report_devices[1] # type:ignore
dev_z = self.get_selected_device(info.readout_priority["monitored"]) # type:ignore
if None in (dev_x, dev_y, dev_z):
return
@@ -288,14 +288,11 @@ class AutoUpdates(BECMainWindow):
# Clear the scatter waveform widget and plot the data
scatter.clear_all()
scatter.plot(
device_x=dev_x,
device_y=dev_y,
device_z=dev_z,
label=f"Scan {info.scan_number} - {dev_z}",
x_name=dev_x, y_name=dev_y, z_name=dev_z, label=f"Scan {info.scan_number} - {dev_z}"
)
logger.info(
f"Auto Update [simple_grid_scan]: Started plot with: device_x={dev_x}, device_y={dev_y}, device_z={dev_z}"
f"Auto Update [simple_grid_scan]: Started plot with: x_name={dev_x}, y_name={dev_y}, z_name={dev_z}"
)
def best_effort(self, info: ScanStatusMessage) -> None:
@@ -309,8 +306,8 @@ class AutoUpdates(BECMainWindow):
# If the scan report devices are empty, there is nothing we can do
if not info.scan_report_devices:
return
dev_x = info.scan_report_devices[0] # type: ignore
dev_y = self.get_selected_device(info.readout_priority["monitored"]) # type: ignore
dev_x = info.scan_report_devices[0] # type:ignore
dev_y = self.get_selected_device(info.readout_priority["monitored"]) # type:ignore
if not dev_y:
return
@@ -320,17 +317,15 @@ class AutoUpdates(BECMainWindow):
# Clear the waveform widget and plot the data
wf.clear_all()
wf.plot(
device_x=dev_x,
device_y=dev_y,
x_name=dev_x,
y_name=dev_y,
label=f"Scan {info.scan_number} - {dev_y}",
title=f"Scan {info.scan_number}",
x_label=dev_x,
y_label=dev_y,
)
logger.info(
f"Auto Update [best_effort]: Started plot with: device_x={dev_x}, device_y={dev_y}"
)
logger.info(f"Auto Update [best_effort]: Started plot with: x_name={dev_x}, y_name={dev_y}")
#######################################################################
################# GUI Callbacks #######################################
@@ -13,9 +13,8 @@ from shiboken6 import isValid
import bec_widgets.widgets.containers.qt_ads as QtAds
from bec_widgets import BECWidget, SafeSlot
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.cli.rpc.rpc_widget_handler import widget_handler
from bec_widgets.utils.property_editor import PropertyEditor
from bec_widgets.utils.rpc_widget_handler import widget_handler
from bec_widgets.utils.toolbars.actions import MaterialIconAction
from bec_widgets.widgets.containers.qt_ads import (
CDockAreaWidget,
@@ -113,7 +112,6 @@ class DockAreaWidget(BECWidget, QWidget):
)
self._root_layout.addWidget(self.dock_manager, 1)
self._install_manager_parent_guards()
################################################################################
# Dock Utility Helpers
@@ -256,54 +254,6 @@ class DockAreaWidget(BECWidget, QWidget):
return lambda dock: self._default_close_handler(dock, widget)
def _install_manager_parent_guards(self) -> None:
"""
Track ADS structural changes so drag/drop-created tab areas keep stable parenting.
"""
self.dock_manager.dockAreaCreated.connect(self._normalize_all_dock_parents)
self.dock_manager.dockWidgetAdded.connect(self._normalize_all_dock_parents)
self.dock_manager.stateRestored.connect(self._normalize_all_dock_parents)
self.dock_manager.restoringState.connect(self._normalize_all_dock_parents)
self.dock_manager.focusedDockWidgetChanged.connect(self._normalize_all_dock_parents)
self._normalize_all_dock_parents()
def _iter_all_dock_areas(self) -> list[CDockAreaWidget]:
"""Return all dock areas from all known dock containers."""
areas: list[CDockAreaWidget] = []
for i in range(self.dock_manager.dockAreaCount()):
area = self.dock_manager.dockArea(i)
if area is None or not isValid(area):
continue
areas.append(area)
return areas
def _connect_dock_area_parent_guards(self) -> None:
"""Bind area-level tab/view events to parent normalization."""
for area in self._iter_all_dock_areas():
try:
area.currentChanged.connect(
self._normalize_all_dock_parents, Qt.ConnectionType.UniqueConnection
)
area.viewToggled.connect(
self._normalize_all_dock_parents, Qt.ConnectionType.UniqueConnection
)
except TypeError:
area.currentChanged.connect(self._normalize_all_dock_parents)
area.viewToggled.connect(self._normalize_all_dock_parents)
def _normalize_all_dock_parents(self, *_args) -> None:
"""
Ensure each dock has a stable parent after tab switches, re-docking, or restore.
"""
self._connect_dock_area_parent_guards()
for dock in self.dock_list():
if dock is None or not isValid(dock):
continue
area_widget = dock.dockAreaWidget()
target_parent = area_widget if area_widget is not None else self.dock_manager
if dock.parent() is not target_parent:
dock.setParent(target_parent)
def _make_dock(
self,
widget: QWidget,
@@ -406,7 +356,6 @@ class DockAreaWidget(BECWidget, QWidget):
self._apply_floating_state_to_dock(dock, floating_state)
if resolved_icon is not None:
dock.setIcon(resolved_icon)
self._normalize_all_dock_parents()
return dock
def _delete_dock(self, dock: CDockWidget) -> None:
@@ -1315,7 +1264,7 @@ class DockAreaWidget(BECWidget, QWidget):
or a sequence of button names to hide.
show_settings_action(bool | None): Control whether a dock settings/property action should
be installed. Defaults to ``False`` for the basic dock area; subclasses
such as `BECDockArea` override the default to ``True``.
such as `AdvancedDockArea` override the default to ``True``.
promote_central(bool): When True, promote the created dock to be the dock manager's
central widget (useful for editor stacks or other root content).
dock_icon(QIcon | None): Optional icon applied to the dock via ``CDockWidget.setIcon``.
@@ -1385,40 +1334,37 @@ class DockAreaWidget(BECWidget, QWidget):
dock = self._create_dock_from_spec(spec)
return dock if return_dock else widget
def _iter_all_docks(self) -> list[CDockWidget]:
"""Return all docks, including those hosted in floating containers."""
docks = list(self.dock_manager.dockWidgets())
seen = {id(d) for d in docks}
for container in self.dock_manager.floatingWidgets():
if container is None:
continue
for dock in container.dockWidgets():
if dock is None:
continue
if id(dock) in seen:
continue
docks.append(dock)
seen.add(id(dock))
return docks
def dock_map(self) -> dict[str, CDockWidget]:
"""Return the dock widgets map as dictionary with names as keys."""
return self.dock_manager.dockWidgetsMap()
return {dock.objectName(): dock for dock in self._iter_all_docks() if dock.objectName()}
def dock_list(self) -> list[CDockWidget]:
"""Return the list of dock widgets."""
return list(self.dock_map().values())
return self._iter_all_docks()
def widget_map(self, bec_widgets_only: bool = True) -> dict[str, QWidget]:
"""
Return a dictionary mapping widget names to their corresponding widgets.
def widget_map(self) -> dict[str, QWidget]:
"""Return a dictionary mapping widget names to their corresponding widgets."""
return {dock.objectName(): dock.widget() for dock in self.dock_list()}
Args:
bec_widgets_only(bool): If True, only include widgets that are BECConnector instances.
"""
widgets: dict[str, QWidget] = {}
for dock in self.dock_list():
widget = dock.widget()
if not isinstance(widget, QWidget):
continue
if bec_widgets_only and not isinstance(widget, BECConnector):
continue
widgets[dock.objectName()] = widget
return widgets
def widget_list(self, bec_widgets_only: bool = True) -> list[QWidget]:
"""
Return a list of widgets contained in the dock area.
Args:
bec_widgets_only(bool): If True, only include widgets that are BECConnector instances.
"""
return list(self.widget_map(bec_widgets_only=bec_widgets_only).values())
def widget_list(self) -> list[QWidget]:
"""Return a list of all widgets contained in the dock area."""
return [dock.widget() for dock in self.dock_list() if isinstance(dock.widget(), QWidget)]
@SafeSlot()
def attach_all(self):
@@ -19,11 +19,10 @@ from qtpy.QtWidgets import (
import bec_widgets.widgets.containers.qt_ads as QtAds
from bec_widgets import BECWidget, SafeProperty, SafeSlot
from bec_widgets.applications.views.view import ViewTourSteps
from bec_widgets.utils.bec_dispatcher import BECDispatcher
from bec_widgets.cli.rpc.rpc_widget_handler import widget_handler
from bec_widgets.utils import BECDispatcher
from bec_widgets.utils.colors import apply_theme
from bec_widgets.utils.rpc_decorator import rpc_timeout
from bec_widgets.utils.rpc_widget_handler import widget_handler
from bec_widgets.utils.toolbars.actions import (
ExpandableMenuAction,
MaterialIconAction,
@@ -69,7 +68,7 @@ from bec_widgets.widgets.containers.main_window.main_window import BECMainWindow
from bec_widgets.widgets.containers.qt_ads import CDockWidget
from bec_widgets.widgets.control.device_control.positioner_box import PositionerBox, PositionerBox2D
from bec_widgets.widgets.control.scan_control import ScanControl
from bec_widgets.widgets.editors.bec_console.bec_console import BecConsole, BECShell
from bec_widgets.widgets.editors.web_console.web_console import BECShell, WebConsole
from bec_widgets.widgets.plots.heatmap.heatmap import Heatmap
from bec_widgets.widgets.plots.image.image import Image
from bec_widgets.widgets.plots.motor_map.motor_map import MotorMap
@@ -79,7 +78,7 @@ from bec_widgets.widgets.plots.waveform.waveform import Waveform
from bec_widgets.widgets.progress.ring_progress_bar.ring_progress_bar import RingProgressBar
from bec_widgets.widgets.services.bec_queue.bec_queue import BECQueue
from bec_widgets.widgets.services.bec_status_box.bec_status_box import BECStatusBox
from bec_widgets.widgets.utility.logpanel.logpanel import LogPanel
from bec_widgets.widgets.utility.logpanel import LogPanel
from bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button import DarkModeButton
logger = bec_logger.logger
@@ -87,7 +86,6 @@ logger = bec_logger.logger
_PROFILE_NAMESPACE_UNSET = object()
PROFILE_STATE_KEYS = {key: SETTINGS_KEYS[key] for key in ("geom", "state", "ads_state")}
StartupProfile = Literal["restore", "skip"] | str | None
class BECDockArea(DockAreaWidget):
@@ -125,7 +123,9 @@ class BECDockArea(DockAreaWidget):
instance_id: str | None = None,
auto_save_upon_exit: bool = True,
enable_profile_management: bool = True,
startup_profile: StartupProfile = "restore",
restore_initial_profile: bool = True,
init_profile: str | None = None,
start_empty: bool = False,
**kwargs,
):
self._profile_namespace_hint = profile_namespace
@@ -134,9 +134,14 @@ class BECDockArea(DockAreaWidget):
self._instance_id = slugify.slugify(instance_id, separator="_") if instance_id else None
self._auto_save_upon_exit = auto_save_upon_exit
self._profile_management_enabled = enable_profile_management
self._startup_profile = self._normalize_startup_profile(startup_profile)
self._restore_initial_profile = restore_initial_profile
self._init_profile = init_profile
self._start_empty = start_empty
super().__init__(
parent, default_add_direction=default_add_direction, title="BEC Dock Area", **kwargs
parent,
default_add_direction=default_add_direction,
title="Advanced Dock Area",
**kwargs,
)
# Initialize mode property first (before toolbar setup)
@@ -156,16 +161,14 @@ class BECDockArea(DockAreaWidget):
self._root_layout.insertWidget(0, self.toolbar)
# Populate and hook the workspace combo
self._refresh_workspace_list()
self._current_profile_name = None
self._empty_profile_active = False
self._empty_profile_consumed = False
self._pending_autosave_skip: tuple[str, str] | None = None
self._exit_snapshot_written = False
self._refresh_workspace_list()
# State manager
self.state_manager = WidgetStateManager(
self, serialize_from_root=True, root_id="BECDockArea"
self, serialize_from_root=True, root_id="AdvancedDockArea"
)
# Developer mode state
@@ -173,81 +176,83 @@ class BECDockArea(DockAreaWidget):
# Initialize default editable state based on current lock
self._set_editable(True) # default to editable; will sync toolbar toggle below
if self._ensure_initial_profile():
self._refresh_workspace_list()
# Apply the requested mode after everything is set up
self.mode = mode
self._fetch_initial_profile()
if self._restore_initial_profile:
self._fetch_initial_profile()
@staticmethod
def _normalize_startup_profile(startup_profile: StartupProfile) -> StartupProfile:
def _ensure_initial_profile(self) -> bool:
"""
Normalize startup profile values.
"""
if startup_profile == "":
return None
return startup_profile
Ensure the "general" workspace profile always exists for the current namespace.
The "general" profile is mandatory and will be recreated if deleted.
If list_profile fails due to file permission or corrupted profiles, no action taken.
def _resolve_restore_startup_profile(self) -> str | None:
Returns:
bool: True if a profile was created, False otherwise.
"""
Resolve the profile name when startup profile is set to "restore".
"""
combo = self.toolbar.components.get_action("workspace_combo").widget
namespace = self.profile_namespace
try:
existing_profiles = list_profiles(namespace)
except Exception as exc: # pragma: no cover - defensive guard
logger.warning(f"Unable to enumerate profiles for namespace '{namespace}': {exc}")
return False
instance_id = self._last_profile_instance_id()
if instance_id:
inst_profile = get_last_profile(
namespace=namespace, instance=instance_id, allow_namespace_fallback=False
)
if inst_profile and self._profile_exists(inst_profile, namespace):
return inst_profile
# Always ensure "general" profile exists
name = "general"
if name in existing_profiles:
return False
last = get_last_profile(namespace=namespace)
if last and self._profile_exists(last, namespace):
return last
logger.info(
f"Profile '{name}' not found in namespace '{namespace}'. Creating mandatory '{name}' workspace."
)
combo_text = combo.currentText().strip()
if combo_text and self._profile_exists(combo_text, namespace):
return combo_text
return None
self._write_profile_settings(name, namespace, save_preview=False)
set_quick_select(name, True, namespace=namespace)
set_last_profile(name, namespace=namespace, instance=self._last_profile_instance_id())
return True
def _fetch_initial_profile(self):
startup_profile = self._startup_profile
# Restore last-used profile if available; otherwise fall back to combo selection
combo = self.toolbar.components.get_action("workspace_combo").widget
namespace = self.profile_namespace
init_profile = None
if startup_profile == "skip":
logger.debug("Skipping startup profile initialization.")
return
if startup_profile == "restore":
restored = self._resolve_restore_startup_profile()
if restored:
self._load_initial_profile(restored)
return
self._start_empty_workspace()
return
if startup_profile is None:
self._start_empty_workspace()
return
self._load_initial_profile(startup_profile)
# First priority: use init_profile if explicitly provided
if self._init_profile:
init_profile = self._init_profile
else:
# Try to restore from last used profile
instance_id = self._last_profile_instance_id()
if instance_id:
inst_profile = get_last_profile(
namespace=namespace, instance=instance_id, allow_namespace_fallback=False
)
if inst_profile and self._profile_exists(inst_profile, namespace):
init_profile = inst_profile
if not init_profile:
last = get_last_profile(namespace=namespace)
if last and self._profile_exists(last, namespace):
init_profile = last
else:
text = combo.currentText()
init_profile = text if text else None
if not init_profile:
# Fall back to "general" profile which is guaranteed to exist
if self._profile_exists("general", namespace):
init_profile = "general"
if init_profile:
self._load_initial_profile(init_profile)
def _load_initial_profile(self, name: str) -> None:
"""Load the initial profile."""
self.load_profile(name)
if not self._empty_profile_active:
self._set_workspace_combo_text_silent(name)
def _start_empty_workspace(self) -> None:
"""
Initialize the dock area in transient empty-profile mode.
"""
if (
getattr(self, "_current_profile_name", None) is None
and not self._empty_profile_consumed
):
self.delete_all()
self._enter_empty_profile_state()
self.load_profile(name, start_empty=self._start_empty)
combo = self.toolbar.components.get_action("workspace_combo").widget
combo.blockSignals(True)
combo.setCurrentText(name)
combo.blockSignals(False)
def _customize_dock(self, dock: CDockWidget, widget: QWidget) -> None:
prefs = getattr(dock, "_dock_preferences", {}) or {}
@@ -298,7 +303,7 @@ class BECDockArea(DockAreaWidget):
or a sequence of button names to hide.
show_settings_action(bool | None): Control whether a dock settings/property action should
be installed. Defaults to ``False`` for the basic dock area; subclasses
such as `BECDockArea` override the default to ``True``.
such as `AdvancedDockArea` override the default to ``True``.
promote_central(bool): When True, promote the created dock to be the dock manager's
central widget (useful for editor stacks or other root content).
object_name(str | None): Optional object name to assign to the created widget.
@@ -369,11 +374,10 @@ class BECDockArea(DockAreaWidget):
"Add Circular ProgressBar",
"RingProgressBar",
),
"terminal": (BecConsole.ICON_NAME, "Add Terminal", "BecConsole"),
"terminal": (WebConsole.ICON_NAME, "Add Terminal", "WebConsole"),
"bec_shell": (BECShell.ICON_NAME, "Add BEC Shell", "BECShell"),
"log_panel": (LogPanel.ICON_NAME, "Add LogPanel - Disabled", "LogPanel"),
"sbb_monitor": ("train", "Add SBB Monitor", "SBBMonitor"),
"log_panel": (LogPanel.ICON_NAME, "Add LogPanel", "LogPanel"),
}
# Create expandable menu actions (original behavior)
@@ -485,7 +489,9 @@ class BECDockArea(DockAreaWidget):
# first two items not needed for this part
for key, (_, _, widget_type) in mapping.items():
act = menu.actions[key].action
if key == "terminal":
if widget_type == "LogPanel":
act.setEnabled(False) # keep disabled per issue #644
elif key == "terminal":
act.triggered.connect(
lambda _, t=widget_type: self.new(widget=t, closable=True, startup_cmd=None)
)
@@ -506,7 +512,10 @@ class BECDockArea(DockAreaWidget):
for action_id, (_, _, widget_type) in mapping.items():
flat_action_id = f"flat_{action_id}"
flat_action = self.toolbar.components.get_action(flat_action_id).action
flat_action.triggered.connect(lambda _, t=widget_type: self.new(t))
if widget_type == "LogPanel":
flat_action.setEnabled(False) # keep disabled per issue #644
else:
flat_action.triggered.connect(lambda _, t=widget_type: self.new(t))
_connect_flat_actions(self._ACTION_MAPPINGS["menu_plots"])
_connect_flat_actions(self._ACTION_MAPPINGS["menu_devices"])
@@ -591,6 +600,13 @@ class BECDockArea(DockAreaWidget):
"""Namespace used to scope user/default profile files for this dock area."""
return self._resolve_profile_namespace()
def _active_profile_name_or_default(self) -> str:
name = getattr(self, "_current_profile_name", None)
if not name:
name = "general"
self._current_profile_name = name
return name
def _profile_exists(self, name: str, namespace: str | None) -> bool:
return any(
os.path.exists(path) for path in user_profile_candidates(name, namespace)
@@ -658,34 +674,12 @@ class BECDockArea(DockAreaWidget):
name: The profile name.
namespace: The profile namespace.
"""
self._empty_profile_active = False
self._empty_profile_consumed = True
self._current_profile_name = name
self.profile_changed.emit(name)
set_last_profile(name, namespace=namespace, instance=self._last_profile_instance_id())
combo = self.toolbar.components.get_action("workspace_combo").widget
combo.refresh_profiles(active_profile=name)
def _set_workspace_combo_text_silent(self, text: str) -> None:
combo = self.toolbar.components.get_action("workspace_combo").widget
was_blocked = combo.blockSignals(True)
try:
combo.setCurrentText(text)
finally:
combo.blockSignals(was_blocked)
def _enter_empty_profile_state(self) -> None:
"""
Switch to the transient empty workspace state.
In this mode there is no active profile name, the toolbar shows an
explicit blank profile entry, and no autosave on shutdown is performed.
"""
self._empty_profile_active = True
self._current_profile_name = None
self._pending_autosave_skip = None
self._refresh_workspace_list()
@SafeSlot()
def list_profiles(self) -> list[str]:
"""
@@ -801,6 +795,7 @@ class BECDockArea(DockAreaWidget):
self._pending_autosave_skip = (current_profile, name)
else:
self._pending_autosave_skip = None
workspace_combo.setCurrentText(name)
self._finalize_profile_change(name, namespace)
@SafeSlot()
@@ -818,10 +813,10 @@ class BECDockArea(DockAreaWidget):
"""
self.save_profile(name, show_dialog=True)
@SafeSlot()
@SafeSlot(str)
@SafeSlot(str, bool)
@rpc_timeout(None)
def load_profile(self, name: str | None = None):
def load_profile(self, name: str | None = None, start_empty: bool = False):
"""
Load a workspace profile.
@@ -830,10 +825,8 @@ class BECDockArea(DockAreaWidget):
Args:
name (str | None): The name of the profile to load. If None, prompts the user.
start_empty (bool): If True, load a profile without any widgets. Danger of overwriting the dynamic state of that profile.
"""
if name == "":
return
if not name: # Gui fallback if the name is not provided
name, ok = QInputDialog.getText(
self, "Load Workspace", "Enter the name of the workspace profile to load:"
@@ -865,6 +858,10 @@ class BECDockArea(DockAreaWidget):
# Clear existing docks and remove all widgets
self.delete_all()
if start_empty:
self._finalize_profile_change(name, namespace)
return
# Rebuild widgets and restore states
for item in read_manifest(settings):
obj_name = item["object_name"]
@@ -1010,36 +1007,25 @@ class BECDockArea(DockAreaWidget):
"""
combo = self.toolbar.components.get_action("workspace_combo").widget
active_profile = getattr(self, "_current_profile_name", None)
empty_profile_active = bool(getattr(self, "_empty_profile_active", False))
namespace = self.profile_namespace
if hasattr(combo, "set_quick_profile_provider"):
combo.set_quick_profile_provider(lambda ns=namespace: list_quick_profiles(namespace=ns))
if hasattr(combo, "refresh_profiles"):
if empty_profile_active:
combo.refresh_profiles(active_profile, show_empty_profile=True)
else:
combo.refresh_profiles(active_profile)
combo.refresh_profiles(active_profile)
else:
# Fallback for regular QComboBox
combo.blockSignals(True)
combo.clear()
quick_profiles = list_quick_profiles(namespace=namespace)
items = [""] if empty_profile_active else []
items.extend(quick_profiles)
items = list(quick_profiles)
if active_profile and active_profile not in items:
items.insert(0, active_profile)
combo.addItems(items)
if empty_profile_active:
idx = combo.findText("")
if idx >= 0:
combo.setCurrentIndex(idx)
elif active_profile:
if active_profile:
idx = combo.findText(active_profile)
if idx >= 0:
combo.setCurrentIndex(idx)
if empty_profile_active:
combo.setToolTip("Unsaved empty workspace")
elif active_profile and active_profile not in quick_profiles:
if active_profile and active_profile not in quick_profiles:
combo.setToolTip("Active profile is not in quick select")
else:
combo.setToolTip("")
@@ -1144,16 +1130,7 @@ class BECDockArea(DockAreaWidget):
logger.info("ADS prepare_for_shutdown: skipping (already handled or destroyed)")
return
if getattr(self, "_empty_profile_active", False):
logger.info("ADS prepare_for_shutdown: skipping autosave for unsaved empty workspace")
self._exit_snapshot_written = True
return
name = getattr(self, "_current_profile_name", None)
if not name:
logger.info("ADS prepare_for_shutdown: skipping autosave (no active profile)")
self._exit_snapshot_written = True
return
name = self._active_profile_name_or_default()
namespace = self.profile_namespace
settings = open_user_settings(name, namespace=namespace)
@@ -1161,33 +1138,6 @@ class BECDockArea(DockAreaWidget):
set_last_profile(name, namespace=namespace, instance=self._last_profile_instance_id())
self._exit_snapshot_written = True
def register_tour_steps(self, guided_tour, main_app):
"""Register Dock Area components with the guided tour.
Args:
guided_tour: The GuidedTour instance to register with.
main_app: The main application instance (for accessing set_current).
Returns:
ViewTourSteps | None: Model containing view title and step IDs.
"""
step_ids = []
# Register Dock Area toolbar
def get_dock_toolbar():
main_app.set_current("dock_area")
return (self.toolbar, None)
step_id = guided_tour.register_widget(
widget=get_dock_toolbar,
title="Dock Area Toolbar",
text="Use this toolbar to add widgets, manage workspaces, save and load profiles, and control the layout of your workspace.",
)
step_ids.append(step_id)
return ViewTourSteps(view_title="Dock Area Workspace", step_ids=step_ids)
def cleanup(self):
"""
Cleanup the dock area.
@@ -1,5 +1,5 @@
"""
Utilities for managing BECDockArea profiles stored in INI files.
Utilities for managing AdvancedDockArea profiles stored in INI files.
Policy:
- All created/modified profiles are stored under the BEC settings root: <base_path>/profiles/{default,user}
@@ -36,12 +36,12 @@ ProfileOrigin = Literal["module", "plugin", "settings", "unknown"]
def module_profiles_dir() -> str:
"""
Return the built-in BECDockArea profiles directory bundled with the module.
Return the built-in AdvancedDockArea profiles directory bundled with the module.
Returns:
str: Absolute path of the read-only module profiles directory.
"""
return os.path.join(MODULE_PATH, "containers", "dock_area", "profiles")
return os.path.join(MODULE_PATH, "containers", "advanced_dock_area", "profiles")
@lru_cache(maxsize=1)
@@ -115,12 +115,12 @@ def _settings_profiles_root() -> str:
str: Absolute path to the profiles root. The directory is created if missing.
"""
client = BECClient()
bec_widgets_settings = client._service_config.config.get("widgets_settings")
bec_widgets_settings = client._service_config.config.get("bec_widgets_settings")
bec_widgets_setting_path = (
bec_widgets_settings.get("base_path") if bec_widgets_settings else None
)
default_path = os.path.join(bec_widgets_setting_path, "profiles")
root = os.path.expanduser(os.environ.get("BECWIDGETS_PROFILE_DIR", default_path))
root = os.environ.get("BECWIDGETS_PROFILE_DIR", default_path)
os.makedirs(root, exist_ok=True)
return root
@@ -138,7 +138,7 @@ def _profiles_dir(segment: str, namespace: str | None) -> str:
"""
base = os.path.join(_settings_profiles_root(), segment)
ns = slugify.slugify(namespace, separator="_") if namespace else None
path = os.path.expanduser(os.path.join(base, ns) if ns else base)
path = os.path.join(base, ns) if ns else base
os.makedirs(path, exist_ok=True)
return path
@@ -330,7 +330,7 @@ class WorkSpaceManager(BECWidget, QWidget):
return
self.target_widget.save_profile_dialog()
# BECDockArea will emit profile_changed which will trigger table refresh,
# AdvancedDockArea will emit profile_changed which will trigger table refresh,
# but ensure the UI stays in sync even if the signal is delayed.
self.render_table()
current = getattr(self.target_widget, "_current_profile_name", None)
@@ -24,103 +24,69 @@ class ProfileComboBox(QComboBox):
def set_quick_profile_provider(self, provider: Callable[[], list[str]]) -> None:
self._quick_provider = provider
def refresh_profiles(
self, active_profile: str | None = None, show_empty_profile: bool = False
) -> None:
def refresh_profiles(self, active_profile: str | None = None):
"""
Refresh the profile list and ensure the active profile is visible.
Args:
active_profile(str | None): The currently active profile name.
show_empty_profile(bool): If True, show an explicit empty unsaved workspace entry.
"""
current_text = active_profile or self.currentText()
was_blocked = self.blockSignals(True)
try:
self.clear()
self.blockSignals(True)
self.clear()
quick_profiles = self._quick_provider()
quick_set = set(quick_profiles)
quick_profiles = self._quick_provider()
quick_set = set(quick_profiles)
items: list[str] = []
if show_empty_profile:
items.append("")
items = list(quick_profiles)
if active_profile and active_profile not in quick_set:
items.insert(0, active_profile)
if active_profile and active_profile not in quick_set:
items.append(active_profile)
for profile in items:
self.addItem(profile)
idx = self.count() - 1
for profile in quick_profiles:
if profile not in items:
items.append(profile)
# Reset any custom styling
self.setItemData(idx, None, Qt.ItemDataRole.FontRole)
self.setItemData(idx, None, Qt.ItemDataRole.ToolTipRole)
self.setItemData(idx, None, Qt.ItemDataRole.ForegroundRole)
if active_profile and active_profile not in quick_set:
# keep active profile at the top when not in quick list
items.remove(active_profile)
insert_pos = 1 if show_empty_profile else 0
items.insert(insert_pos, active_profile)
if active_profile and profile == active_profile:
tooltip = "Active workspace profile"
if profile not in quick_set:
font = QFont(self.font())
font.setItalic(True)
font.setBold(True)
self.setItemData(idx, font, Qt.ItemDataRole.FontRole)
self.setItemData(
idx, self.palette().highlight().color(), Qt.ItemDataRole.ForegroundRole
)
tooltip = "Active profile (not in quick select)"
self.setItemData(idx, tooltip, Qt.ItemDataRole.ToolTipRole)
self.setCurrentIndex(idx)
elif profile not in quick_set:
self.setItemData(idx, "Not in quick select", Qt.ItemDataRole.ToolTipRole)
for profile in items:
self.addItem(profile)
idx = self.count() - 1
# Restore selection if possible
index = self.findText(current_text)
if index >= 0:
self.setCurrentIndex(index)
# Reset any custom styling
self.setItemData(idx, None, Qt.ItemDataRole.FontRole)
self.setItemData(idx, None, Qt.ItemDataRole.ToolTipRole)
self.setItemData(idx, None, Qt.ItemDataRole.ForegroundRole)
if profile == "":
self.setItemData(idx, "Unsaved empty workspace", Qt.ItemDataRole.ToolTipRole)
if active_profile is None:
font = QFont(self.font())
font.setItalic(True)
self.setItemData(idx, font, Qt.ItemDataRole.FontRole)
self.setCurrentIndex(idx)
continue
if active_profile and profile == active_profile:
tooltip = "Active workspace profile"
if profile not in quick_set:
font = QFont(self.font())
font.setItalic(True)
font.setBold(True)
self.setItemData(idx, font, Qt.ItemDataRole.FontRole)
self.setItemData(
idx, self.palette().highlight().color(), Qt.ItemDataRole.ForegroundRole
)
tooltip = "Active profile (not in quick select)"
self.setItemData(idx, tooltip, Qt.ItemDataRole.ToolTipRole)
self.setCurrentIndex(idx)
elif profile not in quick_set:
self.setItemData(idx, "Not in quick select", Qt.ItemDataRole.ToolTipRole)
# Restore selection if possible
if show_empty_profile and active_profile is None:
empty_idx = self.findText("")
if empty_idx >= 0:
self.setCurrentIndex(empty_idx)
else:
index = self.findText(current_text)
if index >= 0:
self.setCurrentIndex(index)
if active_profile and self.currentText() != active_profile:
idx = self.findText(active_profile)
if idx >= 0:
self.setCurrentIndex(idx)
if show_empty_profile and self.currentText() == "":
self.setToolTip("Unsaved empty workspace")
elif active_profile and active_profile not in quick_set:
self.setToolTip("Active profile is not in quick select")
else:
self.setToolTip("")
finally:
self.blockSignals(was_blocked)
self.blockSignals(False)
if active_profile and self.currentText() != active_profile:
idx = self.findText(active_profile)
if idx >= 0:
self.setCurrentIndex(idx)
if active_profile and active_profile not in quick_set:
self.setToolTip("Active profile is not in quick select")
else:
self.setToolTip("")
def workspace_bundle(components: ToolbarComponents, enable_tools: bool = True) -> ToolbarBundle:
"""
Creates a workspace toolbar bundle for BECDockArea.
Creates a workspace toolbar bundle for AdvancedDockArea.
Args:
components (ToolbarComponents): The components to be added to the bundle.
@@ -173,7 +139,7 @@ def workspace_bundle(components: ToolbarComponents, enable_tools: bool = True) -
class WorkspaceConnection(BundleConnection):
"""
Connection class for workspace actions in BECDockArea.
Connection class for workspace actions in AdvancedDockArea.
"""
def __init__(self, components: ToolbarComponents, target_widget=None):
@@ -101,12 +101,14 @@ class Explorer(BECWidget, QWidget):
palette = get_theme_palette()
separator_color = palette.mid().color()
self.splitter.setStyleSheet(f"""
self.splitter.setStyleSheet(
f"""
QSplitter::handle {{
height: 0.1px;
background-color: rgba({separator_color.red()}, {separator_color.green()}, {separator_color.blue()}, 60);
}}
""")
"""
)
def _update_spacer(self) -> None:
"""Update the spacer size based on section states"""
@@ -63,7 +63,7 @@ class ScriptTreeWidget(QWidget):
layout.setSpacing(0)
# Create tree view
self.tree = QTreeView(parent=self)
self.tree = QTreeView()
self.tree.setHeaderHidden(True)
self.tree.setRootIsDecorated(True)
@@ -71,12 +71,12 @@ class ScriptTreeWidget(QWidget):
self.tree.setMouseTracking(True)
# Create file system model
self.model = QFileSystemModel(parent=self)
self.model = QFileSystemModel()
self.model.setNameFilters(["*.py"])
self.model.setNameFilterDisables(False)
# Create proxy model to filter out underscore directories
self.proxy_model = QSortFilterProxyModel(parent=self)
self.proxy_model = QSortFilterProxyModel()
self.proxy_model.setFilterRegularExpression(QRegularExpression("^[^_].*"))
self.proxy_model.setSourceModel(self.model)
self.tree.setModel(self.proxy_model)
@@ -22,7 +22,7 @@ from qtpy.QtWidgets import (
)
from typeguard import typechecked
from bec_widgets.utils.rpc_widget_handler import widget_handler
from bec_widgets.cli.rpc.rpc_widget_handler import widget_handler
class LayoutManagerWidget(QWidget):
@@ -1,83 +1,27 @@
import sys
from qtpy import QtGui, QtWidgets
from qtpy.QtCore import QPoint, Qt
from qtpy.QtWidgets import (
QApplication,
QFrame,
QHBoxLayout,
QLabel,
QProgressBar,
QVBoxLayout,
QWidget,
)
from qtpy.QtWidgets import QApplication, QHBoxLayout, QLabel, QProgressBar, QVBoxLayout, QWidget
class WidgetTooltip(QWidget):
"""Frameless, always-on-top window that behaves like a tooltip."""
def __init__(self, content: QWidget) -> None:
super().__init__(
None,
Qt.WindowType.ToolTip
| Qt.WindowType.FramelessWindowHint
| Qt.WindowType.WindowStaysOnTopHint,
)
self.setAttribute(Qt.WidgetAttribute.WA_ShowWithoutActivating)
self.setAttribute(Qt.WidgetAttribute.WA_TranslucentBackground)
super().__init__(None, Qt.ToolTip | Qt.FramelessWindowHint | Qt.WindowStaysOnTopHint)
self.setAttribute(Qt.WA_ShowWithoutActivating)
self.setMouseTracking(True)
self.content = content
layout = QVBoxLayout(self)
layout.setContentsMargins(14, 14, 14, 14)
self._card = QFrame(self)
self._card.setObjectName("WidgetTooltipCard")
card_layout = QVBoxLayout(self._card)
card_layout.setContentsMargins(12, 10, 12, 10)
card_layout.addWidget(self.content)
shadow = QtWidgets.QGraphicsDropShadowEffect(self._card)
shadow.setBlurRadius(18)
shadow.setOffset(0, 2)
shadow.setColor(QtGui.QColor(0, 0, 0, 140))
self._card.setGraphicsEffect(shadow)
layout.addWidget(self._card)
self.apply_theme()
layout.setContentsMargins(6, 6, 6, 6)
layout.addWidget(self.content)
self.adjustSize()
def leaveEvent(self, _event) -> None:
self.hide()
def apply_theme(self) -> None:
palette = QApplication.palette()
base = palette.color(QtGui.QPalette.ColorRole.Base)
text = palette.color(QtGui.QPalette.ColorRole.Text)
border = palette.color(QtGui.QPalette.ColorRole.Mid)
background = QtGui.QColor(base)
background.setAlpha(242)
self._card.setStyleSheet(f"""
QFrame#WidgetTooltipCard {{
background: {background.name(QtGui.QColor.NameFormat.HexArgb)};
border: 1px solid {border.name()};
border-radius: 12px;
}}
QFrame#WidgetTooltipCard QLabel {{
color: {text.name()};
background: transparent;
}}
""")
def show_above(self, global_pos: QPoint, offset: int = 8) -> None:
"""
Show the tooltip above a global position, adjusting to stay within screen bounds.
Args:
global_pos(QPoint): The global position to show above.
offset(int, optional): The vertical offset from the global position. Defaults to 8 pixels.
"""
self.apply_theme()
self.adjustSize()
screen = QApplication.screenAt(global_pos) or QApplication.primaryScreen()
screen_geo = screen.availableGeometry()
@@ -86,43 +30,11 @@ class WidgetTooltip(QWidget):
x = global_pos.x() - geom.width() // 2
y = global_pos.y() - geom.height() - offset
self._navigate_screen_coordinates(screen_geo, geom, x, y)
def show_near(self, global_pos: QPoint, offset: QPoint | None = None) -> None:
"""
Show the tooltip near a global position, adjusting to stay within screen bounds.
By default, it will try to show below and to the right of the position,
but if that would cause it to go off-screen, it will flip to the other side.
Args:
global_pos(QPoint): The global position to show near.
offset(QPoint, optional): The offset from the global position. Defaults to QPoint(12, 16).
"""
self.apply_theme()
self.adjustSize()
offset = offset or QPoint(12, 16)
screen = QApplication.screenAt(global_pos) or QApplication.primaryScreen()
screen_geo = screen.availableGeometry()
geom = self.geometry()
x = global_pos.x() + offset.x()
y = global_pos.y() + offset.y()
if x + geom.width() > screen_geo.right():
x = global_pos.x() - geom.width() - abs(offset.x())
if y + geom.height() > screen_geo.bottom():
y = global_pos.y() - geom.height() - abs(offset.y())
self._navigate_screen_coordinates(screen_geo, geom, x, y)
def _navigate_screen_coordinates(self, screen_geo, geom, x, y):
x = max(screen_geo.left(), min(x, screen_geo.right() - geom.width()))
y = max(screen_geo.top(), min(y, screen_geo.bottom() - geom.height()))
self.move(x, y)
self.show()
self.raise_()
class HoverWidget(QWidget):
@@ -28,7 +28,7 @@ from qtpy.QtCore import QObject, QTimer
from qtpy.QtWidgets import QApplication, QFrame, QMainWindow, QScrollArea, QWidget
from bec_widgets import SafeProperty, SafeSlot
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils import BECConnector
from bec_widgets.utils.colors import apply_theme
from bec_widgets.utils.widget_io import WidgetIO
@@ -134,13 +134,15 @@ class NotificationToast(QFrame):
bg.setAlphaF(0.30)
icon_bg = bg.name(QtGui.QColor.HexArgb)
icon_btn.setFixedSize(40, 40)
icon_btn.setStyleSheet(f"""
icon_btn.setStyleSheet(
f"""
QToolButton {{
background: {icon_bg};
border: none;
border-radius: 20px; /* perfect circle */
}}
""")
"""
)
title_lbl = QtWidgets.QLabel(self._title)
@@ -325,13 +327,15 @@ class NotificationToast(QFrame):
bg = QtGui.QColor(SEVERITY[value.value]["color"])
bg.setAlphaF(0.30)
icon_bg = bg.name(QtGui.QColor.HexArgb)
self._icon_btn.setStyleSheet(f"""
self._icon_btn.setStyleSheet(
f"""
QToolButton {{
background: {icon_bg};
border: none;
border-radius: 20px;
}}
""")
"""
)
self.apply_theme(self._theme)
# keep injected gradient in sync
if getattr(self, "_hg_enabled", False):
@@ -387,7 +391,8 @@ class NotificationToast(QFrame):
card_bg.setAlphaF(0.88)
btn_hover = self._accent_color.name()
self.setStyleSheet(f"""
self.setStyleSheet(
f"""
#NotificationToast {{
background: {card_bg.name(QtGui.QColor.HexArgb)};
border-radius: 12px;
@@ -401,15 +406,18 @@ class NotificationToast(QFrame):
font-size: 14px;
}}
#NotificationToast QPushButton:hover {{ color: {btn_hover}; }}
""")
"""
)
# traceback panel colours
trace_bg = "#1e1e1e" if theme == "dark" else "#f0f0f0"
self.trace_view.setStyleSheet(f"""
self.trace_view.setStyleSheet(
f"""
background:{trace_bg};
color:{palette['body']};
border:none;
border-radius:8px;
""")
"""
)
# icon glyph vs badge background: darker badge, lighter icon in light mode
icon_fg = "#ffffff" if theme == "light" else self._accent_color.name()
@@ -430,13 +438,15 @@ class NotificationToast(QFrame):
else:
badge_bg.setAlphaF(0.30)
icon_bg = badge_bg.name(QtGui.QColor.HexArgb)
self._icon_btn.setStyleSheet(f"""
self._icon_btn.setStyleSheet(
f"""
QToolButton {{
background: {icon_bg};
border: none;
border-radius: 20px;
}}
""")
"""
)
# stronger accent wash in light mode, slightly stronger in dark too
self._accent_alpha = 110 if theme == "light" else 60
@@ -583,7 +593,8 @@ class NotificationCentre(QScrollArea):
self.setWidgetResizable(True)
# transparent background so only the toast cards are visible
self.setAttribute(QtCore.Qt.WA_TranslucentBackground, True)
self.setStyleSheet("""
self.setStyleSheet(
"""
#NotificationCentre { background: transparent; }
#NotificationCentre QScrollBar:vertical {
background: transparent;
@@ -599,7 +610,8 @@ class NotificationCentre(QScrollArea):
#NotificationCentre QScrollBar::sub-line:vertical { height: 0; }
#NotificationCentre QScrollBar::add-page:vertical,
#NotificationCentre QScrollBar::sub-page:vertical { background: transparent; }
""")
"""
)
self.setHorizontalScrollBarPolicy(QtCore.Qt.ScrollBarAlwaysOff)
self.setFrameShape(QtWidgets.QFrame.NoFrame)
self.setFixedWidth(fixed_width)
@@ -946,7 +958,8 @@ class NotificationIndicator(QWidget):
self._group.buttonToggled.connect(self._button_toggled)
# minimalistic look: no frames or backgrounds on the buttons
self.setStyleSheet("""
self.setStyleSheet(
"""
QToolButton {
border: none;
background: transparent;
@@ -957,7 +970,8 @@ class NotificationIndicator(QWidget):
background: rgba(255, 255, 255, 40);
font-weight: 600;
}
""")
"""
)
# initial state: none checked (autodismiss behaviour)
for k in kinds:
@@ -2,7 +2,6 @@ from __future__ import annotations
import os
from bec_lib import bec_logger
from bec_lib.endpoints import MessageEndpoints
from qtpy.QtCore import QEvent, QSize, Qt, QTimer
from qtpy.QtGui import QAction, QActionGroup, QIcon
@@ -18,10 +17,11 @@ from qtpy.QtWidgets import (
)
import bec_widgets
from bec_widgets.utils import UILoader
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.colors import apply_theme
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.ui_loader import UILoader
from bec_widgets.utils.toolbars.status_bar import StatusToolBar
from bec_widgets.widgets.containers.main_window.addons.hover_widget import HoverWidget
from bec_widgets.widgets.containers.main_window.addons.notification_center.notification_banner import (
BECNotificationBroker,
@@ -31,17 +31,12 @@ from bec_widgets.widgets.containers.main_window.addons.notification_center.notif
from bec_widgets.widgets.containers.main_window.addons.scroll_label import ScrollLabel
from bec_widgets.widgets.containers.main_window.addons.web_links import BECWebLinksMixin
from bec_widgets.widgets.progress.scan_progressbar.scan_progressbar import ScanProgressBar
from bec_widgets.widgets.utility.widget_hierarchy_tree.widget_hierarchy_tree import (
WidgetHierarchyDialog,
)
MODULE_PATH = os.path.dirname(bec_widgets.__file__)
# Ensure the application does not use the native menu bar on macOS to be consistent with linux development.
QApplication.setAttribute(Qt.ApplicationAttribute.AA_DontUseNativeMenuBar, True)
logger = bec_logger.logger
class BECMainWindow(BECWidget, QMainWindow):
RPC = True
@@ -54,7 +49,6 @@ class BECMainWindow(BECWidget, QMainWindow):
self.app = QApplication.instance()
self.status_bar = self.statusBar()
self._launcher_window = None
self.setWindowTitle(window_title)
self.setAttribute(Qt.WidgetAttribute.WA_DeleteOnClose, True)
@@ -63,7 +57,6 @@ class BECMainWindow(BECWidget, QMainWindow):
self.notification_broker = BECNotificationBroker(parent=self)
self._nc_margin = 16
self._position_notification_centre()
self._widget_hierarchy_dialog: WidgetHierarchyDialog | None = None
# Init ui
self._init_ui()
@@ -122,14 +115,11 @@ class BECMainWindow(BECWidget, QMainWindow):
Prepare the BEC specific widgets in the status bar.
"""
# Left: AppID label
self._app_id_label = QLabel()
self._app_id_label.setAlignment(
Qt.AlignmentFlag.AlignHCenter | Qt.AlignmentFlag.AlignVCenter
)
self.status_bar.addWidget(self._app_id_label)
# Left: Beamline condition status toolbar (auto-fetches all conditions)
self._status_toolbar = StatusToolBar(parent=self, names=None)
self.status_bar.addWidget(self._status_toolbar)
# Add a separator after the app ID label
# Add a separator after the status toolbar
self._add_separator()
# Centre: Clientinfo label (stretch=1 so it expands)
@@ -196,18 +186,14 @@ class BECMainWindow(BECWidget, QMainWindow):
def _add_scan_progress_bar(self):
# Setting HoverWidget for the scan progress bar - minimal and full version
self._scan_progress_bar_simple = ScanProgressBar(
self, one_line_design=True, rpc_exposed=False, rpc_passthrough_children=False
)
self._scan_progress_bar_simple = ScanProgressBar(self, one_line_design=True)
self._scan_progress_bar_simple.show_elapsed_time = False
self._scan_progress_bar_simple.show_remaining_time = False
self._scan_progress_bar_simple.show_source_label = False
self._scan_progress_bar_simple.progressbar.label_template = ""
self._scan_progress_bar_simple.progressbar.setFixedHeight(self.SCAN_PROGRESS_HEIGHT)
self._scan_progress_bar_simple.progressbar.setFixedWidth(self.SCAN_PROGRESS_WIDTH)
self._scan_progress_bar_full = ScanProgressBar(
self, rpc_exposed=False, rpc_passthrough_children=False
)
self._scan_progress_bar_full = ScanProgressBar(self)
self._scan_progress_hover = HoverWidget(
self, simple=self._scan_progress_bar_simple, full=self._scan_progress_bar_full
)
@@ -265,7 +251,7 @@ class BECMainWindow(BECWidget, QMainWindow):
self.ui = loader.loader(ui_file)
self.setCentralWidget(self.ui)
def fetch_theme(self) -> str:
def _fetch_theme(self) -> str:
return self.app.theme.theme
def _get_launcher_from_qapp(self):
@@ -286,16 +272,6 @@ class BECMainWindow(BECWidget, QMainWindow):
Show the launcher if it exists.
"""
launcher = self._get_launcher_from_qapp()
if launcher is None:
from bec_widgets.applications.launch_window import LaunchWindow
cli_server = getattr(self.bec_dispatcher, "cli_server", None)
if cli_server is None:
logger.warning("Cannot open launcher: CLI server is not available.")
return
launcher = LaunchWindow(gui_id=f"{cli_server.gui_id}:launcher")
launcher.setAttribute(Qt.WA_ShowWithoutActivating) # type: ignore[arg-type]
self._launcher_window = launcher
if launcher:
launcher.show()
launcher.activateWindow()
@@ -333,11 +309,6 @@ class BECMainWindow(BECWidget, QMainWindow):
light_theme_action.triggered.connect(lambda: self.change_theme("light"))
dark_theme_action.triggered.connect(lambda: self.change_theme("dark"))
theme_menu.addSeparator()
widget_tree_action = QAction("Show Widget Hierarchy", self)
widget_tree_action.triggered.connect(self._show_widget_hierarchy_dialog)
theme_menu.addAction(widget_tree_action)
# Set the default theme
if hasattr(self.app, "theme") and self.app.theme:
theme_name = self.app.theme.theme.lower()
@@ -367,13 +338,27 @@ class BECMainWindow(BECWidget, QMainWindow):
help_menu.addAction(bec_docs)
help_menu.addAction(widgets_docs)
help_menu.addAction(bug_report)
help_menu.addSeparator()
self._app_id_action = QAction(self)
self._app_id_action.triggered.connect(self._copy_app_id_to_clipboard)
help_menu.addAction(self._app_id_action)
def _copy_app_id_to_clipboard(self):
"""
Copy the app ID to the clipboard.
"""
if self.bec_dispatcher.cli_server is not None:
server_id = self.bec_dispatcher.cli_server.gui_id
clipboard = QApplication.clipboard()
clipboard.setText(server_id)
################################################################################
# Status Bar Addons
################################################################################
def display_app_id(self):
"""
Display the app ID in the status bar.
Display the app ID in the Help menu.
"""
if self.bec_dispatcher.cli_server is None:
status_message = "Not connected"
@@ -381,7 +366,8 @@ class BECMainWindow(BECWidget, QMainWindow):
# Get the server ID from the dispatcher
server_id = self.bec_dispatcher.cli_server.gui_id
status_message = f"App ID: {server_id}"
self._app_id_label.setText(status_message)
if hasattr(self, "_app_id_action"):
self._app_id_action.setText(status_message)
@SafeSlot(dict, dict)
def display_client_message(self, msg: dict, meta: dict):
@@ -421,23 +407,7 @@ class BECMainWindow(BECWidget, QMainWindow):
return True
return super().event(event)
def _show_widget_hierarchy_dialog(self):
if self._widget_hierarchy_dialog is None:
dialog = WidgetHierarchyDialog(root_widget=None, parent=self)
dialog.setAttribute(Qt.WidgetAttribute.WA_DeleteOnClose, True)
dialog.destroyed.connect(lambda: setattr(self, "_widget_hierarchy_dialog", None))
self._widget_hierarchy_dialog = dialog
self._widget_hierarchy_dialog.refresh()
self._widget_hierarchy_dialog.show()
self._widget_hierarchy_dialog.raise_()
self._widget_hierarchy_dialog.activateWindow()
def cleanup(self):
# Widget hierarchy dialog cleanup
if self._widget_hierarchy_dialog is not None:
self._widget_hierarchy_dialog.close()
self._widget_hierarchy_dialog = None
# Timer cleanup
if hasattr(self, "_client_info_expire_timer") and self._client_info_expire_timer.isActive():
self._client_info_expire_timer.stop()
@@ -0,0 +1,3 @@
from .positioner_box_base import PositionerBoxBase
__ALL__ = ["PositionerBoxBase"]
@@ -14,10 +14,10 @@ from qtpy.QtWidgets import (
QLineEdit,
QPushButton,
QVBoxLayout,
QWidget,
)
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.compact_popup import CompactPopupWidget
from bec_widgets.widgets.control.device_control.position_indicator.position_indicator import (
PositionIndicator,
)
@@ -43,7 +43,7 @@ class DeviceUpdateUIComponents(TypedDict):
units: QLabel
class PositionerBoxBase(BECWidget, QWidget):
class PositionerBoxBase(BECWidget, CompactPopupWidget):
"""Contains some core logic for positioner box widgets"""
current_path = ""
@@ -57,10 +57,7 @@ class PositionerBoxBase(BECWidget, QWidget):
parent: The parent widget.
device (Positioner): The device to control.
"""
super().__init__(parent=parent, **kwargs)
self.main_layout = QVBoxLayout(self)
self.main_layout.setContentsMargins(0, 0, 0, 0)
self.main_layout.setSpacing(0)
super().__init__(parent=parent, layout=QVBoxLayout, **kwargs)
self._dialog = None
self.get_bec_shortcuts()
@@ -126,7 +123,7 @@ class PositionerBoxBase(BECWidget, QWidget):
queue="emergency",
metadata={"RID": request_id, "response": False},
)
self.client.connector.send(MessageEndpoints.scan_queue_request(self.client.username), msg)
self.client.connector.send(MessageEndpoints.scan_queue_request(), msg)
# pylint: disable=unused-argument
def _on_device_readback(
@@ -176,9 +173,11 @@ class PositionerBoxBase(BECWidget, QWidget):
if is_moving:
spinner.start()
spinner.setToolTip("Device is moving")
self.set_global_state("warning")
else:
spinner.stop()
spinner.setToolTip("Device is idle")
self.set_global_state("success")
else:
spinner.setVisible(False)
@@ -197,8 +196,9 @@ class PositionerBoxBase(BECWidget, QWidget):
pos = (readback_val - limits[0]) / (limits[1] - limits[0])
position_indicator.set_value(pos)
@staticmethod
def _update_limits_ui(limits: tuple[float, float], position_indicator, setpoint_validator):
def _update_limits_ui(
self, limits: tuple[float, float], position_indicator, setpoint_validator
):
if limits is not None and limits[0] != limits[1]:
position_indicator.setToolTip(f"Min: {limits[0]}, Max: {limits[1]}")
setpoint_validator.setRange(limits[0], limits[1])
@@ -223,9 +223,8 @@ class PositionerBoxBase(BECWidget, QWidget):
self.bec_dispatcher.disconnect_slot(slot, MessageEndpoints.device_readback(old_device))
self.bec_dispatcher.connect_slot(slot, MessageEndpoints.device_readback(new_device))
@staticmethod
def _toggle_enable_buttons(ui: DeviceUpdateUIComponents, enable: bool) -> None:
"""Toggle enable/disable on available buttons
def _toggle_enable_buttons(self, ui: DeviceUpdateUIComponents, enable: bool) -> None:
"""Toogle enable/disable on available buttons
Args:
enable (bool): Enable buttons
@@ -11,12 +11,12 @@ from qtpy.QtCore import Qt, Signal
from qtpy.QtGui import QDoubleValidator
from qtpy.QtWidgets import QDoubleSpinBox
from bec_widgets.utils import UILoader
from bec_widgets.utils.colors import apply_theme, get_accent_colors
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
from bec_widgets.utils.ui_loader import UILoader
from bec_widgets.widgets.control.device_control.positioner_box.positioner_box_base import (
from bec_widgets.widgets.control.device_control.positioner_box._base import PositionerBoxBase
from bec_widgets.widgets.control.device_control.positioner_box._base.positioner_box_base import (
DeviceUpdateUIComponents,
PositionerBoxBase,
)
logger = bec_logger.logger
@@ -63,10 +63,10 @@ class PositionerBox(PositionerBoxBase):
self.ui = UILoader(self).loader(os.path.join(self.current_path, self.ui_file))
self.main_layout.addWidget(self.ui)
self.main_layout.setSpacing(0)
self.main_layout.setContentsMargins(0, 0, 0, 0)
self.main_layout.setAlignment(Qt.AlignmentFlag.AlignVCenter | Qt.AlignmentFlag.AlignHCenter)
self.addWidget(self.ui)
self.layout.setSpacing(0)
self.layout.setContentsMargins(0, 0, 0, 0)
self.layout.setAlignment(Qt.AlignmentFlag.AlignVCenter | Qt.AlignmentFlag.AlignHCenter)
ui_min_size = self.ui.minimumSize()
ui_min_hint = self.ui.minimumSizeHint()
self.setMinimumSize(
@@ -115,6 +115,8 @@ class PositionerBox(PositionerBoxBase):
return
old_device = self._device
self._device = value
if not self.label:
self.label = value
self.device_changed.emit(old_device, value)
@SafeProperty(bool)
@@ -12,12 +12,12 @@ from qtpy.QtCore import Signal
from qtpy.QtGui import QDoubleValidator
from qtpy.QtWidgets import QDoubleSpinBox
from bec_widgets.utils import UILoader
from bec_widgets.utils.colors import apply_theme
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
from bec_widgets.utils.ui_loader import UILoader
from bec_widgets.widgets.control.device_control.positioner_box.positioner_box_base import (
from bec_widgets.widgets.control.device_control.positioner_box._base import PositionerBoxBase
from bec_widgets.widgets.control.device_control.positioner_box._base.positioner_box_base import (
DeviceUpdateUIComponents,
PositionerBoxBase,
)
logger = bec_logger.logger
@@ -96,9 +96,9 @@ class PositionerBox2D(PositionerBoxBase):
def connect_ui(self):
"""Connect the UI components to signals, data, or routines"""
self.main_layout.addWidget(self.ui)
self.main_layout.setSpacing(0)
self.main_layout.setContentsMargins(0, 0, 0, 0)
self.addWidget(self.ui)
self.layout.setSpacing(0)
self.layout.setContentsMargins(0, 0, 0, 0)
def _init_ui(val: QDoubleValidator, device_id: DeviceId):
ui = self._device_ui_components_hv(device_id)
@@ -200,6 +200,7 @@ class PositionerBox2D(PositionerBoxBase):
return
old_device = self._device_hor
self._device_hor = value
self.label = f"{self._device_hor}, {self._device_ver}"
self.device_changed_hor.emit(old_device, value)
self._init_device(self.device_hor, self.position_update_hor.emit, self.update_limits_hor)
@@ -219,6 +220,7 @@ class PositionerBox2D(PositionerBoxBase):
return
old_device = self._device_ver
self._device_ver = value
self.label = f"{self._device_hor}, {self._device_ver}"
self.device_changed_ver.emit(old_device, value)
self._init_device(self.device_ver, self.position_update_ver.emit, self.update_limits_ver)
@@ -1,8 +1,6 @@
import os
from bec_lib.device import Positioner
from qtpy.QtCore import Qt
from qtpy.QtWidgets import QSizePolicy
from bec_widgets.widgets.control.device_control.positioner_box import PositionerBox
@@ -24,82 +22,7 @@ class PositionerControlLine(PositionerBox):
device (Positioner): The device to control.
"""
self.current_path = os.path.dirname(__file__)
self._indicator_switch_width = 0
self._horizontal_indicator_width = 0
self._vertical_indicator_width = 15
self._indicator_thickness = 10
self._indicator_is_horizontal = False
self._line_height = self.dimensions[0]
super().__init__(parent=parent, device=device, *args, **kwargs)
self._configure_line_layout()
self._update_indicator_orientation()
def _configure_line_layout(self):
device_box = self.ui.device_box
indicator = self.ui.position_indicator
self.main_layout.setAlignment(Qt.AlignmentFlag(0))
self.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Fixed)
self.ui.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Fixed)
device_box.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Fixed)
self._line_height = max(
self.dimensions[0],
self.ui.minimumSizeHint().height(),
self.ui.sizeHint().height(),
device_box.minimumSizeHint().height(),
device_box.sizeHint().height(),
)
device_box.setFixedHeight(self._line_height)
device_box.setMinimumWidth(self.dimensions[1])
device_box.setMaximumWidth(16777215)
self.setFixedHeight(self._line_height)
self.setMinimumWidth(self.dimensions[1])
self.ui.verticalLayout.setContentsMargins(0, 0, 0, 0)
self.ui.verticalLayout.setSpacing(0)
self.ui.readback.setMaximumWidth(16777215)
self.ui.setpoint.setMaximumWidth(16777215)
self.ui.step_size.setMaximumWidth(16777215)
indicator_hint = indicator.minimumSizeHint()
step_hint = self.ui.step_size.sizeHint()
self._indicator_thickness = max(indicator_hint.height(), 10)
self._vertical_indicator_width = max(indicator.minimumWidth(), 15)
self._horizontal_indicator_width = max(90, step_hint.width())
base_width = max(device_box.minimumSizeHint().width(), self.dimensions[1])
self._indicator_switch_width = (
base_width - self._vertical_indicator_width + self._horizontal_indicator_width
)
def resizeEvent(self, event):
super().resizeEvent(event)
self._update_indicator_orientation()
def _update_indicator_orientation(self):
if not hasattr(self, "ui"):
return
indicator = self.ui.position_indicator
available_width = self.ui.device_box.width() or self.width() or self.dimensions[1]
should_use_horizontal = available_width >= self._indicator_switch_width
if should_use_horizontal == self._indicator_is_horizontal:
return
self._indicator_is_horizontal = should_use_horizontal
indicator.vertical = not should_use_horizontal
if should_use_horizontal:
indicator.setMinimumSize(self._horizontal_indicator_width, self._indicator_thickness)
indicator.setMaximumHeight(self._indicator_thickness)
indicator.setMaximumWidth(16777215)
indicator.setSizePolicy(QSizePolicy.Policy.MinimumExpanding, QSizePolicy.Policy.Fixed)
else:
indicator.setMinimumSize(self._vertical_indicator_width, self._indicator_thickness)
indicator.setMaximumSize(self._vertical_indicator_width, 16777215)
indicator.setSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Preferred)
indicator.updateGeometry()
if __name__ == "__main__": # pragma: no cover
@@ -2,18 +2,12 @@
<ui version="4.0">
<class>Form</class>
<widget class="QWidget" name="Form">
<property name="sizePolicy">
<sizepolicy hsizetype="Expanding" vsizetype="Fixed">
<horstretch>0</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>592</width>
<height>76</height>
<width>612</width>
<height>91</height>
</rect>
</property>
<property name="minimumSize">
@@ -32,29 +26,8 @@
<string>Form</string>
</property>
<layout class="QVBoxLayout" name="verticalLayout">
<property name="spacing">
<number>0</number>
</property>
<property name="leftMargin">
<number>0</number>
</property>
<property name="topMargin">
<number>0</number>
</property>
<property name="rightMargin">
<number>0</number>
</property>
<property name="bottomMargin">
<number>0</number>
</property>
<item>
<widget class="QGroupBox" name="device_box">
<property name="sizePolicy">
<sizepolicy hsizetype="Expanding" vsizetype="Fixed">
<horstretch>0</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<property name="title">
<string>Device Name</string>
</property>
@@ -254,12 +227,12 @@
<customwidgets>
<customwidget>
<class>PositionIndicator</class>
<extends></extends>
<extends>QWidget</extends>
<header>position_indicator</header>
</customwidget>
<customwidget>
<class>SpinnerWidget</class>
<extends></extends>
<extends>QWidget</extends>
<header>spinner_widget</header>
</customwidget>
</customwidgets>
@@ -27,13 +27,30 @@ class PositionerGroupBox(QGroupBox):
self.layout().setContentsMargins(0, 0, 0, 0)
self.layout().setSpacing(0)
self.widget = PositionerBox(self, dev_name)
self.widget.compact_view = True
self.widget.expand_popup = False
self.layout().addWidget(self.widget)
self.widget.position_update.connect(self._on_position_update)
self.widget.expand.connect(self._on_expand)
self.setTitle(self.device_name)
self.widget.force_update_readback()
def _on_expand(self, expand):
if expand:
self.setTitle("")
self.setFlat(True)
else:
self.setTitle(self.device_name)
self.setFlat(False)
def _on_position_update(self, pos: float):
self.position_update.emit(pos)
precision = getattr(self.widget.dev[self.widget.device], "precision", 8)
try:
precision = int(precision)
except (TypeError, ValueError):
precision = int(8)
self.widget.label = f"{pos:.{precision}f}"
def close(self):
self.widget.close()
@@ -7,7 +7,7 @@ from bec_lib.device import Signal as BECSignal
from bec_lib.logger import bec_logger
from pydantic import field_validator
from bec_widgets.utils.bec_connector import ConnectionConfig
from bec_widgets.utils import ConnectionConfig
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
from bec_widgets.utils.filter_io import FilterIO
@@ -3,7 +3,7 @@ from bec_lib.device import Signal
from bec_lib.logger import bec_logger
from qtpy.QtCore import Property
from bec_widgets.utils.bec_connector import ConnectionConfig
from bec_widgets.utils import ConnectionConfig
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.filter_io import FilterIO
@@ -266,7 +266,6 @@ class DeviceSignalInputBase(BECWidget):
Args:
device(str): Device to validate.
raise_on_false(bool): Raise ValueError if device is not found.
"""
if device in self.dev:
return True
@@ -1,7 +1,7 @@
import json
from typing import Any, Callable, Generator, Iterable, TypeVar
from bec_lib.utils.json_extended import ExtendedEncoder
from bec_lib.utils.json import ExtendedEncoder
from qtpy.QtCore import QByteArray, QMimeData, QObject, Signal # type: ignore
from qtpy.QtWidgets import QListWidgetItem
@@ -51,7 +51,8 @@ class _DeviceEntryWidget(QFrame):
self.setToolTip(self._rich_text())
def _rich_text(self):
return dedent(f"""
return dedent(
f"""
<b><u><h2> {self._device_spec.name}: </h2></u></b>
<table>
<tr><td> description: </td><td><i> {self._device_spec.description} </i></td></tr>
@@ -59,7 +60,8 @@ class _DeviceEntryWidget(QFrame):
<tr><td> enabled: </td><td><i> {self._device_spec.enabled} </i></td></tr>
<tr><td> read only: </td><td><i> {self._device_spec.readOnly} </i></td></tr>
</table>
""")
"""
)
def setup_title_layout(self, device_spec: HashableDevice):
self._title_layout = QHBoxLayout()
@@ -13,7 +13,7 @@ if TYPE_CHECKING:
from .available_device_group import AvailableDeviceGroup
class _DeviceListWidget(QListWidget):
class _DeviceListWiget(QListWidget):
def _item_iter(self):
return (self.item(i) for i in range(self.count()))
@@ -44,7 +44,7 @@ class Ui_AvailableDeviceGroup(object):
self.n_included.setObjectName("n_included")
title_layout.addWidget(self.n_included)
self.device_list = _DeviceListWidget(AvailableDeviceGroup)
self.device_list = _DeviceListWiget(AvailableDeviceGroup)
self.device_list.setSelectionMode(QListWidget.SelectionMode.ExtendedSelection)
self.device_list.setObjectName("device_list")
self.device_list.setFrameStyle(0)
@@ -34,13 +34,13 @@ class HashModel(str, Enum):
class DeviceResourceBackend(Protocol):
@property
def tag_groups(self) -> dict[str, set[HashableDevice]]:
"""A dictionary of all available devices separated by tag groups. The same device may
"""A dictionary of all availble devices separated by tag groups. The same device may
appear more than once (in different groups)."""
...
@property
def all_devices(self) -> set[HashableDevice]:
"""A set of all available devices. The same device may not appear more than once."""
"""A set of all availble devices. The same device may not appear more than once."""
...
@property
@@ -5,8 +5,9 @@ in DeviceTableRow entries.
from __future__ import annotations
import traceback
from copy import deepcopy
from typing import TYPE_CHECKING, Any, Callable, Iterable, Tuple
from typing import TYPE_CHECKING, Any, Callable, Iterable, Literal, Tuple
from bec_lib.atlas_models import Device as DeviceModel
from bec_lib.callback_handler import EventType
@@ -18,7 +19,6 @@ from thefuzz import fuzz
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.colors import get_accent_colors
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.fuzzy_search import is_match
from bec_widgets.widgets.control.device_manager.components.device_table.device_table_row import (
DeviceTableRow,
)
@@ -37,6 +37,34 @@ _DeviceCfgIter = Iterable[dict[str, Any]]
# DeviceValidationResult: device_config, config_status, connection_status, error_message
_ValidationResultIter = Iterable[Tuple[dict[str, Any], ConfigStatus, ConnectionStatus, str]]
FUZZY_SEARCH_THRESHOLD = 80
def is_match(
text: str, row_data: dict[str, Any], relevant_keys: list[str], enable_fuzzy: bool
) -> bool:
"""
Check if the text matches any of the relevant keys in the row data.
Args:
text (str): The text to search for.
row_data (dict[str, Any]): The row data to search in.
relevant_keys (list[str]): The keys to consider for searching.
enable_fuzzy (bool): Whether to use fuzzy matching.
Returns:
bool: True if a match is found, False otherwise.
"""
for key in relevant_keys:
data = str(row_data.get(key, "") or "")
if enable_fuzzy:
match_ratio = fuzz.partial_ratio(text.lower(), data.lower())
if match_ratio >= FUZZY_SEARCH_THRESHOLD:
return True
else:
if text.lower() in data.lower():
return True
return False
class TableSortOnHold:
"""Context manager for putting table sorting on hold. Works with nested calls."""
@@ -9,7 +9,6 @@ from qtpy.QtGui import QColor
from qtpy.QtWidgets import (
QApplication,
QComboBox,
QGroupBox,
QHBoxLayout,
QLabel,
QPushButton,
@@ -19,7 +18,7 @@ from qtpy.QtWidgets import (
QWidget,
)
from bec_widgets.utils.bec_connector import ConnectionConfig
from bec_widgets.utils import ConnectionConfig
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.colors import apply_theme, get_accent_colors
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
@@ -172,13 +171,7 @@ class ScanControl(BECWidget, QWidget):
self.layout.addStretch()
def _add_metadata_form(self):
# Wrap metadata form in a group box
self._metadata_group = QGroupBox("Scan Metadata", self)
self._metadata_group.setSizePolicy(QSizePolicy.Policy.Preferred, QSizePolicy.Policy.Fixed)
metadata_layout = QVBoxLayout(self._metadata_group)
metadata_layout.addWidget(self._metadata_form)
self.layout.addWidget(self._metadata_group)
self.layout.addWidget(self._metadata_form)
self._metadata_form.update_with_new_scan(self.comboBox_scan_selection.currentText())
self.scan_selected.connect(self._metadata_form.update_with_new_scan)
self._metadata_form.form_data_updated.connect(self.update_scan_metadata)
@@ -347,14 +347,14 @@ class ScanGroupBox(QGroupBox):
def get_parameters(self, device_object: bool = True):
"""
Returns the parameters from the widgets in the scan control layout formatted to run scan from BEC.
Returns the parameters from the widgets in the scan control layout formated to run scan from BEC.
"""
if self.box_type == "args":
return self._get_arg_parameters(device_object=device_object)
return self._get_arg_parameterts(device_object=device_object)
elif self.box_type == "kwargs":
return self._get_kwarg_parameters(device_object=device_object)
def _get_arg_parameters(self, device_object: bool = True):
def _get_arg_parameterts(self, device_object: bool = True):
args = []
for i in range(1, self.layout.rowCount()):
for j in range(self.layout.columnCount()):
@@ -2,19 +2,22 @@
from bec_lib.logger import bec_logger
from qtpy.QtCore import Property, Signal, Slot
from qtpy.QtWidgets import QComboBox
from qtpy.QtWidgets import QComboBox, QVBoxLayout, QWidget
from bec_widgets.utils.bec_widget import BECWidget
logger = bec_logger.logger
class DapComboBox(BECWidget, QComboBox):
class DapComboBox(BECWidget, QWidget):
"""
Editable combobox listing the available DAP models.
The DAPComboBox widget is an extension to the QComboBox with all avaialble DAP model from BEC.
The widget behaves as a plain QComboBox and keeps ``fit_model_combobox`` as an alias to itself
for backwards compatibility with older call sites.
Args:
parent: Parent widget.
client: BEC client object.
gui_id: GUI ID.
default: Default device name.
"""
ICON_NAME = "data_exploration"
@@ -42,20 +45,19 @@ class DapComboBox(BECWidget, QComboBox):
**kwargs,
):
super().__init__(parent=parent, client=client, gui_id=gui_id, **kwargs)
self.fit_model_combobox = self # Just for backwards compatibility with older call sites, the widget itself is the combobox
self._available_models: list[str] = []
self.layout = QVBoxLayout(self)
self.fit_model_combobox = QComboBox(self)
self.layout.addWidget(self.fit_model_combobox)
self.layout.setContentsMargins(0, 0, 0, 0)
self._available_models = None
self._x_axis = None
self._y_axis = None
self._is_valid_input = False
self.setEditable(True)
self.populate_fit_model_combobox()
self.currentTextChanged.connect(self._on_text_changed)
self.fit_model_combobox.currentTextChanged.connect(self._update_current_fit)
# Set default fit model
self.select_default_fit(default_fit)
self.check_validity(self.currentText())
def select_default_fit(self, default_fit: str | None = "GaussianModel"):
def select_default_fit(self, default_fit: str | None):
"""Set the default fit model.
Args:
@@ -63,8 +65,8 @@ class DapComboBox(BECWidget, QComboBox):
"""
if self._validate_dap_model(default_fit):
self.select_fit_model(default_fit)
elif self.available_models:
self.select_fit_model(self.available_models[0])
else:
self.select_fit_model("GaussianModel")
@property
def available_models(self):
@@ -112,40 +114,12 @@ class DapComboBox(BECWidget, QComboBox):
self._y_axis = y_axis
self.y_axis_updated.emit(y_axis)
@Slot(str)
def _on_text_changed(self, fit_name: str):
"""
Validate and emit updates for the current text.
Args:
fit_name(str): The current text in the combobox, representing the selected fit model.
"""
self.check_validity(fit_name)
if not self._is_valid_input:
return
def _update_current_fit(self, fit_name: str):
"""Update the current fit."""
self.fit_model_updated.emit(fit_name)
if self.x_axis is not None and self.y_axis is not None:
self.new_dap_config.emit(self._x_axis, self._y_axis, fit_name)
@Slot(str)
def check_validity(self, fit_name: str):
"""
Highlight invalid manual entries similarly to DeviceComboBox.
Args:
fit_name(str): The current text in the combobox, representing the selected fit model.
"""
if self._validate_dap_model(fit_name):
self._is_valid_input = True
self.setStyleSheet("border: 1px solid transparent;")
else:
self._is_valid_input = False
if self.isEnabled():
self.setStyleSheet("border: 1px solid red;")
else:
self.setStyleSheet("border: 1px solid transparent;")
@Slot(str)
def select_x_axis(self, x_axis: str):
"""Slot to update the x axis.
@@ -154,7 +128,7 @@ class DapComboBox(BECWidget, QComboBox):
x_axis(str): X axis.
"""
self.x_axis = x_axis
self._on_text_changed(self.currentText())
self._update_current_fit(self.fit_model_combobox.currentText())
@Slot(str)
def select_y_axis(self, y_axis: str):
@@ -164,26 +138,25 @@ class DapComboBox(BECWidget, QComboBox):
y_axis(str): Y axis.
"""
self.y_axis = y_axis
self._on_text_changed(self.currentText())
self._update_current_fit(self.fit_model_combobox.currentText())
@Slot(str)
def select_fit_model(self, fit_name: str | None):
"""Slot to update the fit model.
Args:
fit_name(str): Fit model name.
default_device(str): Default device name.
"""
if not self._validate_dap_model(fit_name):
raise ValueError(f"Fit {fit_name} is not valid.")
self.setCurrentText(fit_name)
self.fit_model_combobox.setCurrentText(fit_name)
def populate_fit_model_combobox(self):
"""Populate the fit_model_combobox with the devices."""
# pylint: disable=protected-access
available_plugins = getattr(getattr(self.client, "dap", None), "_available_dap_plugins", {})
self.available_models = [model for model in available_plugins.keys()]
self.clear()
self.addItems(self.available_models)
self.available_models = [model for model in self.client.dap._available_dap_plugins.keys()]
self.fit_model_combobox.clear()
self.fit_model_combobox.addItems(self.available_models)
def _validate_dap_model(self, model: str | None) -> bool:
"""Validate the DAP model.
@@ -193,23 +166,23 @@ class DapComboBox(BECWidget, QComboBox):
"""
if model is None:
return False
return model in self.available_models
@property
def is_valid_input(self) -> bool:
"""Whether the current text matches an available DAP model."""
return self._is_valid_input
if model not in self.available_models:
return False
return True
if __name__ == "__main__": # pragma: no cover
import sys
# pylint: disable=import-outside-toplevel
from qtpy.QtWidgets import QApplication
from bec_widgets.utils.colors import apply_theme
app = QApplication(sys.argv)
app = QApplication([])
apply_theme("dark")
dialog = DapComboBox()
dialog.show()
sys.exit(app.exec_())
widget = QWidget()
widget.setFixedSize(200, 200)
layout = QVBoxLayout()
widget.setLayout(layout)
layout.addWidget(DapComboBox())
widget.show()
app.exec_()
@@ -1,14 +1,13 @@
import os
import shiboken6
from bec_lib.logger import bec_logger
from qtpy.QtCore import Signal
from qtpy.QtWidgets import QPushButton, QSizePolicy, QTreeWidgetItem, QVBoxLayout, QWidget
from qtpy.QtWidgets import QPushButton, QTreeWidgetItem, QVBoxLayout, QWidget
from bec_widgets.utils import UILoader
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.colors import get_accent_colors
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
from bec_widgets.utils.ui_loader import UILoader
logger = bec_logger.logger
@@ -35,7 +34,7 @@ class LMFitDialog(BECWidget, QWidget):
**kwargs,
):
"""
Initializes the LMFitDialog widget.
Initialises the LMFitDialog widget.
Args:
parent (QWidget): The parent widget.
@@ -69,27 +68,6 @@ class LMFitDialog(BECWidget, QWidget):
self._hide_curve_selection = False
self._hide_summary = False
self._hide_parameters = False
self._configure_embedded_size_policy()
def _configure_embedded_size_policy(self):
"""Allow the compact dialog to shrink more gracefully in embedded layouts."""
if self._ui_file != "lmfit_dialog_compact.ui":
return
self.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Preferred)
self.ui.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Preferred)
for group in (
self.ui.group_curve_selection,
self.ui.group_summary,
self.ui.group_parameters,
):
group.setMinimumHeight(0)
group.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Preferred)
for view in (self.ui.curve_list, self.ui.summary_tree, self.ui.param_tree):
view.setMinimumHeight(0)
view.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Ignored)
@property
def enable_actions(self) -> bool:
@@ -99,14 +77,8 @@ class LMFitDialog(BECWidget, QWidget):
@enable_actions.setter
def enable_actions(self, enable: bool):
self._enable_actions = enable
valid_buttons = {}
for name, button in self.action_buttons.items():
# just to be sure we have a valid c++ object
if button is None or not shiboken6.isValid(button):
continue
for button in self.action_buttons.values():
button.setEnabled(enable)
valid_buttons[name] = button
self.action_buttons = valid_buttons
@SafeProperty(list)
def active_action_list(self) -> list[str]:
@@ -117,6 +89,16 @@ class LMFitDialog(BECWidget, QWidget):
def active_action_list(self, actions: list[str]):
self._active_actions = actions
# This SafeSlot needed?
@SafeSlot(bool)
def set_actions_enabled(self, enable: bool) -> bool:
"""SafeSlot to enable the move to buttons.
Args:
enable (bool): Whether to enable the action buttons.
"""
self.enable_actions = enable
@SafeProperty(bool)
def always_show_latest(self):
"""SafeProperty to indicate if always the latest DAP update is displayed."""
@@ -172,21 +154,19 @@ class LMFitDialog(BECWidget, QWidget):
self.ui.group_parameters.setVisible(not show)
@property
def fit_curve_id(self) -> str | None:
def fit_curve_id(self) -> str:
"""SafeProperty for the currently displayed fit curve_id."""
return self._fit_curve_id
@fit_curve_id.setter
def fit_curve_id(self, curve_id: str | None):
def fit_curve_id(self, curve_id: str):
"""Setter for the currently displayed fit curve_id.
Args:
curve_id (str | None): The curve_id of the fit curve to be displayed,
or None to clear the selection.
fit_curve_id (str): The curve_id of the fit curve to be displayed.
"""
self._fit_curve_id = curve_id
if curve_id is not None:
self.selected_fit.emit(curve_id)
self.selected_fit.emit(curve_id)
@SafeSlot(str)
def remove_dap_data(self, curve_id: str):
@@ -196,15 +176,6 @@ class LMFitDialog(BECWidget, QWidget):
curve_id (str): The curve_id of the DAP data to be removed.
"""
self.summary_data.pop(curve_id, None)
if self.fit_curve_id == curve_id:
self.action_buttons = {}
self.ui.summary_tree.clear()
self.ui.param_tree.clear()
remaining = list(self.summary_data.keys())
if remaining:
self.fit_curve_id = remaining[0]
else:
self._fit_curve_id = None
self.refresh_curve_list()
@SafeSlot(str)
@@ -280,7 +251,6 @@ class LMFitDialog(BECWidget, QWidget):
params (list): List of LMFit parameters for the fit curve.
"""
self._move_buttons = []
self.action_buttons = {}
self.ui.param_tree.clear()
for param in params:
param_name = param[0]
@@ -299,16 +269,18 @@ class LMFitDialog(BECWidget, QWidget):
if param_name in self.active_action_list: # pylint: disable=unsupported-membership-test
# Create a push button to move the motor to a specific position
widget = QWidget()
button = QPushButton("Move")
button = QPushButton(f"Move to {param_name}")
button.clicked.connect(self._create_move_action(param_name, param[1]))
if self.enable_actions:
if self.enable_actions is True:
button.setEnabled(True)
else:
button.setEnabled(False)
button.setStyleSheet(f"""
button.setStyleSheet(
f"""
QPushButton:enabled {{ background-color: {self._accent_colors.success.name()};color: white; }}
QPushButton:disabled {{ background-color: grey;color: white; }}
""")
"""
)
self.action_buttons[param_name] = button
layout = QVBoxLayout()
layout.addWidget(self.action_buttons[param_name])
@@ -14,18 +14,6 @@
<string>Form</string>
</property>
<layout class="QGridLayout" name="gridLayout">
<property name="leftMargin">
<number>0</number>
</property>
<property name="topMargin">
<number>0</number>
</property>
<property name="rightMargin">
<number>0</number>
</property>
<property name="bottomMargin">
<number>0</number>
</property>
<item row="0" column="0">
<widget class="QSplitter" name="splitter_2">
<property name="sizePolicy">
@@ -34,6 +22,15 @@
<verstretch>0</verstretch>
</sizepolicy>
</property>
<property name="frameShape">
<enum>QFrame::Shape::VLine</enum>
</property>
<property name="frameShadow">
<enum>QFrame::Shadow::Plain</enum>
</property>
<property name="lineWidth">
<number>1</number>
</property>
<property name="orientation">
<enum>Qt::Orientation::Horizontal</enum>
</property>
@@ -44,12 +41,6 @@
<bool>true</bool>
</property>
<widget class="QGroupBox" name="group_curve_selection">
<property name="minimumSize">
<size>
<width>120</width>
<height>0</height>
</size>
</property>
<property name="title">
<string>Select Curve</string>
</property>
@@ -67,36 +58,18 @@
</sizepolicy>
</property>
<property name="orientation">
<enum>Qt::Orientation::Horizontal</enum>
<enum>Qt::Orientation::Vertical</enum>
</property>
<widget class="QGroupBox" name="group_summary">
<property name="minimumSize">
<size>
<width>180</width>
<height>0</height>
</size>
</property>
<property name="title">
<string>Fit Summary</string>
</property>
<layout class="QVBoxLayout" name="verticalLayout_2">
<item>
<widget class="QTreeWidget" name="summary_tree">
<property name="alternatingRowColors">
<bool>true</bool>
</property>
<property name="indentation">
<number>0</number>
</property>
<property name="rootIsDecorated">
<bool>false</bool>
</property>
<property name="uniformRowHeights">
<bool>false</bool>
</property>
<attribute name="headerDefaultSectionSize">
<number>90</number>
</attribute>
<column>
<property name="text">
<string>Property</string>
@@ -112,33 +85,12 @@
</layout>
</widget>
<widget class="QGroupBox" name="group_parameters">
<property name="minimumSize">
<size>
<width>240</width>
<height>0</height>
</size>
</property>
<property name="title">
<string>Parameter Details</string>
</property>
<layout class="QVBoxLayout" name="verticalLayout_3">
<item>
<widget class="QTreeWidget" name="param_tree">
<property name="alternatingRowColors">
<bool>true</bool>
</property>
<property name="indentation">
<number>0</number>
</property>
<property name="rootIsDecorated">
<bool>false</bool>
</property>
<property name="columnCount">
<number>4</number>
</property>
<attribute name="headerDefaultSectionSize">
<number>80</number>
</attribute>
<column>
<property name="text">
<string>Parameter</string>
@@ -154,11 +106,6 @@
<string>Std</string>
</property>
</column>
<column>
<property name="text">
<string>Action</string>
</property>
</column>
</widget>
</item>
</layout>
@@ -95,12 +95,6 @@
<height>0</height>
</size>
</property>
<property name="indentation">
<number>0</number>
</property>
<property name="rootIsDecorated">
<bool>false</bool>
</property>
<property name="uniformRowHeights">
<bool>false</bool>
</property>
@@ -153,12 +147,6 @@
<width>0</width>
<height>0</height>
</size>
</property>
<property name="indentation">
<number>0</number>
</property>
<property name="rootIsDecorated">
<bool>false</bool>
</property>
<property name="columnCount">
<number>4</number>
@@ -1,605 +0,0 @@
from __future__ import annotations
import enum
from dataclasses import dataclass, field
from uuid import uuid4
from weakref import WeakValueDictionary
import shiboken6
from bec_lib.logger import bec_logger
from qtpy.QtCore import Qt, Signal
from qtpy.QtGui import QMouseEvent
from qtpy.QtWidgets import (
QApplication,
QHBoxLayout,
QLabel,
QStackedLayout,
QTabWidget,
QVBoxLayout,
QWidget,
)
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.widgets.utility.bec_term.protocol import BecTerminal
from bec_widgets.widgets.utility.bec_term.util import get_current_bec_term_class
logger = bec_logger.logger
_BecTermClass = get_current_bec_term_class()
# Note on definitions:
# Terminal: an instance of a terminal widget with a system shell
# Console: one of possibly several widgets which may share ownership of one single terminal
# Shell: a Console set to start the BEC IPython client in its terminal
class ConsoleMode(str, enum.Enum):
ACTIVE = "active"
INACTIVE = "inactive"
HIDDEN = "hidden"
@dataclass
class _TerminalOwnerInfo:
"""Should be managed only by the BecConsoleRegistry. Consoles should ask the registry for
necessary ownership info."""
owner_console_id: str | None = None
registered_console_ids: set[str] = field(default_factory=set)
instance: BecTerminal | None = None
terminal_id: str = ""
initialized: bool = False
persist_session: bool = False
fallback_holder: QWidget | None = None
class BecConsoleRegistry:
"""
A registry for the BecConsole class to manage its instances.
"""
def __init__(self):
"""
Initialize the registry.
"""
self._consoles: WeakValueDictionary[str, BecConsole] = WeakValueDictionary()
self._terminal_registry: dict[str, _TerminalOwnerInfo] = {}
@staticmethod
def _is_valid_qobject(obj: object | None) -> bool:
return obj is not None and shiboken6.isValid(obj)
def _connect_app_cleanup(self) -> None:
app = QApplication.instance()
if app is None:
return
app.aboutToQuit.connect(self.clear, Qt.ConnectionType.UniqueConnection)
@staticmethod
def _new_terminal_info(console: BecConsole) -> _TerminalOwnerInfo:
term = _BecTermClass()
return _TerminalOwnerInfo(
registered_console_ids={console.console_id},
owner_console_id=console.console_id,
instance=term,
terminal_id=console.terminal_id,
persist_session=console.persist_terminal_session,
)
@staticmethod
def _replace_terminal(info: _TerminalOwnerInfo, console: BecConsole) -> None:
info.instance = _BecTermClass()
info.initialized = False
info.owner_console_id = console.console_id
info.registered_console_ids.add(console.console_id)
info.persist_session = info.persist_session or console.persist_terminal_session
def _delete_terminal_info(self, info: _TerminalOwnerInfo) -> None:
if self._is_valid_qobject(info.instance):
info.instance.deleteLater() # type: ignore[union-attr]
info.instance = None
if self._is_valid_qobject(info.fallback_holder):
info.fallback_holder.deleteLater()
info.fallback_holder = None
def _parking_parent(
self,
info: _TerminalOwnerInfo,
console: BecConsole | None = None,
*,
avoid_console: bool = False,
) -> QWidget | None:
for console_id in info.registered_console_ids:
candidate = self._consoles.get(console_id)
if candidate is None or candidate is console:
continue
if self._is_valid_qobject(candidate):
return candidate._term_holder
if console is None or not self._is_valid_qobject(console):
return None
window = console.window()
if window is not None and window is not console and self._is_valid_qobject(window):
return window
if not avoid_console:
return console._term_holder
return None
def _fallback_holder(
self,
info: _TerminalOwnerInfo,
console: BecConsole | None = None,
*,
avoid_console: bool = False,
) -> QWidget:
if not self._is_valid_qobject(info.fallback_holder):
info.fallback_holder = QWidget(
parent=self._parking_parent(info, console, avoid_console=avoid_console)
)
info.fallback_holder.setObjectName(f"_bec_console_terminal_holder_{info.terminal_id}")
info.fallback_holder.hide()
return info.fallback_holder
def _park_terminal(
self,
info: _TerminalOwnerInfo,
console: BecConsole | None = None,
*,
avoid_console: bool = False,
) -> None:
if not self._is_valid_qobject(info.instance):
return
parent = self._parking_parent(info, console, avoid_console=avoid_console)
if parent is None and info.persist_session:
parent = self._fallback_holder(info, console, avoid_console=avoid_console)
info.instance.hide() # type: ignore[union-attr]
info.instance.setParent(parent) # type: ignore[union-attr]
def clear(self) -> None:
"""Delete every tracked terminal and holder."""
for info in list(self._terminal_registry.values()):
self._delete_terminal_info(info)
self._terminal_registry.clear()
self._consoles.clear()
def register(self, console: BecConsole):
"""
Register an instance of BecConsole. If there is already a terminal with the associated
terminal_id, this does not automatically grant ownership.
Args:
console (BecConsole): The instance to register.
"""
self._connect_app_cleanup()
self._consoles[console.console_id] = console
console_id, terminal_id = console.console_id, console.terminal_id
term_info = self._terminal_registry.get(terminal_id)
if term_info is None:
self._terminal_registry[terminal_id] = self._new_terminal_info(console)
return
term_info.persist_session = term_info.persist_session or console.persist_terminal_session
had_registered_consoles = bool(term_info.registered_console_ids)
term_info.registered_console_ids.add(console_id)
if not self._is_valid_qobject(term_info.instance):
self._replace_terminal(term_info, console)
return
if (
term_info.owner_console_id is not None
and term_info.owner_console_id not in self._consoles
):
term_info.owner_console_id = None
if term_info.owner_console_id is None and not had_registered_consoles:
term_info.owner_console_id = console_id
logger.info(f"Registered new console {console_id} for terminal {terminal_id}")
def unregister(self, console: BecConsole):
"""
Unregister an instance of BecConsole.
Args:
console (BecConsole): The instance to unregister.
"""
console_id, terminal_id = console.console_id, console.terminal_id
if console_id in self._consoles:
del self._consoles[console_id]
if (term_info := self._terminal_registry.get(terminal_id)) is None:
return
detached = console._detach_terminal_widget(term_info.instance)
if console_id in term_info.registered_console_ids:
term_info.registered_console_ids.remove(console_id)
if term_info.owner_console_id == console_id:
term_info.owner_console_id = None
if not term_info.registered_console_ids:
if term_info.persist_session and self._is_valid_qobject(term_info.instance):
self._park_terminal(term_info, console, avoid_console=True)
logger.info(f"Unregistered console {console_id} for terminal {terminal_id}")
return
self._delete_terminal_info(term_info)
del self._terminal_registry[terminal_id]
elif detached:
self._park_terminal(term_info, console, avoid_console=True)
logger.info(f"Unregistered console {console_id} for terminal {terminal_id}")
def is_owner(self, console: BecConsole):
"""Returns true if the given console is the owner of its terminal"""
if console not in self._consoles.values():
return False
if (info := self._terminal_registry.get(console.terminal_id)) is None:
logger.warning(f"Console {console.console_id} references an unknown terminal!")
return False
if not self._is_valid_qobject(info.instance):
return False
return info.owner_console_id == console.console_id
def take_ownership(self, console: BecConsole) -> BecTerminal | None:
"""
Transfer ownership of a terminal to the given console.
Args:
console: the console which wishes to take ownership of its associated terminal.
Returns:
BecTerminal | None: The instance if ownership transfer was successful, None otherwise.
"""
console_id, terminal_id = console.console_id, console.terminal_id
if terminal_id not in self._terminal_registry:
self.register(console)
instance_info = self._terminal_registry[terminal_id]
if not self._is_valid_qobject(instance_info.instance):
self._replace_terminal(instance_info, console)
if (old_owner_console_ide := instance_info.owner_console_id) is not None:
if (
old_owner_console_ide != console_id
and (old_owner := self._consoles.get(old_owner_console_ide)) is not None
):
old_owner.yield_ownership() # call this on the old owner to make sure it is updated
instance_info.owner_console_id = console_id
instance_info.registered_console_ids.add(console_id)
logger.info(f"Transferred ownership of terminal {terminal_id} to {console_id}")
return instance_info.instance
def try_get_term(self, console: BecConsole) -> BecTerminal | None:
"""
Return the terminal instance if the requesting console is the owner
Args:
console: the requesting console.
Returns:
BecTerminal | None: The instance if the console is the owner, None otherwise.
"""
console_id, terminal_id = console.console_id, console.terminal_id
logger.debug(f"checking term for {console_id}")
if terminal_id not in self._terminal_registry:
logger.warning(f"Terminal {terminal_id} not found in registry")
return None
instance_info = self._terminal_registry[terminal_id]
if not self._is_valid_qobject(instance_info.instance):
if instance_info.owner_console_id == console_id:
self._replace_terminal(instance_info, console)
else:
return None
if instance_info.owner_console_id == console_id:
return instance_info.instance
def yield_ownership(self, console: BecConsole):
"""
Yield ownership of an instance without destroying it. The instance remains in the
registry with no owner, available for another widget to claim.
Args:
console (BecConsole): The console which wishes to yield ownership of its associated terminal.
"""
console_id, terminal_id = console.console_id, console.terminal_id
logger.debug(f"Console {console_id} attempted to yield ownership")
if console_id not in self._consoles or terminal_id not in self._terminal_registry:
return
term_info = self._terminal_registry[terminal_id]
if term_info.owner_console_id != console_id:
logger.debug(f"But it was not the owner, which was {term_info.owner_console_id}!")
return
term_info.owner_console_id = None
console._detach_terminal_widget(term_info.instance)
self._park_terminal(term_info, console)
def should_initialize(self, console: BecConsole) -> bool:
"""Return true if the console should send its startup command to the terminal."""
info = self._terminal_registry.get(console.terminal_id)
if info is None:
return False
return (
info.owner_console_id == console.console_id
and not info.initialized
and self._is_valid_qobject(info.instance)
)
def mark_initialized(self, console: BecConsole) -> None:
info = self._terminal_registry.get(console.terminal_id)
if info is not None and info.owner_console_id == console.console_id:
info.initialized = True
def owner_is_visible(self, term_id: str) -> bool:
"""
Check if the owner of an instance is currently visible.
Args:
term_id (str): The terminal ID to check.
Returns:
bool: True if the owner is visible, False otherwise.
"""
instance_info = self._terminal_registry.get(term_id)
if (
instance_info is None
or instance_info.owner_console_id is None
or not self._is_valid_qobject(instance_info.instance)
):
return False
if (owner := self._consoles.get(instance_info.owner_console_id)) is None:
return False
return owner.isVisible()
_bec_console_registry = BecConsoleRegistry()
class _Overlay(QWidget):
def __init__(self, console: BecConsole):
super().__init__(parent=console)
self._console = console
def mousePressEvent(self, event: QMouseEvent) -> None:
if event.button() == Qt.MouseButton.LeftButton:
self._console.take_terminal_ownership()
event.accept()
return
return super().mousePressEvent(event)
class BecConsole(BECWidget, QWidget):
"""A console widget with access to a shared registry of terminals, such that instances can be moved around."""
_js_callback = Signal(bool)
initialized = Signal()
PLUGIN = True
ICON_NAME = "terminal"
persist_terminal_session = False
def __init__(
self,
parent=None,
config=None,
client=None,
gui_id=None,
startup_cmd: str | None = None,
terminal_id: str | None = None,
**kwargs,
):
super().__init__(parent=parent, client=client, gui_id=gui_id, config=config, **kwargs)
self._mode = ConsoleMode.INACTIVE
self._startup_cmd = startup_cmd
self._is_initialized = False
self.terminal_id = terminal_id or str(uuid4())
self.console_id = self.gui_id
self.term: BecTerminal | None = None # Will be set in _set_up_instance
self._set_up_instance()
def _set_up_instance(self):
"""
Set up the web instance and UI elements.
"""
self._stacked_layout = QStackedLayout()
# self._stacked_layout.setStackingMode(QStackedLayout.StackingMode.StackAll)
self._term_holder = QWidget()
self._term_layout = QVBoxLayout()
self._term_layout.setContentsMargins(0, 0, 0, 0)
self._term_holder.setLayout(self._term_layout)
self.setLayout(self._stacked_layout)
# prepare overlay
self._overlay = _Overlay(self)
layout = QVBoxLayout(self._overlay)
layout.setAlignment(Qt.AlignmentFlag.AlignCenter)
label = QLabel("Click to activate terminal", self._overlay)
layout.addWidget(label)
self._stacked_layout.addWidget(self._term_holder)
self._stacked_layout.addWidget(self._overlay)
# will create a new terminal instance if there isn't already one for this ID
_bec_console_registry.register(self)
self._infer_mode()
self._ensure_startup_started()
def _infer_mode(self):
self.term = _bec_console_registry.try_get_term(self)
if self.term:
self._set_mode(ConsoleMode.ACTIVE)
elif self.isHidden():
self._set_mode(ConsoleMode.HIDDEN)
else:
self._set_mode(ConsoleMode.INACTIVE)
def _set_mode(self, mode: ConsoleMode):
"""
Set the mode of the web console.
Args:
mode (ConsoleMode): The mode to set.
"""
match mode:
case ConsoleMode.ACTIVE:
if self.term:
if self._term_layout.indexOf(self.term) == -1: # type: ignore[arg-type]
self._term_layout.addWidget(self.term) # type: ignore # BecTerminal is QWidget
self.term.show() # type: ignore[attr-defined]
self._stacked_layout.setCurrentIndex(0)
self._mode = mode
else:
self._stacked_layout.setCurrentIndex(1)
self._mode = ConsoleMode.INACTIVE
case ConsoleMode.INACTIVE:
self._stacked_layout.setCurrentIndex(1)
self._mode = mode
case ConsoleMode.HIDDEN:
self._stacked_layout.setCurrentIndex(1)
self._mode = mode
@property
def startup_cmd(self):
"""
Get the startup command for the web console.
"""
return self._startup_cmd
@startup_cmd.setter
def startup_cmd(self, cmd: str | None):
"""
Set the startup command for the console.
"""
self._startup_cmd = cmd
def write(self, data: str, send_return: bool = True):
"""
Send data to the console
Args:
data (str): The data to send.
send_return (bool): Whether to send a return after the data.
"""
if self.term:
self.term.write(data, send_return)
def _ensure_startup_started(self):
if not self.startup_cmd or not _bec_console_registry.should_initialize(self):
return
self.write(self.startup_cmd, True)
_bec_console_registry.mark_initialized(self)
def _detach_terminal_widget(self, term: BecTerminal | None) -> bool:
if term is None or not BecConsoleRegistry._is_valid_qobject(term):
if self.term is term:
self.term = None
return False
is_child = self.isAncestorOf(term) # type: ignore[arg-type]
if self._term_layout.indexOf(term) != -1: # type: ignore[arg-type]
self._term_layout.removeWidget(term) # type: ignore[arg-type]
is_child = True
if is_child:
term.hide() # type: ignore[attr-defined]
term.setParent(None) # type: ignore[attr-defined]
if self.term is term:
self.term = None
return is_child
def take_terminal_ownership(self):
"""
Take ownership of a web instance from the registry. This will transfer the instance
from its current owner (if any) to this widget.
"""
# Get the instance from registry
self.term = _bec_console_registry.take_ownership(self)
self._infer_mode()
self._ensure_startup_started()
if self._mode == ConsoleMode.ACTIVE:
logger.debug(f"Widget {self.gui_id} took ownership of instance {self.terminal_id}")
def yield_ownership(self):
"""
Yield ownership of the instance. The instance remains in the registry with no owner,
available for another widget to claim. This is automatically called when the
widget becomes hidden.
"""
_bec_console_registry.yield_ownership(self)
self._infer_mode()
if self._mode != ConsoleMode.ACTIVE:
logger.debug(f"Widget {self.gui_id} yielded ownership of instance {self.terminal_id}")
def hideEvent(self, event):
"""Called when the widget is hidden. Automatically yields ownership."""
self.yield_ownership()
super().hideEvent(event)
def showEvent(self, event):
"""Called when the widget is shown. Updates UI state based on ownership."""
super().showEvent(event)
if not _bec_console_registry.is_owner(self):
if not _bec_console_registry.owner_is_visible(self.terminal_id):
self.take_terminal_ownership()
def cleanup(self):
"""Unregister this console on destruction."""
_bec_console_registry.unregister(self)
super().cleanup()
class BECShell(BecConsole):
"""
A BecConsole pre-configured to run the BEC shell.
We cannot simply expose the web console properties to Qt as we need to have a deterministic
startup behavior for sharing the same shell instance across multiple widgets.
"""
ICON_NAME = "hub"
persist_terminal_session = True
def __init__(self, parent=None, config=None, client=None, gui_id=None, **kwargs):
super().__init__(
parent=parent,
config=config,
client=client,
gui_id=gui_id,
terminal_id="bec_shell",
**kwargs,
)
@property
def startup_cmd(self):
"""
Get the startup command for the BEC shell.
"""
if self.bec_dispatcher.cli_server is None:
return "bec --nogui"
return f"bec --gui-id {self.bec_dispatcher.cli_server.gui_id}"
@startup_cmd.setter
def startup_cmd(self, cmd: str | None): ...
if __name__ == "__main__": # pragma: no cover
import sys
app = QApplication(sys.argv)
widget = QTabWidget()
# Create two consoles with different unique_ids
bec_console_1a = BecConsole(startup_cmd="htop", gui_id="console_1_a", terminal_id="terminal_1")
bec_console_1b = BecConsole(startup_cmd="htop", gui_id="console_1_b", terminal_id="terminal_1")
bec_console_1 = QWidget()
bec_console_1_layout = QHBoxLayout(bec_console_1)
bec_console_1_layout.addWidget(bec_console_1a)
bec_console_1_layout.addWidget(bec_console_1b)
bec_console2 = BECShell()
bec_console3 = BecConsole(gui_id="console_3", terminal_id="terminal_1")
widget.addTab(bec_console_1, "Console 1")
widget.addTab(bec_console2, "Console 2 - BEC Shell")
widget.addTab(bec_console3, "Console 3 -- mirror of Console 1")
widget.show()
widget.resize(800, 600)
sys.exit(app.exec_())
@@ -1 +0,0 @@
{'files': ['bec_console.py']}
@@ -1 +0,0 @@
{'files': ['bec_console.py']}
@@ -47,13 +47,15 @@ class BECJupyterConsole(RichJupyterWidget): # pragma: no cover:
)
def _init_bec_kernel(self):
self.execute("""
self.execute(
"""
from bec_ipython_client.main import BECIPythonClient
bec = BECIPythonClient()
bec.start()
dev = bec.device_manager.devices if bec else None
scans = bec.scans if bec else None
""")
"""
)
def _cleanup_bec(self):
if getattr(self, "ipyclient", None) is not None and self.inprocess is True:
@@ -6,7 +6,7 @@ from typing import Any, cast
from bec_lib.logger import bec_logger
from bec_lib.macro_update_handler import has_executable_code
from qtpy.QtCore import Signal
from qtpy.QtCore import QEvent, QTimer, Signal
from qtpy.QtWidgets import QFileDialog, QMessageBox, QToolButton, QWidget
from bec_widgets.widgets.containers.dock_area.basic_dock_area import DockAreaWidget
@@ -36,12 +36,12 @@ class MonacoDock(DockAreaWidget):
**kwargs,
)
self.dock_manager.focusedDockWidgetChanged.connect(self._on_focus_event)
self.dock_manager.installEventFilter(self)
self._last_focused_editor: CDockWidget | None = None
self.focused_editor.connect(self._on_last_focused_editor_changed)
initial_editor = self.add_editor()
if isinstance(initial_editor, CDockWidget):
self.last_focused_editor = initial_editor
self._install_manager_scan_and_fix_guards()
def _create_editor_widget(self) -> MonacoWidget:
"""Create a configured Monaco editor widget."""
@@ -73,8 +73,7 @@ class MonacoDock(DockAreaWidget):
logger.info(f"Editor '{widget.current_file}' has unsaved changes: {widget.get_text()}")
self.save_enabled.emit(widget.modified)
@staticmethod
def _update_tab_title_for_modification(dock: CDockWidget, modified: bool):
def _update_tab_title_for_modification(self, dock: CDockWidget, modified: bool):
"""Update the tab title to show modification status with a dot indicator."""
current_title = dock.windowTitle()
@@ -99,12 +98,14 @@ class MonacoDock(DockAreaWidget):
return
active_sig = signatures[signature.get("activeSignature", 0)]
active_param = signature.get("activeParameter", 0) # TODO: Add highlight for active_param
# Get signature label and documentation
label = active_sig.get("label", "")
doc_obj = active_sig.get("documentation", {})
documentation = doc_obj.get("value", "") if isinstance(doc_obj, dict) else str(doc_obj)
# Format the Markdown output
# Format the markdown output
markdown = f"```python\n{label}\n```\n\n{documentation}"
self.signature_help.emit(markdown)
@@ -155,10 +156,9 @@ class MonacoDock(DockAreaWidget):
if self.last_focused_editor is dock:
self.last_focused_editor = None
# After topology changes, make sure single-tab areas get a plus button
self._scan_and_fix_areas()
QTimer.singleShot(0, self._scan_and_fix_areas)
@staticmethod
def reset_widget(widget: MonacoWidget):
def reset_widget(self, widget: MonacoWidget):
"""
Reset the given Monaco editor widget to its initial state.
@@ -193,23 +193,23 @@ class MonacoDock(DockAreaWidget):
# pylint: disable=protected-access
area._monaco_plus_btn = plus_btn
def _install_manager_scan_and_fix_guards(self) -> None:
"""
Track ADS structural changes to trigger scan and fix of dock areas for plus button injection.
"""
self.dock_manager.dockAreaCreated.connect(self._scan_and_fix_areas)
self.dock_manager.dockWidgetAdded.connect(self._scan_and_fix_areas)
self.dock_manager.stateRestored.connect(self._scan_and_fix_areas)
self.dock_manager.restoringState.connect(self._scan_and_fix_areas)
self.dock_manager.focusedDockWidgetChanged.connect(self._scan_and_fix_areas)
self._scan_and_fix_areas()
def _scan_and_fix_areas(self, *_arg):
def _scan_and_fix_areas(self):
# Find all dock areas under this manager and ensure each single-tab area has a plus button
areas = self.dock_manager.findChildren(CDockAreaWidget)
for a in areas:
self._ensure_area_plus(a)
def eventFilter(self, obj, event):
# Track dock manager events
if obj is self.dock_manager and event.type() in (
QEvent.Type.ChildAdded,
QEvent.Type.ChildRemoved,
QEvent.Type.LayoutRequest,
):
QTimer.singleShot(0, self._scan_and_fix_areas)
return super().eventFilter(obj, event)
def add_editor(
self, area: Any | None = None, title: str | None = None, tooltip: str | None = None
) -> CDockWidget:
@@ -258,7 +258,7 @@ class MonacoDock(DockAreaWidget):
if area_widget is not None:
self._ensure_area_plus(area_widget)
self._scan_and_fix_areas()
QTimer.singleShot(0, self._scan_and_fix_areas)
self.last_focused_editor = dock
return dock
@@ -362,7 +362,8 @@ if __name__ == "__main__": # pragma: no cover
widget.set_language("python")
widget.set_theme("vs-dark")
widget.editor.set_minimap_enabled(False)
widget.set_text("""
widget.set_text(
"""
import numpy as np
from typing import TYPE_CHECKING
@@ -379,7 +380,8 @@ if TYPE_CHECKING:
# This is a comment
def hello_world():
print("Hello, world!")
""")
"""
)
widget.set_highlighted_lines(1, 3)
widget.show()
qapp.exec_()

Some files were not shown because too many files have changed in this diff Show More