mirror of
https://github.com/bec-project/bec_widgets.git
synced 2026-04-18 06:15:37 +02:00
Compare commits
3 Commits
ci/benchma
...
feature/sc
| Author | SHA1 | Date | |
|---|---|---|---|
| 8d2663c09b | |||
| bf5fc6460b | |||
| 50d5c0460f |
2
.github/actions/bw_install/action.yml
vendored
2
.github/actions/bw_install/action.yml
vendored
@@ -62,4 +62,4 @@ runs:
|
||||
uv pip install --system -e ./ophyd_devices
|
||||
uv pip install --system -e ./bec/bec_lib[dev]
|
||||
uv pip install --system -e ./bec/bec_ipython_client
|
||||
uv pip install --system -e ./bec_widgets[dev,qtermwidget]
|
||||
uv pip install --system -e ./bec_widgets[dev,pyside6]
|
||||
|
||||
166
.github/scripts/aggregate_benchmarks.py
vendored
166
.github/scripts/aggregate_benchmarks.py
vendored
@@ -1,166 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Aggregate and merge benchmark JSON files.
|
||||
|
||||
The workflow runs the same benchmark suite on multiple independent runners.
|
||||
This script reads every JSON file produced by those attempts, normalizes the
|
||||
contained benchmark values, and writes a compact mapping JSON where each value is
|
||||
the median across attempts. It can also merge independent hyperfine JSON files
|
||||
from one runner into a single hyperfine-style JSON file.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import statistics
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from compare_benchmarks import Benchmark, extract_benchmarks
|
||||
|
||||
|
||||
def collect_benchmarks(paths: list[Path]) -> dict[str, list[Benchmark]]:
|
||||
"""Collect benchmarks from multiple JSON files.
|
||||
|
||||
Args:
|
||||
paths (list[Path]): Paths to hyperfine, pytest-benchmark, or compact
|
||||
mapping JSON files.
|
||||
|
||||
Returns:
|
||||
dict[str, list[Benchmark]]: Benchmarks grouped by benchmark name.
|
||||
"""
|
||||
|
||||
collected: dict[str, list[Benchmark]] = {}
|
||||
for path in paths:
|
||||
for name, benchmark in extract_benchmarks(path).items():
|
||||
collected.setdefault(name, []).append(benchmark)
|
||||
return collected
|
||||
|
||||
|
||||
def aggregate(collected: dict[str, list[Benchmark]]) -> dict[str, dict[str, object]]:
|
||||
"""Aggregate grouped benchmarks using the median value.
|
||||
|
||||
Args:
|
||||
collected (dict[str, list[Benchmark]]): Benchmarks grouped by benchmark
|
||||
name.
|
||||
|
||||
Returns:
|
||||
dict[str, dict[str, object]]: Compact mapping JSON data. Each benchmark
|
||||
contains ``value``, ``unit``, ``metric``, ``attempts``, and
|
||||
``attempt_values``.
|
||||
"""
|
||||
|
||||
aggregated: dict[str, dict[str, object]] = {}
|
||||
for name, benchmarks in sorted(collected.items()):
|
||||
values = [benchmark.value for benchmark in benchmarks]
|
||||
unit = next((benchmark.unit for benchmark in benchmarks if benchmark.unit), "")
|
||||
metric = next((benchmark.metric for benchmark in benchmarks if benchmark.metric), "value")
|
||||
aggregated[name] = {
|
||||
"value": statistics.median(values),
|
||||
"unit": unit,
|
||||
"metric": f"median-of-attempt-{metric}",
|
||||
"attempts": len(values),
|
||||
"attempt_values": values,
|
||||
}
|
||||
return aggregated
|
||||
|
||||
|
||||
def merge_hyperfine_results(paths: list[Path]) -> dict[str, Any]:
|
||||
"""Merge hyperfine result files.
|
||||
|
||||
Args:
|
||||
paths (list[Path]): Hyperfine JSON files to merge.
|
||||
|
||||
Returns:
|
||||
dict[str, Any]: Hyperfine-style JSON object containing all result rows.
|
||||
|
||||
Raises:
|
||||
ValueError: If any file has no hyperfine ``results`` list.
|
||||
"""
|
||||
|
||||
merged: dict[str, Any] = {"results": []}
|
||||
for path in paths:
|
||||
data = json.loads(path.read_text(encoding="utf-8"))
|
||||
results = data.get("results", []) if isinstance(data, dict) else None
|
||||
if not isinstance(results, list):
|
||||
raise ValueError(f"{path} has no hyperfine results list")
|
||||
merged["results"].extend(results)
|
||||
return merged
|
||||
|
||||
|
||||
def main_from_paths(input_dir: Path, output: Path) -> int:
|
||||
"""Aggregate all JSON files in a directory and write the result.
|
||||
|
||||
Args:
|
||||
input_dir (Path): Directory containing benchmark JSON files.
|
||||
output (Path): Path where the aggregate JSON should be written.
|
||||
|
||||
Returns:
|
||||
int: Always ``0`` on success.
|
||||
|
||||
Raises:
|
||||
ValueError: If no JSON files are found in ``input_dir``.
|
||||
"""
|
||||
|
||||
paths = sorted(input_dir.rglob("*.json"))
|
||||
if not paths:
|
||||
raise ValueError(f"No benchmark JSON files found in {input_dir}")
|
||||
|
||||
output.parent.mkdir(parents=True, exist_ok=True)
|
||||
output.write_text(
|
||||
json.dumps(aggregate(collect_benchmarks(paths)), indent=2, sort_keys=True) + "\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
return 0
|
||||
|
||||
|
||||
def merge_from_paths(input_dir: Path, output: Path) -> int:
|
||||
"""Merge all hyperfine JSON files in a directory and write the result.
|
||||
|
||||
Args:
|
||||
input_dir (Path): Directory containing hyperfine JSON files.
|
||||
output (Path): Path where the merged JSON should be written.
|
||||
|
||||
Returns:
|
||||
int: Always ``0`` on success.
|
||||
|
||||
Raises:
|
||||
ValueError: If no JSON files are found in ``input_dir``.
|
||||
"""
|
||||
|
||||
paths = sorted(input_dir.glob("*.json"))
|
||||
if not paths:
|
||||
raise ValueError(f"No hyperfine JSON files found in {input_dir}")
|
||||
|
||||
output.parent.mkdir(parents=True, exist_ok=True)
|
||||
output.write_text(
|
||||
json.dumps(merge_hyperfine_results(paths), indent=2, sort_keys=True) + "\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
return 0
|
||||
|
||||
|
||||
def main() -> int:
|
||||
"""Run the benchmark aggregation command line interface.
|
||||
|
||||
Returns:
|
||||
int: Always ``0`` on success.
|
||||
"""
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument(
|
||||
"--mode",
|
||||
choices=("aggregate", "merge-hyperfine"),
|
||||
default="aggregate",
|
||||
help="Operation to perform.",
|
||||
)
|
||||
parser.add_argument("--input-dir", required=True, type=Path)
|
||||
parser.add_argument("--output", required=True, type=Path)
|
||||
args = parser.parse_args()
|
||||
if args.mode == "merge-hyperfine":
|
||||
return merge_from_paths(input_dir=args.input_dir, output=args.output)
|
||||
return main_from_paths(input_dir=args.input_dir, output=args.output)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
411
.github/scripts/compare_benchmarks.py
vendored
411
.github/scripts/compare_benchmarks.py
vendored
@@ -1,411 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Compare benchmark JSON files and write a GitHub Actions summary.
|
||||
|
||||
The script supports JSON emitted by hyperfine, JSON emitted by pytest-benchmark,
|
||||
and a compact mapping format generated by ``aggregate_benchmarks.py``. Timing
|
||||
formats prefer median values and fall back to mean values when median values are
|
||||
not present.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import math
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Benchmark:
|
||||
"""Normalized benchmark result.
|
||||
|
||||
Attributes:
|
||||
name (str): Stable benchmark name used to match baseline and current results.
|
||||
value (float): Numeric benchmark value used for comparison.
|
||||
unit (str): Display unit for the value, for example ``"s"``.
|
||||
metric (str): Source metric name, for example ``"median"`` or ``"mean"``.
|
||||
"""
|
||||
|
||||
name: str
|
||||
value: float
|
||||
unit: str
|
||||
metric: str = "value"
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Comparison:
|
||||
"""Comparison between one baseline benchmark and one current benchmark.
|
||||
|
||||
Attributes:
|
||||
name (str): Benchmark name.
|
||||
baseline (float): Baseline benchmark value.
|
||||
current (float): Current benchmark value.
|
||||
delta_percent (float): Percent change from baseline to current.
|
||||
unit (str): Display unit for both values.
|
||||
metric (str): Current result metric used for comparison.
|
||||
regressed (bool): Whether the change exceeds the configured threshold.
|
||||
"""
|
||||
|
||||
name: str
|
||||
baseline: float
|
||||
current: float
|
||||
delta_percent: float
|
||||
unit: str
|
||||
metric: str
|
||||
regressed: bool
|
||||
|
||||
|
||||
def _read_json(path: Path) -> Any:
|
||||
"""Read JSON data from a file.
|
||||
|
||||
Args:
|
||||
path (Path): Path to the JSON file.
|
||||
|
||||
Returns:
|
||||
Any: Parsed JSON value.
|
||||
"""
|
||||
|
||||
with path.open("r", encoding="utf-8") as stream:
|
||||
return json.load(stream)
|
||||
|
||||
|
||||
def _as_float(value: Any) -> float | None:
|
||||
"""Convert a value to a finite float.
|
||||
|
||||
Args:
|
||||
value (Any): Value to convert.
|
||||
|
||||
Returns:
|
||||
float | None: Converted finite float, or ``None`` if conversion fails.
|
||||
"""
|
||||
|
||||
try:
|
||||
result = float(value)
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
if math.isfinite(result):
|
||||
return result
|
||||
return None
|
||||
|
||||
|
||||
def _extract_hyperfine(data: dict[str, Any]) -> dict[str, Benchmark]:
|
||||
"""Extract normalized benchmarks from hyperfine JSON.
|
||||
|
||||
Args:
|
||||
data (dict[str, Any]): Parsed hyperfine JSON object.
|
||||
|
||||
Returns:
|
||||
dict[str, Benchmark]: Benchmarks keyed by command name.
|
||||
"""
|
||||
|
||||
benchmarks: dict[str, Benchmark] = {}
|
||||
for result in data.get("results", []):
|
||||
if not isinstance(result, dict):
|
||||
continue
|
||||
name = str(result.get("command") or result.get("name") or "").strip()
|
||||
metric = "median"
|
||||
value = _as_float(result.get(metric))
|
||||
if value is None:
|
||||
metric = "mean"
|
||||
value = _as_float(result.get(metric))
|
||||
if name and value is not None:
|
||||
benchmarks[name] = Benchmark(name=name, value=value, unit="s", metric=metric)
|
||||
return benchmarks
|
||||
|
||||
|
||||
def _extract_pytest_benchmark(data: dict[str, Any]) -> dict[str, Benchmark]:
|
||||
"""Extract normalized benchmarks from pytest-benchmark JSON.
|
||||
|
||||
Args:
|
||||
data (dict[str, Any]): Parsed pytest-benchmark JSON object.
|
||||
|
||||
Returns:
|
||||
dict[str, Benchmark]: Benchmarks keyed by full benchmark name.
|
||||
"""
|
||||
|
||||
benchmarks: dict[str, Benchmark] = {}
|
||||
for benchmark in data.get("benchmarks", []):
|
||||
if not isinstance(benchmark, dict):
|
||||
continue
|
||||
|
||||
name = str(benchmark.get("fullname") or benchmark.get("name") or "").strip()
|
||||
stats = benchmark.get("stats", {})
|
||||
value = None
|
||||
metric = "median"
|
||||
if isinstance(stats, dict):
|
||||
value = _as_float(stats.get(metric))
|
||||
if value is None:
|
||||
metric = "mean"
|
||||
value = _as_float(stats.get(metric))
|
||||
if name and value is not None:
|
||||
benchmarks[name] = Benchmark(name=name, value=value, unit="s", metric=metric)
|
||||
return benchmarks
|
||||
|
||||
|
||||
def _extract_simple_mapping(data: dict[str, Any]) -> dict[str, Benchmark]:
|
||||
"""Extract normalized benchmarks from a compact mapping JSON object.
|
||||
|
||||
Args:
|
||||
data (dict[str, Any]): Parsed mapping where each benchmark is either a
|
||||
raw number or an object containing ``value``, ``unit``, and ``metric``.
|
||||
|
||||
Returns:
|
||||
dict[str, Benchmark]: Benchmarks keyed by mapping key.
|
||||
"""
|
||||
|
||||
benchmarks: dict[str, Benchmark] = {}
|
||||
|
||||
for name, raw_value in data.items():
|
||||
if name in {"version", "context", "commit", "timestamp"}:
|
||||
continue
|
||||
|
||||
value = _as_float(raw_value)
|
||||
unit = ""
|
||||
metric = "value"
|
||||
if value is None and isinstance(raw_value, dict):
|
||||
value = _as_float(raw_value.get("value"))
|
||||
unit = str(raw_value.get("unit") or "")
|
||||
metric = str(raw_value.get("metric") or "value")
|
||||
|
||||
if value is not None:
|
||||
benchmarks[str(name)] = Benchmark(name=str(name), value=value, unit=unit, metric=metric)
|
||||
|
||||
return benchmarks
|
||||
|
||||
|
||||
def extract_benchmarks(path: Path) -> dict[str, Benchmark]:
|
||||
"""Extract normalized benchmarks from a supported JSON file.
|
||||
|
||||
Args:
|
||||
path (Path): Path to a hyperfine, pytest-benchmark, or compact mapping
|
||||
JSON file.
|
||||
|
||||
Returns:
|
||||
dict[str, Benchmark]: Normalized benchmarks keyed by name.
|
||||
|
||||
Raises:
|
||||
ValueError: If the JSON root is not an object or no supported benchmark
|
||||
entries can be extracted.
|
||||
"""
|
||||
|
||||
data = _read_json(path)
|
||||
if not isinstance(data, dict):
|
||||
raise ValueError(f"{path} must contain a JSON object")
|
||||
|
||||
extractors = (_extract_hyperfine, _extract_pytest_benchmark, _extract_simple_mapping)
|
||||
for extractor in extractors:
|
||||
benchmarks = extractor(data)
|
||||
if benchmarks:
|
||||
return benchmarks
|
||||
|
||||
raise ValueError(f"No supported benchmark entries found in {path}")
|
||||
|
||||
|
||||
def compare_benchmarks(
|
||||
baseline: dict[str, Benchmark],
|
||||
current: dict[str, Benchmark],
|
||||
threshold_percent: float,
|
||||
higher_is_better: bool,
|
||||
) -> tuple[list[Comparison], list[str], list[str]]:
|
||||
"""Compare baseline benchmarks with current benchmarks.
|
||||
|
||||
Args:
|
||||
baseline (dict[str, Benchmark]): Baseline benchmarks keyed by name.
|
||||
current (dict[str, Benchmark]): Current benchmarks keyed by name.
|
||||
threshold_percent (float): Regression threshold in percent.
|
||||
higher_is_better (bool): If ``True``, lower current values are treated as
|
||||
regressions. If ``False``, higher current values are treated as
|
||||
regressions.
|
||||
|
||||
Returns:
|
||||
tuple[list[Comparison], list[str], list[str]]: Comparisons for common
|
||||
benchmark names, names missing from current results, and names newly
|
||||
present in current results.
|
||||
"""
|
||||
|
||||
comparisons: list[Comparison] = []
|
||||
missing_in_current: list[str] = []
|
||||
new_in_current: list[str] = []
|
||||
|
||||
for name, baseline_benchmark in sorted(baseline.items()):
|
||||
current_benchmark = current.get(name)
|
||||
if current_benchmark is None:
|
||||
missing_in_current.append(name)
|
||||
continue
|
||||
|
||||
if baseline_benchmark.value == 0:
|
||||
delta_percent = 0.0
|
||||
else:
|
||||
delta_percent = (
|
||||
(current_benchmark.value - baseline_benchmark.value)
|
||||
/ abs(baseline_benchmark.value)
|
||||
* 100
|
||||
)
|
||||
|
||||
if higher_is_better:
|
||||
regressed = delta_percent <= -threshold_percent
|
||||
else:
|
||||
regressed = delta_percent >= threshold_percent
|
||||
|
||||
comparisons.append(
|
||||
Comparison(
|
||||
name=name,
|
||||
baseline=baseline_benchmark.value,
|
||||
current=current_benchmark.value,
|
||||
delta_percent=delta_percent,
|
||||
unit=current_benchmark.unit or baseline_benchmark.unit,
|
||||
metric=current_benchmark.metric,
|
||||
regressed=regressed,
|
||||
)
|
||||
)
|
||||
|
||||
for name in sorted(set(current) - set(baseline)):
|
||||
new_in_current.append(name)
|
||||
|
||||
return comparisons, missing_in_current, new_in_current
|
||||
|
||||
|
||||
def _format_value(value: float, unit: str) -> str:
|
||||
"""Format a benchmark value for Markdown output.
|
||||
|
||||
Args:
|
||||
value (float): Numeric benchmark value.
|
||||
unit (str): Display unit.
|
||||
|
||||
Returns:
|
||||
str: Formatted value with optional unit suffix.
|
||||
"""
|
||||
|
||||
suffix = f" {unit}" if unit else ""
|
||||
return f"{value:.6g}{suffix}"
|
||||
|
||||
|
||||
def write_summary(
|
||||
path: Path,
|
||||
comparisons: list[Comparison],
|
||||
missing_in_current: list[str],
|
||||
new_in_current: list[str],
|
||||
threshold_percent: float,
|
||||
higher_is_better: bool,
|
||||
) -> None:
|
||||
"""Write a Markdown benchmark comparison summary.
|
||||
|
||||
Args:
|
||||
path (Path): Path where the summary should be written.
|
||||
comparisons (list[Comparison]): Comparison rows for matching benchmarks.
|
||||
missing_in_current (list[str]): Baseline benchmark names missing from the
|
||||
current result.
|
||||
new_in_current (list[str]): Current benchmark names not present in the
|
||||
baseline result.
|
||||
threshold_percent (float): Regression threshold in percent.
|
||||
higher_is_better (bool): Whether higher benchmark values are considered
|
||||
better.
|
||||
"""
|
||||
|
||||
regressions = [comparison for comparison in comparisons if comparison.regressed]
|
||||
direction = "higher is better" if higher_is_better else "lower is better"
|
||||
sorted_comparisons = sorted(comparisons, key=lambda comparison: comparison.name)
|
||||
|
||||
lines = [
|
||||
"<!-- bw-benchmark-comment -->",
|
||||
"## Benchmark comparison",
|
||||
"",
|
||||
f"Threshold: {threshold_percent:g}% ({direction}).",
|
||||
]
|
||||
lines.append("")
|
||||
|
||||
if regressions:
|
||||
lines.extend(
|
||||
[
|
||||
f"{len(regressions)} benchmark(s) regressed beyond the configured threshold.",
|
||||
"",
|
||||
"| Benchmark | Baseline | Current | Change |",
|
||||
"| --- | ---: | ---: | ---: |",
|
||||
]
|
||||
)
|
||||
for comparison in regressions:
|
||||
lines.append(
|
||||
"| "
|
||||
f"{comparison.name} | "
|
||||
f"{_format_value(comparison.baseline, comparison.unit)} | "
|
||||
f"{_format_value(comparison.current, comparison.unit)} | "
|
||||
f"{comparison.delta_percent:+.2f}% |"
|
||||
)
|
||||
else:
|
||||
lines.append("No benchmark regression exceeded the configured threshold.")
|
||||
|
||||
if sorted_comparisons:
|
||||
lines.extend(
|
||||
[
|
||||
"",
|
||||
"<details>",
|
||||
"<summary>All benchmark results</summary>",
|
||||
"",
|
||||
"| Benchmark | Baseline | Current | Change | Status |",
|
||||
"| --- | ---: | ---: | ---: | --- |",
|
||||
]
|
||||
)
|
||||
for comparison in sorted_comparisons:
|
||||
status = "regressed" if comparison.regressed else "ok"
|
||||
lines.append(
|
||||
"| "
|
||||
f"{comparison.name} | "
|
||||
f"{_format_value(comparison.baseline, comparison.unit)} | "
|
||||
f"{_format_value(comparison.current, comparison.unit)} | "
|
||||
f"{comparison.delta_percent:+.2f}% | "
|
||||
f"{status} |"
|
||||
)
|
||||
lines.extend(["", "</details>"])
|
||||
|
||||
if missing_in_current:
|
||||
lines.extend(["", "Missing benchmarks in the current run:"])
|
||||
lines.extend(f"- `{name}`" for name in missing_in_current)
|
||||
|
||||
if new_in_current:
|
||||
lines.extend(["", "New benchmarks in the current run:"])
|
||||
lines.extend(f"- `{name}`" for name in new_in_current)
|
||||
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_text("\n".join(lines) + "\n", encoding="utf-8")
|
||||
|
||||
|
||||
def main() -> int:
|
||||
"""Run the benchmark comparison command line interface.
|
||||
|
||||
Returns:
|
||||
int: ``1`` when a regression exceeds the threshold, otherwise ``0``.
|
||||
"""
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--baseline", required=True, type=Path)
|
||||
parser.add_argument("--current", required=True, type=Path)
|
||||
parser.add_argument("--summary", required=True, type=Path)
|
||||
parser.add_argument("--threshold-percent", required=True, type=float)
|
||||
parser.add_argument("--higher-is-better", action="store_true")
|
||||
args = parser.parse_args()
|
||||
|
||||
baseline = extract_benchmarks(args.baseline)
|
||||
current = extract_benchmarks(args.current)
|
||||
comparisons, missing_in_current, new_in_current = compare_benchmarks(
|
||||
baseline=baseline,
|
||||
current=current,
|
||||
threshold_percent=args.threshold_percent,
|
||||
higher_is_better=args.higher_is_better,
|
||||
)
|
||||
|
||||
write_summary(
|
||||
path=args.summary,
|
||||
comparisons=comparisons,
|
||||
missing_in_current=missing_in_current,
|
||||
new_in_current=new_in_current,
|
||||
threshold_percent=args.threshold_percent,
|
||||
higher_is_better=args.higher_is_better,
|
||||
)
|
||||
|
||||
return 1 if any(comparison.regressed for comparison in comparisons) else 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
69
.github/scripts/run_benchmarks.sh
vendored
69
.github/scripts/run_benchmarks.sh
vendored
@@ -1,69 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
set -euo pipefail
|
||||
|
||||
mkdir -p benchmark-results
|
||||
benchmark_json="${BENCHMARK_JSON:-benchmark-results/current.json}"
|
||||
benchmark_root="$(dirname "$benchmark_json")"
|
||||
hyperfine_benchmark_dir="${BENCHMARK_HYPERFINE_DIR:-tests/benchmarks/hyperfine}"
|
||||
pytest_benchmark_dirs="${BENCHMARK_PYTEST_DIRS:-${BENCHMARK_PYTEST_DIR:-}}"
|
||||
benchmark_work_dir="$benchmark_root/raw-results"
|
||||
hyperfine_json_dir="$benchmark_work_dir/hyperfine"
|
||||
pytest_json="$benchmark_work_dir/pytest.json"
|
||||
|
||||
shopt -s nullglob
|
||||
benchmark_scripts=()
|
||||
benchmark_scripts=("$hyperfine_benchmark_dir"/benchmark_*.sh)
|
||||
shopt -u nullglob
|
||||
|
||||
pytest_dirs=()
|
||||
for pytest_benchmark_dir in $pytest_benchmark_dirs; do
|
||||
if [ -d "$pytest_benchmark_dir" ]; then
|
||||
pytest_dirs+=("$pytest_benchmark_dir")
|
||||
else
|
||||
echo "Pytest benchmark directory not found: $pytest_benchmark_dir" >&2
|
||||
exit 1
|
||||
fi
|
||||
done
|
||||
|
||||
if [ "${#benchmark_scripts[@]}" -eq 0 ] && [ "${#pytest_dirs[@]}" -eq 0 ]; then
|
||||
echo "No benchmark scripts or pytest benchmarks found" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
echo "Benchmark Python: $(command -v python)"
|
||||
python -c 'import sys; print(sys.version)'
|
||||
|
||||
rm -rf "$benchmark_work_dir"
|
||||
mkdir -p "$hyperfine_json_dir"
|
||||
|
||||
if [ "${#benchmark_scripts[@]}" -gt 0 ]; then
|
||||
for benchmark_script in "${benchmark_scripts[@]}"; do
|
||||
title="$(sed -n 's/^# BENCHMARK_TITLE:[[:space:]]*//p' "$benchmark_script" | head -n 1)"
|
||||
if [ -z "$title" ]; then
|
||||
title="$(basename "$benchmark_script" .sh)"
|
||||
fi
|
||||
benchmark_name="$(basename "$benchmark_script" .sh)"
|
||||
benchmark_result_json="$hyperfine_json_dir/$benchmark_name.json"
|
||||
echo "Preflight benchmark script: $benchmark_script"
|
||||
bash "$benchmark_script"
|
||||
|
||||
hyperfine \
|
||||
--show-output \
|
||||
--warmup 1 \
|
||||
--runs 5 \
|
||||
--command-name "$title" \
|
||||
--export-json "$benchmark_result_json" \
|
||||
"bash $(printf "%q" "$benchmark_script")"
|
||||
done
|
||||
fi
|
||||
|
||||
if [ "${#pytest_dirs[@]}" -gt 0 ]; then
|
||||
pytest \
|
||||
-q "${pytest_dirs[@]}" \
|
||||
--benchmark-only \
|
||||
--benchmark-json "$pytest_json"
|
||||
fi
|
||||
|
||||
python .github/scripts/aggregate_benchmarks.py \
|
||||
--input-dir "$benchmark_work_dir" \
|
||||
--output "$benchmark_json"
|
||||
122
.github/scripts/run_with_bec_servers.py
vendored
122
.github/scripts/run_with_bec_servers.py
vendored
@@ -1,122 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Run a command with BEC e2e services available."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import tempfile
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import bec_lib
|
||||
from bec_ipython_client import BECIPythonClient
|
||||
from bec_lib.redis_connector import RedisConnector
|
||||
from bec_lib.service_config import ServiceConfig, ServiceConfigModel
|
||||
from redis import Redis
|
||||
|
||||
|
||||
def _wait_for_redis(host: str, port: int) -> None:
|
||||
client = Redis(host=host, port=port)
|
||||
deadline = time.monotonic() + 10
|
||||
while time.monotonic() < deadline:
|
||||
try:
|
||||
if client.ping():
|
||||
return
|
||||
except Exception:
|
||||
time.sleep(0.1)
|
||||
raise RuntimeError(f"Redis did not start on {host}:{port}")
|
||||
|
||||
|
||||
def _start_redis(files_path: Path, host: str, port: int) -> subprocess.Popen:
|
||||
redis_server = shutil.which("redis-server")
|
||||
if redis_server is None:
|
||||
raise RuntimeError("redis-server executable not found")
|
||||
|
||||
return subprocess.Popen(
|
||||
[
|
||||
redis_server,
|
||||
"--bind",
|
||||
host,
|
||||
"--port",
|
||||
str(port),
|
||||
"--save",
|
||||
"",
|
||||
"--appendonly",
|
||||
"no",
|
||||
"--dir",
|
||||
str(files_path),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def _write_configs(files_path: Path, host: str, port: int) -> Path:
|
||||
test_config = files_path / "test_config.yaml"
|
||||
services_config = files_path / "services_config.yaml"
|
||||
|
||||
bec_lib_path = Path(bec_lib.__file__).resolve().parent
|
||||
shutil.copyfile(bec_lib_path / "tests" / "test_config.yaml", test_config)
|
||||
|
||||
service_config = ServiceConfigModel(
|
||||
redis={"host": host, "port": port}, file_writer={"base_path": str(files_path)}
|
||||
)
|
||||
services_config.write_text(service_config.model_dump_json(indent=4), encoding="utf-8")
|
||||
return services_config
|
||||
|
||||
|
||||
def _load_demo_config(services_config: Path) -> None:
|
||||
bec = BECIPythonClient(ServiceConfig(services_config), RedisConnector, forced=True)
|
||||
bec.start()
|
||||
try:
|
||||
bec.config.load_demo_config()
|
||||
finally:
|
||||
bec.shutdown()
|
||||
bec._client._reset_singleton()
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("command", nargs=argparse.REMAINDER)
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.command[:1] == ["--"]:
|
||||
args.command = args.command[1:]
|
||||
if not args.command:
|
||||
raise ValueError("No command provided")
|
||||
|
||||
host = "127.0.0.1"
|
||||
port = 6379
|
||||
|
||||
with tempfile.TemporaryDirectory(prefix="bec-benchmark-") as tmp:
|
||||
files_path = Path(tmp)
|
||||
services_config = _write_configs(files_path, host, port)
|
||||
redis_process = _start_redis(files_path, host, port)
|
||||
processes = None
|
||||
service_handler = None
|
||||
try:
|
||||
_wait_for_redis(host, port)
|
||||
|
||||
from bec_server.bec_server_utils.service_handler import ServiceHandler
|
||||
|
||||
service_handler = ServiceHandler(
|
||||
bec_path=files_path, config_path=services_config, interface="subprocess"
|
||||
)
|
||||
processes = service_handler.start()
|
||||
_load_demo_config(services_config)
|
||||
|
||||
env = os.environ.copy()
|
||||
return subprocess.run(args.command, env=env, check=False).returncode
|
||||
finally:
|
||||
if service_handler is not None and processes is not None:
|
||||
service_handler.stop(processes)
|
||||
redis_process.terminate()
|
||||
try:
|
||||
redis_process.wait(timeout=10)
|
||||
except subprocess.TimeoutExpired:
|
||||
redis_process.kill()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
239
.github/workflows/benchmark.yml
vendored
239
.github/workflows/benchmark.yml
vendored
@@ -1,239 +0,0 @@
|
||||
name: BW Benchmarks
|
||||
|
||||
on: [workflow_call]
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
|
||||
env:
|
||||
BENCHMARK_JSON: benchmark-results/current.json
|
||||
BENCHMARK_BASELINE_JSON: gh-pages-benchmark-data/benchmarks/latest.json
|
||||
BENCHMARK_SUMMARY: benchmark-results/summary.md
|
||||
BENCHMARK_COMMAND: "bash .github/scripts/run_benchmarks.sh"
|
||||
BENCHMARK_THRESHOLD_PERCENT: 10
|
||||
BENCHMARK_HIGHER_IS_BETTER: false
|
||||
|
||||
jobs:
|
||||
benchmark_attempt:
|
||||
runs-on: ubuntu-latest
|
||||
continue-on-error: true
|
||||
permissions:
|
||||
contents: read
|
||||
defaults:
|
||||
run:
|
||||
shell: bash -el {0}
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
attempt: [1, 2, 3]
|
||||
|
||||
env:
|
||||
BENCHMARK_JSON: benchmark-results/current-${{ matrix.attempt }}.json
|
||||
BEC_CORE_BRANCH: main
|
||||
OPHYD_DEVICES_BRANCH: main
|
||||
PLUGIN_REPO_BRANCH: main
|
||||
BENCHMARK_PYTEST_DIRS: tests/unit_tests/benchmarks
|
||||
QTWEBENGINE_DISABLE_SANDBOX: 1
|
||||
QT_QPA_PLATFORM: "offscreen"
|
||||
|
||||
steps:
|
||||
- name: Checkout BEC Widgets
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
repository: bec-project/bec_widgets
|
||||
ref: ${{ github.event.pull_request.head.sha || github.sha }}
|
||||
|
||||
- name: Set up Conda
|
||||
uses: conda-incubator/setup-miniconda@v3
|
||||
with:
|
||||
auto-update-conda: true
|
||||
auto-activate-base: true
|
||||
python-version: "3.11"
|
||||
|
||||
- name: Install system dependencies
|
||||
run: |
|
||||
sudo apt-get update
|
||||
sudo apt-get install -y libgl1 libegl1 x11-utils libxkbcommon-x11-0 libdbus-1-3 xvfb
|
||||
sudo apt-get -y install libnss3 libxdamage1 libasound2t64 libatomic1 libxcursor1
|
||||
sudo apt-get -y install ttyd hyperfine redis-server
|
||||
|
||||
- name: Install full e2e environment
|
||||
run: |
|
||||
echo -e "\033[35;1m Using branch $BEC_CORE_BRANCH of BEC CORE \033[0;m";
|
||||
git clone --branch "$BEC_CORE_BRANCH" https://github.com/bec-project/bec.git
|
||||
echo -e "\033[35;1m Using branch $OPHYD_DEVICES_BRANCH of OPHYD_DEVICES \033[0;m";
|
||||
git clone --branch "$OPHYD_DEVICES_BRANCH" https://github.com/bec-project/ophyd_devices.git
|
||||
export OHPYD_DEVICES_PATH=$PWD/ophyd_devices
|
||||
echo -e "\033[35;1m Using branch $PLUGIN_REPO_BRANCH of bec_testing_plugin \033[0;m";
|
||||
git clone --branch "$PLUGIN_REPO_BRANCH" https://github.com/bec-project/bec_testing_plugin.git
|
||||
cd ./bec
|
||||
conda create -q -n test-environment python=3.11
|
||||
conda activate test-environment
|
||||
source ./bin/install_bec_dev.sh -t
|
||||
cd ../
|
||||
python -m pip install -e ./ophyd_devices -e .[dev,pyside6] -e ./bec_testing_plugin pytest-benchmark
|
||||
|
||||
mkdir -p "$(dirname "$BENCHMARK_JSON")"
|
||||
python .github/scripts/run_with_bec_servers.py -- bash -lc "$BENCHMARK_COMMAND"
|
||||
test -s "$BENCHMARK_JSON"
|
||||
|
||||
- name: Upload benchmark artifact
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: bw-benchmark-json-${{ matrix.attempt }}
|
||||
path: ${{ env.BENCHMARK_JSON }}
|
||||
|
||||
benchmark:
|
||||
needs: [benchmark_attempt]
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: read
|
||||
issues: write
|
||||
pull-requests: write
|
||||
|
||||
steps:
|
||||
- name: Checkout BEC Widgets
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
repository: bec-project/bec_widgets
|
||||
ref: ${{ github.event.pull_request.head.sha || github.sha }}
|
||||
|
||||
- name: Download benchmark attempts
|
||||
uses: actions/download-artifact@v4
|
||||
with:
|
||||
pattern: bw-benchmark-json-*
|
||||
path: benchmark-results/attempts
|
||||
merge-multiple: true
|
||||
|
||||
- name: Aggregate benchmark attempts
|
||||
run: |
|
||||
python .github/scripts/aggregate_benchmarks.py \
|
||||
--input-dir benchmark-results/attempts \
|
||||
--output "$BENCHMARK_JSON"
|
||||
|
||||
- name: Upload aggregate benchmark artifact
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: bw-benchmark-json
|
||||
path: ${{ env.BENCHMARK_JSON }}
|
||||
|
||||
- name: Fetch gh-pages benchmark data
|
||||
run: |
|
||||
if git ls-remote --exit-code --heads origin gh-pages; then
|
||||
git clone --depth=1 --branch gh-pages "$GITHUB_SERVER_URL/$GITHUB_REPOSITORY.git" gh-pages-benchmark-data
|
||||
else
|
||||
mkdir -p gh-pages-benchmark-data
|
||||
fi
|
||||
|
||||
- name: Compare with latest gh-pages benchmark
|
||||
id: compare
|
||||
continue-on-error: true
|
||||
run: |
|
||||
if [ ! -s "$BENCHMARK_BASELINE_JSON" ]; then
|
||||
mkdir -p "$(dirname "$BENCHMARK_SUMMARY")"
|
||||
{
|
||||
echo "<!-- bw-benchmark-comment -->"
|
||||
echo "## Benchmark comparison"
|
||||
echo
|
||||
echo "No benchmark baseline was found on gh-pages."
|
||||
} > "$BENCHMARK_SUMMARY"
|
||||
exit 0
|
||||
fi
|
||||
|
||||
args=(
|
||||
--baseline "$BENCHMARK_BASELINE_JSON"
|
||||
--current "$BENCHMARK_JSON"
|
||||
--summary "$BENCHMARK_SUMMARY"
|
||||
--threshold-percent "$BENCHMARK_THRESHOLD_PERCENT"
|
||||
)
|
||||
|
||||
if [ "$BENCHMARK_HIGHER_IS_BETTER" = "true" ]; then
|
||||
args+=(--higher-is-better)
|
||||
fi
|
||||
|
||||
set +e
|
||||
python .github/scripts/compare_benchmarks.py "${args[@]}"
|
||||
status=$?
|
||||
set -e
|
||||
|
||||
if [ ! -s "$BENCHMARK_SUMMARY" ]; then
|
||||
mkdir -p "$(dirname "$BENCHMARK_SUMMARY")"
|
||||
{
|
||||
echo "<!-- bw-benchmark-comment -->"
|
||||
echo "## Benchmark comparison"
|
||||
echo
|
||||
echo "Benchmark comparison failed before writing a summary."
|
||||
} > "$BENCHMARK_SUMMARY"
|
||||
fi
|
||||
|
||||
exit "$status"
|
||||
|
||||
- name: Find existing benchmark PR comment
|
||||
if: github.event_name == 'pull_request'
|
||||
id: fc
|
||||
uses: peter-evans/find-comment@v3
|
||||
with:
|
||||
issue-number: ${{ github.event.pull_request.number }}
|
||||
comment-author: github-actions[bot]
|
||||
body-includes: "<!-- bw-benchmark-comment -->"
|
||||
|
||||
- name: Create or update benchmark PR comment
|
||||
if: github.event_name == 'pull_request'
|
||||
uses: peter-evans/create-or-update-comment@v5
|
||||
with:
|
||||
issue-number: ${{ github.event.pull_request.number }}
|
||||
comment-id: ${{ steps.fc.outputs.comment-id }}
|
||||
body-path: ${{ env.BENCHMARK_SUMMARY }}
|
||||
edit-mode: replace
|
||||
|
||||
- name: Fail on benchmark regression
|
||||
if: github.event_name == 'pull_request' && steps.compare.outcome == 'failure'
|
||||
run: exit 1
|
||||
|
||||
publish:
|
||||
needs: [benchmark]
|
||||
if: github.event_name == 'push' && github.ref == 'refs/heads/main'
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: write
|
||||
|
||||
steps:
|
||||
- name: Checkout BEC Widgets
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
repository: bec-project/bec_widgets
|
||||
ref: ${{ github.sha }}
|
||||
|
||||
- name: Download aggregate benchmark artifact
|
||||
uses: actions/download-artifact@v4
|
||||
with:
|
||||
name: bw-benchmark-json
|
||||
path: .
|
||||
|
||||
- name: Prepare gh-pages for publishing
|
||||
run: |
|
||||
# Clean up any existing worktree/directory
|
||||
if [ -d gh-pages-benchmark-data ]; then
|
||||
git worktree remove gh-pages-benchmark-data --force || rm -rf gh-pages-benchmark-data
|
||||
fi
|
||||
|
||||
if git ls-remote --exit-code --heads origin gh-pages; then
|
||||
git fetch --depth=1 origin gh-pages
|
||||
git worktree add gh-pages-benchmark-data FETCH_HEAD
|
||||
else
|
||||
git worktree add --detach gh-pages-benchmark-data
|
||||
git -C gh-pages-benchmark-data checkout --orphan gh-pages
|
||||
git -C gh-pages-benchmark-data rm -rf .
|
||||
fi
|
||||
|
||||
- name: Publish benchmark data to gh-pages
|
||||
working-directory: gh-pages-benchmark-data
|
||||
run: |
|
||||
mkdir -p benchmarks/history
|
||||
cp "../$BENCHMARK_JSON" benchmarks/latest.json
|
||||
cp "../$BENCHMARK_JSON" "benchmarks/history/${GITHUB_SHA}.json"
|
||||
git config user.name "github-actions[bot]"
|
||||
git config user.email "41898282+github-actions[bot]@users.noreply.github.com"
|
||||
git add benchmarks/latest.json "benchmarks/history/${GITHUB_SHA}.json"
|
||||
git commit -m "Update BW benchmark data for ${GITHUB_SHA}" || exit 0
|
||||
git push origin HEAD:gh-pages
|
||||
24
.github/workflows/ci.yml
vendored
24
.github/workflows/ci.yml
vendored
@@ -1,19 +1,19 @@
|
||||
name: Full CI
|
||||
on:
|
||||
on:
|
||||
push:
|
||||
pull_request:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
BEC_WIDGETS_BRANCH:
|
||||
description: "Branch of BEC Widgets to install"
|
||||
description: 'Branch of BEC Widgets to install'
|
||||
required: false
|
||||
type: string
|
||||
BEC_CORE_BRANCH:
|
||||
description: "Branch of BEC Core to install"
|
||||
description: 'Branch of BEC Core to install'
|
||||
required: false
|
||||
type: string
|
||||
OPHYD_DEVICES_BRANCH:
|
||||
description: "Branch of Ophyd Devices to install"
|
||||
description: 'Branch of Ophyd Devices to install'
|
||||
required: false
|
||||
type: string
|
||||
|
||||
@@ -23,7 +23,6 @@ concurrency:
|
||||
|
||||
permissions:
|
||||
pull-requests: write
|
||||
contents: read
|
||||
|
||||
jobs:
|
||||
check_pr_status:
|
||||
@@ -34,15 +33,6 @@ jobs:
|
||||
if: needs.check_pr_status.outputs.branch-pr == ''
|
||||
uses: ./.github/workflows/formatter.yml
|
||||
|
||||
benchmark:
|
||||
needs: [check_pr_status]
|
||||
if: needs.check_pr_status.outputs.branch-pr == ''
|
||||
permissions:
|
||||
contents: write
|
||||
issues: write
|
||||
pull-requests: write
|
||||
uses: ./.github/workflows/benchmark.yml
|
||||
|
||||
unit-test:
|
||||
needs: [check_pr_status, formatter]
|
||||
if: needs.check_pr_status.outputs.branch-pr == ''
|
||||
@@ -79,9 +69,9 @@ jobs:
|
||||
uses: ./.github/workflows/child_repos.yml
|
||||
with:
|
||||
BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
|
||||
OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main'}}
|
||||
OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main'}}
|
||||
BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha }}
|
||||
|
||||
|
||||
plugin_repos:
|
||||
needs: [check_pr_status, formatter]
|
||||
if: needs.check_pr_status.outputs.branch-pr == ''
|
||||
@@ -91,4 +81,4 @@ jobs:
|
||||
BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha }}
|
||||
|
||||
secrets:
|
||||
GH_READ_TOKEN: ${{ secrets.GH_READ_TOKEN }}
|
||||
GH_READ_TOKEN: ${{ secrets.GH_READ_TOKEN }}
|
||||
25
.github/workflows/pytest-matrix.yml
vendored
25
.github/workflows/pytest-matrix.yml
vendored
@@ -1,25 +1,25 @@
|
||||
name: Run Pytest with different Python versions
|
||||
on:
|
||||
on:
|
||||
workflow_call:
|
||||
inputs:
|
||||
pr_number:
|
||||
description: "Pull request number"
|
||||
description: 'Pull request number'
|
||||
required: false
|
||||
type: number
|
||||
BEC_CORE_BRANCH:
|
||||
description: "Branch of BEC Core to install"
|
||||
description: 'Branch of BEC Core to install'
|
||||
required: false
|
||||
default: "main"
|
||||
default: 'main'
|
||||
type: string
|
||||
OPHYD_DEVICES_BRANCH:
|
||||
description: "Branch of Ophyd Devices to install"
|
||||
description: 'Branch of Ophyd Devices to install'
|
||||
required: false
|
||||
default: "main"
|
||||
default: 'main'
|
||||
type: string
|
||||
BEC_WIDGETS_BRANCH:
|
||||
description: "Branch of BEC Widgets to install"
|
||||
description: 'Branch of BEC Widgets to install'
|
||||
required: false
|
||||
default: "main"
|
||||
default: 'main'
|
||||
type: string
|
||||
|
||||
jobs:
|
||||
@@ -30,14 +30,15 @@ jobs:
|
||||
python-version: ["3.11", "3.12", "3.13"]
|
||||
|
||||
env:
|
||||
BEC_WIDGETS_BRANCH: main # Set the branch you want for bec_widgets
|
||||
BEC_CORE_BRANCH: main # Set the branch you want for bec
|
||||
OPHYD_DEVICES_BRANCH: main # Set the branch you want for ophyd_devices
|
||||
BEC_WIDGETS_BRANCH: main # Set the branch you want for bec_widgets
|
||||
BEC_CORE_BRANCH: main # Set the branch you want for bec
|
||||
OPHYD_DEVICES_BRANCH: main # Set the branch you want for ophyd_devices
|
||||
PROJECT_PATH: ${{ github.repository }}
|
||||
QTWEBENGINE_DISABLE_SANDBOX: 1
|
||||
QT_QPA_PLATFORM: "offscreen"
|
||||
|
||||
steps:
|
||||
|
||||
- name: Checkout BEC Widgets
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
@@ -55,4 +56,4 @@ jobs:
|
||||
- name: Run Pytest
|
||||
run: |
|
||||
pip install pytest pytest-random-order
|
||||
pytest -v --junitxml=report.xml --random-order --ignore=tests/unit_tests/benchmarks ./tests/unit_tests
|
||||
pytest -v --junitxml=report.xml --random-order ./tests/unit_tests
|
||||
|
||||
22
.github/workflows/pytest.yml
vendored
22
.github/workflows/pytest.yml
vendored
@@ -1,30 +1,32 @@
|
||||
name: Run Pytest with Coverage
|
||||
on:
|
||||
on:
|
||||
workflow_call:
|
||||
inputs:
|
||||
pr_number:
|
||||
description: "Pull request number"
|
||||
description: 'Pull request number'
|
||||
required: false
|
||||
type: number
|
||||
BEC_CORE_BRANCH:
|
||||
description: "Branch of BEC Core to install"
|
||||
description: 'Branch of BEC Core to install'
|
||||
required: false
|
||||
default: "main"
|
||||
default: 'main'
|
||||
type: string
|
||||
OPHYD_DEVICES_BRANCH:
|
||||
description: "Branch of Ophyd Devices to install"
|
||||
description: 'Branch of Ophyd Devices to install'
|
||||
required: false
|
||||
default: "main"
|
||||
default: 'main'
|
||||
type: string
|
||||
BEC_WIDGETS_BRANCH:
|
||||
description: "Branch of BEC Widgets to install"
|
||||
description: 'Branch of BEC Widgets to install'
|
||||
required: false
|
||||
default: "main"
|
||||
default: 'main'
|
||||
type: string
|
||||
secrets:
|
||||
CODECOV_TOKEN:
|
||||
required: true
|
||||
|
||||
|
||||
|
||||
permissions:
|
||||
pull-requests: write
|
||||
|
||||
@@ -53,7 +55,7 @@ jobs:
|
||||
|
||||
- name: Run Pytest with Coverage
|
||||
id: coverage
|
||||
run: pytest --random-order --cov=bec_widgets --cov-config=pyproject.toml --cov-branch --cov-report=xml --no-cov-on-fail --ignore=tests/unit_tests/benchmarks tests/unit_tests/
|
||||
run: pytest --random-order --cov=bec_widgets --cov-config=pyproject.toml --cov-branch --cov-report=xml --no-cov-on-fail tests/unit_tests/
|
||||
|
||||
- name: Upload test artifacts
|
||||
uses: actions/upload-artifact@v4
|
||||
@@ -67,4 +69,4 @@ jobs:
|
||||
uses: codecov/codecov-action@v5
|
||||
with:
|
||||
token: ${{ secrets.CODECOV_TOKEN }}
|
||||
slug: bec-project/bec_widgets
|
||||
slug: bec-project/bec_widgets
|
||||
4
.gitignore
vendored
4
.gitignore
vendored
@@ -177,6 +177,4 @@ cython_debug/
|
||||
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
|
||||
# and can be added to the global gitignore or merged into this file. For a more nuclear
|
||||
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
|
||||
#.idea/
|
||||
#
|
||||
tombi.toml
|
||||
#.idea/
|
||||
52
CHANGELOG.md
52
CHANGELOG.md
@@ -1,58 +1,6 @@
|
||||
# CHANGELOG
|
||||
|
||||
|
||||
## v3.5.0 (2026-04-14)
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
- Connect signals the correct way around
|
||||
([`f562c61`](https://github.com/bec-project/bec_widgets/commit/f562c61e3cec3387f6821bad74403beeb3436355))
|
||||
|
||||
- Create new bec shell if deleted
|
||||
([`1754e75`](https://github.com/bec-project/bec_widgets/commit/1754e759f0c59f2f4063f661bacd334127326947))
|
||||
|
||||
- Formatting in plugin template
|
||||
([`fa2ef83`](https://github.com/bec-project/bec_widgets/commit/fa2ef83bb9dfeeb4c5fc7cd77168c16101c32693))
|
||||
|
||||
- **bec_console**: Persistent bec session
|
||||
([`9b0ec9d`](https://github.com/bec-project/bec_widgets/commit/9b0ec9dd79ad1adc5d211dd703db7441da965f34))
|
||||
|
||||
### Features
|
||||
|
||||
- Add qtermwidget plugin and replace web term
|
||||
([`02cb393`](https://github.com/bec-project/bec_widgets/commit/02cb393bb086165dc64917b633d5570d02e1a2a9))
|
||||
|
||||
### Refactoring
|
||||
|
||||
- Code cleanup
|
||||
([`bda5d38`](https://github.com/bec-project/bec_widgets/commit/bda5d389651bb2b13734cd31159679e85b1bd583))
|
||||
|
||||
|
||||
## v3.4.4 (2026-04-14)
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
- Check duplicate stream sub
|
||||
([`c7de320`](https://github.com/bec-project/bec_widgets/commit/c7de320ca564264a31b84931f553170f25659685))
|
||||
|
||||
- Check for duplicate subscriptions in GUIClient
|
||||
([`37747ba`](https://github.com/bec-project/bec_widgets/commit/37747babda407040333c6bd04646be9a49e0ee81))
|
||||
|
||||
- Make gui client registry callback non static
|
||||
([`32f5d48`](https://github.com/bec-project/bec_widgets/commit/32f5d486d3fc8d41df2668c58932ae982819b285))
|
||||
|
||||
- Remove staticmethod subscription
|
||||
([`0ff1fdc`](https://github.com/bec-project/bec_widgets/commit/0ff1fdc81578eec3ffc5d4030fca7b357a0b4c2f))
|
||||
|
||||
|
||||
## v3.4.3 (2026-04-13)
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
- Set OPHYD_CONTROL_LAYER to dummy for tests
|
||||
([`5e84d3b`](https://github.com/bec-project/bec_widgets/commit/5e84d3bec608ae9f2ee6dae67db2e3e1387b1f59))
|
||||
|
||||
|
||||
## v3.4.2 (2026-04-01)
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
@@ -16,9 +16,9 @@ from bec_widgets.utils.toolbars.toolbar import ModularToolBar
|
||||
from bec_widgets.widgets.containers.dock_area.basic_dock_area import DockAreaWidget
|
||||
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea
|
||||
from bec_widgets.widgets.containers.qt_ads import CDockWidget
|
||||
from bec_widgets.widgets.editors.bec_console.bec_console import BecConsole, BECShell
|
||||
from bec_widgets.widgets.editors.monaco.monaco_dock import MonacoDock
|
||||
from bec_widgets.widgets.editors.monaco.monaco_widget import MonacoWidget
|
||||
from bec_widgets.widgets.editors.web_console.web_console import BECShell, WebConsole
|
||||
from bec_widgets.widgets.utility.ide_explorer.ide_explorer import IDEExplorer
|
||||
|
||||
|
||||
@@ -96,7 +96,7 @@ class DeveloperWidget(DockAreaWidget):
|
||||
|
||||
self.console = BECShell(self, rpc_exposed=False)
|
||||
self.console.setObjectName("BEC Shell")
|
||||
self.terminal = BecConsole(self, rpc_exposed=False)
|
||||
self.terminal = WebConsole(self, rpc_exposed=False)
|
||||
self.terminal.setObjectName("Terminal")
|
||||
self.monaco = MonacoDock(self, rpc_exposed=False, rpc_passthrough_children=False)
|
||||
self.monaco.setObjectName("MonacoEditor")
|
||||
@@ -410,3 +410,23 @@ class DeveloperWidget(DockAreaWidget):
|
||||
"""Clean up resources used by the developer widget."""
|
||||
self.delete_all()
|
||||
return super().cleanup()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
|
||||
from bec_qthemes import apply_theme
|
||||
from qtpy.QtWidgets import QApplication
|
||||
|
||||
from bec_widgets.applications.main_app import BECMainApp
|
||||
|
||||
app = QApplication(sys.argv)
|
||||
apply_theme("dark")
|
||||
|
||||
_app = BECMainApp()
|
||||
_app.show()
|
||||
# developer_view.show()
|
||||
# developer_view.setWindowTitle("Developer View")
|
||||
# developer_view.resize(1920, 1080)
|
||||
# developer_view.set_stretch(horizontal=[1, 3, 2], vertical=[5, 5]) #can be set during runtime
|
||||
sys.exit(app.exec_())
|
||||
|
||||
@@ -32,7 +32,6 @@ _Widgets = {
|
||||
"BECQueue": "BECQueue",
|
||||
"BECShell": "BECShell",
|
||||
"BECStatusBox": "BECStatusBox",
|
||||
"BecConsole": "BecConsole",
|
||||
"DapComboBox": "DapComboBox",
|
||||
"DeviceBrowser": "DeviceBrowser",
|
||||
"Heatmap": "Heatmap",
|
||||
@@ -57,6 +56,7 @@ _Widgets = {
|
||||
"SignalLabel": "SignalLabel",
|
||||
"TextBox": "TextBox",
|
||||
"Waveform": "Waveform",
|
||||
"WebConsole": "WebConsole",
|
||||
"WebsiteWidget": "WebsiteWidget",
|
||||
}
|
||||
|
||||
@@ -506,7 +506,7 @@ class BECQueue(RPCBase):
|
||||
|
||||
|
||||
class BECShell(RPCBase):
|
||||
"""A BecConsole pre-configured to run the BEC shell."""
|
||||
"""A WebConsole pre-configured to run the BEC shell."""
|
||||
|
||||
@rpc_call
|
||||
def remove(self):
|
||||
@@ -691,28 +691,6 @@ class BaseROI(RPCBase):
|
||||
"""
|
||||
|
||||
|
||||
class BecConsole(RPCBase):
|
||||
"""A console widget with access to a shared registry of terminals, such that instances can be moved around."""
|
||||
|
||||
@rpc_call
|
||||
def remove(self):
|
||||
"""
|
||||
Cleanup the BECConnector
|
||||
"""
|
||||
|
||||
@rpc_call
|
||||
def attach(self):
|
||||
"""
|
||||
None
|
||||
"""
|
||||
|
||||
@rpc_call
|
||||
def detach(self):
|
||||
"""
|
||||
Detach the widget from its parent dock widget (if widget is in the dock), making it a floating widget.
|
||||
"""
|
||||
|
||||
|
||||
class CircularROI(RPCBase):
|
||||
"""Circular Region of Interest with center/diameter tracking and auto-labeling."""
|
||||
|
||||
@@ -6439,6 +6417,28 @@ class WaveformViewPopup(RPCBase):
|
||||
"""
|
||||
|
||||
|
||||
class WebConsole(RPCBase):
|
||||
"""A simple widget to display a website"""
|
||||
|
||||
@rpc_call
|
||||
def remove(self):
|
||||
"""
|
||||
Cleanup the BECConnector
|
||||
"""
|
||||
|
||||
@rpc_call
|
||||
def attach(self):
|
||||
"""
|
||||
None
|
||||
"""
|
||||
|
||||
@rpc_call
|
||||
def detach(self):
|
||||
"""
|
||||
Detach the widget from its parent dock widget (if widget is in the dock), making it a floating widget.
|
||||
"""
|
||||
|
||||
|
||||
class WebsiteWidget(RPCBase):
|
||||
"""A simple widget to display a website"""
|
||||
|
||||
|
||||
@@ -10,9 +10,9 @@ import threading
|
||||
import time
|
||||
from contextlib import contextmanager
|
||||
from threading import Lock
|
||||
from typing import TYPE_CHECKING, Callable, Literal, TypeAlias, cast
|
||||
from typing import TYPE_CHECKING, Literal, TypeAlias, cast
|
||||
|
||||
from bec_lib.endpoints import EndpointInfo, MessageEndpoints
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_lib.utils.import_utils import lazy_import, lazy_import_from
|
||||
from rich.console import Console
|
||||
@@ -232,11 +232,6 @@ class BECGuiClient(RPCBase):
|
||||
"""The launcher object."""
|
||||
return RPCBase(gui_id=f"{self._gui_id}:launcher", parent=self, object_name="launcher")
|
||||
|
||||
def _safe_register_stream(self, endpoint: EndpointInfo, cb: Callable, **kwargs):
|
||||
"""Check if already registered for registration in idempotent functions."""
|
||||
if not self._client.connector.any_stream_is_registered(endpoint, cb=cb):
|
||||
self._client.connector.register(endpoint, cb=cb, **kwargs)
|
||||
|
||||
def connect_to_gui_server(self, gui_id: str) -> None:
|
||||
"""Connect to a GUI server"""
|
||||
# Unregister the old callback
|
||||
@@ -252,9 +247,10 @@ class BECGuiClient(RPCBase):
|
||||
self._ipython_registry = {}
|
||||
|
||||
# Register the new callback
|
||||
self._safe_register_stream(
|
||||
self._client.connector.register(
|
||||
MessageEndpoints.gui_registry_state(self._gui_id),
|
||||
cb=self._handle_registry_update,
|
||||
parent=self,
|
||||
from_start=True,
|
||||
)
|
||||
|
||||
@@ -535,14 +531,20 @@ class BECGuiClient(RPCBase):
|
||||
|
||||
def _start(self, wait: bool = False) -> None:
|
||||
self._killed = False
|
||||
self._safe_register_stream(
|
||||
MessageEndpoints.gui_registry_state(self._gui_id), cb=self._handle_registry_update
|
||||
self._client.connector.register(
|
||||
MessageEndpoints.gui_registry_state(self._gui_id),
|
||||
cb=self._handle_registry_update,
|
||||
parent=self,
|
||||
)
|
||||
return self._start_server(wait=wait)
|
||||
|
||||
def _handle_registry_update(self, msg: dict[str, GUIRegistryStateMessage]) -> None:
|
||||
@staticmethod
|
||||
def _handle_registry_update(
|
||||
msg: dict[str, GUIRegistryStateMessage], parent: BECGuiClient
|
||||
) -> None:
|
||||
# This was causing a deadlock during shutdown, not sure why.
|
||||
# with self._lock:
|
||||
self = parent
|
||||
self._server_registry = cast(dict[str, RegistryState], msg["data"].state)
|
||||
self._update_dynamic_namespace(self._server_registry)
|
||||
|
||||
|
||||
@@ -248,7 +248,9 @@ class RPCBase:
|
||||
self._rpc_response = None
|
||||
self._msg_wait_event.clear()
|
||||
self._client.connector.register(
|
||||
MessageEndpoints.gui_instruction_response(request_id), cb=self._on_rpc_response
|
||||
MessageEndpoints.gui_instruction_response(request_id),
|
||||
cb=self._on_rpc_response,
|
||||
parent=self,
|
||||
)
|
||||
|
||||
self._client.connector.set_and_publish(MessageEndpoints.gui_instructions(receiver), rpc_msg)
|
||||
@@ -274,10 +276,11 @@ class RPCBase:
|
||||
self._rpc_response = None
|
||||
return self._create_widget_from_msg_result(msg_result)
|
||||
|
||||
def _on_rpc_response(self, msg_obj: MessageObject) -> None:
|
||||
@staticmethod
|
||||
def _on_rpc_response(msg_obj: MessageObject, parent: RPCBase) -> None:
|
||||
msg = cast(messages.RequestResponseMessage, msg_obj.value)
|
||||
self._rpc_response = msg
|
||||
self._msg_wait_event.set()
|
||||
parent._rpc_response = msg
|
||||
parent._msg_wait_event.set()
|
||||
|
||||
def _create_widget_from_msg_result(self, msg_result):
|
||||
if msg_result is None:
|
||||
|
||||
@@ -175,15 +175,12 @@ class BECDispatcher:
|
||||
cb_info (dict | None): A dictionary containing information about the callback. Defaults to None.
|
||||
"""
|
||||
qt_slot = QtThreadSafeCallback(cb=slot, cb_info=cb_info)
|
||||
if not self.client.connector.any_stream_is_registered(topics, qt_slot):
|
||||
if qt_slot not in self._registered_slots:
|
||||
self._registered_slots[qt_slot] = qt_slot
|
||||
qt_slot = self._registered_slots[qt_slot]
|
||||
self.client.connector.register(topics, cb=qt_slot, **kwargs)
|
||||
topics_str, _ = self.client.connector._convert_endpointinfo(topics)
|
||||
qt_slot.topics.update(set(topics_str))
|
||||
else:
|
||||
logger.warning(f"Attempted to create duplicate stream subscription for {topics=}")
|
||||
if qt_slot not in self._registered_slots:
|
||||
self._registered_slots[qt_slot] = qt_slot
|
||||
qt_slot = self._registered_slots[qt_slot]
|
||||
self.client.connector.register(topics, cb=qt_slot, **kwargs)
|
||||
topics_str, _ = self.client.connector._convert_endpointinfo(topics)
|
||||
qt_slot.topics.update(set(topics_str))
|
||||
|
||||
def disconnect_slot(
|
||||
self, slot: Callable, topics: EndpointInfo | str | list[EndpointInfo] | list[str]
|
||||
|
||||
@@ -22,7 +22,7 @@ class {plugin_name_pascal}Plugin(QDesignerCustomWidgetInterface): # pragma: no
|
||||
|
||||
def createWidget(self, parent):
|
||||
if parent is None:
|
||||
return QWidget()
|
||||
return QWidget()
|
||||
t = {plugin_name_pascal}(parent)
|
||||
return t
|
||||
|
||||
|
||||
@@ -69,7 +69,7 @@ from bec_widgets.widgets.containers.main_window.main_window import BECMainWindow
|
||||
from bec_widgets.widgets.containers.qt_ads import CDockWidget
|
||||
from bec_widgets.widgets.control.device_control.positioner_box import PositionerBox, PositionerBox2D
|
||||
from bec_widgets.widgets.control.scan_control import ScanControl
|
||||
from bec_widgets.widgets.editors.bec_console.bec_console import BecConsole, BECShell
|
||||
from bec_widgets.widgets.editors.web_console.web_console import BECShell, WebConsole
|
||||
from bec_widgets.widgets.plots.heatmap.heatmap import Heatmap
|
||||
from bec_widgets.widgets.plots.image.image import Image
|
||||
from bec_widgets.widgets.plots.motor_map.motor_map import MotorMap
|
||||
@@ -372,7 +372,7 @@ class BECDockArea(DockAreaWidget):
|
||||
"Add Circular ProgressBar",
|
||||
"RingProgressBar",
|
||||
),
|
||||
"terminal": (BecConsole.ICON_NAME, "Add Terminal", "BecConsole"),
|
||||
"terminal": (WebConsole.ICON_NAME, "Add Terminal", "WebConsole"),
|
||||
"bec_shell": (BECShell.ICON_NAME, "Add BEC Shell", "BECShell"),
|
||||
"log_panel": (LogPanel.ICON_NAME, "Add LogPanel - Disabled", "LogPanel"),
|
||||
"sbb_monitor": ("train", "Add SBB Monitor", "SBBMonitor"),
|
||||
|
||||
@@ -25,6 +25,7 @@ from bec_widgets.utils.colors import apply_theme, get_accent_colors
|
||||
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
|
||||
from bec_widgets.widgets.control.buttons.stop_button.stop_button import StopButton
|
||||
from bec_widgets.widgets.control.scan_control.scan_group_box import ScanGroupBox
|
||||
from bec_widgets.widgets.control.scan_control.scan_info_adapter import ScanInfoAdapter
|
||||
from bec_widgets.widgets.editors.scan_metadata.scan_metadata import ScanMetadata
|
||||
from bec_widgets.widgets.utility.toggle.toggle import ToggleSwitch
|
||||
|
||||
@@ -97,6 +98,7 @@ class ScanControl(BECWidget, QWidget):
|
||||
self._hide_scan_control_buttons = False
|
||||
self._hide_metadata = False
|
||||
self._hide_scan_selection_combobox = False
|
||||
self._scan_info_adapter = ScanInfoAdapter()
|
||||
|
||||
# Create and set main layout
|
||||
self._init_UI()
|
||||
@@ -195,7 +197,8 @@ class ScanControl(BECWidget, QWidget):
|
||||
allowed_scans = [
|
||||
scan_name
|
||||
for scan_name, scan_info in self.available_scans.items()
|
||||
if scan_info["base_class"] in supported_scans and len(scan_info["gui_config"]) > 0
|
||||
if scan_info["base_class"] in supported_scans
|
||||
and self._scan_info_adapter.has_scan_ui_config(scan_info)
|
||||
]
|
||||
|
||||
else:
|
||||
@@ -390,14 +393,14 @@ class ScanControl(BECWidget, QWidget):
|
||||
self.reset_layout()
|
||||
selected_scan_info = self.available_scans.get(scan_name, {})
|
||||
|
||||
gui_config = selected_scan_info.get("gui_config", {})
|
||||
self.arg_group = gui_config.get("arg_group", None)
|
||||
self.kwarg_groups = gui_config.get("kwarg_groups", None)
|
||||
gui_config = self._scan_info_adapter.build_scan_ui_config(selected_scan_info)
|
||||
arg_group = gui_config.get("arg_group", None)
|
||||
kwarg_groups = gui_config.get("kwarg_groups", [])
|
||||
|
||||
if bool(self.arg_group["arg_inputs"]):
|
||||
self.add_arg_group(self.arg_group)
|
||||
if len(self.kwarg_groups) > 0:
|
||||
self.add_kwargs_boxes(self.kwarg_groups)
|
||||
if arg_group and bool(arg_group.get("arg_inputs")):
|
||||
self.add_arg_group(arg_group)
|
||||
if kwarg_groups:
|
||||
self.add_kwargs_boxes(kwarg_groups)
|
||||
|
||||
self.update()
|
||||
self.adjustSize()
|
||||
|
||||
@@ -209,6 +209,8 @@ class ScanGroupBox(QGroupBox):
|
||||
|
||||
self.labels = []
|
||||
self.widgets = []
|
||||
self._widget_configs = {}
|
||||
self._column_labels = {}
|
||||
self.selected_devices = {}
|
||||
|
||||
self.init_box(self.config)
|
||||
@@ -247,6 +249,7 @@ class ScanGroupBox(QGroupBox):
|
||||
label = QLabel(text=display_name)
|
||||
self.layout.addWidget(label, row, column_index)
|
||||
self.labels.append(label)
|
||||
self._column_labels[column_index] = label
|
||||
|
||||
def add_input_widgets(self, group_inputs: dict, row) -> None:
|
||||
"""
|
||||
@@ -272,21 +275,30 @@ class ScanGroupBox(QGroupBox):
|
||||
if default == "_empty":
|
||||
default = None
|
||||
widget = widget_class(parent=self.parent(), arg_name=arg_name, default=default)
|
||||
self._apply_numeric_precision(widget, item)
|
||||
self._apply_numeric_limits(widget, item)
|
||||
if isinstance(widget, DeviceLineEdit):
|
||||
widget.set_device_filter(BECDeviceFilter.DEVICE)
|
||||
self.selected_devices[widget] = ""
|
||||
widget.device_selected.connect(self.emit_device_selected)
|
||||
widget.textChanged.connect(
|
||||
lambda text, device_widget=widget: self._handle_device_text_changed(
|
||||
device_widget, text
|
||||
)
|
||||
)
|
||||
if isinstance(widget, ScanLiteralsComboBox):
|
||||
widget.set_literals(item["type"].get("Literal", []))
|
||||
tooltip = item.get("tooltip", None)
|
||||
if tooltip is not None:
|
||||
widget.setToolTip(item["tooltip"])
|
||||
self._widget_configs[widget] = item
|
||||
self._apply_unit_metadata(widget, item)
|
||||
self.layout.addWidget(widget, row, column_index)
|
||||
self.widgets.append(widget)
|
||||
|
||||
@Slot(str)
|
||||
def emit_device_selected(self, device_name):
|
||||
self.selected_devices[self.sender()] = device_name.strip()
|
||||
sender = self.sender()
|
||||
self.selected_devices[sender] = device_name.strip()
|
||||
if isinstance(sender, DeviceLineEdit):
|
||||
self._update_reference_units(sender, self._device_units(sender.get_current_device()))
|
||||
selected_devices_str = " ".join(self.selected_devices.values())
|
||||
self.device_selected.emit(selected_devices_str)
|
||||
|
||||
@@ -313,6 +325,7 @@ class ScanGroupBox(QGroupBox):
|
||||
for widget in self.widgets[-len(self.inputs) :]:
|
||||
if isinstance(widget, DeviceLineEdit):
|
||||
self.selected_devices[widget] = ""
|
||||
self._widget_configs.pop(widget, None)
|
||||
widget.close()
|
||||
widget.deleteLater()
|
||||
self.widgets = self.widgets[: -len(self.inputs)]
|
||||
@@ -325,6 +338,7 @@ class ScanGroupBox(QGroupBox):
|
||||
for widget in list(self.widgets):
|
||||
if isinstance(widget, DeviceLineEdit):
|
||||
self.selected_devices.pop(widget, None)
|
||||
self._widget_configs.pop(widget, None)
|
||||
widget.close()
|
||||
widget.deleteLater()
|
||||
self.layout.removeWidget(widget)
|
||||
@@ -423,3 +437,119 @@ class ScanGroupBox(QGroupBox):
|
||||
if widget.arg_name == key:
|
||||
WidgetIO.set_value(widget, value)
|
||||
break
|
||||
|
||||
@staticmethod
|
||||
def _unit_tooltip(item: dict, units: str | None = None) -> str | None:
|
||||
tooltip = item.get("tooltip", None)
|
||||
reference_units = item.get("reference_units", None)
|
||||
units = units or item.get("units", None)
|
||||
tooltip_parts = [tooltip] if tooltip else []
|
||||
if units:
|
||||
tooltip_parts.append(f"Units: {units}")
|
||||
elif reference_units:
|
||||
tooltip_parts.append(f"Units from: {reference_units}")
|
||||
if tooltip_parts:
|
||||
return "\n".join(tooltip_parts)
|
||||
return None
|
||||
|
||||
def _apply_unit_metadata(self, widget, item: dict, units: str | None = None) -> None:
|
||||
units = units or item.get("units", None)
|
||||
tooltip = self._unit_tooltip(item, units)
|
||||
widget.setToolTip(tooltip or "")
|
||||
if hasattr(widget, "setSuffix"):
|
||||
widget.setSuffix(f" {units}" if units else "")
|
||||
|
||||
def _refresh_column_label(self, column: int, item: dict) -> None:
|
||||
if column not in self._column_labels:
|
||||
return
|
||||
self._column_labels[column].setText(item.get("display_name", item.get("name", None)))
|
||||
|
||||
@staticmethod
|
||||
def _device_units(device) -> str | None:
|
||||
egu = getattr(device, "egu", None)
|
||||
if not callable(egu):
|
||||
return None
|
||||
try:
|
||||
return egu()
|
||||
except Exception:
|
||||
logger.exception("Failed to fetch engineering units from device %s", device)
|
||||
return None
|
||||
|
||||
def _widget_position(self, widget) -> tuple[int, int] | None:
|
||||
for row in range(self.layout.rowCount()):
|
||||
for column in range(self.layout.columnCount()):
|
||||
item = self.layout.itemAtPosition(row, column)
|
||||
if item is not None and item.widget() is widget:
|
||||
return row, column
|
||||
return None
|
||||
|
||||
def _update_reference_units(self, device_widget: DeviceLineEdit, units: str | None) -> None:
|
||||
position = self._widget_position(device_widget)
|
||||
if position is None:
|
||||
return
|
||||
source_row, _ = position
|
||||
source_name = device_widget.arg_name
|
||||
|
||||
for widget in self.widgets:
|
||||
item = self._widget_configs.get(widget, {})
|
||||
if item.get("reference_units") != source_name:
|
||||
continue
|
||||
widget_position = self._widget_position(widget)
|
||||
if widget_position is None:
|
||||
continue
|
||||
row, column = widget_position
|
||||
if self.box_type == "args" and row != source_row:
|
||||
continue
|
||||
self._apply_unit_metadata(widget, item, units)
|
||||
self._refresh_column_label(column, item)
|
||||
|
||||
def _handle_device_text_changed(self, device_widget: DeviceLineEdit, device_name: str) -> None:
|
||||
if not device_widget.validate_device(device_name):
|
||||
self.selected_devices[device_widget] = ""
|
||||
self._update_reference_units(device_widget, None)
|
||||
|
||||
@staticmethod
|
||||
def _apply_numeric_precision(widget: ScanDoubleSpinBox, item: dict) -> None:
|
||||
if not isinstance(widget, ScanDoubleSpinBox):
|
||||
return
|
||||
|
||||
precision = item.get("precision")
|
||||
if precision is None:
|
||||
return
|
||||
|
||||
try:
|
||||
widget.setDecimals(max(0, int(precision)))
|
||||
except (TypeError, ValueError):
|
||||
logger.warning(
|
||||
"Ignoring invalid precision %r for parameter %s", precision, item.get("name")
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _apply_numeric_limits(widget: ScanDoubleSpinBox | ScanSpinBox, item: dict) -> None:
|
||||
if isinstance(widget, ScanSpinBox):
|
||||
minimum = -2147483647 # largest int which qt allows
|
||||
maximum = 2147483647
|
||||
if item.get("ge") is not None:
|
||||
minimum = int(item["ge"])
|
||||
if item.get("gt") is not None:
|
||||
minimum = int(item["gt"]) + 1
|
||||
if item.get("le") is not None:
|
||||
maximum = int(item["le"])
|
||||
if item.get("lt") is not None:
|
||||
maximum = int(item["lt"]) - 1
|
||||
widget.setRange(minimum, maximum)
|
||||
return
|
||||
|
||||
if isinstance(widget, ScanDoubleSpinBox):
|
||||
minimum = -float("inf")
|
||||
maximum = float("inf")
|
||||
step = 10 ** (-widget.decimals())
|
||||
if item.get("ge") is not None:
|
||||
minimum = float(item["ge"])
|
||||
if item.get("gt") is not None:
|
||||
minimum = float(item["gt"]) + step
|
||||
if item.get("le") is not None:
|
||||
maximum = float(item["le"])
|
||||
if item.get("lt") is not None:
|
||||
maximum = float(item["lt"]) - step
|
||||
widget.setRange(minimum, maximum)
|
||||
|
||||
243
bec_widgets/widgets/control/scan_control/scan_info_adapter.py
Normal file
243
bec_widgets/widgets/control/scan_control/scan_info_adapter.py
Normal file
@@ -0,0 +1,243 @@
|
||||
"""Helpers for translating BEC scan metadata into ScanControl UI configuration."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from typing import Any
|
||||
|
||||
AnnotationValue = str | dict[str, Any] | list[Any] | None
|
||||
ScanArgumentMetadata = dict[str, Any]
|
||||
SignatureEntry = dict[str, Any]
|
||||
ScanInputConfig = dict[str, Any]
|
||||
ScanInfo = dict[str, Any]
|
||||
ScanUIConfig = dict[str, Any]
|
||||
|
||||
|
||||
class ScanInfoAdapter:
|
||||
"""Normalize available-scan payloads into the structure consumed by ``ScanControl``."""
|
||||
|
||||
@staticmethod
|
||||
def has_scan_ui_config(scan_info: ScanInfo) -> bool:
|
||||
"""Check whether a scan exposes enough metadata to build a UI.
|
||||
|
||||
Args:
|
||||
scan_info (ScanInfo): Available-scan payload for one scan.
|
||||
|
||||
Returns:
|
||||
bool: ``True`` when a supported GUI metadata field is present.
|
||||
"""
|
||||
return bool(scan_info.get("gui_visibility") or scan_info.get("gui_config"))
|
||||
|
||||
@staticmethod
|
||||
def format_display_name(name: str) -> str:
|
||||
"""Convert a parameter name into a user-facing label.
|
||||
|
||||
Args:
|
||||
name (str): Raw parameter name.
|
||||
|
||||
Returns:
|
||||
str: Formatted display label such as ``Exp Time``.
|
||||
"""
|
||||
parts = re.split(r"(_|\d+)", name)
|
||||
return " ".join(part.capitalize() for part in parts if part.isalnum()).strip()
|
||||
|
||||
@staticmethod
|
||||
def resolve_tooltip(scan_argument: ScanArgumentMetadata) -> str | None:
|
||||
"""Resolve the tooltip text from parsed ``ScanArgument`` metadata.
|
||||
|
||||
Args:
|
||||
scan_argument (ScanArgumentMetadata): Parsed ``ScanArgument`` metadata.
|
||||
|
||||
Returns:
|
||||
str | None: Explicit tooltip text if provided, otherwise the description fallback.
|
||||
"""
|
||||
return scan_argument.get("tooltip") or scan_argument.get("description")
|
||||
|
||||
@staticmethod
|
||||
def parse_annotation(
|
||||
annotation: AnnotationValue,
|
||||
) -> tuple[AnnotationValue, ScanArgumentMetadata]:
|
||||
"""Extract the serialized base annotation and ``ScanArgument`` metadata.
|
||||
|
||||
Args:
|
||||
annotation (AnnotationValue): Serialized annotation payload from BEC.
|
||||
|
||||
Returns:
|
||||
tuple[AnnotationValue, ScanArgumentMetadata]: The unwrapped annotation and parsed
|
||||
``ScanArgument`` metadata.
|
||||
"""
|
||||
scan_argument: ScanArgumentMetadata = {}
|
||||
if isinstance(annotation, list):
|
||||
annotation = next(
|
||||
(entry for entry in annotation if entry != "NoneType"),
|
||||
annotation[0] if annotation else "_empty",
|
||||
)
|
||||
if isinstance(annotation, dict) and "Annotated" in annotation:
|
||||
annotated = annotation["Annotated"]
|
||||
annotation = annotated.get("type", "_empty")
|
||||
scan_argument = annotated.get("metadata", {}).get("ScanArgument", {}) or {}
|
||||
return annotation, scan_argument
|
||||
|
||||
@staticmethod
|
||||
def scan_arg_type_from_annotation(annotation: AnnotationValue) -> AnnotationValue:
|
||||
"""Normalize an annotation value to the widget type expected by ``ScanControl``.
|
||||
|
||||
Args:
|
||||
annotation (AnnotationValue): Serialized or parsed annotation value.
|
||||
|
||||
Returns:
|
||||
AnnotationValue: The normalized type identifier used by the widget layer.
|
||||
"""
|
||||
if isinstance(annotation, dict):
|
||||
return annotation
|
||||
if annotation in ("_empty", None):
|
||||
return "str"
|
||||
return annotation
|
||||
|
||||
def scan_input_from_signature(
|
||||
self, param: SignatureEntry, arg: bool = False
|
||||
) -> ScanInputConfig:
|
||||
"""Build one ScanControl input description from a signature entry.
|
||||
|
||||
Args:
|
||||
param (SignatureEntry): Serialized signature entry.
|
||||
arg (bool): Whether the parameter belongs to the positional arg bundle.
|
||||
|
||||
Returns:
|
||||
ScanInputConfig: Normalized input configuration for ``ScanControl``.
|
||||
"""
|
||||
annotation, scan_argument = self.parse_annotation(param.get("annotation"))
|
||||
return self._build_scan_input(
|
||||
name=param["name"],
|
||||
annotation=annotation,
|
||||
scan_argument=scan_argument,
|
||||
arg=arg,
|
||||
default=None if arg else param.get("default", None),
|
||||
)
|
||||
|
||||
def scan_input_from_arg_input(
|
||||
self, name: str, item_type: AnnotationValue, signature_by_name: dict[str, SignatureEntry]
|
||||
) -> ScanInputConfig:
|
||||
"""Build one arg-bundle input description from ``arg_input`` metadata.
|
||||
|
||||
Args:
|
||||
name (str): Argument name from ``arg_input``.
|
||||
item_type (AnnotationValue): Serialized argument type from ``arg_input``.
|
||||
signature_by_name (dict[str, SignatureEntry]): Signature entries indexed by
|
||||
parameter name.
|
||||
|
||||
Returns:
|
||||
ScanInputConfig: Normalized input configuration for one arg-bundle field.
|
||||
"""
|
||||
if name in signature_by_name:
|
||||
scan_input = self.scan_input_from_signature(signature_by_name[name], arg=True)
|
||||
scan_input["type"] = self.scan_arg_type_from_annotation(
|
||||
self.parse_annotation(signature_by_name[name].get("annotation"))[0]
|
||||
)
|
||||
else:
|
||||
annotation, scan_argument = self.parse_annotation(item_type)
|
||||
scan_input = self._build_scan_input(
|
||||
name=name,
|
||||
annotation=annotation,
|
||||
scan_argument=scan_argument,
|
||||
arg=True,
|
||||
default=None,
|
||||
)
|
||||
if scan_input["type"] in ("_empty", None):
|
||||
scan_input["type"] = item_type
|
||||
return scan_input
|
||||
|
||||
def _build_scan_input(
|
||||
self,
|
||||
name: str,
|
||||
annotation: AnnotationValue,
|
||||
scan_argument: ScanArgumentMetadata,
|
||||
*,
|
||||
arg: bool,
|
||||
default: Any,
|
||||
) -> ScanInputConfig:
|
||||
"""Build one normalized ScanControl input configuration.
|
||||
|
||||
Args:
|
||||
name (str): Parameter name.
|
||||
annotation (AnnotationValue): Parsed annotation value.
|
||||
scan_argument (ScanArgumentMetadata): Parsed ``ScanArgument`` metadata.
|
||||
arg (bool): Whether the parameter belongs to the positional arg bundle.
|
||||
default (Any): Default value for the parameter.
|
||||
|
||||
Returns:
|
||||
ScanInputConfig: Normalized input configuration.
|
||||
"""
|
||||
return {
|
||||
"arg": arg,
|
||||
"name": name,
|
||||
"type": self.scan_arg_type_from_annotation(annotation),
|
||||
"display_name": scan_argument.get("display_name") or self.format_display_name(name),
|
||||
"tooltip": self.resolve_tooltip(scan_argument),
|
||||
"default": default,
|
||||
"expert": scan_argument.get("expert", False),
|
||||
"precision": scan_argument.get("precision"),
|
||||
"units": scan_argument.get("units"),
|
||||
"reference_units": scan_argument.get("reference_units"),
|
||||
"gt": scan_argument.get("gt"),
|
||||
"ge": scan_argument.get("ge"),
|
||||
"lt": scan_argument.get("lt"),
|
||||
"le": scan_argument.get("le"),
|
||||
"alternative_group": scan_argument.get("alternative_group"),
|
||||
}
|
||||
|
||||
def build_scan_ui_config(self, scan_info: ScanInfo) -> ScanUIConfig:
|
||||
"""Normalize one available-scan entry into the widget UI configuration.
|
||||
|
||||
Args:
|
||||
scan_info (ScanInfo): Available-scan payload for one scan.
|
||||
|
||||
Returns:
|
||||
ScanUIConfig: Legacy group structure consumed by ``ScanControl`` and
|
||||
``ScanGroupBox``.
|
||||
"""
|
||||
gui_visualization = (
|
||||
scan_info.get("gui_visualization") or scan_info.get("gui_visibility") or {}
|
||||
)
|
||||
if not gui_visualization and scan_info.get("gui_config"):
|
||||
return scan_info["gui_config"]
|
||||
|
||||
signature = scan_info.get("signature", [])
|
||||
signature_by_name = {entry["name"]: entry for entry in signature}
|
||||
|
||||
arg_group = None
|
||||
arg_input = scan_info.get("arg_input", {})
|
||||
if isinstance(arg_input, dict) and arg_input:
|
||||
bundle_size = scan_info.get("arg_bundle_size", {})
|
||||
inputs = [
|
||||
self.scan_input_from_arg_input(name, item_type, signature_by_name)
|
||||
for name, item_type in arg_input.items()
|
||||
]
|
||||
arg_group = {
|
||||
"name": "Scan Arguments",
|
||||
"bundle": bundle_size.get("bundle"),
|
||||
"arg_inputs": arg_input,
|
||||
"inputs": inputs,
|
||||
"min": bundle_size.get("min"),
|
||||
"max": bundle_size.get("max"),
|
||||
}
|
||||
|
||||
kwarg_groups = []
|
||||
arg_names = set(arg_input) if isinstance(arg_input, dict) else set()
|
||||
for group_name, input_names in gui_visualization.items():
|
||||
inputs = []
|
||||
for input_name in input_names:
|
||||
if input_name in arg_names or input_name not in signature_by_name:
|
||||
continue
|
||||
param = signature_by_name[input_name]
|
||||
if param.get("kind") in ("VAR_POSITIONAL", "VAR_KEYWORD"):
|
||||
continue
|
||||
inputs.append(self.scan_input_from_signature(param))
|
||||
if inputs:
|
||||
kwarg_groups.append({"name": group_name, "inputs": inputs})
|
||||
|
||||
return {
|
||||
"scan_class_name": scan_info.get("class"),
|
||||
"arg_group": arg_group,
|
||||
"kwarg_groups": kwarg_groups,
|
||||
}
|
||||
@@ -1,605 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import enum
|
||||
from dataclasses import dataclass, field
|
||||
from uuid import uuid4
|
||||
from weakref import WeakValueDictionary
|
||||
|
||||
import shiboken6
|
||||
from bec_lib.logger import bec_logger
|
||||
from qtpy.QtCore import Qt, Signal
|
||||
from qtpy.QtGui import QMouseEvent
|
||||
from qtpy.QtWidgets import (
|
||||
QApplication,
|
||||
QHBoxLayout,
|
||||
QLabel,
|
||||
QStackedLayout,
|
||||
QTabWidget,
|
||||
QVBoxLayout,
|
||||
QWidget,
|
||||
)
|
||||
|
||||
from bec_widgets.utils.bec_widget import BECWidget
|
||||
from bec_widgets.widgets.utility.bec_term.protocol import BecTerminal
|
||||
from bec_widgets.widgets.utility.bec_term.util import get_current_bec_term_class
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
_BecTermClass = get_current_bec_term_class()
|
||||
|
||||
# Note on definitions:
|
||||
# Terminal: an instance of a terminal widget with a system shell
|
||||
# Console: one of possibly several widgets which may share ownership of one single terminal
|
||||
# Shell: a Console set to start the BEC IPython client in its terminal
|
||||
|
||||
|
||||
class ConsoleMode(str, enum.Enum):
|
||||
ACTIVE = "active"
|
||||
INACTIVE = "inactive"
|
||||
HIDDEN = "hidden"
|
||||
|
||||
|
||||
@dataclass
|
||||
class _TerminalOwnerInfo:
|
||||
"""Should be managed only by the BecConsoleRegistry. Consoles should ask the registry for
|
||||
necessary ownership info."""
|
||||
|
||||
owner_console_id: str | None = None
|
||||
registered_console_ids: set[str] = field(default_factory=set)
|
||||
instance: BecTerminal | None = None
|
||||
terminal_id: str = ""
|
||||
initialized: bool = False
|
||||
persist_session: bool = False
|
||||
fallback_holder: QWidget | None = None
|
||||
|
||||
|
||||
class BecConsoleRegistry:
|
||||
"""
|
||||
A registry for the BecConsole class to manage its instances.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
"""
|
||||
Initialize the registry.
|
||||
"""
|
||||
self._consoles: WeakValueDictionary[str, BecConsole] = WeakValueDictionary()
|
||||
self._terminal_registry: dict[str, _TerminalOwnerInfo] = {}
|
||||
|
||||
@staticmethod
|
||||
def _is_valid_qobject(obj: object | None) -> bool:
|
||||
return obj is not None and shiboken6.isValid(obj)
|
||||
|
||||
def _connect_app_cleanup(self) -> None:
|
||||
app = QApplication.instance()
|
||||
if app is None:
|
||||
return
|
||||
app.aboutToQuit.connect(self.clear, Qt.ConnectionType.UniqueConnection)
|
||||
|
||||
@staticmethod
|
||||
def _new_terminal_info(console: BecConsole) -> _TerminalOwnerInfo:
|
||||
term = _BecTermClass()
|
||||
return _TerminalOwnerInfo(
|
||||
registered_console_ids={console.console_id},
|
||||
owner_console_id=console.console_id,
|
||||
instance=term,
|
||||
terminal_id=console.terminal_id,
|
||||
persist_session=console.persist_terminal_session,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _replace_terminal(info: _TerminalOwnerInfo, console: BecConsole) -> None:
|
||||
info.instance = _BecTermClass()
|
||||
info.initialized = False
|
||||
info.owner_console_id = console.console_id
|
||||
info.registered_console_ids.add(console.console_id)
|
||||
info.persist_session = info.persist_session or console.persist_terminal_session
|
||||
|
||||
def _delete_terminal_info(self, info: _TerminalOwnerInfo) -> None:
|
||||
if self._is_valid_qobject(info.instance):
|
||||
info.instance.deleteLater() # type: ignore[union-attr]
|
||||
info.instance = None
|
||||
if self._is_valid_qobject(info.fallback_holder):
|
||||
info.fallback_holder.deleteLater()
|
||||
info.fallback_holder = None
|
||||
|
||||
def _parking_parent(
|
||||
self,
|
||||
info: _TerminalOwnerInfo,
|
||||
console: BecConsole | None = None,
|
||||
*,
|
||||
avoid_console: bool = False,
|
||||
) -> QWidget | None:
|
||||
for console_id in info.registered_console_ids:
|
||||
candidate = self._consoles.get(console_id)
|
||||
if candidate is None or candidate is console:
|
||||
continue
|
||||
if self._is_valid_qobject(candidate):
|
||||
return candidate._term_holder
|
||||
|
||||
if console is None or not self._is_valid_qobject(console):
|
||||
return None
|
||||
|
||||
window = console.window()
|
||||
if window is not None and window is not console and self._is_valid_qobject(window):
|
||||
return window
|
||||
|
||||
if not avoid_console:
|
||||
return console._term_holder
|
||||
return None
|
||||
|
||||
def _fallback_holder(
|
||||
self,
|
||||
info: _TerminalOwnerInfo,
|
||||
console: BecConsole | None = None,
|
||||
*,
|
||||
avoid_console: bool = False,
|
||||
) -> QWidget:
|
||||
if not self._is_valid_qobject(info.fallback_holder):
|
||||
info.fallback_holder = QWidget(
|
||||
parent=self._parking_parent(info, console, avoid_console=avoid_console)
|
||||
)
|
||||
info.fallback_holder.setObjectName(f"_bec_console_terminal_holder_{info.terminal_id}")
|
||||
info.fallback_holder.hide()
|
||||
return info.fallback_holder
|
||||
|
||||
def _park_terminal(
|
||||
self,
|
||||
info: _TerminalOwnerInfo,
|
||||
console: BecConsole | None = None,
|
||||
*,
|
||||
avoid_console: bool = False,
|
||||
) -> None:
|
||||
if not self._is_valid_qobject(info.instance):
|
||||
return
|
||||
|
||||
parent = self._parking_parent(info, console, avoid_console=avoid_console)
|
||||
if parent is None and info.persist_session:
|
||||
parent = self._fallback_holder(info, console, avoid_console=avoid_console)
|
||||
|
||||
info.instance.hide() # type: ignore[union-attr]
|
||||
info.instance.setParent(parent) # type: ignore[union-attr]
|
||||
|
||||
def clear(self) -> None:
|
||||
"""Delete every tracked terminal and holder."""
|
||||
for info in list(self._terminal_registry.values()):
|
||||
self._delete_terminal_info(info)
|
||||
self._terminal_registry.clear()
|
||||
self._consoles.clear()
|
||||
|
||||
def register(self, console: BecConsole):
|
||||
"""
|
||||
Register an instance of BecConsole. If there is already a terminal with the associated
|
||||
terminal_id, this does not automatically grant ownership.
|
||||
|
||||
Args:
|
||||
console (BecConsole): The instance to register.
|
||||
"""
|
||||
self._connect_app_cleanup()
|
||||
self._consoles[console.console_id] = console
|
||||
console_id, terminal_id = console.console_id, console.terminal_id
|
||||
term_info = self._terminal_registry.get(terminal_id)
|
||||
if term_info is None:
|
||||
self._terminal_registry[terminal_id] = self._new_terminal_info(console)
|
||||
return
|
||||
|
||||
term_info.persist_session = term_info.persist_session or console.persist_terminal_session
|
||||
had_registered_consoles = bool(term_info.registered_console_ids)
|
||||
term_info.registered_console_ids.add(console_id)
|
||||
if not self._is_valid_qobject(term_info.instance):
|
||||
self._replace_terminal(term_info, console)
|
||||
return
|
||||
if (
|
||||
term_info.owner_console_id is not None
|
||||
and term_info.owner_console_id not in self._consoles
|
||||
):
|
||||
term_info.owner_console_id = None
|
||||
if term_info.owner_console_id is None and not had_registered_consoles:
|
||||
term_info.owner_console_id = console_id
|
||||
logger.info(f"Registered new console {console_id} for terminal {terminal_id}")
|
||||
|
||||
def unregister(self, console: BecConsole):
|
||||
"""
|
||||
Unregister an instance of BecConsole.
|
||||
|
||||
Args:
|
||||
console (BecConsole): The instance to unregister.
|
||||
"""
|
||||
console_id, terminal_id = console.console_id, console.terminal_id
|
||||
if console_id in self._consoles:
|
||||
del self._consoles[console_id]
|
||||
if (term_info := self._terminal_registry.get(terminal_id)) is None:
|
||||
return
|
||||
detached = console._detach_terminal_widget(term_info.instance)
|
||||
if console_id in term_info.registered_console_ids:
|
||||
term_info.registered_console_ids.remove(console_id)
|
||||
if term_info.owner_console_id == console_id:
|
||||
term_info.owner_console_id = None
|
||||
if not term_info.registered_console_ids:
|
||||
if term_info.persist_session and self._is_valid_qobject(term_info.instance):
|
||||
self._park_terminal(term_info, console, avoid_console=True)
|
||||
logger.info(f"Unregistered console {console_id} for terminal {terminal_id}")
|
||||
return
|
||||
|
||||
self._delete_terminal_info(term_info)
|
||||
del self._terminal_registry[terminal_id]
|
||||
elif detached:
|
||||
self._park_terminal(term_info, console, avoid_console=True)
|
||||
|
||||
logger.info(f"Unregistered console {console_id} for terminal {terminal_id}")
|
||||
|
||||
def is_owner(self, console: BecConsole):
|
||||
"""Returns true if the given console is the owner of its terminal"""
|
||||
if console not in self._consoles.values():
|
||||
return False
|
||||
if (info := self._terminal_registry.get(console.terminal_id)) is None:
|
||||
logger.warning(f"Console {console.console_id} references an unknown terminal!")
|
||||
return False
|
||||
if not self._is_valid_qobject(info.instance):
|
||||
return False
|
||||
return info.owner_console_id == console.console_id
|
||||
|
||||
def take_ownership(self, console: BecConsole) -> BecTerminal | None:
|
||||
"""
|
||||
Transfer ownership of a terminal to the given console.
|
||||
|
||||
Args:
|
||||
console: the console which wishes to take ownership of its associated terminal.
|
||||
Returns:
|
||||
BecTerminal | None: The instance if ownership transfer was successful, None otherwise.
|
||||
"""
|
||||
console_id, terminal_id = console.console_id, console.terminal_id
|
||||
|
||||
if terminal_id not in self._terminal_registry:
|
||||
self.register(console)
|
||||
|
||||
instance_info = self._terminal_registry[terminal_id]
|
||||
if not self._is_valid_qobject(instance_info.instance):
|
||||
self._replace_terminal(instance_info, console)
|
||||
if (old_owner_console_ide := instance_info.owner_console_id) is not None:
|
||||
if (
|
||||
old_owner_console_ide != console_id
|
||||
and (old_owner := self._consoles.get(old_owner_console_ide)) is not None
|
||||
):
|
||||
old_owner.yield_ownership() # call this on the old owner to make sure it is updated
|
||||
instance_info.owner_console_id = console_id
|
||||
instance_info.registered_console_ids.add(console_id)
|
||||
logger.info(f"Transferred ownership of terminal {terminal_id} to {console_id}")
|
||||
return instance_info.instance
|
||||
|
||||
def try_get_term(self, console: BecConsole) -> BecTerminal | None:
|
||||
"""
|
||||
Return the terminal instance if the requesting console is the owner
|
||||
|
||||
Args:
|
||||
console: the requesting console.
|
||||
Returns:
|
||||
BecTerminal | None: The instance if the console is the owner, None otherwise.
|
||||
"""
|
||||
console_id, terminal_id = console.console_id, console.terminal_id
|
||||
logger.debug(f"checking term for {console_id}")
|
||||
if terminal_id not in self._terminal_registry:
|
||||
logger.warning(f"Terminal {terminal_id} not found in registry")
|
||||
return None
|
||||
|
||||
instance_info = self._terminal_registry[terminal_id]
|
||||
if not self._is_valid_qobject(instance_info.instance):
|
||||
if instance_info.owner_console_id == console_id:
|
||||
self._replace_terminal(instance_info, console)
|
||||
else:
|
||||
return None
|
||||
if instance_info.owner_console_id == console_id:
|
||||
return instance_info.instance
|
||||
|
||||
def yield_ownership(self, console: BecConsole):
|
||||
"""
|
||||
Yield ownership of an instance without destroying it. The instance remains in the
|
||||
registry with no owner, available for another widget to claim.
|
||||
|
||||
Args:
|
||||
console (BecConsole): The console which wishes to yield ownership of its associated terminal.
|
||||
"""
|
||||
console_id, terminal_id = console.console_id, console.terminal_id
|
||||
logger.debug(f"Console {console_id} attempted to yield ownership")
|
||||
if console_id not in self._consoles or terminal_id not in self._terminal_registry:
|
||||
return
|
||||
|
||||
term_info = self._terminal_registry[terminal_id]
|
||||
if term_info.owner_console_id != console_id:
|
||||
logger.debug(f"But it was not the owner, which was {term_info.owner_console_id}!")
|
||||
return
|
||||
term_info.owner_console_id = None
|
||||
console._detach_terminal_widget(term_info.instance)
|
||||
self._park_terminal(term_info, console)
|
||||
|
||||
def should_initialize(self, console: BecConsole) -> bool:
|
||||
"""Return true if the console should send its startup command to the terminal."""
|
||||
info = self._terminal_registry.get(console.terminal_id)
|
||||
if info is None:
|
||||
return False
|
||||
return (
|
||||
info.owner_console_id == console.console_id
|
||||
and not info.initialized
|
||||
and self._is_valid_qobject(info.instance)
|
||||
)
|
||||
|
||||
def mark_initialized(self, console: BecConsole) -> None:
|
||||
info = self._terminal_registry.get(console.terminal_id)
|
||||
if info is not None and info.owner_console_id == console.console_id:
|
||||
info.initialized = True
|
||||
|
||||
def owner_is_visible(self, term_id: str) -> bool:
|
||||
"""
|
||||
Check if the owner of an instance is currently visible.
|
||||
|
||||
Args:
|
||||
term_id (str): The terminal ID to check.
|
||||
Returns:
|
||||
bool: True if the owner is visible, False otherwise.
|
||||
"""
|
||||
instance_info = self._terminal_registry.get(term_id)
|
||||
if (
|
||||
instance_info is None
|
||||
or instance_info.owner_console_id is None
|
||||
or not self._is_valid_qobject(instance_info.instance)
|
||||
):
|
||||
return False
|
||||
|
||||
if (owner := self._consoles.get(instance_info.owner_console_id)) is None:
|
||||
return False
|
||||
return owner.isVisible()
|
||||
|
||||
|
||||
_bec_console_registry = BecConsoleRegistry()
|
||||
|
||||
|
||||
class _Overlay(QWidget):
|
||||
def __init__(self, console: BecConsole):
|
||||
super().__init__(parent=console)
|
||||
self._console = console
|
||||
|
||||
def mousePressEvent(self, event: QMouseEvent) -> None:
|
||||
if event.button() == Qt.MouseButton.LeftButton:
|
||||
self._console.take_terminal_ownership()
|
||||
event.accept()
|
||||
return
|
||||
return super().mousePressEvent(event)
|
||||
|
||||
|
||||
class BecConsole(BECWidget, QWidget):
|
||||
"""A console widget with access to a shared registry of terminals, such that instances can be moved around."""
|
||||
|
||||
_js_callback = Signal(bool)
|
||||
initialized = Signal()
|
||||
|
||||
PLUGIN = True
|
||||
ICON_NAME = "terminal"
|
||||
persist_terminal_session = False
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
parent=None,
|
||||
config=None,
|
||||
client=None,
|
||||
gui_id=None,
|
||||
startup_cmd: str | None = None,
|
||||
terminal_id: str | None = None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(parent=parent, client=client, gui_id=gui_id, config=config, **kwargs)
|
||||
self._mode = ConsoleMode.INACTIVE
|
||||
self._startup_cmd = startup_cmd
|
||||
self._is_initialized = False
|
||||
self.terminal_id = terminal_id or str(uuid4())
|
||||
self.console_id = self.gui_id
|
||||
self.term: BecTerminal | None = None # Will be set in _set_up_instance
|
||||
|
||||
self._set_up_instance()
|
||||
|
||||
def _set_up_instance(self):
|
||||
"""
|
||||
Set up the web instance and UI elements.
|
||||
"""
|
||||
self._stacked_layout = QStackedLayout()
|
||||
# self._stacked_layout.setStackingMode(QStackedLayout.StackingMode.StackAll)
|
||||
self._term_holder = QWidget()
|
||||
self._term_layout = QVBoxLayout()
|
||||
self._term_layout.setContentsMargins(0, 0, 0, 0)
|
||||
self._term_holder.setLayout(self._term_layout)
|
||||
|
||||
self.setLayout(self._stacked_layout)
|
||||
|
||||
# prepare overlay
|
||||
self._overlay = _Overlay(self)
|
||||
layout = QVBoxLayout(self._overlay)
|
||||
layout.setAlignment(Qt.AlignmentFlag.AlignCenter)
|
||||
label = QLabel("Click to activate terminal", self._overlay)
|
||||
layout.addWidget(label)
|
||||
|
||||
self._stacked_layout.addWidget(self._term_holder)
|
||||
self._stacked_layout.addWidget(self._overlay)
|
||||
|
||||
# will create a new terminal instance if there isn't already one for this ID
|
||||
_bec_console_registry.register(self)
|
||||
self._infer_mode()
|
||||
self._ensure_startup_started()
|
||||
|
||||
def _infer_mode(self):
|
||||
self.term = _bec_console_registry.try_get_term(self)
|
||||
if self.term:
|
||||
self._set_mode(ConsoleMode.ACTIVE)
|
||||
elif self.isHidden():
|
||||
self._set_mode(ConsoleMode.HIDDEN)
|
||||
else:
|
||||
self._set_mode(ConsoleMode.INACTIVE)
|
||||
|
||||
def _set_mode(self, mode: ConsoleMode):
|
||||
"""
|
||||
Set the mode of the web console.
|
||||
|
||||
Args:
|
||||
mode (ConsoleMode): The mode to set.
|
||||
"""
|
||||
|
||||
match mode:
|
||||
case ConsoleMode.ACTIVE:
|
||||
if self.term:
|
||||
if self._term_layout.indexOf(self.term) == -1: # type: ignore[arg-type]
|
||||
self._term_layout.addWidget(self.term) # type: ignore # BecTerminal is QWidget
|
||||
self.term.show() # type: ignore[attr-defined]
|
||||
self._stacked_layout.setCurrentIndex(0)
|
||||
self._mode = mode
|
||||
else:
|
||||
self._stacked_layout.setCurrentIndex(1)
|
||||
self._mode = ConsoleMode.INACTIVE
|
||||
case ConsoleMode.INACTIVE:
|
||||
self._stacked_layout.setCurrentIndex(1)
|
||||
self._mode = mode
|
||||
case ConsoleMode.HIDDEN:
|
||||
self._stacked_layout.setCurrentIndex(1)
|
||||
self._mode = mode
|
||||
|
||||
@property
|
||||
def startup_cmd(self):
|
||||
"""
|
||||
Get the startup command for the web console.
|
||||
"""
|
||||
return self._startup_cmd
|
||||
|
||||
@startup_cmd.setter
|
||||
def startup_cmd(self, cmd: str | None):
|
||||
"""
|
||||
Set the startup command for the console.
|
||||
"""
|
||||
self._startup_cmd = cmd
|
||||
|
||||
def write(self, data: str, send_return: bool = True):
|
||||
"""
|
||||
Send data to the console
|
||||
|
||||
Args:
|
||||
data (str): The data to send.
|
||||
send_return (bool): Whether to send a return after the data.
|
||||
"""
|
||||
if self.term:
|
||||
self.term.write(data, send_return)
|
||||
|
||||
def _ensure_startup_started(self):
|
||||
if not self.startup_cmd or not _bec_console_registry.should_initialize(self):
|
||||
return
|
||||
self.write(self.startup_cmd, True)
|
||||
_bec_console_registry.mark_initialized(self)
|
||||
|
||||
def _detach_terminal_widget(self, term: BecTerminal | None) -> bool:
|
||||
if term is None or not BecConsoleRegistry._is_valid_qobject(term):
|
||||
if self.term is term:
|
||||
self.term = None
|
||||
return False
|
||||
|
||||
is_child = self.isAncestorOf(term) # type: ignore[arg-type]
|
||||
if self._term_layout.indexOf(term) != -1: # type: ignore[arg-type]
|
||||
self._term_layout.removeWidget(term) # type: ignore[arg-type]
|
||||
is_child = True
|
||||
if is_child:
|
||||
term.hide() # type: ignore[attr-defined]
|
||||
term.setParent(None) # type: ignore[attr-defined]
|
||||
if self.term is term:
|
||||
self.term = None
|
||||
return is_child
|
||||
|
||||
def take_terminal_ownership(self):
|
||||
"""
|
||||
Take ownership of a web instance from the registry. This will transfer the instance
|
||||
from its current owner (if any) to this widget.
|
||||
"""
|
||||
# Get the instance from registry
|
||||
self.term = _bec_console_registry.take_ownership(self)
|
||||
self._infer_mode()
|
||||
self._ensure_startup_started()
|
||||
if self._mode == ConsoleMode.ACTIVE:
|
||||
logger.debug(f"Widget {self.gui_id} took ownership of instance {self.terminal_id}")
|
||||
|
||||
def yield_ownership(self):
|
||||
"""
|
||||
Yield ownership of the instance. The instance remains in the registry with no owner,
|
||||
available for another widget to claim. This is automatically called when the
|
||||
widget becomes hidden.
|
||||
"""
|
||||
_bec_console_registry.yield_ownership(self)
|
||||
self._infer_mode()
|
||||
if self._mode != ConsoleMode.ACTIVE:
|
||||
logger.debug(f"Widget {self.gui_id} yielded ownership of instance {self.terminal_id}")
|
||||
|
||||
def hideEvent(self, event):
|
||||
"""Called when the widget is hidden. Automatically yields ownership."""
|
||||
self.yield_ownership()
|
||||
super().hideEvent(event)
|
||||
|
||||
def showEvent(self, event):
|
||||
"""Called when the widget is shown. Updates UI state based on ownership."""
|
||||
super().showEvent(event)
|
||||
if not _bec_console_registry.is_owner(self):
|
||||
if not _bec_console_registry.owner_is_visible(self.terminal_id):
|
||||
self.take_terminal_ownership()
|
||||
|
||||
def cleanup(self):
|
||||
"""Unregister this console on destruction."""
|
||||
_bec_console_registry.unregister(self)
|
||||
super().cleanup()
|
||||
|
||||
|
||||
class BECShell(BecConsole):
|
||||
"""
|
||||
A BecConsole pre-configured to run the BEC shell.
|
||||
We cannot simply expose the web console properties to Qt as we need to have a deterministic
|
||||
startup behavior for sharing the same shell instance across multiple widgets.
|
||||
"""
|
||||
|
||||
ICON_NAME = "hub"
|
||||
persist_terminal_session = True
|
||||
|
||||
def __init__(self, parent=None, config=None, client=None, gui_id=None, **kwargs):
|
||||
super().__init__(
|
||||
parent=parent,
|
||||
config=config,
|
||||
client=client,
|
||||
gui_id=gui_id,
|
||||
terminal_id="bec_shell",
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
@property
|
||||
def startup_cmd(self):
|
||||
"""
|
||||
Get the startup command for the BEC shell.
|
||||
"""
|
||||
if self.bec_dispatcher.cli_server is None:
|
||||
return "bec --nogui"
|
||||
return f"bec --gui-id {self.bec_dispatcher.cli_server.gui_id}"
|
||||
|
||||
@startup_cmd.setter
|
||||
def startup_cmd(self, cmd: str | None): ...
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
import sys
|
||||
|
||||
app = QApplication(sys.argv)
|
||||
widget = QTabWidget()
|
||||
|
||||
# Create two consoles with different unique_ids
|
||||
bec_console_1a = BecConsole(startup_cmd="htop", gui_id="console_1_a", terminal_id="terminal_1")
|
||||
bec_console_1b = BecConsole(startup_cmd="htop", gui_id="console_1_b", terminal_id="terminal_1")
|
||||
bec_console_1 = QWidget()
|
||||
bec_console_1_layout = QHBoxLayout(bec_console_1)
|
||||
bec_console_1_layout.addWidget(bec_console_1a)
|
||||
bec_console_1_layout.addWidget(bec_console_1b)
|
||||
bec_console2 = BECShell()
|
||||
bec_console3 = BecConsole(gui_id="console_3", terminal_id="terminal_1")
|
||||
widget.addTab(bec_console_1, "Console 1")
|
||||
widget.addTab(bec_console2, "Console 2 - BEC Shell")
|
||||
widget.addTab(bec_console3, "Console 3 -- mirror of Console 1")
|
||||
widget.show()
|
||||
|
||||
widget.resize(800, 600)
|
||||
|
||||
sys.exit(app.exec_())
|
||||
@@ -1 +0,0 @@
|
||||
{'files': ['bec_console.py']}
|
||||
@@ -1 +0,0 @@
|
||||
{'files': ['bec_console.py']}
|
||||
@@ -0,0 +1 @@
|
||||
{'files': ['web_console.py']}
|
||||
@@ -5,7 +5,7 @@ from qtpy.QtDesigner import QDesignerCustomWidgetInterface
|
||||
from qtpy.QtWidgets import QWidget
|
||||
|
||||
from bec_widgets.utils.bec_designer import designer_material_icon
|
||||
from bec_widgets.widgets.editors.bec_console.bec_console import BECShell
|
||||
from bec_widgets.widgets.editors.web_console.web_console import BECShell
|
||||
|
||||
DOM_XML = """
|
||||
<ui language='c++'>
|
||||
@@ -6,7 +6,7 @@ def main(): # pragma: no cover
|
||||
return
|
||||
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
|
||||
|
||||
from bec_widgets.widgets.editors.bec_console.bec_shell_plugin import BECShellPlugin
|
||||
from bec_widgets.widgets.editors.web_console.bec_shell_plugin import BECShellPlugin
|
||||
|
||||
QPyDesignerCustomWidgetCollection.addCustomWidget(BECShellPlugin())
|
||||
|
||||
@@ -6,9 +6,9 @@ def main(): # pragma: no cover
|
||||
return
|
||||
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
|
||||
|
||||
from bec_widgets.widgets.editors.bec_console.bec_console_plugin import BecConsolePlugin
|
||||
from bec_widgets.widgets.editors.web_console.web_console_plugin import WebConsolePlugin
|
||||
|
||||
QPyDesignerCustomWidgetCollection.addCustomWidget(BecConsolePlugin())
|
||||
QPyDesignerCustomWidgetCollection.addCustomWidget(WebConsolePlugin())
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
705
bec_widgets/widgets/editors/web_console/web_console.py
Normal file
705
bec_widgets/widgets/editors/web_console/web_console.py
Normal file
@@ -0,0 +1,705 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import enum
|
||||
import json
|
||||
import secrets
|
||||
import subprocess
|
||||
import time
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from louie.saferef import safe_ref
|
||||
from pydantic import BaseModel
|
||||
from qtpy.QtCore import Qt, QTimer, QUrl, Signal, qInstallMessageHandler
|
||||
from qtpy.QtGui import QMouseEvent, QResizeEvent
|
||||
from qtpy.QtWebEngineWidgets import QWebEnginePage, QWebEngineView
|
||||
from qtpy.QtWidgets import QApplication, QLabel, QTabWidget, QVBoxLayout, QWidget
|
||||
|
||||
from bec_widgets.utils.bec_widget import BECWidget
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class ConsoleMode(str, enum.Enum):
|
||||
ACTIVE = "active"
|
||||
INACTIVE = "inactive"
|
||||
HIDDEN = "hidden"
|
||||
|
||||
|
||||
class PageOwnerInfo(BaseModel):
|
||||
owner_gui_id: str | None = None
|
||||
widget_ids: list[str] = []
|
||||
page: QWebEnginePage | None = None
|
||||
initialized: bool = False
|
||||
|
||||
model_config = {"arbitrary_types_allowed": True}
|
||||
|
||||
|
||||
class WebConsoleRegistry:
|
||||
"""
|
||||
A registry for the WebConsole class to manage its instances.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
"""
|
||||
Initialize the registry.
|
||||
"""
|
||||
self._instances = {}
|
||||
self._server_process = None
|
||||
self._server_port = None
|
||||
self._token = secrets.token_hex(16)
|
||||
self._page_registry: dict[str, PageOwnerInfo] = {}
|
||||
|
||||
def register(self, instance: WebConsole):
|
||||
"""
|
||||
Register an instance of WebConsole.
|
||||
|
||||
Args:
|
||||
instance (WebConsole): The instance to register.
|
||||
"""
|
||||
self._instances[instance.gui_id] = safe_ref(instance)
|
||||
self.cleanup()
|
||||
|
||||
if instance._unique_id:
|
||||
self._register_page(instance)
|
||||
|
||||
if self._server_process is None:
|
||||
# Start the ttyd server if not already running
|
||||
self.start_ttyd()
|
||||
|
||||
def start_ttyd(self, use_zsh: bool | None = None):
|
||||
"""
|
||||
Start the ttyd server
|
||||
ttyd -q -W -t 'theme={"background": "black"}' zsh
|
||||
|
||||
Args:
|
||||
use_zsh (bool): Whether to use zsh or bash. If None, it will try to detect if zsh is available.
|
||||
"""
|
||||
|
||||
# First, check if ttyd is installed
|
||||
try:
|
||||
subprocess.run(["ttyd", "--version"], check=True, stdout=subprocess.PIPE)
|
||||
except FileNotFoundError:
|
||||
# pylint: disable=raise-missing-from
|
||||
raise RuntimeError("ttyd is not installed. Please install it first.")
|
||||
|
||||
if use_zsh is None:
|
||||
# Check if we can use zsh
|
||||
try:
|
||||
subprocess.run(["zsh", "--version"], check=True, stdout=subprocess.PIPE)
|
||||
use_zsh = True
|
||||
except FileNotFoundError:
|
||||
use_zsh = False
|
||||
|
||||
command = [
|
||||
"ttyd",
|
||||
"-p",
|
||||
"0",
|
||||
"-W",
|
||||
"-t",
|
||||
'theme={"background": "black"}',
|
||||
"-c",
|
||||
f"user:{self._token}",
|
||||
]
|
||||
if use_zsh:
|
||||
command.append("zsh")
|
||||
else:
|
||||
command.append("bash")
|
||||
|
||||
# Start the ttyd server
|
||||
self._server_process = subprocess.Popen(
|
||||
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
|
||||
)
|
||||
|
||||
self._wait_for_server_port()
|
||||
|
||||
self._server_process.stdout.close()
|
||||
self._server_process.stderr.close()
|
||||
|
||||
def _wait_for_server_port(self, timeout: float = 10):
|
||||
"""
|
||||
Wait for the ttyd server to start and get the port number.
|
||||
|
||||
Args:
|
||||
timeout (float): The timeout in seconds to wait for the server to start.
|
||||
"""
|
||||
start_time = time.time()
|
||||
while True:
|
||||
output = self._server_process.stderr.readline()
|
||||
if output == b"" and self._server_process.poll() is not None:
|
||||
break
|
||||
if not output:
|
||||
continue
|
||||
|
||||
output = output.decode("utf-8").strip()
|
||||
if "Listening on" in output:
|
||||
# Extract the port number from the output
|
||||
self._server_port = int(output.split(":")[-1])
|
||||
logger.info(f"ttyd server started on port {self._server_port}")
|
||||
break
|
||||
if time.time() - start_time > timeout:
|
||||
raise TimeoutError(
|
||||
"Timeout waiting for ttyd server to start. Please check if ttyd is installed and available in your PATH."
|
||||
)
|
||||
|
||||
def cleanup(self):
|
||||
"""
|
||||
Clean up the registry by removing any instances that are no longer valid.
|
||||
"""
|
||||
for gui_id, weak_ref in list(self._instances.items()):
|
||||
if weak_ref() is None:
|
||||
del self._instances[gui_id]
|
||||
|
||||
if not self._instances and self._server_process:
|
||||
# If no instances are left, terminate the server process
|
||||
self._server_process.terminate()
|
||||
self._server_process = None
|
||||
self._server_port = None
|
||||
logger.info("ttyd server terminated")
|
||||
|
||||
def unregister(self, instance: WebConsole):
|
||||
"""
|
||||
Unregister an instance of WebConsole.
|
||||
|
||||
Args:
|
||||
instance (WebConsole): The instance to unregister.
|
||||
"""
|
||||
if instance.gui_id in self._instances:
|
||||
del self._instances[instance.gui_id]
|
||||
|
||||
if instance._unique_id:
|
||||
self._unregister_page(instance._unique_id, instance.gui_id)
|
||||
|
||||
self.cleanup()
|
||||
|
||||
def _register_page(self, instance: WebConsole):
|
||||
"""
|
||||
Register a page in the registry. Please note that this does not transfer ownership
|
||||
for already existing pages; it simply records which widget currently owns the page.
|
||||
Use transfer_page_ownership to change ownership.
|
||||
|
||||
Args:
|
||||
instance (WebConsole): The instance to register.
|
||||
"""
|
||||
|
||||
unique_id = instance._unique_id
|
||||
gui_id = instance.gui_id
|
||||
|
||||
if unique_id is None:
|
||||
return
|
||||
|
||||
if unique_id not in self._page_registry:
|
||||
page = BECWebEnginePage()
|
||||
page.authenticationRequired.connect(instance._authenticate)
|
||||
self._page_registry[unique_id] = PageOwnerInfo(
|
||||
owner_gui_id=gui_id, widget_ids=[gui_id], page=page
|
||||
)
|
||||
logger.info(f"Registered new page {unique_id} for {gui_id}")
|
||||
return
|
||||
|
||||
if gui_id not in self._page_registry[unique_id].widget_ids:
|
||||
self._page_registry[unique_id].widget_ids.append(gui_id)
|
||||
|
||||
def _unregister_page(self, unique_id: str, gui_id: str):
|
||||
"""
|
||||
Unregister a page from the registry.
|
||||
|
||||
Args:
|
||||
unique_id (str): The unique identifier for the page.
|
||||
gui_id (str): The GUI ID of the widget.
|
||||
"""
|
||||
if unique_id not in self._page_registry:
|
||||
return
|
||||
page_info = self._page_registry[unique_id]
|
||||
if gui_id in page_info.widget_ids:
|
||||
page_info.widget_ids.remove(gui_id)
|
||||
if page_info.owner_gui_id == gui_id:
|
||||
page_info.owner_gui_id = None
|
||||
if not page_info.widget_ids:
|
||||
if page_info.page:
|
||||
page_info.page.deleteLater()
|
||||
del self._page_registry[unique_id]
|
||||
|
||||
logger.info(f"Unregistered page {unique_id} for {gui_id}")
|
||||
|
||||
def get_page_info(self, unique_id: str) -> PageOwnerInfo | None:
|
||||
"""
|
||||
Get a page from the registry.
|
||||
|
||||
Args:
|
||||
unique_id (str): The unique identifier for the page.
|
||||
|
||||
Returns:
|
||||
PageOwnerInfo | None: The page info if found, None otherwise.
|
||||
"""
|
||||
if unique_id not in self._page_registry:
|
||||
return None
|
||||
return self._page_registry[unique_id]
|
||||
|
||||
def take_page_ownership(self, unique_id: str, new_owner_gui_id: str) -> QWebEnginePage | None:
|
||||
"""
|
||||
Transfer ownership of a page to a new owner.
|
||||
|
||||
Args:
|
||||
unique_id (str): The unique identifier for the page.
|
||||
new_owner_gui_id (str): The GUI ID of the new owner.
|
||||
|
||||
Returns:
|
||||
QWebEnginePage | None: The page if ownership transfer was successful, None otherwise.
|
||||
"""
|
||||
if unique_id not in self._page_registry:
|
||||
logger.warning(f"Page {unique_id} not found in registry")
|
||||
return None
|
||||
|
||||
page_info = self._page_registry[unique_id]
|
||||
old_owner_gui_id = page_info.owner_gui_id
|
||||
if old_owner_gui_id:
|
||||
old_owner_ref = self._instances.get(old_owner_gui_id)
|
||||
if old_owner_ref:
|
||||
old_owner_instance = old_owner_ref()
|
||||
if old_owner_instance:
|
||||
old_owner_instance.yield_ownership()
|
||||
page_info.owner_gui_id = new_owner_gui_id
|
||||
|
||||
logger.info(f"Transferred ownership of page {unique_id} to {new_owner_gui_id}")
|
||||
return page_info.page
|
||||
|
||||
def yield_ownership(self, gui_id: str) -> bool:
|
||||
"""
|
||||
Yield ownership of a page without destroying it. The page remains in the
|
||||
registry with no owner, available for another widget to claim.
|
||||
|
||||
Args:
|
||||
gui_id (str): The GUI ID of the widget yielding ownership.
|
||||
|
||||
Returns:
|
||||
bool: True if ownership was yielded, False otherwise.
|
||||
"""
|
||||
if gui_id not in self._instances:
|
||||
return False
|
||||
|
||||
instance = self._instances[gui_id]()
|
||||
if instance is None:
|
||||
return False
|
||||
|
||||
unique_id = instance._unique_id
|
||||
if unique_id is None:
|
||||
return False
|
||||
|
||||
if unique_id not in self._page_registry:
|
||||
return False
|
||||
|
||||
page_owner_info = self._page_registry[unique_id]
|
||||
if page_owner_info.owner_gui_id != gui_id:
|
||||
return False
|
||||
|
||||
page_owner_info.owner_gui_id = None
|
||||
return True
|
||||
|
||||
def owner_is_visible(self, unique_id: str) -> bool:
|
||||
"""
|
||||
Check if the owner of a page is currently visible.
|
||||
|
||||
Args:
|
||||
unique_id (str): The unique identifier for the page.
|
||||
Returns:
|
||||
bool: True if the owner is visible, False otherwise.
|
||||
"""
|
||||
page_info = self.get_page_info(unique_id)
|
||||
if page_info is None or page_info.owner_gui_id is None:
|
||||
return False
|
||||
|
||||
owner_ref = self._instances.get(page_info.owner_gui_id)
|
||||
if owner_ref is None:
|
||||
return False
|
||||
|
||||
owner_instance = owner_ref()
|
||||
if owner_instance is None:
|
||||
return False
|
||||
|
||||
return owner_instance.isVisible()
|
||||
|
||||
|
||||
_web_console_registry = WebConsoleRegistry()
|
||||
|
||||
|
||||
def suppress_qt_messages(type_, context, msg):
|
||||
if context.category in ["js", "default"]:
|
||||
return
|
||||
print(msg)
|
||||
|
||||
|
||||
qInstallMessageHandler(suppress_qt_messages)
|
||||
|
||||
|
||||
class BECWebEnginePage(QWebEnginePage):
|
||||
def javaScriptConsoleMessage(self, level, message, lineNumber, sourceID):
|
||||
logger.info(f"[JS Console] {level.name} at line {lineNumber} in {sourceID}: {message}")
|
||||
|
||||
|
||||
class WebConsole(BECWidget, QWidget):
|
||||
"""
|
||||
A simple widget to display a website
|
||||
"""
|
||||
|
||||
_js_callback = Signal(bool)
|
||||
initialized = Signal()
|
||||
|
||||
PLUGIN = True
|
||||
ICON_NAME = "terminal"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
parent=None,
|
||||
config=None,
|
||||
client=None,
|
||||
gui_id=None,
|
||||
startup_cmd: str | None = None,
|
||||
is_bec_shell: bool = False,
|
||||
unique_id: str | None = None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(parent=parent, client=client, gui_id=gui_id, config=config, **kwargs)
|
||||
self._mode = ConsoleMode.INACTIVE
|
||||
self._is_bec_shell = is_bec_shell
|
||||
self._startup_cmd = startup_cmd
|
||||
self._is_initialized = False
|
||||
self._unique_id = unique_id
|
||||
self.page = None # Will be set in _set_up_page
|
||||
|
||||
self._startup_timer = QTimer()
|
||||
self._startup_timer.setInterval(500)
|
||||
self._startup_timer.timeout.connect(self._check_page_ready)
|
||||
self._startup_timer.start()
|
||||
self._js_callback.connect(self._on_js_callback)
|
||||
|
||||
self._set_up_page()
|
||||
|
||||
def _set_up_page(self):
|
||||
"""
|
||||
Set up the web page and UI elements.
|
||||
"""
|
||||
layout = QVBoxLayout()
|
||||
layout.setContentsMargins(0, 0, 0, 0)
|
||||
self.browser = QWebEngineView(self)
|
||||
|
||||
layout.addWidget(self.browser)
|
||||
self.setLayout(layout)
|
||||
|
||||
# prepare overlay
|
||||
self.overlay = QWidget(self)
|
||||
layout = QVBoxLayout(self.overlay)
|
||||
layout.setAlignment(Qt.AlignmentFlag.AlignCenter)
|
||||
label = QLabel("Click to activate terminal", self.overlay)
|
||||
layout.addWidget(label)
|
||||
self.overlay.hide()
|
||||
|
||||
_web_console_registry.register(self)
|
||||
self._token = _web_console_registry._token
|
||||
|
||||
# If no unique_id is provided, create a new page
|
||||
if not self._unique_id:
|
||||
self.page = BECWebEnginePage(self)
|
||||
self.page.authenticationRequired.connect(self._authenticate)
|
||||
self.page.setUrl(QUrl(f"http://localhost:{_web_console_registry._server_port}"))
|
||||
self.browser.setPage(self.page)
|
||||
self._set_mode(ConsoleMode.ACTIVE)
|
||||
return
|
||||
|
||||
# Try to get the page from the registry
|
||||
page_info = _web_console_registry.get_page_info(self._unique_id)
|
||||
if page_info and page_info.page:
|
||||
self.page = page_info.page
|
||||
if not page_info.owner_gui_id or page_info.owner_gui_id == self.gui_id:
|
||||
self.browser.setPage(self.page)
|
||||
# Only set URL if this is a newly created page (no URL set yet)
|
||||
if self.page.url().isEmpty():
|
||||
self.page.setUrl(QUrl(f"http://localhost:{_web_console_registry._server_port}"))
|
||||
else:
|
||||
# We have an existing page, so we don't need the startup timer
|
||||
self._startup_timer.stop()
|
||||
if page_info.owner_gui_id != self.gui_id:
|
||||
self._set_mode(ConsoleMode.INACTIVE)
|
||||
else:
|
||||
self._set_mode(ConsoleMode.ACTIVE)
|
||||
|
||||
def _set_mode(self, mode: ConsoleMode):
|
||||
"""
|
||||
Set the mode of the web console.
|
||||
|
||||
Args:
|
||||
mode (ConsoleMode): The mode to set.
|
||||
"""
|
||||
if not self._unique_id:
|
||||
# For non-unique_id consoles, always active
|
||||
mode = ConsoleMode.ACTIVE
|
||||
|
||||
self._mode = mode
|
||||
match mode:
|
||||
case ConsoleMode.ACTIVE:
|
||||
self.browser.setVisible(True)
|
||||
self.overlay.hide()
|
||||
case ConsoleMode.INACTIVE:
|
||||
self.browser.setVisible(False)
|
||||
self.overlay.show()
|
||||
case ConsoleMode.HIDDEN:
|
||||
self.browser.setVisible(False)
|
||||
self.overlay.hide()
|
||||
|
||||
def _check_page_ready(self):
|
||||
"""
|
||||
Check if the page is ready and stop the timer if it is.
|
||||
"""
|
||||
if not self.page or self.page.isLoading():
|
||||
return
|
||||
|
||||
self.page.runJavaScript("window.term !== undefined", self._js_callback.emit)
|
||||
|
||||
def _on_js_callback(self, ready: bool):
|
||||
"""
|
||||
Callback for when the JavaScript is ready.
|
||||
"""
|
||||
if not ready:
|
||||
return
|
||||
self._is_initialized = True
|
||||
self._startup_timer.stop()
|
||||
if self.startup_cmd:
|
||||
if self._unique_id:
|
||||
page_info = _web_console_registry.get_page_info(self._unique_id)
|
||||
if page_info is None:
|
||||
return
|
||||
if not page_info.initialized:
|
||||
page_info.initialized = True
|
||||
self.write(self.startup_cmd)
|
||||
else:
|
||||
self.write(self.startup_cmd)
|
||||
self.initialized.emit()
|
||||
|
||||
@property
|
||||
def startup_cmd(self):
|
||||
"""
|
||||
Get the startup command for the web console.
|
||||
"""
|
||||
if self._is_bec_shell:
|
||||
if self.bec_dispatcher.cli_server is None:
|
||||
return "bec --nogui"
|
||||
return f"bec --gui-id {self.bec_dispatcher.cli_server.gui_id}"
|
||||
return self._startup_cmd
|
||||
|
||||
@startup_cmd.setter
|
||||
def startup_cmd(self, cmd: str):
|
||||
"""
|
||||
Set the startup command for the web console.
|
||||
"""
|
||||
if not isinstance(cmd, str):
|
||||
raise ValueError("Startup command must be a string.")
|
||||
self._startup_cmd = cmd
|
||||
|
||||
def write(self, data: str, send_return: bool = True):
|
||||
"""
|
||||
Send data to the web page
|
||||
|
||||
Args:
|
||||
data (str): The data to send.
|
||||
send_return (bool): Whether to send a return after the data.
|
||||
"""
|
||||
cmd = f"window.term.paste({json.dumps(data)});"
|
||||
if self.page is None:
|
||||
logger.warning("Cannot write to web console: page is not initialized.")
|
||||
return
|
||||
self.page.runJavaScript(cmd)
|
||||
if send_return:
|
||||
self.send_return()
|
||||
|
||||
def take_page_ownership(self, unique_id: str | None = None):
|
||||
"""
|
||||
Take ownership of a web page from the registry. This will transfer the page
|
||||
from its current owner (if any) to this widget.
|
||||
|
||||
Args:
|
||||
unique_id (str): The unique identifier of the page to take ownership of.
|
||||
If None, uses this widget's unique_id.
|
||||
"""
|
||||
if unique_id is None:
|
||||
unique_id = self._unique_id
|
||||
|
||||
if not unique_id:
|
||||
logger.warning("Cannot take page ownership without a unique_id")
|
||||
return
|
||||
|
||||
# Get the page from registry
|
||||
page = _web_console_registry.take_page_ownership(unique_id, self.gui_id)
|
||||
|
||||
if not page:
|
||||
logger.warning(f"Page {unique_id} not found in registry")
|
||||
return
|
||||
|
||||
self.page = page
|
||||
self.browser.setPage(page)
|
||||
self._set_mode(ConsoleMode.ACTIVE)
|
||||
logger.info(f"Widget {self.gui_id} took ownership of page {unique_id}")
|
||||
|
||||
def _on_ownership_lost(self):
|
||||
"""
|
||||
Called when this widget loses ownership of its page.
|
||||
Displays the overlay and hides the browser.
|
||||
"""
|
||||
self._set_mode(ConsoleMode.INACTIVE)
|
||||
logger.info(f"Widget {self.gui_id} lost ownership of page {self._unique_id}")
|
||||
|
||||
def yield_ownership(self):
|
||||
"""
|
||||
Yield ownership of the page. The page remains in the registry with no owner,
|
||||
available for another widget to claim. This is automatically called when the
|
||||
widget becomes hidden.
|
||||
"""
|
||||
if not self._unique_id:
|
||||
return
|
||||
success = _web_console_registry.yield_ownership(self.gui_id)
|
||||
if success:
|
||||
self._on_ownership_lost()
|
||||
logger.info(f"Widget {self.gui_id} yielded ownership of page {self._unique_id}")
|
||||
|
||||
def has_ownership(self) -> bool:
|
||||
"""
|
||||
Check if this widget currently has ownership of a page.
|
||||
|
||||
Returns:
|
||||
bool: True if this widget owns a page, False otherwise.
|
||||
"""
|
||||
if not self._unique_id:
|
||||
return False
|
||||
page_info = _web_console_registry.get_page_info(self._unique_id)
|
||||
if page_info is None:
|
||||
return False
|
||||
return page_info.owner_gui_id == self.gui_id
|
||||
|
||||
def hideEvent(self, event):
|
||||
"""
|
||||
Called when the widget is hidden. Automatically yields ownership.
|
||||
"""
|
||||
if self.has_ownership():
|
||||
self.yield_ownership()
|
||||
self._set_mode(ConsoleMode.HIDDEN)
|
||||
super().hideEvent(event)
|
||||
|
||||
def showEvent(self, event):
|
||||
"""
|
||||
Called when the widget is shown. Updates UI state based on ownership.
|
||||
"""
|
||||
super().showEvent(event)
|
||||
if self._unique_id and not self.has_ownership():
|
||||
# Take ownership if the page does not have an owner or
|
||||
# the owner is not visible
|
||||
page_info = _web_console_registry.get_page_info(self._unique_id)
|
||||
if page_info is None:
|
||||
self._set_mode(ConsoleMode.INACTIVE)
|
||||
return
|
||||
if page_info.owner_gui_id is None or not _web_console_registry.owner_is_visible(
|
||||
self._unique_id
|
||||
):
|
||||
self.take_page_ownership(self._unique_id)
|
||||
return
|
||||
if page_info.owner_gui_id != self.gui_id:
|
||||
self._set_mode(ConsoleMode.INACTIVE)
|
||||
return
|
||||
|
||||
def resizeEvent(self, event: QResizeEvent) -> None:
|
||||
super().resizeEvent(event)
|
||||
self.overlay.resize(event.size())
|
||||
|
||||
def mousePressEvent(self, event: QMouseEvent) -> None:
|
||||
if event.button() == Qt.MouseButton.LeftButton and not self.has_ownership():
|
||||
self.take_page_ownership(self._unique_id)
|
||||
event.accept()
|
||||
return
|
||||
return super().mousePressEvent(event)
|
||||
|
||||
def _authenticate(self, _, auth):
|
||||
"""
|
||||
Authenticate the request with the provided username and password.
|
||||
"""
|
||||
auth.setUser("user")
|
||||
auth.setPassword(self._token)
|
||||
|
||||
def send_return(self):
|
||||
"""
|
||||
Send return to the web page
|
||||
"""
|
||||
self.page.runJavaScript(
|
||||
"document.querySelector('textarea.xterm-helper-textarea').dispatchEvent(new KeyboardEvent('keypress', {charCode: 13}))"
|
||||
)
|
||||
|
||||
def send_ctrl_c(self):
|
||||
"""
|
||||
Send Ctrl+C to the web page
|
||||
"""
|
||||
self.page.runJavaScript(
|
||||
"document.querySelector('textarea.xterm-helper-textarea').dispatchEvent(new KeyboardEvent('keypress', {charCode: 3}))"
|
||||
)
|
||||
|
||||
def set_readonly(self, readonly: bool):
|
||||
"""
|
||||
Set the web console to read-only mode.
|
||||
"""
|
||||
if not isinstance(readonly, bool):
|
||||
raise ValueError("Readonly must be a boolean.")
|
||||
self.setEnabled(not readonly)
|
||||
|
||||
def cleanup(self):
|
||||
"""
|
||||
Clean up the registry by removing any instances that are no longer valid.
|
||||
"""
|
||||
self._startup_timer.stop()
|
||||
_web_console_registry.unregister(self)
|
||||
super().cleanup()
|
||||
|
||||
|
||||
class BECShell(WebConsole):
|
||||
"""
|
||||
A WebConsole pre-configured to run the BEC shell.
|
||||
We cannot simply expose the web console properties to Qt as we need to have a deterministic
|
||||
startup behavior for sharing the same shell instance across multiple widgets.
|
||||
"""
|
||||
|
||||
ICON_NAME = "hub"
|
||||
|
||||
def __init__(self, parent=None, config=None, client=None, gui_id=None, **kwargs):
|
||||
super().__init__(
|
||||
parent=parent,
|
||||
config=config,
|
||||
client=client,
|
||||
gui_id=gui_id,
|
||||
is_bec_shell=True,
|
||||
unique_id="bec_shell",
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
import sys
|
||||
|
||||
app = QApplication(sys.argv)
|
||||
widget = QTabWidget()
|
||||
|
||||
# Create two consoles with different unique_ids
|
||||
web_console1 = WebConsole(startup_cmd="bec --nogui", unique_id="console1")
|
||||
web_console2 = WebConsole(startup_cmd="htop")
|
||||
web_console3 = WebConsole(startup_cmd="bec --nogui", unique_id="console1")
|
||||
widget.addTab(web_console1, "Console 1")
|
||||
widget.addTab(web_console2, "Console 2")
|
||||
widget.addTab(web_console3, "Console 3 -- mirror of Console 1")
|
||||
widget.show()
|
||||
|
||||
# Demonstrate page sharing:
|
||||
# After initialization, web_console2 can take ownership of console1's page:
|
||||
# web_console2.take_page_ownership("console1")
|
||||
|
||||
widget.resize(800, 600)
|
||||
|
||||
def _close_cons1():
|
||||
web_console2.close()
|
||||
web_console2.deleteLater()
|
||||
|
||||
# QTimer.singleShot(3000, _close_cons1)
|
||||
|
||||
sys.exit(app.exec_())
|
||||
@@ -0,0 +1 @@
|
||||
{'files': ['web_console.py']}
|
||||
@@ -5,17 +5,17 @@ from qtpy.QtDesigner import QDesignerCustomWidgetInterface
|
||||
from qtpy.QtWidgets import QWidget
|
||||
|
||||
from bec_widgets.utils.bec_designer import designer_material_icon
|
||||
from bec_widgets.widgets.editors.bec_console.bec_console import BecConsole
|
||||
from bec_widgets.widgets.editors.web_console.web_console import WebConsole
|
||||
|
||||
DOM_XML = """
|
||||
<ui language='c++'>
|
||||
<widget class='BecConsole' name='bec_console'>
|
||||
<widget class='WebConsole' name='web_console'>
|
||||
</widget>
|
||||
</ui>
|
||||
"""
|
||||
|
||||
|
||||
class BecConsolePlugin(QDesignerCustomWidgetInterface): # pragma: no cover
|
||||
class WebConsolePlugin(QDesignerCustomWidgetInterface): # pragma: no cover
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._form_editor = None
|
||||
@@ -23,20 +23,20 @@ class BecConsolePlugin(QDesignerCustomWidgetInterface): # pragma: no cover
|
||||
def createWidget(self, parent):
|
||||
if parent is None:
|
||||
return QWidget()
|
||||
t = BecConsole(parent)
|
||||
t = WebConsole(parent)
|
||||
return t
|
||||
|
||||
def domXml(self):
|
||||
return DOM_XML
|
||||
|
||||
def group(self):
|
||||
return ""
|
||||
return "BEC Developer"
|
||||
|
||||
def icon(self):
|
||||
return designer_material_icon(BecConsole.ICON_NAME)
|
||||
return designer_material_icon(WebConsole.ICON_NAME)
|
||||
|
||||
def includeFile(self):
|
||||
return "bec_console"
|
||||
return "web_console"
|
||||
|
||||
def initialize(self, form_editor):
|
||||
self._form_editor = form_editor
|
||||
@@ -48,10 +48,10 @@ class BecConsolePlugin(QDesignerCustomWidgetInterface): # pragma: no cover
|
||||
return self._form_editor is not None
|
||||
|
||||
def name(self):
|
||||
return "BecConsole"
|
||||
return "WebConsole"
|
||||
|
||||
def toolTip(self):
|
||||
return "A console widget with access to a shared registry of terminals, such that instances can be moved around."
|
||||
return ""
|
||||
|
||||
def whatsThis(self):
|
||||
return self.toolTip()
|
||||
@@ -1,11 +0,0 @@
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
import sys
|
||||
|
||||
from pyside6_qtermwidget import QTermWidget # pylint: disable=ungrouped-imports
|
||||
from qtpy.QtWidgets import QApplication # pylint: disable=ungrouped-imports
|
||||
|
||||
app = QApplication(sys.argv)
|
||||
widget = QTermWidget()
|
||||
|
||||
widget.show()
|
||||
sys.exit(app.exec())
|
||||
@@ -1,8 +0,0 @@
|
||||
from typing import Protocol, runtime_checkable
|
||||
|
||||
|
||||
@runtime_checkable
|
||||
class BecTerminal(Protocol):
|
||||
"""Implementors of this protocol must also be subclasses of QWidget"""
|
||||
|
||||
def write(self, text: str, add_newline: bool = True): ...
|
||||
@@ -1,241 +0,0 @@
|
||||
"""A wrapper for the optional external dependency pyside6_qtermwidget.
|
||||
Simply displays a message in a QLabel if the dependency is not installed."""
|
||||
|
||||
import os
|
||||
from functools import wraps
|
||||
from typing import Sequence
|
||||
|
||||
from qtpy.QtCore import QIODevice, QPoint, QSize, QUrl, Signal # type: ignore
|
||||
from qtpy.QtGui import QAction, QFont, QKeyEvent, QResizeEvent, Qt # type: ignore
|
||||
from qtpy.QtWidgets import QLabel, QVBoxLayout, QWidget
|
||||
|
||||
try:
|
||||
from pyside6_qtermwidget import QTermWidget
|
||||
except ImportError:
|
||||
QTermWidget = None
|
||||
|
||||
|
||||
def _forward(func):
|
||||
"""Apply to a private method to forward the call to the method on QTermWidget with the same name,
|
||||
(with leading '_' removed) if it is defined, otherwise do nothing."""
|
||||
|
||||
@wraps(func)
|
||||
def wrapper(self, *args, **kwargs):
|
||||
target = getattr(self, "_main_widget")
|
||||
if QTermWidget:
|
||||
method = getattr(target, func.__name__[1:])
|
||||
return method(*args, **kwargs)
|
||||
else:
|
||||
...
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
class BecQTerm(QWidget):
|
||||
activity = Signal()
|
||||
bell = Signal(str)
|
||||
copy_available = Signal(bool)
|
||||
current_directory_changed = Signal(str)
|
||||
finished = Signal()
|
||||
profile_changed = Signal(str)
|
||||
received_data = Signal(str)
|
||||
silence = Signal()
|
||||
term_got_focus = Signal()
|
||||
term_key_pressed = Signal(QKeyEvent)
|
||||
term_lost_focus = Signal()
|
||||
title_changed = Signal()
|
||||
url_activated = Signal(QUrl, bool)
|
||||
|
||||
def __init__(self, /, parent: QWidget | None = None, **kwargs) -> None:
|
||||
super().__init__(parent)
|
||||
self._layout = QVBoxLayout()
|
||||
self.setLayout(self._layout)
|
||||
if QTermWidget:
|
||||
self._main_widget = QTermWidget(parent=self)
|
||||
self._main_widget.activity.connect(self.activity)
|
||||
self._main_widget.bell.connect(self.bell)
|
||||
self._main_widget.copyAvailable.connect(self.copy_available)
|
||||
self._main_widget.currentDirectoryChanged.connect(self.current_directory_changed)
|
||||
self._main_widget.finished.connect(self.finished)
|
||||
self._main_widget.profileChanged.connect(self.profile_changed)
|
||||
self._main_widget.receivedData.connect(self.received_data)
|
||||
self._main_widget.silence.connect(self.silence)
|
||||
self._main_widget.termGetFocus.connect(self.term_got_focus)
|
||||
self._main_widget.termKeyPressed.connect(self.term_key_pressed)
|
||||
self._main_widget.termLostFocus.connect(self.term_lost_focus)
|
||||
self._main_widget.titleChanged.connect(self.title_changed)
|
||||
self._main_widget.urlActivated.connect(self.url_activated)
|
||||
self._setEnvironment([f"{k}={v}" for k, v in os.environ.items()])
|
||||
self._setColorScheme("Solarized")
|
||||
else:
|
||||
self._layout.setAlignment(Qt.AlignmentFlag.AlignCenter)
|
||||
self._main_widget = QLabel("pyside6_qterminal is not installed!")
|
||||
|
||||
self._layout.addWidget(self._main_widget)
|
||||
|
||||
def write(self, text: str, add_newline: bool = True):
|
||||
if add_newline:
|
||||
text += "\n"
|
||||
self._sendText(text)
|
||||
|
||||
# automatically forwarded to the widget only if it exists
|
||||
@_forward
|
||||
def _addCustomColorSchemeDir(self, custom_dir: str, /) -> None: ...
|
||||
@_forward
|
||||
def _autoHideMouseAfter(self, delay: int, /) -> None: ...
|
||||
@_forward
|
||||
def _availableColorSchemes(self) -> list[str]: ...
|
||||
@_forward
|
||||
def _availableKeyBindings(self) -> list[str]: ...
|
||||
@_forward
|
||||
def _bracketText(self, text: str, /) -> None: ...
|
||||
@_forward
|
||||
def _bracketedPasteModeIsDisabled(self, /) -> bool: ...
|
||||
@_forward
|
||||
def _changeDir(self, dir: str, /) -> None: ...
|
||||
@_forward
|
||||
def _clear(self, /) -> None: ...
|
||||
@_forward
|
||||
def _clearCustomKeyBindingsDir(self, /) -> None: ...
|
||||
@_forward
|
||||
def _copyClipboard(self, /) -> None: ...
|
||||
@_forward
|
||||
def _disableBracketedPasteMode(self, disable: bool, /) -> None: ...
|
||||
@_forward
|
||||
def _filterActions(self, position: QPoint, /) -> list[QAction]: ...
|
||||
@_forward
|
||||
def _flowControlEnabled(self, /) -> bool: ...
|
||||
@_forward
|
||||
def _getAvailableColorSchemes(self, /) -> list[str]: ...
|
||||
@_forward
|
||||
def _getForegroundProcessId(self, /) -> int: ...
|
||||
@_forward
|
||||
def _getMargin(self, /) -> int: ...
|
||||
@_forward
|
||||
def _getPtySlaveFd(self, /) -> int: ...
|
||||
@_forward
|
||||
def _getSelectionEnd(self, row: int, column: int, /) -> None: ...
|
||||
@_forward
|
||||
def _getSelectionStart(self, row: int, column: int, /) -> None: ...
|
||||
@_forward
|
||||
def _getShellPID(self, /) -> int: ...
|
||||
@_forward
|
||||
def _getTerminalFont(self, /) -> QFont: ...
|
||||
@_forward
|
||||
def _historyLinesCount(self, /) -> int: ...
|
||||
@_forward
|
||||
def _historySize(self, /) -> int: ...
|
||||
@_forward
|
||||
def _icon(self, /) -> str: ...
|
||||
@_forward
|
||||
def _isBidiEnabled(self, /) -> bool: ...
|
||||
@_forward
|
||||
def _isTitleChanged(self, /) -> bool: ...
|
||||
@_forward
|
||||
def _keyBindings(self, /) -> str: ...
|
||||
@_forward
|
||||
def _pasteClipboard(self, /) -> None: ...
|
||||
@_forward
|
||||
def _pasteSelection(self, /) -> None: ...
|
||||
@_forward
|
||||
def _resizeEvent(self, arg__1: QResizeEvent, /) -> None: ...
|
||||
@_forward
|
||||
def _saveHistory(self, device: QIODevice, /) -> None: ...
|
||||
@_forward
|
||||
def _screenColumnsCount(self, /) -> int: ...
|
||||
@_forward
|
||||
def _screenLinesCount(self, /) -> int: ...
|
||||
@_forward
|
||||
def _scrollToEnd(self, /) -> None: ...
|
||||
@_forward
|
||||
def _selectedText(self, /, preserveLineBreaks: bool = ...) -> str: ...
|
||||
@_forward
|
||||
def _selectionChanged(self, textSelected: bool, /) -> None: ...
|
||||
@_forward
|
||||
def _sendKeyEvent(self, e: QKeyEvent, /) -> None: ...
|
||||
@_forward
|
||||
def _sendText(self, text: str, /) -> None: ...
|
||||
@_forward
|
||||
def _sessionFinished(self, /) -> None: ...
|
||||
@_forward
|
||||
def _setArgs(self, args: Sequence[str], /) -> None: ...
|
||||
@_forward
|
||||
def _setAutoClose(self, arg__1: bool, /) -> None: ...
|
||||
@_forward
|
||||
def _setBidiEnabled(self, enabled: bool, /) -> None: ...
|
||||
@_forward
|
||||
def _setBlinkingCursor(self, blink: bool, /) -> None: ...
|
||||
@_forward
|
||||
def _setBoldIntense(self, boldIntense: bool, /) -> None: ...
|
||||
@_forward
|
||||
def _setColorScheme(self, name: str, /) -> None: ...
|
||||
@_forward
|
||||
def _setConfirmMultilinePaste(self, confirmMultilinePaste: bool, /) -> None: ...
|
||||
@_forward
|
||||
def _setCustomKeyBindingsDir(self, custom_dir: str, /) -> None: ...
|
||||
@_forward
|
||||
def _setDrawLineChars(self, drawLineChars: bool, /) -> None: ...
|
||||
@_forward
|
||||
def _setEnvironment(self, environment: Sequence[str], /) -> None: ...
|
||||
@_forward
|
||||
def _setFlowControlEnabled(self, enabled: bool, /) -> None: ...
|
||||
@_forward
|
||||
def _setFlowControlWarningEnabled(self, enabled: bool, /) -> None: ...
|
||||
@_forward
|
||||
def _setHistorySize(self, lines: int, /) -> None: ...
|
||||
@_forward
|
||||
def _setKeyBindings(self, kb: str, /) -> None: ...
|
||||
@_forward
|
||||
def _setMargin(self, arg__1: int, /) -> None: ...
|
||||
@_forward
|
||||
def _setMonitorActivity(self, arg__1: bool, /) -> None: ...
|
||||
@_forward
|
||||
def _setMonitorSilence(self, arg__1: bool, /) -> None: ...
|
||||
@_forward
|
||||
def _setMotionAfterPasting(self, arg__1: int, /) -> None: ...
|
||||
@_forward
|
||||
def _setSelectionEnd(self, row: int, column: int, /) -> None: ...
|
||||
@_forward
|
||||
def _setSelectionStart(self, row: int, column: int, /) -> None: ...
|
||||
@_forward
|
||||
def _setShellProgram(self, program: str, /) -> None: ...
|
||||
@_forward
|
||||
def _setSilenceTimeout(self, seconds: int, /) -> None: ...
|
||||
@_forward
|
||||
def _setSize(self, arg__1: QSize, /) -> None: ...
|
||||
@_forward
|
||||
def _setTerminalBackgroundImage(self, backgroundImage: str, /) -> None: ...
|
||||
@_forward
|
||||
def _setTerminalBackgroundMode(self, mode: int, /) -> None: ...
|
||||
@_forward
|
||||
def _setTerminalFont(self, font: QFont | str | Sequence[str], /) -> None: ...
|
||||
@_forward
|
||||
def _setTerminalOpacity(self, level: float, /) -> None: ...
|
||||
@_forward
|
||||
def _setTerminalSizeHint(self, enabled: bool, /) -> None: ...
|
||||
@_forward
|
||||
def _setTrimPastedTrailingNewlines(self, trimPastedTrailingNewlines: bool, /) -> None: ...
|
||||
@_forward
|
||||
def _setWordCharacters(self, chars: str, /) -> None: ...
|
||||
@_forward
|
||||
def _setWorkingDirectory(self, dir: str, /) -> None: ...
|
||||
@_forward
|
||||
def _sizeHint(self, /) -> QSize: ...
|
||||
@_forward
|
||||
def _startShellProgram(self, /) -> None: ...
|
||||
@_forward
|
||||
def _startTerminalTeletype(self, /) -> None: ...
|
||||
@_forward
|
||||
def _terminalSizeHint(self, /) -> bool: ...
|
||||
@_forward
|
||||
def _title(self, /) -> str: ...
|
||||
@_forward
|
||||
def _toggleShowSearchBar(self, /) -> None: ...
|
||||
@_forward
|
||||
def _wordCharacters(self, /) -> str: ...
|
||||
@_forward
|
||||
def _workingDirectory(self, /) -> str: ...
|
||||
@_forward
|
||||
def _zoomIn(self, /) -> None: ...
|
||||
@_forward
|
||||
def _zoomOut(self, /) -> None: ...
|
||||
@@ -1,6 +0,0 @@
|
||||
from bec_widgets.widgets.utility.bec_term.protocol import BecTerminal
|
||||
from bec_widgets.widgets.utility.bec_term.qtermwidget_wrapper import BecQTerm
|
||||
|
||||
|
||||
def get_current_bec_term_class() -> type[BecTerminal]:
|
||||
return BecQTerm
|
||||
152
pyproject.toml
152
pyproject.toml
@@ -1,34 +1,54 @@
|
||||
[build-system]
|
||||
requires = ["hatchling"]
|
||||
build-backend = "hatchling.build"
|
||||
|
||||
[project]
|
||||
name = "bec_widgets"
|
||||
version = "3.5.0"
|
||||
version = "3.4.2"
|
||||
description = "BEC Widgets"
|
||||
requires-python = ">=3.11"
|
||||
classifiers = [
|
||||
"Development Status :: 3 - Alpha",
|
||||
"Programming Language :: Python :: 3",
|
||||
"Topic :: Scientific/Engineering",
|
||||
"Development Status :: 3 - Alpha",
|
||||
"Programming Language :: Python :: 3",
|
||||
"Topic :: Scientific/Engineering",
|
||||
]
|
||||
dependencies = [
|
||||
"PyJWT~=2.9",
|
||||
"PySide6==6.9.0",
|
||||
"PySide6-QtAds==4.4.0",
|
||||
"bec_ipython_client~=3.107,>=3.107.2", # needed for jupyter console
|
||||
"bec_lib~=3.107,>=3.107.2",
|
||||
"bec_qthemes~=1.0, >=1.3.4",
|
||||
"black>=26,<27", # needed for bw-generate-cli
|
||||
"copier~=9.7",
|
||||
"darkdetect~=0.8",
|
||||
"isort>=5.13, <9.0", # needed for bw-generate-cli
|
||||
"markdown~=3.9",
|
||||
"ophyd_devices~=1.29, >=1.29.1",
|
||||
"pydantic~=2.0",
|
||||
"pylsp-bec~=1.2",
|
||||
"pyqtgraph==0.13.7",
|
||||
"qtconsole~=5.5, >=5.5.1", # needed for jupyter console
|
||||
"qtmonaco~=0.8, >=0.8.1",
|
||||
"qtpy~=2.4",
|
||||
"thefuzz~=0.22",
|
||||
"typer~=0.15",
|
||||
"bec_ipython_client~=3.107,>=3.107.2", # needed for jupyter console
|
||||
"bec_lib~=3.107,>=3.107.2",
|
||||
"bec_qthemes~=1.0, >=1.3.4",
|
||||
"black>=26,<27", # needed for bw-generate-cli
|
||||
"isort>=5.13, <9.0", # needed for bw-generate-cli
|
||||
"ophyd_devices~=1.29, >=1.29.1",
|
||||
"pydantic~=2.0",
|
||||
"pyqtgraph==0.13.7",
|
||||
"PySide6==6.9.0",
|
||||
"qtconsole~=5.5, >=5.5.1", # needed for jupyter console
|
||||
"qtpy~=2.4",
|
||||
"thefuzz~=0.22",
|
||||
"qtmonaco~=0.8, >=0.8.1",
|
||||
"darkdetect~=0.8",
|
||||
"PySide6-QtAds==4.4.0",
|
||||
"pylsp-bec~=1.2",
|
||||
"copier~=9.7",
|
||||
"typer~=0.15",
|
||||
"markdown~=3.9",
|
||||
"PyJWT~=2.9",
|
||||
]
|
||||
|
||||
|
||||
[project.optional-dependencies]
|
||||
dev = [
|
||||
"coverage~=7.0",
|
||||
"fakeredis~=2.23, >=2.23.2",
|
||||
"pytest-bec-e2e>=2.21.4, <=4.0",
|
||||
"pytest-qt~=4.4",
|
||||
"pytest-random-order~=1.1",
|
||||
"pytest-timeout~=2.2",
|
||||
"pytest-xvfb~=3.0",
|
||||
"pytest~=8.0",
|
||||
"pytest-cov~=6.1.1",
|
||||
"watchdog~=6.0",
|
||||
"pre_commit~=4.2",
|
||||
]
|
||||
|
||||
[project.urls]
|
||||
@@ -36,47 +56,10 @@ dependencies = [
|
||||
Homepage = "https://gitlab.psi.ch/bec/bec_widgets"
|
||||
|
||||
[project.scripts]
|
||||
bec-app = "bec_widgets.applications.main_app:main"
|
||||
bec-designer = "bec_widgets.utils.bec_designer:main"
|
||||
bec-gui-server = "bec_widgets.cli.server:main"
|
||||
bw-generate-cli = "bec_widgets.cli.generate_cli:main"
|
||||
|
||||
[project.optional-dependencies]
|
||||
dev = [
|
||||
"coverage~=7.0",
|
||||
"fakeredis~=2.23, >=2.23.2",
|
||||
"pytest-bec-e2e>=2.21.4, <=4.0",
|
||||
"pytest-qt~=4.4",
|
||||
"pytest-random-order~=1.1",
|
||||
"pytest-timeout~=2.2",
|
||||
"pytest-xvfb~=3.0",
|
||||
"pytest~=8.0",
|
||||
"pytest-cov~=6.1.1",
|
||||
"watchdog~=6.0",
|
||||
"pre_commit~=4.2",
|
||||
]
|
||||
qtermwidget = [
|
||||
"pyside6_qtermwidget",
|
||||
]
|
||||
|
||||
[build-system]
|
||||
requires = ["hatchling"]
|
||||
build-backend = "hatchling.build"
|
||||
|
||||
[tool.black]
|
||||
line-length = 100
|
||||
skip-magic-trailing-comma = true
|
||||
|
||||
[tool.coverage.report]
|
||||
skip_empty = true # exclude empty *files*, e.g. __init__.py, from the report
|
||||
exclude_lines = [
|
||||
"pragma: no cover",
|
||||
"if TYPE_CHECKING:",
|
||||
"return NotImplemented",
|
||||
"raise NotImplementedError",
|
||||
"\\.\\.\\.",
|
||||
'if __name__ == "__main__":',
|
||||
]
|
||||
bec-gui-server = "bec_widgets.cli.server:main"
|
||||
bec-designer = "bec_widgets.utils.bec_designer:main"
|
||||
bec-app = "bec_widgets.applications.main_app:main"
|
||||
|
||||
[tool.hatch.build.targets.wheel]
|
||||
include = ["*"]
|
||||
@@ -86,6 +69,10 @@ exclude = ["docs/**", "tests/**"]
|
||||
include = ["*"]
|
||||
exclude = ["docs/**", "tests/**"]
|
||||
|
||||
[tool.black]
|
||||
line-length = 100
|
||||
skip-magic-trailing-comma = true
|
||||
|
||||
[tool.isort]
|
||||
profile = "black"
|
||||
line_length = 100
|
||||
@@ -93,12 +80,6 @@ multi_line_output = 3
|
||||
include_trailing_comma = true
|
||||
known_first_party = ["bec_widgets"]
|
||||
|
||||
[tool.ruff]
|
||||
line-length = 100
|
||||
|
||||
[tool.ruff.format]
|
||||
skip-magic-trailing-comma = true
|
||||
|
||||
[tool.semantic_release]
|
||||
build_command = "pip install build wheel && python -m build"
|
||||
version_toml = ["pyproject.toml:project.version"]
|
||||
@@ -109,16 +90,16 @@ default = "semantic-release <semantic-release>"
|
||||
|
||||
[tool.semantic_release.commit_parser_options]
|
||||
allowed_tags = [
|
||||
"build",
|
||||
"chore",
|
||||
"ci",
|
||||
"docs",
|
||||
"feat",
|
||||
"fix",
|
||||
"perf",
|
||||
"style",
|
||||
"refactor",
|
||||
"test",
|
||||
"build",
|
||||
"chore",
|
||||
"ci",
|
||||
"docs",
|
||||
"feat",
|
||||
"fix",
|
||||
"perf",
|
||||
"style",
|
||||
"refactor",
|
||||
"test",
|
||||
]
|
||||
minor_tags = ["feat"]
|
||||
patch_tags = ["fix", "perf"]
|
||||
@@ -135,3 +116,14 @@ env = "GH_TOKEN"
|
||||
[tool.semantic_release.publish]
|
||||
dist_glob_patterns = ["dist/*"]
|
||||
upload_to_vcs_release = true
|
||||
|
||||
[tool.coverage.report]
|
||||
skip_empty = true # exclude empty *files*, e.g. __init__.py, from the report
|
||||
exclude_lines = [
|
||||
"pragma: no cover",
|
||||
"if TYPE_CHECKING:",
|
||||
"return NotImplemented",
|
||||
"raise NotImplementedError",
|
||||
"\\.\\.\\.",
|
||||
'if __name__ == "__main__":',
|
||||
]
|
||||
|
||||
@@ -1,5 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
# BENCHMARK_TITLE: Import bec_widgets
|
||||
set -euo pipefail
|
||||
|
||||
python -c 'import bec_widgets; print(bec_widgets.__file__)'
|
||||
@@ -1,5 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
# BENCHMARK_TITLE: BEC IPython client with companion app
|
||||
set -euo pipefail
|
||||
|
||||
bec --post-startup-file tests/benchmarks/hyperfine/utils/exit_bec_startup.py
|
||||
@@ -1,5 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
# BENCHMARK_TITLE: BEC IPython client without companion app
|
||||
set -euo pipefail
|
||||
|
||||
bec --nogui --post-startup-file tests/benchmarks/hyperfine/utils/exit_bec_startup.py
|
||||
@@ -1,5 +0,0 @@
|
||||
import time
|
||||
|
||||
_ip = get_ipython()
|
||||
_ip.confirm_exit = False
|
||||
_ip.ask_exit()
|
||||
@@ -1,7 +1,6 @@
|
||||
import pytest
|
||||
|
||||
from bec_widgets.cli.client import Image, MotorMap, Waveform
|
||||
from bec_widgets.cli.client_utils import BECGuiClient
|
||||
from bec_widgets.cli.rpc.rpc_base import RPCReference
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
@@ -123,7 +122,7 @@ def test_ring_bar(qtbot, connected_client_gui_obj):
|
||||
assert gui._ipython_registry[bar._gui_id].__class__.__name__ == "RingProgressBar"
|
||||
|
||||
|
||||
def test_rpc_gui_obj(connected_client_gui_obj: BECGuiClient, qtbot):
|
||||
def test_rpc_gui_obj(connected_client_gui_obj, qtbot):
|
||||
gui = connected_client_gui_obj
|
||||
|
||||
qtbot.waitUntil(lambda: len(gui.windows) == 1, timeout=3000)
|
||||
|
||||
@@ -93,8 +93,8 @@ def test_available_widgets(qtbot, connected_client_gui_obj):
|
||||
if object_name == "BECShell":
|
||||
continue
|
||||
|
||||
# Skip BecConsole as ttyd is not installed
|
||||
if object_name == "BecConsole":
|
||||
# Skip WebConsole as ttyd is not installed
|
||||
if object_name == "WebConsole":
|
||||
continue
|
||||
|
||||
#############################
|
||||
|
||||
@@ -1,27 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea
|
||||
from bec_widgets.widgets.plots.waveform.waveform import Waveform
|
||||
from tests.unit_tests.client_mocks import mocked_client
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def dock_area(qtbot, mocked_client):
|
||||
widget = BECDockArea(client=mocked_client)
|
||||
qtbot.addWidget(widget)
|
||||
qtbot.waitExposed(widget)
|
||||
yield widget
|
||||
|
||||
|
||||
def test_add_waveform_to_dock_area(benchmark, dock_area, qtbot, mocked_client):
|
||||
"""Benchmark adding a Waveform widget to an existing dock area."""
|
||||
|
||||
def add_waveform():
|
||||
dock_area.new("Waveform")
|
||||
return dock_area
|
||||
|
||||
dock = benchmark(add_waveform)
|
||||
|
||||
assert dock is not None
|
||||
@@ -1,8 +1,3 @@
|
||||
# Force ophyd onto its dummy control layer in tests so importing it does not
|
||||
# try to create a real EPICS CA context.
|
||||
import os
|
||||
|
||||
os.environ.setdefault("OPHYD_CONTROL_LAYER", "dummy")
|
||||
import json
|
||||
import time
|
||||
from unittest import mock
|
||||
@@ -18,7 +13,7 @@ from bec_lib.client import BECClient
|
||||
from bec_lib.messages import _StoredDataInfo
|
||||
from bec_qthemes import apply_theme
|
||||
from bec_qthemes._theme import Theme
|
||||
from ophyd._dummy_shim import _dispatcher
|
||||
from ophyd._pyepics_shim import _dispatcher
|
||||
from pytestqt.exceptions import TimeoutError as QtBotTimeoutError
|
||||
from qtpy.QtCore import QEvent, QEventLoop
|
||||
from qtpy.QtWidgets import QApplication, QMessageBox
|
||||
|
||||
@@ -1,252 +0,0 @@
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
import shiboken6
|
||||
from qtpy.QtCore import QEvent, QEventLoop, Qt
|
||||
from qtpy.QtGui import QHideEvent, QShowEvent
|
||||
from qtpy.QtTest import QTest
|
||||
from qtpy.QtWidgets import QApplication, QWidget
|
||||
|
||||
import bec_widgets.widgets.editors.bec_console.bec_console as bec_console_module
|
||||
from bec_widgets.widgets.editors.bec_console.bec_console import (
|
||||
BecConsole,
|
||||
BECShell,
|
||||
ConsoleMode,
|
||||
_bec_console_registry,
|
||||
)
|
||||
|
||||
from .client_mocks import mocked_client
|
||||
|
||||
|
||||
def process_deferred_deletes():
|
||||
app = QApplication.instance()
|
||||
app.sendPostedEvents(None, QEvent.DeferredDelete)
|
||||
app.processEvents(QEventLoop.AllEvents)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def clean_bec_console_registry():
|
||||
_bec_console_registry.clear()
|
||||
yield
|
||||
_bec_console_registry.clear()
|
||||
process_deferred_deletes()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def console_widget(qtbot):
|
||||
"""Create a BecConsole widget."""
|
||||
widget = BecConsole(client=mocked_client, gui_id="test_console", terminal_id="test_terminal")
|
||||
qtbot.addWidget(widget)
|
||||
return widget
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def two_console_widgets_same_terminal(qtbot):
|
||||
widget1 = BecConsole(client=mocked_client, gui_id="console_1", terminal_id="shared_terminal")
|
||||
widget2 = BecConsole(client=mocked_client, gui_id="console_2", terminal_id="shared_terminal")
|
||||
qtbot.addWidget(widget1)
|
||||
qtbot.addWidget(widget2)
|
||||
return widget1, widget2
|
||||
|
||||
|
||||
def test_bec_console_initialization(console_widget: BecConsole):
|
||||
assert console_widget.console_id == "test_console"
|
||||
assert console_widget.terminal_id == "test_terminal"
|
||||
assert console_widget._mode == ConsoleMode.ACTIVE
|
||||
assert console_widget.term is not None
|
||||
assert console_widget._overlay.isHidden()
|
||||
console_widget.show()
|
||||
assert console_widget.isVisible()
|
||||
assert _bec_console_registry.owner_is_visible(console_widget.terminal_id)
|
||||
|
||||
|
||||
def test_bec_console_yield_terminal_ownership(console_widget):
|
||||
console_widget.show()
|
||||
console_widget.take_terminal_ownership()
|
||||
console_widget.yield_ownership()
|
||||
assert console_widget.term is None
|
||||
assert console_widget._mode == ConsoleMode.INACTIVE
|
||||
|
||||
|
||||
def test_bec_console_hide_event_yields_ownership(console_widget):
|
||||
console_widget.take_terminal_ownership()
|
||||
console_widget.hideEvent(QHideEvent())
|
||||
assert console_widget.term is None
|
||||
assert console_widget._mode == ConsoleMode.HIDDEN
|
||||
|
||||
|
||||
def test_bec_console_show_event_takes_ownership(console_widget):
|
||||
console_widget.yield_ownership()
|
||||
console_widget.showEvent(QShowEvent())
|
||||
assert console_widget.term is not None
|
||||
assert console_widget._mode == ConsoleMode.ACTIVE
|
||||
|
||||
|
||||
def test_bec_console_overlay_click_takes_ownership(qtbot, console_widget):
|
||||
console_widget.yield_ownership()
|
||||
assert console_widget._mode == ConsoleMode.HIDDEN
|
||||
|
||||
QTest.mouseClick(console_widget._overlay, Qt.LeftButton)
|
||||
assert console_widget.term is not None
|
||||
assert console_widget._mode == ConsoleMode.ACTIVE
|
||||
assert not console_widget._overlay.isVisible()
|
||||
|
||||
|
||||
def test_two_consoles_shared_terminal(two_console_widgets_same_terminal):
|
||||
widget1, widget2 = two_console_widgets_same_terminal
|
||||
|
||||
# Widget1 takes ownership
|
||||
widget1.take_terminal_ownership()
|
||||
assert widget1.term is not None
|
||||
assert widget1._mode == ConsoleMode.ACTIVE
|
||||
assert widget2.term is None
|
||||
assert widget2._mode == ConsoleMode.HIDDEN
|
||||
|
||||
# Widget2 takes ownership
|
||||
widget2.take_terminal_ownership()
|
||||
assert widget2.term is not None
|
||||
assert widget2._mode == ConsoleMode.ACTIVE
|
||||
assert widget1.term is None
|
||||
assert widget1._mode == ConsoleMode.HIDDEN
|
||||
|
||||
|
||||
def test_bec_console_registry_cleanup(console_widget: BecConsole):
|
||||
console_widget.take_terminal_ownership()
|
||||
terminal_id = console_widget.terminal_id
|
||||
|
||||
assert terminal_id in _bec_console_registry._terminal_registry
|
||||
_bec_console_registry.unregister(console_widget)
|
||||
assert terminal_id not in _bec_console_registry._terminal_registry
|
||||
|
||||
|
||||
def test_bec_shell_initialization(qtbot):
|
||||
widget = BECShell(gui_id="bec_shell")
|
||||
qtbot.addWidget(widget)
|
||||
|
||||
assert widget.console_id == "bec_shell"
|
||||
assert widget.terminal_id == "bec_shell"
|
||||
assert widget.startup_cmd is not None
|
||||
|
||||
|
||||
def test_bec_console_write(console_widget):
|
||||
console_widget.take_terminal_ownership()
|
||||
with mock.patch.object(console_widget.term, "write") as mock_write:
|
||||
console_widget.write("test command")
|
||||
mock_write.assert_called_once_with("test command", True)
|
||||
|
||||
|
||||
def test_is_owner(console_widget: BecConsole):
|
||||
assert _bec_console_registry.is_owner(console_widget)
|
||||
mock_console = mock.MagicMock()
|
||||
mock_console.console_id = "fake_console"
|
||||
_bec_console_registry._consoles["fake_console"] = mock_console
|
||||
assert not _bec_console_registry.is_owner(mock_console)
|
||||
mock_console.terminal_id = console_widget.terminal_id
|
||||
assert not _bec_console_registry.is_owner(mock_console)
|
||||
|
||||
|
||||
def test_closing_active_console_keeps_terminal_valid_for_remaining_console(qtbot):
|
||||
widget1 = BecConsole(client=mocked_client, gui_id="close_owner", terminal_id="shared_close")
|
||||
widget2 = BecConsole(client=mocked_client, gui_id="remaining", terminal_id="shared_close")
|
||||
qtbot.addWidget(widget2)
|
||||
|
||||
widget1.take_terminal_ownership()
|
||||
term = widget1.term
|
||||
assert term is not None
|
||||
|
||||
widget1.close()
|
||||
widget1.deleteLater()
|
||||
process_deferred_deletes()
|
||||
|
||||
assert shiboken6.isValid(term)
|
||||
widget2.take_terminal_ownership()
|
||||
|
||||
assert widget2.term is term
|
||||
assert widget2._mode == ConsoleMode.ACTIVE
|
||||
|
||||
|
||||
def test_active_console_detaches_terminal_before_destruction(qtbot):
|
||||
widget1 = BecConsole(client=mocked_client, gui_id="owner", terminal_id="shared_detach")
|
||||
widget2 = BecConsole(client=mocked_client, gui_id="survivor", terminal_id="shared_detach")
|
||||
qtbot.addWidget(widget1)
|
||||
qtbot.addWidget(widget2)
|
||||
|
||||
widget1.take_terminal_ownership()
|
||||
term = widget1.term
|
||||
assert term is not None
|
||||
assert widget1.isAncestorOf(term)
|
||||
|
||||
widget1.close()
|
||||
|
||||
assert shiboken6.isValid(term)
|
||||
assert not widget1.isAncestorOf(term)
|
||||
assert term.parent() is widget2._term_holder
|
||||
|
||||
|
||||
def test_bec_shell_terminal_persists_after_last_shell_unregisters(qtbot):
|
||||
shell = BECShell(gui_id="bec_shell_persistent")
|
||||
qtbot.addWidget(shell)
|
||||
term = shell.term
|
||||
assert term is not None
|
||||
|
||||
_bec_console_registry.unregister(shell)
|
||||
|
||||
info = _bec_console_registry._terminal_registry["bec_shell"]
|
||||
assert info.registered_console_ids == set()
|
||||
assert info.owner_console_id is None
|
||||
assert info.persist_session is True
|
||||
assert info.instance is term
|
||||
assert shiboken6.isValid(term)
|
||||
|
||||
|
||||
def test_new_bec_shell_claims_preserved_terminal(qtbot):
|
||||
shell1 = BECShell(gui_id="bec_shell_first")
|
||||
term = shell1.term
|
||||
assert term is not None
|
||||
|
||||
shell1.close()
|
||||
shell1.deleteLater()
|
||||
process_deferred_deletes()
|
||||
|
||||
assert "bec_shell" in _bec_console_registry._terminal_registry
|
||||
assert shiboken6.isValid(term)
|
||||
|
||||
shell2 = BECShell(gui_id="bec_shell_second")
|
||||
qtbot.addWidget(shell2)
|
||||
shell2.showEvent(QShowEvent())
|
||||
|
||||
assert shell2.term is term
|
||||
assert shell2._mode == ConsoleMode.ACTIVE
|
||||
|
||||
|
||||
def test_persistent_bec_shell_sends_startup_command_once(qtbot, monkeypatch):
|
||||
class RecordingTerminal(QWidget):
|
||||
writes = []
|
||||
|
||||
def write(self, text: str, add_newline: bool = True):
|
||||
self.writes.append((text, add_newline))
|
||||
|
||||
monkeypatch.setattr(bec_console_module, "_BecTermClass", RecordingTerminal)
|
||||
|
||||
shell1 = BECShell(gui_id="bec_shell_startup_first")
|
||||
shell1.close()
|
||||
shell1.deleteLater()
|
||||
process_deferred_deletes()
|
||||
|
||||
shell2 = BECShell(gui_id="bec_shell_startup_second")
|
||||
qtbot.addWidget(shell2)
|
||||
shell2.showEvent(QShowEvent())
|
||||
|
||||
assert len(RecordingTerminal.writes) == 1
|
||||
assert RecordingTerminal.writes[0][0].startswith("bec ")
|
||||
assert RecordingTerminal.writes[0][1] is True
|
||||
|
||||
|
||||
def test_plain_console_terminal_removed_after_last_unregister(qtbot):
|
||||
widget = BecConsole(client=mocked_client, gui_id="plain_console", terminal_id="plain_terminal")
|
||||
qtbot.addWidget(widget)
|
||||
|
||||
assert "plain_terminal" in _bec_console_registry._terminal_registry
|
||||
_bec_console_registry.unregister(widget)
|
||||
|
||||
assert "plain_terminal" not in _bec_console_registry._terminal_registry
|
||||
@@ -918,7 +918,7 @@ class TestToolbarFunctionality:
|
||||
action.trigger()
|
||||
if action_name == "terminal":
|
||||
mock_new.assert_called_once_with(
|
||||
widget="BecConsole", closable=True, startup_cmd=None
|
||||
widget="WebConsole", closable=True, startup_cmd=None
|
||||
)
|
||||
else:
|
||||
mock_new.assert_called_once_with(widget=widget_type)
|
||||
@@ -2272,7 +2272,7 @@ class TestFlatToolbarActions:
|
||||
"flat_queue": "BECQueue",
|
||||
"flat_status": "BECStatusBox",
|
||||
"flat_progress_bar": "RingProgressBar",
|
||||
"flat_terminal": "BecConsole",
|
||||
"flat_terminal": "WebConsole",
|
||||
"flat_bec_shell": "BECShell",
|
||||
"flat_sbb_monitor": "SBBMonitor",
|
||||
}
|
||||
|
||||
@@ -277,6 +277,133 @@ def test_populate_scans(scan_control, mocked_client):
|
||||
assert sorted(items) == sorted(expected_scans)
|
||||
|
||||
|
||||
def test_scan_control_uses_gui_visibility_and_signature(qtbot, mocked_client):
|
||||
scan_info = {
|
||||
"class": "AnnotatedScan",
|
||||
"base_class": "ScanBase",
|
||||
"arg_input": {
|
||||
"device": "DeviceBase",
|
||||
"start": {
|
||||
"Annotated": {
|
||||
"type": "float",
|
||||
"metadata": {
|
||||
"ScanArgument": {
|
||||
"display_name": "Start Position",
|
||||
"description": "Start position",
|
||||
"tooltip": "Custom start tooltip",
|
||||
"expert": False,
|
||||
"alternative_group": None,
|
||||
"units": None,
|
||||
"reference_units": "device",
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
"stop": {
|
||||
"Annotated": {
|
||||
"type": "float",
|
||||
"metadata": {
|
||||
"ScanArgument": {
|
||||
"display_name": None,
|
||||
"description": "Stop position",
|
||||
"tooltip": None,
|
||||
"expert": False,
|
||||
"alternative_group": None,
|
||||
"units": None,
|
||||
"reference_units": "device",
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
},
|
||||
"arg_bundle_size": {"bundle": 3, "min": 1, "max": None},
|
||||
"gui_visibility": {
|
||||
"Movement Parameters": ["steps", "step_size"],
|
||||
"Acquisition Parameters": ["exp_time", "relative"],
|
||||
},
|
||||
"required_kwargs": [],
|
||||
"signature": [
|
||||
{"name": "args", "kind": "VAR_POSITIONAL", "default": "_empty", "annotation": "_empty"},
|
||||
{"name": "steps", "kind": "KEYWORD_ONLY", "default": 10, "annotation": "int"},
|
||||
{
|
||||
"name": "step_size",
|
||||
"kind": "KEYWORD_ONLY",
|
||||
"default": None,
|
||||
"annotation": {
|
||||
"Annotated": {
|
||||
"type": "float",
|
||||
"metadata": {
|
||||
"ScanArgument": {
|
||||
"display_name": "Step Size Custom",
|
||||
"description": "Step size",
|
||||
"tooltip": "Custom step tooltip",
|
||||
"expert": False,
|
||||
"alternative_group": "scan_resolution",
|
||||
"units": "mm",
|
||||
"reference_units": None,
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
},
|
||||
{
|
||||
"name": "exp_time",
|
||||
"kind": "KEYWORD_ONLY",
|
||||
"default": 0,
|
||||
"annotation": {
|
||||
"Annotated": {
|
||||
"type": "float",
|
||||
"metadata": {
|
||||
"ScanArgument": {
|
||||
"display_name": None,
|
||||
"description": None,
|
||||
"tooltip": "Exposure time",
|
||||
"expert": False,
|
||||
"alternative_group": None,
|
||||
"units": "s",
|
||||
"reference_units": None,
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
},
|
||||
{"name": "relative", "kind": "KEYWORD_ONLY", "default": False, "annotation": "bool"},
|
||||
{"name": "kwargs", "kind": "VAR_KEYWORD", "default": "_empty", "annotation": "_empty"},
|
||||
],
|
||||
}
|
||||
mocked_client.connector.set(
|
||||
MessageEndpoints.available_scans(),
|
||||
AvailableResourceMessage(resource={"annotated_scan": scan_info}),
|
||||
)
|
||||
|
||||
widget = ScanControl(client=mocked_client)
|
||||
qtbot.addWidget(widget)
|
||||
qtbot.waitExposed(widget)
|
||||
widget.comboBox_scan_selection.setCurrentText("annotated_scan")
|
||||
|
||||
assert widget.comboBox_scan_selection.count() == 1
|
||||
assert widget.arg_box.layout.itemAtPosition(0, 1).widget().text() == "Start Position"
|
||||
assert widget.arg_box.widgets[1].toolTip() == "Custom start tooltip\nUnits from: device"
|
||||
with patch.object(mocked_client.device_manager.devices.samx, "egu", return_value="mm"):
|
||||
WidgetIO.set_value(widget.arg_box.widgets[0], "samx")
|
||||
assert widget.arg_box.layout.itemAtPosition(0, 1).widget().text() == "Start Position"
|
||||
assert widget.arg_box.widgets[1].suffix() == " mm"
|
||||
assert widget.arg_box.widgets[1].toolTip() == "Custom start tooltip\nUnits: mm"
|
||||
widget.arg_box.widgets[0].setText("not_a_device")
|
||||
assert widget.arg_box.layout.itemAtPosition(0, 1).widget().text() == "Start Position"
|
||||
assert widget.arg_box.widgets[1].suffix() == ""
|
||||
assert widget.arg_box.widgets[1].toolTip() == "Custom start tooltip\nUnits from: device"
|
||||
assert [box.title() for box in widget.kwarg_boxes] == [
|
||||
"Movement Parameters",
|
||||
"Acquisition Parameters",
|
||||
]
|
||||
assert widget.kwarg_boxes[0].layout.itemAtPosition(0, 1).widget().text() == "Step Size Custom"
|
||||
assert widget.kwarg_boxes[0].widgets[1].suffix() == " mm"
|
||||
assert widget.kwarg_boxes[0].widgets[1].toolTip() == "Custom step tooltip\nUnits: mm"
|
||||
assert widget.kwarg_boxes[1].layout.itemAtPosition(0, 0).widget().text() == "Exp Time"
|
||||
assert widget.kwarg_boxes[1].widgets[0].toolTip() == "Exposure time\nUnits: s"
|
||||
|
||||
|
||||
def test_current_scan(scan_control, mocked_client):
|
||||
current_scan = scan_control.current_scan
|
||||
wrong_scan = "error_scan"
|
||||
|
||||
@@ -157,3 +157,81 @@ def test_arg_box(qtbot):
|
||||
# Widget 2
|
||||
assert arg_box.widgets[2].__class__.__name__ == "ScanSpinBox"
|
||||
assert arg_box.widgets[2].arg_name
|
||||
|
||||
|
||||
def test_spinbox_limits_from_scan_info(qtbot):
|
||||
group_input = {
|
||||
"name": "Kwarg Test",
|
||||
"inputs": [
|
||||
{
|
||||
"arg": False,
|
||||
"name": "exp_time",
|
||||
"type": "float",
|
||||
"display_name": "Exp Time",
|
||||
"tooltip": "Exposure time in seconds",
|
||||
"default": 2.0,
|
||||
"expert": False,
|
||||
"precision": 3,
|
||||
"gt": 1.5,
|
||||
"ge": None,
|
||||
"lt": 5.0,
|
||||
"le": None,
|
||||
},
|
||||
{
|
||||
"arg": False,
|
||||
"name": "num_points",
|
||||
"type": "int",
|
||||
"display_name": "Num Points",
|
||||
"tooltip": "Number of points",
|
||||
"default": 4,
|
||||
"expert": False,
|
||||
"gt": None,
|
||||
"ge": 3,
|
||||
"lt": 9,
|
||||
"le": None,
|
||||
},
|
||||
{
|
||||
"arg": False,
|
||||
"name": "settling_time",
|
||||
"type": "float",
|
||||
"display_name": "Settling Time",
|
||||
"tooltip": "Settling time in seconds",
|
||||
"default": 0.5,
|
||||
"expert": False,
|
||||
"gt": None,
|
||||
"ge": 0.2,
|
||||
"lt": None,
|
||||
"le": 3.5,
|
||||
},
|
||||
{
|
||||
"arg": False,
|
||||
"name": "steps",
|
||||
"type": "int",
|
||||
"display_name": "Steps",
|
||||
"tooltip": "Number of steps",
|
||||
"default": 4,
|
||||
"expert": False,
|
||||
"gt": 0,
|
||||
"ge": None,
|
||||
"lt": None,
|
||||
"le": 10,
|
||||
},
|
||||
],
|
||||
}
|
||||
|
||||
kwarg_box = ScanGroupBox(box_type="kwargs", config=group_input)
|
||||
|
||||
exp_time = kwarg_box.widgets[0]
|
||||
num_points = kwarg_box.widgets[1]
|
||||
settling_time = kwarg_box.widgets[2]
|
||||
steps = kwarg_box.widgets[3]
|
||||
|
||||
assert exp_time.decimals() == 3
|
||||
assert exp_time.minimum() == 1.501
|
||||
assert exp_time.maximum() == 4.999
|
||||
assert num_points.minimum() == 3
|
||||
assert num_points.maximum() == 8
|
||||
assert settling_time.minimum() == 0.2
|
||||
assert settling_time.maximum() == 3.5
|
||||
assert steps.minimum() == 1
|
||||
assert steps.maximum() == 10
|
||||
|
||||
476
tests/unit_tests/test_web_console.py
Normal file
476
tests/unit_tests/test_web_console.py
Normal file
@@ -0,0 +1,476 @@
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
from qtpy.QtCore import Qt
|
||||
from qtpy.QtGui import QHideEvent
|
||||
from qtpy.QtNetwork import QAuthenticator
|
||||
|
||||
from bec_widgets.widgets.editors.web_console.web_console import (
|
||||
BECShell,
|
||||
ConsoleMode,
|
||||
WebConsole,
|
||||
_web_console_registry,
|
||||
)
|
||||
|
||||
from .client_mocks import mocked_client
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mocked_server_startup():
|
||||
"""Mock the web console server startup process."""
|
||||
with mock.patch(
|
||||
"bec_widgets.widgets.editors.web_console.web_console.subprocess"
|
||||
) as mock_subprocess:
|
||||
with mock.patch.object(_web_console_registry, "_wait_for_server_port"):
|
||||
_web_console_registry._server_port = 12345
|
||||
yield mock_subprocess
|
||||
|
||||
|
||||
def static_console(qtbot, client, unique_id: str | None = None):
|
||||
"""Fixture to provide a static unique_id for WebConsole tests."""
|
||||
if unique_id is None:
|
||||
widget = WebConsole(client=client)
|
||||
else:
|
||||
widget = WebConsole(client=client, unique_id=unique_id)
|
||||
qtbot.addWidget(widget)
|
||||
qtbot.waitExposed(widget)
|
||||
return widget
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def console_widget(qtbot, mocked_client, mocked_server_startup):
|
||||
"""Create a WebConsole widget with mocked server startup."""
|
||||
yield static_console(qtbot, mocked_client)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def bec_shell_widget(qtbot, mocked_client, mocked_server_startup):
|
||||
"""Create a BECShell widget with mocked server startup."""
|
||||
widget = BECShell(client=mocked_client)
|
||||
qtbot.addWidget(widget)
|
||||
qtbot.waitExposed(widget)
|
||||
yield widget
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def console_widget_with_static_id(qtbot, mocked_client, mocked_server_startup):
|
||||
"""Create a WebConsole widget with a static unique ID."""
|
||||
yield static_console(qtbot, mocked_client, unique_id="test_console")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def two_console_widgets_same_id(qtbot, mocked_client, mocked_server_startup):
|
||||
"""Create two WebConsole widgets sharing the same unique ID."""
|
||||
widget1 = static_console(qtbot, mocked_client, unique_id="shared_console")
|
||||
widget2 = static_console(qtbot, mocked_client, unique_id="shared_console")
|
||||
yield widget1, widget2
|
||||
|
||||
|
||||
def test_web_console_widget_initialization(console_widget):
|
||||
assert (
|
||||
console_widget.page.url().toString()
|
||||
== f"http://localhost:{_web_console_registry._server_port}"
|
||||
)
|
||||
|
||||
|
||||
def test_web_console_write(console_widget):
|
||||
# Test the write method
|
||||
with mock.patch.object(console_widget.page, "runJavaScript") as mock_run_js:
|
||||
console_widget.write("Hello, World!")
|
||||
|
||||
assert mock.call('window.term.paste("Hello, World!");') in mock_run_js.mock_calls
|
||||
|
||||
|
||||
def test_web_console_write_no_return(console_widget):
|
||||
# Test the write method with send_return=False
|
||||
with mock.patch.object(console_widget.page, "runJavaScript") as mock_run_js:
|
||||
console_widget.write("Hello, World!", send_return=False)
|
||||
|
||||
assert mock.call('window.term.paste("Hello, World!");') in mock_run_js.mock_calls
|
||||
assert mock_run_js.call_count == 1
|
||||
|
||||
|
||||
def test_web_console_send_return(console_widget):
|
||||
# Test the send_return method
|
||||
with mock.patch.object(console_widget.page, "runJavaScript") as mock_run_js:
|
||||
console_widget.send_return()
|
||||
|
||||
script = mock_run_js.call_args[0][0]
|
||||
assert "new KeyboardEvent('keypress', {charCode: 13})" in script
|
||||
assert mock_run_js.call_count == 1
|
||||
|
||||
|
||||
def test_web_console_send_ctrl_c(console_widget):
|
||||
# Test the send_ctrl_c method
|
||||
with mock.patch.object(console_widget.page, "runJavaScript") as mock_run_js:
|
||||
console_widget.send_ctrl_c()
|
||||
|
||||
script = mock_run_js.call_args[0][0]
|
||||
assert "new KeyboardEvent('keypress', {charCode: 3})" in script
|
||||
assert mock_run_js.call_count == 1
|
||||
|
||||
|
||||
def test_web_console_authenticate(console_widget):
|
||||
# Test the _authenticate method
|
||||
token = _web_console_registry._token
|
||||
mock_auth = mock.MagicMock(spec=QAuthenticator)
|
||||
console_widget._authenticate(None, mock_auth)
|
||||
mock_auth.setUser.assert_called_once_with("user")
|
||||
mock_auth.setPassword.assert_called_once_with(token)
|
||||
|
||||
|
||||
def test_web_console_registry_wait_for_server_port():
|
||||
# Test the _wait_for_server_port method
|
||||
with mock.patch.object(_web_console_registry, "_server_process") as mock_subprocess:
|
||||
mock_subprocess.stderr.readline.side_effect = [b"Starting", b"Listening on port: 12345"]
|
||||
_web_console_registry._wait_for_server_port()
|
||||
assert _web_console_registry._server_port == 12345
|
||||
|
||||
|
||||
def test_web_console_registry_wait_for_server_port_timeout():
|
||||
# Test the _wait_for_server_port method with timeout
|
||||
with mock.patch.object(_web_console_registry, "_server_process") as mock_subprocess:
|
||||
with pytest.raises(TimeoutError):
|
||||
_web_console_registry._wait_for_server_port(timeout=0.1)
|
||||
|
||||
|
||||
def test_web_console_startup_command_execution(console_widget, qtbot):
|
||||
"""Test that the startup command is triggered after successful initialization."""
|
||||
# Set a custom startup command
|
||||
console_widget.startup_cmd = "test startup command"
|
||||
|
||||
assert console_widget.startup_cmd == "test startup command"
|
||||
|
||||
# Generator to simulate JS initialization sequence
|
||||
def js_readiness_sequence():
|
||||
yield False # First call: not ready yet
|
||||
while True:
|
||||
yield True # Any subsequent calls: ready
|
||||
|
||||
readiness_gen = js_readiness_sequence()
|
||||
|
||||
def mock_run_js(script, callback=None):
|
||||
# Check if this is the initialization check call
|
||||
if "window.term !== undefined" in script and callback:
|
||||
ready = next(readiness_gen)
|
||||
callback(ready)
|
||||
else:
|
||||
# For other JavaScript calls (like paste), just call the callback
|
||||
if callback:
|
||||
callback(True)
|
||||
|
||||
with mock.patch.object(
|
||||
console_widget.page, "runJavaScript", side_effect=mock_run_js
|
||||
) as mock_run_js_method:
|
||||
# Reset initialization state and start the timer
|
||||
console_widget._is_initialized = False
|
||||
console_widget._startup_timer.start()
|
||||
|
||||
# Wait for the initialization to complete
|
||||
qtbot.waitUntil(lambda: console_widget._is_initialized, timeout=3000)
|
||||
|
||||
# Verify that the startup command was executed
|
||||
startup_calls = [
|
||||
call
|
||||
for call in mock_run_js_method.call_args_list
|
||||
if "test startup command" in str(call)
|
||||
]
|
||||
assert len(startup_calls) > 0, "Startup command should have been executed"
|
||||
|
||||
# Verify the initialized signal was emitted
|
||||
assert console_widget._is_initialized is True
|
||||
assert not console_widget._startup_timer.isActive()
|
||||
|
||||
|
||||
def test_bec_shell_startup_contains_gui_id(bec_shell_widget):
|
||||
"""Test that the BEC shell startup command includes the GUI ID."""
|
||||
bec_shell = bec_shell_widget
|
||||
|
||||
assert bec_shell._is_bec_shell
|
||||
assert bec_shell._unique_id == "bec_shell"
|
||||
|
||||
with mock.patch.object(bec_shell.bec_dispatcher, "cli_server", None):
|
||||
assert bec_shell.startup_cmd == "bec --nogui"
|
||||
|
||||
with mock.patch.object(bec_shell.bec_dispatcher.cli_server, "gui_id", "test_gui_id"):
|
||||
assert bec_shell.startup_cmd == "bec --gui-id test_gui_id"
|
||||
|
||||
|
||||
def test_web_console_set_readonly(console_widget):
|
||||
# Test the set_readonly method
|
||||
console_widget.set_readonly(True)
|
||||
assert not console_widget.isEnabled()
|
||||
|
||||
console_widget.set_readonly(False)
|
||||
assert console_widget.isEnabled()
|
||||
|
||||
|
||||
def test_web_console_with_unique_id(console_widget_with_static_id):
|
||||
"""Test creating a WebConsole with a unique_id."""
|
||||
widget = console_widget_with_static_id
|
||||
|
||||
assert widget._unique_id == "test_console"
|
||||
assert widget._unique_id in _web_console_registry._page_registry
|
||||
page_info = _web_console_registry.get_page_info("test_console")
|
||||
assert page_info is not None
|
||||
assert page_info.owner_gui_id == widget.gui_id
|
||||
assert widget.gui_id in page_info.widget_ids
|
||||
|
||||
|
||||
def test_web_console_page_sharing(two_console_widgets_same_id):
|
||||
"""Test that two widgets can share the same page using unique_id."""
|
||||
widget1, widget2 = two_console_widgets_same_id
|
||||
|
||||
# Both should reference the same page in the registry
|
||||
page_info = _web_console_registry.get_page_info("shared_console")
|
||||
assert page_info is not None
|
||||
assert widget1.gui_id in page_info.widget_ids
|
||||
assert widget2.gui_id in page_info.widget_ids
|
||||
assert widget1.page == widget2.page
|
||||
|
||||
|
||||
def test_web_console_has_ownership(console_widget_with_static_id):
|
||||
"""Test the has_ownership method."""
|
||||
widget = console_widget_with_static_id
|
||||
|
||||
# Widget should have ownership by default
|
||||
assert widget.has_ownership()
|
||||
|
||||
|
||||
def test_web_console_yield_ownership(console_widget_with_static_id):
|
||||
"""Test yielding ownership of a page."""
|
||||
widget = console_widget_with_static_id
|
||||
|
||||
assert widget.has_ownership()
|
||||
|
||||
# Yield ownership
|
||||
widget.yield_ownership()
|
||||
|
||||
# Widget should no longer have ownership
|
||||
assert not widget.has_ownership()
|
||||
page_info = _web_console_registry.get_page_info("test_console")
|
||||
assert page_info.owner_gui_id is None
|
||||
# Overlay should be shown
|
||||
assert widget._mode == ConsoleMode.INACTIVE
|
||||
|
||||
|
||||
def test_web_console_take_page_ownership(two_console_widgets_same_id):
|
||||
"""Test taking ownership of a page."""
|
||||
widget1, widget2 = two_console_widgets_same_id
|
||||
|
||||
# Widget1 should have ownership initially
|
||||
assert widget1.has_ownership()
|
||||
assert not widget2.has_ownership()
|
||||
|
||||
# Widget2 takes ownership
|
||||
widget2.take_page_ownership()
|
||||
|
||||
# Now widget2 should have ownership
|
||||
assert not widget1.has_ownership()
|
||||
assert widget2.has_ownership()
|
||||
|
||||
assert widget2._mode == ConsoleMode.ACTIVE
|
||||
assert widget1._mode == ConsoleMode.INACTIVE
|
||||
|
||||
|
||||
def test_web_console_hide_event_yields_ownership(qtbot, console_widget_with_static_id):
|
||||
"""Test that hideEvent yields ownership."""
|
||||
widget = console_widget_with_static_id
|
||||
|
||||
assert widget.has_ownership()
|
||||
|
||||
# Hide the widget. Note that we cannot call widget.hide() directly
|
||||
# because it doesn't trigger the hideEvent in tests as widgets are
|
||||
# not visible in the test environment.
|
||||
widget.hideEvent(QHideEvent())
|
||||
qtbot.wait(100) # Allow event processing
|
||||
|
||||
# Widget should have yielded ownership
|
||||
assert not widget.has_ownership()
|
||||
page_info = _web_console_registry.get_page_info("test_console")
|
||||
assert page_info.owner_gui_id is None
|
||||
|
||||
|
||||
def test_web_console_show_event_takes_ownership(console_widget_with_static_id):
|
||||
"""Test that showEvent takes ownership when page has no owner."""
|
||||
widget = console_widget_with_static_id
|
||||
|
||||
# Yield ownership
|
||||
widget.yield_ownership()
|
||||
assert not widget.has_ownership()
|
||||
|
||||
# Show the widget again
|
||||
widget.show()
|
||||
|
||||
# Widget should have reclaimed ownership
|
||||
assert widget.has_ownership()
|
||||
assert widget.browser.isVisible()
|
||||
assert not widget.overlay.isVisible()
|
||||
|
||||
|
||||
def test_web_console_mouse_press_takes_ownership(qtbot, two_console_widgets_same_id):
|
||||
"""Test that clicking on overlay takes ownership."""
|
||||
widget1, widget2 = two_console_widgets_same_id
|
||||
widget1.show()
|
||||
widget2.show()
|
||||
|
||||
# Widget1 has ownership, widget2 doesn't
|
||||
assert widget1.has_ownership()
|
||||
assert not widget2.has_ownership()
|
||||
assert widget1.isVisible()
|
||||
assert widget1._mode == ConsoleMode.ACTIVE
|
||||
assert widget2._mode == ConsoleMode.INACTIVE
|
||||
|
||||
qtbot.mouseClick(widget2, Qt.MouseButton.LeftButton)
|
||||
|
||||
# Widget2 should now have ownership
|
||||
assert widget2.has_ownership()
|
||||
assert not widget1.has_ownership()
|
||||
|
||||
|
||||
def test_web_console_registry_cleanup_removes_page(console_widget_with_static_id):
|
||||
"""Test that the registry cleans up pages when all widgets are removed."""
|
||||
widget = console_widget_with_static_id
|
||||
|
||||
assert widget._unique_id in _web_console_registry._page_registry
|
||||
|
||||
# Cleanup the widget
|
||||
widget.cleanup()
|
||||
|
||||
# Page should be removed from registry
|
||||
assert widget._unique_id not in _web_console_registry._page_registry
|
||||
|
||||
|
||||
def test_web_console_without_unique_id_no_page_sharing(console_widget):
|
||||
"""Test that widgets without unique_id don't participate in page sharing."""
|
||||
widget = console_widget
|
||||
|
||||
# Widget should not be in the page registry
|
||||
assert widget._unique_id is None
|
||||
assert not widget.has_ownership() # Should return False for non-unique widgets
|
||||
|
||||
|
||||
def test_web_console_registry_get_page_info_nonexistent(qtbot, mocked_client):
|
||||
"""Test getting page info for a non-existent page."""
|
||||
page_info = _web_console_registry.get_page_info("nonexistent")
|
||||
assert page_info is None
|
||||
|
||||
|
||||
def test_web_console_take_ownership_without_unique_id(console_widget):
|
||||
"""Test that take_page_ownership fails gracefully without unique_id."""
|
||||
widget = console_widget
|
||||
# Should not crash when taking ownership without unique_id
|
||||
widget.take_page_ownership()
|
||||
|
||||
|
||||
def test_web_console_yield_ownership_without_unique_id(console_widget):
|
||||
"""Test that yield_ownership fails gracefully without unique_id."""
|
||||
widget = console_widget
|
||||
# Should not crash when yielding ownership without unique_id
|
||||
widget.yield_ownership()
|
||||
|
||||
|
||||
def test_registry_yield_ownership_gui_id_not_in_instances():
|
||||
"""Test registry yield_ownership returns False when gui_id not in instances."""
|
||||
result = _web_console_registry.yield_ownership("nonexistent_gui_id")
|
||||
assert result is False
|
||||
|
||||
|
||||
def test_registry_yield_ownership_instance_is_none(console_widget_with_static_id):
|
||||
"""Test registry yield_ownership returns False when instance weakref is dead."""
|
||||
widget = console_widget_with_static_id
|
||||
gui_id = widget.gui_id
|
||||
|
||||
# Store the gui_id and simulate the weakref being dead
|
||||
_web_console_registry._instances[gui_id] = lambda: None
|
||||
|
||||
result = _web_console_registry.yield_ownership(gui_id)
|
||||
assert result is False
|
||||
|
||||
|
||||
def test_registry_yield_ownership_unique_id_none(console_widget_with_static_id):
|
||||
"""Test registry yield_ownership returns False when page info's unique_id is None."""
|
||||
widget = console_widget_with_static_id
|
||||
gui_id = widget.gui_id
|
||||
unique_id = widget._unique_id
|
||||
widget._unique_id = None
|
||||
|
||||
result = _web_console_registry.yield_ownership(gui_id)
|
||||
assert result is False
|
||||
|
||||
widget._unique_id = unique_id # Restore for cleanup
|
||||
|
||||
|
||||
def test_registry_yield_ownership_unique_id_not_in_page_registry(console_widget_with_static_id):
|
||||
"""Test registry yield_ownership returns False when unique_id not in page registry."""
|
||||
widget = console_widget_with_static_id
|
||||
gui_id = widget.gui_id
|
||||
unique_id = widget._unique_id
|
||||
widget._unique_id = "nonexistent_unique_id"
|
||||
|
||||
result = _web_console_registry.yield_ownership(gui_id)
|
||||
assert result is False
|
||||
|
||||
widget._unique_id = unique_id # Restore for cleanup
|
||||
|
||||
|
||||
def test_registry_owner_is_visible_page_info_none():
|
||||
"""Test owner_is_visible returns False when page info doesn't exist."""
|
||||
result = _web_console_registry.owner_is_visible("nonexistent_page")
|
||||
assert result is False
|
||||
|
||||
|
||||
def test_registry_owner_is_visible_no_owner(console_widget_with_static_id):
|
||||
"""Test owner_is_visible returns False when page has no owner."""
|
||||
widget = console_widget_with_static_id
|
||||
|
||||
# Yield ownership so there's no owner
|
||||
widget.yield_ownership()
|
||||
page_info = _web_console_registry.get_page_info(widget._unique_id)
|
||||
assert page_info.owner_gui_id is None
|
||||
|
||||
result = _web_console_registry.owner_is_visible(widget._unique_id)
|
||||
assert result is False
|
||||
|
||||
|
||||
def test_registry_owner_is_visible_owner_ref_none(console_widget_with_static_id):
|
||||
"""Test owner_is_visible returns False when owner ref doesn't exist in instances."""
|
||||
widget = console_widget_with_static_id
|
||||
unique_id = widget._unique_id
|
||||
|
||||
# Remove owner from instances dict
|
||||
del _web_console_registry._instances[widget.gui_id]
|
||||
|
||||
result = _web_console_registry.owner_is_visible(unique_id)
|
||||
assert result is False
|
||||
|
||||
|
||||
def test_registry_owner_is_visible_owner_instance_none(console_widget_with_static_id):
|
||||
"""Test owner_is_visible returns False when owner instance weakref is dead."""
|
||||
widget = console_widget_with_static_id
|
||||
unique_id = widget._unique_id
|
||||
gui_id = widget.gui_id
|
||||
|
||||
# Simulate dead weakref
|
||||
_web_console_registry._instances[gui_id] = lambda: None
|
||||
|
||||
result = _web_console_registry.owner_is_visible(unique_id)
|
||||
assert result is False
|
||||
|
||||
|
||||
def test_registry_owner_is_visible_owner_visible(console_widget_with_static_id):
|
||||
"""Test owner_is_visible returns True when owner is visible."""
|
||||
widget = console_widget_with_static_id
|
||||
widget.show()
|
||||
|
||||
result = _web_console_registry.owner_is_visible(widget._unique_id)
|
||||
assert result is True
|
||||
|
||||
|
||||
def test_registry_owner_is_visible_owner_not_visible(console_widget_with_static_id):
|
||||
"""Test owner_is_visible returns False when owner is not visible."""
|
||||
widget = console_widget_with_static_id
|
||||
widget.hide()
|
||||
|
||||
result = _web_console_registry.owner_is_visible(widget._unique_id)
|
||||
assert result is False
|
||||
Reference in New Issue
Block a user