Compare commits

..

145 Commits

Author SHA1 Message Date
wyzula_j 7c769d3522 feat(image): modernization of image widget 2026-01-28 23:56:52 +01:00
wyzula_j 91ff057054 fix(device_combobox): public flag for valid input 2026-01-28 23:56:52 +01:00
wyzula_j 2e971e8cc5 feat(waveform): composite DAP with multiple models 2026-01-28 23:56:52 +01:00
wyzula_j cbe970d76a feat(curve, waveform): add dap_parameters for lmfit customization in DAP requests 2026-01-28 23:56:52 +01:00
wyzula_j 697b7a7bee fix(colors): more benevolent fetching of colormap names, avoid hardcoded wrong colormap mapping from GradientWidget from pg 2026-01-28 18:05:12 +01:00
wyzula_j 148b41e238 fix(device_input_widgets): removed RPC access 2026-01-28 12:47:19 +01:00
wyzula_j 6e398e8077 feat(device_combobox): device filter added based on its signal classes 2026-01-28 12:47:19 +01:00
wyzula_j 8d75c2af1c feat(signal_combobox): extended that can filter by signal class and dimension of the signal 2026-01-28 12:47:19 +01:00
wyzula_j 24dbb885f6 feat(plot_base): plot_base, image and heatmap widget adopted to property-toolbar sync 2026-01-28 11:16:48 +01:00
wyzula_j 3b7bad85d3 feat(toolbar): toolbar can be synced with the property_changed for toggle actions 2026-01-28 11:16:48 +01:00
wyzula_j de09cc660a feat(SafeProperty): SafeProperty emits property_changed signal 2026-01-28 11:16:48 +01:00
wakonig_k 4bb8e86509 test(e2e): raise with widget name 2026-01-28 10:50:52 +01:00
wakonig_k e5b76bc855 fix(rpc_server): use single shot instead of processEvents to avoid dead locks 2026-01-28 10:50:52 +01:00
wakonig_k 99176198ee fix: adjust ring progress bar to ads 2026-01-28 10:50:52 +01:00
wakonig_k dcfc573052 fix(FakeDevice): add _info dict 2026-01-28 10:50:52 +01:00
wakonig_k 9290a9a23b feat(color): add relative luminance calculation 2026-01-28 10:50:52 +01:00
wakonig_k d48b9d224f feat: add export and load settings methods to BECConnector; add SafeProperty safe getter flag 2026-01-28 10:50:52 +01:00
wyzula_j 43c311782d fix(rpc_register): listing only valid connections 2026-01-27 21:53:10 +01:00
wyzula_j 44f7acaeda fix(launch_window): logic for showing launcher 2026-01-27 21:53:10 +01:00
wyzula_j 0b212c3100 fix(main_window): parent fixed for notification broker 2026-01-27 21:53:10 +01:00
appel_c d8ebae49ad fix(device-progress-bar): remove stretch in content layout 2026-01-26 22:07:14 +01:00
perl_d 153fb62a04 style: wrap progress bar in widget to fix background 2026-01-26 22:07:14 +01:00
perl_d d67227d20c test: fix test 2026-01-26 22:07:14 +01:00
perl_d dc1072c247 fix: tooltip logic and disable button on running scan 2026-01-26 22:07:14 +01:00
perl_d beb337201c feat: attach config cancellation to closeEvent 2026-01-26 22:07:14 +01:00
perl_d 75162ef8a8 fix: remove manual stylesheet deletion/override 2026-01-26 22:07:14 +01:00
perl_d cc89252fb3 fix: 'Any' type annotations 2026-01-26 22:07:14 +01:00
appel_c 36fa0e649c fix(_OverlayEventFilter): fix typo 2026-01-26 22:07:14 +01:00
appel_c 8e173cb17e refactor(device-form-dialog): Use native QDialogButtonBox instead of GroupBox layout 2026-01-26 22:07:14 +01:00
appel_c 322655fc5e refactor(busy-loager): Improve eventFilter to avoid crashs if target or overlay is None. 2026-01-26 22:07:14 +01:00
appel_c 2b5b7360ae test(device-manager-view): improve test coverage for device-manager-view 2026-01-26 22:07:14 +01:00
appel_c b325d1bb4f fix(signal-label): Fix signal label cleanup, missing parent in constructors 2026-01-26 22:07:14 +01:00
appel_c ee6fd5fb9e test cleanup add mocked client 2026-01-26 22:07:14 +01:00
appel_c 53fe1ac63d fix(device-manager-display-widget): fix error message popup on cancelling upload 2026-01-26 22:07:14 +01:00
appel_c 58e57169e8 test(config-communicator): add test for cancel action 2026-01-26 22:07:14 +01:00
appel_c 2b27faf779 fix(device-init-progress-bar): fix ui format for device init progressbar 2026-01-26 22:07:14 +01:00
appel_c b1a3403cd3 fix(busy-loader): adjust busy loader and tests 2026-01-26 22:07:14 +01:00
appel_c b38d6dc549 refactor(busy-loader): refactor busy loader to use custom widget 2026-01-26 22:07:14 +01:00
appel_c cc45fed387 feat(device-initialization-progress-bar): add progress bar for device initialization 2026-01-26 22:07:14 +01:00
wyzula_j 5a594925f0 fix(colors): added logger to the apply theme 2026-01-26 20:53:24 +01:00
wyzula_j e76dea6f69 fix(launch_window): processEvents removed 2026-01-26 20:53:24 +01:00
wyzula_j f4c14d66db fix(advanced_dock_area): removed the singleShot for load_initial_profile 2026-01-26 20:53:24 +01:00
wyzula_j 4ef1344fec fix(view):removed splitter logic 2026-01-26 20:53:24 +01:00
wyzula_j 5e63814afe fix(basic_dock_area): removed the singleShot usage 2026-01-26 20:53:24 +01:00
wyzula_j 6be6dafd7d fix(widgets): processEvent removed from widgets using it 2026-01-26 20:53:24 +01:00
wyzula_j fd1edf8177 fix: remove singleShots from BECConnector and adjustments of dock area logic 2026-01-26 20:53:24 +01:00
wyzula_j 8102f31956 fix(positioner_box): layout HV centered and size taken from the ui file 2026-01-26 20:53:24 +01:00
wyzula_j f9b92dacc3 fix(bec_connector): use RPC register to fetch all connections 2026-01-26 20:53:24 +01:00
wyzula_j a219de11c1 feat(motor_map): motor selection adopted to splitter action 2026-01-23 15:10:12 +01:00
wyzula_j 45e9f03093 feat(toolbar): splitter action added 2026-01-23 15:10:12 +01:00
wakonig_k 48e2a97ece fix(scatter waveform): fix tab order for settings panel 2026-01-18 18:36:33 +01:00
wyzula_j 953760c828 fix(scatter_waveform): remove curve_json from the properties 2026-01-18 18:36:33 +01:00
wyzula_j dc3129357b fix(signal_combo_box): get_signal_name added; remove duplicates from heatmap and scatter waveform settings; 2026-01-18 18:36:33 +01:00
wyzula_j 12746ae4aa fix(scatter_waveform): modernization of scatter waveform settings dialog 2026-01-18 18:36:33 +01:00
wyzula_j 7e9cc20e59 fix(scatter_waveform): devices and entries saved as properties 2026-01-18 18:36:33 +01:00
wyzula_j 5209f4c210 fix(heatmap): devices are saved as SafeProperties 2026-01-16 17:29:19 +01:00
wakonig_k 5f30ab5aa2 test(script_tree): improve hover event handling with waitUntil 2026-01-16 17:07:48 +01:00
wakonig_k 3926c5c947 feat(web console): add support for shared web console sessions 2026-01-16 17:07:48 +01:00
appel_c f71c8c882f test(device-manager): use mocked client for tests 2026-01-16 11:05:18 +01:00
appel_c 04a30ea04c refactor(ophyd-validation): Allow option to keep device visible after successful validation 2026-01-16 11:05:18 +01:00
appel_c cbdeae15a1 fix(device-manager): fix minor icon synchronization bugs 2026-01-16 11:05:18 +01:00
appel_c 6aa33cacfa fix(device-manager-display-widget): Remove devices from ophyd validation after upload to BEC 2026-01-16 11:05:18 +01:00
appel_c 73cfe8da4c test(device-form-dialog): adapt tests 2026-01-16 11:05:18 +01:00
appel_c 0467d88010 fix(device-form-dialog): Adapt device-form-dialog ophyd validation test 2026-01-16 11:05:18 +01:00
appel_c c41ef4401d fix(device-form-dialog): Adapt DeviceFormDialog to run validation of config upon editing/adding a config, and forward validation results 2026-01-16 11:05:18 +01:00
wyzula_j 4f2a840c21 fix(CLI): change the default behavior of launching the profiles in CLI 2026-01-16 10:56:22 +01:00
wyzula_j 91050e88ae refactor(advanced_dock_area): change remove_widget to delete 2026-01-16 10:56:22 +01:00
wyzula_j 028efed5bc fix(advanced_dock_area): empty profile is always empty 2026-01-16 10:56:22 +01:00
wyzula_j 80f2ca40cb fix(advanced_dock_area): CLI API adjustments docs + names 2026-01-16 10:56:22 +01:00
wyzula_j 7c32d47f52 fix(advanced_dock_area): replace sanitize_namespace with slugify 2026-01-16 10:56:22 +01:00
wyzula_j bf7299c31e fix(client_utils): delete is deleting window and its content 2026-01-16 10:56:22 +01:00
wyzula_j f3470b409d fix(CLI): dock_area can be created from CLI with specific profile or empty 2026-01-16 10:56:22 +01:00
wyzula_j 3486dd4e44 fix(advanced_dock_area): remove widget from dock area by object name 2026-01-16 10:56:22 +01:00
wyzula_j 46fe5498b5 fix(advanced_dock_area): profile behaviour adjusted, cleanup of the codebase 2026-01-16 10:56:22 +01:00
wyzula_j e94ce73950 fix: sanitize name space util for bec connector and ads 2026-01-16 10:56:22 +01:00
wyzula_j 3cc469a3d1 fix(main_app): dock area from main app shares the workspace name with the CLI one to reuse the profiles created in the cli companion window 2026-01-16 10:56:22 +01:00
wyzula_j b4e1a7927d fix(launch_window): launch geometry for widgets launched from launcher to 80% of the primary screen as default 2026-01-16 10:56:22 +01:00
wyzula_j 84950cc651 fix(launch_window): argument to start with the gui class 2026-01-16 10:56:22 +01:00
wyzula_j 24cc8c7b98 fix(dock_area): the old BECDockArea(pg) removed and replaces by AdvancedDockArea(ADS) 2026-01-16 10:56:22 +01:00
wyzula_j 2132ace01b fix(advanced_dock_area): removed non-functional dock_list and dock_map from RPC 2026-01-16 10:56:22 +01:00
wyzula_j 67650b96a2 fix(advanced_dock_area): new profiles are saved with quickselect as default 2026-01-16 10:56:22 +01:00
wyzula_j 6b1d2958c3 fix(advanced_dock_area): ensure the general profile exists when launched first time 2026-01-16 10:56:22 +01:00
wyzula_j dab1defc76 fix(advanced_dock_area): remove all widgets when loading new profiles 2026-01-16 10:56:22 +01:00
wyzula_j c02f509867 fix(basic_dock_area): delete_all will also delete floating docks 2026-01-16 10:56:22 +01:00
wyzula_j b585a608c7 fix(main_window): delete on close 2026-01-16 10:56:22 +01:00
wakonig_k 21862e8021 fix(main_app): center the application window on the screen 2026-01-14 23:13:09 +01:00
wakonig_k 15ac1c0182 fix(main_app): refactor main function and update script entry point in pyproject.toml 2026-01-14 23:13:09 +01:00
wakonig_k da23a47213 ci: use shared issue sync action instead of local version 2026-01-09 10:36:19 +01:00
wakonig_k 1bb0f1a855 fix(developer widget): save before executing a scripts 2026-01-08 12:55:31 +01:00
wakonig_k f121d09baa fix(monaco widget): reset current_file 2026-01-08 12:55:31 +01:00
wakonig_k dd7a5e11df fix(monaco dock): update last focused editor when closing 2026-01-08 12:55:31 +01:00
wakonig_k 2d4eabead0 fix(monaco_dock): update editor metadata handling and improve open_file method 2026-01-08 12:55:31 +01:00
wakonig_k e607d34337 refactor(developer_widget): enhance documentation and add missing imports 2026-01-08 12:55:31 +01:00
wakonig_k 4a2bc9fcd9 feat(developer_widget): add signal connection for focused editor changes to disable run button for macro files 2026-01-08 12:55:31 +01:00
wyzula_j 2ffe269727 fix(client): client API regenerated 2026-01-05 11:25:55 +01:00
wyzula_j de5773662a feat(device-manager): Add DeviceManager Widget for BEC Widget main applications 2026-01-05 11:25:55 +01:00
wyzula_j 53b50e3420 fix(general_app): old general app example removed 2026-01-05 11:25:55 +01:00
wyzula_j b16f88b217 fix(heatmap): interpolation thread is killed only on exit, logger for dandling thread 2026-01-05 11:25:55 +01:00
wyzula_j 063e5d064c perf(heatmap): thread worker optimization 2026-01-05 11:25:55 +01:00
wyzula_j c354a9b249 fix(heatmap): interpolation of the image moved to separate thread 2026-01-05 11:25:55 +01:00
wyzula_j caa4e449e4 fix(motor_map): x/y motor are saved in properties 2026-01-05 11:25:55 +01:00
perl_d afc8c4733e fix: don't wait forever 2026-01-05 11:25:55 +01:00
wyzula_j a00024c66f fix(widget_state_manager): PROPERTIES_TO_SKIP are not restored even if in ini file 2026-01-05 11:25:55 +01:00
wyzula_j 5c18b291b5 feat(advanced_dock_area): floating docks restore with relative geometry 2026-01-05 11:25:55 +01:00
wakonig_k 08dde431a6 refactor: improvements to enum access 2026-01-05 11:25:55 +01:00
wyzula_j 7daa25d7c1 feat(advanced_dock_area): instance lock for multiple ads in same session 2026-01-05 11:25:55 +01:00
wyzula_j 8842eb617a fix(widgets): removed isVisible from all SafeProperties 2026-01-05 11:25:55 +01:00
wyzula_j 1d0634e142 fix(bec_widget): improved qt enums; grab safeguard 2026-01-05 11:25:55 +01:00
wyzula_j dc6946c924 fix(qt_ads): pythons stubs match structure of PySide6QtAds 2026-01-05 11:25:55 +01:00
wyzula_j 377bad4854 fix(widget_state_manager): filtering of not wanted properties 2026-01-05 11:25:55 +01:00
wyzula_j 6cdd813734 refactor(main_app): adapted for DockAreaWidget changes 2026-01-05 11:25:55 +01:00
wyzula_j 3f46f7eb7e refactor(developer_view): changed to use DockAreaWidget 2026-01-05 11:25:55 +01:00
wyzula_j 73f474c7e7 refactor(monaco_dock): changed to use DockAreaWidget 2026-01-05 11:25:55 +01:00
wyzula_j 2dfae4d38f feat(advanced_dock_area): created DockAreaWidget base class; profile management through namespaces; dock area variants 2026-01-05 11:25:54 +01:00
wyzula_j f7061baf7b fix(main_window): removed general forced cleanup 2026-01-05 11:25:54 +01:00
wyzula_j 5865d0f97d feat(advanced_dock_area): UI/UX for profile management improved, saving directories logic adjusted 2026-01-05 11:25:54 +01:00
wyzula_j c204815c42 fix(main_window): cleanup adjusted with shiboken6 2026-01-05 11:25:54 +01:00
wyzula_j af8f3911aa fix(dark_mode_button): skip settings added 2026-01-05 11:25:54 +01:00
wyzula_j 73afb5a472 fix(widget_state_manager): added shiboken check 2026-01-05 11:25:54 +01:00
wyzula_j 5836f286de feat(bec_widget): save screenshot to bytes 2026-01-05 11:25:54 +01:00
wyzula_j 5567274f2d fix(becconnector): ophyd thread killer on exit + in conftest 2026-01-05 11:25:54 +01:00
wakonig_k 7983a4527a feat(guided_tour): add guided tour 2026-01-05 11:25:54 +01:00
wakonig_k 0f63543326 fix: add metadata to scan control export 2026-01-05 11:25:54 +01:00
wyzula_j 01755aba07 feat(developer_view): add developer view 2026-01-05 11:25:54 +01:00
wyzula_j b4987fe759 feat(jupyter_console_window): adjustment for general usage 2026-01-05 11:25:54 +01:00
wakonig_k b0cb048c81 feat(ads): add pyi stub file to provide type hints for ads 2026-01-05 11:25:54 +01:00
appel_c e8c062a48f feat(dm-view): initial device manager view added 2026-01-05 11:25:54 +01:00
appel_c dfe914bb7e feat(help-inspector): add help inspector widget 2026-01-05 11:25:54 +01:00
wyzula_j b66353bf6e fix(signal_label): dispatcher unsubscribed in the cleanup 2026-01-05 11:25:54 +01:00
wyzula_j ead1d38b49 fix(client): abort, reset, stop button removed from RPC access 2026-01-05 11:25:54 +01:00
wyzula_j b2505c6a56 feat(main_app): main app with interactive app switcher 2026-01-05 11:25:54 +01:00
wyzula_j 663c00f1a4 feat(actions): actions can be created with label text with beside or under alignment 2026-01-05 11:25:54 +01:00
wyzula_j 3dd688540e feat(busy_loader): busy loader added to bec widget base class 2026-01-05 11:25:54 +01:00
wakonig_k 092ac915a8 feat: add SafeConnect 2026-01-05 11:25:54 +01:00
wyzula_j 03015a72a6 fix(bec_widgets): adapt to bec_qthemes 1.0; themes can be only applied on living Qt objects 2026-01-05 11:25:54 +01:00
wyzula_j 7dcaf8fe4c feat(advanced_dock_area): added ads based dock area with profiles 2026-01-05 11:25:54 +01:00
wyzula_j 02db6307e4 fix(web_console): added startup kwarg 2026-01-05 11:25:54 +01:00
wyzula_j 3a10cac7c8 refactor(bec_main_window): main app theme renamed to View 2026-01-05 11:25:54 +01:00
wyzula_j 64fecd16dd fix(widget_state_manager): state manager can save all properties recursively to already existing settings 2026-01-05 11:25:54 +01:00
wyzula_j 76639b3e04 feat(bec_widget): attach/detach method for all widgets + client regenerated 2026-01-05 11:25:54 +01:00
wyzula_j a767ee8331 feat(widget_io): widget hierarchy can grap all bec connectors from the widget recursively 2026-01-05 11:25:54 +01:00
wyzula_j 5c33f1a6d4 fix(bec_connector): widget_removed and name_established signals added 2026-01-05 11:25:54 +01:00
wakonig_k af320d812b ci: install ttyd 2026-01-05 11:25:54 +01:00
wakonig_k 5bfb50fdc6 ci: add artifact upload 2026-01-05 11:25:54 +01:00
wyzula_j 5393a84494 build: PySide6-QtAds; bec_qtheme V1; dependencies updated and adjusted 2026-01-05 11:25:54 +01:00
362 changed files with 9289 additions and 25477 deletions
+1 -1
View File
@@ -62,4 +62,4 @@ runs:
uv pip install --system -e ./ophyd_devices
uv pip install --system -e ./bec/bec_lib[dev]
uv pip install --system -e ./bec/bec_ipython_client
uv pip install --system -e ./bec_widgets[dev,qtermwidget]
uv pip install --system -e ./bec_widgets[dev,pyside6]
-169
View File
@@ -1,169 +0,0 @@
##########################
### AI-generated file. ###
##########################
"""Aggregate and merge benchmark JSON files.
The workflow runs the same benchmark suite on multiple independent runners.
This script reads every JSON file produced by those attempts, normalizes the
contained benchmark values, and writes a compact mapping JSON where each value is
the median across attempts. It can also merge independent hyperfine JSON files
from one runner into a single hyperfine-style JSON file.
"""
from __future__ import annotations
import argparse
import json
import statistics
from pathlib import Path
from typing import Any
from compare_benchmarks import Benchmark, extract_benchmarks
def collect_benchmarks(paths: list[Path]) -> dict[str, list[Benchmark]]:
"""Collect benchmarks from multiple JSON files.
Args:
paths (list[Path]): Paths to hyperfine, pytest-benchmark, or compact
mapping JSON files.
Returns:
dict[str, list[Benchmark]]: Benchmarks grouped by benchmark name.
"""
collected: dict[str, list[Benchmark]] = {}
for path in paths:
for name, benchmark in extract_benchmarks(path).items():
collected.setdefault(name, []).append(benchmark)
return collected
def aggregate(collected: dict[str, list[Benchmark]]) -> dict[str, dict[str, object]]:
"""Aggregate grouped benchmarks using the median value.
Args:
collected (dict[str, list[Benchmark]]): Benchmarks grouped by benchmark
name.
Returns:
dict[str, dict[str, object]]: Compact mapping JSON data. Each benchmark
contains ``value``, ``unit``, ``metric``, ``attempts``, and
``attempt_values``.
"""
aggregated: dict[str, dict[str, object]] = {}
for name, benchmarks in sorted(collected.items()):
values = [benchmark.value for benchmark in benchmarks]
unit = next((benchmark.unit for benchmark in benchmarks if benchmark.unit), "")
metric = next((benchmark.metric for benchmark in benchmarks if benchmark.metric), "value")
aggregated[name] = {
"value": statistics.median(values),
"unit": unit,
"metric": f"median-of-attempt-{metric}",
"attempts": len(values),
"attempt_values": values,
}
return aggregated
def merge_hyperfine_results(paths: list[Path]) -> dict[str, Any]:
"""Merge hyperfine result files.
Args:
paths (list[Path]): Hyperfine JSON files to merge.
Returns:
dict[str, Any]: Hyperfine-style JSON object containing all result rows.
Raises:
ValueError: If any file has no hyperfine ``results`` list.
"""
merged: dict[str, Any] = {"results": []}
for path in paths:
data = json.loads(path.read_text(encoding="utf-8"))
results = data.get("results", []) if isinstance(data, dict) else None
if not isinstance(results, list):
raise ValueError(f"{path} has no hyperfine results list")
merged["results"].extend(results)
return merged
def main_from_paths(input_dir: Path, output: Path) -> int:
"""Aggregate all JSON files in a directory and write the result.
Args:
input_dir (Path): Directory containing benchmark JSON files.
output (Path): Path where the aggregate JSON should be written.
Returns:
int: Always ``0`` on success.
Raises:
ValueError: If no JSON files are found in ``input_dir``.
"""
paths = sorted(input_dir.rglob("*.json"))
if not paths:
raise ValueError(f"No benchmark JSON files found in {input_dir}")
output.parent.mkdir(parents=True, exist_ok=True)
output.write_text(
json.dumps(aggregate(collect_benchmarks(paths)), indent=2, sort_keys=True) + "\n",
encoding="utf-8",
)
return 0
def merge_from_paths(input_dir: Path, output: Path) -> int:
"""Merge all hyperfine JSON files in a directory and write the result.
Args:
input_dir (Path): Directory containing hyperfine JSON files.
output (Path): Path where the merged JSON should be written.
Returns:
int: Always ``0`` on success.
Raises:
ValueError: If no JSON files are found in ``input_dir``.
"""
paths = sorted(input_dir.glob("*.json"))
if not paths:
raise ValueError(f"No hyperfine JSON files found in {input_dir}")
output.parent.mkdir(parents=True, exist_ok=True)
output.write_text(
json.dumps(merge_hyperfine_results(paths), indent=2, sort_keys=True) + "\n",
encoding="utf-8",
)
return 0
def main() -> int:
"""Run the benchmark aggregation command line interface.
Returns:
int: Always ``0`` on success.
"""
parser = argparse.ArgumentParser()
parser.add_argument(
"--mode",
choices=("aggregate", "merge-hyperfine"),
default="aggregate",
help="Operation to perform.",
)
parser.add_argument("--input-dir", required=True, type=Path)
parser.add_argument("--output", required=True, type=Path)
args = parser.parse_args()
if args.mode == "merge-hyperfine":
return merge_from_paths(input_dir=args.input_dir, output=args.output)
return main_from_paths(input_dir=args.input_dir, output=args.output)
if __name__ == "__main__":
raise SystemExit(main())
-454
View File
@@ -1,454 +0,0 @@
##########################
### AI-generated file. ###
##########################
"""Compare benchmark JSON files and write a GitHub Actions summary.
The script supports JSON emitted by hyperfine, JSON emitted by pytest-benchmark,
and a compact mapping format generated by ``aggregate_benchmarks.py``. Timing
formats prefer median values and fall back to mean values when median values are
not present.
"""
from __future__ import annotations
import argparse
import json
import math
from dataclasses import dataclass
from pathlib import Path
from typing import Any
@dataclass(frozen=True)
class Benchmark:
"""Normalized benchmark result.
Attributes:
name (str): Stable benchmark name used to match baseline and current results.
value (float): Numeric benchmark value used for comparison.
unit (str): Display unit for the value, for example ``"s"``.
metric (str): Source metric name, for example ``"median"`` or ``"mean"``.
"""
name: str
value: float
unit: str
metric: str = "value"
@dataclass(frozen=True)
class Comparison:
"""Comparison between one baseline benchmark and one current benchmark.
Attributes:
name (str): Benchmark name.
baseline (float): Baseline benchmark value.
current (float): Current benchmark value.
delta_percent (float): Percent change from baseline to current.
unit (str): Display unit for both values.
metric (str): Current result metric used for comparison.
regressed (bool): Whether the change exceeds the configured threshold in
the worse direction.
improved (bool): Whether the change exceeds the configured threshold in
the better direction.
"""
name: str
baseline: float
current: float
delta_percent: float
unit: str
metric: str
regressed: bool
improved: bool
def _read_json(path: Path) -> Any:
"""Read JSON data from a file.
Args:
path (Path): Path to the JSON file.
Returns:
Any: Parsed JSON value.
"""
with path.open("r", encoding="utf-8") as stream:
return json.load(stream)
def _as_float(value: Any) -> float | None:
"""Convert a value to a finite float.
Args:
value (Any): Value to convert.
Returns:
float | None: Converted finite float, or ``None`` if conversion fails.
"""
try:
result = float(value)
except (TypeError, ValueError):
return None
if math.isfinite(result):
return result
return None
def _extract_hyperfine(data: dict[str, Any]) -> dict[str, Benchmark]:
"""Extract normalized benchmarks from hyperfine JSON.
Args:
data (dict[str, Any]): Parsed hyperfine JSON object.
Returns:
dict[str, Benchmark]: Benchmarks keyed by command name.
"""
benchmarks: dict[str, Benchmark] = {}
for result in data.get("results", []):
if not isinstance(result, dict):
continue
name = str(result.get("command") or result.get("name") or "").strip()
metric = "median"
value = _as_float(result.get(metric))
if value is None:
metric = "mean"
value = _as_float(result.get(metric))
if name and value is not None:
benchmarks[name] = Benchmark(name=name, value=value, unit="s", metric=metric)
return benchmarks
def _extract_pytest_benchmark(data: dict[str, Any]) -> dict[str, Benchmark]:
"""Extract normalized benchmarks from pytest-benchmark JSON.
Args:
data (dict[str, Any]): Parsed pytest-benchmark JSON object.
Returns:
dict[str, Benchmark]: Benchmarks keyed by full benchmark name.
"""
benchmarks: dict[str, Benchmark] = {}
for benchmark in data.get("benchmarks", []):
if not isinstance(benchmark, dict):
continue
name = str(benchmark.get("fullname") or benchmark.get("name") or "").strip()
stats = benchmark.get("stats", {})
value = None
metric = "median"
if isinstance(stats, dict):
value = _as_float(stats.get(metric))
if value is None:
metric = "mean"
value = _as_float(stats.get(metric))
if name and value is not None:
benchmarks[name] = Benchmark(name=name, value=value, unit="s", metric=metric)
return benchmarks
def _extract_simple_mapping(data: dict[str, Any]) -> dict[str, Benchmark]:
"""Extract normalized benchmarks from a compact mapping JSON object.
Args:
data (dict[str, Any]): Parsed mapping where each benchmark is either a
raw number or an object containing ``value``, ``unit``, and ``metric``.
Returns:
dict[str, Benchmark]: Benchmarks keyed by mapping key.
"""
benchmarks: dict[str, Benchmark] = {}
for name, raw_value in data.items():
if name in {"version", "context", "commit", "timestamp"}:
continue
value = _as_float(raw_value)
unit = ""
metric = "value"
if value is None and isinstance(raw_value, dict):
value = _as_float(raw_value.get("value"))
unit = str(raw_value.get("unit") or "")
metric = str(raw_value.get("metric") or "value")
if value is not None:
benchmarks[str(name)] = Benchmark(name=str(name), value=value, unit=unit, metric=metric)
return benchmarks
def extract_benchmarks(path: Path) -> dict[str, Benchmark]:
"""Extract normalized benchmarks from a supported JSON file.
Args:
path (Path): Path to a hyperfine, pytest-benchmark, or compact mapping
JSON file.
Returns:
dict[str, Benchmark]: Normalized benchmarks keyed by name.
Raises:
ValueError: If the JSON root is not an object or no supported benchmark
entries can be extracted.
"""
data = _read_json(path)
if not isinstance(data, dict):
raise ValueError(f"{path} must contain a JSON object")
extractors = (_extract_hyperfine, _extract_pytest_benchmark, _extract_simple_mapping)
for extractor in extractors:
benchmarks = extractor(data)
if benchmarks:
return benchmarks
raise ValueError(f"No supported benchmark entries found in {path}")
def compare_benchmarks(
baseline: dict[str, Benchmark],
current: dict[str, Benchmark],
threshold_percent: float,
higher_is_better: bool,
) -> tuple[list[Comparison], list[str], list[str]]:
"""Compare baseline benchmarks with current benchmarks.
Args:
baseline (dict[str, Benchmark]): Baseline benchmarks keyed by name.
current (dict[str, Benchmark]): Current benchmarks keyed by name.
threshold_percent (float): Regression threshold in percent.
higher_is_better (bool): If ``True``, lower current values are treated as
regressions. If ``False``, higher current values are treated as
regressions.
Returns:
tuple[list[Comparison], list[str], list[str]]: Comparisons for common
benchmark names, names missing from current results, and names newly
present in current results.
"""
comparisons: list[Comparison] = []
missing_in_current: list[str] = []
new_in_current: list[str] = []
for name, baseline_benchmark in sorted(baseline.items()):
current_benchmark = current.get(name)
if current_benchmark is None:
missing_in_current.append(name)
continue
if baseline_benchmark.value == 0:
delta_percent = 0.0
else:
delta_percent = (
(current_benchmark.value - baseline_benchmark.value)
/ abs(baseline_benchmark.value)
* 100
)
if higher_is_better:
regressed = delta_percent <= -threshold_percent
improved = delta_percent >= threshold_percent
else:
regressed = delta_percent >= threshold_percent
improved = delta_percent <= -threshold_percent
comparisons.append(
Comparison(
name=name,
baseline=baseline_benchmark.value,
current=current_benchmark.value,
delta_percent=delta_percent,
unit=current_benchmark.unit or baseline_benchmark.unit,
metric=current_benchmark.metric,
regressed=regressed,
improved=improved,
)
)
for name in sorted(set(current) - set(baseline)):
new_in_current.append(name)
return comparisons, missing_in_current, new_in_current
def _format_value(value: float, unit: str) -> str:
"""Format a benchmark value for Markdown output.
Args:
value (float): Numeric benchmark value.
unit (str): Display unit.
Returns:
str: Formatted value with optional unit suffix.
"""
suffix = f" {unit}" if unit else ""
return f"{value:.6g}{suffix}"
def _format_status(comparison: Comparison) -> str:
"""Format a comparison status for Markdown output."""
if comparison.regressed:
return ":red_circle: regressed"
if comparison.improved:
return ":green_circle: improved"
return "ok"
def write_summary(
path: Path,
comparisons: list[Comparison],
missing_in_current: list[str],
new_in_current: list[str],
threshold_percent: float,
higher_is_better: bool,
) -> None:
"""Write a Markdown benchmark comparison summary.
Args:
path (Path): Path where the summary should be written.
comparisons (list[Comparison]): Comparison rows for matching benchmarks.
missing_in_current (list[str]): Baseline benchmark names missing from the
current result.
new_in_current (list[str]): Current benchmark names not present in the
baseline result.
threshold_percent (float): Regression threshold in percent.
higher_is_better (bool): Whether higher benchmark values are considered
better.
"""
regressions = [comparison for comparison in comparisons if comparison.regressed]
improvements = [comparison for comparison in comparisons if comparison.improved]
direction = "higher is better" if higher_is_better else "lower is better"
sorted_comparisons = sorted(comparisons, key=lambda comparison: comparison.name)
lines = [
"<!-- bw-benchmark-comment -->",
"## Benchmark comparison",
"",
f"Threshold: {threshold_percent:g}% ({direction}).",
f"Result: {len(regressions)} regression(s), {len(improvements)} improvement(s) beyond threshold.",
]
lines.append("")
if regressions:
lines.extend(
[
f"{len(regressions)} benchmark(s) regressed beyond the configured threshold.",
"",
"| Benchmark | Baseline | Current | Change |",
"| --- | ---: | ---: | ---: |",
]
)
for comparison in regressions:
lines.append(
"| "
f"{comparison.name} | "
f"{_format_value(comparison.baseline, comparison.unit)} | "
f"{_format_value(comparison.current, comparison.unit)} | "
f"{comparison.delta_percent:+.2f}% |"
)
else:
lines.append("No benchmark regression exceeded the configured threshold.")
lines.append("")
if improvements:
lines.extend(
[
f"{len(improvements)} benchmark(s) improved beyond the configured threshold.",
"",
"| Benchmark | Baseline | Current | Change |",
"| --- | ---: | ---: | ---: |",
]
)
for comparison in improvements:
lines.append(
"| "
f"{comparison.name} | "
f"{_format_value(comparison.baseline, comparison.unit)} | "
f"{_format_value(comparison.current, comparison.unit)} | "
f"{comparison.delta_percent:+.2f}% |"
)
else:
lines.append("No benchmark improvement exceeded the configured threshold.")
if sorted_comparisons:
lines.extend(
[
"",
"<details>",
"<summary>All benchmark results</summary>",
"",
"| Benchmark | Baseline | Current | Change | Status |",
"| --- | ---: | ---: | ---: | --- |",
]
)
for comparison in sorted_comparisons:
lines.append(
"| "
f"{comparison.name} | "
f"{_format_value(comparison.baseline, comparison.unit)} | "
f"{_format_value(comparison.current, comparison.unit)} | "
f"{comparison.delta_percent:+.2f}% | "
f"{_format_status(comparison)} |"
)
lines.extend(["", "</details>"])
if missing_in_current:
lines.extend(["", "Missing benchmarks in the current run:"])
lines.extend(f"- `{name}`" for name in missing_in_current)
if new_in_current:
lines.extend(["", "New benchmarks in the current run:"])
lines.extend(f"- `{name}`" for name in new_in_current)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text("\n".join(lines) + "\n", encoding="utf-8")
def main() -> int:
"""Run the benchmark comparison command line interface.
Returns:
int: ``1`` when a regression exceeds the threshold, otherwise ``0``.
"""
parser = argparse.ArgumentParser()
parser.add_argument("--baseline", required=True, type=Path)
parser.add_argument("--current", required=True, type=Path)
parser.add_argument("--summary", required=True, type=Path)
parser.add_argument("--threshold-percent", required=True, type=float)
parser.add_argument("--higher-is-better", action="store_true")
args = parser.parse_args()
baseline = extract_benchmarks(args.baseline)
current = extract_benchmarks(args.current)
comparisons, missing_in_current, new_in_current = compare_benchmarks(
baseline=baseline,
current=current,
threshold_percent=args.threshold_percent,
higher_is_better=args.higher_is_better,
)
write_summary(
path=args.summary,
comparisons=comparisons,
missing_in_current=missing_in_current,
new_in_current=new_in_current,
threshold_percent=args.threshold_percent,
higher_is_better=args.higher_is_better,
)
return 1 if any(comparison.regressed for comparison in comparisons) else 0
if __name__ == "__main__":
raise SystemExit(main())
-74
View File
@@ -1,74 +0,0 @@
#!/usr/bin/env bash
##########################
### AI-generated file. ###
##########################
set -euo pipefail
mkdir -p benchmark-results
benchmark_json="${BENCHMARK_JSON:-benchmark-results/current.json}"
benchmark_root="$(dirname "$benchmark_json")"
hyperfine_benchmark_dir="${BENCHMARK_HYPERFINE_DIR:-tests/benchmarks/hyperfine}"
pytest_benchmark_dirs="${BENCHMARK_PYTEST_DIRS:-${BENCHMARK_PYTEST_DIR:-}}"
benchmark_work_dir="$benchmark_root/raw-results"
hyperfine_json_dir="$benchmark_work_dir/hyperfine"
pytest_json="$benchmark_work_dir/pytest.json"
shopt -s nullglob
benchmark_scripts=()
benchmark_scripts=("$hyperfine_benchmark_dir"/benchmark_*.sh)
shopt -u nullglob
pytest_dirs=()
for pytest_benchmark_dir in $pytest_benchmark_dirs; do
if [ -d "$pytest_benchmark_dir" ]; then
pytest_dirs+=("$pytest_benchmark_dir")
else
echo "Pytest benchmark directory not found: $pytest_benchmark_dir" >&2
exit 1
fi
done
if [ "${#benchmark_scripts[@]}" -eq 0 ] && [ "${#pytest_dirs[@]}" -eq 0 ]; then
echo "No benchmark scripts or pytest benchmarks found" >&2
exit 1
fi
echo "Benchmark Python: $(command -v python)"
python -c 'import sys; print(sys.version)'
rm -rf "$benchmark_work_dir"
mkdir -p "$hyperfine_json_dir"
if [ "${#benchmark_scripts[@]}" -gt 0 ]; then
for benchmark_script in "${benchmark_scripts[@]}"; do
title="$(sed -n 's/^# BENCHMARK_TITLE:[[:space:]]*//p' "$benchmark_script" | head -n 1)"
if [ -z "$title" ]; then
title="$(basename "$benchmark_script" .sh)"
fi
benchmark_name="$(basename "$benchmark_script" .sh)"
benchmark_result_json="$hyperfine_json_dir/$benchmark_name.json"
echo "Preflight benchmark script: $benchmark_script"
bash "$benchmark_script"
hyperfine \
--show-output \
--warmup 1 \
--runs 5 \
--command-name "$title" \
--export-json "$benchmark_result_json" \
"bash $(printf "%q" "$benchmark_script")"
done
fi
if [ "${#pytest_dirs[@]}" -gt 0 ]; then
pytest \
-q "${pytest_dirs[@]}" \
--benchmark-only \
--benchmark-json "$pytest_json"
fi
python .github/scripts/aggregate_benchmarks.py \
--input-dir "$benchmark_work_dir" \
--output "$benchmark_json"
-125
View File
@@ -1,125 +0,0 @@
##########################
### AI-generated file. ###
##########################
"""Run a command with BEC e2e services available."""
from __future__ import annotations
import argparse
import os
import shutil
import subprocess
import tempfile
import time
from pathlib import Path
import bec_lib
from bec_ipython_client import BECIPythonClient
from bec_lib.redis_connector import RedisConnector
from bec_lib.service_config import ServiceConfig, ServiceConfigModel
from redis import Redis
def _wait_for_redis(host: str, port: int) -> None:
client = Redis(host=host, port=port)
deadline = time.monotonic() + 10
while time.monotonic() < deadline:
try:
if client.ping():
return
except Exception:
time.sleep(0.1)
raise RuntimeError(f"Redis did not start on {host}:{port}")
def _start_redis(files_path: Path, host: str, port: int) -> subprocess.Popen:
redis_server = shutil.which("redis-server")
if redis_server is None:
raise RuntimeError("redis-server executable not found")
return subprocess.Popen(
[
redis_server,
"--bind",
host,
"--port",
str(port),
"--save",
"",
"--appendonly",
"no",
"--dir",
str(files_path),
]
)
def _write_configs(files_path: Path, host: str, port: int) -> Path:
test_config = files_path / "test_config.yaml"
services_config = files_path / "services_config.yaml"
bec_lib_path = Path(bec_lib.__file__).resolve().parent
shutil.copyfile(bec_lib_path / "tests" / "test_config.yaml", test_config)
service_config = ServiceConfigModel(
redis={"host": host, "port": port}, file_writer={"base_path": str(files_path)}
)
services_config.write_text(service_config.model_dump_json(indent=4), encoding="utf-8")
return services_config
def _load_demo_config(services_config: Path) -> None:
bec = BECIPythonClient(ServiceConfig(services_config), RedisConnector, forced=True)
bec.start()
try:
bec.config.load_demo_config()
finally:
bec.shutdown()
bec._client._reset_singleton()
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("command", nargs=argparse.REMAINDER)
args = parser.parse_args()
if args.command[:1] == ["--"]:
args.command = args.command[1:]
if not args.command:
raise ValueError("No command provided")
host = "127.0.0.1"
port = 6379
with tempfile.TemporaryDirectory(prefix="bec-benchmark-") as tmp:
files_path = Path(tmp)
services_config = _write_configs(files_path, host, port)
redis_process = _start_redis(files_path, host, port)
processes = None
service_handler = None
try:
_wait_for_redis(host, port)
from bec_server.bec_server_utils.service_handler import ServiceHandler
service_handler = ServiceHandler(
bec_path=files_path, config_path=services_config, interface="subprocess"
)
processes = service_handler.start()
_load_demo_config(services_config)
env = os.environ.copy()
return subprocess.run(args.command, env=env, check=False).returncode
finally:
if service_handler is not None and processes is not None:
service_handler.stop(processes)
redis_process.terminate()
try:
redis_process.wait(timeout=10)
except subprocess.TimeoutExpired:
redis_process.kill()
if __name__ == "__main__":
raise SystemExit(main())
-242
View File
@@ -1,242 +0,0 @@
name: BW Benchmarks
on: [ workflow_call ]
permissions:
contents: read
env:
BENCHMARK_JSON: benchmark-results/current.json
BENCHMARK_BASELINE_JSON: gh-pages-benchmark-data/benchmarks/latest.json
BENCHMARK_SUMMARY: benchmark-results/summary.md
BENCHMARK_COMMAND: "bash .github/scripts/run_benchmarks.sh"
BENCHMARK_THRESHOLD_PERCENT: 20
BENCHMARK_HIGHER_IS_BETTER: false
jobs:
benchmark_attempt:
runs-on: ubuntu-latest
continue-on-error: true
permissions:
contents: read
defaults:
run:
shell: bash -el {0}
strategy:
fail-fast: false
matrix:
attempt: [ 1, 2, 3 ]
env:
BENCHMARK_JSON: benchmark-results/current-${{ matrix.attempt }}.json
BEC_CORE_BRANCH: main
OPHYD_DEVICES_BRANCH: main
PLUGIN_REPO_BRANCH: main
BENCHMARK_PYTEST_DIRS: tests/unit_tests/benchmarks
QTWEBENGINE_DISABLE_SANDBOX: 1
QT_QPA_PLATFORM: "offscreen"
steps:
- name: Checkout BEC Widgets
uses: actions/checkout@v4
with:
repository: bec-project/bec_widgets
ref: ${{ github.event.pull_request.head.sha || github.sha }}
- name: Set up Conda
uses: conda-incubator/setup-miniconda@v3
with:
auto-update-conda: true
auto-activate-base: true
python-version: "3.11"
- name: Install system dependencies
run: |
sudo apt-get update
sudo apt-get install -y libgl1 libegl1 x11-utils libxkbcommon-x11-0 libdbus-1-3 xvfb
sudo apt-get -y install libnss3 libxdamage1 libasound2t64 libatomic1 libxcursor1
sudo apt-get -y install ttyd hyperfine redis-server
- name: Install full e2e environment
run: |
echo -e "\033[35;1m Using branch $BEC_CORE_BRANCH of BEC CORE \033[0;m";
git clone --branch "$BEC_CORE_BRANCH" https://github.com/bec-project/bec.git
echo -e "\033[35;1m Using branch $OPHYD_DEVICES_BRANCH of OPHYD_DEVICES \033[0;m";
git clone --branch "$OPHYD_DEVICES_BRANCH" https://github.com/bec-project/ophyd_devices.git
export OHPYD_DEVICES_PATH=$PWD/ophyd_devices
echo -e "\033[35;1m Using branch $PLUGIN_REPO_BRANCH of bec_testing_plugin \033[0;m";
git clone --branch "$PLUGIN_REPO_BRANCH" https://github.com/bec-project/bec_testing_plugin.git
cd ./bec
conda create -q -n test-environment python=3.11
conda activate test-environment
source ./bin/install_bec_dev.sh -t
cd ../
python -m pip install -e ./ophyd_devices -e .[dev,pyside6] -e ./bec_testing_plugin pytest-benchmark
mkdir -p "$(dirname "$BENCHMARK_JSON")"
python .github/scripts/run_with_bec_servers.py -- bash -lc "$BENCHMARK_COMMAND"
test -s "$BENCHMARK_JSON"
- name: Upload benchmark artifact
uses: actions/upload-artifact@v4
with:
name: bw-benchmark-json-${{ matrix.attempt }}
path: ${{ env.BENCHMARK_JSON }}
benchmark:
needs: [ benchmark_attempt ]
runs-on: ubuntu-latest
permissions:
contents: read
issues: write
pull-requests: write
steps:
- name: Checkout BEC Widgets
uses: actions/checkout@v4
with:
repository: bec-project/bec_widgets
ref: ${{ github.event.pull_request.head.sha || github.sha }}
- name: Download benchmark attempts
uses: actions/download-artifact@v4
with:
pattern: bw-benchmark-json-*
path: benchmark-results/attempts
merge-multiple: true
- name: Aggregate benchmark attempts
run: |
python .github/scripts/aggregate_benchmarks.py \
--input-dir benchmark-results/attempts \
--output "$BENCHMARK_JSON"
- name: Upload aggregate benchmark artifact
uses: actions/upload-artifact@v4
with:
name: bw-benchmark-json
path: ${{ env.BENCHMARK_JSON }}
- name: Fetch gh-pages benchmark data
run: |
if git ls-remote --exit-code --heads origin gh-pages; then
git clone --depth=1 --branch gh-pages "$GITHUB_SERVER_URL/$GITHUB_REPOSITORY.git" gh-pages-benchmark-data
else
mkdir -p gh-pages-benchmark-data
fi
- name: Compare with latest gh-pages benchmark
id: compare
continue-on-error: true
run: |
if [ ! -s "$BENCHMARK_BASELINE_JSON" ]; then
mkdir -p "$(dirname "$BENCHMARK_SUMMARY")"
{
echo "<!-- bw-benchmark-comment -->"
echo "## Benchmark comparison"
echo
echo "No benchmark baseline was found on gh-pages."
} > "$BENCHMARK_SUMMARY"
exit 0
fi
args=(
--baseline "$BENCHMARK_BASELINE_JSON"
--current "$BENCHMARK_JSON"
--summary "$BENCHMARK_SUMMARY"
--threshold-percent "$BENCHMARK_THRESHOLD_PERCENT"
)
if [ "$BENCHMARK_HIGHER_IS_BETTER" = "true" ]; then
args+=(--higher-is-better)
fi
set +e
python .github/scripts/compare_benchmarks.py "${args[@]}"
status=$?
set -e
if [ ! -s "$BENCHMARK_SUMMARY" ]; then
mkdir -p "$(dirname "$BENCHMARK_SUMMARY")"
{
echo "<!-- bw-benchmark-comment -->"
echo "## Benchmark comparison"
echo
echo "Benchmark comparison failed before writing a summary."
} > "$BENCHMARK_SUMMARY"
fi
exit "$status"
- name: Find existing benchmark PR comment
if: github.event_name == 'pull_request'
id: fc
uses: peter-evans/find-comment@v3
with:
issue-number: ${{ github.event.pull_request.number }}
comment-author: github-actions[bot]
body-includes: "<!-- bw-benchmark-comment -->"
- name: Create or update benchmark PR comment
if: github.event_name == 'pull_request'
uses: peter-evans/create-or-update-comment@v5
with:
issue-number: ${{ github.event.pull_request.number }}
comment-id: ${{ steps.fc.outputs.comment-id }}
body-path: ${{ env.BENCHMARK_SUMMARY }}
edit-mode: replace
- name: Fail on benchmark regression
if: github.event_name == 'pull_request' && steps.compare.outcome == 'failure'
run: exit 1
publish:
needs: [ benchmark ]
if: github.event_name == 'push' && github.ref == 'refs/heads/main'
runs-on: ubuntu-latest
permissions:
contents: write
steps:
- name: Checkout BEC Widgets
uses: actions/checkout@v4
with:
repository: bec-project/bec_widgets
ref: ${{ github.sha }}
- name: Download aggregate benchmark artifact
uses: actions/download-artifact@v4
with:
name: bw-benchmark-json
path: benchmark-results
- name: Verify aggregate benchmark artifact
run: test -s "$BENCHMARK_JSON"
- name: Prepare gh-pages for publishing
run: |
# Clean up any existing worktree/directory
if [ -d gh-pages-benchmark-data ]; then
git worktree remove gh-pages-benchmark-data --force || rm -rf gh-pages-benchmark-data
fi
if git ls-remote --exit-code --heads origin gh-pages; then
git fetch --depth=1 origin gh-pages
git worktree add gh-pages-benchmark-data FETCH_HEAD
else
git worktree add --detach gh-pages-benchmark-data
git -C gh-pages-benchmark-data checkout --orphan gh-pages
git -C gh-pages-benchmark-data rm -rf .
fi
- name: Publish benchmark data to gh-pages
working-directory: gh-pages-benchmark-data
run: |
mkdir -p benchmarks/history
cp "../$BENCHMARK_JSON" benchmarks/latest.json
cp "../$BENCHMARK_JSON" "benchmarks/history/${GITHUB_SHA}.json"
git config user.name "github-actions[bot]"
git config user.email "41898282+github-actions[bot]@users.noreply.github.com"
git add benchmarks/latest.json "benchmarks/history/${GITHUB_SHA}.json"
git commit -m "Update BW benchmark data for ${GITHUB_SHA}" || exit 0
git push origin HEAD:gh-pages
-21
View File
@@ -45,18 +45,6 @@ jobs:
cd ./bec
pip install pytest pytest-random-order
pytest -v --maxfail=2 --junitxml=report.xml --random-order ./bec_server/tests ./bec_ipython_client/tests/client_tests ./bec_lib/tests
- name: Upload BEC unit test artifacts if job fails
if: failure()
uses: actions/upload-artifact@v4
with:
name: bec-unit-test-artifacts
path: |
./bec/report.xml
./bec/logs/*.log
if-no-files-found: ignore
retention-days: 7
bec-e2e-test:
name: BEC End2End Tests
runs-on: ubuntu-latest
@@ -74,12 +62,3 @@ jobs:
OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH }}
BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH }}
PYTHON_VERSION: '3.11'
- name: Upload BEC e2e logs if job fails
if: failure()
uses: actions/upload-artifact@v4
with:
name: bec-e2e-test-logs
path: ./_e2e_test_checkout_/bec/logs/*.log
if-no-files-found: ignore
retention-days: 7
+7 -21
View File
@@ -1,29 +1,24 @@
name: Full CI
on:
on:
push:
pull_request:
workflow_dispatch:
inputs:
BEC_WIDGETS_BRANCH:
description: "Branch of BEC Widgets to install"
description: 'Branch of BEC Widgets to install'
required: false
type: string
BEC_CORE_BRANCH:
description: "Branch of BEC Core to install"
description: 'Branch of BEC Core to install'
required: false
type: string
OPHYD_DEVICES_BRANCH:
description: "Branch of Ophyd Devices to install"
description: 'Branch of Ophyd Devices to install'
required: false
type: string
concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true
permissions:
pull-requests: write
contents: read
jobs:
check_pr_status:
@@ -34,15 +29,6 @@ jobs:
if: needs.check_pr_status.outputs.branch-pr == ''
uses: ./.github/workflows/formatter.yml
benchmark:
needs: [check_pr_status]
if: needs.check_pr_status.outputs.branch-pr == ''
permissions:
contents: write
issues: write
pull-requests: write
uses: ./.github/workflows/benchmark.yml
unit-test:
needs: [check_pr_status, formatter]
if: needs.check_pr_status.outputs.branch-pr == ''
@@ -79,9 +65,9 @@ jobs:
uses: ./.github/workflows/child_repos.yml
with:
BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main'}}
OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main'}}
BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha }}
plugin_repos:
needs: [check_pr_status, formatter]
if: needs.check_pr_status.outputs.branch-pr == ''
@@ -91,4 +77,4 @@ jobs:
BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha }}
secrets:
GH_READ_TOKEN: ${{ secrets.GH_READ_TOKEN }}
GH_READ_TOKEN: ${{ secrets.GH_READ_TOKEN }}
+1 -1
View File
@@ -55,5 +55,5 @@ jobs:
uses: actions/upload-artifact@v4
with:
name: pytest-logs
path: ./bec/logs/*.log
path: ./logs/*.log
retention-days: 7
+13 -12
View File
@@ -1,25 +1,25 @@
name: Run Pytest with different Python versions
on:
on:
workflow_call:
inputs:
pr_number:
description: "Pull request number"
description: 'Pull request number'
required: false
type: number
BEC_CORE_BRANCH:
description: "Branch of BEC Core to install"
description: 'Branch of BEC Core to install'
required: false
default: "main"
default: 'main'
type: string
OPHYD_DEVICES_BRANCH:
description: "Branch of Ophyd Devices to install"
description: 'Branch of Ophyd Devices to install'
required: false
default: "main"
default: 'main'
type: string
BEC_WIDGETS_BRANCH:
description: "Branch of BEC Widgets to install"
description: 'Branch of BEC Widgets to install'
required: false
default: "main"
default: 'main'
type: string
jobs:
@@ -30,14 +30,15 @@ jobs:
python-version: ["3.11", "3.12", "3.13"]
env:
BEC_WIDGETS_BRANCH: main # Set the branch you want for bec_widgets
BEC_CORE_BRANCH: main # Set the branch you want for bec
OPHYD_DEVICES_BRANCH: main # Set the branch you want for ophyd_devices
BEC_WIDGETS_BRANCH: main # Set the branch you want for bec_widgets
BEC_CORE_BRANCH: main # Set the branch you want for bec
OPHYD_DEVICES_BRANCH: main # Set the branch you want for ophyd_devices
PROJECT_PATH: ${{ github.repository }}
QTWEBENGINE_DISABLE_SANDBOX: 1
QT_QPA_PLATFORM: "offscreen"
steps:
- name: Checkout BEC Widgets
uses: actions/checkout@v4
with:
@@ -55,4 +56,4 @@ jobs:
- name: Run Pytest
run: |
pip install pytest pytest-random-order
pytest -v --junitxml=report.xml --random-order --ignore=tests/unit_tests/benchmarks ./tests/unit_tests
pytest -v --junitxml=report.xml --random-order ./tests/unit_tests
+12 -10
View File
@@ -1,30 +1,32 @@
name: Run Pytest with Coverage
on:
on:
workflow_call:
inputs:
pr_number:
description: "Pull request number"
description: 'Pull request number'
required: false
type: number
BEC_CORE_BRANCH:
description: "Branch of BEC Core to install"
description: 'Branch of BEC Core to install'
required: false
default: "main"
default: 'main'
type: string
OPHYD_DEVICES_BRANCH:
description: "Branch of Ophyd Devices to install"
description: 'Branch of Ophyd Devices to install'
required: false
default: "main"
default: 'main'
type: string
BEC_WIDGETS_BRANCH:
description: "Branch of BEC Widgets to install"
description: 'Branch of BEC Widgets to install'
required: false
default: "main"
default: 'main'
type: string
secrets:
CODECOV_TOKEN:
required: true
permissions:
pull-requests: write
@@ -53,7 +55,7 @@ jobs:
- name: Run Pytest with Coverage
id: coverage
run: pytest --random-order --cov=bec_widgets --cov-config=pyproject.toml --cov-branch --cov-report=xml --no-cov-on-fail --ignore=tests/unit_tests/benchmarks tests/unit_tests/
run: pytest --random-order --cov=bec_widgets --cov-config=pyproject.toml --cov-branch --cov-report=xml --no-cov-on-fail tests/unit_tests/
- name: Upload test artifacts
uses: actions/upload-artifact@v4
@@ -67,4 +69,4 @@ jobs:
uses: codecov/codecov-action@v5
with:
token: ${{ secrets.CODECOV_TOKEN }}
slug: bec-project/bec_widgets
slug: bec-project/bec_widgets
+1 -3
View File
@@ -177,6 +177,4 @@ cython_debug/
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
#
tombi.toml
#.idea/
-1470
View File
File diff suppressed because it is too large Load Diff
+2 -1
View File
@@ -192,7 +192,8 @@ Positioner boxes and tweak controls handle precise moves, homing, and calibratio
## Documentation
The documentation can be found [here](https://bec.readthedocs.io/).
Documentation of BEC Widgets can be found [here](https://bec-widgets.readthedocs.io/en/latest/). The documentation of
the BEC can be found [here](https://bec.readthedocs.io/en/latest/).
## License
+18 -12
View File
@@ -1,13 +1,19 @@
import os
import sys
import bec_widgets.widgets.containers.qt_ads as QtAds
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
if sys.platform.startswith("linux"):
qt_platform = os.environ.get("QT_QPA_PLATFORM", "")
if qt_platform != "offscreen":
os.environ["QT_QPA_PLATFORM"] = "xcb"
# Default QtAds configuration
QtAds.CDockManager.setConfigFlag(QtAds.CDockManager.eConfigFlag.FocusHighlighting, True)
QtAds.CDockManager.setConfigFlag(
QtAds.CDockManager.eConfigFlag.RetainTabSizeWhenCloseButtonHidden, True
)
__all__ = ["BECWidget", "SafeSlot", "SafeProperty"]
def __getattr__(name):
if name == "BECWidget":
from bec_widgets.utils.bec_widget import BECWidget
return BECWidget
if name in {"SafeSlot", "SafeProperty"}:
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
return {"SafeSlot": SafeSlot, "SafeProperty": SafeProperty}[name]
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
-15
View File
@@ -1,15 +0,0 @@
import os
import sys
import bec_widgets.widgets.containers.qt_ads as QtAds
if sys.platform.startswith("linux"):
qt_platform = os.environ.get("QT_QPA_PLATFORM", "")
if qt_platform != "offscreen":
os.environ["QT_QPA_PLATFORM"] = "xcb"
# Default QtAds configuration
QtAds.CDockManager.setConfigFlag(QtAds.CDockManager.eConfigFlag.FocusHighlighting, True)
QtAds.CDockManager.setConfigFlag(
QtAds.CDockManager.eConfigFlag.RetainTabSizeWhenCloseButtonHidden, True
)
+19 -16
View File
@@ -1,42 +1,45 @@
from __future__ import annotations
from typing import Literal
from bec_lib import bec_logger
from bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area import AdvancedDockArea
from bec_widgets.widgets.containers.auto_update.auto_updates import AutoUpdates
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea
logger = bec_logger.logger
def dock_area(
object_name: str | None = None, startup_profile: str | Literal["restore", "skip"] | None = None
) -> BECDockArea:
object_name: str | None = None, profile: str | None = None, start_empty: bool = False
) -> AdvancedDockArea:
"""
Create an advanced dock area using Qt Advanced Docking System.
Args:
object_name(str): The name of the advanced dock area.
startup_profile(str | Literal["restore", "skip"] | None): Startup mode for
the workspace:
- None: start empty
- "restore": restore last used profile
- "skip": do not initialize profile state
- "<name>": load specific profile
profile(str|None): Optional profile to load; if None the "general" profile is used.
start_empty(bool): If True, start with an empty dock area when loading specified profile.
Returns:
BECDockArea: The created advanced dock area.
AdvancedDockArea: The created advanced dock area.
Note:
The "general" profile is mandatory and will always exist. If manually deleted,
it will be automatically recreated.
"""
# Default to "general" profile when called from CLI without specifying a profile
effective_profile = profile if profile is not None else "general"
widget = BECDockArea(
widget = AdvancedDockArea(
object_name=object_name,
restore_initial_profile=True,
root_widget=True,
profile_namespace="bec",
startup_profile=startup_profile,
init_profile=effective_profile,
start_empty=start_empty,
)
logger.info(
f"Created advanced dock area with profile: {effective_profile}, start_empty: {start_empty}"
)
logger.info(f"Created advanced dock area with startup_profile: {startup_profile}")
return widget
@@ -48,7 +51,7 @@ def auto_update_dock_area(object_name: str | None = None) -> AutoUpdates:
object_name(str): The name of the dock area.
Returns:
BECDockArea: The created dock area.
AdvancedDockArea: The created dock area.
"""
_auto_update = AutoUpdates(object_name=object_name)
return _auto_update
+98 -127
View File
@@ -20,19 +20,21 @@ from qtpy.QtWidgets import (
)
import bec_widgets
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.utils.bec_plugin_helper import get_all_plugin_widgets
from bec_widgets.utils.container_utils import WidgetContainerUtils
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.name_utils import pascal_to_snake
from bec_widgets.utils.plugin_utils import get_plugin_auto_updates
from bec_widgets.utils.round_frame import RoundedFrame
from bec_widgets.utils.rpc_register import RPCRegister
from bec_widgets.utils.screen_utils import apply_window_geometry, centered_geometry_for_app
from bec_widgets.utils.toolbars.toolbar import ModularToolBar
from bec_widgets.utils.ui_loader import UILoader
from bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area import AdvancedDockArea
from bec_widgets.widgets.containers.advanced_dock_area.profile_utils import (
get_last_profile,
list_profiles,
)
from bec_widgets.widgets.containers.auto_update.auto_updates import AutoUpdates
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea
from bec_widgets.widgets.containers.dock_area.profile_utils import get_last_profile, list_profiles
from bec_widgets.widgets.containers.main_window.main_window import BECMainWindow, BECMainWindowNoRPC
from bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button import DarkModeButton
@@ -43,7 +45,6 @@ if TYPE_CHECKING: # pragma: no cover
logger = bec_logger.logger
MODULE_PATH = os.path.dirname(bec_widgets.__file__)
START_EMPTY_PROFILE_OPTION = "Start Empty (No Profile)"
class LaunchTile(RoundedFrame):
@@ -77,28 +78,23 @@ class LaunchTile(RoundedFrame):
circular_pixmap.fill(Qt.transparent)
painter = QPainter(circular_pixmap)
painter.setRenderHints(QPainter.RenderHint.Antialiasing, True)
painter.setRenderHints(QPainter.Antialiasing, True)
path = QPainterPath()
path.addEllipse(0, 0, size, size)
painter.setClipPath(path)
pixmap = pixmap.scaled(
size,
size,
Qt.AspectRatioMode.KeepAspectRatio,
Qt.TransformationMode.SmoothTransformation,
)
pixmap = pixmap.scaled(size, size, Qt.KeepAspectRatio, Qt.SmoothTransformation)
painter.drawPixmap(0, 0, pixmap)
painter.end()
self.icon_label.setPixmap(circular_pixmap)
self.layout.addWidget(self.icon_label, alignment=Qt.AlignmentFlag.AlignCenter)
self.layout.addWidget(self.icon_label, alignment=Qt.AlignCenter)
# Top label
self.top_label = QLabel(top_label.upper())
font_top = self.top_label.font()
font_top.setPointSize(10)
self.top_label.setFont(font_top)
self.layout.addWidget(self.top_label, alignment=Qt.AlignmentFlag.AlignCenter)
self.layout.addWidget(self.top_label, alignment=Qt.AlignCenter)
# Main label
self.main_label = QLabel(main_label)
@@ -108,7 +104,7 @@ class LaunchTile(RoundedFrame):
font_main.setPointSize(14)
font_main.setBold(True)
self.main_label.setFont(font_main)
self.main_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.main_label.setAlignment(Qt.AlignCenter)
# Shrink font if the default would wrap on this platform / DPI
content_width = (
@@ -124,13 +120,13 @@ class LaunchTile(RoundedFrame):
self.layout.addWidget(self.main_label)
self.spacer_top = QSpacerItem(0, 10, QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Fixed)
self.spacer_top = QSpacerItem(0, 10, QSizePolicy.Fixed, QSizePolicy.Fixed)
self.layout.addItem(self.spacer_top)
# Description
self.description_label = QLabel(description)
self.description_label.setWordWrap(True)
self.description_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.description_label.setAlignment(Qt.AlignCenter)
self.layout.addWidget(self.description_label)
# Selector
@@ -140,14 +136,13 @@ class LaunchTile(RoundedFrame):
else:
self.selector = None
self.spacer_bottom = QSpacerItem(
0, 0, QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Expanding
)
self.spacer_bottom = QSpacerItem(0, 0, QSizePolicy.Fixed, QSizePolicy.Expanding)
self.layout.addItem(self.spacer_bottom)
# Action button
self.action_button = QPushButton("Open")
self.action_button.setStyleSheet("""
self.action_button.setStyleSheet(
"""
QPushButton {
background-color: #007AFF;
border: none;
@@ -159,8 +154,9 @@ class LaunchTile(RoundedFrame):
QPushButton:hover {
background-color: #005BB5;
}
""")
self.layout.addWidget(self.action_button, alignment=Qt.AlignmentFlag.AlignCenter)
"""
)
self.layout.addWidget(self.action_button, alignment=Qt.AlignCenter)
def _fit_label_to_width(self, label: QLabel, max_width: int, min_pt: int = 10):
"""
@@ -183,14 +179,12 @@ class LaunchTile(RoundedFrame):
metrics = QFontMetrics(font)
label.setFont(font)
label.setWordWrap(False)
label.setText(metrics.elidedText(label.text(), Qt.TextElideMode.ElideRight, max_width))
label.setText(metrics.elidedText(label.text(), Qt.ElideRight, max_width))
class LaunchWindow(BECMainWindow):
RPC = True
PLUGIN = False
TILE_SIZE = (250, 300)
DEFAULT_LAUNCH_SIZE = (800, 600)
USER_ACCESS = ["show_launcher", "hide_launcher"]
def __init__(
@@ -207,7 +201,6 @@ class LaunchWindow(BECMainWindow):
self.app = QApplication.instance()
self.tiles: dict[str, LaunchTile] = {}
self._logged_unparented_connections: set[str] = set()
# Track the smallest mainlabel font size chosen so far
self._min_main_label_pt: int | None = None
@@ -216,7 +209,7 @@ class LaunchWindow(BECMainWindow):
self.toolbar = ModularToolBar(parent=self)
self.addToolBar(Qt.TopToolBarArea, self.toolbar)
self.spacer = QWidget(self)
self.spacer.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding)
self.spacer.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
self.toolbar.addWidget(self.spacer)
self.toolbar.addWidget(self.dark_mode_button)
@@ -325,7 +318,7 @@ class LaunchWindow(BECMainWindow):
)
tile.setFixedWidth(self.TILE_SIZE[0])
tile.setMinimumHeight(self.TILE_SIZE[1])
tile.setSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.MinimumExpanding)
tile.setSizePolicy(QSizePolicy.Fixed, QSizePolicy.MinimumExpanding)
if action_button:
tile.action_button.clicked.connect(action_button)
if show_selector and selector_items:
@@ -354,7 +347,7 @@ class LaunchWindow(BECMainWindow):
def _refresh_dock_area_profiles(self, preserve_selection: bool = True) -> None:
"""
Refresh the dock-area profile selector, optionally preserving the selection.
Defaults to Start Empty when no valid selection can be preserved.
Sets the combobox to the last used profile or "general" if no selection preserved.
Args:
preserve_selection(bool): Whether to preserve the current selection or not.
@@ -369,10 +362,9 @@ class LaunchWindow(BECMainWindow):
)
profiles = list_profiles("bec")
selector_items = [START_EMPTY_PROFILE_OPTION, *profiles]
selector.blockSignals(True)
selector.clear()
for profile in selector_items:
for profile in profiles:
selector.addItem(profile)
if selected_text:
@@ -381,31 +373,21 @@ class LaunchWindow(BECMainWindow):
if idx >= 0:
selector.setCurrentIndex(idx)
else:
# Selection no longer exists, fall back to default startup selection.
# Selection no longer exists, fall back to last profile or "general"
self._set_selector_to_default_profile(selector, profiles)
else:
# No selection to preserve, use default startup selection.
# No selection to preserve, use last profile or "general"
self._set_selector_to_default_profile(selector, profiles)
selector.blockSignals(False)
def _set_selector_to_default_profile(self, selector: QComboBox, profiles: list[str]) -> None:
"""
Set the selector default.
Preference order:
1) Start Empty option (if available)
2) Last used profile
3) First available profile
Set the selector to the last used profile or "general" as fallback.
Args:
selector(QComboBox): The combobox to set.
profiles(list[str]): List of available profiles.
"""
start_empty_idx = selector.findText(START_EMPTY_PROFILE_OPTION, Qt.MatchFlag.MatchExactly)
if start_empty_idx >= 0:
selector.setCurrentIndex(start_empty_idx)
return
# Try to get last used profile
last_profile = get_last_profile(namespace="bec")
if last_profile and last_profile in profiles:
@@ -414,6 +396,13 @@ class LaunchWindow(BECMainWindow):
selector.setCurrentIndex(idx)
return
# Fall back to "general" profile
if "general" in profiles:
idx = selector.findText("general", Qt.MatchFlag.MatchExactly)
if idx >= 0:
selector.setCurrentIndex(idx)
return
# If nothing else, select first item
if selector.count() > 0:
selector.setCurrentIndex(0)
@@ -439,9 +428,7 @@ class LaunchWindow(BECMainWindow):
from bec_widgets.applications import bw_launch
with RPCRegister.delayed_broadcast() as rpc_register:
if geometry is None and launch_script != "custom_ui_file":
geometry = self._default_launch_geometry()
existing_dock_areas = rpc_register.get_names_of_rpc_by_class_type(BECDockArea)
existing_dock_areas = rpc_register.get_names_of_rpc_by_class_type(AdvancedDockArea)
if name is not None:
WidgetContainerUtils.raise_for_invalid_name(name)
# If name already exists, generate a unique one with counter suffix
@@ -464,13 +451,13 @@ class LaunchWindow(BECMainWindow):
if launch_script == "auto_update":
auto_update = kwargs.pop("auto_update", None)
return self._launch_auto_update(auto_update, geometry=geometry)
return self._launch_auto_update(auto_update)
if launch_script == "widget":
widget = kwargs.pop("widget", None)
if widget is None:
raise ValueError("Widget name must be provided.")
return self._launch_widget(widget, geometry=geometry)
return self._launch_widget(widget)
launch = getattr(bw_launch, launch_script, None)
if launch is None:
@@ -482,13 +469,13 @@ class LaunchWindow(BECMainWindow):
logger.info(f"Created new dock area: {name}")
if isinstance(result_widget, BECMainWindow):
apply_window_geometry(result_widget, geometry)
self._apply_window_geometry(result_widget, geometry)
result_widget.show()
else:
window = BECMainWindowNoRPC()
window.setCentralWidget(result_widget)
window.setWindowTitle(f"BEC - {result_widget.objectName()}")
apply_window_geometry(window, geometry)
self._apply_window_geometry(window, geometry)
window.show()
return result_widget
@@ -524,14 +511,12 @@ class LaunchWindow(BECMainWindow):
window.setCentralWidget(loaded)
window.setWindowTitle(f"BEC - {filename}")
apply_window_geometry(window, None)
self._apply_window_geometry(window, None)
window.show()
logger.info(f"Launched custom UI: {filename}, type: {type(window).__name__}")
return window
def _launch_auto_update(
self, auto_update: str, geometry: tuple[int, int, int, int] | None = None
) -> AutoUpdates:
def _launch_auto_update(self, auto_update: str) -> AutoUpdates:
if auto_update in self.available_auto_updates:
auto_update_cls = self.available_auto_updates[auto_update]
window = auto_update_cls()
@@ -542,13 +527,11 @@ class LaunchWindow(BECMainWindow):
window.resize(window.minimumSizeHint())
window.setWindowTitle(f"BEC - {window.objectName()}")
apply_window_geometry(window, geometry)
self._apply_window_geometry(window, None)
window.show()
return window
def _launch_widget(
self, widget: type[BECWidget], geometry: tuple[int, int, int, int] | None = None
) -> QWidget:
def _launch_widget(self, widget: type[BECWidget]) -> QWidget:
name = pascal_to_snake(widget.__name__)
WidgetContainerUtils.raise_for_invalid_name(name)
@@ -561,7 +544,7 @@ class LaunchWindow(BECMainWindow):
window.setCentralWidget(widget_instance)
window.resize(window.minimumSizeHint())
window.setWindowTitle(f"BEC - {widget_instance.objectName()}")
apply_window_geometry(window, geometry)
self._apply_window_geometry(window, None)
window.show()
return window
@@ -592,14 +575,11 @@ class LaunchWindow(BECMainWindow):
"""
tile = self.tiles.get("dock_area")
if tile is None or tile.selector is None:
startup_profile = None
profile = None
else:
selection = tile.selector.currentText().strip()
if selection == START_EMPTY_PROFILE_OPTION:
startup_profile = None
else:
startup_profile = selection if selection else None
return self.launch("dock_area", startup_profile=startup_profile)
profile = selection if selection else None
return self.launch("dock_area", profile=profile)
def _open_widget(self):
"""
@@ -612,9 +592,30 @@ class LaunchWindow(BECMainWindow):
raise ValueError(f"Widget {widget} not found in available widgets.")
return self.launch("widget", widget=self.available_widgets[widget])
def _default_launch_geometry(self) -> tuple[int, int, int, int] | None:
width, height = self.DEFAULT_LAUNCH_SIZE
return centered_geometry_for_app(width=width, height=height)
def _apply_window_geometry(
self, window: QWidget, geometry: tuple[int, int, int, int] | None
) -> None:
"""Apply a provided geometry or center the window with an 80% layout."""
if geometry is not None:
window.setGeometry(*geometry)
return
default_geometry = self._default_window_geometry(window)
if default_geometry is not None:
window.setGeometry(*default_geometry)
else:
window.resize(window.minimumSizeHint())
@staticmethod
def _default_window_geometry(window: QWidget) -> tuple[int, int, int, int] | None:
screen = window.screen() or QApplication.primaryScreen()
if screen is None:
return None
available = screen.availableGeometry()
width = int(available.width() * 0.8)
height = int(available.height() * 0.8)
x = available.x() + (available.width() - width) // 2
y = available.y() + (available.height() - height) // 2
return x, y, width, height
@SafeSlot(popup_error=True)
def _open_custom_ui_file(self):
@@ -656,86 +657,56 @@ class LaunchWindow(BECMainWindow):
super().showEvent(event)
self.setFixedSize(self.size())
def _has_external_window(self, connections: dict) -> bool:
def _launcher_is_last_widget(self, connections: dict) -> bool:
"""
Check if any registered non-launcher connection owns a top-level Qt window.
Check if the launcher is the last widget in the application.
"""
# get all parents of connections
for connection in connections.values():
if self._connection_belongs_to_launcher(connection):
continue
if isinstance(connection, QWidget) and connection.isWindow():
return True
return False
def _log_unparented_connections(self, connections: dict) -> None:
"""
Log non-launcher RPC connections that remain without an active top-level window.
"""
for connection in connections.values():
if self._connection_belongs_to_launcher(connection):
continue
if isinstance(connection, QWidget) and connection.isWindow():
continue
connection_description = (
f"type={type(connection).__name__} objectName={connection.objectName()!r} "
f"gui_id={connection.gui_id!r}"
)
if connection_description in self._logged_unparented_connections:
continue
self._logged_unparented_connections.add(connection_description)
logger.warning(
"Registered non-launcher RPC connection has no active top-level window: "
f"{connection_description}"
)
def _connection_belongs_to_launcher(self, connection: QObject) -> bool:
"""
Check whether a registered connection is the launcher itself or part of its Qt hierarchy.
"""
if connection is self or connection.gui_id == self.gui_id:
return True
parent = connection.parent()
while parent is not None:
if parent is self:
return True
parent = parent.parent()
return False
try:
parent = connection.parent()
if parent is None and connection.objectName() != self.objectName():
logger.info(
f"Found non-launcher connection without parent: {connection.objectName()}"
)
return False
except Exception as e:
logger.error(f"Error getting parent of connection: {e}")
return False
return True
def _turn_off_the_lights(self, connections: dict):
"""
If there is only one connection remaining, it is the launcher, so we show it.
Once the launcher is closed as the last window, we quit the application.
"""
if self._has_external_window(connections):
self.hide()
if self._launcher_is_last_widget(connections):
self.show()
self.activateWindow()
self.raise_()
if self.app:
self.app.setQuitOnLastWindowClosed(False) # type: ignore
self.app.setQuitOnLastWindowClosed(True) # type: ignore
return
self._log_unparented_connections(connections)
self.show()
self.activateWindow()
self.raise_()
self.hide()
if self.app:
self.app.setQuitOnLastWindowClosed(True) # type: ignore
self.app.setQuitOnLastWindowClosed(False) # type: ignore
def closeEvent(self, event):
"""
Close the launcher window.
"""
connections = self.register.list_all_connections()
if self._has_external_window(connections):
event.ignore()
self.hide()
if self._launcher_is_last_widget(connections):
event.accept()
return
event.accept()
event.ignore()
self.hide()
if __name__ == "__main__": # pragma: no cover
if __name__ == "__main__":
import sys
from bec_widgets.utils.colors import apply_theme
+42 -216
View File
@@ -1,29 +1,17 @@
from bec_qthemes import material_icon
from qtpy.QtGui import QAction # type: ignore
from qtpy.QtWidgets import QApplication, QHBoxLayout, QStackedWidget, QWidget
from bec_widgets.applications.navigation_centre.reveal_animator import ANIMATION_DURATION
from bec_widgets.applications.navigation_centre.side_bar import SideBar
from bec_widgets.applications.navigation_centre.side_bar_components import NavigationItem
from bec_widgets.applications.views.admin_view.admin_view import AdminView
from bec_widgets.applications.views.developer_view.developer_view import DeveloperView
from bec_widgets.applications.views.device_manager_view.device_manager_view import DeviceManagerView
from bec_widgets.applications.views.dock_area_view.dock_area_view import DockAreaView
from bec_widgets.applications.views.view import ViewBase, WaveformViewInline, WaveformViewPopup
from bec_widgets.utils.colors import apply_theme
from bec_widgets.utils.guided_tour import GuidedTour
from bec_widgets.utils.name_utils import sanitize_namespace
from bec_widgets.utils.screen_utils import (
apply_centered_size,
available_screen_geometry,
main_app_size_for_screen,
)
from bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area import AdvancedDockArea
from bec_widgets.widgets.containers.main_window.main_window import BECMainWindow
class BECMainApp(BECMainWindow):
RPC = False
PLUGIN = False
def __init__(
self,
@@ -55,58 +43,51 @@ class BECMainApp(BECMainWindow):
self._add_views()
# Initialize guided tour
self.guided_tour = GuidedTour(self)
self._setup_guided_tour()
def _add_views(self):
self.add_section("BEC Applications", "bec_apps")
self.dock_area = DockAreaView(self)
self.ads = AdvancedDockArea(self, profile_namespace="bec", auto_profile_namespace=False)
self.ads.setObjectName("MainWorkspace")
self.device_manager = DeviceManagerView(self)
# self.developer_view = DeveloperView(self) #TODO temporary disable until the bugs with BECShell are resolved
self.admin_view = AdminView(self)
self.developer_view = DeveloperView(self)
self.add_view(icon="widgets", title="Dock Area", widget=self.dock_area, mini_text="Docks")
self.add_view(
icon="widgets", title="Dock Area", id="dock_area", widget=self.ads, mini_text="Docks"
)
self.add_view(
icon="display_settings",
title="Device Manager",
id="device_manager",
widget=self.device_manager,
mini_text="DM",
)
# TODO temporary disable until the bugs with BECShell are resolved
# self.add_view(
# icon="code_blocks",
# title="IDE",
# widget=self.developer_view,
# mini_text="IDE",
# exclusive=True,
# )
self.add_view(
icon="admin_panel_settings",
title="Admin View",
widget=self.admin_view,
mini_text="Admin",
from_top=False,
icon="code_blocks",
title="IDE",
widget=self.developer_view,
id="developer_view",
exclusive=True,
)
if self._show_examples:
self.add_section("Examples", "examples")
waveform_view_popup = WaveformViewPopup(
parent=self, view_id="waveform_view_popup", title="Waveform Plot"
parent=self, id="waveform_view_popup", title="Waveform Plot"
)
waveform_view_stack = WaveformViewInline(
parent=self, view_id="waveform_view_stack", title="Waveform Plot"
parent=self, id="waveform_view_stack", title="Waveform Plot"
)
self.add_view(
icon="show_chart",
title="Waveform With Popup",
id="waveform_popup",
widget=waveform_view_popup,
mini_text="Popup",
)
self.add_view(
icon="show_chart",
title="Waveform InLine Stack",
id="waveform_stack",
widget=waveform_view_stack,
mini_text="Stack",
)
@@ -114,9 +95,6 @@ class BECMainApp(BECMainWindow):
self.set_current("dock_area")
self.sidebar.add_dark_mode_item()
# Add guided tour to Help menu
self._add_guided_tour_to_menu()
# --- Public API ------------------------------------------------------
def add_section(self, title: str, id: str, position: int | None = None):
return self.sidebar.add_section(title, id, position)
@@ -132,7 +110,7 @@ class BECMainApp(BECMainWindow):
*,
icon: str,
title: str,
view_id: str | None = None,
id: str,
widget: QWidget,
mini_text: str | None = None,
position: int | None = None,
@@ -146,8 +124,7 @@ class BECMainApp(BECMainWindow):
Args:
icon(str): Icon name for the nav item.
title(str): Title for the nav item.
view_id(str, optional): Unique ID for the view/item. If omitted, uses mini_text;
if mini_text is also omitted, uses title.
id(str): Unique ID for the view/item.
widget(QWidget): The widget to add to the stack.
mini_text(str, optional): Short text for the nav item when sidebar is collapsed.
position(int, optional): Position to insert the nav item.
@@ -160,11 +137,10 @@ class BECMainApp(BECMainWindow):
"""
resolved_id = sanitize_namespace(view_id or mini_text or title)
item = self.sidebar.add_item(
icon=icon,
title=title,
id=resolved_id,
id=id,
mini_text=mini_text,
position=position,
from_top=from_top,
@@ -174,15 +150,13 @@ class BECMainApp(BECMainWindow):
# Wrap plain widgets into a ViewBase so enter/exit hooks are available
if isinstance(widget, ViewBase):
view_widget = widget
view_widget.view_id = resolved_id
view_widget.view_id = id
view_widget.view_title = title
else:
view_widget = ViewBase(content=widget, parent=self, view_id=resolved_id, title=title)
view_widget.change_object_name(resolved_id)
view_widget = ViewBase(content=widget, parent=self, id=id, title=title)
idx = self.stack.addWidget(view_widget)
self._view_index[resolved_id] = idx
self._view_index[id] = idx
return item
def set_current(self, id: str) -> None:
@@ -191,12 +165,6 @@ class BECMainApp(BECMainWindow):
# Internal: route sidebar selection to the stack
def _on_view_selected(self, vid: str) -> None:
# Special handling for views that can not be switched to (e.g. dark mode toggle)
# Not registered as proper view with a stack index, so we ignore any logic below
# as it will anyways not result in a stack switch.
idx = self._view_index.get(vid)
if idx is None or not (0 <= idx < self.stack.count()):
return
# Determine current view
current_index = self.stack.currentIndex()
current_view = (
@@ -222,160 +190,6 @@ class BECMainApp(BECMainWindow):
if hasattr(new_view, "on_enter"):
new_view.on_enter()
def _setup_guided_tour(self):
"""
Setup the guided tour for the main application.
Registers key UI components and delegates to views for their internal components.
"""
tour_steps = []
# --- General Layout Components ---
# Register the sidebar toggle button
toggle_step = self.guided_tour.register_widget(
widget=self.sidebar.toggle,
title="Sidebar Toggle",
text="Click this button to expand or collapse the sidebar. When expanded, you can see full navigation item titles and section names.",
)
tour_steps.append(toggle_step)
# Register the sidebar icons
sidebar_dock_area = self.sidebar.components.get("dock_area")
if sidebar_dock_area:
dock_step = self.guided_tour.register_widget(
widget=sidebar_dock_area,
title="Dock Area View",
text="Click here to access the Dock Area view, where you can manage and arrange your dockable panels.",
)
tour_steps.append(dock_step)
sidebar_device_manager = self.sidebar.components.get("device_manager")
if sidebar_device_manager:
device_manager_step = self.guided_tour.register_widget(
widget=sidebar_device_manager,
title="Device Manager View",
text="Click here to open the Device Manager view, where you can view and manage device configs.",
)
tour_steps.append(device_manager_step)
sidebar_developer_view = self.sidebar.components.get("developer_view")
if sidebar_developer_view:
developer_view_step = self.guided_tour.register_widget(
widget=sidebar_developer_view,
title="Developer View",
text="Click here to access the Developer view to write scripts and macros.",
)
tour_steps.append(developer_view_step)
# Register the dark mode toggle
dark_mode_item = self.sidebar.components.get("dark_mode")
if dark_mode_item:
dark_mode_step = self.guided_tour.register_widget(
widget=dark_mode_item,
title="Theme Toggle",
text="Switch between light and dark themes. The theme preference is saved and will be applied when you restart the application.",
)
tour_steps.append(dark_mode_step)
# Register the client info label
if hasattr(self, "_client_info_hover"):
client_info_step = self.guided_tour.register_widget(
widget=self._client_info_hover,
title="Client Status",
text="Displays status messages and information from the BEC Server.",
)
tour_steps.append(client_info_step)
# Register the scan progress bar if available
if hasattr(self, "_scan_progress_hover"):
progress_step = self.guided_tour.register_widget(
widget=self._scan_progress_hover,
title="Scan Progress",
text="Monitor the progress of ongoing scans. Hover over the progress bar to see detailed information including elapsed time and estimated completion.",
)
tour_steps.append(progress_step)
# Register the notification indicator in the status bar
if hasattr(self, "notification_indicator"):
notif_step = self.guided_tour.register_widget(
widget=self.notification_indicator,
title="Notification Center",
text="View system notifications, errors, and status updates. Click to filter notifications by type or expand to see all details.",
)
tour_steps.append(notif_step)
# --- View-Specific Components ---
# Register all views that can extend the tour
for view_id, view_index in self._view_index.items():
view_widget = self.stack.widget(view_index)
if not view_widget or not hasattr(view_widget, "register_tour_steps"):
continue
# Get the view's tour steps
view_tour = view_widget.register_tour_steps(self.guided_tour, self)
if view_tour is None:
if hasattr(view_widget.content, "register_tour_steps"):
view_tour = view_widget.content.register_tour_steps(self.guided_tour, self)
if view_tour is None:
continue
# Get the corresponding sidebar navigation item
nav_item = self.sidebar.components.get(view_id)
if not nav_item:
continue
# Use the view's title for the navigation button
nav_step = self.guided_tour.register_widget(
widget=nav_item,
title=view_tour.view_title,
text=f"Let's explore the features of the {view_tour.view_title}.",
)
tour_steps.append(nav_step)
tour_steps.extend(view_tour.step_ids)
# Create the tour with all registered steps
if tour_steps:
self.guided_tour.create_tour(tour_steps)
def start_guided_tour(self):
"""
Public method to start the guided tour.
This can be called programmatically or connected to a menu/button action.
"""
self.guided_tour.start_tour()
def _add_guided_tour_to_menu(self):
"""
Add a 'Guided Tour' action to the Help menu.
"""
# Find the Help menu
menu_bar = self.menuBar()
help_menu = None
for action in menu_bar.actions():
if action.text() == "Help":
help_menu = action.menu()
break
if help_menu:
# Add separator before the tour action
help_menu.addSeparator()
# Create and add the guided tour action
tour_action = QAction("Start Guided Tour", self)
tour_action.setIcon(material_icon("help"))
tour_action.triggered.connect(self.start_guided_tour)
tour_action.setShortcut("F1") # Add keyboard shortcut
help_menu.addAction(tour_action)
def cleanup(self):
for view_id, idx in self._view_index.items():
view = self.stack.widget(idx)
view.close()
view.deleteLater()
super().cleanup()
def main(): # pragma: no cover
"""
@@ -394,16 +208,28 @@ def main(): # pragma: no cover
args, qt_args = parser.parse_known_args(sys.argv[1:])
app = QApplication([sys.argv[0], *qt_args])
app.setApplicationName("BEC")
apply_theme("dark")
w = BECMainApp(show_examples=args.examples)
screen_geometry = available_screen_geometry()
if screen_geometry is not None:
width, height = main_app_size_for_screen(screen_geometry)
apply_centered_size(w, width, height, available=screen_geometry)
else:
w.resize(w.minimumSizeHint())
screen = app.primaryScreen()
screen_geometry = screen.availableGeometry()
screen_width = screen_geometry.width()
screen_height = screen_geometry.height()
# 70% of screen height, keep 16:9 ratio
height = int(screen_height * 0.9)
width = int(height * (16 / 9))
# If width exceeds screen width, scale down
if width > screen_width * 0.9:
width = int(screen_width * 0.9)
height = int(width / (16 / 9))
w.resize(width, height)
# Center the window on the screen
x = screen_geometry.x() + (screen_geometry.width() - width) // 2
y = screen_geometry.y() + (screen_geometry.height() - height) // 2
w.move(x, y)
w.show()
@@ -127,10 +127,12 @@ class NavigationItem(QWidget):
self._icon_size_expanded = QtCore.QSize(26, 26)
self.icon_btn.setIconSize(self._icon_size_collapsed)
# Remove QToolButton hover/pressed background/outline
self.icon_btn.setStyleSheet("""
self.icon_btn.setStyleSheet(
"""
QToolButton:hover { background: transparent; border: none; }
QToolButton:pressed { background: transparent; border: none; }
""")
"""
)
# Mini label below icon
self.mini_lbl = QLabel(self._mini_text, self)
@@ -1,35 +0,0 @@
"""Module for Admin View."""
from qtpy.QtWidgets import QWidget
from bec_widgets.applications.views.view import ViewBase
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.widgets.services.bec_atlas_admin_view.bec_atlas_admin_view import BECAtlasAdminView
class AdminView(ViewBase):
"""
A view for administrators to change the current active experiment, manage messaging
services, and more tasks reserved for users with admin privileges.
"""
def __init__(
self,
parent: QWidget | None = None,
content: QWidget | None = None,
*,
view_id: str | None = None,
title: str | None = None,
**kwargs,
):
super().__init__(parent=parent, content=content, view_id=view_id, title=title)
self.admin_widget = BECAtlasAdminView(parent=self)
self.set_content(self.admin_widget)
@SafeSlot()
def on_exit(self) -> None:
"""Called before the view is hidden.
Default implementation does nothing. Override in subclasses.
"""
self.admin_widget.logout()
@@ -1,7 +1,7 @@
from qtpy.QtWidgets import QWidget
from bec_widgets.applications.views.developer_view.developer_widget import DeveloperWidget
from bec_widgets.applications.views.view import ViewBase, ViewTourSteps
from bec_widgets.applications.views.view import ViewBase
class DeveloperView(ViewBase):
@@ -14,89 +14,13 @@ class DeveloperView(ViewBase):
parent: QWidget | None = None,
content: QWidget | None = None,
*,
view_id: str | None = None,
id: str | None = None,
title: str | None = None,
**kwargs,
):
super().__init__(parent=parent, content=content, view_id=view_id, title=title, **kwargs)
super().__init__(parent=parent, content=content, id=id, title=title)
self.developer_widget = DeveloperWidget(parent=self)
self.set_content(self.developer_widget)
def register_tour_steps(self, guided_tour, main_app) -> ViewTourSteps | None:
"""Register Developer View components with the guided tour.
Args:
guided_tour: The GuidedTour instance to register with.
main_app: The main application instance (for accessing set_current).
Returns:
ViewTourSteps | None: Model containing view title and step IDs.
"""
step_ids = []
dev_widget = self.developer_widget
# IDE Toolbar
def get_ide_toolbar():
main_app.set_current("developer_view")
return (dev_widget.toolbar, None)
step_id = guided_tour.register_widget(
widget=get_ide_toolbar,
title="IDE Toolbar",
text="Quick access to save files, execute scripts, and configure IDE settings. Use the toolbar to manage your code and execution.",
)
step_ids.append(step_id)
# IDE Explorer
def get_ide_explorer():
main_app.set_current("developer_view")
return (dev_widget.explorer_dock.widget(), None)
step_id = guided_tour.register_widget(
widget=get_ide_explorer,
title="File Explorer",
text="Browse and manage your macro files. Create new files, open existing ones, and organize your scripts.",
)
step_ids.append(step_id)
# IDE Editor
def get_ide_editor():
main_app.set_current("developer_view")
return (dev_widget.monaco_dock.widget(), None)
step_id = guided_tour.register_widget(
widget=get_ide_editor,
title="Code Editor",
text="Write and edit Python code with syntax highlighting, auto-completion, and signature help. Monaco editor provides a modern coding experience.",
)
step_ids.append(step_id)
# IDE Console
def get_ide_console():
main_app.set_current("developer_view")
return (dev_widget.console_dock.widget(), None)
step_id = guided_tour.register_widget(
widget=get_ide_console,
title="BEC Shell Console",
text="Interactive Python console with BEC integration. Execute commands, test code snippets, and interact with the BEC system in real-time.",
)
step_ids.append(step_id)
# IDE Plotting Area
def get_ide_plotting():
main_app.set_current("developer_view")
return (dev_widget.plotting_ads, None)
step_id = guided_tour.register_widget(
widget=get_ide_plotting,
title="Plotting Area",
text="View plots and visualizations generated by your scripts. Arrange multiple plots in a flexible layout.",
)
step_ids.append(step_id)
return ViewTourSteps(view_title="Developer View", step_ids=step_ids)
if __name__ == "__main__":
import sys
@@ -126,11 +50,7 @@ if __name__ == "__main__":
_app.resize(width, height)
developer_view = DeveloperView()
_app.add_view(
icon="code_blocks",
title="IDE",
widget=developer_view,
view_id="developer_view",
exclusive=True,
icon="code_blocks", title="IDE", widget=developer_view, id="developer_view", exclusive=True
)
_app.show()
# developer_view.show()
@@ -13,12 +13,12 @@ from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.toolbars.actions import MaterialIconAction
from bec_widgets.utils.toolbars.bundles import ToolbarBundle
from bec_widgets.utils.toolbars.toolbar import ModularToolBar
from bec_widgets.widgets.containers.dock_area.basic_dock_area import DockAreaWidget
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea
from bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area import AdvancedDockArea
from bec_widgets.widgets.containers.advanced_dock_area.basic_dock_area import DockAreaWidget
from bec_widgets.widgets.containers.qt_ads import CDockWidget
from bec_widgets.widgets.editors.bec_console.bec_console import BecConsole, BECShell
from bec_widgets.widgets.editors.monaco.monaco_dock import MonacoDock
from bec_widgets.widgets.editors.monaco.monaco_widget import MonacoWidget
from bec_widgets.widgets.editors.web_console.web_console import BECShell, WebConsole
from bec_widgets.widgets.utility.ide_explorer.ide_explorer import IDEExplorer
@@ -79,8 +79,6 @@ def markdown_to_html(md_text: str) -> str:
class DeveloperWidget(DockAreaWidget):
RPC = False
PLUGIN = False
def __init__(self, parent=None, **kwargs):
super().__init__(parent=parent, variant="compact", **kwargs)
@@ -94,14 +92,14 @@ class DeveloperWidget(DockAreaWidget):
self.explorer = IDEExplorer(self)
self.explorer.setObjectName("Explorer")
self.console = BECShell(self, rpc_exposed=False)
self.console = BECShell(self)
self.console.setObjectName("BEC Shell")
self.terminal = BecConsole(self, rpc_exposed=False)
self.terminal = WebConsole(self)
self.terminal.setObjectName("Terminal")
self.monaco = MonacoDock(self, rpc_exposed=False, rpc_passthrough_children=False)
self.monaco = MonacoDock(self)
self.monaco.setObjectName("MonacoEditor")
self.monaco.save_enabled.connect(self._on_save_enabled_update)
self.plotting_ads = BECDockArea(
self.plotting_ads = AdvancedDockArea(
self,
mode="plot",
default_add_direction="bottom",
@@ -410,3 +408,23 @@ class DeveloperWidget(DockAreaWidget):
"""Clean up resources used by the developer widget."""
self.delete_all()
return super().cleanup()
if __name__ == "__main__":
import sys
from bec_qthemes import apply_theme
from qtpy.QtWidgets import QApplication
from bec_widgets.applications.main_app import BECMainApp
app = QApplication(sys.argv)
apply_theme("dark")
_app = BECMainApp()
_app.show()
# developer_view.show()
# developer_view.setWindowTitle("Developer View")
# developer_view.resize(1920, 1080)
# developer_view.set_stretch(horizontal=[1, 3, 2], vertical=[5, 5]) #can be set during runtime
sys.exit(app.exec_())
@@ -31,7 +31,7 @@ logger = bec_logger.logger
class DeviceManagerOphydValidationDialog(QtWidgets.QDialog):
"""Popup dialog to test Ophyd device configurations interactively."""
def __init__(self, parent=None, config: dict | None = None): # type: ignore
def __init__(self, parent=None, config: dict | None = None): # type:ignore
super().__init__(parent)
self.setWindowTitle("Device Manager Ophyd Test")
self._config_status = ConfigStatus.UNKNOWN.value
@@ -133,7 +133,7 @@ class DeviceFormDialog(QtWidgets.QDialog):
# validated: config_status, connection_status
accepted_data = QtCore.Signal(dict, int, int, str, str)
def __init__(self, parent=None, add_btn_text: str = "Add Device"): # type: ignore
def __init__(self, parent=None, add_btn_text: str = "Add Device"): # type:ignore
super().__init__(parent)
# Track old device name if config is edited
self._old_device_name: str = ""
@@ -4,7 +4,7 @@ from __future__ import annotations
from enum import IntEnum
from functools import partial
from typing import TYPE_CHECKING, Any, List, Tuple
from typing import TYPE_CHECKING, List, Tuple
from bec_lib.logger import bec_logger
from bec_qthemes import apply_theme, material_icon
@@ -38,7 +38,7 @@ from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.toolbars.actions import MaterialIconAction
from bec_widgets.utils.toolbars.bundles import ToolbarBundle
from bec_widgets.utils.toolbars.toolbar import ModularToolBar
from bec_widgets.widgets.containers.dock_area.basic_dock_area import DockAreaWidget
from bec_widgets.widgets.containers.advanced_dock_area.basic_dock_area import DockAreaWidget
from bec_widgets.widgets.control.device_manager.components import (
DeviceTable,
DMConfigView,
@@ -103,14 +103,16 @@ class CustomBusyWidget(QWidget):
button_width = int(button_height * aspect_ratio)
self.cancel_button.setFixedSize(button_width, button_height)
color = get_accent_colors()
self.cancel_button.setStyleSheet(f"""
self.cancel_button.setStyleSheet(
f"""
QPushButton {{
background-color: {color.emergency.name()};
color: white;
font-weight: 600;
border-radius: 6px;
}}
""")
"""
)
# Layout
content_layout = QVBoxLayout(self)
@@ -126,10 +128,12 @@ class CustomBusyWidget(QWidget):
bg_color = color._colors.get("BG", None)
if bg_color is None: # Fallback if missing
bg_color = QColor(50, 50, 50, 255)
self.setStyleSheet(f"""
self.setStyleSheet(
f"""
background-color: {bg_color.name()};
border-radius: 12px;
""")
"""
)
def _ui_scale(self) -> int:
parent = self.parent()
@@ -169,7 +173,7 @@ class DeviceManagerDisplayWidget(DockAreaWidget):
self._upload_redis_dialog: UploadRedisDialog | None = None
self._dialog_validation_connection: QMetaObject.Connection | None = None
# NOTE: We need here a separate config helper instance to avoid conflicts with
# NOTE: We need here a seperate config helper instance to avoid conflicts with
# other communications to REDIS as uploading a config through a CommunicationConfigAction
# will block if we use the config_helper from self.client.config._config_helper
self._config_helper = config_helper.ConfigHelper(self.client.connector)
@@ -607,8 +611,8 @@ class DeviceManagerDisplayWidget(DockAreaWidget):
self.device_table_view._is_config_in_sync_with_redis()
)
validation_results = self.device_table_view.get_validation_results()
for config, config_status, connection_status in validation_results.values():
if connection_status == ConnectionStatus.CONNECTED.value:
for config, config_status, connnection_status in validation_results.values():
if connnection_status == ConnectionStatus.CONNECTED.value:
self.device_table_view.update_device_validation(
config, config_status, ConnectionStatus.CAN_CONNECT, ""
)
@@ -1,12 +1,11 @@
"""Module for Device Manager View."""
from qtpy.QtCore import QRect
from qtpy.QtWidgets import QWidget
from bec_widgets.applications.views.device_manager_view.device_manager_widget import (
DeviceManagerWidget,
)
from bec_widgets.applications.views.view import ViewBase, ViewTourSteps
from bec_widgets.applications.views.view import ViewBase
from bec_widgets.utils.error_popups import SafeSlot
@@ -20,21 +19,11 @@ class DeviceManagerView(ViewBase):
parent: QWidget | None = None,
content: QWidget | None = None,
*,
view_id: str | None = None,
id: str | None = None,
title: str | None = None,
**kwargs,
):
super().__init__(
parent=parent,
content=content,
view_id=view_id,
title=title,
rpc_passthrough_children=False,
**kwargs,
)
self.device_manager_widget = DeviceManagerWidget(
parent=self, rpc_exposed=False, rpc_passthrough_children=False
)
super().__init__(parent=parent, content=content, id=id, title=title)
self.device_manager_widget = DeviceManagerWidget(parent=self)
self.set_content(self.device_manager_widget)
@SafeSlot()
@@ -45,110 +34,6 @@ class DeviceManagerView(ViewBase):
"""
self.device_manager_widget.on_enter()
def register_tour_steps(self, guided_tour, main_app) -> ViewTourSteps | None:
"""Register Device Manager components with the guided tour.
Args:
guided_tour: The GuidedTour instance to register with.
main_app: The main application instance (for accessing set_current).
Returns:
ViewTourSteps | None: Model containing view title and step IDs.
"""
step_ids = []
dm_widget = self.device_manager_widget
# The device_manager_widget is not yet initialized, so we will register
# tour steps for its uninitialized state.
# Register Load Current Config button
def get_load_current():
main_app.set_current("device_manager")
if dm_widget._initialized is True:
return (None, None)
return (dm_widget.button_load_current_config, None)
step_id = guided_tour.register_widget(
widget=get_load_current,
title="Load Current Config",
text="Load the current device configuration from the BEC server.",
)
step_ids.append(step_id)
# Register Load Config From File button
def get_load_file():
main_app.set_current("device_manager")
if dm_widget._initialized is True:
return (None, None)
return (dm_widget.button_load_config_from_file, None)
step_id = guided_tour.register_widget(
widget=get_load_file,
title="Load Config From File",
text="Load a device configuration from a YAML file on disk.",
)
step_ids.append(step_id)
## Register steps for the initialized state
# Register main device table
def get_device_table():
main_app.set_current("device_manager")
if dm_widget._initialized is False:
return (None, None)
return (dm_widget.device_manager_display.device_table_view, None)
step_id = guided_tour.register_widget(
widget=get_device_table,
title="Device Table",
text="This table displays the config that is prepared to be uploaded to the BEC server. It allows users to review and modify device config settings, and also validate them before uploading to the BEC server.",
)
step_ids.append(step_id)
col_text_mapping = {
0: "Shows if a device configuration is valid. Automatically validated when adding a new device.",
1: "Shows if a device is connectable. Validated on demand.",
2: "Device name, unique across all devices within a config.",
3: "Device class used to initialize the device on the BEC server.",
4: "Defines how BEC treats readings of the device during scans. The options are 'monitored', 'baseline', 'async', 'continuous' or 'on_demand'.",
5: "Defines how BEC reacts if a device readback fails. Options are 'raise', 'retry', or 'buffer'.",
6: "User-defined tags associated with the device.",
7: "A brief description of the device.",
8: "Device is enabled when the configuration is loaded.",
9: "Device is set to read-only.",
10: "This flag allows to configure if the 'trigger' method of the device is called during scans.",
}
# We have at least one device registered
def get_device_table_row(column: int):
main_app.set_current("device_manager")
if dm_widget._initialized is False:
return (None, None)
table = dm_widget.device_manager_display.device_table_view.table
header = table.horizontalHeader()
x = header.sectionViewportPosition(column)
table.horizontalScrollBar().setValue(x)
# Recompute after scrolling
x = header.sectionViewportPosition(column)
w = header.sectionSize(column)
h = header.height()
rect = QRect(x, 0, w, h)
top_left = header.viewport().mapTo(main_app, rect.topLeft())
return (QRect(top_left, rect.size()), col_text_mapping.get(column, ""))
for col, text in col_text_mapping.items():
step_id = guided_tour.register_widget(
widget=lambda col=col: get_device_table_row(col),
title=f"{dm_widget.device_manager_display.device_table_view.table.horizontalHeaderItem(col).text()}",
text=text,
)
step_ids.append(step_id)
if not step_ids:
return None
return ViewTourSteps(view_title="Device Manager", step_ids=step_ids)
if __name__ == "__main__": # pragma: no cover
import sys
@@ -180,7 +65,7 @@ if __name__ == "__main__": # pragma: no cover
_app.add_view(
icon="display_settings",
title="Device Manager",
view_id="device_manager",
id="device_manager",
widget=device_manager_view.device_manager_widget,
mini_text="DM",
)
@@ -22,8 +22,8 @@ class DeviceManagerWidget(BECWidget, QtWidgets.QWidget):
RPC = False
def __init__(self, parent=None, client=None, **kwargs):
super().__init__(parent=parent, client=client, **kwargs)
def __init__(self, parent=None, client=None):
super().__init__(parent=parent, client=client)
self.stacked_layout = QtWidgets.QStackedLayout()
self.stacked_layout.setContentsMargins(0, 0, 0, 0)
self.stacked_layout.setSpacing(0)
@@ -1,31 +0,0 @@
from qtpy.QtWidgets import QWidget
from bec_widgets.applications.views.view import ViewBase
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea
class DockAreaView(ViewBase):
"""
Modular dock area view for arranging and managing multiple dockable widgets.
"""
RPC_CONTENT_CLASS = BECDockArea
def __init__(
self,
parent: QWidget | None = None,
content: QWidget | None = None,
*,
view_id: str | None = None,
title: str | None = None,
**kwargs,
):
super().__init__(parent=parent, content=content, view_id=view_id, title=title, **kwargs)
self.dock_area = BECDockArea(
self,
profile_namespace="bec",
auto_profile_namespace=False,
object_name="DockArea",
rpc_exposed=False,
)
self.set_content(self.dock_area)
+14 -72
View File
@@ -1,8 +1,5 @@
from __future__ import annotations
from typing import List
from pydantic import BaseModel
from qtpy.QtCore import QEventLoop
from qtpy.QtWidgets import (
QDialog,
@@ -17,26 +14,13 @@ from qtpy.QtWidgets import (
QWidget,
)
from bec_widgets import BECWidget
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
from bec_widgets.widgets.control.device_input.signal_combobox.signal_combobox import SignalComboBox
from bec_widgets.widgets.plots.waveform.waveform import Waveform
class ViewTourSteps(BaseModel):
"""Model representing tour steps for a view.
Attributes:
view_title: The human-readable title of the view.
step_ids: List of registered step IDs in the order they should appear.
"""
view_title: str
step_ids: List[str]
class ViewBase(BECWidget, QWidget):
class ViewBase(QWidget):
"""Wrapper for a content widget used inside the main app's stacked view.
Subclasses can implement `on_enter` and `on_exit` to run custom logic when the view becomes visible or is about to be hidden.
@@ -44,28 +28,21 @@ class ViewBase(BECWidget, QWidget):
Args:
content (QWidget): The actual view widget to display.
parent (QWidget | None): Parent widget.
view_id (str | None): Optional view view_id, useful for debugging or introspection.
id (str | None): Optional view id, useful for debugging or introspection.
title (str | None): Optional human-readable title.
"""
RPC = True
PLUGIN = False
USER_ACCESS = ["activate"]
RPC_CONTENT_CLASS: type[QWidget] | None = None
RPC_CONTENT_ATTR = "content"
def __init__(
self,
parent: QWidget | None = None,
content: QWidget | None = None,
*,
view_id: str | None = None,
id: str | None = None,
title: str | None = None,
**kwargs,
):
super().__init__(parent=parent, **kwargs)
super().__init__(parent=parent)
self.content: QWidget | None = None
self.view_id = view_id
self.view_id = id
self.view_title = title
lay = QVBoxLayout(self)
@@ -99,41 +76,6 @@ class ViewBase(BECWidget, QWidget):
"""
return True
@SafeSlot()
def activate(self) -> None:
"""Switch the parent application to this view."""
if not self.view_id:
raise ValueError("Cannot switch view without a view_id.")
parent = self.parent()
while parent is not None:
if hasattr(parent, "set_current"):
parent.set_current(self.view_id)
return
parent = parent.parent()
raise RuntimeError("Could not find a parent application with set_current().")
def cleanup(self):
if self.content is not None:
self.content.close()
self.content.deleteLater()
super().cleanup()
def register_tour_steps(self, guided_tour, main_app) -> ViewTourSteps | None:
"""Register this view's components with the guided tour.
Args:
guided_tour: The GuidedTour instance to register with.
main_app: The main application instance (for accessing set_current).
Returns:
ViewTourSteps | None: A model containing the view title and step IDs,
or None if this view has no tour steps.
Override this method in subclasses to register view-specific components.
"""
return None
####################################################################################################
# Example views for demonstration/testing purposes
@@ -160,17 +102,17 @@ class WaveformViewPopup(ViewBase): # pragma: no cover
self.device_edit.insertItem(0, "")
self.device_edit.setEditable(True)
self.device_edit.setCurrentIndex(0)
self.signal_edit = SignalComboBox(parent=self)
self.signal_edit.include_config_signals = False
self.signal_edit.insertItem(0, "")
self.signal_edit.setEditable(True)
self.device_edit.currentTextChanged.connect(self.signal_edit.set_device)
self.device_edit.device_reset.connect(self.signal_edit.reset_selection)
self.entry_edit = SignalComboBox(parent=self)
self.entry_edit.include_config_signals = False
self.entry_edit.insertItem(0, "")
self.entry_edit.setEditable(True)
self.device_edit.currentTextChanged.connect(self.entry_edit.set_device)
self.device_edit.device_reset.connect(self.entry_edit.reset_selection)
form = QFormLayout()
form.addRow(label)
form.addRow("Device", self.device_edit)
form.addRow("Signal", self.signal_edit)
form.addRow("Signal", self.entry_edit)
buttons = QDialogButtonBox(QDialogButtonBox.Ok | QDialogButtonBox.Cancel, parent=dialog)
buttons.accepted.connect(dialog.accept)
@@ -182,7 +124,7 @@ class WaveformViewPopup(ViewBase): # pragma: no cover
if dialog.exec_() == QDialog.Accepted:
self.waveform.plot(
device_y=self.device_edit.currentText(), signal_y=self.signal_edit.currentText()
y_name=self.device_edit.currentText(), y_entry=self.entry_edit.currentText()
)
@SafeSlot()
@@ -307,7 +249,7 @@ class WaveformViewInline(ViewBase): # pragma: no cover
dev = self.device_edit.currentText()
sig = self.entry_edit.currentText()
if dev and sig:
self.waveform.plot(device_y=dev, signal_y=sig)
self.waveform.plot(y_name=dev, y_entry=sig)
self.stack.setCurrentIndex(1)
def _show_waveform_without_changes(self):
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
-1
View File
@@ -1 +0,0 @@
from bec_widgets.cli.rpc import rpc_base
+254 -716
View File
File diff suppressed because it is too large Load Diff
+48 -245
View File
@@ -5,15 +5,14 @@ from __future__ import annotations
import json
import os
import select
import signal
import subprocess
import threading
import time
from contextlib import contextmanager
from threading import Lock
from typing import TYPE_CHECKING, Callable, Literal, TypeAlias, cast
from typing import TYPE_CHECKING, Literal, TypeAlias, cast
from bec_lib.endpoints import EndpointInfo, MessageEndpoints
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.utils.import_utils import lazy_import, lazy_import_from
from rich.console import Console
@@ -34,12 +33,6 @@ else:
logger = bec_logger.logger
IGNORE_WIDGETS = ["LaunchWindow"]
PROCESS_TERMINATION_TIMEOUT = 10
PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT = 2
PROCESS_OUTPUT_SELECT_TIMEOUT = 0.2
GRACEFUL_SERVER_SHUTDOWN_RPC_TIMEOUT = 3
GRACEFUL_SERVER_SHUTDOWN_TIMEOUT = 5
OUTPUT_READER_STOP_EVENT_ATTR = "_bec_output_reader_stop_event"
RegistryState: TypeAlias = dict[
Literal["gui_id", "name", "widget_class", "config", "__rpc__", "container_proxy"],
@@ -60,16 +53,14 @@ def _filter_output(output: str) -> str:
return output
def _get_output(process, logger, stop_event: threading.Event | None = None) -> None:
def _get_output(process, logger) -> None:
log_func = {process.stdout: logger.debug, process.stderr: logger.info}
stream_buffer = {process.stdout: [], process.stderr: []}
try:
os.set_blocking(process.stdout.fileno(), False)
os.set_blocking(process.stderr.fileno(), False)
while process.poll() is None and not (stop_event and stop_event.is_set()):
readylist, _, _ = select.select(
[process.stdout, process.stderr], [], [], PROCESS_OUTPUT_SELECT_TIMEOUT
)
while process.poll() is None:
readylist, _, _ = select.select([process.stdout, process.stderr], [], [], 1)
for stream in (process.stdout, process.stderr):
buf = stream_buffer[stream]
if stream in readylist:
@@ -84,95 +75,6 @@ def _get_output(process, logger, stop_event: threading.Event | None = None) -> N
logger.error(f"Error reading process output: {str(e)}")
def _process_group_snapshot(process) -> str:
try:
pgid = os.getpgid(process.pid)
except ProcessLookupError:
return "Process group snapshot unavailable: process already exited"
try:
result = subprocess.run(
["ps", "-o", "pid,ppid,pgid,stat,command", "-g", str(pgid)],
check=False,
capture_output=True,
text=True,
timeout=2,
)
except Exception as exc:
return f"Process group snapshot unavailable: {exc}"
output = result.stdout.strip()
if not output:
return f"Process group snapshot empty for pgid={pgid}"
return output
def _terminate_plot_process(process, logger, timeout: float = PROCESS_TERMINATION_TIMEOUT) -> None:
if process.poll() is not None:
return
process_info = f"pid={process.pid} command={process.args}"
try:
pgid = os.getpgid(process.pid)
process_info = f"pid={process.pid} pgid={pgid} command={process.args}"
logger.info(f"Terminating GUI process group {process_info}")
os.killpg(pgid, signal.SIGTERM)
except ProcessLookupError:
process.wait(timeout=timeout)
return
except Exception as exc:
logger.warning("Failed to terminate GUI process group; terminating process only.")
logger.info(f"GUI process termination failure details: {exc}. pid={process.pid}")
process.terminate()
try:
process.wait(timeout=timeout)
return
except subprocess.TimeoutExpired:
logger.warning(f"GUI process did not stop within {timeout}s; killing it.")
logger.info(
f"GUI process force-kill details: {process_info}\n"
f"{_process_group_snapshot(process)}"
)
try:
os.killpg(os.getpgid(process.pid), signal.SIGKILL)
except ProcessLookupError as e:
logger.error(f"Failed to kill GUI process group: {e}")
process.wait(timeout=timeout)
return
process.wait(timeout=timeout)
def _wait_for_process_exit(process, timeout: float) -> bool:
try:
process.wait(timeout=timeout)
except subprocess.TimeoutExpired:
return False
return True
def _join_process_output_thread(process, thread: threading.Thread | None, logger) -> None:
if thread is None:
return
thread.join(timeout=PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT)
if not thread.is_alive():
return
if stop_event := getattr(thread, OUTPUT_READER_STOP_EVENT_ATTR, None):
stop_event.set()
for stream in (process.stdout, process.stderr):
if stream is None:
continue
try:
stream.close()
except OSError as e:
logger.error(f"Failed to close stream {str(e)}")
thread.join(timeout=PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT)
if thread.is_alive():
logger.warning("GUI process output reader thread did not stop after process shutdown.")
logger.info(f"GUI process output reader thread details: pid={process.pid}")
def _start_plot_process(
gui_id: str,
gui_class_id: str,
@@ -224,14 +126,8 @@ def _start_plot_process(
if logger is None:
process_output_processing_thread = None
else:
process_output_stop_event = threading.Event()
process_output_processing_thread = threading.Thread(
target=_get_output, args=(process, logger, process_output_stop_event)
)
setattr(
process_output_processing_thread,
OUTPUT_READER_STOP_EVENT_ATTR,
process_output_stop_event,
target=_get_output, args=(process, logger)
)
process_output_processing_thread.start()
return process, process_output_processing_thread
@@ -326,7 +222,6 @@ class BECGuiClient(RPCBase):
self._ipython_registry: dict[str, RPCReference] = {}
self.available_widgets = AvailableWidgetsNamespace()
register_serializer_extension()
self._rpc_timeout = 60
####################
#### Client API ####
@@ -337,21 +232,6 @@ class BECGuiClient(RPCBase):
"""The launcher object."""
return RPCBase(gui_id=f"{self._gui_id}:launcher", parent=self, object_name="launcher")
def set_rpc_timeout(self, timeout: float):
"""Set the timeout for RPC calls to the GUI server.
Args:
timeout(float): The timeout in seconds.
"""
if not isinstance(timeout, (int, float)) or timeout < 0:
raise ValueError("Timeout must be a non-negative number.")
self._rpc_timeout = timeout
def _safe_register_stream(self, endpoint: EndpointInfo, cb: Callable, **kwargs):
"""Check if already registered for registration in idempotent functions."""
if not self._client.connector.any_stream_is_registered(endpoint, cb=cb):
self._client.connector.register(endpoint, cb=cb, **kwargs)
def connect_to_gui_server(self, gui_id: str) -> None:
"""Connect to a GUI server"""
# Unregister the old callback
@@ -367,9 +247,10 @@ class BECGuiClient(RPCBase):
self._ipython_registry = {}
# Register the new callback
self._safe_register_stream(
self._client.connector.register(
MessageEndpoints.gui_registry_state(self._gui_id),
cb=self._handle_registry_update,
parent=self,
from_start=True,
)
@@ -416,32 +297,14 @@ class BECGuiClient(RPCBase):
return self._raise_all()
return self._start(wait=wait)
def change_theme(self, theme: Literal["light", "dark"] | None = None) -> None:
"""
Apply a GUI theme or toggle between dark and light.
Args:
theme(Literal["light", "dark"] | None): Theme to apply. If None, the current
theme is fetched from the GUI and toggled.
"""
if not self._check_if_server_is_alive():
self._start(wait=True)
with wait_for_server(self):
if theme is None:
current_theme = self.launcher._run_rpc("fetch_theme")
next_theme = "light" if current_theme == "dark" else "dark"
else:
next_theme = theme
self.launcher._run_rpc("change_theme", theme=next_theme)
def new(
self,
name: str | None = None,
wait: bool = True,
geometry: tuple[int, int, int, int] | None = None,
launch_script: str = "dock_area",
startup_profile: str | Literal["restore", "skip"] | None = None,
profile: str | None = None,
start_empty: bool = False,
**kwargs,
) -> client.AdvancedDockArea:
"""Create a new top-level dock area.
@@ -451,81 +314,48 @@ class BECGuiClient(RPCBase):
wait(bool, optional): Whether to wait for the server to start. Defaults to True.
geometry(tuple[int, int, int, int] | None): The geometry of the dock area (pos_x, pos_y, w, h).
launch_script(str): The launch script to use. Defaults to "dock_area".
startup_profile(str | Literal["restore", "skip"] | None): Startup mode for
the dock area:
- None: start in transient empty workspace
- "restore": restore last-used profile
- "skip": skip profile initialization
- "<name>": load the named profile
profile(str | None): The profile name to load. If None, loads the "general" profile.
Use a profile name to load a specific saved profile.
start_empty(bool): If True, start with an empty dock area when loading specified profile.
**kwargs: Additional keyword arguments passed to the dock area.
Returns:
client.AdvancedDockArea: The new dock area.
Examples:
>>> gui.new() # Start with an empty unsaved workspace
>>> gui.new(startup_profile="restore") # Restore last profile
>>> gui.new(startup_profile="my_profile") # Load explicit profile
"""
if "profile" in kwargs or "start_empty" in kwargs:
raise TypeError(
"gui.new() no longer accepts 'profile' or 'start_empty'. Use 'startup_profile' instead."
)
Note:
The "general" profile is mandatory and will always exist. If manually deleted,
it will be automatically recreated.
Examples:
>>> gui.new() # Start with the "general" profile
>>> gui.new(profile="my_profile") # Load specific profile, if profile does not exist, the new profile is created empty with specified name
>>> gui.new(start_empty=True) # Start with "general" profile but empty dock area
>>> gui.new(profile="my_profile", start_empty=True) # Start with "my_profile" profile but empty dock area
"""
if not self._check_if_server_is_alive():
self.show(wait=True)
self.start(wait=True)
if wait:
with wait_for_server(self):
return self._new_impl(
name=name,
geometry=geometry,
widget = self.launcher._run_rpc(
"launch",
launch_script=launch_script,
startup_profile=startup_profile,
**kwargs,
)
return self._new_impl(
name=name,
geometry=geometry,
launch_script=launch_script,
startup_profile=startup_profile,
**kwargs,
)
def _new_impl(
self,
*,
name: str | None,
geometry: tuple[int, int, int, int] | None,
launch_script: str,
startup_profile: str | Literal["restore", "skip"] | None,
**kwargs,
):
if launch_script == "dock_area":
try:
return self.launcher._run_rpc(
"system.launch_dock_area",
name=name,
geometry=geometry,
startup_profile=startup_profile,
profile=profile,
start_empty=start_empty,
**kwargs,
)
except ValueError as exc:
error = str(exc)
if (
"Unknown system RPC method: system.launch_dock_area" not in error
and "has no attribute 'system.launch_dock_area'" not in error
):
raise
logger.debug("Server does not support system.launch_dock_area; using launcher RPC")
return self.launcher._run_rpc(
) # pylint: disable=protected-access
return widget
widget = self.launcher._run_rpc(
"launch",
launch_script=launch_script,
name=name,
geometry=geometry,
startup_profile=startup_profile,
profile=profile,
start_empty=start_empty,
**kwargs,
) # pylint: disable=protected-access
return widget
def delete(self, name: str) -> None:
"""Delete a dock area and its parent window.
@@ -569,13 +399,11 @@ class BECGuiClient(RPCBase):
if self._process:
logger.success("Stopping GUI...")
if not self._request_server_shutdown():
_terminate_plot_process(self._process, logger)
_join_process_output_thread(
self._process, self._process_output_processing_thread, logger
)
self._process.terminate()
if self._process_output_processing_thread:
self._process_output_processing_thread.join()
self._process.wait()
self._process = None
self._process_output_processing_thread = None
# Unregister the registry state
self._client.connector.unregister(
@@ -594,37 +422,6 @@ class BECGuiClient(RPCBase):
#### Private methods ####
#########################
def _request_server_shutdown(self) -> bool:
if self._process is None or self._process.poll() is not None:
return True
process_details = f"pid={self._process.pid} command={self._process.args}"
logger.info(f"Requesting graceful GUI shutdown {process_details}")
try:
self.launcher._run_rpc( # pylint: disable=protected-access
"system.shutdown",
wait_for_rpc_response=True,
timeout=GRACEFUL_SERVER_SHUTDOWN_RPC_TIMEOUT,
)
except Exception as exc:
logger.warning(
"Could not confirm graceful GUI shutdown via RPC; "
"falling back to process termination."
)
logger.info(f"Graceful GUI shutdown RPC failure details: {exc}. {process_details}")
return False
if _wait_for_process_exit(self._process, GRACEFUL_SERVER_SHUTDOWN_TIMEOUT):
logger.info(f"GUI server exited after graceful shutdown {process_details}")
return True
logger.warning(
"GUI server did not exit after graceful shutdown request; "
"falling back to process termination."
)
logger.info(
f"Graceful GUI shutdown timeout details: {process_details}\n"
f"{_process_group_snapshot(self._process)}"
)
return False
def _check_if_server_is_alive(self):
"""Checks if the process is alive"""
if self._process is None:
@@ -683,14 +480,20 @@ class BECGuiClient(RPCBase):
def _start(self, wait: bool = False) -> None:
self._killed = False
self._safe_register_stream(
MessageEndpoints.gui_registry_state(self._gui_id), cb=self._handle_registry_update
self._client.connector.register(
MessageEndpoints.gui_registry_state(self._gui_id),
cb=self._handle_registry_update,
parent=self,
)
return self._start_server(wait=wait)
def _handle_registry_update(self, msg: dict[str, GUIRegistryStateMessage]) -> None:
@staticmethod
def _handle_registry_update(
msg: dict[str, GUIRegistryStateMessage], parent: BECGuiClient
) -> None:
# This was causing a deadlock during shutdown, not sure why.
# with self._lock:
self = parent
self._server_registry = cast(dict[str, RegistryState], msg["data"].state)
self._update_dynamic_namespace(self._server_registry)
@@ -698,7 +501,7 @@ class BECGuiClient(RPCBase):
if self.launcher and len(self._top_level) == 0:
self.launcher._run_rpc("show") # pylint: disable=protected-access
for window in self._top_level.values():
window.raise_window()
window.show()
def _show_all(self):
with wait_for_server(self):
@@ -717,7 +520,7 @@ class BECGuiClient(RPCBase):
if self.launcher and len(self._top_level) == 0:
self.launcher._run_rpc("raise") # pylint: disable=protected-access
for window in self._top_level.values():
window.raise_window()
window._run_rpc("raise") # type: ignore[attr-defined]
def _raise_all(self):
with wait_for_server(self):
-166
View File
@@ -1,166 +0,0 @@
# This file was automatically generated by generate_cli.py
# type: ignore
from __future__ import annotations
# pylint: skip-file
designer_plugins = {
"AbortButton": ("bec_widgets.widgets.control.buttons.button_abort.button_abort", "AbortButton"),
"BECColorMapWidget": (
"bec_widgets.widgets.utility.visual.colormap_widget.colormap_widget",
"BECColorMapWidget",
),
"BECMainWindow": ("bec_widgets.widgets.containers.main_window.main_window", "BECMainWindow"),
"BECProgressBar": (
"bec_widgets.widgets.progress.bec_progressbar.bec_progressbar",
"BECProgressBar",
),
"BECQueue": ("bec_widgets.widgets.services.bec_queue.bec_queue", "BECQueue"),
"BECShell": ("bec_widgets.widgets.editors.bec_console.bec_console", "BECShell"),
"BECSpinBox": ("bec_widgets.widgets.utility.spinbox.decimal_spinbox", "BECSpinBox"),
"BECStatusBox": ("bec_widgets.widgets.services.bec_status_box.bec_status_box", "BECStatusBox"),
"BeamlineStateManager": (
"bec_widgets.widgets.services.beamline_states.beamline_state_manager",
"BeamlineStateManager",
),
"BecConsole": ("bec_widgets.widgets.editors.bec_console.bec_console", "BecConsole"),
"ColorButton": ("bec_widgets.widgets.utility.visual.color_button.color_button", "ColorButton"),
"ColorButtonNative": (
"bec_widgets.widgets.utility.visual.color_button_native.color_button_native",
"ColorButtonNative",
),
"ColormapSelector": (
"bec_widgets.widgets.utility.visual.colormap_selector.colormap_selector",
"ColormapSelector",
),
"DapComboBox": ("bec_widgets.widgets.dap.dap_combo_box.dap_combo_box", "DapComboBox"),
"DarkModeButton": (
"bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button",
"DarkModeButton",
),
"DeviceBrowser": (
"bec_widgets.widgets.services.device_browser.device_browser",
"DeviceBrowser",
),
"DeviceComboBox": (
"bec_widgets.widgets.control.device_input.device_combobox.device_combobox",
"DeviceComboBox",
),
"Heatmap": ("bec_widgets.widgets.plots.heatmap.heatmap", "Heatmap"),
"IDEExplorer": ("bec_widgets.widgets.utility.ide_explorer.ide_explorer", "IDEExplorer"),
"Image": ("bec_widgets.widgets.plots.image.image", "Image"),
"LMFitDialog": ("bec_widgets.widgets.dap.lmfit_dialog.lmfit_dialog", "LMFitDialog"),
"LogPanel": ("bec_widgets.widgets.utility.logpanel.logpanel", "LogPanel"),
"Minesweeper": ("bec_widgets.widgets.games.minesweeper", "Minesweeper"),
"MonacoWidget": ("bec_widgets.widgets.editors.monaco.monaco_widget", "MonacoWidget"),
"MotorMap": ("bec_widgets.widgets.plots.motor_map.motor_map", "MotorMap"),
"MultiWaveform": ("bec_widgets.widgets.plots.multi_waveform.multi_waveform", "MultiWaveform"),
"PdfViewerWidget": ("bec_widgets.widgets.utility.pdf_viewer.pdf_viewer", "PdfViewerWidget"),
"PositionIndicator": (
"bec_widgets.widgets.control.device_control.position_indicator.position_indicator",
"PositionIndicator",
),
"PositionerBox": (
"bec_widgets.widgets.control.device_control.positioner_box.positioner_box.positioner_box",
"PositionerBox",
),
"PositionerBox2D": (
"bec_widgets.widgets.control.device_control.positioner_box.positioner_box_2d.positioner_box_2d",
"PositionerBox2D",
),
"PositionerControlLine": (
"bec_widgets.widgets.control.device_control.positioner_box.positioner_control_line.positioner_control_line",
"PositionerControlLine",
),
"PositionerGroup": (
"bec_widgets.widgets.control.device_control.positioner_group.positioner_group",
"PositionerGroup",
),
"ResetButton": ("bec_widgets.widgets.control.buttons.button_reset.button_reset", "ResetButton"),
"ResumeButton": (
"bec_widgets.widgets.control.buttons.button_resume.button_resume",
"ResumeButton",
),
"RingProgressBar": (
"bec_widgets.widgets.progress.ring_progress_bar.ring_progress_bar",
"RingProgressBar",
),
"SBBMonitor": ("bec_widgets.widgets.editors.sbb_monitor.sbb_monitor", "SBBMonitor"),
"ScanControl": ("bec_widgets.widgets.control.scan_control.scan_control", "ScanControl"),
"ScanMetadata": ("bec_widgets.widgets.editors.scan_metadata.scan_metadata", "ScanMetadata"),
"ScanProgressBar": (
"bec_widgets.widgets.progress.scan_progressbar.scan_progressbar",
"ScanProgressBar",
),
"ScatterWaveform": (
"bec_widgets.widgets.plots.scatter_waveform.scatter_waveform",
"ScatterWaveform",
),
"SignalComboBox": (
"bec_widgets.widgets.control.device_input.signal_combobox.signal_combobox",
"SignalComboBox",
),
"SignalLabel": ("bec_widgets.widgets.utility.signal_label.signal_label", "SignalLabel"),
"SpinnerWidget": ("bec_widgets.widgets.utility.spinner.spinner", "SpinnerWidget"),
"StopButton": ("bec_widgets.widgets.control.buttons.stop_button.stop_button", "StopButton"),
"TextBox": ("bec_widgets.widgets.editors.text_box.text_box", "TextBox"),
"ToggleSwitch": ("bec_widgets.widgets.utility.toggle.toggle", "ToggleSwitch"),
"Waveform": ("bec_widgets.widgets.plots.waveform.waveform", "Waveform"),
"WebsiteWidget": ("bec_widgets.widgets.editors.website.website", "WebsiteWidget"),
"WidgetFinderComboBox": (
"bec_widgets.widgets.utility.widget_finder.widget_finder",
"WidgetFinderComboBox",
),
}
widget_icons = {
"AbortButton": "cancel",
"BECColorMapWidget": "palette",
"BECMainWindow": "widgets",
"BECProgressBar": "page_control",
"BECQueue": "edit_note",
"BECShell": "hub",
"BECSpinBox": "123",
"BECStatusBox": "widgets",
"BeamlineStateManager": "format_list_bulleted",
"BecConsole": "terminal",
"ColorButton": "colors",
"ColorButtonNative": "colors",
"ColormapSelector": "palette",
"DapComboBox": "data_exploration",
"DarkModeButton": "dark_mode",
"DeviceBrowser": "lists",
"DeviceComboBox": "list_alt",
"Heatmap": "dataset",
"IDEExplorer": "widgets",
"Image": "image",
"LMFitDialog": "monitoring",
"LogPanel": "browse_activity",
"Minesweeper": "videogame_asset",
"MonacoWidget": "code",
"MotorMap": "my_location",
"MultiWaveform": "ssid_chart",
"PdfViewerWidget": "picture_as_pdf",
"PositionIndicator": "horizontal_distribute",
"PositionerBox": "switch_right",
"PositionerBox2D": "switch_right",
"PositionerControlLine": "switch_left",
"PositionerGroup": "grid_view",
"ResetButton": "restart_alt",
"ResumeButton": "resume",
"RingProgressBar": "track_changes",
"SBBMonitor": "train",
"ScanControl": "tune",
"ScanMetadata": "list_alt",
"ScanProgressBar": "timelapse",
"ScatterWaveform": "scatter_plot",
"SignalComboBox": "list_alt",
"SignalLabel": "scoreboard",
"SpinnerWidget": "progress_activity",
"StopButton": "dangerous",
"TextBox": "chat",
"ToggleSwitch": "toggle_on",
"Waveform": "show_chart",
"WebsiteWidget": "travel_explore",
"WidgetFinderComboBox": "frame_inspect",
}
@@ -7,22 +7,31 @@ import inspect
import os
import sys
from pathlib import Path
from typing import get_overloads
import black
import isort
from bec_lib.logger import bec_logger
from qtpy.QtCore import Property as QtProperty
from bec_widgets.utils.generate_designer_plugin import (
DesignerPluginGenerator,
DesignerPluginInfo,
plugin_filenames,
)
from bec_widgets.utils.generate_designer_plugin import DesignerPluginGenerator, plugin_filenames
from bec_widgets.utils.plugin_utils import BECClassContainer, get_custom_classes
logger = bec_logger.logger
if sys.version_info >= (3, 11):
from typing import get_overloads
else:
print(
"Python version is less than 3.11, using dummy function for get_overloads. "
"If you want to use the real function 'typing.get_overloads()', please use Python 3.11 or later."
)
def get_overloads(_obj):
"""
Dummy function for Python versions before 3.11.
"""
return []
class ClientGenerator:
def __init__(self, base=False):
@@ -45,7 +54,7 @@ from __future__ import annotations
from bec_lib.logger import bec_logger
from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call, rpc_timeout
{"from bec_widgets.utils.bec_plugin_helper import get_plugin_client_module" if self._base else ""}
{"from bec_widgets.utils.bec_plugin_helper import get_all_plugin_widgets, get_plugin_client_module" if self._base else ""}
logger = bec_logger.logger
@@ -85,7 +94,7 @@ logger = bec_logger.logger
if self._base:
self.content += """
class _WidgetsEnumType(str, enum.Enum):
\"\"\" Enum for the available widgets, to be generated programmatically \"\"\"
\"\"\" Enum for the available widgets, to be generated programatically \"\"\"
...
"""
@@ -102,19 +111,27 @@ _Widgets = {
self.content += """
try:
_plugin_widgets = get_all_plugin_widgets().as_dict()
plugin_client = get_plugin_client_module()
Widgets = _WidgetsEnumType("Widgets", {name: name for name in _plugin_widgets} | _Widgets)
if (_overlap := _Widgets.keys() & _plugin_widgets.keys()) != set():
for _widget in _overlap:
logger.warning(f"Detected duplicate widget {_widget} in plugin repo file: {inspect.getfile(_plugin_widgets[_widget])} !")
for plugin_name, plugin_class in inspect.getmembers(plugin_client, inspect.isclass):
if issubclass(plugin_class, RPCBase) and plugin_class is not RPCBase:
if plugin_name not in _Widgets:
_Widgets[plugin_name] = plugin_name
if plugin_name in globals():
conflicting_file = (
inspect.getfile(_plugin_widgets[plugin_name])
if plugin_name in _plugin_widgets
else f"{plugin_client}"
)
logger.warning(
f"Plugin widget {plugin_name} in {plugin_class._IMPORT_MODULE} conflicts with a built-in class!"
f"Plugin widget {plugin_name} from {conflicting_file} conflicts with a built-in class!"
)
continue
else:
if plugin_name not in _overlap:
globals()[plugin_name] = plugin_class
Widgets = _WidgetsEnumType("Widgets", _Widgets)
except ImportError as e:
logger.error(f"Failed loading plugins: \\n{reduce(add, traceback.format_exception(e))}")
"""
@@ -129,8 +146,12 @@ except ImportError as e:
class_name = cls.__name__
self.content += f"""
class {class_name}(RPCBase):\n"""
if class_name == "BECDockArea":
self.content += f"""
class {class_name}(RPCBase):"""
else:
self.content += f"""
class {class_name}(RPCBase):"""
if cls.__doc__:
# We only want the first line of the docstring
@@ -141,11 +162,19 @@ class {class_name}(RPCBase):\n"""
else:
class_docs = cls.__doc__.split("\n")[1]
self.content += f"""
\"\"\"{class_docs}\"\"\"\n"""
user_access_entries = self._get_user_access_entries(cls)
self.content += f' _IMPORT_MODULE="{cls.__module__}"\n'
for method_entry in user_access_entries:
method, obj, is_property_setter = self._resolve_method_object(cls, method_entry)
\"\"\"{class_docs}\"\"\"
"""
if not cls.USER_ACCESS:
self.content += """...
"""
for method in cls.USER_ACCESS:
is_property_setter = False
obj = getattr(cls, method, None)
if obj is None:
obj = getattr(cls, method.split(".setter")[0], None)
is_property_setter = True
method = method.split(".setter")[0]
if obj is None:
raise AttributeError(
f"Method {method} not found in class {cls.__name__}. "
@@ -187,34 +216,6 @@ class {class_name}(RPCBase):\n"""
{doc}
\"\"\""""
@staticmethod
def _get_user_access_entries(cls) -> list[str]:
entries = list(getattr(cls, "USER_ACCESS", []))
content_cls = getattr(cls, "RPC_CONTENT_CLASS", None)
if content_cls is not None:
entries.extend(getattr(content_cls, "USER_ACCESS", []))
return list(dict.fromkeys(entries))
@staticmethod
def _resolve_method_object(cls, method_entry: str):
method_name = method_entry
is_property_setter = False
if method_entry.endswith(".setter"):
is_property_setter = True
method_name = method_entry.split(".setter")[0]
candidate_classes = [cls]
content_cls = getattr(cls, "RPC_CONTENT_CLASS", None)
if content_cls is not None:
candidate_classes.append(content_cls)
for candidate_cls in candidate_classes:
obj = getattr(candidate_cls, method_name, None)
if obj is not None:
return method_name, obj, is_property_setter
return method_name, None, is_property_setter
def _rpc_call(self, timeout_info: dict[str, float | None]):
"""
Decorator to mark a method as an RPC call.
@@ -254,58 +255,6 @@ class {class_name}(RPCBase):\n"""
file.write(formatted_content)
def write_designer_plugins(plugin_infos: list[DesignerPluginInfo], file_name: str):
"""
Write a registry of Qt widget classes with designer plugins.
Args:
plugin_infos(list[DesignerPluginInfo]): The designer plugin metadata to write.
file_name(str): The name of the file to write to.
"""
plugin_infos = sorted(plugin_infos, key=lambda info: info.plugin_name_pascal)
content = """# This file was automatically generated by generate_cli.py
# type: ignore
from __future__ import annotations
# pylint: skip-file
designer_plugins = {
"""
for info in plugin_infos:
widget_module = info.plugin_class.__module__
widget_class = info.plugin_name_pascal
content += f' "{info.plugin_name_pascal}": ("{widget_module}", "{widget_class}"),\n'
content += """
}
widget_icons = {
"""
for info in plugin_infos:
content += f' "{info.plugin_name_pascal}": "{info.icon_name}",\n'
content += """
}
"""
try:
formatted_content = black.format_str(content, mode=black.Mode(line_length=100))
except black.NothingChanged:
formatted_content = content
config = isort.Config(
profile="black",
line_length=100,
multi_line_output=3,
include_trailing_comma=False,
known_first_party=["bec_widgets"],
)
formatted_content = isort.code(formatted_content, config=config)
with open(file_name, "w", encoding="utf-8") as file:
file.write(formatted_content)
def main():
"""
Main entry point for the script, controlled by command line arguments.
@@ -342,8 +291,7 @@ def main():
client_path = module_dir / client_subdir / "client.py"
packages = ("widgets", "applications") if module_name == "bec_widgets" else ("widgets",)
rpc_classes = get_custom_classes(module_name, packages=packages)
rpc_classes = get_custom_classes(module_name)
logger.info(f"Obtained classes with RPC objects: {rpc_classes!r}")
generator = ClientGenerator(base=module_name == "bec_widgets")
@@ -359,8 +307,6 @@ def main():
else:
non_overwrite_classes = []
designer_plugin_infos = []
for cls in rpc_classes.plugins:
logger.info(f"Writing bec-designer plugin files for {cls.__name__}...")
@@ -368,30 +314,21 @@ def main():
logger.error(
f"Not writing plugin files for {cls.__name__} because a built-in widget with that name exists"
)
continue
plugin = DesignerPluginGenerator(cls)
if not hasattr(plugin, "info") or plugin.excluded:
if not hasattr(plugin, "info"):
continue
def _exists(file: str):
return os.path.exists(os.path.join(plugin.info.base_path, file))
if any(_exists(file) for file in plugin_filenames(plugin.info.plugin_name_snake)):
if _exists(plugin.filenames.plugin):
designer_plugin_infos.append(plugin.info)
logger.debug(
f"Skipping generation of extra plugin files for {plugin.info.plugin_name_snake} - at least one file out of 'plugin.py', 'pyproject', and 'register_{plugin.info.plugin_name_snake}.py' already exists."
)
continue
plugin.run()
designer_plugin_infos.append(plugin.info)
# Write designer_plugins.py with plugin import metadata for all widgets with designer plugins.
designer_plugins_path = module_dir / client_subdir / "designer_plugins.py"
logger.info(f"Generating designer plugin registry at {designer_plugins_path}")
write_designer_plugins(designer_plugin_infos, str(designer_plugins_path))
if __name__ == "__main__": # pragma: no cover
+10 -59
View File
@@ -2,7 +2,6 @@ from __future__ import annotations
import inspect
import threading
import time
import uuid
from functools import wraps
from typing import TYPE_CHECKING, Any, cast
@@ -10,7 +9,6 @@ from typing import TYPE_CHECKING, Any, cast
from bec_lib.client import BECClient
from bec_lib.device import DeviceBaseWithConfig
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.utils.import_utils import lazy_import, lazy_import_from
if TYPE_CHECKING: # pragma: no cover
@@ -26,9 +24,6 @@ else:
# pylint: disable=protected-access
_DEFAULT_RPC_TIMEOUT = object()
logger = bec_logger.logger
def _name_arg(arg):
if isinstance(arg, DeviceBaseWithConfig):
@@ -159,7 +154,6 @@ class RPCReference:
class RPCBase:
def __init__(
self,
gui_id: str | None = None,
@@ -213,16 +207,12 @@ class RPCBase:
# Use explicit call to ensure action name is 'raise' (not 'raise_')
return self._run_rpc("raise")
def hide(self):
"""Hide this widget (or its container)."""
return self._run_rpc("hide")
def _run_rpc(
self,
method,
*args,
wait_for_rpc_response: bool = True,
timeout: float | None | object = _DEFAULT_RPC_TIMEOUT,
wait_for_rpc_response=True,
timeout=5,
gui_id: str | None = None,
**kwargs,
) -> Any:
@@ -233,16 +223,13 @@ class RPCBase:
method: The method to call.
args: The arguments to pass to the method.
wait_for_rpc_response: Whether to wait for the RPC response.
timeout: The timeout for the RPC response. If omitted, the client's default RPC
timeout is used. If explicitly set to None, wait indefinitely.
timeout: The timeout for the RPC response.
gui_id: The GUI ID to use for the RPC call. If None, the default GUI ID is used.
kwargs: The keyword arguments to pass to the method.
Returns:
The result of the RPC call.
"""
if timeout is _DEFAULT_RPC_TIMEOUT:
timeout = self._root._rpc_timeout
if method in ["show", "hide", "raise"] and gui_id is None:
obj = self._root._server_registry.get(self._gui_id)
if obj is None:
@@ -261,42 +248,17 @@ class RPCBase:
self._rpc_response = None
self._msg_wait_event.clear()
self._client.connector.register(
MessageEndpoints.gui_instruction_response(request_id), cb=self._on_rpc_response
MessageEndpoints.gui_instruction_response(request_id),
cb=self._on_rpc_response,
parent=self,
)
target_gui_id = gui_id or self._gui_id
sent_at = time.time()
deadline = sent_at + timeout if timeout is not None else None
rpc_msg.metadata.update(
{
"method": method,
"receiver": receiver,
"target_gui_id": target_gui_id,
"object_name": self.object_name,
"wait_for_response": wait_for_rpc_response,
"timeout": timeout,
"sent_at": sent_at,
"deadline": deadline,
}
)
logger.info(
"Sending GUI RPC request "
f"request_id={request_id} method={method} receiver={receiver} "
f"target_gui_id={target_gui_id} object_name={self.object_name} "
f"wait_for_response={wait_for_rpc_response} timeout={timeout}"
)
self._client.connector.set_and_publish(MessageEndpoints.gui_instructions(receiver), rpc_msg)
if wait_for_rpc_response:
try:
finished = self._msg_wait_event.wait(timeout)
if not finished:
logger.error(
"GUI RPC response timeout "
f"request_id={request_id} method={method} receiver={receiver} "
f"target_gui_id={target_gui_id} object_name={self.object_name} "
f"timeout={timeout}"
)
raise RPCResponseTimeoutError(request_id, timeout)
finally:
self._msg_wait_event.clear()
@@ -308,23 +270,17 @@ class RPCBase:
# the _on_rpc_response method
assert isinstance(self._rpc_response, messages.RequestResponseMessage)
logger.info(
"Received GUI RPC response "
f"request_id={request_id} method={method} receiver={receiver} "
f"target_gui_id={target_gui_id} object_name={self.object_name} "
f"accepted={self._rpc_response.accepted}"
)
if not self._rpc_response.accepted:
raise ValueError(self._rpc_response.message["error"])
msg_result = self._rpc_response.message.get("result")
self._rpc_response = None
return self._create_widget_from_msg_result(msg_result)
def _on_rpc_response(self, msg_obj: MessageObject) -> None:
@staticmethod
def _on_rpc_response(msg_obj: MessageObject, parent: RPCBase) -> None:
msg = cast(messages.RequestResponseMessage, msg_obj.value)
logger.debug(f"GUI RPC response callback received: {msg}")
self._rpc_response = msg
self._msg_wait_event.set()
parent._rpc_response = msg
parent._msg_wait_event.set()
def _create_widget_from_msg_result(self, msg_result):
if msg_result is None:
@@ -336,11 +292,6 @@ class RPCBase:
return {
key: self._create_widget_from_msg_result(val) for key, val in msg_result.items()
}
rpc_enabled = msg_result.get("__rpc__", True)
if rpc_enabled is False:
return None
msg_result = dict(msg_result)
cls = msg_result.pop("widget_class", None)
msg_result.pop("__rpc__", None)
+56
View File
@@ -0,0 +1,56 @@
from __future__ import annotations
from bec_widgets.cli.client_utils import IGNORE_WIDGETS
from bec_widgets.utils.bec_plugin_helper import get_all_plugin_widgets
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.plugin_utils import get_custom_classes
class RPCWidgetHandler:
"""Handler class for creating widgets from RPC messages."""
def __init__(self):
self._widget_classes = None
@property
def widget_classes(self) -> dict[str, type[BECWidget]]:
"""
Get the available widget classes.
Returns:
dict: The available widget classes.
"""
if self._widget_classes is None:
self.update_available_widgets()
return self._widget_classes # type: ignore
def update_available_widgets(self):
"""
Update the available widgets.
Returns:
None
"""
self._widget_classes = (
get_custom_classes("bec_widgets") + get_all_plugin_widgets()
).as_dict(IGNORE_WIDGETS)
def create_widget(self, widget_type, **kwargs) -> BECWidget:
"""
Create a widget from an RPC message.
Args:
widget_type(str): The type of the widget.
name (str): The name of the widget.
**kwargs: The keyword arguments for the widget.
Returns:
widget(BECWidget): The created widget.
"""
widget_class = self.widget_classes.get(widget_type) # type: ignore
if widget_class:
return widget_class(**kwargs)
raise ValueError(f"Unknown widget type: {widget_type}")
widget_handler = RPCWidgetHandler()
@@ -5,11 +5,9 @@ import json
import os
import signal
import sys
import traceback
from contextlib import redirect_stderr, redirect_stdout
import darkdetect
import shiboken6
from bec_lib.logger import bec_logger
from bec_lib.service_config import ServiceConfig
from bec_qthemes import apply_theme
@@ -20,8 +18,8 @@ from qtpy.QtWidgets import QApplication
import bec_widgets
from bec_widgets.applications.launch_window import LaunchWindow
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.utils.bec_dispatcher import BECDispatcher
from bec_widgets.utils.rpc_register import RPCRegister
logger = bec_logger.logger
@@ -64,7 +62,6 @@ class GUIServer:
self.app: QApplication | None = None
self.launcher_window: LaunchWindow | None = None
self.dispatcher: BECDispatcher | None = None
self._shutdown_started = False
def start(self):
"""
@@ -76,7 +73,6 @@ class GUIServer:
bec_logger._stderr_log_level = bec_logger.LOGLEVEL.ERROR
bec_logger._update_sinks()
bec_logger.disabled_modules = ["bec_lib.scan_items"]
with redirect_stdout(SimpleFileLikeFromLogOutputFunc(logger.info)): # type: ignore
with redirect_stderr(SimpleFileLikeFromLogOutputFunc(logger.error)): # type: ignore
self._run()
@@ -97,7 +93,6 @@ class GUIServer:
"""
Run the GUI server.
"""
logger.info("Starting GUIServer", repr(self))
self.app = QApplication(sys.argv)
if darkdetect.isDark():
apply_theme("dark")
@@ -106,11 +101,11 @@ class GUIServer:
self.app.setApplicationName("BEC")
self.app.gui_id = self.gui_id # type: ignore
self.app.gui_server = self # type: ignore # make server accessible from QApplication for getattr in widgets
self.setup_bec_icon()
service_config = self._get_service_config()
self.dispatcher = BECDispatcher(config=service_config, gui_id=self.gui_id)
# self.dispatcher.start_cli_server(gui_id=self.gui_id)
if self.gui_class:
self.launcher_window = LaunchWindow(
@@ -123,10 +118,20 @@ class GUIServer:
self.launcher_window.setAttribute(Qt.WA_ShowWithoutActivating) # type: ignore
self.app.aboutToQuit.connect(self.shutdown)
self.app.setQuitOnLastWindowClosed(True)
self.app.setQuitOnLastWindowClosed(False)
signal.signal(signal.SIGINT, self.request_shutdown)
signal.signal(signal.SIGTERM, self.request_shutdown)
def sigint_handler(*args):
# display message, for people to let it terminate gracefully
print("Caught SIGINT, exiting")
# Widgets should be all closed.
with RPCRegister.delayed_broadcast():
for widget in QApplication.instance().topLevelWidgets(): # type: ignore
widget.close()
if self.app:
self.app.quit()
signal.signal(signal.SIGINT, sigint_handler)
signal.signal(signal.SIGTERM, sigint_handler)
sys.exit(self.app.exec())
@@ -143,67 +148,15 @@ class GUIServer:
)
self.app.setWindowIcon(icon)
def request_shutdown(self, signum=None, _frame=None):
"""
Request Qt application shutdown from an RPC call or OS signal.
Cleanup itself is handled by ``shutdown()``, which is connected to
``QApplication.aboutToQuit``. Calling it directly here would run BEC/RPC
teardown before Qt has processed the widget close events.
"""
signal_name = signal.Signals(signum).name if signum is not None else "shutdown"
pid = os.getpid()
if self.app is None:
logger.info(f"Caught {signal_name}, shutting down GUI server pid={pid} without app")
self.shutdown()
return
widgets = [
f"{widget.__class__.__name__}(objectName={widget.objectName()!r})"
for widget in self.app.topLevelWidgets()
]
logger.info(
f"Caught {signal_name}, requesting GUI server shutdown pid={pid} "
f"top_level_widgets={widgets}"
)
with RPCRegister.delayed_broadcast():
for widget in self.app.topLevelWidgets():
widget.close()
self.app.quit()
@staticmethod
def _run_shutdown_step(step: str, callback):
try:
callback()
except Exception as exc:
logger.error(
f"GUIServer shutdown step failed pid={os.getpid()} step={step}: {exc}\n"
f"{traceback.format_exc()}"
)
def shutdown(self):
if self._shutdown_started:
return
self._shutdown_started = True
logger.info(f"Shutdown GUIServer pid={os.getpid()} {repr(self)}")
def close_launcher_window():
if self.launcher_window and shiboken6.isValid(self.launcher_window):
self.launcher_window.close()
self.launcher_window.deleteLater()
def stop_pylsp_server():
if pylsp_server.is_running():
pylsp_server.stop()
def stop_dispatcher():
if self.dispatcher:
self.dispatcher.stop_cli_server()
self.dispatcher.disconnect_all()
self._run_shutdown_step("close_launcher_window", close_launcher_window)
self._run_shutdown_step("stop_pylsp_server", stop_pylsp_server)
self._run_shutdown_step("stop_dispatcher", stop_dispatcher)
"""
Shutdown the GUI server.
"""
if pylsp_server.is_running():
pylsp_server.stop()
if self.dispatcher:
self.dispatcher.stop_cli_server()
self.dispatcher.disconnect_all()
def main():
@@ -25,8 +25,8 @@ from qtpy.QtWidgets import (
QWidget,
)
from bec_widgets.cli.rpc.rpc_widget_handler import widget_handler
from bec_widgets.utils.colors import apply_theme
from bec_widgets.utils.rpc_widget_handler import widget_handler
from bec_widgets.utils.widget_io import WidgetHierarchy as wh
from bec_widgets.widgets.editors.jupyter_console.jupyter_console import BECJupyterConsole
@@ -206,6 +206,7 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
def _populate_registry_widgets(self):
try:
widget_handler.update_available_widgets()
items = sorted(widget_handler.widget_classes.keys())
except Exception as exc:
print(f"Failed to load registered widgets: {exc}")
@@ -334,13 +335,20 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
If kwargs does not contain `object_name`, it will default to the provided shortcut.
"""
# Ensure registry is loaded
widget_handler.update_available_widgets()
cls = widget_handler.widget_classes.get(widget_type)
if cls is None:
raise ValueError(f"Unknown registered widget type: {widget_type}")
if kwargs is None:
kwargs = {"object_name": shortcut}
else:
kwargs = dict(kwargs)
kwargs.setdefault("object_name", shortcut)
widget = widget_handler.create_widget(widget_type, **kwargs)
# Instantiate and add
widget = cls(**kwargs)
if not isinstance(widget, QWidget):
raise TypeError(
f"Instantiated object for type '{widget_type}' is not a QWidget: {type(widget)}"
+5 -12
View File
@@ -1,7 +1,6 @@
# pylint: skip-file
from unittest.mock import MagicMock
from bec_lib.config_helper import ConfigHelper
from bec_lib.device import Device as BECDevice
from bec_lib.device import Positioner as BECPositioner
from bec_lib.device import ReadoutPriority
@@ -15,7 +14,7 @@ class FakeDevice(BECDevice):
super().__init__(name=name)
self._enabled = enabled
self.signals = {self.name: {"value": 1.0}}
self._description = {self.name: {"source": self.name, "dtype": "number", "shape": []}}
self.description = {self.name: {"source": self.name, "dtype": "number", "shape": []}}
self._readout_priority = readout_priority
self._config = {
"readoutPriority": "baseline",
@@ -74,7 +73,7 @@ class FakeDevice(BECDevice):
Returns:
dict: Description of the device
"""
return self._description
return self.description
class FakePositioner(BECPositioner):
@@ -96,7 +95,7 @@ class FakePositioner(BECPositioner):
self._limits = limits
self._readout_priority = readout_priority
self.signals = {self.name: {"value": 1.0}}
self._description = {self.name: {"source": self.name, "dtype": "number", "shape": []}}
self.description = {self.name: {"source": self.name, "dtype": "number", "shape": []}}
self._config = {
"readoutPriority": "baseline",
"deviceClass": "ophyd_devices.SimPositioner",
@@ -176,7 +175,7 @@ class FakePositioner(BECPositioner):
Returns:
dict: Description of the device
"""
return self._description
return self.description
@property
def precision(self):
@@ -220,9 +219,7 @@ class Device(FakeDevice):
class DMMock:
def __init__(self, *args, **kwargs):
self._service = args[0]
self.config_helper = ConfigHelper(self._service.connector, self._service._service_name)
def __init__(self):
self.devices = DeviceContainer()
self.enabled_devices = [device for device in self.devices if device.enabled]
@@ -276,10 +273,6 @@ class DMMock:
configs.append(device._config)
return configs
def initialize(*_): ...
def shutdown(self): ...
DEVICES = [
FakePositioner("samx", limits=[-10, 10], read_value=2.0),
+12
View File
@@ -1 +1,13 @@
from qtpy.QtWebEngineWidgets import QWebEngineView
from .bec_connector import BECConnector, ConnectionConfig
from .bec_dispatcher import BECDispatcher
from .bec_table import BECTable
from .colors import Colors
from .container_utils import WidgetContainerUtils
from .crosshair import Crosshair
from .entry_validator import EntryValidator
from .layout_manager import GridLayoutManager
from .rpc_decorator import register_rpc_methods, rpc_public
from .ui_loader import UILoader
from .validator_delegate import DoubleValidationDelegate
+11 -59
View File
@@ -12,17 +12,18 @@ import shiboken6 as shb
from bec_lib.logger import bec_logger
from bec_lib.utils.import_utils import lazy_import_from
from pydantic import BaseModel, Field, field_validator
from qtpy.QtCore import Property, QObject, QRunnable, QThreadPool, Signal
from qtpy.QtCore import Property, QObject, QRunnable, QThreadPool, QTimer, Signal
from qtpy.QtWidgets import QApplication
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.utils.error_popups import ErrorPopupUtility, SafeSlot
from bec_widgets.utils.name_utils import sanitize_namespace
from bec_widgets.utils.rpc_register import RPCRegister
from bec_widgets.utils.widget_io import WidgetHierarchy
from bec_widgets.utils.yaml_dialog import load_yaml, load_yaml_gui, save_yaml, save_yaml_gui
if TYPE_CHECKING: # pragma: no cover
from bec_widgets.utils.bec_dispatcher import BECDispatcher
from bec_widgets.widgets.containers.dock import BECDock
else:
BECDispatcher = lazy_import_from("bec_widgets.utils.bec_dispatcher", ("BECDispatcher",))
@@ -88,8 +89,6 @@ class BECConnector:
gui_id: str | None = None,
object_name: str | None = None,
root_widget: bool = False,
rpc_exposed: bool = True,
rpc_passthrough_children: bool = True,
**kwargs,
):
"""
@@ -101,10 +100,6 @@ class BECConnector:
gui_id(str, optional): The GUI ID.
object_name(str, optional): The object name.
root_widget(bool, optional): If set to True, the parent_id will be always set to None, thus enforcing that the widget is accessible as a root widget of the BECGuiClient object.
rpc_exposed(bool, optional): If set to False, this instance is excluded from RPC registry broadcast and CLI namespace discovery.
rpc_passthrough_children(bool, optional): Only relevant when ``rpc_exposed=False``.
If True, RPC-visible children rebind to the next visible ancestor.
If False (default), children stay hidden behind this widget.
**kwargs:
"""
# Extract object_name from kwargs to not pass it to Qt class
@@ -133,13 +128,8 @@ class BECConnector:
# the function depends on BECClient, and BECDispatcher
@SafeSlot()
def terminate(client=self.client, dispatcher=self.bec_dispatcher):
app = QApplication.instance()
gui_server = getattr(app, "gui_server", None)
if gui_server and hasattr(gui_server, "shutdown"):
gui_server.shutdown()
logger.info("Disconnecting", repr(dispatcher))
dispatcher.disconnect_all()
dispatcher.stop_cli_server()
try: # shutdown ophyd threads if any
from ophyd._pyepics_shim import _dispatcher
@@ -167,7 +157,7 @@ class BECConnector:
)
self.config = ConnectionConfig(widget_class=self.__class__.__name__)
# If the gui_id is passed, it should be respected. However, this should be revisited since
# If the gui_id is passed, it should be respected. However, this should be revisted since
# the gui_id has to be unique, and may no longer be.
if gui_id:
self.config.gui_id = gui_id
@@ -195,11 +185,6 @@ class BECConnector:
# If set to True, the parent_id will be always set to None, thus enforcing that the widget is accessible as a root widget of the BECGuiClient object.
self.root_widget = root_widget
# If set to False, this instance is not exposed through RPC at all.
self.rpc_exposed = bool(rpc_exposed)
# If True on a hidden parent (rpc_exposed=False), children can bubble up to
# the next visible RPC ancestor.
self.rpc_passthrough_children = bool(rpc_passthrough_children)
self._update_object_name()
@@ -208,41 +193,11 @@ class BECConnector:
try:
if self.root_widget:
return None
connector_parent = self._get_rpc_parent_ancestor()
connector_parent = WidgetHierarchy._get_becwidget_ancestor(self)
return connector_parent.gui_id if connector_parent else None
except:
logger.error(f"Error getting parent_id for {self.__class__.__name__}")
def _get_rpc_parent_ancestor(self) -> BECConnector | None:
"""
Find the nearest ancestor that is RPC-addressable.
Rules:
- If an ancestor has ``rpc_exposed=False``, it is an explicit visibility
boundary unless ``rpc_passthrough_children=True``.
- If an ancestor has ``RPC=False`` (but remains rpc_exposed), it is treated
as structural and children continue to the next ancestor.
- Lookup always happens through ``WidgetHierarchy.get_becwidget_ancestor``
so plain ``QWidget`` nodes between connectors are ignored.
"""
current = self
while True:
parent = WidgetHierarchy.get_becwidget_ancestor(current)
if parent is None:
return None
if not getattr(parent, "rpc_exposed", True):
if getattr(parent, "rpc_passthrough_children", False):
current = parent
continue
return parent
if getattr(parent, "RPC", True):
return parent
current = parent
return None
def change_object_name(self, name: str) -> None:
"""
Change the object name of the widget. Unregister old name and register the new one.
@@ -261,9 +216,8 @@ class BECConnector:
"""
# 1) Enforce unique objectName among siblings with the same BECConnector parent
self._enforce_unique_sibling_name()
# 2) Register the object for RPC unless instance-level exposure is disabled.
if getattr(self, "rpc_exposed", True):
self.rpc_register.add_rpc(self)
# 2) Register the object for RPC
self.rpc_register.add_rpc(self)
try:
self.name_established.emit(self.object_name)
except RuntimeError as e:
@@ -281,7 +235,7 @@ class BECConnector:
if not shb.isValid(self):
return
parent_bec = WidgetHierarchy.get_becwidget_ancestor(self)
parent_bec = WidgetHierarchy._get_becwidget_ancestor(self)
if parent_bec:
# We have a parent => only compare with siblings under that parent
@@ -291,7 +245,7 @@ class BECConnector:
# Use RPCRegister to avoid QApplication.allWidgets() during event processing.
connections = self.rpc_register.list_all_connections().values()
all_bec = [w for w in connections if isinstance(w, BECConnector) and shb.isValid(w)]
siblings = [w for w in all_bec if WidgetHierarchy.get_becwidget_ancestor(w) is None]
siblings = [w for w in all_bec if WidgetHierarchy._get_becwidget_ancestor(w) is None]
# Collect used names among siblings
used_names = {sib.objectName() for sib in siblings if sib is not self}
@@ -319,8 +273,6 @@ class BECConnector:
Args:
name (str): The new object name.
"""
# sanitize before setting to avoid issues with Qt object names and RPC namespaces
name = sanitize_namespace(name)
super().setObjectName(name)
self.object_name = name
if self.rpc_register.object_is_registered(self):
@@ -399,7 +351,7 @@ class BECConnector:
"""
self.config = config
# FIXME some thoughts are required to decide how this should work with rpc registry
# FIXME some thoughts are required to decide how thhis should work with rpc registry
def apply_config(self, config: dict, generate_new_id: bool = True) -> None:
"""
Apply the configuration to the widget.
@@ -417,7 +369,7 @@ class BECConnector:
else:
self.gui_id = self.config.gui_id
# FIXME some thoughts are required to decide how this should work with rpc registry
# FIXME some thoughts are required to decide how thhis should work with rpc registry
def load_config(self, path: str | None = None, gui: bool = False):
"""
Load the configuration of the widget from YAML.
+13 -52
View File
@@ -3,9 +3,8 @@ from __future__ import annotations
import collections
import random
import string
import time
from collections.abc import Callable
from typing import TYPE_CHECKING, Any, DefaultDict, Hashable, Union
from typing import TYPE_CHECKING, DefaultDict, Hashable, Union
import louie
import redis
@@ -16,7 +15,6 @@ from bec_lib.service_config import ServiceConfig
from qtpy.QtCore import QObject
from qtpy.QtCore import Signal as pyqtSignal
from bec_widgets.utils.rpc_logging import elapsed_seconds, format_elapsed
from bec_widgets.utils.serialization import register_serializer_extension
logger = bec_logger.logger
@@ -27,39 +25,6 @@ if TYPE_CHECKING: # pragma: no cover
from bec_widgets.utils.rpc_server import RPCServer
def _log_rpc_dispatcher_receive(msg_content: Any, metadata: Any) -> None:
if not isinstance(msg_content, dict) or not isinstance(metadata, dict):
return
request_id = metadata.get("request_id")
method = msg_content.get("action")
parameter = msg_content.get("parameter")
if request_id is None or method is None or not isinstance(parameter, dict):
return
dispatch_received_at = time.time()
sent_at = metadata.get("sent_at")
deadline = metadata.get("deadline")
timeout = metadata.get("timeout")
dispatch_latency = elapsed_seconds(sent_at, dispatch_received_at)
stale_on_dispatch = deadline is not None and dispatch_received_at > deadline
target_gui_id = parameter.get("gui_id") or metadata.get("target_gui_id")
logger.info(
"GUI RPC dispatcher received request before Qt callback emit "
f"request_id={request_id} method={method} receiver={metadata.get('receiver')} "
f"target_gui_id={target_gui_id} object_name={metadata.get('object_name')} "
f"timeout={timeout} dispatch_latency_s={format_elapsed(dispatch_latency)} "
f"stale_on_dispatch={stale_on_dispatch}"
)
if stale_on_dispatch:
logger.warning(
"GUI RPC dispatcher received request after client timeout deadline "
f"request_id={request_id} method={method} receiver={metadata.get('receiver')} "
f"target_gui_id={target_gui_id} object_name={metadata.get('object_name')} "
f"timeout={timeout} dispatch_latency_s={format_elapsed(dispatch_latency)}"
)
class QtThreadSafeCallback(QObject):
"""QtThreadSafeCallback is a wrapper around a callback function to make it thread-safe for Qt."""
@@ -123,12 +88,10 @@ class QtRedisConnector(RedisConnector):
# we can notice kwargs are lost when passed to Qt slot
metadata = msg.metadata
_log_rpc_dispatcher_receive(msg.content, metadata)
cb(msg.content, metadata)
else:
# from stream
msg = msg["data"]
_log_rpc_dispatcher_receive(msg.content, msg.metadata)
cb(msg.content, msg.metadata)
@@ -160,16 +123,17 @@ class BECDispatcher:
self._registered_slots: DefaultDict[Hashable, QtThreadSafeCallback] = (
collections.defaultdict()
)
self.client = client
if client is None:
if config is not None and not isinstance(config, ServiceConfig):
# config is supposed to be a path
config = ServiceConfig(config)
if self.client is None:
if config is not None:
if not isinstance(config, ServiceConfig):
# config is supposed to be a path
config = ServiceConfig(config)
self.client = BECClient(
config=config, connector_cls=QtRedisConnector, name="BECWidgets"
)
else:
self.client = client
if self.client.started:
# have to reinitialize client to use proper connector
logger.info("Shutting down BECClient to switch to QtRedisConnector")
@@ -212,15 +176,12 @@ class BECDispatcher:
cb_info (dict | None): A dictionary containing information about the callback. Defaults to None.
"""
qt_slot = QtThreadSafeCallback(cb=slot, cb_info=cb_info)
if not self.client.connector.any_stream_is_registered(topics, qt_slot):
if qt_slot not in self._registered_slots:
self._registered_slots[qt_slot] = qt_slot
qt_slot = self._registered_slots[qt_slot]
self.client.connector.register(topics, cb=qt_slot, **kwargs)
topics_str, _ = self.client.connector._convert_endpointinfo(topics)
qt_slot.topics.update(set(topics_str))
else:
logger.warning(f"Attempted to create duplicate stream subscription for {topics=}")
if qt_slot not in self._registered_slots:
self._registered_slots[qt_slot] = qt_slot
qt_slot = self._registered_slots[qt_slot]
self.client.connector.register(topics, cb=qt_slot, **kwargs)
topics_str, _ = self.client.connector._convert_endpointinfo(topics)
qt_slot.topics.update(set(topics_str))
def disconnect_slot(
self, slot: Callable, topics: EndpointInfo | str | list[EndpointInfo] | list[str]
-87
View File
@@ -1,87 +0,0 @@
"""
Login dialog for user authentication.
The Login Widget is styled in a Material Design style and emits
the entered credentials through a signal for further processing.
"""
from qtpy.QtCore import Qt, Signal
from qtpy.QtWidgets import QLabel, QLineEdit, QPushButton, QVBoxLayout, QWidget
class BECLogin(QWidget):
"""Login dialog for user authentication in Material Design style."""
credentials_entered = Signal(str, str)
def __init__(self, parent=None):
super().__init__(parent=parent)
# Only displayed if this widget as standalone widget, and not embedded in another widget
self.setWindowTitle("Login")
title = QLabel("Sign in", parent=self)
title.setAlignment(Qt.AlignmentFlag.AlignCenter)
title.setStyleSheet("""
#QLabel
{
font-size: 18px;
font-weight: 600;
}
""")
self.username = QLineEdit(parent=self)
self.username.setPlaceholderText("Username")
self.password = QLineEdit(parent=self)
self.password.setPlaceholderText("Password")
self.password.setEchoMode(QLineEdit.EchoMode.Password)
self.ok_btn = QPushButton("Sign in", parent=self)
self.ok_btn.setDefault(True)
self.ok_btn.clicked.connect(self._emit_credentials)
# If the user presses Enter in the password field, trigger the OK button click
self.password.returnPressed.connect(self.ok_btn.click)
# Build Layout
layout = QVBoxLayout(self)
layout.setContentsMargins(32, 32, 32, 32)
layout.setSpacing(16)
layout.addWidget(title)
layout.addSpacing(8)
layout.addWidget(self.username)
layout.addWidget(self.password)
layout.addSpacing(12)
layout.addWidget(self.ok_btn)
self.username.setFocus()
self.setStyleSheet("""
QLineEdit {
padding: 8px;
}
""")
def _clear_password(self):
"""Clear the password field."""
self.password.clear()
def _emit_credentials(self):
"""Emit credentials and clear the password field."""
self.credentials_entered.emit(self.username.text().strip(), self.password.text())
self._clear_password()
if __name__ == "__main__": # pragma: no cover
import sys
from bec_qthemes import apply_theme
from qtpy.QtWidgets import QApplication
app = QApplication(sys.argv)
apply_theme("light")
dialog = BECLogin()
dialog.credentials_entered.connect(lambda u, p: print(f"Username: {u}, Password: {p}"))
dialog.show()
sys.exit(app.exec_())
+4 -65
View File
@@ -4,7 +4,6 @@ import importlib.metadata
import inspect
import pkgutil
import traceback
from functools import lru_cache
from importlib import util as importlib_util
from importlib.machinery import FileFinder, ModuleSpec, SourceFileLoader
from types import ModuleType
@@ -12,11 +11,7 @@ from typing import Generator
from bec_lib.logger import bec_logger
from bec_widgets.utils.plugin_utils import (
BECClassContainer,
BECClassInfo,
rpc_widget_registry_from_source,
)
from bec_widgets.utils.plugin_utils import BECClassContainer, BECClassInfo
logger = bec_logger.logger
@@ -58,14 +53,6 @@ def _submodule_by_name(module: ModuleType, name: str):
return None
def _submodule_spec_by_name(module: ModuleType, name: str) -> ModuleSpec | None:
for module_info in pkgutil.iter_modules(module.__path__):
if module_info.name != name or not isinstance(module_info.module_finder, FileFinder):
continue
return module_info.module_finder.find_spec(module_info.name)
return None
def _get_widgets_from_module(module: ModuleType) -> BECClassContainer:
"""Find any BECWidget subclasses in the given module and return them with their info."""
from bec_widgets.utils.bec_widget import BECWidget # avoid circular import
@@ -103,64 +90,16 @@ def get_plugin_client_module() -> ModuleType | None:
return _submodule_by_name(plugin, "client") if (plugin := user_widget_plugin()) else None
def get_plugin_designer_module() -> ModuleType | None:
"""If there is a plugin repository installed, return the designer module."""
return (
_submodule_by_name(plugin, "designer_plugins") if (plugin := user_widget_plugin()) else None
)
@lru_cache
def get_plugin_rpc_widget_registry() -> dict[str, tuple[str, str]]:
"""If there is a plugin repository installed, return the RPC widget registry."""
plugin = user_widget_plugin()
if plugin is None:
return {}
client_spec = _submodule_spec_by_name(plugin, "client")
if client_spec is not None and client_spec.origin:
try:
return rpc_widget_registry_from_source(client_spec.origin)
except (OSError, SyntaxError) as exc:
logger.warning(f"Could not parse plugin RPC widget registry: {exc}")
client_module = get_plugin_client_module()
if client_module is None:
return {}
registry = {}
for plugin_name, plugin_class in inspect.getmembers(client_module, inspect.isclass):
if hasattr(plugin_class, "_IMPORT_MODULE"):
registry[plugin_name] = (plugin_class._IMPORT_MODULE, plugin_class.__name__)
return registry
@lru_cache
def get_plugin_designer_registry() -> dict[str, tuple[str, str]]:
"""If there is a plugin repository installed, return the designer plugin registry."""
designer_module = get_plugin_designer_module()
if designer_module and hasattr(designer_module, "designer_plugins"):
return designer_module.designer_plugins
return {}
@lru_cache
def get_plugin_widget_icons() -> dict[str, str]:
"""If there is a plugin repository installed, return the designer widget icon registry."""
designer_module = get_plugin_designer_module()
if designer_module and hasattr(designer_module, "widget_icons"):
return designer_module.widget_icons
return {}
def get_all_plugin_widgets() -> BECClassContainer:
"""If there is a plugin repository installed, load all widgets from it."""
if plugin := user_widget_plugin():
return _all_widgets_from_all_submods(plugin)
return BECClassContainer()
else:
return BECClassContainer()
if __name__ == "__main__": # pragma: no cover
widgets = get_plugin_rpc_widget_registry()
client = get_plugin_client_module()
print(get_all_plugin_widgets())
...
+25 -26
View File
@@ -10,16 +10,17 @@ from qtpy.QtGui import QFont, QPixmap
from qtpy.QtWidgets import QApplication, QFileDialog, QLabel, QVBoxLayout, QWidget
import bec_widgets.widgets.containers.qt_ads as QtAds
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.utils.bec_connector import BECConnector, ConnectionConfig
from bec_widgets.utils.busy_loader import install_busy_loader
from bec_widgets.utils.error_popups import SafeConnect, SafeSlot
from bec_widgets.utils.rpc_decorator import rpc_timeout
from bec_widgets.utils.rpc_register import RPCRegister
from bec_widgets.utils.widget_io import WidgetHierarchy
from bec_widgets.widgets.utility.spinner.spinner import SpinnerWidget
if TYPE_CHECKING: # pragma: no cover
from bec_widgets.utils.busy_loader import BusyLoaderOverlay
from bec_widgets.widgets.containers.dock import BECDock
logger = bec_logger.logger
@@ -330,34 +331,32 @@ class BECWidget(BECConnector):
# All widgets need to call super().cleanup() in their cleanup method
logger.info(f"Registry cleanup for widget {self.__class__.__name__}")
self.rpc_register.remove_rpc(self)
children = self.findChildren(BECWidget)
for child in children:
if not shiboken6.isValid(child):
# If the child is not valid, it means it has already been deleted
continue
child.close()
child.deleteLater()
children = self.findChildren(BECWidget)
for child in children:
if not shiboken6.isValid(child):
# If the child is not valid, it means it has already been deleted
continue
child.close()
child.deleteLater()
# Tear down busy overlay explicitly to stop spinner and remove filters
overlay = getattr(self, "_busy_overlay", None)
if overlay is not None and shiboken6.isValid(overlay):
try:
overlay.hide()
filt = getattr(overlay, "_filter", None)
if filt is not None and shiboken6.isValid(filt):
try:
self.removeEventFilter(filt)
except Exception as exc:
logger.warning(
f"Failed to remove event filter from busy overlay: {exc}"
)
# Tear down busy overlay explicitly to stop spinner and remove filters
overlay = getattr(self, "_busy_overlay", None)
if overlay is not None and shiboken6.isValid(overlay):
try:
overlay.hide()
filt = getattr(overlay, "_filter", None)
if filt is not None and shiboken6.isValid(filt):
try:
self.removeEventFilter(filt)
except Exception as exc:
logger.warning(f"Failed to remove event filter from busy overlay: {exc}")
# Cleanup the overlay widget. This will call cleanup on the custom widget if present.
# Cleanup the overlay widget. This will call cleanup on the custom widget if present.
overlay.cleanup()
overlay.deleteLater()
except Exception as exc:
logger.warning(f"Failed to delete busy overlay: {exc}")
overlay.cleanup()
overlay.deleteLater()
except Exception as exc:
logger.warning(f"Failed to delete busy overlay: {exc}")
def closeEvent(self, event):
"""Wrap the close even to ensure the rpc_register is cleaned up."""
+67 -41
View File
@@ -2,7 +2,7 @@ from __future__ import annotations
import re
from functools import lru_cache
from typing import Any, Literal
from typing import Literal
import numpy as np
import pyqtgraph as pg
@@ -21,7 +21,8 @@ logger = bec_logger.logger
def get_theme_name():
if QApplication.instance() is None or not hasattr(QApplication.instance(), "theme"):
return "dark"
return QApplication.instance().theme.theme
else:
return QApplication.instance().theme.theme
def get_theme_palette():
@@ -57,25 +58,6 @@ def apply_theme(theme: Literal["dark", "light"]):
process_all_deferred_deletes(QApplication.instance())
def theme_color(theme: Any | None, key: str, fallback: QColor | str) -> QColor:
"""
Return a QColor from a BEC theme, or the fallback when no theme is set.
"""
fallback_color = fallback if isinstance(fallback, QColor) else QColor(str(fallback))
if theme is None:
return fallback_color
return theme.color(key, fallback_color.name())
def rgba(color: QColor | str, alpha: int) -> str:
"""
Return a QSS-compatible rgba string.
"""
qcolor = color if isinstance(color, QColor) else QColor(str(color))
return f"rgba({qcolor.red()}, {qcolor.green()}, {qcolor.blue()}, {alpha})"
class Colors:
@staticmethod
def list_available_colormaps() -> list[str]:
@@ -119,13 +101,8 @@ class Colors:
return requested
# Case-insensitive match.
requested_lc = requested.casefold()
for name in available:
if name.casefold() == requested_lc:
return name
return requested
lower_to_canonical = {name.lower(): name for name in available}
return lower_to_canonical.get(requested.lower(), requested)
@staticmethod
def get_colormap(color_map: str) -> pg.ColorMap:
@@ -168,6 +145,25 @@ class Colors:
return ge.colorMap()
@staticmethod
def golden_ratio(num: int) -> list:
"""Calculate the golden ratio for a given number of angles.
Args:
num (int): Number of angles
Returns:
list: List of angles calculated using the golden ratio.
"""
phi = 2 * np.pi * ((1 + np.sqrt(5)) / 2)
angles = []
for ii in range(num):
x = np.cos(ii * phi)
y = np.sin(ii * phi)
angle = np.arctan2(y, x)
angles.append(angle)
return angles
@staticmethod
def set_theme_offset(theme: Literal["light", "dark"] | None = None, offset=0.2) -> tuple:
"""
@@ -238,7 +234,20 @@ class Colors:
else:
positions = np.linspace(min_pos, max_pos, num)
return Colors._format_mapped_colors(cmap.map(positions, mode="float"), format)
# Sample colors from the colormap at the calculated positions
colors = cmap.map(positions, mode="float")
color_list = []
for color in colors:
if format.upper() == "HEX":
color_list.append(QColor.fromRgbF(*color).name())
elif format.upper() == "RGB":
color_list.append(tuple((np.array(color) * 255).astype(int)))
elif format.upper() == "QCOLOR":
color_list.append(QColor.fromRgbF(*color))
else:
raise ValueError("Unsupported format. Please choose 'RGB', 'HEX', or 'QColor'.")
return color_list
@staticmethod
def golden_angle_color(
@@ -274,19 +283,20 @@ class Colors:
positions = np.mod(np.arange(num) * golden_angle_conjugate, 1)
positions = min_pos + positions * (max_pos - min_pos)
return Colors._format_mapped_colors(cmap.map(positions, mode="float"), format)
# Sample colors from the colormap at the calculated positions
colors = cmap.map(positions, mode="float")
color_list = []
@staticmethod
def _format_mapped_colors(colors: np.ndarray, format: Literal["QColor", "HEX", "RGB"]) -> list:
color_format = format.upper()
if color_format not in {"QCOLOR", "HEX", "RGB"}:
raise ValueError("Unsupported format. Please choose 'RGB', 'HEX', or 'QColor'.")
if color_format == "QCOLOR":
return [QColor.fromRgbF(*color) for color in colors]
if color_format == "HEX":
return [QColor.fromRgbF(*color).name() for color in colors]
return [tuple((np.array(color) * 255).astype(int)) for color in colors]
for color in colors:
if format.upper() == "HEX":
color_list.append(QColor.fromRgbF(*color).name())
elif format.upper() == "RGB":
color_list.append(tuple((np.array(color) * 255).astype(int)))
elif format.upper() == "QCOLOR":
color_list.append(QColor.fromRgbF(*color))
else:
raise ValueError("Unsupported format. Please choose 'RGB', 'HEX', or 'QColor'.")
return color_list
@staticmethod
def hex_to_rgba(hex_color: str, alpha=255) -> tuple:
@@ -310,6 +320,22 @@ class Colors:
raise ValueError("HEX color must be 6 or 8 characters long.")
return (r, g, b, alpha)
@staticmethod
def rgba_to_hex(r: int, g: int, b: int, a: int = 255) -> str:
"""
Convert RGBA color to HEX.
Args:
r(int): Red value (0-255).
g(int): Green value (0-255).
b(int): Blue value (0-255).
a(int): Alpha value (0-255). Default is 255 (opaque).
Returns:
hec_color(str): HEX color string.
"""
return "#{:02X}{:02X}{:02X}{:02X}".format(r, g, b, a)
@staticmethod
def validate_color(color: tuple | str) -> tuple | str:
"""
+1 -1
View File
@@ -43,7 +43,7 @@ class WidgetContainerUtils:
if list_of_names is None:
list_of_names = []
ii = 0
while ii < 1000: # 1000 is arbitrary!
while ii < 1000: # 1000 is arbritrary!
name_candidate = f"{name}_{ii}"
if name_candidate not in list_of_names:
return name_candidate
+318 -91
View File
@@ -1,12 +1,12 @@
"""Small helpers for populating editable combo boxes used by device inputs."""
"""Module for handling filter I/O operations in BEC Widgets for input fields.
These operations include filtering device/signal names and/or device types.
"""
from __future__ import annotations
from contextlib import nullcontext
from abc import ABC, abstractmethod
from bec_lib.logger import bec_logger
from qtpy.QtCore import QSignalBlocker
from qtpy.QtWidgets import QComboBox
from qtpy.QtCore import QStringListModel
from qtpy.QtWidgets import QComboBox, QCompleter, QLineEdit
from typeguard import TypeCheckError
from bec_widgets.utils.ophyd_kind_util import Kind
@@ -14,102 +14,329 @@ from bec_widgets.utils.ophyd_kind_util import Kind
logger = bec_logger.logger
def replace_combobox_items(
combo_box: QComboBox,
items: list[str | tuple],
*,
preserve_current_text: bool = False,
block_signals: bool = False,
) -> None:
"""Replace all combobox entries.
class WidgetFilterHandler(ABC):
"""Abstract base class for widget filter handlers"""
Args:
combo_box: Combobox whose entries should be replaced.
items: Entries to add. String entries are added as display text. Tuple entries are
passed to ``QComboBox.addItem`` as ``(text, data)``.
preserve_current_text: If True, restore the combobox text after replacing the items.
block_signals: If True, block combobox signals while the items are replaced.
"""
current_text = combo_box.currentText()
signal_blocker = QSignalBlocker(combo_box) if block_signals else nullcontext()
with signal_blocker:
combo_box.clear()
for item in items:
if isinstance(item, str):
combo_box.addItem(item)
@abstractmethod
def set_selection(self, widget, selection: list[str | tuple]) -> None:
"""Set the filtered_selection for the widget
Args:
widget: Widget instance
selection (list[str | tuple]): Filtered selection of items.
If tuple, it contains (text, data) pairs.
"""
@abstractmethod
def check_input(self, widget, text: str) -> bool:
"""Check if the input text is in the filtered selection
Args:
widget: Widget instance
text (str): Input text
Returns:
bool: True if the input text is in the filtered selection
"""
@abstractmethod
def update_with_kind(
self, kind: Kind, signal_filter: set, device_info: dict, device_name: str
) -> list[str | tuple]:
"""Update the selection based on the kind of signal.
Args:
kind (Kind): The kind of signal to filter.
signal_filter (set): Set of signal kinds to filter.
device_info (dict): Dictionary containing device information.
device_name (str): Name of the device.
Returns:
list[str | tuple]: A list of filtered signals based on the kind.
"""
# This method should be implemented in subclasses or extended as needed
def update_with_bec_signal_class(
self,
signal_class_filter: str | list[str],
client,
ndim_filter: int | list[int] | None = None,
) -> list[tuple[str, str, dict]]:
"""Update the selection based on signal classes using device_manager.get_bec_signals.
Args:
signal_class_filter (str|list[str]): List of signal class names to filter.
client: BEC client instance.
ndim_filter (int | list[int] | None): Filter signals by dimensionality.
If provided, only signals with matching ndim will be included.
Returns:
list[tuple[str, str, dict]]: A list of (device_name, signal_name, signal_config) tuples.
"""
if not client or not hasattr(client, "device_manager"):
return []
try:
signals = client.device_manager.get_bec_signals(signal_class_filter)
except TypeCheckError as e:
logger.warning(f"Error retrieving signals: {e}")
return []
if ndim_filter is None:
return signals
if isinstance(ndim_filter, int):
ndim_filter = [ndim_filter]
filtered_signals = []
for device_name, signal_name, signal_config in signals:
ndim = None
if isinstance(signal_config, dict):
ndim = signal_config.get("describe", {}).get("signal_info", {}).get("ndim")
if ndim in ndim_filter:
filtered_signals.append((device_name, signal_name, signal_config))
return filtered_signals
class LineEditFilterHandler(WidgetFilterHandler):
"""Handler for QLineEdit widget"""
def set_selection(self, widget: QLineEdit, selection: list[str | tuple]) -> None:
"""Set the selection for the widget to the completer model
Args:
widget (QLineEdit): The QLineEdit widget
selection (list[str | tuple]): Filtered selection of items. If tuple, it contains (text, data) pairs.
"""
if isinstance(selection, tuple):
# If selection is a tuple, it contains (text, data) pairs
selection = [text for text, _ in selection]
if not isinstance(widget.completer, QCompleter):
completer = QCompleter(widget)
widget.setCompleter(completer)
widget.completer.setModel(QStringListModel(selection, widget))
def check_input(self, widget: QLineEdit, text: str) -> bool:
"""Check if the input text is in the filtered selection
Args:
widget (QLineEdit): The QLineEdit widget
text (str): Input text
Returns:
bool: True if the input text is in the filtered selection
"""
model = widget.completer.model()
model_data = [model.data(model.index(i)) for i in range(model.rowCount())]
return text in model_data
def update_with_kind(
self, kind: Kind, signal_filter: set, device_info: dict, device_name: str
) -> list[str | tuple]:
"""Update the selection based on the kind of signal.
Args:
kind (Kind): The kind of signal to filter.
signal_filter (set): Set of signal kinds to filter.
device_info (dict): Dictionary containing device information.
device_name (str): Name of the device.
Returns:
list[str | tuple]: A list of filtered signals based on the kind.
"""
return [
signal
for signal, signal_info in device_info.items()
if kind in signal_filter and (signal_info.get("kind_str", None) == str(kind.name))
]
class ComboBoxFilterHandler(WidgetFilterHandler):
"""Handler for QComboBox widget"""
def set_selection(self, widget: QComboBox, selection: list[str | tuple]) -> None:
"""Set the selection for the widget to the completer model
Args:
widget (QComboBox): The QComboBox widget
selection (list[str | tuple]): Filtered selection of items. If tuple, it contains (text, data) pairs.
"""
widget.clear()
if len(selection) == 0:
return
for element in selection:
if isinstance(element, str):
widget.addItem(element)
elif isinstance(element, tuple):
# If element is a tuple, it contains (text, data) pairs
widget.addItem(*element)
def check_input(self, widget: QComboBox, text: str) -> bool:
"""Check if the input text is in the filtered selection
Args:
widget (QComboBox): The QComboBox widget
text (str): Input text
Returns:
bool: True if the input text is in the filtered selection
"""
return text in [widget.itemText(i) for i in range(widget.count())]
def update_with_kind(
self, kind: Kind, signal_filter: set, device_info: dict, device_name: str
) -> list[str | tuple]:
"""Update the selection based on the kind of signal.
Args:
kind (Kind): The kind of signal to filter.
signal_filter (set): Set of signal kinds to filter.
device_info (dict): Dictionary containing device information.
device_name (str): Name of the device.
Returns:
list[str | tuple]: A list of filtered signals based on the kind.
"""
out = []
for signal, signal_info in device_info.items():
if kind not in signal_filter or (signal_info.get("kind_str", None) != str(kind.name)):
continue
obj_name = signal_info.get("obj_name", "")
component_name = signal_info.get("component_name", "")
signal_wo_device = obj_name.removeprefix(f"{device_name}_")
if not signal_wo_device:
signal_wo_device = obj_name
if signal_wo_device != signal and component_name.replace(".", "_") != signal_wo_device:
# If the object name is not the same as the signal name, we use the object name
# to display in the combobox.
out.append((f"{signal_wo_device} ({signal})", signal_info))
else:
combo_box.addItem(*item)
if preserve_current_text:
combo_box.setCurrentText(current_text)
# If the object name is the same as the signal name, we do not change it.
out.append((signal, signal_info))
return out
def signal_items_for_kind(
*, kind: Kind, signal_filter: set[Kind], device_info: dict, device_name: str
) -> list[tuple[str, dict]]:
"""Build display entries for signals matching a BEC signal kind.
Args:
kind: Signal kind to collect.
signal_filter: Enabled signal kinds.
device_info: Signal metadata from the BEC device info dictionary.
device_name: Name of the device owning the signals.
Returns:
Combobox entries as ``(display_text, signal_info)`` tuples.
class FilterIO:
"""Public interface to set filters for input widgets.
It supports the list of widgets stored in class attribute _handlers.
"""
items: list[tuple[str, dict]] = []
for signal_name, signal_info in device_info.items():
if kind not in signal_filter or signal_info.get("kind_str") != kind.name:
continue
obj_name = signal_info.get("obj_name", "")
component_name = signal_info.get("component_name", "")
signal_without_device = obj_name.removeprefix(f"{device_name}_")
if not signal_without_device:
signal_without_device = obj_name
_handlers = {QLineEdit: LineEditFilterHandler, QComboBox: ComboBoxFilterHandler}
if (
signal_without_device != signal_name
and component_name.replace(".", "_") != signal_without_device
):
items.append((f"{signal_without_device} ({signal_name})", signal_info))
else:
items.append((signal_name, signal_info))
return items
@staticmethod
def set_selection(widget, selection: list[str | tuple], ignore_errors=True):
"""
Retrieve value from the widget instance.
Args:
widget: Widget instance.
selection (list[str | tuple]): Filtered selection of items.
If tuple, it contains (text, data) pairs.
ignore_errors(bool, optional): Whether to ignore if no handler is found.
"""
handler_class = FilterIO._find_handler(widget)
if handler_class:
return handler_class().set_selection(widget=widget, selection=selection)
if not ignore_errors:
raise ValueError(
f"No matching handler for widget type: {type(widget)} in handler list {FilterIO._handlers}"
)
return None
def get_bec_signals_for_classes(
*, client, signal_class_filter: str | list[str], ndim_filter: int | list[int] | None = None
) -> list[tuple[str, str, dict]]:
"""Return BEC signals filtered by signal class and optional dimensionality.
@staticmethod
def check_input(widget, text: str, ignore_errors=True):
"""
Check if the input text is in the filtered selection.
Args:
client: BEC client that provides ``device_manager.get_bec_signals``.
signal_class_filter: Signal class name or class names passed to the device manager.
ndim_filter: Optional dimensionality filter. If provided, only signals whose
``describe.signal_info.ndim`` is in this value are returned.
Args:
widget: Widget instance.
text(str): Input text.
ignore_errors(bool, optional): Whether to ignore if no handler is found.
Returns:
Tuples of ``(device_name, signal_name, signal_config)`` for matching signals.
"""
if not client or not hasattr(client, "device_manager"):
return []
Returns:
bool: True if the input text is in the filtered selection.
"""
handler_class = FilterIO._find_handler(widget)
if handler_class:
return handler_class().check_input(widget=widget, text=text)
if not ignore_errors:
raise ValueError(
f"No matching handler for widget type: {type(widget)} in handler list {FilterIO._handlers}"
)
return None
try:
signals = client.device_manager.get_bec_signals(signal_class_filter)
except TypeCheckError as exc:
logger.warning(f"Error retrieving signals: {exc}")
return []
@staticmethod
def update_with_kind(
widget, kind: Kind, signal_filter: set, device_info: dict, device_name: str
) -> list[str | tuple]:
"""
Update the selection based on the kind of signal.
if ndim_filter is None:
return signals
Args:
widget: Widget instance.
kind (Kind): The kind of signal to filter.
signal_filter (set): Set of signal kinds to filter.
device_info (dict): Dictionary containing device information.
device_name (str): Name of the device.
accepted_ndim = [ndim_filter] if isinstance(ndim_filter, int) else ndim_filter
filtered_signals: list[tuple[str, str, dict]] = []
for device_name, signal_name, signal_config in signals:
ndim = None
if isinstance(signal_config, dict):
ndim = signal_config.get("describe", {}).get("signal_info", {}).get("ndim")
if ndim in accepted_ndim:
filtered_signals.append((device_name, signal_name, signal_config))
return filtered_signals
Returns:
list[str | tuple]: A list of filtered signals based on the kind.
"""
handler_class = FilterIO._find_handler(widget)
if handler_class:
return handler_class().update_with_kind(
kind=kind,
signal_filter=signal_filter,
device_info=device_info,
device_name=device_name,
)
raise ValueError(
f"No matching handler for widget type: {type(widget)} in handler list {FilterIO._handlers}"
)
@staticmethod
def update_with_signal_class(
widget, signal_class_filter: list[str], client, ndim_filter: int | list[int] | None = None
) -> list[tuple[str, str, dict]]:
"""
Update the selection based on signal classes using device_manager.get_bec_signals.
Args:
widget: Widget instance.
signal_class_filter (list[str]): List of signal class names to filter.
client: BEC client instance.
ndim_filter (int | list[int] | None): Filter signals by dimensionality.
If provided, only signals with matching ndim will be included.
Returns:
list[tuple[str, str, dict]]: A list of (device_name, signal_name, signal_config) tuples.
"""
handler_class = FilterIO._find_handler(widget)
if handler_class:
return handler_class().update_with_bec_signal_class(
signal_class_filter=signal_class_filter, client=client, ndim_filter=ndim_filter
)
raise ValueError(
f"No matching handler for widget type: {type(widget)} in handler list {FilterIO._handlers}"
)
@staticmethod
def _find_handler(widget):
"""
Find the appropriate handler for the widget by checking its base classes.
Args:
widget: Widget instance.
Returns:
handler_class: The handler class if found, otherwise None.
"""
for base in type(widget).__mro__:
if base in FilterIO._handlers:
return FilterIO._handlers[base]
return None
+2 -4
View File
@@ -106,9 +106,7 @@ class TypedForm(BECWidget, QWidget):
def _add_griditem(self, item: FormItemSpec, row: int):
grid = self._form_grid.layout()
# Use title from FieldInfo if available, otherwise use the property name
label_text = item.info.title if item.info.title else item.name
label = QLabel(parent=self._form_grid, text=label_text)
label = QLabel(parent=self._form_grid, text=item.name)
label.setProperty("_model_field_name", item.name)
label.setToolTip(item.info.description or item.name)
grid.addWidget(label, row, 0)
@@ -150,7 +148,7 @@ class TypedForm(BECWidget, QWidget):
self.adjustSize()
def _new_grid_layout(self):
new_grid = QGridLayout()
new_grid = QGridLayout(self)
new_grid.setContentsMargins(0, 0, 0, 0)
return new_grid
+2 -4
View File
@@ -71,7 +71,7 @@ class FormItemSpec(BaseModel):
"""
The specification for an item in a dynamically generated form. Uses a pydantic FieldInfo
to store most annotation info, since one of the main purposes is to store data for
forms generated from pydantic models, but can also be composed from other sources or by hand.
forms genrated from pydantic models, but can also be composed from other sources or by hand.
"""
model_config = ConfigDict(arbitrary_types_allowed=True)
@@ -192,7 +192,7 @@ class DynamicFormItem(QWidget):
@abstractmethod
def _add_main_widget(self) -> None:
self._main_widget: QWidget
"""Add the main data entry widget to self._main_widget and apply any
"""Add the main data entry widget to self._main_widget and appply any
constraints from the field info"""
@SafeSlot()
@@ -231,8 +231,6 @@ class StrFormItem(DynamicFormItem):
def __init__(self, parent: QWidget | None = None, *, spec: FormItemSpec) -> None:
super().__init__(parent=parent, spec=spec)
self._main_widget.textChanged.connect(self._value_changed)
if spec.info.description:
self._main_widget.setPlaceholderText(spec.info.description)
def _add_main_widget(self) -> None:
self._main_widget = QLineEdit()
@@ -1,53 +0,0 @@
from __future__ import annotations
from collections.abc import Mapping
from typing import Any
from bec_lib.scan_args import ScanArgument
from pydantic import BaseModel
from pydantic_core import PydanticUndefined
from bec_widgets.utils.scan_arg_metadata import ui_config_from_metadata
NUMERIC_BOUND_KEYS = {"gt", "ge", "lt", "le"}
def pydantic_model_input_configs(model: type[BaseModel]) -> list[dict[str, Any]]:
"""Return scan-control-style field items for a Pydantic model."""
configs = []
for name, info in model.model_fields.items():
metadata: dict[str, Any] = {}
for entry in info.metadata:
if isinstance(entry, ScanArgument):
metadata.update(entry.model_dump(exclude_none=True))
continue
for key in NUMERIC_BOUND_KEYS:
value = getattr(entry, key, None)
if value is not None:
metadata.setdefault(key, value)
if isinstance(info.json_schema_extra, Mapping):
metadata.update(dict(info.json_schema_extra))
if info.description and metadata.get("description") is None:
metadata["description"] = info.description
default: Any
if info.default is not PydanticUndefined:
default = info.default
elif info.default_factory is not None:
default = info.get_default(call_default_factory=True)
else:
default = None
display_name = metadata.get("display_name") or info.title
if display_name is None:
display_name = name.replace("_", " ").capitalize()
item = ui_config_from_metadata(
name=name, metadata=metadata, default=default, display_name=display_name
)
item.update({key: value for key, value in metadata.items() if key not in item})
configs.append(item)
return configs
@@ -1,815 +0,0 @@
from __future__ import annotations
from types import NoneType
from typing import Any, Literal, get_args, get_origin
from bec_lib.device import DeviceBase, Signal
from pydantic import BaseModel, ValidationError
from pydantic.fields import FieldInfo
from qtpy.QtCore import Qt
from qtpy.QtCore import Signal as QtSignal
from qtpy.QtWidgets import (
QCheckBox,
QComboBox,
QDoubleSpinBox,
QFormLayout,
QHBoxLayout,
QLineEdit,
QSpinBox,
QWidget,
)
from bec_widgets.utils.forms_from_types.pydantic_model_info_adapter import (
NUMERIC_BOUND_KEYS,
pydantic_model_input_configs,
)
from bec_widgets.utils.scan_arg_metadata import (
apply_numeric_limits,
apply_numeric_precision,
apply_unit_metadata,
device_units,
)
from bec_widgets.utils.widget_io import WidgetIO
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
from bec_widgets.widgets.control.device_input.signal_combobox.signal_combobox import SignalComboBox
from bec_widgets.widgets.utility.spinbox.decimal_spinbox import BECSpinBox
class OptionalValueWidget(QWidget):
"""Wrap a value widget with an enable checkbox for optional Pydantic fields.
Attributes:
value_changed: Signal emitted with the current value whenever the checkbox
state or wrapped widget value changes.
"""
value_changed = QtSignal(object)
def __init__(self, value_widget: QWidget, parent: QWidget | None = None) -> None:
"""Create an optional-value wrapper.
Args:
value_widget: Input widget used when the optional value is enabled.
parent: Optional parent widget.
"""
super().__init__(parent=parent)
self._value_widget = value_widget
self._checkbox = QCheckBox(self)
self._checkbox.setToolTip("Enable value")
self._value_widget.setParent(self)
layout = QHBoxLayout(self)
layout.setContentsMargins(0, 0, 0, 0)
layout.setSpacing(8)
layout.addWidget(self._checkbox)
layout.addWidget(self._value_widget, 1)
self._checkbox.toggled.connect(self._on_enabled_changed)
WidgetIO.connect_widget_change_signal(self._value_widget, self._emit_current_value)
self._on_enabled_changed(False)
@property
def value_widget(self) -> QWidget:
"""Return the wrapped input widget.
Returns:
The widget that edits the non-``None`` value.
"""
return self._value_widget
@property
def checkbox(self) -> QCheckBox:
"""Return the checkbox controlling whether the value is enabled.
Returns:
The enable checkbox.
"""
return self._checkbox
def value(self) -> Any:
"""Return the current optional value.
Returns:
``None`` when the checkbox is unchecked; otherwise the wrapped widget value.
"""
if not self._checkbox.isChecked():
return None
return WidgetIO.get_value(self._value_widget)
def set_value(self, value: Any) -> None:
"""Set the optional value.
Args:
value: Value to set on the wrapped widget. ``None`` disables the value.
"""
enabled = value is not None
self._checkbox.setChecked(enabled)
self._value_widget.setEnabled(enabled)
if enabled:
WidgetIO.set_value(self._value_widget, value)
def _on_enabled_changed(self, enabled: bool) -> None:
self._value_widget.setEnabled(enabled)
self.value_changed.emit(self.value())
def _emit_current_value(self, *_args) -> None:
self.value_changed.emit(self.value())
class PydanticWidgetForm(QWidget):
"""Generate a Qt form from a Pydantic model.
The form maps Pydantic field annotations to Qt widgets, applies supported
field metadata, and exposes typed and raw data accessors for the generated
fields.
Attributes:
changed: Signal emitted whenever a generated input widget changes.
validity_changed: Signal emitted by :meth:`validate` with the current
validation result.
"""
changed = QtSignal()
validity_changed = QtSignal(bool)
def __init__(
self,
model: type[BaseModel],
parent: QWidget | None = None,
*,
data: BaseModel | dict[str, Any] | None = None,
read_only_fields: set[str] | None = None,
client=None,
) -> None:
"""Create a generated form for a Pydantic model.
Args:
model: Pydantic model class used to generate fields and validate data.
parent: Optional parent widget.
data: Optional initial model instance or raw field-value mapping.
read_only_fields: Field names that should be displayed but not editable.
client: Optional BEC client passed to domain-specific widgets such as
device and signal combo boxes.
"""
super().__init__(parent=parent)
self._model = model
self._client = client
self._read_only_fields = set(read_only_fields or set())
self._widgets: dict[str, QWidget] = {}
self._field_configs: dict[str, dict[str, Any]] = {}
self._baseline: dict[str, Any] = {}
self._layout = QFormLayout()
self._layout.setContentsMargins(0, 0, 0, 0)
self._layout.setHorizontalSpacing(10)
self._layout.setVerticalSpacing(8)
self._layout.setFieldGrowthPolicy(QFormLayout.FieldGrowthPolicy.AllNonFixedFieldsGrow)
self._layout.setLabelAlignment(Qt.AlignmentFlag.AlignRight)
self.setLayout(self._layout)
self._populate()
if data is not None:
self.set_data(data)
self.mark_clean()
@property
def model(self) -> type[BaseModel]:
"""Return the active Pydantic model class.
Returns:
The model class currently used by this form.
"""
return self._model
@property
def widgets(self) -> dict[str, QWidget]:
"""Return generated field widgets keyed by model field name.
Returns:
A shallow copy of the field-widget mapping. Optional fields return
their outer :class:`OptionalValueWidget`.
"""
return dict(self._widgets)
def field_widget(self, name: str) -> QWidget:
"""Return the generated widget for a field.
Args:
name: Model field name.
Returns:
The generated field widget. Optional fields return their outer
:class:`OptionalValueWidget`.
Raises:
KeyError: If no widget exists for ``name``.
"""
return self._widgets[name]
def input_widget(self, name: str) -> QWidget:
"""Return the direct input widget for a field.
Args:
name: Model field name.
Returns:
The editable input widget. Optional fields return the wrapped value
widget instead of the outer optional wrapper.
Raises:
KeyError: If no widget exists for ``name``.
"""
widget = self._widgets[name]
if isinstance(widget, OptionalValueWidget):
return widget.value_widget
return widget
def input_widgets(self) -> dict[str, QWidget]:
"""Return direct input widgets keyed by model field name.
Returns:
Mapping of field names to editable input widgets.
"""
return {name: self.input_widget(name) for name in self._widgets}
def input_widgets_by_type(self, widget_type: type[QWidget]) -> list[QWidget]:
"""Return direct input widgets matching a widget type.
Args:
widget_type: Qt widget class to match with ``isinstance``.
Returns:
List of input widgets matching ``widget_type``.
"""
return [
widget for widget in self.input_widgets().values() if isinstance(widget, widget_type)
]
def set_model(self, model: type[BaseModel], data: dict[str, Any] | None = None) -> None:
"""Replace the active model and rebuild the form.
Args:
model: New Pydantic model class.
data: Optional initial data for the new model. When omitted, values
from fields shared with the previous model are preserved.
"""
old_data = self.raw_data()
self.cleanup()
self._model = model
self._populate()
if data is None:
data = {key: value for key, value in old_data.items() if key in model.model_fields}
self.set_partial_data(data)
self.mark_clean()
def set_data(self, data: BaseModel | dict[str, Any]) -> None:
"""Set form values from a model instance or mapping.
Args:
data: Pydantic model instance or raw field-value mapping.
"""
values = data.model_dump() if isinstance(data, BaseModel) else dict(data)
self.set_partial_data(values)
def set_partial_data(self, data: dict[str, Any]) -> None:
"""Set values for fields present in the form.
Unknown keys are ignored, which allows callers to pass larger model
dumps or backend payloads safely.
Args:
data: Field-value mapping to apply.
"""
for name, value in data.items():
if name not in self._widgets:
continue
self._set_widget_value(name, value)
self._refresh_reference_units()
self.changed.emit()
def raw_data(self) -> dict[str, Any]:
"""Return current widget values without Pydantic validation.
Returns:
Mapping of model field names to raw widget values.
"""
return {name: self._read_widget_value(name) for name in self._widgets}
def get_data(self) -> dict[str, Any]:
"""Return current data after Pydantic validation.
Returns:
Validated model data as a dictionary.
Raises:
ValidationError: If Pydantic validation fails.
ValueError: If domain widget validation fails.
"""
return self.model_instance().model_dump()
def model_instance(self) -> BaseModel:
"""Return the current values as a Pydantic model instance.
Returns:
Validated instance of the active model class.
Raises:
ValidationError: If Pydantic validation fails.
ValueError: If domain widget validation fails.
"""
self._validate_domain_widgets()
return self._model.model_validate(self.raw_data())
def validate(self) -> bool:
"""Validate the current form values.
Returns:
``True`` when current values validate successfully, otherwise ``False``.
"""
try:
self.get_data()
except (ValidationError, ValueError):
self.validity_changed.emit(False)
return False
self.validity_changed.emit(True)
return True
def dirty_fields(self) -> set[str]:
"""Return fields whose raw values differ from the clean baseline.
Returns:
Set of dirty field names.
"""
current = self.raw_data()
fields = set(current) | set(self._baseline)
return {field for field in fields if current.get(field) != self._baseline.get(field)}
def mark_clean(self) -> None:
"""Store the current raw values as the clean baseline."""
self._baseline = self.raw_data()
def reset_to_baseline(self) -> None:
"""Restore the form values to the current clean baseline."""
self.set_partial_data(self._baseline)
def editable_data(self) -> dict[str, Any]:
"""Return validated data excluding read-only fields.
Returns:
Validated editable field values.
Raises:
ValidationError: If Pydantic validation fails.
ValueError: If domain widget validation fails.
"""
return {
key: value
for key, value in self.get_data().items()
if key not in self._read_only_fields
}
def raw_editable_data(self) -> dict[str, Any]:
"""Return raw widget data excluding read-only fields.
Returns:
Raw editable field values.
"""
return {
key: value
for key, value in self.raw_data().items()
if key not in self._read_only_fields
}
def cleanup(self) -> None:
"""Close and schedule deletion of all generated field widgets."""
while self._layout.rowCount():
row = self._layout.takeRow(0)
for item in (row.labelItem, row.fieldItem):
widget = item.widget() if item is not None else None
if widget is not None:
widget.close()
# Detach before deleteLater: a child pending deletion that still has a
# signal connection into this form crashes if the form is garbage
# collected before the deferred delete is processed.
widget.setParent(None)
widget.deleteLater()
self._widgets.clear()
self._field_configs.clear()
def closeEvent(self, event) -> None: # noqa: N802
self.cleanup()
super().closeEvent(event)
def _populate(self) -> None:
for config in pydantic_model_input_configs(self._model):
name = config["name"]
info = self._model.model_fields[name]
widget = self._create_widget(name, info)
label_text = config["display_name"]
self._layout.addRow(label_text, widget)
label = self._layout.labelForField(widget)
if label is not None:
label.setProperty("_model_field_name", name)
if config.get("tooltip") and label is not None:
label.setToolTip(config["tooltip"])
widget.setEnabled(name not in self._read_only_fields)
self._widgets[name] = widget
self._field_configs[name] = config
self._set_widget_value(name, config["default"])
self._apply_field_metadata(name)
self._connect_widget(widget)
self._connect_device_signal_widgets()
self._connect_reference_unit_widgets()
self._refresh_reference_units()
def _create_widget(self, name: str, info: FieldInfo) -> QWidget:
annotation = info.annotation
args = get_args(annotation)
optional = NoneType in args
non_none_args = tuple(arg for arg in args if arg is not NoneType)
value_annotation = non_none_args[0] if len(non_none_args) == 1 else annotation
widget = self._create_value_widget(name, value_annotation)
numeric = value_annotation in (int, float) or any(
arg in (int, float) for arg in get_args(value_annotation)
)
if optional and (numeric or value_annotation is bool):
return OptionalValueWidget(widget, parent=self)
return widget
def _create_value_widget(self, name: str, annotation: Any) -> QWidget:
args = get_args(annotation)
if (
isinstance(annotation, type)
and issubclass(annotation, Signal)
or any(isinstance(arg, type) and issubclass(arg, Signal) for arg in args)
):
return SignalComboBox(
parent=self,
client=self._client,
require_device=self._model_has_device_field(),
arg_name=name,
)
if (
isinstance(annotation, type)
and issubclass(annotation, DeviceBase)
or any(isinstance(arg, type) and issubclass(arg, DeviceBase) for arg in args)
):
return DeviceComboBox(parent=self, client=self._client, arg_name=name)
if get_origin(annotation) is Literal:
widget = QComboBox(self)
widget.addItems([str(value) for value in get_args(annotation)])
return widget
if annotation is bool:
return QCheckBox(self)
if annotation is int:
spin_box = QSpinBox(self)
spin_box.setRange(-2147483647, 2147483647)
return spin_box
if annotation is float:
spin_box = BECSpinBox(self)
spin_box.setRange(-1_000_000_000, 1_000_000_000)
return spin_box
return QLineEdit(self)
def _apply_field_metadata(self, name: str) -> None:
config = self._field_configs[name]
field_widget = self._widgets[name]
input_widget = self.input_widget(name)
if config.get("precision") is not None:
apply_numeric_precision(input_widget, config)
if any(config.get(key) is not None for key in NUMERIC_BOUND_KEYS):
apply_numeric_limits(input_widget, config)
apply_unit_metadata(field_widget, config)
if input_widget is not field_widget:
apply_unit_metadata(input_widget, config)
def _connect_widget(self, widget: QWidget) -> None:
if isinstance(widget, OptionalValueWidget):
widget.value_changed.connect(lambda _value: self.changed.emit())
return
WidgetIO.connect_widget_change_signal(widget, lambda *_args: self.changed.emit())
def _connect_device_signal_widgets(self) -> None:
devices = [
widget for widget in self._widgets.values() if isinstance(widget, DeviceComboBox)
]
signals = [
widget for widget in self._widgets.values() if isinstance(widget, SignalComboBox)
]
if not devices or not signals:
return
device_widget = devices[0]
for signal_widget in signals:
device_widget.device_selected.connect(signal_widget.set_device)
device_widget.device_reset.connect(lambda w=signal_widget: w.set_device(None))
if device_widget.currentText().strip():
signal_widget.set_device(device_widget.currentText().strip())
def _connect_reference_unit_widgets(self) -> None:
for name, widget in self.input_widgets().items():
if not isinstance(widget, DeviceComboBox):
continue
widget.device_selected.connect(
lambda _device_name, field_name=name: self._update_reference_units(field_name)
)
widget.device_reset.connect(
lambda field_name=name: self._apply_reference_units(field_name, None)
)
widget.currentTextChanged.connect(
lambda text, field_name=name: self._handle_reference_device_text(field_name, text)
)
def _refresh_reference_units(self) -> None:
for name, widget in self.input_widgets().items():
if isinstance(widget, DeviceComboBox):
self._update_reference_units(name)
def _update_reference_units(self, source_name: str) -> None:
widget = self.input_widget(source_name)
if not isinstance(widget, DeviceComboBox) or not widget.is_valid_input:
self._apply_reference_units(source_name, None)
return
self._apply_reference_units(source_name, device_units(widget.get_current_device()))
def _apply_reference_units(self, source_name: str, units: str | None) -> None:
for field_name, config in self._field_configs.items():
if config.get("reference_units") != source_name:
continue
field_widget = self.field_widget(field_name)
input_widget = self.input_widget(field_name)
apply_unit_metadata(field_widget, config, units)
if input_widget is not field_widget:
apply_unit_metadata(input_widget, config, units)
def _handle_reference_device_text(self, source_name: str, device_name: str) -> None:
widget = self.input_widget(source_name)
if isinstance(widget, DeviceComboBox) and not widget.validate_device(device_name):
self._apply_reference_units(source_name, None)
def _validate_domain_widgets(self) -> None:
for widget in self._widgets.values():
if isinstance(widget, DeviceComboBox):
device = widget.currentText().strip()
if not device:
raise ValueError("Device is required.")
if not widget.is_valid_input:
raise ValueError(f"Device '{device}' is not available.")
if isinstance(widget, SignalComboBox):
signal = widget.get_signal_name().strip()
if signal and not widget.is_valid_input:
raise ValueError(f"Signal '{signal}' is not available.")
def _read_widget_value(self, name: str) -> Any:
widget = self._widgets[name]
info = self._model.model_fields[name]
if isinstance(widget, OptionalValueWidget):
return widget.value()
if isinstance(widget, QLineEdit):
value = WidgetIO.get_value(widget)
return None if NoneType in get_args(info.annotation) and value == "" else value
if isinstance(widget, QComboBox) and get_origin(info.annotation) is Literal:
return WidgetIO.get_value(widget, as_string=True)
return WidgetIO.get_value(widget)
def _set_widget_value(self, name: str, value: Any) -> None:
widget = self._widgets[name]
if isinstance(widget, OptionalValueWidget):
widget.set_value(value)
return
if value is None:
if isinstance(widget, QLineEdit):
value = ""
elif isinstance(widget, QCheckBox):
value = False
elif isinstance(widget, (QSpinBox, QDoubleSpinBox)):
value = 0
WidgetIO.set_value(widget, value)
def _model_has_device_field(self) -> bool:
for field in self._model.model_fields.values():
annotation = field.annotation
args = get_args(annotation)
has_device = (
isinstance(annotation, type)
and issubclass(annotation, DeviceBase)
or any(isinstance(arg, type) and issubclass(arg, DeviceBase) for arg in args)
)
has_signal = (
isinstance(annotation, type)
and issubclass(annotation, Signal)
or any(isinstance(arg, type) and issubclass(arg, Signal) for arg in args)
)
if has_device and not has_signal:
return True
return False
if __name__ == "__main__": # pragma: no cover
import json
import sys
from bec_lib.scan_args import ScanArgument
from pydantic import Field
from qtpy.QtWidgets import QApplication, QLabel, QPushButton, QTabWidget, QTextEdit, QVBoxLayout
from bec_widgets.utils.colors import apply_theme
class BasicScanConfig(BaseModel):
"""Plain Pydantic fields without GUI metadata."""
sample_name: str
enabled: bool = True
repeats: int = 3
class LimitConfig(BaseModel):
"""Normal Pydantic Field metadata."""
mode: Literal["monitor", "scan", "calibration"] = "scan"
low_limit: (
float | None
) # example of the field without additional metadata, still works in form
high_limit: float | None = Field(
default=10.0,
title="High limit",
description="Optional upper allowed value.",
json_schema_extra={"precision": 4},
)
tolerance: float = Field(
default=0.1,
title="Tolerance",
description="Warning tolerance around configured limits.",
json_schema_extra={"precision": 4},
)
class ScanArgumentConfig(BaseModel):
"""ScanArgument metadata applied through Field extras."""
settling_time: float = Field(
default=0.0,
**ScanArgument(
display_name="Settling time",
description="Time to wait after moving.",
units="s",
precision=3,
ge=0,
).model_dump(),
)
frames: int = Field(
default=1,
**ScanArgument(
display_name="Frames", description="Number of frames per trigger.", ge=1
).model_dump(),
)
class DeviceSignalLimitsConfig(BaseModel):
"""Device, signal, and numeric fields whose units follow the selected device."""
model_config = {"arbitrary_types_allowed": True}
device: DeviceBase | str = Field(
default="",
**ScanArgument(display_name="Device", description="Positioner device.").model_dump(),
)
signal: Signal | str | None = Field(
default=None,
**ScanArgument(display_name="Signal", description="Device signal.").model_dump(),
)
low_limit: float | None = Field(
default=None,
**ScanArgument(
display_name="Low limit",
description="Optional lower limit.",
reference_units="device",
precision=4,
).model_dump(),
)
high_limit: float | None = Field(
default=None,
**ScanArgument(
display_name="High limit",
description="Optional upper limit.",
reference_units="device",
precision=4,
).model_dump(),
)
class DisplayConfig(BaseModel):
title: str | None = Field(
default=None, title="Title", description="Optional display title."
)
show_grid: bool = Field(default=True, title="Show grid")
refresh_interval: int = Field(
default=1000, title="Refresh interval", description="Refresh interval in milliseconds."
)
class DeviceAndSignalConfig(BaseModel):
model_config = {"arbitrary_types_allowed": True}
title: str | None = Field(
default=None, title="Title", description="Optional display title."
)
device: DeviceBase | str = Field(
default="", title="Device", description="BEC device selection."
)
signal: Signal | str | None = Field(
default=None,
title="Signal",
description="Signal selection scoped to the selected device.",
)
refresh_interval: int = Field(
default=1000, title="Refresh interval", description="Refresh interval in milliseconds."
)
class DeviceOnlyConfig(BaseModel):
model_config = {"arbitrary_types_allowed": True}
title: str | None = Field(
default=None, title="Title", description="Optional display title."
)
device: DeviceBase | str = Field(
default="", title="Device", description="BEC device selection."
)
refresh_interval: int = Field(
default=1000, title="Refresh interval", description="Refresh interval in milliseconds."
)
class SignalOnlyConfig(BaseModel):
model_config = {"arbitrary_types_allowed": True}
title: str | None = Field(
default=None, title="Title", description="Optional display title."
)
signal: Signal | str | None = Field(
default=None,
title="Signal",
description="Global BEC signal selection without a device field.",
)
refresh_interval: int = Field(
default=1000, title="Refresh interval", description="Refresh interval in milliseconds."
)
class ExampleWindow(QWidget):
def __init__(self) -> None:
super().__init__()
self.setWindowTitle("PydanticWidgetForm example")
self._tabs = QTabWidget(self)
self._output = QTextEdit(self)
self._output.setReadOnly(True)
self._output.setPlaceholderText("Validated form data appears here.")
self._forms: list[PydanticWidgetForm] = []
self._add_form("Basic", PydanticWidgetForm(BasicScanConfig))
self._add_form("Limits", PydanticWidgetForm(LimitConfig))
self._add_form("ScanArgument", PydanticWidgetForm(ScanArgumentConfig))
self._add_form("Display", PydanticWidgetForm(DisplayConfig))
self._add_form("Device + signal", PydanticWidgetForm(DeviceAndSignalConfig))
self._add_form("Device limits", PydanticWidgetForm(DeviceSignalLimitsConfig))
self._add_form("Device only", PydanticWidgetForm(DeviceOnlyConfig))
self._add_form("Signal only", PydanticWidgetForm(SignalOnlyConfig))
show_data = QPushButton("Show current tab data", self)
show_data.clicked.connect(self._show_current_data)
layout = QVBoxLayout(self)
layout.addWidget(QLabel("Generated forms from Pydantic models", self))
layout.addWidget(self._tabs)
layout.addWidget(show_data)
layout.addWidget(self._output)
def _add_form(self, title: str, form: PydanticWidgetForm) -> None:
form.changed.connect(lambda _form=form: self._on_form_changed(_form))
self._forms.append(form)
self._tabs.addTab(form, title)
def _show_current_data(self, _checked: bool = False, *, validate: bool = True) -> None:
form = self._forms[self._tabs.currentIndex()]
if validate:
try:
data = form.get_data()
except (ValidationError, ValueError) as exc:
self._output.setPlainText(str(exc))
return
key = "data"
else:
data = form.raw_data()
key = "raw_data"
self._output.setPlainText(
json.dumps(
{key: data, "dirty_fields": sorted(form.dirty_fields())}, indent=2, default=str
)
)
def _on_form_changed(self, form: PydanticWidgetForm) -> None:
if form is self._forms[self._tabs.currentIndex()]:
self._show_current_data(validate=False)
app = QApplication(sys.argv)
apply_theme("dark")
window = ExampleWindow()
window.show()
sys.exit(app.exec())

Some files were not shown because too many files have changed in this diff Show More