1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-05-09 16:22:08 +02:00

Compare commits

..

34 Commits

Author SHA1 Message Date
wakonig_k 7e4e37ad16 fix(web_links): update documentation links in BECWebLinksMixin 2026-05-05 16:09:47 +02:00
semantic-release c1d5069a48 3.8.0
Automatically generated by python-semantic-release
2026-05-01 15:16:03 +00:00
wyzula_j 0b1f0b4c26 fix(dock_area): change to show_dialo=False for CLI profile baseline restore 2026-05-01 17:15:03 +02:00
wyzula_j cc825972c2 fix(dock_area): cli call load_profile has restore_baseline kwarg 2026-05-01 17:15:03 +02:00
wyzula_j 17865a2c33 feat(dock_area): add CLI restore current profile from baseline with optional confirmation dialog 2026-05-01 17:15:03 +02:00
semantic-release 0728811238 3.7.3
Automatically generated by python-semantic-release
2026-05-01 11:33:30 +00:00
wyzula_j 717d74b19e test(dock_area): remove low-value tests 2026-05-01 13:32:46 +02:00
wyzula_j dd32caf6e8 fix(dock_area): profile names changed, default->baseline, user->runtime 2026-05-01 13:32:46 +02:00
semantic-release 603edede9c 3.7.2
Automatically generated by python-semantic-release
2026-04-29 13:13:24 +00:00
copilot-swe-agent[bot] 30ef25533a fix(workspace-actions): use try/finally and restore previous blocked state in refresh_profiles
Agent-Logs-Url: https://github.com/bec-project/bec_widgets/sessions/004cb4bc-5015-485e-a803-1e63876b7024

