Compare commits

..

53 Commits

Author SHA1 Message Date
wakonig_k d91a61ae45 f - wip 2025-12-23 11:37:16 +01:00
wakonig_k 0419b63e3f feat(web console): add support for shared web console sessions 2025-12-23 11:07:34 +01:00
wyzula_j b4ad75aade fix(client): client API regenerated 2025-12-19 14:12:48 +01:00
wyzula_j 82ce27a700 feat(device-manager): Add DeviceManager Widget for BEC Widget main applications 2025-12-19 14:12:08 +01:00
wyzula_j fe8e6d9427 fix(general_app): old general app example removed 2025-12-19 14:12:08 +01:00
wyzula_j f8cd8d0d06 fix(heatmap): interpolation thread is killed only on exit, logger for dandling thread 2025-12-19 14:12:08 +01:00
wyzula_j 821b61bcc0 perf(heatmap): thread worker optimization 2025-12-19 14:12:08 +01:00
wyzula_j d9afe31d61 fix(heatmap): interpolation of the image moved to separate thread 2025-12-19 14:12:08 +01:00
wyzula_j 8df2576390 fix(motor_map): x/y motor are saved in properties 2025-12-19 14:12:08 +01:00
perl_d 240c6dd439 fix: don't wait forever 2025-12-19 14:08:47 +01:00
wyzula_j 6039d070b7 fix(widget_state_manager): PROPERTIES_TO_SKIP are not restored even if in ini file 2025-12-19 14:08:47 +01:00
wyzula_j 04fc10213d feat(advanced_dock_area): floating docks restore with relative geometry 2025-12-19 14:08:47 +01:00
wakonig_k c3d6eb009f refactor: improvements to enum access 2025-12-19 14:08:47 +01:00
wyzula_j 669a84cb21 feat(advanced_dock_area): instance lock for multiple ads in same session 2025-12-19 14:08:47 +01:00
wyzula_j 1211a66577 fix(widgets): removed isVisible from all SafeProperties 2025-12-19 14:08:47 +01:00
wyzula_j a2374f00b0 fix(bec_widget): improved qt enums; grab safeguard 2025-12-19 14:08:47 +01:00
wyzula_j 92dc947e68 fix(qt_ads): pythons stubs match structure of PySide6QtAds 2025-12-19 14:08:47 +01:00
wyzula_j 574fd051c1 fix(widget_state_manager): filtering of not wanted properties 2025-12-19 14:08:47 +01:00
wyzula_j 8070d60370 refactor(main_app): adapted for DockAreaWidget changes 2025-12-19 14:08:47 +01:00
wyzula_j 23a232da9c refactor(developer_view): changed to use DockAreaWidget 2025-12-19 14:08:47 +01:00
wyzula_j c7d40ca82c refactor(monaco_dock): changed to use DockAreaWidget 2025-12-19 14:08:47 +01:00
wyzula_j 1c38d7a6ff feat(advanced_dock_area): created DockAreaWidget base class; profile management through namespaces; dock area variants 2025-12-19 14:08:47 +01:00
wyzula_j 75afac2fc7 fix(main_window): removed general forced cleanup 2025-12-19 14:08:47 +01:00
wyzula_j 4ad8b7cb22 feat(advanced_dock_area): UI/UX for profile management improved, saving directories logic adjusted 2025-12-19 14:08:47 +01:00
wyzula_j 8c9d06e9d6 fix(main_window): cleanup adjusted with shiboken6 2025-12-19 14:08:47 +01:00
wyzula_j 781f7cc055 fix(dark_mode_button): skip settings added 2025-12-19 14:08:47 +01:00
wyzula_j 2dac1c38c1 fix(widget_state_manager): added shiboken check 2025-12-19 14:08:47 +01:00
wyzula_j a00bb0fe58 feat(bec_widget): save screenshot to bytes 2025-12-19 14:08:47 +01:00
wyzula_j 435873b539 fix(becconnector): ophyd thread killer on exit + in conftest 2025-12-19 14:08:47 +01:00
wakonig_k cb253e5998 feat(guided_tour): add guided tour 2025-12-19 14:08:47 +01:00
wakonig_k 17fa18d9d2 fix: add metadata to scan control export 2025-12-19 14:08:47 +01:00
wyzula_j f7210b88ea feat(developer_view): add developer view 2025-12-19 14:08:47 +01:00
wyzula_j cb2ccb02ff feat(jupyter_console_window): adjustment for general usage 2025-12-19 14:08:47 +01:00
wakonig_k cacf98cb9a feat(ads): add pyi stub file to provide type hints for ads 2025-12-19 14:08:47 +01:00
appel_c 5d0ec2186b feat(dm-view): initial device manager view added 2025-12-19 14:08:47 +01:00
appel_c fc4ad051f8 feat(help-inspector): add help inspector widget 2025-12-19 14:08:47 +01:00
wyzula_j d5aaba1adb fix(signal_label): dispatcher unsubscribed in the cleanup 2025-12-19 14:08:47 +01:00
wyzula_j 5b9fdc7d30 fix(client): abort, reset, stop button removed from RPC access 2025-12-19 14:08:47 +01:00
wyzula_j 943a911a17 feat(main_app): main app with interactive app switcher 2025-12-19 14:08:47 +01:00
wyzula_j 80f7829adc feat(actions): actions can be created with label text with beside or under alignment 2025-12-19 14:08:47 +01:00
wyzula_j f6f0ad445f feat(busy_loader): busy loader added to bec widget base class 2025-12-19 14:08:47 +01:00
wakonig_k 554704a63a feat: add SafeConnect 2025-12-19 14:08:47 +01:00
wyzula_j a3e044bf50 fix(bec_widgets): adapt to bec_qthemes 1.0; themes can be only applied on living Qt objects 2025-12-19 14:08:47 +01:00
wyzula_j 66ae2b25fd feat(advanced_dock_area): added ads based dock area with profiles 2025-12-19 13:33:53 +01:00
wyzula_j 532b7422b8 fix(web_console): added startup kwarg 2025-12-19 13:33:53 +01:00
wyzula_j 80d1c29ab1 refactor(bec_main_window): main app theme renamed to View 2025-12-19 13:33:53 +01:00
wyzula_j 18b0cd4142 fix(widget_state_manager): state manager can save all properties recursively to already existing settings 2025-12-19 13:33:53 +01:00
wyzula_j a26e1f4811 feat(bec_widget): attach/detach method for all widgets + client regenerated 2025-12-19 13:33:51 +01:00
wyzula_j 3b7bc2b25a feat(widget_io): widget hierarchy can grap all bec connectors from the widget recursively 2025-12-19 11:38:43 +01:00
wyzula_j a7cf98cb58 fix(bec_connector): widget_removed and name_established signals added 2025-12-19 11:38:43 +01:00
wakonig_k 01b317367a ci: install ttyd 2025-12-19 11:38:43 +01:00
wakonig_k a9a4d3aa6e ci: add artifact upload 2025-12-19 11:38:43 +01:00
wyzula_j 605c13a6ea build: PySide6-QtAds; bec_qtheme V1; dependencies updated and adjusted 2025-12-19 11:38:43 +01:00
397 changed files with 12541 additions and 35191 deletions
+1 -1
View File
@@ -62,4 +62,4 @@ runs:
uv pip install --system -e ./ophyd_devices
uv pip install --system -e ./bec/bec_lib[dev]
uv pip install --system -e ./bec/bec_ipython_client
uv pip install --system -e ./bec_widgets[dev,qtermwidget]
uv pip install --system -e ./bec_widgets[dev,pyside6]
-169
View File
@@ -1,169 +0,0 @@
##########################
### AI-generated file. ###
##########################
"""Aggregate and merge benchmark JSON files.
The workflow runs the same benchmark suite on multiple independent runners.
This script reads every JSON file produced by those attempts, normalizes the
contained benchmark values, and writes a compact mapping JSON where each value is
the median across attempts. It can also merge independent hyperfine JSON files
from one runner into a single hyperfine-style JSON file.
"""
from __future__ import annotations
import argparse
import json
import statistics
from pathlib import Path
from typing import Any
from compare_benchmarks import Benchmark, extract_benchmarks
def collect_benchmarks(paths: list[Path]) -> dict[str, list[Benchmark]]:
"""Collect benchmarks from multiple JSON files.
Args:
paths (list[Path]): Paths to hyperfine, pytest-benchmark, or compact
mapping JSON files.
Returns:
dict[str, list[Benchmark]]: Benchmarks grouped by benchmark name.
"""
collected: dict[str, list[Benchmark]] = {}
for path in paths:
for name, benchmark in extract_benchmarks(path).items():
collected.setdefault(name, []).append(benchmark)
return collected
def aggregate(collected: dict[str, list[Benchmark]]) -> dict[str, dict[str, object]]:
"""Aggregate grouped benchmarks using the median value.
Args:
collected (dict[str, list[Benchmark]]): Benchmarks grouped by benchmark
name.
Returns:
dict[str, dict[str, object]]: Compact mapping JSON data. Each benchmark
contains ``value``, ``unit``, ``metric``, ``attempts``, and
``attempt_values``.
"""
aggregated: dict[str, dict[str, object]] = {}
for name, benchmarks in sorted(collected.items()):
values = [benchmark.value for benchmark in benchmarks]
unit = next((benchmark.unit for benchmark in benchmarks if benchmark.unit), "")
metric = next((benchmark.metric for benchmark in benchmarks if benchmark.metric), "value")
aggregated[name] = {
"value": statistics.median(values),
"unit": unit,
"metric": f"median-of-attempt-{metric}",
"attempts": len(values),
"attempt_values": values,
}
return aggregated
def merge_hyperfine_results(paths: list[Path]) -> dict[str, Any]:
"""Merge hyperfine result files.
Args:
paths (list[Path]): Hyperfine JSON files to merge.
Returns:
dict[str, Any]: Hyperfine-style JSON object containing all result rows.
Raises:
ValueError: If any file has no hyperfine ``results`` list.
"""
merged: dict[str, Any] = {"results": []}
for path in paths:
data = json.loads(path.read_text(encoding="utf-8"))
results = data.get("results", []) if isinstance(data, dict) else None
if not isinstance(results, list):
raise ValueError(f"{path} has no hyperfine results list")
merged["results"].extend(results)
return merged
def main_from_paths(input_dir: Path, output: Path) -> int:
"""Aggregate all JSON files in a directory and write the result.
Args:
input_dir (Path): Directory containing benchmark JSON files.
output (Path): Path where the aggregate JSON should be written.
Returns:
int: Always ``0`` on success.
Raises:
ValueError: If no JSON files are found in ``input_dir``.
"""
paths = sorted(input_dir.rglob("*.json"))
if not paths:
raise ValueError(f"No benchmark JSON files found in {input_dir}")
output.parent.mkdir(parents=True, exist_ok=True)
output.write_text(
json.dumps(aggregate(collect_benchmarks(paths)), indent=2, sort_keys=True) + "\n",
encoding="utf-8",
)
return 0
def merge_from_paths(input_dir: Path, output: Path) -> int:
"""Merge all hyperfine JSON files in a directory and write the result.
Args:
input_dir (Path): Directory containing hyperfine JSON files.
output (Path): Path where the merged JSON should be written.
Returns:
int: Always ``0`` on success.
Raises:
ValueError: If no JSON files are found in ``input_dir``.
"""
paths = sorted(input_dir.glob("*.json"))
if not paths:
raise ValueError(f"No hyperfine JSON files found in {input_dir}")
output.parent.mkdir(parents=True, exist_ok=True)
output.write_text(
json.dumps(merge_hyperfine_results(paths), indent=2, sort_keys=True) + "\n",
encoding="utf-8",
)
return 0
def main() -> int:
"""Run the benchmark aggregation command line interface.
Returns:
int: Always ``0`` on success.
"""
parser = argparse.ArgumentParser()
parser.add_argument(
"--mode",
choices=("aggregate", "merge-hyperfine"),
default="aggregate",
help="Operation to perform.",
)
parser.add_argument("--input-dir", required=True, type=Path)
parser.add_argument("--output", required=True, type=Path)
args = parser.parse_args()
if args.mode == "merge-hyperfine":
return merge_from_paths(input_dir=args.input_dir, output=args.output)
return main_from_paths(input_dir=args.input_dir, output=args.output)
if __name__ == "__main__":
raise SystemExit(main())
-454
View File
@@ -1,454 +0,0 @@
##########################
### AI-generated file. ###
##########################
"""Compare benchmark JSON files and write a GitHub Actions summary.
The script supports JSON emitted by hyperfine, JSON emitted by pytest-benchmark,
and a compact mapping format generated by ``aggregate_benchmarks.py``. Timing
formats prefer median values and fall back to mean values when median values are
not present.
"""
from __future__ import annotations
import argparse
import json
import math
from dataclasses import dataclass
from pathlib import Path
from typing import Any
@dataclass(frozen=True)
class Benchmark:
"""Normalized benchmark result.
Attributes:
name (str): Stable benchmark name used to match baseline and current results.
value (float): Numeric benchmark value used for comparison.
unit (str): Display unit for the value, for example ``"s"``.
metric (str): Source metric name, for example ``"median"`` or ``"mean"``.
"""
name: str
value: float
unit: str
metric: str = "value"
@dataclass(frozen=True)
class Comparison:
"""Comparison between one baseline benchmark and one current benchmark.
Attributes:
name (str): Benchmark name.
baseline (float): Baseline benchmark value.
current (float): Current benchmark value.
delta_percent (float): Percent change from baseline to current.
unit (str): Display unit for both values.
metric (str): Current result metric used for comparison.
regressed (bool): Whether the change exceeds the configured threshold in
the worse direction.
improved (bool): Whether the change exceeds the configured threshold in
the better direction.
"""
name: str
baseline: float
current: float
delta_percent: float
unit: str
metric: str
regressed: bool
improved: bool
def _read_json(path: Path) -> Any:
"""Read JSON data from a file.
Args:
path (Path): Path to the JSON file.
Returns:
Any: Parsed JSON value.
"""
with path.open("r", encoding="utf-8") as stream:
return json.load(stream)
def _as_float(value: Any) -> float | None:
"""Convert a value to a finite float.
Args:
value (Any): Value to convert.
Returns:
float | None: Converted finite float, or ``None`` if conversion fails.
"""
try:
result = float(value)
except (TypeError, ValueError):
return None
if math.isfinite(result):
return result
return None
def _extract_hyperfine(data: dict[str, Any]) -> dict[str, Benchmark]:
"""Extract normalized benchmarks from hyperfine JSON.
Args:
data (dict[str, Any]): Parsed hyperfine JSON object.
Returns:
dict[str, Benchmark]: Benchmarks keyed by command name.
"""
benchmarks: dict[str, Benchmark] = {}
for result in data.get("results", []):
if not isinstance(result, dict):
continue
name = str(result.get("command") or result.get("name") or "").strip()
metric = "median"
value = _as_float(result.get(metric))
if value is None:
metric = "mean"
value = _as_float(result.get(metric))
if name and value is not None:
benchmarks[name] = Benchmark(name=name, value=value, unit="s", metric=metric)
return benchmarks
def _extract_pytest_benchmark(data: dict[str, Any]) -> dict[str, Benchmark]:
"""Extract normalized benchmarks from pytest-benchmark JSON.
Args:
data (dict[str, Any]): Parsed pytest-benchmark JSON object.
Returns:
dict[str, Benchmark]: Benchmarks keyed by full benchmark name.
"""
benchmarks: dict[str, Benchmark] = {}
for benchmark in data.get("benchmarks", []):
if not isinstance(benchmark, dict):
continue
name = str(benchmark.get("fullname") or benchmark.get("name") or "").strip()
stats = benchmark.get("stats", {})
value = None
metric = "median"
if isinstance(stats, dict):
value = _as_float(stats.get(metric))
if value is None:
metric = "mean"
value = _as_float(stats.get(metric))
if name and value is not None:
benchmarks[name] = Benchmark(name=name, value=value, unit="s", metric=metric)
return benchmarks
def _extract_simple_mapping(data: dict[str, Any]) -> dict[str, Benchmark]:
"""Extract normalized benchmarks from a compact mapping JSON object.
Args:
data (dict[str, Any]): Parsed mapping where each benchmark is either a
raw number or an object containing ``value``, ``unit``, and ``metric``.
Returns:
dict[str, Benchmark]: Benchmarks keyed by mapping key.
"""
benchmarks: dict[str, Benchmark] = {}
for name, raw_value in data.items():
if name in {"version", "context", "commit", "timestamp"}:
continue
value = _as_float(raw_value)
unit = ""
metric = "value"
if value is None and isinstance(raw_value, dict):
value = _as_float(raw_value.get("value"))
unit = str(raw_value.get("unit") or "")
metric = str(raw_value.get("metric") or "value")
if value is not None:
benchmarks[str(name)] = Benchmark(name=str(name), value=value, unit=unit, metric=metric)
return benchmarks
def extract_benchmarks(path: Path) -> dict[str, Benchmark]:
"""Extract normalized benchmarks from a supported JSON file.
Args:
path (Path): Path to a hyperfine, pytest-benchmark, or compact mapping
JSON file.
Returns:
dict[str, Benchmark]: Normalized benchmarks keyed by name.
Raises:
ValueError: If the JSON root is not an object or no supported benchmark
entries can be extracted.
"""
data = _read_json(path)
if not isinstance(data, dict):
raise ValueError(f"{path} must contain a JSON object")
extractors = (_extract_hyperfine, _extract_pytest_benchmark, _extract_simple_mapping)
for extractor in extractors:
benchmarks = extractor(data)
if benchmarks:
return benchmarks
raise ValueError(f"No supported benchmark entries found in {path}")
def compare_benchmarks(
baseline: dict[str, Benchmark],
current: dict[str, Benchmark],
threshold_percent: float,
higher_is_better: bool,
) -> tuple[list[Comparison], list[str], list[str]]:
"""Compare baseline benchmarks with current benchmarks.
Args:
baseline (dict[str, Benchmark]): Baseline benchmarks keyed by name.
current (dict[str, Benchmark]): Current benchmarks keyed by name.
threshold_percent (float): Regression threshold in percent.
higher_is_better (bool): If ``True``, lower current values are treated as
regressions. If ``False``, higher current values are treated as
regressions.
Returns:
tuple[list[Comparison], list[str], list[str]]: Comparisons for common
benchmark names, names missing from current results, and names newly
present in current results.
"""
comparisons: list[Comparison] = []
missing_in_current: list[str] = []
new_in_current: list[str] = []
for name, baseline_benchmark in sorted(baseline.items()):
current_benchmark = current.get(name)
if current_benchmark is None:
missing_in_current.append(name)
continue
if baseline_benchmark.value == 0:
delta_percent = 0.0
else:
delta_percent = (
(current_benchmark.value - baseline_benchmark.value)
/ abs(baseline_benchmark.value)
* 100
)
if higher_is_better:
regressed = delta_percent <= -threshold_percent
improved = delta_percent >= threshold_percent
else:
regressed = delta_percent >= threshold_percent
improved = delta_percent <= -threshold_percent
comparisons.append(
Comparison(
name=name,
baseline=baseline_benchmark.value,
current=current_benchmark.value,
delta_percent=delta_percent,
unit=current_benchmark.unit or baseline_benchmark.unit,
metric=current_benchmark.metric,
regressed=regressed,
improved=improved,
)
)
for name in sorted(set(current) - set(baseline)):
new_in_current.append(name)
return comparisons, missing_in_current, new_in_current
def _format_value(value: float, unit: str) -> str:
"""Format a benchmark value for Markdown output.
Args:
value (float): Numeric benchmark value.
unit (str): Display unit.
Returns:
str: Formatted value with optional unit suffix.
"""
suffix = f" {unit}" if unit else ""
return f"{value:.6g}{suffix}"
def _format_status(comparison: Comparison) -> str:
"""Format a comparison status for Markdown output."""
if comparison.regressed:
return ":red_circle: regressed"
if comparison.improved:
return ":green_circle: improved"
return "ok"
def write_summary(
path: Path,
comparisons: list[Comparison],
missing_in_current: list[str],
new_in_current: list[str],
threshold_percent: float,
higher_is_better: bool,
) -> None:
"""Write a Markdown benchmark comparison summary.
Args:
path (Path): Path where the summary should be written.
comparisons (list[Comparison]): Comparison rows for matching benchmarks.
missing_in_current (list[str]): Baseline benchmark names missing from the
current result.
new_in_current (list[str]): Current benchmark names not present in the
baseline result.
threshold_percent (float): Regression threshold in percent.
higher_is_better (bool): Whether higher benchmark values are considered
better.
"""
regressions = [comparison for comparison in comparisons if comparison.regressed]
improvements = [comparison for comparison in comparisons if comparison.improved]
direction = "higher is better" if higher_is_better else "lower is better"
sorted_comparisons = sorted(comparisons, key=lambda comparison: comparison.name)
lines = [
"<!-- bw-benchmark-comment -->",
"## Benchmark comparison",
"",
f"Threshold: {threshold_percent:g}% ({direction}).",
f"Result: {len(regressions)} regression(s), {len(improvements)} improvement(s) beyond threshold.",
]
lines.append("")
if regressions:
lines.extend(
[
f"{len(regressions)} benchmark(s) regressed beyond the configured threshold.",
"",
"| Benchmark | Baseline | Current | Change |",
"| --- | ---: | ---: | ---: |",
]
)
for comparison in regressions:
lines.append(
"| "
f"{comparison.name} | "
f"{_format_value(comparison.baseline, comparison.unit)} | "
f"{_format_value(comparison.current, comparison.unit)} | "
f"{comparison.delta_percent:+.2f}% |"
)
else:
lines.append("No benchmark regression exceeded the configured threshold.")
lines.append("")
if improvements:
lines.extend(
[
f"{len(improvements)} benchmark(s) improved beyond the configured threshold.",
"",
"| Benchmark | Baseline | Current | Change |",
"| --- | ---: | ---: | ---: |",
]
)
for comparison in improvements:
lines.append(
"| "
f"{comparison.name} | "
f"{_format_value(comparison.baseline, comparison.unit)} | "
f"{_format_value(comparison.current, comparison.unit)} | "
f"{comparison.delta_percent:+.2f}% |"
)
else:
lines.append("No benchmark improvement exceeded the configured threshold.")
if sorted_comparisons:
lines.extend(
[
"",
"<details>",
"<summary>All benchmark results</summary>",
"",
"| Benchmark | Baseline | Current | Change | Status |",
"| --- | ---: | ---: | ---: | --- |",
]
)
for comparison in sorted_comparisons:
lines.append(
"| "
f"{comparison.name} | "
f"{_format_value(comparison.baseline, comparison.unit)} | "
f"{_format_value(comparison.current, comparison.unit)} | "
f"{comparison.delta_percent:+.2f}% | "
f"{_format_status(comparison)} |"
)
lines.extend(["", "</details>"])
if missing_in_current:
lines.extend(["", "Missing benchmarks in the current run:"])
lines.extend(f"- `{name}`" for name in missing_in_current)
if new_in_current:
lines.extend(["", "New benchmarks in the current run:"])
lines.extend(f"- `{name}`" for name in new_in_current)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text("\n".join(lines) + "\n", encoding="utf-8")
def main() -> int:
"""Run the benchmark comparison command line interface.
Returns:
int: ``1`` when a regression exceeds the threshold, otherwise ``0``.
"""
parser = argparse.ArgumentParser()
parser.add_argument("--baseline", required=True, type=Path)
parser.add_argument("--current", required=True, type=Path)
parser.add_argument("--summary", required=True, type=Path)
parser.add_argument("--threshold-percent", required=True, type=float)
parser.add_argument("--higher-is-better", action="store_true")
args = parser.parse_args()
baseline = extract_benchmarks(args.baseline)
current = extract_benchmarks(args.current)
comparisons, missing_in_current, new_in_current = compare_benchmarks(
baseline=baseline,
current=current,
threshold_percent=args.threshold_percent,
higher_is_better=args.higher_is_better,
)
write_summary(
path=args.summary,
comparisons=comparisons,
missing_in_current=missing_in_current,
new_in_current=new_in_current,
threshold_percent=args.threshold_percent,
higher_is_better=args.higher_is_better,
)
return 1 if any(comparison.regressed for comparison in comparisons) else 0
if __name__ == "__main__":
raise SystemExit(main())
@@ -0,0 +1,342 @@
import functools
import os
from typing import Literal
import requests
from github import Github
from pydantic import BaseModel
class GHConfig(BaseModel):
token: str
organization: str
repository: str
project_number: int
graphql_url: str
rest_url: str
headers: dict
class ProjectItemHandler:
"""
A class to handle GitHub project items.
"""
def __init__(self, gh_config: GHConfig):
self.gh_config = gh_config
self.gh = Github(gh_config.token)
self.repo = self.gh.get_repo(f"{gh_config.organization}/{gh_config.repository}")
self.project_node_id = self.get_project_node_id()
def set_issue_status(
self,
status: Literal[
"Selected for Development",
"Weekly Backlog",
"In Development",
"Ready For Review",
"On Hold",
"Done",
],
issue_number: int | None = None,
issue_node_id: str | None = None,
):
"""
Set the status field of a GitHub issue in the project.
Args:
status (str): The status to set. Must be one of the predefined statuses.
issue_number (int, optional): The issue number. If not provided, issue_node_id must be provided.
issue_node_id (str, optional): The issue node ID. If not provided, issue_number must be provided.
"""
if not issue_number and not issue_node_id:
raise ValueError("Either issue_number or issue_node_id must be provided.")
if issue_number and issue_node_id:
raise ValueError("Only one of issue_number or issue_node_id must be provided.")
if issue_number is not None:
issue = self.repo.get_issue(issue_number)
issue_id = self.get_issue_info(issue.node_id)[0]["id"]
else:
issue_id = issue_node_id
field_id, option_id = self.get_status_field_id(field_name=status)
self.set_field_option(issue_id, field_id, option_id)
def run_graphql(self, query: str, variables: dict) -> dict:
"""
Execute a GraphQL query against the GitHub API.
Args:
query (str): The GraphQL query to execute.
variables (dict): The variables to pass to the query.
Returns:
dict: The response from the GitHub API.
"""
response = requests.post(
self.gh_config.graphql_url,
json={"query": query, "variables": variables},
headers=self.gh_config.headers,
timeout=10,
)
if response.status_code != 200:
raise Exception(
f"Query failed with status code {response.status_code}: {response.text}"
)
return response.json()
def get_project_node_id(self):
"""
Retrieve the project node ID from the GitHub API.
"""
query = """
query($owner: String!, $number: Int!) {
organization(login: $owner) {
projectV2(number: $number) {
id
}
}
}
"""
variables = {"owner": self.gh_config.organization, "number": self.gh_config.project_number}
resp = self.run_graphql(query, variables)
return resp["data"]["organization"]["projectV2"]["id"]
def get_issue_info(self, issue_node_id: str):
"""
Get the project-related information for a given issue node ID.
Args:
issue_node_id (str): The node ID of the issue. Please note that this is not the issue number and typically starts with "I".
Returns:
list[dict]: A list of project items associated with the issue.
"""
query = """
query($issueId: ID!) {
node(id: $issueId) {
... on Issue {
projectItems(first: 10) {
nodes {
project {
id
title
}
id
fieldValues(first: 20) {
nodes {
... on ProjectV2ItemFieldSingleSelectValue {
name
field {
... on ProjectV2SingleSelectField {
name
}
}
}
}
}
}
}
}
}
}
"""
variables = {"issueId": issue_node_id}
resp = self.run_graphql(query, variables)
return resp["data"]["node"]["projectItems"]["nodes"]
def get_status_field_id(
self,
field_name: Literal[
"Selected for Development",
"Weekly Backlog",
"In Development",
"Ready For Review",
"On Hold",
"Done",
],
) -> tuple[str, str]:
"""
Get the status field ID and option ID for the given field name in the project.
Args:
field_name (str): The name of the field to retrieve.
Must be one of the predefined statuses.
Returns:
tuple[str, str]: A tuple containing the field ID and option ID.
"""
field_id = None
option_id = None
project_fields = self.get_project_fields()
for field in project_fields:
if field["name"] != "Status":
continue
field_id = field["id"]
for option in field["options"]:
if option["name"] == field_name:
option_id = option["id"]
break
if not field_id or not option_id:
raise ValueError(f"Field '{field_name}' not found in project fields.")
return field_id, option_id
def set_field_option(self, item_id, field_id, option_id):
"""
Set the option of a project item for a single-select field.
Args:
item_id (str): The ID of the project item to update.
field_id (str): The ID of the field to update.
option_id (str): The ID of the option to set.
"""
mutation = """
mutation($projectId: ID!, $itemId: ID!, $fieldId: ID!, $optionId: String!) {
updateProjectV2ItemFieldValue(
input: {
projectId: $projectId
itemId: $itemId
fieldId: $fieldId
value: { singleSelectOptionId: $optionId }
}
) {
projectV2Item {
id
}
}
}
"""
variables = {
"projectId": self.project_node_id,
"itemId": item_id,
"fieldId": field_id,
"optionId": option_id,
}
return self.run_graphql(mutation, variables)
@functools.lru_cache(maxsize=1)
def get_project_fields(self) -> list[dict]:
"""
Get the available fields in the project.
This method caches the result to avoid multiple API calls.
Returns:
list[dict]: A list of fields in the project.
"""
query = """
query($projectId: ID!) {
node(id: $projectId) {
... on ProjectV2 {
fields(first: 50) {
nodes {
... on ProjectV2SingleSelectField {
id
name
options {
id
name
}
}
}
}
}
}
}
"""
variables = {"projectId": self.project_node_id}
resp = self.run_graphql(query, variables)
return list(filter(bool, resp["data"]["node"]["fields"]["nodes"]))
def get_pull_request_linked_issues(self, pr_number: int) -> list[dict]:
"""
Get the linked issues of a pull request.
Args:
pr_number (int): The pull request number.
Returns:
list[dict]: A list of linked issues.
"""
query = """
query($number: Int!, $owner: String!, $repo: String!) {
repository(owner: $owner, name: $repo) {
pullRequest(number: $number) {
id
closingIssuesReferences(first: 50) {
edges {
node {
id
body
number
title
}
}
}
}
}
}
"""
variables = {
"number": pr_number,
"owner": self.gh_config.organization,
"repo": self.gh_config.repository,
}
resp = self.run_graphql(query, variables)
edges = resp["data"]["repository"]["pullRequest"]["closingIssuesReferences"]["edges"]
return [edge["node"] for edge in edges if edge.get("node")]
def main():
# GitHub settings
token = os.getenv("TOKEN")
org = os.getenv("ORG")
repo = os.getenv("REPO")
project_number = os.getenv("PROJECT_NUMBER")
pr_number = os.getenv("PR_NUMBER")
if not token:
raise ValueError("GitHub token is not set. Please set the TOKEN environment variable.")
if not org:
raise ValueError("GitHub organization is not set. Please set the ORG environment variable.")
if not repo:
raise ValueError("GitHub repository is not set. Please set the REPO environment variable.")
if not project_number:
raise ValueError(
"GitHub project number is not set. Please set the PROJECT_NUMBER environment variable."
)
if not pr_number:
raise ValueError(
"Pull request number is not set. Please set the PR_NUMBER environment variable."
)
project_number = int(project_number)
pr_number = int(pr_number)
gh_config = GHConfig(
token=token,
organization=org,
repository=repo,
project_number=project_number,
graphql_url="https://api.github.com/graphql",
rest_url=f"https://api.github.com/repos/{org}/{repo}/issues",
headers={"Authorization": f"Bearer {token}", "Accept": "application/vnd.github+json"},
)
project_item_handler = ProjectItemHandler(gh_config=gh_config)
# Get PR info
pr = project_item_handler.repo.get_pull(pr_number)
# Get the linked issues of the pull request
linked_issues = project_item_handler.get_pull_request_linked_issues(pr_number=pr_number)
print(f"Linked issues: {linked_issues}")
target_status = "In Development" if pr.draft else "Ready For Review"
print(f"Target status: {target_status}")
for issue in linked_issues:
project_item_handler.set_issue_status(issue_number=issue["number"], status=target_status)
if __name__ == "__main__":
main()
@@ -0,0 +1,2 @@
pydantic
pygithub
-74
View File
@@ -1,74 +0,0 @@
#!/usr/bin/env bash
##########################
### AI-generated file. ###
##########################
set -euo pipefail
mkdir -p benchmark-results
benchmark_json="${BENCHMARK_JSON:-benchmark-results/current.json}"
benchmark_root="$(dirname "$benchmark_json")"
hyperfine_benchmark_dir="${BENCHMARK_HYPERFINE_DIR:-tests/benchmarks/hyperfine}"
pytest_benchmark_dirs="${BENCHMARK_PYTEST_DIRS:-${BENCHMARK_PYTEST_DIR:-}}"
benchmark_work_dir="$benchmark_root/raw-results"
hyperfine_json_dir="$benchmark_work_dir/hyperfine"
pytest_json="$benchmark_work_dir/pytest.json"
shopt -s nullglob
benchmark_scripts=()
benchmark_scripts=("$hyperfine_benchmark_dir"/benchmark_*.sh)
shopt -u nullglob
pytest_dirs=()
for pytest_benchmark_dir in $pytest_benchmark_dirs; do
if [ -d "$pytest_benchmark_dir" ]; then
pytest_dirs+=("$pytest_benchmark_dir")
else
echo "Pytest benchmark directory not found: $pytest_benchmark_dir" >&2
exit 1
fi
done
if [ "${#benchmark_scripts[@]}" -eq 0 ] && [ "${#pytest_dirs[@]}" -eq 0 ]; then
echo "No benchmark scripts or pytest benchmarks found" >&2
exit 1
fi
echo "Benchmark Python: $(command -v python)"
python -c 'import sys; print(sys.version)'
rm -rf "$benchmark_work_dir"
mkdir -p "$hyperfine_json_dir"
if [ "${#benchmark_scripts[@]}" -gt 0 ]; then
for benchmark_script in "${benchmark_scripts[@]}"; do
title="$(sed -n 's/^# BENCHMARK_TITLE:[[:space:]]*//p' "$benchmark_script" | head -n 1)"
if [ -z "$title" ]; then
title="$(basename "$benchmark_script" .sh)"
fi
benchmark_name="$(basename "$benchmark_script" .sh)"
benchmark_result_json="$hyperfine_json_dir/$benchmark_name.json"
echo "Preflight benchmark script: $benchmark_script"
bash "$benchmark_script"
hyperfine \
--show-output \
--warmup 1 \
--runs 5 \
--command-name "$title" \
--export-json "$benchmark_result_json" \
"bash $(printf "%q" "$benchmark_script")"
done
fi
if [ "${#pytest_dirs[@]}" -gt 0 ]; then
pytest \
-q "${pytest_dirs[@]}" \
--benchmark-only \
--benchmark-json "$pytest_json"
fi
python .github/scripts/aggregate_benchmarks.py \
--input-dir "$benchmark_work_dir" \
--output "$benchmark_json"
-125
View File
@@ -1,125 +0,0 @@
##########################
### AI-generated file. ###
##########################
"""Run a command with BEC e2e services available."""
from __future__ import annotations
import argparse
import os
import shutil
import subprocess
import tempfile
import time
from pathlib import Path
import bec_lib
from bec_ipython_client import BECIPythonClient
from bec_lib.redis_connector import RedisConnector
from bec_lib.service_config import ServiceConfig, ServiceConfigModel
from redis import Redis
def _wait_for_redis(host: str, port: int) -> None:
client = Redis(host=host, port=port)
deadline = time.monotonic() + 10
while time.monotonic() < deadline:
try:
if client.ping():
return
except Exception:
time.sleep(0.1)
raise RuntimeError(f"Redis did not start on {host}:{port}")
def _start_redis(files_path: Path, host: str, port: int) -> subprocess.Popen:
redis_server = shutil.which("redis-server")
if redis_server is None:
raise RuntimeError("redis-server executable not found")
return subprocess.Popen(
[
redis_server,
"--bind",
host,
"--port",
str(port),
"--save",
"",
"--appendonly",
"no",
"--dir",
str(files_path),
]
)
def _write_configs(files_path: Path, host: str, port: int) -> Path:
test_config = files_path / "test_config.yaml"
services_config = files_path / "services_config.yaml"
bec_lib_path = Path(bec_lib.__file__).resolve().parent
shutil.copyfile(bec_lib_path / "tests" / "test_config.yaml", test_config)
service_config = ServiceConfigModel(
redis={"host": host, "port": port}, file_writer={"base_path": str(files_path)}
)
services_config.write_text(service_config.model_dump_json(indent=4), encoding="utf-8")
return services_config
def _load_demo_config(services_config: Path) -> None:
bec = BECIPythonClient(ServiceConfig(services_config), RedisConnector, forced=True)
bec.start()
try:
bec.config.load_demo_config()
finally:
bec.shutdown()
bec._client._reset_singleton()
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("command", nargs=argparse.REMAINDER)
args = parser.parse_args()
if args.command[:1] == ["--"]:
args.command = args.command[1:]
if not args.command:
raise ValueError("No command provided")
host = "127.0.0.1"
port = 6379
with tempfile.TemporaryDirectory(prefix="bec-benchmark-") as tmp:
files_path = Path(tmp)
services_config = _write_configs(files_path, host, port)
redis_process = _start_redis(files_path, host, port)
processes = None
service_handler = None
try:
_wait_for_redis(host, port)
from bec_server.bec_server_utils.service_handler import ServiceHandler
service_handler = ServiceHandler(
bec_path=files_path, config_path=services_config, interface="subprocess"
)
processes = service_handler.start()
_load_demo_config(services_config)
env = os.environ.copy()
return subprocess.run(args.command, env=env, check=False).returncode
finally:
if service_handler is not None and processes is not None:
service_handler.stop(processes)
redis_process.terminate()
try:
redis_process.wait(timeout=10)
except subprocess.TimeoutExpired:
redis_process.kill()
if __name__ == "__main__":
raise SystemExit(main())
-242
View File
@@ -1,242 +0,0 @@
name: BW Benchmarks
on: [ workflow_call ]
permissions:
contents: read
env:
BENCHMARK_JSON: benchmark-results/current.json
BENCHMARK_BASELINE_JSON: gh-pages-benchmark-data/benchmarks/latest.json
BENCHMARK_SUMMARY: benchmark-results/summary.md
BENCHMARK_COMMAND: "bash .github/scripts/run_benchmarks.sh"
BENCHMARK_THRESHOLD_PERCENT: 20
BENCHMARK_HIGHER_IS_BETTER: false
jobs:
benchmark_attempt:
runs-on: ubuntu-latest
continue-on-error: true
permissions:
contents: read
defaults:
run:
shell: bash -el {0}
strategy:
fail-fast: false
matrix:
attempt: [ 1, 2, 3 ]
env:
BENCHMARK_JSON: benchmark-results/current-${{ matrix.attempt }}.json
BEC_CORE_BRANCH: main
OPHYD_DEVICES_BRANCH: main
PLUGIN_REPO_BRANCH: main
BENCHMARK_PYTEST_DIRS: tests/unit_tests/benchmarks
QTWEBENGINE_DISABLE_SANDBOX: 1
QT_QPA_PLATFORM: "offscreen"
steps:
- name: Checkout BEC Widgets
uses: actions/checkout@v4
with:
repository: bec-project/bec_widgets
ref: ${{ github.event.pull_request.head.sha || github.sha }}
- name: Set up Conda
uses: conda-incubator/setup-miniconda@v3
with:
auto-update-conda: true
auto-activate-base: true
python-version: "3.11"
- name: Install system dependencies
run: |
sudo apt-get update
sudo apt-get install -y libgl1 libegl1 x11-utils libxkbcommon-x11-0 libdbus-1-3 xvfb
sudo apt-get -y install libnss3 libxdamage1 libasound2t64 libatomic1 libxcursor1
sudo apt-get -y install ttyd hyperfine redis-server
- name: Install full e2e environment
run: |
echo -e "\033[35;1m Using branch $BEC_CORE_BRANCH of BEC CORE \033[0;m";
git clone --branch "$BEC_CORE_BRANCH" https://github.com/bec-project/bec.git
echo -e "\033[35;1m Using branch $OPHYD_DEVICES_BRANCH of OPHYD_DEVICES \033[0;m";
git clone --branch "$OPHYD_DEVICES_BRANCH" https://github.com/bec-project/ophyd_devices.git
export OHPYD_DEVICES_PATH=$PWD/ophyd_devices
echo -e "\033[35;1m Using branch $PLUGIN_REPO_BRANCH of bec_testing_plugin \033[0;m";
git clone --branch "$PLUGIN_REPO_BRANCH" https://github.com/bec-project/bec_testing_plugin.git
cd ./bec
conda create -q -n test-environment python=3.11
conda activate test-environment
source ./bin/install_bec_dev.sh -t
cd ../
python -m pip install -e ./ophyd_devices -e .[dev,pyside6] -e ./bec_testing_plugin pytest-benchmark
mkdir -p "$(dirname "$BENCHMARK_JSON")"
python .github/scripts/run_with_bec_servers.py -- bash -lc "$BENCHMARK_COMMAND"
test -s "$BENCHMARK_JSON"
- name: Upload benchmark artifact
uses: actions/upload-artifact@v4
with:
name: bw-benchmark-json-${{ matrix.attempt }}
path: ${{ env.BENCHMARK_JSON }}
benchmark:
needs: [ benchmark_attempt ]
runs-on: ubuntu-latest
permissions:
contents: read
issues: write
pull-requests: write
steps:
- name: Checkout BEC Widgets
uses: actions/checkout@v4
with:
repository: bec-project/bec_widgets
ref: ${{ github.event.pull_request.head.sha || github.sha }}
- name: Download benchmark attempts
uses: actions/download-artifact@v4
with:
pattern: bw-benchmark-json-*
path: benchmark-results/attempts
merge-multiple: true
- name: Aggregate benchmark attempts
run: |
python .github/scripts/aggregate_benchmarks.py \
--input-dir benchmark-results/attempts \
--output "$BENCHMARK_JSON"
- name: Upload aggregate benchmark artifact
uses: actions/upload-artifact@v4
with:
name: bw-benchmark-json
path: ${{ env.BENCHMARK_JSON }}
- name: Fetch gh-pages benchmark data
run: |
if git ls-remote --exit-code --heads origin gh-pages; then
git clone --depth=1 --branch gh-pages "$GITHUB_SERVER_URL/$GITHUB_REPOSITORY.git" gh-pages-benchmark-data
else
mkdir -p gh-pages-benchmark-data
fi
- name: Compare with latest gh-pages benchmark
id: compare
continue-on-error: true
run: |
if [ ! -s "$BENCHMARK_BASELINE_JSON" ]; then
mkdir -p "$(dirname "$BENCHMARK_SUMMARY")"
{
echo "<!-- bw-benchmark-comment -->"
echo "## Benchmark comparison"
echo
echo "No benchmark baseline was found on gh-pages."
} > "$BENCHMARK_SUMMARY"
exit 0
fi
args=(
--baseline "$BENCHMARK_BASELINE_JSON"
--current "$BENCHMARK_JSON"
--summary "$BENCHMARK_SUMMARY"
--threshold-percent "$BENCHMARK_THRESHOLD_PERCENT"
)
if [ "$BENCHMARK_HIGHER_IS_BETTER" = "true" ]; then
args+=(--higher-is-better)
fi
set +e
python .github/scripts/compare_benchmarks.py "${args[@]}"
status=$?
set -e
if [ ! -s "$BENCHMARK_SUMMARY" ]; then
mkdir -p "$(dirname "$BENCHMARK_SUMMARY")"
{
echo "<!-- bw-benchmark-comment -->"
echo "## Benchmark comparison"
echo
echo "Benchmark comparison failed before writing a summary."
} > "$BENCHMARK_SUMMARY"
fi
exit "$status"
- name: Find existing benchmark PR comment
if: github.event_name == 'pull_request'
id: fc
uses: peter-evans/find-comment@v3
with:
issue-number: ${{ github.event.pull_request.number }}
comment-author: github-actions[bot]
body-includes: "<!-- bw-benchmark-comment -->"
- name: Create or update benchmark PR comment
if: github.event_name == 'pull_request'
uses: peter-evans/create-or-update-comment@v5
with:
issue-number: ${{ github.event.pull_request.number }}
comment-id: ${{ steps.fc.outputs.comment-id }}
body-path: ${{ env.BENCHMARK_SUMMARY }}
edit-mode: replace
- name: Fail on benchmark regression
if: github.event_name == 'pull_request' && steps.compare.outcome == 'failure'
run: exit 1
publish:
needs: [ benchmark ]
if: github.event_name == 'push' && github.ref == 'refs/heads/main'
runs-on: ubuntu-latest
permissions:
contents: write
steps:
- name: Checkout BEC Widgets
uses: actions/checkout@v4
with:
repository: bec-project/bec_widgets
ref: ${{ github.sha }}
- name: Download aggregate benchmark artifact
uses: actions/download-artifact@v4
with:
name: bw-benchmark-json
path: benchmark-results
- name: Verify aggregate benchmark artifact
run: test -s "$BENCHMARK_JSON"
- name: Prepare gh-pages for publishing
run: |
# Clean up any existing worktree/directory
if [ -d gh-pages-benchmark-data ]; then
git worktree remove gh-pages-benchmark-data --force || rm -rf gh-pages-benchmark-data
fi
if git ls-remote --exit-code --heads origin gh-pages; then
git fetch --depth=1 origin gh-pages
git worktree add gh-pages-benchmark-data FETCH_HEAD
else
git worktree add --detach gh-pages-benchmark-data
git -C gh-pages-benchmark-data checkout --orphan gh-pages
git -C gh-pages-benchmark-data rm -rf .
fi
- name: Publish benchmark data to gh-pages
working-directory: gh-pages-benchmark-data
run: |
mkdir -p benchmarks/history
cp "../$BENCHMARK_JSON" benchmarks/latest.json
cp "../$BENCHMARK_JSON" "benchmarks/history/${GITHUB_SHA}.json"
git config user.name "github-actions[bot]"
git config user.email "41898282+github-actions[bot]@users.noreply.github.com"
git add benchmarks/latest.json "benchmarks/history/${GITHUB_SHA}.json"
git commit -m "Update BW benchmark data for ${GITHUB_SHA}" || exit 0
git push origin HEAD:gh-pages
-21
View File
@@ -45,18 +45,6 @@ jobs:
cd ./bec
pip install pytest pytest-random-order
pytest -v --maxfail=2 --junitxml=report.xml --random-order ./bec_server/tests ./bec_ipython_client/tests/client_tests ./bec_lib/tests
- name: Upload BEC unit test artifacts if job fails
if: failure()
uses: actions/upload-artifact@v4
with:
name: bec-unit-test-artifacts
path: |
./bec/report.xml
./bec/logs/*.log
if-no-files-found: ignore
retention-days: 7
bec-e2e-test:
name: BEC End2End Tests
runs-on: ubuntu-latest
@@ -74,12 +62,3 @@ jobs:
OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH }}
BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH }}
PYTHON_VERSION: '3.11'
- name: Upload BEC e2e logs if job fails
if: failure()
uses: actions/upload-artifact@v4
with:
name: bec-e2e-test-logs
path: ./_e2e_test_checkout_/bec/logs/*.log
if-no-files-found: ignore
retention-days: 7
+7 -21
View File
@@ -1,29 +1,24 @@
name: Full CI
on:
on:
push:
pull_request:
workflow_dispatch:
inputs:
BEC_WIDGETS_BRANCH:
description: "Branch of BEC Widgets to install"
description: 'Branch of BEC Widgets to install'
required: false
type: string
BEC_CORE_BRANCH:
description: "Branch of BEC Core to install"
description: 'Branch of BEC Core to install'
required: false
type: string
OPHYD_DEVICES_BRANCH:
description: "Branch of Ophyd Devices to install"
description: 'Branch of Ophyd Devices to install'
required: false
type: string
concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true
permissions:
pull-requests: write
contents: read
jobs:
check_pr_status:
@@ -34,15 +29,6 @@ jobs:
if: needs.check_pr_status.outputs.branch-pr == ''
uses: ./.github/workflows/formatter.yml
benchmark:
needs: [check_pr_status]
if: needs.check_pr_status.outputs.branch-pr == ''
permissions:
contents: write
issues: write
pull-requests: write
uses: ./.github/workflows/benchmark.yml
unit-test:
needs: [check_pr_status, formatter]
if: needs.check_pr_status.outputs.branch-pr == ''
@@ -79,9 +65,9 @@ jobs:
uses: ./.github/workflows/child_repos.yml
with:
BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main'}}
OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main'}}
BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha }}
plugin_repos:
needs: [check_pr_status, formatter]
if: needs.check_pr_status.outputs.branch-pr == ''
@@ -91,4 +77,4 @@ jobs:
BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha }}
secrets:
GH_READ_TOKEN: ${{ secrets.GH_READ_TOKEN }}
GH_READ_TOKEN: ${{ secrets.GH_READ_TOKEN }}
+9 -10
View File
@@ -9,10 +9,10 @@ jobs:
shell: bash -el {0}
env:
CHILD_PIPELINE_BRANCH: main # Set the branch you want for ophyd_devices
BEC_CORE_BRANCH: main # Set the branch you want for bec
OPHYD_DEVICES_BRANCH: main # Set the branch you want for ophyd_devices
PLUGIN_REPO_BRANCH: main # Set the branch you want for the plugin repo
CHILD_PIPELINE_BRANCH: main # Set the branch you want for ophyd_devices
BEC_CORE_BRANCH: main # Set the branch you want for bec
OPHYD_DEVICES_BRANCH: main # Set the branch you want for ophyd_devices
PLUGIN_REPO_BRANCH: main # Set the branch you want for the plugin repo
PROJECT_PATH: ${{ github.repository }}
QTWEBENGINE_DISABLE_SANDBOX: 1
QT_QPA_PLATFORM: "offscreen"
@@ -23,16 +23,15 @@ jobs:
- name: Set up Conda
uses: conda-incubator/setup-miniconda@v3
with:
auto-update-conda: true
auto-activate-base: true
python-version: "3.11"
auto-update-conda: true
auto-activate-base: true
python-version: '3.11'
- name: Install dependencies
run: |
sudo apt-get update
sudo apt-get install -y libgl1 libegl1 x11-utils libxkbcommon-x11-0 libdbus-1-3 xvfb
sudo apt-get -y install libnss3 libxdamage1 libasound2t64 libatomic1 libxcursor1
sudo apt-get -y install ttyd
- name: Conda install and run pytest
run: |
@@ -55,5 +54,5 @@ jobs:
uses: actions/upload-artifact@v4
with:
name: pytest-logs
path: ./bec/logs/*.log
retention-days: 7
path: ./logs/*.log
retention-days: 7
+13 -12
View File
@@ -1,25 +1,25 @@
name: Run Pytest with different Python versions
on:
on:
workflow_call:
inputs:
pr_number:
description: "Pull request number"
description: 'Pull request number'
required: false
type: number
BEC_CORE_BRANCH:
description: "Branch of BEC Core to install"
description: 'Branch of BEC Core to install'
required: false
default: "main"
default: 'main'
type: string
OPHYD_DEVICES_BRANCH:
description: "Branch of Ophyd Devices to install"
description: 'Branch of Ophyd Devices to install'
required: false
default: "main"
default: 'main'
type: string
BEC_WIDGETS_BRANCH:
description: "Branch of BEC Widgets to install"
description: 'Branch of BEC Widgets to install'
required: false
default: "main"
default: 'main'
type: string
jobs:
@@ -30,14 +30,15 @@ jobs:
python-version: ["3.11", "3.12", "3.13"]
env:
BEC_WIDGETS_BRANCH: main # Set the branch you want for bec_widgets
BEC_CORE_BRANCH: main # Set the branch you want for bec
OPHYD_DEVICES_BRANCH: main # Set the branch you want for ophyd_devices
BEC_WIDGETS_BRANCH: main # Set the branch you want for bec_widgets
BEC_CORE_BRANCH: main # Set the branch you want for bec
OPHYD_DEVICES_BRANCH: main # Set the branch you want for ophyd_devices
PROJECT_PATH: ${{ github.repository }}
QTWEBENGINE_DISABLE_SANDBOX: 1
QT_QPA_PLATFORM: "offscreen"
steps:
- name: Checkout BEC Widgets
uses: actions/checkout@v4
with:
@@ -55,4 +56,4 @@ jobs:
- name: Run Pytest
run: |
pip install pytest pytest-random-order
pytest -v --junitxml=report.xml --random-order --ignore=tests/unit_tests/benchmarks ./tests/unit_tests
pytest -v --junitxml=report.xml --random-order ./tests/unit_tests
+12 -10
View File
@@ -1,30 +1,32 @@
name: Run Pytest with Coverage
on:
on:
workflow_call:
inputs:
pr_number:
description: "Pull request number"
description: 'Pull request number'
required: false
type: number
BEC_CORE_BRANCH:
description: "Branch of BEC Core to install"
description: 'Branch of BEC Core to install'
required: false
default: "main"
default: 'main'
type: string
OPHYD_DEVICES_BRANCH:
description: "Branch of Ophyd Devices to install"
description: 'Branch of Ophyd Devices to install'
required: false
default: "main"
default: 'main'
type: string
BEC_WIDGETS_BRANCH:
description: "Branch of BEC Widgets to install"
description: 'Branch of BEC Widgets to install'
required: false
default: "main"
default: 'main'
type: string
secrets:
CODECOV_TOKEN:
required: true
permissions:
pull-requests: write
@@ -53,7 +55,7 @@ jobs:
- name: Run Pytest with Coverage
id: coverage
run: pytest --random-order --cov=bec_widgets --cov-config=pyproject.toml --cov-branch --cov-report=xml --no-cov-on-fail --ignore=tests/unit_tests/benchmarks tests/unit_tests/
run: pytest --random-order --cov=bec_widgets --cov-config=pyproject.toml --cov-branch --cov-report=xml --no-cov-on-fail tests/unit_tests/
- name: Upload test artifacts
uses: actions/upload-artifact@v4
@@ -67,4 +69,4 @@ jobs:
uses: codecov/codecov-action@v5
with:
token: ${{ secrets.CODECOV_TOKEN }}
slug: bec-project/bec_widgets
slug: bec-project/bec_widgets
+24 -19
View File
@@ -2,18 +2,7 @@ name: Sync PR to Project
on:
pull_request:
types:
[
opened,
assigned,
unassigned,
edited,
ready_for_review,
converted_to_draft,
reopened,
synchronize,
closed,
]
types: [opened, edited, ready_for_review, converted_to_draft, reopened, synchronize]
jobs:
sync-project:
@@ -24,12 +13,28 @@ jobs:
pull-requests: read
contents: read
env:
PROJECT_NUMBER: 3 # BEC Project
ORG: 'bec-project'
REPO: 'bec_widgets'
TOKEN: ${{ secrets.ADD_ISSUE_TO_PROJECT }}
PR_NUMBER: ${{ github.event.pull_request.number }}
steps:
- name: Sync PR to Project
uses: bec-project/action-issue-sync-pr@v1
- name: Set up python environment
uses: actions/setup-python@v4
with:
token: ${{ secrets.ADD_ISSUE_TO_PROJECT }}
org: ${{ github.repository_owner }}
repo: ${{ github.event.repository.name }}
project-number: 3
pr-number: ${{ github.event.pull_request.number }}
python-version: 3.11
- name: Checkout repo
uses: actions/checkout@v4
with:
repository: ${{ github.repository }}
ref: ${{ github.event.pull_request.head.ref }}
- name: Install dependencies
run: |
pip install -r ./.github/scripts/pr_issue_sync/requirements.txt
- name: Sync PR to Project
run: |
python ./.github/scripts/pr_issue_sync/pr_issue_sync.py
+1 -3
View File
@@ -177,6 +177,4 @@ cython_debug/
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
#
tombi.toml
#.idea/
-1470
View File
File diff suppressed because it is too large Load Diff
+2 -1
View File
@@ -192,7 +192,8 @@ Positioner boxes and tweak controls handle precise moves, homing, and calibratio
## Documentation
The documentation can be found [here](https://bec.readthedocs.io/).
Documentation of BEC Widgets can be found [here](https://bec-widgets.readthedocs.io/en/latest/). The documentation of
the BEC can be found [here](https://bec.readthedocs.io/en/latest/).
## License
+18 -12
View File
@@ -1,13 +1,19 @@
import os
import sys
import bec_widgets.widgets.containers.qt_ads as QtAds
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
if sys.platform.startswith("linux"):
qt_platform = os.environ.get("QT_QPA_PLATFORM", "")
if qt_platform != "offscreen":
os.environ["QT_QPA_PLATFORM"] = "xcb"
# Default QtAds configuration
QtAds.CDockManager.setConfigFlag(QtAds.CDockManager.eConfigFlag.FocusHighlighting, True)
QtAds.CDockManager.setConfigFlag(
QtAds.CDockManager.eConfigFlag.RetainTabSizeWhenCloseButtonHidden, True
)
__all__ = ["BECWidget", "SafeSlot", "SafeProperty"]
def __getattr__(name):
if name == "BECWidget":
from bec_widgets.utils.bec_widget import BECWidget
return BECWidget
if name in {"SafeSlot", "SafeProperty"}:
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
return {"SafeSlot": SafeSlot, "SafeProperty": SafeProperty}[name]
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
-15
View File
@@ -1,15 +0,0 @@
import os
import sys
import bec_widgets.widgets.containers.qt_ads as QtAds
if sys.platform.startswith("linux"):
qt_platform = os.environ.get("QT_QPA_PLATFORM", "")
if qt_platform != "offscreen":
os.environ["QT_QPA_PLATFORM"] = "xcb"
# Default QtAds configuration
QtAds.CDockManager.setConfigFlag(QtAds.CDockManager.eConfigFlag.FocusHighlighting, True)
QtAds.CDockManager.setConfigFlag(
QtAds.CDockManager.eConfigFlag.RetainTabSizeWhenCloseButtonHidden, True
)
+4 -35
View File
@@ -1,43 +1,12 @@
from __future__ import annotations
from typing import Literal
from bec_lib import bec_logger
from bec_widgets.widgets.containers.auto_update.auto_updates import AutoUpdates
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea
logger = bec_logger.logger
from bec_widgets.widgets.containers.dock.dock_area import BECDockArea
def dock_area(
object_name: str | None = None, startup_profile: str | Literal["restore", "skip"] | None = None
) -> BECDockArea:
"""
Create an advanced dock area using Qt Advanced Docking System.
Args:
object_name(str): The name of the advanced dock area.
startup_profile(str | Literal["restore", "skip"] | None): Startup mode for
the workspace:
- None: start empty
- "restore": restore last used profile
- "skip": do not initialize profile state
- "<name>": load specific profile
Returns:
BECDockArea: The created advanced dock area.
"""
widget = BECDockArea(
object_name=object_name,
root_widget=True,
profile_namespace="bec",
startup_profile=startup_profile,
)
logger.info(f"Created advanced dock area with startup_profile: {startup_profile}")
return widget
def dock_area(object_name: str | None = None) -> BECDockArea:
_dock_area = BECDockArea(object_name=object_name, root_widget=True)
return _dock_area
def auto_update_dock_area(object_name: str | None = None) -> AutoUpdates:
+57 -214
View File
@@ -20,19 +20,17 @@ from qtpy.QtWidgets import (
)
import bec_widgets
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.utils.bec_plugin_helper import get_all_plugin_widgets
from bec_widgets.utils.container_utils import WidgetContainerUtils
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.name_utils import pascal_to_snake
from bec_widgets.utils.plugin_utils import get_plugin_auto_updates
from bec_widgets.utils.round_frame import RoundedFrame
from bec_widgets.utils.rpc_register import RPCRegister
from bec_widgets.utils.screen_utils import apply_window_geometry, centered_geometry_for_app
from bec_widgets.utils.toolbars.toolbar import ModularToolBar
from bec_widgets.utils.ui_loader import UILoader
from bec_widgets.widgets.containers.auto_update.auto_updates import AutoUpdates
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea
from bec_widgets.widgets.containers.dock_area.profile_utils import get_last_profile, list_profiles
from bec_widgets.widgets.containers.dock.dock_area import BECDockArea
from bec_widgets.widgets.containers.main_window.main_window import BECMainWindow, BECMainWindowNoRPC
from bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button import DarkModeButton
@@ -43,7 +41,6 @@ if TYPE_CHECKING: # pragma: no cover
logger = bec_logger.logger
MODULE_PATH = os.path.dirname(bec_widgets.__file__)
START_EMPTY_PROFILE_OPTION = "Start Empty (No Profile)"
class LaunchTile(RoundedFrame):
@@ -77,28 +74,23 @@ class LaunchTile(RoundedFrame):
circular_pixmap.fill(Qt.transparent)
painter = QPainter(circular_pixmap)
painter.setRenderHints(QPainter.RenderHint.Antialiasing, True)
painter.setRenderHints(QPainter.Antialiasing, True)
path = QPainterPath()
path.addEllipse(0, 0, size, size)
painter.setClipPath(path)
pixmap = pixmap.scaled(
size,
size,
Qt.AspectRatioMode.KeepAspectRatio,
Qt.TransformationMode.SmoothTransformation,
)
pixmap = pixmap.scaled(size, size, Qt.KeepAspectRatio, Qt.SmoothTransformation)
painter.drawPixmap(0, 0, pixmap)
painter.end()
self.icon_label.setPixmap(circular_pixmap)
self.layout.addWidget(self.icon_label, alignment=Qt.AlignmentFlag.AlignCenter)
self.layout.addWidget(self.icon_label, alignment=Qt.AlignCenter)
# Top label
self.top_label = QLabel(top_label.upper())
font_top = self.top_label.font()
font_top.setPointSize(10)
self.top_label.setFont(font_top)
self.layout.addWidget(self.top_label, alignment=Qt.AlignmentFlag.AlignCenter)
self.layout.addWidget(self.top_label, alignment=Qt.AlignCenter)
# Main label
self.main_label = QLabel(main_label)
@@ -108,7 +100,7 @@ class LaunchTile(RoundedFrame):
font_main.setPointSize(14)
font_main.setBold(True)
self.main_label.setFont(font_main)
self.main_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.main_label.setAlignment(Qt.AlignCenter)
# Shrink font if the default would wrap on this platform / DPI
content_width = (
@@ -124,13 +116,13 @@ class LaunchTile(RoundedFrame):
self.layout.addWidget(self.main_label)
self.spacer_top = QSpacerItem(0, 10, QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Fixed)
self.spacer_top = QSpacerItem(0, 10, QSizePolicy.Fixed, QSizePolicy.Fixed)
self.layout.addItem(self.spacer_top)
# Description
self.description_label = QLabel(description)
self.description_label.setWordWrap(True)
self.description_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.description_label.setAlignment(Qt.AlignCenter)
self.layout.addWidget(self.description_label)
# Selector
@@ -140,14 +132,13 @@ class LaunchTile(RoundedFrame):
else:
self.selector = None
self.spacer_bottom = QSpacerItem(
0, 0, QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Expanding
)
self.spacer_bottom = QSpacerItem(0, 0, QSizePolicy.Fixed, QSizePolicy.Expanding)
self.layout.addItem(self.spacer_bottom)
# Action button
self.action_button = QPushButton("Open")
self.action_button.setStyleSheet("""
self.action_button.setStyleSheet(
"""
QPushButton {
background-color: #007AFF;
border: none;
@@ -159,8 +150,9 @@ class LaunchTile(RoundedFrame):
QPushButton:hover {
background-color: #005BB5;
}
""")
self.layout.addWidget(self.action_button, alignment=Qt.AlignmentFlag.AlignCenter)
"""
)
self.layout.addWidget(self.action_button, alignment=Qt.AlignCenter)
def _fit_label_to_width(self, label: QLabel, max_width: int, min_pt: int = 10):
"""
@@ -183,31 +175,21 @@ class LaunchTile(RoundedFrame):
metrics = QFontMetrics(font)
label.setFont(font)
label.setWordWrap(False)
label.setText(metrics.elidedText(label.text(), Qt.TextElideMode.ElideRight, max_width))
label.setText(metrics.elidedText(label.text(), Qt.ElideRight, max_width))
class LaunchWindow(BECMainWindow):
RPC = True
PLUGIN = False
TILE_SIZE = (250, 300)
DEFAULT_LAUNCH_SIZE = (800, 600)
USER_ACCESS = ["show_launcher", "hide_launcher"]
def __init__(
self,
parent=None,
gui_id: str = None,
window_title="BEC Launcher",
launch_gui_class: str = None,
launch_gui_id: str = None,
*args,
**kwargs,
self, parent=None, gui_id: str = None, window_title="BEC Launcher", *args, **kwargs
):
super().__init__(parent=parent, gui_id=gui_id, window_title=window_title, **kwargs)
self.app = QApplication.instance()
self.tiles: dict[str, LaunchTile] = {}
self._logged_unparented_connections: set[str] = set()
# Track the smallest mainlabel font size chosen so far
self._min_main_label_pt: int | None = None
@@ -216,7 +198,7 @@ class LaunchWindow(BECMainWindow):
self.toolbar = ModularToolBar(parent=self)
self.addToolBar(Qt.TopToolBarArea, self.toolbar)
self.spacer = QWidget(self)
self.spacer.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding)
self.spacer.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
self.toolbar.addWidget(self.spacer)
self.toolbar.addWidget(self.dark_mode_button)
@@ -229,13 +211,11 @@ class LaunchWindow(BECMainWindow):
name="dock_area",
icon_path=os.path.join(MODULE_PATH, "assets", "app_icons", "bec_widgets_icon.png"),
top_label="Get started",
main_label="BEC Advanced Dock Area",
description="Flexible application for managing modular widgets and user profiles.",
action_button=self._open_dock_area,
show_selector=True,
selector_items=list_profiles("bec"),
main_label="BEC Dock Area",
description="Highly flexible and customizable dock area application with modular widgets.",
action_button=lambda: self.launch("dock_area"),
show_selector=False,
)
self._refresh_dock_area_profiles(preserve_selection=False)
self.available_auto_updates: dict[str, type[AutoUpdates]] = (
self._update_available_auto_updates()
@@ -285,11 +265,6 @@ class LaunchWindow(BECMainWindow):
self.register.callbacks.append(self._turn_off_the_lights)
self.register.broadcast()
if launch_gui_class and launch_gui_id:
# If a specific gui class is provided, launch it and hide the launcher
self.launch(launch_gui_class, name=launch_gui_id)
self.hide()
def register_tile(
self,
name: str,
@@ -325,7 +300,7 @@ class LaunchWindow(BECMainWindow):
)
tile.setFixedWidth(self.TILE_SIZE[0])
tile.setMinimumHeight(self.TILE_SIZE[1])
tile.setSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.MinimumExpanding)
tile.setSizePolicy(QSizePolicy.Fixed, QSizePolicy.MinimumExpanding)
if action_button:
tile.action_button.clicked.connect(action_button)
if show_selector and selector_items:
@@ -351,73 +326,6 @@ class LaunchWindow(BECMainWindow):
self.tiles[name] = tile
def _refresh_dock_area_profiles(self, preserve_selection: bool = True) -> None:
"""
Refresh the dock-area profile selector, optionally preserving the selection.
Defaults to Start Empty when no valid selection can be preserved.
Args:
preserve_selection(bool): Whether to preserve the current selection or not.
"""
tile = self.tiles.get("dock_area")
if tile is None or tile.selector is None:
return
selector = tile.selector
selected_text = (
selector.currentText().strip() if preserve_selection and selector.count() > 0 else ""
)
profiles = list_profiles("bec")
selector_items = [START_EMPTY_PROFILE_OPTION, *profiles]
selector.blockSignals(True)
selector.clear()
for profile in selector_items:
selector.addItem(profile)
if selected_text:
# Try to preserve the current selection
idx = selector.findText(selected_text, Qt.MatchFlag.MatchExactly)
if idx >= 0:
selector.setCurrentIndex(idx)
else:
# Selection no longer exists, fall back to default startup selection.
self._set_selector_to_default_profile(selector, profiles)
else:
# No selection to preserve, use default startup selection.
self._set_selector_to_default_profile(selector, profiles)
selector.blockSignals(False)
def _set_selector_to_default_profile(self, selector: QComboBox, profiles: list[str]) -> None:
"""
Set the selector default.
Preference order:
1) Start Empty option (if available)
2) Last used profile
3) First available profile
Args:
selector(QComboBox): The combobox to set.
profiles(list[str]): List of available profiles.
"""
start_empty_idx = selector.findText(START_EMPTY_PROFILE_OPTION, Qt.MatchFlag.MatchExactly)
if start_empty_idx >= 0:
selector.setCurrentIndex(start_empty_idx)
return
# Try to get last used profile
last_profile = get_last_profile(namespace="bec")
if last_profile and last_profile in profiles:
idx = selector.findText(last_profile, Qt.MatchFlag.MatchExactly)
if idx >= 0:
selector.setCurrentIndex(idx)
return
# If nothing else, select first item
if selector.count() > 0:
selector.setCurrentIndex(0)
def launch(
self,
launch_script: str,
@@ -439,14 +347,14 @@ class LaunchWindow(BECMainWindow):
from bec_widgets.applications import bw_launch
with RPCRegister.delayed_broadcast() as rpc_register:
if geometry is None and launch_script != "custom_ui_file":
geometry = self._default_launch_geometry()
existing_dock_areas = rpc_register.get_names_of_rpc_by_class_type(BECDockArea)
if name is not None:
WidgetContainerUtils.raise_for_invalid_name(name)
# If name already exists, generate a unique one with counter suffix
if name in existing_dock_areas:
name = WidgetContainerUtils.generate_unique_name(name, existing_dock_areas)
raise ValueError(
f"Name {name} must be unique for dock areas, but already exists: {existing_dock_areas}."
)
WidgetContainerUtils.raise_for_invalid_name(name)
else:
name = "dock_area"
name = WidgetContainerUtils.generate_unique_name(name, existing_dock_areas)
@@ -464,31 +372,32 @@ class LaunchWindow(BECMainWindow):
if launch_script == "auto_update":
auto_update = kwargs.pop("auto_update", None)
return self._launch_auto_update(auto_update, geometry=geometry)
return self._launch_auto_update(auto_update)
if launch_script == "widget":
widget = kwargs.pop("widget", None)
if widget is None:
raise ValueError("Widget name must be provided.")
return self._launch_widget(widget, geometry=geometry)
return self._launch_widget(widget)
launch = getattr(bw_launch, launch_script, None)
if launch is None:
raise ValueError(f"Launch script {launch_script} not found.")
result_widget = launch(name, **kwargs)
result_widget = launch(name)
result_widget.resize(result_widget.minimumSizeHint())
# TODO Should we simply use the specified name as title here?
result_widget.window().setWindowTitle(f"BEC - {name}")
logger.info(f"Created new dock area: {name}")
if geometry is not None:
result_widget.setGeometry(*geometry)
if isinstance(result_widget, BECMainWindow):
apply_window_geometry(result_widget, geometry)
result_widget.show()
else:
window = BECMainWindowNoRPC()
window.setCentralWidget(result_widget)
window.setWindowTitle(f"BEC - {result_widget.objectName()}")
apply_window_geometry(window, geometry)
window.show()
return result_widget
@@ -523,15 +432,13 @@ class LaunchWindow(BECMainWindow):
window = BECMainWindow(object_name=filename)
window.setCentralWidget(loaded)
QApplication.processEvents()
window.setWindowTitle(f"BEC - {filename}")
apply_window_geometry(window, None)
window.show()
logger.info(f"Launched custom UI: {filename}, type: {type(window).__name__}")
return window
def _launch_auto_update(
self, auto_update: str, geometry: tuple[int, int, int, int] | None = None
) -> AutoUpdates:
def _launch_auto_update(self, auto_update: str) -> AutoUpdates:
if auto_update in self.available_auto_updates:
auto_update_cls = self.available_auto_updates[auto_update]
window = auto_update_cls()
@@ -541,14 +448,12 @@ class LaunchWindow(BECMainWindow):
window = AutoUpdates()
window.resize(window.minimumSizeHint())
QApplication.processEvents()
window.setWindowTitle(f"BEC - {window.objectName()}")
apply_window_geometry(window, geometry)
window.show()
return window
def _launch_widget(
self, widget: type[BECWidget], geometry: tuple[int, int, int, int] | None = None
) -> QWidget:
def _launch_widget(self, widget: type[BECWidget]) -> QWidget:
name = pascal_to_snake(widget.__name__)
WidgetContainerUtils.raise_for_invalid_name(name)
@@ -557,11 +462,11 @@ class LaunchWindow(BECMainWindow):
widget_instance = widget(root_widget=True, object_name=name)
assert isinstance(widget_instance, QWidget)
QApplication.processEvents()
window.setCentralWidget(widget_instance)
window.resize(window.minimumSizeHint())
window.setWindowTitle(f"BEC - {widget_instance.objectName()}")
apply_window_geometry(window, geometry)
window.show()
return window
@@ -586,21 +491,6 @@ class LaunchWindow(BECMainWindow):
auto_update = None
return self.launch("auto_update", auto_update=auto_update)
def _open_dock_area(self):
"""
Open Advanced Dock Area using the selected profile.
"""
tile = self.tiles.get("dock_area")
if tile is None or tile.selector is None:
startup_profile = None
else:
selection = tile.selector.currentText().strip()
if selection == START_EMPTY_PROFILE_OPTION:
startup_profile = None
else:
startup_profile = selection if selection else None
return self.launch("dock_area", startup_profile=startup_profile)
def _open_widget(self):
"""
Open a widget from the available widgets.
@@ -612,10 +502,6 @@ class LaunchWindow(BECMainWindow):
raise ValueError(f"Widget {widget} not found in available widgets.")
return self.launch("widget", widget=self.available_widgets[widget])
def _default_launch_geometry(self) -> tuple[int, int, int, int] | None:
width, height = self.DEFAULT_LAUNCH_SIZE
return centered_geometry_for_app(width=width, height=height)
@SafeSlot(popup_error=True)
def _open_custom_ui_file(self):
"""
@@ -652,96 +538,53 @@ class LaunchWindow(BECMainWindow):
self.hide()
def showEvent(self, event):
self._refresh_dock_area_profiles()
super().showEvent(event)
self.setFixedSize(self.size())
def _has_external_window(self, connections: dict) -> bool:
def _launcher_is_last_widget(self, connections: dict) -> bool:
"""
Check if any registered non-launcher connection owns a top-level Qt window.
Check if the launcher is the last widget in the application.
"""
for connection in connections.values():
if self._connection_belongs_to_launcher(connection):
continue
if isinstance(connection, QWidget) and connection.isWindow():
return True
return False
def _log_unparented_connections(self, connections: dict) -> None:
"""
Log non-launcher RPC connections that remain without an active top-level window.
"""
for connection in connections.values():
if self._connection_belongs_to_launcher(connection):
continue
if isinstance(connection, QWidget) and connection.isWindow():
continue
connection_description = (
f"type={type(connection).__name__} objectName={connection.objectName()!r} "
f"gui_id={connection.gui_id!r}"
)
if connection_description in self._logged_unparented_connections:
continue
self._logged_unparented_connections.add(connection_description)
logger.warning(
"Registered non-launcher RPC connection has no active top-level window: "
f"{connection_description}"
)
def _connection_belongs_to_launcher(self, connection: QObject) -> bool:
"""
Check whether a registered connection is the launcher itself or part of its Qt hierarchy.
"""
if connection is self or connection.gui_id == self.gui_id:
return True
parent = connection.parent()
while parent is not None:
if parent is self:
return True
parent = parent.parent()
return False
remaining_connections = [
connection for connection in connections.values() if connection.parent_id != self.gui_id
]
return len(remaining_connections) <= 4
def _turn_off_the_lights(self, connections: dict):
"""
If there is only one connection remaining, it is the launcher, so we show it.
Once the launcher is closed as the last window, we quit the application.
"""
if self._has_external_window(connections):
self.hide()
if self._launcher_is_last_widget(connections):
self.show()
self.activateWindow()
self.raise_()
if self.app:
self.app.setQuitOnLastWindowClosed(False) # type: ignore
self.app.setQuitOnLastWindowClosed(True) # type: ignore
return
self._log_unparented_connections(connections)
self.show()
self.activateWindow()
self.raise_()
self.hide()
if self.app:
self.app.setQuitOnLastWindowClosed(True) # type: ignore
self.app.setQuitOnLastWindowClosed(False) # type: ignore
def closeEvent(self, event):
"""
Close the launcher window.
"""
connections = self.register.list_all_connections()
if self._has_external_window(connections):
event.ignore()
self.hide()
if self._launcher_is_last_widget(connections):
event.accept()
return
event.accept()
event.ignore()
self.hide()
if __name__ == "__main__": # pragma: no cover
if __name__ == "__main__":
import sys
from bec_widgets.utils.colors import apply_theme
app = QApplication(sys.argv)
apply_theme("dark")
launcher = LaunchWindow()
launcher.show()
sys.exit(app.exec())
+39 -226
View File
@@ -1,29 +1,17 @@
from bec_qthemes import material_icon
from qtpy.QtGui import QAction # type: ignore
from qtpy.QtWidgets import QApplication, QHBoxLayout, QStackedWidget, QWidget
from bec_widgets.applications.navigation_centre.reveal_animator import ANIMATION_DURATION
from bec_widgets.applications.navigation_centre.side_bar import SideBar
from bec_widgets.applications.navigation_centre.side_bar_components import NavigationItem
from bec_widgets.applications.views.admin_view.admin_view import AdminView
from bec_widgets.applications.views.developer_view.developer_view import DeveloperView
from bec_widgets.applications.views.device_manager_view.device_manager_view import DeviceManagerView
from bec_widgets.applications.views.dock_area_view.dock_area_view import DockAreaView
from bec_widgets.applications.views.view import ViewBase, WaveformViewInline, WaveformViewPopup
from bec_widgets.utils.colors import apply_theme
from bec_widgets.utils.guided_tour import GuidedTour
from bec_widgets.utils.name_utils import sanitize_namespace
from bec_widgets.utils.screen_utils import (
apply_centered_size,
available_screen_geometry,
main_app_size_for_screen,
)
from bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area import AdvancedDockArea
from bec_widgets.widgets.containers.main_window.main_window import BECMainWindow
class BECMainApp(BECMainWindow):
RPC = False
PLUGIN = False
def __init__(
self,
@@ -55,58 +43,53 @@ class BECMainApp(BECMainWindow):
self._add_views()
# Initialize guided tour
self.guided_tour = GuidedTour(self)
self._setup_guided_tour()
def _add_views(self):
self.add_section("BEC Applications", "bec_apps")
self.dock_area = DockAreaView(self)
self.ads = AdvancedDockArea(
self, profile_namespace="main_workspace", auto_profile_namespace=False
)
self.ads.setObjectName("MainWorkspace")
self.device_manager = DeviceManagerView(self)
# self.developer_view = DeveloperView(self) #TODO temporary disable until the bugs with BECShell are resolved
self.admin_view = AdminView(self)
self.developer_view = DeveloperView(self)
self.add_view(icon="widgets", title="Dock Area", widget=self.dock_area, mini_text="Docks")
self.add_view(
icon="widgets", title="Dock Area", id="dock_area", widget=self.ads, mini_text="Docks"
)
self.add_view(
icon="display_settings",
title="Device Manager",
id="device_manager",
widget=self.device_manager,
mini_text="DM",
)
# TODO temporary disable until the bugs with BECShell are resolved
# self.add_view(
# icon="code_blocks",
# title="IDE",
# widget=self.developer_view,
# mini_text="IDE",
# exclusive=True,
# )
self.add_view(
icon="admin_panel_settings",
title="Admin View",
widget=self.admin_view,
mini_text="Admin",
from_top=False,
icon="code_blocks",
title="IDE",
widget=self.developer_view,
id="developer_view",
exclusive=True,
)
if self._show_examples:
self.add_section("Examples", "examples")
waveform_view_popup = WaveformViewPopup(
parent=self, view_id="waveform_view_popup", title="Waveform Plot"
parent=self, id="waveform_view_popup", title="Waveform Plot"
)
waveform_view_stack = WaveformViewInline(
parent=self, view_id="waveform_view_stack", title="Waveform Plot"
parent=self, id="waveform_view_stack", title="Waveform Plot"
)
self.add_view(
icon="show_chart",
title="Waveform With Popup",
id="waveform_popup",
widget=waveform_view_popup,
mini_text="Popup",
)
self.add_view(
icon="show_chart",
title="Waveform InLine Stack",
id="waveform_stack",
widget=waveform_view_stack,
mini_text="Stack",
)
@@ -114,9 +97,6 @@ class BECMainApp(BECMainWindow):
self.set_current("dock_area")
self.sidebar.add_dark_mode_item()
# Add guided tour to Help menu
self._add_guided_tour_to_menu()
# --- Public API ------------------------------------------------------
def add_section(self, title: str, id: str, position: int | None = None):
return self.sidebar.add_section(title, id, position)
@@ -132,7 +112,7 @@ class BECMainApp(BECMainWindow):
*,
icon: str,
title: str,
view_id: str | None = None,
id: str,
widget: QWidget,
mini_text: str | None = None,
position: int | None = None,
@@ -146,8 +126,7 @@ class BECMainApp(BECMainWindow):
Args:
icon(str): Icon name for the nav item.
title(str): Title for the nav item.
view_id(str, optional): Unique ID for the view/item. If omitted, uses mini_text;
if mini_text is also omitted, uses title.
id(str): Unique ID for the view/item.
widget(QWidget): The widget to add to the stack.
mini_text(str, optional): Short text for the nav item when sidebar is collapsed.
position(int, optional): Position to insert the nav item.
@@ -160,11 +139,10 @@ class BECMainApp(BECMainWindow):
"""
resolved_id = sanitize_namespace(view_id or mini_text or title)
item = self.sidebar.add_item(
icon=icon,
title=title,
id=resolved_id,
id=id,
mini_text=mini_text,
position=position,
from_top=from_top,
@@ -174,15 +152,13 @@ class BECMainApp(BECMainWindow):
# Wrap plain widgets into a ViewBase so enter/exit hooks are available
if isinstance(widget, ViewBase):
view_widget = widget
view_widget.view_id = resolved_id
view_widget.view_id = id
view_widget.view_title = title
else:
view_widget = ViewBase(content=widget, parent=self, view_id=resolved_id, title=title)
view_widget.change_object_name(resolved_id)
view_widget = ViewBase(content=widget, parent=self, id=id, title=title)
idx = self.stack.addWidget(view_widget)
self._view_index[resolved_id] = idx
self._view_index[id] = idx
return item
def set_current(self, id: str) -> None:
@@ -191,12 +167,6 @@ class BECMainApp(BECMainWindow):
# Internal: route sidebar selection to the stack
def _on_view_selected(self, vid: str) -> None:
# Special handling for views that can not be switched to (e.g. dark mode toggle)
# Not registered as proper view with a stack index, so we ignore any logic below
# as it will anyways not result in a stack switch.
idx = self._view_index.get(vid)
if idx is None or not (0 <= idx < self.stack.count()):
return
# Determine current view
current_index = self.stack.currentIndex()
current_view = (
@@ -222,167 +192,8 @@ class BECMainApp(BECMainWindow):
if hasattr(new_view, "on_enter"):
new_view.on_enter()
def _setup_guided_tour(self):
"""
Setup the guided tour for the main application.
Registers key UI components and delegates to views for their internal components.
"""
tour_steps = []
# --- General Layout Components ---
# Register the sidebar toggle button
toggle_step = self.guided_tour.register_widget(
widget=self.sidebar.toggle,
title="Sidebar Toggle",
text="Click this button to expand or collapse the sidebar. When expanded, you can see full navigation item titles and section names.",
)
tour_steps.append(toggle_step)
# Register the sidebar icons
sidebar_dock_area = self.sidebar.components.get("dock_area")
if sidebar_dock_area:
dock_step = self.guided_tour.register_widget(
widget=sidebar_dock_area,
title="Dock Area View",
text="Click here to access the Dock Area view, where you can manage and arrange your dockable panels.",
)
tour_steps.append(dock_step)
sidebar_device_manager = self.sidebar.components.get("device_manager")
if sidebar_device_manager:
device_manager_step = self.guided_tour.register_widget(
widget=sidebar_device_manager,
title="Device Manager View",
text="Click here to open the Device Manager view, where you can view and manage device configs.",
)
tour_steps.append(device_manager_step)
sidebar_developer_view = self.sidebar.components.get("developer_view")
if sidebar_developer_view:
developer_view_step = self.guided_tour.register_widget(
widget=sidebar_developer_view,
title="Developer View",
text="Click here to access the Developer view to write scripts and macros.",
)
tour_steps.append(developer_view_step)
# Register the dark mode toggle
dark_mode_item = self.sidebar.components.get("dark_mode")
if dark_mode_item:
dark_mode_step = self.guided_tour.register_widget(
widget=dark_mode_item,
title="Theme Toggle",
text="Switch between light and dark themes. The theme preference is saved and will be applied when you restart the application.",
)
tour_steps.append(dark_mode_step)
# Register the client info label
if hasattr(self, "_client_info_hover"):
client_info_step = self.guided_tour.register_widget(
widget=self._client_info_hover,
title="Client Status",
text="Displays status messages and information from the BEC Server.",
)
tour_steps.append(client_info_step)
# Register the scan progress bar if available
if hasattr(self, "_scan_progress_hover"):
progress_step = self.guided_tour.register_widget(
widget=self._scan_progress_hover,
title="Scan Progress",
text="Monitor the progress of ongoing scans. Hover over the progress bar to see detailed information including elapsed time and estimated completion.",
)
tour_steps.append(progress_step)
# Register the notification indicator in the status bar
if hasattr(self, "notification_indicator"):
notif_step = self.guided_tour.register_widget(
widget=self.notification_indicator,
title="Notification Center",
text="View system notifications, errors, and status updates. Click to filter notifications by type or expand to see all details.",
)
tour_steps.append(notif_step)
# --- View-Specific Components ---
# Register all views that can extend the tour
for view_id, view_index in self._view_index.items():
view_widget = self.stack.widget(view_index)
if not view_widget or not hasattr(view_widget, "register_tour_steps"):
continue
# Get the view's tour steps
view_tour = view_widget.register_tour_steps(self.guided_tour, self)
if view_tour is None:
if hasattr(view_widget.content, "register_tour_steps"):
view_tour = view_widget.content.register_tour_steps(self.guided_tour, self)
if view_tour is None:
continue
# Get the corresponding sidebar navigation item
nav_item = self.sidebar.components.get(view_id)
if not nav_item:
continue
# Use the view's title for the navigation button
nav_step = self.guided_tour.register_widget(
widget=nav_item,
title=view_tour.view_title,
text=f"Let's explore the features of the {view_tour.view_title}.",
)
tour_steps.append(nav_step)
tour_steps.extend(view_tour.step_ids)
# Create the tour with all registered steps
if tour_steps:
self.guided_tour.create_tour(tour_steps)
def start_guided_tour(self):
"""
Public method to start the guided tour.
This can be called programmatically or connected to a menu/button action.
"""
self.guided_tour.start_tour()
def _add_guided_tour_to_menu(self):
"""
Add a 'Guided Tour' action to the Help menu.
"""
# Find the Help menu
menu_bar = self.menuBar()
help_menu = None
for action in menu_bar.actions():
if action.text() == "Help":
help_menu = action.menu()
break
if help_menu:
# Add separator before the tour action
help_menu.addSeparator()
# Create and add the guided tour action
tour_action = QAction("Start Guided Tour", self)
tour_action.setIcon(material_icon("help"))
tour_action.triggered.connect(self.start_guided_tour)
tour_action.setShortcut("F1") # Add keyboard shortcut
help_menu.addAction(tour_action)
def cleanup(self):
for view_id, idx in self._view_index.items():
view = self.stack.widget(idx)
view.close()
view.deleteLater()
super().cleanup()
def main(): # pragma: no cover
"""
Main function to run the BEC main application, exposed as a script entry point through
pyproject.toml.
"""
# pylint: disable=import-outside-toplevel
if __name__ == "__main__": # pragma: no cover
import argparse
import sys
@@ -394,21 +205,23 @@ def main(): # pragma: no cover
args, qt_args = parser.parse_known_args(sys.argv[1:])
app = QApplication([sys.argv[0], *qt_args])
app.setApplicationName("BEC")
apply_theme("dark")
w = BECMainApp(show_examples=args.examples)
screen_geometry = available_screen_geometry()
if screen_geometry is not None:
width, height = main_app_size_for_screen(screen_geometry)
apply_centered_size(w, width, height, available=screen_geometry)
else:
w.resize(w.minimumSizeHint())
screen = app.primaryScreen()
screen_geometry = screen.availableGeometry()
screen_width = screen_geometry.width()
screen_height = screen_geometry.height()
# 70% of screen height, keep 16:9 ratio
height = int(screen_height * 0.9)
width = int(height * (16 / 9))
# If width exceeds screen width, scale down
if width > screen_width * 0.9:
width = int(screen_width * 0.9)
height = int(width / (16 / 9))
w.resize(width, height)
w.show()
sys.exit(app.exec())
if __name__ == "__main__": # pragma: no cover
main()
@@ -127,10 +127,12 @@ class NavigationItem(QWidget):
self._icon_size_expanded = QtCore.QSize(26, 26)
self.icon_btn.setIconSize(self._icon_size_collapsed)
# Remove QToolButton hover/pressed background/outline
self.icon_btn.setStyleSheet("""
self.icon_btn.setStyleSheet(
"""
QToolButton:hover { background: transparent; border: none; }
QToolButton:pressed { background: transparent; border: none; }
""")
"""
)
# Mini label below icon
self.mini_lbl = QLabel(self._mini_text, self)
@@ -1,35 +0,0 @@
"""Module for Admin View."""
from qtpy.QtWidgets import QWidget
from bec_widgets.applications.views.view import ViewBase
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.widgets.services.bec_atlas_admin_view.bec_atlas_admin_view import BECAtlasAdminView
class AdminView(ViewBase):
"""
A view for administrators to change the current active experiment, manage messaging
services, and more tasks reserved for users with admin privileges.
"""
def __init__(
self,
parent: QWidget | None = None,
content: QWidget | None = None,
*,
view_id: str | None = None,
title: str | None = None,
**kwargs,
):
super().__init__(parent=parent, content=content, view_id=view_id, title=title)
self.admin_widget = BECAtlasAdminView(parent=self)
self.set_content(self.admin_widget)
@SafeSlot()
def on_exit(self) -> None:
"""Called before the view is hidden.
Default implementation does nothing. Override in subclasses.
"""
self.admin_widget.logout()
@@ -1,7 +1,7 @@
from qtpy.QtWidgets import QWidget
from bec_widgets.applications.views.developer_view.developer_widget import DeveloperWidget
from bec_widgets.applications.views.view import ViewBase, ViewTourSteps
from bec_widgets.applications.views.view import ViewBase
class DeveloperView(ViewBase):
@@ -14,89 +14,13 @@ class DeveloperView(ViewBase):
parent: QWidget | None = None,
content: QWidget | None = None,
*,
view_id: str | None = None,
id: str | None = None,
title: str | None = None,
**kwargs,
):
super().__init__(parent=parent, content=content, view_id=view_id, title=title, **kwargs)
super().__init__(parent=parent, content=content, id=id, title=title)
self.developer_widget = DeveloperWidget(parent=self)
self.set_content(self.developer_widget)
def register_tour_steps(self, guided_tour, main_app) -> ViewTourSteps | None:
"""Register Developer View components with the guided tour.
Args:
guided_tour: The GuidedTour instance to register with.
main_app: The main application instance (for accessing set_current).
Returns:
ViewTourSteps | None: Model containing view title and step IDs.
"""
step_ids = []
dev_widget = self.developer_widget
# IDE Toolbar
def get_ide_toolbar():
main_app.set_current("developer_view")
return (dev_widget.toolbar, None)
step_id = guided_tour.register_widget(
widget=get_ide_toolbar,
title="IDE Toolbar",
text="Quick access to save files, execute scripts, and configure IDE settings. Use the toolbar to manage your code and execution.",
)
step_ids.append(step_id)
# IDE Explorer
def get_ide_explorer():
main_app.set_current("developer_view")
return (dev_widget.explorer_dock.widget(), None)
step_id = guided_tour.register_widget(
widget=get_ide_explorer,
title="File Explorer",
text="Browse and manage your macro files. Create new files, open existing ones, and organize your scripts.",
)
step_ids.append(step_id)
# IDE Editor
def get_ide_editor():
main_app.set_current("developer_view")
return (dev_widget.monaco_dock.widget(), None)
step_id = guided_tour.register_widget(
widget=get_ide_editor,
title="Code Editor",
text="Write and edit Python code with syntax highlighting, auto-completion, and signature help. Monaco editor provides a modern coding experience.",
)
step_ids.append(step_id)
# IDE Console
def get_ide_console():
main_app.set_current("developer_view")
return (dev_widget.console_dock.widget(), None)
step_id = guided_tour.register_widget(
widget=get_ide_console,
title="BEC Shell Console",
text="Interactive Python console with BEC integration. Execute commands, test code snippets, and interact with the BEC system in real-time.",
)
step_ids.append(step_id)
# IDE Plotting Area
def get_ide_plotting():
main_app.set_current("developer_view")
return (dev_widget.plotting_ads, None)
step_id = guided_tour.register_widget(
widget=get_ide_plotting,
title="Plotting Area",
text="View plots and visualizations generated by your scripts. Arrange multiple plots in a flexible layout.",
)
step_ids.append(step_id)
return ViewTourSteps(view_title="Developer View", step_ids=step_ids)
if __name__ == "__main__":
import sys
@@ -126,11 +50,7 @@ if __name__ == "__main__":
_app.resize(width, height)
developer_view = DeveloperView()
_app.add_view(
icon="code_blocks",
title="IDE",
widget=developer_view,
view_id="developer_view",
exclusive=True,
icon="code_blocks", title="IDE", widget=developer_view, id="developer_view", exclusive=True
)
_app.show()
# developer_view.show()
@@ -1,5 +1,3 @@
from __future__ import annotations
import re
import markdown
@@ -13,12 +11,11 @@ from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.toolbars.actions import MaterialIconAction
from bec_widgets.utils.toolbars.bundles import ToolbarBundle
from bec_widgets.utils.toolbars.toolbar import ModularToolBar
from bec_widgets.widgets.containers.dock_area.basic_dock_area import DockAreaWidget
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea
from bec_widgets.widgets.containers.qt_ads import CDockWidget
from bec_widgets.widgets.editors.bec_console.bec_console import BecConsole, BECShell
from bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area import AdvancedDockArea
from bec_widgets.widgets.containers.advanced_dock_area.basic_dock_area import DockAreaWidget
from bec_widgets.widgets.editors.monaco.monaco_dock import MonacoDock
from bec_widgets.widgets.editors.monaco.monaco_widget import MonacoWidget
from bec_widgets.widgets.editors.web_console.web_console import WebConsole
from bec_widgets.widgets.utility.ide_explorer.ide_explorer import IDEExplorer
@@ -79,8 +76,6 @@ def markdown_to_html(md_text: str) -> str:
class DeveloperWidget(DockAreaWidget):
RPC = False
PLUGIN = False
def __init__(self, parent=None, **kwargs):
super().__init__(parent=parent, variant="compact", **kwargs)
@@ -93,15 +88,14 @@ class DeveloperWidget(DockAreaWidget):
# Initialize the widgets
self.explorer = IDEExplorer(self)
self.explorer.setObjectName("Explorer")
self.console = BECShell(self, rpc_exposed=False)
self.console.setObjectName("BEC Shell")
self.terminal = BecConsole(self, rpc_exposed=False)
self.console = WebConsole(self)
self.console.setObjectName("Console")
self.terminal = WebConsole(self, startup_cmd="")
self.terminal.setObjectName("Terminal")
self.monaco = MonacoDock(self, rpc_exposed=False, rpc_passthrough_children=False)
self.monaco = MonacoDock(self)
self.monaco.setObjectName("MonacoEditor")
self.monaco.save_enabled.connect(self._on_save_enabled_update)
self.plotting_ads = BECDockArea(
self.plotting_ads = AdvancedDockArea(
self,
mode="plot",
default_add_direction="bottom",
@@ -130,7 +124,6 @@ class DeveloperWidget(DockAreaWidget):
# Connect editor signals
self.explorer.file_open_requested.connect(self._open_new_file)
self.monaco.macro_file_updated.connect(self.explorer.refresh_macro_file)
self.monaco.focused_editor.connect(self._on_focused_editor_changed)
self.toolbar.show_bundles(["save", "execution", "settings"])
@@ -287,17 +280,14 @@ class DeveloperWidget(DockAreaWidget):
@SafeSlot()
def on_save(self):
"""Save the currently focused file in the Monaco editor."""
self.monaco.save_file()
@SafeSlot()
def on_save_as(self):
"""Save the currently focused file in the Monaco editor with a 'Save As' dialog."""
self.monaco.save_file(force_save_as=True)
@SafeSlot()
def on_vim_triggered(self):
"""Toggle Vim mode in the Monaco editor."""
self.monaco.set_vim_mode(self.toolbar.components.get_action("vim").action.isChecked())
@SafeSlot(bool)
@@ -314,38 +304,22 @@ class DeveloperWidget(DockAreaWidget):
widget = self.script_editor_tab.widget()
if not isinstance(widget, MonacoWidget):
return
if widget.modified:
# Save the file before execution if there are unsaved changes
self.monaco.save_file()
if widget.modified:
# If still modified, user likely cancelled save dialog
return
self.current_script_id = upload_script(self.client.connector, widget.get_text())
self.console.write(f'bec._run_script("{self.current_script_id}")')
print(f"Uploaded script with ID: {self.current_script_id}")
@SafeSlot()
def on_stop(self):
"""Stop the execution of the currently running script"""
if not self.current_script_id:
return
self.console.send_ctrl_c()
@property
def current_script_id(self):
"""Get the ID of the currently running script."""
return self._current_script_id
@current_script_id.setter
def current_script_id(self, value: str | None):
"""
Set the ID of the currently running script.
Args:
value (str | None): The script ID to set.
Raises:
ValueError: If the provided value is not a string or None.
"""
if value is not None and not isinstance(value, str):
raise ValueError("Script ID must be a string.")
old_script_id = self._current_script_id
@@ -362,28 +336,6 @@ class DeveloperWidget(DockAreaWidget):
self.on_script_execution_info, MessageEndpoints.script_execution_info(new_script_id)
)
@SafeSlot(CDockWidget)
def _on_focused_editor_changed(self, tab_widget: CDockWidget):
"""
Disable the run / stop buttons if the focused editor is a macro file.
Args:
tab_widget: The currently focused tab widget in the Monaco editor.
"""
if not isinstance(tab_widget, CDockWidget):
return
widget = tab_widget.widget()
if not isinstance(widget, MonacoWidget):
return
file_scope = widget.metadata.get("scope", "")
run_action = self.toolbar.components.get_action("run")
stop_action = self.toolbar.components.get_action("stop")
if "macro" in file_scope:
run_action.action.setEnabled(False)
stop_action.action.setEnabled(False)
else:
run_action.action.setEnabled(True)
stop_action.action.setEnabled(True)
@SafeSlot(dict, dict)
def on_script_execution_info(self, content: dict, metadata: dict):
"""
@@ -407,6 +359,25 @@ class DeveloperWidget(DockAreaWidget):
widget.set_highlighted_lines(line_number, line_number)
def cleanup(self):
"""Clean up resources used by the developer widget."""
self.delete_all()
return super().cleanup()
if __name__ == "__main__":
import sys
from bec_qthemes import apply_theme
from qtpy.QtWidgets import QApplication
from bec_widgets.applications.main_app import BECMainApp
app = QApplication(sys.argv)
apply_theme("dark")
_app = BECMainApp()
_app.show()
# developer_view.show()
# developer_view.setWindowTitle("Developer View")
# developer_view.resize(1920, 1080)
# developer_view.set_stretch(horizontal=[1, 3, 2], vertical=[5, 5]) #can be set during runtime
sys.exit(app.exec_())
@@ -1,7 +1,5 @@
"""Dialogs for device configuration forms and ophyd testing."""
from typing import Any, Iterable, Tuple
from bec_lib.atlas_models import Device as DeviceModel
from bec_lib.logger import bec_logger
from ophyd_devices.interfaces.device_config_templates.ophyd_templates import OPHYD_DEVICE_TEMPLATES
@@ -22,7 +20,6 @@ from bec_widgets.widgets.control.device_manager.components.ophyd_validation impo
)
DEFAULT_DEVICE = "CustomDevice"
_ValidationResultIter = Iterable[Tuple[dict[str, Any], ConfigStatus, ConnectionStatus, str]]
logger = bec_logger.logger
@@ -31,7 +28,7 @@ logger = bec_logger.logger
class DeviceManagerOphydValidationDialog(QtWidgets.QDialog):
"""Popup dialog to test Ophyd device configurations interactively."""
def __init__(self, parent=None, config: dict | None = None): # type: ignore
def __init__(self, parent=None, config: dict | None = None): # type:ignore
super().__init__(parent)
self.setWindowTitle("Device Manager Ophyd Test")
self._config_status = ConfigStatus.UNKNOWN.value
@@ -50,11 +47,11 @@ class DeviceManagerOphydValidationDialog(QtWidgets.QDialog):
self.text_box.setReadOnly(True)
layout.addWidget(self.text_box)
# Connect signal for validation messages
# Load and apply configuration
config = config or {}
device_name = config.get("name", None)
if device_name:
self.device_manager_ophyd_test.add_device_to_keep_visible_after_validation(device_name)
self.device_manager_ophyd_test.change_device_configs([config], True, True)
# Dialog Buttons: equal size, stacked horizontally
button_box = QtWidgets.QDialogButtonBox(QtWidgets.QDialogButtonBox.StandardButton.Close)
@@ -69,9 +66,6 @@ class DeviceManagerOphydValidationDialog(QtWidgets.QDialog):
self._resize_dialog()
self.finished.connect(self._finished)
# Add and test device config
self.device_manager_ophyd_test.change_device_configs([config], added=True, connect=True)
def _resize_dialog(self):
"""Resize the dialog based on the screen size."""
app: QtCore.QCoreApplication = QtWidgets.QApplication.instance()
@@ -133,7 +127,7 @@ class DeviceFormDialog(QtWidgets.QDialog):
# validated: config_status, connection_status
accepted_data = QtCore.Signal(dict, int, int, str, str)
def __init__(self, parent=None, add_btn_text: str = "Add Device"): # type: ignore
def __init__(self, parent=None, add_btn_text: str = "Add Device"): # type:ignore
super().__init__(parent)
# Track old device name if config is edited
self._old_device_name: str = ""
@@ -176,17 +170,12 @@ class DeviceFormDialog(QtWidgets.QDialog):
self.cancel_btn = QtWidgets.QPushButton("Cancel")
self.reset_btn = QtWidgets.QPushButton("Reset Form")
btn_box = QtWidgets.QDialogButtonBox(self)
btn_box.addButton(self.cancel_btn, QtWidgets.QDialogButtonBox.ButtonRole.RejectRole)
btn_box.addButton(self.reset_btn, QtWidgets.QDialogButtonBox.ButtonRole.ActionRole)
btn_box.addButton(
self.test_connection_btn, QtWidgets.QDialogButtonBox.ButtonRole.ActionRole
)
btn_box.addButton(self.add_btn, QtWidgets.QDialogButtonBox.ButtonRole.AcceptRole)
for btn in btn_box.buttons():
btn_layout = QtWidgets.QHBoxLayout()
for btn in (self.cancel_btn, self.reset_btn, self.test_connection_btn, self.add_btn):
btn.setSizePolicy(QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Preferred)
layout.addWidget(btn_box)
btn_layout.addWidget(btn)
btn_box = QtWidgets.QGroupBox("Actions")
btn_box.setLayout(btn_layout)
frame_layout.addWidget(btn_box)
# Connect signals to explicit slots
@@ -199,17 +188,11 @@ class DeviceFormDialog(QtWidgets.QDialog):
self.update_variant_combo(self._control_widgets["group_combo"].currentText())
self.finished.connect(self._finished)
# Wait dialog when adding config
self._wait_dialog: QtWidgets.QProgressDialog | None = None
@SafeSlot(int)
def _finished(self, state: int):
for widget in self._control_widgets.values():
widget.close()
widget.deleteLater()
if self._wait_dialog is not None:
self._wait_dialog.close()
self._wait_dialog.deleteLater()
@property
def config_validation_result(self) -> tuple[dict, int, int, str]:
@@ -285,132 +268,42 @@ class DeviceFormDialog(QtWidgets.QDialog):
OPHYD_DEVICE_TEMPLATES[DEFAULT_DEVICE][DEFAULT_DEVICE]
)
def _create_validation_dialog(self) -> QtWidgets.QProgressDialog:
"""
Create and show a validation progress dialog while validating the device configuration.
The dialog will be modal and prevent user interaction until validation is complete.
"""
wait_dialog = QtWidgets.QProgressDialog(
"Validating config... please wait", None, 0, 0, parent=self
)
wait_dialog.setWindowModality(QtCore.Qt.WindowModality.ApplicationModal)
wait_dialog.setCancelButton(None)
wait_dialog.setMinimumDuration(0)
return wait_dialog
def _create_and_run_ophyd_validation(self, config: dict[str, Any]) -> OphydValidation:
"""Run ophyd validation test on the current device configuration."""
ophyd_validation = OphydValidation(parent=self)
ophyd_validation.validation_completed.connect(self._handle_validation_result)
ophyd_validation.multiple_validations_completed.connect(
self._handle_devices_already_in_session_results
)
# NOTE Use singleShot here to ensure that the signal is emitted after all other scheduled
# tasks in the event loop are processed. This avoids potential deadlocks. In particular,
# this is relevant for the _wait_dialog exec which opens a modal dialog during validation
# and therefore must not have the signal emitted immediately in the same event loop iteration.
# Otherwise, the callback may be scheduled before the dialog is shown resulting in a deadlock.
QtCore.QTimer.singleShot(
0, lambda: ophyd_validation.change_device_configs([config], True, False)
)
return ophyd_validation
@SafeSlot(list)
def _handle_devices_already_in_session_results(
self, validation_results: _ValidationResultIter
) -> None:
"""Handle completion if device is already in session."""
if len(validation_results) != 1:
logger.error(
"Expected a single device validation result, but got multiple. Using first result."
)
result = validation_results[0] if len(validation_results) > 0 else None
if result is None:
logger.error(
f"Received validation results: {validation_results} of unexpected length 0. Returning."
)
return
device_config, config_status, connection_status, validation_msg = result
self._handle_validation_result(
device_config, config_status, connection_status, validation_msg
)
@SafeSlot(dict, int, int, str)
def _handle_validation_result(
self, device_config: dict, config_status: int, connection_status: int, validation_msg: str
):
"""Handle completion of validation."""
def _add_config(self):
config = self._device_config_template.get_config_fields()
config_status = ConfigStatus.UNKNOWN.value
connection_status = ConnectionStatus.UNKNOWN.value
validation_msg = ""
try:
if (
DeviceModel.model_validate(device_config)
== DeviceModel.model_validate(self._validation_result[0])
and connection_status == ConnectionStatus.UNKNOWN.value
if DeviceModel.model_validate(config) == DeviceModel.model_validate(
self._validation_result[0]
):
# Config unchanged, we can reuse previous connection status. Only do this if the new
# connection status is UNKNOWN as the current validation should not test the connection.
config_status = self._validation_result[1]
connection_status = self._validation_result[2]
validation_msg = self._validation_result[3]
except Exception:
logger.debug(
f"Device config validation changed for config: {device_config} compared to previous validation. Using status from recent validation."
f"Device config validation changed for config: {config} compared to {self._validation_result[0]}. Returning UNKNOWN statuses."
)
self._validation_result = (device_config, config_status, connection_status, validation_msg)
if self._wait_dialog is not None:
self._wait_dialog.accept()
self._wait_dialog.close()
self._wait_dialog.deleteLater()
self._wait_dialog = None
def _add_config(self):
"""
Adding a config will always run a validation check of the config without a connection test.
We will check if tests have already run, and reuse the information in case they also tested the connection to the device.
"""
config = self._device_config_template.get_config_fields()
# I. First we validate that the device name is valid, as this may create issues within the OphydValidation widget.
# Validate device name first. If invalid, this should immediately block adding the device.
if not validate_name(config.get("name", "")):
msg_box = self._create_warning_message_box(
"Invalid Device Name",
f"Device is invalid, cannot be empty or contain spaces. Please provide a valid name. {config.get('name', '')!r}",
f"Device is invalid, can not be empty with spaces. Please provide a valid name. {config.get('name', '')!r} ",
)
msg_box.exec()
return
if config_status == ConfigStatus.INVALID.value:
msg_box = self._create_warning_message_box(
"Invalid Device Configuration",
f"Device configuration is invalid. Last known validation message:\n\nErrors:\n{validation_msg}",
)
msg_box.exec()
return
# II. Next we will run the validation check of the config without connection test.
# We will show a wait dialog while this is happening, and compare the results with the last known validation results.
# If the config is unchanged, we will use the connection status results from the last validation.
self._wait_dialog = self._create_validation_dialog()
ophyd_validation: OphydValidation | None = None
try:
ophyd_validation = self._create_and_run_ophyd_validation(config)
# NOTE If dialog was already closed, this means that a validation callback was already received
# which closed the dialog. In this case, we skip exec to avoid deadlock. With the singleShot above,
# this should not happen, but we keep the check for safety.
if self._wait_dialog is not None:
self._wait_dialog.exec() # This will block until the validation is complete
config, config_status, connection_status, validation_msg = self._validation_result
if config_status == ConfigStatus.INVALID.value:
msg_box = self._create_warning_message_box(
"Invalid Device Configuration",
f"Device configuration is invalid. Last known validation message:\n\nErrors:\n{self._validation_result[3]}",
)
msg_box.exec()
return
self.accepted_data.emit(
config, config_status, connection_status, validation_msg, self._old_device_name
)
self.accept()
finally:
if ophyd_validation is not None:
ophyd_validation.close()
ophyd_validation.deleteLater()
self.accepted_data.emit(
config, config_status, connection_status, validation_msg, self._old_device_name
)
self.accept()
def _create_warning_message_box(self, title: str, text: str) -> QtWidgets.QMessageBox:
msg_box = QtWidgets.QMessageBox(self)
@@ -425,6 +318,7 @@ class DeviceFormDialog(QtWidgets.QDialog):
result = dialog.exec()
if result in (QtWidgets.QDialog.Accepted, QtWidgets.QDialog.Rejected):
self.config_validation_result = dialog.validation_result
# self._device_config_template.set_config_fields(self.config_validation_result[0])
def _reset_config(self):
self._device_config_template.reset_to_defaults()
@@ -4,7 +4,7 @@ from __future__ import annotations
from enum import IntEnum
from functools import partial
from typing import TYPE_CHECKING, Any, List, Tuple
from typing import TYPE_CHECKING, Dict, List, Tuple
from bec_lib.logger import bec_logger
from bec_qthemes import apply_theme, material_icon
@@ -12,17 +12,16 @@ from qtpy import QtCore, QtGui, QtWidgets
from bec_widgets.utils.colors import get_accent_colors
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.widgets.control.device_manager.components import OphydValidation
from bec_widgets.widgets.control.device_manager.components.ophyd_validation import (
ConfigStatus,
ConnectionStatus,
get_validation_icons,
)
from bec_widgets.widgets.progress.bec_progressbar.bec_progressbar import BECProgressBar
if TYPE_CHECKING:
from bec_widgets.utils.colors import AccentColor
from bec_widgets.widgets.control.device_manager.components.device_table.device_table import (
_ValidationResultIter,
)
logger = bec_logger.logger
@@ -235,18 +234,22 @@ class UploadRedisDialog(QtWidgets.QDialog):
class UploadAction(IntEnum):
"""Enum for upload actions."""
CANCEL = QtWidgets.QDialog.DialogCode.Rejected
OK = QtWidgets.QDialog.DialogCode.Accepted
CONNECTION_TEST_REQUESTED = 999
CANCEL = QtWidgets.QDialog.Rejected
OK = QtWidgets.QDialog.Accepted
# Request ophyd validation for all untested device connections
# list of device configs, added: bool, connect: bool
request_ophyd_validation = QtCore.Signal(list, bool, bool)
# Signal to trigger upload after confirmation
upload_confirmed = QtCore.Signal(int)
def __init__(self, parent, device_configs: dict[str, Tuple[dict, int, int]] | None = None):
def __init__(
self,
parent,
ophyd_test_widget: OphydValidation,
device_configs: dict[str, Tuple[dict, int, int]] | None = None,
):
super().__init__(parent=parent)
self.device_configs: dict[str, Tuple[dict, int, int]] = device_configs or {}
self.ophyd_test_widget = ophyd_test_widget
self._transparent_button_style = "background-color: transparent; border: none;"
self.colors = get_accent_colors()
@@ -264,9 +267,14 @@ class UploadRedisDialog(QtWidgets.QDialog):
self.has_invalid_configs: int = 0
self.has_untested_connections: int = 0
self.has_cannot_connect: int = 0
self._current_progress: int | None = None
self._setup_ui()
self._update_ui()
# Disable validation features if no ophyd test widget provided, else connect validation
self._validation_connection = self.ophyd_test_widget.validation_completed.connect(
self._update_from_ophyd_device_tests
)
def set_device_config(self, device_configs: dict[str, Tuple[dict, int, int]]):
"""
@@ -280,6 +288,18 @@ class UploadRedisDialog(QtWidgets.QDialog):
self.device_configs = device_configs
self._update_ui()
def accept(self):
self.cleanup()
return super().accept()
def reject(self):
self.cleanup()
return super().reject()
def cleanup(self):
"""Cleanup on dialog finish."""
self.ophyd_test_widget.validation_completed.disconnect(self._validation_connection)
def _setup_ui(self):
"""Setup the main UI for the dialog."""
self.setWindowTitle("Upload Configuration to BEC Server")
@@ -327,6 +347,11 @@ class UploadRedisDialog(QtWidgets.QDialog):
button_layout.addWidget(self.validate_connections_btn)
button_layout.addStretch()
button_layout.addSpacing(16)
# Progress bar
self._progress_bar = BECProgressBar(self)
self._progress_bar.setVisible(False)
button_layout.addWidget(self._progress_bar)
action_layout.addLayout(button_layout)
# Status indicator
@@ -473,7 +498,7 @@ class UploadRedisDialog(QtWidgets.QDialog):
@SafeSlot()
def _validate_connections(self):
"""Request validation of all untested connections. This will close the dialog."""
"""Request validation of all untested connections."""
testable_devices: List[dict] = []
for _, (config, _, connection_status) in self.device_configs.items():
if connection_status == ConnectionStatus.UNKNOWN.value:
@@ -482,8 +507,13 @@ class UploadRedisDialog(QtWidgets.QDialog):
testable_devices.append(config)
if len(testable_devices) > 0:
self.request_ophyd_validation.emit(testable_devices, True, True)
self.done(self.UploadAction.CONNECTION_TEST_REQUESTED)
self.validate_connections_btn.setEnabled(False)
self._progress_bar.setVisible(True)
self._progress_bar.maximum = len(testable_devices)
self._progress_bar.minimum = 0
self._progress_bar.set_value(0)
self._current_progress = 0
self.ophyd_test_widget.change_device_configs(testable_devices, added=True, connect=True)
@SafeSlot()
def _handle_upload(self):
@@ -513,8 +543,7 @@ class UploadRedisDialog(QtWidgets.QDialog):
[
detailed_text,
"These devices may not be reachable and disabled BEC upon loading the config.",
"Consider validating these connections before proceeding.\n\n",
"Continue anyway?",
"Consider validating these connections before.",
]
)
reply = QtWidgets.QMessageBox.critical(
@@ -582,40 +611,35 @@ class UploadRedisDialog(QtWidgets.QDialog):
return
self.update_device_status(device_config, config_status, connection_status)
@SafeSlot(list)
def _multiple_updates_from_ophyd_device_tests(self, validation_results: _ValidationResultIter):
"""
Callback slot for receiving multiple validation result updates from the ophyd test widget.
Args:
validation_results (list): List of tuples containing (device_config, config_status, connection_status, validation_msg).
"""
for cfg, cfg_status, conn_status, val_msg in validation_results:
self.update_device_status(cfg, cfg_status, conn_status)
self._update_ui()
@SafeSlot(dict, int, int)
def update_device_status(self, device_config: dict, config_status: int, connection_status: int):
"""Update the status of a specific device."""
# Update device config status
self._update_device_configs(device_config, config_status, connection_status, "")
# Recalculate summaries and UI state
self._update_ui()
def _update_device_configs(
self,
device_config: dict[str, Any],
config_status: int,
connection_status: int,
validation_msg: str,
):
device_name = device_config.get("name", "")
old_config, _, _ = self.device_configs.get(device_name, (None, None, None))
if old_config is not None:
self.device_configs[device_name] = (device_config, config_status, connection_status)
else:
# If device not found, add it
self.config_section.add_device(device_config, config_status, connection_status)
if self._current_progress is not None:
self._current_progress += 1
self._progress_bar.set_value(self._current_progress)
if self._current_progress >= self._progress_bar.maximum:
self._progress_bar.setVisible(False)
self._progress_bar.set_value(0)
self._current_progress = None
self.validation_completed()
self._update_ui()
return
# Update UI sections
self.config_section.add_device(device_config, config_status, connection_status)
# Recalculate summaries and UI state
self._update_ui()
def validation_completed(self):
"""Called when connection validation is completed."""
self.validate_connections_btn.setEnabled(True)
self._update_ui()
def main(): # pragma: no cover
@@ -681,7 +705,12 @@ def main(): # pragma: no cover
]
configs = {cfg[0]["name"]: cfg for cfg in sample_configs}
apply_theme("dark")
dialog = UploadRedisDialog(parent=None, device_configs=configs)
from unittest import mock
ophyd_test_widget = mock.MagicMock(spec=OphydValidation)
dialog = UploadRedisDialog(
parent=None, device_configs=configs, ophyd_test_widget=ophyd_test_widget
)
dialog.show()
sys.exit(app.exec_())
@@ -2,29 +2,18 @@ from __future__ import annotations
import os
from functools import partial
from typing import TYPE_CHECKING, List, Literal, get_args
from typing import List, Literal, get_args
import yaml
from bec_lib import config_helper
from bec_lib.bec_yaml_loader import yaml_load
from bec_lib.callback_handler import EventType
from bec_lib.endpoints import MessageEndpoints
from bec_lib.file_utils import DeviceConfigWriter
from bec_lib.logger import bec_logger
from bec_lib.messages import ConfigAction, ScanStatusMessage
from bec_lib.messages import ConfigAction
from bec_lib.plugin_helper import plugin_package_name, plugin_repo_path
from bec_qthemes import apply_theme, material_icon
from qtpy.QtCore import QMetaObject, Qt, QThreadPool, Signal
from qtpy.QtGui import QColor
from qtpy.QtWidgets import (
QApplication,
QFileDialog,
QMessageBox,
QPushButton,
QTextEdit,
QVBoxLayout,
QWidget,
)
from bec_qthemes import apply_theme
from qtpy.QtCore import QMetaObject, QThreadPool, Signal
from qtpy.QtWidgets import QFileDialog, QMessageBox, QTextEdit, QVBoxLayout, QWidget
from bec_widgets.applications.views.device_manager_view.device_manager_dialogs import (
ConfigChoiceDialog,
@@ -33,12 +22,11 @@ from bec_widgets.applications.views.device_manager_view.device_manager_dialogs i
from bec_widgets.applications.views.device_manager_view.device_manager_dialogs.upload_redis_dialog import (
UploadRedisDialog,
)
from bec_widgets.utils.colors import get_accent_colors
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.toolbars.actions import MaterialIconAction
from bec_widgets.utils.toolbars.bundles import ToolbarBundle
from bec_widgets.utils.toolbars.toolbar import ModularToolBar
from bec_widgets.widgets.containers.dock_area.basic_dock_area import DockAreaWidget
from bec_widgets.widgets.containers.advanced_dock_area.basic_dock_area import DockAreaWidget
from bec_widgets.widgets.control.device_manager.components import (
DeviceTable,
DMConfigView,
@@ -47,19 +35,11 @@ from bec_widgets.widgets.control.device_manager.components import (
)
from bec_widgets.widgets.control.device_manager.components._util import SharedSelectionSignal
from bec_widgets.widgets.control.device_manager.components.ophyd_validation.ophyd_validation_utils import (
ConfigStatus,
ConnectionStatus,
)
from bec_widgets.widgets.progress.device_initialization_progress_bar.device_initialization_progress_bar import (
DeviceInitializationProgressBar,
)
from bec_widgets.widgets.services.device_browser.device_item.config_communicator import (
CommunicateConfigAction,
)
from bec_widgets.widgets.utility.spinner.spinner import SpinnerWidget
if TYPE_CHECKING: # pragma: no cover
from bec_lib.client import BECClient
logger = bec_logger.logger
@@ -70,84 +50,6 @@ _yes_no_question = partial(
)
class CustomBusyWidget(QWidget):
"""Custom busy widget to show during device config upload."""
cancel_requested = Signal()
def __init__(self, parent=None, client: BECClient | None = None):
super().__init__(parent=parent)
# Widgets
self.progress = QWidget(parent=self)
self.progress_layout = QVBoxLayout(self.progress)
self.progress_layout.setContentsMargins(6, 6, 6, 6)
self.progress_inner = DeviceInitializationProgressBar(parent=self.progress, client=client)
self.progress_layout.addWidget(self.progress_inner)
self.progress.setMinimumWidth(320)
# Spinner
self.spinner = SpinnerWidget(parent=self)
scale = self._ui_scale()
spinner_size = int(scale * 0.12) if scale else 1
spinner_size = max(32, min(spinner_size, 96))
self.spinner.setFixedSize(spinner_size, spinner_size)
# Cancel button
self.cancel_button = QPushButton("Cancel Upload", parent=self)
self.cancel_button.setIcon(material_icon("cancel"))
self.cancel_button.clicked.connect(self.cancel_requested.emit)
button_height = int(spinner_size * 0.9)
button_height = max(36, min(button_height, 72))
aspect_ratio = 3.8 # width / height, visually stable for text buttons
button_width = int(button_height * aspect_ratio)
self.cancel_button.setFixedSize(button_width, button_height)
color = get_accent_colors()
self.cancel_button.setStyleSheet(f"""
QPushButton {{
background-color: {color.emergency.name()};
color: white;
font-weight: 600;
border-radius: 6px;
}}
""")
# Layout
content_layout = QVBoxLayout(self)
content_layout.setContentsMargins(24, 24, 24, 24)
content_layout.setSpacing(16)
content_layout.addStretch()
content_layout.addWidget(self.spinner, 0, Qt.AlignmentFlag.AlignHCenter)
content_layout.addWidget(self.progress, 0, Qt.AlignmentFlag.AlignHCenter)
content_layout.addStretch()
content_layout.addWidget(self.cancel_button, 0, Qt.AlignmentFlag.AlignHCenter)
if hasattr(color, "_colors"):
bg_color = color._colors.get("BG", None)
if bg_color is None: # Fallback if missing
bg_color = QColor(50, 50, 50, 255)
self.setStyleSheet(f"""
background-color: {bg_color.name()};
border-radius: 12px;
""")
def _ui_scale(self) -> int:
parent = self.parent()
if not parent:
return 0
return min(parent.width(), parent.height())
def showEvent(self, event):
"""Show event to start the spinner."""
super().showEvent(event)
self.spinner.start()
def hideEvent(self, event):
"""Hide event to stop the spinner."""
super().hideEvent(event)
self.spinner.stop()
class DeviceManagerDisplayWidget(DockAreaWidget):
"""Device Manager main display widget. This contains all sub-widgets and the toolbar."""
@@ -155,23 +57,13 @@ class DeviceManagerDisplayWidget(DockAreaWidget):
request_ophyd_validation = Signal(list, bool, bool)
def __init__(self, parent=None, *args, **kwargs):
def __init__(self, parent=None, client=None, *args, **kwargs):
super().__init__(parent=parent, variant="compact", *args, **kwargs)
# State variable for config upload
self._config_upload_active: bool = False
self._config_in_sync: bool = False
scan_status = self.bec_dispatcher.client.connector.get(MessageEndpoints.scan_status())
initial_status = scan_status.status if scan_status is not None else "closed"
self._scan_is_running: bool = initial_status in ["open", "paused"]
# Push to Redis dialog
self._upload_redis_dialog: UploadRedisDialog | None = None
self._dialog_validation_connection: QMetaObject.Connection | None = None
# NOTE: We need here a separate config helper instance to avoid conflicts with
# other communications to REDIS as uploading a config through a CommunicationConfigAction
# will block if we use the config_helper from self.client.config._config_helper
self._config_helper = config_helper.ConfigHelper(self.client.connector)
self._shared_selection = SharedSelectionSignal()
@@ -215,62 +107,23 @@ class DeviceManagerDisplayWidget(DockAreaWidget):
(self.request_ophyd_validation, (self.ophyd_test_view.change_device_configs,)),
(
self.device_table_view.device_configs_changed,
(self.ophyd_test_view.device_table_config_changed,),
(self.ophyd_test_view.change_device_configs,),
),
(
self.device_table_view.device_config_in_sync_with_redis,
(self._update_config_in_sync,),
(self._update_config_enabled_button,),
),
(self.device_table_view.device_row_dbl_clicked, (self._edit_device_action,)),
]:
for slot in slots:
signal.connect(slot)
self._scan_status_callback_id = self.bec_dispatcher.client.callbacks.register(
EventType.SCAN_STATUS, self._update_scan_running
)
# Add toolbar
self._add_toolbar()
# Build dock layout using shared helpers
self._build_docks()
def cleanup(self):
self.bec_dispatcher.client.callbacks.remove(self._scan_status_callback_id)
super().cleanup()
def closeEvent(self, event):
"""If config upload is active when application is exiting, cancel it."""
logger.info("Application is quitting, checking for active config upload...")
if self._config_upload_active:
logger.info("Application is quitting, cancelling active config upload...")
self._config_helper.send_config_request(
action="cancel", config=None, wait_for_response=True, timeout_s=10
)
logger.info("Config upload cancelled.")
super().closeEvent(event)
##############################
### Custom set busy widget ###
##############################
def create_busy_state_widget(self) -> QWidget:
"""Create a custom busy state widget for uploading device configurations."""
widget = CustomBusyWidget(parent=self, client=self.client)
widget.cancel_requested.connect(self._cancel_device_config_upload)
return widget
def _set_busy_wrapper(self, enabled: bool):
"""Thin wrapper around set_busy to flip the state variable."""
self._busy_overlay.set_opacity(0.92)
self._config_upload_active = enabled
self.set_busy(enabled=enabled)
##############################
### Toolbar and Dock setup ###
##############################
def _add_toolbar(self):
self.toolbar = ModularToolBar(self)
@@ -452,36 +305,6 @@ class DeviceManagerDisplayWidget(DockAreaWidget):
# Add load config from plugin dir
self.toolbar.add_bundle(table_bundle)
######################################
### Update button state management ###
######################################
@SafeSlot(dict, dict)
def _update_scan_running(self, scan_info: dict, _: dict):
"""disable editing when scans are running and enable editing when they are finished"""
msg = ScanStatusMessage.model_validate(scan_info)
self._scan_is_running = msg.status in ["open", "paused"]
self._update_config_enabled_button()
def _update_config_in_sync(self, in_sync: bool):
self._config_in_sync = in_sync
self._update_config_enabled_button()
def _update_config_enabled_button(self):
action = self.toolbar.components.get_action("update_config_redis")
enabled = not self._config_in_sync and not self._scan_is_running
action.action.setEnabled(enabled)
if enabled: # button is enabled
action.action.setToolTip("Push current config to BEC Server")
elif self._scan_is_running:
action.action.setToolTip("Scan is currently running, config updates disabled.")
else:
action.action.setToolTip("Current config is in sync with BEC Server, updates disabled.")
#######################
### Action Handlers ###
#######################
@SafeSlot()
@SafeSlot(bool)
def _run_validate_connection(self, connect: bool = True):
@@ -489,15 +312,16 @@ class DeviceManagerDisplayWidget(DockAreaWidget):
configs = list(self.device_table_view.get_selected_device_configs())
if not configs:
configs = self.device_table_view.get_device_config()
# Adjust the state of the icons in the device table view
self.device_table_view.update_multiple_device_validations(
[
(cfg, ConfigStatus.UNKNOWN.value, ConnectionStatus.UNKNOWN.value, "")
for cfg in configs
]
)
self.request_ophyd_validation.emit(configs, True, connect)
def _update_config_enabled_button(self, enabled: bool):
action = self.toolbar.components.get_action("update_config_redis")
action.action.setEnabled(not enabled)
if enabled:
action.action.setToolTip("Push current config to BEC Server")
else:
action.action.setToolTip("Current config is in sync with BEC Server, button disabled.")
@SafeSlot()
def _load_file_action(self):
"""Action for the 'load' action to load a config from disk for the io_bundle of the toolbar."""
@@ -600,15 +424,17 @@ class DeviceManagerDisplayWidget(DockAreaWidget):
"Do you really want to flush the current config in BEC Server?",
)
if reply == QMessageBox.StandardButton.Yes:
self.set_busy(enabled=True, text="Flushing configuration in BEC Server...")
self.client.config.reset_config()
logger.info("Successfully flushed configuration in BEC Server.")
self.set_busy(enabled=False)
# Check if config is in sync, enable load redis button
self.device_table_view.device_config_in_sync_with_redis.emit(
self.device_table_view._is_config_in_sync_with_redis()
)
validation_results = self.device_table_view.get_validation_results()
for config, config_status, connection_status in validation_results.values():
if connection_status == ConnectionStatus.CONNECTED.value:
for config, config_status, connnection_status in validation_results.values():
if connnection_status == ConnectionStatus.CONNECTED.value:
self.device_table_view.update_device_validation(
config, config_status, ConnectionStatus.CAN_CONNECT, ""
)
@@ -648,10 +474,7 @@ class DeviceManagerDisplayWidget(DockAreaWidget):
validation_results = self.device_table_view.get_validation_results()
# Create and show upload dialog
self._upload_redis_dialog = UploadRedisDialog(
parent=self, device_configs=validation_results
)
self._upload_redis_dialog.request_ophyd_validation.connect(
self.request_ophyd_validation.emit
parent=self, device_configs=validation_results, ophyd_test_widget=self.ophyd_test_view
)
# Show dialog
@@ -661,10 +484,6 @@ class DeviceManagerDisplayWidget(DockAreaWidget):
self._push_composition_to_redis(action="set")
elif reply == UploadRedisDialog.UploadAction.CANCEL:
self.ophyd_test_view.cancel_all_validations()
elif reply == UploadRedisDialog.UploadAction.CONNECTION_TEST_REQUESTED:
return QMessageBox.information(
self, "Connection Test Requested", "Running connection test on untested devices."
)
def _push_composition_to_redis(self, action: ConfigAction):
"""Push the current device composition to Redis."""
@@ -677,37 +496,12 @@ class DeviceManagerDisplayWidget(DockAreaWidget):
comm.signals.done.connect(self._handle_push_complete_to_communicator)
comm.signals.error.connect(self._handle_exception_from_communicator)
threadpool.start(comm)
self._set_busy_wrapper(enabled=True)
def _cancel_device_config_upload(self):
"""Cancel the device configuration upload process."""
threadpool = QThreadPool.globalInstance()
comm = CommunicateConfigAction(self._config_helper, None, {}, "cancel")
# Cancelling will raise an exception in the communicator, so we connect to the failure handler
comm.signals.error.connect(self._handle_cancel_config_upload_failed)
threadpool.start(comm)
def _handle_cancel_config_upload_failed(self, exception: Exception):
"""Handle failure to cancel the config upload."""
self._set_busy_wrapper(enabled=False)
validation_results = self.device_table_view.get_validation_results()
devices_to_update = []
for config, config_status, connection_status in validation_results.values():
devices_to_update.append(
(config, config_status, ConnectionStatus.UNKNOWN.value, "Upload Cancelled")
)
# Rerun validation of all devices after cancellation
self.device_table_view.update_multiple_device_validations(devices_to_update)
self.ophyd_test_view.change_device_configs(
[cfg for cfg, _, _, _ in devices_to_update], added=True, skip_validation=False
)
# Config is in sync with BEC, so we update the state
self.device_table_view.device_config_in_sync_with_redis.emit(False)
self.set_busy(enabled=True, text="Uploading configuration to BEC Server...")
def _handle_push_complete_to_communicator(self):
"""Handle completion of the config push to Redis."""
self._set_busy_wrapper(enabled=False)
self.set_busy(enabled=False)
self._update_validation_icons_after_upload()
def _handle_exception_from_communicator(self, exception: Exception):
"""Handle exceptions from the config communicator."""
@@ -716,7 +510,22 @@ class DeviceManagerDisplayWidget(DockAreaWidget):
"Error Uploading Config",
f"An error occurred while uploading the configuration to BEC Server:\n{str(exception)}",
)
self._set_busy_wrapper(enabled=False)
self.set_busy(enabled=False)
self._update_validation_icons_after_upload()
def _update_validation_icons_after_upload(self):
"""Update validation icons after uploading config to Redis."""
if self.client.device_manager is None:
return
device_names_in_session = list(self.client.device_manager.devices.keys())
validation_results = self.device_table_view.get_validation_results()
devices_to_update = []
for config, config_status, connection_status in validation_results.values():
if config["name"] in device_names_in_session:
devices_to_update.append(
(config, config_status, ConnectionStatus.CONNECTED.value, "")
)
self.device_table_view.update_multiple_device_validations(devices_to_update)
@SafeSlot()
def _save_to_disk_action(self):
@@ -782,7 +591,7 @@ class DeviceManagerDisplayWidget(DockAreaWidget):
):
if old_device_name and old_device_name != data.get("name", ""):
self.device_table_view.remove_device(old_device_name)
self._add_to_table_from_dialog(data, config_status, connection_status, msg, old_device_name)
self.device_table_view.update_device_configs([data])
@SafeSlot(dict, int, int, str, str)
def _add_to_table_from_dialog(
@@ -793,15 +602,7 @@ class DeviceManagerDisplayWidget(DockAreaWidget):
msg: str,
old_device_name: str = "",
):
if connection_status == ConnectionStatus.UNKNOWN.value:
self.device_table_view.update_device_configs([data], skip_validation=False)
else: # Connection status was tested in dialog
# If device is connected, we remove it from the ophyd validation view
self.device_table_view.update_device_configs([data], skip_validation=True)
# Update validation status in device table view and ophyd validation view
self.ophyd_test_view._on_device_test_completed(
data, config_status, connection_status, msg
)
self.device_table_view.add_device_configs([data])
@SafeSlot()
def _remove_device_action(self):
@@ -1,12 +1,11 @@
"""Module for Device Manager View."""
from qtpy.QtCore import QRect
from qtpy.QtWidgets import QWidget
from bec_widgets.applications.views.device_manager_view.device_manager_widget import (
DeviceManagerWidget,
)
from bec_widgets.applications.views.view import ViewBase, ViewTourSteps
from bec_widgets.applications.views.view import ViewBase
from bec_widgets.utils.error_popups import SafeSlot
@@ -20,21 +19,11 @@ class DeviceManagerView(ViewBase):
parent: QWidget | None = None,
content: QWidget | None = None,
*,
view_id: str | None = None,
id: str | None = None,
title: str | None = None,
**kwargs,
):
super().__init__(
parent=parent,
content=content,
view_id=view_id,
title=title,
rpc_passthrough_children=False,
**kwargs,
)
self.device_manager_widget = DeviceManagerWidget(
parent=self, rpc_exposed=False, rpc_passthrough_children=False
)
super().__init__(parent=parent, content=content, id=id, title=title)
self.device_manager_widget = DeviceManagerWidget(parent=self)
self.set_content(self.device_manager_widget)
@SafeSlot()
@@ -45,110 +34,6 @@ class DeviceManagerView(ViewBase):
"""
self.device_manager_widget.on_enter()
def register_tour_steps(self, guided_tour, main_app) -> ViewTourSteps | None:
"""Register Device Manager components with the guided tour.
Args:
guided_tour: The GuidedTour instance to register with.
main_app: The main application instance (for accessing set_current).
Returns:
ViewTourSteps | None: Model containing view title and step IDs.
"""
step_ids = []
dm_widget = self.device_manager_widget
# The device_manager_widget is not yet initialized, so we will register
# tour steps for its uninitialized state.
# Register Load Current Config button
def get_load_current():
main_app.set_current("device_manager")
if dm_widget._initialized is True:
return (None, None)
return (dm_widget.button_load_current_config, None)
step_id = guided_tour.register_widget(
widget=get_load_current,
title="Load Current Config",
text="Load the current device configuration from the BEC server.",
)
step_ids.append(step_id)
# Register Load Config From File button
def get_load_file():
main_app.set_current("device_manager")
if dm_widget._initialized is True:
return (None, None)
return (dm_widget.button_load_config_from_file, None)
step_id = guided_tour.register_widget(
widget=get_load_file,
title="Load Config From File",
text="Load a device configuration from a YAML file on disk.",
)
step_ids.append(step_id)
## Register steps for the initialized state
# Register main device table
def get_device_table():
main_app.set_current("device_manager")
if dm_widget._initialized is False:
return (None, None)
return (dm_widget.device_manager_display.device_table_view, None)
step_id = guided_tour.register_widget(
widget=get_device_table,
title="Device Table",
text="This table displays the config that is prepared to be uploaded to the BEC server. It allows users to review and modify device config settings, and also validate them before uploading to the BEC server.",
)
step_ids.append(step_id)
col_text_mapping = {
0: "Shows if a device configuration is valid. Automatically validated when adding a new device.",
1: "Shows if a device is connectable. Validated on demand.",
2: "Device name, unique across all devices within a config.",
3: "Device class used to initialize the device on the BEC server.",
4: "Defines how BEC treats readings of the device during scans. The options are 'monitored', 'baseline', 'async', 'continuous' or 'on_demand'.",
5: "Defines how BEC reacts if a device readback fails. Options are 'raise', 'retry', or 'buffer'.",
6: "User-defined tags associated with the device.",
7: "A brief description of the device.",
8: "Device is enabled when the configuration is loaded.",
9: "Device is set to read-only.",
10: "This flag allows to configure if the 'trigger' method of the device is called during scans.",
}
# We have at least one device registered
def get_device_table_row(column: int):
main_app.set_current("device_manager")
if dm_widget._initialized is False:
return (None, None)
table = dm_widget.device_manager_display.device_table_view.table
header = table.horizontalHeader()
x = header.sectionViewportPosition(column)
table.horizontalScrollBar().setValue(x)
# Recompute after scrolling
x = header.sectionViewportPosition(column)
w = header.sectionSize(column)
h = header.height()
rect = QRect(x, 0, w, h)
top_left = header.viewport().mapTo(main_app, rect.topLeft())
return (QRect(top_left, rect.size()), col_text_mapping.get(column, ""))
for col, text in col_text_mapping.items():
step_id = guided_tour.register_widget(
widget=lambda col=col: get_device_table_row(col),
title=f"{dm_widget.device_manager_display.device_table_view.table.horizontalHeaderItem(col).text()}",
text=text,
)
step_ids.append(step_id)
if not step_ids:
return None
return ViewTourSteps(view_title="Device Manager", step_ids=step_ids)
if __name__ == "__main__": # pragma: no cover
import sys
@@ -180,7 +65,7 @@ if __name__ == "__main__": # pragma: no cover
_app.add_view(
icon="display_settings",
title="Device Manager",
view_id="device_manager",
id="device_manager",
widget=device_manager_view.device_manager_widget,
mini_text="DM",
)
@@ -22,8 +22,8 @@ class DeviceManagerWidget(BECWidget, QtWidgets.QWidget):
RPC = False
def __init__(self, parent=None, client=None, **kwargs):
super().__init__(parent=parent, client=client, **kwargs)
def __init__(self, parent=None, client=None):
super().__init__(parent=parent, client=client)
self.stacked_layout = QtWidgets.QStackedLayout()
self.stacked_layout.setContentsMargins(0, 0, 0, 0)
self.stacked_layout.setSpacing(0)
@@ -1,31 +0,0 @@
from qtpy.QtWidgets import QWidget
from bec_widgets.applications.views.view import ViewBase
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea
class DockAreaView(ViewBase):
"""
Modular dock area view for arranging and managing multiple dockable widgets.
"""
RPC_CONTENT_CLASS = BECDockArea
def __init__(
self,
parent: QWidget | None = None,
content: QWidget | None = None,
*,
view_id: str | None = None,
title: str | None = None,
**kwargs,
):
super().__init__(parent=parent, content=content, view_id=view_id, title=title, **kwargs)
self.dock_area = BECDockArea(
self,
profile_namespace="bec",
auto_profile_namespace=False,
object_name="DockArea",
rpc_exposed=False,
)
self.set_content(self.dock_area)
+107 -64
View File
@@ -2,8 +2,7 @@ from __future__ import annotations
from typing import List
from pydantic import BaseModel
from qtpy.QtCore import QEventLoop
from qtpy.QtCore import QEventLoop, Qt, QTimer
from qtpy.QtWidgets import (
QDialog,
QDialogButtonBox,
@@ -12,31 +11,55 @@ from qtpy.QtWidgets import (
QLabel,
QMessageBox,
QPushButton,
QSplitter,
QStackedLayout,
QVBoxLayout,
QWidget,
)
from bec_widgets import BECWidget
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
from bec_widgets.widgets.control.device_input.signal_combobox.signal_combobox import SignalComboBox
from bec_widgets.widgets.plots.waveform.waveform import Waveform
class ViewTourSteps(BaseModel):
"""Model representing tour steps for a view.
Attributes:
view_title: The human-readable title of the view.
step_ids: List of registered step IDs in the order they should appear.
def set_splitter_weights(splitter: QSplitter, weights: List[float]) -> None:
"""
Apply initial sizes to a splitter using weight ratios, e.g. [1,3,2,1].
Works for horizontal or vertical splitters and sets matching stretch factors.
"""
view_title: str
step_ids: List[str]
def apply():
n = splitter.count()
if n == 0:
return
w = list(weights[:n]) + [1] * max(0, n - len(weights))
w = [max(0.0, float(x)) for x in w]
tot_w = sum(w)
if tot_w <= 0:
w = [1.0] * n
tot_w = float(n)
total_px = (
splitter.width()
if splitter.orientation() == Qt.Orientation.Horizontal
else splitter.height()
)
if total_px < 2:
QTimer.singleShot(0, apply)
return
sizes = [max(1, int(total_px * (wi / tot_w))) for wi in w]
diff = total_px - sum(sizes)
if diff != 0:
idx = max(range(n), key=lambda i: w[i])
sizes[idx] = max(1, sizes[idx] + diff)
splitter.setSizes(sizes)
for i, wi in enumerate(w):
splitter.setStretchFactor(i, max(1, int(round(wi * 100))))
QTimer.singleShot(0, apply)
class ViewBase(BECWidget, QWidget):
class ViewBase(QWidget):
"""Wrapper for a content widget used inside the main app's stacked view.
Subclasses can implement `on_enter` and `on_exit` to run custom logic when the view becomes visible or is about to be hidden.
@@ -44,28 +67,21 @@ class ViewBase(BECWidget, QWidget):
Args:
content (QWidget): The actual view widget to display.
parent (QWidget | None): Parent widget.
view_id (str | None): Optional view view_id, useful for debugging or introspection.
id (str | None): Optional view id, useful for debugging or introspection.
title (str | None): Optional human-readable title.
"""
RPC = True
PLUGIN = False
USER_ACCESS = ["activate"]
RPC_CONTENT_CLASS: type[QWidget] | None = None
RPC_CONTENT_ATTR = "content"
def __init__(
self,
parent: QWidget | None = None,
content: QWidget | None = None,
*,
view_id: str | None = None,
id: str | None = None,
title: str | None = None,
**kwargs,
):
super().__init__(parent=parent, **kwargs)
super().__init__(parent=parent)
self.content: QWidget | None = None
self.view_id = view_id
self.view_id = id
self.view_title = title
lay = QVBoxLayout(self)
@@ -99,40 +115,67 @@ class ViewBase(BECWidget, QWidget):
"""
return True
@SafeSlot()
def activate(self) -> None:
"""Switch the parent application to this view."""
if not self.view_id:
raise ValueError("Cannot switch view without a view_id.")
####### Default view has to be done with setting up splitters ########
def set_default_view(self, horizontal_weights: list, vertical_weights: list):
"""Apply initial weights to every horizontal and vertical splitter.
parent = self.parent()
while parent is not None:
if hasattr(parent, "set_current"):
parent.set_current(self.view_id)
return
parent = parent.parent()
raise RuntimeError("Could not find a parent application with set_current().")
def cleanup(self):
if self.content is not None:
self.content.close()
self.content.deleteLater()
super().cleanup()
def register_tour_steps(self, guided_tour, main_app) -> ViewTourSteps | None:
"""Register this view's components with the guided tour.
Args:
guided_tour: The GuidedTour instance to register with.
main_app: The main application instance (for accessing set_current).
Returns:
ViewTourSteps | None: A model containing the view title and step IDs,
or None if this view has no tour steps.
Override this method in subclasses to register view-specific components.
Examples:
horizontal_weights = [1, 3, 2, 1]
vertical_weights = [3, 7] # top:bottom = 30:70
"""
return None
splitters_h = []
splitters_v = []
for splitter in self.findChildren(QSplitter):
if splitter.orientation() == Qt.Orientation.Horizontal:
splitters_h.append(splitter)
elif splitter.orientation() == Qt.Orientation.Vertical:
splitters_v.append(splitter)
def apply_all():
for s in splitters_h:
set_splitter_weights(s, horizontal_weights)
for s in splitters_v:
set_splitter_weights(s, vertical_weights)
QTimer.singleShot(0, apply_all)
def set_stretch(self, *, horizontal=None, vertical=None):
"""Update splitter weights and re-apply to all splitters.
Accepts either a list/tuple of weights (e.g., [1,3,2,1]) or a role dict
for convenience: horizontal roles = {"left","center","right"},
vertical roles = {"top","bottom"}.
"""
def _coerce_h(x):
if x is None:
return None
if isinstance(x, (list, tuple)):
return list(map(float, x))
if isinstance(x, dict):
return [
float(x.get("left", 1)),
float(x.get("center", x.get("middle", 1))),
float(x.get("right", 1)),
]
return None
def _coerce_v(x):
if x is None:
return None
if isinstance(x, (list, tuple)):
return list(map(float, x))
if isinstance(x, dict):
return [float(x.get("top", 1)), float(x.get("bottom", 1))]
return None
h = _coerce_h(horizontal)
v = _coerce_v(vertical)
if h is None:
h = [1, 1, 1]
if v is None:
v = [1, 1]
self.set_default_view(h, v)
####################################################################################################
@@ -160,17 +203,17 @@ class WaveformViewPopup(ViewBase): # pragma: no cover
self.device_edit.insertItem(0, "")
self.device_edit.setEditable(True)
self.device_edit.setCurrentIndex(0)
self.signal_edit = SignalComboBox(parent=self)
self.signal_edit.include_config_signals = False
self.signal_edit.insertItem(0, "")
self.signal_edit.setEditable(True)
self.device_edit.currentTextChanged.connect(self.signal_edit.set_device)
self.device_edit.device_reset.connect(self.signal_edit.reset_selection)
self.entry_edit = SignalComboBox(parent=self)
self.entry_edit.include_config_signals = False
self.entry_edit.insertItem(0, "")
self.entry_edit.setEditable(True)
self.device_edit.currentTextChanged.connect(self.entry_edit.set_device)
self.device_edit.device_reset.connect(self.entry_edit.reset_selection)
form = QFormLayout()
form.addRow(label)
form.addRow("Device", self.device_edit)
form.addRow("Signal", self.signal_edit)
form.addRow("Signal", self.entry_edit)
buttons = QDialogButtonBox(QDialogButtonBox.Ok | QDialogButtonBox.Cancel, parent=dialog)
buttons.accepted.connect(dialog.accept)
@@ -182,7 +225,7 @@ class WaveformViewPopup(ViewBase): # pragma: no cover
if dialog.exec_() == QDialog.Accepted:
self.waveform.plot(
device_y=self.device_edit.currentText(), signal_y=self.signal_edit.currentText()
y_name=self.device_edit.currentText(), y_entry=self.entry_edit.currentText()
)
@SafeSlot()
@@ -307,7 +350,7 @@ class WaveformViewInline(ViewBase): # pragma: no cover
dev = self.device_edit.currentText()
sig = self.entry_edit.currentText()
if dev and sig:
self.waveform.plot(device_y=dev, signal_y=sig)
self.waveform.plot(y_name=dev, y_entry=sig)
self.stack.setCurrentIndex(1)
def _show_waveform_without_changes(self):
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
-1
View File
@@ -1 +0,0 @@
from bec_widgets.cli.rpc import rpc_base
+820 -1097
View File
File diff suppressed because it is too large Load Diff
+36 -275
View File
@@ -5,15 +5,14 @@ from __future__ import annotations
import json
import os
import select
import signal
import subprocess
import threading
import time
from contextlib import contextmanager
from threading import Lock
from typing import TYPE_CHECKING, Callable, Literal, TypeAlias, cast
from typing import TYPE_CHECKING, Literal, TypeAlias, cast
from bec_lib.endpoints import EndpointInfo, MessageEndpoints
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.utils.import_utils import lazy_import, lazy_import_from
from rich.console import Console
@@ -34,12 +33,6 @@ else:
logger = bec_logger.logger
IGNORE_WIDGETS = ["LaunchWindow"]
PROCESS_TERMINATION_TIMEOUT = 10
PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT = 2
PROCESS_OUTPUT_SELECT_TIMEOUT = 0.2
GRACEFUL_SERVER_SHUTDOWN_RPC_TIMEOUT = 3
GRACEFUL_SERVER_SHUTDOWN_TIMEOUT = 5
OUTPUT_READER_STOP_EVENT_ATTR = "_bec_output_reader_stop_event"
RegistryState: TypeAlias = dict[
Literal["gui_id", "name", "widget_class", "config", "__rpc__", "container_proxy"],
@@ -60,16 +53,14 @@ def _filter_output(output: str) -> str:
return output
def _get_output(process, logger, stop_event: threading.Event | None = None) -> None:
def _get_output(process, logger) -> None:
log_func = {process.stdout: logger.debug, process.stderr: logger.info}
stream_buffer = {process.stdout: [], process.stderr: []}
try:
os.set_blocking(process.stdout.fileno(), False)
os.set_blocking(process.stderr.fileno(), False)
while process.poll() is None and not (stop_event and stop_event.is_set()):
readylist, _, _ = select.select(
[process.stdout, process.stderr], [], [], PROCESS_OUTPUT_SELECT_TIMEOUT
)
while process.poll() is None:
readylist, _, _ = select.select([process.stdout, process.stderr], [], [], 1)
for stream in (process.stdout, process.stderr):
buf = stream_buffer[stream]
if stream in readylist:
@@ -84,95 +75,6 @@ def _get_output(process, logger, stop_event: threading.Event | None = None) -> N
logger.error(f"Error reading process output: {str(e)}")
def _process_group_snapshot(process) -> str:
try:
pgid = os.getpgid(process.pid)
except ProcessLookupError:
return "Process group snapshot unavailable: process already exited"
try:
result = subprocess.run(
["ps", "-o", "pid,ppid,pgid,stat,command", "-g", str(pgid)],
check=False,
capture_output=True,
text=True,
timeout=2,
)
except Exception as exc:
return f"Process group snapshot unavailable: {exc}"
output = result.stdout.strip()
if not output:
return f"Process group snapshot empty for pgid={pgid}"
return output
def _terminate_plot_process(process, logger, timeout: float = PROCESS_TERMINATION_TIMEOUT) -> None:
if process.poll() is not None:
return
process_info = f"pid={process.pid} command={process.args}"
try:
pgid = os.getpgid(process.pid)
process_info = f"pid={process.pid} pgid={pgid} command={process.args}"
logger.info(f"Terminating GUI process group {process_info}")
os.killpg(pgid, signal.SIGTERM)
except ProcessLookupError:
process.wait(timeout=timeout)
return
except Exception as exc:
logger.warning("Failed to terminate GUI process group; terminating process only.")
logger.info(f"GUI process termination failure details: {exc}. pid={process.pid}")
process.terminate()
try:
process.wait(timeout=timeout)
return
except subprocess.TimeoutExpired:
logger.warning(f"GUI process did not stop within {timeout}s; killing it.")
logger.info(
f"GUI process force-kill details: {process_info}\n"
f"{_process_group_snapshot(process)}"
)
try:
os.killpg(os.getpgid(process.pid), signal.SIGKILL)
except ProcessLookupError as e:
logger.error(f"Failed to kill GUI process group: {e}")
process.wait(timeout=timeout)
return
process.wait(timeout=timeout)
def _wait_for_process_exit(process, timeout: float) -> bool:
try:
process.wait(timeout=timeout)
except subprocess.TimeoutExpired:
return False
return True
def _join_process_output_thread(process, thread: threading.Thread | None, logger) -> None:
if thread is None:
return
thread.join(timeout=PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT)
if not thread.is_alive():
return
if stop_event := getattr(thread, OUTPUT_READER_STOP_EVENT_ATTR, None):
stop_event.set()
for stream in (process.stdout, process.stderr):
if stream is None:
continue
try:
stream.close()
except OSError as e:
logger.error(f"Failed to close stream {str(e)}")
thread.join(timeout=PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT)
if thread.is_alive():
logger.warning("GUI process output reader thread did not stop after process shutdown.")
logger.info(f"GUI process output reader thread details: pid={process.pid}")
def _start_plot_process(
gui_id: str,
gui_class_id: str,
@@ -224,14 +126,8 @@ def _start_plot_process(
if logger is None:
process_output_processing_thread = None
else:
process_output_stop_event = threading.Event()
process_output_processing_thread = threading.Thread(
target=_get_output, args=(process, logger, process_output_stop_event)
)
setattr(
process_output_processing_thread,
OUTPUT_READER_STOP_EVENT_ATTR,
process_output_stop_event,
target=_get_output, args=(process, logger)
)
process_output_processing_thread.start()
return process, process_output_processing_thread
@@ -326,7 +222,6 @@ class BECGuiClient(RPCBase):
self._ipython_registry: dict[str, RPCReference] = {}
self.available_widgets = AvailableWidgetsNamespace()
register_serializer_extension()
self._rpc_timeout = 60
####################
#### Client API ####
@@ -337,21 +232,6 @@ class BECGuiClient(RPCBase):
"""The launcher object."""
return RPCBase(gui_id=f"{self._gui_id}:launcher", parent=self, object_name="launcher")
def set_rpc_timeout(self, timeout: float):
"""Set the timeout for RPC calls to the GUI server.
Args:
timeout(float): The timeout in seconds.
"""
if not isinstance(timeout, (int, float)) or timeout < 0:
raise ValueError("Timeout must be a non-negative number.")
self._rpc_timeout = timeout
def _safe_register_stream(self, endpoint: EndpointInfo, cb: Callable, **kwargs):
"""Check if already registered for registration in idempotent functions."""
if not self._client.connector.any_stream_is_registered(endpoint, cb=cb):
self._client.connector.register(endpoint, cb=cb, **kwargs)
def connect_to_gui_server(self, gui_id: str) -> None:
"""Connect to a GUI server"""
# Unregister the old callback
@@ -367,9 +247,10 @@ class BECGuiClient(RPCBase):
self._ipython_registry = {}
# Register the new callback
self._safe_register_stream(
self._client.connector.register(
MessageEndpoints.gui_registry_state(self._gui_id),
cb=self._handle_registry_update,
parent=self,
from_start=True,
)
@@ -416,119 +297,38 @@ class BECGuiClient(RPCBase):
return self._raise_all()
return self._start(wait=wait)
def change_theme(self, theme: Literal["light", "dark"] | None = None) -> None:
"""
Apply a GUI theme or toggle between dark and light.
Args:
theme(Literal["light", "dark"] | None): Theme to apply. If None, the current
theme is fetched from the GUI and toggled.
"""
if not self._check_if_server_is_alive():
self._start(wait=True)
with wait_for_server(self):
if theme is None:
current_theme = self.launcher._run_rpc("fetch_theme")
next_theme = "light" if current_theme == "dark" else "dark"
else:
next_theme = theme
self.launcher._run_rpc("change_theme", theme=next_theme)
def new(
self,
name: str | None = None,
wait: bool = True,
geometry: tuple[int, int, int, int] | None = None,
launch_script: str = "dock_area",
startup_profile: str | Literal["restore", "skip"] | None = None,
**kwargs,
) -> client.AdvancedDockArea:
) -> client.BECDockArea:
"""Create a new top-level dock area.
Args:
name(str, optional): The name of the dock area. Defaults to None.
wait(bool, optional): Whether to wait for the server to start. Defaults to True.
geometry(tuple[int, int, int, int] | None): The geometry of the dock area (pos_x, pos_y, w, h).
launch_script(str): The launch script to use. Defaults to "dock_area".
startup_profile(str | Literal["restore", "skip"] | None): Startup mode for
the dock area:
- None: start in transient empty workspace
- "restore": restore last-used profile
- "skip": skip profile initialization
- "<name>": load the named profile
**kwargs: Additional keyword arguments passed to the dock area.
geometry(tuple[int, int, int, int] | None): The geometry of the dock area (pos_x, pos_y, w, h)
Returns:
client.AdvancedDockArea: The new dock area.
Examples:
>>> gui.new() # Start with an empty unsaved workspace
>>> gui.new(startup_profile="restore") # Restore last profile
>>> gui.new(startup_profile="my_profile") # Load explicit profile
client.BECDockArea: The new dock area.
"""
if "profile" in kwargs or "start_empty" in kwargs:
raise TypeError(
"gui.new() no longer accepts 'profile' or 'start_empty'. Use 'startup_profile' instead."
)
if not self._check_if_server_is_alive():
self.show(wait=True)
self.start(wait=True)
if wait:
with wait_for_server(self):
return self._new_impl(
name=name,
geometry=geometry,
launch_script=launch_script,
startup_profile=startup_profile,
**kwargs,
)
return self._new_impl(
name=name,
geometry=geometry,
launch_script=launch_script,
startup_profile=startup_profile,
**kwargs,
)
def _new_impl(
self,
*,
name: str | None,
geometry: tuple[int, int, int, int] | None,
launch_script: str,
startup_profile: str | Literal["restore", "skip"] | None,
**kwargs,
):
if launch_script == "dock_area":
try:
return self.launcher._run_rpc(
"system.launch_dock_area",
name=name,
geometry=geometry,
startup_profile=startup_profile,
**kwargs,
)
except ValueError as exc:
error = str(exc)
if (
"Unknown system RPC method: system.launch_dock_area" not in error
and "has no attribute 'system.launch_dock_area'" not in error
):
raise
logger.debug("Server does not support system.launch_dock_area; using launcher RPC")
return self.launcher._run_rpc(
"launch",
launch_script=launch_script,
name=name,
geometry=geometry,
startup_profile=startup_profile,
**kwargs,
widget = self.launcher._run_rpc(
"launch", launch_script=launch_script, name=name, geometry=geometry, **kwargs
) # pylint: disable=protected-access
return widget
widget = self.launcher._run_rpc(
"launch", launch_script=launch_script, name=name, geometry=geometry, **kwargs
) # pylint: disable=protected-access
return widget
def delete(self, name: str) -> None:
"""Delete a dock area and its parent window.
"""Delete a dock area.
Args:
name(str): The name of the dock area.
@@ -536,19 +336,7 @@ class BECGuiClient(RPCBase):
widget = self.windows.get(name)
if widget is None:
raise ValueError(f"Dock area {name} not found.")
# Get the container_proxy (parent window) gui_id from the server registry
obj = self._server_registry.get(widget._gui_id)
if obj is None:
raise ValueError(f"Widget {name} not found in registry.")
container_gui_id = obj.get("container_proxy")
if container_gui_id:
# Close the container window which will also clean up the dock area
widget._run_rpc("close", gui_id=container_gui_id) # pylint: disable=protected-access
else:
# Fallback: just close the dock area directly
widget._run_rpc("close") # pylint: disable=protected-access
widget._run_rpc("close") # pylint: disable=protected-access
def delete_all(self) -> None:
"""Delete all dock areas."""
@@ -569,13 +357,11 @@ class BECGuiClient(RPCBase):
if self._process:
logger.success("Stopping GUI...")
if not self._request_server_shutdown():
_terminate_plot_process(self._process, logger)
_join_process_output_thread(
self._process, self._process_output_processing_thread, logger
)
self._process.terminate()
if self._process_output_processing_thread:
self._process_output_processing_thread.join()
self._process.wait()
self._process = None
self._process_output_processing_thread = None
# Unregister the registry state
self._client.connector.unregister(
@@ -594,37 +380,6 @@ class BECGuiClient(RPCBase):
#### Private methods ####
#########################
def _request_server_shutdown(self) -> bool:
if self._process is None or self._process.poll() is not None:
return True
process_details = f"pid={self._process.pid} command={self._process.args}"
logger.info(f"Requesting graceful GUI shutdown {process_details}")
try:
self.launcher._run_rpc( # pylint: disable=protected-access
"system.shutdown",
wait_for_rpc_response=True,
timeout=GRACEFUL_SERVER_SHUTDOWN_RPC_TIMEOUT,
)
except Exception as exc:
logger.warning(
"Could not confirm graceful GUI shutdown via RPC; "
"falling back to process termination."
)
logger.info(f"Graceful GUI shutdown RPC failure details: {exc}. {process_details}")
return False
if _wait_for_process_exit(self._process, GRACEFUL_SERVER_SHUTDOWN_TIMEOUT):
logger.info(f"GUI server exited after graceful shutdown {process_details}")
return True
logger.warning(
"GUI server did not exit after graceful shutdown request; "
"falling back to process termination."
)
logger.info(
f"Graceful GUI shutdown timeout details: {process_details}\n"
f"{_process_group_snapshot(self._process)}"
)
return False
def _check_if_server_is_alive(self):
"""Checks if the process is alive"""
if self._process is None:
@@ -683,14 +438,20 @@ class BECGuiClient(RPCBase):
def _start(self, wait: bool = False) -> None:
self._killed = False
self._safe_register_stream(
MessageEndpoints.gui_registry_state(self._gui_id), cb=self._handle_registry_update
self._client.connector.register(
MessageEndpoints.gui_registry_state(self._gui_id),
cb=self._handle_registry_update,
parent=self,
)
return self._start_server(wait=wait)
def _handle_registry_update(self, msg: dict[str, GUIRegistryStateMessage]) -> None:
@staticmethod
def _handle_registry_update(
msg: dict[str, GUIRegistryStateMessage], parent: BECGuiClient
) -> None:
# This was causing a deadlock during shutdown, not sure why.
# with self._lock:
self = parent
self._server_registry = cast(dict[str, RegistryState], msg["data"].state)
self._update_dynamic_namespace(self._server_registry)
@@ -698,7 +459,7 @@ class BECGuiClient(RPCBase):
if self.launcher and len(self._top_level) == 0:
self.launcher._run_rpc("show") # pylint: disable=protected-access
for window in self._top_level.values():
window.raise_window()
window.show()
def _show_all(self):
with wait_for_server(self):
@@ -717,7 +478,7 @@ class BECGuiClient(RPCBase):
if self.launcher and len(self._top_level) == 0:
self.launcher._run_rpc("raise") # pylint: disable=protected-access
for window in self._top_level.values():
window.raise_window()
window._run_rpc("raise") # type: ignore[attr-defined]
def _raise_all(self):
with wait_for_server(self):
-166
View File
@@ -1,166 +0,0 @@
# This file was automatically generated by generate_cli.py
# type: ignore
from __future__ import annotations
# pylint: skip-file
designer_plugins = {
"AbortButton": ("bec_widgets.widgets.control.buttons.button_abort.button_abort", "AbortButton"),
"BECColorMapWidget": (
"bec_widgets.widgets.utility.visual.colormap_widget.colormap_widget",
"BECColorMapWidget",
),
"BECMainWindow": ("bec_widgets.widgets.containers.main_window.main_window", "BECMainWindow"),
"BECProgressBar": (
"bec_widgets.widgets.progress.bec_progressbar.bec_progressbar",
"BECProgressBar",
),
"BECQueue": ("bec_widgets.widgets.services.bec_queue.bec_queue", "BECQueue"),
"BECShell": ("bec_widgets.widgets.editors.bec_console.bec_console", "BECShell"),
"BECSpinBox": ("bec_widgets.widgets.utility.spinbox.decimal_spinbox", "BECSpinBox"),
"BECStatusBox": ("bec_widgets.widgets.services.bec_status_box.bec_status_box", "BECStatusBox"),
"BeamlineStateManager": (
"bec_widgets.widgets.services.beamline_states.beamline_state_manager",
"BeamlineStateManager",
),
"BecConsole": ("bec_widgets.widgets.editors.bec_console.bec_console", "BecConsole"),
"ColorButton": ("bec_widgets.widgets.utility.visual.color_button.color_button", "ColorButton"),
"ColorButtonNative": (
"bec_widgets.widgets.utility.visual.color_button_native.color_button_native",
"ColorButtonNative",
),
"ColormapSelector": (
"bec_widgets.widgets.utility.visual.colormap_selector.colormap_selector",
"ColormapSelector",
),
"DapComboBox": ("bec_widgets.widgets.dap.dap_combo_box.dap_combo_box", "DapComboBox"),
"DarkModeButton": (
"bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button",
"DarkModeButton",
),
"DeviceBrowser": (
"bec_widgets.widgets.services.device_browser.device_browser",
"DeviceBrowser",
),
"DeviceComboBox": (
"bec_widgets.widgets.control.device_input.device_combobox.device_combobox",
"DeviceComboBox",
),
"Heatmap": ("bec_widgets.widgets.plots.heatmap.heatmap", "Heatmap"),
"IDEExplorer": ("bec_widgets.widgets.utility.ide_explorer.ide_explorer", "IDEExplorer"),
"Image": ("bec_widgets.widgets.plots.image.image", "Image"),
"LMFitDialog": ("bec_widgets.widgets.dap.lmfit_dialog.lmfit_dialog", "LMFitDialog"),
"LogPanel": ("bec_widgets.widgets.utility.logpanel.logpanel", "LogPanel"),
"Minesweeper": ("bec_widgets.widgets.games.minesweeper", "Minesweeper"),
"MonacoWidget": ("bec_widgets.widgets.editors.monaco.monaco_widget", "MonacoWidget"),
"MotorMap": ("bec_widgets.widgets.plots.motor_map.motor_map", "MotorMap"),
"MultiWaveform": ("bec_widgets.widgets.plots.multi_waveform.multi_waveform", "MultiWaveform"),
"PdfViewerWidget": ("bec_widgets.widgets.utility.pdf_viewer.pdf_viewer", "PdfViewerWidget"),
"PositionIndicator": (
"bec_widgets.widgets.control.device_control.position_indicator.position_indicator",
"PositionIndicator",
),
"PositionerBox": (
"bec_widgets.widgets.control.device_control.positioner_box.positioner_box.positioner_box",
"PositionerBox",
),
"PositionerBox2D": (
"bec_widgets.widgets.control.device_control.positioner_box.positioner_box_2d.positioner_box_2d",
"PositionerBox2D",
),
"PositionerControlLine": (
"bec_widgets.widgets.control.device_control.positioner_box.positioner_control_line.positioner_control_line",
"PositionerControlLine",
),
"PositionerGroup": (
"bec_widgets.widgets.control.device_control.positioner_group.positioner_group",
"PositionerGroup",
),
"ResetButton": ("bec_widgets.widgets.control.buttons.button_reset.button_reset", "ResetButton"),
"ResumeButton": (
"bec_widgets.widgets.control.buttons.button_resume.button_resume",
"ResumeButton",
),
"RingProgressBar": (
"bec_widgets.widgets.progress.ring_progress_bar.ring_progress_bar",
"RingProgressBar",
),
"SBBMonitor": ("bec_widgets.widgets.editors.sbb_monitor.sbb_monitor", "SBBMonitor"),
"ScanControl": ("bec_widgets.widgets.control.scan_control.scan_control", "ScanControl"),
"ScanMetadata": ("bec_widgets.widgets.editors.scan_metadata.scan_metadata", "ScanMetadata"),
"ScanProgressBar": (
"bec_widgets.widgets.progress.scan_progressbar.scan_progressbar",
"ScanProgressBar",
),
"ScatterWaveform": (
"bec_widgets.widgets.plots.scatter_waveform.scatter_waveform",
"ScatterWaveform",
),
"SignalComboBox": (
"bec_widgets.widgets.control.device_input.signal_combobox.signal_combobox",
"SignalComboBox",
),
"SignalLabel": ("bec_widgets.widgets.utility.signal_label.signal_label", "SignalLabel"),
"SpinnerWidget": ("bec_widgets.widgets.utility.spinner.spinner", "SpinnerWidget"),
"StopButton": ("bec_widgets.widgets.control.buttons.stop_button.stop_button", "StopButton"),
"TextBox": ("bec_widgets.widgets.editors.text_box.text_box", "TextBox"),
"ToggleSwitch": ("bec_widgets.widgets.utility.toggle.toggle", "ToggleSwitch"),
"Waveform": ("bec_widgets.widgets.plots.waveform.waveform", "Waveform"),
"WebsiteWidget": ("bec_widgets.widgets.editors.website.website", "WebsiteWidget"),
"WidgetFinderComboBox": (
"bec_widgets.widgets.utility.widget_finder.widget_finder",
"WidgetFinderComboBox",
),
}
widget_icons = {
"AbortButton": "cancel",
"BECColorMapWidget": "palette",
"BECMainWindow": "widgets",
"BECProgressBar": "page_control",
"BECQueue": "edit_note",
"BECShell": "hub",
"BECSpinBox": "123",
"BECStatusBox": "widgets",
"BeamlineStateManager": "format_list_bulleted",
"BecConsole": "terminal",
"ColorButton": "colors",
"ColorButtonNative": "colors",
"ColormapSelector": "palette",
"DapComboBox": "data_exploration",
"DarkModeButton": "dark_mode",
"DeviceBrowser": "lists",
"DeviceComboBox": "list_alt",
"Heatmap": "dataset",
"IDEExplorer": "widgets",
"Image": "image",
"LMFitDialog": "monitoring",
"LogPanel": "browse_activity",
"Minesweeper": "videogame_asset",
"MonacoWidget": "code",
"MotorMap": "my_location",
"MultiWaveform": "ssid_chart",
"PdfViewerWidget": "picture_as_pdf",
"PositionIndicator": "horizontal_distribute",
"PositionerBox": "switch_right",
"PositionerBox2D": "switch_right",
"PositionerControlLine": "switch_left",
"PositionerGroup": "grid_view",
"ResetButton": "restart_alt",
"ResumeButton": "resume",
"RingProgressBar": "track_changes",
"SBBMonitor": "train",
"ScanControl": "tune",
"ScanMetadata": "list_alt",
"ScanProgressBar": "timelapse",
"ScatterWaveform": "scatter_plot",
"SignalComboBox": "list_alt",
"SignalLabel": "scoreboard",
"SpinnerWidget": "progress_activity",
"StopButton": "dangerous",
"TextBox": "chat",
"ToggleSwitch": "toggle_on",
"Waveform": "show_chart",
"WebsiteWidget": "travel_explore",
"WidgetFinderComboBox": "frame_inspect",
}
@@ -7,22 +7,31 @@ import inspect
import os
import sys
from pathlib import Path
from typing import get_overloads
import black
import isort
from bec_lib.logger import bec_logger
from qtpy.QtCore import Property as QtProperty
from bec_widgets.utils.generate_designer_plugin import (
DesignerPluginGenerator,
DesignerPluginInfo,
plugin_filenames,
)
from bec_widgets.utils.generate_designer_plugin import DesignerPluginGenerator, plugin_filenames
from bec_widgets.utils.plugin_utils import BECClassContainer, get_custom_classes
logger = bec_logger.logger
if sys.version_info >= (3, 11):
from typing import get_overloads
else:
print(
"Python version is less than 3.11, using dummy function for get_overloads. "
"If you want to use the real function 'typing.get_overloads()', please use Python 3.11 or later."
)
def get_overloads(_obj):
"""
Dummy function for Python versions before 3.11.
"""
return []
class ClientGenerator:
def __init__(self, base=False):
@@ -45,7 +54,7 @@ from __future__ import annotations
from bec_lib.logger import bec_logger
from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call, rpc_timeout
{"from bec_widgets.utils.bec_plugin_helper import get_plugin_client_module" if self._base else ""}
{"from bec_widgets.utils.bec_plugin_helper import get_all_plugin_widgets, get_plugin_client_module" if self._base else ""}
logger = bec_logger.logger
@@ -85,7 +94,7 @@ logger = bec_logger.logger
if self._base:
self.content += """
class _WidgetsEnumType(str, enum.Enum):
\"\"\" Enum for the available widgets, to be generated programmatically \"\"\"
\"\"\" Enum for the available widgets, to be generated programatically \"\"\"
...
"""
@@ -102,19 +111,27 @@ _Widgets = {
self.content += """
try:
_plugin_widgets = get_all_plugin_widgets().as_dict()
plugin_client = get_plugin_client_module()
Widgets = _WidgetsEnumType("Widgets", {name: name for name in _plugin_widgets} | _Widgets)
if (_overlap := _Widgets.keys() & _plugin_widgets.keys()) != set():
for _widget in _overlap:
logger.warning(f"Detected duplicate widget {_widget} in plugin repo file: {inspect.getfile(_plugin_widgets[_widget])} !")
for plugin_name, plugin_class in inspect.getmembers(plugin_client, inspect.isclass):
if issubclass(plugin_class, RPCBase) and plugin_class is not RPCBase:
if plugin_name not in _Widgets:
_Widgets[plugin_name] = plugin_name
if plugin_name in globals():
conflicting_file = (
inspect.getfile(_plugin_widgets[plugin_name])
if plugin_name in _plugin_widgets
else f"{plugin_client}"
)
logger.warning(
f"Plugin widget {plugin_name} in {plugin_class._IMPORT_MODULE} conflicts with a built-in class!"
f"Plugin widget {plugin_name} from {conflicting_file} conflicts with a built-in class!"
)
continue
else:
if plugin_name not in _overlap:
globals()[plugin_name] = plugin_class
Widgets = _WidgetsEnumType("Widgets", _Widgets)
except ImportError as e:
logger.error(f"Failed loading plugins: \\n{reduce(add, traceback.format_exception(e))}")
"""
@@ -129,8 +146,12 @@ except ImportError as e:
class_name = cls.__name__
self.content += f"""
class {class_name}(RPCBase):\n"""
if class_name == "BECDockArea":
self.content += f"""
class {class_name}(RPCBase):"""
else:
self.content += f"""
class {class_name}(RPCBase):"""
if cls.__doc__:
# We only want the first line of the docstring
@@ -141,11 +162,19 @@ class {class_name}(RPCBase):\n"""
else:
class_docs = cls.__doc__.split("\n")[1]
self.content += f"""
\"\"\"{class_docs}\"\"\"\n"""
user_access_entries = self._get_user_access_entries(cls)
self.content += f' _IMPORT_MODULE="{cls.__module__}"\n'
for method_entry in user_access_entries:
method, obj, is_property_setter = self._resolve_method_object(cls, method_entry)
\"\"\"{class_docs}\"\"\"
"""
if not cls.USER_ACCESS:
self.content += """...
"""
for method in cls.USER_ACCESS:
is_property_setter = False
obj = getattr(cls, method, None)
if obj is None:
obj = getattr(cls, method.split(".setter")[0], None)
is_property_setter = True
method = method.split(".setter")[0]
if obj is None:
raise AttributeError(
f"Method {method} not found in class {cls.__name__}. "
@@ -187,34 +216,6 @@ class {class_name}(RPCBase):\n"""
{doc}
\"\"\""""
@staticmethod
def _get_user_access_entries(cls) -> list[str]:
entries = list(getattr(cls, "USER_ACCESS", []))
content_cls = getattr(cls, "RPC_CONTENT_CLASS", None)
if content_cls is not None:
entries.extend(getattr(content_cls, "USER_ACCESS", []))
return list(dict.fromkeys(entries))
@staticmethod
def _resolve_method_object(cls, method_entry: str):
method_name = method_entry
is_property_setter = False
if method_entry.endswith(".setter"):
is_property_setter = True
method_name = method_entry.split(".setter")[0]
candidate_classes = [cls]
content_cls = getattr(cls, "RPC_CONTENT_CLASS", None)
if content_cls is not None:
candidate_classes.append(content_cls)
for candidate_cls in candidate_classes:
obj = getattr(candidate_cls, method_name, None)
if obj is not None:
return method_name, obj, is_property_setter
return method_name, None, is_property_setter
def _rpc_call(self, timeout_info: dict[str, float | None]):
"""
Decorator to mark a method as an RPC call.
@@ -254,58 +255,6 @@ class {class_name}(RPCBase):\n"""
file.write(formatted_content)
def write_designer_plugins(plugin_infos: list[DesignerPluginInfo], file_name: str):
"""
Write a registry of Qt widget classes with designer plugins.
Args:
plugin_infos(list[DesignerPluginInfo]): The designer plugin metadata to write.
file_name(str): The name of the file to write to.
"""
plugin_infos = sorted(plugin_infos, key=lambda info: info.plugin_name_pascal)
content = """# This file was automatically generated by generate_cli.py
# type: ignore
from __future__ import annotations
# pylint: skip-file
designer_plugins = {
"""
for info in plugin_infos:
widget_module = info.plugin_class.__module__
widget_class = info.plugin_name_pascal
content += f' "{info.plugin_name_pascal}": ("{widget_module}", "{widget_class}"),\n'
content += """
}
widget_icons = {
"""
for info in plugin_infos:
content += f' "{info.plugin_name_pascal}": "{info.icon_name}",\n'
content += """
}
"""
try:
formatted_content = black.format_str(content, mode=black.Mode(line_length=100))
except black.NothingChanged:
formatted_content = content
config = isort.Config(
profile="black",
line_length=100,
multi_line_output=3,
include_trailing_comma=False,
known_first_party=["bec_widgets"],
)
formatted_content = isort.code(formatted_content, config=config)
with open(file_name, "w", encoding="utf-8") as file:
file.write(formatted_content)
def main():
"""
Main entry point for the script, controlled by command line arguments.
@@ -342,8 +291,7 @@ def main():
client_path = module_dir / client_subdir / "client.py"
packages = ("widgets", "applications") if module_name == "bec_widgets" else ("widgets",)
rpc_classes = get_custom_classes(module_name, packages=packages)
rpc_classes = get_custom_classes(module_name)
logger.info(f"Obtained classes with RPC objects: {rpc_classes!r}")
generator = ClientGenerator(base=module_name == "bec_widgets")
@@ -359,8 +307,6 @@ def main():
else:
non_overwrite_classes = []
designer_plugin_infos = []
for cls in rpc_classes.plugins:
logger.info(f"Writing bec-designer plugin files for {cls.__name__}...")
@@ -368,30 +314,21 @@ def main():
logger.error(
f"Not writing plugin files for {cls.__name__} because a built-in widget with that name exists"
)
continue
plugin = DesignerPluginGenerator(cls)
if not hasattr(plugin, "info") or plugin.excluded:
if not hasattr(plugin, "info"):
continue
def _exists(file: str):
return os.path.exists(os.path.join(plugin.info.base_path, file))
if any(_exists(file) for file in plugin_filenames(plugin.info.plugin_name_snake)):
if _exists(plugin.filenames.plugin):
designer_plugin_infos.append(plugin.info)
logger.debug(
f"Skipping generation of extra plugin files for {plugin.info.plugin_name_snake} - at least one file out of 'plugin.py', 'pyproject', and 'register_{plugin.info.plugin_name_snake}.py' already exists."
)
continue
plugin.run()
designer_plugin_infos.append(plugin.info)
# Write designer_plugins.py with plugin import metadata for all widgets with designer plugins.
designer_plugins_path = module_dir / client_subdir / "designer_plugins.py"
logger.info(f"Generating designer plugin registry at {designer_plugins_path}")
write_designer_plugins(designer_plugin_infos, str(designer_plugins_path))
if __name__ == "__main__": # pragma: no cover
+10 -59
View File
@@ -2,7 +2,6 @@ from __future__ import annotations
import inspect
import threading
import time
import uuid
from functools import wraps
from typing import TYPE_CHECKING, Any, cast
@@ -10,7 +9,6 @@ from typing import TYPE_CHECKING, Any, cast
from bec_lib.client import BECClient
from bec_lib.device import DeviceBaseWithConfig
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.utils.import_utils import lazy_import, lazy_import_from
if TYPE_CHECKING: # pragma: no cover
@@ -26,9 +24,6 @@ else:
# pylint: disable=protected-access
_DEFAULT_RPC_TIMEOUT = object()
logger = bec_logger.logger
def _name_arg(arg):
if isinstance(arg, DeviceBaseWithConfig):
@@ -159,7 +154,6 @@ class RPCReference:
class RPCBase:
def __init__(
self,
gui_id: str | None = None,
@@ -213,16 +207,12 @@ class RPCBase:
# Use explicit call to ensure action name is 'raise' (not 'raise_')
return self._run_rpc("raise")
def hide(self):
"""Hide this widget (or its container)."""
return self._run_rpc("hide")
def _run_rpc(
self,
method,
*args,
wait_for_rpc_response: bool = True,
timeout: float | None | object = _DEFAULT_RPC_TIMEOUT,
wait_for_rpc_response=True,
timeout=5,
gui_id: str | None = None,
**kwargs,
) -> Any:
@@ -233,16 +223,13 @@ class RPCBase:
method: The method to call.
args: The arguments to pass to the method.
wait_for_rpc_response: Whether to wait for the RPC response.
timeout: The timeout for the RPC response. If omitted, the client's default RPC
timeout is used. If explicitly set to None, wait indefinitely.
timeout: The timeout for the RPC response.
gui_id: The GUI ID to use for the RPC call. If None, the default GUI ID is used.
kwargs: The keyword arguments to pass to the method.
Returns:
The result of the RPC call.
"""
if timeout is _DEFAULT_RPC_TIMEOUT:
timeout = self._root._rpc_timeout
if method in ["show", "hide", "raise"] and gui_id is None:
obj = self._root._server_registry.get(self._gui_id)
if obj is None:
@@ -261,42 +248,17 @@ class RPCBase:
self._rpc_response = None
self._msg_wait_event.clear()
self._client.connector.register(
MessageEndpoints.gui_instruction_response(request_id), cb=self._on_rpc_response
MessageEndpoints.gui_instruction_response(request_id),
cb=self._on_rpc_response,
parent=self,
)
target_gui_id = gui_id or self._gui_id
sent_at = time.time()
deadline = sent_at + timeout if timeout is not None else None
rpc_msg.metadata.update(
{
"method": method,
"receiver": receiver,
"target_gui_id": target_gui_id,
"object_name": self.object_name,
"wait_for_response": wait_for_rpc_response,
"timeout": timeout,
"sent_at": sent_at,
"deadline": deadline,
}
)
logger.info(
"Sending GUI RPC request "
f"request_id={request_id} method={method} receiver={receiver} "
f"target_gui_id={target_gui_id} object_name={self.object_name} "
f"wait_for_response={wait_for_rpc_response} timeout={timeout}"
)
self._client.connector.set_and_publish(MessageEndpoints.gui_instructions(receiver), rpc_msg)
if wait_for_rpc_response:
try:
finished = self._msg_wait_event.wait(timeout)
if not finished:
logger.error(
"GUI RPC response timeout "
f"request_id={request_id} method={method} receiver={receiver} "
f"target_gui_id={target_gui_id} object_name={self.object_name} "
f"timeout={timeout}"
)
raise RPCResponseTimeoutError(request_id, timeout)
finally:
self._msg_wait_event.clear()
@@ -308,23 +270,17 @@ class RPCBase:
# the _on_rpc_response method
assert isinstance(self._rpc_response, messages.RequestResponseMessage)
logger.info(
"Received GUI RPC response "
f"request_id={request_id} method={method} receiver={receiver} "
f"target_gui_id={target_gui_id} object_name={self.object_name} "
f"accepted={self._rpc_response.accepted}"
)
if not self._rpc_response.accepted:
raise ValueError(self._rpc_response.message["error"])
msg_result = self._rpc_response.message.get("result")
self._rpc_response = None
return self._create_widget_from_msg_result(msg_result)
def _on_rpc_response(self, msg_obj: MessageObject) -> None:
@staticmethod
def _on_rpc_response(msg_obj: MessageObject, parent: RPCBase) -> None:
msg = cast(messages.RequestResponseMessage, msg_obj.value)
logger.debug(f"GUI RPC response callback received: {msg}")
self._rpc_response = msg
self._msg_wait_event.set()
parent._rpc_response = msg
parent._msg_wait_event.set()
def _create_widget_from_msg_result(self, msg_result):
if msg_result is None:
@@ -336,11 +292,6 @@ class RPCBase:
return {
key: self._create_widget_from_msg_result(val) for key, val in msg_result.items()
}
rpc_enabled = msg_result.get("__rpc__", True)
if rpc_enabled is False:
return None
msg_result = dict(msg_result)
cls = msg_result.pop("widget_class", None)
msg_result.pop("__rpc__", None)
@@ -5,13 +5,14 @@ from threading import RLock
from typing import TYPE_CHECKING, Callable
from weakref import WeakValueDictionary
import shiboken6 as shb
from bec_lib.logger import bec_logger
from qtpy.QtCore import QObject
if TYPE_CHECKING: # pragma: no cover
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.widgets.containers.dock.dock import BECDock
from bec_widgets.widgets.containers.dock.dock_area import BECDockArea
logger = bec_logger.logger
@@ -108,19 +109,11 @@ class RPCRegister:
dict: A dictionary containing all the registered RPC objects.
"""
with self._lock:
connections = {}
for gui_id, obj in self._rpc_register.items():
try:
if not shb.isValid(obj):
continue
connections[gui_id] = obj
except Exception as e:
logger.warning(f"Error checking validity of object {gui_id}: {e}")
continue
connections = dict(self._rpc_register)
return connections
def get_names_of_rpc_by_class_type(
self, cls: type[BECWidget] | type[BECConnector]
self, cls: type[BECWidget] | type[BECConnector] | type[BECDock] | type[BECDockArea]
) -> list[str]:
"""Get all the names of the widgets.
+56
View File
@@ -0,0 +1,56 @@
from __future__ import annotations
from bec_widgets.cli.client_utils import IGNORE_WIDGETS
from bec_widgets.utils.bec_plugin_helper import get_all_plugin_widgets
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.plugin_utils import get_custom_classes
class RPCWidgetHandler:
"""Handler class for creating widgets from RPC messages."""
def __init__(self):
self._widget_classes = None
@property
def widget_classes(self) -> dict[str, type[BECWidget]]:
"""
Get the available widget classes.
Returns:
dict: The available widget classes.
"""
if self._widget_classes is None:
self.update_available_widgets()
return self._widget_classes # type: ignore
def update_available_widgets(self):
"""
Update the available widgets.
Returns:
None
"""
self._widget_classes = (
get_custom_classes("bec_widgets") + get_all_plugin_widgets()
).as_dict(IGNORE_WIDGETS)
def create_widget(self, widget_type, **kwargs) -> BECWidget:
"""
Create a widget from an RPC message.
Args:
widget_type(str): The type of the widget.
name (str): The name of the widget.
**kwargs: The keyword arguments for the widget.
Returns:
widget(BECWidget): The created widget.
"""
widget_class = self.widget_classes.get(widget_type) # type: ignore
if widget_class:
return widget_class(**kwargs)
raise ValueError(f"Unknown widget type: {widget_type}")
widget_handler = RPCWidgetHandler()
@@ -5,11 +5,9 @@ import json
import os
import signal
import sys
import traceback
from contextlib import redirect_stderr, redirect_stdout
import darkdetect
import shiboken6
from bec_lib.logger import bec_logger
from bec_lib.service_config import ServiceConfig
from bec_qthemes import apply_theme
@@ -20,8 +18,8 @@ from qtpy.QtWidgets import QApplication
import bec_widgets
from bec_widgets.applications.launch_window import LaunchWindow
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.utils.bec_dispatcher import BECDispatcher
from bec_widgets.utils.rpc_register import RPCRegister
logger = bec_logger.logger
@@ -64,7 +62,6 @@ class GUIServer:
self.app: QApplication | None = None
self.launcher_window: LaunchWindow | None = None
self.dispatcher: BECDispatcher | None = None
self._shutdown_started = False
def start(self):
"""
@@ -76,7 +73,6 @@ class GUIServer:
bec_logger._stderr_log_level = bec_logger.LOGLEVEL.ERROR
bec_logger._update_sinks()
bec_logger.disabled_modules = ["bec_lib.scan_items"]
with redirect_stdout(SimpleFileLikeFromLogOutputFunc(logger.info)): # type: ignore
with redirect_stderr(SimpleFileLikeFromLogOutputFunc(logger.error)): # type: ignore
self._run()
@@ -97,7 +93,6 @@ class GUIServer:
"""
Run the GUI server.
"""
logger.info("Starting GUIServer", repr(self))
self.app = QApplication(sys.argv)
if darkdetect.isDark():
apply_theme("dark")
@@ -106,27 +101,35 @@ class GUIServer:
self.app.setApplicationName("BEC")
self.app.gui_id = self.gui_id # type: ignore
self.app.gui_server = self # type: ignore # make server accessible from QApplication for getattr in widgets
self.setup_bec_icon()
service_config = self._get_service_config()
self.dispatcher = BECDispatcher(config=service_config, gui_id=self.gui_id)
# self.dispatcher.start_cli_server(gui_id=self.gui_id)
if self.gui_class:
self.launcher_window = LaunchWindow(
gui_id=f"{self.gui_id}:launcher",
launch_gui_class=self.gui_class,
launch_gui_id=self.gui_class_id,
)
else:
self.launcher_window = LaunchWindow(gui_id=f"{self.gui_id}:launcher")
self.launcher_window = LaunchWindow(gui_id=f"{self.gui_id}:launcher")
self.launcher_window.setAttribute(Qt.WA_ShowWithoutActivating) # type: ignore
self.app.aboutToQuit.connect(self.shutdown)
self.app.setQuitOnLastWindowClosed(True)
self.app.setQuitOnLastWindowClosed(False)
signal.signal(signal.SIGINT, self.request_shutdown)
signal.signal(signal.SIGTERM, self.request_shutdown)
if self.gui_class:
# If the server is started with a specific gui class, we launch it.
# This will automatically hide the launcher.
self.launcher_window.launch(self.gui_class, name=self.gui_class_id)
def sigint_handler(*args):
# display message, for people to let it terminate gracefully
print("Caught SIGINT, exiting")
# Widgets should be all closed.
with RPCRegister.delayed_broadcast():
for widget in QApplication.instance().topLevelWidgets(): # type: ignore
widget.close()
if self.app:
self.app.quit()
signal.signal(signal.SIGINT, sigint_handler)
signal.signal(signal.SIGTERM, sigint_handler)
sys.exit(self.app.exec())
@@ -143,67 +146,15 @@ class GUIServer:
)
self.app.setWindowIcon(icon)
def request_shutdown(self, signum=None, _frame=None):
"""
Request Qt application shutdown from an RPC call or OS signal.
Cleanup itself is handled by ``shutdown()``, which is connected to
``QApplication.aboutToQuit``. Calling it directly here would run BEC/RPC
teardown before Qt has processed the widget close events.
"""
signal_name = signal.Signals(signum).name if signum is not None else "shutdown"
pid = os.getpid()
if self.app is None:
logger.info(f"Caught {signal_name}, shutting down GUI server pid={pid} without app")
self.shutdown()
return
widgets = [
f"{widget.__class__.__name__}(objectName={widget.objectName()!r})"
for widget in self.app.topLevelWidgets()
]
logger.info(
f"Caught {signal_name}, requesting GUI server shutdown pid={pid} "
f"top_level_widgets={widgets}"
)
with RPCRegister.delayed_broadcast():
for widget in self.app.topLevelWidgets():
widget.close()
self.app.quit()
@staticmethod
def _run_shutdown_step(step: str, callback):
try:
callback()
except Exception as exc:
logger.error(
f"GUIServer shutdown step failed pid={os.getpid()} step={step}: {exc}\n"
f"{traceback.format_exc()}"
)
def shutdown(self):
if self._shutdown_started:
return
self._shutdown_started = True
logger.info(f"Shutdown GUIServer pid={os.getpid()} {repr(self)}")
def close_launcher_window():
if self.launcher_window and shiboken6.isValid(self.launcher_window):
self.launcher_window.close()
self.launcher_window.deleteLater()
def stop_pylsp_server():
if pylsp_server.is_running():
pylsp_server.stop()
def stop_dispatcher():
if self.dispatcher:
self.dispatcher.stop_cli_server()
self.dispatcher.disconnect_all()
self._run_shutdown_step("close_launcher_window", close_launcher_window)
self._run_shutdown_step("stop_pylsp_server", stop_pylsp_server)
self._run_shutdown_step("stop_dispatcher", stop_dispatcher)
"""
Shutdown the GUI server.
"""
if pylsp_server.is_running():
pylsp_server.stop()
if self.dispatcher:
self.dispatcher.stop_cli_server()
self.dispatcher.disconnect_all()
def main():
@@ -25,9 +25,11 @@ from qtpy.QtWidgets import (
QWidget,
)
from bec_widgets import BECWidget
from bec_widgets.cli.rpc.rpc_widget_handler import widget_handler
from bec_widgets.utils.colors import apply_theme
from bec_widgets.utils.rpc_widget_handler import widget_handler
from bec_widgets.utils.widget_io import WidgetHierarchy as wh
from bec_widgets.widgets.containers.dock import BECDockArea
from bec_widgets.widgets.editors.jupyter_console.jupyter_console import BECJupyterConsole
@@ -206,6 +208,7 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
def _populate_registry_widgets(self):
try:
widget_handler.update_available_widgets()
items = sorted(widget_handler.widget_classes.keys())
except Exception as exc:
print(f"Failed to load registered widgets: {exc}")
@@ -334,13 +337,20 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
If kwargs does not contain `object_name`, it will default to the provided shortcut.
"""
# Ensure registry is loaded
widget_handler.update_available_widgets()
cls = widget_handler.widget_classes.get(widget_type)
if cls is None:
raise ValueError(f"Unknown registered widget type: {widget_type}")
if kwargs is None:
kwargs = {"object_name": shortcut}
else:
kwargs = dict(kwargs)
kwargs.setdefault("object_name", shortcut)
widget = widget_handler.create_widget(widget_type, **kwargs)
# Instantiate and add
widget = cls(**kwargs)
if not isinstance(widget, QWidget):
raise TypeError(
f"Instantiated object for type '{widget_type}' is not a QWidget: {type(widget)}"
@@ -356,6 +366,15 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
def closeEvent(self, event):
"""Override to handle things when main window is closed."""
# clean up any widgets that might have custom cleanup
try:
# call cleanup on known containers if present
dock = self._widgets_by_name.get("dock")
if isinstance(dock, BECDockArea):
dock.cleanup()
dock.close()
except Exception:
pass
# Ensure the embedded kernel and BEC client are shut down before window teardown
self.console.shutdown_kernel()
+5 -30
View File
@@ -1,7 +1,5 @@
# pylint: skip-file
from unittest.mock import MagicMock
from bec_lib.config_helper import ConfigHelper
from bec_lib.device import Device as BECDevice
from bec_lib.device import Positioner as BECPositioner
from bec_lib.device import ReadoutPriority
@@ -15,7 +13,7 @@ class FakeDevice(BECDevice):
super().__init__(name=name)
self._enabled = enabled
self.signals = {self.name: {"value": 1.0}}
self._description = {self.name: {"source": self.name, "dtype": "number", "shape": []}}
self.description = {self.name: {"source": self.name, "dtype": "number", "shape": []}}
self._readout_priority = readout_priority
self._config = {
"readoutPriority": "baseline",
@@ -26,16 +24,6 @@ class FakeDevice(BECDevice):
"readOnly": False,
"name": self.name,
}
self._info = {
"signals": {
self.name: {
"kind_str": "hinted",
"component_name": self.name,
"obj_name": self.name,
"signal_class": "Signal",
}
}
}
@property
def readout_priority(self):
@@ -74,7 +62,7 @@ class FakeDevice(BECDevice):
Returns:
dict: Description of the device
"""
return self._description
return self.description
class FakePositioner(BECPositioner):
@@ -96,7 +84,7 @@ class FakePositioner(BECPositioner):
self._limits = limits
self._readout_priority = readout_priority
self.signals = {self.name: {"value": 1.0}}
self._description = {self.name: {"source": self.name, "dtype": "number", "shape": []}}
self.description = {self.name: {"source": self.name, "dtype": "number", "shape": []}}
self._config = {
"readoutPriority": "baseline",
"deviceClass": "ophyd_devices.SimPositioner",
@@ -176,7 +164,7 @@ class FakePositioner(BECPositioner):
Returns:
dict: Description of the device
"""
return self._description
return self.description
@property
def precision(self):
@@ -220,9 +208,7 @@ class Device(FakeDevice):
class DMMock:
def __init__(self, *args, **kwargs):
self._service = args[0]
self.config_helper = ConfigHelper(self._service.connector, self._service._service_name)
def __init__(self):
self.devices = DeviceContainer()
self.enabled_devices = [device for device in self.devices if device.enabled]
@@ -269,17 +255,6 @@ class DMMock:
signals.append((device_name, signal_name, signal_info))
return signals
def _get_redis_device_config(self) -> list[dict]:
"""Mock method to emulate DeviceManager._get_redis_device_config."""
configs = []
for device in self.devices.values():
configs.append(device._config)
return configs
def initialize(*_): ...
def shutdown(self): ...
DEVICES = [
FakePositioner("samx", limits=[-10, 10], read_value=2.0),
+12
View File
@@ -1 +1,13 @@
from qtpy.QtWebEngineWidgets import QWebEngineView
from .bec_connector import BECConnector, ConnectionConfig
from .bec_dispatcher import BECDispatcher
from .bec_table import BECTable
from .colors import Colors
from .container_utils import WidgetContainerUtils
from .crosshair import Crosshair
from .entry_validator import EntryValidator
from .layout_manager import GridLayoutManager
from .rpc_decorator import register_rpc_methods, rpc_public
from .ui_loader import UILoader
from .validator_delegate import DoubleValidationDelegate
+32 -131
View File
@@ -8,21 +8,20 @@ import uuid
from datetime import datetime
from typing import TYPE_CHECKING, Optional
import shiboken6 as shb
from bec_lib.logger import bec_logger
from bec_lib.utils.import_utils import lazy_import_from
from pydantic import BaseModel, Field, field_validator
from qtpy.QtCore import Property, QObject, QRunnable, QThreadPool, Signal
from qtpy.QtCore import QObject, QRunnable, QThreadPool, QTimer, Signal
from qtpy.QtWidgets import QApplication
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.utils.error_popups import ErrorPopupUtility, SafeSlot
from bec_widgets.utils.name_utils import sanitize_namespace
from bec_widgets.utils.rpc_register import RPCRegister
from bec_widgets.utils.widget_io import WidgetHierarchy
from bec_widgets.utils.yaml_dialog import load_yaml, load_yaml_gui, save_yaml, save_yaml_gui
if TYPE_CHECKING: # pragma: no cover
from bec_widgets.utils.bec_dispatcher import BECDispatcher
from bec_widgets.widgets.containers.dock import BECDock
else:
BECDispatcher = lazy_import_from("bec_widgets.utils.bec_dispatcher", ("BECDispatcher",))
@@ -87,9 +86,8 @@ class BECConnector:
config: ConnectionConfig | None = None,
gui_id: str | None = None,
object_name: str | None = None,
parent_dock: BECDock | None = None, # TODO should go away -> issue created #473
root_widget: bool = False,
rpc_exposed: bool = True,
rpc_passthrough_children: bool = True,
**kwargs,
):
"""
@@ -100,17 +98,12 @@ class BECConnector:
config(ConnectionConfig, optional): The connection configuration with specific gui id.
gui_id(str, optional): The GUI ID.
object_name(str, optional): The object name.
parent_dock(BECDock, optional): The parent dock.# TODO should go away -> issue created #473
root_widget(bool, optional): If set to True, the parent_id will be always set to None, thus enforcing that the widget is accessible as a root widget of the BECGuiClient object.
rpc_exposed(bool, optional): If set to False, this instance is excluded from RPC registry broadcast and CLI namespace discovery.
rpc_passthrough_children(bool, optional): Only relevant when ``rpc_exposed=False``.
If True, RPC-visible children rebind to the next visible ancestor.
If False (default), children stay hidden behind this widget.
**kwargs:
"""
# Extract object_name from kwargs to not pass it to Qt class
object_name = object_name or kwargs.pop("objectName", None)
if object_name is not None:
object_name = sanitize_namespace(object_name)
# Ensure the parent is always the first argument for QObject
parent = kwargs.pop("parent", None)
# This initializes the QObject or any qt related class BECConnector has to be used from this line down with QObject, otherwise hierarchy logic will not work
@@ -126,6 +119,7 @@ class BECConnector:
# BEC related connections
self.bec_dispatcher = BECDispatcher(client=client)
self.client = self.bec_dispatcher.client if client is None else client
self._parent_dock = parent_dock # TODO also remove at some point -> issue created #473
self.rpc_register = RPCRegister()
if not self.client in BECConnector.EXIT_HANDLERS:
@@ -133,13 +127,8 @@ class BECConnector:
# the function depends on BECClient, and BECDispatcher
@SafeSlot()
def terminate(client=self.client, dispatcher=self.bec_dispatcher):
app = QApplication.instance()
gui_server = getattr(app, "gui_server", None)
if gui_server and hasattr(gui_server, "shutdown"):
gui_server.shutdown()
logger.info("Disconnecting", repr(dispatcher))
dispatcher.disconnect_all()
dispatcher.stop_cli_server()
try: # shutdown ophyd threads if any
from ophyd._pyepics_shim import _dispatcher
@@ -167,7 +156,7 @@ class BECConnector:
)
self.config = ConnectionConfig(widget_class=self.__class__.__name__)
# If the gui_id is passed, it should be respected. However, this should be revisited since
# If the gui_id is passed, it should be respected. However, this should be revisted since
# the gui_id has to be unique, and may no longer be.
if gui_id:
self.config.gui_id = gui_id
@@ -195,54 +184,19 @@ class BECConnector:
# If set to True, the parent_id will be always set to None, thus enforcing that the widget is accessible as a root widget of the BECGuiClient object.
self.root_widget = root_widget
# If set to False, this instance is not exposed through RPC at all.
self.rpc_exposed = bool(rpc_exposed)
# If True on a hidden parent (rpc_exposed=False), children can bubble up to
# the next visible RPC ancestor.
self.rpc_passthrough_children = bool(rpc_passthrough_children)
self._update_object_name()
QTimer.singleShot(0, self._update_object_name)
@property
def parent_id(self) -> str | None:
try:
if self.root_widget:
return None
connector_parent = self._get_rpc_parent_ancestor()
connector_parent = WidgetHierarchy._get_becwidget_ancestor(self)
return connector_parent.gui_id if connector_parent else None
except:
logger.error(f"Error getting parent_id for {self.__class__.__name__}")
def _get_rpc_parent_ancestor(self) -> BECConnector | None:
"""
Find the nearest ancestor that is RPC-addressable.
Rules:
- If an ancestor has ``rpc_exposed=False``, it is an explicit visibility
boundary unless ``rpc_passthrough_children=True``.
- If an ancestor has ``RPC=False`` (but remains rpc_exposed), it is treated
as structural and children continue to the next ancestor.
- Lookup always happens through ``WidgetHierarchy.get_becwidget_ancestor``
so plain ``QWidget`` nodes between connectors are ignored.
"""
current = self
while True:
parent = WidgetHierarchy.get_becwidget_ancestor(current)
if parent is None:
return None
if not getattr(parent, "rpc_exposed", True):
if getattr(parent, "rpc_passthrough_children", False):
current = parent
continue
return parent
if getattr(parent, "RPC", True):
return parent
current = parent
return None
def change_object_name(self, name: str) -> None:
"""
Change the object name of the widget. Unregister old name and register the new one.
@@ -252,7 +206,7 @@ class BECConnector:
"""
self.rpc_register.remove_rpc(self)
self.setObjectName(name.replace("-", "_").replace(" ", "_"))
self._update_object_name()
QTimer.singleShot(0, self._update_object_name)
def _update_object_name(self) -> None:
"""
@@ -261,13 +215,11 @@ class BECConnector:
"""
# 1) Enforce unique objectName among siblings with the same BECConnector parent
self._enforce_unique_sibling_name()
# 2) Register the object for RPC unless instance-level exposure is disabled.
if getattr(self, "rpc_exposed", True):
self.rpc_register.add_rpc(self)
# 2) Register the object for RPC
self.rpc_register.add_rpc(self)
try:
self.name_established.emit(self.object_name)
except RuntimeError as e:
logger.warning(f"Error emitting name_established signal: {e}")
except RuntimeError:
return
def _enforce_unique_sibling_name(self):
@@ -278,20 +230,23 @@ class BECConnector:
- If there's a nearest BECConnector parent, only compare with children of that parent.
- If parent is None (i.e., top-level object), compare with all other top-level BECConnectors.
"""
if not shb.isValid(self):
return
parent_bec = WidgetHierarchy.get_becwidget_ancestor(self)
QApplication.sendPostedEvents()
parent_bec = WidgetHierarchy._get_becwidget_ancestor(self)
if parent_bec:
# We have a parent => only compare with siblings under that parent
siblings = [sib for sib in parent_bec.findChildren(BECConnector) if shb.isValid(sib)]
siblings = parent_bec.findChildren(BECConnector)
else:
# No parent => treat all top-level BECConnectors as siblings
# Use RPCRegister to avoid QApplication.allWidgets() during event processing.
connections = self.rpc_register.list_all_connections().values()
all_bec = [w for w in connections if isinstance(w, BECConnector) and shb.isValid(w)]
siblings = [w for w in all_bec if WidgetHierarchy.get_becwidget_ancestor(w) is None]
# 1) Gather all BECConnectors from QApplication
all_widgets = QApplication.allWidgets()
all_bec = [w for w in all_widgets if isinstance(w, BECConnector)]
# 2) "Top-level" means closest BECConnector parent is None
top_level_bec = [
w for w in all_bec if WidgetHierarchy._get_becwidget_ancestor(w) is None
]
# 3) We are among these top-level siblings
siblings = top_level_bec
# Collect used names among siblings
used_names = {sib.objectName() for sib in siblings if sib is not self}
@@ -319,8 +274,6 @@ class BECConnector:
Args:
name (str): The new object name.
"""
# sanitize before setting to avoid issues with Qt object names and RPC namespaces
name = sanitize_namespace(name)
super().setObjectName(name)
self.object_name = name
if self.rpc_register.object_is_registered(self):
@@ -399,7 +352,7 @@ class BECConnector:
"""
self.config = config
# FIXME some thoughts are required to decide how this should work with rpc registry
# FIXME some thoughts are required to decide how thhis should work with rpc registry
def apply_config(self, config: dict, generate_new_id: bool = True) -> None:
"""
Apply the configuration to the widget.
@@ -417,7 +370,7 @@ class BECConnector:
else:
self.gui_id = self.config.gui_id
# FIXME some thoughts are required to decide how this should work with rpc registry
# FIXME some thoughts are required to decide how thhis should work with rpc registry
def load_config(self, path: str | None = None, gui: bool = False):
"""
Load the configuration of the widget from YAML.
@@ -503,8 +456,12 @@ class BECConnector:
def remove(self):
"""Cleanup the BECConnector"""
# If the widget is attached to a dock, remove it from the dock.
# TODO this should be handled by dock and dock are not by BECConnector -> issue created #473
if self._parent_dock is not None:
self._parent_dock.delete(self.object_name)
# If the widget is from Qt, trigger its close method.
if hasattr(self, "close"):
elif hasattr(self, "close"):
self.close()
# If the widget is neither from a Dock nor from Qt, remove it from the RPC registry.
# i.e. Curve Item from Waveform
@@ -528,62 +485,6 @@ class BECConnector:
else:
return self.config
def export_settings(self) -> dict:
"""
Export the settings of the widget as dict.
Returns:
dict: The exported settings of the widget.
"""
# We first get all qproperties that were defined in a bec_widgets class
objs = self._get_bec_meta_objects()
settings = {}
for prop_name in objs.keys():
try:
prop_value = getattr(self, prop_name)
settings[prop_name] = prop_value
except Exception as e:
logger.warning(
f"Could not export property '{prop_name}' from '{self.__class__.__name__}': {e}"
)
return settings
def load_settings(self, settings: dict) -> None:
"""
Load the settings of the widget from dict.
Args:
settings (dict): The settings to load into the widget.
"""
objs = self._get_bec_meta_objects()
for prop_name, prop_value in settings.items():
if prop_name in objs:
try:
setattr(self, prop_name, prop_value)
except Exception as e:
logger.warning(
f"Could not load property '{prop_name}' into '{self.__class__.__name__}': {e}"
)
def _get_bec_meta_objects(self) -> dict:
"""
Get BEC meta objects for the widget.
Returns:
dict: BEC meta objects.
"""
if not isinstance(self, QObject):
return {}
objects = {}
for name, attr in vars(self.__class__).items():
if isinstance(attr, Property):
# Check if the property is a SafeProperty
is_safe_property = getattr(attr.fget, "__is_safe_getter__", False)
if is_safe_property:
objects[name] = attr
return objects
# --- Example usage of BECConnector: running a simple task ---
if __name__ == "__main__": # pragma: no cover
+13 -52
View File
@@ -3,9 +3,8 @@ from __future__ import annotations
import collections
import random
import string
import time
from collections.abc import Callable
from typing import TYPE_CHECKING, Any, DefaultDict, Hashable, Union
from typing import TYPE_CHECKING, DefaultDict, Hashable, Union
import louie
import redis
@@ -16,7 +15,6 @@ from bec_lib.service_config import ServiceConfig
from qtpy.QtCore import QObject
from qtpy.QtCore import Signal as pyqtSignal
from bec_widgets.utils.rpc_logging import elapsed_seconds, format_elapsed
from bec_widgets.utils.serialization import register_serializer_extension
logger = bec_logger.logger
@@ -27,39 +25,6 @@ if TYPE_CHECKING: # pragma: no cover
from bec_widgets.utils.rpc_server import RPCServer
def _log_rpc_dispatcher_receive(msg_content: Any, metadata: Any) -> None:
if not isinstance(msg_content, dict) or not isinstance(metadata, dict):
return
request_id = metadata.get("request_id")
method = msg_content.get("action")
parameter = msg_content.get("parameter")
if request_id is None or method is None or not isinstance(parameter, dict):
return
dispatch_received_at = time.time()
sent_at = metadata.get("sent_at")
deadline = metadata.get("deadline")
timeout = metadata.get("timeout")
dispatch_latency = elapsed_seconds(sent_at, dispatch_received_at)
stale_on_dispatch = deadline is not None and dispatch_received_at > deadline
target_gui_id = parameter.get("gui_id") or metadata.get("target_gui_id")
logger.info(
"GUI RPC dispatcher received request before Qt callback emit "
f"request_id={request_id} method={method} receiver={metadata.get('receiver')} "
f"target_gui_id={target_gui_id} object_name={metadata.get('object_name')} "
f"timeout={timeout} dispatch_latency_s={format_elapsed(dispatch_latency)} "
f"stale_on_dispatch={stale_on_dispatch}"
)
if stale_on_dispatch:
logger.warning(
"GUI RPC dispatcher received request after client timeout deadline "
f"request_id={request_id} method={method} receiver={metadata.get('receiver')} "
f"target_gui_id={target_gui_id} object_name={metadata.get('object_name')} "
f"timeout={timeout} dispatch_latency_s={format_elapsed(dispatch_latency)}"
)
class QtThreadSafeCallback(QObject):
"""QtThreadSafeCallback is a wrapper around a callback function to make it thread-safe for Qt."""
@@ -123,12 +88,10 @@ class QtRedisConnector(RedisConnector):
# we can notice kwargs are lost when passed to Qt slot
metadata = msg.metadata
_log_rpc_dispatcher_receive(msg.content, metadata)
cb(msg.content, metadata)
else:
# from stream
msg = msg["data"]
_log_rpc_dispatcher_receive(msg.content, msg.metadata)
cb(msg.content, msg.metadata)
@@ -160,16 +123,17 @@ class BECDispatcher:
self._registered_slots: DefaultDict[Hashable, QtThreadSafeCallback] = (
collections.defaultdict()
)
self.client = client
if client is None:
if config is not None and not isinstance(config, ServiceConfig):
# config is supposed to be a path
config = ServiceConfig(config)
if self.client is None:
if config is not None:
if not isinstance(config, ServiceConfig):
# config is supposed to be a path
config = ServiceConfig(config)
self.client = BECClient(
config=config, connector_cls=QtRedisConnector, name="BECWidgets"
)
else:
self.client = client
if self.client.started:
# have to reinitialize client to use proper connector
logger.info("Shutting down BECClient to switch to QtRedisConnector")
@@ -212,15 +176,12 @@ class BECDispatcher:
cb_info (dict | None): A dictionary containing information about the callback. Defaults to None.
"""
qt_slot = QtThreadSafeCallback(cb=slot, cb_info=cb_info)
if not self.client.connector.any_stream_is_registered(topics, qt_slot):
if qt_slot not in self._registered_slots:
self._registered_slots[qt_slot] = qt_slot
qt_slot = self._registered_slots[qt_slot]
self.client.connector.register(topics, cb=qt_slot, **kwargs)
topics_str, _ = self.client.connector._convert_endpointinfo(topics)
qt_slot.topics.update(set(topics_str))
else:
logger.warning(f"Attempted to create duplicate stream subscription for {topics=}")
if qt_slot not in self._registered_slots:
self._registered_slots[qt_slot] = qt_slot
qt_slot = self._registered_slots[qt_slot]
self.client.connector.register(topics, cb=qt_slot, **kwargs)
topics_str, _ = self.client.connector._convert_endpointinfo(topics)
qt_slot.topics.update(set(topics_str))
def disconnect_slot(
self, slot: Callable, topics: EndpointInfo | str | list[EndpointInfo] | list[str]
-87
View File
@@ -1,87 +0,0 @@
"""
Login dialog for user authentication.
The Login Widget is styled in a Material Design style and emits
the entered credentials through a signal for further processing.
"""
from qtpy.QtCore import Qt, Signal
from qtpy.QtWidgets import QLabel, QLineEdit, QPushButton, QVBoxLayout, QWidget
class BECLogin(QWidget):
"""Login dialog for user authentication in Material Design style."""
credentials_entered = Signal(str, str)
def __init__(self, parent=None):
super().__init__(parent=parent)
# Only displayed if this widget as standalone widget, and not embedded in another widget
self.setWindowTitle("Login")
title = QLabel("Sign in", parent=self)
title.setAlignment(Qt.AlignmentFlag.AlignCenter)
title.setStyleSheet("""
#QLabel
{
font-size: 18px;
font-weight: 600;
}
""")
self.username = QLineEdit(parent=self)
self.username.setPlaceholderText("Username")
self.password = QLineEdit(parent=self)
self.password.setPlaceholderText("Password")
self.password.setEchoMode(QLineEdit.EchoMode.Password)
self.ok_btn = QPushButton("Sign in", parent=self)
self.ok_btn.setDefault(True)
self.ok_btn.clicked.connect(self._emit_credentials)
# If the user presses Enter in the password field, trigger the OK button click
self.password.returnPressed.connect(self.ok_btn.click)
# Build Layout
layout = QVBoxLayout(self)
layout.setContentsMargins(32, 32, 32, 32)
layout.setSpacing(16)
layout.addWidget(title)
layout.addSpacing(8)
layout.addWidget(self.username)
layout.addWidget(self.password)
layout.addSpacing(12)
layout.addWidget(self.ok_btn)
self.username.setFocus()
self.setStyleSheet("""
QLineEdit {
padding: 8px;
}
""")
def _clear_password(self):
"""Clear the password field."""
self.password.clear()
def _emit_credentials(self):
"""Emit credentials and clear the password field."""
self.credentials_entered.emit(self.username.text().strip(), self.password.text())
self._clear_password()
if __name__ == "__main__": # pragma: no cover
import sys
from bec_qthemes import apply_theme
from qtpy.QtWidgets import QApplication
app = QApplication(sys.argv)
apply_theme("light")
dialog = BECLogin()
dialog.credentials_entered.connect(lambda u, p: print(f"Username: {u}, Password: {p}"))
dialog.show()
sys.exit(app.exec_())
+4 -65
View File
@@ -4,7 +4,6 @@ import importlib.metadata
import inspect
import pkgutil
import traceback
from functools import lru_cache
from importlib import util as importlib_util
from importlib.machinery import FileFinder, ModuleSpec, SourceFileLoader
from types import ModuleType
@@ -12,11 +11,7 @@ from typing import Generator
from bec_lib.logger import bec_logger
from bec_widgets.utils.plugin_utils import (
BECClassContainer,
BECClassInfo,
rpc_widget_registry_from_source,
)
from bec_widgets.utils.plugin_utils import BECClassContainer, BECClassInfo
logger = bec_logger.logger
@@ -58,14 +53,6 @@ def _submodule_by_name(module: ModuleType, name: str):
return None
def _submodule_spec_by_name(module: ModuleType, name: str) -> ModuleSpec | None:
for module_info in pkgutil.iter_modules(module.__path__):
if module_info.name != name or not isinstance(module_info.module_finder, FileFinder):
continue
return module_info.module_finder.find_spec(module_info.name)
return None
def _get_widgets_from_module(module: ModuleType) -> BECClassContainer:
"""Find any BECWidget subclasses in the given module and return them with their info."""
from bec_widgets.utils.bec_widget import BECWidget # avoid circular import
@@ -103,64 +90,16 @@ def get_plugin_client_module() -> ModuleType | None:
return _submodule_by_name(plugin, "client") if (plugin := user_widget_plugin()) else None
def get_plugin_designer_module() -> ModuleType | None:
"""If there is a plugin repository installed, return the designer module."""
return (
_submodule_by_name(plugin, "designer_plugins") if (plugin := user_widget_plugin()) else None
)
@lru_cache
def get_plugin_rpc_widget_registry() -> dict[str, tuple[str, str]]:
"""If there is a plugin repository installed, return the RPC widget registry."""
plugin = user_widget_plugin()
if plugin is None:
return {}
client_spec = _submodule_spec_by_name(plugin, "client")
if client_spec is not None and client_spec.origin:
try:
return rpc_widget_registry_from_source(client_spec.origin)
except (OSError, SyntaxError) as exc:
logger.warning(f"Could not parse plugin RPC widget registry: {exc}")
client_module = get_plugin_client_module()
if client_module is None:
return {}
registry = {}
for plugin_name, plugin_class in inspect.getmembers(client_module, inspect.isclass):
if hasattr(plugin_class, "_IMPORT_MODULE"):
registry[plugin_name] = (plugin_class._IMPORT_MODULE, plugin_class.__name__)
return registry
@lru_cache
def get_plugin_designer_registry() -> dict[str, tuple[str, str]]:
"""If there is a plugin repository installed, return the designer plugin registry."""
designer_module = get_plugin_designer_module()
if designer_module and hasattr(designer_module, "designer_plugins"):
return designer_module.designer_plugins
return {}
@lru_cache
def get_plugin_widget_icons() -> dict[str, str]:
"""If there is a plugin repository installed, return the designer widget icon registry."""
designer_module = get_plugin_designer_module()
if designer_module and hasattr(designer_module, "widget_icons"):
return designer_module.widget_icons
return {}
def get_all_plugin_widgets() -> BECClassContainer:
"""If there is a plugin repository installed, load all widgets from it."""
if plugin := user_widget_plugin():
return _all_widgets_from_all_submods(plugin)
return BECClassContainer()
else:
return BECClassContainer()
if __name__ == "__main__": # pragma: no cover
widgets = get_plugin_rpc_widget_registry()
client = get_plugin_client_module()
print(get_all_plugin_widgets())
...
+75 -122
View File
@@ -6,20 +6,18 @@ from typing import TYPE_CHECKING
import shiboken6
from bec_lib.logger import bec_logger
from qtpy.QtCore import QBuffer, QByteArray, QIODevice, QObject, Qt
from qtpy.QtGui import QFont, QPixmap
from qtpy.QtWidgets import QApplication, QFileDialog, QLabel, QVBoxLayout, QWidget
from qtpy.QtGui import QPixmap
from qtpy.QtWidgets import QApplication, QFileDialog, QWidget
import bec_widgets.widgets.containers.qt_ads as QtAds
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.utils.bec_connector import BECConnector, ConnectionConfig
from bec_widgets.utils.busy_loader import install_busy_loader
from bec_widgets.utils.error_popups import SafeConnect, SafeSlot
from bec_widgets.utils.rpc_decorator import rpc_timeout
from bec_widgets.utils.rpc_register import RPCRegister
from bec_widgets.utils.widget_io import WidgetHierarchy
from bec_widgets.widgets.utility.spinner.spinner import SpinnerWidget
if TYPE_CHECKING: # pragma: no cover
from bec_widgets.utils.busy_loader import BusyLoaderOverlay
from bec_widgets.widgets.containers.dock import BECDock
logger = bec_logger.logger
@@ -40,6 +38,8 @@ class BECWidget(BECConnector):
gui_id: str | None = None,
theme_update: bool = False,
start_busy: bool = False,
busy_text: str = "Loading…",
parent_dock: BECDock | None = None, # TODO should go away -> issue created #473
**kwargs,
):
"""
@@ -58,7 +58,9 @@ class BECWidget(BECConnector):
theme_update(bool, optional): Whether to subscribe to theme updates. Defaults to False. When set to True, the
widget's apply_theme method will be called when the theme changes.
"""
super().__init__(client=client, config=config, gui_id=gui_id, **kwargs)
super().__init__(
client=client, config=config, gui_id=gui_id, parent_dock=parent_dock, **kwargs
)
if not isinstance(self, QObject):
raise RuntimeError(f"{repr(self)} is not a subclass of QWidget")
if theme_update:
@@ -66,14 +68,18 @@ class BECWidget(BECConnector):
self._connect_to_theme_change()
# Initialize optional busy loader overlay utility (lazy by default)
self._busy_overlay: "BusyLoaderOverlay" | None = None
self._busy_state_widget: QWidget | None = None
self._busy_overlay = None
self._loading = False
self._busy_overlay = self._install_busy_loader()
if start_busy and isinstance(self, QWidget):
self._show_busy_overlay()
self._loading = True
try:
overlay = self._ensure_busy_overlay(busy_text=busy_text)
if overlay is not None:
overlay.setGeometry(self.rect())
overlay.raise_()
overlay.show()
self._loading = True
except Exception as exc:
logger.debug(f"Busy loader init skipped: {exc}")
def _connect_to_theme_change(self):
"""Connect to the theme change signal."""
@@ -94,109 +100,48 @@ class BECWidget(BECConnector):
self._update_overlay_theme(theme)
self.apply_theme(theme)
def create_busy_state_widget(self) -> QWidget:
"""
Method to create a custom busy state widget to be shown in the busy overlay.
Child classes should overrid this method to provide a custom widget if desired.
Returns:
QWidget: The custom busy state widget.
NOTE:
The implementation here is a SpinnerWidget with a "Loading..." label. This is the default
busy state widget for all BECWidgets. However, child classes with specific needs for the
busy state can easily overrite this method to provide a custom widget. The signature of
the method must be preserved to ensure compatibility with the busy overlay system. If
the widget provides a 'cleanup' method, it will be called when the overlay is cleaned up.
The widget may connect to the _busy_overlay signals foreground_color_changed and
scrim_color_changed to update its colors when the theme changes.
"""
# Widget
class BusyStateWidget(QWidget):
def __init__(self, parent=None):
super().__init__(parent)
# label
label = QLabel("Loading...", self)
label.setAlignment(Qt.AlignHCenter | Qt.AlignVCenter)
f = QFont(label.font())
f.setBold(True)
f.setPointSize(f.pointSize() + 1)
label.setFont(f)
# spinner
spinner = SpinnerWidget(self)
spinner.setFixedSize(42, 42)
# Layout
lay = QVBoxLayout(self)
lay.setContentsMargins(24, 24, 24, 24)
lay.setSpacing(10)
lay.addStretch(1)
lay.addWidget(spinner, 0, Qt.AlignHCenter)
lay.addWidget(label, 0, Qt.AlignHCenter)
lay.addStretch(1)
self.setLayout(lay)
def showEvent(self, event):
"""Show event to start the spinner."""
super().showEvent(event)
for child in self.findChildren(SpinnerWidget):
child.start()
def hideEvent(self, event):
"""Hide event to stop the spinner."""
super().hideEvent(event)
for child in self.findChildren(SpinnerWidget):
child.stop()
widget = BusyStateWidget(self)
return widget
def _install_busy_loader(self) -> "BusyLoaderOverlay" | None:
"""
Create the busy overlay on demand and cache it in _busy_overlay.
def _ensure_busy_overlay(self, *, busy_text: str = "Loading…"):
"""Create the busy overlay on demand and cache it in _busy_overlay.
Returns the overlay instance or None if not a QWidget.
"""
if not isinstance(self, QWidget):
return None
overlay = getattr(self, "_busy_overlay", None)
if overlay is None:
from bec_widgets.utils.busy_loader import install_busy_loader
overlay = install_busy_loader(target=self, start_loading=False)
overlay = install_busy_loader(self, text=busy_text, start_loading=False)
self._busy_overlay = overlay
# Create and set the busy state widget
self._busy_state_widget = self.create_busy_state_widget()
self._busy_overlay.set_widget(self._busy_state_widget)
return overlay
def _show_busy_overlay(self) -> None:
def _init_busy_loader(self, *, start_busy: bool = False, busy_text: str = "Loading…") -> None:
"""Create and attach the loading overlay to this widget if QWidget is present."""
if not isinstance(self, QWidget):
return
if self._busy_overlay is not None:
self._busy_overlay.setGeometry(self.rect()) # pylint: disable=no-member
self._ensure_busy_overlay(busy_text=busy_text)
if start_busy and self._busy_overlay is not None:
self._busy_overlay.setGeometry(self.rect())
self._busy_overlay.raise_()
self._busy_overlay.show()
def set_busy(self, enabled: bool) -> None:
def set_busy(self, enabled: bool, text: str | None = None) -> None:
"""
Set the busy state of the widget. This will show or hide the loading overlay, which will
block user interaction with the widget and show the busy_state_widget if provided. Per
default, the busy state widget is a spinner with "Loading..." text.
Enable/disable the loading overlay. Optionally update the text.
Args:
enabled(bool): Whether to enable the busy state.
enabled(bool): Whether to enable the loading overlay.
text(str, optional): The text to display on the overlay. If None, the text is not changed.
"""
if not isinstance(self, QWidget):
return
# If not yet installed, install the busy overlay now together with the busy state widget
if self._busy_overlay is None:
self._busy_overlay = self._install_busy_loader()
if getattr(self, "_busy_overlay", None) is None:
self._ensure_busy_overlay(busy_text=text or "Loading…")
if text is not None:
self.set_busy_text(text)
if enabled:
self._show_busy_overlay()
self._busy_overlay.setGeometry(self.rect())
self._busy_overlay.raise_()
self._busy_overlay.show()
else:
self._busy_overlay.hide()
self._loading = bool(enabled)
@@ -210,6 +155,19 @@ class BECWidget(BECConnector):
"""
return bool(getattr(self, "_loading", False))
def set_busy_text(self, text: str) -> None:
"""
Update the text on the loading overlay.
Args:
text(str): The text to display on the overlay.
"""
overlay = getattr(self, "_busy_overlay", None)
if overlay is None:
overlay = self._ensure_busy_overlay(busy_text=text)
if overlay is not None:
overlay.set_text(text)
@SafeSlot(str)
def apply_theme(self, theme: str):
"""
@@ -222,8 +180,8 @@ class BECWidget(BECConnector):
def _update_overlay_theme(self, theme: str):
try:
overlay = getattr(self, "_busy_overlay", None)
if overlay is not None:
overlay._update_palette()
if overlay is not None and hasattr(overlay, "update_palette"):
overlay.update_palette()
except Exception:
logger.warning(f"Failed to apply theme {theme} to {self}")
@@ -330,34 +288,29 @@ class BECWidget(BECConnector):
# All widgets need to call super().cleanup() in their cleanup method
logger.info(f"Registry cleanup for widget {self.__class__.__name__}")
self.rpc_register.remove_rpc(self)
children = self.findChildren(BECWidget)
for child in children:
if not shiboken6.isValid(child):
# If the child is not valid, it means it has already been deleted
continue
child.close()
child.deleteLater()
children = self.findChildren(BECWidget)
for child in children:
if not shiboken6.isValid(child):
# If the child is not valid, it means it has already been deleted
continue
child.close()
child.deleteLater()
# Tear down busy overlay explicitly to stop spinner and remove filters
overlay = getattr(self, "_busy_overlay", None)
if overlay is not None and shiboken6.isValid(overlay):
try:
overlay.hide()
filt = getattr(overlay, "_filter", None)
if filt is not None and shiboken6.isValid(filt):
try:
self.removeEventFilter(filt)
except Exception as exc:
logger.warning(
f"Failed to remove event filter from busy overlay: {exc}"
)
# Cleanup the overlay widget. This will call cleanup on the custom widget if present.
overlay.cleanup()
overlay.deleteLater()
except Exception as exc:
logger.warning(f"Failed to delete busy overlay: {exc}")
# Tear down busy overlay explicitly to stop spinner and remove filters
overlay = getattr(self, "_busy_overlay", None)
if overlay is not None and shiboken6.isValid(overlay):
try:
overlay.hide()
filt = getattr(overlay, "_filter", None)
if filt is not None and shiboken6.isValid(filt):
try:
self.removeEventFilter(filt)
except Exception as exc:
logger.warning(f"Failed to remove event filter from busy overlay: {exc}")
overlay.deleteLater()
except Exception as exc:
logger.warning(f"Failed to delete busy overlay: {exc}")
self._busy_overlay = None
def closeEvent(self, event):
"""Wrap the close even to ensure the rpc_register is cleaned up."""
+118 -190
View File
@@ -1,8 +1,7 @@
from __future__ import annotations
from bec_lib.logger import bec_logger
from qtpy.QtCore import QEvent, QObject, Qt, QTimer, Signal
from qtpy.QtGui import QColor
from qtpy.QtCore import QEvent, QObject, Qt, QTimer
from qtpy.QtGui import QColor, QFont
from qtpy.QtWidgets import (
QApplication,
QFrame,
@@ -14,10 +13,10 @@ from qtpy.QtWidgets import (
QWidget,
)
from bec_widgets import BECWidget
from bec_widgets.utils.colors import apply_theme
from bec_widgets.utils.error_popups import SafeProperty
logger = bec_logger.logger
from bec_widgets.widgets.plots.waveform.waveform import Waveform
from bec_widgets.widgets.utility.spinner.spinner import SpinnerWidget
class _OverlayEventFilter(QObject):
@@ -29,10 +28,6 @@ class _OverlayEventFilter(QObject):
self._overlay = overlay
def eventFilter(self, obj, event):
if not hasattr(self, "_target") or self._target is None:
return False
if not hasattr(self, "_overlay") or self._overlay is None:
return False
if obj is self._target and event.type() in (
QEvent.Resize,
QEvent.Show,
@@ -58,201 +53,132 @@ class BusyLoaderOverlay(QWidget):
BusyLoaderOverlay: The overlay instance.
"""
foreground_color_changed = Signal(QColor)
scrim_color_changed = Signal(QColor)
def __init__(self, parent: QWidget, opacity: float = 0.35, **kwargs):
def __init__(self, parent: QWidget, text: str = "Loading…", opacity: float = 0.85, **kwargs):
super().__init__(parent=parent, **kwargs)
self.setAttribute(Qt.WA_StyledBackground, True)
self.setAutoFillBackground(False)
self.setAttribute(Qt.WA_TranslucentBackground, True)
self._opacity = opacity
self._scrim_color = QColor(128, 128, 128, 110)
self._label_color = QColor(240, 240, 240)
self._filter: QObject | None = None
# Set Main Layout
layout = QVBoxLayout(self)
layout.setContentsMargins(24, 24, 24, 24)
layout.setSpacing(10)
self.setLayout(layout)
self._label = QLabel(text, self)
self._label.setAlignment(Qt.AlignHCenter | Qt.AlignVCenter)
f = QFont(self._label.font())
f.setBold(True)
f.setPointSize(f.pointSize() + 1)
self._label.setFont(f)
# Custom widget placeholder
self._custom_widget: QWidget | None = None
self._spinner = SpinnerWidget(self)
self._spinner.setFixedSize(42, 42)
lay = QVBoxLayout(self)
lay.setContentsMargins(24, 24, 24, 24)
lay.setSpacing(10)
lay.addStretch(1)
lay.addWidget(self._spinner, 0, Qt.AlignHCenter)
lay.addWidget(self._label, 0, Qt.AlignHCenter)
lay.addStretch(1)
# Add a frame around the content
self._frame = QFrame(self)
self._frame.setObjectName("busyFrame")
self._frame.setAttribute(Qt.WA_TransparentForMouseEvents, True)
self._frame.lower()
# Defaults
self._update_palette()
self._scrim_color = QColor(0, 0, 0, 110)
self._label_color = QColor(240, 240, 240)
self.update_palette()
# Start hidden; interactions beneath are blocked while visible
self.hide()
@SafeProperty(QColor, notify=scrim_color_changed)
def scrim_color(self) -> QColor:
# --- API ---
def set_text(self, text: str):
"""
The overlay scrim color.
"""
return self._scrim_color
@scrim_color.setter
def scrim_color(self, value: QColor):
if not isinstance(value, QColor):
raise TypeError("scrim_color must be a QColor")
self._scrim_color = value
self.update()
@SafeProperty(QColor, notify=foreground_color_changed)
def foreground_color(self) -> QColor:
"""
The overlay foreground color (text, spinner).
"""
return self._label_color
@foreground_color.setter
def foreground_color(self, value: QColor):
if not isinstance(value, QColor):
try:
color = QColor(value)
if not color.isValid():
raise ValueError(f"Invalid color: {value}")
except Exception:
# pylint: disable=raise-missing-from
raise ValueError(f"Color {value} is invalid, cannot be converted to QColor")
self._label_color = value
self.update()
def set_filter(self, filt: _OverlayEventFilter):
"""
Set an event filter to keep the overlay sized and stacked over its target.
Update the overlay text.
Args:
filt(QObject): The event filter instance.
text(str): The text to display on the overlay.
"""
self._filter = filt
target = filt._target
if self.parent() != target:
logger.warning(f"Overlay parent {self.parent()} does not match filter target {target}")
target.installEventFilter(self._filter)
######################
### Public methods ###
######################
def set_widget(self, widget: QWidget):
"""
Set a custom widget as an overlay for the busy overlay.
Args:
widget(QWidget): The custom widget to display.
"""
lay = self.layout()
if lay is None:
return
self._custom_widget = widget
lay.addWidget(widget, 0, Qt.AlignHCenter)
self._label.setText(text)
def set_opacity(self, opacity: float):
"""
Set the overlay opacity. Only values between 0.0 and 1.0 are accepted. If a
value outside this range is provided, it will be clamped.
Set overlay opacity (0..1).
Args:
opacity(float): The opacity value between 0.0 (fully transparent) and 1.0 (fully opaque).
"""
self._opacity = max(0.0, min(1.0, float(opacity)))
# Re-apply alpha using the current theme color
base = self.scrim_color
base.setAlpha(int(255 * self._opacity))
self.scrim_color = base
self._update_palette()
if isinstance(self._scrim_color, QColor):
base = QColor(self._scrim_color)
base.setAlpha(int(255 * self._opacity))
self._scrim_color = base
self.update()
##########################
### Internal methods ###
##########################
def _update_palette(self):
def update_palette(self):
"""
Update colors from the current application theme.
"""
_app = QApplication.instance()
if hasattr(_app, "theme"):
theme = _app.theme # type: ignore[attr-defined]
_bg = theme.color("BORDER")
_fg = theme.color("FG")
app = QApplication.instance()
if hasattr(app, "theme"):
theme = app.theme # type: ignore[attr-defined]
self._bg = theme.color("BORDER")
self._fg = theme.color("FG")
self._primary = theme.color("PRIMARY")
else:
# Fallback neutrals
_bg = QColor(30, 30, 30)
_fg = QColor(230, 230, 230)
self._bg = QColor(30, 30, 30)
self._fg = QColor(230, 230, 230)
# Semi-transparent scrim derived from bg
base = _bg if isinstance(_bg, QColor) else QColor(str(_bg))
base.setAlpha(int(255 * max(0.0, min(1.0, getattr(self, "_opacity", 0.35)))))
self.scrim_color = base
fg = _fg if isinstance(_fg, QColor) else QColor(str(_fg))
self.foreground_color = fg
# Set the frame style with updated foreground colors
r, g, b, a = base.getRgb()
self._scrim_color = QColor(self._bg)
self._scrim_color.setAlpha(int(255 * max(0.0, min(1.0, getattr(self, "_opacity", 0.35)))))
self._spinner.update()
fg_hex = self._fg.name() if isinstance(self._fg, QColor) else str(self._fg)
self._label.setStyleSheet(f"color: {fg_hex};")
self._frame.setStyleSheet(
f"#busyFrame {{ border: 2px dashed {self.foreground_color.name()}; border-radius: 9px; background-color: rgba({r}, {g}, {b}, {a}); }}"
f"#busyFrame {{ border: 2px dashed {fg_hex}; border-radius: 9px; background-color: rgba(128, 128, 128, 110); }}"
)
self.update()
#############################
### Custom Event Handlers ###
#############################
# --- QWidget overrides ---
def showEvent(self, e):
# Call showEvent on custom widget if present
if self._custom_widget is not None:
self._custom_widget.showEvent(e)
self._spinner.start()
super().showEvent(e)
def hideEvent(self, e):
# Call hideEvent on custom widget if present
if self._custom_widget is not None:
self._custom_widget.hideEvent(e)
self._spinner.stop()
super().hideEvent(e)
def resizeEvent(self, e):
# Call resizeEvent on custom widget if present
if self._custom_widget is not None:
self._custom_widget.resizeEvent(e)
super().resizeEvent(e)
r = self.rect().adjusted(10, 10, -10, -10)
self._frame.setGeometry(r)
# TODO should we have this cleanup here?
def cleanup(self):
"""Cleanup resources used by the overlay."""
if self._custom_widget is not None:
if hasattr(self._custom_widget, "cleanup"):
self._custom_widget.cleanup()
def paintEvent(self, e):
super().paintEvent(e)
def install_busy_loader(
target: QWidget, start_loading: bool = False, opacity: float = 0.35
target: QWidget, text: str = "Loading…", start_loading: bool = False, opacity: float = 0.35
) -> BusyLoaderOverlay:
"""
Attach a BusyLoaderOverlay to `target` and keep it sized and stacked.
Args:
target(QWidget): The widget to overlay.
text(str): Initial text to display.
start_loading(bool): If True, show the overlay immediately.
opacity(float): Overlay opacity (0..1).
Returns:
BusyLoaderOverlay: The overlay instance.
"""
overlay = BusyLoaderOverlay(parent=target, opacity=opacity)
overlay = BusyLoaderOverlay(target, text=text, opacity=opacity)
overlay.setGeometry(target.rect())
overlay.set_filter(_OverlayEventFilter(target=target, overlay=overlay))
filt = _OverlayEventFilter(target, overlay)
overlay._filter = filt # type: ignore[attr-defined]
target.installEventFilter(filt)
if start_loading:
overlay.show()
return overlay
@@ -261,63 +187,65 @@ def install_busy_loader(
# --------------------------
# Launchable demo
# --------------------------
class DemoWidget(BECWidget, QWidget): # pragma: no cover
def __init__(self, parent=None):
super().__init__(
parent=parent, theme_update=True, start_busy=True, busy_text="Demo: Initializing…"
)
self._title = QLabel("Demo Content", self)
self._title.setAlignment(Qt.AlignCenter)
self._title.setFrameStyle(QFrame.Panel | QFrame.Sunken)
lay = QVBoxLayout(self)
lay.addWidget(self._title)
waveform = Waveform(self)
waveform.plot([1, 2, 3, 4, 5])
lay.addWidget(waveform, 1)
QTimer.singleShot(5000, self._ready)
def _ready(self):
self._title.setText("Ready ✓")
self.set_busy(False)
class DemoWindow(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("Busy Loader — BECWidget demo")
left = DemoWidget()
right = DemoWidget()
btn_on = QPushButton("Right → Loading")
btn_off = QPushButton("Right → Ready")
btn_text = QPushButton("Set custom text")
btn_on.clicked.connect(lambda: right.set_busy(True, "Fetching data…"))
btn_off.clicked.connect(lambda: right.set_busy(False))
btn_text.clicked.connect(lambda: right.set_busy_text("Almost there…"))
panel = QWidget()
prow = QVBoxLayout(panel)
prow.addWidget(btn_on)
prow.addWidget(btn_off)
prow.addWidget(btn_text)
prow.addStretch(1)
central = QWidget()
row = QHBoxLayout(central)
row.setContentsMargins(12, 12, 12, 12)
row.setSpacing(12)
row.addWidget(left, 1)
row.addWidget(right, 1)
row.addWidget(panel, 0)
self.setCentralWidget(central)
self.resize(900, 420)
if __name__ == "__main__": # pragma: no cover
import sys
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.widgets.plots.waveform.waveform import Waveform
class DemoWidget(BECWidget, QWidget): # pragma: no cover
def __init__(self, parent=None, start_busy: bool = False):
super().__init__(parent=parent, theme_update=True, start_busy=start_busy)
self._title = QLabel("Demo Content", self)
self._title.setAlignment(Qt.AlignCenter)
self._title.setFrameStyle(QFrame.Panel | QFrame.Sunken)
lay = QVBoxLayout(self)
lay.addWidget(self._title)
waveform = Waveform(self)
waveform.plot([1, 2, 3, 4, 5])
lay.addWidget(waveform, 1)
QTimer.singleShot(5000, self._ready)
def _ready(self):
self._title.setText("Ready ✓")
self.set_busy(False)
class DemoWindow(QMainWindow): # pragma: no cover
def __init__(self):
super().__init__()
self.setWindowTitle("Busy Loader — BECWidget demo")
left = DemoWidget(start_busy=True)
right = DemoWidget()
btn_on = QPushButton("Right → Loading")
btn_off = QPushButton("Right → Ready")
btn_text = QPushButton("Set custom text")
btn_on.clicked.connect(lambda: right.set_busy(True))
btn_off.clicked.connect(lambda: right.set_busy(False))
panel = QWidget()
prow = QVBoxLayout(panel)
prow.addWidget(btn_on)
prow.addWidget(btn_off)
prow.addWidget(btn_text)
prow.addStretch(1)
central = QWidget()
row = QHBoxLayout(central)
row.setContentsMargins(12, 12, 12, 12)
row.setSpacing(12)
row.addWidget(left, 1)
row.addWidget(right, 1)
row.addWidget(panel, 0)
self.setCentralWidget(central)
self.resize(900, 420)
app = QApplication(sys.argv)
apply_theme("light")
w = DemoWindow()
+72 -222
View File
@@ -1,27 +1,23 @@
from __future__ import annotations
import re
from functools import lru_cache
from typing import Any, Literal
from typing import Literal
import numpy as np
import pyqtgraph as pg
from bec_lib import bec_logger
from bec_qthemes import apply_theme as apply_theme_global
from bec_qthemes._theme import AccentColors
from pydantic_core import PydanticCustomError
from pyqtgraph.graphicsItems.GradientEditorItem import Gradients
from qtpy.QtCore import QEvent, QEventLoop
from qtpy.QtGui import QColor
from qtpy.QtWidgets import QApplication
logger = bec_logger.logger
def get_theme_name():
if QApplication.instance() is None or not hasattr(QApplication.instance(), "theme"):
return "dark"
return QApplication.instance().theme.theme
else:
return QApplication.instance().theme.theme
def get_theme_palette():
@@ -51,122 +47,31 @@ def apply_theme(theme: Literal["dark", "light"]):
"""
Apply the theme via the global theming API. This updates QSS, QPalette, and pyqtgraph globally.
"""
logger.info(f"Applying theme: {theme}")
process_all_deferred_deletes(QApplication.instance())
apply_theme_global(theme)
process_all_deferred_deletes(QApplication.instance())
def theme_color(theme: Any | None, key: str, fallback: QColor | str) -> QColor:
"""
Return a QColor from a BEC theme, or the fallback when no theme is set.
"""
fallback_color = fallback if isinstance(fallback, QColor) else QColor(str(fallback))
if theme is None:
return fallback_color
return theme.color(key, fallback_color.name())
def rgba(color: QColor | str, alpha: int) -> str:
"""
Return a QSS-compatible rgba string.
"""
qcolor = color if isinstance(color, QColor) else QColor(str(color))
return f"rgba({qcolor.red()}, {qcolor.green()}, {qcolor.blue()}, {alpha})"
class Colors:
@staticmethod
def list_available_colormaps() -> list[str]:
"""
List colormap names available via the pyqtgraph colormap registry.
Note: This does not include `GradientEditorItem` presets (used by HistogramLUT menus).
"""
def _list(source: str | None = None) -> list[str]:
try:
return pg.colormap.listMaps() if source is None else pg.colormap.listMaps(source)
except Exception: # pragma: no cover - backend may be missing
return []
return [*_list(None), *_list("matplotlib"), *_list("colorcet")]
@staticmethod
def list_available_gradient_presets() -> list[str]:
def golden_ratio(num: int) -> list:
"""Calculate the golden ratio for a given number of angles.
Args:
num (int): Number of angles
Returns:
list: List of angles calculated using the golden ratio.
"""
List `GradientEditorItem` preset names (HistogramLUT right-click menu entries).
"""
from pyqtgraph.graphicsItems.GradientEditorItem import Gradients
return list(Gradients.keys())
@staticmethod
def canonical_colormap_name(color_map: str) -> str:
"""
Return an available colormap/preset name if a case-insensitive match exists.
"""
requested = (color_map or "").strip()
if not requested:
return requested
registry = Colors.list_available_colormaps()
presets = Colors.list_available_gradient_presets()
available = set(registry) | set(presets)
if requested in available:
return requested
# Case-insensitive match.
requested_lc = requested.casefold()
for name in available:
if name.casefold() == requested_lc:
return name
return requested
@staticmethod
def get_colormap(color_map: str) -> pg.ColorMap:
"""
Resolve a string into a `pg.ColorMap` using either:
- the `pg.colormap` registry (optionally including matplotlib/colorcet backends), or
- `GradientEditorItem` presets (HistogramLUT right-click menu).
"""
name = Colors.canonical_colormap_name(color_map)
if not name:
raise ValueError("Empty colormap name")
return Colors._get_colormap_cached(name)
@staticmethod
@lru_cache(maxsize=256)
def _get_colormap_cached(name: str) -> pg.ColorMap:
# 1) Registry/backends
try:
cmap = pg.colormap.get(name)
if cmap is not None:
return cmap
except Exception:
pass
for source in ("matplotlib", "colorcet"):
try:
cmap = pg.colormap.get(name, source=source)
if cmap is not None:
return cmap
except Exception:
continue
# 2) Presets -> ColorMap
if name not in Gradients:
raise KeyError(f"Colormap '{name}' not found")
ge = pg.GradientEditorItem()
ge.loadPreset(name)
return ge.colorMap()
phi = 2 * np.pi * ((1 + np.sqrt(5)) / 2)
angles = []
for ii in range(num):
x = np.cos(ii * phi)
y = np.sin(ii * phi)
angle = np.arctan2(y, x)
angles.append(angle)
return angles
@staticmethod
def set_theme_offset(theme: Literal["light", "dark"] | None = None, offset=0.2) -> tuple:
@@ -229,7 +134,7 @@ class Colors:
if theme_offset < 0 or theme_offset > 1:
raise ValueError("theme_offset must be between 0 and 1")
cmap = Colors.get_colormap(colormap)
cmap = pg.colormap.get(colormap)
min_pos, max_pos = Colors.set_theme_offset(theme, theme_offset)
# Generate positions that are evenly spaced within the acceptable range
@@ -238,7 +143,20 @@ class Colors:
else:
positions = np.linspace(min_pos, max_pos, num)
return Colors._format_mapped_colors(cmap.map(positions, mode="float"), format)
# Sample colors from the colormap at the calculated positions
colors = cmap.map(positions, mode="float")
color_list = []
for color in colors:
if format.upper() == "HEX":
color_list.append(QColor.fromRgbF(*color).name())
elif format.upper() == "RGB":
color_list.append(tuple((np.array(color) * 255).astype(int)))
elif format.upper() == "QCOLOR":
color_list.append(QColor.fromRgbF(*color))
else:
raise ValueError("Unsupported format. Please choose 'RGB', 'HEX', or 'QColor'.")
return color_list
@staticmethod
def golden_angle_color(
@@ -264,7 +182,7 @@ class Colors:
ValueError: If theme_offset is not between 0 and 1.
"""
cmap = Colors.get_colormap(colormap)
cmap = pg.colormap.get(colormap)
phi = (1 + np.sqrt(5)) / 2 # Golden ratio
golden_angle_conjugate = 1 - (1 / phi) # Approximately 0.38196601125
@@ -274,19 +192,20 @@ class Colors:
positions = np.mod(np.arange(num) * golden_angle_conjugate, 1)
positions = min_pos + positions * (max_pos - min_pos)
return Colors._format_mapped_colors(cmap.map(positions, mode="float"), format)
# Sample colors from the colormap at the calculated positions
colors = cmap.map(positions, mode="float")
color_list = []
@staticmethod
def _format_mapped_colors(colors: np.ndarray, format: Literal["QColor", "HEX", "RGB"]) -> list:
color_format = format.upper()
if color_format not in {"QCOLOR", "HEX", "RGB"}:
raise ValueError("Unsupported format. Please choose 'RGB', 'HEX', or 'QColor'.")
if color_format == "QCOLOR":
return [QColor.fromRgbF(*color) for color in colors]
if color_format == "HEX":
return [QColor.fromRgbF(*color).name() for color in colors]
return [tuple((np.array(color) * 255).astype(int)) for color in colors]
for color in colors:
if format.upper() == "HEX":
color_list.append(QColor.fromRgbF(*color).name())
elif format.upper() == "RGB":
color_list.append(tuple((np.array(color) * 255).astype(int)))
elif format.upper() == "QCOLOR":
color_list.append(QColor.fromRgbF(*color))
else:
raise ValueError("Unsupported format. Please choose 'RGB', 'HEX', or 'QColor'.")
return color_list
@staticmethod
def hex_to_rgba(hex_color: str, alpha=255) -> tuple:
@@ -310,6 +229,22 @@ class Colors:
raise ValueError("HEX color must be 6 or 8 characters long.")
return (r, g, b, alpha)
@staticmethod
def rgba_to_hex(r: int, g: int, b: int, a: int = 255) -> str:
"""
Convert RGBA color to HEX.
Args:
r(int): Red value (0-255).
g(int): Green value (0-255).
b(int): Blue value (0-255).
a(int): Alpha value (0-255). Default is 255 (opaque).
Returns:
hec_color(str): HEX color string.
"""
return "#{:02X}{:02X}{:02X}{:02X}".format(r, g, b, a)
@staticmethod
def validate_color(color: tuple | str) -> tuple | str:
"""
@@ -513,103 +448,18 @@ class Colors:
Raises:
PydanticCustomError: If colormap is invalid.
"""
normalized = Colors.canonical_colormap_name(color_map)
try:
Colors.get_colormap(normalized)
except Exception as ext:
logger.warning(f"Colormap validation error: {ext}")
available_pg_maps = pg.colormap.listMaps()
available_mpl_maps = pg.colormap.listMaps("matplotlib")
available_mpl_colorcet = pg.colormap.listMaps("colorcet")
available_colormaps = available_pg_maps + available_mpl_maps + available_mpl_colorcet
if color_map not in available_colormaps:
if return_error:
available_colormaps = sorted(
set(Colors.list_available_colormaps())
| set(Colors.list_available_gradient_presets())
)
raise PydanticCustomError(
"unsupported colormap",
f"Colormap '{color_map}' not found in the current installation of pyqtgraph. Choose from the following: {available_colormaps}.",
f"Colormap '{color_map}' not found in the current installation of pyqtgraph. Choose on the following: {available_colormaps}.",
{"wrong_value": color_map},
)
else:
return False
return normalized
@staticmethod
def relative_luminance(color: QColor) -> float:
"""
Calculate the relative luminance of a QColor according to WCAG 2.0 standards.
See https://www.w3.org/TR/WCAG21/#dfn-relative-luminance.
Args:
color(QColor): The color to calculate the relative luminance for.
Returns:
float: The relative luminance of the color.
"""
r = color.red() / 255.0
g = color.green() / 255.0
b = color.blue() / 255.0
def adjust(c):
if c <= 0.03928:
return c / 12.92
return ((c + 0.055) / 1.055) ** 2.4
r = adjust(r)
g = adjust(g)
b = adjust(b)
return 0.2126 * r + 0.7152 * g + 0.0722 * b
@staticmethod
def _tint_strength(
accent: QColor, background: QColor, min_tint: float = 0.06, max_tint: float = 0.18
) -> float:
"""
Calculate the tint strength based on the contrast between the accent and background colors.
min_tint and max_tint define the range of tint strength and are empirically chosen.
Args:
accent(QColor): The accent color.
background(QColor): The background color.
min_tint(float): The minimum tint strength.
max_tint(float): The maximum tint strength.
Returns:
float: The tint strength between 0 and 1.
"""
l_accent = Colors.relative_luminance(accent)
l_bg = Colors.relative_luminance(background)
contrast = abs(l_accent - l_bg)
# normalize contrast to a value between 0 and 1
t = min(contrast / 0.9, 1.0)
return min_tint + t * (max_tint - min_tint)
@staticmethod
def _blend(background: QColor, accent: QColor, t: float) -> QColor:
"""
Blend two colors based on a tint strength t.
"""
return QColor(
round(background.red() + (accent.red() - background.red()) * t),
round(background.green() + (accent.green() - background.green()) * t),
round(background.blue() + (accent.blue() - background.blue()) * t),
round(background.alpha() + (accent.alpha() - background.alpha()) * t),
)
@staticmethod
def subtle_background_color(accent: QColor, background: QColor) -> QColor:
"""
Generate a subtle, contrast-safe background color derived from an accent color.
Args:
accent(QColor): The accent color.
background(QColor): The background color.
Returns:
QColor: The generated subtle background color.
"""
if not accent.isValid() or not background.isValid():
return background
tint = Colors._tint_strength(accent, background)
return Colors._blend(background, accent, tint)
return color_map
+1 -1
View File
@@ -43,7 +43,7 @@ class WidgetContainerUtils:
if list_of_names is None:
list_of_names = []
ii = 0
while ii < 1000: # 1000 is arbitrary!
while ii < 1000: # 1000 is arbritrary!
name_candidate = f"{name}_{ii}"
if name_candidate not in list_of_names:
return name_candidate
+7 -203
View File
@@ -1,38 +1,19 @@
import functools
import sys
import traceback
from typing import Any, Callable, Literal
import shiboken6
from bec_lib.logger import bec_logger
from louie.saferef import safe_ref
from qtpy.QtCore import Property, QObject, Qt, Signal, Slot
from qtpy.QtWidgets import (
QApplication,
QLabel,
QMessageBox,
QPushButton,
QSpinBox,
QTabWidget,
QVBoxLayout,
QWidget,
)
from qtpy.QtWidgets import QApplication, QMessageBox, QPushButton, QVBoxLayout, QWidget
logger = bec_logger.logger
RAISE_ERROR_DEFAULT = False
def SafeProperty(
prop_type,
*prop_args,
popup_error: bool = False,
default: Any = None,
auto_emit: bool = False,
emit_value: Literal["stored", "input"] | Callable[[object, object], object] = "stored",
emit_on_change: bool = True,
**prop_kwargs,
):
def SafeProperty(prop_type, *prop_args, popup_error: bool = False, default=None, **prop_kwargs):
"""
Decorator to create a Qt Property with safe getter and setter so that
Qt Designer won't crash if an exception occurs in either method.
@@ -41,15 +22,7 @@ def SafeProperty(
prop_type: The property type (e.g., str, bool, int, custom classes, etc.)
popup_error (bool): If True, show a popup for any error; otherwise, ignore or log silently.
default: Any default/fallback value to return if the getter raises an exception.
auto_emit (bool): If True, automatically emit property_changed signal when setter is called.
Requires the widget to have a property_changed signal (str, object).
Note: This is different from Qt's 'notify' parameter which expects a Signal.
emit_value: Controls which value is emitted when auto_emit=True.
- "stored" (default): emit the value from the getter after setter runs
- "input": emit the raw setter input
- callable: called as emit_value(self_, value) after setter and must return the value to emit
emit_on_change (bool): If True, emit only when the stored value changes.
*prop_args, **prop_kwargs: Passed along to the underlying Qt Property constructor (check https://doc.qt.io/qt-6/properties.html).
*prop_args, **prop_kwargs: Passed along to the underlying Qt Property constructor.
Usage:
@SafeProperty(int, default=-1)
@@ -61,41 +34,6 @@ def SafeProperty(
def some_value(self, val: int):
# your setter logic
...
# With auto-emit for toolbar sync:
@SafeProperty(bool, auto_emit=True)
def fft(self) -> bool:
return self._fft
@fft.setter
def fft(self, value: bool):
self._fft = value
# property_changed.emit("fft", value) is called automatically
# With custom emit modes:
@SafeProperty(int, auto_emit=True, emit_value="stored")
def precision_stored(self) -> int:
return self._precision_stored
@precision_stored.setter
def precision_stored(self, value: int):
self._precision_stored = max(0, int(value))
@SafeProperty(int, auto_emit=True, emit_value="input")
def precision_input(self) -> int:
return self._precision_input
@precision_input.setter
def precision_input(self, value: int):
self._precision_input = max(0, int(value))
@SafeProperty(int, auto_emit=True, emit_value=lambda _self, v: int(v) * 10)
def precision_callable(self) -> int:
return self._precision_callable
@precision_callable.setter
def precision_callable(self, value: int):
self._precision_callable = max(0, int(value))
"""
def decorator(py_getter):
@@ -115,8 +53,6 @@ def SafeProperty(
logger.error(f"SafeProperty error in GETTER of '{prop_name}':\n{error_msg}")
return default
safe_getter.__is_safe_getter__ = True # type: ignore[attr-defined]
class PropertyWrapper:
"""
Intermediate wrapper used so that the user can optionally chain .setter(...).
@@ -132,42 +68,8 @@ def SafeProperty(
@functools.wraps(setter_func)
def safe_setter(self_, value):
try:
before_value = None
if auto_emit and emit_on_change:
try:
before_value = self.getter_func(self_)
except Exception as e:
logger.warning(
f"SafeProperty could not get 'before' value for change detection: {e}"
)
before_value = None
result = setter_func(self_, value)
# Auto-emit property_changed if auto_emit=True and signal exists
if auto_emit and hasattr(self_, "property_changed"):
prop_name = py_getter.__name__
try:
if callable(emit_value):
emit_payload = emit_value(self_, value)
elif emit_value == "input":
emit_payload = value
else:
emit_payload = self.getter_func(self_)
if emit_on_change and before_value == emit_payload:
return result
self_.property_changed.emit(prop_name, emit_payload)
except Exception as notify_error:
# Don't fail the setter if notification fails
logger.warning(
f"SafeProperty auto_emit failed for '{prop_name}': {notify_error}"
)
return result
except Exception as e:
logger.warning(f"SafeProperty setter caught exception: {e}")
return setter_func(self_, value)
except Exception:
prop_name = f"{setter_func.__module__}.{setter_func.__qualname__}"
error_msg = traceback.format_exc()
@@ -433,100 +335,6 @@ def ErrorPopupUtility():
return _popup_utility_instance
class SafePropertyExampleWidget(QWidget): # pragma: no cover
"""
Example widget showcasing SafeProperty auto_emit modes.
"""
property_changed = Signal(str, object)
def __init__(self):
super().__init__()
self.setWindowTitle("SafeProperty auto_emit example")
self._precision_stored = 0
self._precision_input = 0
self._precision_callable = 0
layout = QVBoxLayout(self)
self.status = QLabel("last emit: <none>", self)
self.spinbox_stored = QSpinBox(self)
self.spinbox_stored.setRange(-5, 10)
self.spinbox_stored.setValue(0)
self.label_stored = QLabel("stored emit: <none>", self)
self.spinbox_input = QSpinBox(self)
self.spinbox_input.setRange(-5, 10)
self.spinbox_input.setValue(0)
self.label_input = QLabel("input emit: <none>", self)
self.spinbox_callable = QSpinBox(self)
self.spinbox_callable.setRange(-5, 10)
self.spinbox_callable.setValue(0)
self.label_callable = QLabel("callable emit: <none>", self)
layout.addWidget(QLabel("stored emit (normalized value):", self))
layout.addWidget(self.spinbox_stored)
layout.addWidget(self.label_stored)
layout.addWidget(QLabel("input emit (raw setter input):", self))
layout.addWidget(self.spinbox_input)
layout.addWidget(self.label_input)
layout.addWidget(QLabel("callable emit (custom mapping):", self))
layout.addWidget(self.spinbox_callable)
layout.addWidget(self.label_callable)
layout.addWidget(self.status)
self.spinbox_stored.valueChanged.connect(self._on_spinbox_stored)
self.spinbox_input.valueChanged.connect(self._on_spinbox_input)
self.spinbox_callable.valueChanged.connect(self._on_spinbox_callable)
self.property_changed.connect(self._on_property_changed)
@SafeProperty(int, auto_emit=True, emit_value="stored", doc="Clamped precision value.")
def precision_stored(self) -> int:
return self._precision_stored
@precision_stored.setter
def precision_stored(self, value: int):
self._precision_stored = max(0, int(value))
@SafeProperty(int, auto_emit=True, emit_value="input", doc="Emit raw input value.")
def precision_input(self) -> int:
return self._precision_input
@precision_input.setter
def precision_input(self, value: int):
self._precision_input = max(0, int(value))
@SafeProperty(int, auto_emit=True, emit_value=lambda _self, v: int(v) * 10)
def precision_callable(self) -> int:
return self._precision_callable
@precision_callable.setter
def precision_callable(self, value: int):
self._precision_callable = max(0, int(value))
def _on_spinbox_stored(self, value: int):
self.precision_stored = value
def _on_spinbox_input(self, value: int):
self.precision_input = value
def _on_spinbox_callable(self, value: int):
self.precision_callable = value
def _on_property_changed(self, prop_name: str, value):
self.status.setText(f"last emit: {prop_name}={value}")
if prop_name == "precision_stored":
self.label_stored.setText(f"stored emit: {value}")
elif prop_name == "precision_input":
self.label_input.setText(f"input emit: {value}")
elif prop_name == "precision_callable":
self.label_callable.setText(f"callable emit: {value}")
class ExampleWidget(QWidget): # pragma: no cover
"""
Example widget to demonstrate error handling with the ErrorPopupUtility.
@@ -581,10 +389,6 @@ class ExampleWidget(QWidget): # pragma: no cover
if __name__ == "__main__": # pragma: no cover
app = QApplication(sys.argv)
tabs = QTabWidget()
tabs.setWindowTitle("Error Popups & SafeProperty Examples")
tabs.addTab(ExampleWidget(), "Error Popups")
tabs.addTab(SafePropertyExampleWidget(), "SafeProperty auto_emit")
tabs.resize(420, 520)
tabs.show()
widget = ExampleWidget()
widget.show()
sys.exit(app.exec_())

Some files were not shown because too many files have changed in this diff Show More