1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-04-11 11:10:53 +02:00

Compare commits

..

1 Commits

438 changed files with 5621 additions and 32659 deletions

View File

@@ -12,9 +12,6 @@ variables:
description: ophyd_devices branch
value: main
CHILD_PIPELINE_BRANCH: $CI_DEFAULT_BRANCH
CHECK_PKG_VERSIONS:
description: Whether to run additional tests against min/max/random selection of dependencies. Set to 1 for running.
value: 0
workflow:
rules:
@@ -34,9 +31,8 @@ include:
inputs:
stage: test
path: "."
pytest_args: "-v,--random-order,tests/unit_tests"
ignore_dep_group: "pyqt6"
pip_args: ".[dev,pyside6]"
pytest_args: "-v --random-order tests/"
exclude_packages: ""
# different stages in the pipeline
stages:
@@ -61,7 +57,6 @@ stages:
- pip install -e ./ophyd_devices
- pip install -e ./bec/bec_lib[dev]
- pip install -e ./bec/bec_ipython_client
- pip install -e ./bec/pytest_bec_e2e
.install-os-packages: &install-os-packages
- apt-get update
@@ -78,9 +73,9 @@ formatter:
stage: Formatter
needs: []
script:
- pip install bec_lib[dev]
- isort --check --diff --line-length=100 --profile=black --multi-line=3 --trailing-comma ./
- black --check --diff --color --line-length=100 --skip-magic-trailing-comma ./
- pip install black isort
- isort --check --diff ./
- black --check --diff --color ./
rules:
- if: $CI_PROJECT_PATH == "bec/bec_widgets"
@@ -148,7 +143,7 @@ tests:
- *clone-repos
- *install-os-packages
- *install-repos
- pip install -e .[dev,pyside6]
- pip install -e .[dev,pyqt6]
- coverage run --source=./bec_widgets -m pytest -v --junitxml=report.xml --maxfail=2 --random-order --full-trace ./tests/unit_tests
- coverage report
- coverage xml
@@ -172,6 +167,7 @@ test-matrix:
- "3.12"
QT_PCKG:
- "pyside6"
- "pyqt6"
stage: AdditionalTests
needs: []
@@ -197,13 +193,7 @@ end-2-end-conda:
script:
- *clone-repos
- *install-os-packages
- conda config --show-sources
- conda config --add channels conda-forge
- conda config --system --remove channels https://repo.anaconda.com/pkgs/main
- conda config --system --remove channels https://repo.anaconda.com/pkgs/r
- conda config --remove channels https://repo.anaconda.com/pkgs/main
- conda config --remove channels https://repo.anaconda.com/pkgs/r
- conda config --show-sources
- conda config --prepend channels conda-forge
- conda config --set channel_priority strict
- conda config --set always_yes yes --set changeps1 no
- conda create -q -n test-environment python=3.11
@@ -216,7 +206,7 @@ end-2-end-conda:
- cd ../
- pip install -e ./ophyd_devices
- pip install -e .[dev,pyside6]
- pip install -e .[dev,pyqt6]
- cd ./tests/end-2-end
- pytest -v --start-servers --flush-redis --random-order
@@ -233,7 +223,6 @@ end-2-end-conda:
- if: '$CI_PIPELINE_SOURCE == "parent_pipeline"'
- if: '$CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "main"'
- if: '$CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "production"'
- if: '$CI_MERGE_REQUEST_TARGET_BRANCH_NAME =~ /^pre_release.*$/'
semver:
stage: Deploy

File diff suppressed because it is too large Load Diff

View File