Co-authored-by: wyzula-jan <133381102+wyzula-jan@users.noreply.github.com>
2026-04-29 15:12:35 +02:00
wyzula_j 73b44cffb2 fix(dock-area): avoid switching profile when saving new profile 2026-04-29 15:12:35 +02:00
wakonig_k a614d662d6 test: fix assertions after updating ophyd devices templates
Co-authored-by: Copilot <copilot@github.com>
2026-04-28 14:45:42 +02:00
wakonig_k 3f1aa80756 chore: update header comments in script files to indicate AI generation 2026-04-24 16:17:44 +02:00
wakonig_k 409c9e5bfa ci: increase threshold to 20 percent 2026-04-22 13:10:15 +02:00
wakonig_k 19b5c8f724 ci: fix benchmark upload 2026-04-22 13:10:15 +02:00
wyzula_j 5056ef8946 test: remove references to "scan_motors" in tests 2026-04-22 08:12:39 +02:00
wakonig_k 551d38d901 build: add pytest-benchmark dependency 2026-04-21 13:58:38 +02:00
wakonig_k 999b7a2321 ci: add benchmark workflow 2026-04-21 13:58:38 +02:00
semantic-release 5dc373bd8e 3.7.1
Automatically generated by python-semantic-release
2026-04-21 11:56:57 +00:00
appel_c 91afc775d5 test: fix exit status and status access in tests 2026-04-21 13:56:12 +02:00
appel_c 55694ff2b9 fix(heatmap): fix access to status from metadata 2026-04-21 13:56:12 +02:00
perl_d 5b68a51aaa tests: skip broken and mark with issue 2026-04-21 12:43:31 +02:00
semantic-release f13fa75e25 3.7.0
Automatically generated by python-semantic-release
2026-04-21 10:42:54 +00:00
wakonig_k 0cf84cd1d8 feat: move companion app to applications 2026-04-21 12:42:08 +02:00
wakonig_k 3e77f54034 refactor: cleanup of imports 2026-04-21 12:42:08 +02:00
semantic-release f7616102d8 3.6.0
Automatically generated by python-semantic-release
2026-04-21 06:39:15 +00:00
perl_d 5a497c3598 fix: small usability changes 2026-04-21 08:38:24 +02:00
perl_d 23e3644619 feat: add button/slot to pause/unpause logs 2026-04-21 08:38:24 +02:00
perl_d a5db2dc340 fix: change resize mode to interactive 2026-04-21 08:38:24 +02:00
perl_d 2e8f43fcac feat: add logpanel to menu 2026-04-21 08:38:24 +02:00
perl_d 09bb1121d8 feat: migrate logpanel to table model/view 2026-04-21 08:38:24 +02:00
semantic-release c9aaa77b3c 3.5.1
Automatically generated by python-semantic-release
2026-04-20 13:06:31 +00:00
perl_d f7a1ee49a4 fix: don't assume attr exists if we timed out waiting for it 2026-04-20 15:05:47 +02:00
perl_d 8e51c1adb6 refactor: don't import real widgets in client 2026-04-19 16:05:56 +02:00
103 changed files with 3137 additions and 1777 deletions
+169
View File
@@ -0,0 +1,169 @@
##########################
### AI-generated file. ###
##########################
"""Aggregate and merge benchmark JSON files.
The workflow runs the same benchmark suite on multiple independent runners.
This script reads every JSON file produced by those attempts, normalizes the
contained benchmark values, and writes a compact mapping JSON where each value is
the median across attempts. It can also merge independent hyperfine JSON files
from one runner into a single hyperfine-style JSON file.
"""
from __future__ import annotations
import argparse
import json
import statistics
from pathlib import Path
from typing import Any
from compare_benchmarks import Benchmark, extract_benchmarks
def collect_benchmarks(paths: list[Path]) -> dict[str, list[Benchmark]]:
"""Collect benchmarks from multiple JSON files.
Args:
paths (list[Path]): Paths to hyperfine, pytest-benchmark, or compact
mapping JSON files.
Returns:
dict[str, list[Benchmark]]: Benchmarks grouped by benchmark name.
"""
collected: dict[str, list[Benchmark]] = {}
for path in paths:
for name, benchmark in extract_benchmarks(path).items():
collected.setdefault(name, []).append(benchmark)
return collected
def aggregate(collected: dict[str, list[Benchmark]]) -> dict[str, dict[str, object]]:
"""Aggregate grouped benchmarks using the median value.
Args:
collected (dict[str, list[Benchmark]]): Benchmarks grouped by benchmark
name.
Returns:
dict[str, dict[str, object]]: Compact mapping JSON data. Each benchmark
contains ``value``, ``unit``, ``metric``, ``attempts``, and
``attempt_values``.
"""
aggregated: dict[str, dict[str, object]] = {}
for name, benchmarks in sorted(collected.items()):
values = [benchmark.value for benchmark in benchmarks]
unit = next((benchmark.unit for benchmark in benchmarks if benchmark.unit), "")
metric = next((benchmark.metric for benchmark in benchmarks if benchmark.metric), "value")
aggregated[name] = {
"value": statistics.median(values),
"unit": unit,
"metric": f"median-of-attempt-{metric}",
"attempts": len(values),
"attempt_values": values,
}
return aggregated
def merge_hyperfine_results(paths: list[Path]) -> dict[str, Any]:
"""Merge hyperfine result files.
Args:
paths (list[Path]): Hyperfine JSON files to merge.
Returns:
dict[str, Any]: Hyperfine-style JSON object containing all result rows.
Raises:
ValueError: If any file has no hyperfine ``results`` list.
"""
merged: dict[str, Any] = {"results": []}
for path in paths:
data = json.loads(path.read_text(encoding="utf-8"))
results = data.get("results", []) if isinstance(data, dict) else None
if not isinstance(results, list):
raise ValueError(f"{path} has no hyperfine results list")
merged["results"].extend(results)
return merged
def main_from_paths(input_dir: Path, output: Path) -> int:
"""Aggregate all JSON files in a directory and write the result.
Args:
input_dir (Path): Directory containing benchmark JSON files.
output (Path): Path where the aggregate JSON should be written.
Returns:
int: Always ``0`` on success.
Raises:
ValueError: If no JSON files are found in ``input_dir``.
"""
paths = sorted(input_dir.rglob("*.json"))
if not paths:
raise ValueError(f"No benchmark JSON files found in {input_dir}")
output.parent.mkdir(parents=True, exist_ok=True)
output.write_text(
json.dumps(aggregate(collect_benchmarks(paths)), indent=2, sort_keys=True) + "\n",
encoding="utf-8",
)
return 0
def merge_from_paths(input_dir: Path, output: Path) -> int:
"""Merge all hyperfine JSON files in a directory and write the result.
Args:
input_dir (Path): Directory containing hyperfine JSON files.
output (Path): Path where the merged JSON should be written.
Returns:
int: Always ``0`` on success.
Raises:
ValueError: If no JSON files are found in ``input_dir``.
"""
paths = sorted(input_dir.glob("*.json"))
if not paths:
raise ValueError(f"No hyperfine JSON files found in {input_dir}")
output.parent.mkdir(parents=True, exist_ok=True)
output.write_text(
json.dumps(merge_hyperfine_results(paths), indent=2, sort_keys=True) + "\n",
encoding="utf-8",
)
return 0
def main() -> int:
"""Run the benchmark aggregation command line interface.
Returns:
int: Always ``0`` on success.
"""
parser = argparse.ArgumentParser()
parser.add_argument(
"--mode",
choices=("aggregate", "merge-hyperfine"),
default="aggregate",
help="Operation to perform.",
)
parser.add_argument("--input-dir", required=True, type=Path)
parser.add_argument("--output", required=True, type=Path)
args = parser.parse_args()
if args.mode == "merge-hyperfine":
return merge_from_paths(input_dir=args.input_dir, output=args.output)
return main_from_paths(input_dir=args.input_dir, output=args.output)
if __name__ == "__main__":
raise SystemExit(main())
+454
View File
@@ -0,0 +1,454 @@
##########################
### AI-generated file. ###
##########################
"""Compare benchmark JSON files and write a GitHub Actions summary.
The script supports JSON emitted by hyperfine, JSON emitted by pytest-benchmark,
and a compact mapping format generated by ``aggregate_benchmarks.py``. Timing
formats prefer median values and fall back to mean values when median values are
not present.
"""
from __future__ import annotations
import argparse
import json
import math
from dataclasses import dataclass
from pathlib import Path
from typing import Any
@dataclass(frozen=True)
class Benchmark:
"""Normalized benchmark result.
Attributes:
name (str): Stable benchmark name used to match baseline and current results.
value (float): Numeric benchmark value used for comparison.
unit (str): Display unit for the value, for example ``"s"``.
metric (str): Source metric name, for example ``"median"`` or ``"mean"``.
"""
name: str
value: float
unit: str
metric: str = "value"
@dataclass(frozen=True)
class Comparison:
"""Comparison between one baseline benchmark and one current benchmark.
Attributes:
name (str): Benchmark name.
baseline (float): Baseline benchmark value.
current (float): Current benchmark value.
delta_percent (float): Percent change from baseline to current.
unit (str): Display unit for both values.
metric (str): Current result metric used for comparison.
regressed (bool): Whether the change exceeds the configured threshold in
the worse direction.
improved (bool): Whether the change exceeds the configured threshold in
the better direction.
"""
name: str
baseline: float
current: float
delta_percent: float
unit: str
metric: str
regressed: bool
improved: bool
def _read_json(path: Path) -> Any:
"""Read JSON data from a file.
Args:
path (Path): Path to the JSON file.
Returns:
Any: Parsed JSON value.
"""
with path.open("r", encoding="utf-8") as stream:
return json.load(stream)
def _as_float(value: Any) -> float | None:
"""Convert a value to a finite float.
Args:
value (Any): Value to convert.
Returns:
float | None: Converted finite float, or ``None`` if conversion fails.
"""
try:
result = float(value)
except (TypeError, ValueError):
return None
if math.isfinite(result):
return result
return None
def _extract_hyperfine(data: dict[str, Any]) -> dict[str, Benchmark]:
"""Extract normalized benchmarks from hyperfine JSON.
Args:
data (dict[str, Any]): Parsed hyperfine JSON object.
Returns:
dict[str, Benchmark]: Benchmarks keyed by command name.
"""
benchmarks: dict[str, Benchmark] = {}
for result in data.get("results", []):
if not isinstance(result, dict):
continue
name = str(result.get("command") or result.get("name") or "").strip()
metric = "median"
value = _as_float(result.get(metric))
if value is None:
metric = "mean"
value = _as_float(result.get(metric))
if name and value is not None:
benchmarks[name] = Benchmark(name=name, value=value, unit="s", metric=metric)
return benchmarks
def _extract_pytest_benchmark(data: dict[str, Any]) -> dict[str, Benchmark]:
"""Extract normalized benchmarks from pytest-benchmark JSON.
Args:
data (dict[str, Any]): Parsed pytest-benchmark JSON object.
Returns:
dict[str, Benchmark]: Benchmarks keyed by full benchmark name.
"""
benchmarks: dict[str, Benchmark] = {}
for benchmark in data.get("benchmarks", []):
if not isinstance(benchmark, dict):
continue
name = str(benchmark.get("fullname") or benchmark.get("name") or "").strip()
stats = benchmark.get("stats", {})
value = None
metric = "median"
if isinstance(stats, dict):
value = _as_float(stats.get(metric))
if value is None:
metric = "mean"
value = _as_float(stats.get(metric))
if name and value is not None:
benchmarks[name] = Benchmark(name=name, value=value, unit="s", metric=metric)
return benchmarks
def _extract_simple_mapping(data: dict[str, Any]) -> dict[str, Benchmark]:
"""Extract normalized benchmarks from a compact mapping JSON object.
Args:
data (dict[str, Any]): Parsed mapping where each benchmark is either a
raw number or an object containing ``value``, ``unit``, and ``metric``.
Returns:
dict[str, Benchmark]: Benchmarks keyed by mapping key.
"""
benchmarks: dict[str, Benchmark] = {}
for name, raw_value in data.items():
if name in {"version", "context", "commit", "timestamp"}:
continue
value = _as_float(raw_value)
unit = ""
metric = "value"
if value is None and isinstance(raw_value, dict):
value = _as_float(raw_value.get("value"))
unit = str(raw_value.get("unit") or "")
metric = str(raw_value.get("metric") or "value")
if value is not None:
benchmarks[str(name)] = Benchmark(name=str(name), value=value, unit=unit, metric=metric)
return benchmarks
def extract_benchmarks(path: Path) -> dict[str, Benchmark]:
"""Extract normalized benchmarks from a supported JSON file.
Args:
path (Path): Path to a hyperfine, pytest-benchmark, or compact mapping
JSON file.
Returns:
dict[str, Benchmark]: Normalized benchmarks keyed by name.
Raises:
ValueError: If the JSON root is not an object or no supported benchmark
entries can be extracted.
"""
data = _read_json(path)
if not isinstance(data, dict):
raise ValueError(f"{path} must contain a JSON object")
extractors = (_extract_hyperfine, _extract_pytest_benchmark, _extract_simple_mapping)
for extractor in extractors:
benchmarks = extractor(data)
if benchmarks:
return benchmarks
raise ValueError(f"No supported benchmark entries found in {path}")
def compare_benchmarks(
baseline: dict[str, Benchmark],
current: dict[str, Benchmark],
threshold_percent: float,
higher_is_better: bool,
) -> tuple[list[Comparison], list[str], list[str]]:
"""Compare baseline benchmarks with current benchmarks.
Args:
baseline (dict[str, Benchmark]): Baseline benchmarks keyed by name.
current (dict[str, Benchmark]): Current benchmarks keyed by name.
threshold_percent (float): Regression threshold in percent.
higher_is_better (bool): If ``True``, lower current values are treated as
regressions. If ``False``, higher current values are treated as
regressions.
Returns:
tuple[list[Comparison], list[str], list[str]]: Comparisons for common
benchmark names, names missing from current results, and names newly
present in current results.
"""
comparisons: list[Comparison] = []
missing_in_current: list[str] = []
new_in_current: list[str] = []
for name, baseline_benchmark in sorted(baseline.items()):
current_benchmark = current.get(name)
if current_benchmark is None:
missing_in_current.append(name)
continue
if baseline_benchmark.value == 0:
delta_percent = 0.0
else:
delta_percent = (
(current_benchmark.value - baseline_benchmark.value)
/ abs(baseline_benchmark.value)
* 100
)
if higher_is_better:
regressed = delta_percent <= -threshold_percent
improved = delta_percent >= threshold_percent
else:
regressed = delta_percent >= threshold_percent
improved = delta_percent <= -threshold_percent
comparisons.append(
Comparison(
name=name,
baseline=baseline_benchmark.value,
current=current_benchmark.value,
delta_percent=delta_percent,
unit=current_benchmark.unit or baseline_benchmark.unit,
metric=current_benchmark.metric,
regressed=regressed,
improved=improved,
)
)
for name in sorted(set(current) - set(baseline)):
new_in_current.append(name)
return comparisons, missing_in_current, new_in_current
def _format_value(value: float, unit: str) -> str:
"""Format a benchmark value for Markdown output.
Args:
value (float): Numeric benchmark value.
unit (str): Display unit.
Returns:
str: Formatted value with optional unit suffix.
"""
suffix = f" {unit}" if unit else ""
return f"{value:.6g}{suffix}"
def _format_status(comparison: Comparison) -> str:
"""Format a comparison status for Markdown output."""
if comparison.regressed:
return ":red_circle: regressed"
if comparison.improved:
return ":green_circle: improved"
return "ok"
def write_summary(
path: Path,
comparisons: list[Comparison],
missing_in_current: list[str],
new_in_current: list[str],
threshold_percent: float,
higher_is_better: bool,
) -> None:
"""Write a Markdown benchmark comparison summary.
Args:
path (Path): Path where the summary should be written.
comparisons (list[Comparison]): Comparison rows for matching benchmarks.
missing_in_current (list[str]): Baseline benchmark names missing from the
current result.
new_in_current (list[str]): Current benchmark names not present in the
baseline result.
threshold_percent (float): Regression threshold in percent.
higher_is_better (bool): Whether higher benchmark values are considered
better.
"""
regressions = [comparison for comparison in comparisons if comparison.regressed]
improvements = [comparison for comparison in comparisons if comparison.improved]
direction = "higher is better" if higher_is_better else "lower is better"
sorted_comparisons = sorted(comparisons, key=lambda comparison: comparison.name)
lines = [
"<!-- bw-benchmark-comment -->",
"## Benchmark comparison",
"",
f"Threshold: {threshold_percent:g}% ({direction}).",
f"Result: {len(regressions)} regression(s), {len(improvements)} improvement(s) beyond threshold.",
]
lines.append("")
if regressions:
lines.extend(
[
f"{len(regressions)} benchmark(s) regressed beyond the configured threshold.",
"",
"| Benchmark | Baseline | Current | Change |",
"| --- | ---: | ---: | ---: |",
]
)
for comparison in regressions:
lines.append(
"| "
f"{comparison.name} | "
f"{_format_value(comparison.baseline, comparison.unit)} | "
f"{_format_value(comparison.current, comparison.unit)} | "
f"{comparison.delta_percent:+.2f}% |"
)
else:
lines.append("No benchmark regression exceeded the configured threshold.")
lines.append("")
if improvements:
lines.extend(
[
f"{len(improvements)} benchmark(s) improved beyond the configured threshold.",
"",
"| Benchmark | Baseline | Current | Change |",
"| --- | ---: | ---: | ---: |",
]
)
for comparison in improvements:
lines.append(
"| "
f"{comparison.name} | "
f"{_format_value(comparison.baseline, comparison.unit)} | "
f"{_format_value(comparison.current, comparison.unit)} | "
f"{comparison.delta_percent:+.2f}% |"
)
else:
lines.append("No benchmark improvement exceeded the configured threshold.")
if sorted_comparisons:
lines.extend(
[
"",
"<details>",
"<summary>All benchmark results</summary>",
"",
"| Benchmark | Baseline | Current | Change | Status |",
"| --- | ---: | ---: | ---: | --- |",
]
)
for comparison in sorted_comparisons:
lines.append(
"| "
f"{comparison.name} | "
f"{_format_value(comparison.baseline, comparison.unit)} | "
f"{_format_value(comparison.current, comparison.unit)} | "
f"{comparison.delta_percent:+.2f}% | "
f"{_format_status(comparison)} |"
)
lines.extend(["", "</details>"])
if missing_in_current:
lines.extend(["", "Missing benchmarks in the current run:"])
lines.extend(f"- `{name}`" for name in missing_in_current)
if new_in_current:
lines.extend(["", "New benchmarks in the current run:"])
lines.extend(f"- `{name}`" for name in new_in_current)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text("\n".join(lines) + "\n", encoding="utf-8")
def main() -> int:
"""Run the benchmark comparison command line interface.
Returns:
int: ``1`` when a regression exceeds the threshold, otherwise ``0``.
"""
parser = argparse.ArgumentParser()
parser.add_argument("--baseline", required=True, type=Path)
parser.add_argument("--current", required=True, type=Path)
parser.add_argument("--summary", required=True, type=Path)
parser.add_argument("--threshold-percent", required=True, type=float)
parser.add_argument("--higher-is-better", action="store_true")
args = parser.parse_args()
baseline = extract_benchmarks(args.baseline)
current = extract_benchmarks(args.current)
comparisons, missing_in_current, new_in_current = compare_benchmarks(
baseline=baseline,
current=current,
threshold_percent=args.threshold_percent,
higher_is_better=args.higher_is_better,
)
write_summary(
path=args.summary,
comparisons=comparisons,
missing_in_current=missing_in_current,
new_in_current=new_in_current,
threshold_percent=args.threshold_percent,
higher_is_better=args.higher_is_better,
)
return 1 if any(comparison.regressed for comparison in comparisons) else 0
if __name__ == "__main__":
raise SystemExit(main())
+74
View File
@@ -0,0 +1,74 @@
#!/usr/bin/env bash
##########################
### AI-generated file. ###
##########################
set -euo pipefail
mkdir -p benchmark-results
benchmark_json="${BENCHMARK_JSON:-benchmark-results/current.json}"
benchmark_root="$(dirname "$benchmark_json")"
hyperfine_benchmark_dir="${BENCHMARK_HYPERFINE_DIR:-tests/benchmarks/hyperfine}"
pytest_benchmark_dirs="${BENCHMARK_PYTEST_DIRS:-${BENCHMARK_PYTEST_DIR:-}}"
benchmark_work_dir="$benchmark_root/raw-results"
hyperfine_json_dir="$benchmark_work_dir/hyperfine"
pytest_json="$benchmark_work_dir/pytest.json"
shopt -s nullglob
benchmark_scripts=()
benchmark_scripts=("$hyperfine_benchmark_dir"/benchmark_*.sh)
shopt -u nullglob
pytest_dirs=()
for pytest_benchmark_dir in $pytest_benchmark_dirs; do
if [ -d "$pytest_benchmark_dir" ]; then
pytest_dirs+=("$pytest_benchmark_dir")
else
echo "Pytest benchmark directory not found: $pytest_benchmark_dir" >&2
exit 1
fi
done
if [ "${#benchmark_scripts[@]}" -eq 0 ] && [ "${#pytest_dirs[@]}" -eq 0 ]; then
echo "No benchmark scripts or pytest benchmarks found" >&2
exit 1
fi
echo "Benchmark Python: $(command -v python)"
python -c 'import sys; print(sys.version)'
rm -rf "$benchmark_work_dir"
mkdir -p "$hyperfine_json_dir"
if [ "${#benchmark_scripts[@]}" -gt 0 ]; then
for benchmark_script in "${benchmark_scripts[@]}"; do
title="$(sed -n 's/^# BENCHMARK_TITLE:[[:space:]]*//p' "$benchmark_script" | head -n 1)"
if [ -z "$title" ]; then
title="$(basename "$benchmark_script" .sh)"
fi
benchmark_name="$(basename "$benchmark_script" .sh)"
benchmark_result_json="$hyperfine_json_dir/$benchmark_name.json"
echo "Preflight benchmark script: $benchmark_script"
bash "$benchmark_script"
hyperfine \
--show-output \
--warmup 1 \
--runs 5 \
--command-name "$title" \
--export-json "$benchmark_result_json" \
"bash $(printf "%q" "$benchmark_script")"
done
fi
if [ "${#pytest_dirs[@]}" -gt 0 ]; then
pytest \
-q "${pytest_dirs[@]}" \
--benchmark-only \
--benchmark-json "$pytest_json"
fi
python .github/scripts/aggregate_benchmarks.py \
--input-dir "$benchmark_work_dir" \
--output "$benchmark_json"
+125
View File
@@ -0,0 +1,125 @@
##########################
### AI-generated file. ###
##########################
"""Run a command with BEC e2e services available."""
from __future__ import annotations
import argparse
import os
import shutil
import subprocess
import tempfile
import time
from pathlib import Path
import bec_lib
from bec_ipython_client import BECIPythonClient
from bec_lib.redis_connector import RedisConnector
from bec_lib.service_config import ServiceConfig, ServiceConfigModel
from redis import Redis
def _wait_for_redis(host: str, port: int) -> None:
client = Redis(host=host, port=port)
deadline = time.monotonic() + 10
while time.monotonic() < deadline:
try:
if client.ping():
return
except Exception:
time.sleep(0.1)
raise RuntimeError(f"Redis did not start on {host}:{port}")
def _start_redis(files_path: Path, host: str, port: int) -> subprocess.Popen:
redis_server = shutil.which("redis-server")
if redis_server is None:
raise RuntimeError("redis-server executable not found")
return subprocess.Popen(
[
redis_server,
"--bind",
host,
"--port",
str(port),
"--save",
"",
"--appendonly",
"no",
"--dir",
str(files_path),
]
)
def _write_configs(files_path: Path, host: str, port: int) -> Path:
test_config = files_path / "test_config.yaml"
services_config = files_path / "services_config.yaml"
bec_lib_path = Path(bec_lib.__file__).resolve().parent
shutil.copyfile(bec_lib_path / "tests" / "test_config.yaml", test_config)
service_config = ServiceConfigModel(
redis={"host": host, "port": port}, file_writer={"base_path": str(files_path)}
)
services_config.write_text(service_config.model_dump_json(indent=4), encoding="utf-8")
return services_config
def _load_demo_config(services_config: Path) -> None:
bec = BECIPythonClient(ServiceConfig(services_config), RedisConnector, forced=True)
bec.start()
try:
bec.config.load_demo_config()
finally:
bec.shutdown()
bec._client._reset_singleton()
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("command", nargs=argparse.REMAINDER)
args = parser.parse_args()
if args.command[:1] == ["--"]:
args.command = args.command[1:]
if not args.command:
raise ValueError("No command provided")
host = "127.0.0.1"
port = 6379
with tempfile.TemporaryDirectory(prefix="bec-benchmark-") as tmp:
files_path = Path(tmp)
services_config = _write_configs(files_path, host, port)
redis_process = _start_redis(files_path, host, port)
processes = None
service_handler = None
try:
_wait_for_redis(host, port)
from bec_server.bec_server_utils.service_handler import ServiceHandler
service_handler = ServiceHandler(
bec_path=files_path, config_path=services_config, interface="subprocess"
)
processes = service_handler.start()
_load_demo_config(services_config)
env = os.environ.copy()
return subprocess.run(args.command, env=env, check=False).returncode
finally:
if service_handler is not None and processes is not None:
service_handler.stop(processes)
redis_process.terminate()
try:
redis_process.wait(timeout=10)
except subprocess.TimeoutExpired:
redis_process.kill()
if __name__ == "__main__":
raise SystemExit(main())
+242
View File
@@ -0,0 +1,242 @@
name: BW Benchmarks
on: [ workflow_call ]
permissions:
contents: read
env:
BENCHMARK_JSON: benchmark-results/current.json
BENCHMARK_BASELINE_JSON: gh-pages-benchmark-data/benchmarks/latest.json
BENCHMARK_SUMMARY: benchmark-results/summary.md
BENCHMARK_COMMAND: "bash .github/scripts/run_benchmarks.sh"
BENCHMARK_THRESHOLD_PERCENT: 20
BENCHMARK_HIGHER_IS_BETTER: false
jobs:
benchmark_attempt:
runs-on: ubuntu-latest
continue-on-error: true
permissions:
contents: read
defaults:
run:
shell: bash -el {0}
strategy:
fail-fast: false
matrix:
attempt: [ 1, 2, 3 ]
env:
BENCHMARK_JSON: benchmark-results/current-${{ matrix.attempt }}.json
BEC_CORE_BRANCH: main
OPHYD_DEVICES_BRANCH: main
PLUGIN_REPO_BRANCH: main
BENCHMARK_PYTEST_DIRS: tests/unit_tests/benchmarks
QTWEBENGINE_DISABLE_SANDBOX: 1
QT_QPA_PLATFORM: "offscreen"
steps:
- name: Checkout BEC Widgets
uses: actions/checkout@v4
with:
repository: bec-project/bec_widgets
ref: ${{ github.event.pull_request.head.sha || github.sha }}
- name: Set up Conda
uses: conda-incubator/setup-miniconda@v3
with:
auto-update-conda: true
auto-activate-base: true
python-version: "3.11"
- name: Install system dependencies
run: |
sudo apt-get update
sudo apt-get install -y libgl1 libegl1 x11-utils libxkbcommon-x11-0 libdbus-1-3 xvfb
sudo apt-get -y install libnss3 libxdamage1 libasound2t64 libatomic1 libxcursor1
sudo apt-get -y install ttyd hyperfine redis-server
- name: Install full e2e environment
run: |
echo -e "\033[35;1m Using branch $BEC_CORE_BRANCH of BEC CORE \033[0;m";
git clone --branch "$BEC_CORE_BRANCH" https://github.com/bec-project/bec.git
echo -e "\033[35;1m Using branch $OPHYD_DEVICES_BRANCH of OPHYD_DEVICES \033[0;m";
git clone --branch "$OPHYD_DEVICES_BRANCH" https://github.com/bec-project/ophyd_devices.git
export OHPYD_DEVICES_PATH=$PWD/ophyd_devices
echo -e "\033[35;1m Using branch $PLUGIN_REPO_BRANCH of bec_testing_plugin \033[0;m";
git clone --branch "$PLUGIN_REPO_BRANCH" https://github.com/bec-project/bec_testing_plugin.git
cd ./bec
conda create -q -n test-environment python=3.11
conda activate test-environment
source ./bin/install_bec_dev.sh -t
cd ../
python -m pip install -e ./ophyd_devices -e .[dev,pyside6] -e ./bec_testing_plugin pytest-benchmark
mkdir -p "$(dirname "$BENCHMARK_JSON")"
python .github/scripts/run_with_bec_servers.py -- bash -lc "$BENCHMARK_COMMAND"
test -s "$BENCHMARK_JSON"
- name: Upload benchmark artifact
uses: actions/upload-artifact@v4
with:
name: bw-benchmark-json-${{ matrix.attempt }}
path: ${{ env.BENCHMARK_JSON }}
benchmark:
needs: [ benchmark_attempt ]
runs-on: ubuntu-latest
permissions:
contents: read
issues: write
pull-requests: write
steps:
- name: Checkout BEC Widgets
uses: actions/checkout@v4
with:
repository: bec-project/bec_widgets
ref: ${{ github.event.pull_request.head.sha || github.sha }}
- name: Download benchmark attempts
uses: actions/download-artifact@v4
with:
pattern: bw-benchmark-json-*
path: benchmark-results/attempts
merge-multiple: true
- name: Aggregate benchmark attempts
run: |
python .github/scripts/aggregate_benchmarks.py \
--input-dir benchmark-results/attempts \
--output "$BENCHMARK_JSON"
- name: Upload aggregate benchmark artifact
uses: actions/upload-artifact@v4
with:
name: bw-benchmark-json
path: ${{ env.BENCHMARK_JSON }}
- name: Fetch gh-pages benchmark data
run: |
if git ls-remote --exit-code --heads origin gh-pages; then
git clone --depth=1 --branch gh-pages "$GITHUB_SERVER_URL/$GITHUB_REPOSITORY.git" gh-pages-benchmark-data
else
mkdir -p gh-pages-benchmark-data
fi
- name: Compare with latest gh-pages benchmark
id: compare
continue-on-error: true
run: |
if [ ! -s "$BENCHMARK_BASELINE_JSON" ]; then
mkdir -p "$(dirname "$BENCHMARK_SUMMARY")"
{
echo "<!-- bw-benchmark-comment -->"
echo "## Benchmark comparison"
echo
echo "No benchmark baseline was found on gh-pages."
} > "$BENCHMARK_SUMMARY"
exit 0
fi
args=(
--baseline "$BENCHMARK_BASELINE_JSON"
--current "$BENCHMARK_JSON"
--summary "$BENCHMARK_SUMMARY"
--threshold-percent "$BENCHMARK_THRESHOLD_PERCENT"
)
if [ "$BENCHMARK_HIGHER_IS_BETTER" = "true" ]; then
args+=(--higher-is-better)
fi
set +e
python .github/scripts/compare_benchmarks.py "${args[@]}"
status=$?
set -e
if [ ! -s "$BENCHMARK_SUMMARY" ]; then
mkdir -p "$(dirname "$BENCHMARK_SUMMARY")"
{
echo "<!-- bw-benchmark-comment -->"
echo "## Benchmark comparison"
echo
echo "Benchmark comparison failed before writing a summary."
} > "$BENCHMARK_SUMMARY"
fi
exit "$status"
- name: Find existing benchmark PR comment
if: github.event_name == 'pull_request'
id: fc
uses: peter-evans/find-comment@v3
with:
issue-number: ${{ github.event.pull_request.number }}
comment-author: github-actions[bot]
body-includes: "<!-- bw-benchmark-comment -->"
- name: Create or update benchmark PR comment
if: github.event_name == 'pull_request'
uses: peter-evans/create-or-update-comment@v5
with:
issue-number: ${{ github.event.pull_request.number }}
comment-id: ${{ steps.fc.outputs.comment-id }}
body-path: ${{ env.BENCHMARK_SUMMARY }}
edit-mode: replace
- name: Fail on benchmark regression
if: github.event_name == 'pull_request' && steps.compare.outcome == 'failure'
run: exit 1
publish:
needs: [ benchmark ]
if: github.event_name == 'push' && github.ref == 'refs/heads/main'
runs-on: ubuntu-latest
permissions:
contents: write
steps:
- name: Checkout BEC Widgets
uses: actions/checkout@v4
with:
repository: bec-project/bec_widgets
ref: ${{ github.sha }}
- name: Download aggregate benchmark artifact
uses: actions/download-artifact@v4
with:
name: bw-benchmark-json
path: benchmark-results
- name: Verify aggregate benchmark artifact
run: test -s "$BENCHMARK_JSON"
- name: Prepare gh-pages for publishing
run: |
# Clean up any existing worktree/directory
if [ -d gh-pages-benchmark-data ]; then
git worktree remove gh-pages-benchmark-data --force || rm -rf gh-pages-benchmark-data
fi
if git ls-remote --exit-code --heads origin gh-pages; then
git fetch --depth=1 origin gh-pages
git worktree add gh-pages-benchmark-data FETCH_HEAD
else
git worktree add --detach gh-pages-benchmark-data
git -C gh-pages-benchmark-data checkout --orphan gh-pages
git -C gh-pages-benchmark-data rm -rf .
fi
- name: Publish benchmark data to gh-pages
working-directory: gh-pages-benchmark-data
run: |
mkdir -p benchmarks/history
cp "../$BENCHMARK_JSON" benchmarks/latest.json
cp "../$BENCHMARK_JSON" "benchmarks/history/${GITHUB_SHA}.json"
git config user.name "github-actions[bot]"
git config user.email "41898282+github-actions[bot]@users.noreply.github.com"
git add benchmarks/latest.json "benchmarks/history/${GITHUB_SHA}.json"
git commit -m "Update BW benchmark data for ${GITHUB_SHA}" || exit 0
git push origin HEAD:gh-pages
+17 -7
View File
@@ -1,19 +1,19 @@
name: Full CI
on:
on:
push:
pull_request:
workflow_dispatch:
inputs:
BEC_WIDGETS_BRANCH:
description: 'Branch of BEC Widgets to install'
description: "Branch of BEC Widgets to install"
required: false
type: string
BEC_CORE_BRANCH:
description: 'Branch of BEC Core to install'
description: "Branch of BEC Core to install"
required: false
type: string
OPHYD_DEVICES_BRANCH:
description: 'Branch of Ophyd Devices to install'
description: "Branch of Ophyd Devices to install"
required: false
type: string
@@ -23,6 +23,7 @@ concurrency:
permissions:
pull-requests: write
contents: read
jobs:
check_pr_status:
@@ -33,6 +34,15 @@ jobs:
if: needs.check_pr_status.outputs.branch-pr == ''
uses: ./.github/workflows/formatter.yml
benchmark:
needs: [check_pr_status]
if: needs.check_pr_status.outputs.branch-pr == ''
permissions:
contents: write
issues: write
pull-requests: write
uses: ./.github/workflows/benchmark.yml
unit-test:
needs: [check_pr_status, formatter]
if: needs.check_pr_status.outputs.branch-pr == ''
@@ -69,9 +79,9 @@ jobs:
uses: ./.github/workflows/child_repos.yml
with:
BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main'}}
OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main'}}
BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha }}
plugin_repos:
needs: [check_pr_status, formatter]
if: needs.check_pr_status.outputs.branch-pr == ''
@@ -81,4 +91,4 @@ jobs:
BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha }}
secrets:
GH_READ_TOKEN: ${{ secrets.GH_READ_TOKEN }}
GH_READ_TOKEN: ${{ secrets.GH_READ_TOKEN }}
+12 -13
View File
@@ -1,25 +1,25 @@
name: Run Pytest with different Python versions
on:
on:
workflow_call:
inputs:
pr_number:
description: 'Pull request number'
description: "Pull request number"
required: false
type: number
BEC_CORE_BRANCH:
description: 'Branch of BEC Core to install'
description: "Branch of BEC Core to install"
required: false
default: 'main'
default: "main"
type: string
OPHYD_DEVICES_BRANCH:
description: 'Branch of Ophyd Devices to install'
description: "Branch of Ophyd Devices to install"
required: false
default: 'main'
default: "main"
type: string
BEC_WIDGETS_BRANCH:
description: 'Branch of BEC Widgets to install'
description: "Branch of BEC Widgets to install"
required: false
default: 'main'
default: "main"
type: string
jobs:
@@ -30,15 +30,14 @@ jobs:
python-version: ["3.11", "3.12", "3.13"]
env:
BEC_WIDGETS_BRANCH: main # Set the branch you want for bec_widgets
BEC_CORE_BRANCH: main # Set the branch you want for bec
OPHYD_DEVICES_BRANCH: main # Set the branch you want for ophyd_devices
BEC_WIDGETS_BRANCH: main # Set the branch you want for bec_widgets
BEC_CORE_BRANCH: main # Set the branch you want for bec
OPHYD_DEVICES_BRANCH: main # Set the branch you want for ophyd_devices
PROJECT_PATH: ${{ github.repository }}
QTWEBENGINE_DISABLE_SANDBOX: 1
QT_QPA_PLATFORM: "offscreen"
steps:
- name: Checkout BEC Widgets
uses: actions/checkout@v4
with:
@@ -56,4 +55,4 @@ jobs:
- name: Run Pytest
run: |
pip install pytest pytest-random-order
pytest -v --junitxml=report.xml --random-order ./tests/unit_tests
pytest -v --junitxml=report.xml --random-order --ignore=tests/unit_tests/benchmarks ./tests/unit_tests
+10 -12
View File
@@ -1,32 +1,30 @@
name: Run Pytest with Coverage
on:
on:
workflow_call:
inputs:
pr_number:
description: 'Pull request number'
description: "Pull request number"
required: false
type: number
BEC_CORE_BRANCH:
description: 'Branch of BEC Core to install'
description: "Branch of BEC Core to install"
required: false
default: 'main'
default: "main"
type: string
OPHYD_DEVICES_BRANCH:
description: 'Branch of Ophyd Devices to install'
description: "Branch of Ophyd Devices to install"
required: false
default: 'main'
default: "main"
type: string
BEC_WIDGETS_BRANCH:
description: 'Branch of BEC Widgets to install'
description: "Branch of BEC Widgets to install"
required: false
default: 'main'
default: "main"
type: string
secrets:
CODECOV_TOKEN:
required: true
permissions:
pull-requests: write
@@ -55,7 +53,7 @@ jobs:
- name: Run Pytest with Coverage
id: coverage
run: pytest --random-order --cov=bec_widgets --cov-config=pyproject.toml --cov-branch --cov-report=xml --no-cov-on-fail tests/unit_tests/
run: pytest --random-order --cov=bec_widgets --cov-config=pyproject.toml --cov-branch --cov-report=xml --no-cov-on-fail --ignore=tests/unit_tests/benchmarks tests/unit_tests/
- name: Upload test artifacts
uses: actions/upload-artifact@v4
@@ -69,4 +67,4 @@ jobs:
uses: codecov/codecov-action@v5
with:
token: ${{ secrets.CODECOV_TOKEN }}
slug: bec-project/bec_widgets
slug: bec-project/bec_widgets
+137
View File
@@ -1,6 +1,143 @@
# CHANGELOG
## v3.8.0 (2026-05-01)
### Bug Fixes
- **dock_area**: Change to show_dialo=False for CLI profile baseline restore
([`0b1f0b4`](https://github.com/bec-project/bec_widgets/commit/0b1f0b4c262ff31469b7114b9f00bf0a7b85e8f2))
- **dock_area**: Cli call load_profile has restore_baseline kwarg
([`cc82597`](https://github.com/bec-project/bec_widgets/commit/cc825972c202cd9ded32f8b2d1ce5f822c2ebdba))
### Features
- **dock_area**: Add CLI restore current profile from baseline with optional confirmation dialog
([`17865a2`](https://github.com/bec-project/bec_widgets/commit/17865a2c338a4a1f944659dde4ec05c25a8dd963))
## v3.7.3 (2026-05-01)
### Bug Fixes
- **dock_area**: Profile names changed, default->baseline, user->runtime
([`dd32caf`](https://github.com/bec-project/bec_widgets/commit/dd32caf6e815fd1922b6aae84d00decad9dbf869))
### Testing
- **dock_area**: Remove low-value tests
([`717d74b`](https://github.com/bec-project/bec_widgets/commit/717d74b19e8c6960209190c47ba32732ffaa0094))
## v3.7.2 (2026-04-29)
### Bug Fixes
- **dock-area**: Avoid switching profile when saving new profile
([`73b44cf`](https://github.com/bec-project/bec_widgets/commit/73b44cffb219347cacb609f3b93068eda6701b42))
- **workspace-actions**: Use try/finally and restore previous blocked state in refresh_profiles
([`30ef255`](https://github.com/bec-project/bec_widgets/commit/30ef25533af9df5a9bd9e69808dc49fdf22f4318))
Agent-Logs-Url:
https://github.com/bec-project/bec_widgets/sessions/004cb4bc-5015-485e-a803-1e63876b7024
Co-authored-by: wyzula-jan <133381102+wyzula-jan@users.noreply.github.com>
### Build System
- Add pytest-benchmark dependency
([`551d38d`](https://github.com/bec-project/bec_widgets/commit/551d38d90111361e64bfcf10a1409be71dd298bc))
### Chores
- Update header comments in script files to indicate AI generation
([`3f1aa80`](https://github.com/bec-project/bec_widgets/commit/3f1aa80756368454c3631cb6cf9d29db28af177a))
### Continuous Integration
- Add benchmark workflow
([`999b7a2`](https://github.com/bec-project/bec_widgets/commit/999b7a2321f2f222c04b056a2db4280f66de9c48))
- Fix benchmark upload
([`19b5c8f`](https://github.com/bec-project/bec_widgets/commit/19b5c8f724dbdb7421957c290f9213bf072392df))
- Increase threshold to 20 percent
([`409c9e5`](https://github.com/bec-project/bec_widgets/commit/409c9e5bfacdfc003f1bc8c9944f01798bd3818e))
### Testing
- Fix assertions after updating ophyd devices templates
([`a614d66`](https://github.com/bec-project/bec_widgets/commit/a614d662d6da716a49afb4ed3a3903f108210386))
Co-authored-by: Copilot <copilot@github.com>
- Remove references to "scan_motors" in tests
([`5056ef8`](https://github.com/bec-project/bec_widgets/commit/5056ef8946d03a20e802709d3fe81c84c195fe41))
## v3.7.1 (2026-04-21)
### Bug Fixes
- **heatmap**: Fix access to status from metadata
([`55694ff`](https://github.com/bec-project/bec_widgets/commit/55694ff2b96581e03c63c8b8e068e2db79bcf780))
### Testing
- Fix exit status and status access in tests
([`91afc77`](https://github.com/bec-project/bec_widgets/commit/91afc775d59b4ba31bed3585847a67c301acf9b0))
## v3.7.0 (2026-04-21)
### Features
- Move companion app to applications
([`0cf84cd`](https://github.com/bec-project/bec_widgets/commit/0cf84cd1d839ac4a39ffb5fb9ba57d432e04348a))
### Refactoring
- Cleanup of imports
([`3e77f54`](https://github.com/bec-project/bec_widgets/commit/3e77f540345f56b9f184a332fcdd50d4d4c8c621))
## v3.6.0 (2026-04-21)
### Bug Fixes
- Change resize mode to interactive
([`a5db2dc`](https://github.com/bec-project/bec_widgets/commit/a5db2dc340f3386e68b300fd4528a44f87cbbf97))
- Small usability changes
([`5a497c3`](https://github.com/bec-project/bec_widgets/commit/5a497c3598c2d8f27916d91d53c646d5d6d3a4a7))
### Features
- Add button/slot to pause/unpause logs
([`23e3644`](https://github.com/bec-project/bec_widgets/commit/23e3644619de958bcfdb8a0b2ee1f7c2ce05b235))
- Add logpanel to menu
([`2e8f43f`](https://github.com/bec-project/bec_widgets/commit/2e8f43fcac581cd1c227308198565d142a1bf276))
- Migrate logpanel to table model/view
([`09bb112`](https://github.com/bec-project/bec_widgets/commit/09bb1121d83bac1f6e4827daa476fbe7cd5b3a80))
## v3.5.1 (2026-04-20)
### Bug Fixes
- Don't assume attr exists if we timed out waiting for it
([`f7a1ee4`](https://github.com/bec-project/bec_widgets/commit/f7a1ee49a42c58ba315c8957b45a80d862ffe745))
### Refactoring
- Don't import real widgets in client
([`8e51c1a`](https://github.com/bec-project/bec_widgets/commit/8e51c1adb6a7658c54846794cf97b774cbac2193))
## v3.5.0 (2026-04-14)
### Bug Fixes
+3 -11
View File
@@ -1,21 +1,13 @@
import os
import sys
if sys.platform.startswith("linux"):
qt_platform = os.environ.get("QT_QPA_PLATFORM", "")
if qt_platform != "offscreen":
os.environ["QT_QPA_PLATFORM"] = "xcb"
__all__ = ["BECWidget", "SafeSlot", "SafeProperty"]
def __getattr__(name: str):
def __getattr__(name):
if name == "BECWidget":
from bec_widgets.utils.bec_widget import BECWidget
return BECWidget
if name in {"SafeProperty", "SafeSlot"}:
if name in {"SafeSlot", "SafeProperty"}:
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
return {"SafeProperty": SafeProperty, "SafeSlot": SafeSlot}[name]
return {"SafeSlot": SafeSlot, "SafeProperty": SafeProperty}[name]
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
+15
View File
@@ -0,0 +1,15 @@
import os
import sys
import bec_widgets.widgets.containers.qt_ads as QtAds
if sys.platform.startswith("linux"):
qt_platform = os.environ.get("QT_QPA_PLATFORM", "")
if qt_platform != "offscreen":
os.environ["QT_QPA_PLATFORM"] = "xcb"
# Default QtAds configuration
QtAds.CDockManager.setConfigFlag(QtAds.CDockManager.eConfigFlag.FocusHighlighting, True)
QtAds.CDockManager.setConfigFlag(
QtAds.CDockManager.eConfigFlag.RetainTabSizeWhenCloseButtonHidden, True
)
@@ -19,8 +19,8 @@ from qtpy.QtWidgets import QApplication
import bec_widgets
from bec_widgets.applications.launch_window import LaunchWindow
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.utils.bec_dispatcher import BECDispatcher
from bec_widgets.utils.rpc_register import RPCRegister
logger = bec_logger.logger
+1 -1
View File
@@ -20,13 +20,13 @@ from qtpy.QtWidgets import (
)
import bec_widgets
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.utils.bec_plugin_helper import get_all_plugin_widgets
from bec_widgets.utils.container_utils import WidgetContainerUtils
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.name_utils import pascal_to_snake
from bec_widgets.utils.plugin_utils import get_plugin_auto_updates
from bec_widgets.utils.round_frame import RoundedFrame
from bec_widgets.utils.rpc_register import RPCRegister
from bec_widgets.utils.screen_utils import apply_window_geometry, centered_geometry_for_app
from bec_widgets.utils.toolbars.toolbar import ModularToolBar
from bec_widgets.utils.ui_loader import UILoader
@@ -0,0 +1,2 @@
from .config_choice_dialog import ConfigChoiceDialog
from .device_form_dialog import DeviceFormDialog
@@ -8,16 +8,14 @@ from ophyd_devices.interfaces.device_config_templates.ophyd_templates import OPH
from qtpy import QtCore, QtWidgets
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.widgets.control.device_manager.components import OphydValidation
from bec_widgets.widgets.control.device_manager.components.device_config_template.device_config_template import (
DeviceConfigTemplate,
)
from bec_widgets.widgets.control.device_manager.components.device_config_template.template_items import (
validate_name,
)
from bec_widgets.widgets.control.device_manager.components.ophyd_validation.ophyd_validation import (
OphydValidation,
)
from bec_widgets.widgets.control.device_manager.components.ophyd_validation.ophyd_validation_utils import (
from bec_widgets.widgets.control.device_manager.components.ophyd_validation import (
ConfigStatus,
ConnectionStatus,
format_error_to_md,
@@ -12,7 +12,7 @@ from qtpy import QtCore, QtGui, QtWidgets
from bec_widgets.utils.colors import get_accent_colors
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.widgets.control.device_manager.components.ophyd_validation.ophyd_validation_utils import (
from bec_widgets.widgets.control.device_manager.components.ophyd_validation import (
ConfigStatus,
ConnectionStatus,
get_validation_icons,
@@ -13,7 +13,6 @@ from bec_lib.file_utils import DeviceConfigWriter
from bec_lib.logger import bec_logger
from bec_lib.messages import ConfigAction, ScanStatusMessage
from bec_lib.plugin_helper import plugin_package_name, plugin_repo_path
from bec_lib.utils.import_utils import lazy_import_from
from bec_qthemes import apply_theme, material_icon
from qtpy.QtCore import QMetaObject, Qt, QThreadPool, Signal
from qtpy.QtGui import QColor
@@ -27,18 +26,26 @@ from qtpy.QtWidgets import (
QWidget,
)
from bec_widgets.applications.views.device_manager_view.device_manager_dialogs import (
ConfigChoiceDialog,
DeviceFormDialog,
)
from bec_widgets.applications.views.device_manager_view.device_manager_dialogs.upload_redis_dialog import (
UploadRedisDialog,
)
from bec_widgets.utils.colors import get_accent_colors
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.toolbars.actions import MaterialIconAction
from bec_widgets.utils.toolbars.bundles import ToolbarBundle
from bec_widgets.utils.toolbars.toolbar import ModularToolBar
from bec_widgets.widgets.containers.dock_area.basic_dock_area import DockAreaWidget
from bec_widgets.widgets.control.device_manager.components._util import SharedSelectionSignal
from bec_widgets.widgets.control.device_manager.components.device_table.device_table import (
from bec_widgets.widgets.control.device_manager.components import (
DeviceTable,
DMConfigView,
DocstringView,
OphydValidation,
)
from bec_widgets.widgets.control.device_manager.components.dm_config_view import DMConfigView
from bec_widgets.widgets.control.device_manager.components.dm_docstring_view import DocstringView
from bec_widgets.widgets.control.device_manager.components._util import SharedSelectionSignal
from bec_widgets.widgets.control.device_manager.components.ophyd_validation.ophyd_validation_utils import (
ConfigStatus,
ConnectionStatus,
@@ -54,29 +61,8 @@ from bec_widgets.widgets.utility.spinner.spinner import SpinnerWidget
if TYPE_CHECKING: # pragma: no cover
from bec_lib.client import BECClient
from bec_widgets.applications.views.device_manager_view.device_manager_dialogs.upload_redis_dialog import (
UploadRedisDialog,
)
logger = bec_logger.logger
ConfigChoiceDialog = lazy_import_from(
"bec_widgets.applications.views.device_manager_view.device_manager_dialogs.config_choice_dialog",
("ConfigChoiceDialog",),
)
DeviceFormDialog = lazy_import_from(
"bec_widgets.applications.views.device_manager_view.device_manager_dialogs.device_form_dialog",
("DeviceFormDialog",),
)
UploadRedisDialog = lazy_import_from(
"bec_widgets.applications.views.device_manager_view.device_manager_dialogs.upload_redis_dialog",
("UploadRedisDialog",),
)
OphydValidation = lazy_import_from(
"bec_widgets.widgets.control.device_manager.components.ophyd_validation.ophyd_validation",
("OphydValidation",),
)
_yes_no_question = partial(
QMessageBox.question,
buttons=QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No,
@@ -1,17 +1,14 @@
"""Module for Device Manager View."""
from bec_lib.utils.import_utils import lazy_import_from
from qtpy.QtCore import QRect
from qtpy.QtWidgets import QWidget
from bec_widgets.applications.views.device_manager_view.device_manager_widget import (
DeviceManagerWidget,
)
from bec_widgets.applications.views.view import ViewBase, ViewTourSteps
from bec_widgets.utils.error_popups import SafeSlot
DeviceManagerWidget = lazy_import_from(
"bec_widgets.applications.views.device_manager_view.device_manager_widget",
("DeviceManagerWidget",),
)
class DeviceManagerView(ViewBase):
"""
@@ -6,18 +6,15 @@ import os
from bec_lib.bec_yaml_loader import yaml_load
from bec_lib.logger import bec_logger
from bec_lib.utils.import_utils import lazy_import_from
from bec_qthemes import material_icon
from qtpy import QtCore, QtWidgets
from bec_widgets.applications.views.device_manager_view.device_manager_display_widget import (
DeviceManagerDisplayWidget,
)
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.error_popups import SafeSlot
DeviceManagerDisplayWidget = lazy_import_from(
"bec_widgets.applications.views.device_manager_view.device_manager_display_widget",
("DeviceManagerDisplayWidget",),
)
logger = bec_logger.logger
+1
View File
@@ -0,0 +1 @@
from bec_widgets.cli.rpc import rpc_base
+356 -41
View File
@@ -13,7 +13,7 @@ from typing import Literal, Optional
from bec_lib.logger import bec_logger
from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call, rpc_timeout
from bec_widgets.utils.bec_plugin_helper import get_all_plugin_widgets, get_plugin_client_module
from bec_widgets.utils.bec_plugin_helper import get_plugin_client_module
logger = bec_logger.logger
@@ -62,29 +62,19 @@ _Widgets = {
try:
_plugin_widgets = get_all_plugin_widgets().as_dict()
plugin_client = get_plugin_client_module()
Widgets = _WidgetsEnumType("Widgets", {name: name for name in _plugin_widgets} | _Widgets)
if (_overlap := _Widgets.keys() & _plugin_widgets.keys()) != set():
for _widget in _overlap:
logger.warning(
f"Detected duplicate widget {_widget} in plugin repo file: {inspect.getfile(_plugin_widgets[_widget])} !"
)
for plugin_name, plugin_class in inspect.getmembers(plugin_client, inspect.isclass):
if issubclass(plugin_class, RPCBase) and plugin_class is not RPCBase:
if plugin_name not in _Widgets:
_Widgets[plugin_name] = plugin_name
if plugin_name in globals():
conflicting_file = (
inspect.getfile(_plugin_widgets[plugin_name])
if plugin_name in _plugin_widgets
else f"{plugin_client}"
)
logger.warning(
f"Plugin widget {plugin_name} from {conflicting_file} conflicts with a built-in class!"
f"Plugin widget {plugin_name} in {plugin_class._IMPORT_MODULE} conflicts with a built-in class!"
)
continue
if plugin_name not in _overlap:
else:
globals()[plugin_name] = plugin_class
Widgets = _WidgetsEnumType("Widgets", _Widgets)
except ImportError as e:
logger.error(f"Failed loading plugins: \n{reduce(add, traceback.format_exception(e))}")
@@ -92,6 +82,8 @@ except ImportError as e:
class AdminView(RPCBase):
"""A view for administrators to change the current active experiment, manage messaging"""
_IMPORT_MODULE = "bec_widgets.applications.views.admin_view.admin_view"
@rpc_call
def activate(self) -> "None":
"""
@@ -100,6 +92,8 @@ class AdminView(RPCBase):
class AutoUpdates(RPCBase):
_IMPORT_MODULE = "bec_widgets.widgets.containers.auto_update.auto_updates"
@property
@rpc_call
def enabled(self) -> "bool":
@@ -136,6 +130,8 @@ class AutoUpdates(RPCBase):
class AvailableDeviceResources(RPCBase):
_IMPORT_MODULE = "bec_widgets.widgets.control.device_manager.components.available_device_resources.available_device_resources"
@rpc_call
def remove(self):
"""
@@ -156,6 +152,8 @@ class AvailableDeviceResources(RPCBase):
class BECDockArea(RPCBase):
_IMPORT_MODULE = "bec_widgets.widgets.containers.dock_area.dock_area"
@rpc_call
def new(
self,
@@ -342,10 +340,10 @@ class BECDockArea(RPCBase):
Save the current workspace profile.
On first save of a given name:
- writes a default copy to states/default/<name>.ini with tag=default and created_at
- writes a user copy to states/user/<name>.ini with tag=user and created_at
On subsequent saves of user-owned profiles:
- updates both the default and user copies so restore uses the latest snapshot.
- writes a baseline copy to profiles/baseline/<name>.ini with created_at
- writes a runtime copy to profiles/runtime/<name>.ini with created_at
On subsequent saves:
- updates both the baseline and runtime copies so restore uses the latest snapshot.
Read-only bundled profiles cannot be overwritten.
Args:
@@ -360,15 +358,31 @@ class BECDockArea(RPCBase):
@rpc_timeout(None)
@rpc_call
def load_profile(self, name: "str | None" = None):
def load_profile(self, name: "str | None" = None, restore_baseline: "bool" = False):
"""
Load a workspace profile.
Before switching, persist the current profile to the user copy.
Prefer loading the user copy; fall back to the default copy.
Before switching, persist the current profile to the runtime copy.
Prefer loading the runtime copy; fall back to the baseline copy. When
``restore_baseline`` is True, first overwrite the runtime copy with the
baseline profile and then load it.
Args:
name (str | None): The name of the profile to load. If None, prompts the user.
restore_baseline (bool): If True, restore the runtime copy from the
baseline before loading. Defaults to False.
"""
@rpc_timeout(None)
@rpc_call
def restore_baseline_profile(self, name: "str | None" = None, show_dialog: "bool" = False):
"""
Overwrite the runtime copy of *name* with the baseline.
If *name* is None, target the currently active profile.
Args:
name (str | None): The name of the profile to restore. If None, uses the current profile.
show_dialog (bool): If True, ask for confirmation before restoring.
"""
@rpc_call
@@ -391,6 +405,8 @@ class BECDockArea(RPCBase):
class BECMainWindow(RPCBase):
_IMPORT_MODULE = "bec_widgets.widgets.containers.main_window.main_window"
@rpc_call
def remove(self):
"""
@@ -413,6 +429,8 @@ class BECMainWindow(RPCBase):
class BECProgressBar(RPCBase):
"""A custom progress bar with smooth transitions. The displayed text can be customized using a template."""
_IMPORT_MODULE = "bec_widgets.widgets.progress.bec_progressbar.bec_progressbar"
@rpc_call
def set_value(self, value):
"""
@@ -486,6 +504,8 @@ class BECProgressBar(RPCBase):
class BECQueue(RPCBase):
"""Widget to display the BEC queue."""
_IMPORT_MODULE = "bec_widgets.widgets.services.bec_queue.bec_queue"
@rpc_call
def remove(self):
"""
@@ -508,6 +528,8 @@ class BECQueue(RPCBase):
class BECShell(RPCBase):
"""A BecConsole pre-configured to run the BEC shell."""
_IMPORT_MODULE = "bec_widgets.widgets.editors.bec_console.bec_console"
@rpc_call
def remove(self):
"""
@@ -530,6 +552,8 @@ class BECShell(RPCBase):
class BECStatusBox(RPCBase):
"""An autonomous widget to display the status of BEC services."""
_IMPORT_MODULE = "bec_widgets.widgets.services.bec_status_box.bec_status_box"
@rpc_call
def get_server_state(self) -> "str":
"""
@@ -565,6 +589,8 @@ class BECStatusBox(RPCBase):
class BaseROI(RPCBase):
"""Base class for all Region of Interest (ROI) implementations."""
_IMPORT_MODULE = "bec_widgets.widgets.plots.roi.image_roi"
@property
@rpc_call
def label(self) -> "str":
@@ -694,6 +720,8 @@ class BaseROI(RPCBase):
class BecConsole(RPCBase):
"""A console widget with access to a shared registry of terminals, such that instances can be moved around."""
_IMPORT_MODULE = "bec_widgets.widgets.editors.bec_console.bec_console"
@rpc_call
def remove(self):
"""
@@ -716,6 +744,8 @@ class BecConsole(RPCBase):
class CircularROI(RPCBase):
"""Circular Region of Interest with center/diameter tracking and auto-labeling."""
_IMPORT_MODULE = "bec_widgets.widgets.plots.roi.image_roi"
@property
@rpc_call
def label(self) -> "str":
@@ -843,6 +873,8 @@ class CircularROI(RPCBase):
class Curve(RPCBase):
_IMPORT_MODULE = "bec_widgets.widgets.plots.waveform.curve"
@rpc_call
def remove(self):
"""
@@ -1009,6 +1041,8 @@ class Curve(RPCBase):
class DapComboBox(RPCBase):
"""Editable combobox listing the available DAP models."""
_IMPORT_MODULE = "bec_widgets.widgets.dap.dap_combo_box.dap_combo_box"
@rpc_call
def select_y_axis(self, y_axis: str):
"""
@@ -1040,6 +1074,8 @@ class DapComboBox(RPCBase):
class DeveloperView(RPCBase):
"""A view for users to write scripts and macros and execute them within the application."""
_IMPORT_MODULE = "bec_widgets.applications.views.developer_view.developer_view"
@rpc_call
def activate(self) -> "None":
"""
@@ -1050,6 +1086,8 @@ class DeveloperView(RPCBase):
class DeviceBrowser(RPCBase):
"""DeviceBrowser is a widget that displays all available devices in the current BEC session."""
_IMPORT_MODULE = "bec_widgets.widgets.services.device_browser.device_browser"
@rpc_call
def remove(self):
"""
@@ -1072,6 +1110,8 @@ class DeviceBrowser(RPCBase):
class DeviceInitializationProgressBar(RPCBase):
"""A progress bar that displays the progress of device initialization."""
_IMPORT_MODULE = "bec_widgets.widgets.progress.device_initialization_progress_bar.device_initialization_progress_bar"
@rpc_call
def remove(self):
"""
@@ -1094,6 +1134,8 @@ class DeviceInitializationProgressBar(RPCBase):
class DeviceInputBase(RPCBase):
"""Mixin base class for device input widgets."""
_IMPORT_MODULE = "bec_widgets.widgets.control.device_input.base_classes.device_input_base"
@rpc_call
def remove(self):
"""
@@ -1116,6 +1158,8 @@ class DeviceInputBase(RPCBase):
class DeviceManagerView(RPCBase):
"""A view for users to manage devices within the application."""
_IMPORT_MODULE = "bec_widgets.applications.views.device_manager_view.device_manager_view"
@rpc_call
def activate(self) -> "None":
"""
@@ -1126,6 +1170,8 @@ class DeviceManagerView(RPCBase):
class DockAreaView(RPCBase):
"""Modular dock area view for arranging and managing multiple dockable widgets."""
_IMPORT_MODULE = "bec_widgets.applications.views.dock_area_view.dock_area_view"
@rpc_call
def activate(self) -> "None":
"""
@@ -1318,10 +1364,10 @@ class DockAreaView(RPCBase):
Save the current workspace profile.
On first save of a given name:
- writes a default copy to states/default/<name>.ini with tag=default and created_at
- writes a user copy to states/user/<name>.ini with tag=user and created_at
On subsequent saves of user-owned profiles:
- updates both the default and user copies so restore uses the latest snapshot.
- writes a baseline copy to profiles/baseline/<name>.ini with created_at
- writes a runtime copy to profiles/runtime/<name>.ini with created_at
On subsequent saves:
- updates both the baseline and runtime copies so restore uses the latest snapshot.
Read-only bundled profiles cannot be overwritten.
Args:
@@ -1336,15 +1382,31 @@ class DockAreaView(RPCBase):
@rpc_timeout(None)
@rpc_call
def load_profile(self, name: "str | None" = None):
def load_profile(self, name: "str | None" = None, restore_baseline: "bool" = False):
"""
Load a workspace profile.
Before switching, persist the current profile to the user copy.
Prefer loading the user copy; fall back to the default copy.
Before switching, persist the current profile to the runtime copy.
Prefer loading the runtime copy; fall back to the baseline copy. When
``restore_baseline`` is True, first overwrite the runtime copy with the
baseline profile and then load it.
Args:
name (str | None): The name of the profile to load. If None, prompts the user.
restore_baseline (bool): If True, restore the runtime copy from the
baseline before loading. Defaults to False.
"""
@rpc_timeout(None)
@rpc_call
def restore_baseline_profile(self, name: "str | None" = None, show_dialog: "bool" = False):
"""
Overwrite the runtime copy of *name* with the baseline.
If *name* is None, target the currently active profile.
Args:
name (str | None): The name of the profile to restore. If None, uses the current profile.
show_dialog (bool): If True, ask for confirmation before restoring.
"""
@rpc_call
@@ -1369,6 +1431,8 @@ class DockAreaView(RPCBase):
class DockAreaWidget(RPCBase):
"""Lightweight dock area that exposes the core Qt ADS docking helpers without any"""
_IMPORT_MODULE = "bec_widgets.widgets.containers.dock_area.basic_dock_area"
@rpc_call
def new(
self,
@@ -1553,6 +1617,8 @@ class DockAreaWidget(RPCBase):
class EllipticalROI(RPCBase):
"""Elliptical Region of Interest with centre/width/height tracking and auto-labelling."""
_IMPORT_MODULE = "bec_widgets.widgets.plots.roi.image_roi"
@property
@rpc_call
def label(self) -> "str":
@@ -1675,6 +1741,8 @@ class EllipticalROI(RPCBase):
class Heatmap(RPCBase):
"""Heatmap widget for visualizing 2d grid data with color mapping for the z-axis."""
_IMPORT_MODULE = "bec_widgets.widgets.plots.heatmap.heatmap"
@rpc_call
def remove(self):
"""
@@ -2373,6 +2441,8 @@ class Heatmap(RPCBase):
class Image(RPCBase):
"""Image widget for displaying 2D data."""
_IMPORT_MODULE = "bec_widgets.widgets.plots.image.image"
@rpc_call
def remove(self):
"""
@@ -2984,6 +3054,8 @@ class Image(RPCBase):
class ImageItem(RPCBase):
_IMPORT_MODULE = "bec_widgets.widgets.plots.image.image_item"
@property
@rpc_call
def color_map(self) -> "str":
@@ -3134,6 +3206,8 @@ class ImageItem(RPCBase):
class LaunchWindow(RPCBase):
_IMPORT_MODULE = "bec_widgets.applications.launch_window"
@rpc_call
def show_launcher(self):
"""
@@ -3148,33 +3222,224 @@ class LaunchWindow(RPCBase):
class LogPanel(RPCBase):
"""Displays a log panel"""
"""Live display of the BEC logs in a table view."""
_IMPORT_MODULE = "bec_widgets.widgets.utility.logpanel.logpanel"
@rpc_call
def set_plain_text(self, text: str) -> None:
def remove(self):
"""
Set the plain text of the widget.
Args:
text (str): The text to set.
Cleanup the BECConnector
"""
@rpc_call
def set_html_text(self, text: str) -> None:
def attach(self):
"""
Set the HTML text of the widget.
None
"""
@rpc_call
def detach(self):
"""
Detach the widget from its parent dock widget (if widget is in the dock), making it a floating widget.
"""
class Minesweeper(RPCBase):
_IMPORT_MODULE = "bec_widgets.widgets.games.minesweeper"
class MonacoDock(RPCBase):
"""MonacoDock is a dock widget that contains Monaco editor instances."""
_IMPORT_MODULE = "bec_widgets.widgets.editors.monaco.monaco_dock"
@rpc_call
def new(
self,
widget: "QWidget | str",
*,
closable: "bool" = True,
floatable: "bool" = True,
movable: "bool" = True,
start_floating: "bool" = False,
floating_state: "Mapping[str, object] | None" = None,
where: "Literal['left', 'right', 'top', 'bottom'] | None" = None,
on_close: "Callable[[CDockWidget, QWidget], None] | None" = None,
tab_with: "CDockWidget | QWidget | str | None" = None,
relative_to: "CDockWidget | QWidget | str | None" = None,
return_dock: "bool" = False,
show_title_bar: "bool | None" = None,
title_buttons: "Mapping[str, bool] | Sequence[str] | str | None" = None,
show_settings_action: "bool | None" = False,
promote_central: "bool" = False,
dock_icon: "QIcon | None" = None,
apply_widget_icon: "bool" = True,
object_name: "str | None" = None,
**widget_kwargs,
) -> "QWidget | CDockWidget | BECWidget":
"""
Create a new widget (or reuse an instance) and add it as a dock.
Args:
text (str): The text to set.
widget(QWidget | str): Instance or registered widget type string.
closable(bool): Whether the dock is closable.
floatable(bool): Whether the dock is floatable.
movable(bool): Whether the dock is movable.
start_floating(bool): Whether to start the dock floating.
floating_state(Mapping | None): Optional floating geometry metadata to apply when floating.
where(Literal["left", "right", "top", "bottom"] | None): Dock placement hint relative to the dock area (ignored when
``relative_to`` is provided without an explicit value).
on_close(Callable[[CDockWidget, QWidget], None] | None): Optional custom close handler accepting (dock, widget).
tab_with(CDockWidget | QWidget | str | None): Existing dock (or widget/name) to tab the new dock alongside.
relative_to(CDockWidget | QWidget | str | None): Existing dock (or widget/name) used as the positional anchor.
When supplied and ``where`` is ``None``, the new dock inherits the
anchor's current dock area.
return_dock(bool): When True, return the created dock instead of the widget.
show_title_bar(bool | None): Explicitly show or hide the dock area's title bar.
title_buttons(Mapping[str, bool] | Sequence[str] | str | None): Mapping or iterable describing which title bar buttons should
remain visible. Provide a mapping of button names (``"float"``,
``"close"``, ``"menu"``, ``"auto_hide"``, ``"minimize"``) to booleans,
or a sequence of button names to hide.
show_settings_action(bool | None): Control whether a dock settings/property action should
be installed. Defaults to ``False`` for the basic dock area; subclasses
such as `BECDockArea` override the default to ``True``.
promote_central(bool): When True, promote the created dock to be the dock manager's
central widget (useful for editor stacks or other root content).
dock_icon(QIcon | None): Optional icon applied to the dock via ``CDockWidget.setIcon``.
Provide a `QIcon` (e.g. from ``material_icon``). When ``None`` (default),
the widget's ``ICON_NAME`` attribute is used when available.
apply_widget_icon(bool): When False, skip automatically resolving the icon from
the widget's ``ICON_NAME`` (useful for callers who want no icon and do not pass one explicitly).
object_name(str | None): Optional object name to assign to the created widget.
**widget_kwargs: Additional keyword arguments passed to the widget constructor
when creating by type name.
Returns:
The widget instance by default, or the created `CDockWidget` when `return_dock` is True.
"""
@rpc_call
def dock_map(self) -> "dict[str, CDockWidget]":
"""
Return the dock widgets map as dictionary with names as keys.
"""
class Minesweeper(RPCBase): ...
@rpc_call
def dock_list(self) -> "list[CDockWidget]":
"""
Return the list of dock widgets.
"""
@rpc_call
def widget_map(self, bec_widgets_only: "bool" = True) -> "dict[str, QWidget]":
"""
Return a dictionary mapping widget names to their corresponding widgets.
Args:
bec_widgets_only(bool): If True, only include widgets that are BECConnector instances.
"""
@rpc_call
def widget_list(self, bec_widgets_only: "bool" = True) -> "list[QWidget]":
"""
Return a list of widgets contained in the dock area.
Args:
bec_widgets_only(bool): If True, only include widgets that are BECConnector instances.
"""
@rpc_call
def attach_all(self):
"""
Re-attach floating docks back into the dock manager.
"""
@rpc_call
def delete_all(self):
"""
Delete all docks and their associated widgets.
"""
@rpc_call
def delete(self, object_name: "str") -> "bool":
"""
Remove a widget from the dock area by its object name.
Args:
object_name: The object name of the widget to remove.
Returns:
bool: True if the widget was found and removed, False otherwise.
Raises:
ValueError: If no widget with the given object name is found.
Example:
>>> dock_area.delete("my_widget")
True
"""
@rpc_call
def set_layout_ratios(
self,
*,
horizontal: "Sequence[float] | Mapping[int | str, float] | None" = None,
vertical: "Sequence[float] | Mapping[int | str, float] | None" = None,
splitter_overrides: "Mapping[int | str | Sequence[int], Sequence[float] | Mapping[int | str, float]] | None" = None,
) -> "None":
"""
Adjust splitter ratios in the dock layout.
Args:
horizontal: Weights applied to every horizontal splitter encountered.
vertical: Weights applied to every vertical splitter encountered.
splitter_overrides: Optional overrides targeting specific splitters identified
by their index path (e.g. ``{0: [1, 2], (1, 0): [3, 5]}``). Paths are zero-based
indices following the splitter hierarchy, starting from the root splitter.
Example:
To build three columns with custom per-column ratios::
area.set_layout_ratios(
horizontal=[1, 2, 1], # column widths
splitter_overrides={
0: [1, 2], # column 0 (two rows)
1: [3, 2, 1], # column 1 (three rows)
2: [1], # column 2 (single row)
},
)
"""
@rpc_call
def describe_layout(self) -> "list[dict[str, Any]]":
"""
Return metadata describing splitter paths, orientations, and contained docks.
Useful for determining the keys to use in `set_layout_ratios(splitter_overrides=...)`.
"""
@rpc_call
def print_layout_structure(self) -> "None":
"""
Pretty-print the current splitter paths to stdout.
"""
@rpc_call
def set_central_dock(self, dock: "CDockWidget | QWidget | str") -> "None":
"""
Promote an existing dock to be the dock manager's central widget.
Args:
dock(CDockWidget | QWidget | str): Dock reference to promote.
"""
class MonacoWidget(RPCBase):
"""A simple Monaco editor widget"""
_IMPORT_MODULE = "bec_widgets.widgets.editors.monaco.monaco_widget"
@rpc_call
def set_text(
self, text: "str", file_name: "str | None" = None, reset: "bool" = False
@@ -3349,6 +3614,8 @@ class MonacoWidget(RPCBase):
class MotorMap(RPCBase):
"""Motor map widget for plotting motor positions in 2D including a trace of the last points."""
_IMPORT_MODULE = "bec_widgets.widgets.plots.motor_map.motor_map"
@rpc_call
def remove(self):
"""
@@ -3819,6 +4086,8 @@ class MotorMap(RPCBase):
class MultiWaveform(RPCBase):
"""MultiWaveform widget for displaying multiple waveforms emitted by a single signal."""
_IMPORT_MODULE = "bec_widgets.widgets.plots.multi_waveform.multi_waveform"
@rpc_call
def remove(self):
"""
@@ -4278,6 +4547,8 @@ class MultiWaveform(RPCBase):
class PdfViewerWidget(RPCBase):
"""A widget to display PDF documents with toolbar controls."""
_IMPORT_MODULE = "bec_widgets.widgets.utility.pdf_viewer.pdf_viewer"
@rpc_call
def load_pdf(self, file_path: str):
"""
@@ -4409,6 +4680,10 @@ class PdfViewerWidget(RPCBase):
class PositionIndicator(RPCBase):
"""Display a position within a defined range, e.g. motor limits."""
_IMPORT_MODULE = (
"bec_widgets.widgets.control.device_control.position_indicator.position_indicator"
)
@rpc_call
def set_value(self, position: float):
"""
@@ -4474,6 +4749,10 @@ class PositionIndicator(RPCBase):
class PositionerBox(RPCBase):
"""Simple Widget to control a positioner in box form"""
_IMPORT_MODULE = (
"bec_widgets.widgets.control.device_control.positioner_box.positioner_box.positioner_box"
)
@rpc_call
def set_positioner(self, positioner: "str | Positioner"):
"""
@@ -4506,6 +4785,8 @@ class PositionerBox(RPCBase):
class PositionerBox2D(RPCBase):
"""Simple Widget to control two positioners in box form"""
_IMPORT_MODULE = "bec_widgets.widgets.control.device_control.positioner_box.positioner_box_2d.positioner_box_2d"
@rpc_call
def set_positioner_hor(self, positioner: "str | Positioner"):
"""
@@ -4575,6 +4856,8 @@ class PositionerBox2D(RPCBase):
class PositionerControlLine(RPCBase):
"""A widget that controls a single device."""
_IMPORT_MODULE = "bec_widgets.widgets.control.device_control.positioner_box.positioner_control_line.positioner_control_line"
@rpc_call
def set_positioner(self, positioner: "str | Positioner"):
"""
@@ -4607,6 +4890,8 @@ class PositionerControlLine(RPCBase):
class PositionerGroup(RPCBase):
"""Simple Widget to control a positioner in box form"""
_IMPORT_MODULE = "bec_widgets.widgets.control.device_control.positioner_group.positioner_group"
@rpc_call
def set_positioners(self, device_names: "str"):
"""
@@ -4638,6 +4923,8 @@ class PositionerGroup(RPCBase):
class RectangularROI(RPCBase):
"""Defines a rectangular Region of Interest (ROI) with additional functionality."""
_IMPORT_MODULE = "bec_widgets.widgets.plots.roi.image_roi"
@property
@rpc_call
def label(self) -> "str":
@@ -4767,6 +5054,8 @@ class RectangularROI(RPCBase):
class ResumeButton(RPCBase):
"""A button that continue scan queue."""
_IMPORT_MODULE = "bec_widgets.widgets.control.buttons.button_resume.button_resume"
@rpc_call
def remove(self):
"""
@@ -4787,6 +5076,8 @@ class ResumeButton(RPCBase):
class Ring(RPCBase):
_IMPORT_MODULE = "bec_widgets.widgets.progress.ring_progress_bar.ring"
@rpc_call
def set_value(self, value: "int | float"):
"""
@@ -4880,6 +5171,8 @@ class Ring(RPCBase):
class RingProgressBar(RPCBase):
_IMPORT_MODULE = "bec_widgets.widgets.progress.ring_progress_bar.ring_progress_bar"
@rpc_call
def remove(self):
"""
@@ -4959,12 +5252,14 @@ class RingProgressBar(RPCBase):
class SBBMonitor(RPCBase):
"""A widget to display the SBB monitor website."""
...
_IMPORT_MODULE = "bec_widgets.widgets.editors.sbb_monitor.sbb_monitor"
class ScanControl(RPCBase):
"""Widget to submit new scans to the queue."""
_IMPORT_MODULE = "bec_widgets.widgets.control.scan_control.scan_control"
@rpc_call
def attach(self):
"""
@@ -4988,6 +5283,8 @@ class ScanControl(RPCBase):
class ScanProgressBar(RPCBase):
"""Widget to display a progress bar that is hooked up to the scan progress of a scan."""
_IMPORT_MODULE = "bec_widgets.widgets.progress.scan_progressbar.scan_progressbar"
@rpc_call
def remove(self):
"""
@@ -5010,6 +5307,8 @@ class ScanProgressBar(RPCBase):
class ScatterCurve(RPCBase):
"""Scatter curve item for the scatter waveform widget."""
_IMPORT_MODULE = "bec_widgets.widgets.plots.scatter_waveform.scatter_curve"
@property
@rpc_call
def color_map(self) -> "str":
@@ -5019,6 +5318,8 @@ class ScatterCurve(RPCBase):
class ScatterWaveform(RPCBase):
_IMPORT_MODULE = "bec_widgets.widgets.plots.scatter_waveform.scatter_waveform"
@rpc_call
def remove(self):
"""
@@ -5486,6 +5787,8 @@ class ScatterWaveform(RPCBase):
class SignalLabel(RPCBase):
_IMPORT_MODULE = "bec_widgets.widgets.utility.signal_label.signal_label"
@property
@rpc_call
def custom_label(self) -> "str":
@@ -5630,6 +5933,8 @@ class SignalLabel(RPCBase):
class TextBox(RPCBase):
"""A widget that displays text in plain and HTML format"""
_IMPORT_MODULE = "bec_widgets.widgets.editors.text_box.text_box"
@rpc_call
def set_plain_text(self, text: str) -> None:
"""
@@ -5652,6 +5957,8 @@ class TextBox(RPCBase):
class ViewBase(RPCBase):
"""Wrapper for a content widget used inside the main app's stacked view."""
_IMPORT_MODULE = "bec_widgets.applications.views.view"
@rpc_call
def activate(self) -> "None":
"""
@@ -5662,6 +5969,8 @@ class ViewBase(RPCBase):
class Waveform(RPCBase):
"""Widget for plotting waveforms."""
_IMPORT_MODULE = "bec_widgets.widgets.plots.waveform.waveform"
@rpc_call
def remove(self):
"""
@@ -6240,6 +6549,8 @@ class Waveform(RPCBase):
class WaveformViewInline(RPCBase):
_IMPORT_MODULE = "bec_widgets.applications.views.view"
@rpc_call
def activate(self) -> "None":
"""
@@ -6248,6 +6559,8 @@ class WaveformViewInline(RPCBase):
class WaveformViewPopup(RPCBase):
_IMPORT_MODULE = "bec_widgets.applications.views.view"
@rpc_call
def activate(self) -> "None":
"""
@@ -6258,6 +6571,8 @@ class WaveformViewPopup(RPCBase):
class WebsiteWidget(RPCBase):
"""A simple widget to display a website"""
_IMPORT_MODULE = "bec_widgets.widgets.editors.website.website"
@rpc_call
def set_url(self, url: str) -> None:
"""
-79
View File
@@ -1,79 +0,0 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from bec_lib.utils.import_utils import lazy_import_from
from bec_widgets.utils.bec_plugin_helper import get_all_plugin_widget_references
from bec_widgets.utils.plugin_utils import get_custom_class_references
try:
from bec_widgets.cli.constants import IGNORE_WIDGETS
except ModuleNotFoundError: # pragma: no cover
IGNORE_WIDGETS = ["LaunchWindow"]
if TYPE_CHECKING: # pragma: no cover
from bec_widgets.utils.bec_widget import BECWidget
class RPCWidgetHandler:
"""Handler class for creating widgets from RPC messages."""
def __init__(self):
self._widget_classes = None
@property
def widget_classes(self) -> dict[str, type["BECWidget"]]:
"""
Get the available widget classes.
Returns:
dict: The available widget classes.
"""
if self._widget_classes is None:
self.update_available_widgets()
return self._widget_classes # type: ignore
def update_available_widgets(self):
"""
Update the available widgets.
Returns:
None
"""
ignored = set(IGNORE_WIDGETS)
widget_classes = {
reference.name: lazy_import_from(reference.module, (reference.name,))
for reference in get_all_plugin_widget_references(use_cache=False)
if reference.name not in ignored
}
widget_classes.update(
{
reference.name: lazy_import_from(reference.module, (reference.name,))
for reference in get_custom_class_references(
"bec_widgets", packages=("widgets", "applications"), use_cache=False
)
if reference.name not in ignored
}
)
self._widget_classes = widget_classes
def create_widget(self, widget_type, **kwargs) -> "BECWidget":
"""
Create a widget from an RPC message.
Args:
widget_type(str): The type of the widget.
name (str): The name of the widget.
**kwargs: The keyword arguments for the widget.
Returns:
widget(BECWidget): The created widget.
"""
widget_class = self.widget_classes.get(widget_type) # type: ignore
if widget_class:
return widget_class(**kwargs)
raise ValueError(f"Unknown widget type: {widget_type}")
widget_handler = RPCWidgetHandler()
@@ -25,8 +25,8 @@ from qtpy.QtWidgets import (
QWidget,
)
from bec_widgets.cli.rpc.rpc_widget_handler import widget_handler
from bec_widgets.utils.colors import apply_theme
from bec_widgets.utils.rpc_widget_handler import widget_handler
from bec_widgets.utils.widget_io import WidgetHierarchy as wh
from bec_widgets.widgets.editors.jupyter_console.jupyter_console import BECJupyterConsole
+1
View File
@@ -0,0 +1 @@
+1 -1
View File
@@ -15,9 +15,9 @@ from pydantic import BaseModel, Field, field_validator
from qtpy.QtCore import Property, QObject, QRunnable, QThreadPool, Signal
from qtpy.QtWidgets import QApplication
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.utils.error_popups import ErrorPopupUtility, SafeSlot
from bec_widgets.utils.name_utils import sanitize_namespace
from bec_widgets.utils.rpc_register import RPCRegister
from bec_widgets.utils.widget_io import WidgetHierarchy
from bec_widgets.utils.yaml_dialog import load_yaml, load_yaml_gui, save_yaml, save_yaml_gui
+3 -59
View File
@@ -1,9 +1,7 @@
from __future__ import annotations
import ast
import importlib.metadata
import inspect
import logging
import pkgutil
import traceback
from importlib import util as importlib_util
@@ -11,65 +9,11 @@ from importlib.machinery import FileFinder, ModuleSpec, SourceFileLoader
from types import ModuleType
from typing import Generator
from bec_widgets.utils.plugin_utils import (
BECClassContainer,
BECClassInfo,
BECClassReference,
_ast_node_name,
_class_has_rpc_markers,
_discover_class_references_from_roots,
_find_package_roots,
)
from bec_lib.logger import bec_logger
logger = logging.getLogger(__name__)
from bec_widgets.utils.plugin_utils import BECClassContainer, BECClassInfo
def _plugin_class_is_candidate(node: ast.ClassDef) -> bool:
base_names = {_ast_node_name(base) for base in node.bases}
return bool({"BECWidget", "BECConnector"} & base_names) or _class_has_rpc_markers(node)
_PLUGIN_WIDGET_REFERENCE_CACHE: dict[tuple[tuple[str, str], ...], tuple[BECClassReference, ...]] = (
{}
)
def _plugin_entry_point_snapshot() -> tuple[tuple[str, str], ...]:
return tuple(
sorted(
(entry_point.name, entry_point.module)
for entry_point in importlib.metadata.entry_points(group="bec.widgets.user_widgets") # type: ignore
)
)
def _build_plugin_widget_references() -> tuple[BECClassReference, ...]:
references: list[BECClassReference] = []
seen_names: set[str] = set()
for entry_point in importlib.metadata.entry_points(group="bec.widgets.user_widgets"): # type: ignore
try:
package_roots = _find_package_roots(entry_point.module)
except ModuleNotFoundError:
continue
for reference in _discover_class_references_from_roots(
entry_point.module,
package_roots,
file_name_filter=lambda file_name: file_name.endswith(".py")
and not file_name.startswith("__"),
candidate_filter=_plugin_class_is_candidate,
):
if reference.name in seen_names:
continue
references.append(reference)
seen_names.add(reference.name)
return tuple(references)
def get_all_plugin_widget_references(*, use_cache: bool = True) -> list[BECClassReference]:
snapshot = _plugin_entry_point_snapshot()
if not use_cache or snapshot not in _PLUGIN_WIDGET_REFERENCE_CACHE:
_PLUGIN_WIDGET_REFERENCE_CACHE[snapshot] = _build_plugin_widget_references()
return list(_PLUGIN_WIDGET_REFERENCE_CACHE[snapshot])
logger = bec_logger.logger
def _submodule_specs(module: ModuleType) -> tuple[ModuleSpec | None, ...]:
+2 -1
View File
@@ -10,16 +10,17 @@ from qtpy.QtGui import QFont, QPixmap
from qtpy.QtWidgets import QApplication, QFileDialog, QLabel, QVBoxLayout, QWidget
import bec_widgets.widgets.containers.qt_ads as QtAds
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.utils.bec_connector import BECConnector, ConnectionConfig
from bec_widgets.utils.busy_loader import install_busy_loader
from bec_widgets.utils.error_popups import SafeConnect, SafeSlot
from bec_widgets.utils.rpc_decorator import rpc_timeout
from bec_widgets.utils.rpc_register import RPCRegister
from bec_widgets.utils.widget_io import WidgetHierarchy
from bec_widgets.widgets.utility.spinner.spinner import SpinnerWidget
if TYPE_CHECKING: # pragma: no cover
from bec_widgets.utils.busy_loader import BusyLoaderOverlay
from bec_widgets.widgets.containers.dock import BECDock
logger = bec_logger.logger
@@ -7,6 +7,7 @@ import inspect
import os
import sys
from pathlib import Path
from typing import get_overloads
import black
import isort
@@ -18,20 +19,6 @@ from bec_widgets.utils.plugin_utils import BECClassContainer, get_custom_classes
logger = bec_logger.logger
if sys.version_info >= (3, 11):
from typing import get_overloads
else:
print(
"Python version is less than 3.11, using dummy function for get_overloads. "
"If you want to use the real function 'typing.get_overloads()', please use Python 3.11 or later."
)
def get_overloads(_obj):
"""
Dummy function for Python versions before 3.11.
"""
return []
class ClientGenerator:
def __init__(self, base=False):
@@ -54,7 +41,7 @@ from __future__ import annotations
from bec_lib.logger import bec_logger
from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call, rpc_timeout
{"from bec_widgets.utils.bec_plugin_helper import get_all_plugin_widgets, get_plugin_client_module" if self._base else ""}
{"from bec_widgets.utils.bec_plugin_helper import get_plugin_client_module" if self._base else ""}
logger = bec_logger.logger
@@ -111,27 +98,19 @@ _Widgets = {
self.content += """
try:
_plugin_widgets = get_all_plugin_widgets().as_dict()
plugin_client = get_plugin_client_module()
Widgets = _WidgetsEnumType("Widgets", {name: name for name in _plugin_widgets} | _Widgets)
if (_overlap := _Widgets.keys() & _plugin_widgets.keys()) != set():
for _widget in _overlap:
logger.warning(f"Detected duplicate widget {_widget} in plugin repo file: {inspect.getfile(_plugin_widgets[_widget])} !")
for plugin_name, plugin_class in inspect.getmembers(plugin_client, inspect.isclass):
if issubclass(plugin_class, RPCBase) and plugin_class is not RPCBase:
if plugin_name not in _Widgets:
_Widgets[plugin_name] = plugin_name
if plugin_name in globals():
conflicting_file = (
inspect.getfile(_plugin_widgets[plugin_name])
if plugin_name in _plugin_widgets
else f"{plugin_client}"
)
logger.warning(
f"Plugin widget {plugin_name} from {conflicting_file} conflicts with a built-in class!"
f"Plugin widget {plugin_name} in {plugin_class._IMPORT_MODULE} conflicts with a built-in class!"
)
continue
if plugin_name not in _overlap:
else:
globals()[plugin_name] = plugin_class
Widgets = _WidgetsEnumType("Widgets", _Widgets)
except ImportError as e:
logger.error(f"Failed loading plugins: \\n{reduce(add, traceback.format_exception(e))}")
"""
@@ -146,12 +125,8 @@ except ImportError as e:
class_name = cls.__name__
if class_name == "BECDockArea":
self.content += f"""
class {class_name}(RPCBase):"""
else:
self.content += f"""
class {class_name}(RPCBase):"""
self.content += f"""
class {class_name}(RPCBase):\n"""
if cls.__doc__:
# We only want the first line of the docstring
@@ -162,13 +137,9 @@ class {class_name}(RPCBase):"""
else:
class_docs = cls.__doc__.split("\n")[1]
self.content += f"""
\"\"\"{class_docs}\"\"\"
"""
\"\"\"{class_docs}\"\"\"\n"""
user_access_entries = self._get_user_access_entries(cls)
if not user_access_entries:
self.content += """...
"""
self.content += f' _IMPORT_MODULE="{cls.__module__}"\n'
for method_entry in user_access_entries:
method, obj, is_property_setter = self._resolve_method_object(cls, method_entry)
if obj is None:
+47 -288
View File
@@ -1,23 +1,22 @@
from __future__ import annotations
import ast
import importlib
import inspect
import os
from dataclasses import dataclass
from functools import lru_cache
from importlib import util as importlib_util
from typing import TYPE_CHECKING, Callable, Iterable
from typing import TYPE_CHECKING, Iterable
from bec_lib.plugin_helper import _get_available_plugins
from qtpy.QtWidgets import QWidget
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils.bec_widget import BECWidget
if TYPE_CHECKING: # pragma: no cover
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.widgets.containers.auto_update.auto_updates import AutoUpdates
_DISCOVERY_BASE_NAMES = frozenset({"BECConnector", "BECWidget", "ViewBase"})
def get_plugin_widgets() -> dict[str, "BECConnector"]:
def get_plugin_widgets() -> dict[str, BECConnector]:
"""
Get all available widgets from the plugin directory. Widgets are classes that inherit from BECConnector.
The plugins are provided through python plugins and specified in the respective pyproject.toml file using
@@ -36,10 +35,9 @@ def get_plugin_widgets() -> dict[str, "BECConnector"]:
Returns:
dict[str, BECConnector]: A dictionary of widget names and their respective classes.
"""
from bec_lib.plugin_helper import _get_available_plugins
modules = _get_available_plugins("bec.widgets.user_widgets")
loaded_plugins = {}
print(modules)
for module in modules:
mods = inspect.getmembers(module, predicate=_filter_plugins)
for name, mod_cls in mods:
@@ -50,8 +48,6 @@ def get_plugin_widgets() -> dict[str, "BECConnector"]:
def _filter_plugins(obj):
from bec_widgets.utils.bec_connector import BECConnector
return inspect.isclass(obj) and issubclass(obj, BECConnector)
@@ -70,8 +66,6 @@ def get_plugin_auto_updates() -> dict[str, type[AutoUpdates]]:
Returns:
dict[str, AutoUpdates]: A dictionary of widget names and their respective classes.
"""
from bec_lib.plugin_helper import _get_available_plugins
modules = _get_available_plugins("bec.widgets.auto_updates")
loaded_plugins = {}
for module in modules:
@@ -96,20 +90,14 @@ class BECClassInfo:
name: str
module: str
file: str
obj: type["BECWidget"]
obj: type[BECWidget]
is_connector: bool = False
is_widget: bool = False
is_plugin: bool = False
@dataclass(frozen=True)
class BECClassReference:
name: str
module: str
class BECClassContainer:
def __init__(self, initial: Iterable[BECClassInfo] = ()):
def __init__(self, initial: Iterable[BECClassInfo] = []):
self._collection: list[BECClassInfo] = list(initial)
def __repr__(self):
@@ -121,13 +109,12 @@ class BECClassContainer:
def __add__(self, other: BECClassContainer):
return BECClassContainer((*self, *(c for c in other if c.name not in self.names)))
def as_dict(self, ignores: list[str] | None = None) -> dict[str, type["BECWidget"]]:
def as_dict(self, ignores: list[str] = []) -> dict[str, type[BECWidget]]:
"""get a dict of {name: Type} for all the entries in the collection.
Args:
ignores(list[str]): a list of class names to exclude from the dictionary."""
ignore_set = set(ignores or ())
return {c.name: c.obj for c in self if c.name not in ignore_set}
return {c.name: c.obj for c in self if c.name not in ignores}
def add_class(self, class_info: BECClassInfo):
"""
@@ -179,276 +166,48 @@ class BECClassContainer:
return [info.obj for info in self.collection]
def _ast_node_name(node: ast.expr) -> str | None:
if isinstance(node, ast.Name):
return node.id
if isinstance(node, ast.Attribute):
return node.attr
return None
def _class_has_rpc_markers(node: ast.ClassDef) -> bool:
for stmt in node.body:
if isinstance(stmt, ast.Assign):
target_names = {target.id for target in stmt.targets if isinstance(target, ast.Name)}
if (
"PLUGIN" in target_names
and isinstance(stmt.value, ast.Constant)
and stmt.value.value
):
return True
if {"RPC_CONTENT_CLASS", "USER_ACCESS"} & target_names:
return True
if isinstance(stmt, ast.AnnAssign) and isinstance(stmt.target, ast.Name):
if (
stmt.target.id == "PLUGIN"
and isinstance(stmt.value, ast.Constant)
and stmt.value.value
):
return True
if stmt.target.id in {"RPC_CONTENT_CLASS", "USER_ACCESS"}:
return True
return False
def _class_is_candidate(node: ast.ClassDef) -> bool:
base_names = {_ast_node_name(base) for base in node.bases}
return bool(_DISCOVERY_BASE_NAMES & base_names) or _class_has_rpc_markers(node)
def _candidate_top_level_class_names(path: str) -> list[str]:
with open(path, encoding="utf-8") as file_handle:
module = ast.parse(file_handle.read(), filename=path)
return [
node.name
for node in module.body
if isinstance(node, ast.ClassDef) and _class_is_candidate(node)
]
@lru_cache(maxsize=64)
def _find_package_roots(module_name: str) -> tuple[str, ...]:
spec = importlib_util.find_spec(module_name)
if spec is None:
raise ModuleNotFoundError(module_name)
package_roots = tuple(spec.submodule_search_locations or ())
if package_roots:
return package_roots
if spec.origin:
return (os.path.dirname(spec.origin),)
raise ModuleNotFoundError(module_name)
def _discover_class_references_from_roots(
module_prefix: str,
package_roots: Iterable[str],
*,
file_name_filter: Callable[[str], bool],
candidate_filter: Callable[[ast.ClassDef], bool],
) -> tuple[BECClassReference, ...]:
references: list[BECClassReference] = []
seen_names: set[str] = set()
for package_root in package_roots:
for root, _, files in sorted(os.walk(package_root)):
for file_name in sorted(files):
if not file_name_filter(file_name):
continue
path = os.path.join(root, file_name)
with open(path, encoding="utf-8") as file_handle:
module = ast.parse(file_handle.read(), filename=path)
rel_path = os.path.relpath(path, package_root).removesuffix(".py")
module_name = ".".join([module_prefix, *rel_path.split(os.sep)])
for node in module.body:
if not isinstance(node, ast.ClassDef) or not candidate_filter(node):
continue
if node.name in seen_names:
continue
references.append(BECClassReference(name=node.name, module=module_name))
seen_names.add(node.name)
return tuple(references)
def _iter_candidate_modules(repo_name: str, package: str) -> Iterable[tuple[str, str, list[str]]]:
try:
package_roots = _find_package_roots(f"{repo_name}.{package}")
except ModuleNotFoundError:
return ()
modules: list[tuple[str, str, list[str]]] = []
for directory in package_roots:
for root, _, files in sorted(os.walk(directory)):
for file_name in sorted(files):
if (
not file_name.endswith(".py")
or file_name.startswith("__")
or file_name.startswith("register_")
or file_name.endswith("_plugin.py")
):
continue
path = os.path.join(root, file_name)
rel_dir = os.path.dirname(os.path.relpath(path, directory))
module_name = (
file_name.removesuffix(".py")
if rel_dir in ("", ".")
else ".".join(rel_dir.split(os.sep) + [file_name.removesuffix(".py")])
)
class_names = _candidate_top_level_class_names(path)
if class_names:
modules.append((f"{repo_name}.{package}.{module_name}", path, class_names))
return tuple(modules)
def _collect_classes_from_package(repo_name: str, package: str) -> BECClassContainer:
"""Collect classes from a package subtree (for example ``widgets`` or ``applications``)."""
collection = BECClassContainer()
for module_name, path, _ in _iter_candidate_modules(repo_name, package):
from qtpy.QtWidgets import QWidget
try:
anchor_module = importlib.import_module(f"{repo_name}.{package}")
except ModuleNotFoundError as exc:
# Some plugin repositories expose only one subtree. Skip gracefully if it does not exist.
if exc.name == f"{repo_name}.{package}":
return collection
raise
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils.bec_widget import BECWidget
module = importlib.import_module(module_name)
for name, obj in inspect.getmembers(module, inspect.isclass):
if obj.__module__ != module.__name__:
directory = os.path.dirname(anchor_module.__file__)
for root, _, files in sorted(os.walk(directory)):
for file in files:
if not file.endswith(".py") or file.startswith("__"):
continue
class_info = BECClassInfo(name=name, module=module.__name__, file=path, obj=obj)
if issubclass(obj, BECConnector):
class_info.is_connector = True
if issubclass(obj, QWidget) or issubclass(obj, BECWidget):
class_info.is_widget = True
if hasattr(obj, "PLUGIN") and obj.PLUGIN:
class_info.is_plugin = True
collection.add_class(class_info)
path = os.path.join(root, file)
rel_dir = os.path.dirname(os.path.relpath(path, directory))
if rel_dir in ("", "."):
module_name = file.split(".")[0]
else:
module_name = ".".join(rel_dir.split(os.sep) + [file.split(".")[0]])
module = importlib.import_module(f"{repo_name}.{package}.{module_name}")
for name in dir(module):
obj = getattr(module, name)
if not hasattr(obj, "__module__") or obj.__module__ != module.__name__:
continue
if isinstance(obj, type):
class_info = BECClassInfo(name=name, module=module.__name__, file=path, obj=obj)
if issubclass(obj, BECConnector):
class_info.is_connector = True
if issubclass(obj, QWidget) or issubclass(obj, BECWidget):
class_info.is_widget = True
if hasattr(obj, "PLUGIN") and obj.PLUGIN:
class_info.is_plugin = True
collection.add_class(class_info)
return collection
def _build_ast_inheritance_map(
repo_name: str, packages: tuple[str, ...]
) -> dict[str, tuple[str, set[str]]]:
"""
Walk all candidate modules in the given packages and return a map of:
class_name -> (module_name, {direct_base_names})
This is used for the transitive-closure widget discovery so that subclasses
of discovered widget bases are themselves discoverable without needing to
repeat ``PLUGIN = True`` on every intermediate or leaf class.
"""
mapping: dict[str, tuple[str, set[str]]] = {}
for package in packages:
try:
package_roots = _find_package_roots(f"{repo_name}.{package}")
except ModuleNotFoundError:
continue
for directory in package_roots:
for root, _, files in sorted(os.walk(directory)):
for file_name in sorted(files):
if (
not file_name.endswith(".py")
or file_name.startswith("__")
or file_name.startswith("register_")
or file_name.endswith("_plugin.py")
):
continue
path = os.path.join(root, file_name)
rel_dir = os.path.dirname(os.path.relpath(path, directory))
module_name = (
file_name.removesuffix(".py")
if rel_dir in ("", ".")
else ".".join(rel_dir.split(os.sep) + [file_name.removesuffix(".py")])
)
full_module = f"{repo_name}.{package}.{module_name}"
with open(path, encoding="utf-8") as fh:
tree = ast.parse(fh.read(), filename=path)
for node in tree.body:
if not isinstance(node, ast.ClassDef):
continue
base_names = {_ast_node_name(b) for b in node.bases} - {None}
mapping[node.name] = (full_module, base_names) # type: ignore[arg-type]
return mapping
@lru_cache(maxsize=32)
def _cached_custom_class_references(
repo_name: str, packages: tuple[str, ...]
) -> tuple[BECClassReference, ...]:
"""Discover widget/connector class references using a transitive-closure AST scan.
The first pass identifies classes that directly inherit from
``_DISCOVERY_BASE_NAMES`` or carry explicit RPC markers (``PLUGIN``,
``USER_ACCESS``, ``RPC_CONTENT_CLASS``). Subsequent passes treat every
newly found class name as an additional base name, so subclasses of
subclasses are discovered automatically without requiring each
intermediate class to repeat ``PLUGIN = True``.
"""
inheritance_map = _build_ast_inheritance_map(repo_name, packages)
# Seed with _class_has_rpc_markers — we need the AST nodes for that check.
# Re-parse only to identify initial RPC-marker classes; inheritance_map
# already has everything else we need.
rpc_marker_names: set[str] = set()
for package in packages:
try:
package_roots = _find_package_roots(f"{repo_name}.{package}")
except ModuleNotFoundError:
continue
for directory in package_roots:
for root, _, files in sorted(os.walk(directory)):
for file_name in sorted(files):
if (
not file_name.endswith(".py")
or file_name.startswith("__")
or file_name.startswith("register_")
or file_name.endswith("_plugin.py")
):
continue
path = os.path.join(root, file_name)
with open(path, encoding="utf-8") as fh:
tree = ast.parse(fh.read(), filename=path)
for node in tree.body:
if isinstance(node, ast.ClassDef) and _class_has_rpc_markers(node):
rpc_marker_names.add(node.name)
# Transitive closure: start with known base names + RPC-marker classes,
# then repeatedly add classes whose direct bases are already known.
known: set[str] = set(_DISCOVERY_BASE_NAMES) | rpc_marker_names
changed = True
while changed:
changed = False
for class_name, (_, bases) in inheritance_map.items():
if class_name not in known and bases & known:
known.add(class_name)
changed = True
# Build the final list of references, preserving first-seen order and
# keeping only classes that are in the inheritance map (i.e. have a module).
references: list[BECClassReference] = []
seen_names: set[str] = set()
for class_name, (module_name, bases) in inheritance_map.items():
if class_name not in known:
continue
if class_name in seen_names:
continue
# Only emit if the class is actually a candidate (not just a raw base)
if class_name in _DISCOVERY_BASE_NAMES:
continue
references.append(BECClassReference(name=class_name, module=module_name))
seen_names.add(class_name)
return tuple(references)
def get_custom_class_references(
repo_name: str, packages: tuple[str, ...] | None = None, *, use_cache: bool = True
) -> list[BECClassReference]:
selected_packages = packages or ("widgets",)
if use_cache:
return list(_cached_custom_class_references(repo_name, tuple(selected_packages)))
_cached_custom_class_references.cache_clear()
return list(_cached_custom_class_references(repo_name, tuple(selected_packages)))
def get_custom_classes(
repo_name: str, packages: tuple[str, ...] | None = None
) -> BECClassContainer:
+2 -1
View File
@@ -14,17 +14,18 @@ from qtpy.QtCore import Qt, QTimer
from qtpy.QtWidgets import QWidget
from redis.exceptions import RedisError
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils.bec_dispatcher import BECDispatcher
from bec_widgets.utils.container_utils import WidgetContainerUtils
from bec_widgets.utils.error_popups import ErrorPopupUtility
from bec_widgets.utils.rpc_register import RPCRegister
from bec_widgets.utils.screen_utils import apply_window_geometry
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea
from bec_widgets.widgets.containers.main_window.main_window import BECMainWindow, BECMainWindowNoRPC
if TYPE_CHECKING: # pragma: no cover
from bec_lib import messages
from qtpy.QtCore import QObject
else:
messages = lazy_import("bec_lib.messages")
logger = bec_logger.logger
+57
View File
@@ -0,0 +1,57 @@
from __future__ import annotations
from bec_widgets.cli.client_utils import IGNORE_WIDGETS
from bec_widgets.utils.bec_plugin_helper import get_all_plugin_widgets
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.plugin_utils import get_custom_classes
class RPCWidgetHandler:
"""Handler class for creating widgets from RPC messages."""
def __init__(self):
self._widget_classes = None
@property
def widget_classes(self) -> dict[str, type[BECWidget]]:
"""
Get the available widget classes.
Returns:
dict: The available widget classes.
"""
if self._widget_classes is None:
self.update_available_widgets()
return self._widget_classes # type: ignore
def update_available_widgets(self):
"""
Update the available widgets.
Returns:
None
"""
self._widget_classes = (
get_custom_classes("bec_widgets", packages=("widgets", "applications"))
+ get_all_plugin_widgets()
).as_dict(IGNORE_WIDGETS)
def create_widget(self, widget_type, **kwargs) -> BECWidget:
"""
Create a widget from an RPC message.
Args:
widget_type(str): The type of the widget.
name (str): The name of the widget.
**kwargs: The keyword arguments for the widget.
Returns:
widget(BECWidget): The created widget.
"""
widget_class = self.widget_classes.get(widget_type) # type: ignore
if widget_class:
return widget_class(**kwargs)
raise ValueError(f"Unknown widget type: {widget_type}")
widget_handler = RPCWidgetHandler()
@@ -13,9 +13,9 @@ from shiboken6 import isValid
import bec_widgets.widgets.containers.qt_ads as QtAds
from bec_widgets import BECWidget, SafeSlot
from bec_widgets.cli.rpc.rpc_widget_handler import widget_handler
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils.property_editor import PropertyEditor
from bec_widgets.utils.rpc_widget_handler import widget_handler
from bec_widgets.utils.toolbars.actions import MaterialIconAction
from bec_widgets.widgets.containers.qt_ads import (
CDockAreaWidget,
@@ -20,10 +20,10 @@ from qtpy.QtWidgets import (
import bec_widgets.widgets.containers.qt_ads as QtAds
from bec_widgets import BECWidget, SafeProperty, SafeSlot
from bec_widgets.applications.views.view import ViewTourSteps
from bec_widgets.cli.rpc.rpc_widget_handler import widget_handler
from bec_widgets.utils.bec_dispatcher import BECDispatcher
from bec_widgets.utils.colors import apply_theme
from bec_widgets.utils.rpc_decorator import rpc_timeout
from bec_widgets.utils.rpc_widget_handler import widget_handler
from bec_widgets.utils.toolbars.actions import (
ExpandableMenuAction,
MaterialIconAction,
@@ -35,25 +35,25 @@ from bec_widgets.utils.widget_state_manager import WidgetStateManager
from bec_widgets.widgets.containers.dock_area.basic_dock_area import DockAreaWidget
from bec_widgets.widgets.containers.dock_area.profile_utils import (
SETTINGS_KEYS,
default_profile_candidates,
baseline_profile_candidates,
delete_profile_files,
get_last_profile,
is_profile_read_only,
is_quick_select,
list_profiles,
list_quick_profiles,
load_default_profile_screenshot,
load_user_profile_screenshot,
load_baseline_profile_screenshot,
load_runtime_profile_screenshot,
now_iso_utc,
open_default_settings,
open_user_settings,
open_baseline_settings,
open_runtime_settings,
profile_origin,
profile_origin_display,
read_manifest,
restore_user_from_default,
restore_runtime_from_baseline,
runtime_profile_candidates,
set_last_profile,
set_quick_select,
user_profile_candidates,
write_manifest,
)
from bec_widgets.widgets.containers.dock_area.settings.dialogs import (
@@ -68,7 +68,7 @@ from bec_widgets.widgets.containers.dock_area.toolbar_components.workspace_actio
from bec_widgets.widgets.containers.main_window.main_window import BECMainWindowNoRPC
from bec_widgets.widgets.containers.qt_ads import CDockWidget
from bec_widgets.widgets.control.device_control.positioner_box import PositionerBox, PositionerBox2D
from bec_widgets.widgets.control.scan_control.scan_control import ScanControl
from bec_widgets.widgets.control.scan_control import ScanControl
from bec_widgets.widgets.editors.bec_console.bec_console import BecConsole, BECShell
from bec_widgets.widgets.plots.heatmap.heatmap import Heatmap
from bec_widgets.widgets.plots.image.image import Image
@@ -79,7 +79,7 @@ from bec_widgets.widgets.plots.waveform.waveform import Waveform
from bec_widgets.widgets.progress.ring_progress_bar.ring_progress_bar import RingProgressBar
from bec_widgets.widgets.services.bec_queue.bec_queue import BECQueue
from bec_widgets.widgets.services.bec_status_box.bec_status_box import BECStatusBox
from bec_widgets.widgets.utility.logpanel import LogPanel
from bec_widgets.widgets.utility.logpanel.logpanel import LogPanel
from bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button import DarkModeButton
logger = bec_logger.logger
@@ -108,6 +108,7 @@ class BECDockArea(DockAreaWidget):
"list_profiles",
"save_profile",
"load_profile",
"restore_baseline_profile",
"delete_profile",
]
@@ -235,11 +236,8 @@ class BECDockArea(DockAreaWidget):
def _load_initial_profile(self, name: str) -> None:
"""Load the initial profile."""
self.load_profile(name)
combo = self.toolbar.components.get_action("workspace_combo").widget
combo.blockSignals(True)
if not self._empty_profile_active:
combo.setCurrentText(name)
combo.blockSignals(False)
self._set_workspace_combo_text_silent(name)
def _start_empty_workspace(self) -> None:
"""
@@ -376,6 +374,7 @@ class BECDockArea(DockAreaWidget):
"bec_shell": (BECShell.ICON_NAME, "Add BEC Shell", "BECShell"),
"log_panel": (LogPanel.ICON_NAME, "Add LogPanel - Disabled", "LogPanel"),
"sbb_monitor": ("train", "Add SBB Monitor", "SBBMonitor"),
"log_panel": (LogPanel.ICON_NAME, "Add LogPanel", "LogPanel"),
}
# Create expandable menu actions (original behavior)
@@ -487,9 +486,7 @@ class BECDockArea(DockAreaWidget):
# first two items not needed for this part
for key, (_, _, widget_type) in mapping.items():
act = menu.actions[key].action
if widget_type == "LogPanel":
act.setEnabled(False) # keep disabled per issue #644
elif key == "terminal":
if key == "terminal":
act.triggered.connect(
lambda _, t=widget_type: self.new(widget=t, closable=True, startup_cmd=None)
)
@@ -510,10 +507,7 @@ class BECDockArea(DockAreaWidget):
for action_id, (_, _, widget_type) in mapping.items():
flat_action_id = f"flat_{action_id}"
flat_action = self.toolbar.components.get_action(flat_action_id).action
if widget_type == "LogPanel":
flat_action.setEnabled(False) # keep disabled per issue #644
else:
flat_action.triggered.connect(lambda _, t=widget_type: self.new(t))
flat_action.triggered.connect(lambda _, t=widget_type: self.new(t))
_connect_flat_actions(self._ACTION_MAPPINGS["menu_plots"])
_connect_flat_actions(self._ACTION_MAPPINGS["menu_devices"])
@@ -595,13 +589,13 @@ class BECDockArea(DockAreaWidget):
@property
def profile_namespace(self) -> str | None:
"""Namespace used to scope user/default profile files for this dock area."""
"""Namespace used to scope runtime/baseline profile files for this dock area."""
return self._resolve_profile_namespace()
def _profile_exists(self, name: str, namespace: str | None) -> bool:
return any(
os.path.exists(path) for path in user_profile_candidates(name, namespace)
) or any(os.path.exists(path) for path in default_profile_candidates(name, namespace))
os.path.exists(path) for path in runtime_profile_candidates(name, namespace)
) or any(os.path.exists(path) for path in baseline_profile_candidates(name, namespace))
def _write_snapshot_to_settings(self, settings, save_preview: bool = True) -> None:
"""
@@ -627,35 +621,34 @@ class BECDockArea(DockAreaWidget):
name: str,
namespace: str | None,
*,
write_default: bool = True,
write_user: bool = True,
write_baseline: bool = True,
write_runtime: bool = True,
save_preview: bool = True,
) -> None:
"""
Write profile settings to default and/or user settings files.
Write profile settings to baseline and/or runtime settings files.
Args:
name: The profile name.
namespace: The profile namespace.
write_default: Whether to write to the default settings file.
write_user: Whether to write to the user settings file.
write_baseline: Whether to write to the baseline settings file.
write_runtime: Whether to write to the runtime settings file.
save_preview: Whether to save a screenshot preview.
"""
if write_default:
ds = open_default_settings(name, namespace=namespace)
self._write_snapshot_to_settings(ds, save_preview=save_preview)
if not ds.value(SETTINGS_KEYS["created_at"], ""):
ds.setValue(SETTINGS_KEYS["created_at"], now_iso_utc())
if not ds.value(SETTINGS_KEYS["is_quick_select"], None):
ds.setValue(SETTINGS_KEYS["is_quick_select"], True)
if write_user:
us = open_user_settings(name, namespace=namespace)
self._write_snapshot_to_settings(us, save_preview=save_preview)
if not us.value(SETTINGS_KEYS["created_at"], ""):
us.setValue(SETTINGS_KEYS["created_at"], now_iso_utc())
if not us.value(SETTINGS_KEYS["is_quick_select"], None):
us.setValue(SETTINGS_KEYS["is_quick_select"], True)
def _write_settings(open_settings) -> None:
settings = open_settings(name, namespace=namespace)
self._write_snapshot_to_settings(settings, save_preview=save_preview)
if not settings.value(SETTINGS_KEYS["created_at"], ""):
settings.setValue(SETTINGS_KEYS["created_at"], now_iso_utc())
if not settings.value(SETTINGS_KEYS["is_quick_select"], None):
settings.setValue(SETTINGS_KEYS["is_quick_select"], True)
if write_baseline:
_write_settings(open_baseline_settings)
if write_runtime:
_write_settings(open_runtime_settings)
def _finalize_profile_change(self, name: str, namespace: str | None) -> None:
"""
@@ -673,6 +666,14 @@ class BECDockArea(DockAreaWidget):
combo = self.toolbar.components.get_action("workspace_combo").widget
combo.refresh_profiles(active_profile=name)
def _set_workspace_combo_text_silent(self, text: str) -> None:
combo = self.toolbar.components.get_action("workspace_combo").widget
was_blocked = combo.blockSignals(True)
try:
combo.setCurrentText(text)
finally:
combo.blockSignals(was_blocked)
def _enter_empty_profile_state(self) -> None:
"""
Switch to the transient empty workspace state.
@@ -709,10 +710,10 @@ class BECDockArea(DockAreaWidget):
Save the current workspace profile.
On first save of a given name:
- writes a default copy to states/default/<name>.ini with tag=default and created_at
- writes a user copy to states/user/<name>.ini with tag=user and created_at
On subsequent saves of user-owned profiles:
- updates both the default and user copies so restore uses the latest snapshot.
- writes a baseline copy to profiles/baseline/<name>.ini with created_at
- writes a runtime copy to profiles/runtime/<name>.ini with created_at
On subsequent saves:
- updates both the baseline and runtime copies so restore uses the latest snapshot.
Read-only bundled profiles cannot be overwritten.
Args:
@@ -776,7 +777,7 @@ class BECDockArea(DockAreaWidget):
overwrite_existing = origin == "settings"
origin_before_save = profile_origin(name, namespace=namespace)
overwrite_default = overwrite_existing and origin_before_save == "settings"
overwrite_baseline = overwrite_existing and origin_before_save == "settings"
# Display saving placeholder in toolbar
workspace_combo = self.toolbar.components.get_action("workspace_combo").widget
@@ -785,12 +786,12 @@ class BECDockArea(DockAreaWidget):
workspace_combo.setCurrentIndex(0)
workspace_combo.blockSignals(False)
# Write to default and/or user settings
should_write_default = overwrite_default or not any(
os.path.exists(path) for path in default_profile_candidates(name, namespace)
# Write to baseline and/or runtime settings
should_write_baseline = overwrite_baseline or not any(
os.path.exists(path) for path in baseline_profile_candidates(name, namespace)
)
self._write_profile_settings(
name, namespace, write_default=should_write_default, write_user=True
name, namespace, write_baseline=should_write_baseline, write_runtime=True
)
set_quick_select(name, quickselect, namespace=namespace)
@@ -800,7 +801,6 @@ class BECDockArea(DockAreaWidget):
self._pending_autosave_skip = (current_profile, name)
else:
self._pending_autosave_skip = None
workspace_combo.setCurrentText(name)
self._finalize_profile_change(name, namespace)
@SafeSlot()
@@ -820,16 +820,21 @@ class BECDockArea(DockAreaWidget):
@SafeSlot()
@SafeSlot(str)
@SafeSlot(str, bool)
@rpc_timeout(None)
def load_profile(self, name: str | None = None):
def load_profile(self, name: str | None = None, restore_baseline: bool = False):
"""
Load a workspace profile.
Before switching, persist the current profile to the user copy.
Prefer loading the user copy; fall back to the default copy.
Before switching, persist the current profile to the runtime copy.
Prefer loading the runtime copy; fall back to the baseline copy. When
``restore_baseline`` is True, first overwrite the runtime copy with the
baseline profile and then load it.
Args:
name (str | None): The name of the profile to load. If None, prompts the user.
restore_baseline (bool): If True, restore the runtime copy from the
baseline before loading. Defaults to False.
"""
if name == "":
return
@@ -848,14 +853,17 @@ class BECDockArea(DockAreaWidget):
if skip_pair and skip_pair == (prev_name, name):
self._pending_autosave_skip = None
else:
us_prev = open_user_settings(prev_name, namespace=namespace)
us_prev = open_runtime_settings(prev_name, namespace=namespace)
self._write_snapshot_to_settings(us_prev, save_preview=True)
if restore_baseline:
restore_runtime_from_baseline(name, namespace=namespace)
settings = None
if any(os.path.exists(path) for path in user_profile_candidates(name, namespace)):
settings = open_user_settings(name, namespace=namespace)
elif any(os.path.exists(path) for path in default_profile_candidates(name, namespace)):
settings = open_default_settings(name, namespace=namespace)
if any(os.path.exists(path) for path in runtime_profile_candidates(name, namespace)):
settings = open_runtime_settings(name, namespace=namespace)
elif any(os.path.exists(path) for path in baseline_profile_candidates(name, namespace)):
settings = open_baseline_settings(name, namespace=namespace)
if settings is None:
logger.warning(f"Profile '{name}' not found in namespace '{namespace}'. Creating new.")
self.delete_all()
@@ -897,32 +905,36 @@ class BECDockArea(DockAreaWidget):
@SafeSlot()
@SafeSlot(str)
def restore_user_profile_from_default(self, name: str | None = None):
@SafeSlot(str, bool)
@rpc_timeout(None)
def restore_baseline_profile(self, name: str | None = None, show_dialog: bool = False):
"""
Overwrite the user copy of *name* with the default baseline.
Overwrite the runtime copy of *name* with the baseline.
If *name* is None, target the currently active profile.
Args:
name (str | None): The name of the profile to restore. If None, uses the current profile.
show_dialog (bool): If True, ask for confirmation before restoring.
"""
target = name or getattr(self, "_current_profile_name", None)
if not target:
return
namespace = self.profile_namespace
current_pixmap = None
if self.isVisible():
current_pixmap = QPixmap()
ba = bytes(self.screenshot_bytes())
current_pixmap.loadFromData(ba)
if current_pixmap is None or current_pixmap.isNull():
current_pixmap = load_user_profile_screenshot(target, namespace=namespace)
default_pixmap = load_default_profile_screenshot(target, namespace=namespace)
if show_dialog:
current_pixmap = None
if self.isVisible():
current_pixmap = QPixmap()
ba = bytes(self.screenshot_bytes())
current_pixmap.loadFromData(ba)
if current_pixmap is None or current_pixmap.isNull():
current_pixmap = load_runtime_profile_screenshot(target, namespace=namespace)
baseline_pixmap = load_baseline_profile_screenshot(target, namespace=namespace)
if not RestoreProfileDialog.confirm(self, current_pixmap, default_pixmap):
return
if not RestoreProfileDialog.confirm(self, current_pixmap, baseline_pixmap):
return
restore_user_from_default(target, namespace=namespace)
restore_runtime_from_baseline(target, namespace=namespace)
self.delete_all()
self.load_profile(target)
@@ -1057,7 +1069,7 @@ class BECDockArea(DockAreaWidget):
manage_action = self.toolbar.components.get_action("manage_workspaces").action
if self.manage_dialog is None or not self.manage_dialog.isVisible():
self.manage_widget = WorkSpaceManager(
self, target_widget=self, default_profile=self._current_profile_name
self, target_widget=self, active_profile=self._current_profile_name
)
self.manage_dialog = QDialog(modal=False)
@@ -1156,7 +1168,7 @@ class BECDockArea(DockAreaWidget):
return
namespace = self.profile_namespace
settings = open_user_settings(name, namespace=namespace)
settings = open_runtime_settings(name, namespace=namespace)
self._write_snapshot_to_settings(settings)
set_last_profile(name, namespace=namespace, instance=self._last_profile_instance_id())
self._exit_snapshot_written = True
@@ -2,9 +2,13 @@
Utilities for managing BECDockArea profiles stored in INI files.
Policy:
- All created/modified profiles are stored under the BEC settings root: <base_path>/profiles/{default,user}
- Bundled read-only defaults are discovered in BW core states/default and plugin bec_widgets/profiles but never written to.
- Lookup order when reading: user settings default app or plugin bundled default.
- All created/modified profiles are stored under the BEC settings root:
<base_path>/profiles/{baseline,runtime}
- Bundled read-only baselines are discovered in BW core profiles and plugin
bec_widgets/profiles but never written to.
- Lookup order when reading: runtime settings baseline app or plugin bundled baseline.
- Legacy settings paths profiles/{default,user} are read through a thin segment
alias layer and copied to the canonical location on first access.
"""
from __future__ import annotations
@@ -32,6 +36,12 @@ logger = bec_logger.logger
MODULE_PATH = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
ProfileOrigin = Literal["module", "plugin", "settings", "unknown"]
ProfileSegment = Literal["baseline", "runtime"]
_PROFILE_SEGMENT_ALIASES: dict[ProfileSegment, tuple[str, str]] = {
"baseline": ("baseline", "default"),
"runtime": ("runtime", "user"),
}
def module_profiles_dir() -> str:
@@ -130,7 +140,7 @@ def _profiles_dir(segment: str, namespace: str | None) -> str:
Build (and ensure) the directory that holds profiles for a namespace segment.
Args:
segment (str): Either ``"user"`` or ``"default"``.
segment (str): Profile segment directory name.
namespace (str | None): Optional namespace label to scope profiles.
Returns:
@@ -143,157 +153,175 @@ def _profiles_dir(segment: str, namespace: str | None) -> str:
return path
def _user_path_candidates(name: str, namespace: str | None) -> list[str]:
"""
Generate candidate user-profile paths honoring namespace fallbacks.
Args:
name (str): Profile name without extension.
namespace (str | None): Optional namespace label.
Returns:
list[str]: Ordered list of candidate user profile paths (.ini files).
"""
def _candidate_namespaces(namespace: str | None) -> list[str | None]:
ns = slugify.slugify(namespace, separator="_") if namespace else None
primary = os.path.join(_profiles_dir("user", ns), f"{name}.ini")
if not ns:
return [primary]
legacy = os.path.join(_profiles_dir("user", None), f"{name}.ini")
return [primary, legacy] if legacy != primary else [primary]
return [None]
return [ns, None]
def _default_path_candidates(name: str, namespace: str | None) -> list[str]:
def _segment_profile_path(segment_name: str, name: str, namespace: str | None) -> str:
return os.path.join(_profiles_dir(segment_name, namespace), f"{name}.ini")
def _canonical_profile_path(segment: ProfileSegment, name: str, namespace: str | None) -> str:
return _segment_profile_path(_PROFILE_SEGMENT_ALIASES[segment][0], name, namespace)
def _segment_path_candidates(
segment: ProfileSegment,
name: str,
namespace: str | None,
*,
include_legacy: bool = True,
migrate_legacy: bool = True,
) -> list[str]:
"""
Generate candidate default-profile paths honoring namespace fallbacks.
Generate profile candidates for a canonical segment.
Args:
name (str): Profile name without extension.
namespace (str | None): Optional namespace label.
Returns:
list[str]: Ordered list of candidate default profile paths (.ini files).
Canonical baseline/runtime files are always preferred. Namespace fallback
files and legacy default/user files are copied to the primary canonical path
when the primary file does not exist.
"""
ns = slugify.slugify(namespace, separator="_") if namespace else None
primary = os.path.join(_profiles_dir("default", ns), f"{name}.ini")
if not ns:
return [primary]
legacy = os.path.join(_profiles_dir("default", None), f"{name}.ini")
return [primary, legacy] if legacy != primary else [primary]
canonical = [
_segment_profile_path(_PROFILE_SEGMENT_ALIASES[segment][0], name, ns)
for ns in _candidate_namespaces(namespace)
]
legacy = []
if include_legacy:
legacy = [
_segment_profile_path(_PROFILE_SEGMENT_ALIASES[segment][1], name, ns)
for ns in _candidate_namespaces(namespace)
]
primary_canonical = canonical[0]
if migrate_legacy and not os.path.exists(primary_canonical):
canonical_src = next((path for path in canonical[1:] if os.path.exists(path)), None)
if canonical_src:
os.makedirs(os.path.dirname(primary_canonical), exist_ok=True)
shutil.copy2(canonical_src, primary_canonical)
elif include_legacy:
legacy_src = next((path for path in legacy if os.path.exists(path)), None)
if legacy_src:
os.makedirs(os.path.dirname(primary_canonical), exist_ok=True)
shutil.copy2(legacy_src, primary_canonical)
return list(dict.fromkeys(canonical + legacy))
def default_profiles_dir(namespace: str | None = None) -> str:
def baseline_profiles_dir(namespace: str | None = None) -> str:
"""
Return the directory that stores default profiles for the namespace.
Return the directory that stores baseline profiles for the namespace.
Args:
namespace (str | None, optional): Namespace label. Defaults to ``None``.
Returns:
str: Absolute path to the default profile directory.
str: Absolute path to the baseline profile directory.
"""
return _profiles_dir("default", namespace)
return _profiles_dir("baseline", namespace)
def user_profiles_dir(namespace: str | None = None) -> str:
def runtime_profiles_dir(namespace: str | None = None) -> str:
"""
Return the directory that stores user profiles for the namespace.
Return the directory that stores runtime profiles for the namespace.
Args:
namespace (str | None, optional): Namespace label. Defaults to ``None``.
Returns:
str: Absolute path to the user profile directory.
str: Absolute path to the runtime profile directory.
"""
return _profiles_dir("user", namespace)
return _profiles_dir("runtime", namespace)
def default_profile_path(name: str, namespace: str | None = None) -> str:
def baseline_profile_path(name: str, namespace: str | None = None) -> str:
"""
Compute the canonical default profile path for a profile name.
Compute the canonical baseline profile path for a profile name.
Args:
name (str): Profile name without extension.
namespace (str | None, optional): Namespace label. Defaults to ``None``.
Returns:
str: Absolute path to the default profile file (.ini).
str: Absolute path to the baseline profile file (.ini).
"""
return _default_path_candidates(name, namespace)[0]
return _canonical_profile_path("baseline", name, namespace)
def user_profile_path(name: str, namespace: str | None = None) -> str:
def runtime_profile_path(name: str, namespace: str | None = None) -> str:
"""
Compute the canonical user profile path for a profile name.
Compute the canonical runtime profile path for a profile name.
Args:
name (str): Profile name without extension.
namespace (str | None, optional): Namespace label. Defaults to ``None``.
Returns:
str: Absolute path to the user profile file (.ini).
str: Absolute path to the runtime profile file (.ini).
"""
return _user_path_candidates(name, namespace)[0]
return _canonical_profile_path("runtime", name, namespace)
def user_profile_candidates(name: str, namespace: str | None = None) -> list[str]:
def runtime_profile_candidates(name: str, namespace: str | None = None) -> list[str]:
"""
List all user profile path candidates for a profile name.
List all runtime profile path candidates for a profile name.
Args:
name (str): Profile name without extension.
namespace (str | None, optional): Namespace label. Defaults to ``None``.
Returns:
list[str]: De-duplicated list of candidate user profile paths.
list[str]: De-duplicated list of candidate runtime profile paths.
"""
return list(dict.fromkeys(_user_path_candidates(name, namespace)))
return _segment_path_candidates("runtime", name, namespace)
def default_profile_candidates(name: str, namespace: str | None = None) -> list[str]:
def baseline_profile_candidates(name: str, namespace: str | None = None) -> list[str]:
"""
List all default profile path candidates for a profile name.
List all baseline profile path candidates for a profile name.
Args:
name (str): Profile name without extension.
namespace (str | None, optional): Namespace label. Defaults to ``None``.
Returns:
list[str]: De-duplicated list of candidate default profile paths.
list[str]: De-duplicated list of candidate baseline profile paths.
"""
return list(dict.fromkeys(_default_path_candidates(name, namespace)))
return _segment_path_candidates("baseline", name, namespace)
def _existing_user_settings(name: str, namespace: str | None = None) -> QSettings | None:
def _existing_runtime_settings(name: str, namespace: str | None = None) -> QSettings | None:
"""
Resolve the first existing user profile settings object.
Resolve the first existing runtime profile settings object.
Args:
name (str): Profile name without extension.
namespace (str | None, optional): Namespace label to search. Defaults to ``None``.
Returns:
QSettings | None: Config for the first existing user profile candidate, or ``None``
QSettings | None: Config for the first existing runtime profile candidate, or ``None``
when no files are present.
"""
for path in user_profile_candidates(name, namespace):
for path in runtime_profile_candidates(name, namespace):
if os.path.exists(path):
return QSettings(path, QSettings.IniFormat)
return None
def _existing_default_settings(name: str, namespace: str | None = None) -> QSettings | None:
def _existing_baseline_settings(name: str, namespace: str | None = None) -> QSettings | None:
"""
Resolve the first existing default profile settings object.
Resolve the first existing baseline profile settings object.
Args:
name (str): Profile name without extension.
namespace (str | None, optional): Namespace label to search. Defaults to ``None``.
Returns:
QSettings | None: Config for the first existing default profile candidate, or ``None``
QSettings | None: Config for the first existing baseline profile candidate, or ``None``
when no files are present.
"""
for path in default_profile_candidates(name, namespace):
for path in baseline_profile_candidates(name, namespace):
if os.path.exists(path):
return QSettings(path, QSettings.IniFormat)
return None
@@ -347,7 +375,7 @@ def profile_origin(name: str, namespace: str | None = None) -> ProfileOrigin:
plugin_path = plugin_profile_path(name)
if plugin_path and os.path.exists(plugin_path):
return "plugin"
for path in user_profile_candidates(name, namespace) + default_profile_candidates(
for path in runtime_profile_candidates(name, namespace) + baseline_profile_candidates(
name, namespace
):
if os.path.exists(path):
@@ -406,8 +434,8 @@ def delete_profile_files(name: str, namespace: str | None = None) -> bool:
read_only = is_profile_read_only(name, namespace)
removed = False
# Always allow removing user copies; keep default copies for read-only origins.
for path in set(user_profile_candidates(name, namespace)):
# Always allow removing runtime copies; keep baseline copies for read-only origins.
for path in set(runtime_profile_candidates(name, namespace)):
try:
os.remove(path)
removed = True
@@ -415,7 +443,7 @@ def delete_profile_files(name: str, namespace: str | None = None) -> bool:
continue
if not read_only:
for path in set(default_profile_candidates(name, namespace)):
for path in set(baseline_profile_candidates(name, namespace)):
try:
os.remove(path)
removed = True
@@ -443,7 +471,7 @@ SETTINGS_KEYS = {
def list_profiles(namespace: str | None = None) -> list[str]:
"""
Enumerate all known profile names, syncing bundled defaults when missing locally.
Enumerate all known profile names, syncing bundled baselines when missing locally.
Args:
namespace (str | None, optional): Namespace label scoped to the profile set.
@@ -459,16 +487,27 @@ def list_profiles(namespace: str | None = None) -> list[str]:
return set()
return {os.path.splitext(f)[0] for f in os.listdir(directory) if f.endswith(".ini")}
settings_dirs = {default_profiles_dir(namespace), user_profiles_dir(namespace)}
settings_dirs = {baseline_profiles_dir(namespace), runtime_profiles_dir(namespace)}
if ns:
settings_dirs.add(default_profiles_dir(None))
settings_dirs.add(user_profiles_dir(None))
settings_dirs.add(baseline_profiles_dir(None))
settings_dirs.add(runtime_profiles_dir(None))
for segment in ("baseline", "runtime"):
for legacy_dir in [
_profiles_dir(_PROFILE_SEGMENT_ALIASES[segment][1], item)
for item in _candidate_namespaces(namespace)
]:
settings_dirs.add(legacy_dir)
settings_names: set[str] = set()
for directory in settings_dirs:
settings_names |= _collect_from(directory)
# Also consider read-only defaults from core module and beamline plugin repositories
for name in sorted(settings_names):
runtime_profile_candidates(name, namespace)
baseline_profile_candidates(name, namespace)
# Also consider read-only baselines from core module and beamline plugin repositories
read_only_sources: dict[str, tuple[str, str]] = {}
sources: list[tuple[str, str | None]] = [
("module", module_profiles_dir()),
@@ -484,17 +523,17 @@ def list_profiles(namespace: str | None = None) -> list[str]:
read_only_sources.setdefault(name, (origin, os.path.join(directory, filename)))
for name, (_origin, src) in sorted(read_only_sources.items()):
# Ensure a copy in the namespace-specific settings default directory
dst_default = default_profile_path(name, namespace)
if not os.path.exists(dst_default):
os.makedirs(os.path.dirname(dst_default), exist_ok=True)
shutil.copyfile(src, dst_default)
# Ensure a user copy exists to allow edits in the writable settings area
dst_user = user_profile_path(name, namespace)
if not os.path.exists(dst_user):
os.makedirs(os.path.dirname(dst_user), exist_ok=True)
shutil.copyfile(src, dst_user)
s = open_user_settings(name, namespace)
# Ensure a copy in the namespace-specific settings baseline directory.
dst_baseline = baseline_profile_path(name, namespace)
if not os.path.exists(dst_baseline):
os.makedirs(os.path.dirname(dst_baseline), exist_ok=True)
shutil.copy2(src, dst_baseline)
# Ensure a runtime copy exists to allow edits in the writable settings area.
dst_runtime = runtime_profile_path(name, namespace)
if not os.path.exists(dst_runtime):
os.makedirs(os.path.dirname(dst_runtime), exist_ok=True)
shutil.copy2(src, dst_runtime)
s = open_runtime_settings(name, namespace)
if s.value(SETTINGS_KEYS["created_at"], "") == "":
s.setValue(SETTINGS_KEYS["created_at"], now_iso_utc())
@@ -504,32 +543,34 @@ def list_profiles(namespace: str | None = None) -> list[str]:
return sorted(settings_names)
def open_default_settings(name: str, namespace: str | None = None) -> QSettings:
def open_baseline_settings(name: str, namespace: str | None = None) -> QSettings:
"""
Open (and create if necessary) the default profile settings file.
Open (and create if necessary) the baseline profile settings file.
Args:
name (str): Profile name without extension.
namespace (str | None, optional): Namespace label. Defaults to ``None``.
Returns:
QSettings: Settings instance targeting the default profile file.
QSettings: Settings instance targeting the baseline profile file.
"""
return QSettings(default_profile_path(name, namespace), QSettings.IniFormat)
baseline_profile_candidates(name, namespace)
return QSettings(baseline_profile_path(name, namespace), QSettings.IniFormat)
def open_user_settings(name: str, namespace: str | None = None) -> QSettings:
def open_runtime_settings(name: str, namespace: str | None = None) -> QSettings:
"""
Open (and create if necessary) the user profile settings file.
Open (and create if necessary) the runtime profile settings file.
Args:
name (str): Profile name without extension.
namespace (str | None, optional): Namespace label. Defaults to ``None``.
Returns:
QSettings: Settings instance targeting the user profile file.
QSettings: Settings instance targeting the runtime profile file.
"""
return QSettings(user_profile_path(name, namespace), QSettings.IniFormat)
runtime_profile_candidates(name, namespace)
return QSettings(runtime_profile_path(name, namespace), QSettings.IniFormat)
def _app_settings() -> QSettings:
@@ -759,26 +800,26 @@ def read_manifest(settings: QSettings) -> list[dict]:
return items
def restore_user_from_default(name: str, namespace: str | None = None) -> None:
def restore_runtime_from_baseline(name: str, namespace: str | None = None) -> None:
"""
Copy the default profile to the user profile, preserving quick-select flag.
Copy the baseline profile to the runtime profile, preserving quick-select flag.
Args:
name(str): Profile name without extension.
namespace(str | None, optional): Namespace label. Defaults to ``None``.
"""
src = None
for candidate in default_profile_candidates(name, namespace):
for candidate in baseline_profile_candidates(name, namespace):
if os.path.exists(candidate):
src = candidate
break
if not src:
return
dst = user_profile_path(name, namespace)
dst = runtime_profile_path(name, namespace)
preserve_quick_select = is_quick_select(name, namespace)
os.makedirs(os.path.dirname(dst), exist_ok=True)
shutil.copyfile(src, dst)
s = open_user_settings(name, namespace)
s = open_runtime_settings(name, namespace)
if not s.value(SETTINGS_KEYS["created_at"], ""):
s.setValue(SETTINGS_KEYS["created_at"], now_iso_utc())
if preserve_quick_select:
@@ -796,9 +837,9 @@ def is_quick_select(name: str, namespace: str | None = None) -> bool:
Returns:
bool: True if quick-select is enabled for the profile.
"""
s = _existing_user_settings(name, namespace)
s = _existing_runtime_settings(name, namespace)
if s is None:
s = _existing_default_settings(name, namespace)
s = _existing_baseline_settings(name, namespace)
if s is None:
return False
return s.value(SETTINGS_KEYS["is_quick_select"], False, type=bool)
@@ -813,13 +854,13 @@ def set_quick_select(name: str, enabled: bool, namespace: str | None = None) ->
enabled(bool): True to enable quick-select, False to disable.
namespace(str | None, optional): Namespace label. Defaults to ``None``.
"""
s = open_user_settings(name, namespace)
s = open_runtime_settings(name, namespace)
s.setValue(SETTINGS_KEYS["is_quick_select"], bool(enabled))
def list_quick_profiles(namespace: str | None = None) -> list[str]:
"""
List only profiles that have quick-select enabled (user wins over default).
List only profiles that have quick-select enabled (runtime wins over baseline).
Args:
namespace(str | None, optional): Namespace label. Defaults to ``None``.
@@ -909,8 +950,8 @@ class ProfileInfo(BaseModel):
is_quick_select: bool = False
widget_count: int = 0
size_kb: int = 0
user_path: str = ""
default_path: str = ""
runtime_path: str = ""
baseline_path: str = ""
origin: ProfileOrigin = "unknown"
is_read_only: bool = False
@@ -924,19 +965,19 @@ def get_profile_info(name: str, namespace: str | None = None) -> ProfileInfo:
namespace (str | None, optional): Namespace label. Defaults to ``None``.
Returns:
ProfileInfo: Structured profile metadata, preferring the user copy when present.
ProfileInfo: Structured profile metadata, preferring the runtime copy when present.
"""
user_paths = user_profile_candidates(name, namespace)
default_paths = default_profile_candidates(name, namespace)
u_path = next((p for p in user_paths if os.path.exists(p)), user_paths[0])
d_path = next((p for p in default_paths if os.path.exists(p)), default_paths[0])
runtime_paths = runtime_profile_candidates(name, namespace)
baseline_paths = baseline_profile_candidates(name, namespace)
r_path = next((p for p in runtime_paths if os.path.exists(p)), runtime_paths[0])
b_path = next((p for p in baseline_paths if os.path.exists(p)), baseline_paths[0])
origin = profile_origin(name, namespace)
read_only = origin in {"module", "plugin"}
prefer_user = os.path.exists(u_path)
if prefer_user:
s = QSettings(u_path, QSettings.IniFormat)
elif os.path.exists(d_path):
s = QSettings(d_path, QSettings.IniFormat)
prefer_runtime = os.path.exists(r_path)
if prefer_runtime:
s = QSettings(r_path, QSettings.IniFormat)
elif os.path.exists(b_path):
s = QSettings(b_path, QSettings.IniFormat)
else:
s = None
if s is None:
@@ -957,14 +998,14 @@ def get_profile_info(name: str, namespace: str | None = None) -> ProfileInfo:
is_quick_select=False,
widget_count=0,
size_kb=0,
user_path=u_path,
default_path=d_path,
runtime_path=r_path,
baseline_path=b_path,
origin=origin,
is_read_only=read_only,
)
created = s.value(SETTINGS_KEYS["created_at"], "", type=str) or now_iso_utc()
src_path = u_path if prefer_user else d_path
src_path = r_path if prefer_runtime else b_path
modified = _file_modified_iso(src_path)
count = _manifest_count(s)
try:
@@ -990,8 +1031,8 @@ def get_profile_info(name: str, namespace: str | None = None) -> ProfileInfo:
is_quick_select=is_quick_select(name, namespace),
widget_count=count,
size_kb=size_kb,
user_path=u_path,
default_path=d_path,
runtime_path=r_path,
baseline_path=b_path,
origin=origin,
is_read_only=read_only,
)
@@ -999,7 +1040,7 @@ def get_profile_info(name: str, namespace: str | None = None) -> ProfileInfo:
def load_profile_screenshot(name: str, namespace: str | None = None) -> QPixmap | None:
"""
Load the stored screenshot pixmap for a profile from settings (user preferred).
Load the stored screenshot pixmap for a profile from settings (runtime preferred).
Args:
name (str): Profile name without extension.
@@ -1008,17 +1049,17 @@ def load_profile_screenshot(name: str, namespace: str | None = None) -> QPixmap
Returns:
QPixmap | None: Screenshot pixmap or ``None`` if unavailable.
"""
s = _existing_user_settings(name, namespace)
s = _existing_runtime_settings(name, namespace)
if s is None:
s = _existing_default_settings(name, namespace)
s = _existing_baseline_settings(name, namespace)
if s is None:
return None
return _load_screenshot_from_settings(s)
def load_default_profile_screenshot(name: str, namespace: str | None = None) -> QPixmap | None:
def load_baseline_profile_screenshot(name: str, namespace: str | None = None) -> QPixmap | None:
"""
Load the screenshot from the default profile copy, if available.
Load the screenshot from the baseline profile copy, if available.
Args:
name (str): Profile name without extension.
@@ -1027,15 +1068,15 @@ def load_default_profile_screenshot(name: str, namespace: str | None = None) ->
Returns:
QPixmap | None: Screenshot pixmap or ``None`` if unavailable.
"""
s = _existing_default_settings(name, namespace)
s = _existing_baseline_settings(name, namespace)
if s is None:
return None
return _load_screenshot_from_settings(s)
def load_user_profile_screenshot(name: str, namespace: str | None = None) -> QPixmap | None:
def load_runtime_profile_screenshot(name: str, namespace: str | None = None) -> QPixmap | None:
"""
Load the screenshot from the user profile copy, if available.
Load the screenshot from the runtime profile copy, if available.
Args:
name (str): Profile name without extension.
@@ -1044,7 +1085,7 @@ def load_user_profile_screenshot(name: str, namespace: str | None = None) -> QPi
Returns:
QPixmap | None: Screenshot pixmap or ``None`` if unavailable.
"""
s = _existing_user_settings(name, namespace)
s = _existing_runtime_settings(name, namespace)
if s is None:
return None
return _load_screenshot_from_settings(s)
@@ -160,7 +160,7 @@ class SaveProfileDialog(QDialog):
self,
"Read-only profile",
(
f"'{name}' is a default profile provided by {provider} and cannot be overwritten.\n"
f"'{name}' is a baseline profile provided by {provider} and cannot be overwritten.\n"
"Please choose a different name."
),
)
@@ -179,7 +179,7 @@ class SaveProfileDialog(QDialog):
"Overwrite profile",
(
f"A profile named '{name}' already exists.\n\n"
"Overwriting will update both the saved profile and its restore default.\n"
"Overwriting will update both the runtime profile and its restore baseline.\n"
"Do you want to continue?"
),
QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No,
@@ -257,21 +257,24 @@ class PreviewPanel(QGroupBox):
class RestoreProfileDialog(QDialog):
"""
Confirmation dialog that previews the current profile screenshot against the default baseline.
Confirmation dialog that previews the current runtime screenshot against the baseline.
"""
def __init__(
self, parent: QWidget | None, current_pixmap: QPixmap | None, default_pixmap: QPixmap | None
self,
parent: QWidget | None,
current_pixmap: QPixmap | None,
baseline_pixmap: QPixmap | None,
):
super().__init__(parent)
self.setWindowTitle("Restore Profile to Default")
self.setWindowTitle("Restore Profile to Baseline")
self.setModal(True)
self.resize(880, 480)
layout = QVBoxLayout(self)
info_label = QLabel(
"Restoring will discard your custom layout and replace it with the default profile."
"Restoring will discard your runtime layout and replace it with the baseline profile."
)
info_label.setWordWrap(True)
layout.addWidget(info_label)
@@ -280,7 +283,7 @@ class RestoreProfileDialog(QDialog):
layout.addLayout(preview_row)
current_preview = PreviewPanel("Current", current_pixmap, self)
default_preview = PreviewPanel("Default", default_pixmap, self)
baseline_preview = PreviewPanel("Baseline", baseline_pixmap, self)
# Equal expansion left/right
preview_row.addWidget(current_preview, 1)
@@ -292,7 +295,7 @@ class RestoreProfileDialog(QDialog):
arrow_label.setSizePolicy(QSizePolicy.Fixed, QSizePolicy.Expanding)
preview_row.addWidget(arrow_label)
preview_row.addWidget(default_preview, 1)
preview_row.addWidget(baseline_preview, 1)
# Enforce equal stretch for both previews
preview_row.setStretch(0, 1)
@@ -300,7 +303,7 @@ class RestoreProfileDialog(QDialog):
preview_row.setStretch(2, 1)
warn_label = QLabel(
"This action cannot be undone. Do you want to restore the default layout now?"
"This action cannot be undone. Do you want to restore the baseline layout now?"
)
warn_label.setWordWrap(True)
layout.addWidget(warn_label)
@@ -324,7 +327,7 @@ class RestoreProfileDialog(QDialog):
@staticmethod
def confirm(
parent: QWidget | None, current_pixmap: QPixmap | None, default_pixmap: QPixmap | None
parent: QWidget | None, current_pixmap: QPixmap | None, baseline_pixmap: QPixmap | None
) -> bool:
dialog = RestoreProfileDialog(parent, current_pixmap, default_pixmap)
dialog = RestoreProfileDialog(parent, current_pixmap, baseline_pixmap)
return dialog.exec() == QDialog.Accepted
@@ -48,7 +48,7 @@ class WorkSpaceManager(BECWidget, QWidget):
HEADERS = ["Actions", "Profile", "Author"]
def __init__(
self, parent=None, target_widget=None, default_profile: str | None = None, **kwargs
self, parent=None, target_widget=None, active_profile: str | None = None, **kwargs
):
super().__init__(parent=parent, **kwargs)
self.target_widget = target_widget
@@ -59,13 +59,13 @@ class WorkSpaceManager(BECWidget, QWidget):
self._init_ui()
if self.target_widget is not None and hasattr(self.target_widget, "profile_changed"):
self.target_widget.profile_changed.connect(self.on_profile_changed)
if default_profile is not None:
self._select_by_name(default_profile)
self._show_profile_details(default_profile)
if active_profile is not None:
self._select_by_name(active_profile)
self._show_profile_details(active_profile)
def _init_ui(self):
self.root_layout = QHBoxLayout(self)
self.splitter = QSplitter(Qt.Horizontal, self)
self.splitter = QSplitter(Qt.Orientation.Horizontal, self)
self.root_layout.addWidget(self.splitter)
# Init components
@@ -89,7 +89,9 @@ class WorkSpaceManager(BECWidget, QWidget):
left_panel.setMinimumWidth(220)
# Make the screenshot preview expand to fill remaining space
self.screenshot_label.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
self.screenshot_label.setSizePolicy(
QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding
)
self.right_box = QGroupBox("Profile Screenshot Preview", self)
right_col = QVBoxLayout(self.right_box)
@@ -250,8 +252,8 @@ class WorkSpaceManager(BECWidget, QWidget):
("Quick select", "Yes" if info.is_quick_select else "No"),
("Widgets", str(info.widget_count)),
("Size (KB)", str(info.size_kb)),
("User path", info.user_path or ""),
("Default path", info.default_path or ""),
("Runtime path", info.runtime_path or ""),
("Baseline path", info.baseline_path or ""),
]
for k, v in entries:
self.profile_details_tree.addTopLevelItem(QTreeWidgetItem([k, v]))
@@ -24,19 +24,9 @@ class ProfileComboBox(QComboBox):
def set_quick_profile_provider(self, provider: Callable[[], list[str]]) -> None:
self._quick_provider = provider
def refresh_profiles(
self, active_profile: str | None = None, show_empty_profile: bool = False
def _refresh_profiles(
self, current_text: str, active_profile: str | None = None, show_empty_profile: bool = False
) -> None:
"""
Refresh the profile list and ensure the active profile is visible.
Args:
active_profile(str | None): The currently active profile name.
show_empty_profile(bool): If True, show an explicit empty unsaved workspace entry.
"""
current_text = active_profile or self.currentText()
self.blockSignals(True)
self.clear()
quick_profiles = self._quick_provider()
@@ -103,7 +93,6 @@ class ProfileComboBox(QComboBox):
if index >= 0:
self.setCurrentIndex(index)
self.blockSignals(False)
if active_profile and self.currentText() != active_profile:
idx = self.findText(active_profile)
if idx >= 0:
@@ -115,6 +104,24 @@ class ProfileComboBox(QComboBox):
else:
self.setToolTip("")
def refresh_profiles(
self, active_profile: str | None = None, show_empty_profile: bool = False
) -> None:
"""
Refresh the profile list and ensure the active profile is visible.
Args:
active_profile(str | None): The currently active profile name.
show_empty_profile(bool): If True, show an explicit empty unsaved workspace entry.
"""
current_text = active_profile or self.currentText()
was_blocked = self.blockSignals(True)
try:
self._refresh_profiles(current_text, active_profile, show_empty_profile)
finally:
self.blockSignals(was_blocked)
def workspace_bundle(components: ToolbarComponents, enable_tools: bool = True) -> ToolbarBundle:
"""
@@ -122,6 +129,7 @@ def workspace_bundle(components: ToolbarComponents, enable_tools: bool = True) -
Args:
components (ToolbarComponents): The components to be added to the bundle.
enable_tools(bool): If True, show the workspace management tools; otherwise, only show the profile combo.
Returns:
ToolbarBundle: The workspace toolbar bundle.
@@ -143,15 +151,15 @@ def workspace_bundle(components: ToolbarComponents, enable_tools: bool = True) -
components.get_action("save_workspace").action.setVisible(enable_tools)
components.add_safe(
"reset_default_workspace",
"reset_baseline_workspace",
MaterialIconAction(
icon_name="undo",
tooltip="Refresh Current Workspace",
tooltip="Restore Baseline Profile",
checkable=False,
parent=components.toolbar,
),
)
components.get_action("reset_default_workspace").action.setVisible(enable_tools)
components.get_action("reset_baseline_workspace").action.setVisible(enable_tools)
components.add_safe(
"manage_workspaces",
@@ -164,7 +172,7 @@ def workspace_bundle(components: ToolbarComponents, enable_tools: bool = True) -
bundle = ToolbarBundle("workspace", components)
bundle.add_action("workspace_combo")
bundle.add_action("save_workspace")
bundle.add_action("reset_default_workspace")
bundle.add_action("reset_baseline_workspace")
bundle.add_action("manage_workspaces")
return bundle
@@ -194,9 +202,9 @@ class WorkspaceConnection(BundleConnection):
self.target_widget.load_profile
)
reset_action = self.components.get_action("reset_default_workspace").action
reset_action = self.components.get_action("reset_baseline_workspace").action
if reset_action.isVisible():
reset_action.triggered.connect(self._reset_workspace_to_default)
reset_action.triggered.connect(self._reset_workspace_to_baseline)
manage_action = self.components.get_action("manage_workspaces").action
if manage_action.isVisible():
@@ -213,9 +221,9 @@ class WorkspaceConnection(BundleConnection):
self.target_widget.load_profile
)
reset_action = self.components.get_action("reset_default_workspace").action
reset_action = self.components.get_action("reset_baseline_workspace").action
if reset_action.isVisible():
reset_action.triggered.disconnect(self._reset_workspace_to_default)
reset_action.triggered.disconnect(self._reset_workspace_to_baseline)
manage_action = self.components.get_action("manage_workspaces").action
if manage_action.isVisible():
@@ -223,8 +231,8 @@ class WorkspaceConnection(BundleConnection):
self._connected = False
@SafeSlot()
def _reset_workspace_to_default(self):
def _reset_workspace_to_baseline(self):
"""
Refreshes the current workspace.
"""
self.target_widget.restore_user_profile_from_default()
self.target_widget.restore_baseline_profile(show_dialog=True)
@@ -22,7 +22,7 @@ from qtpy.QtWidgets import (
)
from typeguard import typechecked
from bec_widgets.cli.rpc.rpc_widget_handler import widget_handler
from bec_widgets.utils.rpc_widget_handler import widget_handler
class LayoutManagerWidget(QWidget):
@@ -4,11 +4,7 @@ import webbrowser
class BECWebLinksMixin:
@staticmethod
def open_bec_docs():
webbrowser.open("https://beamline-experiment-control.readthedocs.io/en/latest/")
@staticmethod
def open_bec_widgets_docs():
webbrowser.open("https://bec.readthedocs.io/projects/bec-widgets/en/latest/")
webbrowser.open("https://bec-project.github.io/bec_docs/")
@staticmethod
def open_bec_bug_report():
@@ -355,17 +355,13 @@ class BECMainWindow(BECWidget, QMainWindow):
bec_docs = QAction("BEC Docs", self)
bec_docs.setIcon(help_icon)
widgets_docs = QAction("BEC Widgets Docs", self)
widgets_docs.setIcon(help_icon)
bug_report = QAction("Bug Report", self)
bug_report.setIcon(bug_icon)
bec_docs.triggered.connect(BECWebLinksMixin.open_bec_docs)
widgets_docs.triggered.connect(BECWebLinksMixin.open_bec_widgets_docs)
bug_report.triggered.connect(BECWebLinksMixin.open_bec_bug_report)
help_menu.addAction(bec_docs)
help_menu.addAction(widgets_docs)
help_menu.addAction(bug_report)
################################################################################
@@ -1,4 +1 @@
from PySide6QtAds import *
CDockManager.setConfigFlag(CDockManager.eConfigFlag.FocusHighlighting, True)
CDockManager.setConfigFlag(CDockManager.eConfigFlag.RetainTabSizeWhenCloseButtonHidden, True)
@@ -0,0 +1 @@
from .components import DeviceTable, DMConfigView, DocstringView, OphydValidation
@@ -0,0 +1,5 @@
# from .device_table_view import DeviceTableView
from .device_table.device_table import DeviceTable
from .dm_config_view import DMConfigView
from .dm_docstring_view import DocstringView, docstring_to_markdown
from .ophyd_validation.ophyd_validation import OphydValidation
@@ -0,0 +1,3 @@
from .available_device_resources import AvailableDeviceResources
__all__ = ["AvailableDeviceResources"]
@@ -22,7 +22,7 @@ from bec_widgets.utils.fuzzy_search import is_match
from bec_widgets.widgets.control.device_manager.components.device_table.device_table_row import (
DeviceTableRow,
)
from bec_widgets.widgets.control.device_manager.components.ophyd_validation.ophyd_validation_utils import (
from bec_widgets.widgets.control.device_manager.components.ophyd_validation import (
ConfigStatus,
ConnectionStatus,
get_validation_icons,
@@ -2,7 +2,7 @@
from bec_lib.atlas_models import Device as DeviceModel
from bec_widgets.widgets.control.device_manager.components.ophyd_validation.ophyd_validation_utils import (
from bec_widgets.widgets.control.device_manager.components.ophyd_validation import (
ConfigStatus,
ConnectionStatus,
)
@@ -0,0 +1,8 @@
from .ophyd_validation_utils import (
ConfigStatus,
ConnectionStatus,
DeviceTestModel,
format_error_to_md,
get_validation_icons,
)
from .validation_list_item import ValidationButton, ValidationListItem
@@ -22,16 +22,14 @@ from bec_widgets.utils.bec_list import BECList
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.colors import get_accent_colors
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
from bec_widgets.widgets.control.device_manager.components.ophyd_validation.ophyd_validation_utils import (
from bec_widgets.widgets.control.device_manager.components.ophyd_validation import (
ConfigStatus,
ConnectionStatus,
DeviceTestModel,
format_error_to_md,
get_validation_icons,
)
from bec_widgets.widgets.control.device_manager.components.ophyd_validation.validation_list_item import (
ValidationButton,
ValidationListItem,
format_error_to_md,
get_validation_icons,
)
READY_TO_TEST = False
@@ -7,7 +7,7 @@ from qtpy import QtCore, QtGui, QtWidgets
from bec_widgets.utils.colors import get_accent_colors
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.widgets.control.device_manager.components.ophyd_validation.ophyd_validation_utils import (
from bec_widgets.widgets.control.device_manager.components.ophyd_validation import (
ConfigStatus,
ConnectionStatus,
DeviceTestModel,
@@ -0,0 +1 @@
from .scan_control import ScanControl
@@ -1,12 +1,10 @@
from bec_lib.utils.import_utils import lazy_import_from
from bec_ipython_client.main import BECIPythonClient
from qtconsole.inprocess import QtInProcessKernelManager
from qtconsole.manager import QtKernelManager
from qtconsole.rich_jupyter_widget import RichJupyterWidget
from qtpy.QtCore import Qt
from qtpy.QtWidgets import QApplication, QMainWindow
BECIPythonClient = lazy_import_from("bec_ipython_client.main", ("BECIPythonClient",))
class BECJupyterConsole(RichJupyterWidget): # pragma: no cover:
def __init__(self, inprocess: bool = False):
@@ -11,7 +11,7 @@ from bec_lib.logger import bec_logger
from qtpy.QtCore import QSize, Qt
from qtpy.QtWidgets import QDialog, QDialogButtonBox, QPushButton, QVBoxLayout
from bec_widgets.widgets.control.scan_control.scan_control import ScanControl
from bec_widgets.widgets.control.scan_control import ScanControl
logger = bec_logger.logger
+1 -1
View File
@@ -611,7 +611,7 @@ class Heatmap(ImageBase):
scan_msg = self.scan_item.status_message
elif hasattr(self.scan_item, "metadata"):
metadata = self.scan_item.metadata["bec"]
status = metadata["exit_status"]
status = metadata["status"]
scan_id = metadata["scan_id"]
scan_name = metadata["scan_name"]
scan_type = metadata["scan_type"]
@@ -12,15 +12,15 @@ from pyqtgraph import SignalProxy
from qtpy.QtCore import QThreadPool, Signal
from qtpy.QtWidgets import QFileDialog, QListWidget, QToolButton, QVBoxLayout, QWidget
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.list_of_expandable_frames import ListOfExpandableFrames
from bec_widgets.utils.rpc_register import RPCRegister
from bec_widgets.utils.ui_loader import UILoader
from bec_widgets.widgets.services.device_browser.device_item import DeviceItem
from bec_widgets.widgets.services.device_browser.device_item.device_config_dialog import (
DirectUpdateDeviceConfigDialog,
)
from bec_widgets.widgets.services.device_browser.device_item.device_item import DeviceItem
from bec_widgets.widgets.services.device_browser.util import map_device_type_to_icon
logger = bec_logger.logger
@@ -0,0 +1 @@
from .device_item import DeviceItem
@@ -0,0 +1,5 @@
from .scan_history_device_viewer import ScanHistoryDeviceViewer
from .scan_history_metadata_viewer import ScanHistoryMetadataViewer
from .scan_history_view import ScanHistoryView
__all__ = ["ScanHistoryDeviceViewer", "ScanHistoryMetadataViewer", "ScanHistoryView"]
@@ -330,10 +330,8 @@ class ScanHistoryView(BECWidget, QtWidgets.QTreeWidget):
if __name__ == "__main__": # pragma: no cover
# pylint: disable=import-outside-toplevel
from bec_widgets.widgets.services.scan_history_browser.components.scan_history_device_viewer import (
from bec_widgets.widgets.services.scan_history_browser.components import (
ScanHistoryDeviceViewer,
)
from bec_widgets.widgets.services.scan_history_browser.components.scan_history_metadata_viewer import (
ScanHistoryMetadataViewer,
)
from bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button import DarkModeButton
@@ -1,13 +1,9 @@
from qtpy import QtCore, QtWidgets
from bec_widgets.utils.bec_widget import BECWidget, ConnectionConfig
from bec_widgets.widgets.services.scan_history_browser.components.scan_history_device_viewer import (
from bec_widgets.widgets.services.scan_history_browser.components import (
ScanHistoryDeviceViewer,
)
from bec_widgets.widgets.services.scan_history_browser.components.scan_history_metadata_viewer import (
ScanHistoryMetadataViewer,
)
from bec_widgets.widgets.services.scan_history_browser.components.scan_history_view import (
ScanHistoryView,
)
@@ -1,58 +0,0 @@
"""Utilities for filtering and formatting in the LogPanel"""
from __future__ import annotations
import re
from collections import deque
from typing import Callable, Iterator
from bec_lib.logger import LogLevel
from bec_lib.messages import LogMessage
from qtpy.QtCore import QDateTime
LinesHtmlFormatter = Callable[[deque[LogMessage]], Iterator[str]]
LineFormatter = Callable[[LogMessage], str]
LineFilter = Callable[[LogMessage], bool] | None
ANSI_ESCAPE_REGEX = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
def replace_escapes(s: str):
s = ANSI_ESCAPE_REGEX.sub("", s)
return s.replace(" ", "&nbsp;").replace("\n", "<br />").replace("\t", " ")
def level_filter(msg: LogMessage, thresh: int):
return LogLevel[msg.content["log_type"].upper()].value >= thresh
def noop_format(line: LogMessage):
_textline = line.log_msg if isinstance(line.log_msg, str) else line.log_msg["text"]
return replace_escapes(_textline.strip()) + "<br />"
def simple_color_format(line: LogMessage, colors: dict[LogLevel, str]):
color = colors.get(LogLevel[line.content["log_type"].upper()]) or colors[LogLevel.INFO]
return f'<font color="{color}">{noop_format(line)}</font>'
def create_formatter(line_format: LineFormatter, line_filter: LineFilter) -> LinesHtmlFormatter:
def _formatter(data: deque[LogMessage]):
if line_filter is not None:
return (line_format(line) for line in data if line_filter(line))
else:
return (line_format(line) for line in data)
return _formatter
def log_txt(line):
return line.log_msg if isinstance(line.log_msg, str) else line.log_msg["text"]
def log_time(line):
return QDateTime.fromMSecsSinceEpoch(int(line.log_msg["record"]["time"]["timestamp"] * 1000))
def log_svc(line):
return line.log_msg["service_name"]
@@ -30,7 +30,7 @@ class LogPanelPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
return DOM_XML
def group(self):
return "BEC Services"
return ""
def icon(self):
return designer_material_icon(LogPanel.ICON_NAME)
@@ -51,7 +51,7 @@ class LogPanelPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
return "LogPanel"
def toolTip(self):
return "Displays a log panel"
return "LogPanel"
def whatsThis(self):
return self.toolTip()
+432 -329
View File
@@ -2,21 +2,31 @@
from __future__ import annotations
import operator
import os
import re
from collections import deque
from functools import partial, reduce
from re import Pattern
from typing import TYPE_CHECKING, Literal
from dataclasses import dataclass
from functools import partial
from typing import Iterable, Literal
from bec_lib.client import BECClient
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import LogLevel, bec_logger
from bec_lib.messages import LogMessage, StatusMessage
from pyqtgraph import SignalProxy
from qtpy.QtCore import QDateTime, QObject, Qt, Signal
from qtpy.QtGui import QFont
from bec_qthemes import material_icon
from qtpy.QtCore import Signal # type: ignore
from qtpy.QtCore import (
QAbstractTableModel,
QCoreApplication,
QDateTime,
QModelIndex,
QObject,
QPersistentModelIndex,
QSize,
QSortFilterProxyModel,
Qt,
QTimer,
)
from qtpy.QtGui import QColor
from qtpy.QtWidgets import (
QApplication,
QCheckBox,
@@ -25,204 +35,414 @@ from qtpy.QtWidgets import (
QDialog,
QGridLayout,
QHBoxLayout,
QHeaderView,
QLabel,
QLineEdit,
QPushButton,
QScrollArea,
QTextEdit,
QSizePolicy,
QTableView,
QToolButton,
QVBoxLayout,
QWidget,
)
from thefuzz import fuzz
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils.colors import apply_theme, get_theme_palette
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.colors import apply_theme
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.widgets.editors.text_box.text_box import TextBox
from bec_widgets.widgets.services.bec_status_box.bec_status_box import BECServiceStatusMixin
from bec_widgets.widgets.utility.logpanel._util import (
LineFilter,
LineFormatter,
LinesHtmlFormatter,
create_formatter,
level_filter,
log_svc,
log_time,
log_txt,
noop_format,
simple_color_format,
)
if TYPE_CHECKING: # pragma: no cover
from qtpy.QtCore import SignalInstance
logger = bec_logger.logger
MODULE_PATH = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
# TODO: improve log color handling
DEFAULT_LOG_COLORS = {
LogLevel.INFO: "#FFFFFF",
LogLevel.SUCCESS: "#00FF00",
LogLevel.WARNING: "#FFCC00",
LogLevel.ERROR: "#FF0000",
LogLevel.DEBUG: "#0000CC",
_DEFAULT_LOG_COLORS = {
LogLevel.INFO.name: QColor("#FFFFFF"),
LogLevel.SUCCESS.name: QColor("#00FF00"),
LogLevel.WARNING.name: QColor("#FFCC00"),
LogLevel.ERROR.name: QColor("#FF0000"),
LogLevel.DEBUG.name: QColor("#0000CC"),
}
@dataclass(frozen=True)
class _Constants:
FUZZ_THRESHOLD = 80
UPDATE_INTERVAL_MS = 200
headers = ["level", "timestamp", "service_name", "message", "function"]
_CONST = _Constants()
class TimestampUpdate:
def __init__(self, value: QDateTime | None, update_type: Literal["start", "end"]) -> None:
self.value = value
self.update_type = update_type
class BecLogsQueue(BECConnector, QObject):
"""Manages getting logs from BEC Redis and formatting them for display"""
RPC = False
new_message = Signal()
new_messages = Signal()
paused = Signal(bool)
_instance: BecLogsQueue | None = None
def __init__(
self,
parent: QObject | None,
maxlen: int = 1000,
line_formatter: LineFormatter = noop_format,
**kwargs,
) -> None:
@classmethod
def instance(cls):
if cls._instance is None:
cls._instance = cls(QCoreApplication.instance())
return cls._instance
def __init__(self, parent: QObject | None, maxlen: int = 2500, **kwargs) -> None:
if BecLogsQueue._instance:
raise RuntimeError("Create no more than one BecLogsQueue - use BecLogsQueue.instance()")
super().__init__(parent=parent, **kwargs)
self._timestamp_start: QDateTime | None = None
self._timestamp_end: QDateTime | None = None
self._max_length = maxlen
self._data: deque[LogMessage] = deque([], self._max_length)
self._display_queue: deque[str] = deque([], self._max_length)
self._log_level: str | None = None
self._search_query: Pattern | str | None = None
self._selected_services: set[str] | None = None
self._set_formatter_and_update_filter(line_formatter)
# instance attribute still accessible after c++ object is deleted, so the callback can be unregistered
self._paused = False
self._data = deque(
(
item["data"]
for item in self.bec_dispatcher.client.connector.xread(
MessageEndpoints.log(), count=self._max_length, id="0"
)
),
maxlen=self._max_length,
)
self._incoming: deque[LogMessage] = deque([], maxlen=self._max_length)
self.bec_dispatcher.connect_slot(self._process_incoming_log_msg, MessageEndpoints.log())
self._update_timer = QTimer(self, interval=_CONST.UPDATE_INTERVAL_MS)
self._update_timer.timeout.connect(self._proc_update)
QCoreApplication.instance().aboutToQuit.connect(self.cleanup) # type: ignore
self._update_timer.start()
def __len__(self):
return len(self._data)
@SafeSlot()
def toggle_pause(self):
self._paused = not self._paused
self.paused.emit(self._paused)
def row_data(self, index: int) -> LogMessage | None:
if index < 0 or index > (len(self._data) - 1):
return None
return self._data[index]
def cell_data(self, row: int, key: str):
if key == "level":
return self._data[row].log_type.upper()
msg_item = self._data[row].log_msg
if isinstance(msg_item, str):
return msg_item
if key == "service_name":
return msg_item.get(key)
elif key in ["service_name", "function", "message"]:
return msg_item.get("record", {}).get(key)
elif key == "timestamp":
return msg_item.get("record", {}).get("time", {}).get("repr")
def log_timestamp(self, row: int) -> float:
msg_item = self._data[row].log_msg
if isinstance(msg_item, str):
return 0
return msg_item.get("record", {}).get("time", {}).get("timestamp")
def cleanup(self, *_):
"""Stop listening to the Redis log stream"""
self.bec_dispatcher.disconnect_slot(
self._process_incoming_log_msg, [MessageEndpoints.log()]
)
self._update_timer.stop()
BecLogsQueue._instance = None
@SafeSlot(verify_sender=True)
def _process_incoming_log_msg(self, msg: dict, _metadata: dict):
try:
_msg = LogMessage(**msg)
self._data.append(_msg)
if self.filter is None or self.filter(_msg):
self._display_queue.append(self._line_formatter(_msg))
self.new_message.emit()
self._incoming.append(_msg)
except Exception as e:
if "Internal C++ object (BecLogsQueue) already deleted." in e.args:
return
logger.warning(f"Error in LogPanel incoming message callback: {e}")
def _set_formatter_and_update_filter(self, line_formatter: LineFormatter = noop_format):
self._line_formatter: LineFormatter = line_formatter
self._queue_formatter: LinesHtmlFormatter = create_formatter(
self._line_formatter, self.filter
)
def _combine_filters(self, *args: LineFilter):
return lambda msg: reduce(operator.and_, [filt(msg) for filt in args if filt is not None])
def _create_re_filter(self) -> LineFilter:
if self._search_query is None:
return None
elif isinstance(self._search_query, str):
return lambda line: self._search_query in log_txt(line)
return lambda line: self._search_query.match(log_txt(line)) is not None
def _create_service_filter(self):
return (
lambda line: self._selected_services is None or log_svc(line) in self._selected_services
)
def _create_timestamp_filter(self) -> LineFilter:
s, e = self._timestamp_start, self._timestamp_end
if s is e is None:
return lambda msg: True
def _time_filter(msg):
msg_time = log_time(msg)
if s is None:
return msg_time <= e
if e is None:
return s <= msg_time
return s <= msg_time <= e
return _time_filter
@property
def filter(self) -> LineFilter:
"""A function which filters a log message based on all applied criteria"""
thresh = LogLevel[self._log_level].value if self._log_level is not None else 0
return self._combine_filters(
partial(level_filter, thresh=thresh),
self._create_re_filter(),
self._create_timestamp_filter(),
self._create_service_filter(),
)
def update_level_filter(self, level: str):
"""Change the log-level of the level filter"""
if level not in [l.name for l in LogLevel]:
logger.error(f"Logging level {level} unrecognized for filter!")
@SafeSlot(verify_sender=True)
def _proc_update(self):
if self._paused or len(self._incoming) == 0:
return
self._log_level = level
self._set_formatter_and_update_filter(self._line_formatter)
self._data.extend(self._incoming)
self._incoming.clear()
self.new_messages.emit()
def update_search_filter(self, search_query: Pattern | str | None = None):
"""Change the string or regex to filter against"""
self._search_query = search_query
self._set_formatter_and_update_filter(self._line_formatter)
def update_time_filter(self, start: QDateTime | None, end: QDateTime | None):
"""Change the start and/or end times to filter against"""
self._timestamp_start = start
self._timestamp_end = end
self._set_formatter_and_update_filter(self._line_formatter)
class BecLogsTableModel(QAbstractTableModel):
def __init__(self, parent: QWidget | None = None):
super().__init__(parent)
self.log_queue = BecLogsQueue.instance()
self.log_queue.new_messages.connect(self.handle_new_messages)
self._headers = _CONST.headers
def update_service_filter(self, services: set[str]):
"""Change the selected services to display"""
self._selected_services = services
self._set_formatter_and_update_filter(self._line_formatter)
def rowCount(self, parent: QModelIndex | QPersistentModelIndex = QModelIndex()) -> int:
return len(self.log_queue)
def update_line_formatter(self, line_formatter: LineFormatter):
"""Update the formatter"""
self._set_formatter_and_update_filter(line_formatter)
def columnCount(self, parent: QModelIndex | QPersistentModelIndex = QModelIndex()) -> int:
return len(self._headers)
def display_all(self) -> str:
"""Return formatted output for all log messages"""
return "\n".join(self._queue_formatter(self._data.copy()))
def headerData(self, section, orientation, role=int(Qt.ItemDataRole.DisplayRole)):
if role == Qt.ItemDataRole.DisplayRole and orientation == Qt.Orientation.Horizontal:
return self._headers[section]
return None
def format_new(self):
"""Return formatted output for the display queue"""
res = "\n".join(self._display_queue)
self._display_queue = deque([], self._max_length)
return res
def get_row_data(self, index: QModelIndex) -> LogMessage | None:
"""Return the row data for the given index."""
if not index.isValid():
return None
return self.log_queue.row_data(index.row())
def clear_logs(self):
"""Clear the cache and display queue"""
self._data = deque([])
self._display_queue = deque([])
def timestamp(self, row: int):
return QDateTime.fromMSecsSinceEpoch(int(self.log_queue.log_timestamp(row) * 1000))
def fetch_history(self):
"""Fetch all available messages from Redis"""
self._data = deque(
item["data"]
for item in self.bec_dispatcher.client.connector.xread(
MessageEndpoints.log().endpoint, from_start=True, count=self._max_length
)
def data(self, index, role=int(Qt.ItemDataRole.DisplayRole)):
"""Return data for the given index and role."""
if not index.isValid():
return
if role in [Qt.ItemDataRole.DisplayRole, Qt.ItemDataRole.ToolTipRole]:
return self.log_queue.cell_data(index.row(), self._headers[index.column()])
if role in [Qt.ItemDataRole.ForegroundRole]:
return self._map_log_level_color(self.log_queue.cell_data(index.row(), "level"))
def _map_log_level_color(self, data):
return _DEFAULT_LOG_COLORS.get(data)
def handle_new_messages(self):
self.dataChanged.emit(
self.index(0, 0), self.index(self.rowCount() - 1, self.columnCount() - 1)
)
def unique_service_names_from_history(self) -> set[str]:
"""Go through the log history to determine active service names"""
return set(msg.log_msg["service_name"] for msg in self._data)
class LogMsgProxyModel(QSortFilterProxyModel):
show_service_column = Signal(bool)
def __init__(
self,
parent=None,
service_filter: set[str] | None = None,
level_filter: LogLevel | None = None,
):
super().__init__(parent)
self._service_filter = service_filter or set()
self._level_filter: LogLevel | None = level_filter
self._filter_text: str = ""
self._fuzzy_search: bool = False
self._time_filter_start: QDateTime | None = None
self._time_filter_end: QDateTime | None = None
def get_row_data(self, rows: Iterable[QModelIndex]) -> Iterable[LogMessage | None]:
return (self.sourceModel().get_row_data(self.mapToSource(idx)) for idx in rows)
def sourceModel(self) -> BecLogsTableModel:
return super().sourceModel() # type: ignore
@SafeSlot(int, int)
def refresh(self, *_):
self.invalidateRowsFilter()
@SafeSlot(None)
@SafeSlot(set)
def update_service_filter(self, filter: set[str]):
"""Filter to the selected services (show any service in the provided set)
Args:
filter (set[str] | None): set of services for which to show logs"""
self._service_filter = filter
self.show_service_column.emit(len(filter) != 1)
self.invalidateRowsFilter()
@SafeSlot(None)
@SafeSlot(LogLevel)
def update_level_filter(self, filter: LogLevel | None):
"""Filter to the selected log level
Args:
filter (str | None): lowest log level to show"""
self._level_filter = filter
self.invalidateRowsFilter()
@SafeSlot(str)
def update_filter_text(self, filter: str):
"""Filter messages based on text
Args:
filter (str | None): set of services for which to show logs"""
self._filter_text = filter
self.invalidateRowsFilter()
@SafeSlot(bool)
def update_fuzzy(self, state: bool):
"""Set text filter to fuzzy search or not
Args:
state (bool): fuzzy search on"""
self._fuzzy_search = state
self.invalidateRowsFilter()
@SafeSlot(TimestampUpdate)
def update_timestamp(self, update: TimestampUpdate):
if update.update_type == "start":
self._time_filter_start = update.value
else:
self._time_filter_end = update.value
self.invalidateRowsFilter()
def filterAcceptsRow(self, source_row: int, source_parent) -> bool:
# No service filter, and no filter text, display everything
possible_filters = [
self._service_filter,
self._level_filter,
self._filter_text,
self._time_filter_start,
self._time_filter_end,
]
if not any(map(bool, possible_filters)):
return True
model = self.sourceModel()
# Filter out services
if self._service_filter:
col = _CONST.headers.index("service_name")
if model.data(model.index(source_row, col, source_parent)) not in self._service_filter:
return False
# Filter out levels
if self._level_filter:
col = _CONST.headers.index("level")
level: str = model.data(model.index(source_row, col, source_parent)) # type: ignore
if LogLevel[level] < self._level_filter:
return False
# Filter time
if self._time_filter_start:
if model.timestamp(source_row) < self._time_filter_start:
return False
if self._time_filter_end:
if model.timestamp(source_row) > self._time_filter_end:
return False
# Filter message text - must go last because this can return True
if self._filter_text:
col = _CONST.headers.index("message")
msg: str = model.data(model.index(source_row, col, source_parent)).lower() # type: ignore
if self._fuzzy_search:
return fuzz.partial_ratio(self._filter_text.lower(), msg) >= _CONST.FUZZ_THRESHOLD
else:
return self._filter_text.lower() in msg.lower()
return True
class BecLogTableView(QTableView):
def __init__(self, *args, max_message_width: int = 1000, **kwargs) -> None:
super().__init__(*args, **kwargs)
header = QHeaderView(Qt.Orientation.Horizontal, parent=self)
header.setSectionResizeMode(QHeaderView.ResizeMode.Interactive)
header.setStretchLastSection(True)
header.setMaximumSectionSize(max_message_width)
self.setHorizontalHeader(header)
def model(self) -> LogMsgProxyModel:
return super().model() # type: ignore
class LogPanel(BECWidget, QWidget):
"""Live display of the BEC logs in a table view."""
PLUGIN = True
ICON_NAME = "browse_activity"
def __init__(
self,
parent: QWidget | None = None,
max_message_width: int = 1000,
show_toolbar: bool = True,
service_filter: set[str] | None = None,
level_filter: LogLevel | None = None,
**kwargs,
) -> None:
super().__init__(parent=parent, **kwargs)
self._setup_models(service_filter=service_filter, level_filter=level_filter)
self._layout = QVBoxLayout()
self.setLayout(self._layout)
if show_toolbar:
self._setup_toolbar(client=self.client)
self._setup_table_view(max_message_width=max_message_width)
self._update_service_filter(service_filter or set())
if show_toolbar:
self._connect_toolbar()
self._proxy.show_service_column.connect(self._show_service_column)
colors = QApplication.instance().theme.accent_colors # type: ignore
dict_colors = QApplication.instance().theme.colors # type: ignore
_DEFAULT_LOG_COLORS.update(
{
LogLevel.INFO.name: dict_colors["FG"],
LogLevel.SUCCESS.name: colors.success,
LogLevel.WARNING.name: colors.warning,
LogLevel.ERROR.name: colors.emergency,
LogLevel.DEBUG.name: dict_colors["BORDER"],
}
)
self._table.scrollToBottom()
def _setup_models(self, service_filter: set[str] | None, level_filter: LogLevel | None):
self._model = BecLogsTableModel(parent=self)
self._proxy = LogMsgProxyModel(
parent=self, service_filter=service_filter, level_filter=level_filter
)
self._proxy.setSourceModel(self._model)
self._model.log_queue.new_messages.connect(self._proxy.refresh)
def _setup_table_view(self, max_message_width: int) -> None:
"""Setup the table view."""
self._table = BecLogTableView(self, max_message_width=max_message_width)
self._table.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding)
self._layout.addWidget(self._table)
self._table.setModel(self._proxy)
self._table.setHorizontalScrollMode(QTableView.ScrollMode.ScrollPerPixel)
self._table.setTextElideMode(Qt.TextElideMode.ElideRight)
self._table.resizeColumnsToContents()
def _setup_toolbar(self, client: BECClient):
self._toolbar = LogPanelToolbar(self, client)
self._layout.addWidget(self._toolbar)
def _connect_toolbar(self):
self._toolbar.services_selected.connect(self._proxy.update_service_filter)
self._toolbar.search_textbox.textChanged.connect(self._proxy.update_filter_text)
self._toolbar.level_changed.connect(self._proxy.update_level_filter)
self._toolbar.fuzzy_changed.connect(self._proxy.update_fuzzy)
self._toolbar.timestamp_update.connect(self._proxy.update_timestamp)
self._toolbar.pause_button.clicked.connect(self._model.log_queue.toggle_pause)
self._model.log_queue.paused.connect(self._toolbar._update_pause_button_icon)
def _update_service_filter(self, filter: set[str]):
self._service_filter = filter
self._proxy.update_service_filter(filter)
self._table.setColumnHidden(
_CONST.headers.index("service_name"), len(self._service_filter) == 1
)
@SafeSlot(bool)
def _show_service_column(self, show: bool):
self._table.setColumnHidden(_CONST.headers.index("service_name"), not show)
def sizeHint(self) -> QSize:
return QSize(600, 300)
class LogPanelToolbar(QWidget):
services_selected = Signal(set)
level_changed = Signal(LogLevel)
fuzzy_changed = Signal(bool)
timestamp_update = Signal(TimestampUpdate)
services_selected: SignalInstance = Signal(set)
def __init__(self, parent: QWidget | None = None) -> None:
def __init__(self, parent: QWidget | None = None, client: BECClient | None = None) -> None:
"""A toolbar for the logpanel, mainly used for managing the states of filters"""
super().__init__(parent)
@@ -231,51 +451,69 @@ class LogPanelToolbar(QWidget):
self._timestamp_end: QDateTime | None = None
self._unique_service_names: set[str] = set()
self._services_selected: set[str] | None = None
self._services_selected: set[str] = set()
self.layout = QHBoxLayout(self) # type: ignore
self._layout = QHBoxLayout(self)
self.service_choice_button = QPushButton("Select services", self)
self.layout.addWidget(self.service_choice_button)
self.service_choice_button.clicked.connect(self._open_service_filter_dialog)
if client is not None:
self.client = client
self.service_choice_button = QPushButton("Select services", self)
self._layout.addWidget(self.service_choice_button)
self.service_choice_button.clicked.connect(self._open_service_filter_dialog)
self.service_list_update(self.client.service_status)
self._services_selected = self._unique_service_names
self.filter_level_dropdown = self._log_level_box()
self.layout.addWidget(self.filter_level_dropdown)
self.clear_button = QPushButton("Clear all", self)
self.layout.addWidget(self.clear_button)
self.fetch_button = QPushButton("Fetch history", self)
self.layout.addWidget(self.fetch_button)
self._layout.addWidget(self.filter_level_dropdown)
self.filter_level_dropdown.currentTextChanged.connect(self._emit_level)
self._string_search_box()
self.timerange_button = QPushButton("Set time range", self)
self.layout.addWidget(self.timerange_button)
self._layout.addWidget(self.timerange_button)
self.timerange_button.clicked.connect(self._open_datetime_dialog)
@property
def time_start(self):
return self._timestamp_start
self.pause_button = QToolButton()
self.pause_button.setIcon(material_icon("pause", size=(20, 20), convert_to_pixmap=False))
self._PLAYING_TOOLTIP = "Pause live log updates."
self._PAUSED_TOOLTIP = "Continue live log updates."
self.pause_button.setToolTip(self._PLAYING_TOOLTIP)
self._layout.addWidget(self.pause_button)
@property
def time_end(self):
return self._timestamp_end
@SafeSlot(bool)
def _update_pause_button_icon(self, paused):
if paused:
icon = "play_arrow"
tooltip = self._PAUSED_TOOLTIP
else:
icon = "pause"
tooltip = self._PLAYING_TOOLTIP
self.pause_button.setIcon(material_icon(icon, size=(20, 20), convert_to_pixmap=False))
self.pause_button.setToolTip(tooltip)
def _string_search_box(self):
self.layout.addWidget(QLabel("Search: "))
self._layout.addWidget(QLabel("Search: "))
self.search_textbox = QLineEdit()
self.layout.addWidget(self.search_textbox)
self.layout.addWidget(QLabel("Use regex: "))
self.regex_enabled = QCheckBox()
self.layout.addWidget(self.regex_enabled)
self.update_re_button = QPushButton("Update search", self)
self.layout.addWidget(self.update_re_button)
self._layout.addWidget(self.search_textbox)
self._layout.addWidget(QLabel("Fuzzy: "))
self.fuzzy = QCheckBox()
self._layout.addWidget(self.fuzzy)
self.fuzzy.checkStateChanged.connect(self._emit_fuzzy)
def _log_level_box(self):
box = QComboBox()
box.setToolTip("Display logs with equal or greater significance to the selected level.")
[box.addItem(l.name) for l in LogLevel]
[box.addItem(level.name) for level in LogLevel]
return box
@SafeSlot(str)
def _emit_level(self, level: str):
self.level_changed.emit(LogLevel[level])
@SafeSlot(Qt.CheckState)
def _emit_fuzzy(self, state: Qt.CheckState):
self.fuzzy_changed.emit(state == Qt.CheckState.Checked)
def _current_ts(self, selection_type: Literal["start", "end"]):
if selection_type == "start":
return self._timestamp_start
@@ -284,6 +522,7 @@ class LogPanelToolbar(QWidget):
else:
raise ValueError(f"timestamps can only be for the start or end, not {selection_type}")
@SafeSlot()
def _open_datetime_dialog(self):
"""Open dialog window for timestamp filter selection"""
self._dt_dialog = QDialog(self)
@@ -312,8 +551,8 @@ class LogPanelToolbar(QWidget):
)
_layout.addWidget(date_clear_button)
for v in [("start", label_start), ("end", label_end)]:
date_button_set(*v)
date_button_set("start", label_start)
date_button_set("end", label_end)
close_button = QPushButton("Close", parent=self._dt_dialog)
close_button.clicked.connect(self._dt_dialog.accept)
@@ -352,27 +591,23 @@ class LogPanelToolbar(QWidget):
self._timestamp_start = dt
else:
self._timestamp_end = dt
self.timestamp_update.emit(TimestampUpdate(value=dt, update_type=selection_type))
@SafeSlot(dict, set)
def service_list_update(
self, services_info: dict[str, StatusMessage], services_from_history: set[str], *_, **__
):
def service_list_update(self, services_info: dict[str, StatusMessage]):
"""Change the list of services which can be selected"""
self._unique_service_names = set([s.split("/")[0] for s in services_info.keys()])
self._unique_service_names |= services_from_history
if self._services_selected is None:
self._services_selected = self._unique_service_names
@SafeSlot()
def _open_service_filter_dialog(self):
self.service_list_update(self.client.service_status)
if len(self._unique_service_names) == 0 or self._services_selected is None:
return
self._svc_dialog = QDialog(self)
self._svc_dialog.setWindowTitle(f"Select services to show logs from")
self._svc_dialog.setWindowTitle("Select services to show logs from")
layout = QVBoxLayout()
self._svc_dialog.setLayout(layout)
service_cb_grid = QGridLayout(parent=self._svc_dialog)
service_cb_grid = QGridLayout()
layout.addLayout(service_cb_grid)
def check_box(name: str, checked: Qt.CheckState):
@@ -398,146 +633,6 @@ class LogPanelToolbar(QWidget):
self._svc_dialog.deleteLater()
class LogPanel(TextBox):
"""Displays a log panel"""
ICON_NAME = "terminal"
service_list_update = Signal(dict, set)
def __init__(
self,
parent=None,
client: BECClient | None = None,
service_status: BECServiceStatusMixin | None = None,
**kwargs,
):
"""Initialize the LogPanel widget."""
super().__init__(parent=parent, client=client, config={"text": ""}, **kwargs)
self._update_colors()
self._service_status = service_status or BECServiceStatusMixin(self, client=self.client) # type: ignore
self._log_manager = BecLogsQueue(
parent=self, line_formatter=partial(simple_color_format, colors=self._colors)
)
self._proxy_update = SignalProxy(
self._log_manager.new_message, rateLimit=1, slot=self._on_append
)
self.toolbar = LogPanelToolbar(parent=self)
self.toolbar_area = QScrollArea()
self.toolbar_area.setVerticalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAlwaysOff)
self.toolbar_area.setSizeAdjustPolicy(QScrollArea.SizeAdjustPolicy.AdjustToContents)
self.toolbar_area.setFixedHeight(int(self.toolbar.clear_button.height() * 2))
self.toolbar_area.setWidget(self.toolbar)
self.layout.addWidget(self.toolbar_area)
self.toolbar.clear_button.clicked.connect(self._on_clear)
self.toolbar.fetch_button.clicked.connect(self._on_fetch)
self.toolbar.update_re_button.clicked.connect(self._on_re_update)
self.toolbar.search_textbox.returnPressed.connect(self._on_re_update)
self.toolbar.regex_enabled.checkStateChanged.connect(self._on_re_update)
self.toolbar.filter_level_dropdown.currentTextChanged.connect(self._set_level_filter)
self.toolbar.timerange_button.clicked.connect(self._choose_datetime)
self._service_status.services_update.connect(self._update_service_list)
self.service_list_update.connect(self.toolbar.service_list_update)
self.toolbar.services_selected.connect(self._update_service_filter)
self.text_box_text_edit.setFont(QFont("monospace", 12))
self.text_box_text_edit.setHtml("")
self.text_box_text_edit.setLineWrapMode(QTextEdit.LineWrapMode.NoWrap)
self._connect_to_theme_change()
@SafeSlot(set)
def _update_service_filter(self, services: set[str]):
self._log_manager.update_service_filter(services)
self._on_redraw()
@SafeSlot(dict, dict)
def _update_service_list(self, services_info: dict[str, StatusMessage], *_, **__):
self.service_list_update.emit(
services_info, self._log_manager.unique_service_names_from_history()
)
@SafeSlot()
def _choose_datetime(self):
self.toolbar._open_datetime_dialog()
self._set_time_filter()
def _connect_to_theme_change(self):
"""Connect to the theme change signal."""
qapp = QApplication.instance()
if hasattr(qapp, "theme_signal"):
qapp.theme_signal.theme_updated.connect(self._on_redraw) # type: ignore
def _update_colors(self):
self._colors = DEFAULT_LOG_COLORS.copy()
self._colors.update({LogLevel.INFO: get_theme_palette().text().color().name()})
def _cursor_to_end(self):
c = self.text_box_text_edit.textCursor()
c.movePosition(c.MoveOperation.End)
self.text_box_text_edit.setTextCursor(c)
@SafeSlot()
@SafeSlot(str)
def _on_redraw(self, *_):
self._update_colors()
self._log_manager.update_line_formatter(partial(simple_color_format, colors=self._colors))
self.set_html_text(self._log_manager.display_all())
self._cursor_to_end()
@SafeSlot(verify_sender=True)
def _on_append(self, *_):
self.text_box_text_edit.insertHtml(self._log_manager.format_new())
self._cursor_to_end()
@SafeSlot()
def _on_clear(self):
self._log_manager.clear_logs()
self.set_html_text(self._log_manager.display_all())
self._cursor_to_end()
@SafeSlot()
@SafeSlot(Qt.CheckState)
def _on_re_update(self, *_):
if self.toolbar.regex_enabled.isChecked():
try:
search_query = re.compile(self.toolbar.search_textbox.text())
except Exception as e:
logger.warning(f"Failed to compile search regex with error {e}")
search_query = None
logger.info(f"Setting LogPanel search regex to {search_query}")
else:
search_query = self.toolbar.search_textbox.text()
logger.info(f'Setting LogPanel search string to "{search_query}"')
self._log_manager.update_search_filter(search_query)
self.set_html_text(self._log_manager.display_all())
self._cursor_to_end()
@SafeSlot()
def _on_fetch(self):
self._log_manager.fetch_history()
self.set_html_text(self._log_manager.display_all())
self._cursor_to_end()
@SafeSlot(str)
def _set_level_filter(self, level: str):
self._log_manager.update_level_filter(level)
self._on_redraw()
@SafeSlot()
def _set_time_filter(self):
self._log_manager.update_time_filter(self.toolbar.time_start, self.toolbar.time_end)
self._on_redraw()
def cleanup(self):
self._service_status.cleanup()
self._log_manager.cleanup()
self._log_manager.deleteLater()
super().cleanup()
if __name__ == "__main__": # pragma: no cover
import sys
@@ -545,7 +640,15 @@ if __name__ == "__main__": # pragma: no cover
app = QApplication(sys.argv)
apply_theme("dark")
widget = LogPanel()
panel = QWidget()
queue = BecLogsQueue(panel)
layout = QVBoxLayout(panel)
layout.addWidget(QLabel("All logs, no filters:"))
layout.addWidget(LogPanel())
layout.addWidget(QLabel("All services, level filter WARNING preapplied:"))
layout.addWidget(LogPanel(level_filter=LogLevel.WARNING))
layout.addWidget(QLabel('All services, service filter {"DeviceServer"} preapplied:'))
layout.addWidget(LogPanel(service_filter={"DeviceServer"}))
widget.show()
panel.show()
sys.exit(app.exec())
@@ -3,6 +3,7 @@ from qtpy import QtCore, QtGui
from qtpy.QtCore import Property, Signal, Slot
from qtpy.QtWidgets import QSizePolicy, QVBoxLayout, QWidget
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.colors import Colors
+10 -11
View File
@@ -1,6 +1,6 @@
[project]
name = "bec_widgets"
version = "3.5.0"
version = "3.8.0"
description = "BEC Widgets"
requires-python = ">=3.11"
classifiers = [
@@ -12,19 +12,19 @@ dependencies = [
"PyJWT~=2.9",
"PySide6==6.9.0",
"PySide6-QtAds==4.4.0",
"bec_ipython_client~=3.107,>=3.107.2", # needed for jupyter console
"bec_ipython_client~=3.107,>=3.107.2", # needed for jupyter console
"bec_lib~=3.107,>=3.107.2",
"bec_qthemes~=1.0, >=1.3.4",
"black>=26,<27", # needed for bw-generate-cli
"black>=26,<27", # needed for bw-generate-cli
"copier~=9.7",
"darkdetect~=0.8",
"isort>=5.13, <9.0", # needed for bw-generate-cli
"isort>=5.13, <9.0", # needed for bw-generate-cli
"markdown~=3.9",
"ophyd_devices~=1.29, >=1.29.1",
"pydantic~=2.0",
"pylsp-bec~=1.2",
"pyqtgraph==0.13.7",
"qtconsole~=5.5, >=5.5.1", # needed for jupyter console
"qtconsole~=5.5, >=5.5.1", # needed for jupyter console
"qtmonaco~=0.8, >=0.8.1",
"qtpy~=2.4",
"thefuzz~=0.22",
@@ -38,8 +38,8 @@ Homepage = "https://gitlab.psi.ch/bec/bec_widgets"
[project.scripts]
bec-app = "bec_widgets.applications.main_app:main"
bec-designer = "bec_widgets.utils.bec_designer:main"
bec-gui-server = "bec_widgets.cli.server:main"
bw-generate-cli = "bec_widgets.cli.generate_cli:main"
bec-gui-server = "bec_widgets.applications.companion_app:main"
bw-generate-cli = "bec_widgets.utils.generate_cli:main"
[project.optional-dependencies]
dev = [
@@ -52,12 +52,11 @@ dev = [
"pytest-xvfb~=3.0",
"pytest~=8.0",
"pytest-cov~=6.1.1",
"pytest-benchmark~=5.2",
"watchdog~=6.0",
"pre_commit~=4.2",
]
qtermwidget = [
"pyside6_qtermwidget",
]
qtermwidget = ["pyside6_qtermwidget"]
[build-system]
requires = ["hatchling"]
@@ -68,7 +67,7 @@ line-length = 100
skip-magic-trailing-comma = true
[tool.coverage.report]
skip_empty = true # exclude empty *files*, e.g. __init__.py, from the report
skip_empty = true # exclude empty *files*, e.g. __init__.py, from the report
exclude_lines = [
"pragma: no cover",
"if TYPE_CHECKING:",
@@ -0,0 +1,5 @@
#!/usr/bin/env bash
# BENCHMARK_TITLE: Import bec_widgets
set -euo pipefail
python -c 'import bec_widgets; print(bec_widgets.__file__)'
@@ -0,0 +1,5 @@
#!/usr/bin/env bash
# BENCHMARK_TITLE: BEC IPython client with companion app
set -euo pipefail
bec --post-startup-file tests/benchmarks/hyperfine/utils/exit_bec_startup.py
@@ -0,0 +1,5 @@
#!/usr/bin/env bash
# BENCHMARK_TITLE: BEC IPython client without companion app
set -euo pipefail
bec --nogui --post-startup-file tests/benchmarks/hyperfine/utils/exit_bec_startup.py
@@ -0,0 +1,5 @@
import time
_ip = get_ipython()
_ip.confirm_exit = False
_ip.ask_exit()
+18 -5
View File
@@ -1,3 +1,5 @@
import traceback
import pytest
import qtpy.QtCore
from pytestqt.exceptions import TimeoutError as QtBotTimeoutError
@@ -5,12 +7,14 @@ from qtpy.QtCore import QTimer
class TestableQTimer(QTimer):
_instances: list[tuple[QTimer, str]] = []
_instances: list[tuple[QTimer, str, str]] = []
_current_test_name: str = ""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
TestableQTimer._instances.append((self, TestableQTimer._current_test_name))
tb = traceback.format_stack()
init_line = list(filter(lambda msg: "QTimer(" in msg, tb))[-1]
TestableQTimer._instances.append((self, TestableQTimer._current_test_name, init_line))
@classmethod
def check_all_stopped(cls, qtbot):
@@ -20,12 +24,21 @@ class TestableQTimer(QTimer):
except RuntimeError as e:
return "already deleted" in e.args[0]
def _format_timers(timers: list[tuple[QTimer, str, str]]):
return "\n".join(
f"Timer: {t[0]}\n in test: {t[1]}\n created at:{t[2]}" for t in timers
)
try:
qtbot.waitUntil(lambda: all(_is_done_or_deleted(timer) for timer, _ in cls._instances))
qtbot.waitUntil(
lambda: all(_is_done_or_deleted(timer) for timer, _, _ in cls._instances)
)
except QtBotTimeoutError as exc:
active_timers = list(filter(lambda t: t[0].isActive(), cls._instances))
(t.stop() for t, _ in cls._instances)
raise TimeoutError(f"Failed to stop all timers: {active_timers}") from exc
(t.stop() for t, _, _ in cls._instances)
raise TimeoutError(
f"Failed to stop all timers:\n{_format_timers(active_timers)}"
) from exc
cls._instances = []
+5 -4
View File
@@ -5,7 +5,7 @@ import random
import pytest
from bec_widgets.cli.client_utils import BECGuiClient
from bec_widgets.widgets.control.scan_control.scan_control import ScanControl
from bec_widgets.widgets.control.scan_control import ScanControl
# pylint: disable=unused-argument
# pylint: disable=redefined-outer-name
@@ -33,7 +33,7 @@ def threads_check_fixture(threads_check):
@pytest.fixture
def gui_id():
"""New gui id each time, to ensure no 'gui is alive' zombie key can perturb"""
return f"figure_{random.randint(0,100)}" # make a new gui id each time, to ensure no 'gui is alive' zombie key can perturb
return f"figure_{random.randint(0, 100)}" # make a new gui id each time, to ensure no 'gui is alive' zombie key can perturb
@pytest.fixture(scope="function")
@@ -51,6 +51,7 @@ def connected_client_gui_obj(qtbot, gui_id, bec_client_lib):
qtbot.waitUntil(lambda: len(gui.bec.widget_list()) == 0, timeout=10000)
yield gui
finally:
gui.bec.delete_all() # ensure clean state
qtbot.waitUntil(lambda: len(gui.bec.widget_list()) == 0, timeout=10000)
if (bec := getattr(gui, "bec", None)) is not None:
bec.delete_all() # ensure clean state
qtbot.waitUntil(lambda: len(bec.widget_list()) == 0, timeout=10000)
gui.kill_server()
+7
View File
@@ -89,6 +89,13 @@ def test_available_widgets(qtbot, connected_client_gui_obj):
# Skip private attributes
if object_name.startswith("_"):
continue
# Skip BECShell as ttyd is not installed
if object_name == "BECShell":
continue
# Skip BecConsole as ttyd is not installed
if object_name == "BecConsole":
continue
#############################
######### Add widget ########
-1
View File
@@ -59,5 +59,4 @@ def test_run_line_scan_with_parameters_e2e(scan_control, bec_client_lib, qtbot):
last_scan = queue.scan_storage.storage[-1]
assert last_scan.status_message.info["scan_name"] == scan_name
assert last_scan.status_message.info["exp_time"] == kwargs["exp_time"]
assert last_scan.status_message.info["scan_motors"] == [args["device"]]
assert last_scan.status_message.info["num_points"] == kwargs["steps"]
+1 -2
View File
@@ -10,7 +10,7 @@ except ImportError:
from qtpy.QtWidgets import QGridLayout
from bec_widgets.utils.widget_io import WidgetIO
from bec_widgets.widgets.control.scan_control.scan_control import ScanControl
from bec_widgets.widgets.control.scan_control import ScanControl
@pytest.fixture(scope="function")
@@ -84,7 +84,6 @@ def test_scan_metadata_for_custom_scan(
last_scan = queue.scan_storage.storage[-1]
assert last_scan.status_message.info["scan_name"] == scan_name
assert last_scan.status_message.info["exp_time"] == kwargs["exp_time"]
assert last_scan.status_message.info["scan_motors"] == [args["device"]]
assert last_scan.status_message.info["num_points"] == kwargs["steps"]
if valid:
@@ -260,22 +260,6 @@ def test_widgets_e2e_image(qtbot, connected_client_gui_obj, random_generator_fro
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# TODO re-enable when issue is resolved #560
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_log_panel(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the LogPanel widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.LogPanel)
# widget: client.LogPanel
# # No rpc calls to check so far
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_minesweeper(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the MineSweeper widget."""
@@ -0,0 +1,27 @@
from __future__ import annotations
import pytest
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea
from bec_widgets.widgets.plots.waveform.waveform import Waveform
from tests.unit_tests.client_mocks import mocked_client
@pytest.fixture
def dock_area(qtbot, mocked_client):
widget = BECDockArea(client=mocked_client)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
def test_add_waveform_to_dock_area(benchmark, dock_area, qtbot, mocked_client):
"""Benchmark adding a Waveform widget to an existing dock area."""
def add_waveform():
dock_area.new("Waveform")
return dock_area
dock = benchmark(add_waveform)
assert dock is not None
+4 -4
View File
@@ -23,11 +23,11 @@ from pytestqt.exceptions import TimeoutError as QtBotTimeoutError
from qtpy.QtCore import QEvent, QEventLoop
from qtpy.QtWidgets import QApplication, QMessageBox
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.tests.utils import DEVICES, DMMock
from bec_widgets.utils import bec_dispatcher as bec_dispatcher_module
from bec_widgets.utils import error_popups
from bec_widgets.utils.bec_dispatcher import QtRedisConnector
from bec_widgets.utils.rpc_register import RPCRegister
# Patch to set default RAISE_ERROR_DEFAULT to True for tests
# This means that by default, error popups will raise exceptions during tests
@@ -227,7 +227,7 @@ def create_history_file(file_path, data: dict, metadata: dict) -> messages.ScanH
msg = messages.ScanHistoryMessage(
scan_id=metadata["scan_id"],
scan_name=metadata["scan_name"],
exit_status=metadata["exit_status"],
exit_status=metadata["status"],
file_path=file_path,
scan_number=metadata["scan_number"],
dataset_number=metadata["dataset_number"],
@@ -274,7 +274,7 @@ def grid_scan_history_msg(tmpdir):
"scan_id": "test_scan",
"scan_name": "grid_scan",
"scan_type": "step",
"exit_status": "closed",
"status": "closed",
"scan_number": 1,
"dataset_number": 1,
"request_inputs": {
@@ -354,7 +354,7 @@ def scan_history_factory(tmpdir):
"scan_id": scan_id,
"scan_name": scan_name,
"scan_type": scan_type,
"exit_status": "closed",
"status": "closed",
"scan_number": scan_number,
"dataset_number": dataset_number,
"request_inputs": {
-1
View File
@@ -71,7 +71,6 @@ def bec_queue_msg_full():
},
"report_instructions": [{"scan_progress": 20}],
"scan_id": "2d704cc3-c172-404c-866d-608ce09fce40",
"scan_motors": ["samx"],
"scan_number": 1289,
}
],
@@ -9,7 +9,8 @@ from bec_widgets.cli.rpc.rpc_base import RPCBase
from bec_widgets.utils.plugin_utils import BECClassContainer, BECClassInfo
class _TestGlobalPlugin(RPCBase): ...
class _TestGlobalPlugin(RPCBase):
_IMPORT_MODULE = "test.global.plugin.widgets"
mock_client_module_globals = SimpleNamespace()
@@ -25,12 +26,13 @@ mock_client_module_globals.Widgets = _TestGlobalPlugin
def test_plugins_dont_clobber_client_globals(bec_logger: MagicMock):
reload(client)
bec_logger.logger.warning.assert_called_with(
"Plugin widget Widgets from namespace(Widgets=<class 'tests.unit_tests.test_client_plugin_widgets._TestGlobalPlugin'>) conflicts with a built-in class!"
"Plugin widget Widgets in test.global.plugin.widgets conflicts with a built-in class!"
)
assert isinstance(client.Widgets, enum.EnumType)
class _TestDuplicatePlugin(RPCBase): ...
class _TestDuplicatePlugin(RPCBase):
_IMPORT_MODULE = "test.duplicate.plugin.module"
mock_client_module_duplicate = SimpleNamespace()
@@ -54,7 +56,7 @@ def test_duplicate_plugins_not_allowed(_, bec_logger: MagicMock):
reload(client)
assert (
call(
f"Detected duplicate widget Waveform in plugin repo file: {inspect.getfile(_TestDuplicatePlugin)} !"
"Plugin widget Waveform in test.duplicate.plugin.module conflicts with a built-in class!"
)
in bec_logger.logger.warning.mock_calls
)
+2 -3
View File
@@ -8,9 +8,8 @@ from bec_widgets.utils.bec_connector import ConnectionConfig
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.colors import Colors, apply_theme
from bec_widgets.widgets.plots.waveform.curve import CurveConfig
from .client_mocks import mocked_client
from .conftest import create_widget
from tests.unit_tests.client_mocks import mocked_client
from tests.unit_tests.conftest import create_widget
def test_color_validation_CSS():
+2 -2
View File
@@ -7,8 +7,8 @@ from qtpy.QtGui import QTransform
from bec_widgets.utils.crosshair import Crosshair
from bec_widgets.widgets.plots.image.image_item import ImageItem
from bec_widgets.widgets.plots.waveform.waveform import Waveform
from tests.unit_tests.client_mocks import mocked_client
from .client_mocks import mocked_client
from .conftest import create_widget
# pylint: disable = redefined-outer-name
@@ -214,7 +214,7 @@ def test_crosshair_clicked_signal(qtbot, plot_widget_with_crosshair):
pos_in_widget = graphics_view.mapFromScene(pos_in_scene)
# Simulate mouse click
qtbot.mouseClick(graphics_view.viewport(), Qt.MouseButton.LeftButton, pos=pos_in_widget)
qtbot.mouseClick(graphics_view.viewport(), Qt.LeftButton, pos=pos_in_widget)
x, y = emitted_positions[0]
+2 -3
View File
@@ -12,9 +12,8 @@ from bec_widgets.widgets.plots.waveform.settings.curve_settings.curve_tree impor
ScanIndexValidator,
)
from bec_widgets.widgets.plots.waveform.waveform import Waveform
from .client_mocks import dap_plugin_message, mocked_client, mocked_client_with_dap
from .conftest import create_widget
from tests.unit_tests.client_mocks import dap_plugin_message, mocked_client, mocked_client_with_dap
from tests.unit_tests.conftest import create_widget
##################################################
# CurveSetting
+1 -1
View File
@@ -19,7 +19,7 @@ from .client_mocks import mocked_client
if TYPE_CHECKING: # pragma: no cover
from qtpy.QtWidgets import QListWidgetItem
from bec_widgets.widgets.services.device_browser.device_item.device_item import DeviceItem
from bec_widgets.widgets.services.device_browser.device_item import DeviceItem
# pylint: disable=no-member
@@ -16,6 +16,8 @@ from qtpy import QtCore, QtGui, QtWidgets
from bec_widgets.utils.bec_list import BECList
from bec_widgets.utils.colors import get_accent_colors
from bec_widgets.widgets.control.device_manager import DeviceTable, DMConfigView, DocstringView
from bec_widgets.widgets.control.device_manager.components import docstring_to_markdown
from bec_widgets.widgets.control.device_manager.components.constants import HEADERS_HELP_MD
from bec_widgets.widgets.control.device_manager.components.device_config_template.device_config_template import (
DeviceConfigTemplate,
@@ -32,17 +34,9 @@ from bec_widgets.widgets.control.device_manager.components.device_config_templat
ReadoutPriorityComboBox,
_try_literal_eval,
)
from bec_widgets.widgets.control.device_manager.components.device_table.device_table import (
DeviceTable,
)
from bec_widgets.widgets.control.device_manager.components.device_table.device_table_row import (
DeviceTableRow,
)
from bec_widgets.widgets.control.device_manager.components.dm_config_view import DMConfigView
from bec_widgets.widgets.control.device_manager.components.dm_docstring_view import (
DocstringView,
docstring_to_markdown,
)
from bec_widgets.widgets.control.device_manager.components.ophyd_validation.ophyd_validation import (
DeviceTest,
LegendLabel,
+6 -8
View File
@@ -31,11 +31,12 @@ from bec_widgets.applications.views.device_manager_view.device_manager_view impo
DeviceManagerWidget,
)
from bec_widgets.utils.colors import get_accent_colors
from bec_widgets.widgets.control.device_manager.components.device_table.device_table import (
from bec_widgets.widgets.control.device_manager.components import (
DeviceTable,
DMConfigView,
DocstringView,
OphydValidation,
)
from bec_widgets.widgets.control.device_manager.components.dm_config_view import DMConfigView
from bec_widgets.widgets.control.device_manager.components.dm_docstring_view import DocstringView
from bec_widgets.widgets.control.device_manager.components.ophyd_validation.ophyd_validation import (
ConfigStatus,
ConnectionStatus,
@@ -145,10 +146,7 @@ class TestDeviceManagerViewDialogs:
group_combo: QtWidgets.QComboBox = dialog._control_widgets["group_combo"]
assert group_combo.count() == len(OPHYD_DEVICE_TEMPLATES)
# Test select a group from available templates
variant_combo = dialog._control_widgets["variant_combo"]
assert variant_combo.isEnabled() is False
with qtbot.waitSignal(group_combo.currentTextChanged):
epics_signal_index = group_combo.findText("EpicsSignal")
group_combo.setCurrentIndex(epics_signal_index) # Select "EpicsSignal" group
@@ -234,7 +232,7 @@ class TestDeviceManagerViewDialogs:
sample_config = {
"name": "TestDevice",
"enabled": True,
"deviceClass": "ophyd.EpicsSignal",
"deviceClass": "ophyd_devices.EpicsSignal",
"readoutPriority": "baseline",
"deviceConfig": {"read_pv": "X25DA-ES1-MOT:GET"},
}
@@ -247,7 +245,7 @@ class TestDeviceManagerViewDialogs:
assert variant_combo.currentText() == "EpicsSignal"
config = dialog._device_config_template.get_config_fields()
assert config["name"] == "TestDevice"
assert config["deviceClass"] == "ophyd.EpicsSignal"
assert config["deviceClass"] == "ophyd_devices.EpicsSignal"
assert config["deviceConfig"]["read_pv"] == "X25DA-ES1-MOT:GET"
# Test now to add the device config with different validation results
+312 -186
View File
@@ -19,19 +19,19 @@ from bec_widgets.widgets.containers.dock_area.basic_dock_area import (
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea, SaveProfileDialog
from bec_widgets.widgets.containers.dock_area.profile_utils import (
SETTINGS_KEYS,
default_profile_path,
baseline_profile_path,
get_profile_info,
is_profile_read_only,
is_quick_select,
list_profiles,
load_default_profile_screenshot,
load_user_profile_screenshot,
open_default_settings,
open_user_settings,
load_baseline_profile_screenshot,
load_runtime_profile_screenshot,
open_baseline_settings,
open_runtime_settings,
read_manifest,
restore_user_from_default,
restore_runtime_from_baseline,
runtime_profile_path,
set_quick_select,
user_profile_path,
write_manifest,
)
from bec_widgets.widgets.containers.dock_area.settings.dialogs import (
@@ -188,17 +188,17 @@ class _NamespaceProfiles:
def __init__(self, widget: BECDockArea):
self.namespace = widget.profile_namespace
def open_user(self, name: str):
return open_user_settings(name, namespace=self.namespace)
def open_runtime(self, name: str):
return open_runtime_settings(name, namespace=self.namespace)
def open_default(self, name: str):
return open_default_settings(name, namespace=self.namespace)
def open_baseline(self, name: str):
return open_baseline_settings(name, namespace=self.namespace)
def user_path(self, name: str) -> str:
return user_profile_path(name, namespace=self.namespace)
def runtime_path(self, name: str) -> str:
return runtime_profile_path(name, namespace=self.namespace)
def default_path(self, name: str) -> str:
return default_profile_path(name, namespace=self.namespace)
def baseline_path(self, name: str) -> str:
return baseline_profile_path(name, namespace=self.namespace)
def list_profiles(self) -> list[str]:
return list_profiles(namespace=self.namespace)
@@ -615,35 +615,6 @@ class TestBasicDockArea:
]
class TestAdvancedDockAreaInit:
"""Test initialization and basic properties."""
def test_init(self, advanced_dock_area):
assert advanced_dock_area is not None
assert isinstance(advanced_dock_area, BECDockArea)
assert advanced_dock_area.mode == "creator"
assert hasattr(advanced_dock_area, "dock_manager")
assert hasattr(advanced_dock_area, "toolbar")
assert hasattr(advanced_dock_area, "dark_mode_button")
assert hasattr(advanced_dock_area, "state_manager")
def test_rpc_and_plugin_flags(self):
assert BECDockArea.RPC is True
assert BECDockArea.PLUGIN is False
def test_user_access_list(self):
expected_methods = [
"new",
"widget_map",
"widget_list",
"workspace_is_locked",
"attach_all",
"delete_all",
]
for method in expected_methods:
assert method in BECDockArea.USER_ACCESS
class TestDockManagement:
"""Test dock creation, management, and manipulation."""
@@ -851,16 +822,6 @@ class TestDeveloperMode:
class TestToolbarFunctionality:
"""Test toolbar setup and functionality."""
def test_toolbar_setup(self, advanced_dock_area):
"""Test toolbar is properly set up."""
assert hasattr(advanced_dock_area, "toolbar")
assert hasattr(advanced_dock_area, "_ACTION_MAPPINGS")
# Check that action mappings are properly set
assert "menu_plots" in advanced_dock_area._ACTION_MAPPINGS
assert "menu_devices" in advanced_dock_area._ACTION_MAPPINGS
assert "menu_utils" in advanced_dock_area._ACTION_MAPPINGS
def test_toolbar_plot_actions(self, advanced_dock_area):
"""Test plot toolbar actions trigger widget creation."""
plot_actions = [
@@ -946,7 +907,7 @@ class TestToolbarFunctionality:
def test_load_profile_restores_floating_dock(self, advanced_dock_area, qtbot):
helper = profile_helper(advanced_dock_area)
settings = helper.open_user("floating_profile")
settings = helper.open_runtime("floating_profile")
settings.clear()
settings.setValue("profile/created_at", "2025-11-23T00:00:00Z")
@@ -1215,18 +1176,6 @@ class TestPreviewPanel:
assert "No preview available" in panel.image_label.text()
class TestRestoreProfileDialog:
"""Test restore dialog confirmation flow."""
def test_confirm_accepts(self, monkeypatch):
monkeypatch.setattr(RestoreProfileDialog, "exec", lambda self: QDialog.Accepted)
assert RestoreProfileDialog.confirm(None, QPixmap(), QPixmap()) is True
def test_confirm_rejects(self, monkeypatch):
monkeypatch.setattr(RestoreProfileDialog, "exec", lambda self: QDialog.Rejected)
assert RestoreProfileDialog.confirm(None, QPixmap(), QPixmap()) is False
class TestProfileInfoAndScreenshots:
"""Tests for profile utilities metadata and screenshot helpers."""
@@ -1246,9 +1195,9 @@ class TestProfileInfoAndScreenshots:
settings.endArray()
settings.sync()
def test_get_profile_info_user_origin(self, temp_profile_dir):
name = "info_user"
settings = open_user_settings(name)
def test_get_profile_info_runtime_origin(self, temp_profile_dir):
name = "info_runtime"
settings = open_runtime_settings(name)
settings.setValue(profile_utils.SETTINGS_KEYS["created_at"], "2023-01-01T00:00:00Z")
settings.setValue("profile/author", "Custom")
set_quick_select(name, True)
@@ -1262,22 +1211,22 @@ class TestProfileInfoAndScreenshots:
assert info.is_quick_select is True
assert info.widget_count == 3
assert info.author == "User"
assert info.user_path.endswith(f"{name}.ini")
assert info.runtime_path.endswith(f"{name}.ini")
assert info.size_kb >= 0
def test_get_profile_info_default_only(self, temp_profile_dir):
name = "info_default"
settings = open_default_settings(name)
def test_get_profile_info_baseline_only(self, temp_profile_dir):
name = "info_baseline"
settings = open_baseline_settings(name)
self._write_manifest(settings, count=1)
user_path = user_profile_path(name)
if os.path.exists(user_path):
os.remove(user_path)
runtime_path = runtime_profile_path(name)
if os.path.exists(runtime_path):
os.remove(runtime_path)
info = get_profile_info(name)
assert info.origin == "settings"
assert info.user_path.endswith(f"{name}.ini")
assert info.baseline_path.endswith(f"{name}.ini")
assert info.widget_count == 1
def test_get_profile_info_module_readonly(self, module_profile_factory):
@@ -1289,10 +1238,10 @@ class TestProfileInfoAndScreenshots:
def test_get_profile_info_unknown_profile(self):
name = "nonexistent_profile"
if os.path.exists(user_profile_path(name)):
os.remove(user_profile_path(name))
if os.path.exists(default_profile_path(name)):
os.remove(default_profile_path(name))
if os.path.exists(runtime_profile_path(name)):
os.remove(runtime_profile_path(name))
if os.path.exists(baseline_profile_path(name)):
os.remove(baseline_profile_path(name))
info = get_profile_info(name)
@@ -1300,29 +1249,29 @@ class TestProfileInfoAndScreenshots:
assert info.is_read_only is False
assert info.widget_count == 0
def test_load_user_profile_screenshot(self, temp_profile_dir):
name = "user_screenshot"
settings = open_user_settings(name)
def test_load_runtime_profile_screenshot(self, temp_profile_dir):
name = "runtime_screenshot"
settings = open_runtime_settings(name)
settings.setValue(profile_utils.SETTINGS_KEYS["screenshot"], self.PNG_BYTES)
settings.sync()
pix = load_user_profile_screenshot(name)
pix = load_runtime_profile_screenshot(name)
assert pix is not None and not pix.isNull()
def test_load_default_profile_screenshot(self, temp_profile_dir):
name = "default_screenshot"
settings = open_default_settings(name)
def test_load_baseline_profile_screenshot(self, temp_profile_dir):
name = "baseline_screenshot"
settings = open_baseline_settings(name)
settings.setValue(profile_utils.SETTINGS_KEYS["screenshot"], self.PNG_BYTES)
settings.sync()
pix = load_default_profile_screenshot(name)
pix = load_baseline_profile_screenshot(name)
assert pix is not None and not pix.isNull()
def test_load_screenshot_from_settings_invalid(self, temp_profile_dir):
name = "invalid_screenshot"
settings = open_user_settings(name)
settings = open_runtime_settings(name)
settings.setValue(profile_utils.SETTINGS_KEYS["screenshot"], "not-an-image")
settings.sync()
@@ -1332,7 +1281,7 @@ class TestProfileInfoAndScreenshots:
def test_load_screenshot_from_settings_bytes(self, temp_profile_dir):
name = "bytes_screenshot"
settings = open_user_settings(name)
settings = open_runtime_settings(name)
settings.setValue(profile_utils.SETTINGS_KEYS["screenshot"], self.PNG_BYTES)
settings.sync()
@@ -1347,7 +1296,7 @@ class TestWorkSpaceManager:
@staticmethod
def _create_profiles(names):
for name in names:
settings = open_user_settings(name)
settings = open_runtime_settings(name)
settings.setValue("meta", "value")
settings.sync()
@@ -1411,7 +1360,7 @@ class TestWorkSpaceManager:
manager.delete_profile(name)
assert not os.path.exists(user_profile_path(name))
assert not os.path.exists(runtime_profile_path(name))
assert target.refresh_calls >= 1
def test_delete_readonly_profile_shows_message(
@@ -1441,21 +1390,23 @@ class TestWorkSpaceManager:
class TestAdvancedDockAreaRestoreAndDialogs:
"""Additional coverage for restore flows and workspace dialogs."""
def test_restore_user_profile_from_default_confirm_true(self, advanced_dock_area, monkeypatch):
def test_restore_runtime_profile_from_baseline_confirm_true(
self, advanced_dock_area, monkeypatch
):
profile_name = "profile_restore_true"
helper = profile_helper(advanced_dock_area)
helper.open_default(profile_name).sync()
helper.open_user(profile_name).sync()
helper.open_baseline(profile_name).sync()
helper.open_runtime(profile_name).sync()
advanced_dock_area._current_profile_name = profile_name
advanced_dock_area.isVisible = lambda: False
pix = QPixmap(8, 8)
pix.fill(Qt.red)
monkeypatch.setattr(
"bec_widgets.widgets.containers.dock_area.dock_area.load_user_profile_screenshot",
"bec_widgets.widgets.containers.dock_area.dock_area.load_runtime_profile_screenshot",
lambda name, namespace=None: pix,
)
monkeypatch.setattr(
"bec_widgets.widgets.containers.dock_area.dock_area.load_default_profile_screenshot",
"bec_widgets.widgets.containers.dock_area.dock_area.load_baseline_profile_screenshot",
lambda name, namespace=None: pix,
)
monkeypatch.setattr(
@@ -1465,12 +1416,12 @@ class TestAdvancedDockAreaRestoreAndDialogs:
with (
patch(
"bec_widgets.widgets.containers.dock_area.dock_area.restore_user_from_default"
"bec_widgets.widgets.containers.dock_area.dock_area.restore_runtime_from_baseline"
) as mock_restore,
patch.object(advanced_dock_area, "delete_all") as mock_delete_all,
patch.object(advanced_dock_area, "load_profile") as mock_load_profile,
):
advanced_dock_area.restore_user_profile_from_default()
advanced_dock_area.restore_baseline_profile(show_dialog=True)
assert mock_restore.call_count == 1
args, kwargs = mock_restore.call_args
@@ -1479,20 +1430,22 @@ class TestAdvancedDockAreaRestoreAndDialogs:
mock_delete_all.assert_called_once()
mock_load_profile.assert_called_once_with(profile_name)
def test_restore_user_profile_from_default_confirm_false(self, advanced_dock_area, monkeypatch):
def test_restore_runtime_profile_from_baseline_confirm_false(
self, advanced_dock_area, monkeypatch
):
profile_name = "profile_restore_false"
helper = profile_helper(advanced_dock_area)
helper.open_default(profile_name).sync()
helper.open_user(profile_name).sync()
helper.open_baseline(profile_name).sync()
helper.open_runtime(profile_name).sync()
advanced_dock_area._current_profile_name = profile_name
advanced_dock_area.isVisible = lambda: False
monkeypatch.setattr(
"bec_widgets.widgets.containers.dock_area.dock_area.load_user_profile_screenshot",
lambda name: QPixmap(),
"bec_widgets.widgets.containers.dock_area.dock_area.load_runtime_profile_screenshot",
lambda name, namespace=None: QPixmap(),
)
monkeypatch.setattr(
"bec_widgets.widgets.containers.dock_area.dock_area.load_default_profile_screenshot",
lambda name: QPixmap(),
"bec_widgets.widgets.containers.dock_area.dock_area.load_baseline_profile_screenshot",
lambda name, namespace=None: QPixmap(),
)
monkeypatch.setattr(
"bec_widgets.widgets.containers.dock_area.dock_area.RestoreProfileDialog.confirm",
@@ -1500,24 +1453,49 @@ class TestAdvancedDockAreaRestoreAndDialogs:
)
with patch(
"bec_widgets.widgets.containers.dock_area.dock_area.restore_user_from_default"
"bec_widgets.widgets.containers.dock_area.dock_area.restore_runtime_from_baseline"
) as mock_restore:
advanced_dock_area.restore_user_profile_from_default()
advanced_dock_area.restore_baseline_profile(show_dialog=True)
mock_restore.assert_not_called()
def test_restore_user_profile_from_default_no_target(self, advanced_dock_area, monkeypatch):
def test_restore_runtime_profile_from_baseline_without_dialog(self, advanced_dock_area):
profile_name = "alignment_scan"
helper = profile_helper(advanced_dock_area)
helper.open_baseline(profile_name).sync()
helper.open_runtime(profile_name).sync()
with (
patch(
"bec_widgets.widgets.containers.dock_area.dock_area.RestoreProfileDialog.confirm"
) as mock_confirm,
patch(
"bec_widgets.widgets.containers.dock_area.dock_area.restore_runtime_from_baseline"
) as mock_restore,
patch.object(advanced_dock_area, "delete_all") as mock_delete_all,
patch.object(advanced_dock_area, "load_profile") as mock_load_profile,
):
advanced_dock_area.restore_baseline_profile(profile_name, show_dialog=False)
mock_confirm.assert_not_called()
mock_restore.assert_called_once_with(
profile_name, namespace=advanced_dock_area.profile_namespace
)
mock_delete_all.assert_called_once()
mock_load_profile.assert_called_once_with(profile_name)
def test_restore_runtime_profile_from_baseline_no_target(self, advanced_dock_area, monkeypatch):
advanced_dock_area._current_profile_name = None
with patch(
"bec_widgets.widgets.containers.dock_area.dock_area.RestoreProfileDialog.confirm"
) as mock_confirm:
advanced_dock_area.restore_user_profile_from_default()
advanced_dock_area.restore_baseline_profile(show_dialog=True)
mock_confirm.assert_not_called()
def test_refresh_workspace_list_with_refresh_profiles(self, advanced_dock_area):
profile_name = "refresh_profile"
helper = profile_helper(advanced_dock_area)
helper.open_user(profile_name).sync()
helper.open_runtime(profile_name).sync()
# Simulate a normal named-profile state (not transient empty startup mode).
advanced_dock_area._empty_profile_active = False
advanced_dock_area._current_profile_name = profile_name
@@ -1572,8 +1550,8 @@ class TestAdvancedDockAreaRestoreAndDialogs:
active = "active_profile"
quick = "quick_profile"
helper = profile_helper(advanced_dock_area)
helper.open_user(active).sync()
helper.open_user(quick).sync()
helper.open_runtime(active).sync()
helper.open_runtime(quick).sync()
helper.set_quick_select(quick, True)
combo_stub = ComboStub()
@@ -1600,7 +1578,7 @@ class TestAdvancedDockAreaRestoreAndDialogs:
advanced_dock_area._current_profile_name = "manager_profile"
helper = profile_helper(advanced_dock_area)
helper.open_user("manager_profile").sync()
helper.open_runtime("manager_profile").sync()
advanced_dock_area.show_workspace_manager()
@@ -1635,18 +1613,104 @@ class TestProfileManagement:
def test_profile_path(self, temp_profile_dir):
"""Test profile path generation."""
path = user_profile_path("test_profile")
expected = os.path.join(temp_profile_dir, "user", "test_profile.ini")
path = runtime_profile_path("test_profile")
expected = os.path.join(temp_profile_dir, "runtime", "test_profile.ini")
assert path == expected
default_path = default_profile_path("test_profile")
expected_default = os.path.join(temp_profile_dir, "default", "test_profile.ini")
assert default_path == expected_default
baseline_path = baseline_profile_path("test_profile")
expected_baseline = os.path.join(temp_profile_dir, "baseline", "test_profile.ini")
assert baseline_path == expected_baseline
def test_open_settings(self, temp_profile_dir):
"""Test opening settings for a profile."""
settings = open_user_settings("test_profile")
assert isinstance(settings, QSettings)
def test_legacy_user_profile_is_mapped_to_runtime(self, temp_profile_dir):
"""Legacy user profiles are copied into the canonical runtime segment."""
name = "legacy_runtime"
legacy_dir = os.path.join(temp_profile_dir, "user")
os.makedirs(legacy_dir, exist_ok=True)
legacy_path = os.path.join(legacy_dir, f"{name}.ini")
legacy_settings = QSettings(legacy_path, QSettings.IniFormat)
legacy_settings.setValue("test/value", "legacy")
legacy_settings.sync()
canonical_path = runtime_profile_path(name)
assert not os.path.exists(canonical_path)
assert name in list_profiles()
assert os.path.exists(canonical_path)
assert open_runtime_settings(name).value("test/value", "", type=str) == "legacy"
def test_legacy_default_profile_is_mapped_to_baseline(self, temp_profile_dir):
"""Legacy default profiles are copied into the canonical baseline segment."""
name = "legacy_baseline"
legacy_dir = os.path.join(temp_profile_dir, "default")
os.makedirs(legacy_dir, exist_ok=True)
legacy_path = os.path.join(legacy_dir, f"{name}.ini")
legacy_settings = QSettings(legacy_path, QSettings.IniFormat)
legacy_settings.setValue("test/value", "legacy")
legacy_settings.sync()
canonical_path = baseline_profile_path(name)
assert not os.path.exists(canonical_path)
assert name in list_profiles()
assert os.path.exists(canonical_path)
assert open_baseline_settings(name).value("test/value", "", type=str) == "legacy"
def test_runtime_namespace_fallback_is_materialized(self, temp_profile_dir):
"""Canonical runtime namespace fallback is copied before opening primary settings."""
name = "runtime_namespace_fallback"
fallback_settings = open_runtime_settings(name)
fallback_settings.setValue("test/value", "fallback")
fallback_settings.sync()
namespaced_path = runtime_profile_path(name, namespace="beamline")
assert not os.path.exists(namespaced_path)
settings = open_runtime_settings(name, namespace="beamline")
assert os.path.exists(namespaced_path)
assert settings.value("test/value", "", type=str) == "fallback"
def test_baseline_namespace_fallback_is_materialized(self, temp_profile_dir):
"""Canonical baseline namespace fallback is copied before opening primary settings."""
name = "baseline_namespace_fallback"
fallback_settings = open_baseline_settings(name)
fallback_settings.setValue("test/value", "fallback")
fallback_settings.sync()
namespaced_path = baseline_profile_path(name, namespace="beamline")
assert not os.path.exists(namespaced_path)
settings = open_baseline_settings(name, namespace="beamline")
assert os.path.exists(namespaced_path)
assert settings.value("test/value", "", type=str) == "fallback"
def test_canonical_profile_wins_over_legacy_profile(self, temp_profile_dir):
"""Canonical runtime/baseline files are not overwritten by legacy fallback files."""
name = "canonical_wins"
runtime_settings = open_runtime_settings(name)
runtime_settings.setValue("test/value", "canonical-runtime")
runtime_settings.sync()
baseline_settings = open_baseline_settings(name)
baseline_settings.setValue("test/value", "canonical-baseline")
baseline_settings.sync()
for segment, value in (("user", "legacy-runtime"), ("default", "legacy-baseline")):
legacy_dir = os.path.join(temp_profile_dir, segment)
os.makedirs(legacy_dir, exist_ok=True)
legacy_settings = QSettings(
os.path.join(legacy_dir, f"{name}.ini"), QSettings.IniFormat
)
legacy_settings.setValue("test/value", value)
legacy_settings.sync()
assert name in list_profiles()
assert open_runtime_settings(name).value("test/value", "", type=str) == "canonical-runtime"
assert (
open_baseline_settings(name).value("test/value", "", type=str) == "canonical-baseline"
)
def test_list_profiles_empty(self, temp_profile_dir):
"""Test listing profiles when directory is empty."""
@@ -1666,7 +1730,7 @@ class TestProfileManagement:
# Create some test profile files
profile_names = ["profile1", "profile2", "profile3"]
for name in profile_names:
settings = open_user_settings(name)
settings = open_runtime_settings(name)
settings.setValue("test", "value")
settings.sync()
@@ -1676,24 +1740,24 @@ class TestProfileManagement:
def test_readonly_profile_operations(self, temp_profile_dir, module_profile_factory):
"""Test read-only profile functionality."""
profile_name = "user_profile"
profile_name = "runtime_profile"
# Initially should not be read-only
assert not is_profile_read_only(profile_name)
# Create a user profile and ensure it's writable
settings = open_user_settings(profile_name)
# Create a runtime profile and ensure it's writable
settings = open_runtime_settings(profile_name)
settings.setValue("test", "value")
settings.sync()
assert not is_profile_read_only(profile_name)
# Verify a bundled module profile is detected as read-only
readonly_name = module_profile_factory("module_default")
readonly_name = module_profile_factory("module_baseline")
assert is_profile_read_only(readonly_name)
def test_write_and_read_manifest(self, temp_profile_dir, advanced_dock_area, qtbot):
"""Test writing and reading dock manifest."""
settings = open_user_settings("test_manifest")
settings = open_runtime_settings("test_manifest")
# Create real docks
advanced_dock_area.new("DarkModeButton")
@@ -1723,18 +1787,18 @@ class TestProfileManagement:
def test_restore_preserves_quick_select(self, temp_profile_dir):
"""Ensure restoring keeps the quick select flag when it was enabled."""
profile_name = "restorable_profile"
default_settings = open_default_settings(profile_name)
default_settings.setValue("test", "default")
default_settings.sync()
baseline_settings = open_baseline_settings(profile_name)
baseline_settings.setValue("test", "baseline")
baseline_settings.sync()
user_settings = open_user_settings(profile_name)
user_settings.setValue("test", "user")
user_settings.sync()
runtime_settings = open_runtime_settings(profile_name)
runtime_settings.setValue("test", "runtime")
runtime_settings.sync()
set_quick_select(profile_name, True)
assert is_quick_select(profile_name)
restore_user_from_default(profile_name)
restore_runtime_from_baseline(profile_name)
assert is_quick_select(profile_name)
@@ -1758,7 +1822,7 @@ class TestWorkspaceProfileOperations:
widget.prepare_for_shutdown()
mock_write.assert_not_called()
helper.open_user("real_profile").sync()
helper.open_runtime("real_profile").sync()
widget.load_profile("real_profile")
assert widget._empty_profile_active is False
assert widget._empty_profile_consumed is True
@@ -1772,7 +1836,7 @@ class TestWorkspaceProfileOperations:
profile_name = module_profile_factory("readonly_profile")
new_profile = f"{profile_name}_custom"
helper = profile_helper(advanced_dock_area)
target_path = helper.user_path(new_profile)
target_path = helper.runtime_path(new_profile)
if os.path.exists(target_path):
os.remove(target_path)
@@ -1802,7 +1866,7 @@ class TestWorkspaceProfileOperations:
helper = profile_helper(advanced_dock_area)
# Create a profile with manifest
settings = helper.open_user(profile_name)
settings = helper.open_runtime(profile_name)
settings.beginWriteArray("manifest/widgets", 1)
settings.setArrayIndex(0)
settings.setValue("object_name", "test_widget")
@@ -1823,6 +1887,83 @@ class TestWorkspaceProfileOperations:
widget_map = advanced_dock_area.widget_map()
assert "test_widget" in widget_map
def test_load_profile_default_does_not_restore_baseline(self, advanced_dock_area):
"""Regular profile loading should not restore the runtime copy."""
profile_name = "load_without_baseline_restore"
helper = profile_helper(advanced_dock_area)
helper.open_runtime(profile_name).sync()
with patch(
"bec_widgets.widgets.containers.dock_area.dock_area.restore_runtime_from_baseline"
) as mock_restore:
advanced_dock_area.load_profile(profile_name)
mock_restore.assert_not_called()
assert advanced_dock_area._current_profile_name == profile_name
def test_load_profile_restores_baseline_without_dialog(self, advanced_dock_area):
"""CLI loading can restore the runtime copy from baseline without confirmation."""
profile_name = "alignment_scan"
helper = profile_helper(advanced_dock_area)
helper.open_baseline(profile_name).sync()
helper.open_runtime(profile_name).sync()
with (
patch(
"bec_widgets.widgets.containers.dock_area.dock_area.RestoreProfileDialog.confirm"
) as mock_confirm,
patch(
"bec_widgets.widgets.containers.dock_area.dock_area.restore_runtime_from_baseline"
) as mock_restore,
):
advanced_dock_area.load_profile(profile_name, restore_baseline=True)
mock_confirm.assert_not_called()
mock_restore.assert_called_once_with(
profile_name, namespace=advanced_dock_area.profile_namespace
)
assert advanced_dock_area._current_profile_name == profile_name
def test_load_profile_materializes_runtime_namespace_fallback(self, advanced_dock_area):
"""Loading a runtime fallback copies it into the active namespace before opening."""
profile_name = "load_runtime_namespace_fallback"
helper = profile_helper(advanced_dock_area)
fallback_settings = open_runtime_settings(profile_name)
fallback_settings.setValue("test/value", "fallback")
fallback_settings.sync()
namespaced_path = helper.runtime_path(profile_name)
assert not os.path.exists(namespaced_path)
advanced_dock_area.load_profile(profile_name)
assert os.path.exists(namespaced_path)
assert (
QSettings(namespaced_path, QSettings.IniFormat).value("test/value", "", type=str)
== "fallback"
)
assert advanced_dock_area._current_profile_name == profile_name
def test_load_profile_materializes_baseline_namespace_fallback(self, advanced_dock_area):
"""Loading a baseline fallback copies it into the active namespace before opening."""
profile_name = "load_baseline_namespace_fallback"
helper = profile_helper(advanced_dock_area)
fallback_settings = open_baseline_settings(profile_name)
fallback_settings.setValue("test/value", "fallback")
fallback_settings.sync()
namespaced_path = helper.baseline_path(profile_name)
assert not os.path.exists(namespaced_path)
advanced_dock_area.load_profile(profile_name)
assert os.path.exists(namespaced_path)
assert (
QSettings(namespaced_path, QSettings.IniFormat).value("test/value", "", type=str)
== "fallback"
)
assert advanced_dock_area._current_profile_name == profile_name
def test_save_as_skips_autosave_source_profile(
self, advanced_dock_area, temp_profile_dir, qtbot
):
@@ -1831,7 +1972,7 @@ class TestWorkspaceProfileOperations:
new_profile = "autosave_new"
helper = profile_helper(advanced_dock_area)
settings = helper.open_user(source_profile)
settings = helper.open_runtime(source_profile)
settings.beginWriteArray("manifest/widgets", 1)
settings.setArrayIndex(0)
settings.setValue("object_name", "source_widget")
@@ -1863,11 +2004,16 @@ class TestWorkspaceProfileOperations:
with patch(
"bec_widgets.widgets.containers.dock_area.dock_area.SaveProfileDialog", StubDialog
):
advanced_dock_area.save_profile(show_dialog=True)
widgets_before_save = list(advanced_dock_area.widget_list())
with patch.object(advanced_dock_area, "load_profile") as mock_load_profile:
advanced_dock_area.save_profile(show_dialog=True)
qtbot.wait(100)
mock_load_profile.assert_not_called()
qtbot.wait(500)
source_manifest = read_manifest(helper.open_user(source_profile))
new_manifest = read_manifest(helper.open_user(new_profile))
assert list(advanced_dock_area.widget_list()) == widgets_before_save
source_manifest = read_manifest(helper.open_runtime(source_profile))
new_manifest = read_manifest(helper.open_runtime(new_profile))
assert len(source_manifest) == 1
assert len(new_manifest) == 2
@@ -1879,7 +2025,7 @@ class TestWorkspaceProfileOperations:
helper = profile_helper(advanced_dock_area)
for profile in (profile_a, profile_b):
settings = helper.open_user(profile)
settings = helper.open_runtime(profile)
settings.beginWriteArray("manifest/widgets", 1)
settings.setArrayIndex(0)
settings.setValue("object_name", f"{profile}_widget")
@@ -1898,7 +2044,7 @@ class TestWorkspaceProfileOperations:
advanced_dock_area.load_profile(profile_b)
qtbot.wait(500)
manifest_a = read_manifest(helper.open_user(profile_a))
manifest_a = read_manifest(helper.open_runtime(profile_a))
assert len(manifest_a) == 2
def test_delete_profile_readonly(
@@ -1907,15 +2053,15 @@ class TestWorkspaceProfileOperations:
"""Test deleting bundled profile removes only the writable copy."""
profile_name = module_profile_factory("readonly_profile")
helper = profile_helper(advanced_dock_area)
helper.list_profiles() # ensure default and user copies are materialized
helper.open_default(profile_name).sync()
settings = helper.open_user(profile_name)
helper.list_profiles() # ensure baseline and runtime copies are materialized
helper.open_baseline(profile_name).sync()
settings = helper.open_runtime(profile_name)
settings.setValue("test", "value")
settings.sync()
user_path = helper.user_path(profile_name)
default_path = helper.default_path(profile_name)
assert os.path.exists(user_path)
assert os.path.exists(default_path)
runtime_path = helper.runtime_path(profile_name)
baseline_path = helper.baseline_path(profile_name)
assert os.path.exists(runtime_path)
assert os.path.exists(baseline_path)
with patch.object(advanced_dock_area.toolbar.components, "get_action") as mock_get_action:
mock_combo = MagicMock()
@@ -1936,9 +2082,9 @@ class TestWorkspaceProfileOperations:
mock_question.assert_not_called()
mock_info.assert_called_once()
# Read-only profile should remain intact (user + default copies)
assert os.path.exists(user_path)
assert os.path.exists(default_path)
# Read-only profile should remain intact (runtime + baseline copies)
assert os.path.exists(runtime_path)
assert os.path.exists(baseline_path)
def test_delete_profile_success(self, advanced_dock_area, temp_profile_dir):
"""Test successful profile deletion."""
@@ -1946,11 +2092,11 @@ class TestWorkspaceProfileOperations:
helper = profile_helper(advanced_dock_area)
# Create regular profile
settings = helper.open_user(profile_name)
settings = helper.open_runtime(profile_name)
settings.setValue("test", "value")
settings.sync()
user_path = helper.user_path(profile_name)
assert os.path.exists(user_path)
runtime_path = helper.runtime_path(profile_name)
assert os.path.exists(runtime_path)
with patch.object(advanced_dock_area.toolbar.components, "get_action") as mock_get_action:
mock_combo = MagicMock()
@@ -1968,7 +2114,7 @@ class TestWorkspaceProfileOperations:
mock_question.assert_called_once()
mock_refresh.assert_called_once()
# Profile should be deleted
assert not os.path.exists(user_path)
assert not os.path.exists(runtime_path)
def test_delete_profile_cli_usage(self, advanced_dock_area, temp_profile_dir):
"""Test delete_profile with explicit name (CLI usage - no dialog by default)."""
@@ -1976,24 +2122,24 @@ class TestWorkspaceProfileOperations:
helper = profile_helper(advanced_dock_area)
# Create regular profile
settings = helper.open_user(profile_name)
settings = helper.open_runtime(profile_name)
settings.setValue("test", "value")
settings.sync()
user_path = helper.user_path(profile_name)
assert os.path.exists(user_path)
runtime_path = helper.runtime_path(profile_name)
assert os.path.exists(runtime_path)
# Delete without dialog (CLI usage - default behavior)
result = advanced_dock_area.delete_profile(profile_name)
assert result is True
assert not os.path.exists(user_path)
assert not os.path.exists(runtime_path)
def test_refresh_workspace_list(self, advanced_dock_area, temp_profile_dir):
"""Test refreshing workspace list."""
# Create some profiles
helper = profile_helper(advanced_dock_area)
for name in ["profile1", "profile2"]:
settings = helper.open_user(name)
settings = helper.open_runtime(name)
settings.setValue("test", "value")
settings.sync()
@@ -2033,20 +2179,6 @@ class TestCleanupAndMisc:
# Verify dock was removed
assert len(advanced_dock_area.dock_list()) == initial_count - 1
def test_apply_dock_lock(self, advanced_dock_area, qtbot):
"""Test _apply_dock_lock functionality."""
# Create a dock first
advanced_dock_area.new("DarkModeButton")
qtbot.wait(200)
# Test locking
advanced_dock_area._apply_dock_lock(True)
# No assertion needed - just verify it doesn't crash
# Test unlocking
advanced_dock_area._apply_dock_lock(False)
# No assertion needed - just verify it doesn't crash
def test_make_dock(self, advanced_dock_area):
"""Test _make_dock functionality."""
from bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button import (
@@ -2229,7 +2361,6 @@ class TestFlatToolbarActions:
"flat_progress_bar",
"flat_terminal",
"flat_bec_shell",
"flat_log_panel",
"flat_sbb_monitor",
]
@@ -2289,11 +2420,6 @@ class TestFlatToolbarActions:
action.trigger()
mock_new.assert_called_once_with(widget_type)
def test_flat_log_panel_action_disabled(self, advanced_dock_area):
"""Test that flat log panel action is disabled."""
action = advanced_dock_area.toolbar.components.get_action("flat_log_panel").action
assert not action.isEnabled()
class TestModeTransitions:
"""Test mode transitions and state consistency."""
+11 -16
View File
@@ -5,7 +5,7 @@ import black
import isort
import pytest
from bec_widgets.cli.generate_cli import ClientGenerator
from bec_widgets.utils.generate_cli import ClientGenerator
from bec_widgets.utils.plugin_utils import BECClassContainer, BECClassInfo
# pylint: disable=missing-function-docstring
@@ -104,8 +104,7 @@ def test_client_generator_with_black_formatting():
from bec_lib.logger import bec_logger
from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call, rpc_timeout
from bec_widgets.utils.bec_plugin_helper import (get_all_plugin_widgets,
get_plugin_client_module)
from bec_widgets.utils.bec_plugin_helper import get_plugin_client_module
logger = bec_logger.logger
@@ -123,31 +122,25 @@ def test_client_generator_with_black_formatting():
try:
_plugin_widgets = get_all_plugin_widgets().as_dict()
plugin_client = get_plugin_client_module()
Widgets = _WidgetsEnumType("Widgets", {name: name for name in _plugin_widgets} | _Widgets)
if (_overlap := _Widgets.keys() & _plugin_widgets.keys()) != set():
for _widget in _overlap:
logger.warning(f"Detected duplicate widget {_widget} in plugin repo file: {inspect.getfile(_plugin_widgets[_widget])} !")
for plugin_name, plugin_class in inspect.getmembers(plugin_client, inspect.isclass):
if issubclass(plugin_class, RPCBase) and plugin_class is not RPCBase:
if plugin_name not in _Widgets:
_Widgets[plugin_name] = plugin_name
if plugin_name in globals():
conflicting_file = (
inspect.getfile(_plugin_widgets[plugin_name])
if plugin_name in _plugin_widgets
else f"{plugin_client}"
)
logger.warning(
f"Plugin widget {plugin_name} from {conflicting_file} conflicts with a built-in class!"
f"Plugin widget {plugin_name} in {plugin_class._IMPORT_MODULE} conflicts with a built-in class!"
)
continue
if plugin_name not in _overlap:
else:
globals()[plugin_name] = plugin_class
Widgets = _WidgetsEnumType("Widgets", _Widgets)
except ImportError as e:
logger.error(f"Failed loading plugins: \\n{reduce(add, traceback.format_exception(e))}")
class MockBECFigure(RPCBase):
_IMPORT_MODULE = "tests.unit_tests.test_generate_cli_client"
@rpc_call
def add_plot(self, plot_id: str):
"""
@@ -162,6 +155,8 @@ def test_client_generator_with_black_formatting():
class MockBECWaveform1D(RPCBase):
_IMPORT_MODULE = "tests.unit_tests.test_generate_cli_client"
@rpc_call
def set_frequency(self, frequency: float) -> list:
"""
+3 -1
View File
@@ -16,7 +16,9 @@ from bec_widgets.widgets.plots.heatmap.heatmap import (
)
# pytest: disable=unused-import
from .client_mocks import create_dummy_scan_item, mocked_client
from tests.unit_tests.client_mocks import mocked_client
from .client_mocks import create_dummy_scan_item
@pytest.fixture
+5 -10
View File
@@ -7,9 +7,8 @@ from qtpy.QtCore import QPointF, Qt
from bec_widgets.widgets.plots.image.image import Image
from bec_widgets.widgets.plots.image.setting_widgets.image_roi_tree import ROIPropertyTree
from bec_widgets.widgets.plots.roi.image_roi import CircularROI, RectangularROI
from .client_mocks import mocked_client
from .conftest import create_widget
from tests.unit_tests.client_mocks import mocked_client
from tests.unit_tests.conftest import create_widget
@pytest.fixture
@@ -45,7 +44,7 @@ def test_initialization(roi_tree, image_widget):
assert roi_tree.plot == image_widget.plot_item
assert roi_tree.controller == image_widget.roi_controller
assert isinstance(roi_tree.roi_items, dict)
assert len(roi_tree.tree.findItems("", Qt.MatchFlag.MatchContains)) == 0 # Empty tree initially
assert len(roi_tree.tree.findItems("", Qt.MatchContains)) == 0 # Empty tree initially
# Check toolbar actions
assert roi_tree.toolbar.components.get_action("roi_rectangle")
@@ -67,16 +66,12 @@ def test_controller_connection(roi_tree, image_widget):
# Verify that ROI was added to the tree
assert roi in roi_tree.roi_items
assert (
len(roi_tree.tree.findItems("test_roi", Qt.MatchFlag.MatchExactly, roi_tree.COL_ROI)) == 1
)
assert len(roi_tree.tree.findItems("test_roi", Qt.MatchExactly, roi_tree.COL_ROI)) == 1
# Remove ROI via controller and check that it's removed from the tree
image_widget.remove_roi(0)
assert roi not in roi_tree.roi_items
assert (
len(roi_tree.tree.findItems("test_roi", Qt.MatchFlag.MatchExactly, roi_tree.COL_ROI)) == 0
)
assert len(roi_tree.tree.findItems("test_roi", Qt.MatchExactly, roi_tree.COL_ROI)) == 0
def test_expand_collapse_tree(roi_tree, image_widget):
+2 -3
View File
@@ -12,9 +12,8 @@ from bec_widgets.widgets.plots.roi.image_roi import (
RectangularROI,
ROIController,
)
from .client_mocks import mocked_client
from .conftest import create_widget
from tests.unit_tests.client_mocks import mocked_client
from tests.unit_tests.conftest import create_widget
@pytest.fixture(params=["rect", "circle", "ellipse"])
+2 -3
View File
@@ -5,9 +5,8 @@ from bec_lib.endpoints import MessageEndpoints
from qtpy.QtCore import QPointF
from bec_widgets.widgets.plots.image.image import Image
from .client_mocks import mocked_client
from .conftest import create_widget
from tests.unit_tests.client_mocks import mocked_client
from tests.unit_tests.conftest import create_widget
##################################################
# Image widget base functionality tests
+106 -146
View File
@@ -7,163 +7,123 @@ from collections import deque
from unittest.mock import MagicMock, patch
import pytest
from bec_lib.logger import LogLevel
from bec_lib.messages import LogMessage
from bec_lib.redis_connector import StreamMessage
from qtpy.QtCore import QDateTime
from bec_widgets.widgets.utility.logpanel._util import (
log_time,
replace_escapes,
simple_color_format,
)
from bec_widgets.widgets.utility.logpanel.logpanel import DEFAULT_LOG_COLORS, LogPanel
from bec_widgets.widgets.utility.logpanel.logpanel import LogPanel, TimestampUpdate
from .client_mocks import mocked_client
TEST_TABLE_STRING = "2025-01-15 15:57:18 | bec_server.scan_server.scan_queue | [INFO] | \n \x1b[3m primary queue / ScanQueueStatus.RUNNING \x1b[0m\n┏━━━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━┳━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━┓\n\x1b[1m \x1b[0m\x1b[1m queue_id \x1b[0m\x1b[1m \x1b[0m┃\x1b[1m \x1b[0m\x1b[1mscan_id\x1b[0m\x1b[1m \x1b[0m┃\x1b[1m \x1b[0m\x1b[1mis_scan\x1b[0m\x1b[1m \x1b[0m┃\x1b[1m \x1b[0m\x1b[1mtype\x1b[0m\x1b[1m \x1b[0m┃\x1b[1m \x1b[0m\x1b[1mscan_numb…\x1b[0m\x1b[1m \x1b[0m┃\x1b[1m \x1b[0m\x1b[1mIQ status\x1b[0m\x1b[1m \x1b[0m┃\n┡━━━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━╇━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━┩\n│ bbe50c82-6… │ None │ False │ mv │ None │ PENDING │\n└─────────────┴─────────┴─────────┴──────┴────────────┴───────────┘\n\n"
TEST_LOG_MESSAGES = [
LogMessage(
metadata={},
log_type="debug",
log_msg={
"text": "datetime | debug | test log message",
"record": {"time": {"timestamp": 123456789.000}},
"service_name": "ScanServer",
},
),
LogMessage(
metadata={},
log_type="info",
log_msg={
"text": "datetime | info | test log message",
"record": {"time": {"timestamp": 123456789.007}},
"service_name": "ScanServer",
},
),
LogMessage(
metadata={},
log_type="success",
log_msg={
"text": "datetime | success | test log message",
"record": {"time": {"timestamp": 123456789.012}},
"service_name": "ScanServer",
},
),
]
TEST_COMBINED_PLAINTEXT = "datetime | debug | test log message\ndatetime | info | test log message\ndatetime | success | test log message\n"
@pytest.fixture
def raw_queue():
yield deque(TEST_LOG_MESSAGES, maxlen=100)
@pytest.fixture
def log_panel(qtbot, mocked_client: MagicMock):
widget = LogPanel(client=mocked_client, service_status=MagicMock())
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
def test_log_panel_init(log_panel: LogPanel):
assert log_panel.plain_text == ""
def test_table_string_processing():
assert "\x1b" in TEST_TABLE_STRING
sanitized = replace_escapes(TEST_TABLE_STRING)
assert "\x1b" not in sanitized
assert " " not in sanitized
assert "\n" not in sanitized
@pytest.mark.parametrize(
["msg", "color"], zip(TEST_LOG_MESSAGES, ["#0000CC", "#FFFFFF", "#00FF00"])
)
def test_color_format(msg: LogMessage, color: str):
assert color in simple_color_format(msg, DEFAULT_LOG_COLORS)
def test_logpanel_output(qtbot, log_panel: LogPanel):
log_panel._log_manager._data = deque(TEST_LOG_MESSAGES)
log_panel._on_redraw()
assert log_panel.plain_text == TEST_COMBINED_PLAINTEXT
def display_queue_empty():
print(log_panel._log_manager._display_queue)
return len(log_panel._log_manager._display_queue) == 0
next_text = "datetime | error | test log message"
msg = LogMessage(
metadata={},
log_type="error",
log_msg={"text": next_text, "record": {}, "service_name": "ScanServer"},
)
log_panel._log_manager._process_incoming_log_msg(
msg.content, msg.metadata, _override_slot_params={"verify_sender": False}
)
qtbot.waitUntil(display_queue_empty, timeout=5000)
assert log_panel.plain_text == TEST_COMBINED_PLAINTEXT + next_text + "\n"
def test_level_filter(log_panel: LogPanel):
log_panel._log_manager._data = deque(TEST_LOG_MESSAGES)
log_panel._log_manager.update_level_filter("INFO")
log_panel._on_redraw()
assert (
log_panel.plain_text
== "datetime | info | test log message\ndatetime | success | test log message\n"
)
def test_clear_button(log_panel: LogPanel):
log_panel._log_manager._data = deque(TEST_LOG_MESSAGES)
log_panel.toolbar.clear_button.click()
assert log_panel._log_manager._data == deque([])
def test_timestamp_filter(log_panel: LogPanel):
log_panel._log_manager._timestamp_start = QDateTime(1973, 11, 29, 21, 33, 9, 5, 1)
pytest.approx(log_panel._log_manager._timestamp_start.toMSecsSinceEpoch() / 1000, 123456789.005)
log_panel._log_manager._timestamp_end = QDateTime(1973, 11, 29, 21, 33, 9, 10, 1)
pytest.approx(log_panel._log_manager._timestamp_end.toMSecsSinceEpoch() / 1000, 123456789.010)
filter_ = log_panel._log_manager._create_timestamp_filter()
assert not filter_(TEST_LOG_MESSAGES[0])
assert filter_(TEST_LOG_MESSAGES[1])
assert not filter_(TEST_LOG_MESSAGES[2])
def test_error_handling_in_callback(log_panel: LogPanel):
log_panel._log_manager.new_message = MagicMock()
with patch("bec_widgets.widgets.utility.logpanel.logpanel.logger") as logger:
# generally errors should be logged
log_panel._log_manager.new_message.emit = MagicMock(
side_effect=ValueError("Something went wrong")
)
msg = LogMessage(
{"data": msg}
for msg in [
LogMessage(
metadata={},
log_type="debug",
log_msg={
"text": "datetime | debug | test log message",
"record": {"time": {"timestamp": 123456789.000}},
"record": {
"time": {"timestamp": 123456789.000, "repr": "2025-01-01 00:00:01"},
"message": "test debug message abcd",
"function": "_debug",
},
"service_name": "ScanServer",
},
),
LogMessage(
metadata={},
log_type="info",
log_msg={
"text": "datetime | info | test info log message",
"record": {
"time": {"timestamp": 123456789.007, "repr": "2025-01-01 00:00:02"},
"message": "test info message efgh",
"function": "_info",
},
"service_name": "DeviceServer",
},
),
LogMessage(
metadata={},
log_type="success",
log_msg={
"text": "datetime | success | test log message",
"record": {
"time": {"timestamp": 123456789.012, "repr": "2025-01-01 00:00:03"},
"message": "test success message ijkl",
"function": "_success",
},
"service_name": "ScanServer",
},
),
]
]
@pytest.fixture
def log_panel(qtbot, mocked_client):
mocked_client.connector.xread = lambda *_, **__: TEST_LOG_MESSAGES
widget = LogPanel()
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
widget._model.log_queue.cleanup()
widget.close()
widget.deleteLater()
qtbot.wait(100)
def test_log_panel_init(qtbot, log_panel: LogPanel):
assert log_panel
def test_log_panel_filters(qtbot, log_panel: LogPanel):
assert log_panel._proxy.rowCount() == 3
# Service filter
log_panel._update_service_filter({"DeviceServer"})
qtbot.waitUntil(lambda: log_panel._proxy.rowCount() == 1, timeout=200)
log_panel._update_service_filter(set())
qtbot.waitUntil(lambda: log_panel._proxy.rowCount() == 3, timeout=200)
# Text filter
log_panel._proxy.update_filter_text("efgh")
qtbot.waitUntil(lambda: log_panel._proxy.rowCount() == 1, timeout=200)
log_panel._proxy.update_filter_text("")
qtbot.waitUntil(lambda: log_panel._proxy.rowCount() == 3, timeout=200)
# Time filter
log_panel._proxy.update_timestamp(
TimestampUpdate(value=QDateTime.fromMSecsSinceEpoch(123456789004), update_type="start")
)
qtbot.waitUntil(lambda: log_panel._proxy.rowCount() == 2, timeout=200)
log_panel._proxy.update_timestamp(
TimestampUpdate(value=QDateTime.fromMSecsSinceEpoch(123456789009), update_type="end")
)
qtbot.waitUntil(lambda: log_panel._proxy.rowCount() == 1, timeout=200)
log_panel._proxy.update_timestamp(TimestampUpdate(value=None, update_type="start"))
log_panel._proxy.update_timestamp(TimestampUpdate(value=None, update_type="end"))
qtbot.waitUntil(lambda: log_panel._proxy.rowCount() == 3, timeout=200)
# Level filter
log_panel._proxy.update_level_filter(LogLevel.SUCCESS)
qtbot.waitUntil(lambda: log_panel._proxy.rowCount() == 1, timeout=200)
log_panel._proxy.update_level_filter(None)
qtbot.waitUntil(lambda: log_panel._proxy.rowCount() == 3, timeout=200)
def test_log_panel_update(qtbot, log_panel: LogPanel):
log_panel._model.log_queue._incoming.append(
LogMessage(
metadata={},
log_type="error",
log_msg={
"text": "datetime | error | test log message",
"record": {
"time": {"timestamp": 123456789.015, "repr": "2025-01-01 00:00:03"},
"message": "test error message xyz",
"function": "_error",
},
"service_name": "ScanServer",
},
)
log_panel._log_manager._process_incoming_log_msg(
msg.content, msg.metadata, _override_slot_params={"verify_sender": False}
)
logger.warning.assert_called_once()
# this specific error should be ignored and not relogged
log_panel._log_manager.new_message.emit = MagicMock(
side_effect=RuntimeError("Internal C++ object (BecLogsQueue) already deleted.")
)
log_panel._log_manager._process_incoming_log_msg(
msg.content, msg.metadata, _override_slot_params={"verify_sender": False}
)
logger.warning.assert_called_once()
)
log_panel._model.log_queue._proc_update()
qtbot.waitUntil(lambda: log_panel._model.rowCount() == 4, timeout=500)
+1 -3
View File
@@ -247,12 +247,10 @@ def test_bec_weblinks(monkeypatch):
monkeypatch.setattr(webbrowser, "open", fake_open)
BECWebLinksMixin.open_bec_docs()
BECWebLinksMixin.open_bec_widgets_docs()
BECWebLinksMixin.open_bec_bug_report()
assert opened_urls == [
"https://beamline-experiment-control.readthedocs.io/en/latest/",
"https://bec.readthedocs.io/projects/bec-widgets/en/latest/",
"https://bec-project.github.io/bec_docs/",
"https://github.com/bec-project/bec_widgets/issues",
]
+1 -1
View File
@@ -1,8 +1,8 @@
from qtpy.QtTest import QSignalSpy
from bec_widgets.widgets.plots.motor_map.motor_map import MotorMap
from tests.unit_tests.client_mocks import mocked_client
from .client_mocks import mocked_client
from .conftest import create_widget
@@ -1,8 +1,8 @@
import numpy as np
from bec_widgets.widgets.plots.multi_waveform.multi_waveform import MultiWaveform
from tests.unit_tests.client_mocks import mocked_client
from .client_mocks import mocked_client
from .conftest import create_widget
##################################################
+1 -13
View File
@@ -1,6 +1,4 @@
import sys
from bec_widgets.utils.plugin_utils import get_custom_class_references, get_custom_classes
from bec_widgets.utils.plugin_utils import get_custom_classes
def test_client_generator_classes():
@@ -12,13 +10,3 @@ def test_client_generator_classes():
assert "Waveform" in connector_cls_names
assert "MotorMap" in plugins
assert "NonExisting" not in plugins
def test_get_custom_class_references_avoids_importing_widget_modules():
target_module = "bec_widgets.widgets.plots.image.image"
sys.modules.pop(target_module, None)
references = get_custom_class_references("bec_widgets", use_cache=False)
assert "Image" in [reference.name for reference in references]
assert target_module not in sys.modules
@@ -3,8 +3,7 @@ import pytest
from bec_widgets.utils.settings_dialog import SettingsDialog
from bec_widgets.widgets.progress.ring_progress_bar.ring_progress_bar import RingProgressBar
from bec_widgets.widgets.progress.ring_progress_bar.ring_progress_settings_cards import RingSettings
from .client_mocks import mocked_client
from tests.unit_tests.client_mocks import mocked_client
@pytest.fixture
+1 -1
View File
@@ -1,4 +1,4 @@
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.utils.rpc_register import RPCRegister
class FakeObject:
+1 -1
View File
@@ -5,7 +5,7 @@ import pytest
from bec_lib.service_config import ServiceConfig
from qtpy.QtWidgets import QWidget
from bec_widgets.cli.server import GUIServer
from bec_widgets.applications.companion_app import GUIServer
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils.rpc_server import RegistryNotReadyError, RPCServer, SingleshotRPCRepeat
+17 -37
View File
@@ -1,9 +1,8 @@
import sys
from types import ModuleType
from unittest.mock import patch
from bec_widgets.cli.rpc.rpc_widget_handler import RPCWidgetHandler
from bec_widgets.utils.plugin_utils import BECClassReference
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.plugin_utils import BECClassContainer, BECClassInfo
from bec_widgets.utils.rpc_widget_handler import RPCWidgetHandler
def test_rpc_widget_handler():
@@ -13,38 +12,19 @@ def test_rpc_widget_handler():
assert "BECDockArea" in handler.widget_classes
def test_duplicate_plugins_not_allowed(monkeypatch):
builtin_module = ModuleType("builtin_test_widgets")
plugin_module = ModuleType("plugin_test_widgets")
class _TestPluginWidget(BECWidget): ...
SharedWidgetBuiltin = type("SharedWidget", (), {})
SharedWidgetBuiltin.__module__ = builtin_module.__name__
setattr(builtin_module, "SharedWidget", SharedWidgetBuiltin)
SharedWidgetPlugin = type("SharedWidget", (), {})
SharedWidgetPlugin.__module__ = plugin_module.__name__
setattr(plugin_module, "SharedWidget", SharedWidgetPlugin)
NewPluginWidget = type("NewPluginWidget", (), {})
NewPluginWidget.__module__ = plugin_module.__name__
setattr(plugin_module, "NewPluginWidget", NewPluginWidget)
monkeypatch.setitem(sys.modules, builtin_module.__name__, builtin_module)
monkeypatch.setitem(sys.modules, plugin_module.__name__, plugin_module)
with (
patch(
"bec_widgets.cli.rpc.rpc_widget_handler.get_all_plugin_widget_references",
return_value=[
BECClassReference(name="SharedWidget", module=plugin_module.__name__),
BECClassReference(name="NewPluginWidget", module=plugin_module.__name__),
],
),
patch(
"bec_widgets.cli.rpc.rpc_widget_handler.get_custom_class_references",
return_value=[BECClassReference(name="SharedWidget", module=builtin_module.__name__)],
),
):
handler = RPCWidgetHandler()
assert handler.widget_classes["SharedWidget"].__module__ == builtin_module.__name__
assert handler.widget_classes["NewPluginWidget"].__module__ == plugin_module.__name__
@patch(
"bec_widgets.utils.rpc_widget_handler.get_all_plugin_widgets",
return_value=BECClassContainer(
[
BECClassInfo(name="DeviceComboBox", obj=_TestPluginWidget, module="", file=""),
BECClassInfo(name="NewPluginWidget", obj=_TestPluginWidget, module="", file=""),
]
),
)
def test_duplicate_plugins_not_allowed(_):
handler = RPCWidgetHandler()
assert handler.widget_classes["DeviceComboBox"] is not _TestPluginWidget
assert handler.widget_classes["NewPluginWidget"] is _TestPluginWidget
+3 -1
View File
@@ -9,7 +9,7 @@ from qtpy.QtCore import QModelIndex, Qt
from bec_widgets.utils.forms_from_types.items import StrFormItem
from bec_widgets.utils.widget_io import WidgetIO
from bec_widgets.widgets.control.scan_control.scan_control import ScanControl
from bec_widgets.widgets.control.scan_control import ScanControl
from .client_mocks import mocked_client
@@ -501,6 +501,7 @@ def test_changing_scans_remember_parameters(scan_control, mocked_client):
assert grid_kwargs["burst_at_each_point"] == kwargs["burst_at_each_point"]
@pytest.mark.skip(reason="Unreliable - GH issue #1134")
def test_get_scan_parameters_from_redis(scan_control, mocked_client):
scan_name = "line_scan"
scan_control.comboBox_scan_selection.setCurrentText(scan_name)
@@ -585,6 +586,7 @@ def test_scan_metadata_is_passed_to_scan_function(scan_control: ScanControl):
scans.grid_scan.assert_called_once_with(metadata=TEST_MD)
@pytest.mark.skip(reason="Unreliable - GH issue #1134")
def test_restore_parameters_with_fewer_arg_bundles(scan_control, qtbot):
"""
Ensure that when more argument bundles are present than exist in the

Some files were not shown because too many files have changed in this diff Show More