mirror of
https://github.com/bec-project/bec_widgets.git
synced 2026-06-13 00:20:57 +02:00
Compare commits
155 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d99d5e1370 | |||
| 402c721279 | |||
| 6883910797 | |||
| 7de228a412 | |||
| c998e3ec48 | |||
| 1e3661c318 | |||
| 007a408e1a | |||
| 1534118f21 | |||
| 572797626c | |||
| 40a666aa18 | |||
| 577ca4301a | |||
| df4082b31b | |||
| aadb3e129a | |||
| 0580b539fa | |||
| b79c4862c5 | |||
| 148b41e238 | |||
| 6e398e8077 | |||
| 8d75c2af1c | |||
| 24dbb885f6 | |||
| 3b7bad85d3 | |||
| de09cc660a | |||
| 4bb8e86509 | |||
| e5b76bc855 | |||
| 99176198ee | |||
| dcfc573052 | |||
| 9290a9a23b | |||
| d48b9d224f | |||
| 43c311782d | |||
| 44f7acaeda | |||
| 0b212c3100 | |||
| d8ebae49ad | |||
| 153fb62a04 | |||
| d67227d20c | |||
| dc1072c247 | |||
| beb337201c | |||
| 75162ef8a8 | |||
| cc89252fb3 | |||
| 36fa0e649c | |||
| 8e173cb17e | |||
| 322655fc5e | |||
| 2b5b7360ae | |||
| b325d1bb4f | |||
| ee6fd5fb9e | |||
| 53fe1ac63d | |||
| 58e57169e8 | |||
| 2b27faf779 | |||
| b1a3403cd3 | |||
| b38d6dc549 | |||
| cc45fed387 | |||
| 5a594925f0 | |||
| e76dea6f69 | |||
| f4c14d66db | |||
| 4ef1344fec | |||
| 5e63814afe | |||
| 6be6dafd7d | |||
| fd1edf8177 | |||
| 8102f31956 | |||
| f9b92dacc3 | |||
| a219de11c1 | |||
| 45e9f03093 | |||
| 48e2a97ece | |||
| 953760c828 | |||
| dc3129357b | |||
| 12746ae4aa | |||
| 7e9cc20e59 | |||
| 5209f4c210 | |||
| 5f30ab5aa2 | |||
| 3926c5c947 | |||
| f71c8c882f | |||
| 04a30ea04c | |||
| cbdeae15a1 | |||
| 6aa33cacfa | |||
| 73cfe8da4c | |||
| 0467d88010 | |||
| c41ef4401d | |||
| 4f2a840c21 | |||
| 91050e88ae | |||
| 028efed5bc | |||
| 80f2ca40cb | |||
| 7c32d47f52 | |||
| bf7299c31e | |||
| f3470b409d | |||
| 3486dd4e44 | |||
| 46fe5498b5 | |||
| e94ce73950 | |||
| 3cc469a3d1 | |||
| b4e1a7927d | |||
| 84950cc651 | |||
| 24cc8c7b98 | |||
| 2132ace01b | |||
| 67650b96a2 | |||
| 6b1d2958c3 | |||
| dab1defc76 | |||
| c02f509867 | |||
| b585a608c7 | |||
| 21862e8021 | |||
| 15ac1c0182 | |||
| da23a47213 | |||
| 1bb0f1a855 | |||
| f121d09baa | |||
| dd7a5e11df | |||
| 2d4eabead0 | |||
| e607d34337 | |||
| 4a2bc9fcd9 | |||
| 2ffe269727 | |||
| de5773662a | |||
| 53b50e3420 | |||
| b16f88b217 | |||
| 063e5d064c | |||
| c354a9b249 | |||
| caa4e449e4 | |||
| afc8c4733e | |||
| a00024c66f | |||
| 5c18b291b5 | |||
| 08dde431a6 | |||
| 7daa25d7c1 | |||
| 8842eb617a | |||
| 1d0634e142 | |||
| dc6946c924 | |||
| 377bad4854 | |||
| 6cdd813734 | |||
| 3f46f7eb7e | |||
| 73f474c7e7 | |||
| 2dfae4d38f | |||
| f7061baf7b | |||
| 5865d0f97d | |||
| c204815c42 | |||
| af8f3911aa | |||
| 73afb5a472 | |||
| 5836f286de | |||
| 5567274f2d | |||
| 7983a4527a | |||
| 0f63543326 | |||
| 01755aba07 | |||
| b4987fe759 | |||
| b0cb048c81 | |||
| e8c062a48f | |||
| dfe914bb7e | |||
| b66353bf6e | |||
| ead1d38b49 | |||
| b2505c6a56 | |||
| 663c00f1a4 | |||
| 3dd688540e | |||
| 092ac915a8 | |||
| 03015a72a6 | |||
| 7dcaf8fe4c | |||
| 02db6307e4 | |||
| 3a10cac7c8 | |||
| 64fecd16dd | |||
| 76639b3e04 | |||
| a767ee8331 | |||
| 5c33f1a6d4 | |||
| af320d812b | |||
| 5bfb50fdc6 | |||
| 5393a84494 |
@@ -62,4 +62,4 @@ runs:
|
||||
uv pip install --system -e ./ophyd_devices
|
||||
uv pip install --system -e ./bec/bec_lib[dev]
|
||||
uv pip install --system -e ./bec/bec_ipython_client
|
||||
uv pip install --system -e ./bec_widgets[dev,qtermwidget]
|
||||
uv pip install --system -e ./bec_widgets[dev,pyside6]
|
||||
|
||||
@@ -1,169 +0,0 @@
|
||||
##########################
|
||||
### AI-generated file. ###
|
||||
##########################
|
||||
|
||||
"""Aggregate and merge benchmark JSON files.
|
||||
|
||||
The workflow runs the same benchmark suite on multiple independent runners.
|
||||
This script reads every JSON file produced by those attempts, normalizes the
|
||||
contained benchmark values, and writes a compact mapping JSON where each value is
|
||||
the median across attempts. It can also merge independent hyperfine JSON files
|
||||
from one runner into a single hyperfine-style JSON file.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import statistics
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from compare_benchmarks import Benchmark, extract_benchmarks
|
||||
|
||||
|
||||
def collect_benchmarks(paths: list[Path]) -> dict[str, list[Benchmark]]:
|
||||
"""Collect benchmarks from multiple JSON files.
|
||||
|
||||
Args:
|
||||
paths (list[Path]): Paths to hyperfine, pytest-benchmark, or compact
|
||||
mapping JSON files.
|
||||
|
||||
Returns:
|
||||
dict[str, list[Benchmark]]: Benchmarks grouped by benchmark name.
|
||||
"""
|
||||
|
||||
collected: dict[str, list[Benchmark]] = {}
|
||||
for path in paths:
|
||||
for name, benchmark in extract_benchmarks(path).items():
|
||||
collected.setdefault(name, []).append(benchmark)
|
||||
return collected
|
||||
|
||||
|
||||
def aggregate(collected: dict[str, list[Benchmark]]) -> dict[str, dict[str, object]]:
|
||||
"""Aggregate grouped benchmarks using the median value.
|
||||
|
||||
Args:
|
||||
collected (dict[str, list[Benchmark]]): Benchmarks grouped by benchmark
|
||||
name.
|
||||
|
||||
Returns:
|
||||
dict[str, dict[str, object]]: Compact mapping JSON data. Each benchmark
|
||||
contains ``value``, ``unit``, ``metric``, ``attempts``, and
|
||||
``attempt_values``.
|
||||
"""
|
||||
|
||||
aggregated: dict[str, dict[str, object]] = {}
|
||||
for name, benchmarks in sorted(collected.items()):
|
||||
values = [benchmark.value for benchmark in benchmarks]
|
||||
unit = next((benchmark.unit for benchmark in benchmarks if benchmark.unit), "")
|
||||
metric = next((benchmark.metric for benchmark in benchmarks if benchmark.metric), "value")
|
||||
aggregated[name] = {
|
||||
"value": statistics.median(values),
|
||||
"unit": unit,
|
||||
"metric": f"median-of-attempt-{metric}",
|
||||
"attempts": len(values),
|
||||
"attempt_values": values,
|
||||
}
|
||||
return aggregated
|
||||
|
||||
|
||||
def merge_hyperfine_results(paths: list[Path]) -> dict[str, Any]:
|
||||
"""Merge hyperfine result files.
|
||||
|
||||
Args:
|
||||
paths (list[Path]): Hyperfine JSON files to merge.
|
||||
|
||||
Returns:
|
||||
dict[str, Any]: Hyperfine-style JSON object containing all result rows.
|
||||
|
||||
Raises:
|
||||
ValueError: If any file has no hyperfine ``results`` list.
|
||||
"""
|
||||
|
||||
merged: dict[str, Any] = {"results": []}
|
||||
for path in paths:
|
||||
data = json.loads(path.read_text(encoding="utf-8"))
|
||||
results = data.get("results", []) if isinstance(data, dict) else None
|
||||
if not isinstance(results, list):
|
||||
raise ValueError(f"{path} has no hyperfine results list")
|
||||
merged["results"].extend(results)
|
||||
return merged
|
||||
|
||||
|
||||
def main_from_paths(input_dir: Path, output: Path) -> int:
|
||||
"""Aggregate all JSON files in a directory and write the result.
|
||||
|
||||
Args:
|
||||
input_dir (Path): Directory containing benchmark JSON files.
|
||||
output (Path): Path where the aggregate JSON should be written.
|
||||
|
||||
Returns:
|
||||
int: Always ``0`` on success.
|
||||
|
||||
Raises:
|
||||
ValueError: If no JSON files are found in ``input_dir``.
|
||||
"""
|
||||
|
||||
paths = sorted(input_dir.rglob("*.json"))
|
||||
if not paths:
|
||||
raise ValueError(f"No benchmark JSON files found in {input_dir}")
|
||||
|
||||
output.parent.mkdir(parents=True, exist_ok=True)
|
||||
output.write_text(
|
||||
json.dumps(aggregate(collect_benchmarks(paths)), indent=2, sort_keys=True) + "\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
return 0
|
||||
|
||||
|
||||
def merge_from_paths(input_dir: Path, output: Path) -> int:
|
||||
"""Merge all hyperfine JSON files in a directory and write the result.
|
||||
|
||||
Args:
|
||||
input_dir (Path): Directory containing hyperfine JSON files.
|
||||
output (Path): Path where the merged JSON should be written.
|
||||
|
||||
Returns:
|
||||
int: Always ``0`` on success.
|
||||
|
||||
Raises:
|
||||
ValueError: If no JSON files are found in ``input_dir``.
|
||||
"""
|
||||
|
||||
paths = sorted(input_dir.glob("*.json"))
|
||||
if not paths:
|
||||
raise ValueError(f"No hyperfine JSON files found in {input_dir}")
|
||||
|
||||
output.parent.mkdir(parents=True, exist_ok=True)
|
||||
output.write_text(
|
||||
json.dumps(merge_hyperfine_results(paths), indent=2, sort_keys=True) + "\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
return 0
|
||||
|
||||
|
||||
def main() -> int:
|
||||
"""Run the benchmark aggregation command line interface.
|
||||
|
||||
Returns:
|
||||
int: Always ``0`` on success.
|
||||
"""
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument(
|
||||
"--mode",
|
||||
choices=("aggregate", "merge-hyperfine"),
|
||||
default="aggregate",
|
||||
help="Operation to perform.",
|
||||
)
|
||||
parser.add_argument("--input-dir", required=True, type=Path)
|
||||
parser.add_argument("--output", required=True, type=Path)
|
||||
args = parser.parse_args()
|
||||
if args.mode == "merge-hyperfine":
|
||||
return merge_from_paths(input_dir=args.input_dir, output=args.output)
|
||||
return main_from_paths(input_dir=args.input_dir, output=args.output)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
@@ -1,454 +0,0 @@
|
||||
##########################
|
||||
### AI-generated file. ###
|
||||
##########################
|
||||
|
||||
"""Compare benchmark JSON files and write a GitHub Actions summary.
|
||||
|
||||
The script supports JSON emitted by hyperfine, JSON emitted by pytest-benchmark,
|
||||
and a compact mapping format generated by ``aggregate_benchmarks.py``. Timing
|
||||
formats prefer median values and fall back to mean values when median values are
|
||||
not present.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import math
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Benchmark:
|
||||
"""Normalized benchmark result.
|
||||
|
||||
Attributes:
|
||||
name (str): Stable benchmark name used to match baseline and current results.
|
||||
value (float): Numeric benchmark value used for comparison.
|
||||
unit (str): Display unit for the value, for example ``"s"``.
|
||||
metric (str): Source metric name, for example ``"median"`` or ``"mean"``.
|
||||
"""
|
||||
|
||||
name: str
|
||||
value: float
|
||||
unit: str
|
||||
metric: str = "value"
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Comparison:
|
||||
"""Comparison between one baseline benchmark and one current benchmark.
|
||||
|
||||
Attributes:
|
||||
name (str): Benchmark name.
|
||||
baseline (float): Baseline benchmark value.
|
||||
current (float): Current benchmark value.
|
||||
delta_percent (float): Percent change from baseline to current.
|
||||
unit (str): Display unit for both values.
|
||||
metric (str): Current result metric used for comparison.
|
||||
regressed (bool): Whether the change exceeds the configured threshold in
|
||||
the worse direction.
|
||||
improved (bool): Whether the change exceeds the configured threshold in
|
||||
the better direction.
|
||||
"""
|
||||
|
||||
name: str
|
||||
baseline: float
|
||||
current: float
|
||||
delta_percent: float
|
||||
unit: str
|
||||
metric: str
|
||||
regressed: bool
|
||||
improved: bool
|
||||
|
||||
|
||||
def _read_json(path: Path) -> Any:
|
||||
"""Read JSON data from a file.
|
||||
|
||||
Args:
|
||||
path (Path): Path to the JSON file.
|
||||
|
||||
Returns:
|
||||
Any: Parsed JSON value.
|
||||
"""
|
||||
|
||||
with path.open("r", encoding="utf-8") as stream:
|
||||
return json.load(stream)
|
||||
|
||||
|
||||
def _as_float(value: Any) -> float | None:
|
||||
"""Convert a value to a finite float.
|
||||
|
||||
Args:
|
||||
value (Any): Value to convert.
|
||||
|
||||
Returns:
|
||||
float | None: Converted finite float, or ``None`` if conversion fails.
|
||||
"""
|
||||
|
||||
try:
|
||||
result = float(value)
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
if math.isfinite(result):
|
||||
return result
|
||||
return None
|
||||
|
||||
|
||||
def _extract_hyperfine(data: dict[str, Any]) -> dict[str, Benchmark]:
|
||||
"""Extract normalized benchmarks from hyperfine JSON.
|
||||
|
||||
Args:
|
||||
data (dict[str, Any]): Parsed hyperfine JSON object.
|
||||
|
||||
Returns:
|
||||
dict[str, Benchmark]: Benchmarks keyed by command name.
|
||||
"""
|
||||
|
||||
benchmarks: dict[str, Benchmark] = {}
|
||||
for result in data.get("results", []):
|
||||
if not isinstance(result, dict):
|
||||
continue
|
||||
name = str(result.get("command") or result.get("name") or "").strip()
|
||||
metric = "median"
|
||||
value = _as_float(result.get(metric))
|
||||
if value is None:
|
||||
metric = "mean"
|
||||
value = _as_float(result.get(metric))
|
||||
if name and value is not None:
|
||||
benchmarks[name] = Benchmark(name=name, value=value, unit="s", metric=metric)
|
||||
return benchmarks
|
||||
|
||||
|
||||
def _extract_pytest_benchmark(data: dict[str, Any]) -> dict[str, Benchmark]:
|
||||
"""Extract normalized benchmarks from pytest-benchmark JSON.
|
||||
|
||||
Args:
|
||||
data (dict[str, Any]): Parsed pytest-benchmark JSON object.
|
||||
|
||||
Returns:
|
||||
dict[str, Benchmark]: Benchmarks keyed by full benchmark name.
|
||||
"""
|
||||
|
||||
benchmarks: dict[str, Benchmark] = {}
|
||||
for benchmark in data.get("benchmarks", []):
|
||||
if not isinstance(benchmark, dict):
|
||||
continue
|
||||
|
||||
name = str(benchmark.get("fullname") or benchmark.get("name") or "").strip()
|
||||
stats = benchmark.get("stats", {})
|
||||
value = None
|
||||
metric = "median"
|
||||
if isinstance(stats, dict):
|
||||
value = _as_float(stats.get(metric))
|
||||
if value is None:
|
||||
metric = "mean"
|
||||
value = _as_float(stats.get(metric))
|
||||
if name and value is not None:
|
||||
benchmarks[name] = Benchmark(name=name, value=value, unit="s", metric=metric)
|
||||
return benchmarks
|
||||
|
||||
|
||||
def _extract_simple_mapping(data: dict[str, Any]) -> dict[str, Benchmark]:
|
||||
"""Extract normalized benchmarks from a compact mapping JSON object.
|
||||
|
||||
Args:
|
||||
data (dict[str, Any]): Parsed mapping where each benchmark is either a
|
||||
raw number or an object containing ``value``, ``unit``, and ``metric``.
|
||||
|
||||
Returns:
|
||||
dict[str, Benchmark]: Benchmarks keyed by mapping key.
|
||||
"""
|
||||
|
||||
benchmarks: dict[str, Benchmark] = {}
|
||||
|
||||
for name, raw_value in data.items():
|
||||
if name in {"version", "context", "commit", "timestamp"}:
|
||||
continue
|
||||
|
||||
value = _as_float(raw_value)
|
||||
unit = ""
|
||||
metric = "value"
|
||||
if value is None and isinstance(raw_value, dict):
|
||||
value = _as_float(raw_value.get("value"))
|
||||
unit = str(raw_value.get("unit") or "")
|
||||
metric = str(raw_value.get("metric") or "value")
|
||||
|
||||
if value is not None:
|
||||
benchmarks[str(name)] = Benchmark(name=str(name), value=value, unit=unit, metric=metric)
|
||||
|
||||
return benchmarks
|
||||
|
||||
|
||||
def extract_benchmarks(path: Path) -> dict[str, Benchmark]:
|
||||
"""Extract normalized benchmarks from a supported JSON file.
|
||||
|
||||
Args:
|
||||
path (Path): Path to a hyperfine, pytest-benchmark, or compact mapping
|
||||
JSON file.
|
||||
|
||||
Returns:
|
||||
dict[str, Benchmark]: Normalized benchmarks keyed by name.
|
||||
|
||||
Raises:
|
||||
ValueError: If the JSON root is not an object or no supported benchmark
|
||||
entries can be extracted.
|
||||
"""
|
||||
|
||||
data = _read_json(path)
|
||||
if not isinstance(data, dict):
|
||||
raise ValueError(f"{path} must contain a JSON object")
|
||||
|
||||
extractors = (_extract_hyperfine, _extract_pytest_benchmark, _extract_simple_mapping)
|
||||
for extractor in extractors:
|
||||
benchmarks = extractor(data)
|
||||
if benchmarks:
|
||||
return benchmarks
|
||||
|
||||
raise ValueError(f"No supported benchmark entries found in {path}")
|
||||
|
||||
|
||||
def compare_benchmarks(
|
||||
baseline: dict[str, Benchmark],
|
||||
current: dict[str, Benchmark],
|
||||
threshold_percent: float,
|
||||
higher_is_better: bool,
|
||||
) -> tuple[list[Comparison], list[str], list[str]]:
|
||||
"""Compare baseline benchmarks with current benchmarks.
|
||||
|
||||
Args:
|
||||
baseline (dict[str, Benchmark]): Baseline benchmarks keyed by name.
|
||||
current (dict[str, Benchmark]): Current benchmarks keyed by name.
|
||||
threshold_percent (float): Regression threshold in percent.
|
||||
higher_is_better (bool): If ``True``, lower current values are treated as
|
||||
regressions. If ``False``, higher current values are treated as
|
||||
regressions.
|
||||
|
||||
Returns:
|
||||
tuple[list[Comparison], list[str], list[str]]: Comparisons for common
|
||||
benchmark names, names missing from current results, and names newly
|
||||
present in current results.
|
||||
"""
|
||||
|
||||
comparisons: list[Comparison] = []
|
||||
missing_in_current: list[str] = []
|
||||
new_in_current: list[str] = []
|
||||
|
||||
for name, baseline_benchmark in sorted(baseline.items()):
|
||||
current_benchmark = current.get(name)
|
||||
if current_benchmark is None:
|
||||
missing_in_current.append(name)
|
||||
continue
|
||||
|
||||
if baseline_benchmark.value == 0:
|
||||
delta_percent = 0.0
|
||||
else:
|
||||
delta_percent = (
|
||||
(current_benchmark.value - baseline_benchmark.value)
|
||||
/ abs(baseline_benchmark.value)
|
||||
* 100
|
||||
)
|
||||
|
||||
if higher_is_better:
|
||||
regressed = delta_percent <= -threshold_percent
|
||||
improved = delta_percent >= threshold_percent
|
||||
else:
|
||||
regressed = delta_percent >= threshold_percent
|
||||
improved = delta_percent <= -threshold_percent
|
||||
|
||||
comparisons.append(
|
||||
Comparison(
|
||||
name=name,
|
||||
baseline=baseline_benchmark.value,
|
||||
current=current_benchmark.value,
|
||||
delta_percent=delta_percent,
|
||||
unit=current_benchmark.unit or baseline_benchmark.unit,
|
||||
metric=current_benchmark.metric,
|
||||
regressed=regressed,
|
||||
improved=improved,
|
||||
)
|
||||
)
|
||||
|
||||
for name in sorted(set(current) - set(baseline)):
|
||||
new_in_current.append(name)
|
||||
|
||||
return comparisons, missing_in_current, new_in_current
|
||||
|
||||
|
||||
def _format_value(value: float, unit: str) -> str:
|
||||
"""Format a benchmark value for Markdown output.
|
||||
|
||||
Args:
|
||||
value (float): Numeric benchmark value.
|
||||
unit (str): Display unit.
|
||||
|
||||
Returns:
|
||||
str: Formatted value with optional unit suffix.
|
||||
"""
|
||||
|
||||
suffix = f" {unit}" if unit else ""
|
||||
return f"{value:.6g}{suffix}"
|
||||
|
||||
|
||||
def _format_status(comparison: Comparison) -> str:
|
||||
"""Format a comparison status for Markdown output."""
|
||||
|
||||
if comparison.regressed:
|
||||
return ":red_circle: regressed"
|
||||
if comparison.improved:
|
||||
return ":green_circle: improved"
|
||||
return "ok"
|
||||
|
||||
|
||||
def write_summary(
|
||||
path: Path,
|
||||
comparisons: list[Comparison],
|
||||
missing_in_current: list[str],
|
||||
new_in_current: list[str],
|
||||
threshold_percent: float,
|
||||
higher_is_better: bool,
|
||||
) -> None:
|
||||
"""Write a Markdown benchmark comparison summary.
|
||||
|
||||
Args:
|
||||
path (Path): Path where the summary should be written.
|
||||
comparisons (list[Comparison]): Comparison rows for matching benchmarks.
|
||||
missing_in_current (list[str]): Baseline benchmark names missing from the
|
||||
current result.
|
||||
new_in_current (list[str]): Current benchmark names not present in the
|
||||
baseline result.
|
||||
threshold_percent (float): Regression threshold in percent.
|
||||
higher_is_better (bool): Whether higher benchmark values are considered
|
||||
better.
|
||||
"""
|
||||
|
||||
regressions = [comparison for comparison in comparisons if comparison.regressed]
|
||||
improvements = [comparison for comparison in comparisons if comparison.improved]
|
||||
direction = "higher is better" if higher_is_better else "lower is better"
|
||||
sorted_comparisons = sorted(comparisons, key=lambda comparison: comparison.name)
|
||||
|
||||
lines = [
|
||||
"<!-- bw-benchmark-comment -->",
|
||||
"## Benchmark comparison",
|
||||
"",
|
||||
f"Threshold: {threshold_percent:g}% ({direction}).",
|
||||
f"Result: {len(regressions)} regression(s), {len(improvements)} improvement(s) beyond threshold.",
|
||||
]
|
||||
lines.append("")
|
||||
|
||||
if regressions:
|
||||
lines.extend(
|
||||
[
|
||||
f"{len(regressions)} benchmark(s) regressed beyond the configured threshold.",
|
||||
"",
|
||||
"| Benchmark | Baseline | Current | Change |",
|
||||
"| --- | ---: | ---: | ---: |",
|
||||
]
|
||||
)
|
||||
for comparison in regressions:
|
||||
lines.append(
|
||||
"| "
|
||||
f"{comparison.name} | "
|
||||
f"{_format_value(comparison.baseline, comparison.unit)} | "
|
||||
f"{_format_value(comparison.current, comparison.unit)} | "
|
||||
f"{comparison.delta_percent:+.2f}% |"
|
||||
)
|
||||
else:
|
||||
lines.append("No benchmark regression exceeded the configured threshold.")
|
||||
|
||||
lines.append("")
|
||||
|
||||
if improvements:
|
||||
lines.extend(
|
||||
[
|
||||
f"{len(improvements)} benchmark(s) improved beyond the configured threshold.",
|
||||
"",
|
||||
"| Benchmark | Baseline | Current | Change |",
|
||||
"| --- | ---: | ---: | ---: |",
|
||||
]
|
||||
)
|
||||
for comparison in improvements:
|
||||
lines.append(
|
||||
"| "
|
||||
f"{comparison.name} | "
|
||||
f"{_format_value(comparison.baseline, comparison.unit)} | "
|
||||
f"{_format_value(comparison.current, comparison.unit)} | "
|
||||
f"{comparison.delta_percent:+.2f}% |"
|
||||
)
|
||||
else:
|
||||
lines.append("No benchmark improvement exceeded the configured threshold.")
|
||||
|
||||
if sorted_comparisons:
|
||||
lines.extend(
|
||||
[
|
||||
"",
|
||||
"<details>",
|
||||
"<summary>All benchmark results</summary>",
|
||||
"",
|
||||
"| Benchmark | Baseline | Current | Change | Status |",
|
||||
"| --- | ---: | ---: | ---: | --- |",
|
||||
]
|
||||
)
|
||||
for comparison in sorted_comparisons:
|
||||
lines.append(
|
||||
"| "
|
||||
f"{comparison.name} | "
|
||||
f"{_format_value(comparison.baseline, comparison.unit)} | "
|
||||
f"{_format_value(comparison.current, comparison.unit)} | "
|
||||
f"{comparison.delta_percent:+.2f}% | "
|
||||
f"{_format_status(comparison)} |"
|
||||
)
|
||||
lines.extend(["", "</details>"])
|
||||
|
||||
if missing_in_current:
|
||||
lines.extend(["", "Missing benchmarks in the current run:"])
|
||||
lines.extend(f"- `{name}`" for name in missing_in_current)
|
||||
|
||||
if new_in_current:
|
||||
lines.extend(["", "New benchmarks in the current run:"])
|
||||
lines.extend(f"- `{name}`" for name in new_in_current)
|
||||
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_text("\n".join(lines) + "\n", encoding="utf-8")
|
||||
|
||||
|
||||
def main() -> int:
|
||||
"""Run the benchmark comparison command line interface.
|
||||
|
||||
Returns:
|
||||
int: ``1`` when a regression exceeds the threshold, otherwise ``0``.
|
||||
"""
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--baseline", required=True, type=Path)
|
||||
parser.add_argument("--current", required=True, type=Path)
|
||||
parser.add_argument("--summary", required=True, type=Path)
|
||||
parser.add_argument("--threshold-percent", required=True, type=float)
|
||||
parser.add_argument("--higher-is-better", action="store_true")
|
||||
args = parser.parse_args()
|
||||
|
||||
baseline = extract_benchmarks(args.baseline)
|
||||
current = extract_benchmarks(args.current)
|
||||
comparisons, missing_in_current, new_in_current = compare_benchmarks(
|
||||
baseline=baseline,
|
||||
current=current,
|
||||
threshold_percent=args.threshold_percent,
|
||||
higher_is_better=args.higher_is_better,
|
||||
)
|
||||
|
||||
write_summary(
|
||||
path=args.summary,
|
||||
comparisons=comparisons,
|
||||
missing_in_current=missing_in_current,
|
||||
new_in_current=new_in_current,
|
||||
threshold_percent=args.threshold_percent,
|
||||
higher_is_better=args.higher_is_better,
|
||||
)
|
||||
|
||||
return 1 if any(comparison.regressed for comparison in comparisons) else 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
@@ -1,74 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
##########################
|
||||
### AI-generated file. ###
|
||||
##########################
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
mkdir -p benchmark-results
|
||||
benchmark_json="${BENCHMARK_JSON:-benchmark-results/current.json}"
|
||||
benchmark_root="$(dirname "$benchmark_json")"
|
||||
hyperfine_benchmark_dir="${BENCHMARK_HYPERFINE_DIR:-tests/benchmarks/hyperfine}"
|
||||
pytest_benchmark_dirs="${BENCHMARK_PYTEST_DIRS:-${BENCHMARK_PYTEST_DIR:-}}"
|
||||
benchmark_work_dir="$benchmark_root/raw-results"
|
||||
hyperfine_json_dir="$benchmark_work_dir/hyperfine"
|
||||
pytest_json="$benchmark_work_dir/pytest.json"
|
||||
|
||||
shopt -s nullglob
|
||||
benchmark_scripts=()
|
||||
benchmark_scripts=("$hyperfine_benchmark_dir"/benchmark_*.sh)
|
||||
shopt -u nullglob
|
||||
|
||||
pytest_dirs=()
|
||||
for pytest_benchmark_dir in $pytest_benchmark_dirs; do
|
||||
if [ -d "$pytest_benchmark_dir" ]; then
|
||||
pytest_dirs+=("$pytest_benchmark_dir")
|
||||
else
|
||||
echo "Pytest benchmark directory not found: $pytest_benchmark_dir" >&2
|
||||
exit 1
|
||||
fi
|
||||
done
|
||||
|
||||
if [ "${#benchmark_scripts[@]}" -eq 0 ] && [ "${#pytest_dirs[@]}" -eq 0 ]; then
|
||||
echo "No benchmark scripts or pytest benchmarks found" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
echo "Benchmark Python: $(command -v python)"
|
||||
python -c 'import sys; print(sys.version)'
|
||||
|
||||
rm -rf "$benchmark_work_dir"
|
||||
mkdir -p "$hyperfine_json_dir"
|
||||
|
||||
if [ "${#benchmark_scripts[@]}" -gt 0 ]; then
|
||||
for benchmark_script in "${benchmark_scripts[@]}"; do
|
||||
title="$(sed -n 's/^# BENCHMARK_TITLE:[[:space:]]*//p' "$benchmark_script" | head -n 1)"
|
||||
if [ -z "$title" ]; then
|
||||
title="$(basename "$benchmark_script" .sh)"
|
||||
fi
|
||||
benchmark_name="$(basename "$benchmark_script" .sh)"
|
||||
benchmark_result_json="$hyperfine_json_dir/$benchmark_name.json"
|
||||
echo "Preflight benchmark script: $benchmark_script"
|
||||
bash "$benchmark_script"
|
||||
|
||||
hyperfine \
|
||||
--show-output \
|
||||
--warmup 1 \
|
||||
--runs 5 \
|
||||
--command-name "$title" \
|
||||
--export-json "$benchmark_result_json" \
|
||||
"bash $(printf "%q" "$benchmark_script")"
|
||||
done
|
||||
fi
|
||||
|
||||
if [ "${#pytest_dirs[@]}" -gt 0 ]; then
|
||||
pytest \
|
||||
-q "${pytest_dirs[@]}" \
|
||||
--benchmark-only \
|
||||
--benchmark-json "$pytest_json"
|
||||
fi
|
||||
|
||||
python .github/scripts/aggregate_benchmarks.py \
|
||||
--input-dir "$benchmark_work_dir" \
|
||||
--output "$benchmark_json"
|
||||
@@ -1,125 +0,0 @@
|
||||
##########################
|
||||
### AI-generated file. ###
|
||||
##########################
|
||||
|
||||
"""Run a command with BEC e2e services available."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import tempfile
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import bec_lib
|
||||
from bec_ipython_client import BECIPythonClient
|
||||
from bec_lib.redis_connector import RedisConnector
|
||||
from bec_lib.service_config import ServiceConfig, ServiceConfigModel
|
||||
from redis import Redis
|
||||
|
||||
|
||||
def _wait_for_redis(host: str, port: int) -> None:
|
||||
client = Redis(host=host, port=port)
|
||||
deadline = time.monotonic() + 10
|
||||
while time.monotonic() < deadline:
|
||||
try:
|
||||
if client.ping():
|
||||
return
|
||||
except Exception:
|
||||
time.sleep(0.1)
|
||||
raise RuntimeError(f"Redis did not start on {host}:{port}")
|
||||
|
||||
|
||||
def _start_redis(files_path: Path, host: str, port: int) -> subprocess.Popen:
|
||||
redis_server = shutil.which("redis-server")
|
||||
if redis_server is None:
|
||||
raise RuntimeError("redis-server executable not found")
|
||||
|
||||
return subprocess.Popen(
|
||||
[
|
||||
redis_server,
|
||||
"--bind",
|
||||
host,
|
||||
"--port",
|
||||
str(port),
|
||||
"--save",
|
||||
"",
|
||||
"--appendonly",
|
||||
"no",
|
||||
"--dir",
|
||||
str(files_path),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def _write_configs(files_path: Path, host: str, port: int) -> Path:
|
||||
test_config = files_path / "test_config.yaml"
|
||||
services_config = files_path / "services_config.yaml"
|
||||
|
||||
bec_lib_path = Path(bec_lib.__file__).resolve().parent
|
||||
shutil.copyfile(bec_lib_path / "tests" / "test_config.yaml", test_config)
|
||||
|
||||
service_config = ServiceConfigModel(
|
||||
redis={"host": host, "port": port}, file_writer={"base_path": str(files_path)}
|
||||
)
|
||||
services_config.write_text(service_config.model_dump_json(indent=4), encoding="utf-8")
|
||||
return services_config
|
||||
|
||||
|
||||
def _load_demo_config(services_config: Path) -> None:
|
||||
bec = BECIPythonClient(ServiceConfig(services_config), RedisConnector, forced=True)
|
||||
bec.start()
|
||||
try:
|
||||
bec.config.load_demo_config()
|
||||
finally:
|
||||
bec.shutdown()
|
||||
bec._client._reset_singleton()
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("command", nargs=argparse.REMAINDER)
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.command[:1] == ["--"]:
|
||||
args.command = args.command[1:]
|
||||
if not args.command:
|
||||
raise ValueError("No command provided")
|
||||
|
||||
host = "127.0.0.1"
|
||||
port = 6379
|
||||
|
||||
with tempfile.TemporaryDirectory(prefix="bec-benchmark-") as tmp:
|
||||
files_path = Path(tmp)
|
||||
services_config = _write_configs(files_path, host, port)
|
||||
redis_process = _start_redis(files_path, host, port)
|
||||
processes = None
|
||||
service_handler = None
|
||||
try:
|
||||
_wait_for_redis(host, port)
|
||||
|
||||
from bec_server.bec_server_utils.service_handler import ServiceHandler
|
||||
|
||||
service_handler = ServiceHandler(
|
||||
bec_path=files_path, config_path=services_config, interface="subprocess"
|
||||
)
|
||||
processes = service_handler.start()
|
||||
_load_demo_config(services_config)
|
||||
|
||||
env = os.environ.copy()
|
||||
return subprocess.run(args.command, env=env, check=False).returncode
|
||||
finally:
|
||||
if service_handler is not None and processes is not None:
|
||||
service_handler.stop(processes)
|
||||
redis_process.terminate()
|
||||
try:
|
||||
redis_process.wait(timeout=10)
|
||||
except subprocess.TimeoutExpired:
|
||||
redis_process.kill()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
@@ -1,242 +0,0 @@
|
||||
name: BW Benchmarks
|
||||
|
||||
on: [ workflow_call ]
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
|
||||
env:
|
||||
BENCHMARK_JSON: benchmark-results/current.json
|
||||
BENCHMARK_BASELINE_JSON: gh-pages-benchmark-data/benchmarks/latest.json
|
||||
BENCHMARK_SUMMARY: benchmark-results/summary.md
|
||||
BENCHMARK_COMMAND: "bash .github/scripts/run_benchmarks.sh"
|
||||
BENCHMARK_THRESHOLD_PERCENT: 20
|
||||
BENCHMARK_HIGHER_IS_BETTER: false
|
||||
|
||||
jobs:
|
||||
benchmark_attempt:
|
||||
runs-on: ubuntu-latest
|
||||
continue-on-error: true
|
||||
permissions:
|
||||
contents: read
|
||||
defaults:
|
||||
run:
|
||||
shell: bash -el {0}
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
attempt: [ 1, 2, 3 ]
|
||||
|
||||
env:
|
||||
BENCHMARK_JSON: benchmark-results/current-${{ matrix.attempt }}.json
|
||||
BEC_CORE_BRANCH: main
|
||||
OPHYD_DEVICES_BRANCH: main
|
||||
PLUGIN_REPO_BRANCH: main
|
||||
BENCHMARK_PYTEST_DIRS: tests/unit_tests/benchmarks
|
||||
QTWEBENGINE_DISABLE_SANDBOX: 1
|
||||
QT_QPA_PLATFORM: "offscreen"
|
||||
|
||||
steps:
|
||||
- name: Checkout BEC Widgets
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
repository: bec-project/bec_widgets
|
||||
ref: ${{ github.event.pull_request.head.sha || github.sha }}
|
||||
|
||||
- name: Set up Conda
|
||||
uses: conda-incubator/setup-miniconda@v3
|
||||
with:
|
||||
auto-update-conda: true
|
||||
auto-activate-base: true
|
||||
python-version: "3.11"
|
||||
|
||||
- name: Install system dependencies
|
||||
run: |
|
||||
sudo apt-get update
|
||||
sudo apt-get install -y libgl1 libegl1 x11-utils libxkbcommon-x11-0 libdbus-1-3 xvfb
|
||||
sudo apt-get -y install libnss3 libxdamage1 libasound2t64 libatomic1 libxcursor1
|
||||
sudo apt-get -y install ttyd hyperfine redis-server
|
||||
|
||||
- name: Install full e2e environment
|
||||
run: |
|
||||
echo -e "\033[35;1m Using branch $BEC_CORE_BRANCH of BEC CORE \033[0;m";
|
||||
git clone --branch "$BEC_CORE_BRANCH" https://github.com/bec-project/bec.git
|
||||
echo -e "\033[35;1m Using branch $OPHYD_DEVICES_BRANCH of OPHYD_DEVICES \033[0;m";
|
||||
git clone --branch "$OPHYD_DEVICES_BRANCH" https://github.com/bec-project/ophyd_devices.git
|
||||
export OHPYD_DEVICES_PATH=$PWD/ophyd_devices
|
||||
echo -e "\033[35;1m Using branch $PLUGIN_REPO_BRANCH of bec_testing_plugin \033[0;m";
|
||||
git clone --branch "$PLUGIN_REPO_BRANCH" https://github.com/bec-project/bec_testing_plugin.git
|
||||
cd ./bec
|
||||
conda create -q -n test-environment python=3.11
|
||||
conda activate test-environment
|
||||
source ./bin/install_bec_dev.sh -t
|
||||
cd ../
|
||||
python -m pip install -e ./ophyd_devices -e .[dev,pyside6] -e ./bec_testing_plugin pytest-benchmark
|
||||
|
||||
mkdir -p "$(dirname "$BENCHMARK_JSON")"
|
||||
python .github/scripts/run_with_bec_servers.py -- bash -lc "$BENCHMARK_COMMAND"
|
||||
test -s "$BENCHMARK_JSON"
|
||||
|
||||
- name: Upload benchmark artifact
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: bw-benchmark-json-${{ matrix.attempt }}
|
||||
path: ${{ env.BENCHMARK_JSON }}
|
||||
|
||||
benchmark:
|
||||
needs: [ benchmark_attempt ]
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: read
|
||||
issues: write
|
||||
pull-requests: write
|
||||
|
||||
steps:
|
||||
- name: Checkout BEC Widgets
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
repository: bec-project/bec_widgets
|
||||
ref: ${{ github.event.pull_request.head.sha || github.sha }}
|
||||
|
||||
- name: Download benchmark attempts
|
||||
uses: actions/download-artifact@v4
|
||||
with:
|
||||
pattern: bw-benchmark-json-*
|
||||
path: benchmark-results/attempts
|
||||
merge-multiple: true
|
||||
|
||||
- name: Aggregate benchmark attempts
|
||||
run: |
|
||||
python .github/scripts/aggregate_benchmarks.py \
|
||||
--input-dir benchmark-results/attempts \
|
||||
--output "$BENCHMARK_JSON"
|
||||
|
||||
- name: Upload aggregate benchmark artifact
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: bw-benchmark-json
|
||||
path: ${{ env.BENCHMARK_JSON }}
|
||||
|
||||
- name: Fetch gh-pages benchmark data
|
||||
run: |
|
||||
if git ls-remote --exit-code --heads origin gh-pages; then
|
||||
git clone --depth=1 --branch gh-pages "$GITHUB_SERVER_URL/$GITHUB_REPOSITORY.git" gh-pages-benchmark-data
|
||||
else
|
||||
mkdir -p gh-pages-benchmark-data
|
||||
fi
|
||||
|
||||
- name: Compare with latest gh-pages benchmark
|
||||
id: compare
|
||||
continue-on-error: true
|
||||
run: |
|
||||
if [ ! -s "$BENCHMARK_BASELINE_JSON" ]; then
|
||||
mkdir -p "$(dirname "$BENCHMARK_SUMMARY")"
|
||||
{
|
||||
echo "<!-- bw-benchmark-comment -->"
|
||||
echo "## Benchmark comparison"
|
||||
echo
|
||||
echo "No benchmark baseline was found on gh-pages."
|
||||
} > "$BENCHMARK_SUMMARY"
|
||||
exit 0
|
||||
fi
|
||||
|
||||
args=(
|
||||
--baseline "$BENCHMARK_BASELINE_JSON"
|
||||
--current "$BENCHMARK_JSON"
|
||||
--summary "$BENCHMARK_SUMMARY"
|
||||
--threshold-percent "$BENCHMARK_THRESHOLD_PERCENT"
|
||||
)
|
||||
|
||||
if [ "$BENCHMARK_HIGHER_IS_BETTER" = "true" ]; then
|
||||
args+=(--higher-is-better)
|
||||
fi
|
||||
|
||||
set +e
|
||||
python .github/scripts/compare_benchmarks.py "${args[@]}"
|
||||
status=$?
|
||||
set -e
|
||||
|
||||
if [ ! -s "$BENCHMARK_SUMMARY" ]; then
|
||||
mkdir -p "$(dirname "$BENCHMARK_SUMMARY")"
|
||||
{
|
||||
echo "<!-- bw-benchmark-comment -->"
|
||||
echo "## Benchmark comparison"
|
||||
echo
|
||||
echo "Benchmark comparison failed before writing a summary."
|
||||
} > "$BENCHMARK_SUMMARY"
|
||||
fi
|
||||
|
||||
exit "$status"
|
||||
|
||||
- name: Find existing benchmark PR comment
|
||||
if: github.event_name == 'pull_request'
|
||||
id: fc
|
||||
uses: peter-evans/find-comment@v3
|
||||
with:
|
||||
issue-number: ${{ github.event.pull_request.number }}
|
||||
comment-author: github-actions[bot]
|
||||
body-includes: "<!-- bw-benchmark-comment -->"
|
||||
|
||||
- name: Create or update benchmark PR comment
|
||||
if: github.event_name == 'pull_request'
|
||||
uses: peter-evans/create-or-update-comment@v5
|
||||
with:
|
||||
issue-number: ${{ github.event.pull_request.number }}
|
||||
comment-id: ${{ steps.fc.outputs.comment-id }}
|
||||
body-path: ${{ env.BENCHMARK_SUMMARY }}
|
||||
edit-mode: replace
|
||||
|
||||
- name: Fail on benchmark regression
|
||||
if: github.event_name == 'pull_request' && steps.compare.outcome == 'failure'
|
||||
run: exit 1
|
||||
|
||||
publish:
|
||||
needs: [ benchmark ]
|
||||
if: github.event_name == 'push' && github.ref == 'refs/heads/main'
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: write
|
||||
|
||||
steps:
|
||||
- name: Checkout BEC Widgets
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
repository: bec-project/bec_widgets
|
||||
ref: ${{ github.sha }}
|
||||
|
||||
- name: Download aggregate benchmark artifact
|
||||
uses: actions/download-artifact@v4
|
||||
with:
|
||||
name: bw-benchmark-json
|
||||
path: benchmark-results
|
||||
|
||||
- name: Verify aggregate benchmark artifact
|
||||
run: test -s "$BENCHMARK_JSON"
|
||||
|
||||
- name: Prepare gh-pages for publishing
|
||||
run: |
|
||||
# Clean up any existing worktree/directory
|
||||
if [ -d gh-pages-benchmark-data ]; then
|
||||
git worktree remove gh-pages-benchmark-data --force || rm -rf gh-pages-benchmark-data
|
||||
fi
|
||||
|
||||
if git ls-remote --exit-code --heads origin gh-pages; then
|
||||
git fetch --depth=1 origin gh-pages
|
||||
git worktree add gh-pages-benchmark-data FETCH_HEAD
|
||||
else
|
||||
git worktree add --detach gh-pages-benchmark-data
|
||||
git -C gh-pages-benchmark-data checkout --orphan gh-pages
|
||||
git -C gh-pages-benchmark-data rm -rf .
|
||||
fi
|
||||
|
||||
- name: Publish benchmark data to gh-pages
|
||||
working-directory: gh-pages-benchmark-data
|
||||
run: |
|
||||
mkdir -p benchmarks/history
|
||||
cp "../$BENCHMARK_JSON" benchmarks/latest.json
|
||||
cp "../$BENCHMARK_JSON" "benchmarks/history/${GITHUB_SHA}.json"
|
||||
git config user.name "github-actions[bot]"
|
||||
git config user.email "41898282+github-actions[bot]@users.noreply.github.com"
|
||||
git add benchmarks/latest.json "benchmarks/history/${GITHUB_SHA}.json"
|
||||
git commit -m "Update BW benchmark data for ${GITHUB_SHA}" || exit 0
|
||||
git push origin HEAD:gh-pages
|
||||
@@ -45,18 +45,6 @@ jobs:
|
||||
cd ./bec
|
||||
pip install pytest pytest-random-order
|
||||
pytest -v --maxfail=2 --junitxml=report.xml --random-order ./bec_server/tests ./bec_ipython_client/tests/client_tests ./bec_lib/tests
|
||||
|
||||
- name: Upload BEC unit test artifacts if job fails
|
||||
if: failure()
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: bec-unit-test-artifacts
|
||||
path: |
|
||||
./bec/report.xml
|
||||
./bec/logs/*.log
|
||||
if-no-files-found: ignore
|
||||
retention-days: 7
|
||||
|
||||
bec-e2e-test:
|
||||
name: BEC End2End Tests
|
||||
runs-on: ubuntu-latest
|
||||
@@ -74,12 +62,3 @@ jobs:
|
||||
OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH }}
|
||||
BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH }}
|
||||
PYTHON_VERSION: '3.11'
|
||||
|
||||
- name: Upload BEC e2e logs if job fails
|
||||
if: failure()
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: bec-e2e-test-logs
|
||||
path: ./_e2e_test_checkout_/bec/logs/*.log
|
||||
if-no-files-found: ignore
|
||||
retention-days: 7
|
||||
|
||||
@@ -1,19 +1,19 @@
|
||||
name: Full CI
|
||||
on:
|
||||
on:
|
||||
push:
|
||||
pull_request:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
BEC_WIDGETS_BRANCH:
|
||||
description: "Branch of BEC Widgets to install"
|
||||
description: 'Branch of BEC Widgets to install'
|
||||
required: false
|
||||
type: string
|
||||
BEC_CORE_BRANCH:
|
||||
description: "Branch of BEC Core to install"
|
||||
description: 'Branch of BEC Core to install'
|
||||
required: false
|
||||
type: string
|
||||
OPHYD_DEVICES_BRANCH:
|
||||
description: "Branch of Ophyd Devices to install"
|
||||
description: 'Branch of Ophyd Devices to install'
|
||||
required: false
|
||||
type: string
|
||||
|
||||
@@ -23,7 +23,6 @@ concurrency:
|
||||
|
||||
permissions:
|
||||
pull-requests: write
|
||||
contents: read
|
||||
|
||||
jobs:
|
||||
check_pr_status:
|
||||
@@ -34,15 +33,6 @@ jobs:
|
||||
if: needs.check_pr_status.outputs.branch-pr == ''
|
||||
uses: ./.github/workflows/formatter.yml
|
||||
|
||||
benchmark:
|
||||
needs: [check_pr_status]
|
||||
if: needs.check_pr_status.outputs.branch-pr == ''
|
||||
permissions:
|
||||
contents: write
|
||||
issues: write
|
||||
pull-requests: write
|
||||
uses: ./.github/workflows/benchmark.yml
|
||||
|
||||
unit-test:
|
||||
needs: [check_pr_status, formatter]
|
||||
if: needs.check_pr_status.outputs.branch-pr == ''
|
||||
@@ -79,9 +69,9 @@ jobs:
|
||||
uses: ./.github/workflows/child_repos.yml
|
||||
with:
|
||||
BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
|
||||
OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main'}}
|
||||
OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main'}}
|
||||
BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha }}
|
||||
|
||||
|
||||
plugin_repos:
|
||||
needs: [check_pr_status, formatter]
|
||||
if: needs.check_pr_status.outputs.branch-pr == ''
|
||||
@@ -91,4 +81,4 @@ jobs:
|
||||
BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha }}
|
||||
|
||||
secrets:
|
||||
GH_READ_TOKEN: ${{ secrets.GH_READ_TOKEN }}
|
||||
GH_READ_TOKEN: ${{ secrets.GH_READ_TOKEN }}
|
||||
@@ -55,5 +55,5 @@ jobs:
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: pytest-logs
|
||||
path: ./bec/logs/*.log
|
||||
path: ./logs/*.log
|
||||
retention-days: 7
|
||||
|
||||
@@ -1,25 +1,25 @@
|
||||
name: Run Pytest with different Python versions
|
||||
on:
|
||||
on:
|
||||
workflow_call:
|
||||
inputs:
|
||||
pr_number:
|
||||
description: "Pull request number"
|
||||
description: 'Pull request number'
|
||||
required: false
|
||||
type: number
|
||||
BEC_CORE_BRANCH:
|
||||
description: "Branch of BEC Core to install"
|
||||
description: 'Branch of BEC Core to install'
|
||||
required: false
|
||||
default: "main"
|
||||
default: 'main'
|
||||
type: string
|
||||
OPHYD_DEVICES_BRANCH:
|
||||
description: "Branch of Ophyd Devices to install"
|
||||
description: 'Branch of Ophyd Devices to install'
|
||||
required: false
|
||||
default: "main"
|
||||
default: 'main'
|
||||
type: string
|
||||
BEC_WIDGETS_BRANCH:
|
||||
description: "Branch of BEC Widgets to install"
|
||||
description: 'Branch of BEC Widgets to install'
|
||||
required: false
|
||||
default: "main"
|
||||
default: 'main'
|
||||
type: string
|
||||
|
||||
jobs:
|
||||
@@ -30,14 +30,15 @@ jobs:
|
||||
python-version: ["3.11", "3.12", "3.13"]
|
||||
|
||||
env:
|
||||
BEC_WIDGETS_BRANCH: main # Set the branch you want for bec_widgets
|
||||
BEC_CORE_BRANCH: main # Set the branch you want for bec
|
||||
OPHYD_DEVICES_BRANCH: main # Set the branch you want for ophyd_devices
|
||||
BEC_WIDGETS_BRANCH: main # Set the branch you want for bec_widgets
|
||||
BEC_CORE_BRANCH: main # Set the branch you want for bec
|
||||
OPHYD_DEVICES_BRANCH: main # Set the branch you want for ophyd_devices
|
||||
PROJECT_PATH: ${{ github.repository }}
|
||||
QTWEBENGINE_DISABLE_SANDBOX: 1
|
||||
QT_QPA_PLATFORM: "offscreen"
|
||||
|
||||
steps:
|
||||
|
||||
- name: Checkout BEC Widgets
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
@@ -55,4 +56,4 @@ jobs:
|
||||
- name: Run Pytest
|
||||
run: |
|
||||
pip install pytest pytest-random-order
|
||||
pytest -v --junitxml=report.xml --random-order --ignore=tests/unit_tests/benchmarks ./tests/unit_tests
|
||||
pytest -v --junitxml=report.xml --random-order ./tests/unit_tests
|
||||
|
||||
@@ -1,30 +1,32 @@
|
||||
name: Run Pytest with Coverage
|
||||
on:
|
||||
on:
|
||||
workflow_call:
|
||||
inputs:
|
||||
pr_number:
|
||||
description: "Pull request number"
|
||||
description: 'Pull request number'
|
||||
required: false
|
||||
type: number
|
||||
BEC_CORE_BRANCH:
|
||||
description: "Branch of BEC Core to install"
|
||||
description: 'Branch of BEC Core to install'
|
||||
required: false
|
||||
default: "main"
|
||||
default: 'main'
|
||||
type: string
|
||||
OPHYD_DEVICES_BRANCH:
|
||||
description: "Branch of Ophyd Devices to install"
|
||||
description: 'Branch of Ophyd Devices to install'
|
||||
required: false
|
||||
default: "main"
|
||||
default: 'main'
|
||||
type: string
|
||||
BEC_WIDGETS_BRANCH:
|
||||
description: "Branch of BEC Widgets to install"
|
||||
description: 'Branch of BEC Widgets to install'
|
||||
required: false
|
||||
default: "main"
|
||||
default: 'main'
|
||||
type: string
|
||||
secrets:
|
||||
CODECOV_TOKEN:
|
||||
required: true
|
||||
|
||||
|
||||
|
||||
permissions:
|
||||
pull-requests: write
|
||||
|
||||
@@ -53,7 +55,7 @@ jobs:
|
||||
|
||||
- name: Run Pytest with Coverage
|
||||
id: coverage
|
||||
run: pytest --random-order --cov=bec_widgets --cov-config=pyproject.toml --cov-branch --cov-report=xml --no-cov-on-fail --ignore=tests/unit_tests/benchmarks tests/unit_tests/
|
||||
run: pytest --random-order --cov=bec_widgets --cov-config=pyproject.toml --cov-branch --cov-report=xml --no-cov-on-fail tests/unit_tests/
|
||||
|
||||
- name: Upload test artifacts
|
||||
uses: actions/upload-artifact@v4
|
||||
@@ -67,4 +69,4 @@ jobs:
|
||||
uses: codecov/codecov-action@v5
|
||||
with:
|
||||
token: ${{ secrets.CODECOV_TOKEN }}
|
||||
slug: bec-project/bec_widgets
|
||||
slug: bec-project/bec_widgets
|
||||
+1
-3
@@ -177,6 +177,4 @@ cython_debug/
|
||||
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
|
||||
# and can be added to the global gitignore or merged into this file. For a more nuclear
|
||||
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
|
||||
#.idea/
|
||||
#
|
||||
tombi.toml
|
||||
#.idea/
|
||||
-1470
File diff suppressed because it is too large
Load Diff
@@ -192,7 +192,8 @@ Positioner boxes and tweak controls handle precise moves, homing, and calibratio
|
||||
|
||||
## Documentation
|
||||
|
||||
The documentation can be found [here](https://bec.readthedocs.io/).
|
||||
Documentation of BEC Widgets can be found [here](https://bec-widgets.readthedocs.io/en/latest/). The documentation of
|
||||
the BEC can be found [here](https://bec.readthedocs.io/en/latest/).
|
||||
|
||||
## License
|
||||
|
||||
|
||||
+18
-12
@@ -1,13 +1,19 @@
|
||||
import os
|
||||
import sys
|
||||
|
||||
import bec_widgets.widgets.containers.qt_ads as QtAds
|
||||
from bec_widgets.utils.bec_widget import BECWidget
|
||||
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
|
||||
|
||||
if sys.platform.startswith("linux"):
|
||||
qt_platform = os.environ.get("QT_QPA_PLATFORM", "")
|
||||
if qt_platform != "offscreen":
|
||||
os.environ["QT_QPA_PLATFORM"] = "xcb"
|
||||
|
||||
# Default QtAds configuration
|
||||
QtAds.CDockManager.setConfigFlag(QtAds.CDockManager.eConfigFlag.FocusHighlighting, True)
|
||||
QtAds.CDockManager.setConfigFlag(
|
||||
QtAds.CDockManager.eConfigFlag.RetainTabSizeWhenCloseButtonHidden, True
|
||||
)
|
||||
|
||||
__all__ = ["BECWidget", "SafeSlot", "SafeProperty"]
|
||||
|
||||
|
||||
def __getattr__(name):
|
||||
if name == "BECWidget":
|
||||
from bec_widgets.utils.bec_widget import BECWidget
|
||||
|
||||
return BECWidget
|
||||
if name in {"SafeSlot", "SafeProperty"}:
|
||||
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
|
||||
|
||||
return {"SafeSlot": SafeSlot, "SafeProperty": SafeProperty}[name]
|
||||
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
|
||||
|
||||
@@ -1,15 +0,0 @@
|
||||
import os
|
||||
import sys
|
||||
|
||||
import bec_widgets.widgets.containers.qt_ads as QtAds
|
||||
|
||||
if sys.platform.startswith("linux"):
|
||||
qt_platform = os.environ.get("QT_QPA_PLATFORM", "")
|
||||
if qt_platform != "offscreen":
|
||||
os.environ["QT_QPA_PLATFORM"] = "xcb"
|
||||
|
||||
# Default QtAds configuration
|
||||
QtAds.CDockManager.setConfigFlag(QtAds.CDockManager.eConfigFlag.FocusHighlighting, True)
|
||||
QtAds.CDockManager.setConfigFlag(
|
||||
QtAds.CDockManager.eConfigFlag.RetainTabSizeWhenCloseButtonHidden, True
|
||||
)
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Literal
|
||||
|
||||
from bec_lib import bec_logger
|
||||
|
||||
from bec_widgets.widgets.containers.auto_update.auto_updates import AutoUpdates
|
||||
@@ -11,32 +9,37 @@ logger = bec_logger.logger
|
||||
|
||||
|
||||
def dock_area(
|
||||
object_name: str | None = None, startup_profile: str | Literal["restore", "skip"] | None = None
|
||||
object_name: str | None = None, profile: str | None = None, start_empty: bool = False
|
||||
) -> BECDockArea:
|
||||
"""
|
||||
Create an advanced dock area using Qt Advanced Docking System.
|
||||
|
||||
Args:
|
||||
object_name(str): The name of the advanced dock area.
|
||||
startup_profile(str | Literal["restore", "skip"] | None): Startup mode for
|
||||
the workspace:
|
||||
- None: start empty
|
||||
- "restore": restore last used profile
|
||||
- "skip": do not initialize profile state
|
||||
- "<name>": load specific profile
|
||||
profile(str|None): Optional profile to load; if None the "general" profile is used.
|
||||
start_empty(bool): If True, start with an empty dock area when loading specified profile.
|
||||
|
||||
Returns:
|
||||
BECDockArea: The created advanced dock area.
|
||||
|
||||
Note:
|
||||
The "general" profile is mandatory and will always exist. If manually deleted,
|
||||
it will be automatically recreated.
|
||||
"""
|
||||
# Default to "general" profile when called from CLI without specifying a profile
|
||||
effective_profile = profile if profile is not None else "general"
|
||||
|
||||
widget = BECDockArea(
|
||||
object_name=object_name,
|
||||
restore_initial_profile=True,
|
||||
root_widget=True,
|
||||
profile_namespace="bec",
|
||||
startup_profile=startup_profile,
|
||||
init_profile=effective_profile,
|
||||
start_empty=start_empty,
|
||||
)
|
||||
logger.info(
|
||||
f"Created advanced dock area with profile: {effective_profile}, start_empty: {start_empty}"
|
||||
)
|
||||
logger.info(f"Created advanced dock area with startup_profile: {startup_profile}")
|
||||
return widget
|
||||
|
||||
|
||||
|
||||
@@ -20,13 +20,13 @@ from qtpy.QtWidgets import (
|
||||
)
|
||||
|
||||
import bec_widgets
|
||||
from bec_widgets.cli.rpc.rpc_register import RPCRegister
|
||||
from bec_widgets.utils.bec_plugin_helper import get_all_plugin_widgets
|
||||
from bec_widgets.utils.container_utils import WidgetContainerUtils
|
||||
from bec_widgets.utils.error_popups import SafeSlot
|
||||
from bec_widgets.utils.name_utils import pascal_to_snake
|
||||
from bec_widgets.utils.plugin_utils import get_plugin_auto_updates
|
||||
from bec_widgets.utils.round_frame import RoundedFrame
|
||||
from bec_widgets.utils.rpc_register import RPCRegister
|
||||
from bec_widgets.utils.screen_utils import apply_window_geometry, centered_geometry_for_app
|
||||
from bec_widgets.utils.toolbars.toolbar import ModularToolBar
|
||||
from bec_widgets.utils.ui_loader import UILoader
|
||||
@@ -43,7 +43,6 @@ if TYPE_CHECKING: # pragma: no cover
|
||||
|
||||
logger = bec_logger.logger
|
||||
MODULE_PATH = os.path.dirname(bec_widgets.__file__)
|
||||
START_EMPTY_PROFILE_OPTION = "Start Empty (No Profile)"
|
||||
|
||||
|
||||
class LaunchTile(RoundedFrame):
|
||||
@@ -147,7 +146,8 @@ class LaunchTile(RoundedFrame):
|
||||
|
||||
# Action button
|
||||
self.action_button = QPushButton("Open")
|
||||
self.action_button.setStyleSheet("""
|
||||
self.action_button.setStyleSheet(
|
||||
"""
|
||||
QPushButton {
|
||||
background-color: #007AFF;
|
||||
border: none;
|
||||
@@ -159,7 +159,8 @@ class LaunchTile(RoundedFrame):
|
||||
QPushButton:hover {
|
||||
background-color: #005BB5;
|
||||
}
|
||||
""")
|
||||
"""
|
||||
)
|
||||
self.layout.addWidget(self.action_button, alignment=Qt.AlignmentFlag.AlignCenter)
|
||||
|
||||
def _fit_label_to_width(self, label: QLabel, max_width: int, min_pt: int = 10):
|
||||
@@ -207,7 +208,6 @@ class LaunchWindow(BECMainWindow):
|
||||
|
||||
self.app = QApplication.instance()
|
||||
self.tiles: dict[str, LaunchTile] = {}
|
||||
self._logged_unparented_connections: set[str] = set()
|
||||
# Track the smallest main‑label font size chosen so far
|
||||
self._min_main_label_pt: int | None = None
|
||||
|
||||
@@ -354,7 +354,7 @@ class LaunchWindow(BECMainWindow):
|
||||
def _refresh_dock_area_profiles(self, preserve_selection: bool = True) -> None:
|
||||
"""
|
||||
Refresh the dock-area profile selector, optionally preserving the selection.
|
||||
Defaults to Start Empty when no valid selection can be preserved.
|
||||
Sets the combobox to the last used profile or "general" if no selection preserved.
|
||||
|
||||
Args:
|
||||
preserve_selection(bool): Whether to preserve the current selection or not.
|
||||
@@ -369,10 +369,9 @@ class LaunchWindow(BECMainWindow):
|
||||
)
|
||||
|
||||
profiles = list_profiles("bec")
|
||||
selector_items = [START_EMPTY_PROFILE_OPTION, *profiles]
|
||||
selector.blockSignals(True)
|
||||
selector.clear()
|
||||
for profile in selector_items:
|
||||
for profile in profiles:
|
||||
selector.addItem(profile)
|
||||
|
||||
if selected_text:
|
||||
@@ -381,31 +380,21 @@ class LaunchWindow(BECMainWindow):
|
||||
if idx >= 0:
|
||||
selector.setCurrentIndex(idx)
|
||||
else:
|
||||
# Selection no longer exists, fall back to default startup selection.
|
||||
# Selection no longer exists, fall back to last profile or "general"
|
||||
self._set_selector_to_default_profile(selector, profiles)
|
||||
else:
|
||||
# No selection to preserve, use default startup selection.
|
||||
# No selection to preserve, use last profile or "general"
|
||||
self._set_selector_to_default_profile(selector, profiles)
|
||||
selector.blockSignals(False)
|
||||
|
||||
def _set_selector_to_default_profile(self, selector: QComboBox, profiles: list[str]) -> None:
|
||||
"""
|
||||
Set the selector default.
|
||||
|
||||
Preference order:
|
||||
1) Start Empty option (if available)
|
||||
2) Last used profile
|
||||
3) First available profile
|
||||
Set the selector to the last used profile or "general" as fallback.
|
||||
|
||||
Args:
|
||||
selector(QComboBox): The combobox to set.
|
||||
profiles(list[str]): List of available profiles.
|
||||
"""
|
||||
start_empty_idx = selector.findText(START_EMPTY_PROFILE_OPTION, Qt.MatchFlag.MatchExactly)
|
||||
if start_empty_idx >= 0:
|
||||
selector.setCurrentIndex(start_empty_idx)
|
||||
return
|
||||
|
||||
# Try to get last used profile
|
||||
last_profile = get_last_profile(namespace="bec")
|
||||
if last_profile and last_profile in profiles:
|
||||
@@ -414,6 +403,13 @@ class LaunchWindow(BECMainWindow):
|
||||
selector.setCurrentIndex(idx)
|
||||
return
|
||||
|
||||
# Fall back to "general" profile
|
||||
if "general" in profiles:
|
||||
idx = selector.findText("general", Qt.MatchFlag.MatchExactly)
|
||||
if idx >= 0:
|
||||
selector.setCurrentIndex(idx)
|
||||
return
|
||||
|
||||
# If nothing else, select first item
|
||||
if selector.count() > 0:
|
||||
selector.setCurrentIndex(0)
|
||||
@@ -592,14 +588,11 @@ class LaunchWindow(BECMainWindow):
|
||||
"""
|
||||
tile = self.tiles.get("dock_area")
|
||||
if tile is None or tile.selector is None:
|
||||
startup_profile = None
|
||||
profile = None
|
||||
else:
|
||||
selection = tile.selector.currentText().strip()
|
||||
if selection == START_EMPTY_PROFILE_OPTION:
|
||||
startup_profile = None
|
||||
else:
|
||||
startup_profile = selection if selection else None
|
||||
return self.launch("dock_area", startup_profile=startup_profile)
|
||||
profile = selection if selection else None
|
||||
return self.launch("dock_area", profile=profile)
|
||||
|
||||
def _open_widget(self):
|
||||
"""
|
||||
@@ -656,83 +649,53 @@ class LaunchWindow(BECMainWindow):
|
||||
super().showEvent(event)
|
||||
self.setFixedSize(self.size())
|
||||
|
||||
def _has_external_window(self, connections: dict) -> bool:
|
||||
def _launcher_is_last_widget(self, connections: dict) -> bool:
|
||||
"""
|
||||
Check if any registered non-launcher connection owns a top-level Qt window.
|
||||
Check if the launcher is the last widget in the application.
|
||||
"""
|
||||
|
||||
# get all parents of connections
|
||||
for connection in connections.values():
|
||||
if self._connection_belongs_to_launcher(connection):
|
||||
continue
|
||||
if isinstance(connection, QWidget) and connection.isWindow():
|
||||
return True
|
||||
return False
|
||||
|
||||
def _log_unparented_connections(self, connections: dict) -> None:
|
||||
"""
|
||||
Log non-launcher RPC connections that remain without an active top-level window.
|
||||
"""
|
||||
for connection in connections.values():
|
||||
if self._connection_belongs_to_launcher(connection):
|
||||
continue
|
||||
if isinstance(connection, QWidget) and connection.isWindow():
|
||||
continue
|
||||
|
||||
connection_description = (
|
||||
f"type={type(connection).__name__} objectName={connection.objectName()!r} "
|
||||
f"gui_id={connection.gui_id!r}"
|
||||
)
|
||||
if connection_description in self._logged_unparented_connections:
|
||||
continue
|
||||
self._logged_unparented_connections.add(connection_description)
|
||||
logger.warning(
|
||||
"Registered non-launcher RPC connection has no active top-level window: "
|
||||
f"{connection_description}"
|
||||
)
|
||||
|
||||
def _connection_belongs_to_launcher(self, connection: QObject) -> bool:
|
||||
"""
|
||||
Check whether a registered connection is the launcher itself or part of its Qt hierarchy.
|
||||
"""
|
||||
if connection is self or connection.gui_id == self.gui_id:
|
||||
return True
|
||||
|
||||
parent = connection.parent()
|
||||
while parent is not None:
|
||||
if parent is self:
|
||||
return True
|
||||
parent = parent.parent()
|
||||
|
||||
return False
|
||||
try:
|
||||
parent = connection.parent()
|
||||
if parent is None and connection.objectName() != self.objectName():
|
||||
logger.info(
|
||||
f"Found non-launcher connection without parent: {connection.objectName()}"
|
||||
)
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting parent of connection: {e}")
|
||||
return False
|
||||
return True
|
||||
|
||||
def _turn_off_the_lights(self, connections: dict):
|
||||
"""
|
||||
If there is only one connection remaining, it is the launcher, so we show it.
|
||||
Once the launcher is closed as the last window, we quit the application.
|
||||
"""
|
||||
if self._has_external_window(connections):
|
||||
self.hide()
|
||||
if self._launcher_is_last_widget(connections):
|
||||
self.show()
|
||||
self.activateWindow()
|
||||
self.raise_()
|
||||
if self.app:
|
||||
self.app.setQuitOnLastWindowClosed(False) # type: ignore
|
||||
self.app.setQuitOnLastWindowClosed(True) # type: ignore
|
||||
return
|
||||
|
||||
self._log_unparented_connections(connections)
|
||||
self.show()
|
||||
self.activateWindow()
|
||||
self.raise_()
|
||||
self.hide()
|
||||
if self.app:
|
||||
self.app.setQuitOnLastWindowClosed(True) # type: ignore
|
||||
self.app.setQuitOnLastWindowClosed(False) # type: ignore
|
||||
|
||||
def closeEvent(self, event):
|
||||
"""
|
||||
Close the launcher window.
|
||||
"""
|
||||
connections = self.register.list_all_connections()
|
||||
if self._has_external_window(connections):
|
||||
event.ignore()
|
||||
self.hide()
|
||||
if self._launcher_is_last_widget(connections):
|
||||
event.accept()
|
||||
return
|
||||
|
||||
event.accept()
|
||||
event.ignore()
|
||||
self.hide()
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
|
||||
@@ -1,18 +1,13 @@
|
||||
from bec_qthemes import material_icon
|
||||
from qtpy.QtGui import QAction # type: ignore
|
||||
from qtpy.QtWidgets import QApplication, QHBoxLayout, QStackedWidget, QWidget
|
||||
|
||||
from bec_widgets.applications.navigation_centre.reveal_animator import ANIMATION_DURATION
|
||||
from bec_widgets.applications.navigation_centre.side_bar import SideBar
|
||||
from bec_widgets.applications.navigation_centre.side_bar_components import NavigationItem
|
||||
from bec_widgets.applications.views.admin_view.admin_view import AdminView
|
||||
from bec_widgets.applications.views.developer_view.developer_view import DeveloperView
|
||||
from bec_widgets.applications.views.device_manager_view.device_manager_view import DeviceManagerView
|
||||
from bec_widgets.applications.views.dock_area_view.dock_area_view import DockAreaView
|
||||
from bec_widgets.applications.views.view import ViewBase, WaveformViewInline, WaveformViewPopup
|
||||
from bec_widgets.utils.colors import apply_theme
|
||||
from bec_widgets.utils.guided_tour import GuidedTour
|
||||
from bec_widgets.utils.name_utils import sanitize_namespace
|
||||
from bec_widgets.utils.screen_utils import (
|
||||
apply_centered_size,
|
||||
available_screen_geometry,
|
||||
@@ -55,58 +50,54 @@ class BECMainApp(BECMainWindow):
|
||||
|
||||
self._add_views()
|
||||
|
||||
# Initialize guided tour
|
||||
self.guided_tour = GuidedTour(self)
|
||||
self._setup_guided_tour()
|
||||
|
||||
def _add_views(self):
|
||||
self.add_section("BEC Applications", "bec_apps")
|
||||
self.dock_area = DockAreaView(self)
|
||||
self.device_manager = DeviceManagerView(self)
|
||||
# self.developer_view = DeveloperView(self) #TODO temporary disable until the bugs with BECShell are resolved
|
||||
self.admin_view = AdminView(self)
|
||||
self.developer_view = DeveloperView(self)
|
||||
|
||||
self.add_view(icon="widgets", title="Dock Area", widget=self.dock_area, mini_text="Docks")
|
||||
self.add_view(
|
||||
icon="widgets",
|
||||
title="Dock Area",
|
||||
id="dock_area",
|
||||
widget=self.dock_area,
|
||||
mini_text="Docks",
|
||||
)
|
||||
self.add_view(
|
||||
icon="display_settings",
|
||||
title="Device Manager",
|
||||
id="device_manager",
|
||||
widget=self.device_manager,
|
||||
mini_text="DM",
|
||||
)
|
||||
# TODO temporary disable until the bugs with BECShell are resolved
|
||||
# self.add_view(
|
||||
# icon="code_blocks",
|
||||
# title="IDE",
|
||||
# widget=self.developer_view,
|
||||
# mini_text="IDE",
|
||||
# exclusive=True,
|
||||
# )
|
||||
self.add_view(
|
||||
icon="admin_panel_settings",
|
||||
title="Admin View",
|
||||
widget=self.admin_view,
|
||||
mini_text="Admin",
|
||||
from_top=False,
|
||||
icon="code_blocks",
|
||||
title="IDE",
|
||||
widget=self.developer_view,
|
||||
id="developer_view",
|
||||
exclusive=True,
|
||||
)
|
||||
|
||||
if self._show_examples:
|
||||
self.add_section("Examples", "examples")
|
||||
waveform_view_popup = WaveformViewPopup(
|
||||
parent=self, view_id="waveform_view_popup", title="Waveform Plot"
|
||||
parent=self, id="waveform_view_popup", title="Waveform Plot"
|
||||
)
|
||||
waveform_view_stack = WaveformViewInline(
|
||||
parent=self, view_id="waveform_view_stack", title="Waveform Plot"
|
||||
parent=self, id="waveform_view_stack", title="Waveform Plot"
|
||||
)
|
||||
|
||||
self.add_view(
|
||||
icon="show_chart",
|
||||
title="Waveform With Popup",
|
||||
id="waveform_popup",
|
||||
widget=waveform_view_popup,
|
||||
mini_text="Popup",
|
||||
)
|
||||
self.add_view(
|
||||
icon="show_chart",
|
||||
title="Waveform InLine Stack",
|
||||
id="waveform_stack",
|
||||
widget=waveform_view_stack,
|
||||
mini_text="Stack",
|
||||
)
|
||||
@@ -114,9 +105,6 @@ class BECMainApp(BECMainWindow):
|
||||
self.set_current("dock_area")
|
||||
self.sidebar.add_dark_mode_item()
|
||||
|
||||
# Add guided tour to Help menu
|
||||
self._add_guided_tour_to_menu()
|
||||
|
||||
# --- Public API ------------------------------------------------------
|
||||
def add_section(self, title: str, id: str, position: int | None = None):
|
||||
return self.sidebar.add_section(title, id, position)
|
||||
@@ -132,7 +120,7 @@ class BECMainApp(BECMainWindow):
|
||||
*,
|
||||
icon: str,
|
||||
title: str,
|
||||
view_id: str | None = None,
|
||||
id: str,
|
||||
widget: QWidget,
|
||||
mini_text: str | None = None,
|
||||
position: int | None = None,
|
||||
@@ -146,8 +134,7 @@ class BECMainApp(BECMainWindow):
|
||||
Args:
|
||||
icon(str): Icon name for the nav item.
|
||||
title(str): Title for the nav item.
|
||||
view_id(str, optional): Unique ID for the view/item. If omitted, uses mini_text;
|
||||
if mini_text is also omitted, uses title.
|
||||
id(str): Unique ID for the view/item.
|
||||
widget(QWidget): The widget to add to the stack.
|
||||
mini_text(str, optional): Short text for the nav item when sidebar is collapsed.
|
||||
position(int, optional): Position to insert the nav item.
|
||||
@@ -160,11 +147,10 @@ class BECMainApp(BECMainWindow):
|
||||
|
||||
|
||||
"""
|
||||
resolved_id = sanitize_namespace(view_id or mini_text or title)
|
||||
item = self.sidebar.add_item(
|
||||
icon=icon,
|
||||
title=title,
|
||||
id=resolved_id,
|
||||
id=id,
|
||||
mini_text=mini_text,
|
||||
position=position,
|
||||
from_top=from_top,
|
||||
@@ -174,15 +160,13 @@ class BECMainApp(BECMainWindow):
|
||||
# Wrap plain widgets into a ViewBase so enter/exit hooks are available
|
||||
if isinstance(widget, ViewBase):
|
||||
view_widget = widget
|
||||
view_widget.view_id = resolved_id
|
||||
view_widget.view_id = id
|
||||
view_widget.view_title = title
|
||||
else:
|
||||
view_widget = ViewBase(content=widget, parent=self, view_id=resolved_id, title=title)
|
||||
|
||||
view_widget.change_object_name(resolved_id)
|
||||
view_widget = ViewBase(content=widget, parent=self, id=id, title=title)
|
||||
|
||||
idx = self.stack.addWidget(view_widget)
|
||||
self._view_index[resolved_id] = idx
|
||||
self._view_index[id] = idx
|
||||
return item
|
||||
|
||||
def set_current(self, id: str) -> None:
|
||||
@@ -191,12 +175,6 @@ class BECMainApp(BECMainWindow):
|
||||
|
||||
# Internal: route sidebar selection to the stack
|
||||
def _on_view_selected(self, vid: str) -> None:
|
||||
# Special handling for views that can not be switched to (e.g. dark mode toggle)
|
||||
# Not registered as proper view with a stack index, so we ignore any logic below
|
||||
# as it will anyways not result in a stack switch.
|
||||
idx = self._view_index.get(vid)
|
||||
if idx is None or not (0 <= idx < self.stack.count()):
|
||||
return
|
||||
# Determine current view
|
||||
current_index = self.stack.currentIndex()
|
||||
current_view = (
|
||||
@@ -222,160 +200,6 @@ class BECMainApp(BECMainWindow):
|
||||
if hasattr(new_view, "on_enter"):
|
||||
new_view.on_enter()
|
||||
|
||||
def _setup_guided_tour(self):
|
||||
"""
|
||||
Setup the guided tour for the main application.
|
||||
Registers key UI components and delegates to views for their internal components.
|
||||
"""
|
||||
tour_steps = []
|
||||
|
||||
# --- General Layout Components ---
|
||||
|
||||
# Register the sidebar toggle button
|
||||
toggle_step = self.guided_tour.register_widget(
|
||||
widget=self.sidebar.toggle,
|
||||
title="Sidebar Toggle",
|
||||
text="Click this button to expand or collapse the sidebar. When expanded, you can see full navigation item titles and section names.",
|
||||
)
|
||||
tour_steps.append(toggle_step)
|
||||
|
||||
# Register the sidebar icons
|
||||
sidebar_dock_area = self.sidebar.components.get("dock_area")
|
||||
if sidebar_dock_area:
|
||||
dock_step = self.guided_tour.register_widget(
|
||||
widget=sidebar_dock_area,
|
||||
title="Dock Area View",
|
||||
text="Click here to access the Dock Area view, where you can manage and arrange your dockable panels.",
|
||||
)
|
||||
tour_steps.append(dock_step)
|
||||
|
||||
sidebar_device_manager = self.sidebar.components.get("device_manager")
|
||||
if sidebar_device_manager:
|
||||
device_manager_step = self.guided_tour.register_widget(
|
||||
widget=sidebar_device_manager,
|
||||
title="Device Manager View",
|
||||
text="Click here to open the Device Manager view, where you can view and manage device configs.",
|
||||
)
|
||||
tour_steps.append(device_manager_step)
|
||||
|
||||
sidebar_developer_view = self.sidebar.components.get("developer_view")
|
||||
if sidebar_developer_view:
|
||||
developer_view_step = self.guided_tour.register_widget(
|
||||
widget=sidebar_developer_view,
|
||||
title="Developer View",
|
||||
text="Click here to access the Developer view to write scripts and macros.",
|
||||
)
|
||||
tour_steps.append(developer_view_step)
|
||||
|
||||
# Register the dark mode toggle
|
||||
dark_mode_item = self.sidebar.components.get("dark_mode")
|
||||
if dark_mode_item:
|
||||
dark_mode_step = self.guided_tour.register_widget(
|
||||
widget=dark_mode_item,
|
||||
title="Theme Toggle",
|
||||
text="Switch between light and dark themes. The theme preference is saved and will be applied when you restart the application.",
|
||||
)
|
||||
tour_steps.append(dark_mode_step)
|
||||
|
||||
# Register the client info label
|
||||
if hasattr(self, "_client_info_hover"):
|
||||
client_info_step = self.guided_tour.register_widget(
|
||||
widget=self._client_info_hover,
|
||||
title="Client Status",
|
||||
text="Displays status messages and information from the BEC Server.",
|
||||
)
|
||||
tour_steps.append(client_info_step)
|
||||
|
||||
# Register the scan progress bar if available
|
||||
if hasattr(self, "_scan_progress_hover"):
|
||||
progress_step = self.guided_tour.register_widget(
|
||||
widget=self._scan_progress_hover,
|
||||
title="Scan Progress",
|
||||
text="Monitor the progress of ongoing scans. Hover over the progress bar to see detailed information including elapsed time and estimated completion.",
|
||||
)
|
||||
tour_steps.append(progress_step)
|
||||
|
||||
# Register the notification indicator in the status bar
|
||||
if hasattr(self, "notification_indicator"):
|
||||
notif_step = self.guided_tour.register_widget(
|
||||
widget=self.notification_indicator,
|
||||
title="Notification Center",
|
||||
text="View system notifications, errors, and status updates. Click to filter notifications by type or expand to see all details.",
|
||||
)
|
||||
tour_steps.append(notif_step)
|
||||
|
||||
# --- View-Specific Components ---
|
||||
|
||||
# Register all views that can extend the tour
|
||||
for view_id, view_index in self._view_index.items():
|
||||
view_widget = self.stack.widget(view_index)
|
||||
if not view_widget or not hasattr(view_widget, "register_tour_steps"):
|
||||
continue
|
||||
|
||||
# Get the view's tour steps
|
||||
view_tour = view_widget.register_tour_steps(self.guided_tour, self)
|
||||
if view_tour is None:
|
||||
if hasattr(view_widget.content, "register_tour_steps"):
|
||||
view_tour = view_widget.content.register_tour_steps(self.guided_tour, self)
|
||||
if view_tour is None:
|
||||
continue
|
||||
|
||||
# Get the corresponding sidebar navigation item
|
||||
nav_item = self.sidebar.components.get(view_id)
|
||||
if not nav_item:
|
||||
continue
|
||||
|
||||
# Use the view's title for the navigation button
|
||||
nav_step = self.guided_tour.register_widget(
|
||||
widget=nav_item,
|
||||
title=view_tour.view_title,
|
||||
text=f"Let's explore the features of the {view_tour.view_title}.",
|
||||
)
|
||||
tour_steps.append(nav_step)
|
||||
tour_steps.extend(view_tour.step_ids)
|
||||
|
||||
# Create the tour with all registered steps
|
||||
if tour_steps:
|
||||
self.guided_tour.create_tour(tour_steps)
|
||||
|
||||
def start_guided_tour(self):
|
||||
"""
|
||||
Public method to start the guided tour.
|
||||
This can be called programmatically or connected to a menu/button action.
|
||||
"""
|
||||
self.guided_tour.start_tour()
|
||||
|
||||
def _add_guided_tour_to_menu(self):
|
||||
"""
|
||||
Add a 'Guided Tour' action to the Help menu.
|
||||
"""
|
||||
|
||||
# Find the Help menu
|
||||
menu_bar = self.menuBar()
|
||||
help_menu = None
|
||||
for action in menu_bar.actions():
|
||||
if action.text() == "Help":
|
||||
help_menu = action.menu()
|
||||
break
|
||||
|
||||
if help_menu:
|
||||
# Add separator before the tour action
|
||||
help_menu.addSeparator()
|
||||
|
||||
# Create and add the guided tour action
|
||||
tour_action = QAction("Start Guided Tour", self)
|
||||
tour_action.setIcon(material_icon("help"))
|
||||
tour_action.triggered.connect(self.start_guided_tour)
|
||||
tour_action.setShortcut("F1") # Add keyboard shortcut
|
||||
help_menu.addAction(tour_action)
|
||||
|
||||
def cleanup(self):
|
||||
for view_id, idx in self._view_index.items():
|
||||
view = self.stack.widget(idx)
|
||||
view.close()
|
||||
view.deleteLater()
|
||||
super().cleanup()
|
||||
|
||||
|
||||
def main(): # pragma: no cover
|
||||
"""
|
||||
@@ -394,7 +218,6 @@ def main(): # pragma: no cover
|
||||
args, qt_args = parser.parse_known_args(sys.argv[1:])
|
||||
|
||||
app = QApplication([sys.argv[0], *qt_args])
|
||||
app.setApplicationName("BEC")
|
||||
apply_theme("dark")
|
||||
w = BECMainApp(show_examples=args.examples)
|
||||
|
||||
|
||||
@@ -127,10 +127,12 @@ class NavigationItem(QWidget):
|
||||
self._icon_size_expanded = QtCore.QSize(26, 26)
|
||||
self.icon_btn.setIconSize(self._icon_size_collapsed)
|
||||
# Remove QToolButton hover/pressed background/outline
|
||||
self.icon_btn.setStyleSheet("""
|
||||
self.icon_btn.setStyleSheet(
|
||||
"""
|
||||
QToolButton:hover { background: transparent; border: none; }
|
||||
QToolButton:pressed { background: transparent; border: none; }
|
||||
""")
|
||||
"""
|
||||
)
|
||||
|
||||
# Mini label below icon
|
||||
self.mini_lbl = QLabel(self._mini_text, self)
|
||||
|
||||
@@ -1,35 +0,0 @@
|
||||
"""Module for Admin View."""
|
||||
|
||||
from qtpy.QtWidgets import QWidget
|
||||
|
||||
from bec_widgets.applications.views.view import ViewBase
|
||||
from bec_widgets.utils.error_popups import SafeSlot
|
||||
from bec_widgets.widgets.services.bec_atlas_admin_view.bec_atlas_admin_view import BECAtlasAdminView
|
||||
|
||||
|
||||
class AdminView(ViewBase):
|
||||
"""
|
||||
A view for administrators to change the current active experiment, manage messaging
|
||||
services, and more tasks reserved for users with admin privileges.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
parent: QWidget | None = None,
|
||||
content: QWidget | None = None,
|
||||
*,
|
||||
view_id: str | None = None,
|
||||
title: str | None = None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(parent=parent, content=content, view_id=view_id, title=title)
|
||||
self.admin_widget = BECAtlasAdminView(parent=self)
|
||||
self.set_content(self.admin_widget)
|
||||
|
||||
@SafeSlot()
|
||||
def on_exit(self) -> None:
|
||||
"""Called before the view is hidden.
|
||||
|
||||
Default implementation does nothing. Override in subclasses.
|
||||
"""
|
||||
self.admin_widget.logout()
|
||||
@@ -1,7 +1,7 @@
|
||||
from qtpy.QtWidgets import QWidget
|
||||
|
||||
from bec_widgets.applications.views.developer_view.developer_widget import DeveloperWidget
|
||||
from bec_widgets.applications.views.view import ViewBase, ViewTourSteps
|
||||
from bec_widgets.applications.views.view import ViewBase
|
||||
|
||||
|
||||
class DeveloperView(ViewBase):
|
||||
@@ -14,89 +14,13 @@ class DeveloperView(ViewBase):
|
||||
parent: QWidget | None = None,
|
||||
content: QWidget | None = None,
|
||||
*,
|
||||
view_id: str | None = None,
|
||||
id: str | None = None,
|
||||
title: str | None = None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(parent=parent, content=content, view_id=view_id, title=title, **kwargs)
|
||||
super().__init__(parent=parent, content=content, id=id, title=title)
|
||||
self.developer_widget = DeveloperWidget(parent=self)
|
||||
self.set_content(self.developer_widget)
|
||||
|
||||
def register_tour_steps(self, guided_tour, main_app) -> ViewTourSteps | None:
|
||||
"""Register Developer View components with the guided tour.
|
||||
|
||||
Args:
|
||||
guided_tour: The GuidedTour instance to register with.
|
||||
main_app: The main application instance (for accessing set_current).
|
||||
|
||||
Returns:
|
||||
ViewTourSteps | None: Model containing view title and step IDs.
|
||||
"""
|
||||
step_ids = []
|
||||
dev_widget = self.developer_widget
|
||||
|
||||
# IDE Toolbar
|
||||
def get_ide_toolbar():
|
||||
main_app.set_current("developer_view")
|
||||
return (dev_widget.toolbar, None)
|
||||
|
||||
step_id = guided_tour.register_widget(
|
||||
widget=get_ide_toolbar,
|
||||
title="IDE Toolbar",
|
||||
text="Quick access to save files, execute scripts, and configure IDE settings. Use the toolbar to manage your code and execution.",
|
||||
)
|
||||
step_ids.append(step_id)
|
||||
|
||||
# IDE Explorer
|
||||
def get_ide_explorer():
|
||||
main_app.set_current("developer_view")
|
||||
return (dev_widget.explorer_dock.widget(), None)
|
||||
|
||||
step_id = guided_tour.register_widget(
|
||||
widget=get_ide_explorer,
|
||||
title="File Explorer",
|
||||
text="Browse and manage your macro files. Create new files, open existing ones, and organize your scripts.",
|
||||
)
|
||||
step_ids.append(step_id)
|
||||
|
||||
# IDE Editor
|
||||
def get_ide_editor():
|
||||
main_app.set_current("developer_view")
|
||||
return (dev_widget.monaco_dock.widget(), None)
|
||||
|
||||
step_id = guided_tour.register_widget(
|
||||
widget=get_ide_editor,
|
||||
title="Code Editor",
|
||||
text="Write and edit Python code with syntax highlighting, auto-completion, and signature help. Monaco editor provides a modern coding experience.",
|
||||
)
|
||||
step_ids.append(step_id)
|
||||
|
||||
# IDE Console
|
||||
def get_ide_console():
|
||||
main_app.set_current("developer_view")
|
||||
return (dev_widget.console_dock.widget(), None)
|
||||
|
||||
step_id = guided_tour.register_widget(
|
||||
widget=get_ide_console,
|
||||
title="BEC Shell Console",
|
||||
text="Interactive Python console with BEC integration. Execute commands, test code snippets, and interact with the BEC system in real-time.",
|
||||
)
|
||||
step_ids.append(step_id)
|
||||
|
||||
# IDE Plotting Area
|
||||
def get_ide_plotting():
|
||||
main_app.set_current("developer_view")
|
||||
return (dev_widget.plotting_ads, None)
|
||||
|
||||
step_id = guided_tour.register_widget(
|
||||
widget=get_ide_plotting,
|
||||
title="Plotting Area",
|
||||
text="View plots and visualizations generated by your scripts. Arrange multiple plots in a flexible layout.",
|
||||
)
|
||||
step_ids.append(step_id)
|
||||
|
||||
return ViewTourSteps(view_title="Developer View", step_ids=step_ids)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
@@ -126,11 +50,7 @@ if __name__ == "__main__":
|
||||
_app.resize(width, height)
|
||||
developer_view = DeveloperView()
|
||||
_app.add_view(
|
||||
icon="code_blocks",
|
||||
title="IDE",
|
||||
widget=developer_view,
|
||||
view_id="developer_view",
|
||||
exclusive=True,
|
||||
icon="code_blocks", title="IDE", widget=developer_view, id="developer_view", exclusive=True
|
||||
)
|
||||
_app.show()
|
||||
# developer_view.show()
|
||||
|
||||
@@ -16,9 +16,9 @@ from bec_widgets.utils.toolbars.toolbar import ModularToolBar
|
||||
from bec_widgets.widgets.containers.dock_area.basic_dock_area import DockAreaWidget
|
||||
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea
|
||||
from bec_widgets.widgets.containers.qt_ads import CDockWidget
|
||||
from bec_widgets.widgets.editors.bec_console.bec_console import BecConsole, BECShell
|
||||
from bec_widgets.widgets.editors.monaco.monaco_dock import MonacoDock
|
||||
from bec_widgets.widgets.editors.monaco.monaco_widget import MonacoWidget
|
||||
from bec_widgets.widgets.editors.web_console.web_console import BECShell, WebConsole
|
||||
from bec_widgets.widgets.utility.ide_explorer.ide_explorer import IDEExplorer
|
||||
|
||||
|
||||
@@ -94,11 +94,11 @@ class DeveloperWidget(DockAreaWidget):
|
||||
self.explorer = IDEExplorer(self)
|
||||
self.explorer.setObjectName("Explorer")
|
||||
|
||||
self.console = BECShell(self, rpc_exposed=False)
|
||||
self.console = BECShell(self)
|
||||
self.console.setObjectName("BEC Shell")
|
||||
self.terminal = BecConsole(self, rpc_exposed=False)
|
||||
self.terminal = WebConsole(self)
|
||||
self.terminal.setObjectName("Terminal")
|
||||
self.monaco = MonacoDock(self, rpc_exposed=False, rpc_passthrough_children=False)
|
||||
self.monaco = MonacoDock(self)
|
||||
self.monaco.setObjectName("MonacoEditor")
|
||||
self.monaco.save_enabled.connect(self._on_save_enabled_update)
|
||||
self.plotting_ads = BECDockArea(
|
||||
@@ -410,3 +410,23 @@ class DeveloperWidget(DockAreaWidget):
|
||||
"""Clean up resources used by the developer widget."""
|
||||
self.delete_all()
|
||||
return super().cleanup()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
|
||||
from bec_qthemes import apply_theme
|
||||
from qtpy.QtWidgets import QApplication
|
||||
|
||||
from bec_widgets.applications.main_app import BECMainApp
|
||||
|
||||
app = QApplication(sys.argv)
|
||||
apply_theme("dark")
|
||||
|
||||
_app = BECMainApp()
|
||||
_app.show()
|
||||
# developer_view.show()
|
||||
# developer_view.setWindowTitle("Developer View")
|
||||
# developer_view.resize(1920, 1080)
|
||||
# developer_view.set_stretch(horizontal=[1, 3, 2], vertical=[5, 5]) #can be set during runtime
|
||||
sys.exit(app.exec_())
|
||||
|
||||
+2
-2
@@ -31,7 +31,7 @@ logger = bec_logger.logger
|
||||
class DeviceManagerOphydValidationDialog(QtWidgets.QDialog):
|
||||
"""Popup dialog to test Ophyd device configurations interactively."""
|
||||
|
||||
def __init__(self, parent=None, config: dict | None = None): # type: ignore
|
||||
def __init__(self, parent=None, config: dict | None = None): # type:ignore
|
||||
super().__init__(parent)
|
||||
self.setWindowTitle("Device Manager Ophyd Test")
|
||||
self._config_status = ConfigStatus.UNKNOWN.value
|
||||
@@ -133,7 +133,7 @@ class DeviceFormDialog(QtWidgets.QDialog):
|
||||
# validated: config_status, connection_status
|
||||
accepted_data = QtCore.Signal(dict, int, int, str, str)
|
||||
|
||||
def __init__(self, parent=None, add_btn_text: str = "Add Device"): # type: ignore
|
||||
def __init__(self, parent=None, add_btn_text: str = "Add Device"): # type:ignore
|
||||
super().__init__(parent)
|
||||
# Track old device name if config is edited
|
||||
self._old_device_name: str = ""
|
||||
|
||||
+1
-1
@@ -4,7 +4,7 @@ from __future__ import annotations
|
||||
|
||||
from enum import IntEnum
|
||||
from functools import partial
|
||||
from typing import TYPE_CHECKING, Any, List, Tuple
|
||||
from typing import TYPE_CHECKING, List, Tuple
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_qthemes import apply_theme, material_icon
|
||||
|
||||
+11
-7
@@ -103,14 +103,16 @@ class CustomBusyWidget(QWidget):
|
||||
button_width = int(button_height * aspect_ratio)
|
||||
self.cancel_button.setFixedSize(button_width, button_height)
|
||||
color = get_accent_colors()
|
||||
self.cancel_button.setStyleSheet(f"""
|
||||
self.cancel_button.setStyleSheet(
|
||||
f"""
|
||||
QPushButton {{
|
||||
background-color: {color.emergency.name()};
|
||||
color: white;
|
||||
font-weight: 600;
|
||||
border-radius: 6px;
|
||||
}}
|
||||
""")
|
||||
"""
|
||||
)
|
||||
|
||||
# Layout
|
||||
content_layout = QVBoxLayout(self)
|
||||
@@ -126,10 +128,12 @@ class CustomBusyWidget(QWidget):
|
||||
bg_color = color._colors.get("BG", None)
|
||||
if bg_color is None: # Fallback if missing
|
||||
bg_color = QColor(50, 50, 50, 255)
|
||||
self.setStyleSheet(f"""
|
||||
self.setStyleSheet(
|
||||
f"""
|
||||
background-color: {bg_color.name()};
|
||||
border-radius: 12px;
|
||||
""")
|
||||
"""
|
||||
)
|
||||
|
||||
def _ui_scale(self) -> int:
|
||||
parent = self.parent()
|
||||
@@ -169,7 +173,7 @@ class DeviceManagerDisplayWidget(DockAreaWidget):
|
||||
self._upload_redis_dialog: UploadRedisDialog | None = None
|
||||
self._dialog_validation_connection: QMetaObject.Connection | None = None
|
||||
|
||||
# NOTE: We need here a separate config helper instance to avoid conflicts with
|
||||
# NOTE: We need here a seperate config helper instance to avoid conflicts with
|
||||
# other communications to REDIS as uploading a config through a CommunicationConfigAction
|
||||
# will block if we use the config_helper from self.client.config._config_helper
|
||||
self._config_helper = config_helper.ConfigHelper(self.client.connector)
|
||||
@@ -607,8 +611,8 @@ class DeviceManagerDisplayWidget(DockAreaWidget):
|
||||
self.device_table_view._is_config_in_sync_with_redis()
|
||||
)
|
||||
validation_results = self.device_table_view.get_validation_results()
|
||||
for config, config_status, connection_status in validation_results.values():
|
||||
if connection_status == ConnectionStatus.CONNECTED.value:
|
||||
for config, config_status, connnection_status in validation_results.values():
|
||||
if connnection_status == ConnectionStatus.CONNECTED.value:
|
||||
self.device_table_view.update_device_validation(
|
||||
config, config_status, ConnectionStatus.CAN_CONNECT, ""
|
||||
)
|
||||
|
||||
@@ -1,12 +1,11 @@
|
||||
"""Module for Device Manager View."""
|
||||
|
||||
from qtpy.QtCore import QRect
|
||||
from qtpy.QtWidgets import QWidget
|
||||
|
||||
from bec_widgets.applications.views.device_manager_view.device_manager_widget import (
|
||||
DeviceManagerWidget,
|
||||
)
|
||||
from bec_widgets.applications.views.view import ViewBase, ViewTourSteps
|
||||
from bec_widgets.applications.views.view import ViewBase
|
||||
from bec_widgets.utils.error_popups import SafeSlot
|
||||
|
||||
|
||||
@@ -20,21 +19,11 @@ class DeviceManagerView(ViewBase):
|
||||
parent: QWidget | None = None,
|
||||
content: QWidget | None = None,
|
||||
*,
|
||||
view_id: str | None = None,
|
||||
id: str | None = None,
|
||||
title: str | None = None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(
|
||||
parent=parent,
|
||||
content=content,
|
||||
view_id=view_id,
|
||||
title=title,
|
||||
rpc_passthrough_children=False,
|
||||
**kwargs,
|
||||
)
|
||||
self.device_manager_widget = DeviceManagerWidget(
|
||||
parent=self, rpc_exposed=False, rpc_passthrough_children=False
|
||||
)
|
||||
super().__init__(parent=parent, content=content, id=id, title=title)
|
||||
self.device_manager_widget = DeviceManagerWidget(parent=self)
|
||||
self.set_content(self.device_manager_widget)
|
||||
|
||||
@SafeSlot()
|
||||
@@ -45,110 +34,6 @@ class DeviceManagerView(ViewBase):
|
||||
"""
|
||||
self.device_manager_widget.on_enter()
|
||||
|
||||
def register_tour_steps(self, guided_tour, main_app) -> ViewTourSteps | None:
|
||||
"""Register Device Manager components with the guided tour.
|
||||
|
||||
Args:
|
||||
guided_tour: The GuidedTour instance to register with.
|
||||
main_app: The main application instance (for accessing set_current).
|
||||
|
||||
Returns:
|
||||
ViewTourSteps | None: Model containing view title and step IDs.
|
||||
"""
|
||||
step_ids = []
|
||||
dm_widget = self.device_manager_widget
|
||||
|
||||
# The device_manager_widget is not yet initialized, so we will register
|
||||
# tour steps for its uninitialized state.
|
||||
|
||||
# Register Load Current Config button
|
||||
def get_load_current():
|
||||
main_app.set_current("device_manager")
|
||||
if dm_widget._initialized is True:
|
||||
return (None, None)
|
||||
return (dm_widget.button_load_current_config, None)
|
||||
|
||||
step_id = guided_tour.register_widget(
|
||||
widget=get_load_current,
|
||||
title="Load Current Config",
|
||||
text="Load the current device configuration from the BEC server.",
|
||||
)
|
||||
step_ids.append(step_id)
|
||||
|
||||
# Register Load Config From File button
|
||||
def get_load_file():
|
||||
main_app.set_current("device_manager")
|
||||
if dm_widget._initialized is True:
|
||||
return (None, None)
|
||||
return (dm_widget.button_load_config_from_file, None)
|
||||
|
||||
step_id = guided_tour.register_widget(
|
||||
widget=get_load_file,
|
||||
title="Load Config From File",
|
||||
text="Load a device configuration from a YAML file on disk.",
|
||||
)
|
||||
step_ids.append(step_id)
|
||||
|
||||
## Register steps for the initialized state
|
||||
# Register main device table
|
||||
def get_device_table():
|
||||
main_app.set_current("device_manager")
|
||||
if dm_widget._initialized is False:
|
||||
return (None, None)
|
||||
return (dm_widget.device_manager_display.device_table_view, None)
|
||||
|
||||
step_id = guided_tour.register_widget(
|
||||
widget=get_device_table,
|
||||
title="Device Table",
|
||||
text="This table displays the config that is prepared to be uploaded to the BEC server. It allows users to review and modify device config settings, and also validate them before uploading to the BEC server.",
|
||||
)
|
||||
step_ids.append(step_id)
|
||||
|
||||
col_text_mapping = {
|
||||
0: "Shows if a device configuration is valid. Automatically validated when adding a new device.",
|
||||
1: "Shows if a device is connectable. Validated on demand.",
|
||||
2: "Device name, unique across all devices within a config.",
|
||||
3: "Device class used to initialize the device on the BEC server.",
|
||||
4: "Defines how BEC treats readings of the device during scans. The options are 'monitored', 'baseline', 'async', 'continuous' or 'on_demand'.",
|
||||
5: "Defines how BEC reacts if a device readback fails. Options are 'raise', 'retry', or 'buffer'.",
|
||||
6: "User-defined tags associated with the device.",
|
||||
7: "A brief description of the device.",
|
||||
8: "Device is enabled when the configuration is loaded.",
|
||||
9: "Device is set to read-only.",
|
||||
10: "This flag allows to configure if the 'trigger' method of the device is called during scans.",
|
||||
}
|
||||
|
||||
# We have at least one device registered
|
||||
def get_device_table_row(column: int):
|
||||
main_app.set_current("device_manager")
|
||||
if dm_widget._initialized is False:
|
||||
return (None, None)
|
||||
table = dm_widget.device_manager_display.device_table_view.table
|
||||
header = table.horizontalHeader()
|
||||
x = header.sectionViewportPosition(column)
|
||||
table.horizontalScrollBar().setValue(x)
|
||||
# Recompute after scrolling
|
||||
x = header.sectionViewportPosition(column)
|
||||
w = header.sectionSize(column)
|
||||
h = header.height()
|
||||
rect = QRect(x, 0, w, h)
|
||||
top_left = header.viewport().mapTo(main_app, rect.topLeft())
|
||||
|
||||
return (QRect(top_left, rect.size()), col_text_mapping.get(column, ""))
|
||||
|
||||
for col, text in col_text_mapping.items():
|
||||
step_id = guided_tour.register_widget(
|
||||
widget=lambda col=col: get_device_table_row(col),
|
||||
title=f"{dm_widget.device_manager_display.device_table_view.table.horizontalHeaderItem(col).text()}",
|
||||
text=text,
|
||||
)
|
||||
step_ids.append(step_id)
|
||||
|
||||
if not step_ids:
|
||||
return None
|
||||
|
||||
return ViewTourSteps(view_title="Device Manager", step_ids=step_ids)
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
import sys
|
||||
@@ -180,7 +65,7 @@ if __name__ == "__main__": # pragma: no cover
|
||||
_app.add_view(
|
||||
icon="display_settings",
|
||||
title="Device Manager",
|
||||
view_id="device_manager",
|
||||
id="device_manager",
|
||||
widget=device_manager_view.device_manager_widget,
|
||||
mini_text="DM",
|
||||
)
|
||||
|
||||
@@ -22,8 +22,8 @@ class DeviceManagerWidget(BECWidget, QtWidgets.QWidget):
|
||||
|
||||
RPC = False
|
||||
|
||||
def __init__(self, parent=None, client=None, **kwargs):
|
||||
super().__init__(parent=parent, client=client, **kwargs)
|
||||
def __init__(self, parent=None, client=None):
|
||||
super().__init__(parent=parent, client=client)
|
||||
self.stacked_layout = QtWidgets.QStackedLayout()
|
||||
self.stacked_layout.setContentsMargins(0, 0, 0, 0)
|
||||
self.stacked_layout.setSpacing(0)
|
||||
|
||||
@@ -9,23 +9,16 @@ class DockAreaView(ViewBase):
|
||||
Modular dock area view for arranging and managing multiple dockable widgets.
|
||||
"""
|
||||
|
||||
RPC_CONTENT_CLASS = BECDockArea
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
parent: QWidget | None = None,
|
||||
content: QWidget | None = None,
|
||||
*,
|
||||
view_id: str | None = None,
|
||||
id: str | None = None,
|
||||
title: str | None = None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(parent=parent, content=content, view_id=view_id, title=title, **kwargs)
|
||||
super().__init__(parent=parent, content=content, id=id, title=title)
|
||||
self.dock_area = BECDockArea(
|
||||
self,
|
||||
profile_namespace="bec",
|
||||
auto_profile_namespace=False,
|
||||
object_name="DockArea",
|
||||
rpc_exposed=False,
|
||||
self, profile_namespace="bec", auto_profile_namespace=False, object_name="DockArea"
|
||||
)
|
||||
self.set_content(self.dock_area)
|
||||
|
||||
@@ -1,8 +1,5 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import List
|
||||
|
||||
from pydantic import BaseModel
|
||||
from qtpy.QtCore import QEventLoop
|
||||
from qtpy.QtWidgets import (
|
||||
QDialog,
|
||||
@@ -17,26 +14,13 @@ from qtpy.QtWidgets import (
|
||||
QWidget,
|
||||
)
|
||||
|
||||
from bec_widgets import BECWidget
|
||||
from bec_widgets.utils.error_popups import SafeSlot
|
||||
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
|
||||
from bec_widgets.widgets.control.device_input.signal_combobox.signal_combobox import SignalComboBox
|
||||
from bec_widgets.widgets.plots.waveform.waveform import Waveform
|
||||
|
||||
|
||||
class ViewTourSteps(BaseModel):
|
||||
"""Model representing tour steps for a view.
|
||||
|
||||
Attributes:
|
||||
view_title: The human-readable title of the view.
|
||||
step_ids: List of registered step IDs in the order they should appear.
|
||||
"""
|
||||
|
||||
view_title: str
|
||||
step_ids: List[str]
|
||||
|
||||
|
||||
class ViewBase(BECWidget, QWidget):
|
||||
class ViewBase(QWidget):
|
||||
"""Wrapper for a content widget used inside the main app's stacked view.
|
||||
|
||||
Subclasses can implement `on_enter` and `on_exit` to run custom logic when the view becomes visible or is about to be hidden.
|
||||
@@ -44,28 +28,21 @@ class ViewBase(BECWidget, QWidget):
|
||||
Args:
|
||||
content (QWidget): The actual view widget to display.
|
||||
parent (QWidget | None): Parent widget.
|
||||
view_id (str | None): Optional view view_id, useful for debugging or introspection.
|
||||
id (str | None): Optional view id, useful for debugging or introspection.
|
||||
title (str | None): Optional human-readable title.
|
||||
"""
|
||||
|
||||
RPC = True
|
||||
PLUGIN = False
|
||||
USER_ACCESS = ["activate"]
|
||||
RPC_CONTENT_CLASS: type[QWidget] | None = None
|
||||
RPC_CONTENT_ATTR = "content"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
parent: QWidget | None = None,
|
||||
content: QWidget | None = None,
|
||||
*,
|
||||
view_id: str | None = None,
|
||||
id: str | None = None,
|
||||
title: str | None = None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(parent=parent, **kwargs)
|
||||
super().__init__(parent=parent)
|
||||
self.content: QWidget | None = None
|
||||
self.view_id = view_id
|
||||
self.view_id = id
|
||||
self.view_title = title
|
||||
|
||||
lay = QVBoxLayout(self)
|
||||
@@ -99,41 +76,6 @@ class ViewBase(BECWidget, QWidget):
|
||||
"""
|
||||
return True
|
||||
|
||||
@SafeSlot()
|
||||
def activate(self) -> None:
|
||||
"""Switch the parent application to this view."""
|
||||
if not self.view_id:
|
||||
raise ValueError("Cannot switch view without a view_id.")
|
||||
|
||||
parent = self.parent()
|
||||
while parent is not None:
|
||||
if hasattr(parent, "set_current"):
|
||||
parent.set_current(self.view_id)
|
||||
return
|
||||
parent = parent.parent()
|
||||
raise RuntimeError("Could not find a parent application with set_current().")
|
||||
|
||||
def cleanup(self):
|
||||
if self.content is not None:
|
||||
self.content.close()
|
||||
self.content.deleteLater()
|
||||
super().cleanup()
|
||||
|
||||
def register_tour_steps(self, guided_tour, main_app) -> ViewTourSteps | None:
|
||||
"""Register this view's components with the guided tour.
|
||||
|
||||
Args:
|
||||
guided_tour: The GuidedTour instance to register with.
|
||||
main_app: The main application instance (for accessing set_current).
|
||||
|
||||
Returns:
|
||||
ViewTourSteps | None: A model containing the view title and step IDs,
|
||||
or None if this view has no tour steps.
|
||||
|
||||
Override this method in subclasses to register view-specific components.
|
||||
"""
|
||||
return None
|
||||
|
||||
|
||||
####################################################################################################
|
||||
# Example views for demonstration/testing purposes
|
||||
@@ -160,17 +102,17 @@ class WaveformViewPopup(ViewBase): # pragma: no cover
|
||||
self.device_edit.insertItem(0, "")
|
||||
self.device_edit.setEditable(True)
|
||||
self.device_edit.setCurrentIndex(0)
|
||||
self.signal_edit = SignalComboBox(parent=self)
|
||||
self.signal_edit.include_config_signals = False
|
||||
self.signal_edit.insertItem(0, "")
|
||||
self.signal_edit.setEditable(True)
|
||||
self.device_edit.currentTextChanged.connect(self.signal_edit.set_device)
|
||||
self.device_edit.device_reset.connect(self.signal_edit.reset_selection)
|
||||
self.entry_edit = SignalComboBox(parent=self)
|
||||
self.entry_edit.include_config_signals = False
|
||||
self.entry_edit.insertItem(0, "")
|
||||
self.entry_edit.setEditable(True)
|
||||
self.device_edit.currentTextChanged.connect(self.entry_edit.set_device)
|
||||
self.device_edit.device_reset.connect(self.entry_edit.reset_selection)
|
||||
|
||||
form = QFormLayout()
|
||||
form.addRow(label)
|
||||
form.addRow("Device", self.device_edit)
|
||||
form.addRow("Signal", self.signal_edit)
|
||||
form.addRow("Signal", self.entry_edit)
|
||||
|
||||
buttons = QDialogButtonBox(QDialogButtonBox.Ok | QDialogButtonBox.Cancel, parent=dialog)
|
||||
buttons.accepted.connect(dialog.accept)
|
||||
@@ -182,7 +124,7 @@ class WaveformViewPopup(ViewBase): # pragma: no cover
|
||||
|
||||
if dialog.exec_() == QDialog.Accepted:
|
||||
self.waveform.plot(
|
||||
device_y=self.device_edit.currentText(), signal_y=self.signal_edit.currentText()
|
||||
y_name=self.device_edit.currentText(), y_entry=self.entry_edit.currentText()
|
||||
)
|
||||
|
||||
@SafeSlot()
|
||||
@@ -307,7 +249,7 @@ class WaveformViewInline(ViewBase): # pragma: no cover
|
||||
dev = self.device_edit.currentText()
|
||||
sig = self.entry_edit.currentText()
|
||||
if dev and sig:
|
||||
self.waveform.plot(device_y=dev, signal_y=sig)
|
||||
self.waveform.plot(y_name=dev, y_entry=sig)
|
||||
self.stack.setCurrentIndex(1)
|
||||
|
||||
def _show_waveform_without_changes(self):
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1 +0,0 @@
|
||||
from bec_widgets.cli.rpc import rpc_base
|
||||
|
||||
+198
-662
File diff suppressed because it is too large
Load Diff
+48
-245
@@ -5,15 +5,14 @@ from __future__ import annotations
|
||||
import json
|
||||
import os
|
||||
import select
|
||||
import signal
|
||||
import subprocess
|
||||
import threading
|
||||
import time
|
||||
from contextlib import contextmanager
|
||||
from threading import Lock
|
||||
from typing import TYPE_CHECKING, Callable, Literal, TypeAlias, cast
|
||||
from typing import TYPE_CHECKING, Literal, TypeAlias, cast
|
||||
|
||||
from bec_lib.endpoints import EndpointInfo, MessageEndpoints
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_lib.utils.import_utils import lazy_import, lazy_import_from
|
||||
from rich.console import Console
|
||||
@@ -34,12 +33,6 @@ else:
|
||||
logger = bec_logger.logger
|
||||
|
||||
IGNORE_WIDGETS = ["LaunchWindow"]
|
||||
PROCESS_TERMINATION_TIMEOUT = 10
|
||||
PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT = 2
|
||||
PROCESS_OUTPUT_SELECT_TIMEOUT = 0.2
|
||||
GRACEFUL_SERVER_SHUTDOWN_RPC_TIMEOUT = 3
|
||||
GRACEFUL_SERVER_SHUTDOWN_TIMEOUT = 5
|
||||
OUTPUT_READER_STOP_EVENT_ATTR = "_bec_output_reader_stop_event"
|
||||
|
||||
RegistryState: TypeAlias = dict[
|
||||
Literal["gui_id", "name", "widget_class", "config", "__rpc__", "container_proxy"],
|
||||
@@ -60,16 +53,14 @@ def _filter_output(output: str) -> str:
|
||||
return output
|
||||
|
||||
|
||||
def _get_output(process, logger, stop_event: threading.Event | None = None) -> None:
|
||||
def _get_output(process, logger) -> None:
|
||||
log_func = {process.stdout: logger.debug, process.stderr: logger.info}
|
||||
stream_buffer = {process.stdout: [], process.stderr: []}
|
||||
try:
|
||||
os.set_blocking(process.stdout.fileno(), False)
|
||||
os.set_blocking(process.stderr.fileno(), False)
|
||||
while process.poll() is None and not (stop_event and stop_event.is_set()):
|
||||
readylist, _, _ = select.select(
|
||||
[process.stdout, process.stderr], [], [], PROCESS_OUTPUT_SELECT_TIMEOUT
|
||||
)
|
||||
while process.poll() is None:
|
||||
readylist, _, _ = select.select([process.stdout, process.stderr], [], [], 1)
|
||||
for stream in (process.stdout, process.stderr):
|
||||
buf = stream_buffer[stream]
|
||||
if stream in readylist:
|
||||
@@ -84,95 +75,6 @@ def _get_output(process, logger, stop_event: threading.Event | None = None) -> N
|
||||
logger.error(f"Error reading process output: {str(e)}")
|
||||
|
||||
|
||||
def _process_group_snapshot(process) -> str:
|
||||
try:
|
||||
pgid = os.getpgid(process.pid)
|
||||
except ProcessLookupError:
|
||||
return "Process group snapshot unavailable: process already exited"
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["ps", "-o", "pid,ppid,pgid,stat,command", "-g", str(pgid)],
|
||||
check=False,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=2,
|
||||
)
|
||||
except Exception as exc:
|
||||
return f"Process group snapshot unavailable: {exc}"
|
||||
output = result.stdout.strip()
|
||||
if not output:
|
||||
return f"Process group snapshot empty for pgid={pgid}"
|
||||
return output
|
||||
|
||||
|
||||
def _terminate_plot_process(process, logger, timeout: float = PROCESS_TERMINATION_TIMEOUT) -> None:
|
||||
if process.poll() is not None:
|
||||
return
|
||||
|
||||
process_info = f"pid={process.pid} command={process.args}"
|
||||
try:
|
||||
pgid = os.getpgid(process.pid)
|
||||
process_info = f"pid={process.pid} pgid={pgid} command={process.args}"
|
||||
logger.info(f"Terminating GUI process group {process_info}")
|
||||
os.killpg(pgid, signal.SIGTERM)
|
||||
except ProcessLookupError:
|
||||
process.wait(timeout=timeout)
|
||||
return
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to terminate GUI process group; terminating process only.")
|
||||
logger.info(f"GUI process termination failure details: {exc}. pid={process.pid}")
|
||||
process.terminate()
|
||||
|
||||
try:
|
||||
process.wait(timeout=timeout)
|
||||
return
|
||||
except subprocess.TimeoutExpired:
|
||||
logger.warning(f"GUI process did not stop within {timeout}s; killing it.")
|
||||
logger.info(
|
||||
f"GUI process force-kill details: {process_info}\n"
|
||||
f"{_process_group_snapshot(process)}"
|
||||
)
|
||||
|
||||
try:
|
||||
os.killpg(os.getpgid(process.pid), signal.SIGKILL)
|
||||
except ProcessLookupError as e:
|
||||
logger.error(f"Failed to kill GUI process group: {e}")
|
||||
process.wait(timeout=timeout)
|
||||
return
|
||||
process.wait(timeout=timeout)
|
||||
|
||||
|
||||
def _wait_for_process_exit(process, timeout: float) -> bool:
|
||||
try:
|
||||
process.wait(timeout=timeout)
|
||||
except subprocess.TimeoutExpired:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def _join_process_output_thread(process, thread: threading.Thread | None, logger) -> None:
|
||||
if thread is None:
|
||||
return
|
||||
thread.join(timeout=PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT)
|
||||
if not thread.is_alive():
|
||||
return
|
||||
|
||||
if stop_event := getattr(thread, OUTPUT_READER_STOP_EVENT_ATTR, None):
|
||||
stop_event.set()
|
||||
|
||||
for stream in (process.stdout, process.stderr):
|
||||
if stream is None:
|
||||
continue
|
||||
try:
|
||||
stream.close()
|
||||
except OSError as e:
|
||||
logger.error(f"Failed to close stream {str(e)}")
|
||||
thread.join(timeout=PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT)
|
||||
if thread.is_alive():
|
||||
logger.warning("GUI process output reader thread did not stop after process shutdown.")
|
||||
logger.info(f"GUI process output reader thread details: pid={process.pid}")
|
||||
|
||||
|
||||
def _start_plot_process(
|
||||
gui_id: str,
|
||||
gui_class_id: str,
|
||||
@@ -224,14 +126,8 @@ def _start_plot_process(
|
||||
if logger is None:
|
||||
process_output_processing_thread = None
|
||||
else:
|
||||
process_output_stop_event = threading.Event()
|
||||
process_output_processing_thread = threading.Thread(
|
||||
target=_get_output, args=(process, logger, process_output_stop_event)
|
||||
)
|
||||
setattr(
|
||||
process_output_processing_thread,
|
||||
OUTPUT_READER_STOP_EVENT_ATTR,
|
||||
process_output_stop_event,
|
||||
target=_get_output, args=(process, logger)
|
||||
)
|
||||
process_output_processing_thread.start()
|
||||
return process, process_output_processing_thread
|
||||
@@ -326,7 +222,6 @@ class BECGuiClient(RPCBase):
|
||||
self._ipython_registry: dict[str, RPCReference] = {}
|
||||
self.available_widgets = AvailableWidgetsNamespace()
|
||||
register_serializer_extension()
|
||||
self._rpc_timeout = 60
|
||||
|
||||
####################
|
||||
#### Client API ####
|
||||
@@ -337,21 +232,6 @@ class BECGuiClient(RPCBase):
|
||||
"""The launcher object."""
|
||||
return RPCBase(gui_id=f"{self._gui_id}:launcher", parent=self, object_name="launcher")
|
||||
|
||||
def set_rpc_timeout(self, timeout: float):
|
||||
"""Set the timeout for RPC calls to the GUI server.
|
||||
|
||||
Args:
|
||||
timeout(float): The timeout in seconds.
|
||||
"""
|
||||
if not isinstance(timeout, (int, float)) or timeout < 0:
|
||||
raise ValueError("Timeout must be a non-negative number.")
|
||||
self._rpc_timeout = timeout
|
||||
|
||||
def _safe_register_stream(self, endpoint: EndpointInfo, cb: Callable, **kwargs):
|
||||
"""Check if already registered for registration in idempotent functions."""
|
||||
if not self._client.connector.any_stream_is_registered(endpoint, cb=cb):
|
||||
self._client.connector.register(endpoint, cb=cb, **kwargs)
|
||||
|
||||
def connect_to_gui_server(self, gui_id: str) -> None:
|
||||
"""Connect to a GUI server"""
|
||||
# Unregister the old callback
|
||||
@@ -367,9 +247,10 @@ class BECGuiClient(RPCBase):
|
||||
self._ipython_registry = {}
|
||||
|
||||
# Register the new callback
|
||||
self._safe_register_stream(
|
||||
self._client.connector.register(
|
||||
MessageEndpoints.gui_registry_state(self._gui_id),
|
||||
cb=self._handle_registry_update,
|
||||
parent=self,
|
||||
from_start=True,
|
||||
)
|
||||
|
||||
@@ -416,32 +297,14 @@ class BECGuiClient(RPCBase):
|
||||
return self._raise_all()
|
||||
return self._start(wait=wait)
|
||||
|
||||
def change_theme(self, theme: Literal["light", "dark"] | None = None) -> None:
|
||||
"""
|
||||
Apply a GUI theme or toggle between dark and light.
|
||||
|
||||
Args:
|
||||
theme(Literal["light", "dark"] | None): Theme to apply. If None, the current
|
||||
theme is fetched from the GUI and toggled.
|
||||
"""
|
||||
if not self._check_if_server_is_alive():
|
||||
self._start(wait=True)
|
||||
|
||||
with wait_for_server(self):
|
||||
if theme is None:
|
||||
current_theme = self.launcher._run_rpc("fetch_theme")
|
||||
next_theme = "light" if current_theme == "dark" else "dark"
|
||||
else:
|
||||
next_theme = theme
|
||||
self.launcher._run_rpc("change_theme", theme=next_theme)
|
||||
|
||||
def new(
|
||||
self,
|
||||
name: str | None = None,
|
||||
wait: bool = True,
|
||||
geometry: tuple[int, int, int, int] | None = None,
|
||||
launch_script: str = "dock_area",
|
||||
startup_profile: str | Literal["restore", "skip"] | None = None,
|
||||
profile: str | None = None,
|
||||
start_empty: bool = False,
|
||||
**kwargs,
|
||||
) -> client.AdvancedDockArea:
|
||||
"""Create a new top-level dock area.
|
||||
@@ -451,81 +314,48 @@ class BECGuiClient(RPCBase):
|
||||
wait(bool, optional): Whether to wait for the server to start. Defaults to True.
|
||||
geometry(tuple[int, int, int, int] | None): The geometry of the dock area (pos_x, pos_y, w, h).
|
||||
launch_script(str): The launch script to use. Defaults to "dock_area".
|
||||
startup_profile(str | Literal["restore", "skip"] | None): Startup mode for
|
||||
the dock area:
|
||||
- None: start in transient empty workspace
|
||||
- "restore": restore last-used profile
|
||||
- "skip": skip profile initialization
|
||||
- "<name>": load the named profile
|
||||
profile(str | None): The profile name to load. If None, loads the "general" profile.
|
||||
Use a profile name to load a specific saved profile.
|
||||
start_empty(bool): If True, start with an empty dock area when loading specified profile.
|
||||
**kwargs: Additional keyword arguments passed to the dock area.
|
||||
|
||||
Returns:
|
||||
client.AdvancedDockArea: The new dock area.
|
||||
|
||||
Examples:
|
||||
>>> gui.new() # Start with an empty unsaved workspace
|
||||
>>> gui.new(startup_profile="restore") # Restore last profile
|
||||
>>> gui.new(startup_profile="my_profile") # Load explicit profile
|
||||
"""
|
||||
if "profile" in kwargs or "start_empty" in kwargs:
|
||||
raise TypeError(
|
||||
"gui.new() no longer accepts 'profile' or 'start_empty'. Use 'startup_profile' instead."
|
||||
)
|
||||
Note:
|
||||
The "general" profile is mandatory and will always exist. If manually deleted,
|
||||
it will be automatically recreated.
|
||||
|
||||
Examples:
|
||||
>>> gui.new() # Start with the "general" profile
|
||||
>>> gui.new(profile="my_profile") # Load specific profile, if profile does not exist, the new profile is created empty with specified name
|
||||
>>> gui.new(start_empty=True) # Start with "general" profile but empty dock area
|
||||
>>> gui.new(profile="my_profile", start_empty=True) # Start with "my_profile" profile but empty dock area
|
||||
"""
|
||||
if not self._check_if_server_is_alive():
|
||||
self.show(wait=True)
|
||||
self.start(wait=True)
|
||||
if wait:
|
||||
with wait_for_server(self):
|
||||
return self._new_impl(
|
||||
name=name,
|
||||
geometry=geometry,
|
||||
widget = self.launcher._run_rpc(
|
||||
"launch",
|
||||
launch_script=launch_script,
|
||||
startup_profile=startup_profile,
|
||||
**kwargs,
|
||||
)
|
||||
return self._new_impl(
|
||||
name=name,
|
||||
geometry=geometry,
|
||||
launch_script=launch_script,
|
||||
startup_profile=startup_profile,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
def _new_impl(
|
||||
self,
|
||||
*,
|
||||
name: str | None,
|
||||
geometry: tuple[int, int, int, int] | None,
|
||||
launch_script: str,
|
||||
startup_profile: str | Literal["restore", "skip"] | None,
|
||||
**kwargs,
|
||||
):
|
||||
if launch_script == "dock_area":
|
||||
try:
|
||||
return self.launcher._run_rpc(
|
||||
"system.launch_dock_area",
|
||||
name=name,
|
||||
geometry=geometry,
|
||||
startup_profile=startup_profile,
|
||||
profile=profile,
|
||||
start_empty=start_empty,
|
||||
**kwargs,
|
||||
)
|
||||
except ValueError as exc:
|
||||
error = str(exc)
|
||||
if (
|
||||
"Unknown system RPC method: system.launch_dock_area" not in error
|
||||
and "has no attribute 'system.launch_dock_area'" not in error
|
||||
):
|
||||
raise
|
||||
logger.debug("Server does not support system.launch_dock_area; using launcher RPC")
|
||||
|
||||
return self.launcher._run_rpc(
|
||||
) # pylint: disable=protected-access
|
||||
return widget
|
||||
widget = self.launcher._run_rpc(
|
||||
"launch",
|
||||
launch_script=launch_script,
|
||||
name=name,
|
||||
geometry=geometry,
|
||||
startup_profile=startup_profile,
|
||||
profile=profile,
|
||||
start_empty=start_empty,
|
||||
**kwargs,
|
||||
) # pylint: disable=protected-access
|
||||
return widget
|
||||
|
||||
def delete(self, name: str) -> None:
|
||||
"""Delete a dock area and its parent window.
|
||||
@@ -569,13 +399,11 @@ class BECGuiClient(RPCBase):
|
||||
|
||||
if self._process:
|
||||
logger.success("Stopping GUI...")
|
||||
if not self._request_server_shutdown():
|
||||
_terminate_plot_process(self._process, logger)
|
||||
_join_process_output_thread(
|
||||
self._process, self._process_output_processing_thread, logger
|
||||
)
|
||||
self._process.terminate()
|
||||
if self._process_output_processing_thread:
|
||||
self._process_output_processing_thread.join()
|
||||
self._process.wait()
|
||||
self._process = None
|
||||
self._process_output_processing_thread = None
|
||||
|
||||
# Unregister the registry state
|
||||
self._client.connector.unregister(
|
||||
@@ -594,37 +422,6 @@ class BECGuiClient(RPCBase):
|
||||
#### Private methods ####
|
||||
#########################
|
||||
|
||||
def _request_server_shutdown(self) -> bool:
|
||||
if self._process is None or self._process.poll() is not None:
|
||||
return True
|
||||
process_details = f"pid={self._process.pid} command={self._process.args}"
|
||||
logger.info(f"Requesting graceful GUI shutdown {process_details}")
|
||||
try:
|
||||
self.launcher._run_rpc( # pylint: disable=protected-access
|
||||
"system.shutdown",
|
||||
wait_for_rpc_response=True,
|
||||
timeout=GRACEFUL_SERVER_SHUTDOWN_RPC_TIMEOUT,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"Could not confirm graceful GUI shutdown via RPC; "
|
||||
"falling back to process termination."
|
||||
)
|
||||
logger.info(f"Graceful GUI shutdown RPC failure details: {exc}. {process_details}")
|
||||
return False
|
||||
if _wait_for_process_exit(self._process, GRACEFUL_SERVER_SHUTDOWN_TIMEOUT):
|
||||
logger.info(f"GUI server exited after graceful shutdown {process_details}")
|
||||
return True
|
||||
logger.warning(
|
||||
"GUI server did not exit after graceful shutdown request; "
|
||||
"falling back to process termination."
|
||||
)
|
||||
logger.info(
|
||||
f"Graceful GUI shutdown timeout details: {process_details}\n"
|
||||
f"{_process_group_snapshot(self._process)}"
|
||||
)
|
||||
return False
|
||||
|
||||
def _check_if_server_is_alive(self):
|
||||
"""Checks if the process is alive"""
|
||||
if self._process is None:
|
||||
@@ -683,14 +480,20 @@ class BECGuiClient(RPCBase):
|
||||
|
||||
def _start(self, wait: bool = False) -> None:
|
||||
self._killed = False
|
||||
self._safe_register_stream(
|
||||
MessageEndpoints.gui_registry_state(self._gui_id), cb=self._handle_registry_update
|
||||
self._client.connector.register(
|
||||
MessageEndpoints.gui_registry_state(self._gui_id),
|
||||
cb=self._handle_registry_update,
|
||||
parent=self,
|
||||
)
|
||||
return self._start_server(wait=wait)
|
||||
|
||||
def _handle_registry_update(self, msg: dict[str, GUIRegistryStateMessage]) -> None:
|
||||
@staticmethod
|
||||
def _handle_registry_update(
|
||||
msg: dict[str, GUIRegistryStateMessage], parent: BECGuiClient
|
||||
) -> None:
|
||||
# This was causing a deadlock during shutdown, not sure why.
|
||||
# with self._lock:
|
||||
self = parent
|
||||
self._server_registry = cast(dict[str, RegistryState], msg["data"].state)
|
||||
self._update_dynamic_namespace(self._server_registry)
|
||||
|
||||
@@ -698,7 +501,7 @@ class BECGuiClient(RPCBase):
|
||||
if self.launcher and len(self._top_level) == 0:
|
||||
self.launcher._run_rpc("show") # pylint: disable=protected-access
|
||||
for window in self._top_level.values():
|
||||
window.raise_window()
|
||||
window.show()
|
||||
|
||||
def _show_all(self):
|
||||
with wait_for_server(self):
|
||||
@@ -717,7 +520,7 @@ class BECGuiClient(RPCBase):
|
||||
if self.launcher and len(self._top_level) == 0:
|
||||
self.launcher._run_rpc("raise") # pylint: disable=protected-access
|
||||
for window in self._top_level.values():
|
||||
window.raise_window()
|
||||
window._run_rpc("raise") # type: ignore[attr-defined]
|
||||
|
||||
def _raise_all(self):
|
||||
with wait_for_server(self):
|
||||
|
||||
@@ -1,166 +0,0 @@
|
||||
# This file was automatically generated by generate_cli.py
|
||||
# type: ignore
|
||||
from __future__ import annotations
|
||||
|
||||
# pylint: skip-file
|
||||
|
||||
designer_plugins = {
|
||||
"AbortButton": ("bec_widgets.widgets.control.buttons.button_abort.button_abort", "AbortButton"),
|
||||
"BECColorMapWidget": (
|
||||
"bec_widgets.widgets.utility.visual.colormap_widget.colormap_widget",
|
||||
"BECColorMapWidget",
|
||||
),
|
||||
"BECMainWindow": ("bec_widgets.widgets.containers.main_window.main_window", "BECMainWindow"),
|
||||
"BECProgressBar": (
|
||||
"bec_widgets.widgets.progress.bec_progressbar.bec_progressbar",
|
||||
"BECProgressBar",
|
||||
),
|
||||
"BECQueue": ("bec_widgets.widgets.services.bec_queue.bec_queue", "BECQueue"),
|
||||
"BECShell": ("bec_widgets.widgets.editors.bec_console.bec_console", "BECShell"),
|
||||
"BECSpinBox": ("bec_widgets.widgets.utility.spinbox.decimal_spinbox", "BECSpinBox"),
|
||||
"BECStatusBox": ("bec_widgets.widgets.services.bec_status_box.bec_status_box", "BECStatusBox"),
|
||||
"BeamlineStateManager": (
|
||||
"bec_widgets.widgets.services.beamline_states.beamline_state_manager",
|
||||
"BeamlineStateManager",
|
||||
),
|
||||
"BecConsole": ("bec_widgets.widgets.editors.bec_console.bec_console", "BecConsole"),
|
||||
"ColorButton": ("bec_widgets.widgets.utility.visual.color_button.color_button", "ColorButton"),
|
||||
"ColorButtonNative": (
|
||||
"bec_widgets.widgets.utility.visual.color_button_native.color_button_native",
|
||||
"ColorButtonNative",
|
||||
),
|
||||
"ColormapSelector": (
|
||||
"bec_widgets.widgets.utility.visual.colormap_selector.colormap_selector",
|
||||
"ColormapSelector",
|
||||
),
|
||||
"DapComboBox": ("bec_widgets.widgets.dap.dap_combo_box.dap_combo_box", "DapComboBox"),
|
||||
"DarkModeButton": (
|
||||
"bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button",
|
||||
"DarkModeButton",
|
||||
),
|
||||
"DeviceBrowser": (
|
||||
"bec_widgets.widgets.services.device_browser.device_browser",
|
||||
"DeviceBrowser",
|
||||
),
|
||||
"DeviceComboBox": (
|
||||
"bec_widgets.widgets.control.device_input.device_combobox.device_combobox",
|
||||
"DeviceComboBox",
|
||||
),
|
||||
"Heatmap": ("bec_widgets.widgets.plots.heatmap.heatmap", "Heatmap"),
|
||||
"IDEExplorer": ("bec_widgets.widgets.utility.ide_explorer.ide_explorer", "IDEExplorer"),
|
||||
"Image": ("bec_widgets.widgets.plots.image.image", "Image"),
|
||||
"LMFitDialog": ("bec_widgets.widgets.dap.lmfit_dialog.lmfit_dialog", "LMFitDialog"),
|
||||
"LogPanel": ("bec_widgets.widgets.utility.logpanel.logpanel", "LogPanel"),
|
||||
"Minesweeper": ("bec_widgets.widgets.games.minesweeper", "Minesweeper"),
|
||||
"MonacoWidget": ("bec_widgets.widgets.editors.monaco.monaco_widget", "MonacoWidget"),
|
||||
"MotorMap": ("bec_widgets.widgets.plots.motor_map.motor_map", "MotorMap"),
|
||||
"MultiWaveform": ("bec_widgets.widgets.plots.multi_waveform.multi_waveform", "MultiWaveform"),
|
||||
"PdfViewerWidget": ("bec_widgets.widgets.utility.pdf_viewer.pdf_viewer", "PdfViewerWidget"),
|
||||
"PositionIndicator": (
|
||||
"bec_widgets.widgets.control.device_control.position_indicator.position_indicator",
|
||||
"PositionIndicator",
|
||||
),
|
||||
"PositionerBox": (
|
||||
"bec_widgets.widgets.control.device_control.positioner_box.positioner_box.positioner_box",
|
||||
"PositionerBox",
|
||||
),
|
||||
"PositionerBox2D": (
|
||||
"bec_widgets.widgets.control.device_control.positioner_box.positioner_box_2d.positioner_box_2d",
|
||||
"PositionerBox2D",
|
||||
),
|
||||
"PositionerControlLine": (
|
||||
"bec_widgets.widgets.control.device_control.positioner_box.positioner_control_line.positioner_control_line",
|
||||
"PositionerControlLine",
|
||||
),
|
||||
"PositionerGroup": (
|
||||
"bec_widgets.widgets.control.device_control.positioner_group.positioner_group",
|
||||
"PositionerGroup",
|
||||
),
|
||||
"ResetButton": ("bec_widgets.widgets.control.buttons.button_reset.button_reset", "ResetButton"),
|
||||
"ResumeButton": (
|
||||
"bec_widgets.widgets.control.buttons.button_resume.button_resume",
|
||||
"ResumeButton",
|
||||
),
|
||||
"RingProgressBar": (
|
||||
"bec_widgets.widgets.progress.ring_progress_bar.ring_progress_bar",
|
||||
"RingProgressBar",
|
||||
),
|
||||
"SBBMonitor": ("bec_widgets.widgets.editors.sbb_monitor.sbb_monitor", "SBBMonitor"),
|
||||
"ScanControl": ("bec_widgets.widgets.control.scan_control.scan_control", "ScanControl"),
|
||||
"ScanMetadata": ("bec_widgets.widgets.editors.scan_metadata.scan_metadata", "ScanMetadata"),
|
||||
"ScanProgressBar": (
|
||||
"bec_widgets.widgets.progress.scan_progressbar.scan_progressbar",
|
||||
"ScanProgressBar",
|
||||
),
|
||||
"ScatterWaveform": (
|
||||
"bec_widgets.widgets.plots.scatter_waveform.scatter_waveform",
|
||||
"ScatterWaveform",
|
||||
),
|
||||
"SignalComboBox": (
|
||||
"bec_widgets.widgets.control.device_input.signal_combobox.signal_combobox",
|
||||
"SignalComboBox",
|
||||
),
|
||||
"SignalLabel": ("bec_widgets.widgets.utility.signal_label.signal_label", "SignalLabel"),
|
||||
"SpinnerWidget": ("bec_widgets.widgets.utility.spinner.spinner", "SpinnerWidget"),
|
||||
"StopButton": ("bec_widgets.widgets.control.buttons.stop_button.stop_button", "StopButton"),
|
||||
"TextBox": ("bec_widgets.widgets.editors.text_box.text_box", "TextBox"),
|
||||
"ToggleSwitch": ("bec_widgets.widgets.utility.toggle.toggle", "ToggleSwitch"),
|
||||
"Waveform": ("bec_widgets.widgets.plots.waveform.waveform", "Waveform"),
|
||||
"WebsiteWidget": ("bec_widgets.widgets.editors.website.website", "WebsiteWidget"),
|
||||
"WidgetFinderComboBox": (
|
||||
"bec_widgets.widgets.utility.widget_finder.widget_finder",
|
||||
"WidgetFinderComboBox",
|
||||
),
|
||||
}
|
||||
|
||||
widget_icons = {
|
||||
"AbortButton": "cancel",
|
||||
"BECColorMapWidget": "palette",
|
||||
"BECMainWindow": "widgets",
|
||||
"BECProgressBar": "page_control",
|
||||
"BECQueue": "edit_note",
|
||||
"BECShell": "hub",
|
||||
"BECSpinBox": "123",
|
||||
"BECStatusBox": "widgets",
|
||||
"BeamlineStateManager": "format_list_bulleted",
|
||||
"BecConsole": "terminal",
|
||||
"ColorButton": "colors",
|
||||
"ColorButtonNative": "colors",
|
||||
"ColormapSelector": "palette",
|
||||
"DapComboBox": "data_exploration",
|
||||
"DarkModeButton": "dark_mode",
|
||||
"DeviceBrowser": "lists",
|
||||
"DeviceComboBox": "list_alt",
|
||||
"Heatmap": "dataset",
|
||||
"IDEExplorer": "widgets",
|
||||
"Image": "image",
|
||||
"LMFitDialog": "monitoring",
|
||||
"LogPanel": "browse_activity",
|
||||
"Minesweeper": "videogame_asset",
|
||||
"MonacoWidget": "code",
|
||||
"MotorMap": "my_location",
|
||||
"MultiWaveform": "ssid_chart",
|
||||
"PdfViewerWidget": "picture_as_pdf",
|
||||
"PositionIndicator": "horizontal_distribute",
|
||||
"PositionerBox": "switch_right",
|
||||
"PositionerBox2D": "switch_right",
|
||||
"PositionerControlLine": "switch_left",
|
||||
"PositionerGroup": "grid_view",
|
||||
"ResetButton": "restart_alt",
|
||||
"ResumeButton": "resume",
|
||||
"RingProgressBar": "track_changes",
|
||||
"SBBMonitor": "train",
|
||||
"ScanControl": "tune",
|
||||
"ScanMetadata": "list_alt",
|
||||
"ScanProgressBar": "timelapse",
|
||||
"ScatterWaveform": "scatter_plot",
|
||||
"SignalComboBox": "list_alt",
|
||||
"SignalLabel": "scoreboard",
|
||||
"SpinnerWidget": "progress_activity",
|
||||
"StopButton": "dangerous",
|
||||
"TextBox": "chat",
|
||||
"ToggleSwitch": "toggle_on",
|
||||
"Waveform": "show_chart",
|
||||
"WebsiteWidget": "travel_explore",
|
||||
"WidgetFinderComboBox": "frame_inspect",
|
||||
}
|
||||
@@ -7,22 +7,31 @@ import inspect
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import get_overloads
|
||||
|
||||
import black
|
||||
import isort
|
||||
from bec_lib.logger import bec_logger
|
||||
from qtpy.QtCore import Property as QtProperty
|
||||
|
||||
from bec_widgets.utils.generate_designer_plugin import (
|
||||
DesignerPluginGenerator,
|
||||
DesignerPluginInfo,
|
||||
plugin_filenames,
|
||||
)
|
||||
from bec_widgets.utils.generate_designer_plugin import DesignerPluginGenerator, plugin_filenames
|
||||
from bec_widgets.utils.plugin_utils import BECClassContainer, get_custom_classes
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
if sys.version_info >= (3, 11):
|
||||
from typing import get_overloads
|
||||
else:
|
||||
print(
|
||||
"Python version is less than 3.11, using dummy function for get_overloads. "
|
||||
"If you want to use the real function 'typing.get_overloads()', please use Python 3.11 or later."
|
||||
)
|
||||
|
||||
def get_overloads(_obj):
|
||||
"""
|
||||
Dummy function for Python versions before 3.11.
|
||||
"""
|
||||
return []
|
||||
|
||||
|
||||
class ClientGenerator:
|
||||
def __init__(self, base=False):
|
||||
@@ -45,7 +54,7 @@ from __future__ import annotations
|
||||
from bec_lib.logger import bec_logger
|
||||
|
||||
from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call, rpc_timeout
|
||||
{"from bec_widgets.utils.bec_plugin_helper import get_plugin_client_module" if self._base else ""}
|
||||
{"from bec_widgets.utils.bec_plugin_helper import get_all_plugin_widgets, get_plugin_client_module" if self._base else ""}
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
@@ -85,7 +94,7 @@ logger = bec_logger.logger
|
||||
if self._base:
|
||||
self.content += """
|
||||
class _WidgetsEnumType(str, enum.Enum):
|
||||
\"\"\" Enum for the available widgets, to be generated programmatically \"\"\"
|
||||
\"\"\" Enum for the available widgets, to be generated programatically \"\"\"
|
||||
...
|
||||
"""
|
||||
|
||||
@@ -102,19 +111,27 @@ _Widgets = {
|
||||
self.content += """
|
||||
|
||||
try:
|
||||
_plugin_widgets = get_all_plugin_widgets().as_dict()
|
||||
plugin_client = get_plugin_client_module()
|
||||
Widgets = _WidgetsEnumType("Widgets", {name: name for name in _plugin_widgets} | _Widgets)
|
||||
|
||||
if (_overlap := _Widgets.keys() & _plugin_widgets.keys()) != set():
|
||||
for _widget in _overlap:
|
||||
logger.warning(f"Detected duplicate widget {_widget} in plugin repo file: {inspect.getfile(_plugin_widgets[_widget])} !")
|
||||
for plugin_name, plugin_class in inspect.getmembers(plugin_client, inspect.isclass):
|
||||
if issubclass(plugin_class, RPCBase) and plugin_class is not RPCBase:
|
||||
if plugin_name not in _Widgets:
|
||||
_Widgets[plugin_name] = plugin_name
|
||||
if plugin_name in globals():
|
||||
conflicting_file = (
|
||||
inspect.getfile(_plugin_widgets[plugin_name])
|
||||
if plugin_name in _plugin_widgets
|
||||
else f"{plugin_client}"
|
||||
)
|
||||
logger.warning(
|
||||
f"Plugin widget {plugin_name} in {plugin_class._IMPORT_MODULE} conflicts with a built-in class!"
|
||||
f"Plugin widget {plugin_name} from {conflicting_file} conflicts with a built-in class!"
|
||||
)
|
||||
continue
|
||||
else:
|
||||
if plugin_name not in _overlap:
|
||||
globals()[plugin_name] = plugin_class
|
||||
Widgets = _WidgetsEnumType("Widgets", _Widgets)
|
||||
except ImportError as e:
|
||||
logger.error(f"Failed loading plugins: \\n{reduce(add, traceback.format_exception(e))}")
|
||||
"""
|
||||
@@ -129,8 +146,12 @@ except ImportError as e:
|
||||
|
||||
class_name = cls.__name__
|
||||
|
||||
self.content += f"""
|
||||
class {class_name}(RPCBase):\n"""
|
||||
if class_name == "BECDockArea":
|
||||
self.content += f"""
|
||||
class {class_name}(RPCBase):"""
|
||||
else:
|
||||
self.content += f"""
|
||||
class {class_name}(RPCBase):"""
|
||||
|
||||
if cls.__doc__:
|
||||
# We only want the first line of the docstring
|
||||
@@ -141,11 +162,19 @@ class {class_name}(RPCBase):\n"""
|
||||
else:
|
||||
class_docs = cls.__doc__.split("\n")[1]
|
||||
self.content += f"""
|
||||
\"\"\"{class_docs}\"\"\"\n"""
|
||||
user_access_entries = self._get_user_access_entries(cls)
|
||||
self.content += f' _IMPORT_MODULE="{cls.__module__}"\n'
|
||||
for method_entry in user_access_entries:
|
||||
method, obj, is_property_setter = self._resolve_method_object(cls, method_entry)
|
||||
\"\"\"{class_docs}\"\"\"
|
||||
"""
|
||||
if not cls.USER_ACCESS:
|
||||
self.content += """...
|
||||
"""
|
||||
|
||||
for method in cls.USER_ACCESS:
|
||||
is_property_setter = False
|
||||
obj = getattr(cls, method, None)
|
||||
if obj is None:
|
||||
obj = getattr(cls, method.split(".setter")[0], None)
|
||||
is_property_setter = True
|
||||
method = method.split(".setter")[0]
|
||||
if obj is None:
|
||||
raise AttributeError(
|
||||
f"Method {method} not found in class {cls.__name__}. "
|
||||
@@ -187,34 +216,6 @@ class {class_name}(RPCBase):\n"""
|
||||
{doc}
|
||||
\"\"\""""
|
||||
|
||||
@staticmethod
|
||||
def _get_user_access_entries(cls) -> list[str]:
|
||||
entries = list(getattr(cls, "USER_ACCESS", []))
|
||||
content_cls = getattr(cls, "RPC_CONTENT_CLASS", None)
|
||||
if content_cls is not None:
|
||||
entries.extend(getattr(content_cls, "USER_ACCESS", []))
|
||||
return list(dict.fromkeys(entries))
|
||||
|
||||
@staticmethod
|
||||
def _resolve_method_object(cls, method_entry: str):
|
||||
method_name = method_entry
|
||||
is_property_setter = False
|
||||
|
||||
if method_entry.endswith(".setter"):
|
||||
is_property_setter = True
|
||||
method_name = method_entry.split(".setter")[0]
|
||||
|
||||
candidate_classes = [cls]
|
||||
content_cls = getattr(cls, "RPC_CONTENT_CLASS", None)
|
||||
if content_cls is not None:
|
||||
candidate_classes.append(content_cls)
|
||||
|
||||
for candidate_cls in candidate_classes:
|
||||
obj = getattr(candidate_cls, method_name, None)
|
||||
if obj is not None:
|
||||
return method_name, obj, is_property_setter
|
||||
return method_name, None, is_property_setter
|
||||
|
||||
def _rpc_call(self, timeout_info: dict[str, float | None]):
|
||||
"""
|
||||
Decorator to mark a method as an RPC call.
|
||||
@@ -254,58 +255,6 @@ class {class_name}(RPCBase):\n"""
|
||||
file.write(formatted_content)
|
||||
|
||||
|
||||
def write_designer_plugins(plugin_infos: list[DesignerPluginInfo], file_name: str):
|
||||
"""
|
||||
Write a registry of Qt widget classes with designer plugins.
|
||||
|
||||
Args:
|
||||
plugin_infos(list[DesignerPluginInfo]): The designer plugin metadata to write.
|
||||
file_name(str): The name of the file to write to.
|
||||
"""
|
||||
plugin_infos = sorted(plugin_infos, key=lambda info: info.plugin_name_pascal)
|
||||
content = """# This file was automatically generated by generate_cli.py
|
||||
# type: ignore
|
||||
from __future__ import annotations
|
||||
|
||||
# pylint: skip-file
|
||||
|
||||
designer_plugins = {
|
||||
"""
|
||||
for info in plugin_infos:
|
||||
widget_module = info.plugin_class.__module__
|
||||
widget_class = info.plugin_name_pascal
|
||||
content += f' "{info.plugin_name_pascal}": ("{widget_module}", "{widget_class}"),\n'
|
||||
|
||||
content += """
|
||||
}
|
||||
|
||||
widget_icons = {
|
||||
"""
|
||||
for info in plugin_infos:
|
||||
content += f' "{info.plugin_name_pascal}": "{info.icon_name}",\n'
|
||||
|
||||
content += """
|
||||
}
|
||||
"""
|
||||
|
||||
try:
|
||||
formatted_content = black.format_str(content, mode=black.Mode(line_length=100))
|
||||
except black.NothingChanged:
|
||||
formatted_content = content
|
||||
|
||||
config = isort.Config(
|
||||
profile="black",
|
||||
line_length=100,
|
||||
multi_line_output=3,
|
||||
include_trailing_comma=False,
|
||||
known_first_party=["bec_widgets"],
|
||||
)
|
||||
formatted_content = isort.code(formatted_content, config=config)
|
||||
|
||||
with open(file_name, "w", encoding="utf-8") as file:
|
||||
file.write(formatted_content)
|
||||
|
||||
|
||||
def main():
|
||||
"""
|
||||
Main entry point for the script, controlled by command line arguments.
|
||||
@@ -359,8 +308,6 @@ def main():
|
||||
else:
|
||||
non_overwrite_classes = []
|
||||
|
||||
designer_plugin_infos = []
|
||||
|
||||
for cls in rpc_classes.plugins:
|
||||
logger.info(f"Writing bec-designer plugin files for {cls.__name__}...")
|
||||
|
||||
@@ -368,30 +315,21 @@ def main():
|
||||
logger.error(
|
||||
f"Not writing plugin files for {cls.__name__} because a built-in widget with that name exists"
|
||||
)
|
||||
continue
|
||||
|
||||
plugin = DesignerPluginGenerator(cls)
|
||||
if not hasattr(plugin, "info") or plugin.excluded:
|
||||
if not hasattr(plugin, "info"):
|
||||
continue
|
||||
|
||||
def _exists(file: str):
|
||||
return os.path.exists(os.path.join(plugin.info.base_path, file))
|
||||
|
||||
if any(_exists(file) for file in plugin_filenames(plugin.info.plugin_name_snake)):
|
||||
if _exists(plugin.filenames.plugin):
|
||||
designer_plugin_infos.append(plugin.info)
|
||||
logger.debug(
|
||||
f"Skipping generation of extra plugin files for {plugin.info.plugin_name_snake} - at least one file out of 'plugin.py', 'pyproject', and 'register_{plugin.info.plugin_name_snake}.py' already exists."
|
||||
)
|
||||
continue
|
||||
|
||||
plugin.run()
|
||||
designer_plugin_infos.append(plugin.info)
|
||||
|
||||
# Write designer_plugins.py with plugin import metadata for all widgets with designer plugins.
|
||||
designer_plugins_path = module_dir / client_subdir / "designer_plugins.py"
|
||||
logger.info(f"Generating designer plugin registry at {designer_plugins_path}")
|
||||
write_designer_plugins(designer_plugin_infos, str(designer_plugins_path))
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
@@ -2,7 +2,6 @@ from __future__ import annotations
|
||||
|
||||
import inspect
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
from functools import wraps
|
||||
from typing import TYPE_CHECKING, Any, cast
|
||||
@@ -10,7 +9,6 @@ from typing import TYPE_CHECKING, Any, cast
|
||||
from bec_lib.client import BECClient
|
||||
from bec_lib.device import DeviceBaseWithConfig
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_lib.utils.import_utils import lazy_import, lazy_import_from
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
@@ -26,9 +24,6 @@ else:
|
||||
|
||||
# pylint: disable=protected-access
|
||||
|
||||
_DEFAULT_RPC_TIMEOUT = object()
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
def _name_arg(arg):
|
||||
if isinstance(arg, DeviceBaseWithConfig):
|
||||
@@ -159,7 +154,6 @@ class RPCReference:
|
||||
|
||||
|
||||
class RPCBase:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
gui_id: str | None = None,
|
||||
@@ -213,16 +207,12 @@ class RPCBase:
|
||||
# Use explicit call to ensure action name is 'raise' (not 'raise_')
|
||||
return self._run_rpc("raise")
|
||||
|
||||
def hide(self):
|
||||
"""Hide this widget (or its container)."""
|
||||
return self._run_rpc("hide")
|
||||
|
||||
def _run_rpc(
|
||||
self,
|
||||
method,
|
||||
*args,
|
||||
wait_for_rpc_response: bool = True,
|
||||
timeout: float | None | object = _DEFAULT_RPC_TIMEOUT,
|
||||
wait_for_rpc_response=True,
|
||||
timeout=5,
|
||||
gui_id: str | None = None,
|
||||
**kwargs,
|
||||
) -> Any:
|
||||
@@ -233,16 +223,13 @@ class RPCBase:
|
||||
method: The method to call.
|
||||
args: The arguments to pass to the method.
|
||||
wait_for_rpc_response: Whether to wait for the RPC response.
|
||||
timeout: The timeout for the RPC response. If omitted, the client's default RPC
|
||||
timeout is used. If explicitly set to None, wait indefinitely.
|
||||
timeout: The timeout for the RPC response.
|
||||
gui_id: The GUI ID to use for the RPC call. If None, the default GUI ID is used.
|
||||
kwargs: The keyword arguments to pass to the method.
|
||||
|
||||
Returns:
|
||||
The result of the RPC call.
|
||||
"""
|
||||
if timeout is _DEFAULT_RPC_TIMEOUT:
|
||||
timeout = self._root._rpc_timeout
|
||||
if method in ["show", "hide", "raise"] and gui_id is None:
|
||||
obj = self._root._server_registry.get(self._gui_id)
|
||||
if obj is None:
|
||||
@@ -261,42 +248,17 @@ class RPCBase:
|
||||
self._rpc_response = None
|
||||
self._msg_wait_event.clear()
|
||||
self._client.connector.register(
|
||||
MessageEndpoints.gui_instruction_response(request_id), cb=self._on_rpc_response
|
||||
MessageEndpoints.gui_instruction_response(request_id),
|
||||
cb=self._on_rpc_response,
|
||||
parent=self,
|
||||
)
|
||||
|
||||
target_gui_id = gui_id or self._gui_id
|
||||
sent_at = time.time()
|
||||
deadline = sent_at + timeout if timeout is not None else None
|
||||
rpc_msg.metadata.update(
|
||||
{
|
||||
"method": method,
|
||||
"receiver": receiver,
|
||||
"target_gui_id": target_gui_id,
|
||||
"object_name": self.object_name,
|
||||
"wait_for_response": wait_for_rpc_response,
|
||||
"timeout": timeout,
|
||||
"sent_at": sent_at,
|
||||
"deadline": deadline,
|
||||
}
|
||||
)
|
||||
logger.info(
|
||||
"Sending GUI RPC request "
|
||||
f"request_id={request_id} method={method} receiver={receiver} "
|
||||
f"target_gui_id={target_gui_id} object_name={self.object_name} "
|
||||
f"wait_for_response={wait_for_rpc_response} timeout={timeout}"
|
||||
)
|
||||
self._client.connector.set_and_publish(MessageEndpoints.gui_instructions(receiver), rpc_msg)
|
||||
|
||||
if wait_for_rpc_response:
|
||||
try:
|
||||
finished = self._msg_wait_event.wait(timeout)
|
||||
if not finished:
|
||||
logger.error(
|
||||
"GUI RPC response timeout "
|
||||
f"request_id={request_id} method={method} receiver={receiver} "
|
||||
f"target_gui_id={target_gui_id} object_name={self.object_name} "
|
||||
f"timeout={timeout}"
|
||||
)
|
||||
raise RPCResponseTimeoutError(request_id, timeout)
|
||||
finally:
|
||||
self._msg_wait_event.clear()
|
||||
@@ -308,23 +270,17 @@ class RPCBase:
|
||||
# the _on_rpc_response method
|
||||
assert isinstance(self._rpc_response, messages.RequestResponseMessage)
|
||||
|
||||
logger.info(
|
||||
"Received GUI RPC response "
|
||||
f"request_id={request_id} method={method} receiver={receiver} "
|
||||
f"target_gui_id={target_gui_id} object_name={self.object_name} "
|
||||
f"accepted={self._rpc_response.accepted}"
|
||||
)
|
||||
if not self._rpc_response.accepted:
|
||||
raise ValueError(self._rpc_response.message["error"])
|
||||
msg_result = self._rpc_response.message.get("result")
|
||||
self._rpc_response = None
|
||||
return self._create_widget_from_msg_result(msg_result)
|
||||
|
||||
def _on_rpc_response(self, msg_obj: MessageObject) -> None:
|
||||
@staticmethod
|
||||
def _on_rpc_response(msg_obj: MessageObject, parent: RPCBase) -> None:
|
||||
msg = cast(messages.RequestResponseMessage, msg_obj.value)
|
||||
logger.debug(f"GUI RPC response callback received: {msg}")
|
||||
self._rpc_response = msg
|
||||
self._msg_wait_event.set()
|
||||
parent._rpc_response = msg
|
||||
parent._msg_wait_event.set()
|
||||
|
||||
def _create_widget_from_msg_result(self, msg_result):
|
||||
if msg_result is None:
|
||||
@@ -336,11 +292,6 @@ class RPCBase:
|
||||
return {
|
||||
key: self._create_widget_from_msg_result(val) for key, val in msg_result.items()
|
||||
}
|
||||
rpc_enabled = msg_result.get("__rpc__", True)
|
||||
if rpc_enabled is False:
|
||||
return None
|
||||
|
||||
msg_result = dict(msg_result)
|
||||
cls = msg_result.pop("widget_class", None)
|
||||
msg_result.pop("__rpc__", None)
|
||||
|
||||
|
||||
@@ -0,0 +1,57 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from bec_widgets.cli.client_utils import IGNORE_WIDGETS
|
||||
from bec_widgets.utils.bec_plugin_helper import get_all_plugin_widgets
|
||||
from bec_widgets.utils.bec_widget import BECWidget
|
||||
from bec_widgets.utils.plugin_utils import get_custom_classes
|
||||
|
||||
|
||||
class RPCWidgetHandler:
|
||||
"""Handler class for creating widgets from RPC messages."""
|
||||
|
||||
def __init__(self):
|
||||
self._widget_classes = None
|
||||
|
||||
@property
|
||||
def widget_classes(self) -> dict[str, type[BECWidget]]:
|
||||
"""
|
||||
Get the available widget classes.
|
||||
|
||||
Returns:
|
||||
dict: The available widget classes.
|
||||
"""
|
||||
if self._widget_classes is None:
|
||||
self.update_available_widgets()
|
||||
return self._widget_classes # type: ignore
|
||||
|
||||
def update_available_widgets(self):
|
||||
"""
|
||||
Update the available widgets.
|
||||
|
||||
Returns:
|
||||
None
|
||||
"""
|
||||
self._widget_classes = (
|
||||
get_custom_classes("bec_widgets", packages=("widgets", "applications"))
|
||||
+ get_all_plugin_widgets()
|
||||
).as_dict(IGNORE_WIDGETS)
|
||||
|
||||
def create_widget(self, widget_type, **kwargs) -> BECWidget:
|
||||
"""
|
||||
Create a widget from an RPC message.
|
||||
|
||||
Args:
|
||||
widget_type(str): The type of the widget.
|
||||
name (str): The name of the widget.
|
||||
**kwargs: The keyword arguments for the widget.
|
||||
|
||||
Returns:
|
||||
widget(BECWidget): The created widget.
|
||||
"""
|
||||
widget_class = self.widget_classes.get(widget_type) # type: ignore
|
||||
if widget_class:
|
||||
return widget_class(**kwargs)
|
||||
raise ValueError(f"Unknown widget type: {widget_type}")
|
||||
|
||||
|
||||
widget_handler = RPCWidgetHandler()
|
||||
@@ -5,11 +5,9 @@ import json
|
||||
import os
|
||||
import signal
|
||||
import sys
|
||||
import traceback
|
||||
from contextlib import redirect_stderr, redirect_stdout
|
||||
|
||||
import darkdetect
|
||||
import shiboken6
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_lib.service_config import ServiceConfig
|
||||
from bec_qthemes import apply_theme
|
||||
@@ -20,8 +18,8 @@ from qtpy.QtWidgets import QApplication
|
||||
|
||||
import bec_widgets
|
||||
from bec_widgets.applications.launch_window import LaunchWindow
|
||||
from bec_widgets.cli.rpc.rpc_register import RPCRegister
|
||||
from bec_widgets.utils.bec_dispatcher import BECDispatcher
|
||||
from bec_widgets.utils.rpc_register import RPCRegister
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
@@ -64,7 +62,6 @@ class GUIServer:
|
||||
self.app: QApplication | None = None
|
||||
self.launcher_window: LaunchWindow | None = None
|
||||
self.dispatcher: BECDispatcher | None = None
|
||||
self._shutdown_started = False
|
||||
|
||||
def start(self):
|
||||
"""
|
||||
@@ -76,7 +73,6 @@ class GUIServer:
|
||||
bec_logger._stderr_log_level = bec_logger.LOGLEVEL.ERROR
|
||||
bec_logger._update_sinks()
|
||||
|
||||
bec_logger.disabled_modules = ["bec_lib.scan_items"]
|
||||
with redirect_stdout(SimpleFileLikeFromLogOutputFunc(logger.info)): # type: ignore
|
||||
with redirect_stderr(SimpleFileLikeFromLogOutputFunc(logger.error)): # type: ignore
|
||||
self._run()
|
||||
@@ -97,7 +93,6 @@ class GUIServer:
|
||||
"""
|
||||
Run the GUI server.
|
||||
"""
|
||||
logger.info("Starting GUIServer", repr(self))
|
||||
self.app = QApplication(sys.argv)
|
||||
if darkdetect.isDark():
|
||||
apply_theme("dark")
|
||||
@@ -106,11 +101,11 @@ class GUIServer:
|
||||
|
||||
self.app.setApplicationName("BEC")
|
||||
self.app.gui_id = self.gui_id # type: ignore
|
||||
self.app.gui_server = self # type: ignore # make server accessible from QApplication for getattr in widgets
|
||||
self.setup_bec_icon()
|
||||
|
||||
service_config = self._get_service_config()
|
||||
self.dispatcher = BECDispatcher(config=service_config, gui_id=self.gui_id)
|
||||
# self.dispatcher.start_cli_server(gui_id=self.gui_id)
|
||||
|
||||
if self.gui_class:
|
||||
self.launcher_window = LaunchWindow(
|
||||
@@ -123,10 +118,20 @@ class GUIServer:
|
||||
self.launcher_window.setAttribute(Qt.WA_ShowWithoutActivating) # type: ignore
|
||||
|
||||
self.app.aboutToQuit.connect(self.shutdown)
|
||||
self.app.setQuitOnLastWindowClosed(True)
|
||||
self.app.setQuitOnLastWindowClosed(False)
|
||||
|
||||
signal.signal(signal.SIGINT, self.request_shutdown)
|
||||
signal.signal(signal.SIGTERM, self.request_shutdown)
|
||||
def sigint_handler(*args):
|
||||
# display message, for people to let it terminate gracefully
|
||||
print("Caught SIGINT, exiting")
|
||||
# Widgets should be all closed.
|
||||
with RPCRegister.delayed_broadcast():
|
||||
for widget in QApplication.instance().topLevelWidgets(): # type: ignore
|
||||
widget.close()
|
||||
if self.app:
|
||||
self.app.quit()
|
||||
|
||||
signal.signal(signal.SIGINT, sigint_handler)
|
||||
signal.signal(signal.SIGTERM, sigint_handler)
|
||||
|
||||
sys.exit(self.app.exec())
|
||||
|
||||
@@ -143,67 +148,15 @@ class GUIServer:
|
||||
)
|
||||
self.app.setWindowIcon(icon)
|
||||
|
||||
def request_shutdown(self, signum=None, _frame=None):
|
||||
"""
|
||||
Request Qt application shutdown from an RPC call or OS signal.
|
||||
|
||||
Cleanup itself is handled by ``shutdown()``, which is connected to
|
||||
``QApplication.aboutToQuit``. Calling it directly here would run BEC/RPC
|
||||
teardown before Qt has processed the widget close events.
|
||||
"""
|
||||
signal_name = signal.Signals(signum).name if signum is not None else "shutdown"
|
||||
pid = os.getpid()
|
||||
if self.app is None:
|
||||
logger.info(f"Caught {signal_name}, shutting down GUI server pid={pid} without app")
|
||||
self.shutdown()
|
||||
return
|
||||
|
||||
widgets = [
|
||||
f"{widget.__class__.__name__}(objectName={widget.objectName()!r})"
|
||||
for widget in self.app.topLevelWidgets()
|
||||
]
|
||||
logger.info(
|
||||
f"Caught {signal_name}, requesting GUI server shutdown pid={pid} "
|
||||
f"top_level_widgets={widgets}"
|
||||
)
|
||||
with RPCRegister.delayed_broadcast():
|
||||
for widget in self.app.topLevelWidgets():
|
||||
widget.close()
|
||||
self.app.quit()
|
||||
|
||||
@staticmethod
|
||||
def _run_shutdown_step(step: str, callback):
|
||||
try:
|
||||
callback()
|
||||
except Exception as exc:
|
||||
logger.error(
|
||||
f"GUIServer shutdown step failed pid={os.getpid()} step={step}: {exc}\n"
|
||||
f"{traceback.format_exc()}"
|
||||
)
|
||||
|
||||
def shutdown(self):
|
||||
if self._shutdown_started:
|
||||
return
|
||||
self._shutdown_started = True
|
||||
logger.info(f"Shutdown GUIServer pid={os.getpid()} {repr(self)}")
|
||||
|
||||
def close_launcher_window():
|
||||
if self.launcher_window and shiboken6.isValid(self.launcher_window):
|
||||
self.launcher_window.close()
|
||||
self.launcher_window.deleteLater()
|
||||
|
||||
def stop_pylsp_server():
|
||||
if pylsp_server.is_running():
|
||||
pylsp_server.stop()
|
||||
|
||||
def stop_dispatcher():
|
||||
if self.dispatcher:
|
||||
self.dispatcher.stop_cli_server()
|
||||
self.dispatcher.disconnect_all()
|
||||
|
||||
self._run_shutdown_step("close_launcher_window", close_launcher_window)
|
||||
self._run_shutdown_step("stop_pylsp_server", stop_pylsp_server)
|
||||
self._run_shutdown_step("stop_dispatcher", stop_dispatcher)
|
||||
"""
|
||||
Shutdown the GUI server.
|
||||
"""
|
||||
if pylsp_server.is_running():
|
||||
pylsp_server.stop()
|
||||
if self.dispatcher:
|
||||
self.dispatcher.stop_cli_server()
|
||||
self.dispatcher.disconnect_all()
|
||||
|
||||
|
||||
def main():
|
||||
@@ -25,8 +25,8 @@ from qtpy.QtWidgets import (
|
||||
QWidget,
|
||||
)
|
||||
|
||||
from bec_widgets.cli.rpc.rpc_widget_handler import widget_handler
|
||||
from bec_widgets.utils.colors import apply_theme
|
||||
from bec_widgets.utils.rpc_widget_handler import widget_handler
|
||||
from bec_widgets.utils.widget_io import WidgetHierarchy as wh
|
||||
from bec_widgets.widgets.editors.jupyter_console.jupyter_console import BECJupyterConsole
|
||||
|
||||
@@ -206,6 +206,7 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
|
||||
|
||||
def _populate_registry_widgets(self):
|
||||
try:
|
||||
widget_handler.update_available_widgets()
|
||||
items = sorted(widget_handler.widget_classes.keys())
|
||||
except Exception as exc:
|
||||
print(f"Failed to load registered widgets: {exc}")
|
||||
@@ -334,13 +335,20 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
|
||||
|
||||
If kwargs does not contain `object_name`, it will default to the provided shortcut.
|
||||
"""
|
||||
# Ensure registry is loaded
|
||||
widget_handler.update_available_widgets()
|
||||
cls = widget_handler.widget_classes.get(widget_type)
|
||||
if cls is None:
|
||||
raise ValueError(f"Unknown registered widget type: {widget_type}")
|
||||
|
||||
if kwargs is None:
|
||||
kwargs = {"object_name": shortcut}
|
||||
else:
|
||||
kwargs = dict(kwargs)
|
||||
kwargs.setdefault("object_name", shortcut)
|
||||
|
||||
widget = widget_handler.create_widget(widget_type, **kwargs)
|
||||
# Instantiate and add
|
||||
widget = cls(**kwargs)
|
||||
if not isinstance(widget, QWidget):
|
||||
raise TypeError(
|
||||
f"Instantiated object for type '{widget_type}' is not a QWidget: {type(widget)}"
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
# pylint: skip-file
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from bec_lib.config_helper import ConfigHelper
|
||||
from bec_lib.device import Device as BECDevice
|
||||
from bec_lib.device import Positioner as BECPositioner
|
||||
from bec_lib.device import ReadoutPriority
|
||||
@@ -15,7 +14,7 @@ class FakeDevice(BECDevice):
|
||||
super().__init__(name=name)
|
||||
self._enabled = enabled
|
||||
self.signals = {self.name: {"value": 1.0}}
|
||||
self._description = {self.name: {"source": self.name, "dtype": "number", "shape": []}}
|
||||
self.description = {self.name: {"source": self.name, "dtype": "number", "shape": []}}
|
||||
self._readout_priority = readout_priority
|
||||
self._config = {
|
||||
"readoutPriority": "baseline",
|
||||
@@ -74,7 +73,7 @@ class FakeDevice(BECDevice):
|
||||
Returns:
|
||||
dict: Description of the device
|
||||
"""
|
||||
return self._description
|
||||
return self.description
|
||||
|
||||
|
||||
class FakePositioner(BECPositioner):
|
||||
@@ -96,7 +95,7 @@ class FakePositioner(BECPositioner):
|
||||
self._limits = limits
|
||||
self._readout_priority = readout_priority
|
||||
self.signals = {self.name: {"value": 1.0}}
|
||||
self._description = {self.name: {"source": self.name, "dtype": "number", "shape": []}}
|
||||
self.description = {self.name: {"source": self.name, "dtype": "number", "shape": []}}
|
||||
self._config = {
|
||||
"readoutPriority": "baseline",
|
||||
"deviceClass": "ophyd_devices.SimPositioner",
|
||||
@@ -176,7 +175,7 @@ class FakePositioner(BECPositioner):
|
||||
Returns:
|
||||
dict: Description of the device
|
||||
"""
|
||||
return self._description
|
||||
return self.description
|
||||
|
||||
@property
|
||||
def precision(self):
|
||||
@@ -220,9 +219,7 @@ class Device(FakeDevice):
|
||||
|
||||
|
||||
class DMMock:
|
||||
def __init__(self, *args, **kwargs):
|
||||
self._service = args[0]
|
||||
self.config_helper = ConfigHelper(self._service.connector, self._service._service_name)
|
||||
def __init__(self):
|
||||
self.devices = DeviceContainer()
|
||||
self.enabled_devices = [device for device in self.devices if device.enabled]
|
||||
|
||||
@@ -276,10 +273,6 @@ class DMMock:
|
||||
configs.append(device._config)
|
||||
return configs
|
||||
|
||||
def initialize(*_): ...
|
||||
|
||||
def shutdown(self): ...
|
||||
|
||||
|
||||
DEVICES = [
|
||||
FakePositioner("samx", limits=[-10, 10], read_value=2.0),
|
||||
|
||||
@@ -1 +1,13 @@
|
||||
from qtpy.QtWebEngineWidgets import QWebEngineView
|
||||
|
||||
from .bec_connector import BECConnector, ConnectionConfig
|
||||
from .bec_dispatcher import BECDispatcher
|
||||
from .bec_table import BECTable
|
||||
from .colors import Colors
|
||||
from .container_utils import WidgetContainerUtils
|
||||
from .crosshair import Crosshair
|
||||
from .entry_validator import EntryValidator
|
||||
from .layout_manager import GridLayoutManager
|
||||
from .rpc_decorator import register_rpc_methods, rpc_public
|
||||
from .ui_loader import UILoader
|
||||
from .validator_delegate import DoubleValidationDelegate
|
||||
|
||||
@@ -15,9 +15,9 @@ from pydantic import BaseModel, Field, field_validator
|
||||
from qtpy.QtCore import Property, QObject, QRunnable, QThreadPool, Signal
|
||||
from qtpy.QtWidgets import QApplication
|
||||
|
||||
from bec_widgets.cli.rpc.rpc_register import RPCRegister
|
||||
from bec_widgets.utils.error_popups import ErrorPopupUtility, SafeSlot
|
||||
from bec_widgets.utils.name_utils import sanitize_namespace
|
||||
from bec_widgets.utils.rpc_register import RPCRegister
|
||||
from bec_widgets.utils.widget_io import WidgetHierarchy
|
||||
from bec_widgets.utils.yaml_dialog import load_yaml, load_yaml_gui, save_yaml, save_yaml_gui
|
||||
|
||||
@@ -88,8 +88,6 @@ class BECConnector:
|
||||
gui_id: str | None = None,
|
||||
object_name: str | None = None,
|
||||
root_widget: bool = False,
|
||||
rpc_exposed: bool = True,
|
||||
rpc_passthrough_children: bool = True,
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
@@ -101,10 +99,6 @@ class BECConnector:
|
||||
gui_id(str, optional): The GUI ID.
|
||||
object_name(str, optional): The object name.
|
||||
root_widget(bool, optional): If set to True, the parent_id will be always set to None, thus enforcing that the widget is accessible as a root widget of the BECGuiClient object.
|
||||
rpc_exposed(bool, optional): If set to False, this instance is excluded from RPC registry broadcast and CLI namespace discovery.
|
||||
rpc_passthrough_children(bool, optional): Only relevant when ``rpc_exposed=False``.
|
||||
If True, RPC-visible children rebind to the next visible ancestor.
|
||||
If False (default), children stay hidden behind this widget.
|
||||
**kwargs:
|
||||
"""
|
||||
# Extract object_name from kwargs to not pass it to Qt class
|
||||
@@ -133,13 +127,8 @@ class BECConnector:
|
||||
# the function depends on BECClient, and BECDispatcher
|
||||
@SafeSlot()
|
||||
def terminate(client=self.client, dispatcher=self.bec_dispatcher):
|
||||
app = QApplication.instance()
|
||||
gui_server = getattr(app, "gui_server", None)
|
||||
if gui_server and hasattr(gui_server, "shutdown"):
|
||||
gui_server.shutdown()
|
||||
logger.info("Disconnecting", repr(dispatcher))
|
||||
dispatcher.disconnect_all()
|
||||
dispatcher.stop_cli_server()
|
||||
|
||||
try: # shutdown ophyd threads if any
|
||||
from ophyd._pyepics_shim import _dispatcher
|
||||
@@ -167,7 +156,7 @@ class BECConnector:
|
||||
)
|
||||
self.config = ConnectionConfig(widget_class=self.__class__.__name__)
|
||||
|
||||
# If the gui_id is passed, it should be respected. However, this should be revisited since
|
||||
# If the gui_id is passed, it should be respected. However, this should be revisted since
|
||||
# the gui_id has to be unique, and may no longer be.
|
||||
if gui_id:
|
||||
self.config.gui_id = gui_id
|
||||
@@ -195,11 +184,6 @@ class BECConnector:
|
||||
|
||||
# If set to True, the parent_id will be always set to None, thus enforcing that the widget is accessible as a root widget of the BECGuiClient object.
|
||||
self.root_widget = root_widget
|
||||
# If set to False, this instance is not exposed through RPC at all.
|
||||
self.rpc_exposed = bool(rpc_exposed)
|
||||
# If True on a hidden parent (rpc_exposed=False), children can bubble up to
|
||||
# the next visible RPC ancestor.
|
||||
self.rpc_passthrough_children = bool(rpc_passthrough_children)
|
||||
|
||||
self._update_object_name()
|
||||
|
||||
@@ -208,41 +192,11 @@ class BECConnector:
|
||||
try:
|
||||
if self.root_widget:
|
||||
return None
|
||||
connector_parent = self._get_rpc_parent_ancestor()
|
||||
connector_parent = WidgetHierarchy._get_becwidget_ancestor(self)
|
||||
return connector_parent.gui_id if connector_parent else None
|
||||
except:
|
||||
logger.error(f"Error getting parent_id for {self.__class__.__name__}")
|
||||
|
||||
def _get_rpc_parent_ancestor(self) -> BECConnector | None:
|
||||
"""
|
||||
Find the nearest ancestor that is RPC-addressable.
|
||||
|
||||
Rules:
|
||||
- If an ancestor has ``rpc_exposed=False``, it is an explicit visibility
|
||||
boundary unless ``rpc_passthrough_children=True``.
|
||||
- If an ancestor has ``RPC=False`` (but remains rpc_exposed), it is treated
|
||||
as structural and children continue to the next ancestor.
|
||||
- Lookup always happens through ``WidgetHierarchy.get_becwidget_ancestor``
|
||||
so plain ``QWidget`` nodes between connectors are ignored.
|
||||
"""
|
||||
current = self
|
||||
while True:
|
||||
parent = WidgetHierarchy.get_becwidget_ancestor(current)
|
||||
if parent is None:
|
||||
return None
|
||||
|
||||
if not getattr(parent, "rpc_exposed", True):
|
||||
if getattr(parent, "rpc_passthrough_children", False):
|
||||
current = parent
|
||||
continue
|
||||
return parent
|
||||
|
||||
if getattr(parent, "RPC", True):
|
||||
return parent
|
||||
|
||||
current = parent
|
||||
return None
|
||||
|
||||
def change_object_name(self, name: str) -> None:
|
||||
"""
|
||||
Change the object name of the widget. Unregister old name and register the new one.
|
||||
@@ -261,9 +215,8 @@ class BECConnector:
|
||||
"""
|
||||
# 1) Enforce unique objectName among siblings with the same BECConnector parent
|
||||
self._enforce_unique_sibling_name()
|
||||
# 2) Register the object for RPC unless instance-level exposure is disabled.
|
||||
if getattr(self, "rpc_exposed", True):
|
||||
self.rpc_register.add_rpc(self)
|
||||
# 2) Register the object for RPC
|
||||
self.rpc_register.add_rpc(self)
|
||||
try:
|
||||
self.name_established.emit(self.object_name)
|
||||
except RuntimeError as e:
|
||||
@@ -281,7 +234,7 @@ class BECConnector:
|
||||
if not shb.isValid(self):
|
||||
return
|
||||
|
||||
parent_bec = WidgetHierarchy.get_becwidget_ancestor(self)
|
||||
parent_bec = WidgetHierarchy._get_becwidget_ancestor(self)
|
||||
|
||||
if parent_bec:
|
||||
# We have a parent => only compare with siblings under that parent
|
||||
@@ -291,7 +244,7 @@ class BECConnector:
|
||||
# Use RPCRegister to avoid QApplication.allWidgets() during event processing.
|
||||
connections = self.rpc_register.list_all_connections().values()
|
||||
all_bec = [w for w in connections if isinstance(w, BECConnector) and shb.isValid(w)]
|
||||
siblings = [w for w in all_bec if WidgetHierarchy.get_becwidget_ancestor(w) is None]
|
||||
siblings = [w for w in all_bec if WidgetHierarchy._get_becwidget_ancestor(w) is None]
|
||||
|
||||
# Collect used names among siblings
|
||||
used_names = {sib.objectName() for sib in siblings if sib is not self}
|
||||
@@ -399,7 +352,7 @@ class BECConnector:
|
||||
"""
|
||||
self.config = config
|
||||
|
||||
# FIXME some thoughts are required to decide how this should work with rpc registry
|
||||
# FIXME some thoughts are required to decide how thhis should work with rpc registry
|
||||
def apply_config(self, config: dict, generate_new_id: bool = True) -> None:
|
||||
"""
|
||||
Apply the configuration to the widget.
|
||||
@@ -417,7 +370,7 @@ class BECConnector:
|
||||
else:
|
||||
self.gui_id = self.config.gui_id
|
||||
|
||||
# FIXME some thoughts are required to decide how this should work with rpc registry
|
||||
# FIXME some thoughts are required to decide how thhis should work with rpc registry
|
||||
def load_config(self, path: str | None = None, gui: bool = False):
|
||||
"""
|
||||
Load the configuration of the widget from YAML.
|
||||
|
||||
@@ -3,9 +3,8 @@ from __future__ import annotations
|
||||
import collections
|
||||
import random
|
||||
import string
|
||||
import time
|
||||
from collections.abc import Callable
|
||||
from typing import TYPE_CHECKING, Any, DefaultDict, Hashable, Union
|
||||
from typing import TYPE_CHECKING, DefaultDict, Hashable, Union
|
||||
|
||||
import louie
|
||||
import redis
|
||||
@@ -16,7 +15,6 @@ from bec_lib.service_config import ServiceConfig
|
||||
from qtpy.QtCore import QObject
|
||||
from qtpy.QtCore import Signal as pyqtSignal
|
||||
|
||||
from bec_widgets.utils.rpc_logging import elapsed_seconds, format_elapsed
|
||||
from bec_widgets.utils.serialization import register_serializer_extension
|
||||
|
||||
logger = bec_logger.logger
|
||||
@@ -27,39 +25,6 @@ if TYPE_CHECKING: # pragma: no cover
|
||||
from bec_widgets.utils.rpc_server import RPCServer
|
||||
|
||||
|
||||
def _log_rpc_dispatcher_receive(msg_content: Any, metadata: Any) -> None:
|
||||
if not isinstance(msg_content, dict) or not isinstance(metadata, dict):
|
||||
return
|
||||
request_id = metadata.get("request_id")
|
||||
method = msg_content.get("action")
|
||||
parameter = msg_content.get("parameter")
|
||||
if request_id is None or method is None or not isinstance(parameter, dict):
|
||||
return
|
||||
|
||||
dispatch_received_at = time.time()
|
||||
sent_at = metadata.get("sent_at")
|
||||
deadline = metadata.get("deadline")
|
||||
timeout = metadata.get("timeout")
|
||||
dispatch_latency = elapsed_seconds(sent_at, dispatch_received_at)
|
||||
stale_on_dispatch = deadline is not None and dispatch_received_at > deadline
|
||||
target_gui_id = parameter.get("gui_id") or metadata.get("target_gui_id")
|
||||
|
||||
logger.info(
|
||||
"GUI RPC dispatcher received request before Qt callback emit "
|
||||
f"request_id={request_id} method={method} receiver={metadata.get('receiver')} "
|
||||
f"target_gui_id={target_gui_id} object_name={metadata.get('object_name')} "
|
||||
f"timeout={timeout} dispatch_latency_s={format_elapsed(dispatch_latency)} "
|
||||
f"stale_on_dispatch={stale_on_dispatch}"
|
||||
)
|
||||
if stale_on_dispatch:
|
||||
logger.warning(
|
||||
"GUI RPC dispatcher received request after client timeout deadline "
|
||||
f"request_id={request_id} method={method} receiver={metadata.get('receiver')} "
|
||||
f"target_gui_id={target_gui_id} object_name={metadata.get('object_name')} "
|
||||
f"timeout={timeout} dispatch_latency_s={format_elapsed(dispatch_latency)}"
|
||||
)
|
||||
|
||||
|
||||
class QtThreadSafeCallback(QObject):
|
||||
"""QtThreadSafeCallback is a wrapper around a callback function to make it thread-safe for Qt."""
|
||||
|
||||
@@ -123,12 +88,10 @@ class QtRedisConnector(RedisConnector):
|
||||
|
||||
# we can notice kwargs are lost when passed to Qt slot
|
||||
metadata = msg.metadata
|
||||
_log_rpc_dispatcher_receive(msg.content, metadata)
|
||||
cb(msg.content, metadata)
|
||||
else:
|
||||
# from stream
|
||||
msg = msg["data"]
|
||||
_log_rpc_dispatcher_receive(msg.content, msg.metadata)
|
||||
cb(msg.content, msg.metadata)
|
||||
|
||||
|
||||
@@ -160,16 +123,17 @@ class BECDispatcher:
|
||||
self._registered_slots: DefaultDict[Hashable, QtThreadSafeCallback] = (
|
||||
collections.defaultdict()
|
||||
)
|
||||
self.client = client
|
||||
|
||||
if client is None:
|
||||
if config is not None and not isinstance(config, ServiceConfig):
|
||||
# config is supposed to be a path
|
||||
config = ServiceConfig(config)
|
||||
if self.client is None:
|
||||
if config is not None:
|
||||
if not isinstance(config, ServiceConfig):
|
||||
# config is supposed to be a path
|
||||
config = ServiceConfig(config)
|
||||
self.client = BECClient(
|
||||
config=config, connector_cls=QtRedisConnector, name="BECWidgets"
|
||||
)
|
||||
else:
|
||||
self.client = client
|
||||
if self.client.started:
|
||||
# have to reinitialize client to use proper connector
|
||||
logger.info("Shutting down BECClient to switch to QtRedisConnector")
|
||||
@@ -212,15 +176,12 @@ class BECDispatcher:
|
||||
cb_info (dict | None): A dictionary containing information about the callback. Defaults to None.
|
||||
"""
|
||||
qt_slot = QtThreadSafeCallback(cb=slot, cb_info=cb_info)
|
||||
if not self.client.connector.any_stream_is_registered(topics, qt_slot):
|
||||
if qt_slot not in self._registered_slots:
|
||||
self._registered_slots[qt_slot] = qt_slot
|
||||
qt_slot = self._registered_slots[qt_slot]
|
||||
self.client.connector.register(topics, cb=qt_slot, **kwargs)
|
||||
topics_str, _ = self.client.connector._convert_endpointinfo(topics)
|
||||
qt_slot.topics.update(set(topics_str))
|
||||
else:
|
||||
logger.warning(f"Attempted to create duplicate stream subscription for {topics=}")
|
||||
if qt_slot not in self._registered_slots:
|
||||
self._registered_slots[qt_slot] = qt_slot
|
||||
qt_slot = self._registered_slots[qt_slot]
|
||||
self.client.connector.register(topics, cb=qt_slot, **kwargs)
|
||||
topics_str, _ = self.client.connector._convert_endpointinfo(topics)
|
||||
qt_slot.topics.update(set(topics_str))
|
||||
|
||||
def disconnect_slot(
|
||||
self, slot: Callable, topics: EndpointInfo | str | list[EndpointInfo] | list[str]
|
||||
|
||||
@@ -20,13 +20,15 @@ class BECLogin(QWidget):
|
||||
|
||||
title = QLabel("Sign in", parent=self)
|
||||
title.setAlignment(Qt.AlignmentFlag.AlignCenter)
|
||||
title.setStyleSheet("""
|
||||
title.setStyleSheet(
|
||||
"""
|
||||
#QLabel
|
||||
{
|
||||
font-size: 18px;
|
||||
font-weight: 600;
|
||||
}
|
||||
""")
|
||||
"""
|
||||
)
|
||||
|
||||
self.username = QLineEdit(parent=self)
|
||||
self.username.setPlaceholderText("Username")
|
||||
@@ -55,11 +57,13 @@ class BECLogin(QWidget):
|
||||
|
||||
self.username.setFocus()
|
||||
|
||||
self.setStyleSheet("""
|
||||
self.setStyleSheet(
|
||||
"""
|
||||
QLineEdit {
|
||||
padding: 8px;
|
||||
}
|
||||
""")
|
||||
"""
|
||||
)
|
||||
|
||||
def _clear_password(self):
|
||||
"""Clear the password field."""
|
||||
|
||||
@@ -4,7 +4,6 @@ import importlib.metadata
|
||||
import inspect
|
||||
import pkgutil
|
||||
import traceback
|
||||
from functools import lru_cache
|
||||
from importlib import util as importlib_util
|
||||
from importlib.machinery import FileFinder, ModuleSpec, SourceFileLoader
|
||||
from types import ModuleType
|
||||
@@ -12,11 +11,7 @@ from typing import Generator
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
|
||||
from bec_widgets.utils.plugin_utils import (
|
||||
BECClassContainer,
|
||||
BECClassInfo,
|
||||
rpc_widget_registry_from_source,
|
||||
)
|
||||
from bec_widgets.utils.plugin_utils import BECClassContainer, BECClassInfo
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
@@ -58,14 +53,6 @@ def _submodule_by_name(module: ModuleType, name: str):
|
||||
return None
|
||||
|
||||
|
||||
def _submodule_spec_by_name(module: ModuleType, name: str) -> ModuleSpec | None:
|
||||
for module_info in pkgutil.iter_modules(module.__path__):
|
||||
if module_info.name != name or not isinstance(module_info.module_finder, FileFinder):
|
||||
continue
|
||||
return module_info.module_finder.find_spec(module_info.name)
|
||||
return None
|
||||
|
||||
|
||||
def _get_widgets_from_module(module: ModuleType) -> BECClassContainer:
|
||||
"""Find any BECWidget subclasses in the given module and return them with their info."""
|
||||
from bec_widgets.utils.bec_widget import BECWidget # avoid circular import
|
||||
@@ -103,64 +90,16 @@ def get_plugin_client_module() -> ModuleType | None:
|
||||
return _submodule_by_name(plugin, "client") if (plugin := user_widget_plugin()) else None
|
||||
|
||||
|
||||
def get_plugin_designer_module() -> ModuleType | None:
|
||||
"""If there is a plugin repository installed, return the designer module."""
|
||||
return (
|
||||
_submodule_by_name(plugin, "designer_plugins") if (plugin := user_widget_plugin()) else None
|
||||
)
|
||||
|
||||
|
||||
@lru_cache
|
||||
def get_plugin_rpc_widget_registry() -> dict[str, tuple[str, str]]:
|
||||
"""If there is a plugin repository installed, return the RPC widget registry."""
|
||||
plugin = user_widget_plugin()
|
||||
if plugin is None:
|
||||
return {}
|
||||
|
||||
client_spec = _submodule_spec_by_name(plugin, "client")
|
||||
if client_spec is not None and client_spec.origin:
|
||||
try:
|
||||
return rpc_widget_registry_from_source(client_spec.origin)
|
||||
except (OSError, SyntaxError) as exc:
|
||||
logger.warning(f"Could not parse plugin RPC widget registry: {exc}")
|
||||
|
||||
client_module = get_plugin_client_module()
|
||||
if client_module is None:
|
||||
return {}
|
||||
registry = {}
|
||||
for plugin_name, plugin_class in inspect.getmembers(client_module, inspect.isclass):
|
||||
if hasattr(plugin_class, "_IMPORT_MODULE"):
|
||||
registry[plugin_name] = (plugin_class._IMPORT_MODULE, plugin_class.__name__)
|
||||
return registry
|
||||
|
||||
|
||||
@lru_cache
|
||||
def get_plugin_designer_registry() -> dict[str, tuple[str, str]]:
|
||||
"""If there is a plugin repository installed, return the designer plugin registry."""
|
||||
designer_module = get_plugin_designer_module()
|
||||
if designer_module and hasattr(designer_module, "designer_plugins"):
|
||||
return designer_module.designer_plugins
|
||||
return {}
|
||||
|
||||
|
||||
@lru_cache
|
||||
def get_plugin_widget_icons() -> dict[str, str]:
|
||||
"""If there is a plugin repository installed, return the designer widget icon registry."""
|
||||
designer_module = get_plugin_designer_module()
|
||||
if designer_module and hasattr(designer_module, "widget_icons"):
|
||||
return designer_module.widget_icons
|
||||
return {}
|
||||
|
||||
|
||||
def get_all_plugin_widgets() -> BECClassContainer:
|
||||
"""If there is a plugin repository installed, load all widgets from it."""
|
||||
if plugin := user_widget_plugin():
|
||||
return _all_widgets_from_all_submods(plugin)
|
||||
return BECClassContainer()
|
||||
else:
|
||||
return BECClassContainer()
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
widgets = get_plugin_rpc_widget_registry()
|
||||
|
||||
client = get_plugin_client_module()
|
||||
print(get_all_plugin_widgets())
|
||||
...
|
||||
|
||||
@@ -10,16 +10,17 @@ from qtpy.QtGui import QFont, QPixmap
|
||||
from qtpy.QtWidgets import QApplication, QFileDialog, QLabel, QVBoxLayout, QWidget
|
||||
|
||||
import bec_widgets.widgets.containers.qt_ads as QtAds
|
||||
from bec_widgets.cli.rpc.rpc_register import RPCRegister
|
||||
from bec_widgets.utils.bec_connector import BECConnector, ConnectionConfig
|
||||
from bec_widgets.utils.busy_loader import install_busy_loader
|
||||
from bec_widgets.utils.error_popups import SafeConnect, SafeSlot
|
||||
from bec_widgets.utils.rpc_decorator import rpc_timeout
|
||||
from bec_widgets.utils.rpc_register import RPCRegister
|
||||
from bec_widgets.utils.widget_io import WidgetHierarchy
|
||||
from bec_widgets.widgets.utility.spinner.spinner import SpinnerWidget
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
from bec_widgets.utils.busy_loader import BusyLoaderOverlay
|
||||
from bec_widgets.widgets.containers.dock import BECDock
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
@@ -330,34 +331,32 @@ class BECWidget(BECConnector):
|
||||
# All widgets need to call super().cleanup() in their cleanup method
|
||||
logger.info(f"Registry cleanup for widget {self.__class__.__name__}")
|
||||
self.rpc_register.remove_rpc(self)
|
||||
children = self.findChildren(BECWidget)
|
||||
for child in children:
|
||||
if not shiboken6.isValid(child):
|
||||
# If the child is not valid, it means it has already been deleted
|
||||
continue
|
||||
child.close()
|
||||
child.deleteLater()
|
||||
children = self.findChildren(BECWidget)
|
||||
for child in children:
|
||||
if not shiboken6.isValid(child):
|
||||
# If the child is not valid, it means it has already been deleted
|
||||
continue
|
||||
child.close()
|
||||
child.deleteLater()
|
||||
|
||||
# Tear down busy overlay explicitly to stop spinner and remove filters
|
||||
overlay = getattr(self, "_busy_overlay", None)
|
||||
if overlay is not None and shiboken6.isValid(overlay):
|
||||
try:
|
||||
overlay.hide()
|
||||
filt = getattr(overlay, "_filter", None)
|
||||
if filt is not None and shiboken6.isValid(filt):
|
||||
try:
|
||||
self.removeEventFilter(filt)
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
f"Failed to remove event filter from busy overlay: {exc}"
|
||||
)
|
||||
# Tear down busy overlay explicitly to stop spinner and remove filters
|
||||
overlay = getattr(self, "_busy_overlay", None)
|
||||
if overlay is not None and shiboken6.isValid(overlay):
|
||||
try:
|
||||
overlay.hide()
|
||||
filt = getattr(overlay, "_filter", None)
|
||||
if filt is not None and shiboken6.isValid(filt):
|
||||
try:
|
||||
self.removeEventFilter(filt)
|
||||
except Exception as exc:
|
||||
logger.warning(f"Failed to remove event filter from busy overlay: {exc}")
|
||||
|
||||
# Cleanup the overlay widget. This will call cleanup on the custom widget if present.
|
||||
# Cleanup the overlay widget. This will call cleanup on the custom widget if present.
|
||||
|
||||
overlay.cleanup()
|
||||
overlay.deleteLater()
|
||||
except Exception as exc:
|
||||
logger.warning(f"Failed to delete busy overlay: {exc}")
|
||||
overlay.cleanup()
|
||||
overlay.deleteLater()
|
||||
except Exception as exc:
|
||||
logger.warning(f"Failed to delete busy overlay: {exc}")
|
||||
|
||||
def closeEvent(self, event):
|
||||
"""Wrap the close even to ensure the rpc_register is cleaned up."""
|
||||
|
||||
+65
-34
@@ -2,7 +2,7 @@ from __future__ import annotations
|
||||
|
||||
import re
|
||||
from functools import lru_cache
|
||||
from typing import Any, Literal
|
||||
from typing import Literal
|
||||
|
||||
import numpy as np
|
||||
import pyqtgraph as pg
|
||||
@@ -21,7 +21,8 @@ logger = bec_logger.logger
|
||||
def get_theme_name():
|
||||
if QApplication.instance() is None or not hasattr(QApplication.instance(), "theme"):
|
||||
return "dark"
|
||||
return QApplication.instance().theme.theme
|
||||
else:
|
||||
return QApplication.instance().theme.theme
|
||||
|
||||
|
||||
def get_theme_palette():
|
||||
@@ -57,25 +58,6 @@ def apply_theme(theme: Literal["dark", "light"]):
|
||||
process_all_deferred_deletes(QApplication.instance())
|
||||
|
||||
|
||||
def theme_color(theme: Any | None, key: str, fallback: QColor | str) -> QColor:
|
||||
"""
|
||||
Return a QColor from a BEC theme, or the fallback when no theme is set.
|
||||
"""
|
||||
|
||||
fallback_color = fallback if isinstance(fallback, QColor) else QColor(str(fallback))
|
||||
if theme is None:
|
||||
return fallback_color
|
||||
return theme.color(key, fallback_color.name())
|
||||
|
||||
|
||||
def rgba(color: QColor | str, alpha: int) -> str:
|
||||
"""
|
||||
Return a QSS-compatible rgba string.
|
||||
"""
|
||||
qcolor = color if isinstance(color, QColor) else QColor(str(color))
|
||||
return f"rgba({qcolor.red()}, {qcolor.green()}, {qcolor.blue()}, {alpha})"
|
||||
|
||||
|
||||
class Colors:
|
||||
@staticmethod
|
||||
def list_available_colormaps() -> list[str]:
|
||||
@@ -168,6 +150,25 @@ class Colors:
|
||||
|
||||
return ge.colorMap()
|
||||
|
||||
@staticmethod
|
||||
def golden_ratio(num: int) -> list:
|
||||
"""Calculate the golden ratio for a given number of angles.
|
||||
|
||||
Args:
|
||||
num (int): Number of angles
|
||||
|
||||
Returns:
|
||||
list: List of angles calculated using the golden ratio.
|
||||
"""
|
||||
phi = 2 * np.pi * ((1 + np.sqrt(5)) / 2)
|
||||
angles = []
|
||||
for ii in range(num):
|
||||
x = np.cos(ii * phi)
|
||||
y = np.sin(ii * phi)
|
||||
angle = np.arctan2(y, x)
|
||||
angles.append(angle)
|
||||
return angles
|
||||
|
||||
@staticmethod
|
||||
def set_theme_offset(theme: Literal["light", "dark"] | None = None, offset=0.2) -> tuple:
|
||||
"""
|
||||
@@ -238,7 +239,20 @@ class Colors:
|
||||
else:
|
||||
positions = np.linspace(min_pos, max_pos, num)
|
||||
|
||||
return Colors._format_mapped_colors(cmap.map(positions, mode="float"), format)
|
||||
# Sample colors from the colormap at the calculated positions
|
||||
colors = cmap.map(positions, mode="float")
|
||||
color_list = []
|
||||
|
||||
for color in colors:
|
||||
if format.upper() == "HEX":
|
||||
color_list.append(QColor.fromRgbF(*color).name())
|
||||
elif format.upper() == "RGB":
|
||||
color_list.append(tuple((np.array(color) * 255).astype(int)))
|
||||
elif format.upper() == "QCOLOR":
|
||||
color_list.append(QColor.fromRgbF(*color))
|
||||
else:
|
||||
raise ValueError("Unsupported format. Please choose 'RGB', 'HEX', or 'QColor'.")
|
||||
return color_list
|
||||
|
||||
@staticmethod
|
||||
def golden_angle_color(
|
||||
@@ -274,19 +288,20 @@ class Colors:
|
||||
positions = np.mod(np.arange(num) * golden_angle_conjugate, 1)
|
||||
positions = min_pos + positions * (max_pos - min_pos)
|
||||
|
||||
return Colors._format_mapped_colors(cmap.map(positions, mode="float"), format)
|
||||
# Sample colors from the colormap at the calculated positions
|
||||
colors = cmap.map(positions, mode="float")
|
||||
color_list = []
|
||||
|
||||
@staticmethod
|
||||
def _format_mapped_colors(colors: np.ndarray, format: Literal["QColor", "HEX", "RGB"]) -> list:
|
||||
color_format = format.upper()
|
||||
if color_format not in {"QCOLOR", "HEX", "RGB"}:
|
||||
raise ValueError("Unsupported format. Please choose 'RGB', 'HEX', or 'QColor'.")
|
||||
|
||||
if color_format == "QCOLOR":
|
||||
return [QColor.fromRgbF(*color) for color in colors]
|
||||
if color_format == "HEX":
|
||||
return [QColor.fromRgbF(*color).name() for color in colors]
|
||||
return [tuple((np.array(color) * 255).astype(int)) for color in colors]
|
||||
for color in colors:
|
||||
if format.upper() == "HEX":
|
||||
color_list.append(QColor.fromRgbF(*color).name())
|
||||
elif format.upper() == "RGB":
|
||||
color_list.append(tuple((np.array(color) * 255).astype(int)))
|
||||
elif format.upper() == "QCOLOR":
|
||||
color_list.append(QColor.fromRgbF(*color))
|
||||
else:
|
||||
raise ValueError("Unsupported format. Please choose 'RGB', 'HEX', or 'QColor'.")
|
||||
return color_list
|
||||
|
||||
@staticmethod
|
||||
def hex_to_rgba(hex_color: str, alpha=255) -> tuple:
|
||||
@@ -310,6 +325,22 @@ class Colors:
|
||||
raise ValueError("HEX color must be 6 or 8 characters long.")
|
||||
return (r, g, b, alpha)
|
||||
|
||||
@staticmethod
|
||||
def rgba_to_hex(r: int, g: int, b: int, a: int = 255) -> str:
|
||||
"""
|
||||
Convert RGBA color to HEX.
|
||||
|
||||
Args:
|
||||
r(int): Red value (0-255).
|
||||
g(int): Green value (0-255).
|
||||
b(int): Blue value (0-255).
|
||||
a(int): Alpha value (0-255). Default is 255 (opaque).
|
||||
|
||||
Returns:
|
||||
hec_color(str): HEX color string.
|
||||
"""
|
||||
return "#{:02X}{:02X}{:02X}{:02X}".format(r, g, b, a)
|
||||
|
||||
@staticmethod
|
||||
def validate_color(color: tuple | str) -> tuple | str:
|
||||
"""
|
||||
|
||||
@@ -43,7 +43,7 @@ class WidgetContainerUtils:
|
||||
if list_of_names is None:
|
||||
list_of_names = []
|
||||
ii = 0
|
||||
while ii < 1000: # 1000 is arbitrary!
|
||||
while ii < 1000: # 1000 is arbritrary!
|
||||
name_candidate = f"{name}_{ii}"
|
||||
if name_candidate not in list_of_names:
|
||||
return name_candidate
|
||||
|
||||
+318
-91
@@ -1,12 +1,12 @@
|
||||
"""Small helpers for populating editable combo boxes used by device inputs."""
|
||||
"""Module for handling filter I/O operations in BEC Widgets for input fields.
|
||||
These operations include filtering device/signal names and/or device types.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from contextlib import nullcontext
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from qtpy.QtCore import QSignalBlocker
|
||||
from qtpy.QtWidgets import QComboBox
|
||||
from qtpy.QtCore import QStringListModel
|
||||
from qtpy.QtWidgets import QComboBox, QCompleter, QLineEdit
|
||||
from typeguard import TypeCheckError
|
||||
|
||||
from bec_widgets.utils.ophyd_kind_util import Kind
|
||||
@@ -14,102 +14,329 @@ from bec_widgets.utils.ophyd_kind_util import Kind
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
def replace_combobox_items(
|
||||
combo_box: QComboBox,
|
||||
items: list[str | tuple],
|
||||
*,
|
||||
preserve_current_text: bool = False,
|
||||
block_signals: bool = False,
|
||||
) -> None:
|
||||
"""Replace all combobox entries.
|
||||
class WidgetFilterHandler(ABC):
|
||||
"""Abstract base class for widget filter handlers"""
|
||||
|
||||
Args:
|
||||
combo_box: Combobox whose entries should be replaced.
|
||||
items: Entries to add. String entries are added as display text. Tuple entries are
|
||||
passed to ``QComboBox.addItem`` as ``(text, data)``.
|
||||
preserve_current_text: If True, restore the combobox text after replacing the items.
|
||||
block_signals: If True, block combobox signals while the items are replaced.
|
||||
"""
|
||||
current_text = combo_box.currentText()
|
||||
signal_blocker = QSignalBlocker(combo_box) if block_signals else nullcontext()
|
||||
with signal_blocker:
|
||||
combo_box.clear()
|
||||
for item in items:
|
||||
if isinstance(item, str):
|
||||
combo_box.addItem(item)
|
||||
@abstractmethod
|
||||
def set_selection(self, widget, selection: list[str | tuple]) -> None:
|
||||
"""Set the filtered_selection for the widget
|
||||
|
||||
Args:
|
||||
widget: Widget instance
|
||||
selection (list[str | tuple]): Filtered selection of items.
|
||||
If tuple, it contains (text, data) pairs.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def check_input(self, widget, text: str) -> bool:
|
||||
"""Check if the input text is in the filtered selection
|
||||
|
||||
Args:
|
||||
widget: Widget instance
|
||||
text (str): Input text
|
||||
|
||||
Returns:
|
||||
bool: True if the input text is in the filtered selection
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def update_with_kind(
|
||||
self, kind: Kind, signal_filter: set, device_info: dict, device_name: str
|
||||
) -> list[str | tuple]:
|
||||
"""Update the selection based on the kind of signal.
|
||||
|
||||
Args:
|
||||
kind (Kind): The kind of signal to filter.
|
||||
signal_filter (set): Set of signal kinds to filter.
|
||||
device_info (dict): Dictionary containing device information.
|
||||
device_name (str): Name of the device.
|
||||
|
||||
Returns:
|
||||
list[str | tuple]: A list of filtered signals based on the kind.
|
||||
"""
|
||||
# This method should be implemented in subclasses or extended as needed
|
||||
|
||||
def update_with_bec_signal_class(
|
||||
self,
|
||||
signal_class_filter: str | list[str],
|
||||
client,
|
||||
ndim_filter: int | list[int] | None = None,
|
||||
) -> list[tuple[str, str, dict]]:
|
||||
"""Update the selection based on signal classes using device_manager.get_bec_signals.
|
||||
|
||||
Args:
|
||||
signal_class_filter (str|list[str]): List of signal class names to filter.
|
||||
client: BEC client instance.
|
||||
ndim_filter (int | list[int] | None): Filter signals by dimensionality.
|
||||
If provided, only signals with matching ndim will be included.
|
||||
|
||||
Returns:
|
||||
list[tuple[str, str, dict]]: A list of (device_name, signal_name, signal_config) tuples.
|
||||
"""
|
||||
if not client or not hasattr(client, "device_manager"):
|
||||
return []
|
||||
|
||||
try:
|
||||
signals = client.device_manager.get_bec_signals(signal_class_filter)
|
||||
except TypeCheckError as e:
|
||||
logger.warning(f"Error retrieving signals: {e}")
|
||||
return []
|
||||
|
||||
if ndim_filter is None:
|
||||
return signals
|
||||
|
||||
if isinstance(ndim_filter, int):
|
||||
ndim_filter = [ndim_filter]
|
||||
|
||||
filtered_signals = []
|
||||
for device_name, signal_name, signal_config in signals:
|
||||
ndim = None
|
||||
if isinstance(signal_config, dict):
|
||||
ndim = signal_config.get("describe", {}).get("signal_info", {}).get("ndim")
|
||||
|
||||
if ndim in ndim_filter:
|
||||
filtered_signals.append((device_name, signal_name, signal_config))
|
||||
|
||||
return filtered_signals
|
||||
|
||||
|
||||
class LineEditFilterHandler(WidgetFilterHandler):
|
||||
"""Handler for QLineEdit widget"""
|
||||
|
||||
def set_selection(self, widget: QLineEdit, selection: list[str | tuple]) -> None:
|
||||
"""Set the selection for the widget to the completer model
|
||||
|
||||
Args:
|
||||
widget (QLineEdit): The QLineEdit widget
|
||||
selection (list[str | tuple]): Filtered selection of items. If tuple, it contains (text, data) pairs.
|
||||
"""
|
||||
if isinstance(selection, tuple):
|
||||
# If selection is a tuple, it contains (text, data) pairs
|
||||
selection = [text for text, _ in selection]
|
||||
if not isinstance(widget.completer, QCompleter):
|
||||
completer = QCompleter(widget)
|
||||
widget.setCompleter(completer)
|
||||
widget.completer.setModel(QStringListModel(selection, widget))
|
||||
|
||||
def check_input(self, widget: QLineEdit, text: str) -> bool:
|
||||
"""Check if the input text is in the filtered selection
|
||||
|
||||
Args:
|
||||
widget (QLineEdit): The QLineEdit widget
|
||||
text (str): Input text
|
||||
|
||||
Returns:
|
||||
bool: True if the input text is in the filtered selection
|
||||
"""
|
||||
model = widget.completer.model()
|
||||
model_data = [model.data(model.index(i)) for i in range(model.rowCount())]
|
||||
return text in model_data
|
||||
|
||||
def update_with_kind(
|
||||
self, kind: Kind, signal_filter: set, device_info: dict, device_name: str
|
||||
) -> list[str | tuple]:
|
||||
"""Update the selection based on the kind of signal.
|
||||
|
||||
Args:
|
||||
kind (Kind): The kind of signal to filter.
|
||||
signal_filter (set): Set of signal kinds to filter.
|
||||
device_info (dict): Dictionary containing device information.
|
||||
device_name (str): Name of the device.
|
||||
|
||||
Returns:
|
||||
list[str | tuple]: A list of filtered signals based on the kind.
|
||||
"""
|
||||
|
||||
return [
|
||||
signal
|
||||
for signal, signal_info in device_info.items()
|
||||
if kind in signal_filter and (signal_info.get("kind_str", None) == str(kind.name))
|
||||
]
|
||||
|
||||
|
||||
class ComboBoxFilterHandler(WidgetFilterHandler):
|
||||
"""Handler for QComboBox widget"""
|
||||
|
||||
def set_selection(self, widget: QComboBox, selection: list[str | tuple]) -> None:
|
||||
"""Set the selection for the widget to the completer model
|
||||
|
||||
Args:
|
||||
widget (QComboBox): The QComboBox widget
|
||||
selection (list[str | tuple]): Filtered selection of items. If tuple, it contains (text, data) pairs.
|
||||
"""
|
||||
widget.clear()
|
||||
if len(selection) == 0:
|
||||
return
|
||||
for element in selection:
|
||||
if isinstance(element, str):
|
||||
widget.addItem(element)
|
||||
elif isinstance(element, tuple):
|
||||
# If element is a tuple, it contains (text, data) pairs
|
||||
widget.addItem(*element)
|
||||
|
||||
def check_input(self, widget: QComboBox, text: str) -> bool:
|
||||
"""Check if the input text is in the filtered selection
|
||||
|
||||
Args:
|
||||
widget (QComboBox): The QComboBox widget
|
||||
text (str): Input text
|
||||
|
||||
Returns:
|
||||
bool: True if the input text is in the filtered selection
|
||||
"""
|
||||
return text in [widget.itemText(i) for i in range(widget.count())]
|
||||
|
||||
def update_with_kind(
|
||||
self, kind: Kind, signal_filter: set, device_info: dict, device_name: str
|
||||
) -> list[str | tuple]:
|
||||
"""Update the selection based on the kind of signal.
|
||||
|
||||
Args:
|
||||
kind (Kind): The kind of signal to filter.
|
||||
signal_filter (set): Set of signal kinds to filter.
|
||||
device_info (dict): Dictionary containing device information.
|
||||
device_name (str): Name of the device.
|
||||
|
||||
Returns:
|
||||
list[str | tuple]: A list of filtered signals based on the kind.
|
||||
"""
|
||||
out = []
|
||||
for signal, signal_info in device_info.items():
|
||||
if kind not in signal_filter or (signal_info.get("kind_str", None) != str(kind.name)):
|
||||
continue
|
||||
obj_name = signal_info.get("obj_name", "")
|
||||
component_name = signal_info.get("component_name", "")
|
||||
signal_wo_device = obj_name.removeprefix(f"{device_name}_")
|
||||
if not signal_wo_device:
|
||||
signal_wo_device = obj_name
|
||||
|
||||
if signal_wo_device != signal and component_name.replace(".", "_") != signal_wo_device:
|
||||
# If the object name is not the same as the signal name, we use the object name
|
||||
# to display in the combobox.
|
||||
out.append((f"{signal_wo_device} ({signal})", signal_info))
|
||||
else:
|
||||
combo_box.addItem(*item)
|
||||
if preserve_current_text:
|
||||
combo_box.setCurrentText(current_text)
|
||||
# If the object name is the same as the signal name, we do not change it.
|
||||
out.append((signal, signal_info))
|
||||
|
||||
return out
|
||||
|
||||
|
||||
def signal_items_for_kind(
|
||||
*, kind: Kind, signal_filter: set[Kind], device_info: dict, device_name: str
|
||||
) -> list[tuple[str, dict]]:
|
||||
"""Build display entries for signals matching a BEC signal kind.
|
||||
|
||||
Args:
|
||||
kind: Signal kind to collect.
|
||||
signal_filter: Enabled signal kinds.
|
||||
device_info: Signal metadata from the BEC device info dictionary.
|
||||
device_name: Name of the device owning the signals.
|
||||
|
||||
Returns:
|
||||
Combobox entries as ``(display_text, signal_info)`` tuples.
|
||||
class FilterIO:
|
||||
"""Public interface to set filters for input widgets.
|
||||
It supports the list of widgets stored in class attribute _handlers.
|
||||
"""
|
||||
items: list[tuple[str, dict]] = []
|
||||
for signal_name, signal_info in device_info.items():
|
||||
if kind not in signal_filter or signal_info.get("kind_str") != kind.name:
|
||||
continue
|
||||
|
||||
obj_name = signal_info.get("obj_name", "")
|
||||
component_name = signal_info.get("component_name", "")
|
||||
signal_without_device = obj_name.removeprefix(f"{device_name}_")
|
||||
if not signal_without_device:
|
||||
signal_without_device = obj_name
|
||||
_handlers = {QLineEdit: LineEditFilterHandler, QComboBox: ComboBoxFilterHandler}
|
||||
|
||||
if (
|
||||
signal_without_device != signal_name
|
||||
and component_name.replace(".", "_") != signal_without_device
|
||||
):
|
||||
items.append((f"{signal_without_device} ({signal_name})", signal_info))
|
||||
else:
|
||||
items.append((signal_name, signal_info))
|
||||
return items
|
||||
@staticmethod
|
||||
def set_selection(widget, selection: list[str | tuple], ignore_errors=True):
|
||||
"""
|
||||
Retrieve value from the widget instance.
|
||||
|
||||
Args:
|
||||
widget: Widget instance.
|
||||
selection (list[str | tuple]): Filtered selection of items.
|
||||
If tuple, it contains (text, data) pairs.
|
||||
ignore_errors(bool, optional): Whether to ignore if no handler is found.
|
||||
"""
|
||||
handler_class = FilterIO._find_handler(widget)
|
||||
if handler_class:
|
||||
return handler_class().set_selection(widget=widget, selection=selection)
|
||||
if not ignore_errors:
|
||||
raise ValueError(
|
||||
f"No matching handler for widget type: {type(widget)} in handler list {FilterIO._handlers}"
|
||||
)
|
||||
return None
|
||||
|
||||
def get_bec_signals_for_classes(
|
||||
*, client, signal_class_filter: str | list[str], ndim_filter: int | list[int] | None = None
|
||||
) -> list[tuple[str, str, dict]]:
|
||||
"""Return BEC signals filtered by signal class and optional dimensionality.
|
||||
@staticmethod
|
||||
def check_input(widget, text: str, ignore_errors=True):
|
||||
"""
|
||||
Check if the input text is in the filtered selection.
|
||||
|
||||
Args:
|
||||
client: BEC client that provides ``device_manager.get_bec_signals``.
|
||||
signal_class_filter: Signal class name or class names passed to the device manager.
|
||||
ndim_filter: Optional dimensionality filter. If provided, only signals whose
|
||||
``describe.signal_info.ndim`` is in this value are returned.
|
||||
Args:
|
||||
widget: Widget instance.
|
||||
text(str): Input text.
|
||||
ignore_errors(bool, optional): Whether to ignore if no handler is found.
|
||||
|
||||
Returns:
|
||||
Tuples of ``(device_name, signal_name, signal_config)`` for matching signals.
|
||||
"""
|
||||
if not client or not hasattr(client, "device_manager"):
|
||||
return []
|
||||
Returns:
|
||||
bool: True if the input text is in the filtered selection.
|
||||
"""
|
||||
handler_class = FilterIO._find_handler(widget)
|
||||
if handler_class:
|
||||
return handler_class().check_input(widget=widget, text=text)
|
||||
if not ignore_errors:
|
||||
raise ValueError(
|
||||
f"No matching handler for widget type: {type(widget)} in handler list {FilterIO._handlers}"
|
||||
)
|
||||
return None
|
||||
|
||||
try:
|
||||
signals = client.device_manager.get_bec_signals(signal_class_filter)
|
||||
except TypeCheckError as exc:
|
||||
logger.warning(f"Error retrieving signals: {exc}")
|
||||
return []
|
||||
@staticmethod
|
||||
def update_with_kind(
|
||||
widget, kind: Kind, signal_filter: set, device_info: dict, device_name: str
|
||||
) -> list[str | tuple]:
|
||||
"""
|
||||
Update the selection based on the kind of signal.
|
||||
|
||||
if ndim_filter is None:
|
||||
return signals
|
||||
Args:
|
||||
widget: Widget instance.
|
||||
kind (Kind): The kind of signal to filter.
|
||||
signal_filter (set): Set of signal kinds to filter.
|
||||
device_info (dict): Dictionary containing device information.
|
||||
device_name (str): Name of the device.
|
||||
|
||||
accepted_ndim = [ndim_filter] if isinstance(ndim_filter, int) else ndim_filter
|
||||
filtered_signals: list[tuple[str, str, dict]] = []
|
||||
for device_name, signal_name, signal_config in signals:
|
||||
ndim = None
|
||||
if isinstance(signal_config, dict):
|
||||
ndim = signal_config.get("describe", {}).get("signal_info", {}).get("ndim")
|
||||
if ndim in accepted_ndim:
|
||||
filtered_signals.append((device_name, signal_name, signal_config))
|
||||
return filtered_signals
|
||||
Returns:
|
||||
list[str | tuple]: A list of filtered signals based on the kind.
|
||||
"""
|
||||
handler_class = FilterIO._find_handler(widget)
|
||||
if handler_class:
|
||||
return handler_class().update_with_kind(
|
||||
kind=kind,
|
||||
signal_filter=signal_filter,
|
||||
device_info=device_info,
|
||||
device_name=device_name,
|
||||
)
|
||||
raise ValueError(
|
||||
f"No matching handler for widget type: {type(widget)} in handler list {FilterIO._handlers}"
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def update_with_signal_class(
|
||||
widget, signal_class_filter: list[str], client, ndim_filter: int | list[int] | None = None
|
||||
) -> list[tuple[str, str, dict]]:
|
||||
"""
|
||||
Update the selection based on signal classes using device_manager.get_bec_signals.
|
||||
|
||||
Args:
|
||||
widget: Widget instance.
|
||||
signal_class_filter (list[str]): List of signal class names to filter.
|
||||
client: BEC client instance.
|
||||
ndim_filter (int | list[int] | None): Filter signals by dimensionality.
|
||||
If provided, only signals with matching ndim will be included.
|
||||
|
||||
Returns:
|
||||
list[tuple[str, str, dict]]: A list of (device_name, signal_name, signal_config) tuples.
|
||||
"""
|
||||
handler_class = FilterIO._find_handler(widget)
|
||||
if handler_class:
|
||||
return handler_class().update_with_bec_signal_class(
|
||||
signal_class_filter=signal_class_filter, client=client, ndim_filter=ndim_filter
|
||||
)
|
||||
raise ValueError(
|
||||
f"No matching handler for widget type: {type(widget)} in handler list {FilterIO._handlers}"
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _find_handler(widget):
|
||||
"""
|
||||
Find the appropriate handler for the widget by checking its base classes.
|
||||
|
||||
Args:
|
||||
widget: Widget instance.
|
||||
|
||||
Returns:
|
||||
handler_class: The handler class if found, otherwise None.
|
||||
"""
|
||||
for base in type(widget).__mro__:
|
||||
if base in FilterIO._handlers:
|
||||
return FilterIO._handlers[base]
|
||||
return None
|
||||
|
||||
@@ -106,9 +106,7 @@ class TypedForm(BECWidget, QWidget):
|
||||
|
||||
def _add_griditem(self, item: FormItemSpec, row: int):
|
||||
grid = self._form_grid.layout()
|
||||
# Use title from FieldInfo if available, otherwise use the property name
|
||||
label_text = item.info.title if item.info.title else item.name
|
||||
label = QLabel(parent=self._form_grid, text=label_text)
|
||||
label = QLabel(parent=self._form_grid, text=item.name)
|
||||
label.setProperty("_model_field_name", item.name)
|
||||
label.setToolTip(item.info.description or item.name)
|
||||
grid.addWidget(label, row, 0)
|
||||
@@ -150,7 +148,7 @@ class TypedForm(BECWidget, QWidget):
|
||||
self.adjustSize()
|
||||
|
||||
def _new_grid_layout(self):
|
||||
new_grid = QGridLayout()
|
||||
new_grid = QGridLayout(self)
|
||||
new_grid.setContentsMargins(0, 0, 0, 0)
|
||||
return new_grid
|
||||
|
||||
|
||||
@@ -71,7 +71,7 @@ class FormItemSpec(BaseModel):
|
||||
"""
|
||||
The specification for an item in a dynamically generated form. Uses a pydantic FieldInfo
|
||||
to store most annotation info, since one of the main purposes is to store data for
|
||||
forms generated from pydantic models, but can also be composed from other sources or by hand.
|
||||
forms genrated from pydantic models, but can also be composed from other sources or by hand.
|
||||
"""
|
||||
|
||||
model_config = ConfigDict(arbitrary_types_allowed=True)
|
||||
@@ -192,7 +192,7 @@ class DynamicFormItem(QWidget):
|
||||
@abstractmethod
|
||||
def _add_main_widget(self) -> None:
|
||||
self._main_widget: QWidget
|
||||
"""Add the main data entry widget to self._main_widget and apply any
|
||||
"""Add the main data entry widget to self._main_widget and appply any
|
||||
constraints from the field info"""
|
||||
|
||||
@SafeSlot()
|
||||
@@ -231,8 +231,6 @@ class StrFormItem(DynamicFormItem):
|
||||
def __init__(self, parent: QWidget | None = None, *, spec: FormItemSpec) -> None:
|
||||
super().__init__(parent=parent, spec=spec)
|
||||
self._main_widget.textChanged.connect(self._value_changed)
|
||||
if spec.info.description:
|
||||
self._main_widget.setPlaceholderText(spec.info.description)
|
||||
|
||||
def _add_main_widget(self) -> None:
|
||||
self._main_widget = QLineEdit()
|
||||
|
||||
@@ -1,53 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Mapping
|
||||
from typing import Any
|
||||
|
||||
from bec_lib.scan_args import ScanArgument
|
||||
from pydantic import BaseModel
|
||||
from pydantic_core import PydanticUndefined
|
||||
|
||||
from bec_widgets.utils.scan_arg_metadata import ui_config_from_metadata
|
||||
|
||||
NUMERIC_BOUND_KEYS = {"gt", "ge", "lt", "le"}
|
||||
|
||||
|
||||
def pydantic_model_input_configs(model: type[BaseModel]) -> list[dict[str, Any]]:
|
||||
"""Return scan-control-style field items for a Pydantic model."""
|
||||
configs = []
|
||||
for name, info in model.model_fields.items():
|
||||
metadata: dict[str, Any] = {}
|
||||
for entry in info.metadata:
|
||||
if isinstance(entry, ScanArgument):
|
||||
metadata.update(entry.model_dump(exclude_none=True))
|
||||
continue
|
||||
for key in NUMERIC_BOUND_KEYS:
|
||||
value = getattr(entry, key, None)
|
||||
if value is not None:
|
||||
metadata.setdefault(key, value)
|
||||
|
||||
if isinstance(info.json_schema_extra, Mapping):
|
||||
metadata.update(dict(info.json_schema_extra))
|
||||
|
||||
if info.description and metadata.get("description") is None:
|
||||
metadata["description"] = info.description
|
||||
|
||||
default: Any
|
||||
if info.default is not PydanticUndefined:
|
||||
default = info.default
|
||||
elif info.default_factory is not None:
|
||||
default = info.get_default(call_default_factory=True)
|
||||
else:
|
||||
default = None
|
||||
|
||||
display_name = metadata.get("display_name") or info.title
|
||||
if display_name is None:
|
||||
display_name = name.replace("_", " ").capitalize()
|
||||
|
||||
item = ui_config_from_metadata(
|
||||
name=name, metadata=metadata, default=default, display_name=display_name
|
||||
)
|
||||
item.update({key: value for key, value in metadata.items() if key not in item})
|
||||
configs.append(item)
|
||||
|
||||
return configs
|
||||
@@ -1,815 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from types import NoneType
|
||||
from typing import Any, Literal, get_args, get_origin
|
||||
|
||||
from bec_lib.device import DeviceBase, Signal
|
||||
from pydantic import BaseModel, ValidationError
|
||||
from pydantic.fields import FieldInfo
|
||||
from qtpy.QtCore import Qt
|
||||
from qtpy.QtCore import Signal as QtSignal
|
||||
from qtpy.QtWidgets import (
|
||||
QCheckBox,
|
||||
QComboBox,
|
||||
QDoubleSpinBox,
|
||||
QFormLayout,
|
||||
QHBoxLayout,
|
||||
QLineEdit,
|
||||
QSpinBox,
|
||||
QWidget,
|
||||
)
|
||||
|
||||
from bec_widgets.utils.forms_from_types.pydantic_model_info_adapter import (
|
||||
NUMERIC_BOUND_KEYS,
|
||||
pydantic_model_input_configs,
|
||||
)
|
||||
from bec_widgets.utils.scan_arg_metadata import (
|
||||
apply_numeric_limits,
|
||||
apply_numeric_precision,
|
||||
apply_unit_metadata,
|
||||
device_units,
|
||||
)
|
||||
from bec_widgets.utils.widget_io import WidgetIO
|
||||
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
|
||||
from bec_widgets.widgets.control.device_input.signal_combobox.signal_combobox import SignalComboBox
|
||||
from bec_widgets.widgets.utility.spinbox.decimal_spinbox import BECSpinBox
|
||||
|
||||
|
||||
class OptionalValueWidget(QWidget):
|
||||
"""Wrap a value widget with an enable checkbox for optional Pydantic fields.
|
||||
|
||||
Attributes:
|
||||
value_changed: Signal emitted with the current value whenever the checkbox
|
||||
state or wrapped widget value changes.
|
||||
"""
|
||||
|
||||
value_changed = QtSignal(object)
|
||||
|
||||
def __init__(self, value_widget: QWidget, parent: QWidget | None = None) -> None:
|
||||
"""Create an optional-value wrapper.
|
||||
|
||||
Args:
|
||||
value_widget: Input widget used when the optional value is enabled.
|
||||
parent: Optional parent widget.
|
||||
"""
|
||||
super().__init__(parent=parent)
|
||||
self._value_widget = value_widget
|
||||
self._checkbox = QCheckBox(self)
|
||||
self._checkbox.setToolTip("Enable value")
|
||||
self._value_widget.setParent(self)
|
||||
|
||||
layout = QHBoxLayout(self)
|
||||
layout.setContentsMargins(0, 0, 0, 0)
|
||||
layout.setSpacing(8)
|
||||
layout.addWidget(self._checkbox)
|
||||
layout.addWidget(self._value_widget, 1)
|
||||
|
||||
self._checkbox.toggled.connect(self._on_enabled_changed)
|
||||
WidgetIO.connect_widget_change_signal(self._value_widget, self._emit_current_value)
|
||||
self._on_enabled_changed(False)
|
||||
|
||||
@property
|
||||
def value_widget(self) -> QWidget:
|
||||
"""Return the wrapped input widget.
|
||||
|
||||
Returns:
|
||||
The widget that edits the non-``None`` value.
|
||||
"""
|
||||
return self._value_widget
|
||||
|
||||
@property
|
||||
def checkbox(self) -> QCheckBox:
|
||||
"""Return the checkbox controlling whether the value is enabled.
|
||||
|
||||
Returns:
|
||||
The enable checkbox.
|
||||
"""
|
||||
return self._checkbox
|
||||
|
||||
def value(self) -> Any:
|
||||
"""Return the current optional value.
|
||||
|
||||
Returns:
|
||||
``None`` when the checkbox is unchecked; otherwise the wrapped widget value.
|
||||
"""
|
||||
if not self._checkbox.isChecked():
|
||||
return None
|
||||
return WidgetIO.get_value(self._value_widget)
|
||||
|
||||
def set_value(self, value: Any) -> None:
|
||||
"""Set the optional value.
|
||||
|
||||
Args:
|
||||
value: Value to set on the wrapped widget. ``None`` disables the value.
|
||||
"""
|
||||
enabled = value is not None
|
||||
self._checkbox.setChecked(enabled)
|
||||
self._value_widget.setEnabled(enabled)
|
||||
if enabled:
|
||||
WidgetIO.set_value(self._value_widget, value)
|
||||
|
||||
def _on_enabled_changed(self, enabled: bool) -> None:
|
||||
self._value_widget.setEnabled(enabled)
|
||||
self.value_changed.emit(self.value())
|
||||
|
||||
def _emit_current_value(self, *_args) -> None:
|
||||
self.value_changed.emit(self.value())
|
||||
|
||||
|
||||
class PydanticWidgetForm(QWidget):
|
||||
"""Generate a Qt form from a Pydantic model.
|
||||
|
||||
The form maps Pydantic field annotations to Qt widgets, applies supported
|
||||
field metadata, and exposes typed and raw data accessors for the generated
|
||||
fields.
|
||||
|
||||
Attributes:
|
||||
changed: Signal emitted whenever a generated input widget changes.
|
||||
validity_changed: Signal emitted by :meth:`validate` with the current
|
||||
validation result.
|
||||
"""
|
||||
|
||||
changed = QtSignal()
|
||||
validity_changed = QtSignal(bool)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
model: type[BaseModel],
|
||||
parent: QWidget | None = None,
|
||||
*,
|
||||
data: BaseModel | dict[str, Any] | None = None,
|
||||
read_only_fields: set[str] | None = None,
|
||||
client=None,
|
||||
) -> None:
|
||||
"""Create a generated form for a Pydantic model.
|
||||
|
||||
Args:
|
||||
model: Pydantic model class used to generate fields and validate data.
|
||||
parent: Optional parent widget.
|
||||
data: Optional initial model instance or raw field-value mapping.
|
||||
read_only_fields: Field names that should be displayed but not editable.
|
||||
client: Optional BEC client passed to domain-specific widgets such as
|
||||
device and signal combo boxes.
|
||||
"""
|
||||
super().__init__(parent=parent)
|
||||
self._model = model
|
||||
self._client = client
|
||||
self._read_only_fields = set(read_only_fields or set())
|
||||
self._widgets: dict[str, QWidget] = {}
|
||||
self._field_configs: dict[str, dict[str, Any]] = {}
|
||||
self._baseline: dict[str, Any] = {}
|
||||
|
||||
self._layout = QFormLayout()
|
||||
self._layout.setContentsMargins(0, 0, 0, 0)
|
||||
self._layout.setHorizontalSpacing(10)
|
||||
self._layout.setVerticalSpacing(8)
|
||||
self._layout.setFieldGrowthPolicy(QFormLayout.FieldGrowthPolicy.AllNonFixedFieldsGrow)
|
||||
self._layout.setLabelAlignment(Qt.AlignmentFlag.AlignRight)
|
||||
self.setLayout(self._layout)
|
||||
|
||||
self._populate()
|
||||
if data is not None:
|
||||
self.set_data(data)
|
||||
self.mark_clean()
|
||||
|
||||
@property
|
||||
def model(self) -> type[BaseModel]:
|
||||
"""Return the active Pydantic model class.
|
||||
|
||||
Returns:
|
||||
The model class currently used by this form.
|
||||
"""
|
||||
return self._model
|
||||
|
||||
@property
|
||||
def widgets(self) -> dict[str, QWidget]:
|
||||
"""Return generated field widgets keyed by model field name.
|
||||
|
||||
Returns:
|
||||
A shallow copy of the field-widget mapping. Optional fields return
|
||||
their outer :class:`OptionalValueWidget`.
|
||||
"""
|
||||
return dict(self._widgets)
|
||||
|
||||
def field_widget(self, name: str) -> QWidget:
|
||||
"""Return the generated widget for a field.
|
||||
|
||||
Args:
|
||||
name: Model field name.
|
||||
|
||||
Returns:
|
||||
The generated field widget. Optional fields return their outer
|
||||
:class:`OptionalValueWidget`.
|
||||
|
||||
Raises:
|
||||
KeyError: If no widget exists for ``name``.
|
||||
"""
|
||||
return self._widgets[name]
|
||||
|
||||
def input_widget(self, name: str) -> QWidget:
|
||||
"""Return the direct input widget for a field.
|
||||
|
||||
Args:
|
||||
name: Model field name.
|
||||
|
||||
Returns:
|
||||
The editable input widget. Optional fields return the wrapped value
|
||||
widget instead of the outer optional wrapper.
|
||||
|
||||
Raises:
|
||||
KeyError: If no widget exists for ``name``.
|
||||
"""
|
||||
widget = self._widgets[name]
|
||||
if isinstance(widget, OptionalValueWidget):
|
||||
return widget.value_widget
|
||||
return widget
|
||||
|
||||
def input_widgets(self) -> dict[str, QWidget]:
|
||||
"""Return direct input widgets keyed by model field name.
|
||||
|
||||
Returns:
|
||||
Mapping of field names to editable input widgets.
|
||||
"""
|
||||
return {name: self.input_widget(name) for name in self._widgets}
|
||||
|
||||
def input_widgets_by_type(self, widget_type: type[QWidget]) -> list[QWidget]:
|
||||
"""Return direct input widgets matching a widget type.
|
||||
|
||||
Args:
|
||||
widget_type: Qt widget class to match with ``isinstance``.
|
||||
|
||||
Returns:
|
||||
List of input widgets matching ``widget_type``.
|
||||
"""
|
||||
return [
|
||||
widget for widget in self.input_widgets().values() if isinstance(widget, widget_type)
|
||||
]
|
||||
|
||||
def set_model(self, model: type[BaseModel], data: dict[str, Any] | None = None) -> None:
|
||||
"""Replace the active model and rebuild the form.
|
||||
|
||||
Args:
|
||||
model: New Pydantic model class.
|
||||
data: Optional initial data for the new model. When omitted, values
|
||||
from fields shared with the previous model are preserved.
|
||||
"""
|
||||
old_data = self.raw_data()
|
||||
self.cleanup()
|
||||
self._model = model
|
||||
self._populate()
|
||||
if data is None:
|
||||
data = {key: value for key, value in old_data.items() if key in model.model_fields}
|
||||
self.set_partial_data(data)
|
||||
self.mark_clean()
|
||||
|
||||
def set_data(self, data: BaseModel | dict[str, Any]) -> None:
|
||||
"""Set form values from a model instance or mapping.
|
||||
|
||||
Args:
|
||||
data: Pydantic model instance or raw field-value mapping.
|
||||
"""
|
||||
values = data.model_dump() if isinstance(data, BaseModel) else dict(data)
|
||||
self.set_partial_data(values)
|
||||
|
||||
def set_partial_data(self, data: dict[str, Any]) -> None:
|
||||
"""Set values for fields present in the form.
|
||||
|
||||
Unknown keys are ignored, which allows callers to pass larger model
|
||||
dumps or backend payloads safely.
|
||||
|
||||
Args:
|
||||
data: Field-value mapping to apply.
|
||||
"""
|
||||
for name, value in data.items():
|
||||
if name not in self._widgets:
|
||||
continue
|
||||
self._set_widget_value(name, value)
|
||||
self._refresh_reference_units()
|
||||
self.changed.emit()
|
||||
|
||||
def raw_data(self) -> dict[str, Any]:
|
||||
"""Return current widget values without Pydantic validation.
|
||||
|
||||
Returns:
|
||||
Mapping of model field names to raw widget values.
|
||||
"""
|
||||
return {name: self._read_widget_value(name) for name in self._widgets}
|
||||
|
||||
def get_data(self) -> dict[str, Any]:
|
||||
"""Return current data after Pydantic validation.
|
||||
|
||||
Returns:
|
||||
Validated model data as a dictionary.
|
||||
|
||||
Raises:
|
||||
ValidationError: If Pydantic validation fails.
|
||||
ValueError: If domain widget validation fails.
|
||||
"""
|
||||
return self.model_instance().model_dump()
|
||||
|
||||
def model_instance(self) -> BaseModel:
|
||||
"""Return the current values as a Pydantic model instance.
|
||||
|
||||
Returns:
|
||||
Validated instance of the active model class.
|
||||
|
||||
Raises:
|
||||
ValidationError: If Pydantic validation fails.
|
||||
ValueError: If domain widget validation fails.
|
||||
"""
|
||||
self._validate_domain_widgets()
|
||||
return self._model.model_validate(self.raw_data())
|
||||
|
||||
def validate(self) -> bool:
|
||||
"""Validate the current form values.
|
||||
|
||||
Returns:
|
||||
``True`` when current values validate successfully, otherwise ``False``.
|
||||
"""
|
||||
try:
|
||||
self.get_data()
|
||||
except (ValidationError, ValueError):
|
||||
self.validity_changed.emit(False)
|
||||
return False
|
||||
self.validity_changed.emit(True)
|
||||
return True
|
||||
|
||||
def dirty_fields(self) -> set[str]:
|
||||
"""Return fields whose raw values differ from the clean baseline.
|
||||
|
||||
Returns:
|
||||
Set of dirty field names.
|
||||
"""
|
||||
current = self.raw_data()
|
||||
fields = set(current) | set(self._baseline)
|
||||
return {field for field in fields if current.get(field) != self._baseline.get(field)}
|
||||
|
||||
def mark_clean(self) -> None:
|
||||
"""Store the current raw values as the clean baseline."""
|
||||
self._baseline = self.raw_data()
|
||||
|
||||
def reset_to_baseline(self) -> None:
|
||||
"""Restore the form values to the current clean baseline."""
|
||||
self.set_partial_data(self._baseline)
|
||||
|
||||
def editable_data(self) -> dict[str, Any]:
|
||||
"""Return validated data excluding read-only fields.
|
||||
|
||||
Returns:
|
||||
Validated editable field values.
|
||||
|
||||
Raises:
|
||||
ValidationError: If Pydantic validation fails.
|
||||
ValueError: If domain widget validation fails.
|
||||
"""
|
||||
return {
|
||||
key: value
|
||||
for key, value in self.get_data().items()
|
||||
if key not in self._read_only_fields
|
||||
}
|
||||
|
||||
def raw_editable_data(self) -> dict[str, Any]:
|
||||
"""Return raw widget data excluding read-only fields.
|
||||
|
||||
Returns:
|
||||
Raw editable field values.
|
||||
"""
|
||||
return {
|
||||
key: value
|
||||
for key, value in self.raw_data().items()
|
||||
if key not in self._read_only_fields
|
||||
}
|
||||
|
||||
def cleanup(self) -> None:
|
||||
"""Close and schedule deletion of all generated field widgets."""
|
||||
while self._layout.rowCount():
|
||||
row = self._layout.takeRow(0)
|
||||
for item in (row.labelItem, row.fieldItem):
|
||||
widget = item.widget() if item is not None else None
|
||||
if widget is not None:
|
||||
widget.close()
|
||||
# Detach before deleteLater: a child pending deletion that still has a
|
||||
# signal connection into this form crashes if the form is garbage
|
||||
# collected before the deferred delete is processed.
|
||||
widget.setParent(None)
|
||||
widget.deleteLater()
|
||||
self._widgets.clear()
|
||||
self._field_configs.clear()
|
||||
|
||||
def closeEvent(self, event) -> None: # noqa: N802
|
||||
self.cleanup()
|
||||
super().closeEvent(event)
|
||||
|
||||
def _populate(self) -> None:
|
||||
for config in pydantic_model_input_configs(self._model):
|
||||
name = config["name"]
|
||||
info = self._model.model_fields[name]
|
||||
widget = self._create_widget(name, info)
|
||||
label_text = config["display_name"]
|
||||
self._layout.addRow(label_text, widget)
|
||||
label = self._layout.labelForField(widget)
|
||||
if label is not None:
|
||||
label.setProperty("_model_field_name", name)
|
||||
if config.get("tooltip") and label is not None:
|
||||
label.setToolTip(config["tooltip"])
|
||||
widget.setEnabled(name not in self._read_only_fields)
|
||||
self._widgets[name] = widget
|
||||
self._field_configs[name] = config
|
||||
self._set_widget_value(name, config["default"])
|
||||
self._apply_field_metadata(name)
|
||||
self._connect_widget(widget)
|
||||
|
||||
self._connect_device_signal_widgets()
|
||||
self._connect_reference_unit_widgets()
|
||||
self._refresh_reference_units()
|
||||
|
||||
def _create_widget(self, name: str, info: FieldInfo) -> QWidget:
|
||||
annotation = info.annotation
|
||||
args = get_args(annotation)
|
||||
optional = NoneType in args
|
||||
non_none_args = tuple(arg for arg in args if arg is not NoneType)
|
||||
value_annotation = non_none_args[0] if len(non_none_args) == 1 else annotation
|
||||
|
||||
widget = self._create_value_widget(name, value_annotation)
|
||||
numeric = value_annotation in (int, float) or any(
|
||||
arg in (int, float) for arg in get_args(value_annotation)
|
||||
)
|
||||
if optional and (numeric or value_annotation is bool):
|
||||
return OptionalValueWidget(widget, parent=self)
|
||||
return widget
|
||||
|
||||
def _create_value_widget(self, name: str, annotation: Any) -> QWidget:
|
||||
args = get_args(annotation)
|
||||
if (
|
||||
isinstance(annotation, type)
|
||||
and issubclass(annotation, Signal)
|
||||
or any(isinstance(arg, type) and issubclass(arg, Signal) for arg in args)
|
||||
):
|
||||
return SignalComboBox(
|
||||
parent=self,
|
||||
client=self._client,
|
||||
require_device=self._model_has_device_field(),
|
||||
arg_name=name,
|
||||
)
|
||||
if (
|
||||
isinstance(annotation, type)
|
||||
and issubclass(annotation, DeviceBase)
|
||||
or any(isinstance(arg, type) and issubclass(arg, DeviceBase) for arg in args)
|
||||
):
|
||||
return DeviceComboBox(parent=self, client=self._client, arg_name=name)
|
||||
if get_origin(annotation) is Literal:
|
||||
widget = QComboBox(self)
|
||||
widget.addItems([str(value) for value in get_args(annotation)])
|
||||
return widget
|
||||
if annotation is bool:
|
||||
return QCheckBox(self)
|
||||
if annotation is int:
|
||||
spin_box = QSpinBox(self)
|
||||
spin_box.setRange(-2147483647, 2147483647)
|
||||
return spin_box
|
||||
if annotation is float:
|
||||
spin_box = BECSpinBox(self)
|
||||
spin_box.setRange(-1_000_000_000, 1_000_000_000)
|
||||
return spin_box
|
||||
return QLineEdit(self)
|
||||
|
||||
def _apply_field_metadata(self, name: str) -> None:
|
||||
config = self._field_configs[name]
|
||||
field_widget = self._widgets[name]
|
||||
input_widget = self.input_widget(name)
|
||||
|
||||
if config.get("precision") is not None:
|
||||
apply_numeric_precision(input_widget, config)
|
||||
if any(config.get(key) is not None for key in NUMERIC_BOUND_KEYS):
|
||||
apply_numeric_limits(input_widget, config)
|
||||
|
||||
apply_unit_metadata(field_widget, config)
|
||||
if input_widget is not field_widget:
|
||||
apply_unit_metadata(input_widget, config)
|
||||
|
||||
def _connect_widget(self, widget: QWidget) -> None:
|
||||
if isinstance(widget, OptionalValueWidget):
|
||||
widget.value_changed.connect(lambda _value: self.changed.emit())
|
||||
return
|
||||
WidgetIO.connect_widget_change_signal(widget, lambda *_args: self.changed.emit())
|
||||
|
||||
def _connect_device_signal_widgets(self) -> None:
|
||||
devices = [
|
||||
widget for widget in self._widgets.values() if isinstance(widget, DeviceComboBox)
|
||||
]
|
||||
signals = [
|
||||
widget for widget in self._widgets.values() if isinstance(widget, SignalComboBox)
|
||||
]
|
||||
if not devices or not signals:
|
||||
return
|
||||
device_widget = devices[0]
|
||||
for signal_widget in signals:
|
||||
device_widget.device_selected.connect(signal_widget.set_device)
|
||||
device_widget.device_reset.connect(lambda w=signal_widget: w.set_device(None))
|
||||
if device_widget.currentText().strip():
|
||||
signal_widget.set_device(device_widget.currentText().strip())
|
||||
|
||||
def _connect_reference_unit_widgets(self) -> None:
|
||||
for name, widget in self.input_widgets().items():
|
||||
if not isinstance(widget, DeviceComboBox):
|
||||
continue
|
||||
widget.device_selected.connect(
|
||||
lambda _device_name, field_name=name: self._update_reference_units(field_name)
|
||||
)
|
||||
widget.device_reset.connect(
|
||||
lambda field_name=name: self._apply_reference_units(field_name, None)
|
||||
)
|
||||
widget.currentTextChanged.connect(
|
||||
lambda text, field_name=name: self._handle_reference_device_text(field_name, text)
|
||||
)
|
||||
|
||||
def _refresh_reference_units(self) -> None:
|
||||
for name, widget in self.input_widgets().items():
|
||||
if isinstance(widget, DeviceComboBox):
|
||||
self._update_reference_units(name)
|
||||
|
||||
def _update_reference_units(self, source_name: str) -> None:
|
||||
widget = self.input_widget(source_name)
|
||||
if not isinstance(widget, DeviceComboBox) or not widget.is_valid_input:
|
||||
self._apply_reference_units(source_name, None)
|
||||
return
|
||||
self._apply_reference_units(source_name, device_units(widget.get_current_device()))
|
||||
|
||||
def _apply_reference_units(self, source_name: str, units: str | None) -> None:
|
||||
for field_name, config in self._field_configs.items():
|
||||
if config.get("reference_units") != source_name:
|
||||
continue
|
||||
field_widget = self.field_widget(field_name)
|
||||
input_widget = self.input_widget(field_name)
|
||||
apply_unit_metadata(field_widget, config, units)
|
||||
if input_widget is not field_widget:
|
||||
apply_unit_metadata(input_widget, config, units)
|
||||
|
||||
def _handle_reference_device_text(self, source_name: str, device_name: str) -> None:
|
||||
widget = self.input_widget(source_name)
|
||||
if isinstance(widget, DeviceComboBox) and not widget.validate_device(device_name):
|
||||
self._apply_reference_units(source_name, None)
|
||||
|
||||
def _validate_domain_widgets(self) -> None:
|
||||
for widget in self._widgets.values():
|
||||
if isinstance(widget, DeviceComboBox):
|
||||
device = widget.currentText().strip()
|
||||
if not device:
|
||||
raise ValueError("Device is required.")
|
||||
if not widget.is_valid_input:
|
||||
raise ValueError(f"Device '{device}' is not available.")
|
||||
if isinstance(widget, SignalComboBox):
|
||||
signal = widget.get_signal_name().strip()
|
||||
if signal and not widget.is_valid_input:
|
||||
raise ValueError(f"Signal '{signal}' is not available.")
|
||||
|
||||
def _read_widget_value(self, name: str) -> Any:
|
||||
widget = self._widgets[name]
|
||||
info = self._model.model_fields[name]
|
||||
if isinstance(widget, OptionalValueWidget):
|
||||
return widget.value()
|
||||
if isinstance(widget, QLineEdit):
|
||||
value = WidgetIO.get_value(widget)
|
||||
return None if NoneType in get_args(info.annotation) and value == "" else value
|
||||
if isinstance(widget, QComboBox) and get_origin(info.annotation) is Literal:
|
||||
return WidgetIO.get_value(widget, as_string=True)
|
||||
return WidgetIO.get_value(widget)
|
||||
|
||||
def _set_widget_value(self, name: str, value: Any) -> None:
|
||||
widget = self._widgets[name]
|
||||
if isinstance(widget, OptionalValueWidget):
|
||||
widget.set_value(value)
|
||||
return
|
||||
if value is None:
|
||||
if isinstance(widget, QLineEdit):
|
||||
value = ""
|
||||
elif isinstance(widget, QCheckBox):
|
||||
value = False
|
||||
elif isinstance(widget, (QSpinBox, QDoubleSpinBox)):
|
||||
value = 0
|
||||
WidgetIO.set_value(widget, value)
|
||||
|
||||
def _model_has_device_field(self) -> bool:
|
||||
for field in self._model.model_fields.values():
|
||||
annotation = field.annotation
|
||||
args = get_args(annotation)
|
||||
has_device = (
|
||||
isinstance(annotation, type)
|
||||
and issubclass(annotation, DeviceBase)
|
||||
or any(isinstance(arg, type) and issubclass(arg, DeviceBase) for arg in args)
|
||||
)
|
||||
has_signal = (
|
||||
isinstance(annotation, type)
|
||||
and issubclass(annotation, Signal)
|
||||
or any(isinstance(arg, type) and issubclass(arg, Signal) for arg in args)
|
||||
)
|
||||
if has_device and not has_signal:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
import json
|
||||
import sys
|
||||
|
||||
from bec_lib.scan_args import ScanArgument
|
||||
from pydantic import Field
|
||||
from qtpy.QtWidgets import QApplication, QLabel, QPushButton, QTabWidget, QTextEdit, QVBoxLayout
|
||||
|
||||
from bec_widgets.utils.colors import apply_theme
|
||||
|
||||
class BasicScanConfig(BaseModel):
|
||||
"""Plain Pydantic fields without GUI metadata."""
|
||||
|
||||
sample_name: str
|
||||
enabled: bool = True
|
||||
repeats: int = 3
|
||||
|
||||
class LimitConfig(BaseModel):
|
||||
"""Normal Pydantic Field metadata."""
|
||||
|
||||
mode: Literal["monitor", "scan", "calibration"] = "scan"
|
||||
low_limit: (
|
||||
float | None
|
||||
) # example of the field without additional metadata, still works in form
|
||||
high_limit: float | None = Field(
|
||||
default=10.0,
|
||||
title="High limit",
|
||||
description="Optional upper allowed value.",
|
||||
json_schema_extra={"precision": 4},
|
||||
)
|
||||
tolerance: float = Field(
|
||||
default=0.1,
|
||||
title="Tolerance",
|
||||
description="Warning tolerance around configured limits.",
|
||||
json_schema_extra={"precision": 4},
|
||||
)
|
||||
|
||||
class ScanArgumentConfig(BaseModel):
|
||||
"""ScanArgument metadata applied through Field extras."""
|
||||
|
||||
settling_time: float = Field(
|
||||
default=0.0,
|
||||
**ScanArgument(
|
||||
display_name="Settling time",
|
||||
description="Time to wait after moving.",
|
||||
units="s",
|
||||
precision=3,
|
||||
ge=0,
|
||||
).model_dump(),
|
||||
)
|
||||
frames: int = Field(
|
||||
default=1,
|
||||
**ScanArgument(
|
||||
display_name="Frames", description="Number of frames per trigger.", ge=1
|
||||
).model_dump(),
|
||||
)
|
||||
|
||||
class DeviceSignalLimitsConfig(BaseModel):
|
||||
"""Device, signal, and numeric fields whose units follow the selected device."""
|
||||
|
||||
model_config = {"arbitrary_types_allowed": True}
|
||||
|
||||
device: DeviceBase | str = Field(
|
||||
default="",
|
||||
**ScanArgument(display_name="Device", description="Positioner device.").model_dump(),
|
||||
)
|
||||
signal: Signal | str | None = Field(
|
||||
default=None,
|
||||
**ScanArgument(display_name="Signal", description="Device signal.").model_dump(),
|
||||
)
|
||||
low_limit: float | None = Field(
|
||||
default=None,
|
||||
**ScanArgument(
|
||||
display_name="Low limit",
|
||||
description="Optional lower limit.",
|
||||
reference_units="device",
|
||||
precision=4,
|
||||
).model_dump(),
|
||||
)
|
||||
high_limit: float | None = Field(
|
||||
default=None,
|
||||
**ScanArgument(
|
||||
display_name="High limit",
|
||||
description="Optional upper limit.",
|
||||
reference_units="device",
|
||||
precision=4,
|
||||
).model_dump(),
|
||||
)
|
||||
|
||||
class DisplayConfig(BaseModel):
|
||||
title: str | None = Field(
|
||||
default=None, title="Title", description="Optional display title."
|
||||
)
|
||||
show_grid: bool = Field(default=True, title="Show grid")
|
||||
refresh_interval: int = Field(
|
||||
default=1000, title="Refresh interval", description="Refresh interval in milliseconds."
|
||||
)
|
||||
|
||||
class DeviceAndSignalConfig(BaseModel):
|
||||
model_config = {"arbitrary_types_allowed": True}
|
||||
|
||||
title: str | None = Field(
|
||||
default=None, title="Title", description="Optional display title."
|
||||
)
|
||||
device: DeviceBase | str = Field(
|
||||
default="", title="Device", description="BEC device selection."
|
||||
)
|
||||
signal: Signal | str | None = Field(
|
||||
default=None,
|
||||
title="Signal",
|
||||
description="Signal selection scoped to the selected device.",
|
||||
)
|
||||
refresh_interval: int = Field(
|
||||
default=1000, title="Refresh interval", description="Refresh interval in milliseconds."
|
||||
)
|
||||
|
||||
class DeviceOnlyConfig(BaseModel):
|
||||
model_config = {"arbitrary_types_allowed": True}
|
||||
|
||||
title: str | None = Field(
|
||||
default=None, title="Title", description="Optional display title."
|
||||
)
|
||||
device: DeviceBase | str = Field(
|
||||
default="", title="Device", description="BEC device selection."
|
||||
)
|
||||
refresh_interval: int = Field(
|
||||
default=1000, title="Refresh interval", description="Refresh interval in milliseconds."
|
||||
)
|
||||
|
||||
class SignalOnlyConfig(BaseModel):
|
||||
model_config = {"arbitrary_types_allowed": True}
|
||||
|
||||
title: str | None = Field(
|
||||
default=None, title="Title", description="Optional display title."
|
||||
)
|
||||
signal: Signal | str | None = Field(
|
||||
default=None,
|
||||
title="Signal",
|
||||
description="Global BEC signal selection without a device field.",
|
||||
)
|
||||
refresh_interval: int = Field(
|
||||
default=1000, title="Refresh interval", description="Refresh interval in milliseconds."
|
||||
)
|
||||
|
||||
class ExampleWindow(QWidget):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.setWindowTitle("PydanticWidgetForm example")
|
||||
|
||||
self._tabs = QTabWidget(self)
|
||||
self._output = QTextEdit(self)
|
||||
self._output.setReadOnly(True)
|
||||
self._output.setPlaceholderText("Validated form data appears here.")
|
||||
self._forms: list[PydanticWidgetForm] = []
|
||||
|
||||
self._add_form("Basic", PydanticWidgetForm(BasicScanConfig))
|
||||
self._add_form("Limits", PydanticWidgetForm(LimitConfig))
|
||||
self._add_form("ScanArgument", PydanticWidgetForm(ScanArgumentConfig))
|
||||
self._add_form("Display", PydanticWidgetForm(DisplayConfig))
|
||||
self._add_form("Device + signal", PydanticWidgetForm(DeviceAndSignalConfig))
|
||||
self._add_form("Device limits", PydanticWidgetForm(DeviceSignalLimitsConfig))
|
||||
self._add_form("Device only", PydanticWidgetForm(DeviceOnlyConfig))
|
||||
self._add_form("Signal only", PydanticWidgetForm(SignalOnlyConfig))
|
||||
|
||||
show_data = QPushButton("Show current tab data", self)
|
||||
show_data.clicked.connect(self._show_current_data)
|
||||
|
||||
layout = QVBoxLayout(self)
|
||||
layout.addWidget(QLabel("Generated forms from Pydantic models", self))
|
||||
layout.addWidget(self._tabs)
|
||||
layout.addWidget(show_data)
|
||||
layout.addWidget(self._output)
|
||||
|
||||
def _add_form(self, title: str, form: PydanticWidgetForm) -> None:
|
||||
form.changed.connect(lambda _form=form: self._on_form_changed(_form))
|
||||
self._forms.append(form)
|
||||
self._tabs.addTab(form, title)
|
||||
|
||||
def _show_current_data(self, _checked: bool = False, *, validate: bool = True) -> None:
|
||||
form = self._forms[self._tabs.currentIndex()]
|
||||
if validate:
|
||||
try:
|
||||
data = form.get_data()
|
||||
except (ValidationError, ValueError) as exc:
|
||||
self._output.setPlainText(str(exc))
|
||||
return
|
||||
key = "data"
|
||||
else:
|
||||
data = form.raw_data()
|
||||
key = "raw_data"
|
||||
self._output.setPlainText(
|
||||
json.dumps(
|
||||
{key: data, "dirty_fields": sorted(form.dirty_fields())}, indent=2, default=str
|
||||
)
|
||||
)
|
||||
|
||||
def _on_form_changed(self, form: PydanticWidgetForm) -> None:
|
||||
if form is self._forms[self._tabs.currentIndex()]:
|
||||
self._show_current_data(validate=False)
|
||||
|
||||
app = QApplication(sys.argv)
|
||||
apply_theme("dark")
|
||||
window = ExampleWindow()
|
||||
window.show()
|
||||
sys.exit(app.exec())
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user