Compare commits

..

2 Commits

Author SHA1 Message Date
wakonig_k ea306868df wip - subprocess 2025-07-07 19:02:50 +02:00
wakonig_k 4f3fd2f906 wip - feat: add monaco editor 2025-07-07 19:02:50 +02:00
696 changed files with 59243 additions and 77119 deletions
+1 -2
View File
@@ -53,7 +53,6 @@ runs:
sudo apt-get update
sudo apt-get install -y libgl1 libegl1 x11-utils libxkbcommon-x11-0 libdbus-1-3 xvfb
sudo apt-get -y install libnss3 libxdamage1 libasound2t64 libatomic1 libxcursor1
sudo apt-get -y install ttyd
- name: Install Python dependencies
shell: bash
@@ -62,4 +61,4 @@ runs:
uv pip install --system -e ./ophyd_devices
uv pip install --system -e ./bec/bec_lib[dev]
uv pip install --system -e ./bec/bec_ipython_client
uv pip install --system -e ./bec_widgets[dev,qtermwidget]
uv pip install --system -e ./bec_widgets[dev,pyside6]
-6
View File
@@ -1,6 +0,0 @@
version: 2
updates:
- package-ecosystem: "pip"
directory: "/"
schedule:
interval: "weekly"
-169
View File
@@ -1,169 +0,0 @@
##########################
### AI-generated file. ###
##########################
"""Aggregate and merge benchmark JSON files.
The workflow runs the same benchmark suite on multiple independent runners.
This script reads every JSON file produced by those attempts, normalizes the
contained benchmark values, and writes a compact mapping JSON where each value is
the median across attempts. It can also merge independent hyperfine JSON files
from one runner into a single hyperfine-style JSON file.
"""
from __future__ import annotations
import argparse
import json
import statistics
from pathlib import Path
from typing import Any
from compare_benchmarks import Benchmark, extract_benchmarks
def collect_benchmarks(paths: list[Path]) -> dict[str, list[Benchmark]]:
"""Collect benchmarks from multiple JSON files.
Args:
paths (list[Path]): Paths to hyperfine, pytest-benchmark, or compact
mapping JSON files.
Returns:
dict[str, list[Benchmark]]: Benchmarks grouped by benchmark name.
"""
collected: dict[str, list[Benchmark]] = {}
for path in paths:
for name, benchmark in extract_benchmarks(path).items():
collected.setdefault(name, []).append(benchmark)
return collected
def aggregate(collected: dict[str, list[Benchmark]]) -> dict[str, dict[str, object]]:
"""Aggregate grouped benchmarks using the median value.
Args:
collected (dict[str, list[Benchmark]]): Benchmarks grouped by benchmark
name.
Returns:
dict[str, dict[str, object]]: Compact mapping JSON data. Each benchmark
contains ``value``, ``unit``, ``metric``, ``attempts``, and
``attempt_values``.
"""
aggregated: dict[str, dict[str, object]] = {}
for name, benchmarks in sorted(collected.items()):
values = [benchmark.value for benchmark in benchmarks]
unit = next((benchmark.unit for benchmark in benchmarks if benchmark.unit), "")
metric = next((benchmark.metric for benchmark in benchmarks if benchmark.metric), "value")
aggregated[name] = {
"value": statistics.median(values),
"unit": unit,
"metric": f"median-of-attempt-{metric}",
"attempts": len(values),
"attempt_values": values,
}
return aggregated
def merge_hyperfine_results(paths: list[Path]) -> dict[str, Any]:
"""Merge hyperfine result files.
Args:
paths (list[Path]): Hyperfine JSON files to merge.
Returns:
dict[str, Any]: Hyperfine-style JSON object containing all result rows.
Raises:
ValueError: If any file has no hyperfine ``results`` list.
"""
merged: dict[str, Any] = {"results": []}
for path in paths:
data = json.loads(path.read_text(encoding="utf-8"))
results = data.get("results", []) if isinstance(data, dict) else None
if not isinstance(results, list):
raise ValueError(f"{path} has no hyperfine results list")
merged["results"].extend(results)
return merged
def main_from_paths(input_dir: Path, output: Path) -> int:
"""Aggregate all JSON files in a directory and write the result.
Args:
input_dir (Path): Directory containing benchmark JSON files.
output (Path): Path where the aggregate JSON should be written.
Returns:
int: Always ``0`` on success.
Raises:
ValueError: If no JSON files are found in ``input_dir``.
"""
paths = sorted(input_dir.rglob("*.json"))
if not paths:
raise ValueError(f"No benchmark JSON files found in {input_dir}")
output.parent.mkdir(parents=True, exist_ok=True)
output.write_text(
json.dumps(aggregate(collect_benchmarks(paths)), indent=2, sort_keys=True) + "\n",
encoding="utf-8",
)
return 0
def merge_from_paths(input_dir: Path, output: Path) -> int:
"""Merge all hyperfine JSON files in a directory and write the result.
Args:
input_dir (Path): Directory containing hyperfine JSON files.
output (Path): Path where the merged JSON should be written.
Returns:
int: Always ``0`` on success.
Raises:
ValueError: If no JSON files are found in ``input_dir``.
"""
paths = sorted(input_dir.glob("*.json"))
if not paths:
raise ValueError(f"No hyperfine JSON files found in {input_dir}")
output.parent.mkdir(parents=True, exist_ok=True)
output.write_text(
json.dumps(merge_hyperfine_results(paths), indent=2, sort_keys=True) + "\n",
encoding="utf-8",
)
return 0
def main() -> int:
"""Run the benchmark aggregation command line interface.
Returns:
int: Always ``0`` on success.
"""
parser = argparse.ArgumentParser()
parser.add_argument(
"--mode",
choices=("aggregate", "merge-hyperfine"),
default="aggregate",
help="Operation to perform.",
)
parser.add_argument("--input-dir", required=True, type=Path)
parser.add_argument("--output", required=True, type=Path)
args = parser.parse_args()
if args.mode == "merge-hyperfine":
return merge_from_paths(input_dir=args.input_dir, output=args.output)
return main_from_paths(input_dir=args.input_dir, output=args.output)
if __name__ == "__main__":
raise SystemExit(main())
-454
View File
@@ -1,454 +0,0 @@
##########################
### AI-generated file. ###
##########################
"""Compare benchmark JSON files and write a GitHub Actions summary.
The script supports JSON emitted by hyperfine, JSON emitted by pytest-benchmark,
and a compact mapping format generated by ``aggregate_benchmarks.py``. Timing
formats prefer median values and fall back to mean values when median values are
not present.
"""
from __future__ import annotations
import argparse
import json
import math
from dataclasses import dataclass
from pathlib import Path
from typing import Any
@dataclass(frozen=True)
class Benchmark:
"""Normalized benchmark result.
Attributes:
name (str): Stable benchmark name used to match baseline and current results.
value (float): Numeric benchmark value used for comparison.
unit (str): Display unit for the value, for example ``"s"``.
metric (str): Source metric name, for example ``"median"`` or ``"mean"``.
"""
name: str
value: float
unit: str
metric: str = "value"
@dataclass(frozen=True)
class Comparison:
"""Comparison between one baseline benchmark and one current benchmark.
Attributes:
name (str): Benchmark name.
baseline (float): Baseline benchmark value.
current (float): Current benchmark value.
delta_percent (float): Percent change from baseline to current.
unit (str): Display unit for both values.
metric (str): Current result metric used for comparison.
regressed (bool): Whether the change exceeds the configured threshold in
the worse direction.
improved (bool): Whether the change exceeds the configured threshold in
the better direction.
"""
name: str
baseline: float
current: float
delta_percent: float
unit: str
metric: str
regressed: bool
improved: bool
def _read_json(path: Path) -> Any:
"""Read JSON data from a file.
Args:
path (Path): Path to the JSON file.
Returns:
Any: Parsed JSON value.
"""
with path.open("r", encoding="utf-8") as stream:
return json.load(stream)
def _as_float(value: Any) -> float | None:
"""Convert a value to a finite float.
Args:
value (Any): Value to convert.
Returns:
float | None: Converted finite float, or ``None`` if conversion fails.
"""
try:
result = float(value)
except (TypeError, ValueError):
return None
if math.isfinite(result):
return result
return None
def _extract_hyperfine(data: dict[str, Any]) -> dict[str, Benchmark]:
"""Extract normalized benchmarks from hyperfine JSON.
Args:
data (dict[str, Any]): Parsed hyperfine JSON object.
Returns:
dict[str, Benchmark]: Benchmarks keyed by command name.
"""
benchmarks: dict[str, Benchmark] = {}
for result in data.get("results", []):
if not isinstance(result, dict):
continue
name = str(result.get("command") or result.get("name") or "").strip()
metric = "median"
value = _as_float(result.get(metric))
if value is None:
metric = "mean"
value = _as_float(result.get(metric))
if name and value is not None:
benchmarks[name] = Benchmark(name=name, value=value, unit="s", metric=metric)
return benchmarks
def _extract_pytest_benchmark(data: dict[str, Any]) -> dict[str, Benchmark]:
"""Extract normalized benchmarks from pytest-benchmark JSON.
Args:
data (dict[str, Any]): Parsed pytest-benchmark JSON object.
Returns:
dict[str, Benchmark]: Benchmarks keyed by full benchmark name.
"""
benchmarks: dict[str, Benchmark] = {}
for benchmark in data.get("benchmarks", []):
if not isinstance(benchmark, dict):
continue
name = str(benchmark.get("fullname") or benchmark.get("name") or "").strip()
stats = benchmark.get("stats", {})
value = None
metric = "median"
if isinstance(stats, dict):
value = _as_float(stats.get(metric))
if value is None:
metric = "mean"
value = _as_float(stats.get(metric))
if name and value is not None:
benchmarks[name] = Benchmark(name=name, value=value, unit="s", metric=metric)
return benchmarks
def _extract_simple_mapping(data: dict[str, Any]) -> dict[str, Benchmark]:
"""Extract normalized benchmarks from a compact mapping JSON object.
Args:
data (dict[str, Any]): Parsed mapping where each benchmark is either a
raw number or an object containing ``value``, ``unit``, and ``metric``.
Returns:
dict[str, Benchmark]: Benchmarks keyed by mapping key.
"""
benchmarks: dict[str, Benchmark] = {}
for name, raw_value in data.items():
if name in {"version", "context", "commit", "timestamp"}:
continue
value = _as_float(raw_value)
unit = ""
metric = "value"
if value is None and isinstance(raw_value, dict):
value = _as_float(raw_value.get("value"))
unit = str(raw_value.get("unit") or "")
metric = str(raw_value.get("metric") or "value")
if value is not None:
benchmarks[str(name)] = Benchmark(name=str(name), value=value, unit=unit, metric=metric)
return benchmarks
def extract_benchmarks(path: Path) -> dict[str, Benchmark]:
"""Extract normalized benchmarks from a supported JSON file.
Args:
path (Path): Path to a hyperfine, pytest-benchmark, or compact mapping
JSON file.
Returns:
dict[str, Benchmark]: Normalized benchmarks keyed by name.
Raises:
ValueError: If the JSON root is not an object or no supported benchmark
entries can be extracted.
"""
data = _read_json(path)
if not isinstance(data, dict):
raise ValueError(f"{path} must contain a JSON object")
extractors = (_extract_hyperfine, _extract_pytest_benchmark, _extract_simple_mapping)
for extractor in extractors:
benchmarks = extractor(data)
if benchmarks:
return benchmarks
raise ValueError(f"No supported benchmark entries found in {path}")
def compare_benchmarks(
baseline: dict[str, Benchmark],
current: dict[str, Benchmark],
threshold_percent: float,
higher_is_better: bool,
) -> tuple[list[Comparison], list[str], list[str]]:
"""Compare baseline benchmarks with current benchmarks.
Args:
baseline (dict[str, Benchmark]): Baseline benchmarks keyed by name.
current (dict[str, Benchmark]): Current benchmarks keyed by name.
threshold_percent (float): Regression threshold in percent.
higher_is_better (bool): If ``True``, lower current values are treated as
regressions. If ``False``, higher current values are treated as
regressions.
Returns:
tuple[list[Comparison], list[str], list[str]]: Comparisons for common
benchmark names, names missing from current results, and names newly
present in current results.
"""
comparisons: list[Comparison] = []
missing_in_current: list[str] = []
new_in_current: list[str] = []
for name, baseline_benchmark in sorted(baseline.items()):
current_benchmark = current.get(name)
if current_benchmark is None:
missing_in_current.append(name)
continue
if baseline_benchmark.value == 0:
delta_percent = 0.0
else:
delta_percent = (
(current_benchmark.value - baseline_benchmark.value)
/ abs(baseline_benchmark.value)
* 100
)
if higher_is_better:
regressed = delta_percent <= -threshold_percent
improved = delta_percent >= threshold_percent
else:
regressed = delta_percent >= threshold_percent
improved = delta_percent <= -threshold_percent
comparisons.append(
Comparison(
name=name,
baseline=baseline_benchmark.value,
current=current_benchmark.value,
delta_percent=delta_percent,
unit=current_benchmark.unit or baseline_benchmark.unit,
metric=current_benchmark.metric,
regressed=regressed,
improved=improved,
)
)
for name in sorted(set(current) - set(baseline)):
new_in_current.append(name)
return comparisons, missing_in_current, new_in_current
def _format_value(value: float, unit: str) -> str:
"""Format a benchmark value for Markdown output.
Args:
value (float): Numeric benchmark value.
unit (str): Display unit.
Returns:
str: Formatted value with optional unit suffix.
"""
suffix = f" {unit}" if unit else ""
return f"{value:.6g}{suffix}"
def _format_status(comparison: Comparison) -> str:
"""Format a comparison status for Markdown output."""
if comparison.regressed:
return ":red_circle: regressed"
if comparison.improved:
return ":green_circle: improved"
return "ok"
def write_summary(
path: Path,
comparisons: list[Comparison],
missing_in_current: list[str],
new_in_current: list[str],
threshold_percent: float,
higher_is_better: bool,
) -> None:
"""Write a Markdown benchmark comparison summary.
Args:
path (Path): Path where the summary should be written.
comparisons (list[Comparison]): Comparison rows for matching benchmarks.
missing_in_current (list[str]): Baseline benchmark names missing from the
current result.
new_in_current (list[str]): Current benchmark names not present in the
baseline result.
threshold_percent (float): Regression threshold in percent.
higher_is_better (bool): Whether higher benchmark values are considered
better.
"""
regressions = [comparison for comparison in comparisons if comparison.regressed]
improvements = [comparison for comparison in comparisons if comparison.improved]
direction = "higher is better" if higher_is_better else "lower is better"
sorted_comparisons = sorted(comparisons, key=lambda comparison: comparison.name)
lines = [
"<!-- bw-benchmark-comment -->",
"## Benchmark comparison",
"",
f"Threshold: {threshold_percent:g}% ({direction}).",
f"Result: {len(regressions)} regression(s), {len(improvements)} improvement(s) beyond threshold.",
]
lines.append("")
if regressions:
lines.extend(
[
f"{len(regressions)} benchmark(s) regressed beyond the configured threshold.",
"",
"| Benchmark | Baseline | Current | Change |",
"| --- | ---: | ---: | ---: |",
]
)
for comparison in regressions:
lines.append(
"| "
f"{comparison.name} | "
f"{_format_value(comparison.baseline, comparison.unit)} | "
f"{_format_value(comparison.current, comparison.unit)} | "
f"{comparison.delta_percent:+.2f}% |"
)
else:
lines.append("No benchmark regression exceeded the configured threshold.")
lines.append("")
if improvements:
lines.extend(
[
f"{len(improvements)} benchmark(s) improved beyond the configured threshold.",
"",
"| Benchmark | Baseline | Current | Change |",
"| --- | ---: | ---: | ---: |",
]
)
for comparison in improvements:
lines.append(
"| "
f"{comparison.name} | "
f"{_format_value(comparison.baseline, comparison.unit)} | "
f"{_format_value(comparison.current, comparison.unit)} | "
f"{comparison.delta_percent:+.2f}% |"
)
else:
lines.append("No benchmark improvement exceeded the configured threshold.")
if sorted_comparisons:
lines.extend(
[
"",
"<details>",
"<summary>All benchmark results</summary>",
"",
"| Benchmark | Baseline | Current | Change | Status |",
"| --- | ---: | ---: | ---: | --- |",
]
)
for comparison in sorted_comparisons:
lines.append(
"| "
f"{comparison.name} | "
f"{_format_value(comparison.baseline, comparison.unit)} | "
f"{_format_value(comparison.current, comparison.unit)} | "
f"{comparison.delta_percent:+.2f}% | "
f"{_format_status(comparison)} |"
)
lines.extend(["", "</details>"])
if missing_in_current:
lines.extend(["", "Missing benchmarks in the current run:"])
lines.extend(f"- `{name}`" for name in missing_in_current)
if new_in_current:
lines.extend(["", "New benchmarks in the current run:"])
lines.extend(f"- `{name}`" for name in new_in_current)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text("\n".join(lines) + "\n", encoding="utf-8")
def main() -> int:
"""Run the benchmark comparison command line interface.
Returns:
int: ``1`` when a regression exceeds the threshold, otherwise ``0``.
"""
parser = argparse.ArgumentParser()
parser.add_argument("--baseline", required=True, type=Path)
parser.add_argument("--current", required=True, type=Path)
parser.add_argument("--summary", required=True, type=Path)
parser.add_argument("--threshold-percent", required=True, type=float)
parser.add_argument("--higher-is-better", action="store_true")
args = parser.parse_args()
baseline = extract_benchmarks(args.baseline)
current = extract_benchmarks(args.current)
comparisons, missing_in_current, new_in_current = compare_benchmarks(
baseline=baseline,
current=current,
threshold_percent=args.threshold_percent,
higher_is_better=args.higher_is_better,
)
write_summary(
path=args.summary,
comparisons=comparisons,
missing_in_current=missing_in_current,
new_in_current=new_in_current,
threshold_percent=args.threshold_percent,
higher_is_better=args.higher_is_better,
)
return 1 if any(comparison.regressed for comparison in comparisons) else 0
if __name__ == "__main__":
raise SystemExit(main())
@@ -0,0 +1,342 @@
import functools
import os
from typing import Literal
import requests
from github import Github
from pydantic import BaseModel
class GHConfig(BaseModel):
token: str
organization: str
repository: str
project_number: int
graphql_url: str
rest_url: str
headers: dict
class ProjectItemHandler:
"""
A class to handle GitHub project items.
"""
def __init__(self, gh_config: GHConfig):
self.gh_config = gh_config
self.gh = Github(gh_config.token)
self.repo = self.gh.get_repo(f"{gh_config.organization}/{gh_config.repository}")
self.project_node_id = self.get_project_node_id()
def set_issue_status(
self,
status: Literal[
"Selected for Development",
"Weekly Backlog",
"In Development",
"Ready For Review",
"On Hold",
"Done",
],
issue_number: int | None = None,
issue_node_id: str | None = None,
):
"""
Set the status field of a GitHub issue in the project.
Args:
status (str): The status to set. Must be one of the predefined statuses.
issue_number (int, optional): The issue number. If not provided, issue_node_id must be provided.
issue_node_id (str, optional): The issue node ID. If not provided, issue_number must be provided.
"""
if not issue_number and not issue_node_id:
raise ValueError("Either issue_number or issue_node_id must be provided.")
if issue_number and issue_node_id:
raise ValueError("Only one of issue_number or issue_node_id must be provided.")
if issue_number is not None:
issue = self.repo.get_issue(issue_number)
issue_id = self.get_issue_info(issue.node_id)[0]["id"]
else:
issue_id = issue_node_id
field_id, option_id = self.get_status_field_id(field_name=status)
self.set_field_option(issue_id, field_id, option_id)
def run_graphql(self, query: str, variables: dict) -> dict:
"""
Execute a GraphQL query against the GitHub API.
Args:
query (str): The GraphQL query to execute.
variables (dict): The variables to pass to the query.
Returns:
dict: The response from the GitHub API.
"""
response = requests.post(
self.gh_config.graphql_url,
json={"query": query, "variables": variables},
headers=self.gh_config.headers,
timeout=10,
)
if response.status_code != 200:
raise Exception(
f"Query failed with status code {response.status_code}: {response.text}"
)
return response.json()
def get_project_node_id(self):
"""
Retrieve the project node ID from the GitHub API.
"""
query = """
query($owner: String!, $number: Int!) {
organization(login: $owner) {
projectV2(number: $number) {
id
}
}
}
"""
variables = {"owner": self.gh_config.organization, "number": self.gh_config.project_number}
resp = self.run_graphql(query, variables)
return resp["data"]["organization"]["projectV2"]["id"]
def get_issue_info(self, issue_node_id: str):
"""
Get the project-related information for a given issue node ID.
Args:
issue_node_id (str): The node ID of the issue. Please note that this is not the issue number and typically starts with "I".
Returns:
list[dict]: A list of project items associated with the issue.
"""
query = """
query($issueId: ID!) {
node(id: $issueId) {
... on Issue {
projectItems(first: 10) {
nodes {
project {
id
title
}
id
fieldValues(first: 20) {
nodes {
... on ProjectV2ItemFieldSingleSelectValue {
name
field {
... on ProjectV2SingleSelectField {
name
}
}
}
}
}
}
}
}
}
}
"""
variables = {"issueId": issue_node_id}
resp = self.run_graphql(query, variables)
return resp["data"]["node"]["projectItems"]["nodes"]
def get_status_field_id(
self,
field_name: Literal[
"Selected for Development",
"Weekly Backlog",
"In Development",
"Ready For Review",
"On Hold",
"Done",
],
) -> tuple[str, str]:
"""
Get the status field ID and option ID for the given field name in the project.
Args:
field_name (str): The name of the field to retrieve.
Must be one of the predefined statuses.
Returns:
tuple[str, str]: A tuple containing the field ID and option ID.
"""
field_id = None
option_id = None
project_fields = self.get_project_fields()
for field in project_fields:
if field["name"] != "Status":
continue
field_id = field["id"]
for option in field["options"]:
if option["name"] == field_name:
option_id = option["id"]
break
if not field_id or not option_id:
raise ValueError(f"Field '{field_name}' not found in project fields.")
return field_id, option_id
def set_field_option(self, item_id, field_id, option_id):
"""
Set the option of a project item for a single-select field.
Args:
item_id (str): The ID of the project item to update.
field_id (str): The ID of the field to update.
option_id (str): The ID of the option to set.
"""
mutation = """
mutation($projectId: ID!, $itemId: ID!, $fieldId: ID!, $optionId: String!) {
updateProjectV2ItemFieldValue(
input: {
projectId: $projectId
itemId: $itemId
fieldId: $fieldId
value: { singleSelectOptionId: $optionId }
}
) {
projectV2Item {
id
}
}
}
"""
variables = {
"projectId": self.project_node_id,
"itemId": item_id,
"fieldId": field_id,
"optionId": option_id,
}
return self.run_graphql(mutation, variables)
@functools.lru_cache(maxsize=1)
def get_project_fields(self) -> list[dict]:
"""
Get the available fields in the project.
This method caches the result to avoid multiple API calls.
Returns:
list[dict]: A list of fields in the project.
"""
query = """
query($projectId: ID!) {
node(id: $projectId) {
... on ProjectV2 {
fields(first: 50) {
nodes {
... on ProjectV2SingleSelectField {
id
name
options {
id
name
}
}
}
}
}
}
}
"""
variables = {"projectId": self.project_node_id}
resp = self.run_graphql(query, variables)
return list(filter(bool, resp["data"]["node"]["fields"]["nodes"]))
def get_pull_request_linked_issues(self, pr_number: int) -> list[dict]:
"""
Get the linked issues of a pull request.
Args:
pr_number (int): The pull request number.
Returns:
list[dict]: A list of linked issues.
"""
query = """
query($number: Int!, $owner: String!, $repo: String!) {
repository(owner: $owner, name: $repo) {
pullRequest(number: $number) {
id
closingIssuesReferences(first: 50) {
edges {
node {
id
body
number
title
}
}
}
}
}
}
"""
variables = {
"number": pr_number,
"owner": self.gh_config.organization,
"repo": self.gh_config.repository,
}
resp = self.run_graphql(query, variables)
edges = resp["data"]["repository"]["pullRequest"]["closingIssuesReferences"]["edges"]
return [edge["node"] for edge in edges if edge.get("node")]
def main():
# GitHub settings
token = os.getenv("TOKEN")
org = os.getenv("ORG")
repo = os.getenv("REPO")
project_number = os.getenv("PROJECT_NUMBER")
pr_number = os.getenv("PR_NUMBER")
if not token:
raise ValueError("GitHub token is not set. Please set the TOKEN environment variable.")
if not org:
raise ValueError("GitHub organization is not set. Please set the ORG environment variable.")
if not repo:
raise ValueError("GitHub repository is not set. Please set the REPO environment variable.")
if not project_number:
raise ValueError(
"GitHub project number is not set. Please set the PROJECT_NUMBER environment variable."
)
if not pr_number:
raise ValueError(
"Pull request number is not set. Please set the PR_NUMBER environment variable."
)
project_number = int(project_number)
pr_number = int(pr_number)
gh_config = GHConfig(
token=token,
organization=org,
repository=repo,
project_number=project_number,
graphql_url="https://api.github.com/graphql",
rest_url=f"https://api.github.com/repos/{org}/{repo}/issues",
headers={"Authorization": f"Bearer {token}", "Accept": "application/vnd.github+json"},
)
project_item_handler = ProjectItemHandler(gh_config=gh_config)
# Get PR info
pr = project_item_handler.repo.get_pull(pr_number)
# Get the linked issues of the pull request
linked_issues = project_item_handler.get_pull_request_linked_issues(pr_number=pr_number)
print(f"Linked issues: {linked_issues}")
target_status = "In Development" if pr.draft else "Ready For Review"
print(f"Target status: {target_status}")
for issue in linked_issues:
project_item_handler.set_issue_status(issue_number=issue["number"], status=target_status)
if __name__ == "__main__":
main()
@@ -0,0 +1,2 @@
pydantic
pygithub
-74
View File
@@ -1,74 +0,0 @@
#!/usr/bin/env bash
##########################
### AI-generated file. ###
##########################
set -euo pipefail
mkdir -p benchmark-results
benchmark_json="${BENCHMARK_JSON:-benchmark-results/current.json}"
benchmark_root="$(dirname "$benchmark_json")"
hyperfine_benchmark_dir="${BENCHMARK_HYPERFINE_DIR:-tests/benchmarks/hyperfine}"
pytest_benchmark_dirs="${BENCHMARK_PYTEST_DIRS:-${BENCHMARK_PYTEST_DIR:-}}"
benchmark_work_dir="$benchmark_root/raw-results"
hyperfine_json_dir="$benchmark_work_dir/hyperfine"
pytest_json="$benchmark_work_dir/pytest.json"
shopt -s nullglob
benchmark_scripts=()
benchmark_scripts=("$hyperfine_benchmark_dir"/benchmark_*.sh)
shopt -u nullglob
pytest_dirs=()
for pytest_benchmark_dir in $pytest_benchmark_dirs; do
if [ -d "$pytest_benchmark_dir" ]; then
pytest_dirs+=("$pytest_benchmark_dir")
else
echo "Pytest benchmark directory not found: $pytest_benchmark_dir" >&2
exit 1
fi
done
if [ "${#benchmark_scripts[@]}" -eq 0 ] && [ "${#pytest_dirs[@]}" -eq 0 ]; then
echo "No benchmark scripts or pytest benchmarks found" >&2
exit 1
fi
echo "Benchmark Python: $(command -v python)"
python -c 'import sys; print(sys.version)'
rm -rf "$benchmark_work_dir"
mkdir -p "$hyperfine_json_dir"
if [ "${#benchmark_scripts[@]}" -gt 0 ]; then
for benchmark_script in "${benchmark_scripts[@]}"; do
title="$(sed -n 's/^# BENCHMARK_TITLE:[[:space:]]*//p' "$benchmark_script" | head -n 1)"
if [ -z "$title" ]; then
title="$(basename "$benchmark_script" .sh)"
fi
benchmark_name="$(basename "$benchmark_script" .sh)"
benchmark_result_json="$hyperfine_json_dir/$benchmark_name.json"
echo "Preflight benchmark script: $benchmark_script"
bash "$benchmark_script"
hyperfine \
--show-output \
--warmup 1 \
--runs 5 \
--command-name "$title" \
--export-json "$benchmark_result_json" \
"bash $(printf "%q" "$benchmark_script")"
done
fi
if [ "${#pytest_dirs[@]}" -gt 0 ]; then
pytest \
-q "${pytest_dirs[@]}" \
--benchmark-only \
--benchmark-json "$pytest_json"
fi
python .github/scripts/aggregate_benchmarks.py \
--input-dir "$benchmark_work_dir" \
--output "$benchmark_json"
-125
View File
@@ -1,125 +0,0 @@
##########################
### AI-generated file. ###
##########################
"""Run a command with BEC e2e services available."""
from __future__ import annotations
import argparse
import os
import shutil
import subprocess
import tempfile
import time
from pathlib import Path
import bec_lib
from bec_ipython_client import BECIPythonClient
from bec_lib.redis_connector import RedisConnector
from bec_lib.service_config import ServiceConfig, ServiceConfigModel
from redis import Redis
def _wait_for_redis(host: str, port: int) -> None:
client = Redis(host=host, port=port)
deadline = time.monotonic() + 10
while time.monotonic() < deadline:
try:
if client.ping():
return
except Exception:
time.sleep(0.1)
raise RuntimeError(f"Redis did not start on {host}:{port}")
def _start_redis(files_path: Path, host: str, port: int) -> subprocess.Popen:
redis_server = shutil.which("redis-server")
if redis_server is None:
raise RuntimeError("redis-server executable not found")
return subprocess.Popen(
[
redis_server,
"--bind",
host,
"--port",
str(port),
"--save",
"",
"--appendonly",
"no",
"--dir",
str(files_path),
]
)
def _write_configs(files_path: Path, host: str, port: int) -> Path:
test_config = files_path / "test_config.yaml"
services_config = files_path / "services_config.yaml"
bec_lib_path = Path(bec_lib.__file__).resolve().parent
shutil.copyfile(bec_lib_path / "tests" / "test_config.yaml", test_config)
service_config = ServiceConfigModel(
redis={"host": host, "port": port}, file_writer={"base_path": str(files_path)}
)
services_config.write_text(service_config.model_dump_json(indent=4), encoding="utf-8")
return services_config
def _load_demo_config(services_config: Path) -> None:
bec = BECIPythonClient(ServiceConfig(services_config), RedisConnector, forced=True)
bec.start()
try:
bec.config.load_demo_config()
finally:
bec.shutdown()
bec._client._reset_singleton()
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("command", nargs=argparse.REMAINDER)
args = parser.parse_args()
if args.command[:1] == ["--"]:
args.command = args.command[1:]
if not args.command:
raise ValueError("No command provided")
host = "127.0.0.1"
port = 6379
with tempfile.TemporaryDirectory(prefix="bec-benchmark-") as tmp:
files_path = Path(tmp)
services_config = _write_configs(files_path, host, port)
redis_process = _start_redis(files_path, host, port)
processes = None
service_handler = None
try:
_wait_for_redis(host, port)
from bec_server.bec_server_utils.service_handler import ServiceHandler
service_handler = ServiceHandler(
bec_path=files_path, config_path=services_config, interface="subprocess"
)
processes = service_handler.start()
_load_demo_config(services_config)
env = os.environ.copy()
return subprocess.run(args.command, env=env, check=False).returncode
finally:
if service_handler is not None and processes is not None:
service_handler.stop(processes)
redis_process.terminate()
try:
redis_process.wait(timeout=10)
except subprocess.TimeoutExpired:
redis_process.kill()
if __name__ == "__main__":
raise SystemExit(main())
-242
View File
@@ -1,242 +0,0 @@
name: BW Benchmarks
on: [ workflow_call ]
permissions:
contents: read
env:
BENCHMARK_JSON: benchmark-results/current.json
BENCHMARK_BASELINE_JSON: gh-pages-benchmark-data/benchmarks/latest.json
BENCHMARK_SUMMARY: benchmark-results/summary.md
BENCHMARK_COMMAND: "bash .github/scripts/run_benchmarks.sh"
BENCHMARK_THRESHOLD_PERCENT: 20
BENCHMARK_HIGHER_IS_BETTER: false
jobs:
benchmark_attempt:
runs-on: ubuntu-latest
continue-on-error: true
permissions:
contents: read
defaults:
run:
shell: bash -el {0}
strategy:
fail-fast: false
matrix:
attempt: [ 1, 2, 3 ]
env:
BENCHMARK_JSON: benchmark-results/current-${{ matrix.attempt }}.json
BEC_CORE_BRANCH: main
OPHYD_DEVICES_BRANCH: main
PLUGIN_REPO_BRANCH: main
BENCHMARK_PYTEST_DIRS: tests/unit_tests/benchmarks
QTWEBENGINE_DISABLE_SANDBOX: 1
QT_QPA_PLATFORM: "offscreen"
steps:
- name: Checkout BEC Widgets
uses: actions/checkout@v4
with:
repository: bec-project/bec_widgets
ref: ${{ github.event.pull_request.head.sha || github.sha }}
- name: Set up Conda
uses: conda-incubator/setup-miniconda@v3
with:
auto-update-conda: true
auto-activate-base: true
python-version: "3.11"
- name: Install system dependencies
run: |
sudo apt-get update
sudo apt-get install -y libgl1 libegl1 x11-utils libxkbcommon-x11-0 libdbus-1-3 xvfb
sudo apt-get -y install libnss3 libxdamage1 libasound2t64 libatomic1 libxcursor1
sudo apt-get -y install ttyd hyperfine redis-server
- name: Install full e2e environment
run: |
echo -e "\033[35;1m Using branch $BEC_CORE_BRANCH of BEC CORE \033[0;m";
git clone --branch "$BEC_CORE_BRANCH" https://github.com/bec-project/bec.git
echo -e "\033[35;1m Using branch $OPHYD_DEVICES_BRANCH of OPHYD_DEVICES \033[0;m";
git clone --branch "$OPHYD_DEVICES_BRANCH" https://github.com/bec-project/ophyd_devices.git
export OHPYD_DEVICES_PATH=$PWD/ophyd_devices
echo -e "\033[35;1m Using branch $PLUGIN_REPO_BRANCH of bec_testing_plugin \033[0;m";
git clone --branch "$PLUGIN_REPO_BRANCH" https://github.com/bec-project/bec_testing_plugin.git
cd ./bec
conda create -q -n test-environment python=3.11
conda activate test-environment
source ./bin/install_bec_dev.sh -t
cd ../
python -m pip install -e ./ophyd_devices -e .[dev,pyside6] -e ./bec_testing_plugin pytest-benchmark
mkdir -p "$(dirname "$BENCHMARK_JSON")"
python .github/scripts/run_with_bec_servers.py -- bash -lc "$BENCHMARK_COMMAND"
test -s "$BENCHMARK_JSON"
- name: Upload benchmark artifact
uses: actions/upload-artifact@v4
with:
name: bw-benchmark-json-${{ matrix.attempt }}
path: ${{ env.BENCHMARK_JSON }}
benchmark:
needs: [ benchmark_attempt ]
runs-on: ubuntu-latest
permissions:
contents: read
issues: write
pull-requests: write
steps:
- name: Checkout BEC Widgets
uses: actions/checkout@v4
with:
repository: bec-project/bec_widgets
ref: ${{ github.event.pull_request.head.sha || github.sha }}
- name: Download benchmark attempts
uses: actions/download-artifact@v4
with:
pattern: bw-benchmark-json-*
path: benchmark-results/attempts
merge-multiple: true
- name: Aggregate benchmark attempts
run: |
python .github/scripts/aggregate_benchmarks.py \
--input-dir benchmark-results/attempts \
--output "$BENCHMARK_JSON"
- name: Upload aggregate benchmark artifact
uses: actions/upload-artifact@v4
with:
name: bw-benchmark-json
path: ${{ env.BENCHMARK_JSON }}
- name: Fetch gh-pages benchmark data
run: |
if git ls-remote --exit-code --heads origin gh-pages; then
git clone --depth=1 --branch gh-pages "$GITHUB_SERVER_URL/$GITHUB_REPOSITORY.git" gh-pages-benchmark-data
else
mkdir -p gh-pages-benchmark-data
fi
- name: Compare with latest gh-pages benchmark
id: compare
continue-on-error: true
run: |
if [ ! -s "$BENCHMARK_BASELINE_JSON" ]; then
mkdir -p "$(dirname "$BENCHMARK_SUMMARY")"
{
echo "<!-- bw-benchmark-comment -->"
echo "## Benchmark comparison"
echo
echo "No benchmark baseline was found on gh-pages."
} > "$BENCHMARK_SUMMARY"
exit 0
fi
args=(
--baseline "$BENCHMARK_BASELINE_JSON"
--current "$BENCHMARK_JSON"
--summary "$BENCHMARK_SUMMARY"
--threshold-percent "$BENCHMARK_THRESHOLD_PERCENT"
)
if [ "$BENCHMARK_HIGHER_IS_BETTER" = "true" ]; then
args+=(--higher-is-better)
fi
set +e
python .github/scripts/compare_benchmarks.py "${args[@]}"
status=$?
set -e
if [ ! -s "$BENCHMARK_SUMMARY" ]; then
mkdir -p "$(dirname "$BENCHMARK_SUMMARY")"
{
echo "<!-- bw-benchmark-comment -->"
echo "## Benchmark comparison"
echo
echo "Benchmark comparison failed before writing a summary."
} > "$BENCHMARK_SUMMARY"
fi
exit "$status"
- name: Find existing benchmark PR comment
if: github.event_name == 'pull_request'
id: fc
uses: peter-evans/find-comment@v3
with:
issue-number: ${{ github.event.pull_request.number }}
comment-author: github-actions[bot]
body-includes: "<!-- bw-benchmark-comment -->"
- name: Create or update benchmark PR comment
if: github.event_name == 'pull_request'
uses: peter-evans/create-or-update-comment@v5
with:
issue-number: ${{ github.event.pull_request.number }}
comment-id: ${{ steps.fc.outputs.comment-id }}
body-path: ${{ env.BENCHMARK_SUMMARY }}
edit-mode: replace
- name: Fail on benchmark regression
if: github.event_name == 'pull_request' && steps.compare.outcome == 'failure'
run: exit 1
publish:
needs: [ benchmark ]
if: github.event_name == 'push' && github.ref == 'refs/heads/main'
runs-on: ubuntu-latest
permissions:
contents: write
steps:
- name: Checkout BEC Widgets
uses: actions/checkout@v4
with:
repository: bec-project/bec_widgets
ref: ${{ github.sha }}
- name: Download aggregate benchmark artifact
uses: actions/download-artifact@v4
with:
name: bw-benchmark-json
path: benchmark-results
- name: Verify aggregate benchmark artifact
run: test -s "$BENCHMARK_JSON"
- name: Prepare gh-pages for publishing
run: |
# Clean up any existing worktree/directory
if [ -d gh-pages-benchmark-data ]; then
git worktree remove gh-pages-benchmark-data --force || rm -rf gh-pages-benchmark-data
fi
if git ls-remote --exit-code --heads origin gh-pages; then
git fetch --depth=1 origin gh-pages
git worktree add gh-pages-benchmark-data FETCH_HEAD
else
git worktree add --detach gh-pages-benchmark-data
git -C gh-pages-benchmark-data checkout --orphan gh-pages
git -C gh-pages-benchmark-data rm -rf .
fi
- name: Publish benchmark data to gh-pages
working-directory: gh-pages-benchmark-data
run: |
mkdir -p benchmarks/history
cp "../$BENCHMARK_JSON" benchmarks/latest.json
cp "../$BENCHMARK_JSON" "benchmarks/history/${GITHUB_SHA}.json"
git config user.name "github-actions[bot]"
git config user.email "41898282+github-actions[bot]@users.noreply.github.com"
git add benchmarks/latest.json "benchmarks/history/${GITHUB_SHA}.json"
git commit -m "Update BW benchmark data for ${GITHUB_SHA}" || exit 0
git push origin HEAD:gh-pages
-85
View File
@@ -1,85 +0,0 @@
name: Run Pytest with Coverage
on:
workflow_call:
inputs:
BEC_CORE_BRANCH:
description: 'Branch for BEC Core'
required: false
default: 'main'
type: string
OPHYD_DEVICES_BRANCH:
description: 'Branch for Ophyd Devices'
required: false
default: 'main'
type: string
BEC_WIDGETS_BRANCH:
description: 'Branch for BEC Widgets'
required: false
default: 'main'
type: string
jobs:
bec:
name: BEC Unit Tests
runs-on: ubuntu-latest
defaults:
run:
shell: bash -el {0}
steps:
- name: Checkout BEC
uses: actions/checkout@v4
with:
repository: bec-project/bec
ref: ${{ inputs.BEC_CORE_BRANCH }}
- name: Install BEC and dependencies
uses: ./.github/actions/bec_install
with:
BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH }}
OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH }}
PYTHON_VERSION: '3.11'
- name: Run Pytest
run: |
cd ./bec
pip install pytest pytest-random-order
pytest -v --maxfail=2 --junitxml=report.xml --random-order ./bec_server/tests ./bec_ipython_client/tests/client_tests ./bec_lib/tests
- name: Upload BEC unit test artifacts if job fails
if: failure()
uses: actions/upload-artifact@v4
with:
name: bec-unit-test-artifacts
path: |
./bec/report.xml
./bec/logs/*.log
if-no-files-found: ignore
retention-days: 7
bec-e2e-test:
name: BEC End2End Tests
runs-on: ubuntu-latest
steps:
- name: Checkout BEC
uses: actions/checkout@v4
with:
repository: bec-project/bec
ref: ${{ inputs.BEC_CORE_BRANCH }}
- name: Run E2E Tests
uses: ./.github/actions/bec_e2e_install
with:
BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH }}
OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH }}
BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH }}
PYTHON_VERSION: '3.11'
- name: Upload BEC e2e logs if job fails
if: failure()
uses: actions/upload-artifact@v4
with:
name: bec-e2e-test-logs
path: ./_e2e_test_checkout_/bec/logs/*.log
if-no-files-found: ignore
retention-days: 7
+5 -39
View File
@@ -1,29 +1,24 @@
name: Full CI
on:
on:
push:
pull_request:
workflow_dispatch:
inputs:
BEC_WIDGETS_BRANCH:
description: "Branch of BEC Widgets to install"
description: 'Branch of BEC Widgets to install'
required: false
type: string
BEC_CORE_BRANCH:
description: "Branch of BEC Core to install"
description: 'Branch of BEC Core to install'
required: false
type: string
OPHYD_DEVICES_BRANCH:
description: "Branch of Ophyd Devices to install"
description: 'Branch of Ophyd Devices to install'
required: false
type: string
concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true
permissions:
pull-requests: write
contents: read
jobs:
check_pr_status:
@@ -34,15 +29,6 @@ jobs:
if: needs.check_pr_status.outputs.branch-pr == ''
uses: ./.github/workflows/formatter.yml
benchmark:
needs: [check_pr_status]
if: needs.check_pr_status.outputs.branch-pr == ''
permissions:
contents: write
issues: write
pull-requests: write
uses: ./.github/workflows/benchmark.yml
unit-test:
needs: [check_pr_status, formatter]
if: needs.check_pr_status.outputs.branch-pr == ''
@@ -71,24 +57,4 @@ jobs:
end2end-test:
needs: [check_pr_status, formatter]
if: needs.check_pr_status.outputs.branch-pr == ''
uses: ./.github/workflows/end2end-conda.yml
child-repos:
needs: [check_pr_status, formatter]
if: needs.check_pr_status.outputs.branch-pr == ''
uses: ./.github/workflows/child_repos.yml
with:
BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main'}}
BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha }}
plugin_repos:
needs: [check_pr_status, formatter]
if: needs.check_pr_status.outputs.branch-pr == ''
uses: bec-project/bec/.github/workflows/plugin_repos.yml@main
with:
BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha }}
secrets:
GH_READ_TOKEN: ${{ secrets.GH_READ_TOKEN }}
uses: ./.github/workflows/end2end-conda.yml
+9 -10
View File
@@ -9,10 +9,10 @@ jobs:
shell: bash -el {0}
env:
CHILD_PIPELINE_BRANCH: main # Set the branch you want for ophyd_devices
BEC_CORE_BRANCH: main # Set the branch you want for bec
OPHYD_DEVICES_BRANCH: main # Set the branch you want for ophyd_devices
PLUGIN_REPO_BRANCH: main # Set the branch you want for the plugin repo
CHILD_PIPELINE_BRANCH: main # Set the branch you want for ophyd_devices
BEC_CORE_BRANCH: main # Set the branch you want for bec
OPHYD_DEVICES_BRANCH: main # Set the branch you want for ophyd_devices
PLUGIN_REPO_BRANCH: main # Set the branch you want for the plugin repo
PROJECT_PATH: ${{ github.repository }}
QTWEBENGINE_DISABLE_SANDBOX: 1
QT_QPA_PLATFORM: "offscreen"
@@ -23,16 +23,15 @@ jobs:
- name: Set up Conda
uses: conda-incubator/setup-miniconda@v3
with:
auto-update-conda: true
auto-activate-base: true
python-version: "3.11"
auto-update-conda: true
auto-activate-base: true
python-version: '3.11'
- name: Install dependencies
run: |
sudo apt-get update
sudo apt-get install -y libgl1 libegl1 x11-utils libxkbcommon-x11-0 libdbus-1-3 xvfb
sudo apt-get -y install libnss3 libxdamage1 libasound2t64 libatomic1 libxcursor1
sudo apt-get -y install ttyd
- name: Conda install and run pytest
run: |
@@ -55,5 +54,5 @@ jobs:
uses: actions/upload-artifact@v4
with:
name: pytest-logs
path: ./bec/logs/*.log
retention-days: 7
path: ./logs/*.log
retention-days: 7
+14 -13
View File
@@ -1,25 +1,25 @@
name: Run Pytest with different Python versions
on:
on:
workflow_call:
inputs:
pr_number:
description: "Pull request number"
description: 'Pull request number'
required: false
type: number
BEC_CORE_BRANCH:
description: "Branch of BEC Core to install"
description: 'Branch of BEC Core to install'
required: false
default: "main"
default: 'main'
type: string
OPHYD_DEVICES_BRANCH:
description: "Branch of Ophyd Devices to install"
description: 'Branch of Ophyd Devices to install'
required: false
default: "main"
default: 'main'
type: string
BEC_WIDGETS_BRANCH:
description: "Branch of BEC Widgets to install"
description: 'Branch of BEC Widgets to install'
required: false
default: "main"
default: 'main'
type: string
jobs:
@@ -27,17 +27,18 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.11", "3.12", "3.13"]
python-version: ["3.10", "3.11", "3.12"]
env:
BEC_WIDGETS_BRANCH: main # Set the branch you want for bec_widgets
BEC_CORE_BRANCH: main # Set the branch you want for bec
OPHYD_DEVICES_BRANCH: main # Set the branch you want for ophyd_devices
BEC_WIDGETS_BRANCH: main # Set the branch you want for bec_widgets
BEC_CORE_BRANCH: main # Set the branch you want for bec
OPHYD_DEVICES_BRANCH: main # Set the branch you want for ophyd_devices
PROJECT_PATH: ${{ github.repository }}
QTWEBENGINE_DISABLE_SANDBOX: 1
QT_QPA_PLATFORM: "offscreen"
steps:
- name: Checkout BEC Widgets
uses: actions/checkout@v4
with:
@@ -55,4 +56,4 @@ jobs:
- name: Run Pytest
run: |
pip install pytest pytest-random-order
pytest -v --junitxml=report.xml --random-order --ignore=tests/unit_tests/benchmarks ./tests/unit_tests
pytest -v --junitxml=report.xml --random-order ./tests/unit_tests
+12 -18
View File
@@ -1,30 +1,32 @@
name: Run Pytest with Coverage
on:
on:
workflow_call:
inputs:
pr_number:
description: "Pull request number"
description: 'Pull request number'
required: false
type: number
BEC_CORE_BRANCH:
description: "Branch of BEC Core to install"
description: 'Branch of BEC Core to install'
required: false
default: "main"
default: 'main'
type: string
OPHYD_DEVICES_BRANCH:
description: "Branch of Ophyd Devices to install"
description: 'Branch of Ophyd Devices to install'
required: false
default: "main"
default: 'main'
type: string
BEC_WIDGETS_BRANCH:
description: "Branch of BEC Widgets to install"
description: 'Branch of BEC Widgets to install'
required: false
default: "main"
default: 'main'
type: string
secrets:
CODECOV_TOKEN:
required: true
permissions:
pull-requests: write
@@ -53,18 +55,10 @@ jobs:
- name: Run Pytest with Coverage
id: coverage
run: pytest --random-order --cov=bec_widgets --cov-config=pyproject.toml --cov-branch --cov-report=xml --no-cov-on-fail --ignore=tests/unit_tests/benchmarks tests/unit_tests/
- name: Upload test artifacts
uses: actions/upload-artifact@v4
if: failure()
with:
name: image-references
path: bec_widgets/tests/reference_failures/
if-no-files-found: ignore
run: pytest --random-order --cov=bec_widgets --cov-config=pyproject.toml --cov-branch --cov-report=xml --no-cov-on-fail tests/unit_tests/
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v5
with:
token: ${{ secrets.CODECOV_TOKEN }}
slug: bec-project/bec_widgets
slug: bec-project/bec_widgets
+4 -8
View File
@@ -2,18 +2,14 @@ name: 'Close stale issues and PRs'
on:
schedule:
- cron: '00 10 * * *'
workflow_dispatch:
jobs:
stale:
runs-on: ubuntu-latest
permissions:
issues: write
pull-requests: write
steps:
- uses: actions/stale@v9
with:
stale-issue-message: 'This issue is stale because it has been open 120 days with no activity. Remove stale label or comment or this will be closed in 14 days.'
stale-pr-message: 'This PR is stale because it has been open 120 days with no activity. Remove stale label or comment or this will be closed in 14 days.'
days-before-stale: 120
days-before-close: 14
stale-issue-message: 'This issue is stale because it has been open 60 days with no activity. Remove stale label or comment or this will be closed in 7 days.'
stale-pr-message: 'This PR is stale because it has been open 60 days with no activity. Remove stale label or comment or this will be closed in 7 days.'
days-before-stale: 60
days-before-close: 7
+24 -19
View File
@@ -2,18 +2,7 @@ name: Sync PR to Project
on:
pull_request:
types:
[
opened,
assigned,
unassigned,
edited,
ready_for_review,
converted_to_draft,
reopened,
synchronize,
closed,
]
types: [opened, edited, ready_for_review, converted_to_draft, reopened, synchronize]
jobs:
sync-project:
@@ -24,12 +13,28 @@ jobs:
pull-requests: read
contents: read
env:
PROJECT_NUMBER: 3 # BEC Project
ORG: 'bec-project'
REPO: 'bec_widgets'
TOKEN: ${{ secrets.ADD_ISSUE_TO_PROJECT }}
PR_NUMBER: ${{ github.event.pull_request.number }}
steps:
- name: Sync PR to Project
uses: bec-project/action-issue-sync-pr@v1
- name: Set up python environment
uses: actions/setup-python@v4
with:
token: ${{ secrets.ADD_ISSUE_TO_PROJECT }}
org: ${{ github.repository_owner }}
repo: ${{ github.event.repository.name }}
project-number: 3
pr-number: ${{ github.event.pull_request.number }}
python-version: 3.11
- name: Checkout repo
uses: actions/checkout@v4
with:
repository: ${{ github.repository }}
ref: ${{ github.event.pull_request.head.ref }}
- name: Install dependencies
run: |
pip install -r ./.github/scripts/pr_issue_sync/requirements.txt
- name: Sync PR to Project
run: |
python ./.github/scripts/pr_issue_sync/pr_issue_sync.py
+1 -3
View File
@@ -177,6 +177,4 @@ cython_debug/
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
#
tombi.toml
#.idea/
+289
View File
@@ -0,0 +1,289 @@
# This file is a template, and might need editing before it works on your project.
# Official language image. Look for the different tagged releases at:
# https://hub.docker.com/r/library/python/tags/
image: $CI_DEPENDENCY_PROXY_GROUP_IMAGE_PREFIX/python:3.11
#commands to run in the Docker container before starting each job.
variables:
DOCKER_TLS_CERTDIR: ""
BEC_CORE_BRANCH:
description: bec branch
value: main
OPHYD_DEVICES_BRANCH:
description: ophyd_devices branch
value: main
CHILD_PIPELINE_BRANCH: $CI_DEFAULT_BRANCH
CHECK_PKG_VERSIONS:
description: Whether to run additional tests against min/max/random selection of dependencies. Set to 1 for running.
value: 0
workflow:
rules:
- if: $CI_PIPELINE_SOURCE == "schedule"
- if: $CI_PIPELINE_SOURCE == "web"
- if: $CI_PIPELINE_SOURCE == "pipeline"
- if: $CI_PIPELINE_SOURCE == "parent_pipeline"
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
- if: $CI_COMMIT_BRANCH && $CI_OPEN_MERGE_REQUESTS
when: never
- if: $CI_COMMIT_BRANCH
include:
- template: Security/Secret-Detection.gitlab-ci.yml
- project: "bec/awi_utils"
file: "/templates/check-packages-job.yml"
inputs:
stage: test
path: "."
pytest_args: "-v,--random-order,tests/unit_tests"
pip_args: ".[dev]"
# different stages in the pipeline
stages:
- Formatter
- test
- AdditionalTests
- End2End
- Deploy
.install-qt-webengine-deps: &install-qt-webengine-deps
- apt-get -y install libnss3 libxdamage1 libasound2 libatomic1 libxcursor1
- export QTWEBENGINE_DISABLE_SANDBOX=1
.clone-repos: &clone-repos
- echo -e "\033[35;1m Using branch $BEC_CORE_BRANCH of BEC CORE \033[0;m";
- git clone --branch $BEC_CORE_BRANCH https://gitlab.psi.ch/bec/bec.git
- echo -e "\033[35;1m Using branch $OPHYD_DEVICES_BRANCH of OPHYD_DEVICES \033[0;m";
- git clone --branch $OPHYD_DEVICES_BRANCH https://gitlab.psi.ch/bec/ophyd_devices.git
- export OHPYD_DEVICES_PATH=$PWD/ophyd_devices
.install-repos: &install-repos
- pip install -e ./ophyd_devices
- pip install -e ./bec/bec_lib[dev]
- pip install -e ./bec/bec_ipython_client
- pip install -e ./bec/pytest_bec_e2e
.install-os-packages: &install-os-packages
- apt-get update
- apt-get install -y libgl1-mesa-glx libegl1-mesa x11-utils libxkbcommon-x11-0 libdbus-1-3 xvfb
- *install-qt-webengine-deps
before_script:
- if [[ "$CI_PROJECT_PATH" != "bec/bec_widgets" ]]; then
echo -e "\033[35;1m Using branch $CHILD_PIPELINE_BRANCH of BEC Widgets \033[0;m";
test -d bec_widgets || git clone --branch $CHILD_PIPELINE_BRANCH https://gitlab.psi.ch/bec/bec_widgets.git; cd bec_widgets;
fi
formatter:
stage: Formatter
needs: []
script:
- pip install -e ./[dev]
- isort --check --diff --line-length=100 --profile=black --multi-line=3 --trailing-comma ./
- black --check --diff --color --line-length=100 --skip-magic-trailing-comma ./
rules:
- if: $CI_PROJECT_PATH == "bec/bec_widgets"
pylint:
stage: Formatter
needs: []
before_script:
- pip install pylint pylint-exit anybadge
- pip install -e .[dev]
script:
- mkdir ./pylint
- pylint ./bec_widgets --output-format=text --output=./pylint/pylint.log | tee ./pylint/pylint.log || pylint-exit $?
- PYLINT_SCORE=$(sed -n 's/^Your code has been rated at \([-0-9.]*\)\/.*/\1/p' ./pylint/pylint.log)
- anybadge --label=Pylint --file=pylint/pylint.svg --value=$PYLINT_SCORE 2=red 4=orange 8=yellow 10=green
- echo "Pylint score is $PYLINT_SCORE"
artifacts:
paths:
- ./pylint/
expire_in: 1 week
rules:
- if: $CI_PROJECT_PATH == "bec/bec_widgets"
pylint-check:
stage: Formatter
needs: []
allow_failure: true
before_script:
- pip install pylint pylint-exit anybadge
- apt-get update
- apt-get install -y bc
script:
- git fetch origin $CI_MERGE_REQUEST_TARGET_BRANCH_NAME
# Identify changed Python files
- if [ "$CI_PIPELINE_SOURCE" == "merge_request_event" ]; then
TARGET_BRANCH_COMMIT_SHA=$(git rev-parse origin/$CI_MERGE_REQUEST_TARGET_BRANCH_NAME);
CHANGED_FILES=$(git diff --name-only $TARGET_BRANCH_COMMIT_SHA HEAD | grep '\.py$' || true);
else
CHANGED_FILES=$(git diff --name-only $CI_COMMIT_BEFORE_SHA $CI_COMMIT_SHA | grep '\.py$' || true);
fi
- if [ -z "$CHANGED_FILES" ]; then echo "No Python files changed."; exit 0; fi
- echo "Changed Python files:"
- $CHANGED_FILES
# Run pylint only on changed files
- mkdir ./pylint
- pylint $CHANGED_FILES --output-format=text | tee ./pylint/pylint_changed_files.log || pylint-exit $?
- PYLINT_SCORE=$(sed -n 's/^Your code has been rated at \([-0-9.]*\)\/.*/\1/p' ./pylint/pylint_changed_files.log)
- echo "Pylint score is $PYLINT_SCORE"
# Fail the job if the pylint score is below 9
- if [ "$(echo "$PYLINT_SCORE < 9" | bc)" -eq 1 ]; then echo "Your pylint score is below the acceptable threshold (9)."; exit 1; fi
artifacts:
paths:
- ./pylint/
expire_in: 1 week
rules:
- if: $CI_PROJECT_PATH == "bec/bec_widgets"
tests:
stage: test
needs: []
variables:
QT_QPA_PLATFORM: "offscreen"
script:
- *clone-repos
- *install-os-packages
- *install-repos
- pip install -e .[dev,pyside6]
- coverage run --source=./bec_widgets -m pytest -v --junitxml=report.xml --maxfail=2 --random-order --full-trace ./tests/unit_tests
- coverage report
- coverage xml
coverage: '/(?i)total.*? (100(?:\.0+)?\%|[1-9]?\d(?:\.\d+)?\%)$/'
artifacts:
reports:
junit: report.xml
coverage_report:
coverage_format: cobertura
path: coverage.xml
paths:
- tests/reference_failures/
when: always
generate-client-check:
stage: test
needs: []
variables:
QT_QPA_PLATFORM: "offscreen"
script:
- *clone-repos
- *install-os-packages
- *install-repos
- pip install -e .[dev,pyside6]
- bw-generate-cli --target bec_widgets
# if there are changes in the generated files, fail the job
- git diff --exit-code
test-matrix:
parallel:
matrix:
- PYTHON_VERSION:
- "3.10"
- "3.11"
- "3.12"
QT_PCKG:
- "pyside6"
stage: AdditionalTests
needs: []
variables:
QT_QPA_PLATFORM: "offscreen"
PYTHON_VERSION: ""
QT_PCKG: ""
image: $CI_DEPENDENCY_PROXY_GROUP_IMAGE_PREFIX/python:$PYTHON_VERSION
script:
- *clone-repos
- *install-os-packages
- *install-repos
- pip install -e .[dev,$QT_PCKG]
- pytest -v --maxfail=2 --junitxml=report.xml --random-order ./tests/unit_tests
end-2-end-conda:
stage: End2End
needs: []
image: continuumio/miniconda3:25.1.1-2
allow_failure: false
variables:
QT_QPA_PLATFORM: "offscreen"
script:
- *clone-repos
- *install-os-packages
- conda config --show-sources
- conda config --add channels conda-forge
- conda config --system --remove channels https://repo.anaconda.com/pkgs/main
- conda config --system --remove channels https://repo.anaconda.com/pkgs/r
- conda config --remove channels https://repo.anaconda.com/pkgs/main
- conda config --remove channels https://repo.anaconda.com/pkgs/r
- conda config --show-sources
- conda config --set channel_priority strict
- conda config --set always_yes yes --set changeps1 no
- conda create -q -n test-environment python=3.11
- conda init bash
- source ~/.bashrc
- conda activate test-environment
- cd ./bec
- source ./bin/install_bec_dev.sh -t
- cd ../
- pip install -e ./ophyd_devices
- pip install -e .[dev,pyside6]
- pytest -v --files-path ./ --start-servers --random-order ./tests/end-2-end
artifacts:
when: on_failure
paths:
- ./logs/*.log
expire_in: 1 week
rules:
- if: '$CI_PIPELINE_SOURCE == "schedule"'
- if: '$CI_PIPELINE_SOURCE == "web"'
- if: '$CI_PIPELINE_SOURCE == "pipeline"'
- if: '$CI_PIPELINE_SOURCE == "parent_pipeline"'
- if: '$CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "main"'
- if: '$CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "production"'
- if: "$CI_MERGE_REQUEST_TARGET_BRANCH_NAME =~ /^pre_release.*$/"
semver:
stage: Deploy
needs: ["tests"]
script:
- git config --global user.name "ci_update_bot"
- git config --global user.email "ci_update_bot@bec.ch"
- git checkout "$CI_COMMIT_REF_NAME"
- git reset --hard origin/"$CI_COMMIT_REF_NAME"
# delete all local tags
- git tag -l | xargs git tag -d
- git fetch --tags
- git tag
# build and publish package
- pip install python-semantic-release==9.* wheel build twine
- export GL_TOKEN=$CI_UPDATES
- semantic-release -vv version
# check if any artifacts were created
- if [ ! -d dist ]; then echo No release will be made; exit 0; fi
- twine upload dist/* -u __token__ -p $CI_PYPI_TOKEN --skip-existing
- semantic-release publish
allow_failure: false
rules:
- if: '$CI_COMMIT_REF_NAME == "main" && $CI_PROJECT_PATH == "bec/bec_widgets"'
pages:
stage: Deploy
needs: ["semver"]
variables:
TARGET_BRANCH: $CI_COMMIT_REF_NAME
rules:
- if: "$CI_COMMIT_TAG != null"
variables:
TARGET_BRANCH: $CI_COMMIT_TAG
- if: '$CI_COMMIT_REF_NAME == "main" && $CI_PROJECT_PATH == "bec/bec_widgets"'
script:
- curl -X POST -d "branches=$CI_COMMIT_REF_NAME" -d "token=$RTD_TOKEN" https://readthedocs.org/api/v2/webhook/bec-widgets/253243/
+1 -1
View File
@@ -52,7 +52,7 @@ persistent=yes
# Minimum Python version to use for version dependent checks. Will default to
# the version used to run pylint.
py-version=3.11
py-version=3.10
# When enabled, pylint would attempt to guess common misconfiguration and emit
# user-friendly hints instead of false-positive error messages.
-2393
View File
File diff suppressed because it is too large Load Diff
+47 -165
View File
@@ -1,199 +1,81 @@
![banner_opti](https://github.com/user-attachments/assets/44e483be-3f0d-4eb0-bd98-613157456b81)
# BEC Widgets
[![CI](https://github.com/bec-project/bec_widgets/actions/workflows/ci.yml/badge.svg)](https://github.com/bec-project/bec_widgets/actions/workflows/ci.yml)
[![badge](https://img.shields.io/pypi/v/bec-widgets)](https://pypi.org/project/bec-widgets/)
[![License](https://img.shields.io/github/license/bec-project/bec_widgets)](./LICENSE)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![Python](https://img.shields.io/badge/python-3.11%20%7C%203.12%20%7C%203.13-blue?logo=python&logoColor=white)](https://www.python.org)
[![Python](https://img.shields.io/badge/python-3.10%20%7C%203.11%20%7C%203.12-blue?logo=python&logoColor=white)](https://www.python.org)
[![PySide6](https://img.shields.io/badge/PySide6-blue?logo=qt&logoColor=white)](https://doc.qt.io/qtforpython/)
[![Conventional Commits](https://img.shields.io/badge/conventional%20commits-1.0.0-yellow?logo=conventionalcommits&logoColor=white)](https://conventionalcommits.org)
[![codecov](https://codecov.io/gh/bec-project/bec_widgets/graph/badge.svg?token=0Z9IQRJKMY)](https://codecov.io/gh/bec-project/bec_widgets)
A modular PySide6(Qt6) toolkit for [BEC (Beamline Experiment Control)](https://github.com/bec-project/bec). Create
high-performance, dockable GUIs to move devices, run scans, and stream live or disk data—powered by Redis and a modular
plugin system.
## Highlights
**⚠️ Important Notice:**
- **No-code first** — For ~90% of day-to-day workflows, you can compose, operate, and save workspaces **without writing
a single line of code**. Just launch, drag widgets, and do your experiment.
- **Flexible layout composition** — Build complex experiment GUIs in seconds with the `BECDockArea`: dragdock, tab,
split, and export profiles/workspaces for reuse.
- **CLI / scripting** — Control your beamline experiment from the command line a robust RPC layer using
`BECIPythonClient`.
- **Designer integration** — Use Qt Designer plugins to drop BEC widgets next to any Qt control, then launch the `.ui`
with the custom BEC loader for a zeroglue workflow.
- **Operational integration** — Widgets stay in sync with your running BEC/Redis as the single source of truth:
Subscribe to events from BEC and create dynamically updating UIs. BECWidgets also grants you easy access the
acquisition history.
- **Extensible by design** — Build new widgets with minimal boilerplate using `BECWidget` and `BECDispatcher` for BEC data and
messaging. Use the generator command to scaffold RPC interfaces and Designer plugin stubs; beamline plugins can extend
or override behavior as needed.
🚨 **PyQt6 is no longer supported** due to incompatibilities with Qt Designer. Please use **PySide6** instead. 🚨
## Table of Contents
- [Installation](#installation)
- [Features](#features)
- [1. Dock area interface: build GUIs in seconds](#1-dock-area-interface-build-guis-in-seconds)
- [2. Qt Designer plugins + BEC Launcher (no glue)](#2-qt-designer-plugins--bec-launcher-no-glue)
- [3. Robust RPC from CLI & remote scripting](#3-robust-rpc-from-cli--remote-scripting)
- [4. Rapid development (extensible by design)](#4-rapid-development-extensible-by-design)
- [Widget Library](#widget-library)
- [Documentation](#documentation)
- [License](#license)
BEC Widgets is a GUI framework designed for interaction with [BEC (Beamline Experiment Control)](https://gitlab.psi.ch/bec/bec).
## Installation
Use any of the following setups:
### Stable release
Use the package manager [pip](https://pip.pypa.io/en/stable/) to install BEC Widgets:
```bash
pip install bec_widgets
pip install bec_widgets[pyside6]
```
### From source (recommended for development)
For development purposes, you can clone the repository and install the package locally in editable mode:
```bash
git clone https://github.com/bec-project/bec_widgets.git
git clone https://gitlab.psi.ch/bec/bec-widgets
cd bec_widgets
pip install -e .[dev]
pip install -e .[dev,pyside6]
```
## Features
### 1. Dock area interface: build GUIs in seconds
The fastest way to explore BEC Widgets. Launch the BEC IPython client with simply `bec` in terminal and the **BECDockArea** opens as the default UI:
drag widgets, dock/tab/split panes, and explore. Everything is live—widgets auto-connect to BEC/Redis, so you can
operate immediately and refine later with RPC or Designer if needed.
![dock_area_example](https://github.com/user-attachments/assets/219a2806-19a8-4a07-9734-b7b554850833)
### 2. Qt Designer plugins + BEC Launcher (no glue)
All BEC Widgets ship as **Qt Designer plugins** with our custom Qt Designer launchable by `bec-designer`. Design your UI
visually in Designer, save a `.ui`, then launch it with
the **BEC Launcher**—no glue code. Widgets autoconnect to BEC/Redis on startup, so your UI is operational immediately.
![designer_opti](https://github.com/user-attachments/assets/fed4843c-1cce-438a-b41f-6636fa5e1545)
### 3. Robust RPC from CLI & remote scripting
Operate and automate BEC Widgets directly from the `BECIPythonClient`. Create or attach to GUIs, address any sub-widget
via a simple hierarchical API with tab-completion, and script event-driven behavior that reacts to BEC (scan lifecycle,
active devices, topics)—so your UI can be heavily automated.
- Create & control GUIs: launch, load profiles, open/close panels, tweak properties—all from the shell.
- Hierarchical addressing: navigate widgets and sub-widgets with discoverable paths and tab-completion.
- Event scripting: subscribe to BEC events (e.g., scan start/finish, device readiness, topic updates) and trigger
actions,switch profiles, open diagnostic views, or start specific scans.
- Remote & headless: run automation on analysis nodes or from notebooks without a local GUI process.
- Plays with no-code: Use the Dock Area / BEC Designer to set up the layout and add automation with RPC when needed.
![rpc_opti](https://github.com/user-attachments/assets/666be7fb-9a0d-44c2-8d44-2f9d1dae4497)
### 4. Rapid development (extensible by design)
Build new widgets fast: Inherit from `BECWidget`, list your RPC methods in `USER_ACCESS`, and use `bec_dispatcher` to
bind endpoints. Then run `bw-generate-cli --target <your-plugin-repo>`. This generates the RPC CLI bindings and a Qt
Designer plugin that are immediately usable with your BEC setup. Widgets
come online with live BEC/Redis wiring out of the box.
<details>
<summary> View code: Example Widget </summary>
```python
from typing import Literal
from qtpy.QtWidgets import QWidget, QLabel, QPushButton, QHBoxLayout, QVBoxLayout, QApplication
from qtpy.QtCore import Slot
from bec_lib.endpoints import MessageEndpoints
from bec_widgets import BECWidget, SafeSlot
class SimpleMotorWidget(BECWidget, QWidget):
USER_ACCESS = ["move"]
def __init__(self, parent=None, motor_name="samx", step=5.0, **kwargs):
super().__init__(parent=parent, **kwargs)
self.motor_name = motor_name
self.step = float(step)
self.get_bec_shortcuts()
self.value_label = QLabel(f"{self.motor_name}: —")
self.btn_left = QPushButton("◀︎ -5")
self.btn_right = QPushButton("+5 ▶︎")
row = QHBoxLayout()
row.addWidget(self.btn_left)
row.addWidget(self.btn_right)
col = QVBoxLayout(self)
col.addWidget(self.value_label)
col.addLayout(row)
self.btn_left.clicked.connect(lambda: self.move("left", self.step))
self.btn_right.clicked.connect(lambda: self.move("right", self.step))
self.bec_dispatcher.connect_slot(self.on_readback, MessageEndpoints.device_readback(self.motor_name))
@SafeSlot(dict, dict)
def on_readback(self, data: dict, meta: dict):
current_value = data.get("signals").get(self.motor_name).get('value')
self.value_label.setText(f"{self.motor_name}: {current_value:.3f}")
@Slot(str, float)
def move(self, direction: Literal["left", "right"] = "left", step: float = 5.0):
if direction == "left":
self.dev[self.motor_name].move(-step, relative=True)
else:
self.dev[self.motor_name].move(step, relative=True)
if __name__ == "__main__":
import sys
app = QApplication(sys.argv)
w = SimpleMotorWidget(motor_name="samx", step=5.0)
w.setWindowTitle("MotorJogWidget")
w.resize(280, 90)
w.show()
sys.exit(app.exec_())
```
</details>
## Widget Library
A large and growing catalog—plug, configure, run:
### Plotting
Waveform, MultiWaveform, and Image/Heatmap widgets deliver responsive plots with crosshairs and ROIs for live and
history data.
<img width="1108" height="838" alt="plotting_hr" src="https://github.com/user-attachments/assets/f50462a5-178d-44d4-aee5-d378c74b107b" />
### Scan orchestration and motion control.
Start and stop scans, track progress, reuse parameter presets, and browse history from a focused control surface.
Positioner boxes and tweak controls handle precise moves, homing, and calibration for daytoday alignment.
<img width="1496" height="1388" alt="control" src="https://github.com/user-attachments/assets/d4fb2e2e-04f9-4621-8087-790680797620" />
BEC Widgets now **only supports PySide6**. Users must manually install PySide6 as no default Qt distribution is
specified.
## Documentation
The documentation can be found [here](https://bec.readthedocs.io/).
Documentation of BEC Widgets can be found [here](https://bec-widgets.readthedocs.io/en/latest/). The documentation of the BEC can be found [here](https://bec.readthedocs.io/en/latest/).
## Contributing
All commits should use the Angular commit scheme:
> #### <a name="commit-header"></a>Angular Commit Message Header
>
> ```
> <type>(<scope>): <short summary>
> │ │ │
> │ │ └─⫸ Summary in present tense. Not capitalized. No period at the end.
> │ │
> │ └─⫸ Commit Scope: animations|bazel|benchpress|common|compiler|compiler-cli|core|
> │ elements|forms|http|language-service|localize|platform-browser|
> │ platform-browser-dynamic|platform-server|router|service-worker|
> │ upgrade|zone.js|packaging|changelog|docs-infra|migrations|ngcc|ve|
> │ devtools
>
> └─⫸ Commit Type: build|ci|docs|feat|fix|perf|refactor|test
> ```
>
> The `<type>` and `<summary>` fields are mandatory, the `(<scope>)` field is optional.
> ##### Type
>
> Must be one of the following:
>
> * **build**: Changes that affect the build system or external dependencies (example scopes: gulp, broccoli, npm)
> * **ci**: Changes to our CI configuration files and scripts (examples: CircleCi, SauceLabs)
> * **docs**: Documentation only changes
> * **feat**: A new feature
> * **fix**: A bug fix
> * **perf**: A code change that improves performance
> * **refactor**: A code change that neither fixes a bug nor adds a feature
> * **test**: Adding missing tests or correcting existing tests
## License
[BSD-3-Clause](https://choosealicense.com/licenses/bsd-3-clause/)
-28
View File
@@ -1,28 +0,0 @@
While BEC Widgets is shipped with BSD-3-Clause license, it includes third-party components with different licenses. Below is a list of these components along with their respective licenses.
Core Dependencies:
- BEC: BSD-3-Clause License, see [here](https://github.com/bec-project/bec/blob/main/LICENSE)
- black: MIT License, see [here](https://github.com/psf/black/blob/main/LICENSE)
- isort: MIT License, see [here](https://github.com/PyCQA/isort/blob/main/LICENSE)
- pydantic: MIT License, see [here](https://github.com/pydantic/pydantic/blob/main/LICENSE)
- pyqtgraph: MIT License, see [here](https://github.com/pyqtgraph/pyqtgraph/blob/master/LICENSE.txt)
- PySide6: LGPLv3 License, see [here](https://doc.qt.io/qtforpython/licenses.html)
- qtconsole: BSD-3-Clause License, see [here](https://github.com/spyder-ide/qtconsole/blob/main/LICENSE)
- qtpy: MIT License, see [here](https://github.com/spyder-ide/qtpy/blob/master/LICENSE.txt)
- qtmonaco: BSD-3-Clause License, see [here](https://github.com/bec-project/qtmonaco/blob/main/LICENSE)
- thefuzz: MIT License, see [here](https://github.com/seatgeek/thefuzz/blob/master/LICENSE.txt)
Additional Dependencies (Testing/Development):
- coverage: Apache License 2.0, see [here](https://github.com/coveragepy/coveragepy/blob/main/LICENSE.txt)
- fakeredis: BSD-3-Clause License, see [here](https://github.com/cunla/fakeredis-py/blob/master/LICENSE)
- pytest-bec-e2e: BSD-3-Clause License, see [here](https://github.com/bec-project/bec/blob/main/LICENSE)
- pytest-qt: MIT License, see [here](https://github.com/pytest-dev/pytest-qt/blob/master/LICENSE)
- pytest-random-order: MIT License, see [here](https://github.com/pytest-dev/pytest-random-order/blob/main/LICENSE)
- pytest-timeout: MIT License, see [here](https://github.com/pytest-dev/pytest-timeout/blob/main/LICENSE)
- pytest-xvfb: MIT License, see [here](https://github.com/The-Compiler/pytest-xvfb/blob/master/LICENSE)
- pytest: MIT License, see [here](https://github.com/pytest-dev/pytest/blob/main/LICENSE)
- pytest-cov: MIT License, see [here](https://github.com/pytest-dev/pytest-cov/blob/main/LICENSE)
- watchdog: Apache License 2.0, see [here](https://github.com/gorakhargosh/watchdog/blob/master/LICENSE)
- pre_commit: MIT License, see [here](https://github.com/pre-commit/pre-commit/blob/main/LICENSE)
+3 -12
View File
@@ -1,13 +1,4 @@
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
__all__ = ["BECWidget", "SafeSlot", "SafeProperty"]
def __getattr__(name):
if name == "BECWidget":
from bec_widgets.utils.bec_widget import BECWidget
return BECWidget
if name in {"SafeSlot", "SafeProperty"}:
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
return {"SafeSlot": SafeSlot, "SafeProperty": SafeProperty}[name]
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
-15
View File
@@ -1,15 +0,0 @@
import os
import sys
import bec_widgets.widgets.containers.qt_ads as QtAds
if sys.platform.startswith("linux"):
qt_platform = os.environ.get("QT_QPA_PLATFORM", "")
if qt_platform != "offscreen":
os.environ["QT_QPA_PLATFORM"] = "xcb"
# Default QtAds configuration
QtAds.CDockManager.setConfigFlag(QtAds.CDockManager.eConfigFlag.FocusHighlighting, True)
QtAds.CDockManager.setConfigFlag(
QtAds.CDockManager.eConfigFlag.RetainTabSizeWhenCloseButtonHidden, True
)
+4 -35
View File
@@ -1,43 +1,12 @@
from __future__ import annotations
from typing import Literal
from bec_lib import bec_logger
from bec_widgets.widgets.containers.auto_update.auto_updates import AutoUpdates
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea
logger = bec_logger.logger
from bec_widgets.widgets.containers.dock.dock_area import BECDockArea
def dock_area(
object_name: str | None = None, startup_profile: str | Literal["restore", "skip"] | None = None
) -> BECDockArea:
"""
Create an advanced dock area using Qt Advanced Docking System.
Args:
object_name(str): The name of the advanced dock area.
startup_profile(str | Literal["restore", "skip"] | None): Startup mode for
the workspace:
- None: start empty
- "restore": restore last used profile
- "skip": do not initialize profile state
- "<name>": load specific profile
Returns:
BECDockArea: The created advanced dock area.
"""
widget = BECDockArea(
object_name=object_name,
root_widget=True,
profile_namespace="bec",
startup_profile=startup_profile,
)
logger.info(f"Created advanced dock area with startup_profile: {startup_profile}")
return widget
def dock_area(object_name: str | None = None) -> BECDockArea:
_dock_area = BECDockArea(object_name=object_name, root_widget=True)
return _dock_area
def auto_update_dock_area(object_name: str | None = None) -> AutoUpdates:
+71 -235
View File
@@ -20,20 +20,18 @@ from qtpy.QtWidgets import (
)
import bec_widgets
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.utils.bec_plugin_helper import get_all_plugin_widgets
from bec_widgets.utils.container_utils import WidgetContainerUtils
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.name_utils import pascal_to_snake
from bec_widgets.utils.plugin_utils import get_plugin_auto_updates
from bec_widgets.utils.round_frame import RoundedFrame
from bec_widgets.utils.rpc_register import RPCRegister
from bec_widgets.utils.screen_utils import apply_window_geometry, centered_geometry_for_app
from bec_widgets.utils.toolbars.toolbar import ModularToolBar
from bec_widgets.utils.ui_loader import UILoader
from bec_widgets.widgets.containers.auto_update.auto_updates import AutoUpdates
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea
from bec_widgets.widgets.containers.dock_area.profile_utils import get_last_profile, list_profiles
from bec_widgets.widgets.containers.main_window.main_window import BECMainWindow, BECMainWindowNoRPC
from bec_widgets.widgets.containers.dock.dock_area import BECDockArea
from bec_widgets.widgets.containers.main_window.main_window import BECMainWindow, UILaunchWindow
from bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button import DarkModeButton
if TYPE_CHECKING: # pragma: no cover
@@ -43,7 +41,6 @@ if TYPE_CHECKING: # pragma: no cover
logger = bec_logger.logger
MODULE_PATH = os.path.dirname(bec_widgets.__file__)
START_EMPTY_PROFILE_OPTION = "Start Empty (No Profile)"
class LaunchTile(RoundedFrame):
@@ -77,28 +74,23 @@ class LaunchTile(RoundedFrame):
circular_pixmap.fill(Qt.transparent)
painter = QPainter(circular_pixmap)
painter.setRenderHints(QPainter.RenderHint.Antialiasing, True)
painter.setRenderHints(QPainter.Antialiasing, True)
path = QPainterPath()
path.addEllipse(0, 0, size, size)
painter.setClipPath(path)
pixmap = pixmap.scaled(
size,
size,
Qt.AspectRatioMode.KeepAspectRatio,
Qt.TransformationMode.SmoothTransformation,
)
pixmap = pixmap.scaled(size, size, Qt.KeepAspectRatio, Qt.SmoothTransformation)
painter.drawPixmap(0, 0, pixmap)
painter.end()
self.icon_label.setPixmap(circular_pixmap)
self.layout.addWidget(self.icon_label, alignment=Qt.AlignmentFlag.AlignCenter)
self.layout.addWidget(self.icon_label, alignment=Qt.AlignCenter)
# Top label
self.top_label = QLabel(top_label.upper())
font_top = self.top_label.font()
font_top.setPointSize(10)
self.top_label.setFont(font_top)
self.layout.addWidget(self.top_label, alignment=Qt.AlignmentFlag.AlignCenter)
self.layout.addWidget(self.top_label, alignment=Qt.AlignCenter)
# Main label
self.main_label = QLabel(main_label)
@@ -108,7 +100,7 @@ class LaunchTile(RoundedFrame):
font_main.setPointSize(14)
font_main.setBold(True)
self.main_label.setFont(font_main)
self.main_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.main_label.setAlignment(Qt.AlignCenter)
# Shrink font if the default would wrap on this platform / DPI
content_width = (
@@ -124,13 +116,13 @@ class LaunchTile(RoundedFrame):
self.layout.addWidget(self.main_label)
self.spacer_top = QSpacerItem(0, 10, QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Fixed)
self.spacer_top = QSpacerItem(0, 10, QSizePolicy.Fixed, QSizePolicy.Fixed)
self.layout.addItem(self.spacer_top)
# Description
self.description_label = QLabel(description)
self.description_label.setWordWrap(True)
self.description_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.description_label.setAlignment(Qt.AlignCenter)
self.layout.addWidget(self.description_label)
# Selector
@@ -140,14 +132,13 @@ class LaunchTile(RoundedFrame):
else:
self.selector = None
self.spacer_bottom = QSpacerItem(
0, 0, QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Expanding
)
self.spacer_bottom = QSpacerItem(0, 0, QSizePolicy.Fixed, QSizePolicy.Expanding)
self.layout.addItem(self.spacer_bottom)
# Action button
self.action_button = QPushButton("Open")
self.action_button.setStyleSheet("""
self.action_button.setStyleSheet(
"""
QPushButton {
background-color: #007AFF;
border: none;
@@ -159,8 +150,9 @@ class LaunchTile(RoundedFrame):
QPushButton:hover {
background-color: #005BB5;
}
""")
self.layout.addWidget(self.action_button, alignment=Qt.AlignmentFlag.AlignCenter)
"""
)
self.layout.addWidget(self.action_button, alignment=Qt.AlignCenter)
def _fit_label_to_width(self, label: QLabel, max_width: int, min_pt: int = 10):
"""
@@ -183,31 +175,21 @@ class LaunchTile(RoundedFrame):
metrics = QFontMetrics(font)
label.setFont(font)
label.setWordWrap(False)
label.setText(metrics.elidedText(label.text(), Qt.TextElideMode.ElideRight, max_width))
label.setText(metrics.elidedText(label.text(), Qt.ElideRight, max_width))
class LaunchWindow(BECMainWindow):
RPC = True
PLUGIN = False
TILE_SIZE = (250, 300)
DEFAULT_LAUNCH_SIZE = (800, 600)
USER_ACCESS = ["show_launcher", "hide_launcher"]
def __init__(
self,
parent=None,
gui_id: str = None,
window_title="BEC Launcher",
launch_gui_class: str = None,
launch_gui_id: str = None,
*args,
**kwargs,
self, parent=None, gui_id: str = None, window_title="BEC Launcher", *args, **kwargs
):
super().__init__(parent=parent, gui_id=gui_id, window_title=window_title, **kwargs)
self.app = QApplication.instance()
self.tiles: dict[str, LaunchTile] = {}
self._logged_unparented_connections: set[str] = set()
# Track the smallest mainlabel font size chosen so far
self._min_main_label_pt: int | None = None
@@ -216,7 +198,7 @@ class LaunchWindow(BECMainWindow):
self.toolbar = ModularToolBar(parent=self)
self.addToolBar(Qt.TopToolBarArea, self.toolbar)
self.spacer = QWidget(self)
self.spacer.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding)
self.spacer.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
self.toolbar.addWidget(self.spacer)
self.toolbar.addWidget(self.dark_mode_button)
@@ -229,13 +211,11 @@ class LaunchWindow(BECMainWindow):
name="dock_area",
icon_path=os.path.join(MODULE_PATH, "assets", "app_icons", "bec_widgets_icon.png"),
top_label="Get started",
main_label="BEC Advanced Dock Area",
description="Flexible application for managing modular widgets and user profiles.",
action_button=self._open_dock_area,
show_selector=True,
selector_items=list_profiles("bec"),
main_label="BEC Dock Area",
description="Highly flexible and customizable dock area application with modular widgets.",
action_button=lambda: self.launch("dock_area"),
show_selector=False,
)
self._refresh_dock_area_profiles(preserve_selection=False)
self.available_auto_updates: dict[str, type[AutoUpdates]] = (
self._update_available_auto_updates()
@@ -285,11 +265,6 @@ class LaunchWindow(BECMainWindow):
self.register.callbacks.append(self._turn_off_the_lights)
self.register.broadcast()
if launch_gui_class and launch_gui_id:
# If a specific gui class is provided, launch it and hide the launcher
self.launch(launch_gui_class, name=launch_gui_id)
self.hide()
def register_tile(
self,
name: str,
@@ -325,7 +300,7 @@ class LaunchWindow(BECMainWindow):
)
tile.setFixedWidth(self.TILE_SIZE[0])
tile.setMinimumHeight(self.TILE_SIZE[1])
tile.setSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.MinimumExpanding)
tile.setSizePolicy(QSizePolicy.Fixed, QSizePolicy.MinimumExpanding)
if action_button:
tile.action_button.clicked.connect(action_button)
if show_selector and selector_items:
@@ -351,73 +326,6 @@ class LaunchWindow(BECMainWindow):
self.tiles[name] = tile
def _refresh_dock_area_profiles(self, preserve_selection: bool = True) -> None:
"""
Refresh the dock-area profile selector, optionally preserving the selection.
Defaults to Start Empty when no valid selection can be preserved.
Args:
preserve_selection(bool): Whether to preserve the current selection or not.
"""
tile = self.tiles.get("dock_area")
if tile is None or tile.selector is None:
return
selector = tile.selector
selected_text = (
selector.currentText().strip() if preserve_selection and selector.count() > 0 else ""
)
profiles = list_profiles("bec")
selector_items = [START_EMPTY_PROFILE_OPTION, *profiles]
selector.blockSignals(True)
selector.clear()
for profile in selector_items:
selector.addItem(profile)
if selected_text:
# Try to preserve the current selection
idx = selector.findText(selected_text, Qt.MatchFlag.MatchExactly)
if idx >= 0:
selector.setCurrentIndex(idx)
else:
# Selection no longer exists, fall back to default startup selection.
self._set_selector_to_default_profile(selector, profiles)
else:
# No selection to preserve, use default startup selection.
self._set_selector_to_default_profile(selector, profiles)
selector.blockSignals(False)
def _set_selector_to_default_profile(self, selector: QComboBox, profiles: list[str]) -> None:
"""
Set the selector default.
Preference order:
1) Start Empty option (if available)
2) Last used profile
3) First available profile
Args:
selector(QComboBox): The combobox to set.
profiles(list[str]): List of available profiles.
"""
start_empty_idx = selector.findText(START_EMPTY_PROFILE_OPTION, Qt.MatchFlag.MatchExactly)
if start_empty_idx >= 0:
selector.setCurrentIndex(start_empty_idx)
return
# Try to get last used profile
last_profile = get_last_profile(namespace="bec")
if last_profile and last_profile in profiles:
idx = selector.findText(last_profile, Qt.MatchFlag.MatchExactly)
if idx >= 0:
selector.setCurrentIndex(idx)
return
# If nothing else, select first item
if selector.count() > 0:
selector.setCurrentIndex(0)
def launch(
self,
launch_script: str,
@@ -439,14 +347,14 @@ class LaunchWindow(BECMainWindow):
from bec_widgets.applications import bw_launch
with RPCRegister.delayed_broadcast() as rpc_register:
if geometry is None and launch_script != "custom_ui_file":
geometry = self._default_launch_geometry()
existing_dock_areas = rpc_register.get_names_of_rpc_by_class_type(BECDockArea)
if name is not None:
WidgetContainerUtils.raise_for_invalid_name(name)
# If name already exists, generate a unique one with counter suffix
if name in existing_dock_areas:
name = WidgetContainerUtils.generate_unique_name(name, existing_dock_areas)
raise ValueError(
f"Name {name} must be unique for dock areas, but already exists: {existing_dock_areas}."
)
WidgetContainerUtils.raise_for_invalid_name(name)
else:
name = "dock_area"
name = WidgetContainerUtils.generate_unique_name(name, existing_dock_areas)
@@ -464,46 +372,43 @@ class LaunchWindow(BECMainWindow):
if launch_script == "auto_update":
auto_update = kwargs.pop("auto_update", None)
return self._launch_auto_update(auto_update, geometry=geometry)
return self._launch_auto_update(auto_update)
if launch_script == "widget":
widget = kwargs.pop("widget", None)
if widget is None:
raise ValueError("Widget name must be provided.")
return self._launch_widget(widget, geometry=geometry)
return self._launch_widget(widget)
launch = getattr(bw_launch, launch_script, None)
if launch is None:
raise ValueError(f"Launch script {launch_script} not found.")
result_widget = launch(name, **kwargs)
result_widget = launch(name)
result_widget.resize(result_widget.minimumSizeHint())
# TODO Should we simply use the specified name as title here?
result_widget.window().setWindowTitle(f"BEC - {name}")
logger.info(f"Created new dock area: {name}")
if geometry is not None:
result_widget.setGeometry(*geometry)
if isinstance(result_widget, BECMainWindow):
apply_window_geometry(result_widget, geometry)
result_widget.show()
else:
window = BECMainWindowNoRPC()
window = BECMainWindow()
window.setCentralWidget(result_widget)
window.setWindowTitle(f"BEC - {result_widget.objectName()}")
apply_window_geometry(window, geometry)
window.show()
return result_widget
def _launch_custom_ui_file(self, ui_file: str | None) -> BECMainWindow:
"""
Load a custom .ui file. If the top-level widget is a MainWindow subclass,
instantiate it directly; otherwise, embed it in a UILaunchWindow.
"""
# Load the custom UI file
if ui_file is None:
raise ValueError("UI file must be provided for custom UI file launch.")
filename = os.path.basename(ui_file).split(".")[0]
WidgetContainerUtils.raise_for_invalid_name(filename)
# Parse the UI to detect top-level widget class
tree = ET.parse(ui_file)
root = tree.getroot()
# Check if the top-level widget is a QMainWindow
@@ -511,27 +416,22 @@ class LaunchWindow(BECMainWindow):
if widget is None:
raise ValueError("No widget found in the UI file.")
# Load the UI into a widget
loader = UILoader(None)
loaded = loader.loader(ui_file)
if widget.attrib.get("class") == "QMainWindow":
raise ValueError(
"Loading a QMainWindow from a UI file is currently not supported. "
"If you need this, please contact the BEC team or create a ticket on gitlab.psi.ch/bec/bec_widgets."
)
# Display the UI in a BECMainWindow
if isinstance(loaded, BECMainWindow):
window = loaded
window.object_name = filename
else:
window = BECMainWindow(object_name=filename)
window.setCentralWidget(loaded)
window.setWindowTitle(f"BEC - {filename}")
apply_window_geometry(window, None)
window = UILaunchWindow(object_name=filename)
QApplication.processEvents()
result_widget = UILoader(window).loader(ui_file)
window.setCentralWidget(result_widget)
window.setWindowTitle(f"BEC - {window.object_name}")
window.show()
logger.info(f"Launched custom UI: {filename}, type: {type(window).__name__}")
logger.info(f"Object name of new instance: {result_widget.objectName()}, {window.gui_id}")
return window
def _launch_auto_update(
self, auto_update: str, geometry: tuple[int, int, int, int] | None = None
) -> AutoUpdates:
def _launch_auto_update(self, auto_update: str) -> AutoUpdates:
if auto_update in self.available_auto_updates:
auto_update_cls = self.available_auto_updates[auto_update]
window = auto_update_cls()
@@ -541,27 +441,25 @@ class LaunchWindow(BECMainWindow):
window = AutoUpdates()
window.resize(window.minimumSizeHint())
QApplication.processEvents()
window.setWindowTitle(f"BEC - {window.objectName()}")
apply_window_geometry(window, geometry)
window.show()
return window
def _launch_widget(
self, widget: type[BECWidget], geometry: tuple[int, int, int, int] | None = None
) -> QWidget:
def _launch_widget(self, widget: type[BECWidget]) -> QWidget:
name = pascal_to_snake(widget.__name__)
WidgetContainerUtils.raise_for_invalid_name(name)
window = BECMainWindowNoRPC()
window = BECMainWindow()
widget_instance = widget(root_widget=True, object_name=name)
assert isinstance(widget_instance, QWidget)
QApplication.processEvents()
window.setCentralWidget(widget_instance)
window.resize(window.minimumSizeHint())
window.setWindowTitle(f"BEC - {widget_instance.objectName()}")
apply_window_geometry(window, geometry)
window.show()
return window
@@ -586,21 +484,6 @@ class LaunchWindow(BECMainWindow):
auto_update = None
return self.launch("auto_update", auto_update=auto_update)
def _open_dock_area(self):
"""
Open Advanced Dock Area using the selected profile.
"""
tile = self.tiles.get("dock_area")
if tile is None or tile.selector is None:
startup_profile = None
else:
selection = tile.selector.currentText().strip()
if selection == START_EMPTY_PROFILE_OPTION:
startup_profile = None
else:
startup_profile = selection if selection else None
return self.launch("dock_area", startup_profile=startup_profile)
def _open_widget(self):
"""
Open a widget from the available widgets.
@@ -612,10 +495,6 @@ class LaunchWindow(BECMainWindow):
raise ValueError(f"Widget {widget} not found in available widgets.")
return self.launch("widget", widget=self.available_widgets[widget])
def _default_launch_geometry(self) -> tuple[int, int, int, int] | None:
width, height = self.DEFAULT_LAUNCH_SIZE
return centered_geometry_for_app(width=width, height=height)
@SafeSlot(popup_error=True)
def _open_custom_ui_file(self):
"""
@@ -652,96 +531,53 @@ class LaunchWindow(BECMainWindow):
self.hide()
def showEvent(self, event):
self._refresh_dock_area_profiles()
super().showEvent(event)
self.setFixedSize(self.size())
def _has_external_window(self, connections: dict) -> bool:
def _launcher_is_last_widget(self, connections: dict) -> bool:
"""
Check if any registered non-launcher connection owns a top-level Qt window.
Check if the launcher is the last widget in the application.
"""
for connection in connections.values():
if self._connection_belongs_to_launcher(connection):
continue
if isinstance(connection, QWidget) and connection.isWindow():
return True
return False
def _log_unparented_connections(self, connections: dict) -> None:
"""
Log non-launcher RPC connections that remain without an active top-level window.
"""
for connection in connections.values():
if self._connection_belongs_to_launcher(connection):
continue
if isinstance(connection, QWidget) and connection.isWindow():
continue
connection_description = (
f"type={type(connection).__name__} objectName={connection.objectName()!r} "
f"gui_id={connection.gui_id!r}"
)
if connection_description in self._logged_unparented_connections:
continue
self._logged_unparented_connections.add(connection_description)
logger.warning(
"Registered non-launcher RPC connection has no active top-level window: "
f"{connection_description}"
)
def _connection_belongs_to_launcher(self, connection: QObject) -> bool:
"""
Check whether a registered connection is the launcher itself or part of its Qt hierarchy.
"""
if connection is self or connection.gui_id == self.gui_id:
return True
parent = connection.parent()
while parent is not None:
if parent is self:
return True
parent = parent.parent()
return False
remaining_connections = [
connection for connection in connections.values() if connection.parent_id != self.gui_id
]
return len(remaining_connections) <= 4
def _turn_off_the_lights(self, connections: dict):
"""
If there is only one connection remaining, it is the launcher, so we show it.
Once the launcher is closed as the last window, we quit the application.
"""
if self._has_external_window(connections):
self.hide()
if self._launcher_is_last_widget(connections):
self.show()
self.activateWindow()
self.raise_()
if self.app:
self.app.setQuitOnLastWindowClosed(False) # type: ignore
self.app.setQuitOnLastWindowClosed(True) # type: ignore
return
self._log_unparented_connections(connections)
self.show()
self.activateWindow()
self.raise_()
self.hide()
if self.app:
self.app.setQuitOnLastWindowClosed(True) # type: ignore
self.app.setQuitOnLastWindowClosed(False) # type: ignore
def closeEvent(self, event):
"""
Close the launcher window.
"""
connections = self.register.list_all_connections()
if self._has_external_window(connections):
event.ignore()
self.hide()
if self._launcher_is_last_widget(connections):
event.accept()
return
event.accept()
event.ignore()
self.hide()
if __name__ == "__main__": # pragma: no cover
if __name__ == "__main__":
import sys
from bec_widgets.utils.colors import apply_theme
app = QApplication(sys.argv)
apply_theme("dark")
launcher = LaunchWindow()
launcher.show()
sys.exit(app.exec())
-414
View File
@@ -1,414 +0,0 @@
from bec_qthemes import material_icon
from qtpy.QtGui import QAction # type: ignore
from qtpy.QtWidgets import QApplication, QHBoxLayout, QStackedWidget, QWidget
from bec_widgets.applications.navigation_centre.reveal_animator import ANIMATION_DURATION
from bec_widgets.applications.navigation_centre.side_bar import SideBar
from bec_widgets.applications.navigation_centre.side_bar_components import NavigationItem
from bec_widgets.applications.views.admin_view.admin_view import AdminView
from bec_widgets.applications.views.developer_view.developer_view import DeveloperView
from bec_widgets.applications.views.device_manager_view.device_manager_view import DeviceManagerView
from bec_widgets.applications.views.dock_area_view.dock_area_view import DockAreaView
from bec_widgets.applications.views.view import ViewBase, WaveformViewInline, WaveformViewPopup
from bec_widgets.utils.colors import apply_theme
from bec_widgets.utils.guided_tour import GuidedTour
from bec_widgets.utils.name_utils import sanitize_namespace
from bec_widgets.utils.screen_utils import (
apply_centered_size,
available_screen_geometry,
main_app_size_for_screen,
)
from bec_widgets.widgets.containers.main_window.main_window import BECMainWindow
class BECMainApp(BECMainWindow):
RPC = False
PLUGIN = False
def __init__(
self,
parent=None,
*args,
anim_duration: int = ANIMATION_DURATION,
show_examples: bool = False,
**kwargs,
):
super().__init__(parent=parent, *args, **kwargs)
self._show_examples = bool(show_examples)
# --- Compose central UI (sidebar + stack)
self.sidebar = SideBar(parent=self, anim_duration=anim_duration)
self.stack = QStackedWidget(self)
container = QWidget(self)
layout = QHBoxLayout(container)
layout.setContentsMargins(0, 0, 0, 0)
layout.setSpacing(0)
layout.addWidget(self.sidebar, 0)
layout.addWidget(self.stack, 1)
self.setCentralWidget(container)
# Mapping for view switching
self._view_index: dict[str, int] = {}
self._current_view_id: str | None = None
self.sidebar.view_selected.connect(self._on_view_selected)
self._add_views()
# Initialize guided tour
self.guided_tour = GuidedTour(self)
self._setup_guided_tour()
def _add_views(self):
self.add_section("BEC Applications", "bec_apps")
self.dock_area = DockAreaView(self)
self.device_manager = DeviceManagerView(self)
# self.developer_view = DeveloperView(self) #TODO temporary disable until the bugs with BECShell are resolved
self.admin_view = AdminView(self)
self.add_view(icon="widgets", title="Dock Area", widget=self.dock_area, mini_text="Docks")
self.add_view(
icon="display_settings",
title="Device Manager",
widget=self.device_manager,
mini_text="DM",
)
# TODO temporary disable until the bugs with BECShell are resolved
# self.add_view(
# icon="code_blocks",
# title="IDE",
# widget=self.developer_view,
# mini_text="IDE",
# exclusive=True,
# )
self.add_view(
icon="admin_panel_settings",
title="Admin View",
widget=self.admin_view,
mini_text="Admin",
from_top=False,
)
if self._show_examples:
self.add_section("Examples", "examples")
waveform_view_popup = WaveformViewPopup(
parent=self, view_id="waveform_view_popup", title="Waveform Plot"
)
waveform_view_stack = WaveformViewInline(
parent=self, view_id="waveform_view_stack", title="Waveform Plot"
)
self.add_view(
icon="show_chart",
title="Waveform With Popup",
widget=waveform_view_popup,
mini_text="Popup",
)
self.add_view(
icon="show_chart",
title="Waveform InLine Stack",
widget=waveform_view_stack,
mini_text="Stack",
)
self.set_current("dock_area")
self.sidebar.add_dark_mode_item()
# Add guided tour to Help menu
self._add_guided_tour_to_menu()
# --- Public API ------------------------------------------------------
def add_section(self, title: str, id: str, position: int | None = None):
return self.sidebar.add_section(title, id, position)
def add_separator(self):
return self.sidebar.add_separator()
def add_dark_mode_item(self, id: str = "dark_mode", position: int | None = None):
return self.sidebar.add_dark_mode_item(id=id, position=position)
def add_view(
self,
*,
icon: str,
title: str,
view_id: str | None = None,
widget: QWidget,
mini_text: str | None = None,
position: int | None = None,
from_top: bool = True,
toggleable: bool = True,
exclusive: bool = True,
) -> NavigationItem:
"""
Register a view in the stack and create a matching nav item in the sidebar.
Args:
icon(str): Icon name for the nav item.
title(str): Title for the nav item.
view_id(str, optional): Unique ID for the view/item. If omitted, uses mini_text;
if mini_text is also omitted, uses title.
widget(QWidget): The widget to add to the stack.
mini_text(str, optional): Short text for the nav item when sidebar is collapsed.
position(int, optional): Position to insert the nav item.
from_top(bool, optional): Whether to count position from the top or bottom.
toggleable(bool, optional): Whether the nav item is toggleable.
exclusive(bool, optional): Whether the nav item is exclusive.
Returns:
NavigationItem: The created navigation item.
"""
resolved_id = sanitize_namespace(view_id or mini_text or title)
item = self.sidebar.add_item(
icon=icon,
title=title,
id=resolved_id,
mini_text=mini_text,
position=position,
from_top=from_top,
toggleable=toggleable,
exclusive=exclusive,
)
# Wrap plain widgets into a ViewBase so enter/exit hooks are available
if isinstance(widget, ViewBase):
view_widget = widget
view_widget.view_id = resolved_id
view_widget.view_title = title
else:
view_widget = ViewBase(content=widget, parent=self, view_id=resolved_id, title=title)
view_widget.change_object_name(resolved_id)
idx = self.stack.addWidget(view_widget)
self._view_index[resolved_id] = idx
return item
def set_current(self, id: str) -> None:
if id in self._view_index:
self.sidebar.activate_item(id)
# Internal: route sidebar selection to the stack
def _on_view_selected(self, vid: str) -> None:
# Special handling for views that can not be switched to (e.g. dark mode toggle)
# Not registered as proper view with a stack index, so we ignore any logic below
# as it will anyways not result in a stack switch.
idx = self._view_index.get(vid)
if idx is None or not (0 <= idx < self.stack.count()):
return
# Determine current view
current_index = self.stack.currentIndex()
current_view = (
self.stack.widget(current_index) if 0 <= current_index < self.stack.count() else None
)
# Ask current view whether we may leave
if current_view is not None and hasattr(current_view, "on_exit"):
may_leave = current_view.on_exit()
if may_leave is False:
# Veto: restore previous highlight without re-emitting selection
if self._current_view_id is not None:
self.sidebar.activate_item(self._current_view_id, emit_signal=False)
return
# Proceed with switch
idx = self._view_index.get(vid)
if idx is None or not (0 <= idx < self.stack.count()):
return
self.stack.setCurrentIndex(idx)
new_view = self.stack.widget(idx)
self._current_view_id = vid
if hasattr(new_view, "on_enter"):
new_view.on_enter()
def _setup_guided_tour(self):
"""
Setup the guided tour for the main application.
Registers key UI components and delegates to views for their internal components.
"""
tour_steps = []
# --- General Layout Components ---
# Register the sidebar toggle button
toggle_step = self.guided_tour.register_widget(
widget=self.sidebar.toggle,
title="Sidebar Toggle",
text="Click this button to expand or collapse the sidebar. When expanded, you can see full navigation item titles and section names.",
)
tour_steps.append(toggle_step)
# Register the sidebar icons
sidebar_dock_area = self.sidebar.components.get("dock_area")
if sidebar_dock_area:
dock_step = self.guided_tour.register_widget(
widget=sidebar_dock_area,
title="Dock Area View",
text="Click here to access the Dock Area view, where you can manage and arrange your dockable panels.",
)
tour_steps.append(dock_step)
sidebar_device_manager = self.sidebar.components.get("device_manager")
if sidebar_device_manager:
device_manager_step = self.guided_tour.register_widget(
widget=sidebar_device_manager,
title="Device Manager View",
text="Click here to open the Device Manager view, where you can view and manage device configs.",
)
tour_steps.append(device_manager_step)
sidebar_developer_view = self.sidebar.components.get("developer_view")
if sidebar_developer_view:
developer_view_step = self.guided_tour.register_widget(
widget=sidebar_developer_view,
title="Developer View",
text="Click here to access the Developer view to write scripts and macros.",
)
tour_steps.append(developer_view_step)
# Register the dark mode toggle
dark_mode_item = self.sidebar.components.get("dark_mode")
if dark_mode_item:
dark_mode_step = self.guided_tour.register_widget(
widget=dark_mode_item,
title="Theme Toggle",
text="Switch between light and dark themes. The theme preference is saved and will be applied when you restart the application.",
)
tour_steps.append(dark_mode_step)
# Register the client info label
if hasattr(self, "_client_info_hover"):
client_info_step = self.guided_tour.register_widget(
widget=self._client_info_hover,
title="Client Status",
text="Displays status messages and information from the BEC Server.",
)
tour_steps.append(client_info_step)
# Register the scan progress bar if available
if hasattr(self, "_scan_progress_hover"):
progress_step = self.guided_tour.register_widget(
widget=self._scan_progress_hover,
title="Scan Progress",
text="Monitor the progress of ongoing scans. Hover over the progress bar to see detailed information including elapsed time and estimated completion.",
)
tour_steps.append(progress_step)
# Register the notification indicator in the status bar
if hasattr(self, "notification_indicator"):
notif_step = self.guided_tour.register_widget(
widget=self.notification_indicator,
title="Notification Center",
text="View system notifications, errors, and status updates. Click to filter notifications by type or expand to see all details.",
)
tour_steps.append(notif_step)
# --- View-Specific Components ---
# Register all views that can extend the tour
for view_id, view_index in self._view_index.items():
view_widget = self.stack.widget(view_index)
if not view_widget or not hasattr(view_widget, "register_tour_steps"):
continue
# Get the view's tour steps
view_tour = view_widget.register_tour_steps(self.guided_tour, self)
if view_tour is None:
if hasattr(view_widget.content, "register_tour_steps"):
view_tour = view_widget.content.register_tour_steps(self.guided_tour, self)
if view_tour is None:
continue
# Get the corresponding sidebar navigation item
nav_item = self.sidebar.components.get(view_id)
if not nav_item:
continue
# Use the view's title for the navigation button
nav_step = self.guided_tour.register_widget(
widget=nav_item,
title=view_tour.view_title,
text=f"Let's explore the features of the {view_tour.view_title}.",
)
tour_steps.append(nav_step)
tour_steps.extend(view_tour.step_ids)
# Create the tour with all registered steps
if tour_steps:
self.guided_tour.create_tour(tour_steps)
def start_guided_tour(self):
"""
Public method to start the guided tour.
This can be called programmatically or connected to a menu/button action.
"""
self.guided_tour.start_tour()
def _add_guided_tour_to_menu(self):
"""
Add a 'Guided Tour' action to the Help menu.
"""
# Find the Help menu
menu_bar = self.menuBar()
help_menu = None
for action in menu_bar.actions():
if action.text() == "Help":
help_menu = action.menu()
break
if help_menu:
# Add separator before the tour action
help_menu.addSeparator()
# Create and add the guided tour action
tour_action = QAction("Start Guided Tour", self)
tour_action.setIcon(material_icon("help"))
tour_action.triggered.connect(self.start_guided_tour)
tour_action.setShortcut("F1") # Add keyboard shortcut
help_menu.addAction(tour_action)
def cleanup(self):
for view_id, idx in self._view_index.items():
view = self.stack.widget(idx)
view.close()
view.deleteLater()
super().cleanup()
def main(): # pragma: no cover
"""
Main function to run the BEC main application, exposed as a script entry point through
pyproject.toml.
"""
# pylint: disable=import-outside-toplevel
import argparse
import sys
parser = argparse.ArgumentParser(description="BEC Main Application")
parser.add_argument(
"--examples", action="store_true", help="Show the Examples section with waveform demo views"
)
# Let Qt consume the remaining args
args, qt_args = parser.parse_known_args(sys.argv[1:])
app = QApplication([sys.argv[0], *qt_args])
app.setApplicationName("BEC")
apply_theme("dark")
w = BECMainApp(show_examples=args.examples)
screen_geometry = available_screen_geometry()
if screen_geometry is not None:
width, height = main_app_size_for_screen(screen_geometry)
apply_centered_size(w, width, height, available=screen_geometry)
else:
w.resize(w.minimumSizeHint())
w.show()
sys.exit(app.exec())
if __name__ == "__main__": # pragma: no cover
main()
@@ -1,114 +0,0 @@
from __future__ import annotations
from qtpy.QtCore import QEasingCurve, QParallelAnimationGroup, QPropertyAnimation
from qtpy.QtWidgets import QGraphicsOpacityEffect, QWidget
ANIMATION_DURATION = 500 # ms
class RevealAnimator:
"""Animate reveal/hide for a single widget using opacity + max W/H.
This keeps the widget always visible to avoid jitter from setVisible().
Collapsed state: opacity=0, maxW=0, maxH=0.
Expanded state: opacity=1, maxW=sizeHint.width(), maxH=sizeHint.height().
"""
def __init__(
self,
widget: QWidget,
duration: int = ANIMATION_DURATION,
easing: QEasingCurve.Type = QEasingCurve.InOutCubic,
initially_revealed: bool = False,
*,
animate_opacity: bool = True,
animate_width: bool = True,
animate_height: bool = True,
):
self.widget = widget
self.animate_opacity = animate_opacity
self.animate_width = animate_width
self.animate_height = animate_height
# Opacity effect
self.fx = QGraphicsOpacityEffect(widget)
widget.setGraphicsEffect(self.fx)
# Animations
self.opacity_anim = (
QPropertyAnimation(self.fx, b"opacity") if self.animate_opacity else None
)
self.width_anim = (
QPropertyAnimation(widget, b"maximumWidth") if self.animate_width else None
)
self.height_anim = (
QPropertyAnimation(widget, b"maximumHeight") if self.animate_height else None
)
for anim in (self.opacity_anim, self.width_anim, self.height_anim):
if anim is not None:
anim.setDuration(duration)
anim.setEasingCurve(easing)
# Initialize to requested state
self.set_immediate(initially_revealed)
def _natural_sizes(self) -> tuple[int, int]:
sh = self.widget.sizeHint()
w = max(sh.width(), 1)
h = max(sh.height(), 1)
return w, h
def set_immediate(self, revealed: bool):
"""
Immediately set the widget to the target revealed/collapsed state.
Args:
revealed(bool): True to reveal, False to collapse.
"""
w, h = self._natural_sizes()
if self.animate_opacity:
self.fx.setOpacity(1.0 if revealed else 0.0)
if self.animate_width:
self.widget.setMaximumWidth(w if revealed else 0)
if self.animate_height:
self.widget.setMaximumHeight(h if revealed else 0)
def setup(self, reveal: bool):
"""
Prepare animations to transition to the target revealed/collapsed state.
Args:
reveal(bool): True to reveal, False to collapse.
"""
# Prepare animations from current state to target
target_w, target_h = self._natural_sizes()
if self.opacity_anim is not None:
self.opacity_anim.setStartValue(self.fx.opacity())
self.opacity_anim.setEndValue(1.0 if reveal else 0.0)
if self.width_anim is not None:
self.width_anim.setStartValue(self.widget.maximumWidth())
self.width_anim.setEndValue(target_w if reveal else 0)
if self.height_anim is not None:
self.height_anim.setStartValue(self.widget.maximumHeight())
self.height_anim.setEndValue(target_h if reveal else 0)
def add_to_group(self, group: QParallelAnimationGroup):
"""
Add the prepared animations to the given animation group.
Args:
group(QParallelAnimationGroup): The animation group to add to.
"""
if self.opacity_anim is not None:
group.addAnimation(self.opacity_anim)
if self.width_anim is not None:
group.addAnimation(self.width_anim)
if self.height_anim is not None:
group.addAnimation(self.height_anim)
def animations(self):
"""
Get a list of all animations (non-None) for adding to a group.
"""
return [
anim
for anim in (self.opacity_anim, self.height_anim, self.width_anim)
if anim is not None
]
@@ -1,357 +0,0 @@
from __future__ import annotations
from bec_qthemes import material_icon
from qtpy import QtWidgets
from qtpy.QtCore import QEasingCurve, QParallelAnimationGroup, QPropertyAnimation, Qt, Signal
from qtpy.QtWidgets import (
QGraphicsOpacityEffect,
QHBoxLayout,
QLabel,
QScrollArea,
QToolButton,
QVBoxLayout,
QWidget,
)
from bec_widgets import SafeProperty, SafeSlot
from bec_widgets.applications.navigation_centre.reveal_animator import ANIMATION_DURATION
from bec_widgets.applications.navigation_centre.side_bar_components import (
DarkModeNavItem,
NavigationItem,
SectionHeader,
SideBarSeparator,
)
class SideBar(QScrollArea):
view_selected = Signal(str)
toggled = Signal(bool)
def __init__(
self,
parent=None,
title: str = "Control Panel",
collapsed_width: int = 56,
expanded_width: int = 250,
anim_duration: int = ANIMATION_DURATION,
):
super().__init__(parent=parent)
self.setObjectName("SideBar")
# private attributes
self._is_expanded = False
self._collapsed_width = collapsed_width
self._expanded_width = expanded_width
self._anim_duration = anim_duration
# containers
self.components = {}
self._item_opts: dict[str, dict] = {}
# Scroll area properties
self.setWidgetResizable(True)
self.setHorizontalScrollBarPolicy(Qt.ScrollBarAlwaysOff)
self.setVerticalScrollBarPolicy(Qt.ScrollBarAlwaysOff)
self.setFrameShape(QtWidgets.QFrame.NoFrame)
self.setFixedWidth(self._collapsed_width)
# Content widget holding buttons for switching views
self.content = QWidget(self)
self.content_layout = QVBoxLayout(self.content)
self.content_layout.setContentsMargins(0, 0, 0, 0)
self.content_layout.setSpacing(4)
self.setWidget(self.content)
# Track active navigation item
self._active_id = None
# Top row with title and toggle button
self.toggle_row = QWidget(self)
self.toggle_row_layout = QHBoxLayout(self.toggle_row)
self.title_label = QLabel(title, self)
self.title_label.setObjectName("TopTitle")
self.title_label.setStyleSheet("font-weight: 600;")
self.title_fx = QGraphicsOpacityEffect(self.title_label)
self.title_label.setGraphicsEffect(self.title_fx)
self.title_fx.setOpacity(0.0)
self.title_label.setVisible(False) # TODO dirty trick to avoid layout shift
self.toggle = QToolButton(self)
self.toggle.setCheckable(False)
self.toggle.setIcon(material_icon("keyboard_arrow_right", convert_to_pixmap=False))
self.toggle.clicked.connect(self.on_expand)
self.toggle_row_layout.addWidget(self.title_label, 1, Qt.AlignLeft | Qt.AlignVCenter)
self.toggle_row_layout.addWidget(self.toggle, 1, Qt.AlignHCenter | Qt.AlignVCenter)
# To push the content up always
self._bottom_spacer = QtWidgets.QSpacerItem(
0, 0, QtWidgets.QSizePolicy.Minimum, QtWidgets.QSizePolicy.Expanding
)
# Add core widgets to layout
self.content_layout.addWidget(self.toggle_row)
self.content_layout.addItem(self._bottom_spacer)
# Animations
self.width_anim = QPropertyAnimation(self, b"bar_width")
self.width_anim.setDuration(self._anim_duration)
self.width_anim.setEasingCurve(QEasingCurve.InOutCubic)
self.title_anim = QPropertyAnimation(self.title_fx, b"opacity")
self.title_anim.setDuration(self._anim_duration)
self.title_anim.setEasingCurve(QEasingCurve.InOutCubic)
self.group = QParallelAnimationGroup(self)
self.group.addAnimation(self.width_anim)
self.group.addAnimation(self.title_anim)
self.group.finished.connect(self._on_anim_finished)
app = QtWidgets.QApplication.instance()
if app is not None and hasattr(app, "theme") and hasattr(app.theme, "theme_changed"):
app.theme.theme_changed.connect(self._on_theme_changed)
@SafeProperty(int)
def bar_width(self) -> int:
"""
Get the current width of the side bar.
Returns:
int: The current width of the side bar.
"""
return self.width()
@bar_width.setter
def bar_width(self, width: int):
"""
Set the width of the side bar.
Args:
width(int): The new width of the side bar.
"""
self.setFixedWidth(width)
@SafeProperty(bool)
def is_expanded(self) -> bool:
"""
Check if the side bar is expanded.
Returns:
bool: True if the side bar is expanded, False otherwise.
"""
return self._is_expanded
@SafeSlot()
@SafeSlot(bool)
def on_expand(self):
"""
Toggle the expansion state of the side bar.
"""
self._is_expanded = not self._is_expanded
self.toggle.setIcon(
material_icon(
"keyboard_arrow_left" if self._is_expanded else "keyboard_arrow_right",
convert_to_pixmap=False,
)
)
if self._is_expanded:
self.toggle_row_layout.setAlignment(self.toggle, Qt.AlignRight | Qt.AlignVCenter)
self.group.stop()
# Setting limits for animations of the side bar
self.width_anim.setStartValue(self.width())
self.width_anim.setEndValue(
self._expanded_width if self._is_expanded else self._collapsed_width
)
self.title_anim.setStartValue(self.title_fx.opacity())
self.title_anim.setEndValue(1.0 if self._is_expanded else 0.0)
# Setting limits for animations of the components
for comp in self.components.values():
if hasattr(comp, "setup_animations"):
comp.setup_animations(self._is_expanded)
self.group.start()
if self._is_expanded:
# TODO do not like this trick, but it is what it is for now
self.title_label.setVisible(self._is_expanded)
for comp in self.components.values():
if hasattr(comp, "set_visible"):
comp.set_visible(self._is_expanded)
self.toggled.emit(self._is_expanded)
@SafeSlot()
def _on_anim_finished(self):
if not self._is_expanded:
self.toggle_row_layout.setAlignment(self.toggle, Qt.AlignHCenter | Qt.AlignVCenter)
# TODO do not like this trick, but it is what it is for now
self.title_label.setVisible(self._is_expanded)
for comp in self.components.values():
if hasattr(comp, "set_visible"):
comp.set_visible(self._is_expanded)
@SafeSlot(str)
def _on_theme_changed(self, theme_name: str):
# Refresh toggle arrow icon so it picks up the new theme
self.toggle.setIcon(
material_icon(
"keyboard_arrow_left" if self._is_expanded else "keyboard_arrow_right",
convert_to_pixmap=False,
)
)
# Refresh each component that supports it
for comp in self.components.values():
if hasattr(comp, "refresh_theme"):
comp.refresh_theme()
else:
comp.style().unpolish(comp)
comp.style().polish(comp)
comp.update()
self.style().unpolish(self)
self.style().polish(self)
self.update()
def add_section(self, title: str, id: str, position: int | None = None) -> SectionHeader:
"""
Add a section header to the side bar.
Args:
title(str): The title of the section.
id(str): Unique ID for the section.
position(int, optional): Position to insert the section header.
Returns:
SectionHeader: The created section header.
"""
header = SectionHeader(self, title, anim_duration=self._anim_duration)
position = position if position is not None else self.content_layout.count() - 1
self.content_layout.insertWidget(position, header)
for anim in header.animations:
self.group.addAnimation(anim)
self.components[id] = header
return header
def add_separator(
self, *, from_top: bool = True, position: int | None = None
) -> SideBarSeparator:
"""
Add a separator line to the side bar. Separators are treated like regular
items; you can place multiple separators anywhere using `from_top` and `position`.
"""
line = SideBarSeparator(self)
line.setStyleSheet("margin:12px;")
self._insert_nav_item(line, from_top=from_top, position=position)
return line
def add_item(
self,
icon: str,
title: str,
id: str,
mini_text: str | None = None,
position: int | None = None,
*,
from_top: bool = True,
toggleable: bool = True,
exclusive: bool = True,
) -> NavigationItem:
"""
Add a navigation item to the side bar.
Args:
icon(str): Icon name for the nav item.
title(str): Title for the nav item.
id(str): Unique ID for the nav item.
mini_text(str, optional): Short text for the nav item when sidebar is collapsed.
position(int, optional): Position to insert the nav item.
from_top(bool, optional): Whether to count position from the top or bottom.
toggleable(bool, optional): Whether the nav item is toggleable.
exclusive(bool, optional): Whether the nav item is exclusive.
Returns:
NavigationItem: The created navigation item.
"""
item = NavigationItem(
parent=self,
title=title,
icon_name=icon,
mini_text=mini_text,
toggleable=toggleable,
exclusive=exclusive,
anim_duration=self._anim_duration,
)
self._insert_nav_item(item, from_top=from_top, position=position)
for anim in item.build_animations():
self.group.addAnimation(anim)
self.components[id] = item
# Connect activation to activation logic, passing id unchanged
item.activated.connect(lambda id=id: self.activate_item(id))
return item
def activate_item(self, target_id: str, *, emit_signal: bool = True):
target = self.components.get(target_id)
if target is None:
return
# Non-toggleable acts like an action: do not change any toggled states
if hasattr(target, "toggleable") and not target.toggleable:
self._active_id = target_id
if emit_signal:
self.view_selected.emit(target_id)
return
is_exclusive = getattr(target, "exclusive", True)
if is_exclusive:
# Radio-like behavior among exclusive items only
for comp_id, comp in self.components.items():
if not isinstance(comp, NavigationItem):
continue
if comp is target:
comp.set_active(True)
else:
# Only untoggle other items that are also exclusive
if getattr(comp, "exclusive", True):
comp.set_active(False)
# Leave non-exclusive items as they are
else:
# Non-exclusive toggles independently
target.set_active(not target.is_active())
self._active_id = target_id
if emit_signal:
self.view_selected.emit(target_id)
def add_dark_mode_item(
self, id: str = "dark_mode", position: int | None = None
) -> DarkModeNavItem:
"""
Add a dark mode toggle item to the side bar.
Args:
id(str): Unique ID for the dark mode item.
position(int, optional): Position to insert the dark mode item.
Returns:
DarkModeNavItem: The created dark mode navigation item.
"""
item = DarkModeNavItem(parent=self, id=id, anim_duration=self._anim_duration)
# compute bottom insertion point (same semantics as from_top=False)
self._insert_nav_item(item, from_top=False, position=position)
for anim in item.build_animations():
self.group.addAnimation(anim)
self.components[id] = item
item.activated.connect(lambda id=id: self.activate_item(id))
return item
def _insert_nav_item(
self, item: QWidget, *, from_top: bool = True, position: int | None = None
):
if from_top:
base_index = self.content_layout.indexOf(self._bottom_spacer)
pos = base_index if position is None else min(base_index, position)
else:
base = self.content_layout.indexOf(self._bottom_spacer) + 1
pos = base if position is None else base + max(0, position)
self.content_layout.insertWidget(pos, item)
@@ -1,370 +0,0 @@
from __future__ import annotations
from bec_qthemes import material_icon
from qtpy import QtCore
from qtpy.QtCore import QEasingCurve, QPropertyAnimation, Qt
from qtpy.QtWidgets import (
QApplication,
QFrame,
QHBoxLayout,
QLabel,
QSizePolicy,
QToolButton,
QVBoxLayout,
QWidget,
)
from bec_widgets import SafeProperty
from bec_widgets.applications.navigation_centre.reveal_animator import (
ANIMATION_DURATION,
RevealAnimator,
)
def get_on_primary():
app = QApplication.instance()
if app is not None and hasattr(app, "theme"):
return app.theme.color("ON_PRIMARY")
return "#FFFFFF"
def get_fg():
app = QApplication.instance()
if app is not None and hasattr(app, "theme"):
return app.theme.color("FG")
return "#FFFFFF"
class SideBarSeparator(QFrame):
"""A horizontal line separator for use in SideBar."""
def __init__(self, parent=None):
super().__init__(parent)
self.setObjectName("SideBarSeparator")
self.setFrameShape(QFrame.NoFrame)
self.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Fixed)
self.setFixedHeight(2)
self.setProperty("variant", "separator")
class SectionHeader(QWidget):
"""A section header with a label and a horizontal line below."""
def __init__(self, parent=None, text: str = None, anim_duration: int = ANIMATION_DURATION):
super().__init__(parent)
self.setObjectName("SectionHeader")
self.lbl = QLabel(text, self)
self.lbl.setObjectName("SectionHeaderLabel")
self.lbl.setAlignment(Qt.AlignLeft | Qt.AlignVCenter)
self._reveal = RevealAnimator(self.lbl, duration=anim_duration, initially_revealed=False)
self.line = SideBarSeparator(self)
lay = QVBoxLayout(self)
# keep your margins/spacing preferences here if needed
lay.setContentsMargins(12, 0, 12, 0)
lay.setSpacing(6)
lay.addWidget(self.lbl)
lay.addWidget(self.line)
self.animations = self.build_animations()
def build_animations(self) -> list[QPropertyAnimation]:
"""
Build and return animations for expanding/collapsing the sidebar.
Returns:
list[QPropertyAnimation]: List of animations.
"""
return self._reveal.animations()
def setup_animations(self, expanded: bool):
"""
Setup animations for expanding/collapsing the sidebar.
Args:
expanded(bool): True if the sidebar is expanded, False if collapsed.
"""
self._reveal.setup(expanded)
class NavigationItem(QWidget):
"""A nav tile with an icon + labels and an optional expandable body.
Provides animations for collapsed/expanded sidebar states via
build_animations()/setup_animations(), similar to SectionHeader.
"""
activated = QtCore.Signal()
def __init__(
self,
parent=None,
*,
title: str,
icon_name: str,
mini_text: str | None = None,
toggleable: bool = True,
exclusive: bool = True,
anim_duration: int = ANIMATION_DURATION,
):
super().__init__(parent=parent)
self.setObjectName("NavigationItem")
# Private attributes
self._title = title
self._icon_name = icon_name
self._mini_text = mini_text or title
self._toggleable = toggleable
self._toggled = False
self._exclusive = exclusive
# Main Icon
self.icon_btn = QToolButton(self)
self.icon_btn.setIcon(material_icon(self._icon_name, filled=False, convert_to_pixmap=False))
self.icon_btn.setAutoRaise(True)
self._icon_size_collapsed = QtCore.QSize(20, 20)
self._icon_size_expanded = QtCore.QSize(26, 26)
self.icon_btn.setIconSize(self._icon_size_collapsed)
# Remove QToolButton hover/pressed background/outline
self.icon_btn.setStyleSheet("""
QToolButton:hover { background: transparent; border: none; }
QToolButton:pressed { background: transparent; border: none; }
""")
# Mini label below icon
self.mini_lbl = QLabel(self._mini_text, self)
self.mini_lbl.setObjectName("NavMiniLabel")
self.mini_lbl.setAlignment(Qt.AlignCenter)
self.mini_lbl.setStyleSheet("font-size: 10px;")
self.reveal_mini_lbl = RevealAnimator(
widget=self.mini_lbl,
initially_revealed=True,
animate_width=False,
duration=anim_duration,
)
# Container for icon + mini label
self.mini_icon = QWidget(self)
mini_lay = QVBoxLayout(self.mini_icon)
mini_lay.setContentsMargins(0, 2, 0, 2)
mini_lay.setSpacing(2)
mini_lay.addWidget(self.icon_btn, 0, Qt.AlignCenter)
mini_lay.addWidget(self.mini_lbl, 0, Qt.AlignCenter)
# Title label
self.title_lbl = QLabel(self._title, self)
self.title_lbl.setObjectName("NavTitleLabel")
self.title_lbl.setAlignment(Qt.AlignLeft | Qt.AlignVCenter)
self.title_lbl.setStyleSheet("font-size: 13px;")
self.reveal_title_lbl = RevealAnimator(
widget=self.title_lbl,
initially_revealed=False,
animate_height=False,
duration=anim_duration,
)
self.title_lbl.setVisible(False) # TODO dirty trick to avoid layout shift
lay = QHBoxLayout(self)
lay.setContentsMargins(12, 2, 12, 2)
lay.setSpacing(6)
lay.addWidget(self.mini_icon, 0, Qt.AlignHCenter | Qt.AlignTop)
lay.addWidget(self.title_lbl, 1, Qt.AlignLeft | Qt.AlignVCenter)
self.icon_size_anim = QPropertyAnimation(self.icon_btn, b"iconSize")
self.icon_size_anim.setDuration(anim_duration)
self.icon_size_anim.setEasingCurve(QEasingCurve.InOutCubic)
# Connect icon button to emit activation
self.icon_btn.clicked.connect(self._emit_activated)
self.setMouseTracking(True)
self.setAttribute(Qt.WA_StyledBackground, True)
def is_active(self) -> bool:
"""Return whether the item is currently active/selected."""
return self.property("toggled") is True
def build_animations(self) -> list[QPropertyAnimation]:
"""
Build and return animations for expanding/collapsing the sidebar.
Returns:
list[QPropertyAnimation]: List of animations.
"""
return (
self.reveal_title_lbl.animations()
+ self.reveal_mini_lbl.animations()
+ [self.icon_size_anim]
)
def setup_animations(self, expanded: bool):
"""
Setup animations for expanding/collapsing the sidebar.
Args:
expanded(bool): True if the sidebar is expanded, False if collapsed.
"""
self.reveal_mini_lbl.setup(not expanded)
self.reveal_title_lbl.setup(expanded)
self.icon_size_anim.setStartValue(self.icon_btn.iconSize())
self.icon_size_anim.setEndValue(
self._icon_size_expanded if expanded else self._icon_size_collapsed
)
def set_visible(self, visible: bool):
"""Set visibility of the title label."""
self.title_lbl.setVisible(visible)
def _emit_activated(self):
self.activated.emit()
def set_active(self, active: bool):
"""
Set the active/selected state of the item.
Args:
active(bool): True to set active, False to deactivate.
"""
self.setProperty("toggled", active)
self.toggled = active
# ensure style refresh
self.style().unpolish(self)
self.style().polish(self)
self.update()
def mousePressEvent(self, event):
self.activated.emit()
super().mousePressEvent(event)
@SafeProperty(bool)
def toggleable(self) -> bool:
"""
Whether the item is toggleable (like a button) or not (like an action).
Returns:
bool: True if toggleable, False otherwise.
"""
return self._toggleable
@toggleable.setter
def toggleable(self, value: bool):
"""
Set whether the item is toggleable (like a button) or not (like an action).
Args:
value(bool): True to make toggleable, False otherwise.
"""
self._toggleable = bool(value)
@SafeProperty(bool)
def toggled(self) -> bool:
"""
Whether the item is currently toggled/selected.
Returns:
bool: True if toggled, False otherwise.
"""
return self._toggled
@toggled.setter
def toggled(self, value: bool):
"""
Set whether the item is currently toggled/selected.
Args:
value(bool): True to set toggled, False to untoggle.
"""
self._toggled = value
if value:
new_icon = material_icon(
self._icon_name, filled=True, color=get_on_primary(), convert_to_pixmap=False
)
else:
new_icon = material_icon(
self._icon_name, filled=False, color=get_fg(), convert_to_pixmap=False
)
self.icon_btn.setIcon(new_icon)
# Re-polish so QSS applies correct colors to icon/labels
for w in (self, self.icon_btn, self.title_lbl, self.mini_lbl):
w.style().unpolish(w)
w.style().polish(w)
w.update()
@SafeProperty(bool)
def exclusive(self) -> bool:
"""
Whether the item is exclusive in its toggle group.
Returns:
bool: True if exclusive, False otherwise.
"""
return self._exclusive
@exclusive.setter
def exclusive(self, value: bool):
"""
Set whether the item is exclusive in its toggle group.
Args:
value(bool): True to make exclusive, False otherwise.
"""
self._exclusive = bool(value)
def refresh_theme(self):
# Recompute icon/label colors according to current theme and state
# Trigger the toggled setter to rebuild the icon with the correct color
self.toggled = self._toggled
# Ensure QSS-driven text/icon colors refresh
for w in (self, self.icon_btn, self.title_lbl, self.mini_lbl):
w.style().unpolish(w)
w.style().polish(w)
w.update()
class DarkModeNavItem(NavigationItem):
"""Bottom action item that toggles app theme and updates its icon/text."""
def __init__(
self, parent=None, *, id: str = "dark_mode", anim_duration: int = ANIMATION_DURATION
):
super().__init__(
parent=parent,
title="Dark mode",
icon_name="dark_mode",
mini_text="Dark",
toggleable=False, # action-like, no selection highlight changes
exclusive=False,
anim_duration=anim_duration,
)
self._id = id
self._sync_from_qapp_theme()
self.activated.connect(self.toggle_theme)
def _qapp_dark_enabled(self) -> bool:
qapp = QApplication.instance()
return bool(getattr(getattr(qapp, "theme", None), "theme", None) == "dark")
def _sync_from_qapp_theme(self):
is_dark = self._qapp_dark_enabled()
# Update labels
self.title_lbl.setText("Light mode" if is_dark else "Dark mode")
self.mini_lbl.setText("Light" if is_dark else "Dark")
# Update icon
self.icon_btn.setIcon(
material_icon("light_mode" if is_dark else "dark_mode", convert_to_pixmap=False)
)
def refresh_theme(self):
self._sync_from_qapp_theme()
for w in (self, self.icon_btn, self.title_lbl, self.mini_lbl):
w.style().unpolish(w)
w.style().polish(w)
w.update()
def toggle_theme(self):
"""Toggle application theme and update icon/text."""
from bec_widgets.utils.colors import apply_theme
is_dark = self._qapp_dark_enabled()
apply_theme("light" if is_dark else "dark")
self._sync_from_qapp_theme()
@@ -1,35 +0,0 @@
"""Module for Admin View."""
from qtpy.QtWidgets import QWidget
from bec_widgets.applications.views.view import ViewBase
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.widgets.services.bec_atlas_admin_view.bec_atlas_admin_view import BECAtlasAdminView
class AdminView(ViewBase):
"""
A view for administrators to change the current active experiment, manage messaging
services, and more tasks reserved for users with admin privileges.
"""
def __init__(
self,
parent: QWidget | None = None,
content: QWidget | None = None,
*,
view_id: str | None = None,
title: str | None = None,
**kwargs,
):
super().__init__(parent=parent, content=content, view_id=view_id, title=title)
self.admin_widget = BECAtlasAdminView(parent=self)
self.set_content(self.admin_widget)
@SafeSlot()
def on_exit(self) -> None:
"""Called before the view is hidden.
Default implementation does nothing. Override in subclasses.
"""
self.admin_widget.logout()
@@ -1,140 +0,0 @@
from qtpy.QtWidgets import QWidget
from bec_widgets.applications.views.developer_view.developer_widget import DeveloperWidget
from bec_widgets.applications.views.view import ViewBase, ViewTourSteps
class DeveloperView(ViewBase):
"""
A view for users to write scripts and macros and execute them within the application.
"""
def __init__(
self,
parent: QWidget | None = None,
content: QWidget | None = None,
*,
view_id: str | None = None,
title: str | None = None,
**kwargs,
):
super().__init__(parent=parent, content=content, view_id=view_id, title=title, **kwargs)
self.developer_widget = DeveloperWidget(parent=self)
self.set_content(self.developer_widget)
def register_tour_steps(self, guided_tour, main_app) -> ViewTourSteps | None:
"""Register Developer View components with the guided tour.
Args:
guided_tour: The GuidedTour instance to register with.
main_app: The main application instance (for accessing set_current).
Returns:
ViewTourSteps | None: Model containing view title and step IDs.
"""
step_ids = []
dev_widget = self.developer_widget
# IDE Toolbar
def get_ide_toolbar():
main_app.set_current("developer_view")
return (dev_widget.toolbar, None)
step_id = guided_tour.register_widget(
widget=get_ide_toolbar,
title="IDE Toolbar",
text="Quick access to save files, execute scripts, and configure IDE settings. Use the toolbar to manage your code and execution.",
)
step_ids.append(step_id)
# IDE Explorer
def get_ide_explorer():
main_app.set_current("developer_view")
return (dev_widget.explorer_dock.widget(), None)
step_id = guided_tour.register_widget(
widget=get_ide_explorer,
title="File Explorer",
text="Browse and manage your macro files. Create new files, open existing ones, and organize your scripts.",
)
step_ids.append(step_id)
# IDE Editor
def get_ide_editor():
main_app.set_current("developer_view")
return (dev_widget.monaco_dock.widget(), None)
step_id = guided_tour.register_widget(
widget=get_ide_editor,
title="Code Editor",
text="Write and edit Python code with syntax highlighting, auto-completion, and signature help. Monaco editor provides a modern coding experience.",
)
step_ids.append(step_id)
# IDE Console
def get_ide_console():
main_app.set_current("developer_view")
return (dev_widget.console_dock.widget(), None)
step_id = guided_tour.register_widget(
widget=get_ide_console,
title="BEC Shell Console",
text="Interactive Python console with BEC integration. Execute commands, test code snippets, and interact with the BEC system in real-time.",
)
step_ids.append(step_id)
# IDE Plotting Area
def get_ide_plotting():
main_app.set_current("developer_view")
return (dev_widget.plotting_ads, None)
step_id = guided_tour.register_widget(
widget=get_ide_plotting,
title="Plotting Area",
text="View plots and visualizations generated by your scripts. Arrange multiple plots in a flexible layout.",
)
step_ids.append(step_id)
return ViewTourSteps(view_title="Developer View", step_ids=step_ids)
if __name__ == "__main__":
import sys
from bec_qthemes import apply_theme
from qtpy.QtWidgets import QApplication
from bec_widgets.applications.main_app import BECMainApp
app = QApplication(sys.argv)
apply_theme("dark")
_app = BECMainApp()
screen = app.primaryScreen()
screen_geometry = screen.availableGeometry()
screen_width = screen_geometry.width()
screen_height = screen_geometry.height()
# 70% of screen height, keep 16:9 ratio
height = int(screen_height * 0.9)
width = int(height * (16 / 9))
# If width exceeds screen width, scale down
if width > screen_width * 0.9:
width = int(screen_width * 0.9)
height = int(width / (16 / 9))
_app.resize(width, height)
developer_view = DeveloperView()
_app.add_view(
icon="code_blocks",
title="IDE",
widget=developer_view,
view_id="developer_view",
exclusive=True,
)
_app.show()
# developer_view.show()
# developer_view.setWindowTitle("Developer View")
# developer_view.resize(1920, 1080)
# developer_view.set_stretch(horizontal=[1, 3, 2], vertical=[5, 5]) #can be set during runtime
sys.exit(app.exec_())
@@ -1,412 +0,0 @@
from __future__ import annotations
import re
import markdown
from bec_lib.endpoints import MessageEndpoints
from bec_lib.script_executor import upload_script
from bec_qthemes import material_icon
from qtpy.QtGui import QKeySequence, QShortcut
from qtpy.QtWidgets import QTextEdit
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.toolbars.actions import MaterialIconAction
from bec_widgets.utils.toolbars.bundles import ToolbarBundle
from bec_widgets.utils.toolbars.toolbar import ModularToolBar
from bec_widgets.widgets.containers.dock_area.basic_dock_area import DockAreaWidget
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea
from bec_widgets.widgets.containers.qt_ads import CDockWidget
from bec_widgets.widgets.editors.bec_console.bec_console import BecConsole, BECShell
from bec_widgets.widgets.editors.monaco.monaco_dock import MonacoDock
from bec_widgets.widgets.editors.monaco.monaco_widget import MonacoWidget
from bec_widgets.widgets.utility.ide_explorer.ide_explorer import IDEExplorer
def markdown_to_html(md_text: str) -> str:
"""Convert Markdown with syntax highlighting to HTML (Qt-compatible)."""
# Preprocess: convert consecutive >>> lines to Python code blocks
def replace_python_examples(match):
indent = match.group(1)
examples = match.group(2)
# Remove >>> prefix and clean up the code
lines = []
for line in examples.strip().split("\n"):
line = line.strip()
if line.startswith(">>> "):
lines.append(line[4:]) # Remove '>>> '
elif line.startswith(">>>"):
lines.append(line[3:]) # Remove '>>>'
code = "\n".join(lines)
return f"{indent}```python\n{indent}{code}\n{indent}```"
# Match one or more consecutive >>> lines (with same indentation)
pattern = r"^(\s*)((?:>>> .+(?:\n|$))+)"
md_text = re.sub(pattern, replace_python_examples, md_text, flags=re.MULTILINE)
extensions = ["fenced_code", "codehilite", "tables", "sane_lists"]
html = markdown.markdown(
md_text,
extensions=extensions,
extension_configs={
"codehilite": {"linenums": False, "guess_lang": False, "noclasses": True}
},
output_format="html",
)
# Remove hardcoded background colors that conflict with themes
html = re.sub(r'style="background: #[^"]*"', 'style="background: transparent"', html)
html = re.sub(r"background: #[^;]*;", "", html)
# Add CSS to force code blocks to wrap
css = """
<style>
pre, code {
white-space: pre-wrap !important;
word-wrap: break-word !important;
overflow-wrap: break-word !important;
}
.codehilite pre {
white-space: pre-wrap !important;
word-wrap: break-word !important;
overflow-wrap: break-word !important;
}
</style>
"""
return css + html
class DeveloperWidget(DockAreaWidget):
RPC = False
PLUGIN = False
def __init__(self, parent=None, **kwargs):
super().__init__(parent=parent, variant="compact", **kwargs)
# Promote toolbar above the dock manager provided by the base class
self.toolbar = ModularToolBar(self)
self.init_developer_toolbar()
self._root_layout.insertWidget(0, self.toolbar)
# Initialize the widgets
self.explorer = IDEExplorer(self)
self.explorer.setObjectName("Explorer")
self.console = BECShell(self, rpc_exposed=False)
self.console.setObjectName("BEC Shell")
self.terminal = BecConsole(self, rpc_exposed=False)
self.terminal.setObjectName("Terminal")
self.monaco = MonacoDock(self, rpc_exposed=False, rpc_passthrough_children=False)
self.monaco.setObjectName("MonacoEditor")
self.monaco.save_enabled.connect(self._on_save_enabled_update)
self.plotting_ads = BECDockArea(
self,
mode="plot",
default_add_direction="bottom",
profile_namespace="developer_plotting",
auto_profile_namespace=False,
enable_profile_management=False,
variant="compact",
)
self.plotting_ads.setObjectName("PlottingArea")
self.signature_help = QTextEdit(self)
self.signature_help.setObjectName("Signature Help")
self.signature_help.setAcceptRichText(True)
self.signature_help.setReadOnly(True)
self.signature_help.setLineWrapMode(QTextEdit.LineWrapMode.WidgetWidth)
opt = self.signature_help.document().defaultTextOption()
opt.setWrapMode(opt.WrapMode.WrapAnywhere)
self.signature_help.document().setDefaultTextOption(opt)
self.monaco.signature_help.connect(
lambda text: self.signature_help.setHtml(markdown_to_html(text))
)
self._current_script_id: str | None = None
self.script_editor_tab = None
self._initialize_layout()
# Connect editor signals
self.explorer.file_open_requested.connect(self._open_new_file)
self.monaco.macro_file_updated.connect(self.explorer.refresh_macro_file)
self.monaco.focused_editor.connect(self._on_focused_editor_changed)
self.toolbar.show_bundles(["save", "execution", "settings"])
def _initialize_layout(self) -> None:
"""Create the default dock arrangement for the developer workspace."""
# Monaco editor as the central dock
self.monaco_dock = self.new(
self.monaco,
closable=False,
floatable=False,
movable=False,
return_dock=True,
show_title_bar=False,
show_settings_action=False,
title_buttons={"float": False, "close": False, "menu": False},
# promote_central=True,
)
# Explorer on the left without a title bar
self.explorer_dock = self.new(
self.explorer,
where="left",
closable=False,
floatable=False,
movable=False,
return_dock=True,
show_title_bar=False,
)
# Console and terminal tabbed along the bottom
self.console_dock = self.new(
self.console,
relative_to=self.monaco_dock,
where="bottom",
closable=False,
floatable=False,
movable=False,
return_dock=True,
title_buttons={"float": True, "close": False},
)
self.terminal_dock = self.new(
self.terminal,
closable=False,
floatable=False,
movable=False,
tab_with=self.console_dock,
return_dock=True,
title_buttons={"float": False, "close": False},
)
# Plotting area on the right with signature help tabbed alongside
self.plotting_ads_dock = self.new(
self.plotting_ads,
where="right",
closable=False,
floatable=False,
movable=False,
return_dock=True,
title_buttons={"float": True},
)
self.signature_dock = self.new(
self.signature_help,
closable=False,
floatable=False,
movable=False,
tab_with=self.plotting_ads_dock,
return_dock=True,
title_buttons={"float": False, "close": False},
)
self.set_layout_ratios(horizontal=[2, 5, 3], vertical=[7, 3])
def init_developer_toolbar(self):
"""Initialize the developer toolbar with necessary actions and widgets."""
save_button = MaterialIconAction(
icon_name="save", tooltip="Save", label_text="Save", filled=True, parent=self
)
save_button.action.triggered.connect(self.on_save)
self.toolbar.components.add_safe("save", save_button)
save_as_button = MaterialIconAction(
icon_name="save_as", tooltip="Save As", label_text="Save As", parent=self
)
self.toolbar.components.add_safe("save_as", save_as_button)
save_as_button.action.triggered.connect(self.on_save_as)
save_bundle = ToolbarBundle("save", self.toolbar.components)
save_bundle.add_action("save")
save_bundle.add_action("save_as")
self.toolbar.add_bundle(save_bundle)
run_action = MaterialIconAction(
icon_name="play_arrow",
tooltip="Run current file",
label_text="Run",
filled=True,
parent=self,
)
run_action.action.triggered.connect(self.on_execute)
self.toolbar.components.add_safe("run", run_action)
stop_action = MaterialIconAction(
icon_name="stop",
tooltip="Stop current execution",
label_text="Stop",
filled=True,
parent=self,
)
stop_action.action.triggered.connect(self.on_stop)
self.toolbar.components.add_safe("stop", stop_action)
execution_bundle = ToolbarBundle("execution", self.toolbar.components)
execution_bundle.add_action("run")
execution_bundle.add_action("stop")
self.toolbar.add_bundle(execution_bundle)
vim_action = MaterialIconAction(
icon_name="vim",
tooltip="Toggle Vim Mode",
label_text="Vim",
filled=True,
parent=self,
checkable=True,
)
self.toolbar.components.add_safe("vim", vim_action)
vim_action.action.triggered.connect(self.on_vim_triggered)
settings_bundle = ToolbarBundle("settings", self.toolbar.components)
settings_bundle.add_action("vim")
self.toolbar.add_bundle(settings_bundle)
save_shortcut = QShortcut(QKeySequence("Ctrl+S"), self)
save_shortcut.activated.connect(self.on_save)
save_as_shortcut = QShortcut(QKeySequence("Ctrl+Shift+S"), self)
save_as_shortcut.activated.connect(self.on_save_as)
def _open_new_file(self, file_name: str, scope: str):
self.monaco.open_file(file_name, scope)
# Set read-only mode for shared files
if "shared" in scope:
self.monaco.set_file_readonly(file_name, True)
# Add appropriate icon based on file type
if "script" in scope:
# Use script icon for script files
icon = material_icon("script", size=(24, 24))
self.monaco.set_file_icon(file_name, icon)
elif "macro" in scope:
# Use function icon for macro files
icon = material_icon("function", size=(24, 24))
self.monaco.set_file_icon(file_name, icon)
@SafeSlot()
def on_save(self):
"""Save the currently focused file in the Monaco editor."""
self.monaco.save_file()
@SafeSlot()
def on_save_as(self):
"""Save the currently focused file in the Monaco editor with a 'Save As' dialog."""
self.monaco.save_file(force_save_as=True)
@SafeSlot()
def on_vim_triggered(self):
"""Toggle Vim mode in the Monaco editor."""
self.monaco.set_vim_mode(self.toolbar.components.get_action("vim").action.isChecked())
@SafeSlot(bool)
def _on_save_enabled_update(self, enabled: bool):
self.toolbar.components.get_action("save").action.setEnabled(enabled)
self.toolbar.components.get_action("save_as").action.setEnabled(enabled)
@SafeSlot()
def on_execute(self):
"""Upload and run the currently focused script in the Monaco editor."""
self.script_editor_tab = self.monaco.last_focused_editor
if not self.script_editor_tab:
return
widget = self.script_editor_tab.widget()
if not isinstance(widget, MonacoWidget):
return
if widget.modified:
# Save the file before execution if there are unsaved changes
self.monaco.save_file()
if widget.modified:
# If still modified, user likely cancelled save dialog
return
self.current_script_id = upload_script(self.client.connector, widget.get_text())
self.console.write(f'bec._run_script("{self.current_script_id}")')
print(f"Uploaded script with ID: {self.current_script_id}")
@SafeSlot()
def on_stop(self):
"""Stop the execution of the currently running script"""
if not self.current_script_id:
return
self.console.send_ctrl_c()
@property
def current_script_id(self):
"""Get the ID of the currently running script."""
return self._current_script_id
@current_script_id.setter
def current_script_id(self, value: str | None):
"""
Set the ID of the currently running script.
Args:
value (str | None): The script ID to set.
Raises:
ValueError: If the provided value is not a string or None.
"""
if value is not None and not isinstance(value, str):
raise ValueError("Script ID must be a string.")
old_script_id = self._current_script_id
self._current_script_id = value
self._update_subscription(value, old_script_id)
def _update_subscription(self, new_script_id: str | None, old_script_id: str | None):
if old_script_id is not None:
self.bec_dispatcher.disconnect_slot(
self.on_script_execution_info, MessageEndpoints.script_execution_info(old_script_id)
)
if new_script_id is not None:
self.bec_dispatcher.connect_slot(
self.on_script_execution_info, MessageEndpoints.script_execution_info(new_script_id)
)
@SafeSlot(CDockWidget)
def _on_focused_editor_changed(self, tab_widget: CDockWidget):
"""
Disable the run / stop buttons if the focused editor is a macro file.
Args:
tab_widget: The currently focused tab widget in the Monaco editor.
"""
if not isinstance(tab_widget, CDockWidget):
return
widget = tab_widget.widget()
if not isinstance(widget, MonacoWidget):
return
file_scope = widget.metadata.get("scope", "")
run_action = self.toolbar.components.get_action("run")
stop_action = self.toolbar.components.get_action("stop")
if "macro" in file_scope:
run_action.action.setEnabled(False)
stop_action.action.setEnabled(False)
else:
run_action.action.setEnabled(True)
stop_action.action.setEnabled(True)
@SafeSlot(dict, dict)
def on_script_execution_info(self, content: dict, metadata: dict):
"""
Handle script execution info messages to update the editor highlights.
Args:
content (dict): The content of the message containing execution info.
metadata (dict): Additional metadata for the message.
"""
print(f"Script execution info: {content}")
current_lines = content.get("current_lines")
if self.script_editor_tab is None:
return
widget = self.script_editor_tab.widget()
if not isinstance(widget, MonacoWidget):
return
if not current_lines:
widget.clear_highlighted_lines()
return
line_number = current_lines[0]
widget.clear_highlighted_lines()
widget.set_highlighted_lines(line_number, line_number)
def cleanup(self):
"""Clean up resources used by the developer widget."""
self.delete_all()
return super().cleanup()
@@ -1,2 +0,0 @@
from .config_choice_dialog import ConfigChoiceDialog
from .device_form_dialog import DeviceFormDialog
@@ -1,49 +0,0 @@
"""Dialog to choose config loading method: replace, add or cancel."""
from enum import IntEnum
from qtpy.QtWidgets import QDialog, QDialogButtonBox, QLabel, QSizePolicy, QVBoxLayout
class ConfigChoiceDialog(QDialog):
class Result(IntEnum):
CANCEL = QDialog.Rejected
ADD = 2
REPLACE = 3
def __init__(
self,
parent=None,
custom_label: str = "Do you want to replace the current config or add to it?",
):
super().__init__(parent)
self.setWindowTitle("Load Config")
layout = QVBoxLayout(self)
label = QLabel(custom_label)
label.setWordWrap(True)
layout.addWidget(label)
# Use QDialogButtonBox for native layout
self.button_box = QDialogButtonBox(self)
self.cancel_btn = self.button_box.addButton(
"Cancel", QDialogButtonBox.ButtonRole.ActionRole # RejectRole will be next to Accept...
)
self.replace_btn = self.button_box.addButton(
"Replace", QDialogButtonBox.ButtonRole.AcceptRole
)
self.add_btn = self.button_box.addButton("Add", QDialogButtonBox.ButtonRole.AcceptRole)
layout.addWidget(self.button_box)
for btn in [self.replace_btn, self.add_btn, self.cancel_btn]:
btn.setMinimumWidth(80)
btn.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Preferred)
# Connections using native done(int)
self.replace_btn.clicked.connect(lambda: self.done(self.Result.REPLACE))
self.add_btn.clicked.connect(lambda: self.done(self.Result.ADD))
self.cancel_btn.clicked.connect(lambda: self.done(self.Result.CANCEL))
self.replace_btn.setFocus()
@@ -1,447 +0,0 @@
"""Dialogs for device configuration forms and ophyd testing."""
from typing import Any, Iterable, Tuple
from bec_lib.atlas_models import Device as DeviceModel
from bec_lib.logger import bec_logger
from ophyd_devices.interfaces.device_config_templates.ophyd_templates import OPHYD_DEVICE_TEMPLATES
from qtpy import QtCore, QtWidgets
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.widgets.control.device_manager.components import OphydValidation
from bec_widgets.widgets.control.device_manager.components.device_config_template.device_config_template import (
DeviceConfigTemplate,
)
from bec_widgets.widgets.control.device_manager.components.device_config_template.template_items import (
validate_name,
)
from bec_widgets.widgets.control.device_manager.components.ophyd_validation import (
ConfigStatus,
ConnectionStatus,
format_error_to_md,
)
DEFAULT_DEVICE = "CustomDevice"
_ValidationResultIter = Iterable[Tuple[dict[str, Any], ConfigStatus, ConnectionStatus, str]]
logger = bec_logger.logger
class DeviceManagerOphydValidationDialog(QtWidgets.QDialog):
"""Popup dialog to test Ophyd device configurations interactively."""
def __init__(self, parent=None, config: dict | None = None): # type: ignore
super().__init__(parent)
self.setWindowTitle("Device Manager Ophyd Test")
self._config_status = ConfigStatus.UNKNOWN.value
self._connection_status = ConnectionStatus.UNKNOWN.value
self._validated_config: dict = {}
self._validation_msg: str = ""
layout = QtWidgets.QVBoxLayout(self)
# Core test widget
self.device_manager_ophyd_test = OphydValidation()
layout.addWidget(self.device_manager_ophyd_test)
# Log/Markdown box for messages
self.text_box = QtWidgets.QTextEdit()
self.text_box.setReadOnly(True)
layout.addWidget(self.text_box)
# Load and apply configuration
config = config or {}
device_name = config.get("name", None)
if device_name:
self.device_manager_ophyd_test.add_device_to_keep_visible_after_validation(device_name)
# Dialog Buttons: equal size, stacked horizontally
button_box = QtWidgets.QDialogButtonBox(QtWidgets.QDialogButtonBox.StandardButton.Close)
for button in button_box.buttons():
button.setSizePolicy(
QtWidgets.QSizePolicy.Policy.Expanding, QtWidgets.QSizePolicy.Policy.Fixed
)
button.clicked.connect(self.accept)
# button_box.setCenterButtons(False)
layout.addWidget(button_box)
self.device_manager_ophyd_test.validation_completed.connect(self._on_device_validated)
self._resize_dialog()
self.finished.connect(self._finished)
# Add and test device config
self.device_manager_ophyd_test.change_device_configs([config], added=True, connect=True)
def _resize_dialog(self):
"""Resize the dialog based on the screen size."""
app: QtCore.QCoreApplication = QtWidgets.QApplication.instance()
screen = app.primaryScreen()
screen_geometry = screen.availableGeometry()
screen_width = screen_geometry.width()
screen_height = screen_geometry.height()
# 70% of screen height, keep 4:3 ratio
height = int(screen_height * 0.7)
width = int(height * (4 / 3))
# If width exceeds screen width, scale down
if width > screen_width * 0.9:
width = int(screen_width * 0.9)
height = int(width / (4 / 3))
self.resize(width, height)
def _on_device_validated(
self, device_config: dict, config_status: int, connection_status: int, validation_msg: str
):
device_name = device_config.get("name", "")
self._config_status = config_status
self._connection_status = connection_status
self._validated_config = device_config
self._validation_msg = validation_msg
self.text_box.setMarkdown(format_error_to_md(device_name, validation_msg))
@SafeSlot(int)
def _finished(self, state: int):
self.device_manager_ophyd_test.close()
self.device_manager_ophyd_test.deleteLater()
@property
def validation_result(self) -> tuple[dict, int, int, str]:
"""
Return the result of the validation as a tuple of
Returns:
result (Tuple[dict, int, int]): A tuple containing:
validated_config (dict): The validated device configuration.
config_status (int): The configuration status.
connection_status (int): The connection status.
"""
return (
self._validated_config,
self._config_status,
self._connection_status,
self._validation_msg,
)
class DeviceFormDialog(QtWidgets.QDialog):
# Signal emitted when device configuration is accepted, only
# emitted when the user clicks the "Add Device" button
# The integer values indicate if the device config was
# validated: config_status, connection_status
accepted_data = QtCore.Signal(dict, int, int, str, str)
def __init__(self, parent=None, add_btn_text: str = "Add Device"): # type: ignore
super().__init__(parent)
# Track old device name if config is edited
self._old_device_name: str = ""
# Config validation result
self._validation_result: tuple[dict, int, int, str] = (
{},
ConfigStatus.UNKNOWN.value,
ConnectionStatus.UNKNOWN.value,
"",
)
# Group to variants mapping
self._group_variants: dict[str, list[str]] = {
group: [variant for variant in variants.keys()]
for group, variants in OPHYD_DEVICE_TEMPLATES.items()
}
self._control_widgets: dict[str, QtWidgets.QWidget] = {}
# Setup layout
self.setWindowTitle("Device Config Dialog")
layout = QtWidgets.QVBoxLayout(self)
# Control panel
self._control_box = self.create_control_panel()
layout.addWidget(self._control_box)
# Device config template display
self._device_config_template = DeviceConfigTemplate(parent=self)
self._frame = QtWidgets.QFrame()
self._frame.setFrameShape(QtWidgets.QFrame.StyledPanel)
self._frame.setFrameShadow(QtWidgets.QFrame.Raised)
frame_layout = QtWidgets.QVBoxLayout(self._frame)
frame_layout.addWidget(self._device_config_template)
layout.addWidget(self._frame)
# Custom buttons
self.add_btn = QtWidgets.QPushButton(add_btn_text)
self.test_connection_btn = QtWidgets.QPushButton("Test Connection")
self.cancel_btn = QtWidgets.QPushButton("Cancel")
self.reset_btn = QtWidgets.QPushButton("Reset Form")
btn_box = QtWidgets.QDialogButtonBox(self)
btn_box.addButton(self.cancel_btn, QtWidgets.QDialogButtonBox.ButtonRole.RejectRole)
btn_box.addButton(self.reset_btn, QtWidgets.QDialogButtonBox.ButtonRole.ActionRole)
btn_box.addButton(
self.test_connection_btn, QtWidgets.QDialogButtonBox.ButtonRole.ActionRole
)
btn_box.addButton(self.add_btn, QtWidgets.QDialogButtonBox.ButtonRole.AcceptRole)
for btn in btn_box.buttons():
btn.setSizePolicy(QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Preferred)
layout.addWidget(btn_box)
frame_layout.addWidget(btn_box)
# Connect signals to explicit slots
self.add_btn.clicked.connect(self._add_config)
self.test_connection_btn.clicked.connect(self._test_connection)
self.reset_btn.clicked.connect(self._reset_config)
self.cancel_btn.clicked.connect(self._reject_config)
# layout.addWidget(self._device_config_template)
self.update_variant_combo(self._control_widgets["group_combo"].currentText())
self.finished.connect(self._finished)
# Wait dialog when adding config
self._wait_dialog: QtWidgets.QProgressDialog | None = None
@SafeSlot(int)
def _finished(self, state: int):
for widget in self._control_widgets.values():
widget.close()
widget.deleteLater()
if self._wait_dialog is not None:
self._wait_dialog.close()
self._wait_dialog.deleteLater()
@property
def config_validation_result(self) -> tuple[dict, int, int, str]:
"""Return the result of the last configuration validation."""
return self._validation_result
@config_validation_result.setter
def config_validation_result(self, result: tuple[dict, int, int, str]):
self._validation_result = result
def set_device_config(self, device_config: dict):
"""Set the device configuration in the template form."""
# Figure out which group and variant this config belongs to
device_class = device_config.get("deviceClass", None)
for group, variants in OPHYD_DEVICE_TEMPLATES.items():
for variant, template_info in variants.items():
if template_info.get("deviceClass", None) == device_class:
# Found the matching group and variant
self._control_widgets["group_combo"].setCurrentText(group)
self.update_variant_combo(group)
self._control_widgets["variant_combo"].setCurrentText(variant)
self._device_config_template.set_config_fields(device_config)
return
# If no match found, set to default
self._control_widgets["group_combo"].setCurrentText(DEFAULT_DEVICE)
self.update_variant_combo(DEFAULT_DEVICE)
self._device_config_template.set_config_fields(device_config)
self._old_device_name = device_config.get("name", "")
def sizeHint(self) -> QtCore.QSize:
return QtCore.QSize(1600, 1000)
def create_control_panel(self) -> QtWidgets.QGroupBox:
self._control_box = QtWidgets.QGroupBox("Choose a Device Group")
layout = QtWidgets.QGridLayout(self._control_box)
group_label = QtWidgets.QLabel("Device Group:")
layout.addWidget(group_label, 0, 0)
group_combo = QtWidgets.QComboBox()
group_combo.addItems(self._group_variants.keys())
self._control_widgets["group_combo"] = group_combo
layout.addWidget(group_combo, 1, 0)
variant_label = QtWidgets.QLabel("Variants:")
layout.addWidget(variant_label, 0, 1)
variant_combo = QtWidgets.QComboBox()
self._control_widgets["variant_combo"] = variant_combo
layout.addWidget(variant_combo, 1, 1)
group_combo.currentTextChanged.connect(self.update_variant_combo)
variant_combo.currentTextChanged.connect(self.update_device_config_template)
return self._control_box
def update_variant_combo(self, group_name: str):
variant_combo = self._control_widgets["variant_combo"]
variant_combo.clear()
variant_combo.addItems(self._group_variants.get(group_name, []))
if variant_combo.count() <= 1:
variant_combo.setEnabled(False)
else:
variant_combo.setEnabled(True)
def update_device_config_template(self, variant_name: str):
group_name = self._control_widgets["group_combo"].currentText()
template_info = OPHYD_DEVICE_TEMPLATES.get(group_name, {}).get(variant_name, {})
if template_info:
self._device_config_template.change_template(template_info)
else:
self._device_config_template.change_template(
OPHYD_DEVICE_TEMPLATES[DEFAULT_DEVICE][DEFAULT_DEVICE]
)
def _create_validation_dialog(self) -> QtWidgets.QProgressDialog:
"""
Create and show a validation progress dialog while validating the device configuration.
The dialog will be modal and prevent user interaction until validation is complete.
"""
wait_dialog = QtWidgets.QProgressDialog(
"Validating config... please wait", None, 0, 0, parent=self
)
wait_dialog.setWindowModality(QtCore.Qt.WindowModality.ApplicationModal)
wait_dialog.setCancelButton(None)
wait_dialog.setMinimumDuration(0)
return wait_dialog
def _create_and_run_ophyd_validation(self, config: dict[str, Any]) -> OphydValidation:
"""Run ophyd validation test on the current device configuration."""
ophyd_validation = OphydValidation(parent=self)
ophyd_validation.validation_completed.connect(self._handle_validation_result)
ophyd_validation.multiple_validations_completed.connect(
self._handle_devices_already_in_session_results
)
# NOTE Use singleShot here to ensure that the signal is emitted after all other scheduled
# tasks in the event loop are processed. This avoids potential deadlocks. In particular,
# this is relevant for the _wait_dialog exec which opens a modal dialog during validation
# and therefore must not have the signal emitted immediately in the same event loop iteration.
# Otherwise, the callback may be scheduled before the dialog is shown resulting in a deadlock.
QtCore.QTimer.singleShot(
0, lambda: ophyd_validation.change_device_configs([config], True, False)
)
return ophyd_validation
@SafeSlot(list)
def _handle_devices_already_in_session_results(
self, validation_results: _ValidationResultIter
) -> None:
"""Handle completion if device is already in session."""
if len(validation_results) != 1:
logger.error(
"Expected a single device validation result, but got multiple. Using first result."
)
result = validation_results[0] if len(validation_results) > 0 else None
if result is None:
logger.error(
f"Received validation results: {validation_results} of unexpected length 0. Returning."
)
return
device_config, config_status, connection_status, validation_msg = result
self._handle_validation_result(
device_config, config_status, connection_status, validation_msg
)
@SafeSlot(dict, int, int, str)
def _handle_validation_result(
self, device_config: dict, config_status: int, connection_status: int, validation_msg: str
):
"""Handle completion of validation."""
try:
if (
DeviceModel.model_validate(device_config)
== DeviceModel.model_validate(self._validation_result[0])
and connection_status == ConnectionStatus.UNKNOWN.value
):
# Config unchanged, we can reuse previous connection status. Only do this if the new
# connection status is UNKNOWN as the current validation should not test the connection.
connection_status = self._validation_result[2]
validation_msg = self._validation_result[3]
except Exception:
logger.debug(
f"Device config validation changed for config: {device_config} compared to previous validation. Using status from recent validation."
)
self._validation_result = (device_config, config_status, connection_status, validation_msg)
if self._wait_dialog is not None:
self._wait_dialog.accept()
self._wait_dialog.close()
self._wait_dialog.deleteLater()
self._wait_dialog = None
def _add_config(self):
"""
Adding a config will always run a validation check of the config without a connection test.
We will check if tests have already run, and reuse the information in case they also tested the connection to the device.
"""
config = self._device_config_template.get_config_fields()
# I. First we validate that the device name is valid, as this may create issues within the OphydValidation widget.
# Validate device name first. If invalid, this should immediately block adding the device.
if not validate_name(config.get("name", "")):
msg_box = self._create_warning_message_box(
"Invalid Device Name",
f"Device is invalid, cannot be empty or contain spaces. Please provide a valid name. {config.get('name', '')!r}",
)
msg_box.exec()
return
# II. Next we will run the validation check of the config without connection test.
# We will show a wait dialog while this is happening, and compare the results with the last known validation results.
# If the config is unchanged, we will use the connection status results from the last validation.
self._wait_dialog = self._create_validation_dialog()
ophyd_validation: OphydValidation | None = None
try:
ophyd_validation = self._create_and_run_ophyd_validation(config)
# NOTE If dialog was already closed, this means that a validation callback was already received
# which closed the dialog. In this case, we skip exec to avoid deadlock. With the singleShot above,
# this should not happen, but we keep the check for safety.
if self._wait_dialog is not None:
self._wait_dialog.exec() # This will block until the validation is complete
config, config_status, connection_status, validation_msg = self._validation_result
if config_status == ConfigStatus.INVALID.value:
msg_box = self._create_warning_message_box(
"Invalid Device Configuration",
f"Device configuration is invalid. Last known validation message:\n\nErrors:\n{self._validation_result[3]}",
)
msg_box.exec()
return
self.accepted_data.emit(
config, config_status, connection_status, validation_msg, self._old_device_name
)
self.accept()
finally:
if ophyd_validation is not None:
ophyd_validation.close()
ophyd_validation.deleteLater()
def _create_warning_message_box(self, title: str, text: str) -> QtWidgets.QMessageBox:
msg_box = QtWidgets.QMessageBox(self)
msg_box.setIcon(QtWidgets.QMessageBox.Warning)
msg_box.setWindowTitle(title)
msg_box.setText(text)
return msg_box
def _test_connection(self):
config = self._device_config_template.get_config_fields()
dialog = DeviceManagerOphydValidationDialog(self, config=config)
result = dialog.exec()
if result in (QtWidgets.QDialog.Accepted, QtWidgets.QDialog.Rejected):
self.config_validation_result = dialog.validation_result
def _reset_config(self):
self._device_config_template.reset_to_defaults()
def _reject_config(self):
self.reject()
if __name__ == "__main__": # pragma: no cover
import sys
from bec_qthemes import apply_theme
app = QtWidgets.QApplication(sys.argv)
apply_theme("light")
dialog = DeviceFormDialog()
dialog.resize(1200, 800)
dialog.show()
sys.exit(app.exec())
@@ -1,691 +0,0 @@
"""Module for the upload redis dialog in the device manager view."""
from __future__ import annotations
from enum import IntEnum
from functools import partial
from typing import TYPE_CHECKING, Any, List, Tuple
from bec_lib.logger import bec_logger
from bec_qthemes import apply_theme, material_icon
from qtpy import QtCore, QtGui, QtWidgets
from bec_widgets.utils.colors import get_accent_colors
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.widgets.control.device_manager.components.ophyd_validation import (
ConfigStatus,
ConnectionStatus,
get_validation_icons,
)
if TYPE_CHECKING:
from bec_widgets.utils.colors import AccentColor
from bec_widgets.widgets.control.device_manager.components.device_table.device_table import (
_ValidationResultIter,
)
logger = bec_logger.logger
class DeviceStatusItem(QtWidgets.QWidget):
"""Individual device status item widget for the validation display."""
def __init__(
self, device_config: dict, config_status: int, connection_status: int, parent=None
):
super().__init__(parent)
self.device_name = device_config.get("name", "")
self.device_config: dict = device_config
self.config_status = ConfigStatus(config_status)
self.connection_status = ConnectionStatus(connection_status)
self._transparent_button_style = "background-color: transparent; border: none;"
# Get validation icons
self.colors = get_accent_colors()
self._icon_size = (20, 20)
self.icons = get_validation_icons(self.colors, self._icon_size)
self._setup_ui()
self._update_display()
def _setup_ui(self):
"""Setup the UI for the device status item."""
layout = QtWidgets.QHBoxLayout(self)
layout.setContentsMargins(8, 4, 8, 4)
layout.setSpacing(8)
# Device name label
self.name_label = QtWidgets.QLabel(self.device_name)
self.name_label.setMinimumWidth(150)
layout.addWidget(self.name_label)
layout.addStretch()
# Config status icon
self.config_icon_label = self._create_status_icon_label(self._icon_size)
layout.addWidget(self.config_icon_label)
# Connection status icon
self.connection_icon_label = self._create_status_icon_label(self._icon_size)
layout.addWidget(self.connection_icon_label)
def _create_status_icon_label(self, icon_size: tuple[int, int]) -> QtWidgets.QPushButton:
button = QtWidgets.QPushButton()
button.setFlat(True)
button.setEnabled(False)
button.setStyleSheet(self._transparent_button_style)
button.setFixedSize(icon_size[0], icon_size[1])
return button
def _update_display(self):
"""Update the visual display based on current status."""
# Update config status
config_icon = self.icons["config_status"].get(self.config_status.value)
if config_icon:
self.config_icon_label.setIcon(config_icon)
# Update connection status
connection_icon = self.icons["connection_status"].get(self.connection_status.value)
if connection_icon:
self.connection_icon_label.setIcon(connection_icon)
def update_status(self, config_status: int, connection_status: int):
"""Update the status and refresh display."""
self.config_status = ConfigStatus(config_status)
self.connection_status = ConnectionStatus(connection_status)
self._update_display()
class SortTableItem(QtWidgets.QTableWidgetItem):
"""Custom TableWidgetItem with hidden __column_data attribute for sorting."""
def __lt__(self, other: QtWidgets.QTableWidgetItem) -> bool:
"""Override less-than operator for sorting."""
if not isinstance(other, QtWidgets.QTableWidgetItem):
return NotImplemented
self_data = self.data(QtCore.Qt.ItemDataRole.UserRole)
other_data = other.data(QtCore.Qt.ItemDataRole.UserRole)
if self_data is not None and other_data is not None:
self_data: DeviceStatusItem
other_data: DeviceStatusItem
if self_data.config_status != other_data.config_status:
return self_data.config_status < other_data.config_status
else:
return self_data.connection_status < other_data.connection_status
return super().__lt__(other)
def __gt__(self, other: QtWidgets.QTableWidgetItem) -> bool:
"""Override less-than operator for sorting."""
if not isinstance(other, QtWidgets.QTableWidgetItem):
return NotImplemented
self_data = self.data(QtCore.Qt.ItemDataRole.UserRole)
other_data = other.data(QtCore.Qt.ItemDataRole.UserRole)
if self_data is not None and other_data is not None:
self_data: DeviceStatusItem
other_data: DeviceStatusItem
if self_data.config_status != other_data.config_status:
return self_data.config_status > other_data.config_status
else:
return self_data.connection_status > other_data.connection_status
return super().__gt__(other)
class ValidationSection(QtWidgets.QGroupBox):
"""Section widget for displaying validation results."""
def __init__(self, title: str, parent=None):
super().__init__(title, parent=parent)
self._setup_ui()
# self.device_items: Dict[str, DeviceStatusItem] = {}
def _setup_ui(self):
"""Setup the UI for the validation section."""
layout = QtWidgets.QVBoxLayout(self)
# Status summary label
summary_layout = QtWidgets.QHBoxLayout()
self.summary_icon = QtWidgets.QLabel()
self.summary_icon.setFixedSize(24, 24)
self.summary_label = QtWidgets.QLabel()
self.summary_label.setWordWrap(True)
summary_layout.addWidget(self.summary_icon)
summary_layout.addWidget(self.summary_label)
layout.addLayout(summary_layout)
# Scroll area for device items
self.table = QtWidgets.QTableWidget()
self.table.setColumnCount(1)
self.table.horizontalHeader().setSectionResizeMode(QtWidgets.QHeaderView.Stretch)
self.table.horizontalHeader().hide()
self.table.verticalHeader().hide()
self.table.setShowGrid(False) # r
self.table.sortItems(0, QtCore.Qt.SortOrder.AscendingOrder)
layout.addWidget(self.table)
QtCore.QTimer.singleShot(0, self.adjustSize)
def add_device(self, device_config: dict, config_status: int, connection_status: int):
"""
Add a device to the validation section.
Args:
device_config (dict): The device configuration dictionary.
config_status (int): The configuration status.
connection_status (int): The connection status.
"""
self.table.setSortingEnabled(False)
device_name = device_config.get("name", "")
row = self._find_row_by_name(device_name)
if row is not None:
widget: DeviceStatusItem = self.table.cellWidget(row, 0)
widget.update_status(config_status, connection_status)
else:
row_position = self.table.rowCount()
self.table.insertRow(row_position)
sort_item = SortTableItem(device_name)
sort_item.setText("")
self.table.setItem(row_position, 0, sort_item)
device_item = DeviceStatusItem(device_config, config_status, connection_status)
sort_item.setData(QtCore.Qt.ItemDataRole.UserRole, device_item)
self.table.setCellWidget(row_position, 0, device_item)
self.table.resizeRowsToContents()
self.table.setSortingEnabled(True)
def _find_row_by_name(self, device_name: str) -> int | None:
"""
Find a row by device name.
Args:
name (str): The name of the device to find.
Returns:
int | None: The row index if found, else None.
"""
for row in range(self.table.rowCount()):
item: SortTableItem = self.table.item(row, 0)
widget: DeviceStatusItem = self.table.cellWidget(row, 0)
if widget.device_name == device_name:
return row
return None
def remove_device(self, device_name: str):
"""Remove a device from the table by name."""
self.table.setSortingEnabled(False)
row = self._find_row_by_name(device_name)
if row is not None:
self.table.removeRow(row)
self.table.setSortingEnabled(True)
def clear_devices(self):
"""Clear all device items."""
self.table.setSortingEnabled(False)
while self.table.rowCount() > 0:
self.table.removeRow(0)
self.table.setSortingEnabled(True)
def update_summary(self, text: str, icon: QtGui.QPixmap = None):
"""Update the summary label."""
self.summary_label.setText(text)
if icon:
self.summary_icon.setPixmap(icon)
class UploadRedisDialog(QtWidgets.QDialog):
"""
Dialog for uploading device configurations to BEC server with validation checks.
"""
class UploadAction(IntEnum):
"""Enum for upload actions."""
CANCEL = QtWidgets.QDialog.DialogCode.Rejected
OK = QtWidgets.QDialog.DialogCode.Accepted
CONNECTION_TEST_REQUESTED = 999
# Request ophyd validation for all untested device connections
# list of device configs, added: bool, connect: bool
request_ophyd_validation = QtCore.Signal(list, bool, bool)
def __init__(self, parent, device_configs: dict[str, Tuple[dict, int, int]] | None = None):
super().__init__(parent=parent)
self.device_configs: dict[str, Tuple[dict, int, int]] = device_configs or {}
self._transparent_button_style = "background-color: transparent; border: none;"
self.colors = get_accent_colors()
self.icons = get_validation_icons(self.colors, (20, 20))
material_icon_partial = partial(material_icon, size=(24, 24), filled=True)
self._label_icons = {
"success": material_icon_partial("check_circle", color=self.colors.success),
"warning": material_icon_partial("warning", color=self.colors.warning),
"error": material_icon_partial("error", color=self.colors.emergency),
"reload": material_icon_partial("refresh", color=self.colors.default),
"upload": material_icon_partial("cloud_upload", color=self.colors.default),
}
# Track validation states
self.has_invalid_configs: int = 0
self.has_untested_connections: int = 0
self.has_cannot_connect: int = 0
self._setup_ui()
self._update_ui()
def set_device_config(self, device_configs: dict[str, Tuple[dict, int, int]]):
"""
Update the device configuration in the dialog.
Args:
device_configs (dict[str, Tuple[dict, int, int]]): New device configurations with structure
{device_name: (config_dict, config_status, connection_status)}.
"""
self.config_section.clear_devices()
self.device_configs = device_configs
self._update_ui()
def _setup_ui(self):
"""Setup the main UI for the dialog."""
self.setWindowTitle("Upload Configuration to BEC Server")
self.setModal(True) # Blocks interaction with other parts of the app
layout = QtWidgets.QVBoxLayout(self)
layout.setSpacing(16)
# Header
header_label = QtWidgets.QLabel("Review Configuration Before Upload")
header_label.setStyleSheet("font-size: 16px; font-weight: bold; margin-bottom: 8px;")
layout.addWidget(header_label)
# Description
desc_label = QtWidgets.QLabel(
"Please review the configuration and connection status of all devices before uploading to BEC Server."
)
desc_label.setWordWrap(True)
desc_label.setStyleSheet("color: #666; margin-bottom: 16px;")
layout.addWidget(desc_label)
# Config validation section
sections_layout = QtWidgets.QHBoxLayout()
self.config_section = ValidationSection("Configuration Validation")
sections_layout.addWidget(self.config_section)
layout.addLayout(sections_layout)
# Action buttons section
self._setup_action_buttons(layout)
# Dialog buttons
self._setup_dialog_buttons(layout)
self.adjustSize()
def _setup_action_buttons(self, parent_layout: QtWidgets.QLayout):
"""Setup the action buttons section."""
action_group = QtWidgets.QGroupBox("Actions")
action_layout = QtWidgets.QVBoxLayout(action_group)
# Validate connections button
button_layout = QtWidgets.QHBoxLayout()
self.validate_connections_btn = QtWidgets.QPushButton("Validate All Connections")
self.validate_connections_btn.setIcon(self._label_icons["reload"])
self.validate_connections_btn.clicked.connect(self._validate_connections)
button_layout.addWidget(self.validate_connections_btn)
button_layout.addStretch()
button_layout.addSpacing(16)
action_layout.addLayout(button_layout)
# Status indicator
status_layout = QtWidgets.QHBoxLayout()
self.status_icon = QtWidgets.QPushButton()
self.status_icon.setFlat(True)
self.status_icon.setEnabled(False)
self.status_icon.setStyleSheet(self._transparent_button_style)
self.status_icon.setFixedSize(24, 24)
self.status_label = QtWidgets.QLabel()
self.status_label.setWordWrap(True)
status_layout.addWidget(self.status_icon)
status_layout.addWidget(self.status_label)
action_layout.addLayout(status_layout)
parent_layout.addWidget(action_group)
def _setup_dialog_buttons(self, parent_layout: QtWidgets.QLayout):
"""Setup the dialog buttons."""
button_layout = QtWidgets.QHBoxLayout()
# Cancel button
self.cancel_btn = QtWidgets.QPushButton("Cancel")
self.cancel_btn.clicked.connect(self.reject)
button_layout.addWidget(self.cancel_btn)
button_layout.addStretch()
# Upload button
self.upload_btn = QtWidgets.QPushButton("Upload to BEC Server")
self.upload_btn.setIcon(self._label_icons["upload"])
self.upload_btn.clicked.connect(self._handle_upload)
button_layout.addWidget(self.upload_btn)
parent_layout.addLayout(button_layout)
def _populate_device_data(self):
"""Populate the dialog with device configuration data."""
if not self.device_configs:
return
self.has_invalid_configs = 0
self.has_untested_connections = 0
self.has_cannot_connect = 0
for device_name, (config, config_status, connection_status) in self.device_configs.items():
# Add to appropriate sections
self.config_section.add_device(config, config_status, connection_status)
# Track statistics
if config_status == ConfigStatus.INVALID.value:
self.has_invalid_configs += 1
if connection_status == ConnectionStatus.UNKNOWN.value:
self.has_untested_connections += 1
if connection_status == ConnectionStatus.CANNOT_CONNECT.value:
self.has_cannot_connect += 1
# Update section summaries
num_devices = len(self.device_configs)
# Config validation summary
if self.has_invalid_configs > 0:
icon = self._label_icons["error"]
text = f"{self.has_invalid_configs} of {num_devices} device configurations are invalid."
else:
icon = self._label_icons["success"]
text = f"All {num_devices} device configurations are valid."
if self.has_untested_connections > 0:
icon = self._label_icons["warning"]
text += f"{self.has_untested_connections} device connections are not tested."
if self.has_cannot_connect > 0:
icon = self._label_icons["warning"]
text += f"{self.has_cannot_connect} device connections cannot be established."
self.config_section.update_summary(text, icon)
def _update_ui(self):
"""Update UI state based on validation results."""
# Update first the device data
self._populate_device_data()
# Invalid configuration have highest priority, upload disabled
if self.has_invalid_configs:
self.status_icon.setIcon(self._label_icons["error"])
self.status_label.setText(
"\n".join(
[
f"{self.has_invalid_configs} device configurations are invalid.",
"Please fix configuration errors before uploading.",
]
)
)
self.upload_btn.setEnabled(False)
self.validate_connections_btn.setEnabled(False)
self.validate_connections_btn.setText("Invalid Configurations")
# Next priority: connections that cannot be established, error but upload is enabled
elif self.has_cannot_connect:
self.status_icon.setIcon(self._label_icons["warning"])
self.status_label.setText(
"\n".join(
[
f"{self.has_cannot_connect} connections cannot be established.",
"Please fix connection issues before uploading.",
]
)
)
self.upload_btn.setEnabled(True)
self.validate_connections_btn.setEnabled(True)
self.validate_connections_btn.setText(
f"Validate {self.has_untested_connections + self.has_cannot_connect} Connections"
)
# Next priority: untested connections, warning but upload is enabled
elif self.has_untested_connections:
self.status_icon.setIcon(self._label_icons["warning"])
self.status_label.setText(
"\n".join(
[
f"{self.has_untested_connections} connections have not been tested.",
"Consider validating connections before uploading.",
]
)
)
self.upload_btn.setEnabled(True)
self.validate_connections_btn.setEnabled(True)
self.validate_connections_btn.setText(
f"Validate {self.has_untested_connections + self.has_cannot_connect} Connections"
)
# All good, upload enabled
else:
self.status_icon.setIcon(self._label_icons["success"])
self.status_label.setText(
"\n".join(
[
"All device configurations are valid.",
"All connections have been successfully tested.",
]
)
)
self.upload_btn.setEnabled(True)
self.validate_connections_btn.setEnabled(False)
self.validate_connections_btn.setText("All Connections Validated")
@SafeSlot()
def _validate_connections(self):
"""Request validation of all untested connections. This will close the dialog."""
testable_devices: List[dict] = []
for _, (config, _, connection_status) in self.device_configs.items():
if connection_status == ConnectionStatus.UNKNOWN.value:
testable_devices.append(config)
elif connection_status == ConnectionStatus.CANNOT_CONNECT.value:
testable_devices.append(config)
if len(testable_devices) > 0:
self.request_ophyd_validation.emit(testable_devices, True, True)
self.done(self.UploadAction.CONNECTION_TEST_REQUESTED)
@SafeSlot()
def _handle_upload(self):
"""Handle the upload button click with appropriate confirmations."""
# First priority: invalid configurations, block upload
if self.has_invalid_configs:
detailed_text = (
f"There is {self.has_invalid_configs} device with an invalid configuration."
if self.has_invalid_configs == 1
else f"There are {self.has_invalid_configs} devices with invalid configurations."
)
text = " ".join(
[detailed_text, "Invalid configuration can not be uploaded to the BEC Server."]
)
QtWidgets.QMessageBox.critical(self, "Device Configurations Invalid", text)
self.done(self.UploadAction.CANCEL)
return
# Next priority: connections that cannot be established, show warning, but allow to proceed
if self.has_cannot_connect:
detailed_text = (
f"There is {self.has_cannot_connect} device that cannot connect"
if self.has_cannot_connect == 1
else f"There are {self.has_cannot_connect} devices that cannot connect."
)
text = " ".join(
[
detailed_text,
"These devices may not be reachable and disabled BEC upon loading the config.",
"Consider validating these connections before proceeding.\n\n",
"Continue anyway?",
]
)
reply = QtWidgets.QMessageBox.critical(
self,
"Devices cannot Connect",
text,
QtWidgets.QMessageBox.Yes | QtWidgets.QMessageBox.No,
QtWidgets.QMessageBox.No,
)
if reply == QtWidgets.QMessageBox.No:
return
# If some connections are untested, warn the user
if self.has_untested_connections:
detailed_text = (
f"There is {self.has_untested_connections} device with untested connections."
if self.has_untested_connections == 1
else f"There are {self.has_untested_connections} devices with untested connections."
)
text = " ".join(
[
detailed_text,
"Uploading without validating connections may result in devices that cannot be reached when the configuration is applied.",
]
)
reply = QtWidgets.QMessageBox.question(
self,
"Untested Connections",
text,
QtWidgets.QMessageBox.Yes | QtWidgets.QMessageBox.No,
QtWidgets.QMessageBox.No,
)
if reply == QtWidgets.QMessageBox.No:
return
# Final confirmation
text = " ".join(
["You are about to upload the device configurations to BEC Server.", "Please confirm."]
)
reply = QtWidgets.QMessageBox.question(
self,
"Upload to BEC Server",
text,
QtWidgets.QMessageBox.Yes | QtWidgets.QMessageBox.No,
QtWidgets.QMessageBox.Yes,
)
if reply == QtWidgets.QMessageBox.Yes:
self.done(self.UploadAction.OK)
else:
self.done(self.UploadAction.CANCEL)
@SafeSlot(dict, int, int, str)
def _update_from_ophyd_device_tests(
self,
device_config: dict,
config_status: int,
connection_status: int,
validation_message: str = "",
):
"""
Update device status from ophyd device tests. This has to be with a connection_status that was updated.
"""
if connection_status == ConnectionStatus.UNKNOWN.value:
return
self.update_device_status(device_config, config_status, connection_status)
@SafeSlot(list)
def _multiple_updates_from_ophyd_device_tests(self, validation_results: _ValidationResultIter):
"""
Callback slot for receiving multiple validation result updates from the ophyd test widget.
Args:
validation_results (list): List of tuples containing (device_config, config_status, connection_status, validation_msg).
"""
for cfg, cfg_status, conn_status, val_msg in validation_results:
self.update_device_status(cfg, cfg_status, conn_status)
self._update_ui()
@SafeSlot(dict, int, int)
def update_device_status(self, device_config: dict, config_status: int, connection_status: int):
"""Update the status of a specific device."""
# Update device config status
self._update_device_configs(device_config, config_status, connection_status, "")
# Recalculate summaries and UI state
self._update_ui()
def _update_device_configs(
self,
device_config: dict[str, Any],
config_status: int,
connection_status: int,
validation_msg: str,
):
device_name = device_config.get("name", "")
old_config, _, _ = self.device_configs.get(device_name, (None, None, None))
if old_config is not None:
self.device_configs[device_name] = (device_config, config_status, connection_status)
else:
# If device not found, add it
self.config_section.add_device(device_config, config_status, connection_status)
def main(): # pragma: no cover
"""Test the upload redis dialog."""
import sys
from qtpy.QtWidgets import QApplication
app = QApplication(sys.argv)
# Sample device configurations for testing
sample_configs = [
(
{"name": "motor_x", "deviceClass": "EpicsMotor"},
ConfigStatus.VALID.value,
ConnectionStatus.CONNECTED.value,
),
(
{"name": "detector_1", "deviceClass": "EpicsSignal"},
ConfigStatus.VALID.value,
ConnectionStatus.CONNECTED.value,
),
(
{"name": "detector_2", "deviceClass": "EpicsSignal"},
ConfigStatus.VALID.value,
ConnectionStatus.UNKNOWN.value,
),
(
{"name": "motor_y", "deviceClass": "EpicsMotor"},
ConfigStatus.VALID.value,
ConnectionStatus.CONNECTED.value,
),
(
{"name": "motor_z", "deviceClass": "EpicsMotor"},
ConfigStatus.VALID.value,
ConnectionStatus.CONNECTED.value,
),
(
{"name": "motor_x1", "deviceClass": "EpicsMotor"},
ConfigStatus.VALID.value,
ConnectionStatus.CONNECTED.value,
),
(
{"name": "detector_11", "deviceClass": "EpicsSignal"},
ConfigStatus.VALID.value,
ConnectionStatus.CANNOT_CONNECT.value,
),
(
{"name": "detector_21", "deviceClass": "EpicsSignal"},
ConfigStatus.INVALID.value,
ConnectionStatus.UNKNOWN.value,
),
(
{"name": "motor_y1", "deviceClass": "EpicsMotor"},
ConfigStatus.VALID.value,
ConnectionStatus.CANNOT_CONNECT.value,
),
(
{"name": "motor_z1", "deviceClass": "EpicsMotor"},
ConfigStatus.VALID.value,
ConnectionStatus.CONNECTED.value,
),
]
configs = {cfg[0]["name"]: cfg for cfg in sample_configs}
apply_theme("dark")
dialog = UploadRedisDialog(parent=None, device_configs=configs)
dialog.show()
sys.exit(app.exec_())
if __name__ == "__main__": # pragma: no cover
main()
@@ -1,864 +0,0 @@
from __future__ import annotations
import os
from functools import partial
from typing import TYPE_CHECKING, List, Literal, get_args
import yaml
from bec_lib import config_helper
from bec_lib.bec_yaml_loader import yaml_load
from bec_lib.callback_handler import EventType
from bec_lib.endpoints import MessageEndpoints
from bec_lib.file_utils import DeviceConfigWriter
from bec_lib.logger import bec_logger
from bec_lib.messages import ConfigAction, ScanStatusMessage
from bec_lib.plugin_helper import plugin_package_name, plugin_repo_path
from bec_qthemes import apply_theme, material_icon
from qtpy.QtCore import QMetaObject, Qt, QThreadPool, Signal
from qtpy.QtGui import QColor
from qtpy.QtWidgets import (
QApplication,
QFileDialog,
QMessageBox,
QPushButton,
QTextEdit,
QVBoxLayout,
QWidget,
)
from bec_widgets.applications.views.device_manager_view.device_manager_dialogs import (
ConfigChoiceDialog,
DeviceFormDialog,
)
from bec_widgets.applications.views.device_manager_view.device_manager_dialogs.upload_redis_dialog import (
UploadRedisDialog,
)
from bec_widgets.utils.colors import get_accent_colors
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.toolbars.actions import MaterialIconAction
from bec_widgets.utils.toolbars.bundles import ToolbarBundle
from bec_widgets.utils.toolbars.toolbar import ModularToolBar
from bec_widgets.widgets.containers.dock_area.basic_dock_area import DockAreaWidget
from bec_widgets.widgets.control.device_manager.components import (
DeviceTable,
DMConfigView,
DocstringView,
OphydValidation,
)
from bec_widgets.widgets.control.device_manager.components._util import SharedSelectionSignal
from bec_widgets.widgets.control.device_manager.components.ophyd_validation.ophyd_validation_utils import (
ConfigStatus,
ConnectionStatus,
)
from bec_widgets.widgets.progress.device_initialization_progress_bar.device_initialization_progress_bar import (
DeviceInitializationProgressBar,
)
from bec_widgets.widgets.services.device_browser.device_item.config_communicator import (
CommunicateConfigAction,
)
from bec_widgets.widgets.utility.spinner.spinner import SpinnerWidget
if TYPE_CHECKING: # pragma: no cover
from bec_lib.client import BECClient
logger = bec_logger.logger
_yes_no_question = partial(
QMessageBox.question,
buttons=QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No,
defaultButton=QMessageBox.StandardButton.No,
)
class CustomBusyWidget(QWidget):
"""Custom busy widget to show during device config upload."""
cancel_requested = Signal()
def __init__(self, parent=None, client: BECClient | None = None):
super().__init__(parent=parent)
# Widgets
self.progress = QWidget(parent=self)
self.progress_layout = QVBoxLayout(self.progress)
self.progress_layout.setContentsMargins(6, 6, 6, 6)
self.progress_inner = DeviceInitializationProgressBar(parent=self.progress, client=client)
self.progress_layout.addWidget(self.progress_inner)
self.progress.setMinimumWidth(320)
# Spinner
self.spinner = SpinnerWidget(parent=self)
scale = self._ui_scale()
spinner_size = int(scale * 0.12) if scale else 1
spinner_size = max(32, min(spinner_size, 96))
self.spinner.setFixedSize(spinner_size, spinner_size)
# Cancel button
self.cancel_button = QPushButton("Cancel Upload", parent=self)
self.cancel_button.setIcon(material_icon("cancel"))
self.cancel_button.clicked.connect(self.cancel_requested.emit)
button_height = int(spinner_size * 0.9)
button_height = max(36, min(button_height, 72))
aspect_ratio = 3.8 # width / height, visually stable for text buttons
button_width = int(button_height * aspect_ratio)
self.cancel_button.setFixedSize(button_width, button_height)
color = get_accent_colors()
self.cancel_button.setStyleSheet(f"""
QPushButton {{
background-color: {color.emergency.name()};
color: white;
font-weight: 600;
border-radius: 6px;
}}
""")
# Layout
content_layout = QVBoxLayout(self)
content_layout.setContentsMargins(24, 24, 24, 24)
content_layout.setSpacing(16)
content_layout.addStretch()
content_layout.addWidget(self.spinner, 0, Qt.AlignmentFlag.AlignHCenter)
content_layout.addWidget(self.progress, 0, Qt.AlignmentFlag.AlignHCenter)
content_layout.addStretch()
content_layout.addWidget(self.cancel_button, 0, Qt.AlignmentFlag.AlignHCenter)
if hasattr(color, "_colors"):
bg_color = color._colors.get("BG", None)
if bg_color is None: # Fallback if missing
bg_color = QColor(50, 50, 50, 255)
self.setStyleSheet(f"""
background-color: {bg_color.name()};
border-radius: 12px;
""")
def _ui_scale(self) -> int:
parent = self.parent()
if not parent:
return 0
return min(parent.width(), parent.height())
def showEvent(self, event):
"""Show event to start the spinner."""
super().showEvent(event)
self.spinner.start()
def hideEvent(self, event):
"""Hide event to stop the spinner."""
super().hideEvent(event)
self.spinner.stop()
class DeviceManagerDisplayWidget(DockAreaWidget):
"""Device Manager main display widget. This contains all sub-widgets and the toolbar."""
RPC = False
request_ophyd_validation = Signal(list, bool, bool)
def __init__(self, parent=None, *args, **kwargs):
super().__init__(parent=parent, variant="compact", *args, **kwargs)
# State variable for config upload
self._config_upload_active: bool = False
self._config_in_sync: bool = False
scan_status = self.bec_dispatcher.client.connector.get(MessageEndpoints.scan_status())
initial_status = scan_status.status if scan_status is not None else "closed"
self._scan_is_running: bool = initial_status in ["open", "paused"]
# Push to Redis dialog
self._upload_redis_dialog: UploadRedisDialog | None = None
self._dialog_validation_connection: QMetaObject.Connection | None = None
# NOTE: We need here a separate config helper instance to avoid conflicts with
# other communications to REDIS as uploading a config through a CommunicationConfigAction
# will block if we use the config_helper from self.client.config._config_helper
self._config_helper = config_helper.ConfigHelper(self.client.connector)
self._shared_selection = SharedSelectionSignal()
# Device Table View widget
self.device_table_view = DeviceTable(self)
# Device Config View widget
self.dm_config_view = DMConfigView(self)
# Docstring View
self.dm_docs_view = DocstringView(self)
# Ophyd Test view
self.ophyd_widget_view = QWidget(self)
layout = QVBoxLayout(self.ophyd_widget_view)
layout.setContentsMargins(0, 0, 0, 0)
layout.setSpacing(4)
self.ophyd_test_view = OphydValidation(self, hide_legend=False)
layout.addWidget(self.ophyd_test_view)
# Validation Results view
self.validation_results = QTextEdit(self)
self.validation_results.setReadOnly(True)
self.validation_results.setPlaceholderText("Validation results will appear here...")
layout.addWidget(self.validation_results)
self.ophyd_test_view.item_clicked.connect(self._ophyd_test_item_clicked_cb)
for signal, slots in [
(
self.device_table_view.selected_devices,
(self.dm_config_view.on_select_config, self.dm_docs_view.on_select_config),
),
(
self.ophyd_test_view.validation_completed,
(self.device_table_view.update_device_validation,),
),
(
self.ophyd_test_view.multiple_validations_completed,
(self.device_table_view.update_multiple_device_validations,),
),
(self.request_ophyd_validation, (self.ophyd_test_view.change_device_configs,)),
(
self.device_table_view.device_configs_changed,
(self.ophyd_test_view.device_table_config_changed,),
),
(
self.device_table_view.device_config_in_sync_with_redis,
(self._update_config_in_sync,),
),
(self.device_table_view.device_row_dbl_clicked, (self._edit_device_action,)),
]:
for slot in slots:
signal.connect(slot)
self._scan_status_callback_id = self.bec_dispatcher.client.callbacks.register(
EventType.SCAN_STATUS, self._update_scan_running
)
# Add toolbar
self._add_toolbar()
# Build dock layout using shared helpers
self._build_docks()
def cleanup(self):
self.bec_dispatcher.client.callbacks.remove(self._scan_status_callback_id)
super().cleanup()
def closeEvent(self, event):
"""If config upload is active when application is exiting, cancel it."""
logger.info("Application is quitting, checking for active config upload...")
if self._config_upload_active:
logger.info("Application is quitting, cancelling active config upload...")
self._config_helper.send_config_request(
action="cancel", config=None, wait_for_response=True, timeout_s=10
)
logger.info("Config upload cancelled.")
super().closeEvent(event)
##############################
### Custom set busy widget ###
##############################
def create_busy_state_widget(self) -> QWidget:
"""Create a custom busy state widget for uploading device configurations."""
widget = CustomBusyWidget(parent=self, client=self.client)
widget.cancel_requested.connect(self._cancel_device_config_upload)
return widget
def _set_busy_wrapper(self, enabled: bool):
"""Thin wrapper around set_busy to flip the state variable."""
self._busy_overlay.set_opacity(0.92)
self._config_upload_active = enabled
self.set_busy(enabled=enabled)
##############################
### Toolbar and Dock setup ###
##############################
def _add_toolbar(self):
self.toolbar = ModularToolBar(self)
# Add IO actions
self._add_io_actions()
self._add_table_actions()
self.toolbar.show_bundles(["IO", "Table"])
self._root_layout.insertWidget(0, self.toolbar)
def _build_docks(self) -> None:
# Central device table
self.device_table_view_dock = self.new(
self.device_table_view,
return_dock=True,
closable=False,
floatable=False,
movable=False,
show_title_bar=False,
)
# Bottom area: docstrings
self.dm_docs_view_dock = self.new(
self.dm_docs_view,
where="bottom",
relative_to=self.device_table_view_dock,
return_dock=True,
closable=False,
floatable=False,
movable=False,
show_title_bar=False,
)
# Config view left of docstrings
self.dm_config_view_dock = self.new(
self.dm_config_view,
where="left",
relative_to=self.dm_docs_view_dock,
return_dock=True,
closable=False,
floatable=False,
movable=False,
show_title_bar=False,
)
# Right area: ophyd test + validation
self.ophyd_test_dock_view = self.new(
self.ophyd_widget_view,
where="right",
relative_to=self.device_table_view_dock,
return_dock=True,
closable=False,
floatable=False,
movable=False,
show_title_bar=False,
)
self.set_layout_ratios(splitter_overrides={0: [7, 3], 1: [3, 7]})
def _add_io_actions(self):
# Create IO bundle
io_bundle = ToolbarBundle("IO", self.toolbar.components)
# Load from disk
load = MaterialIconAction(
text_position="under",
icon_name="file_open",
parent=self,
tooltip="Load configuration file from disk",
label_text="Load Config",
)
self.toolbar.components.add_safe("load", load)
load.action.triggered.connect(self._load_file_action)
io_bundle.add_action("load")
# Add safe to disk
save_to_disk = MaterialIconAction(
text_position="under",
icon_name="file_save",
parent=self,
tooltip="Save config to disk",
label_text="Save Config",
)
self.toolbar.components.add_safe("save_to_disk", save_to_disk)
save_to_disk.action.triggered.connect(self._save_to_disk_action)
io_bundle.add_action("save_to_disk")
# Add flush config in redis
flush_redis = MaterialIconAction(
text_position="under",
icon_name="delete_sweep",
parent=self,
tooltip="Flush current config in BEC Server",
label_text="Flush loaded Config",
)
flush_redis.action.triggered.connect(self._flush_redis_action)
self.toolbar.components.add_safe("flush_redis", flush_redis)
io_bundle.add_action("flush_redis")
# Add load config from redis
load_redis = MaterialIconAction(
text_position="under",
icon_name="cached",
parent=self,
tooltip="Load current config from BEC Server",
label_text="Get loaded Config",
)
load_redis.action.triggered.connect(self._load_redis_action)
self.toolbar.components.add_safe("load_redis", load_redis)
io_bundle.add_action("load_redis")
# Update config action
update_config_redis = MaterialIconAction(
text_position="under",
icon_name="cloud_upload",
parent=self,
tooltip="Update current config in BEC Server",
label_text="Update Config",
)
update_config_redis.action.setEnabled(False)
update_config_redis.action.triggered.connect(self._update_redis_action)
self.toolbar.components.add_safe("update_config_redis", update_config_redis)
io_bundle.add_action("update_config_redis")
# Add load config from plugin dir
self.toolbar.add_bundle(io_bundle)
# Table actions
def _add_table_actions(self) -> None:
table_bundle = ToolbarBundle("Table", self.toolbar.components)
# Reset composed view
reset_composed = MaterialIconAction(
text_position="under",
icon_name="delete_sweep",
parent=self,
tooltip="Reset current composed config view",
label_text="Reset Config View",
)
reset_composed.action.triggered.connect(self._reset_composed_view)
self.toolbar.components.add_safe("reset_composed", reset_composed)
table_bundle.add_action("reset_composed")
# Add device
add_device = MaterialIconAction(
text_position="under",
icon_name="add",
parent=self,
tooltip="Add new device",
label_text="Add Device",
)
add_device.action.triggered.connect(self._add_device_action)
self.toolbar.components.add_safe("add_device", add_device)
table_bundle.add_action("add_device")
# Remove device
remove_device = MaterialIconAction(
text_position="under",
icon_name="remove",
parent=self,
tooltip="Remove device",
label_text="Remove Device",
)
remove_device.action.triggered.connect(self._remove_device_action)
self.toolbar.components.add_safe("remove_device", remove_device)
table_bundle.add_action("remove_device")
# Rerun validation
rerun_validation = MaterialIconAction(
text_position="under",
icon_name="checklist",
parent=self,
tooltip="Run device validation with 'connect' on selected devices",
label_text="Validate Connection",
)
rerun_validation.action.triggered.connect(self._run_validate_connection)
self.toolbar.components.add_safe("rerun_validation", rerun_validation)
table_bundle.add_action("rerun_validation")
# Add load config from plugin dir
self.toolbar.add_bundle(table_bundle)
######################################
### Update button state management ###
######################################
@SafeSlot(dict, dict)
def _update_scan_running(self, scan_info: dict, _: dict):
"""disable editing when scans are running and enable editing when they are finished"""
msg = ScanStatusMessage.model_validate(scan_info)
self._scan_is_running = msg.status in ["open", "paused"]
self._update_config_enabled_button()
def _update_config_in_sync(self, in_sync: bool):
self._config_in_sync = in_sync
self._update_config_enabled_button()
def _update_config_enabled_button(self):
action = self.toolbar.components.get_action("update_config_redis")
enabled = not self._config_in_sync and not self._scan_is_running
action.action.setEnabled(enabled)
if enabled: # button is enabled
action.action.setToolTip("Push current config to BEC Server")
elif self._scan_is_running:
action.action.setToolTip("Scan is currently running, config updates disabled.")
else:
action.action.setToolTip("Current config is in sync with BEC Server, updates disabled.")
#######################
### Action Handlers ###
#######################
@SafeSlot()
@SafeSlot(bool)
def _run_validate_connection(self, connect: bool = True):
"""Action for the 'rerun_validation' action to rerun validation on selected devices."""
configs = list(self.device_table_view.get_selected_device_configs())
if not configs:
configs = self.device_table_view.get_device_config()
# Adjust the state of the icons in the device table view
self.device_table_view.update_multiple_device_validations(
[
(cfg, ConfigStatus.UNKNOWN.value, ConnectionStatus.UNKNOWN.value, "")
for cfg in configs
]
)
self.request_ophyd_validation.emit(configs, True, connect)
@SafeSlot()
def _load_file_action(self):
"""Action for the 'load' action to load a config from disk for the io_bundle of the toolbar."""
config_path = self._get_config_base_path()
# Implement the file loading logic here
start_dir = os.path.abspath(config_path)
file_path = self._get_file_path(start_dir, "open_file")
if file_path:
self._load_config_from_file(file_path)
def _get_config_base_path(self) -> str:
"""Get the base path for device configurations."""
try:
plugin_path = plugin_repo_path()
plugin_name = plugin_package_name()
config_path = os.path.join(plugin_path, plugin_name, "device_configs")
except ValueError:
# Get the recovery config path as fallback
config_path = self._get_recovery_config_path()
logger.warning(
f"No plugin repository installed, fallback to recovery config path: {config_path}"
)
return config_path
def _get_file_path(self, start_dir: str, mode: Literal["open_file", "save_file"]) -> str:
ALLOWED_EXTS = [".yaml", ".yml"]
filter_str = "YAML files (*.yaml *.yml);;All Files (*)"
initial_filter = "YAML files (*.yaml *.yml);;"
if mode == "open_file":
file_path, _ = QFileDialog.getOpenFileName(
self,
caption="Select Config File",
dir=start_dir,
filter=filter_str,
selectedFilter=initial_filter,
)
else:
file_path, _ = QFileDialog.getSaveFileName(
self,
caption="Save Config File",
dir=start_dir,
filter=filter_str,
selectedFilter=initial_filter,
)
if not file_path:
return ""
_, ext = os.path.splitext(file_path)
if ext.lower() not in ALLOWED_EXTS:
file_path += ".yaml"
return file_path
def _load_config_from_file(self, file_path: str):
"""
Load device config from a given file path and update the device table view.
Args:
file_path (str): Path to the configuration file.
"""
try:
config = [{"name": k, **v} for k, v in yaml_load(file_path).items()]
except Exception as e:
logger.error(f"Failed to load config from file {file_path}. Error: {e}")
return
self._open_config_choice_dialog(config)
def _open_config_choice_dialog(self, config: List[dict]):
"""
Open a dialog to choose whether to replace or add the loaded config.
Args:
config (List[dict]): List of device configurations loaded from the file.
"""
if len(self.device_table_view.get_device_config()) == 0:
# If no config is composed yet, load directly
self.device_table_view.set_device_config(config)
return
dialog = ConfigChoiceDialog(self)
result = dialog.exec()
if result == ConfigChoiceDialog.Result.REPLACE:
self.device_table_view.set_device_config(config)
elif result == ConfigChoiceDialog.Result.ADD:
self.device_table_view.add_device_configs(config)
@SafeSlot()
def _flush_redis_action(self):
"""Action to flush the current config in Redis."""
if self.client.device_manager is None:
logger.error("No device manager connected, cannot load config from BEC Server.")
return
if len(self.client.device_manager.devices) == 0:
logger.info("No devices in BEC Server, nothing to flush.")
QMessageBox.information(
self, "No Devices", "There is currently no config loaded on the BEC Server."
)
return
reply = _yes_no_question(
self,
"Flush BEC Server Config",
"Do you really want to flush the current config in BEC Server?",
)
if reply == QMessageBox.StandardButton.Yes:
self.client.config.reset_config()
logger.info("Successfully flushed configuration in BEC Server.")
# Check if config is in sync, enable load redis button
self.device_table_view.device_config_in_sync_with_redis.emit(
self.device_table_view._is_config_in_sync_with_redis()
)
validation_results = self.device_table_view.get_validation_results()
for config, config_status, connection_status in validation_results.values():
if connection_status == ConnectionStatus.CONNECTED.value:
self.device_table_view.update_device_validation(
config, config_status, ConnectionStatus.CAN_CONNECT, ""
)
@SafeSlot()
def _load_redis_action(self):
"""Action for the 'load_redis' action to load the current config from Redis for the io_bundle of the toolbar."""
if self.client.device_manager is None:
logger.error("No device manager connected, cannot load config from BEC Server.")
return
if not self.device_table_view.get_device_config():
# If no config is composed yet, load directly
self.device_table_view.set_device_config(
self.client.device_manager._get_redis_device_config()
)
return
reply = _yes_no_question(
self,
"Load currently active config in BEC Server",
"Do you really want to discard the current config and reload?",
)
if reply == QMessageBox.StandardButton.Yes:
self.device_table_view.set_device_config(
self.client.device_manager._get_redis_device_config()
)
@SafeSlot()
def _update_redis_action(self) -> None | QMessageBox.StandardButton:
"""Action to push the current composition to Redis using the upload dialog."""
# Check if validations are still running
if self.ophyd_test_view.running_ophyd_tests is True:
return QMessageBox.warning(
self, "Validation in Progress", "Please wait for the validation to finish."
)
# Get all device configurations with their validation status
validation_results = self.device_table_view.get_validation_results()
# Create and show upload dialog
self._upload_redis_dialog = UploadRedisDialog(
parent=self, device_configs=validation_results
)
self._upload_redis_dialog.request_ophyd_validation.connect(
self.request_ophyd_validation.emit
)
# Show dialog
reply = self._upload_redis_dialog.exec_()
if reply == UploadRedisDialog.UploadAction.OK:
self._push_composition_to_redis(action="set")
elif reply == UploadRedisDialog.UploadAction.CANCEL:
self.ophyd_test_view.cancel_all_validations()
elif reply == UploadRedisDialog.UploadAction.CONNECTION_TEST_REQUESTED:
return QMessageBox.information(
self, "Connection Test Requested", "Running connection test on untested devices."
)
def _push_composition_to_redis(self, action: ConfigAction):
"""Push the current device composition to Redis."""
if action not in get_args(ConfigAction):
logger.error(f"Invalid config action: {action} for uploading to BEC Server.")
return
config = {cfg.pop("name"): cfg for cfg in self.device_table_view.get_device_config()}
threadpool = QThreadPool.globalInstance()
comm = CommunicateConfigAction(self._config_helper, None, config, action)
comm.signals.done.connect(self._handle_push_complete_to_communicator)
comm.signals.error.connect(self._handle_exception_from_communicator)
threadpool.start(comm)
self._set_busy_wrapper(enabled=True)
def _cancel_device_config_upload(self):
"""Cancel the device configuration upload process."""
threadpool = QThreadPool.globalInstance()
comm = CommunicateConfigAction(self._config_helper, None, {}, "cancel")
# Cancelling will raise an exception in the communicator, so we connect to the failure handler
comm.signals.error.connect(self._handle_cancel_config_upload_failed)
threadpool.start(comm)
def _handle_cancel_config_upload_failed(self, exception: Exception):
"""Handle failure to cancel the config upload."""
self._set_busy_wrapper(enabled=False)
validation_results = self.device_table_view.get_validation_results()
devices_to_update = []
for config, config_status, connection_status in validation_results.values():
devices_to_update.append(
(config, config_status, ConnectionStatus.UNKNOWN.value, "Upload Cancelled")
)
# Rerun validation of all devices after cancellation
self.device_table_view.update_multiple_device_validations(devices_to_update)
self.ophyd_test_view.change_device_configs(
[cfg for cfg, _, _, _ in devices_to_update], added=True, skip_validation=False
)
# Config is in sync with BEC, so we update the state
self.device_table_view.device_config_in_sync_with_redis.emit(False)
def _handle_push_complete_to_communicator(self):
"""Handle completion of the config push to Redis."""
self._set_busy_wrapper(enabled=False)
def _handle_exception_from_communicator(self, exception: Exception):
"""Handle exceptions from the config communicator."""
QMessageBox.critical(
self,
"Error Uploading Config",
f"An error occurred while uploading the configuration to BEC Server:\n{str(exception)}",
)
self._set_busy_wrapper(enabled=False)
@SafeSlot()
def _save_to_disk_action(self):
"""Action for the 'save_to_disk' action to save the current config to disk."""
# Check if plugin repo is installed...
try:
config_path = self._get_recovery_config_path()
except ValueError:
# Get the recovery config path as fallback
config_path = os.path.abspath(os.path.expanduser("~"))
logger.warning(f"Failed to find recovery config path, fallback to: {config_path}")
# Implement the file loading logic here
file_path = self._get_file_path(config_path, "save_file")
if file_path:
config = {cfg.pop("name"): cfg for cfg in self.device_table_view.get_device_config()}
if os.path.exists(file_path):
reply = _yes_no_question(
self,
"Overwrite File",
f"The file '{file_path}' already exists. Do you want to overwrite it?",
)
if reply != QMessageBox.StandardButton.Yes:
return
with open(file_path, "w") as file:
file.write(yaml.dump(config))
# Table actions
@SafeSlot()
def _reset_composed_view(self):
"""Action for the 'reset_composed_view' action to reset the composed view."""
reply = _yes_no_question(
self,
"Clear View",
"You are about to clear the current composed config view, please confirm...",
)
if reply == QMessageBox.StandardButton.Yes:
self.device_table_view.clear_device_configs()
@SafeSlot(dict)
def _edit_device_action(self, device_config: dict):
"""Action to edit a selected device configuration."""
dialog = DeviceFormDialog(parent=self, add_btn_text="Apply Changes")
dialog.accepted_data.connect(self._update_device_to_table_from_dialog)
dialog.set_device_config(device_config)
dialog.open()
@SafeSlot()
def _add_device_action(self):
"""Action for the 'add_device' action to add a new device."""
dialog = DeviceFormDialog(parent=self, add_btn_text="Add Device")
dialog.accepted_data.connect(self._add_to_table_from_dialog)
dialog.open()
@SafeSlot(dict, int, int, str, str)
def _update_device_to_table_from_dialog(
self,
data: dict,
config_status: int,
connection_status: int,
msg: str,
old_device_name: str = "",
):
if old_device_name and old_device_name != data.get("name", ""):
self.device_table_view.remove_device(old_device_name)
self._add_to_table_from_dialog(data, config_status, connection_status, msg, old_device_name)
@SafeSlot(dict, int, int, str, str)
def _add_to_table_from_dialog(
self,
data: dict,
config_status: int,
connection_status: int,
msg: str,
old_device_name: str = "",
):
if connection_status == ConnectionStatus.UNKNOWN.value:
self.device_table_view.update_device_configs([data], skip_validation=False)
else: # Connection status was tested in dialog
# If device is connected, we remove it from the ophyd validation view
self.device_table_view.update_device_configs([data], skip_validation=True)
# Update validation status in device table view and ophyd validation view
self.ophyd_test_view._on_device_test_completed(
data, config_status, connection_status, msg
)
@SafeSlot()
def _remove_device_action(self):
"""Action for the 'remove_device' action to remove a device."""
configs = self.device_table_view.get_selected_device_configs()
if not configs:
QMessageBox.warning(
self, "No devices selected", "Please select devices from the table to remove."
)
return
if self.device_table_view._remove_configs_dialog([cfg["name"] for cfg in configs]):
self.device_table_view.remove_device_configs(configs)
@SafeSlot(dict, int, int, str, str)
def _ophyd_test_item_clicked_cb(
self, device_config: dict, config_status: int, connection_status: int, msg: str, md_msg: str
) -> None:
self.validation_results.setMarkdown(md_msg)
def _get_recovery_config_path(self) -> str:
"""Get the recovery config path from the log_writer config."""
# pylint: disable=protected-access
log_writer_config = self.client._service_config.config.get("log_writer", {})
writer = DeviceConfigWriter(service_config=log_writer_config)
return os.path.abspath(os.path.expanduser(writer.get_recovery_directory()))
if __name__ == "__main__": # pragma: no cover
import sys
from qtpy.QtWidgets import QApplication
from bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button import DarkModeButton
app = QApplication(sys.argv)
w = QWidget()
l = QVBoxLayout()
w.setLayout(l)
apply_theme("dark")
button = DarkModeButton()
l.addWidget(button)
device_manager_view = DeviceManagerDisplayWidget()
l.addWidget(device_manager_view)
w.show()
w.setWindowTitle("Device Manager View")
screen = app.primaryScreen()
screen_geometry = screen.availableGeometry()
screen_width = screen_geometry.width()
screen_height = screen_geometry.height()
# 70% of screen height, keep 16:9 ratio
height = int(screen_height * 0.9)
width = int(height * (16 / 9))
# If width exceeds screen width, scale down
if width > screen_width * 0.9:
width = int(screen_width * 0.9)
height = int(width / (16 / 9))
w.resize(width, height)
sys.exit(app.exec_())
@@ -1,188 +0,0 @@
"""Module for Device Manager View."""
from qtpy.QtCore import QRect
from qtpy.QtWidgets import QWidget
from bec_widgets.applications.views.device_manager_view.device_manager_widget import (
DeviceManagerWidget,
)
from bec_widgets.applications.views.view import ViewBase, ViewTourSteps
from bec_widgets.utils.error_popups import SafeSlot
class DeviceManagerView(ViewBase):
"""
A view for users to manage devices within the application.
"""
def __init__(
self,
parent: QWidget | None = None,
content: QWidget | None = None,
*,
view_id: str | None = None,
title: str | None = None,
**kwargs,
):
super().__init__(
parent=parent,
content=content,
view_id=view_id,
title=title,
rpc_passthrough_children=False,
**kwargs,
)
self.device_manager_widget = DeviceManagerWidget(
parent=self, rpc_exposed=False, rpc_passthrough_children=False
)
self.set_content(self.device_manager_widget)
@SafeSlot()
def on_enter(self) -> None:
"""Called after the view becomes current/visible.
Default implementation does nothing. Override in subclasses.
"""
self.device_manager_widget.on_enter()
def register_tour_steps(self, guided_tour, main_app) -> ViewTourSteps | None:
"""Register Device Manager components with the guided tour.
Args:
guided_tour: The GuidedTour instance to register with.
main_app: The main application instance (for accessing set_current).
Returns:
ViewTourSteps | None: Model containing view title and step IDs.
"""
step_ids = []
dm_widget = self.device_manager_widget
# The device_manager_widget is not yet initialized, so we will register
# tour steps for its uninitialized state.
# Register Load Current Config button
def get_load_current():
main_app.set_current("device_manager")
if dm_widget._initialized is True:
return (None, None)
return (dm_widget.button_load_current_config, None)
step_id = guided_tour.register_widget(
widget=get_load_current,
title="Load Current Config",
text="Load the current device configuration from the BEC server.",
)
step_ids.append(step_id)
# Register Load Config From File button
def get_load_file():
main_app.set_current("device_manager")
if dm_widget._initialized is True:
return (None, None)
return (dm_widget.button_load_config_from_file, None)
step_id = guided_tour.register_widget(
widget=get_load_file,
title="Load Config From File",
text="Load a device configuration from a YAML file on disk.",
)
step_ids.append(step_id)
## Register steps for the initialized state
# Register main device table
def get_device_table():
main_app.set_current("device_manager")
if dm_widget._initialized is False:
return (None, None)
return (dm_widget.device_manager_display.device_table_view, None)
step_id = guided_tour.register_widget(
widget=get_device_table,
title="Device Table",
text="This table displays the config that is prepared to be uploaded to the BEC server. It allows users to review and modify device config settings, and also validate them before uploading to the BEC server.",
)
step_ids.append(step_id)
col_text_mapping = {
0: "Shows if a device configuration is valid. Automatically validated when adding a new device.",
1: "Shows if a device is connectable. Validated on demand.",
2: "Device name, unique across all devices within a config.",
3: "Device class used to initialize the device on the BEC server.",
4: "Defines how BEC treats readings of the device during scans. The options are 'monitored', 'baseline', 'async', 'continuous' or 'on_demand'.",
5: "Defines how BEC reacts if a device readback fails. Options are 'raise', 'retry', or 'buffer'.",
6: "User-defined tags associated with the device.",
7: "A brief description of the device.",
8: "Device is enabled when the configuration is loaded.",
9: "Device is set to read-only.",
10: "This flag allows to configure if the 'trigger' method of the device is called during scans.",
}
# We have at least one device registered
def get_device_table_row(column: int):
main_app.set_current("device_manager")
if dm_widget._initialized is False:
return (None, None)
table = dm_widget.device_manager_display.device_table_view.table
header = table.horizontalHeader()
x = header.sectionViewportPosition(column)
table.horizontalScrollBar().setValue(x)
# Recompute after scrolling
x = header.sectionViewportPosition(column)
w = header.sectionSize(column)
h = header.height()
rect = QRect(x, 0, w, h)
top_left = header.viewport().mapTo(main_app, rect.topLeft())
return (QRect(top_left, rect.size()), col_text_mapping.get(column, ""))
for col, text in col_text_mapping.items():
step_id = guided_tour.register_widget(
widget=lambda col=col: get_device_table_row(col),
title=f"{dm_widget.device_manager_display.device_table_view.table.horizontalHeaderItem(col).text()}",
text=text,
)
step_ids.append(step_id)
if not step_ids:
return None
return ViewTourSteps(view_title="Device Manager", step_ids=step_ids)
if __name__ == "__main__": # pragma: no cover
import sys
from bec_qthemes import apply_theme
from qtpy.QtWidgets import QApplication
from bec_widgets.applications.main_app import BECMainApp
app = QApplication(sys.argv)
apply_theme("dark")
_app = BECMainApp()
screen = app.primaryScreen()
screen_geometry = screen.availableGeometry()
screen_width = screen_geometry.width()
screen_height = screen_geometry.height()
# 70% of screen height, keep 16:9 ratio
height = int(screen_height * 0.9)
width = int(height * (16 / 9))
# If width exceeds screen width, scale down
if width > screen_width * 0.9:
width = int(screen_width * 0.9)
height = int(width / (16 / 9))
_app.resize(width, height)
device_manager_view = DeviceManagerView()
_app.add_view(
icon="display_settings",
title="Device Manager",
view_id="device_manager",
widget=device_manager_view.device_manager_widget,
mini_text="DM",
)
_app.show()
sys.exit(app.exec_())
@@ -1,112 +0,0 @@
"""Top Level wrapper for device_manager widget"""
from __future__ import annotations
import os
from bec_lib.bec_yaml_loader import yaml_load
from bec_lib.logger import bec_logger
from bec_qthemes import material_icon
from qtpy import QtCore, QtWidgets
from bec_widgets.applications.views.device_manager_view.device_manager_display_widget import (
DeviceManagerDisplayWidget,
)
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.error_popups import SafeSlot
logger = bec_logger.logger
class DeviceManagerWidget(BECWidget, QtWidgets.QWidget):
RPC = False
def __init__(self, parent=None, client=None, **kwargs):
super().__init__(parent=parent, client=client, **kwargs)
self.stacked_layout = QtWidgets.QStackedLayout()
self.stacked_layout.setContentsMargins(0, 0, 0, 0)
self.stacked_layout.setSpacing(0)
self.stacked_layout.setStackingMode(QtWidgets.QStackedLayout.StackAll)
self.setLayout(self.stacked_layout)
# Add device manager view
self.device_manager_display = DeviceManagerDisplayWidget(parent=self, client=self.client)
self.stacked_layout.addWidget(self.device_manager_display)
# Add overlay widget
self._overlay_widget = QtWidgets.QWidget(self)
self._customize_overlay()
self.stacked_layout.addWidget(self._overlay_widget)
self._initialized = False
def on_enter(self) -> None:
"""Called after the widget becomes visible."""
if self._initialized is False:
self.stacked_layout.setCurrentWidget(self._overlay_widget)
def _customize_overlay(self):
self._overlay_widget.setAutoFillBackground(True)
self._overlay_layout = QtWidgets.QVBoxLayout()
self._overlay_layout.setAlignment(QtCore.Qt.AlignmentFlag.AlignCenter)
self._overlay_widget.setLayout(self._overlay_layout)
self._overlay_widget.setSizePolicy(
QtWidgets.QSizePolicy.Policy.Expanding, QtWidgets.QSizePolicy.Policy.Expanding
)
# Load current config
self.button_load_current_config = QtWidgets.QPushButton("Load Current Config")
icon = material_icon(icon_name="database", size=(24, 24), convert_to_pixmap=False)
self.button_load_current_config.setIcon(icon)
self._overlay_layout.addWidget(self.button_load_current_config)
self.button_load_current_config.clicked.connect(self._load_config_clicked)
# Load config from disk
self.button_load_config_from_file = QtWidgets.QPushButton("Load Config From File")
icon = material_icon(icon_name="folder", size=(24, 24), convert_to_pixmap=False)
self.button_load_config_from_file.setIcon(icon)
self._overlay_layout.addWidget(self.button_load_config_from_file)
self.button_load_config_from_file.clicked.connect(self._load_config_from_file_clicked)
self._overlay_widget.setVisible(True)
def _load_config_from_file_clicked(self):
"""Handle click on 'Load Config From File' button."""
self.device_manager_display._load_file_action()
self._initialized = True # Set initialized to True after first load
self.stacked_layout.setCurrentWidget(self.device_manager_display)
@SafeSlot()
def _load_config_clicked(self):
"""Handle click on 'Load Current Config' button."""
config = self.client.device_manager._get_redis_device_config()
self.device_manager_display.device_table_view.set_device_config(config)
self._initialized = True # Set initialized to True after first load
self.stacked_layout.setCurrentWidget(self.device_manager_display)
if __name__ == "__main__": # pragma: no cover
import sys
from qtpy.QtWidgets import QApplication
app = QApplication(sys.argv)
from bec_widgets.utils.colors import apply_theme
apply_theme("light")
widget = QtWidgets.QWidget()
layout = QtWidgets.QVBoxLayout(widget)
widget.setLayout(layout)
layout.setContentsMargins(0, 0, 0, 0)
layout.setSpacing(0)
device_manager = DeviceManagerWidget()
# config = device_manager.client.device_manager._get_redis_device_config()
# device_manager.device_table_view.set_device_config(config)
layout.addWidget(device_manager)
from bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button import DarkModeButton
dark_mode_button = DarkModeButton()
layout.addWidget(dark_mode_button)
widget.show()
device_manager.setWindowTitle("Device Manager View")
device_manager.resize(1600, 1200)
# developer_view.set_stretch(horizontal=[1, 3, 2], vertical=[5, 5]) #can be set during runtime
sys.exit(app.exec_())
@@ -1,31 +0,0 @@
from qtpy.QtWidgets import QWidget
from bec_widgets.applications.views.view import ViewBase
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea
class DockAreaView(ViewBase):
"""
Modular dock area view for arranging and managing multiple dockable widgets.
"""
RPC_CONTENT_CLASS = BECDockArea
def __init__(
self,
parent: QWidget | None = None,
content: QWidget | None = None,
*,
view_id: str | None = None,
title: str | None = None,
**kwargs,
):
super().__init__(parent=parent, content=content, view_id=view_id, title=title, **kwargs)
self.dock_area = BECDockArea(
self,
profile_namespace="bec",
auto_profile_namespace=False,
object_name="DockArea",
rpc_exposed=False,
)
self.set_content(self.dock_area)
-320
View File
@@ -1,320 +0,0 @@
from __future__ import annotations
from typing import List
from pydantic import BaseModel
from qtpy.QtCore import QEventLoop
from qtpy.QtWidgets import (
QDialog,
QDialogButtonBox,
QFormLayout,
QHBoxLayout,
QLabel,
QMessageBox,
QPushButton,
QStackedLayout,
QVBoxLayout,
QWidget,
)
from bec_widgets import BECWidget
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
from bec_widgets.widgets.control.device_input.signal_combobox.signal_combobox import SignalComboBox
from bec_widgets.widgets.plots.waveform.waveform import Waveform
class ViewTourSteps(BaseModel):
"""Model representing tour steps for a view.
Attributes:
view_title: The human-readable title of the view.
step_ids: List of registered step IDs in the order they should appear.
"""
view_title: str
step_ids: List[str]
class ViewBase(BECWidget, QWidget):
"""Wrapper for a content widget used inside the main app's stacked view.
Subclasses can implement `on_enter` and `on_exit` to run custom logic when the view becomes visible or is about to be hidden.
Args:
content (QWidget): The actual view widget to display.
parent (QWidget | None): Parent widget.
view_id (str | None): Optional view view_id, useful for debugging or introspection.
title (str | None): Optional human-readable title.
"""
RPC = True
PLUGIN = False
USER_ACCESS = ["activate"]
RPC_CONTENT_CLASS: type[QWidget] | None = None
RPC_CONTENT_ATTR = "content"
def __init__(
self,
parent: QWidget | None = None,
content: QWidget | None = None,
*,
view_id: str | None = None,
title: str | None = None,
**kwargs,
):
super().__init__(parent=parent, **kwargs)
self.content: QWidget | None = None
self.view_id = view_id
self.view_title = title
lay = QVBoxLayout(self)
lay.setContentsMargins(0, 0, 0, 0)
lay.setSpacing(0)
if content is not None:
self.set_content(content)
def set_content(self, content: QWidget) -> None:
"""Replace the current content widget with a new one."""
if self.content is not None:
self.content.setParent(None)
self.content = content
self.layout().addWidget(content)
@SafeSlot()
def on_enter(self) -> None:
"""Called after the view becomes current/visible.
Default implementation does nothing. Override in subclasses.
"""
pass
@SafeSlot()
def on_exit(self) -> bool:
"""Called before the view is switched away/hidden.
Return True to allow switching, or False to veto.
Default implementation allows switching.
"""
return True
@SafeSlot()
def activate(self) -> None:
"""Switch the parent application to this view."""
if not self.view_id:
raise ValueError("Cannot switch view without a view_id.")
parent = self.parent()
while parent is not None:
if hasattr(parent, "set_current"):
parent.set_current(self.view_id)
return
parent = parent.parent()
raise RuntimeError("Could not find a parent application with set_current().")
def cleanup(self):
if self.content is not None:
self.content.close()
self.content.deleteLater()
super().cleanup()
def register_tour_steps(self, guided_tour, main_app) -> ViewTourSteps | None:
"""Register this view's components with the guided tour.
Args:
guided_tour: The GuidedTour instance to register with.
main_app: The main application instance (for accessing set_current).
Returns:
ViewTourSteps | None: A model containing the view title and step IDs,
or None if this view has no tour steps.
Override this method in subclasses to register view-specific components.
"""
return None
####################################################################################################
# Example views for demonstration/testing purposes
####################################################################################################
# --- Popup UI version ---
class WaveformViewPopup(ViewBase): # pragma: no cover
def __init__(self, parent=None, *args, **kwargs):
super().__init__(parent=parent, *args, **kwargs)
self.waveform = Waveform(parent=self)
self.set_content(self.waveform)
@SafeSlot()
def on_enter(self) -> None:
dialog = QDialog(self)
dialog.setWindowTitle("Configure Waveform View")
label = QLabel("Select device and signal for the waveform plot:", parent=dialog)
# same as in the CurveRow used in waveform
self.device_edit = DeviceComboBox(parent=self)
self.device_edit.insertItem(0, "")
self.device_edit.setEditable(True)
self.device_edit.setCurrentIndex(0)
self.signal_edit = SignalComboBox(parent=self)
self.signal_edit.include_config_signals = False
self.signal_edit.insertItem(0, "")
self.signal_edit.setEditable(True)
self.device_edit.currentTextChanged.connect(self.signal_edit.set_device)
self.device_edit.device_reset.connect(self.signal_edit.reset_selection)
form = QFormLayout()
form.addRow(label)
form.addRow("Device", self.device_edit)
form.addRow("Signal", self.signal_edit)
buttons = QDialogButtonBox(QDialogButtonBox.Ok | QDialogButtonBox.Cancel, parent=dialog)
buttons.accepted.connect(dialog.accept)
buttons.rejected.connect(dialog.reject)
v = QVBoxLayout(dialog)
v.addLayout(form)
v.addWidget(buttons)
if dialog.exec_() == QDialog.Accepted:
self.waveform.plot(
device_y=self.device_edit.currentText(), signal_y=self.signal_edit.currentText()
)
@SafeSlot()
def on_exit(self) -> bool:
ans = QMessageBox.question(
self,
"Switch and clear?",
"Do you want to switch views and clear the plot?",
QMessageBox.Yes | QMessageBox.No,
QMessageBox.No,
)
if ans == QMessageBox.Yes:
self.waveform.clear_all()
return True
return False
# --- Inline stacked UI version ---
class WaveformViewInline(ViewBase): # pragma: no cover
def __init__(self, parent=None, *args, **kwargs):
super().__init__(parent=parent, *args, **kwargs)
# Root layout for this view uses a stacked layout
self.stack = QStackedLayout()
container = QWidget(self)
container.setLayout(self.stack)
self.set_content(container)
# --- Page 0: Settings page (inline form)
self.settings_page = QWidget()
sp_layout = QVBoxLayout(self.settings_page)
sp_layout.setContentsMargins(16, 16, 16, 16)
sp_layout.setSpacing(12)
title = QLabel("Select device and signal for the waveform plot:", parent=self.settings_page)
self.device_edit = DeviceComboBox(parent=self.settings_page)
self.device_edit.insertItem(0, "")
self.device_edit.setEditable(True)
self.device_edit.setCurrentIndex(0)
self.entry_edit = SignalComboBox(parent=self.settings_page)
self.entry_edit.include_config_signals = False
self.entry_edit.insertItem(0, "")
self.entry_edit.setEditable(True)
self.device_edit.currentTextChanged.connect(self.entry_edit.set_device)
self.device_edit.device_reset.connect(self.entry_edit.reset_selection)
form = QFormLayout()
form.addRow(title)
form.addRow("Device", self.device_edit)
form.addRow("Signal", self.entry_edit)
btn_row = QHBoxLayout()
ok_btn = QPushButton("OK", parent=self.settings_page)
cancel_btn = QPushButton("Cancel", parent=self.settings_page)
btn_row.addStretch(1)
btn_row.addWidget(cancel_btn)
btn_row.addWidget(ok_btn)
sp_layout.addLayout(form)
sp_layout.addLayout(btn_row)
# --- Page 1: Waveform page
self.waveform_page = QWidget()
wf_layout = QVBoxLayout(self.waveform_page)
wf_layout.setContentsMargins(0, 0, 0, 0)
self.waveform = Waveform(parent=self.waveform_page)
wf_layout.addWidget(self.waveform)
# --- Page 2: Exit confirmation page (inline)
self.confirm_page = QWidget()
cp_layout = QVBoxLayout(self.confirm_page)
cp_layout.setContentsMargins(16, 16, 16, 16)
cp_layout.setSpacing(12)
qlabel = QLabel("Do you want to switch views and clear the plot?", parent=self.confirm_page)
cp_buttons = QHBoxLayout()
no_btn = QPushButton("No", parent=self.confirm_page)
yes_btn = QPushButton("Yes", parent=self.confirm_page)
cp_buttons.addStretch(1)
cp_buttons.addWidget(no_btn)
cp_buttons.addWidget(yes_btn)
cp_layout.addWidget(qlabel)
cp_layout.addLayout(cp_buttons)
# Add pages to the stack
self.stack.addWidget(self.settings_page) # index 0
self.stack.addWidget(self.waveform_page) # index 1
self.stack.addWidget(self.confirm_page) # index 2
# Wire settings buttons
ok_btn.clicked.connect(self._apply_settings_and_show_waveform)
cancel_btn.clicked.connect(self._show_waveform_without_changes)
# Prepare result holder for the inline confirmation
self._exit_choice_yes = None
yes_btn.clicked.connect(lambda: self._exit_reply(True))
no_btn.clicked.connect(lambda: self._exit_reply(False))
@SafeSlot()
def on_enter(self) -> None:
# Always start on the settings page when entering
self.stack.setCurrentIndex(0)
@SafeSlot()
def on_exit(self) -> bool:
# Show inline confirmation page and synchronously wait for a choice
# -> trick to make the choice blocking, however popup would be cleaner solution
self._exit_choice_yes = None
self.stack.setCurrentIndex(2)
loop = QEventLoop()
self._exit_loop = loop
loop.exec_()
if self._exit_choice_yes:
self.waveform.clear_all()
return True
# Revert to waveform view if user cancelled switching
self.stack.setCurrentIndex(1)
return False
def _apply_settings_and_show_waveform(self):
dev = self.device_edit.currentText()
sig = self.entry_edit.currentText()
if dev and sig:
self.waveform.plot(device_y=dev, signal_y=sig)
self.stack.setCurrentIndex(1)
def _show_waveform_without_changes(self):
# Just show waveform page without plotting
self.stack.setCurrentIndex(1)
def _exit_reply(self, yes: bool):
self._exit_choice_yes = bool(yes)
if hasattr(self, "_exit_loop") and self._exit_loop.isRunning():
self._exit_loop.quit()
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
-1
View File
@@ -1 +0,0 @@
from bec_widgets.cli.rpc import rpc_base
+663 -2906
View File
File diff suppressed because it is too large Load Diff
+52 -332
View File
@@ -5,41 +5,31 @@ from __future__ import annotations
import json
import os
import select
import signal
import subprocess
import threading
import time
from contextlib import contextmanager
from threading import Lock
from typing import TYPE_CHECKING, Callable, Literal, TypeAlias, cast
from typing import TYPE_CHECKING, Literal, TypeAlias, cast
from bec_lib.endpoints import EndpointInfo, MessageEndpoints
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.utils.import_utils import lazy_import, lazy_import_from
from bec_lib.utils.import_utils import lazy_import_from
from rich.console import Console
from rich.table import Table
import bec_widgets.cli.client as client
from bec_widgets.cli.rpc.rpc_base import RPCBase, RPCReference
from bec_widgets.utils.serialization import register_serializer_extension
if TYPE_CHECKING: # pragma: no cover
from bec_lib.messages import GUIRegistryStateMessage
import bec_widgets.cli.client as client
else:
GUIRegistryStateMessage = lazy_import_from("bec_lib.messages", "GUIRegistryStateMessage")
client = lazy_import("bec_widgets.cli.client")
logger = bec_logger.logger
IGNORE_WIDGETS = ["LaunchWindow"]
PROCESS_TERMINATION_TIMEOUT = 10
PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT = 2
PROCESS_OUTPUT_SELECT_TIMEOUT = 0.2
GRACEFUL_SERVER_SHUTDOWN_RPC_TIMEOUT = 3
GRACEFUL_SERVER_SHUTDOWN_TIMEOUT = 5
OUTPUT_READER_STOP_EVENT_ATTR = "_bec_output_reader_stop_event"
RegistryState: TypeAlias = dict[
Literal["gui_id", "name", "widget_class", "config", "__rpc__", "container_proxy"],
@@ -60,16 +50,14 @@ def _filter_output(output: str) -> str:
return output
def _get_output(process, logger, stop_event: threading.Event | None = None) -> None:
log_func = {process.stdout: logger.debug, process.stderr: logger.info}
def _get_output(process, logger) -> None:
log_func = {process.stdout: logger.debug, process.stderr: logger.error}
stream_buffer = {process.stdout: [], process.stderr: []}
try:
os.set_blocking(process.stdout.fileno(), False)
os.set_blocking(process.stderr.fileno(), False)
while process.poll() is None and not (stop_event and stop_event.is_set()):
readylist, _, _ = select.select(
[process.stdout, process.stderr], [], [], PROCESS_OUTPUT_SELECT_TIMEOUT
)
while process.poll() is None:
readylist, _, _ = select.select([process.stdout, process.stderr], [], [], 1)
for stream in (process.stdout, process.stderr):
buf = stream_buffer[stream]
if stream in readylist:
@@ -84,95 +72,6 @@ def _get_output(process, logger, stop_event: threading.Event | None = None) -> N
logger.error(f"Error reading process output: {str(e)}")
def _process_group_snapshot(process) -> str:
try:
pgid = os.getpgid(process.pid)
except ProcessLookupError:
return "Process group snapshot unavailable: process already exited"
try:
result = subprocess.run(
["ps", "-o", "pid,ppid,pgid,stat,command", "-g", str(pgid)],
check=False,
capture_output=True,
text=True,
timeout=2,
)
except Exception as exc:
return f"Process group snapshot unavailable: {exc}"
output = result.stdout.strip()
if not output:
return f"Process group snapshot empty for pgid={pgid}"
return output
def _terminate_plot_process(process, logger, timeout: float = PROCESS_TERMINATION_TIMEOUT) -> None:
if process.poll() is not None:
return
process_info = f"pid={process.pid} command={process.args}"
try:
pgid = os.getpgid(process.pid)
process_info = f"pid={process.pid} pgid={pgid} command={process.args}"
logger.info(f"Terminating GUI process group {process_info}")
os.killpg(pgid, signal.SIGTERM)
except ProcessLookupError:
process.wait(timeout=timeout)
return
except Exception as exc:
logger.warning("Failed to terminate GUI process group; terminating process only.")
logger.info(f"GUI process termination failure details: {exc}. pid={process.pid}")
process.terminate()
try:
process.wait(timeout=timeout)
return
except subprocess.TimeoutExpired:
logger.warning(f"GUI process did not stop within {timeout}s; killing it.")
logger.info(
f"GUI process force-kill details: {process_info}\n"
f"{_process_group_snapshot(process)}"
)
try:
os.killpg(os.getpgid(process.pid), signal.SIGKILL)
except ProcessLookupError as e:
logger.error(f"Failed to kill GUI process group: {e}")
process.wait(timeout=timeout)
return
process.wait(timeout=timeout)
def _wait_for_process_exit(process, timeout: float) -> bool:
try:
process.wait(timeout=timeout)
except subprocess.TimeoutExpired:
return False
return True
def _join_process_output_thread(process, thread: threading.Thread | None, logger) -> None:
if thread is None:
return
thread.join(timeout=PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT)
if not thread.is_alive():
return
if stop_event := getattr(thread, OUTPUT_READER_STOP_EVENT_ATTR, None):
stop_event.set()
for stream in (process.stdout, process.stderr):
if stream is None:
continue
try:
stream.close()
except OSError as e:
logger.error(f"Failed to close stream {str(e)}")
thread.join(timeout=PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT)
if thread.is_alive():
logger.warning("GUI process output reader thread did not stop after process shutdown.")
logger.info(f"GUI process output reader thread details: pid={process.pid}")
def _start_plot_process(
gui_id: str,
gui_class_id: str,
@@ -224,14 +123,8 @@ def _start_plot_process(
if logger is None:
process_output_processing_thread = None
else:
process_output_stop_event = threading.Event()
process_output_processing_thread = threading.Thread(
target=_get_output, args=(process, logger, process_output_stop_event)
)
setattr(
process_output_processing_thread,
OUTPUT_READER_STOP_EVENT_ATTR,
process_output_stop_event,
target=_get_output, args=(process, logger)
)
process_output_processing_thread.start()
return process, process_output_processing_thread
@@ -258,10 +151,8 @@ def wait_for_server(client: BECGuiClient):
raise RuntimeError("GUI is not alive")
try:
if client._gui_started_event.wait(timeout=timeout):
if client._gui_started_timer is not None:
# cancel the timer, we are done
client._gui_started_timer.cancel()
client._gui_started_timer.join()
client._gui_started_timer.cancel()
client._gui_started_timer.join()
else:
raise TimeoutError("Could not connect to GUI server")
finally:
@@ -326,7 +217,6 @@ class BECGuiClient(RPCBase):
self._ipython_registry: dict[str, RPCReference] = {}
self.available_widgets = AvailableWidgetsNamespace()
register_serializer_extension()
self._rpc_timeout = 60
####################
#### Client API ####
@@ -337,21 +227,6 @@ class BECGuiClient(RPCBase):
"""The launcher object."""
return RPCBase(gui_id=f"{self._gui_id}:launcher", parent=self, object_name="launcher")
def set_rpc_timeout(self, timeout: float):
"""Set the timeout for RPC calls to the GUI server.
Args:
timeout(float): The timeout in seconds.
"""
if not isinstance(timeout, (int, float)) or timeout < 0:
raise ValueError("Timeout must be a non-negative number.")
self._rpc_timeout = timeout
def _safe_register_stream(self, endpoint: EndpointInfo, cb: Callable, **kwargs):
"""Check if already registered for registration in idempotent functions."""
if not self._client.connector.any_stream_is_registered(endpoint, cb=cb):
self._client.connector.register(endpoint, cb=cb, **kwargs)
def connect_to_gui_server(self, gui_id: str) -> None:
"""Connect to a GUI server"""
# Unregister the old callback
@@ -367,9 +242,10 @@ class BECGuiClient(RPCBase):
self._ipython_registry = {}
# Register the new callback
self._safe_register_stream(
self._client.connector.register(
MessageEndpoints.gui_registry_state(self._gui_id),
cb=self._handle_registry_update,
parent=self,
from_start=True,
)
@@ -385,150 +261,50 @@ class BECGuiClient(RPCBase):
def start(self, wait: bool = False) -> None:
"""Start the GUI server."""
logger.warning("Using <gui>.start() is deprecated, use <gui>.show() instead.")
return self._start(wait=wait)
def show(self, wait=True) -> None:
"""
Show the GUI window.
If the GUI server is not running, it will be started.
Args:
wait(bool): Whether to wait for the server to start. Defaults to True.
"""
def show(self):
"""Show the GUI window."""
if self._check_if_server_is_alive():
return self._show_all()
return self._start(wait=wait)
return self.start(wait=True)
def hide(self):
"""Hide the GUI window."""
return self._hide_all()
def raise_window(self, wait: bool = True) -> None:
"""
Bring GUI windows to the front.
If the GUI server is not running, it will be started.
Args:
wait(bool): Whether to wait for the server to start. Defaults to True.
"""
if self._check_if_server_is_alive():
return self._raise_all()
return self._start(wait=wait)
def change_theme(self, theme: Literal["light", "dark"] | None = None) -> None:
"""
Apply a GUI theme or toggle between dark and light.
Args:
theme(Literal["light", "dark"] | None): Theme to apply. If None, the current
theme is fetched from the GUI and toggled.
"""
if not self._check_if_server_is_alive():
self._start(wait=True)
with wait_for_server(self):
if theme is None:
current_theme = self.launcher._run_rpc("fetch_theme")
next_theme = "light" if current_theme == "dark" else "dark"
else:
next_theme = theme
self.launcher._run_rpc("change_theme", theme=next_theme)
def new(
self,
name: str | None = None,
wait: bool = True,
geometry: tuple[int, int, int, int] | None = None,
launch_script: str = "dock_area",
startup_profile: str | Literal["restore", "skip"] | None = None,
**kwargs,
) -> client.AdvancedDockArea:
) -> client.BECDockArea:
"""Create a new top-level dock area.
Args:
name(str, optional): The name of the dock area. Defaults to None.
wait(bool, optional): Whether to wait for the server to start. Defaults to True.
geometry(tuple[int, int, int, int] | None): The geometry of the dock area (pos_x, pos_y, w, h).
launch_script(str): The launch script to use. Defaults to "dock_area".
startup_profile(str | Literal["restore", "skip"] | None): Startup mode for
the dock area:
- None: start in transient empty workspace
- "restore": restore last-used profile
- "skip": skip profile initialization
- "<name>": load the named profile
**kwargs: Additional keyword arguments passed to the dock area.
geometry(tuple[int, int, int, int] | None): The geometry of the dock area (pos_x, pos_y, w, h)
Returns:
client.AdvancedDockArea: The new dock area.
Examples:
>>> gui.new() # Start with an empty unsaved workspace
>>> gui.new(startup_profile="restore") # Restore last profile
>>> gui.new(startup_profile="my_profile") # Load explicit profile
client.BECDockArea: The new dock area.
"""
if "profile" in kwargs or "start_empty" in kwargs:
raise TypeError(
"gui.new() no longer accepts 'profile' or 'start_empty'. Use 'startup_profile' instead."
)
if not self._check_if_server_is_alive():
self.show(wait=True)
self.start(wait=True)
if wait:
with wait_for_server(self):
return self._new_impl(
name=name,
geometry=geometry,
launch_script=launch_script,
startup_profile=startup_profile,
**kwargs,
)
return self._new_impl(
name=name,
geometry=geometry,
launch_script=launch_script,
startup_profile=startup_profile,
**kwargs,
)
def _new_impl(
self,
*,
name: str | None,
geometry: tuple[int, int, int, int] | None,
launch_script: str,
startup_profile: str | Literal["restore", "skip"] | None,
**kwargs,
):
if launch_script == "dock_area":
try:
return self.launcher._run_rpc(
"system.launch_dock_area",
name=name,
geometry=geometry,
startup_profile=startup_profile,
**kwargs,
)
except ValueError as exc:
error = str(exc)
if (
"Unknown system RPC method: system.launch_dock_area" not in error
and "has no attribute 'system.launch_dock_area'" not in error
):
raise
logger.debug("Server does not support system.launch_dock_area; using launcher RPC")
return self.launcher._run_rpc(
"launch",
launch_script=launch_script,
name=name,
geometry=geometry,
startup_profile=startup_profile,
**kwargs,
widget = self.launcher._run_rpc(
"launch", launch_script=launch_script, name=name, geometry=geometry, **kwargs
) # pylint: disable=protected-access
return widget
widget = self.launcher._run_rpc(
"launch", launch_script=launch_script, name=name, geometry=geometry, **kwargs
) # pylint: disable=protected-access
return widget
def delete(self, name: str) -> None:
"""Delete a dock area and its parent window.
"""Delete a dock area.
Args:
name(str): The name of the dock area.
@@ -536,19 +312,7 @@ class BECGuiClient(RPCBase):
widget = self.windows.get(name)
if widget is None:
raise ValueError(f"Dock area {name} not found.")
# Get the container_proxy (parent window) gui_id from the server registry
obj = self._server_registry.get(widget._gui_id)
if obj is None:
raise ValueError(f"Widget {name} not found in registry.")
container_gui_id = obj.get("container_proxy")
if container_gui_id:
# Close the container window which will also clean up the dock area
widget._run_rpc("close", gui_id=container_gui_id) # pylint: disable=protected-access
else:
# Fallback: just close the dock area directly
widget._run_rpc("close") # pylint: disable=protected-access
widget._run_rpc("close") # pylint: disable=protected-access
def delete_all(self) -> None:
"""Delete all dock areas."""
@@ -569,13 +333,11 @@ class BECGuiClient(RPCBase):
if self._process:
logger.success("Stopping GUI...")
if not self._request_server_shutdown():
_terminate_plot_process(self._process, logger)
_join_process_output_thread(
self._process, self._process_output_processing_thread, logger
)
self._process.terminate()
if self._process_output_processing_thread:
self._process_output_processing_thread.join()
self._process.wait()
self._process = None
self._process_output_processing_thread = None
# Unregister the registry state
self._client.connector.unregister(
@@ -594,37 +356,6 @@ class BECGuiClient(RPCBase):
#### Private methods ####
#########################
def _request_server_shutdown(self) -> bool:
if self._process is None or self._process.poll() is not None:
return True
process_details = f"pid={self._process.pid} command={self._process.args}"
logger.info(f"Requesting graceful GUI shutdown {process_details}")
try:
self.launcher._run_rpc( # pylint: disable=protected-access
"system.shutdown",
wait_for_rpc_response=True,
timeout=GRACEFUL_SERVER_SHUTDOWN_RPC_TIMEOUT,
)
except Exception as exc:
logger.warning(
"Could not confirm graceful GUI shutdown via RPC; "
"falling back to process termination."
)
logger.info(f"Graceful GUI shutdown RPC failure details: {exc}. {process_details}")
return False
if _wait_for_process_exit(self._process, GRACEFUL_SERVER_SHUTDOWN_TIMEOUT):
logger.info(f"GUI server exited after graceful shutdown {process_details}")
return True
logger.warning(
"GUI server did not exit after graceful shutdown request; "
"falling back to process termination."
)
logger.info(
f"Graceful GUI shutdown timeout details: {process_details}\n"
f"{_process_group_snapshot(self._process)}"
)
return False
def _check_if_server_is_alive(self):
"""Checks if the process is alive"""
if self._process is None:
@@ -637,8 +368,7 @@ class BECGuiClient(RPCBase):
timeout = 60
# Wait for 'bec' gui to be registered, this may take some time
# After 60s timeout. Should this raise an exception on timeout?
start = time.monotonic()
while time.monotonic() < start + timeout:
while time.time() < time.time() + timeout:
if len(list(self._server_registry.keys())) < 2 or not hasattr(
self, self._anchor_widget
):
@@ -652,9 +382,6 @@ class BECGuiClient(RPCBase):
"""
Start the GUI server, and execute callback when it is launched
"""
if self._gui_is_alive():
self._gui_started_event.set()
return
if self._process is None or self._process.poll() is not None:
logger.success("GUI starting...")
self._startup_timeout = 5
@@ -683,22 +410,28 @@ class BECGuiClient(RPCBase):
def _start(self, wait: bool = False) -> None:
self._killed = False
self._safe_register_stream(
MessageEndpoints.gui_registry_state(self._gui_id), cb=self._handle_registry_update
self._client.connector.register(
MessageEndpoints.gui_registry_state(self._gui_id),
cb=self._handle_registry_update,
parent=self,
)
return self._start_server(wait=wait)
def _handle_registry_update(self, msg: dict[str, GUIRegistryStateMessage]) -> None:
@staticmethod
def _handle_registry_update(
msg: dict[str, GUIRegistryStateMessage], parent: BECGuiClient
) -> None:
# This was causing a deadlock during shutdown, not sure why.
# with self._lock:
self = parent
self._server_registry = cast(dict[str, RegistryState], msg["data"].state)
self._update_dynamic_namespace(self._server_registry)
def _do_show_all(self):
if self.launcher and len(self._top_level) == 0:
self.launcher._run_rpc("show") # pylint: disable=protected-access
rpc_client = RPCBase(gui_id=f"{self._gui_id}:launcher", parent=self)
rpc_client._run_rpc("show") # pylint: disable=protected-access
for window in self._top_level.values():
window.raise_window()
window.show()
def _show_all(self):
with wait_for_server(self):
@@ -706,24 +439,11 @@ class BECGuiClient(RPCBase):
def _hide_all(self):
with wait_for_server(self):
if self._killed:
return
self.launcher._run_rpc("hide")
for window in self._top_level.values():
window.hide()
def _do_raise_all(self):
"""Bring GUI windows to the front."""
if self.launcher and len(self._top_level) == 0:
self.launcher._run_rpc("raise") # pylint: disable=protected-access
for window in self._top_level.values():
window.raise_window()
def _raise_all(self):
with wait_for_server(self):
if self._killed:
return
return self._do_raise_all()
rpc_client = RPCBase(gui_id=f"{self._gui_id}:launcher", parent=self)
rpc_client._run_rpc("hide") # pylint: disable=protected-access
if not self._killed:
for window in self._top_level.values():
window.hide()
def _update_dynamic_namespace(self, server_registry: dict):
"""
@@ -804,7 +524,7 @@ if __name__ == "__main__": # pragma: no cover
# Test the client_utils.py module
gui = BECGuiClient()
gui.show(wait=True)
gui.start(wait=True)
gui.new().new(widget="Waveform")
time.sleep(10)
finally:
-166
View File
@@ -1,166 +0,0 @@
# This file was automatically generated by generate_cli.py
# type: ignore
from __future__ import annotations
# pylint: skip-file
designer_plugins = {
"AbortButton": ("bec_widgets.widgets.control.buttons.button_abort.button_abort", "AbortButton"),
"BECColorMapWidget": (
"bec_widgets.widgets.utility.visual.colormap_widget.colormap_widget",
"BECColorMapWidget",
),
"BECMainWindow": ("bec_widgets.widgets.containers.main_window.main_window", "BECMainWindow"),
"BECProgressBar": (
"bec_widgets.widgets.progress.bec_progressbar.bec_progressbar",
"BECProgressBar",
),
"BECQueue": ("bec_widgets.widgets.services.bec_queue.bec_queue", "BECQueue"),
"BECShell": ("bec_widgets.widgets.editors.bec_console.bec_console", "BECShell"),
"BECSpinBox": ("bec_widgets.widgets.utility.spinbox.decimal_spinbox", "BECSpinBox"),
"BECStatusBox": ("bec_widgets.widgets.services.bec_status_box.bec_status_box", "BECStatusBox"),
"BeamlineStateManager": (
"bec_widgets.widgets.services.beamline_states.beamline_state_manager",
"BeamlineStateManager",
),
"BecConsole": ("bec_widgets.widgets.editors.bec_console.bec_console", "BecConsole"),
"ColorButton": ("bec_widgets.widgets.utility.visual.color_button.color_button", "ColorButton"),
"ColorButtonNative": (
"bec_widgets.widgets.utility.visual.color_button_native.color_button_native",
"ColorButtonNative",
),
"ColormapSelector": (
"bec_widgets.widgets.utility.visual.colormap_selector.colormap_selector",
"ColormapSelector",
),
"DapComboBox": ("bec_widgets.widgets.dap.dap_combo_box.dap_combo_box", "DapComboBox"),
"DarkModeButton": (
"bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button",
"DarkModeButton",
),
"DeviceBrowser": (
"bec_widgets.widgets.services.device_browser.device_browser",
"DeviceBrowser",
),
"DeviceComboBox": (
"bec_widgets.widgets.control.device_input.device_combobox.device_combobox",
"DeviceComboBox",
),
"Heatmap": ("bec_widgets.widgets.plots.heatmap.heatmap", "Heatmap"),
"IDEExplorer": ("bec_widgets.widgets.utility.ide_explorer.ide_explorer", "IDEExplorer"),
"Image": ("bec_widgets.widgets.plots.image.image", "Image"),
"LMFitDialog": ("bec_widgets.widgets.dap.lmfit_dialog.lmfit_dialog", "LMFitDialog"),
"LogPanel": ("bec_widgets.widgets.utility.logpanel.logpanel", "LogPanel"),
"Minesweeper": ("bec_widgets.widgets.games.minesweeper", "Minesweeper"),
"MonacoWidget": ("bec_widgets.widgets.editors.monaco.monaco_widget", "MonacoWidget"),
"MotorMap": ("bec_widgets.widgets.plots.motor_map.motor_map", "MotorMap"),
"MultiWaveform": ("bec_widgets.widgets.plots.multi_waveform.multi_waveform", "MultiWaveform"),
"PdfViewerWidget": ("bec_widgets.widgets.utility.pdf_viewer.pdf_viewer", "PdfViewerWidget"),
"PositionIndicator": (
"bec_widgets.widgets.control.device_control.position_indicator.position_indicator",
"PositionIndicator",
),
"PositionerBox": (
"bec_widgets.widgets.control.device_control.positioner_box.positioner_box.positioner_box",
"PositionerBox",
),
"PositionerBox2D": (
"bec_widgets.widgets.control.device_control.positioner_box.positioner_box_2d.positioner_box_2d",
"PositionerBox2D",
),
"PositionerControlLine": (
"bec_widgets.widgets.control.device_control.positioner_box.positioner_control_line.positioner_control_line",
"PositionerControlLine",
),
"PositionerGroup": (
"bec_widgets.widgets.control.device_control.positioner_group.positioner_group",
"PositionerGroup",
),
"ResetButton": ("bec_widgets.widgets.control.buttons.button_reset.button_reset", "ResetButton"),
"ResumeButton": (
"bec_widgets.widgets.control.buttons.button_resume.button_resume",
"ResumeButton",
),
"RingProgressBar": (
"bec_widgets.widgets.progress.ring_progress_bar.ring_progress_bar",
"RingProgressBar",
),
"SBBMonitor": ("bec_widgets.widgets.editors.sbb_monitor.sbb_monitor", "SBBMonitor"),
"ScanControl": ("bec_widgets.widgets.control.scan_control.scan_control", "ScanControl"),
"ScanMetadata": ("bec_widgets.widgets.editors.scan_metadata.scan_metadata", "ScanMetadata"),
"ScanProgressBar": (
"bec_widgets.widgets.progress.scan_progressbar.scan_progressbar",
"ScanProgressBar",
),
"ScatterWaveform": (
"bec_widgets.widgets.plots.scatter_waveform.scatter_waveform",
"ScatterWaveform",
),
"SignalComboBox": (
"bec_widgets.widgets.control.device_input.signal_combobox.signal_combobox",
"SignalComboBox",
),
"SignalLabel": ("bec_widgets.widgets.utility.signal_label.signal_label", "SignalLabel"),
"SpinnerWidget": ("bec_widgets.widgets.utility.spinner.spinner", "SpinnerWidget"),
"StopButton": ("bec_widgets.widgets.control.buttons.stop_button.stop_button", "StopButton"),
"TextBox": ("bec_widgets.widgets.editors.text_box.text_box", "TextBox"),
"ToggleSwitch": ("bec_widgets.widgets.utility.toggle.toggle", "ToggleSwitch"),
"Waveform": ("bec_widgets.widgets.plots.waveform.waveform", "Waveform"),
"WebsiteWidget": ("bec_widgets.widgets.editors.website.website", "WebsiteWidget"),
"WidgetFinderComboBox": (
"bec_widgets.widgets.utility.widget_finder.widget_finder",
"WidgetFinderComboBox",
),
}
widget_icons = {
"AbortButton": "cancel",
"BECColorMapWidget": "palette",
"BECMainWindow": "widgets",
"BECProgressBar": "page_control",
"BECQueue": "edit_note",
"BECShell": "hub",
"BECSpinBox": "123",
"BECStatusBox": "widgets",
"BeamlineStateManager": "format_list_bulleted",
"BecConsole": "terminal",
"ColorButton": "colors",
"ColorButtonNative": "colors",
"ColormapSelector": "palette",
"DapComboBox": "data_exploration",
"DarkModeButton": "dark_mode",
"DeviceBrowser": "lists",
"DeviceComboBox": "list_alt",
"Heatmap": "dataset",
"IDEExplorer": "widgets",
"Image": "image",
"LMFitDialog": "monitoring",
"LogPanel": "browse_activity",
"Minesweeper": "videogame_asset",
"MonacoWidget": "code",
"MotorMap": "my_location",
"MultiWaveform": "ssid_chart",
"PdfViewerWidget": "picture_as_pdf",
"PositionIndicator": "horizontal_distribute",
"PositionerBox": "switch_right",
"PositionerBox2D": "switch_right",
"PositionerControlLine": "switch_left",
"PositionerGroup": "grid_view",
"ResetButton": "restart_alt",
"ResumeButton": "resume",
"RingProgressBar": "track_changes",
"SBBMonitor": "train",
"ScanControl": "tune",
"ScanMetadata": "list_alt",
"ScanProgressBar": "timelapse",
"ScatterWaveform": "scatter_plot",
"SignalComboBox": "list_alt",
"SignalLabel": "scoreboard",
"SpinnerWidget": "progress_activity",
"StopButton": "dangerous",
"TextBox": "chat",
"ToggleSwitch": "toggle_on",
"Waveform": "show_chart",
"WebsiteWidget": "travel_explore",
"WidgetFinderComboBox": "frame_inspect",
}
@@ -7,22 +7,31 @@ import inspect
import os
import sys
from pathlib import Path
from typing import get_overloads
import black
import isort
from bec_lib.logger import bec_logger
from qtpy.QtCore import Property as QtProperty
from bec_widgets.utils.generate_designer_plugin import (
DesignerPluginGenerator,
DesignerPluginInfo,
plugin_filenames,
)
from bec_widgets.utils.generate_designer_plugin import DesignerPluginGenerator, plugin_filenames
from bec_widgets.utils.plugin_utils import BECClassContainer, get_custom_classes
logger = bec_logger.logger
if sys.version_info >= (3, 11):
from typing import get_overloads
else:
print(
"Python version is less than 3.11, using dummy function for get_overloads. "
"If you want to use the real function 'typing.get_overloads()', please use Python 3.11 or later."
)
def get_overloads(_obj):
"""
Dummy function for Python versions before 3.11.
"""
return []
class ClientGenerator:
def __init__(self, base=False):
@@ -44,8 +53,8 @@ from __future__ import annotations
{base_imports}
from bec_lib.logger import bec_logger
from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call, rpc_timeout
{"from bec_widgets.utils.bec_plugin_helper import get_plugin_client_module" if self._base else ""}
from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call
{"from bec_widgets.utils.bec_plugin_helper import get_all_plugin_widgets, get_plugin_client_module" if self._base else ""}
logger = bec_logger.logger
@@ -85,7 +94,7 @@ logger = bec_logger.logger
if self._base:
self.content += """
class _WidgetsEnumType(str, enum.Enum):
\"\"\" Enum for the available widgets, to be generated programmatically \"\"\"
\"\"\" Enum for the available widgets, to be generated programatically \"\"\"
...
"""
@@ -102,19 +111,27 @@ _Widgets = {
self.content += """
try:
_plugin_widgets = get_all_plugin_widgets().as_dict()
plugin_client = get_plugin_client_module()
Widgets = _WidgetsEnumType("Widgets", {name: name for name in _plugin_widgets} | _Widgets)
if (_overlap := _Widgets.keys() & _plugin_widgets.keys()) != set():
for _widget in _overlap:
logger.warning(f"Detected duplicate widget {_widget} in plugin repo file: {inspect.getfile(_plugin_widgets[_widget])} !")
for plugin_name, plugin_class in inspect.getmembers(plugin_client, inspect.isclass):
if issubclass(plugin_class, RPCBase) and plugin_class is not RPCBase:
if plugin_name not in _Widgets:
_Widgets[plugin_name] = plugin_name
if plugin_name in globals():
conflicting_file = (
inspect.getfile(_plugin_widgets[plugin_name])
if plugin_name in _plugin_widgets
else f"{plugin_client}"
)
logger.warning(
f"Plugin widget {plugin_name} in {plugin_class._IMPORT_MODULE} conflicts with a built-in class!"
f"Plugin widget {plugin_name} from {conflicting_file} conflicts with a built-in class!"
)
continue
else:
if plugin_name not in _overlap:
globals()[plugin_name] = plugin_class
Widgets = _WidgetsEnumType("Widgets", _Widgets)
except ImportError as e:
logger.error(f"Failed loading plugins: \\n{reduce(add, traceback.format_exception(e))}")
"""
@@ -129,8 +146,12 @@ except ImportError as e:
class_name = cls.__name__
self.content += f"""
class {class_name}(RPCBase):\n"""
if class_name == "BECDockArea":
self.content += f"""
class {class_name}(RPCBase):"""
else:
self.content += f"""
class {class_name}(RPCBase):"""
if cls.__doc__:
# We only want the first line of the docstring
@@ -141,20 +162,25 @@ class {class_name}(RPCBase):\n"""
else:
class_docs = cls.__doc__.split("\n")[1]
self.content += f"""
\"\"\"{class_docs}\"\"\"\n"""
user_access_entries = self._get_user_access_entries(cls)
self.content += f' _IMPORT_MODULE="{cls.__module__}"\n'
for method_entry in user_access_entries:
method, obj, is_property_setter = self._resolve_method_object(cls, method_entry)
\"\"\"{class_docs}\"\"\"
"""
if not cls.USER_ACCESS:
self.content += """...
"""
for method in cls.USER_ACCESS:
is_property_setter = False
obj = getattr(cls, method, None)
if obj is None:
obj = getattr(cls, method.split(".setter")[0], None)
is_property_setter = True
method = method.split(".setter")[0]
if obj is None:
raise AttributeError(
f"Method {method} not found in class {cls.__name__}. "
f"Please check the USER_ACCESS list."
)
if hasattr(obj, "__rpc_timeout__"):
timeout = {"value": obj.__rpc_timeout__}
else:
timeout = {}
if isinstance(obj, (property, QtProperty)):
# for the cli, we can map qt properties to regular properties
if is_property_setter:
@@ -179,54 +205,14 @@ class {class_name}(RPCBase):\n"""
def {method}{str(sig_overload)}: ...
"""
self.content += f"""
{self._rpc_call(timeout)}"""
self.content += """
@rpc_call"""
self.content += f"""
def {method}{str(sig)}:
\"\"\"
{doc}
\"\"\""""
@staticmethod
def _get_user_access_entries(cls) -> list[str]:
entries = list(getattr(cls, "USER_ACCESS", []))
content_cls = getattr(cls, "RPC_CONTENT_CLASS", None)
if content_cls is not None:
entries.extend(getattr(content_cls, "USER_ACCESS", []))
return list(dict.fromkeys(entries))
@staticmethod
def _resolve_method_object(cls, method_entry: str):
method_name = method_entry
is_property_setter = False
if method_entry.endswith(".setter"):
is_property_setter = True
method_name = method_entry.split(".setter")[0]
candidate_classes = [cls]
content_cls = getattr(cls, "RPC_CONTENT_CLASS", None)
if content_cls is not None:
candidate_classes.append(content_cls)
for candidate_cls in candidate_classes:
obj = getattr(candidate_cls, method_name, None)
if obj is not None:
return method_name, obj, is_property_setter
return method_name, None, is_property_setter
def _rpc_call(self, timeout_info: dict[str, float | None]):
"""
Decorator to mark a method as an RPC call.
This is used to generate the client code for the method.
"""
if not timeout_info:
return "@rpc_call"
timeout = timeout_info.get("value", None)
return f"""
@rpc_timeout({timeout})
@rpc_call"""
def write(self, file_name: str):
"""
Write the content to a file, automatically formatted with black.
@@ -254,58 +240,6 @@ class {class_name}(RPCBase):\n"""
file.write(formatted_content)
def write_designer_plugins(plugin_infos: list[DesignerPluginInfo], file_name: str):
"""
Write a registry of Qt widget classes with designer plugins.
Args:
plugin_infos(list[DesignerPluginInfo]): The designer plugin metadata to write.
file_name(str): The name of the file to write to.
"""
plugin_infos = sorted(plugin_infos, key=lambda info: info.plugin_name_pascal)
content = """# This file was automatically generated by generate_cli.py
# type: ignore
from __future__ import annotations
# pylint: skip-file
designer_plugins = {
"""
for info in plugin_infos:
widget_module = info.plugin_class.__module__
widget_class = info.plugin_name_pascal
content += f' "{info.plugin_name_pascal}": ("{widget_module}", "{widget_class}"),\n'
content += """
}
widget_icons = {
"""
for info in plugin_infos:
content += f' "{info.plugin_name_pascal}": "{info.icon_name}",\n'
content += """
}
"""
try:
formatted_content = black.format_str(content, mode=black.Mode(line_length=100))
except black.NothingChanged:
formatted_content = content
config = isort.Config(
profile="black",
line_length=100,
multi_line_output=3,
include_trailing_comma=False,
known_first_party=["bec_widgets"],
)
formatted_content = isort.code(formatted_content, config=config)
with open(file_name, "w", encoding="utf-8") as file:
file.write(formatted_content)
def main():
"""
Main entry point for the script, controlled by command line arguments.
@@ -342,8 +276,7 @@ def main():
client_path = module_dir / client_subdir / "client.py"
packages = ("widgets", "applications") if module_name == "bec_widgets" else ("widgets",)
rpc_classes = get_custom_classes(module_name, packages=packages)
rpc_classes = get_custom_classes(module_name)
logger.info(f"Obtained classes with RPC objects: {rpc_classes!r}")
generator = ClientGenerator(base=module_name == "bec_widgets")
@@ -359,8 +292,6 @@ def main():
else:
non_overwrite_classes = []
designer_plugin_infos = []
for cls in rpc_classes.plugins:
logger.info(f"Writing bec-designer plugin files for {cls.__name__}...")
@@ -368,30 +299,21 @@ def main():
logger.error(
f"Not writing plugin files for {cls.__name__} because a built-in widget with that name exists"
)
continue
plugin = DesignerPluginGenerator(cls)
if not hasattr(plugin, "info") or plugin.excluded:
if not hasattr(plugin, "info"):
continue
def _exists(file: str):
return os.path.exists(os.path.join(plugin.info.base_path, file))
if any(_exists(file) for file in plugin_filenames(plugin.info.plugin_name_snake)):
if _exists(plugin.filenames.plugin):
designer_plugin_infos.append(plugin.info)
logger.debug(
f"Skipping generation of extra plugin files for {plugin.info.plugin_name_snake} - at least one file out of 'plugin.py', 'pyproject', and 'register_{plugin.info.plugin_name_snake}.py' already exists."
)
continue
plugin.run()
designer_plugin_infos.append(plugin.info)
# Write designer_plugins.py with plugin import metadata for all widgets with designer plugins.
designer_plugins_path = module_dir / client_subdir / "designer_plugins.py"
logger.info(f"Generating designer plugin registry at {designer_plugins_path}")
write_designer_plugins(designer_plugin_infos, str(designer_plugins_path))
if __name__ == "__main__": # pragma: no cover
+19 -109
View File
@@ -2,15 +2,12 @@ from __future__ import annotations
import inspect
import threading
import time
import uuid
from functools import wraps
from typing import TYPE_CHECKING, Any, cast
from bec_lib.client import BECClient
from bec_lib.device import DeviceBaseWithConfig
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.utils.import_utils import lazy_import, lazy_import_from
if TYPE_CHECKING: # pragma: no cover
@@ -26,46 +23,6 @@ else:
# pylint: disable=protected-access
_DEFAULT_RPC_TIMEOUT = object()
logger = bec_logger.logger
def _name_arg(arg):
if isinstance(arg, DeviceBaseWithConfig):
# if dev.<device> is passed to GUI, it passes full_name
if hasattr(arg, "full_name"):
return arg.full_name
elif hasattr(arg, "name"):
return arg.name
return arg
def _transform_args_kwargs(args, kwargs) -> tuple[tuple, dict]:
return tuple(_name_arg(arg) for arg in args), {k: _name_arg(v) for k, v in kwargs.items()}
def rpc_timeout(timeout):
"""
A decorator to set a timeout for an RPC call.
Args:
timeout: The timeout in seconds.
Returns:
The decorated function.
"""
def decorator(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
if "timeout" not in kwargs:
kwargs["timeout"] = timeout
return func(self, *args, **kwargs)
return wrapper
return decorator
def rpc_call(func):
"""
@@ -90,7 +47,15 @@ def rpc_call(func):
return None # func(*args, **kwargs)
caller_frame = caller_frame.f_back
args, kwargs = _transform_args_kwargs(args, kwargs)
out = []
for arg in args:
if hasattr(arg, "name"):
arg = arg.name
out.append(arg)
args = tuple(out)
for key, val in kwargs.items():
if hasattr(val, "name"):
kwargs[key] = val.name
if not self._root._gui_is_alive():
raise RuntimeError("GUI is not alive")
return self._run_rpc(func.__name__, *args, **kwargs)
@@ -159,7 +124,6 @@ class RPCReference:
class RPCBase:
def __init__(
self,
gui_id: str | None = None,
@@ -208,21 +172,12 @@ class RPCBase:
parent = parent._parent
return parent # type: ignore
def raise_window(self):
"""Bring this widget (or its container) to the front."""
# Use explicit call to ensure action name is 'raise' (not 'raise_')
return self._run_rpc("raise")
def hide(self):
"""Hide this widget (or its container)."""
return self._run_rpc("hide")
def _run_rpc(
self,
method,
*args,
wait_for_rpc_response: bool = True,
timeout: float | None | object = _DEFAULT_RPC_TIMEOUT,
wait_for_rpc_response=True,
timeout=5,
gui_id: str | None = None,
**kwargs,
) -> Any:
@@ -233,22 +188,13 @@ class RPCBase:
method: The method to call.
args: The arguments to pass to the method.
wait_for_rpc_response: Whether to wait for the RPC response.
timeout: The timeout for the RPC response. If omitted, the client's default RPC
timeout is used. If explicitly set to None, wait indefinitely.
timeout: The timeout for the RPC response.
gui_id: The GUI ID to use for the RPC call. If None, the default GUI ID is used.
kwargs: The keyword arguments to pass to the method.
Returns:
The result of the RPC call.
"""
if timeout is _DEFAULT_RPC_TIMEOUT:
timeout = self._root._rpc_timeout
if method in ["show", "hide", "raise"] and gui_id is None:
obj = self._root._server_registry.get(self._gui_id)
if obj is None:
raise ValueError(f"Widget {self._gui_id} not found.")
gui_id = obj.get("container_proxy") # type: ignore
request_id = str(uuid.uuid4())
rpc_msg = messages.GUIInstructionMessage(
action=method,
@@ -261,42 +207,17 @@ class RPCBase:
self._rpc_response = None
self._msg_wait_event.clear()
self._client.connector.register(
MessageEndpoints.gui_instruction_response(request_id), cb=self._on_rpc_response
MessageEndpoints.gui_instruction_response(request_id),
cb=self._on_rpc_response,
parent=self,
)
target_gui_id = gui_id or self._gui_id
sent_at = time.time()
deadline = sent_at + timeout if timeout is not None else None
rpc_msg.metadata.update(
{
"method": method,
"receiver": receiver,
"target_gui_id": target_gui_id,
"object_name": self.object_name,
"wait_for_response": wait_for_rpc_response,
"timeout": timeout,
"sent_at": sent_at,
"deadline": deadline,
}
)
logger.info(
"Sending GUI RPC request "
f"request_id={request_id} method={method} receiver={receiver} "
f"target_gui_id={target_gui_id} object_name={self.object_name} "
f"wait_for_response={wait_for_rpc_response} timeout={timeout}"
)
self._client.connector.set_and_publish(MessageEndpoints.gui_instructions(receiver), rpc_msg)
if wait_for_rpc_response:
try:
finished = self._msg_wait_event.wait(timeout)
if not finished:
logger.error(
"GUI RPC response timeout "
f"request_id={request_id} method={method} receiver={receiver} "
f"target_gui_id={target_gui_id} object_name={self.object_name} "
f"timeout={timeout}"
)
raise RPCResponseTimeoutError(request_id, timeout)
finally:
self._msg_wait_event.clear()
@@ -308,23 +229,17 @@ class RPCBase:
# the _on_rpc_response method
assert isinstance(self._rpc_response, messages.RequestResponseMessage)
logger.info(
"Received GUI RPC response "
f"request_id={request_id} method={method} receiver={receiver} "
f"target_gui_id={target_gui_id} object_name={self.object_name} "
f"accepted={self._rpc_response.accepted}"
)
if not self._rpc_response.accepted:
raise ValueError(self._rpc_response.message["error"])
msg_result = self._rpc_response.message.get("result")
self._rpc_response = None
return self._create_widget_from_msg_result(msg_result)
def _on_rpc_response(self, msg_obj: MessageObject) -> None:
@staticmethod
def _on_rpc_response(msg_obj: MessageObject, parent: RPCBase) -> None:
msg = cast(messages.RequestResponseMessage, msg_obj.value)
logger.debug(f"GUI RPC response callback received: {msg}")
self._rpc_response = msg
self._msg_wait_event.set()
parent._rpc_response = msg
parent._msg_wait_event.set()
def _create_widget_from_msg_result(self, msg_result):
if msg_result is None:
@@ -336,11 +251,6 @@ class RPCBase:
return {
key: self._create_widget_from_msg_result(val) for key, val in msg_result.items()
}
rpc_enabled = msg_result.get("__rpc__", True)
if rpc_enabled is False:
return None
msg_result = dict(msg_result)
cls = msg_result.pop("widget_class", None)
msg_result.pop("__rpc__", None)
@@ -5,13 +5,14 @@ from threading import RLock
from typing import TYPE_CHECKING, Callable
from weakref import WeakValueDictionary
import shiboken6 as shb
from bec_lib.logger import bec_logger
from qtpy.QtCore import QObject
if TYPE_CHECKING: # pragma: no cover
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.widgets.containers.dock.dock import BECDock
from bec_widgets.widgets.containers.dock.dock_area import BECDockArea
logger = bec_logger.logger
@@ -108,19 +109,11 @@ class RPCRegister:
dict: A dictionary containing all the registered RPC objects.
"""
with self._lock:
connections = {}
for gui_id, obj in self._rpc_register.items():
try:
if not shb.isValid(obj):
continue
connections[gui_id] = obj
except Exception as e:
logger.warning(f"Error checking validity of object {gui_id}: {e}")
continue
connections = dict(self._rpc_register)
return connections
def get_names_of_rpc_by_class_type(
self, cls: type[BECWidget] | type[BECConnector]
self, cls: type[BECWidget] | type[BECConnector] | type[BECDock] | type[BECDockArea]
) -> list[str]:
"""Get all the names of the widgets.
+56
View File
@@ -0,0 +1,56 @@
from __future__ import annotations
from bec_widgets.cli.client_utils import IGNORE_WIDGETS
from bec_widgets.utils.bec_plugin_helper import get_all_plugin_widgets
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.plugin_utils import get_custom_classes
class RPCWidgetHandler:
"""Handler class for creating widgets from RPC messages."""
def __init__(self):
self._widget_classes = None
@property
def widget_classes(self) -> dict[str, type[BECWidget]]:
"""
Get the available widget classes.
Returns:
dict: The available widget classes.
"""
if self._widget_classes is None:
self.update_available_widgets()
return self._widget_classes # type: ignore
def update_available_widgets(self):
"""
Update the available widgets.
Returns:
None
"""
self._widget_classes = (
get_custom_classes("bec_widgets") + get_all_plugin_widgets()
).as_dict(IGNORE_WIDGETS)
def create_widget(self, widget_type, **kwargs) -> BECWidget:
"""
Create a widget from an RPC message.
Args:
widget_type(str): The type of the widget.
name (str): The name of the widget.
**kwargs: The keyword arguments for the widget.
Returns:
widget(BECWidget): The created widget.
"""
widget_class = self.widget_classes.get(widget_type) # type: ignore
if widget_class:
return widget_class(**kwargs)
raise ValueError(f"Unknown widget type: {widget_type}")
widget_handler = RPCWidgetHandler()
@@ -5,23 +5,18 @@ import json
import os
import signal
import sys
import traceback
from contextlib import redirect_stderr, redirect_stdout
import darkdetect
import shiboken6
from bec_lib.logger import bec_logger
from bec_lib.service_config import ServiceConfig
from bec_qthemes import apply_theme
from qtmonaco.pylsp_provider import pylsp_server
from qtpy.QtCore import QSize, Qt
from qtpy.QtGui import QIcon
from qtpy.QtWidgets import QApplication
import bec_widgets
from bec_widgets.applications.launch_window import LaunchWindow
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.utils.bec_dispatcher import BECDispatcher
from bec_widgets.utils.rpc_register import RPCRegister
logger = bec_logger.logger
@@ -64,7 +59,6 @@ class GUIServer:
self.app: QApplication | None = None
self.launcher_window: LaunchWindow | None = None
self.dispatcher: BECDispatcher | None = None
self._shutdown_started = False
def start(self):
"""
@@ -76,7 +70,6 @@ class GUIServer:
bec_logger._stderr_log_level = bec_logger.LOGLEVEL.ERROR
bec_logger._update_sinks()
bec_logger.disabled_modules = ["bec_lib.scan_items"]
with redirect_stdout(SimpleFileLikeFromLogOutputFunc(logger.info)): # type: ignore
with redirect_stderr(SimpleFileLikeFromLogOutputFunc(logger.error)): # type: ignore
self._run()
@@ -97,36 +90,38 @@ class GUIServer:
"""
Run the GUI server.
"""
logger.info("Starting GUIServer", repr(self))
self.app = QApplication(sys.argv)
if darkdetect.isDark():
apply_theme("dark")
else:
apply_theme("light")
self.app.setApplicationName("BEC")
self.app.gui_id = self.gui_id # type: ignore
self.app.gui_server = self # type: ignore # make server accessible from QApplication for getattr in widgets
self.setup_bec_icon()
service_config = self._get_service_config()
self.dispatcher = BECDispatcher(config=service_config, gui_id=self.gui_id)
# self.dispatcher.start_cli_server(gui_id=self.gui_id)
if self.gui_class:
self.launcher_window = LaunchWindow(
gui_id=f"{self.gui_id}:launcher",
launch_gui_class=self.gui_class,
launch_gui_id=self.gui_class_id,
)
else:
self.launcher_window = LaunchWindow(gui_id=f"{self.gui_id}:launcher")
self.launcher_window = LaunchWindow(gui_id=f"{self.gui_id}:launcher")
self.launcher_window.setAttribute(Qt.WA_ShowWithoutActivating) # type: ignore
self.app.aboutToQuit.connect(self.shutdown)
self.app.setQuitOnLastWindowClosed(True)
self.app.setQuitOnLastWindowClosed(False)
signal.signal(signal.SIGINT, self.request_shutdown)
signal.signal(signal.SIGTERM, self.request_shutdown)
if self.gui_class:
# If the server is started with a specific gui class, we launch it.
# This will automatically hide the launcher.
self.launcher_window.launch(self.gui_class, name=self.gui_class_id)
def sigint_handler(*args):
# display message, for people to let it terminate gracefully
print("Caught SIGINT, exiting")
# Widgets should be all closed.
with RPCRegister.delayed_broadcast():
for widget in QApplication.instance().topLevelWidgets(): # type: ignore
widget.close()
if self.app:
self.app.quit()
signal.signal(signal.SIGINT, sigint_handler)
signal.signal(signal.SIGTERM, sigint_handler)
sys.exit(self.app.exec())
@@ -143,67 +138,13 @@ class GUIServer:
)
self.app.setWindowIcon(icon)
def request_shutdown(self, signum=None, _frame=None):
"""
Request Qt application shutdown from an RPC call or OS signal.
Cleanup itself is handled by ``shutdown()``, which is connected to
``QApplication.aboutToQuit``. Calling it directly here would run BEC/RPC
teardown before Qt has processed the widget close events.
"""
signal_name = signal.Signals(signum).name if signum is not None else "shutdown"
pid = os.getpid()
if self.app is None:
logger.info(f"Caught {signal_name}, shutting down GUI server pid={pid} without app")
self.shutdown()
return
widgets = [
f"{widget.__class__.__name__}(objectName={widget.objectName()!r})"
for widget in self.app.topLevelWidgets()
]
logger.info(
f"Caught {signal_name}, requesting GUI server shutdown pid={pid} "
f"top_level_widgets={widgets}"
)
with RPCRegister.delayed_broadcast():
for widget in self.app.topLevelWidgets():
widget.close()
self.app.quit()
@staticmethod
def _run_shutdown_step(step: str, callback):
try:
callback()
except Exception as exc:
logger.error(
f"GUIServer shutdown step failed pid={os.getpid()} step={step}: {exc}\n"
f"{traceback.format_exc()}"
)
def shutdown(self):
if self._shutdown_started:
return
self._shutdown_started = True
logger.info(f"Shutdown GUIServer pid={os.getpid()} {repr(self)}")
def close_launcher_window():
if self.launcher_window and shiboken6.isValid(self.launcher_window):
self.launcher_window.close()
self.launcher_window.deleteLater()
def stop_pylsp_server():
if pylsp_server.is_running():
pylsp_server.stop()
def stop_dispatcher():
if self.dispatcher:
self.dispatcher.stop_cli_server()
self.dispatcher.disconnect_all()
self._run_shutdown_step("close_launcher_window", close_launcher_window)
self._run_shutdown_step("stop_pylsp_server", stop_pylsp_server)
self._run_shutdown_step("stop_dispatcher", stop_dispatcher)
"""
Shutdown the GUI server.
"""
if self.dispatcher:
self.dispatcher.stop_cli_server()
self.dispatcher.disconnect_all()
def main():
@@ -0,0 +1,92 @@
import os
import sys
from qtpy.QtCore import QSize
from qtpy.QtGui import QActionGroup, QIcon
from qtpy.QtWidgets import QApplication, QMainWindow, QStyle
import bec_widgets
from bec_widgets.examples.general_app.web_links import BECWebLinksMixin
from bec_widgets.utils.colors import apply_theme
from bec_widgets.utils.ui_loader import UILoader
MODULE_PATH = os.path.dirname(bec_widgets.__file__)
class BECGeneralApp(QMainWindow):
def __init__(self, parent=None):
super(BECGeneralApp, self).__init__(parent)
ui_file_path = os.path.join(os.path.dirname(__file__), "general_app.ui")
self.load_ui(ui_file_path)
self.resize(1280, 720)
self.ini_ui()
def ini_ui(self):
self._setup_icons()
self._hook_menubar_docs()
self._hook_theme_bar()
def load_ui(self, ui_file):
loader = UILoader(self)
self.ui = loader.loader(ui_file)
self.setCentralWidget(self.ui)
def _hook_menubar_docs(self):
# BEC Docs
self.ui.action_BEC_docs.triggered.connect(BECWebLinksMixin.open_bec_docs)
# BEC Widgets Docs
self.ui.action_BEC_widgets_docs.triggered.connect(BECWebLinksMixin.open_bec_widgets_docs)
# Bug report
self.ui.action_bug_report.triggered.connect(BECWebLinksMixin.open_bec_bug_report)
def change_theme(self, theme):
apply_theme(theme)
def _setup_icons(self):
help_icon = QApplication.style().standardIcon(QStyle.SP_MessageBoxQuestion)
bug_icon = QApplication.style().standardIcon(QStyle.SP_MessageBoxInformation)
computer_icon = QIcon.fromTheme("computer")
widget_icon = QIcon(os.path.join(MODULE_PATH, "assets", "designer_icons", "dock_area.png"))
self.ui.action_BEC_docs.setIcon(help_icon)
self.ui.action_BEC_widgets_docs.setIcon(help_icon)
self.ui.action_bug_report.setIcon(bug_icon)
self.ui.central_tab.setTabIcon(0, widget_icon)
self.ui.central_tab.setTabIcon(1, computer_icon)
def _hook_theme_bar(self):
self.ui.action_light.setCheckable(True)
self.ui.action_dark.setCheckable(True)
# Create an action group to make sure only one can be checked at a time
theme_group = QActionGroup(self)
theme_group.addAction(self.ui.action_light)
theme_group.addAction(self.ui.action_dark)
theme_group.setExclusive(True)
# Connect the actions to the theme change method
self.ui.action_light.triggered.connect(lambda: self.change_theme("light"))
self.ui.action_dark.triggered.connect(lambda: self.change_theme("dark"))
self.ui.action_dark.trigger()
def main(): # pragma: no cover
app = QApplication(sys.argv)
icon = QIcon()
icon.addFile(
os.path.join(MODULE_PATH, "assets", "app_icons", "BEC-General-App.png"), size=QSize(48, 48)
)
app.setWindowIcon(icon)
main_window = BECGeneralApp()
main_window.show()
sys.exit(app.exec_())
if __name__ == "__main__": # pragma: no cover
main()

Some files were not shown because too many files have changed in this diff Show More