@@ -1,17 +1,12 @@
# BEC Widgets
**⚠️ Important Notice:**
🚨 **PyQt6 is no longer supported** due to incompatibilities with Qt Designer. Please use **PySide6** instead. 🚨
BEC Widgets is a GUI framework designed for interaction with [BEC (Beamline Experiment Control)](https://gitlab.psi.ch/bec/bec).
## Installation
Use the package manager [pip](https://pip.pypa.io/en/stable/) to install BEC Widgets:
```bash
pip install bec_widgets[pyside6]
pip install bec_widgets PyQt6
```
For development purposes, you can clone the repository and install the package locally in editable mode:
@@ -19,12 +14,22 @@ For development purposes, you can clone the repository and install the package l
```bash
git clone https://gitlab.psi.ch/bec/bec-widgets
cd bec_widgets
pip install -e .[dev,pyside6]
pip install -e .[dev,pyqt6]
```
BEC Widgets now **only supports PySide6**. Users must manually install PySide6 as no default Qt distribution is
specified.
BEC Widgets currently supports both Pyside6 and PyQt6, however, no default distribution is specified. As a result, users must install one of the supported
Python Qt distributions manually.
To select a specific Python Qt distribution, install the package with an additional tag:
```bash
pip install bec_widgets[pyqt6]
```
or
```bash
pip install bec_widgets[pyside6]
```
## Documentation
Documentation of BEC Widgets can be found [here](https://bec-widgets.readthedocs.io/en/latest/). The documentation of the BEC can be found [here](https://bec.readthedocs.io/en/latest/).
@@ -34,7 +39,7 @@ Documentation of BEC Widgets can be found [here](https://bec-widgets.readthedocs
All commits should use the Angular commit scheme:
> #### <a name="commit-header"></a>Angular Commit Message Header
>
>
> ```
> <type>(<scope>): <short summary>
> │ │ │
@@ -48,13 +53,13 @@ All commits should use the Angular commit scheme:
>
> └─⫸ Commit Type: build|ci|docs|feat|fix|perf|refactor|test
> ```
>
>
> The `<type>` and `<summary>` fields are mandatory, the `(<scope>)` field is optional.
> ##### Type
>
>
> Must be one of the following:
>
>
> * **build**: Changes that affect the build system or external dependencies (example scopes: gulp, broccoli, npm)
> * **ci**: Changes to our CI configuration files and scripts (examples: CircleCi, SauceLabs)
> * **docs**: Documentation only changes
@@ -66,5 +71,4 @@ All commits should use the Angular commit scheme:
## License
[BSD-3-Clause](https://choosealicense.com/licenses/bsd-3-clause/)
[BSD-3-Clause](https://choosealicense.com/licenses/bsd-3-clause/)

View File

@@ -5,31 +5,41 @@ It is a preliminary version of the GUI, which will be added to the main branch a
import os
from typing import Optional
from bec_lib.device import Positioner as BECPositioner
from bec_lib.device import Signal as BECSignal
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from qtpy.QtCore import QSize
from qtpy.QtCore import QSize, Signal
from qtpy.QtGui import QIcon
from qtpy.QtWidgets import QApplication
from qtpy.QtWidgets import (
QApplication,
QCheckBox,
QDoubleSpinBox,
QMainWindow,
QPushButton,
QSpinBox,
)
import bec_widgets
from bec_widgets.qt_utils.error_popups import SafeSlot as Slot
from bec_widgets.qt_utils.toolbar import WidgetAction
from bec_widgets.utils import UILoader
from bec_widgets.utils.bec_dispatcher import BECDispatcher
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.colors import get_accent_colors
from bec_widgets.widgets.control.buttons.stop_button.stop_button import StopButton
from bec_widgets.widgets.control.device_control.positioner_group.positioner_group import (
PositionerGroup,
)
from bec_widgets.widgets.dap.lmfit_dialog.lmfit_dialog import LMFitDialog
from bec_widgets.widgets.plots.waveform.waveform_widget import BECWaveformWidget
from bec_widgets.widgets.progress.bec_progressbar.bec_progressbar import BECProgressBar
from bec_widgets.widgets.bec_progressbar.bec_progressbar import BECProgressBar
from bec_widgets.widgets.device_line_edit.device_line_edit import DeviceLineEdit
from bec_widgets.widgets.lmfit_dialog.lmfit_dialog import LMFitDialog
from bec_widgets.widgets.positioner_box.positioner_box import PositionerBox
from bec_widgets.widgets.positioner_group.positioner_group import PositionerGroup
from bec_widgets.widgets.stop_button.stop_button import StopButton
from bec_widgets.widgets.toggle.toggle import ToggleSwitch
from bec_widgets.widgets.waveform.waveform_widget import BECWaveformWidget
MODULE_PATH = os.path.dirname(bec_widgets.__file__)
logger = bec_logger.logger
# FIXME BECWaveFormWidget is gone, this app will not work until adapted to new Waveform
class Alignment1D:
"""Alignment GUI to perform 1D scans"""

View File

@@ -27,17 +27,25 @@ class AutoUpdates:
def __init__(self, gui: BECDockArea):
self.gui = gui
self._default_dock = None
self._default_fig = None
self.msg_queue = Queue()
self.auto_update_thread = None
self._shutdown_sentinel = object()
self.start()
def start(self):
"""
Start the auto update thread.
"""
self.auto_update_thread = threading.Thread(target=self.process_queue)
self.auto_update_thread.start()
def start_default_dock(self):
"""
Create a default dock for the auto updates.
"""
dock = self.gui.add_dock("default_figure")
dock.add_widget("BECFigure")
self.dock_name = "default_figure"
self._default_dock = self.gui.new(self.dock_name)
self._default_dock.new("BECFigure")
self._default_fig = self._default_dock.elements_list[0]
@staticmethod
def get_scan_info(msg) -> ScanInfo:
@@ -65,9 +73,15 @@ class AutoUpdates:
"""
Get the default figure from the GUI.
"""
return self._default_fig
dock = self.gui.panels.get(self.dock_name, [])
if not dock:
return None
widgets = dock.widget_list
if not widgets:
return None
return widgets[0]
def do_update(self, msg):
def run(self, msg):
"""
Run the update function if enabled.
"""
@@ -76,9 +90,20 @@ class AutoUpdates:
if msg.status != "open":
return
info = self.get_scan_info(msg)
return self.handler(info)
self.handler(info)
def get_selected_device(self, monitored_devices, selected_device):
def process_queue(self):
"""
Process the message queue.
"""
while True:
msg = self.msg_queue.get()
if msg is self._shutdown_sentinel:
break
self.run(msg)
@staticmethod
def get_selected_device(monitored_devices, selected_device):
"""
Get the selected device for the plot. If no device is selected, the first
device in the monitored devices list is selected.
@@ -95,11 +120,14 @@ class AutoUpdates:
Default update function.
"""
if info.scan_name == "line_scan" and info.scan_report_devices:
return self.simple_line_scan(info)
self.simple_line_scan(info)
return
if info.scan_name == "grid_scan" and info.scan_report_devices:
return self.simple_grid_scan(info)
self.simple_grid_scan(info)
return
if info.scan_report_devices:
return self.best_effort(info)
self.best_effort(info)
return
def simple_line_scan(self, info: ScanInfo) -> None:
"""
@@ -109,19 +137,12 @@ class AutoUpdates:
if not fig:
return
dev_x = info.scan_report_devices[0]
selected_device = yield self.gui.selected_device
dev_y = self.get_selected_device(info.monitored_devices, selected_device)
dev_y = self.get_selected_device(info.monitored_devices, self.gui.selected_device)
if not dev_y:
return
yield fig.clear_all()
yield fig.plot(
x_name=dev_x,
y_name=dev_y,
label=f"Scan {info.scan_number} - {dev_y}",
title=f"Scan {info.scan_number}",
x_label=dev_x,
y_label=dev_y,
)
fig.clear_all()
plt = fig.plot(x_name=dev_x, y_name=dev_y, label=f"Scan {info.scan_number} - {dev_y}")
plt.set(title=f"Scan {info.scan_number}", x_label=dev_x, y_label=dev_y)
def simple_grid_scan(self, info: ScanInfo) -> None:
"""
@@ -132,18 +153,12 @@ class AutoUpdates:
return
dev_x = info.scan_report_devices[0]
dev_y = info.scan_report_devices[1]
selected_device = yield self.gui.selected_device
dev_z = self.get_selected_device(info.monitored_devices, selected_device)
yield fig.clear_all()
yield fig.plot(
x_name=dev_x,
y_name=dev_y,
z_name=dev_z,
label=f"Scan {info.scan_number} - {dev_z}",
title=f"Scan {info.scan_number}",
x_label=dev_x,
y_label=dev_y,
dev_z = self.get_selected_device(info.monitored_devices, self.gui.selected_device)
fig.clear_all()
plt = fig.plot(
x_name=dev_x, y_name=dev_y, z_name=dev_z, label=f"Scan {info.scan_number} - {dev_z}"
)
plt.set(title=f"Scan {info.scan_number}", x_label=dev_x, y_label=dev_y)
def best_effort(self, info: ScanInfo) -> None:
"""
@@ -153,16 +168,17 @@ class AutoUpdates:
if not fig:
return
dev_x = info.scan_report_devices[0]
selected_device = yield self.gui.selected_device
dev_y = self.get_selected_device(info.monitored_devices, selected_device)
dev_y = self.get_selected_device(info.monitored_devices, self.gui.selected_device)
if not dev_y:
return
yield fig.clear_all()
yield fig.plot(
x_name=dev_x,
y_name=dev_y,
label=f"Scan {info.scan_number} - {dev_y}",
title=f"Scan {info.scan_number}",
x_label=dev_x,
y_label=dev_y,
)
fig.clear_all()
plt = fig.plot(x_name=dev_x, y_name=dev_y, label=f"Scan {info.scan_number} - {dev_y}")
plt.set(title=f"Scan {info.scan_number}", x_label=dev_x, y_label=dev_y)
def shutdown(self):
"""
Shutdown the auto update thread.
"""
self.msg_queue.put(self._shutdown_sentinel)
if self.auto_update_thread:
self.auto_update_thread.join()

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,3 @@
"""Client utilities for the BEC GUI."""
from __future__ import annotations
import importlib
@@ -9,42 +7,59 @@ import os
import select
import subprocess
import threading
from contextlib import contextmanager
import time
import uuid
from functools import wraps
from typing import TYPE_CHECKING
from bec_lib.client import BECClient
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.utils.import_utils import lazy_import, lazy_import_from
from rich.console import Console
from rich.table import Table
from bec_lib.utils.import_utils import isinstance_based_on_class_name, lazy_import, lazy_import_from
import bec_widgets.cli.client as client
from bec_widgets.cli.auto_updates import AutoUpdates
from bec_widgets.cli.rpc.rpc_base import RPCBase
if TYPE_CHECKING:
from bec_lib import messages
from bec_lib.connector import MessageObject
from bec_lib.device import DeviceBase
from bec_lib.redis_connector import StreamMessage
else:
messages = lazy_import("bec_lib.messages")
MessageObject = lazy_import_from("bec_lib.connector", ("MessageObject",))
StreamMessage = lazy_import_from("bec_lib.redis_connector", ("StreamMessage",))
messages = lazy_import("bec_lib.messages")
# from bec_lib.connector import MessageObject
MessageObject = lazy_import_from("bec_lib.connector", ("MessageObject",))
BECDispatcher = lazy_import_from("bec_widgets.utils.bec_dispatcher", ("BECDispatcher",))
logger = bec_logger.logger
IGNORE_WIDGETS = ["BECDockArea", "BECDock"]
def rpc_call(func):
"""
A decorator for calling a function on the server.
def _filter_output(output: str) -> str:
Args:
func: The function to call.
Returns:
The result of the function call.
"""
Filter out the output from the process.
"""
if "IMKClient" in output:
# only relevant on macOS
# see https://discussions.apple.com/thread/255761734?sortBy=rank
return ""
return output
@wraps(func)
def wrapper(self, *args, **kwargs):
# we could rely on a strict type check here, but this is more flexible
# moreover, it would anyway crash for objects...
out = []
for arg in args:
if hasattr(arg, "name"):
arg = arg.name
out.append(arg)
args = tuple(out)
for key, val in kwargs.items():
if hasattr(val, "name"):
kwargs[key] = val.name
if not self.gui_is_alive():
raise RuntimeError("GUI is not alive")
return self._run_rpc(func.__name__, *args, **kwargs)
return wrapper
def _get_output(process, logger) -> None:
@@ -60,7 +75,6 @@ def _get_output(process, logger) -> None:
if stream in readylist:
buf.append(stream.read(4096))
output, _, remaining = "".join(buf).rpartition("\n")
output = _filter_output(output)
if output:
log_func[stream](output)
buf.clear()
@@ -69,9 +83,7 @@ def _get_output(process, logger) -> None:
logger.error(f"Error reading process output: {str(e)}")
def _start_plot_process(
gui_id: str, gui_class: type, gui_class_id: str, config: dict | str, logger=None
) -> tuple[subprocess.Popen[str], threading.Thread | None]:
def _start_plot_process(gui_id: str, gui_class: type, config: dict | str, logger=None) -> None:
"""
Start the plot in a new process.
@@ -80,20 +92,11 @@ def _start_plot_process(
process will not be captured.
"""
# pylint: disable=subprocess-run-check
command = [
"bec-gui-server",
"--id",
gui_id,
"--gui_class",
gui_class.__name__,
"--gui_class_id",
gui_class_id,
"--hide",
]
command = ["bec-gui-server", "--id", gui_id, "--gui_class", gui_class.__name__]
if config:
if isinstance(config, dict):
config = json.dumps(config)
command.extend(["--config", str(config)])
command.extend(["--config", config])
env_dict = os.environ.copy()
env_dict["PYTHONUNBUFFERED"] = "1"
@@ -123,117 +126,14 @@ def _start_plot_process(
return process, process_output_processing_thread
class RepeatTimer(threading.Timer):
"""RepeatTimer class."""
def run(self):
while not self.finished.wait(self.interval):
self.function(*self.args, **self.kwargs)
# pylint: disable=protected-access
@contextmanager
def wait_for_server(client: BECGuiClient):
"""Context manager to wait for the server to start."""
timeout = client._startup_timeout
if not timeout:
if client._gui_is_alive():
# there is hope, let's wait a bit
timeout = 1
else:
raise RuntimeError("GUI is not alive")
try:
if client._gui_started_event.wait(timeout=timeout):
client._gui_started_timer.cancel()
client._gui_started_timer.join()
else:
raise TimeoutError("Could not connect to GUI server")
finally:
# after initial waiting period, do not wait so much any more
# (only relevant if GUI didn't start)
client._startup_timeout = 0
yield
class WidgetNameSpace:
def __repr__(self):
console = Console()
table = Table(title="Available widgets for BEC CLI usage")
table.add_column("Widget Name", justify="left", style="magenta")
table.add_column("Description", justify="left")
for attr, value in self.__dict__.items():
docs = value.__doc__
docs = docs if docs else "No description available"
table.add_row(attr, docs)
console.print(table)
return f""
class AvailableWidgetsNamespace:
"""Namespace for available widgets in the BEC GUI."""
def __init__(self):
for widget in client.Widgets:
name = widget.value
if name in IGNORE_WIDGETS:
continue
setattr(self, name, name)
def __repr__(self):
console = Console()
table = Table(title="Available widgets for BEC CLI usage")
table.add_column("Widget Name", justify="left", style="magenta")
table.add_column("Description", justify="left")
for attr_name, _ in self.__dict__.items():
docs = getattr(client, attr_name).__doc__
docs = docs if docs else "No description available"
table.add_row(attr_name, docs if len(docs.strip()) > 0 else "No description available")
console.print(table)
return "" # f"<{self.__class__.__name__}>"
class BECDockArea(client.BECDockArea):
"""Extend the BECDockArea class and add namespaces to access widgets of docks."""
def __init__(self, gui_id=None, config=None, name=None, parent=None):
super().__init__(gui_id, config, name, parent)
# Add namespaces for DockArea
self.elements = WidgetNameSpace()
class BECGuiClient(RPCBase):
"""BEC GUI client class. Container for GUI applications within Python."""
_top_level: dict[str, BECDockArea] = {}
class BECGuiClientMixin:
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self._default_dock_name = "bec"
self._auto_updates_enabled = True
self._auto_updates = None
self._killed = False
self._startup_timeout = 0
self._gui_started_timer = None
self._gui_started_event = threading.Event()
self._process = None
self._process_output_processing_thread = None
@property
def windows(self) -> dict:
"""Dictionary with dock areas in the GUI."""
return self._top_level
@property
def window_list(self) -> list:
"""List with dock areas in the GUI."""
return list(self._top_level.values())
# FIXME AUTO UPDATES
# @property
# def auto_updates(self):
# if self._auto_updates_enabled:
# with wait_for_server(self):
# return self._auto_updates
self.auto_updates = self._get_update_script()
self._target_endpoint = MessageEndpoints.scan_status()
self._selected_device = None
def _get_update_script(self) -> AutoUpdates | None:
eps = imd.entry_points(group="bec.widgets.auto_updates")
@@ -244,226 +144,197 @@ class BECGuiClient(RPCBase):
# if the module is not found, we skip it
if spec is None:
continue
return ep.load()(gui=self._top_level["main"])
return ep.load()(gui=self)
except Exception as e:
logger.error(f"Error loading auto update script from plugin: {str(e)}")
return None
# FIXME AUTO UPDATES
# @property
# def selected_device(self) -> str | None:
# """
# Selected device for the plot.
# """
# auto_update_config_ep = MessageEndpoints.gui_auto_update_config(self._gui_id)
# auto_update_config = self._client.connector.get(auto_update_config_ep)
# if auto_update_config:
# return auto_update_config.selected_device
# return None
# @selected_device.setter
# def selected_device(self, device: str | DeviceBase):
# if isinstance_based_on_class_name(device, "bec_lib.device.DeviceBase"):
# self._client.connector.set_and_publish(
# MessageEndpoints.gui_auto_update_config(self._gui_id),
# messages.GUIAutoUpdateConfigMessage(selected_device=device.name),
# )
# elif isinstance(device, str):
# self._client.connector.set_and_publish(
# MessageEndpoints.gui_auto_update_config(self._gui_id),
# messages.GUIAutoUpdateConfigMessage(selected_device=device),
# )
# else:
# raise ValueError("Device must be a string or a device object")
# FIXME AUTO UPDATES
# def _start_update_script(self) -> None:
# self._client.connector.register(MessageEndpoints.scan_status(), cb=self._handle_msg_update)
# def _handle_msg_update(self, msg: StreamMessage) -> None:
# if self.auto_updates is not None:
# # pylint: disable=protected-access
# return self._update_script_msg_parser(msg.value)
# def _update_script_msg_parser(self, msg: messages.BECMessage) -> None:
# if isinstance(msg, messages.ScanStatusMessage):
# if not self._gui_is_alive():
# return
# if self._auto_updates_enabled:
# return self.auto_updates.do_update(msg)
def _gui_post_startup(self):
# if self._auto_updates_enabled:
# if self._auto_updates is None:
# auto_updates = self._get_update_script()
# if auto_updates is None:
# AutoUpdates.create_default_dock = True
# AutoUpdates.enabled = True
# auto_updates = AutoUpdates(self._top_level["main"].widget)
# if auto_updates.create_default_dock:
# auto_updates.start_default_dock()
# self._start_update_script()
# self._auto_updates = auto_updates
self._top_level[self._default_dock_name] = BECDockArea(
gui_id=f"{self._default_dock_name}", name=self._default_dock_name, parent=self
)
self._do_show_all()
self._gui_started_event.set()
def _start_server(self, wait: bool = False) -> None:
@property
def selected_device(self):
"""
Start the GUI server, and execute callback when it is launched
Selected device for the plot.
"""
return self._selected_device
@selected_device.setter
def selected_device(self, device: str | DeviceBase):
if isinstance_based_on_class_name(device, "bec_lib.device.DeviceBase"):
self._selected_device = device.name
elif isinstance(device, str):
self._selected_device = device
else:
raise ValueError("Device must be a string or a device object")
def _start_update_script(self) -> None:
self._client.connector.register(
self._target_endpoint, cb=self._handle_msg_update, parent=self
)
@staticmethod
def _handle_msg_update(msg: MessageObject, parent: BECGuiClientMixin) -> None:
if parent.auto_updates is not None:
# pylint: disable=protected-access
parent._update_script_msg_parser(msg.value)
def _update_script_msg_parser(self, msg: messages.BECMessage) -> None:
if isinstance(msg, messages.ScanStatusMessage):
if not self.gui_is_alive():
return
self.auto_updates.msg_queue.put(msg)
def show(self) -> None:
"""
Show the figure.
"""
if self._process is None or self._process.poll() is not None:
logger.success("GUI starting...")
self._startup_timeout = 5
self._gui_started_event.clear()
self._start_update_script()
self._process, self._process_output_processing_thread = _start_plot_process(
self._gui_id,
self.__class__,
gui_class_id=self._default_dock_name,
config=self._client._service_config.config, # pylint: disable=protected-access
logger=logger,
self._gui_id, self.__class__, self._client._service_config.config, logger=logger
)
while not self.gui_is_alive():
print("Waiting for GUI to start...")
time.sleep(1)
logger.success(f"GUI started with id: {self._gui_id}")
def gui_started_callback(callback):
try:
if callable(callback):
callback()
finally:
threading.current_thread().cancel()
self._gui_started_timer = RepeatTimer(
0.5, lambda: self._gui_is_alive() and gui_started_callback(self._gui_post_startup)
)
self._gui_started_timer.start()
if wait:
self._gui_started_event.wait()
def _dump(self):
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
return rpc_client._run_rpc("_dump")
def start(self, wait: bool = True) -> None:
"""Start the server and show the GUI window."""
return self._start_server(wait=wait)
def _do_show_all(self):
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
rpc_client._run_rpc("show") # pylint: disable=protected-access
for window in self._top_level.values():
window.show()
def _show_all(self):
with wait_for_server(self):
return self._do_show_all()
def _hide_all(self):
with wait_for_server(self):
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
rpc_client._run_rpc("hide") # pylint: disable=protected-access
# because of the registry callbacks, we may have
# dock areas that are already killed, but not yet
# removed from the registry state
if not self._killed:
for window in self._top_level.values():
window.hide()
def show(self):
"""Show the GUI window."""
if self._process is not None:
return self._show_all()
# backward compatibility: show() was also starting server
return self._start_server(wait=True)
def hide(self):
"""Hide the GUI window."""
return self._hide_all()
def new(
self,
name: str | None = None,
wait: bool = True,
geometry: tuple[int, int, int, int] | None = None,
) -> BECDockArea:
"""Create a new top-level dock area.
Args:
name(str, optional): The name of the dock area. Defaults to None.
wait(bool, optional): Whether to wait for the server to start. Defaults to True.
geometry(tuple[int, int, int, int] | None): The geometry of the dock area (pos_x, pos_y, w, h)
Returns:
BECDockArea: The new dock area.
def close(self) -> None:
"""
if len(self.window_list) == 0:
self.show()
if wait:
with wait_for_server(self):
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
widget = rpc_client._run_rpc(
"new_dock_area", name, geometry
) # pylint: disable=protected-access
self._top_level[widget.widget_name] = widget
return widget
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
widget = rpc_client._run_rpc(
"new_dock_area", name, geometry
) # pylint: disable=protected-access
self._top_level[widget.widget_name] = widget
return widget
def delete(self, name: str) -> None:
"""Delete a dock area.
Args:
name(str): The name of the dock area.
Close the gui window.
"""
widget = self.windows.get(name)
if widget is None:
raise ValueError(f"Dock area {name} not found.")
widget._run_rpc("close") # pylint: disable=protected-access
def delete_all(self) -> None:
"""Delete all dock areas."""
for widget_name in self.windows.keys():
self.delete(widget_name)
def close(self):
"""Deprecated. Use kill_server() instead."""
# FIXME, deprecated in favor of kill, will be removed in the future
self.kill_server()
def kill_server(self) -> None:
"""Kill the GUI server."""
self._top_level.clear()
self._killed = True
if self._gui_started_timer is not None:
self._gui_started_timer.cancel()
self._gui_started_timer.join()
if self._process is None:
return
self._client.shutdown()
if self._process:
logger.success("Stopping GUI...")
self._process.terminate()
if self._process_output_processing_thread:
self._process_output_processing_thread.join()
self._process.wait()
self._process = None
if self.auto_updates is not None:
self.auto_updates.shutdown()
if __name__ == "__main__": # pragma: no cover
from bec_lib.client import BECClient
from bec_lib.service_config import ServiceConfig
class RPCResponseTimeoutError(Exception):
"""Exception raised when an RPC response is not received within the expected time."""
config = ServiceConfig()
client = BECClient(config)
client.start()
def __init__(self, request_id, timeout):
super().__init__(
f"RPC response not received within {timeout} seconds for request ID {request_id}"
)
# Test the client_utils.py module
gui = BECGuiClient()
gui.start()
print(gui.window_list)
class RPCBase:
def __init__(self, gui_id: str = None, config: dict = None, parent=None) -> None:
self._client = BECClient() # BECClient is a singleton; here, we simply get the instance
self._config = config if config is not None else {}
self._gui_id = gui_id if gui_id is not None else str(uuid.uuid4())[:5]
self._parent = parent
self._msg_wait_event = threading.Event()
self._rpc_response = None
super().__init__()
# print(f"RPCBase: {self._gui_id}")
def __repr__(self):
type_ = type(self)
qualname = type_.__qualname__
return f"<{qualname} object at {hex(id(self))}>"
@property
def _root(self):
"""
Get the root widget. This is the BECFigure widget that holds
the anchor gui_id.
"""
parent = self
# pylint: disable=protected-access
while parent._parent is not None:
parent = parent._parent
return parent
def _run_rpc(self, method, *args, wait_for_rpc_response=True, timeout=3, **kwargs):
"""
Run the RPC call.
Args:
method: The method to call.
args: The arguments to pass to the method.
wait_for_rpc_response: Whether to wait for the RPC response.
kwargs: The keyword arguments to pass to the method.
Returns:
The result of the RPC call.
"""
request_id = str(uuid.uuid4())
rpc_msg = messages.GUIInstructionMessage(
action=method,
parameter={"args": args, "kwargs": kwargs, "gui_id": self._gui_id},
metadata={"request_id": request_id},
)
# pylint: disable=protected-access
receiver = self._root._gui_id
if wait_for_rpc_response:
self._rpc_response = None
self._msg_wait_event.clear()
self._client.connector.register(
MessageEndpoints.gui_instruction_response(request_id),
cb=self._on_rpc_response,
parent=self,
)
self._client.connector.set_and_publish(MessageEndpoints.gui_instructions(receiver), rpc_msg)
if wait_for_rpc_response:
try:
finished = self._msg_wait_event.wait(10)
if not finished:
raise RPCResponseTimeoutError(request_id, timeout)
finally:
self._msg_wait_event.clear()
self._client.connector.unregister(
MessageEndpoints.gui_instruction_response(request_id), cb=self._on_rpc_response
)
# get class name
if not self._rpc_response.accepted:
raise ValueError(self._rpc_response.message["error"])
msg_result = self._rpc_response.message.get("result")
self._rpc_response = None
return self._create_widget_from_msg_result(msg_result)
@staticmethod
def _on_rpc_response(msg: MessageObject, parent: RPCBase) -> None:
msg = msg.value
parent._msg_wait_event.set()
parent._rpc_response = msg
def _create_widget_from_msg_result(self, msg_result):
if msg_result is None:
return None
if isinstance(msg_result, list):
return [self._create_widget_from_msg_result(res) for res in msg_result]
if isinstance(msg_result, dict):
if "__rpc__" not in msg_result:
return {
key: self._create_widget_from_msg_result(val) for key, val in msg_result.items()
}
cls = msg_result.pop("widget_class", None)
msg_result.pop("__rpc__", None)
if not cls:
return msg_result
cls = getattr(client, cls)
# print(msg_result)
return cls(parent=self, **msg_result)
return msg_result
def gui_is_alive(self):
"""
Check if the GUI is alive.
"""
heart = self._client.connector.get(MessageEndpoints.gui_heartbeat(self._root._gui_id))
if heart is None:
return False
if heart.status == messages.BECStatus.RUNNING:
return True
return False

View File

@@ -11,7 +11,7 @@ import isort
from qtpy.QtCore import Property as QtProperty
from bec_widgets.utils.generate_designer_plugin import DesignerPluginGenerator
from bec_widgets.utils.plugin_utils import BECClassContainer, get_custom_classes
from bec_widgets.utils.plugin_utils import BECClassContainer, get_rpc_classes
if sys.version_info >= (3, 11):
from typing import get_overloads
@@ -35,7 +35,7 @@ from __future__ import annotations
import enum
from typing import Literal, Optional, overload
from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call
from bec_widgets.cli.client_utils import RPCBase, rpc_call, BECGuiClientMixin
# pylint: skip-file"""
@@ -43,21 +43,14 @@ from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call
def generate_client(self, class_container: BECClassContainer):
"""
Generate the client for the published classes, skipping any classes
that have `RPC = False`.
Generate the client for the published classes.
Args:
class_container: The class container with the classes to generate the client for.
"""
# Filter out classes that explicitly have RPC=False
rpc_top_level_classes = [
cls for cls in class_container.rpc_top_level_classes if getattr(cls, "RPC", True)
]
rpc_top_level_classes = class_container.rpc_top_level_classes
rpc_top_level_classes.sort(key=lambda x: x.__name__)
connector_classes = [
cls for cls in class_container.connector_classes if getattr(cls, "RPC", True)
]
connector_classes = class_container.connector_classes
connector_classes.sort(key=lambda x: x.__name__)
self.write_client_enum(rpc_top_level_classes)
@@ -88,28 +81,16 @@ class Widgets(str, enum.Enum):
class_name = cls.__name__
if class_name == "BECDockArea":
# Generate the content
if cls.__name__ == "BECDockArea":
self.content += f"""
class {class_name}(RPCBase):"""
class {class_name}(RPCBase, BECGuiClientMixin):"""
else:
self.content += f"""
class {class_name}(RPCBase):"""
if cls.__doc__:
# We only want the first line of the docstring
# But skip the first line if it's a blank line
first_line = cls.__doc__.split("\n")[0]
if first_line:
class_docs = first_line
else:
class_docs = cls.__doc__.split("\n")[1]
self.content += f"""
\"\"\"{class_docs}\"\"\"
"""
if not cls.USER_ACCESS:
self.content += """...
"""
for method in cls.USER_ACCESS:
is_property_setter = False
obj = getattr(cls, method, None)
@@ -119,10 +100,8 @@ class {class_name}(RPCBase):"""
method = method.split(".setter")[0]
if obj is None:
raise AttributeError(
f"Method {method} not found in class {cls.__name__}. "
f"Please check the USER_ACCESS list."
f"Method {method} not found in class {cls.__name__}. Please check the USER_ACCESS list."
)
if isinstance(obj, (property, QtProperty)):
# for the cli, we can map qt properties to regular properties
if is_property_setter:
@@ -196,7 +175,7 @@ def main():
current_path = os.path.dirname(__file__)
client_path = os.path.join(current_path, "client.py")
rpc_classes = get_custom_classes("bec_widgets")
rpc_classes = get_rpc_classes("bec_widgets")
generator = ClientGenerator()
generator.generate_client(rpc_classes)

View File

@@ -1,197 +0,0 @@
from __future__ import annotations
import threading
import uuid
from functools import wraps
from typing import TYPE_CHECKING, Any, cast
from bec_lib.client import BECClient
from bec_lib.endpoints import MessageEndpoints
from bec_lib.utils.import_utils import lazy_import, lazy_import_from
import bec_widgets.cli.client as client
if TYPE_CHECKING:
from bec_lib import messages
from bec_lib.connector import MessageObject
else:
messages = lazy_import("bec_lib.messages")
# from bec_lib.connector import MessageObject
MessageObject = lazy_import_from("bec_lib.connector", ("MessageObject",))
def rpc_call(func):
"""
A decorator for calling a function on the server.
Args:
func: The function to call.
Returns:
The result of the function call.
"""
@wraps(func)
def wrapper(self, *args, **kwargs):
# we could rely on a strict type check here, but this is more flexible
# moreover, it would anyway crash for objects...
out = []
for arg in args:
if hasattr(arg, "name"):
arg = arg.name
out.append(arg)
args = tuple(out)
for key, val in kwargs.items():
if hasattr(val, "name"):
kwargs[key] = val.name
if not self._root._gui_is_alive():
raise RuntimeError("GUI is not alive")
return self._run_rpc(func.__name__, *args, **kwargs)
return wrapper
class RPCResponseTimeoutError(Exception):
"""Exception raised when an RPC response is not received within the expected time."""
def __init__(self, request_id, timeout):
super().__init__(
f"RPC response not received within {timeout} seconds for request ID {request_id}"
)
class RPCBase:
def __init__(
self,
gui_id: str | None = None,
config: dict | None = None,
name: str | None = None,
parent=None,
) -> None:
self._client = BECClient() # BECClient is a singleton; here, we simply get the instance
self._config = config if config is not None else {}
self._gui_id = gui_id if gui_id is not None else str(uuid.uuid4())[:5]
self._name = name if name is not None else str(uuid.uuid4())[:5]
self._parent = parent
self._msg_wait_event = threading.Event()
self._rpc_response = None
super().__init__()
# print(f"RPCBase: {self._gui_id}")
def __repr__(self):
type_ = type(self)
qualname = type_.__qualname__
return f"<{qualname} with name: {self.widget_name}>"
def remove(self):
"""
Remove the widget.
"""
self._run_rpc("remove")
@property
def widget_name(self):
"""
Get the widget name.
"""
return self._name
@property
def _root(self):
"""
Get the root widget. This is the BECFigure widget that holds
the anchor gui_id.
"""
parent = self
# pylint: disable=protected-access
while parent._parent is not None:
parent = parent._parent
return parent
def _run_rpc(self, method, *args, wait_for_rpc_response=True, timeout=3, **kwargs) -> Any:
"""
Run the RPC call.
Args:
method: The method to call.
args: The arguments to pass to the method.
wait_for_rpc_response: Whether to wait for the RPC response.
kwargs: The keyword arguments to pass to the method.
Returns:
The result of the RPC call.
"""
request_id = str(uuid.uuid4())
rpc_msg = messages.GUIInstructionMessage(
action=method,
parameter={"args": args, "kwargs": kwargs, "gui_id": self._gui_id},
metadata={"request_id": request_id},
)
# pylint: disable=protected-access
receiver = self._root._gui_id
if wait_for_rpc_response:
self._rpc_response = None
self._msg_wait_event.clear()
self._client.connector.register(
MessageEndpoints.gui_instruction_response(request_id),
cb=self._on_rpc_response,
parent=self,
)
self._client.connector.set_and_publish(MessageEndpoints.gui_instructions(receiver), rpc_msg)
if wait_for_rpc_response:
try:
finished = self._msg_wait_event.wait(timeout)
if not finished:
raise RPCResponseTimeoutError(request_id, timeout)
finally:
self._msg_wait_event.clear()
self._client.connector.unregister(
MessageEndpoints.gui_instruction_response(request_id), cb=self._on_rpc_response
)
# get class name
if not self._rpc_response.accepted:
raise ValueError(self._rpc_response.message["error"])
msg_result = self._rpc_response.message.get("result")
self._rpc_response = None
return self._create_widget_from_msg_result(msg_result)
@staticmethod
def _on_rpc_response(msg: MessageObject, parent: RPCBase) -> None:
msg = msg.value
parent._msg_wait_event.set()
parent._rpc_response = msg
def _create_widget_from_msg_result(self, msg_result):
if msg_result is None:
return None
if isinstance(msg_result, list):
return [self._create_widget_from_msg_result(res) for res in msg_result]
if isinstance(msg_result, dict):
if "__rpc__" not in msg_result:
return {
key: self._create_widget_from_msg_result(val) for key, val in msg_result.items()
}
cls = msg_result.pop("widget_class", None)
msg_result.pop("__rpc__", None)
if not cls:
return msg_result
cls = getattr(client, cls)
# print(msg_result)
return cls(parent=self, **msg_result)
return msg_result
def _gui_is_alive(self):
"""
Check if the GUI is alive.
"""
heart = self._client.connector.get(MessageEndpoints.gui_heartbeat(self._root._gui_id))
if heart is None:
return False
if heart.status == messages.BECStatus.RUNNING:
return True
return False

View File

@@ -1,21 +1,8 @@
from __future__ import annotations
from functools import wraps
from threading import Lock
from typing import TYPE_CHECKING, Callable
from weakref import WeakValueDictionary
from bec_lib.logger import bec_logger
from qtpy.QtCore import QObject
if TYPE_CHECKING: # pragma: no cover
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.widgets.containers.dock.dock import BECDock
from bec_widgets.widgets.containers.dock.dock_area import BECDockArea
logger = bec_logger.logger
class RPCRegister:
"""
@@ -60,7 +47,7 @@ class RPCRegister:
raise ValueError(f"RPC object {rpc} must have a 'gui_id' attribute.")
self._rpc_register.pop(rpc.gui_id, None)
def get_rpc_by_id(self, gui_id: str) -> QObject | None:
def get_rpc_by_id(self, gui_id: str) -> QObject:
"""
Get an RPC object by its ID.
@@ -68,7 +55,7 @@ class RPCRegister:
gui_id(str): The ID of the RPC object to be retrieved.
Returns:
QObject | None: The RPC object with the given ID or None
QObject: The RPC object with the given ID.
"""
rpc_object = self._rpc_register.get(gui_id, None)
return rpc_object
@@ -84,19 +71,6 @@ class RPCRegister:
connections = dict(self._rpc_register)
return connections
def get_names_of_rpc_by_class_type(
self, cls: BECWidget | BECConnector | BECDock | BECDockArea
) -> list[str]:
"""Get all the names of the widgets.
Args:
cls(BECWidget | BECConnector): The class of the RPC object to be retrieved.
"""
# This retrieves any rpc objects that are subclass of BECWidget,
# i.e. curve and image items are excluded
widgets = [rpc for rpc in self._rpc_register.values() if isinstance(rpc, cls)]
return [widget._name for widget in widgets]
@classmethod
def reset_singleton(cls):
"""

View File

@@ -1,9 +1,4 @@
from __future__ import annotations
from typing import Any
from bec_widgets.cli.client_utils import IGNORE_WIDGETS
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils import BECConnector
class RPCWidgetHandler:
@@ -13,7 +8,7 @@ class RPCWidgetHandler:
self._widget_classes = None
@property
def widget_classes(self) -> dict[str, type[BECWidget]]:
def widget_classes(self):
"""
Get the available widget classes.
@@ -22,7 +17,7 @@ class RPCWidgetHandler:
"""
if self._widget_classes is None:
self.update_available_widgets()
return self._widget_classes # type: ignore
return self._widget_classes
def update_available_widgets(self):
"""
@@ -31,28 +26,27 @@ class RPCWidgetHandler:
Returns:
None
"""
from bec_widgets.utils.plugin_utils import get_custom_classes
from bec_widgets.utils.plugin_utils import get_rpc_classes
clss = get_custom_classes("bec_widgets")
self._widget_classes = {
cls.__name__: cls for cls in clss.widgets if cls.__name__ not in IGNORE_WIDGETS
}
clss = get_rpc_classes("bec_widgets")
self._widget_classes = {cls.__name__: cls for cls in clss.top_level_classes}
def create_widget(self, widget_type, name: str | None = None, **kwargs) -> BECWidget:
def create_widget(self, widget_type, **kwargs) -> BECConnector:
"""
Create a widget from an RPC message.
Args:
widget_type(str): The type of the widget.
name (str): The name of the widget.
**kwargs: The keyword arguments for the widget.
Returns:
widget(BECWidget): The created widget.
widget(BECConnector): The created widget.
"""
widget_class = self.widget_classes.get(widget_type) # type: ignore
if self._widget_classes is None:
self.update_available_widgets()
widget_class = self._widget_classes.get(widget_type)
if widget_class:
return widget_class(name=name, **kwargs)
return widget_class(**kwargs)
raise ValueError(f"Unknown widget type: {widget_type}")

View File

@@ -1,56 +1,29 @@
from __future__ import annotations
import functools
import inspect
import json
import signal
import sys
import types
from contextlib import contextmanager, redirect_stderr, redirect_stdout
from contextlib import redirect_stderr, redirect_stdout
from typing import Union
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.service_config import ServiceConfig
from bec_lib.utils.import_utils import lazy_import
from qtpy.QtCore import Qt, QTimer
from redis.exceptions import RedisError
from qtpy.QtCore import QTimer
from bec_widgets.cli.rpc import rpc_register
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.qt_utils.error_popups import ErrorPopupUtility
from bec_widgets.cli.rpc_register import RPCRegister
from bec_widgets.utils import BECDispatcher
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.widgets.containers.dock import BECDockArea
from bec_widgets.widgets.containers.figure import BECFigure
from bec_widgets.widgets.containers.main_window.main_window import BECMainWindow
from bec_widgets.utils.bec_dispatcher import QtRedisConnector
from bec_widgets.widgets.dock.dock_area import BECDockArea
from bec_widgets.widgets.figure import BECFigure
messages = lazy_import("bec_lib.messages")
logger = bec_logger.logger
@contextmanager
def rpc_exception_hook(err_func):
"""This context replaces the popup message box for error display with a specific hook"""
# get error popup utility singleton
popup = ErrorPopupUtility()
# save current setting
old_exception_hook = popup.custom_exception_hook
# install err_func, if it is a callable
# IMPORTANT, Keep self here, because this method is overwriting the custom_exception_hook
# of the ErrorPopupUtility (popup instance) class.
def custom_exception_hook(self, exc_type, value, tb, **kwargs):
err_func({"error": popup.get_error_message(exc_type, value, tb)})
popup.custom_exception_hook = types.MethodType(custom_exception_hook, popup)
try:
yield popup
finally:
# restore state of error popup utility singleton
popup.custom_exception_hook = old_exception_hook
class BECWidgetsCLIServer:
def __init__(
@@ -59,18 +32,16 @@ class BECWidgetsCLIServer:
dispatcher: BECDispatcher = None,
client=None,
config=None,
gui_class: Union[BECFigure, BECDockArea] = BECDockArea,
gui_class_id: str = "bec",
gui_class: Union[BECFigure, BECDockArea] = BECFigure,
) -> None:
self.status = messages.BECStatus.BUSY
self.dispatcher = BECDispatcher(config=config) if dispatcher is None else dispatcher
self.client = self.dispatcher.client if client is None else client
self.client.start()
self.gui_id = gui_id
# register broadcast callback
self.gui = gui_class(gui_id=self.gui_id)
self.rpc_register = RPCRegister()
self.gui = gui_class(parent=None, name=gui_class_id, gui_id=gui_class_id)
# self.rpc_register.add_rpc(self.gui)
self.rpc_register.add_rpc(self.gui)
self.dispatcher.connect_slot(
self.on_rpc_update, MessageEndpoints.gui_instructions(self.gui_id)
@@ -83,24 +54,22 @@ class BECWidgetsCLIServer:
self.status = messages.BECStatus.RUNNING
logger.success(f"Server started with gui_id: {self.gui_id}")
# Create initial object -> BECFigure or BECDockArea
def on_rpc_update(self, msg: dict, metadata: dict):
request_id = metadata.get("request_id")
logger.debug(f"Received RPC instruction: {msg}, metadata: {metadata}")
with rpc_exception_hook(functools.partial(self.send_response, request_id, False)):
try:
obj = self.get_object_from_config(msg["parameter"])
method = msg["action"]
args = msg["parameter"].get("args", [])
kwargs = msg["parameter"].get("kwargs", {})
res = self.run_rpc(obj, method, args, kwargs)
except Exception as e:
logger.error(f"Error while executing RPC instruction: {e}")
self.send_response(request_id, False, {"error": str(e)})
else:
logger.debug(f"RPC instruction executed successfully: {res}")
self.send_response(request_id, True, {"result": res})
try:
obj = self.get_object_from_config(msg["parameter"])
method = msg["action"]
args = msg["parameter"].get("args", [])
kwargs = msg["parameter"].get("kwargs", {})
res = self.run_rpc(obj, method, args, kwargs)
except Exception as e:
logger.error(f"Error while executing RPC instruction: {e}")
self.send_response(request_id, False, {"error": str(e)})
else:
logger.debug(f"RPC instruction executed successfully: {res}")
self.send_response(request_id, True, {"result": res})
def send_response(self, request_id: str, accepted: bool, msg: dict):
self.client.connector.set_and_publish(
@@ -127,7 +96,11 @@ class BECWidgetsCLIServer:
setattr(obj, method, args[0])
res = None
else:
res = method_obj(*args, **kwargs)
sig = inspect.signature(method_obj)
if sig.parameters:
res = method_obj(*args, **kwargs)
else:
res = method_obj()
if isinstance(res, list):
res = [self.serialize_object(obj) for obj in res]
@@ -141,9 +114,6 @@ class BECWidgetsCLIServer:
if isinstance(obj, BECConnector):
return {
"gui_id": obj.gui_id,
"name": (
obj._name if hasattr(obj, "_name") else obj.__class__.__name__
), # pylint: disable=protected-access
"widget_class": obj.__class__.__name__,
"config": obj.config.model_dump(),
"__rpc__": True,
@@ -152,14 +122,11 @@ class BECWidgetsCLIServer:
def emit_heartbeat(self):
logger.trace(f"Emitting heartbeat for {self.gui_id}")
try:
self.client.connector.set(
MessageEndpoints.gui_heartbeat(self.gui_id),
messages.StatusMessage(name=self.gui_id, status=self.status, info={}),
expire=10,
)
except RedisError as exc:
logger.error(f"Error while emitting heartbeat: {exc}")
self.client.connector.set(
MessageEndpoints.gui_heartbeat(self.gui_id),
messages.StatusMessage(name=self.gui_id, status=self.status, info={}),
expire=10,
)
def shutdown(self): # TODO not sure if needed when cleanup is done at level of BECConnector
logger.info(f"Shutting down server with gui_id: {self.gui_id}")
@@ -188,12 +155,7 @@ class SimpleFileLikeFromLogOutputFunc:
return
def _start_server(
gui_id: str,
gui_class: Union[BECFigure, BECDockArea],
gui_class_id: str = "bec",
config: str | None = None,
):
def _start_server(gui_id: str, gui_class: Union[BECFigure, BECDockArea], config: str | None = None):
if config:
try:
config = json.loads(config)
@@ -210,9 +172,7 @@ def _start_server(
# service_name="BECWidgetsCLIServer",
# service_config=service_config.service_config,
# )
server = BECWidgetsCLIServer(
gui_id=gui_id, config=service_config, gui_class=gui_class, gui_class_id=gui_class_id
)
server = BECWidgetsCLIServer(gui_id=gui_id, config=service_config, gui_class=gui_class)
return server
@@ -222,52 +182,42 @@ def main():
from qtpy.QtCore import QSize
from qtpy.QtGui import QIcon
from qtpy.QtWidgets import QApplication
from qtpy.QtWidgets import QApplication, QMainWindow
import bec_widgets
bec_logger.level = bec_logger.LOGLEVEL.DEBUG
if __name__ != "__main__":
# if not running as main, set the log level to critical
# pylint: disable=protected-access
bec_logger._stderr_log_level = bec_logger.LOGLEVEL.CRITICAL
parser = argparse.ArgumentParser(description="BEC Widgets CLI Server")
parser.add_argument("--id", type=str, default="test", help="The id of the server")
parser.add_argument("--id", type=str, help="The id of the server")
parser.add_argument(
"--gui_class",
type=str,
help="Name of the gui class to be rendered. Possible values: \n- BECFigure\n- BECDockArea",
)
parser.add_argument(
"--gui_class_id",
type=str,
default="bec",
help="The id of the gui class that is added to the QApplication",
)
parser.add_argument("--config", type=str, help="Config file or config string.")
parser.add_argument("--hide", action="store_true", help="Hide on startup")
args = parser.parse_args()
bec_logger.level = bec_logger.LOGLEVEL.INFO
if args.hide:
# pylint: disable=protected-access
bec_logger._stderr_log_level = bec_logger.LOGLEVEL.ERROR
bec_logger._update_sinks()
if args.gui_class == "BECDockArea":
gui_class = BECDockArea
elif args.gui_class == "BECFigure":
if args.gui_class == "BECFigure":
gui_class = BECFigure
elif args.gui_class == "BECDockArea":
gui_class = BECDockArea
else:
print(
"Please specify a valid gui_class to run. Use -h for help."
"\n Starting with default gui_class BECFigure."
)
gui_class = BECDockArea
gui_class = BECFigure
with redirect_stdout(SimpleFileLikeFromLogOutputFunc(logger.info)):
with redirect_stderr(SimpleFileLikeFromLogOutputFunc(logger.error)):
app = QApplication(sys.argv)
# set close on last window, only if not under control of client ;
# indeed, Qt considers a hidden window a closed window, so if all windows
# are hidden by default it exits
app.setQuitOnLastWindowClosed(not args.hide)
app.setApplicationName("BEC Figure")
module_path = os.path.dirname(bec_widgets.__file__)
icon = QIcon()
icon.addFile(
@@ -275,33 +225,22 @@ def main():
size=QSize(48, 48),
)
app.setWindowIcon(icon)
# store gui id within QApplication object, to make it available to all widgets
app.gui_id = args.id
# args.id = "abff6"
server = _start_server(args.id, gui_class, args.gui_class_id, args.config)
win = QMainWindow()
win.setWindowTitle("BEC Widgets")
win = BECMainWindow(gui_id=f"{server.gui_id}:window")
win.setAttribute(Qt.WA_ShowWithoutActivating)
win.setWindowTitle("BEC")
server = _start_server(args.id, gui_class, args.config)
RPCRegister().add_rpc(win)
gui = server.gui
win.setCentralWidget(gui)
if not args.hide:
win.show()
win.resize(800, 600)
win.show()
app.aboutToQuit.connect(server.shutdown)
def sigint_handler(*args):
# display message, for people to let it terminate gracefully
print("Caught SIGINT, exiting")
# first hide all top level windows
# this is to discriminate the cases between "user clicks on [X]"
# (which should be filtered, to not close -see BECDockArea-)
# or "app is asked to close"
for window in app.topLevelWidgets():
window.hide() # so, we know we can exit because it is hidden
app.quit()
signal.signal(signal.SIGINT, sigint_handler)
@@ -310,5 +249,6 @@ def main():
sys.exit(app.exec())
if __name__ == "__main__":
if __name__ == "__main__": # pragma: no cover
sys.argv = ["bec_widgets.cli.server", "--id", "e2860", "--gui_class", "BECDockArea"]
main()

View File

@@ -3,11 +3,12 @@ import os
import numpy as np
import pyqtgraph as pg
from bec_qthemes import material_icon
from qtpy.QtCore import QSize
from qtpy.QtGui import QIcon
from qtpy.QtWidgets import (
QApplication,
QGroupBox,
QHBoxLayout,
QPushButton,
QSplitter,
QTabWidget,
QVBoxLayout,
@@ -15,15 +16,10 @@ from qtpy.QtWidgets import (
)
from bec_widgets.utils import BECDispatcher
from bec_widgets.utils.widget_io import WidgetHierarchy as wh
from bec_widgets.widgets.containers.dock import BECDockArea
from bec_widgets.widgets.containers.figure import BECFigure
from bec_widgets.widgets.containers.layout_manager.layout_manager import LayoutManagerWidget
from bec_widgets.widgets.editors.jupyter_console.jupyter_console import BECJupyterConsole
from bec_widgets.widgets.plots_next_gen.image.image import Image
from bec_widgets.widgets.plots_next_gen.plot_base import PlotBase
from bec_widgets.widgets.plots_next_gen.scatter_waveform.scatter_waveform import ScatterWaveform
from bec_widgets.widgets.plots_next_gen.waveform.waveform import Waveform
from bec_widgets.utils.colors import apply_theme
from bec_widgets.widgets.dock.dock_area import BECDockArea
from bec_widgets.widgets.figure import BECFigure
from bec_widgets.widgets.jupyter_console.jupyter_console import BECJupyterConsole
class JupyterConsoleWindow(QWidget): # pragma: no cover:
@@ -40,7 +36,6 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
{
"np": np,
"pg": pg,
"wh": wh,
"fig": self.figure,
"dock": self.dock,
"w1": self.w1,
@@ -54,22 +49,13 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
"w9": self.w9,
"w10": self.w10,
"d0": self.d0,
"d1": self.d1,
"d2": self.d2,
"wave": self.wf,
# "bar": self.bar,
# "cm": self.colormap,
"im": self.im,
"mi": self.mi,
"mm": self.mm,
"mw": self.mw,
"lm": self.lm,
"btn1": self.btn1,
"btn2": self.btn2,
"btn3": self.btn3,
"btn4": self.btn4,
"btn5": self.btn5,
"btn6": self.btn6,
"pb": self.pb,
"pi": self.pi,
"wf": self.wf,
"scatter": self.scatter,
"scatter_mi": self.scatter,
}
)
@@ -94,61 +80,11 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
second_tab_layout.addWidget(self.figure)
tab_widget.addTab(second_tab, "BEC Figure")
third_tab = QWidget()
third_tab_layout = QVBoxLayout(third_tab)
self.lm = LayoutManagerWidget()
third_tab_layout.addWidget(self.lm)
tab_widget.addTab(third_tab, "Layout Manager Widget")
fourth_tab = QWidget()
fourth_tab_layout = QVBoxLayout(fourth_tab)
self.pb = PlotBase()
self.pi = self.pb.plot_item
fourth_tab_layout.addWidget(self.pb)
tab_widget.addTab(fourth_tab, "PlotBase")
tab_widget.setCurrentIndex(3)
group_box = QGroupBox("Jupyter Console", splitter)
group_box_layout = QVBoxLayout(group_box)
self.console = BECJupyterConsole(inprocess=True)
group_box_layout.addWidget(self.console)
# Some buttons for layout testing
self.btn1 = QPushButton("Button 1")
self.btn2 = QPushButton("Button 2")
self.btn3 = QPushButton("Button 3")
self.btn4 = QPushButton("Button 4")
self.btn5 = QPushButton("Button 5")
self.btn6 = QPushButton("Button 6")
fifth_tab = QWidget()
fifth_tab_layout = QVBoxLayout(fifth_tab)
self.wf = Waveform()
fifth_tab_layout.addWidget(self.wf)
tab_widget.addTab(fifth_tab, "Waveform Next Gen")
tab_widget.setCurrentIndex(4)
sixth_tab = QWidget()
sixth_tab_layout = QVBoxLayout(sixth_tab)
self.im = Image()
self.mi = self.im.main_image
sixth_tab_layout.addWidget(self.im)
tab_widget.addTab(sixth_tab, "Image Next Gen")
tab_widget.setCurrentIndex(5)
seventh_tab = QWidget()
seventh_tab_layout = QVBoxLayout(seventh_tab)
self.scatter = ScatterWaveform()
self.scatter_mi = self.scatter.main_curve
self.scatter.plot("samx", "samy", "bpm4i")
seventh_tab_layout.addWidget(self.scatter)
tab_widget.addTab(seventh_tab, "Scatter Waveform")
tab_widget.setCurrentIndex(6)
# add stuff to the new Waveform widget
self._init_waveform()
# add stuff to figure
self._init_figure()
@@ -157,15 +93,16 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
self.setWindowTitle("Jupyter Console Window")
def _init_waveform(self):
# self.wfng._add_curve_custom(x=np.arange(10), y=np.random.rand(10), label="curve1")
# self.wfng._add_curve_custom(x=np.arange(10), y=np.random.rand(10), label="curve2")
# self.wfng._add_curve_custom(x=np.arange(10), y=np.random.rand(10), label="curve3")
self.wf.plot(y_name="bpm4i", y_entry="bpm4i", dap="GaussianModel")
self.wf.plot(y_name="bpm3a", y_entry="bpm3a", dap="GaussianModel")
def _init_figure(self):
self.w1 = self.figure.plot(x_name="samx", y_name="bpm4i", row=0, col=0)
self.w1 = self.figure.plot(
x_name="samx",
y_name="bpm4i",
# title="Standard Plot with sync device, custom labels - w1",
# x_label="Motor Position",
# y_label="Intensity (A.U.)",
row=0,
col=0,
)
self.w1.set(
title="Standard Plot with sync device, custom labels - w1",
x_label="Motor Position",
@@ -221,11 +158,24 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
def _init_dock(self):
self.d0 = self.dock.new(name="dock_0")
self.mm = self.d0.new("BECMotorMapWidget")
self.d0 = self.dock.add_dock(name="dock_0")
self.mm = self.d0.add_widget("BECMotorMapWidget")
self.mm.change_motors("samx", "samy")
self.mw = None # self.wf.multi_waveform(monitor="waveform") # , config=config)
self.d1 = self.dock.add_dock(name="dock_1", position="right")
self.im = self.d1.add_widget("BECImageWidget")
self.im.image("waveform", "1d")
self.d2 = self.dock.add_dock(name="dock_2", position="bottom")
self.wf = self.d2.add_widget("BECWaveformWidget", row=0, col=0)
self.wf.plot(x_name="samx", y_name="bpm3a")
self.wf.plot(x_name="samx", y_name="bpm4i", dap="GaussianModel")
# self.bar = self.d2.add_widget("RingProgressBar", row=0, col=1)
# self.bar.set_diameter(200)
# self.d3 = self.dock.add_dock(name="dock_3", position="bottom")
# self.colormap = pg.GradientWidget()
# self.d3.add_widget(self.colormap, row=0, col=0)
self.dock.save_state()
@@ -250,7 +200,8 @@ if __name__ == "__main__": # pragma: no cover
app = QApplication(sys.argv)
app.setApplicationName("Jupyter Console")
app.setApplicationDisplayName("Jupyter Console")
icon = material_icon("terminal", color=(255, 255, 255, 255), filled=True)
apply_theme("dark")
icon = material_icon("terminal", color="#434343", filled=True)
app.setWindowIcon(icon)
bec_dispatcher = BECDispatcher()
@@ -259,7 +210,6 @@ if __name__ == "__main__": # pragma: no cover
win = JupyterConsoleWindow()
win.show()
win.resize(1500, 800)
app.aboutToQuit.connect(win.close)
sys.exit(app.exec_())

View File

@@ -1,380 +0,0 @@
from __future__ import annotations
import sys
from typing import Literal
import pyqtgraph as pg
from qtpy.QtCore import Property, QEasingCurve, QObject, QPropertyAnimation
from qtpy.QtWidgets import (
QApplication,
QHBoxLayout,
QMainWindow,
QPushButton,
QSizePolicy,
QVBoxLayout,
QWidget,
)
from typeguard import typechecked
from bec_widgets.widgets.containers.layout_manager.layout_manager import LayoutManagerWidget
class DimensionAnimator(QObject):
"""
Helper class to animate the size of a panel widget.
"""
def __init__(self, panel_widget: QWidget, direction: str):
super().__init__()
self.panel_widget = panel_widget
self.direction = direction
self._size = 0
@Property(int)
def panel_width(self):
"""
Returns the current width of the panel widget.
"""
return self._size
@panel_width.setter
def panel_width(self, val: int):
"""
Set the width of the panel widget.
Args:
val(int): The width to set.
"""
self._size = val
self.panel_widget.setFixedWidth(val)
@Property(int)
def panel_height(self):
"""
Returns the current height of the panel widget.
"""
return self._size
@panel_height.setter
def panel_height(self, val: int):
"""
Set the height of the panel widget.
Args:
val(int): The height to set.
"""
self._size = val
self.panel_widget.setFixedHeight(val)
class CollapsiblePanelManager(QObject):
"""
Manager class to handle collapsible panels from a main widget using LayoutManagerWidget.
"""
def __init__(self, layout_manager: LayoutManagerWidget, reference_widget: QWidget, parent=None):
super().__init__(parent)
self.layout_manager = layout_manager
self.reference_widget = reference_widget
self.animations = {}
self.panels = {}
self.direction_settings = {
"left": {"property": b"maximumWidth", "default_size": 200},
"right": {"property": b"maximumWidth", "default_size": 200},
"top": {"property": b"maximumHeight", "default_size": 150},
"bottom": {"property": b"maximumHeight", "default_size": 150},
}
def add_panel(
self,
direction: Literal["left", "right", "top", "bottom"],
panel_widget: QWidget,
target_size: int | None = None,
duration: int = 300,
):
"""
Add a panel widget to the layout manager.
Args:
direction(Literal["left", "right", "top", "bottom"]): Direction of the panel.
panel_widget(QWidget): The panel widget to add.
target_size(int, optional): The target size of the panel. Defaults to None.
duration(int): The duration of the animation in milliseconds. Defaults to 300.
"""
if direction not in self.direction_settings:
raise ValueError("Direction must be one of 'left', 'right', 'top', 'bottom'.")
if target_size is None:
target_size = self.direction_settings[direction]["default_size"]
self.layout_manager.add_widget_relative(
widget=panel_widget, reference_widget=self.reference_widget, position=direction
)
panel_widget.setVisible(False)
# Set initial constraints as flexible
if direction in ["left", "right"]:
panel_widget.setMaximumWidth(0)
panel_widget.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
else:
panel_widget.setMaximumHeight(0)
panel_widget.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
self.panels[direction] = {
"widget": panel_widget,
"direction": direction,
"target_size": target_size,
"duration": duration,
"animator": None,
}
def toggle_panel(
self,
direction: Literal["left", "right", "top", "bottom"],
target_size: int | None = None,
duration: int | None = None,
easing_curve: QEasingCurve = QEasingCurve.InOutQuad,
ensure_max: bool = False,
scale: float | None = None,
animation: bool = True,
):
"""
Toggle the specified panel.
Parameters:
direction (Literal["left", "right", "top", "bottom"]): Direction of the panel to toggle.
target_size (int, optional): Override target size for this toggle.
duration (int, optional): Override the animation duration.
easing_curve (QEasingCurve): Animation easing curve.
ensure_max (bool): If True, animate as a fixed-size panel.
scale (float, optional): If provided, calculate target_size from main widget size.
animation (bool): If False, no animation is performed; panel instantly toggles.
"""
if direction not in self.panels:
raise ValueError(f"No panel found in direction '{direction}'.")
panel_info = self.panels[direction]
panel_widget = panel_info["widget"]
dir_settings = self.direction_settings[direction]
# Determine final target size
if scale is not None:
main_rect = self.reference_widget.geometry()
if direction in ["left", "right"]:
computed_target = int(main_rect.width() * scale)
else:
computed_target = int(main_rect.height() * scale)
final_target_size = computed_target
else:
if target_size is None:
final_target_size = panel_info["target_size"]
else:
final_target_size = target_size
if duration is None:
duration = panel_info["duration"]
expanding_property = dir_settings["property"]
currently_visible = panel_widget.isVisible()
if ensure_max:
if panel_info["animator"] is None:
panel_info["animator"] = DimensionAnimator(panel_widget, direction)
animator = panel_info["animator"]
if direction in ["left", "right"]:
prop_name = b"panel_width"
else:
prop_name = b"panel_height"
else:
animator = None
prop_name = expanding_property
if currently_visible:
# Hide the panel
if ensure_max:
start_value = final_target_size
end_value = 0
finish_callback = lambda w=panel_widget, d=direction: self._after_hide_reset(w, d)
else:
start_value = (
panel_widget.width()
if direction in ["left", "right"]
else panel_widget.height()
)
end_value = 0
finish_callback = lambda w=panel_widget: w.setVisible(False)
else:
# Show the panel
start_value = 0
end_value = final_target_size
finish_callback = None
if ensure_max:
# Fix panel exactly
if direction in ["left", "right"]:
panel_widget.setMinimumWidth(0)
panel_widget.setMaximumWidth(final_target_size)
panel_widget.setSizePolicy(QSizePolicy.Fixed, QSizePolicy.Expanding)
else:
panel_widget.setMinimumHeight(0)
panel_widget.setMaximumHeight(final_target_size)
panel_widget.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Fixed)
else:
# Flexible mode
if direction in ["left", "right"]:
panel_widget.setMinimumWidth(0)
panel_widget.setMaximumWidth(final_target_size)
panel_widget.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
else:
panel_widget.setMinimumHeight(0)
panel_widget.setMaximumHeight(final_target_size)
panel_widget.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
panel_widget.setVisible(True)
if not animation:
# No animation: instantly set final state
if end_value == 0:
# Hiding
if ensure_max:
# Reset after hide
self._after_hide_reset(panel_widget, direction)
else:
panel_widget.setVisible(False)
else:
# Showing
if ensure_max:
# Already set fixed size
if direction in ["left", "right"]:
panel_widget.setFixedWidth(end_value)
else:
panel_widget.setFixedHeight(end_value)
else:
# Just set maximum dimension
if direction in ["left", "right"]:
panel_widget.setMaximumWidth(end_value)
else:
panel_widget.setMaximumHeight(end_value)
return
# With animation
animation = QPropertyAnimation(animator if ensure_max else panel_widget, prop_name)
animation.setDuration(duration)
animation.setStartValue(start_value)
animation.setEndValue(end_value)
animation.setEasingCurve(easing_curve)
if end_value == 0 and finish_callback:
animation.finished.connect(finish_callback)
elif end_value == 0 and not finish_callback:
animation.finished.connect(lambda w=panel_widget: w.setVisible(False))
animation.start()
self.animations[panel_widget] = animation
@typechecked
def _after_hide_reset(
self, panel_widget: QWidget, direction: Literal["left", "right", "top", "bottom"]
):
"""
Reset the panel widget after hiding it in ensure_max mode.
Args:
panel_widget(QWidget): The panel widget to reset.
direction(Literal["left", "right", "top", "bottom"]): The direction of the panel.
"""
# Called after hiding a panel in ensure_max mode
panel_widget.setVisible(False)
if direction in ["left", "right"]:
panel_widget.setMinimumWidth(0)
panel_widget.setMaximumWidth(0)
panel_widget.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
else:
panel_widget.setMinimumHeight(0)
panel_widget.setMaximumHeight(16777215)
panel_widget.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
####################################################################################################
# The following code is for the GUI control panel to interact with the CollapsiblePanelManager.
# It is not covered by any tests as it serves only as an example for the CollapsiblePanelManager class.
####################################################################################################
class MainWindow(QMainWindow): # pragma: no cover
def __init__(self):
super().__init__()
self.setWindowTitle("Panels with ensure_max, scale, and animation toggle")
self.resize(800, 600)
central_widget = QWidget()
self.setCentralWidget(central_widget)
main_layout = QVBoxLayout(central_widget)
main_layout.setContentsMargins(10, 10, 10, 10)
main_layout.setSpacing(10)
# Buttons
buttons_layout = QHBoxLayout()
self.btn_left = QPushButton("Toggle Left (ensure_max=True)")
self.btn_top = QPushButton("Toggle Top (scale=0.5, no animation)")
self.btn_right = QPushButton("Toggle Right (ensure_max=True, scale=0.3)")
self.btn_bottom = QPushButton("Toggle Bottom (no animation)")
buttons_layout.addWidget(self.btn_left)
buttons_layout.addWidget(self.btn_top)
buttons_layout.addWidget(self.btn_right)
buttons_layout.addWidget(self.btn_bottom)
main_layout.addLayout(buttons_layout)
self.layout_manager = LayoutManagerWidget()
main_layout.addWidget(self.layout_manager)
# Main widget
self.main_plot = pg.PlotWidget()
self.main_plot.plot([1, 2, 3, 4], [4, 3, 2, 1])
self.layout_manager.add_widget(self.main_plot, 0, 0)
self.panel_manager = CollapsiblePanelManager(self.layout_manager, self.main_plot)
# Panels
self.left_panel = pg.PlotWidget()
self.left_panel.plot([1, 2, 3], [3, 2, 1])
self.panel_manager.add_panel("left", self.left_panel, target_size=200)
self.right_panel = pg.PlotWidget()
self.right_panel.plot([10, 20, 30], [1, 10, 1])
self.panel_manager.add_panel("right", self.right_panel, target_size=200)
self.top_panel = pg.PlotWidget()
self.top_panel.plot([1, 2, 3], [1, 2, 3])
self.panel_manager.add_panel("top", self.top_panel, target_size=150)
self.bottom_panel = pg.PlotWidget()
self.bottom_panel.plot([2, 4, 6], [10, 5, 10])
self.panel_manager.add_panel("bottom", self.bottom_panel, target_size=150)
# Connect buttons
# Left with ensure_max
self.btn_left.clicked.connect(
lambda: self.panel_manager.toggle_panel("left", ensure_max=True)
)
# Top with scale=0.5 and no animation
self.btn_top.clicked.connect(
lambda: self.panel_manager.toggle_panel("top", scale=0.5, animation=False)
)
# Right with ensure_max, scale=0.3
self.btn_right.clicked.connect(
lambda: self.panel_manager.toggle_panel("right", ensure_max=True, scale=0.3)
)
# Bottom no animation
self.btn_bottom.clicked.connect(
lambda: self.panel_manager.toggle_panel("bottom", target_size=100, animation=False)
)
if __name__ == "__main__": # pragma: no cover
app = QApplication(sys.argv)
w = MainWindow()
w.show()
sys.exit(app.exec())

View File

@@ -2,93 +2,9 @@ import functools
import sys
import traceback
from bec_lib.logger import bec_logger
from qtpy.QtCore import Property, QObject, Qt, Signal, Slot
from qtpy.QtCore import QObject, Qt, Signal, Slot
from qtpy.QtWidgets import QApplication, QMessageBox, QPushButton, QVBoxLayout, QWidget
logger = bec_logger.logger
def SafeProperty(prop_type, *prop_args, popup_error: bool = False, default=None, **prop_kwargs):
"""
Decorator to create a Qt Property with safe getter and setter so that
Qt Designer won't crash if an exception occurs in either method.
Args:
prop_type: The property type (e.g., str, bool, int, custom classes, etc.)
popup_error (bool): If True, show a popup for any error; otherwise, ignore or log silently.
default: Any default/fallback value to return if the getter raises an exception.
*prop_args, **prop_kwargs: Passed along to the underlying Qt Property constructor.
Usage:
@SafeProperty(int, default=-1)
def some_value(self) -> int:
# your getter logic
return ... # if an exception is raised, returns -1
@some_value.setter
def some_value(self, val: int):
# your setter logic
...
"""
def decorator(py_getter):
"""Decorator for the user's property getter function."""
@functools.wraps(py_getter)
def safe_getter(self_):
try:
return py_getter(self_)
except Exception:
# Identify which property function triggered error
prop_name = f"{py_getter.__module__}.{py_getter.__qualname__}"
error_msg = traceback.format_exc()
if popup_error:
ErrorPopupUtility().custom_exception_hook(*sys.exc_info(), popup_error=True)
logger.error(f"SafeProperty error in GETTER of '{prop_name}':\n{error_msg}")
return default
class PropertyWrapper:
"""
Intermediate wrapper used so that the user can optionally chain .setter(...).
"""
def __init__(self, getter_func):
# We store only our safe_getter in the wrapper
self.getter_func = safe_getter
def setter(self, setter_func):
"""Wraps the user-defined setter to handle errors safely."""
@functools.wraps(setter_func)
def safe_setter(self_, value):
try:
return setter_func(self_, value)
except Exception:
prop_name = f"{setter_func.__module__}.{setter_func.__qualname__}"
error_msg = traceback.format_exc()
if popup_error:
ErrorPopupUtility().custom_exception_hook(
*sys.exc_info(), popup_error=True
)
logger.error(f"SafeProperty error in SETTER of '{prop_name}':\n{error_msg}")
return
# Return the full read/write Property
return Property(prop_type, self.getter_func, safe_setter, *prop_args, **prop_kwargs)
def __call__(self):
"""
If user never calls `.setter(...)`, produce a read-only property.
"""
return Property(prop_type, self.getter_func, None, *prop_args, **prop_kwargs)
return PropertyWrapper(py_getter)
return decorator
def SafeSlot(*slot_args, **slot_kwargs): # pylint: disable=invalid-name
"""Function with args, acting like a decorator, applying "error_managed" decorator + Qt Slot
@@ -106,13 +22,7 @@ def SafeSlot(*slot_args, **slot_kwargs): # pylint: disable=invalid-name
try:
return method(*args, **kwargs)
except Exception:
slot_name = f"{method.__module__}.{method.__qualname__}"
error_msg = traceback.format_exc()
if popup_error:
ErrorPopupUtility().custom_exception_hook(
*sys.exc_info(), popup_error=popup_error
)
logger.error(f"SafeSlot error in slot '{slot_name}':\n{error_msg}")
ErrorPopupUtility().custom_exception_hook(*sys.exc_info(), popup_error=popup_error)
return wrapper
@@ -181,12 +91,6 @@ class _ErrorPopupUtility(QObject):
msg.setMinimumHeight(400)
msg.exec_()
def show_property_error(self, title, message, widget):
"""
Show a property-specific error message.
"""
self.error_occurred.emit(title, message, widget)
def format_traceback(self, traceback_message: str) -> str:
"""
Format the traceback message to be displayed in the error popup by adding indentation to each line.
@@ -223,14 +127,12 @@ class _ErrorPopupUtility(QObject):
error_message = " ".join(captured_message)
return error_message
def get_error_message(self, exctype, value, tb):
return "".join(traceback.format_exception(exctype, value, tb))
def custom_exception_hook(self, exctype, value, tb, popup_error=False):
if popup_error or self.enable_error_popup:
error_message = traceback.format_exception(exctype, value, tb)
self.error_occurred.emit(
"Method error" if popup_error else "Application Error",
self.get_error_message(exctype, value, tb),
"".join(error_message),
self.parent(),
)
else:

View File

@@ -1,72 +0,0 @@
from __future__ import annotations
from bec_qthemes import material_icon
from qtpy.QtWidgets import (
QFrame,
QHBoxLayout,
QLabel,
QLayout,
QSizePolicy,
QToolButton,
QVBoxLayout,
QWidget,
)
from bec_widgets.qt_utils.error_popups import SafeProperty, SafeSlot
class ExpandableGroupFrame(QFrame):
EXPANDED_ICON_NAME: str = "collapse_all"
COLLAPSED_ICON_NAME: str = "expand_all"
def __init__(self, title: str, parent: QWidget | None = None, expanded: bool = True) -> None:
super().__init__(parent=parent)
self._expanded = expanded
self.setFrameStyle(QFrame.Shape.StyledPanel | QFrame.Shadow.Plain)
self.setSizePolicy(QSizePolicy.Policy.Minimum, QSizePolicy.Policy.Minimum)
self._layout = QVBoxLayout()
self._layout.setContentsMargins(0, 0, 0, 0)
self.setLayout(self._layout)
self._title_layout = QHBoxLayout()
self._layout.addLayout(self._title_layout)
self._expansion_button = QToolButton()
self._update_icon()
self._title = QLabel(f"<b>{title}</b>")
self._title_layout.addWidget(self._expansion_button)
self._title_layout.addWidget(self._title)
self._contents = QWidget()
self._layout.addWidget(self._contents)
self._expansion_button.clicked.connect(self.switch_expanded_state)
self.expanded = self._expanded # type: ignore
def set_layout(self, layout: QLayout) -> None:
self._contents.setLayout(layout)
self._contents.layout().setContentsMargins(0, 0, 0, 0) # type: ignore
@SafeSlot()
def switch_expanded_state(self):
self.expanded = not self.expanded # type: ignore
self._update_icon()
@SafeProperty(bool)
def expanded(self): # type: ignore
return self._expanded
@expanded.setter
def expanded(self, expanded: bool):
self._expanded = expanded
self._contents.setVisible(expanded)
self.updateGeometry()
def _update_icon(self):
self._expansion_button.setIcon(
material_icon(icon_name=self.EXPANDED_ICON_NAME, size=(10, 10), convert_to_pixmap=False)
if self.expanded
else material_icon(
icon_name=self.COLLAPSED_ICON_NAME, size=(10, 10), convert_to_pixmap=False
)
)

View File

@@ -13,7 +13,7 @@ from qtpy.QtWidgets import (
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.colors import get_accent_colors, get_theme_palette
from bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button import DarkModeButton
from bec_widgets.widgets.dark_mode_button.dark_mode_button import DarkModeButton
class PaletteViewer(BECWidget, QWidget):

View File

@@ -1,157 +0,0 @@
import pyqtgraph as pg
from qtpy.QtCore import Property
from qtpy.QtWidgets import QApplication, QFrame, QHBoxLayout, QVBoxLayout, QWidget
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button import DarkModeButton
class RoundedFrame(BECWidget, QFrame):
"""
A custom QFrame with rounded corners and optional theme updates.
The frame can contain any QWidget, however it is mainly designed to wrap PlotWidgets to provide a consistent look and feel with other BEC Widgets.
"""
def __init__(
self,
parent=None,
content_widget: QWidget = None,
background_color: str = None,
theme_update: bool = True,
radius: int = 10,
**kwargs,
):
super().__init__(**kwargs)
QFrame.__init__(self, parent)
self.background_color = background_color
self.theme_update = theme_update if background_color is None else False
self._radius = radius
# Apply rounded frame styling
self.setProperty("skip_settings", True)
self.setObjectName("roundedFrame")
# Create a layout for the frame
self.layout = QHBoxLayout(self)
self.layout.setContentsMargins(5, 5, 5, 5) # Set 5px margin
# Add the content widget to the layout
if content_widget:
self.layout.addWidget(content_widget)
# Store reference to the content widget
self.content_widget = content_widget
# Automatically apply initial styles to the GraphicalLayoutWidget if applicable
self.apply_plot_widget_style()
self._connect_to_theme_change()
def apply_theme(self, theme: str):
"""
Apply the theme to the frame and its content if theme updates are enabled.
"""
if not self.theme_update:
return
# Update background color based on the theme
if theme == "light":
self.background_color = "#e9ecef" # Subtle contrast for light mode
else:
self.background_color = "#141414" # Dark mode
self.update_style()
@Property(int)
def radius(self):
"""Radius of the rounded corners."""
return self._radius
@radius.setter
def radius(self, value: int):
self._radius = value
self.update_style()
def update_style(self):
"""
Update the style of the frame based on the background color.
"""
if self.background_color:
self.setStyleSheet(
f"""
QFrame#roundedFrame {{
background-color: {self.background_color};
border-radius: {self._radius}; /* Rounded corners */
}}
"""
)
self.apply_plot_widget_style()
def apply_plot_widget_style(self, border: str = "none"):
"""
Automatically apply background, border, and axis styles to the PlotWidget.
Args:
border (str): Border style (e.g., 'none', '1px solid red').
"""
if isinstance(self.content_widget, pg.GraphicsLayoutWidget):
# Apply border style via stylesheet
self.content_widget.setStyleSheet(
f"""
GraphicsLayoutWidget {{
border: {border}; /* Explicitly set the border */
}}
"""
)
self.content_widget.setBackground(self.background_color)
class ExampleApp(QWidget): # pragma: no cover
def __init__(self):
super().__init__()
self.setWindowTitle("Rounded Plots Example")
# Main layout
layout = QVBoxLayout(self)
dark_button = DarkModeButton()
# Create PlotWidgets
plot1 = pg.GraphicsLayoutWidget()
plot_item_1 = pg.PlotItem()
plot_item_1.plot([1, 3, 2, 4, 6, 5], pen="r")
plot1.plot_item = plot_item_1
plot2 = pg.GraphicsLayoutWidget()
plot_item_2 = pg.PlotItem()
plot_item_2.plot([1, 2, 4, 8, 16, 32], pen="r")
plot2.plot_item = plot_item_2
# Wrap PlotWidgets in RoundedFrame
rounded_plot1 = RoundedFrame(content_widget=plot1, theme_update=True)
rounded_plot2 = RoundedFrame(content_widget=plot2, theme_update=True)
# Add to layout
layout.addWidget(dark_button)
layout.addWidget(rounded_plot1)
layout.addWidget(rounded_plot2)
self.setLayout(layout)
from qtpy.QtCore import QTimer
def change_theme():
rounded_plot1.apply_theme("light")
rounded_plot2.apply_theme("dark")
QTimer.singleShot(100, change_theme)
if __name__ == "__main__": # pragma: no cover
app = QApplication([])
window = ExampleApp()
window.show()
app.exec()

View File

@@ -1,6 +1,6 @@
from qtpy.QtWidgets import QDialog, QDialogButtonBox, QHBoxLayout, QPushButton, QVBoxLayout, QWidget
from bec_widgets.qt_utils.error_popups import SafeSlot
from bec_widgets.qt_utils.error_popups import SafeSlot as Slot
class SettingWidget(QWidget):
@@ -20,14 +20,14 @@ class SettingWidget(QWidget):
def set_target_widget(self, target_widget: QWidget):
self.target_widget = target_widget
@SafeSlot()
@Slot()
def accept_changes(self):
"""
Accepts the changes made in the settings widget and applies them to the target widget.
"""
pass
@SafeSlot(dict)
@Slot(dict)
def display_current_settings(self, config_dict: dict):
"""
Displays the current settings of the target widget in the settings widget.
@@ -54,13 +54,12 @@ class SettingsDialog(QDialog):
settings_widget: SettingWidget = None,
window_title: str = "Settings",
config: dict = None,
modal: bool = False,
*args,
**kwargs,
):
super().__init__(parent, *args, **kwargs)
self.setModal(modal)
self.setModal(False)
self.setWindowTitle(window_title)
@@ -93,7 +92,7 @@ class SettingsDialog(QDialog):
ok_button.setDefault(True)
ok_button.setAutoDefault(True)
@SafeSlot()
@Slot()
def accept(self):
"""
Accept the changes made in the settings widget and close the dialog.
@@ -101,7 +100,7 @@ class SettingsDialog(QDialog):
self.widget.accept_changes()
super().accept()
@SafeSlot()
@Slot()
def apply_changes(self):
"""
Apply the changes made in the settings widget without closing the dialog.

View File

@@ -1,369 +0,0 @@
import sys
from typing import Literal, Optional
from qtpy.QtCore import Property, QEasingCurve, QPropertyAnimation
from qtpy.QtGui import QAction
from qtpy.QtWidgets import (
QApplication,
QFrame,
QHBoxLayout,
QLabel,
QMainWindow,
QScrollArea,
QSizePolicy,
QStackedWidget,
QVBoxLayout,
QWidget,
)
from bec_widgets.qt_utils.toolbar import MaterialIconAction, ModularToolBar
class SidePanel(QWidget):
"""
Side panel widget that can be placed on the left, right, top, or bottom of the main widget.
"""
def __init__(
self,
parent=None,
orientation: Literal["left", "right", "top", "bottom"] = "left",
panel_max_width: int = 200,
animation_duration: int = 200,
animations_enabled: bool = True,
):
super().__init__(parent=parent)
self.setProperty("skip_settings", True)
self.setObjectName("SidePanel")
self._orientation = orientation
self._panel_max_width = panel_max_width
self._animation_duration = animation_duration
self._animations_enabled = animations_enabled
self._panel_width = 0
self._panel_height = 0
self.panel_visible = False
self.current_action: Optional[QAction] = None
self.current_index: Optional[int] = None
self.switching_actions = False
self._init_ui()
def _init_ui(self):
"""
Initialize the UI elements.
"""
if self._orientation in ("left", "right"):
self.main_layout = QHBoxLayout(self)
self.main_layout.setContentsMargins(0, 0, 0, 0)
self.main_layout.setSpacing(0)
self.toolbar = ModularToolBar(target_widget=self, orientation="vertical")
self.container = QWidget()
self.container.layout = QVBoxLayout(self.container)
self.container.layout.setContentsMargins(0, 0, 0, 0)
self.container.layout.setSpacing(0)
self.stack_widget = QStackedWidget()
self.stack_widget.setSizePolicy(QSizePolicy.Fixed, QSizePolicy.Expanding)
self.stack_widget.setMinimumWidth(5)
self.stack_widget.setMaximumWidth(self._panel_max_width)
if self._orientation == "left":
self.main_layout.addWidget(self.toolbar)
self.main_layout.addWidget(self.container)
else:
self.main_layout.addWidget(self.container)
self.main_layout.addWidget(self.toolbar)
self.container.layout.addWidget(self.stack_widget)
self.menu_anim = QPropertyAnimation(self, b"panel_width")
self.setSizePolicy(QSizePolicy.Fixed, QSizePolicy.Expanding)
self.panel_width = 0 # start hidden
else:
self.main_layout = QVBoxLayout(self)
self.main_layout.setContentsMargins(0, 0, 0, 0)
self.main_layout.setSpacing(0)
self.toolbar = ModularToolBar(target_widget=self, orientation="horizontal")
self.container = QWidget()
self.container.layout = QVBoxLayout(self.container)
self.container.layout.setContentsMargins(0, 0, 0, 0)
self.container.layout.setSpacing(0)
self.stack_widget = QStackedWidget()
self.stack_widget.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Fixed)
self.stack_widget.setMinimumHeight(5)
self.stack_widget.setMaximumHeight(self._panel_max_width)
if self._orientation == "top":
self.main_layout.addWidget(self.toolbar)
self.main_layout.addWidget(self.container)
else:
self.main_layout.addWidget(self.container)
self.main_layout.addWidget(self.toolbar)
self.container.layout.addWidget(self.stack_widget)
self.menu_anim = QPropertyAnimation(self, b"panel_height")
self.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Fixed)
self.panel_height = 0 # start hidden
self.menu_anim.setDuration(self._animation_duration)
self.menu_anim.setEasingCurve(QEasingCurve.InOutQuad)
@Property(int)
def panel_width(self):
"""Get the panel width."""
return self._panel_width
@panel_width.setter
def panel_width(self, width: int):
"""Set the panel width."""
self._panel_width = width
if self._orientation in ("left", "right"):
self.stack_widget.setFixedWidth(width)
@Property(int)
def panel_height(self):
"""Get the panel height."""
return self._panel_height
@panel_height.setter
def panel_height(self, height: int):
"""Set the panel height."""
self._panel_height = height
if self._orientation in ("top", "bottom"):
self.stack_widget.setFixedHeight(height)
@Property(int)
def panel_max_width(self):
"""Get the maximum width of the panel."""
return self._panel_max_width
@panel_max_width.setter
def panel_max_width(self, size: int):
"""Set the maximum width of the panel."""
self._panel_max_width = size
if self._orientation in ("left", "right"):
self.stack_widget.setMaximumWidth(self._panel_max_width)
else:
self.stack_widget.setMaximumHeight(self._panel_max_width)
@Property(int)
def animation_duration(self):
"""Get the duration of the animation."""
return self._animation_duration
@animation_duration.setter
def animation_duration(self, duration: int):
"""Set the duration of the animation."""
self._animation_duration = duration
self.menu_anim.setDuration(duration)
@Property(bool)
def animations_enabled(self):
"""Get the status of the animations."""
return self._animations_enabled
@animations_enabled.setter
def animations_enabled(self, enabled: bool):
"""Set the status of the animations."""
self._animations_enabled = enabled
def show_panel(self, idx: int):
"""
Show the side panel with animation and switch to idx.
"""
self.stack_widget.setCurrentIndex(idx)
self.panel_visible = True
self.current_index = idx
if self._orientation in ("left", "right"):
start_val, end_val = 0, self._panel_max_width
else:
start_val, end_val = 0, self._panel_max_width
if self._animations_enabled:
self.menu_anim.stop()
self.menu_anim.setStartValue(start_val)
self.menu_anim.setEndValue(end_val)
self.menu_anim.start()
else:
if self._orientation in ("left", "right"):
self.panel_width = end_val
else:
self.panel_height = end_val
def hide_panel(self):
"""
Hide the side panel with animation.
"""
self.panel_visible = False
self.current_index = None
if self._orientation in ("left", "right"):
start_val, end_val = self._panel_max_width, 0
else:
start_val, end_val = self._panel_max_width, 0
if self._animations_enabled:
self.menu_anim.stop()
self.menu_anim.setStartValue(start_val)
self.menu_anim.setEndValue(end_val)
self.menu_anim.start()
else:
if self._orientation in ("left", "right"):
self.panel_width = end_val
else:
self.panel_height = end_val
def switch_to(self, idx: int):
"""
Switch to the specified index without animation.
"""
if self.current_index != idx:
self.stack_widget.setCurrentIndex(idx)
self.current_index = idx
def add_menu(self, action_id: str, icon_name: str, tooltip: str, widget: QWidget, title: str):
"""
Add a menu to the side panel.
Args:
action_id(str): The ID of the action.
icon_name(str): The name of the icon.
tooltip(str): The tooltip for the action.
widget(QWidget): The widget to add to the panel.
title(str): The title of the panel.
"""
# container_widget: top-level container for the stacked page
container_widget = QWidget()
container_layout = QVBoxLayout(container_widget)
container_layout.setContentsMargins(0, 0, 0, 0)
container_layout.setSpacing(5)
title_label = QLabel(f"<b>{title}</b>")
title_label.setStyleSheet("font-size: 16px;")
container_layout.addWidget(title_label)
# Create a QScrollArea for the actual widget to ensure scrolling if the widget inside is too large
scroll_area = QScrollArea()
scroll_area.setFrameShape(QFrame.NoFrame)
scroll_area.setWidgetResizable(True)
# Let the scroll area expand in both directions if there's room
scroll_area.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
scroll_area.setWidget(widget)
# Put the scroll area in the container layout
container_layout.addWidget(scroll_area)
# Optionally stretch the scroll area to fill vertical space
container_layout.setStretchFactor(scroll_area, 1)
# Add container_widget to the stacked widget
index = self.stack_widget.count()
self.stack_widget.addWidget(container_widget)
# Add an action to the toolbar
action = MaterialIconAction(icon_name=icon_name, tooltip=tooltip, checkable=True)
self.toolbar.add_action(action_id, action, target_widget=self)
def on_action_toggled(checked: bool):
if self.switching_actions:
return
if checked:
if self.current_action and self.current_action != action.action:
self.switching_actions = True
self.current_action.setChecked(False)
self.switching_actions = False
self.current_action = action.action
if not self.panel_visible:
self.show_panel(index)
else:
self.switch_to(index)
else:
if self.current_action == action.action:
self.current_action = None
self.hide_panel()
action.action.toggled.connect(on_action_toggled)
############################################
# DEMO APPLICATION
############################################
class ExampleApp(QMainWindow): # pragma: no cover
def __init__(self):
super().__init__()
self.setWindowTitle("Side Panel Example")
central_widget = QWidget()
self.setCentralWidget(central_widget)
self.layout = QHBoxLayout(central_widget)
# Create side panel
self.side_panel = SidePanel(self, orientation="left", panel_max_width=250)
self.layout.addWidget(self.side_panel)
from bec_widgets.widgets.plots_next_gen.waveform.waveform import Waveform
self.plot = Waveform()
self.layout.addWidget(self.plot)
self.add_side_menus()
def add_side_menus(self):
widget1 = QWidget()
layout1 = QVBoxLayout(widget1)
for i in range(15):
layout1.addWidget(QLabel(f"Widget 1 label row {i}"))
self.side_panel.add_menu(
action_id="widget1",
icon_name="counter_1",
tooltip="Show Widget 1",
widget=widget1,
title="Widget 1 Panel",
)
widget2 = QWidget()
layout2 = QVBoxLayout(widget2)
layout2.addWidget(QLabel("Short widget 2 content"))
self.side_panel.add_menu(
action_id="widget2",
icon_name="counter_2",
tooltip="Show Widget 2",
widget=widget2,
title="Widget 2 Panel",
)
widget3 = QWidget()
layout3 = QVBoxLayout(widget3)
for i in range(10):
layout3.addWidget(QLabel(f"Line {i} for Widget 3"))
self.side_panel.add_menu(
action_id="widget3",
icon_name="counter_3",
tooltip="Show Widget 3",
widget=widget3,
title="Widget 3 Panel",
)
if __name__ == "__main__": # pragma: no cover
app = QApplication(sys.argv)
window = ExampleApp()
window.resize(1000, 700)
window.show()
sys.exit(app.exec())

View File

@@ -2,70 +2,28 @@
from __future__ import annotations
import os
import sys
from abc import ABC, abstractmethod
from collections import defaultdict
from typing import Dict, List, Literal, Tuple
from typing import Literal
from bec_qthemes._icon.material_icons import material_icon
from qtpy.QtCore import QSize, Qt, QTimer
from qtpy.QtCore import QSize, Qt
from qtpy.QtGui import QAction, QColor, QIcon
from qtpy.QtWidgets import (
QApplication,
QComboBox,
QHBoxLayout,
QLabel,
QMainWindow,
QMenu,
QSizePolicy,
QStyle,
QToolBar,
QToolButton,
QVBoxLayout,
QWidget,
)
import bec_widgets
from bec_widgets.utils.colors import set_theme
from bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button import DarkModeButton
MODULE_PATH = os.path.dirname(bec_widgets.__file__)
# Ensure that icons are shown in menus (especially on macOS)
QApplication.setAttribute(Qt.AA_DontShowIconsInMenus, False)
class LongPressToolButton(QToolButton):
def __init__(self, *args, long_press_threshold=500, **kwargs):
super().__init__(*args, **kwargs)
self.long_press_threshold = long_press_threshold
self._long_press_timer = QTimer(self)
self._long_press_timer.setSingleShot(True)
self._long_press_timer.timeout.connect(self.handleLongPress)
self._pressed = False
self._longPressed = False
def mousePressEvent(self, event):
self._pressed = True
self._longPressed = False
self._long_press_timer.start(self.long_press_threshold)
super().mousePressEvent(event)
def mouseReleaseEvent(self, event):
self._pressed = False
if self._longPressed:
self._longPressed = False
self._long_press_timer.stop()
event.accept() # Prevent normal click action after a long press
return
self._long_press_timer.stop()
super().mouseReleaseEvent(event)
def handleLongPress(self):
if self._pressed:
self._longPressed = True
self.showMenu()
class ToolBarAction(ABC):
"""
@@ -73,7 +31,7 @@ class ToolBarAction(ABC):
Args:
icon_path (str, optional): The name of the icon file from `assets/toolbar_icons`. Defaults to None.
tooltip (str, optional): The tooltip for the action. Defaults to None.
tooltip (bool, optional): The tooltip for the action. Defaults to None.
checkable (bool, optional): Whether the action is checkable. Defaults to False.
"""
@@ -123,33 +81,15 @@ class IconAction(ToolBarAction):
toolbar.addAction(self.action)
class QtIconAction(ToolBarAction):
def __init__(self, standard_icon, tooltip=None, checkable=False, parent=None):
super().__init__(icon_path=None, tooltip=tooltip, checkable=checkable)
self.standard_icon = standard_icon
self.icon = QApplication.style().standardIcon(standard_icon)
self.action = QAction(self.icon, self.tooltip, parent)
self.action.setCheckable(self.checkable)
def add_to_toolbar(self, toolbar, target):
toolbar.addAction(self.action)
def get_icon(self):
return self.icon
class MaterialIconAction(ToolBarAction):
class MaterialIconAction:
"""
Action with a Material icon for the toolbar.
Args:
icon_name (str, optional): The name of the Material icon. Defaults to None.
tooltip (str, optional): The tooltip for the action. Defaults to None.
icon_path (str, optional): The name of the Material icon. Defaults to None.
tooltip (bool, optional): The tooltip for the action. Defaults to None.
checkable (bool, optional): Whether the action is checkable. Defaults to False.
filled (bool, optional): Whether the icon is filled. Defaults to False.
color (str | tuple | QColor | dict[Literal["dark", "light"], str] | None, optional): The color of the icon.
Defaults to None.
parent (QWidget or None, optional): Parent widget for the underlying QAction.
"""
def __init__(
@@ -159,41 +99,30 @@ class MaterialIconAction(ToolBarAction):
checkable: bool = False,
filled: bool = False,
color: str | tuple | QColor | dict[Literal["dark", "light"], str] | None = None,
parent=None,
):
super().__init__(icon_path=None, tooltip=tooltip, checkable=checkable)
self.icon_name = icon_name
self.tooltip = tooltip
self.checkable = checkable
self.action = None
self.filled = filled
self.color = color
# Generate the icon using the material_icon helper
self.icon = material_icon(
def add_to_toolbar(self, toolbar: QToolBar, target: QWidget):
icon = self.get_icon()
self.action = QAction(icon, self.tooltip, target)
self.action.setCheckable(self.checkable)
toolbar.addAction(self.action)
def get_icon(self):
icon = material_icon(
self.icon_name,
size=(20, 20),
convert_to_pixmap=False,
filled=self.filled,
color=self.color,
)
self.action = QAction(self.icon, self.tooltip, parent=parent)
self.action.setCheckable(self.checkable)
def add_to_toolbar(self, toolbar: QToolBar, target: QWidget):
"""
Adds the action to the toolbar.
Args:
toolbar(QToolBar): The toolbar to add the action to.
target(QWidget): The target widget for the action.
"""
toolbar.addAction(self.action)
def get_icon(self):
"""
Returns the icon for the action.
Returns:
QIcon: The icon for the action.
"""
return self.icon
return icon
class DeviceSelectionAction(ToolBarAction):
@@ -203,9 +132,10 @@ class DeviceSelectionAction(ToolBarAction):
Args:
label (str): The label for the combobox.
device_combobox (DeviceComboBox): The combobox for selecting the device.
"""
def __init__(self, label: str | None = None, device_combobox=None):
def __init__(self, label: str, device_combobox):
super().__init__()
self.label = label
self.device_combobox = device_combobox
@@ -214,131 +144,15 @@ class DeviceSelectionAction(ToolBarAction):
def add_to_toolbar(self, toolbar, target):
widget = QWidget()
layout = QHBoxLayout(widget)
layout.setContentsMargins(0, 0, 0, 0)
layout.setSpacing(0)
if self.label is not None:
label = QLabel(f"{self.label}")
layout.addWidget(label)
if self.device_combobox is not None:
layout.addWidget(self.device_combobox)
toolbar.addWidget(widget)
label = QLabel(f"{self.label}")
layout.addWidget(label)
layout.addWidget(self.device_combobox)
toolbar.addWidget(widget)
def set_combobox_style(self, color: str):
self.device_combobox.setStyleSheet(f"QComboBox {{ background-color: {color}; }}")
class SwitchableToolBarAction(ToolBarAction):
"""
A split toolbar action that combines a main action and a drop-down menu for additional actions.
The main button displays the currently selected action's icon and tooltip. Clicking on the main button
triggers that action. Clicking on the drop-down arrow displays a menu with alternative actions. When an
alternative action is selected, it becomes the new default and its callback is immediately executed.
This design mimics the behavior seen in Adobe Photoshop or Affinity Designer toolbars.
Args:
actions (dict): A dictionary mapping a unique key to a ToolBarAction instance.
initial_action (str, optional): The key of the initial default action. If not provided, the first action is used.
tooltip (str, optional): An optional tooltip for the split action; if provided, it overrides the default action's tooltip.
checkable (bool, optional): Whether the action is checkable. Defaults to True.
parent (QWidget, optional): Parent widget for the underlying QAction.
"""
def __init__(
self,
actions: Dict[str, ToolBarAction],
initial_action: str = None,
tooltip: str = None,
checkable: bool = True,
default_state_checked: bool = False,
parent=None,
):
super().__init__(icon_path=None, tooltip=tooltip, checkable=checkable)
self.actions = actions
self.current_key = initial_action if initial_action is not None else next(iter(actions))
self.parent = parent
self.checkable = checkable
self.default_state_checked = default_state_checked
self.main_button = None
self.menu_actions: Dict[str, QAction] = {}
def add_to_toolbar(self, toolbar: QToolBar, target: QWidget):
"""
Adds the split action to the toolbar.
Args:
toolbar (QToolBar): The toolbar to add the action to.
target (QWidget): The target widget for the action.
"""
self.main_button = LongPressToolButton(toolbar)
self.main_button.setPopupMode(QToolButton.MenuButtonPopup)
self.main_button.setCheckable(self.checkable)
default_action = self.actions[self.current_key]
self.main_button.setIcon(default_action.get_icon())
self.main_button.setToolTip(default_action.tooltip)
self.main_button.clicked.connect(self._trigger_current_action)
menu = QMenu(self.main_button)
for key, action_obj in self.actions.items():
menu_action = QAction(action_obj.get_icon(), action_obj.tooltip, self.main_button)
menu_action.setIconVisibleInMenu(True)
menu_action.setCheckable(self.checkable)
menu_action.setChecked(key == self.current_key)
menu_action.triggered.connect(lambda checked, k=key: self.set_default_action(k))
menu.addAction(menu_action)
self.main_button.setMenu(menu)
toolbar.addWidget(self.main_button)
def _trigger_current_action(self):
"""
Triggers the current action associated with the main button.
"""
action_obj = self.actions[self.current_key]
action_obj.action.trigger()
def set_default_action(self, key: str):
"""
Sets the default action for the split action.
Args:
key(str): The key of the action to set as default.
"""
self.current_key = key
new_action = self.actions[self.current_key]
self.main_button.setIcon(new_action.get_icon())
self.main_button.setToolTip(new_action.tooltip)
# Update check state of menu items
for k, menu_act in self.actions.items():
menu_act.action.setChecked(False)
new_action.action.trigger()
# Active action chosen from menu is always checked, uncheck through main button
if self.checkable:
new_action.action.setChecked(True)
self.main_button.setChecked(True)
def block_all_signals(self, block: bool = True):
"""
Blocks or unblocks all signals for the actions in the toolbar.
Args:
block (bool): Whether to block signals. Defaults to True.
"""
self.main_button.blockSignals(block)
for action in self.actions.values():
action.action.blockSignals(block)
def set_state_all(self, state: bool):
"""
Uncheck all actions in the toolbar.
"""
for action in self.actions.values():
action.action.setChecked(state)
self.main_button.setChecked(state)
def get_icon(self) -> QIcon:
return self.actions[self.current_key].get_icon()
class WidgetAction(ToolBarAction):
"""
Action for adding any widget to the toolbar.
@@ -346,33 +160,19 @@ class WidgetAction(ToolBarAction):
Args:
label (str|None): The label for the widget.
widget (QWidget): The widget to be added to the toolbar.
"""
def __init__(
self,
label: str | None = None,
widget: QWidget = None,
adjust_size: bool = True,
parent=None,
):
super().__init__(icon_path=None, tooltip=label, checkable=False)
def __init__(self, label: str | None = None, widget: QWidget = None, parent=None):
super().__init__(parent)
self.label = label
self.widget = widget
self.container = None
self.adjust_size = adjust_size
def add_to_toolbar(self, toolbar: QToolBar, target: QWidget):
"""
Adds the widget to the toolbar.
Args:
toolbar (QToolBar): The toolbar to add the widget to.
target (QWidget): The target widget for the action.
"""
self.container = QWidget()
layout = QHBoxLayout(self.container)
container = QWidget()
layout = QHBoxLayout(container)
layout.setContentsMargins(0, 0, 0, 0)
layout.setSpacing(0)
layout.setSpacing(5)
if self.label is not None:
label_widget = QLabel(f"{self.label}")
@@ -380,7 +180,7 @@ class WidgetAction(ToolBarAction):
label_widget.setAlignment(Qt.AlignVCenter | Qt.AlignRight)
layout.addWidget(label_widget)
if isinstance(self.widget, QComboBox) and self.adjust_size:
if isinstance(self.widget, QComboBox):
self.widget.setSizeAdjustPolicy(QComboBox.AdjustToContents)
size_policy = QSizePolicy(QSizePolicy.Expanding, QSizePolicy.Fixed)
@@ -393,12 +193,19 @@ class WidgetAction(ToolBarAction):
layout.addWidget(self.widget)
toolbar.addWidget(self.container)
# Store the container as the action to allow toggling visibility.
self.action = self.container
toolbar.addWidget(container)
@staticmethod
def calculate_minimum_width(combo_box: QComboBox) -> int:
"""
Calculate the minimum width required to display the longest item in the combo box.
Args:
combo_box (QComboBox): The combo box to calculate the width for.
Returns:
int: The calculated minimum width in pixels.
"""
font_metrics = combo_box.fontMetrics()
max_width = max(font_metrics.width(combo_box.itemText(i)) for i in range(combo_box.count()))
return max_width + 60
@@ -412,6 +219,7 @@ class ExpandableMenuAction(ToolBarAction):
label (str): The label for the menu.
actions (dict): A dictionary of actions to populate the menu.
icon_path (str, optional): The path to the icon file. Defaults to None.
"""
def __init__(self, label: str, actions: dict, icon_path: str = None):
@@ -438,15 +246,12 @@ class ExpandableMenuAction(ToolBarAction):
menu = QMenu(button)
for action_id, action in self.actions.items():
sub_action = QAction(action.tooltip, target)
sub_action.setIconVisibleInMenu(True)
if action.icon_path:
if hasattr(action, "icon_path"):
icon = QIcon()
icon.addFile(action.icon_path, size=QSize(20, 20))
sub_action.setIcon(icon)
elif hasattr(action, "get_icon") and callable(action.get_icon):
sub_icon = action.get_icon()
if sub_icon and not sub_icon.isNull():
sub_action.setIcon(sub_icon)
elif hasattr(action, "get_icon"):
sub_action.setIcon(action.get_icon())
sub_action.setCheckable(action.checkable)
menu.addAction(sub_action)
self.widgets[action_id] = sub_action
@@ -454,547 +259,38 @@ class ExpandableMenuAction(ToolBarAction):
toolbar.addWidget(button)
class ToolbarBundle:
"""
Represents a bundle of toolbar actions, keyed by action_id.
Allows direct dictionary-like access: self.actions["some_id"] -> ToolBarAction object.
"""
def __init__(self, bundle_id: str = None, actions=None):
"""
Args:
bundle_id (str): Unique identifier for the bundle.
actions: Either None or a list of (action_id, ToolBarAction) tuples.
"""
self.bundle_id = bundle_id
self._actions: dict[str, ToolBarAction] = {}
if actions is not None:
for action_id, action in actions:
self._actions[action_id] = action
def add_action(self, action_id: str, action: ToolBarAction):
"""
Adds or replaces an action in the bundle.
Args:
action_id (str): Unique identifier for the action.
action (ToolBarAction): The action to add.
"""
self._actions[action_id] = action
def remove_action(self, action_id: str):
"""
Removes an action from the bundle by ID.
Ignores if not present.
Args:
action_id (str): Unique identifier for the action to remove.
"""
self._actions.pop(action_id, None)
@property
def actions(self) -> dict[str, ToolBarAction]:
"""
Return the internal dictionary of actions so that you can do
bundle.actions["drag_mode"] -> ToolBarAction instance.
"""
return self._actions
class ModularToolBar(QToolBar):
"""Modular toolbar with optional automatic initialization.
Args:
parent (QWidget, optional): The parent widget of the toolbar. Defaults to None.
actions (dict, optional): A dictionary of action creators to populate the toolbar. Defaults to None.
actions (list[ToolBarAction], optional): A list of action creators to populate the toolbar. Defaults to None.
target_widget (QWidget, optional): The widget that the actions will target. Defaults to None.
orientation (Literal["horizontal", "vertical"], optional): The initial orientation of the toolbar. Defaults to "horizontal".
background_color (str, optional): The background color of the toolbar. Defaults to "rgba(0, 0, 0, 0)".
"""
def __init__(
self,
parent=None,
actions: dict | None = None,
target_widget=None,
orientation: Literal["horizontal", "vertical"] = "horizontal",
background_color: str = "rgba(0, 0, 0, 0)",
):
def __init__(self, parent=None, actions: dict | None = None, target_widget=None):
super().__init__(parent)
self.widgets = defaultdict(dict)
self.background_color = background_color
self.set_background_color(self.background_color)
# Set the initial orientation
self.set_orientation(orientation)
# Initialize bundles
self.bundles = {}
self.toolbar_items = []
self.set_background_color()
if actions is not None and target_widget is not None:
self.populate_toolbar(actions, target_widget)
def populate_toolbar(self, actions: dict, target_widget: QWidget):
def populate_toolbar(self, actions: dict, target_widget):
"""Populates the toolbar with a set of actions.
Args:
actions (dict): A dictionary of action creators to populate the toolbar.
actions (list[ToolBarAction]): A list of action creators to populate the toolbar.
target_widget (QWidget): The widget that the actions will target.
"""
self.clear()
self.toolbar_items.clear() # Reset the order tracking
for action_id, action in actions.items():
action.add_to_toolbar(self, target_widget)
self.widgets[action_id] = action
self.toolbar_items.append(("action", action_id))
self.update_separators() # Ensure separators are updated after populating
def set_background_color(self, color: str = "rgba(0, 0, 0, 0)"):
"""
Sets the background color and other appearance settings.
Args:
color (str): The background color of the toolbar.
"""
def set_background_color(self):
self.setIconSize(QSize(20, 20))
self.setMovable(False)
self.setFloatable(False)
self.setContentsMargins(0, 0, 0, 0)
self.background_color = color
self.setStyleSheet(f"QToolBar {{ background-color: {color}; border: none; }}")
def set_orientation(self, orientation: Literal["horizontal", "vertical"]):
"""Sets the orientation of the toolbar.
Args:
orientation (Literal["horizontal", "vertical"]): The desired orientation of the toolbar.
"""
if orientation == "horizontal":
self.setOrientation(Qt.Horizontal)
elif orientation == "vertical":
self.setOrientation(Qt.Vertical)
else:
raise ValueError("Orientation must be 'horizontal' or 'vertical'.")
def update_material_icon_colors(self, new_color: str | tuple | QColor):
"""
Updates the color of all MaterialIconAction icons.
Args:
new_color (str | tuple | QColor): The new color.
"""
for action in self.widgets.values():
if isinstance(action, MaterialIconAction):
action.color = new_color
updated_icon = action.get_icon()
action.action.setIcon(updated_icon)
def add_action(self, action_id: str, action: ToolBarAction, target_widget: QWidget):
"""
Adds a new standalone action dynamically.
Args:
action_id (str): Unique identifier.
action (ToolBarAction): The action to add.
target_widget (QWidget): The target widget.
"""
if action_id in self.widgets:
raise ValueError(f"Action with ID '{action_id}' already exists.")
action.add_to_toolbar(self, target_widget)
self.widgets[action_id] = action
self.toolbar_items.append(("action", action_id))
self.update_separators()
def hide_action(self, action_id: str):
"""
Hides a specific action.
Args:
action_id (str): Unique identifier.
"""
if action_id not in self.widgets:
raise ValueError(f"Action with ID '{action_id}' does not exist.")
action = self.widgets[action_id]
if hasattr(action, "action") and action.action is not None:
action.action.setVisible(False)
self.update_separators()
def show_action(self, action_id: str):
"""
Shows a specific action.
Args:
action_id (str): Unique identifier.
"""
if action_id not in self.widgets:
raise ValueError(f"Action with ID '{action_id}' does not exist.")
action = self.widgets[action_id]
if hasattr(action, "action") and action.action is not None:
action.action.setVisible(True)
self.update_separators()
def add_bundle(self, bundle: ToolbarBundle, target_widget: QWidget):
"""
Adds a bundle of actions, separated by a separator.
Args:
bundle (ToolbarBundle): The bundle.
target_widget (QWidget): The target widget.
"""
if bundle.bundle_id in self.bundles:
raise ValueError(f"ToolbarBundle with ID '{bundle.bundle_id}' already exists.")
if self.toolbar_items:
sep = SeparatorAction()
sep.add_to_toolbar(self, target_widget)
self.toolbar_items.append(("separator", None))
for action_id, action_obj in bundle.actions.items():
action_obj.add_to_toolbar(self, target_widget)
self.widgets[action_id] = action_obj
self.bundles[bundle.bundle_id] = list(bundle.actions.keys())
self.toolbar_items.append(("bundle", bundle.bundle_id))
self.update_separators()
def add_action_to_bundle(self, bundle_id: str, action_id: str, action, target_widget: QWidget):
"""
Dynamically adds an action to an existing bundle.
Args:
bundle_id (str): The bundle ID.
action_id (str): Unique identifier.
action (ToolBarAction): The action to add.
target_widget (QWidget): The target widget.
"""
if bundle_id not in self.bundles:
raise ValueError(f"Bundle '{bundle_id}' does not exist.")
if action_id in self.widgets:
raise ValueError(f"Action with ID '{action_id}' already exists.")
action.add_to_toolbar(self, target_widget)
new_qaction = action.action
self.removeAction(new_qaction)
bundle_action_ids = self.bundles[bundle_id]
if bundle_action_ids:
last_bundle_action = self.widgets[bundle_action_ids[-1]].action
actions_list = self.actions()
try:
index = actions_list.index(last_bundle_action)
except ValueError:
self.addAction(new_qaction)
else:
if index + 1 < len(actions_list):
before_action = actions_list[index + 1]
self.insertAction(before_action, new_qaction)
else:
self.addAction(new_qaction)
else:
self.addAction(new_qaction)
self.widgets[action_id] = action
self.bundles[bundle_id].append(action_id)
self.update_separators()
def contextMenuEvent(self, event):
"""
Overrides the context menu event to show toolbar actions with checkboxes and icons.
Args:
event (QContextMenuEvent): The context menu event.
"""
menu = QMenu(self)
for item_type, identifier in self.toolbar_items:
if item_type == "separator":
menu.addSeparator()
elif item_type == "bundle":
self.handle_bundle_context_menu(menu, identifier)
elif item_type == "action":
self.handle_action_context_menu(menu, identifier)
menu.triggered.connect(self.handle_menu_triggered)
menu.exec_(event.globalPos())
def handle_bundle_context_menu(self, menu: QMenu, bundle_id: str):
"""
Adds bundle actions to the context menu.
Args:
menu (QMenu): The context menu.
bundle_id (str): The bundle identifier.
"""
action_ids = self.bundles.get(bundle_id, [])
for act_id in action_ids:
toolbar_action = self.widgets.get(act_id)
if not isinstance(toolbar_action, ToolBarAction) or not hasattr(
toolbar_action, "action"
):
continue
qaction = toolbar_action.action
if not isinstance(qaction, QAction):
continue
display_name = qaction.text() or toolbar_action.tooltip or act_id
menu_action = QAction(display_name, self)
menu_action.setCheckable(True)
menu_action.setChecked(qaction.isVisible())
menu_action.setData(act_id) # Store the action_id
# Set the icon if available
if qaction.icon() and not qaction.icon().isNull():
menu_action.setIcon(qaction.icon())
menu.addAction(menu_action)
def handle_action_context_menu(self, menu: QMenu, action_id: str):
"""
Adds a single toolbar action to the context menu.
Args:
menu (QMenu): The context menu to which the action is added.
action_id (str): Unique identifier for the action.
"""
toolbar_action = self.widgets.get(action_id)
if not isinstance(toolbar_action, ToolBarAction) or not hasattr(toolbar_action, "action"):
return
qaction = toolbar_action.action
if not isinstance(qaction, QAction):
return
display_name = qaction.text() or toolbar_action.tooltip or action_id
menu_action = QAction(display_name, self)
menu_action.setCheckable(True)
menu_action.setChecked(qaction.isVisible())
menu_action.setData(action_id) # Store the action_id
# Set the icon if available
if qaction.icon() and not qaction.icon().isNull():
menu_action.setIcon(qaction.icon())
menu.addAction(menu_action)
def handle_menu_triggered(self, action):
"""
Handles the triggered signal from the context menu.
Args:
action: Action triggered.
"""
action_id = action.data()
if action_id:
self.toggle_action_visibility(action_id, action.isChecked())
def toggle_action_visibility(self, action_id: str, visible: bool):
"""
Toggles the visibility of a specific action.
Args:
action_id (str): Unique identifier.
visible (bool): Whether the action should be visible.
"""
if action_id not in self.widgets:
return
tool_action = self.widgets[action_id]
if hasattr(tool_action, "action") and tool_action.action is not None:
tool_action.action.setVisible(visible)
self.update_separators()
def update_separators(self):
"""
Hide separators that are adjacent to another separator or have no non-separator actions between them.
"""
toolbar_actions = self.actions()
# First pass: set visibility based on surrounding non-separator actions.
for i, action in enumerate(toolbar_actions):
if not action.isSeparator():
continue
prev_visible = None
for j in range(i - 1, -1, -1):
if toolbar_actions[j].isVisible():
prev_visible = toolbar_actions[j]
break
next_visible = None
for j in range(i + 1, len(toolbar_actions)):
if toolbar_actions[j].isVisible():
next_visible = toolbar_actions[j]
break
if (prev_visible is None or prev_visible.isSeparator()) and (
next_visible is None or next_visible.isSeparator()
):
action.setVisible(False)
else:
action.setVisible(True)
# Second pass: ensure no two visible separators are adjacent.
prev = None
for action in toolbar_actions:
if action.isVisible() and action.isSeparator():
if prev and prev.isSeparator():
action.setVisible(False)
else:
prev = action
else:
if action.isVisible():
prev = action
class MainWindow(QMainWindow): # pragma: no cover
def __init__(self):
super().__init__()
self.setWindowTitle("Toolbar / ToolbarBundle Demo")
self.central_widget = QWidget()
self.setCentralWidget(self.central_widget)
self.test_label = QLabel(text="This is a test label.")
self.central_widget.layout = QVBoxLayout(self.central_widget)
self.central_widget.layout.addWidget(self.test_label)
self.toolbar = ModularToolBar(parent=self, target_widget=self)
self.addToolBar(self.toolbar)
self.add_switchable_button_checkable()
self.add_switchable_button_non_checkable()
self.add_widget_actions()
self.add_bundles()
self.add_menus()
# For theme testing
self.dark_button = DarkModeButton(toolbar=True)
dark_mode_action = WidgetAction(label=None, widget=self.dark_button)
self.toolbar.add_action("dark_mode", dark_mode_action, self)
def add_bundles(self):
home_action = MaterialIconAction(
icon_name="home", tooltip="Home", checkable=False, parent=self
)
settings_action = MaterialIconAction(
icon_name="settings", tooltip="Settings", checkable=True, parent=self
)
profile_action = MaterialIconAction(
icon_name="person", tooltip="Profile", checkable=True, parent=self
)
main_actions_bundle = ToolbarBundle(
bundle_id="main_actions",
actions=[
("home_action", home_action),
("settings_action", settings_action),
("profile_action", profile_action),
],
)
self.toolbar.add_bundle(main_actions_bundle, target_widget=self)
home_action.action.triggered.connect(lambda: self.switchable_action.set_state_all(False))
search_action = MaterialIconAction(
icon_name="search", tooltip="Search", checkable=False, parent=self
)
help_action = MaterialIconAction(
icon_name="help", tooltip="Help", checkable=False, parent=self
)
second_bundle = ToolbarBundle(
bundle_id="secondary_actions",
actions=[("search_action", search_action), ("help_action", help_action)],
)
self.toolbar.add_bundle(second_bundle, target_widget=self)
new_action = MaterialIconAction(
icon_name="counter_1", tooltip="New Action", checkable=True, parent=self
)
self.toolbar.add_action_to_bundle(
"main_actions", "new_action", new_action, target_widget=self
)
def add_menus(self):
menu_material_actions = {
"mat1": MaterialIconAction(
icon_name="home", tooltip="Material Home", checkable=True, parent=self
),
"mat2": MaterialIconAction(
icon_name="settings", tooltip="Material Settings", checkable=True, parent=self
),
"mat3": MaterialIconAction(
icon_name="info", tooltip="Material Info", checkable=True, parent=self
),
}
menu_qt_actions = {
"qt1": QtIconAction(
standard_icon=QStyle.SP_FileIcon, tooltip="Qt File", checkable=True, parent=self
),
"qt2": QtIconAction(
standard_icon=QStyle.SP_DirIcon, tooltip="Qt Directory", checkable=True, parent=self
),
"qt3": QtIconAction(
standard_icon=QStyle.SP_TrashIcon, tooltip="Qt Trash", checkable=True, parent=self
),
}
expandable_menu_material = ExpandableMenuAction(
label="Material Menu", actions=menu_material_actions
)
expandable_menu_qt = ExpandableMenuAction(label="Qt Menu", actions=menu_qt_actions)
self.toolbar.add_action("material_menu", expandable_menu_material, self)
self.toolbar.add_action("qt_menu", expandable_menu_qt, self)
def add_switchable_button_checkable(self):
action1 = MaterialIconAction(
icon_name="hdr_auto", tooltip="Action 1", checkable=True, parent=self
)
action2 = MaterialIconAction(
icon_name="hdr_auto", tooltip="Action 2", checkable=True, filled=True, parent=self
)
self.switchable_action = SwitchableToolBarAction(
actions={"action1": action1, "action2": action2},
initial_action="action1",
tooltip="Switchable Action",
checkable=True,
parent=self,
)
self.toolbar.add_action("switchable_action", self.switchable_action, self)
action1.action.toggled.connect(
lambda checked: self.test_label.setText(f"Action 1 triggered, checked = {checked}")
)
action2.action.toggled.connect(
lambda checked: self.test_label.setText(f"Action 2 triggered, checked = {checked}")
)
def add_switchable_button_non_checkable(self):
action1 = MaterialIconAction(
icon_name="counter_1", tooltip="Action 1", checkable=False, parent=self
)
action2 = MaterialIconAction(
icon_name="counter_2", tooltip="Action 2", checkable=False, parent=self
)
switchable_action = SwitchableToolBarAction(
actions={"action1": action1, "action2": action2},
initial_action="action1",
tooltip="Switchable Action",
checkable=False,
parent=self,
)
self.toolbar.add_action("switchable_action_no_toggle", switchable_action, self)
action1.action.triggered.connect(
lambda checked: self.test_label.setText(
f"Action 1 (non-checkable) triggered, checked = {checked}"
)
)
action2.action.triggered.connect(
lambda checked: self.test_label.setText(
f"Action 2 (non-checkable) triggered, checked = {checked}"
)
)
switchable_action.actions["action1"].action.setChecked(True)
def add_widget_actions(self):
combo = QComboBox()
combo.addItems(["Option 1", "Option 2", "Option 3"])
self.toolbar.add_action("device_combo", WidgetAction(label="Device:", widget=combo), self)
if __name__ == "__main__": # pragma: no cover
app = QApplication(sys.argv)
set_theme("light")
main_window = MainWindow()
main_window.show()
sys.exit(app.exec_())
self.setStyleSheet("QToolBar { background-color: rgba(0, 0, 0, 0); border: none; }")

View File

@@ -224,11 +224,3 @@ DEVICES = [
Positioner("test", limits=[-10, 10], read_value=2.0),
Device("test_device"),
]
def check_remote_data_size(widget, plot_name, num_elements):
"""
Check if the remote data has the correct number of elements.
Used in the qtbot.waitUntil function.
"""
return len(widget.get_all_data()[plot_name]["x"]) == num_elements

View File

@@ -4,8 +4,7 @@ from __future__ import annotations
import os
import time
import uuid
from datetime import datetime
from typing import TYPE_CHECKING, Optional
from typing import Optional
from bec_lib.logger import bec_logger
from bec_lib.utils.import_utils import lazy_import_from
@@ -13,15 +12,11 @@ from pydantic import BaseModel, Field, field_validator
from qtpy.QtCore import QObject, QRunnable, QThreadPool, Signal
from qtpy.QtWidgets import QApplication
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.cli.rpc_register import RPCRegister
from bec_widgets.qt_utils.error_popups import ErrorPopupUtility
from bec_widgets.qt_utils.error_popups import SafeSlot as pyqtSlot
from bec_widgets.utils.container_utils import WidgetContainerUtils
from bec_widgets.utils.yaml_dialog import load_yaml, load_yaml_gui, save_yaml, save_yaml_gui
if TYPE_CHECKING:
from bec_widgets.utils.bec_dispatcher import BECDispatcher
logger = bec_logger.logger
BECDispatcher = lazy_import_from("bec_widgets.utils.bec_dispatcher", ("BECDispatcher",))
@@ -41,7 +36,8 @@ class ConnectionConfig(BaseModel):
"""Generate a GUI ID if none is provided."""
if v is None:
widget_class = values.data["widget_class"]
v = f"{widget_class}_{datetime.now().strftime('%Y_%m_%d_%H_%M_%S_%f')}"
v = f"{widget_class}_{str(time.time())}"
return v
return v
@@ -73,16 +69,10 @@ class Worker(QRunnable):
class BECConnector:
"""Connection mixin class to handle BEC client and device manager"""
USER_ACCESS = ["_config_dict", "_get_all_rpc", "_rpc_id"]
USER_ACCESS = ["_config_dict", "_get_all_rpc"]
EXIT_HANDLERS = {}
def __init__(
self,
client=None,
config: ConnectionConfig | None = None,
gui_id: str | None = None,
name: str | None = None,
):
def __init__(self, client=None, config: ConnectionConfig = None, gui_id: str = None):
# BEC related connections
self.bec_dispatcher = BECDispatcher(client=client)
self.client = self.bec_dispatcher.client if client is None else client
@@ -110,22 +100,15 @@ class BECConnector:
)
self.config = ConnectionConfig(widget_class=self.__class__.__name__)
# I feel that we should not allow BECConnector to be created with a custom gui_id
# because this would break with the logic in the RPCRegister of retrieving widgets by type
# iterating over all widgets and checkinf if the register widget starts with the string that is passsed.
# If the gui_id is randomly generated, this would break since that widget would have a
# gui_id that is generated in a different way.
if gui_id:
self.config.gui_id = gui_id
self.gui_id: str = gui_id
self.gui_id = gui_id
else:
self.gui_id: str = self.config.gui_id # type: ignore
if name is None:
name = self.__class__.__name__
else:
if not WidgetContainerUtils.has_name_valid_chars(name):
raise ValueError(f"Name {name} contains invalid characters.")
self._name = name if name else self.__class__.__name__
self.gui_id = self.config.gui_id
# register widget to rpc register
# be careful: when registering, and the object is not a BECWidget,
# cleanup has to called manually since there is no 'closeEvent'
self.rpc_register = RPCRegister()
self.rpc_register.add_rpc(self)
@@ -133,8 +116,6 @@ class BECConnector:
self.error_utility = ErrorPopupUtility()
self._thread_pool = QThreadPool.globalInstance()
# Store references to running workers so they're not garbage collected prematurely.
self._workers = []
def submit_task(self, fn, *args, on_complete: pyqtSlot = None, **kwargs) -> Worker:
"""
@@ -163,14 +144,11 @@ class BECConnector:
>>> def on_complete():
>>> print("Task complete")
>>> self.submit_task(my_function, 1, 2, on_complete=on_complete)
"""
worker = Worker(fn, *args, **kwargs)
if on_complete:
worker.signals.completed.connect(on_complete)
# Keep a reference to the worker so it is not garbage collected.
self._workers.append(worker)
# When the worker is done, remove it from our list.
worker.signals.completed.connect(lambda: self._workers.remove(worker))
self._thread_pool.start(worker)
return worker
@@ -202,39 +180,37 @@ class BECConnector:
@_config_dict.setter
def _config_dict(self, config: BaseModel) -> None:
"""
Set the configuration of the widget.
Get the configuration of the widget.
Args:
config (BaseModel): The new configuration model.
Returns:
dict: The configuration of the widget.
"""
self.config = config
# FIXME some thoughts are required to decide how thhis should work with rpc registry
def apply_config(self, config: dict, generate_new_id: bool = True) -> None:
"""
Apply the configuration to the widget.
Args:
config (dict): Configuration settings.
generate_new_id (bool): If True, generate a new GUI ID for the widget.
config(dict): Configuration settings.
generate_new_id(bool): If True, generate a new GUI ID for the widget.
"""
self.config = ConnectionConfig(**config)
if generate_new_id is True:
gui_id = str(uuid.uuid4())
self.rpc_register.remove_rpc(self)
self._set_gui_id(gui_id)
self.set_gui_id(gui_id)
self.rpc_register.add_rpc(self)
else:
self.gui_id = self.config.gui_id
# FIXME some thoughts are required to decide how thhis should work with rpc registry
def load_config(self, path: str | None = None, gui: bool = False):
"""
Load the configuration of the widget from YAML.
Args:
path (str | None): Path to the configuration file for non-GUI dialog mode.
gui (bool): If True, use the GUI dialog to load the configuration file.
path(str): Path to the configuration file for non-GUI dialog mode.
gui(bool): If True, use the GUI dialog to load the configuration file.
"""
if gui is True:
config = load_yaml_gui(self)
@@ -253,8 +229,8 @@ class BECConnector:
Save the configuration of the widget to YAML.
Args:
path (str | None): Path to save the configuration file for non-GUI dialog mode.
gui (bool): If True, use the GUI dialog to save the configuration file.
path(str): Path to save the configuration file for non-GUI dialog mode.
gui(bool): If True, use the GUI dialog to save the configuration file.
"""
if gui is True:
save_yaml_gui(self, self._config_dict)
@@ -262,15 +238,16 @@ class BECConnector:
if path is None:
path = os.getcwd()
file_path = os.path.join(path, f"{self.__class__.__name__}_config.yaml")
save_yaml(file_path, self._config_dict)
# @pyqtSlot(str)
def _set_gui_id(self, gui_id: str) -> None:
@pyqtSlot(str)
def set_gui_id(self, gui_id: str) -> None:
"""
Set the GUI ID for the widget.
Args:
gui_id (str): GUI ID.
gui_id(str): GUI ID
"""
self.config.gui_id = gui_id
self.gui_id = gui_id
@@ -291,7 +268,7 @@ class BECConnector:
"""Update the client and device manager from BEC and create object for BEC shortcuts.
Args:
client: BEC client.
client: BEC client
"""
self.client = client
self.get_bec_shortcuts()
@@ -302,68 +279,25 @@ class BECConnector:
Update the configuration for the widget.
Args:
config (ConnectionConfig | dict): Configuration settings.
config(ConnectionConfig): Configuration settings.
"""
gui_id = getattr(config, "gui_id", None)
if isinstance(config, dict):
config = ConnectionConfig(**config)
self.config = config
if gui_id and config.gui_id != gui_id: # Recreating config should not overwrite the gui_id
self.config.gui_id = gui_id
# TODO add error handler
def remove(self):
"""Cleanup the BECConnector"""
if hasattr(self, "close"):
self.close()
if hasattr(self, "deleteLater"):
self.deleteLater()
else:
self.rpc_register.remove_rpc(self)
self.config = config
def get_config(self, dict_output: bool = True) -> dict | BaseModel:
"""
Get the configuration of the widget.
Args:
dict_output (bool): If True, return the configuration as a dictionary.
If False, return the configuration as a pydantic model.
dict_output(bool): If True, return the configuration as a dictionary. If False, return the configuration as a pydantic model.
Returns:
dict | BaseModel: The configuration of the widget.
dict: The configuration of the plot widget.
"""
if dict_output:
return self.config.model_dump()
else:
return self.config
# --- Example usage of BECConnector: running a simple task ---
if __name__ == "__main__": # pragma: no cover
import sys
# Create a QApplication instance (required for QThreadPool)
app = QApplication(sys.argv)
connector = BECConnector()
def print_numbers():
"""
Task function that prints numbers 1 to 10 with a 0.5 second delay between each.
"""
for i in range(1, 11):
print(i)
time.sleep(0.5)
def task_complete():
"""
Called when the task is complete.
"""
print("Task complete")
# Exit the application after the task completes.
app.quit()
# Submit the task using the connector's submit_task method.
connector.submit_task(print_numbers, on_complete=task_complete)
# Start the Qt event loop.
sys.exit(app.exec_())

View File

@@ -93,24 +93,17 @@ def patch_designer(): # pragma: no cover
_extend_path_var("PATH", os.fspath(Path(sys._base_executable).parent), True)
else:
if sys.platform == "linux":
suffix = f"{sys.abiflags}.so"
env_var = "LD_PRELOAD"
current_pid = os.getpid()
with open(f"/proc/{current_pid}/maps", "rt") as f:
for line in f:
if "libpython" in line:
lib_path = line.split()[-1]
os.environ[env_var] = lib_path
break
elif sys.platform == "darwin":
suffix = ".dylib"
env_var = "DYLD_INSERT_LIBRARIES"
version = f"{major_version}.{minor_version}"
library_name = f"libpython{version}{suffix}"
lib_path = str(Path(sysconfig.get_config_var("LIBDIR")) / library_name)
os.environ[env_var] = lib_path
else:
raise RuntimeError(f"Unsupported platform: {sys.platform}")
version = f"{major_version}.{minor_version}"
library_name = f"libpython{version}{suffix}"
lib_path = str(Path(sysconfig.get_config_var("LIBDIR")) / library_name)
os.environ[env_var] = lib_path
if is_pyenv_python() or is_virtual_env():
# append all editable packages to the PYTHONPATH

View File

@@ -5,43 +5,28 @@ analyse data. Requesting a new fit may lead to request piling up and an overall
will allow you to decide by yourself when to unblock and execute the callback again."""
from pyqtgraph import SignalProxy
from qtpy.QtCore import QTimer, Signal
from bec_widgets.qt_utils.error_popups import SafeSlot
from qtpy.QtCore import Signal, Slot
class BECSignalProxy(SignalProxy):
"""
Thin wrapper around the SignalProxy class to allow signal calls to be blocked,
but arguments still being stored.
"""Thin wrapper around the SignalProxy class to allow signal calls to be blocked, but args still being stored
Args:
*args: Arguments to pass to the SignalProxy class.
rateLimit (int): The rateLimit of the proxy.
timeout (float): The number of seconds after which the proxy automatically
unblocks if still blocked. Default is 10.0 seconds.
**kwargs: Keyword arguments to pass to the SignalProxy class.
*args: Arguments to pass to the SignalProxy class
rateLimit (int): The rateLimit of the proxy
**kwargs: Keyword arguments to pass to the SignalProxy class
Example:
>>> proxy = BECSignalProxy(signal, rate_limit=25, slot=callback)
"""
>>> proxy = BECSignalProxy(signal, rate_limit=25, slot=callback)"""
is_blocked = Signal(bool)
def __init__(self, *args, rateLimit=25, timeout=10.0, **kwargs):
def __init__(self, *args, rateLimit=25, **kwargs):
super().__init__(*args, rateLimit=rateLimit, **kwargs)
self._blocking = False
self.old_args = None
self.new_args = None
# Store timeout value (in seconds)
self._timeout = timeout
# Create a single-shot timer for auto-unblocking
self._timer = QTimer()
self._timer.setSingleShot(True)
self._timer.timeout.connect(self._timeout_unblock)
@property
def blocked(self):
"""Returns if the proxy is blocked"""
@@ -61,30 +46,9 @@ class BECSignalProxy(SignalProxy):
self.old_args = args
super().signalReceived(*args)
self._timer.start(int(self._timeout * 1000))
@SafeSlot()
@Slot()
def unblock_proxy(self):
"""Unblock the proxy, and call the signalReceived method in case there was an update of the args."""
if self.blocked:
self._timer.stop()
self.blocked = False
if self.new_args != self.old_args:
self.signalReceived(*self.new_args)
@SafeSlot()
def _timeout_unblock(self):
"""
Internal method called by the QTimer upon timeout. Unblocks the proxy
automatically if it is still blocked.
"""
if self.blocked:
self.unblock_proxy()
def cleanup(self):
"""
Cleanup the proxy by stopping the timer and disconnecting the timeout signal.
"""
self._timer.stop()
self._timer.timeout.disconnect(self._timeout_unblock)
self._timer.deleteLater()
self.blocked = False
if self.new_args != self.old_args:
self.signalReceived(*self.new_args)

View File

@@ -1,7 +1,5 @@
from __future__ import annotations
from typing import TYPE_CHECKING
import darkdetect
from bec_lib.logger import bec_logger
from qtpy.QtCore import Slot
@@ -9,10 +7,6 @@ from qtpy.QtWidgets import QApplication, QWidget
from bec_widgets.utils.bec_connector import BECConnector, ConnectionConfig
from bec_widgets.utils.colors import set_theme
from bec_widgets.utils.container_utils import WidgetContainerUtils
if TYPE_CHECKING: # pragma: no cover
from bec_widgets.widgets.containers.dock import BECDock
logger = bec_logger.logger
@@ -23,18 +17,13 @@ class BECWidget(BECConnector):
# The icon name is the name of the icon in the icon theme, typically a name taken
# from fonts.google.com/icons. Override this in subclasses to set the icon name.
ICON_NAME = "widgets"
USER_ACCESS = ["remove"]
# pylint: disable=too-many-arguments
def __init__(
self,
client=None,
config: ConnectionConfig = None,
gui_id: str | None = None,
gui_id: str = None,
theme_update: bool = False,
name: str | None = None,
parent_dock: BECDock | None = None,
**kwargs,
):
"""
Base class for all BEC widgets. This class should be used as a mixin class for all BEC widgets, e.g.:
@@ -55,9 +44,9 @@ class BECWidget(BECConnector):
"""
if not isinstance(self, QWidget):
raise RuntimeError(f"{repr(self)} is not a subclass of QWidget")
super().__init__(client=client, config=config, gui_id=gui_id)
super().__init__(client=client, config=config, gui_id=gui_id, name=name)
self._parent_dock = parent_dock
# Set the theme to auto if it is not set yet
app = QApplication.instance()
if not hasattr(app, "theme"):
# DO NOT SET THE THEME TO AUTO! Otherwise, the qwebengineview will segfault
@@ -77,7 +66,7 @@ class BECWidget(BECConnector):
if hasattr(qapp, "theme_signal"):
qapp.theme_signal.theme_updated.connect(self._update_theme)
def _update_theme(self, theme: str | None = None):
def _update_theme(self, theme: str):
"""Update the theme."""
if theme is None:
qapp = QApplication.instance()
@@ -98,12 +87,10 @@ class BECWidget(BECConnector):
def cleanup(self):
"""Cleanup the widget."""
# All widgets need to call super().cleanup() in their cleanup method
self.rpc_register.remove_rpc(self)
def closeEvent(self, event):
"""Wrap the close even to ensure the rpc_register is cleaned up."""
self.rpc_register.remove_rpc(self)
try:
self.cleanup()
finally:
super().closeEvent(event) # pylint: disable=no-member
super().closeEvent(event)

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import itertools
import re
from typing import TYPE_CHECKING, Literal
@@ -70,64 +71,15 @@ def apply_theme(theme: Literal["dark", "light"]):
Apply the theme to all pyqtgraph widgets. Do not use this function directly. Use set_theme instead.
"""
app = QApplication.instance()
graphic_layouts = [
child
for top in app.topLevelWidgets()
for child in top.findChildren(pg.GraphicsLayoutWidget)
]
plot_items = [
item
for gl in graphic_layouts
for item in gl.ci.items.keys() # ci is internal pg.GraphicsLayout that hosts all items
if isinstance(item, pg.PlotItem)
]
histograms = [
item
for gl in graphic_layouts
for item in gl.ci.items.keys() # ci is internal pg.GraphicsLayout that hosts all items
if isinstance(item, pg.HistogramLUTItem)
]
# Update background color based on the theme
if theme == "light":
background_color = "#e9ecef" # Subtle contrast for light mode
foreground_color = "#141414"
label_color = "#000000"
axis_color = "#666666"
else:
background_color = "#141414" # Dark mode
foreground_color = "#e9ecef"
label_color = "#FFFFFF"
axis_color = "#CCCCCC"
# update GraphicsLayoutWidget
pg.setConfigOptions(foreground=foreground_color, background=background_color)
for pg_widget in graphic_layouts:
pg_widget.setBackground(background_color)
# update PlotItems
for plot_item in plot_items:
for axis in ["left", "right", "top", "bottom"]:
plot_item.getAxis(axis).setPen(pg.mkPen(color=axis_color))
plot_item.getAxis(axis).setTextPen(pg.mkPen(color=label_color))
# Change title color
plot_item.titleLabel.setText(plot_item.titleLabel.text, color=label_color)
# Change legend color
if hasattr(plot_item, "legend") and plot_item.legend is not None:
plot_item.legend.setLabelTextColor(label_color)
# if legend is in plot item and theme is changed, has to be like that because of pg opt logic
for sample, label in plot_item.legend.items:
label_text = label.text
label.setText(label_text, color=label_color)
# update HistogramLUTItem
for histogram in histograms:
histogram.axis.setPen(pg.mkPen(color=axis_color))
histogram.axis.setTextPen(pg.mkPen(color=label_color))
# go through all pyqtgraph widgets and set background
children = itertools.chain.from_iterable(
top.findChildren(pg.GraphicsLayoutWidget) for top in app.topLevelWidgets()
)
pg.setConfigOptions(
foreground="d" if theme == "dark" else "k", background="k" if theme == "dark" else "w"
)
for pg_widget in children:
pg_widget.setBackground("k" if theme == "dark" else "w")
# now define stylesheet according to theme and apply it
style = bec_qthemes.load_stylesheet(theme)

View File

@@ -1,55 +1,30 @@
from __future__ import annotations
import itertools
from typing import Literal, Type
from typing import Type
from qtpy.QtWidgets import QWidget
from bec_widgets.cli.rpc.rpc_register import RPCRegister
class WidgetContainerUtils:
# We need one handler that checks if a WIDGET of a given name is already created for that DOCKAREA
# 1. If the name exists, then it depends whether the name was auto-generated -> add _1 to the name
# or alternatively raise an error that it can't be added again ( just raise an error)
# 2. Dock names in between docks should also be unique
@staticmethod
def has_name_valid_chars(name: str) -> bool:
"""Check if the name is valid.
def generate_unique_widget_id(container: dict, prefix: str = "widget") -> str:
"""
Generate a unique widget ID.
Args:
name(str): The name to be checked.
container(dict): The container of widgets.
prefix(str): The prefix of the widget ID.
Returns:
bool: True if the name is valid, False otherwise.
widget_id(str): The unique widget ID.
"""
if not name or len(name) > 256:
return False # Don't accept empty names or names longer than 256 characters
check_value = name.replace("_", "").replace("-", "")
if not check_value.isalnum() or not check_value.isascii():
return False
return True
@staticmethod
def generate_unique_name(name: str, list_of_names: list[str] | None = None) -> str:
"""Generate a unique ID.
Args:
name(str): The name of the widget.
Returns:
tuple (str): The unique name
"""
if list_of_names is None:
list_of_names = []
ii = 0
while ii < 1000: # 1000 is arbritrary!
name_candidate = f"{name}_{ii}"
if name_candidate not in list_of_names:
return name_candidate
ii += 1
raise ValueError("Could not generate a unique name after within 1000 attempts.")
existing_ids = set(container.keys())
for i in itertools.count(1):
widget_id = f"{prefix}_{i}"
if widget_id not in existing_ids:
return widget_id
@staticmethod
def find_first_widget_by_class(

View File

@@ -1,7 +1,4 @@
from __future__ import annotations
from collections import defaultdict
from typing import Any
import numpy as np
import pyqtgraph as pg
@@ -9,16 +6,13 @@ from qtpy.QtCore import QObject, Qt, Signal, Slot
from qtpy.QtWidgets import QApplication
class CrosshairScatterItem(pg.ScatterPlotItem):
class NonDownsamplingScatterPlotItem(pg.ScatterPlotItem):
def setDownsampling(self, ds=None, auto=None, method=None):
pass
def setClipToView(self, state):
pass
def setAlpha(self, *args, **kwargs):
pass
class Crosshair(QObject):
# QT Position of mouse cursor
@@ -53,15 +47,9 @@ class Crosshair(QObject):
self.v_line.skip_auto_range = True
self.h_line = pg.InfiniteLine(angle=0, movable=False)
self.h_line.skip_auto_range = True
# Add custom attribute to identify crosshair lines
self.v_line.is_crosshair = True
self.h_line.is_crosshair = True
self.plot_item.addItem(self.v_line, ignoreBounds=True)
self.plot_item.addItem(self.h_line, ignoreBounds=True)
# Initialize highlighted curve in a case of multiple curves
self.highlighted_curve_index = None
# Add TextItem to display coordinates
self.coord_label = pg.TextItem("", anchor=(1, 1), fill=(0, 0, 0, 100))
self.coord_label.setVisible(False) # Hide initially
@@ -82,7 +70,6 @@ class Crosshair(QObject):
self.plot_item.ctrl.downsampleSpin.valueChanged.connect(self.clear_markers)
# Initialize markers
self.items = []
self.marker_moved_1d = {}
self.marker_clicked_1d = {}
self.marker_2d = None
@@ -126,103 +113,61 @@ class Crosshair(QObject):
self.coord_label.fill = pg.mkBrush(label_bg_color)
self.coord_label.border = pg.mkPen(None)
@Slot(int)
def update_highlighted_curve(self, curve_index: int):
"""
Update the highlighted curve in the case of multiple curves in a plot item.
Args:
curve_index(int): The index of curve to highlight
"""
self.highlighted_curve_index = curve_index
self.clear_markers()
self.update_markers()
def update_markers(self):
"""Update the markers for the crosshair, creating new ones if necessary."""
if self.highlighted_curve_index is not None and hasattr(self.plot_item, "visible_curves"):
# Focus on the highlighted curve only
self.items = [self.plot_item.visible_curves[self.highlighted_curve_index]]
else:
# Handle all curves
self.items = self.plot_item.items
# Create or update markers
for item in self.items:
# Create new markers
for item in self.plot_item.items:
if isinstance(item, pg.PlotDataItem): # 1D plot
if item.name() in self.marker_moved_1d:
continue
pen = item.opts["pen"]
color = pen.color() if hasattr(pen, "color") else pg.mkColor(pen)
name = item.name() or str(id(item))
if name in self.marker_moved_1d:
# Update existing markers
marker_moved = self.marker_moved_1d[name]
marker_moved.setPen(pg.mkPen(color))
# Update clicked markers' brushes
for marker_clicked in self.marker_clicked_1d[name]:
alpha = marker_clicked.opts["brush"].color().alpha()
marker_clicked.setBrush(
pg.mkBrush(color.red(), color.green(), color.blue(), alpha)
)
# Update z-values
marker_moved.setZValue(item.zValue() + 1)
for marker_clicked in self.marker_clicked_1d[name]:
marker_clicked.setZValue(item.zValue() + 1)
else:
# Create new markers
marker_moved = CrosshairScatterItem(
size=10, pen=pg.mkPen(color), brush=pg.mkBrush(None)
)
marker_moved.skip_auto_range = True
marker_moved.is_crosshair = True
self.marker_moved_1d[name] = marker_moved
self.plot_item.addItem(marker_moved)
# Set marker z-value higher than the curve
marker_moved.setZValue(item.zValue() + 1)
marker_moved = NonDownsamplingScatterPlotItem(
size=10, pen=pg.mkPen(color), brush=pg.mkBrush(None)
)
marker_moved.skip_auto_range = True
self.marker_moved_1d[item.name()] = marker_moved
self.plot_item.addItem(marker_moved)
# Create glowing effect markers for clicked events
for size, alpha in [(18, 64), (14, 128), (10, 255)]:
marker_clicked = NonDownsamplingScatterPlotItem(
size=size,
pen=pg.mkPen(None),
brush=pg.mkBrush(color.red(), color.green(), color.blue(), alpha),
)
marker_clicked.skip_auto_range = True
self.marker_clicked_1d[item.name()] = marker_clicked
self.plot_item.addItem(marker_clicked)
# Create glowing effect markers for clicked events
marker_clicked_list = []
for size, alpha in [(18, 64), (14, 128), (10, 255)]:
marker_clicked = CrosshairScatterItem(
size=size,
pen=pg.mkPen(None),
brush=pg.mkBrush(color.red(), color.green(), color.blue(), alpha),
)
marker_clicked.skip_auto_range = True
marker_clicked.is_crosshair = True
self.plot_item.addItem(marker_clicked)
marker_clicked.setZValue(item.zValue() + 1)
marker_clicked_list.append(marker_clicked)
self.marker_clicked_1d[name] = marker_clicked_list
elif isinstance(item, pg.ImageItem): # 2D plot
if self.marker_2d is not None:
continue
self.marker_2d = pg.ROI(
[0, 0], size=[1, 1], pen=pg.mkPen("r", width=2), movable=False
)
self.marker_2d.skip_auto_range = True
self.plot_item.addItem(self.marker_2d)
def snap_to_data(
self, x: float, y: float
) -> tuple[None, None] | tuple[defaultdict[Any, list], defaultdict[Any, list]]:
def snap_to_data(self, x, y) -> tuple[defaultdict[list], defaultdict[list]]:
"""
Finds the nearest data points to the given x and y coordinates.
Args:
x(float): The x-coordinate of the mouse cursor
y(float): The y-coordinate of the mouse cursor
x: The x-coordinate of the mouse cursor
y: The y-coordinate of the mouse cursor
Returns:
tuple: x and y values snapped to the nearest data
"""
y_values = defaultdict(list)
x_values = defaultdict(list)
image_2d = None
# Iterate through items in the plot
for item in self.items:
for item in self.plot_item.items:
if isinstance(item, pg.PlotDataItem): # 1D plot
name = item.name() or str(id(item))
name = item.name()
plot_data = item._getDisplayDataset()
if plot_data is None:
continue
@@ -241,9 +186,9 @@ class Crosshair(QObject):
y_values[name] = closest_y
x_values[name] = closest_x
elif isinstance(item, pg.ImageItem): # 2D plot
name = item.config.monitor or str(id(item))
name = item.config.monitor
image_2d = item.image
# Clip the x and y values to the image dimensions to avoid out of bounds errors
# clip the x and y values to the image dimensions to avoid out of bounds errors
y_values[name] = int(np.clip(y, 0, image_2d.shape[1] - 1))
x_values[name] = int(np.clip(x, 0, image_2d.shape[0] - 1))
@@ -311,9 +256,9 @@ class Crosshair(QObject):
# not sure how we got here, but just to be safe...
return
for item in self.items:
for item in self.plot_item.items:
if isinstance(item, pg.PlotDataItem):
name = item.name() or str(id(item))
name = item.name()
x, y = x_snap_values[name], y_snap_values[name]
if x is None or y is None:
continue
@@ -326,7 +271,7 @@ class Crosshair(QObject):
)
self.coordinatesChanged1D.emit(coordinate_to_emit)
elif isinstance(item, pg.ImageItem):
name = item.config.monitor or str(id(item))
name = item.config.monitor
x, y = x_snap_values[name], y_snap_values[name]
if x is None or y is None:
continue
@@ -364,14 +309,13 @@ class Crosshair(QObject):
# not sure how we got here, but just to be safe...
return
for item in self.items:
for item in self.plot_item.items:
if isinstance(item, pg.PlotDataItem):
name = item.name() or str(id(item))
name = item.name()
x, y = x_snap_values[name], y_snap_values[name]
if x is None or y is None:
continue
for marker_clicked in self.marker_clicked_1d[name]:
marker_clicked.setData([x], [y])
self.marker_clicked_1d[name].setData([x], [y])
x_snapped_scaled, y_snapped_scaled = self.scale_emitted_coordinates(x, y)
coordinate_to_emit = (
name,
@@ -380,7 +324,7 @@ class Crosshair(QObject):
)
self.coordinatesClicked1D.emit(coordinate_to_emit)
elif isinstance(item, pg.ImageItem):
name = item.config.monitor or str(id(item))
name = item.config.monitor
x, y = x_snap_values[name], y_snap_values[name]
if x is None or y is None:
continue
@@ -393,12 +337,9 @@ class Crosshair(QObject):
def clear_markers(self):
"""Clears the markers from the plot."""
for marker in self.marker_moved_1d.values():
self.plot_item.removeItem(marker)
for markers in self.marker_clicked_1d.values():
for marker in markers:
self.plot_item.removeItem(marker)
self.marker_moved_1d.clear()
self.marker_clicked_1d.clear()
marker.clear()
for marker in self.marker_clicked_1d.values():
marker.clear()
def scale_emitted_coordinates(self, x, y):
"""Scales the emitted coordinates if the axes are in log scale.
@@ -424,17 +365,9 @@ class Crosshair(QObject):
"""
x, y = pos
x_scaled, y_scaled = self.scale_emitted_coordinates(x, y)
text = f"({x_scaled:.{self.precision}g}, {y_scaled:.{self.precision}g})"
for item in self.items:
if isinstance(item, pg.ImageItem):
image = item.image
ix = int(np.clip(x, 0, image.shape[0] - 1))
iy = int(np.clip(y, 0, image.shape[1] - 1))
intensity = image[ix, iy]
text += f"\nIntensity: {intensity:.{self.precision}g}"
break
# Update coordinate label
self.coord_label.setText(text)
# # Update coordinate label
self.coord_label.setText(f"({x_scaled:.{self.precision}g}, {y_scaled:.{self.precision}g})")
self.coord_label.setPos(x, y)
self.coord_label.setVisible(True)
@@ -450,9 +383,6 @@ class Crosshair(QObject):
self.clear_markers()
def cleanup(self):
if self.marker_2d is not None:
self.plot_item.removeItem(self.marker_2d)
self.marker_2d = None
self.plot_item.removeItem(self.v_line)
self.plot_item.removeItem(self.h_line)
self.plot_item.removeItem(self.coord_label)

View File

@@ -22,9 +22,7 @@ class EntryValidator:
if entry is None or entry == "":
entry = next(iter(device._hints), name) if hasattr(device, "_hints") else name
if entry not in description:
raise ValueError(
f"Entry '{entry}' not found in device '{name}' signals. Available signals: {description.keys()}"
)
raise ValueError(f"Entry '{entry}' not found in device '{name}' signals")
return entry

View File

@@ -143,7 +143,7 @@ class DesignerPluginGenerator:
if __name__ == "__main__": # pragma: no cover
# from bec_widgets.widgets.bec_queue.bec_queue import BECQueue
from bec_widgets.widgets.utility.spinner import SpinnerWidget
from bec_widgets.widgets.spinner.spinner import SpinnerWidget
generator = DesignerPluginGenerator(SpinnerWidget)
generator.run(validate=False)

View File

@@ -148,7 +148,10 @@ class BECTickItem(BECIndicatorItem):
def cleanup(self) -> None:
"""Cleanup the item"""
self.remove_from_plot()
self.tick_item = None
if self.tick_item is not None:
self.tick_item.close()
self.tick_item.deleteLater()
self.tick_item = None
class BECArrowItem(BECIndicatorItem):

View File

@@ -53,7 +53,7 @@ class BECClassInfo:
obj: type
is_connector: bool = False
is_widget: bool = False
is_plugin: bool = False
is_top_level: bool = False
class BECClassContainer:
@@ -88,14 +88,14 @@ class BECClassContainer:
"""
Get all top-level classes.
"""
return [info.obj for info in self.collection if info.is_plugin]
return [info.obj for info in self.collection if info.is_top_level]
@property
def plugins(self):
"""
Get all plugins. These are all classes that are on the top level and are widgets.
"""
return [info.obj for info in self.collection if info.is_widget and info.is_plugin]
return [info.obj for info in self.collection if info.is_widget and info.is_top_level]
@property
def widgets(self):
@@ -109,17 +109,10 @@ class BECClassContainer:
"""
Get all top-level classes that are RPC-enabled. These are all classes that users can choose from.
"""
return [info.obj for info in self.collection if info.is_plugin and info.is_connector]
@property
def classes(self):
"""
Get all classes.
"""
return [info.obj for info in self.collection]
return [info.obj for info in self.collection if info.is_top_level and info.is_connector]
def get_custom_classes(repo_name: str) -> BECClassContainer:
def get_rpc_classes(repo_name: str) -> BECClassContainer:
"""
Get all RPC-enabled classes in the specified repository.
@@ -160,8 +153,6 @@ def get_custom_classes(repo_name: str) -> BECClassContainer:
issubclass(obj, QWidget) or issubclass(obj, QGraphicsWidget)
):
class_info.is_top_level = True
if hasattr(obj, "PLUGIN") and obj.PLUGIN:
class_info.is_plugin = True
collection.add_class(class_info)
return collection

View File

@@ -4,7 +4,7 @@ from qtpy import PYQT6, PYSIDE6, QT_VERSION
from qtpy.QtCore import QFile, QIODevice
from bec_widgets.utils.generate_designer_plugin import DesignerPluginInfo
from bec_widgets.utils.plugin_utils import get_custom_classes
from bec_widgets.utils.plugin_utils import get_rpc_classes
if PYSIDE6:
from PySide6.QtUiTools import QUiLoader
@@ -30,7 +30,7 @@ class UILoader:
def __init__(self, parent=None):
self.parent = parent
widgets = get_custom_classes("bec_widgets").classes
widgets = get_rpc_classes("bec_widgets").top_level_classes
self.custom_widgets = {widget.__name__: widget for widget in widgets}

View File

@@ -1,5 +1,6 @@
# pylint: disable=no-name-in-module
from abc import ABC, abstractmethod
from typing import Literal
from qtpy.QtWidgets import (
QApplication,
@@ -15,8 +16,6 @@ from qtpy.QtWidgets import (
QWidget,
)
from bec_widgets.widgets.utility.toggle.toggle import ToggleSwitch
class WidgetHandler(ABC):
"""Abstract base class for all widget handlers."""
@@ -29,15 +28,6 @@ class WidgetHandler(ABC):
def set_value(self, widget: QWidget, value):
"""Set a value on the widget instance."""
def connect_change_signal(self, widget: QWidget, slot):
"""
Connect a change signal from this widget to the given slot.
If the widget type doesn't have a known "value changed" signal, do nothing.
slot: a function accepting two arguments (widget, value)
"""
pass
class LineEditHandler(WidgetHandler):
"""Handler for QLineEdit widgets."""
@@ -48,9 +38,6 @@ class LineEditHandler(WidgetHandler):
def set_value(self, widget: QLineEdit, value: str) -> None:
widget.setText(value)
def connect_change_signal(self, widget: QLineEdit, slot):
widget.textChanged.connect(lambda text, w=widget: slot(w, text))
class ComboBoxHandler(WidgetHandler):
"""Handler for QComboBox widgets."""
@@ -66,11 +53,6 @@ class ComboBoxHandler(WidgetHandler):
if isinstance(value, int):
widget.setCurrentIndex(value)
def connect_change_signal(self, widget: QComboBox, slot):
# currentIndexChanged(int) or currentIndexChanged(str) both possible.
# We use currentIndexChanged(int) for a consistent behavior.
widget.currentIndexChanged.connect(lambda idx, w=widget: slot(w, self.get_value(w)))
class TableWidgetHandler(WidgetHandler):
"""Handler for QTableWidget widgets."""
@@ -90,16 +72,6 @@ class TableWidgetHandler(WidgetHandler):
item = QTableWidgetItem(str(cell_value))
widget.setItem(row, col, item)
def connect_change_signal(self, widget: QTableWidget, slot):
# If desired, we could connect cellChanged(row, col) and then fetch all data.
# This might be noisy if table is large.
# For demonstration, connect cellChanged to update entire table value.
def on_cell_changed(row, col, w=widget):
val = self.get_value(w)
slot(w, val)
widget.cellChanged.connect(on_cell_changed)
class SpinBoxHandler(WidgetHandler):
"""Handler for QSpinBox and QDoubleSpinBox widgets."""
@@ -110,9 +82,6 @@ class SpinBoxHandler(WidgetHandler):
def set_value(self, widget, value):
widget.setValue(value)
def connect_change_signal(self, widget: QSpinBox | QDoubleSpinBox, slot):
widget.valueChanged.connect(lambda val, w=widget: slot(w, val))
class CheckBoxHandler(WidgetHandler):
"""Handler for QCheckBox widgets."""
@@ -123,22 +92,6 @@ class CheckBoxHandler(WidgetHandler):
def set_value(self, widget, value):
widget.setChecked(value)
def connect_change_signal(self, widget: QCheckBox, slot):
widget.toggled.connect(lambda val, w=widget: slot(w, val))
class ToggleSwitchHandler(WidgetHandler):
"""Handler for ToggleSwitch widgets."""
def get_value(self, widget, **kwargs):
return widget.checked
def set_value(self, widget, value):
widget.checked = value
def connect_change_signal(self, widget: ToggleSwitch, slot):
widget.enabled.connect(lambda val, w=widget: slot(w, val))
class LabelHandler(WidgetHandler):
"""Handler for QLabel widgets."""
@@ -146,15 +99,12 @@ class LabelHandler(WidgetHandler):
def get_value(self, widget, **kwargs):
return widget.text()
def set_value(self, widget: QLabel, value):
def set_value(self, widget, value):
widget.setText(value)
# QLabel typically doesn't have user-editable changes. No signal to connect.
# If needed, this can remain empty.
class WidgetIO:
"""Public interface for getting, setting values and connecting signals using handler mapping"""
"""Public interface for getting and setting values using handler mapping"""
_handlers = {
QLineEdit: LineEditHandler,
@@ -164,7 +114,6 @@ class WidgetIO:
QDoubleSpinBox: SpinBoxHandler,
QCheckBox: CheckBoxHandler,
QLabel: LabelHandler,
ToggleSwitch: ToggleSwitchHandler,
}
@staticmethod
@@ -199,17 +148,6 @@ class WidgetIO:
elif not ignore_errors:
raise ValueError(f"No handler for widget type: {type(widget)}")
@staticmethod
def connect_widget_change_signal(widget, slot):
"""
Connect the widget's value-changed signal to a generic slot function (widget, value).
This now delegates the logic to the widget's handler.
"""
handler_class = WidgetIO._find_handler(widget)
if handler_class:
handler = handler_class()
handler.connect_change_signal(widget, slot)
@staticmethod
def check_and_adjust_limits(spin_box: QDoubleSpinBox, number: float):
"""
@@ -371,8 +309,8 @@ class WidgetHierarchy:
WidgetHierarchy.import_config_from_dict(child, widget_config, set_values)
# Example usage
def hierarchy_example(): # pragma: no cover
# Example application to demonstrate the usage of the functions
if __name__ == "__main__": # pragma: no cover
app = QApplication([])
# Create instance of WidgetHierarchy
@@ -427,37 +365,3 @@ def hierarchy_example(): # pragma: no cover
print(f"Config dict new REDUCED: {config_dict_new_reduced}")
app.exec()
def widget_io_signal_example(): # pragma: no cover
app = QApplication([])
main_widget = QWidget()
layout = QVBoxLayout(main_widget)
line_edit = QLineEdit(main_widget)
combo_box = QComboBox(main_widget)
spin_box = QSpinBox(main_widget)
combo_box.addItems(["Option 1", "Option 2", "Option 3"])
layout.addWidget(line_edit)
layout.addWidget(combo_box)
layout.addWidget(spin_box)
main_widget.show()
def universal_slot(w, val):
print(f"Widget {w.objectName() or w} changed, new value: {val}")
# Connect all supported widgets through their handlers
WidgetIO.connect_widget_change_signal(line_edit, universal_slot)
WidgetIO.connect_widget_change_signal(combo_box, universal_slot)
WidgetIO.connect_widget_change_signal(spin_box, universal_slot)
app.exec_()
if __name__ == "__main__": # pragma: no cover
# Change example function to test different scenarios
# hierarchy_example()
widget_io_signal_example()

View File

@@ -1,223 +0,0 @@
from __future__ import annotations
from bec_lib import bec_logger
from qtpy.QtCore import QSettings
from qtpy.QtWidgets import (
QApplication,
QCheckBox,
QFileDialog,
QHBoxLayout,
QLabel,
QLineEdit,
QPushButton,
QSpinBox,
QVBoxLayout,
QWidget,
)
logger = bec_logger.logger
class WidgetStateManager:
"""
A class to manage the state of a widget by saving and loading the state to and from a INI file.
Args:
widget(QWidget): The widget to manage the state for.
"""
def __init__(self, widget):
self.widget = widget
def save_state(self, filename: str = None):
"""
Save the state of the widget to an INI file.
Args:
filename(str): The filename to save the state to.
"""
if not filename:
filename, _ = QFileDialog.getSaveFileName(
self.widget, "Save Settings", "", "INI Files (*.ini)"
)
if filename:
settings = QSettings(filename, QSettings.IniFormat)
self._save_widget_state_qsettings(self.widget, settings)
def load_state(self, filename: str = None):
"""
Load the state of the widget from an INI file.
Args:
filename(str): The filename to load the state from.
"""
if not filename:
filename, _ = QFileDialog.getOpenFileName(
self.widget, "Load Settings", "", "INI Files (*.ini)"
)
if filename:
settings = QSettings(filename, QSettings.IniFormat)
self._load_widget_state_qsettings(self.widget, settings)
def _save_widget_state_qsettings(self, widget: QWidget, settings: QSettings):
"""
Save the state of the widget to QSettings.
Args:
widget(QWidget): The widget to save the state for.
settings(QSettings): The QSettings object to save the state to.
"""
if widget.property("skip_settings") is True:
return
meta = widget.metaObject()
widget_name = self._get_full_widget_name(widget)
settings.beginGroup(widget_name)
for i in range(meta.propertyCount()):
prop = meta.property(i)
name = prop.name()
if (
name == "objectName"
or not prop.isReadable()
or not prop.isWritable()
or not prop.isStored() # can be extended to fine filter
):
continue
value = widget.property(name)
settings.setValue(name, value)
settings.endGroup()
# Recursively process children (only if they aren't skipped)
for child in widget.children():
if (
child.objectName()
and child.property("skip_settings") is not True
and not isinstance(child, QLabel)
):
self._save_widget_state_qsettings(child, settings)
def _load_widget_state_qsettings(self, widget: QWidget, settings: QSettings):
"""
Load the state of the widget from QSettings.
Args:
widget(QWidget): The widget to load the state for.
settings(QSettings): The QSettings object to load the state from.
"""
if widget.property("skip_settings") is True:
return
meta = widget.metaObject()
widget_name = self._get_full_widget_name(widget)
settings.beginGroup(widget_name)
for i in range(meta.propertyCount()):
prop = meta.property(i)
name = prop.name()
if settings.contains(name):
value = settings.value(name)
widget.setProperty(name, value)
settings.endGroup()
# Recursively process children (only if they aren't skipped)
for child in widget.children():
if (
child.objectName()
and child.property("skip_settings") is not True
and not isinstance(child, QLabel)
):
self._load_widget_state_qsettings(child, settings)
def _get_full_widget_name(self, widget: QWidget):
"""
Get the full name of the widget including its parent names.
Args:
widget(QWidget): The widget to get the full name for.
Returns:
str: The full name of the widget.
"""
name = widget.objectName()
parent = widget.parent()
while parent:
obj_name = parent.objectName() or parent.metaObject().className()
name = obj_name + "." + name
parent = parent.parent()
return name
class ExampleApp(QWidget): # pragma: no cover:
def __init__(self):
super().__init__()
self.setObjectName("MainWindow")
self.setWindowTitle("State Manager Example")
layout = QVBoxLayout(self)
# A line edit to store some user text
self.line_edit = QLineEdit(self)
self.line_edit.setObjectName("MyLineEdit")
self.line_edit.setPlaceholderText("Enter some text here...")
layout.addWidget(self.line_edit)
# A spin box to hold a numeric value
self.spin_box = QSpinBox(self)
self.spin_box.setObjectName("MySpinBox")
self.spin_box.setRange(0, 100)
layout.addWidget(self.spin_box)
# A checkbox to hold a boolean value
self.check_box = QCheckBox("Enable feature?", self)
self.check_box.setObjectName("MyCheckBox")
layout.addWidget(self.check_box)
# A checkbox that we want to skip
self.check_box_skip = QCheckBox("Enable feature - skip save?", self)
self.check_box_skip.setProperty("skip_state", True)
self.check_box_skip.setObjectName("MyCheckBoxSkip")
layout.addWidget(self.check_box_skip)
# CREATE A "SIDE PANEL" with nested structure and skip all what is inside
self.side_panel = QWidget(self)
self.side_panel.setObjectName("SidePanel")
self.side_panel.setProperty("skip_settings", True) # skip the ENTIRE panel
layout.addWidget(self.side_panel)
# Put some sub-widgets inside side_panel
panel_layout = QVBoxLayout(self.side_panel)
self.panel_label = QLabel("Label in side panel", self.side_panel)
self.panel_label.setObjectName("PanelLabel")
panel_layout.addWidget(self.panel_label)
self.panel_edit = QLineEdit(self.side_panel)
self.panel_edit.setObjectName("PanelLineEdit")
self.panel_edit.setPlaceholderText("I am inside side panel")
panel_layout.addWidget(self.panel_edit)
self.panel_checkbox = QCheckBox("Enable feature in side panel?", self.side_panel)
self.panel_checkbox.setObjectName("PanelCheckBox")
panel_layout.addWidget(self.panel_checkbox)
# Save/Load buttons
button_layout = QHBoxLayout()
self.save_button = QPushButton("Save State", self)
self.load_button = QPushButton("Load State", self)
button_layout.addWidget(self.save_button)
button_layout.addWidget(self.load_button)
layout.addLayout(button_layout)
# Create the state manager
self.state_manager = WidgetStateManager(self)
# Connect buttons
self.save_button.clicked.connect(lambda: self.state_manager.save_state())
self.load_button.clicked.connect(lambda: self.state_manager.load_state())
if __name__ == "__main__": # pragma: no cover:
import sys
app = QApplication(sys.argv)
w = ExampleApp()
w.show()
sys.exit(app.exec_())

View File

@@ -0,0 +1 @@

View File

@@ -59,7 +59,7 @@ class DeviceInputBase(BECWidget):
ReadoutPriority.ON_REQUEST: "readout_on_request",
}
def __init__(self, client=None, config=None, gui_id: str | None = None, **kwargs):
def __init__(self, client=None, config=None, gui_id: str = None):
if config is None:
config = DeviceInputConfig(widget_class=self.__class__.__name__)
@@ -67,7 +67,7 @@ class DeviceInputBase(BECWidget):
if isinstance(config, dict):
config = DeviceInputConfig(**config)
self.config = config
super().__init__(client=client, config=config, gui_id=gui_id, theme_update=True, **kwargs)
super().__init__(client=client, config=config, gui_id=gui_id, theme_update=True)
self.get_bec_shortcuts()
self._device_filter = []
self._readout_filter = []

View File

@@ -35,14 +35,14 @@ class DeviceSignalInputBase(BECWidget):
Kind.config: "include_config_signals",
}
def __init__(self, client=None, config=None, gui_id: str = None, **kwargs):
def __init__(self, client=None, config=None, gui_id: str = None):
if config is None:
config = DeviceSignalInputBaseConfig(widget_class=self.__class__.__name__)
else:
if isinstance(config, dict):
config = DeviceSignalInputBaseConfig(**config)
self.config = config
super().__init__(client=client, config=config, gui_id=gui_id, **kwargs)
super().__init__(client=client, config=config, gui_id=gui_id)
self._device = None
self.get_bec_shortcuts()

View File

@@ -4,7 +4,7 @@
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from bec_widgets.utils.bec_designer import designer_material_icon
from bec_widgets.widgets.progress.bec_progressbar.bec_progressbar import BECProgressBar
from bec_widgets.widgets.bec_progressbar.bec_progressbar import BECProgressBar
DOM_XML = """
<ui language='c++'>

View File

@@ -14,7 +14,6 @@ class BECProgressBar(BECWidget, QWidget):
A custom progress bar with smooth transitions. The displayed text can be customized using a template.
"""
PLUGIN = True
USER_ACCESS = [
"set_value",
"set_maximum",
@@ -24,8 +23,8 @@ class BECProgressBar(BECWidget, QWidget):
]
ICON_NAME = "page_control"
def __init__(self, parent=None, client=None, config=None, gui_id=None, **kwargs):
super().__init__(client=client, config=config, gui_id=gui_id, **kwargs)
def __init__(self, parent=None, client=None, config=None, gui_id=None):
super().__init__(client=client, config=config, gui_id=gui_id)
QWidget.__init__(self, parent=parent)
accent_colors = get_accent_colors()

View File

@@ -6,9 +6,7 @@ def main(): # pragma: no cover
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from bec_widgets.widgets.progress.bec_progressbar.bec_progress_bar_plugin import (
BECProgressBarPlugin,
)
from bec_widgets.widgets.bec_progressbar.bec_progress_bar_plugin import BECProgressBarPlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(BECProgressBarPlugin())

View File

@@ -10,10 +10,10 @@ from bec_widgets.qt_utils.compact_popup import CompactPopupWidget
from bec_widgets.qt_utils.toolbar import ModularToolBar, SeparatorAction, WidgetAction
from bec_widgets.utils.bec_connector import ConnectionConfig
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.widgets.control.buttons.button_abort.button_abort import AbortButton
from bec_widgets.widgets.control.buttons.button_reset.button_reset import ResetButton
from bec_widgets.widgets.control.buttons.button_resume.button_resume import ResumeButton
from bec_widgets.widgets.control.buttons.stop_button.stop_button import StopButton
from bec_widgets.widgets.button_abort.button_abort import AbortButton
from bec_widgets.widgets.button_reset.button_reset import ResetButton
from bec_widgets.widgets.button_resume.button_resume import ResumeButton
from bec_widgets.widgets.stop_button.stop_button import StopButton
class BECQueue(BECWidget, CompactPopupWidget):
@@ -21,7 +21,6 @@ class BECQueue(BECWidget, CompactPopupWidget):
Widget to display the BEC queue.
"""
PLUGIN = True
ICON_NAME = "edit_note"
status_colors = {
"STOPPED": "red",
@@ -42,9 +41,8 @@ class BECQueue(BECWidget, CompactPopupWidget):
config: ConnectionConfig = None,
gui_id: str = None,
refresh_upon_start: bool = True,
**kwargs,
):
super().__init__(client=client, config=config, gui_id=gui_id, **kwargs)
super().__init__(client, config, gui_id)
CompactPopupWidget.__init__(self, parent=parent, layout=QVBoxLayout)
self.layout.setSpacing(0)
self.layout.setContentsMargins(0, 0, 0, 0)

View File

@@ -6,7 +6,7 @@ from qtpy.QtDesigner import QDesignerCustomWidgetInterface
import bec_widgets
from bec_widgets.utils.bec_designer import designer_material_icon
from bec_widgets.widgets.services.bec_queue.bec_queue import BECQueue
from bec_widgets.widgets.bec_queue.bec_queue import BECQueue
DOM_XML = """
<ui language='c++'>

View File

@@ -6,7 +6,7 @@ def main(): # pragma: no cover
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from bec_widgets.widgets.services.bec_queue.bec_queue_plugin import BECQueuePlugin
from bec_widgets.widgets.bec_queue.bec_queue_plugin import BECQueuePlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(BECQueuePlugin())

View File

@@ -4,17 +4,18 @@ The widget automatically updates the status of all running BEC services, and dis
from __future__ import annotations
import sys
from collections import defaultdict
from dataclasses import dataclass
from typing import TYPE_CHECKING
from bec_lib.utils.import_utils import lazy_import_from
from qtpy.QtCore import QObject, QTimer, Signal, Slot
from qtpy.QtWidgets import QHBoxLayout, QTreeWidget, QTreeWidgetItem
from qtpy.QtWidgets import QHBoxLayout, QTreeWidget, QTreeWidgetItem, QWidget
from bec_widgets.qt_utils.compact_popup import CompactPopupWidget
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.widgets.services.bec_status_box.status_item import StatusItem
from bec_widgets.widgets.bec_status_box.status_item import StatusItem
if TYPE_CHECKING:
from bec_lib.client import BECClient
@@ -74,7 +75,6 @@ class BECStatusBox(BECWidget, CompactPopupWidget):
gui_id Optional(str): The unique id for the widget. Defaults to None.
"""
PLUGIN = True
CORE_SERVICES = ["DeviceServer", "ScanServer", "SciHub", "ScanBundler", "FileWriterManager"]
service_update = Signal(BECServiceInfoContainer)
@@ -87,9 +87,8 @@ class BECStatusBox(BECWidget, CompactPopupWidget):
client: BECClient = None,
bec_service_status_mixin: BECServiceStatusMixin = None,
gui_id: str = None,
**kwargs,
):
super().__init__(client=client, gui_id=gui_id, **kwargs)
super().__init__(client=client, gui_id=gui_id)
CompactPopupWidget.__init__(self, parent=parent, layout=QHBoxLayout)
self.box_name = box_name

View File

@@ -6,7 +6,7 @@ from qtpy.QtDesigner import QDesignerCustomWidgetInterface
import bec_widgets
from bec_widgets.utils.bec_designer import designer_material_icon
from bec_widgets.widgets.services.bec_status_box.bec_status_box import BECStatusBox
from bec_widgets.widgets.bec_status_box.bec_status_box import BECStatusBox
DOM_XML = """
<ui language='c++'>

View File

@@ -6,7 +6,7 @@ def main(): # pragma: no cover
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from bec_widgets.widgets.services.bec_status_box.bec_status_box_plugin import BECStatusBoxPlugin
from bec_widgets.widgets.bec_status_box.bec_status_box_plugin import BECStatusBoxPlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(BECStatusBoxPlugin())

View File

@@ -4,7 +4,7 @@
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from bec_widgets.utils.bec_designer import designer_material_icon
from bec_widgets.widgets.control.buttons.button_abort.button_abort import AbortButton
from bec_widgets.widgets.button_abort.button_abort import AbortButton
DOM_XML = """
<ui language='c++'>

View File

@@ -9,20 +9,12 @@ from bec_widgets.utils.bec_widget import BECWidget
class AbortButton(BECWidget, QWidget):
"""A button that abort the scan."""
PLUGIN = True
ICON_NAME = "cancel"
def __init__(
self,
parent=None,
client=None,
config=None,
gui_id=None,
toolbar=False,
scan_id=None,
**kwargs,
self, parent=None, client=None, config=None, gui_id=None, toolbar=False, scan_id=None
):
super().__init__(client=client, config=config, gui_id=gui_id, **kwargs)
super().__init__(client=client, config=config, gui_id=gui_id)
QWidget.__init__(self, parent=parent)
self.get_bec_shortcuts()

View File

@@ -6,9 +6,7 @@ def main(): # pragma: no cover
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from bec_widgets.widgets.control.buttons.button_abort.abort_button_plugin import (
AbortButtonPlugin,
)
from bec_widgets.widgets.button_abort.abort_button_plugin import AbortButtonPlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(AbortButtonPlugin())

View File

@@ -9,11 +9,10 @@ from bec_widgets.utils.bec_widget import BECWidget
class ResetButton(BECWidget, QWidget):
"""A button that resets the scan queue."""
PLUGIN = True
ICON_NAME = "restart_alt"
def __init__(self, parent=None, client=None, config=None, gui_id=None, toolbar=False, **kwargs):
super().__init__(client=client, config=config, gui_id=gui_id, **kwargs)
def __init__(self, parent=None, client=None, config=None, gui_id=None, toolbar=False):
super().__init__(client=client, config=config, gui_id=gui_id)
QWidget.__init__(self, parent=parent)
self.get_bec_shortcuts()

View File

@@ -6,9 +6,7 @@ def main(): # pragma: no cover
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from bec_widgets.widgets.control.buttons.button_reset.reset_button_plugin import (
ResetButtonPlugin,
)
from bec_widgets.widgets.button_reset.reset_button_plugin import ResetButtonPlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(ResetButtonPlugin())

View File

@@ -4,7 +4,7 @@
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from bec_widgets.utils.bec_designer import designer_material_icon
from bec_widgets.widgets.control.buttons.button_reset.button_reset import ResetButton
from bec_widgets.widgets.button_reset.button_reset import ResetButton
DOM_XML = """
<ui language='c++'>

View File

@@ -9,11 +9,10 @@ from bec_widgets.utils.bec_widget import BECWidget
class ResumeButton(BECWidget, QWidget):
"""A button that continue scan queue."""
PLUGIN = True
ICON_NAME = "resume"
def __init__(self, parent=None, client=None, config=None, gui_id=None, toolbar=False, **kwargs):
super().__init__(client=client, config=config, gui_id=gui_id, **kwargs)
def __init__(self, parent=None, client=None, config=None, gui_id=None, toolbar=False):
super().__init__(client=client, config=config, gui_id=gui_id)
QWidget.__init__(self, parent=parent)
self.get_bec_shortcuts()

View File

@@ -6,9 +6,7 @@ def main(): # pragma: no cover
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from bec_widgets.widgets.control.buttons.button_resume.resume_button_plugin import (
ResumeButtonPlugin,
)
from bec_widgets.widgets.button_resume.resume_button_plugin import ResumeButtonPlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(ResumeButtonPlugin())

View File

@@ -4,7 +4,7 @@
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from bec_widgets.utils.bec_designer import designer_material_icon
from bec_widgets.widgets.control.buttons.button_resume.button_resume import ResumeButton
from bec_widgets.widgets.button_resume.button_resume import ResumeButton
DOM_XML = """
<ui language='c++'>

View File

@@ -17,7 +17,6 @@ class ColorButton(QWidget):
color_selected = Signal(str)
PLUGIN = True
ICON_NAME = "colors"
def __init__(self, *args, **kwargs):

View File

@@ -4,7 +4,7 @@ from qtpy.QtDesigner import QDesignerCustomWidgetInterface
import bec_widgets
from bec_widgets.utils.bec_designer import designer_material_icon
from bec_widgets.widgets.utility.visual.color_button.color_button import ColorButton
from bec_widgets.widgets.color_button.color_button import ColorButton
DOM_XML = """
<ui language='c++'>

View File

@@ -6,9 +6,7 @@ def main(): # pragma: no cover
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from bec_widgets.widgets.utility.visual.color_button.color_button_plugin import (
ColorButtonPlugin,
)
from bec_widgets.widgets.color_button.color_button_plugin import ColorButtonPlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(ColorButtonPlugin())

View File

@@ -47,7 +47,6 @@ class ColormapSelector(QWidget):
colormap_changed_signal = Signal(str)
ICON_NAME = "palette"
PLUGIN = True
def __init__(self, parent=None, default_colormaps=None):
super().__init__(parent=parent)

View File

@@ -6,7 +6,7 @@ from qtpy.QtDesigner import QDesignerCustomWidgetInterface
import bec_widgets
from bec_widgets.utils.bec_designer import designer_material_icon
from bec_widgets.widgets.utility.visual.colormap_selector.colormap_selector import ColormapSelector
from bec_widgets.widgets.colormap_selector.colormap_selector import ColormapSelector
DOM_XML = """
<ui language='c++'>

View File

@@ -6,7 +6,7 @@ def main(): # pragma: no cover
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from bec_widgets.widgets.utility.visual.colormap_selector.colormap_selector_plugin import (
from bec_widgets.widgets.colormap_selector.colormap_selector_plugin import (
ColormapSelectorPlugin,
)

View File

@@ -4,7 +4,7 @@
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from bec_widgets.utils.bec_designer import designer_material_icon
from bec_widgets.widgets.utility.visual.colormap_widget.colormap_widget import BECColorMapWidget
from bec_widgets.widgets.colormap_widget.colormap_widget import BECColorMapWidget
DOM_XML = """
<ui language='c++'>

View File

@@ -10,10 +10,9 @@ class BECColorMapWidget(BECWidget, QWidget):
colormap_changed_signal = Signal(str)
ICON_NAME = "palette"
USER_ACCESS = ["colormap"]
PLUGIN = True
def __init__(self, parent=None, cmap: str = "magma", **kwargs):
super().__init__(**kwargs)
def __init__(self, parent=None, cmap: str = "magma"):
super().__init__()
QWidget.__init__(self, parent=parent)
# Create the ColorMapButton

View File

@@ -6,7 +6,7 @@ def main(): # pragma: no cover
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from bec_widgets.widgets.utility.visual.colormap_widget.bec_color_map_widget_plugin import (
from bec_widgets.widgets.colormap_widget.bec_color_map_widget_plugin import (
BECColorMapWidgetPlugin,
)

View File

@@ -10,13 +10,9 @@ import fcntl
import html
import os
import pty
import re
import signal
import sys
import time
import pyte
from pygments.token import Token
from pyte.screens import History
from qtpy import QtCore, QtGui, QtWidgets
from qtpy.QtCore import Property as pyqtProperty
@@ -236,17 +232,12 @@ class Backend(QtCore.QObject):
class BECConsole(QtWidgets.QWidget):
"""Container widget for the terminal text area"""
PLUGIN = True
ICON_NAME = "terminal"
prompt = pyqtSignal(bool)
def __init__(self, parent=None, cols=132):
super().__init__(parent)
self.term = _TerminalWidget(self, cols, rows=43)
self.term.prompt.connect(self.prompt) # forward signal from term to this widget
self.scroll_bar = QScrollBar(Qt.Vertical, self)
# self.scroll_bar.hide()
layout = QHBoxLayout(self)
@@ -328,38 +319,9 @@ class BECConsole(QtWidgets.QWidget):
def start(self, deactivate_ctrl_d=True):
self.term.start(deactivate_ctrl_d=deactivate_ctrl_d)
def push(self, text, hit_return=False):
def push(self, text):
"""Push some text to the terminal"""
return self.term.push(text, hit_return=hit_return)
def execute_command(self, command):
self.push(command, hit_return=True)
def set_prompt_tokens(self, *tokens):
"""Prepare regexp to identify prompt, based on tokens
Tokens are returned from get_ipython().prompts.in_prompt_tokens()
"""
regex_parts = []
for token_type, token_value in tokens:
if token_type == Token.PromptNum: # Handle dynamic prompt number
regex_parts.append(r"[\d\?]+") # Match one or more digits or '?'
else:
# Escape other prompt parts (e.g., "In [", "]: ")
if not token_value:
regex_parts.append(".+?") # arbitrary string
else:
regex_parts.append(re.escape(token_value))
# Combine into a single regex
prompt_pattern = "".join(regex_parts)
self.term._prompt_re = re.compile(prompt_pattern + r"\s*$")
def terminate(self, timeout=10):
self.term.stop(timeout=timeout)
def send_ctrl_c(self, timeout=None):
self.term.send_ctrl_c(timeout)
return self.term.push(text)
cols = pyqtProperty(int, get_cols, set_cols)
rows = pyqtProperty(int, get_rows, set_rows)
@@ -373,15 +335,7 @@ class _TerminalWidget(QtWidgets.QPlainTextEdit):
Start ``Backend`` process and render Pyte output as text.
"""
prompt = pyqtSignal(bool)
def __init__(self, parent, cols=125, rows=50, **kwargs):
# regexp to match prompt
self._prompt_re = None
# last prompt
self._prompt_str = None
# process pid
self.pid = None
# file descriptor to communicate with the subprocess
self.fd = None
self.backend = None
@@ -478,7 +432,7 @@ class _TerminalWidget(QtWidgets.QPlainTextEdit):
self.update_term_size()
# Start the Bash process
self.pid, self.fd = self.fork_shell()
self.fd = self.fork_shell()
if self.fd:
# Create the ``Backend`` object
@@ -494,62 +448,6 @@ class _TerminalWidget(QtWidgets.QPlainTextEdit):
self.appendHtml(f"<br><h2>{repr(self._cmd)} - Process exited.</h2>")
self.setReadOnly(True)
def send_ctrl_c(self, wait_prompt=True, timeout=None):
"""Send CTRL-C to the process
If wait_prompt=True (default), wait for a new prompt after CTRL-C
If no prompt is displayed after 'timeout' seconds, TimeoutError is raised
"""
os.kill(self.pid, signal.SIGINT)
if wait_prompt:
timeout_error = False
if timeout:
def set_timeout_error():
nonlocal timeout_error
timeout_error = True
timeout_timer = QTimer()
timeout_timer.singleShot(timeout * 1000, set_timeout_error)
while self._prompt_str is None:
QApplication.instance().process_events()
if timeout_error:
raise TimeoutError(
f"CTRL-C: could not get back to prompt after {timeout} seconds."
)
def _is_running(self):
if os.waitpid(self.pid, os.WNOHANG) == (0, 0):
return True
return False
def stop(self, kill=True, timeout=None):
"""Stop the running process
SIGTERM is the default signal for terminating processes.
If kill=True (default), SIGKILL will be sent if the process does not exit after timeout
"""
# try to exit gracefully
os.kill(self.pid, signal.SIGTERM)
# wait until process is truly dead
t0 = time.perf_counter()
while self._is_running():
time.sleep(1)
if timeout is not None and time.perf_counter() - t0 > timeout:
# still alive after 'timeout' seconds
if kill:
# send SIGKILL and make a last check in loop
os.kill(self.pid, signal.SIGKILL)
kill = False
else:
# still running after timeout...
raise TimeoutError(
f"Could not terminate process with pid: {self.pid} within timeout"
)
self.process_exited()
def data_ready(self, screen):
"""Handle new screen: redraw, set scroll bar max and slider, move cursor to its position
@@ -641,13 +539,11 @@ class _TerminalWidget(QtWidgets.QPlainTextEdit):
elif code is not None:
self.write(code)
def push(self, text, hit_return=False):
def push(self, text):
"""
Write 'text' to terminal
"""
self.write(text.encode("utf-8"))
if hit_return:
self.write(b"\n")
def contextMenuEvent(self, event):
if self.fd is None:
@@ -753,20 +649,6 @@ class _TerminalWidget(QtWidgets.QPlainTextEdit):
self.output[line_no] = line
# fill the text area with HTML contents in one go
self.appendHtml(f"<pre>{chr(10).join(self.output)}</pre>")
if self._prompt_re is not None:
text_buf = self.toPlainText()
prompt = self._prompt_re.search(text_buf)
if prompt is None:
if self._prompt_str:
self.prompt.emit(False)
self._prompt_str = None
else:
prompt_str = prompt.string.rstrip()
if prompt_str != self._prompt_str:
self._prompt_str = prompt_str
self.prompt.emit(True)
# did updates, all clean
screen.dirty.clear()
@@ -828,7 +710,7 @@ class _TerminalWidget(QtWidgets.QPlainTextEdit):
# We are in the parent process.
# Set file control
fcntl.fcntl(fd, fcntl.F_SETFL, os.O_NONBLOCK)
return pid, fd
return fd
if __name__ == "__main__":
@@ -845,24 +727,6 @@ if __name__ == "__main__":
console = BECConsole(mainwin)
mainwin.setCentralWidget(console)
def check_prompt(at_prompt):
if at_prompt:
print("NEW PROMPT")
else:
print("EXECUTING SOMETHING...")
console.set_prompt_tokens(
(Token.OutPromptNum, ""),
(Token.Prompt, ""), # will match arbitrary string,
(Token.Prompt, " ["),
(Token.PromptNum, "3"),
(Token.Prompt, "/"),
(Token.PromptNum, "1"),
(Token.Prompt, "] "),
(Token.Prompt, ""),
)
console.prompt.connect(check_prompt)
console.start()
# Show widget and launch Qt's event loop.

View File

@@ -6,7 +6,7 @@ from qtpy.QtDesigner import QDesignerCustomWidgetInterface
import bec_widgets
from bec_widgets.utils.bec_designer import designer_material_icon
from bec_widgets.widgets.editors.console.console import BECConsole
from bec_widgets.widgets.console.console import BECConsole
DOM_XML = """
<ui language='c++'>

View File

@@ -6,7 +6,7 @@ def main(): # pragma: no cover
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from bec_widgets.widgets.editors.console.console_plugin import BECConsolePlugin
from bec_widgets.widgets.console.console_plugin import BECConsolePlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(BECConsolePlugin())

View File

@@ -1,340 +0,0 @@
from collections import deque
from typing import Literal, Optional
import pyqtgraph as pg
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from pydantic import Field, field_validator
from pyqtgraph.exporters import MatplotlibExporter
from qtpy.QtCore import Signal, Slot
from qtpy.QtWidgets import QWidget
from bec_widgets.utils import Colors
from bec_widgets.widgets.containers.figure.plots.plot_base import BECPlotBase, SubplotConfig
logger = bec_logger.logger
class BECMultiWaveformConfig(SubplotConfig):
color_palette: Optional[str] = Field(
"magma", description="The color palette of the figure widget.", validate_default=True
)
curve_limit: Optional[int] = Field(
200, description="The maximum number of curves to display on the plot."
)
flush_buffer: Optional[bool] = Field(
False, description="Flush the buffer of the plot widget when the curve limit is reached."
)
monitor: Optional[str] = Field(
None, description="The monitor to set for the plot widget."
) # TODO validate monitor in bec -> maybe make it as SignalData class for validation purpose
curve_width: Optional[int] = Field(1, description="The width of the curve on the plot.")
opacity: Optional[int] = Field(50, description="The opacity of the curve on the plot.")
highlight_last_curve: Optional[bool] = Field(
True, description="Highlight the last curve on the plot."
)
model_config: dict = {"validate_assignment": True}
_validate_color_map_z = field_validator("color_palette")(Colors.validate_color_map)
class BECMultiWaveform(BECPlotBase):
monitor_signal_updated = Signal()
highlighted_curve_index_changed = Signal(int)
USER_ACCESS = [
"_rpc_id",
"_config_dict",
"curves",
"set_monitor",
"set_opacity",
"set_curve_limit",
"set_curve_highlight",
"set_colormap",
"set",
"set_title",
"set_x_label",
"set_y_label",
"set_x_scale",
"set_y_scale",
"set_x_lim",
"set_y_lim",
"set_grid",
"set_colormap",
"enable_fps_monitor",
"lock_aspect_ratio",
"export",
"get_all_data",
"remove",
]
def __init__(
self,
parent: Optional[QWidget] = None,
parent_figure=None,
config: Optional[BECMultiWaveformConfig] = None,
client=None,
gui_id: Optional[str] = None,
):
if config is None:
config = BECMultiWaveformConfig(widget_class=self.__class__.__name__)
super().__init__(
parent=parent, parent_figure=parent_figure, config=config, client=client, gui_id=gui_id
)
self.old_scan_id = None
self.scan_id = None
self.monitor = None
self.connected = False
self.current_highlight_index = 0
self._curves = deque()
self.visible_curves = []
self.number_of_visible_curves = 0
# Get bec shortcuts dev, scans, queue, scan_storage, dap
self.get_bec_shortcuts()
@property
def curves(self) -> deque:
"""
Get the curves of the plot widget as a deque.
Returns:
deque: Deque of curves.
"""
return self._curves
@curves.setter
def curves(self, value: deque):
self._curves = value
@property
def highlight_last_curve(self) -> bool:
"""
Get the highlight_last_curve property.
Returns:
bool: The highlight_last_curve property.
"""
return self.config.highlight_last_curve
@highlight_last_curve.setter
def highlight_last_curve(self, value: bool):
self.config.highlight_last_curve = value
def set_monitor(self, monitor: str):
"""
Set the monitor for the plot widget.
Args:
monitor (str): The monitor to set.
"""
self.config.monitor = monitor
self._connect_monitor()
def _connect_monitor(self):
"""
Connect the monitor to the plot widget.
"""
try:
previous_monitor = self.monitor
except AttributeError:
previous_monitor = None
if previous_monitor and self.connected is True:
self.bec_dispatcher.disconnect_slot(
self.on_monitor_1d_update, MessageEndpoints.device_monitor_1d(previous_monitor)
)
if self.config.monitor and self.connected is False:
self.bec_dispatcher.connect_slot(
self.on_monitor_1d_update, MessageEndpoints.device_monitor_1d(self.config.monitor)
)
self.connected = True
self.monitor = self.config.monitor
@Slot(dict, dict)
def on_monitor_1d_update(self, msg: dict, metadata: dict):
"""
Update the plot widget with the monitor data.
Args:
msg(dict): The message data.
metadata(dict): The metadata of the message.
"""
data = msg.get("data", None)
current_scan_id = metadata.get("scan_id", None)
if current_scan_id != self.scan_id:
self.scan_id = current_scan_id
self.clear_curves()
self.curves.clear()
if self.crosshair:
self.crosshair.clear_markers()
# Always create a new curve and add it
curve = pg.PlotDataItem()
curve.setData(data)
self.plot_item.addItem(curve)
self.curves.append(curve)
# Max Trace and scale colors
self.set_curve_limit(self.config.curve_limit, self.config.flush_buffer)
self.monitor_signal_updated.emit()
@Slot(int)
def set_curve_highlight(self, index: int):
"""
Set the curve highlight based on visible curves.
Args:
index (int): The index of the curve to highlight among visible curves.
"""
self.plot_item.visible_curves = [curve for curve in self.curves if curve.isVisible()]
num_visible_curves = len(self.plot_item.visible_curves)
self.number_of_visible_curves = num_visible_curves
if num_visible_curves == 0:
return # No curves to highlight
if index >= num_visible_curves:
index = num_visible_curves - 1
elif index < 0:
index = num_visible_curves + index
self.current_highlight_index = index
num_colors = num_visible_curves
colors = Colors.evenly_spaced_colors(
colormap=self.config.color_palette, num=num_colors, format="HEX"
)
for i, curve in enumerate(self.plot_item.visible_curves):
curve.setPen()
if i == self.current_highlight_index:
curve.setPen(pg.mkPen(color=colors[i], width=5))
curve.setAlpha(alpha=1, auto=False)
curve.setZValue(1)
else:
curve.setPen(pg.mkPen(color=colors[i], width=1))
curve.setAlpha(alpha=self.config.opacity / 100, auto=False)
curve.setZValue(0)
self.highlighted_curve_index_changed.emit(self.current_highlight_index)
@Slot(int)
def set_opacity(self, opacity: int):
"""
Set the opacity of the curve on the plot.
Args:
opacity(int): The opacity of the curve. 0-100.
"""
self.config.opacity = max(0, min(100, opacity))
self.set_curve_highlight(self.current_highlight_index)
@Slot(int, bool)
def set_curve_limit(self, max_trace: int, flush_buffer: bool = False):
"""
Set the maximum number of traces to display on the plot.
Args:
max_trace (int): The maximum number of traces to display.
flush_buffer (bool): Flush the buffer.
"""
self.config.curve_limit = max_trace
self.config.flush_buffer = flush_buffer
if self.config.curve_limit is None:
self.scale_colors()
return
if self.config.flush_buffer:
# Remove excess curves from the plot and the deque
while len(self.curves) > self.config.curve_limit:
curve = self.curves.popleft()
self.plot_item.removeItem(curve)
else:
# Hide or show curves based on the new max_trace
num_curves_to_show = min(self.config.curve_limit, len(self.curves))
for i, curve in enumerate(self.curves):
if i < len(self.curves) - num_curves_to_show:
curve.hide()
else:
curve.show()
self.scale_colors()
def scale_colors(self):
"""
Scale the colors of the curves based on the current colormap.
"""
if self.config.highlight_last_curve:
self.set_curve_highlight(-1) # Use -1 to highlight the last visible curve
else:
self.set_curve_highlight(self.current_highlight_index)
def set_colormap(self, colormap: str):
"""
Set the colormap for the curves.
Args:
colormap(str): Colormap for the curves.
"""
self.config.color_palette = colormap
self.set_curve_highlight(self.current_highlight_index)
def hook_crosshair(self) -> None:
super().hook_crosshair()
if self.crosshair:
self.highlighted_curve_index_changed.connect(self.crosshair.update_highlighted_curve)
if self.curves:
self.crosshair.update_highlighted_curve(self.current_highlight_index)
def get_all_data(self, output: Literal["dict", "pandas"] = "dict") -> dict:
"""
Extract all curve data into a dictionary or a pandas DataFrame.
Args:
output (Literal["dict", "pandas"]): Format of the output data.
Returns:
dict | pd.DataFrame: Data of all curves in the specified format.
"""
data = {}
try:
import pandas as pd
except ImportError:
pd = None
if output == "pandas":
logger.warning(
"Pandas is not installed. "
"Please install pandas using 'pip install pandas'."
"Output will be dictionary instead."
)
output = "dict"
curve_keys = []
curves_list = list(self.curves)
for i, curve in enumerate(curves_list):
x_data, y_data = curve.getData()
if x_data is not None or y_data is not None:
key = f"curve_{i}"
curve_keys.append(key)
if output == "dict":
data[key] = {"x": x_data.tolist(), "y": y_data.tolist()}
elif output == "pandas" and pd is not None:
data[key] = pd.DataFrame({"x": x_data, "y": y_data})
if output == "pandas" and pd is not None:
combined_data = pd.concat([data[key] for key in curve_keys], axis=1, keys=curve_keys)
return combined_data
return data
def clear_curves(self):
"""
Remove all curves from the plot, excluding crosshair items.
"""
items_to_remove = []
for item in self.plot_item.items:
if not getattr(item, "is_crosshair", False) and isinstance(item, pg.PlotDataItem):
items_to_remove.append(item)
for item in items_to_remove:
self.plot_item.removeItem(item)
def export_to_matplotlib(self):
"""
Export current waveform to matplotlib GUI. Available only if matplotlib is installed in the environment.
"""
MatplotlibExporter(self.plot_item).export()

View File

@@ -1,882 +0,0 @@
import math
import sys
from typing import Dict, Literal, Optional, Set, Tuple, Union
from qtpy.QtWidgets import (
QApplication,
QComboBox,
QGridLayout,
QGroupBox,
QHBoxLayout,
QLabel,
QLineEdit,
QMainWindow,
QMessageBox,
QPushButton,
QSpinBox,
QSplitter,
QVBoxLayout,
QWidget,
)
from typeguard import typechecked
from bec_widgets.cli.rpc.rpc_widget_handler import widget_handler
class LayoutManagerWidget(QWidget):
"""
A robust layout manager that extends QGridLayout functionality, allowing
users to add/remove widgets, access widgets by coordinates, shift widgets,
and change the layout dynamically with automatic reindexing to keep the grid compact.
Supports adding widgets via QWidget instances or string identifiers referencing the widget handler.
"""
def __init__(self, parent=None, auto_reindex=True):
super().__init__(parent)
self.setObjectName("LayoutManagerWidget")
self.layout = QGridLayout(self)
self.auto_reindex = auto_reindex
# Mapping from widget to its position (row, col, rowspan, colspan)
self.widget_positions: Dict[QWidget, Tuple[int, int, int, int]] = {}
# Mapping from (row, col) to widget
self.position_widgets: Dict[Tuple[int, int], QWidget] = {}
# Keep track of the current position for automatic placement
self.current_row = 0
self.current_col = 0
def add_widget(
self,
widget: QWidget | str,
row: int | None = None,
col: Optional[int] = None,
rowspan: int = 1,
colspan: int = 1,
shift_existing: bool = True,
shift_direction: Literal["down", "up", "left", "right"] = "right",
) -> QWidget:
"""
Add a widget to the grid with enhanced shifting capabilities.
Args:
widget (QWidget | str): The widget to add. If str, it is used to create a widget via widget_handler.
row (int, optional): The row to add the widget to. If None, the next available row is used.
col (int, optional): The column to add the widget to. If None, the next available column is used.
rowspan (int): Number of rows the widget spans. Default is 1.
colspan (int): Number of columns the widget spans. Default is 1.
shift_existing (bool): Whether to shift existing widgets if the target position is occupied. Default is True.
shift_direction (Literal["down", "up", "left", "right"]): Direction to shift existing widgets. Default is "right".
Returns:
QWidget: The widget that was added.
"""
# Handle widget creation if a BECWidget string identifier is provided
if isinstance(widget, str):
widget = widget_handler.create_widget(widget)
if row is None:
row = self.current_row
if col is None:
col = self.current_col
if (row, col) in self.position_widgets:
if shift_existing:
# Attempt to shift the existing widget in the specified direction
self.shift_widgets(direction=shift_direction, start_row=row, start_col=col)
else:
raise ValueError(f"Position ({row}, {col}) is already occupied.")
# Add the widget to the layout
self.layout.addWidget(widget, row, col, rowspan, colspan)
self.widget_positions[widget] = (row, col, rowspan, colspan)
self.position_widgets[(row, col)] = widget
# Update current position for automatic placement
self.current_col = col + colspan
self.current_row = max(self.current_row, row)
if self.auto_reindex:
self.reindex_grid()
return widget
def add_widget_relative(
self,
widget: QWidget | str,
reference_widget: QWidget,
position: Literal["left", "right", "top", "bottom"],
rowspan: int = 1,
colspan: int = 1,
shift_existing: bool = True,
shift_direction: Literal["down", "up", "left", "right"] = "right",
) -> QWidget:
"""
Add a widget relative to an existing widget.
Args:
widget (QWidget | str): The widget to add. If str, it is used to create a widget via widget_handler.
reference_widget (QWidget): The widget relative to which the new widget will be placed.
position (Literal["left", "right", "top", "bottom"]): Position relative to the reference widget.
rowspan (int): Number of rows the widget spans. Default is 1.
colspan (int): Number of columns the widget spans. Default is 1.
shift_existing (bool): Whether to shift existing widgets if the target position is occupied.
shift_direction (Literal["down", "up", "left", "right"]): Direction to shift existing widgets.
Returns:
QWidget: The widget that was added.
Raises:
ValueError: If the reference widget is not found.
"""
if reference_widget not in self.widget_positions:
raise ValueError("Reference widget not found in layout.")
ref_row, ref_col, ref_rowspan, ref_colspan = self.widget_positions[reference_widget]
# Determine new widget position based on the specified relative position
if position == "left":
new_row = ref_row
new_col = ref_col - 1
elif position == "right":
new_row = ref_row
new_col = ref_col + ref_colspan
elif position == "top":
new_row = ref_row - 1
new_col = ref_col
elif position == "bottom":
new_row = ref_row + ref_rowspan
new_col = ref_col
else:
raise ValueError("Invalid position. Choose from 'left', 'right', 'top', 'bottom'.")
# Add the widget at the calculated position
return self.add_widget(
widget=widget,
row=new_row,
col=new_col,
rowspan=rowspan,
colspan=colspan,
shift_existing=shift_existing,
shift_direction=shift_direction,
)
def move_widget_by_coords(
self,
current_row: int,
current_col: int,
new_row: int,
new_col: int,
shift: bool = True,
shift_direction: Literal["down", "up", "left", "right"] = "right",
) -> None:
"""
Move a widget from (current_row, current_col) to (new_row, new_col).
Args:
current_row (int): Current row of the widget.
current_col (int): Current column of the widget.
new_row (int): Target row.
new_col (int): Target column.
shift (bool): Whether to shift existing widgets if the target position is occupied.
shift_direction (Literal["down", "up", "left", "right"]): Direction to shift existing widgets.
Raises:
ValueError: If the widget is not found or target position is invalid.
"""
self.move_widget(
old_row=current_row,
old_col=current_col,
new_row=new_row,
new_col=new_col,
shift=shift,
shift_direction=shift_direction,
)
@typechecked
def move_widget_by_object(
self,
widget: QWidget,
new_row: int,
new_col: int,
shift: bool = True,
shift_direction: Literal["down", "up", "left", "right"] = "right",
) -> None:
"""
Move a widget to a new position using the widget object.
Args:
widget (QWidget): The widget to move.
new_row (int): Target row.
new_col (int): Target column.
shift (bool): Whether to shift existing widgets if the target position is occupied.
shift_direction (Literal["down", "up", "left", "right"]): Direction to shift existing widgets.
Raises:
ValueError: If the widget is not found or target position is invalid.
"""
if widget not in self.widget_positions:
raise ValueError("Widget not found in layout.")
old_position = self.widget_positions[widget]
old_row, old_col = old_position[0], old_position[1]
self.move_widget(
old_row=old_row,
old_col=old_col,
new_row=new_row,
new_col=new_col,
shift=shift,
shift_direction=shift_direction,
)
@typechecked
def move_widget(
self,
old_row: int | None = None,
old_col: int | None = None,
new_row: int | None = None,
new_col: int | None = None,
shift: bool = True,
shift_direction: Literal["down", "up", "left", "right"] = "right",
) -> None:
"""
Move a widget to a new position. If the new position is occupied and shift is True,
shift the existing widget to the specified direction.
Args:
old_row (int, optional): The current row of the widget.
old_col (int, optional): The current column of the widget.
new_row (int, optional): The target row to move the widget to.
new_col (int, optional): The target column to move the widget to.
shift (bool): Whether to shift existing widgets if the target position is occupied.
shift_direction (Literal["down", "up", "left", "right"]): Direction to shift existing widgets.
Raises:
ValueError: If the widget is not found or target position is invalid.
"""
if new_row is None or new_col is None:
raise ValueError("Must provide both new_row and new_col to move a widget.")
if old_row is None and old_col is None:
raise ValueError(f"No widget found at position ({old_row}, {old_col}).")
widget = self.get_widget(old_row, old_col)
if (new_row, new_col) in self.position_widgets:
if not shift:
raise ValueError(f"Position ({new_row}, {new_col}) is already occupied.")
# Shift the existing widget to make space
self.shift_widgets(
direction=shift_direction,
start_row=new_row if shift_direction in ["down", "up"] else 0,
start_col=new_col if shift_direction in ["left", "right"] else 0,
)
# Proceed to move the widget
self.layout.removeWidget(widget)
old_position = self.widget_positions.pop(widget)
self.position_widgets.pop((old_position[0], old_position[1]))
self.layout.addWidget(widget, new_row, new_col, old_position[2], old_position[3])
self.widget_positions[widget] = (new_row, new_col, old_position[2], old_position[3])
self.position_widgets[(new_row, new_col)] = widget
# Update current_row and current_col for automatic placement if needed
self.current_row = max(self.current_row, new_row)
self.current_col = max(self.current_col, new_col + old_position[3])
if self.auto_reindex:
self.reindex_grid()
@typechecked
def shift_widgets(
self,
direction: Literal["down", "up", "left", "right"],
start_row: int = 0,
start_col: int = 0,
) -> None:
"""
Shift widgets in the grid in the specified direction starting from the given position.
Args:
direction (Literal["down", "up", "left", "right"]): Direction to shift widgets.
start_row (int): Starting row index.
start_col (int): Starting column index.
Raises:
ValueError: If shifting causes widgets to go out of grid boundaries.
"""
shifts = []
positions_to_shift = [(start_row, start_col)]
visited_positions = set()
while positions_to_shift:
row, col = positions_to_shift.pop(0)
if (row, col) in visited_positions:
continue
visited_positions.add((row, col))
widget = self.position_widgets.get((row, col))
if widget is None:
continue # No widget at this position
# Compute new position based on the direction
if direction == "down":
new_row = row + 1
new_col = col
elif direction == "up":
new_row = row - 1
new_col = col
elif direction == "right":
new_row = row
new_col = col + 1
elif direction == "left":
new_row = row
new_col = col - 1
# Check for negative indices
if new_row < 0 or new_col < 0:
raise ValueError("Shifting widgets out of grid boundaries.")
# If the new position is occupied, add it to the positions to shift
if (new_row, new_col) in self.position_widgets:
positions_to_shift.append((new_row, new_col))
shifts.append(
(widget, (row, col), (new_row, new_col), self.widget_positions[widget][2:])
)
# Remove all widgets from their old positions
for widget, (old_row, old_col), _, _ in shifts:
self.layout.removeWidget(widget)
self.position_widgets.pop((old_row, old_col))
# Add widgets to their new positions
for widget, _, (new_row, new_col), (rowspan, colspan) in shifts:
self.layout.addWidget(widget, new_row, new_col, rowspan, colspan)
self.widget_positions[widget] = (new_row, new_col, rowspan, colspan)
self.position_widgets[(new_row, new_col)] = widget
# Update current_row and current_col if needed
self.current_row = max(self.current_row, new_row)
self.current_col = max(self.current_col, new_col + colspan)
def shift_all_widgets(self, direction: Literal["down", "up", "left", "right"]) -> None:
"""
Shift all widgets in the grid in the specified direction to make room and prevent negative indices.
Args:
direction (Literal["down", "up", "left", "right"]): Direction to shift all widgets.
"""
# First, collect all the shifts to perform
shifts = []
for widget, (row, col, rowspan, colspan) in self.widget_positions.items():
if direction == "down":
new_row = row + 1
new_col = col
elif direction == "up":
new_row = row - 1
new_col = col
elif direction == "right":
new_row = row
new_col = col + 1
elif direction == "left":
new_row = row
new_col = col - 1
# Check for negative indices
if new_row < 0 or new_col < 0:
raise ValueError("Shifting widgets out of grid boundaries.")
shifts.append((widget, (row, col), (new_row, new_col), (rowspan, colspan)))
# Now perform the shifts
for widget, (old_row, old_col), (new_row, new_col), (rowspan, colspan) in shifts:
self.layout.removeWidget(widget)
self.position_widgets.pop((old_row, old_col))
for widget, (old_row, old_col), (new_row, new_col), (rowspan, colspan) in shifts:
self.layout.addWidget(widget, new_row, new_col, rowspan, colspan)
self.widget_positions[widget] = (new_row, new_col, rowspan, colspan)
self.position_widgets[(new_row, new_col)] = widget
# Update current_row and current_col based on new widget positions
self.current_row = max((pos[0] for pos in self.position_widgets.keys()), default=0)
self.current_col = max((pos[1] for pos in self.position_widgets.keys()), default=0)
def remove(
self,
row: int | None = None,
col: int | None = None,
coordinates: Tuple[int, int] | None = None,
) -> None:
"""
Remove a widget from the layout. Can be removed by widget ID or by coordinates.
Args:
row (int, optional): The row coordinate of the widget to remove.
col (int, optional): The column coordinate of the widget to remove.
coordinates (tuple[int, int], optional): The (row, col) coordinates of the widget to remove.
Raises:
ValueError: If the widget to remove is not found.
"""
if coordinates:
row, col = coordinates
widget = self.get_widget(row, col)
if widget is None:
raise ValueError(f"No widget found at coordinates {coordinates}.")
elif row is not None and col is not None:
widget = self.get_widget(row, col)
if widget is None:
raise ValueError(f"No widget found at position ({row}, {col}).")
else:
raise ValueError(
"Must provide either widget_id, coordinates, or both row and col for removal."
)
self.remove_widget(widget)
def remove_widget(self, widget: QWidget) -> None:
"""
Remove a widget from the grid and reindex the grid to keep it compact.
Args:
widget (QWidget): The widget to remove.
Raises:
ValueError: If the widget is not found in the layout.
"""
if widget not in self.widget_positions:
raise ValueError("Widget not found in layout.")
position = self.widget_positions.pop(widget)
self.position_widgets.pop((position[0], position[1]))
self.layout.removeWidget(widget)
widget.setParent(None) # Remove widget from the parent
widget.deleteLater()
# Reindex the grid to maintain compactness
if self.auto_reindex:
self.reindex_grid()
def get_widget(self, row: int, col: int) -> QWidget | None:
"""
Get the widget at the specified position.
Args:
row (int): The row coordinate.
col (int): The column coordinate.
Returns:
QWidget | None: The widget at the specified position, or None if empty.
"""
return self.position_widgets.get((row, col))
def get_widget_position(self, widget: QWidget) -> Tuple[int, int, int, int] | None:
"""
Get the position of the specified widget.
Args:
widget (QWidget): The widget to query.
Returns:
Tuple[int, int, int, int] | None: The (row, col, rowspan, colspan) tuple, or None if not found.
"""
return self.widget_positions.get(widget)
def change_layout(self, num_rows: int | None = None, num_cols: int | None = None) -> None:
"""
Change the layout to have a certain number of rows and/or columns,
rearranging the widgets accordingly.
If only one of num_rows or num_cols is provided, the other is calculated automatically
based on the number of widgets and the provided constraint.
If both are provided, num_rows is calculated based on num_cols.
Args:
num_rows (int | None): The new maximum number of rows.
num_cols (int | None): The new maximum number of columns.
"""
if num_rows is None and num_cols is None:
return # Nothing to change
total_widgets = len(self.widget_positions)
if num_cols is not None:
# Calculate num_rows based on num_cols
num_rows = math.ceil(total_widgets / num_cols)
elif num_rows is not None:
# Calculate num_cols based on num_rows
num_cols = math.ceil(total_widgets / num_rows)
# Sort widgets by current position (row-major order)
widgets_sorted = sorted(
self.widget_positions.items(),
key=lambda item: (item[1][0], item[1][1]), # Sort by row, then column
)
# Clear the layout without deleting widgets
for widget, _ in widgets_sorted:
self.layout.removeWidget(widget)
# Reset position mappings
self.widget_positions.clear()
self.position_widgets.clear()
# Re-add widgets based on new layout constraints
current_row, current_col = 0, 0
for widget, _ in widgets_sorted:
if current_col >= num_cols:
current_col = 0
current_row += 1
self.layout.addWidget(widget, current_row, current_col, 1, 1)
self.widget_positions[widget] = (current_row, current_col, 1, 1)
self.position_widgets[(current_row, current_col)] = widget
current_col += 1
# Update current_row and current_col for automatic placement
self.current_row = current_row
self.current_col = current_col
# Reindex the grid to ensure compactness
self.reindex_grid()
def clear_layout(self) -> None:
"""
Remove all widgets from the layout without deleting them.
"""
for widget in list(self.widget_positions):
self.layout.removeWidget(widget)
self.position_widgets.pop(
(self.widget_positions[widget][0], self.widget_positions[widget][1])
)
self.widget_positions.pop(widget)
widget.setParent(None) # Optionally hide/remove the widget
self.current_row = 0
self.current_col = 0
def reindex_grid(self) -> None:
"""
Reindex the grid to remove empty rows and columns, ensuring that
widget coordinates are contiguous and start from (0, 0).
"""
# Step 1: Collect all occupied positions
occupied_positions = sorted(self.position_widgets.keys())
if not occupied_positions:
# No widgets to reindex
self.clear_layout()
return
# Step 2: Determine the new mapping by eliminating empty columns and rows
# Find unique rows and columns
unique_rows = sorted(set(pos[0] for pos in occupied_positions))
unique_cols = sorted(set(pos[1] for pos in occupied_positions))
# Create mappings from old to new indices
row_mapping = {old_row: new_row for new_row, old_row in enumerate(unique_rows)}
col_mapping = {old_col: new_col for new_col, old_col in enumerate(unique_cols)}
# Step 3: Collect widgets with their new positions
widgets_with_new_positions = []
for widget, (row, col, rowspan, colspan) in self.widget_positions.items():
new_row = row_mapping[row]
new_col = col_mapping[col]
widgets_with_new_positions.append((widget, new_row, new_col, rowspan, colspan))
# Step 4: Clear the layout and reset mappings
self.clear_layout()
# Reset current_row and current_col
self.current_row = 0
self.current_col = 0
# Step 5: Re-add widgets with new positions
for widget, new_row, new_col, rowspan, colspan in widgets_with_new_positions:
self.layout.addWidget(widget, new_row, new_col, rowspan, colspan)
self.widget_positions[widget] = (new_row, new_col, rowspan, colspan)
self.position_widgets[(new_row, new_col)] = widget
# Update current position for automatic placement
self.current_col = max(self.current_col, new_col + colspan)
self.current_row = max(self.current_row, new_row)
def get_widgets_positions(self) -> Dict[QWidget, Tuple[int, int, int, int]]:
"""
Get the positions of all widgets in the layout.
Returns:
Dict[QWidget, Tuple[int, int, int, int]]: Mapping of widgets to their (row, col, rowspan, colspan).
"""
return self.widget_positions.copy()
def print_all_button_text(self):
"""Debug function to print the text of all QPushButton widgets."""
print("Coordinates - Button Text")
for coord, widget in self.position_widgets.items():
if isinstance(widget, QPushButton):
print(f"{coord} - {widget.text()}")
####################################################################################################
# The following code is for the GUI control panel to interact with the LayoutManagerWidget.
# It is not covered by any tests as it serves only as an example for the LayoutManagerWidget class.
####################################################################################################
class ControlPanel(QWidget): # pragma: no cover
def __init__(self, layout_manager: LayoutManagerWidget):
super().__init__()
self.layout_manager = layout_manager
self.init_ui()
def init_ui(self):
main_layout = QVBoxLayout()
# Add Widget by Coordinates
add_coord_group = QGroupBox("Add Widget by Coordinates")
add_coord_layout = QGridLayout()
add_coord_layout.addWidget(QLabel("Text:"), 0, 0)
self.text_input = QLineEdit()
add_coord_layout.addWidget(self.text_input, 0, 1)
add_coord_layout.addWidget(QLabel("Row:"), 1, 0)
self.row_input = QSpinBox()
self.row_input.setMinimum(0)
add_coord_layout.addWidget(self.row_input, 1, 1)
add_coord_layout.addWidget(QLabel("Column:"), 2, 0)
self.col_input = QSpinBox()
self.col_input.setMinimum(0)
add_coord_layout.addWidget(self.col_input, 2, 1)
self.add_button = QPushButton("Add at Coordinates")
self.add_button.clicked.connect(self.add_at_coordinates)
add_coord_layout.addWidget(self.add_button, 3, 0, 1, 2)
add_coord_group.setLayout(add_coord_layout)
main_layout.addWidget(add_coord_group)
# Add Widget Relative
add_rel_group = QGroupBox("Add Widget Relative to Existing")
add_rel_layout = QGridLayout()
add_rel_layout.addWidget(QLabel("Text:"), 0, 0)
self.rel_text_input = QLineEdit()
add_rel_layout.addWidget(self.rel_text_input, 0, 1)
add_rel_layout.addWidget(QLabel("Reference Widget:"), 1, 0)
self.ref_widget_combo = QComboBox()
add_rel_layout.addWidget(self.ref_widget_combo, 1, 1)
add_rel_layout.addWidget(QLabel("Position:"), 2, 0)
self.position_combo = QComboBox()
self.position_combo.addItems(["left", "right", "top", "bottom"])
add_rel_layout.addWidget(self.position_combo, 2, 1)
self.add_rel_button = QPushButton("Add Relative")
self.add_rel_button.clicked.connect(self.add_relative)
add_rel_layout.addWidget(self.add_rel_button, 3, 0, 1, 2)
add_rel_group.setLayout(add_rel_layout)
main_layout.addWidget(add_rel_group)
# Remove Widget
remove_group = QGroupBox("Remove Widget")
remove_layout = QGridLayout()
remove_layout.addWidget(QLabel("Row:"), 0, 0)
self.remove_row_input = QSpinBox()
self.remove_row_input.setMinimum(0)
remove_layout.addWidget(self.remove_row_input, 0, 1)
remove_layout.addWidget(QLabel("Column:"), 1, 0)
self.remove_col_input = QSpinBox()
self.remove_col_input.setMinimum(0)
remove_layout.addWidget(self.remove_col_input, 1, 1)
self.remove_button = QPushButton("Remove at Coordinates")
self.remove_button.clicked.connect(self.remove_widget)
remove_layout.addWidget(self.remove_button, 2, 0, 1, 2)
remove_group.setLayout(remove_layout)
main_layout.addWidget(remove_group)
# Change Layout
change_layout_group = QGroupBox("Change Layout")
change_layout_layout = QGridLayout()
change_layout_layout.addWidget(QLabel("Number of Rows:"), 0, 0)
self.change_rows_input = QSpinBox()
self.change_rows_input.setMinimum(1)
self.change_rows_input.setValue(1) # Default value
change_layout_layout.addWidget(self.change_rows_input, 0, 1)
change_layout_layout.addWidget(QLabel("Number of Columns:"), 1, 0)
self.change_cols_input = QSpinBox()
self.change_cols_input.setMinimum(1)
self.change_cols_input.setValue(1) # Default value
change_layout_layout.addWidget(self.change_cols_input, 1, 1)
self.change_layout_button = QPushButton("Apply Layout Change")
self.change_layout_button.clicked.connect(self.change_layout)
change_layout_layout.addWidget(self.change_layout_button, 2, 0, 1, 2)
change_layout_group.setLayout(change_layout_layout)
main_layout.addWidget(change_layout_group)
# Remove All Widgets
self.clear_all_button = QPushButton("Clear All Widgets")
self.clear_all_button.clicked.connect(self.clear_all_widgets)
main_layout.addWidget(self.clear_all_button)
# Refresh Reference Widgets and Print Button
self.refresh_button = QPushButton("Refresh Reference Widgets")
self.refresh_button.clicked.connect(self.refresh_references)
self.print_button = QPushButton("Print All Button Text")
self.print_button.clicked.connect(self.layout_manager.print_all_button_text)
main_layout.addWidget(self.refresh_button)
main_layout.addWidget(self.print_button)
main_layout.addStretch()
self.setLayout(main_layout)
self.refresh_references()
def refresh_references(self):
self.ref_widget_combo.clear()
widgets = self.layout_manager.get_widgets_positions()
for widget in widgets:
if isinstance(widget, QPushButton):
self.ref_widget_combo.addItem(widget.text(), widget)
def add_at_coordinates(self):
text = self.text_input.text()
row = self.row_input.value()
col = self.col_input.value()
if not text:
QMessageBox.warning(self, "Input Error", "Please enter text for the button.")
return
button = QPushButton(text)
try:
self.layout_manager.add_widget(widget=button, row=row, col=col)
self.refresh_references()
except Exception as e:
QMessageBox.critical(self, "Error", str(e))
def add_relative(self):
text = self.rel_text_input.text()
ref_index = self.ref_widget_combo.currentIndex()
ref_widget = self.ref_widget_combo.itemData(ref_index)
position = self.position_combo.currentText()
if not text:
QMessageBox.warning(self, "Input Error", "Please enter text for the button.")
return
if ref_widget is None:
QMessageBox.warning(self, "Input Error", "Please select a reference widget.")
return
button = QPushButton(text)
try:
self.layout_manager.add_widget_relative(
widget=button, reference_widget=ref_widget, position=position
)
self.refresh_references()
except Exception as e:
QMessageBox.critical(self, "Error", str(e))
def remove_widget(self):
row = self.remove_row_input.value()
col = self.remove_col_input.value()
try:
widget = self.layout_manager.get_widget(row, col)
if widget is None:
QMessageBox.warning(self, "Not Found", f"No widget found at ({row}, {col}).")
return
self.layout_manager.remove_widget(widget)
self.refresh_references()
except Exception as e:
QMessageBox.critical(self, "Error", str(e))
def change_layout(self):
num_rows = self.change_rows_input.value()
num_cols = self.change_cols_input.value()
try:
self.layout_manager.change_layout(num_rows=num_rows, num_cols=num_cols)
self.refresh_references()
except Exception as e:
QMessageBox.critical(self, "Error", str(e))
def clear_all_widgets(self):
reply = QMessageBox.question(
self,
"Confirm Clear",
"Are you sure you want to remove all widgets?",
QMessageBox.Yes | QMessageBox.No,
QMessageBox.No,
)
if reply == QMessageBox.Yes:
try:
self.layout_manager.clear_layout()
self.refresh_references()
except Exception as e:
QMessageBox.critical(self, "Error", str(e))
class MainWindow(QMainWindow): # pragma: no cover
def __init__(self):
super().__init__()
self.setWindowTitle("Layout Manager Demo")
self.resize(800, 600)
self.init_ui()
def init_ui(self):
central_widget = QWidget()
main_layout = QHBoxLayout()
# Layout Area GroupBox
layout_group = QGroupBox("Layout Area")
layout_group.setMinimumSize(400, 400)
layout_layout = QVBoxLayout()
self.layout_manager = LayoutManagerWidget()
layout_layout.addWidget(self.layout_manager)
layout_group.setLayout(layout_layout)
# Splitter
splitter = QSplitter()
splitter.addWidget(layout_group)
# Control Panel
control_panel = ControlPanel(self.layout_manager)
control_group = QGroupBox("Control Panel")
control_layout = QVBoxLayout()
control_layout.addWidget(control_panel)
control_layout.addStretch()
control_group.setLayout(control_layout)
splitter.addWidget(control_group)
main_layout.addWidget(splitter)
central_widget.setLayout(main_layout)
self.setCentralWidget(central_widget)
if __name__ == "__main__": # pragma: no cover
app = QApplication(sys.argv)
window = MainWindow()
window.show()
sys.exit(app.exec_())

View File

@@ -1,74 +0,0 @@
from bec_lib.logger import bec_logger
from qtpy.QtWidgets import QApplication, QMainWindow
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.container_utils import WidgetContainerUtils
from bec_widgets.widgets.containers.dock.dock_area import BECDockArea
logger = bec_logger.logger
class BECMainWindow(BECWidget, QMainWindow):
def __init__(self, gui_id: str = None, *args, **kwargs):
BECWidget.__init__(self, gui_id=gui_id, **kwargs)
QMainWindow.__init__(self, *args, **kwargs)
def _dump(self):
"""Return a dictionary with informations about the application state, for use in tests"""
# TODO: ModularToolBar and something else leak top-level widgets (3 or 4 QMenu + 2 QWidget);
# so, a filtering based on title is applied here, but the solution is to not have those widgets
# as top-level (so for now, a window with no title does not appear in _dump() result)
# NOTE: the main window itself is excluded, since we want to dump dock areas
info = {
tlw.gui_id: {
"title": tlw.windowTitle(),
"visible": tlw.isVisible(),
"class": str(type(tlw)),
}
for tlw in QApplication.instance().topLevelWidgets()
if tlw is not self and tlw.windowTitle()
}
# Add the main window dock area
info[self.centralWidget().gui_id] = {
"title": self.windowTitle(),
"visible": self.isVisible(),
"class": str(type(self.centralWidget())),
}
return info
def new_dock_area(
self, name: str | None = None, geometry: tuple[int, int, int, int] | None = None
) -> BECDockArea:
"""Create a new dock area.
Args:
name(str): The name of the dock area.
geometry(tuple): The geometry parameters to be passed to the dock area.
Returns:
BECDockArea: The newly created dock area.
"""
rpc_register = RPCRegister()
existing_dock_areas = rpc_register.get_names_of_rpc_by_class_type(BECDockArea)
if name is not None:
if name in existing_dock_areas:
raise ValueError(
f"Name {name} must be unique for dock areas, but already exists: {existing_dock_areas}."
)
else:
name = "dock_area"
name = WidgetContainerUtils.generate_unique_name(name, existing_dock_areas)
dock_area = BECDockArea(name=name)
dock_area.resize(dock_area.minimumSizeHint())
# TODO Should we simply use the specified name as title here?
dock_area.window().setWindowTitle(f"BEC - {name}")
logger.info(f"Created new dock area: {name}")
logger.info(f"Existing dock areas: {geometry}")
if geometry is not None:
dock_area.setGeometry(*geometry)
dock_area.show()
return dock_area
def cleanup(self):
super().close()

View File

@@ -1,11 +0,0 @@
from bec_widgets.widgets.control.device_control.positioner_box.positioner_box.positioner_box import (
PositionerBox,
)
from bec_widgets.widgets.control.device_control.positioner_box.positioner_box_2d.positioner_box_2d import (
PositionerBox2D,
)
from bec_widgets.widgets.control.device_control.positioner_box.positioner_control_line.positioner_control_line import (
PositionerControlLine,
)
__ALL__ = ["PositionerBox", "PositionerControlLine", "PositionerBox2D"]

View File

@@ -1,3 +0,0 @@
from .positioner_box_base import PositionerBoxBase
__ALL__ = ["PositionerBoxBase"]

View File

@@ -1,243 +0,0 @@
import uuid
from abc import abstractmethod
from ast import Tuple
from typing import Callable, TypedDict
from bec_lib.device import Positioner
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.messages import ScanQueueMessage
from qtpy.QtWidgets import (
QDialog,
QDoubleSpinBox,
QGroupBox,
QLabel,
QLineEdit,
QPushButton,
QVBoxLayout,
)
from bec_widgets.qt_utils.compact_popup import CompactPopupWidget
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.widgets.control.device_control.position_indicator.position_indicator import (
PositionIndicator,
)
from bec_widgets.widgets.control.device_input.base_classes.device_input_base import BECDeviceFilter
from bec_widgets.widgets.control.device_input.device_line_edit.device_line_edit import (
DeviceLineEdit,
)
from bec_widgets.widgets.utility.spinner.spinner import SpinnerWidget
logger = bec_logger.logger
class DeviceUpdateUIComponents(TypedDict):
spinner: SpinnerWidget
setpoint: QLineEdit
readback: QLabel
position_indicator: PositionIndicator
step_size: QDoubleSpinBox
device_box: QGroupBox
stop: QPushButton
tweak_increase: QPushButton
tweak_decrease: QPushButton
class PositionerBoxBase(BECWidget, CompactPopupWidget):
"""Contains some core logic for positioner box widgets"""
current_path = ""
ICON_NAME = "switch_right"
def __init__(self, parent=None, **kwargs):
"""Initialize the PositionerBox widget.
Args:
parent: The parent widget.
device (Positioner): The device to control.
"""
super().__init__(**kwargs)
CompactPopupWidget.__init__(self, parent=parent, layout=QVBoxLayout)
self._dialog = None
self.get_bec_shortcuts()
def _check_device_is_valid(self, device: str):
"""Check if the device is a positioner
Args:
device (str): The device name
"""
if device not in self.dev:
logger.info(f"Device {device} not found in the device list")
return False
if not isinstance(self.dev[device], Positioner):
logger.info(f"Device {device} is not a positioner")
return False
return True
@abstractmethod
def _device_ui_components(self, device: str) -> DeviceUpdateUIComponents: ...
def _init_device(
self,
device: str,
position_emit: Callable[[float], None],
limit_update: Callable[[tuple[float, float]], None],
):
"""Init the device view and readback"""
if self._check_device_is_valid(device):
data = self.dev[device].read()
self._on_device_readback(
device,
self._device_ui_components(device),
{"signals": data},
{},
position_emit,
limit_update,
)
def _stop_device(self, device: str):
"""Stop call"""
request_id = str(uuid.uuid4())
params = {"device": device, "rpc_id": request_id, "func": "stop", "args": [], "kwargs": {}}
msg = ScanQueueMessage(
scan_type="device_rpc",
parameter=params,
queue="emergency",
metadata={"RID": request_id, "response": False},
)
self.client.connector.send(MessageEndpoints.scan_queue_request(), msg)
# pylint: disable=unused-argument
def _on_device_readback(
self,
device: str,
ui_components: DeviceUpdateUIComponents,
msg_content: dict,
metadata: dict,
position_emit: Callable[[float], None],
limit_update: Callable[[tuple[float, float]], None],
):
signals = msg_content.get("signals", {})
# pylint: disable=protected-access
hinted_signals = self.dev[device]._hints
precision = self.dev[device].precision
spinner = ui_components["spinner"]
position_indicator = ui_components["position_indicator"]
readback = ui_components["readback"]
setpoint = ui_components["setpoint"]
readback_val = None
setpoint_val = None
if len(hinted_signals) == 1:
signal = hinted_signals[0]
readback_val = signals.get(signal, {}).get("value")
for setpoint_signal in ["setpoint", "user_setpoint"]:
setpoint_val = signals.get(f"{device}_{setpoint_signal}", {}).get("value")
if setpoint_val is not None:
break
for moving_signal in ["motor_done_move", "motor_is_moving"]:
is_moving = signals.get(f"{device}_{moving_signal}", {}).get("value")
if is_moving is not None:
break
if is_moving is not None:
spinner.setVisible(True)
if is_moving:
spinner.start()
spinner.setToolTip("Device is moving")
self.set_global_state("warning")
else:
spinner.stop()
spinner.setToolTip("Device is idle")
self.set_global_state("success")
else:
spinner.setVisible(False)
if readback_val is not None:
readback.setText(f"{readback_val:.{precision}f}")
position_emit(readback_val)
if setpoint_val is not None:
setpoint.setText(f"{setpoint_val:.{precision}f}")
limits = self.dev[device].limits
limit_update(limits)
if limits is not None and readback_val is not None and limits[0] != limits[1]:
pos = (readback_val - limits[0]) / (limits[1] - limits[0])
position_indicator.set_value(pos)
def _update_limits_ui(
self, limits: tuple[float, float], position_indicator, setpoint_validator
):
if limits is not None and limits[0] != limits[1]:
position_indicator.setToolTip(f"Min: {limits[0]}, Max: {limits[1]}")
setpoint_validator.setRange(limits[0], limits[1])
else:
position_indicator.setToolTip("No limits set")
setpoint_validator.setRange(float("-inf"), float("inf"))
def _update_device_ui(self, device: str, ui: DeviceUpdateUIComponents):
ui["device_box"].setTitle(device)
ui["readback"].setToolTip(f"{device} readback")
ui["setpoint"].setToolTip(f"{device} setpoint")
ui["step_size"].setToolTip(f"Step size for {device}")
precision = self.dev[device].precision
if precision is not None:
ui["step_size"].setDecimals(precision)
ui["step_size"].setValue(10**-precision * 10)
def _swap_readback_signal_connection(self, slot, old_device, new_device):
self.bec_dispatcher.disconnect_slot(slot, MessageEndpoints.device_readback(old_device))
self.bec_dispatcher.connect_slot(slot, MessageEndpoints.device_readback(new_device))
def _toggle_enable_buttons(self, ui: DeviceUpdateUIComponents, enable: bool) -> None:
"""Toogle enable/disable on available buttons
Args:
enable (bool): Enable buttons
"""
ui["tweak_increase"].setEnabled(enable)
ui["tweak_decrease"].setEnabled(enable)
ui["stop"].setEnabled(enable)
ui["setpoint"].setEnabled(enable)
ui["step_size"].setEnabled(enable)
def _on_device_change(
self,
old_device: str,
new_device: str,
position_emit: Callable[[float], None],
limit_update: Callable[[tuple[float, float]], None],
on_device_readback: Callable,
ui: DeviceUpdateUIComponents,
):
logger.info(f"Device changed from {old_device} to {new_device}")
self._toggle_enable_buttons(ui, True)
self._init_device(new_device, position_emit, limit_update)
self._swap_readback_signal_connection(on_device_readback, old_device, new_device)
self._update_device_ui(new_device, ui)
def _open_dialog_selection(self, set_positioner: Callable):
def _ods():
"""Open dialog window for positioner selection"""
self._dialog = QDialog(self)
self._dialog.setWindowTitle("Positioner Selection")
layout = QVBoxLayout()
line_edit = DeviceLineEdit(
self, client=self.client, device_filter=[BECDeviceFilter.POSITIONER]
)
line_edit.textChanged.connect(set_positioner)
layout.addWidget(line_edit)
close_button = QPushButton("Close")
close_button.clicked.connect(self._dialog.accept)
layout.addWidget(close_button)
self._dialog.setLayout(layout)
self._dialog.exec()
self._dialog = None
return _ods

View File

@@ -1,242 +0,0 @@
""" Module for a PositionerBox widget to control a positioner device."""
from __future__ import annotations
import os
from bec_lib.device import Positioner
from bec_lib.logger import bec_logger
from bec_qthemes import material_icon
from qtpy.QtCore import Signal
from qtpy.QtGui import QDoubleValidator
from qtpy.QtWidgets import QDoubleSpinBox
from bec_widgets.qt_utils.error_popups import SafeProperty, SafeSlot
from bec_widgets.utils import UILoader
from bec_widgets.utils.colors import get_accent_colors, set_theme
from bec_widgets.widgets.control.device_control.positioner_box._base import PositionerBoxBase
from bec_widgets.widgets.control.device_control.positioner_box._base.positioner_box_base import (
DeviceUpdateUIComponents,
)
logger = bec_logger.logger
MODULE_PATH = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
class PositionerBox(PositionerBoxBase):
"""Simple Widget to control a positioner in box form"""
ui_file = "positioner_box.ui"
dimensions = (234, 224)
PLUGIN = True
USER_ACCESS = ["set_positioner"]
device_changed = Signal(str, str)
# Signal emitted to inform listeners about a position update
position_update = Signal(float)
def __init__(self, parent=None, device: Positioner | str | None = None, **kwargs):
"""Initialize the PositionerBox widget.
Args:
parent: The parent widget.
device (Positioner): The device to control.
"""
super().__init__(parent=parent, **kwargs)
self._device = ""
self._limits = None
if self.current_path == "":
self.current_path = os.path.dirname(__file__)
self.init_ui()
self.device = device
self._init_device(self.device, self.position_update.emit, self.update_limits)
def init_ui(self):
"""Init the ui"""
self.device_changed.connect(self.on_device_change)
self.ui = UILoader(self).loader(os.path.join(self.current_path, self.ui_file))
self.addWidget(self.ui)
self.layout.setSpacing(0)
self.layout.setContentsMargins(0, 0, 0, 0)
# fix the size of the device box
db = self.ui.device_box
db.setFixedHeight(self.dimensions[0])
db.setFixedWidth(self.dimensions[1])
self.ui.step_size.setStepType(QDoubleSpinBox.AdaptiveDecimalStepType)
self.ui.stop.clicked.connect(self.on_stop)
self.ui.stop.setToolTip("Stop")
self.ui.stop.setStyleSheet(
f"QPushButton {{background-color: {get_accent_colors().emergency.name()}; color: white;}}"
)
self.ui.tweak_right.clicked.connect(self.on_tweak_right)
self.ui.tweak_right.setToolTip("Tweak right")
self.ui.tweak_left.clicked.connect(self.on_tweak_left)
self.ui.tweak_left.setToolTip("Tweak left")
self.ui.setpoint.returnPressed.connect(self.on_setpoint_change)
self.setpoint_validator = QDoubleValidator()
self.ui.setpoint.setValidator(self.setpoint_validator)
self.ui.spinner_widget.start()
self.ui.tool_button.clicked.connect(self._open_dialog_selection(self.set_positioner))
icon = material_icon(icon_name="edit_note", size=(16, 16), convert_to_pixmap=False)
self.ui.tool_button.setIcon(icon)
def force_update_readback(self):
self._init_device(self.device, self.position_update.emit, self.update_limits)
@SafeProperty(str)
def device(self):
"""Property to set the device"""
return self._device
@device.setter
def device(self, value: str):
"""Setter, checks if device is a string"""
if not value or not isinstance(value, str):
return
if not self._check_device_is_valid(value):
return
old_device = self._device
self._device = value
if not self.label:
self.label = value
self.device_changed.emit(old_device, value)
@SafeProperty(bool)
def hide_device_selection(self):
"""Hide the device selection"""
return not self.ui.tool_button.isVisible()
@hide_device_selection.setter
def hide_device_selection(self, value: bool):
"""Set the device selection visibility"""
self.ui.tool_button.setVisible(not value)
@SafeSlot(bool)
def show_device_selection(self, value: bool):
"""Show the device selection
Args:
value (bool): Show the device selection
"""
self.hide_device_selection = not value
@SafeSlot(str)
def set_positioner(self, positioner: str | Positioner):
"""Set the device
Args:
positioner (Positioner | str) : Positioner to set, accepts str or the device
"""
if isinstance(positioner, Positioner):
positioner = positioner.name
self.device = positioner
@SafeSlot(str, str)
def on_device_change(self, old_device: str, new_device: str):
"""Upon changing the device, a check will be performed if the device is a Positioner.
Args:
old_device (str): The old device name.
new_device (str): The new device name.
"""
if not self._check_device_is_valid(new_device):
return
self._on_device_change(
old_device,
new_device,
self.position_update.emit,
self.update_limits,
self.on_device_readback,
self._device_ui_components(new_device),
)
def _device_ui_components(self, device: str) -> DeviceUpdateUIComponents:
return {
"spinner": self.ui.spinner_widget,
"position_indicator": self.ui.position_indicator,
"readback": self.ui.readback,
"setpoint": self.ui.setpoint,
"step_size": self.ui.step_size,
"device_box": self.ui.device_box,
"stop": self.ui.stop,
"tweak_increase": self.ui.tweak_right,
"tweak_decrease": self.ui.tweak_left,
}
@SafeSlot(dict, dict)
def on_device_readback(self, msg_content: dict, metadata: dict):
"""Callback for device readback.
Args:
msg_content (dict): The message content.
metadata (dict): The message metadata.
"""
self._on_device_readback(
self.device,
self._device_ui_components(self.device),
msg_content,
metadata,
self.position_update.emit,
self.update_limits,
)
def update_limits(self, limits: tuple):
"""Update limits
Args:
limits (tuple): Limits of the positioner
"""
if limits == self._limits:
return
self._limits = limits
self._update_limits_ui(limits, self.ui.position_indicator, self.setpoint_validator)
@SafeSlot()
def on_stop(self):
self._stop_device(self.device)
@property
def step_size(self):
"""Step size for tweak"""
return self.ui.step_size.value()
@SafeSlot()
def on_tweak_right(self):
"""Tweak motor right"""
self.dev[self.device].move(self.step_size, relative=True)
@SafeSlot()
def on_tweak_left(self):
"""Tweak motor left"""
self.dev[self.device].move(-self.step_size, relative=True)
@SafeSlot()
def on_setpoint_change(self):
"""Change the setpoint for the motor"""
self.ui.setpoint.clearFocus()
setpoint = self.ui.setpoint.text()
self.dev[self.device].move(float(setpoint), relative=False)
self.ui.tweak_left.setToolTip(f"Tweak left by {self.step_size}")
self.ui.tweak_right.setToolTip(f"Tweak right by {self.step_size}")
if __name__ == "__main__": # pragma: no cover
import sys
from qtpy.QtWidgets import QApplication # pylint: disable=ungrouped-imports
app = QApplication(sys.argv)
set_theme("dark")
widget = PositionerBox(device="bpm4i")
widget.show()
sys.exit(app.exec_())

View File

@@ -1,56 +0,0 @@
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from bec_widgets.utils.bec_designer import designer_material_icon
from bec_widgets.widgets.control.device_control.positioner_box.positioner_box_2d.positioner_box_2d import (
PositionerBox2D,
)
DOM_XML = """
<ui language='c++'>
<widget class='PositionerBox2D' name='positioner_box2_d'>
</widget>
</ui>
"""
class PositionerBox2DPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
def __init__(self):
super().__init__()
self._form_editor = None
def createWidget(self, parent):
t = PositionerBox2D(parent)
return t
def domXml(self):
return DOM_XML
def group(self):
return "Device Control"
def icon(self):
return designer_material_icon(PositionerBox2D.ICON_NAME)
def includeFile(self):
return "positioner_box2_d"
def initialize(self, form_editor):
self._form_editor = form_editor
def isContainer(self):
return False
def isInitialized(self):
return self._form_editor is not None
def name(self):
return "PositionerBox2D"
def toolTip(self):
return "Simple Widget to control two positioners in box form"
def whatsThis(self):
return self.toolTip()

View File

@@ -1,482 +0,0 @@
""" Module for a PositionerBox2D widget to control two positioner devices."""
from __future__ import annotations
import os
from typing import Literal
from bec_lib.device import Positioner
from bec_lib.logger import bec_logger
from bec_qthemes import material_icon
from qtpy.QtCore import Signal
from qtpy.QtGui import QDoubleValidator
from qtpy.QtWidgets import QDoubleSpinBox
from bec_widgets.qt_utils.error_popups import SafeProperty, SafeSlot
from bec_widgets.utils import UILoader
from bec_widgets.utils.colors import set_theme
from bec_widgets.widgets.control.device_control.positioner_box._base import PositionerBoxBase
from bec_widgets.widgets.control.device_control.positioner_box._base.positioner_box_base import (
DeviceUpdateUIComponents,
)
logger = bec_logger.logger
MODULE_PATH = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
DeviceId = Literal["horizontal", "vertical"]
class PositionerBox2D(PositionerBoxBase):
"""Simple Widget to control two positioners in box form"""
ui_file = "positioner_box_2d.ui"
PLUGIN = True
USER_ACCESS = ["set_positioner_hor", "set_positioner_ver"]
device_changed_hor = Signal(str, str)
device_changed_ver = Signal(str, str)
# Signals emitted to inform listeners about a position update
position_update_hor = Signal(float)
position_update_ver = Signal(float)
def __init__(
self,
parent=None,
device_hor: Positioner | str | None = None,
device_ver: Positioner | str | None = None,
**kwargs,
):
"""Initialize the PositionerBox widget.
Args:
parent: The parent widget.
device_hor (Positioner | str): The first device to control - assigned the horizontal axis.
device_ver (Positioner | str): The second device to control - assigned the vertical axis.
"""
super().__init__(parent=parent, **kwargs)
self._device_hor = ""
self._device_ver = ""
self._limits_hor = None
self._limits_ver = None
self._dialog = None
if self.current_path == "":
self.current_path = os.path.dirname(__file__)
self.init_ui()
self.device_hor = device_hor
self.device_ver = device_ver
self.connect_ui()
def init_ui(self):
"""Init the ui"""
self.device_changed_hor.connect(self.on_device_change_hor)
self.device_changed_ver.connect(self.on_device_change_ver)
self.ui = UILoader(self).loader(os.path.join(self.current_path, self.ui_file))
self.setpoint_validator_hor = QDoubleValidator()
self.setpoint_validator_ver = QDoubleValidator()
def connect_ui(self):
"""Connect the UI components to signals, data, or routines"""
self.addWidget(self.ui)
self.layout.setSpacing(0)
self.layout.setContentsMargins(0, 0, 0, 0)
def _init_ui(val: QDoubleValidator, device_id: DeviceId):
ui = self._device_ui_components_hv(device_id)
tweak_inc = (
self.on_tweak_inc_hor if device_id == "horizontal" else self.on_tweak_inc_ver
)
tweak_dec = (
self.on_tweak_dec_hor if device_id == "horizontal" else self.on_tweak_dec_ver
)
ui["setpoint"].setValidator(val)
ui["setpoint"].returnPressed.connect(
self.on_setpoint_change_hor
if device_id == "horizontal"
else self.on_setpoint_change_ver
)
ui["stop"].setToolTip("Stop")
ui["step_size"].setStepType(QDoubleSpinBox.StepType.AdaptiveDecimalStepType)
ui["tweak_increase"].clicked.connect(tweak_inc)
ui["tweak_decrease"].clicked.connect(tweak_dec)
_init_ui(self.setpoint_validator_hor, "horizontal")
_init_ui(self.setpoint_validator_ver, "vertical")
self.ui.stop_button.button.clicked.connect(self.on_stop)
self.ui.step_decrease_hor.clicked.connect(self.on_step_dec_hor)
self.ui.step_decrease_ver.clicked.connect(self.on_step_dec_ver)
self.ui.step_increase_hor.clicked.connect(self.on_step_inc_hor)
self.ui.step_increase_ver.clicked.connect(self.on_step_inc_ver)
self.ui.tool_button_hor.clicked.connect(
self._open_dialog_selection(self.set_positioner_hor)
)
self.ui.tool_button_ver.clicked.connect(
self._open_dialog_selection(self.set_positioner_ver)
)
icon = material_icon(icon_name="edit_note", size=(16, 16), convert_to_pixmap=False)
self.ui.tool_button_hor.setIcon(icon)
self.ui.tool_button_ver.setIcon(icon)
step_tooltip = "Step by the step size"
tweak_tooltip = "Tweak by 1/10th the step size"
for b in [
self.ui.step_increase_hor,
self.ui.step_increase_ver,
self.ui.step_decrease_hor,
self.ui.step_decrease_ver,
]:
b.setToolTip(step_tooltip)
for b in [
self.ui.tweak_increase_hor,
self.ui.tweak_increase_ver,
self.ui.tweak_decrease_hor,
self.ui.tweak_decrease_ver,
]:
b.setToolTip(tweak_tooltip)
icon_options = {"size": (16, 16), "convert_to_pixmap": False}
self.ui.tweak_increase_hor.setIcon(
material_icon(icon_name="keyboard_arrow_right", **icon_options)
)
self.ui.step_increase_hor.setIcon(
material_icon(icon_name="keyboard_double_arrow_right", **icon_options)
)
self.ui.tweak_decrease_hor.setIcon(
material_icon(icon_name="keyboard_arrow_left", **icon_options)
)
self.ui.step_decrease_hor.setIcon(
material_icon(icon_name="keyboard_double_arrow_left", **icon_options)
)
self.ui.tweak_increase_ver.setIcon(
material_icon(icon_name="keyboard_arrow_up", **icon_options)
)
self.ui.step_increase_ver.setIcon(
material_icon(icon_name="keyboard_double_arrow_up", **icon_options)
)
self.ui.tweak_decrease_ver.setIcon(
material_icon(icon_name="keyboard_arrow_down", **icon_options)
)
self.ui.step_decrease_ver.setIcon(
material_icon(icon_name="keyboard_double_arrow_down", **icon_options)
)
@SafeProperty(str)
def device_hor(self):
"""SafeProperty to set the device"""
return self._device_hor
@device_hor.setter
def device_hor(self, value: str):
"""Setter, checks if device is a string"""
if not value or not isinstance(value, str):
return
if not self._check_device_is_valid(value):
return
if value == self.device_ver:
return
old_device = self._device_hor
self._device_hor = value
self.label = f"{self._device_hor}, {self._device_ver}"
self.device_changed_hor.emit(old_device, value)
self._init_device(self.device_hor, self.position_update_hor.emit, self.update_limits_hor)
@SafeProperty(str)
def device_ver(self):
"""SafeProperty to set the device"""
return self._device_ver
@device_ver.setter
def device_ver(self, value: str):
"""Setter, checks if device is a string"""
if not value or not isinstance(value, str):
return
if not self._check_device_is_valid(value):
return
if value == self.device_hor:
return
old_device = self._device_ver
self._device_ver = value
self.label = f"{self._device_hor}, {self._device_ver}"
self.device_changed_ver.emit(old_device, value)
self._init_device(self.device_ver, self.position_update_ver.emit, self.update_limits_ver)
@SafeProperty(bool)
def hide_device_selection(self):
"""Hide the device selection"""
return not self.ui.tool_button_hor.isVisible()
@hide_device_selection.setter
def hide_device_selection(self, value: bool):
"""Set the device selection visibility"""
self.ui.tool_button_hor.setVisible(not value)
self.ui.tool_button_ver.setVisible(not value)
@SafeProperty(bool)
def hide_device_boxes(self):
"""Hide the device selection"""
return not self.ui.device_box_hor.isVisible()
@hide_device_boxes.setter
def hide_device_boxes(self, value: bool):
"""Set the device selection visibility"""
self.ui.device_box_hor.setVisible(not value)
self.ui.device_box_ver.setVisible(not value)
@SafeSlot(bool)
def show_device_selection(self, value: bool):
"""Show the device selection
Args:
value (bool): Show the device selection
"""
self.hide_device_selection = not value
@SafeSlot(str)
def set_positioner_hor(self, positioner: str | Positioner):
"""Set the device
Args:
positioner (Positioner | str) : Positioner to set, accepts str or the device
"""
if isinstance(positioner, Positioner):
positioner = positioner.name
self.device_hor = positioner
@SafeSlot(str)
def set_positioner_ver(self, positioner: str | Positioner):
"""Set the device
Args:
positioner (Positioner | str) : Positioner to set, accepts str or the device
"""
if isinstance(positioner, Positioner):
positioner = positioner.name
self.device_ver = positioner
@SafeSlot(str, str)
def on_device_change_hor(self, old_device: str, new_device: str):
"""Upon changing the device, a check will be performed if the device is a Positioner.
Args:
old_device (str): The old device name.
new_device (str): The new device name.
"""
if not self._check_device_is_valid(new_device):
return
self._on_device_change(
old_device,
new_device,
self.position_update_hor.emit,
self.update_limits_hor,
self.on_device_readback_hor,
self._device_ui_components_hv("horizontal"),
)
@SafeSlot(str, str)
def on_device_change_ver(self, old_device: str, new_device: str):
"""Upon changing the device, a check will be performed if the device is a Positioner.
Args:
old_device (str): The old device name.
new_device (str): The new device name.
"""
if not self._check_device_is_valid(new_device):
return
self._on_device_change(
old_device,
new_device,
self.position_update_ver.emit,
self.update_limits_ver,
self.on_device_readback_ver,
self._device_ui_components_hv("vertical"),
)
def _device_ui_components_hv(self, device: DeviceId) -> DeviceUpdateUIComponents:
if device == "horizontal":
return {
"spinner": self.ui.spinner_widget_hor,
"position_indicator": self.ui.position_indicator_hor,
"readback": self.ui.readback_hor,
"setpoint": self.ui.setpoint_hor,
"step_size": self.ui.step_size_hor,
"device_box": self.ui.device_box_hor,
"stop": self.ui.stop_button,
"tweak_increase": self.ui.tweak_increase_hor,
"tweak_decrease": self.ui.tweak_decrease_hor,
}
elif device == "vertical":
return {
"spinner": self.ui.spinner_widget_ver,
"position_indicator": self.ui.position_indicator_ver,
"readback": self.ui.readback_ver,
"setpoint": self.ui.setpoint_ver,
"step_size": self.ui.step_size_ver,
"device_box": self.ui.device_box_ver,
"stop": self.ui.stop_button,
"tweak_increase": self.ui.tweak_increase_ver,
"tweak_decrease": self.ui.tweak_decrease_ver,
}
else:
raise ValueError(f"Device {device} is not represented by this UI")
def _device_ui_components(self, device: str):
if device == self.device_hor:
return self._device_ui_components_hv("horizontal")
if device == self.device_ver:
return self._device_ui_components_hv("vertical")
@SafeSlot(dict, dict)
def on_device_readback_hor(self, msg_content: dict, metadata: dict):
"""Callback for device readback.
Args:
msg_content (dict): The message content.
metadata (dict): The message metadata.
"""
self._on_device_readback(
self.device_hor,
self._device_ui_components_hv("horizontal"),
msg_content,
metadata,
self.position_update_hor.emit,
self.update_limits_hor,
)
@SafeSlot(dict, dict)
def on_device_readback_ver(self, msg_content: dict, metadata: dict):
"""Callback for device readback.
Args:
msg_content (dict): The message content.
metadata (dict): The message metadata.
"""
self._on_device_readback(
self.device_ver,
self._device_ui_components_hv("vertical"),
msg_content,
metadata,
self.position_update_ver.emit,
self.update_limits_ver,
)
def update_limits_hor(self, limits: tuple):
"""Update limits
Args:
limits (tuple): Limits of the positioner
"""
if limits == self._limits_hor:
return
self._limits_hor = limits
self._update_limits_ui(limits, self.ui.position_indicator_hor, self.setpoint_validator_hor)
def update_limits_ver(self, limits: tuple):
"""Update limits
Args:
limits (tuple): Limits of the positioner
"""
if limits == self._limits_ver:
return
self._limits_ver = limits
self._update_limits_ui(limits, self.ui.position_indicator_ver, self.setpoint_validator_ver)
@SafeSlot()
def on_stop(self):
self._stop_device(f"{self.device_hor} or {self.device_ver}")
@SafeProperty(float)
def step_size_hor(self):
"""Step size for tweak"""
return self.ui.step_size_hor.value()
@step_size_hor.setter
def step_size_hor(self, val: float):
"""Step size for tweak"""
self.ui.step_size_hor.setValue(val)
@SafeProperty(float)
def step_size_ver(self):
"""Step size for tweak"""
return self.ui.step_size_ver.value()
@step_size_ver.setter
def step_size_ver(self, val: float):
"""Step size for tweak"""
self.ui.step_size_ver.setValue(val)
@SafeSlot()
def on_tweak_inc_hor(self):
"""Tweak device a up"""
self.dev[self.device_hor].move(self.step_size_hor / 10, relative=True)
@SafeSlot()
def on_tweak_dec_hor(self):
"""Tweak device a down"""
self.dev[self.device_hor].move(-self.step_size_hor / 10, relative=True)
@SafeSlot()
def on_step_inc_hor(self):
"""Tweak device a up"""
self.dev[self.device_hor].move(self.step_size_hor, relative=True)
@SafeSlot()
def on_step_dec_hor(self):
"""Tweak device a down"""
self.dev[self.device_hor].move(-self.step_size_hor, relative=True)
@SafeSlot()
def on_tweak_inc_ver(self):
"""Tweak device a up"""
self.dev[self.device_ver].move(self.step_size_ver / 10, relative=True)
@SafeSlot()
def on_tweak_dec_ver(self):
"""Tweak device b down"""
self.dev[self.device_ver].move(-self.step_size_ver / 10, relative=True)
@SafeSlot()
def on_step_inc_ver(self):
"""Tweak device b up"""
self.dev[self.device_ver].move(self.step_size_ver, relative=True)
@SafeSlot()
def on_step_dec_ver(self):
"""Tweak device a down"""
self.dev[self.device_ver].move(-self.step_size_ver, relative=True)
@SafeSlot()
def on_setpoint_change_hor(self):
"""Change the setpoint for device a"""
self.ui.setpoint_hor.clearFocus()
setpoint = self.ui.setpoint_hor.text()
self.dev[self.device_hor].move(float(setpoint), relative=False)
@SafeSlot()
def on_setpoint_change_ver(self):
"""Change the setpoint for device b"""
self.ui.setpoint_ver.clearFocus()
setpoint = self.ui.setpoint_ver.text()
self.dev[self.device_ver].move(float(setpoint), relative=False)
if __name__ == "__main__": # pragma: no cover
import sys
from qtpy.QtWidgets import QApplication # pylint: disable=ungrouped-imports
app = QApplication(sys.argv)
set_theme("dark")
widget = PositionerBox2D()
widget.show()
sys.exit(app.exec_())

Some files were not shown because too many files have changed in this diff Show More